from __future__ import annotations

import os
from typing import Any
from urllib.parse import urlencode, urlparse

import httpx
from django.db import connections, DatabaseError
from django.http import HttpResponse
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response

from config.master_business_views import _require_master_admin


def _row_to_voice(row, cols) -> dict:
    return dict(zip(cols, row))


def _label_value(labels: Any, *candidates: str) -> str:
    """
    ElevenLabs `labels` is a free-form string map; keys vary by casing/spacing
    (e.g. "use case" vs "use_case"). Match case-insensitively.
    """
    if not isinstance(labels, dict):
        return ""
    norm: dict[str, Any] = {}
    for k, val in labels.items():
        if k is None:
            continue
        nk = str(k).strip().lower().replace(" ", "_")
        norm[nk] = val
    for cand in candidates:
        nk = cand.strip().lower().replace(" ", "_")
        v = norm.get(nk)
        if v is not None and str(v).strip():
            return str(v).strip()
    return ""


def _split_cartesia_lang_locale(raw: Any) -> tuple[str, str]:
    """
    Cartesia `language` is either a bare code (`en`) or a locale pair (`en_GB`, `en-GB`).
    Store the base code in `language` and the full locale token in `locale` when present.
    """
    s = (str(raw).strip() if raw is not None else "") or ""
    if not s:
        return "", ""
    if "_" in s:
        lang, _rest = s.split("_", 1)
        lang = lang.strip().lower()
        return (lang, s) if lang else ("", s)
    if "-" in s:
        lang, _rest = s.split("-", 1)
        lang = lang.strip().lower()
        return (lang, s.replace("-", "_")) if lang else ("", s)
    return s.lower(), ""


def _elevenlabs_verified_pick(voice: dict[str, Any]) -> dict[str, Any]:
    """Prefer an English verified_language row; otherwise use the first entry."""
    raw = voice.get("verified_languages")
    if not isinstance(raw, list) or not raw:
        return {}
    for entry in raw:
        if not isinstance(entry, dict):
            continue
        lang = str(entry.get("language") or "").lower()
        if lang.startswith("en"):
            return entry
    first = raw[0]
    return first if isinstance(first, dict) else {}


def _elevenlabs_first_preview_from_verified(voice: dict[str, Any]) -> str:
    """Scan all verified_language rows — preview URLs are often only on some locales."""
    raw = voice.get("verified_languages")
    if not isinstance(raw, list):
        return ""
    for entry in raw:
        if not isinstance(entry, dict):
            continue
        u = str(entry.get("preview_url") or entry.get("previewUrl") or "").strip()
        if u:
            return u
    return ""


def _elevenlabs_model_id_from_voice(voice: dict[str, Any]) -> Any:
    """Pick model_id from verified_languages (prefer English), else high_quality_base_model_ids."""
    raw = voice.get("verified_languages")
    if isinstance(raw, list):
        preferred: Any = None
        fallback: Any = None
        for entry in raw:
            if not isinstance(entry, dict):
                continue
            mid = entry.get("model_id") or entry.get("modelId")
            if mid is None or str(mid).strip() == "":
                continue
            if fallback is None:
                fallback = mid
            lang = str(entry.get("language") or "").lower()
            if lang.startswith("en"):
                preferred = mid
        if preferred is not None:
            return preferred
        if fallback is not None:
            return fallback
    hq = voice.get("high_quality_base_model_ids")
    if isinstance(hq, list) and hq:
        first = hq[0]
        if first is not None and str(first).strip():
            return first
    return None


def _elevenlabs_preview_from_samples(voice: dict[str, Any]) -> str:
    """Some list payloads expose sample audio URLs even when preview_url is absent."""
    samples = voice.get("samples")
    if not isinstance(samples, list):
        return ""
    for s in samples:
        if not isinstance(s, dict):
            continue
        u = (
            s.get("preview_url")
            or s.get("previewUrl")
            or s.get("url")
            or s.get("audio_url")
            or s.get("sample_url")
        )
        if u is not None and str(u).strip():
            return str(u).strip()
    return ""


