o
    WF)jaj                     @  sx  U d Z ddlmZ ddlZddlZddlZddlZddlmZ ddlm	Z	m
Z
mZmZmZ ddlmZ ddlZddlZddlmZ ddlmZmZmZ eeZd	d
ddddddddddddddddddddddddddd d!Zd"ed#< dSd(d)ZdTd-d.ZdUd0d1ZdVd3d4Z dWd8d9Z!dXd;d<Z"dYd?d@Z#dYdAdBZ$dZdEdFZ%d[dJdKZ&d\dLdMZ'd]dOdPZ(G dQdR dRZ)dS )^zXUniversal per-BID call ingest webhook (additive; does not replace orchestrator polling).    )annotationsN)datetime)AnyDictListOptionalTuple)urlparse)
DictCursor)call_duration_seconds evaluate_min_duration_for_ingestpurge_unprocessed_if_below_min1zUPCAA universal call ingest. POST JSON to the per-BID URL with X-Ingest-Secret header.u!   string — unique call identifieru1   string — must be ANSWER to ingest and queue STTu*   string — YYYY-MM-DD HH:MM:SS or ISO-8601u'   string — full HTTPS URL to audio file)call_idcall_statuscall_start_timerecording_urlstringnumberzinbound | outboundu%   string — e.g. mcube, exotel, manual)call_end_timeduration_seconds
agent_name
group_name	directionagent_phonecustomer_phonesource97393902541780895736ANSWERz2026-06-08 10:45:38z2026-06-08 10:50:36i*  zYhttps://recordings.mcube.com/mcubefiles112/appmcube/2026/06/6004/97393902541780895736.wavzRicha vishwakarmazPresales Teamoutbound
9XXXXXXXXXmcube)r   r   r   r   r   r   r   r   r   r   r   r   )versiondescriptionrequired_fieldsrecommended_fieldsexampleDict[str, Any]CANONICAL_INGEST_SCHEMAbidstrpublic_base_urlreturnc                 C  s^   |pd d}d|  d}i tt| d|r| | n|dddd	d
idddddddS )N /z/api/v1/bids/z/calls/ingestPOSTzapplication/jsonu=   <shared INGEST_SECRET from server .env — same for all BIDs>)zContent-TypezX-Ingest-SecretzX-Webhook-Secretz"Accepted alias for X-Ingest-Secretz(Accepted (processed, queued, or skipped)zInvalid or missing secretz$Webhook ingest disabled for this BIDzMalformed payloadzBID raw_calls table missing)200401403400404)r)   methodurlheadersaliases	responses)rstripr(   r*   )r)   r+   basepath r=   >/home/aiteam/pcaa-dev/dashboard-backend/call_ingest_service.pyingest_schema_for_bid=   s(   r?   payloadkeysr   c                 G  sB   |D ]}|| v r| | d urt | |  dkr| |   S qd S )Nr-   )r*   strip)r@   rA   keyr=   r=   r>   _first_valueV   s
   (rD   Optional[datetime]c                 C  sv   | d u rd S t | tr| jr| jd dS | S zt| dd}t|}|jr.|jd dW S |W S  ty:   Y d S w )N)tzinfoZz+00:00)
isinstancer   rF   replacer*   fromisoformat	Exception)valuetextparsedr=   r=   r>   _parse_datetime]   s   

rO   Optional[str]c                 C  s.   t | }|s| d urt|  S d S |dS )Nz%Y-%m-%d %H:%M:%S)rO   r*   rB   strftime)rL   rN   r=   r=   r>   _format_db_datetimej   s   
rR   filename
source_bid	List[str]c              	   C  s   t | pd } | sg S | dr| gS zt|}|s| gW S |d}|d}W n ty6   | g Y S w tj| }t |pAd }d| d| d| d| d| d| d| d| gS )	Nr-   httpz%Yz%mz4https://recordings.mcube.com/mcubefiles112/appmcube/r.   z3https://recordings.mcube.com/mcubefiles112/classic/z	/inbound/)	r*   rB   
startswithrO   rQ   rK   osr<   basename)rS   	starttimerT   dtyearmonthnamer)   r=   r=   r>   _recording_url_candidatesq   s&   


