o
    ei5                     @   s2  d Z ddlZddlZddlZddlZddlZddlZddlmZ e	e
ZG dd deZG dd deZG dd	 d	eZG d
d deZG dd deZG dd deZG dd deZG dd deZG dd deZG dd deZG dd deZG dd deZG dd dejjZG dd deZdS ) a,  Implements `AMQPConnectionWorkflow` - the default workflow of performing
multiple TCP/[SSL]/AMQP connection attempts with timeouts and retries until one
succeeds or all attempts fail.

Defines the interface `AbstractAMQPConnectionWorkflow` that facilitates
implementing custom connection workflows.

    N)__version__c                   @      e Zd ZdZdS )AMQPConnectorExceptionzBase exception for this moduleN__name__
__module____qualname____doc__ r
   r
   f/var/www/html/pca-backend/venv/lib/python3.10/site-packages/pika/adapters/utils/connection_workflow.pyr          r   c                   @   r   )AMQPConnectorStackTimeoutz:Overall TCP/[SSL]/AMQP stack connection attempt timed out.Nr   r
   r
   r
   r   r      r   r   c                   @   r   )AMQPConnectorAbortedz Asynchronous request was abortedNr   r
   r
   r
   r   r      r   r   c                   @   r   )AMQPConnectorWrongStatezjAMQPConnector operation requested in wrong state, such as aborting after
    completion was reported.
    Nr   r
   r
   r
   r   r   "   r   r   c                       (   e Zd ZdZ fddZdd Z  ZS )AMQPConnectorPhaseErrorBasezMWrapper for exception that occurred during a particular bring-up phase.

    c                    s   t t| j|  || _dS )z

        :param BaseException exception: error that occurred while waiting for a
            subclass-specific protocol bring-up phase to complete.
        :param args: args for parent class
        N)superr   __init__	exception)selfr   args	__class__r
   r   r   -   s   
z$AMQPConnectorPhaseErrorBase.__init__c                 C   s   d | jj| jS )Nz{}: {!r})formatr   r   r   r   r
   r
   r   __repr__7   s   z$AMQPConnectorPhaseErrorBase.__repr__r   r   r   r	   r   r   __classcell__r
   r
   r   r   r   (       
r   c                   @   r   )AMQPConnectorSocketConnectErrorz*Error connecting TCP socket to remote peerNr   r
   r
   r
   r   r   ;   r   r   c                   @   r   ) AMQPConnectorTransportSetupErrorzOError setting up transport after TCP connected but before AMQP handshake.

    Nr   r
   r
   r
   r   r    ?   r   r    c                   @   r   )AMQPConnectorAMQPHandshakeErrorzError during AMQP handshakeNr   r
   r
   r
   r   r!   E   r   r!   c                   @   r   )AMQPConnectionWorkflowAbortedz%AMQP Connection workflow was aborted.Nr   r
   r
   r
   r   r"   I   r   r"   c                   @   r   ) AMQPConnectionWorkflowWrongStatezuAMQP Connection Workflow operation requested in wrong state, such as
    aborting after completion was reported.
    Nr   r
   r
   r
   r   r#   M   r   r#   c                       r   )AMQPConnectionWorkflowFailedz5Indicates that AMQP connection workflow failed.

    c                    s   t t| j|  t|| _dS )z
        :param sequence exceptions: Exceptions that occurred during the
            workflow.
        :param args: args to pass to base class

        N)r   r$   r   tuple
exceptions)r   r&   r   r   r
   r   r   X   s   z%AMQPConnectionWorkflowFailed.__init__c                 C   s:   d | jjt| j| jd t| jdkr| jd S d S )NzG{}: {} exceptions in all; last exception - {!r}; first exception - {!r}   r   )r   r   r   lenr&   r   r
   r
   r   r   b   s   z%AMQPConnectionWorkflowFailed.__repr__r   r
   r
   r   r   r$   S   r   r$   c                   @   s   e Zd ZdZdZdZdZdZdZdZ	dZ
