o
    M)j7                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlmZm	Z	 d dl
mZmZmZmZ d dlmZmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ e  d
edefddZd
edefddZe e!Z"e"#e$  e dZ%e%#e$  d dl&m'Z(m)Z)m*Z+m,Z,m-Z. d dl/m0Z0 G dd dZ1e!dkrd dl2Z2e23 Z4e4j5ddd e4j5de6dd e47 Z8e1e8j9Z:e:j;e8j<d dS dS )    N)datetime	timedelta)AnyDictListOptional)unquoteurlparse)load_dotenv)
DictCursor)CallAnalyzer)Config)DatabaseHandlerbidreturnc                 C   s$   d dd t|  D }|pdS )N c                 s   s$    | ]}|  s|d v r|V  qdS )z_-N)isalnum.0c r   ?/home/aiteam/pcaa-dev/dashboard-backend/orchestrate_pipeline.py	<genexpr>   s   " z1_sanitize_bid_for_log_filename.<locals>.<genexpr>unknown)joinstrstrip)r   sr   r   r   _sanitize_bid_for_log_filename   s   r   c                 C   sB  t | }tjtjt}tj|d| d}tj|d| d}td}t	t
}|j  |tj d|_t|}|| ttj}|| || || t	d}	|	j  |	tj d|	_t|}
|
td |	|
 t	d}|j  |tj d|_|| || ||fS )	zOSeparate orchestration vs analytics logs per business under dashboard-backend/.orchestration_z.loganalytics_updates_z)%(asctime)s - %(levelname)s - %(message)sFanalytics_updatesz%(asctime)s - %(message)sleadsquared_activity_push)r   ospathdirnameabspath__file__r   logging	Formatter	getLogger__name__handlersclearsetLevelINFO	propagateFileHandlersetFormatterStreamHandlersysstderr
addHandler)r   bid_safebase_dir	orch_pathanalytics_pathline_fmtorch_modofhoshalogahlsq_logr   r   r   $_setup_per_bid_orchestration_logging   s:   















rB   r!   )call_duration_seconds evaluate_min_duration_for_ingestis_below_min_durationpurge_unprocessed_if_below_minskip_reason)resolve_mcube_group_sqlc                   @   s  e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zde	fddZ
de	dee fddZde	defddZddde	defddZddde	dededeeee f fddZdd  Zd!ed"ed#eddfd$d%Zd&d' Zd(ede	fd)d*Zd+e	d,edee fd-d.Zdud/ee d0ed1ee defd2d3Zdefd4d5Zd6d7 Zdvd8d9Zddd:d;d<ed+e	d=edefd>d?Zd@e de!fdAdBZ"		dvdCdDZ#de$e fdEdFZ%dGdH Z&dwdJdKZ'de	defdLdMZ(dNefdOdPZ)dNedefdQdRZ*dNedefdSdTZ+dudUdVZ,dxdXedefdYdZZ-defd[d\Z.de	defd]d^Z/d_e0defd`daZ1dyd=ede2eef fdbdcZ3defdddeZ4dvdfdgZ5dhdi Z6djedkeddfdldmZ7dndo Z8d=edefdpdqZ9dzdsdtZ:dS ){Orchestratorc                 C   s   || _ t|\| _| _t | _| j | _d | _G dd d}|| j| _t	| j| _
t| j| _|   d| _d| _d| _d| _g | _d| _d | _d| _|   tdd	| _td
d| _d S )Nc                   @   s&   e Zd Zdd ZdddZdd ZdS )	z,Orchestrator.__init__.<locals>.ConfigWrapperc                 S   s
   || _ d S N)_config)selfconfigr   r   r   __init__\   s   
z5Orchestrator.__init__.<locals>.ConfigWrapper.__init__Nc                 S   s   t | j||S rJ   getattrrK   )rL   keydefaultr   r   r   get_   s   z0Orchestrator.__init__.<locals>.ConfigWrapper.getc                 S   s   t | j|S rJ   rO   )rL   rQ   r   r   r   __getattr__b      z8Orchestrator.__init__.<locals>.ConfigWrapper.__getattr__rJ   )r+   
__module____qualname__rN   rS   rT   r   r   r   r   ConfigWrapper[   s    
rX   TrR   Fr   Z   RABBITMQ_HOST	localhostRABBITMQ_QUEUEstt_jobs)r   rB   _orch_log_path_analytics_log_pathr   rM   
source_bidsource_db_overrideconfig_wrappedr   
db_handlerr   analyzer_load_telephony_source_configanalytics_enabledanalysis_modewebhook_ingest_enabledgroup_filter_enabledallowed_groupnamesmin_call_duration_smin_call_duration_effective_atlookback_days_load_processing_configr#   getenvrabbitmq_hostrabbitmq_queue)rL   r   rX   r   r   r   rN   T   s*   
zOrchestrator.__init__c              
   C   s  z| j   | j | jpi }W n ty/ } ztd| j d|  W Y d}~dS d}~ww t|dp7d	 
 | _|d}|durXt| rStt|nt|| _| jdkr`d| _tt|d	phd
| _| j |d| _| jrtd| j| jrd| jnd |d}|durtd
t|nd
| _| jd
kr| j | j| _td| j| j| j n|d| _|d}|durz
td
t|| _W n ttfy   Y nw tt|dpd
| _| jrtd| j dS dS )zALoad optional per-business controls without changing legacy BIDs.z'Could not read pipeline config for BID : Nrg   rR   rf   transcription_onlyFri   r   rj   z#Group filter enabled for BID %s: %s, z(no groups selected)rk   zCMin call duration for BID %s: %ss (applies to calls from %s onward)rl   rm   rh   u@   Webhook ingest enabled for BID %s — polling ingest is skipped.)rc   %ensure_business_pipeline_config_tableget_pipeline_configr   	Exceptionloggerwarningr   rS   r   lowerrg   isdigitboolintrf   ri   _decode_allowed_groupnamesrj   infor   maxrk    ensure_min_duration_effective_atrl   rm   	TypeError
ValueErrorrh   )rL   cfgeraw_enabledraw_minraw_lookbackr   r   r   rn   w   s^   

"



z$Orchestrator._load_processing_configc              
   C   sb  z| j | jp	g }W n ty* } ztd| j d|  W Y d}~dS d}~ww |s/dS d}|D ]}t|dp<ddkrD|} nq3|sK|d }|dpQi }|d}|d	}|d
}|d}	t|dpmd}
t|dpw| j	 }|r|r|r|	r||
|||	d| _
|| _td|dd d| j d| j  dS td| j d dS )zIPrefer active telephony integration source DB/source_bid when configured.z.Could not read telephony integrations for BID rr   N	is_activer      rM   hostuserpassworddatabaseporti  r`   )r   r   r   r   r   zUsing provider	telephonyz integration source BID z for customer BID z%Active telephony integration for BID zC is missing DB fields; falling back to SYNC_SOURCE_DB_* env config.)rc   list_telephony_integrationsr   rw   rx   ry   r}   rS   r   r   ra   r`   r   )rL   integrationsr   activeitemr   r   r   r   r   r   r`   r   r   r   re      sT   



