o
    Wi"                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZ d dl	m	Z	 d dl
mZ d dlZd dlZd dlZd dlmZ zd dlmZ d dlmZ W n ey[   dZdZY nw eeZG dd	 d	ZdS )
    N)Counter)datetime)Decimal)
DictCursor)QdrantClient)modelsc                   @   s  e Zd ZdZh dZdd Zdd Zedd Zd	d
 Z	dd Z
dd Zedd Zedd Zedd Zedd Zdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zdbd&d'Zd(d) Zd*d+ Zd,d- Zd.d/ Zd0d1 Zd2d3 Zd4d5 Zdcd6d7Zdcd8d9Z ddd;d<Z!d=d> Z"ded@dAZ#dbdBdCZ$dfdEdFZ%	G	H	I	J	%dgdKdLZ&dMdN Z'dOdP Z(dQdR Z)dhdTdUZ*dVdW Z+dcdXdYZ,dcdZd[Z-did\d]Z.d^d_ Z/djd`daZ0d%S )k
RAGHandlerzOMulti-tenant RAG operations with conversation memory and lightweight profiling.>,   aianasatbebyheifinisitmemyofonortousweandarebutforhasherhisitsourshethewasyoufromhavethatthemtheythiswerewillwithyourtheirc              
   C   s  || _ |ddt|dd|dd|dd|d	d
dtdd| _t|dd| _t|dd| _t|dd| _t	|dd| _
|dd| _t|dd| _|dd| _|ddpgd | _t	|dd| _t	|dd| _t|dd| _t|d d!| _t|d"d#| _td$|d$d%| _td&|d&d| _td'|d'd| _td(|d(|d)d*| _td+|d+d,| _td-|d-d.| _td/|d/d0d1| _ttd2t|d2d3| _d | _ | jr-| jr-zt!j"d4| j| j| jd5| _ W n t#y, } zt$%d6| W Y d }~nd }~ww d | _&d | _'| j
rit(d urizt(| j| j| j| jdd7| _'W n t#yh } zt$%d8| d | _'W Y d }~nd }~ww z| )  W d S  t#y } zt$%d9| W Y d }~d S d }~ww ):NDB_HOSTz	127.0.0.1DB_PORTi  DB_USERadminDB_PASSWORD DB_NAMEvoicebot_clusterutf8mb4T)hostportuserpassworddatabasecharsetcursorclass
autocommit	RAG_TOP_K   RAG_SIMILARITY_THRESHOLD皙?RAG_MEMORY_MESSAGES   RAG_USE_QDRANTRAG_QDRANT_HOSTRAG_QDRANT_PORTi  RAG_QDRANT_COLLECTION_PREFIXrag_chunks_RAG_QDRANT_API_KEYRAG_QDRANT_HTTPSFRAG_USE_REDIS_CACHERAG_EMBED_CACHE_TTL_SECONDSiQ RAG_RETRIEVAL_CACHE_TTL_SECONDSi,  RAG_MEMORY_TTL_SECONDSi ' 
AWS_REGIONz	us-east-1AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEYRAG_CHAT_MODELAWS_NOVA_MODELzamazon.nova-lite-v1:0RAG_EMBEDDING_MODELzamazon.titan-embed-text-v2:0RAG_CHAT_PROVIDERbedrockOLLAMA_BASE_URLzhttp://127.0.0.1:11434/OLLAMA_TIMEOUT_SECONDS   zbedrock-runtime)service_nameregion_nameaws_access_key_idaws_secret_access_keyz(Failed to initialize Bedrock runtime: %s)r>   r?   httpsapi_keycheck_compatibilityz=Qdrant unavailable, continuing with MySQL vector fallback: %sz*Could not ensure RAG tables at startup: %s)*configgetintr   	db_config	rag_top_kfloatrag_similarity_thresholdrag_memory_messagesboolrag_use_qdrantrag_qdrant_hostrag_qdrant_portrag_qdrant_collection_prefixrag_qdrant_api_keyrag_qdrant_httpsrag_use_redis_cacherag_embed_cache_ttlrag_retrieval_cache_ttlrag_memory_ttlosgetenv
aws_regionre   rf   rag_chat_modelrag_embedding_modelrag_chat_providerrstripollama_base_urlstrollama_timeout_secondsbedrock_runtimeboto3client	Exceptionloggerwarningredis_clientqdrant_clientr   ensure_tables)selfrj   exc r   (/var/www/html/pca-backend/rag_handler.py__init__$   s   



zRAGHandler.__init__c                 C   s   t jdi | jS )Nr   )pymysqlconnectrm   r   r   r   r   get_connectionj   s   zRAGHandler.get_connectionc                 C   s&   t dt| std|  t| S )Nz^[A-Za-z0-9_]+$zUnsafe identifier: )rematchr   
ValueErrornamer   r   r   _safe_identifierm   s   zRAGHandler._safe_identifierc                 C   s   | d|f | d uS )NzSHOW TABLES LIKE %s)executefetchoner   cursor
table_namer   r   r   _table_existss   s   zRAGHandler._table_existsc                 C   s*   | d| | d dd | D S )NzSHOW COLUMNS FROM ``c                 S   s   h | ]}|d  qS )Fieldr   ).0rowr   r   r   	<setcomp>y       z,RAGHandler._table_columns.<locals>.<setcomp>)r   r   fetchallr   r   r   r   _table_columnsw   s   zRAGHandler._table_columnsc                 C   sd   |   }z(| }|d |d |d |d |d |  W |  d S |  w )Nad  
                CREATE TABLE IF NOT EXISTS rag_documents (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(50) NOT NULL,
                    source_id VARCHAR(255) NOT NULL,
                    title VARCHAR(500),
                    source_type VARCHAR(100),
                    source_uri TEXT,
                    metadata JSON,
                    is_active BOOLEAN DEFAULT TRUE,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uniq_bid_source (bid, source_id),
                    INDEX idx_bid_active (bid, is_active),
                    INDEX idx_created_at (created_at)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                a
  
                CREATE TABLE IF NOT EXISTS rag_chunks (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(50) NOT NULL,
                    document_id BIGINT NOT NULL,
                    chunk_id VARCHAR(255) NOT NULL,
                    chunk_index INT DEFAULT 0,
                    content LONGTEXT NOT NULL,
                    token_count INT,
                    metadata JSON,
                    embedding JSON NOT NULL,
                    embedding_dim INT NOT NULL,
                    embedding_norm DOUBLE NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uniq_bid_doc_chunk (bid, document_id, chunk_id),
                    INDEX idx_bid_doc (bid, document_id),
                    INDEX idx_bid_created (bid, created_at)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                a  
                CREATE TABLE IF NOT EXISTS rag_conversations (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(50) NOT NULL,
                    conversation_id VARCHAR(100) NOT NULL,
                    user_id VARCHAR(100) NOT NULL,
                    metadata JSON,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uniq_bid_conversation (bid, conversation_id),
                    INDEX idx_bid_user (bid, user_id),
                    INDEX idx_updated_at (updated_at)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                a  
                CREATE TABLE IF NOT EXISTS rag_messages (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(50) NOT NULL,
                    conversation_id VARCHAR(100) NOT NULL,
                    user_id VARCHAR(100) NOT NULL,
                    role ENUM('system','user','assistant') NOT NULL,
                    content LONGTEXT NOT NULL,
                    metadata JSON,
                    retrieved_chunk_ids JSON,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    INDEX idx_bid_conversation (bid, conversation_id),
                    INDEX idx_bid_user_created (bid, user_id, created_at)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                a  
                CREATE TABLE IF NOT EXISTS rag_user_profiles (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(50) NOT NULL,
                    user_id VARCHAR(100) NOT NULL,
                    profile JSON NOT NULL,
                    profile_version INT DEFAULT 1,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uniq_bid_user_profile (bid, user_id),
                    INDEX idx_bid_updated (bid, updated_at)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                )r   r   r   commitclose)r   connr   r   r   r   r   {   s(   
zRAGHandler.ensure_tablesc                 C   sR   | d u r|S t | ttfr| S t | tr'zt| W S  tjy&   | Y S w |S N)
isinstancedictlistr   jsonloadsJSONDecodeError)valuefallbackr   r   r   _parse_json   s   
zRAGHandler._parse_jsonc              
   C   s   | d u rd S t | trzt| } W n tjy   Y d S w t | ts%d S g }| D ]}z	|t| W q) tt	fyA   Y  d S w |S r   )
r   r   r   r   r   r   appendro   	TypeErrorr   )	embeddingvectorr   r   r   r   _ensure_embedding_list   s$   

z!RAGHandler._ensure_embedding_listc                 C   s   t tdd | D S )Nc                 s   s    | ]}|| V  qd S r   r   )r   r   r   r   r   	<genexpr>   s    z*RAGHandler._vector_norm.<locals>.<genexpr>)mathsqrtsum)r   r   r   r   _vector_norm      zRAGHandler._vector_normc                 C   s   t dd t| |D S )Nc                 s   s    | ]	\}}|| V  qd S r   r   )r   xyr   r   r   r         z*RAGHandler._dot_product.<locals>.<genexpr>)r   zip)r	   br   r   r   _dot_product  r   zRAGHandler._dot_productc                    sd   t |tr	t|S t |tr| S t |tr" fdd| D S t |tr0 fdd|D S |S )Nc                    s    i | ]\}}t | |qS r   )r   
_json_safe)r   kvr   r   r   
<dictcomp>  s     z)RAGHandler._json_safe.<locals>.<dictcomp>c                    s   g | ]}  |qS r   )r   r   r   r   r   r   
<listcomp>  s    z)RAGHandler._json_safe.<locals>.<listcomp>)r   r   ro   r   	isoformatr   itemsr   )r   r   r   r   r   r     s   



zRAGHandler._json_safec                 C   sB   | j sd S z| j |}|rt|W S d W S  ty    Y d S w r   )r   rk   r   r   r   )r   keyrawr   r   r   
_cache_get  s   zRAGHandler._cache_getc                 C   sJ   | j sd S z| j |t|tj| |dd W d S  ty$   Y d S w )NTensure_ascii)r   setexrl   r   dumpsr   r   )r   r   r   ttl_secondsr   r   r   
_cache_set  s   *zRAGHandler._cache_setc                 C   s*   t | j d| d }d| S )N|utf-8zrag:emb:)hashlibsha256r   encode	hexdigest)r   textdigestr   r   r   _embed_cache_key!  s    
zRAGHandler._embed_cache_keyc                 C   sH   t ddd |d d D d }d| d| d| d| S )N,c                 S   s   g | ]}|d qS )z.6fr   r   r   r   r   r   &  r   z3RAGHandler._retrieval_cache_key.<locals>.<listcomp>@   r   zrag:ret::)r   r   joinr   r   )r   bidr   top_kmin_similarity
vector_sigr   r   r   _retrieval_cache_key%  s   ,zRAGHandler._retrieval_cache_keyc                 C   s   d| d| S )Nz	rag:conv:r   r   )r   r   conversation_idr   r   r   _conversation_cache_key)  s   z"RAGHandler._conversation_cache_keyc                 C   sp   d}d}t |tr*t|dp|dpd}|d}t |tr*t|dp(d}d| d| d| d| S )	Nglobalallcontext_scopescopecontextcall_idzrag:resume:r   )r   r   r   rk   )r   r   user_idmetadatar   r   r   r   r   r   _conversation_resume_key,  s   


z#RAGHandler._conversation_resume_keyNc                 C   sB   |rt |S | |||}| |}t|t r| r| S d S r   )r   r   r   r   strip)r   r   r   r   explicit_idr   cachedr   r   r   _resolve_conversation_id6  s   
z#RAGHandler._resolve_conversation_idc                 C   s.   |sd S |  |||}| |t|| j d S r   )r   r   r   r|   )r   r   r   r   r   r   r   r   r   _save_conversation_resume?  s   z$RAGHandler._save_conversation_resumec                 C   sp   | j sd S | ||}z"| j |tj| |dd | j |dd | j || j W d S  t	y7   Y d S w )NTr   ip)
r   r   rpushr   r   r   ltrimexpirer|   r   )r   r   r   message_objr   r   r   r   _push_cached_messageE  s   zRAGHandler._push_cached_messagec                 C   s   | j  | t| S r   )rv   r   r   )r   r   r   r   r   _qdrant_collection_nameP  s   z"RAGHandler._qdrant_collection_namec                 C   s(   t | d| d| d }|S )Nr   r   )r   r   r   r   )r   r   	source_idchunk_idr   r   r   r   _qdrant_point_idS  s   $zRAGHandler._qdrant_point_idc              
   C   s   | j rtd u r	dS | |}z#dd | j  jD }||vr/| j j|tjt|tjj	dd W dS  t
yK } ztd|| W Y d }~dS d }~ww )NFc                 S   s   g | ]}|j qS r   r   r   cr   r   r   r   \  s    z8RAGHandler._ensure_qdrant_collection.<locals>.<listcomp>)sizedistance)collection_namevectors_configTz)Failed to ensure Qdrant collection %s: %s)r   qmr  get_collectionscollectionscreate_collectionVectorParamsrl   DistanceCOSINEr   r   r   )r   r   vector_sizer  existingr   r   r   r   _ensure_qdrant_collectionW  s    
z$RAGHandler._ensure_qdrant_collectionc           
   	   C   s   t |dp|dpt }|d}|d}|d}tj| |dp)i dd}|d	t ||||||f |d
t ||f | }	|	d |fS )Nr  idtitlesource_type
source_urir   Tr   a  
            INSERT INTO rag_documents (bid, source_id, title, source_type, source_uri, metadata, is_active)
            VALUES (%s, %s, %s, %s, %s, %s, TRUE)
            ON DUPLICATE KEY UPDATE
                title = VALUES(title),
                source_type = VALUES(source_type),
                source_uri = VALUES(source_uri),
                metadata = VALUES(metadata),
                is_active = TRUE,
                updated_at = CURRENT_TIMESTAMP
            z>SELECT id FROM rag_documents WHERE bid = %s AND source_id = %s)	r   rk   uuiduuid4r   r   r   r   r   )
r   r   r   documentr  r  r  r  r   r   r   r   r   _upsert_documentg  s    



zRAGHandler._upsert_documentc                 C   s  |sddddS |   }d}d}d}g }d}z]zN| }	|D ]}
|
dp)g }|s-q | |	||
\}}|d7 }t|D ]\}}|dpM|dpMd }|sW|d7 }q>| |d	}|sf|d7 }q>| |}|dkrt|d7 }q>t|d
p| d| }t	|d|}|d}t
j| |dpi dd}|	dt|||||||t
j|ddt||f
 |d7 }| jr'td ur'|s| |t|}|r'z:t|t	|t|t|t	||| |dpi |
d|
d|
dd
}|tj| |||||d W q> ty& } ztd||| W Y d }~q>d }~ww q>q |  |r\| jr\z| jj| ||d W n ty[ } ztd|| W Y d }~nd }~ww |||dW W |  S  tyt   |   w |  w )Nr   	documentschunksskippedFr"     contentr   r:   r   r  z-chunk-chunk_indextoken_countr   Tr   ar  
                        INSERT INTO rag_chunks (
                            bid, document_id, chunk_id, chunk_index, content, token_count,
                            metadata, embedding, embedding_dim, embedding_norm
                        )
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                        ON DUPLICATE KEY UPDATE
                            chunk_index = VALUES(chunk_index),
                            content = VALUES(content),
                            token_count = VALUES(token_count),
                            metadata = VALUES(metadata),
                            embedding = VALUES(embedding),
                            embedding_dim = VALUES(embedding_dim),
                            embedding_norm = VALUES(embedding_norm),
                            updated_at = CURRENT_TIMESTAMP
                        r  r  r  )
r   document_idr  r  r&  r%  r   r  r  r  )r  r   payloadz*Failed to stage Qdrant point for %s/%s: %s)r  pointszDQdrant upsert failed for bid %s; MySQL ingestion still succeeded: %s)r   r   rk   r  	enumerater   r   r   r   rl   r   r   r   r   lenr   r  r  r   PointStructr  r   r   r   r   upsertr  r   rollback)r   r   r!  r   inserted_docsinserted_chunksskipped_chunksqdrant_pointsqdrant_readyr   r  r"  r(  r  idxchunk
chunk_textr   normr  r&  r'  r   r)  r   r   r   r   ingest_documents  s   


P

	
zRAGHandler.ingest_documentsc              
   C   s>   t |pt }|dt ||t |tj|pi ddf |S )Na  
            INSERT INTO rag_conversations (bid, conversation_id, user_id, metadata)
            VALUES (%s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                metadata = COALESCE(VALUES(metadata), metadata),
                updated_at = CURRENT_TIMESTAMP
            Tr   )r   r  r  r   r   r   )r   r   r   r   r   r   conv_idr   r   r   _get_or_create_conversation  s    
z&RAGHandler._get_or_create_conversationc                 C   s   |   }zU| }	|||pi |pg t  d d}
|	dt|t|t|||tj|p.i ddtj|p6g ddf |	dt|t|f |	  | 
|||
 W |  d S |  w )NZroler%  r   retrieved_chunk_ids
created_atz
                INSERT INTO rag_messages (
                    bid, conversation_id, user_id, role, content, metadata, retrieved_chunk_ids
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                Tr   z
                UPDATE rag_conversations
                SET updated_at = CURRENT_TIMESTAMP
                WHERE bid = %s AND conversation_id = %s
                )r   r   r   utcnowr   r   r   r   r   r   r  r   )r   r   r   r   r>  r%  r   r?  r   r   r  r   r   r   save_message  s6   zRAGHandler.save_message   c              
   C   s|  | j r\| ||}zI| j |t| d}|rQg }|D ]0}z%t|}||d|d|dp3i |dp9g |dd W q tyK   Y qw |rQ|W S W n	 ty[   Y nw | 	 }	zX|	
 }
|
dt|t|t|f t|
 }|  |D ]1}| |di |d< | |dg |d< |d}|rt|d	r| |d< | ||| q|W |	  S |	  w )
Nr   r>  r%  r   r?  r@  r=  z
                SELECT role, content, metadata, retrieved_chunk_ids, created_at
                FROM rag_messages
                WHERE bid = %s AND conversation_id = %s
                ORDER BY id DESC
                LIMIT %s
                r   )r   r   lrangerl   r   r   r   rk   r   r   r   r   r   r   r   reverser   hasattrr   r  r   )r   r   r   limitr   r   rowsitemr   r   r   r@  r   r   r   get_conversation_messages5  sV   


z$RAGHandler.get_conversation_messagesc                 C   s   |   }zN| }|dt|t|f | }|s,t|i i g dddW |  S | |di }t|||dd|drJ|d nd dW |  S |  w )	Nz
                SELECT profile, profile_version, updated_at
                FROM rag_user_profiles
                WHERE bid = %s AND user_id = %s
                )	intereststraitslast_topicsr$  )r   profileprofile_versionrN  rO  
updated_at)r   rN  rO  rP  )	r   r   r   r   r   r   r   rk   r   )r   r   r   r   r   r   rN  r   r   r   get_user_profileg  s0   

zRAGHandler.get_user_profilerG   c                    sB   t d|pd } fdd|D }t|}dd ||D S )Nz[a-zA-Z0-9]{3,}r:   c                    s   g | ]	}| j vr|qS r   )	STOPWORDSr   wr   r   r   r     s    z.RAGHandler._extract_topics.<locals>.<listcomp>c                 S   s   g | ]\}}|qS r   r   )r   term_r   r   r   r     r   )r   findalllowerr   most_common)r   r   	max_termswordsfilteredcountsr   r   r   _extract_topics  s   zRAGHandler._extract_topicsc              
   C   s&  |  ||}|dpi }|dpi }|dpi }|dp!g }	| |}
|
D ]}t||dd ||< q)|
|	 d d }	|pCi }|di  D ]
\}}||t|< qL||d< ||d< |	|d< |  }z&| }|dt|t|t	j
|d	d
|ddf |  W |  |S |  w )NrN  rK  rL  rM  r   r$     aU  
                INSERT INTO rag_user_profiles (bid, user_id, profile, profile_version)
                VALUES (%s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    profile = VALUES(profile),
                    profile_version = profile_version + 1,
                    updated_at = CURRENT_TIMESTAMP
                Tr   rO  )rQ  rk   r^  rl   r   r   r   r   r   r   r   r   r   )r   r   r   	user_textprofile_updatesr  rN  rK  rL  rM  
new_topicstopicupdatesr   r   r   r   r   r   r   update_user_profile  s<   
	


zRAGHandler.update_user_profile  c                    s  |pd  }g |sS | |g }t|tr|dg }t|ts$g }|rd g d d  fdd}|D ]e}t|tsBq:t|dpN|dpNd}t|d	p\|d
p\d  }|scq:d u rq|dpp|dd|g   }	d u r||kst|	|kr|  ||dp|d	| q:|  rS |
 }
g }d |
D ],}|	| tdd |D |kr݈	d   d|  ddid  d7  g }q|r	d   d|  ddid S )Nr:   segmentsr   c               	      sZ   d dd D  } | r'd   | pdtdd  d7  g d d S )	N c                 S   $   g | ]}t | rt | qS r   r   r   )r   tr   r   r   r        $ z?RAGHandler._chunk_transcript.<locals>.flush.<locals>.<listcomp>zseg-unknown)speakersegment_startsegment_countr  r&  r   r   r$  )r   r   r   r,  r   r&  r"  current_speakercurrent_textro  r   r   flush  s   z+RAGHandler._chunk_transcript.<locals>.flushrn  speaker_labelrm  r   
transcript
start_timestartrh  c                 s   s    | ]	}t |d  V  qdS )r$  N)r,  rS  r   r   r   r     r   z/RAGHandler._chunk_transcript.<locals>.<genexpr>ztxt-rq  r$  )r   r   r   r   rk   r   r   r   r,  r   splitr   )r   rx  speaker_segments	max_charsrg  rv  segmentrn  seg_text	projectedr[  chunk_wordswordr   rs  r   _chunk_transcript  sx   


 
zRAGHandler._chunk_transcriptT  Fcall_transcriptc           K      C   s  |  t|}| d}| d}| d}	| d}
| d}|  }z| }| ||	sB|ddddd|	 dd	W |  S | ||}| ||}| ||
}| ||}|r^|n|rb|nd }|sw|ddddd
| d	W |  S | ||}| ||	}|r| ||
nt }|r| ||nt }d|v rdnd}d|v rdnd}d|v rdnd}d|v rdnd}d|v rdnd}d|v rdnd}d|v rdnd}d|v rdnd}d|v rdnd}d|v rdnd} d |v rd!nd}!d"|v rd#nd}"d$|v rd%nd}#d&|v rd'nd}$d(|v rd)nd}%d*|v rd+nd}&d,d-g}'g }(d.d/ |p)g D })|)rGd0d1gt	|) }*|'
d2|* d3 |(|) |r^||kr^d|v sYd|v r^|'
d4 d5g d6| d7| d8| d9| d:| d;| d<| d=| d>|  d?|r|! d@|" dA|# dB|$ dCndD dE|r|% dF|& dGndH dI| dJ|	 dK|rdL|
 dMnd5 dN|rdL| dOnd5 dPdQ|' dR| dS}+|(
t| ||+|( | },W |  n|  w |,s|dddddTS g }-d}.d}/|,D ]}0t|0dU}1| dV|1 }2|0dWp2d5}3|0d}4| |3|4}5|5sG|.dX7 }.q|0dY}6|0d&}7|0d$}8|0d"}9|0d}:|0d};|0dZ}<| |<t|<tru|<nd }=|0d[}>g }?|6r|?
d\|6  |7d ur|?
d]|7  |8r|?
d^|8  |9r|?
d_|9  |:r|?
d`|:  |;r|?
da|;  |>r|?
db|>  |=rz|?
dctj| |=ddde  W n
 ty   Y nw |?rdfdg|? }@|5ddhd|@didjdkdl t|5D ]	\}A}B|A|Bdm< q
