import asyncio

import aiohttp

import base64

import json

import logging

import os

import sys

import time

from dataclasses import dataclass

from typing import Any, AsyncGenerator, Optional

import types

import re

import openai



# Conversation context storage per call_id to maintain memory across utterances

_call_contexts: dict[str, "ChatContext"] = {}
_direct_call_contexts: dict[str, list[dict[str, Any]]] = {}
# Caller first name / preferred name from conversation (phone calls only; bounded per process).
_caller_names: dict[str, str] = {}
# Set when the caller explicitly picks English / Hindi / Kannada in the fast-path handler.
_caller_language_locked: dict[str, str] = {}
# True after the scripted "which language are you comfortable with" line was used for this call.
_language_preference_prompt_sent: dict[str, bool] = {}
# Latest explicit en/hi/kn from the caller's words (may be set before we know their name).
_caller_explicit_language: dict[str, str] = {}
# True after we used the "glad to speak X, may I know your name" line without a name yet.
_language_prompted_without_name: dict[str, bool] = {}

_NOT_NAME_WORDS: frozenset[str] = frozenset(
    {
        "hello",
        "hi",
        "hey",
        "yes",
        "no",
        "yeah",
        "ya",
        "okay",
        "ok",
        "sure",
        "english",
        "hindi",
        "hinthi",
        "hindee",
        "hindhi",
        "hinglish",
        "kannada",
        "kanada",
        "namaste",
        "thanks",
        "thank",
        "you",
        "please",
        "here",
        "there",
        "interested",
        "calling",
        "good",
        "fine",
        "well",
        "not",
        "the",
        "a",
        "an",
        "is",
        "are",
        "was",
        "were",
        "doing",
        "speaking",
        "listening",
        "something",
        "nothing",
        "same",
        "busy",
        "about",
        "from",
        "when",
        "what",
        "why",
        "how",
    }
)


def _format_caller_name_for_speech(raw: str) -> str:
    def fmt_word(w: str) -> str:
        if "-" in w:
            return "-".join(fmt_word(x) for x in w.split("-") if x)
        if len(w) <= 1:
            return w.upper()
        return w[:1].upper() + w[1:].lower()

    parts = [p for p in re.split(r"\s+", raw.strip()) if p]
    return " ".join(fmt_word(p) for p in parts)


def _looks_like_person_name(candidate: str) -> bool:
    c = candidate.strip()
    if len(c) < 2 or len(c) > 60:
        return False
    words = c.split()
    if len(words) > 4:
        return False
    for w in words:
        wl = re.sub(r"[^a-z]", "", w.lower())
        if wl and wl in _NOT_NAME_WORDS:
            return False
        if not re.match(r"^[A-Za-z][A-Za-z'.-]{0,29}$", w):
            return False
    return True


_NAME_CAPTURE_STOP = (
    r"(?:and|but|I|we|can|want|need|from|in|for|at|on|with|here|there|the|is|are|was|were)\b"
)


def _trim_name_capture_tail_noise(c: str) -> str:
    """Drop trailing filler tokens STT often glues after a name (e.g. 'Priya here', 'John from Bangalore')."""
    junk = frozenset(
        {
            "from",
            "in",
            "for",
            "at",
            "on",
            "with",
            "here",
            "there",
            "sir",
            "madam",
            "maam",
            "mam",
            "ji",
            "speaking",
            "calling",
            "listening",
        }
    )
    parts = [p for p in c.strip().split() if p]
    while len(parts) > 1:
        wl = re.sub(r"[^a-z]", "", parts[-1].lower())
        if wl not in junk:
            break
        parts.pop()
    return " ".join(parts)


def _extract_caller_name(text: str) -> Optional[str]:
    """Best-effort name from Latin-script transcripts (STT)."""
    raw = (text or "").strip()
    if not raw:
        return None

    explicit_patterns = [
        re.compile(
            rf"\bmy name is[,\s]+([A-Za-z][A-Za-z\s'.-]{{0,80}}?)(?=[,.;!?]|$|\s+{_NAME_CAPTURE_STOP})",
            re.I,
        ),
        re.compile(
            rf"\bcall me[,\s]+([A-Za-z][A-Za-z'.-]{{0,40}}?)(?=[,.;!?]|$|\s+{_NAME_CAPTURE_STOP})",
            re.I,
        ),
        re.compile(
            rf"\bthis is[,\s]+([A-Za-z][A-Za-z'.-]{{0,40}}?)(?=[,.;!?]|$|\s+{_NAME_CAPTURE_STOP})",
            re.I,
        ),
        re.compile(
            rf"\bi am[,\s]+([A-Za-z][A-Za-z\s'.-]{{0,40}}?)(?=[,.;!?]|$|\s+{_NAME_CAPTURE_STOP})",
            re.I,
        ),
        re.compile(
            rf"\bi'?m[,\s]+([A-Za-z][A-Za-z\s'.-]{{0,40}}?)(?=[,.;!?]|$|\s+{_NAME_CAPTURE_STOP})",
            re.I,
        ),
    ]
    for pat in explicit_patterns:
        m = pat.search(raw)
        if not m:
            continue
        cand = m.group(1).strip().strip("'\"")
        cand = re.split(r"[,.;!?]", cand)[0].strip()
        cand = _trim_name_capture_tail_noise(cand)
        if _looks_like_person_name(cand):
            return _format_caller_name_for_speech(cand)

    # Single-line reply that is likely only a name (after name question).
    line = re.sub(r"[,.;!?]+$", "", raw).strip()
    if "\n" in line:
        line = line.split("\n", 1)[0].strip()
    norm_line = re.sub(r"\s+", " ", line).strip()
    tokens = re.findall(r"[A-Za-z][A-Za-z'.-]*", norm_line)
    if not tokens:
        return None
    if len(tokens) == 1:
        cand = tokens[0]
        if cand.lower() not in _NOT_NAME_WORDS and _looks_like_person_name(cand):
            return _format_caller_name_for_speech(cand)
        return None
    if len(tokens) == 2 and tokens[0].lower() in {"hi", "hey", "hello"}:
        cand = tokens[1]
        if cand.lower() not in _NOT_NAME_WORDS and _looks_like_person_name(cand):
            return _format_caller_name_for_speech(cand)
        return None
    if len(tokens) == 2 and " ".join(tokens) == norm_line:
        cand = " ".join(tokens)
        cand = _trim_name_capture_tail_noise(cand)
        if cand.lower() not in _NOT_NAME_WORDS and _looks_like_person_name(cand):
            return _format_caller_name_for_speech(cand)

    return None


def _maybe_capture_caller_name(call_id: str, user_text: str) -> bool:
    """Parse caller name from transcript; returns True when we newly stored or updated it."""
    ext = _extract_caller_name(user_text)
    if not ext:
        return False
    prev = _caller_names.get(call_id)
    _caller_names[call_id] = ext
    if prev != ext:
        _language_prompted_without_name.pop(call_id, None)
    return prev != ext


@dataclass(frozen=True)
class DirectToolCall:
    name: str
    arguments: str

# Phase 9.1: OpenAI client for direct API calls (bypass LiveKit gateway)
_openai_client = None

def get_openai_client():
    global _openai_client
    if _openai_client is None:
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            raise RuntimeError("OPENAI_API_KEY not set")
        _openai_client = openai.AsyncOpenAI(api_key=api_key)
    return _openai_client





from .mcube_language import detect_language_preference, resolve_detected_language


def sanitize_text_for_tts(text: str) -> str:
    """
    Normalize text before TTS: snake_case, slug-hyphens, glued CamelCase, and markdown crumbs
    often sound like 'underscore', dashes, or chopped words on phone TTS.
    """
    if not text:
        return ""
    t = text.replace("\u00a0", " ").replace("\u200b", "")
    t = re.sub(r"_+", " ", t)
    # Slug-style hyphens between letters/digits (SNN-Felicity) -> spaces for natural speech.
    t = re.sub(r"(?<=[A-Za-z0-9])-(?=[A-Za-z0-9])", " ", t)
    # Drop markdown/code characters the model sometimes emits into spoken lines.
    t = re.sub(r"[`*#|]+", " ", t)
    # Split glued words: fooBar -> foo Bar (helps some compound brand tokens).
    t = re.sub(r"(?<=[a-z])(?=[A-Z])", " ", t)
    t = re.sub(r"(?<=[A-Za-z])(?=[0-9])", " ", t)
    t = re.sub(r"(?<=[0-9])(?=[A-Za-z])", " ", t)
    t = re.sub(r"[ \t\r\n]+", " ", t).strip()
    return t


def collapse_spoken_repetition(text: str) -> str:
    """
    Strip stutter-style repeats from model or scripted lines before TTS (e.g. "Hello Hello",
    duplicated sentences).
    """
    t = (text or "").strip()
    if not t:
        return t
    for _ in range(24):
        n = re.sub(r"(\b[\w'-]{2,48}\b)(\s+\1\b)+", r"\1", t, flags=re.IGNORECASE)
        if n == t:
            break
        t = n
    parts = re.split(r"(?<=[.!?…])\s+", t)
    if len(parts) < 2:
        return t
    out: list[str] = []
    prev_key: Optional[str] = None
    for p in parts:
        s = p.strip()
        if not s:
            continue
        key = re.sub(r"\s+", " ", s.lower())
        if key == prev_key:
            continue
        out.append(s)
        prev_key = key
    joined = " ".join(x.strip() for x in out if x.strip()).strip()
    return joined or t


def prepare_text_for_phone_tts(text: str) -> str:
    """Normalize then remove duplicate phrasing for natural phone playback."""
    return collapse_spoken_repetition(sanitize_text_for_tts(text))


def _fast_response_for_user_text(
    user_text: str,
    call_id: str,
    *,
    name_newly_learned: bool,
) -> Optional[str]:
    text = (user_text or "").strip()
    if not text:
        return None

    name = _caller_names.get(call_id)
    words = set(re.findall(r"[a-zA-Z']+", text.lower()))
    preferred_language = detect_language_preference(text)
    lang_label = {"en": "English", "hi": "Hindi", "kn": "Kannada"}

    # Already acknowledged this language choice — avoid repeating the same scripted line.
    if (
        preferred_language
        and name
        and _caller_language_locked.get(call_id) == preferred_language
    ):
        return None

    # Caller named a language before sharing their name — ask for name first.
    if preferred_language and not name:
        ln = lang_label.get(preferred_language, "that language")
        if (
            _language_prompted_without_name.get(call_id)
            and _caller_explicit_language.get(call_id) == preferred_language
        ):
            return "May I have your name, please?"
        _language_prompted_without_name[call_id] = True
        return f"I'd be glad to speak in {ln}. First, may I know your name, please?"

    # Caller chose a language and we already know their name.
    if preferred_language and name:
        _caller_language_locked[call_id] = preferred_language
        if preferred_language == "en":
            return (
                f"Sure, {name}, we can continue in English. "
                "This is Alisha from SNN Estates, calling about SNN Felicity."
            )
        if preferred_language == "hi":
            return (
                f"Bilkul, {name}, hum Hindi mein baat kar sakte hain. Main Alisha, SNN Estates se."
            )
        if preferred_language == "kn":
            return (
                f"{name}, I'll keep it simple in Kannada or plain English — whichever is easier for you."
            )

    # Just learned their name this turn; ask preferred language next.
    if (
        name_newly_learned
        and name
        and not preferred_language
        and call_id not in _caller_language_locked
        and not _caller_explicit_language.get(call_id)
    ):
        _language_preference_prompt_sent[call_id] = True
        return (
            f"Thank you, {name}. Which language are you comfortable speaking in — "
            "English, Hindi, or Kannada?"
        )

    compact_reply = re.sub(r"\s+", " ", text.lower()).strip()
    if {"who", "this"} <= words or "who is this" in text.lower():
        if not name:
            return (
                "This is Alisha from SNN Estates, calling about your enquiry for SNN Felicity. "
                "May I know your name, please?"
            )
        return (
            f"This is Alisha from SNN Estates, calling about your enquiry for SNN Felicity, {name}. "
            "Is this a good time to speak?"
        )

    if words <= {"hello", "hi", "hey"}:
        if not name:
            return (
                "Hello, this is Alisha from SNN Estates. May I know your name, please?"
            )
        return (
            f"Hi {name}, I'm here with you from SNN Estates. "
            "What would you like to know about SNN Felicity?"
        )

    if compact_reply in {"yes", "yeah", "ya", "okay", "ok"}:
        if not name:
            return (
                "Thank you. May I know your name, please? "
                "Then we can choose a language you're comfortable with."
            )
        return (
            f"Thank you, {name}. I'll keep this brief. "
            "Are you looking at SNN Felicity for living or investment?"
        )

    return None