z*Orchestrator._load_telephony_source_configc              	   C   s,   t j| jj| jj| jj| jj| jjtddS )NT)r   r   r   r   r   cursorclass
autocommit)	pymysqlconnectrM   DB_HOSTDB_PORTDB_USERDB_PASSWORDDB_NAMEr   rL   r   r   r   get_db_connection   s   zOrchestrator.get_db_connectionc                 C   sh   | j rtj| j d t| j d | j d | j d | j d tdS tj| jj| jj| jj| jj	| jj
tdS )Nr   r   r   r   r   )r   r   r   r   r   r   )ra   r   r   r}   r   rM   SYNC_SOURCE_DB_HOSTSYNC_SOURCE_DB_PORTSYNC_SOURCE_DB_USERSYNC_SOURCE_DB_PASSWORDSYNC_SOURCE_DB_NAMEr   r   r   r   get_source_db_connection   s"   z%Orchestrator.get_source_db_connectionc                 C   sv   |d u rd S t |tr|jr|jd dS |S zt|dd}t|}|jr.|jd dW S |W S  ty:   Y d S w )N)tzinfoZz+00:00)
isinstancer   r   replacer   fromisoformatrw   )rL   valuetextparsedr   r   r   _parse_datetime   s   

zOrchestrator._parse_datetimecallc                 C   s   |  |dp|dS )Ncall_starttime	starttime)r   rS   rL   r   r   r   r   _call_starttime  s   zOrchestrator._call_starttimer   c                 C   s   t |S rJ   )shared_call_duration_secondsr   r   r   r   _call_duration_seconds  s   z#Orchestrator._call_duration_secondsc                 C   s   t || j| jS rJ   )shared_is_below_min_durationrk   rl   r   r   r   r   _is_below_min_duration	  s
   z#Orchestrator._is_below_min_durationNaudio_duration_sc                C   s   t || j|dS )Nr   )shared_skip_reasonrk   )rL   r   r   r   r   r   _min_duration_skip_reason  s
   z&Orchestrator._min_duration_skip_reasonNot ingested
log_prefixrecording_urlr   c                C   sT   | j dkrdS t|| j | j|dd\}}}|r&td|d|| d|fS d|fS )zKReturn (should_skip, probed_audio_seconds) using WAV probe when configured.r   )FNT)probe_audioz[%s] %s (%s)callidF)rk   rD   rl   rx   r   rS   )rL   r   r   r   skipreasonprobedr   r   r   _min_duration_blocks_action  s$   
z(Orchestrator._min_duration_blocks_actionc                 C   s   | j sd S dd | jpg D S )Nc                 S   s$   g | ]}t | rt | qS r   )r   r   r   gr   r   r   
<listcomp>5  s   $ z5Orchestrator._active_group_filter.<locals>.<listcomp>)ri   rj   r   r   r   r   _active_group_filter2  s   z!Orchestrator._active_group_filterwhere_partsquery_params
group_exprc                 C   sj   |   }|d u r
d S |s|d d S ddgt| }|d| d| d |dd |D  d S )	Nz1=0rt   %szLOWER(TRIM(z)) IN ()c                 S   s   g | ]}|  qS r   rz   r   r   r   r   r   @      z5Orchestrator._append_group_filter.<locals>.<listcomp>)r   appendr   lenextend)rL   r   r   r   groupsplaceholdersr   r   r   _append_group_filter7  s   
z!Orchestrator._append_group_filterc                 C   sl   | j  d| j  dg}|D ]}|d|f | r'td|  |  S qtd| j  dd| )N_callhistory_call_historyzSHOW TABLES LIKE %szUsing source table: z2No source call history table found for source BID z	. Tried: rt   )r`   executefetchonerx   r   RuntimeErrorr   )rL   source_cursor
candidates
table_namer   r   r   _resolve_source_tableB  s   
z"Orchestrator._resolve_source_tabler   c                 C   sT   | d|f i }| pg D ]}t|dp|dpd}|r'||| < q|S )Nz
            SELECT column_name
            FROM information_schema.columns
            WHERE table_schema = DATABASE() AND table_name = %s
            column_nameCOLUMN_NAMEr   )r   fetchallr   rS   rz   )rL   r   r   colsrownamer   r   r   _source_table_columnsN  s   z"Orchestrator._source_table_columnssource_colsr   c                 G   s*   |D ]}|  }||v r||   S qd S rJ   r   )rL   r   r   	candidaterQ   r   r   r   _first_source_col^  s   zOrchestrator._first_source_colcol_namealiasdefault_sqlc                 C   s4   |r
