o
    eK±iÊb  ã                   @   s2  d Z ddlZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlmZmZ ddlmZ ej ej e¡¡Zej de¡ e e¡ ddlmZ eƒ  ejejde ej ej ej e¡¡d¡¡e ¡ gd e d	¡Zd
Ze  d¡p†e  d¡p†dZ!dZ"e  dd¡e  dd¡e  dd¡e  dd¡dedœZ#e  dd¡e  dd¡e  dd¡e  dd¡dedœZ$dZ%dZ&dZ'dZ(d Z)d!a*d"d#„ Z+e ej,e+¡ e ej-e+¡ d$d%„ Z.d&d'„ Z/d(d)„ Z0d*d+„ Z1d,d-„ Z2d.d/„ Z3d0d1„ Z4d2d3„ Z5d4d5„ Z6d6d7„ Z7d8d9„ Z8d:d;„ Z9d<d=„ Z:e;d>kre:ƒ  dS dS )?aR  
Automated Pipeline for BID 6004 (MCUBE Sales)
Continuously syncs, transcribes, and analyzes calls.

Stage 1: SYNC - Pull new calls from source DB to destination DB
Stage 2: TRANSCRIBE - Use Sarvam speech_to_text_translate_job for English translation with diarization
Stage 3: ANALYZE - Use AWS Bedrock Nova for quality scoring, BANT, sentiment, talk-listen ratio

Usage:
    python3 pipeline_6004.py                  # Run once
    python3 pipeline_6004.py --continuous     # Run continuously (default interval: 120s)
    python3 pipeline_6004.py --continuous --interval 60  # Custom interval
é    N)ÚdatetimeÚ	timedelta)Ú
DictCursor)Úload_dotenvz'%(asctime)s [%(levelname)s] %(message)szpipeline_6004.log)ÚlevelÚformatÚhandlersÚpipeline_6004Ú6004ÚSARVAM_PIPELINE_KEYÚSARVAM_SUBSCRIPTION_KEYÚ$sk_o6cnn95f_gp7r6jV6LfihWL05Vj2CrVswé   ÚSYNC_SOURCE_DB_HOSTz	127.0.0.1ÚSYNC_SOURCE_DB_USERÚadminÚSYNC_SOURCE_DB_PASSWORDÚ ÚSYNC_SOURCE_DB_NAMEÚvoicebot_clusterÚutf8mb4)ÚhostÚuserÚpasswordÚdatabaseÚcharsetÚcursorclassÚDB_HOSTÚDB_USERÚDB_PASSWORDÚDB_NAMEiô  é   éx   éZ   Fc                 C   s   t  d¡ dad S )Nz4Shutdown signal received, finishing current batch...T)ÚloggerÚinfoÚshutdown_requested)ÚsigÚframe© r)   úpipeline_6004.pyÚsignal_handler\   s   
r+   c                 C   sŽ   d  dd„ t| p	dƒD ƒ¡}|stƒ S t|ƒdkr|dd… n|}|d|› |h}t|ƒdkr@| d|› d	|› d
|› h¡ dd„ |D ƒS )zMReturn a set of phone variants for a given number, matching db_handler logic.r   c                 s   s    | ]	}|  ¡ r|V  qd S )N)Úisdigit)Ú.0Úchr)   r)   r*   Ú	<genexpr>i   s   € z#_normalize_phone.<locals>.<genexpr>é
   iöÿÿÿNú+Ú91z+91Ú0c                 S   s   h | ]}|r|’qS r)   r)   )r-   Úvr)   r)   r*   Ú	<setcomp>p   ó    z#_normalize_phone.<locals>.<setcomp>)ÚjoinÚstrÚsetÚlenÚupdate)ÚphoneÚdigitsÚcore10Úvariantsr)   r)   r*   Ú_normalize_phoneg   s   "r@   c                 C   s€   |   d¡pd ¡ }t|   d¡pdƒ ¡ }t|   d¡pdƒ ¡ }t|   d¡p&dƒ ¡ }|dkr2|r2|S |dkr:|r:|S |p?|p?|S )z7Extract the customer-side phone from a callhistory row.Ú	directionÚinboundÚcalltor   ÚcallfromÚclicktocalldidÚoutbound)ÚgetÚlowerr8   Ústrip)ÚcallrA   rC   rD   rE   r)   r)   r*   Ú_pick_customer_phones   s   rK   c              	   C   s²   |   ¡ }| dtf¡ | ¡ pg }tƒ }|D ]@}t| d¡pdƒ ¡ }|r,| t	|ƒ¡ zt
 | d¡p5d¡}W n tyC   g }Y nw |D ]}t|ƒ ¡ }|rU| |¡ qFq|S )zLLoad all phone variants for BID 6004 from crm_leads_cache into a Python set.zeSELECT phone_primary, phone_variants FROM crm_leads_cache WHERE bid = %s AND provider = 'leadsquared'Úphone_primaryr   Úphone_variantsz[])ÚcursorÚexecuteÚBIDÚfetchallr9   r8   rG   rI   r;   r@   ÚjsonÚloadsÚ	ExceptionÚadd)Ú	dest_connrN   ÚrowsÚ	phone_setÚrowÚprimaryr?   r4   r)   r)   r*   Ú_load_lead_phone_set€   s.   þÿ
