o
    Ze”i+  ã                   @   sº   d Z ddlZddlZddlZddlZddlZddlmZ ddlmZm	Z	 ddl
Z
ddlmZ ddlmZ ddlmZ ejejdd	 e d
¡ZG dd„ dƒZdd„ Zedkr[eƒ  dS dS )z²
Continuous RAG transcript ingestion worker.

- Scans all *_sarvamresponse tables.
- Prioritizes configured business IDs (default includes 6004).
- Writes run progress to MySQL.
é    N)Údatetime)ÚDictÚList)Ú
DictCursor)ÚConfig)Ú
RAGHandlerz4%(asctime)s - %(name)s - %(levelname)s - %(message)s)ÚlevelÚformatÚrag_ingestion_workerc                   @   sÒ   e Zd Z						d'dededed	ed
edee dB defdd„Zdd„ Z	dd„ Z
dee fdd„Zdee dee fdd„Zdededefdd„Zd(dededededed edB fd!d"„Zd#d$„ Zd%d&„ ZdS ))ÚRagIngestionWorkeréÐ  TFé,  NÚconfigÚlimit_per_bidÚpresales_onlyÚoverwrite_existingÚinterval_secondsÚpriority_bidsÚrun_oncec              	   C   s¦   || _ t|ƒ| _t|ƒ| _t|ƒ| _t|ƒ| _dd„ |pg D ƒ| _t|ƒ| _t	|ƒ| _
| dd¡t| dd¡ƒ| dd¡| d	d
¡| dd¡dtddœ| _|  ¡  d S )Nc                 S   s$   g | ]}t |ƒ ¡ rt |ƒ ¡ ‘qS © )ÚstrÚstrip©Ú.0Úbr   r   úrag_ingestion_worker.pyÚ
<listcomp>0   s   $ z/RagIngestionWorker.__init__.<locals>.<listcomp>ÚDB_HOSTz	127.0.0.1ÚDB_PORTiê  ÚDB_USERÚadminÚDB_PASSWORDÚ ÚDB_NAMEÚvoicebot_clusterÚutf8mb4T)ÚhostÚportÚuserÚpasswordÚdatabaseÚcharsetÚcursorclassÚ
autocommit)r   Úintr   Úboolr   r   r   r   r   r   ÚragÚgetr   Ú	db_configÚ_ensure_progress_tables)Úselfr   r   r   r   r   r   r   r   r   r   Ú__init__!   s$   










ø
zRagIngestionWorker.__init__c                 C   s   t jdi | j¤ŽS )Nr   )ÚpymysqlÚconnectr2   ©r4   r   r   r   Ú_conn?   s   zRagIngestionWorker._connc                 C   s>   |   ¡ }z| ¡ }| d¡ | d¡ W | ¡  d S | ¡  w )Na»  
                CREATE TABLE IF NOT EXISTS rag_ingestion_progress (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(50) NOT NULL,
                    status ENUM('idle','running','success','error') DEFAULT 'idle',
                    last_run_started_at DATETIME NULL,
                    last_run_finished_at DATETIME NULL,
                    last_duration_ms BIGINT DEFAULT 0,
                    processed_calls INT DEFAULT 0,
                    ingested_documents INT DEFAULT 0,
                    ingested_chunks INT DEFAULT 0,
                    skipped INT DEFAULT 0,
                    last_error TEXT,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uniq_bid (bid),
                    INDEX idx_updated_at (updated_at)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                a»  
                CREATE TABLE IF NOT EXISTS rag_ingestion_runs (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(50) NOT NULL,
                    started_at DATETIME NOT NULL,
                    finished_at DATETIME NULL,
                    duration_ms BIGINT DEFAULT 0,
                    status ENUM('running','success','error') DEFAULT 'running',
                    processed_calls INT DEFAULT 0,
                    ingested_documents INT DEFAULT 0,
                    ingested_chunks INT DEFAULT 0,
                    skipped INT DEFAULT 0,
                    details JSON,
                    error TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    INDEX idx_bid_created (bid, created_at),
                    INDEX idx_status_created (status, created_at)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                )r9   ÚcursorÚexecuteÚclose)r4   Úconnr:   r   r   r   r3   B   s   ÿÿz*RagIngestionWorker._ensure_progress_tablesÚreturnc                 C   sŠ   |   ¡ }z| ¡ }| dd¡ | ¡ }W | ¡  n| ¡  w g }|D ]}t| ¡ ƒd }t d|¡}|r<| 	| 
d¡¡ q"tt|ƒƒ}|S )NzSHOW TABLES LIKE %s)z%_sarvamresponser   z ^([A-Za-z0-9_]+)_sarvamresponse$é   )r9   r:   r;   Úfetchallr<   ÚlistÚvaluesÚreÚmatchÚappendÚgroupÚsortedÚset)r4   r=   r:   ÚrowsÚbidsÚrowÚ
table_namerD   r   r   r   Ú_discover_bidsr   s   
€z!RagIngestionWorker._discover_bidsrJ   c                    s.   ‡ fdd„| j D ƒ‰‡fdd„ˆ D ƒ}ˆ| S )Nc                    s   g | ]}|ˆ v r|‘qS r   r   r   )rJ   r   r   r   „   ó    z5RagIngestionWorker._schedule_bids.<locals>.<listcomp>c                    s   g | ]}|ˆ vr|‘qS r   r   r   )Úpriorityr   r   r   …   rN   )r   )r4   rJ   Úrestr   )rJ   rO   r   Ú_schedule_bidsƒ   s   z!RagIngestionWorker._schedule_bidsÚbidÚ
started_atc                 C   sX   |   ¡ }z"| ¡ }| dt|ƒ|f¡ | dt|ƒ|f¡ t|jƒW | ¡  S | ¡  w )Na[  
                INSERT INTO rag_ingestion_progress (bid, status, last_run_started_at, last_error)
                VALUES (%s, 'running', %s, NULL)
                ON DUPLICATE KEY UPDATE
                    status='running',
                    last_run_started_at=VALUES(last_run_started_at),
                    last_error=NULL
                z…
                INSERT INTO rag_ingestion_runs (bid, started_at, status)
                VALUES (%s, %s, 'running')
                )r9   r:   r;   r   r.   Ú	lastrowidr<   )r4   rR   rS   r=   r:   r   r   r   Ú_mark_runningˆ   s   