def augment_system_prompt_with_language(system_prompt: str, language: str) -> str:

    """

    Add language-specific instructions to the system prompt based on detected language.

    """

    if language == "hi":

        language_instruction = (
            "\n\nLANGUAGE INSTRUCTION: Respond only in Hindi or Hinglish (Hindi in Latin script). "
            "Do not use Kannada script or Kannada vocabulary unless quoting the user. "
            "Use natural conversational Hindi as spoken in North India; be warm and polite (e.g. 'aap', 'ji')."
        )

    elif language == "kn":

        language_instruction = (
            "\n\nLANGUAGE INSTRUCTION: Respond only in Kannada or very simple English. "
            "Do not reply in Hindi or Devanagari unless quoting the user. "
            "If Kannada output is uncertain, use minimal English and acknowledge their Kannada preference briefly."
        )

    else:

        language_instruction = "\n\nLANGUAGE INSTRUCTION: Respond in English. Use clear, professional, and natural conversational English."

    spoken_format = (
        "\n\nSPOKEN OUTPUT: Read aloud by phone TTS. Write every person or place name as normal words "
        "with spaces between words. Never join name parts with underscores, slashes, or programming-style hyphens; "
        "never output snake_case or glued tokens like WordWord for names. "
        "Use inclusive, neutral wording when referring to people. "
        "Do not repeat the same sentence, clause, or word back-to-back; say each idea once."
    )

    return system_prompt + language_instruction + spoken_format





def _install_livekit_blingfire_stub() -> None:

    """

    Work around Windows WDAC/App Control blocking `lk_blingfire` native extension.

    `livekit.agents` imports `livekit.agents.tokenize.blingfire` unconditionally; we stub it.

    """



    module_name = "livekit.agents.tokenize.blingfire"

    if module_name in sys.modules:

        return



    class SentenceTokenizer:  # minimal compatibility surface

        def __init__(

            self,

            *,

            min_sentence_len: int = 20,

            stream_context_len: int = 10,

            retain_format: bool = False,

        ) -> None:

            self._min_sentence_len = int(min_sentence_len)



        def tokenize(self, text: str, *, language: str | None = None) -> list[str]:

            parts = [p.strip() for p in re.split(r"(?<=[.!?])\s+", text or "") if p.strip()]

            return [p for p in parts if len(p) >= self._min_sentence_len]



        def stream(self, *, language: str | None = None):

            raise NotImplementedError("Streaming tokenizer not supported in fallback stub")



    stub = types.ModuleType(module_name)

    stub.__dict__.update({"__all__": ["SentenceTokenizer"], "SentenceTokenizer": SentenceTokenizer})

    sys.modules[module_name] = stub





_install_livekit_blingfire_stub()



from livekit.agents import inference, function_tool

from livekit.agents.llm.chat_context import ChatContext

from livekit.plugins import cartesia, deepgram, elevenlabs



from .env_load import load_agent_runtime_dotenv

from .business_id_agents import get_runtime_overrides_from_agents_table

from .mcube_defaults import apply_voice_defaults_to_dict, get_default_mcube_call_config

from .mq import (

    AI_UTTERANCES_QUEUE,

    RABBITMQ_URL,

    control_queue_name,

    declare_durable_queue,

    connect,

    get_channel,

    publish_json,

    tts_queue_name,

)



log = logging.getLogger("mcube.ai_worker")



load_agent_runtime_dotenv()



_PUBLISH_CHANNEL = None  # set in main(); used by process_utterance





def _env_fallback_runtime() -> dict[str, str]:

    """Fallback only when DB + queue payload do not supply values (see process_utterance)."""

    d = get_default_mcube_call_config()

    return {

        "system_prompt": d["system_prompt"],

        "llm_model": d["llm_model"],

        "llm_provider": d["llm_provider"],

        "tts_model": d["tts_model"],

        "tts_provider": d.get("tts_provider") or "elevenlabs",

        "tts_voice_id": d["tts_voice_id"],

        "tts_encoding": d["tts_encoding"],

        "tts_chunk_ms": d.get("tts_chunk_ms") or "200",

    }





async def _resolve_runtime_from_payload(payload: dict[str, Any]) -> dict[str, str]:

    """

    Merge precedence (later wins):

    1) `.env` defaults (MCUBE_* via mcube_defaults)

    2) `business_id_agents` row (+ JSON config) when `business_id` is present

    3) Explicit fields on the RabbitMQ payload (Redis call config / ws_bridge)

    """

    base = _env_fallback_runtime()



    # TEMPORARILY skip DB lookup – use .env.local directly for all config

    # bid = payload.get("business_id")

    # if bid is not None:

    #     db_ov = await asyncio.to_thread(

    #         get_runtime_overrides_from_agents_table,

    #         bid,

    #         agent_id=payload.get("agent_id"),

    #         user_id=payload.get("user_id"),

    #         email=payload.get("agent_email") or payload.get("email"),

    #         name=payload.get("agent_name") or payload.get("name"),

    #     )

    #     base.update(db_ov)



    override_keys = (

        "system_prompt",

        "llm_model",

        "llm_provider",

        "tts_model",

        "tts_provider",

        "tts_voice_id",

        "tts_encoding",

        "tts_chunk_ms",

        "stt_language_code",

        "stt_model_id",

        "stt_provider",

    )

    out = dict(base)

    for k in override_keys:

        if k not in payload:

            continue

        v = payload.get(k)

        if v is None:

            continue

        if isinstance(v, str) and k != "llm_provider" and not v.strip():

            continue

        if k == "system_prompt" and isinstance(v, str):

            out[k] = v.replace("\\n", "\n")

        else:

            out[k] = str(v).strip() if isinstance(v, str) else v


    apply_voice_defaults_to_dict(out)


    out.setdefault("tts_provider", base.get("tts_provider") or "")

    out.setdefault("llm_model", base.get("llm_model", ""))

    out.setdefault("tts_model", base.get("tts_model", ""))

    out.setdefault("tts_voice_id", base.get("tts_voice_id", ""))

    out.setdefault("tts_encoding", base.get("tts_encoding", ""))

    out.setdefault("system_prompt", base.get("system_prompt", ""))

    out.setdefault("tts_chunk_ms", base.get("tts_chunk_ms", "200"))

    out.setdefault("stt_language_code", base.get("stt_language_code", ""))

    out.setdefault("stt_model_id", base.get("stt_model_id", ""))

    out.setdefault("stt_provider", base.get("stt_provider") or "")

    return out


@dataclass(frozen=True)

class PublishTtsChunk:

    call_id: str

    sequence_id: int

    chunk_seq: int

    pcm16_8k_bytes: bytes





def _pcm16_to_b64(pcm16_bytes: bytes) -> str:

    return base64.b64encode(pcm16_bytes).decode("ascii")





def _resample_pcm16(pcm16_bytes: bytes, *, from_rate: int, to_rate: int, state: object) -> tuple[bytes, object]:

    """

    audioop.ratecv works on raw int16 PCM.

    Returns (converted_bytes, new_state).

    """

    import audioop



    converted, new_state = audioop.ratecv(

        pcm16_bytes,

        2,  # width bytes for int16

        1,  # channels

        from_rate,

        to_rate,

        state,

    )

    return converted, new_state





async def _run_llm(system_prompt: str, llm_model: str, user_text: str) -> str:
    # Phase 9.1: Reverted to LiveKit gateway temporarily to fix call processing
    llm = inference.LLM(model=llm_model, max_tokens=120)

    chat_ctx = ChatContext.empty()

    chat_ctx.add_message(role="system", content=system_prompt)

    chat_ctx.add_message(role="user", content=user_text)

    stream = llm.chat(chat_ctx=chat_ctx)

    text = []

    async for token in stream.to_str_iterable():

        text.append(token)

    return "".join(text).strip()





def _env_bool(key: str, default: bool = False) -> bool:
    value = os.getenv(key)
    if value is None:
        return default
    return value.strip().lower() in ("1", "true", "yes", "on")


def _env_int(key: str, default: int, *, min_value: int = 1, max_value: int = 4096) -> int:
    try:
        value = int(float(os.getenv(key, str(default))))
    except Exception:
        value = default
    return max(min_value, min(max_value, value))


def _openai_tool_schema(raw_schema: dict[str, Any]) -> dict[str, Any]:
    return {
        "type": "function",
        "function": {
            "name": raw_schema["name"],
            "description": raw_schema.get("description", ""),
            "parameters": raw_schema.get("parameters") or {"type": "object", "properties": {}},
        },
    }


def _direct_llm_settings(backend: str) -> tuple[str, str]:
    if backend == "together":
        api_key = os.getenv("TOGETHER_API_KEY", "").strip()
        base_url = os.getenv("MCUBE_TOGETHER_BASE_URL", "https://api.together.xyz/v1").strip()
        if not api_key:
            raise RuntimeError("TOGETHER_API_KEY not set")
        return api_key, base_url.rstrip("/")

    api_key = os.getenv("OPENAI_API_KEY", "").strip()
    base_url = os.getenv("MCUBE_DIRECT_OPENAI_BASE_URL", "https://api.openai.com/v1").strip()
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY not set")
    return api_key, base_url.rstrip("/")


async def _chat_completions_turn(
    *,
    call_id: str,
    system_prompt: str,
    llm_model: str,
    user_text: str,
    backend: str,
    tools: list[dict[str, Any]],
) -> tuple[str, list[DirectToolCall]]:
    api_key, base_url = _direct_llm_settings(backend)
    timeout_s = float(os.getenv("MCUBE_DIRECT_OPENAI_TIMEOUT_S", "90"))
    max_tokens_key = "MCUBE_CHAT_COMPLETIONS_MAX_TOKENS" if backend == "together" else "MCUBE_DIRECT_OPENAI_MAX_TOKENS"
    max_tokens = _env_int(max_tokens_key, 384 if backend == "together" else 130, min_value=16, max_value=2048)

    history = _direct_call_contexts.get(call_id)
    if not history:
        history = [{"role": "system", "content": system_prompt}]
        _direct_call_contexts[call_id] = history
    elif history[0].get("role") != "system":
        history.insert(0, {"role": "system", "content": system_prompt})
    else:
        # Keep prior user/assistant turns; only refresh system (language/caller hints change per turn).
        history[0] = {"role": "system", "content": system_prompt}
    history.append({"role": "user", "content": user_text})

    body: dict[str, Any] = {
        "model": llm_model,
        "messages": history,
        "max_tokens": max_tokens,
        "temperature": 0.4,
    }
    if tools and not (backend == "together" and _env_bool("MCUBE_TOGETHER_DISABLE_TOOLS")):
        body["tools"] = [_openai_tool_schema(t) for t in tools]
        body["tool_choice"] = "auto"

    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    started = time.time()
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout_s)) as session:
        async with session.post(f"{base_url}/chat/completions", headers=headers, json=body) as resp:
            response_text = await resp.text()
            if resp.status >= 400:
                raise RuntimeError(f"{backend} chat completions failed status={resp.status} body={response_text[:500]}")
            data = json.loads(response_text)

    log.info(
        "direct_llm_generated call_id=%s backend=%s model=%s latency_ms=%s",
        call_id,
        backend,
        llm_model,
        int((time.time() - started) * 1000),
    )

    message = ((data.get("choices") or [{}])[0].get("message") or {})
    content = message.get("content") or ""
    tool_calls: list[DirectToolCall] = []
    for tc in message.get("tool_calls") or []:
        fn = tc.get("function") or {}
        name = fn.get("name")
        if name:
            tool_calls.append(DirectToolCall(name=str(name), arguments=str(fn.get("arguments") or "{}")))

    if content:
        history.append({"role": "assistant", "content": content})
    elif tool_calls:
        history.append({"role": "assistant", "content": "", "tool_calls": message.get("tool_calls") or []})

    return str(content).strip(), tool_calls


