import asyncio

import aiohttp

import base64

import json

import logging

import os

import sys

import time

from dataclasses import dataclass

from typing import Any, AsyncGenerator, Optional

import types

import re

import openai

import redis.asyncio as redis_async

REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
_redis_client: Optional[redis_async.Redis] = None



# Conversation context storage per call_id to maintain memory across utterances

_call_contexts: dict[str, "ChatContext"] = {}
_direct_call_contexts: dict[str, list[dict[str, Any]]] = {}
# Caller first name / preferred name from conversation (phone calls only; now stored in Redis).
# _caller_names: dict[str, str] = {}
# Set when the caller explicitly picks English / Hindi / Kannada in the fast-path handler (now stored in Redis).
# _caller_language_locked: dict[str, str] = {}
# True after the scripted "which language are you comfortable with" line was used for this call (now stored in Redis).
# _language_preference_prompt_sent: dict[str, bool] = {}
# Latest explicit en/hi/kn from the caller's words (may be set before we know their name) (now stored in Redis).
# _caller_explicit_language: dict[str, str] = {}
# True after we used the "glad to speak X, may I know your name" line without a name yet (now stored in Redis).
# _language_prompted_without_name: dict[str, bool] = {}

_NOT_NAME_WORDS: frozenset[str] = frozenset(
    {
        "hello",
        "hi",
        "hey",
        "yes",
        "no",
        "yeah",
        "ya",
        "okay",
        "ok",
        "sure",
        "english",
        "hindi",
        "hinthi",
        "hindee",
        "hindhi",
        "hinglish",
        "kannada",
        "kanada",
        "namaste",
        "thanks",
        "thank",
        "you",
        "please",
        "here",
        "there",
        "interested",
        "calling",
        "good",
        "fine",
        "well",
        "not",
        "the",
        "a",
        "an",
        "is",
        "are",
        "was",
        "were",
        "doing",
        "speaking",
        "listening",
        "something",
        "nothing",
        "same",
        "busy",
        "about",
        "from",
        "when",
        "what",
        "why",
        "how",
    }
)


def _format_caller_name_for_speech(raw: str) -> str:
    def fmt_word(w: str) -> str:
        if "-" in w:
            return "-".join(fmt_word(x) for x in w.split("-") if x)
        if len(w) <= 1:
            return w.upper()
        return w[:1].upper() + w[1:].lower()

    parts = [p for p in re.split(r"\s+", raw.strip()) if p]
    return " ".join(fmt_word(p) for p in parts)


def _looks_like_person_name(candidate: str) -> bool:
    c = candidate.strip()
    if len(c) < 2 or len(c) > 60:
        return False
    words = c.split()
    if len(words) > 4:
        return False
    for w in words:
        wl = re.sub(r"[^a-z]", "", w.lower())
        if wl and wl in _NOT_NAME_WORDS:
            return False
        if not re.match(r"^[A-Za-z][A-Za-z'.-]{0,29}$", w):
            return False
    return True


_NAME_CAPTURE_STOP = (
    r"(?:and|but|I|we|can|want|need|from|in|for|at|on|with|here|there|the|is|are|was|were)\b"
)


def _trim_name_capture_tail_noise(c: str) -> str:
    """Drop trailing filler tokens STT often glues after a name (e.g. 'Priya here', 'John from Bangalore')."""
    junk = frozenset(
        {
            "from",
            "in",
            "for",
            "at",
            "on",
            "with",
            "here",
            "there",
            "sir",
            "madam",
            "maam",
            "mam",
            "ji",
            "speaking",
            "calling",
            "listening",
        }
    )
    parts = [p for p in c.strip().split() if p]
    while len(parts) > 1:
        wl = re.sub(r"[^a-z]", "", parts[-1].lower())
        if wl not in junk:
            break
        parts.pop()
    return " ".join(parts)


def _extract_caller_name(text: str) -> Optional[str]:
    """Best-effort name from Latin-script transcripts (STT)."""
    raw = (text or "").strip()
    if not raw:
        return None

    explicit_patterns = [
        re.compile(
            rf"\bmy name is[,\s]+([A-Za-z][A-Za-z\s'.-]{{0,80}}?)(?=[,.;!?]|$|\s+{_NAME_CAPTURE_STOP})",
            re.I,
        ),
        re.compile(
            rf"\bcall me[,\s]+([A-Za-z][A-Za-z'.-]{{0,40}}?)(?=[,.;!?]|$|\s+{_NAME_CAPTURE_STOP})",
            re.I,
        ),
        re.compile(
            rf"\bthis is[,\s]+([A-Za-z][A-Za-z'.-]{{0,40}}?)(?=[,.;!?]|$|\s+{_NAME_CAPTURE_STOP})",
            re.I,
        ),
        re.compile(
            rf"\bi am[,\s]+([A-Za-z][A-Za-z\s'.-]{{0,40}}?)(?=[,.;!?]|$|\s+{_NAME_CAPTURE_STOP})",
            re.I,
        ),
        re.compile(
            rf"\bi'?m[,\s]+([A-Za-z][A-Za-z\s'.-]{{0,40}}?)(?=[,.;!?]|$|\s+{_NAME_CAPTURE_STOP})",
            re.I,
        ),
    ]
    for pat in explicit_patterns:
        m = pat.search(raw)
        if not m:
            continue
        cand = m.group(1).strip().strip("'\"")
        cand = re.split(r"[,.;!?]", cand)[0].strip()
        cand = _trim_name_capture_tail_noise(cand)
        if _looks_like_person_name(cand):
            return _format_caller_name_for_speech(cand)

    # Single-line reply that is likely only a name (after name question).
    line = re.sub(r"[,.;!?]+$", "", raw).strip()
    if "\n" in line:
        line = line.split("\n", 1)[0].strip()
    norm_line = re.sub(r"\s+", " ", line).strip()
    tokens = re.findall(r"[A-Za-z][A-Za-z'.-]*", norm_line)
    if not tokens:
        return None
    if len(tokens) == 1:
        cand = tokens[0]
        if cand.lower() not in _NOT_NAME_WORDS and _looks_like_person_name(cand):
            return _format_caller_name_for_speech(cand)
        return None
    if len(tokens) == 2 and tokens[0].lower() in {"hi", "hey", "hello"}:
        cand = tokens[1]
        if cand.lower() not in _NOT_NAME_WORDS and _looks_like_person_name(cand):
            return _format_caller_name_for_speech(cand)
        return None
    if len(tokens) == 2 and " ".join(tokens) == norm_line:
        cand = " ".join(tokens)
        cand = _trim_name_capture_tail_noise(cand)
        if cand.lower() not in _NOT_NAME_WORDS and _looks_like_person_name(cand):
            return _format_caller_name_for_speech(cand)

    return None


async def _maybe_capture_caller_name(call_id: str, user_text: str) -> bool:
    """Parse caller name from transcript; returns True when we newly stored or updated it."""
    ext = _extract_caller_name(user_text)
    if not ext:
        return False
    prev_bytes = await _redis_client.get(f"caller_name:{call_id}") if _redis_client else None
    prev = prev_bytes.decode("utf-8") if prev_bytes else None
    await _redis_client.set(f"caller_name:{call_id}", ext, ex=3600)
    if prev != ext:
        await _redis_client.delete(f"language_prompted_without_name:{call_id}")
    return prev != ext


@dataclass(frozen=True)
class DirectToolCall:
    name: str
    arguments: str

# Phase 9.1: OpenAI client for direct API calls (bypass LiveKit gateway)
_openai_client = None

def get_openai_client():
    global _openai_client
    if _openai_client is None:
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            raise RuntimeError("OPENAI_API_KEY not set")
        _openai_client = openai.AsyncOpenAI(api_key=api_key)
    return _openai_client





from .mcube_language import detect_language_preference, resolve_detected_language


def sanitize_text_for_tts(text: str) -> str:
    """
    Normalize text before TTS: snake_case, slug-hyphens, glued CamelCase, and markdown crumbs
    often sound like 'underscore', dashes, or chopped words on phone TTS.
    """
    if not text:
        return ""
    t = text.replace("\u00a0", " ").replace("\u200b", "")
    t = re.sub(r"_+", " ", t)
    # Slug-style hyphens between letters/digits (SNN-Felicity) -> spaces for natural speech.
    t = re.sub(r"(?<=[A-Za-z0-9])-(?=[A-Za-z0-9])", " ", t)
    # Drop markdown/code characters the model sometimes emits into spoken lines.
    t = re.sub(r"[`*#|]+", " ", t)
    # Split glued words: fooBar -> foo Bar (helps some compound brand tokens).
    t = re.sub(r"(?<=[a-z])(?=[A-Z])", " ", t)
    t = re.sub(r"(?<=[A-Za-z])(?=[0-9])", " ", t)
    t = re.sub(r"(?<=[0-9])(?=[A-Za-z])", " ", t)
    t = re.sub(r"[ \t\r\n]+", " ", t).strip()
    return t