r_    Tuple[Dict[str, Any], List[str]]c                C  s  g }t | ddd}t | ddd}t | ddd	d
}t | dddd}t | ddddd}t | dddd}|s8|d |s?|d |sF|d |sM|d t|pQd }	t|}
|	rp|	dspt|	|
pe||}|rn|d n|	}	tt | dpwd   p~d }t|pd t|pd  |
t||	tt | d!d"d#pdtt | d$d%pd|tt | d&d'd(pdtt | d)d*d+pdd,tt | d-d.pdd/}|d,urt| dkrzt	dt
t||d< W n ttfy   Y nw |d d,u rt|d |d |d0|d< ||fS )1z?Map canonical or common alias fields to raw_calls column names.r   callidcallIdr   
dialstatus
callStatusr   call_starttimerZ   
call_startr   call_endtimeendtimecall_endr   fileurlfileUrlfile_urlrS   r   durationansweredtimetalktimezcall_id is requiredzcall_status is requiredzcall_start_time is requiredzrecording_url is requiredr-   rV   r   r   inboundr   	agentname
callernamer   	groupnamer   agent_callinfo	emp_phoner   customer_callinfoclicktocalldidNr   provider)ra   r   re   rg   rj   rq   rs   r   rt   rv   r   r   )re   rg   r   )rD   appendr*   rB   rR   rW   r_   loweruppermaxintfloat	TypeError
ValueErrorr   )r@   rT   errorsr   r   rf   ri   	recordingrm   rj   	start_fmt
candidatesr   
normalizedr=   r=   r>   normalize_ingest_payload   sd   



r   r6   boolc                 C  sF   t t| pd}|jpd dkrdS |jpd }d|v o"|dS )Nr-   zrecordings.mcube.comFz/mcubefiles)z.wavz.mp3z.oggz.mpegz.m4az.webmz.flac)r	   r*   hostnamerz   r<   endswith)r6   rN   r<   r=   r=   r>   _is_trusted_mcube_recording_url   s   r   c                 C  s   t | pd } | dsdS z'tjdddddd	| gd
d
d}|jdkr5tdd |jp,d D r5W d
S W n	 t	y?   Y nw z+tjddddd	dddddd| gd
d
d}|jdkrf|jp^d dv riW d
S W dS W dS  t	yu   Y dS w )Nr-   )zhttp://zhttps://Fcurlz-Iz-Lz
--max-time20z-sT)capture_outputrM   r   c                 3  s0    | ]   d ot fdddD V  qdS )zHTTP/c                 3  s    | ]}| v V  qd S Nr=   ).0codeliner=   r>   	<genexpr>   s    z6_http_recording_reachable.<locals>.<genexpr>.<genexpr>)z 200z 206N)rW   any)r   r=   r   r>   r      s
    
z,_http_recording_reachable.<locals>.<genexpr>z-oz	/dev/nullz-wz%{http_code}z-rz0-1023)r0   206)
r*   rB   rW   
subprocessrun
returncoder   stdout
splitlinesrK   )r6   headrangedr=   r=   r>   _http_recording_reachable   sZ   
r   rj   ra   c                 C  s@   t | rdS tdd dv }|rt| rtd| dS dS )NTORCHESTRATOR_TRUST_MCUBE_URLSr-   )r   trueyeszA[%s] Trusting Mcube recording URL (ORCHESTRATOR_TRUST_MCUBE_URLS)F)r   rX   getenvrz   r   loggerinfo)rj   ra   trust_mcuber=   r=   r>   recording_url_ready   s   r   rs   cfgTuple[bool, Optional[str]]c                 C  sj   t t|dpdsdS dd |dpg D }|sdS t| p!d  }||vr3d	d
|  dfS dS )Ngroup_filter_enabledr   )TNc                 S  s(   g | ]}t | rt |  qS r=   )r*   rB   rz   )r   gr=   r=   r>   
<listcomp>  s    
z"_group_allowed.<locals>.<listcomp>_allowed_groupnames)Fz5group_filter_enabled but no allowed groups configuredr-   Fzgroup 'z' not in allowed list)r   r}   getr*   rB   rz   )rs   r   allowedr^   r=   r=   r>   _group_allowed  s   r   c                   C  s    t dd pt dd S )z9One platform-wide secret (Mcube sends this for all BIDs).INGEST_SECRETr-   WEBHOOK_SECRET)rX   r   rB   r=   r=   r=   r>   shared_ingest_secret  s   r   providedc                 C  sD   | sdS t  }|r| |krdS t|dpd }t|o | |kS )NFTingest_secretr-   )r   r*   r   rB   r   )r   r   global_secretper_bidr=   r=   r>   _verify_secret  s   r   c                   @  sh   e Zd Zdd Zdd Zd+d	d
Zd,ddZd-ddZd.ddZddddd/d%d&Z	dd'd0d)d*Z
dS )1CallIngestServicec                 C  s
   || _ d S r   )
