"""Universal per-BID call ingest webhook (additive; does not replace orchestrator polling)."""

from __future__ import annotations

import hmac
import json
import logging
import os
import subprocess
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse

import pika
import pymysql
from pymysql.cursors import DictCursor

from min_duration_util import (
    call_duration_seconds,
    evaluate_min_duration_for_ingest,
    purge_unprocessed_if_below_min,
)

logger = logging.getLogger(__name__)

# Provider-agnostic description of the webhook payload contract.
# ingest_schema_for_bid() merges it into the per-BID contract document.
# normalize_ingest_payload() enforces only that the four "required_fields" are
# present (non-blank). It also accepts provider aliases that are not listed
# here (e.g. "callid", "dialstatus", "fileurl", "filename"). A bare recording
# filename is expanded into an Mcube recording URL. CallIngestService.process()
# acknowledges but skips calls whose call_status is not ANSWER.
CANONICAL_INGEST_SCHEMA: Dict[str, Any] = {
    "version": "1",
    "description": "PCAA universal call ingest. POST JSON to the per-BID URL with X-Ingest-Secret header.",
    "required_fields": {
        "call_id": "string — unique call identifier",
        "call_status": "string — must be ANSWER to ingest and queue STT",
        "call_start_time": "string — YYYY-MM-DD HH:MM:SS or ISO-8601",
        "recording_url": "string — full HTTPS URL to audio file",
    },
    "recommended_fields": {
        "call_end_time": "string",
        "duration_seconds": "number",
        "agent_name": "string",
        "group_name": "string",
        "direction": "inbound | outbound",
        "agent_phone": "string",
        "customer_phone": "string",
        "source": "string — e.g. mcube, exotel, manual",
    },
    # Illustrative payload; phone numbers are masked placeholders.
    "example": {
        "call_id": "97393902541780895736",
        "call_status": "ANSWER",
        "call_start_time": "2026-06-08 10:45:38",
        "call_end_time": "2026-06-08 10:50:36",
        "duration_seconds": 298,
        "recording_url": "https://recordings.mcube.com/mcubefiles112/appmcube/2026/06/6004/97393902541780895736.wav",
        "agent_name": "Richa vishwakarma",
        "group_name": "Presales Team",
        "direction": "outbound",
        "agent_phone": "9XXXXXXXXX",
        "customer_phone": "9XXXXXXXXX",
        "source": "mcube",
    },
}


def ingest_schema_for_bid(bid: str, public_base_url: str) -> Dict[str, Any]:
    """Return the ingest contract for one BID (canonical schema + endpoint details).

    The result is a shallow copy of ``CANONICAL_INGEST_SCHEMA`` extended with
    ``bid``, ``method``, ``url``, ``headers``, ``aliases`` and ``responses``.

    Args:
        bid: Business identifier embedded in the endpoint path.
        public_base_url: Externally reachable origin. Trailing slashes are
            trimmed. When empty or None, ``url`` is the root-relative path.
    """
    endpoint = f"/api/v1/bids/{bid}/calls/ingest"
    origin = (public_base_url or "").rstrip("/")
    full_url = origin + endpoint if origin else endpoint

    document: Dict[str, Any] = dict(CANONICAL_INGEST_SCHEMA)
    document["bid"] = str(bid)
    document["method"] = "POST"
    document["url"] = full_url
    document["headers"] = {
        "Content-Type": "application/json",
        "X-Ingest-Secret": "<shared INGEST_SECRET from server .env — same for all BIDs>",
    }
    document["aliases"] = {
        "X-Webhook-Secret": "Accepted alias for X-Ingest-Secret",
    }
    document["responses"] = {
        "200": "Accepted (processed, queued, or skipped)",
        "401": "Invalid or missing secret",
        "403": "Webhook ingest disabled for this BID",
        "400": "Malformed payload",
        "404": "BID raw_calls table missing",
    }
    return document


def _first_value(payload: Dict[str, Any], *keys: str) -> Any:
    """Return the value of the first key in *keys* holding a non-blank value.

    A value counts as blank when the key is absent, the value is ``None``, or
    ``str(value)`` is empty or whitespace-only. The original (unstripped)
    value is returned. ``None`` is returned when no key qualifies.
    """
    for candidate in keys:
        if candidate not in payload:
            continue
        found = payload[candidate]
        if found is None:
            continue
        if not str(found).strip():
            continue
        return found
    return None