def _split_bcp47_language_tag(language: str, locale: str) -> tuple[str, str]:
    """
    If ElevenLabs returns a BCP-47 tag in `language` (e.g. en-US) but `locale` is empty,
    split into base language + full tag for the locale column.
    """
    language = (language or "").strip()
    locale = (locale or "").strip()
    if locale:
        return language, locale
    if "-" in language and language.count("-") == 1 and "_" not in language:
        base, _region = language.split("-", 1)
        base = base.strip().lower()
        if base:
            return base, language
    return language, locale


def _cartesia_preview_url(v: dict[str, Any]) -> str:
    for key in ("preview_file_url", "preview_url", "previewUrl"):
        val = v.get(key)
        if val is not None and str(val).strip():
            return str(val).strip()
    prev = v.get("preview")
    if isinstance(prev, dict):
        for key in ("file_url", "url", "preview_file_url", "preview_url"):
            val = prev.get(key)
            if val is not None and str(val).strip():
                return str(val).strip()
    return ""


def _voice_plan_db_alias() -> str:
    """
    Voice tables may live in either DB depending on environment.
    Prefer the DB that actually has `voice_plan` and the larger row count.
    """
    preferred = (os.getenv("VOICE_PLAN_DB") or "").strip().lower()
    if preferred in {"default", "cluster"}:
        return preferred

    def _table_exists(alias: str) -> bool:
        with connections[alias].cursor() as cur:
            cur.execute("SHOW TABLES LIKE 'voice_plan'")
            return cur.fetchone() is not None

    def _count(alias: str) -> int:
        try:
            with connections[alias].cursor() as cur:
                cur.execute("SELECT COUNT(*) FROM voice_plan")
                return int(cur.fetchone()[0] or 0)
        except Exception:
            return -1

    has_default = _table_exists("default")
    has_cluster = _table_exists("cluster")

    if has_default and not has_cluster:
        return "default"
    if has_cluster and not has_default:
        return "cluster"
    if not has_default and not has_cluster:
        return "default"

    c_default = _count("default")
    c_cluster = _count("cluster")
    return "cluster" if c_cluster > c_default else "default"

def _ensure_voice_plan_schema() -> None:
    """
    The existing DB in some environments defines `voice_plan.voice_id` as a numeric type.
    Provider voice IDs (ElevenLabs/Cartesia) are strings, so we must ensure it is VARCHAR.
    """
    alias = _voice_plan_db_alias()
    with connections[alias].cursor() as cur:
        cur.execute(
            """
            SELECT DATA_TYPE
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_SCHEMA = DATABASE()
              AND TABLE_NAME = 'voice_plan'
              AND COLUMN_NAME = 'voice_id'
            """
        )
        row = cur.fetchone()
        if not row:
            return
        data_type = str(row[0] or "").lower()
        if data_type in {"varchar", "char", "text", "mediumtext", "longtext"}:
            return
        # Convert numeric/other types to VARCHAR so string IDs work.
        cur.execute("ALTER TABLE voice_plan MODIFY voice_id VARCHAR(255)")


def _ensure_voice_plan_columns() -> None:
    """
    Ensure `voice_plan` can store provider voice metadata we want to expose in the UI.
    Older environments may be missing these columns and/or have `model_id` as INT.
    """
    alias = _voice_plan_db_alias()
    with connections[alias].cursor() as cur:
        # Ensure model_id can store string model IDs (e.g. eleven_turbo_v2 / sonic-3).
        cur.execute(
            """
            SELECT DATA_TYPE
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_SCHEMA = DATABASE()
              AND TABLE_NAME = 'voice_plan'
              AND COLUMN_NAME = 'model_id'
            """
        )
        row = cur.fetchone()
        if row:
            data_type = str(row[0] or "").lower()
            if data_type not in {"varchar", "char", "text", "mediumtext", "longtext"}:
                cur.execute("ALTER TABLE voice_plan MODIFY model_id VARCHAR(255) NULL")

        # Add optional metadata columns if missing.
        cur.execute(
            """
            SELECT COLUMN_NAME
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_SCHEMA = DATABASE()
              AND TABLE_NAME = 'voice_plan'
            """
        )
        existing = {str(r[0] or "").lower() for r in (cur.fetchall() or [])}

        to_add: list[str] = []
        if "language" not in existing:
            to_add.append("ADD COLUMN language VARCHAR(64) NULL")
        if "locale" not in existing:
            to_add.append("ADD COLUMN locale VARCHAR(64) NULL")
        if "preview_url" not in existing:
            to_add.append("ADD COLUMN preview_url TEXT NULL")

        if to_add:
            cur.execute(f"ALTER TABLE voice_plan {', '.join(to_add)}")


