import csv
import hashlib
import json
import logging
import os
import re
import time
from datetime import date
from datetime import datetime
from decimal import Decimal
from io import BytesIO
from io import StringIO
from typing import Any, Dict, List, Optional
from urllib.parse import unquote

import pymysql
import requests
from fastapi import Body, Depends, FastAPI, File, Header, HTTPException, Query, Request, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse, PlainTextResponse, StreamingResponse
from pydantic import BaseModel, Field

from auth_handler import AuthHandler
from config import Config
from db_handler import DatabaseHandler
from analytics import AnalyticsService
from quality_parameters_handler import QualityParametersHandler
from objection_handler import ObjectionClassificationsHandler
from leadsquared_service import LeadSquaredService
from mcube_ai_agents import AGENT_PROMPTS, get_agent_instruction, get_default_catalog, normalize_agent
from rag_handler import RAGHandler

# Configure the root logger once at import time: INFO level, with timestamp,
# logger name and level in front of every message.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
# Module-level logger named after this module.
logger = logging.getLogger(__name__)


def _build_config_dict() -> Dict[str, Any]:
    """Snapshot the upper-case attributes of ``Config`` into a plain dict.

    Only names for which ``str.isupper()`` is true are copied, so dunder
    attributes, methods and lower-case helpers on ``Config`` are left out.

    Returns:
        A new dictionary mapping each selected attribute name to its value.
    """
    return {name: getattr(Config, name) for name in dir(Config) if name.isupper()}


# Snapshot of Config's upper-case settings; every handler below receives this dict.
CONFIG = _build_config_dict()
# Service singletons, constructed once when the module is imported.
auth_handler = AuthHandler(CONFIG)
db_handler = DatabaseHandler(CONFIG)
rag_handler = RAGHandler(CONFIG)
analytics_service = AnalyticsService(db_handler)
quality_params_handler = QualityParametersHandler(CONFIG)
objection_handler = ObjectionClassificationsHandler(CONFIG)
# In-process cache for LeadSquared presales mappings. Readers look entries up by
# "bid:groupname:row_count" and expect each entry to hold "ts" (epoch seconds)
# and "data".
_PRESALES_MAP_CACHE: Dict[str, Dict[str, Any]] = {}
# Maximum age, in seconds, for which a cached mapping is served without refetching.
_PRESALES_MAP_CACHE_TTL_SECONDS = 300


class IngestDocumentsRequest(BaseModel):
    # Request body carrying a batch of documents; each document is a free-form dict.
    # When ``documents`` is omitted, every instance gets its own fresh empty list.
    documents: List[Dict[str, Any]] = Field(default_factory=list)


class IngestTranscriptsRequest(BaseModel):
    # Request body for a transcript-ingestion run.
    # NOTE(review): field meanings below are inferred from their names; the
    # consuming endpoint is outside this view — confirm against it.
    # Defaults to True; presumably restricts ingestion to presales calls.
    presales_only: bool = True
    # Defaults to 1000; presumably the maximum number of calls per run.
    limit: int = 1000
    # Defaults to False; presumably controls re-ingesting already-stored calls.
    overwrite_existing: bool = False
    # Optional explicit list of call ids; None when not supplied.
    callids: Optional[List[str]] = None


class QueryRequest(BaseModel):
    # Request body for a chat/RAG query. ``user_id`` and ``message`` are
    # required; every other field is optional.
    # NOTE(review): how each optional field is interpreted is decided by the
    # consuming endpoint, which is outside this view.
    user_id: str
    message: str
    # Optional precomputed embedding for ``message``.
    query_embedding: Optional[List[float]] = None
    # Optional id of an existing conversation.
    conversation_id: Optional[str] = None
    # Optional retrieval overrides; None leaves the decision to the consumer.
    top_k: Optional[int] = None
    min_similarity: Optional[float] = None
    # Optional free-form dictionaries supplied by the caller.
    metadata: Optional[Dict[str, Any]] = None
    profile_updates: Optional[Dict[str, Any]] = None
    # Agent persona identifier; defaults to "lead_quality_agent".
    agent_type: Optional[str] = "lead_quality_agent"


class ScoreCallRequest(BaseModel):
    # Request body for scoring a single call. Only ``call_id`` is required.
    call_id: str
    # Defaults to "system" when the caller does not identify a user.
    user_id: Optional[str] = "system"
    # Agent persona identifier; defaults to "lead_quality_agent".
    agent_type: Optional[str] = "lead_quality_agent"
    # Defaults to False; presumably forces the LLM scoring path — confirm
    # against the endpoint that consumes this model.
    force_llm: bool = False


class AgentConfigUpsertRequest(BaseModel):
    # Request body for creating/updating an agent configuration. Every field is
    # optional and defaults to None; the field names mirror columns of the
    # rag_agent_configs table created in _ensure_agent_tables.
    # NOTE(review): whether None means "leave unchanged" is decided by the
    # consuming endpoint, which is outside this view.
    display_name: Optional[str] = None
    is_enabled: Optional[bool] = None
    is_locked: Optional[bool] = None
    system_prompt: Optional[str] = None
    provider: Optional[str] = None
    model_name: Optional[str] = None
    runtime_config: Optional[Dict[str, Any]] = None


class AgentAnalyzeCallRequest(BaseModel):
    # Request body for running an agent analysis on one call. Only ``call_id``
    # is required.
    call_id: str
    # Agent persona identifier; defaults to "lead_quality_agent".
    agent_type: Optional[str] = "lead_quality_agent"
    # Defaults to False; presumably bypasses a stored analysis — confirm
    # against the endpoint that consumes this model.
    force_refresh: bool = False


def _extract_bearer_token(authorization: Optional[str]) -> str:
    if not authorization or not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Missing authorization header")
    return authorization.replace("Bearer ", "", 1).strip()


def _has_business_access(user: Dict[str, Any], bid: str) -> bool:
    if not user:
        return False
    if user.get("is_master"):
        return True
    businesses = user.get("businesses", []) or []
    for business in businesses:
        if str(business.get("bid")) == str(bid):
            return True
    return False


def _auth_user(authorization: Optional[str] = Header(default=None)) -> Dict[str, Any]:
    """Resolve the ``Authorization`` header to an authenticated user.

    Returns:
        Whatever ``auth_handler.validate_token`` returns for the bearer token,
        provided it is truthy.

    Raises:
        HTTPException: 401 when the header is missing/malformed (raised by
            ``_extract_bearer_token``) or when token validation yields a falsy
            result.
    """
    authenticated = auth_handler.validate_token(_extract_bearer_token(authorization))
    if authenticated:
        return authenticated
    raise HTTPException(status_code=401, detail="Invalid or expired token")


def _auth_user_for_bid(bid: str, user: Dict[str, Any]) -> Dict[str, Any]:
    """Return ``user`` unchanged if it may access business ``bid``.

    Raises:
        HTTPException: 403 when ``_has_business_access`` rejects the user.
    """
    if _has_business_access(user, bid):
        return user
    raise HTTPException(status_code=403, detail=f"Access denied to business {bid}")


def _as_bool(value: Optional[str], default: bool = False) -> bool:
    if value is None:
        return default
    return str(value).strip().lower() in ("1", "true", "yes", "y", "on")


def user_has_business_admin(user: Dict[str, Any], bid: str) -> bool:
    """Tell whether ``user`` is an admin of business ``bid``.

    A falsy user is never an admin and an ``is_master`` user always is.
    Otherwise the user needs a ``businesses`` entry whose ``bid`` matches (as
    strings) and whose ``role``, stripped and lower-cased, equals ``"admin"``.
    """
    if not user:
        return False
    if user.get("is_master"):
        return True
    wanted_bid = str(bid)
    return any(
        str(entry.get("role", "")).strip().lower() == "admin" and str(entry.get("bid")) == wanted_bid
        for entry in (user.get("businesses", []) or [])
    )


def _require_business_admin_or_master(bid: str, user: Dict[str, Any]) -> None:
    """Guard that lets only business admins or master users through.

    Raises:
        HTTPException: 403 when ``user_has_business_admin`` returns ``False``.
    """
    if user_has_business_admin(user, bid):
        return
    raise HTTPException(status_code=403, detail="Business admin access required")


def get_sync_source_db_config() -> Dict[str, Any]:
    """Build connection settings for the sync-source database.

    Each setting comes from its ``SYNC_SOURCE_DB_*`` environment variable when
    set; otherwise from the matching ``DB_*`` entry in ``CONFIG``; otherwise
    from a hard-coded default. The port is converted with ``int``.

    Returns:
        A dict with ``host``, ``port``, ``user``, ``password``, ``database``
        and a fixed ``charset`` of ``utf8mb4``.
    """

    def _setting(env_name: str, config_key: str, fallback: Any) -> Any:
        # Environment variable wins; CONFIG (then the literal default) backs it up.
        return os.getenv(env_name, CONFIG.get(config_key, fallback))

    host = _setting("SYNC_SOURCE_DB_HOST", "DB_HOST", "127.0.0.1")
    port = int(_setting("SYNC_SOURCE_DB_PORT", "DB_PORT", 3306))
    username = _setting("SYNC_SOURCE_DB_USER", "DB_USER", "admin")
    password = _setting("SYNC_SOURCE_DB_PASSWORD", "DB_PASSWORD", "")
    database = _setting("SYNC_SOURCE_DB_NAME", "DB_NAME", "voicebot_cluster")
    return {
        "host": host,
        "port": port,
        "user": username,
        "password": password,
        "database": database,
        "charset": "utf8mb4",
    }


def _normalize_phone_variants(phone: Any) -> List[str]:
    digits = "".join(ch for ch in str(phone or "") if ch.isdigit())
    if not digits:
        return []

    core10 = digits[-10:] if len(digits) > 10 else digits
    variants = {digits, f"+{digits}", core10}
    if len(core10) == 10:
        variants.update({f"91{core10}", f"+91{core10}", f"0{core10}"})
    return [v for v in variants if v]


def _extract_lsq_lead_list(payload: Any) -> List[Dict[str, Any]]:
    if payload is None:
        return []
    if isinstance(payload, list):
        return payload
    if isinstance(payload, dict):
        for key in ("List", "Data", "list", "data"):
            value = payload.get(key)
            if isinstance(value, list):
                return value
    return []

def _to_json_safe(value: Any) -> Any:
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, dict):
        return {str(k): _to_json_safe(v) for k, v in value.items()}
    if isinstance(value, list):
        return [_to_json_safe(v) for v in value]
    return value


def _build_customer_profile(details: Dict[str, Any], crm: Dict[str, Any]) -> Dict[str, Any]:
    calls = details.get("calls") or []
    answered_calls = sum(1 for call in calls if str(call.get("call_status", "")).upper() == "ANSWER")
    transcript_calls = sum(1 for call in calls if bool(call.get("has_transcript")))
    crm_lead = crm.get("lead") if isinstance(crm, dict) else None
    return {
        "lead_phone": details.get("lead_phone"),
        "owner_name": (crm_lead or {}).get("owner_name") or details.get("owner_name"),
        "total_conversations": int(details.get("total_conversations") or 0),
        "answered_calls": int(answered_calls),
        "transcript_calls": int(transcript_calls),
        "avg_quality_score": float(details.get("avg_quality_score") or 0),
        "talk_listen_ratio": details.get("talk_listen_ratio") or "N/A",
        "total_duration_seconds": int(details.get("total_duration_seconds") or 0),
        "crm_connected": bool(crm.get("connected")) if isinstance(crm, dict) else False,
        "crm_matched": bool(crm.get("matched")) if isinstance(crm, dict) else False,
        "crm_status": (crm_lead or {}).get("status"),
        "crm_next_task_due_date": (crm_lead or {}).get("next_task_due_date"),
    }


def _db_conn():
    """Open a new PyMySQL connection to the application database.

    Settings come from ``CONFIG`` (``DB_HOST``, ``DB_PORT``, ``DB_USER``,
    ``DB_PASSWORD``, ``DB_NAME``) with local-development fallbacks. Rows are
    returned as dicts and autocommit is enabled. The caller is responsible
    for closing the connection.
    """
    connect_kwargs = {
        "host": CONFIG.get("DB_HOST", "127.0.0.1"),
        "port": int(CONFIG.get("DB_PORT", 3306)),
        "user": CONFIG.get("DB_USER", "admin"),
        "password": CONFIG.get("DB_PASSWORD", ""),
        "database": CONFIG.get("DB_NAME", "voicebot_cluster"),
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
        "autocommit": True,
    }
    return pymysql.connect(**connect_kwargs)


def _ensure_agent_tables():
    """Create the agent tables if they do not exist yet.

    Issues three ``CREATE TABLE IF NOT EXISTS`` statements, in order, for
    ``rag_agent_configs``, ``rag_agent_call_analysis`` and
    ``rag_agent_knowledge_uploads``. The connection is closed even when a
    statement fails.
    """
    ddl_statements = (
        # Per-business agent configuration, one row per (bid, agent_type).
        """
            CREATE TABLE IF NOT EXISTS rag_agent_configs (
                id BIGINT AUTO_INCREMENT PRIMARY KEY,
                bid VARCHAR(50) NOT NULL,
                agent_type VARCHAR(100) NOT NULL,
                display_name VARCHAR(150) NOT NULL,
                is_enabled BOOLEAN DEFAULT FALSE,
                is_locked BOOLEAN DEFAULT TRUE,
                system_prompt LONGTEXT NOT NULL,
                provider VARCHAR(100) DEFAULT 'auto',
                model_name VARCHAR(150) DEFAULT '',
                runtime_config JSON,
                updated_by VARCHAR(100) DEFAULT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                UNIQUE KEY uniq_bid_agent (bid, agent_type),
                INDEX idx_bid_enabled (bid, is_enabled)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """,
        # Stored analysis results, one row per (bid, call_id, agent_type).
        """
            CREATE TABLE IF NOT EXISTS rag_agent_call_analysis (
                id BIGINT AUTO_INCREMENT PRIMARY KEY,
                bid VARCHAR(50) NOT NULL,
                call_id VARCHAR(100) NOT NULL,
                agent_type VARCHAR(100) NOT NULL,
                status ENUM('completed','failed') DEFAULT 'completed',
                model_name VARCHAR(150) DEFAULT '',
                summary TEXT,
                result_json JSON,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                UNIQUE KEY uniq_bid_call_agent (bid, call_id, agent_type),
                INDEX idx_bid_created (bid, created_at)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """,
        # Versioned knowledge uploads, one row per (bid, agent_type, version_no).
        """
            CREATE TABLE IF NOT EXISTS rag_agent_knowledge_uploads (
                id BIGINT AUTO_INCREMENT PRIMARY KEY,
                bid VARCHAR(50) NOT NULL,
                agent_type VARCHAR(100) NOT NULL,
                version_no INT NOT NULL,
                filename VARCHAR(255) NOT NULL,
                page_count INT NOT NULL,
                char_count INT NOT NULL,
                knowledge_bands JSON,
                compiled_prompt LONGTEXT,
                uploaded_by VARCHAR(100) DEFAULT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                UNIQUE KEY uniq_bid_agent_version (bid, agent_type, version_no),
                INDEX idx_bid_agent_created (bid, agent_type, created_at)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """,
    )
    connection = _db_conn()
    try:
        db_cursor = connection.cursor()
        for statement in ddl_statements:
            db_cursor.execute(statement)
    finally:
        connection.close()


def _ensure_default_agent_configs(bid: str):
    """Seed ``rag_agent_configs`` with the default catalog for one business.

    Every catalog entry is inserted for ``bid``. Rows that already exist for
    the same ``(bid, agent_type)`` are left as they are, because the
    ``ON DUPLICATE KEY UPDATE`` clause is a no-op. Entries without a
    ``runtime_config`` get ``{"temperature": 0.2, "top_p": 0.9}``.
    """
    default_agents = get_default_catalog()
    upsert_sql = """
                INSERT INTO rag_agent_configs (
                    bid, agent_type, display_name, is_enabled, is_locked, system_prompt, provider, model_name, runtime_config
                )
                VALUES (%s, %s, %s, %s, %s, %s, 'auto', '', %s)
                ON DUPLICATE KEY UPDATE agent_type = agent_type
                """
    connection = _db_conn()
    try:
        db_cursor = connection.cursor()
        for agent in default_agents:
            runtime_defaults = agent.get("runtime_config") or {"temperature": 0.2, "top_p": 0.9}
            row_values = (
                str(bid),
                str(agent["agent_type"]),
                str(agent["display_name"]),
                1 if agent.get("is_enabled") else 0,
                1 if agent.get("is_locked") else 0,
                str(agent["instruction"]),
                json.dumps(runtime_defaults, ensure_ascii=True),
            )
            db_cursor.execute(upsert_sql, row_values)
    finally:
        connection.close()


def _get_agent_configs(bid: str) -> List[Dict[str, Any]]:
    """Return all agent configurations of a business, oldest row first.

    Default configurations are seeded first, so a business always has at
    least the catalog entries. ``runtime_config`` values that come back as
    JSON text are decoded; undecodable text becomes an empty dict. The rows
    are finally passed through ``_to_json_safe``.
    """
    _ensure_default_agent_configs(bid)
    connection = _db_conn()
    try:
        db_cursor = connection.cursor()
        db_cursor.execute(
            """
            SELECT bid, agent_type, display_name, is_enabled, is_locked, system_prompt, provider, model_name, runtime_config, updated_at
            FROM rag_agent_configs
            WHERE bid = %s
            ORDER BY id ASC
            """,
            (str(bid),),
        )
        records = db_cursor.fetchall() or []
    finally:
        connection.close()

    for record in records:
        raw_runtime = record.get("runtime_config")
        if not isinstance(raw_runtime, str):
            continue
        try:
            record["runtime_config"] = json.loads(raw_runtime)
        except Exception:
            record["runtime_config"] = {}
    return _to_json_safe(records)


def _get_agent_config(bid: str, agent_type: str) -> Optional[Dict[str, Any]]:
    """Return the configuration row of one agent type, or ``None`` if absent.

    The lookup goes through ``_get_agent_configs`` and compares
    ``agent_type`` values as strings; the first match wins.
    """
    wanted_type = str(agent_type)
    return next(
        (config for config in _get_agent_configs(bid) if str(config.get("agent_type")) == wanted_type),
        None,
    )


def _resolve_agent_instruction_for_bid(bid: str, agent_type: str) -> str:
    """Pick the system prompt to use for an agent of a given business.

    A non-empty ``system_prompt`` stored for the business takes precedence;
    otherwise the built-in instruction from ``get_agent_instruction`` is used.
    """
    stored_config = _get_agent_config(bid, agent_type)
    custom_prompt = stored_config.get("system_prompt") if stored_config else None
    return str(custom_prompt) if custom_prompt else get_agent_instruction(agent_type)


def _extract_pdf_pages(pdf_bytes: bytes) -> List[str]:
    parser_error: Optional[Exception] = None
    for module_name in ("pypdf", "PyPDF2"):
        try:
            module = __import__(module_name)
            reader = module.PdfReader(BytesIO(pdf_bytes))
            pages: List[str] = []
            for page in reader.pages:
                pages.append((page.extract_text() or "").strip())
            return pages
        except Exception as exc:  # noqa: BLE001
            parser_error = exc
            continue
    # Offline fallback for restricted environments:
    # 1) estimate page count by /Type /Page markers
    # 2) extract rough text tokens per page section from literal PDF strings.
    try:
        page_count = len(re.findall(rb"/Type\s*/Page\b", pdf_bytes))
        if page_count <= 0:
            raise ValueError("No /Type /Page markers found")

        text_blob = pdf_bytes.decode("latin1", errors="ignore")
        page_splits = re.split(r"/Type\s*/Page\b", text_blob)
        candidate_sections = page_splits[1:] if len(page_splits) > 1 else [text_blob]

        pages: List[str] = []
        for idx in range(page_count):
            segment = candidate_sections[idx] if idx < len(candidate_sections) else ""
            literals = re.findall(r"\(([^()]*)\)", segment)
            cleaned = " ".join([re.sub(r"\\[nrtbf()\\]", " ", lit) for lit in literals]).strip()
            pages.append(cleaned or " ")

        return pages
    except Exception as fallback_exc:  # noqa: BLE001
        raise HTTPException(
            status_code=500,
            detail=(
                "PDF parser is unavailable on the server and fallback extraction failed. "
                "Install 'pypdf' inside backend venv for reliable parsing. "
                f"Last parser error: {parser_error}; fallback error: {fallback_exc}"
            ),
        )


def _assert_raj_presales_agent(agent_type: str):
    if agent_type != "presales_lead_agent":
        raise HTTPException(
            status_code=400,
            detail=(
                "This endpoint is currently enabled only for Raj Mehta (presales_lead_agent). "
                f"Received agent_type={agent_type}"
            ),
        )


def _list_conversations(bid: str, user_id: Optional[str], limit: int) -> Dict[str, Any]:
    """List the most recently updated conversations of a business.

    Args:
        bid: Business id whose conversations are listed.
        user_id: When truthy, only that user's conversations are returned.
        limit: Maximum number of rows.

    Returns:
        ``{"conversations": [...]}``. Each row carries the conversation id,
        user id, decoded metadata, ``updated_at`` and the first user message
        and last message. Metadata text that is not valid JSON becomes ``{}``.
    """
    connection = _db_conn()
    try:
        db_cursor = connection.cursor()
        # The two statements differ only in the extra user_id filter.
        if user_id:
            statement = """
                SELECT c.conversation_id, c.user_id, c.metadata, c.updated_at,
                    (SELECT content FROM rag_messages m WHERE m.bid = c.bid AND m.conversation_id = c.conversation_id AND m.role = 'user' ORDER BY m.id ASC LIMIT 1) AS first_user_message,
                    (SELECT content FROM rag_messages m WHERE m.bid = c.bid AND m.conversation_id = c.conversation_id ORDER BY m.id DESC LIMIT 1) AS last_message
                FROM rag_conversations c
                WHERE c.bid = %s AND c.user_id = %s
                ORDER BY c.updated_at DESC
                LIMIT %s
                """
            arguments = (str(bid), str(user_id), int(limit))
        else:
            statement = """
                SELECT c.conversation_id, c.user_id, c.metadata, c.updated_at,
                    (SELECT content FROM rag_messages m WHERE m.bid = c.bid AND m.conversation_id = c.conversation_id AND m.role = 'user' ORDER BY m.id ASC LIMIT 1) AS first_user_message,
                    (SELECT content FROM rag_messages m WHERE m.bid = c.bid AND m.conversation_id = c.conversation_id ORDER BY m.id DESC LIMIT 1) AS last_message
                FROM rag_conversations c
                WHERE c.bid = %s
                ORDER BY c.updated_at DESC
                LIMIT %s
                """
            arguments = (str(bid), int(limit))
        db_cursor.execute(statement, arguments)
        records = db_cursor.fetchall() or []
    finally:
        connection.close()

    for record in records:
        raw_metadata = record.get("metadata")
        if not isinstance(raw_metadata, str):
            continue
        try:
            record["metadata"] = json.loads(raw_metadata)
        except Exception:
            record["metadata"] = {}
    return {"conversations": _to_json_safe(records)}


def _collect_call_context(bid: str, call_id: str) -> Dict[str, Any]:
    """Gather everything stored about one call into a JSON-safe dict.

    Returns:
        A dict with the keys ``call``, ``transcript``, ``analytics`` and
        ``bant``. A section the database handler has no data for is an empty
        dict.
    """
    fetchers = (
        ("call", db_handler.get_call_by_id),
        ("transcript", db_handler.get_call_transcript),
        ("analytics", db_handler.get_call_analytics),
        ("bant", db_handler.get_bant_analysis),
    )
    # Fetch every section first, then convert, keeping DB reads back to back.
    fetched = {section: fetch(bid, call_id) or {} for section, fetch in fetchers}
    return {section: _to_json_safe(data) for section, data in fetched.items()}


def _score_call_fallback(context: Dict[str, Any]) -> Dict[str, Any]:
    analytics = context.get("analytics") or {}
    bant = context.get("bant") or {}
    transcript = context.get("transcript") or {}
    quality = analytics.get("quality_score")
    sentiment = analytics.get("sentiment")
    summary = analytics.get("summary") or "Call analyzed using existing call metadata."
    profile = bant.get("profile") or {}
    profile_summary = bant.get("summary") or ""
    return {
        "summary": summary,
        "quality_score": quality if quality is not None else 0,
        "sentiment": sentiment or "unknown",
        "compliance_risk": "medium" if quality is not None and quality < 60 else "low",
        "customer_profile": {
            "profile": profile,
            "summary": profile_summary,
            "transcript_language": transcript.get("language"),
        },
        "recommended_actions": [
            "Review objection handling moments and add specific rebuttal playbook entries.",
            "Track repeat intent and follow-up commitments in CRM.",
            "Coach agent using this call transcript and quality notes.",
        ],
    }


def _resolve_agent_model_settings(bid: str, agent_type: str) -> Dict[str, Any]:
    """Return the model settings configured for an agent of a business.

    Returns:
        A dict with ``provider`` (``"auto"`` when unset), ``model_name``
        (stripped, possibly empty) and ``runtime_config`` (always a dict).
    """
    stored_config = _get_agent_config(bid, agent_type) or {}
    raw_runtime = stored_config.get("runtime_config")
    # Anything that is not a non-empty dict collapses to a fresh empty dict.
    runtime_options = raw_runtime if raw_runtime and isinstance(raw_runtime, dict) else {}
    provider_name = str(stored_config.get("provider") or "auto")
    model_name = str(stored_config.get("model_name") or "").strip()
    return {
        "provider": provider_name,
        "model_name": model_name,
        "runtime_config": runtime_options,
    }


def _score_call_with_llm(context: Dict[str, Any], bid: str, agent_type: str) -> Optional[Dict[str, Any]]:
    transcript_text = (context.get("transcript") or {}).get("transcript")
    if not transcript_text:
        return None

    settings = _resolve_agent_model_settings(bid, agent_type)
    prompt = (
        f"{_resolve_agent_instruction_for_bid(bid, agent_type)}\n"
        "Return STRICT JSON with keys: summary, quality_score, sentiment, compliance_risk, "
        "customer_profile, recommended_actions.\n"
        "quality_score must be 0-100 numeric.\n\n"
        f"CALL CONTEXT:\n{json.dumps(context, ensure_ascii=True)}"
    )
    answer = rag_handler._invoke_chat_model(  # noqa: SLF001
        prompt,
        provider=settings["provider"],
        model_name=settings["model_name"] or None,
        runtime_config=settings["runtime_config"],
    )
    if not answer:
        return None

    start = answer.find("{")
    end = answer.rfind("}")
    if start == -1 or end == -1 or end <= start:
        return None

    try:
        parsed = json.loads(answer[start : end + 1])
    except Exception:
        return None
    if not isinstance(parsed, dict):
        return None
    return parsed


def _agent_report(bid: str, agent_id: str, days: int) -> Dict[str, Any]:
    """Aggregate call-quality metrics for one agent over a trailing window.

    Args:
        bid: Business id; selects the per-business ``<bid>_raw_calls`` and
            ``<bid>_callanalytics`` tables.
        agent_id: Agent name, matched against ``agentname``, then
            ``agent_callinfo``, then the literal ``'Unknown'``.
        days: Size of the look-back window in days.

    Returns:
        A dict with ``agent_id``, ``window_days``, the aggregated ``metrics``
        row (JSON-safe) and a UTC ``generated_at`` timestamp.
    """
    # Table names cannot be bound as query parameters. Doubling any backtick
    # keeps a caller-supplied bid inside the quoted identifier.
    safe_bid = str(bid).replace("`", "``")
    table_calls = f"{safe_bid}_raw_calls"
    table_analytics = f"{safe_bid}_callanalytics"
    # Use the shared helper rather than a private copy of the connection settings.
    conn = _db_conn()
    try:
        cursor = conn.cursor()
        query = f"""
            SELECT
                COUNT(*) AS total_calls,
                AVG(a.quality_score) AS avg_quality_score,
                AVG(a.agent_speak_percentage) AS avg_agent_speak_percentage,
                AVG(a.customer_speak_percentage) AS avg_customer_speak_percentage,
                SUM(CASE WHEN a.sentiment = 'positive' THEN 1 ELSE 0 END) AS positive_calls,
                SUM(CASE WHEN a.sentiment = 'neutral' THEN 1 ELSE 0 END) AS neutral_calls,
                SUM(CASE WHEN a.sentiment = 'negative' THEN 1 ELSE 0 END) AS negative_calls
            FROM `{table_calls}` r
            LEFT JOIN `{table_analytics}` a ON r.callid = a.callid
            WHERE COALESCE(NULLIF(r.agentname, ''), NULLIF(r.agent_callinfo, ''), 'Unknown') = %s
              AND r.call_starttime >= DATE_SUB(NOW(), INTERVAL %s DAY)
        """
        cursor.execute(query, (agent_id, int(days)))
        row = cursor.fetchone() or {}
    finally:
        conn.close()
    return {
        "agent_id": agent_id,
        "window_days": int(days),
        "metrics": _to_json_safe(row),
        "generated_at": datetime.utcnow().isoformat() + "Z",
    }