def _parse_datetime(value) -> Optional[datetime]:
    """Parse *value* into a naive :class:`datetime`, or return ``None``.

    Accepts ``datetime`` instances and strings in ``YYYY-MM-DD HH:MM:SS`` or
    ISO-8601 form. Surrounding whitespace is ignored. A ``Z`` UTC designator
    is rewritten to ``+00:00`` so that ``fromisoformat`` on Python < 3.11
    accepts it. Any timezone info is dropped *without* conversion, so the
    wall-clock time is kept as sent. Unparseable input yields ``None``
    rather than raising.
    """
    if value is None:
        return None
    if isinstance(value, datetime):
        return value.replace(tzinfo=None) if value.tzinfo else value
    # Strip first: webhook values often carry stray whitespace, which
    # fromisoformat() rejects.
    text = str(value).strip().replace("Z", "+00:00")
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    return parsed.replace(tzinfo=None) if parsed.tzinfo else parsed


def _format_db_datetime(value) -> Optional[str]:
    """Render *value* as ``YYYY-MM-DD HH:MM:SS`` when it parses as a datetime.

    Unparseable input is passed through as a stripped string. ``None`` stays
    ``None``.
    """
    as_datetime = _parse_datetime(value)
    if as_datetime:
        return as_datetime.strftime("%Y-%m-%d %H:%M:%S")
    if value is None:
        return None
    return str(value).strip()


def _recording_url_candidates(filename: str, starttime, source_bid: str) -> List[str]:
    """List the Mcube recording URLs that may host *filename*.

    * blank/None *filename* -> ``[]``
    * *filename* already starting with ``"http"`` -> ``[filename]`` (stripped)
    * *starttime* missing or unparseable -> ``[filename]``
    * otherwise two URLs are built from the call's year/month, *source_bid*
      and the basename of *filename*. The ``appmcube`` path comes first,
      then the ``classic/.../inbound`` path.
    """
    cleaned = str(filename or "").strip()
    if not cleaned:
        return []
    if cleaned.startswith("http"):
        return [cleaned]
    try:
        call_dt = _parse_datetime(starttime)
        if not call_dt:
            return [cleaned]
        year, month = call_dt.strftime("%Y"), call_dt.strftime("%m")
    except Exception:
        return [cleaned]
    bid = str(source_bid or "").strip()
    name = os.path.basename(cleaned)
    return [
        f"https://recordings.mcube.com/mcubefiles112/appmcube/{year}/{month}/{bid}/{name}",
        f"https://recordings.mcube.com/mcubefiles112/classic/{year}/{month}/{bid}/inbound/{name}",
    ]


def _coerce_duration_seconds(value: Any) -> Optional[int]:
    """Convert an explicitly supplied duration into non-negative whole seconds.

    Returns ``None`` when *value* is missing or blank, or when it is not a
    finite number (e.g. ``"abc"``, ``"nan"``, ``"inf"``). This lets the
    caller fall back to another source. Fractions are truncated and negative
    values clamp to 0.
    """
    if value is None or str(value).strip() == "":
        return None
    try:
        return max(0, int(float(value)))
    except (TypeError, ValueError, OverflowError):
        # int(float("inf")) raises OverflowError, not ValueError. "inf" (and,
        # with the json module's defaults, a bare Infinity literal) can arrive
        # directly from the webhook body.
        return None


