o
    yÓ	i/i  ć                   @   s   d Z ddlZddlmZmZ ddlZddlZddlZddlmZ ddl	m
Z
mZmZmZ ddlmZ e e”ZG dd dZe ZdS )	z
FIXED Database Service for Voice Bot - Concurrent Call Support
Handles MySQL database operations with proper concurrency support.
é    N)ŚErrorŚpooling)Śdatetime)ŚDictŚAnyŚOptionalŚList)ŚConfigc                   @   sP  e Zd ZdZdZe ” Zdd Zdd Z	dd Z
d	ed
efddZd	ed
efddZdeeef d
ee fddZded	edeeef d
efddZ	d(ded	edededee d
efddZded	ed
eeeef  fddZ	d)ded	eded
efdd Zd	ed
eeeef  fd!d"Zded	ed#ed
efd$d%Zded	ed
efd&d'ZdS )*ŚDatabaseServicezRHandles all database operations for the voice bot system with concurrency support.Nc              
   C   s0   t jt jt jt jt jddddd	| _|  ”  d S )NTZvoicebot_poolé   )	ZhostZdatabaseŚuserZpasswordŚcharsetZ
autocommitZ	pool_nameZ	pool_sizeZpool_reset_session)r	   ZDATABASE_HOSTZDATABASE_NAMEZDATABASE_USERZDATABASE_PASSWORDZDATABASE_CHARSETŚconfigŚ_ensure_pool)Śself© r   śservices/database_service.pyŚ__init__   s   ÷zDatabaseService.__init__c                 C   s¦   t jdu rQt j? t jdu r7ztjjjdi | j¤t _t 	d” W n t
y6 } z	t d| ”  d}~ww W d   dS W d   dS 1 sJw   Y  dS dS )z4Ensure connection pool is initialized (thread-safe).Nz1Database connection pool initialized successfullyz Error creating connection pool: r   )r
   Ś_connection_poolŚ
_pool_lockŚmysqlZ	connectorr   ZMySQLConnectionPoolr   ŚloggerŚinfor   Śerror©r   Śer   r   r   r   '   s    

žü’"ü’zDatabaseService._ensure_poolc              
   C   s8   zt j ” W S  ty } z	t d| ”  d}~ww )zGet a connection from the pool.z$Error getting connection from pool: N)r
   r   Śget_connectionr   r   r   r   r   r   r   r   3   s   žzDatabaseService.get_connectionŚbusiness_idŚreturnc                 C   s
   | dS )z<Return dynamic call_history table name based on business_id.Z_call_historyr   )r   r   r   r   r   Śget_call_history_table;   s   