async def _synthesize_tts_pcm16_8k(
    text: str,
    *,
    tts_provider: str,
    tts_model: str,
    tts_voice_id: str,
    tts_encoding: str,
    session: aiohttp.ClientSession,
    language: str,
    chunk_ms: Optional[int] = None,
) -> AsyncGenerator[bytes, None]:
    """Synthesize TTS and yield chunks as they arrive for true streaming (Phase 6.3)."""
    import audioop

    text = sanitize_text_for_tts(text)

    if tts_provider == "cartesia":
        cartesia_key = os.getenv("CARTESIA_API_KEY") or ""

        tts = cartesia.TTS(
            model=tts_model or "sonic-3.5",
            voice=tts_voice_id,
            language=language,
            api_key=cartesia_key,
            encoding="pcm_s16le",
            sample_rate=16000,
            http_session=session,
        )

        target_rate = 8000
        chunk_ms_eff = chunk_ms if chunk_ms is not None else int(os.getenv("MCUBE_TTS_CHUNK_MS", "200"))
        chunk_samples = int((chunk_ms_eff / 1000.0) * target_rate)
        chunk_bytes = chunk_samples * 2

        buffer = bytearray()
        resample_state = None

        async for synth_audio in tts.synthesize(text):
            frame = synth_audio.frame
            pcm16_from = frame.data.tobytes()
            from_rate = frame.sample_rate
            if from_rate != target_rate:
                converted, resample_state = audioop.ratecv(
                    pcm16_from, 2, 1, from_rate, target_rate, resample_state,
                )
                buffer.extend(converted)
            else:
                buffer.extend(pcm16_from)
            while len(buffer) >= chunk_bytes:
                chunk = bytes(buffer[:chunk_bytes])
                del buffer[:chunk_bytes]
                yield chunk

        if buffer:
            if len(buffer) < chunk_bytes:
                buffer.extend(b"\x00" * (chunk_bytes - len(buffer)))
            yield bytes(buffer[:chunk_bytes])

    else:
        elevenlabs_key = os.getenv("ELEVENLABS_API_KEY") or os.getenv("ELEVEN_API_KEY") or ""
        if not elevenlabs_key:
            raise RuntimeError("ELEVENLABS_API_KEY not set")

        try:
            stream_lat = int(os.getenv("MCUBE_ELEVENLABS_STREAMING_LATENCY", "2"))
        except ValueError:
            stream_lat = 2
        stream_lat = max(0, min(4, stream_lat))

        tts = elevenlabs.TTS(
            voice_id=tts_voice_id,
            model=tts_model,
            api_key=elevenlabs_key,
            encoding=tts_encoding or "pcm_16000",
            streaming_latency=stream_lat,
            http_session=session,
        )

        target_rate = 8000
        chunk_ms_eff = chunk_ms if chunk_ms is not None else int(os.getenv("MCUBE_TTS_CHUNK_MS", "200"))
        chunk_bytes = int((chunk_ms_eff / 1000.0) * target_rate) * 2
        buffer = bytearray()
        resample_state = None

        async for frame in tts.synthesize(text):
            # Handle both direct frame and wrapped SynthesizedAudio objects
            if hasattr(frame, 'frame'):
                pcm_data = frame.frame.data.tobytes()
            elif hasattr(frame, 'data'):
                pcm_data = frame.data.tobytes()
            else:
                # Try to get raw bytes directly
                pcm_data = bytes(frame)
            source_rate = 16000
            if hasattr(frame, "frame"):
                source_rate = getattr(frame.frame, "sample_rate", source_rate)
            elif hasattr(frame, "sample_rate"):
                source_rate = getattr(frame, "sample_rate", source_rate)
            if source_rate != target_rate:
                pcm_data, resample_state = audioop.ratecv(
                    pcm_data, 2, 1, int(source_rate), target_rate, resample_state
                )
            buffer.extend(pcm_data)
            while len(buffer) >= chunk_bytes:
                chunk = bytes(buffer[:chunk_bytes])
                del buffer[:chunk_bytes]
                yield chunk

        if buffer:
            if len(buffer) < chunk_bytes:
                buffer.extend(b"\x00" * (chunk_bytes - len(buffer)))
            yield bytes(buffer)


async def process_utterance(message) -> None:

    async with message.process(requeue=False):

        payload = json.loads(message.body.decode("utf-8"))



        call_id = str(payload["call_id"])

        sequence_id = int(payload.get("sequence_id", 0))

        user_text = payload.get("transcript") or ""

        bot_say_text = (payload.get("bot_say_text") or "").strip()

        rt = await _resolve_runtime_from_payload(payload)

        system_prompt = rt["system_prompt"]

        llm_model = rt["llm_model"]

        llm_provider = str(rt.get("llm_provider") or "")

        tts_model = rt["tts_model"]

        tts_provider = str(rt.get("tts_provider") or "elevenlabs")

        tts_voice_id = rt["tts_voice_id"]

        tts_encoding = rt["tts_encoding"]

        try:

            tts_chunk_ms = max(5, min(500, int(float(str(rt.get("tts_chunk_ms") or "200")))))

        except Exception:

            tts_chunk_ms = 200



        # Detect language: merge STT default with preference / script / Latin (see mcube_language.resolve_detected_language)

        stt_default = str(rt.get("stt_language_code") or "en").strip() or "en"

        detected_language = stt_default

        name_newly_learned = False

        if (user_text or "").strip():

            name_newly_learned = _maybe_capture_caller_name(call_id, user_text)

        ut_strip = (user_text or "").strip()
        lang_pref_in_ut = detect_language_preference(ut_strip) if ut_strip else None
        if lang_pref_in_ut:
            _caller_explicit_language[call_id] = lang_pref_in_ut

        if user_text and not bot_say_text:

            detected_language = resolve_detected_language(stt_default, user_text)

            if str(stt_default) != str(detected_language):
                log.info(
                    "language_detect_override call_id=%s seq=%s stt_default=%s resolved=%s text=%r",
                    call_id,
                    sequence_id,
                    stt_default,
                    detected_language,
                    user_text[:80],
                )

            log.info("language_detected call_id=%s seq=%s language=%s text=%r", call_id, sequence_id, detected_language, user_text[:50])



        # Augment system prompt with language instructions

        system_prompt_with_lang = augment_system_prompt_with_language(system_prompt, detected_language)

        caller_name = _caller_names.get(call_id)
        if caller_name:
            system_prompt_with_lang += (
                f"\n\nCALLER NAME: The caller introduced themselves as {caller_name}. "
                "Address them by name naturally when it fits the conversation."
            )

        if (
            caller_name
            and call_id not in _caller_language_locked
            and not _language_preference_prompt_sent.get(call_id)
            and not _caller_explicit_language.get(call_id)
        ):
            system_prompt_with_lang += (
                "\n\nLANGUAGE PREFERENCE (phone): You support English, Hindi, and Kannada. "
                "If the caller has shared their name but has not clearly chosen which language they prefer, "
                "ask once which they are comfortable with before a long sales pitch. "
                "If you already asked this call or they already chose, do not repeat."
            )



        start_t = time.time()

        log.info(

            "utterance_received call_id=%s seq=%s stt_text=%r llm_model=%s tts_provider=%s tts_model=%s tts_voice_id=%s language=%s",

            call_id,

            sequence_id,

            (bot_say_text or user_text or "")[:120],

            llm_model,

            tts_provider,

            tts_model,

            tts_voice_id,

            detected_language,

        )

        try:

            # If ws_bridge requested a direct bot message, skip LLM and just synthesize this text.

            if bot_say_text:

                response_text = bot_say_text

                response_text = prepare_text_for_phone_tts(response_text)

                # Phase 6.3: Create session for streaming TTS and publish chunks as they arrive
                connection_queue = tts_queue_name(call_id)
                channel = _PUBLISH_CHANNEL
                if channel is None:
                    raise RuntimeError("publish channel not initialized")

                async with aiohttp.ClientSession() as session:
                    chunks_generator = _synthesize_tts_pcm16_8k(
                        response_text,
                        tts_provider=tts_provider,
                        tts_model=tts_model,
                        tts_voice_id=tts_voice_id,
                        tts_encoding=tts_encoding,
                        session=session,
                        chunk_ms=tts_chunk_ms,
                        language=detected_language,
                    )
                    # Phase 6.3: Publish chunks immediately as they arrive (true streaming)
                    chunk_seq = 0
                    async for pcm16_8k_bytes in chunks_generator:
                        await publish_json(
                            channel=channel,
                            queue_name=connection_queue,
                            payload={
                                "call_id": call_id,
                                "sequence_id": sequence_id,
                                "chunk_seq": chunk_seq,
                                "audio_pcm_b64": _pcm16_to_b64(pcm16_8k_bytes),
                                "type": "tts_audio_chunk",
                            },
                        )
                        chunk_seq += 1

                    # Publish end-of-tts marker
                    await publish_json(
                        channel=channel,
                        queue_name=connection_queue,
                        payload={
                            "call_id": call_id,
                            "sequence_id": sequence_id,
                            "type": "tts_end",
                        },
                    )
                    # Skip the publishing loop below
                    chunks_8k = None

            else:

                # Tool schemas (the model can emit these as structured tool calls).

                end_call_schema = {

                    "type": "function",

                    "name": "end_call",

                    "description": "Terminate the current phone call immediately.",

                    "parameters": {

                        "type": "object",

                        "properties": {},

                        "required": [],

                        "additionalProperties": False,

                    },

                }



                transfer_to_number_schema = {

                    "type": "function",

                    "name": "transfer_to_number",

                    "description": "Transfer the call to another phone number.",

                    "parameters": {

                        "type": "object",

                        "properties": {

                            "phone_number": {

                                "type": "string",

                                "description": "Target phone number in E.164 format (preferred).",

                            },

                            "transfer_number": {

                                "type": "string",

                                "description": "Alias for phone_number.",

                            },

                            "sip_uri": {

                                "type": "string",

                                "description": "Optional SIP URI like sip:+918...@host.",

                            },

                        },

                        "additionalProperties": False,

                    },

                }



                tools = [

                    end_call_schema,

                    transfer_to_number_schema,

                ]



                tool_instructions = (

                    "\n\nTool-use rules for MCube phone calls:\n"

                    "- Call `end_call` to terminate the current call.\n"

                    "- Call `transfer_to_number` to transfer; provide phone_number or sip_uri.\n"

                    "- When calling a tool, do not include additional spoken content; the caller side will act on the tool."

                )

                system_prompt_with_tools = f"{system_prompt_with_lang}{tool_instructions}"

                response_text = _fast_response_for_user_text(
                    user_text,
                    call_id,
                    name_newly_learned=name_newly_learned,
                )
                tool_calls: list[DirectToolCall] = []

                if response_text:
                    log.info("fast_response_generated call_id=%s seq=%s language=%s", call_id, sequence_id, detected_language)
                else:
                    llm_backend = os.getenv("MCUBE_LLM_BACKEND", "together").strip().lower() or "together"
                    if llm_backend not in ("openai", "together"):
                        raise RuntimeError(
                            f"Unsupported direct MCube LLM backend {llm_backend!r}; use MCUBE_LLM_BACKEND=openai or together"
                        )

                    response_text, tool_calls = await _chat_completions_turn(
                        call_id=call_id,
                        system_prompt=system_prompt_with_tools,
                        llm_model=llm_model,
                        user_text=user_text,
                        backend=llm_backend,
                        tools=tools,
                    )

                text_parts = [response_text] if response_text else []



                # If the model requested a control action, publish it and skip TTS.

                if tool_calls:

                    control_queue = control_queue_name(call_id)

                    channel = _PUBLISH_CHANNEL

                    if channel is None:

                        raise RuntimeError("publish channel not initialized")



                    for tc in tool_calls:

                        try:

                            tool_args = json.loads(tc.arguments or "{}")

                        except Exception:

                            tool_args = {}



                        if tc.name == "end_call":

                            await publish_json(

                                channel=channel,

                                queue_name=control_queue,

                                payload={

                                    "type": "mcube_terminate",

                                    "call_id": call_id,

                                    "sequence_id": sequence_id,

                                },

                            )

                            return



                        if tc.name == "transfer_to_number":

                            transfer_to = (

                                tool_args.get("phone_number")

                                or tool_args.get("transfer_number")

                                or tool_args.get("sip_uri")

                            )

                            await publish_json(

                                channel=channel,

                                queue_name=control_queue,

                                payload={

                                    "type": "mcube_transfer",

                                    "call_id": call_id,

                                    "sequence_id": sequence_id,

                                    "transfer_to": transfer_to,

                                },

                            )

                            return



                response_text = "".join(text_parts).strip()

                if not response_text:

                    response_text = "Okay."

                response_text = prepare_text_for_phone_tts(response_text)

                

                # Add assistant response to conversation context

                if call_id in _call_contexts:

                    _call_contexts[call_id].add_message(role="assistant", content=response_text)



                # Phase 6.3: Create session for streaming TTS and publish chunks as they arrive
                connection_queue = tts_queue_name(call_id)
                channel = _PUBLISH_CHANNEL
                if channel is None:
                    raise RuntimeError("publish channel not initialized")

                async with aiohttp.ClientSession() as session:
                    chunks_generator = _synthesize_tts_pcm16_8k(
                        response_text,
                        tts_provider=tts_provider,
                        tts_model=tts_model,
                        tts_voice_id=tts_voice_id,
                        tts_encoding=tts_encoding,
                        session=session,
                        chunk_ms=tts_chunk_ms,
                        language=detected_language,
                    )
                    # Phase 6.3: Publish chunks immediately as they arrive (true streaming)
                    chunk_seq = 0
                    async for pcm16_8k_bytes in chunks_generator:
                        await publish_json(
                            channel=channel,
                            queue_name=connection_queue,
                            payload={
                                "call_id": call_id,
                                "sequence_id": sequence_id,
                                "chunk_seq": chunk_seq,
                                "audio_pcm_b64": _pcm16_to_b64(pcm16_8k_bytes),
                                "type": "tts_audio_chunk",
                            },
                        )
                        chunk_seq += 1

                    # Publish end-of-tts marker
                    await publish_json(
                        channel=channel,
                        queue_name=connection_queue,
                        payload={
                            "call_id": call_id,
                            "sequence_id": sequence_id,
                            "type": "tts_end",
                        },
                    )
                    # Skip the publishing loop below
                    chunks_8k = None

        except Exception as e:
            log.error("utterance_processing_failed call_id=%s seq=%s error=%s", call_id, sequence_id, str(e))
            raise



        # Publish chunks (one per queue message) so WS can stream playback.
        # Phase 6.3: Skip if chunks_8k is None (streaming already done)
        if chunks_8k is not None:
            connection_queue = tts_queue_name(call_id)



            # Publish all chunks in order.

            # WS bridge assumes chunk_seq increases and plays sequentially.

            channel = _PUBLISH_CHANNEL

            if channel is None:

                raise RuntimeError("publish channel not initialized")

            # The ws_bridge declares/consumes this queue; we can publish without declaring here.

            for chunk_seq, pcm16_8k_bytes in enumerate(chunks_8k):

                await publish_json(

                    channel=channel,

                    queue_name=connection_queue,

                    payload={

                        "call_id": call_id,

                        "sequence_id": sequence_id,

                        "chunk_seq": chunk_seq,

                        "audio_pcm_b64": _pcm16_to_b64(pcm16_8k_bytes),

                        "type": "tts_audio_chunk",

                    },

                )



            # Publish a final marker for end-of-tts.
            await publish_json(

                channel=channel,

                queue_name=connection_queue,

                payload={

                    "call_id": call_id,

                    "sequence_id": sequence_id,

                    "type": "tts_done",

                },

            )

            log.info(

                "tts_published call_id=%s seq=%s chunks=%s tts_generation_ms=%s",

                call_id,

                sequence_id,

                len(chunks_8k),

                int((time.time() - start_t) * 1000),

            )

        # Phase 6.3: Log for streaming case (chunks_8k is None)
        if chunks_8k is None:
            log.info(
                "tts_published_streaming call_id=%s seq=%s tts_generation_ms=%s",
                call_id,
                sequence_id,
                int((time.time() - start_t) * 1000),
            )





