"""
Load runtime settings from cluster DB table `business_id_agents` (livekitvoicebot_cluster).

Schema varies by deployment: some rows use only `business_id`, `name`, and JSON `config`;
others add `role`, `agent_id`, `user_id`, `email`, etc. We introspect columns and merge
`config` JSON over scalar columns so all values stay dynamic (no hard-coded column lists).
"""

from __future__ import annotations

import json
import logging
import os
import time
from typing import Any
from urllib.parse import unquote, urlparse

# Module logger; every message in this file is prefixed with "business_id_agents:".
log = logging.getLogger("mcube.business_id_agents")

# In-process cache of merged agent records.
#   key:   (business_id, agent_id, user_id, email, name) lookup tuple
#   value: (time.monotonic() taken at lookup, merged record or None when no row was found)
_CACHE: dict[tuple[Any, ...], tuple[float, dict[str, Any] | None]] = {}

# Cache lifetime in seconds, overridable through BUSINESS_ID_AGENTS_CACHE_TTL_S.
# A blank or non-numeric value must not make the import fail, so it falls back
# to the default with a warning instead of raising ValueError.
try:
    _CACHE_TTL_S = float(os.getenv("BUSINESS_ID_AGENTS_CACHE_TTL_S", "120"))
except ValueError:
    log.warning(
        "business_id_agents: invalid BUSINESS_ID_AGENTS_CACHE_TTL_S=%r; using 120s",
        os.getenv("BUSINESS_ID_AGENTS_CACHE_TTL_S"),
    )
    _CACHE_TTL_S = 120.0


def _parse_mysql_cluster_url(url: str) -> dict[str, Any] | None:
    """Turn a ``mysql://user:pass@host:port/db`` URL into pymysql connect kwargs.

    Args:
        url: Connection URL; ``mysql``, ``mysql2`` and ``mariadb`` schemes are
            accepted. User and password are percent-decoded.

    Returns:
        A dict with ``host``, ``port``, ``user``, ``password``, ``database``,
        ``charset`` and a ``cursorclass`` placeholder (None; the caller fills
        it in once pymysql is imported). None when the URL is blank, malformed,
        uses another scheme, has an invalid port, or names no database.
    """
    url = (url or "").strip()
    if not url:
        return None

    # Messages below never echo the URL or the exception text: both can
    # contain the password.
    try:
        parsed = urlparse(url)
    except ValueError:
        log.warning("business_id_agents: malformed DATABASE_CLUSTER_URL")
        return None

    if parsed.scheme not in ("mysql", "mysql2", "mariadb"):
        log.warning("business_id_agents: unsupported DATABASE_CLUSTER_URL scheme=%s", parsed.scheme)
        return None

    try:
        # `.port` is validated lazily and raises ValueError when the port is
        # non-numeric or outside 0-65535.
        port = int(parsed.port or 3306)
    except ValueError:
        log.warning("business_id_agents: invalid port in DATABASE_CLUSTER_URL")
        return None

    db = (parsed.path or "").lstrip("/").split("?", 1)[0]
    if not db:
        return None

    return {
        "host": parsed.hostname or "127.0.0.1",
        "port": port,
        "user": unquote(parsed.username or ""),
        "password": unquote(parsed.password or ""),
        "database": db,
        "charset": "utf8mb4",
        "cursorclass": None,  # set after import pymysql
    }


def _merged_agent_record(row: dict[str, Any]) -> dict[str, Any]:
    """Flatten a table row and its JSON ``config`` into one dict.

    ``config`` may be a dict, a JSON string, or UTF-8 bytes holding JSON. Its
    keys are laid over the other columns, so config wins on a key collision.
    A config that is not valid JSON, or is not a JSON object, is ignored with
    a warning rather than silently.

    Args:
        row: One table row keyed by column name.

    Returns:
        The merged mapping plus two bookkeeping keys: ``_config`` (the parsed
        config dict, empty when unusable) and ``_table_columns`` (the set of
        row keys other than ``config``).
    """
    raw_cfg = row.get("config")
    if isinstance(raw_cfg, (bytes, bytearray)):
        raw_cfg = raw_cfg.decode("utf-8", errors="replace")

    cfg: dict[str, Any] = {}
    if isinstance(raw_cfg, dict):
        cfg = raw_cfg
    elif isinstance(raw_cfg, str) and raw_cfg.strip():
        try:
            parsed = json.loads(raw_cfg)
        except (ValueError, RecursionError) as exc:
            # Only the decoder's message is logged, never the config itself.
            log.warning("business_id_agents: ignoring invalid config JSON: %s", exc)
        else:
            if isinstance(parsed, dict):
                cfg = parsed
            else:
                log.warning(
                    "business_id_agents: ignoring config JSON of type %s (expected object)",
                    type(parsed).__name__,
                )

    columns = {key: value for key, value in row.items() if key != "config"}
    out: dict[str, Any] = {**columns, **cfg}
    # Assigned last so a config key with the same name cannot shadow them.
    out["_config"] = cfg
    out["_table_columns"] = set(columns)
    return out


