o
    eiF                     @   s>  d Z ddlZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlmZmZ ddlZddlZejejfZejejfZeeZejeZdd Zdd Zdd	 ZG d
d deZ G dd deZ!G dd deZ"G dd deZ#G dd deZ$G dd deZ%G dd de%Z&G dd de%Z'dS )z^Utilities for implementing `nbio_interface.AbstractIOServices` for
pika connection adapters.

    N)AbstractIOReferenceAbstractStreamTransportc                 C   s   t | std|| dS )zRaise TypeError if callback is not callable

    :param callback: callback to check
    :param name: Name to include in exception text
    :raises TypeError:

    z!{} must be callable, but got {!r}N)callable	TypeErrorformat)callbackname r	   d/var/www/html/pca-backend/venv/lib/python3.10/site-packages/pika/adapters/utils/io_services_utils.pycheck_callback_arg,   s
   r   c                 C   s   t | tjstd| dS )zqRaise TypeError if file descriptor is not an integer

    :param fd: file descriptor
    :raises TypeError:

    z0Paramter must be a file descriptor, but got {!r}N)
isinstancenumbersIntegralr   r   )fdr	   r	   r
   check_fd_arg9   s
   r   c                    s   t   fdd}|S )z0Function decorator for retrying on SIGINT.

    c               
      sJ   	 z | i |W S  t jjy$ } z|jtjkrW Y d}~q  d}~ww )zWrapper for decorated functionTN)pikacompatSOCKET_ERRORerrnoEINTR)argskwargserrorfuncr	   r
   retry_sigint_wrapJ   s   z+_retry_on_sigint.<locals>.retry_sigint_wrap)	functoolswraps)r   r   r	   r   r
   _retry_on_sigintE   s   r   c                   @   s   e Zd ZdZdd ZdS )SocketConnectionMixinzImplements
    `pika.adapters.utils.nbio_interface.AbstractIOServices.connect_socket()`
    on top of
    `pika.adapters.utils.nbio_interface.AbstractFileDescriptorServices` and
    basic `pika.adapters.utils.nbio_interface.AbstractIOServices`.

    c                 C   s   t | |||d S )z[Implement
        :py:meth:`.nbio_interface.AbstractIOServices.connect_socket()`.

        )nbiosockresolved_addron_done)_AsyncSocketConnectorstart)selfr!   r"   r#   r	   r	   r
   connect_socketb   s   z$SocketConnectionMixin.connect_socketN)__name__
__module____qualname____doc__r'   r	   r	   r	   r
   r   Y   s    r   c                   @   s   e Zd ZdZ		dddZdS )StreamingConnectionMixinzImplements
    `.nbio_interface.AbstractIOServices.create_streaming_connection()` on
    top of `.nbio_interface.AbstractFileDescriptorServices` and basic
    `nbio_interface.AbstractIOServices` services.

    Nc                 C   s   zt | |||||d W S  ty@ } z'td|| z|  W   ty; } ztd|| W Y d}~ d}~ww d}~ww )zhImplement
        :py:meth:`.nbio_interface.AbstractIOServices.create_streaming_connection()`.

        )r    protocol_factoryr!   ssl_contextserver_hostnamer#   z*create_streaming_connection(%s) failed: %rz%s.close() failed: %rN)_AsyncStreamConnectorr%   	Exception_LOGGERr   close)r&   r-   r!   r#   r.   r/   r   r	   r	   r
   create_streaming_connectiont   s2   

z4StreamingConnectionMixin.create_streaming_connection)NN)r(   r)   r*   r+   r4   r	   r	   r	   r
   r,   l   s
    r,   c                   @   s    e Zd ZdZdd Zdd ZdS )_AsyncServiceAsyncHandlezGThis module's adaptation of `.nbio_interface.AbstractIOReference`

    c                 C   s   |j | _dS )zZ
        :param subject: subject of the reference containing a `cancel()` method

        N)cancel_cancel)r&   subjectr	   r	   r
   __init__   s   z!_AsyncServiceAsyncHandle.__init__c                 C   s   |   S )zCancel pending operation

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        )r7   r&   r	   r	   r
   r6      s   z_AsyncServiceAsyncHandle.cancelN)r(   r)   r*   r+   r9   r6   r	   r	   r	   r
   r5      s    r5   c                   @   sh   e Zd ZdZdZdZdZdZdd Ze	dd	 Z