def collapse_spoken_repetition(text: str) -> str:
    """
    Strip stutter-style repeats from model or scripted lines before TTS (e.g. "Hello Hello",
    duplicated sentences).
    """
    t = (text or "").strip()
    if not t:
        return t
    for _ in range(24):
        n = re.sub(r"(\b[\w'-]{2,48}\b)(\s+\1\b)+", r"\1", t, flags=re.IGNORECASE)
        if n == t:
            break
        t = n
    parts = re.split(r"(?<=[.!?…])\s+", t)
    if len(parts) < 2:
        return t
    out: list[str] = []
    prev_key: Optional[str] = None
    for p in parts:
        s = p.strip()
        if not s:
            continue
        key = re.sub(r"\s+", " ", s.lower())
        if key == prev_key:
            continue
        out.append(s)
        prev_key = key
    joined = " ".join(x.strip() for x in out if x.strip()).strip()
    return joined or t


def prepare_text_for_phone_tts(text: str) -> str:
    """Normalize then remove duplicate phrasing for natural phone playback."""
    return collapse_spoken_repetition(sanitize_text_for_tts(text))


async def _fast_response_for_user_text(
    user_text: str,
    call_id: str,
    *,
    name_newly_learned: bool,
) -> Optional[str]:
    text = (user_text or "").strip()
    if not text:
        return None

    name_bytes = await _redis_client.get(f"caller_name:{call_id}") if _redis_client else None
    name = name_bytes.decode("utf-8") if name_bytes else None
    words = set(re.findall(r"[a-zA-Z']+", text.lower()))
    preferred_language = detect_language_preference(text)
    lang_label = {"en": "English", "hi": "Hindi", "kn": "Kannada"}

    # Already acknowledged this language choice — avoid repeating the same scripted line.
    language_locked_bytes = await _redis_client.get(f"language_locked:{call_id}") if _redis_client else None
    language_locked = language_locked_bytes.decode("utf-8") if language_locked_bytes else None
    if (
        preferred_language
        and name
        and language_locked == preferred_language
    ):
        return None

    # Caller named a language before sharing their name — ask for name first.
    if preferred_language and not name:
        ln = lang_label.get(preferred_language, "that language")
        prompted_without_name_bytes = await _redis_client.get(f"language_prompted_without_name:{call_id}") if _redis_client else None
        prompted_without_name = prompted_without_name_bytes.decode("utf-8") if prompted_without_name_bytes else None
        explicit_language_bytes = await _redis_client.get(f"caller_language:{call_id}") if _redis_client else None
        explicit_language = explicit_language_bytes.decode("utf-8") if explicit_language_bytes else None
        if (
            prompted_without_name
            and explicit_language == preferred_language
        ):
            return "May I have your name, please?"
        await _redis_client.set(f"language_prompted_without_name:{call_id}", "True", ex=3600)
        return f"I'd be glad to speak in {ln}. First, may I know your name, please?"

    # Caller chose a language and we already know their name.
    if preferred_language and name:
        await _redis_client.set(f"language_locked:{call_id}", preferred_language, ex=3600)
        if preferred_language == "en":
            return (
                f"Sure, {name}, we can continue in English. "
                "This is Alisha from SNN Estates, calling about SNN Felicity."
            )
        if preferred_language == "hi":
            return (
                f"Bilkul, {name}, hum Hindi mein baat kar sakte hain. Main Alisha, SNN Estates se."
            )
        if preferred_language == "kn":
            return (
                f"{name}, I'll keep it simple in Kannada or plain English — whichever is easier for you."
            )

    # Just learned their name this turn; ask preferred language next.
    if (
        name_newly_learned
        and name
        and not preferred_language
        and not language_locked
        and not explicit_language
    ):
        await _redis_client.set(f"prompt_sent:{call_id}", "True", ex=3600)
        return (
            f"Thank you, {name}. Which language are you comfortable speaking in — "
            "English, Hindi, or Kannada?"
        )

    compact_reply = re.sub(r"\s+", " ", text.lower()).strip()
    if {"who", "this"} <= words or "who is this" in text.lower():
        if not name:
            return (
                "This is Alisha from SNN Estates, calling about your enquiry for SNN Felicity. "
                "May I know your name, please?"
            )
        return (
            f"This is Alisha from SNN Estates, calling about your enquiry for SNN Felicity, {name}. "
            "Is this a good time to speak?"
        )

    if words <= {"hello", "hi", "hey"}:
        if not name:
            return (
                "Hello, this is Alisha from SNN Estates. May I know your name, please?"
            )
        return (
            f"Hi {name}, I'm here with you from SNN Estates. "
            "What would you like to know about SNN Felicity?"
        )

    if compact_reply in {"yes", "yeah", "ya", "okay", "ok"}:
        if not name:
            return (
                "Thank you. May I know your name, please? "
                "Then we can choose a language you're comfortable with."
            )
        return (
            f"Thank you, {name}. I'll keep this brief. "
            "Are you looking at SNN Felicity for living or investment?"
        )

    return None





def augment_system_prompt_with_language(system_prompt: str, language: str) -> str:

    """

    Add language-specific instructions to the system prompt based on detected language.

    """

    if language == "hi":

        language_instruction = (
            "\n\nLANGUAGE INSTRUCTION: STRICTLY respond in Hindi ONLY. "
            "Do NOT respond in English. "
            "Do not use Kannada script or Kannada vocabulary unless quoting the user. "
            "Use natural conversational Hindi as spoken in North India; be warm and polite (e.g. 'aap', 'ji'). "
            "This is a strict requirement - always respond in Hindi."
        )

    elif language == "kn":

        language_instruction = (
            "\n\nLANGUAGE INSTRUCTION: Respond only in Kannada or very simple English. "
            "Do not reply in Hindi or Devanagari unless quoting the user. "
            "If Kannada output is uncertain, use minimal English and acknowledge their Kannada preference briefly."
        )

    else:

        language_instruction = "\n\nLANGUAGE INSTRUCTION: Respond in English. Use clear, professional, and natural conversational English."

    spoken_format = (
        "\n\nSPOKEN OUTPUT: Read aloud by phone TTS. Write every person or place name as normal words "
        "with spaces between words. Never join name parts with underscores, slashes, or programming-style hyphens; "
        "never output snake_case or glued tokens like WordWord for names. "
        "Use inclusive, neutral wording when referring to people. "
        "Do not repeat the same sentence, clause, or word back-to-back; say each idea once."
    )

    return system_prompt + language_instruction + spoken_format





def _install_livekit_blingfire_stub() -> None:

    """

    Work around Windows WDAC/App Control blocking `lk_blingfire` native extension.

    `livekit.agents` imports `livekit.agents.tokenize.blingfire` unconditionally; we stub it.

    """



    module_name = "livekit.agents.tokenize.blingfire"

    if module_name in sys.modules:

        return



    class SentenceTokenizer:  # minimal compatibility surface

        def __init__(
            self,
            *,
            min_sentence_len: int = 20,
            stream_context_len: int = 10,
            retain_format: bool = False,
        ) -> None:
            self._min_sentence_len = int(min_sentence_len)

        def tokenize(self, text: str, *, language: str | None = None) -> list[str]:

            parts = [p.strip() for p in re.split(r"(?<=[.!?])\s+", text or "") if p.strip()]

            return [p for p in parts if len(p) >= self._min_sentence_len]



        def stream(self, *, language: str | None = None):

            raise NotImplementedError("Streaming tokenizer not supported in fallback stub")



    stub = types.ModuleType(module_name)

    stub.__dict__.update({"__all__": ["SentenceTokenizer"], "SentenceTokenizer": SentenceTokenizer})

    sys.modules[module_name] = stub





_install_livekit_blingfire_stub()



from livekit.agents import inference, function_tool

from livekit.agents.llm.chat_context import ChatContext

from livekit.plugins import cartesia, deepgram, elevenlabs



from .env_load import load_agent_runtime_dotenv

from .business_id_agents import get_runtime_overrides_from_agents_table

from .mcube_defaults import apply_voice_defaults_to_dict, get_default_mcube_call_config

from .mq import (

    AI_UTTERANCES_QUEUE,

    RABBITMQ_URL,

    control_queue_name,

    declare_durable_queue,

    connect,

    get_channel,

    publish_json,

    tts_queue_name,

)



log = logging.getLogger("mcube.ai_worker")



load_agent_runtime_dotenv()



_PUBLISH_CHANNEL = None  # set in main(); used by process_utterance





