import asyncio
import array
import audioop
import base64
import json
import logging
import os
import signal
import sys
import time
from dataclasses import dataclass
from collections import deque
from typing import Any, Optional
import types
import re

from .mcube_language import detect_language_preference, resolve_detected_language


def _install_livekit_blingfire_stub() -> None:
    """
    LiveKit Agents currently imports `livekit.agents.tokenize.blingfire`, which loads a native
    extension (`lk_blingfire`). On some Windows setups (WDAC / App Control) that extension is blocked
    and the entire `livekit.agents` import fails.

    The MCube WS bridge doesn't depend on blingfire, so we provide a minimal stub module to keep
    `livekit.agents` importable.
    """

    module_name = "livekit.agents.tokenize.blingfire"
    if module_name in sys.modules:
        return

    class SentenceTokenizer:  # minimal compatibility surface
        def __init__(
            self,
            *,
            min_sentence_len: int = 20,
            stream_context_len: int = 10,
            retain_format: bool = False,
        ) -> None:
            self._min_sentence_len = int(min_sentence_len)
            self._retain_format = bool(retain_format)

        def tokenize(self, text: str, *, language: str | None = None) -> list[str]:
            # Very naive fallback: split on sentence-ending punctuation.
            parts = [p.strip() for p in re.split(r"(?<=[.!?])\s+", text or "") if p.strip()]
            return [p for p in parts if len(p) >= self._min_sentence_len]

        def stream(self, *, language: str | None = None):
            raise NotImplementedError("Streaming tokenizer not supported in fallback stub")

    stub = types.ModuleType(module_name)
    stub.__dict__.update({"__all__": ["SentenceTokenizer"], "SentenceTokenizer": SentenceTokenizer})
    sys.modules[module_name] = stub


# Register the blingfire stub before the livekit imports below are executed, so that
# importing `livekit.agents` does not try to load the native extension.
_install_livekit_blingfire_stub()

import aiohttp
import redis.asyncio as redis_async
import websockets
from livekit.agents.stt.stt import SpeechEventType
from livekit.plugins import cartesia, deepgram, elevenlabs
from livekit import rtc
from livekit.agents.stt.stream_adapter import StreamAdapter
from livekit.plugins.silero.vad import VAD as SileroVAD
from .env_load import load_agent_runtime_dotenv
from .mcube_defaults import apply_voice_defaults_to_dict, get_default_mcube_call_config
from .mcube_codec import MCUBE_AUDIO_SPEC, from_base64_str, pcm16_to_mulaw
from .mq import (
    AI_UTTERANCES_QUEUE,
    RABBITMQ_URL,
    control_queue_name,
    declare_durable_queue,
    connect,
    get_channel,
    publish_json,
    tts_queue_name,
)

# Module-level logger for the MCube websocket bridge.
log = logging.getLogger("mcube.ws_bridge")

# Runs at import time, before the environment lookups below.
# NOTE(review): presumably loads dotenv values into os.environ — confirm in env_load.
load_agent_runtime_dotenv()

# Default call configuration; the constants below are derived from it and each
# McubeCallSession starts from these values.
_MCUBE_DEF = get_default_mcube_call_config()

# Websocket listen address, overridable through the environment.
WS_PORT = int(os.getenv("MCUBE_WS_BRIDGE_PORT", "9001"))
WS_HOST = os.getenv("MCUBE_WS_BRIDGE_HOST", "0.0.0.0")

# Redis connection URL, overridable through the environment.
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")

# Defaults taken from _MCUBE_DEF. Missing or falsy entries become "" (or "pcm_16000" for the
# TTS encoding). Literal backslash-n sequences in the system prompt become real newlines.
SYSTEM_PROMPT = (_MCUBE_DEF.get("system_prompt") or "").replace("\\n", "\n")
LLM_MODEL = _MCUBE_DEF.get("llm_model") or ""
LLM_PROVIDER = (_MCUBE_DEF.get("llm_provider") or "").strip()
TTS_MODEL = _MCUBE_DEF.get("tts_model") or ""
TTS_PROVIDER = (_MCUBE_DEF.get("tts_provider") or "").strip()
TTS_VOICE_ID = _MCUBE_DEF.get("tts_voice_id") or ""
TTS_ENCODING = _MCUBE_DEF.get("tts_encoding") or "pcm_16000"
STT_LANGUAGE_CODE = (_MCUBE_DEF.get("stt_language_code") or "").strip()
STT_MODEL_ID = _MCUBE_DEF.get("stt_model_id") or ""
STT_PROVIDER = (_MCUBE_DEF.get("stt_provider") or "").strip()

# Barge-in configuration
# Minimum interim-transcript length (in characters) that counts as a substantial interruption.
_BARGE_IN_MIN_INTERIM_CHARS = int(os.getenv("MCUBE_BARGE_IN_MIN_INTERIM_CHARS", "10"))


@dataclass
class PendingPlay:
    """A playback segment whose ``playedStream`` acknowledgement is still awaited."""

    # Segment name. NOTE(review): presumably equal to the key used in
    # McubeCallSession.pending_plays and to the `name` echoed by playedStream — confirm.
    name: str
    # Resolved with None when the acknowledgement arrives; cancelled during session cleanup.
    future: asyncio.Future[None]


class McubeCallSession:
    def __init__(
        self,
        *,
        call_id: str,
        websocket: websockets.WebSocketServerProtocol,
        rabbit_channel,
        redis_client: redis_async.Redis,
    ):
        self.call_id = call_id
        self.ws = websocket
        self.rabbit_channel = rabbit_channel
        self.redis_client = redis_client
        self.stream_id: Optional[str] = None

        # Playback state (single active utterance at a time).
        self.active_sequence_id: Optional[int] = None
        self.cancelled_sequence_id: Optional[int] = None
        self.bot_playing = False
        self._bot_playing_started_at: Optional[float] = None
        self.response_pending = False  # LLM/TTS running but playback not started yet

        # Used to wait for playedStream(name).
        self.pending_plays: dict[str, PendingPlay] = {}

        # Internal queue of segments to play.
        self.tts_segments: asyncio.Queue[tuple[int, int, bytes]] = asyncio.Queue()
        self._tts_done_received: bool = False
        self._tts_final_chunk_seq: Optional[int] = None
        self._last_tts_chunk_seq: Optional[int] = None

        # Per-call config (defaults can be overridden on MCube call start).
        self.system_prompt = SYSTEM_PROMPT
        self.llm_model = LLM_MODEL
        self.llm_provider = LLM_PROVIDER
        self.first_message: str = (_MCUBE_DEF.get("first_message") or "").strip()
        self._first_message_sent: bool = False

        self.stt_provider = STT_PROVIDER
        self.stt_language_code = STT_LANGUAGE_CODE
        self.stt_model_id = STT_MODEL_ID
        self.detected_language: Optional[str] = None  # Track detected language per call

        self.tts_provider = TTS_PROVIDER
        self.tts_model = TTS_MODEL
        self.tts_voice_id = TTS_VOICE_ID
        self.tts_encoding = TTS_ENCODING

        try:
            self.tts_chunk_ms = max(5, min(500, int(float(_MCUBE_DEF.get("tts_chunk_ms", "200")))))
        except Exception:
            self.tts_chunk_ms = 200
        try:
            self.tts_gain = float(_MCUBE_DEF.get("tts_gain", "0.35"))
        except Exception:
            self.tts_gain = 0.35
        try:
            self.playback_pace_factor = float(_MCUBE_DEF.get("playback_pace_factor", "1.0"))
        except Exception:
            self.playback_pace_factor = 1.0
        try:
            self.checkpoint_every = max(0, int(float(_MCUBE_DEF.get("checkpoint_every", "10"))))
        except Exception:
            self.checkpoint_every = 10

        # Passed through to ai_worker for `business_id_agents` lookups (cluster DB).
        self.business_id: Optional[Any] = None
        self.bot_id: Optional[Any] = None
        self.agent_id: Optional[Any] = None
        self.user_id: Optional[Any] = None
        self.agent_email: Optional[str] = None
        self.agent_name: Optional[str] = None

        # STT (initialized after we receive MCube 'start' so we can load per-call config)
        self.stt: object | None = None
        self.stt_stream = None
        self._stt_ready_event = asyncio.Event()
        self._stt_task: Optional[asyncio.Task[None]] = None
        self._ws_reader_task: Optional[asyncio.Task[None]] = None
        self._playback_task: Optional[asyncio.Task[None]] = None
        self._tts_consumer_task: Optional[asyncio.Task[None]] = None
        self._control_consumer_task: Optional[asyncio.Task[None]] = None

        # STT reconnection state
        self._stt_reconnect_count = 0
        self._stt_max_reconnect_attempts = 20
        self._stt_reconnecting = False

        # Audio resampling state: 8k -> 16k for STT frames.
        self._resample_state = None
        self._pcm16_16k_buffer = bytearray()

        self._frame_16k_samples = 320  # 20ms @ 16k
        self._frame_16k_bytes = self._frame_16k_samples * 2

        # Audio resampling state: 16k -> 8k for TTS playback to MCube (no longer needed - TTS generates 8kHz directly)
        # self._tts_resample_state = None

        self._sequence_lock = asyncio.Lock()

        # Noise cancellation (Simple noise gate for manual frame processing)
        # Phase 6.1: Implemented simple noise gate instead of LiveKit BVC
        # LiveKit BVC requires AudioStream integration, not manual frame processing
        # Noise gate filters out low-volume background noise (threshold-based)
        # Phase 8.4: Reduced threshold from 500 to 300 for less aggressive clipping
        # Phase 8.5: Further reduced to 100 to allow user speech through
        self.noise_cancellation_enabled = True
        self.noise_gate_threshold = 100  # Threshold for noise gate (RMS value)
        self.noise_gate_enabled = True

        # Background audio (not applicable for MCube telephony - only for LiveKit rooms)
        # MCube uses direct WebSocket telephony, not LiveKit room-based architecture
        self.background_audio_enabled = False

    async def start(self):
        # Start STT consumer and playback.
        self._stt_task = asyncio.create_task(self._consume_stt_events(), name="mcube.stt_events")
        self._playback_task = asyncio.create_task(self._playback_loop(), name="mcube.playback_loop")

        # Start MQ consumer for this call's TTS queue.
        self._tts_consumer_task = asyncio.create_task(self._consume_tts_queue(), name="mcube.tts_consumer")

        # Start MQ consumer for this call's control queue (terminate/transfer).
        self._control_consumer_task = asyncio.create_task(
            self._consume_control_queue(), name="mcube.control_consumer"
        )

        # Read websocket events.
        await self._ws_reader_loop()

    async def _ws_reader_loop(self):
        try:
            async for raw in self.ws:
                try:
                    msg = json.loads(raw)
                except Exception:
                    log.warning("ws non-json frame ignored call_id=%s", self.call_id)
                    continue

                try:
                    match msg.get("event"):
                        case "start":
                            await self._on_start(msg)
                        case "media":
                            await self._on_media(msg)
                        case "playedStream":
                            await self._on_played_stream(msg)
                        case "stop":
                            break
                        case _:
                            # ignore unknown events
                            pass
                except Exception:
                    # Never crash the session because of one malformed frame.
                    log.exception("ws event handling failed call_id=%s event=%s", self.call_id, msg.get("event"))
        finally:
            await self._cleanup()
            # Cancel tasks gracefully.

    async def _cleanup(self):
        try:
            if self.stream_id:
                await self.ws.send(json.dumps({"event": "terminate", "streamId": self.stream_id}))
        except Exception:
            pass

        for pp in list(self.pending_plays.values()):
            if not pp.future.done():
                pp.future.cancel()
        self.pending_plays.clear()

        # Stop STT input.
        try:
            if self.stt_stream is not None:
                self.stt_stream.end_input()
        except Exception:
            pass

        # Close STT HTTP session (required for LiveKit plugins like Cartesia).
        try:
            if getattr(self, "_http_session", None) is not None:
                await self._http_session.close()
        except Exception:
            pass

        # Cancel background tasks.
        for t in [self._control_consumer_task, self._tts_consumer_task, self._stt_task, self._playback_task]:
            if t and not t.done():
                t.cancel()

    async def _on_start(self, msg: dict):
        self.stream_id = msg["start"]["streamId"]
        encoding = msg["start"]["mediaFormat"].get("encoding")
        sample_rate = int(msg["start"]["mediaFormat"].get("sampleRate", MCUBE_AUDIO_SPEC.sample_rate))

        # MCube sends 8kHz mulaw, we resample to 16kHz for STT and downsample TTS 16kHz->8kHz for playback
        if encoding != MCUBE_AUDIO_SPEC.encoding or sample_rate != MCUBE_AUDIO_SPEC.sample_rate:
            log.info(
                "MCube audio format: call_id=%s encoding=%s sr=%s (will resample to 16kHz for STT)",
                self.call_id,
                encoding,
                sample_rate,
            )

        log.info("mcube start call_id=%s stream_id=%s", self.call_id, self.stream_id)

        await self._load_call_config()
        await self._init_stt()

        # Optional: bot speaks immediately on connect.
        if self.first_message and not self._first_message_sent:
            self._first_message_sent = True
            await self._publish_utterance("", bot_say_text=self.first_message, language=self.stt_language_code)

    async def _load_call_config(self) -> None:
        """
        Load per-call config from Redis (written by the /api/mcube/outbound-call endpoint).
        If not present, fall back to env defaults.
        """
        try:
            raw = await self.redis_client.get(f"mcube_call_config:{self.call_id}")
            if not raw:
                return
            if isinstance(raw, (bytes, bytearray)):
                raw_str = raw.decode("utf-8", errors="replace")
            else:
                raw_str = str(raw)
            cfg = json.loads(raw_str)
            apply_voice_defaults_to_dict(cfg)
        except Exception:
            log.exception("failed to load mcube call config call_id=%s", self.call_id)
            return

        self.system_prompt = cfg.get("system_prompt", self.system_prompt)
        self.first_message = (cfg.get("first_message") or self.first_message or "").strip()
        self.llm_model = cfg.get("llm_model", self.llm_model)
        self.llm_provider = cfg.get("llm_provider", self.llm_provider) or ""

        self.stt_provider = cfg.get("stt_provider", self.stt_provider) or self.stt_provider
        self.stt_language_code = cfg.get("stt_language_code", self.stt_language_code) or self.stt_language_code
        self.stt_model_id = cfg.get("stt_model_id", self.stt_model_id) or self.stt_model_id

        self.tts_provider = cfg.get("tts_provider", self.tts_provider) or self.tts_provider
        self.tts_model = cfg.get("tts_model", self.tts_model) or self.tts_model
        self.tts_voice_id = cfg.get("tts_voice_id", self.tts_voice_id) or self.tts_voice_id
        self.tts_encoding = cfg.get("tts_encoding", self.tts_encoding) or self.tts_encoding

        for ck, attr, caster in (
            ("tts_chunk_ms", "tts_chunk_ms", lambda x: max(5, min(500, int(float(x))))),
            ("tts_gain", "tts_gain", lambda x: float(x)),
            ("playback_pace_factor", "playback_pace_factor", lambda x: float(x)),
            ("checkpoint_every", "checkpoint_every", lambda x: max(0, int(float(x)))),
        ): 
            if cfg.get(ck) is not None:
                try:
                    setattr(self, attr, caster(cfg.get(ck)))
                except Exception:
                    pass

        id_map = (
            ("business_id", "business_id"),
            ("bot_id", "bot_id"),
            ("agent_id", "agent_id"),
            ("user_id", "user_id"),
            ("agent_email", "agent_email"),
            ("email", "agent_email"),
            ("agent_name", "agent_name"),
        )
        for json_key, attr in id_map:
            if cfg.get(json_key) is not None:
                setattr(self, attr, cfg.get(json_key))

    async def _init_stt(self) -> None:
        """
        Initialize STT stream for the configured provider.
        """
        # Reset audio buffers + resampler so early/old frames don't leak into the new stream.
        self._resample_state = None
        self._pcm16_16k_buffer = bytearray()
        self._stt_ready_event.clear()

        elevenlabs_key = os.getenv("ELEVENLABS_API_KEY") or os.getenv("ELEVEN_API_KEY") or ""
        cartesia_key = os.getenv("CARTESIA_API_KEY") or ""
        deepgram_key = os.getenv("DEEPGRAM_API_KEY") or ""

        # Create an explicit http session for STT plugins.
        # LiveKit plugins require this when used outside the standard agent worker job context.
        self._http_session = aiohttp.ClientSession()

        log.info(
            "mcube stt init provider=%s model_id=%s lang=%s noise_cancellation=%s",
            self.stt_provider,
            self.stt_model_id,
            self.stt_language_code,
            self.noise_cancellation_enabled,
        )

        # Phase 6.1: Simple noise gate - no initialization needed

        if self.stt_provider == "deepgram":
            if not deepgram_key:
                log.warning("DEEPGRAM_API_KEY not set; falling back to elevenlabs STT")
            else:
                self.stt = deepgram.STT(
                    model=self.stt_model_id or "nova-2",
                    language=self.stt_language_code or "en",
                    api_key=deepgram_key,
                    http_session=self._http_session,
                )
                self.stt_stream = self.stt.stream()
                self._stt_ready_event.set()
                return

        if self.stt_provider == "cartesia":
            if not cartesia_key:
                log.warning("CARTESIA_API_KEY not set; falling back to elevenlabs STT")
            else:
                self.stt = cartesia.STT(
                    model=self.stt_model_id or "ink-whisper",
                    language=self.stt_language_code or "en",
                    encoding="pcm_s16le",
                    sample_rate=16000,
                    api_key=cartesia_key,
                    http_session=self._http_session,
                )
                self.stt_stream = self.stt.stream()
                self._stt_ready_event.set()
                return

        # Default: ElevenLabs STT
        # In this LiveKit version, elevenlabs.STT may be non-streaming.
        # We wrap it with StreamAdapter so our WS bridge can still drive it
        # using the same streaming audio-frame pipeline.
        # Configure for 200 concurrent calls with extended timeouts
        base_stt = elevenlabs.STT(
            model_id="scribe_v2_realtime",
            language_code=self.stt_language_code,
            api_key=elevenlabs_key,
            http_session=self._http_session,
        )

        try:
            self.stt = base_stt
            self.stt_stream = self.stt.stream()
        except NotImplementedError:
            # ElevenLabs STT is non-streaming in our installed LiveKit version.
            # Use LiveKit's StreamAdapter + a real VAD implementation so we can
            # drive the rest of the pipeline as if STT was streaming.
            vad = SileroVAD.load(sample_rate=16000, force_cpu=True)
            self.stt = StreamAdapter(stt=base_stt, vad=vad)
            self.stt_stream = self.stt.stream()
        self._stt_ready_event.set()

    async def _reinit_stt_with_language(self, language: str) -> None:
        """
        Reinitialize STT with a new language after detection.
        """
        log.info("mcube reinit_stt call_id=%s new_language=%s", self.call_id, language)
        
        # Close existing STT
        try:
            if self.stt_stream:
                self.stt_stream.end_input()
        except Exception:
            pass
        
        # Close HTTP session
        try:
            if getattr(self, "_http_session", None) is not None:
                await self._http_session.close()
        except Exception:
            pass

        # Reset state
        self._resample_state = None
        self._pcm16_16k_buffer = bytearray()
        self._stt_ready_event.clear()
        self.stt_language_code = language

        # Reinitialize with new language
        await self._init_stt()

    async def _on_media(self, msg: dict):
        if not self.stream_id:
            return
        if not self._stt_ready_event.is_set():
            # Ignore early frames until STT is initialized with per-call config.
            return

        media = msg.get("media") or {}
        if media.get("track") != "inbound":
            return

        b64 = media.get("payload") or ""
        try:
            mulaw_bytes = from_base64_str(b64)
            pcm16_8k = audioop.ulaw2lin(mulaw_bytes, 2)
        except Exception:
            log.warning("invalid media payload ignored call_id=%s bytes=%s", self.call_id, len(b64))
            return

        # For now, we buffer/resample 8k -> 16k and feed fixed-size 20ms frames to STT.
        pcm16_16k, self._resample_state = audioop.ratecv(
            pcm16_8k,
            2,
            1,
            8000,
            16000,
            self._resample_state,
        )

        # Phase 6.1: Apply simple noise gate (filters low-volume background noise)
        if self.noise_gate_enabled:
            # Calculate RMS (root mean square) to detect noise level
            samples = array.array('h', pcm16_16k)
            if samples:
                rms = sum(abs(s) for s in samples) / len(samples)
                # If below threshold, suppress the audio (set to zeros)
                if rms < self.noise_gate_threshold:
                    pcm16_16k = b'\x00' * len(pcm16_16k)

        self._pcm16_16k_buffer.extend(pcm16_16k)

        while len(self._pcm16_16k_buffer) >= self._frame_16k_bytes:
            frame_bytes = bytes(self._pcm16_16k_buffer[: self._frame_16k_bytes])
            del self._pcm16_16k_buffer[: self._frame_16k_bytes]
            frame = rtc.AudioFrame(
                frame_bytes,
                sample_rate=16000,
                num_channels=1,
                samples_per_channel=self._frame_16k_samples,
            )
            assert self.stt_stream is not None
            try:
                self.stt_stream.push_frame(frame)
            except (aiohttp.ClientConnectionResetError, aiohttp.ClientConnectionError, ConnectionResetError, RuntimeError) as e:
                log.warning("stt push failed call_id=%s error=%s; reinitializing STT", self.call_id, e)
                await self._reinit_stt_with_language(self.detected_language or self.stt_language_code or "en")
                return

    async def _on_played_stream(self, msg: dict):
        name = msg.get("name")
        if not name:
            return

        pp = self.pending_plays.get(name)
        if pp and not pp.future.done():
            pp.future.set_result(None)
            # Clear it now that it's acknowledged.
            self.pending_plays.pop(name, None)

    async def _consume_stt_events(self):
        while self._stt_reconnect_count < self._stt_max_reconnect_attempts:
            try:
                await self._stt_ready_event.wait()
                assert self.stt_stream is not None

                async for ev in self.stt_stream:
                    if ev.type == SpeechEventType.INTERIM_TRANSCRIPT:
                        if self.bot_playing:
                            # Phase 3.4: Enhanced barge-in logic to distinguish true interruptions from backchanneling
                            alt = ev.alternatives[0] if ev.alternatives else None
                            partial = (alt.text if alt else "").strip().lower()
                            
                            # Filter out backchanneling (uh-huh, okay, right, mm-hmm, etc.)
                            backchannel_words = {"uh-huh", "uhhuh", "okay", "ok", "right", "yes", "yeah", "mm-hmm", "mhm", "mmhmm"}
                            is_backchannel = any(word in partial for word in backchannel_words)
                            playback_age = (
                                time.time() - self._bot_playing_started_at
                                if self._bot_playing_started_at is not None
                                else 999.0
                            )
                            word_count = len(partial.split())
                            is_substantial_interrupt = len(partial) >= _BARGE_IN_MIN_INTERIM_CHARS or word_count >= 3
                            
                            # Only trigger barge-in if:
                            # 1. Not a backchannel acknowledgment
                            # 2. Text is substantial enough to avoid echo/noise false positives
                            # 3. Playback has been audible long enough to distinguish real interruption
                            if partial and not is_backchannel and is_substantial_interrupt and playback_age >= 0.8:
                                log.info("barge-in triggered call_id=%s partial=%r", self.call_id, partial)
                                await self._barge_in_clear()
                        continue

                    if ev.type == SpeechEventType.FINAL_TRANSCRIPT:
                        alt = ev.alternatives[0] if ev.alternatives else None
                        text = (alt.text if alt else "").strip() if alt else ""
                        if text:
                            # The actual utterance publish happens from websocket reader loop
                            # via a shared hook. We'll store it and let the websocket loop schedule.
                            await self._handle_final_transcript(text)

            except (aiohttp.ClientConnectionResetError, aiohttp.ClientConnectionError, ConnectionResetError) as e:
                self._stt_reconnect_count += 1
                if self._stt_reconnect_count >= self._stt_max_reconnect_attempts:
                    log.error("STT reconnection failed after %d attempts call_id=%s error=%s", 
                              self._stt_reconnect_count, self.call_id, str(e))
                    break
                
                # Exponential backoff: 1s, 2s, 4s, 8s, 16s
                backoff_delay = min(2 ** (self._stt_reconnect_count - 1), 16)
                log.warning("STT connection reset attempt=%d/%d call_id=%s reconnecting in %ds error=%s",
                           self._stt_reconnect_count, self._stt_max_reconnect_attempts, 
                           self.call_id, backoff_delay, str(e))
                
                await asyncio.sleep(backoff_delay)
                
                # Reinitialize STT
                try:
                    if self._http_session and not self._http_session.closed:
                        await self._http_session.close()
                    self._http_session = aiohttp.ClientSession()
                    await self._init_stt()
                    log.info("STT reconnection successful call_id=%s attempt=%d", 
                            self.call_id, self._stt_reconnect_count)
                except Exception as init_error:
                    log.error("STT reinitialization failed call_id=%s error=%s", 
                             self.call_id, str(init_error))
                    
            except Exception as e:
                log.error("STT event processing error call_id=%s error=%s", self.call_id, str(e))
                break

    async def _handle_final_transcript(self, text: str):
        # If the caller speaks over the bot, stop playback and process the final
        # transcript as the caller's interruption/doubt instead of dropping it.
        if self.bot_playing:
            log.info("barge-in final call_id=%s text=%r", self.call_id, text[:80])
            # Add small delay to ensure user is actually trying to interrupt
            await asyncio.sleep(0.3)
            await self._barge_in_clear()
        elif self.response_pending:
            # While the prior LLM/TTS turn is still in flight, we normally drop transcripts
            # to avoid piling up — but language-choice replies (e.g. "Hindi") must never be lost.
            pref = detect_language_preference(text)
            has_indic_script = any(
                ("\u0900" <= c <= "\u097F") or ("\u0C80" <= c <= "\u0CFF") for c in (text or "")
            )
            if pref is None and not has_indic_script:
                return
            log.info(
                "mcube language_reply_while_pending call_id=%s pref=%s script=%s text=%r",
                self.call_id,
                pref,
                has_indic_script,
                text[:80],
            )
            self._force_stop_playback()

        if text:
            self.detected_language = resolve_detected_language(self.stt_language_code or "en", text)
            if str(self.stt_language_code or "en") != str(self.detected_language):
                log.info(
                    "mcube language_detect_override call_id=%s stt_default=%s resolved=%s text=%r",
                    self.call_id,
                    self.stt_language_code or "en",
                    self.detected_language,
                    text[:80],
                )
            log.info(
                "mcube language_resolved call_id=%s language=%s text=%r",
                self.call_id,
                self.detected_language,
                text[:50],
            )
            
        # Reinitialize STT only for languages the configured STT path can reasonably handle.
        if self.detected_language in ("en", "hi") and self.detected_language != self.stt_language_code:
            await self._reinit_stt_with_language(self.detected_language)

        # Allocate per-utterance sequence id.
        # (We still store in redis in case we restart; for now it's best-effort.)
        await self._publish_utterance(text, language=self.detected_language or self.stt_language_code)

    async def _publish_utterance(self, text: str, *, bot_say_text: str | None = None, language: str = "en"):
        # Sequence id: try to atomically allocate from redis. If redis is down, fall back to time-based.
        async with self._sequence_lock:
            try:
                seq = await self.redis_client.incr(f"mcube:seq:{self.call_id}")
                await self.redis_client.expire(f"mcube:seq:{self.call_id}", 3600)
            except Exception:
                seq = int(time.time())

        self.active_sequence_id = int(seq)
        # Bot should only be considered "speaking" after we start sending TTS segments.
        self.bot_playing = False
        self._bot_playing_started_at = None
        self.response_pending = True
        self._tts_done_received = False
        self._tts_final_chunk_seq = None
        self._last_tts_chunk_seq = None

        # Publish utterance to AI worker.
        payload = {
            "call_id": self.call_id,
            "sequence_id": int(seq),
            "transcript": text,
            "system_prompt": self.system_prompt,
            "llm_model": self.llm_model,
            "llm_provider": self.llm_provider,
            "tts_provider": self.tts_provider,
            "tts_model": self.tts_model,
            "tts_voice_id": self.tts_voice_id,
            "tts_encoding": self.tts_encoding,
            "tts_chunk_ms": self.tts_chunk_ms,
            "stt_language_code": language,
            "stt_provider": self.stt_provider,
            "stt_model_id": self.stt_model_id,
        }
        if bot_say_text:
            payload["bot_say_text"] = bot_say_text

        for k in ("business_id", "bot_id", "agent_id", "user_id", "agent_email", "agent_name"):
            v = getattr(self, k, None)
            if v is not None:
                payload[k] = v

        await publish_json(
            channel=self.rabbit_channel,
            queue_name=AI_UTTERANCES_QUEUE,
            payload=payload,
        )
        log.info(
            "published utterance call_id=%s seq=%s text=%r tts_provider=%s tts_model=%s tts_voice_id=%s tts_encoding=%s language=%s",
            self.call_id,
            seq,
            text,
            self.tts_provider,
            self.tts_model,
            self.tts_voice_id,
            self.tts_encoding,
            language,
        )

    async def _consume_tts_queue(self):
        """
        Consume the per-call TTS queue (`tts.{call_id}`) and feed the playback queue.

        Two message kinds are handled, both only for the currently active sequence:

        * `tts_audio_chunk`: the decoded audio is put on `self.tts_segments` as
          `(sequence_id, chunk_seq, pcm_bytes)`.
        * `tts_done` / `tts_end`: records the final chunk number (falling back to the
          last chunk received when `final_chunk_seq` is absent or unusable) and lets
          `_maybe_finish_playback` decide whether playback is complete.

        Malformed messages (non-JSON, non-object JSON, or missing/invalid fields) are
        dropped so that a single bad message cannot end the consumer for the call.
        """
        # Nothing can be played before the MCube `start` event has set `stream_id`.
        while not self.stream_id:
            await asyncio.sleep(0.01)

        queue = await self.rabbit_channel.declare_queue(tts_queue_name(self.call_id), durable=True)

        async with queue.iterator() as q_iter:
            async for message in q_iter:
                async with message.process(requeue=False):
                    try:
                        payload = json.loads(message.body.decode("utf-8"))
                    except Exception:
                        continue
                    # Valid JSON that is not an object (list, string, number) has no fields to read.
                    if not isinstance(payload, dict):
                        continue

                    try:
                        seq = int(payload.get("sequence_id", -1))
                    except (TypeError, ValueError):
                        log.warning(
                            "tts message with invalid sequence_id dropped call_id=%s sequence_id=%r",
                            self.call_id,
                            payload.get("sequence_id"),
                        )
                        continue

                    if self.cancelled_sequence_id is not None and seq == self.cancelled_sequence_id:
                        continue

                    msg_type = payload.get("type")
                    if msg_type not in ("tts_audio_chunk", "tts_done", "tts_end"):
                        continue
                    if self.active_sequence_id is None or seq != self.active_sequence_id:
                        continue

                    if msg_type == "tts_audio_chunk":
                        # Decode everything before touching session state so a bad chunk
                        # leaves the chunk bookkeeping untouched.
                        try:
                            chunk_seq = int(payload["chunk_seq"])
                            pcm16_8k_bytes = from_base64_str(payload["audio_pcm_b64"])
                        except (KeyError, TypeError, ValueError):
                            log.warning(
                                "tts_audio_chunk with missing/invalid fields dropped call_id=%s seq=%s",
                                self.call_id,
                                seq,
                            )
                            continue

                        self._last_tts_chunk_seq = chunk_seq
                        if chunk_seq == 0:
                            log.info(
                                "tts_audio_chunk_first call_id=%s seq=%s bytes=%s",
                                self.call_id,
                                seq,
                                len(pcm16_8k_bytes),
                            )
                        await self.tts_segments.put((seq, chunk_seq, pcm16_8k_bytes))
                        continue

                    # tts_done / tts_end
                    final_chunk_seq = payload.get("final_chunk_seq")
                    resolved_final = self._last_tts_chunk_seq
                    if final_chunk_seq is not None:
                        try:
                            resolved_final = int(final_chunk_seq)
                        except (TypeError, ValueError):
                            # Still mark the sequence as done; use the last received chunk
                            # so playback can finish instead of staying pending.
                            log.warning(
                                "tts done with invalid final_chunk_seq call_id=%s seq=%s final_chunk_seq=%r",
                                self.call_id,
                                seq,
                                final_chunk_seq,
                            )
                    self._tts_done_received = True
                    self._tts_final_chunk_seq = resolved_final
                    self._maybe_finish_playback()

    def _extract_transfer_to(self, payload: dict) -> Optional[str]:
        """
        Extract transfer target from tool/control payload.

        We accept several keys so different producers (ai_worker variants) stay compatible.
        """
        transfer_to = (
            payload.get("transfer_to")
            or payload.get("phone_number")
            or payload.get("transfer_number")
            or payload.get("sip_uri")
        )
        if not transfer_to or not isinstance(transfer_to, str):
            return None

        transfer_to = transfer_to.strip()
        if not transfer_to:
            return None

        # If passed as `sip:+918...@host` convert to E.164-like number.
        if transfer_to.lower().startswith("sip:"):
            try:
                transfer_to = transfer_to.split(":", 1)[1].split("@", 1)[0]
            except Exception:
                return None

        return transfer_to

    def _force_stop_playback(self, *, cancelled_seq: Optional[int] = None) -> None:
        """
        Immediately stop any bot playback and cancel pending playedStream futures.
        This is used for MCube `terminate/transfer` and should be more aggressive
        than barge-in clear.
        """
        if cancelled_seq is None:
            cancelled_seq = self.active_sequence_id

        self.cancelled_sequence_id = cancelled_seq
        self.bot_playing = False
        self._bot_playing_started_at = None
        self.response_pending = False
        self.active_sequence_id = None

        self._tts_done_received = False
        self._tts_final_chunk_seq = None
        self._last_tts_chunk_seq = None

        # Cancel any pending playedStream waits.
        for _, pp in list(self.pending_plays.items()):
            if not pp.future.done():
                pp.future.cancel()
        self.pending_plays.clear()

        # Drain any queued segments so playback loop doesn't wait on them.
        try:
            while True:
                self.tts_segments.get_nowait()
        except asyncio.QueueEmpty:
            pass

    async def _consume_control_queue(self) -> None:
        """
        Consume the per-call control queue and execute MCube call actions (terminate/transfer).

        Each message body is parsed as JSON and passed to `_handle_control_message`.
        Bodies that are not JSON objects are dropped. An error raised while handling
        one message is logged and the loop moves on, so a single bad message or a
        failed websocket send does not end the consumer for the rest of the call.
        """
        queue = await self.rabbit_channel.declare_queue(control_queue_name(self.call_id), durable=True)

        async with queue.iterator() as q_iter:
            async for message in q_iter:
                async with message.process(requeue=False):
                    try:
                        payload = json.loads(message.body.decode("utf-8"))
                    except Exception:
                        continue
                    # The handler reads fields with `.get`, so only JSON objects are usable.
                    if not isinstance(payload, dict):
                        continue

                    try:
                        await self._handle_control_message(payload)
                    except Exception:
                        # Task cancellation is not an `Exception` and still propagates.
                        log.exception(
                            "control message handling failed call_id=%s type=%s",
                            self.call_id,
                            payload.get("type"),
                        )

    async def _handle_control_message(self, payload: dict) -> None:
        msg_type = payload.get("type")
        seq = payload.get("sequence_id")
        cancelled_seq: Optional[int] = int(seq) if isinstance(seq, (int, str)) and str(seq).isdigit() else None

        # Only handle tool actions that apply to the currently active speaking sequence.
        if (
            cancelled_seq is not None
            and self.active_sequence_id is not None
            and cancelled_seq != self.active_sequence_id
        ):
            # Stale control message; ignore.
            return

        if msg_type == "mcube_terminate":
            if self.stream_id:
                await self.ws.send(json.dumps({"event": "terminate", "streamId": self.stream_id}))
            self._force_stop_playback(cancelled_seq=cancelled_seq)
            return

        if msg_type == "mcube_transfer":
            transfer_to = self._extract_transfer_to(payload)
            if transfer_to and self.stream_id:
                await self.ws.send(
                    json.dumps(
                        {
                            "event": "transfer",
                            "streamId": self.stream_id,
                            "transferTo": transfer_to,
                            "showOriginalCallerId": True,
                        }
                    )
                )
            self._force_stop_playback(cancelled_seq=cancelled_seq)
            return

        # Unknown control message types are ignored.

    async def _playback_loop(self):
        """
        Real-time paced playback of queued TTS segments over the MCube websocket.

        Runs forever: takes `(sequence_id, chunk_seq, pcm_bytes)` items from
        `self.tts_segments`, drops those that belong to a cancelled or non-active
        sequence (or arrive before a stream id is known), and for the rest:

        1. scales the PCM16 samples by `self.tts_gain`,
        2. converts them with `pcm16_to_mulaw` and sends a `playAudio` event,
        3. optionally sends a `checkpoint` event,
        4. sleeps for roughly the chunk's duration, then lets `_maybe_finish_playback`
           decide whether the utterance is complete.

        We intentionally DO NOT block on `playedStream` acks: waiting on per-chunk acks
        can create periodic gaps if MCube doesn't ack every segment. Instead we pace by
        chunk duration.
        """
        while True:
            seq, chunk_seq, pcm16_8k_bytes = await self.tts_segments.get()

            # If we were cancelled mid-playback, drop remaining chunks.
            if self.cancelled_sequence_id is not None and seq == self.cancelled_sequence_id:
                continue
            if self.active_sequence_id is None or seq != self.active_sequence_id:
                continue
            if not self.stream_id:
                continue

            name = f"{self.call_id}_seg_{chunk_seq}"
            if not self.bot_playing:
                self._bot_playing_started_at = time.time()
            self.bot_playing = True

            # Empirically, some synthesized audio can clip and sound like pops/gunshots after
            # μ-law companding. Apply a small gain reduction before converting to μ-law.
            # Best effort: if the gain cannot be applied the chunk is sent unscaled.
            try:
                gain = float(self.tts_gain)
                pcm16_8k_bytes = audioop.mul(pcm16_8k_bytes, 2, gain)
            except Exception:
                log.debug(
                    "tts gain not applied call_id=%s seq=%s chunk=%s",
                    self.call_id,
                    seq,
                    chunk_seq,
                    exc_info=True,
                )

            # Send playAudio as per MCube docs. The chunk is treated as 8 kHz PCM16 as
            # received; no resampling is done here.
            mulaw = pcm16_to_mulaw(pcm16_8k_bytes)
            await self.ws.send(
                json.dumps(
                    {
                        "event": "playAudio",
                        "media": {
                            "contentType": MCUBE_AUDIO_SPEC.encoding,
                            "sampleRate": 8000,  # Send at 8kHz for MCube
                            "payload": base64.b64encode(mulaw).decode("ascii"),
                            "name": name,
                        },
                    }
                )
            )

            # Optional: send checkpoint every N chunks (and always the first) for observability.
            # Read per chunk so a value assigned after this task started is honoured.
            checkpoint_every = self.checkpoint_every
            if checkpoint_every > 0 and (chunk_seq == 0 or (chunk_seq % checkpoint_every == 0)):
                await self.ws.send(
                    json.dumps(
                        {
                            "event": "checkpoint",
                            "streamId": self.stream_id,
                            "name": name,
                        }
                    )
                )

            # Pace by the duration of this chunk (2 bytes per sample at 8000 samples/s),
            # sleeping 98% of it scaled by `playback_pace_factor`.
            try:
                chunk_duration_s = (len(pcm16_8k_bytes) / 2) / 8000
                pace = float(self.playback_pace_factor)
                await asyncio.sleep(max(0.0, chunk_duration_s * 0.98 * pace))
            except Exception:
                await asyncio.sleep(0.1)

            # The sequence may have been cancelled or replaced while we were sending or
            # sleeping; a chunk from an old sequence must not complete the current one.
            if seq == self.active_sequence_id:
                self._maybe_finish_playback(played_chunk_seq=chunk_seq)

    def _maybe_finish_playback(self, *, played_chunk_seq: Optional[int] = None) -> None:
        if not self._tts_done_received or self._tts_final_chunk_seq is None:
            return

        last_played = played_chunk_seq if played_chunk_seq is not None else self._last_tts_chunk_seq
        if last_played is None or last_played < self._tts_final_chunk_seq or not self.tts_segments.empty():
            return

        self.bot_playing = False
        self._bot_playing_started_at = None
        self.response_pending = False
        self.cancelled_sequence_id = None
        self.active_sequence_id = None
        self._tts_done_received = False
        self._tts_final_chunk_seq = None
        self._last_tts_chunk_seq = None

    async def _barge_in_clear(self):
        if not self.stream_id:
            return
        if not self.bot_playing:
            return

        log.info("barge-in clear call_id=%s seq=%s", self.call_id, self.active_sequence_id)
        self.cancelled_sequence_id = self.active_sequence_id
        self.bot_playing = False
        self._bot_playing_started_at = None
        self.response_pending = False
        self.active_sequence_id = None

        # Cancel any pending playedStream waits.
        for name, pp in list(self.pending_plays.items()):
            if not pp.future.done():
                pp.future.cancel()
            self.pending_plays.pop(name, None)

        # Clear current playback on MCube.
        try:
            await self.ws.send(json.dumps({"event": "clearAudio", "streamId": self.stream_id}))
        except Exception:
            pass

        # Drain any queued segments for the old sequence.
        try:
            while True:
                self.tts_segments.get_nowait()
        except asyncio.QueueEmpty:
            pass


async def _ws_handler(websocket, *, rabbit_channel, redis_client):
    """
    Handle one inbound websocket connection for a single call.

    The call id is the last segment of the request path, e.g. `/bid/websocket/{callId}`
    (or `/ws/{callId}` depending on MCube env). Any query string or fragment on the
    request target is ignored when deriving it. When no call id can be derived the
    connection is closed with code 1008; otherwise a `McubeCallSession` is created
    and run until it returns.

    Args:
        websocket: The accepted websocket connection.
        rabbit_channel: Message-queue channel handed to the session.
        redis_client: Redis client handed to the session.
    """
    # websockets>=12 passes only `websocket` to the handler.
    # The request path is available as `websocket.path`.
    path = getattr(websocket, "path", "") or ""
    # Some websocket versions/proxies may not populate `websocket.path` reliably.
    req = getattr(websocket, "request", None)
    if not path and req is not None:
        path = getattr(req, "path", "") or ""

    # The request target may carry `?query` / `#fragment`; neither is part of the call id.
    route = path.split("?", 1)[0].split("#", 1)[0]
    parts = route.strip("/").split("/") if route else []
    call_id = parts[-1] if parts else ""

    log.info(
        "ws accepted path=%s remote=%s call_hint_from_path=%s",
        path,
        getattr(websocket, "remote_address", None),
        call_id,
    )

    if not call_id:
        await websocket.close(code=1008, reason="missing call_id")
        return

    sess = McubeCallSession(
        call_id=call_id,
        websocket=websocket,
        rabbit_channel=rabbit_channel,
        redis_client=redis_client,
    )
    await sess.start()


async def main():
    """
    Run the MCube websocket bridge until SIGINT or SIGTERM is received.

    Sets up logging, obtains the message-queue connection/channel and the Redis
    client, then serves websocket connections on `WS_HOST:WS_PORT`, handing each one
    to `_ws_handler`. The server stays up until the stop future is resolved by a
    signal handler.
    """
    logging.basicConfig(level=logging.INFO)

    connection = await connect()
    channel = await get_channel(connection)
    redis_client = redis_async.from_url(REDIS_URL, decode_responses=False)

    log.info("mcube ws bridge listening on %s:%s", WS_HOST, WS_PORT)

    # Resolved (once) by the first shutdown signal.
    stop = asyncio.Future()

    def _request_stop(*_):
        if stop.done():
            return
        stop.set_result(True)

    loop = asyncio.get_running_loop()
    for signum in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(signum, _request_stop)
        except NotImplementedError:
            # Event loops without signal-handler support (Windows fallback, not expected here).
            continue

    async def _handle_connection(ws):
        # Bind the shared channel and Redis client for every accepted connection.
        return await _ws_handler(ws, rabbit_channel=channel, redis_client=redis_client)

    server = websockets.serve(
        _handle_connection,
        WS_HOST,
        WS_PORT,
        max_size=2**23,
        ping_interval=20,
        ping_timeout=60,
    )
    async with server:
        await stop


# Script entry point: run the `main()` coroutine when this file is executed directly.
if __name__ == "__main__":
    asyncio.run(main())

# import asyncio
# import array
# import audioop
# import base64
# import json
# import logging
# import os
# import signal
# import sys
# import time
# from dataclasses import dataclass
# from collections import deque
# from typing import Any, Optional
# import types
# import re


# def detect_language(text: str) -> str:
#     """
#     Detect if text is Hindi/Hinglish or English.
#     Returns: 'hi' for Hindi/Hinglish, 'en' for English.
#     """
#     if not text:
#         return "en"
    
#     # Check for Hindi Unicode characters (Devanagari script range)
#     hindi_chars = sum(1 for char in text if '\u0900' <= char <= '\u097F')
#     if hindi_chars > 0:
#         return "hi"
    
#     # Check for common Hindi words written in Latin script (Hinglish)
#     hinglish_words = [
#         'namaste', 'kaise', 'ho', 'aap', 'main', 'hai', 'kya', 'kar', 'raha',
#         'hai', 'nahi', 'please', 'thank', 'you', 'ji', 'bhai', 'yaar', 'dost',
#         'acha', 'theek', 'hai', 'sahi', 'galat', 'batao', 'samajh', 'nahi',
#         'kripya', 'karein', 'dhanyawad', 'shukriya', 'haan', 'na', 'bolo'
#     ]
    
#     text_lower = text.lower()
#     hinglish_count = sum(1 for word in hinglish_words if word in text_lower.split())
    
#     # If at least 1 Hinglish word is detected, treat as Hindi
#     if hinglish_count >= 1:
#         return "hi"
    
#     return "en"


# def _install_livekit_blingfire_stub() -> None:
#     """
#     LiveKit Agents currently imports `livekit.agents.tokenize.blingfire`, which loads a native
#     extension (`lk_blingfire`). On some Windows setups (WDAC / App Control) that extension is blocked
#     and the entire `livekit.agents` import fails.

#     The MCube WS bridge doesn't depend on blingfire, so we provide a minimal stub module to keep
#     `livekit.agents` importable.
#     """

#     module_name = "livekit.agents.tokenize.blingfire"
#     if module_name in sys.modules:
#         return

#     class SentenceTokenizer:  # minimal compatibility surface
#         def __init__(
#             self,
#             *,
#             min_sentence_len: int = 20,
#             stream_context_len: int = 10,
#             retain_format: bool = False,
#         ) -> None:
#             self._min_sentence_len = int(min_sentence_len)
#             self._retain_format = bool(retain_format)

#         def tokenize(self, text: str, *, language: str | None = None) -> list[str]:
#             # Very naive fallback: split on sentence-ending punctuation.
#             parts = [p.strip() for p in re.split(r"(?<=[.!?])\s+", text or "") if p.strip()]
#             return [p for p in parts if len(p) >= self._min_sentence_len]

#         def stream(self, *, language: str | None = None):
#             raise NotImplementedError("Streaming tokenizer not supported in fallback stub")

#     stub = types.ModuleType(module_name)
#     stub.__dict__.update({"__all__": ["SentenceTokenizer"], "SentenceTokenizer": SentenceTokenizer})
#     sys.modules[module_name] = stub


# _install_livekit_blingfire_stub()

# import aiohttp
# import redis.asyncio as redis_async
# import websockets
# from livekit.agents.stt.stt import SpeechEventType
# from livekit.plugins import cartesia, deepgram, elevenlabs, noise_cancellation
# from livekit import rtc
# from livekit.agents.stt.stream_adapter import StreamAdapter
# from livekit.plugins.silero.vad import VAD as SileroVAD
# from .env_load import load_agent_runtime_dotenv
# from .mcube_defaults import get_default_mcube_call_config
# from .mcube_codec import MCUBE_AUDIO_SPEC, from_base64_str, pcm16_to_mulaw
# from .mq import (
#     AI_UTTERANCES_QUEUE,
#     RABBITMQ_URL,
#     control_queue_name,
#     declare_durable_queue,
#     connect,
#     get_channel,
#     publish_json,
#     tts_queue_name,
# )

# log = logging.getLogger("mcube.ws_bridge")

# load_agent_runtime_dotenv()

# _MCUBE_DEF = get_default_mcube_call_config()

# WS_PORT = int(os.getenv("MCUBE_WS_BRIDGE_PORT", "9001"))
# WS_HOST = os.getenv("MCUBE_WS_BRIDGE_HOST", "0.0.0.0")

# REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")

# SYSTEM_PROMPT = (_MCUBE_DEF.get("system_prompt") or "").replace("\\n", "\n")
# LLM_MODEL = _MCUBE_DEF["llm_model"]
# LLM_PROVIDER = _MCUBE_DEF.get("llm_provider") or ""
# TTS_MODEL = _MCUBE_DEF["tts_model"]
# TTS_PROVIDER = _MCUBE_DEF.get("tts_provider") or "elevenlabs"
# TTS_VOICE_ID = _MCUBE_DEF["tts_voice_id"]
# TTS_ENCODING = _MCUBE_DEF["tts_encoding"]
# STT_LANGUAGE_CODE = _MCUBE_DEF["stt_language_code"]
# STT_MODEL_ID = _MCUBE_DEF["stt_model_id"]
# STT_PROVIDER = _MCUBE_DEF["stt_provider"]


# @dataclass
# class PendingPlay:
#     name: str
#     future: asyncio.Future[None]


# class McubeCallSession:
#     def __init__(
#         self,
#         *,
#         call_id: str,
#         websocket: websockets.WebSocketServerProtocol,
#         rabbit_channel,
#         redis_client: redis_async.Redis,
#     ):
#         self.call_id = call_id
#         self.ws = websocket
#         self.rabbit_channel = rabbit_channel
#         self.redis_client = redis_client
#         self.stream_id: Optional[str] = None

#         # Playback state (single active utterance at a time).
#         self.active_sequence_id: Optional[int] = None
#         self.cancelled_sequence_id: Optional[int] = None
#         self.bot_playing = False
#         self.response_pending = False  # LLM/TTS running but playback not started yet

#         # Used to wait for playedStream(name).
#         self.pending_plays: dict[str, PendingPlay] = {}

#         # Internal queue of segments to play.
#         self.tts_segments: asyncio.Queue[tuple[int, int, bytes]] = asyncio.Queue()
#         self._tts_done_received: bool = False
#         self._tts_final_chunk_seq: Optional[int] = None

#         # Per-call config (defaults can be overridden on MCube call start).
#         self.system_prompt = SYSTEM_PROMPT
#         self.llm_model = LLM_MODEL
#         self.llm_provider = LLM_PROVIDER
#         self.first_message: str = (_MCUBE_DEF.get("first_message") or "").strip()
#         self._first_message_sent: bool = False

#         self.stt_provider = STT_PROVIDER
#         self.stt_language_code = STT_LANGUAGE_CODE
#         self.stt_model_id = STT_MODEL_ID
#         self.detected_language: Optional[str] = None  # Track detected language per call

#         self.tts_provider = TTS_PROVIDER
#         self.tts_model = TTS_MODEL
#         self.tts_voice_id = TTS_VOICE_ID
#         self.tts_encoding = TTS_ENCODING

#         try:
#             self.tts_chunk_ms = max(5, min(500, int(float(_MCUBE_DEF.get("tts_chunk_ms", "200")))))
#         except Exception:
#             self.tts_chunk_ms = 200
#         try:
#             self.tts_gain = float(_MCUBE_DEF.get("tts_gain", "0.35"))
#         except Exception:
#             self.tts_gain = 0.35
#         try:
#             self.playback_pace_factor = float(_MCUBE_DEF.get("playback_pace_factor", "1.0"))
#         except Exception:
#             self.playback_pace_factor = 1.0
#         try:
#             self.checkpoint_every = max(0, int(float(_MCUBE_DEF.get("checkpoint_every", "10"))))
#         except Exception:
#             self.checkpoint_every = 10

#         # Passed through to ai_worker for `business_id_agents` lookups (cluster DB).
#         self.business_id: Optional[Any] = None
#         self.bot_id: Optional[Any] = None
#         self.agent_id: Optional[Any] = None
#         self.user_id: Optional[Any] = None
#         self.agent_email: Optional[str] = None
#         self.agent_name: Optional[str] = None

#         # STT (initialized after we receive MCube 'start' so we can load per-call config)
#         self.stt: object | None = None
#         self.stt_stream = None
#         self._stt_ready_event = asyncio.Event()
#         self._stt_task: Optional[asyncio.Task[None]] = None
#         self._ws_reader_task: Optional[asyncio.Task[None]] = None
#         self._playback_task: Optional[asyncio.Task[None]] = None
#         self._tts_consumer_task: Optional[asyncio.Task[None]] = None
#         self._control_consumer_task: Optional[asyncio.Task[None]] = None

#         # STT reconnection state
#         self._stt_reconnect_count = 0
#         self._stt_max_reconnect_attempts = 5
#         self._stt_reconnecting = False

#         # Audio resampling state: 8k -> 16k for STT frames.
#         self._resample_state = None
#         self._pcm16_16k_buffer = bytearray()

#         self._frame_16k_samples = 320  # 20ms @ 16k
#         self._frame_16k_bytes = self._frame_16k_samples * 2

#         self._sequence_lock = asyncio.Lock()

#         # Noise cancellation (Simple noise gate for manual frame processing)
#         # Phase 6.1: Implemented simple noise gate instead of LiveKit BVC
#         # LiveKit BVC requires AudioStream integration, not manual frame processing
#         # Noise gate filters out low-volume background noise (threshold-based)
#         # Phase 8.4: Reduced threshold from 500 to 300 for less aggressive clipping
#         # Phase 8.5: Further reduced to 100 to allow user speech through
#         self.noise_cancellation_enabled = True
#         self.noise_gate_threshold = 100  # Threshold for noise gate (RMS value)
#         self.noise_gate_enabled = True

#         # Background audio (not applicable for MCube telephony - only for LiveKit rooms)
#         # MCube uses direct WebSocket telephony, not LiveKit room-based architecture
#         self.background_audio_enabled = False

#     async def start(self):
#         # Start STT consumer and playback.
#         self._stt_task = asyncio.create_task(self._consume_stt_events(), name="mcube.stt_events")
#         self._playback_task = asyncio.create_task(self._playback_loop(), name="mcube.playback_loop")

#         # Start MQ consumer for this call's TTS queue.
#         self._tts_consumer_task = asyncio.create_task(self._consume_tts_queue(), name="mcube.tts_consumer")

#         # Start MQ consumer for this call's control queue (terminate/transfer).
#         self._control_consumer_task = asyncio.create_task(
#             self._consume_control_queue(), name="mcube.control_consumer"
#         )

#         # Read websocket events.
#         await self._ws_reader_loop()

#     async def _ws_reader_loop(self):
#         try:
#             async for raw in self.ws:
#                 try:
#                     msg = json.loads(raw)
#                 except Exception:
#                     log.warning("ws non-json frame ignored call_id=%s", self.call_id)
#                     continue

#                 try:
#                     match msg.get("event"):
#                         case "start":
#                             await self._on_start(msg)
#                         case "media":
#                             await self._on_media(msg)
#                         case "playedStream":
#                             await self._on_played_stream(msg)
#                         case "stop":
#                             break
#                         case _:
#                             # ignore unknown events
#                             pass
#                 except Exception:
#                     # Never crash the session because of one malformed frame.
#                     log.exception("ws event handling failed call_id=%s event=%s", self.call_id, msg.get("event"))
#         finally:
#             await self._cleanup()
#             # Cancel tasks gracefully.

#     async def _cleanup(self):
#         try:
#             if self.stream_id:
#                 await self.ws.send(json.dumps({"event": "terminate", "streamId": self.stream_id}))
#         except Exception:
#             pass

#         for pp in list(self.pending_plays.values()):
#             if not pp.future.done():
#                 pp.future.cancel()
#         self.pending_plays.clear()

#         # Stop STT input.
#         try:
#             if self.stt_stream is not None:
#                 self.stt_stream.end_input()
#         except Exception:
#             pass

#         # Close STT HTTP session (required for LiveKit plugins like Cartesia).
#         try:
#             if getattr(self, "_http_session", None) is not None:
#                 await self._http_session.close()
#         except Exception:
#             pass

#         # Cancel background tasks.
#         for t in [self._control_consumer_task, self._tts_consumer_task, self._stt_task, self._playback_task]:
#             if t and not t.done():
#                 t.cancel()

#     async def _on_start(self, msg: dict):
#         self.stream_id = msg["start"]["streamId"]
#         encoding = msg["start"]["mediaFormat"].get("encoding")
#         sample_rate = int(msg["start"]["mediaFormat"].get("sampleRate", MCUBE_AUDIO_SPEC.sample_rate))

#         if encoding != MCUBE_AUDIO_SPEC.encoding or sample_rate != MCUBE_AUDIO_SPEC.sample_rate:
#             log.warning(
#                 "MCube audio format mismatch call_id=%s encoding=%s sr=%s (expected mulaw@8kHz)",
#                 self.call_id,
#                 encoding,
#                 sample_rate,
#             )

#         log.info("mcube start call_id=%s stream_id=%s", self.call_id, self.stream_id)

#         await self._load_call_config()
#         await self._init_stt()

#         # Optional: bot speaks immediately on connect.
#         if self.first_message and not self._first_message_sent:
#             self._first_message_sent = True
#             await self._publish_utterance("", bot_say_text=self.first_message, language=self.stt_language_code)

#     async def _load_call_config(self) -> None:
#         """
#         Load per-call config from Redis (written by the /api/mcube/outbound-call endpoint).
#         If not present, fall back to env defaults.
#         """
#         try:
#             raw = await self.redis_client.get(f"mcube_call_config:{self.call_id}")
#             if not raw:
#                 return
#             if isinstance(raw, (bytes, bytearray)):
#                 raw_str = raw.decode("utf-8", errors="replace")
#             else:
#                 raw_str = str(raw)
#             cfg = json.loads(raw_str)
#         except Exception:
#             log.exception("failed to load mcube call config call_id=%s", self.call_id)
#             return

#         self.system_prompt = cfg.get("system_prompt", self.system_prompt)
#         self.first_message = (cfg.get("first_message") or self.first_message or "").strip()
#         self.llm_model = cfg.get("llm_model", self.llm_model)
#         self.llm_provider = cfg.get("llm_provider", self.llm_provider) or ""

#         self.stt_provider = cfg.get("stt_provider", self.stt_provider) or self.stt_provider
#         self.stt_language_code = cfg.get("stt_language_code", self.stt_language_code) or self.stt_language_code
#         self.stt_model_id = cfg.get("stt_model_id", self.stt_model_id) or self.stt_model_id

#         self.tts_provider = cfg.get("tts_provider", self.tts_provider) or self.tts_provider
#         self.tts_model = cfg.get("tts_model", self.tts_model) or self.tts_model
#         self.tts_voice_id = cfg.get("tts_voice_id", self.tts_voice_id) or self.tts_voice_id
#         self.tts_encoding = cfg.get("tts_encoding", self.tts_encoding) or self.tts_encoding

#         for ck, attr, caster in (
#             ("tts_chunk_ms", "tts_chunk_ms", lambda x: max(5, min(500, int(float(x))))),
#             ("tts_gain", "tts_gain", lambda x: float(x)),
#             ("playback_pace_factor", "playback_pace_factor", lambda x: float(x)),
#             ("checkpoint_every", "checkpoint_every", lambda x: max(0, int(float(x)))),
#         ):
#             if cfg.get(ck) is not None:
#                 try:
#                     setattr(self, attr, caster(cfg.get(ck)))
#                 except Exception:
#                     pass

#         id_map = (
#             ("business_id", "business_id"),
#             ("bot_id", "bot_id"),
#             ("agent_id", "agent_id"),
#             ("user_id", "user_id"),
#             ("agent_email", "agent_email"),
#             ("email", "agent_email"),
#             ("agent_name", "agent_name"),
#         )
#         for json_key, attr in id_map:
#             if cfg.get(json_key) is not None:
#                 setattr(self, attr, cfg.get(json_key))

#     async def _init_stt(self) -> None:
#         """
#         Initialize STT stream for the configured provider.
#         """
#         # Reset audio buffers + resampler so early/old frames don't leak into the new stream.
#         self._resample_state = None
#         self._pcm16_16k_buffer = bytearray()
#         self._stt_ready_event.clear()

#         elevenlabs_key = os.getenv("ELEVENLABS_API_KEY") or os.getenv("ELEVEN_API_KEY") or ""
#         cartesia_key = os.getenv("CARTESIA_API_KEY") or ""
#         deepgram_key = os.getenv("DEEPGRAM_API_KEY") or ""

#         # Create an explicit http session for STT plugins.
#         # LiveKit plugins require this when used outside the standard agent worker job context.
#         self._http_session = aiohttp.ClientSession()

#         log.info(
#             "mcube stt init provider=%s model_id=%s lang=%s noise_cancellation=%s",
#             self.stt_provider,
#             self.stt_model_id,
#             self.stt_language_code,
#             self.noise_cancellation_enabled,
#         )

#         # Phase 6.1: Simple noise gate - no initialization needed

#         if self.stt_provider == "deepgram":
#             if not deepgram_key:
#                 log.warning("DEEPGRAM_API_KEY not set; falling back to elevenlabs STT")
#             else:
#                 self.stt = deepgram.STT(
#                     model=self.stt_model_id or "nova-2",
#                     language=self.stt_language_code or "en",
#                     api_key=deepgram_key,
#                     http_session=self._http_session,
#                 )
#                 self.stt_stream = self.stt.stream()
#                 self._stt_ready_event.set()
#                 return

#         if self.stt_provider == "cartesia":
#             if not cartesia_key:
#                 log.warning("CARTESIA_API_KEY not set; falling back to elevenlabs STT")
#             else:
#                 self.stt = cartesia.STT(
#                     model=self.stt_model_id or "ink-whisper",
#                     language=self.stt_language_code or "en",
#                     encoding="pcm_s16le",
#                     sample_rate=16000,
#                     api_key=cartesia_key,
#                     http_session=self._http_session,
#                 )
#                 self.stt_stream = self.stt.stream()
#                 self._stt_ready_event.set()
#                 return

#         # Default: ElevenLabs STT
#         # In this LiveKit version, elevenlabs.STT may be non-streaming.
#         # We wrap it with StreamAdapter so our WS bridge can still drive it
#         # using the same streaming audio-frame pipeline.
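#         # NOTE(review): intended to support ~200 concurrent calls with extended timeouts, but no
#         # concurrency or timeout options are passed to the constructor below - confirm where they are set.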
#         base_stt = elevenlabs.STT(
#             model_id="scribe_v2_realtime",
#             language_code=self.stt_language_code,
#             api_key=elevenlabs_key,
#             http_session=self._http_session,
#         )

#         try:
#             self.stt = base_stt
#             self.stt_stream = self.stt.stream()
#         except NotImplementedError:
#             # ElevenLabs STT is non-streaming in our installed LiveKit version.
#             # Use LiveKit's StreamAdapter + a real VAD implementation so we can
#             # drive the rest of the pipeline as if STT was streaming.
#             vad = SileroVAD.load(sample_rate=16000, force_cpu=True)
#             self.stt = StreamAdapter(stt=base_stt, vad=vad)
#             self.stt_stream = self.stt.stream()
#         self._stt_ready_event.set()

#     async def _reinit_stt_with_language(self, language: str) -> None:
#         """
#         Reinitialize STT with a new language after detection.
#         """
#         log.info("mcube reinit_stt call_id=%s new_language=%s", self.call_id, language)
        
#         # Close existing STT
#         try:
#             if self.stt_stream:
#                 self.stt_stream.end_input()
#         except Exception:
#             pass
        
#         # Close HTTP session
#         try:
#             if getattr(self, "_http_session", None) is not None:
#                 await self._http_session.close()
#         except Exception:
#             pass

#         # Reset state
#         self._resample_state = None
#         self._pcm16_16k_buffer = bytearray()
#         self._stt_ready_event.clear()
#         self.stt_language_code = language

#         # Reinitialize with new language
#         await self._init_stt()

#     async def _on_media(self, msg: dict):
#         if not self.stream_id:
#             return
#         if not self._stt_ready_event.is_set():
#             # Ignore early frames until STT is initialized with per-call config.
#             return

#         media = msg.get("media") or {}
#         if media.get("track") != "inbound":
#             return

#         b64 = media.get("payload") or ""
#         try:
#             mulaw_bytes = from_base64_str(b64)
#             pcm16_8k = audioop.ulaw2lin(mulaw_bytes, 2)
#         except Exception:
#             log.warning("invalid media payload ignored call_id=%s bytes=%s", self.call_id, len(b64))
#             return

#         # For now, we buffer/resample 8k -> 16k and feed fixed-size 20ms frames to STT.
#         pcm16_16k, self._resample_state = audioop.ratecv(
#             pcm16_8k,
#             2,
#             1,
#             8000,
#             16000,
#             self._resample_state,
#         )

#         # Phase 6.1: Apply simple noise gate (filters low-volume background noise)
#         if self.noise_gate_enabled:
#             # Calculate the mean absolute amplitude (a cheap proxy for RMS) to detect noise level
#             samples = array.array('h', pcm16_16k)
#             if samples:
#                 rms = sum(abs(s) for s in samples) / len(samples)
#                 # If below threshold, suppress the audio (set to zeros)
#                 if rms < self.noise_gate_threshold:
#                     pcm16_16k = b'\x00' * len(pcm16_16k)

#         self._pcm16_16k_buffer.extend(pcm16_16k)

#         while len(self._pcm16_16k_buffer) >= self._frame_16k_bytes:
#             frame_bytes = bytes(self._pcm16_16k_buffer[: self._frame_16k_bytes])
#             del self._pcm16_16k_buffer[: self._frame_16k_bytes]
#             frame = rtc.AudioFrame(
#                 frame_bytes,
#                 sample_rate=16000,
#                 num_channels=1,
#                 samples_per_channel=self._frame_16k_samples,
#             )
#             assert self.stt_stream is not None
#             self.stt_stream.push_frame(frame)

#     async def _on_played_stream(self, msg: dict):
#         name = msg.get("name")
#         if not name:
#             return

#         pp = self.pending_plays.get(name)
#         if pp and not pp.future.done():
#             pp.future.set_result(None)
#             # Clear it now that it's acknowledged.
#             self.pending_plays.pop(name, None)

#     async def _consume_stt_events(self):
#         while self._stt_reconnect_count < self._stt_max_reconnect_attempts:
#             try:
#                 await self._stt_ready_event.wait()
#                 assert self.stt_stream is not None

#                 async for ev in self.stt_stream:
#                     if ev.type == SpeechEventType.INTERIM_TRANSCRIPT:
#                         if self.bot_playing:
#                             # Phase 3.4: Enhanced barge-in logic to distinguish true interruptions from backchanneling
#                             alt = ev.alternatives[0] if ev.alternatives else None
#                             partial = (alt.text if alt else "").strip().lower()
                            
#                             # Filter out backchanneling (uh-huh, okay, right, mm-hmm, etc.)
#                             backchannel_words = {"uh-huh", "uhhuh", "okay", "ok", "right", "yes", "yeah", "mm-hmm", "mhm", "mmhmm"}
#                             is_backchannel = any(word in partial for word in backchannel_words)
                            
#                             # Only trigger barge-in if:
#                             # 1. Not a backchannel acknowledgment
#                             # 2. Text is substantial enough (at least 2 characters)
#                             # 3. Not just noise
#                             if partial and not is_backchannel and len(partial) >= 2:
#                                 log.info("barge-in triggered call_id=%s partial=%r", self.call_id, partial)
#                                 await self._barge_in_clear()
#                         continue

#                     if ev.type == SpeechEventType.FINAL_TRANSCRIPT:
#                         alt = ev.alternatives[0] if ev.alternatives else None
#                         text = (alt.text if alt else "").strip() if alt else ""
#                         if text:
#                             # Hand the final transcript to _handle_final_transcript, which publishes
#                             # the utterance directly unless the bot is playing or a response is pending.
#                             await self._handle_final_transcript(text)

#             except (aiohttp.ClientConnectionResetError, aiohttp.ClientConnectionError, ConnectionResetError) as e:
#                 self._stt_reconnect_count += 1
#                 if self._stt_reconnect_count >= self._stt_max_reconnect_attempts:
#                     log.error("STT reconnection failed after %d attempts call_id=%s error=%s", 
#                               self._stt_reconnect_count, self.call_id, str(e))
#                     break
                
#                 # Exponential backoff: 1s, 2s, 4s, 8s, 16s
#                 backoff_delay = min(2 ** (self._stt_reconnect_count - 1), 16)
#                 log.warning("STT connection reset attempt=%d/%d call_id=%s reconnecting in %ds error=%s",
#                            self._stt_reconnect_count, self._stt_max_reconnect_attempts, 
#                            self.call_id, backoff_delay, str(e))
                
#                 await asyncio.sleep(backoff_delay)
                
#                 # Reinitialize STT
#                 try:
#                     if self._http_session and not self._http_session.closed:
#                         await self._http_session.close()
#                     self._http_session = aiohttp.ClientSession()
#                     await self._init_stt()
#                     log.info("STT reconnection successful call_id=%s attempt=%d", 
#                             self.call_id, self._stt_reconnect_count)
#                 except Exception as init_error:
#                     log.error("STT reinitialization failed call_id=%s error=%s", 
#                              self.call_id, str(init_error))
                    
#             except Exception as e:
#                 log.error("STT event processing error call_id=%s error=%s", self.call_id, str(e))
#                 break

#     async def _handle_final_transcript(self, text: str):
#         # Avoid overlap: only accept a new utterance when we're not actively playing.
#         if self.bot_playing or self.response_pending:
#             # A barge-in clear resets both flags, so an utterance that follows one is accepted;
#             # otherwise the utterance is dropped.
#             return

#         # Detect language from first utterance and lock it
#         if self.detected_language is None and text:
#             self.detected_language = detect_language(text)
#             log.info("mcube language_detected call_id=%s language=%s text=%r", self.call_id, self.detected_language, text[:50])
            
#             # Reinitialize STT with detected language if different from default
#             if self.detected_language != self.stt_language_code:
#                 await self._reinit_stt_with_language(self.detected_language)

#         # Allocate per-utterance sequence id.
#         # (We still store in redis in case we restart; for now it's best-effort.)
#         await self._publish_utterance(text, language=self.detected_language or self.stt_language_code)

#     async def _publish_utterance(self, text: str, *, bot_say_text: str | None = None, language: str = "en"):
#         # Sequence id: try to atomically allocate from redis. If redis is down, fall back to time-based.
#         async with self._sequence_lock:
#             try:
#                 seq = await self.redis_client.incr(f"mcube:seq:{self.call_id}")
#                 await self.redis_client.expire(f"mcube:seq:{self.call_id}", 3600)
#             except Exception:
#                 seq = int(time.time())

#         self.active_sequence_id = int(seq)
#         # Bot should only be considered "speaking" after we start sending TTS segments.
#         self.bot_playing = False
#         self.response_pending = True
#         self._tts_done_received = False
#         self._tts_final_chunk_seq = None

#         # Publish utterance to AI worker.
#         payload = {
#             "call_id": self.call_id,
#             "sequence_id": int(seq),
#             "transcript": text,
#             "system_prompt": self.system_prompt,
#             "llm_model": self.llm_model,
#             "llm_provider": self.llm_provider,
#             "tts_provider": self.tts_provider,
#             "tts_model": self.tts_model,
#             "tts_voice_id": self.tts_voice_id,
#             "tts_encoding": self.tts_encoding,
#             "tts_chunk_ms": self.tts_chunk_ms,
#             "stt_language_code": language,
#             "stt_provider": self.stt_provider,
#             "stt_model_id": self.stt_model_id,
#         }
#         if bot_say_text:
#             payload["bot_say_text"] = bot_say_text

#         for k in ("business_id", "bot_id", "agent_id", "user_id", "agent_email", "agent_name"):
#             v = getattr(self, k, None)
#             if v is not None:
#                 payload[k] = v

#         await publish_json(
#             channel=self.rabbit_channel,
#             queue_name=AI_UTTERANCES_QUEUE,
#             payload=payload,
#         )
#         log.info(
#             "published utterance call_id=%s seq=%s text=%r tts_provider=%s tts_model=%s tts_voice_id=%s tts_encoding=%s language=%s",
#             self.call_id,
#             seq,
#             text,
#             self.tts_provider,
#             self.tts_model,
#             self.tts_voice_id,
#             self.tts_encoding,
#             language,
#         )

#     async def _consume_tts_queue(self):
#         # This consumer listens to per-call queue `tts.{call_id}`.
#         if not self.stream_id:
#             # Wait until start event arrives.
#             while not self.stream_id:
#                 await asyncio.sleep(0.01)

#         queue_name = tts_queue_name(self.call_id)

#         # Declare queue and consume.
#         queue = await self.rabbit_channel.declare_queue(queue_name, durable=True)

#         async with queue.iterator() as q_iter:
#             async for message in q_iter:
#                 async with message.process(requeue=False):
#                     data = message.body.decode("utf-8")
#                     try:
#                         payload = json.loads(data)
#                     except Exception:
#                         continue

#                     seq = int(payload.get("sequence_id", -1))
#                     if self.cancelled_sequence_id is not None and seq == self.cancelled_sequence_id:
#                         continue

#                     if payload.get("type") == "tts_audio_chunk":
#                         if self.active_sequence_id is None or seq != self.active_sequence_id:
#                             continue
#                         chunk_seq = int(payload["chunk_seq"])
#                         pcm16_8k_bytes = from_base64_str(payload["audio_pcm_b64"])
#                         if chunk_seq == 0:
#                             log.info(
#                                 "tts_audio_chunk_first call_id=%s seq=%s bytes=%s",
#                                 self.call_id,
#                                 seq,
#                                 len(pcm16_8k_bytes),
#                             )
#                         await self.tts_segments.put((seq, chunk_seq, pcm16_8k_bytes))

#                     elif payload.get("type") == "tts_done":
#                         if self.active_sequence_id is None or seq != self.active_sequence_id:
#                             continue
#                         self._tts_done_received = True
#                         self._tts_final_chunk_seq = int(payload.get("final_chunk_seq", -1))

#     def _extract_transfer_to(self, payload: dict) -> Optional[str]:
#         """
#         Extract transfer target from tool/control payload.

#         We accept several keys so different producers (ai_worker variants) stay compatible.
#         """
#         transfer_to = (
#             payload.get("transfer_to")
#             or payload.get("phone_number")
#             or payload.get("transfer_number")
#             or payload.get("sip_uri")
#         )
#         if not transfer_to or not isinstance(transfer_to, str):
#             return None

#         transfer_to = transfer_to.strip()
#         if not transfer_to:
#             return None

#         # If passed as `sip:+918...@host` convert to E.164-like number.
#         if transfer_to.lower().startswith("sip:"):
#             try:
#                 transfer_to = transfer_to.split(":", 1)[1].split("@", 1)[0]
#             except Exception:
#                 return None

#         return transfer_to

#     def _force_stop_playback(self, *, cancelled_seq: Optional[int] = None) -> None:
#         """
#         Immediately stop any bot playback and cancel pending playedStream futures.
#         This is used for MCube `terminate/transfer` and should be more aggressive
#         than barge-in clear.
#         """
#         if cancelled_seq is None:
#             cancelled_seq = self.active_sequence_id

#         self.cancelled_sequence_id = cancelled_seq
#         self.bot_playing = False
#         self.response_pending = False
#         self.active_sequence_id = None

#         self._tts_done_received = False
#         self._tts_final_chunk_seq = None

#         # Cancel any pending playedStream waits.
#         for _, pp in list(self.pending_plays.items()):
#             if not pp.future.done():
#                 pp.future.cancel()
#         self.pending_plays.clear()

#         # Drain any queued segments so playback loop doesn't wait on them.
#         try:
#             while True:
#                 self.tts_segments.get_nowait()
#         except asyncio.QueueEmpty:
#             pass

#     async def _consume_control_queue(self) -> None:
#         """
#         Consume per-call control queue and execute MCube call actions (terminate/transfer).
#         """
#         queue_name = control_queue_name(self.call_id)
#         queue = await self.rabbit_channel.declare_queue(queue_name, durable=True)

#         async with queue.iterator() as q_iter:
#             async for message in q_iter:
#                 async with message.process(requeue=False):
#                     try:
#                         payload = json.loads(message.body.decode("utf-8"))
#                     except Exception:
#                         continue

#                     await self._handle_control_message(payload)

#     async def _handle_control_message(self, payload: dict) -> None:
#         msg_type = payload.get("type")
#         seq = payload.get("sequence_id")
#         cancelled_seq: Optional[int] = int(seq) if isinstance(seq, (int, str)) and str(seq).isdigit() else None

#         # Only handle tool actions that apply to the currently active speaking sequence.
#         if (
#             cancelled_seq is not None
#             and self.active_sequence_id is not None
#             and cancelled_seq != self.active_sequence_id
#         ):
#             # Stale control message; ignore.
#             return

#         if msg_type == "mcube_terminate":
#             if self.stream_id:
#                 await self.ws.send(json.dumps({"event": "terminate", "streamId": self.stream_id}))
#             self._force_stop_playback(cancelled_seq=cancelled_seq)
#             return

#         if msg_type == "mcube_transfer":
#             transfer_to = self._extract_transfer_to(payload)
#             if transfer_to and self.stream_id:
#                 await self.ws.send(
#                     json.dumps(
#                         {
#                             "event": "transfer",
#                             "streamId": self.stream_id,
#                             "transferTo": transfer_to,
#                             "showOriginalCallerId": True,
#                         }
#                     )
#                 )
#             self._force_stop_playback(cancelled_seq=cancelled_seq)
#             return

#         # Unknown control message types are ignored.

#     async def _playback_loop(self):
#         # Real-time paced playback.
#         # We intentionally DO NOT block on `playedStream` acks: waiting on per-chunk acks can create
#         # periodic gaps if MCube doesn't ack every segment. Instead we pace by chunk duration.
#         checkpoint_every = self.checkpoint_every

#         while True:
#             seq, chunk_seq, pcm16_8k_bytes = await self.tts_segments.get()

#             # If we were cancelled mid-playback, drop remaining chunks.
#             if self.cancelled_sequence_id is not None and seq == self.cancelled_sequence_id:
#                 continue
#             if self.active_sequence_id is None or seq != self.active_sequence_id:
#                 continue
#             if not self.stream_id:
#                 continue

#             name = f"{self.call_id}_seg_{chunk_seq}"
#             self.bot_playing = True

#             # Send playAudio + checkpoint as per MCube docs.
#             # Empirically, some synthesized audio can clip and sound like pops/gunshots after μ-law companding.
#             # Apply a small gain reduction before converting to μ-law.
#             try:
#                 import audioop

#                 gain = float(self.tts_gain)
#                 pcm16_8k_bytes = audioop.mul(pcm16_8k_bytes, 2, gain)
#             except Exception:
#                 pass
#             mulaw = pcm16_to_mulaw(pcm16_8k_bytes)
#             await self.ws.send(
#                 json.dumps(
#                     {
#                         "event": "playAudio",
#                         "media": {
#                             "contentType": MCUBE_AUDIO_SPEC.encoding,
#                             "sampleRate": MCUBE_AUDIO_SPEC.sample_rate,
#                             "payload": base64.b64encode(mulaw).decode("ascii"),
#                             "name": name,
#                         },
#                     }
#                 )
#             )

#             # Optional: send checkpoint every N chunks (and always the first) for observability.
#             if checkpoint_every > 0 and (chunk_seq == 0 or (chunk_seq % checkpoint_every == 0)):
#                 await self.ws.send(
#                     json.dumps(
#                         {
#                             "event": "checkpoint",
#                             "streamId": self.stream_id,
#                             "name": name,
#                         }
#                     )
#                 )

#             # Pace by duration of this chunk (PCM16 mono @ 8k).
#             try:
#                 chunk_duration_s = (len(pcm16_8k_bytes) / 2) / MCUBE_AUDIO_SPEC.sample_rate
#                 pace = float(self.playback_pace_factor)
#                 await asyncio.sleep(max(0.0, chunk_duration_s * 0.98 * pace))
#             except Exception:
#                 await asyncio.sleep(0.1)

#             # If this was the final chunk and tts_done has already arrived, reset the
#             # per-response playback state (the loop itself keeps running for the next response).
#             if (
#                 self._tts_done_received
#                 and self._tts_final_chunk_seq is not None
#                 and chunk_seq >= self._tts_final_chunk_seq
#                 and self.tts_segments.empty()
#             ):
#                 self.bot_playing = False
#                 self.response_pending = False
#                 self.cancelled_sequence_id = None
#                 self.active_sequence_id = None
#                 self._tts_done_received = False
#                 self._tts_final_chunk_seq = None

#     async def _barge_in_clear(self):
#         if not self.stream_id:
#             return
#         if not self.bot_playing:
#             return

#         log.info("barge-in clear call_id=%s seq=%s", self.call_id, self.active_sequence_id)
#         self.cancelled_sequence_id = self.active_sequence_id
#         self.bot_playing = False
#         self.response_pending = False
#         self.active_sequence_id = None

#         # Cancel any pending playedStream waits.
#         for name, pp in list(self.pending_plays.items()):
#             if not pp.future.done():
#                 pp.future.cancel()
#             self.pending_plays.pop(name, None)

#         # Clear current playback on MCube.
#         try:
#             await self.ws.send(json.dumps({"event": "clearAudio", "streamId": self.stream_id}))
#         except Exception:
#             pass

#         # Drain any queued segments for the old sequence.
#         try:
#             while True:
#                 self.tts_segments.get_nowait()
#         except asyncio.QueueEmpty:
#             pass


# async def _ws_handler(websocket, *, rabbit_channel, redis_client):
#     # websockets>=12 passes only `websocket` to the handler.
#     # The request path is available as `websocket.path`.
#     path = getattr(websocket, "path", "") or ""
#     # Some websocket versions/proxies may not populate `websocket.path` reliably.
#     req = getattr(websocket, "request", None)
#     if not path and req is not None:
#         path = getattr(req, "path", "") or ""

#     log.info(
#         "ws accepted path=%s remote=%s call_hint_from_path=%s",
#         path,
#         getattr(websocket, "remote_address", None),
#         path.strip("/").split("/")[-1] if path else "",
#     )

#     # Path expected: /bid/websocket/{callId} (or /ws/{callId} depending on MCube env)
#     parts = path.strip("/").split("/") if path else []
#     call_id = parts[-1] if parts else ""
#     if not call_id:
#         await websocket.close(code=1008, reason="missing call_id")
#         return

#     sess = McubeCallSession(
#         call_id=call_id,
#         websocket=websocket,
#         rabbit_channel=rabbit_channel,
#         redis_client=redis_client,
#     )
#     await sess.start()


# async def main():
#     logging.basicConfig(level=logging.INFO)

#     connection = await connect()
#     channel = await get_channel(connection)

#     redis_client = redis_async.from_url(REDIS_URL, decode_responses=False)

#     log.info("mcube ws bridge listening on %s:%s", WS_HOST, WS_PORT)
#     stop = asyncio.Future()

#     def _signal_handler(*_):
#         if not stop.done():
#             stop.set_result(True)

#     loop = asyncio.get_running_loop()
#     for sig in [signal.SIGINT, signal.SIGTERM]:
#         try:
#             loop.add_signal_handler(sig, _signal_handler)
#         except NotImplementedError:
#             # Windows fallback (not expected here)
#             pass

#     async def runner():
#         async with websockets.serve(
#             lambda ws: _ws_handler(ws, rabbit_channel=channel, redis_client=redis_client),
#             WS_HOST,
#             WS_PORT,
#             max_size=2**23,
#             ping_interval=20,
#             ping_timeout=60,
#         ):
#             await stop

#     await runner()


# if __name__ == "__main__":
#     asyncio.run(main())

