o
    i                     @  s  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZ	d dl
Z
d dlZd dlZd dlmZmZ d dlmZmZ d dlmZ d dlmZmZmZmZmZ d dlmZmZ d dlZd dlZd dlm Z  d d	l!m"Z" d d
l#m$Z$m%Z% d dl&m'Z'm(Z( ddl)m*Z*m+Z+m,Z, ddl-m.Z. ddl/m0Z0 ddl1m2Z2m3Z3m4Z4m5Z5m6Z6m7Z7 ddl8m9Z9m:Z: ddl;m<Z< ddl=m>Z>m?Z?m@Z@ ddl,mAZAmBZB ddlCmDZD ddlEmFZF dZGdZHdZIdZJd;dd ZKd<d$d%ZLG d&d' d'eZMeMZNG d(d) d)ZOeG d*d+ d+ZPejQRd,re4jSZTne4jUZTed-ZVed.d/G d0d1 d1eeV ZWeWejXd2d3ZYeP ZZeG d4d5 d5Z[e[Z\eG d6d7 d7Z]ed8 Z^G d9d: d:e,j_e^ Z`dS )=    )annotationsN)	AwaitableCallable)	dataclassfield)Enum)AnyGenericLiteralTypeVaroverload)urljoinurlparse)web)MessageToDict)apirtc)agentmodels   )ipc	telemetryutils)AssignmentTimeoutError)_InferenceRunner)JobAcceptArguments
JobContextJobExecutorType
JobProcess
JobRequestRunningJobInfo)	DEV_LEVELlogger)Plugin)ATTRIBUTE_AGENT_NAME	NOT_GIVEN
NotGivenOr)http_serveris_given)get_cpu_monitor)__version__g      @      @      ?   procr   returnr   c                 C     d S N )r.   r2   r2   W/var/www/html/livekit_bhavya/venv/lib/python3.10/site-packages/livekit/agents/worker.py_default_setup_fncA   s   r4   ctxr   Nonec                   s   |   I d H  d S r1   )accept)r5   r2   r2   r3   _default_request_fncE   s   r8   c                   @  s   e Zd ZejjZejjZdS )
ServerTypeN)	__name__
__module____qualname__r   JobTypeJT_ROOMROOMJT_PUBLISHER	PUBLISHERr2   r2   r2   r3   r9   I   s    r9   c                   @  sD   e Zd ZdZe ZdddZdddZdd	d
Z	e
dddZdS )_DefaultLoadCalcNr/   r6   c                 C  s@   t d| _t | _tj| jddd| _t	 | _
| j  d S )N   Tworker_cpu_load_monitor)targetdaemonname)r   MovingAverage_m_avgr)   _cpu_monitor	threadingThread
_calc_load_threadLock_lockstartselfr2   r2   r3   __init__U   s   
z_DefaultLoadCalc.__init__c                 C  sD   	 | j jdd}| j | j| W d    n1 sw   Y  q)NTr,   interval)rJ   cpu_percentrP   rI   
add_sample)rS   cpu_pr2   r2   r3   rM   ^   s   z_DefaultLoadCalc._calc_loadfloatc                 C  s4   | j  | j W  d    S 1 sw   Y  d S r1   )rP   rI   get_avgrR   r2   r2   r3   _get_avgd   s   $z_DefaultLoadCalc._get_avgworkerAgentServerc                 C  sL   | j d u r!| j | j d u rt | _ W d    n1 sw   Y  | j  S r1   )	_instance_instance_lockrB   r\   )clsr]   r2   r2   r3   get_loadh   s   


