o
    (j@W                  	   @  s  d Z ddlmZ ddlZddlZddlZddlZddlZddlZddl	Z	ddl
m
Z
mZ ddlmZmZmZmZmZ ddlZddlmZ ejejeZejde ee ddlmZ e  ddlmZ dd	l m!Z" dd
l#m$Z$ ddl%m&Z& ej'ej(de)ej*ede+ gd e,dZ-da.dd Z/eej0e/ eej1e/ dFddZ2dGddZ3dHd"d#Z4dId%d&Z5dJd(d)Z6dd*l7m8Z9 dKd/d0Z:dKd1d2Z;dLd5d6Z<dMd7d8Z=dMd9d:Z>	dNdOd=d>Z?dPd@dAZ@dQdCdDZAeBdEkreA  dS dS )Ru  
Generic multi-bid call processing pipeline.

Replaces the hardcoded ``pipeline_6004.py`` with a DB-driven design.
Pipeline configuration is loaded from ``business_pipeline_config``; AI agent
prompts from ``business_agent_config``; all results land in
``{bid}_call_records``.

Stages per bid
--------------
1. SYNC  — pull new ANSWER calls from source DB into ``{bid}_call_records``
           (watermarked; optionally filtered to CRM-known phones)
2. TRANSCRIBE — fetch audio, call STT provider, save transcript
3. ANALYZE    — run all enabled AI agents via AgentRunner, save analysis

Usage
-----
    python3 call_processor.py                  # process all enabled bids once
    python3 call_processor.py --continuous     # loop forever
    python3 call_processor.py --bid 6004       # single bid
    python3 call_processor.py --bid 6004 --stage sync
    python3 call_processor.py --interval 120
    )annotationsN)datetime	timedelta)AnyDictListOptionalSet)
DictCursor)load_dotenv)DatabaseHandler)push_leadsquared_activities)get_stt_provider)AgentRunneru4   %(asctime)s [%(levelname)s] %(name)s — %(message)szcall_processor.log)levelformathandlerscall_processorFc                 C  s   t d dad S )Nu7   Shutdown signal received — finishing current batch…T)loggerinfo	_shutdown)sigframe r   call_processor.py_handle_signalC   s   
r   phoner   returnSet[str]c                 C  s   d dd t| p	dD }|st S t|dkr|dd  n|}|d| |h}t|dkr@|d| d| d	| h d
d |D S )N c                 s  s    | ]	}|  r|V  qd S N)isdigit).0chr   r   r   	<genexpr>O       z"_phone_variants.<locals>.<genexpr>
   i+91z+910c                 S  s   h | ]}|r|qS r   r   )r"   vr   r   r   	<setcomp>V       z"_phone_variants.<locals>.<setcomp>)joinstrsetlenupdate)r   digitscore10variantsr   r   r   _phone_variantsN   s   "r5   cfgr   pymysql.Connectionc              
   C  s>   t j| d t| dpd| d | dpd| d dtd	d
