o
    M)jU                     @  s   d dl mZ d dlZd dlZd dlZd dlmZmZ d dlZd dl	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZ eeZG d	d
 d
ZdS )    )annotationsN)DictAny)settings)get_connection)stt_jobs)get_stt_provider)SarvamSlotTimeoutErrorc                   @  s   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zd<ddZd<ddZ	d<ddZ
d=ddZd>ddZd=ddZd?dd Zd@d!d"ZdAd$d%ZdBd&d'ZdCd*d+ZdCd,d-ZdDd.d/ZdEd1d2ZdEd3d4ZdFd5d6ZdGd9d:Zd;S )HRabbitMQTranscriptionWorkerc                 C  s(   t  | _tj| _d | _d | _tj| _d S N)r   sttr   max_retries
connectionchannelrabbitmq_queue
queue_nameself r   \/home/aiteam/pcaa-dev/call-proccessing/stt_pipeline/workers/rabbitmq_transcription_worker.py__init__   s
   z$RabbitMQTranscriptionWorker.__init__c              	   C  sh   |    ttjtjtjttjtj	dd| _
| j
 | _| jj| jdd td| j| jj d S )NX  hostportcredentials	heartbeatTqueuedurablez2RabbitMQ worker connected | queue=%s | provider=%s)_closepikaBlockingConnectionConnectionParametersr   rabbitmq_hostrabbitmq_portPlainCredentialsrabbitmq_userrabbitmq_passwordr   r   queue_declarer   loggerinfor   provider_namer   r   r   r   _connect   s&   z$RabbitMQTranscriptionWorker._connectc                 C  sp   z| j r| j jr| j   W n	 ty   Y nw z| jr%| jjr%| j  W n	 ty/   Y nw d | _ d | _d S r   )r   is_openclose	Exceptionr   r   r   r   r   r    0   s    


z"RabbitMQTranscriptionWorker._closec              
   C  s   	 z!|    | jjdd | jj| j| jd td | j  W n. t	y0   td Y n" t
yP } ztd| |   td W Y d }~nd }~ww q|   d S )	NT   )prefetch_count)r   on_message_callbackzWaiting for messages...zWorker shutting downz.Worker connection lost, reconnecting in 5s: %s   )r-   r   	basic_qosbasic_consumer   	_callbackr*   r+   start_consumingKeyboardInterruptr0   	exceptionr    timesleepr   excr   r   r   start@   s*   

z!RabbitMQTranscriptionWorker.startc                 C  s   d }zt | }td|d|d |j|jd W n- tyL } z!t	d| z
|j
|jdd W n	 ty@   Y nw W Y d }~d S d }~ww z| | W d S  tyv } zt	d|d|d| W Y d }~d S d }~ww )	Nz[%s/%s] Received jobbidcall_id)delivery_tagzFailed to parse/ack message: %sT)rB   requeuez [%s/%s] Job failed after ack: %s)jsonloadsdecoder*   r+   get	basic_ackrB   r0   r:   
basic_nack_process_job)r   chmethod
propertiesbodyjobr>   r   r   r   r7   V   s<   z%RabbitMQTranscriptionWorker._callbackr@   strreturnc                 C     d| dS )N`z_raw_calls`r   r   r@   r   r   r   
_raw_tablew      z&RabbitMQTranscriptionWorker._raw_tablec                 C  rR   )NrS   z_sarvamresponse`r   rT   r   r   r   _response_tablez   rV   z+RabbitMQTranscriptionWorker._response_tablec                 C  rR   )NrS   z_callanalytics`r   rT   r   r   r   _analytics_table}   rV   z,RabbitMQTranscriptionWorker._analytics_tableboolc              
   C  s   zLt  >}| (}|d|f | }tt|pi dpdW  d    W  d    W S 1 s4w   Y  W d    W d S 1 sEw   Y  W d S  tyf } zt	d|| W Y d }~dS d }~ww )Nz
                        SELECT webhook_ingest_enabled
                        FROM business_pipeline_config
                        WHERE bid = %s
                        LIMIT 1
                        webhook_ingest_enabledr   z.[%s] Could not read webhook_ingest_enabled: %sF)
r   cursorexecutefetchonerY   intrG   r0   r*   warning)r   r@   conncurrowr>   r   r   r   _is_webhook_ingest_enabled   s$   
	&z6RabbitMQTranscriptionWorker._is_webhook_ingest_enabledrA   Nonec              
   C  s   ||d}t j}z>ttjt jt jtt jt j	dd}|
 }|j|dd |jd|t|tjddd	 |  td
||| W d S  tya } ztd||| W Y d }~d S d }~ww )N)r@   rA   r   r   Tr       )delivery_mode)exchangerouting_keyrN   rM   z%[%s/%s] Published analytics job to %sz+[%s/%s] Failed to publish analytics job: %s)r   rabbitmq_analytics_queuer!   r"   r#   r$   r%   r&   r'   r(   r   r)   basic_publishrD   dumpsBasicPropertiesr/   r*   r+   r0   r_   )r   r@   rA   payloadr   r`   r   r>   r   r   r   _publish_analytics_job   s8   

