o
    2i                     @  s   d dl mZ d dlZd dlZd dlmZmZmZmZ d dl	m
Z
 eeZdZdZdZdZd=d>ddZd?ddZd@dd ZdAdBd%d&ZdCdDd)d*ZdEd,d-ZdFdGd2d3ZdHdId5d6ZdJd8d9ZdKd;d<ZdS )L    )annotationsN)AnyDictListOptional)get_connectionpending
processingdonefailedbidstrcall_idrecording_urlmetadataOptional[Dict]returnOptional[int]c                 C  s   t  ^}| I}|d| ||tt|pi f |jdkr/|jW  d    W  d    S |d| |f | }|rA|d nd W  d    W  d    S 1 sTw   Y  W d    d S 1 sdw   Y  d S )NzINSERT IGNORE INTO stt_jobs (bid, call_id, recording_url, status, metadata, created_at, updated_at) VALUES (%s, %s, %s, %s, %s, NOW(), NOW())r   z3SELECT id FROM stt_jobs WHERE bid=%s AND call_id=%sid)	r   cursorexecuteSTATUS_PENDINGjsondumpsrowcount	lastrowidfetchone)r   r   r   r   conncurrow r    B/home/aiteam/pcaa-dev/call-proccessing/stt_pipeline/db/stt_jobs.py
insert_job   s"   

"r"   job_idintNonec                 C  s   t | t d S N)_update_statusSTATUS_PROCESSING)r#   r    r    r!   mark_processing   s   r)   
transcriptspeaker_segments
List[Dict]speaker_countdurationfloatstt_providerc                 C  s   t  5}| }|dt|t||||| f W d    n1 s#w   Y  W d    d S W d    d S 1 s;w   Y  d S )NzUPDATE stt_jobs SET status=%s, transcript=%s, speaker_segments=%s, speaker_count=%s, duration_seconds=%s, stt_provider=%s, error_message=NULL, updated_at=NOW() WHERE id=%s)r   r   r   STATUS_DONEr   r   )r#   r*   r+   r-   r.   r0   r   r   r    r    r!   	mark_done"   s   
"r2   Terrorincrement_retryboolc              	   C  s   t  <}| }|rdnd}|d| dt|d d | f W d    n1 s*w   Y  W d    d S W d    d S 1 sBw   Y  d S )Nz, retry_count = retry_count + 1 z/UPDATE stt_jobs SET status=%s, error_message=%sz, updated_at=NOW() WHERE id=%si  )r   r   r   STATUS_FAILED)r#   r3   r4   r   r   	retry_sqlr    r    r!   mark_failed.   s   

"r9      stale_after_minutesc              	   C  sz   t  0}| }|dtt| f |jW  d    W  d    S 1 s&w   Y  W d    d S 1 s6w   Y  d S )NztUPDATE stt_jobs SET status=%s, updated_at=NOW() WHERE status=%s AND updated_at < DATE_SUB(NOW(), INTERVAL %s MINUTE))r   r   r   r   r(   r   )r;   r   r   r    r    r!   reset_stale_processing_jobs7   s   
"r<   	List[str]c              	   C  sr   t  '}| }|d| f | }W d    n1 sw   Y  W d    n1 s-w   Y  dd |D S )Nz)SELECT call_id FROM stt_jobs WHERE bid=%sc                 S  s   g | ]}|d  qS )r   r    .0rr    r    r!   
<listcomp>F   s    z)get_all_seen_call_ids.<locals>.<listcomp>r   r   r   fetchall)r   r   r   rowsr    r    r!   get_all_seen_call_idsA   s   

rE   
   Optional[str]limitList[Dict[str, Any]]c              	   C  s   t  <}| '}| r|dt| |f n|dt|f | W  d    W  d    S 1 s2w   Y  W d    d S 1 sBw   Y  d S )NzRSELECT * FROM stt_jobs WHERE status=%s AND bid=%s ORDER BY created_at ASC LIMIT %szGSELECT * FROM stt_jobs WHERE status=%s ORDER BY created_at ASC LIMIT %s)r   r   r   r   rC   )r   rH   r   r   r    r    r!   get_pending_jobsH   s    
"rJ   max_retriesc              	   C  s|   t  1}| }|dt| |f | W  d    W  d    S 1 s'w   Y  W d    d S 1 s7w   Y  d S )NzZSELECT * FROM stt_jobs WHERE status=%s AND retry_count<%s ORDER BY updated_at ASC LIMIT %s)r   r   r   r7   rC   )rK   rH   r   r   r    r    r!   get_retryable_failed_jobsW   s   
"rL   Dict[str, int]c               	   C  sn   t  %} |  }|d | }W d    n1 sw   Y  W d    n1 s+w   Y  dd |D S )Nz<SELECT status, COUNT(*) as cnt FROM stt_jobs GROUP BY statusc                 S  s   i | ]	}|d  |d qS )statuscntr    r>   r    r    r!   
<dictcomp>f   s    z!get_job_stats.<locals>.<dictcomp>rB   )r   r   rD   r    r    r!   get_job_statsa   s   


rQ   rN   c              	   C  st   t  -}| }|d|| f W d    n1 sw   Y  W d    d S W d    d S 1 s3w   Y  d S )Nz;UPDATE stt_jobs SET status=%s, updated_at=NOW() WHERE id=%s)r   r   r   )r#   rN   r   r   r    r    r!   r'   h   s   
"r'   r&   )
r   r   r   r   r   r   r   r   r   r   )r#   r$   r   r%   )r#   r$   r*   r   r+   r,   r-   r$   r.   r/   r0   r   r   r%   )T)r#   r$   r3   r   r4   r5   r   r%   )r:   )r;   r$   r   r$   )r   r   r   r=   )NrF   )r   rG   rH   r$   r   rI   )rF   )rK   r$   rH   r$   r   rI   )r   rM   )r#   r$   rN   r   r   r%   )
__future__r   r   loggingtypingr   r   r   r   db.connectionr   	getLogger__name__loggerr   r(   r1   r7   r"   r)   r2   r9   r<   rE   rJ   rL   rQ   r'   r    r    r    r!   <module>   s(    


	