d
d Zdd Ze	dd Ze	dd Ze	dd ZdS )r$   zConnects the given non-blocking socket asynchronously using
    `.nbio_interface.AbstractFileDescriptorServices` and basic
    `.nbio_interface.AbstractIOServices`. Used for implementing
    `.nbio_interface.AbstractIOServices.connect_socket()`.
    r            c              
   C   s   t |d zt|j|d  W n- ty= } z!ttds#td nd|||}t	| t
|W Y d}~nd}~ww || _|| _|| _|| _| j| _d| _dS )a  
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:
        :param socket.socket sock: non-blocking socket that needs to be
            connected via `socket.socket.connect()`
        :param tuple resolved_addr: resolved destination address/port two-tuple
            which is compatible with the given's socket's address family
        :param callable on_done: user callback that takes None upon successful
            completion or exception upon error (check for `BaseException`) as
            its only arg. It will not be called if the operation was cancelled.
        :raises ValueError: if host portion of `resolved_addr` is not an IP
            address or is inconsistent with the socket's address family as
            validated via `socket.inet_pton()`
        r#   r   	inet_ptonz8Unable to check resolved address: no socket.inet_pton().z9Invalid or unresolved IP address {!r} for socket {}: {!r}NF)r   socketr>   familyr1   hasattrr2   debugr   r   
ValueError_nbio_sock_addr_on_done_STATE_NOT_STARTED_state_watching_socket_events)r&   r    r!   r"   r#   r   msgr	   r	   r
   r9      s0   



z_AsyncSocketConnector.__init__c                 C   s&   | j rd| _ | j| j  dS dS )z'Remove socket watcher, if any

        FN)rJ   rD   remove_writerrE   filenor:   r	   r	   r
   _cleanup   s   z_AsyncSocketConnector._cleanupc                 C   s8   | j | jksJ d| j f| j| _ | j| j t| S )zZStart asynchronous connection establishment.

        :rtype: AbstractIOReference
        z:_AsyncSocketConnector.start(): expected _STATE_NOT_STARTED)rI   rH   _STATE_ACTIVErD   add_callback_threadsafe_start_asyncr5   r:   r	   r	   r
   r%      s   z_AsyncSocketConnector.startc                 C   sH   | j | jkr| j| _ td| j| j |   dS td| j | j dS )Cancel pending connection request without calling user's completion
        callback.

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        z-User canceled connection request for %s to %sTzD_AsyncSocketConnector cancel requested when not ACTIVE: state=%s; %sF)rI   rO   _STATE_CANCELEDr2   rB   rE   rF   rN   r:   r	   r	   r
   r6      s   z_AsyncSocketConnector.cancelc                 C   sf   t d|| j t|ttdfsJ d|f| j| jks$J d| jf| j| _| 	  | 
| dS )zAdvance to COMPLETED state, remove socket watcher, and invoke user's
        completion callback.

        :param BaseException | None result: value to pass in user's callback

        z0_AsyncSocketConnector._report_completion(%r); %sNzP_AsyncSocketConnector._report_completion() expected exception or None as result.zF_AsyncSocketConnector._report_completion() expected _STATE_NOT_STARTED)r2   rB   rE   r   BaseExceptiontyperI   rO   _STATE_COMPLETEDrN   rG   r&   resultr	   r	   r
   _report_completion	  s   z(_AsyncSocketConnector._report_completionc              
   C   s   | j | jkrtd| j| j| j  dS z	| j| j W n9 ttj	j
fyU } z)t|tj	j
r5|jtv r5ntd| j| j| | | W Y d}~dS W Y d}~nd}~ww z| j| j | j W n  ty } ztd| j| | | W Y d}~dS d}~ww d| _td| j dS )zCalled as callback from I/O loop to kick-start the workflow, so it's
        safe to call user's completion callback from here, if needed

        zJAbandoning sock=%s connection establishment to %s due to inactive state=%sNz%s.connect(%s) failed: %rzasync.set_writer(%s) failed: %rTz/Connection-establishment is in progress for %s.)rI   rO   r2   rB   rE   rF   connectr1   r   r   r   r   r   (_CONNECTION_IN_PROGRESS_SOCK_ERROR_CODESr   rY   rD   
set_writerrM   _on_writable	exceptionrJ   r&   r   r	   r	   r
   rQ      sD   




z"_AsyncSocketConnector._start_asyncc                 C   s   | j | jkrtd| j| j  dS | jtjtj}|s&t	d| j d}nt
|}td| j|| tj||}| | dS )zwCalled when socket connects or fails to. Check for predicament and
        invoke user's completion callback.

        z_Socket connection-establishment event watcher called in inactive state (ignoring): %s; state=%sNzSocket connected: %sz+Socket failed to connect: %s; error=%s (%s))rI   rO   r2   r   rE   