z2RabbitMQTranscriptionWorker._publish_analytics_jobc                 C  s   t jrdS | |S )NT)r   stt_enqueue_analytics_alwaysrc   rT   r   r   r   _should_enqueue_analytics   s   
z5RabbitMQTranscriptionWorker._should_enqueue_analytics
raw_statusr^   c                 C  s.   t |dkrd S | |sd S | || d S )Nrf   )r^   rq   ro   )r   r@   rA   rr   r   r   r   _maybe_enqueue_analytics   s
   
z4RabbitMQTranscriptionWorker._maybe_enqueue_analyticsc                 C  s\   |  |}| |}|d| d|f | sdS |d| d|f | r,dS dS )zCReturn the correct raw_calls status when transcript already exists.z'
            SELECT 1
            FROM z
            WHERE callid = %s
              AND transcript IS NOT NULL
              AND transcript != ''
            LIMIT 1
            NzSELECT 1 FROM  WHERE callid = %s LIMIT 1   rf   )rW   rX   r\   r]   )r   ra   r@   rA   
resp_tableanalytics_tabler   r   r   _existing_result_status   s   


z3RabbitMQTranscriptionWorker._existing_result_statusstatusc                 C  s(   |  |}|d| d|||f d S )N
            UPDATE z
            SET status = %s,
                transcription_status = CASE
                    WHEN %s IN (2, 3) THEN 'completed'
                    ELSE transcription_status
                END
            WHERE callid = %s
            )rU   r\   )r   ra   r@   rA   ry   	raw_tabler   r   r   _set_raw_status   s   
	z+RabbitMQTranscriptionWorker._set_raw_statusc                 C  s.   |  |}|d| d|f t|jpdS )Nz
            DELETE FROM zk
            WHERE callid = %s
              AND (transcript IS NULL OR TRIM(transcript) = '')
            r   )rW   r\   r^   rowcount)r   ra   r@   rA   rv   r   r   r   _purge_empty_transcript_rows   s   
z8RabbitMQTranscriptionWorker._purge_empty_transcript_rowsr>   r0   c                 C  s"   t |dd gd dkpdt|v S )Nargsr   i&  zDuplicate entrygetattrrP   r=   r   r   r   _is_duplicate_key_error   s   "z3RabbitMQTranscriptionWorker._is_duplicate_key_errorc                 C  s"   t |dd gd dkodt|v S )Nr   r   i  call_starttimer   r=   r   r   r    _is_unknown_call_starttime_error   s   
z<RabbitMQTranscriptionWorker._is_unknown_call_starttime_errorc              	   C  sJ   t |pd}|dr#zt|ddd W S  ttfy"   Y dS w dS )Nre   
stt_retry::r1   r   )rP   
startswithr^   split	TypeError
ValueError)r   transcription_statustsr   r   r   _parse_stt_retry_count   s   
z2RabbitMQTranscriptionWorker._parse_stt_retry_counterror_messagec                 C  s   |  }d|v pd|v S )Nzaudio download failed: http 404zdownload failed: http 404)lower)r   r   msgr   r   r   _is_recording_not_ready_error  s   z9RabbitMQTranscriptionWorker._is_recording_not_ready_errorc                   s0   |  |rdS |  d}t fdd|D S )NF)zaudio too shortzaudio download failed: http 403zaudio download failed: http 410zempty transcriptzinvalid audiocorruptzfile formatunsupportedc                 3  s    | ]}| v V  qd S r   r   ).0markerr   r   r   	<genexpr>  s    zHRabbitMQTranscriptionWorker._is_permanent_stt_failure.<locals>.<genexpr>)r   r   any)r   r   permanent_markersr   r   r   _is_permanent_stt_failure  s
   

z5RabbitMQTranscriptionWorker._is_permanent_stt_failurec           
      C  sf  |  |}| |||}|d ur"| |||| td||| d S | |r;|d| d|f td|| d S | |rU|d| d|f td||| d S |d| d|f | pei }| 	|
d	d
 }td
tt| ddpzd}	||	kr|d| d|f td||||	| d S |d| dd| |f td||||	| d S )Nz@[%s/%s] Failure after transcript existed; repaired raw status=%sz
                UPDATE z
                SET status = 0,
                    transcription_status = 'url_wait',
                    transcription_requested = 0
                WHERE callid = %s
                zJ[%s/%s] Recording file not ready (404); reset to status=0 for URL re-checkz
                SET status = -2,
                    transcription_status = 'backlog_cleared',
                    transcription_requested = 0
                WHERE callid = %s
                zD[%s/%s] Permanent STT failure; marked terminal (backlog_cleared): %sz!SELECT transcription_status FROM rt   r   r1   r   ru   zE[%s/%s] STT failed %s/%s times; marked terminal (backlog_cleared): %srz   z
            SET status = 0,
                transcription_status = %s,
                transcription_requested = 0
            WHERE callid = %s
            r   zA[%s/%s] STT failed attempt %s/%s; reset to status=0 for retry: %s)rU   rx   r|   r*   r_   r   r\   r   r]   r   rG   maxr^   r   )