d	d
 Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd ddZdS )!AMQPConnectorz;Performs a single TCP/[SSL]/AMQP connection workflow.

    r   r(                  c                 C   sH   || _ || _d| _d| _d| _d| _d| _d| _d| _d| _	| j
| _dS )a  

        :param callable conn_factory: A function that takes
            `pika.connection.Parameters` as its only arg and returns a brand new
            `pika.connection.Connection`-based adapter instance each time it is
            called. The factory must instantiate the connection with
            `internal_connection_workflow=False`.
        :param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:

        N)_conn_factory_nbio_addr_record_conn_params_on_done_tcp_timeout_ref_stack_timeout_ref	_task_ref_sock
_amqp_conn_STATE_INIT_state)r   conn_factorynbior
   r
   r   r   w   s   zAMQPConnector.__init__c                 C   s
  | j | jkrtd| j || _|| _|| _| j| _ tj| jdd  | _	| j	
tjjtjd tj| jj| j	 | j	d | jd }tdt| | jj| j	|| jd| _d| _| jjdurm| j| jj| j| _d| _| jjdur| j| jj| j | _dS dS )	a  Asynchronously perform a single TCP/[SSL]/AMQP connection attempt.

        :param tuple addr_record: a single resolved address record compatible
            with `socket.getaddrinfo()` format.
        :param pika.connection.Parameters conn_params:
        :param callable on_done: Function to call upon completion of the
            workflow: `on_done(pika.connection.Connection | BaseException)`. If
            exception, it's going to be one of the following:
                `AMQPConnectorSocketConnectError`
                `AMQPConnectorTransportSetupError`
                `AMQPConnectorAMQPHandshakeError`
                `AMQPConnectorAborted`

        )Already in progress or finished; state={}Nr,   r(   Fr-   z Pika version %s connecting to %r)on_done)!r;   r:   r   r   r2   r3   r4   
_STATE_TCPsocketr8   
setsockoptpikacompatSOL_TCPTCP_NODELAYtcp_socket_optsset_sock_optstcp_optionssetblocking_LOGinfor   r1   connect_socket_on_tcp_connection_doner7   r5   socket_timeout
call_later_on_tcp_connection_timeoutr6   stack_timeout_on_overall_timeout)r   addr_recordconn_paramsr?   addrr
   r
   r   start   s>   




zAMQPConnector.startc                 C   s   | j | jkr
td| j | jkrtd| j| _ |   td| jj	| j
 | jdu r>td | jt| jt  dS | jjsPtd | jdd dS td	 | j | jkscJ d
| j dS )a  Abort the workflow asynchronously. The completion callback will be
        called with an instance of AMQPConnectorAborted.

        NOTE: we can't cancel/close synchronously because aborting pika
        Connection and its transport requires an asynchronous operation.

        :raises AMQPConnectorWrongState: If called after completion has been
            reported or the workflow not started yet.
        Cannot abort before starting.*Cannot abort after completion was reportedzCAMQPConnector: beginning client-initiated asynchronous abort; %r/%sNzXAMQPConnector.abort(): no connection, so just scheduling completion report via I/O loop.z*AMQPConnector.abort(): closing Connection.@  z3Client-initiated abort of AMQP Connection Workflow.zCAMQPConnector.abort(): closing of Connection was already initiated.z9Connection is closing, but not in TIMEOUT state; state={})r;   r:   r   _STATE_DONE_STATE_ABORTING_deactivaterK   rL   r3   hostr2   r9   debugr1   add_callback_threadsafe	functoolspartial_report_completion_and_cleanupr   
is_closingclose_STATE_TIMEOUTr   r   r
   r
   r   abort   s6   





zAMQPConnector.abortc                 C   sF   |    | jdur| j  d| _d| _d| _d| _d| _| j| _dS )zqCancel asynchronous tasks and clean up to assist garbage collection.

        Transition to STATE_DONE.

        N)	r]   r8   re   r0   r1   r2   r4   r[   r;   r   r
   r
   r   _close   s   

zAMQPConnector._closec                 C   sp   | j du sJ d| j| jdur| j  d| _| jdur'| j  d| _| jdur6| j  d| _dS dS )$Cancel asynchronous tasks.

        Nz:_deactivate called with self._amqp_conn not None; state={})r9   r   r;   r5   cancelr6   r7   r   r
   r
   r   r]     s   