|sC|  }Cz"|C }D|Ddn||2f |D r8|.dX7 }.W |C  qW |C  n|C  w dod/ |5D }E| |E}Fg }Gt|5|FD ]!\}B}H|Hs_qV|G
|Bdp |Bdm |Bdq |H|Bdrpri ds qV|Gs|.dX7 }.qi dt|dU|1d|0dd|0dd|0dr|0d nd d|0dd|;d|:d|0dd|0dd |6d"|9d$|8d&|7d[|>dZ|=}I|-
|2du|1 |dv| dw|1 |I|Gdx |/dX7 }/q|-r| ||-nddddy}J||/|Jdzd|Jd{d|.|Jd|d dTS )}N_calls
_raw_calls_sarvamresponse_callanalytics_bantr   zTable z
 not found)r   processed_callsingested_documentsingested_chunksr#  errorz No source calls table found for 	agentnamezc.agentnameNULLcustomer_callinfozc.customer_callinfocall_starttimezc.call_starttime	directionzc.directionsales_intentzc.sales_intentcall_purposezc.call_purposezc.call_starttime DESCzs.created_at DESCr|  zs.speaker_segmentsdurationz
s.durationlanguagez
s.languagesummaryz	a.summaryobjection_typeza.objection_type	sentimentza.sentimentquality_scoreza.quality_scoreprofile_jsonzb.profile_jsonprofile_summaryzb.profile_summaryzs.transcript IS NOT NULLzs.transcript != ''c                 S   ri  r   rj  r  r   r   r   r   Y  rl  z3RAGHandler.backfill_transcripts.<locals>.<listcomp>r   z%szc.callid IN ()a  (LOWER(COALESCE(c.sales_intent,'')) IN ('high','medium','pre-sales','presales','pre_sales') OR LOWER(COALESCE(c.call_purpose,'')) LIKE '%%sale%%' OR LOWER(COALESCE(c.call_purpose,'')) LIKE '%%prospect%%' OR LOWER(COALESCE(c.call_purpose,'')) LIKE '%%demo%%')r:   zJ
                SELECT
                    c.callid,
                    z# AS agentname,
                    z+ AS customer_callinfo,
                    z( AS call_starttime,
                    z# AS direction,
                    z& AS sales_intent,
                    zH AS call_purpose,
                    s.transcript,
                    z* AS speaker_segments,
                    z" AS duration,
                    z" AS language,
                    z AS analytics_summary, z AS objection_type, z AS sentiment, z AS quality_scorez[NULL AS analytics_summary, NULL AS objection_type, NULL AS sentiment, NULL AS quality_scorez+
                    ,
                    z AS bant_profile, z AS bant_summaryz*NULL AS bant_profile, NULL AS bant_summaryz
                FROM `z` c
                JOIN `za` s ON s.callid COLLATE utf8mb4_unicode_ci = c.callid COLLATE utf8mb4_unicode_ci
                zLEFT JOIN `zP` a ON a.callid COLLATE utf8mb4_unicode_ci = c.callid COLLATE utf8mb4_unicode_ciz
                zP` b ON b.callid COLLATE utf8mb4_unicode_ci = c.callid COLLATE utf8mb4_unicode_ciz
                WHERE z AND z
                ORDER BY z&
                LIMIT %s
            )r   r  r  r  r#  callidz-call-rx  r$  analytics_summarybant_profilebant_summaryz	Summary: zQuality Score: zSentiment: zObjection Type: zCall Purpose: zSales Intent: zCustomer Profile Summary: zCustomer Profile (BANT): Tr   zCALL ANALYSIS CONTEXT

z
analysis-0systemanalysis_context)rn  
chunk_typerq  r&  zFSELECT id FROM rag_documents WHERE bid = %s AND source_id = %s LIMIT 1c                 S      g | ]}|d  qS rr  r   r  r   r   r   r     r   r  r   r   )r  r&  r   r   r   r   zCall zcall://r`   )r  r  r  r  r   r"  r   r!  r"  r#  )r   r   r   r   r   r   r   setr   r,  r   extendrl   r   r   rk   r  r   r   r   r   r   r   r   insertr+  r   _generate_embeddingsr   r   r9  )Kr   r   presales_onlyrG  overwrite_existingr  callidscalls_tableraw_calls_tablesarvam_tableanalytics_table
bant_tabler   r   	has_callshas_raw_callshas_analyticshas_bantsource_calls_tablesource_columnssarvam_columnsanalytics_columnsbant_columnscol_agentnamecol_customercol_call_startcol_directioncol_sales_intentcol_call_purpose
order_exprcol_speaker_segmentscol_durationcol_languagecol_analytics_summarycol_objection_typecol_sentimentcol_quality_scorecol_bant_profilecol_bant_summarywhere_partsparamsnormalized_callidsplaceholdersqueryrH  r!  r#  r  r   r  r  rx  r|  r"  r  r  r  r  r  r  bant_profile_rawr  r  analysis_linesanalysis_textr
   r6  
check_conncheck_cursorchunk_texts
embeddingsfinal_chunksembr   resultr   r   r   backfill_transcripts  s  	






_
O


	
$











$






	




zRAGHandler.backfill_transcriptsc                 C   s`  |  |}| |}|r| |S | jsd S t|pd}|g}t|dkr6|d d |d d |d d g}t|D ]s\}}z:d|i}| jj| jt	
|ddd}	t	|	d  }
|
d	}|seW  d S | |}|rt| ||| j |W   S  ty } z)t|}d
|v pd|v }|r|t|d k rtdt||d   W Y d }~q: d }~ww d S )Nr:   i.  i@  i  	inputTextapplication/jsonmodelIdbodyacceptcontentTyper  r   zToo many input tokensValidationExceptionr$  z@Embedding input too large; retrying with shorter text (%s chars))r   r   r   r   r   r,  r+  invoke_modelr   r   r   r   readrk   r   rz   r   r   r   )r   r   	cache_keyr   original_text
candidatesr5  	candidater  responser)  r   r   r   msgtoken_errorr   r   r   _generate_single_embedding  sJ   


"


z%RAGHandler._generate_single_embeddingc                 C   s
   |  |S r   )r  )r   r   r   r   r   _generate_query_embeddingE  s   
z$RAGHandler._generate_query_embeddingc                 C   s6   g }|D ]}|  |}|s|d  q|| q|S r   )r  r   )r   textsvectorsr   r   r   r   r   r  H  s   
zRAGHandler._generate_embeddings  c                 C   s  |  |}|std| |}|dkrtdt|p| j}t|d ur&|n| j}| ||||}| |}	|	r;|	S | j	rzv| 
|}
| j	j|
||ddd}g }|D ]C}tt|dd}||k rbqSt|d	i pii }||d
|d|d|d|d|d|d|dpi t|dd	 qS|jdd dd |d | }| ||| j |r|W S W n ty } ztd|| W Y d }~nd }~ww |  }z| }|dt|t|f | }W |  n|  w g }|D ]e}|dt|krq|  |d}|sq|t|dpd }|dkr!q| ||| }||k r/q||d
 |d |d|d|d|d|d| |di t|dd	 q|jdd dd |d | }| ||| j |S )Nz7query_embedding is required and must be a numeric arrayr   z#query_embedding norm cannot be zeroTF)r  query_vectorrG  with_payloadwith_vectorsscoreg        r)  r  r(  r  r  r  r  r%  r      )	r  r(  r  r  r  r  r%  r   
similarityc                 S      | d S Nr  r   r   r   r   r   <lambda>      z,RAGHandler.retrieve_chunks.<locals>.<lambda>)r   rE  zBQdrant search failed for bid %s; falling back to MySQL vectors: %sa  
                SELECT
                    c.id,
                    c.chunk_id,
                    c.document_id,
                    c.content,
                    c.metadata,
                    c.embedding,
                    c.embedding_dim,
                    c.embedding_norm,
                    d.source_id,
                    d.title,
                    d.source_type,
                    d.source_uri
                FROM rag_chunks c
                JOIN rag_documents d ON d.id = c.document_id
                WHERE c.bid = %s AND d.is_active = TRUE
                ORDER BY c.updated_at DESC
                LIMIT %s
                embedding_dimr   embedding_normc                 S   r  r  r   r  r   r   r   r    r  )r   r   r   rl   rn   ro   rp   r   r   r   r  searchgetattrr   rk   roundsortr   r{   r   r   r   r   r   r   r   r   r   r,  r   r   )r   r   query_embeddingr   r   candidate_limitr   q_normr  r   r  hitsscoredhitr  r)  r  r   r   r   rH  r   r  denomr  r   r   r   retrieve_chunksR  s   






zRAGHandler.retrieve_chunksc                 C   s   g }t |ddD ] \}}|d| d|dpd d|d d	|d
  qg }|| j d  D ]}	|	dd}
|	d
d}||
 d|  q3d}|rUd	|nd}|r^d|nd}d| dtj|dd d| d| d| d}|S )Nr$  )rz  z[Source z] title=r  Untitledz score=r  r  r%  r>  r@   r:   z: zYou are an enterprise RAG assistant. Use only provided context when factual claims are needed. If context is missing, state uncertainty and ask a targeted follow-up question. Keep response concise and actionable.None

zSYSTEM:
z

USER PROFILE:
Tr   z

RECENT MEMORY:
z

RETRIEVED CONTEXT:
z

USER QUESTION:
z

Answer with direct guidance.)r+  r   rk   rq   r   r   r   )r   messageretrieved_chunksrN  memory_messagescontext_blocksr5  r6  memory_linesmr>  r%  system_promptmemory_textcontext_textpromptr   r   r   _build_prompt  s6   $zRAGHandler._build_promptc                 C   s  | j sd S |pi }dd|igdgt|d| jddt|d| jdd	t|d
| jdddd}| j jt|pB| jt	|ddd}t
|d  }|di }|di }|dg }	g }
|	D ]}t|trw|dnd }|r|
| qk|
rd|
 S d S )Nr@   r   )r>  r%  
max_tokensRAG_MAX_TOKENSX  temperatureRAG_TEMPERATURErI   top_p	RAG_TOP_P?)max_new_tokensr  r  )messagesinferenceConfigr  r  r  outputr  r%  r  )r   rl   rk   rj   ro   r  r   r   r   r   r   r  r   r   r   r   r   )r   r  
model_nameruntime_configrequest_bodyr  r)  r%  r  r%  r  rI  r   r   r   r   _invoke_bedrock_chat_model  s:   
z%RAGHandler._invoke_bedrock_chat_modelc           	      C   s   |pi }t |p| jpd }|sd S ||dt|d| jddt|d| jddt|d	| jd
ddd}tj| j	 d|| j
d}|  |jrW| ni }|d}|rft | S d S )Nr:   Fr  r  rI   r  r   r!  r  r  r  )r  r  num_predict)modelr  streamoptionsz/api/generate)r   timeoutr  )r   r   r   ro   rk   rj   rl   requestspostr   r   raise_for_statusr%  r   )	r   r  r&  r'  r+  r)  r  dataanswerr   r   r   _invoke_ollama_chat_model  s*   


z$RAGHandler._invoke_ollama_chat_modelc                 C   s   t |p| jpd  }|pi }|dkr| j|||dS |dkr)| j|||dS |dkrdt | jp2d  }|dkrP| j|||d}|rH|S | j|||dS | j|||d}|r\|S | j|||dS | j|||d}|rp|S | j|||dS )Nr^   ollama)r&  r'  auto)r   r   r   rX  r4  r)  )r   r  providerr&  r'  selected_providerdefault_providerr3  r   r   r   _invoke_chat_model(  s*   zRAGHandler._invoke_chat_modelc                 C   s2   |s	 dS |d }| ddd d }d| dS )NzI do not have enough context in the knowledge base to answer this accurately yet. Please ingest relevant documents or include a query embedding.r   r%  r:   i  z9Using the closest stored context, here is what I found:

zq

If you want a stronger answer, provide a query embedding or configure Bedrock credentials for model generation.rk   )r   r  r  topsnippetr   r   r   _fallback_answerD  s   zRAGHandler._fallback_answerc
              
   C   s(  |rt | std| j|||pi |d}|  }
z|
 }| j|||||d}|
  W |
  n|
  w | 	|||p@i | |d u rL| 
|}|d u rTtd| j||||d}| j||| jd}| j||||	d}| j|||d||d	d
 |D d | j||||d}t|trt |dpd }|rd| d| }d }d }d }d }t|tr|d}|d}|d}z| j||||d}W n ty } ztd| W Y d }~nd }~ww |s| ||}| j|||d|d|p| jidd
 |D d ||dd
 |D |t  d dS )Nzmessage is required)r   r   r   r   )r   r   r   r   zUquery_embedding is required unless Bedrock embedding credentials/model are configured)r   r  r   r   )rG  )r`  ra  r@   c                 S   r  r  r   r  r   r   r   r     r   z$RAGHandler.query.<locals>.<listcomp>)r   r   r   r>  r%  r   r?  )r  r  rN  r  agent_instructionr:   zAGENT INSTRUCTION:
r  llm_providerllm_model_namellm_runtime_config)r7  r&  r'  z,Bedrock chat call failed, using fallback: %s	assistantr   c                 S   r  r?  r   r  r   r   r   r     r   c              	   S   s8   g | ]}|d  | d| d| d| ddqS )r  r  r  r  r   )r  r  r  r  r   r;  r  r   r   r   r     s    r<  )r   r3  retrieved_contextrN  	timestamp)r   r   r   r   r   r   r;  r   r   r   r  r  rJ  rq   re  rB  r  r   r   rk   r:  r   r   r   r>  rn   r   rA  r   )r   r   r   r  r  r   r   r   r   ra  r   r   r  memoryrN  r  r@  r3  model_providerr&  runtime_cfgr   r   r   r   r  S  s   









zRAGHandler.queryr   )NN)rC  )rG   )rf  )Tr  Fr  N)NNr  )NNN)NNNNNN)1__name__
__module____qualname____doc__rR  r   r   staticmethodr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r  r  r  r  r  r9  r;  rB  rJ  rQ  r^  re  r  r  r  r  r  r  r  r)  r4  r:  r>  r  r   r   r   r   r      st    F
a



	

	
y

)2
"

.[
  	)

v
 
&
r   )r   loggingr   r}   r   r  r   r  r   r   decimalr   r   r   r/  pymysql.cursorsr   r   r   qdrant_client.httpr   r  r   	getLoggerrJ  r   r   r   r   r   r   <module>   s.    