z_DefaultLoadCalc.get_loadr/   r6   r/   rZ   )r]   r^   r/   rZ   )r:   r;   r<   r_   rK   rO   r`   rT   rM   r\   classmethodrb   r2   r2   r2   r3   rB   Q   s    

	
rB   c                   @  s\   e Zd ZU dZded< dZded< dZded< dZded< ee	dZ
ded	< d
Zded< dS )WorkerPermissionsTboolcan_publishcan_subscribecan_publish_datacan_update_metadatadefault_factoryzlist[models.TrackSource]can_publish_sourcesFhiddenN)r:   r;   r<   rh   __annotations__ri   rj   rk   r   listrn   ro   r2   r2   r2   r3   rf   r   s   
 rf   winTT)frozenc                   @  s,   e Zd ZU ded< ded< edd	d
ZdS )ServerEnvOptionrs   dev_defaultprod_defaultoptT | ServerEnvOption[T]devmoderg   r/   c                 C  s   t | tr|r
| jS | jS | S r1   )
isinstanceru   rv   rw   )rx   rz   r2   r2   r3   getvalue   s   
zServerEnvOption.getvalueN)rx   ry   rz   rg   r/   rs   )r:   r;   r<   rp   staticmethodr|   r2   r2   r2   r3   ru      s
   
 ru   gffffff?rv   rw   c                   @  s  e Zd ZU ded< 	 eZded< 	 eZded< 	 ej	Z
ded< 	 eZd	ed
< 	 eZded< 	 dZded< 	 dZded< 	 dZded< 	 edeee  ddZded< 	 dZded< 	 dZded< 	 eedZded< 	 dZd ed!< 	 ej Z!d"ed#< 	 d$Z"ded%< 	 d&Z#d'ed(< 	 d&Z$d'ed)< 	 d&Z%d'ed*< 	 dZ&d ed+< edd,dZ'ded-< 	 e(Z)d.ed/< 	 e*j+,d0sd1nd2Z-d3ed4< 	 e(Z.d5ed6< 	 d&Z/d'ed7< 	 d>d<d=Z0d&S )?ServerOptions'Callable[[JobContext], Awaitable[None]]entrypoint_fncz'Callable[[JobRequest], Awaitable[None]]request_fnczCallable[[JobProcess], Any]prewarm_fncz4Callable[[AgentServer], float] | Callable[[], float]load_fncr   job_executor_typefloat | ServerEnvOption[float]load_threshold  rZ   job_memory_warn_mbr   job_memory_limit_mb  intdrain_timeout   r~   int | ServerEnvOption[int]num_idle_processes      $@shutdown_process_timeoutinitialize_process_timeoutrl   rf   permissions str
agent_name
WorkerTypeworker_type   	max_retryN
str | Nonews_urlapi_key
api_secrethost  portNotGivenOr[str | None]
http_proxylinuxspawn
forkserverLiteral['spawn', 'forkserver']multiprocessing_contextNotGivenOr[int]prometheus_portprometheus_multiproc_dirrz   rg   r/   r6   c                 C  s6   t | j|}|dkr|std|  d S d S d S )Nr   z?load_threshold in prod env must be less than 1, current value: )ru   r|   r   r"   warning)rS   rz   r   r2   r2   r3   validate_config   s   zServerOptions.validate_config)rz   rg   r/   r6   )1r:   r;   r<   rp   r8   r   r4   r   rB   rb   r   _default_job_executor_typer   _default_load_thresholdr   r   r   r   ru   minmathceilr)   	cpu_countr   r   r   r   rf   r   r   r   r?   r   r   r   r   r   r   r   r%   r   sysplatform
startswithr   r   r   r   r2   r2   r2   r3   r      sl   
 
r   c                   @  s   e Zd ZU ded< ded< dS )
WorkerInfor   	http_portrg   cloud_agentsN)r:   r;   r<   rp   r2   r2   r2   r3   r      s   
 r   )worker_startedworker_registeredc                      sV  e Zd Zedee  dZedddZ	e
edddeddeddddd	e	eejd
s.dnddddddd fd2d3Zedd4d5Zejdd7d5Zedd8d9Zejdd;d9Zedd>d?Zed	ejddd@ddJdKZed	ejddd@ddMdKZ	dd	ejddd@ddPdKZeddRdSZdTdTdUddYdZZeeeeeeeeeeddd[dd`daZeddbdcZeddedfZ eddgdhZ!efddkdlZ"e#j$e%dmdTddddnddudvZ&ddwdxZ'dd{d|Z(e#j$e%dmdd}d~Z)dddZ*dddZ+dddZ,dddZ-dddZ.dddZ/dddZ0dddZ1dddZ2dddZ3dddZ4  Z5S )r^   r   r~   r   r   r   r   r   Nr   r   r   r   )r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   	setup_fncr   r   r   r   r   r   r   r   rZ   r   r   r   r   r   r   r   r   rf   r   r   r   r   r   r   r   r   r   r   r   r   r   "Callable[[JobProcess], Any] | Noner   ;Callable[[AgentServer], float] | Callable[[], float] | Noner   
int | Noner   r/   r6   c                  sD  t    |ptjdpd| _|ptjdpd| _|p$tjdp$d| _tjdp-d| _|| _	|| _
|| _|| _|| _|| _|| _|| _|| _|| _|	| _|
| _|| _|| _|| _t|| _t|srtjdpqtjd}|| _d| _tj| _ d| _!d | _"d | _#d | _$|| _%|| _&d	\| _'| _(| _)| _*d | _+t,- | _.d S )
NLIVEKIT_URLr   LIVEKIT_API_KEYLIVEKIT_API_SECRETLIVEKIT_WORKER_TOKENHTTPS_PROXY
HTTP_PROXYunregistered)TFFF)/superrT   osenvironget_ws_url_api_key_api_secret_worker_token_host_port_job_executor_type_load_threshold_job_memory_warn_mb_job_memory_limit_mb_drain_timeout_num_idle_processes_shutdown_process_timeout_initialize_process_timeout_permissions
_max_retry_prometheus_port_prometheus_multiproc_dir_mp_ctx_strmpget_context_mp_ctxr(   _http_proxy_agent_namer9   r?   _server_type_id_entrypoint_fnc_request_fnc_session_end_fnc
_setup_fnc	_load_fnc_closed	_draining_connecting_connection_failed_http_serverasynciorO   rP   )rS   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   	__class__r2   r3   rT     sF   
zAgentServer.__init__c                 C     | j S r1   )r   rR   r2   r2   r3   r   T     zAgentServer.setup_fncvaluec                 C  "   |d urt |std|| _d S )Nz$setup_fnc must be a callable or None)callable	TypeErrorr   rS   r   r2   r2   r3   r   X     
c                 C  r   r1   )r   rR   r2   r2   r3   r   ^  r   zAgentServer.load_fncCallable[..., float] | Nonec                 C  r   )Nz#load_fnc must be a callable or None)r   r   r   r   r2   r2   r3   r   b  r   optionsr   c                 C  s
  | di d|j d|jd|jd|jd|jd|jd|jd|jd	|jd