zAMQPConnector._deactivatec                 C   >   t |trtd| ntd| | j}|   || dS )zClean up and invoke client's `on_done` callback.

        :param pika.connection.Connection | BaseException result: value to pass
            to user's `on_done` callback.
        z%AMQPConnector - reporting failure: %rz%AMQPConnector - reporting success: %rN
isinstanceBaseExceptionrK   errorrL   r4   rh   r   resultr?   r
   r
   r   rc        
z,AMQPConnector._report_completion_and_cleanupc                 C   s0   d| _ ttd| jj| j}| | dS )ztHandle TCP connection timeout

        Reports AMQPConnectorSocketConnectError with socket.timeout inside.

        Nz)TCP connection attempt timed out: {!r}/{})	r5   r   rA   timeoutr   r3   r^   r2   rc   )r   ro   r
   r
   r   rQ   ,  s   
z(AMQPConnector._on_tcp_connection_timeoutc                 C   s   d| _ | j}| j| _|| jkr;d| jj| jt| jj	}t
| | jjr.J d| j| jjs9| jd| dS || jkrNttd| jj| j}n|| jksUJ ttd| jj| jt| jj	}| | dS )a  Handle overall TCP/[SSL]/AMQP connection attempt timeout by reporting
        `Timeout` error to the client.

        Reports AMQPConnectorSocketConnectError if timeout occurred during
            socket TCP connection attempt.
        Reports AMQPConnectorTransportSetupError if timeout occurred during
            tramsport [SSL] setup attempt.
        Reports AMQPConnectorAMQPHandshakeError if timeout occurred during
            AMQP handshake.

        Nz0Timeout while setting up AMQP to {!r}/{}; ssl={}zUnexpected open state of {!r}rZ   z*Timeout while connecting socket to {!r}/{}z5Timeout while setting up transport to {!r}/{}; ssl={})r6   r;   rf   _STATE_AMQPr   r3   r^   r2   boolssl_optionsrK   ro   r9   is_openrd   re   r@   r   r   _STATE_TRANSPORTr    rc   )r   
prev_statemsgro   r
   r
   r   rS   9  sB   








z!AMQPConnector._on_overall_timeoutc                 C   s   d| _ | jdur| j  d| _|dur%td|| j | t| dS td| j	 | j
| _d }}| jjdurL| jjj}| jjj}|du rL| jj}| jjt| j| j| j	||| jd| _ d| _	dS )a  Handle completion of asynchronous socket connection attempt.

        Reports AMQPConnectorSocketConnectError if TCP socket connection
            failed.

        :param None|BaseException exc: None on success; exception object on
            failure

        Nz*TCP Connection attempt failed: %r; dest=%rz)TCP connection to broker established: %r.)protocol_factorysockssl_contextserver_hostnamer?   )r7   r5   rj   rK   ro   r2   rc   r   r_   r8   rx   r;   r3   rv   contextr~   r^   r1   create_streaming_connectionra   rb   r0    _on_transport_establishment_done)r   excr}   r~   r
   r
   r   rN   g  s<   





z%AMQPConnector._on_tcp_connection_donec                 C   s   d| _ t|tr!td|| jj| jt| jj	 | 
t| dS td| |\}| _| j| _| jj| jdd | j| j dS )aQ  Handle asynchronous completion of
        `AbstractIOServices.create_streaming_connection()`

        Reports AMQPConnectorTransportSetupError if transport ([SSL]) setup
            failed.

        :param sequence|BaseException result: On success, a two-tuple
            (transport, protocol); on failure, exception instance.

        NzCAttempt to create the streaming transport failed: %r; %r/%s; ssl=%sz"Streaming transport linked up: %r.T)remove_default)r7   rm   rn   rK   ro   r3   r^   r2   ru   rv   rc   r    rL   r9   rt   r;   add_on_open_error_callback_on_amqp_handshake_doneadd_on_open_callback)r   rq   
_transportr
   r
   r   r     s$   