÷
û
z RagIngestionWorker._mark_runningÚrun_idÚstatusÚresultÚerrorc                 C   sê   t  ¡ }t||  ¡ d ƒ}t| dd¡pdƒ}	t| dd¡p dƒ}
t| dd¡p*dƒ}t| dd¡p4dƒ}|  ¡ }z5| ¡ }| d||||	|
|||t|ƒf	¡ | d||||	|
||t	j
|p_i d	d
|t|ƒf
¡ W | ¡  d S | ¡  w )Niè  Úprocessed_callsr   Úingested_documentsÚingested_chunksÚskippedaŽ  
                UPDATE rag_ingestion_progress
                SET status=%s,
                    last_run_finished_at=%s,
                    last_duration_ms=%s,
                    processed_calls=%s,
                    ingested_documents=%s,
                    ingested_chunks=%s,
                    skipped=%s,
                    last_error=%s
                WHERE bid=%s
                a–  
                UPDATE rag_ingestion_runs
                SET finished_at=%s,
                    duration_ms=%s,
                    status=%s,
                    processed_calls=%s,
                    ingested_documents=%s,
                    ingested_chunks=%s,
                    skipped=%s,
                    details=%s,
                    error=%s
                WHERE id=%s
                T)Úensure_ascii)r   Úutcnowr.   Útotal_secondsr1   r9   r:   r;   r   ÚjsonÚdumpsr<   )r4   rR   rV   rS   rW   rX   rY   Úfinished_atÚduration_msrZ   Úingested_docsr\   r]   r=   r:   r   r   r   Ú_mark_finished¢   sJ   ÷óöòz!RagIngestionWorker._mark_finishedc                 C   s*  |   |  ¡ ¡}|st d¡ d S t dt|ƒd |¡¡ |D ]t}t ¡ }|  ||¡}t d||¡ z4| j	j
|| j| j| jd}| j|||d|d d t d|| d	d
¡| dd
¡| dd
¡| dd
¡¡ W q ty’ } z!t|ƒ}| j|||dd
d
d
d
dœ|d t d||¡ W Y d }~qd }~ww d S )Nz!No *_sarvamresponse tables found.z Discovered %s bids to ingest: %sz, zStarting bid=%s (run_id=%s))rR   r   Úlimitr   Úsuccess)rX   rY   z6Completed bid=%s calls=%s docs=%s chunks=%s skipped=%srZ   r   r[   r\   r]   rY   )rZ   r[   r\   r]   zIngestion failed for bid=%s: %s)rQ   rM   ÚloggerÚinfoÚlenÚjoinr   r_   rU   r0   Úbackfill_transcriptsr   r   r   rf   r1   Ú	Exceptionr   Ú	exception)r4   rJ   rR   rS   rV   rX   ÚexcÚerrr   r   r   Úrun_iterationä   sN   
ü



úú€öìz RagIngestionWorker.run_iterationc                 C   s0   	 |   ¡  | jr
d S t d| j¡ t | j¡ q)NTz%Sleeping %s seconds before next scan.)rr   r   ri   rj   r   ÚtimeÚsleepr8   r   r   r   Úrun_forever  s   ûzRagIngestionWorker.run_forever)r   TFr   NF)N)Ú__name__Ú
__module__Ú__qualname__r   r.   r/   r   r   r5   r9   r3   rM   rQ   r   rU   rf   rr   ru   r   r   r   r   r       s<    øþýüûú
ù
ø0(B'r   c               	   C   s¶   t jdd} | jdtdd | jddd | jd	dd | jd
tdd | jdddd | jddd |  ¡ }dd„ t|jƒ d¡D ƒ}tt	j
|j|j|j|j||jd}| ¡  d S )Nz(Continuous transcript ingestion into RAG)Údescriptionz--limit-per-bidr   )ÚtypeÚdefaultz--presales-onlyÚ
store_true)Úactionz--overwrite-existingz--interval-secondsr   z--priority-bidsr"   z"Comma separated bids to prioritize)r{   Úhelpz
--run-oncec                 S   s   g | ]
}|  ¡ r|  ¡ ‘qS r   )r   r   r   r   r   r     s    zmain.<locals>.<listcomp>ú,)r   r   r   r   r   r   r   )ÚargparseÚArgumentParserÚadd_argumentr.   Ú
parse_argsr   r   Úsplitr   r   Ú__dict__r   r   r   r   r   ru   )ÚparserÚargsr   Úworkerr   r   r   Úmain  s&   ù	r‰   Ú__main__)Ú__doc__r€   ra   ÚloggingrC   rs   r   Útypingr   r   r6   Úpymysql.cursorsr   r   r   Úrag_handlerr   ÚbasicConfigÚINFOÚ	getLoggerri   r   r‰   rv   r   r   r   r   Ú<module>   s.   þ
 u
ÿ