o
    ej!                     @  s$  d Z ddlmZ ddlZddlZddlZddlZddlZddlZddl	Z	ddl
mZ ddlmZ ejejeZejde eejed ejejdd ed	Zd/ddZd0ddZd1ddZ		d2d3ddZd4d d!Zd5d#d$Zd6d&d'Zd7d)d*Z d8d,d-Z!e"d.kre!  dS dS )9a)  
Reset a poisoned stt_jobs queue and re-publish one job per call that still needs Sarvam STT.

Use when the queue has thousands of duplicate messages and workers cannot catch up.

  python3 repair_stt_backlog.py --dry-run
  python3 repair_stt_backlog.py
  python3 repair_stt_backlog.py --bid 6004
    )annotationsN)load_dotenv)
DictCursorz.envz'%(asctime)s [%(levelname)s] %(message)s)levelformatrepair_stt_backlog
resp_tablestrreturnc                 C  s   d|  dS )NzEXISTS (SELECT 1 FROM `zX` s WHERE s.callid = r.callid AND s.transcript IS NOT NULL AND TRIM(s.transcript) != '') )r   r   r   repair_stt_backlog.py_has_transcript_sql#   s   
r   only_bid
str | None	list[str]c                 C  sR   | r|   gS ddlm} ddlm} G dd d}||| }|  | S )Nr   DatabaseHandlerConfigc                   @     e Zd Zdd ZdddZdS )zget_bids.<locals>.ConfigWrapperc                 S  
   || _ d S N_cfgselfcfgr   r   r   __init__2      
z(get_bids.<locals>.ConfigWrapper.__init__Nc                 S     t | j||S r   getattrr   r   keydefaultr   r   r   get5      z#get_bids.<locals>.ConfigWrapper.getr   __name__
__module____qualname__r   r%   r   r   r   r   ConfigWrapper1       r+   )strip
db_handlerr   configr   %ensure_business_pipeline_config_tableget_enabled_pipeline_bids)r   r   r   r+   handlerr   r   r   get_bids+   s   
r3   bidtuple[int, datetime | None]c                 C  s  ddl m } ddlm} ddlm} G dd d}||| }|  || p*i }|d}|d ur;tdt	|nd}|dkrCdS |d	}	t
|	|r[||	jrX|	jd d
fS |	fS |	rz|t|	dd}
||
jru|
jd d
fW S |
fW S  ty   |d f Y S w |d fS )Nr   )datetimer   r   c                   @  r   )z*get_duration_filter.<locals>.ConfigWrapperc                 S  r   r   r   r   r   r   r   r   D   r   z3get_duration_filter.<locals>.ConfigWrapper.__init__Nc                 S  r   r   r    r"   r   r   r   r%   G   r&   z.get_duration_filter.<locals>.ConfigWrapper.getr   r'   r   r   r   r   r+   C   r,   r+   min_call_duration_sr   Nmin_call_duration_effective_at)tzinfoZz+00:00)r6   r.   r   r/   r   r0   get_pipeline_configr%   maxint
isinstancer:   replacefromisoformatr	   	Exception)r4   r6   r   r   r+   r2   r   rawmin_seffective_atparsedr   r   r   get_duration_filter=   s,   


 rG   limitr>   min_duration_s
list[dict]c           
   	   C  s|   | d}| d}t |}d}g }	|dkr!|r!d}|	||g | d| d| d| d	t|	|g  t|  p<g S )
N
_raw_calls_sarvamresponse r   z
          AND COALESCE(r.synced_at, r.call_starttime) >= %s
          AND r.call_starttime IS NOT NULL
          AND r.call_endtime IS NOT NULL
          AND TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime) >= %s
        z2
        SELECT r.callid, r.fileurl
        FROM `zz` r
        WHERE r.fileurl IS NOT NULL AND TRIM(r.fileurl) != ''
          AND r.status IN (0, 1, -2)
          AND NOT (z)
          z@
        ORDER BY r.call_starttime ASC
        LIMIT %s
        )r   extendexecutetuplelistfetchall)
curr4   rH   rI   rE   rC   resphas_txduration_clausequery_paramsr   r   r   collect_calls]   s&   


