o
    1li_A                     @   s  d Z ddlZddlZddlZddlZddlZzddlZW n ey)   ddlZY nw ddl	Z	ddl
Z	ddlZ	ddlZddlmZ ddlmZ ddlmZmZ ddlmZmZ eeZG dd deZG d	d
 d
eZG dd deZG dd deZG dd deZ G dd deZ!dS )z Use pika with the Gevent IOLoop.    N)BaseConnection)check_callback_arg)AbstractIOReferenceAbstractIOServices)AbstractSelectorIOLoopSelectorIOServicesAdapterc                       s@   e Zd ZdZ						d fdd	Ze		d	ddZ  ZS )
GeventConnectionzwImplementation of pika's ``BaseConnection``.

    An async selector-based connection which integrates with Gevent.
    NTc                    sX   t jjrtd|ptt }t|tr|}nt	|}t
t| j||||||d dS )a  Create a new GeventConnection instance and connect to RabbitMQ on
        Gevent's event-loop.

        :param pika.connection.Parameters|None parameters: The connection
            parameters
        :param callable|None on_open_callback: The method to call when the
            connection is open
        :param callable|None on_open_error_callback: Called if the connection
            can't be established or connection establishment is interrupted by
            `Connection.close()`:
            on_open_error_callback(Connection, exception)
        :param callable|None on_close_callback: Called when a previously fully
            open connection is closed:
            `on_close_callback(Connection, exception)`, where `exception` is
            either an instance of `exceptions.ConnectionClosed` if closed by
            user or broker or exception of another type that describes the
            cause of connection failure
        :param gevent._interfaces.ILoop|nbio_interface.AbstractIOServices|None
            custom_ioloop: Use a custom Gevent ILoop.
        :param bool internal_connection_workflow: True for autonomous connection
            establishment which is default; False for externally-managed
            connection workflow via the `create_connection()` factory
        z-GeventConnection is not supported on Windows.)internal_connection_workflowN)pikacompat
ON_WINDOWSRuntimeError_GeventSelectorIOLoopgeventget_hub
isinstancer    _GeventSelectorIOServicesAdaptersuperr   __init__)self
parameterson_open_callbackon_open_error_callbackon_close_callbackcustom_ioloopr	   nbio	__class__ o/var/www/html/tatsat2dev/dashboard-backend/venv/lib/python3.10/site-packages/pika/adapters/gevent_connection.pyr   '   s    



zGeventConnection.__init__c                    s:   |pt t }t| fdd} j||||dS )z_Implement
        :py:classmethod::`pika.adapters.BaseConnection.create_connection()`.
        c                    s   | du rt d | ddS )zConnection factory.NzIExpected pika.connection.Parameters instance, but got None in params arg.F)r   r   r	   )
ValueError)paramsclsr   r   r   connection_factoryf   s   z>GeventConnection.create_connection.<locals>.connection_factory)connection_configsr$   r   workflowon_done)r   r   r   r   _start_connection_workflow)r#   r%   r'   r   r&   r$   r   r"   r   create_connectionX   s   	
	z"GeventConnection.create_connection)NNNNNT)NN)__name__
__module____qualname____doc__r   classmethodr)   __classcell__r   r   r   r   r   !   s    1r   c                   @   s4   e Zd ZdZdd Zedd Zdd Zdd	 Zd
S )_TSafeCallbackQueueziDispatch callbacks from any thread to be executed in the main thread
    efficiently with IO events.
    c                 C   s(   t  | _t \| _| _t | _	dS )zQ
        :param _GeventSelectorIOLoop loop: IO loop to add callbacks to.
        N)
queueQueue_queueospipe_read_fd	_write_fd	threadingRLock_write_lockr   r   r   r   r   |   s   
z_TSafeCallbackQueue.__init__c                 C   s   | j S )z?The file-descriptor to register for READ events in the IO loop.)r6   r;   r   r   r   fd   s   z_TSafeCallbackQueue.fdc                 C   sF   | j | | j t| jd W d   dS 1 sw   Y  dS )zAdd an item to the queue from any thread. The configured handler
        will be invoked with the item in the main thread.

        :param item: Object to add to the queue.
           N)r3   putr:   r4   writer7   r   callbackr   r   r   add_callback_threadsafe   s   "z+_TSafeCallbackQueue.add_callback_threadsafec                 C   sH   z| j  }W n tjy   td Y dS w t| jd |  dS )a  Invoke the next callback from the queue.

        MUST run in the main thread. If no callback was added to the queue,
        this will block the IO loop.

        Performs a blocking READ on the pipe so must only be called when the
        pipe is ready for reading.
        zCallback queue was empty.   N)	r3   