def _env_fallback_runtime() -> dict[str, str]:

    """Fallback only when DB + queue payload do not supply values (see process_utterance)."""

    d = get_default_mcube_call_config()

    return {

        "system_prompt": d["system_prompt"],

        "llm_model": d["llm_model"],

        "llm_provider": d["llm_provider"],

        "tts_model": d["tts_model"],

        "tts_provider": d.get("tts_provider") or "elevenlabs",

        "tts_voice_id": d["tts_voice_id"],

        "tts_encoding": d["tts_encoding"],

        "tts_chunk_ms": d.get("tts_chunk_ms") or "200",

    }





async def _resolve_runtime_from_payload(payload: dict[str, Any]) -> dict[str, str]:

    """

    Merge precedence (later wins):

    1) `.env` defaults (MCUBE_* via mcube_defaults)

    2) `business_id_agents` row (+ JSON config) when `business_id` is present

    3) Explicit fields on the RabbitMQ payload (Redis call config / ws_bridge)

    """

    base = _env_fallback_runtime()



    # TEMPORARILY skip DB lookup – use .env.local directly for all config

    # bid = payload.get("business_id")

    # if bid is not None:

    #     db_ov = await asyncio.to_thread(

    #         get_runtime_overrides_from_agents_table,

    #         bid,

    #         agent_id=payload.get("agent_id"),

    #         user_id=payload.get("user_id"),

    #         email=payload.get("agent_email") or payload.get("email"),

    #         name=payload.get("agent_name") or payload.get("name"),

    #     )

    #     base.update(db_ov)



    override_keys = (

        "system_prompt",

        "llm_model",

        "llm_provider",

        "tts_model",

        "tts_provider",

        "tts_voice_id",

        "tts_encoding",

        "tts_chunk_ms",

        "stt_language_code",

        "stt_model_id",

        "stt_provider",

    )

    out = dict(base)

    for k in override_keys:

        if k not in payload:

            continue

        v = payload.get(k)

        if v is None:

            continue

        if isinstance(v, str) and k != "llm_provider" and not v.strip():

            continue

        if k == "system_prompt" and isinstance(v, str):

            out[k] = v.replace("\\n", "\n")

        else:

            out[k] = str(v).strip() if isinstance(v, str) else v


    apply_voice_defaults_to_dict(out)


    out.setdefault("tts_provider", base.get("tts_provider") or "")

    out.setdefault("llm_model", base.get("llm_model", ""))

    out.setdefault("tts_model", base.get("tts_model", ""))

    out.setdefault("tts_voice_id", base.get("tts_voice_id", ""))

    out.setdefault("tts_encoding", base.get("tts_encoding", ""))

    out.setdefault("system_prompt", base.get("system_prompt", ""))

    out.setdefault("tts_chunk_ms", base.get("tts_chunk_ms", "200"))

    out.setdefault("stt_language_code", base.get("stt_language_code", ""))

    out.setdefault("stt_model_id", base.get("stt_model_id", ""))

    out.setdefault("stt_provider", base.get("stt_provider") or "")

    return out


@dataclass(frozen=True)

class PublishTtsChunk:

    call_id: str

    sequence_id: int

    chunk_seq: int

    pcm16_8k_bytes: bytes





def _pcm16_to_b64(pcm16_bytes: bytes) -> str:

    return base64.b64encode(pcm16_bytes).decode("ascii")





def _resample_pcm16(pcm16_bytes: bytes, *, from_rate: int, to_rate: int, state: object) -> tuple[bytes, object]:

    """

    audioop.ratecv works on raw int16 PCM.

    Returns (converted_bytes, new_state).

    """

    import audioop



    converted, new_state = audioop.ratecv(

        pcm16_bytes,

        2,  # width bytes for int16

        1,  # channels

        from_rate,

        to_rate,

        state,

    )

    return converted, new_state





async def _run_llm(system_prompt: str, llm_model: str, user_text: str, call_id: str) -> str:
    # Phase 9.1: Reverted to LiveKit gateway temporarily to fix call processing
    llm = inference.LLM(model=llm_model, max_tokens=120)

    # Use shared context store to maintain conversation memory
    if call_id not in _call_contexts:
        _call_contexts[call_id] = ChatContext.empty()
    chat_ctx = _call_contexts[call_id]

    # Refresh system message (language/caller hints may change per turn)
    if not chat_ctx.messages or chat_ctx.messages[0].role != "system":
        chat_ctx.add_message(role="system", content=system_prompt)
    else:
        chat_ctx.messages[0] = type(chat_ctx.messages[0])(role="system", content=system_prompt)

    chat_ctx.add_message(role="user", content=user_text)

    stream = llm.chat(chat_ctx=chat_ctx)

    text = []

    async for token in stream.to_str_iterable():

        text.append(token)

    return "".join(text).strip()





def _env_bool(key: str, default: bool = False) -> bool:
    value = os.getenv(key)
    if value is None:
        return default
    return value.strip().lower() in ("1", "true", "yes", "on")


def _env_int(key: str, default: int, *, min_value: int = 1, max_value: int = 4096) -> int:
    try:
        value = int(float(os.getenv(key, str(default))))
    except Exception:
        value = default
    return max(min_value, min(max_value, value))


def _openai_tool_schema(raw_schema: dict[str, Any]) -> dict[str, Any]:
    return {
        "type": "function",
        "function": {
            "name": raw_schema["name"],
            "description": raw_schema.get("description", ""),
            "parameters": raw_schema.get("parameters") or {"type": "object", "properties": {}},
        },
    }


def _direct_llm_settings(backend: str) -> tuple[str, str]:
    if backend == "together":
        api_key = os.getenv("TOGETHER_API_KEY", "").strip()
        base_url = os.getenv("MCUBE_TOGETHER_BASE_URL", "https://api.together.xyz/v1").strip()
        if not api_key:
            raise RuntimeError("TOGETHER_API_KEY not set")
        return api_key, base_url.rstrip("/")

    api_key = os.getenv("OPENAI_API_KEY", "").strip()
    base_url = os.getenv("MCUBE_DIRECT_OPENAI_BASE_URL", "https://api.openai.com/v1").strip()
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY not set")
    return api_key, base_url.rstrip("/")


async def _chat_completions_turn(
    *,
    call_id: str,
    system_prompt: str,
    llm_model: str,
    user_text: str,
    backend: str,
    tools: list[dict[str, Any]],
) -> tuple[str, list[DirectToolCall]]:
    api_key, base_url = _direct_llm_settings(backend)
    timeout_s = float(os.getenv("MCUBE_DIRECT_OPENAI_TIMEOUT_S", "90"))
    max_tokens_key = "MCUBE_CHAT_COMPLETIONS_MAX_TOKENS" if backend == "together" else "MCUBE_DIRECT_OPENAI_MAX_TOKENS"
    max_tokens = _env_int(max_tokens_key, 384 if backend == "together" else 130, min_value=16, max_value=2048)

    history = _direct_call_contexts.get(call_id)
    if not history:
        history = [{"role": "system", "content": system_prompt}]
        _direct_call_contexts[call_id] = history
    elif history[0].get("role") != "system":
        history.insert(0, {"role": "system", "content": system_prompt})
    else:
        # Keep prior user/assistant turns; only refresh system (language/caller hints change per turn).
        history[0] = {"role": "system", "content": system_prompt}
    history.append({"role": "user", "content": user_text})
    
    # Log the system prompt to verify language instruction is present
    log.info("llm_system_prompt call_id=%s language_instruction_check=%r", call_id, "LANGUAGE INSTRUCTION" in system_prompt)

    body: dict[str, Any] = {
        "model": llm_model,
        "messages": history,
        "max_tokens": max_tokens,
        "temperature": 0.4,
    }
    if tools and not (backend == "together" and _env_bool("MCUBE_TOGETHER_DISABLE_TOOLS")):
        body["tools"] = [_openai_tool_schema(t) for t in tools]
        body["tool_choice"] = "auto"

    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    started = time.time()
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout_s)) as session:
        async with session.post(f"{base_url}/chat/completions", headers=headers, json=body) as resp:
            response_text = await resp.text()
            if resp.status >= 400:
                raise RuntimeError(f"{backend} chat completions failed status={resp.status} body={response_text[:500]}")
            data = json.loads(response_text)

    log.info(
        "direct_llm_generated call_id=%s backend=%s model=%s latency_ms=%s",
        call_id,
        backend,
        llm_model,
        int((time.time() - started) * 1000),
    )

    message = ((data.get("choices") or [{}])[0].get("message") or {})
    content = message.get("content") or ""
    log.info("llm_raw_response call_id=%s content=%r", call_id, content[:200] if content else "")
    tool_calls: list[DirectToolCall] = []
    for tc in message.get("tool_calls") or []:
        fn = tc.get("function") or {}
        name = fn.get("name")
        if name:
            tool_calls.append(DirectToolCall(name=str(name), arguments=str(fn.get("arguments") or "{}")))

    if content:
        history.append({"role": "assistant", "content": content})
    elif tool_calls:
        history.append({"role": "assistant", "content": "", "tool_calls": message.get("tool_calls") or []})

    return str(content).strip(), tool_calls