getsockoptr?   
SOL_SOCKETSO_ERRORinfoosstrerrorr   r   r   rY   )r&   
error_coderX   	error_msgr	   r	   r
   r]   G  s"   
z"_AsyncSocketConnector._on_writableN)r(   r)   r*   r+   rH   rO   rS   rV   r9   _log_exceptionsrN   r%   r6   rY   rQ   r]   r	   r	   r	   r
   r$      s"    $


&r$   c                   @   st   e Zd ZdZdZdZdZdZdd Ze	dd	 Z
d
d Zdd Ze	dd Ze	dd Ze	dd Ze	dd ZdS )r0   zPerforms asynchronous SSL session establishment, if requested, on the
    already-connected socket and links the streaming transport to protocol.
    Used for implementing
    `.nbio_interface.AbstractIOServices.create_streaming_connection()`.

    r   r;   r<   r=   c              
   C   s   t |d t |d t|tdtjfstd||dur'|du r'tdz|  W n tyA } ztd||d}~ww || _	|| _
|| _|| _|| _|| _| j| _d| _dS )a  
        NOTE: We take ownership of the given socket upon successful completion
        of the constructor.

        See `AbstractIOServices.create_streaming_connection()` for detailed
        documentation of the corresponding args.

        :param AbstractIOServices | AbstractFileDescriptorServices nbio:
        :param callable protocol_factory:
        :param socket.socket sock:
        :param ssl.SSLContext | None ssl_context:
        :param str | None server_hostname:
        :param callable on_done:

        r-   r#   Nz8Expected ssl_context=None | ssl.SSLContext, but got {!r}z?Non-None server_hostname must not be passed without ssl contextzEExpected connected socket, but getpeername() failed: error={!r}; {}; F)r   r   rU   ssl
SSLContextrC   r   getpeernamer1   rD   _protocol_factoryrE   _ssl_context_server_hostnamerG   rH   rI   _watching_socket)r&   r    r-   r!   r.   r/   r#   r   r	   r	   r
   r9   p  s2   


z_AsyncStreamConnector.__init__c              
   C   s  t d| | jr&t d|| j d| _| j| j  | j| j  zQ|rNt d|| j z| j  W n* t	yM } z	t 
d|| j  d}~ww W d| _d| _d| _d| _d| _d| _dS W d| _d| _d| _d| _d| _d| _dS d| _d| _d| _d| _d| _d| _w )zeCancel pending async operations, if any

        :param bool close: close the socket if true
        z"_AsyncStreamConnector._cleanup(%r)z5_AsyncStreamConnector._cleanup(%r): removing RdWr; %sFz6_AsyncStreamConnector._cleanup(%r): closing socket; %sz"_sock.close() failed: error=%r; %sN)r2   rB   ro   rE   rD   remove_readerrM   rL   r3   r1   r^   rl   rm   rn   rG   )r&   r3   r   r	   r	   r
   rN     sX   

z_AsyncStreamConnector._cleanupc                 C   sF   t d| j | j| jksJ d| jf| j| _| j| j t	| S )zCKick off the workflow

        :rtype: AbstractIOReference
        z!_AsyncStreamConnector.start(); %sz9_AsyncStreamConnector.start() expected _STATE_NOT_STARTED)
r2   rB   rE   rI   rH   rO   rD   rP   rQ   r5   r:   r	   r	   r
   r%     s   z_AsyncStreamConnector.startc                 C   sH   | j | jkr| j| _ td| j | jdd dS td| j | j dS )rR   z%User canceled streaming linkup for %sTr3   zD_AsyncStreamConnector cancel requested when not ACTIVE: state=%s; %sF)rI   rO   rS   r2   rB   rE   rN   r:   r	   r	   r
   r6     s   z_AsyncStreamConnector.cancelc              
   C   s   t d|| j t|ttfsJ d|| jf| j| jks$J d| jf| j| _z$z| 	| W n t