def normalize_ingest_payload(payload: Dict[str, Any], *, source_bid: str) -> Tuple[Dict[str, Any], List[str]]:
    """Map canonical or common alias fields to raw_calls column names.

    Args:
        payload: Decoded webhook body. Canonical keys and several provider
            aliases are accepted; for each field the first non-blank alias wins.
        source_bid: BID used when expanding a bare recording filename into a
            Mcube recording URL.

    Returns:
        ``(normalized, errors)``.

        ``normalized`` always contains every output key, with blank strings or
        ``None`` for missing values. ``call_status`` is upper-cased.
        ``direction`` is lower-cased and defaults to ``"inbound"``.
        ``duration_seconds`` comes from the payload when it is a finite
        number; otherwise it comes from ``call_duration_seconds``, presumably
        derived from the start/end times.

        ``errors`` lists a message for each missing required field. It is
        empty when the payload is acceptable.
    """
    errors: List[str] = []
    call_id = _first_value(payload, "call_id", "callid", "callId")
    call_status = _first_value(payload, "call_status", "dialstatus", "callStatus")
    call_start = _first_value(payload, "call_start_time", "call_starttime", "starttime", "call_start")
    call_end = _first_value(payload, "call_end_time", "call_endtime", "endtime", "call_end")
    recording = _first_value(payload, "recording_url", "fileurl", "fileUrl", "file_url", "filename")
    duration = _first_value(payload, "duration_seconds", "duration", "answeredtime", "talktime")

    if not call_id:
        errors.append("call_id is required")
    if not call_status:
        errors.append("call_status is required")
    if not call_start:
        errors.append("call_start_time is required")
    if not recording:
        errors.append("recording_url is required")

    fileurl = str(recording or "").strip()
    start_fmt = _format_db_datetime(call_start)
    if fileurl and not fileurl.startswith("http"):
        # Bare filename: use the first Mcube URL candidate. The filename is
        # left unchanged when the start time cannot be parsed.
        candidates = _recording_url_candidates(fileurl, start_fmt or call_start, source_bid)
        fileurl = candidates[0] if candidates else fileurl

    direction = str(_first_value(payload, "direction") or "inbound").strip().lower() or "inbound"

    normalized = {
        "callid": str(call_id or "").strip(),
        "call_status": str(call_status or "").strip().upper(),
        "call_starttime": start_fmt,
        "call_endtime": _format_db_datetime(call_end),
        "fileurl": fileurl,
        "agentname": str(_first_value(payload, "agent_name", "agentname", "callername") or ""),
        "groupname": str(_first_value(payload, "group_name", "groupname") or ""),
        "direction": direction,
        "agent_callinfo": str(_first_value(payload, "agent_phone", "agent_callinfo", "emp_phone") or ""),
        "customer_callinfo": str(
            _first_value(payload, "customer_phone", "customer_callinfo", "clicktocalldid") or ""
        ),
        "duration_seconds": _coerce_duration_seconds(duration),
        "source": str(_first_value(payload, "source", "provider") or ""),
    }

    if normalized["duration_seconds"] is None:
        # NOTE(review): the raw payload value is still passed through here, as
        # before. Confirm call_duration_seconds tolerates non-finite input
        # such as "inf".
        normalized["duration_seconds"] = call_duration_seconds(
            {
                "call_starttime": normalized["call_starttime"],
                "call_endtime": normalized["call_endtime"],
                "duration_seconds": duration,
            }
        )

    return normalized, errors


def _is_trusted_mcube_recording_url(url: str) -> bool:
    """Return True for ``recordings.mcube.com`` audio URLs under a ``/mcubefiles`` path.

    The host must match exactly (case-insensitively). The lower-cased path
    must contain ``/mcubefiles`` and end in a known audio extension.
    """
    audio_exts = (".wav", ".mp3", ".ogg", ".mpeg", ".m4a", ".webm", ".flac")
    pieces = urlparse(str(url or ""))
    host = (pieces.hostname or "").lower()
    if host != "recordings.mcube.com":
        return False
    lowered_path = (pieces.path or "").lower()
    if "/mcubefiles" not in lowered_path:
        return False
    return lowered_path.endswith(audio_exts)


def _http_recording_reachable(url: str) -> bool:
    """Return True when *url* answers HTTP 200/206, probed with the ``curl`` CLI.

    Non-``http(s)://`` input returns False without running curl. Otherwise two
    probes run in order. Each follows redirects (``-L``) and is bounded by
    curl's ``--max-time 20``:

    1. ``HEAD`` (``curl -I``), judged by the *final* status line of the chain.
    2. A ranged ``GET`` of bytes 0-1023, for servers that reject HEAD. This is
       judged by curl's reported final ``http_code``.

    A probe that fails to run (curl missing, undecodable output, ...) counts
    as "not reachable" and is logged at debug level.
    """
    url = str(url or "").strip()
    if not url.startswith(("http://", "https://")):
        return False

    try:
        head = subprocess.run(
            ["curl", "-I", "-L", "--max-time", "20", "-s", url],
            capture_output=True,
            text=True,
        )
        if head.returncode == 0:
            status_lines = [
                line for line in (head.stdout or "").splitlines() if line.startswith("HTTP/")
            ]
            # With -L curl prints one header block per hop. Through a proxy it
            # may also print a "200 Connection established" block. Only the
            # last status line describes the recording itself.
            if status_lines:
                parts = status_lines[-1].split()
                if len(parts) >= 2 and parts[1] in ("200", "206"):
                    return True
    except Exception as exc:
        logger.debug("HEAD probe failed for %s: %s", url, exc)

    try:
        ranged = subprocess.run(
            [
                "curl",
                "-L",
                "--max-time",
                "20",
                "-s",
                "-o",
                "/dev/null",
                "-w",
                "%{http_code}",
                "-r",
                "0-1023",
                url,
            ],
            capture_output=True,
            text=True,
        )
        if ranged.returncode == 0 and (ranged.stdout or "").strip() in ("200", "206"):
            return True
    except Exception as exc:
        logger.debug("Ranged GET probe failed for %s: %s", url, exc)
    return False