z.AMQPConnector._on_transport_establishment_doneNc                 C   s   t d| j|| jj| j d| _| j| jkrt }nL| j| j	kr3t
td| jj| jt| jj}n3| j| jkr[|du rKt d| jj| j| |}nt d| jj| j| t
|}nt d| j|| dS | | dS )a  Handle completion of AMQP connection handshake attempt.

        NOTE: we handle two types of callbacks - success with just connection
        arg as well as the open-error callback with connection and error

        Reports AMQPConnectorAMQPHandshakeError if AMQP handshake failed.

        :param pika.connection.Connection connection:
        :param BaseException | None error: None on success, otherwise
            failure

        zJAMQPConnector: AMQP handshake attempt completed; state=%s; error=%r; %r/%sNz,Timeout during AMQP handshake{!r}/{}; ssl={}z8AMQPConnector: AMQP connection established for %r/%s: %rz=AMQPConnector: AMQP connection handshake failed for %r/%s: %rzgAMQPConnector: Ignoring AMQP handshake completion notification due to wrong state=%s; error=%r; conn=%r)rK   r_   r;   r3   r^   r2   r9   r\   r   rf   r!   r   r   ru   rv   rt   rc   )r   
connectionro   rq   r
   r
   r   r     sH   



z%AMQPConnector._on_amqp_handshake_done)N)r   r   r   r	   r:   r@   rx   rt   rf   r\   r[   r   rW   rg   rh   r]   rc   rQ   rS   rN   r   r   r
   r
   r
   r   r*   j   s(    1..-"r*   c                   @   s    e Zd ZdZdd Zdd ZdS )AbstractAMQPConnectionWorkflowzMInterface for implementing a custom TCP/[SSL]/AMQP connection workflow.

    c                 C      t )a  Asynchronously perform the workflow until success or all retries
        are exhausted. Called by the adapter.

        :param sequence connection_configs: A sequence of one or more
            `pika.connection.Parameters`-based objects. Will attempt to connect
            using each config in the given order.
        :param callable connector_factory: call it without args to obtain a new
            instance of `AMQPConnector` for each connection attempt.
            See `AMQPConnector` for details.
        :param native_loop: Native I/O loop passed by app to the adapter or
            obtained by the adapter by default.
        :param callable on_done: Function to call upon completion of the
            workflow:
            `on_done(pika.connection.Connection |
                     AMQPConnectionWorkflowFailed |
                     AMQPConnectionWorkflowAborted)`.
            `Connection`-based adapter on success,
            `AMQPConnectionWorkflowFailed` on failure,
            `AMQPConnectionWorkflowAborted` if workflow was aborted.

        :raises AMQPConnectionWorkflowWrongState: If called in wrong state, such
            as after starting the workflow.
        NotImplementedError)r   connection_configsconnector_factorynative_loopr?   r
   r
   r   rW     s   z$AbstractAMQPConnectionWorkflow.startc                 C   r   )a  Abort the workflow asynchronously. The completion callback will be
        called with an instance of AMQPConnectionWorkflowAborted.

        NOTE: we can't cancel/close synchronously because aborting pika
        Connection and its transport requires an asynchronous operation.

        :raises AMQPConnectionWorkflowWrongState: If called in wrong state, such
            as before starting or after completion has been reported.
        r   r   r
   r
   r   rg     s   
z$AbstractAMQPConnectionWorkflow.abortN)r   r   r   r	   rW   rg   r
   r
   r
   r   r     s    r   c                   @   s   e Zd ZdZejZejZdZ	dZ
dZdZd ddZd	d
 Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd ZdS )!AMQPConnectionWorkflowa  Implements Pika's default workflow for performing multiple TCP/[SSL]/AMQP
    connection attempts with timeouts and retries until one succeeds or all
    attempts fail.

    The workflow:
        while not success and retries remain:
            1. For each given config (pika.connection.Parameters object):
                A. Perform DNS resolution of the config's host.
                B. Attempt to establish TCP/[SSL]/AMQP for each resolved address
                   until one succeeds, in which case we're done.
            2. If all configs failed but retries remain, resume from beginning
               after the given retry pause. NOTE: failure of DNS resolution
               is equivalent to one cycle and will be retried after the pause
               if retries remain.

    r   r(   r+   r,   Fc                 C   sT   d| _ d| _|| _d| _d| _d| _d| _d| _d| _d| _	d| _
g | _| j| _dS )a  
        :param int | float retry_pause: Non-negative number of seconds to wait
            before retrying the config sequence. Meaningful only if retries is
            greater than 0. Defaults to 2 seconds.
        :param bool _until_first_amqp_attempt: INTERNAL USE ONLY; ends workflow
            after first AMQP handshake attempt, regardless of outcome (success
            or failure). The automatic connection logic in
            `pika.connection.Connection` enables this because it's not
            designed/tested to reset all state properly to handle more than one
            AMQP handshake attempt.

        TODO: Do we need getaddrinfo timeout?
        TODO: Would it be useful to implement exponential back-off?

        N)_attempts_remaining_retry_pause_until_first_amqp_attemptr1   _current_config_index_connection_configs_connector_factoryr4   