€ýr[   c                 C   sr   |   ¡ }z| dt ¡ | ¡ }|r|d nd}W n ty$   d}Y nw |du r2t ¡ ttd S |tdd S )zÐReturn the watermark datetime for the call sync.

    Uses MAX(call_starttime) from 6004_raw_calls minus a 1-hour overlap buffer.
    Falls back to SYNC_FIRST_RUN_LOOKBACK_DAYS ago if the table is empty.
    z8SELECT MAX(call_starttime) AS latest FROM `%s_raw_calls`ÚlatestN)Údaysr!   )Úhours)	rN   rO   rP   ÚfetchonerT   r   Únowr   ÚSYNC_FIRST_RUN_LOOKBACK_DAYS)rV   rN   rY   r\   r)   r)   r*   Ú_get_call_sync_watermark˜   s   ÿrb   c                     sÂ  t  d¡ ztjd4i t¤Ž} tjd4i t¤Ž}W n ty0 } zt  d|¡ W Y d}~dS d}~ww z¥zmt|ƒ}t  dt	|ƒt
¡ t|ƒ}t  d| d¡¡ |  ¡ }t
› dt
› d	t
› d
t
› dg}| d¡ dd„ | ¡ D ƒ‰‡fdd„|D ƒ}|st  dt
¡ W W |  ¡  | ¡  dS |d }t  d|¡ | d|› d¡ dd„ | ¡ D ƒ‰ g d¢}	g d¢}
|
‡ fdd„|	D ƒ7 }
d |
¡}| d|› d|› d|tf¡ | ¡ pØg }|sìt  d¡ W W |  ¡  | ¡  dS t  dt	|ƒ¡ | ¡ }dt
 }d}d}d}|D ]‚}t|ƒ}t|ƒ}| |¡r|d 7 }| d!¡pd" ¡ }t| d#¡p0| d$¡p0d%ƒ ¡ }| |t| d&¡p?t
