o
    =)j'                     @  s  d Z ddlmZ ddlZddlZddlZddlZddlZddlZddl	Z	ddl
mZ ejejeZejeZejeddZeddZeejed	 eejed	d
d ejejdd edZedeeddZeeeeddZedeeddZedeeddZedeeddZ edd! dvZ"ejedd  d!d"d#d$fZ#dMd'd(Z$dNd*d+Z%dOd-d.Z&d
d/dPd3d4Z'dQdRd5d6Z(dSd8d9Z)dTd;d<Z*dUd>d?Z+dVdWdBdCZ,dSdDdEZ-d
dFdXdHdIZ.dJdK Z/e0dLkre/  dS dS )Yut  
Keep N RabbitMQ STT workers running for the shared stt_jobs queue.

All pipeline-enabled BIDs (6004, 8329, 8398, …) publish to the same queue; workers
process jobs for any BID based on the job payload. Run via systemd or cron.

  python stt_worker_supervisor.py          # loop every STT_SUPERVISOR_INTERVAL_SEC
  python stt_worker_supervisor.py --once   # single sync
    )annotationsN)load_dotenvzcall-proccessingstt_pipelineSTT_SUPERVISOR_LOCKz/tmp/stt_worker_supervisor.lockz.envF)overridez'%(asctime)s [%(levelname)s] %(message)s)levelformatstt_worker_supervisor   STT_WORKER_COUNT4STT_WORKER_MAX8   STT_QUEUE_SCALE_THRESHOLD15STT_QUEUE_WORKERS_PER_MESSAGES12
   STT_SUPERVISOR_INTERVAL_SEC60STT_STUCK_HEAL_ENABLED1)0falsenorun.pyz	 --workerzstt_pipeline/run.py --workerz,stt_pipeline/venv/bin/python run.py --workerz-stt_pipeline/venv/bin/python.*run.py --workerzstt_pipeline.*run\.py --workerreturnintc                  C  sZ   t tt jt jB } zt| tjtjB  W | S  t	y,   t
dt td Y | S w )Nz7Another STT supervisor is already running (%s); exitingr   )osopenSUPERVISOR_LOCKO_CREATO_RDWRfcntlflockLOCK_EXLOCK_NBBlockingIOErrorloggererrorsysexit)fd r.   @/home/aiteam/pcaa-dev/dashboard-backend/stt_worker_supervisor.pyacquire_supervisor_lock9   s   r0   strc                  C  sB   t jtdddt jtdddfD ]} t j| r|   S qdS )Nvenvbinpythonpython3)r   pathjoinSTT_DIRBASE_DIRisfile)	candidater.   r.   r/   worker_pythonC   s   r<   	list[int]c               	   C  sj   t  } tD ]+}z tjdd|gdddd}|jdkr&| dd |j D  W q ty0   Y qw t	| S )	Npgrepz-fTr   )capture_outputtexttimeoutr   c                 s  s$    | ]}|   rt|V  qd S N)stripisdigitr   ).0xr.   r.   r/   	<genexpr>X   s   " z#list_worker_pids.<locals>.<genexpr>)
setWORKER_PGREP_PATTERNS
subprocessrun
returncodeupdatestdoutsplit	Exceptionsorted)pidspatternresultr.   r.   r/   list_worker_pidsM   s    
rU   forcepidrW   boolc             
   C  sf   zt | |r	tjntj W dS  ty   Y dS  ty2 } ztd| | W Y d }~dS d }~ww )NTFz Failed to stop worker pid=%s: %s)	r   killsignalSIGKILLSIGTERMProcessLookupErrorrP   r)   warning)rX   rW   excr.   r.   r/   stop_worker^   s   ra   c                 C  s|   g }t  D ]}t|| dr|| q|r<| s<td t  }|r<td| |D ]}t|ddr6|| q)td |S )zOStop every STT worker PID we can find (including legacy relative-path workers).rV      z=STT workers still running after SIGTERM (%s); sending SIGKILLTr
   )rU   ra   appendtimesleepr)   r_   )rW   stoppedrX   	survivorsr.   r.   r/   stop_all_workersi   s&   




rh   dictc               
   C  s   zTdd l } tdd}ttdd}tdd}tdd}td	d
}| | j||| ||d}| }|j|ddd}|t|j	j
t|j	jd}	|  |	W S  tyl }
 zdt|
