o
    fi                      @   s&  d dl Z d dlZd dlZd dlZd dlmZ d dlZeddZeddZ	eddZ
ed	d
ZedZeddG dd dZdejfddZdejdejjfddZdejjdedejjfddZdedefddZdedefddZdejjdededdfd d!Zd"ejdefd#d$ZdS )%    N)	dataclassRABBITMQ_URLz"amqp://guest:guest@127.0.0.1:5672/AI_UTTERANCES_QUEUEzai.utterancesTTS_QUEUE_PREFIXztts.CONTROL_QUEUE_PREFIXzmcube.control.zmcube.mqT)frozenc                   @   s$   e Zd ZU eed< defddZdS )McubeMessagebodyreturnc                 C   s   t | jdS )Nutf-8)jsondumpsr	   encode)self r   M/var/www/html/livekitdocker/backend/agent_runtime/src/mcube_integration/mq.pyto_bytes   s   zMcubeMessage.to_bytesN)__name__
__module____qualname__dict__annotations__bytesr   r   r   r   r   r      s   
 r   r
   c               
      s   t tdd} t tdd}d}	 z	ttI dH W S  tyO } z&|d7 }t|| d	t|d
  }t	d||| t
|I dH  W Y d}~nd}~ww q)a8  
    Connect to RabbitMQ with retries.

    Local dev can start consumers before RabbitMQ is fully ready; in that case aio-pika
    may raise (e.g. "Server connection unexpectedly closed") and crash the whole process.
    Keeping a retry loop here prevents websocket disconnects caused by consumer shutdown.
    RABBITMQ_CONNECT_RETRY_BASE_Sz1.0RABBITMQ_CONNECT_RETRY_MAX_Sz10.0r   TN         z5RabbitMQ connect failed (attempt=%s delay_s=%.1f): %s)floatosgetenvaio_pikaconnect_robustr   	Exceptionminlogwarningasynciosleep)base_smax_sattemptedelayr   r   r   connect   s(   r.   
connectionc                    s2   |   I d H }|jttdddI d H  |S )NRABBITMQ_PREFETCH10)prefetch_count)channelset_qosintr   r    )r/   r3   r   r   r   get_channel7   s   r6   r3   
queue_namec                    s   | j |ddI d H S )NT)durable)declare_queue)r3   r7   r   r   r   declare_durable_queue=   s   r:   call_idc                 C      t  |  S N)r   r;   r   r   r   tts_queue_nameC      r?   c                 C   r<   r=   )r   r>   r   r   r   control_queue_nameH   r@   rA   payloadc                    sD   | j }t|d}tj|tjjdd}|j||dI d H  d S )Nr   zapplication/json)r	   delivery_modecontent_type)routing_key)	default_exchanger   r   r   r!   MessageDeliveryMode
PERSISTENTpublish)r3   r7   rB   exchanger	   messager   r   r   publish_jsonM   s   rM   msgc                    s<   | j jddd}zt|W S  tjy   d|i Y S w )Nr   replace)errors_raw)r	   decoder   loadsJSONDecodeError)rN   rawr   r   r   safe_get_message_text\   s   rV   )r'   loggingr   r   dataclassesr   r!   r    r   r   r   r   	getLoggerr%   r   RobustConnectionr.   abcAbstractChannelr6   strAbstractQueuer:   r?   rA   r   rM   IncomingMessagerV   r   r   r   r   <module>   sH    