ƒ|d' | d(¡pJd%d| d)¡pRd%| d*¡pYd%|d+ |d, | d-¡pfd%|||dd.df¡ |jd kr{|d 7 }q|jd/kr…|d 7 }q| ¡  t  d0||t	|ƒ|¡ |W W |  ¡  | ¡  S  ty× } z)t jd1|d2d3 z| ¡  W n
 tyÂ   Y nw W Y d}~W |  ¡  | ¡  dS d}~ww |  ¡  | ¡  w )5a  Sync new ANSWER calls from source DB to destination DB.

    Imports ALL ANSWER calls regardless of CRM match.
    CRM data from crm_leads_cache is used for enrichment only (lead names / owner).
    Uses a watermark derived from the latest call already in raw_calls.
    z=== STAGE 1: SYNC ===zDB connection failed: %sNr   zSLoaded %d phone variants for BID %s from crm_leads_cache (used for enrichment only)zFetching ANSWER calls since %sú%Y-%m-%d %H:%M:%SÚ_callhistoryÚ_call_historyÚ_callarchiveÚ_call_archivezSHOW TABLESc                 S   s   h | ]
}t | ¡ ƒd  ’qS )r   )ÚlistÚvalues©r-   Úrr)   r)   r*   r5   Î   s    zsync_calls.<locals>.<setcomp>c                    ó   g | ]}|ˆ v r|‘qS r)   r)   )r-   Út)Úexisting_tablesr)   r*   Ú
<listcomp>Ï   ó    zsync_calls.<locals>.<listcomp>z2No source call table found for BID %s in source DBzUsing source table: %szSHOW COLUMNS FROM `ú`c                 S   s   h | ]}|d  ’qS )ÚFieldr)   rj   r)   r)   r*   r5   Ú   r6   )rC   rD   rE   )
ÚcallidÚbidÚ	agentnameÚ	groupnameÚ	starttimeÚendtimeÚ
dialstatusrA   ÚfilenameÚ	emp_phonec                    rl   r)   r)   )r-   Úc)Úavailable_colsr)   r*   ro   Þ   rp   z, zSELECT z FROM `zP` WHERE dialstatus = 'ANSWER' AND starttime > %s ORDER BY starttime ASC LIMIT %sz#No new ANSWER calls since watermarkz?Fetched %d ANSWER calls from source, filtering by lead phone...aé  
            INSERT INTO `%s_raw_calls`
            (bid, callid, fileurl, status, agentname, groupname, call_starttime, call_endtime,
             call_status, agent_callinfo, customer_callinfo, direction,
             transcription_requested, transcription_status, selected_for_processing)
            VALUES (%%s, %%s, %%s, %%s, %%s, %%s, %%s, %%s, %%s, %%s, %%s, %%s, %%s, %%s, %%s)
            ON DUPLICATE KEY UPDATE
            fileurl = VALUES(fileurl),
            agentname = VALUES(agentname),
            groupname = VALUES(groupname),
            call_starttime = VALUES(call_starttime),
            call_endtime = VALUES(call_endtime),
            call_status = VALUES(call_status),
            agent_callinfo = CASE WHEN VALUES(agent_callinfo) != '' THEN VALUES(agent_callinfo) ELSE agent_callinfo END,
            customer_callinfo = CASE WHEN VALUES(customer_callinfo) != '' THEN VALUES(customer_callinfo) ELSE customer_callinfo END,
            direction = VALUES(direction)
        r!   rA   rB   r{   rD   r   rt   rs   rz   ru   rv   rw   rx   ry   Úpendingé   zKSync complete: %d inserted, %d updated out of %d fetched (%d had CRM match)zSync error: %sT©Úexc_infor)   )r$   r%   ÚpymysqlÚconnectÚ	SOURCE_DBÚDEST_DBrT   Úerrorr[   r:   rP   rb   ÚstrftimerN   rO   rQ   Úcloser7   Ú
SYNC_BATCHrK   r@   ÚintersectionrG   rH   r8   rI   ÚrowcountÚcommitÚrollback)Úsource_connrV   ÚerX   Ú	watermarkÚsource_cursorÚsource_table_candidatesÚsource_tablesÚsource_tableÚoptional_colsÚselect_colsÚcols_sqlÚcallsÚdest_cursorÚinsert_qÚinsertedÚupdatedÚcrm_matchedrJ   Úcustomer_phonerM   rA   Úagent_phoner)   )r}   rn   r*   Ú
sync_calls¬   sÒ   
€þü
cž
ü
K¶ð$ñ€þ

÷ÿ€÷
r    c                  C   sP   t jdi t¤Ž} z|  ¡ }dttttf }| |¡ | ¡ W |  	¡  S |  	¡  w )z!Get calls that need transcriptiona«  
            SELECT r.callid, r.fileurl
            FROM %s_raw_calls r
            LEFT JOIN %s_sarvamresponse s ON r.callid = s.callid
            WHERE r.call_status = 'ANSWER'
              AND r.fileurl IS NOT NULL AND r.fileurl != ''
              AND s.callid IS NULL
              AND TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime) >= %d
            ORDER BY r.call_starttime DESC
            LIMIT %d
        Nr)   )
r‚   rƒ   r…   rN   rP   ÚMIN_CALL_DURATION_SECONDSÚTRANSCRIBE_BATCHrO   rQ   rˆ   ©ÚconnrN   Úqueryr)   r)   r*   Úget_pending_transcriptions<  s   