@api_view(["GET"])
@permission_classes([IsAuthenticated])
def master_voice_plans(request):
    _, err = _require_master_admin(request)
    if err:
        return err

    q = request.query_params
    search = (q.get("search") or "").strip()
    plan = (q.get("plan") or "").strip()
    category = (q.get("category") or "").strip()
    model_id = (q.get("model_id") or "").strip()
    gender = (q.get("gender") or "").strip()
    per_page = max(1, min(200, int(q.get("per_page") or 50)))
    page = max(1, int(q.get("page") or 1))

    conditions = []
    params = []

    if search:
        conditions.append("(name LIKE %s OR voice_id LIKE %s)")
        params += [f"%{search}%", f"%{search}%"]
    if plan:
        conditions.append("plans = %s")
        params.append(plan)
    if category:
        conditions.append("category = %s")
        params.append(category)
    if model_id:
        conditions.append("model_id = %s")
        params.append(model_id)
    if gender:
        conditions.append("gender = %s")
        params.append(gender)

    where = ("WHERE " + " AND ".join(conditions)) if conditions else ""

    sort_col = "name"
    sort_dir = "ASC" if (q.get("sort_order") or "asc").lower() == "asc" else "DESC"

    try:
        _ensure_voice_plan_schema()
        _ensure_voice_plan_columns()
        alias = _voice_plan_db_alias()
        with connections[alias].cursor() as cur:
            # One row per voice_id (duplicates can exist from historical syncs / no UNIQUE on voice_id).
            cur.execute(f"SELECT COUNT(DISTINCT voice_id) FROM voice_plan {where}", params)
            total = cur.fetchone()[0]

            offset = (page - 1) * per_page
            # MySQL 8+ window function; picks a deterministic row per voice_id for display.
            cur.execute(
                f"""
                SELECT voice_id, name, category, model_id, plans, gender, accent, age, use_case, description, language, locale, preview_url
                FROM (
                    SELECT voice_id, name, category, model_id, plans, gender, accent, age, use_case, description, language, locale, preview_url,
                           ROW_NUMBER() OVER (
                               PARTITION BY voice_id
                               ORDER BY
                                   (preview_url IS NOT NULL AND preview_url <> '') DESC,
                                   (locale IS NOT NULL AND locale <> '') DESC,
                                   (language IS NOT NULL AND language <> '') DESC,
                                   (accent IS NOT NULL AND accent <> '') DESC,
                                   (age IS NOT NULL AND age <> '') DESC,
                                   (use_case IS NOT NULL AND use_case <> '') DESC,
                                   name {sort_dir},
                                   plans ASC,
                                   category ASC,
                                   model_id ASC
                           ) AS rn
                    FROM voice_plan vp
                    {where}
                ) ranked
                WHERE ranked.rn = 1
                ORDER BY {sort_col} {sort_dir}
                LIMIT %s OFFSET %s
                """,
                params + [per_page, offset],
            )
            cols = [c[0] for c in cur.description]
            rows = [_row_to_voice(r, cols) for r in cur.fetchall()]
    except DatabaseError as e:
        return Response({"success": False, "message": str(e)}, status=500)

    return Response({
        "success": True,
        "data": rows,
        "pagination": {
            "page": page,
            "per_page": per_page,
            "total": total,
            "total_pages": (total + per_page - 1) // per_page if total else 0,
        },
    })