def _ingestion_progress(bid: str, limit_runs: int = 10) -> Dict[str, Any]:
    """Report the current ingestion state and recent runs of a business.

    Args:
        bid: Business id to report on.
        limit_runs: Maximum number of recent runs to include, newest first.

    Returns:
        A dict with ``bid``, ``current`` (the ``rag_ingestion_progress`` row,
        or ``{}`` when there is none) and ``recent_runs`` (rows from
        ``rag_ingestion_runs``), all JSON-safe.
    """
    # Use the shared helper rather than a private copy of the connection settings.
    conn = _db_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT bid, status, last_run_started_at, last_run_finished_at, last_duration_ms,
                   processed_calls, ingested_documents, ingested_chunks, skipped, last_error, updated_at
            FROM rag_ingestion_progress
            WHERE bid = %s
            LIMIT 1
            """,
            (str(bid),),
        )
        current = cursor.fetchone() or {}
        cursor.execute(
            """
            SELECT id, bid, started_at, finished_at, duration_ms, status,
                   processed_calls, ingested_documents, ingested_chunks, skipped, error, created_at
            FROM rag_ingestion_runs
            WHERE bid = %s
            ORDER BY id DESC
            LIMIT %s
            """,
            (str(bid), int(limit_runs)),
        )
        runs = cursor.fetchall() or []
    finally:
        conn.close()
    return {"bid": str(bid), "current": _to_json_safe(current), "recent_runs": _to_json_safe(runs)}


def _run_agent_call_analysis(bid: str, call_id: str, agent_type: str) -> Dict[str, Any]:
    """Run an agent's LLM analysis on one call and persist the result.

    The call context is sent to the chat model together with the agent's
    instruction. The outermost ``{...}`` span of the reply is parsed as JSON;
    when that is impossible a placeholder result holding the first 2000
    characters of the raw reply is stored instead. The row in
    ``rag_agent_call_analysis`` is inserted or overwritten with status
    ``completed``.

    Returns:
        A dict with ``bid``, ``call_id``, ``agent_type``, the JSON-safe
        ``result`` and a UTC ``generated_at`` timestamp.

    Raises:
        HTTPException: 404 when the call has no transcript text.
    """
    context = _collect_call_context(bid, call_id)
    if not (context.get("transcript") or {}).get("transcript"):
        raise HTTPException(status_code=404, detail=f"Transcript not found for call_id={call_id}")

    def _placeholder(summary_text: str, raw_reply: str) -> Dict[str, Any]:
        # Shape of the stored result when the model reply cannot be used.
        return {
            "summary": summary_text,
            "score": 0,
            "risks": [],
            "opportunities": [],
            "actions": [],
            "model_data": {"raw_response": raw_reply[:2000]},
        }

    instruction = _resolve_agent_instruction_for_bid(bid, agent_type)
    prompt = (
        f"{instruction}\n"
        "Return STRICT JSON with keys: summary, score, risks, opportunities, actions, model_data.\n"
        "score must be 0-100 numeric.\n\n"
        f"CALL CONTEXT:\n{json.dumps(context, ensure_ascii=True)}"
    )
    settings = _resolve_agent_model_settings(bid, agent_type)
    model_reply = rag_handler._invoke_chat_model(
        prompt,
        provider=settings["provider"],
        model_name=settings["model_name"] or None,
        runtime_config=settings["runtime_config"],
    )
    reply_text = model_reply or ""

    open_idx = reply_text.find("{")
    close_idx = reply_text.rfind("}")
    if open_idx != -1 and close_idx > open_idx:
        try:
            analysis = json.loads(reply_text[open_idx : close_idx + 1])
        except Exception:
            analysis = _placeholder("Model output parse failed.", reply_text)
    else:
        analysis = _placeholder("Model returned non-JSON output.", reply_text)

    stored_model_name = str(settings["model_name"] or CONFIG.get("RAG_CHAT_MODEL", ""))
    connection = _db_conn()
    try:
        db_cursor = connection.cursor()
        db_cursor.execute(
            """
            INSERT INTO rag_agent_call_analysis (bid, call_id, agent_type, status, model_name, summary, result_json)
            VALUES (%s, %s, %s, 'completed', %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                status='completed',
                model_name=VALUES(model_name),
                summary=VALUES(summary),
                result_json=VALUES(result_json),
                updated_at=CURRENT_TIMESTAMP
            """,
            (
                str(bid),
                str(call_id),
                str(agent_type),
                stored_model_name,
                str(analysis.get("summary") or ""),
                json.dumps(analysis, ensure_ascii=True),
            ),
        )
    finally:
        connection.close()

    return {
        "bid": str(bid),
        "call_id": str(call_id),
        "agent_type": str(agent_type),
        "result": _to_json_safe(analysis),
        "generated_at": datetime.utcnow().isoformat() + "Z",
    }


def _extract_lsq_lead_record(payload: Any) -> Optional[Dict[str, Any]]:
    if payload is None:
        return None
    if isinstance(payload, list):
        return payload[0] if payload else None
    if isinstance(payload, dict):
        for key in ("List", "Data", "list", "data"):
            value = payload.get(key)
            if isinstance(value, list) and value:
                return value[0]
        return payload
    return None


def _first_present_str(*values: Any) -> Optional[str]:
    for value in values:
        if value is None:
            continue
        text = str(value).strip()
        if text:
            return text
    return None


def _lsq_field(record: Optional[Dict[str, Any]], *keys: str) -> Optional[Any]:
    if not isinstance(record, dict):
        return None
    for key in keys:
        if key in record and record.get(key) not in (None, ""):
            return record.get(key)
    for key in keys:
        for record_key, value in record.items():
            if str(record_key).lower() == str(key).lower() and value not in (None, ""):
                return value
    return None


def _lsq_display_name(record: Optional[Dict[str, Any]]) -> Optional[str]:
    """Derive a human-readable name for a LeadSquared lead record.

    Preference order: the prospect/contact name fields, then first and last
    name joined by a space, then the generic ``Name`` field.

    Returns:
        The chosen name, or ``None`` when ``record`` is not a dict or holds
        no usable name.
    """
    if not isinstance(record, dict):
        return None
    name_parts = [
        _first_present_str(_lsq_field(record, "FirstName", "first_name")),
        _first_present_str(_lsq_field(record, "LastName", "last_name")),
    ]
    joined_name = " ".join(filter(None, name_parts)).strip()
    return _first_present_str(
        _lsq_field(record, "ProspectName", "prospect_name", "ContactName", "contact_name"),
        joined_name or None,
        _lsq_field(record, "Name", "name"),
    )


def _select_lsq_record_for_phone(payload: Any, phone: Any) -> Optional[Dict[str, Any]]:
    """Find the lead in a LeadSquared payload that belongs to ``phone``.

    When ``phone`` has no digits, the payload's single/first record is
    returned without any matching. Otherwise each candidate lead's phone
    field is normalised and compared with the variants of ``phone``; the
    first lead sharing a variant wins.

    Returns:
        The matching lead dict, or ``None`` when no candidate matches.
    """
    wanted_variants = set(_normalize_phone_variants(phone))
    if not wanted_variants:
        return _extract_lsq_lead_record(payload)

    candidates = _extract_lsq_lead_list(payload)
    if not candidates:
        lone_record = _extract_lsq_lead_record(payload)
        candidates = [lone_record] if isinstance(lone_record, dict) else []

    for candidate in candidates:
        candidate_phone = _lsq_field(candidate, "Phone", "Mobile", "PhoneNumber", "phone", "mobile")
        if candidate_phone and not wanted_variants.isdisjoint(_normalize_phone_variants(candidate_phone)):
            return candidate
    return None


def _record_matches_phone(record: Any, phone: Any, fallback_phone: Optional[Any] = None) -> bool:
    """Tell whether a lead record refers to the given phone number.

    The record's own phone field is compared when it has one; otherwise
    ``fallback_phone`` is compared. Two numbers match when their normalised
    variant sets overlap. A ``phone`` without digits never matches.
    """
    wanted_variants = set(_normalize_phone_variants(phone))
    if not wanted_variants:
        return False

    record_phone = None
    if isinstance(record, dict):
        record_phone = _lsq_field(record, "Phone", "Mobile", "PhoneNumber", "phone", "mobile")

    # The record's own number takes precedence; the fallback is used only without it.
    comparison_phone = record_phone or fallback_phone
    if not comparison_phone:
        return False
    return not wanted_variants.isdisjoint(_normalize_phone_variants(comparison_phone))


def _lsq_external_id(record: Optional[Dict[str, Any]], fallback_phone: Optional[str] = None) -> Optional[str]:
    """Return a stable external identifier for a LeadSquared lead record.

    The lead's own id field is used when present. Otherwise the id is the
    SHA-256 hex digest of ``"<phone>|<email>|<name>"``, where the phone falls
    back to ``fallback_phone`` and missing parts are empty strings.

    Returns:
        The identifier, or ``None`` when ``record`` is not a dict.
    """
    if not isinstance(record, dict):
        return None

    explicit_id = _first_present_str(
        _lsq_field(record, "ProspectID", "LeadId", "Id", "lead_id", "ProspectId"),
    )
    if explicit_id:
        return explicit_id

    # No id on the record: derive one from its identifying fields.
    phone_part = _first_present_str(_lsq_field(record, "Phone", "Mobile", "PhoneNumber"), fallback_phone) or ""
    email_part = _first_present_str(_lsq_field(record, "EmailAddress", "email")) or ""
    name_part = _lsq_display_name(record) or ""
    fingerprint = "|".join((phone_part, email_part, name_part))
    return hashlib.sha256(fingerprint.encode("utf-8")).hexdigest()


def _crm_payload_from_lsq(record: Dict[str, Any]) -> Dict[str, Any]:
    """Map a raw LeadSquared lead record onto the CRM payload shape.

    Returns:
        A dict holding the untouched ``raw`` record plus ``name``, ``email``,
        ``phone``, ``status``, ``owner_name`` and ``next_task_due_date``, each
        read from the first usable candidate field.
    """

    def _pick(*field_names: str) -> Optional[Any]:
        # Thin shorthand so each mapping line only lists the candidate names.
        return _lsq_field(record, *field_names)

    payload: Dict[str, Any] = {"raw": record}
    payload["name"] = _lsq_display_name(record)
    payload["email"] = _pick("EmailAddress", "email")
    payload["phone"] = _pick("Phone", "Mobile", "PhoneNumber")
    payload["status"] = _pick("LeadStatus", "Status", "lead_status")
    payload["owner_name"] = _pick("OwnerName", "Owner", "owner_name")
    payload["next_task_due_date"] = _pick("NextTaskDueDate", "next_task_due_date")
    return payload


def _crm_write_allowed() -> bool:
    return str(os.getenv("ALLOW_CRM_WRITE", "false")).strip().lower() == "true"


def _get_lsq_service_for_bid_or_error(bid: str) -> LeadSquaredService:
    """Build a LeadSquared client from the credentials stored for a business.

    Raises:
        HTTPException: 400 when no credentials exist, or when the access key,
            secret key or active flag is missing/falsy.
    """
    credentials = db_handler.get_crm_credentials(bid, "leadsquared")
    required_fields = ("access_key", "secret_key", "is_active")
    if not credentials or not all(credentials.get(field) for field in required_fields):
        raise HTTPException(status_code=400, detail="LeadSquared integration not configured or inactive")
    return LeadSquaredService(
        access_key=credentials["access_key"],
        secret_key=credentials["secret_key"],
        api_host=credentials.get("api_host"),
    )


def _get_presales_mapping_from_leadsquared(
    bid: str,
    groupname: Optional[str] = None,
    row_count: int = 250,
    force_refresh: bool = False,
) -> Dict[str, Any]:
    """Build (and cache) the group/agent/customer pre-sales mapping for a business.

    Leads are fetched from LeadSquared, indexed by every phone variant
    produced by ``_normalize_phone_variants``, and joined against the rows
    returned by ``db_handler.get_group_agent_customer_rows``.

    Args:
        bid: Business identifier.
        groupname: Optional group filter forwarded to the database query.
        row_count: Requested number of leads; clamped to the range 10..500.
        force_refresh: When true, ignore any cached entry.

    Returns:
        A dict with ``success``, ``connected``, ``message``, ``groups``,
        ``agents``, ``customers`` and ``stats``. Failure payloads (missing or
        inactive credentials, failed lead search) share those keys and are
        not written to the cache.
    """

    def _failure(connected: bool, message: str) -> Dict[str, Any]:
        # Shared shape for both early-exit failure responses.
        return {
            "success": False,
            "connected": connected,
            "message": message,
            "groups": [],
            "agents": [],
            "customers": [],
            "stats": {"lsq_leads_fetched": 0, "matched_customers": 0, "mapped_rows": 0},
        }

    safe_row_count = max(10, min(int(row_count or 250), 500))
    cache_key = f"{bid}:{groupname or ''}:{safe_row_count}"
    now = int(time.time())

    # Serve from the in-process cache while the entry is within its TTL.
    entry = _PRESALES_MAP_CACHE.get(cache_key)
    if entry and not force_refresh:
        age_seconds = now - int(entry.get("ts", 0))
        if age_seconds <= _PRESALES_MAP_CACHE_TTL_SECONDS:
            return entry.get("data", {})

    creds = db_handler.get_crm_credentials(bid, "leadsquared")
    creds_usable = bool(creds) and all(creds.get(key) for key in ("access_key", "secret_key", "is_active"))
    if not creds_usable:
        return _failure(False, "LeadSquared integration not configured or inactive")

    service = LeadSquaredService(
        access_key=creds["access_key"],
        secret_key=creds["secret_key"],
        api_host=creds.get("api_host"),
    )
    paging_query = {"Paging": {"Offset": 0, "RowCount": safe_row_count}}
    lsq_search = service.search_leads(paging_query)
    if not lsq_search.get("success"):
        return _failure(True, lsq_search.get("message") or "Failed to fetch leads from LeadSquared")

    # Index leads by phone variant; the first lead seen for a variant wins.
    leads = _extract_lsq_lead_list(lsq_search.get("data"))
    phone_to_lead: Dict[str, Dict[str, Any]] = {}
    all_phone_variants: set[str] = set()
    for lead in leads:
        lead_phone = _lsq_field(lead, "Phone", "Mobile", "PhoneNumber")
        if not lead_phone:
            continue
        for variant in _normalize_phone_variants(lead_phone):
            all_phone_variants.add(variant)
            phone_to_lead.setdefault(variant, lead)

    rows = db_handler.get_group_agent_customer_rows(
        bid=bid,
        customer_numbers=list(all_phone_variants),
        groupname=groupname,
    )

    group_index: Dict[str, Dict[str, Any]] = {}
    agent_index: Dict[str, Dict[str, Any]] = {}
    customer_rows: List[Dict[str, Any]] = []

    for row in rows:
        group = row.get("groupname") or "-"
        agent = row.get("agentname") or "-"
        customer_phone = row.get("customer_callinfo") or ""

        # Stop at the first phone variant that maps to a lead.
        matched_lead = None
        for variant in _normalize_phone_variants(customer_phone):
            matched_lead = phone_to_lead.get(variant)
            if matched_lead:
                break

        group_entry = group_index.setdefault(
            group,
            {"groupname": group, "totalCalls": 0, "matchedCustomers": set(), "agents": set()},
        )
        agent_entry = agent_index.setdefault(
            agent,
            {"agentname": agent, "groupnames": set(), "totalCalls": 0, "matchedCustomers": set()},
        )

        total_calls = int(row.get("total_calls") or 0)

        group_entry["totalCalls"] += total_calls
        group_entry["agents"].add(agent)
        group_entry["matchedCustomers"].add(customer_phone)

        agent_entry["totalCalls"] += total_calls
        agent_entry["groupnames"].add(group)
        agent_entry["matchedCustomers"].add(customer_phone)

        customer_rows.append(
            {
                "customer_callinfo": customer_phone,
                "groupname": group,
                "agentname": agent,
                "total_calls": total_calls,
                "last_call": row.get("last_call"),
                "lsq_name": _lsq_display_name(matched_lead),
                "lsq_owner_name": _lsq_field(matched_lead, "OwnerName", "Owner", "owner_name"),
                "lsq_status": _lsq_field(matched_lead, "LeadStatus", "Status", "lead_status"),
            }
        )

    # Flatten the per-group accumulators (sets become counts), busiest first.
    group_summaries = []
    for group_entry in group_index.values():
        group_summaries.append(
            {
                "groupname": group_entry["groupname"],
                "totalCalls": int(group_entry["totalCalls"]),
                "matchedCustomers": len(group_entry["matchedCustomers"]),
                "agentsCount": len(group_entry["agents"]),
            }
        )
    groups = sorted(group_summaries, key=lambda item: item["totalCalls"], reverse=True)

    # Same for agents; "groupname" is the alphabetically first group, if any.
    agent_summaries = []
    for agent_entry in agent_index.values():
        ordered_groups = sorted(agent_entry["groupnames"])
        primary_group = ordered_groups[0] if ordered_groups else None
        agent_summaries.append(
            {
                "agentname": agent_entry["agentname"],
                "groupnames": ordered_groups,
                "groupname": primary_group,
                "totalCalls": int(agent_entry["totalCalls"]),
                "matchedCustomers": len(agent_entry["matchedCustomers"]),
            }
        )
    agents = sorted(agent_summaries, key=lambda item: item["totalCalls"], reverse=True)

    distinct_customers = {item["customer_callinfo"] for item in customer_rows}
    response = {
        "success": True,
        "connected": True,
        "message": "Pre-sales mapping generated from LeadSquared leads",
        "groups": groups,
        "agents": agents,
        "customers": customer_rows,
        "stats": {
            "lsq_leads_fetched": len(leads),
            "matched_customers": len(distinct_customers),
            "mapped_rows": len(customer_rows),
        },
    }
    _PRESALES_MAP_CACHE[cache_key] = {"ts": now, "data": response}
    return response


# ASGI application instance; the middleware and routes below are registered on it.
app = FastAPI(title="MCube AI FastAPI", version="1.0.0")

# CORS is fully permissive here: any origin, method and header, with credentials allowed.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very broad —
# confirm this is intended for production deployments.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# In-process counters, incremented by the HTTP middleware and returned by /health.
_metrics = {"requests_total": 0, "errors_total": 0}
# Runs once at import time.
# NOTE(review): presumably creates the agent tables when missing — confirm against its definition.
_ensure_agent_tables()


@app.middleware("http")
async def metrics_and_latency(request: Request, call_next):
    """Count requests and errors, and report per-request latency.

    Every request increments ``requests_total``. An exception escaping the
    downstream handler increments ``errors_total`` and is re-raised as is.
    Completed responses are logged with path, status and latency, and carry
    the latency in an ``X-Process-Time-Ms`` header.
    """
    _metrics["requests_total"] += 1
    t0 = time.perf_counter()
    try:
        response = await call_next(request)
    except Exception:
        _metrics["errors_total"] += 1
        raise
    else:
        elapsed_ms = round((time.perf_counter() - t0) * 1000, 2)
        logger.info("path=%s status=%s latency_ms=%s", request.url.path, response.status_code, elapsed_ms)
        response.headers["X-Process-Time-Ms"] = str(elapsed_ms)
        return response


@app.get("/health")
def health():
    """Liveness probe: static service identity, current UTC time and request counters.

    Returns:
        A dict with ``status``, ``service``, ``timestamp`` (naive ISO-8601
        text followed by ``Z``) and the live ``metrics`` counters.
    """
    # Local import: only ``datetime`` itself is imported at module level.
    from datetime import timezone

    # datetime.utcnow() is deprecated; take an aware UTC time and drop tzinfo so
    # the rendered text keeps the original "...Z" shape instead of "+00:00Z".
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return {
        "status": "healthy",
        "service": "mcube-ai-fastapi",
        "timestamp": now_utc.isoformat() + "Z",
        "metrics": _metrics,
    }


@app.get("/rag/agents")
def list_agents(
    bid: str = Query(..., min_length=1),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return the agent configurations for the business given as ``bid`` query parameter."""
    _auth_user_for_bid(bid, user)
    agent_configs = _get_agent_configs(bid)
    return {"agents": agent_configs}


@app.get("/rag/{bid}/agents/catalog")
def rag_agent_catalog(
    bid: str,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return the default agent catalog plus the selectable providers and suggested models.

    The Bedrock suggestion is the configured ``RAG_CHAT_MODEL`` value
    (empty string when unset); the Ollama suggestions are a fixed list.
    """
    _auth_user_for_bid(bid, user)

    catalog = get_default_catalog()
    bedrock_model = str(CONFIG.get("RAG_CHAT_MODEL", ""))
    suggested_models = {
        "ollama": ["qwen2.5:7b", "gemma2:9b", "deepseek-r1:8b"],
        "bedrock": [bedrock_model],
    }
    return {
        "bid": str(bid),
        "catalog": catalog,
        "providers": ["auto", "ollama", "bedrock"],
        "suggested_models": suggested_models,
    }


@app.get("/rag/{bid}/ollama/models")
def rag_ollama_models(
    bid: str,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """List the model names reported by the configured Ollama server.

    Calls ``GET {OLLAMA_BASE_URL}/api/tags`` with a 30 second timeout.

    Returns:
        A dict with ``ollama_base_url``, ``models`` (list of names) and
        ``raw`` (the decoded response payload).

    Raises:
        HTTPException: 502 when the request fails, the response is not valid
            JSON, or the decoded payload is not a JSON object.
    """
    _auth_user_for_bid(bid, user)
    base_url = str(CONFIG.get("OLLAMA_BASE_URL", "http://127.0.0.1:11434")).rstrip("/")
    try:
        response = requests.get(f"{base_url}/api/tags", timeout=30)
        response.raise_for_status()
        payload = response.json() if response.content else {}
    except Exception as exc:  # noqa: BLE001
        # Boundary with an external service: map any failure to 502, keeping the cause chained.
        raise HTTPException(
            status_code=502,
            detail=f"Failed to fetch models from Ollama ({base_url}): {exc}",
        ) from exc

    # A JSON array or scalar would otherwise fail on .get() and surface as a 500.
    if not isinstance(payload, dict):
        raise HTTPException(
            status_code=502,
            detail=f"Failed to fetch models from Ollama ({base_url}): unexpected response shape",
        )

    models = payload.get("models") or []
    if not isinstance(models, list):
        models = []
    # Skip entries that are not objects or carry no name.
    names = [str(item.get("name")) for item in models if isinstance(item, dict) and item.get("name")]
    return {"ollama_base_url": base_url, "models": names, "raw": payload}


@app.post("/rag/{bid}/documents")
def rag_ingest_documents(bid: str, payload: IngestDocumentsRequest, user: Dict[str, Any] = Depends(_auth_user)):
    """Ingest the posted documents for a business; an empty list is rejected with 400."""
    _auth_user_for_bid(bid, user)
    documents = payload.documents
    if not documents:
        raise HTTPException(status_code=400, detail="documents array is required")
    ingestion = rag_handler.ingest_documents(bid, documents)
    return {"status": "success", "ingestion": ingestion}


@app.post("/rag/{bid}/ingest-transcripts")
def rag_ingest_transcripts(bid: str, payload: IngestTranscriptsRequest, user: Dict[str, Any] = Depends(_auth_user)):
    """Trigger a transcript backfill for a business and return the handler's result unchanged."""
    _auth_user_for_bid(bid, user)
    backfill_options = {
        "bid": bid,
        "presales_only": payload.presales_only,
        "limit": int(payload.limit),
        "overwrite_existing": payload.overwrite_existing,
        "callids": payload.callids,
    }
    return rag_handler.backfill_transcripts(**backfill_options)


@app.post("/rag/{bid}/query")
def rag_query(bid: str, payload: QueryRequest, user: Dict[str, Any] = Depends(_auth_user)):
    """Answer a user message through the RAG handler using the selected agent.

    The agent's provider, model, runtime config and (length-capped)
    instruction are passed to the handler through ``metadata``.

    Raises:
        HTTPException: 400 when ``user_id`` or ``message`` is missing/blank;
            403 when the agent is locked or explicitly disabled.
    """
    _auth_user_for_bid(bid, user)
    if not payload.user_id:
        raise HTTPException(status_code=400, detail="user_id is required")
    if not payload.message or not payload.message.strip():
        raise HTTPException(status_code=400, detail="message is required")

    agent_type = normalize_agent(payload.agent_type)
    agent_cfg = _get_agent_config(bid, agent_type) or {}
    if agent_cfg.get("is_locked"):
        raise HTTPException(status_code=403, detail=f"{agent_type} is locked for this business")
    # The flag is written as an integer by the config upsert route, so an
    # identity test against False would let a stored 0 through. A missing
    # flag (None) still means "no explicit setting" and is allowed.
    enabled_flag = agent_cfg.get("is_enabled")
    if enabled_flag is not None and not enabled_flag:
        raise HTTPException(status_code=403, detail=f"{agent_type} is not enabled for this business")

    agent_instruction = _resolve_agent_instruction_for_bid(bid, agent_type)
    instruction_text = str(agent_instruction or "")

    metadata = dict(payload.metadata or {})
    metadata["agent_type"] = agent_type
    metadata["llm_provider"] = str(agent_cfg.get("provider") or "auto")
    metadata["llm_model_name"] = str(agent_cfg.get("model_name") or "")
    metadata["llm_runtime_config"] = agent_cfg.get("runtime_config") or {}
    # Keep embedding input compact (user message only) and pass agent instruction separately for generation.
    max_instruction_chars = int(CONFIG.get("RAG_AGENT_INSTRUCTION_MAX_CHARS", 12000))
    metadata["agent_instruction"] = instruction_text[:max_instruction_chars]
    metadata["agent_instruction_truncated"] = len(instruction_text) > max_instruction_chars

    result = rag_handler.query(
        bid=bid,
        user_id=payload.user_id,
        message=payload.message,
        query_embedding=payload.query_embedding,
        conversation_id=payload.conversation_id,
        top_k=payload.top_k,
        min_similarity=payload.min_similarity,
        metadata=metadata,
        profile_updates=payload.profile_updates,
    )
    result["agent_type"] = agent_type
    return result


@app.get("/rag/{bid}/conversations/{conversation_id}")
def rag_get_conversation(
    bid: str,
    conversation_id: str,
    limit: int = Query(default=200, ge=1, le=1000),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return up to ``limit`` messages of one conversation for a business."""
    _auth_user_for_bid(bid, user)
    history = rag_handler.get_conversation_messages(bid, conversation_id, limit=limit)
    return {"conversation_id": conversation_id, "messages": history}


@app.get("/rag/{bid}/conversations")
def rag_list_conversations(
    bid: str,
    user_id: Optional[str] = Query(default=None),
    limit: int = Query(default=50, ge=1, le=200),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """List conversations for a business, optionally narrowed to one ``user_id``."""
    _auth_user_for_bid(bid, user)
    listing = _list_conversations(bid=bid, user_id=user_id, limit=limit)
    return listing


@app.get("/rag/{bid}/profiles/{user_id}")
def rag_get_profile(bid: str, user_id: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Return the stored RAG profile of ``user_id`` within the business."""
    _auth_user_for_bid(bid, user)
    profile = rag_handler.get_user_profile(bid, user_id)
    return profile


@app.post("/rag/{bid}/score-call")
def rag_score_call(bid: str, payload: ScoreCallRequest, user: Dict[str, Any] = Depends(_auth_user)):
    """Produce a scorecard for one call, preferring the LLM and falling back otherwise.

    Returns:
        A dict with ``bid``, ``call_id``, ``agent_type``, ``scorecard`` and
        ``generated_at`` (naive ISO-8601 UTC text followed by ``Z``).

    Raises:
        HTTPException: 404 when the call has no transcript; 502 when
            ``force_llm`` is set and LLM scoring produced nothing.
    """
    # Local import: only ``datetime`` itself is imported at module level.
    from datetime import timezone

    _auth_user_for_bid(bid, user)
    context = _collect_call_context(bid, payload.call_id)
    transcript_obj = context.get("transcript") or {}
    if not transcript_obj.get("transcript"):
        raise HTTPException(status_code=404, detail=f"Transcript not found for call_id={payload.call_id}")

    # Normalise once so scoring and the response always report the same agent type.
    agent_type = normalize_agent(payload.agent_type)
    scored = _score_call_with_llm(context, bid, agent_type)
    if payload.force_llm and not scored:
        raise HTTPException(status_code=502, detail="LLM scoring failed")
    scorecard = scored or _score_call_fallback(context)

    # datetime.utcnow() is deprecated; dropping tzinfo keeps the original "...Z" text shape.
    generated_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    return {
        "bid": str(bid),
        "call_id": payload.call_id,
        "agent_type": agent_type,
        "scorecard": scorecard,
        "generated_at": generated_at,
    }


@app.get("/rag/{bid}/agent-report")
def rag_agent_report(
    bid: str,
    agent_id: str = Query(..., min_length=1),
    days: int = Query(default=30, ge=1, le=365),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return the report for ``agent_id`` covering the last ``days`` days."""
    _auth_user_for_bid(bid, user)
    report = _agent_report(bid, agent_id, days)
    return report


@app.get("/rag/{bid}/ingestion-progress")
def rag_ingestion_progress(
    bid: str,
    limit_runs: int = Query(default=10, ge=1, le=100),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return ingestion progress for a business, limited to ``limit_runs`` runs."""
    _auth_user_for_bid(bid, user)
    progress = _ingestion_progress(bid, limit_runs=limit_runs)
    return progress


@app.put("/rag/{bid}/agents/config/{agent_type}")
def rag_upsert_agent_config(
    bid: str,
    agent_type: str,
    payload: AgentConfigUpsertRequest,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Update one agent's configuration, keeping stored values for omitted fields.

    A payload field left as ``None`` falls back to the value currently stored
    for that agent.

    Returns:
        ``{"status": "ok", "agent": <config re-read after the update>}``.

    Raises:
        HTTPException: 404 when no configuration exists for the agent type.
    """
    _auth_user_for_bid(bid, user)
    agent_type = normalize_agent(agent_type)
    _ensure_default_agent_configs(bid)
    existing = _get_agent_config(bid, agent_type) or {}
    if not existing:
        raise HTTPException(status_code=404, detail=f"Unknown agent type: {agent_type}")

    # Merge: explicit payload values win, everything else keeps the stored value.
    display_name = existing.get("display_name") if payload.display_name is None else payload.display_name
    if payload.is_enabled is None:
        is_enabled = int(bool(existing.get("is_enabled")))
    else:
        is_enabled = int(payload.is_enabled)
    if payload.is_locked is None:
        is_locked = int(bool(existing.get("is_locked")))
    else:
        is_locked = int(payload.is_locked)
    system_prompt = existing.get("system_prompt") if payload.system_prompt is None else payload.system_prompt
    provider = existing.get("provider") if payload.provider is None else payload.provider
    model_name = existing.get("model_name") if payload.model_name is None else payload.model_name
    if payload.runtime_config is None:
        runtime_config = existing.get("runtime_config") or {}
    else:
        runtime_config = payload.runtime_config

    updated_by = str(user.get("username") or user.get("id") or "system")
    # NOTE(review): str() turns a None display_name/system_prompt into the text "None" — confirm acceptable.
    update_params = (
        str(display_name),
        int(is_enabled),
        int(is_locked),
        str(system_prompt),
        str(provider or "auto"),
        str(model_name or ""),
        json.dumps(runtime_config, ensure_ascii=True),
        updated_by,
        str(bid),
        str(agent_type),
    )

    # NOTE(review): no explicit commit here — presumably _db_conn() enables autocommit; confirm.
    conn = _db_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(
            """
            UPDATE rag_agent_configs
            SET display_name = %s,
                is_enabled = %s,
                is_locked = %s,
                system_prompt = %s,
                provider = %s,
                model_name = %s,
                runtime_config = %s,
                updated_by = %s
            WHERE bid = %s AND agent_type = %s
            """,
            update_params,
        )
    finally:
        conn.close()

    refreshed = _get_agent_config(bid, agent_type)
    return {"status": "ok", "agent": refreshed}


@app.post("/rag/{bid}/agents/{agent_type}/knowledge-pdf")
async def rag_upload_agent_knowledge_pdf(
    bid: str,
    agent_type: str,
    request: Request,
    filename: str = Query(..., min_length=1),
    append_to_existing_prompt: bool = Query(default=False),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """
    Upload a knowledge PDF containing prompt + knowledge bands for an agent.
    For current rollout, this endpoint is restricted to the pre-sales lead agent.

    The raw request body is the PDF (content-type ``application/pdf``). Each
    page's extracted text becomes one knowledge band; the non-empty bands are
    compiled into the agent's system prompt, stored in its runtime config and
    recorded as a new version in ``rag_agent_knowledge_uploads``.

    Args:
        bid: Business identifier.
        agent_type: Agent to configure (normalised before use).
        request: Incoming request carrying the PDF bytes.
        filename: Original file name; must end in ``.pdf``.
        append_to_existing_prompt: Append to the stored system prompt instead
            of rebuilding it from the agent's base instruction.
        user: Authenticated user.

    Returns:
        Status, version number, file statistics and the refreshed agent config.

    Raises:
        HTTPException: 400 for a wrong extension or content-type, an empty or
            oversized body, or a PDF without extractable text; 404 when the
            agent type has no configuration.
    """
    # Local import: only ``datetime`` itself is imported at module level.
    from datetime import timezone

    max_pdf_bytes = 50 * 1024 * 1024

    _auth_user_for_bid(bid, user)
    agent_type = normalize_agent(agent_type)
    _assert_raj_presales_agent(agent_type)
    if not filename or not str(filename).lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Only .pdf files are supported")

    _ensure_default_agent_configs(bid)
    existing = _get_agent_config(bid, agent_type) or {}
    if not existing:
        raise HTTPException(status_code=404, detail=f"Unknown agent type: {agent_type}")

    content_type = str(request.headers.get("content-type") or "").lower()
    if "application/pdf" not in content_type:
        raise HTTPException(status_code=400, detail="Request content-type must be application/pdf")

    # Reject an oversized upload from its declared length before buffering the
    # whole body in memory. A missing or malformed header falls through to the
    # post-read check below.
    declared_length = request.headers.get("content-length")
    if declared_length is not None:
        try:
            declared_bytes = int(declared_length)
        except ValueError:
            declared_bytes = None
        if declared_bytes is not None and declared_bytes > max_pdf_bytes:
            raise HTTPException(status_code=400, detail="PDF exceeds 50MB limit")

    file_bytes = await request.body()
    if not file_bytes:
        raise HTTPException(status_code=400, detail="Uploaded PDF is empty")
    if len(file_bytes) > max_pdf_bytes:
        raise HTTPException(status_code=400, detail="PDF exceeds 50MB limit")

    page_texts = _extract_pdf_pages(file_bytes)
    page_count = len(page_texts)
    if page_count < 1:
        raise HTTPException(status_code=400, detail="No pages were detected in the uploaded PDF")

    cleaned_pages = [str(text or "").strip() for text in page_texts]
    if not any(cleaned_pages):
        raise HTTPException(status_code=400, detail="Could not extract usable text from the PDF")

    # One band per page (1-based); empty pages stay in the band list but are
    # left out of the compiled prompt.
    knowledge_bands = [{"page": idx + 1, "text": text} for idx, text in enumerate(cleaned_pages)]
    compiled_knowledge = "\n\n".join(
        f"[Knowledge Band Page {item['page']}]\n{item['text']}" for item in knowledge_bands if item["text"]
    )
    if not compiled_knowledge:
        raise HTTPException(status_code=400, detail="No valid knowledge-band content found in the PDF")

    base_instruction = get_agent_instruction(agent_type)
    knowledge_instruction = (
        "Use the following uploaded prompt and knowledge bands as the primary rubric for evaluation.\n\n"
        f"{compiled_knowledge}"
    )
    if append_to_existing_prompt and existing.get("system_prompt"):
        system_prompt = f"{existing.get('system_prompt')}\n\n{knowledge_instruction}"
    else:
        system_prompt = f"{base_instruction}\n\n{knowledge_instruction}"

    # Resolve the acting user once; it is written to three places below.
    actor = str(user.get("username") or user.get("id") or "system")

    runtime_config = dict(existing.get("runtime_config") or {})
    runtime_config["knowledge_pdf_meta"] = {
        "filename": str(filename),
        "uploaded_by": actor,
        # datetime.utcnow() is deprecated; dropping tzinfo keeps the original "...Z" text shape.
        "uploaded_at": datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z",
        "page_count": page_count,
        "char_count": len(compiled_knowledge),
    }
    runtime_config["knowledge_bands"] = knowledge_bands
    runtime_config["knowledge_compiled_prompt"] = compiled_knowledge

    # NOTE(review): the version number comes from SELECT MAX(...) + 1 and the two
    # writes are not wrapped in an explicit transaction or commit — presumably
    # _db_conn() enables autocommit; confirm, and consider concurrent uploads.
    conn = _db_conn()
    version_no = 1
    try:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT COALESCE(MAX(version_no), 0) AS max_version
            FROM rag_agent_knowledge_uploads
            WHERE bid = %s AND agent_type = %s
            """,
            (str(bid), str(agent_type)),
        )
        row = cursor.fetchone() or {}
        version_no = int(row.get("max_version") or 0) + 1
        runtime_config["knowledge_pdf_meta"]["version_no"] = version_no
        cursor.execute(
            """
            UPDATE rag_agent_configs
            SET system_prompt = %s,
                runtime_config = %s,
                updated_by = %s
            WHERE bid = %s AND agent_type = %s
            """,
            (
                str(system_prompt),
                json.dumps(runtime_config, ensure_ascii=True),
                actor,
                str(bid),
                str(agent_type),
            ),
        )
        cursor.execute(
            """
            INSERT INTO rag_agent_knowledge_uploads (
                bid, agent_type, version_no, filename, page_count, char_count,
                knowledge_bands, compiled_prompt, uploaded_by
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            """,
            (
                str(bid),
                str(agent_type),
                int(version_no),
                str(filename),
                int(page_count),
                int(len(compiled_knowledge)),
                json.dumps(knowledge_bands, ensure_ascii=True),
                str(compiled_knowledge),
                actor,
            ),
        )
    finally:
        conn.close()

    return {
        "status": "ok",
        "bid": str(bid),
        "agent_type": str(agent_type),
        "version_no": int(version_no),
        "file": {
            "filename": str(filename),
            "page_count": page_count,
            "char_count": len(compiled_knowledge),
        },
        "agent": _get_agent_config(bid, agent_type),
    }


@app.get("/rag/{bid}/agents/{agent_type}/knowledge-pdf")
def rag_get_agent_knowledge_pdf(
    bid: str,
    agent_type: str,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """
    Preview the knowledge-PDF data stored in the agent's runtime config.

    When no upload metadata is stored, ``has_uploaded_knowledge`` is false and
    ``knowledge_pdf`` is ``None``. Otherwise the metadata, the knowledge bands
    and the first 4000 characters of the compiled prompt are returned.
    """
    _auth_user_for_bid(bid, user)
    agent_type = normalize_agent(agent_type)
    _assert_raj_presales_agent(agent_type)

    agent_config = _get_agent_config(bid, agent_type) or {}
    stored = agent_config.get("runtime_config") or {}
    pdf_meta = stored.get("knowledge_pdf_meta") or {}
    bands = stored.get("knowledge_bands") or []
    compiled_prompt = stored.get("knowledge_compiled_prompt") or ""

    preview: Dict[str, Any] = {
        "status": "ok",
        "bid": str(bid),
        "agent_type": str(agent_type),
        "has_uploaded_knowledge": False,
        "knowledge_pdf": None,
    }
    if pdf_meta:
        preview["has_uploaded_knowledge"] = True
        preview["knowledge_pdf"] = {
            "meta": pdf_meta,
            "knowledge_bands": bands,
            "compiled_prompt_preview": str(compiled_prompt)[:4000],
        }
    return preview


@app.get("/rag/{bid}/agents/{agent_type}/knowledge-pdf/history")
def rag_get_agent_knowledge_pdf_history(
    bid: str,
    agent_type: str,
    limit: int = Query(default=10, ge=1, le=50),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """
    Return the most recent knowledge-PDF upload versions for an agent.

    Rows come from ``rag_agent_knowledge_uploads``, newest version first,
    capped at ``limit``.
    """
    _auth_user_for_bid(bid, user)
    agent_type = normalize_agent(agent_type)
    _assert_raj_presales_agent(agent_type)

    query_params = (str(bid), str(agent_type), int(limit))
    conn = _db_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT version_no, filename, page_count, char_count, uploaded_by, created_at
            FROM rag_agent_knowledge_uploads
            WHERE bid = %s AND agent_type = %s
            ORDER BY version_no DESC
            LIMIT %s
            """,
            query_params,
        )
        version_rows = cursor.fetchall() or []
    finally:
        conn.close()

    history = {
        "status": "ok",
        "bid": str(bid),
        "agent_type": str(agent_type),
        "versions": _to_json_safe(version_rows),
    }
    return history


@app.post("/rag/{bid}/agents/analyze-call")
def rag_agent_analyze_call(
    bid: str,
    payload: AgentAnalyzeCallRequest,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return an agent's analysis of a call, reusing a stored result when available.

    Unless ``payload.force_refresh`` is set, a row already present in
    ``rag_agent_call_analysis`` for this business, call and agent is returned
    with status ``cached``. Otherwise a new analysis is run and returned with
    status ``generated``.

    Raises:
        HTTPException: 403 when the agent is locked or explicitly disabled.
    """
    _auth_user_for_bid(bid, user)
    agent_type = normalize_agent(payload.agent_type)
    agent_cfg = _get_agent_config(bid, agent_type) or {}
    if agent_cfg.get("is_locked"):
        raise HTTPException(status_code=403, detail=f"{agent_type} is locked for this business")
    # The flag is written as an integer by the config upsert route, so an
    # identity test against False would let a stored 0 through. A missing
    # flag (None) still means "no explicit setting" and is allowed.
    enabled_flag = agent_cfg.get("is_enabled")
    if enabled_flag is not None and not enabled_flag:
        raise HTTPException(status_code=403, detail=f"{agent_type} is not enabled for this business")

    if not payload.force_refresh:
        conn = _db_conn()
        try:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT bid, call_id, agent_type, model_name, summary, result_json, updated_at
                FROM rag_agent_call_analysis
                WHERE bid = %s AND call_id = %s AND agent_type = %s
                LIMIT 1
                """,
                (str(bid), str(payload.call_id), str(agent_type)),
            )
            cached = cursor.fetchone()
        finally:
            conn.close()
        if cached:
            if isinstance(cached.get("result_json"), str):
                try:
                    cached["result_json"] = json.loads(cached["result_json"])
                except Exception:  # noqa: BLE001
                    # Best effort: keep the raw string, but leave a trace of the bad row.
                    logger.warning(
                        "Stored result_json is not valid JSON (bid=%s call_id=%s agent_type=%s)",
                        bid,
                        payload.call_id,
                        agent_type,
                    )
            return {"status": "cached", "analysis": _to_json_safe(cached)}

    return {"status": "generated", "analysis": _run_agent_call_analysis(bid, payload.call_id, agent_type)}


@app.get("/rag/{bid}/agents/analysis/{call_id}")
def rag_get_agent_analysis(
    bid: str,
    call_id: str,
    agent_type: str = Query(default="lead_quality_agent"),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Fetch the stored analysis of a call for one agent type.

    ``result_json`` is decoded when it is stored as a JSON string; a value
    that fails to parse is returned unchanged.

    Raises:
        HTTPException: 404 when no analysis row exists.
    """
    _auth_user_for_bid(bid, user)
    agent_type = normalize_agent(agent_type)

    conn = _db_conn()
    try:
        cursor = conn.cursor()
        lookup_params = (str(bid), str(call_id), str(agent_type))
        cursor.execute(
            """
            SELECT bid, call_id, agent_type, model_name, summary, result_json, updated_at
            FROM rag_agent_call_analysis
            WHERE bid = %s AND call_id = %s AND agent_type = %s
            LIMIT 1
            """,
            lookup_params,
        )
        analysis_row = cursor.fetchone()
    finally:
        conn.close()

    if not analysis_row:
        raise HTTPException(status_code=404, detail="No analysis found")

    stored_result = analysis_row.get("result_json")
    if isinstance(stored_result, str):
        try:
            analysis_row["result_json"] = json.loads(stored_result)
        except Exception:
            # Deliberate best effort: leave the raw string in place.
            pass
    return {"analysis": _to_json_safe(analysis_row)}


@app.post("/auth/register")
async def auth_register(payload: Dict[str, Any]):
    """Register a user from the posted JSON body.

    ``bid``, ``username``, ``email`` and ``password`` are mandatory; the first
    missing one is reported with a 400. The handler's result is returned with
    the status code it supplies.
    """
    # NOTE(review): ``role`` is taken from the request body and this route has
    # no auth dependency — confirm that self-assigned roles are intended.
    first_missing = next(
        (name for name in ("bid", "username", "email", "password") if not payload.get(name)),
        None,
    )
    if first_missing is not None:
        raise HTTPException(status_code=400, detail=f"{first_missing} is required")

    body, status_code = auth_handler.create_user(
        bid=payload["bid"],
        username=payload["username"],
        email=payload["email"],
        password=payload["password"],
        full_name=payload.get("full_name"),
        role=payload.get("role", "user"),
    )
    return JSONResponse(status_code=status_code, content=body)


@app.post("/auth/login")
async def auth_login(payload: Dict[str, Any], request: Request):
    """Authenticate with username and password; both are required (400 otherwise).

    The client address (when available) and the ``User-Agent`` header are
    forwarded to the auth handler.
    """
    username = payload.get("username")
    password = payload.get("password")
    if not (username and password):
        raise HTTPException(status_code=400, detail="Username and password are required")

    client_ip = request.client.host if request.client else None
    body, status_code = auth_handler.login(
        payload["username"],
        payload["password"],
        ip_address=client_ip,
        user_agent=request.headers.get("User-Agent"),
    )
    return JSONResponse(status_code=status_code, content=body)


@app.post("/auth/logout")
def auth_logout(user: Dict[str, Any] = Depends(_auth_user), authorization: Optional[str] = Header(default=None)):
    """Log out the bearer token taken from the ``Authorization`` header."""
    bearer_token = _extract_bearer_token(authorization)
    body, status_code = auth_handler.logout(bearer_token)
    return JSONResponse(status_code=status_code, content=body)


@app.get("/auth/me")
def auth_me(user: Dict[str, Any] = Depends(_auth_user)):
    """Return the authenticated user as resolved by the auth dependency."""
    current_user = user
    return {"user": current_user}


@app.get("/auth/users/{bid}")
def auth_users_for_business(bid: str, user: Dict[str, Any] = Depends(_auth_user)):
    """List the users of a business, using the status code supplied by the auth handler."""
    _auth_user_for_bid(bid, user)
    business_users, status_code = auth_handler.get_users_by_business(bid)
    body = jsonable_encoder({"users": business_users})
    return JSONResponse(status_code=status_code, content=body)


@app.post("/business-admin/users/create")
async def business_admin_users_create(payload: Dict[str, Any], request: Request, user: Dict[str, Any] = Depends(_auth_user)):
    """
    Create a user and grant it access to the business named by ``bid``.

    The caller must pass ``user_has_business_admin`` for that ``bid``.
    ``username``, ``email`` and ``password`` are required. ``role`` is
    lower-cased and anything other than ``user``/``admin`` becomes ``user``.
    When creation returns 201, business access is assigned and the action is
    written to the activity log.
    """
    bid = payload.get("bid")
    if not bid:
        raise HTTPException(status_code=400, detail="bid is required")

    if not user_has_business_admin(user, str(bid)):
        raise HTTPException(status_code=403, detail="Business admin access required")

    first_missing = next((name for name in ("username", "email", "password") if not payload.get(name)), None)
    if first_missing is not None:
        raise HTTPException(status_code=400, detail=f"{first_missing} is required")

    # Normalise the requested role; unknown values silently fall back to "user".
    requested_role = str(payload.get("role", "user") or "user").strip().lower()
    business_role = requested_role if requested_role in ("user", "admin") else "user"

    result, status_code = auth_handler.create_user(
        username=payload["username"],
        email=payload["email"],
        password=payload["password"],
        full_name=payload.get("full_name") or payload.get("name"),
        role=business_role,
        is_master=False,
    )

    created = status_code == 201
    if created:
        # Link the new user to the business, then record who created it.
        auth_handler.assign_business_access(user_id=result["id"], bid=str(bid), role=business_role)
        client_ip = request.client.host if request.client else None
        auth_handler.log_activity(
            user_id=user.get("id"),
            username=user.get("username"),
            activity_type="create_user",
            description=f"Created new user: {payload['username']} for bid {bid}",
            ip_address=client_ip,
            user_agent=request.headers.get("User-Agent"),
        )

    return JSONResponse(status_code=status_code, content=result)


@app.get("/list-businesses")
def list_businesses():
    """Return every business known to the database handler."""
    businesses = db_handler.get_all_businesses()
    return businesses


@app.get("/businesses/{bid}/info")
def business_info(bid: str):
    """Return the stored info for one business, or 404 when nothing is found."""
    details = db_handler.get_business_info(bid)
    if details:
        return details
    raise HTTPException(status_code=404, detail="Business not found")


@app.get("/groupnames/{bid}")
def groupnames(
    bid: str,
    presales_only: bool = Query(default=False),
    lsq_row_count: int = Query(default=250),
    force_refresh: bool = Query(default=False),
):
    """List group names for a business.

    With ``presales_only`` the groups come from the LeadSquared pre-sales
    mapping; when that yields none, or without the flag, the full list from
    the database is returned.
    """
    if not presales_only:
        return db_handler.get_all_groupnames(bid)

    mapping = _get_presales_mapping_from_leadsquared(
        bid=bid,
        groupname=None,
        row_count=lsq_row_count,
        force_refresh=force_refresh,
    )
    presales_groups = mapping.get("groups", [])
    if presales_groups:
        return presales_groups
    return db_handler.get_all_groupnames(bid)


@app.get("/agentnames/{bid}")
def agentnames(
    bid: str,
    groupname: Optional[str] = Query(default=None),
    presales_only: bool = Query(default=False),
    lsq_row_count: int = Query(default=250),
    force_refresh: bool = Query(default=False),
):
    """List agent names for a business, optionally within one group.

    With ``presales_only`` the names come from the LeadSquared pre-sales
    mapping; when that yields none, or without the flag, the names from the
    database are returned.
    """
    if not presales_only:
        return db_handler.get_agent_names(bid, groupname)

    mapping = _get_presales_mapping_from_leadsquared(
        bid=bid,
        groupname=groupname,
        row_count=lsq_row_count,
        force_refresh=force_refresh,
    )
    presales_agents = []
    for entry in mapping.get("agents", []):
        if entry.get("agentname"):
            presales_agents.append(entry.get("agentname"))
    if presales_agents:
        return presales_agents
    return db_handler.get_agent_names(bid, groupname)


@app.get("/location-stats/{bid}")
def location_stats(bid: str, groupname: Optional[str] = Query(default=None)):
    """Return location statistics for a business, optionally filtered by group."""
    stats = db_handler.get_location_stats(bid, groupname)
    return stats


@app.get("/location-calls/{bid}")
def location_calls(
    bid: str,
    groupname: Optional[str] = Query(default=None),
    direction: Optional[str] = Query(default=None),
    call_status: Optional[str] = Query(default=None),
    limit: int = Query(default=100),
    offset: int = Query(default=0),
):
    """Return raw calls for a business filtered by group, direction and status, paged by limit/offset."""
    filter_args = (bid, groupname, direction, call_status, limit, offset)
    return db_handler.get_filtered_raw_calls(*filter_args)


@app.get("/raw-calls/{bid}/{callid}")
def raw_call_details(bid: str, callid: str):
    """Return the raw call record for ``callid``.

    Raises:
        HTTPException: 404 when the lookup returns nothing.
    """
    record = db_handler.get_raw_call_details(bid, callid)
    if record:
        return record
    raise HTTPException(status_code=404, detail="Call not found")


@app.get("/analytics/{bid}/pending")
def pending_analytics(bid: str, limit: int = Query(default=10)):
    """Return up to ``limit`` calls awaiting analysis together with their count."""
    pending = db_handler.get_calls_for_analysis(bid, limit)
    response = {"count": len(pending), "calls": pending}
    return response


@app.get("/analytics/{bid}/dashboard")
def analytics_dashboard(bid: str, groupname: Optional[str] = Query(default=None)):
    """Return every dashboard section for a business in a single payload.

    The handler first probes for the ``{bid}_raw_calls`` table. If the probe
    finds no such table, an empty dashboard skeleton is returned. If the probe
    itself fails, the failure is logged and the regular section queries run
    anyway (the probe is best-effort).

    Args:
        bid: Business identifier taken from the URL path.
        groupname: Optional group used to narrow every section.
    """
    try:
        with db_handler.get_connection() as conn:
            with conn.cursor() as cur:
                # ``bid`` is untrusted URL input: bind it as a query parameter
                # instead of interpolating it into the SQL text.
                cur.execute("SHOW TABLES LIKE %s", (f"{bid}_raw_calls",))
                if not cur.fetchone():
                    return {"overview": {}, "sentiment_by_location": [], "quality_by_location": [], "quality_by_agent": [], "call_purposes": [], "concerns_frequency": [], "busy_locations": []}
    except Exception as exc:
        # Keep the original best-effort behaviour, but leave a trace of the failure.
        logger.warning("Raw-calls table probe failed for %s: %s", bid, exc)
    return {
        "overview": db_handler.get_analytics_overview(bid, groupname),
        "sentiment_by_location": db_handler.get_sentiment_by_location(bid, groupname),
        "quality_by_location": db_handler.get_quality_by_location(bid, groupname),
        "quality_by_agent": db_handler.get_quality_by_agent(bid, groupname),
        "call_purposes": db_handler.get_call_purpose_frequency(bid, groupname),
        "concerns_frequency": db_handler.get_concerns_frequency(bid, groupname),
        "busy_locations": db_handler.get_busy_locations(bid, groupname),
    }


@app.get("/analytics/{bid}/calls-by-objection")
def analytics_calls_by_objection(
    bid: str,
    objection: str = Query(...),
    groupname: Optional[str] = Query(default=None),
):
    """Return the calls matching the required ``objection`` query value."""
    matching_calls = db_handler.get_calls_by_objection(bid, objection, groupname)
    return matching_calls


@app.get("/calls/{bid}")
def calls_list(
    bid: str,
    status: Optional[int] = Query(default=None),
    sales_intent: Optional[str] = Query(default=None),
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
    limit: int = Query(default=100),
    offset: int = Query(default=0),
):
    """Return one page of calls plus the total count for the same filters.

    ``status`` is applied whenever it is not ``None`` (so ``0`` counts); the
    text filters are applied only when non-empty.
    """
    filters: Dict[str, Any] = {}
    if status is not None:
        filters["status"] = status
    text_filters = {"sales_intent": sales_intent, "date_from": date_from, "date_to": date_to}
    filters.update({name: value for name, value in text_filters.items() if value})

    page = db_handler.get_calls(bid, filters, limit, offset)
    matching_total = db_handler.get_calls_count(bid, filters)
    return {"calls": page, "total": matching_total, "limit": limit, "offset": offset}


# NOTE: this literal route must be registered before "/calls/{bid}/{callid}";
# otherwise "recent" is captured as a callid and this handler is unreachable.
@app.get("/calls/{bid}/recent")
def recent_calls(bid: str, limit: int = Query(default=10)):
    """Return up to ``limit`` recent calls via ``db_handler.get_recent_calls``."""
    return db_handler.get_recent_calls(bid, limit)


@app.get("/calls/{bid}/{callid}")
def call_details(bid: str, callid: str):
    """Return one call enriched with its transcript, analytics and BANT data.

    Each enrichment is optional: its keys are only added when the matching
    lookup returns data.

    Raises:
        HTTPException: 404 when the call itself is not found.
    """
    call = db_handler.get_call_by_id(bid, callid)
    if not call:
        raise HTTPException(status_code=404, detail="Call not found")

    transcript_data = db_handler.get_call_transcript(bid, callid)
    if transcript_data:
        call["transcripts"] = transcript_data.get("transcript", "")
        call["language"] = transcript_data.get("language", "")
        speaker_segments = transcript_data.get("speaker_segments")
        # Segments stored as JSON text are decoded; undecodable text becomes [].
        if speaker_segments and isinstance(speaker_segments, str):
            try:
                speaker_segments = json.loads(speaker_segments)
            except json.JSONDecodeError:
                speaker_segments = []
        call["speaker_segments"] = speaker_segments
        call["num_speakers"] = transcript_data.get("num_speakers")
        call["duration"] = transcript_data.get("duration")

    analytics_data = db_handler.get_call_analytics(bid, callid)
    if analytics_data:
        for key in ("summary", "call_purpose", "objections_concerns", "objection_type"):
            call[key] = analytics_data.get(key, "")
        call["quality_score"] = analytics_data.get("quality_score")
        # The response key is plural while the analytics key is singular.
        call["sentiments"] = analytics_data.get("sentiment", "")
        for key in (
            "parameter_scores",
            "talk_listen_ratio",
            "agent_speak_percentage",
            "customer_speak_percentage",
            "dead_air_percentage",
        ):
            call[key] = analytics_data.get(key)

    bant_data = db_handler.get_bant_analysis(bid, callid)
    if bant_data:
        call["bant_profile"] = bant_data.get("profile")
        call["bant_summary"] = bant_data.get("summary")
    return call


@app.get("/bant/{bid}/{callid}")
def bant_details(bid: str, callid: str):
    """Return the BANT analysis of a call.

    Raises:
        HTTPException: 404 when no BANT analysis is found.
    """
    bant = db_handler.get_bant_analysis(bid, callid)
    if not bant:
        raise HTTPException(status_code=404, detail="BANT not found")
    return bant


@app.delete("/calls/{bid}/{callid}/transcript")
def delete_call_transcript(bid: str, callid: str):
    """Delete a call's transcript, then reset its transcription status."""
    db_handler.delete_transcript(bid, callid)
    db_handler.reset_transcription_status(bid, callid)
    return {"message": "Transcript deleted successfully", "callid": callid}


@app.patch("/calls/{bid}/{callid}/segment/{segment_index}")
def patch_call_segment(bid: str, callid: str, segment_index: int, payload: Dict[str, Any]):
    """Replace the text of one speaker segment.

    Args:
        payload: JSON body; its ``text`` value becomes the new segment text.

    Raises:
        HTTPException: 400 when ``text`` is missing, null or blank.
    """
    raw_text = payload.get("text")
    # A JSON null must count as empty; str(None) would store the text "None".
    new_text = "" if raw_text is None else str(raw_text).strip()
    if not new_text:
        raise HTTPException(status_code=400, detail="Text cannot be empty")
    db_handler.update_speaker_segment_text(bid, callid, segment_index, new_text)
    return {"message": "Segment updated successfully", "callid": callid, "segment_index": segment_index}


@app.post("/calls/search")
def calls_search(payload: Dict[str, Any]):
    """Search the calls of a business.

    Args:
        payload: JSON body with ``bid`` (required), ``query`` (default ``""``)
            and ``limit`` (default 50).

    Raises:
        HTTPException: 400 when ``bid`` is missing or ``limit`` is not an integer.
    """
    bid = payload.get("bid")
    if not bid:
        raise HTTPException(status_code=400, detail="Business ID is required")
    try:
        limit = int(payload.get("limit", 50))
    except (TypeError, ValueError):
        # A malformed limit is a client error, not an internal server error.
        raise HTTPException(status_code=400, detail="limit must be an integer")
    query = payload.get("query", "")
    return db_handler.search_calls(bid, query, limit)


@app.get("/leads/{bid}")
def leads_list(
    bid: str,
    groupname: Optional[str] = Query(default=None),
    limit: int = Query(default=100),
    offset: int = Query(default=0),
    transcripts_only: bool = Query(default=False),
    direction: Optional[str] = Query(default=None),
):
    """Return one page of leads with the total count, echoing ``limit`` and ``offset``."""
    page = db_handler.get_leads_list(
        bid=bid,
        groupname=groupname,
        limit=limit,
        offset=offset,
        transcripts_only=transcripts_only,
        direction=direction,
    )
    leads = page.get("leads", [])
    total = int(page.get("total", 0))
    return {"leads": leads, "total": total, "limit": limit, "offset": offset}


# ── Lead Insights — must be registered BEFORE the catch-all {lead_phone:path} GET ──

@app.get("/leads/{bid}/{lead_phone:path}/insights")
def get_lead_insights(bid: str, lead_phone: str):
    """Return the stored insights of a lead, or 404 when none have been generated yet."""
    phone = unquote(lead_phone)
    insights = db_handler.get_lead_insights(bid, phone)
    if insights:
        return insights
    raise HTTPException(status_code=404, detail="Insights not yet generated. POST to /insights/generate to create them.")


@app.post("/leads/{bid}/{lead_phone:path}/insights/generate")
def generate_lead_insights_endpoint(bid: str, lead_phone: str):
    """Generate lead insights for a phone number and return them.

    Raises:
        HTTPException: 400 when generation raises ``ValueError``; 500 for any
            other failure (logged with traceback).
    """
    from lead_insights import generate_lead_insights

    decoded_phone = unquote(lead_phone)
    try:
        insights = generate_lead_insights(db_handler, bid, decoded_phone)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    except Exception as exc:
        logger.error("Lead insights generation failed for %s/%s: %s", bid, decoded_phone, exc, exc_info=True)
        raise HTTPException(status_code=500, detail=f"Generation failed: {exc}")
    return {"success": True, "insights": insights}


@app.post("/leads/{bid}/{lead_phone:path}/rich-summary/generate")
def generate_call_rich_summary(bid: str, lead_phone: str, body: Dict[str, Any] = Body(...)):
    """Generate and save the rich summary of one call.

    The body must carry ``callid``; ``lead_phone`` is part of the URL but is
    not used by the handler.

    Raises:
        HTTPException: 400 when ``callid`` is missing or generation raises
            ``ValueError``; 500 for any other failure (logged with traceback).
    """
    from lead_insights import generate_and_save_rich_summary

    callid = body.get("callid")
    if not callid:
        raise HTTPException(status_code=400, detail="callid is required")
    try:
        rich_summary = generate_and_save_rich_summary(db_handler, bid, callid)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    except Exception as exc:
        logger.error("Rich summary generation failed for %s/%s: %s", bid, callid, exc, exc_info=True)
        raise HTTPException(status_code=500, detail=f"Generation failed: {exc}")
    return {"success": True, "rich_summary": rich_summary}


@app.put("/leads/{bid}/{lead_phone:path}/profile")
def update_lead_profile_alias(
    bid: str,
    lead_phone: str,
    body: Dict[str, Any] = Body(...),
):
    """Merge manual profile edits into the cached LeadSquared lead for a phone.

    Non-empty body values are layered over the cached ``lead_payload``. For the
    named columns (name, owner, email, status, next task date) the body value
    wins and the cached value is the fallback. When no cached lead exists, a
    SHA-256 of ``"<phone>|manual"`` is used as the external lead id.

    Registered before the catch-all ``GET /leads/{bid}/{lead_phone:path}``.
    """
    decoded_phone = unquote(lead_phone)
    cached = db_handler.get_cached_crm_lead_by_phone(bid=bid, provider="leadsquared", phone=decoded_phone) or {}

    existing_payload = cached.get("lead_payload") or {}
    if isinstance(existing_payload, str):
        try:
            existing_payload = json.loads(existing_payload)
        except (TypeError, ValueError):
            existing_payload = {}
    # JSON text may decode to a list, scalar or null; only a dict can be merged.
    if not isinstance(existing_payload, dict):
        existing_payload = {}

    updates = {key: value for key, value in body.items() if value is not None and value != ""}
    merged_payload = {**existing_payload, **updates}

    eid = cached.get("external_lead_id") or hashlib.sha256(f"{decoded_phone}|manual".encode()).hexdigest()
    db_handler.upsert_crm_lead_cache(
        bid=bid,
        provider="leadsquared",
        external_lead_id=eid,
        lead_name=body.get("contact_name") or cached.get("lead_name"),
        owner_name=body.get("owner_name") or cached.get("owner_name"),
        email=body.get("email") or cached.get("email"),
        phone_primary=decoded_phone,
        lead_status=body.get("lead_status") or cached.get("lead_status"),
        next_task_due_date=body.get("next_task_due_date") or cached.get("next_task_due_date"),
        lead_payload=merged_payload,
    )
    return {"success": True}


# ── Lead Notes (must be before catch-all GET) ─────────────────────────────────

@app.get("/leads/{bid}/{lead_phone:path}/notes")
def get_lead_notes(bid: str, lead_phone: str):
    """Return the notes stored for a lead, wrapped under ``notes``."""
    phone = unquote(lead_phone)
    return {"notes": db_handler.get_lead_notes(bid, phone)}


@app.post("/leads/{bid}/{lead_phone:path}/notes")
def add_lead_note(bid: str, lead_phone: str, body: Dict[str, Any] = Body(...)):
    """Store a note for a lead and return its id.

    Raises:
        HTTPException: 400 when ``content`` is missing or blank.
    """
    phone = unquote(lead_phone)
    note_text = (body.get("content") or "").strip()
    if not note_text:
        raise HTTPException(status_code=400, detail="content is required")
    author = body.get("created_by")
    new_id = db_handler.add_lead_note(bid, phone, note_text, author)
    return {"id": new_id, "message": "Note saved"}


@app.delete("/leads/{bid}/notes/{note_id}")
def delete_lead_note(bid: str, note_id: int):
    """Delete one lead note; respond 404 when the handler reports nothing deleted."""
    if db_handler.delete_lead_note(note_id, bid):
        return {"message": "Note deleted"}
    raise HTTPException(status_code=404, detail="Note not found")


# ── Lead Tasks (must be before catch-all GET) ─────────────────────────────────

@app.get("/leads/{bid}/{lead_phone:path}/tasks")
def get_lead_tasks(bid: str, lead_phone: str):
    """Return the tasks stored for a lead, wrapped under ``tasks``."""
    phone = unquote(lead_phone)
    return {"tasks": db_handler.get_lead_tasks(bid, phone)}


@app.post("/leads/{bid}/{lead_phone:path}/tasks")
def add_lead_task(bid: str, lead_phone: str, body: Dict[str, Any] = Body(...)):
    """Create a task for a lead and return its id.

    Raises:
        HTTPException: 400 when ``title`` is missing or blank.
    """
    phone = unquote(lead_phone)
    task_title = (body.get("title") or "").strip()
    if not task_title:
        raise HTTPException(status_code=400, detail="title is required")
    # Falsy due dates (missing, empty string) are normalised to None.
    due = body.get("due_date") or None
    author = body.get("created_by")
    new_id = db_handler.add_lead_task(bid, phone, task_title, due, author)
    return {"id": new_id, "message": "Task created"}


@app.patch("/leads/{bid}/tasks/{task_id}")
def update_lead_task(bid: str, task_id: int, body: Dict[str, Any] = Body(...)):
    """Set the ``done`` flag of a lead task from the truthiness of ``body["done"]``.

    Raises:
        HTTPException: 404 when the handler reports no update.
    """
    is_done = bool(body.get("done", False))
    if not db_handler.update_lead_task(task_id, bid, is_done):
        raise HTTPException(status_code=404, detail="Task not found")
    return {"message": "Task updated"}


@app.delete("/leads/{bid}/tasks/{task_id}")
def delete_lead_task(bid: str, task_id: int):
    """Delete one lead task; respond 404 when the handler reports nothing deleted."""
    if db_handler.delete_lead_task(task_id, bid):
        return {"message": "Task deleted"}
    raise HTTPException(status_code=404, detail="Task not found")


@app.get("/leads/{bid}/{lead_phone:path}")
def lead_detail(
    bid: str,
    lead_phone: str,
    groupname: Optional[str] = Query(default=None),
    include_crm: bool = Query(default=True),
):
    """Return lead details enriched with LeadSquared CRM data and a customer profile.

    CRM enrichment runs only when ``include_crm`` is set and active LeadSquared
    credentials exist for the business. A cached CRM lead is used when its
    payload matches the phone; otherwise a live lookup is attempted and a hit
    is written back to the cache. Live-lookup failures are logged and reported
    to the client as "no record found" rather than failing the request.

    Raises:
        HTTPException: 404 when the lead itself is not found.
    """
    decoded_phone = unquote(lead_phone)
    details = db_handler.get_lead_details(bid=bid, lead_phone=decoded_phone, groupname=groupname)
    if not details:
        raise HTTPException(status_code=404, detail="Lead not found")
    crm = {"connected": False, "matched": False, "provider": "leadsquared", "lead": None}
    if include_crm:
        creds = db_handler.get_crm_credentials(bid, "leadsquared")
        if creds and creds.get("access_key") and creds.get("secret_key") and creds.get("is_active"):
            crm["connected"] = True
            cached = db_handler.get_cached_crm_lead_by_phone(bid=bid, provider="leadsquared", phone=decoded_phone)
            # Discard a cache row whose payload does not match this phone.
            if cached and not _record_matches_phone(
                record=cached.get("lead_payload"),
                phone=decoded_phone,
                fallback_phone=None,
            ):
                cached = None
            if cached:
                crm["matched"] = True
                crm["lead"] = {
                    "raw": cached.get("lead_payload") or {},
                    "name": cached.get("lead_name"),
                    "email": cached.get("email"),
                    "phone": cached.get("phone_primary"),
                    "status": cached.get("lead_status"),
                    "owner_name": cached.get("owner_name"),
                    "next_task_due_date": cached.get("next_task_due_date"),
                }
            else:
                # No cache hit — try a live LSQ lookup and cache the result
                try:
                    lsq = LeadSquaredService(
                        access_key=creds["access_key"],
                        secret_key=creds["secret_key"],
                        api_host=creds.get("api_host") or "https://api-in21.leadsquared.com/v2/",
                    )
                    live = lsq.search_lead_by_phone(decoded_phone)
                    if live.get("success") and live.get("data"):
                        lead_rec = live["data"][0]
                        first = (lead_rec.get("FirstName") or "").strip()
                        last = (lead_rec.get("LastName") or "").strip()
                        name = (first + " " + last).strip() or lead_rec.get("ProspectName") or None
                        phone_val = lead_rec.get("Phone") or lead_rec.get("Mobile") or decoded_phone
                        # Without a ProspectID, derive a stable id from the phone.
                        eid = lead_rec.get("ProspectID") or hashlib.sha256(decoded_phone.encode()).hexdigest()
                        db_handler.upsert_crm_lead_cache(
                            bid=bid, provider="leadsquared", external_lead_id=eid,
                            lead_name=name,
                            owner_name=lead_rec.get("OwnerName"),
                            email=lead_rec.get("EmailAddress"),
                            phone_primary=phone_val,
                            lead_status=lead_rec.get("LeadStatus"),
                            lead_payload=lead_rec,
                        )
                        crm["matched"] = True
                        crm["lead"] = {
                            "raw": lead_rec,
                            "name": name,
                            "email": lead_rec.get("EmailAddress"),
                            "phone": phone_val,
                            "status": lead_rec.get("LeadStatus"),
                            "owner_name": lead_rec.get("OwnerName"),
                            "next_task_due_date": lead_rec.get("NextTaskDueDate"),
                        }
                    else:
                        crm["message"] = "No CRM record found for this lead."
                except Exception as exc:
                    # Best-effort enrichment: keep serving the lead, but record
                    # why the CRM lookup failed instead of hiding it.
                    logger.warning(
                        "Live LeadSquared lookup failed for %s/%s: %s",
                        bid, decoded_phone, exc, exc_info=True,
                    )
                    crm["message"] = "No CRM record found for this lead."
    details["crm"] = crm
    details["customer_profile"] = _build_customer_profile(details, crm)
    return details


@app.get("/analytics/{bid}/stats")
def analytics_stats(bid: str, date_from: Optional[str] = Query(default=None), date_to: Optional[str] = Query(default=None)):
    """Return call statistics for ``bid`` over the optional date range."""
    statistics = analytics_service.get_call_statistics(bid, date_from, date_to)
    return statistics


@app.get("/analytics/{bid}/sentiment")
def analytics_sentiment(bid: str, date_from: Optional[str] = Query(default=None), date_to: Optional[str] = Query(default=None)):
    """Return the sentiment distribution for ``bid`` over the optional date range."""
    distribution = analytics_service.get_sentiment_distribution(bid, date_from, date_to)
    return distribution


@app.get("/analytics/{bid}/intent")
def analytics_intent(bid: str, date_from: Optional[str] = Query(default=None), date_to: Optional[str] = Query(default=None)):
    """Return the intent distribution for ``bid`` over the optional date range."""
    distribution = analytics_service.get_intent_distribution(bid, date_from, date_to)
    return distribution


@app.get("/analytics/{bid}/trends")
def analytics_trends(bid: str, period: str = Query(default="day"), days: int = Query(default=7)):
    """Return trend data for ``bid`` using the given ``period`` and ``days`` values."""
    trends = analytics_service.get_trends(bid, period, days)
    return trends


@app.get("/analytics/{bid}/agents")
def analytics_agents(bid: str, date_from: Optional[str] = Query(default=None), date_to: Optional[str] = Query(default=None)):
    """Return agent performance data for ``bid`` over the optional date range."""
    performance = analytics_service.get_agent_performance(bid, date_from, date_to)
    return performance


@app.get("/analytics/{bid}/leaderboard")
def analytics_leaderboard(
    bid: str,
    groupname: Optional[str] = Query(default=None),
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
):
    """Return the agent leaderboard and the number of entries it contains."""
    rows = db_handler.get_agent_leaderboard(bid, groupname, date_from, date_to)
    agent_count = len(rows)
    return {"leaderboard": rows, "total_agents": agent_count}


@app.get("/analytics/{bid}/keywords")
def analytics_keywords(
    bid: str,
    limit: int = Query(default=20),
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
):
    """Return up to ``limit`` top keywords for ``bid`` over the optional date range."""
    keywords = analytics_service.get_top_keywords(bid, limit, date_from, date_to)
    return keywords


@app.get("/analytics/{bid}/{callid}")
def call_analytics(bid: str, callid: str):
    """Return the stored analytics of one call, or 404 when there are none."""
    record = db_handler.get_call_analytics(bid, callid)
    if record:
        return record
    raise HTTPException(status_code=404, detail="Analytics not found for this call")


@app.post("/queue-calls/{bid}")
def queue_calls(bid: str):
    """Report how many calls with status ``0`` exist (at most 1000 are fetched)."""
    pending = db_handler.get_calls(bid, {"status": 0}, limit=1000)
    pending_count = len(pending)
    return {
        "message": f"Queued {pending_count} calls for processing",
        "count": pending_count,
        "business_id": bid,
    }


@app.post("/process-calls/{bid}")
def process_calls(bid: str):
    """Acknowledge a processing request with HTTP 202; no work is started here."""
    acknowledgement = {"message": "Processing started", "business_id": bid}
    return JSONResponse(status_code=202, content=acknowledgement)


@app.get("/transcripts/{bid}")
def get_transcripts(bid: str):
    """Return the result of ``db_handler.get_transcripts`` for ``bid``."""
    transcripts = db_handler.get_transcripts(bid)
    return transcripts


@app.get("/export/{bid}/calls")
def export_calls(
    bid: str,
    format: str = Query(default="json"),
    status: Optional[int] = Query(default=None),
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
):
    """Export up to 10000 calls as JSON (default) or as a CSV attachment.

    Only filters that are not ``None`` are passed on. For ``format=csv`` the
    column names come from the first row; an empty result yields an empty body.
    """
    requested = {"status": status, "date_from": date_from, "date_to": date_to}
    filters = {}
    for name, value in requested.items():
        if value is not None:
            filters[name] = value

    calls = db_handler.get_calls(bid, filters, limit=10000)
    if format != "csv":
        return calls

    buffer = StringIO()
    if calls:
        csv_writer = csv.DictWriter(buffer, fieldnames=calls[0].keys())
        csv_writer.writeheader()
        csv_writer.writerows(calls)
    headers = {"Content-Disposition": f"attachment; filename=calls_{bid}_{datetime.now().strftime('%Y%m%d')}.csv"}
    return PlainTextResponse(content=buffer.getvalue(), media_type="text/csv", headers=headers)


@app.post("/webhook/call-update")
def webhook_call_update(payload: Dict[str, Any]):
    """Apply a call update from a webhook, then refresh that call's RAG entry.

    The RAG backfill is best-effort: its failure is logged as a warning and
    the update is still reported as successful.

    Raises:
        HTTPException: 400 when ``bid`` or ``callid`` is missing; 500 when the
            database update reports failure.
    """
    bid = payload.get("bid")
    callid = payload.get("callid")
    if not (bid and callid):
        raise HTTPException(status_code=400, detail="bid and callid are required")

    updated = db_handler.update_call(bid, callid, payload.get("data", {}))
    if not updated:
        raise HTTPException(status_code=500, detail="Failed to update call")

    try:
        rag_handler.backfill_transcripts(bid=bid, presales_only=False, limit=1, overwrite_existing=True, callids=[callid])
    except Exception as rag_exc:
        logger.warning("RAG auto-ingest skipped for %s/%s: %s", bid, callid, rag_exc)
    return {"message": "Call updated successfully"}


@app.post("/webhook/conversation-summary")
def webhook_conversation_summary(payload: Dict[str, Any]):
    """Save the ``transfer_reason`` from a webhook as a call's conversation summary.

    Raises:
        HTTPException: 400 when ``business_id`` or ``callid`` is missing; 500
            when saving reports failure.
    """
    bid = payload.get("business_id")
    callid = payload.get("callid")
    if not (bid and callid):
        raise HTTPException(status_code=400, detail="business_id and callid are required")

    transfer_reason = payload.get("transfer_reason")
    if db_handler.save_conversation_summary(bid, callid, transfer_reason):
        return {"message": "Summary saved successfully"}
    raise HTTPException(status_code=500, detail="Failed to save summary")


@app.get("/quality-parameters/{bid}")
def quality_parameters_list(bid: str):
    """Return the quality parameters of ``bid``."""
    parameters = quality_params_handler.get_parameters(bid)
    return parameters


@app.post("/quality-parameters/{bid}")
def quality_parameters_save(bid: str, payload: Dict[str, Any]):
    """Save one quality parameter (an empty dict when the payload is falsy) and return its id."""
    parameter_data = payload or {}
    saved_id = quality_params_handler.save_parameter(bid, parameter_data)
    return {"message": "Parameter saved successfully", "parameter_id": saved_id}


@app.get("/data-capture-fields/{bid}")
def data_capture_fields_get(bid: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Return the data-capture field definitions of ``bid`` under ``fields``.

    Runs the ``_auth_user_for_bid`` check for the authenticated user first.
    """
    _auth_user_for_bid(bid, user)
    return {"fields": db_handler.get_data_capture_fields(bid)}


@app.put("/data-capture-fields/{bid}")
def data_capture_fields_put(
    bid: str,
    body: Dict[str, Any] = Body(...),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Replace the data-capture fields of ``bid`` and return the stored result.

    Runs the ``_auth_user_for_bid`` and ``_require_business_admin_or_master``
    checks first. A missing or falsy ``fields`` value replaces the set with an
    empty list.
    """
    _auth_user_for_bid(bid, user)
    _require_business_admin_or_master(bid, user)
    if isinstance(body, dict):
        new_fields = body.get("fields")
    else:
        new_fields = []
    db_handler.replace_data_capture_fields(bid, new_fields or [])
    stored_fields = db_handler.get_data_capture_fields(bid)
    return {"success": True, "fields": stored_fields}


@app.post("/quality-parameters/{bid}/upload")
async def quality_parameters_upload(bid: str, file: UploadFile = File(...)):
    """Import quality parameters from an uploaded file.

    The import result is merged into the response next to a success message.

    Raises:
        HTTPException: 400 when no file is given or the import raises
            ``ValueError``; 500 for any other failure (logged with traceback).
    """
    if not file:
        raise HTTPException(status_code=400, detail="No file provided")
    try:
        raw_bytes = await file.read()
        upload_name = file.filename or ""
        import_summary = quality_params_handler.import_parameters_from_file(bid, raw_bytes, upload_name)
        return {"message": "Quality parameters uploaded successfully", **import_summary}
    except ValueError as bad_input:
        raise HTTPException(status_code=400, detail=str(bad_input))
    except Exception as failure:
        logger.exception("Quality parameters upload failed: %s", failure)
        raise HTTPException(status_code=500, detail="Failed to upload quality parameters")


# The ``:int`` convertor limits this route to numeric ids. Without it the route
# also captures the literal GET paths "/quality-parameters/{bid}/groups" and
# "/quality-parameters/{bid}/total-score", which are registered later in this
# file, and answers them with a 422 validation error.
@app.get("/quality-parameters/{bid}/{param_id:int}")
def quality_parameter_get(bid: str, param_id: int):
    """Return one quality parameter by its numeric id.

    Raises:
        HTTPException: 404 when the parameter is not found.
    """
    parameter = quality_params_handler.get_parameter_by_id(bid, param_id)
    if not parameter:
        raise HTTPException(status_code=404, detail="Parameter not found")
    return parameter


@app.delete("/quality-parameters/{bid}/{param_id}")
def quality_parameter_delete(bid: str, param_id: int):
    """Delete one quality parameter; respond 404 when the handler reports failure."""
    if quality_params_handler.delete_parameter(bid, param_id):
        return {"message": "Parameter deleted successfully"}
    raise HTTPException(status_code=404, detail="Parameter not found or could not be deleted")


@app.get("/quality-parameters/{bid}/groups")
def quality_groups(bid: str):
    """Return the quality parameter groups of ``bid``."""
    groups = quality_params_handler.get_parameter_groups(bid)
    return groups


@app.get("/quality-parameters/{bid}/total-score")
def quality_total_score(bid: str):
    """Return the total possible quality score of ``bid`` under ``total_score``."""
    total = quality_params_handler.calculate_total_possible_score(bid)
    return {"total_score": total}


@app.get("/objection-classifications/{bid}")
def objection_classifications(bid: str, business_type: Optional[str] = Query(default=None), is_active: Optional[str] = Query(default=None)):
    """List objection classifications, optionally filtered by type and active flag.

    ``is_active`` is compared case-insensitively with ``"true"``; omitting it
    passes ``None`` to the handler.
    """
    if is_active is None:
        active = None
    else:
        active = str(is_active).lower() == "true"
    return objection_handler.get_all_classifications(bid, business_type, active)


# The ``:int`` convertor limits this route to numeric ids. Without it the route
# also captures the literal GET paths "/objection-classifications/{bid}/search"
# and ".../statistics", which are registered later in this file, and answers
# them with a 422 validation error.
@app.get("/objection-classifications/{bid}/{classification_id:int}")
def objection_get_by_id(bid: str, classification_id: int):
    """Return one objection classification by its numeric id.

    Raises:
        HTTPException: 404 when the classification is not found.
    """
    classification = objection_handler.get_classification_by_id(bid, classification_id)
    if not classification:
        raise HTTPException(status_code=404, detail="Classification not found")
    return classification


@app.post("/objection-classifications/{bid}")
def objection_create(bid: str, payload: Dict[str, Any]):
    """Create an objection classification and answer 201 with its id.

    ``created_by`` defaults to ``"admin"`` when absent from the payload.

    Raises:
        HTTPException: 400 when ``category_name`` is missing or empty.
    """
    if not (payload and payload.get("category_name")):
        raise HTTPException(status_code=400, detail="category_name is required")
    author = payload.get("created_by", "admin")
    new_id = objection_handler.create_classification(bid, payload, author)
    response_body = {"id": new_id, "message": "Classification created successfully"}
    return JSONResponse(status_code=201, content=response_body)


@app.put("/objection-classifications/{bid}/{classification_id}")
def objection_update(bid: str, classification_id: int, payload: Dict[str, Any]):
    """Update an objection classification.

    ``updated_by`` defaults to ``"admin"`` when absent from the payload.

    Raises:
        HTTPException: 404 when the handler reports no update.
    """
    changes = payload or {}
    editor = changes.get("updated_by", "admin")
    if objection_handler.update_classification(bid, classification_id, changes, editor):
        return {"message": "Classification updated successfully"}
    raise HTTPException(status_code=404, detail="Classification not found or no changes made")


@app.delete("/objection-classifications/{bid}/{classification_id}")
def objection_delete(bid: str, classification_id: int):
    """Delete an objection classification; respond 404 when the handler reports failure."""
    if objection_handler.delete_classification(bid, classification_id):
        return {"message": "Classification deleted successfully"}
    raise HTTPException(status_code=404, detail="Classification not found")


@app.post("/objection-classifications/{bid}/{classification_id}/toggle")
def objection_toggle(bid: str, classification_id: int):
    """Toggle the active status of a classification; respond 404 when the handler reports failure."""
    if objection_handler.toggle_active_status(bid, classification_id):
        return {"message": "Classification status toggled successfully"}
    raise HTTPException(status_code=404, detail="Classification not found")


@app.get("/objection-classifications/{bid}/search")
def objection_search(bid: str, q: str = Query(...)):
    """Search the objection classifications of ``bid`` with the required ``q`` value."""
    matches = objection_handler.search_classifications(bid, q)
    return matches


@app.get("/objection-classifications/{bid}/by-severity/{severity}")
def objection_by_severity(bid: str, severity: str):
    """Return classifications of one severity level.

    Raises:
        HTTPException: 400 unless ``severity`` is one of ``low``, ``medium``,
            ``high`` or ``critical``.
    """
    allowed_levels = ("low", "medium", "high", "critical")
    if severity in allowed_levels:
        return objection_handler.get_classifications_by_severity(bid, severity)
    raise HTTPException(status_code=400, detail="Invalid severity level")


@app.post("/objection-classifications/{bid}/classify")
def objection_classify(bid: str, payload: Dict[str, Any]):
    """Classify the ``objection_text`` from the payload.

    Raises:
        HTTPException: 400 when ``objection_text`` is missing or empty.
    """
    request_data = payload or {}
    objection_text = request_data.get("objection_text")
    if objection_text:
        return objection_handler.classify_objection(bid, objection_text)
    raise HTTPException(status_code=400, detail="objection_text is required")


@app.get("/objection-classifications/{bid}/statistics")
def objection_statistics(bid: str):
    """Return the objection classification statistics of ``bid``."""
    statistics = objection_handler.get_statistics(bid)
    return statistics


@app.get("/crm/{bid}/leadsquared/presales-mapping")
def crm_presales_mapping(
    bid: str,
    groupname: Optional[str] = Query(default=None),
    lsq_row_count: int = Query(default=250),
    force_refresh: bool = Query(default=False),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return the LeadSquared presales mapping of ``bid``.

    Runs the ``_auth_user_for_bid`` check first. The status is 200 when the
    mapping reports ``success`` and 400 otherwise.
    """
    _auth_user_for_bid(bid, user)
    mapping = _get_presales_mapping_from_leadsquared(
        bid,
        groupname=groupname,
        row_count=lsq_row_count,
        force_refresh=force_refresh,
    )
    status_code = 200 if mapping.get("success") else 400
    return JSONResponse(status_code=status_code, content=mapping)


@app.get("/crm/{bid}/leadsquared/integration")
def crm_lsq_get_integration(bid: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Return the LeadSquared integration of ``bid``, or a 404 JSON body when absent.

    Runs the ``_auth_user_for_bid`` check first.
    """
    _auth_user_for_bid(bid, user)
    integration = db_handler.get_crm_integration(bid, "leadsquared")
    if integration:
        return {"success": True, "data": integration}
    not_found = {"success": False, "message": "LeadSquared integration not found", "data": None}
    return JSONResponse(status_code=404, content=not_found)


@app.post("/crm/{bid}/leadsquared/integration")
def crm_lsq_save_integration(bid: str, payload: Dict[str, Any], user: Dict[str, Any] = Depends(_auth_user)):
    """Create or update the LeadSquared integration of ``bid``.

    Runs the ``_auth_user_for_bid`` and ``_require_business_admin_or_master``
    checks first. ``lsq_api_host`` falls back to the default host when missing
    or blank; ``is_active`` defaults to ``True``.

    Raises:
        HTTPException: 400 when ``lsq_access_key`` or ``lsq_secret_key`` is
            missing, null or blank.
    """
    _auth_user_for_bid(bid, user)
    _require_business_admin_or_master(bid, user)

    def _text(key: str) -> str:
        # A JSON null must count as missing; str(None) would yield "None",
        # which would pass validation and be stored as a credential or host.
        value = payload.get(key)
        return "" if value is None else str(value).strip()

    access_key = _text("lsq_access_key")
    secret_key = _text("lsq_secret_key")
    api_host = _text("lsq_api_host") or "https://api-in21.leadsquared.com/v2/"
    is_active = bool(payload.get("is_active", True))
    if not access_key or not secret_key:
        raise HTTPException(status_code=400, detail="lsq_access_key and lsq_secret_key are required")
    db_handler.upsert_crm_integration(
        bid=bid,
        provider="leadsquared",
        access_key=access_key,
        secret_key=secret_key,
        api_host=api_host,
        is_active=is_active,
        config=payload.get("config") or {},
    )
    return {"success": True, "message": "LeadSquared integration saved successfully"}


@app.post("/crm/{bid}/leadsquared/integration/test")
def crm_lsq_test_integration(bid: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Test the stored LeadSquared credentials of ``bid``.

    Runs the ``_auth_user_for_bid`` check first. Answers 404 when credentials
    are not configured, 400 with the service result when the test fails, and
    marks the integration as tested on success.
    """
    _auth_user_for_bid(bid, user)
    creds = db_handler.get_crm_credentials(bid, "leadsquared")
    if not (creds and creds.get("access_key") and creds.get("secret_key")):
        not_configured = {"success": False, "message": "LeadSquared integration not configured"}
        return JSONResponse(status_code=404, content=not_configured)

    lsq_service = LeadSquaredService(
        access_key=creds["access_key"],
        secret_key=creds["secret_key"],
        api_host=creds.get("api_host"),
    )
    outcome = lsq_service.test_connection()
    if not outcome.get("success"):
        return JSONResponse(status_code=400, content=outcome)
    db_handler.mark_crm_integration_tested(bid, "leadsquared")
    return outcome


@app.delete("/crm/{bid}/leadsquared/integration")
def crm_lsq_delete_integration(bid: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Delete the LeadSquared integration of ``bid``.

    Runs the ``_auth_user_for_bid`` and ``_require_business_admin_or_master``
    checks first. Always reports success; the message says whether an
    integration was actually removed.
    """
    _auth_user_for_bid(bid, user)
    _require_business_admin_or_master(bid, user)
    if db_handler.delete_crm_integration(bid, "leadsquared"):
        message = "LeadSquared integration deleted successfully"
    else:
        message = "LeadSquared integration did not exist"
    return {"success": True, "message": message}


@app.post("/crm/{bid}/leadsquared/leads/search")
def crm_lsq_search_leads(bid: str, payload: Dict[str, Any], user: Dict[str, Any] = Depends(_auth_user)):
    """Search LeadSquared leads with the payload as criteria (empty dict when falsy).

    Runs the ``_auth_user_for_bid`` check first.
    """
    _auth_user_for_bid(bid, user)
    lsq_service = _get_lsq_service_for_bid_or_error(bid)
    criteria = payload or {}
    return lsq_service.search_leads(criteria)


@app.get("/crm/{bid}/leadsquared/leads/{lead_id}")
def crm_lsq_get_lead(bid: str, lead_id: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Fetch one LeadSquared lead; the status is 200 on ``success`` and 400 otherwise.

    Runs the ``_auth_user_for_bid`` check first.
    """
    _auth_user_for_bid(bid, user)
    lsq_service = _get_lsq_service_for_bid_or_error(bid)
    outcome = lsq_service.get_lead(lead_id)
    status_code = 200 if outcome.get("success") else 400
    return JSONResponse(status_code=status_code, content=outcome)


@app.post("/crm/{bid}/leadsquared/leads")
def crm_lsq_create_lead(bid: str, payload: Dict[str, Any], user: Dict[str, Any] = Depends(_auth_user)):
    """Create a lead in LeadSquared.

    Rejected with 403 when CRM writes are disabled; otherwise HTTP 200 when the
    service reports success and 400 when it does not.
    """
    _auth_user_for_bid(bid, user)
    writes_enabled = _crm_write_allowed()
    if not writes_enabled:
        raise HTTPException(status_code=403, detail="CRM write operations are disabled")
    lsq = _get_lsq_service_for_bid_or_error(bid)
    outcome = lsq.create_lead(payload or {})
    http_status = 200 if outcome.get("success") else 400
    return JSONResponse(status_code=http_status, content=outcome)


@app.put("/crm/{bid}/leadsquared/leads/{lead_id}")
def crm_lsq_update_lead(bid: str, lead_id: str, payload: Dict[str, Any], user: Dict[str, Any] = Depends(_auth_user)):
    """Update a LeadSquared lead.

    Rejected with 403 when CRM writes are disabled; otherwise HTTP 200 when the
    service reports success and 400 when it does not.
    """
    _auth_user_for_bid(bid, user)
    writes_enabled = _crm_write_allowed()
    if not writes_enabled:
        raise HTTPException(status_code=403, detail="CRM write operations are disabled")
    lsq = _get_lsq_service_for_bid_or_error(bid)
    outcome = lsq.update_lead(lead_id, payload or {})
    http_status = 200 if outcome.get("success") else 400
    return JSONResponse(status_code=http_status, content=outcome)


@app.delete("/crm/{bid}/leadsquared/leads/{lead_id}")
def crm_lsq_delete_lead(bid: str, lead_id: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Delete a LeadSquared lead.

    Rejected with 403 when CRM writes are disabled; otherwise HTTP 200 when the
    service reports success and 400 when it does not.
    """
    _auth_user_for_bid(bid, user)
    writes_enabled = _crm_write_allowed()
    if not writes_enabled:
        raise HTTPException(status_code=403, detail="CRM write operations are disabled")
    lsq = _get_lsq_service_for_bid_or_error(bid)
    outcome = lsq.delete_lead(lead_id)
    http_status = 200 if outcome.get("success") else 400
    return JSONResponse(status_code=http_status, content=outcome)


@app.get("/audio/proxy/{bid}/{callid}")
def audio_proxy(bid: str, callid: str):
    """Stream a call recording from its stored ``fileUrl`` back to the client.

    Looks the call up via ``db_handler.get_call_by_id`` and relays the upstream
    body in 8 KiB chunks, reusing the upstream ``Content-Type`` (default
    ``audio/mpeg``).

    Raises:
        HTTPException: 404 when the call has no ``fileUrl``; 502 when the
            upstream request fails or does not answer with HTTP 200.
    """
    # NOTE(review): unlike most routes in this file this one declares no auth
    # dependency — confirm that is intentional (e.g. for <audio> tags).
    call = db_handler.get_call_by_id(bid, callid)
    file_url = (call or {}).get("fileUrl")
    if not file_url:
        raise HTTPException(status_code=404, detail="Audio file not found")

    try:
        upstream = requests.get(file_url, stream=True, timeout=60)
    except requests.RequestException as exc:
        # Connection errors / timeouts are an upstream failure, not a server bug.
        logger.warning("Audio proxy fetch failed for bid=%s callid=%s: %s", bid, callid, exc)
        raise HTTPException(status_code=502, detail="Failed to fetch audio file") from exc

    if upstream.status_code != 200:
        # With stream=True the connection stays checked out until closed.
        upstream.close()
        raise HTTPException(status_code=502, detail="Failed to fetch audio file")

    def _iter():
        try:
            for chunk in upstream.iter_content(chunk_size=8192):
                if chunk:
                    yield chunk
        finally:
            # Runs on normal completion and when the client disconnects early.
            upstream.close()

    return StreamingResponse(
        _iter(),
        media_type=upstream.headers.get("Content-Type", "audio/mpeg"),
        headers={"Accept-Ranges": "bytes", "Cache-Control": "public, max-age=3600"},
    )


@app.get("/admin/users")
def admin_users(user: Dict[str, Any] = Depends(_auth_user)):
    """List all users (master admin only), using the status code reported by the auth handler."""
    is_master = user.get("is_master")
    if not is_master:
        raise HTTPException(status_code=403, detail="Master admin access required")
    users, status_code = auth_handler.get_all_users()
    body = jsonable_encoder({"users": users})
    return JSONResponse(status_code=status_code, content=body)


@app.post("/admin/users/create")
async def admin_users_create(payload: Dict[str, Any], request: Request, user: Dict[str, Any] = Depends(_auth_user)):
    """Create a user (master admin only) and optionally grant business access.

    ``username``, ``email`` and ``password`` are required. On a 201 from the
    auth handler, each entry of ``businesses`` (either a bid or a dict with
    ``bid``/``role``) is assigned to the new user and the action is logged.
    The auth handler's result and status code are returned unchanged.
    """
    if not user.get("is_master"):
        raise HTTPException(status_code=403, detail="Master admin access required")

    # Report the first missing required field, in declaration order.
    missing = next((name for name in ("username", "email", "password") if not payload.get(name)), None)
    if missing is not None:
        raise HTTPException(status_code=400, detail=f"{missing} is required")

    result, status_code = auth_handler.create_user(
        username=payload["username"],
        email=payload["email"],
        password=payload["password"],
        full_name=payload.get("full_name"),
        role=payload.get("role", "user"),
        is_master=payload.get("is_master", False),
    )
    if status_code != 201:
        return JSONResponse(status_code=status_code, content=result)

    new_user_id = result["id"]
    fallback_role = payload.get("role", "user")
    for entry in payload.get("businesses") or []:
        if isinstance(entry, dict):
            entry_bid = entry.get("bid")
            entry_role = entry.get("role", fallback_role)
        else:
            entry_bid = entry
            entry_role = fallback_role
        auth_handler.assign_business_access(new_user_id, entry_bid, entry_role)

    client = request.client
    auth_handler.log_activity(
        user_id=user.get("id"),
        username=user.get("username"),
        activity_type="create_user",
        description=f"Created new user: {payload['username']}",
        ip_address=client.host if client else None,
        user_agent=request.headers.get("User-Agent"),
    )
    return JSONResponse(status_code=status_code, content=result)


@app.get("/admin/businesses")
def admin_businesses():
    """List all businesses, using the status code reported by the auth handler."""
    # NOTE(review): this route declares no auth dependency, unlike the other
    # /admin routes nearby — confirm that public access is intended.
    businesses, status_code = auth_handler.get_all_businesses()
    body = jsonable_encoder({"businesses": businesses})
    return JSONResponse(status_code=status_code, content=body)


@app.post("/admin/businesses/create")
async def admin_businesses_create(request: Request, user: Dict[str, Any] = Depends(_auth_user)):
    """Create a business (master admin only) from a JSON body with ``bid`` and ``name``.

    The action is logged when the auth handler answers 201; its result and
    status code are returned unchanged.
    """
    if not user.get("is_master"):
        raise HTTPException(status_code=403, detail="Master admin access required")

    data = await request.json()
    new_bid = data.get("bid")
    new_name = data.get("name")
    if not (new_bid and new_name):
        raise HTTPException(status_code=400, detail="bid and name are required")

    result, status_code = auth_handler.create_business(
        bid=new_bid,
        name=new_name,
        description=data.get("description"),
    )
    if status_code == 201:
        client = request.client
        auth_handler.log_activity(
            user_id=user.get("id"),
            username=user.get("username"),
            activity_type="create_business",
            description=f"Created new business: {new_name} (ID: {new_bid})",
            ip_address=client.host if client else None,
            user_agent=request.headers.get("User-Agent"),
        )
    return JSONResponse(status_code=status_code, content=result)


@app.post("/admin/users/{user_id}/businesses")
def admin_assign_business(user_id: int, payload: Dict[str, Any], user: Dict[str, Any] = Depends(_auth_user)):
    """Grant a user access to a business (master admin only).

    ``bid`` is required in the body; ``role`` defaults to ``"user"``.
    """
    if not user.get("is_master"):
        raise HTTPException(status_code=403, detail="Master admin access required")
    target_bid = payload.get("bid")
    if not target_bid:
        raise HTTPException(status_code=400, detail="bid is required")
    result, status_code = auth_handler.assign_business_access(
        user_id=user_id,
        bid=target_bid,
        role=payload.get("role", "user"),
    )
    return JSONResponse(status_code=status_code, content=result)


@app.post("/admin/onboard-business")
async def admin_onboard_business(request: Request, user: Dict[str, Any] = Depends(_auth_user)):
    """One-shot business onboarding: create business + login credentials + 4 tables.

    Master-admin only. The JSON body must carry ``name``; ``bid``,
    ``description`` and ``password`` are optional. Without ``bid`` the next
    numeric BID after the highest existing one is used. Without ``password`` a
    random one is generated (instead of a fixed, guessable default); either way
    it is returned once under ``credentials`` so the caller can hand it over.

    Returns:
        201 JSON with ``bid``, ``name``, ``description``, ``tables_created``
        and ``credentials`` (``username``, ``password``, ``email``).
        ``tables_created`` lists the bant table only if creating it succeeded.

    Raises:
        HTTPException: 403 for non-master callers; 400 for a body that is not a
            JSON object or lacks ``name``; otherwise the status reported by the
            auth handler when the business or the user cannot be created.
    """
    import secrets  # local import: only this handler needs it

    if not user.get("is_master"):
        raise HTTPException(status_code=403, detail="Master admin access required")
    try:
        data = await request.json()
    except ValueError as exc:  # JSONDecodeError / UnicodeDecodeError
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from exc
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    name = str(data.get("name") or "").strip()
    if not name:
        raise HTTPException(status_code=400, detail="name is required")

    # Auto-generate BID if not provided: max existing numeric BID + 1
    bid = str(data.get("bid") or "").strip()
    if not bid:
        try:
            with db_handler.get_connection() as conn:
                with conn.cursor() as cur:
                    cur.execute("SELECT bid FROM businesses")
                    existing = [r["bid"] for r in cur.fetchall()]
            numeric = [int(b) for b in existing if str(b).isdigit()]
            bid = str(max(numeric) + 1) if numeric else "1000"
        except Exception as exc:
            # Best-effort fallback: a random 4-digit BID (2000-9999). It may
            # collide; create_business below is then expected to reject it.
            logger.warning("Could not derive next BID, falling back to a random one: %s", exc)
            bid = str(2000 + secrets.randbelow(8000))

    # Create business (also creates raw_calls, sarvamresponse, callanalytics tables)
    biz_result, biz_status = auth_handler.create_business(bid=bid, name=name, description=data.get("description"))
    if biz_status not in (200, 201):
        raise HTTPException(status_code=biz_status, detail=biz_result.get("error", "Failed to create business"))

    tables_created = [f"{bid}_raw_calls", f"{bid}_sarvamresponse", f"{bid}_callanalytics"]

    # Create 4th table: {bid}_bant (best effort; onboarding continues without it)
    try:
        db_handler.ensure_bant_table(bid)
        tables_created.append(f"{bid}_bant")
    except Exception as e:
        logger.warning(f"Could not create bant table for {bid}: {e}")

    # Derive username: sanitize business name to alphanumeric+underscore
    base_username = re.sub(r"[^a-z0-9]", "_", name.lower())
    base_username = re.sub(r"_+", "_", base_username).strip("_") or bid
    username = base_username
    # Ensure username uniqueness by appending _1, _2, ... until it is free
    suffix = 1
    while True:
        try:
            with db_handler.get_connection() as conn:
                with conn.cursor() as cur:
                    cur.execute("SELECT id FROM business_users WHERE username = %s", (username,))
                    taken = bool(cur.fetchone())
        except Exception as exc:
            # Cannot verify; proceed and let create_user report a duplicate.
            logger.warning("Could not verify username uniqueness for %s: %s", username, exc)
            break
        if not taken:
            break
        username = f"{base_username}_{suffix}"
        suffix += 1

    supplied_password = data.get("password")
    if isinstance(supplied_password, str) and supplied_password:
        password = supplied_password
    else:
        # Never fall back to a shared, guessable password for an admin login.
        password = secrets.token_urlsafe(12)
    email = f"{username}@pcaa.local"

    user_result, user_status = auth_handler.create_user(
        username=username,
        email=email,
        password=password,
        full_name=name,
        role="admin",
        is_master=False,
    )
    if user_status not in (200, 201):
        raise HTTPException(status_code=user_status, detail=user_result.get("error", "Failed to create user"))

    new_user_id = user_result["id"]
    _, access_status = auth_handler.assign_business_access(user_id=new_user_id, bid=bid, role="admin")
    if access_status not in (200, 201):
        logger.warning("Could not grant user %s admin access to BID %s (status %s)", username, bid, access_status)

    auth_handler.log_activity(
        user_id=user.get("id"), username=user.get("username"),
        activity_type="onboard_business",
        description=f"Onboarded business: {name} (BID: {bid}), user: {username}",
        ip_address=request.client.host if request.client else None,
        user_agent=request.headers.get("User-Agent"),
    )

    return JSONResponse(status_code=201, content={
        "bid": bid,
        "name": name,
        "description": data.get("description"),
        "tables_created": tables_created,
        "credentials": {
            "username": username,
            "password": password,
            "email": email,
        },
    })


@app.post("/admin/businesses/{bid}/orchestration/start")
async def start_business_orchestration(bid: str, request: Request, user: Dict[str, Any] = Depends(_auth_user)):
    """Generate and start a per-BID orchestration loop as a background process.

    Master-admin only. Writes ``orchestrator_loop_{bid}.sh`` next to this file
    and launches it detached with ``bash``. The optional JSON body may carry
    ``transcribe_limit`` (default 20), ``analyze_limit`` (default 20),
    ``max_per_day`` (default 0, flag omitted) and ``skip_ingest`` (default
    false). An empty body means "all defaults".

    Returns:
        ``{"started": True, "pid", "bid", "script"}`` or, when a loop for this
        BID is already running, ``{"started": False, "reason", "pid"}``.

    Raises:
        HTTPException: 403 for non-master callers; 400 for an unsafe ``bid``,
            a body that is not a JSON object, or non-integer limits.
    """
    if not user.get("is_master"):
        raise HTTPException(status_code=403, detail="Master admin access required")

    # bid is interpolated into a file name and into the generated bash script,
    # so anything outside a conservative character set is refused up front.
    if not re.fullmatch(r"[A-Za-z0-9_-]+", bid):
        raise HTTPException(status_code=400, detail="Invalid bid")

    raw_body = await request.body()
    try:
        data = json.loads(raw_body) if raw_body.strip() else {}
    except ValueError as exc:  # JSONDecodeError / UnicodeDecodeError
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from exc
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    try:
        transcribe_limit = int(data.get("transcribe_limit", 20))
        analyze_limit = int(data.get("analyze_limit", 20))
        max_per_day = int(data.get("max_per_day", 0))
    except (TypeError, ValueError) as exc:
        raise HTTPException(
            status_code=400,
            detail="transcribe_limit, analyze_limit and max_per_day must be integers",
        ) from exc
    skip_ingest = bool(data.get("skip_ingest", False))

    base_dir = os.path.dirname(os.path.abspath(__file__))
    script_path = os.path.join(base_dir, f"orchestrator_loop_{bid}.sh")
    lockfile = f"/tmp/orchestrator_loop_{bid}.lock"

    # Check if already running
    already_running, existing_pid, _ = _check_process(f"orchestrator_loop_{bid}.sh")
    if already_running:
        return JSONResponse(content={"started": False, "reason": "already_running", "pid": existing_pid})

    skip_flag = "--skip-ingest" if skip_ingest else ""
    max_per_day_flag = f"--max-per-day {max_per_day}" if max_per_day > 0 else ""

    # Every interpolated value below is either validated (bid), an int, or a
    # fixed flag string, so the script cannot be altered through the request.
    script_content = f"""#!/bin/bash
# Auto-generated orchestration loop for BID {bid}
cd {base_dir}

LOCKFILE="{lockfile}"
if [ -f "$LOCKFILE" ]; then
    PID=$(cat "$LOCKFILE")
    if ps -p $PID > /dev/null 2>&1; then
        echo "Orchestrator for BID {bid} already running (PID: $PID)."
        exit 1
    fi
fi
echo $$ > "$LOCKFILE"
trap "rm -f $LOCKFILE" EXIT

echo "Starting orchestration loop for BID {bid}."
while true; do
    echo "--- [$(date)] BID {bid} orchestration run ---"
    python3 orchestrate_pipeline.py \\
        --bid {bid} \\
        --ingest-limit 0 \\
        --transcribe-limit {transcribe_limit} \\
        --analyze-limit {analyze_limit} \\
        {max_per_day_flag} \\
        {skip_flag}
    echo "--- [$(date)] BID {bid} sleeping 5 min ---"
    sleep 300
done
"""
    with open(script_path, "w", encoding="utf-8") as f:
        f.write(script_content)
    import stat as _stat
    os.chmod(script_path, os.stat(script_path).st_mode | _stat.S_IEXEC | _stat.S_IXGRP | _stat.S_IXOTH)

    # start_new_session detaches the loop from this server process.
    proc = _subprocess.Popen(
        ["bash", script_path],
        cwd=base_dir,
        stdout=_subprocess.DEVNULL,
        stderr=_subprocess.DEVNULL,
        start_new_session=True,
    )

    auth_handler.log_activity(
        user_id=user.get("id"), username=user.get("username"),
        activity_type="start_orchestration",
        description=f"Started orchestration loop for BID {bid} (PID {proc.pid}, transcribe={transcribe_limit}, analyze={analyze_limit}, max_per_day={max_per_day})",
        ip_address=request.client.host if request.client else None,
        user_agent=request.headers.get("User-Agent"),
    )

    return JSONResponse(content={"started": True, "pid": proc.pid, "bid": bid, "script": script_path})


@app.get("/admin/businesses/{bid}/orchestration/status")
def get_business_orchestration_status(bid: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Check if the orchestration loop for a specific BID is running.

    Reports the running flag and PID from ``_check_process`` plus whether the
    generated loop script exists next to this file.
    """
    script_name = f"orchestrator_loop_{bid}.sh"
    running, pid, _ = _check_process(script_name)
    here = os.path.dirname(os.path.abspath(__file__))
    script_exists = os.path.exists(os.path.join(here, script_name))
    return {
        "bid": bid,
        "running": running,
        "pid": pid,
        "script_exists": script_exists,
    }


@app.post("/admin/businesses/{bid}/orchestration/stop")
def stop_business_orchestration(bid: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Stop the orchestration loop for a specific BID.

    Master-admin only. Sends SIGTERM to the PID reported by ``_check_process``
    and logs the action. Returns ``stopped: False`` with a reason when nothing
    is running or the process has already gone; other failures become a 500.
    """
    import signal

    if not user.get("is_master"):
        raise HTTPException(status_code=403, detail="Master admin access required")

    running, pid, _ = _check_process(f"orchestrator_loop_{bid}.sh")
    if not (running and pid):
        return JSONResponse(content={"stopped": False, "reason": "not_running"})

    try:
        os.kill(pid, signal.SIGTERM)
        auth_handler.log_activity(
            user_id=user.get("id"),
            username=user.get("username"),
            activity_type="stop_orchestration",
            description=f"Stopped orchestration loop for BID {bid} (PID {pid})",
            ip_address=None,
            user_agent=None,
        )
        response = JSONResponse(content={"stopped": True, "pid": pid})
    except ProcessLookupError:
        response = JSONResponse(content={"stopped": False, "reason": "process_not_found"})
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return response


@app.get("/admin/activity-log")
def admin_activity_log(
    limit: int = Query(default=100),
    user_id: Optional[int] = Query(default=None),
    activity_type: Optional[str] = Query(default=None),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return activity-log entries (master admin only), optionally filtered by user and type."""
    is_master = user.get("is_master")
    if not is_master:
        raise HTTPException(status_code=403, detail="Master admin access required")
    filters = {"limit": limit, "user_id": user_id, "activity_type": activity_type}
    logs, status_code = auth_handler.get_activity_log(**filters)
    return JSONResponse(status_code=status_code, content={"logs": logs})


@app.post("/api/embed/token")
def embed_token(payload: Dict[str, Any], request: Request):
    """Exchange a partner API key for an embed token and embed URL.

    Requires ``api_key`` and ``bid`` in the body (400 otherwise). The key must
    validate for that business (401 otherwise). When the key has an
    ``allowed_origins`` list and the request carries an ``Origin`` header, the
    origin must be in that comma-separated list (403 otherwise).
    """
    body = payload or {}
    api_key = body.get("api_key")
    bid = body.get("bid")
    if not (api_key and bid):
        raise HTTPException(status_code=400, detail="api_key and bid are required")

    bid_text = str(bid)
    key_record = auth_handler.validate_api_key(api_key, bid_text)
    if not key_record:
        raise HTTPException(status_code=401, detail="Invalid API key or unauthorized for this business")

    origin = request.headers.get("Origin")
    if key_record.get("allowed_origins") and origin:
        permitted = {entry.strip() for entry in str(key_record["allowed_origins"]).split(",")}
        if origin not in permitted:
            raise HTTPException(status_code=403, detail="Origin not allowed")

    partner_name = key_record["partner_name"]
    token = auth_handler.generate_embed_token(bid=bid_text, partner_name=partner_name, api_key_id=key_record["id"])
    embed_base_url = CONFIG.get("EMBED_BASE_URL", "http://localhost:6174")
    return {
        "token": token,
        "embed_url": f"{embed_base_url}/#/embed?token={token}",
        "expires_in": 3600,
        "bid": bid_text,
        "partner_name": partner_name,
    }


@app.get("/api/embed/validate")
def embed_validate(token: str = Query(...)):
    """Validate an embed token and return its business id and partner name (401 if invalid)."""
    claims = auth_handler.validate_embed_token(token)
    if claims:
        return {"valid": True, "bid": claims["bid"], "partner_name": claims["partner_name"]}
    raise HTTPException(status_code=401, detail="Invalid or expired embed token")


@app.post("/admin/embed-keys")
def admin_embed_keys_create(payload: Dict[str, Any], request: Request, user: Dict[str, Any] = Depends(_auth_user)):
    """Create an embed API key for a business/partner (master admin only).

    ``bid`` and ``partner_name`` are required; ``allowed_origins`` and
    ``expires_at`` are passed through when present. The auth handler's result
    and status code are returned unchanged.

    Raises:
        HTTPException: 403 for non-master callers, 400 when a required field
            is missing.
    """
    if not user.get("is_master"):
        raise HTTPException(status_code=403, detail="Master admin access required")
    bid = payload.get("bid")
    partner_name = payload.get("partner_name")
    if not bid or not partner_name:
        raise HTTPException(status_code=400, detail="bid and partner_name are required")
    result, status_code = auth_handler.create_embed_api_key(
        bid=str(bid),
        partner_name=partner_name,
        allowed_origins=payload.get("allowed_origins"),
        expires_at=payload.get("expires_at"),
    )
    # Only record "Created ..." in the audit trail when creation actually
    # succeeded, matching the other create routes in this file.
    if 200 <= status_code < 300:
        auth_handler.log_activity(
            user_id=user["id"],
            username=user["username"],
            activity_type="create_embed_key",
            description=f"Created embed API key for business {bid} (partner: {partner_name})",
            ip_address=request.client.host if request.client else None,
            user_agent=request.headers.get("User-Agent"),
        )
    return JSONResponse(status_code=status_code, content=result)


@app.get("/admin/embed-keys")
def admin_embed_keys_list(bid: Optional[str] = Query(default=None), user: Dict[str, Any] = Depends(_auth_user)):
    """List embed API keys (master admin only), optionally restricted to one business."""
    is_master = user.get("is_master")
    if not is_master:
        raise HTTPException(status_code=403, detail="Master admin access required")
    keys, status_code = auth_handler.list_embed_api_keys(bid=bid)
    body = {"keys": keys}
    return JSONResponse(status_code=status_code, content=body)


@app.delete("/admin/embed-keys/{key_id}")
def admin_embed_keys_revoke(key_id: int, request: Request, user: Dict[str, Any] = Depends(_auth_user)):
    """Revoke an embed API key (master admin only).

    The auth handler's result and status code are returned unchanged.

    Raises:
        HTTPException: 403 for non-master callers.
    """
    if not user.get("is_master"):
        raise HTTPException(status_code=403, detail="Master admin access required")
    result, status_code = auth_handler.revoke_embed_api_key(key_id)
    # Only record "Revoked ..." in the audit trail when the handler reports
    # success; a failed attempt must not read as a completed revocation.
    if 200 <= status_code < 300:
        auth_handler.log_activity(
            user_id=user["id"],
            username=user["username"],
            activity_type="revoke_embed_key",
            description=f"Revoked embed API key {key_id}",
            ip_address=request.client.host if request.client else None,
            user_agent=request.headers.get("User-Agent"),
        )
    return JSONResponse(status_code=status_code, content=result)


# Lifetime passed as ttl_seconds when call-sync payloads are cached;
# overridable through the CALL_SYNC_CACHE_TTL_SECONDS environment variable.
CALL_SYNC_CACHE_TTL_SECONDS = int(os.environ.get("CALL_SYNC_CACHE_TTL_SECONDS", "3600"))
# Embedded in every call-sync cache key.
CALL_SYNC_CACHE_SCHEMA_VERSION = "v5"


def _resolve_source_call_table(cursor, bid: str, table_kind: str):
    """Return the first existing source table name for the business, or None.

    For ``table_kind == "history"`` the candidates are ``{bid}_callhistory``
    then ``{bid}_call_history``; for any other kind they are
    ``{bid}_callarchive`` then ``{bid}_call_archive``.
    """
    if table_kind == "history":
        stems = ("callhistory", "call_history")
    else:
        stems = ("callarchive", "call_archive")

    for stem in stems:
        table = f"{bid}_{stem}"
        cursor.execute("SHOW TABLES LIKE %s", (table,))
        found = cursor.fetchone()
        if found:
            return table
    return None


def _table_has_column(cursor, table_name: str, column_name: str) -> bool:
    """Report whether ``SHOW COLUMNS ... LIKE`` finds ``column_name`` in ``table_name``."""
    statement = "SHOW COLUMNS FROM `" + table_name + "` LIKE %s"
    cursor.execute(statement, (column_name,))
    match = cursor.fetchone()
    return match is not None


def _serialize_call_sync_row(row: Dict[str, Any]) -> Dict[str, Any]:
    """Copy a row, replacing ``datetime`` values with their ISO-8601 strings."""
    return {
        column: (cell.isoformat() if isinstance(cell, datetime) else cell)
        for column, cell in row.items()
    }


def _fetch_source_calls(bid: str, table_kind: str, date_from: Optional[str] = None, date_to: Optional[str] = None, limit: int = 500):
    """Read the most recent calls for a business from the sync source database.

    Args:
        bid: Business id; selects the ``{bid}_...`` source table and is echoed
            back as the ``bid`` column of every row.
        table_kind: ``"history"`` for the call-history table, anything else
            for the call-archive table.
        date_from, date_to: Optional inclusive ``DATE(starttime)`` range; the
            filter is applied only when both are given.
        limit: Maximum number of rows, newest ``starttime`` first.

    Returns:
        ``(rows, table_name)`` where rows are dicts with datetimes rendered as
        ISO strings, or ``([], None)`` when no source table exists.
    """
    source_config = get_sync_source_db_config()
    conn = pymysql.connect(**source_config)
    try:
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        table_name = _resolve_source_call_table(cursor, bid, table_kind)
        if not table_name:
            return [], None
        has_direction = _table_has_column(cursor, table_name, "direction")
        has_customer_callinfo = _table_has_column(cursor, table_name, "customer_callinfo")
        has_callfrom = _table_has_column(cursor, table_name, "callfrom")
        has_callto = _table_has_column(cursor, table_name, "callto")
        has_clicktocall = _table_has_column(cursor, table_name, "clicktocalldid")

        # Customer number: first non-empty value among the columns that exist.
        customer_candidates = []
        if has_customer_callinfo:
            customer_candidates.append("NULLIF(TRIM(CAST(customer_callinfo AS CHAR)), '')")
        if has_direction and has_callto and has_callfrom:
            customer_candidates.append(
                "CASE "
                "WHEN LOWER(TRIM(CAST(direction AS CHAR))) = 'outbound' THEN NULLIF(TRIM(CAST(callto AS CHAR)), '') "
                "WHEN LOWER(TRIM(CAST(direction AS CHAR))) = 'inbound' THEN NULLIF(TRIM(CAST(callfrom AS CHAR)), '') "
                "ELSE NULL END"
            )
        if has_callto:
            customer_candidates.append("NULLIF(TRIM(CAST(callto AS CHAR)), '')")
        if has_clicktocall:
            customer_candidates.append("NULLIF(TRIM(CAST(clicktocalldid AS CHAR)), '')")
        customer_expr = f"COALESCE({', '.join(customer_candidates)})" if customer_candidates else "NULL"

        # The columns probed above may be absent; select NULL in their place so
        # the query still runs and every row keeps the same keys.
        direction_col = "direction" if has_direction else "NULL AS direction"
        clicktocall_col = "clicktocalldid" if has_clicktocall else "NULL AS clicktocalldid"

        query = f"""
            SELECT
                callid, %s as bid, agentname, groupname, starttime, endtime, dialstatus, {direction_col}, filename,
                {customer_expr} as customer_callinfo, countrycode, emp_phone, {clicktocall_col}
            FROM `{table_name}`
            WHERE 1 = 1
        """
        params: List[Any] = [str(bid)]
        if date_from and date_to:
            query += " AND DATE(starttime) BETWEEN %s AND %s"
            params.extend([date_from, date_to])
        query += " ORDER BY starttime DESC LIMIT %s"
        params.append(int(limit))
        cursor.execute(query, tuple(params))
        rows = cursor.fetchall() or []
        return [_serialize_call_sync_row(r) for r in rows], table_name
    finally:
        conn.close()


def _fetch_source_count(bid: str, table_kind: str, date_from: Optional[str] = None, date_to: Optional[str] = None) -> int:
    """Count rows in the business's source call table.

    The ``DATE(starttime)`` range filter is applied only when both dates are
    given. Returns 0 when no source table exists.
    """
    conn = pymysql.connect(**get_sync_source_db_config())
    try:
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        table_name = _resolve_source_call_table(cursor, bid, table_kind)
        if not table_name:
            return 0

        sql_parts = [f"SELECT COUNT(*) as total FROM `{table_name}` WHERE 1 = 1"]
        args: List[Any] = []
        if date_from and date_to:
            sql_parts.append(" AND DATE(starttime) BETWEEN %s AND %s")
            args += [date_from, date_to]

        cursor.execute("".join(sql_parts), tuple(args))
        result_row = cursor.fetchone() or {}
        total = result_row.get("total") or 0
        return int(total)
    finally:
        conn.close()


@app.get("/sync/cache/{bid}/history")
def sync_cache_history(
    bid: str,
    limit: int = Query(default=300),
    refresh: bool = Query(default=False),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return recent call-history rows for a business, served from cache when possible.

    Business admin or master only. Unless ``refresh`` is set, a cached payload
    for the same business and limit is returned with ``cache_hit`` set to True.
    Otherwise the source table is read and the fresh payload is cached. When
    the source table is missing, a descriptive empty payload is returned and
    nothing is cached.
    """
    _auth_user_for_bid(bid, user)
    _require_business_admin_or_master(bid, user)

    cache_key = f"callsync:{CALL_SYNC_CACHE_SCHEMA_VERSION}:{bid}:history:limit:{limit}"
    cached = None if refresh else db_handler.get_call_sync_cache(cache_key)
    if cached:
        cached["cache_hit"] = True
        return cached

    records, table_name = _fetch_source_calls(bid, "history", limit=limit)
    fetched_at = datetime.now().isoformat()

    if table_name:
        payload = {
            "source": "call_history",
            "table": table_name,
            "count": len(records),
            "records": records,
            "cached_at": fetched_at,
            "cache_hit": False,
        }
        db_handler.upsert_call_sync_cache(
            cache_key=cache_key,
            bid=bid,
            source="call_history",
            payload=payload,
            ttl_seconds=CALL_SYNC_CACHE_TTL_SECONDS,
        )
        return payload

    # No source table: explain what was looked for and where.
    source_host = get_sync_source_db_config().get("host")
    source_database = get_sync_source_db_config().get("database")
    return {
        "source": "call_history",
        "table": None,
        "count": 0,
        "records": [],
        "cached_at": fetched_at,
        "cache_hit": False,
        "message": "Source call history table not found for this business",
        "expected_tables": [f"{bid}_call_history", f"{bid}_callhistory"],
        "configured_source_db": {"host": source_host, "database": source_database},
    }


@app.get("/sync/cache/{bid}/archive")
def sync_cache_archive(
    bid: str,
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
    limit: int = Query(default=500),
    refresh: bool = Query(default=False),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return call-archive rows for a business, served from cache when possible.

    Business admin or master only. The cache key includes the date range (or
    ``recent`` when either date is missing) and the limit. Unless ``refresh``
    is set, a cached payload is returned with ``cache_hit`` set to True.
    When the source table is missing, a descriptive empty payload is returned
    and nothing is cached.
    """
    _auth_user_for_bid(bid, user)
    _require_business_admin_or_master(bid, user)

    if date_from and date_to:
        range_key = f"{date_from}:{date_to}"
    else:
        range_key = "recent"
    cache_key = f"callsync:{CALL_SYNC_CACHE_SCHEMA_VERSION}:{bid}:archive:{range_key}:limit:{limit}"

    cached = None if refresh else db_handler.get_call_sync_cache(cache_key)
    if cached:
        cached["cache_hit"] = True
        return cached

    records, table_name = _fetch_source_calls(bid, "archive", date_from=date_from, date_to=date_to, limit=limit)
    fetched_at = datetime.now().isoformat()

    if table_name:
        payload = {
            "source": "call_archive",
            "table": table_name,
            "count": len(records),
            "records": records,
            "date_from": date_from,
            "date_to": date_to,
            "cached_at": fetched_at,
            "cache_hit": False,
        }
        db_handler.upsert_call_sync_cache(
            cache_key=cache_key,
            bid=bid,
            source="call_archive",
            payload=payload,
            ttl_seconds=CALL_SYNC_CACHE_TTL_SECONDS,
        )
        return payload

    # No source table: explain what was looked for and where.
    source_host = get_sync_source_db_config().get("host")
    source_database = get_sync_source_db_config().get("database")
    return {
        "source": "call_archive",
        "table": None,
        "count": 0,
        "records": [],
        "date_from": date_from,
        "date_to": date_to,
        "cached_at": fetched_at,
        "cache_hit": False,
        "message": "Source call archive table not found for this business",
        "expected_tables": [f"{bid}_callarchive", f"{bid}_call_archive"],
        "configured_source_db": {"host": source_host, "database": source_database},
    }


@app.get("/sync/check-count/{bid}")
def sync_check_count(
    bid: str,
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Report source row counts for a business (business admin or master only).

    History is always counted in full; the archive is counted only when both
    dates are given, otherwise its total is reported as 0.
    """
    _auth_user_for_bid(bid, user)
    _require_business_admin_or_master(bid, user)

    history_total = _fetch_source_count(bid, "history")
    if date_from and date_to:
        archive_total = _fetch_source_count(bid, "archive", date_from=date_from, date_to=date_to)
    else:
        archive_total = 0

    combined = int(history_total + archive_total)
    return {
        "callhistory": {"total": history_total},
        "callarchive": {"total": archive_total},
        "total_count": combined,
        "date_from": date_from,
        "date_to": date_to,
    }


@app.post("/sync/calls/{bid}")
def sync_calls(
    bid: str,
    payload: Dict[str, Any],
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Fetch calls from the source database and store them in the call-sync cache.

    Business admin or master only. With both ``date_from`` and ``date_to`` in
    the body the archive table is read; otherwise the history table. ``limit``
    defaults to 500. Returns a summary (message, count, source, table, cache
    key); nothing is cached when the source table is missing.
    """
    _auth_user_for_bid(bid, user)
    _require_business_admin_or_master(bid, user)

    date_from = payload.get("date_from")
    date_to = payload.get("date_to")
    limit = int(payload.get("limit", 500))

    key_prefix = f"callsync:{CALL_SYNC_CACHE_SCHEMA_VERSION}:{bid}"
    if date_from and date_to:
        source = "call_archive"
        records, table_name = _fetch_source_calls(bid, "archive", date_from=date_from, date_to=date_to, limit=limit)
        cache_key = f"{key_prefix}:archive:{date_from}:{date_to}:limit:{limit}"
    else:
        source = "call_history"
        records, table_name = _fetch_source_calls(bid, "history", limit=limit)
        cache_key = f"{key_prefix}:history:limit:{limit}"

    if not table_name:
        return {"message": f"Source table not found for {source}", "cached_count": 0, "source": source, "table": None, "cache_key": cache_key, "stored_in": "mysql"}

    fetched = len(records)
    db_handler.upsert_call_sync_cache(
        cache_key=cache_key,
        bid=bid,
        source=source,
        payload={
            "source": source,
            "table": table_name,
            "count": fetched,
            "records": records,
            "date_from": date_from,
            "date_to": date_to,
            "cached_at": datetime.now().isoformat(),
        },
        ttl_seconds=CALL_SYNC_CACHE_TTL_SECONDS,
    )
    return {"message": f"Successfully cached {fetched} calls from {source}", "cached_count": fetched, "source": source, "table": table_name, "cache_key": cache_key, "stored_in": "mysql"}


# ── TELEPHONY INTEGRATION ENDPOINTS ──────────────────────────────────────────

# Mcube 2.0 source database (default: 10.40.180.35/mcube_cl1).
# Each setting can be overridden through the environment (MCUBE2_DB_HOST,
# MCUBE2_DB_PORT, MCUBE2_DB_USER, MCUBE2_DB_PASSWORD, MCUBE2_DB_NAME) so that
# deployments do not have to rely on the values committed here.
# NOTE(review): the fallback credentials below are kept only for backward
# compatibility; they should be rotated and supplied via the environment.
_MCUBE2_DB_CONFIG: Dict[str, Any] = {
    "host": os.getenv("MCUBE2_DB_HOST", "10.40.180.35"),
    "port": int(os.getenv("MCUBE2_DB_PORT", "3306")),
    "user": os.getenv("MCUBE2_DB_USER", "root"),
    "password": os.getenv("MCUBE2_DB_PASSWORD", "4Tq73tXMcUbEJ5Q3t3"),
    "database": os.getenv("MCUBE2_DB_NAME", "mcube_cl1"),
    "charset": "utf8mb4",
}


def _fetch_source_calls_with_config(
    db_config: Dict[str, Any],
    source_bid: str,
    table_kind: str,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
    limit: int = 500,
    directions: Optional[List[str]] = None,
    dialstatuses: Optional[List[str]] = None,
):
    """Like _fetch_source_calls but uses an explicit DB config dict.

    Args:
        db_config: Keyword arguments for ``pymysql.connect``.
        source_bid: Business id on the source server; selects the table and is
            echoed back as the ``bid`` column of every row.
        table_kind: ``"history"`` or ``"archive"``; if that table is missing
            the other kind is tried before giving up.
        date_from, date_to: Optional inclusive ``DATE(starttime)`` range; the
            filter is applied only when both are given.
        limit: Maximum number of rows, newest ``starttime`` first.
        directions: Optional call directions to keep (matched
            case-insensitively); ignored when the table has no ``direction``
            column.
        dialstatuses: Optional ``dialstatus`` values to keep.

    Returns:
        ``(rows, table_name)`` where rows are dicts with datetimes rendered as
        ISO strings, or ``([], None)`` when no source table exists.
    """
    conn = pymysql.connect(**db_config)
    try:
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        table_name = _resolve_source_call_table(cursor, source_bid, table_kind)
        if not table_name:
            # Also try archive/history fallback
            alt_kind = "archive" if table_kind == "history" else "history"
            table_name = _resolve_source_call_table(cursor, source_bid, alt_kind)
        if not table_name:
            return [], None
        has_direction = _table_has_column(cursor, table_name, "direction")
        has_customer_callinfo = _table_has_column(cursor, table_name, "customer_callinfo")
        has_callfrom = _table_has_column(cursor, table_name, "callfrom")
        has_callto = _table_has_column(cursor, table_name, "callto")
        has_clicktocall = _table_has_column(cursor, table_name, "clicktocalldid")

        # Customer number: first non-empty value among the columns that exist.
        customer_candidates = []
        if has_customer_callinfo:
            customer_candidates.append("NULLIF(TRIM(CAST(customer_callinfo AS CHAR)), '')")
        if has_direction and has_callto and has_callfrom:
            customer_candidates.append(
                "CASE "
                "WHEN LOWER(TRIM(CAST(direction AS CHAR))) = 'outbound' THEN NULLIF(TRIM(CAST(callto AS CHAR)), '') "
                "WHEN LOWER(TRIM(CAST(direction AS CHAR))) = 'inbound' THEN NULLIF(TRIM(CAST(callfrom AS CHAR)), '') "
                "ELSE NULL END"
            )
        if has_callto:
            customer_candidates.append("NULLIF(TRIM(CAST(callto AS CHAR)), '')")
        if has_clicktocall:
            customer_candidates.append("NULLIF(TRIM(CAST(clicktocalldid AS CHAR)), '')")
        customer_expr = f"COALESCE({', '.join(customer_candidates)})" if customer_candidates else "NULL"

        # The columns probed above may be absent; select NULL in their place so
        # the query still runs and every row keeps the same keys.
        direction_col = "direction" if has_direction else "NULL AS direction"
        clicktocall_col = "clicktocalldid" if has_clicktocall else "NULL AS clicktocalldid"

        query = f"""
            SELECT callid, %s as bid, agentname, groupname, starttime, endtime,
                   dialstatus, {direction_col}, filename,
                   {customer_expr} as customer_callinfo,
                   countrycode, emp_phone, {clicktocall_col}
            FROM `{table_name}` WHERE 1=1
        """
        params: List[Any] = [str(source_bid)]
        if date_from and date_to:
            query += " AND DATE(starttime) BETWEEN %s AND %s"
            params.extend([date_from, date_to])
        if dialstatuses:
            placeholders = ", ".join(["%s"] * len(dialstatuses))
            query += f" AND dialstatus IN ({placeholders})"
            params.extend(dialstatuses)
        if directions and has_direction:
            # direction values are mixed case (Outbound / inbound) — do case-insensitive match
            placeholders = ", ".join(["%s"] * len(directions))
            query += f" AND LOWER(direction) IN ({placeholders})"
            params.extend([d.lower() for d in directions])
        query += " ORDER BY starttime DESC LIMIT %s"
        params.append(int(limit))
        cursor.execute(query, tuple(params))
        rows = cursor.fetchall() or []
        return [_serialize_call_sync_row(r) for r in rows], table_name
    finally:
        conn.close()


@app.get("/telephony/{bid}/integrations")
def telephony_list(bid: str, user: Dict[str, Any] = Depends(_auth_user)):
    """List the telephony integrations stored for a business.

    The per-BID access check runs before anything is read from the database.
    """
    _auth_user_for_bid(bid, user)
    return {"integrations": db_handler.get_telephony_integrations(bid)}


@app.post("/telephony/{bid}/integrations")
async def telephony_connect(bid: str, payload: Dict[str, Any], user: Dict[str, Any] = Depends(_auth_user)):
    """Connect a telephony provider to a business.

    Body keys read: ``provider`` (required) and ``source_bid`` (optional,
    falls back to ``bid``). For the ``mcube2`` provider the table
    ``<source_bid>_callhistory`` must exist on the Mcube 2.0 server before
    the integration is saved.

    Raises:
        HTTPException: 400 when ``provider`` is missing or the call-history
            table is not found; 500 when the Mcube 2.0 lookup itself fails.
    """
    _require_business_admin_or_master(bid, user)
    provider = payload.get("provider")
    if not provider:
        raise HTTPException(status_code=400, detail="provider is required")
    source_bid = str(payload.get("source_bid") or bid)

    config: Dict[str, Any] = {}
    message = f"{provider} connected for BID {bid}"
    if provider == "mcube2":
        # Verify the BID exists on the Mcube 2.0 server
        try:
            conn = pymysql.connect(**_MCUBE2_DB_CONFIG, connect_timeout=5)
            try:
                cur = conn.cursor()
                # NOTE(review): "_" and "%" are LIKE wildcards, so this pattern can
                # also match similarly named tables — confirm that is acceptable.
                cur.execute("SHOW TABLES LIKE %s", (f"{source_bid}_callhistory",))
                found = cur.fetchone()
            finally:
                # Always release the connection, even when the lookup raises.
                conn.close()
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Could not connect to Mcube 2.0 server: {str(e)}") from e
        if not found:
            raise HTTPException(
                status_code=400,
                detail=f"Table {source_bid}_callhistory not found on Mcube 2.0 server. Check the source BID."
            )
        config = {k: v for k, v in _MCUBE2_DB_CONFIG.items() if k != "charset"}
        # Only mcube2 integrations talk to the Mcube 2.0 host, so only they report it.
        message += f" (server: {_MCUBE2_DB_CONFIG['host']})"

    db_handler.save_telephony_integration(bid, provider, source_bid=source_bid, config=config)
    return {"success": True, "message": message}


@app.delete("/telephony/{bid}/integrations/{provider}")
def telephony_disconnect(bid: str, provider: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Remove one provider's telephony integration from a business.

    Requires business-admin or master rights for ``bid``.
    """
    _require_business_admin_or_master(bid, user)
    db_handler.delete_telephony_integration(bid, provider)
    confirmation = f"{provider} disconnected"
    return {"success": True, "message": confirmation}


@app.get("/telephony/{bid}/preview")
def telephony_preview(
    bid: str,
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
    limit: int = Query(default=300, le=1000),
    direction: Optional[str] = Query(default=None, description="Comma-separated directions, e.g. Outbound,inbound"),
    dialstatus: Optional[str] = Query(default=None, description="Comma-separated statuses, e.g. ANSWER,BUSY. Omit for all statuses."),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Preview call rows from the first active telephony integration of a business.

    Returns an empty result with an ``error`` message when no integration is
    active. ``mcube2`` reads from the Mcube 2.0 server (the "archive" table
    kind when both dates are given, otherwise "history"); ``mcube_classic``
    reads the local raw-calls preview. Any other provider yields HTTP 400,
    and a failed fetch yields HTTP 500.
    """
    _auth_user_for_bid(bid, user)
    _require_business_admin_or_master(bid, user)

    def _split_csv(raw: Optional[str]) -> Optional[List[str]]:
        # A missing/empty parameter means "no filter" (None), not an empty list.
        if not raw:
            return None
        return [token.strip() for token in raw.split(",") if token.strip()]

    connected = [entry for entry in db_handler.get_telephony_integrations(bid) if entry.get("is_active")]
    if not connected:
        return {"records": [], "count": 0, "provider": None,
                "error": "No telephony provider connected. Go to Telephony Integration and connect one first."}

    chosen = connected[0]
    provider = chosen["provider"]
    source_bid = chosen.get("source_bid") or bid

    # Parse comma-separated filter params
    directions = _split_csv(direction)
    dialstatuses = _split_csv(dialstatus)

    if provider == "mcube_classic":
        try:
            result = db_handler.get_raw_calls_preview(source_bid, date_from, date_to, limit)
            return {"records": result, "count": len(result), "provider": provider,
                    "source_bid": source_bid, "table": f"{source_bid}_raw_calls"}
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Failed to fetch from Mcube Classic: {str(e)}")

    if provider != "mcube2":
        raise HTTPException(status_code=400, detail=f"Preview not supported for provider: {provider}")

    if date_from and date_to:
        table_kind = "archive"
    else:
        table_kind = "history"
    try:
        records, table_name = _fetch_source_calls_with_config(
            _MCUBE2_DB_CONFIG, source_bid, table_kind, date_from, date_to, limit,
            directions=directions, dialstatuses=dialstatuses,
        )
        applied_filters = {"direction": directions, "dialstatus": dialstatuses}
        return {
            "records": records,
            "count": len(records),
            "provider": provider,
            "source_bid": source_bid,
            "table": table_name,
            "server": _MCUBE2_DB_CONFIG["host"],
            "date_from": date_from,
            "date_to": date_to,
            "filters": applied_filters,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to fetch from Mcube 2.0: {str(e)}")


@app.post("/telephony/{bid}/sync-to-db")
async def telephony_sync_to_db(
    bid: str,
    payload: Dict[str, Any],
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Push previewed telephony records into the business's raw-calls table.

    Reads ``records`` from the body; an empty or missing list is answered
    with zero counts without touching the database.
    """
    _auth_user_for_bid(bid, user)
    _require_business_admin_or_master(bid, user)

    incoming = payload.get("records", [])
    if incoming:
        outcome = db_handler.insert_raw_calls_from_telephony(bid, incoming)
        outcome["message"] = f"Pushed {outcome['inserted']} records to {bid}_raw_calls"
        return outcome
    return {"inserted": 0, "skipped": 0, "message": "No records provided"}


# ─────────────────────────────────────────────────────────────────────────────

@app.post("/sync/upload/{bid}")
async def sync_upload_raw_calls(
    bid: str,
    request: Request,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Placeholder for the raw-calls upload endpoint.

    After the access checks pass it always answers HTTP 501, because
    multipart parsing needs the python-multipart dependency.
    """
    _auth_user_for_bid(bid, user)
    _require_business_admin_or_master(bid, user)
    not_available = (
        "Excel multipart upload requires python-multipart in this runtime. "
        "Use admin JSON business create path or install python-multipart."
    )
    raise HTTPException(status_code=501, detail=not_available)


@app.get("/transcription/calls/{bid}")
def transcription_calls(
    bid: str,
    status: str = Query(default="0"),
    groupname: Optional[str] = Query(default=None),
    agentname: Optional[str] = Query(default=None),
    start_date: Optional[str] = Query(default=None),
    end_date: Optional[str] = Query(default=None),
):
    """Return up to 500 answered calls from ``<bid>_raw_calls``, newest first.

    Args:
        bid: Business id; selects the ``<bid>_raw_calls`` table.
        status: Value matched against the ``status`` column; ``"all"``
            disables that filter.
        groupname: Optional exact match on ``groupname``.
        agentname: Optional exact match on ``agentname``.
        start_date: Optional lower bound on ``DATE(call_starttime)``.
        end_date: Optional upper bound on ``DATE(call_starttime)``.

    Returns:
        ``{"calls": [...], "count": <number of rows>}``.
    """
    # NOTE(review): unlike the telephony and pipeline endpoints, this handler
    # declares no auth dependency — confirm that is intentional.

    # A table name cannot be bound as a query parameter. Doubling any backtick
    # keeps the caller-supplied bid inside the quoted identifier, so it cannot
    # terminate the identifier and inject SQL.
    table = f"{bid}_raw_calls".replace("`", "``")

    where = ["call_status = 'ANSWER'"]
    params: List[Any] = []
    if status != "all":
        where.append("status = %s")
        params.append(status)
    optional_filters = (
        ("groupname = %s", groupname),
        ("agentname = %s", agentname),
        ("DATE(call_starttime) >= %s", start_date),
        ("DATE(call_starttime) <= %s", end_date),
    )
    for clause, value in optional_filters:
        if value:
            where.append(clause)
            params.append(value)
    where_sql = " AND ".join(where)

    conn = _db_conn()
    try:
        cursor = conn.cursor()
        query = f"""
            SELECT callid, bid, agentname, groupname, call_starttime, call_endtime, call_status, status,
                   transcription_status, transcription_requested, selected_for_processing
            FROM `{table}`
            WHERE {where_sql}
            ORDER BY call_starttime DESC
            LIMIT 500
        """
        cursor.execute(query, tuple(params))
        calls = cursor.fetchall() or []
        return {"calls": _to_json_safe(calls), "count": len(calls)}
    finally:
        conn.close()


@app.post("/transcription/trigger/{bid}")
def transcription_trigger(bid: str, payload: Dict[str, Any]):
    """Flag the given calls in ``<bid>_raw_calls`` as queued for transcription.

    Sets ``transcription_requested = 1``, ``transcription_status = 'pending'``
    and ``selected_for_processing = 1`` for every id in ``payload["callids"]``.

    Returns:
        A message plus ``queued_count``, the number of rows the UPDATE affected.

    Raises:
        HTTPException: 400 when ``callids`` is missing or empty.
    """
    callids = payload.get("callids", [])
    if not callids:
        raise HTTPException(status_code=400, detail="No call IDs provided")

    # A table name cannot be bound as a query parameter. Doubling any backtick
    # keeps the caller-supplied bid inside the quoted identifier, so it cannot
    # terminate the identifier and inject SQL.
    table = f"{bid}_raw_calls".replace("`", "``")

    conn = _db_conn()
    try:
        cursor = conn.cursor()
        placeholders = ",".join(["%s"] * len(callids))
        query = f"""
            UPDATE `{table}`
            SET transcription_requested = 1, transcription_status = 'pending', selected_for_processing = 1
            WHERE callid IN ({placeholders})
        """
        # NOTE(review): no explicit commit here — presumably _db_conn() returns
        # an autocommit connection; confirm.
        cursor.execute(query, callids)
        affected = cursor.rowcount
    finally:
        conn.close()
    return {"message": f"Successfully queued {affected} calls for transcription", "queued_count": affected}


@app.post("/transcription/trigger-combined/{bid}")
def transcription_trigger_combined(bid: str, payload: Dict[str, Any]):
    """Run the transcription trigger and then the analysis trigger for the same call ids.

    Both handlers are called in-process; an HTTPException raised by either
    one propagates to the caller.
    """
    callids = payload.get("callids", [])
    transcription_result = transcription_trigger(bid, {"callids": callids})
    analysis_result = analysis_trigger(bid, {"callids": callids})
    return {
        "message": "Combined trigger executed",
        "transcription": transcription_result,
        "analysis": analysis_result,
    }


@app.get("/transcription/status/{bid}/{batch_id}")
def transcription_status(bid: str, batch_id: str):
    """Legacy batch-status endpoint: always reports the batch as completed."""
    return {
        "bid": str(bid),
        "batch_id": str(batch_id),
        "status": "completed",
        "message": "Legacy batch status endpoint is now immediate",
    }


@app.post("/analysis/trigger/{bid}")
def analysis_trigger(bid: str, payload: Dict[str, Any]):
    """Run the call analyzer over each requested call id.

    A call is analysed only when it exists, has ``call_status == "ANSWER"``
    and carries a transcript; every other call (and any call whose analysis
    raises) is counted as failed. At most the first ten error strings are
    included in the response.

    Raises:
        HTTPException: 400 when no call ids are given; 500 when the analyzer
            cannot be imported or constructed.
    """
    callids = payload.get("callids", [])
    if not callids:
        raise HTTPException(status_code=400, detail="No call IDs provided")
    try:
        from analyze_calls_with_parameters import CallAnalyzer
        analyzer = CallAnalyzer(CONFIG)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to initialize analyzer: {e}")

    success_count = 0
    errors: List[str] = []
    for callid in callids:
        try:
            call_data = db_handler.get_raw_call_details(bid, callid)

            # Work out why (if at all) this call has to be skipped.
            skip_reason = None
            transcript = None
            if not call_data:
                skip_reason = "Call not found"
            elif call_data.get("call_status") != "ANSWER":
                skip_reason = "Call not answered"
            else:
                transcript = call_data.get("transcripts")
                if not transcript:
                    skip_reason = "No transcript available"
            if skip_reason is not None:
                errors.append(f"{callid}: {skip_reason}")
                continue

            # Segments may come back as a JSON string; decode them in that case.
            segments = call_data.get("speaker_segments")
            if segments and isinstance(segments, str):
                segments = json.loads(segments)
            actual_duration = call_data.get("duration") or call_data.get("duration_seconds")
            analyzer.analyze_call(
                bid=bid,
                callid=callid,
                transcript=transcript,
                speaker_segments=segments or [],
                actual_duration=actual_duration,
            )
        except Exception as e:
            errors.append(f"{callid}: {e}")
        else:
            success_count += 1

    # Every failure records exactly one error string, so the list length is the failure count.
    error_count = len(errors)
    response_data: Dict[str, Any] = {
        "message": f"Analysis completed: {success_count} successful, {error_count} failed",
        "success_count": success_count,
        "error_count": error_count,
    }
    if errors:
        response_data["errors"] = errors[:10]
    return response_data


# =============================================================================
# Pipeline config endpoints
# =============================================================================

class PipelineConfigRequest(BaseModel):
    """Request body for ``PUT /pipeline/{bid}/config``.

    Every field is optional and defaults to ``None``. The handler drops
    ``None`` values before saving, so only the supplied fields are passed on.
    """
    pipeline_enabled: Optional[bool] = None
    # source_db_*: connection settings (presumably for the upstream call database — confirm)
    source_db_host: Optional[str] = None
    source_db_port: Optional[int] = None
    source_db_user: Optional[str] = None
    source_db_password: Optional[str] = None  # stored encrypted
    source_db_name: Optional[str] = None
    # Speech-to-text provider selection and credentials
    stt_provider: Optional[str] = None
    stt_api_key: Optional[str] = None           # stored encrypted
    # The "_s" suffix presumably denotes seconds — TODO confirm
    min_call_duration_s: Optional[int] = None
    sync_batch: Optional[int] = None
    transcribe_batch: Optional[int] = None
    sync_interval_s: Optional[int] = None
    lead_filter_enabled: Optional[bool] = None
    crm_provider: Optional[str] = None
    lookback_days: Optional[int] = None


@app.get("/pipeline/{bid}/config")
def get_pipeline_config(
    bid: str,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return the stored pipeline config for a business with secrets masked.

    When nothing is stored the response is ``{"bid": bid, "exists": False}``.
    Otherwise the stored row is returned with non-empty encrypted secrets
    replaced by ``"***"``, timestamps converted to ISO strings and
    ``exists`` set to ``True``.
    """
    _auth_user_for_bid(bid, user)
    db_handler.ensure_business_pipeline_config_table()

    stored = db_handler.get_pipeline_config(bid)
    if not stored:
        return {"bid": bid, "exists": False}

    # Work on a copy so the row returned by the handler is left untouched.
    masked = dict(stored)

    # Mask encrypted secrets before returning
    secret_columns = ("source_db_password_enc", "stt_api_key_enc")
    masked.update({column: "***" for column in secret_columns if masked.get(column)})

    for stamp in ("created_at", "updated_at"):
        value = masked.get(stamp)
        if hasattr(value, "isoformat"):
            masked[stamp] = value.isoformat()

    masked["exists"] = True
    return masked


@app.put("/pipeline/{bid}/config")
def save_pipeline_config(
    bid: str,
    body: PipelineConfigRequest,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Save the supplied pipeline settings for a business.

    Fields left as ``None`` in the request are not passed to the database
    handler. Requires business-admin or master rights.
    """
    _require_business_admin_or_master(bid, user)
    db_handler.ensure_business_pipeline_config_table()

    supplied: Dict[str, Any] = {}
    for field_name, field_value in body.dict().items():
        if field_value is not None:
            supplied[field_name] = field_value

    db_handler.save_pipeline_config(bid, supplied)
    return {"success": True, "message": f"Pipeline config saved for bid {bid}"}


@app.get("/pipeline/{bid}/status")
def get_pipeline_status(
    bid: str,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return pipeline operational status: enabled flag, watermarks and call-record counts.

    Returns:
        A dict with ``bid``, ``pipeline_enabled``, ``watermarks`` (the
        ``call_sync`` and ``lsq_leads`` watermarks as strings, or ``None``)
        and ``call_record_counts`` (rows per ``status`` in
        ``<bid>_call_records``). The counts are best-effort: if they cannot
        be read the mapping is returned empty or partial and the failure is
        logged.
    """
    _auth_user_for_bid(bid, user)
    db_handler.ensure_business_pipeline_config_table()
    db_handler.ensure_sync_watermarks_table()

    cfg = db_handler.get_pipeline_config(bid)
    call_sync_wm = db_handler.get_sync_watermark(bid, f"call_sync_{bid}")
    lsq_leads_wm = db_handler.get_sync_watermark(bid, "lsq_leads")

    # Counts from call_records if the table exists
    counts: Dict[str, Any] = {}
    try:
        db_handler.ensure_call_records_table(bid)
        table = f"{bid}_call_records"
        with db_handler.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"""SELECT status, COUNT(*) AS cnt
                    FROM `{table}` GROUP BY status"""
            )
            for row in (cursor.fetchall() or []):
                counts[row["status"]] = row["cnt"]
    except Exception:
        # Still best-effort (the endpoint must not fail because counts are
        # unavailable), but record the failure instead of hiding it.
        logger.warning(
            "get_pipeline_status: could not read call record counts for bid=%s",
            bid,
            exc_info=True,
        )

    return {
        "bid": bid,
        "pipeline_enabled": bool(cfg.get("pipeline_enabled", False)) if cfg else False,
        "watermarks": {
            "call_sync": str(call_sync_wm) if call_sync_wm else None,
            "lsq_leads": str(lsq_leads_wm) if lsq_leads_wm else None,
        },
        "call_record_counts": counts,
    }


# =============================================================================
# Agent config endpoints
# =============================================================================

class AgentConfigRequest(BaseModel):
    """Request body for ``POST /agents/{bid}``.

    ``agent_name`` is the only required field. The handler drops fields whose
    value is ``None`` before saving, so the prompt and schema fields are only
    passed on when supplied.
    """
    agent_name: str
    agent_enabled: Optional[bool] = True
    # Model selection; the defaults below apply when the client omits them.
    model_provider: Optional[str] = "bedrock"
    model_id: Optional[str] = "amazon.nova-lite-v1:0"
    system_prompt: Optional[str] = None
    user_prompt_template: Optional[str] = None
    output_schema: Optional[str] = None
    # Generation parameters
    temperature: Optional[float] = 0.1
    max_tokens: Optional[int] = 4096


@app.get("/agents/{bid}")
def list_agent_configs(
    bid: str,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return every agent config stored for a business.

    ``created_at`` / ``updated_at`` values that expose ``isoformat`` are
    converted to ISO strings in place before the rows are returned.
    """
    _auth_user_for_bid(bid, user)
    db_handler.ensure_business_agent_config_table()

    agents = db_handler.get_agent_configs(bid)
    timestamp_columns = ("created_at", "updated_at")
    for agent in agents:
        for column in timestamp_columns:
            stamp = agent.get(column)
            if hasattr(stamp, "isoformat"):
                agent[column] = stamp.isoformat()
    return {"bid": bid, "agents": agents}


@app.post("/agents/{bid}")
def create_or_update_agent_config(
    bid: str,
    body: AgentConfigRequest,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Save an agent config for a business and return the id the handler reports.

    Fields whose value is ``None`` are not passed to the database handler.
    Requires business-admin or master rights.
    """
    _require_business_admin_or_master(bid, user)
    db_handler.ensure_business_agent_config_table()

    supplied: Dict[str, Any] = {}
    for field_name, field_value in body.dict().items():
        if field_value is not None:
            supplied[field_name] = field_value

    saved_id = db_handler.save_agent_config(bid, supplied)
    return {"success": True, "id": saved_id, "agent_name": body.agent_name}


@app.delete("/agents/{bid}/{agent_name}")
def delete_agent_config(
    bid: str,
    agent_name: str,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Delete one agent config of a business.

    Raises:
        HTTPException: 404 when the handler reports that nothing was deleted.
    """
    _require_business_admin_or_master(bid, user)
    if db_handler.delete_agent_config(bid, agent_name):
        return {"success": True, "deleted": agent_name}
    raise HTTPException(status_code=404, detail=f"Agent '{agent_name}' not found for bid {bid}")


# =============================================================================
# Call records endpoints
# =============================================================================

@app.get("/call-records/{bid}")
def list_call_records(
    bid: str,
    page: int = Query(default=1, ge=1),
    page_size: int = Query(default=20, ge=1, le=200),
    status: Optional[str] = Query(default=None),
    agent_name: Optional[str] = Query(default=None),
    search: Optional[str] = Query(default=None),
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return one page of call records for a business.

    All filters are forwarded unchanged to the database handler. Any failure
    is logged with its traceback and reported as a generic HTTP 500.
    """
    _auth_user_for_bid(bid, user)
    listing_args = {
        "bid": bid,
        "page": page,
        "page_size": page_size,
        "status": status,
        "agent_name": agent_name,
        "search": search,
        "date_from": date_from,
        "date_to": date_to,
    }
    try:
        db_handler.ensure_call_records_table(bid)
        return db_handler.get_call_records_list(**listing_args)
    except Exception as exc:
        logger.error("list_call_records error bid=%s: %s", bid, exc, exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to fetch call records")


@app.get("/call-records/{bid}/{callid:path}")
def get_call_record_detail(
    bid: str,
    callid: str,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return a single call record.

    Raises:
        HTTPException: 404 when the record does not exist; 500 (after logging
            the traceback) for any other failure while fetching it.
    """
    _auth_user_for_bid(bid, user)
    try:
        db_handler.ensure_call_records_table(bid)
        record = db_handler.get_call_record_detail(bid, callid)
    except HTTPException:
        # Let HTTP errors raised further down pass through unchanged.
        raise
    except Exception as exc:
        logger.error("get_call_record_detail error bid=%s callid=%s: %s", bid, callid, exc, exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to fetch call record")

    if record:
        return record
    raise HTTPException(status_code=404, detail=f"Call record {callid} not found")


# ---------------------------------------------------------------------------
# STT Pipeline management endpoints
# ---------------------------------------------------------------------------

class SttPipelineConfigUpdate(BaseModel):
    """Request body for ``PUT /stt-pipeline/bids/{bid}/config``.

    Every field is optional and defaults to ``None``; the handler only
    forwards the fields that are not ``None``.
    """
    enabled: Optional[bool] = None
    # Column names in the raw-calls table (presumably the id and recording-URL columns — confirm)
    raw_calls_id_col: Optional[str] = None
    raw_calls_url_col: Optional[str] = None
    batch_size: Optional[int] = None
    poll_interval_s: Optional[int] = None  # "_s" presumably means seconds — TODO confirm
    notes: Optional[str] = None


def _stt_bid_row_with_stats(cfg: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of an STT config row with a boolean ``enabled`` and per-state job counts.

    The counts come from ``db_handler.get_stt_job_stats`` for the row's bid;
    states missing from the stats are reported as 0.
    """
    stats = db_handler.get_stt_job_stats(bid=cfg["bid"])
    job_states = ("pending", "processing", "done", "failed")

    row = dict(cfg)
    row["enabled"] = bool(cfg.get("enabled"))
    row["job_stats"] = {state: stats.get(state, 0) for state in job_states}
    return row


@app.get("/stt-pipeline/bids")
def stt_pipeline_list_bids(user: Dict[str, Any] = Depends(_auth_user)):
    """List every STT pipeline bid, configured or merely discovered, with job stats.

    Master-only. Discovered bids without a config row get a disabled
    placeholder row. The result is sorted by bid.
    """
    if not user.get("is_master"):
        raise HTTPException(status_code=403, detail="Master admin access required")
    try:
        # Merge DB config rows with discovered tables
        by_bid = {row["bid"]: row for row in db_handler.get_stt_pipeline_bid_configs()}
        for found_bid in db_handler.discover_stt_raw_call_bids():
            if found_bid in by_bid:
                continue
            placeholder = {
                "bid": found_bid,
                "enabled": False,
                "raw_calls_id_col": "id",
                "raw_calls_url_col": "recording_url",
                "batch_size": 10,
                "poll_interval_s": 30,
                "notes": None,
                "created_at": None,
                "updated_at": None,
            }
            by_bid[found_bid] = placeholder

        ordered_rows = sorted(by_bid.values(), key=lambda row: row["bid"])
        bids = [_stt_bid_row_with_stats(row) for row in ordered_rows]
        return {"bids": bids, "total": len(bids)}
    except Exception as exc:
        logger.error("stt_pipeline_list_bids error: %s", exc, exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to list STT pipeline bids")


@app.post("/stt-pipeline/bids/{bid}/toggle")
def stt_pipeline_toggle_bid(
    bid: str,
    body: Dict[str, Any] = Body(...),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Enable or disable the STT pipeline for a bid.

    Reads ``enabled`` from the body (truthiness, default ``False``).
    Requires master or business-admin rights; a handler failure is logged
    and reported as HTTP 500.
    """
    allowed = user.get("is_master") or user_has_business_admin(user, bid)
    if not allowed:
        raise HTTPException(status_code=403, detail="Business admin or master access required")

    enabled = bool(body.get("enabled", False))
    try:
        db_handler.toggle_stt_pipeline_bid(bid, enabled)
    except Exception as exc:
        logger.error("stt_pipeline_toggle_bid error bid=%s: %s", bid, exc, exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to toggle STT pipeline bid")
    return {"bid": bid, "enabled": enabled}


@app.put("/stt-pipeline/bids/{bid}/config")
def stt_pipeline_update_config(
    bid: str,
    body: SttPipelineConfigUpdate,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Upsert the STT pipeline config of a bid and return the row with job stats.

    Only fields that are not ``None`` are written. If the row cannot be read
    back, the response is built from the bid plus the submitted fields.
    """
    allowed = user.get("is_master") or user_has_business_admin(user, bid)
    if not allowed:
        raise HTTPException(status_code=403, detail="Business admin or master access required")

    changes: Dict[str, Any] = {}
    for field_name, field_value in body.dict().items():
        if field_value is not None:
            changes[field_name] = field_value

    try:
        db_handler.upsert_stt_pipeline_bid_config(bid, changes)
        stored = db_handler.get_stt_pipeline_bid_config(bid)
        if not stored:
            stored = {"bid": bid, **changes}
        return _stt_bid_row_with_stats(stored)
    except Exception as exc:
        logger.error("stt_pipeline_update_config error bid=%s: %s", bid, exc, exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to update STT pipeline config")


@app.get("/stt-pipeline/stats")
def stt_pipeline_stats(user: Dict[str, Any] = Depends(_auth_user)):
    """Return STT job counts per state plus their total (master only).

    ``total`` sums every value the handler returns, not only the four
    states that are listed individually.
    """
    if not user.get("is_master"):
        raise HTTPException(status_code=403, detail="Master admin access required")
    try:
        stats = db_handler.get_stt_job_stats()
        summary = {state: stats.get(state, 0) for state in ("pending", "processing", "done", "failed")}
        summary["total"] = sum(stats.values())
        return summary
    except Exception as exc:
        logger.error("stt_pipeline_stats error: %s", exc, exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to get STT pipeline stats")


# ============================================================================
# PCA MASTER PANEL ENDPOINTS
# ============================================================================

import subprocess as _subprocess


def _check_process(keyword: str):
    """Look for a running process whose ``ps aux`` line contains ``keyword``.

    Returns ``(True, pid, command)`` for the first matching line that does
    not mention ``grep``, otherwise ``(False, None, None)``. Any error while
    listing or parsing processes is also reported as "not running".
    """
    not_running = (False, None, None)
    try:
        listing = _subprocess.run(["ps", "aux"], capture_output=True, text=True).stdout
        candidates = (
            entry for entry in listing.splitlines()
            if keyword in entry and "grep" not in entry
        )
        hit = next(candidates, None)
        if hit is None:
            return not_running

        # Split into at most 11 fields so the last one holds the whole command.
        fields = hit.split(None, 10)
        pid = None if len(fields) <= 1 else int(fields[1])
        command = hit if len(fields) <= 10 else fields[10].strip()
        return True, pid, command
    except Exception:
        return not_running


def _get_rabbitmq_queue_info(queue_name: str):
    """Return (message_count, consumer_count, error) for a RabbitMQ queue.

    The broker host comes from the ``RABBITMQ_HOST`` environment variable
    (default ``localhost``). On any failure — including ``pika`` not being
    installed — the counts are ``None`` and ``error`` holds the exception text.
    """
    try:
        import pika
        rmq_host = os.getenv("RABBITMQ_HOST", "localhost")
        conn = pika.BlockingConnection(pika.ConnectionParameters(host=rmq_host, socket_timeout=3))
        try:
            # Passive declare: inspect the existing queue without creating it.
            declared = conn.channel().queue_declare(queue=queue_name, passive=True)
            return declared.method.message_count, declared.method.consumer_count, None
        finally:
            # Close on every path so a failed channel/declare does not leak the
            # connection; skip it when the connection is already closed.
            if conn.is_open:
                conn.close()
    except Exception as exc:
        return None, None, str(exc)


def _log_size_str(path: str):
    """Return ``(exists, size_text)`` for a file path.

    ``size_text`` is the file size in B, KB or MB (one decimal for KB/MB).
    If the size cannot be read the result is ``(False, "N/A")``.
    """
    try:
        size = os.path.getsize(path)
    except Exception:
        return False, "N/A"

    if size < 1024:
        text = f"{size} B"
    elif size < 1024 * 1024:
        text = f"{size/1024:.1f} KB"
    else:
        text = f"{size/1024/1024:.1f} MB"
    return True, text


@app.get("/pca/status")
def pca_status():
    """PCA Master Panel snapshot: pipeline processes, the stt_jobs queue, raw-call counts and log sizes.

    Every section is gathered independently. A database failure is reported
    inside ``db_status`` as ``{"error": ...}`` rather than failing the request.
    """
    # Each probe yields a (running, pid, command) triple.
    orchestrator = _check_process("orchestrate_pipeline.py")
    loop = _check_process("orchestrator_loop.sh")
    stt_worker = _check_process("run.py --worker")
    if not stt_worker[0]:
        # Fall back to the alternative worker process name.
        stt_worker = _check_process("rabbitmq_transcription_worker")

    loop_job = {
        "name": "Orchestrator Loop",
        "job_key": "orchestrator-loop",
        "script": "orchestrator_loop.sh",
        "description": "Runs orchestrate_pipeline.py every 5 minutes",
        "running": loop[0],
        "pid": loop[1],
        "command": loop[2],
    }
    orchestrator_job = {
        "name": "Orchestrate Pipeline",
        "job_key": "orchestrate-pipeline",
        "script": "orchestrate_pipeline.py",
        "description": "Ingests calls, queues STT jobs, triggers analytics",
        "running": orchestrator[0],
        "scheduled": loop[0] and not orchestrator[0],  # idle between loop invocations
        "pid": orchestrator[1],
        "command": orchestrator[2],
    }
    stt_job = {
        "name": "STT Worker (run.py)",
        "job_key": "stt-worker",
        "script": "run.py --worker",
        "description": "Consumes RabbitMQ jobs and calls Sarvam AI for transcription",
        "running": stt_worker[0],
        "pid": stt_worker[1],
        "command": stt_worker[2],
    }
    jobs = [loop_job, orchestrator_job, stt_job]

    queue_messages, queue_consumers, queue_error = _get_rabbitmq_queue_info("stt_jobs")
    queues = [
        {
            "name": "stt_jobs",
            "description": "STT transcription job queue (Sarvam AI)",
            "messages": queue_messages,
            "consumers": queue_consumers,
            "error": queue_error,
        }
    ]

    db_status: Dict[str, Any] = {}
    try:
        with db_handler.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT status, COUNT(*) as cnt FROM `1713_raw_calls` GROUP BY status ORDER BY status")
                total = 0
                by_status: Dict[str, int] = {}
                for row in cur.fetchall():
                    # Rows may be dicts or plain sequences depending on the cursor type.
                    if isinstance(row, dict):
                        raw_status, raw_count = row["status"], row["cnt"]
                    else:
                        raw_status, raw_count = row[0], row[1]
                    count = int(raw_count)
                    by_status[str(raw_status)] = count
                    total += count
                db_status = {"total": total, "by_status": by_status}
    except Exception as exc:
        db_status = {"error": str(exc)}

    base_dir = os.path.dirname(os.path.abspath(__file__))
    log_specs = (
        (
            "Orchestration Log",
            "orchestration.log",
            "orchestration",
            "All orchestration events — ingestion, queuing, analytics",
        ),
        (
            "Analytics Updates Log",
            "analytics_updates.log",
            "analytics_updates",
            "Per-call analytics success/failure records",
        ),
    )
    log_files = []
    for display_name, filename, log_key, description in log_specs:
        exists, size_str = _log_size_str(os.path.join(base_dir, filename))
        log_files.append({
            "name": display_name,
            "filename": filename,
            "log_key": log_key,
            "description": description,
            "exists": exists,
            "size": size_str,
        })

    return {
        "jobs": jobs,
        "queues": queues,
        "db_status": db_status,
        "logs": log_files,
        "timestamp": datetime.utcnow().isoformat() + "Z",
    }


@app.get("/pca/queue/{queue_name}/details")
def pca_queue_details(queue_name: str):
    """Return RabbitMQ stats for a known queue plus a breakdown of ``1713_raw_calls``.

    Only ``stt_jobs`` is accepted; any other name yields HTTP 404. A database
    failure replaces ``db_breakdown`` with ``{"error": ...}``; calls collected
    before the failure stay in ``queued_calls``.
    """
    if queue_name not in {"stt_jobs"}:
        raise HTTPException(status_code=404, detail="Unknown queue")

    # RabbitMQ stats
    msgs, consumers, q_err = _get_rabbitmq_queue_info(queue_name)

    # DB breakdown — all statuses for 1713_raw_calls
    db_breakdown: Dict[str, Any] = {}
    queued_calls: List[Dict] = []
    call_columns = ("callid", "agentname", "groupname", "call_starttime", "call_endtime", "fileurl")
    try:
        with db_handler.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT status, COUNT(*) as cnt FROM `1713_raw_calls` GROUP BY status ORDER BY status"
                )
                by_status: Dict[str, int] = {}
                total = 0
                for row in cur.fetchall():
                    # Rows may be dicts or plain sequences depending on the cursor type.
                    if isinstance(row, dict):
                        raw_status, raw_count = row["status"], row["cnt"]
                    else:
                        raw_status, raw_count = row[0], row[1]
                    count = int(raw_count)
                    by_status[str(raw_status)] = count
                    total += count
                db_breakdown = {"total": total, "by_status": by_status}

                # Calls currently queued (status=1) — pending transcription
                cur.execute(
                    "SELECT callid, agentname, groupname, call_starttime, call_endtime, fileurl "
                    "FROM `1713_raw_calls` WHERE status = 1 ORDER BY call_starttime DESC LIMIT 100"
                )
                for row in cur.fetchall():
                    if isinstance(row, dict):
                        entry = dict(row)
                    else:
                        entry = {column: row[position] for position, column in enumerate(call_columns)}
                    # Timestamps are stringified for the response; empty values become None.
                    for stamp in ("call_starttime", "call_endtime"):
                        entry[stamp] = str(entry[stamp]) if entry.get(stamp) else None
                    queued_calls.append(entry)
    except Exception as exc:
        db_breakdown = {"error": str(exc)}

    queue_stats = {"messages": msgs, "consumers": consumers, "error": q_err}
    return {
        "queue_name": queue_name,
        "queue": queue_stats,
        "db_breakdown": db_breakdown,
        "queued_calls": queued_calls,
        "timestamp": datetime.utcnow().isoformat() + "Z",
    }


@app.get("/pca/failures")
def pca_failures():
    """Return failed/stuck call records across all active BIDs + error lines from logs.

    For each BID in the hard-coded active list, three queries are run against
    ``{bid}_raw_calls``:

    * ``stt_failed`` - rows with status = -2 (latest 100 per BID)
    * ``invalid_url`` - rows with status = -1 (latest 100 per BID)
    * ``analytics_no_transcript`` - rows with status = 2 whose joined
      ``{bid}_sarvamresponse`` transcript is NULL or empty (latest 50 per BID)

    Returns:
        A dict with those three lists, ``log_errors`` (up to 100 matching
        lines per log file, taken from that file's last 2000 lines), a
        ``summary`` of the list lengths and a UTC ``timestamp``. If the
        database work fails, ``stt_failed`` is replaced by a single
        ``{"error": ...}`` entry.
    """
    from collections import deque  # local import: bounded tail read of log files

    base_dir = os.path.dirname(os.path.abspath(__file__))
    active_bids = ["1713", "7491"]
    # Column order shared by all three SELECT lists below.
    columns = ("callid", "agentname", "groupname", "call_starttime", "fileurl", "bid")

    def _rows_as_dicts(cur, sql: str) -> List[Dict]:
        """Execute *sql* and return its rows as dicts with call_starttime stringified."""
        cur.execute(sql)
        records: List[Dict] = []
        for row in cur.fetchall():
            # Handle both DictCursor rows and plain tuple rows; tuple rows used
            # to be collapsed into empty dicts.
            record = dict(row) if isinstance(row, dict) else dict(zip(columns, row))
            if record.get("call_starttime"):
                record["call_starttime"] = str(record["call_starttime"])
            records.append(record)
        return records

    stt_failed: List[Dict] = []
    invalid_url: List[Dict] = []
    analytics_no_transcript: List[Dict] = []

    try:
        with db_handler.get_connection() as conn:
            with conn.cursor() as cur:
                for bid in active_bids:
                    # bid comes from the hard-coded list above, so interpolating
                    # it into identifiers is safe here.
                    raw_table = f"`{bid}_raw_calls`"
                    resp_table = f"`{bid}_sarvamresponse`"

                    # status=-2: transient STT failures (rate limit / network error)
                    stt_failed.extend(_rows_as_dicts(
                        cur,
                        f"SELECT callid, agentname, groupname, call_starttime, fileurl, '{bid}' as bid "
                        f"FROM {raw_table} WHERE status = -2 ORDER BY call_starttime DESC LIMIT 100",
                    ))

                    # status=-1: permanently invalid URL (404)
                    invalid_url.extend(_rows_as_dicts(
                        cur,
                        f"SELECT callid, agentname, groupname, call_starttime, fileurl, '{bid}' as bid "
                        f"FROM {raw_table} WHERE status = -1 ORDER BY call_starttime DESC LIMIT 100",
                    ))

                    # status=2 with empty transcript (old RabbitMQ placeholder rows)
                    analytics_no_transcript.extend(_rows_as_dicts(
                        cur,
                        f"SELECT r.callid, r.agentname, r.groupname, r.call_starttime, r.fileurl, '{bid}' as bid "
                        f"FROM {raw_table} r "
                        f"LEFT JOIN {resp_table} s ON r.callid = s.callid "
                        f"WHERE r.status = 2 AND (s.transcript IS NULL OR s.transcript = '') "
                        f"ORDER BY r.call_starttime DESC LIMIT 50",
                    ))

    except Exception as exc:
        logger.warning("pca_failures: database query failed: %s", exc)
        # Response contract: the error is surfaced through the stt_failed list.
        stt_failed = [{"error": str(exc)}]

    # Error lines from logs
    log_errors: Dict[str, List[str]] = {}
    for log_key, filename in [("orchestration", "orchestration.log"), ("analytics_updates", "analytics_updates.log")]:
        path = os.path.join(base_dir, filename)
        try:
            # Keep only the last 2000 lines in memory instead of the whole file;
            # undecodable bytes are replaced rather than hiding every error line.
            with open(path, "r", errors="replace") as f:
                recent = deque(f, maxlen=2000)
            matching = [
                line.rstrip("\n") for line in recent
                if "ERROR" in line or "FAIL" in line or "WARNING" in line
            ]
            log_errors[log_key] = matching[-100:]
        except OSError:
            # Missing or unreadable log file: report no lines for this key.
            log_errors[log_key] = []

    return {
        "stt_failed": stt_failed,
        "invalid_url": invalid_url,
        "analytics_no_transcript": analytics_no_transcript,
        "log_errors": log_errors,
        "summary": {
            "stt_failed_count": len(stt_failed),
            "invalid_url_count": len(invalid_url),
            "no_transcript_count": len(analytics_no_transcript),
        },
        "timestamp": datetime.utcnow().isoformat() + "Z",
    }


@app.get("/pca/api-credits")
def pca_api_credits():
    """Check status and key validity of external API providers.

    Probes two providers and reports one entry per provider:

    * Sarvam AI - posts a tiny, deliberately malformed WAV to the
      speech-to-text endpoint and classifies the response.
    * AWS Bedrock - calls ``list_foundation_models`` with the configured
      credentials and classifies any exception by its message.

    Returns:
        ``{"providers": [...], "timestamp": ...}`` where every provider dict
        carries at least ``provider``, ``key_set``, ``status`` and
        ``status_label``.
    """
    import requests as _requests

    results: List[Dict[str, Any]] = []

    # ── Sarvam AI ────────────────────────────────────────────────────────────
    sarvam_key = os.getenv("SARVAM_SUBSCRIPTION_KEY", "")
    sarvam_info: Dict[str, Any] = {"provider": "Sarvam AI", "key_set": bool(sarvam_key)}
    if sarvam_key:
        try:
            # Probe with an intentionally invalid body — tells us if key is valid
            # without consuming credits
            r = _requests.post(
                "https://api.sarvam.ai/speech-to-text",
                headers={"api-subscription-key": sarvam_key},
                files={"file": ("probe.wav", b"RIFF\x00\x00\x00\x00WAVEfmt ", "audio/wav")},
                data={"model": "saarika:v2", "language_code": "hi-IN"},
                timeout=8,
            )
            body = r.json() if r.headers.get("content-type", "").startswith("application/json") else {}
            # The payload shape is not guaranteed: a list body or a string
            # "error" field must not turn a working key into status "error".
            error_obj = body.get("error") if isinstance(body, dict) else None
            err_code = error_obj.get("code", "") if isinstance(error_obj, dict) else ""
            if r.status_code == 401 or err_code in ("unauthorized", "invalid_api_key"):
                sarvam_info["status"] = "invalid_key"
                sarvam_info["status_label"] = "Invalid Key"
            elif err_code == "insufficient_quota_error":
                sarvam_info["status"] = "exhausted"
                sarvam_info["status_label"] = "Credits Exhausted"
            elif r.status_code in (200, 400, 422):
                # 400/422 = bad audio but key is accepted → key is valid & quota available
                sarvam_info["status"] = "active"
                sarvam_info["status_label"] = "Active"
            else:
                sarvam_info["status"] = "unknown"
                sarvam_info["status_label"] = f"Unknown (HTTP {r.status_code})"
        except Exception as exc:
            sarvam_info["status"] = "error"
            sarvam_info["status_label"] = f"Error: {exc}"
    else:
        sarvam_info["status"] = "not_configured"
        sarvam_info["status_label"] = "Not Configured"
    sarvam_info["dashboard_url"] = "https://dashboard.sarvam.ai"
    # NOTE(review): the probe above uses model "saarika:v2" while this label
    # reports "saaras:v2.5" — confirm which one the pipeline actually uses.
    sarvam_info["model"] = "saaras:v2.5 (batch STT + diarization)"
    results.append(sarvam_info)

    # ── AWS Bedrock (Nova) ───────────────────────────────────────────────────
    aws_key = os.getenv("AWS_ACCESS_KEY_ID", "")
    aws_secret = os.getenv("AWS_SECRET_ACCESS_KEY", "")
    aws_region = os.getenv("AWS_REGION", "us-east-1")
    nova_model = os.getenv("AWS_NOVA_MODEL", "amazon.nova-lite-v1:0")
    aws_info: Dict[str, Any] = {
        "provider": "AWS Bedrock",
        "key_set": bool(aws_key),
        "model": nova_model,
        "region": aws_region,
        "billing_type": "pay_per_use",
        "dashboard_url": "https://console.aws.amazon.com/billing/home",
    }
    if aws_key:
        try:
            import boto3
            # Minimal probe: list foundation models (cheap/free call).
            # Only the control-plane "bedrock" client is needed for this.
            bdc = boto3.client(
                "bedrock",
                region_name=aws_region,
                aws_access_key_id=aws_key,
                aws_secret_access_key=aws_secret,
            )
            bdc.list_foundation_models(byOutputModality="TEXT")
            aws_info["status"] = "active"
            aws_info["status_label"] = "Active"
        except Exception as exc:
            # Classification is by substring of the exception text.
            msg = str(exc)
            if "InvalidClientTokenId" in msg or "InvalidSignature" in msg or "AuthFailure" in msg:
                aws_info["status"] = "invalid_key"
                aws_info["status_label"] = "Invalid Credentials"
            elif "ExpiredTokenException" in msg:
                aws_info["status"] = "expired"
                aws_info["status_label"] = "Token Expired"
            elif "AccessDeniedException" in msg:
                aws_info["status"] = "access_denied"
                aws_info["status_label"] = "Access Denied"
            else:
                aws_info["status"] = "error"
                aws_info["status_label"] = f"Error: {msg[:120]}"
    else:
        aws_info["status"] = "not_configured"
        aws_info["status_label"] = "Not Configured"
    results.append(aws_info)

    return {"providers": results, "timestamp": datetime.utcnow().isoformat() + "Z"}


@app.get("/pca/logs/{log_key}")
def pca_log_tail(log_key: str, lines: int = Query(default=200, le=1000, ge=1),
                 filter: Optional[str] = Query(default=None)):
    """Return the last N lines of a pipeline log file.

    Args:
        log_key: Which log to read; must be one of the whitelisted keys
            ("orchestration", "analytics_updates").
        lines: Maximum number of (filtered) lines to return, 1-1000.
        filter: ``"errors"`` keeps lines containing ERROR or FAIL,
            ``"warnings"`` keeps lines containing WARN; any other value
            keeps every line.

    Returns:
        A dict with the tail lines plus ``total_lines`` (all lines in the
        file), ``filtered_total`` (lines matching the filter) and
        ``returned_lines``. If the file does not exist, an empty result with
        an ``error`` message is returned instead.

    Raises:
        HTTPException: 404 for an unknown ``log_key``; 500 if reading fails.
    """
    from collections import deque  # local import: bounded tail buffer

    allowed = {
        "orchestration": "orchestration.log",
        "analytics_updates": "analytics_updates.log",
    }
    if log_key not in allowed:
        raise HTTPException(status_code=404, detail="Unknown log file")

    base_dir = os.path.dirname(os.path.abspath(__file__))
    log_path = os.path.join(base_dir, allowed[log_key])

    if not os.path.exists(log_path):
        return {"log_key": log_key, "lines": [], "total_lines": 0, "error": "Log file not found"}

    # Substrings a line must contain to pass the filter; None means keep all.
    # "WARN" also matches every "WARNING" line.
    markers = {"errors": ("ERROR", "FAIL"), "warnings": ("WARN",)}.get(filter)

    try:
        total_lines = 0
        filtered_total = 0
        # Stream the file and keep only the last `lines` matches so memory use
        # does not grow with the size of the log.
        tail = deque(maxlen=lines)
        with open(log_path, "r", errors="replace") as f:
            for line in f:
                total_lines += 1
                if markers is None or any(marker in line for marker in markers):
                    filtered_total += 1
                    tail.append(line.rstrip("\n"))
        return {
            "log_key": log_key,
            "filename": allowed[log_key],
            "filter": filter,
            "lines": list(tail),
            "total_lines": total_lines,
            "filtered_total": filtered_total,
            "returned_lines": len(tail),
        }
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))


class PCAStartJobRequest(BaseModel):
    # Optional request body for POST /pca/jobs/{job_key}/start.
    # These values are only read when building the "orchestrate-pipeline"
    # command line; the other jobs use fixed commands.
    bid: str = "1713"  # forwarded as the value of --bid
    limit: int = 50  # forwarded as the value of --limit
    ignore_watermark: bool = False  # when true, --ignore-watermark is appended


@app.post("/pca/jobs/{job_key}/start")
def pca_start_job(job_key: str, body: PCAStartJobRequest = PCAStartJobRequest()):
    """Start a pipeline job as a detached background process.

    Args:
        job_key: One of "orchestrator-loop", "orchestrate-pipeline" or
            "stt-worker".
        body: Options for the "orchestrate-pipeline" job (bid, limit,
            ignore_watermark); ignored by the other jobs.

    Returns:
        ``{"started": True, "pid": ..., "command": ...}`` on success, or
        ``{"started": False, "reason": "already_running", "pid": ...}`` when a
        process matching the job's keyword is already running.

    Raises:
        HTTPException: 404 for an unknown job, 400 for a non-numeric bid,
            500 if the process cannot be spawned.

    NOTE(review): unlike the /pca/stt/* routes, this route declares no auth
    dependency in its signature — confirm access is restricted elsewhere.
    """
    base_dir = os.path.dirname(os.path.abspath(__file__))
    stt_dir = os.path.normpath(os.path.join(base_dir, "..", "call-proccessing", "stt_pipeline"))
    # Prefer the STT pipeline's virtualenv interpreter, fall back to PATH.
    venv_python = os.path.join(stt_dir, "venv", "bin", "python3")
    if not os.path.exists(venv_python):
        venv_python = "python3"

    _JOBS = {
        "orchestrator-loop": {
            "check_keyword": "orchestrator_loop.sh",
            "cmd": ["bash", "orchestrator_loop.sh"],
            "cwd": base_dir,
        },
        "orchestrate-pipeline": {
            "check_keyword": "orchestrate_pipeline.py",
            "cmd": None,  # built dynamically
            "cwd": base_dir,
        },
        "stt-worker": {
            "check_keyword": "run.py --worker",
            "cmd": [venv_python, "run.py", "--worker"],
            "cwd": stt_dir,
        },
    }

    if job_key not in _JOBS:
        raise HTTPException(status_code=404, detail=f"Unknown job: {job_key}")

    job_def = _JOBS[job_key]

    # Block if already running
    already_running, existing_pid, _ = _check_process(job_def["check_keyword"])
    if already_running:
        return {"started": False, "reason": "already_running", "pid": existing_pid}

    # Build dynamic command for orchestrate-pipeline
    if job_key == "orchestrate-pipeline":
        # bid is client-supplied and ends up in the child's argv, so accept
        # only plain ASCII digits (rules out option-like values such as "--x").
        bid = str(body.bid).strip()
        if not (bid.isascii() and bid.isdigit()):
            raise HTTPException(status_code=400, detail="bid must be a non-empty numeric string")
        cmd = ["python3", "orchestrate_pipeline.py", "--bid", bid, "--limit", str(body.limit)]
        if body.ignore_watermark:
            cmd.append("--ignore-watermark")
    else:
        cmd = job_def["cmd"]

    try:
        # start_new_session detaches the child from this process's session;
        # its output is discarded.
        proc = _subprocess.Popen(
            cmd,
            cwd=job_def["cwd"],
            stdout=_subprocess.DEVNULL,
            stderr=_subprocess.DEVNULL,
            start_new_session=True,
        )
        return {"started": True, "pid": proc.pid, "command": " ".join(cmd)}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Failed to start job: {exc}")


@app.get("/pca/stt/config")
def pca_get_stt_config(user: Dict[str, Any] = Depends(_auth_user)):
    """Return current STT key status (masked) for the master STT panel.

    The Sarvam key is read from the SARVAM_SUBSCRIPTION_KEY environment
    variable and, when set, probed with a deliberately malformed WAV upload
    to classify it as active / invalid / exhausted / unknown.

    Returns:
        A dict with ``key_masked``, ``key_set``, ``status``, ``status_label``,
        ``model`` and ``dashboard_url``.

    Raises:
        HTTPException: 403 if the authenticated user is not a master admin.
    """
    if not user.get("is_master"):
        raise HTTPException(status_code=403, detail="Master admin required")
    import requests as _requests
    key = os.getenv("SARVAM_SUBSCRIPTION_KEY", "")
    # Long keys show their first 8 and last 4 characters; shorter keys are
    # fully starred out.
    masked = (key[:8] + "…" + key[-4:]) if len(key) > 12 else ("*" * len(key) if key else "")
    status = "not_configured"
    status_label = "Not Configured"
    if key:
        try:
            r = _requests.post(
                "https://api.sarvam.ai/speech-to-text",
                headers={"api-subscription-key": key},
                files={"file": ("probe.wav", b"RIFF\x00\x00\x00\x00WAVEfmt ", "audio/wav")},
                data={"model": "saarika:v2", "language_code": "hi-IN"},
                timeout=8,
            )
            body = r.json() if r.headers.get("content-type", "").startswith("application/json") else {}
            # Tolerate non-dict bodies and non-dict "error" fields so that an
            # unexpected payload shape falls through to the HTTP-status checks
            # instead of being reported as a probe error.
            error_obj = body.get("error") if isinstance(body, dict) else None
            err_code = error_obj.get("code", "") if isinstance(error_obj, dict) else ""
            if r.status_code == 401 or err_code in ("unauthorized", "invalid_api_key"):
                status, status_label = "invalid_key", "Invalid Key"
            elif err_code == "insufficient_quota_error":
                status, status_label = "exhausted", "Credits Exhausted"
            elif r.status_code in (200, 400, 422):
                # A rejected probe body with an accepted key counts as active.
                status, status_label = "active", "Active"
            else:
                status, status_label = "unknown", f"Unknown (HTTP {r.status_code})"
        except Exception as exc:
            status, status_label = "error", f"Error: {exc}"
    return {"key_masked": masked, "key_set": bool(key), "status": status, "status_label": status_label,
            "model": "saaras:v2.5", "dashboard_url": "https://dashboard.sarvam.ai"}


@app.post("/pca/stt/update-key")
async def pca_update_stt_key(request: Request, user: Dict[str, Any] = Depends(_auth_user)):
    """Replace the Sarvam subscription key in .env and process environment.

    Expects a JSON object body with a string ``key``. The key is applied to
    ``os.environ`` first and then persisted to the ``.env`` file next to this
    module, but only if that file already exists. A persistence failure is
    logged and does not fail the request.

    Returns:
        ``{"updated": True}``.

    Raises:
        HTTPException: 403 if the user is not a master admin; 400 if the body
            is not valid JSON, ``key`` is missing/empty/not a string, or the
            key contains control characters or line breaks.
    """
    if not user.get("is_master"):
        raise HTTPException(status_code=403, detail="Master admin required")
    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from None
    raw_key = data.get("key") if isinstance(data, dict) else None
    new_key = raw_key.strip() if isinstance(raw_key, str) else ""
    if not new_key:
        raise HTTPException(status_code=400, detail="key is required")
    if not new_key.isprintable():
        # An embedded newline would inject extra entries into .env, and a NUL
        # byte cannot be stored in os.environ.
        raise HTTPException(status_code=400, detail="key must not contain control characters or line breaks")

    # Update live environment so the running process picks it up immediately
    os.environ["SARVAM_SUBSCRIPTION_KEY"] = new_key

    # Persist to .env file
    env_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env")
    key_line = f"SARVAM_SUBSCRIPTION_KEY={new_key}\n"
    try:
        if os.path.exists(env_path):
            with open(env_path, "r") as f:
                lines = f.readlines()
            found = False
            new_lines = []
            for line in lines:
                if line.startswith(("SARVAM_SUBSCRIPTION_KEY=", "SARVAM_SUBSCRIPTION_KEY =")):
                    new_lines.append(key_line)
                    found = True
                else:
                    new_lines.append(line)
            if not found:
                # If the file does not end with a newline, terminate its last
                # line first so the new entry is not glued onto it.
                if new_lines and not new_lines[-1].endswith("\n"):
                    new_lines[-1] += "\n"
                new_lines.append(key_line)
            with open(env_path, "w") as f:
                f.writelines(new_lines)
    except Exception as exc:
        # Key is already live in env — log but don't fail
        logger.warning("Could not persist Sarvam key to .env: %s", exc)

    auth_handler.log_activity(
        user_id=user.get("id"), username=user.get("username"),
        activity_type="update_stt_key",
        description="Updated Sarvam STT subscription key",
        ip_address=None, user_agent=None,
    )
    return {"updated": True}


@app.get("/pca/businesses-overview")
def pca_businesses_overview():
    """All businesses with pipeline stats for the master overview table.

    For every business returned by ``auth_handler.get_all_businesses()`` this
    reports whether a ``{bid}_raw_calls`` table exists, the per-status call
    counts, today's call count and whether a per-business
    ``orchestrator_loop_{bid}.sh`` process is running.

    Returns:
        ``{"businesses": [...], "global_jobs": {...}, "timestamp": ...}``.
        ``global_jobs`` reflects the shared processes (orchestrator loop,
        orchestrate pipeline, STT worker). On a database error the result is
        ``{"error": ..., "businesses": [], "global_jobs": {}}``.
    """
    businesses_list, status_code = auth_handler.get_all_businesses()
    if status_code != 200 or not isinstance(businesses_list, list):
        businesses_list = []

    # Global (not per-business) process checks.
    loop_running, _, _ = _check_process("orchestrator_loop.sh")
    stt_running, _, _ = _check_process("run.py --worker")
    if not stt_running:
        stt_running, _, _ = _check_process("rabbitmq_transcription_worker")
    orch_running, _, _ = _check_process("orchestrate_pipeline.py")

    result_businesses = []
    try:
        with db_handler.get_connection() as conn:
            with conn.cursor() as cur:
                for biz in businesses_list:
                    bid = str(biz["bid"])
                    # Use dedicated names here: reusing `loop_running` would
                    # overwrite the global value reported under "global_jobs".
                    biz_loop_running, biz_loop_pid, _ = _check_process(f"orchestrator_loop_{bid}.sh")
                    entry: Dict[str, Any] = {
                        "bid": bid,
                        "name": biz.get("name", f"Business {bid}"),
                        "is_active": bool(biz.get("is_active", True)),
                        "has_pipeline": False,
                        "loop_running": biz_loop_running,
                        "loop_pid": biz_loop_pid,
                        "total_calls": 0,
                        "by_status": {},
                        "calls_today": 0,
                    }
                    # bid comes from the businesses list above, not from the
                    # request, and is interpolated into the table name.
                    cur.execute(f"SHOW TABLES LIKE '{bid}_raw_calls'")
                    if cur.fetchone():
                        entry["has_pipeline"] = True
                        cur.execute(f"SELECT status, COUNT(*) as cnt FROM `{bid}_raw_calls` GROUP BY status ORDER BY status")
                        by_status: Dict[str, int] = {}
                        total = 0
                        for row in cur.fetchall():
                            # Rows may be dicts (DictCursor) or tuples.
                            r = dict(row) if isinstance(row, dict) else {"status": row[0], "cnt": row[1]}
                            count = int(r["cnt"])
                            by_status[str(r["status"])] = count
                            total += count
                        entry["by_status"] = by_status
                        entry["total_calls"] = total
                        cur.execute(f"SELECT COUNT(*) as cnt FROM `{bid}_raw_calls` WHERE DATE(call_starttime) = CURDATE()")
                        row = cur.fetchone()
                        entry["calls_today"] = int(row["cnt"] if isinstance(row, dict) else row[0]) if row else 0
                    result_businesses.append(entry)
    except Exception as exc:
        return {"error": str(exc), "businesses": [], "global_jobs": {}}

    return {
        "businesses": result_businesses,
        "global_jobs": {
            "loop_running": loop_running,
            "orch_running": orch_running,
            "stt_running": stt_running,
        },
        "timestamp": datetime.utcnow().isoformat() + "Z",
    }


@app.get("/pca/bid/{bid}/stats")
def pca_bid_stats(bid: str):
    """Per-BID detailed stats: status breakdown, calls per day/hour, transcription minutes.

    Args:
        bid: Business ID from the URL path. Must consist of ASCII digits
            because it is interpolated into table names.

    Returns:
        A dict with ``bid`` and, when the ``{bid}_raw_calls`` table exists,
        ``by_status``, ``total``, ``calls_per_day`` (last 30 days),
        ``calls_per_hour`` (today), ``calls_today`` and ``total_minutes``
        (sum of ``duration`` in ``{bid}_sarvamresponse`` divided by 60, or 0.0
        if that table is absent). Problems are reported through an ``error``
        key rather than an HTTP error status.
    """
    # bid arrives from the URL and is spliced into SQL identifiers below, so
    # reject anything that is not a plain number before touching the database.
    # The existing "error" key contract is kept for this case.
    if not (bid.isascii() and bid.isdigit()):
        return {"bid": bid, "error": "bid must be a non-empty numeric string"}

    def _col(row, key: str, index: int):
        """Read one column from a DictCursor row or a plain tuple row."""
        return row[key] if isinstance(row, dict) else row[index]

    result: Dict[str, Any] = {"bid": bid}
    try:
        with db_handler.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(f"SHOW TABLES LIKE '{bid}_raw_calls'")
                if not cur.fetchone():
                    return {"bid": bid, "error": f"No raw_calls table for BID {bid}"}

                raw_table = f"`{bid}_raw_calls`"

                # Status breakdown
                cur.execute(f"SELECT status, COUNT(*) as cnt FROM {raw_table} GROUP BY status ORDER BY status")
                by_status: Dict[str, int] = {}
                total = 0
                for row in cur.fetchall():
                    count = int(_col(row, "cnt", 1))
                    by_status[str(_col(row, "status", 0))] = count
                    total += count
                result["by_status"] = by_status
                result["total"] = total

                # Calls per day (last 30 days)
                cur.execute(f"""
                    SELECT DATE(call_starttime) as day, COUNT(*) as cnt
                    FROM {raw_table}
                    WHERE call_starttime >= DATE_SUB(CURDATE(), INTERVAL 30 DAY)
                    GROUP BY DATE(call_starttime)
                    ORDER BY day
                """)
                result["calls_per_day"] = [
                    {"date": str(_col(r, "day", 0)), "count": int(_col(r, "cnt", 1))}
                    for r in cur.fetchall()
                ]

                # Calls per hour (today)
                cur.execute(f"""
                    SELECT HOUR(call_starttime) as hr, COUNT(*) as cnt
                    FROM {raw_table}
                    WHERE DATE(call_starttime) = CURDATE()
                    GROUP BY HOUR(call_starttime)
                    ORDER BY hr
                """)
                result["calls_per_hour"] = [
                    {"hour": int(_col(r, "hr", 0)), "count": int(_col(r, "cnt", 1))}
                    for r in cur.fetchall()
                ]

                # Calls today
                cur.execute(f"SELECT COUNT(*) as cnt FROM {raw_table} WHERE DATE(call_starttime) = CURDATE()")
                row = cur.fetchone()
                result["calls_today"] = int(_col(row, "cnt", 0)) if row else 0

                # Total transcription minutes from sarvam response
                cur.execute(f"SHOW TABLES LIKE '{bid}_sarvamresponse'")
                if cur.fetchone():
                    cur.execute(f"SELECT COALESCE(SUM(duration), 0) / 60 as total_minutes FROM `{bid}_sarvamresponse`")
                    row = cur.fetchone()
                    mins = float(_col(row, "total_minutes", 0)) if row else 0.0
                    result["total_minutes"] = round(mins, 1)
                else:
                    result["total_minutes"] = 0.0

    except Exception as exc:
        # Keep whatever was collected so far and attach the error message.
        result["error"] = str(exc)

    return result


# ── Webhook: Call Ingest ──────────────────────────────────────────────────────
#
# POST /webhook/call-ingest
#   Accepts a call record pushed by any telephony platform.
#   Inserts into {bid}_raw_calls (status=0) and optionally queues for STT
#   (status=1) via the stt_jobs RabbitMQ queue.
#
# Authentication: X-Webhook-Secret header must match WEBHOOK_SECRET env var.
# Idempotency:    callid is a UNIQUE key — duplicate pushes do an UPDATE.
# ─────────────────────────────────────────────────────────────────────────────

# Shared secret compared against the X-Webhook-Secret request header.
# Read once at import time, so a later change to the environment variable is
# not picked up by this constant. An empty value makes POST
# /webhook/call-ingest answer 503 ("Webhook not configured").
_WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET", "")

# DDL template for the per-business raw calls table. The literal "{bid}"
# placeholder is substituted with str.replace() in _ensure_raw_calls_table
# (not with str.format), so other braces in the statement need no escaping.
# callid is UNIQUE; the meaning of each status value is listed in the column's
# COMMENT below.
_RAW_CALLS_DDL = """
CREATE TABLE IF NOT EXISTS `{bid}_raw_calls` (
    id                      INT AUTO_INCREMENT PRIMARY KEY,
    bid                     INT NOT NULL,
    callid                  VARCHAR(100) NOT NULL,
    fileurl                 TEXT,
    status                  INT DEFAULT 0 COMMENT '0=ingested,1=queued,2=transcribed,3=analyzed,-1=invalid_url,-2=stt_failed',
    agentname               VARCHAR(100),
    groupname               VARCHAR(255),
    call_starttime          DATETIME,
    call_endtime            DATETIME,
    call_status             VARCHAR(50),
    agent_callinfo          VARCHAR(100),
    customer_callinfo       VARCHAR(50),
    direction               VARCHAR(20) DEFAULT 'inbound',
    duration_seconds        INT,
    extra_fields            JSON,
    webhook_source          VARCHAR(100),
    transcription_requested TINYINT(1) DEFAULT 0,
    transcription_status    VARCHAR(20) DEFAULT 'not_requested',
    selected_for_processing TINYINT(1) DEFAULT 0,
    synced_at               DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at              DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    UNIQUE KEY uq_callid (callid),
    INDEX idx_bid (bid),
    INDEX idx_status (status),
    INDEX idx_agentname (agentname),
    INDEX idx_call_starttime (call_starttime),
    INDEX idx_call_status (call_status),
    INDEX idx_direction (direction),
    INDEX idx_transcription_status (transcription_status),
    INDEX idx_selected_for_processing (selected_for_processing)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
"""

# Webhook field schema — human-readable description of the call-ingest payload.
# Returned verbatim under "fields" by GET /webhook/call-ingest/schema.
# NOTE(review): request validation itself is driven by the
# WebhookCallIngestRequest model, which declares the same field names
# separately — keep the two in sync when adding or renaming a field.
WEBHOOK_FIELD_SCHEMA = [
    {"field": "bid",              "type": "string",   "required": True,  "description": "Business ID (numeric string). Routes to {bid}_raw_calls table."},
    {"field": "call_id",          "type": "string",   "required": True,  "description": "Unique call identifier from the telephony platform."},
    {"field": "recording_url",    "type": "string",   "required": True,  "description": "Full URL to the call recording audio file."},
    {"field": "start_time",       "type": "datetime", "required": True,  "description": "Call start time (YYYY-MM-DD HH:MM:SS or ISO-8601)."},
    {"field": "end_time",         "type": "datetime", "required": False, "description": "Call end time (YYYY-MM-DD HH:MM:SS or ISO-8601)."},
    {"field": "agent_name",       "type": "string",   "required": False, "description": "Name of the agent who handled the call."},
    {"field": "agent_group",      "type": "string",   "required": False, "description": "Team/group the agent belongs to."},
    {"field": "agent_phone",      "type": "string",   "required": False, "description": "Agent's phone number (callfrom)."},
    {"field": "customer_phone",   "type": "string",   "required": False, "description": "Customer's phone number (callto)."},
    {"field": "call_status",      "type": "string",   "required": False, "description": "Telephony dial status (e.g. ANSWER, NO ANSWER, BUSY). Default: ANSWER."},
    {"field": "direction",        "type": "string",   "required": False, "description": "Call direction: inbound or outbound. Default: inbound."},
    {"field": "duration_seconds", "type": "integer",  "required": False, "description": "Total call duration in seconds."},
    {"field": "extra_fields",     "type": "object",   "required": False, "description": "Any additional key-value data to store as JSON alongside the call."},
    {"field": "auto_queue",       "type": "boolean",  "required": False, "description": "If true, immediately publish to STT queue (status→1). Default: false."},
    {"field": "source",           "type": "string",   "required": False, "description": "Label identifying the sending system (e.g. mcube, exotel, ozonetel)."},
]


def _ensure_raw_calls_table(conn, bid: str) -> None:
    """Create {bid}_raw_calls if it doesn't exist; add new columns to existing tables.

    Args:
        conn: Open database connection whose ``cursor()`` is a context
            manager using ``%s`` placeholders.
        bid: Business ID; must consist of ASCII digits because it is
            interpolated into the table name.

    Raises:
        ValueError: If ``bid`` is not purely numeric.
    """
    bid = str(bid)
    if not (bid.isascii() and bid.isdigit()):
        # Defence in depth: identifiers cannot be bound as query parameters.
        raise ValueError(f"bid must be a numeric string, got {bid!r}")

    table_name = f"{bid}_raw_calls"
    ddl = _RAW_CALLS_DDL.replace("{bid}", bid)
    # (column, definition, preferred predecessor) for columns that tables
    # created before the webhook feature may lack.
    new_cols = [
        ("duration_seconds", "INT",          "direction"),
        ("extra_fields",     "JSON",         "duration_seconds"),
        ("webhook_source",   "VARCHAR(100)", "extra_fields"),
    ]
    with conn.cursor() as cur:
        cur.execute(ddl)
        # MySQL 8.0 does not support ADD COLUMN IF NOT EXISTS, so we check first.
        # DATABASE() is the schema the unqualified CREATE TABLE above used, so
        # the lookup cannot drift from the connection's actual database.
        cur.execute(
            "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s",
            (table_name,),
        )
        # Rows may be dicts (DictCursor) or plain tuples.
        existing_cols = {
            row["COLUMN_NAME"] if isinstance(row, dict) else row[0]
            for row in cur.fetchall()
        }
        for col_name, col_def, after_col in new_cols:
            if col_name in existing_cols:
                continue
            # Only position the column when its anchor exists; otherwise let
            # it be appended so the ALTER does not fail on older layouts.
            position = f" AFTER `{after_col}`" if after_col in existing_cols else ""
            cur.execute(
                f"ALTER TABLE `{table_name}` ADD COLUMN `{col_name}` {col_def}{position}"
            )
            existing_cols.add(col_name)


def _parse_datetime(value: Any) -> Optional[str]:
    """Parse a datetime value to MySQL-compatible string or None."""
    if value is None:
        return None
    if isinstance(value, str):
        value = value.strip()
        if not value:
            return None
        # ISO-8601 with T separator → normalize to space
        value = value.replace("T", " ")
        # Trim timezone if present
        if "+" in value:
            value = value[: value.index("+")]
        elif value.endswith("Z"):
            value = value[:-1]
        return value[:19]  # keep YYYY-MM-DD HH:MM:SS
    if isinstance(value, datetime):
        return value.strftime("%Y-%m-%d %H:%M:%S")
    return str(value)


def _publish_to_stt_queue(bid: str, call_id: str, recording_url: str) -> bool:
    """Publish a job to the RabbitMQ stt_jobs queue. Returns True on success.

    The host and queue name come from the RABBITMQ_HOST / RABBITMQ_QUEUE
    environment variables (defaults "localhost" / "stt_jobs"). The queue is
    declared durable and the message is published with delivery_mode=2.

    Args:
        bid: Business ID placed in the job payload.
        call_id: Call identifier placed in the job payload.
        recording_url: Recording URL placed in the job payload.

    Returns:
        True if the message was published; False on any failure (including
        ``pika`` not being importable), which is logged as a warning.
    """
    try:
        rmq_host = os.getenv("RABBITMQ_HOST", "localhost")
        rmq_queue = os.getenv("RABBITMQ_QUEUE", "stt_jobs")
        import pika as _pika
        rmq_conn = _pika.BlockingConnection(_pika.ConnectionParameters(host=rmq_host, socket_timeout=5))
        try:
            channel = rmq_conn.channel()
            channel.queue_declare(queue=rmq_queue, durable=True)
            payload = json.dumps({"bid": bid, "call_id": call_id, "recording_url": recording_url})
            channel.basic_publish(
                exchange="",
                routing_key=rmq_queue,
                body=payload,
                properties=_pika.BasicProperties(delivery_mode=2),
            )
        finally:
            # Always release the connection, even when declare/publish raised;
            # skip close() on an already-closed connection so it cannot mask
            # the original error.
            if rmq_conn.is_open:
                rmq_conn.close()
        return True
    except Exception as _exc:
        logger.warning("Webhook: failed to publish to RabbitMQ for %s/%s: %s", bid, call_id, _exc)
        return False


class WebhookCallIngestRequest(BaseModel):
    # JSON body accepted by POST /webhook/call-ingest. Field descriptions are
    # published via WEBHOOK_FIELD_SCHEMA.
    bid: str  # business ID; the handler requires a numeric string
    call_id: str  # unique call identifier from the telephony platform
    recording_url: str  # URL of the call recording
    start_time: str  # normalised by _parse_datetime in the handler
    end_time: Optional[str] = None  # normalised by _parse_datetime in the handler
    agent_name: Optional[str] = None
    agent_group: Optional[str] = None
    agent_phone: Optional[str] = None
    customer_phone: Optional[str] = None
    call_status: Optional[str] = "ANSWER"
    direction: Optional[str] = "inbound"
    duration_seconds: Optional[int] = None
    extra_fields: Optional[Dict[str, Any]] = None  # serialised with json.dumps when non-empty
    auto_queue: bool = False  # when true the record starts with status 1 instead of 0
    source: Optional[str] = None


@app.get("/webhook/call-ingest/schema")
def webhook_call_ingest_schema():
    """Describe the call-ingest webhook.

    Returns a static document containing the endpoint, the authentication
    header, a prose description, the accepted fields
    (``WEBHOOK_FIELD_SCHEMA``), the meaning of each status value and an
    example request payload.
    """
    summary = (
        "Push a call record into PCAA. The record is inserted into {bid}_raw_calls "
        "(status=0 ingested). If auto_queue=true it is immediately published to the "
        "stt_jobs RabbitMQ queue (status=1) to trigger transcription and analytics."
    )

    # Keys are the status column values as strings.
    lifecycle = {
        "0": "Ingested — record stored, not yet queued",
        "1": "Queued — published to stt_jobs queue, awaiting STT worker",
        "2": "Transcribed — STT worker finished",
        "3": "Analyzed — analytics pipeline finished",
        "-1": "Invalid URL — recording file not reachable",
        "-2": "STT Failed — Sarvam/Deepgram returned an error",
    }

    sample_body = {
        "bid": "1713",
        "call_id": "C9876543",
        "recording_url": "https://recordings.mcube.com/mcubefiles112/classic/2026/03/1713/inbound/call_9876543.mp3",
        "start_time": "2026-03-17 10:30:00",
        "end_time": "2026-03-17 10:35:00",
        "agent_name": "Rahul Sharma",
        "agent_group": "Presales Team",
        "agent_phone": "9876543210",
        "customer_phone": "9123456789",
        "call_status": "ANSWER",
        "direction": "inbound",
        "duration_seconds": 300,
        "auto_queue": True,
        "source": "mcube",
    }

    document = {
        "endpoint": "POST /webhook/call-ingest",
        "authentication": "X-Webhook-Secret header (matches WEBHOOK_SECRET env var)",
        "description": summary,
        "fields": WEBHOOK_FIELD_SCHEMA,
        "status_lifecycle": lifecycle,
        "example_payload": sample_body,
    }
    return document


@app.post("/webhook/call-ingest")
def webhook_call_ingest(
    body: WebhookCallIngestRequest,
    x_webhook_secret: Optional[str] = Header(default=None),
):
    """
    Ingest a single call record pushed by any telephony platform.

    - Validates the webhook secret (constant-time comparison).
    - Ensures {bid}_raw_calls table exists.
    - Inserts/updates the record (idempotent on callid).
    - If auto_queue=true, publishes to stt_jobs queue immediately.

    Args:
        body: The call record to ingest.
        x_webhook_secret: Value of the X-Webhook-Secret request header.

    Returns:
        A dict with ``success``, ``action`` ("inserted" or "updated"), ``bid``,
        ``call_id``, ``status``, ``status_label``, ``queued_for_stt`` and
        ``timestamp``; a ``warning`` key is added when queue publishing failed.

    Raises:
        HTTPException: 503 if no webhook secret is configured, 401 on a secret
            mismatch, 400 on invalid input, 500 on a database error.
    """
    # Function-scope import so this block does not depend on the file's import header.
    import hmac

    # ── Auth ─────────────────────────────────────────────────────────────────
    if not _WEBHOOK_SECRET:
        raise HTTPException(status_code=503, detail="Webhook not configured (WEBHOOK_SECRET not set)")
    # compare_digest avoids leaking how many leading characters matched via timing.
    # Both sides are encoded to bytes because compare_digest rejects non-ASCII str.
    if not isinstance(x_webhook_secret, str) or not hmac.compare_digest(
        x_webhook_secret.encode("utf-8"),
        str(_WEBHOOK_SECRET).encode("utf-8"),
    ):
        raise HTTPException(status_code=401, detail="Invalid webhook secret")

    # ── Basic validation ─────────────────────────────────────────────────────
    bid = str(body.bid).strip()
    call_id = str(body.call_id).strip()
    recording_url = (body.recording_url or "").strip()

    # bid is interpolated into the table name below, so it must be purely numeric.
    if not bid or not bid.isdigit():
        raise HTTPException(status_code=400, detail="bid must be a non-empty numeric string")
    if not call_id:
        raise HTTPException(status_code=400, detail="call_id is required")
    if not recording_url:
        raise HTTPException(status_code=400, detail="recording_url is required")

    start_time = _parse_datetime(body.start_time)
    if not start_time:
        raise HTTPException(status_code=400, detail="start_time is required and must be a valid datetime")

    end_time = _parse_datetime(body.end_time)
    extra_json = json.dumps(body.extra_fields) if body.extra_fields else None

    # ── DB insert ─────────────────────────────────────────────────────────────
    try:
        with db_handler.get_connection() as conn:
            _ensure_raw_calls_table(conn, bid)

            initial_status = 1 if body.auto_queue else 0

            # The ON DUPLICATE KEY branch refreshes call metadata only; it never
            # touches status / transcription_* / selected_for_processing, so an
            # existing row keeps whatever pipeline state it already reached.
            upsert_sql = f"""
                INSERT INTO `{bid}_raw_calls`
                    (bid, callid, fileurl, status, agentname, groupname,
                     call_starttime, call_endtime, call_status,
                     agent_callinfo, customer_callinfo, direction,
                     duration_seconds, extra_fields, webhook_source,
                     transcription_requested, transcription_status, selected_for_processing)
                VALUES
                    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    fileurl              = VALUES(fileurl),
                    agentname            = VALUES(agentname),
                    groupname            = VALUES(groupname),
                    call_starttime       = VALUES(call_starttime),
                    call_endtime         = VALUES(call_endtime),
                    call_status          = VALUES(call_status),
                    agent_callinfo       = VALUES(agent_callinfo),
                    customer_callinfo    = VALUES(customer_callinfo),
                    direction            = VALUES(direction),
                    duration_seconds     = VALUES(duration_seconds),
                    extra_fields         = VALUES(extra_fields),
                    webhook_source       = VALUES(webhook_source),
                    updated_at           = CURRENT_TIMESTAMP
            """
            with conn.cursor() as cur:
                cur.execute(upsert_sql, (
                    int(bid),
                    call_id,
                    recording_url,
                    initial_status,
                    body.agent_name or "",
                    body.agent_group or "",
                    start_time,
                    end_time,
                    body.call_status or "ANSWER",
                    body.agent_phone or "",
                    body.customer_phone or "",
                    body.direction or "inbound",
                    body.duration_seconds,
                    extra_json,
                    body.source or "",
                    1 if body.auto_queue else 0,
                    "queued" if body.auto_queue else "not_requested",
                    1 if body.auto_queue else 0,
                ))
                row_count = cur.rowcount  # 1=inserted, 2=updated, 0=no change

    except HTTPException:
        raise
    except Exception as exc:
        # logger.exception keeps the traceback; the client-facing detail is unchanged.
        logger.exception("Webhook call-ingest DB error bid=%s call_id=%s: %s", bid, call_id, exc)
        raise HTTPException(status_code=500, detail=f"Database error: {exc}") from exc

    # Only rowcount == 1 means a brand-new row; 0 and 2 both mean the row already existed.
    newly_inserted = row_count == 1

    # ── RabbitMQ queue ────────────────────────────────────────────────────────
    queued = False
    queue_error = None
    if body.auto_queue:
        queued = _publish_to_stt_queue(bid, call_id, recording_url)
        if not queued and newly_inserted:
            # Record was inserted with status=1 but queue failed — roll back to 0
            queue_error = "RabbitMQ publish failed; record stored at status=0"
            try:
                with db_handler.get_connection() as conn:
                    with conn.cursor() as cur:
                        cur.execute(
                            f"UPDATE `{bid}_raw_calls` SET status=0, transcription_requested=0, transcription_status='not_requested', selected_for_processing=0 WHERE callid=%s",
                            (call_id,),
                        )
            except Exception:
                # Best-effort rollback: keep the request successful, but make the
                # failure visible in logs and in the warning instead of hiding it.
                logger.exception(
                    "Webhook call-ingest rollback failed bid=%s call_id=%s", bid, call_id
                )
                queue_error = "RabbitMQ publish failed; rollback to status=0 also failed"
        elif not queued:
            # The upsert left an existing row's processing columns alone, so
            # resetting them here would wipe state it has already reached.
            queue_error = "RabbitMQ publish failed; existing record's processing status left unchanged"

    action = "inserted" if newly_inserted else "updated"
    # queued can only be True when auto_queue was requested.
    # NOTE(review): for an updated row this reflects the request outcome, not
    # necessarily the stored status column — confirm whether callers rely on it.
    final_status = 1 if queued else 0
    status_label = {0: "ingested", 1: "queued_for_stt", 2: "transcribed", 3: "analyzed"}.get(final_status, str(final_status))

    logger.info(
        "Webhook call-ingest: bid=%s call_id=%s action=%s status=%s queued=%s",
        bid, call_id, action, final_status, queued,
    )

    response: Dict[str, Any] = {
        "success": True,
        "action": action,
        "bid": bid,
        "call_id": call_id,
        "status": final_status,
        "status_label": status_label,
        "queued_for_stt": queued,
        "timestamp": datetime.utcnow().isoformat() + "Z",
    }
    if queue_error:
        response["warning"] = queue_error
    return response


@app.post("/webhook/call-ingest/batch")
def webhook_call_ingest_batch(
    payload: Dict[str, Any],
    x_webhook_secret: Optional[str] = Header(default=None),
):
    """
    Batch-ingest multiple call records in one request.
    Body: { "calls": [ <WebhookCallIngestRequest>, ... ] }
    Each call is processed independently; partial success is returned.

    Args:
        payload: Request body; must contain a non-empty ``calls`` list of at
            most 500 items.
        x_webhook_secret: Value of the X-Webhook-Secret request header.

    Returns:
        A dict with ``total``, ``succeeded``, ``failed``, the per-call
        ``results`` list and a ``timestamp``.

    Raises:
        HTTPException: 503 if no webhook secret is configured, 401 on a secret
            mismatch, 400 if ``calls`` is missing, empty, not a list or too long.
    """
    # Function-scope import so this block does not depend on the file's import header.
    import hmac

    if not _WEBHOOK_SECRET:
        raise HTTPException(status_code=503, detail="Webhook not configured")
    # Constant-time comparison; bytes are used because compare_digest rejects non-ASCII str.
    if not isinstance(x_webhook_secret, str) or not hmac.compare_digest(
        x_webhook_secret.encode("utf-8"),
        str(_WEBHOOK_SECRET).encode("utf-8"),
    ):
        raise HTTPException(status_code=401, detail="Invalid webhook secret")

    calls_raw = payload.get("calls", [])
    if not isinstance(calls_raw, list) or not calls_raw:
        raise HTTPException(status_code=400, detail="calls must be a non-empty list")
    if len(calls_raw) > 500:
        raise HTTPException(status_code=400, detail="Maximum 500 calls per batch request")

    results = []
    for raw in calls_raw:
        # A non-object element (string, number, null, list) cannot be unpacked
        # into the request model; report it as a per-item failure so one bad
        # element does not abort the whole batch.
        if not isinstance(raw, dict):
            results.append({"call_id": "?", "success": False, "error": "each call must be a JSON object"})
            continue

        # Resolved up front so the error branches never touch `raw` again.
        reported_id = raw.get("call_id", "?")
        try:
            item = WebhookCallIngestRequest(**raw)
            # Re-use single-ingest logic by calling it inline
            result = webhook_call_ingest(item, x_webhook_secret=x_webhook_secret)
            results.append({"call_id": item.call_id, "success": True, **result})
        except HTTPException as exc:
            results.append({"call_id": reported_id, "success": False, "error": exc.detail})
        except Exception as exc:
            # Includes model validation errors; kept broad so partial success holds.
            results.append({"call_id": reported_id, "success": False, "error": str(exc)})

    succeeded = sum(1 for r in results if r.get("success"))
    return {
        "total": len(results),
        "succeeded": succeeded,
        "failed": len(results) - succeeded,
        "results": results,
        "timestamp": datetime.utcnow().isoformat() + "Z",
    }


# ── CSV Export Endpoints ────────────────────────────────────────────────────

@app.get("/export/{bid}/transcriptions")
def export_transcriptions(
    bid: str,
    groupname: Optional[str] = Query(default=None),
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
    direction: Optional[str] = Query(default=None),
    call_status: Optional[str] = Query(default=None),
):
    """Stream a CSV of all transcribed calls with their transcripts.

    Args:
        bid: Numeric business id; selects the ``{bid}_raw_calls`` and
            ``{bid}_sarvamresponse`` tables.
        groupname: Optional exact-match filter on ``groupname``.
        date_from: Optional lower bound for ``call_starttime``.
        date_to: Optional upper bound; " 23:59:59" is appended to it.
        direction: Optional exact-match filter on ``direction``.
        call_status: Optional filter on ``call_status`` (upper-cased first).

    Returns:
        A ``StreamingResponse`` carrying the CSV as an attachment.

    Raises:
        HTTPException: 400 if ``bid`` is not a numeric string.
    """
    # bid comes from the URL and is interpolated into table names and the
    # download filename, so it must be restricted to ASCII digits.
    bid = bid.strip()
    if not (bid.isascii() and bid.isdigit()):
        raise HTTPException(status_code=400, detail="bid must be a non-empty numeric string")

    raw_table = f"`{bid}_raw_calls`"
    sarvam_table = f"`{bid}_sarvamresponse`"

    conditions = ["s.transcript IS NOT NULL AND s.transcript != ''"]
    params: list = []

    if groupname:
        conditions.append("r.groupname = %s")
        params.append(groupname)
    if direction:
        conditions.append("r.direction = %s")
        params.append(direction)
    if call_status:
        conditions.append("r.call_status = %s")
        params.append(call_status.upper())
    if date_from:
        conditions.append("r.call_starttime >= %s")
        params.append(date_from)
    if date_to:
        conditions.append("r.call_starttime <= %s")
        params.append(date_to + " 23:59:59")

    where = "WHERE " + " AND ".join(conditions)

    sql = f"""
        SELECT
            r.callid,
            r.agentname,
            r.direction,
            r.call_status,
            r.call_starttime,
            r.groupname,
            s.duration,
            s.language,
            s.num_speakers,
            s.transcript
        FROM {raw_table} r
        JOIN {sarvam_table} s ON r.callid = s.callid
        {where}
        ORDER BY r.call_starttime DESC
    """

    def generate():
        output = StringIO()
        writer = csv.writer(output)

        def drain() -> str:
            # Hand back what the writer buffered and reset the buffer so each
            # yielded chunk holds exactly one CSV row.
            chunk = output.getvalue()
            output.seek(0)
            output.truncate(0)
            return chunk

        writer.writerow([
            "Call ID", "Agent", "Direction", "Call Status", "Date/Time",
            "Location/Group", "Duration (s)", "Language", "Num Speakers", "Transcript"
        ])
        yield drain()

        # The query runs lazily, once the response starts being consumed.
        with db_handler.get_connection() as conn:
            with conn.cursor(pymysql.cursors.DictCursor) as cur:
                cur.execute(sql, params)
                for row in cur:
                    started = row.get("call_starttime")
                    writer.writerow([
                        row.get("callid", ""),
                        row.get("agentname", ""),
                        row.get("direction", ""),
                        row.get("call_status", ""),
                        # A NULL start time becomes an empty cell, not the text "None".
                        str(started) if started is not None else "",
                        row.get("groupname", ""),
                        row.get("duration", ""),
                        row.get("language", ""),
                        row.get("num_speakers", ""),
                        # Newlines are flattened so each call stays on one physical line.
                        (row.get("transcript") or "").replace("\n", " "),
                    ])
                    yield drain()

    filename = f"transcriptions_{bid}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.csv"
    return StreamingResponse(
        generate(),
        media_type="text/csv",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )


@app.get("/export/{bid}/quality")
def export_quality(
    bid: str,
    groupname: Optional[str] = Query(default=None),
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
    direction: Optional[str] = Query(default=None),
    call_status: Optional[str] = Query(default=None),
):
    """Stream a CSV of all analyzed calls with quality scores and per-parameter breakdown.

    Args:
        bid: Numeric business id; selects the ``{bid}_raw_calls`` and
            ``{bid}_callanalytics`` tables.
        groupname: Optional exact-match filter on ``groupname``.
        date_from: Optional lower bound for ``call_starttime``.
        date_to: Optional upper bound; " 23:59:59" is appended to it.
        direction: Optional exact-match filter on ``direction``.
        call_status: Optional filter on ``call_status`` (upper-cased first).

    Returns:
        A ``StreamingResponse`` carrying the CSV as an attachment. After the
        fixed columns, every parameter name found in any row's
        ``parameter_scores`` contributes three columns (Score, Max, Reasoning).

    Raises:
        HTTPException: 400 if ``bid`` is not a numeric string.
    """
    # bid comes from the URL and is interpolated into table names and the
    # download filename, so it must be restricted to ASCII digits.
    bid = bid.strip()
    if not (bid.isascii() and bid.isdigit()):
        raise HTTPException(status_code=400, detail="bid must be a non-empty numeric string")

    raw_table = f"`{bid}_raw_calls`"
    analytics_table = f"`{bid}_callanalytics`"

    conditions = ["a.quality_score IS NOT NULL"]
    params: list = []

    if groupname:
        conditions.append("r.groupname = %s")
        params.append(groupname)
    if direction:
        conditions.append("r.direction = %s")
        params.append(direction)
    if call_status:
        conditions.append("r.call_status = %s")
        params.append(call_status.upper())
    if date_from:
        conditions.append("r.call_starttime >= %s")
        params.append(date_from)
    if date_to:
        conditions.append("r.call_starttime <= %s")
        params.append(date_to + " 23:59:59")

    where = "WHERE " + " AND ".join(conditions)

    sql = f"""
        SELECT
            r.callid,
            r.agentname,
            r.direction,
            r.call_status,
            r.call_starttime,
            r.groupname,
            a.quality_score,
            a.total_possible_score,
            a.sentiment,
            a.call_purpose,
            a.summary,
            a.talk_listen_ratio,
            a.agent_speak_percentage,
            a.customer_speak_percentage,
            a.dead_air_percentage,
            a.parameter_scores
        FROM {raw_table} r
        JOIN {analytics_table} a ON r.callid = a.callid
        {where}
        ORDER BY r.call_starttime DESC
    """

    # Collect all rows first so we can determine parameter column names
    rows = []
    # Dict used as an insertion-ordered set: first-seen order defines column order.
    param_order: Dict[str, None] = {}

    with db_handler.get_connection() as conn:
        with conn.cursor(pymysql.cursors.DictCursor) as cur:
            cur.execute(sql, params)
            for row in cur:
                ps_raw = row.get("parameter_scores")
                parsed: Any = None
                if ps_raw:
                    if isinstance(ps_raw, (str, bytes, bytearray)):
                        try:
                            parsed = json.loads(ps_raw)
                        except ValueError:
                            # Malformed JSON: treat the row as having no parameters.
                            parsed = None
                    else:
                        parsed = ps_raw
                # Anything that is not a JSON object (list, number, ...) cannot
                # be looked up by parameter name, so it is ignored.
                ps = parsed if isinstance(parsed, dict) else {}
                row["_param_scores"] = ps
                for pname in ps:
                    param_order.setdefault(pname)
                rows.append(row)

    all_param_names = list(param_order)

    def generate():
        output = StringIO()
        writer = csv.writer(output)

        def drain() -> str:
            # Hand back what the writer buffered and reset the buffer so each
            # yielded chunk holds exactly one CSV row.
            chunk = output.getvalue()
            output.seek(0)
            output.truncate(0)
            return chunk

        # Header row
        base_headers = [
            "Call ID", "Agent", "Direction", "Call Status", "Date/Time", "Location/Group",
            "Quality Score", "Total Possible Score", "Score %",
            "Sentiment", "Call Purpose", "Summary",
            "Talk-Listen Ratio", "Agent Talk %", "Customer Talk %", "Dead Air %",
        ]
        param_headers = []
        for pname in all_param_names:
            param_headers += [f"{pname} (Score)", f"{pname} (Max)", f"{pname} (Reasoning)"]
        writer.writerow(base_headers + param_headers)
        yield drain()

        for row in rows:
            qs = row.get("quality_score") or 0
            tps = row.get("total_possible_score") or 0
            # No percentage when the maximum is missing or zero (avoids division by zero).
            score_pct = f"{round((float(qs) / float(tps)) * 100, 1)}%" if tps else ""
            started = row.get("call_starttime")
            base_row = [
                row.get("callid", ""),
                row.get("agentname", ""),
                row.get("direction", ""),
                row.get("call_status", ""),
                # A NULL start time becomes an empty cell, not the text "None".
                str(started) if started is not None else "",
                row.get("groupname", ""),
                qs,
                tps,
                score_pct,
                row.get("sentiment", ""),
                row.get("call_purpose", ""),
                (row.get("summary") or "").replace("\n", " "),
                row.get("talk_listen_ratio", ""),
                row.get("agent_speak_percentage", ""),
                row.get("customer_speak_percentage", ""),
                row.get("dead_air_percentage", ""),
            ]
            ps = row["_param_scores"]
            param_cells = []
            for pname in all_param_names:
                pdata = ps.get(pname)
                # A missing or non-object entry is rendered like an absent parameter.
                if not isinstance(pdata, dict):
                    pdata = {}
                param_cells += [
                    pdata.get("score", "N/A"),
                    pdata.get("max_score", ""),
                    # str() guards against a non-string reasoning value.
                    str(pdata.get("reasoning") or "").replace("\n", " "),
                ]
            writer.writerow(base_row + param_cells)
            yield drain()

    filename = f"quality_report_{bid}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.csv"
    return StreamingResponse(
        generate(),
        media_type="text/csv",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