S )Nsource_db_hostsource_db_porti  source_db_usersource_db_passwordr   source_db_nameutf8mb4r&   )hostportuserpassworddatabasecharsetcursorclassconnect_timeout)pymysqlconnectintgetr
   )r6   r   r   r   _open_source_conn[   s   rJ   connbidr.   Optional[str]c                 C  sf   | d| d| d| dg}|   }|d dd | D }|D ]
}||v r0|  S q&d S )N_callhistory_call_history_callarchive_call_archivezSHOW TABLESc                 S  s   h | ]
}t | d  qS )r   )listvaluesr"   rr   r   r   r+   q   s    z'_detect_source_table.<locals>.<setcomp>cursorexecutefetchall)rK   rL   
candidatesrW   existingtr   r   r   _detect_source_tableh   s   
r]   tablec                 C  s,   |   }|d| d dd | D S )NzSHOW COLUMNS FROM ``c                 S  s   h | ]}|d  qS )Fieldr   rT   r   r   r   r+   {   r,   z)_detect_source_columns.<locals>.<setcomp>rV   )rK   r^   rW   r   r   r   _detect_source_columnsx   s   ra   Optional[datetime]c                 C  sr   | d u rd S t | tr| jr| jd dS | S ztt| dd}|jr,|jd dW S |W S  ty8   Y d S w )N)tzinfoZz+00:00)
isinstancer   rc   replacefromisoformatr.   	Exception)valueparsedr   r   r   _parse_pipeline_datetime~   s   
rk   )is_below_min_durationdbr   pipeline_cfgrH   c           '        s  t d|  t|dpd}t|dpd}|d}|dkr(|s(|| }t|dd}|d	p6d
}t|dp>d}||  t }	|rc|| |}	|	sZt 	d|  dS t d| t
|	 d|  }
|| |
}|rxtt|}n	t t|d }t d| |d zt|}W n ty } zt d| | W Y d}~dS d}~ww d}d}zz|t|| }|st d|  W W |  dS t d| | t|| g d}g d}| fdd|D  }ddd |D }| }|d| d| d ||f | pg }|st d!|  W W |  dS t d"| t
| d}|D ]}|d#}|d$}d}|rU|rUz
t||  }W n tyT   d}Y nw t|||rb|d%7 }q*t|}|ryt|}| |	sy|d%7 }q*|d&pd'! }t|d(p|d)pd*" } t|d+pd*" }!t|d, " }"|!rd-nd.}#|!rdnd/}$t|d0p| |"|!|#|!rdnd1|$t|d2pd*t|d3pd*||||t|d4pd*| |d5}%|#| |% |d%7 }|r|du s||kr|}q*t d6| ||t
| |r.|td%d7 }&|$| |
|&%  t d8| |& W n tyK } zt jd9| |dd: W Y d}~nd}~ww W |  |S W |  |S |  w );zHPull new ANSWER calls from source DB and upsert into {bid}_call_records.z[%s] === STAGE 1: SYNC ===
sync_batchi  min_call_duration_sr   min_call_duration_effective_atlead_filter_enabledTcrm_providerleadsquaredlookback_daysZ   uv   [%s] Lead filter enabled but crm_leads_cache is empty — run sync_crm_leads.py first (or disable lead_filter_enabled)z,[%s] Loaded %d phone variants from CRM cache
call_sync_)daysz#[%s] Fetching ANSWER calls since %s%Y-%m-%d %H:%M:%Sz$[%s] Source DB connection failed: %sNz,[%s] No source call table found in source DBz[%s] Source table: %s)
callidrL   	agentname	groupname	starttimeendtime
dialstatus	directionfilename	emp_phone)calltocallfromclicktocalldidc                   s   g | ]}| v r|qS r   r   r"   c
avail_colsr   r   
<listcomp>   s    zstage_sync.<locals>.<listcomp>z, c                 s  s    | ]	}d | d V  qdS )r_   Nr   r   r   r   r   r$      r%   zstage_sync.<locals>.<genexpr>zSELECT z FROM `zP` WHERE dialstatus = 'ANSWER' AND starttime > %s ORDER BY starttime ASC LIMIT %sz([%s] No new ANSWER calls since watermarkz([%s] Fetched %d ANSWER calls from sourcer}   r~      r   inboundr   r   r   r   rz   pendingfailedzNo audio file URLrL   syncr{   r|   r   )rL   rz   file_urlstatus
fail_stagefail_reason
agent_name
group_namer   
call_startcall_endcall_duration_scall_statusagent_phonecustomer_phonez4[%s] Sync: %d upserted, %d skipped out of %d fetched)hoursz[%s] Watermark advanced to %sz[%s] Sync error: %sexc_info)&r   r   rH   rI    ensure_min_duration_effective_atboolensure_call_records_tabler/   get_lead_phone_setwarningr0   get_sync_watermarkr   rg   r.   nowr   strftimerJ   rh   errorr]   closera   r-   rW   rX   rY   total_secondsshared_is_below_min_duration_pick_customer_phoner5   intersectionlowerstripupsert_call_recordset_sync_watermark	isoformat)'rL   rm   rn   
batch_sizemin_durationeffective_atlead_filterrs   ru   	phone_setwatermark_keyraw_wm	watermarksrc_connexcinsertedlatest_starttime	src_table
fixed_colsoptional_colsselect_colscols_sql
src_cursorcallsskippedcallstartend
duration_sr   r4   r   r   r   rz   r   r   recordnew_wmr   r   r   
stage_sync   s   