z&DatabaseService.get_call_history_tablec              
   C   sā   d}d}z^z3|   ” }| ” }|  |”}d| d}| |” t d| d” W W |r/| ”  |r7| ”  dS dS  tyb } zt d| ” W Y d}~W |rT| ”  |r\| ”  dS dS d}~ww |ri| ”  |rp| ”  w w )	z2Create the call_history table if it doesn't exist.Nz)
            CREATE TABLE IF NOT EXISTS `a
	  ` (
                `id` INT AUTO_INCREMENT PRIMARY KEY,
                `callid` VARCHAR(255) NOT NULL COMMENT 'Unique call identifier',
                `business_id` VARCHAR(255) COMMENT 'Business ID or session ID',
                `agent_id` VARCHAR(255) COMMENT 'Agent ID assigned to the call',
                `phone_number` VARCHAR(20) COMMENT 'Phone number of the caller',
                `did` VARCHAR(20) COMMENT 'DID (Direct Inward Dialing) number',
                `stream_id` VARCHAR(255) COMMENT 'Stream ID for the call session',
                `conversation_data` JSON COMMENT 'Full conversation data in JSON format',
                `user_messages` TEXT COMMENT 'All user messages concatenated',
                `bot_messages` LONGTEXT COMMENT 'All bot responses concatenated',
                `call_start_time` DATETIME COMMENT 'When the call started',
                `call_end_time` DATETIME COMMENT 'When the call ended',
                `call_duration` INT COMMENT 'Call duration in seconds',
                `call_status` ENUM('active', 'completed', 'failed', 'transferred') DEFAULT 'active',
                `transfer_status` VARCHAR(50) COMMENT 'Transfer status if applicable',
                `transfer_number` VARCHAR(20) COMMENT 'Number transferred to',
                `confidence_scores` JSON COMMENT 'Confidence scores for transcriptions',
                `total_messages` INT DEFAULT 0 COMMENT 'Total number of messages exchanged',
                `user_message_count` INT DEFAULT 0 COMMENT 'Number of user messages',
                `bot_message_count` INT DEFAULT 0 COMMENT 'Number of bot messages',
                `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                INDEX `idx_callid` (`callid`),
                INDEX `idx_business_id` (`business_id`),
                INDEX `idx_agent_id` (`agent_id`),
                INDEX `idx_phone_number` (`phone_number`),
                INDEX `idx_stream_id` (`stream_id`),
                INDEX `idx_call_start_time` (`call_start_time`),
                INDEX `idx_call_status` (`call_status`),
                INDEX `idx_did` (`did`)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
            zTable 'z(' created successfully or already existsTzError creating table: F)	r   Ścursorr   Śexecuter   r   Ścloser   r   )r   r   Ś
connectionr    Ścall_history_tableZcreate_table_queryr   r   r   r   Ścreate_call_history_table?   s@   
’
#’ś’ś
’z)DatabaseService.create_call_history_tableŚsession_datac           
   
   C   sV  d}d}zzg|   ” }|jdd}| d” | ”  | dd”}|  |” |  |”}| ” }d|v rAt|d t	rAt
 |d ”|d< d| d	}| ||” |j}| ”  t d
| ” |W W |re| ”  |rl| ”  S S  ty }	 z$t d|	 ” |r| ”  W Y d}	~	W |r| ”  |r| ”  dS dS d}	~	ww |r£| ”  |rŖ| ”  w w )z@Create a new call session in the database with proper isolation.NT©Z
dictionaryś.SET TRANSACTION ISOLATION LEVEL READ COMMITTEDr   ŚdefaultŚconversation_dataz
            INSERT INTO `aį  ` 
            (callid, business_id, agent_id, phone_number, did, stream_id, conversation_data, 
             call_start_time, call_status, total_messages, user_message_count, bot_message_count)
            VALUES (%(callid)s, %(business_id)s, %(agent_id)s, %(phone_number)s, %(did)s, %(stream_id)s,
                    %(conversation_data)s, %(call_start_time)s, %(call_status)s, 
                    %(total_messages)s, %(user_message_count)s, %(bot_message_count)s)
            zCreated call session with ID: zError creating call session: )r   r    r!   Śstart_transactionŚgetr%   r   ŚcopyŚ
isinstanceŚdictŚjsonŚdumpsZ	lastrowidŚcommitr   r   r"   r   r   Śrollback)
r   r&   r#   r    r   r$   Zprocessed_dataZinsert_queryŚ
session_idr   r   r   r   Ścreate_call_sessionx   sV   


’	
’ų’ų
’z#DatabaseService.create_call_sessionr4   Śupdate_datac              
   C   sę  d}d}z]zå|   ” }|jdd}| d” | ”  |  |”}t d| d| ” g }i }| ” D ]4\}	}
|	dv rDt|
t	rDt
 |
”}
| d|	 d	|	 d
” |
||	< t d|	 dt|
dd  d” q2|st d” W W |rv| ”  |r~| ”  dS dS ||d< d| dd |” d}t d| ” t d| ” | ||” |j}t d| d| ” |dkrÖt d| ” | ”  W W |rĢ| ”  |rŌ| ”  dS dS | ”  W W |rā| ”  |rź| ”  dS dS  ty' } z/t d| ” |r| ”  ddl}| ”  W Y d}~W |r| ”  |r!| ”  dS dS d}~w tyb } z/t d| ” |r>| ”  ddl}| ”  W Y d}~W |rS| ”  |r\| ”  dS dS d}~ww |rj| ”  |rr| ”  w w )zAUpdate an existing call session with proper concurrency handling.NTr'   r(   zUpdating session ś
 in table )r*   Śconfidence_scoresś`z` = %(z)szUpdate field ś: éd   z...zNo fields to updater4   ś
            UPDATE `z` 
            SET z, z3
            WHERE id = %(session_id)s
            zExecuting update query: śQuery values: zUpdated call session ś, rows affected: r   z!No rows were updated for session FzError updating call session: z(Unexpected error updating call session: )r   r    r!   r+   r   r   r   Śitemsr.   r/   r0   r1   ŚappendŚstrŚwarningr"   ŚjoinŚrowcountr3   r2   r   r   Ś	tracebackŚ	print_excŚ	Exception)r   r4   r   r6   r#   r    r$   Zset_clausesŚvaluesŚkeyŚvalueŚupdate_queryŚrows_affectedr   rE   r   r   r   Śupdate_call_session©   sØ   


&
(’Ų’ž’ģ’ļ	’ö’ö
’z#DatabaseService.update_call_sessionŚmessage_typeŚmessage_contentŚconfidence_scorec              
   C   sf  d}d}zzg|   ” }|jdd}| d” | ”  t d| d| d| ” |  |”}d| d	}	| |	|f” | ” }
|
sbt d
| d| ” | 	”  W W |rX| 
”  |r`| 
”  dS dS |
 di ”}t|trzt |”}W n tjy   i }Y nw | dg ”}||t ”  ” |pd|d}| |” |||
 d”|
 d”d|
 d”r²|
 d” ” nd|
 d”ræ|
 d” ” nddd}g }g }g }|D ],}| d”dkré| | dd”” | | dd”” qĶ| d”dkrł| | dd”” qĶd| d}t |”d  |”d  |”t |”t|t|t||d!}| ||” |j}|d"krLt d#| d$” | 	”  W W |rA| 