async def _synthesize_tts_pcm16_8k(
    text: str,
    *,
    tts_provider: str,
    tts_model: str,
    tts_voice_id: str,
    tts_encoding: str,
    session: aiohttp.ClientSession,
    language: str,
    chunk_ms: Optional[int] = None,
) -> AsyncGenerator[bytes, None]:
    """Synthesize TTS and yield chunks as they arrive for true streaming (Phase 6.3)."""
    import audioop

    text = sanitize_text_for_tts(text)

    if tts_provider == "cartesia":
        cartesia_key = os.getenv("CARTESIA_API_KEY") or ""

        tts = cartesia.TTS(
            model=tts_model or "sonic-3.5",
            voice=tts_voice_id,
            language=language,
            api_key=cartesia_key,
            encoding="pcm_s16le",
            sample_rate=8000,  # Generate at 8kHz for MCube (no downsampling needed)
            http_session=session,
        )

        target_rate = 8000
        chunk_ms_eff = chunk_ms if chunk_ms is not None else int(os.getenv("MCUBE_TTS_CHUNK_MS", "200"))
        chunk_samples = int((chunk_ms_eff / 1000.0) * target_rate)
        chunk_bytes = chunk_samples * 2

        buffer = bytearray()
        resample_state = None

        async for synth_audio in tts.synthesize(text):
            frame = synth_audio.frame
            pcm16_from = frame.data.tobytes()
            from_rate = frame.sample_rate
            if from_rate != target_rate:
                converted, resample_state = audioop.ratecv(
                    pcm16_from, 2, 1, from_rate, target_rate, resample_state,
                )
                buffer.extend(converted)
            else:
                buffer.extend(pcm16_from)
            while len(buffer) >= chunk_bytes:
                chunk = bytes(buffer[:chunk_bytes])
                del buffer[:chunk_bytes]
                yield chunk

        if buffer:
            if len(buffer) < chunk_bytes:
                buffer.extend(b"\x00" * (chunk_bytes - len(buffer)))
            yield bytes(buffer[:chunk_bytes])

    else:
        elevenlabs_key = os.getenv("ELEVENLABS_API_KEY") or os.getenv("ELEVEN_API_KEY") or ""
        if not elevenlabs_key:
            raise RuntimeError("ELEVENLABS_API_KEY not set")

        try:
            stream_lat = int(os.getenv("MCUBE_ELEVENLABS_STREAMING_LATENCY", "2"))
        except ValueError:
            stream_lat = 2
        stream_lat = max(0, min(4, stream_lat))

        tts = elevenlabs.TTS(
            voice_id=tts_voice_id,
            model=tts_model,
            api_key=elevenlabs_key,
            encoding=tts_encoding or "pcm_16000",
            streaming_latency=stream_lat,
            http_session=session,
        )

        target_rate = 8000
        chunk_ms_eff = chunk_ms if chunk_ms is not None else int(os.getenv("MCUBE_TTS_CHUNK_MS", "200"))
        chunk_bytes = int((chunk_ms_eff / 1000.0) * target_rate) * 2
        buffer = bytearray()
        resample_state = None

        async for frame in tts.synthesize(text):
            # Handle both direct frame and wrapped SynthesizedAudio objects
            if hasattr(frame, 'frame'):
                pcm_data = frame.frame.data.tobytes()
            elif hasattr(frame, 'data'):
                pcm_data = frame.data.tobytes()
            else:
                # Try to get raw bytes directly
                pcm_data = bytes(frame)
            source_rate = 16000
            if hasattr(frame, "frame"):
                source_rate = getattr(frame.frame, "sample_rate", source_rate)
            elif hasattr(frame, "sample_rate"):
                source_rate = getattr(frame, "sample_rate", source_rate)
            if source_rate != target_rate:
                pcm_data, resample_state = audioop.ratecv(
                    pcm_data, 2, 1, int(source_rate), target_rate, resample_state
                )
            buffer.extend(pcm_data)
            while len(buffer) >= chunk_bytes:
                chunk = bytes(buffer[:chunk_bytes])
                del buffer[:chunk_bytes]
                yield chunk

        if buffer:
            if len(buffer) < chunk_bytes:
                buffer.extend(b"\x00" * (chunk_bytes - len(buffer)))
            yield bytes(buffer)