d| d| S |d ur| d| S d| S )Nc.`z` AS z AS zNULL AS r   )rL   r   r   r   r   r   r   _source_col_expre  s
   
zOrchestrator._source_col_exprc                 C   s   d| j  S )Norchestrator_ingest_)r   r   r   r   r   _ingest_watermark_keyl  rU   z"Orchestrator._ingest_watermark_keyc                 C   s   d }|   }z6| $}|d| j d | }|r(|dr(| |d }W d    n1 s2w   Y  W |  n|  w | j	| j| 
 }|rS| |nd }dd ||fD }|skt jdddddS t|S )Nz/SELECT MAX(call_starttime) AS last_start FROM `_raw_calls`
last_startc                 S   s   g | ]}|r|qS r   r   )r   dr   r   r   r     r   z6Orchestrator._get_ingest_watermark.<locals>.<listcomp>r   )hourminutesecondmicrosecond)r   cursorr   r   r   rS   r   closerc   get_sync_watermarkr   r   nowr   r   )rL   db_max	dest_conndest_cursorresultstored	stored_dtr   r   r   r   _get_ingest_watermarko  s(   
z"Orchestrator._get_ingest_watermarkc                 C   sD   |  |}|s	d S |  }||krd S | j| j|  |  d S rJ   )r   r  rc   set_sync_watermarkr   r   	isoformat)rL   r   new_dtcurrentr   r   r   _advance_ingest_watermark  s   
z&Orchestrator._advance_ingest_watermark2   after_startbefore_startlimitsource_tabler  c             	   C   s  |  |dd}|  |dddd}|  |ddd	d
}	|  |dddd}
|  |d}|  |dddddd}|  |dddd}t|| j||dd\}}}}|  |dddd}|  |d d!d"d#}|  |d$d%d&d'd(}|  |d)d*}|rs|rs|s{td+| d,g }g }|d ur|d-| d. || |d ur|d-| d/ || |
r|d-|
 d0 |  }|d ur|sg S |d ur| ||| |rd1|nd2}|d ur|d u rd3nd4}d5g d6| d7| |dd8 d9| d:| d;| |	d d9| |
dd< d9| |dd= d:| d>| |dd8 d9| |d!d8 d9| |d$ d9| |d) d?| d@| dA| dB| dC| dD}|	|t
| jg| | |g  | prg S )ENr   call_idr   r   
start_timecall_start_timeendtimecall_endtimeend_timecall_end_time
dialstatuscall_status
callstatusstatus	directionfilenamefileurlfileUrlfile_urlr   	audio_url	agentname
callernamecaller_name
agent_namer   
call_aliascallfrom	emp_phoneagent_callinfoagent_phonecalltoclicktocalldidcustomer_callinfocustomer_phoneansweredtimeanswered_timetalktime	talk_timebillsecpulsempulseSource table z missing required columnsr   z` > %sz` <= %sz` IN ('ANSWER') AND z1=1ASCDESCr   z'
            SELECT
                c.`z8` AS callid,
                %s AS bid,
                z''z,
                z,
                c.`z ` AS starttime,
                z'ANSWER'z	'inbound'z` AS filename,
                z
            FROM `z` c
            z
            WHERE z
            ORDER BY c.`z` z
            LIMIT %s
        )r   rH   r`   r   r   r   r   r   r   r   tupler   )rL   r   r  r   r  r  r  
callid_col	start_colend_coldialstatus_coldirection_colfilename_col	agent_colgroup_join_sqlgroup_join_paramsgroupname_select_sqlgroup_filter_exprcallfrom_col
callto_colanswered_col	pulse_colr   r   active_groups	where_sqlorderqueryr   r   r   _fetch_answer_calls_from_source  s   











	


z,Orchestrator._fetch_answer_calls_from_sourcelookback_startc                 C   s   |   }z7| $}|d| j d|f dd | pg D W  d    W |  S 1 s0w   Y  W |  d S |  w )NzSELECT callid FROM `z&_raw_calls` WHERE call_starttime >= %sc                 S   "   h | ]}| d rt|d  qS r   rS   r   r   r   r   r   r   	<setcomp>  s    
zBOrchestrator._load_existing_callids_in_lookback.<locals>.<setcomp>)r   r   r   r   r   r   )rL   rN  connr   r   r   r   "_load_existing_callids_in_lookback  s   


z/Orchestrator._load_existing_callids_in_lookbackc           	   
   C   s   z1| d| d| d| d|f | pi }t|dpd}|r/td| j|| W d S W d S  tyJ } ztd| W Y d }~d S d }~ww )	Nz>
                SELECT COUNT(*) AS cnt
                FROM `z` c
                WHERE c.`z ` > %s
                  AND c.`z` = 'ANSWER'
                cntr   ztNo calls ingested for BID %s although %s ANSWER call(s) exist after watermark %s (check group filter / min duration)z+Could not log ingestion skip diagnostic: %s)	r   r   r}   rS   rx   r   r   rw   ry   )	rL   r   r  r;  r=  	watermarkr   pendingr   r   r   r   _log_ingestion_skip_diagnostic  s2   	
z+Orchestrator._log_ingestion_skip_diagnosticc              	   C   s   |sg S t | }|dr|gS zt|t rt|d}n|}|d}|d}W n ty8   |g Y S w | jp>| j	}t
j|}d| d| d| d| d| d| d| d| gS )	Nhttp%Y-%m-%d %H:%M:%Sz%Yz%mz4https://recordings.mcube.com/mcubefiles112/appmcube//z3https://recordings.mcube.com/mcubefiles112/classic/z	/inbound/)r   r   
startswithr   r   strptimestrftimerw   r`   r   r#   r$   basename)rL   r  r   dtyearmonthr   r   r   r   r   _recording_url_candidates  s&   