a
L

$
r   c                 C  s  t d|  |dpd}|dpd}t|dpd}zt||}W n ty> } zt d| | W Y d	}~d
S d	}~ww t|dpFd
}|d}	|d
krX|	sX|| }	|j| |||	d}
|
skt d|  d
S t d| t	|
 d
}|
D ]u}t
r~ np|d }|dpd}|s|j| |ddd qxt d| | || |d z%|||}|jstd|| || |d7 }t d| |t	|j W qx ty } zt d| || |j| |dt|d W Y d	}~qxd	}~ww t d| |t	|
 |S )z7Transcribe pending calls and update {bid}_call_records.z [%s] === STAGE 2: TRANSCRIBE ===stt_providersarvamstt_api_keyr   transcribe_batch   z![%s] Cannot load STT provider: %sNr   rp   rq   )batchmin_duration_sr   z#[%s] No calls pending transcriptionz[%s] %d call(s) to transcriberz   r   
transcribezNo audio URLstagereasonz[%s] Transcribing %stranscribingzEmpty transcript returnedr   u    [%s] Transcribed %s — %d charsz$[%s] Transcription failed for %s: %sz#[%s] Transcription: %d/%d succeeded)r   r   rI   rH   r   
ValueErrorr   r   get_calls_to_transcriber0   r   	fail_callset_call_statusr   
transcriptRuntimeErrorsave_call_transcriptionrh   r.   )rL   rm   rn   stt_provider_namer   r   sttr   r   r   r   successr   rz   r   
stt_resultr   r   r   stage_transcribe+  sd   