@api_view(["POST"])
@permission_classes([IsAuthenticated])
def master_voice_sync(request):
    _, err = _require_master_admin(request)
    if err:
        return err

    eleven_api_key = (os.getenv("ELEVENLABS_API_KEY") or "").strip()
    cartesia_api_key = (os.getenv("CARTESIA_API_KEY") or "").strip()

    def _upsert_voice_plans(items: list[dict[str, Any]]) -> dict[str, int]:
        """
        Upserts into `voice_plan` table (existing DB schema).

        Expected keys per item:
          voice_id, name, category, model_id, plans, gender
          (+ optional accent, age, use_case, description, language, locale, preview_url)
        """
        synced = 0
        updated = 0
        _ensure_voice_plan_schema()
        _ensure_voice_plan_columns()
        alias = _voice_plan_db_alias()
        with connections[alias].cursor() as cur:
            for v in items:
                voice_id = (v.get("voice_id") or "").strip()
                if not voice_id:
                    continue
                name = (v.get("name") or "").strip() or voice_id
                category = (v.get("category") or "").strip() or ""
                model_id = (str(v.get("model_id")).strip() if v.get("model_id") is not None else "") or None
                plans = (v.get("plans") or "").strip() or "basic"
                gender = (v.get("gender") or "").strip() or ""
                accent = (v.get("accent") or "").strip() or ""
                age = (v.get("age") or "").strip() or ""
                use_case = (v.get("use_case") or "").strip() or ""
                description = (v.get("description") or "").strip() or ""
                language = (v.get("language") or "").strip() or ""
                locale = (v.get("locale") or "").strip() or ""
                preview_url = (v.get("preview_url") or "").strip() or ""

                # MySQL may report UPDATE rowcount=0 when matched but no column values changed.
                # That made us fall through to INSERT and hit duplicate-key errors. Branch on existence.
                cur.execute("SELECT 1 FROM voice_plan WHERE voice_id=%s LIMIT 1", [voice_id])
                exists = cur.fetchone() is not None
                if exists:
                    cur.execute(
                        """
                        UPDATE voice_plan
                        SET name=%s,
                            category=%s,
                            model_id=%s,
                            plans=%s,
                            gender=%s,
                            accent=%s,
                            age=%s,
                            use_case=%s,
                            description=%s,
                            language=%s,
                            locale=%s,
                            preview_url=%s
                        WHERE voice_id=%s
                        """,
                        [name, category, model_id, plans, gender, accent, age, use_case, description, language, locale, preview_url, voice_id],
                    )
                    updated += 1
                else:
                    cur.execute(
                        """
                        INSERT INTO voice_plan (voice_id, name, category, model_id, plans, gender, accent, age, use_case, description, language, locale, preview_url)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                        """,
                        [voice_id, name, category, model_id, plans, gender, accent, age, use_case, description, language, locale, preview_url],
                    )
                    synced += 1
        return {"synced": synced, "updated": updated, "total": synced + updated}

    async def _fetch_elevenlabs_voices() -> list[dict[str, Any]]:
        # https://api.elevenlabs.io/v1/voices — model/accent/locale/per-locale preview
        # often live under `verified_languages[]`, not only in `labels`.
        async with httpx.AsyncClient(timeout=30) as client:
            resp = await client.get(
                "https://api.elevenlabs.io/v1/voices",
                headers={"xi-api-key": eleven_api_key},
            )
            resp.raise_for_status()
            data = resp.json() or {}
        voices = data.get("voices") or []
        out: list[dict[str, Any]] = []
        for v in voices:
            if not isinstance(v, dict):
                continue
            raw_labels = v.get("labels")
            labels = raw_labels if isinstance(raw_labels, dict) else {}
            vl = _elevenlabs_verified_pick(v)

            language = (
                str(vl.get("language") or "").strip()
                or _label_value(labels, "language", "Language")
            )
            locale = str(vl.get("locale") or "").strip() or _label_value(labels, "locale", "Locale")
            language, locale = _split_bcp47_language_tag(language, locale)
            model_raw = _elevenlabs_model_id_from_voice(v)
            if model_raw is None or str(model_raw).strip() == "":
                model_raw = v.get("model_id") or v.get("modelId")
            model_id = str(model_raw).strip() if model_raw is not None and str(model_raw).strip() else None

            accent = (
                str(vl.get("accent") or "").strip()
                or _label_value(labels, "accent", "Accent")
            )
            preview_url = (
                str(v.get("preview_url") or v.get("previewUrl") or "").strip()
                or str(vl.get("preview_url") or vl.get("previewUrl") or "").strip()
                or _elevenlabs_first_preview_from_verified(v)
                or _elevenlabs_preview_from_samples(v)
            )

            cat_raw = v.get("category")
            if isinstance(cat_raw, dict):
                category = str(cat_raw.get("value") or cat_raw.get("name") or "elevenlabs").strip() or "elevenlabs"
            else:
                category = str(cat_raw or "elevenlabs").strip() or "elevenlabs"

            out.append(
                {
                    "voice_id": v.get("voice_id") or v.get("voiceId") or "",
                    "name": v.get("name") or "",
                    "category": category,
                    "model_id": model_id,
                    "plans": "elevenlabs",
                    "gender": _label_value(labels, "gender", "Gender"),
                    "accent": accent,
                    "age": _label_value(labels, "age", "Age"),
                    "use_case": _label_value(labels, "use_case", "usecase", "use case"),
                    "language": language,
                    "locale": locale,
                    "preview_url": preview_url,
                    "description": str(v.get("description") or "").strip(),
                }
            )
        return out

    async def _fetch_cartesia_voices() -> list[dict[str, Any]]:
        # https://docs.cartesia.ai/api-reference/voices/list
        # Requires Authorization: Bearer <key> and Cartesia-Version header.
        # Preview URLs are `preview_file_url` and only returned when `expand[]=preview_file_url`.
        headers = {
            "Authorization": f"Bearer {cartesia_api_key}",
            "Cartesia-Version": os.getenv("CARTESIA_VERSION", "2025-04-16"),
        }

        out: list[dict[str, Any]] = []
        starting_after: str | None = None

        async with httpx.AsyncClient(timeout=120.0) as client:
            # Cartesia voices API is paginated (default limit is small).
            while True:
                pairs = [("limit", "100"), ("expand[]", "preview_file_url")]
                if starting_after:
                    pairs.append(("starting_after", starting_after))
                # Keep literal `expand[]` in the query string (some clients over-encode brackets).
                query = urlencode(pairs, safe="[]")
                resp = await client.get(f"https://api.cartesia.ai/voices/?{query}", headers=headers)
                resp.raise_for_status()
                data = resp.json()

                if isinstance(data, dict):
                    voices = data.get("data") or []
                    has_more = bool(data.get("has_more"))
                else:
                    voices = data or []
                    has_more = False

                last_id: str | None = None
                for v in voices:
                    if not isinstance(v, dict):
                        continue
                    vid = v.get("id") or v.get("voice_id") or ""
                    last_id = str(vid) if vid is not None else last_id
                    lang_raw = v.get("language")
                    language, locale = _split_cartesia_lang_locale(lang_raw)
                    preview_url = _cartesia_preview_url(v)
                    model_raw = v.get("model_id") or v.get("modelId") or v.get("base_model") or v.get("model")
                    model_id = str(model_raw).strip() if model_raw is not None and str(model_raw).strip() else None
                    accent = str(v.get("accent") or "").strip()
                    if not accent and locale and "_" in locale:
                        accent = locale.split("_", 1)[-1].strip()
                    out.append(
                        {
                            "voice_id": vid,
                            "name": v.get("name") or "",
                            "category": "cartesia",
                            "model_id": model_id,
                            "plans": "cartesia",
                            "gender": (v.get("gender") or ""),
                            "accent": accent,
                            "age": str(v.get("age") or "").strip(),
                            "use_case": str(v.get("use_case") or v.get("useCase") or "").strip(),
                            "description": str(v.get("description") or "").strip(),
                            "language": language,
                            "locale": locale,
                            "preview_url": preview_url,
                        }
                    )

                if not has_more:
                    break
                if not last_id:
                    break
                starting_after = last_id

            # List responses sometimes omit `preview_file_url`; GET /voices/{id} often has it.
            q_enrich = urlencode([("expand[]", "preview_file_url")], safe="[]")
            enrich_budget = 200
            for item in out:
                if enrich_budget <= 0:
                    break
                if str(item.get("preview_url") or "").strip():
                    continue
                vid = item.get("voice_id")
                if not vid:
                    continue
                enrich_budget -= 1
                try:
                    detail = await client.get(
                        f"https://api.cartesia.ai/voices/{vid}/?{q_enrich}",
                        headers=headers,
                    )
                    detail.raise_for_status()
                    body = detail.json()
                except httpx.HTTPError:
                    continue
                if isinstance(body, dict):
                    pu = _cartesia_preview_url(body)
                    if pu:
                        item["preview_url"] = pu

        return out

    provider = (request.data.get("provider") if isinstance(request.data, dict) else None) or ""
    provider = str(provider).strip().lower()

    can_eleven = bool(eleven_api_key)
    can_cartesia = bool(cartesia_api_key)

    # If provider isn't specified, sync everything we have keys for.
    items: list[dict[str, Any]] = []
    provider_results: dict[str, Any] = {
        "elevenlabs": {"enabled": can_eleven, "synced": 0, "updated": 0, "total": 0},
        "cartesia": {"enabled": can_cartesia, "synced": 0, "updated": 0, "total": 0},
    }

    try:
        fetched_counts: dict[str, int] = {"elevenlabs": 0, "cartesia": 0}

        async def _run() -> None:
            nonlocal items
            if (provider in {"", "elevenlabs"}) and can_eleven:
                ev = await _fetch_elevenlabs_voices()
                fetched_counts["elevenlabs"] = len(ev)
                items += ev
            if (provider in {"", "cartesia"}) and can_cartesia:
                cv = await _fetch_cartesia_voices()
                fetched_counts["cartesia"] = len(cv)
                items += cv

        # Run async fetch in a simple event loop hop.
        import asyncio

        asyncio.run(_run())

        if not items:
            return Response(
                {
                    "success": True,
                    "data": {
                        "synced": 0,
                        "updated": 0,
                        "total": 0,
                        "message": "No provider API keys configured (or provider has no voices).",
                        "providers": provider_results,
                        "fetched": fetched_counts,
                    },
                }
            )

        try:
            up = _upsert_voice_plans(items)
        except DatabaseError as e:
            return Response(
                {
                    "success": False,
                    "message": f"Database error while saving voices: {str(e)}",
                    "data": {"providers": provider_results},
                },
                status=500,
            )

        # Attribute totals to the selected provider if specified.
        if provider in {"elevenlabs", "cartesia"}:
            provider_results[provider].update(up)
        else:
            # When syncing both, we don't currently split counts precisely.
            provider_results["combined"] = up

        return Response(
            {
                "success": True,
                "data": {
                    **up,
                    "message": "Voice plans synced.",
                    "providers": provider_results,
                    "fetched": fetched_counts,
                },
            }
        )
    except (httpx.HTTPError, ValueError, RuntimeError) as e:
        # Do not leak secrets; return a concise error string.
        return Response(
            {"success": False, "message": f"Voice sync failed: {str(e)}", "data": {"providers": provider_results}},
            status=502,
        )