”  |rJ| 
”  dS dS | ”  t d%| d| ” W W |rd| 
”  |rm| 
”  dS dS  ty¢ } z't d&| ” |r| 	”  W Y d}~W |r| 
”  |r| 
”  dS dS d}~ww |rŖ| 
”  |r²| 
”  w w )'z>Add a message to an existing call session with atomic updates.NTr'   r(   zAdding z message to session z for business z²
            SELECT conversation_data, user_messages, bot_messages, confidence_scores,
                   total_messages, user_message_count, bot_message_count
            FROM `ś4` 
            WHERE id = %s FOR UPDATE
            zSession z not found for business Fr*   Śmessagesg        )ŚspeakerŚmessageZ	timestampŚ
confidencer4   ZcallidŚphone_number)r4   Zcall_idrV   Ścall_start_timeŚcall_end_time)ŚstartŚend)rR   Śsession_infoZ
timestampsrS   r   rT   Ś rU   Zbotr<   aė  ` 
            SET conversation_data = %(conversation_data)s,
                user_messages = %(user_messages)s,
                bot_messages = %(bot_messages)s,
                confidence_scores = %(confidence_scores)s,
                total_messages = %(total_messages)s,
                user_message_count = %(user_message_count)s,
                bot_message_count = %(bot_message_count)s,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = %(session_id)s
            z | )r*   Śuser_messagesŚbot_messagesr8   Śtotal_messagesZuser_message_countZbot_message_countr4   r   zFailed to update session z with new messagezSuccessfully added z!Error adding message to session: )r   r    r!   r+   r   r   r   Śfetchoner   r3   r"   r,   r.   rA   r0   ŚloadsŚJSONDecodeErrorr   ŚnowZ	isoformatr@   r1   rC   ŚlenrD   r2   r   )r   r4   r   rN   rO   rP   r#   r    r$   Śselect_queryŚsessionr*   rR   Znew_messager]   r^   r8   ŚmsgrK   Śupdate_valuesrL   r   r   r   r   Śadd_message_to_sessionņ   sŅ   

ż[’¦
’
ū
żžł’ų
’ō’ų’ų
’z&DatabaseService.add_message_to_sessionc           	   
   C   sV  d}d}zzm|   ” }|jdd}|  |”}d| d}| ||f” | ” }|rb| d”rEzt |d ”|d< W n tjt	fyD   Y nw | d”rbzt |d ”|d< W n tjt	fya   Y nw |W W |rk| 
”  |rr| 
”  S S  ty } zt d| ” W Y d}~W |r| 
”  |r| 
”  dS dS d}~ww |r£| 
”  |rŖ| 
”  w w )	zGet a call session by ID.NTr'   z
            SELECT * FROM `z)` 
            WHERE id = %s
            r*   r8   zError getting call session: )r   r    r   r!   r`   r,   r0   ra   rb   Ś	TypeErrorr"   r   r   r   )	r   r4   r   r#   r    r$   re   Śresultr   r   r   r   Śget_call_sessionr  sZ   