z&Orchestrator._recording_url_candidatesc                 C   s$   |  ||}|r|d S t|pdS )Nr   r   )rd  r   )rL   r  r   r   r   r   r   _get_full_url)  s   zOrchestrator._get_full_url   c           (      C   s  t  d}td|  td| j d| j d| j  td|  z|  }t  t	| j
d }td| d	| j
 d
| d |  }g }z| }| |}| ||}	| |	dd}
| |	dddd}| |	dddd}|
std| d|std| d|  }t|| j||	dd\}}}}|dur|std| j| 	 W d   W |  W dS | j|||	||d}t|}dd  |D }td|t| }|dkr:| j
dkr:| |}| j|||	||t|d! d"d#}d}|D ]/}t|dpd$}|r||v s||v rq|| || |d%7 }||kr- nq|r:td&|| j
| |sF|  ||||| W d   n	1 sQw   Y  W |  n|  w td't|  |rd(!d)d* |D }td+|  |sW dS | " }z| }d,| j d-}d}d}d}|D ]}|d} | #| }!|!r|du s|!|kr|!}|d | |d.|d/|d0|d1d2}i ||}"| $|d3 |d }#| j%|"|#d4d5\}$}%|$r|d%7 }q|%durtdt&t'|%}&nt(|"}&|)||d6p| j|d |#dt|d7p!d$t|d8p*d$|d |d.|dp9d9|d:p@d$|d;pGd$t|d<pOd=* + pWd=ddd|&f |j,d%krh|d%7 }q|rr| -| |rtd>|| j.| j td?| d@ |W  d   W |  W S 1 sw   Y  W |  W dS |  w  t/y }' ztdA|'  W Y d}'~'dS d}'~'ww )Bz>Fetch calls from source DB newer than the latest local record.r[  zOrchestration initiated at zLogs for BID u   : orchestration → u   ; analytics → z1Initial limit for extraction/ingestion is set to )dayszFound local watermark: z (lookback zd from r   r   r  r   r   r  r  r  r  r  r  r5  z has no callid/call_id columnz' has no starttime/call_starttime columnr   r$  NzjGroup filter is enabled for BID %s, but source table %s has no resolvable group column. Ingesting 0 calls.r   )r  r  c                 S   rO  rP  rQ  r   r   r   r   rS  h     " z,Orchestrator.ingest_calls.<locals>.<setcomp>      r
  r   r   z[Gap-fill backfill: found %s missed ANSWER call(s) within last %s days (before watermark %s)z"Number of records to be ingested: rt   c                 S   s   g | ]}t |d  qS rP  )r   r   r   r   r   r     s    z-Orchestrator.ingest_calls.<locals>.<listcomp>z!Call IDs that would be ingested: z&
                        INSERT INTO `aX  _raw_calls`
                        (bid, callid, fileurl, status, agentname, groupname, call_starttime, call_endtime,
                         call_status, agent_callinfo, customer_callinfo, direction,
                         transcription_requested, transcription_status, selected_for_processing,
                         duration_seconds)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                        ON DUPLICATE KEY UPDATE
                        fileurl = VALUES(fileurl),
                        agentname = VALUES(agentname),
                        groupname = VALUES(groupname),
                        call_starttime = VALUES(call_starttime),
                        call_endtime = VALUES(call_endtime),
                        call_status = VALUES(call_status),
                        agent_callinfo = VALUES(agent_callinfo),
                        customer_callinfo = VALUES(customer_callinfo),
                        direction = VALUES(direction),
                        duration_seconds = VALUES(duration_seconds)
                    r  duration_secondsr.  r3  )r   r   r  rk  r.  r3  r  r   r   r   r   	groupnameANSWERr'  r+  r  inboundzNDid not ingest %s call(s) below min duration %ss for BID %s (not stored in DB)zSuccessfully ingested z calls into local DB.zError during ingestion: )0r   r   r_  rx   r   r   r^   r_   r  r   rm   r   r   r   r   r   r   r   rH   r`   errorr   rM  listr   r   rU  minr   rS   r   addrY  r   r   r   re  r   r}   roundr   r   r   rz   rowcountr  rk   rw   )(rL   r  current_time_strrW  rN  source_conncallsr   r  r   r:  r;  r=  rI  _rD  forward_callsseen_ids	remainingexisting_idsbackfill_candidates
backfilledr   cidcall_ids_strr   r   insert_queryinsertedskipped_shortmax_batch_startr   r   parsed_startcall_for_durationr   skip_minprobed_audioeffective_durationr   r   r   r   ingest_calls-  s^  




L




L



 
\\zOrchestrator.ingest_callsc                 C   s   dt |dpdv S )N/recording-uploads/r  r   )r   rS   r   r   r   r   _is_manual_upload  s   zOrchestrator._is_manual_uploadurlc                 C   s   d}t |pd}||vrd S t||dd ddd }tjtjtjtd}tjtj||}|	|tj
 sF||krFdS tj|rTtj|dkrVdS dS )	Nr  r   r   ?r   recording_uploadsFT)r   r   splitr#   r$   r&   r   r%   r'   r]  sepisfilegetsize)rL   r  markerrelr8   	full_pathr   r   r    _validate_local_recording_upload  s    z-Orchestrator._validate_local_recording_uploadc                 C   sF   t t|pd}|jpd dkrdS |jpd }d|v o"|dS )Nr   zrecordings.mcube.comFz/mcubefiles)z.wavz.mp3z.oggz.mpegz.m4az.webmz.flac)r	   r   hostnamerz   r$   endswith)rL   r  r   r$   r   r   r   _is_trusted_mcube_recording_url  s   z,Orchestrator._is_trusted_mcube_recording_urlc                 C   s   t |pd }|dsdS z'tjdddddd	|gd