db_handler)selfr   r=   r=   r>   __init__&  s   
zCallIngestService.__init__c              	   C  sR   t j| jjdt| jjdpd| jjd| jjd| jjdtddS )	NDB_HOSTDB_PORTi  DB_USERDB_PASSWORDDB_NAMET)hostportuserpassworddatabasecursorclass
autocommit)pymysqlconnectr   configr   r}   r
   )r   r=   r=   r>   _db_conn)  s   zCallIngestService._db_connr)   r*   r,   r'   c                 C  s`   | j   | j |pi }tdt|dpddkr#| j ||d< | j |d|d< |S )Nr   min_call_duration_smin_call_duration_effective_atallowed_groupnamesr   )r   %ensure_business_pipeline_config_tableget_pipeline_configr|   r}   r    ensure_min_duration_effective_at_decode_allowed_groupnames)r   r)   r   r=   r=   r>   _load_bid_config4  s   
z"CallIngestService._load_bid_configr   c                 C  s$   | d}| d|f | d uS )N
_raw_callszSHOW TABLES LIKE %s)executefetchone)r   cursorr)   
table_namer=   r=   r>   _raw_table_exists>  s   
z#CallIngestService._raw_table_existsra   rP   r   r@   actionreasonqueuedsignature_validNonec          
   
   C  sX   z| j j||||||||d W d S  ty+ }	 ztd||	 W Y d }	~	d S d }	~	ww )Nr)   ra   r   r@   r   r   r   r   z)Failed to write ingest log for BID %s: %s)r   log_call_ingest_eventrK   r   warning)
r   r)   ra   r   r@   r   r   r   r   excr=   r=   r>   _log_ingestC  s   
zCallIngestService._log_ingestrowc                 C  s   | d| d||d |d d|dpd|dpd|d	|d
|dp)d|dp/d|dp5d|dp;dd d d |df d S )Nz
            INSERT INTO `a  _raw_calls`
            (bid, callid, fileurl, status, agentname, groupname, call_starttime, call_endtime,
             call_status, agent_callinfo, customer_callinfo, direction,
             transcription_requested, transcription_status, selected_for_processing,
             duration_seconds)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
            fileurl = VALUES(fileurl),
            agentname = VALUES(agentname),
            groupname = VALUES(groupname),
            call_starttime = VALUES(call_starttime),
            call_endtime = VALUES(call_endtime),
            call_status = VALUES(call_status),
            agent_callinfo = VALUES(agent_callinfo),
            customer_callinfo = VALUES(customer_callinfo),
            direction = VALUES(direction),
            duration_seconds = VALUES(duration_seconds)
            ra   rj   r   rq   r-   rs   re   rg   r   r   rt   rv   r   rp   r   )r   r   )r   r   r)   r   r=   r=   r>   _upsert_raw_call]  s,   z"CallIngestService._upsert_raw_callr   Nmin_duration_sduration_roweffective_atr   r   r   r}   r   Optional[Dict[str, Any]]c                C  s@  |   }zz>| .}t|pi ||||dkd\}	}
}|	rAt||||p'i || td||
 	 W d   W W |  dS |durX|d| dtdt	t
||f d| d	}|d
| d|f | r|d| d|f 	 W d   W W |  dS |d| d|f | pi }t	|ddur|dnd}|dv r	 W d   W W |  dS t|dpddkr	 W d   W W |  dS |d| d|f |jdk r	 W d   W W |  dS tdd}tdd}ttj|d}| }|j|dd |jd|t|||d tjd!d"d# |  	 W d   W W |  dS 1 s?w   Y  W nN ty } zAtd$||| z"| }|d| d%|f W d   n	1 srw   Y  W n
 ty   Y nw W Y d}~W |  dS d}~ww W |  dS |  w )&zUMirror orchestrate_pipeline.trigger_transcription without reconfiguring log handlers.r   probe_audioz#[%s] Webhook STT queue skipped (%s)NFz!
                        UPDATE `z_raw_calls`
                        SET duration_seconds = %s
                        WHERE callid = %s
                        `z_sarvamresponse`z$
                    SELECT id FROM z
                    WHERE callid = %s
                      AND transcript IS NOT NULL
                      AND TRIM(transcript) != ''
                    zUPDATE `z,_raw_calls` SET status = 2 WHERE callid = %sTz*SELECT status, transcription_status FROM `z%_raw_calls` WHERE callid = %s LIMIT 1statusi)         transcription_statusr-   backlog_clearedz
                    UPDATE `z_raw_calls`
                    SET status = 1
                    WHERE callid = %s
                      AND status IN (0, -2)
                      AND COALESCE(transcription_status, '') != 'backlog_cleared'
                    r   RABBITMQ_HOST	localhostRABBITMQ_QUEUEstt_jobs)r   )queuedurable)r)   r   r   r   )delivery_mode)exchangerouting_keybody