|j	d|j
d|jd|jd|jd|jd|jd|jdt|jrP|jnd d|jd|j}|j|j|j|j|jd |S d|jd|j}|j|j|j|j|jd |S )Nr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   )r   type
on_requestr2   )r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r(   r   r   r   rtc_sessionr   r   r   r   )ra   r   serverr2   r2   r3   from_server_optionsh  sz   	
zAgentServer.from_server_options)r   r   r   on_session_endfuncr   r   r   r9   r   "Callable[[JobRequest], Any] | Noner  "Callable[[JobContext], Any] | Nonec                C  r0   r1   r2   )rS   r  r   r   r   r  r2   r2   r3   r     s   	zAgentServer.rtc_session\Callable[[Callable[[JobContext], Awaitable[None]]], Callable[[JobContext], Awaitable[None]]]c                C  r0   r1   r2   )rS   r   r   r   r  r2   r2   r3   r     s   
.Callable[[JobContext], Awaitable[None]] | NoneCallable[[JobContext], Awaitable[None]] | Callable[[Callable[[JobContext], Awaitable[None]]], Callable[[JobContext], Awaitable[None]]]c                  s*   d fdd}|dur||S |S )a  
        Decorator or direct registrar for the RTC session entrypoint.

        Usage:
            @server.rtc_session(agent_name="survey_agent")
            async def my_agent(job_ctx: JobContext): ...

            server.rtc_session(my_agent, agent_name="survey_agent")
        fr   r/   c                   s4   j d ur	td| _ __ __| S )NzHThe AgentServer currently only supports registering only one rtc_session)r   RuntimeErrorr   r   r   r   )r	  r   r   r  rS   r   r2   r3   	decorator  s   
z*AgentServer.rtc_session.<locals>.decoratorN)r	  r   r/   r   r2   )rS   r  r   r   r   r  r  r2   r  r3   r     s   r   c                 C  s    t | jr| jjndt| jdS )Nr   )r   r   )r   r   r   rg   r   rR   r2   r2   r3   worker_info  s   zAgentServer.worker_infoF)rz   r   rz   rg   r   c                  s  j 4 I dH  jstdjdu rtdjdu r"t_jdu r*t_j	du r3t
j_	jjrSj	t
jkrFtd t
j_	jtkrStd t_t _ _ttjt   _i _d_tjjtj  djd_!d_"t#t$j%dkrt&j'j(t$j%d	d
ddd
ddj)jj*pdd_"t&j+j,jjj-t./j0 jj1j"j)j2j3j4j5j*pdd_6tj7j8_9d_:d_;t<j=j>t./j? jd_@d_AdGfdd}dGfdd}j@jBCtDEd|g j@jBCtDEd|g d_Fd_GjHstIdjJstIdjKs"tIdd_LjMdur8tNj<j=j>jMjd_LjOrCjOtPjQd< ndtPjQv rOtPjQd _OjOr[tPjRjOdd  jOrtPjSTjOrtjUd!d"jOid# tPVjOD ]7}tPjSWjO|}ztPjSX|rtPY| W qw ty } ztjd$| |d% W Y d}~qwd}~ww jHtPjQd&< jJtPjQd'< jKtPjQd(< tjZd)t[t\j[d*d# j]d+krd,d- t^j_D d.g }tjZd/d0|id# j)`| j"durtZd1 j"a I dH  j"b I dH  d2_dHfd6d7}	j@a I dH  tZd8j@jc d9j@jd  jLr?jLa I dH  tZd:jLjcjLjd j6ed;|	 j6ed<|	 j6ed=|	 j6a I dH  tfjgj*pcdd>_;thjijHjJjKj;d?_:tjjjd_tjktd@dI fdAdB}
g }tjl|
 dCdD_G|mjG |stjln dEdD_F|mjF odF W d  I dH  n1 I dH sw   Y  jI dH  dS )Ju,  This method starts the worker's internal event loop, initializes any required
        executors, HTTP servers, and process pools, and optionally registers the worker
        with the LiveKit server.

        Args:
            devmode (bool, optional):
                If True, the worker runs in development mode.
                This affects certain environment-dependent defaults, such as the
                number of idle processes, logging verbosity, and load thresholds,
                making it easier to test and debug without production constraints.

            unregistered (bool, optional):
                If True, the worker will start without registering itself with the
                LiveKit server.
                This allows the worker to operate in a partially connected state—
                capable of using other providers or local processing—but invisible
                to the central LiveKit job dispatcher.
                Useful for local testing, isolated jobs, or running without being
                assigned new jobs.
        Nzworker is already runningzNo RTC session entrypoint has been registered.