async def main() -> None:

    logging.basicConfig(level=logging.INFO)



    connection = await connect()

    channel = await get_channel(connection)

    global _PUBLISH_CHANNEL

    _PUBLISH_CHANNEL = channel



    await declare_durable_queue(channel, AI_UTTERANCES_QUEUE)



    log.info("ai_worker: consuming %s on %s", AI_UTTERANCES_QUEUE, RABBITMQ_URL)



    queue = await channel.declare_queue(AI_UTTERANCES_QUEUE, durable=True)

    await queue.consume(process_utterance, no_ack=False)



    await asyncio.Future()  # run forever





if __name__ == "__main__":

    asyncio.run(main())

# import asyncio
# import base64
# import json
# import logging
# import os
# import sys
# import time
# from dataclasses import dataclass
# from typing import Any, AsyncIterator, Literal, Optional, TypedDict
# import types
# import re
# import difflib

# import aiohttp
# import httpx
# import openai
# import redis.asyncio as redis_async


# def _pcm16_le_aligned(b: bytes) -> bytes:
#     """Drop trailing odd byte so length is a multiple of 2 (int16 mono)."""
#     if not b:
#         return b
#     n = len(b) - (len(b) % 2)
#     return bytes(b[:n])


# def _install_livekit_blingfire_stub() -> None:
#     """
#     Work around Windows WDAC/App Control blocking `lk_blingfire` native extension.
#     `livekit.agents` imports `livekit.agents.tokenize.blingfire` unconditionally; we stub it.
#     """

#     module_name = "livekit.agents.tokenize.blingfire"
#     if module_name in sys.modules:
#         return

#     class SentenceTokenizer:  # minimal compatibility surface
#         def __init__(
#             self,
#             *,
#             min_sentence_len: int = 20,
#             stream_context_len: int = 10,
#             retain_format: bool = False,
#         ) -> None:
#             self._min_sentence_len = int(min_sentence_len)

#         def tokenize(self, text: str, *, language: str | None = None) -> list[str]:
#             parts = [p.strip() for p in re.split(r"(?<=[.!?])\s+", text or "") if p.strip()]
#             return [p for p in parts if len(p) >= self._min_sentence_len]

#         def stream(self, *, language: str | None = None):
#             raise NotImplementedError("Streaming tokenizer not supported in fallback stub")

#     stub = types.ModuleType(module_name)
#     stub.__dict__.update({"__all__": ["SentenceTokenizer"], "SentenceTokenizer": SentenceTokenizer})
#     sys.modules[module_name] = stub


# _install_livekit_blingfire_stub()

# from livekit.agents import inference, function_tool
# from livekit.agents.llm.chat_context import ChatContext
# from livekit.plugins import cartesia, deepgram, elevenlabs

# from .env_load import load_agent_runtime_dotenv
# from .business_id_agents import get_runtime_overrides_from_agents_table
# from .mcube_defaults import apply_voice_defaults_to_dict, get_default_mcube_call_config
# from .mq import (
#     AI_UTTERANCES_QUEUE,
#     RABBITMQ_URL,
#     control_queue_name,
#     declare_durable_queue,
#     connect,
#     get_channel,
#     publish_json,
#     tts_queue_name,
# )
# from .pipeline_log import append_event
# from .service_log import configure_mcube_file_logging

# log = logging.getLogger("mcube.ai_worker")

# load_agent_runtime_dotenv()


# def _configure_stdio_utf8() -> None:
#     """Windows cp1252 breaks on arrows/symbols in operator lines and aborts utterance handling."""
#     for stream in (sys.stdout, sys.stderr):
#         try:
#             if hasattr(stream, "reconfigure"):
#                 stream.reconfigure(encoding="utf-8", errors="replace")
#         except Exception:
#             pass


# _configure_stdio_utf8()


# def _safe_terminal_print(line: str) -> None:
#     try:
#         print(line, flush=True)
#     except UnicodeEncodeError:
#         enc = getattr(sys.stdout, "encoding", None) or "ascii"
#         try:
#             print((line or "").encode(enc, errors="replace").decode(enc), flush=True)
#         except Exception:
#             print((line or "").encode("ascii", errors="replace").decode("ascii"), flush=True)


# def _terminal_ai(msg: str) -> None:
#     """Match ws_bridge: same env disables stdout (MCUBE_SPEAKING_STDOUT=0)."""
#     if os.getenv("MCUBE_SPEAKING_STDOUT", "1").strip().lower() in ("0", "false", "no"):
#         return
#     _safe_terminal_print(f"[mcube-ai {time.strftime('%H:%M:%S')}] {msg}")


# def _terminal_ai_bot_text(call_id: str, sequence_id: int, text: str, *, source: str) -> None:
#     """Two-line BOT transcript so terminals match ws_bridge USER lines (grep mcube-ai / mcube)."""
#     if os.getenv("MCUBE_SPEAKING_STDOUT", "1").strip().lower() in ("0", "false", "no"):
#         return
#     try:
#         m = int(os.getenv("MCUBE_TERMINAL_TEXT_MAX", "500"))
#     except Exception:
#         m = 500
#     m = max(40, min(8000, m))
#     ts = time.strftime("%H:%M:%S")
#     clip = (text or "").replace("\r\n", "\n").strip()
#     if len(clip) > m:
#         clip = clip[:m] + " ...(truncated)"
#     _safe_terminal_print(f"[mcube-ai {ts}] BOT > call={call_id} seq={sequence_id} ({source})")
#     _safe_terminal_print(f"[mcube-ai {ts}]      {clip}")

# REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# _MCUBE_CHAT_MAX_MESSAGES = max(4, int(os.getenv("MCUBE_CHAT_MAX_MESSAGES", "40")))
# _REDIS_CLIENT: redis_async.Redis | None = None

# _PUBLISH_CHANNEL = None  # set in main(); used by process_utterance

# # Optional Async OpenAI SDK (separate from httpx Chat Completions path in `_chat_completions_turn`).
# _openai_client: Optional[Any] = None


# def get_openai_client() -> Any:
#     global _openai_client
#     if _openai_client is None:
#         api_key = os.getenv("OPENAI_API_KEY")
#         if not api_key:
#             raise RuntimeError("OPENAI_API_KEY not set")
#         _openai_client = openai.AsyncOpenAI(api_key=api_key)
#     return _openai_client


# def detect_language(text: str) -> str:
#     """
#     Detect if text is Hindi/Hinglish or English.
#     Returns: 'hi' for Hindi/Hinglish, 'en' for English.
#     """
#     if not text:
#         return "en"

#     hindi_chars = sum(1 for char in text if "\u0900" <= char <= "\u097F")
#     if hindi_chars > 0:
#         return "hi"

#     hinglish_words = [
#         "namaste",
#         "kaise",
#         "ho",
#         "aap",
#         "main",
#         "hai",
#         "kya",
#         "kar",
#         "raha",
#         "hai",
#         "nahi",
#         "please",
#         "thank",
#         "you",
#         "ji",
#         "bhai",
#         "yaar",
#         "dost",
#         "acha",
#         "theek",
#         "hai",
#         "sahi",
#         "galat",
#         "batao",
#         "samajh",
#         "nahi",
#         "kripya",
#         "karein",
#         "dhanyawad",
#         "shukriya",
#         "haan",
#         "na",
#         "bolo",
#         "hindi",
#         "hinglish",
#     ]

#     text_lower = text.lower()
#     hinglish_count = sum(1 for word in hinglish_words if word in text_lower.split())

#     if hinglish_count >= 1:
#         return "hi"

#     return "en"


# def augment_system_prompt_with_language(system_prompt: str, language: str) -> str:
#     """Add language-specific instructions to the system prompt based on detected language."""
#     if language == "hi":
#         language_instruction = (
#             "\n\nLANGUAGE INSTRUCTION: Respond in Hindi or Hinglish (Hindi written in English script). "
#             "Use natural conversational Hindi as spoken in North India. Be warm, polite, and use "
#             "appropriate honorifics like 'aap', 'ji', etc."
#         )
#     else:
#         language_instruction = (
#             "\n\nLANGUAGE INSTRUCTION: Respond in English. Use clear, professional, and natural conversational English."
#         )

#     return system_prompt + language_instruction


# def _chat_history_enabled() -> bool:
#     return os.getenv("MCUBE_CHAT_HISTORY_DISABLE", "").strip().lower() not in ("1", "true", "yes")


# def _chat_redis_key(call_id: str) -> str:
#     return f"mcube:chat:{call_id}"