propertiesz-[%s] Failed to queue transcription for %s: %sz;_raw_calls` SET status = 0 WHERE callid = %s AND status = 1)r   r   r   r   r   r   closer   r|   r}   roundr   r   r*   rowcountrX   r   pikaBlockingConnectionConnectionParameterschannelqueue_declarebasic_publishjsondumpsBasicPropertiesrK   error)r   r)   r   r   r   r   r   connr   skip_min
min_reasonprobed_audio
resp_tablecurrentcurrent_statusrabbitmq_hostrabbitmq_queuermq_connr  r   r=   r=   r>   _queue_transcription  s   e		
e
 ee

e

eY

ez&CallIngestService._queue_transcription)r   r   c                C  s|  t | }| |}tdt|dpd}|d}t||}tt|dp)dsV| j|t |dp;|dp;dt |dpCd|d	d
d|d ddd| ddS |s| j|t |dpg|dpgdt |dpod|d	dddd dddddS t |dp| }t	||d\}	}
|
r| j||	d|	dpd|d	dddd ddd
|
ddS |	d }|	dpt |dpd}|  }z|  }| ||s| j||||d	dddd ddd| dddW  d    W |  S |	d  d!kr%| j||||d"d#ddd d||d"d#d$d%d&W  d    W |  S ||	d'|	d(|	d)d*}t||||	d+|dkd,\}}}|r|t|||||| | j||||d"d-| ddd d||d"d.d/| d0d%d&W  d    W |  S |d urtdtt||	d)< t|	d1pd|\}}|s| j||||d"d2| ddd d||d"d3|d%d&W  d    W |  S | |||	 W d    n	1 sw   Y  W |  n|  w d}d4}d5}||	d'|	d(|	d)d*}t|	d+ |r| j|||	d+ |||d6}|rd7}d8}nd4}d9}| j||||||r'd n|d4kr.d:nd;|dd d|||||d%d<S )=Nr   r   r   webhook_ingest_enabledr   ra   r-   r   rejectedwebhook_ingest_disabledFr   	forbiddenz&Webhook ingest is not enabled for BID i  )successr
  messagehttp_statusinvalid_secretunauthorizedz"Invalid or missing X-Ingest-Secreti  rT   )rT   validation_errorTz; i  raw_calls_table_missing	not_foundzTable z_raw_calls does not existi  r   r   skippednot_answeredzOnly ANSWER calls are ingested   )r  r)   r   r   r   r  r  re   rg   r   )ra   re   rg   r   rj   r   zmin_duration:duration_below_minzCall not ingested ()rs   zgroup_filter:group_not_allowedingesteduB   Call ingested; recording URL not ready — orchestrator will retryr   r   z*Call ingested and queued for transcriptionzCCall ingested; STT queue skipped (already queued or terminal state)url_not_readyqueue_skipped)r  r)   r   r   r   r  r  )r*   rB   r   r|   r}   r   r   r   r   r   joinr   r   r   r   r   r   r   r   r   r   r  )r   r)   r@   r   r   r   r   r   rT   r   field_errorsr   r   r  r   r   r  r  r  r   group_reasonr   r   r  r=   r=   r>   process  s  




l
l

l

llzCallIngestService.process)r)   r*   r,   r'   )r)   r*   r,   r   )r)   r*   ra   rP   r   r*   r@   r'   r   r*   r   rP   r   r   r   r   r,   r   )r)   r*   r   r'   r,   r   )r)   r*   r   r*   r   r*   r   r}   r   r   r,   r   )r)   r*   r@   r'   r   rP   r,   r'   )__name__
__module____qualname__r   r   r   r   r   r   r  r.  r=   r=   r=   r>   r   %  s    




/yr   )r)   r*   r+   r*   r,   r'   )r@   r'   rA   r*   r,   r   )r,   rE   )r,   rP   )rS   r*   rT   r*   r,   rU   )r@   r'   rT   r*   r,   r`   )r6   r*   r,   r   )rj   r*   ra   r*   r,   r   )rs   r*   r   r'   r,   r   )r,   r*   )r   rP   r   r'   r,   r   )*__doc__
__future__r   r  loggingrX   r   r   typingr   r   r   r   r   urllib.parser	   r  r   pymysql.cursorsr
   min_duration_utilr   r   r   	getLoggerr/  r   r(   __annotations__r?   rD   rO   rR   r_   r   r   r   r   r   r   r   r   r=   r=   r=   r>   <module>   sr    

$





=


+