async def process_utterance(message) -> None:

    async with message.process(requeue=False):

        payload = json.loads(message.body.decode("utf-8"))



        call_id = str(payload["call_id"])

        sequence_id = int(payload.get("sequence_id", 0))

        user_text = payload.get("transcript") or ""

        bot_say_text = (payload.get("bot_say_text") or "").strip()

        rt = await _resolve_runtime_from_payload(payload)

        system_prompt = rt["system_prompt"]

        llm_model = rt["llm_model"]

        llm_provider = str(rt.get("llm_provider") or "")

        tts_model = rt["tts_model"]

        tts_provider = str(rt.get("tts_provider") or "elevenlabs")

        tts_voice_id = rt["tts_voice_id"]

        tts_encoding = rt["tts_encoding"]

        try:

            tts_chunk_ms = max(5, min(500, int(float(str(rt.get("tts_chunk_ms") or "200")))))

        except Exception:

            tts_chunk_ms = 200



        # Detect language: merge STT default with preference / script / Latin (see mcube_language.resolve_detected_language)

        stt_default = str(rt.get("stt_language_code") or "en").strip() or "en"

        detected_language = stt_default

        name_newly_learned = False

        if (user_text or "").strip():

            name_newly_learned = await _maybe_capture_caller_name(call_id, user_text)

        ut_strip = (user_text or "").strip()
        lang_pref_in_ut = detect_language_preference(ut_strip) if ut_strip else None
        if lang_pref_in_ut:
            await _redis_client.set(f"caller_language:{call_id}", lang_pref_in_ut, ex=3600)

        if user_text and not bot_say_text:

            detected_language = resolve_detected_language(stt_default, user_text)

            if str(stt_default) != str(detected_language):
                log.info(
                    "language_detect_override call_id=%s seq=%s stt_default=%s resolved=%s text=%r",
                    call_id,
                    sequence_id,
                    stt_default,
                    detected_language,
                    user_text[:80],
                )

            log.info("language_detected call_id=%s seq=%s language=%s text=%r", call_id, sequence_id, detected_language, user_text[:50])



        # Augment system prompt with language instructions (will be done after tool instructions for better LLM adherence)
        system_prompt_base = system_prompt

        caller_name_bytes = await _redis_client.get(f"caller_name:{call_id}") if _redis_client else None
        caller_name = caller_name_bytes.decode("utf-8") if caller_name_bytes else None
        if caller_name:
            system_prompt_base += (
                f"\n\nCALLER NAME: The caller introduced themselves as {caller_name}. "
                "Address them by name naturally when it fits the conversation."
            )

        language_locked_bytes = await _redis_client.get(f"language_locked:{call_id}") if _redis_client else None
        language_locked = language_locked_bytes.decode("utf-8") if language_locked_bytes else None
        prompt_sent_bytes = await _redis_client.get(f"prompt_sent:{call_id}") if _redis_client else None
        prompt_sent = prompt_sent_bytes.decode("utf-8") if prompt_sent_bytes else None
        explicit_language_bytes = await _redis_client.get(f"caller_language:{call_id}") if _redis_client else None
        explicit_language = explicit_language_bytes.decode("utf-8") if explicit_language_bytes else None

        if (
            caller_name
            and not language_locked
            and not prompt_sent
            and not explicit_language
        ):
            system_prompt_base += (
                "\n\nLANGUAGE PREFERENCE (phone): You support English, Hindi, and Kannada. "
                "If the caller has shared their name but has not clearly chosen which language they prefer, "
                "ask once which they are comfortable with before a long sales pitch. "
                "If you already asked this call or they already chose, do not repeat."
            )



        start_t = time.time()

        log.info(

            "utterance_received call_id=%s seq=%s stt_text=%r llm_model=%s tts_provider=%s tts_model=%s tts_voice_id=%s language=%s",

            call_id,

            sequence_id,

            (bot_say_text or user_text or "")[:120],

            llm_model,

            tts_provider,

            tts_model,

            tts_voice_id,

            detected_language,

        )

        try:

            # If ws_bridge requested a direct bot message, skip LLM and just synthesize this text.

            if bot_say_text:

                response_text = bot_say_text

                response_text = prepare_text_for_phone_tts(response_text)

                # Phase 6.3: Create session for streaming TTS and publish chunks as they arrive
                connection_queue = tts_queue_name(call_id)
                channel = _PUBLISH_CHANNEL
                if channel is None:
                    raise RuntimeError("publish channel not initialized")

                async with aiohttp.ClientSession() as session:
                    chunks_generator = _synthesize_tts_pcm16_8k(
                        response_text,
                        tts_provider=tts_provider,
                        tts_model=tts_model,
                        tts_voice_id=tts_voice_id,
                        tts_encoding=tts_encoding,
                        session=session,
                        chunk_ms=tts_chunk_ms,
                        language=detected_language,
                    )
                    # Phase 6.3: Publish chunks immediately as they arrive (true streaming)
                    chunk_seq = 0
                    async for pcm16_8k_bytes in chunks_generator:
                        await publish_json(
                            channel=channel,
                            queue_name=connection_queue,
                            payload={
                                "call_id": call_id,
                                "sequence_id": sequence_id,
                                "chunk_seq": chunk_seq,
                                "audio_pcm_b64": _pcm16_to_b64(pcm16_8k_bytes),
                                "type": "tts_audio_chunk",
                            },
                        )
                        chunk_seq += 1

                    # Publish end-of-tts marker
                    await publish_json(
                        channel=channel,
                        queue_name=connection_queue,
                        payload={
                            "call_id": call_id,
                            "sequence_id": sequence_id,
                            "type": "tts_end",
                        },
                    )
                    # Skip the publishing loop below
                    chunks_8k = None

            else:

                # Tool schemas (the model can emit these as structured tool calls).

                end_call_schema = {

                    "type": "function",

                    "name": "end_call",

                    "description": "Terminate the current phone call immediately.",

                    "parameters": {

                        "type": "object",

                        "properties": {},

                        "required": [],

                        "additionalProperties": False,

                    },

                }



                transfer_to_number_schema = {

                    "type": "function",

                    "name": "transfer_to_number",

                    "description": "Transfer the call to another phone number.",

                    "parameters": {

                        "type": "object",

                        "properties": {

                            "phone_number": {

                                "type": "string",

                                "description": "Target phone number in E.164 format (preferred).",

                            },

                            "transfer_number": {

                                "type": "string",

                                "description": "Alias for phone_number.",

                            },

                            "sip_uri": {

                                "type": "string",

                                "description": "Optional SIP URI like sip:+918...@host.",

                            },

                        },

                        "additionalProperties": False,

                    },

                }



                tools = [

                    end_call_schema,

                    transfer_to_number_schema,

                ]



                tool_instructions = (

                    "\n\nTool-use rules for MCube phone calls:\n"

                    "- Call `end_call` to terminate the current call.\n"

                    "- Call `transfer_to_number` to transfer; provide phone_number or sip_uri.\n"

                    "- When calling a tool, do not include additional spoken content; the caller side will act on the tool."

                )

                # Add tool instructions first, then language instruction at the end for better LLM adherence
                system_prompt_with_tools = f"{system_prompt_base}{tool_instructions}"
                system_prompt_with_tools = augment_system_prompt_with_language(system_prompt_with_tools, detected_language)

                response_text = await _fast_response_for_user_text(
                    user_text,
                    call_id,
                    name_newly_learned=name_newly_learned,
                )
                tool_calls: list[DirectToolCall] = []

                # Skip fast_response for Hindi users to ensure language instruction is followed
                if detected_language == "hi":
                    response_text = None

                if response_text:
                    log.info("fast_response_generated call_id=%s seq=%s language=%s response=%r", call_id, sequence_id, detected_language, response_text[:200])
                else:
                    llm_backend = os.getenv("MCUBE_LLM_BACKEND", "together").strip().lower() or "together"
                    if llm_backend not in ("openai", "together"):
                        raise RuntimeError(
                            f"Unsupported direct MCube LLM backend {llm_backend!r}; use MCUBE_LLM_BACKEND=openai or together"
                        )

                    response_text, tool_calls = await _chat_completions_turn(
                        call_id=call_id,
                        system_prompt=system_prompt_with_tools,
                        llm_model=llm_model,
                        user_text=user_text,
                        backend=llm_backend,
                        tools=tools,
                    )
                    log.info("llm_chat_completions_response call_id=%s seq=%s language=%s response=%r", call_id, sequence_id, detected_language, response_text[:200] if response_text else "")

                text_parts = [response_text] if response_text else []



                # If the model requested a control action, publish it and skip TTS.

                if tool_calls:

                    control_queue = control_queue_name(call_id)

                    channel = _PUBLISH_CHANNEL

                    if channel is None:

                        raise RuntimeError("publish channel not initialized")



                    for tc in tool_calls:

                        try:

                            tool_args = json.loads(tc.arguments or "{}")

                        except Exception:

                            tool_args = {}



                        if tc.name == "end_call":

                            await publish_json(

                                channel=channel,

                                queue_name=control_queue,

                                payload={

                                    "type": "mcube_terminate",

                                    "call_id": call_id,

                                    "sequence_id": sequence_id,

                                },

                            )

                            return



                        if tc.name == "transfer_to_number":

                            transfer_to = (

                                tool_args.get("phone_number")

                                or tool_args.get("transfer_number")

                                or tool_args.get("sip_uri")

                            )

                            await publish_json(

                                channel=channel,

                                queue_name=control_queue,

                                payload={

                                    "type": "mcube_transfer",

                                    "call_id": call_id,

                                    "sequence_id": sequence_id,

                                    "transfer_to": transfer_to,

                                },

                            )

                            return



                response_text = "".join(text_parts).strip()

                if not response_text:

                    response_text = "Okay."

                response_text = prepare_text_for_phone_tts(response_text)

                

                # Add assistant response to conversation context

                if call_id in _call_contexts:

                    _call_contexts[call_id].add_message(role="assistant", content=response_text)



                # Phase 6.3: Create session for streaming TTS and publish chunks as they arrive
                connection_queue = tts_queue_name(call_id)
                channel = _PUBLISH_CHANNEL
                if channel is None:
                    raise RuntimeError("publish channel not initialized")

                async with aiohttp.ClientSession() as session:
                    chunks_generator = _synthesize_tts_pcm16_8k(
                        response_text,
                        tts_provider=tts_provider,
                        tts_model=tts_model,
                        tts_voice_id=tts_voice_id,
                        tts_encoding=tts_encoding,
                        session=session,
                        chunk_ms=tts_chunk_ms,
                        language=detected_language,
                    )
                    # Phase 6.3: Publish chunks immediately as they arrive (true streaming)
                    chunk_seq = 0
                    async for pcm16_8k_bytes in chunks_generator:
                        await publish_json(
                            channel=channel,
                            queue_name=connection_queue,
                            payload={
                                "call_id": call_id,
                                "sequence_id": sequence_id,
                                "chunk_seq": chunk_seq,
                                "audio_pcm_b64": _pcm16_to_b64(pcm16_8k_bytes),
                                "type": "tts_audio_chunk",
                            },
                        )
                        chunk_seq += 1

                    # Publish end-of-tts marker
                    await publish_json(
                        channel=channel,
                        queue_name=connection_queue,
                        payload={
                            "call_id": call_id,
                            "sequence_id": sequence_id,
                            "type": "tts_end",
                        },
                    )
                    # Skip the publishing loop below
                    chunks_8k = None

        except Exception as e:
            log.error("utterance_processing_failed call_id=%s seq=%s error=%s", call_id, sequence_id, str(e))
            raise



        # Publish chunks (one per queue message) so WS can stream playback.
        # Phase 6.3: Skip if chunks_8k is None (streaming already done)
        if chunks_8k is not None:
            connection_queue = tts_queue_name(call_id)



            # Publish all chunks in order.

            # WS bridge assumes chunk_seq increases and plays sequentially.

            channel = _PUBLISH_CHANNEL

            if channel is None:

                raise RuntimeError("publish channel not initialized")

            # The ws_bridge declares/consumes this queue; we can publish without declaring here.

            for chunk_seq, pcm16_8k_bytes in enumerate(chunks_8k):

                await publish_json(

                    channel=channel,

                    queue_name=connection_queue,

                    payload={

                        "call_id": call_id,

                        "sequence_id": sequence_id,

                        "chunk_seq": chunk_seq,

                        "audio_pcm_b64": _pcm16_to_b64(pcm16_8k_bytes),

                        "type": "tts_audio_chunk",

                    },

                )



            # Publish a final marker for end-of-tts.
            await publish_json(

                channel=channel,

                queue_name=connection_queue,

                payload={

                    "call_id": call_id,

                    "sequence_id": sequence_id,

                    "type": "tts_done",

                },

            )

            log.info(

                "tts_published call_id=%s seq=%s chunks=%s tts_generation_ms=%s",

                call_id,

                sequence_id,

                len(chunks_8k),

                int((time.time() - start_t) * 1000),

            )

        # Phase 6.3: Log for streaming case (chunks_8k is None)
        if chunks_8k is None:
            log.info(
                "tts_published_streaming call_id=%s seq=%s tts_generation_ms=%s",
                call_id,
                sequence_id,
                int((time.time() - start_t) * 1000),
            )