rX   dry_runboolc                 C  s|   | d}| d}t |}d| d| d}|r2| d| d| d t|  p+i d	p0d
S | | t| jp<d
S )NrK   rL   z
        UPDATE `z` r
        SET status = 0
        WHERE r.status IN (1, -2)
          AND r.fileurl IS NOT NULL AND TRIM(r.fileurl) != ''
          AND NOT (z)
    zSELECT COUNT(*) AS n FROM `z[` r WHERE r.status IN (1, -2) AND r.fileurl IS NOT NULL AND TRIM(r.fileurl) != '' AND NOT ()nr   )r   rO   r>   fetchoner%   rowcount)rS   r4   rY   rC   rT   rU   sqlr   r   r   reset_queued_without_transcript   s"   


r`   queuec                 C  s8   |r| j |ddd}t|jjS | j|d}t|jjS )NT)ra   durablepassive)ra   )queue_declarer>   methodmessage_countqueue_purge)channelra   rY   qr   r   r   purge_queue   s
   rj   jobsc                 C  sH   d}|D ]}|r|d7 }q| j d|t|tjddd |d7 }q|S )Nr      rM      )delivery_mode)exchangerouting_keybody
properties)basic_publishjsondumpspikaBasicProperties)rh   ra   rk   rY   	publishedjobr   r   r   publish_jobs   s   

rz   callidsc                 C  sP   |r|rt |S ddgt | }| d| d| dt| t| jp&dS )N, z%szUPDATE `z,_raw_calls` SET status = 1 WHERE callid IN (r[   r   )lenjoinrO   rP   r>   r^   )rS   r4   r{   rY   placeholdersr   r   r   mark_queued   s   r   Nonec               	   C  sx  t jdd} | jddd | jdtddd	 | jd
dd |  }ddlm} | }tdd}tdd}t	j
|j|j|j|j|jtdd}t|j}|sXtd td ttj|d}| }	|	j|dd t|	||j}
td|jr{dnd|
| g }zzu| g}|D ]U}t |\}}t!|||j}td||jrdnd| t"|||j#||d }|D ]}|$|t%|d! |d" d# q|js|rt&||d$d% |D dd& td'|t'| q|js|(  W d    n1 sw   Y  W n t)y   |*   w W |+  n|+  w t,|	|||j}|+  td(|jr&d)nd*||d+-| |js:td, d S d S )-Nz+Purge stt_jobs backlog and re-queue from DB)descriptionz--bidz4Only repair this BID (default: all pipeline_enabled))helpz--limiti  zMax calls to re-queue per BID)typer$   r   z	--dry-run
store_true)actionr   r   RABBITMQ_QUEUEstt_jobsRABBITMQ_HOST	localhostF)hostportuserpassworddatabasecursorclass
autocommitzNo BIDs to repairrl   )r   T)ra   rb   z%s purged %s message(s) from %sz
Would havePurgedu1   BID %s: %s reset %s row(s) from status 1/-2 → 0WouldReset)rI   rE   callidfileurl)r4   call_idrecording_urlc                 S  s   g | ]}t |d  qS )r   )r	   ).0cr   r   r   
<listcomp>   s    zmain.<locals>.<listcomp>)rY   zBID %s: %s call(s) to publishz!%s %s job(s) on %s for BID(s): %szWould publish	Publishedr|   z<Restart STT workers: sudo systemctl restart mcube-stt-worker).argparseArgumentParseradd_argumentr>   
parse_argsr/   r   osgetenvpymysqlconnectDB_HOSTDB_PORTDB_USERDB_PASSWORDDB_NAMEr   r3   r4   loggererrorsysexitrv   BlockingConnectionConnectionParametersrh   rd   rj   rY   infocursorrG   r`   rX   rH   appendr	   r   r}   commitrB   rollbackcloserz   r~   )parserargsr   r   ra   r   connbidsrmqrh   purged
total_jobsrS   r4   rI   rE   reset_ncallscallrx   r   r   r   main   s   
	




r   __main__)r   r	   r
   r	   )r   r   r
   r   )r4   r	   r
   r5   r8   )r4   r	   rH   r>   rI   r>   r
   rJ   )r4   r	   rY   rZ   r
   r>   )ra   r	   rY   rZ   r
   r>   )ra   r	   rk   rJ   rY   rZ   r
   r>   )r4   r	   r{   r   rY   rZ   r
   r>   )r
   r   )#__doc__
__future__r   r   rt   loggingr   r   rv   r   dotenvr   pymysql.cursorsr   pathdirnameabspath__file__BASE_DIRinsertr~   basicConfigINFO	getLoggerr   r   r3   rG   rX   r`   rj   rz   r   r   r(   r   r   r   r   <module>   sB   	



$
$



R