# async def _get_redis() -> redis_async.Redis:
#     global _REDIS_CLIENT
#     if _REDIS_CLIENT is None:
#         _REDIS_CLIENT = redis_async.from_url(REDIS_URL, decode_responses=True)
#     return _REDIS_CLIENT


# class _Msg(TypedDict):
#     role: Literal["user", "assistant"]
#     content: str


# async def _load_chat_messages(call_id: str) -> list[_Msg]:
#     if not _chat_history_enabled():
#         return []
#     try:
#         r = await _get_redis()
#         raw = await r.get(_chat_redis_key(call_id))
#         if not raw:
#             return []
#         data = json.loads(raw)
#         if not isinstance(data, list):
#             return []
#         out: list[_Msg] = []
#         for x in data:
#             if not isinstance(x, dict):
#                 continue
#             role = x.get("role")
#             content = (x.get("content") or "").strip()
#             if role not in ("user", "assistant") or not content:
#                 continue
#             out.append({"role": role, "content": content})
#         return out
#     except Exception:
#         log.warning("chat_history_load_failed call_id=%s", call_id, exc_info=True)
#         return []


# async def _save_chat_messages(call_id: str, messages: list[_Msg]) -> None:
#     if not _chat_history_enabled():
#         return
#     if len(messages) > _MCUBE_CHAT_MAX_MESSAGES:
#         messages = messages[-_MCUBE_CHAT_MAX_MESSAGES :]
#     try:
#         r = await _get_redis()
#         await r.set(
#             _chat_redis_key(call_id),
#             json.dumps(messages, ensure_ascii=False),
#             ex=3600 * 6,
#         )
#     except Exception:
#         log.warning("chat_history_save_failed call_id=%s", call_id, exc_info=True)


# async def _append_chat_exchange(call_id: str, user_line: str, assistant_line: str) -> None:
#     if not _chat_history_enabled():
#         return
#     msgs: list[_Msg] = await _load_chat_messages(call_id)
#     u = (user_line or "").strip()
#     a = (assistant_line or "").strip()
#     if u:
#         msgs.append({"role": "user", "content": u})
#     if a:
#         msgs.append({"role": "assistant", "content": a})
#     await _save_chat_messages(call_id, msgs)


# async def _append_assistant_only(call_id: str, assistant_line: str) -> None:
#     if not _chat_history_enabled():
#         return
#     a = (assistant_line or "").strip()
#     if not a:
#         return
#     msgs: list[_Msg] = await _load_chat_messages(call_id)
#     msgs.append({"role": "assistant", "content": a})
#     await _save_chat_messages(call_id, msgs)


# def _last_assistant_from_turns(turns: list[_Msg]) -> str:
#     for m in reversed(turns):
#         if m["role"] == "assistant":
#             return (m["content"] or "").strip()
#     return ""


# def _is_near_duplicate_assistant_reply(new: str, previous: str) -> bool:
#     """Detect mid-call repetition loops (same line TTS again)."""
#     if not new or not previous:
#         return False
#     try:
#         threshold = float(os.getenv("MCUBE_REPLY_DUPLICATE_SIMILARITY", "0.86"))
#     except Exception:
#         threshold = 0.86
#     threshold = max(0.7, min(0.98, threshold))
#     a = re.sub(r"\s+", " ", new.lower().strip())
#     b = re.sub(r"\s+", " ", previous.lower().strip())
#     if a == b:
#         return True
#     if len(a) >= 24 and (a in b or b in a):
#         return True
#     return difflib.SequenceMatcher(None, a, b).ratio() >= threshold


# async def _publish_tts_done(
#     call_id: str, sequence_id: int, *, final_chunk_seq: int, started_at: float
# ) -> None:
#     """Notify ws_bridge that TTS is finished (required to clear `response_pending`)."""
#     ch = _PUBLISH_CHANNEL
#     if ch is None:
#         return
#     await publish_json(
#         channel=ch,
#         queue_name=tts_queue_name(call_id),
#         payload={
#             "call_id": call_id,
#             "sequence_id": sequence_id,
#             "type": "tts_done",
#             "final_chunk_seq": final_chunk_seq,
#             "tts_generation_ms": int((time.time() - started_at) * 1000),
#         },
#     )


# def _env_fallback_runtime() -> dict[str, str]:
#     """Fallback only when DB + queue payload do not supply values (see process_utterance)."""
#     d = get_default_mcube_call_config()
#     return {
#         "system_prompt": d["system_prompt"],
#         "llm_model": d["llm_model"],
#         "llm_provider": d["llm_provider"],
#         "tts_model": d["tts_model"],
#         "tts_provider": d.get("tts_provider") or "elevenlabs",
#         "tts_voice_id": d["tts_voice_id"],
#         "tts_encoding": d["tts_encoding"],
#         "tts_chunk_ms": d.get("tts_chunk_ms") or "200",
#         "stt_language_code": d.get("stt_language_code") or "en",
#         "stt_model_id": d.get("stt_model_id") or "",
#         "stt_provider": d.get("stt_provider") or "elevenlabs",
#     }


# async def _resolve_runtime_from_payload(payload: dict[str, Any]) -> dict[str, str]:
#     """
#     Merge precedence (later wins):
#     1) `.env` defaults (MCUBE_* via mcube_defaults)
#     2) `business_id_agents` row (+ JSON config) when `business_id` is present
#     3) Explicit fields on the RabbitMQ payload (Redis call config / ws_bridge)
#     """
#     base = _env_fallback_runtime()

#     skip_agents_db = os.getenv("MCUBE_SKIP_AGENTS_DB_LOOKUP", "").strip().lower() in ("1", "true", "yes")
#     bid = payload.get("business_id")
#     if not skip_agents_db and bid is not None:
#         db_ov = await asyncio.to_thread(
#             get_runtime_overrides_from_agents_table,
#             bid,
#             agent_id=payload.get("agent_id"),
#             user_id=payload.get("user_id"),
#             email=payload.get("agent_email") or payload.get("email"),
#             name=payload.get("agent_name") or payload.get("name"),
#         )
#         base.update(db_ov)

#     override_keys = (
#         "system_prompt",
#         "llm_model",
#         "llm_provider",
#         "tts_model",
#         "tts_provider",
#         "tts_voice_id",
#         "tts_encoding",
#         "tts_chunk_ms",
#         "stt_language_code",
#         "stt_model_id",
#         "stt_provider",
#     )
#     out = dict(base)
#     for k in override_keys:
#         if k not in payload:
#             continue
#         v = payload.get(k)
#         if v is None:
#             continue
#         if isinstance(v, str) and k != "llm_provider" and not v.strip():
#             continue
#         if k == "system_prompt" and isinstance(v, str):
#             out[k] = v.replace("\\n", "\n")
#         else:
#             out[k] = str(v).strip() if isinstance(v, str) else v

#     out.setdefault("tts_provider", "elevenlabs")
#     out.setdefault("llm_model", base.get("llm_model", ""))
#     out.setdefault("tts_model", base.get("tts_model", ""))
#     out.setdefault("tts_voice_id", base.get("tts_voice_id", ""))
#     out.setdefault("tts_encoding", base.get("tts_encoding", ""))
#     out.setdefault("system_prompt", base.get("system_prompt", ""))
#     out.setdefault("tts_chunk_ms", base.get("tts_chunk_ms", "200"))
#     out.setdefault("stt_language_code", base.get("stt_language_code", "en"))
#     out.setdefault("stt_model_id", base.get("stt_model_id", ""))
#     out.setdefault("stt_provider", base.get("stt_provider", "elevenlabs"))

#     apply_voice_defaults_to_dict(out)

#     # Force Chat Completions model id from env (ignores Redis/payload llm_model). Use when testing Together/OpenAI
#     # without stale campaign/bot rows sending openai/gpt-4o-mini.
#     if os.getenv("MCUBE_LLM_MODEL_FROM_ENV", "").strip().lower() in ("1", "true", "yes"):
#         em = (os.getenv("MCUBE_LLM_MODEL") or "").strip()
#         if em:
#             out["llm_model"] = em
#     return out


# @dataclass(frozen=True)
# class PublishTtsChunk:
#     call_id: str
#     sequence_id: int
#     chunk_seq: int
#     pcm16_8k_bytes: bytes


# def _pcm16_to_b64(pcm16_bytes: bytes) -> str:
#     return base64.b64encode(pcm16_bytes).decode("ascii")


# def _resample_pcm16(pcm16_bytes: bytes, *, from_rate: int, to_rate: int, state: object) -> tuple[bytes, object]:
#     """
#     audioop.ratecv works on raw int16 PCM.
#     Returns (converted_bytes, new_state).
#     """
#     import audioop

#     converted, new_state = audioop.ratecv(
#         pcm16_bytes,
#         2,  # width bytes for int16
#         1,  # channels
#         from_rate,
#         to_rate,
#         state,
#     )
#     return converted, new_state


# async def _run_llm(system_prompt: str, llm_model: str, user_text: str) -> str:
#     llm = inference.LLM(model=llm_model, max_tokens=120)
#     chat_ctx = ChatContext.empty()
#     chat_ctx.add_message(role="system", content=system_prompt)
#     chat_ctx.add_message(role="user", content=user_text)
#     stream = llm.chat(chat_ctx=chat_ctx)
#     text = []
#     async for token in stream.to_str_iterable():
#         text.append(token)
#     return "".join(text).strip()


# @dataclass
# class _ToolCallLite:
#     """Minimal tool-call shape for OpenAI direct + downstream (name, arguments JSON str)."""

#     name: str
#     arguments: str


# def _mcube_llm_backend() -> str:
#     """livekit (default) | openai | together — see docs/direct-llm-instead-of-livekit-inference.md"""
#     return os.getenv("MCUBE_LLM_BACKEND", "livekit").strip().lower()


# def _openai_api_model_id(llm_model: str) -> str:
#     """Strip LiveKit-style `openai/` prefix for the OpenAI REST model field."""
#     m = (llm_model or "").strip()
#     if m.lower().startswith("openai/"):
#         return m.split("/", 1)[1].strip() or "gpt-4o-mini"
#     return m or "gpt-4o-mini"


# def _rest_chat_model_id(provider: str, llm_model: str) -> str:
#     """Model id for Chat Completions URL (OpenAI or Together)."""
#     m = (llm_model or "").strip()
#     p = (provider or "").strip().lower()
#     if p == "openai":
#         return _openai_api_model_id(llm_model)
#     if p == "together":
#         if m.lower().startswith("together/"):
#             return m.split("/", 1)[1].strip()
#         if m:
#             return m
#         return (
#             os.getenv("MCUBE_TOGETHER_MODEL") or "meta-llama/Llama-3.3-70B-Instruct-Turbo"
#         ).strip()
#     return m or "gpt-4o-mini"


# def _direct_openai_max_tokens() -> int:
#     """
#     Chat Completions completion budget for PSTN (shorter = typically lower latency).
#     ``MCUBE_DIRECT_OPENAI_MAX_TOKENS`` is clamped to 110–150 inclusive.
#     """
#     try:
#         n = int(os.getenv("MCUBE_DIRECT_OPENAI_MAX_TOKENS", "130"))
#     except Exception:
#         n = 130
#     return max(110, min(150, n))


# def _direct_openai_latency_target_ms() -> int:
#     """Warn when HTTP latency exceeds this (default 500); cannot enforce remote OpenAI speed."""
#     try:
#         return max(100, min(8000, int(os.getenv("MCUBE_DIRECT_OPENAI_LATENCY_TARGET_MS", "500"))))
#     except Exception:
#         return 500


# def _openai_chat_tools_payload() -> list[dict[str, Any]]:
#     return [
#         {
#             "type": "function",
#             "function": {
#                 "name": "end_call",
#                 "description": "Terminate the current phone call immediately.",
#                 "parameters": {"type": "object", "properties": {}, "required": []},
#             },
#         },
#         {
#             "type": "function",
#             "function": {
#                 "name": "transfer_to_number",
#                 "description": "Transfer the call to another phone number.",
#                 "parameters": {
#                     "type": "object",
#                     "properties": {
#                         "phone_number": {"type": "string"},
#                         "transfer_number": {"type": "string"},
#                         "sip_uri": {"type": "string"},
#                     },
#                 },
#             },
#         },
#     ]