r   ra   r@   rA   r   r{   existing_statusrb   retry_countr   r   r   r   _handle_stt_failure  s   







z/RabbitMQTranscriptionWorker._handle_stt_failurerO   Dict[str, Any]c                 C  s  | d}| d}| d}| d}td||| z|r%t| t g}| O}| |||}|r?td||| | |||}	|	d uru| 	||||	 |
  td|||	 | |||	 	 W d    W d    W d S W d    n1 sw   Y  |
  W d    n1 sw   Y  dd	lm}
 |
|||\}}}|rt >}| &}| |}|d
| d|d urtdtt|nd |f W d    n1 sw   Y  |
  W d    n1 sw   Y  td||| W d S z
| jj||d}W n ty   t a}| L}| |||}	|	d urO| 	||||	 |
  td|||	 | |||	 	 W d    W d    Y W d S | 	|||d |
  W d    n	1 sfw   Y  W d    n	1 svw   Y  td|| Y W d S w tdd |jD }| |}| |}t }| }z1|d| d| d||jt|j||j|jdd| dtt  t|jdd|f
 W n t yX } zv| !|r| |||}	|	d ur| 	||||	 td|||	 | |||	 W Y d }~W d    W d    W d S  | "|s# |d| d||jt|j||j|jdd| dtt  t|jddf	 W Y d }~nd }~ww | 	|||d W d    n	1 slw   Y  W d    n	1 s|w   Y  | ||d |rt#||j|j||j|j td|||j|t|j W d S  t y } zXt$|}t%d||| |rt&|| t )}| }| '|||| W d    n	1 sw   Y  |
  W d    n1 sw   Y  W Y d }~d S W Y d }~d S d }~ww )Njob_idr@   rA   recording_urlz [%s/%s] Starting STT (job_id=%s)z9[%s/%s] Removed %s empty sarvamresponse row(s) before STTz4[%s/%s] Transcript already exists; set raw status=%sr   )should_skip_sttz$
                            UPDATE a3  
                            SET status = -2,
                                transcription_status = 'skipped_short',
                                transcription_requested = 0,
                                duration_seconds = %s
                            WHERE callid = %s
                            z-[%s/%s] STT skipped before provider call (%s))	audio_urlcallidz:[%s/%s] Slot wait timeout but transcript exists; status=%sz@[%s/%s] Sarvam slot wait timeout; reset to status=0 for re-queuec                 S  s   h | ]}|d  qS )
speaker_idr   )r   segr   r   r   	<setcomp>  s    z;RabbitMQTranscriptionWorker._process_job.<locals>.<setcomp>z)
                            INSERT INTO az   
                            (callid, transcript, speaker_segments, num_speakers, duration, 
                             stt_provider, status, request_id, raw_response, created_at, updated_at, call_starttime) 
                            SELECT 
                                %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), call_starttime
                            FROM z/ WHERE callid = %s
                            1job__T)
transcriptsyncedz3[%s/%s] Duplicate transcript row; set raw status=%sa6   
                            (callid, transcript, speaker_segments, num_speakers, duration, 
                             stt_provider, status, request_id, raw_response, created_at, updated_at) 
                            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
                            rf   u0   [%s/%s] Done — %.1fs, %d speakers, %d segmentsz[%s/%s] Failed: %s)(rG   r*   r+   r   mark_processingr   r[   r~   rx   r|   commitrs   stt.min_duration_gater   rU   r\   r   r^   roundr   
transcriber	   r_   lenspeaker_segmentsrW   r   rD   rl   durationproviderr;   r0   r   r   	mark_donerP   errormark_failedr   )r   rO   r   r@   rA   r   r`   ra   removedr   r   skip_min
min_reasonprobed_audior{   resultspeaker_countrv   
insert_excr>   r   r   r   r   rJ   y  s  








	







	


&
&A	
	

:z(RabbitMQTranscriptionWorker._process_jobN)r@   rP   rQ   rP   )r@   rP   rQ   rY   )r@   rP   rA   rP   rQ   rd   )r@   rP   rA   rP   rr   r^   rQ   rd   )r@   rP   rA   rP   )r@   rP   rA   rP   ry   r^   )r@   rP   rA   rP   rQ   r^   )r>   r0   rQ   rY   )rQ   r^   )r   rP   rQ   rY   )r@   rP   rA   rP   r   rP   rQ   rd   )rO   r   )__name__
__module____qualname__r   r-   r    r?   r7   rU   rW   rX   rc   ro   rq   rs   rx   r|   r~   r   r   r   r   r   r   rJ   r   r   r   r   r
      s.    
!












	

dr
   )
__future__r   rD   loggingr!   typingr   r   r;   config.settingsr   db.connectionr   dbr   stt.factoryr   stt.concurrencyr	   	getLoggerr   r*   r
   r   r   r   r   <module>   s    