get_nowaitr1   EmptyLOGGERwarningr4   readr6   r@   r   r   r   run_next_callback   s   	
z%_TSafeCallbackQueue.run_next_callbackN)	r*   r+   r,   r-   r   propertyr<   rB   rI   r   r   r   r   r0   w   s    
r0   c                   @   sn   e Zd ZdZdZdZdZdddZdd	 Zd
d Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd ZdS )r   zImplementation of `AbstractSelectorIOLoop` using the Gevent event loop.

    Required by implementations of `SelectorIOServicesAdapter`.
    rC      r   Nc                    sL   |pt   _i  _t j  _t  _ fdd} 	 jj
| j dS )z>
        :param gevent._interfaces.ILoop gevent_loop:
        c                    s   ~ ~ j   dS )z$Swallow the fd and events arguments.N)_callback_queuerI   )r<   eventsr;   r   r   run_callback_in_main_thread   s   zC_GeventSelectorIOLoop.__init__.<locals>.run_callback_in_main_threadN)r   r   _hub_io_watchers_by_fdhubWaiter_waiterr0   rL   add_handlerr<   READ)r   
gevent_hubrN   r   r;   r   r      s   z_GeventSelectorIOLoop.__init__c                 C   s   | j j  d| _ dS )zRelease the loop's resources.N)rO   loopdestroyr;   r   r   r   close   s   
z_GeventSelectorIOLoop.closec                 C   s,   t d | j  t d | j  dS )zNRun the I/O loop. It will loop until requested to exit. See `stop()`.
        z"Passing control to Gevent's IOLoopz,Control was passed back from Gevent's IOLoopN)rF   debugrS   getclearr;   r   r   r   start   s   


z_GeventSelectorIOLoop.startc                 C   s   | j d dS )a$  Request exit from the ioloop. The loop is NOT guaranteed to
        stop before this method returns.

        To invoke `stop()` safely from a thread other than this IOLoop's thread,
        call it via `add_callback_threadsafe`; e.g.,

            `ioloop.add_callback(ioloop.stop)`
        N)rS   switchr;   r   r   r   stop   s   	z_GeventSelectorIOLoop.stopc                 C   sV   t  | jkrtd | jj| dS td t| jjj|}| j	
| dS )a  Requests a call to the given function as soon as possible in the
        context of this IOLoop's thread.

        NOTE: This is the only thread-safe method in IOLoop. All other
        manipulations of IOLoop must be performed from the IOLoop's thread.

        For example, a thread may request a call to the `stop` method of an
        ioloop that is running in a different thread via
        `ioloop.add_callback_threadsafe(ioloop.stop)`

        :param callable callback: The callback method
        z Adding callback from main threadz#Adding callback from another threadN)r   r   rO   rF   rZ   rW   run_callback	functoolspartialrL   rB   r@   r   r   r   add_callback   s   

z"_GeventSelectorIOLoop.add_callbackc                 C   s   | j j|}|| |S )a  Add the callback to the IOLoop timer to be called after delay seconds
        from the time of call on best-effort basis. Returns a handle to the
        timeout.

        :param float delay: The number of seconds to wait to call callback
        :param callable callback: The callback method
        :returns: handle to the created timeout that may be passed to
            `remove_timeout()`
        :rtype: object
        )rO   rW   timerr]   )r   delayrA   rd   r   r   r   
call_later   s   
z _GeventSelectorIOLoop.call_laterc                 C   s   |   dS )zURemove a timeout

        :param timeout_handle: Handle of timeout to remove
        N)rY   )r   timeout_handler   r   r   remove_timeout  s   z$_GeventSelectorIOLoop.remove_timeoutc                 C   s,   | j j||}|| j|< |||| dS )a  Start watching the given file descriptor for events

        :param int fd: The file descriptor
        :param callable handler: When requested event(s) occur,
            `handler(fd, events)` will be called.
        :param int events: The event mask (READ|WRITE)
        N)rO   rW   iorP   r]   )r   r<   handlerrM   
io_watcherr   r   r   rT     s   
z!_GeventSelectorIOLoop.add_handlerc                 C   s2   | j | }|j}|  | j |= | ||| dS )zChange the events being watched for.

        :param int fd: The file descriptor
        :param int events: The new event mask (READ|WRITE)
        N)rP   rA   rY   rT   )r   r<   rM   rk   rA   r   r   r   update_handler!  s
   
z$_GeventSelectorIOLoop.update_handlerc                 C   s   | j | }|  | j |= dS )zgStop watching the given file descriptor for events

        :param int fd: The file descriptor
        N)rP   rY   )r   r<   rk   r   r   r   remove_handler/  s   
z$_GeventSelectorIOLoop.remove_handler)N)r*   r+   r,   r-   rU   WRITEERRORr   rY   r]   r_   rc   rf   rh   rT   rl   rm   r   r   r   r   r      s    
	r   c                   @   s"   e Zd ZdZ				dddZdS )r   zESelectorIOServicesAdapter implementation using Gevent's DNS resolver.r   c           	   
   C   s*   t | j|||||||d}|  t|S )zOImplement :py:meth:`.nbio_interface.AbstractIOServices.getaddrinfo()`.
        )native_loophostportfamilysocktypeprotoflagsr'   )_GeventAddressResolver_loopr]   _GeventIOLoopIOHandle)	r   rq   rr   r'   rs   rt   ru   rv   resolverr   r   r   getaddrinfo<  s   