’
’
’
’ś’ś
’z DatabaseService.get_call_sessionŚ	completedŚcall_statusc              
   C   sÜ  d}d}zŪzŖ|   ” }|jdd}| d” | ”  |  |”}d| d}| ||f” | ” }|sG| ”  W W |r=| ”  |rE| ”  dS dS | d”}	t	 
” }
d	}|	rjt|	trbt	 |	 d
d””}	t|
|	  ” }d| d}|
|||d}| ||” |j}|d	kr| ”  W W |r| ”  |r| ”  dS dS | ”  W W |r¦| ”  |r®| ”  dS dS  tyß } z$t d| ” |rÅ| ”  W Y d}~W |rŃ| ”  |rŁ| ”  dS dS d}~ww |rę| ”  |rķ| ”  w w )zEnd a call session.NTr'   r(   z*
            SELECT call_start_time FROM `rQ   FrW   r   ŚZz+00:00r<   zł` 
            SET call_end_time = %(call_end_time)s,
                call_duration = %(call_duration)s,
                call_status = %(call_status)s,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = %(session_id)s
            )rX   Ścall_durationrn   r4   zError ending call session: )r   r    r!   r+   r   r`   r3   r"   r,   r   rc   r.   rA   ZfromisoformatŚreplaceŚintZtotal_secondsrD   r2   r   r   r   )r   r4   r   rn   r#   r    r$   re   rf   rW   rX   rp   rK   rh   rL   r   r   r   r   Śend_call_session  s   

’-’
Ó
’
ü’õ’ų’ų
’z DatabaseService.end_call_sessionc              
   C   s  d}d}z±z|   ” }|jdd}|  |”}d| d}| |” | ” }g }|r{dd |jD }|D ]H}	tt||	}
|
 d”rXzt	 
|
d ”|
d< W n t	jtfyW   Y nw |
 d	”ruzt	 
|
d	 ”|
d	< W n t	jtfyt   Y nw | |
” q2|W W |r| ”  |r| ”  S S  tyµ } zt d
| ” g W  Y d}~W |r©| ”  |r°| ”  S S d}~ww |r¼| ”  |rĆ| ”  w w )z%Get all call sessions for a business.NTr'   zSELECT * FROM `z` ORDER BY created_at DESCc                 S   s   g | ]}|d  qS )r   r   )Ś.0Zdescr   r   r   Ś
<listcomp>÷  s    z9DatabaseService.get_all_call_sessions.<locals>.<listcomp>r*   r8   z!Error getting all call sessions: )r   r    r   r!   ŚfetchallŚdescriptionr/   Śzipr,   r0   ra   rb   rj   r@   r"   r   r   r   )r   r   r#   r    r$   ŚqueryZresultsZsessionsŚcolumnsrk   rf   r   r   r   r   Śget_all_call_sessionsč  s`   