# async def _chat_completions_turn(
#     *,
#     provider: str,
#     sys_body: str,
#     prior_turns: list[_Msg],
#     user_text: str,
#     llm_model: str,
# ) -> tuple[list[_ToolCallLite], str, int]:
#     """
#     One non-streaming OpenAI-compatible Chat Completions call (OpenAI or Together AI).

#     Returns ``llm_http_ms``: round-trip time for the HTTP request/response only (no TTS).
#     """
#     prov = (provider or "").strip().lower()
#     if prov == "openai":
#         api_key = (os.getenv("OPENAI_API_KEY") or "").strip().strip('"').strip("'")
#         if not api_key:
#             raise RuntimeError("OPENAI_API_KEY is not set (required when MCUBE_LLM_BACKEND=openai)")
#         base = (os.getenv("MCUBE_DIRECT_OPENAI_BASE_URL") or "https://api.openai.com/v1").rstrip("/")
#     elif prov == "together":
#         api_key = (os.getenv("TOGETHER_API_KEY") or "").strip().strip('"').strip("'")
#         if not api_key:
#             raise RuntimeError("TOGETHER_API_KEY is not set (required when MCUBE_LLM_BACKEND=together)")
#         base = (os.getenv("MCUBE_TOGETHER_BASE_URL") or "https://api.together.xyz/v1").rstrip("/")
#     else:
#         raise RuntimeError(f"unsupported Chat Completions provider: {provider!r}")

#     url = f"{base}/chat/completions"
#     try:
#         timeout_raw = os.getenv("MCUBE_CHAT_COMPLETIONS_TIMEOUT_S") or os.getenv(
#             "MCUBE_DIRECT_OPENAI_TIMEOUT_S", "90"
#         )
#         timeout_s = float(timeout_raw)
#     except Exception:
#         timeout_s = 90.0
#     timeout_s = max(15.0, min(300.0, timeout_s))

#     messages: list[dict[str, str]] = [{"role": "system", "content": sys_body}]
#     for m in prior_turns:
#         messages.append({"role": m["role"], "content": m["content"]})
#     messages.append({"role": "user", "content": user_text})

#     max_out = _direct_openai_max_tokens()
#     model_id = _rest_chat_model_id(prov, llm_model)
#     payload_base: dict[str, Any] = {
#         "model": model_id,
#         "messages": messages,
#         "max_tokens": max_out,
#     }
#     use_tools = True
#     if prov == "together" and os.getenv("MCUBE_TOGETHER_DISABLE_TOOLS", "").strip().lower() in (
#         "1",
#         "true",
#         "yes",
#     ):
#         use_tools = False

#     log.info(
#         "chat_completions request provider=%s model_id=%s url=%s max_tokens=%s use_tools=%s",
#         prov,
#         model_id,
#         url,
#         max_out,
#         use_tools,
#     )

#     hdrs = {
#         "Authorization": f"Bearer {api_key}",
#         "Content-Type": "application/json",
#         # Some CDNs block bare httpx; Chat Completions still succeed with an explicit UA.
#         "User-Agent": "mcube-ai-worker/1.0",
#     }

#     async def _post_payload(body: dict[str, Any]) -> tuple[httpx.Response, str, int]:
#         t_llm0 = time.perf_counter()
#         async with httpx.AsyncClient(timeout=httpx.Timeout(timeout_s)) as client:
#             resp = await client.post(url, headers=hdrs, json=body)
#             raw = resp.text
#         llm_ms = int(max(0.0, (time.perf_counter() - t_llm0)) * 1000)
#         return resp, raw, llm_ms

#     payload = dict(payload_base)
#     if use_tools:
#         payload["tools"] = _openai_chat_tools_payload()
#         payload["tool_choice"] = "auto"

#     resp, raw, llm_http_ms = await _post_payload(payload)

#     # Together (and some OpenAI-compatible hosts) return 4xx when `tools` are unsupported for a model.
#     if resp.status_code != 200 and use_tools and prov in ("together", "openai"):
#         log.warning(
#             "chat_completions_http first_try provider=%s status=%s model=%s — retrying without tools. body=%r",
#             prov,
#             resp.status_code,
#             model_id,
#             raw[:1200],
#         )
#         append_event(
#             "chat_completions_retry_no_tools",
#             provider=prov,
#             status=resp.status_code,
#             model=model_id,
#             body_head=raw[:500],
#         )
#         resp, raw, llm_http_ms = await _post_payload(dict(payload_base))

#     if resp.status_code != 200:
#         head = raw[:1500]
#         raise RuntimeError(f"chat_completions_http provider={prov} status={resp.status_code} body_head={head!r}")

#     try:
#         data = json.loads(raw)
#     except Exception as e:
#         raise RuntimeError(f"chat_completions_invalid_json provider={prov}: {e}") from e

#     choices = data.get("choices") if isinstance(data, dict) else None
#     if not isinstance(choices, list) or not choices:
#         raise RuntimeError(f"chat_completions_no_choices provider={prov}")

#     msg0 = choices[0].get("message") if isinstance(choices[0], dict) else None
#     if not isinstance(msg0, dict):
#         raise RuntimeError(f"chat_completions_no_message provider={prov}")

#     tcalls_raw = msg0.get("tool_calls")
#     out_tools: list[_ToolCallLite] = []
#     if isinstance(tcalls_raw, list):
#         for tc in tcalls_raw:
#             if not isinstance(tc, dict):
#                 continue
#             fn = tc.get("function")
#             if not isinstance(fn, dict):
#                 continue
#             name = str(fn.get("name") or "").strip()
#             if not name:
#                 continue
#             args = fn.get("arguments")
#             if isinstance(args, str):
#                 arg_str = args
#             else:
#                 arg_str = json.dumps(args or {})
#             out_tools.append(_ToolCallLite(name=name, arguments=arg_str or "{}"))

#     if out_tools:
#         return out_tools, "", llm_http_ms

#     content = msg0.get("content")
#     text = (content if isinstance(content, str) else "") or ""
#     return [], (text.strip() or "Okay."), llm_http_ms


# async def _iter_cartesia_pcm16_8k_chunks(
#     text: str,
#     *,
#     tts_model: str,
#     tts_voice_id: str,
#     chunk_ms: int,
#     session: aiohttp.ClientSession,
#     language: str = "en",
# ) -> AsyncIterator[bytes]:
#     """Stream Cartesia TTS as PCM16 mono @ 8 kHz chunks (same framing as batch path)."""
#     import audioop

#     cartesia_key = os.getenv("CARTESIA_API_KEY") or ""
#     if not cartesia_key.strip():
#         raise RuntimeError("CARTESIA_API_KEY is not set (required for Cartesia TTS)")
#     try:
#         cart_vol = float(os.getenv("MCUBE_CARTESIA_TTS_VOLUME", "1.25"))
#     except Exception:
#         cart_vol = 1.25
#     cart_vol = max(0.5, min(2.0, cart_vol))
#     tts = cartesia.TTS(
#         model=tts_model or "sonic-2",
#         voice=tts_voice_id,
#         language=language or "en",
#         api_key=cartesia_key,
#         encoding="pcm_s16le",
#         sample_rate=24000,
#         volume=cart_vol,
#         http_session=session,
#     )
#     target_rate = 8000
#     chunk_samples = int((chunk_ms / 1000.0) * target_rate)
#     chunk_bytes = max(2, chunk_samples * 2)
#     buffer = bytearray()
#     resample_state = None
#     async for synth_audio in tts.synthesize(text):
#         frame = synth_audio.frame
#         pcm16_from = frame.data.tobytes()
#         from_rate = frame.sample_rate
#         converted, resample_state = audioop.ratecv(
#             pcm16_from,
#             2,
#             1,
#             from_rate,
#             target_rate,
#             resample_state,
#         )
#         buffer.extend(converted)
#         while len(buffer) >= chunk_bytes:
#             chunk = bytes(buffer[:chunk_bytes])
#             del buffer[:chunk_bytes]
#             yield chunk
#     if buffer:
#         tail = _pcm16_le_aligned(bytes(buffer))
#         if tail:
#             yield tail


# async def _publish_cartesia_pcm_stream_to_queue(
#     *,
#     call_id: str,
#     sequence_id: int,
#     response_text: str,
#     chunk_ms: int,
#     tts_model: str,
#     tts_voice_id: str,
#     mark_greeting: bool,
#     start_t: float,
#     language: str = "en",
# ) -> int:
#     """Stream Cartesia PCM chunks to the per-call TTS queue (lower latency than batching full utterance)."""
#     channel = _PUBLISH_CHANNEL
#     if channel is None:
#         raise RuntimeError("publish channel not initialized")
#     connection_queue = tts_queue_name(call_id)
#     session = aiohttp.ClientSession()
#     try:
#         n = 0
#         t_first: float | None = None
#         async for pcm16_8k_bytes in _iter_cartesia_pcm16_8k_chunks(
#             response_text,
#             tts_model=tts_model,
#             tts_voice_id=tts_voice_id,
#             chunk_ms=chunk_ms,
#             session=session,
#             language=language,
#         ):
#             if t_first is None:
#                 t_first = time.time()
#                 log.info(
#                     "cartesia_first_chunk call_id=%s seq=%s greeting=%s lead_ms=%.0f",
#                     call_id,
#                     sequence_id,
#                     mark_greeting,
#                     (t_first - start_t) * 1000.0,
#                 )
#                 append_event(
#                     "cartesia_first_chunk_ready",
#                     call_id=call_id,
#                     sequence_id=sequence_id,
#                     greeting=mark_greeting,
#                     lead_ms=int((t_first - start_t) * 1000.0),
#                 )
#             pl: dict[str, object] = {
#                 "call_id": call_id,
#                 "sequence_id": sequence_id,
#                 "chunk_seq": n,
#                 "audio_pcm_b64": _pcm16_to_b64(pcm16_8k_bytes),
#                 "type": "tts_audio_chunk",
#             }
#             if n == 0:
#                 pl["tts_text"] = (response_text or "").strip()[:4000]
#                 if mark_greeting:
#                     pl["is_mcube_greeting"] = True
#             await publish_json(
#                 channel=channel,
#                 queue_name=connection_queue,
#                 payload=pl,
#             )
#             n += 1
#         return n
#     finally:
#         await session.close()


# async def _synthesize_tts_pcm16_8k(
#     text: str,
#     *,
#     tts_provider: str,
#     tts_model: str,
#     tts_voice_id: str,
#     tts_encoding: str,
#     chunk_ms: int | None = None,
#     language: str = "en",
# ) -> list[bytes]:
#     """
#     Synthesizes TTS to PCM16 at 8k, then returns a list of fixed-size chunks.
#     """
#     import audioop
#     import aiohttp
#     import io
#     import av

#     # Create provider-specific TTS instance with an explicit HTTP session.
#     session: aiohttp.ClientSession | None = None
#     try:
#         session = aiohttp.ClientSession()

#         if tts_provider == "cartesia":
#             chunk_ms_eff = chunk_ms if chunk_ms is not None else int(os.getenv("MCUBE_TTS_CHUNK_MS", "200"))
#             return [
#                 c
#                 async for c in _iter_cartesia_pcm16_8k_chunks(
#                     text,
#                     tts_model=tts_model,
#                     tts_voice_id=tts_voice_id,
#                     chunk_ms=chunk_ms_eff,
#                     session=session,
#                     language=language or "en",
#                 )
#             ]
#         else:
#             # ElevenLabs: fetch MP3 and decode+resample to *exactly* PCM16 mono @ 8kHz.
#             # This avoids format/endianness issues that can cause loud popping ("gunshot") audio.
#             elevenlabs_key = os.getenv("ELEVENLABS_API_KEY") or os.getenv("ELEVEN_API_KEY") or ""
#             if not elevenlabs_key:
#                 raise RuntimeError("ELEVENLABS_API_KEY not set")

#             url = f"https://api.elevenlabs.io/v1/text-to-speech/{tts_voice_id}/stream"
#             headers = {
#                 "xi-api-key": elevenlabs_key,
#                 "content-type": "application/json",
#                 "accept": "audio/mpeg",
#             }
#             body = {
#                 "text": text,
#                 "model_id": tts_model,
#                 # keep sending desired format; API may ignore, but it doesn't hurt
#                 "output_format": tts_encoding,
#             }

