o
    i=                  	   @   s  d Z ddlZddlZddlZddlZddlZddlZddlZddlmZm	Z	 ddl
mZmZmZmZmZ ddlZddlZddlmZ ddlmZ ejejeZejde ee e  ddlmZ ddlm Z  dd	l!m"Z" dd
l#m$Z$ ej%ej&de'ej(ede) gd e*dZ+da,dd Z-eej.e- eej/e- G dd dZ0G dd de0Z1G dd de0Z2dd Z3e4dkre3  dS dS )a  
Unified Call Processing Pipeline.

This module combines the polling-based sync from call_processor.py 
and the orchestration logic from orchestrate_pipeline.py into a single, 
RabbitMQ-powered system.

Modes:
  --mode orchestrator : Scans for new calls, syncs them, and queues tasks in RabbitMQ.
  --mode worker       : Consumes tasks from RabbitMQ and performs Transcription/Analysis.

Usage:
  python3 unified_pipeline.py --mode orchestrator --bid 1713
  python3 unified_pipeline.py --mode worker
    N)datetime	timedelta)AnyDictListOptionalSet)
DictCursor)load_dotenv)Config)DatabaseHandler)get_stt_provider)AgentRunneru4   %(asctime)s [%(levelname)s] %(name)s — %(message)szunified_pipeline.log)levelformathandlersunified_pipelineFc                 C   s   t d dad S )Nz4Shutdown signal received - finishing current task...T)loggerinfo	_shutdown)sigframe r   ;/home/aiteam/pcaa-dev/dashboard-backend/unified_pipeline.py_handle_signal;   s   
r   c                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	SharedPipelineBasez0Base class for shared DB and RabbitMQ resources.c                 C   s6   t  | _G dd d}t|| j| _d | _d | _d S )Nc                   @   s&   e Zd Zdd ZdddZdd ZdS )	z2SharedPipelineBase.__init__.<locals>.ConfigWrapperc                 S   s
   || _ d S N)_cfg)selfcfgr   r   r   __init__J   s   
z;SharedPipelineBase.__init__.<locals>.ConfigWrapper.__init__Nc                 S   s   t | j||S r   getattrr   )r   keydefaultr   r   r   getL   s   z6SharedPipelineBase.__init__.<locals>.ConfigWrapper.getc                 S   s   t | j|S r   r!   )r   r#   r   r   r   __getattr__N   s   z>SharedPipelineBase.__init__.<locals>.ConfigWrapper.__getattr__r   )__name__
__module____qualname__r    r%   r&   r   r   r   r   ConfigWrapperI   s    
r*   )r   configr   dbrabbitmq_connrabbitmq_channel)r   r*   r   r   r   r    E   s
   
zSharedPipelineBase.__init__c              
   C   s   zGt | jj| jj}t j| jj|| jj| jjd}t 	|| _
| j
 | _dD ]}| jj | }| jj|dd q(td| jj  W dS  ty\ } z	td|   d}~ww )z!Establish connection to RabbitMQ.)hostcredentials	heartbeatblocked_connection_timeouttranscriptionanalysisT)queuedurablezConnected to RabbitMQ at zRabbitMQ connection failed: N)pikaPlainCredentialsr+   RABBITMQ_USERRABBITMQ_PASSConnectionParametersRABBITMQ_HOSTRABBITMQ_HEARTBEATRABBITMQ_TIMEOUTBlockingConnectionr-   channelr.   RABBITMQ_QUEUE_PREFIXqueue_declarer   r   	Exceptionerror)r   r0   
parametersstage
queue_nameer   r   r   connect_rabbitmqU   s&   z#SharedPipelineBase.connect_rabbitmqc                 C   s$   | j r| j js| j   d S d S d S r   )r-   	is_closedcloser   r   r   r   close_rabbitmql   s   z!SharedPipelineBase.close_rabbitmqN)r'   r(   r)   __doc__r    rJ   rN   r   r   r   r   r   C   s
    r   c                       s   e Zd ZdZ fddZddedefddZded	ed