y@   t d| j|  w W | jt|td dS | jt|td w )a  Advance to COMPLETED state, cancel async operation(s), and invoke
        user's completion callback.

        :param BaseException | tuple result: value to pass in user's callback.
            `tuple(transport, protocol)` on success, exception on error

        z0_AsyncStreamConnector._report_completion(%r); %szQ_AsyncStreamConnector._report_completion() expected exception or tuple as result.zA_AsyncStreamConnector._report_completion() expected _STATE_ACTIVEz%r: _on_done(%r) failed.rq   N)r2   rB   rE   r   rT   tuplerI   rO   rV   rG   r1   r^   rY   rN   rW   r	   r	   r
   rY     s,   	*z(_AsyncStreamConnector._report_completionc              
   C   s   t d| j | j| jkrt d| j| j dS | jdu r#|   dS t d| j z| jj| jddd| jd| _W n  t	yZ } zt 
d| j| | | W Y d}~dS d}~ww |   dS )zCalled as callback from I/O loop to kick-start the workflow, so it's
        safe to call user's completion callback from here if needed

        z(_AsyncStreamConnector._start_async(); %szMAbandoning streaming linkup due to inactive state transition; state=%s; %s; .NzStarting SSL handshake on %sF)server_sidedo_handshake_on_connectsuppress_ragged_eofsr/   zSSL wrap_socket(%s) failed: %r)r2   rB   rE   rI   rO   rm   _linkupwrap_socketrn   r1   r^   rY   _do_ssl_handshaker_   r	   r	   r
   rQ   
  s6   


z"_AsyncStreamConnector._start_asyncc              
   C   sj  t d d}zz|  }W n ty# } z	t d|| j  d}~ww | jdu rIz
t| j|| j}W n5 tyH } z	t d|| j  d}~ww z
t	| j|| j}W n tyh } z	t d|| j  d}~ww t d| z|
| W n ty } z
t d||| j  d}~ww t d|| W n ty } z|}W Y d}~n	d}~ww ||f}| | dS )	z}Connection is ready: instantiate and link up transport and protocol,
        and invoke user's completion callback.

        z_AsyncStreamConnector._linkup()Nz'protocol_factory() failed: error=%r; %sz%PlainTransport() failed: error=%r; %sz#SSLTransport() failed: error=%r; %sz_linkup(): created transport %rz1protocol.connection_made(%r) failed: error=%r; %sz2_linkup(): introduced transport to protocol %r; %r)r2   rB   rl   r1   r^   rE   rm   _AsyncPlaintextTransportrD   _AsyncSSLTransportconnection_maderY   )r&   	transportprotocolr   rX   r	   r	   r
   rv   0  sn   


z_AsyncStreamConnector._linkupc              
   C   s  t d | j| jkrt d| j| j dS d}zqz| j  W n^ tjy~ } zQ|jtj	krNt d| j d| _
