o
    ǡi]                     @  s   d dl mZ d dlZd dlZd dlZd dlZd dlmZmZm	Z	m
Z
 d dlmZ d dlmZmZmZ eeZG dd dZdS )    )annotationsN)AnyDictListOptional)settings)	raw_callsstt_jobs
bid_configc                   @  sR   e Zd Zdd ZdddZdddZd	d
 Zdd Zdd ZdddZ	dd Z
dS )RabbitMQJobProducerc              	   C  sx   t j| _t j| _|   |   ttjt j	t j
tt jt jd| _| j | _| jj| jdd td| j d S )N)hostportcredentialsT)queuedurablez(RabbitMQ Producer initialized | queue=%s)r   
batch_sizerabbitmq_queue
queue_name_ensure_bid_config_table_auto_register_bidspikaBlockingConnectionConnectionParametersrabbitmq_hostrabbitmq_portPlainCredentialsrabbitmq_userrabbitmq_password
connectionchannelqueue_declareloggerinfoself r%   T/home/aiteam/pcaa-dev/call-proccessing/stt_pipeline/workers/rabbitmq_job_producer.py__init__   s"   
zRabbitMQJobProducer.__init__returnNonec              
   C  sB   zt   W d S  ty  } ztd| W Y d }~d S d }~ww )Nz2Could not ensure stt_pipeline_bid_config table: %s)r
   ensure_table	Exceptionr!   warningr$   excr%   r%   r&   r   &   s   z,RabbitMQJobProducer._ensure_bid_config_tablec              
   C  sp   zt  }|D ]}t| q|rtdt| W d S W d S  ty7 } ztd| W Y d }~d S d }~ww )Nz4Auto-registered %d bid(s) in stt_pipeline_bid_configzAuto-register bids failed: %s)	r   get_all_bidsr
   ensure_bid_registeredr!   debuglenr+   r,   )r$   all_bidsbidr.   r%   r%   r&   r   ,   s   z'RabbitMQJobProducer._auto_register_bidsc              
   C  sn   t dtj| j 	 z|   W n ty( } zt d| W Y d }~nd }~ww t dtj t	tj q
)Nz&Producer started | poll=%ds | batch=%dTz&Unexpected error in producer cycle: %szSleeping %ds...)
r!   r"   r   poll_interval_secondsr   _run_one_cycler+   	exceptiontimesleepr-   r%   r%   r&   run_forever6   s   zRabbitMQJobProducer.run_foreverc                 C  s   t d |   d S )NzRunning producer cycle once...)r!   r"   r6   r#   r%   r%   r&   run_onceB   s   
zRabbitMQJobProducer.run_oncec           	        s   t  }|stjr fddtjD }ntd d S d}|D ](}|d }|dp+d}|dp2d	}|d
p: j} ||||}||7 }q|dkrTt	d| d S td d S )Nc                   s   g | ]
}|d d j dqS )idrecording_url)r4   raw_calls_id_colraw_calls_url_colr   )r   ).0br#   r%   r&   
<listcomp>K   s    z6RabbitMQJobProducer._run_one_cycle.<locals>.<listcomp>u2   No enabled BIDs and no whitelist — nothing to dor   r4   r>   r<   r?   r=   r   z&Cycle complete: Produced %d new job(s)z!Cycle complete: No new jobs found)
r
   get_enabled_bidsr   bid_whitelistr!   r1   getr   _discover_and_publishr"   )	r$   active_configstotal_producedcfgr4   id_colurl_colbatchcountr%   r#   r&   r6   F   s&   


z"RabbitMQJobProducer._run_one_cycler4   strrJ   rK   rL   intc                 C  s   t |}tj|||d ||d}d}|D ]H}|d}	|	sq|d }
dd | D }t j||
|	|d}|r]|||
|	|d	}| jjd
| j	t
|tjddd |d7 }td||
| q|S )N   )r4   already_seen_idslimitrJ   rK   r   r=   call_idc                 S  s*   i | ]\}}|d vr|dur|t |qS ))rS   r=   N)rN   )r@   kvr%   r%   r&   
<dictcomp>u   s    z=RabbitMQJobProducer._discover_and_publish.<locals>.<dictcomp>)r4   rS   r=   metadata)job_idr4   rS   r=   rW    )delivery_mode)exchangerouting_keybody
properties   z[%s/%s] Published job_id=%d)r	   get_all_seen_call_idsr   get_new_callsrE   items
insert_jobr   basic_publishr   jsondumpsr   BasicPropertiesr!   r1   )r$   r4   rJ   rK   rL   already_seen	new_callsrM   callr=   rS   rW   rX   job_payloadr%   r%   r&   rF   c   sP   

z)RabbitMQJobProducer._discover_and_publishc                 C  s2   t | dr| jr| jjr| j  d S d S d S d S )Nr   )hasattrr   is_opencloser#   r%   r%   r&   rn      s   zRabbitMQJobProducer.closeN)r(   r)   )
r4   rN   rJ   rN   rK   rN   rL   rO   r(   rO   )__name__
__module____qualname__r'   r   r   r:   r;   r6   rF   rn   r%   r%   r%   r&   r      s    



2r   )
__future__r   re   loggingr8   r   typingr   r   r   r   config.settingsr   dbr   r	   r
   	getLoggerro   r!   r   r%   r%   r%   r&   <module>   s    