ö
r¦   c                 C   sž  ddl m} |td}d}z¬zytj| dd}|jdkr<t d||j¡ W W |r8tj	 
|j¡r:t |j¡ dS dS dS tjd	d
dd}| |j¡ | ¡  d}ttƒD ]J}z|jjddddd}| |jg¡}|rmW  n2W qT tyž }	 z$dt|	ƒv r“|td k r“t d|d |¡ t d¡ d}n‚ W Y d}	~	qTd}	~	ww |s¾t d|¡ W W |rºtj	 
|j¡r¼t |j¡ dS dS dS | ¡  |jddd}
| ¡ sêt d|¡ W W |rætj	 
|j¡rèt |j¡ dS dS dS d}|
jr|
jD ]}|j dkr|j!r|j!d j"} nqó|sd}|jj#|j$|gd}d}|j%r)||j%v r)|j%| j&}|sKt d|¡ W W |rGtj	 
|j¡rIt |j¡ dS dS dS tj|dd}|jdkrqW W |rmtj	 
|j¡rot |j¡ dS dS dS | '¡ W W |rˆtj	 
|j¡r‰t |j¡ S S S  t(yº }	 z$t d||	¡ W Y d}	~	W |r²tj	 
|j¡r´t |j¡ dS dS dS d}	~	ww |rÍtj	 
|j¡rÎt |j¡ w w w )zJTranscribe a single call using Sarvam translate batch API with diarizationr   )ÚSarvamAI)Úapi_subscription_keyNé<   )ÚtimeoutéÈ   z(Failed to download audio for %s: HTTP %sz.wavFz/tmp)ÚsuffixÚdeleteÚdirzsaaras:v2.5Tr   zTranslate all speech to English)ÚmodelÚwith_diarizationÚnum_speakersÚpromptÚ403r!   z2Upload attempt %d failed for %s (403), retrying...z!All upload attempts failed for %sé   i,  )Úpoll_intervalrª   zJob failed for %sÚSuccessz0.json)Újob_idÚfileszNo download URL for %szError transcribing %s: %s))Úsarvamair§   ÚSARVAM_API_KEYÚrequestsrG   Ústatus_coder$   r†   ÚosÚpathÚexistsÚnameÚunlinkÚtempfileÚNamedTemporaryFileÚwriteÚcontentrˆ   ÚrangeÚMAX_UPLOAD_RETRIESÚspeech_to_text_translate_jobÚ
create_jobÚupload_filesÚRuntimeErrorr8   ÚwarningÚtimeÚsleepÚstartÚwait_until_completeÚis_successfulÚjob_detailsÚstateÚoutputsÚ	file_nameÚget_download_linksr·   Údownload_urlsÚfile_urlrR   rT   )Ú	audio_urlrs   r§   ÚclientÚtmpÚaudio_responseÚjobÚattemptÚokr   ÚstatusÚoutput_fileÚdetailÚlinksÚdownload_urlÚrespr)   r)   r*   Útranscribe_callR  sº   

Iÿºüÿ
þ€ü+ÿØ#ÿà
€þÿöÿ
úÿüÿ€üÿræ   c                 C   sì  t jdi t¤Ž}zèzÄ| dd¡}| dd|  ¡}t |¡}| di ¡}t|tƒr.| dg ¡ng }g }|D ]L}	t|	 dd¡ƒ}
|
 	d	¡rL|
}|
 