_connectorr7   _addrinfo_iter_connection_errorsr:   r;   )r   r   r
   r
   r   r   3  s   zAMQPConnectionWorkflow.__init__c                 C   s
   || _ dS )a  Called by the conneciton adapter only on pika's
        `AMQPConnectionWorkflow` instance to provide it the adapter-specific
        `AbstractIOServices` object before calling the `start()` method.

        NOTE: Custom workflow implementations should use the native I/O loop
        directly because `AbstractIOServices` is private to Pika
        implementation and its interface may change without notice.

        :param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:

        N)r1   )r   r=   r
   r
   r   set_io_services]  s   
z&AMQPConnectionWorkflow.set_io_servicesc              
   C   s   | j | jkrtd| j zt| W n ty' } ztd|d}~ww |s1td||| _|| _	|| _
|d j| _|d j| _| j| _ td | jdtj| jdd	| _dS )
af  Override `AbstractAMQPConnectionWorkflow.start()`.

        NOTE: This implementation uses `connection_attempts` and `retry_delay`
        values from the last element of the given `connection_configs` sequence
        as the overall number of connection attempts of the entire
        `connection_configs` sequence and pause between each sequence.

        r>   z3connection_configs does not support iteration: {!r}Nz"connection_configs is empty: {!r}.r'   z1Starting AMQP Connection workflow asynchronously.r   Tfirst)r;   r:   r   r   iter	Exception	TypeError
ValueErrorr   r   r4   connection_attemptsr   retry_delayr   _STATE_ACTIVErK   r_   r1   rP   ra   rb   _start_new_cycle_asyncr7   )r   r   r   r   r?   ro   r
   r
   r   rW   k  s8   


zAMQPConnectionWorkflow.startc                 C   s   | j | jkr
td| j | jkrtd| j| _ |   td | jdu r9t	d | j
t| jt  dS t	d | j  dS )z<Override `AbstractAMQPConnectionWorkflow.abort()`.

        rX   rY   zFAMQPConnectionWorkflow: beginning client-initiated asynchronous abort.Nz`AMQPConnectionWorkflow.abort(): no connector, so just scheduling completion report via I/O loop.z=AMQPConnectionWorkflow.abort(): requesting connector.abort().)r;   r:   r   r[   r\   r]   rK   rL   r   r_   r1   r`   ra   rb   rc   r"   rg   r   r
   r
   r   rg     s$   



zAMQPConnectionWorkflow.abortc                 C   s>   |    d| _d| _d| _d| _d| _d| _d| _| j| _	dS )zrCancel asynchronous tasks and clean up to assist garbage collection.

        Transition to _STATE_DONE.

        N)
r]   r   r1   r   r4   r   r   r   r[   r;   r   r
   r
   r   rh     s   zAMQPConnectionWorkflow._closec                 C   s"   | j dur| j   d| _ dS dS )ri   N)r7   rj   r   r
   r
   r   r]     s   


z"AMQPConnectionWorkflow._deactivatec                 C   rk   )zClean up and invoke client's `on_done` callback.

        :param pika.connection.Connection | AMQPConnectionWorkflowFailed result:
            value to pass to user's `on_done` callback.
        z.AMQPConnectionWorkflow - reporting failure: %rz.AMQPConnectionWorkflow - reporting success: %rNrl   rp   r
   r
   r   rc     rr   z5AMQPConnectionWorkflow._report_completion_and_cleanupc                 C   s   d| _ | jdksJ | j| jdkr$t| j}td| | | dS |  jd8  _td| j d| _| j	