#             async with session.post(url, headers=headers, json=body) as resp:
#                 mp3_bytes = await resp.read()
#                 if resp.status != 200:
#                     head = mp3_bytes[:300].decode("utf-8", errors="replace")
#                     raise RuntimeError(f"elevenlabs_tts_failed status={resp.status} body_head={head!r}")

#             # Decode + resample using PyAV resampler.
#             target_rate = 8000
#             resampler = av.audio.resampler.AudioResampler(format="s16", layout="mono", rate=target_rate)
#             pcm16_8k = bytearray()

#             with av.open(io.BytesIO(mp3_bytes)) as container:
#                 audio_stream = next((s for s in container.streams if s.type == "audio"), None)
#                 if audio_stream is None:
#                     raise RuntimeError("elevenlabs_decode_failed: no audio stream")
#                 for frame in container.decode(audio_stream):
#                     for out in resampler.resample(frame) or []:
#                         # out is an AudioFrame in s16 mono @ 8k
#                         plane = out.planes[0]
#                         try:
#                             pcm16_8k.extend(plane.to_bytes())  # newer PyAV
#                         except Exception:
#                             # older PyAV: plane is bytes-like via memoryview()
#                             pcm16_8k.extend(memoryview(plane))

#             # Chunk directly at 8k now.
#             chunk_ms_eff = chunk_ms if chunk_ms is not None else int(os.getenv("MCUBE_TTS_CHUNK_MS", "200"))
#             chunk_bytes = int((chunk_ms_eff / 1000.0) * target_rate) * 2  # int16 mono
#             chunks: list[bytes] = []
#             buf = bytes(pcm16_8k)
#             for i in range(0, len(buf), chunk_bytes):
#                 piece = _pcm16_le_aligned(buf[i : i + chunk_bytes])
#                 if piece:
#                     chunks.append(piece)
#             return chunks
#     finally:
#         if session is not None:
#             await session.close()


# async def process_utterance(message) -> None:
#     async with message.process(requeue=False):
#         payload = json.loads(message.body.decode("utf-8"))

#         call_id = str(payload["call_id"])
#         sequence_id = int(payload.get("sequence_id", 0))
#         user_text = payload.get("transcript") or ""
#         bot_say_text = (payload.get("bot_say_text") or "").strip()

#         rt = await _resolve_runtime_from_payload(payload)
#         system_prompt = rt["system_prompt"]
#         llm_model = rt["llm_model"]
#         llm_provider = str(rt.get("llm_provider") or "")
#         tts_model = rt["tts_model"]
#         tts_provider = str(rt.get("tts_provider") or "elevenlabs")
#         tts_voice_id = rt["tts_voice_id"]
#         tts_encoding = rt["tts_encoding"]
#         try:
#             tts_chunk_ms = max(5, min(500, int(float(str(rt.get("tts_chunk_ms") or "200")))))
#         except Exception:
#             tts_chunk_ms = 200

#         # Greeting (MCUBE_FIRST_MESSAGE): use larger PCM frames than normal so the opening line has
#         # fewer RabbitMQ + playAudio edges (main cause of "breaking" on the first sentence).
#         tts_chunk_for_utterance = tts_chunk_ms
#         if bot_say_text:
#             try:
#                 floor_ms = int(os.getenv("MCUBE_GREETING_MIN_TTS_CHUNK_MS", "240"))
#             except Exception:
#                 floor_ms = 240
#             floor_ms = max(80, min(500, floor_ms))
#             tts_chunk_for_utterance = max(tts_chunk_ms, floor_ms)
#             try:
#                 g_raw = os.getenv("MCUBE_GREETING_TTS_CHUNK_MS", "").strip()
#                 if g_raw:
#                     g = int(g_raw)
#                     if g > 0:
#                         tts_chunk_for_utterance = min(tts_chunk_for_utterance, max(40, g))
#             except Exception:
#                 tts_chunk_for_utterance = max(tts_chunk_ms, floor_ms)
#             if tts_chunk_for_utterance != tts_chunk_ms:
#                 log.info(
#                     "greeting chunk_ms=%s (base tts_chunk_ms=%s) opening-line TTS coalescing",
#                     tts_chunk_for_utterance,
#                     tts_chunk_ms,
#                 )

#         detected_language = str(rt.get("stt_language_code") or "en")
#         if user_text and not bot_say_text:
#             detected_language = detect_language(user_text)
#             log.info(
#                 "language_detected call_id=%s seq=%s language=%s text=%r",
#                 call_id,
#                 sequence_id,
#                 detected_language,
#                 user_text[:50],
#             )

#         start_t = time.time()
#         preview = (bot_say_text or user_text or "")[:120]
#         log.info(
#             "utterance_received call_id=%s seq=%s stt_text=%r llm_model=%s tts_provider=%s tts_model=%s tts_voice_id=%s language=%s",
#             call_id,
#             sequence_id,
#             preview,
#             llm_model,
#             tts_provider,
#             tts_model,
#             tts_voice_id,
#             detected_language,
#         )
#         append_event(
#             "utterance_received",
#             call_id=call_id,
#             sequence_id=sequence_id,
#             preview=preview,
#             is_greeting=bool(bot_say_text),
#             tts_provider=tts_provider,
#         )
#         _terminal_ai(
#             f"> PULLED from queue={AI_UTTERANCES_QUEUE!r} call={call_id} seq={sequence_id} "
#             f"greeting={bool(bot_say_text)} text={preview!r}"
#         )
#         chunks_8k: list[bytes] | None = None
#         tts_streamed_chunk_count: int | None = None
#         cartesia_llm_pending = False
#         try:
#             # If ws_bridge requested a direct bot message, skip LLM and just synthesize this text.
#             if bot_say_text:
#                 response_text = bot_say_text
#                 _terminal_ai_bot_text(call_id, sequence_id, response_text, source="greeting TTS")
#                 try:
#                     await _append_assistant_only(call_id, response_text)
#                 except Exception:
#                     log.warning(
#                         "chat_history_append_failed (greeting) call_id=%s",
#                         call_id,
#                         exc_info=True,
#                     )
#                 # Cartesia: stream chunks to Rabbit as they are synthesized so the bridge can
#                 # buffer/play the opening line while the rest of the greeting is still generating.
#                 if tts_provider == "cartesia":
#                     tts_streamed_chunk_count = await _publish_cartesia_pcm_stream_to_queue(
#                         call_id=call_id,
#                         sequence_id=sequence_id,
#                         response_text=response_text,
#                         chunk_ms=tts_chunk_for_utterance,
#                         tts_model=tts_model,
#                         tts_voice_id=tts_voice_id,
#                         mark_greeting=True,
#                         start_t=start_t,
#                         language=detected_language,
#                     )
#                 else:
#                     chunks_8k = await _synthesize_tts_pcm16_8k(
#                         response_text,
#                         tts_provider=tts_provider,
#                         tts_model=tts_model,
#                         tts_voice_id=tts_voice_id,
#                         tts_encoding=tts_encoding,
#                         chunk_ms=tts_chunk_for_utterance,
#                         language=detected_language,
#                     )
#             else:
#                 # Tool schemas (the model can emit these as structured tool calls).
#                 end_call_schema = {
#                     "type": "function",
#                     "name": "end_call",
#                     "description": "Terminate the current phone call immediately.",
#                     "parameters": {
#                         "type": "object",
#                         "properties": {},
#                         "required": [],
#                         "additionalProperties": False,
#                     },
#                 }

#                 transfer_to_number_schema = {
#                     "type": "function",
#                     "name": "transfer_to_number",
#                     "description": "Transfer the call to another phone number.",
#                     "parameters": {
#                         "type": "object",
#                         "properties": {
#                             "phone_number": {
#                                 "type": "string",
#                                 "description": "Target phone number in E.164 format (preferred).",
#                             },
#                             "transfer_number": {
#                                 "type": "string",
#                                 "description": "Alias for phone_number.",
#                             },
#                             "sip_uri": {
#                                 "type": "string",
#                                 "description": "Optional SIP URI like sip:+918...@host.",
#                             },
#                         },
#                         "additionalProperties": False,
#                     },
#                 }

#                 async def _end_call_tool(raw_arguments: dict[str, object]) -> str:
#                     return "ok"

#                 async def _transfer_to_number_tool(raw_arguments: dict[str, object]) -> str:
#                     return "ok"

#                 tools = [
#                     function_tool(_end_call_tool, raw_schema=end_call_schema),
#                     function_tool(_transfer_to_number_tool, raw_schema=transfer_to_number_schema),
#                 ]

#                 tool_instructions = (
#                     "\n\nTool-use rules for MCube phone calls:\n"
#                     "- Call `end_call` only when the caller clearly asks to hang up or says goodbye and you have acknowledged—"
#                     "never because of silence, timeout, or the start of the call.\n"
#                     "- Call `transfer_to_number` to transfer; provide phone_number or sip_uri.\n"
#                     "- When calling a tool, do not include additional spoken content; the caller side will act on the tool."
#                     "\n\nConversation memory: prior messages in this chat are earlier turns on the same phone call. "
#                     "Do not repeat the same sentence, greeting, product pitch, or question you already said. "
#                     "Do not ask again what the user already answered. Advance with one new idea or one new question per reply."
#                     "\n\nNever repeat your immediately previous reply verbatim or with only tiny edits—vary wording and move the conversation forward."
#                     "\n\nWhen speaking (no tool): keep a natural medium pace; short clauses sound clearer on phone audio."
#                 )
#                 system_prompt_with_lang = augment_system_prompt_with_language(system_prompt, detected_language)
#                 system_prompt_with_tools = f"{system_prompt_with_lang}{tool_instructions}"

#                 backend = _mcube_llm_backend()
#                 llm: Any = None
#                 if backend not in ("openai", "together"):
#                     lk_url = (os.getenv("LIVEKIT_URL") or "").strip()
#                     lk_key = (os.getenv("LIVEKIT_API_KEY") or "").strip()
#                     if not lk_url or not lk_key:
#                         log.error(
#                             "mcube_llm_backend=livekit but LIVEKIT_URL/LIVEKIT_API_KEY missing "
#                             "(have_url=%s have_key=%s) — inference.LLM will not connect",
#                             bool(lk_url),
#                             bool(lk_key),
#                         )
#                         append_event(
#                             "livekit_inference_missing_env",
#                             have_url=bool(lk_url),
#                             have_key=bool(lk_key),
#                             call_id=call_id,
#                         )
#                     llm = (
#                         inference.LLM(model=llm_model, provider=llm_provider or None)
#                         if llm_provider
#                         else inference.LLM(model=llm_model)
#                     )
#                 prior_turns = await _load_chat_messages(call_id)
#                 last_asst = _last_assistant_from_turns(prior_turns)

#                 tool_calls: list[Any] = []
#                 response_text = ""

#                 for attempt in range(2):
#                     sys_body = system_prompt_with_tools
#                     if attempt > 0:
#                         sys_body += (
#                             "\n\nRegenerate: your last draft was too similar to what you already said on this call. "
#                             "Reply with exactly one or two short new sentences—different words, new information or question."
#                         )