d	d¡}nd	|
 }|
}d
| }|	 dd¡}|	 dd¡}|	 dd¡}|dk}|r€| ||||||||r{dnddœ¡ q4|rŽttdd„ |D ƒƒƒnd}|r˜|d d nd}| ¡ }dt }| || ||r®t |¡nd||||df¡ dt }| || f¡ | ¡  W W | ¡  dS  tyð } zt d| |¡ | ¡  W Y d}~W | ¡  dS d}~ww | ¡  w )z"Save Sarvam transcript to databaseÚ
transcriptr   Ú
request_idÚsarvam_batch_Údiarized_transcriptÚentriesÚ
speaker_idr3   Úspeaker_zSpeaker Ústart_time_secondsr   Úend_time_secondsÚ	speaker_0ÚagentÚcustomer)Úspeakerrì   ÚtextrÏ   ÚendÚ
start_timeÚend_timeÚrolec                 s   s    | ]}|d  V  qdS )ró   Nr)   )r-   Úsr)   r)   r*   r/   Ñ  s   € z"save_transcript.<locals>.<genexpr>r   éÿÿÿÿrõ   a  
            INSERT INTO %s_sarvamresponse
            (callid, transcript, speaker_segments, duration, num_speakers, request_id, raw_response, stt_provider, created_at)
            VALUES (%%s, %%s, %%s, %%s, %%s, %%s, %%s, %%s, NOW())
            ON DUPLICATE KEY UPDATE
            transcript = VALUES(transcript), speaker_segments = VALUES(speaker_segments),
            duration = VALUES(duration), num_speakers = VALUES(num_speakers),
            raw_response = VALUES(raw_response), updated_at = NOW()
        NÚsarvamzYUPDATE %s_raw_calls SET transcription_status = 'completed', status = 1 WHERE callid = %%sTz"Error saving transcript for %s: %sFr)   )r‚   rƒ   r…   rG   rR   ÚdumpsÚ
isinstanceÚdictr8   Ú
startswithÚreplaceÚappendr:   r9   rN   rP   rO   rŒ   rˆ   rT   r$   r†   r   )rs   Úresultr¤   Útranscript_textrè   Úraw_responseÚdiarizedrë   Úspeaker_segmentsÚentryÚraw_idrì   Únumró   rô   rÏ   rõ   Úis_agentr±   ÚdurationrN   rš   Úupdate_qr   r)   r)   r*   Úsave_transcriptª  sp   


ø€ø

ýû€û
r  c                  C   sf  t  d¡ tƒ } | st  dt¡ dS t  dt| ƒ¡ d}d}t| dƒD ]ƒ\}}tr0t  d¡  nw|d }|d }t  d	|t| ƒ|¡ t||ƒ}|rm| d
¡rmt	||ƒrh|d7 }| d
d¡}t  d|dd… ¡ n3|d7 }n.|d7 }|r~| d
¡s~t  
d|¡ tjdi t¤Ž}	|	 ¡ }
|
 dt |f¡ |	 ¡  |	 ¡  |t| ƒk r¦t d¡ q#t  d||t| ƒ¡ |S )zTranscribe all pending callsz=== STAGE 2: TRANSCRIBE ===z@No eligible calls for transcription (requires ANSWER and >= %ds)r   zFound %d calls to transcriber!   z*Shutdown requested, stopping transcriptionrs   Úfileurlz[%d/%d] Transcribing %srç   r   z  OK - %s...NéP   z  Empty transcript for %szJUPDATE %s_raw_calls SET transcription_status = 'failed' WHERE callid = %%sz.Transcription: %d success, %d failed out of %dr)   )r$   r%   r¦   r¡   r:   Ú	enumerater&   ræ   rG   r  rÌ   r‚   rƒ   r…   rN   rO   rP   rŒ   rˆ   rÍ   rÎ   )r~   ÚsuccessÚfailedÚidxrJ   rs   r  r  rm   r¤   rN   r)   r)   r*   Útranscribe_pendingò  sL   




þ
€r  c                  C   sV   t jdi t¤Ž} z|  ¡ }dtttttf }| |¡ | ¡ duW |  ¡  S |  ¡  w )zeCheck if the latest call is eligible for analysis (answered, long enough, transcribed, not analyzed).a¬  
            SELECT r.callid
            FROM %s_raw_calls r
            INNER JOIN %s_sarvamresponse s ON r.callid = s.callid
            LEFT JOIN %s_callanalytics a ON r.callid = a.callid
            WHERE r.callid = (
                SELECT callid
                FROM %s_raw_calls
                WHERE call_starttime IS NOT NULL
                ORDER BY call_starttime DESC
                LIMIT 1
            )
              AND r.call_status = 'ANSWER'
              AND s.transcript IS NOT NULL
              AND s.transcript != ''
              AND a.callid IS NULL
              AND TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime) >= %d
            LIMIT 1
        Nr)   )	r‚   rƒ   r…   rN   rP   r¡   rO   r_   rˆ   r£   r)   r)   r*   Úlatest_call_ready_for_analysis'  s   î