async def main() -> None:

    logging.basicConfig(level=logging.INFO)

    global _redis_client
    _redis_client = redis_async.from_url(REDIS_URL, decode_responses=False)
    log.info("ai_worker: connected to redis %s", REDIS_URL)

    connection = await connect()

    channel = await get_channel(connection)

    global _PUBLISH_CHANNEL

    _PUBLISH_CHANNEL = channel



    await declare_durable_queue(channel, AI_UTTERANCES_QUEUE)



    log.info("ai_worker: consuming %s on %s", AI_UTTERANCES_QUEUE, RABBITMQ_URL)



    queue = await channel.declare_queue(AI_UTTERANCES_QUEUE, durable=True)

    await queue.consume(process_utterance, no_ack=False)



    await asyncio.Future()  # run forever





if __name__ == "__main__":

    asyncio.run(main())



# import asyncio

# import aiohttp

# import base64

# import json

# import logging

# import os

# import sys

# import time

# from dataclasses import dataclass

# from typing import Any, AsyncGenerator, Optional

# import types

# import re

# import openai



# # Conversation context storage per call_id to maintain memory across utterances

# _call_contexts: dict[str, "ChatContext"] = {}

# # Phase 9.1: OpenAI client for direct API calls (bypass LiveKit gateway)
# _openai_client = None

# def get_openai_client():
#     global _openai_client
#     if _openai_client is None:
#         api_key = os.getenv("OPENAI_API_KEY")
#         if not api_key:
#             raise RuntimeError("OPENAI_API_KEY not set")
#         _openai_client = openai.AsyncOpenAI(api_key=api_key)
#     return _openai_client





# def detect_language(text: str) -> str:

#     """

#     Detect if text is Hindi/Hinglish or English.

#     Returns: 'hi' for Hindi/Hinglish, 'en' for English.

#     """

#     if not text:

#         return "en"

    

#     # Check for Hindi Unicode characters (Devanagari script range)

#     hindi_chars = sum(1 for char in text if '\u0900' <= char <= '\u097F')

#     if hindi_chars > 0:

#         return "hi"

    

#     # Check for common Hindi words written in Latin script (Hinglish)

#     hinglish_words = [

#         'namaste', 'kaise', 'ho', 'aap', 'main', 'hai', 'kya', 'kar', 'raha',

#         'hai', 'nahi', 'please', 'thank', 'you', 'ji', 'yaar', 'dost',

#         'acha', 'theek', 'hai', 'sahi', 'galat', 'batao', 'samajh', 'nahi',

#         'kripya', 'karein', 'dhanyawad', 'shukriya', 'haan', 'na', 'bolo',

#         'hindi', 'hinglish'

#     ]

    

#     text_lower = text.lower()

#     hinglish_count = sum(1 for word in hinglish_words if word in text_lower.split())

    

#     # If more than 1 Hinglish word detected, treat as Hindi

#     if hinglish_count >= 1:

#         return "hi"

    

#     return "en"





# def augment_system_prompt_with_language(system_prompt: str, language: str) -> str:

#     """

#     Add language-specific instructions to the system prompt based on detected language.

#     """

#     if language == "hi":

#         language_instruction = "\n\nLANGUAGE INSTRUCTION: Respond in Hindi or Hinglish (Hindi written in English script). Use natural conversational Hindi as spoken in North India. Be warm, polite, and use appropriate honorifics like 'aap', 'ji', etc."

#     else:

#         language_instruction = "\n\nLANGUAGE INSTRUCTION: Respond in English. Use clear, professional, and natural conversational English."

    

#     return system_prompt + language_instruction





# def _install_livekit_blingfire_stub() -> None:

#     """

#     Work around Windows WDAC/App Control blocking `lk_blingfire` native extension.

#     `livekit.agents` imports `livekit.agents.tokenize.blingfire` unconditionally; we stub it.

#     """



#     module_name = "livekit.agents.tokenize.blingfire"

#     if module_name in sys.modules:

#         return



#     class SentenceTokenizer:  # minimal compatibility surface

#         def __init__(

#             self,

#             *,

#             min_sentence_len: int = 20,

#             stream_context_len: int = 10,

#             retain_format: bool = False,

#         ) -> None:

#             self._min_sentence_len = int(min_sentence_len)



#         def tokenize(self, text: str, *, language: str | None = None) -> list[str]:

#             parts = [p.strip() for p in re.split(r"(?<=[.!?])\s+", text or "") if p.strip()]

#             return [p for p in parts if len(p) >= self._min_sentence_len]



#         def stream(self, *, language: str | None = None):

#             raise NotImplementedError("Streaming tokenizer not supported in fallback stub")



#     stub = types.ModuleType(module_name)

#     stub.__dict__.update({"__all__": ["SentenceTokenizer"], "SentenceTokenizer": SentenceTokenizer})

#     sys.modules[module_name] = stub





# _install_livekit_blingfire_stub()



# from livekit.agents import inference, function_tool

# from livekit.agents.llm.chat_context import ChatContext

# from livekit.plugins import cartesia, deepgram, elevenlabs



# from .env_load import load_agent_runtime_dotenv

# from .business_id_agents import get_runtime_overrides_from_agents_table

# from .mcube_defaults import get_default_mcube_call_config

# from .mq import (

#     AI_UTTERANCES_QUEUE,

#     RABBITMQ_URL,

#     control_queue_name,

#     declare_durable_queue,

#     connect,

#     get_channel,

#     publish_json,

#     tts_queue_name,

# )



# log = logging.getLogger("mcube.ai_worker")



# load_agent_runtime_dotenv()



# _PUBLISH_CHANNEL = None  # set in main(); used by process_utterance





# def _env_fallback_runtime() -> dict[str, str]:

#     """Fallback only when DB + queue payload do not supply values (see process_utterance)."""

#     d = get_default_mcube_call_config()

#     return {

#         "system_prompt": d["system_prompt"],

#         "llm_model": d["llm_model"],

#         "llm_provider": d["llm_provider"],

#         "tts_model": d["tts_model"],

#         "tts_provider": d.get("tts_provider") or "elevenlabs",

#         "tts_voice_id": d["tts_voice_id"],

#         "tts_encoding": d["tts_encoding"],

#         "tts_chunk_ms": d.get("tts_chunk_ms") or "200",

#     }





# async def _resolve_runtime_from_payload(payload: dict[str, Any]) -> dict[str, str]:

#     """

#     Merge precedence (later wins):

#     1) `.env` defaults (MCUBE_* via mcube_defaults)

#     2) `business_id_agents` row (+ JSON config) when `business_id` is present

#     3) Explicit fields on the RabbitMQ payload (Redis call config / ws_bridge)

#     """

#     base = _env_fallback_runtime()



#     # TEMPORARILY skip DB lookup – use .env.local directly for all config

#     # bid = payload.get("business_id")

#     # if bid is not None:

#     #     db_ov = await asyncio.to_thread(

#     #         get_runtime_overrides_from_agents_table,

#     #         bid,

#     #         agent_id=payload.get("agent_id"),

#     #         user_id=payload.get("user_id"),

#     #         email=payload.get("agent_email") or payload.get("email"),

#     #         name=payload.get("agent_name") or payload.get("name"),

#     #     )

#     #     base.update(db_ov)



#     override_keys = (

#         "system_prompt",

#         "llm_model",

#         "llm_provider",

#         "tts_model",

#         "tts_provider",

#         "tts_voice_id",

#         "tts_encoding",

#         "tts_chunk_ms",

#         "stt_language_code",

#         "stt_model_id",

#         "stt_provider",

#     )

#     out = dict(base)

#     for k in override_keys:

#         if k not in payload:

#             continue

#         v = payload.get(k)

#         if v is None:

#             continue

#         if isinstance(v, str) and k != "llm_provider" and not v.strip():

#             continue

#         if k == "system_prompt" and isinstance(v, str):

#             out[k] = v.replace("\\n", "\n")

#         else:

#             out[k] = str(v).strip() if isinstance(v, str) else v



#     out.setdefault("tts_provider", "elevenlabs")

#     out.setdefault("llm_model", base.get("llm_model", ""))

#     out.setdefault("tts_model", base.get("tts_model", ""))

#     out.setdefault("tts_voice_id", base.get("tts_voice_id", ""))

#     out.setdefault("tts_encoding", base.get("tts_encoding", ""))

#     out.setdefault("system_prompt", base.get("system_prompt", ""))

#     out.setdefault("tts_chunk_ms", base.get("tts_chunk_ms", "200"))

#     out.setdefault("stt_language_code", base.get("stt_language_code", "en"))

#     out.setdefault("stt_model_id", base.get("stt_model_id", ""))

#     out.setdefault("stt_provider", base.get("stt_provider", "elevenlabs"))

#     return out


# @dataclass(frozen=True)

# class PublishTtsChunk:

#     call_id: str

#     sequence_id: int

#     chunk_seq: int

#     pcm16_8k_bytes: bytes