def _runtime_mcube_overrides(flat: dict[str, Any]) -> dict[str, str]:
    """
    Map a merged agent record (table columns + JSON `config`) to string overrides.

    Each output key is filled from the first non-blank value among its aliases,
    which include the plain name and its MCUBE_* form. Values are stringified
    and stripped; bytes are decoded as UTF-8 rather than rendered as "b'...'".
    Literal backslash-n sequences in the system prompt become real newlines.

    Args:
        flat: Merged record, e.g. the output of `_merged_agent_record`.

    Returns:
        Only the keys that resolved to a non-blank value (may be empty).
    """

    def _text(value: Any) -> str:
        """Stripped text form of a scalar; None becomes '' and bytes are decoded."""
        if value is None:
            return ""
        if isinstance(value, (bytes, bytearray)):
            value = value.decode("utf-8", errors="replace")
        return str(value).strip()

    def _first(*keys: str) -> str:
        """First non-blank value among `keys`, in order; '' when none is set."""
        for key in keys:
            text = _text(flat.get(key))
            if text:
                return text
        return ""

    def _jsonish(key: str) -> str:
        """Like `_text`, but dict/list values are serialized as JSON."""
        value = flat.get(key)
        if isinstance(value, (dict, list)):
            try:
                return json.dumps(value, ensure_ascii=False)
            except (TypeError, ValueError, RecursionError):
                # Not serializable: treat as unset.
                return ""
        return _text(value)

    out: dict[str, str] = {}

    system_prompt = _first("system_prompt", "MCUBE_SYSTEM_PROMPT", "prompt").replace("\\n", "\n")
    if system_prompt:
        out["system_prompt"] = system_prompt

    # Optional conversation-level fields that some deployments store in this table.
    # They are passed through so callers can stay dynamic per business/agent
    # without relying on `.env`.
    for key in ("message_inbound", "message_outbound", "platform_settings", "conversation_behavior"):
        value = _jsonish(key)
        if value:
            out[key] = value

    # (output key, aliases tried in order)
    pairs = (
        ("first_message", ("first_message", "MCUBE_FIRST_MESSAGE", "message_outbound")),
        ("llm_model", ("llm_model", "MCUBE_LLM_MODEL")),
        ("llm_provider", ("llm_provider", "MCUBE_LLM_PROVIDER")),
        ("stt_provider", ("stt_provider", "MCUBE_STT_PROVIDER")),
        ("stt_language_code", ("stt_language_code", "MCUBE_STT_LANGUAGE_CODE")),
        ("stt_model_id", ("stt_model_id", "MCUBE_STT_MODEL_ID")),
        ("tts_provider", ("tts_provider", "MCUBE_TTS_PROVIDER")),
        ("tts_model", ("tts_model", "MCUBE_TTS_MODEL")),
        ("tts_voice_id", ("tts_voice_id", "voice_id", "MCUBE_TTS_VOICE_ID")),
        ("tts_encoding", ("tts_encoding", "MCUBE_TTS_ENCODING")),
        ("tts_chunk_ms", ("tts_chunk_ms", "MCUBE_TTS_CHUNK_MS")),
        ("tts_gain", ("tts_gain", "MCUBE_TTS_GAIN")),
        ("playback_pace_factor", ("playback_pace_factor", "MCUBE_PLAYBACK_PACE_FACTOR")),
        ("checkpoint_every", ("checkpoint_every", "MCUBE_CHECKPOINT_EVERY")),
    )
    for out_key, alias_keys in pairs:
        value = _first(*alias_keys)
        if value:
            out[out_key] = value

    return out


