o
    P'j                     @  s`  d Z ddlmZ ddlZddlZddlZddlZddlZej	ej
eZejde ddlmZ eejed ddlmZ ddlmZ ejejdd	 ed
ZeeddZeeddZeeddZG dd dZd-ddZd.ddZ d/ddZ!d/ddZ"d.dd Z#d0d"d#Z$d1d$d%Z%d1d&d'Z&d1d(d)Z'd*d+ Z(e)d,kre(  dS dS )2z
Ensure orchestrator_loop_{bid}.sh is running for every business with pipeline_enabled=1.
Run via systemd (mcube-orchestrator-supervisor.service) or cron every minute.
    )annotationsN)load_dotenvz.env)Config)DatabaseHandlerz'%(asctime)s [%(levelname)s] %(message)s)levelformatorchestrator_supervisorORCHESTRATOR_LOOP_INTERVAL_SEC300ORCHESTRATOR_RUN_LIMIT50$ORCHESTRATOR_SUPERVISOR_INTERVAL_SEC60c                   @  s&   e Zd Zdd ZdddZdd ZdS )	ConfigWrapperc                 C  s
   || _ d S N)_config)selfconfig r   orchestrator_supervisor.py__init__$   s   
zConfigWrapper.__init__Nc                 C  s   t | j||S r   getattrr   )r   keydefaultr   r   r   get'   s   zConfigWrapper.getc                 C  s   t | j|S r   r   )r   r   r   r   r   __getattr__*   s   zConfigWrapper.__getattr__r   )__name__
__module____qualname__r   r   r   r   r   r   r   r   #   s    
r   returnstrc                  C  s&   t jtddd} t j| r| S dS )Nvenvbinpythonpython3)ospathjoinBASE_DIRisfile)venv_pyr   r   r   orchestrator_python.   s   r,   bidc                 C  s   t jtd|  dS )Norchestrator_loop_.sh)r&   r'   r(   r)   )r-   r   r   r   loop_script_path3   s   r0   boolc                 C  sJ   d|  d}zt jdd|gdddd}t|j W S  ty$   Y dS w )	Nr.   r/   pgrep-fT   )capture_outputtexttimeoutF)
subprocessrunr1   stdoutstrip	Exception)r-   patternresultr   r   r   loop_is_running7   s   r?   c              
   C  s   t |  } | s
dS t| sdS d|  d}ztjdd|gdd W n ty< } ztd| | W Y d	}~dS d	}~ww td
|  dS )z+Stop orchestrator_loop_{bid}.sh if running.Fr.   r/   pkillr3   
   r7   z[%s] stop_loop failed: %sNz$Stopped orchestrator loop for BID %sT)	r!   r;   r?   r8   r9   r<   loggerwarninginfo)r-   r=   excr   r   r   	stop_loopE   s   rG   c                 C  sz   t | }t }dt d| d|  dt dt d}t|ddd	}|| W d
   n1 s0w   Y  t|d |S )z3Create or refresh per-BID orchestrator loop script.z*#!/usr/bin/env bash
set -euo pipefail
cd "z"
while true; do
  z orchestrate_pipeline.py --bid z	 --limit z	
  sleep z
done
wzutf-8)encodingNi  )	r0   r,   r)   ORCHESTRATE_LIMITLOOP_INTERVAL_SECopenwriter&   chmod)r-   scriptpycontentfhr   r   r   ensure_loop_scriptV   s&   	rS   dictc                 C  s   t |  } | s| dddS t| r| dddS t| }d|  d}tjddd	| d
| dgtjtjdtd}|jdd\}}d }|r`| r`zt	| 
 d }W n ty_   d }Y nw |rktd| |  td| | | d||dS )NFinvalid_bid)r-   startedreasonalready_runningz
/tmp/orch_z.logbashz-lcznohup bash "z" >>"z" 2>&1 & echo $!T)r:   stderrr6   cwdrA   rB   z[%s] start stderr: %sz-Started orchestrator loop for BID %s (pid=%s))r-   rV   pidlog)r!   r;   r?   rS   r8   PopenPIPEr)   communicateint
splitlines
ValueErrorrC   rD   rE   )r-   rO   log_pathprocouterrr]   r   r   r   
start_loopi   s4   ri   c               
   C  R   z
ddl m}  |  W S  ty( } ztd| dt|iW  Y d}~S d}~ww )zIKeep STT_WORKER_COUNT RabbitMQ consumers running (shared stt_jobs queue).r   sync_workerszSTT worker sync failed: %serrorN)stt_worker_supervisorrl   r<   rC   rD   r!   rl   rF   r   r   r   sync_stt_workers      rp   c               
   C  rj   )zNKeep ANALYTICS_WORKER_COUNT RabbitMQ consumers running (analytics_jobs queue).r   rk   z Analytics worker sync failed: %srm   N)analytics_worker_supervisorrl   r<   rC   rD   r!   ro   r   r   r   sync_analytics_workers   rq   rs   c                  C  s   t tt } |   dd |  pg D }g }g }g }ttD ]%}|dr-|	ds.q!|t
dd }|r<||v r=q!t|rF|| q!t|D ]}t|rW|| qKt|}|dre|| qKt||||dS )Nc                 S  s$   h | ]}t | rt | qS r   )r!   r;   ).0br   r   r   	<setcomp>   s   $ z%sync_enabled_loops.<locals>.<setcomp>r.   r/   rV   )enabled_bidsrV   rX   stopped)r   r   r   %ensure_business_pipeline_config_tableget_enabled_pipeline_bidsr&   listdirr)   
startswithendswithlenrG   appendsortedr?   ri   r   )dbenabledrV   alreadyry   rO   r-   r>   r   r   r   sync_enabled_loops   s8   



r   c               
   C  s&  dt jv } td| rdndt dt t 	 zYt }t }t	 }tdt
|d t
|d	 t
|d
 t
|dp:g  |rW|dsWtd|d|d|dpUi  |ro|dsotd|d|d|d W n ty } ztd| W Y d }~nd }~ww | rd S tt q)Nz--oncez<Orchestrator supervisor (%s); enabled loops use %s, limit=%soncezevery sTzESync complete: %s enabled, %s started, %s already running, %s stoppedrx   rV   rX   ry   rm   z$STT workers: %s/%s running; queue=%srunning_aftertarget_workersqueuez*Analytics workers: %s/%s running; queue=%szSupervisor sync failed: %s)sysargvrC   rE   SUPERVISOR_INTERVAL_SECr,   rJ   r   rp   rs   r   r   r<   	exceptiontimesleep)r   summarystt	analyticsrF   r   r   r   main   sT   




r   __main__)r    r!   )r-   r!   r    r!   )r-   r!   r    r1   )r-   r!   r    rT   )r    rT   )*__doc__
__future__r   loggingr&   r8   r   r   r'   dirnameabspath__file__r)   insertdotenvr   r(   r   r   
db_handlerr   basicConfigINFO	getLoggerrC   rb   getenvrK   rJ   r   r   r,   r0   r?   rG   rS   ri   rp   rs   r   r   r   r   r   r   r   <module>   sF   









)