@api_view(["GET"])
@permission_classes([IsAuthenticated])
def master_voice_by_plan(request, plan: str):
    _, err = _require_master_admin(request)
    if err:
        return err

    try:
        with connections["default"].cursor() as cur:
            cur.execute(
                "SELECT voice_id, name, category, model_id, plans, gender "
                "FROM voice_plans WHERE plans = %s ORDER BY name ASC",
                [plan],
            )
            cols = [c[0] for c in cur.description]
            rows = [_row_to_voice(r, cols) for r in cur.fetchall()]
    except DatabaseError as e:
        return Response({"success": False, "message": str(e)}, status=500)

    return Response({"success": True, "data": rows})


@api_view(["GET"])
@permission_classes([IsAuthenticated])
def master_voice_preview_proxy(request):
    """
    Cartesia preview URLs under files.cartesia.ai require `Authorization: Bearer`.
    The HTML <audio> element cannot send that header, so the UI calls this
    endpoint with the user's master JWT; we forward the request with the server
    Cartesia API key.
    """
    _, err = _require_master_admin(request)
    if err:
        return err

    raw_url = (request.GET.get("url") or "").strip()
    if not raw_url:
        return Response({"success": False, "message": "Missing url parameter."}, status=400)

    try:
        parsed = urlparse(raw_url)
    except Exception:
        return Response({"success": False, "message": "Invalid url."}, status=400)

    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        return Response({"success": False, "message": "Invalid url."}, status=400)

    host = (parsed.hostname or "").lower()
    allowed_hosts = frozenset({"files.cartesia.ai", "api.cartesia.ai"})
    if host not in allowed_hosts:
        return Response({"success": False, "message": "Preview host not allowed."}, status=403)

    cartesia_key = (os.getenv("CARTESIA_API_KEY") or "").strip()
    if not cartesia_key:
        return Response({"success": False, "message": "Cartesia API key not configured."}, status=502)

    headers = {
        "Authorization": f"Bearer {cartesia_key}",
        "Cartesia-Version": os.getenv("CARTESIA_VERSION", "2025-04-16"),
    }

    try:
        with httpx.Client(timeout=120.0, follow_redirects=True) as client:
            upstream = client.get(raw_url, headers=headers)
    except httpx.HTTPError as e:
        return Response({"success": False, "message": f"Upstream error: {e!s}"}, status=502)

    ct = upstream.headers.get("content-type") or "application/octet-stream"
    if upstream.status_code != 200:
        return HttpResponse(
            upstream.content,
            status=upstream.status_code,
            content_type=ct,
        )

    return HttpResponse(upstream.content, content_type=ct)