#                     if backend in ("openai", "together"):
#                         tool_calls, response_text, http_llm_ms = await _chat_completions_turn(
#                             provider=backend,
#                             sys_body=sys_body,
#                             prior_turns=prior_turns,
#                             user_text=user_text,
#                             llm_model=llm_model,
#                         )
#                         log.info(
#                             "chat_completions_turn provider=%s call_id=%s seq=%s tools=%s text_len=%s llm_http_ms=%s",
#                             backend,
#                             call_id,
#                             sequence_id,
#                             len(tool_calls),
#                             len(response_text or ""),
#                             http_llm_ms,
#                         )
#                         _mx = _direct_openai_max_tokens()
#                         _mid = _rest_chat_model_id(backend, llm_model)
#                         append_event(
#                             "chat_completions_llm",
#                             provider=backend,
#                             call_id=call_id,
#                             sequence_id=sequence_id,
#                             attempt=attempt,
#                             llm_http_ms=http_llm_ms,
#                             max_tokens=_mx,
#                             model=_mid,
#                             tools=len(tool_calls),
#                         )
#                         if backend == "openai":
#                             append_event(
#                                 "direct_openai_llm",
#                                 call_id=call_id,
#                                 sequence_id=sequence_id,
#                                 attempt=attempt,
#                                 llm_http_ms=http_llm_ms,
#                                 max_tokens=_mx,
#                                 model=_mid,
#                                 tools=len(tool_calls),
#                             )
#                         _tgt = _direct_openai_latency_target_ms()
#                         _prov_label = "OpenAI" if backend == "openai" else "Together"
#                         _terminal_ai(
#                             f"t {_prov_label} Chat Completions latency={http_llm_ms}ms "
#                             f"max_tokens={_mx} target<={_tgt}ms "
#                             f"(HTTP only, no TTS) call={call_id} seq={sequence_id} attempt={attempt}"
#                         )
#                         if http_llm_ms > _tgt:
#                             append_event(
#                                 "chat_completions_llm_slow",
#                                 provider=backend,
#                                 call_id=call_id,
#                                 sequence_id=sequence_id,
#                                 attempt=attempt,
#                                 llm_http_ms=http_llm_ms,
#                                 latency_target_ms=_tgt,
#                                 max_tokens=_mx,
#                             )
#                             _terminal_ai(
#                                 f"! {_prov_label} LLM {http_llm_ms}ms exceeds latency target {_tgt}ms "
#                                 f"(network/API; smaller max_tokens / faster model helps)"
#                             )
#                     else:
#                         assert llm is not None
#                         chat_ctx = ChatContext.empty()
#                         chat_ctx.add_message(role="system", content=sys_body)
#                         for m in prior_turns:
#                             chat_ctx.add_message(role=m["role"], content=m["content"])
#                         chat_ctx.add_message(role="user", content=user_text)

#                         stream = llm.chat(
#                             chat_ctx=chat_ctx,
#                             tools=tools,
#                             parallel_tool_calls=False,
#                         )

#                         tool_calls = []
#                         text_parts: list[str] = []
#                         async for chunk in stream:
#                             if chunk.delta:
#                                 if chunk.delta.content:
#                                     text_parts.append(chunk.delta.content)
#                                 if chunk.delta.tool_calls:
#                                     tool_calls.extend(chunk.delta.tool_calls)

#                         if not tool_calls:
#                             response_text = "".join(text_parts).strip()
#                             if not response_text:
#                                 response_text = "Okay."

#                     if tool_calls:
#                         break

#                     if not response_text:
#                         response_text = "Okay."

#                     if (
#                         attempt == 0
#                         and last_asst
#                         and _is_near_duplicate_assistant_reply(response_text, last_asst)
#                     ):
#                         log.info(
#                             "llm_reply_near_duplicate_retry call_id=%s seq=%s",
#                             call_id,
#                             sequence_id,
#                         )
#                         append_event(
#                             "llm_reply_dedup_retry",
#                             call_id=call_id,
#                             sequence_id=sequence_id,
#                         )
#                         continue
#                     break

#                 if not tool_calls and (response_text or "").strip():
#                     if backend == "openai":
#                         src = "OpenAI direct"
#                     elif backend == "together":
#                         src = "Together AI"
#                     else:
#                         src = "LiveKit LLM"
#                     _terminal_ai_bot_text(call_id, sequence_id, response_text, source=src)

#                 # If the model requested a control action, publish it and skip TTS.
#                 if tool_calls:
#                     _terminal_ai(
#                         f"[tools] call={call_id} seq={sequence_id} "
#                         f"names={[getattr(tc, 'name', '') for tc in tool_calls]!r}"
#                     )
#                     control_queue = control_queue_name(call_id)
#                     channel = _PUBLISH_CHANNEL
#                     if channel is None:
#                         raise RuntimeError("publish channel not initialized")

#                     for tc in tool_calls:
#                         try:
#                             tool_args = json.loads(tc.arguments or "{}")
#                         except Exception:
#                             tool_args = {}

#                         if tc.name == "end_call":
#                             await publish_json(
#                                 channel=channel,
#                                 queue_name=control_queue,
#                                 payload={
#                                     "type": "mcube_terminate",
#                                     "call_id": call_id,
#                                     "sequence_id": sequence_id,
#                                 },
#                             )
#                             return

#                         if tc.name == "transfer_to_number":
#                             transfer_to = (
#                                 tool_args.get("phone_number")
#                                 or tool_args.get("transfer_number")
#                                 or tool_args.get("sip_uri")
#                             )
#                             await publish_json(
#                                 channel=channel,
#                                 queue_name=control_queue,
#                                 payload={
#                                     "type": "mcube_transfer",
#                                     "call_id": call_id,
#                                     "sequence_id": sequence_id,
#                                     "transfer_to": transfer_to,
#                                 },
#                             )
#                             return

#                 if tts_provider == "cartesia":
#                     cartesia_llm_pending = True
#                 else:
#                     chunks_8k = await _synthesize_tts_pcm16_8k(
#                         response_text,
#                         tts_provider=tts_provider,
#                         tts_model=tts_model,
#                         tts_voice_id=tts_voice_id,
#                         tts_encoding=tts_encoding,
#                         chunk_ms=tts_chunk_ms,
#                         language=detected_language,
#                     )
#         except Exception as e:
#             log.exception("utterance_processing_failed call_id=%s seq=%s", call_id, sequence_id)
#             _terminal_ai(
#                 f"X UTTERANCE FAILED call={call_id} seq={sequence_id} greeting={bool(bot_say_text)}: {e!r}"
#             )
#             append_event(
#                 "utterance_failed",
#                 call_id=call_id,
#                 sequence_id=sequence_id,
#                 error=str(e),
#                 is_greeting=bool(bot_say_text),
#             )
#             await _publish_tts_done(call_id, sequence_id, final_chunk_seq=-1, started_at=start_t)
#             return

#         # Cartesia streaming + final tts_done live outside the LLM try above; failures here used to skip
#         # tts_done entirely → ws_bridge stuck with response_pending and no PSTN audio (≈5–8s hangup).
#         try:
#             if not bot_say_text:
#                 try:
#                     await _append_chat_exchange(call_id, user_text, response_text)
#                 except Exception:
#                     log.warning("chat_history_append_failed call_id=%s", call_id, exc_info=True)

#             if cartesia_llm_pending:
#                 tts_streamed_chunk_count = await _publish_cartesia_pcm_stream_to_queue(
#                     call_id=call_id,
#                     sequence_id=sequence_id,
#                     response_text=response_text,
#                     chunk_ms=tts_chunk_ms,
#                     tts_model=tts_model,
#                     tts_voice_id=tts_voice_id,
#                     mark_greeting=False,
#                     start_t=start_t,
#                     language=detected_language,
#                 )

#             if tts_streamed_chunk_count is not None:
#                 chunk_count = tts_streamed_chunk_count
#             else:
#                 chunk_count = len(chunks_8k or [])

#             if chunk_count == 0:
#                 log.error(
#                     "tts produced zero chunks call_id=%s seq=%s bot_say=%r",
#                     call_id,
#                     sequence_id,
#                     (bot_say_text or user_text or "")[:200],
#                 )
#                 _terminal_ai(
#                     f"X TTS ZERO CHUNKS call={call_id} seq={sequence_id} provider={tts_provider!r} "
#                     f"- check API keys / MCUBE_TTS_* / logs above"
#                 )
#                 append_event(
#                     "tts_zero_chunks",
#                     call_id=call_id,
#                     sequence_id=sequence_id,
#                     is_greeting=bool(bot_say_text),
#                     hint="Check CARTESIA_API_KEY, MCUBE_TTS_MODEL/voice, or Cartesia API errors above",
#                 )
#                 await _publish_tts_done(call_id, sequence_id, final_chunk_seq=-1, started_at=start_t)
#                 return

#             connection_queue = tts_queue_name(call_id)
#             channel = _PUBLISH_CHANNEL
#             if channel is None:
#                 raise RuntimeError("publish channel not initialized")

#             if tts_streamed_chunk_count is None:
#                 assert chunks_8k is not None
#                 for chunk_seq, pcm16_8k_bytes in enumerate(chunks_8k):
#                     pl2: dict[str, object] = {
#                         "call_id": call_id,
#                         "sequence_id": sequence_id,
#                         "chunk_seq": chunk_seq,
#                         "audio_pcm_b64": _pcm16_to_b64(pcm16_8k_bytes),
#                         "type": "tts_audio_chunk",
#                     }
#                     if chunk_seq == 0:
#                         pl2["tts_text"] = (response_text or "").strip()[:4000]
#                         if bot_say_text:
#                             pl2["is_mcube_greeting"] = True
#                     await publish_json(
#                         channel=channel,
#                         queue_name=connection_queue,
#                         payload=pl2,
#                     )

#             await publish_json(
#                 channel=channel,
#                 queue_name=connection_queue,
#                 payload={
#                     "call_id": call_id,
#                     "sequence_id": sequence_id,
#                     "type": "tts_done",
#                     "final_chunk_seq": max(0, chunk_count - 1),
#                     "tts_generation_ms": int((time.time() - start_t) * 1000),
#                 },
#             )
#             gen_ms = int((time.time() - start_t) * 1000)
#             log.info(
#                 "tts_published call_id=%s seq=%s chunks=%s tts_generation_ms=%s streamed_cartesia=%s",
#                 call_id,
#                 sequence_id,
#                 chunk_count,
#                 gen_ms,
#                 tts_streamed_chunk_count is not None,
#             )
#             append_event(
#                 "tts_published",
#                 call_id=call_id,
#                 sequence_id=sequence_id,
#                 chunks=chunk_count,
#                 tts_generation_ms=gen_ms,
#                 is_greeting=bool(bot_say_text),
#             )
#             _terminal_ai(
#                 f"* TTS PUBLISHED -> queue {tts_queue_name(call_id)!r} call={call_id} seq={sequence_id} "
#                 f"chunks={chunk_count} ms={gen_ms} greeting={bool(bot_say_text)}"
#             )
#         except Exception as e:
#             log.exception("tts_publish_pipeline_failed call_id=%s seq=%s", call_id, sequence_id)
#             _terminal_ai(
#                 f"X TTS PIPELINE FAILED call={call_id} seq={sequence_id} provider={tts_provider!r}: {e!r}"
#             )
#             append_event(
#                 "tts_publish_pipeline_failed",
#                 call_id=call_id,
#                 sequence_id=sequence_id,
#                 error=str(e),
#                 tts_provider=tts_provider,
#             )
#             await _publish_tts_done(call_id, sequence_id, final_chunk_seq=-1, started_at=start_t)


# async def main() -> None:
#     logging.basicConfig(level=logging.INFO)
#     configure_mcube_file_logging("ai_worker")

#     connection = await connect()
#     # One utterance at a time avoids stuck prefetch blocking later calls (default RABBITMQ_PREFETCH can be 10).
#     ai_pref = int(os.getenv("RABBITMQ_PREFETCH_AI", "1"))
#     channel = await get_channel(connection, prefetch_count=ai_pref)
#     global _PUBLISH_CHANNEL
#     _PUBLISH_CHANNEL = channel

#     await declare_durable_queue(channel, AI_UTTERANCES_QUEUE)

#     log.info("ai_worker: consuming %s on %s", AI_UTTERANCES_QUEUE, RABBITMQ_URL)
#     _bk = _mcube_llm_backend()
#     _key_ok = bool((os.getenv("OPENAI_API_KEY") or "").strip().strip('"').strip("'"))
#     _tg_ok = bool((os.getenv("TOGETHER_API_KEY") or "").strip().strip('"').strip("'"))
#     _cart_ok = bool((os.getenv("CARTESIA_API_KEY") or "").strip())
#     _lm = (os.getenv("MCUBE_LLM_MODEL") or "").strip()
#     _tg_base = (os.getenv("MCUBE_TOGETHER_BASE_URL") or "https://api.together.xyz/v1").rstrip("/")
#     _terminal_ai(
#         f"cfg LLM backend={_bk!r} model={_lm!r} together_api={_tg_base!r} "
#         f"OPENAI_API_KEY={'set' if _key_ok else 'MISSING'} "
#         f"TOGETHER_API_KEY={'set' if _tg_ok else 'MISSING'} "
#         f"CARTESIA_API_KEY={'set' if _cart_ok else 'MISSING'} "
#         f"(restart after editing backend/.env)"
#     )

#     queue = await channel.declare_queue(AI_UTTERANCES_QUEUE, durable=True)
#     await queue.consume(process_utterance, no_ack=False)

#     await asyncio.Future()  # run forever


# if __name__ == "__main__":
#     asyncio.run(main())