d
d}|jdkr5tdd |jp,d D r5W d
S W n	 t	y?   Y nw z+tjddddd	dddddd|gd
d
d}|jdkrf|jp^d dv riW d
S W dS W dS  t	yu   Y dS w )Nr   )zhttp://zhttps://Fcurlz-Iz-Lz
--max-time20z-sT)capture_outputr   r   c                 3   s0    | ]   d ot fdddD V  qdS )zHTTP/c                 3       | ]}| v V  qd S rJ   r   )r   codeliner   r   r          zCOrchestrator._http_recording_reachable.<locals>.<genexpr>.<genexpr>)z 200z 206N)r]  any)r   r   r  r   r     s
    
z9Orchestrator._http_recording_reachable.<locals>.<genexpr>z-oz	/dev/nullz-wz%{http_code}z-rz0-1023)200206)
r   r   r]  
subprocessrun
returncoder  stdout
splitlinesrw   )rL   r  headrangedr   r   r   _http_recording_reachable  sZ   
z&Orchestrator._http_recording_reachablec                 C   s   |  |}|d ur|S |r| |rtd|d| dS | |r%dS tdd dv }|rF| 	|rFtd|r@|dnd| dS | 	|rYtd	|rU|dnd| d
S )Nz3[%s] Trusting manual upload URL after DB insert: %sr   TORCHESTRATOR_TRUST_MCUBE_URLSr   )1trueyesz^[%s] HTTP probe inconclusive; trusting Mcube recording URL (ORCHESTRATOR_TRUST_MCUBE_URLS): %sr  z@[%s] Mcube recording not reachable yet; will retry URL check: %sF)
r  r  rx   r   rS   r  r#   ro   rz   r  )rL   r  r   local_resulttrust_mcuber   r   r   validate_url?  s.   


zOrchestrator.validate_urlr   
resp_aliasc                 C   s   | d| dS )Nz!.transcript IS NOT NULL AND TRIM(z.transcript) != ''r   )rL   r  r   r   r   _has_transcript_sql\  s   z Orchestrator._has_transcript_sqlc              	   C   sJ   t |pd}|dr#zt|ddd W S  ttfy"   Y dS w dS )Nr   
stt_retry::r   r   )r   r]  r}   r  r   r   )rL   transcription_statustsr   r   r   _parse_stt_retry_count_  s   
z#Orchestrator._parse_stt_retry_countc                 C   s   t |dpd}|dsdS | |d}|sdS tdttdd}t	 | 
 d	 }||k rBtd
|d|| dS dS )Nr  r   r  F
updated_atr   STT_RETRY_COOLDOWN_MINUTES10      N@z?[%s] STT retry cooldown active (%.0fm / %sm); skipping re-queuer   T)r   rS   r]  r   r   r}   r#   ro   r   r   total_secondsrx   r   )rL   r   r  r  cooldown_minutesage_minutesr   r   r   _stt_retry_cooldown_activeh  s"   
z'Orchestrator._stt_retry_cooldown_activeexcc                    s&   t |  d}t fdd|D S )N)zcould not connecttimeoutz	timed outthrottlz
rate limitzservice unavailablezconnection resetzendpoint urlc                 3   r  rJ   r   )r   r  msgr   r   r     r  z=Orchestrator._is_transient_analytics_error.<locals>.<genexpr>)r   rz   r  )rL   r  transient_markersr   r  r   _is_transient_analytics_error{  s   
z*Orchestrator._is_transient_analytics_errorc           
         s  d| j  dd| j  d}| d}tdttdd}dddddd	|  }z|   d
 d| d| d  j	d< d( fdd	}|ddd| d| d |ddd| d| d |ddd| d| d| d  d df  
 pg D ]:}t|dpd}|d }	|r|	sq| |	||	d!rÈ d" d#|f  j	rÈd$  d7  < td%| qW d&   n1 sw   Y  |  W |  n|  w t rtd'| j  S ))z;Fix common status mismatches so STT/analytics can progress.`r   _sarvamresponse`r   ri  $ORCHESTRATOR_RETRY_STT_AFTER_MINUTES15r   )promoted_to_transcribedreset_phantom_transcribedreset_stuck_queuedreset_stt_failedreset_invalid_url_revalidated
                    UPDATE z" r
                    INNER JOIN zq s ON r.callid = s.callid
                    SET r.status = 2
                    WHERE r.status IN (1, -2) AND 
                    r  r   c                    s|     d d| d| f dd   pg D }|sd S ddgt| }  d d	| d