iW  Y d }
~
S d }
~
ww )Nr   RABBITMQ_HOST	localhostRABBITMQ_PORT5672RABBITMQ_USERguestRABBITMQ_PASSWORDRABBITMQ_QUEUEstt_jobs)hostportcredentialsT)queuedurablepassive)rv   messages	consumersr*   )pikar   getenvr   BlockingConnectionConnectionParametersPlainCredentialschannelqueue_declaremethodmessage_countconsumer_countcloserP   r1   )r{   rs   rt   userpasswordrv   connchqstatsr`   r.   r.   r/   rabbitmq_queue_stats~   s4   


r   slotc                 C  s   t  }d|  d}tj }d|d< z@t|ddd}tj|tjt	dd	gt	tj
|tj|d
d
d}W d    n1 s<w   Y  td| |j| d
|j|| dW S  tys } ztd| | dt|| dW  Y d }~S d }~ww )Nz/tmp/stt_worker_z.logr   PYTHONUNBUFFEREDabr   )	bufferingr   z--workerT)cwdstdinrN   stderrenvstart_new_session	close_fdsz(Started STT worker slot=%s pid=%s log=%s)startedrX   logr   z&Failed to start STT worker slot=%s: %sF)r   r*   r   )r<   r   environcopyr    rJ   Popenr6   r7   r8   DEVNULLSTDOUTr)   inforX   rP   r*   r1   )r   pylog_pathr   log_fhprocr`   r.   r.   r/   start_worker   s0   
r   targetc                 C  sb   g }t dD ](}t }t|| kr|  S || d D ]}t|ddr(|| qtd q|S )z6Kill excess STT workers until at most *target* remain.r   NTrV   r
   )rangerU   lenra   rc   rd   re   )r   rf   _rR   rX   r.   r.   r/   _trim_workers_to   s   
r   queue_statsdict | Nonec                 C  sX   | pt  }|drtS t|dpd}|tk rtS ttd|t  }tttt|S )z-Scale workers up when stt_jobs backlog grows.r*   ry   r   )	r   getr   r   r   maxr   minr   )r   r   ry   scaledr.   r.   r/   effective_worker_target   s   

r   c               
   C  s   t sddiS z%ddlm}  ddlm} ddlm} G dd d}|||  }||W S  tyI } zt	d	| d
t
|iW  Y d }~S d }~ww )NskippedTr   )Config)DatabaseHandler)heal_stuck_queued_callsc                   @  s&   e Zd Zdd ZdddZdd ZdS )	z2heal_stuck_calls_if_enabled.<locals>.ConfigWrapperc                 S  s
   || _ d S rB   )_config)selfconfigr.   r.   r/   __init__   s   
z;heal_stuck_calls_if_enabled.<locals>.ConfigWrapper.__init__Nc                 S  s   t | j||S rB   getattrr   )r   keydefaultr.   r.   r/   r      s   z6heal_stuck_calls_if_enabled.<locals>.ConfigWrapper.getc                 S  s   t | j|S rB   r   )r   r   r.   r.   r/   __getattr__   s   z>heal_stuck_calls_if_enabled.<locals>.ConfigWrapper.__getattr__rB   )__name__
__module____qualname__r   r   r   r.   r.   r.   r/   ConfigWrapper   s    
r   zSTT stuck-call heal failed: %sr*   )r   r   r   
db_handlerr   stt_queue_healerr   rP   r)   r_   r1   )r   r   r   r   dbr`   r.   r.   r/   heal_stuck_calls_if_enabled   s   

r   force_restartr   c                 C  s*  t  }t|}g }t }| r|rtdd}g }n
|t| t }g }tt||D ]}t|d }|	dr=|
| td q+|t| t }|rV| sVtd t  }	t }
t|	p`i 	dped}|tt||k r| std|t| tdd	S |ttt|t| t|||||	|
d

S )NTrV   r
   r   r   rz   r   zBRabbitMQ consumers (%s) < running workers (%s); restarting workersr   )
target_workersbase_workersmax_workersrunning_beforerunning_afterrf   r   worker_pidsrv   heal)r   r   rU   rh   extendr   r   r   r   r   rc   rd   re   r   r   r   r)   r_   sync_workersr   r   )r   r   r   rf   rR   r   r   rT   
pids_afterrv   r   rz   r.   r.   r/   r      sP   




r   c               
   C  s   t   dtjv } dtjv }td| rdndt d|rdndtt t 	 zt	|d
}td|d |d t
|d |d W n tyX } ztd| W Y d }~nd }~ww | r]d S tt q%)Nz--oncez	--restartzDSTT worker supervisor (%s%s); target_workers=%s python=%s stt_dir=%soncezevery sz, restart-all Tr   z2Sync: running %s/%s workers (started %s); queue=%sr   r   r   rv   z%STT worker supervisor sync failed: %s)r0   r+   argvr)   r   r   r   r<   r8   r   r   r   rP   	exceptionrd   re   )r   r   summaryr`   r.   r.   r/   main  s:   





r   __main__)r   r   )r   r1   )r   r=   )rX   r   rW   rY   r   rY   )F)rW   rY   r   r=   )r   ri   )r   r   r   ri   )r   r   r   r=   rB   )r   r   r   r   )r   rY   r   ri   )1__doc__
__future__r   r$   loggingr   r[   rJ   r+   rd   dotenvr   r6   dirnameabspath__file__r9   PROJECT_ROOTr7   r8   r|   r!   basicConfigINFO	getLoggerr)   r   r   r   r   r   r   r   lowerr   rI   r0   r<   rU   ra   rh   r   r   r   r   r   r   r   r   r.   r.   r.   r/   <module>   s`   	

	







-