edefddZ	dede
defddZdededefddZdefddZ  ZS )Orchestratorz0Responsible for syncing calls and queuing tasks.c                    s   t    |   d S r   )superr    rJ   rM   	__class__r   r   r    s   s   
zOrchestrator.__init__2   bidlimitc                 C   s~   t d| d | j|}|r|dds!t d| d n| ||| | j|d|d | j|d|d | | d	S )
z,Run the orchestrator steps for a single bid.[z] Starting orchestrator runpipeline_enabledTz5] Pipeline disabled or missing config - skipping syncr4   rV   r5   N)	r   r   r,   get_pipeline_configr%   warning
sync_callsqueue_tasksrecover_stuck_calls)r   rU   rV   r   r   r   r   run_bidw   s   zOrchestrator.run_bidfilename	starttimereturnc           	      C   s   |sdS | dr|S d|v r|d}d| S z(t|tr&tt|n|}|d|d}}d| d| d| d| W S  tyc } zt	d	| d
| d|  |W  Y d}~S d}~ww )z`Reconstruct the full recording URL if it's just a filename (Logic from orchestrate_pipeline.py). http/zhttps://recordings.mcube.com/z%Yz%mz3https://recordings.mcube.com/mcubefiles112/classic/z	/inbound/rW   z] Error reconstructing URL for : N)

startswithlstrip
isinstancestrr   fromisoformatstrftimerD   r   rE   )	r   rU   r`   ra   pathdtyearmonthrI   r   r   r   _get_reconstructed_url   s   

z#Orchestrator._get_reconstructed_urlpipeline_cfgc                 C   s   ddl m} ||| j|}|dkrr| j D}| }| d}|d| d | p.g }	|	D ]!}
| ||
d |
d }||
d krR|d| d	||
d
 f q1W d   n1 s]w   Y  t	d| dt
|	 d dS dS )z6Pull new calls from source DB into {bid}_call_records.r   )
stage_sync_call_recordsz*SELECT callid, file_url, call_start FROM `z8` WHERE status = 'pending' AND file_url NOT LIKE 'http%'file_url
call_startzUPDATE `z%` SET file_url = %s WHERE callid = %scallidNrW   z"] URL reconstruction complete for z records)call_processorrs   r,   get_connectioncursorexecutefetchallrq   r   r   len)r   rU   rr   rV   rs   insertedconnrz   table	to_repairrowrepairedr   r   r   r\      s"   
 
zOrchestrator.sync_callsrG   c           	      C   s   | j j | }|dkr| jj||d}n| jj||d}|s!dS td| dt| d|  |D ]2}||d |t	 
 d}| jjd	|t|tjd