def fetch_business_id_agents_row(
    business_id: int,
    *,
    agent_id: Any = None,
    user_id: Any = None,
    email: Any = None,
    name: Any = None,
) -> dict[str, Any] | None:
    """
    Fetch one `business_id_agents` row for `business_id` from the cluster DB.

    The table's columns are introspected first, so the optional filters are
    applied only for columns that actually exist. `email` is compared against
    both an `email` and a `username` column when present. If the filtered
    lookup matches nothing, the first row of the business is returned instead
    (lowest `id` when that column exists).

    Args:
        business_id: Value matched against the `business_id` column.
        agent_id: Optional filter on `agent_id`.
        user_id: Optional filter on `user_id`.
        email: Optional filter on `email` and `username`.
        name: Optional filter on `name`.
        Filters that are None or blank are skipped.

    Returns:
        The row as a plain dict, or None when DATABASE_CLUSTER_URL is unset or
        unusable, pymysql is not installed, the connection or a query fails,
        the table lacks a `business_id` column, or the business has no row.
    """
    params_base = _parse_mysql_cluster_url(os.getenv("DATABASE_CLUSTER_URL", ""))
    if not params_base:
        log.debug("business_id_agents: DATABASE_CLUSTER_URL not set or invalid; skip DB fetch")
        return None

    # Imported lazily so the module stays importable without pymysql.
    try:
        import pymysql
        from pymysql.cursors import DictCursor
    except ImportError:
        log.warning("business_id_agents: pymysql not installed")
        return None

    conn_kw = {**params_base, "cursorclass": DictCursor}

    try:
        conn = pymysql.connect(**conn_kw)
    except Exception:
        log.exception("business_id_agents: DB connect failed")
        return None

    def q_ident(n: str) -> str:
        """Backtick-quote an identifier, doubling any embedded backtick."""
        return f"`{str(n).replace(chr(96), chr(96)+chr(96))}`"

    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS
                WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'business_id_agents'
                """
            )
            cols = {r["COLUMN_NAME"] for r in cur.fetchall()}
            if not cols:
                # INFORMATION_SCHEMA returned nothing; ask the table directly.
                cur.execute("SHOW COLUMNS FROM `business_id_agents`")
                cols = {r["Field"] if isinstance(r, dict) else r[0] for r in cur.fetchall()}

            if not cols:
                log.warning("business_id_agents: table business_id_agents not found or empty schema")
                return None
            if "business_id" not in cols:
                # Every query below filters on this column; report its absence
                # clearly instead of letting the SELECT fail.
                log.warning("business_id_agents: table business_id_agents has no business_id column")
                return None

            by_business = f"{q_ident('business_id')} = %s"

            # Optional filters: keep those with a non-blank value whose column exists.
            extra: list[tuple[str, Any]] = []
            for key, val in (
                ("agent_id", agent_id),
                ("user_id", user_id),
                ("email", email),
                ("username", email),
                ("name", name),
            ):
                if val is None or str(val).strip() == "":
                    continue
                if key not in cols:
                    continue
                extra.append((key, val))

            if extra:
                where = " AND ".join([by_business, *(f"{q_ident(key)} = %s" for key, _ in extra)])
                cur.execute(
                    f"SELECT * FROM `business_id_agents` WHERE {where} LIMIT 1",
                    [business_id, *(val for _, val in extra)],
                )
                row = cur.fetchone()
                if row:
                    return dict(row)

            # business_id only: the sole lookup when no filter applied, otherwise
            # the fallback after the tighter match found nothing. Running it once
            # avoids repeating an identical query that already came back empty.
            order = f" ORDER BY {q_ident('id')} ASC" if "id" in cols else ""
            cur.execute(
                f"SELECT * FROM `business_id_agents` WHERE {by_business}{order} LIMIT 1",
                [business_id],
            )
            row = cur.fetchone()
            if row:
                if extra:
                    log.info(
                        "business_id_agents: using first agent row for business_id=%s (no tighter match)",
                        business_id,
                    )
                return dict(row)
    except Exception:
        log.exception("business_id_agents: query failed business_id=%s", business_id)
        return None
    finally:
        # Best-effort close; a failure here must not mask the result or the error above.
        try:
            conn.close()
        except Exception:
            pass

    return None


def get_runtime_overrides_from_agents_table(
    business_id: int | None,
    *,
    agent_id: Any = None,
    user_id: Any = None,
    email: Any = None,
    name: Any = None,
) -> dict[str, str]:
    """
    Resolve the string overrides for one business/agent lookup, using `_CACHE`.

    Returns an empty dict when `business_id` is None or cannot be converted to
    int, or when no row is found. Both found and not-found results are cached
    per (business_id, agent_id, user_id, email, name) for `_CACHE_TTL_S` seconds.
    """
    if business_id is None:
        return {}
    try:
        bid = int(business_id)
    except Exception:
        return {}

    lookup = {"agent_id": agent_id, "user_id": user_id, "email": email, "name": name}
    cache_key = (bid, *lookup.values())
    started = time.monotonic()

    cached = _CACHE.get(cache_key)
    if cached:
        stored_at, record = cached
        if started - stored_at < _CACHE_TTL_S:
            # Fresh entry: a None record means the last lookup found no row.
            if not record:
                return {}
            return _runtime_mcube_overrides(record)

    row = fetch_business_id_agents_row(bid, **lookup)
    if not row:
        _CACHE[cache_key] = (started, None)
        return {}

    record = _merged_agent_record(row)
    _CACHE[cache_key] = (started, record)

    overrides = _runtime_mcube_overrides(record)
    if overrides:
        log.info(
            "business_id_agents: loaded overrides for business_id=%s keys=%s",
            bid,
            list(overrides.keys()),
        )
    return overrides


def get_full_agent_context(
    business_id: int | None,
    *,
    agent_id: Any = None,
    user_id: Any = None,
    email: Any = None,
    name: Any = None,
) -> dict[str, Any]:
    """
    Return the whole merged agent record (table columns overlaid with `config`).

    Intended for debugging / future use. Each call goes through
    `fetch_business_id_agents_row`; `_CACHE` is not consulted. The result is an
    empty dict when `business_id` is None or not convertible to int, or when
    no row is found.
    """
    bid: int | None = None
    if business_id is not None:
        try:
            bid = int(business_id)
        except Exception:
            bid = None
    if bid is None:
        return {}

    filters = {"agent_id": agent_id, "user_id": user_id, "email": email, "name": name}
    row = fetch_business_id_agents_row(bid, **filters)
    return _merged_agent_record(row) if row else {}