| j| j | j | j| j  n&|jtjkrst d| j d| _
| j| j | j | j| j  n W Y d}~nd}~ww d}t d| j W n  ty } zt d	|| j | | W Y d}~dS d}~ww |rt d
| j | j| j  | j| j  d| _
t d| j |   dS dS )zJPerform asynchronous SSL handshake on the already wrapped socket

        z)_AsyncStreamConnector._do_ssl_handshake()z`_do_ssl_handshake: Abandoning streaming linkup due to inactive state transition; state=%s; %s; .NFzSSL handshake wants read; %s.TzSSL handshake wants write. %sz(SSL handshake completed successfully: %sz%SSL do_handshake failed: error=%r; %sz8_do_ssl_handshake: removing watchers ahead of linkup: %sz=_do_ssl_handshake: pre-linkup removal of watchers is done; %s)r2   rB   rI   rO   rE   do_handshakeri   SSLErrorr   SSL_ERROR_WANT_READro   rD   
set_readerrM   rx   rL   SSL_ERROR_WANT_WRITEr\   rp   rc   r1   r^   rY   rv   )r&   doner   r	   r	   r
   rx   j  sr   

z'_AsyncStreamConnector._do_ssl_handshakeN)r(   r)   r*   r+   rH   rO   rS   rV   r9   rh   rN   r%   r6   rY   rQ   rv   rx   r	   r	   r	   r
   r0   d  s&    0
"

%
9r0   c                   @   s   e Zd ZdZdZdZdZdZdZdZ	G dd	 d	e
Zd
d Zdd Zdd Zdd Zdd Zdd Zdd Zeedd Zeedd Zedd Zedd Zed d! Zed"d# Zd$S )%_AsyncTransportBasezIBase class for `_AsyncPlaintextTransport` and `_AsyncSSLTransport`.

    r;   r<   r=      i   i  c                       s    e Zd ZdZ fddZ  ZS )z_AsyncTransportBase.RxEndOfFilezNWe raise this internally when EOF (empty read) is detected on input.

        c                    s   t tj| dd d S )NzEnd of input stream (EOF))superr   RxEndOfFiler9   r:   	__class__r	   r
   r9     s   z(_AsyncTransportBase.RxEndOfFile.__init__)r(   r)   r*   r+   r9   __classcell__r	   r	   r   r
   r     s    r   c                 C   s:   t d| || _|| _|| _| j| _t | _	d| _
dS )a~  

        :param socket.socket | ssl.SSLSocket sock: connected socket
        :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
            corresponding protocol in this transport/protocol pairing; the
            protocol already had its `connection_made()` method called.
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:

        z _AsyncTransportBase.__init__: %sr   N)r2   rB   rE   	_protocolrD   rO   rI   collectionsdeque_tx_buffers_tx_buffered_byte_countr&   r!   r}   r    r	   r	   r
   r9     s   


z_AsyncTransportBase.__init__c                 C   s    t d| j| j | d dS )a  Close connection abruptly without waiting for pending I/O to
        complete. Will invoke the corresponding protocol's `connection_lost()`
        method asynchronously (not in context of the abort() call).

        :raises Exception: Exception-based exception on error
        z+Aborting transport connection: state=%s; %sN)r2   rc   rI   rE   _initiate_abortr:   r	   r	   r
   abort  s   
z_AsyncTransportBase.abortc                 C      | j S )zReturn the protocol linked to this transport.

        :rtype: pika.adapters.utils.nbio_interface.AbstractStreamProtocol
        )r   r:   r	   r	   r
   get_protocol     z _AsyncTransportBase.get_protocolc                 C   r   )ze
        :returns: Current size of output data buffered by the transport
        :rtype: int
        )r   r:   r	   r	   r
   get_write_buffer_size  r   z)_AsyncTransportBase.get_write_buffer_sizec                 C   sh   |st d| j| j td|| j| jkr#t d| j| j dS | j	| |  j
t|7  _
dS )Buffer the given data until it can be sent asynchronously.

        :param bytes data:
        :raises ValueError: if called with empty data

        z,write() called with empty data: state=%s; %sz#write() called with empty data {!r};Ignoring write() called during inactive state: state=%s; %sN)r2   r   rI   rE   rC   r   rO   rB   r   appendr   len)r&   datar	   r	   r
   _buffer_tx_data  s   z#_AsyncTransportBase._buffer_tx_datac              
   C   s   d}| j | jkrS|| jk rU| | j| j}|t|7 }|s(td| j | 	 z| j
| W n tyE } z	td|| j  d}~ww | j | jkrW|| jk sdS dS dS dS )a  Utility method for use by subclasses to ingest data from socket and
        dispatch it to protocol's `data_received()` method socket-specific
        "try again" exception, per-event data consumption limit is reached,
        transport becomes inactive, or a fatal failure.

        Consumes up to `self._MAX_CONSUME_BYTES` to prevent event starvation or
        until state becomes inactive (e.g., `protocol.data_received()` callback
        aborts the transport)

        :raises: Whatever the corresponding `sock.recv()` raises except the
                 socket error with errno.EINTR
        :raises: Whatever the `protocol.data_received()` callback raises
        :raises _AsyncTransportBase.RxEndOfFile: upon shutdown of input stream

        r   zSocket EOF; %sz-protocol.data_received() failed: error=%r; %sN)rI   rO   _MAX_CONSUME_BYTES_sigint_safe_recvrE   _MAX_RECV_BYTESr   r2   r   r   r   data_receivedr1   r^   )r&   bytes_consumedr   r   r	   r	   r
   _consume  s.   
z_AsyncTransportBase._consumec                 C   s   | j rE| | j| j d }| j  }|t|k r+td|t| | j ||d  |  j|8  _| jdks@J d| j| j	f| j sdS dS )a  Utility method for use by subclasses to emit data from tx_buffers.
        This method sends chunks from `tx_buffers` until all chunks are
        exhausted or sending is interrupted by an exception. Maintains integrity
        of `self.tx_buffers`.

        :raises: whatever the corresponding `sock.send()` raises except the
                 socket error with errno.EINTR

        r   z/Partial send, requeing remaining data; %s of %sNz7_AsyncTransportBase._produce() tx buffer size underflow)
r   _sigint_safe_sendrE   popleftr   r2   rB   
appendleftr   rI   )r&   num_bytes_sentchunkr	   r	   r
   _produce(  s    

z_AsyncTransportBase._producec                 C   
   |  |S )am  Receive data from socket, retrying on SIGINT.

        :param sock: stream or SSL socket
        :param max_bytes: maximum number of bytes to receive
        :returns: received data or empty bytes uppon end of file
        :rtype: bytes
        :raises: whatever the corresponding `sock.recv()` raises except socket
                 error with errno.EINTR

        )recv)r!   	max_bytesr	   r	   r
   r   A     
z%_AsyncTransportBase._sigint_safe_recvc                 C   r   )a@  Send data to socket, retrying on SIGINT.

        :param sock: stream or SSL socket
        :param data: data bytes to send
        :returns: number of bytes actually sent
        :rtype: int
        :raises: whatever the corresponding `sock.send()` raises except socket
                 error with errno.EINTR

        )send)r!   r   r	   r	   r
   r   P  r   z%_AsyncTransportBase._sigint_safe_sendc                 C   sT   | j | jkr(td| j | j | j| j  | j| j  | j	
  dS dS )z2Unregister the transport from I/O events

        z$Deactivating transport: state=%s; %sN)rI   rO   r2   rc   rE   rD   rp   rM   rL   r   clearr:   r	   r	   r
   _deactivate_  s   
z_AsyncTransportBase._deactivatec                 C   st   | j | jkr8td| j | j z	| jtj W n tj	j
y#   Y nw | j  d| _d| _d| _| j| _ dS dS )z{Close the transport's socket and unlink the transport it from
        references to other assets (protocol, etc.)

        z4Closing transport socket and unlinking: state=%s; %sN)rI   rV   r2   rc   rE   shutdownr?   	SHUT_RDWRr   r   r   r3   r   rD   r:   r	   r	   r
   _close_and_finalizek  s   
z'_AsyncTransportBase._close_and_finalizec                 C   s   t d| j|| j | j| jksJ d| jf| j| jkrdS |   |du r9| j| jkr4t d dS | j| _n| j| jkrN| j| jksLJ d| jfdS | j	| _| j
t| j| dS )a  Initiate asynchronous abort of the transport that concludes with a
        call to the protocol's `connection_lost()` method. No flushing of
        output buffers will take place.

        :param BaseException | None error: None if being canceled by user,
            including via falsie return value from protocol.eof_received;
            otherwise the exception corresponding to the the failed connection.
        zo_AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=%s; error=%r; %szB_AsyncTransportBase._initate_abort() expected non-_STATE_COMPLETEDNzM_AsyncTransportBase._initiate_abort(): ignoring - user-abort already pending.zD_AsyncTransportBase._initate_abort() expected _STATE_ABORTED_BY_USER)r2   rc   rI   rE   rV   r   _STATE_ABORTED_BY_USERrB   rO   _STATE_FAILEDrD   rP   r   partial_connection_lost_notify_asyncr_   r	   r	   r
   r   ~  s4   



z#_AsyncTransportBase._initiate_abortc              
   C   s   t d| j| | j| jkrdS |dur)| j| jkr)| j| jks'J d| jfdS z&z| j| W n tyH } z
t 	d||| j
  d}~ww W |   dS |   w )a  Handle aborting of transport either due to socket error or user-
        initiated `abort()` call. Must be called from an I/O loop callback owned
        by us in order to avoid reentry into user code from user's API call into
        the transport.

        :param BaseException | None error: None if being canceled by user;
            otherwise the exception corresponding to the the failed connection.
        z1Concluding transport shutdown: state=%s; error=%rNzS_AsyncTransportBase._connection_lost_notify_async() expected _STATE_ABORTED_BY_USERz/protocol.connection_lost(%r) failed: exc=%r; %s)r2   rB   rI   rV   r   r   r   connection_lostr1   r^   rE   r   )r&   r   excr	   r	   r
   r     s,   
z1_AsyncTransportBase._connection_lost_notify_asyncN)r(   r)   r*   r+   rO   r   r   rV   r   r   OSErrorr   r9   r   r   r   r   r   r   staticmethodr   r   r   rh   r   r   r   r   r	   r	   r	   r
   r     s<    	%


4r   c                       s@   e Zd ZdZ fddZdd Zedd Zedd	 Z  Z	S )
ry   z`Implementation of `nbio_interface.AbstractStreamTransport` for a
    plaintext connection.

    c                    s.   t t| ||| | j| j | j dS )a{  

        :param socket.socket sock: non-blocking connected socket
        :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
            corresponding protocol in this transport/protocol pairing; the
            protocol already had its `connection_made()` method called.
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:

        N)r   ry   r9   rD   r   rE   rM   _on_socket_readabler   r   r	   r
   r9     s   
