# """
# ElevenLabs WebSocket Service
# Integrates ElevenLabs Conversational AI WebSocket API for service_type 4
# Handles real-time audio streaming between Mcube and ElevenLabs
# """

# import asyncio
# import json
# import base64
# import time
# import websockets
# from websockets.protocol import State
# from typing import Optional, Callable
# from abc import ABC

# from config import Config
# from services.log_utils import Log


# class ElevenLabsWebSocketService(ABC):
#     """
#     ElevenLabs WebSocket service for real-time conversational AI.
#     Handles bidirectional audio streaming and message processing.
#     """
    
#     def __init__(self, agent_id: Optional[str] = None, api_key: Optional[str] = None):
#         """
#         Initialize ElevenLabs WebSocket service.
        
#         Args:
#             agent_id: ElevenLabs agent ID (from config or bot_config)
#             api_key: ElevenLabs API key (from config)
#         """
#         self.agent_id = agent_id or Config.ELEVENLABS_AGENT_ID if hasattr(Config, 'ELEVENLABS_AGENT_ID') else None
#         self.api_key = api_key or Config.ELEVENLABS_API_KEY
        
#         if not self.agent_id:
#             raise ValueError("ELEVENLABS_AGENT_ID is required. Set it in config or bot_config.")
#         if not self.api_key:
#             raise ValueError("ELEVENLABS_API_KEY is required. Set it in config.")
        
#         self.websocket: Optional[websockets.WebSocketClientProtocol] = None
#         self.keep_alive_task: Optional[asyncio.Task] = None
#         self.running = False
#         self.connected = False
#         self.last_reconnect_time = 0  # Track last reconnection time for cooldown
#         self.last_audio_send_time = 0  # Track last audio send time for rate limiting
#         self.min_audio_interval = 0.005  # Minimum 5 ms between audio chunks (at most 200 chunks/second)
        
#         # Audio format from conversation initiation
#         self.agent_audio_format: Optional[str] = None
#         self.user_audio_format: Optional[str] = None
#         self.conversation_id: Optional[str] = None
        
#         # Message handlers - support both sync and async callbacks
#         self.on_audio_received: Optional[Callable] = None
#         self.on_transcript_received: Optional[Callable] = None
#         self.on_response_received: Optional[Callable] = None
#         self.on_vad_score_received: Optional[Callable] = None  # New handler for VAD scores
#         self.on_interruption_received: Optional[Callable] = None  # Handler for interruption events
#         self.on_tool_request_received: Optional[Callable] = None  # Handler for tool requests (e.g., end_call)
#         self.on_connection_closed: Optional[Callable] = None  # Handler for connection closed events (to detect terminal reasons)
#         self.on_automated_greeting_detected: Optional[Callable] = None  # Handler for automated greeting/voicemail detection
#         self.on_tool_abandoned: Optional[Callable] = None  # Handler for tool execution abandonment (e.g., end_call abandoned)
#         self.on_conversation_id_received: Optional[Callable] = None  # Handler for when conversation_id is received (to store in CallSession)
        
#         Log.info(f"🎙️ ElevenLabs WebSocket service initialized - Agent ID: {self.agent_id[:20]}...")
    
#     async def get_signed_url(self) -> Optional[str]:
#         """Get signed URL for private agent (if needed)."""
#         if not self.api_key:
#             return None
        
#         try:
#             import aiohttp
#         except ImportError:
#             Log.error("aiohttp not installed. Install it with: pip install aiohttp")
#             return None
        
#         try:
#             url = f"https://api.elevenlabs.io/v1/convai/conversation/get-signed-url?agent_id={self.agent_id}"
#             headers = {"xi-api-key": self.api_key}
            
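#             # Cap concurrent connections (100 total, 30 per host). Note: the connector and session are
#             # created and closed within this call, so connections are not pooled across calls.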
#             connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)
#             timeout = aiohttp.ClientTimeout(total=10.0)
#             async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
#                 async with session.get(url, headers=headers) as response:
#                     if response.status == 200:
#                         data = await response.json()
#                         return data.get("signed_url")
#                     else:
#                         Log.error(f"Failed to get signed URL: {response.status}")
#                         return None
#         except Exception as e:
#             Log.error(f"Error getting signed URL: {e}")
#             return None
    
#     def get_websocket_url(self) -> str:
#         """Get WebSocket URL for ElevenLabs. Includes conversation_id if resuming existing conversation."""
#         if self.conversation_id:
#             return (
#                 f"wss://api.elevenlabs.io/v1/convai/conversation"
#                 f"?agent_id={self.agent_id}"
#                 f"&conversation_id={self.conversation_id}"
#             )
#         return f"wss://api.elevenlabs.io/v1/convai/conversation?agent_id={self.agent_id}"
    
#     async def connect(self, use_signed_url: bool = False, max_retries: int = 3):
#         """
#         Connect to ElevenLabs WebSocket with timeout and retry logic.
        
#         Args:
#             use_signed_url: Whether to use signed URL (for private agents).
#                 NOTE: not referenced in this method's body; the URL always comes from get_websocket_url().
#             max_retries: Maximum number of connection retry attempts (default: 3)
#         """
#         url = self.get_websocket_url()
#         headers = {"xi-api-key": self.api_key}
        
#         # Log whether resuming existing conversation or starting new one
#         if self.conversation_id:
#             Log.info(f"🔁 Resuming ElevenLabs conversation: {self.conversation_id[:20]}...")
#         else:
#             Log.info("🆕 Starting NEW ElevenLabs conversation")
        
#         # Increased timeout from 10s to 20s for better reliability
#         connection_timeout = 20.0
#         base_delay = 1.0  # Start with 1 second delay
        
#         for attempt in range(1, max_retries + 1):
#             try:
#                 if attempt > 1:
#                     delay = base_delay * (2 ** (attempt - 2))  # Exponential backoff: 1s, 2s, 4s
#                     Log.info(f"🔄 Retrying connection (attempt {attempt}/{max_retries}) after {delay:.1f}s delay...")
#                     await asyncio.sleep(delay)
                
#                 Log.info(f"Connecting to ElevenLabs WebSocket: {url[:80]}... (attempt {attempt}/{max_retries})")
                
#                 # Add timeout to prevent blocking on concurrent calls
#                 # Use asyncio.wait_for to ensure connection doesn't hang
#                 # For websockets 10.0-13.x, use extra_headers (list of tuples)
#                 # Convert dict to list of tuples: {"key": "value"} -> [("key", "value")]
#                 extra_headers_list = [(k, v) for k, v in headers.items()]
#                 self.websocket = await asyncio.wait_for(
#                     websockets.connect(
#                         url, 
#                         extra_headers=extra_headers_list,
#                         ping_interval=20,  # Send ping every 20 seconds
#                         ping_timeout=10,  # Wait 10 seconds for pong
#                         close_timeout=10  # Wait 10 seconds for close
#                     ),
#                     timeout=connection_timeout  # Increased to 20 seconds
#                 )
#                 self.running = True
#                 self.connected = True
#                 Log.info(f"✅ Connected to ElevenLabs WebSocket successfully (attempt {attempt})")
                
#                 # Start keep-alive task
#                 self.keep_alive_task = asyncio.create_task(self._keep_alive())
#                 return  # Success - exit retry loop
                
#             except asyncio.TimeoutError:
#                 if attempt < max_retries:
#                     Log.warning(f"⚠️ Connection timeout ({connection_timeout}s) on attempt {attempt}/{max_retries}, will retry...")
#                     continue
#                 else:
#                     error_msg = f"❌ Failed to connect to ElevenLabs WebSocket: Connection timeout ({connection_timeout}s) after {max_retries} attempts"
#                     Log.error(error_msg)
#                     self.connected = False
#                     raise Exception(error_msg)
#             except Exception as e:
#                 if attempt < max_retries:
#                     import traceback
#                     Log.warning(f"⚠️ Connection error on attempt {attempt}/{max_retries}: {e}")
#                     Log.debug(f"Traceback: {traceback.format_exc()}")
#                     continue
#                 else:
#                     import traceback
#                     error_msg = f"❌ Failed to connect to ElevenLabs WebSocket after {max_retries} attempts: {e}"
#                     Log.error(error_msg)
#                     Log.error(f"Full traceback: {traceback.format_exc()}")
#                     self.connected = False
#                     raise Exception(error_msg) from e
        
#         # Should not reach here, but just in case
#         self.connected = False
#         raise Exception(f"❌ Failed to connect to ElevenLabs WebSocket after {max_retries} attempts")
    
#     async def _keep_alive(self):
#         """Send keep-alive messages every 5 seconds using user_activity to prevent idle timeouts without interfering with bot speech."""
#         consecutive_failures = 0
#         max_consecutive_failures = 5  # Increased to allow more retries
#         last_success_time = 0
        
#         while self.running:
#             try:
#                 await asyncio.sleep(5)  # 5 seconds: balanced interval that prevents timeouts without interfering with conversation flow
                
#                 # Double-check all conditions before sending (prevent zombie messages)
#                 if not self.running:
#                     break
                
#                 if not self.websocket:
#                     consecutive_failures += 1
#                     if consecutive_failures >= max_consecutive_failures:
#                         Log.warning("⚠️ Keep-alive: WebSocket is None, stopping task")
#                         break
#                     continue
                
#                 # CRITICAL FIX: Check actual WebSocket state, not just flag
#                 # The flag might be out of sync, but WebSocket might still be open
#                 if not self.is_connected():
#                     consecutive_failures += 1
#                     # Don't stop immediately - connection might recover
#                     if consecutive_failures >= max_consecutive_failures:
#                         Log.warning("⚠️ Keep-alive: connection not connected after multiple checks, stopping task")
#                         break
#                     continue
                
#                 # Reset failure counter on successful check
#                 consecutive_failures = 0
                
#                 try:
#                     # Send user_activity as keep-alive (proper JSON message)
#                     # This is better than sending raw text which causes 1008 errors
#                     await self.send_user_activity()
#                     last_success_time = time.time()
#                     # Throttle the success log to avoid spam: it is emitted only when the current
#                     # epoch second is a multiple of 25 (an approximate throttle, not exactly every 5th cycle)
#                     if int(time.time()) % 25 == 0:
#                         Log.debug(f"💓 Keep-alive sent successfully (connection active)")
#                 except websockets.exceptions.ConnectionClosed as e:
#                     # Connection is actually closed - stop keep-alive
#                     close_code = e.code if hasattr(e, 'code') else 'unknown'
#                     close_reason = e.reason if hasattr(e, 'reason') and e.reason else 'No reason'
#                     Log.warning(f"⚠️ Keep-alive: connection closed during send (Code: {close_code}, Reason: {close_reason})")
#                     self.connected = False
#                     break
#                 except Exception as e:
#                     # Transient error - log but continue trying
#                     consecutive_failures += 1
#                     # Only log if we still think we're connected (might be transient)
#                     if self.is_connected() and self.running:
#                         Log.warning(f"⚠️ Keep-alive error (attempt {consecutive_failures}/{max_consecutive_failures}): {e}")
                    
#                     # Only stop if too many consecutive failures AND connection is actually closed
#                     if consecutive_failures >= max_consecutive_failures and not self.is_connected():
#                         Log.error(f"❌ Keep-alive: too many consecutive failures, stopping task")
#                         break
#                     # Continue loop to retry on next iteration
#                     continue
                    
#             except asyncio.CancelledError:
#                 # Task was cancelled, exit gracefully
#                 break
#             except Exception as e:
#                 # Unexpected error - log but continue trying
#                 consecutive_failures += 1
#                 if self.running:
#                     Log.warning(f"⚠️ Keep-alive unexpected error (attempt {consecutive_failures}/{max_consecutive_failures}): {e}")
                
#                 # Only stop if too many consecutive failures AND connection is actually closed
#                 if consecutive_failures >= max_consecutive_failures and (not self.websocket or not self.is_connected()):
#                     Log.error(f"❌ Keep-alive: too many consecutive failures, stopping task")
#                     break
#                 # Continue loop to retry
#                 continue
    
#     async def send_conversation_initiation(
#         self, 
#         custom_llm_extra_body: Optional[dict] = None, 
#         text_only: bool = False,
#         dynamic_variables: Optional[dict] = None
#     ):
#         """
#         Send conversation initiation message.
        
#         Args:
#             custom_llm_extra_body: Custom LLM configuration
#             text_only: Whether to use text-only mode (False for audio)
#             dynamic_variables: Dynamic variables for the conversation
#         """
#         message = {
#             "type": "conversation_initiation_client_data"
#         }
        
#         conversation_config = {
#             "agent": {
#                 "output_audio_format": "ulaw_8000",
#                 "vad": {
#                     "enable": True,
#                     # "threshold": 0.38,  # Balanced: slight reduction from 0.4 to improve latency, but maintains noise filtering
#                     # "prefix_padding_ms": 250,  # Balanced: reduced from 300ms but not too aggressive (was 300ms)
#                     # "silence_duration_ms": 250  # Balanced: reduced from 300ms but maintains quality (was 300ms)

#                     "threshold": 0.5,
#                     "prefix_padding_ms": 300,
#                     "silence_duration_ms": 300
#                 }
#             },
#             "tts": {
#                 "output_audio_format": "ulaw_8000"
#             }
#         }
        
#         if text_only:
#             if "agent" not in conversation_config:
#                 conversation_config["agent"] = {}
#             conversation_config["agent"]["text_only"] = True
            
#         message["conversation_config_override"] = conversation_config
        
#         if custom_llm_extra_body:
#             message["custom_llm_extra_body"] = custom_llm_extra_body
        
#         if dynamic_variables:
#             message["dynamic_variables"] = dynamic_variables
        
#         await self._send_message(message)
#         Log.info("📤 Sent conversation initiation to ElevenLabs")
    
#     async def send_audio_chunk(self, audio_data: bytes):
#         """
#         Send audio chunk to ElevenLabs with rate limiting to prevent 1008 policy violations.
        
#         Args:
#             audio_data: Raw audio bytes (PCM format expected)
#         """
#         # Check if connection is still open before processing
#         if not self.connected or not self.websocket:
#             return  # Silently skip if connection is closed
        
#         try:
#             import time
#             current_time = time.time()
#             time_since_last_send = current_time - self.last_audio_send_time
            
#             # Enforce a minimum interval between audio chunks (5 ms, i.e. at most 200 chunks/second)
#             # This prevents overwhelming the connection and rate limiting
#             min_interval = 0.005  # 5 ms
#             if time_since_last_send < min_interval:
#                 # Wait to maintain proper rate
#                 sleep_time = min_interval - time_since_last_send
#                 await asyncio.sleep(sleep_time)
#                 # Update time after sleep
#                 self.last_audio_send_time = time.time()
#             else:
#                 self.last_audio_send_time = current_time
            
#             # Convert audio bytes to base64
#             audio_base64 = base64.b64encode(audio_data).decode('utf-8')
            
#             # ElevenLabs expects the message format: {"type": "user_audio_chunk", "user_audio_chunk": base64_string}
#             message = {
#                 "type": "user_audio_chunk",
#                 "user_audio_chunk": audio_base64
#             }
#             await self._send_message(message, silent=True)
#         except Exception:
#             pass
    
#     async def send_client_tool_result(self, tool_call_id: str, result: str, is_error: bool = False):
#         """Send tool result to ElevenLabs."""
#         message = {
#             "type": "client_tool_result",
#             "tool_call_id": tool_call_id,
#             "result": result,
#             "is_error": is_error
#         }
#         await self._send_message(message)
    
#     async def send_user_activity(self):
#         """Send user_activity message as keep-alive."""
#         # CRITICAL FIX: Check actual WebSocket state, not just flag
#         # The flag might be out of sync, but WebSocket might still be open
#         if not self.running or not self.websocket:
#             return
#         # Use is_connected() to check actual state, not just self.connected flag
#         if not self.is_connected():
#             return
#         message = {
#             "type": "user_activity"
#         }
#         await self._send_message(message, silent=True)

    
#     async def send_interruption(self) -> bool:
#         if not self.connected or not self.websocket:
#             return False
#         try:
#             message = {
#                 "type": "interrupt",
#                 "event_id": f"interrupt_{int(time.time())}"
#             }
#             await self._send_message(message)
#             return True
#         except Exception:
#             return False

#     async def _send_message(self, message: dict, silent: bool = False):
#         if not self.websocket:
#             return
#         try:
#             message_json = json.dumps(message)
#             await self.websocket.send(message_json)
#         except websockets.exceptions.ConnectionClosed as e:
#             # Connection is actually closed - set flag
#             self.connected = False
#             if not silent:
#                 Log.warning(f"⚠️ WebSocket connection closed while sending: {e.code} - {e.reason}")
#         except websockets.exceptions.InvalidState as e:
#             # WebSocket is in invalid state (closed, closing, etc.)
#             self.connected = False
#             if not silent:
#                 Log.warning(f"⚠️ WebSocket in invalid state: {e}")
#         except Exception as e:
#             # Only set connected=False for definitive connection errors
#             # Be more specific to avoid false positives from transient errors
#             error_str = str(e).lower()
#             error_type = type(e).__name__.lower()
            
#             # Check for definitive connection errors
#             is_connection_error = (
#                 isinstance(e, (ConnectionError, OSError)) or
#                 "websocket" in error_str or
#                 error_type in ["connectionerror", "oserror"]
#             )
            
#             # Only mark as disconnected if it's a real connection error
#             if is_connection_error and any(phrase in error_str for phrase in [
#                 "connection closed",
#                 "connection lost",
#                 "not connected",
#                 "websocket is closed",
#                 "websocket is closing"
#             ]):
#                 self.connected = False
#                 if not silent:
#                     Log.warning(f"⚠️ Connection error detected: {e}")
#             # For other errors (encoding, network timeouts, etc.), don't mark as disconnected
#             # The connection might still be valid - let is_connected() check actual state
    
#     async def receive_messages(self, on_message: Callable[[dict], None]):
#         """
#         Receive messages from ElevenLabs WebSocket.
        
#         Args:
#             on_message: Callback function to handle received messages
#         """
#         if not self.websocket:
#             raise Exception("WebSocket not connected")
        
#         try:
#             async for message in self.websocket:
#                 try:
#                     data = json.loads(message)
#                     msg_type = data.get("type", "unknown")
                    
#                     # Handle ping messages
#                     if msg_type == "ping":
#                         ping_event = data.get("ping_event", {})
#                         event_id = ping_event.get("event_id")
#                         if event_id:
#                                 pong_msg = {"type": "pong", "event_id": event_id}
#                                 await self._send_message(pong_msg, silent=True)
#                         continue
                        
#                     if asyncio.iscoroutinefunction(on_message):
#                         await on_message(data)
#                     else:
#                         on_message(data)
#                 except Exception as e:
#                     import traceback
#                     Log.error(f"❌ Error processing message: {e}")
#                     Log.error(f"Message data: {data}")
#                     Log.error(f"Full traceback: {traceback.format_exc()}")
            
#             # Loop ended naturally (WebSocket closed cleanly)
#             # Check if we can get close reason from WebSocket state
#             if self.websocket:
#                 try:
#                     close_code = getattr(self.websocket, 'close_code', None)
#                     close_reason = getattr(self.websocket, 'close_reason', None) or 'Connection closed cleanly'
#                     if close_code is not None or close_reason:
#                         # Log the raw close reason for debugging
#                         Log.warning(f"⚠️ ElevenLabs WebSocket connection closed - Code: {close_code}, Reason: '{close_reason}'")
#                         self.connected = False
                        
#                         # Check if this is a terminal call end reason
#                         is_terminal_reason = self._is_terminal_call_end_reason(close_reason)
#                         if is_terminal_reason:
#                             Log.info(f"🛑 Call ended legitimately (ElevenLabs reason: '{close_reason}') - reconnections disabled")
#                         else:
#                             # Log non-terminal reasons for debugging
#                             Log.debug(f"⚠️ Non-terminal close reason: '{close_reason}' - reconnection may be allowed")
                        
#                         # Notify callback if set
#                         if self.on_connection_closed:
#                             try:
#                                 if asyncio.iscoroutinefunction(self.on_connection_closed):
#                                     await self.on_connection_closed(close_code or 1000, close_reason, is_terminal_reason)
#                                 else:
#                                     self.on_connection_closed(close_code or 1000, close_reason, is_terminal_reason)
#                             except Exception as callback_error:
#                                 Log.error(f"❌ Error in connection_closed callback: {callback_error}")
#                 except Exception as e:
#                     Log.debug(f"Could not get close reason from WebSocket: {e}")
#                     # Still mark as disconnected
#                     self.connected = False
#                     # Try to notify callback with unknown reason
#                     if self.on_connection_closed:
#                         try:
#                             if asyncio.iscoroutinefunction(self.on_connection_closed):
#                                 await self.on_connection_closed(1000, 'Connection closed (reason unknown)', False)
#                             else:
#                                 self.on_connection_closed(1000, 'Connection closed (reason unknown)', False)
#                         except Exception:
#                             pass
                    
#         except websockets.exceptions.ConnectionClosed as e:
#             self.connected = False
#             # Log closure reason to help diagnose
#             close_code = e.code if hasattr(e, 'code') else 'unknown'
#             close_reason = e.reason if hasattr(e, 'reason') and e.reason else 'No reason provided'
            
#             # Log the raw close reason for debugging
#             Log.warning(f"⚠️ ElevenLabs WebSocket connection closed - Code: {close_code}, Reason: '{close_reason}'")
            
#             # Check if this is a terminal call end reason (should not reconnect)
#             is_terminal_reason = self._is_terminal_call_end_reason(close_reason)
#             if is_terminal_reason:
#                 Log.info(f"🛑 Call ended legitimately (ElevenLabs reason: '{close_reason}') - reconnections disabled")
#             else:
#                 # Log non-terminal reasons for debugging
#                 Log.debug(f"⚠️ Non-terminal close reason: '{close_reason}' - reconnection may be allowed")
            
#             # Notify callback if set (call_session can mark call_ended)
#             if self.on_connection_closed:
#                 try:
#                     if asyncio.iscoroutinefunction(self.on_connection_closed):
#                         await self.on_connection_closed(close_code, close_reason, is_terminal_reason)
#                     else:
#                         self.on_connection_closed(close_code, close_reason, is_terminal_reason)
#                 except Exception as callback_error:
#                     Log.error(f"❌ Error in connection_closed callback: {callback_error}")
            
#             # Log if keep-alive was active
#             if self.keep_alive_task and not self.keep_alive_task.done():
#                 Log.warning(f"⚠️ Connection closed while keep-alive was active - may indicate server-side closure")
#         except Exception as e:
#             Log.error(f"❌ Error receiving messages: {e}")
#             import traceback
#             Log.error(f"Traceback: {traceback.format_exc()}")
#             self.connected = False
#         finally:
#             self.running = False
#             Log.info("ElevenLabs receive_messages task ended")

#     async def handle_message(self, message: dict):
#         msg_type = message.get("type", "unknown")
        
#         if msg_type == "conversation_initiation_metadata":
#             metadata_event = message.get("conversation_initiation_metadata_event", {})
#             self.conversation_id = metadata_event.get('conversation_id')
#             if self.conversation_id:
#                 Log.info(f"📝 Received conversation_id: {self.conversation_id[:20]}... (will reuse on reconnect)")
#                 # Notify CallSession to store conversation_id for safety
#                 if self.on_conversation_id_received:
#                     try:
#                         if asyncio.iscoroutinefunction(self.on_conversation_id_received):
#                             await self.on_conversation_id_received(self.conversation_id)
#                         else:
#                             self.on_conversation_id_received(self.conversation_id)
#                     except Exception as callback_error:
#                         Log.error(f"❌ Error in conversation_id_received callback: {callback_error}")
#             # Get actual format from ElevenLabs (could be pcm_16000, mulaw_8000, ulaw_8000, etc.)
#             self.agent_audio_format = metadata_event.get('agent_output_audio_format', 'mulaw_8000')
#             self.user_audio_format = metadata_event.get('user_input_audio_format', 'pcm_8000')
        
#         elif msg_type == "user_transcript":
#             text = message.get("user_transcription_event", {})
#             Log.info(f"🎤 [User Transcript]: {text}")
#             if self.on_transcript_received:
#                 # Handle both sync and async callbacks
#                 if asyncio.iscoroutinefunction(self.on_transcript_received):
#                     await self.on_transcript_received(text)
#                 else:
#                     self.on_transcript_received(text)     
#         elif msg_type == "agent_response":
#             text = message.get("agent_response_event", {}).get("agent_response", "")
#             Log.info(f"🤖 [Agent Response]: {text}")
#             if self.on_response_received:
#                 # Handle both sync and async callbacks
#                 if asyncio.iscoroutinefunction(self.on_response_received):
#                     await self.on_response_received(text)
#                 else:
#                     self.on_response_received(text)
        
#         elif msg_type == "audio":
#             audio_base64 = message.get("audio_event", {}).get("audio_base_64", "")
            
#             if audio_base64:
#                 audio_bytes = base64.b64decode(audio_base64)
#                 if self.on_audio_received:
#                     if asyncio.iscoroutinefunction(self.on_audio_received):
#                         await self.on_audio_received(audio_bytes)
#                     else:
#                         self.on_audio_received(audio_bytes)
        
#         elif msg_type == "interruption":
#             event_id = message.get("interruption_event", {}).get("event_id", "")
#             if self.on_interruption_received:
#                 if asyncio.iscoroutinefunction(self.on_interruption_received):
#                     await self.on_interruption_received(event_id)   
#                 else:
#                     self.on_interruption_received(event_id)
        
#         elif msg_type == "client_tool_call":
#             tool_call = message.get("client_tool_call", {})
#             tool_name = tool_call.get("tool_name", "")
#             tool_call_id = tool_call.get("tool_call_id", "")
#             # Parameters logged only if needed for debugging
        
#         elif msg_type == "contextual_update":
#             text = message.get("text", "")
#             Log.info(f"📝 [Contextual Update]: {text}")
            
#             # Check if contextual update contains automated greeting/voicemail detection
#             if text and "automated greeting detected" in text.lower():
#                 # Extract the greeting text from the message
#                 # Format: "Automated greeting detected: '...'" or "Automated greeting detected: ..."
#                 import re
#                 # Try to match quoted text first
#                 match = re.search(r"automated greeting detected:\s*['\"](.*?)['\"]", text, re.IGNORECASE | re.DOTALL)
#                 if not match:
#                     # If no quotes, try to match everything after the colon
#                     match = re.search(r"automated greeting detected:\s*(.+)", text, re.IGNORECASE | re.DOTALL)
                
#                 if match:
#                     greeting_text = match.group(1).strip()
#                     Log.warning(f"📞 [Automated Greeting Detected via Contextual Update]: {greeting_text}")
                    
#                     # Check if this is a voicemail greeting
#                     greeting_lower = greeting_text.lower()
#                     voicemail_patterns = [
#                         'voicemail',
#                         'leave a message',
#                         'record your message',
#                         'not available',
#                         'at the tone',
#                         'please record',
#                         'after the tone',
#                         'finished recording',
#                         'person you are trying to reach',
#                         'call has been forwarded to voice mail',
#                         'forwarded to voice mail',
#                         'your call has been forwarded'
#                     ]
                    
#                     if any(pattern in greeting_lower for pattern in voicemail_patterns):
#                         Log.info(f"🛑 Voicemail detected via contextual update - marking call as ended")
#                         # Notify callback if set (call_session can mark call_ended)
#                         if self.on_automated_greeting_detected:
#                             try:
#                                 if asyncio.iscoroutinefunction(self.on_automated_greeting_detected):
#                                     await self.on_automated_greeting_detected(greeting_text, is_voicemail=True)
#                                 else:
#                                     self.on_automated_greeting_detected(greeting_text, is_voicemail=True)
#                             except Exception as callback_error:
#                                 Log.error(f"❌ Error in automated_greeting_detected callback: {callback_error}")
        
#         elif msg_type == "vad_score":
#             score = message.get("vad_score_event", {}).get("vad_score", 0.0)
#             if self.on_vad_score_received:
#                 # Handle both sync and async callbacks
#                 if asyncio.iscoroutinefunction(self.on_vad_score_received):
#                     await self.on_vad_score_received(score)
#                 else:
#                     self.on_vad_score_received(score)

#         elif msg_type == "agent_tool_request":
#             tool_request = message.get("agent_tool_request", {})
#             tool_name = tool_request.get("tool_name", "")
#             tool_call_id = tool_request.get("tool_call_id", "")
#             tool_type = tool_request.get("tool_type", "")
#             event_id = tool_request.get("event_id", "N/A")
#             tool_params = tool_request.get("parameters", {})
            
#             Log.info(f"🔧 [Agent Tool Request]: {tool_name} (id: {tool_call_id}, type: {tool_type}, event_id: {event_id})")
#             Log.info(f"   📋 Tool Parameters received from ElevenLabs: {tool_params}")
#             Log.info(f"   📦 Full tool request: {tool_request}")
            
#             # Notify callback if set
#             if self.on_tool_request_received:
#                 if asyncio.iscoroutinefunction(self.on_tool_request_received):
#                     await self.on_tool_request_received(tool_name, tool_call_id, tool_request)
#                 else:
#                     self.on_tool_request_received(tool_name, tool_call_id, tool_request)

#         elif msg_type == "agent_response_metadata":
#             # Handle agent response metadata messages
#             metadata_event = message.get("agent_response_metadata_event", {})
#             event_id = metadata_event.get("event_id", "")
#             metadata = metadata_event.get("metadata", {})
#             Log.debug(f"📊 [Agent Response Metadata] (event_id: {event_id}): {metadata}")
#             # Note: Metadata may contain useful information about the response
#             # Can be used for analytics or response tracking
        
#         elif msg_type == "agent_response_correction":
#             # Handle agent response correction messages
#             correction_event = message.get("agent_response_correction_event", {})
#             corrected_text = correction_event.get("corrected_agent_response", "")
#             original_text = correction_event.get("original_agent_response", "")
#             event_id = correction_event.get("event_id", "")
#             if corrected_text:
#                 Log.debug(f"🔧 [Agent Response Correction] (event_id: {event_id}): '{original_text}' -> '{corrected_text}'")
#             else:
#                 Log.debug(f"🔧 [Agent Response Correction] (event_id: {event_id})")
#             # Note: These corrections are typically handled automatically by ElevenLabs
#             # No action needed unless we want to track corrections for analytics
        
#         elif msg_type == "agent_tool_response":
#             # Handle agent tool response messages
#             tool_response = message.get("agent_tool_response", {})
#             tool_call_id = tool_response.get("tool_call_id", "")
#             result = tool_response.get("result", "")
#             is_error = tool_response.get("is_error", False)
            
#             # Check if tool execution was abandoned (user interrupted)
#             result_lower = result.lower() if result else ""
#             is_abandoned = (
#                 "abandoned" in result_lower or
#                 "tool execution was abandoned" in result_lower or
#                 "failed to end the call" in result_lower
#             )
            
#             if is_abandoned:
#                 Log.warning(f"⚠️ [Tool Execution Abandoned] (tool_call_id: {tool_call_id}): {result[:200] if result else 'No result'}")
#                 # Notify callback if set (call_session can reset call_ended if it was end_call)
#                 if self.on_tool_abandoned:
#                     try:
#                         if asyncio.iscoroutinefunction(self.on_tool_abandoned):
#                             await self.on_tool_abandoned(tool_call_id, result)
#                         else:
#                             self.on_tool_abandoned(tool_call_id, result)
#                     except Exception as callback_error:
#                         Log.error(f"❌ Error in tool_abandoned callback: {callback_error}")
#             else:
#                 Log.debug(f"🔧 [Agent Tool Response] (tool_call_id: {tool_call_id}, is_error: {is_error}): {result[:100] if result else 'No result'}")
#             # Note: These are responses from ElevenLabs about tool execution
#             # Typically no action needed unless we want to track tool execution

#         elif msg_type == "automated_greeting_detected" or msg_type == "automated_greeting":
#             # Handle automated greeting detection (voicemail detection)
#             greeting_event = message.get("automated_greeting_detected_event", {}) or message.get("automated_greeting_event", {})
#             greeting_text = greeting_event.get("greeting_text", "") or greeting_event.get("text", "") or str(greeting_event)
            
#             Log.warning(f"📞 [Automated Greeting Detected]: {greeting_text}")
            
#             # Check if this is a voicemail greeting
#             if greeting_text:
#                 greeting_lower = greeting_text.lower()
#                 voicemail_patterns = [
#                     'voicemail',
#                     'leave a message',
#                     'record your message',
#                     'not available',
#                     'at the tone',
#                     'please record',
#                     'after the tone',
#                     'finished recording',
#                     'person you are trying to reach',
#                     'call has been forwarded to voice mail',
#                     'forwarded to voice mail'
#                 ]
                
#                 if any(pattern in greeting_lower for pattern in voicemail_patterns):
#                     Log.info(f"🛑 Voicemail detected via automated greeting - marking call as ended")
#                     # Notify callback if set (call_session can mark call_ended)
#                     if self.on_automated_greeting_detected:
#                         try:
#                             if asyncio.iscoroutinefunction(self.on_automated_greeting_detected):
#                                 await self.on_automated_greeting_detected(greeting_text, is_voicemail=True)
#                             else:
#                                 self.on_automated_greeting_detected(greeting_text, is_voicemail=True)
#                         except Exception as callback_error:
#                             Log.error(f"❌ Error in automated_greeting_detected callback: {callback_error}")
        
#         elif msg_type == "error":
#             error = message.get("error", {})
#             error_str = str(error).lower()
#             Log.error(f"❌ [Error from ElevenLabs]: {error}")
            
#             # Check if error indicates terminal call end (maximum duration, silence, etc.)
#             if self._is_terminal_call_end_reason(error_str):
#                 Log.info(f"🛑 Error indicates terminal call end - reconnections disabled")
#                 # Notify callback if set
#                 if self.on_connection_closed:
#                     try:
#                         if asyncio.iscoroutinefunction(self.on_connection_closed):
#                             await self.on_connection_closed('error', error_str, True)
#                         else:
#                             self.on_connection_closed('error', error_str, True)
#                     except Exception as callback_error:
#                         Log.error(f"❌ Error in connection_closed callback: {callback_error}")

#         else:
#             # Log unknown message types only if they might be important
#             # Suppress debug logs for known but unhandled message types
#             known_unhandled_types = ["ping", "pong", "agent_chat_response_part"]
#             if msg_type not in known_unhandled_types:
#                 Log.debug(f"📨 [Unknown Message Type]: {msg_type}")
    
#     def _is_terminal_call_end_reason(self, close_reason: str) -> bool:
#         """
#         Check if the WebSocket close reason indicates a terminal call end.
#         Terminal reasons mean the call legitimately ended and should not reconnect.
        
#         Args:
#             close_reason: The close reason string from ElevenLabs (may include "How the call ended: " prefix)
            
#         Returns:
#             True if this is a terminal call end reason (should not reconnect)
#         """
#         if not close_reason:
#             return False
        
#         reason_lower = close_reason.lower()
        
#         # Remove "How the call ended:" prefix if present (ElevenLabs format)
#         if "how the call ended:" in reason_lower:
#             # Extract the actual reason after the colon
#             parts = reason_lower.split("how the call ended:", 1)
#             if len(parts) > 1:
#                 reason_lower = parts[1].strip()
        
#         # Terminal call end reasons (from ElevenLabs call overview):
#         terminal_patterns = [
#             # Client ended call variations
#             "client ended call",
#             "client ended",
#             "user ended call",
#             "user hung up",
#             "user disconnected",
            
#             # Agent ended call variations (all scenarios are terminal)
#             "agent ended call",
#             "agent ended",
#             "agent ended the call",
            
#             # Duration and silence
#             "call exceeded maximum duration",
#             "maximum duration",
#             "call ended due to silence",
#             "silence",
            
#             # Voicemail
#             "voicemail detected",
#             "voicemail",
            
#             # Generic terminal patterns
#             "call ended",
#             "call terminated",
#             "conversation ended",
#             "session ended",
#         ]
        
#         # Check if any terminal pattern matches
#         for pattern in terminal_patterns:
#             if pattern in reason_lower:
#                 return True
        
#         return False
    
#     def is_connected(self) -> bool:
#         """Check if WebSocket is connected with improved state detection."""
#         if not self.websocket:
#             return False
        
#         # Check actual WebSocket state first (most reliable)
#         try:
#             if hasattr(self.websocket, 'state'):
#                 if hasattr(self.websocket.state, 'name'):
#                     state_name = self.websocket.state.name
#                     is_open = state_name == 'OPEN'
#                     # Sync internal flag with actual state if they don't match
#                     if is_open != self.connected:
#                         self.connected = is_open
#                     return is_open
#                 # Fallback: check if it's the OPEN state constant
#                 is_open = self.websocket.state == State.OPEN
#                 if is_open != self.connected:
#                     self.connected = is_open
#                 return is_open
#         except AttributeError:
#             # State attribute doesn't exist, fall back to flag
#             return self.connected
#         except Exception as e:
#             # Unexpected error checking state, fall back to flag
#             return self.connected
        
#         # Fallback to flag if we can't check state
#         return self.connected
    
#     async def close(self):
#         self.running = False
#         self.connected = False
#         if self.keep_alive_task and not self.keep_alive_task.done():
#             self.keep_alive_task.cancel()
#             try:
#                 await self.keep_alive_task
#             except asyncio.CancelledError:
#                 pass
#             except Exception:
#                 pass
#             finally:
#                 self.keep_alive_task = None
        
#         if self.websocket:
#             try:
#                 await self.websocket.close()
#             except Exception:
#                 pass
    
"""
ElevenLabs WebSocket Service
Integrates ElevenLabs Conversational AI WebSocket API for service_type 4
Handles real-time audio streaming between Mcube and ElevenLabs
"""

import asyncio
import json
import base64
import time
import websockets
from websockets.protocol import State
from typing import Optional, Callable
from abc import ABC

from config import Config
from services.log_utils import Log


class ElevenLabsWebSocketService(ABC):
    """
    ElevenLabs WebSocket service for real-time conversational AI.
    Handles bidirectional audio streaming and message processing.
    """
    
    def __init__(self, agent_id: Optional[str] = None, api_key: Optional[str] = None):
        """
        Initialize ElevenLabs WebSocket service.

        Args:
            agent_id: ElevenLabs agent ID; falls back to Config.ELEVENLABS_AGENT_ID when omitted
            api_key: ElevenLabs API key; falls back to Config.ELEVENLABS_API_KEY when omitted

        Raises:
            ValueError: If no agent ID or no API key could be resolved.
        """
        # getattr() with a default keeps an explicitly passed value even when Config
        # does not define the attribute. The previous form
        # `agent_id or Config.X if hasattr(Config, 'X') else None` parsed as
        # `(agent_id or Config.X) if hasattr(...) else None` and dropped the argument.
        self.agent_id = agent_id or getattr(Config, 'ELEVENLABS_AGENT_ID', None)
        self.api_key = api_key or getattr(Config, 'ELEVENLABS_API_KEY', None)

        if not self.agent_id:
            raise ValueError("ELEVENLABS_AGENT_ID is required. Set it in config or bot_config.")
        if not self.api_key:
            raise ValueError("ELEVENLABS_API_KEY is required. Set it in config.")

        # Connection state
        self.websocket: Optional[websockets.WebSocketClientProtocol] = None
        self.keep_alive_task: Optional[asyncio.Task] = None
        self.running = False
        self.connected = False
        self.last_reconnect_time = 0  # Track last reconnection time for cooldown
        self.last_audio_send_time = 0  # Track last audio send time for rate limiting
        # Minimum spacing between outgoing audio chunks, in seconds (0.005 s = 5 ms)
        self.min_audio_interval = 0.005

        # Audio format from conversation initiation
        self.agent_audio_format: Optional[str] = None
        self.user_audio_format: Optional[str] = None
        self.conversation_id: Optional[str] = None

        # Message handlers - support both sync and async callbacks
        self.on_audio_received: Optional[Callable] = None
        self.on_transcript_received: Optional[Callable] = None
        self.on_response_received: Optional[Callable] = None
        self.on_vad_score_received: Optional[Callable] = None  # Handler for VAD scores
        self.on_interruption_received: Optional[Callable] = None  # Handler for interruption events
        self.on_tool_request_received: Optional[Callable] = None  # Handler for tool requests (e.g., end_call)
        self.on_connection_closed: Optional[Callable] = None  # Handler for connection closed events (to detect terminal reasons)
        self.on_automated_greeting_detected: Optional[Callable] = None  # Handler for automated greeting/voicemail detection
        self.on_tool_abandoned: Optional[Callable] = None  # Handler for tool execution abandonment (e.g., end_call abandoned)
        self.on_conversation_id_received: Optional[Callable] = None  # Handler for when conversation_id is received (to store in CallSession)

        Log.info(f"🎙️ ElevenLabs WebSocket service initialized - Agent ID: {self.agent_id[:20]}...")
    
    async def get_signed_url(self) -> Optional[str]:
        """Get signed URL for private agent (if needed)."""
        if not self.api_key:
            return None
        
        try:
            import aiohttp
        except ImportError:
            Log.error("aiohttp not installed. Install it with: pip install aiohttp")
            return None
        
        try:
            url = f"https://api.elevenlabs.io/v1/convai/conversation/get-signed-url?agent_id={self.agent_id}"
            headers = {"xi-api-key": self.api_key}
            
            # Use connection pooling to avoid connection exhaustion in concurrent calls
            connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)
            timeout = aiohttp.ClientTimeout(total=10.0)
            async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
                async with session.get(url, headers=headers) as response:
                    if response.status == 200:
                        data = await response.json()
                        return data.get("signed_url")
                    else:
                        Log.error(f"Failed to get signed URL: {response.status}")
                        return None
        except Exception as e:
            Log.error(f"Error getting signed URL: {e}")
            return None
    
    def get_websocket_url(self) -> str:
        """
        Build the ElevenLabs conversation WebSocket URL for this agent.

        Returns:
            The URL with the agent_id query parameter; when self.conversation_id is set,
            a conversation_id parameter is appended as well.
        """
        endpoint = f"wss://api.elevenlabs.io/v1/convai/conversation?agent_id={self.agent_id}"
        if not self.conversation_id:
            return endpoint
        return f"{endpoint}&conversation_id={self.conversation_id}"
    
    async def connect(self, use_signed_url: bool = False, max_retries: int = 3):
        """
        Open the WebSocket connection to ElevenLabs, retrying with exponential backoff.

        Each attempt is bounded by a 20 second timeout. On success the running and
        connected flags are set, the keep-alive task is started, and the method returns.

        Args:
            use_signed_url: Accepted but not read by this method; the connection always
                uses the URL from get_websocket_url() together with the xi-api-key header.
            max_retries: Maximum number of connection attempts (default: 3)

        Raises:
            Exception: When the last attempt times out or fails, or when max_retries
                is less than 1 so that no attempt is made.
        """
        target_url = self.get_websocket_url()
        auth_headers = {"xi-api-key": self.api_key}

        # Report whether an existing conversation is being resumed
        if not self.conversation_id:
            Log.info("🆕 Starting NEW ElevenLabs conversation")
        else:
            Log.info(f"🔁 Resuming ElevenLabs conversation: {self.conversation_id[:20]}...")

        attempt_timeout = 20.0  # Seconds allowed for a single connection attempt
        first_backoff = 1.0  # Delay before the second attempt; doubles afterwards

        for attempt in range(1, max_retries + 1):
            out_of_attempts = attempt >= max_retries
            try:
                if attempt > 1:
                    backoff = first_backoff * (2 ** (attempt - 2))  # 1s, 2s, 4s, ...
                    Log.info(f"🔄 Retrying connection (attempt {attempt}/{max_retries}) after {backoff:.1f}s delay...")
                    await asyncio.sleep(backoff)

                # The full URL is logged when resuming; otherwise only its first 80 characters
                shown_url = target_url if self.conversation_id else f"{target_url[:80]}..."
                Log.info(f"Connecting to ElevenLabs WebSocket: {shown_url} (attempt {attempt}/{max_retries})")

                # wait_for() guarantees a hung handshake cannot block the caller indefinitely
                handshake = websockets.connect(
                    target_url,
                    additional_headers=auth_headers,
                    ping_interval=20,  # Send ping every 20 seconds
                    ping_timeout=10,  # Wait 10 seconds for pong
                    close_timeout=10,  # Wait 10 seconds for close
                )
                self.websocket = await asyncio.wait_for(handshake, timeout=attempt_timeout)
                self.running = True
                self.connected = True
                Log.info(f"✅ Connected to ElevenLabs WebSocket successfully (attempt {attempt})")

                # Start keep-alive task
                self.keep_alive_task = asyncio.create_task(self._keep_alive())
                return

            except asyncio.TimeoutError:
                if out_of_attempts:
                    error_msg = f"❌ Failed to connect to ElevenLabs WebSocket: Connection timeout ({attempt_timeout}s) after {max_retries} attempts"
                    Log.error(error_msg)
                    self.connected = False
                    raise Exception(error_msg)
                Log.warning(f"⚠️ Connection timeout ({attempt_timeout}s) on attempt {attempt}/{max_retries}, will retry...")
            except Exception as e:
                import traceback
                if out_of_attempts:
                    error_msg = f"❌ Failed to connect to ElevenLabs WebSocket after {max_retries} attempts: {e}"
                    Log.error(error_msg)
                    Log.error(f"Full traceback: {traceback.format_exc()}")
                    self.connected = False
                    raise Exception(error_msg) from e
                Log.warning(f"⚠️ Connection error on attempt {attempt}/{max_retries}: {e}")
                Log.debug(f"Traceback: {traceback.format_exc()}")

        # Only reached when the loop body never ran (max_retries < 1)
        self.connected = False
        raise Exception(f"❌ Failed to connect to ElevenLabs WebSocket after {max_retries} attempts")
    
    async def _keep_alive(self):
        """Send keep-alive messages every 5 seconds using user_activity to prevent idle timeouts without interfering with bot speech."""
        consecutive_failures = 0
        max_consecutive_failures = 5  # Increased to allow more retries
        last_success_time = 0
        
        while self.running:
            try:
                await asyncio.sleep(5)  # 5 seconds: balanced interval that prevents timeouts without interfering with conversation flow
                
                # Double-check all conditions before sending (prevent zombie messages)
                if not self.running:
                    break
                
                if not self.websocket:
                    consecutive_failures += 1
                    if consecutive_failures >= max_consecutive_failures:
                        Log.warning("⚠️ Keep-alive: WebSocket is None, stopping task")
                        break
                    continue
                
                # CRITICAL FIX: Check actual WebSocket state, not just flag
                # The flag might be out of sync, but WebSocket might still be open
                if not self.is_connected():
                    consecutive_failures += 1
                    # Don't stop immediately - connection might recover
                    if consecutive_failures >= max_consecutive_failures:
                        Log.warning("⚠️ Keep-alive: connection not connected after multiple checks, stopping task")
                        break
                    continue
                
                # Reset failure counter on successful check
                consecutive_failures = 0
                
                try:
                    # Send user_activity as keep-alive (proper JSON message)
                    # This is better than sending raw text which causes 1008 errors
                    await self.send_user_activity()
                    last_success_time = time.time()
                    # Log every successful keep-alive (reduced frequency to avoid spam)
                    # Log every 25 seconds (5 keep-alive cycles)
                    if int(time.time()) % 25 == 0:
                        Log.debug(f"💓 Keep-alive sent successfully (connection active)")
                except websockets.exceptions.ConnectionClosed as e:
                    # Connection is actually closed - stop keep-alive
                    close_code = e.code if hasattr(e, 'code') else 'unknown'
                    close_reason = e.reason if hasattr(e, 'reason') and e.reason else 'No reason'
                    Log.warning(f"⚠️ Keep-alive: connection closed during send (Code: {close_code}, Reason: {close_reason})")
                    self.connected = False
                    break
                except Exception as e:
                    # Transient error - log but continue trying
                    consecutive_failures += 1
                    # Only log if we still think we're connected (might be transient)
                    if self.is_connected() and self.running:
                        Log.warning(f"⚠️ Keep-alive error (attempt {consecutive_failures}/{max_consecutive_failures}): {e}")
                    
                    # Only stop if too many consecutive failures AND connection is actually closed
                    if consecutive_failures >= max_consecutive_failures and not self.is_connected():
                        Log.error(f"❌ Keep-alive: too many consecutive failures, stopping task")
                        break
                    # Continue loop to retry on next iteration
                    continue
                    
            except asyncio.CancelledError:
                # Task was cancelled, exit gracefully
                break
            except Exception as e:
                # Unexpected error - log but continue trying
                consecutive_failures += 1
                if self.running:
                    Log.warning(f"⚠️ Keep-alive unexpected error (attempt {consecutive_failures}/{max_consecutive_failures}): {e}")
                
                # Only stop if too many consecutive failures AND connection is actually closed
                if consecutive_failures >= max_consecutive_failures and (not self.websocket or not self.is_connected()):
                    Log.error(f"❌ Keep-alive: too many consecutive failures, stopping task")
                    break
                # Continue loop to retry
                continue
    
    async def send_conversation_initiation(
        self, 
        custom_llm_extra_body: Optional[dict] = None, 
        text_only: bool = False,
        dynamic_variables: Optional[dict] = None
    ):
        """
        Send the conversation_initiation_client_data message.

        The message always carries a conversation_config_override that requests
        ulaw_8000 output audio for both the agent and TTS sections and enables VAD.

        Args:
            custom_llm_extra_body: Custom LLM configuration; included only when truthy
            text_only: When True, agent.text_only is set in the override (False for audio)
            dynamic_variables: Dynamic variables for the conversation; included only when truthy
        """
        agent_config = {
            "output_audio_format": "ulaw_8000",
            "vad": {
                "enable": True,
                # Lower values (threshold 0.38, 250 ms padding/silence) are noted in
                # earlier revisions as a latency trade-off; these are the active settings.
                "threshold": 0.5,
                "prefix_padding_ms": 300,
                "silence_duration_ms": 300
            }
        }
        # The "agent" section always exists, so the flag is set directly on it
        if text_only:
            agent_config["text_only"] = True

        message = {
            "type": "conversation_initiation_client_data",
            "conversation_config_override": {
                "agent": agent_config,
                "tts": {
                    "output_audio_format": "ulaw_8000"
                }
            }
        }

        if custom_llm_extra_body:
            message["custom_llm_extra_body"] = custom_llm_extra_body

        if dynamic_variables:
            message["dynamic_variables"] = dynamic_variables

        await self._send_message(message)
        Log.info("📤 Sent conversation initiation to ElevenLabs")
    
    async def send_audio_chunk(self, audio_data: bytes):
        """
        Send one audio chunk to ElevenLabs, pacing sends to prevent 1008 policy violations.

        The bytes are base64-encoded as-is (no transcoding happens here) and
        wrapped in a ``user_audio_chunk`` message. Consecutive sends are spaced
        at least 5 ms apart: a call that arrives sooner sleeps for the remainder
        of the interval first. Sending is best-effort, so failures never reach
        the caller; they are logged at debug level instead.

        Args:
            audio_data: Raw audio bytes to forward.
        """
        # Nothing to do once the connection is gone; skip quietly.
        if not self.connected or not self.websocket:
            return

        try:
            import time
            current_time = time.time()
            time_since_last_send = current_time - self.last_audio_send_time

            # Minimum spacing between chunks: 0.005 s = 5 ms (at most ~200 sends/second).
            min_interval = 0.005
            if time_since_last_send < min_interval:
                # Too soon after the previous chunk: wait out the rest of the interval.
                await asyncio.sleep(min_interval - time_since_last_send)
                self.last_audio_send_time = time.time()
            else:
                self.last_audio_send_time = current_time

            # Message format: {"type": "user_audio_chunk", "user_audio_chunk": base64_string}
            message = {
                "type": "user_audio_chunk",
                "user_audio_chunk": base64.b64encode(audio_data).decode('utf-8')
            }
            await self._send_message(message, silent=True)
        except Exception as e:
            # Best-effort: one bad chunk must not break the stream, but leave a trace.
            Log.debug(f"Dropped audio chunk: {type(e).__name__}: {e}")
    
    async def send_client_tool_result(self, tool_call_id: str, result: str, is_error: bool = False):
        """
        Report the outcome of a client-side tool call back to ElevenLabs.

        Args:
            tool_call_id: Identifier of the tool call being answered.
            result: Tool output to return.
            is_error: Whether the tool call failed.
        """
        payload = dict(
            type="client_tool_result",
            tool_call_id=tool_call_id,
            result=result,
            is_error=is_error,
        )
        await self._send_message(payload)
    
    async def send_user_activity(self):
        """Send a ``user_activity`` message as keep-alive; no-op when stopped or disconnected."""
        # is_connected() inspects the real socket state instead of trusting the
        # self.connected flag, which can be out of sync with the socket.
        if not (self.running and self.websocket and self.is_connected()):
            return
        await self._send_message({"type": "user_activity"}, silent=True)

    
    async def send_interruption(self) -> bool:
        """
        Send an ``interrupt`` message to ElevenLabs.

        Returns:
            True when the message was handed to ``_send_message`` without an
            exception; False when not connected or when building/sending raised.
        """
        if not (self.connected and self.websocket):
            return False
        try:
            # event_id carries the current Unix time in whole seconds.
            await self._send_message({
                "type": "interrupt",
                "event_id": f"interrupt_{int(time.time())}"
            })
        except Exception:
            return False
        return True

    async def _send_message(self, message: dict, silent: bool = False):
        """
        Serialize ``message`` to JSON and send it over the WebSocket.

        Failures raised as ``Exception`` subclasses are handled here instead of
        being propagated. ``self.connected`` is cleared only when the failure
        shows the connection is really gone; other errors leave the flag alone
        so that ``is_connected()`` can decide from the actual socket state.

        Args:
            message: JSON-serializable payload to send.
            silent: When True, suppress all failure logging (callers in this
                class pass it for audio chunks, keep-alives and pongs).
        """
        if not self.websocket:
            return
        try:
            message_json = json.dumps(message)
            await self.websocket.send(message_json)
        except websockets.exceptions.ConnectionClosed as e:
            # Connection is actually closed - clear the flag.
            self.connected = False
            if not silent:
                Log.warning(f"⚠️ WebSocket connection closed while sending: {e.code} - {e.reason}")
        except websockets.exceptions.InvalidState as e:
            # WebSocket is in an invalid state (closed, closing, etc.).
            self.connected = False
            if not silent:
                Log.warning(f"⚠️ WebSocket in invalid state: {e}")
        except Exception as e:
            error_str = str(e).lower()
            error_type = type(e).__name__.lower()

            # Two-step check to avoid false positives from transient errors:
            # the error must look connection-related AND say the link is gone.
            is_connection_error = (
                isinstance(e, (ConnectionError, OSError)) or
                "websocket" in error_str or
                error_type in ["connectionerror", "oserror"]
            )
            reports_closed_link = any(phrase in error_str for phrase in [
                "connection closed",
                "connection lost",
                "not connected",
                "websocket is closed",
                "websocket is closing"
            ])

            if is_connection_error and reports_closed_link:
                self.connected = False
                if not silent:
                    Log.warning(f"⚠️ Connection error detected: {e}")
            elif not silent:
                # Anything else (e.g. a payload json.dumps cannot serialize, a
                # timeout) leaves the flag untouched, but is logged so that the
                # dropped message is not lost without a trace.
                Log.error(f"❌ Failed to send message ({type(e).__name__}): {e}")
    
    async def receive_messages(self, on_message: Callable[[dict], None]):
        """
        Receive messages from the ElevenLabs WebSocket until it closes.

        Each frame is parsed as JSON. ``ping`` messages are answered with a
        ``pong`` here and are not forwarded; every other message is handed to
        ``on_message``. A failure while handling one frame is logged and the
        loop moves on to the next frame. When the socket closes,
        ``self.connected`` is cleared and ``self.on_connection_closed`` (if set)
        is called with ``(close_code, close_reason, is_terminal_reason)``.
        ``self.running`` is set to False whenever the receive loop exits.

        Args:
            on_message: Callback for received messages; may be a plain function
                or a coroutine function.

        Raises:
            Exception: If ``self.websocket`` is not set.
        """
        if not self.websocket:
            raise Exception("WebSocket not connected")

        async def _notify_closed(code, reason, terminal):
            # on_connection_closed is optional and may be sync or async.
            callback = self.on_connection_closed
            if not callback:
                return
            if asyncio.iscoroutinefunction(callback):
                await callback(code, reason, terminal)
            else:
                callback(code, reason, terminal)

        try:
            async for message in self.websocket:
                # Reset per frame: if json.loads fails, the error log below must
                # not hit an unbound name (which would abort the whole loop) or
                # show the previous frame's payload.
                data = None
                try:
                    data = json.loads(message)
                    msg_type = data.get("type", "unknown")

                    # Pings are answered here and never reach on_message.
                    if msg_type == "ping":
                        event_id = data.get("ping_event", {}).get("event_id")
                        if event_id:
                            pong_msg = {"type": "pong", "event_id": event_id}
                            await self._send_message(pong_msg, silent=True)
                        continue

                    if asyncio.iscoroutinefunction(on_message):
                        await on_message(data)
                    else:
                        on_message(data)
                except Exception as e:
                    import traceback
                    Log.error(f"❌ Error processing message: {e}")
                    # Fall back to the raw frame when it could not be parsed.
                    Log.error(f"Message data: {data if data is not None else message}")
                    Log.error(f"Full traceback: {traceback.format_exc()}")

            # Loop ended without an exception: the WebSocket closed cleanly.
            if self.websocket:
                try:
                    close_code = getattr(self.websocket, 'close_code', None)
                    close_reason = getattr(self.websocket, 'close_reason', None) or 'Connection closed cleanly'
                    # Log the raw close reason for debugging
                    Log.warning(f"⚠️ ElevenLabs WebSocket connection closed - Code: {close_code}, Reason: '{close_reason}'")
                    self.connected = False

                    # Terminal reasons mean the call really ended (no reconnect).
                    is_terminal_reason = self._is_terminal_call_end_reason(close_reason)
                    if is_terminal_reason:
                        Log.info(f"🛑 Call ended legitimately (ElevenLabs reason: '{close_reason}') - reconnections disabled")
                    else:
                        Log.debug(f"⚠️ Non-terminal close reason: '{close_reason}' - reconnection may be allowed")

                    try:
                        # A missing close code is reported as 1000.
                        await _notify_closed(close_code or 1000, close_reason, is_terminal_reason)
                    except Exception as callback_error:
                        Log.error(f"❌ Error in connection_closed callback: {callback_error}")
                except Exception as e:
                    Log.debug(f"Could not get close reason from WebSocket: {e}")
                    # Still mark as disconnected and report an unknown reason.
                    self.connected = False
                    try:
                        await _notify_closed(1000, 'Connection closed (reason unknown)', False)
                    except Exception:
                        # Last-resort notification; nothing more can be done here.
                        pass

        except websockets.exceptions.ConnectionClosed as e:
            self.connected = False
            close_code = e.code if hasattr(e, 'code') else 'unknown'
            close_reason = e.reason if hasattr(e, 'reason') and e.reason else 'No reason provided'

            # Log the raw close reason for debugging
            Log.warning(f"⚠️ ElevenLabs WebSocket connection closed - Code: {close_code}, Reason: '{close_reason}'")

            # Terminal reasons mean the call really ended (should not reconnect).
            is_terminal_reason = self._is_terminal_call_end_reason(close_reason)
            if is_terminal_reason:
                Log.info(f"🛑 Call ended legitimately (ElevenLabs reason: '{close_reason}') - reconnections disabled")
            else:
                Log.debug(f"⚠️ Non-terminal close reason: '{close_reason}' - reconnection may be allowed")

            try:
                await _notify_closed(close_code, close_reason, is_terminal_reason)
            except Exception as callback_error:
                Log.error(f"❌ Error in connection_closed callback: {callback_error}")

            # A still-running keep-alive suggests the server closed the connection.
            if self.keep_alive_task and not self.keep_alive_task.done():
                Log.warning(f"⚠️ Connection closed while keep-alive was active - may indicate server-side closure")
        except Exception as e:
            Log.error(f"❌ Error receiving messages: {e}")
            import traceback
            Log.error(f"Traceback: {traceback.format_exc()}")
            self.connected = False
        finally:
            self.running = False
            Log.info("ElevenLabs receive_messages task ended")

    async def handle_message(self, message: dict):
        """
        Route one decoded ElevenLabs message to the matching ``on_*`` callback.

        Dispatch is on ``message["type"]``. Every callback is optional and may
        be a plain function or a coroutine function. Callback errors are caught
        and logged only in the branches that wrap the call in ``try``; in the
        other branches they propagate to the caller.

        Args:
            message: Parsed JSON message received from ElevenLabs.
        """
        async def _fire(callback, *args, **kwargs):
            # Callbacks may be sync or async; await only coroutine functions.
            if asyncio.iscoroutinefunction(callback):
                await callback(*args, **kwargs)
            else:
                callback(*args, **kwargs)

        # One shared phrase list for both voicemail-detection paths below, so
        # the contextual-update and automated-greeting checks cannot drift apart.
        voicemail_patterns = (
            'voicemail',
            'leave a message',
            'record your message',
            'not available',
            'at the tone',
            'please record',
            'after the tone',
            'finished recording',
            'person you are trying to reach',
            'call has been forwarded to voice mail',
            'forwarded to voice mail',
            'your call has been forwarded',
        )

        msg_type = message.get("type", "unknown")

        if msg_type == "conversation_initiation_metadata":
            metadata_event = message.get("conversation_initiation_metadata_event", {})
            self.conversation_id = metadata_event.get('conversation_id')
            if self.conversation_id:
                Log.info(f"📝 Received conversation_id: {self.conversation_id[:20]}... (will reuse on reconnect)")
                # Let the owner store the conversation_id as well.
                if self.on_conversation_id_received:
                    try:
                        await _fire(self.on_conversation_id_received, self.conversation_id)
                    except Exception as callback_error:
                        Log.error(f"❌ Error in conversation_id_received callback: {callback_error}")
            # Audio formats as reported by ElevenLabs, with defaults when absent.
            self.agent_audio_format = metadata_event.get('agent_output_audio_format', 'mulaw_8000')
            self.user_audio_format = metadata_event.get('user_input_audio_format', 'pcm_8000')

        elif msg_type == "user_transcript":
            # NOTE(review): this forwards the whole user_transcription_event
            # dict, not a transcript string - confirm the callback expects that.
            text = message.get("user_transcription_event", {})
            Log.info(f"🎤 [User Transcript]: {text}")
            if self.on_transcript_received:
                await _fire(self.on_transcript_received, text)

        elif msg_type == "agent_response":
            text = message.get("agent_response_event", {}).get("agent_response", "")
            Log.info(f"🤖 [Agent Response]: {text}")
            if self.on_response_received:
                await _fire(self.on_response_received, text)

        elif msg_type == "audio":
            audio_base64 = message.get("audio_event", {}).get("audio_base_64", "")
            if audio_base64:
                audio_bytes = base64.b64decode(audio_base64)
                if self.on_audio_received:
                    await _fire(self.on_audio_received, audio_bytes)

        elif msg_type == "interruption":
            event_id = message.get("interruption_event", {}).get("event_id", "")
            if self.on_interruption_received:
                await _fire(self.on_interruption_received, event_id)

        elif msg_type == "client_tool_call":
            # Recognized but not acted on here; the branch exists so the type
            # is not reported as unknown below.
            pass

        elif msg_type == "contextual_update":
            text = message.get("text", "") or ""
            text_lower = text.lower()
            # Language-related updates ("Agent changed the language to ...",
            # "The user is speaking in ...") are logged at DEBUG only.
            if ("agent changed the language" in text_lower or "changed the language to" in text_lower or
                "the user is speaking in" in text_lower or "user is speaking in" in text_lower or
                "in flag tamil" in text_lower):
                Log.debug(f"📝 [Contextual Update - Language]: {text[:160]}...")
            else:
                Log.info(f"📝 [Contextual Update]: {text[:200]}...")

            # Greeting/voicemail detection can arrive inside a contextual update.
            # Format: "Automated greeting detected: '...'" or without the quotes.
            if "automated greeting detected" in text_lower:
                import re
                # Prefer the quoted form; otherwise take everything after the colon.
                match = re.search(r"automated greeting detected:\s*['\"](.*?)['\"]", text, re.IGNORECASE | re.DOTALL)
                if not match:
                    match = re.search(r"automated greeting detected:\s*(.+)", text, re.IGNORECASE | re.DOTALL)

                if match:
                    greeting_text = match.group(1).strip()
                    Log.warning(f"📞 [Automated Greeting Detected via Contextual Update]: {greeting_text}")

                    greeting_lower = greeting_text.lower()
                    if any(pattern in greeting_lower for pattern in voicemail_patterns):
                        Log.info("🛑 Voicemail detected via contextual update - marking call as ended")
                        if self.on_automated_greeting_detected:
                            try:
                                await _fire(self.on_automated_greeting_detected, greeting_text, is_voicemail=True)
                            except Exception as callback_error:
                                Log.error(f"❌ Error in automated_greeting_detected callback: {callback_error}")

        elif msg_type == "vad_score":
            score = message.get("vad_score_event", {}).get("vad_score", 0.0)
            if self.on_vad_score_received:
                await _fire(self.on_vad_score_received, score)

        elif msg_type == "agent_tool_request":
            tool_request = message.get("agent_tool_request", {})
            tool_name = tool_request.get("tool_name", "")
            tool_call_id = tool_request.get("tool_call_id", "")
            tool_type = tool_request.get("tool_type", "")
            event_id = tool_request.get("event_id", "N/A")
            tool_params = tool_request.get("parameters", {})

            Log.info(f"🔧 [Agent Tool Request]: {tool_name} (id: {tool_call_id}, type: {tool_type}, event_id: {event_id})")
            Log.info(f"   📋 Tool Parameters received from ElevenLabs: {tool_params}")
            Log.info(f"   📦 Full tool request: {tool_request}")

            if self.on_tool_request_received:
                await _fire(self.on_tool_request_received, tool_name, tool_call_id, tool_request)

        elif msg_type == "agent_response_metadata":
            # Logged only; nothing else consumes this metadata here.
            metadata_event = message.get("agent_response_metadata_event", {})
            event_id = metadata_event.get("event_id", "")
            metadata = metadata_event.get("metadata", {})
            Log.debug(f"📊 [Agent Response Metadata] (event_id: {event_id}): {metadata}")

        elif msg_type == "agent_response_correction":
            # Logged only; no action is taken on corrections here.
            correction_event = message.get("agent_response_correction_event", {})
            corrected_text = correction_event.get("corrected_agent_response", "")
            original_text = correction_event.get("original_agent_response", "")
            event_id = correction_event.get("event_id", "")
            if corrected_text:
                Log.debug(f"🔧 [Agent Response Correction] (event_id: {event_id}): '{original_text}' -> '{corrected_text}'")
            else:
                Log.debug(f"🔧 [Agent Response Correction] (event_id: {event_id})")

        elif msg_type == "agent_tool_response":
            tool_response = message.get("agent_tool_response", {})
            tool_call_id = tool_response.get("tool_call_id", "")
            result = tool_response.get("result", "")
            is_error = tool_response.get("is_error", False)

            # Matching and logging work on text; a truthy non-string result is
            # stringified for that purpose only (the callback gets the original).
            if isinstance(result, str):
                result_text = result
            else:
                result_text = str(result) if result else ""

            # Detect an abandoned tool execution from the result text.
            result_lower = result_text.lower()
            is_abandoned = (
                "abandoned" in result_lower or
                "tool execution was abandoned" in result_lower or
                "failed to end the call" in result_lower
            )

            if is_abandoned:
                Log.warning(f"⚠️ [Tool Execution Abandoned] (tool_call_id: {tool_call_id}): {result_text[:200] if result_text else 'No result'}")
                if self.on_tool_abandoned:
                    try:
                        await _fire(self.on_tool_abandoned, tool_call_id, result)
                    except Exception as callback_error:
                        Log.error(f"❌ Error in tool_abandoned callback: {callback_error}")
            else:
                Log.debug(f"🔧 [Agent Tool Response] (tool_call_id: {tool_call_id}, is_error: {is_error}): {result_text[:100] if result_text else 'No result'}")

        elif msg_type == "automated_greeting_detected" or msg_type == "automated_greeting":
            # The event may arrive under either key; the text under either field.
            greeting_event = message.get("automated_greeting_detected_event", {}) or message.get("automated_greeting_event", {})
            greeting_text = greeting_event.get("greeting_text", "") or greeting_event.get("text", "") or str(greeting_event)

            Log.warning(f"📞 [Automated Greeting Detected]: {greeting_text}")

            if greeting_text:
                greeting_lower = greeting_text.lower()
                if any(pattern in greeting_lower for pattern in voicemail_patterns):
                    Log.info("🛑 Voicemail detected via automated greeting - marking call as ended")
                    if self.on_automated_greeting_detected:
                        try:
                            await _fire(self.on_automated_greeting_detected, greeting_text, is_voicemail=True)
                        except Exception as callback_error:
                            Log.error(f"❌ Error in automated_greeting_detected callback: {callback_error}")

        elif msg_type == "error":
            error = message.get("error", {})
            error_str = str(error).lower()
            Log.error(f"❌ [Error from ElevenLabs]: {error}")

            # Some errors mean the call is over (maximum duration, silence, ...).
            if self._is_terminal_call_end_reason(error_str):
                Log.info("🛑 Error indicates terminal call end - reconnections disabled")
                if self.on_connection_closed:
                    try:
                        await _fire(self.on_connection_closed, 'error', error_str, True)
                    except Exception as callback_error:
                        Log.error(f"❌ Error in connection_closed callback: {callback_error}")

        else:
            # Stay quiet about types that are known but deliberately unhandled.
            known_unhandled_types = ["ping", "pong", "agent_chat_response_part"]
            if msg_type not in known_unhandled_types:
                Log.debug(f"📨 [Unknown Message Type]: {msg_type}")
    
    def _is_terminal_call_end_reason(self, close_reason: str) -> bool:
        """
        Decide whether a close reason means the call legitimately ended.

        A terminal reason means no reconnection should be attempted. Matching
        is a case-insensitive substring search against a fixed phrase list.

        Args:
            close_reason: Close reason text from ElevenLabs; may start with a
                "How the call ended: " prefix.

        Returns:
            True if the reason is terminal, False otherwise (including when
            ``close_reason`` is empty).
        """
        if not close_reason:
            return False

        normalized = close_reason.lower()

        # Drop the ElevenLabs prefix and keep only what follows it. This matters
        # because the prefix itself contains the phrase "call ended".
        _, marker, remainder = normalized.partition("how the call ended:")
        if marker:
            normalized = remainder.strip()

        terminal_patterns = (
            # Default reason used by this class for a normal (code 1000) closure
            "connection closed cleanly",
            # Client / user ended the call
            "client ended call",
            "client ended",
            "user ended call",
            "user hung up",
            "user disconnected",
            # Agent ended the call
            "agent ended call",
            "agent ended",
            "agent ended the call",
            # Duration and silence limits
            "call exceeded maximum duration",
            "maximum duration",
            "call ended due to silence",
            "silence",
            # Voicemail
            "voicemail detected",
            "voicemail",
            # Generic endings
            "call ended",
            "call terminated",
            "conversation ended",
            "session ended",
        )

        return any(pattern in normalized for pattern in terminal_patterns)
    
    def is_connected(self) -> bool:
        """
        Report whether the WebSocket is open, preferring the socket's real state.

        When the socket exposes a ``state``, that state decides the answer and
        ``self.connected`` is brought in line with it. When the state cannot be
        read, the ``self.connected`` flag is returned instead.
        """
        socket = self.websocket
        if not socket:
            return False

        try:
            if not hasattr(socket, 'state'):
                # No state to inspect: the flag is all there is.
                return self.connected

            state = socket.state
            if hasattr(state, 'name'):
                is_open = state.name == 'OPEN'
            else:
                # State without a name: compare against the OPEN constant.
                is_open = state == State.OPEN

            # Keep the internal flag in step with what the socket reports.
            if is_open != self.connected:
                self.connected = is_open
            return is_open
        except Exception:
            # Any trouble reading the state: fall back to the flag.
            return self.connected
    
    async def close(self):
        """
        Shut the service down: clear the state flags, stop keep-alive, close the socket.

        Errors from the keep-alive task or from closing the socket are ignored,
        so this method is safe to call during teardown.
        """
        self.running = False
        self.connected = False

        task = self.keep_alive_task
        if task and not task.done():
            task.cancel()
            try:
                # Wait for the cancellation to take effect.
                await task
            except (asyncio.CancelledError, Exception):
                pass
            finally:
                self.keep_alive_task = None

        socket = self.websocket
        if socket:
            try:
                await socket.close()
            except Exception:
                pass
    
   