Define one using the @server.rtc_session() decorator, for example:
    @server.rtc_session(agent_name="my_agent")
    async def my_agent(ctx: JobContext):
        ...
zLcustom load_fnc is not supported when hosting on Cloud, reverting to defaultzRcustom load_threshold is not supported when hosting on Cloud, reverting to default   )loopr   i,  rC   i  <   r+   )runnersinitialize_timeoutclose_timeoutmemory_warn_mbmemory_limit_mbping_intervalping_timeouthigh_ping_thresholdmp_ctxr  r   )initialize_process_fncjob_entrypoint_fncsession_end_fncr   r  r   inference_executorr  r  r  r  r  r           _r   r/   web.Responsec                   s@    j r j  stjdddS  jrtjdddS tjddS )Ni  zinference process not running)statustextzfailed to connect to livekitOK)r"  )_inference_executoris_aliver   Responser   )r  rR   r2   r3   health_check:  s   z%AgentServer.run.<locals>.health_checkc              	     s>   t  jtj jj jt	 j
tdd}tj|ddS )Npython)r   r   worker_loadactive_jobssdk_versionproject_typezapplication/json)bodycontent_type)jsondumpsr   r   r=   Namer   r   _worker_loadlenr*  r*   r   r&  )r  r-  rR   r2   r3   r]   C  s   
zAgentServer.run.<locals>.worker/z/workerz;ws_url is required, or set LIVEKIT_URL environment variablez@api_key is required, or set LIVEKIT_API_KEY environment variablezFapi_secret is required, or set LIVEKIT_API_SECRET environment variablePROMETHEUS_MULTIPROC_DIRT)exist_okz*cleaning prometheus multiprocess directorypathextrazfailed to remove exc_infor   r   r   zstarting worker)versionzrtc-versionr   c                 S  s   g | ]}|j qS r2   )package.0pr2   r2   r3   
<listcomp>  s    z#AgentServer.run.<locals>.<listcomp>avzpreloading pluginspackageszstarting inference executorFr.   ipc.job_executor.JobExecutorr6   c                   s0    j  | } j| | jj d S r1   )_loopcreate_task_update_job_status_job_lifecycle_tasksaddadd_done_callbackdiscard)r.   trR   r2   r3   rG       z+AgentServer.run.<locals>._update_job_statuszHTTP server listening on :z2Prometheus metrics exposed at http://%s:%s/metricsprocess_startedprocess_closedprocess_job_launched)proxy)sessionr"   c                    s   t jt} 	 |  I dH  d	fdd}t d|I dH _t	j
j t	j
  tj }tj }t|sutj}|dkrojtj }|dkrnt|j d}tt|| |}j| nj| q)
zperiodically check loadTNr/   rZ   c                    sF    j d usJ t j } t| j }t|dkr   S    S )Nr   )r   inspect	signaturerq   
parametersvaluesr3  )rV  rW  rR   r2   r3   r     s   
z5AgentServer.run.<locals>._load_task.<locals>.load_fncr   r  rd   )r   aiorV   UPDATE_LOAD_INTERVALtickr   get_event_looprun_in_executorr2  r   metrics_update_worker_load_update_child_proc_countru   r|   r   r   r   isinfr3  r*  maxr   r   
_proc_poolset_target_idle_processes)rV   r   r   default_num_idle_processesr*  job_loadavailable_loadavailable_jobrz   rS   r2   r3   
_load_task  s6   	