r  c            	      C   sà   t  d¡ tƒ st  dt¡ dS ddlm}  ddlm} | ƒ }i }t|ƒD ]}| 	¡ r2t
||ƒ||< q%|t|td}t|d ƒ}t|d ƒ}|dksN|dkrit  d	||¡ |d D ]}t  d
|d |d ¡ qY|S t  d¡ |S )z8Analyze transcribed calls that haven't been analyzed yetz=== STAGE 3: ANALYZE ===zSLatest call not ready for analysis (answered + >= %ds + transcribed + not analyzed)r   )ÚConfig)Úbatch_analyze_calls)Úlimitr  r  zAnalysis: %d success, %d failedz  FAILED: %s - %srs   r†   zNo calls need analysis)r$   r%   r  r¡   Úconfigr  Úanalyze_calls_with_parametersr  r®   ÚisupperÚgetattrrP   ÚANALYZE_BATCHr:   r†   )	r  r  ÚcfgÚconfig_dictÚkeyÚresultsÚsuccess_countÚfailed_countÚfr)   r)   r*   Úanalyze_pendingD  s,   
€
þr%  c                  C   s~   t  d¡ t  d¡ t  dt ¡  d¡¡ t  d¡ tƒ } tr!dS tƒ }tr(dS tƒ }t  d¡ t  d| ||¡ t  d¡ dS )z&Run one iteration of the full pipeliner   zF======================================================================z  PIPELINE RUN - %src   Nz8Pipeline summary: synced=%d, transcribed=%d, analyzed=%d)	r$   r%   r   r`   r‡   r    r&   r  r%  )Ú	new_callsÚtranscribedÚanalyzedr)   r)   r*   Úrun_pipelinef  s   



r)  c               
   C   sL  t jdd} | jdddd | jdtdd	d
 | jdddd | jdddd | jdddd |  ¡ }t dt|j|j	¡ d}	 |d7 }t
rLt d¡ nSz|jrTtƒ  n|jr[tƒ  n
|jrbtƒ  ntƒ  W n ty } ztjd||dd W Y d }~nd }~ww |js†nt d|j	¡ t|j	ƒD ]}t
r˜ nt d¡ q’q@t d¡ d S )NzAutomated pipeline for BID 6004)Údescriptionz--continuousÚ
store_truezRun continuously)ÚactionÚhelpz
--intervalr"   z/Interval between runs in seconds (default: 120))ÚtypeÚdefaultr-  z--sync-onlyzOnly run sync stagez--transcribe-onlyzOnly run transcription stagez--analyze-onlyzOnly run analysis stagez9Pipeline started for BID %s (continuous=%s, interval=%ds)r   Tr!   zShutting down gracefully...z"Pipeline error in iteration %d: %sr€   z%Waiting %d seconds before next run...zPipeline stopped.)ÚargparseÚArgumentParserÚadd_argumentÚintÚ
parse_argsr$   r%   rP   Ú
continuousÚintervalr&   Ú	sync_onlyr    Útranscribe_onlyr  Úanalyze_onlyr%  r)  rT   r†   rÆ   rÍ   rÎ   )ÚparserÚargsÚ	iterationr   Ú_r)   r)   r*   Úmain  sH   
€€ÿår>  Ú__main__)<Ú__doc__Úsysr½   rÍ   rR   Úloggingr0  rÂ   Úsignalr‚   r»   r   r   Úpymysql.cursorsr   r¾   ÚdirnameÚabspathÚ__file__ÚBACKEND_DIRÚinsertÚchdirÚdotenvr   ÚbasicConfigÚINFOÚFileHandlerr7   ÚStreamHandlerÚ	getLoggerr$   rP   Úgetenvrº   rÇ   r„   r…   r‰   r¢   r  r¡   ra   r&   r+   ÚSIGTERMÚSIGINTr@   rK   r[   rb   r    r¦   ræ   r  r  r  r%  r)  r>  Ú__name__r)   r)   r)   r*   Ú<module>   s’   
"þý

ÿý



ú



ú
 XH5"
,
ÿ