# def _pcm16_to_b64(pcm16_bytes: bytes) -> str:

#     return base64.b64encode(pcm16_bytes).decode("ascii")





# def _resample_pcm16(pcm16_bytes: bytes, *, from_rate: int, to_rate: int, state: object) -> tuple[bytes, object]:

#     """

#     audioop.ratecv works on raw int16 PCM.

#     Returns (converted_bytes, new_state).

#     """

#     import audioop



#     converted, new_state = audioop.ratecv(

#         pcm16_bytes,

#         2,  # width bytes for int16

#         1,  # channels

#         from_rate,

#         to_rate,

#         state,

#     )

#     return converted, new_state





# async def _run_llm(system_prompt: str, llm_model: str, user_text: str) -> str:
#     # Phase 9.1: Reverted to LiveKit gateway temporarily to fix call processing
#     llm = inference.LLM(model=llm_model, max_tokens=120)

#     chat_ctx = ChatContext.empty()

#     chat_ctx.add_message(role="system", content=system_prompt)

#     chat_ctx.add_message(role="user", content=user_text)

#     stream = llm.chat(chat_ctx=chat_ctx)

#     text = []

#     async for token in stream.to_str_iterable():

#         text.append(token)

#     return "".join(text).strip()





# async def _synthesize_tts_pcm16_8k(
#     text: str,
#     *,
#     tts_provider: str,
#     tts_model: str,
#     tts_voice_id: str,
#     tts_encoding: str,
#     session: aiohttp.ClientSession,
#     language: str,
#     chunk_ms: Optional[int] = None,
# ) -> AsyncGenerator[bytes, None]:
#     """Synthesize TTS and yield chunks as they arrive for true streaming (Phase 6.3)."""
#     import audioop

#     if tts_provider == "cartesia":
#         cartesia_key = os.getenv("CARTESIA_API_KEY") or ""

#         tts = cartesia.TTS(
#             model=tts_model or "sonic-3.5",
#             voice=tts_voice_id,
#             language=language,
#             api_key=cartesia_key,
#             encoding="pcm_s16le",
#             sample_rate=16000,
#             http_session=session,
#         )

#         target_rate = 16000
#         chunk_ms_eff = chunk_ms if chunk_ms is not None else int(os.getenv("MCUBE_TTS_CHUNK_MS", "200"))
#         chunk_samples = int((chunk_ms_eff / 1000.0) * target_rate)
#         chunk_bytes = chunk_samples * 2

#         buffer = bytearray()

#         async for synth_audio in tts.synthesize(text):
#             frame = synth_audio.frame
#             pcm16_from = frame.data.tobytes()
#             from_rate = frame.sample_rate
#             if from_rate != target_rate:
#                 resample_state = None
#                 converted, resample_state = audioop.ratecv(
#                     pcm16_from, 2, 1, from_rate, target_rate, resample_state,
#                 )
#                 buffer.extend(converted)
#             else:
#                 buffer.extend(pcm16_from)
#             while len(buffer) >= chunk_bytes:
#                 chunk = bytes(buffer[:chunk_bytes])
#                 del buffer[:chunk_bytes]
#                 yield chunk

#         if buffer:
#             if len(buffer) < chunk_bytes:
#                 buffer.extend(b"\x00" * (chunk_bytes - len(buffer)))
#             yield bytes(buffer[:chunk_bytes])

#     else:
#         elevenlabs_key = os.getenv("ELEVENLABS_API_KEY") or os.getenv("ELEVEN_API_KEY") or ""
#         if not elevenlabs_key:
#             raise RuntimeError("ELEVENLABS_API_KEY not set")

#         tts = elevenlabs.TTS(
#             voice_id=tts_voice_id,
#             model=tts_model,
#             api_key=elevenlabs_key,
#             encoding=tts_encoding or "pcm_16000",
#             streaming_latency=0,
#             http_session=session,
#         )

#         target_rate = 16000
#         chunk_ms_eff = chunk_ms if chunk_ms is not None else int(os.getenv("MCUBE_TTS_CHUNK_MS", "200"))
#         chunk_bytes = int((chunk_ms_eff / 1000.0) * target_rate) * 2
#         buffer = bytearray()

#         async for frame in tts.synthesize(text):
#             # Handle both direct frame and wrapped SynthesizedAudio objects
#             if hasattr(frame, 'frame'):
#                 pcm_data = frame.frame.data.tobytes()
#             elif hasattr(frame, 'data'):
#                 pcm_data = frame.data.tobytes()
#             else:
#                 # Try to get raw bytes directly
#                 pcm_data = bytes(frame)
#             buffer.extend(pcm_data)
#             while len(buffer) >= chunk_bytes:
#                 chunk = bytes(buffer[:chunk_bytes])
#                 del buffer[:chunk_bytes]
#                 yield chunk

#         if buffer:
#             if len(buffer) < chunk_bytes:
#                 buffer.extend(b"\x00" * (chunk_bytes - len(buffer)))
#             yield bytes(buffer)


# async def process_utterance(message) -> None:

#     async with message.process(requeue=False):

#         payload = json.loads(message.body.decode("utf-8"))



#         call_id = str(payload["call_id"])

#         sequence_id = int(payload.get("sequence_id", 0))

#         user_text = payload.get("transcript") or ""

#         bot_say_text = (payload.get("bot_say_text") or "").strip()



#         rt = await _resolve_runtime_from_payload(payload)

#         system_prompt = rt["system_prompt"]

#         llm_model = rt["llm_model"]

#         llm_provider = str(rt.get("llm_provider") or "")

#         tts_model = rt["tts_model"]

#         tts_provider = str(rt.get("tts_provider") or "elevenlabs")

#         tts_voice_id = rt["tts_voice_id"]

#         tts_encoding = rt["tts_encoding"]

#         try:

#             tts_chunk_ms = max(5, min(500, int(float(str(rt.get("tts_chunk_ms") or "200")))))

#         except Exception:

#             tts_chunk_ms = 200



#         # Detect language from user text or use configured default

#         detected_language = rt.get("stt_language_code", "en")

#         if user_text and not bot_say_text:

#             detected_language = detect_language(user_text)

#             log.info("language_detected call_id=%s seq=%s language=%s text=%r", call_id, sequence_id, detected_language, user_text[:50])



#         # Augment system prompt with language instructions

#         system_prompt_with_lang = augment_system_prompt_with_language(system_prompt, detected_language)



#         start_t = time.time()

#         log.info(

#             "utterance_received call_id=%s seq=%s stt_text=%r llm_model=%s tts_provider=%s tts_model=%s tts_voice_id=%s language=%s",

#             call_id,

#             sequence_id,

#             (bot_say_text or user_text or "")[:120],

#             llm_model,

#             tts_provider,

#             tts_model,

#             tts_voice_id,

#             detected_language,

#         )

#         try:

#             # If ws_bridge requested a direct bot message, skip LLM and just synthesize this text.

#             if bot_say_text:

#                 response_text = bot_say_text

#                 # Phase 6.3: Create session for streaming TTS and publish chunks as they arrive
#                 connection_queue = tts_queue_name(call_id)
#                 channel = _PUBLISH_CHANNEL
#                 if channel is None:
#                     raise RuntimeError("publish channel not initialized")

#                 async with aiohttp.ClientSession() as session:
#                     chunks_generator = _synthesize_tts_pcm16_8k(
#                         response_text,
#                         tts_provider=tts_provider,
#                         tts_model=tts_model,
#                         tts_voice_id=tts_voice_id,
#                         tts_encoding=tts_encoding,
#                         session=session,
#                         chunk_ms=tts_chunk_ms,
#                         language=detected_language,
#                     )
#                     # Phase 6.3: Publish chunks immediately as they arrive (true streaming)
#                     chunk_seq = 0
#                     async for pcm16_8k_bytes in chunks_generator:
#                         await publish_json(
#                             channel=channel,
#                             queue_name=connection_queue,
#                             payload={
#                                 "call_id": call_id,
#                                 "sequence_id": sequence_id,
#                                 "chunk_seq": chunk_seq,
#                                 "audio_pcm_b64": _pcm16_to_b64(pcm16_8k_bytes),
#                                 "type": "tts_audio_chunk",
#                             },
#                         )
#                         chunk_seq += 1

#                     # Publish end-of-tts marker
#                     await publish_json(
#                         channel=channel,
#                         queue_name=connection_queue,
#                         payload={
#                             "call_id": call_id,
#                             "sequence_id": sequence_id,
#                             "type": "tts_end",
#                         },
#                     )
#                     # Skip the publishing loop below
#                     chunks_8k = None

#             else:

#                 # Tool schemas (the model can emit these as structured tool calls).

#                 end_call_schema = {

#                     "type": "function",

#                     "name": "end_call",

#                     "description": "Terminate the current phone call immediately.",

#                     "parameters": {

#                         "type": "object",

#                         "properties": {},

#                         "required": [],