’
’
’ś
’ś
’z%DatabaseService.get_all_call_sessionsŚtransfer_numberc              
   C   s“  d}d}zĒz|   ” }|jdd}| d” | ”  |  |”}t d| d| ” d| d}||d	}t d
| ” t d| ” | ||” |j}	t d| d|	 ” |	dkr{t d| d” | 	”  W W |rq| 
”  |ry| 
”  dS dS | ”  t d| ” W W |r| 
”  |r| 
”  dS dS  tyĖ }
 z't d| d|
 ” |r±| 	”  W Y d}
~
W |r½| 
”  |rÅ| 
”  dS dS d}
~
ww |rŅ| 
”  |rŁ| 
”  w w )z=Update call session with transfer status and transfer number.NTr'   r(   z%Updating transfer status for session r7   r<   zķ` 
            SET transfer_status = 1,
                transfer_number = %(transfer_number)s,
                call_status = 'transferred',
                updated_at = CURRENT_TIMESTAMP
            WHERE id = %(session_id)s
            )r4   r|   z!Executing transfer update query: r=   z$Updated transfer status for session r>   r   zNo rows updated for session z - session may not existFz1Successfully updated transfer status for session z+Error updating transfer status for session r:   )r   r    r!   r+   r   r   r   rD   r   r3   r"   r2   r   )r   r4   r   r|   r#   r    r$   rK   rH   rL   r   r   r   r   Śupdate_transfer_status  sl   

’
ž’ō’ų’ų
’z&DatabaseService.update_transfer_statusc              
   C   s  | j st d” g ddS d}d}z­zI|  ” }|jdd}d}||d}| ||” | ” }d	d
lm} | 	|”}	| 
|	”}
|	|
||t|t|	ddW W |rT| ”  |r[| ”  S S  ty } z&t d| d| ” g t|dW  Y d}~W |r| ”  |r| ”  S S d}~w ty¾ } z&t d| d| ” g t|dW  Y d}~W |r²| ”  |r¹| ”  S S d}~ww |rÅ| ”  |rĢ| ”  w w )zc
        Get clean conversation data for a session (only transcribed speech, no raw audio)
        zDatabase is disabledzDatabase disabled)rR   r   NTr'   zė
            SELECT message, speaker, timestamp, confidence, session_id
            FROM conversation_logs 
            WHERE session_id = %(session_id)s AND business_id = %(business_id)s
            ORDER BY timestamp ASC
            )r4   r   é   )ŚConversationCleaner)r4   r   r_   Ścleaned_messages)rR   Śsummaryr[   z2Error getting clean conversation data for session r:   z=Unexpected error getting clean conversation data for session )Śenabledr   rB   r   r    r!   rv   Zconversation_cleanerr   Zclean_conversation_messagesZget_conversation_summaryrd   r"   r   r   rA   rG   )r   r4   r   r#   r    ry   rH   rR   r   r   r   r   r   r   r   Śget_clean_conversation_dataO  sl   

ž

üż
’÷
’ś
’ś
’z+DatabaseService.get_clean_conversation_data)N)rm   )Ś__name__Ś
__module__Ś__qualname__Ś__doc__r   Ś	threadingZLockr   r   r   r   rA   r   Śboolr%   r   r   r   rr   r5   rM   Śfloatri   rl   rs   r   r{   r}   r/   r   r   r   r   r   r
      s@    9"1Kž’’ž
ž "+’’
’L-:r
   )r   Zmysql.connectorr   r   r   r0   Zloggingr   r   Śtypingr   r   r   r   r   r	   Z	getLoggerr   r   r
   Zdatabase_servicer   r   r   r   Ś<module>   s"    
     