z#AgentServer.run.<locals>._load_task	load_taskrG   worker_conn_taskr   )r  r   r/   r   r.   rD  r/   r6   rc   )prP   r   	Exceptionr   r
  r   r8   r   r4   r   rB   rb   r  r   r"   r   r   r   r   r\  rE  _devmodesetTaskr   rH  _pending_assignments_close_futurer   rY  Chanr   WorkerMessage	_msg_chanr$  r3  r   registered_runnersr   inference_proc_executorInferenceProcExecutorr   r   	proc_poolProcPoolr   ru   r|   r   r   r   r   r   r   rc  WorkerStatusWS_AVAILABLE_previous_status_api_http_sessionr'   
HttpServerr   r   r   r2  app
add_routesr   r   
_conn_taskrj  r   
ValueErrorr   r   _prometheus_serverr   r   r   r   r   makedirsr7  existsdebuglistdirjoinisfileunlinkinfor*   r   r   r#   registered_pluginsset_forkserver_preloadrQ   
initializer   r   onaiohttpClientSessionr   
LiveKitAPIFuturelog_exceptionsrF  append_connection_taskemit)rS   rz   r   r'  r]   filename	file_patheplugin_packagesrG  rj  tasksr2   ri  r3   run  sB  






	
"


) * zAgentServer.run)r   r   r   r   r   r   r   r   r   r   r   r   NotGivenOr[str]r   NotGivenOr[JobExecutorType]NotGivenOr[float]c                C  s   | j stdt|r|| _t|r|| _t|r|| _t|r#|| _t|r*|| _t|r1|| _t|r8|| _	t|r?|| _
t|	rF|	| _t|
rM|
| _t|rV|| _d S d S )Nz/cannot update options after starting the server)r   r
  r(   r   r   r   r   r   r   r   r   r   r   r   )rS   r   r   r   r   r   r   r   r   r   r   r   r   r2   r2   r3   update_options  s2   
zAgentServer.update_optionsc                 C  r   r1   )r   rR   r2   r2   r3   id  r   zAgentServer.idlist[RunningJobInfo]c                 C  s   dd | j jD S )Nc                 S  s   g | ]}|j r|j qS r2   running_job)r?  r.   r2   r2   r3   rA     s    z+AgentServer.active_jobs.<locals>.<listcomp>)rc  	processesrR   r2   r2   r3   r*    s   zAgentServer.active_jobsc                 C  r   r1   )r   rR   r2   r2   r3   draining"  r   zAgentServer.drainingtimeoutNotGivenOr[int | None]c              	     s   t |r|n j} j4 I dH U  jr!	 W d  I dH  dS tjd j|dd d _  I dH  d
 fdd	}|rJt	| |I dH  n| I dH  W d  I dH  dS W d  I dH  dS 1 I dH slw   Y  dS )zcWhen timeout isn't None, it will raise asyncio.TimeoutError if the processes didn't finish in time.Nzdraining worker)r  r  r8  Tr/   r6   c                    sP   t j jddiI d H  	 dd  jjD } | sd S | D ]	}| I d H  qq)Nreturn_exceptionsTc                 S  s   g | ]}|j r|qS r2   r  r>  r2   r2   r3   rA  9  s    z5AgentServer.drain.<locals>._drain.<locals>.<listcomp>)r   gatherrH  rc  r  r  )procsr.   rR   r2   r3   _drain3  s   z!AgentServer.drain.<locals>._drainrc   )
r(   r   rP   r   r"   r  r  _update_worker_statusr   wait_for)rS   r  r  r2   rR   r3   drain&  s    .zAgentServer.drainrT  )fake_jobagent_identity	room_infotokenroomr  r  r  models.Room | Noner  c             	     s8  | j 4 I d H  |d urt j|ddj}|d u r&|s!tdtd}|d u r:|s0tdtj	td|d}t
j|sCtdntd	|t
jjd d
}|pit| j| j|dtjd|dd }t| jt|ddd|| j||d}| j|I d H  W d   I d H  d S 1 I d H sw   Y  d S )NF)verify_signaturez,agent_identity is None but fake_job is Falsezfake-agent-z'room_info is None but fake_job is FalseFAKE_RM_)sidrG   zsimulated-job-z	fake-job-)r  r  r   participantr   T)	room_joinr  r   r   )identityrG   metadata)	worker_idaccept_argumentsjoburlr  r  )rP   r   TokenVerifierverifyr  r  r   	shortuuidr   Roomr   Jobr=   r>   AccessTokenr   r   with_identity	with_kindwith_grantsVideoGrantsto_jwtr    r   r   r   rc  
launch_job)rS   r  r  r  r  r  r  running_infor2   r2   r3   simulate_jobD  sJ   

		.zAgentServer.simulate_jobc              	     s  | j 4 I d H  | jr#| jd ur| jI d H  	 W d   I d H  d S tjdd| jid | jd us4J | jd us;J | jd usBJ | jd usIJ d| _| j	d ur[t
