o
    ei$                     @   s   d Z ddlZddlZddlZddlmZ ddlmZmZ e	e
Zejdkr-ee  G dd dejZG dd	 d	ejejejejZG d
d dejZG dd dejZdS )z#Use pika with the Asyncio EventLoop    N)base_connection)nbio_interfaceio_services_utilswin32c                       s@   e Zd ZdZ						d fdd	Ze		d	ddZ  ZS )
AsyncioConnectionz; The AsyncioConnection runs on the Asyncio EventLoop.

    NTc                    s6   t |tjr	|}nt|}t j||||||d dS )a   Create a new instance of the AsyncioConnection class, connecting
        to RabbitMQ automatically

        :param pika.connection.Parameters parameters: Connection parameters
        :param callable on_open_callback: The method to call when the connection
            is open
        :param None | method on_open_error_callback: Called if the connection
            can't be established or connection establishment is interrupted by
            `Connection.close()`: on_open_error_callback(Connection, exception).
        :param None | method on_close_callback: Called when a previously fully
            open connection is closed:
            `on_close_callback(Connection, exception)`, where `exception` is
            either an instance of `exceptions.ConnectionClosed` if closed by
            user or broker or exception of another type that describes the cause
            of connection failure.
        :param None | asyncio.AbstractEventLoop |
            nbio_interface.AbstractIOServices custom_ioloop:
                Defaults to asyncio.get_event_loop().
        :param bool internal_connection_workflow: True for autonomous connection
            establishment which is default; False for externally-managed
            connection workflow via the `create_connection()` factory.

        )internal_connection_workflowN)
isinstancer   AbstractIOServices_AsyncioIOServicesAdaptersuper__init__)self
parameterson_open_callbackon_open_error_callbackon_close_callbackcustom_ioloopr   nbio	__class__ _/var/www/html/pca-backend/venv/lib/python3.10/site-packages/pika/adapters/asyncio_connection.pyr      s   
zAsyncioConnection.__init__c                    s*   t | fdd} j||||dS )z`Implement
        :py:classmethod::`pika.adapters.BaseConnection.create_connection()`.

        c                    s   | du rt d | ddS )zConnection factory.NzIExpected pika.connection.Parameters instance, but got None in params arg.F)r   r   r   )
ValueError)paramsclsr   r   r   connection_factoryM   s   z?AsyncioConnection.create_connection.<locals>.connection_factory)connection_configsr   r   workflowon_done)r
   _start_connection_workflow)r   r   r   r   r   r   r   r   r   create_connectionA   s   

z#AsyncioConnection.create_connection)NNNNNT)NN)__name__
__module____qualname____doc__r   classmethodr!   __classcell__r   r   r   r   r      s    +r   c                   @   s   e Zd ZdZdddZdd Zdd Zd	d
 Zdd Zdd Z	dd Z
				dddZdd Zdd Zdd Zdd Zdd ZdS ) r
   zImplements
    :py:class:`.utils.nbio_interface.AbstractIOServices` interface
    on top of `asyncio`.

    NOTE:
    :py:class:`.utils.nbio_interface.AbstractFileDescriptorServices`
    interface is only required by the mixins.

    Nc                 C   s   |pt  | _dS )z{
        :param asyncio.AbstractEventLoop | None loop: If None, gets default
            event loop from asyncio.

        N)asyncioget_event_loop_loop)r   loopr   r   r   r   m   s   z"_AsyncioIOServicesAdapter.__init__c                 C   s   | j S )zdImplement
        :py:meth:`.utils.nbio_interface.AbstractIOServices.get_native_ioloop()`.

        )r*   r   r   r   r   get_native_ioloopu   s   z+_AsyncioIOServicesAdapter.get_native_ioloopc                 C      | j   dS )zXImplement
        :py:meth:`.utils.nbio_interface.AbstractIOServices.close()`.

        N)r*   closer,   r   r   r   r/   |   s   z_AsyncioIOServicesAdapter.closec                 C   r.   )zNImplement :py:meth:`.utils.nbio_interface.AbstractIOServices.run()`.

        N)r*   run_foreverr,   r   r   r   run      z_AsyncioIOServicesAdapter.runc                 C   r.   )zOImplement :py:meth:`.utils.nbio_interface.AbstractIOServices.stop()`.

        N)r*   stopr,   r   r   r   r3      r2   z_AsyncioIOServicesAdapter.stopc                 C   s   | j | dS )zjImplement
        :py:meth:`.utils.nbio_interface.AbstractIOServices.add_callback_threadsafe()`.

        N)r*   call_soon_threadsafe)r   callbackr   r   r   add_callback_threadsafe   s   z1_AsyncioIOServicesAdapter.add_callback_threadsafec                 C   s   t | j||S )z]Implement
        :py:meth:`.utils.nbio_interface.AbstractIOServices.call_later()`.

        )_TimerHandler*   
call_later)r   delayr5   r   r   r   r8      s   z$_AsyncioIOServicesAdapter.call_laterr   c              
   C   s    |  | jj||||||d|S )z^Implement
        :py:meth:`.utils.nbio_interface.AbstractIOServices.getaddrinfo()`.

        )familytypeprotoflags)_schedule_and_wrap_in_io_refr*   getaddrinfo)r   hostportr   r:   socktyper<   r=   r   r   r   r?      s   z%_AsyncioIOServicesAdapter.getaddrinfoc                 C      | j || td| dS )ziImplement
        :py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.set_reader()`.

        zset_reader(%s, _)N)r*   