#                         "additionalProperties": False,

#                     },

#                 }



#                 transfer_to_number_schema = {

#                     "type": "function",

#                     "name": "transfer_to_number",

#                     "description": "Transfer the call to another phone number.",

#                     "parameters": {

#                         "type": "object",

#                         "properties": {

#                             "phone_number": {

#                                 "type": "string",

#                                 "description": "Target phone number in E.164 format (preferred).",

#                             },

#                             "transfer_number": {

#                                 "type": "string",

#                                 "description": "Alias for phone_number.",

#                             },

#                             "sip_uri": {

#                                 "type": "string",

#                                 "description": "Optional SIP URI like sip:+918...@host.",

#                             },

#                         },

#                         "additionalProperties": False,

#                     },

#                 }



#                 async def _end_call_tool(raw_arguments: dict[str, object]) -> str:

#                     return "ok"



#                 async def _transfer_to_number_tool(raw_arguments: dict[str, object]) -> str:

#                     return "ok"



#                 tools = [

#                     function_tool(_end_call_tool, raw_schema=end_call_schema),

#                     function_tool(_transfer_to_number_tool, raw_schema=transfer_to_number_schema),

#                 ]



#                 tool_instructions = (

#                     "\n\nTool-use rules for MCube phone calls:\n"

#                     "- Call `end_call` to terminate the current call.\n"

#                     "- Call `transfer_to_number` to transfer; provide phone_number or sip_uri.\n"

#                     "- When calling a tool, do not include additional spoken content; the caller side will act on the tool."

#                 )

#                 system_prompt_with_tools = f"{system_prompt_with_lang}{tool_instructions}"



#                 # Phase 9.1: Reverted to LiveKit gateway temporarily to fix call processing
#                 llm = inference.LLM(model=llm_model)

#                 # Reuse existing context for this call to maintain conversation memory

#                 if call_id not in _call_contexts:

#                     _call_contexts[call_id] = ChatContext.empty()

#                     _call_contexts[call_id].add_message(role="system", content=system_prompt_with_tools)

                

#                 chat_ctx = _call_contexts[call_id]

#                 chat_ctx.add_message(role="user", content=user_text)

#                 # Measure LLM latency

#                 llm_start = time.time()

#                 stream = llm.chat(
#                     chat_ctx=chat_ctx,
#                     tools=tools,
#                     parallel_tool_calls=False,
#                 )



#                 # Iterate the stream and aggregate text + tool calls ourselves.
#                 tool_calls: list[Any] = []

#                 text_parts: list[str] = []

#                 async for chunk in stream:

#                     if chunk.delta:

#                         if chunk.delta.content:

#                             text_parts.append(chunk.delta.content)

#                         if chunk.delta.tool_calls:

#                             tool_calls.extend(chunk.delta.tool_calls)

#                 llm_latency_ms = int((time.time() - llm_start) * 1000)

#                 log.info("llm_generated call_id=%s seq=%s llm_latency_ms=%s", call_id, sequence_id, llm_latency_ms)



#                 # If the model requested a control action, publish it and skip TTS.

#                 if tool_calls:

#                     control_queue = control_queue_name(call_id)

#                     channel = _PUBLISH_CHANNEL

#                     if channel is None:

#                         raise RuntimeError("publish channel not initialized")



#                     for tc in tool_calls:

#                         try:

#                             tool_args = json.loads(tc.arguments or "{}")

#                         except Exception:

#                             tool_args = {}



#                         if tc.name == "end_call":

#                             await publish_json(

#                                 channel=channel,

#                                 queue_name=control_queue,

#                                 payload={

#                                     "type": "mcube_terminate",

#                                     "call_id": call_id,

#                                     "sequence_id": sequence_id,

#                                 },

#                             )

#                             return



#                         if tc.name == "transfer_to_number":

#                             transfer_to = (

#                                 tool_args.get("phone_number")

#                                 or tool_args.get("transfer_number")

#                                 or tool_args.get("sip_uri")

#                             )

#                             await publish_json(

#                                 channel=channel,

#                                 queue_name=control_queue,

#                                 payload={

#                                     "type": "mcube_transfer",

#                                     "call_id": call_id,

#                                     "sequence_id": sequence_id,

#                                     "transfer_to": transfer_to,

#                                 },

#                             )

#                             return



#                 response_text = "".join(text_parts).strip()

#                 if not response_text:

#                     response_text = "Okay."

                

#                 # Add assistant response to conversation context

#                 if call_id in _call_contexts:

#                     _call_contexts[call_id].add_message(role="assistant", content=response_text)



#                 # Phase 6.3: Create session for streaming TTS and publish chunks as they arrive
#                 connection_queue = tts_queue_name(call_id)
#                 channel = _PUBLISH_CHANNEL
#                 if channel is None:
#                     raise RuntimeError("publish channel not initialized")

#                 async with aiohttp.ClientSession() as session:
#                     chunks_generator = _synthesize_tts_pcm16_8k(
#                         response_text,
#                         tts_provider=tts_provider,
#                         tts_model=tts_model,
#                         tts_voice_id=tts_voice_id,
#                         tts_encoding=tts_encoding,
#                         session=session,
#                         chunk_ms=tts_chunk_ms,
#                         language=detected_language,
#                     )
#                     # Phase 6.3: Publish chunks immediately as they arrive (true streaming)
#                     chunk_seq = 0
#                     async for pcm16_8k_bytes in chunks_generator:
#                         await publish_json(
#                             channel=channel,
#                             queue_name=connection_queue,
#                             payload={
#                                 "call_id": call_id,
#                                 "sequence_id": sequence_id,
#                                 "chunk_seq": chunk_seq,
#                                 "audio_pcm_b64": _pcm16_to_b64(pcm16_8k_bytes),
#                                 "type": "tts_audio_chunk",
#                             },
#                         )
#                         chunk_seq += 1

#                     # Publish end-of-tts marker
#                     await publish_json(
#                         channel=channel,
#                         queue_name=connection_queue,
#                         payload={
#                             "call_id": call_id,
#                             "sequence_id": sequence_id,
#                             "type": "tts_end",
#                         },
#                     )
#                     # Skip the publishing loop below
#                     chunks_8k = None

#         except Exception as e:
#             log.error("utterance_processing_failed call_id=%s seq=%s error=%s", call_id, sequence_id, str(e))
#             raise



#         # Publish chunks (one per queue message) so WS can stream playback.
#         # Phase 6.3: Skip if chunks_8k is None (streaming already done)
#         if chunks_8k is not None:
#             connection_queue = tts_queue_name(call_id)



#             # Publish all chunks in order.

#             # WS bridge assumes chunk_seq increases and plays sequentially.

#             channel = _PUBLISH_CHANNEL

#             if channel is None:

#                 raise RuntimeError("publish channel not initialized")

#             # The ws_bridge declares/consumes this queue; we can publish without declaring here.

#             for chunk_seq, pcm16_8k_bytes in enumerate(chunks_8k):

#                 await publish_json(

#                     channel=channel,

#                     queue_name=connection_queue,

#                     payload={

#                         "call_id": call_id,

#                         "sequence_id": sequence_id,

#                         "chunk_seq": chunk_seq,

#                         "audio_pcm_b64": _pcm16_to_b64(pcm16_8k_bytes),

#                         "type": "tts_audio_chunk",

#                     },

#                 )



#             # Publish a final marker for end-of-tts.
#             await publish_json(

#                 channel=channel,

#                 queue_name=connection_queue,

#                 payload={

#                     "call_id": call_id,

#                     "sequence_id": sequence_id,

#                     "type": "tts_done",

#                 },

#             )

#             log.info(

#                 "tts_published call_id=%s seq=%s chunks=%s tts_generation_ms=%s",

#                 call_id,

#                 sequence_id,

#                 len(chunks_8k),

#                 int((time.time() - start_t) * 1000),

#             )

#         # Phase 6.3: Log for streaming case (chunks_8k is None)
#         if chunks_8k is None:
#             log.info(
#                 "tts_published_streaming call_id=%s seq=%s tts_generation_ms=%s",
#                 call_id,
#                 sequence_id,
#                 int((time.time() - start_t) * 1000),
#             )





# async def main() -> None:

#     logging.basicConfig(level=logging.INFO)



#     connection = await connect()

#     channel = await get_channel(connection)

#     global _PUBLISH_CHANNEL

#     _PUBLISH_CHANNEL = channel



#     await declare_durable_queue(channel, AI_UTTERANCES_QUEUE)



#     log.info("ai_worker: consuming %s on %s", AI_UTTERANCES_QUEUE, RABBITMQ_URL)



#     queue = await channel.declare_queue(AI_UTTERANCES_QUEUE, durable=True)

#     await queue.consume(process_utterance, no_ack=False)



#     await asyncio.Future()  # run forever





# if __name__ == "__main__":

#     asyncio.run(main())