t|  j|< d S )Nz.
                        SELECT r.callid FROM z r
                        WHERE r.status = %s
                          AND r.fileurl IS NOT NULL AND r.fileurl != ''
                          ze
                        ORDER BY r.id DESC
                        LIMIT %s
                        c                 S   s"   g | ]}| d rt|d  qS rP  rQ  rR  r   r   r   r     rh  zOOrchestrator.repair_pipeline_state.<locals>._reset_statuses.<locals>.<listcomp>rt   r   UPDATE z! SET status = 0 WHERE callid IN (r   )r   r   r   r   r9  rt  )from_statusstat_keyextra_wherecall_idsr   r   r  	raw_tablestatsr   r   _reset_statuses  s"   z;Orchestrator.repair_pipeline_state.<locals>._reset_statuses   r  zN
                      AND NOT EXISTS (
                        SELECT 1 FROM 9 s
                        WHERE s.callid = r.callid AND z-
                      )
                    r   r  r  z
                      AND COALESCE(r.transcription_status, '') != 'backlog_cleared'
                      AND (r.updated_at IS NULL OR r.updated_at < DATE_SUB(NOW(), INTERVAL zW MINUTE))
                      AND NOT EXISTS (
                        SELECT 1 FROM zE
                    SELECT callid, fileurl
                    FROM z
                    WHERE status = -1
                      AND fileurl IS NOT NULL AND TRIM(fileurl) != ''
                    ORDER BY id DESC
                    LIMIT %s
                    r   r  )r   r  r  z1 SET status = 0 WHERE callid = %s AND status = -1r  u6   [%s] Re-validated recording URL; reset status -1 → 0Nz$Pipeline state repair for BID %s: %s)r   )r   r  r   r}   r#   ro   r   r   r   rt  r   r   rS   r  rx   r   commitr   r  values)
rL   r  
resp_tablehas_txretry_after_minutesrT  r  r   r   r  r   r  r   repair_pipeline_state  s   







Zz"Orchestrator.repair_pipeline_statec                 C   sr  d| j  d}d| j  d}d}d}d}z)ttj| jd}| }|j| jddd}t|j	j
}t|j	j}|  W n	 tyF   Y nw td	ttd
d}	|  }
d}zN|
 8}d}tdttdd}|dkry||kryd|	 d}|d| d| d| d| d	 |j}W d   n1 sw   Y  |
  W |
  n|
  w |rtd| j || |S )z>Reset status=1 calls that have no transcript and no queue job.r  r   r  z5s.transcript IS NOT NULL AND TRIM(s.transcript) != ''r   r   T)queuedurablepassiveri  "ORCHESTRATOR_PHANTOM_QUEUE_MINUTES12r   r   STT_WORKER_COUNT4zs
                      AND (updated_at IS NULL
                           OR updated_at < DATE_SUB(NOW(), INTERVAL z MINUTE))
                    r  ai   r
                    SET r.status = 0,
                        r.transcription_status = 'not_requested',
                        r.transcription_requested = 0
                    WHERE r.status = 1
                      AND COALESCE(r.transcription_status, '') != 'backlog_cleared'
                      AND NOT EXISTS (
                        SELECT 1 FROM r  z/
                      )
                      r  NzRBID %s: healed %s phantom queued call(s) (status 1, no transcript, queue depth=%s))r   pikaBlockingConnectionConnectionParametersrp   channelqueue_declarerq   r}   methodmessage_countconsumer_countr   rw   r   r#   ro   r   r   r   rt  r  rx   r   )rL   r  r  r  queue_depth	consumersrmq_connchqphantom_minutesrT  healedr   extratarget_workersr   r   r   _heal_phantom_queued_calls  sb   
	
z'Orchestrator._heal_phantom_queued_callsc              
   C   s   zF|   }| }|d| j d | pg }W d    n1 s$w   Y  |  |rDddd |D }td| j| W d S W d S  t	ya } zt
d| j| W Y d }~d S d }~ww )NzN
                    SELECT status, COUNT(*) AS cnt
                    FROM `zw_raw_calls`
                    WHERE status IN (0, 1, -1, -2)
                    GROUP BY status
                    rt   c                 s   s(    | ]}d |d  d|d  V  qdS )zstatus r  =rV  Nr   )r   rr   r   r   r   B  s   & z?Orchestrator._log_transcription_backlog_hint.<locals>.<genexpr>z$Transcription backlog for BID %s: %sz7Could not log transcription backlog hint for BID %s: %s)r   r   r   r   r   r   r   rx   r   rw   ry   )rL   rT  r   rowssummaryr   r   r   r   _log_transcription_backlog_hint3  s&   

z,Orchestrator._log_transcription_backlog_hintc                 C   sL  t |d }|d }| j||dd\}}|rM|  }z-| }t|| j||| j| j|d W d    n1 s8w   Y  |  W |	  dS |	  w |d ur|  }z2| }|
d| j dtd	tt||f W d    n1 syw   Y  |  W |	  n|	  w td
| d |  }zz*| }d| j d}|
d| d|f | rtd
| d| d |
d| j d|f 	 W d    W W |	  dS |
d| j d|f | pi }	t|	dd ur|	dnd}
|
dv r$td
| d|
 d 	 W d    W W |	  dS t |	dp,ddkrItd