z,_GeventSelectorIOServicesAdapter.getaddrinfoN)r   r   r   r   )r*   r+   r,   r-   r{   r   r   r   r   r   9  s    r   c                   @   s    e Zd ZdZdd Zdd ZdS )ry   zXImplement `AbstractIOReference`.

    Only used to wrap the _GeventAddressResolver.
    c                 C   s   |j | _dS )zY
        :param subject: subject of the reference containing a `cancel()` method
        N)cancel_cancel)r   subjectr   r   r   r   Y  s   z_GeventIOLoopIOHandle.__init__c                 C   s   |   S )zCancel pending operation

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool
        )r}   r;   r   r   r   r|   _  s   z_GeventIOLoopIOHandle.cancelN)r*   r+   r,   r-   r   r|   r   r   r   r   ry   S  s    ry   c                   @   sL   e Zd ZdZdZdd Zdd Zdd Zd	d
 Zdd Z	dd Z
dd ZdS )rw   zPerforms getaddrinfo asynchronously Gevent's configured resolver in a
    separate greenlet and invoking the provided callback with the result.

    See: http://www.gevent.org/dns.html
    )	rx   _on_done	_greenlet_ga_host_ga_port
_ga_family_ga_socktype	_ga_proto	_ga_flagsc	           	      C   sD   t |d || _|| _d| _|| _|| _|| _|| _|| _|| _	dS )a  Initialize the `_GeventAddressResolver`.

        :param AbstractSelectorIOLoop native_loop:
        :param host: `see socket.getaddrinfo()`
        :param port: `see socket.getaddrinfo()`
        :param family: `see socket.getaddrinfo()`
        :param socktype: `see socket.getaddrinfo()`
        :param proto: `see socket.getaddrinfo()`
        :param flags: `see socket.getaddrinfo()`
        :param on_done: on_done(records|BaseException) callback for reporting
            result from the given I/O loop. The single arg will be either an
            exception object (check for `BaseException`) in case of failure or
            the result returned by `socket.getaddrinfo()`.
        r'   N)
r   rx   r   r   r   r   r   r   r   r   )	r   rp   rq   rr   rs   rt   ru   rv   r'   r   r   r   r   z  s   

z_GeventAddressResolver.__init__c                 C   s*   | j du rt| j| _ dS td dS )z-Start an asynchronous getaddrinfo invocation.Nz&_GeventAddressResolver already started)r   r   	spawn_raw_resolverF   rG   r;   r   r   r   r]     s   
z_GeventAddressResolver.startc                 C   s&   d}| j durd}|   |   |S )zCancel the pending resolver.FNT)r   _stop_greenlet_cleanup)r   changedr   r   r   r|     s   
z_GeventAddressResolver.cancelc                 C   s   |    d| _d| _dS )z,Stop the resolver and release any resources.N)r   rx   r   r;   r   r   r   r     s   
z_GeventAddressResolver._cleanupc                 C   s$   | j durt| j  d| _ dS dS )zbStop the greenlet performing getaddrinfo if running.

        Otherwise, this is a no-op.
        N)r   r   killr;   r   r   r   r     s   

z%_GeventAddressResolver._stop_greenletc              
   C   sz   zt j| j| j| j| j| j| j}W n t	y- } zt
d| |}W Y d}~nd}~ww t| j|}| j| dS )zoCall `getaddrinfo()` and return result via user's callback
        function on the configured IO loop.
        zAddress resolution failed: %rN)r   socketr{   r   r   r   r   r   r   	ExceptionrF   errorra   rb   _dispatch_callbackrx   rc   )r   resultexcrA   r   r   r   r     s   z_GeventAddressResolver._resolvec                 C   s2   zt d| j | | W |   dS |   w )zInvoke the configured completion callback and any subsequent cleanup.

        :param result: result from getaddrinfo, or the exception if raised.
        z9Invoking async getaddrinfo() completion callback; host=%rN)rF   rZ   r   r   r   )r   r   r   r   r   r     s   z)_GeventAddressResolver._dispatch_callbackN)r*   r+   r,   r-   	__slots__r   r]   r|   r   r   r   r   r   r   r   r   rw   h  s    	rw   )"r-   ra   loggingr4   r8   weakrefr1   ImportErrorr2   r   
gevent.hubgevent.socketpika.compatr
   pika.adapters.base_connectionr   %pika.adapters.utils.io_services_utilsr   "pika.adapters.utils.nbio_interfacer   r   +pika.adapters.utils.selector_ioloop_adapterr   r   	getLoggerr*   rF   r   objectr0   r   r   ry   rw   r   r   r   r   <module>   s6    
V5 