dd |dkrYdnd}| j||d | q3dS )z/Find calls in the DB and push them to RabbitMQ.r4   )batchNrW   z
] Queuing z calls for rw   )rU   rw   rG   	timestamprc      )delivery_mode)exchangerouting_keybody
propertiestranscribing	analyzing)r+   rB   r,   get_calls_to_transcribeget_calls_to_analyzer   r   r}   r   now	isoformatr.   basic_publishjsondumpsr8   BasicPropertiesset_call_status)	r   rU   rG   rV   rH   callscallpayload
new_statusr   r   r   r]      s,    

	zOrchestrator.queue_tasksc              
   C   s   | d}t  tdd }| j D}| }|d| d|f |j}|d| d|f |j}|s8|rOt	d| d| d	| d
 W d   dS W d   dS 1 sZw   Y  dS )zAResets calls stuck in 'transcribing' or 'analyzing' for too long.rt      )hoursz
                UPDATE `z` 
                SET status = 'pending' 
                WHERE status = 'transcribing' 
                AND (transcript IS NULL OR transcript = '')
                AND updated_at < %s
            z` 
                SET status = 'transcribed' 
                WHERE status = 'analyzing' 
                AND updated_at < %s
            rW   z] Recovered z transcription and z analysis tasksN)
r   r   r   r,   ry   rz   r{   rowcountr   r   )r   rU   r   
stuck_timer   rz   t_recovereda_recoveredr   r   r   r^      s*   
 "z Orchestrator.recover_stuck_calls)rT   )r'   r(   r)   rO   r    rj   intr_   r   rq   r   r\   r]   r^   __classcell__r   r   rR   r   rP   p   s     rP   c                       s\   e Zd ZdZ fddZdd Zdd Zded	ed
efddZ	ded	ed
efddZ
  ZS )Workerz/Responsible for processing tasks from RabbitMQ.c                    s"   t    t| j| _|   d S r   )rQ   r    r   r,   agent_runnerrJ   rM   rR   r   r   r       s   
zWorker.__init__c                 C   sf   dD ]}| j j | }| jj|| jd qtd z| j  W dS  ty2   | j	  Y dS w )z Start consuming from all queues.r3   )r6   on_message_callbackz%Worker started - waiting for tasks...N)
r+   rB   r.   basic_consumeprocess_messager   r   start_consumingKeyboardInterruptstop_consuming)r   rG   rH   r   r   r   start   s   
zWorker.startc              
   C   s2  zit |}|d }|d }|d }td| d| d|  d}	|dkr-| ||}	n
|d	kr7| ||}	|	rQ|j|jd
 td| d| d|  W dS |j|jdd t	d| d| d|  W dS  t
y }
 z#tj	d|
 dd |jr|j|jdd W Y d}
~
dS W Y d}
~
dS d}
~
ww )z7Dispatches the message to the correct processing logic.rU   rw   rG   rW   z] Processing z
 for call Fr4   r5   )delivery_tagz] Successfully processed z for )r   requeuez] Failed to process z#Critical error in worker callback: Texc_infoN)r   loadsr   r   do_transcriptiondo_analysis	basic_ackr   
basic_nackrE   rD   )r   chmethodr   r   taskrU   rw   rG   successrI   r   r   r   r     s.   
""zWorker.process_messagerU   rw   rb   c                 C   s^  | j |}|s
dS |dpd}ddlm} || j |}zt||}W n tyB } ztd| d|  W Y d}~dS d}~ww | j 	||}	|	rQ|	d	s]| j j
||d
dd dS | j ||d z||	d	 |}
|
r~|
jr~| j |||
 W dS td ty } z!td| d| d|  | j j
||d
t|d W Y d}~dS d}~ww )z+Logic from call_processor.stage_transcribe.Fstt_providersarvamr   )_decrypt_stt_keyrW   z] STT provider init failed: Nru   
transcribezNo audio URL or record missingrG   reasonr   TzEmpty transcript returnedz] Transcription failed for rf   )r,   rZ   r%   rx   r   r   rD   r   rE   get_call_record_detail	fail_callr   r   
transcriptsave_call_transcriptionRuntimeErrorrj   )r   rU   rw   r   stt_provider_namer   api_keysttrI   r   resultexcr   r   r   r      s:   
zWorker.do_transcriptionc           	      C   sJ  | j ||}|std| d| d dS |dpd}|s.td| d| d dS |d	p4g }|d
p;d|dpAdt|dpHdt|dpPdd}| j ||d z| jj	|||||d}|ru| j 
||| W dS W dS  ty } z!td| d| d|  | j j||dt|d W Y d}~dS d}~ww )z(Logic from call_processor.stage_analyze.rW   z] Call record z not found for analysisFr   rc   z] Call z has no transcript for analysisspeaker_segments
agent_namecustomer_phonerv   call_duration_s)r   r   rv   r   r   )rU   rw   r   r   call_metadataTz] Analysis failed for rf   analyzer   N)r,   r   r   rE   r%   r[   rj   r   r   runsave_call_analysisrD   r   )	r   rU   rw   r   r   r   	call_metar5   r   r   r   r   r   B  sB   zWorker.do_analysis)r'   r(   r)   rO   r    r   r   rj   boolr   r   r   r   r   rR   r   r      s    "r   c                  C   sH  t jdd} | jdddgddd | jd	d
d | jdtddd | jdtddd | jdddd |  }|jdkrt }	 |jrF|jgn|j	 }|D ]/}t
rS n*z
|j||jd W qM ty| } ztjd| d| dd W Y d }~qMd }~ww |jrt
rnt|j q?|  d S |jdkrt }|  |  d S d S )Nz Unified Call Processing Pipeline)descriptionz--modeorchestratorworkerTz:Mode: orchestrator (queue tasks) or worker (process tasks))choicesrequiredhelpz--bidz Orchestrator mode: only this bid)r   z--limitrT   z Batch limit for sync and queuing)typer$   r   z
--interval<   zLoop interval for orchestratorz--continuous
store_truezLoop orchestrator indefinitely)actionr   rY   zOrchestrator error for bid rf   r   )argparseArgumentParseradd_argumentr   
parse_argsmoderP   rU   r,   get_enabled_pipeline_bidsr   r_   rV   rD   r   rE   
continuoustimesleepintervalrN   r   r   )parserargsr   bidsrU   rI   r   r   r   r   mainl  s<   
&
r   __main__)5rO   r   r   loggingossignalsysr   r   r   typingr   r   r   r   r   r8   pymysqlpymysql.cursorsr	   dotenvr
   rm   dirnameabspath__file__BACKEND_DIRinsertchdirr+   r   
db_handlerr   r   r   r   r   basicConfigINFOFileHandlerjoinStreamHandler	getLoggerr   r   r   SIGTERMSIGINTr   rP   r   r   r'   r   r   r   r   <module>   sR   

-~~