| d 	 W d    W W |	  dS |
d | j d!|f |jd"k rstd
| d# 	 W d    W W |	  dS ttj| jd$}| }|j| jdd% | j||d&}|jd| jt|tjd'd(d) |	  td
| d* 	 W d    W W |	  dS 1 sw   Y  W nR ty } zEtd
| d+|  z#| }|
d| j d,|f W d    n	1 sw   Y  W n
 ty   Y nw W Y d }~W |	  dS d }~ww W |	  d S |	  w )-Nr   r  zSkipping transcriptionr   r   Fz!
                        UPDATE `z_raw_calls`
                        SET duration_seconds = %s
                        WHERE callid = %s
                        r   [z] Triggering transcription...r  r  z$
                    SELECT id FROM z
                    WHERE callid = %s
                      AND transcript IS NOT NULL
                      AND TRIM(transcript) != ''
                    z"] Transcription already exists in z. Skipping queuing.UPDATE `z,_raw_calls` SET status = 2 WHERE callid = %sTz*SELECT status, transcription_status FROM `%_raw_calls` WHERE callid = %s LIMIT 1r  i)r   r  rf  z*] Skip RabbitMQ publish (raw_calls status=z already queued or done).r  r   backlog_clearedz;] Skip RabbitMQ publish (backlog_cleared terminal failure).z
                    UPDATE `z_raw_calls`
                    SET status = 1
                    WHERE callid = %s
                      AND status IN (0, -2)
                      AND COALESCE(transcription_status, '') != 'backlog_cleared'
                    r   z6] Skip RabbitMQ publish (status changed concurrently).r  )r  r  )r   r  r   r  )delivery_mode)exchangerouting_keybody
propertiesz(] Successfully queued for transcription.z!] Failed to queue transcription: z_raw_calls`
                        SET status = 0
                        WHERE callid = %s AND status = 1
                        ) r   r   r   r   rF   r   rk   rl   r  r   r   r   r}   rs  rx   r   r   rS   rt  r  r  r  rp   r  r  rq   basic_publishjsondumpsBasicPropertiesrw   ro  )rL   r   r  r   r  r  rT  r   r  r  current_statusr  r  job_payloadr   r   r   r   trigger_transcriptionG  s   






		Q"
QQ
Q
QA
	Qz"Orchestrator.trigger_transcriptionr  
transcriptc           	   
   C   s   zX| j | jp	i }t|dpddkrW dS ddlm} | j | j|}|s+W dS || j||| jd}|s:W dS | j 	| j||}|rVt
d||d|d	 W dS W dS  tyr } zt
d
|| W Y d}~dS d}~ww )z;Non-fatal sales propensity scoring after quality analytics.propensity_enabledr   r   N)score_call_propensity)analytics_contextrM   z [%s] Propensity score=%s band=%spropensity_scorepropensity_bandz/[%s] Propensity scoring skipped (non-fatal): %s)rc   rv   r   r}   rS   propensity_scoringr  get_call_analyticsrb   save_propensity_analyticsrx   r   rw   ry   )	rL   r  r  r   r  	analyticspayloadsavedr  r   r   r   _run_propensity_if_enabled  s:   
z'Orchestrator._run_propensity_if_enabledc                 C   s"  t d| d z|  }zw| e}|d| j d|f | }|rIt|dp-ddkrIt d| d 	 W d    W |	  W d	S |d
| j d|f | rqt d| d 	 W d    W |	  W d	S W d    n1 s{w   Y  W |	  n|	  w W n t
y } zt d| d|  W Y d }~nd }~ww tdttdd}d }t|D ]}zq| j| j|}	|	r|	dst d| d W  dS |	d }
|	d}|rt|trt|}|	d}| jj| j||
|pg |rt|nd d}t d| d|d d |  }zs| F}d }|rCz
ttt|}W n ttfyB   d }Y nw |d urW|d| j d|||f n|d| j d|f W d    n	1 snw   Y  td| j d| d |d  t d!|  W |	  n|	  w | ||
 td"d# d$vrz dd%l m!}m"} | j#| j|}|r|| j|||| j W n t
y } zt $d| d&|  W Y d }~nd }~ww td'd# d$vr-zdd(l%m&} || jj'| j||d)d* W W  d	S  t
y, } zt $d| d+|  W Y d }~W  d	S d }~ww W  d	S  t
y} } z@|}||d k rf| (|rft)d,d-| }t d.||d ||| t*+| W Y d }~qt d| d/|  W Y d }~ dS d }~ww |rt d| d0| d1|  dS )2Nr  z] Triggering analytics...zSELECT status FROM `r	  r  r   rf  z'] Already analyzed (status=3), skippingTzSELECT id FROM `z)_callanalytics` WHERE callid = %s LIMIT 1z(] Analytics row already exists, skippingz)] Could not check analytics idempotency: r   ANALYTICS_MAX_RETRIES3r  z(] No transcript found in response table.Fspeaker_segmentsduration)r   r   r  r&  actual_durationz%] Analytics complete. Quality Score: quality_score%z)
                                UPDATE `a)  _raw_calls`
                                SET status = 3,
                                    duration_seconds = %s,
                                    call_endtime = DATE_ADD(call_starttime, INTERVAL %s SECOND)
                                WHERE callid = %s
                                r  z,_raw_calls` SET status = 3 WHERE callid = %sz&SUCCESS: Analytics created/updated in z_callanalytics for callid=z with Quality Score=z"Analytics is completed for record ORCHESTRATOR_PUSH_LEADSQUAREDr  )0falseno)call_dict_for_lsq_pushpush_leadsquared_activitiesz] LeadSquared push failed: ORCHESTRATOR_PUSH_API)ApiPushServiceanalytics_saved)analytics_datatrigger_eventz] API push failed:    r  uC   [%s] Analytics transient error (attempt %s/%s): %s — retry in %ssz] Analytics failed: z] Analytics failed after z attempts: ),rx   r   r   r   r   r   r   r}   rS   r   rw   ry   r   r#   ro   rangerc   get_call_transcriptro  r   r   r  loadsrd   analyze_callfloatrs  r   r   analytics_loggerr#  rz   r"   r/  r0  get_raw_call_details	exceptionapi_push_servicer2  push_call_updater  rq  timesleep)rL   r  rT  r   r   r  max_attempts
last_errorattempttranscript_datar  r&  r'  r   rk  r/  r0  raw_callr2  r   wait_sr   r   r   trigger_analytics  s  
"




"
	 
zOrchestrator.trigger_analyticsc           
      C   s   | j sdS ddlm} d| j d}d| j d}|d| d| d|f | p,g }d}|D ]}t|d	p:d
 }	|	rJ|| j|	rJ|d7 }q1|rUt	
d| j| |S )z@Backup: re-queue status=2 calls missed by STT analytics publish.r   )publish_analytics_jobr  r   z_callanalytics`z.
            SELECT r.callid
            FROM z r
            LEFT JOIN z a ON r.callid = a.callid
            WHERE r.status = 2 AND a.id IS NULL
            ORDER BY r.id DESC
            LIMIT %s
            r   r   r   z>BID %s: enqueued %s status=2 call(s) to analytics_jobs backlog)rf   analytics_queuerJ  r   r   r   r   rS   r   rx   r   )