z!_AsyncPlaintextTransport.__init__c                 C   s|   | j | jkrtd| j | j dS |sJ d|| j f|  dk}| | |r<| j| j	 | j
 td| j dS dS )r   r   Nz7_AsyncPlaintextTransport.write(): empty data from user.r   !Turned on writability watcher: %s)rI   rO   r2   rB   rE   r   r   rD   r\   rM   _on_socket_writabler&   r   tx_buffer_was_emptyr	   r	   r
   write  s   
z_AsyncPlaintextTransport.writec                 C   s  | j | jkrtd| j | j dS z|   W n | jyl   z| j }W n! t	yG } zt
d|| j | | W Y d}~Y dS d}~ww |r]td| j | j| j  Y dS td| j | d Y dS  t	tjjfy } z8t|tjjr|jtv rtd| j nt
d|| jdtjt   | | W Y d}~dS W Y d}~dS d}~ww | j | jkrtd	| j | j dS dS )
zIngest data from socket and dispatch it to protocol until exception
        occurs (typically EAGAIN or EWOULDBLOCK), per-event data consumption
        limit is reached, transport becomes inactive, or failure.

        EIgnoring readability notification due to inactive state: state=%s; %sNz,protocol.eof_received() failed: error=%r; %sz0protocol.eof_received() elected to keep open: %sz,protocol.eof_received() elected to close: %szRecv would block on %sa_AsyncBaseTransport._consume() failed, aborting connection: error=%r; sock=%s; Caller's stack:
%s z>Leaving Plaintext consumer due to inactive state: state=%s; %s)rI   rO   r2   rB   rE   r   r   r   eof_receivedr1   r^   r   rc   rD   rp   rM   r   r   r   r   r   _TRY_IO_AGAIN_SOCK_ERROR_CODESjoin	tracebackformat_exceptionsysexc_info)r&   	keep_openr   r	   r	   r
   r   	  sb   

z,_AsyncPlaintextTransport._on_socket_readablec                 C   s  | j | jkrtd| j | j dS | jsJ d| j fz|   W nH ttj	j
fyi } z8t|tj	j
rA|jtv rAtd| j ntd|| jdtjt   | | W Y d}~dS W Y d}~dS d}~ww | js| j| j  td| j dS dS )-Handle writable socket notification

        EIgnoring writability notification due to inactive state: state=%s; %sNzP_AsyncPlaintextTransport._on_socket_writable() called, but _tx_buffers is empty.zSend would block on %sa_AsyncBaseTransport._produce() failed, aborting connection: error=%r; sock=%s; Caller's stack:
%sr   z"Turned off writability watcher: %s)rI   rO   r2   rB   rE   r   r   r1   r   r   r   r   r   r   r^   r   r   r   r   r   r   rD   rL   rM   r_   r	   r	   r
   r   =  s<   

z,_AsyncPlaintextTransport._on_socket_writable)
r(   r)   r*   r+   r9   r   rh   r   r   r   r	   r	   r   r
   ry     s    