"r   agent_runnerr   c                 C  s&  t d|  d}|j| |d}|st d|  dS t d| t| d}|D ]}tr, n|d }|dp6d	}g }	|d
}
|
rZzt|
trKt	|
n|
}	W n t
yY   g }	Y nw |dp`d	|dpfd	t|dpmd	t|dpud	d}t d| | || |d zY|j| |||	|d}|ds|| |i  n+|| || z	t| ||| W n t
y } zt d| || W Y d}~nd}~ww |d7 }|d}t d| |||d W q& t
y } zt d| || |j| |dt|d W Y d}~q&d}~ww t d| |t| |S )z5Run AI agents on transcribed calls and save analysis.z[%s] === STAGE 3: ANALYZE ===r   )r   z[%s] No calls pending analysisr   z[%s] %d call(s) to analyzerz   r   r   speaker_segmentsr   r   r   r   )r   r   r   r   z[%s] Analyzing %s	analyzing)rL   rz   r   r   call_metadata
agents_runz'[%s] LeadSquared push failed for %s: %sNr   quality_scoreu'   [%s] Analyzed %s — score=%s agents=%sz[%s] Analysis failed for %s: %sanalyzer   z[%s] Analysis: %d/%d succeeded)r   r   get_calls_to_analyzer0   r   rI   re   r.   jsonloadsrh   r   runsave_call_analysis_push_leadsquared_activitiesr   r   )rL   rm   r   r   r   r   r   rz   r   r   raw_segs	call_metaanalysisr   scorer   r   r   stage_analyzeh  st   


"r   c                 C  sN   | dpd}|rz| |pdW S  ty   Y nw tdp&tdp&dS )zGDecrypt the STT API key stored in pipeline config, falling back to env.stt_api_key_encr   SARVAM_SUBSCRIPTION_KEYSARVAM_PIPELINE_KEYrI   _decrypt_textrh   osgetenvrm   r6   	encryptedr   r   r   _decrypt_stt_key  s   r   c                 C  sD   | dpd}|rz| |pdW S  ty   Y nw tdp!dS )Nsource_db_password_encr   SYNC_SOURCE_DB_PASSWORDr   r   r   r   r   _decrypt_source_password  s   r  stage_filterDict[str, int]c                 C  s   | | }|std|  i S |ddstd|  i S t|}t|||d< t|||d< i }|r7|dkr?t| |||d< t	rC|S |rI|d	krQt
| |||d
< t	rU|S |r[|dkrct| |||d< |S )zqRun the full pipeline (or a single stage) for one bid.

    Returns counts: {synced, transcribed, analyzed}.
    u*   [%s] No pipeline config found — skippingpipeline_enabledTu#   [%s] Pipeline disabled — skippingr   r;   r   syncedr   transcribedr   analyzed)get_pipeline_configr   r   rI   r   dictr   r  r   r   r   r   )rL   rm   r   r  r6   countsr   r   r   run_bid  s,   

r  Dict[str, Any]c                  C  s8   ddl m}  i }t| D ]}| rt| |||< q|S )Nr   )Config)configr  dirisuppergetattr)r  r6   keyr   r   r   _build_config  s   r  Nonec                  C  s  t jdd} | jddd | jdg ddd	 | jd
ddd | jdtddd |  }t }t|}t|}|  |	  d}	 |d7 }t
rIntd td td|t d td zL|jrm|jg}n| }|sytd n6|D ]3}t
r n.zt||||jd}td|| W q{ ty }	 ztjd||	dd W Y d }	~	q{d }	~	ww W n ty }	 ztjd||	dd W Y d }	~	nd }	~	ww |jsntd|j t|jD ]}
t
r ntd qqBtd  d S )!NzGeneric multi-bid call pipeline)descriptionz--bidz,Process only this bid (default: all enabled))helpz--stage)r   r   r   zRun a single stage only)choicesr  z--continuous
store_truezLoop indefinitely)actionr  z
--intervalx   z,Seconds between pipeline runs (default: 120))typedefaultr  r   Tr   r   zF======================================================================u     PIPELINE RUN #%d — %sry   z!No enabled pipeline configs found)r  z[%s] Run complete: %sz![%s] Unhandled pipeline error: %sr   zPipeline iteration %d error: %su   Waiting %ds before next run…zcall_processor stopped.)argparseArgumentParseradd_argumentrH   
parse_argsr  r   r   %ensure_business_pipeline_config_table"ensure_business_agent_config_tabler   r   r   r   r   r   rL   get_enabled_pipeline_bidsr  r   rh   r   
continuousintervalrangetimesleep)parserargsr  rm   runner	iterationbidsrL   r  r   _r   r   r   main  sr   



(r1  __main__)r   r   r   r   )r6   r   r   r7   )rK   r7   rL   r.   r   rM   )rK   r7   r^   r.   r   r   )r   rb   )rL   r.   rm   r   rn   r   r   rH   )rL   r.   rm   r   r   r   r   rH   )rm   r   r6   r   r   r.   r    )
rL   r.   rm   r   r   r   r  rM   r   r  )r   r  )r   r  )C__doc__
__future__r   r  r   loggingr   signalsysr)  r   r   typingr   r   r   r   r	   rF   pymysql.cursorsr
   pathdirnameabspath__file__BACKEND_DIRinsertchdirdotenvr   
db_handlerr   leadsquared_activity_pushr   r   r   r   r   r   basicConfigINFOFileHandlerr-   StreamHandler	getLoggerr   r   r   SIGTERMSIGINTr5   rJ   r]   ra   rk   min_duration_utilrl   r   r   r   r   r   r  r  r  r1  __name__r   r   r   r   <module>   sj   







 

=
G

-
	C