@api_view(["GET"])
@permission_classes([IsAuthenticated])
def master_voice_preview_by_id(request):
    """
    On-demand preview fetch by voice_id when preview_url is missing in DB.

    - provider=cartesia: GET /voices/{id}?expand[]=preview_file_url, then download preview_file_url via Cartesia auth.
    - provider=elevenlabs: GET /v1/voices/{id}, then fetch preview_url if present (no auth expected).
    """
    _, err = _require_master_admin(request)
    if err:
        return err

    provider = (request.GET.get("provider") or "").strip().lower()
    voice_id = (request.GET.get("voice_id") or "").strip()
    if provider not in {"cartesia", "elevenlabs"}:
        return Response({"success": False, "message": "Invalid provider."}, status=400)
    if not voice_id:
        return Response({"success": False, "message": "Missing voice_id."}, status=400)

    if provider == "cartesia":
        cartesia_key = (os.getenv("CARTESIA_API_KEY") or "").strip()
        if not cartesia_key:
            return Response({"success": False, "message": "Cartesia API key not configured."}, status=502)

        headers = {
            "Authorization": f"Bearer {cartesia_key}",
            "Cartesia-Version": os.getenv("CARTESIA_VERSION", "2025-04-16"),
        }
        # Try to reuse stored metadata (model_id / language) if available.
        model_id: str | None = None
        language: str | None = None
        try:
            alias = _voice_plan_db_alias()
            with connections[alias].cursor() as cur:
                cur.execute(
                    "SELECT model_id, language FROM voice_plan WHERE voice_id=%s ORDER BY "
                    "(language IS NOT NULL AND language <> '') DESC, "
                    "(model_id IS NOT NULL AND model_id <> '') DESC "
                    "LIMIT 1",
                    [voice_id],
                )
                row = cur.fetchone()
                if row:
                    model_id = (str(row[0]).strip() if row[0] is not None else "") or None
                    language = (str(row[1]).strip() if row[1] is not None else "") or None
        except Exception:
            pass

        q = urlencode([("expand[]", "preview_file_url")], safe="[]")
        try:
            with httpx.Client(timeout=120.0, follow_redirects=True) as client:
                detail = client.get(f"https://api.cartesia.ai/voices/{voice_id}/?{q}", headers=headers)
                detail.raise_for_status()
                payload = detail.json()
                if not isinstance(payload, dict):
                    return Response({"success": False, "message": "Unexpected Cartesia response."}, status=502)
                preview_url = _cartesia_preview_url(payload)
                if preview_url:
                    # Reuse the proxy behavior to download audio with Authorization.
                    upstream = client.get(preview_url, headers=headers)
                else:
                    # If no preview is provided, generate a short sample via TTS.
                    # This consumes TTS credits, but makes every Cartesia voice playable.
                    mid = model_id or "sonic-3"
                    lang = (language or str(payload.get("language") or "").strip() or "en").replace("-", "_")
                    upstream = client.post(
                        "https://api.cartesia.ai/tts/bytes",
                        headers={**headers, "Content-Type": "application/json"},
                        json={
                            "model_id": mid,
                            "transcript": "Hello! This is a voice preview.",
                            "voice": {"mode": "id", "id": voice_id},
                            "output_format": {"container": "wav", "encoding": "pcm_s16le", "sample_rate": 44100},
                            "language": lang,
                        },
                    )
        except httpx.HTTPStatusError as e:
            status = e.response.status_code if e.response is not None else 502
            return Response({"success": False, "message": f"Cartesia error: {status}"}, status=502)
        except httpx.HTTPError as e:
            return Response({"success": False, "message": f"Upstream error: {e!s}"}, status=502)

        ct = upstream.headers.get("content-type") or "application/octet-stream"
        if upstream.status_code != 200:
            return HttpResponse(upstream.content, status=upstream.status_code, content_type=ct)
        return HttpResponse(upstream.content, content_type=ct)

    # provider == "elevenlabs"
    eleven_key = (os.getenv("ELEVENLABS_API_KEY") or "").strip()
    if not eleven_key:
        return Response({"success": False, "message": "ElevenLabs API key not configured."}, status=502)

    try:
        with httpx.Client(timeout=60.0, follow_redirects=True) as client:
            detail = client.get(
                f"https://api.elevenlabs.io/v1/voices/{voice_id}",
                headers={"xi-api-key": eleven_key},
            )
            detail.raise_for_status()
            v = detail.json()
            if not isinstance(v, dict):
                return Response({"success": False, "message": "Unexpected ElevenLabs response."}, status=502)
            preview_url = str(v.get("preview_url") or v.get("previewUrl") or "").strip()
            if not preview_url:
                preview_url = _elevenlabs_first_preview_from_verified(v) or _elevenlabs_preview_from_samples(v)
            if not preview_url:
                return Response({"success": False, "message": "No preview available for this voice."}, status=404)
            upstream = client.get(preview_url)
    except httpx.HTTPStatusError as e:
        status = e.response.status_code if e.response is not None else 502
        return Response({"success": False, "message": f"ElevenLabs error: {status}"}, status=502)
    except httpx.HTTPError as e:
        return Response({"success": False, "message": f"Upstream error: {e!s}"}, status=502)

    ct = upstream.headers.get("content-type") or "application/octet-stream"
    if upstream.status_code != 200:
        return HttpResponse(upstream.content, status=upstream.status_code, content_type=ct)
    return HttpResponse(upstream.content, content_type=ct)