3ry   c                       s`   e Zd ZdZ fddZdd Zedd Zedd	 Ze fd
dZ	e fddZ
  ZS )rz   z\Implementation of `.nbio_interface.AbstractStreamTransport` for an SSL
    connection.

    c                    sJ   t t| ||| | j| _d| _| j| j	 | j
 | j| j
 dS )a{  

        :param ssl.SSLSocket sock: non-blocking connected socket
        :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
            corresponding protocol in this transport/protocol pairing; the
            protocol already had its `connection_made()` method called.
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:

        N)r   rz   r9   r   _ssl_readable_action_ssl_writable_actionrD   r   rE   rM   r   rP   r   r   r	   r
   r9   g  s
   
z_AsyncSSLTransport.__init__c                 C   s   | j | jkrtd| j | j dS |sJ d|| j f|  dk}| | |rE| jdu rG| j| _| j	
| j | j td| j dS dS dS )r   r   Nz1_AsyncSSLTransport.write(): empty data from user.r   r   )rI   rO   r2   rB   rE   r   r   r   r   rD   r\   rM   r   r   r	   r	   r
   r   {  s    
z_AsyncSSLTransport.writec              
   C   ~   | j | jkrtd| j | j dS | jr4z|   W dS  ty3 } z| | W Y d}~dS d}~ww td| j| j dS )z+Handle readable socket indication

        r   Nz>SSL readable action was suppressed: ssl_writable_action=%r; %s)	rI   rO   r2   rB   rE   r   r1   r   r   r_   r	   r	   r
   r     $   z&_AsyncSSLTransport._on_socket_readablec              
   C   r   )r   r   Nz>SSL writable action was suppressed: ssl_readable_action=%r; %s)	rI   rO   r2   rB   rE   r   r1   r   r   r_   r	   r	   r
   r     r   z&_AsyncSSLTransport._on_socket_writablec                    s  d}z	t t|   W nC tjyN } z6|jtjkr"td| j	 n"|jtj
kr2td| j	 d}ntd|| j	dtjt    W Y d}~nd}~ww | j| jkr`td| j| j	 dS | j| j |r| jsw| j| j	 | j | j| _| j| jkr| j| j	  d| _n!| js| j| j	 | j | j| _| jr| j| j	  d| _| jr| js| j| _| j| j	 | j dS dS dS )	a  [override] Ingest data from socket and dispatch it to protocol until
        exception occurs (typically ssl.SSLError with
        SSL_ERROR_WANT_READ/WRITE), per-event data consumption limit is reached,
        transport becomes inactive, or failure.

        Update consumer/producer registration.

        :raises Exception: error that signals that connection needs to be
            aborted
        TzSSL ingester wants read: %szSSL ingester wants write: %sFr   r   Nz8Leaving SSL consumer due to inactive state: state=%s; %s) r   rz   r   ri   r   r   r   r2   rB   rE   r   r^   r   r   r   r   r   rI   rO   rD   rP   r   r   r   rM   r   rL   r\   r   rp   r   r   )r&   next_consume_on_readabler   r   r	   r
   r     s^   
z_AsyncSSLTransport._consumec                    sB  d}z	t t|   W nE tjyP } z8|jtjkr$td| j	 d}n"|jtj
kr4td| j	 d}ntd|| j	dtjt    W Y d}~nd}~ww | jr]J dt| jf| jr|duskJ d	| jf|r| js{| j| j	 | j | j| _| j| jkr| j| j	  d| _nb| js| j| j	 | j | j| _| jr| j| j	  d| _n@| j| jkr| j| j	  d| _| j| jksJ d
| jfn | j| jksJ dd| jd| jd| jfd| _| j| j	  | js| j| _| j| j	 | j | j| j dS | j	  r| j| j dS dS )aI  [override] Emit data from tx_buffers all chunks are exhausted or
        sending is interrupted by an exception (typically ssl.SSLError with
        SSL_ERROR_WANT_READ/WRITE).

        Update consumer/producer registration.

        :raises Exception: error that signals that connection needs to be
            aborted

        NzSSL emitter wants read: %sFzSSL emitter wants write: %sTr   r   z__AsyncSSLTransport._produce(): no exception from parent class, but data remains in _tx_buffers.zE_AsyncSSLTransport._produce(): next_produce_on_writable is still Nonezr_AsyncSSLTransport._produce(): with empty tx_buffers, writable_action cannot be _produce when readable is _producez_AsyncSSLTransport._produce(): with empty tx_buffers, expected writable_action as _produce when readable_action is not _producezwritable_action:zreadable_action:zstate:)!r   rz   r   ri   r   r   r   r2   rB   rE   r   r^   r   r   r   r   r   r   r   rI   r   rD   r\   rM   r   r   rp   r   r   rL   r   rP   pending)r&   next_produce_on_writabler   r   r	   r
   r     s   


z_AsyncSSLTransport._produce)r(   r)   r*   r+   r9   r   rh   r   r   r   r   r   r	   r	   r   r
   rz   a  s    

Hrz   )(r+   r   r   r   loggingr   rd   r?   ri   r   r   "pika.adapters.utils.nbio_interfacer   r   pika.compatr   pika.diagnostic_utilsEAGAINEWOULDBLOCKr   EINPROGRESSr[   	getLoggerr(   r2   diagnostic_utilscreate_log_exception_decoratorrh   r   r   r   objectr   r,   r5   r$   r0   r   ry   rz   r	   r	   r	   r
   <module>   sR    	
) :  
F  1 