j| j	I d H  | jd urjt
j| jI d H  tj| jddiI d H  | j I d H  | jd ur| j I d H  | j I d H  | j I d H  | jr| j I d H  | j I d H  | j  | j s| jd  W d   I d H  d S W d   I d H  d S 1 I d H sw   Y  d S )Nzshutting down workerr  r8  Tr  )rP   r   rt  r"   r  r  r  r  r   r  r   rY  cancel_and_waitrj  r   r  rH  rc  acloser$  closer  rw  done
set_resultrR   r2   r2   r3   r  |  s@   





).zAgentServer.aclosemsgagent.WorkerMessagec                   s@   | j r|d}|dkrdS |dkrdS | j|I dH  dS )zB_queue_msg raises aio.ChanClosed when the worker is closing/closedmessageupdate_workerNping)r   
WhichOneofrw  send)rS   r  whichr2   r2   r3   
_queue_msg  s   
zAgentServer._queue_msgc                   s  | j d usJ d}d }| js_z:zd| _t| j| jtjdd	 }dd| i}t
| j}|j}|drA|dd}| d|j |j d	d	 }t|d
}i }	| jr`| j|	d< | j j|||	d| jpkd tdI d H }d}t }
| jj|
j_|
jjtj | j!j"| j!j#| j!j$| j!j%| j!j&| j!j'dd | j(|
j_)t*|
j_+|,|
- I d H  |. I d H }t/ }|0| |1dst2d| 3|j d| _| 4 I d H  | 5|I d H  W n] t2y= } zP| jrW Y d }~W |d ur|6 I d H  d S d S || j7krd| _8t9d| dd t:|d d}|d7 }t;j<d| d|d t=>|I d H  W Y d }~nd }~ww W |d urK|6 I d H  n|d urY|6 I d H  w w | jrd S d S )Nr   T)r   AuthorizationzBearer httpwsz://r4  r   worker_token)headersparamsautopingrR  	heartbeat)rh   ri   rj   rk   rn   ro   r   registerz+expected register response as first messageFz#failed to connect to livekit after z	 attempts   
   r   z*failed to connect to livekit, retrying in sr:  )?r  r   r   r   r  r   r   r  r  r  r   r   schemer   replacenetlocr7  rstripr   r   
ws_connectr   HEARTBEAT_INTERVALr   rv  r   r   r  r   allowed_permissionsCopyFromr   ParticipantPermissionr   rh   ri   rj   rk   rn   ro   r   r   r*   r<  
send_bytesSerializeToStringreceive_bytesServerMessageParseFromStringHasFieldro  _handle_register_report_active_jobs_run_wsr  r   r   r
  r   r"   r   r   sleep)rS   retry_countr  join_jwtr  parser  base	agent_urlr  reqfirst_msg_br  r  retry_delayr2   r2   r3   r    s   

 

	






zAgentServer._connection_taskr  aiohttp.ClientWebSocketResponsec                   s   d d
fdd}d
 fdd}d
 fdd	}t | t | t | g}zt j| I d H  W tjj| I d H  d S tjj| I d H  w )NFr/   r6   c                    s.   t jt} 	 |  I dH    I dH  q)z!periodically update worker statusTN)r   rY  rV   UPDATE_STATUS_INTERVALr[  r  rU   rR   r2   r3   rj    s   z'AgentServer._run_ws.<locals>._load_taskc                    sL   	 zj  I d H } |  I d H  W n tjjy$   d Y d S w q)NT)rw  recvr  r  r   rY  
ChanClosed)r  
closing_wsrS   r  r2   r3   
_send_task  s   z'AgentServer._run_ws.<locals>._send_taskc                    s   	   I d H } | jtjjtjjtjjfv r rd S td| jtjjkr.t	
d| j q| j}t }|| |d}|dkrJ|j n(|dkrU|j n|dkrrjj|jdd	}j| |jj q)
NTz%worker connection closed unexpectedlyzunexpected message type: %sr  availability
assignmentterminationagent_job_terminationrl  )receiver   r  	WSMsgTypeCLOSECLOSEDCLOSINGro  BINARYr"   r   datar   r  r  r  _handle_availabilityr  _handle_assignmentr  rE  rF  _handle_terminationr  rH  rI  rJ  rK  )r  r  
server_msgr  	user_taskr  r2   r3   
_recv_task&  s<   


z'AgentServer._run_ws.<locals>._recv_taskrc   )r   rF  r  r   rY  r  )rS   r  rj  r  r   r  r2   r  r3   r    s   

#

*zAgentServer._run_wsjobsc              
     s   | j std|D ]M}tjtd|jj|jjdd |j}t	j
|| j dgd}ttjtjj d |d< t|j|j| jt	j|| j dd	|j|jd
}| j|I d H  q
d S )Nz%api_secret is required to reload jobszreloading job)job_idr   r8  HS256)
algorithmsi  exp)	algorithmr  r  r  r  r  r  )r   r
  r"   logr!   r  r  r   r  jwtdecoder   datetimenowtimezoneutc	timestampr    r  r   encoder  r  rc  r  )rS   r!  ajoriginal_tokendecodedr  r2   r2   r3   _reload_jobsR  s,    zAgentServer._reload_jobsregagent.RegisterWorkerResponsec                 C  sF   |j | _tjd| j|j | j|jj|jjdd | 	d|j |j d S )Nzregistered worker)r   r  r  regionprotocolr8  r   )
r  r   r"   r  r   r   server_infor7  r8  r  )rS   r5  r2   r2   r3   r  l  s   
zAgentServer._handle_registeragent.AvailabilityRequestc                 C  s0   | j | |}| j| || jj d S r1   )rE  rF  _answer_availabilityrH  rI  rJ  rK  )rS   r  taskr2   r2   r3   r  z  rM  z AgentServer._handle_availabilityc                 C  s$   | j rdS t| j| j}| j|k S )NF)r   ru   r|   r   rp  r2  )rS   r   r2   r2   r3   _is_available  s   
zAgentServer._is_availablec              
     s     st }jj|j_d|j_|I dH  dS ddfdd dfdd}t	j |dt
jdjjjjjjjjjjjjjjdd tjt
dd fdd}jj| dd}j| |jj dS )zAsk the user if they want to accept this job and forward the answer to the server.
        If we get the job assigned, we start a new process.FN	terminaterg   r/   r6   c                   s>   d t  }jj|j_d|j_| |j_|I d H  d S )NTF)	r   rv  r  r  r  r"  	availabler>  r  )r>  availability_resp)answeredr  rS   r2   r3   
_on_reject  s   z4AgentServer._answer_availability.<locals>._on_rejectargsr   c                   s.  d t  }jj|j_d|j_| j|j_| j	|j_
| j|j_j|jjt< | jr2|jj| j tjt j  }|jj< |I d H  zt|tI d H  W n$ tjyv   jjd  tjdj djdd t d w | }t| j|j pj!|j"j#dd}j$%|I d H  d S )NTzassignment for job z
 timed outjob_requestr   r8  Fr'  )&r   rv  r  r  r  r"  r?  r  participant_identityrG   participant_namer  participant_metadatar   participant_attributesr$   
attributesupdater   r  JobAssignmentrs  r  r  ASSIGNMENT_TIMEOUTTimeoutErrorpopr"   r   r   resultr    r  r   r  r   rc  r  )rC  r@  wait_assignment
job_assignr  )rA  job_reqr  rS   r2   r3   
_on_accept  sF   





	z4AgentServer._answer_availability.<locals>._on_accept)r  	on_reject	on_acceptzreceived job request)r"  dispatch_idr  room_idr   resumingenable_recordingr8  rT  c                     s   j d usJ z
 I d H  W n ty&   tjdjdd Y nw s>tjdjdd  ddI d H  d S d S )Nzjob_request_fnc failedrD  r8  zOno answer was given inside the job_request_fnc, automatically rejecting the jobF)r>  )r   ro  r"   	exceptionr   r   r2   )rB  rA  rS  rS   r2   r3   _job_request_task  s"   


z;AgentServer._answer_availability.<locals>._job_request_taskrE  rl  )r>  rg   r/   r6   )rC  r   r/   r6   rc   )r=  r   rv  r  r  r  r"  r?  r  r   r"   r  rW  r  rG   r  r   rY  rZ  r   r  rE  rF  rH  rI  rJ  rK  )rS   r  r@  rT  r\  r  r2   )rB  rA  rS  r  rS   r3   r;    s6   
+
z AgentServer._answer_availabilityr  agent.JobAssignmentc              	   C  s   t jd| j|jjj|jjj|jj|jj|jj	dd |jj| j
v rHttj | j
|jj}|| W d    d S 1 sAw   Y  d S t jdt|j| jdd d S )Nzreceived assignment)r   rX  r  r"  rW  rZ  r8  z&received assignment for an unknown job)r  r   )r"   r  r   r  r  r  rG   r  rW  rZ  rs  
contextlibsuppressr   InvalidStateErrorrO  r  r   r   )rS   r  futr2   r2   r3   r    s&   "
zAgentServer._handle_assignmentagent.JobTerminationc                   s*   | j |j}|sd S | I d H  d S r1   )rc  get_by_job_idr"  r  )rS   r  r.   r2   r2   r3   r    s
   zAgentServer._handle_terminationc           	        s&  t | j}| jr"tjtjj|d}tj|d}| |I d H  d S t	
| j| j}| j|k}| o5| j }|r<tjjntjj}tj| j||d}| j|krk| jsk|| _| j|d}|rdtjd|d ntjd|d tj|d}ttjj | |I d H  W d    d S 1 sw   Y  d S )N)r!  	job_count)r  )loadr!  rd  )re  	thresholdz2worker is at full capacity, marking as unavailabler8  z.worker is below capacity, marking as available)r3  r*  r   r   UpdateWorkerStatusr}  WS_FULLrv  r  ru   r|   r   rp  r2  r~  r  r"   r  r^  r_  r   rY  r  )	rS   job_cntrK  r  r   is_fullcurrently_availabler!  r9  r2   r2   r3   r  
  s.   

"z!AgentServer._update_worker_statusr.   rD  c                   s   |j }|d u r
d S tjj}|jtjjjkrtjj}n|jtjjj	kr(tjj
}n|jtjjjkr4tjj}tj|jj|dd}tj|d}| |I d H  d S )Nr   )r"  r!  error)
update_job)r  r   	JobStatus
JS_RUNNINGr!  r   job_executorFAILED	JS_FAILEDSUCCESS
JS_SUCCESSRUNNINGUpdateJobStatusr  r  rv  r  )rS   r.   job_infor!  rK  r  r2   r2   r3   rG  *  s   

zAgentServer._update_job_statusc                   sb   | j }|sd S dd |D }tj|d}tj|d}| |I d H  tjdt||dd d S )Nc                 S  s   g | ]}|j jqS r2   )r  r  )r?  rw  r2   r2   r3   rA  @  s    z3AgentServer._report_active_jobs.<locals>.<listcomp>)job_ids)migrate_jobz'reported active jobs after registration)rd  rx  r8  )r*  r   MigrateJobRequestrv  r  r"   r  r3  )rS   r*  rx  migrate_reqr  r2   r2   r3   r  ;  s   
zAgentServer._report_active_jobs),r   r   r   r   r   rZ   r   rZ   r   r   r   r   r   rZ   r   rZ   r   rf   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r/   r6   )r/   r   )r   r   r/   r6   )r/   r   )r   r   r/   r6   )r   r   r/   r^   )r  r   r   r   r   r9   r   r  r  r  r/   r   )
r   r   r   r9   r   r  r  r  r/   r  r1   )r  r  r   r   r   r9   r   r  r  r  r/   r  )r/   r   )rz   rg   r   rg   r/   r6   )r   r  r   r  r   r  r   r   r   r  r   r  r   r  r   r  r   r   r   r   r   rZ   r   rZ   r/   r6   )r/   r   )r/   r  )r/   rg   )r  r  r/   r6   )r  r   r  rg   r  r   r  r  r  r   r/   r6   rc   )r  r  r/   r6   )r  r	  r/   r6   )r!  r  r/   r6   )r5  r6  r/   r6   )r  r:  r/   r6   )r  r]  r/   r6   )r  rb  r/   r6   rn  )6r:   r;   r<   ru   r   r   r)   r   _default_num_idle_processes_default_portr   r   _default_permissionsr%   r   r   r   rT   propertyr   setterr   re   r  r   r9   r?   r   r  r  r  r  r*  r  r  r   r  r"   r  r  r  r  r  r4  r  r  r=  r;  r  r  r  rG  r  __classcell__r2   r2   r   r3   r^      s    N
+  4

7
-

]
@




g


 r^   )r.   r   r/   r   )r5   r   r/   r6   )a
__future__r   r   r^  r+  rU  r/  r   multiprocessingr   r   r   rK   collections.abcr   r   dataclassesr   r   enumr   typingr   r	   r
   r   r   urllib.parser   r   r  r)  r   google.protobuf.json_formatr   livekitr   r   livekit.protocolr   r   r   r   r   r   _exceptionsr   inference_runnerr   r  r   r   r   r   r   r    r(  r!   r"   pluginr#   typesr$   r%   r&   r'   r(   utils.hwr)   r<  r*   rM  r
  rZ  r  r4   r8   r9   r   rB   rf   r   r   THREADr   PROCESSrs   ru   infr   r~  r   WorkerOptionsr   
EventTypesEventEmitterr^   r2   r2   r2   r3   <module>   st    

!	\