|r<dn| j| j| _ dS )aQ  Start a new workflow cycle (if any more attempts are left) beginning
        with the first Parameters object in self._connection_configs. If out of
        attempts, report `AMQPConnectionWorkflowFailed`.

        :param bool first: if True, don't delay; otherwise delay next attempt by
            `self._retry_pause` seconds.
        Nr   z$AMQP connection workflow failed: %r.r(   zQBeginning a new AMQP connection workflow cycle; attempts remaining after this: %s)r7   r   r$   r   rK   ro   rc   r_   r   r1   rP   r   _try_next_config_async)r   r   ro   r
   r
   r   r     s    



z-AMQPConnectionWorkflow._start_new_cycle_asyncc                 C   s   d| _ | jdu rd| _n|  jd7  _| jt| jkr(td | jdd dS | j| j }td|j|j | j du s>J | j	j
|j|j| j| j| jd| _ dS )	zwAttempt to connect using the next Parameters config. If there are no
        more configs, start a new cycle.

        Nr   r(   z-_try_next_config_async: starting a new cycle.Fr   z_try_next_config_async: %r:%s)r^   portsocktypeprotor?   )r7   r   r)   r   rK   r_   r   r^   r   r1   getaddrinfo
_SOCK_TYPE_IPPROTO_on_getaddrinfo_async_done)r   paramsr
   r
   r   r     s$   

z-AMQPConnectionWorkflow._try_next_config_asyncc                 C   s^   d| _ t|trtd| | j| | jdd dS tdt	| t
|| _|   dS )zHandles completion callback from asynchronous `getaddrinfo()`.

        :param list | BaseException addrinfos_or_exc: resolved address records
            returned by `getaddrinfo()` or an exception object from failure.
        Nzgetaddrinfo failed: %r.Fr   zgetaddrinfo returned %s records)r7   rm   rn   rK   ro   r   appendr   r_   r)   r   r   _try_next_resolved_address)r   addrinfos_or_excr
   r
   r   r     s   

z1AMQPConnectionWorkflow._on_getaddrinfo_async_donec                 C   sl   zt | j}W n ty   td |   Y dS w td| |  | _| jj|| j	| j
 | jd dS )z}Try connecting using next resolved address. If there aren't any left,
        continue with next Parameters config.

        z8_try_next_resolved_address: continuing with next config.Nz-Attempting to connect using address record %r)rT   rU   r?   )nextr   StopIterationrK   r_   r   r   r   rW   r   r   _on_connector_done)r   rT   r
   r
   r   r   )  s    


z1AMQPConnectionWorkflow._try_next_resolved_addressc                 C   s   d| _ td| t|trY| j| t|tr/| j| j	ks'J d
| j| t  dS | jrSt|trStd t|jtjjrGt}nt| j}| | dS |   dS | | dS )zHandle completion of connection attempt by `AMQPConnector`.

        :param pika.connection.Connection | BaseException conn_or_exc: See
            `AMQPConnector.start()` for exception details.

        Nz$Connection attempt completed with %rz&Expected _STATE_ABORTING, but got {!r}zcEnding AMQP connection workflow after first failed AMQP handshake due to _until_first_amqp_attempt.)r   rK   r_   rm   rn   r   r   r   r;   r\   r   rc   r"   r   r!   r   rC   r&   ConnectionOpenAbortedr$   r   )r   conn_or_excro   r
   r
   r   r   ?  s2   



z)AMQPConnectionWorkflow._on_connector_doneN)F)r   r   r   r	   rA   SOCK_STREAMr   IPPROTO_TCPr   r:   r   r\   r[   r   r   rW   rg   rh   r]   rc   r   r   r   r   r   r
   r
   r
   r   r     s(    
*,r   )r	   ra   loggingrA   pika.compatrC   pika.exceptionspika.tcp_socket_optsr   	getLoggerr   rK   r   r   r   r   r   r   r   r    r!   r"   r#   r$   objectr*   rD   AbstractBaser   r   r
   r
   r
   r   <module>   s4    	
   -