o
    j                     @   s4  d dl Z d dlZd dlZd dlZd dlmZ d dlZeddZeddZ	eddZ
ed	d
ZedZeddG dd dZdejfddZdddejdedB dejjfddZdejjdedejjfddZdedefddZdedefdd Zdejjded!eddfd"d#Zd$ejdefd%d&ZdS )'    N)	dataclassRABBITMQ_URLz"amqp://guest:guest@127.0.0.1:5672/AI_UTTERANCES_QUEUEzai.utterancesTTS_QUEUE_PREFIXztts.CONTROL_QUEUE_PREFIXzmcube.control.zmcube.mqT)frozenc                   @   s$   e Zd ZU eed< defddZdS )McubeMessagebodyreturnc                 C   s   t | jdS )Nutf-8)jsondumpsr	   encode)self r   M/var/www/html/livekitdocker/backend/agent_runtime/src/mcube_integration/mq.pyto_bytes   s   zMcubeMessage.to_bytesN)__name__
__module____qualname__dict__annotations__bytesr   r   r   r   r   r      s   
 r   r
   c               
      s   t tdd} t tdd}d}	 z	ttI dH W S  tyO } z&|d7 }t|| d	t|d
  }t	d||| t
|I dH  W Y d}~nd}~ww q)a8  
    Connect to RabbitMQ with retries.

    Local dev can start consumers before RabbitMQ is fully ready; in that case aio-pika
    may raise (e.g. "Server connection unexpectedly closed") and crash the whole process.
    Keeping a retry loop here prevents websocket disconnects caused by consumer shutdown.
    RABBITMQ_CONNECT_RETRY_BASE_Sz1.0RABBITMQ_CONNECT_RETRY_MAX_Sz10.0r   TN         z5RabbitMQ connect failed (attempt=%s delay_s=%.1f): %s)floatosgetenvaio_pikaconnect_robustr   	Exceptionminlogwarningasynciosleep)base_smax_sattemptedelayr   r   r   connect   s(   r.   prefetch_count
connectionr0   c                   sL   |   I d H }|d urt|nttdd}|jtd|dI d H  |S )NRABBITMQ_PREFETCH10r   r/   )channelintr   r    set_qosmax)r1   r0   r4   prefetchr   r   r   get_channel7   s   r9   r4   
queue_namec                    s   | j |ddI d H S )NT)durable)declare_queue)r4   r:   r   r   r   declare_durable_queueF   s   r=   call_idc                 C      t  |  S N)r   r>   r   r   r   tts_queue_nameL      rB   c                 C   r?   r@   )r   rA   r   r   r   control_queue_nameQ   rC   rD   payloadc                    sD   | j }t|d}tj|tjjdd}|j||dI d H  d S )Nr   zapplication/json)r	   delivery_modecontent_type)routing_key)	default_exchanger   r   r   r!   MessageDeliveryMode
PERSISTENTpublish)r4   r:   rE   exchanger	   messager   r   r   publish_jsonV   s   rP   msgc                    s<   | j jddd}zt|W S  tjy   d|i Y S w )Nr   replace)errors_raw)r	   decoder   loadsJSONDecodeError)rQ   rawr   r   r   safe_get_message_texte   s   rY   )r'   loggingr   r   dataclassesr   r!   r    r   r   r   r   	getLoggerr%   r   RobustConnectionr.   r5   abcAbstractChannelr9   strAbstractQueuer=   rB   rD   r   rP   IncomingMessagerY   r   r   r   r   <module>   sV    