def recording_url_ready(fileurl: str, callid: str) -> bool:
    """Decide whether *fileurl* can be handed to STT now.

    Returns True if the URL is reachable over HTTP. As a fallback, a URL that
    looks like an Mcube recording is also accepted when the
    ``ORCHESTRATOR_TRUST_MCUBE_URLS`` env var is ``1``/``true``/``yes``; this
    is logged at info level with *callid*. Otherwise returns False.
    """
    if _http_recording_reachable(fileurl):
        return True
    trust_flag = os.getenv("ORCHESTRATOR_TRUST_MCUBE_URLS", "").lower()
    if trust_flag not in ("1", "true", "yes"):
        return False
    if not _is_trusted_mcube_recording_url(fileurl):
        return False
    logger.info("[%s] Trusting Mcube recording URL (ORCHESTRATOR_TRUST_MCUBE_URLS)", callid)
    return True


def _group_allowed(groupname: str, cfg: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """Apply the BID's optional group filter to *groupname*.

    Returns ``(True, None)`` when filtering is off or the group is allowed.
    Otherwise returns ``(False, reason)``. Matching is case-insensitive and
    ignores surrounding whitespace. An enabled filter with an empty allow-list
    rejects everything.
    """
    filter_on = bool(int(cfg.get("group_filter_enabled") or 0))
    if not filter_on:
        return True, None
    permitted = set()
    for entry in cfg.get("_allowed_groupnames") or []:
        cleaned = str(entry).strip()
        if cleaned:
            permitted.add(cleaned.lower())
    if not permitted:
        return False, "group_filter_enabled but no allowed groups configured"
    if str(groupname or "").strip().lower() in permitted:
        return True, None
    return False, f"group '{groupname}' not in allowed list"


def shared_ingest_secret() -> str:
    """Return the platform-wide ingest secret shared by all BIDs.

    ``INGEST_SECRET`` wins. ``WEBHOOK_SECRET`` is the fallback. Values are
    whitespace-stripped, and ``""`` means no shared secret is configured.
    """
    for var_name in ("INGEST_SECRET", "WEBHOOK_SECRET"):
        candidate = os.getenv(var_name, "").strip()
        if candidate:
            return candidate
    return ""


def _verify_secret(provided: Optional[str], cfg: Dict[str, Any]) -> bool:
    """Return True when *provided* matches the shared or the per-BID ingest secret.

    The platform-wide secret (``shared_ingest_secret()``) is tried first,
    then the optional per-BID ``cfg["ingest_secret"]``. Missing or non-string
    input never matches, and neither does a blank/unconfigured secret.

    Comparisons use :func:`hmac.compare_digest`, so response timing does not
    reveal how many leading characters of a guessed secret are correct.
    """
    if not provided or not isinstance(provided, str):
        return False
    # compare_digest only accepts ASCII str, so compare UTF-8 bytes instead.
    # "surrogatepass" keeps the encoding total (and injective) for any str.
    candidate = provided.encode("utf-8", "surrogatepass")

    def _matches(expected: str) -> bool:
        return bool(expected) and hmac.compare_digest(
            candidate, expected.encode("utf-8", "surrogatepass")
        )

    if _matches(shared_ingest_secret()):
        return True
    return _matches(str(cfg.get("ingest_secret") or "").strip())


class CallIngestService:
    """Process webhook-delivered calls for a BID: authenticate, filter, persist, queue STT.

    ``db_handler`` is a project object. This class reads its ``config``
    mapping (``DB_HOST``/``DB_PORT``/``DB_USER``/``DB_PASSWORD``/``DB_NAME``)
    and calls its pipeline-config, min-duration and ingest-log helpers.
    Database work uses short-lived autocommit pymysql connections, opened
    per operation and closed by the opener.
    """

    def __init__(self, db_handler):
        self.db_handler = db_handler

    def _db_conn(self):
        """Open a new autocommit pymysql connection returning dict rows; the caller must close it."""
        db_cfg = self.db_handler.config
        return pymysql.connect(
            host=db_cfg.get("DB_HOST"),
            port=int(db_cfg.get("DB_PORT") or 3306),
            user=db_cfg.get("DB_USER"),
            password=db_cfg.get("DB_PASSWORD"),
            database=db_cfg.get("DB_NAME"),
            cursorclass=DictCursor,
            autocommit=True,
        )

    def _load_bid_config(self, bid: str) -> Dict[str, Any]:
        """Return the pipeline config for *bid* (``{}`` if none) plus derived keys.

        Adds ``min_call_duration_effective_at`` when a positive
        ``min_call_duration_s`` is configured. Also adds
        ``_allowed_groupnames``, decoded from ``allowed_groupnames``.
        """
        self.db_handler.ensure_business_pipeline_config_table()
        cfg = self.db_handler.get_pipeline_config(bid) or {}
        if max(0, int(cfg.get("min_call_duration_s") or 0)) > 0:
            cfg["min_call_duration_effective_at"] = self.db_handler.ensure_min_duration_effective_at(bid)
        cfg["_allowed_groupnames"] = self.db_handler._decode_allowed_groupnames(
            cfg.get("allowed_groupnames")
        )
        return cfg

    def _raw_table_exists(self, cursor, bid: str) -> bool:
        """Return True if a table named exactly ``{bid}_raw_calls`` is visible to *cursor*."""
        table_name = f"{bid}_raw_calls"
        # SHOW TABLES LIKE treats "_" and "%" as wildcards, with "\" as their
        # escape character. Escape them so only the exact name can match.
        pattern = table_name.replace("\\", "\\\\").replace("_", "\\_").replace("%", "\\%")
        cursor.execute("SHOW TABLES LIKE %s", (pattern,))
        return cursor.fetchone() is not None

    def _log_ingest(
        self,
        *,
        bid: str,
        callid: Optional[str],
        source: str,
        payload: Dict[str, Any],
        action: str,
        reason: Optional[str],
        queued: bool,
        signature_valid: bool,
    ) -> None:
        """Record one ingest outcome via ``db_handler.log_call_ingest_event``.

        Best-effort: a logging failure is reported as a warning and never
        propagates to the webhook response.
        """
        try:
            self.db_handler.log_call_ingest_event(
                bid=bid,
                callid=callid,
                source=source,
                payload=payload,
                action=action,
                reason=reason,
                queued=queued,
                signature_valid=signature_valid,
            )
        except Exception as exc:
            logger.warning("Failed to write ingest log for BID %s: %s", bid, exc)

    def _upsert_raw_call(self, cursor, bid: str, row: Dict[str, Any]) -> None:
        """Insert *row* into ``{bid}_raw_calls`` or refresh the existing row.

        New rows start with ``status = 0``. Their transcription_requested,
        transcription_status and selected_for_processing columns are NULL. On
        a duplicate key only the call metadata and duration columns are
        overwritten, so an existing row keeps its processing state.
        ``row["source"]`` is not persisted.

        NOTE(review): relies on a unique key (presumably on callid) defined
        outside this file.
        """
        cursor.execute(
            f"""
            INSERT INTO `{bid}_raw_calls`
            (bid, callid, fileurl, status, agentname, groupname, call_starttime, call_endtime,
             call_status, agent_callinfo, customer_callinfo, direction,
             transcription_requested, transcription_status, selected_for_processing,
             duration_seconds)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
            fileurl = VALUES(fileurl),
            agentname = VALUES(agentname),
            groupname = VALUES(groupname),
            call_starttime = VALUES(call_starttime),
            call_endtime = VALUES(call_endtime),
            call_status = VALUES(call_status),
            agent_callinfo = VALUES(agent_callinfo),
            customer_callinfo = VALUES(customer_callinfo),
            direction = VALUES(direction),
            duration_seconds = VALUES(duration_seconds)
            """,
            (
                bid,
                row["callid"],
                row["fileurl"],
                0,
                row.get("agentname") or "",
                row.get("groupname") or "",
                row.get("call_starttime"),
                row.get("call_endtime"),
                row.get("call_status") or "ANSWER",
                row.get("agent_callinfo") or "",
                row.get("customer_callinfo") or "",
                row.get("direction") or "inbound",
                None,
                None,
                None,
                row.get("duration_seconds"),
            ),
        )

    @staticmethod
    def _payload_hints(payload: Any) -> Tuple[str, str]:
        """Best-effort ``(callid, source)`` from an unvalidated payload, for logging rejections.

        Non-dict payloads yield ``("", "")`` instead of raising.
        """
        if not isinstance(payload, dict):
            return "", ""
        return (
            str(payload.get("call_id") or payload.get("callid") or ""),
            str(payload.get("source") or ""),
        )

    @staticmethod
    def _duration_row(call_id: str, normalized: Dict[str, Any]) -> Dict[str, Any]:
        """Build the row dict passed to the min-duration helpers from a normalized call."""
        return {
            "callid": call_id,
            "call_starttime": normalized.get("call_starttime"),
            "call_endtime": normalized.get("call_endtime"),
            "duration_seconds": normalized.get("duration_seconds"),
        }

    @staticmethod
    def _publish_stt_job(bid: str, call_id: str, recording_url: str) -> None:
        """Publish one persistent (``delivery_mode=2``) STT job message to RabbitMQ.

        The host and queue come from ``RABBITMQ_HOST`` (default ``localhost``)
        and ``RABBITMQ_QUEUE`` (default ``stt_jobs``). The queue is declared
        durable. The broker connection is always closed. Publish errors
        propagate; close errors are only logged.
        """
        rabbitmq_host = os.getenv("RABBITMQ_HOST", "localhost")
        rabbitmq_queue = os.getenv("RABBITMQ_QUEUE", "stt_jobs")
        rmq_conn = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_host))
        try:
            channel = rmq_conn.channel()
            channel.queue_declare(queue=rabbitmq_queue, durable=True)
            channel.basic_publish(
                exchange="",
                routing_key=rabbitmq_queue,
                body=json.dumps(
                    {"bid": bid, "call_id": call_id, "recording_url": recording_url}
                ),
                properties=pika.BasicProperties(delivery_mode=2),
            )
        finally:
            try:
                rmq_conn.close()
            except Exception as close_exc:
                # Closing an already-broken connection can raise. That must
                # neither mask a publish error nor undo a successful publish.
                logger.warning(
                    "[%s] Failed to close RabbitMQ connection for %s: %s", bid, call_id, close_exc
                )

    def _queue_transcription(
        self,
        bid: str,
        call_id: str,
        recording_url: str,
        *,
        min_duration_s: int = 0,
        duration_row: Optional[Dict[str, Any]] = None,
        effective_at=None,
    ) -> bool:
        """Mirror orchestrate_pipeline.trigger_transcription without reconfiguring log handlers.

        Returns True when the call is (or already was) in the STT pipeline.
        That covers three cases: a non-empty transcript already exists (the
        row is set to ``status = 2``); the row is already in status 1/2/3; or
        this call moved the row from status 0/-2 to 1 and published a job.

        Returns False in these cases: the min-duration check says skip
        (``purge_unprocessed_if_below_min`` is invoked); the row is
        ``backlog_cleared``; the row was not claimable; or an error occurred.
        On error, a row left at ``status = 1`` is reset to 0.
        """
        conn = self._db_conn()
        try:
            with conn.cursor() as cursor:
                # NOTE(review): process() already ran this check (and possibly
                # probed the audio). Confirm whether probing again is needed
                # here.
                skip_min, min_reason, probed_audio = evaluate_min_duration_for_ingest(
                    duration_row or {},
                    min_duration_s,
                    effective_at,
                    recording_url,
                    probe_audio=min_duration_s > 0,
                )
                if skip_min:
                    purge_unprocessed_if_below_min(
                        cursor,
                        bid,
                        call_id,
                        duration_row or {},
                        min_duration_s,
                        effective_at,
                    )
                    logger.info(
                        "[%s] Webhook STT queue skipped (%s)",
                        call_id,
                        min_reason,
                    )
                    return False
                if probed_audio is not None:
                    cursor.execute(
                        f"""
                        UPDATE `{bid}_raw_calls`
                        SET duration_seconds = %s
                        WHERE callid = %s
                        """,
                        (max(0, int(round(probed_audio))), call_id),
                    )

                resp_table = f"`{bid}_sarvamresponse`"
                cursor.execute(
                    f"""
                    SELECT id FROM {resp_table}
                    WHERE callid = %s
                      AND transcript IS NOT NULL
                      AND TRIM(transcript) != ''
                    """,
                    (call_id,),
                )
                if cursor.fetchone():
                    cursor.execute(
                        f"UPDATE `{bid}_raw_calls` SET status = 2 WHERE callid = %s",
                        (call_id,),
                    )
                    return True

                cursor.execute(
                    f"SELECT status, transcription_status FROM `{bid}_raw_calls` WHERE callid = %s LIMIT 1",
                    (call_id,),
                )
                current = cursor.fetchone() or {}
                raw_status = current.get("status")
                current_status = int(raw_status) if raw_status is not None else -99
                if current_status in (1, 2, 3):
                    return True
                if str(current.get("transcription_status") or "") == "backlog_cleared":
                    return False

                # The conditional UPDATE acts as a claim: only the request that
                # flips 0/-2 -> 1 goes on to publish the job.
                cursor.execute(
                    f"""
                    UPDATE `{bid}_raw_calls`
                    SET status = 1
                    WHERE callid = %s
                      AND status IN (0, -2)
                      AND COALESCE(transcription_status, '') != 'backlog_cleared'
                    """,
                    (call_id,),
                )
                if cursor.rowcount < 1:
                    return False

                self._publish_stt_job(bid, call_id, recording_url)
                return True
        except Exception as exc:
            logger.error("[%s] Failed to queue transcription for %s: %s", bid, call_id, exc)
            try:
                with conn.cursor() as cursor:
                    cursor.execute(
                        f"UPDATE `{bid}_raw_calls` SET status = 0 WHERE callid = %s AND status = 1",
                        (call_id,),
                    )
            except Exception:
                pass
            return False
        finally:
            conn.close()

    def process(
        self,
        bid: str,
        payload: Dict[str, Any],
        *,
        ingest_secret: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Handle one webhook delivery for *bid* and return a response dict.

        Every result has ``success`` and ``http_status``. Failures add
        ``error``/``message``; successes add ``bid``/``call_id``/``action``/
        ``message``. Checks run in this order:

        1. 400 if the BID is blank or contains a backtick. The BID is
           interpolated into backtick-quoted table names.
        2. 401 if *ingest_secret* matches neither the shared nor the per-BID
           secret. Authentication runs before anything else about the BID's
           configuration is revealed.
        3. 403 if webhook ingest is disabled for the BID.
        4. 400 if the payload is not a dict or required fields are missing.
        5. 404 if ``{bid}_raw_calls`` does not exist.
        6. 200 "skipped" for non-ANSWER calls, calls below the minimum
           duration, or calls from disallowed groups.
        7. Otherwise the call is upserted. If the recording URL is ready an
           STT job is queued ("queued"); if not, it is left "ingested".

        Every outcome except (1) is written to the ingest log.
        """
        bid = str(bid).strip()
        if not bid or "`" in bid:
            # Refuse anything that could break out of the `...` identifier
            # quoting used in the SQL below.
            return {
                "success": False,
                "error": "validation_error",
                "message": "Invalid BID",
                "http_status": 400,
            }

        cfg = self._load_bid_config(bid)
        min_duration_s = max(0, int(cfg.get("min_call_duration_s") or 0))
        effective_at = cfg.get("min_call_duration_effective_at")
        raw_callid, raw_source = self._payload_hints(payload)

        if not _verify_secret(ingest_secret, cfg):
            self._log_ingest(
                bid=bid,
                callid=raw_callid,
                source=raw_source,
                payload=payload,
                action="rejected",
                reason="invalid_secret",
                queued=False,
                signature_valid=False,
            )
            return {
                "success": False,
                "error": "unauthorized",
                "message": "Invalid or missing X-Ingest-Secret",
                "http_status": 401,
            }

        if not bool(int(cfg.get("webhook_ingest_enabled") or 0)):
            self._log_ingest(
                bid=bid,
                callid=raw_callid,
                source=raw_source,
                payload=payload,
                action="rejected",
                reason="webhook_ingest_disabled",
                queued=False,
                signature_valid=True,
            )
            return {
                "success": False,
                "error": "forbidden",
                "message": f"Webhook ingest is not enabled for BID {bid}",
                "http_status": 403,
            }

        if not isinstance(payload, dict):
            # A JSON array/scalar body would otherwise crash normalization
            # with AttributeError (HTTP 500).
            self._log_ingest(
                bid=bid,
                callid=raw_callid,
                source=raw_source,
                payload=payload,
                action="rejected",
                reason="validation_error",
                queued=False,
                signature_valid=True,
            )
            return {
                "success": False,
                "error": "validation_error",
                "message": "payload must be a JSON object",
                "http_status": 400,
            }

        source_bid = str(cfg.get("source_bid") or bid).strip()
        normalized, field_errors = normalize_ingest_payload(payload, source_bid=source_bid)
        if field_errors:
            self._log_ingest(
                bid=bid,
                callid=normalized.get("callid"),
                source=normalized.get("source") or "",
                payload=payload,
                action="rejected",
                reason="validation_error",
                queued=False,
                signature_valid=True,
            )
            return {
                "success": False,
                "error": "validation_error",
                "message": "; ".join(field_errors),
                "http_status": 400,
            }

        call_id = normalized["callid"]
        source = normalized.get("source") or str(payload.get("source") or "")

        conn = self._db_conn()
        try:
            with conn.cursor() as cursor:
                if not self._raw_table_exists(cursor, bid):
                    self._log_ingest(
                        bid=bid,
                        callid=call_id,
                        source=source,
                        payload=payload,
                        action="rejected",
                        reason="raw_calls_table_missing",
                        queued=False,
                        signature_valid=True,
                    )
                    return {
                        "success": False,
                        "error": "not_found",
                        "message": f"Table {bid}_raw_calls does not exist",
                        "http_status": 404,
                    }

                if normalized["call_status"] != "ANSWER":
                    self._log_ingest(
                        bid=bid,
                        callid=call_id,
                        source=source,
                        payload=payload,
                        action="skipped",
                        reason="not_answered",
                        queued=False,
                        signature_valid=True,
                    )
                    return {
                        "success": True,
                        "bid": bid,
                        "call_id": call_id,
                        "action": "skipped",
                        "reason": "not_answered",
                        "message": "Only ANSWER calls are ingested",
                        "http_status": 200,
                    }

                duration_row = self._duration_row(call_id, normalized)
                skip_min, min_reason, probed_audio = evaluate_min_duration_for_ingest(
                    duration_row,
                    min_duration_s,
                    effective_at,
                    normalized.get("fileurl"),
                    probe_audio=min_duration_s > 0,
                )
                if skip_min:
                    purge_unprocessed_if_below_min(
                        cursor,
                        bid,
                        call_id,
                        duration_row,
                        min_duration_s,
                        effective_at,
                    )
                    self._log_ingest(
                        bid=bid,
                        callid=call_id,
                        source=source,
                        payload=payload,
                        action="skipped",
                        reason=f"min_duration:{min_reason}",
                        queued=False,
                        signature_valid=True,
                    )
                    return {
                        "success": True,
                        "bid": bid,
                        "call_id": call_id,
                        "action": "skipped",
                        "reason": "duration_below_min",
                        "message": f"Call not ingested ({min_reason})",
                        "http_status": 200,
                    }
                if probed_audio is not None:
                    normalized["duration_seconds"] = max(0, int(round(probed_audio)))

                allowed, group_reason = _group_allowed(normalized.get("groupname") or "", cfg)
                if not allowed:
                    self._log_ingest(
                        bid=bid,
                        callid=call_id,
                        source=source,
                        payload=payload,
                        action="skipped",
                        reason=f"group_filter:{group_reason}",
                        queued=False,
                        signature_valid=True,
                    )
                    return {
                        "success": True,
                        "bid": bid,
                        "call_id": call_id,
                        "action": "skipped",
                        "reason": "group_not_allowed",
                        "message": group_reason,
                        "http_status": 200,
                    }

                self._upsert_raw_call(cursor, bid, normalized)
        finally:
            conn.close()

        url_ready = recording_url_ready(normalized["fileurl"], call_id)
        queued = False
        if url_ready:
            queued = self._queue_transcription(
                bid,
                call_id,
                normalized["fileurl"],
                min_duration_s=min_duration_s,
                # Built after any probe above, so it carries the probed duration.
                duration_row=self._duration_row(call_id, normalized),
                effective_at=effective_at,
            )

        # Track URL readiness separately from the queue result. Previously
        # "ingested" was used for both failure modes, so every unqueued call
        # was logged as url_not_ready and "queue_skipped" was unreachable.
        if queued:
            action, reason = "queued", None
            message = "Call ingested and queued for transcription"
        elif url_ready:
            action, reason = "ingested", "queue_skipped"
            message = "Call ingested; STT queue skipped (already queued or terminal state)"
        else:
            action, reason = "ingested", "url_not_ready"
            message = "Call ingested; recording URL not ready — orchestrator will retry"

        self._log_ingest(
            bid=bid,
            callid=call_id,
            source=source,
            payload=payload,
            action=action,
            reason=reason,
            queued=queued,
            signature_valid=True,
        )

        return {
            "success": True,
            "bid": bid,
            "call_id": call_id,
            "action": action,
            "queued": queued,
            "message": message,
            "http_status": 200,
        }