add_readerLOGGERdebug)r   fdon_readabler   r   r   
set_reader      z$_AsyncioIOServicesAdapter.set_readerc                 C      t d| | j|S )zlImplement
        :py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.remove_reader()`.

        zremove_reader(%s))rE   rF   r*   remove_readerr   rG   r   r   r   rL         z'_AsyncioIOServicesAdapter.remove_readerc                 C   rC   )ziImplement
        :py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.set_writer()`.

        zset_writer(%s, _)N)r*   
add_writerrE   rF   )r   rG   on_writabler   r   r   
set_writer   rJ   z$_AsyncioIOServicesAdapter.set_writerc                 C   rK   )zlImplement
        :py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.remove_writer()`.

        zremove_writer(%s))rE   rF   r*   remove_writerrM   r   r   r   rR      rN   z'_AsyncioIOServicesAdapter.remove_writerc                 C   s,   t |std|ttj|| jd|S )a  Schedule the coroutine to run and return _AsyncioIOReference

        :param coroutine-obj coro:
        :param callable on_done: user callback that takes the completion result
            or exception as its only arg. It will not be called if the operation
            was cancelled.
        :rtype: _AsyncioIOReference which is derived from
            nbio_interface.AbstractIOReference

        *on_done arg must be callable, but got {!r})r+   )callable	TypeErrorformat_AsyncioIOReferencer(   ensure_futurer*   )r   coror   r   r   r   r>      s   z6_AsyncioIOServicesAdapter._schedule_and_wrap_in_io_refN)r   r   r   r   )r"   r#   r$   r%   r   r-   r/   r1   r3   r6   r8   r?   rI   rL   rQ   rR   r>   r   r   r   r   r
   _   s&    


r
   c                   @       e Zd ZdZdd Zdd ZdS )r7   zJThis module's adaptation of `nbio_interface.AbstractTimerReference`.

    c                 C   s
   || _ dS )z0

        :param asyncio.Handle handle:
        N)_handle)r   handler   r   r   r      s   
z_TimerHandle.__init__c                 C   s"   | j d ur| j   d | _ d S d S rZ   )r\   cancelr,   r   r   r   r^      s   


z_TimerHandle.cancelNr"   r#   r$   r%   r   r^   r   r   r   r   r7      s    r7   c                   @   r[   )rW   zGThis module's adaptation of `nbio_interface.AbstractIOReference`.

    c                    s6   t  std || _ fdd}|| dS )z
        :param asyncio.Future future:
        :param callable on_done: user callback that takes the completion result
            or exception as its only arg. It will not be called if the operation
            was cancelled.

        rS   c                    s$   |   s |  p|   dS dS )z3Handle completion callback from the future instanceN)	cancelled	exceptionresult)futurer   r   r   on_done_adapter
  s   z5_AsyncioIOReference.__init__.<locals>.on_done_adapterN)rT   rU   rV   _futureadd_done_callback)r   rc   r   re   r   rd   r   r      s   z_AsyncioIOReference.__init__c                 C   s
   | j  S )zCancel pending operation

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        )rf   r^   r,   r   r   r   r^     s   
z_AsyncioIOReference.cancelNr_   r   r   r   r   rW      s    rW   )r%   r(   loggingsyspika.adaptersr   pika.adapters.utilsr   r   	getLoggerr"   rE   platformset_event_loop_policyWindowsSelectorEventLoopPolicyBaseConnectionr   SocketConnectionMixinStreamingConnectionMixinr	   AbstractFileDescriptorServicesr
   AbstractTimerReferencer7   AbstractIOReferencerW   r   r   r   r   <module>   s$    

N 