rL   r   r  rJ  r  analytics_tabler  queuedr   r  r   r   r   _enqueue_analytics_backlogq  s6   z'Orchestrator._enqueue_analytics_backlog
   c                 C   s  t d| j d|  | j|d |   |   | jr%t d| j n| j|d |  }z|	 }d| j d}g d}|
d| d	d
| d| j|f | }g }|D ]}| |req]|d }	|	}
| |	|d D ]}||	krt d|d  d|  | ||r|}
 nqs|
|	krt d|d  d|
  |
d| j d|
|d f |
|d< | |d |stdttdd}| |d}d }|rt |  d }|d ur||k rt d|d |||d  nt d|d  d|d   |
d| j d|d f q]| j||d dd\}}|r3t|| j|d || j| j|d q]|| q]t dt | d |D ]}| !| qF| j"sjt d | j| j# 	 W d    W |$  d S d!d"l%m&} | sx| jrt d#| j | '|| 	 W d    W |$  d S |
d$| d%|f | }t dt | d& |D ]}t(|d }t d'|  | )| qW d    n	1 sw   Y  W |$  n|$  w t d( d S ))NzStarting orchestration for BID z, limit r  z;Skipping polling ingest for BID %s (webhook ingest enabled)r  r   )z
status = 0zfileurl IS NOT NULLzfileurl != ''z7COALESCE(transcription_status, '') != 'backlog_cleared'z2call_starttime >= DATE_SUB(NOW(), INTERVAL %s DAY)zSELECT * FROM z WHERE r6  z ORDER BY id DESC LIMIT %sr  r   r  r   z"] Trying alternate recording URL: z] Repaired truncated URL: r  z._raw_calls` SET fileurl = %s WHERE callid = %sri  ORCHESTRATOR_URL_RETRY_MINUTES45r  z][%s] Recording URL not ready yet (age=%.0fm < %sm); will retry instead of marking invalid: %sz] Invalid file URL: z-_raw_calls` SET status = -1 WHERE callid = %szSkipping STT queuer   r   zFound z( valid calls to queue for transcription.z[Analytics is disabled for BID %s (analysis_mode=%s). Leaving transcribed calls at status=2.r   )event_driven_analytics_enabledzTSkipping orchestrator batch analytics for BID %s (event-driven analytics_jobs queue)zSELECT callid FROM z+ WHERE status = 2 ORDER BY id DESC LIMIT %sz calls ready for analytics.z&Transcription is completed for record z3This instance of the orchestrator job is completed.)*rx   r   r   r  r  r  rh   r  r   r   r   r   rm   r   r  rd  r  r   r}   r#   ro   r   rS   r   r   r  ry   r   rF   rk   rl   r   r   r  rf   rg   r   rK  rS  rN  r   rI  )rL   r  rT  r   r  r   r   valid_callsr   current_urlrepaired_urlcandidate_urlretry_minutes
call_startr  r  r  rS  analyzable_callsr  r   r   r   r    s   

	
	uu
uzOrchestrator.runrJ   )r   N)rf  )r   )r	  )rO  );r+   rV   rW   rN   rn   re   r   r   r   dictr   r   r}   r   r|   r   r   r   r9  r;  r   r   rp  r   r   r   r   r   r   r  r  rM  r   setrU  rY  r   rd  re  r  r  r  r  r  r  r  r  r  rw   r  r   r  r  r  r  r#  rI  rN  r  r   r   r   r   rI   S   s    #4.
$
	
Z

 O	
*	q
9 ! 	!rI   __main__z--bid1713)rR   z--limitrO  )typerR   rP  )=r  r(   r#   r  r   r  r4   rA  r   r   typingr   r   r   r   urllib.parser   r	   dotenvr
   pymysql.cursorsr   analyze_calls_with_parametersr   rM   r   rc   r   r   r   r9  rB   r*   r+   rx   r6   NullHandlerr<  min_duration_utilrC   r   rD   rE   r   rF   rG   r   mcube_group_utilrH   rI   argparseArgumentParserparseradd_argumentr}   
parse_argsargsr   orchr  r  r   r   r   r   <module>   s\   
%
           T
