import glob
import json
import logging
import time
import re
import os
import csv
import hashlib
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
import secrets
import string
from io import BytesIO
from io import StringIO
from datetime import datetime, time as dt_time, timedelta
from typing import Any, Dict, List, Optional
from urllib.parse import unquote

import pymysql
import requests
from fastapi import Body, Depends, FastAPI, File, Header, HTTPException, Query, Request, UploadFile
from fastapi.encoders import jsonable_encoder
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, PlainTextResponse, StreamingResponse
from starlette.middleware.base import BaseHTTPMiddleware
from pydantic import BaseModel, Field

from auth_handler import AuthHandler
from config import Config
from db_handler import DatabaseHandler
import rbac as rbac_module
from analytics import AnalyticsService
from quality_parameters_handler import QualityParametersHandler
from propensity_parameters_handler import PropensityParametersHandler
from objection_handler import ObjectionClassificationsHandler
from leadsquared_service import LeadSquaredService
from mcube_ai_agents import AGENT_PROMPTS, get_agent_instruction, get_default_catalog, normalize_agent
from rag_handler import RAGHandler
from lead_insights_service import build_lead_insights
from lead_upload_service import import_raw_calls_from_csv, insert_lead_recording
from call_ingest_service import (
    CallIngestService,
    CANONICAL_INGEST_SCHEMA,
    ingest_schema_for_bid,
    shared_ingest_secret,
)
from api_push_service import ApiPushService

# Configure process-wide logging at import time (INFO level, timestamped
# "time - logger - level - message" lines) and create this module's logger.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


def _build_config_dict() -> Dict[str, Any]:
    """Snapshot every upper-case attribute of ``Config`` into a plain dict.

    Returns:
        Mapping of attribute name to its current value on ``Config``; names
        that are not fully upper-case are skipped.
    """
    return {name: getattr(Config, name) for name in dir(Config) if name.isupper()}


# Module-level singletons, built once at import time from the Config snapshot.
CONFIG = _build_config_dict()
auth_handler = AuthHandler(CONFIG)
db_handler = DatabaseHandler(CONFIG)
# These two services are constructed around the shared database handler.
call_ingest_service = CallIngestService(db_handler)
api_push_service = ApiPushService(db_handler)
rag_handler = RAGHandler(CONFIG)
analytics_service = AnalyticsService(db_handler)
quality_params_handler = QualityParametersHandler(CONFIG)
propensity_params_handler = PropensityParametersHandler(CONFIG)
objection_handler = ObjectionClassificationsHandler(CONFIG)
# In-process cache keyed by string; presumably holds presales mapping data
# with a 300 second time-to-live -- TODO confirm against the code that reads it.
_PRESALES_MAP_CACHE: Dict[str, Dict[str, Any]] = {}
_PRESALES_MAP_CACHE_TTL_SECONDS = 300


class IngestDocumentsRequest(BaseModel):
    # Request schema carrying a list of free-form document dicts.
    # Defaults to an empty list when the field is omitted.
    documents: List[Dict[str, Any]] = Field(default_factory=list)


class IngestTranscriptsRequest(BaseModel):
    # Request schema with options for a transcript-ingestion run.
    # NOTE(review): exact semantics of each flag live in the consuming
    # endpoint, which is not visible here.
    presales_only: bool = True
    limit: int = 1000
    overwrite_existing: bool = False
    # Optional explicit list of call ids; None when not supplied.
    callids: Optional[List[str]] = None


class QueryRequest(BaseModel):
    # Request schema for a chat/query message. ``user_id`` and ``message`` are
    # required; every other field is optional and defaults to None, except
    # ``agent_type`` which defaults to "lead_quality_agent".
    user_id: str
    message: str
    query_embedding: Optional[List[float]] = None
    conversation_id: Optional[str] = None
    top_k: Optional[int] = None
    min_similarity: Optional[float] = None
    metadata: Optional[Dict[str, Any]] = None
    profile_updates: Optional[Dict[str, Any]] = None
    agent_type: Optional[str] = "lead_quality_agent"


class TelephonyIntegrationConnectRequest(BaseModel):
    # Request schema for connecting a telephony integration.
    # ``provider`` and ``source_bid`` are required, non-empty strings.
    provider: str = Field(..., min_length=1)
    source_bid: str = Field(..., min_length=1)
    # Optional free-form provider configuration.
    config: Optional[Dict[str, Any]] = None


# Day counts related to onboarding ingest lookback; presumably the values
# accepted for OnboardingProcessingRequest.ingest_lookback_days -- TODO confirm
# against the endpoint that validates it.
ONBOARDING_INGEST_LOOKBACK_DAYS = (0, 1, 10, 20, 50)


class OnboardingProcessingRequest(BaseModel):
    # Request schema for onboarding processing options.
    # ``provider`` is the only required field (non-empty string).
    provider: str = Field(..., min_length=1)
    source_bid: Optional[str] = None
    analysis_mode: str = "default"
    custom_parameters: Optional[List[Dict[str, Any]]] = None
    # Group filtering is off by default; ``allowed_groupnames`` is optional.
    group_filter_enabled: bool = False
    allowed_groupnames: Optional[List[str]] = None
    ingest_lookback_days: int = 0


class TelephonySyncToDbRequest(BaseModel):
    # Request schema carrying a list of free-form record dicts.
    # Defaults to an empty list when the field is omitted.
    records: List[Dict[str, Any]] = Field(default_factory=list)


class ScoreCallRequest(BaseModel):
    # Request schema for scoring one call. Only ``call_id`` is required;
    # ``user_id`` defaults to "system" and ``agent_type`` to "lead_quality_agent".
    call_id: str
    user_id: Optional[str] = "system"
    agent_type: Optional[str] = "lead_quality_agent"
    force_llm: bool = False


class AgentConfigUpsertRequest(BaseModel):
    # Request schema for creating/updating an agent configuration.
    # Every field is optional and defaults to None.
    display_name: Optional[str] = None
    is_enabled: Optional[bool] = None
    is_locked: Optional[bool] = None
    system_prompt: Optional[str] = None
    provider: Optional[str] = None
    model_name: Optional[str] = None
    runtime_config: Optional[Dict[str, Any]] = None


class AgentAnalyzeCallRequest(BaseModel):
    # Request schema for analyzing one call with an agent. Only ``call_id`` is
    # required; ``agent_type`` defaults to "lead_quality_agent".
    call_id: str
    agent_type: Optional[str] = "lead_quality_agent"
    force_refresh: bool = False


class ReportedIssueCreateRequest(BaseModel):
    # Request schema for creating a reported issue. ``business_id`` and
    # ``issue_text`` are required, non-empty strings; the rest are optional.
    business_id: str = Field(..., min_length=1)
    business_name: Optional[str] = None
    business_user_name: Optional[str] = None
    page_path: Optional[str] = None
    call_id: Optional[str] = None
    issue_text: str = Field(..., min_length=1)
    scope: Optional[str] = None


class ReportedIssueUpdateRequest(BaseModel):
    # Request schema for updating a reported issue; ``status`` is a required,
    # non-empty string.
    status: str = Field(..., min_length=1)


def _extract_bearer_token(authorization: Optional[str]) -> str:
    if not authorization or not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Missing authorization header")
    return authorization.replace("Bearer ", "", 1).strip()


def _has_business_access(user: Dict[str, Any], bid: str) -> bool:
    if not user:
        return False
    if user.get("is_master"):
        return True
    businesses = user.get("businesses", []) or []
    for business in businesses:
        if str(business.get("bid")) == str(bid):
            return True
    return False


def _auth_user(authorization: Optional[str] = Header(default=None)) -> Dict[str, Any]:
    """Resolve the user for the request's bearer token.

    Raises:
        HTTPException: 401 when the header is missing/malformed (raised by
            ``_extract_bearer_token``) or when token validation yields no user.
    """
    resolved = auth_handler.validate_token(_extract_bearer_token(authorization))
    if resolved:
        return resolved
    raise HTTPException(status_code=401, detail="Invalid or expired token")


def _optional_auth_user(authorization: Optional[str] = Header(default=None)) -> Optional[Dict[str, Any]]:
    """Like ``_auth_user`` but never raises for a missing/blank bearer token.

    Returns:
        ``None`` when there is no usable bearer header or token; otherwise
        whatever ``auth_handler.validate_token`` returns for the token.
    """
    scheme = "Bearer "
    if not (authorization and authorization.startswith(scheme)):
        return None
    token = authorization[len(scheme):].strip()
    return auth_handler.validate_token(token) if token else None


def _auth_user_for_bid(bid: str, user: Dict[str, Any]) -> Dict[str, Any]:
    """Return ``user`` unchanged when it may access ``bid``.

    Raises:
        HTTPException: 403 when ``_has_business_access`` rejects the user.
    """
    if _has_business_access(user, bid):
        return user
    raise HTTPException(status_code=403, detail=f"Access denied to business {bid}")


def _as_bool(value: Optional[str], default: bool = False) -> bool:
    if value is None:
        return default
    return str(value).strip().lower() in ("1", "true", "yes", "y", "on")


def user_has_business_admin(user: Dict[str, Any], bid: str) -> bool:
    """Tell whether ``user`` is an admin of business ``bid``.

    The RBAC module is consulted first. If it says no, the user's business
    memberships are reloaded through ``auth_handler`` and the role recorded
    for ``bid`` is checked against "admin" / "business_admin"
    (case-insensitive, trimmed).

    The membership reload is best-effort: any failure is logged and treated
    as "not an admin" rather than propagated.
    """
    if rbac_module.is_business_admin(user, bid):
        return True
    user_id = user.get("id")
    if not user_id:
        return False
    wanted_bid = str(bid)
    try:
        for business in auth_handler.get_user_businesses(user_id) or []:
            if str(business.get("bid")) != wanted_bid:
                continue
            role = str(business.get("role") or "").strip().lower()
            if role in ("admin", "business_admin"):
                return True
    except Exception:  # noqa: BLE001 - deliberate best-effort lookup
        # Previously swallowed silently; log so lookup outages are visible
        # instead of showing up only as unexplained 403s.
        logger.warning(
            "Business admin lookup failed for user_id=%s bid=%s",
            user_id,
            bid,
            exc_info=True,
        )
    return False


def _require_business_admin_or_master(bid: str, user: Dict[str, Any]) -> None:
    """Guard: allow only users that ``user_has_business_admin`` accepts.

    Raises:
        HTTPException: 403 when the check fails.
    """
    if user_has_business_admin(user, bid):
        return
    raise HTTPException(status_code=403, detail="Business admin access required")


def _require_settings_manage_or_admin(bid: str, user: Dict[str, Any]) -> None:
    if user.get("is_master") or user_has_business_admin(user, bid):
        return
    _require_permission(bid, user, "settings.manage")


def _require_permission(bid: str, user: Dict[str, Any], permission: str) -> None:
    """Guard: require ``permission`` for ``user`` on business ``bid``.

    The check is delegated to ``rbac_module.has_permission`` with
    ``reload=True``.

    Raises:
        HTTPException: 403 naming the missing permission.
    """
    granted = rbac_module.has_permission(user, bid, permission, auth_handler, reload=True)
    if granted:
        return
    raise HTTPException(status_code=403, detail=f"Permission denied: {permission}")


def _scope_clause_for_user(user: Optional[Dict[str, Any]], bid: str) -> tuple:
    """Return (scope_sql, scope_params) for raw_calls filtering; empty when not scoped."""
    if not user or not _has_business_access(user, bid):
        return "", []
    if not rbac_module.should_apply_data_scope(user, bid, auth_handler):
        return "", []
    ctx = rbac_module.resolve_business_context(user, bid, auth_handler, reload=True)
    return rbac_module.build_raw_call_scope_sql(ctx.get("scope") or {})


def _assert_call_in_scope(user: Optional[Dict[str, Any]], bid: str, call_row: Dict[str, Any]) -> None:
    if not user or not _has_business_access(user, bid):
        return
    if not rbac_module.should_apply_data_scope(user, bid, auth_handler):
        return
    ctx = rbac_module.resolve_business_context(user, bid, auth_handler, reload=True)
    if not rbac_module.call_record_in_scope(call_row, ctx.get("scope") or {}):
        raise HTTPException(status_code=404, detail="Call not found")


def _enrich_auth_user(user: Dict[str, Any]) -> Dict[str, Any]:
    """Reload per-business role/permissions/scope from DB for /auth/me."""
    if user.get("is_master"):
        return user
    user_id = user.get("id")
    if not user_id:
        return user
    enriched = dict(user)
    businesses = auth_handler.get_user_businesses(user_id) or []
    enriched["businesses"] = businesses
    return enriched


def _resolve_list_scope(
    user: Optional[Dict[str, Any]],
    bid: str,
    groupname: Optional[str] = None,
    agent_names: Optional[str] = None,
) -> Dict[str, Any]:
    """Combine requested list filters with the user's data scope.

    Returns:
        Dict with ``groupname`` and ``agent_names`` (passed through the RBAC
        module when the user has access to ``bid``, otherwise unchanged) plus
        ``scope_where`` / ``scope_params`` from ``_scope_clause_for_user``,
        each ``None`` when empty.
    """
    scope_sql, scope_args = _scope_clause_for_user(user, bid)
    resolved = {"groupname": groupname, "agent_names": agent_names}
    if user and _has_business_access(user, bid):
        resolved["groupname"] = rbac_module.effective_groupname_for_scope(
            user, bid, auth_handler, groupname
        )
        resolved["agent_names"] = rbac_module.effective_agent_names_for_scope(
            user, bid, auth_handler, agent_names
        )
    resolved["scope_where"] = scope_sql or None
    resolved["scope_params"] = scope_args or None
    return resolved


def get_sync_source_db_config() -> Dict[str, Any]:
    """Build connection settings for the sync source database.

    Each setting comes from its ``SYNC_SOURCE_DB_*`` environment variable,
    falling back to the matching ``DB_*`` entry in ``CONFIG`` and then to a
    hard-coded default. The port is coerced to ``int``.
    """

    def _setting(env_name: str, config_key: str, fallback: Any) -> Any:
        return os.getenv(env_name, CONFIG.get(config_key, fallback))

    settings: Dict[str, Any] = {}
    settings["host"] = _setting("SYNC_SOURCE_DB_HOST", "DB_HOST", "127.0.0.1")
    settings["port"] = int(_setting("SYNC_SOURCE_DB_PORT", "DB_PORT", 3306))
    settings["user"] = _setting("SYNC_SOURCE_DB_USER", "DB_USER", "admin")
    settings["password"] = _setting("SYNC_SOURCE_DB_PASSWORD", "DB_PASSWORD", "")
    settings["database"] = _setting("SYNC_SOURCE_DB_NAME", "DB_NAME", "voicebot_cluster")
    settings["charset"] = "utf8mb4"
    return settings


def _first_env(*names: str) -> Optional[str]:
    for name in names:
        value = os.getenv(name)
        if value not in (None, ""):
            return value
    return None


def _provider_env_source_config(provider: str) -> Dict[str, Any]:
    """Resolve source-database settings for a telephony provider.

    The provider name is normalized (trimmed, lower-cased, ``-`` -> ``_``) and
    matched against the Mcube 2.0 or Mcube Classic aliases. Settings are read
    from ``<PREFIX>_DB_HOST/USER/PASSWORD/NAME/PORT`` environment variables;
    Mcube Classic additionally falls back to the ``SYNC_SOURCE_DB_*``
    variables. When host, user, password or database is still missing, an
    already-stored integration config is used if one exists.

    Raises:
        HTTPException: 400 for an unknown provider, or when required settings
            are missing and no stored config is available.
    """
    mcube2_aliases = {"mcube_2", "mcube2", "mcube_2_0", "mcube2_0"}
    classic_aliases = {"mcube_classic", "classic", "sales", "mcube_sales"}

    normalized = str(provider or "").strip().lower().replace("-", "_")
    is_classic = normalized in classic_aliases
    if normalized in mcube2_aliases:
        provider_aliases = mcube2_aliases
        env_prefixes = ("MCUBE2", "MCUBE_2", "MCUBE_2_0")
        friendly = "Mcube 2.0"
    elif is_classic:
        provider_aliases = classic_aliases
        env_prefixes = ("MCUBE_CLASSIC", "MCUBE_SALES", "SALES")
        friendly = "Mcube Classic"
    else:
        raise HTTPException(status_code=400, detail="Unsupported telephony provider")

    def _from_env(suffix: str) -> Optional[str]:
        # First non-empty value across all prefixes for this setting.
        return _first_env(*(f"{prefix}_DB_{suffix}" for prefix in env_prefixes))

    host = _from_env("HOST")
    user = _from_env("USER")
    password = _from_env("PASSWORD")
    database = _from_env("NAME")
    port = _from_env("PORT")

    if is_classic:
        # Classic may reuse the generic sync-source settings.
        host = host or os.getenv("SYNC_SOURCE_DB_HOST")
        user = user or os.getenv("SYNC_SOURCE_DB_USER")
        password = password or os.getenv("SYNC_SOURCE_DB_PASSWORD")
        database = database or os.getenv("SYNC_SOURCE_DB_NAME")
        port = port or os.getenv("SYNC_SOURCE_DB_PORT")

    required = (("host", host), ("user", user), ("password", password), ("database", database))
    missing = [label for label, value in required if not value]
    if missing:
        stored = _existing_provider_source_config(provider_aliases)
        if stored:
            return stored
        raise HTTPException(status_code=400, detail=f"{friendly} default config is missing: {', '.join(missing)}")

    return {
        "host": host,
        "port": int(port or 3306),
        "user": user,
        "password": password,
        "database": database,
        "charset": "utf8mb4",
    }


def _existing_provider_source_config(provider_aliases: set) -> Optional[Dict[str, Any]]:
    try:
        with _db_conn() as conn:
            cursor = conn.cursor()
            placeholders = ", ".join(["%s"] * len(provider_aliases))
            cursor.execute(
                f"""
                SELECT bid
                FROM business_telephony_integrations
                WHERE LOWER(provider) IN ({placeholders})
                  AND is_active = 1
                ORDER BY updated_at DESC, created_at DESC
                LIMIT 5
                """,
                tuple(provider_aliases),
            )
            rows = cursor.fetchall() or []
    except Exception:
        return None

    for row in rows:
        for item in db_handler.list_telephony_integrations(row.get("bid")) or []:
            if str(item.get("provider") or "").lower() in provider_aliases:
                cfg = dict(item.get("config") or {})
                if cfg.get("host") and cfg.get("user") and cfg.get("password") and cfg.get("database"):
                    return {
                        "host": cfg.get("host"),
                        "port": int(cfg.get("port") or 3306),
                        "user": cfg.get("user"),
                        "password": cfg.get("password"),
                        "database": cfg.get("database"),
                        "charset": "utf8mb4",
                    }
    return None


def _normalize_phone_variants(phone: Any) -> List[str]:
    digits = "".join(ch for ch in str(phone or "") if ch.isdigit())
    if not digits:
        return []

    core10 = digits[-10:] if len(digits) > 10 else digits
    variants = {digits, f"+{digits}", core10}
    if len(core10) == 10:
        variants.update({f"91{core10}", f"+91{core10}", f"0{core10}"})
    return [v for v in variants if v]


def _extract_lsq_lead_list(payload: Any) -> List[Dict[str, Any]]:
    if payload is None:
        return []
    if isinstance(payload, list):
        return payload
    if isinstance(payload, dict):
        for key in ("List", "Data", "list", "data"):
            value = payload.get(key)
            if isinstance(value, list):
                return value
    return []

def _to_json_safe(value: Any) -> Any:
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, dict):
        return {str(k): _to_json_safe(v) for k, v in value.items()}
    if isinstance(value, list):
        return [_to_json_safe(v) for v in value]
    return value


def _build_customer_profile(details: Dict[str, Any], crm: Dict[str, Any]) -> Dict[str, Any]:
    calls = details.get("calls") or []
    answered_calls = sum(1 for call in calls if str(call.get("call_status", "")).upper() == "ANSWER")
    transcript_calls = sum(1 for call in calls if bool(call.get("has_transcript")))
    crm_lead = crm.get("lead") if isinstance(crm, dict) else None
    return {
        "lead_phone": details.get("lead_phone"),
        "owner_name": (crm_lead or {}).get("owner_name") or details.get("owner_name"),
        "total_conversations": int(details.get("total_conversations") or 0),
        "answered_calls": int(answered_calls),
        "transcript_calls": int(transcript_calls),
        "avg_quality_score": float(details.get("avg_quality_score") or 0),
        "talk_listen_ratio": details.get("talk_listen_ratio") or "N/A",
        "total_duration_seconds": int(details.get("total_duration_seconds") or 0),
        "crm_connected": bool(crm.get("connected")) if isinstance(crm, dict) else False,
        "crm_matched": bool(crm.get("matched")) if isinstance(crm, dict) else False,
        "crm_status": (crm_lead or {}).get("status"),
        "crm_next_task_due_date": (crm_lead or {}).get("next_task_due_date"),
    }


def _db_conn():
    """Open a new autocommit pymysql connection to the application database.

    Connection settings come from ``CONFIG`` (``DB_HOST``, ``DB_PORT``,
    ``DB_USER``, ``DB_PASSWORD``, ``DB_NAME``) with local defaults. Rows are
    returned as dicts (``DictCursor``).
    """
    connect_kwargs = {
        "host": CONFIG.get("DB_HOST", "127.0.0.1"),
        "port": int(CONFIG.get("DB_PORT", 3306)),
        "user": CONFIG.get("DB_USER", "admin"),
        "password": CONFIG.get("DB_PASSWORD", ""),
        "database": CONFIG.get("DB_NAME", "voicebot_cluster"),
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
        "autocommit": True,
    }
    return pymysql.connect(**connect_kwargs)


def _ensure_reported_issues_table() -> None:
    """Create the ``reported_issues`` table if needed and patch its schema.

    After the ``CREATE TABLE IF NOT EXISTS``, the ``business_user_name``
    column is added when INFORMATION_SCHEMA shows it is missing from an
    existing table.
    """
    create_table_sql = """
            CREATE TABLE IF NOT EXISTS reported_issues (
                id BIGINT AUTO_INCREMENT PRIMARY KEY,
                business_id VARCHAR(50) NOT NULL,
                business_name VARCHAR(255) DEFAULT NULL,
                business_user_name VARCHAR(255) DEFAULT NULL,
                user_id BIGINT DEFAULT NULL,
                username VARCHAR(100) DEFAULT NULL,
                user_email VARCHAR(255) DEFAULT NULL,
                page_path VARCHAR(500) DEFAULT NULL,
                call_id VARCHAR(100) DEFAULT NULL,
                scope VARCHAR(100) DEFAULT NULL,
                issue_text TEXT NOT NULL,
                status ENUM('open','in_progress','resolved','dismissed') DEFAULT 'open',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                INDEX idx_business_status_created (business_id, status, created_at),
                INDEX idx_status_created (status, created_at)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """
    column_probe_sql = """
            SELECT 1
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_SCHEMA = DATABASE()
              AND TABLE_NAME = 'reported_issues'
              AND COLUMN_NAME = 'business_user_name'
            LIMIT 1
            """
    add_column_sql = """
                ALTER TABLE reported_issues
                ADD COLUMN business_user_name VARCHAR(255) DEFAULT NULL AFTER business_name
                """
    with _db_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(create_table_sql)
        cursor.execute(column_probe_sql)
        column_present = cursor.fetchone() is not None
        if not column_present:
            cursor.execute(add_column_sql)


def _serialize_reported_issue(row: Dict[str, Any]) -> Dict[str, Any]:
    """Return a JSON-safe copy of a reported-issue row (``{}`` for a falsy row)."""
    payload = row if row else {}
    return _to_json_safe(payload)


def _ensure_agent_tables():
    """Create the agent tables if they do not exist yet.

    Runs three ``CREATE TABLE IF NOT EXISTS`` statements, in order, for
    ``rag_agent_configs``, ``rag_agent_call_analysis`` and
    ``rag_agent_knowledge_uploads``. The connection is always closed.
    """
    ddl_statements = (
        """
            CREATE TABLE IF NOT EXISTS rag_agent_configs (
                id BIGINT AUTO_INCREMENT PRIMARY KEY,
                bid VARCHAR(50) NOT NULL,
                agent_type VARCHAR(100) NOT NULL,
                display_name VARCHAR(150) NOT NULL,
                is_enabled BOOLEAN DEFAULT FALSE,
                is_locked BOOLEAN DEFAULT TRUE,
                system_prompt LONGTEXT NOT NULL,
                provider VARCHAR(100) DEFAULT 'auto',
                model_name VARCHAR(150) DEFAULT '',
                runtime_config JSON,
                updated_by VARCHAR(100) DEFAULT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                UNIQUE KEY uniq_bid_agent (bid, agent_type),
                INDEX idx_bid_enabled (bid, is_enabled)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """,
        """
            CREATE TABLE IF NOT EXISTS rag_agent_call_analysis (
                id BIGINT AUTO_INCREMENT PRIMARY KEY,
                bid VARCHAR(50) NOT NULL,
                call_id VARCHAR(100) NOT NULL,
                agent_type VARCHAR(100) NOT NULL,
                status ENUM('completed','failed') DEFAULT 'completed',
                model_name VARCHAR(150) DEFAULT '',
                summary TEXT,
                result_json JSON,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                UNIQUE KEY uniq_bid_call_agent (bid, call_id, agent_type),
                INDEX idx_bid_created (bid, created_at)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """,
        """
            CREATE TABLE IF NOT EXISTS rag_agent_knowledge_uploads (
                id BIGINT AUTO_INCREMENT PRIMARY KEY,
                bid VARCHAR(50) NOT NULL,
                agent_type VARCHAR(100) NOT NULL,
                version_no INT NOT NULL,
                filename VARCHAR(255) NOT NULL,
                page_count INT NOT NULL,
                char_count INT NOT NULL,
                knowledge_bands JSON,
                compiled_prompt LONGTEXT,
                uploaded_by VARCHAR(100) DEFAULT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                UNIQUE KEY uniq_bid_agent_version (bid, agent_type, version_no),
                INDEX idx_bid_agent_created (bid, agent_type, created_at)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """,
    )
    conn = _db_conn()
    try:
        cursor = conn.cursor()
        for ddl in ddl_statements:
            cursor.execute(ddl)
    finally:
        conn.close()


def _ensure_default_agent_configs(bid: str):
    """Seed ``rag_agent_configs`` with the default catalog for business ``bid``.

    One insert is issued per catalog entry. Existing ``(bid, agent_type)``
    rows are left untouched: the ``ON DUPLICATE KEY UPDATE`` clause only
    assigns ``agent_type`` to itself.
    """
    upsert_sql = """
                INSERT INTO rag_agent_configs (
                    bid, agent_type, display_name, is_enabled, is_locked, system_prompt, provider, model_name, runtime_config
                )
                VALUES (%s, %s, %s, %s, %s, %s, 'auto', '', %s)
                ON DUPLICATE KEY UPDATE agent_type = agent_type
                """
    default_runtime = {"temperature": 0.2, "top_p": 0.9}

    catalog = get_default_catalog()
    conn = _db_conn()
    try:
        cursor = conn.cursor()
        for entry in catalog:
            runtime_json = json.dumps(entry.get("runtime_config") or default_runtime, ensure_ascii=True)
            row_values = (
                str(bid),
                str(entry["agent_type"]),
                str(entry["display_name"]),
                1 if entry.get("is_enabled") else 0,
                1 if entry.get("is_locked") else 0,
                str(entry["instruction"]),
                runtime_json,
            )
            cursor.execute(upsert_sql, row_values)
    finally:
        conn.close()


def _get_agent_configs(bid: str) -> List[Dict[str, Any]]:
    """Load all agent configurations of business ``bid``, oldest first.

    Default configs are seeded beforehand. A ``runtime_config`` that arrives
    as a string is decoded from JSON, falling back to ``{}`` when decoding
    fails. The rows are returned in JSON-safe form.
    """
    _ensure_default_agent_configs(bid)

    select_sql = """
            SELECT bid, agent_type, display_name, is_enabled, is_locked, system_prompt, provider, model_name, runtime_config, updated_at
            FROM rag_agent_configs
            WHERE bid = %s
            ORDER BY id ASC
            """
    conn = _db_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(select_sql, (str(bid),))
        rows = cursor.fetchall() or []
    finally:
        conn.close()

    for row in rows:
        raw_runtime = row.get("runtime_config")
        if not isinstance(raw_runtime, str):
            continue
        try:
            row["runtime_config"] = json.loads(raw_runtime)
        except Exception:
            row["runtime_config"] = {}
    return _to_json_safe(rows)


def _get_agent_config(bid: str, agent_type: str) -> Optional[Dict[str, Any]]:
    """Return the first config of ``bid`` whose ``agent_type`` matches, else ``None``.

    Both sides of the comparison are stringified.
    """
    wanted = str(agent_type)
    matches = (row for row in _get_agent_configs(bid) if str(row.get("agent_type")) == wanted)
    return next(matches, None)


def _resolve_agent_instruction_for_bid(bid: str, agent_type: str) -> str:
    """Return the instruction text to use for ``agent_type`` in business ``bid``.

    A non-empty ``system_prompt`` stored for the business wins; otherwise the
    result of ``get_agent_instruction`` is used.
    """
    stored = _get_agent_config(bid, agent_type)
    custom_prompt = stored.get("system_prompt") if stored else None
    if custom_prompt:
        return str(custom_prompt)
    return get_agent_instruction(agent_type)


def _extract_pdf_pages(pdf_bytes: bytes) -> List[str]:
    parser_error: Optional[Exception] = None
    for module_name in ("pypdf", "PyPDF2"):
        try:
            module = __import__(module_name)
            reader = module.PdfReader(BytesIO(pdf_bytes))
            pages: List[str] = []
            for page in reader.pages:
                pages.append((page.extract_text() or "").strip())
            return pages
        except Exception as exc:  # noqa: BLE001
            parser_error = exc
            continue
    # Offline fallback for restricted environments:
    # 1) estimate page count by /Type /Page markers
    # 2) extract rough text tokens per page section from literal PDF strings.
    try:
        page_count = len(re.findall(rb"/Type\s*/Page\b", pdf_bytes))
        if page_count <= 0:
            raise ValueError("No /Type /Page markers found")

        text_blob = pdf_bytes.decode("latin1", errors="ignore")
        page_splits = re.split(r"/Type\s*/Page\b", text_blob)
        candidate_sections = page_splits[1:] if len(page_splits) > 1 else [text_blob]

        pages: List[str] = []
        for idx in range(page_count):
            segment = candidate_sections[idx] if idx < len(candidate_sections) else ""
            literals = re.findall(r"\(([^()]*)\)", segment)
            cleaned = " ".join([re.sub(r"\\[nrtbf()\\]", " ", lit) for lit in literals]).strip()
            pages.append(cleaned or " ")

        return pages
    except Exception as fallback_exc:  # noqa: BLE001
        raise HTTPException(
            status_code=500,
            detail=(
                "PDF parser is unavailable on the server and fallback extraction failed. "
                "Install 'pypdf' inside backend venv for reliable parsing. "
                f"Last parser error: {parser_error}; fallback error: {fallback_exc}"
            ),
        )


def _assert_raj_presales_agent(agent_type: str):
    if agent_type != "presales_lead_agent":
        raise HTTPException(
            status_code=400,
            detail=(
                "This endpoint is currently enabled only for Raj Mehta (presales_lead_agent). "
                f"Received agent_type={agent_type}"
            ),
        )


def _list_conversations(bid: str, user_id: Optional[str], limit: int) -> Dict[str, Any]:
    """List the most recently updated conversations of business ``bid``.

    Args:
        bid: Business id to filter on.
        user_id: When truthy, only that user's conversations are returned.
        limit: Maximum number of rows (coerced to ``int``).

    Returns:
        ``{"conversations": [...]}`` with JSON-safe rows. Each row carries the
        conversation id, user id, metadata, ``updated_at`` and the first user
        message / last message looked up from ``rag_messages``. String
        metadata is JSON-decoded, falling back to ``{}`` on failure.
    """
    # One statement serves both cases; only the optional user predicate and
    # its parameter differ, so the SELECT list cannot drift between variants.
    user_filter = " AND c.user_id = %s" if user_id else ""
    params: List[Any] = [str(bid)]
    if user_id:
        params.append(str(user_id))
    params.append(int(limit))

    query = f"""
        SELECT c.conversation_id, c.user_id, c.metadata, c.updated_at,
            (SELECT content FROM rag_messages m WHERE m.bid = c.bid AND m.conversation_id = c.conversation_id AND m.role = 'user' ORDER BY m.id ASC LIMIT 1) AS first_user_message,
            (SELECT content FROM rag_messages m WHERE m.bid = c.bid AND m.conversation_id = c.conversation_id ORDER BY m.id DESC LIMIT 1) AS last_message
        FROM rag_conversations c
        WHERE c.bid = %s{user_filter}
        ORDER BY c.updated_at DESC
        LIMIT %s
    """

    conn = _db_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(query, tuple(params))
        rows = cursor.fetchall() or []
    finally:
        conn.close()

    for row in rows:
        if isinstance(row.get("metadata"), str):
            try:
                row["metadata"] = json.loads(row["metadata"])
            except Exception:  # noqa: BLE001 - malformed metadata degrades to {}
                row["metadata"] = {}
    return {"conversations": _to_json_safe(rows)}


def _collect_call_context(bid: str, call_id: str) -> Dict[str, Any]:
    """Gather what is stored about one call into a JSON-safe dict.

    Returns:
        Dict with ``call``, ``transcript``, ``analytics`` and ``bant`` entries
        read through ``db_handler``; each is ``{}`` when the lookup returns
        nothing.
    """
    call_row = db_handler.get_call_by_id(bid, call_id) or {}
    transcript_row = db_handler.get_call_transcript(bid, call_id) or {}
    analytics_row = db_handler.get_call_analytics(bid, call_id) or {}
    bant_row = db_handler.get_bant_analysis(bid, call_id) or {}

    context: Dict[str, Any] = {}
    context["call"] = _to_json_safe(call_row)
    context["transcript"] = _to_json_safe(transcript_row)
    context["analytics"] = _to_json_safe(analytics_row)
    context["bant"] = _to_json_safe(bant_row)
    return context


def _score_call_fallback(context: Dict[str, Any]) -> Dict[str, Any]:
    analytics = context.get("analytics") or {}
    bant = context.get("bant") or {}
    transcript = context.get("transcript") or {}
    quality = analytics.get("quality_score")
    sentiment = analytics.get("sentiment")
    summary = analytics.get("summary") or "Call analyzed using existing call metadata."
    profile = bant.get("profile") or {}
    profile_summary = bant.get("summary") or ""
    return {
        "summary": summary,
        "quality_score": quality if quality is not None else 0,
        "sentiment": sentiment or "unknown",
        "compliance_risk": "medium" if quality is not None and quality < 60 else "low",
        "customer_profile": {
            "profile": profile,
            "summary": profile_summary,
            "transcript_language": transcript.get("language"),
        },
        "recommended_actions": [
            "Review objection handling moments and add specific rebuttal playbook entries.",
            "Track repeat intent and follow-up commitments in CRM.",
            "Coach agent using this call transcript and quality notes.",
        ],
    }


def _resolve_agent_model_settings(bid: str, agent_type: str) -> Dict[str, Any]:
    """Return provider, model name and runtime config for an agent.

    Values come from the stored agent config of ``bid``. The provider
    defaults to "auto", the model name to "" (trimmed), and a missing or
    non-dict runtime config becomes ``{}``.
    """
    stored = _get_agent_config(bid, agent_type) or {}
    runtime_config = stored.get("runtime_config") or {}
    return {
        "provider": str(stored.get("provider") or "auto"),
        "model_name": str(stored.get("model_name") or "").strip(),
        "runtime_config": runtime_config if isinstance(runtime_config, dict) else {},
    }


def _score_call_with_llm(context: Dict[str, Any], bid: str, agent_type: str) -> Optional[Dict[str, Any]]:
    transcript_text = (context.get("transcript") or {}).get("transcript")
    if not transcript_text:
        return None

    settings = _resolve_agent_model_settings(bid, agent_type)
    prompt = (
        f"{_resolve_agent_instruction_for_bid(bid, agent_type)}\n"
        "Return STRICT JSON with keys: summary, quality_score, sentiment, compliance_risk, "
        "customer_profile, recommended_actions.\n"
        "quality_score must be 0-100 numeric.\n\n"
        f"CALL CONTEXT:\n{json.dumps(context, ensure_ascii=True)}"
    )
    answer = rag_handler._invoke_chat_model(  # noqa: SLF001
        prompt,
        provider=settings["provider"],
        model_name=settings["model_name"] or None,
        runtime_config=settings["runtime_config"],
    )
    if not answer:
        return None

    start = answer.find("{")
    end = answer.rfind("}")
    if start == -1 or end == -1 or end <= start:
        return None

    try:
        parsed = json.loads(answer[start : end + 1])
    except Exception:
        return None
    if not isinstance(parsed, dict):
        return None
    return parsed


def _agent_report(bid: str, agent_id: str, days: int) -> Dict[str, Any]:
    """Aggregate call-quality metrics for one agent over a trailing window.

    Joins ``<bid>_raw_calls`` with ``<bid>_callanalytics`` on ``callid`` and
    returns the call count, average quality score and speaking percentages,
    and sentiment tallies for calls started within the last ``days`` days.

    Args:
        bid: Business id used as the prefix of the per-business tables.
        agent_id: Compared against ``agentname``, falling back to
            ``agent_callinfo`` and then ``'Unknown'`` when those are empty.
        days: Size of the look-back window in days.

    Returns:
        Dict with ``agent_id``, ``window_days``, ``metrics`` and a
        ``generated_at`` UTC timestamp.
    """
    # Identifiers cannot be bound as query parameters, so double any backtick
    # in ``bid`` to keep the value inside the quoted table name.
    safe_bid = str(bid).replace("`", "``")
    table_calls = f"{safe_bid}_raw_calls"
    table_analytics = f"{safe_bid}_callanalytics"
    conn = pymysql.connect(
        host=CONFIG.get("DB_HOST", "127.0.0.1"),
        port=int(CONFIG.get("DB_PORT", 3306)),
        user=CONFIG.get("DB_USER", "admin"),
        password=CONFIG.get("DB_PASSWORD", ""),
        database=CONFIG.get("DB_NAME", "voicebot_cluster"),
        charset="utf8mb4",
        cursorclass=pymysql.cursors.DictCursor,
        autocommit=True,
    )
    try:
        query = f"""
            SELECT
                COUNT(*) AS total_calls,
                AVG(a.quality_score) AS avg_quality_score,
                AVG(a.agent_speak_percentage) AS avg_agent_speak_percentage,
                AVG(a.customer_speak_percentage) AS avg_customer_speak_percentage,
                SUM(CASE WHEN a.sentiment = 'positive' THEN 1 ELSE 0 END) AS positive_calls,
                SUM(CASE WHEN a.sentiment = 'neutral' THEN 1 ELSE 0 END) AS neutral_calls,
                SUM(CASE WHEN a.sentiment = 'negative' THEN 1 ELSE 0 END) AS negative_calls
            FROM `{table_calls}` r
            LEFT JOIN `{table_analytics}` a ON r.callid = a.callid
            WHERE COALESCE(NULLIF(r.agentname, ''), NULLIF(r.agent_callinfo, ''), 'Unknown') = %s
              AND r.call_starttime >= DATE_SUB(NOW(), INTERVAL %s DAY)
        """
        # The cursor is released as soon as the single aggregate row is read.
        with conn.cursor() as cursor:
            cursor.execute(query, (agent_id, int(days)))
            row = cursor.fetchone() or {}
    finally:
        conn.close()
    return {
        "agent_id": agent_id,
        "window_days": int(days),
        "metrics": _to_json_safe(row),
        "generated_at": datetime.utcnow().isoformat() + "Z",
    }


def _ingestion_progress(bid: str, limit_runs: int = 10) -> Dict[str, Any]:
    """Return the current RAG ingestion status and recent runs for a business.

    Reads one row from ``rag_ingestion_progress`` and up to ``limit_runs``
    rows (newest id first) from ``rag_ingestion_runs`` for ``bid``.
    """
    bid_text = str(bid)
    db_settings = {
        "host": CONFIG.get("DB_HOST", "127.0.0.1"),
        "port": int(CONFIG.get("DB_PORT", 3306)),
        "user": CONFIG.get("DB_USER", "admin"),
        "password": CONFIG.get("DB_PASSWORD", ""),
        "database": CONFIG.get("DB_NAME", "voicebot_cluster"),
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
        "autocommit": True,
    }
    connection = pymysql.connect(**db_settings)
    try:
        db_cursor = connection.cursor()
        # Latest status snapshot for this business.
        db_cursor.execute(
            """
            SELECT bid, status, last_run_started_at, last_run_finished_at, last_duration_ms,
                   processed_calls, ingested_documents, ingested_chunks, skipped, last_error, updated_at
            FROM rag_ingestion_progress
            WHERE bid = %s
            LIMIT 1
            """,
            (bid_text,),
        )
        snapshot = db_cursor.fetchone() or {}
        # Most recent runs, newest first.
        db_cursor.execute(
            """
            SELECT id, bid, started_at, finished_at, duration_ms, status,
                   processed_calls, ingested_documents, ingested_chunks, skipped, error, created_at
            FROM rag_ingestion_runs
            WHERE bid = %s
            ORDER BY id DESC
            LIMIT %s
            """,
            (bid_text, int(limit_runs)),
        )
        history = db_cursor.fetchall() or []
    finally:
        connection.close()
    return {
        "bid": bid_text,
        "current": _to_json_safe(snapshot),
        "recent_runs": _to_json_safe(history),
    }


def _run_agent_call_analysis(bid: str, call_id: str, agent_type: str) -> Dict[str, Any]:
    """Run an agent's LLM analysis on one call and persist the result.

    The model reply is reduced to its outermost ``{...}`` span and parsed as
    JSON; when that is impossible a zero-score placeholder holding the first
    2000 characters of the raw reply is stored instead. The result is
    upserted into ``rag_agent_call_analysis``.

    Raises:
        HTTPException: 404 when the call has no transcript.
    """
    call_context = _collect_call_context(bid, call_id)
    if not (call_context.get("transcript") or {}).get("transcript"):
        raise HTTPException(status_code=404, detail=f"Transcript not found for call_id={call_id}")

    instruction = _resolve_agent_instruction_for_bid(bid, agent_type)
    context_json = json.dumps(call_context, ensure_ascii=True)
    prompt = "".join(
        [
            f"{instruction}\n",
            "Return STRICT JSON with keys: summary, score, risks, opportunities, actions, model_data.\n",
            "score must be 0-100 numeric.\n\n",
            f"CALL CONTEXT:\n{context_json}",
        ]
    )
    model_settings = _resolve_agent_model_settings(bid, agent_type)
    raw_reply = rag_handler._invoke_chat_model(
        prompt,
        provider=model_settings["provider"],
        model_name=model_settings["model_name"] or None,
        runtime_config=model_settings["runtime_config"],
    )
    reply_text = raw_reply or ""

    def _placeholder(summary_text: str) -> Dict[str, Any]:
        # Stored when the reply cannot be turned into a JSON object.
        return {
            "summary": summary_text,
            "score": 0,
            "risks": [],
            "opportunities": [],
            "actions": [],
            "model_data": {"raw_response": reply_text[:2000]},
        }

    open_idx = reply_text.find("{")
    close_idx = reply_text.rfind("}")
    if 0 <= open_idx < close_idx:
        try:
            analysis = json.loads(reply_text[open_idx : close_idx + 1])
        except Exception:
            analysis = _placeholder("Model output parse failed.")
    else:
        analysis = _placeholder("Model returned non-JSON output.")

    db = _db_conn()
    try:
        db_cursor = db.cursor()
        row_values = (
            str(bid),
            str(call_id),
            str(agent_type),
            str(model_settings["model_name"] or CONFIG.get("RAG_CHAT_MODEL", "")),
            str(analysis.get("summary") or ""),
            json.dumps(analysis, ensure_ascii=True),
        )
        db_cursor.execute(
            """
            INSERT INTO rag_agent_call_analysis (bid, call_id, agent_type, status, model_name, summary, result_json)
            VALUES (%s, %s, %s, 'completed', %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                status='completed',
                model_name=VALUES(model_name),
                summary=VALUES(summary),
                result_json=VALUES(result_json),
                updated_at=CURRENT_TIMESTAMP
            """,
            row_values,
        )
    finally:
        db.close()

    generated_at = datetime.utcnow().isoformat() + "Z"
    return {
        "bid": str(bid),
        "call_id": str(call_id),
        "agent_type": str(agent_type),
        "result": _to_json_safe(analysis),
        "generated_at": generated_at,
    }


def _extract_lsq_lead_record(payload: Any) -> Optional[Dict[str, Any]]:
    """Pick the first lead record out of a LeadSquared response payload.

    A list yields its first element (``None`` when empty). A dict yields the
    first element of the first non-empty list found under ``List``, ``Data``,
    ``list`` or ``data``; without such a list the dict itself is returned.
    Any other payload yields ``None``.
    """
    if isinstance(payload, list):
        return payload[0] if payload else None
    if not isinstance(payload, dict):
        return None
    wrapped_lists = (payload.get(wrapper) for wrapper in ("List", "Data", "list", "data"))
    return next(
        (rows[0] for rows in wrapped_lists if isinstance(rows, list) and rows),
        payload,
    )


def _first_present_str(*values: Any) -> Optional[str]:
    """Return the first value whose stripped string form is non-empty, else ``None``.

    ``None`` values are skipped; everything else is converted with ``str``.
    """
    stripped = (str(candidate).strip() for candidate in values if candidate is not None)
    return next((text for text in stripped if text), None)


def _lsq_field(record: Optional[Dict[str, Any]], *keys: str) -> Optional[Any]:
    """Look up the first non-empty value stored under any of ``keys``.

    Exact key matches are tried first, in the order given. If none has a
    value, the keys are retried case-insensitively against the record's
    keys. ``None`` and ``""`` count as empty. Non-dict records yield ``None``.
    """
    if not isinstance(record, dict):
        return None
    empty_values = (None, "")

    # Pass 1: exact key names.
    for name in keys:
        if name in record:
            exact = record.get(name)
            if exact not in empty_values:
                return exact

    # Pass 2: case-insensitive match, still honouring the order of ``keys``.
    folded_items = [(str(record_key).lower(), value) for record_key, value in record.items()]
    for name in keys:
        wanted = str(name).lower()
        for folded_key, value in folded_items:
            if folded_key == wanted and value not in empty_values:
                return value
    return None


def _lsq_display_name(record: Optional[Dict[str, Any]]) -> Optional[str]:
    """Derive a display name for a lead record.

    Preference order: prospect/contact name, then "first last" built from the
    name parts that are present, then the generic ``Name`` field. Returns
    ``None`` for non-dict records or when nothing usable is found.
    """
    if not isinstance(record, dict):
        return None
    given = _first_present_str(_lsq_field(record, "FirstName", "first_name"))
    family = _first_present_str(_lsq_field(record, "LastName", "last_name"))
    combined = " ".join(filter(None, (given, family))).strip()
    explicit = _lsq_field(record, "ProspectName", "prospect_name", "ContactName", "contact_name")
    generic = _lsq_field(record, "Name", "name")
    return _first_present_str(explicit, combined or None, generic)


def _select_lsq_record_for_phone(payload: Any, phone: Any) -> Optional[Dict[str, Any]]:
    """Return the lead in ``payload`` whose phone matches ``phone``.

    When ``phone`` produces no normalized variants, the first lead record of
    the payload is returned unfiltered. Otherwise the first lead sharing at
    least one normalized phone variant is returned, or ``None``.
    """
    wanted = set(_normalize_phone_variants(phone))
    if not wanted:
        return _extract_lsq_lead_record(payload)

    candidates = _extract_lsq_lead_list(payload)
    if not candidates:
        # Fall back to a single-record payload shape.
        lone_record = _extract_lsq_lead_record(payload)
        candidates = [lone_record] if isinstance(lone_record, dict) else []

    for candidate in candidates:
        number = _lsq_field(candidate, "Phone", "Mobile", "PhoneNumber", "phone", "mobile")
        if number and wanted & set(_normalize_phone_variants(number)):
            return candidate
    return None


def _record_matches_phone(record: Any, phone: Any, fallback_phone: Optional[Any] = None) -> bool:
    """Tell whether ``record`` belongs to ``phone``.

    The record's own phone field is compared when present; otherwise
    ``fallback_phone`` is compared. Returns ``False`` when ``phone`` has no
    normalized variants or there is nothing to compare against.
    """
    wanted = set(_normalize_phone_variants(phone))
    if not wanted:
        return False

    record_phone = None
    if isinstance(record, dict):
        record_phone = _lsq_field(record, "Phone", "Mobile", "PhoneNumber", "phone", "mobile")

    comparison_phone = record_phone or fallback_phone
    if not comparison_phone:
        return False
    return bool(wanted & set(_normalize_phone_variants(comparison_phone)))


def _lsq_external_id(record: Optional[Dict[str, Any]], fallback_phone: Optional[str] = None) -> Optional[str]:
    """Return a stable external id for a lead record.

    Uses the record's own id field when present. Otherwise returns the
    SHA-256 hex digest of ``"phone|email|name"``, with missing parts left
    empty. Non-dict records yield ``None``.
    """
    if not isinstance(record, dict):
        return None

    explicit_id = _first_present_str(
        _lsq_field(record, "ProspectID", "LeadId", "Id", "lead_id", "ProspectId"),
    )
    if explicit_id:
        return explicit_id

    fingerprint_parts = (
        _first_present_str(_lsq_field(record, "Phone", "Mobile", "PhoneNumber"), fallback_phone) or "",
        _first_present_str(_lsq_field(record, "EmailAddress", "email")) or "",
        _lsq_display_name(record) or "",
    )
    fingerprint = "|".join(fingerprint_parts)
    return hashlib.sha256(fingerprint.encode("utf-8")).hexdigest()


def _crm_payload_from_lsq(record: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten a LeadSquared record into the CRM payload shape.

    The original record is kept under ``raw``; the remaining keys are looked
    up through the known field-name aliases.
    """
    field_aliases = (
        ("email", ("EmailAddress", "email")),
        ("phone", ("Phone", "Mobile", "PhoneNumber")),
        ("status", ("LeadStatus", "Status", "lead_status")),
        ("owner_name", ("OwnerName", "Owner", "owner_name")),
        ("next_task_due_date", ("NextTaskDueDate", "next_task_due_date")),
    )
    crm_payload: Dict[str, Any] = {"raw": record, "name": _lsq_display_name(record)}
    for target_key, aliases in field_aliases:
        crm_payload[target_key] = _lsq_field(record, *aliases)
    return crm_payload


def _crm_write_allowed() -> bool:
    """Report whether the ``ALLOW_CRM_WRITE`` environment flag is set to "true".

    The comparison ignores case and surrounding whitespace; an unset variable
    counts as "false".
    """
    raw_flag = os.getenv("ALLOW_CRM_WRITE", "false")
    normalized_flag = str(raw_flag).strip().lower()
    return normalized_flag == "true"


def _get_lsq_service_for_bid_or_error(bid: str) -> LeadSquaredService:
    """Build a LeadSquared client from the business's stored credentials.

    Raises:
        HTTPException: 400 when the credentials are missing, incomplete, or
            not marked active.
    """
    creds = db_handler.get_crm_credentials(bid, "leadsquared")
    required_fields = ("access_key", "secret_key", "is_active")
    if not creds or not all(creds.get(field) for field in required_fields):
        raise HTTPException(status_code=400, detail="LeadSquared integration not configured or inactive")
    client_kwargs = {
        "access_key": creds["access_key"],
        "secret_key": creds["secret_key"],
        "api_host": creds.get("api_host"),
    }
    return LeadSquaredService(**client_kwargs)


def _get_presales_mapping_from_leadsquared(
    bid: str,
    groupname: Optional[str] = None,
    row_count: int = 250,
    force_refresh: bool = False,
) -> Dict[str, Any]:
    """Map LeadSquared leads onto call groups, agents, and customers.

    Fetches up to ``row_count`` leads (clamped to 10..500) from LeadSquared,
    loads the call rows whose customer number matches a lead phone variant,
    and summarises them per group, per agent, and per customer row.
    Successful responses are stored in ``_PRESALES_MAP_CACHE`` and reused
    until the TTL expires, unless ``force_refresh`` is set.

    Returns:
        Dict with ``success``, ``connected``, ``message``, ``groups``,
        ``agents``, ``customers`` and ``stats``. Missing or inactive
        credentials, or a failed lead search, give ``success=False`` with
        empty lists.
    """

    def _failure(connected: bool, message: str) -> Dict[str, Any]:
        # Common shape of the responses returned when nothing could be mapped.
        return {
            "success": False,
            "connected": connected,
            "message": message,
            "groups": [],
            "agents": [],
            "customers": [],
            "stats": {"lsq_leads_fetched": 0, "matched_customers": 0, "mapped_rows": 0},
        }

    capped_rows = max(10, min(int(row_count or 250), 500))
    cache_key = f"{bid}:{groupname or ''}:{capped_rows}"
    now_ts = int(time.time())

    entry = _PRESALES_MAP_CACHE.get(cache_key)
    if entry and not force_refresh:
        entry_age = now_ts - int(entry.get("ts", 0))
        if entry_age <= _PRESALES_MAP_CACHE_TTL_SECONDS:
            return entry.get("data", {})

    creds = db_handler.get_crm_credentials(bid, "leadsquared")
    if not creds or not all(creds.get(field) for field in ("access_key", "secret_key", "is_active")):
        return _failure(False, "LeadSquared integration not configured or inactive")

    lsq_client = LeadSquaredService(
        access_key=creds["access_key"],
        secret_key=creds["secret_key"],
        api_host=creds.get("api_host"),
    )
    search_result = lsq_client.search_leads({"Paging": {"Offset": 0, "RowCount": capped_rows}})
    if not search_result.get("success"):
        return _failure(True, search_result.get("message") or "Failed to fetch leads from LeadSquared")

    # Index leads by every normalized phone variant; the first lead seen wins.
    leads = _extract_lsq_lead_list(search_result.get("data"))
    lead_by_variant: Dict[str, Dict[str, Any]] = {}
    known_variants: set[str] = set()
    for lead in leads:
        lead_phone = _lsq_field(lead, "Phone", "Mobile", "PhoneNumber")
        if not lead_phone:
            continue
        for variant in _normalize_phone_variants(lead_phone):
            known_variants.add(variant)
            lead_by_variant.setdefault(variant, lead)

    call_rows = db_handler.get_group_agent_customer_rows(
        bid=bid,
        customer_numbers=list(known_variants),
        groupname=groupname,
    )

    groups_acc: Dict[str, Dict[str, Any]] = {}
    agents_acc: Dict[str, Dict[str, Any]] = {}
    customers: List[Dict[str, Any]] = []

    for call_row in call_rows:
        group_name = call_row.get("groupname") or "-"
        agent_name = call_row.get("agentname") or "-"
        customer_number = call_row.get("customer_callinfo") or ""
        # First lead registered under any variant of the customer's number.
        lead_match = next(
            filter(None, map(lead_by_variant.get, _normalize_phone_variants(customer_number))),
            None,
        )

        group_entry = groups_acc.setdefault(
            group_name,
            {"groupname": group_name, "totalCalls": 0, "matchedCustomers": set(), "agents": set()},
        )
        agent_entry = agents_acc.setdefault(
            agent_name,
            {"agentname": agent_name, "groupnames": set(), "totalCalls": 0, "matchedCustomers": set()},
        )

        calls = int(call_row.get("total_calls") or 0)
        group_entry["totalCalls"] += calls
        group_entry["agents"].add(agent_name)
        group_entry["matchedCustomers"].add(customer_number)

        agent_entry["totalCalls"] += calls
        agent_entry["groupnames"].add(group_name)
        agent_entry["matchedCustomers"].add(customer_number)

        customers.append(
            {
                "customer_callinfo": customer_number,
                "groupname": group_name,
                "agentname": agent_name,
                "total_calls": calls,
                "last_call": call_row.get("last_call"),
                "lsq_name": _lsq_display_name(lead_match),
                "lsq_owner_name": _lsq_field(lead_match, "OwnerName", "Owner", "owner_name"),
                "lsq_status": _lsq_field(lead_match, "LeadStatus", "Status", "lead_status"),
            }
        )

    def _group_summary(acc: Dict[str, Any]) -> Dict[str, Any]:
        # Collapse the accumulator sets into counts.
        return {
            "groupname": acc["groupname"],
            "totalCalls": int(acc["totalCalls"]),
            "matchedCustomers": len(acc["matchedCustomers"]),
            "agentsCount": len(acc["agents"]),
        }

    def _agent_summary(acc: Dict[str, Any]) -> Dict[str, Any]:
        # Group names are sorted; the first one doubles as the primary group.
        ordered_groups = sorted(acc["groupnames"])
        return {
            "agentname": acc["agentname"],
            "groupnames": ordered_groups,
            "groupname": ordered_groups[0] if ordered_groups else None,
            "totalCalls": int(acc["totalCalls"]),
            "matchedCustomers": len(acc["matchedCustomers"]),
        }

    groups = sorted(
        (_group_summary(acc) for acc in groups_acc.values()),
        key=lambda item: item["totalCalls"],
        reverse=True,
    )
    agents = sorted(
        (_agent_summary(acc) for acc in agents_acc.values()),
        key=lambda item: item["totalCalls"],
        reverse=True,
    )

    distinct_customers = {customer["customer_callinfo"] for customer in customers}
    response = {
        "success": True,
        "connected": True,
        "message": "Pre-sales mapping generated from LeadSquared leads",
        "groups": groups,
        "agents": agents,
        "customers": customers,
        "stats": {
            "lsq_leads_fetched": len(leads),
            "matched_customers": len(distinct_customers),
            "mapped_rows": len(customers),
        },
    }
    _PRESALES_MAP_CACHE[cache_key] = {"ts": now_ts, "data": response}
    return response


class StripApiPrefixMiddleware(BaseHTTPMiddleware):
    """Serve public /api/* URLs from the app's /* routes when the proxy forwards the full path."""

    _prefix = "/api"

    async def dispatch(self, request: Request, call_next):
        """Drop the leading prefix from the request path, then continue the chain."""
        scope = request.scope
        current_path = scope.get("path") or ""
        prefix = self._prefix
        if current_path == prefix:
            # A bare prefix maps to the application root.
            scope["path"] = "/"
        elif current_path.startswith(f"{prefix}/"):
            # The remainder always starts with "/", so it is never empty.
            scope["path"] = current_path[len(prefix) :]
        return await call_next(request)


# Application instance; the route and middleware decorators below register on it.
app = FastAPI(title="MCube AI FastAPI", version="1.0.0")

# NOTE(review): wildcard origins are combined with allow_credentials=True here.
# FastAPI's CORS documentation advises listing origins explicitly when
# credentials are allowed — confirm whether this should be restricted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Rewrites /api/* request paths to /* (see StripApiPrefixMiddleware).
app.add_middleware(StripApiPrefixMiddleware)

# In-process counters, updated by the HTTP middleware below and reported by /health.
_metrics = {"requests_total": 0, "errors_total": 0}
# Runs at import time; presumably creates the agent tables when missing — TODO confirm against its definition.
_ensure_agent_tables()


@app.middleware("http")
async def metrics_and_latency(request: Request, call_next):
    """Count requests and unhandled errors, and report per-request latency.

    ``errors_total`` grows only when the downstream handler raises. The
    latency is logged and also returned in the ``X-Process-Time-Ms`` header.
    """
    _metrics["requests_total"] += 1
    t_start = time.perf_counter()
    try:
        response = await call_next(request)
    except Exception:
        _metrics["errors_total"] += 1
        raise
    else:
        elapsed_ms = round((time.perf_counter() - t_start) * 1000, 2)
        logger.info("path=%s status=%s latency_ms=%s", request.url.path, response.status_code, elapsed_ms)
        response.headers["X-Process-Time-Ms"] = str(elapsed_ms)
        return response


@app.get("/health")
def health():
    """Report service liveness together with the in-process request counters."""
    checked_at = datetime.utcnow().isoformat() + "Z"
    return dict(
        status="healthy",
        service="mcube-ai-fastapi",
        timestamp=checked_at,
        metrics=_metrics,
    )


@app.get("/rag/agents")
def list_agents(
    bid: str = Query(..., min_length=1),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """List the agent configurations of a business."""
    _auth_user_for_bid(bid, user)
    agent_configs = _get_agent_configs(bid)
    return {"agents": agent_configs}


@app.get("/rag/{bid}/agents/catalog")
def rag_agent_catalog(
    bid: str,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return the default agent catalog plus the selectable providers and models."""
    _auth_user_for_bid(bid, user)
    configured_chat_model = str(CONFIG.get("RAG_CHAT_MODEL", ""))
    suggested_models = {
        "ollama": ["qwen2.5:7b", "gemma2:9b", "deepseek-r1:8b"],
        "bedrock": [configured_chat_model],
    }
    catalog_response = {
        "bid": str(bid),
        "catalog": get_default_catalog(),
        "providers": ["auto", "ollama", "bedrock"],
        "suggested_models": suggested_models,
    }
    return catalog_response


@app.get("/rag/{bid}/ollama/models")
def rag_ollama_models(
    bid: str,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """List the model names reported by the configured Ollama server.

    Returns:
        Dict with the base URL used, the model ``name`` values found in the
        response, and the raw response payload.

    Raises:
        HTTPException: 502 when the request to Ollama fails or its response
            is not a JSON object.
    """
    _auth_user_for_bid(bid, user)
    base_url = str(CONFIG.get("OLLAMA_BASE_URL", "http://127.0.0.1:11434")).rstrip("/")
    try:
        response = requests.get(f"{base_url}/api/tags", timeout=30)
        response.raise_for_status()
        payload = response.json() if response.content else {}
    except Exception as exc:  # noqa: BLE001
        # Chain the cause so the upstream failure stays visible in tracebacks.
        raise HTTPException(
            status_code=502,
            detail=f"Failed to fetch models from Ollama ({base_url}): {exc}",
        ) from exc
    if not isinstance(payload, dict):
        # A JSON array/scalar would otherwise fail below with AttributeError.
        raise HTTPException(
            status_code=502,
            detail=f"Failed to fetch models from Ollama ({base_url}): unexpected response format",
        )
    models = payload.get("models") or []
    if not isinstance(models, list):
        models = []
    # Ignore entries that are not objects instead of crashing on them.
    names = [str(item["name"]) for item in models if isinstance(item, dict) and item.get("name")]
    return {"ollama_base_url": base_url, "models": names, "raw": payload}


@app.post("/rag/{bid}/documents")
def rag_ingest_documents(bid: str, payload: IngestDocumentsRequest, user: Dict[str, Any] = Depends(_auth_user)):
    """Ingest the supplied documents into the business's RAG store.

    Responds with 400 when the ``documents`` array is empty.
    """
    _auth_user_for_bid(bid, user)
    documents = payload.documents
    if not documents:
        raise HTTPException(status_code=400, detail="documents array is required")
    return {
        "status": "success",
        "ingestion": rag_handler.ingest_documents(bid, documents),
    }


@app.post("/rag/{bid}/ingest-transcripts")
def rag_ingest_transcripts(bid: str, payload: IngestTranscriptsRequest, user: Dict[str, Any] = Depends(_auth_user)):
    """Backfill call transcripts into the RAG store using the request's options."""
    _auth_user_for_bid(bid, user)
    backfill_options = {
        "bid": bid,
        "presales_only": payload.presales_only,
        "limit": int(payload.limit),
        "overwrite_existing": payload.overwrite_existing,
        "callids": payload.callids,
    }
    return rag_handler.backfill_transcripts(**backfill_options)


@app.post("/rag/{bid}/query")
def rag_query(bid: str, payload: QueryRequest, user: Dict[str, Any] = Depends(_auth_user)):
    """Answer a user message through the RAG pipeline as the requested agent.

    Responds with 400 when ``user_id`` or ``message`` is missing and with 403
    when the agent is locked or explicitly disabled for the business.
    """
    _auth_user_for_bid(bid, user)
    if not payload.user_id:
        raise HTTPException(status_code=400, detail="user_id is required")
    if not (payload.message and payload.message.strip()):
        raise HTTPException(status_code=400, detail="message is required")

    agent_type = normalize_agent(payload.agent_type)
    agent_cfg = _get_agent_config(bid, agent_type) or {}
    if agent_cfg.get("is_locked"):
        raise HTTPException(status_code=403, detail=f"{agent_type} is locked for this business")
    if agent_cfg.get("is_enabled") is False:
        raise HTTPException(status_code=403, detail=f"{agent_type} is not enabled for this business")

    instruction_text = str(_resolve_agent_instruction_for_bid(bid, agent_type) or "")
    # Keep embedding input compact (user message only) and pass agent instruction separately for generation.
    instruction_cap = int(CONFIG.get("RAG_AGENT_INSTRUCTION_MAX_CHARS", 12000))

    metadata = dict(payload.metadata or {})
    metadata.update(
        {
            "agent_type": agent_type,
            "llm_provider": str(agent_cfg.get("provider") or "auto"),
            "llm_model_name": str(agent_cfg.get("model_name") or ""),
            "llm_runtime_config": agent_cfg.get("runtime_config") or {},
            "agent_instruction": instruction_text[:instruction_cap],
            "agent_instruction_truncated": len(instruction_text) > instruction_cap,
        }
    )

    answer = rag_handler.query(
        bid=bid,
        user_id=payload.user_id,
        message=payload.message,
        query_embedding=payload.query_embedding,
        conversation_id=payload.conversation_id,
        top_k=payload.top_k,
        min_similarity=payload.min_similarity,
        metadata=metadata,
        profile_updates=payload.profile_updates,
    )
    answer["agent_type"] = agent_type
    return answer


@app.get("/rag/{bid}/conversations/{conversation_id}")
def rag_get_conversation(
    bid: str,
    conversation_id: str,
    limit: int = Query(default=200, ge=1, le=1000),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return up to ``limit`` messages of one conversation."""
    _auth_user_for_bid(bid, user)
    return {
        "conversation_id": conversation_id,
        "messages": rag_handler.get_conversation_messages(bid, conversation_id, limit=limit),
    }


@app.get("/rag/{bid}/conversations")
def rag_list_conversations(
    bid: str,
    user_id: Optional[str] = Query(default=None),
    limit: int = Query(default=50, ge=1, le=200),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """List conversations of a business, optionally narrowed to one user."""
    _auth_user_for_bid(bid, user)
    conversations = _list_conversations(bid=bid, user_id=user_id, limit=limit)
    return conversations


@app.get("/rag/{bid}/profiles/{user_id}")
def rag_get_profile(bid: str, user_id: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Return the stored RAG profile of ``user_id`` within the business."""
    _auth_user_for_bid(bid, user)
    profile = rag_handler.get_user_profile(bid, user_id)
    return profile


@app.post("/rag/{bid}/score-call")
def rag_score_call(bid: str, payload: ScoreCallRequest, user: Dict[str, Any] = Depends(_auth_user)):
    """Score one call with the LLM, using the fallback scorer when the LLM yields nothing.

    Responds with 404 when the call has no transcript and with 502 when
    ``force_llm`` is set but the LLM produced no scorecard.
    """
    _auth_user_for_bid(bid, user)
    call_context = _collect_call_context(bid, payload.call_id)
    if not (call_context.get("transcript") or {}).get("transcript"):
        raise HTTPException(status_code=404, detail=f"Transcript not found for call_id={payload.call_id}")

    agent_type = normalize_agent(payload.agent_type)
    llm_scorecard = _score_call_with_llm(call_context, bid, agent_type)
    if payload.force_llm and not llm_scorecard:
        raise HTTPException(status_code=502, detail="LLM scoring failed")

    if llm_scorecard:
        scorecard = llm_scorecard
    else:
        scorecard = _score_call_fallback(call_context)
    return {
        "bid": str(bid),
        "call_id": payload.call_id,
        "agent_type": agent_type,
        "scorecard": scorecard,
        "generated_at": datetime.utcnow().isoformat() + "Z",
    }


@app.get("/rag/{bid}/agent-report")
def rag_agent_report(
    bid: str,
    agent_id: str = Query(..., min_length=1),
    days: int = Query(default=30, ge=1, le=365),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return aggregated call metrics for one agent over the last ``days`` days."""
    _auth_user_for_bid(bid, user)
    report = _agent_report(bid, agent_id, days)
    return report


@app.get("/rag/{bid}/ingestion-progress")
def rag_ingestion_progress(
    bid: str,
    limit_runs: int = Query(default=10, ge=1, le=100),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return the current ingestion status and the most recent ingestion runs."""
    _auth_user_for_bid(bid, user)
    progress = _ingestion_progress(bid, limit_runs=limit_runs)
    return progress


@app.put("/rag/{bid}/agents/config/{agent_type}")
def rag_upsert_agent_config(
    bid: str,
    agent_type: str,
    payload: AgentConfigUpsertRequest,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Update the stored configuration of one agent for a business.

    Payload fields left as ``None`` keep their current value.

    Returns:
        ``{"status": "ok", "agent": <configuration re-read after the update>}``.

    Raises:
        HTTPException: 404 when no configuration exists for ``agent_type``.
    """
    _auth_user_for_bid(bid, user)
    agent_type = normalize_agent(agent_type)
    _ensure_default_agent_configs(bid)
    existing = _get_agent_config(bid, agent_type) or {}
    if not existing:
        raise HTTPException(status_code=404, detail=f"Unknown agent type: {agent_type}")

    def _text_or_null(value: Any) -> Optional[str]:
        # str(None) would persist the literal text "None"; send NULL instead so
        # COALESCE in the UPDATE leaves the column untouched.
        return None if value is None else str(value)

    update_obj = {
        "display_name": payload.display_name if payload.display_name is not None else existing.get("display_name"),
        "is_enabled": int(payload.is_enabled) if payload.is_enabled is not None else int(bool(existing.get("is_enabled"))),
        "is_locked": int(payload.is_locked) if payload.is_locked is not None else int(bool(existing.get("is_locked"))),
        "system_prompt": payload.system_prompt if payload.system_prompt is not None else existing.get("system_prompt"),
        "provider": payload.provider if payload.provider is not None else existing.get("provider"),
        "model_name": payload.model_name if payload.model_name is not None else existing.get("model_name"),
        "runtime_config": payload.runtime_config if payload.runtime_config is not None else (existing.get("runtime_config") or {}),
    }
    conn = _db_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(
            """
            UPDATE rag_agent_configs
            SET display_name = COALESCE(%s, display_name),
                is_enabled = %s,
                is_locked = %s,
                system_prompt = COALESCE(%s, system_prompt),
                provider = %s,
                model_name = %s,
                runtime_config = %s,
                updated_by = %s
            WHERE bid = %s AND agent_type = %s
            """,
            (
                _text_or_null(update_obj["display_name"]),
                int(update_obj["is_enabled"]),
                int(update_obj["is_locked"]),
                _text_or_null(update_obj["system_prompt"]),
                str(update_obj["provider"] or "auto"),
                str(update_obj["model_name"] or ""),
                json.dumps(update_obj["runtime_config"], ensure_ascii=True),
                str(user.get("username") or user.get("id") or "system"),
                str(bid),
                str(agent_type),
            ),
        )
    finally:
        conn.close()
    return {"status": "ok", "agent": _get_agent_config(bid, agent_type)}


@app.post("/rag/{bid}/agents/{agent_type}/knowledge-pdf")
async def rag_upload_agent_knowledge_pdf(
    bid: str,
    agent_type: str,
    request: Request,
    filename: str = Query(..., min_length=1),
    append_to_existing_prompt: bool = Query(default=False),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """
    Upload a knowledge PDF containing prompt + knowledge bands for an agent.
    For current rollout, this endpoint is restricted to the pre-sales lead agent.
    """
    # Flow: validate the request, extract one knowledge band per PDF page,
    # rebuild the agent's system prompt, then store the new prompt and a
    # versioned upload record.
    _auth_user_for_bid(bid, user)
    agent_type = normalize_agent(agent_type)
    _assert_raj_presales_agent(agent_type)
    if not (filename and str(filename).lower().endswith(".pdf")):
        raise HTTPException(status_code=400, detail="Only .pdf files are supported")

    _ensure_default_agent_configs(bid)
    existing = _get_agent_config(bid, agent_type) or {}
    if not existing:
        raise HTTPException(status_code=404, detail=f"Unknown agent type: {agent_type}")

    declared_type = str(request.headers.get("content-type") or "").lower()
    if "application/pdf" not in declared_type:
        raise HTTPException(status_code=400, detail="Request content-type must be application/pdf")
    pdf_bytes = await request.body()
    if not pdf_bytes:
        raise HTTPException(status_code=400, detail="Uploaded PDF is empty")
    max_pdf_bytes = 50 * 1024 * 1024
    if len(pdf_bytes) > max_pdf_bytes:
        raise HTTPException(status_code=400, detail="PDF exceeds 50MB limit")

    raw_pages = _extract_pdf_pages(pdf_bytes)
    page_count = len(raw_pages)
    if page_count < 1:
        raise HTTPException(status_code=400, detail="No pages were detected in the uploaded PDF")

    page_bodies = [str(page or "").strip() for page in raw_pages]
    if not any(page_bodies):
        raise HTTPException(status_code=400, detail="Could not extract usable text from the PDF")

    # One band per page (1-based); pages without text stay in the band list
    # but are left out of the compiled prompt.
    knowledge_bands = [{"page": number, "text": body} for number, body in enumerate(page_bodies, start=1)]
    compiled_knowledge = "\n\n".join(
        f"[Knowledge Band Page {band['page']}]\n{band['text']}" for band in knowledge_bands if band["text"]
    )
    if not compiled_knowledge:
        raise HTTPException(status_code=400, detail="No valid knowledge-band content found in the PDF")

    base_instruction = get_agent_instruction(agent_type)
    knowledge_instruction = (
        "Use the following uploaded prompt and knowledge bands as the primary rubric for evaluation.\n\n"
        f"{compiled_knowledge}"
    )
    # Append to the stored prompt only when asked to and one exists;
    # otherwise start again from the agent's base instruction.
    prior_prompt = existing.get("system_prompt")
    prompt_prefix = prior_prompt if (append_to_existing_prompt and prior_prompt) else base_instruction
    system_prompt = f"{prompt_prefix}\n\n{knowledge_instruction}"

    uploader = str(user.get("username") or user.get("id") or "system")
    char_count = len(compiled_knowledge)
    upload_meta = {
        "filename": str(filename),
        "uploaded_by": uploader,
        "uploaded_at": datetime.utcnow().isoformat() + "Z",
        "page_count": page_count,
        "char_count": char_count,
    }
    runtime_config = dict(existing.get("runtime_config") or {})
    runtime_config["knowledge_pdf_meta"] = upload_meta
    runtime_config["knowledge_bands"] = knowledge_bands
    runtime_config["knowledge_compiled_prompt"] = compiled_knowledge

    conn = _db_conn()
    version_no = 1
    try:
        cursor = conn.cursor()
        # Next version = highest stored version for this agent + 1.
        cursor.execute(
            """
            SELECT COALESCE(MAX(version_no), 0) AS max_version
            FROM rag_agent_knowledge_uploads
            WHERE bid = %s AND agent_type = %s
            """,
            (str(bid), str(agent_type)),
        )
        version_row = cursor.fetchone() or {}
        version_no = int(version_row.get("max_version") or 0) + 1
        # upload_meta is the same dict stored in runtime_config, so the
        # version is included in the JSON written below.
        upload_meta["version_no"] = version_no
        cursor.execute(
            """
            UPDATE rag_agent_configs
            SET system_prompt = %s,
                runtime_config = %s,
                updated_by = %s
            WHERE bid = %s AND agent_type = %s
            """,
            (
                str(system_prompt),
                json.dumps(runtime_config, ensure_ascii=True),
                uploader,
                str(bid),
                str(agent_type),
            ),
        )
        cursor.execute(
            """
            INSERT INTO rag_agent_knowledge_uploads (
                bid, agent_type, version_no, filename, page_count, char_count,
                knowledge_bands, compiled_prompt, uploaded_by
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            """,
            (
                str(bid),
                str(agent_type),
                int(version_no),
                str(filename),
                int(page_count),
                int(char_count),
                json.dumps(knowledge_bands, ensure_ascii=True),
                str(compiled_knowledge),
                uploader,
            ),
        )
    finally:
        conn.close()

    file_summary = {
        "filename": str(filename),
        "page_count": page_count,
        "char_count": char_count,
    }
    return {
        "status": "ok",
        "bid": str(bid),
        "agent_type": str(agent_type),
        "version_no": int(version_no),
        "file": file_summary,
        "agent": _get_agent_config(bid, agent_type),
    }


@app.get("/rag/{bid}/agents/{agent_type}/knowledge-pdf")
def rag_get_agent_knowledge_pdf(
    bid: str,
    agent_type: str,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """
    Return a preview of the knowledge PDF data stored on an agent's config.

    Reads ``runtime_config`` from the agent configuration and reports its
    ``knowledge_pdf_meta`` entry, its ``knowledge_bands`` list and the first
    4000 characters of ``knowledge_compiled_prompt``. When no metadata is
    stored, ``has_uploaded_knowledge`` is False and ``knowledge_pdf`` is None.
    """
    _auth_user_for_bid(bid, user)
    agent_type = normalize_agent(agent_type)
    _assert_raj_presales_agent(agent_type)

    agent_cfg = _get_agent_config(bid, agent_type) or {}
    runtime = agent_cfg.get("runtime_config") or {}
    pdf_meta = runtime.get("knowledge_pdf_meta") or {}

    response: Dict[str, Any] = {
        "status": "ok",
        "bid": str(bid),
        "agent_type": str(agent_type),
        "has_uploaded_knowledge": bool(pdf_meta),
        "knowledge_pdf": None,
    }
    if pdf_meta:
        prompt_text = runtime.get("knowledge_compiled_prompt") or ""
        response["knowledge_pdf"] = {
            "meta": pdf_meta,
            "knowledge_bands": runtime.get("knowledge_bands") or [],
            # Only a preview is returned, not the whole compiled prompt.
            "compiled_prompt_preview": str(prompt_text)[:4000],
        }
    return response


@app.get("/rag/{bid}/agents/{agent_type}/knowledge-pdf/history")
def rag_get_agent_knowledge_pdf_history(
    bid: str,
    agent_type: str,
    limit: int = Query(default=10, ge=1, le=50),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """
    Return the newest knowledge PDF upload versions recorded for an agent.

    Rows come from ``rag_agent_knowledge_uploads`` ordered by ``version_no``
    descending; ``limit`` (1-50, default 10) caps how many are returned.
    """
    _auth_user_for_bid(bid, user)
    agent_type = normalize_agent(agent_type)
    _assert_raj_presales_agent(agent_type)

    history_sql = """
            SELECT version_no, filename, page_count, char_count, uploaded_by, created_at
            FROM rag_agent_knowledge_uploads
            WHERE bid = %s AND agent_type = %s
            ORDER BY version_no DESC
            LIMIT %s
            """
    history_args = (str(bid), str(agent_type), int(limit))

    conn = _db_conn()
    try:
        cur = conn.cursor()
        cur.execute(history_sql, history_args)
        versions = cur.fetchall() or []
    finally:
        # Always release the connection, even when the query fails.
        conn.close()

    return {
        "status": "ok",
        "bid": str(bid),
        "agent_type": str(agent_type),
        "versions": _to_json_safe(versions),
    }


@app.post("/rag/{bid}/agents/analyze-call")
def rag_agent_analyze_call(
    bid: str,
    payload: AgentAnalyzeCallRequest,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """
    Return an agent's analysis of a call, reusing a stored result when allowed.

    Unless ``payload.force_refresh`` is set, a row found in
    ``rag_agent_call_analysis`` for this bid/call/agent is returned with
    status "cached". Otherwise the analysis is produced by
    ``_run_agent_call_analysis`` and returned with status "generated".

    Raises:
        HTTPException: 403 when the agent config is locked or explicitly
            disabled.
    """
    _auth_user_for_bid(bid, user)
    agent_type = normalize_agent(payload.agent_type)
    agent_cfg = _get_agent_config(bid, agent_type) or {}
    if agent_cfg.get("is_locked"):
        raise HTTPException(status_code=403, detail=f"{agent_type} is locked for this business")
    # Only an explicit False disables the agent; a missing flag does not.
    if agent_cfg.get("is_enabled") is False:
        raise HTTPException(status_code=403, detail=f"{agent_type} is not enabled for this business")

    stored = None
    if not payload.force_refresh:
        lookup_sql = """
                SELECT bid, call_id, agent_type, model_name, summary, result_json, updated_at
                FROM rag_agent_call_analysis
                WHERE bid = %s AND call_id = %s AND agent_type = %s
                LIMIT 1
                """
        conn = _db_conn()
        try:
            cur = conn.cursor()
            cur.execute(lookup_sql, (str(bid), str(payload.call_id), str(agent_type)))
            stored = cur.fetchone()
        finally:
            conn.close()

    if not stored:
        return {"status": "generated", "analysis": _run_agent_call_analysis(bid, payload.call_id, agent_type)}

    raw_result = stored.get("result_json")
    if isinstance(raw_result, str):
        # Best effort: keep the raw string when it is not valid JSON.
        try:
            stored["result_json"] = json.loads(raw_result)
        except Exception:
            pass
    return {"status": "cached", "analysis": _to_json_safe(stored)}


@app.get("/rag/{bid}/agents/analysis/{call_id}")
def rag_get_agent_analysis(
    bid: str,
    call_id: str,
    agent_type: str = Query(default="lead_quality_agent"),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """
    Fetch the stored agent analysis row for one call.

    Raises:
        HTTPException: 404 when ``rag_agent_call_analysis`` has no row for
            this bid/call/agent combination.
    """
    _auth_user_for_bid(bid, user)
    agent_type = normalize_agent(agent_type)

    lookup_sql = """
            SELECT bid, call_id, agent_type, model_name, summary, result_json, updated_at
            FROM rag_agent_call_analysis
            WHERE bid = %s AND call_id = %s AND agent_type = %s
            LIMIT 1
            """
    conn = _db_conn()
    try:
        cur = conn.cursor()
        cur.execute(lookup_sql, (str(bid), str(call_id), str(agent_type)))
        analysis = cur.fetchone()
    finally:
        conn.close()

    if not analysis:
        raise HTTPException(status_code=404, detail="No analysis found")

    raw_result = analysis.get("result_json")
    if isinstance(raw_result, str):
        # Best effort: keep the raw string when it is not valid JSON.
        try:
            analysis["result_json"] = json.loads(raw_result)
        except Exception:
            pass
    return {"analysis": _to_json_safe(analysis)}


@app.post("/auth/register")
async def auth_register(payload: Dict[str, Any]):
    """
    Create a user and grant it access to the business named in the payload.

    ``bid``, ``username``, ``email`` and ``password`` are mandatory; the first
    missing one produces a 400. The response mirrors the body and status
    returned by ``auth_handler.create_user``, unless the follow-up access
    assignment fails, in which case that failure is returned instead.
    """
    # NOTE(review): this route declares no auth dependency and takes ``role``
    # from the request body — confirm that self-service role selection is
    # intended.
    absent = next(
        (name for name in ("bid", "username", "email", "password") if not payload.get(name)),
        None,
    )
    if absent is not None:
        raise HTTPException(status_code=400, detail=f"{absent} is required")

    requested_role = payload.get("role", "user")
    created, created_status = auth_handler.create_user(
        username=payload["username"],
        email=payload["email"],
        password=payload["password"],
        full_name=payload.get("full_name"),
        role=requested_role,
    )
    if created_status != 201:
        return JSONResponse(status_code=created_status, content=created)

    assign_body, assign_code = auth_handler.assign_business_access(
        user_id=created["id"],
        bid=str(payload["bid"]),
        role=requested_role,
    )
    if assign_code >= 400:
        return JSONResponse(status_code=assign_code, content=assign_body)
    return JSONResponse(status_code=created_status, content=created)


@app.post("/auth/login")
async def auth_login(payload: Dict[str, Any], request: Request):
    """
    Authenticate with username and password via ``auth_handler.login``.

    The caller's IP (when the request exposes a client) and User-Agent header
    are forwarded to the handler. Responds 400 when either credential is
    missing; otherwise relays the handler's body and status code.
    """
    username = payload.get("username")
    password = payload.get("password")
    if not (username and password):
        raise HTTPException(status_code=400, detail="Username and password are required")

    client = request.client
    body, code = auth_handler.login(
        username,
        password,
        ip_address=client.host if client else None,
        user_agent=request.headers.get("User-Agent"),
    )
    return JSONResponse(status_code=code, content=body)


@app.post("/auth/logout")
def auth_logout(user: Dict[str, Any] = Depends(_auth_user), authorization: Optional[str] = Header(default=None)):
    """Log out the bearer token taken from the Authorization header and relay the handler's response."""
    bearer = _extract_bearer_token(authorization)
    body, code = auth_handler.logout(bearer)
    return JSONResponse(status_code=code, content=body)


@app.get("/auth/me")
def auth_me(user: Dict[str, Any] = Depends(_auth_user)):
    """Return the authenticated user after passing it through ``_enrich_auth_user``."""
    enriched = _enrich_auth_user(user)
    return {"user": enriched}


@app.get("/auth/users/{bid}")
def auth_users_for_business(bid: str, user: Dict[str, Any] = Depends(_auth_user)):
    """
    List the users attached to a business.

    Master users and business admins skip the ``users.view`` permission check;
    everyone else must pass it.
    """
    _auth_user_for_bid(bid, user)
    bypasses_check = user.get("is_master") or user_has_business_admin(user, bid)
    if not bypasses_check:
        _require_permission(bid, user, "users.view")
    members, code = auth_handler.get_users_by_business(bid)
    body = jsonable_encoder({"users": members})
    return JSONResponse(status_code=code, content=body)


@app.get("/auth/role-permissions/{role}")
def auth_role_permissions(role: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Return the base permissions of ``role`` after normalising it with ``AuthHandler.normalize_role``."""
    canonical_role = AuthHandler.normalize_role(role)
    return {
        "role": canonical_role,
        "permissions": auth_handler.get_role_base_permissions(canonical_role),
    }


@app.post("/business-admin/users/create")
async def business_admin_users_create(payload: Dict[str, Any], user: Dict[str, Any] = Depends(_auth_user)):
    """
    Create a non-master user inside a business and optionally set permissions.

    Steps, each aborting with the handler's own status/body on failure:
    create the user, assign business access, then (only when ``permissions``
    is present in the payload) sync permission overrides. Callers that are
    neither master nor business admin need ``users.create`` and, for the
    permission sync, ``roles.assign``.
    """
    absent = next(
        (key for key in ("bid", "username", "email", "password") if not payload.get(key)),
        None,
    )
    if absent is not None:
        raise HTTPException(status_code=400, detail=f"{absent} is required")

    bid = str(payload.get("bid")).strip()

    def _needs_permission_check() -> bool:
        # True when the caller is neither a master user nor a business admin.
        return not (user.get("is_master") or user_has_business_admin(user, bid))

    if _needs_permission_check():
        _require_permission(bid, user, "users.create")

    role = payload.get("role", "user")
    created, created_status = auth_handler.create_user(
        username=payload["username"],
        email=payload["email"],
        password=payload["password"],
        full_name=payload.get("full_name"),
        role=role,
        is_master=False,
    )
    if created_status != 201:
        return JSONResponse(status_code=created_status, content=created)

    new_user_id = created["id"]
    assign_body, assign_code = auth_handler.assign_business_access(
        user_id=new_user_id,
        bid=bid,
        role=role,
    )
    if assign_code >= 400:
        return JSONResponse(status_code=assign_code, content=assign_body)

    desired_permissions = payload.get("permissions")
    if desired_permissions is not None:
        if _needs_permission_check():
            _require_permission(bid, user, "roles.assign")
        perm_body, perm_code = auth_handler.sync_permission_overrides(
            new_user_id, bid, role, desired_permissions
        )
        if perm_code >= 400:
            return JSONResponse(status_code=perm_code, content=perm_body)

    return JSONResponse(status_code=201, content=created)


@app.put("/business-admin/users/{user_id}/access")
async def business_admin_users_update_access(
    user_id: int,
    payload: Dict[str, Any],
    user: Dict[str, Any] = Depends(_auth_user),
):
    """
    Update a user's role, group/agent mappings and permission overrides.

    Only the keys present in ``payload`` are applied, in this order: ``role``,
    ``groupnames``, ``agent_mappings``, ``permissions``. Each step returns the
    handler's error response as soon as it reports a status >= 400. On success
    the refreshed user entry for the business is returned.

    Raises:
        HTTPException: 400 when ``bid`` is missing; 404 when the user is not
            listed for the business after the updates.
    """
    bid = str(payload.get("bid") or "").strip()
    if not bid:
        raise HTTPException(status_code=400, detail="bid is required")
    is_privileged = user.get("is_master") or user_has_business_admin(user, bid)

    def _check(permission: str) -> None:
        # Master users and business admins bypass fine-grained permissions.
        if not is_privileged:
            _require_permission(bid, user, permission)

    def _find_target(candidates):
        # First entry whose id matches the path parameter, or None.
        for entry in candidates:
            if int(entry.get("id") or 0) == int(user_id):
                return entry
        return None

    role = payload.get("role")
    if role is not None:
        _check("roles.assign")
        body, code = auth_handler.assign_business_access(user_id, bid, role)
        if code >= 400:
            return JSONResponse(status_code=code, content=body)

    wants_groups = "groupnames" in payload
    wants_agents = "agent_mappings" in payload
    if wants_groups or wants_agents:
        _check("users.update")

    if wants_groups:
        body, code = auth_handler.replace_group_mappings(user_id, bid, payload.get("groupnames") or [])
        if code >= 400:
            return JSONResponse(status_code=code, content=body)

    if wants_agents:
        body, code = auth_handler.replace_agent_mappings(user_id, bid, payload.get("agent_mappings") or [])
        if code >= 400:
            return JSONResponse(status_code=code, content=body)

    if "permissions" in payload:
        _check("roles.assign")
        preview, preview_code = auth_handler.get_users_by_business(bid)
        if preview_code >= 400:
            return JSONResponse(status_code=preview_code, content=preview)
        # Fall back to the user's current role, then "user", when no role was sent.
        current = _find_target(preview) or {}
        effective_role = AuthHandler.normalize_role(role or current.get("role") or "user")
        perm_body, perm_code = auth_handler.sync_permission_overrides(
            user_id, bid, effective_role, payload.get("permissions") or []
        )
        if perm_code >= 400:
            return JSONResponse(status_code=perm_code, content=perm_body)

    members, members_code = auth_handler.get_users_by_business(bid)
    if members_code >= 400:
        return JSONResponse(status_code=members_code, content=members)
    target = _find_target(members)
    if not target:
        raise HTTPException(status_code=404, detail="User not found in business")
    return jsonable_encoder({"user": target})


@app.get("/list-businesses")
def list_businesses():
    """Return whatever ``db_handler.get_all_businesses`` yields."""
    businesses = db_handler.get_all_businesses()
    return businesses


@app.get("/businesses/{bid}/info")
def business_info(bid: str):
    """Return the stored info for a business, or respond 404 when none exists."""
    details = db_handler.get_business_info(bid)
    if details:
        return details
    raise HTTPException(status_code=404, detail="Business not found")


@app.post("/reported-issues")
def create_reported_issue(payload: ReportedIssueCreateRequest, user: Dict[str, Any] = Depends(_auth_user)):
    """
    Store an issue report for a business and return the stored row.

    The business name is taken from the payload when given; otherwise it is
    looked up on a best-effort basis and left NULL when the lookup fails.
    New issues are always inserted with status 'open'.

    Raises:
        HTTPException: 400 when ``business_id`` or ``issue_text`` is blank.
    """
    bid = str(payload.business_id or "").strip()
    issue_text = str(payload.issue_text or "").strip()
    if not bid:
        raise HTTPException(status_code=400, detail="business_id is required")
    if not issue_text:
        raise HTTPException(status_code=400, detail="issue_text is required")
    _auth_user_for_bid(bid, user)

    business_name = (payload.business_name or "").strip() or None
    if not business_name:
        try:
            info = db_handler.get_business_info(bid) or {}
            business_name = (
                info.get("name")
                or info.get("business_name")
                or info.get("businessName")
                or None
            )
        except Exception:
            # Best effort only: a failed lookup must not block the report,
            # but it should leave a trace instead of vanishing silently.
            logger.warning("Could not resolve business name for bid %s", bid, exc_info=True)
            business_name = None

    # Only purely numeric ids are stored in the user_id column.
    raw_user_id = user.get("id")
    user_id = int(raw_user_id) if str(raw_user_id or "").isdigit() else None

    _ensure_reported_issues_table()
    with _db_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            INSERT INTO reported_issues (
                business_id, business_name, business_user_name, user_id, username, user_email,
                page_path, call_id, scope, issue_text, status
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'open')
            """,
            (
                bid,
                business_name,
                (payload.business_user_name or user.get("username") or None),
                user_id,
                user.get("username"),
                user.get("email"),
                (payload.page_path or None),
                (payload.call_id or None),
                (payload.scope or None),
                issue_text,
            ),
        )
        # Re-read the row so the response includes database-populated columns.
        issue_id = cursor.lastrowid
        cursor.execute("SELECT * FROM reported_issues WHERE id = %s", (issue_id,))
        issue = cursor.fetchone()

    return {"success": True, "issue": _serialize_reported_issue(issue)}


@app.get("/reported-issues")
def list_reported_issues(
    business_id: Optional[str] = Query(default=None),
    status: Optional[str] = Query(default=None),
    limit: int = Query(default=100, ge=1, le=500),
    offset: int = Query(default=0, ge=0),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """
    List reported issues, newest first, with a total count for paging.

    Master users may list across all businesses (no ``business_id``) or filter
    by one. Other users are checked against the requested business, or against
    the first business on their account when none is requested.

    Raises:
        HTTPException: 400 for an unknown ``status``; 403 when a non-master
            user has no businesses.
    """
    status_value = str(status or "").strip()
    if status_value and status_value not in {"open", "in_progress", "resolved", "dismissed"}:
        raise HTTPException(status_code=400, detail="Invalid issue status")

    bid = str(business_id or "").strip()
    if user.get("is_master"):
        pass
    elif bid:
        _auth_user_for_bid(bid, user)
    else:
        businesses = user.get("businesses") or []
        if not businesses:
            raise HTTPException(status_code=403, detail="No business access")
        bid = str(businesses[0].get("bid") or "").strip()
        _auth_user_for_bid(bid, user)

    _ensure_reported_issues_table()
    # Filters are qualified with the ``ri`` alias because the same WHERE clause
    # is reused in the query that joins ``business_users``; unqualified names
    # would be ambiguous if that table has a same-named column.
    conditions: List[str] = []
    params: List[Any] = []
    if bid:
        conditions.append("ri.business_id = %s")
        params.append(bid)
    if status_value:
        conditions.append("ri.status = %s")
        params.append(status_value)
    where_sql = f"WHERE {' AND '.join(conditions)}" if conditions else ""

    with _db_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(
            f"SELECT COUNT(*) AS total FROM reported_issues ri {where_sql}",
            params,
        )
        total = int((cursor.fetchone() or {}).get("total") or 0)
        cursor.execute(
            f"""
            SELECT
                ri.*,
                COALESCE(NULLIF(ri.business_user_name, ''), NULLIF(bu.full_name, ''), ri.username) AS business_user_name_resolved
            FROM reported_issues ri
            LEFT JOIN business_users bu ON bu.id = ri.user_id
            {where_sql}
            ORDER BY ri.created_at DESC, ri.id DESC
            LIMIT %s OFFSET %s
            """,
            params + [limit, offset],
        )
        issues = cursor.fetchall() or []

    # Expose the resolved display name under the regular column name.
    for issue in issues:
        issue["business_user_name"] = issue.pop("business_user_name_resolved", None) or issue.get("business_user_name")

    return {"issues": _to_json_safe(issues), "total": total, "limit": limit, "offset": offset}


@app.patch("/reported-issues/{issue_id}")
def update_reported_issue(
    issue_id: int,
    payload: ReportedIssueUpdateRequest,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """
    Set the status of a reported issue (master users only) and return the row.

    Raises:
        HTTPException: 400 for an unknown status; 404 when no issue has the
            given id.
    """
    _require_master(user)
    status_value = str(payload.status or "").strip()
    if status_value not in {"open", "in_progress", "resolved", "dismissed"}:
        raise HTTPException(status_code=400, detail="Invalid issue status")

    _ensure_reported_issues_table()
    with _db_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(
            "UPDATE reported_issues SET status = %s WHERE id = %s",
            (status_value, issue_id),
        )
        # Do not rely on cursor.rowcount for existence: MySQL reports 0
        # affected rows when the status already equals the requested value,
        # which would turn a harmless repeat request into a 404.
        cursor.execute("SELECT * FROM reported_issues WHERE id = %s", (issue_id,))
        issue = cursor.fetchone()
        if not issue:
            raise HTTPException(status_code=404, detail="Reported issue not found")

    return {"success": True, "issue": _serialize_reported_issue(issue)}


@app.get("/groupnames/{bid}")
def groupnames(
    bid: str,
    presales_only: bool = Query(default=False),
    lsq_row_count: int = Query(default=250),
    force_refresh: bool = Query(default=False),
):
    """
    Return group names for a business.

    With ``presales_only`` the groups come from the LeadSquared presales
    mapping; an empty result falls back to the database list, which is also
    what is returned when ``presales_only`` is off.
    """
    if presales_only:
        presales_mapping = _get_presales_mapping_from_leadsquared(
            bid=bid,
            groupname=None,
            row_count=lsq_row_count,
            force_refresh=force_refresh,
        )
        presales_groups = presales_mapping.get("groups", [])
        if presales_groups:
            return presales_groups
    return db_handler.get_all_groupnames(bid)


@app.get("/agentnames/{bid}")
def agentnames(
    bid: str,
    groupname: Optional[str] = Query(default=None),
    presales_only: bool = Query(default=False),
    lsq_row_count: int = Query(default=250),
    force_refresh: bool = Query(default=False),
):
    """
    Return agent names for a business, optionally narrowed to one group.

    With ``presales_only`` the non-empty ``agentname`` values of the
    LeadSquared presales mapping are returned; when that yields nothing, or
    when ``presales_only`` is off, the database list is returned.
    """
    if presales_only:
        presales_mapping = _get_presales_mapping_from_leadsquared(
            bid=bid,
            groupname=groupname,
            row_count=lsq_row_count,
            force_refresh=force_refresh,
        )
        presales_agents = []
        for entry in presales_mapping.get("agents", []):
            if entry.get("agentname"):
                presales_agents.append(entry.get("agentname"))
        if presales_agents:
            return presales_agents
    return db_handler.get_agent_names(bid, groupname)


@app.get("/location-stats/{bid}")
def location_stats(
    bid: str,
    groupname: Optional[str] = Query(default=None),
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
    agent_names: Optional[str] = Query(default=None),
    detailed_calls: Optional[str] = Query(default=None),
    detailed_threshold_seconds: int = Query(default=120),
    direction: Optional[str] = Query(default=None),
    user: Optional[Dict[str, Any]] = Depends(_optional_auth_user),
):
    """
    Return location statistics for a business.

    The requested group and agent filters are first passed through
    ``_resolve_list_scope``; the values it returns are the ones forwarded to
    ``db_handler.get_location_stats``.
    """
    resolved = _resolve_list_scope(user, bid, groupname, agent_names)
    options = {
        "agent_names": resolved["agent_names"],
        "detailed_calls": detailed_calls,
        "detailed_threshold_seconds": detailed_threshold_seconds,
        "direction": direction,
        "scope_where": resolved["scope_where"],
        "scope_params": resolved["scope_params"],
    }
    return db_handler.get_location_stats(bid, resolved["groupname"], date_from, date_to, **options)


@app.get("/location-calls/{bid}")
def location_calls(
    bid: str,
    groupname: Optional[str] = Query(default=None),
    direction: Optional[str] = Query(default=None),
    call_status: Optional[str] = Query(default=None),
    limit: int = Query(default=100),
    offset: int = Query(default=0),
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
    user: Optional[Dict[str, Any]] = Depends(_optional_auth_user),
):
    """
    Return a page of raw calls filtered by group, direction, status and dates.

    The group filter and the extra scope clause come from
    ``_resolve_list_scope``.
    """
    resolved = _resolve_list_scope(user, bid, groupname, None)
    positional = (
        bid,
        resolved["groupname"],
        direction,
        call_status,
        limit,
        offset,
        date_from,
        date_to,
    )
    return db_handler.get_filtered_raw_calls(
        *positional,
        scope_where=resolved["scope_where"],
        scope_params=resolved["scope_params"],
    )


@app.get("/raw-calls/{bid}/{callid}")
def raw_call_details(
    bid: str,
    callid: str,
    user: Optional[Dict[str, Any]] = Depends(_optional_auth_user),
):
    """
    Return one raw call after ``_assert_call_in_scope`` has vetted it.

    Raises:
        HTTPException: 404 when the call does not exist.
    """
    record = db_handler.get_raw_call_details(bid, callid)
    if record:
        _assert_call_in_scope(user, bid, record)
        return record
    raise HTTPException(status_code=404, detail="Call not found")


@app.get("/analytics/{bid}/pending")
def pending_analytics(bid: str, limit: int = Query(default=10)):
    """Return up to ``limit`` calls from ``db_handler.get_calls_for_analysis`` with their count."""
    pending = db_handler.get_calls_for_analysis(bid, limit)
    return {"count": len(pending), "calls": pending}


@app.get("/analytics/{bid}/dashboard")
def analytics_dashboard(
    bid: str,
    groupname: Optional[str] = Query(default=None),
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
    user: Optional[Dict[str, Any]] = Depends(_optional_auth_user),
):
    """
    Assemble the analytics dashboard payload for a business.

    Every section is produced by a ``db_handler`` query that receives the
    same arguments: business id, resolved group, date range and the scope
    clause with its parameters.
    """
    resolved = _resolve_list_scope(user, bid, groupname, None)
    shared_args = (
        bid,
        resolved["groupname"],
        date_from,
        date_to,
        resolved["scope_where"],
        resolved["scope_params"],
    )
    # (response key, query) pairs, in the order they appear in the response.
    sections = (
        ("overview", db_handler.get_analytics_overview),
        ("sentiment_by_location", db_handler.get_sentiment_by_location),
        ("quality_by_location", db_handler.get_quality_by_location),
        ("quality_by_agent", db_handler.get_quality_by_agent),
        ("call_purposes", db_handler.get_call_purpose_frequency),
        ("concerns_frequency", db_handler.get_concerns_frequency),
        ("busy_locations", db_handler.get_busy_locations),
    )
    return {key: query(*shared_args) for key, query in sections}


@app.get("/analytics/{bid}/calls-by-objection")
def analytics_calls_by_objection(
    bid: str,
    objection: str = Query(...),
    groupname: Optional[str] = Query(default=None),
    user: Optional[Dict[str, Any]] = Depends(_optional_auth_user),
):
    """Return the calls matching ``objection`` within the scope resolved by ``_resolve_list_scope``."""
    resolved = _resolve_list_scope(user, bid, groupname, None)
    scoped_group = resolved["groupname"]
    scope_clause = resolved["scope_where"]
    scope_values = resolved["scope_params"]
    return db_handler.get_calls_by_objection(bid, objection, scoped_group, scope_clause, scope_values)


@app.get("/calls/{bid}")
def calls_list(
    bid: str,
    status: Optional[int] = Query(default=None),
    sales_intent: Optional[str] = Query(default=None),
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
    limit: int = Query(default=100),
    offset: int = Query(default=0),
    user: Optional[Dict[str, Any]] = Depends(_optional_auth_user),
):
    """
    Return a page of calls plus the total count for the same filters.

    Only the filters actually supplied are forwarded. ``status`` is kept
    whenever it is not None (so 0 is a valid filter); the string filters are
    dropped when empty.
    """
    resolved = _resolve_list_scope(user, bid)
    scope_clause = resolved["scope_where"]
    scope_values = resolved["scope_params"]

    filters: Dict[str, Any] = {}
    if status is not None:
        filters["status"] = status
    optional_filters = (
        ("sales_intent", sales_intent),
        ("date_from", date_from),
        ("date_to", date_to),
    )
    for key, value in optional_filters:
        if value:
            filters[key] = value

    page = db_handler.get_calls(bid, filters, limit, offset, scope_clause, scope_values)
    total_count = db_handler.get_calls_count(bid, filters, scope_clause, scope_values)
    return {"calls": page, "total": total_count, "limit": limit, "offset": offset}


# NOTE: this route must be registered BEFORE "/calls/{bid}/{callid}". Routes
# are matched in registration order, so when it was declared afterwards a
# request for ".../recent" was captured by the parametrised route with
# callid="recent" and this handler was never reached.
@app.get("/calls/{bid}/recent")
def recent_calls(
    bid: str,
    limit: int = Query(default=10),
    user: Optional[Dict[str, Any]] = Depends(_optional_auth_user),
):
    """Return the most recent calls for a business within the caller's scope."""
    scope = _resolve_list_scope(user, bid)
    return db_handler.get_recent_calls(
        bid, limit, scope["scope_where"], scope["scope_params"]
    )


@app.get("/calls/{bid}/{callid}")
def call_details(
    bid: str,
    callid: str,
    user: Optional[Dict[str, Any]] = Depends(_optional_auth_user),
):
    """
    Return one call merged with its transcript, analytics and BANT data.

    Each of the three enrichments is applied only when the corresponding
    lookup returns data; otherwise the related keys are left untouched.

    Raises:
        HTTPException: 404 when the call does not exist.
    """
    call = db_handler.get_call_by_id(bid, callid)
    if not call:
        raise HTTPException(status_code=404, detail="Call not found")
    _assert_call_in_scope(user, bid, call)

    transcript_data = db_handler.get_call_transcript(bid, callid)
    if transcript_data:
        call["transcripts"] = transcript_data.get("transcript", "")
        call["language"] = transcript_data.get("language", "")
        speaker_segments = transcript_data.get("speaker_segments")
        # Segments may arrive as a JSON string; undecodable text becomes [].
        if speaker_segments and isinstance(speaker_segments, str):
            try:
                speaker_segments = json.loads(speaker_segments)
            except json.JSONDecodeError:
                speaker_segments = []
        call["speaker_segments"] = speaker_segments
        call["num_speakers"] = transcript_data.get("num_speakers")
        call["duration"] = transcript_data.get("duration")

    analytics_data = db_handler.get_call_analytics(bid, callid)
    if analytics_data:
        call["summary"] = analytics_data.get("summary", "")
        call["call_purpose"] = analytics_data.get("call_purpose", "")
        call["objections_concerns"] = analytics_data.get("objections_concerns", "")
        call["objection_type"] = analytics_data.get("objection_type", "")
        call["quality_score"] = analytics_data.get("quality_score")
        call["propensity_score"] = analytics_data.get("propensity_score")
        call["propensity_band"] = analytics_data.get("propensity_band")
        call["propensity_parameter_scores"] = analytics_data.get("propensity_parameter_scores")
        call["propensity_parameter_detections"] = analytics_data.get("propensity_parameter_detections")
        # The response key is plural while the analytics key is singular.
        call["sentiments"] = analytics_data.get("sentiment", "")
        call["parameter_scores"] = analytics_data.get("parameter_scores")
        call["talk_listen_ratio"] = analytics_data.get("talk_listen_ratio")
        call["agent_speak_percentage"] = analytics_data.get("agent_speak_percentage")
        call["customer_speak_percentage"] = analytics_data.get("customer_speak_percentage")
        call["dead_air_percentage"] = analytics_data.get("dead_air_percentage")

    bant_data = db_handler.get_bant_analysis(bid, callid)
    if bant_data:
        call["bant_profile"] = bant_data.get("profile")
        call["bant_summary"] = bant_data.get("summary")
    return call


@app.get("/bant/{bid}/{callid}")
def bant_details(bid: str, callid: str):
    """Return the BANT analysis of a call, or respond 404 when none is stored."""
    bant = db_handler.get_bant_analysis(bid, callid)
    if not bant:
        raise HTTPException(status_code=404, detail="BANT not found")
    return bant


@app.delete("/calls/{bid}/{callid}/transcript")
def delete_call_transcript(bid: str, callid: str):
    """Delete a call's transcript and reset its transcription status."""
    # NOTE(review): this mutating route declares no auth dependency — confirm
    # that is intended.
    db_handler.delete_transcript(bid, callid)
    db_handler.reset_transcription_status(bid, callid)
    return {"message": "Transcript deleted successfully", "callid": callid}


@app.patch("/calls/{bid}/{callid}/segment/{segment_index}")
def patch_call_segment(bid: str, callid: str, segment_index: int, payload: Dict[str, Any]):
    """
    Replace the text of one speaker segment of a call transcript.

    Raises:
        HTTPException: 400 when ``text`` is missing, null or blank.
    """
    # A JSON null must count as missing; str(None) would otherwise store the
    # literal text "None" in the segment.
    raw_text = payload.get("text")
    new_text = "" if raw_text is None else str(raw_text).strip()
    if not new_text:
        raise HTTPException(status_code=400, detail="Text cannot be empty")
    db_handler.update_speaker_segment_text(bid, callid, segment_index, new_text)
    return {"message": "Segment updated successfully", "callid": callid, "segment_index": segment_index}


@app.post("/calls/search")
def calls_search(
    payload: Dict[str, Any],
    user: Optional[Dict[str, Any]] = Depends(_optional_auth_user),
):
    """
    Search calls of a business by free-text query.

    Payload keys: ``bid`` (required), ``query`` (default "") and ``limit``
    (default 50).

    Raises:
        HTTPException: 400 when ``bid`` is missing or ``limit`` is not an
            integer value.
    """
    bid = payload.get("bid")
    if not bid:
        raise HTTPException(status_code=400, detail="Business ID is required")
    query = payload.get("query", "")
    # A malformed limit is a client error; report it as 400 instead of letting
    # int() raise an unhandled ValueError/TypeError.
    try:
        limit = int(payload.get("limit", 50))
    except (TypeError, ValueError) as exc:
        raise HTTPException(status_code=400, detail="limit must be an integer") from exc
    scope = _resolve_list_scope(user, str(bid))
    return db_handler.search_calls(
        str(bid), query, limit, scope["scope_where"], scope["scope_params"]
    )


@app.get("/leads/{bid}")
def leads_list(
    bid: str,
    groupname: Optional[str] = Query(default=None),
    limit: int = Query(default=100),
    offset: int = Query(default=0),
    transcripts_only: bool = Query(default=False),
    direction: Optional[str] = Query(default=None),
    user: Optional[Dict[str, Any]] = Depends(_optional_auth_user),
):
    """
    Return a page of leads and the total count, echoing ``limit``/``offset``.

    The group filter and scope clause are taken from ``_resolve_list_scope``.
    """
    resolved = _resolve_list_scope(user, bid, groupname, None)
    page = db_handler.get_leads_list(
        bid=bid,
        groupname=resolved["groupname"],
        limit=limit,
        offset=offset,
        transcripts_only=transcripts_only,
        direction=direction,
        scope_where=resolved["scope_where"],
        scope_params=resolved["scope_params"],
    )
    leads = page.get("leads", [])
    total = int(page.get("total", 0))
    return {"leads": leads, "total": total, "limit": limit, "offset": offset}


@app.post("/leads/{bid}/bulk-upload")
def leads_bulk_upload(
    bid: str,
    payload: Dict[str, Any] = Body(default={}),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """
    Import calls from CSV (base64) into {bid}_raw_calls for bulk lead recordings.

    Payload keys: ``file_data_base64`` (required) and ``groupname`` (optional).
    The decoded bytes are read as UTF-8 (BOM tolerated) with a latin-1
    fallback.

    Raises:
        HTTPException: 400 for a missing, undecodable or empty file, or when
            the importer raises ``ValueError``; 500 for any other import
            failure.
    """
    # Imported locally so the decoder is guaranteed to be in scope here; the
    # module's visible import block does not include it.
    import base64

    _auth_user_for_bid(bid, user)
    file_b64 = str((payload or {}).get("file_data_base64") or "").strip()
    if not file_b64:
        raise HTTPException(status_code=400, detail="file_data_base64 is required")
    try:
        csv_bytes = base64.b64decode(file_b64, validate=False)
    except ValueError as exc:
        # binascii.Error is a ValueError subclass. Catching only this keeps
        # unrelated programming errors from being reported as bad input.
        raise HTTPException(status_code=400, detail="Invalid file_data_base64") from exc
    if not csv_bytes:
        raise HTTPException(status_code=400, detail="CSV file is empty")
    try:
        csv_text = csv_bytes.decode("utf-8-sig")
    except UnicodeDecodeError:
        csv_text = csv_bytes.decode("latin-1", errors="replace")

    groupname = (payload or {}).get("groupname")
    try:
        result = import_raw_calls_from_csv(
            db_handler,
            str(bid),
            csv_text,
            groupname=str(groupname).strip() if groupname else None,
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except Exception as exc:
        logger.exception("Bulk upload failed for bid %s", bid)
        raise HTTPException(status_code=500, detail=f"Bulk upload failed: {exc}") from exc
    return result


@app.post("/leads/{bid}/{lead_phone:path}/recordings")
def leads_upload_recording(
    bid: str,
    lead_phone: str,
    payload: Dict[str, Any] = Body(default={}),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """
    Upload a single recording (URL or base64 file) for a lead.

    ``ValueError`` from the insert helper is reported as 400; any other
    failure is logged and reported as 500.
    """
    _auth_user_for_bid(bid, user)
    phone = unquote(lead_phone)
    recording_payload = payload or {}
    try:
        outcome = insert_lead_recording(db_handler, CONFIG, str(bid), phone, recording_payload)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except Exception as exc:
        logger.exception("Lead recording upload failed for bid %s", bid)
        raise HTTPException(status_code=500, detail=f"Upload failed: {exc}") from exc
    return outcome


def _invoke_lead_insights_chat(prompt: str) -> Optional[str]:
    """Forward ``prompt`` to the RAG handler's chat model and return its reply."""
    reply = rag_handler._invoke_chat_model(prompt)  # noqa: SLF001
    return reply


def _enrich_lead_details_with_bant(bid: str, details: Dict[str, Any]) -> Dict[str, Any]:
    for call in details.get("calls") or []:
        callid = call.get("callid")
        if not callid or call.get("bant_profile"):
            continue
        bant = db_handler.get_bant_analysis(bid, callid)
        if bant:
            call["bant_profile"] = bant.get("profile")
            call["bant_summary"] = bant.get("summary") or ""
    return details


@app.get("/leads/{bid}/{lead_phone:path}/insights")
def lead_insights(bid: str, lead_phone: str):
    """Return insights for one lead, built without the chat-model callback.

    The lead is loaded by its URL-decoded phone, enriched with stored BANT
    data, and passed to ``build_lead_insights`` together with the business's
    data-capture fields.

    Raises:
        HTTPException: 404 when no lead details are found.
    """
    phone = unquote(lead_phone)
    lead = db_handler.get_lead_details(bid=bid, lead_phone=phone)
    if not lead:
        raise HTTPException(status_code=404, detail="Lead not found")
    lead = _enrich_lead_details_with_bant(bid, lead)
    capture_fields = db_handler.list_data_capture_fields(bid)
    insights = build_lead_insights(
        lead,
        data_capture_fields=capture_fields,
    )
    return {"insights": insights}


@app.post("/leads/{bid}/{lead_phone:path}/insights/generate")
def generate_lead_insights(bid: str, lead_phone: str):
    """Generate insights for one lead with the chat model enabled.

    Same loading and BANT enrichment as the read-only insights endpoint, but
    ``build_lead_insights`` is called with ``use_llm=True`` and the chat
    callback.

    Raises:
        HTTPException: 404 when the lead is not found; 400 when none of its
            calls has a transcript.
    """
    phone = unquote(lead_phone)
    lead = db_handler.get_lead_details(bid=bid, lead_phone=phone)
    if not lead:
        raise HTTPException(status_code=404, detail="Lead not found")
    lead = _enrich_lead_details_with_bant(bid, lead)
    has_transcript = any(call.get("transcript") for call in lead.get("calls", []))
    if not has_transcript:
        raise HTTPException(status_code=400, detail="No transcripts found for this lead.")
    capture_fields = db_handler.list_data_capture_fields(bid)
    insights = build_lead_insights(
        lead,
        invoke_chat=_invoke_lead_insights_chat,
        use_llm=True,
        data_capture_fields=capture_fields,
    )
    return {"insights": insights}


@app.get("/leads/{bid}/{lead_phone:path}")
def lead_detail(
    bid: str,
    lead_phone: str,
    groupname: Optional[str] = Query(default=None),
    include_crm: bool = Query(default=True),
    user: Optional[Dict[str, Any]] = Depends(_optional_auth_user),
):
    """Return a lead's details together with CRM data and a customer profile.

    The lead is loaded within the caller's resolved list scope. When
    ``include_crm`` is true and active LeadSquared credentials exist for the
    business, the CRM section is filled from the local cache; on a cache miss
    (or a cached record that does not match the phone) a live LeadSquared
    lookup is attempted and its first result is cached.

    The live lookup is best-effort: any failure is logged and reported to the
    client as "No CRM record found for this lead." rather than failing the
    request.

    Raises:
        HTTPException: 404 when no lead details are found.
    """
    decoded_phone = unquote(lead_phone)
    scope = _resolve_list_scope(user, bid, groupname, None)
    details = db_handler.get_lead_details(
        bid=bid,
        lead_phone=decoded_phone,
        groupname=scope["groupname"],
        scope_where=scope["scope_where"],
        scope_params=scope["scope_params"],
    )
    if not details:
        raise HTTPException(status_code=404, detail="Lead not found")
    crm = {"connected": False, "matched": False, "provider": "leadsquared", "lead": None}
    if include_crm:
        creds = db_handler.get_crm_credentials(bid, "leadsquared")
        if creds and creds.get("access_key") and creds.get("secret_key") and creds.get("is_active"):
            crm["connected"] = True
            cached = db_handler.get_cached_crm_lead_by_phone(bid=bid, provider="leadsquared", phone=decoded_phone)
            # Discard a cache row whose payload does not actually belong to this phone.
            if cached and not _record_matches_phone(
                record=cached.get("lead_payload"),
                phone=decoded_phone,
                fallback_phone=None,
            ):
                cached = None
            if cached:
                crm["matched"] = True
                crm["lead"] = {
                    "raw": cached.get("lead_payload") or {},
                    "name": cached.get("lead_name"),
                    "email": cached.get("email"),
                    "phone": cached.get("phone_primary"),
                    "status": cached.get("lead_status"),
                    "owner_name": cached.get("owner_name"),
                    "next_task_due_date": cached.get("next_task_due_date"),
                }
            else:
                # No cache hit — try a live LSQ lookup and cache the result
                try:
                    lsq = LeadSquaredService(
                        access_key=creds["access_key"],
                        secret_key=creds["secret_key"],
                        api_host=creds.get("api_host") or "https://api-in21.leadsquared.com/v2/",
                    )
                    live = lsq.search_lead_by_phone(decoded_phone)
                    if live.get("success") and live.get("data"):
                        lead_rec = live["data"][0]
                        first = (lead_rec.get("FirstName") or "").strip()
                        last = (lead_rec.get("LastName") or "").strip()
                        name = (first + " " + last).strip() or lead_rec.get("ProspectName") or None
                        phone_val = lead_rec.get("Phone") or lead_rec.get("Mobile") or decoded_phone
                        # Fall back to a phone-derived id when LSQ gives no ProspectID.
                        eid = lead_rec.get("ProspectID") or hashlib.sha256(decoded_phone.encode()).hexdigest()
                        db_handler.upsert_crm_lead_cache(
                            bid=bid, provider="leadsquared", external_lead_id=eid,
                            lead_name=name,
                            owner_name=lead_rec.get("OwnerName"),
                            email=lead_rec.get("EmailAddress"),
                            phone_primary=phone_val,
                            lead_status=lead_rec.get("LeadStatus"),
                            lead_payload=lead_rec,
                        )
                        crm["matched"] = True
                        crm["lead"] = {
                            "raw": lead_rec,
                            "name": name,
                            "email": lead_rec.get("EmailAddress"),
                            "phone": phone_val,
                            "status": lead_rec.get("LeadStatus"),
                            "owner_name": lead_rec.get("OwnerName"),
                            "next_task_due_date": lead_rec.get("NextTaskDueDate"),
                        }
                    else:
                        crm["message"] = "No CRM record found for this lead."
                except Exception:
                    # Best-effort lookup: keep the response shape, but leave a
                    # trace so CRM/API failures are not mistaken for "no record".
                    logger.warning("Live LeadSquared lookup failed for bid %s", bid, exc_info=True)
                    crm["message"] = "No CRM record found for this lead."
    details["crm"] = crm
    details["customer_profile"] = _build_customer_profile(details, crm)
    return details


@app.put("/leads/{bid}/{lead_phone:path}/profile")
def update_lead_profile(
    bid: str,
    lead_phone: str,
    body: Dict[str, Any] = Body(...),
):
    """Persist manually-entered lead profile data into crm_leads_cache.

    Non-empty values from ``body`` are merged over any cached LeadSquared
    payload for the phone. Top-level fields (name, owner, email, status, next
    task date) take the submitted value when truthy, else the cached one.

    Returns:
        ``{"success": True}`` once the upsert has been issued.
    """
    decoded_phone = unquote(lead_phone)
    # Check for existing cache entry to merge with
    existing = db_handler.get_cached_crm_lead_by_phone(bid=bid, provider="leadsquared", phone=decoded_phone) or {}
    existing_payload = existing.get("lead_payload") or {}
    if isinstance(existing_payload, str):
        try:
            existing_payload = json.loads(existing_payload)
        except (TypeError, ValueError):
            existing_payload = {}
    # A stored payload that is not a JSON object (e.g. a list or number)
    # cannot be merged key-by-key; start from an empty payload instead.
    if not isinstance(existing_payload, dict):
        existing_payload = {}

    # Merge: existing LSQ data + manual overrides
    overrides = {k: v for k, v in body.items() if v is not None and v != ""}
    merged_payload = {**existing_payload, **overrides}

    # Reuse the cached id; otherwise derive a stable id marking a manual entry.
    eid = existing.get("external_lead_id") or hashlib.sha256(f"{decoded_phone}|manual".encode()).hexdigest()

    db_handler.upsert_crm_lead_cache(
        bid=bid,
        provider="leadsquared",
        external_lead_id=eid,
        lead_name=body.get("contact_name") or existing.get("lead_name"),
        owner_name=body.get("owner_name") or existing.get("owner_name"),
        email=body.get("email") or existing.get("email"),
        phone_primary=decoded_phone,
        lead_status=body.get("lead_status") or existing.get("lead_status"),
        next_task_due_date=body.get("next_task_due_date") or existing.get("next_task_due_date"),
        lead_payload=merged_payload,
    )
    return {"success": True}


@app.get("/analytics/{bid}/stats")
def analytics_stats(
    bid: str,
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
):
    """Return call statistics for ``bid`` from the analytics service.

    ``date_from`` and ``date_to`` are passed through unchanged.
    """
    stats = analytics_service.get_call_statistics(bid, date_from, date_to)
    return stats


@app.get("/analytics/{bid}/sentiment")
def analytics_sentiment(
    bid: str,
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
):
    """Return the sentiment distribution for ``bid`` from the analytics service.

    ``date_from`` and ``date_to`` are passed through unchanged.
    """
    distribution = analytics_service.get_sentiment_distribution(bid, date_from, date_to)
    return distribution


@app.get("/analytics/{bid}/intent")
def analytics_intent(
    bid: str,
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
):
    """Return the intent distribution for ``bid`` from the analytics service.

    ``date_from`` and ``date_to`` are passed through unchanged.
    """
    distribution = analytics_service.get_intent_distribution(bid, date_from, date_to)
    return distribution


@app.get("/analytics/{bid}/trends")
def analytics_trends(
    bid: str,
    period: str = Query(default="day"),
    days: int = Query(default=7),
):
    """Return trend data for ``bid`` from the analytics service.

    ``period`` (default ``"day"``) and ``days`` (default 7) are passed
    through unchanged.
    """
    trends = analytics_service.get_trends(bid, period, days)
    return trends


@app.get("/analytics/{bid}/agents")
def analytics_agents(
    bid: str,
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
):
    """Return agent performance data for ``bid`` from the analytics service.

    ``date_from`` and ``date_to`` are passed through unchanged.
    """
    performance = analytics_service.get_agent_performance(bid, date_from, date_to)
    return performance


@app.get("/analytics/{bid}/leaderboard")
def analytics_leaderboard(
    bid: str,
    groupname: Optional[str] = Query(default=None),
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
    user: Optional[Dict[str, Any]] = Depends(_optional_auth_user),
):
    """Agent leaderboard ranked by average call quality score.

    The group and row-level restrictions come from the caller's resolved list
    scope. The response holds the leaderboard rows and their count.
    """
    scope = _resolve_list_scope(user, bid, groupname, None)
    effective_group = scope["groupname"]
    extra_where = scope["scope_where"]
    extra_params = scope["scope_params"]
    leaderboard = db_handler.get_agent_leaderboard(
        bid,
        effective_group,
        date_from,
        date_to,
        extra_where,
        extra_params,
    )
    return {"leaderboard": leaderboard, "total_agents": len(leaderboard)}


@app.get("/analytics/{bid}/keywords")
def analytics_keywords(
    bid: str,
    limit: int = Query(default=20),
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
):
    """Return the top keywords for ``bid`` from the analytics service.

    ``limit`` (default 20) and the optional date bounds are passed through
    unchanged.
    """
    keywords = analytics_service.get_top_keywords(bid, limit, date_from, date_to)
    return keywords


@app.get("/analytics/{bid}/{callid}")
def call_analytics(bid: str, callid: str):
    """Return the stored analytics record for a single call.

    Raises:
        HTTPException: 404 when no analytics exist for ``callid``.
    """
    record = db_handler.get_call_analytics(bid, callid)
    if record:
        return record
    raise HTTPException(status_code=404, detail="Analytics not found for this call")


@app.post("/queue-calls/{bid}")
def queue_calls(bid: str):
    """Report how many calls with ``status`` 0 exist for ``bid``.

    The lookup is limited to 1000 calls.
    NOTE(review): this handler only counts matching calls; nothing is
    enqueued here — confirm that is intended.
    """
    pending = db_handler.get_calls(bid, {"status": 0}, limit=1000)
    total = len(pending)
    return {
        "message": f"Queued {total} calls for processing",
        "count": total,
        "business_id": bid,
    }


@app.post("/process-calls/{bid}")
def process_calls(bid: str):
    """Acknowledge a processing request for ``bid`` with HTTP 202.

    NOTE(review): no processing is started from this handler itself —
    confirm the work is triggered elsewhere.
    """
    acknowledgement = {"message": "Processing started", "business_id": bid}
    return JSONResponse(status_code=202, content=acknowledgement)


@app.get("/transcripts/{bid}")
def get_transcripts(bid: str):
    """Return whatever ``db_handler.get_transcripts`` yields for ``bid``."""
    transcripts = db_handler.get_transcripts(bid)
    return transcripts


@app.get("/export/{bid}/calls")
def export_calls(
    bid: str,
    format: str = Query(default="json"),
    status: Optional[int] = Query(default=None),
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
):
    """Export up to 10000 calls for ``bid`` as JSON (default) or CSV.

    Only filters that were actually supplied are forwarded to the query.
    With ``format=csv`` the rows are written using the first row's keys as
    the header and returned as a dated attachment; an empty result produces
    an empty CSV body.
    """
    candidate_filters = {"status": status, "date_from": date_from, "date_to": date_to}
    filters = {name: val for name, val in candidate_filters.items() if val is not None}
    calls = db_handler.get_calls(bid, filters, limit=10000)
    if format != "csv":
        return calls

    buffer = StringIO()
    if calls:
        writer = csv.DictWriter(buffer, fieldnames=calls[0].keys())
        writer.writeheader()
        writer.writerows(calls)
    stamp = datetime.now().strftime('%Y%m%d')
    headers = {"Content-Disposition": f"attachment; filename=calls_{bid}_{stamp}.csv"}
    return PlainTextResponse(content=buffer.getvalue(), media_type="text/csv", headers=headers)


def _split_csv_filter(value: Optional[str]) -> List[str]:
    return [item.strip() for item in str(value or "").split(",") if item.strip()]


def _export_filters(
    groupname: Optional[str],
    date_from: Optional[str],
    date_to: Optional[str],
    agent_names: Optional[str] = None,
    customer_name: Optional[str] = None,
    detailed_calls: Optional[str] = None,
    quality_min: Optional[float] = None,
    quality_max: Optional[float] = None,
    direction: Optional[str] = None,
    call_status: Optional[str] = None,
    include_quality_filters: bool = False,
) -> tuple[str, List[Any]]:
    """Build the ``WHERE`` fragment and bound parameters for export queries.

    Clauses reference the raw-calls table as ``r`` and, for the quality
    bounds, the analytics table as ``a``. Every user value is passed as a
    ``%s`` parameter. ``agent_names`` is a comma-separated list;
    ``detailed_calls`` of ``"yes"``/``"no"`` selects calls longer than, or at
    most, 120 seconds (``"no"`` also matches calls with no end time). Quality
    bounds are applied only when ``include_quality_filters`` is true.

    Returns:
        ``(where_sql, params)``; ``where_sql`` is ``""`` when no filter
        applies, otherwise it starts with ``" WHERE "``.
    """
    clauses: List[str] = []
    bound_values: List[Any] = []

    def _add(clause: str, *values: Any) -> None:
        # Keep each clause and its parameters appended together, in order.
        clauses.append(clause)
        bound_values.extend(values)

    if groupname:
        _add("r.groupname = %s", groupname)
    if date_from:
        _add("DATE(r.call_starttime) >= %s", date_from)
    if date_to:
        _add("DATE(r.call_starttime) <= %s", date_to)

    agents = _split_csv_filter(agent_names)
    if agents:
        marks = ", ".join("%s" for _ in agents)
        _add(f"r.agentname IN ({marks})", *agents)

    if customer_name:
        _add("CAST(r.customer_callinfo AS CHAR) LIKE %s", f"%{customer_name}%")
    if direction:
        _add("LOWER(r.direction) = LOWER(%s)", direction)
    if call_status:
        _add("UPPER(r.call_status) = UPPER(%s)", call_status)

    detail_choice = str(detailed_calls or "").strip().lower()
    if detail_choice == "yes":
        _add("TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime) > 120")
    elif detail_choice == "no":
        _add("(TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime) <= 120 OR r.call_endtime IS NULL)")

    if include_quality_filters:
        if quality_min is not None:
            _add("a.quality_score >= %s", quality_min)
        if quality_max is not None:
            _add("a.quality_score <= %s", quality_max)

    if not clauses:
        return "", bound_values
    return " WHERE " + " AND ".join(clauses), bound_values


# CSV header text for each export row field, keyed by the field name.
EXPORT_HEADER_LABELS = {
    "customer_number": "CUSTOMER NUMBER",
    "customer_name": "CUSTOMER NAME",
    "callid": "CALL ID",
    "agentname": "AGENT NAME",
    "groupname": "GROUP NAME",
    "call_starttime": "CALL START TIME",
    "call_endtime": "CALL END TIME",
    "direction": "DIRECTION",
    "call_status": "CALL STATUS",
    "duration_seconds": "DURATION SECONDS",
    "transcription_status": "TRANSCRIPTION STATUS",
    "language": "LANGUAGE",
    "num_speakers": "NUM SPEAKERS",
    "transcript_duration": "TRANSCRIPT DURATION",
    "transcript": "TRANSCRIPT",
    "quality_score": "QUALITY SCORE",
    "sentiment": "SENTIMENT",
    "call_purpose": "CALL PURPOSE",
    "objections_concerns": "OBJECTIONS CONCERNS",
    "objection_type": "OBJECTION TYPE",
    "talk_listen_ratio": "TALK LISTEN RATIO",
    "agent_speak_percentage": "AGENT SPEAK PERCENTAGE",
    "customer_speak_percentage": "CUSTOMER SPEAK PERCENTAGE",
    "dead_air_percentage": "DEAD AIR PERCENTAGE",
    "summary": "SUMMARY",
    "parameter_scores": "PARAMETER SCORES",
}
# Fields that lead every formatted export row, in this order.
EXPORT_FIRST_COLUMNS = ("customer_number", "customer_name", "callid")
# Quality and transcription exports render duration as HH:MM:SS, so their
# header drops the "SECONDS" suffix. Each is an independent copy.
QUALITY_EXPORT_HEADER_LABELS = dict(EXPORT_HEADER_LABELS, duration_seconds="DURATION")
TRANSCRIPTION_EXPORT_HEADER_LABELS = dict(EXPORT_HEADER_LABELS, duration_seconds="DURATION")


def _format_seconds_as_hhmmss(seconds: Any) -> str:
    try:
        total = int(round(float(seconds)))
    except (TypeError, ValueError):
        return ""
    if total < 0:
        total = 0
    hours = total // 3600
    minutes = (total % 3600) // 60
    secs = total % 60
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"


def _export_ratio_mm_ss(left: int, right: int) -> str:
    """Two-part ratio for export (e.g. 49:51). Excel formula avoids 49:51:00 display."""
    return f'="{int(left)}:{int(right)}"'


def _parse_speak_percentage(value: Any) -> Optional[int]:
    if value is None or str(value).strip() == "":
        return None
    try:
        return int(round(float(value)))
    except (TypeError, ValueError):
        return None


def _ratio_from_talk_times(agent_talk_time: Any, customer_talk_time: Any) -> Optional[tuple]:
    try:
        agent_seconds = max(0.0, float(agent_talk_time or 0))
        customer_seconds = max(0.0, float(customer_talk_time or 0))
    except (TypeError, ValueError):
        return None
    total = agent_seconds + customer_seconds
    if total <= 0:
        return None
    return int(round(100 * agent_seconds / total)), int(round(100 * customer_seconds / total))


def _parse_talk_listen_ratio_parts(value: Any) -> Optional[tuple]:
    """Normalize mixed inputs to (left, right) for MM:SS-style export."""
    if value is None:
        return None
    if isinstance(value, timedelta):
        total = max(0, int(value.total_seconds()))
        hours, rem = divmod(total, 3600)
        minutes, seconds = divmod(rem, 60)
        if hours == 0:
            return minutes, seconds
        return hours * 60 + minutes, seconds
    if isinstance(value, dt_time):
        if value.hour == 0:
            return value.minute, value.second
        return value.hour * 60 + value.minute, value.second

    if isinstance(value, (int, float)) and not isinstance(value, bool):
        numeric = float(value)
        if 0 < numeric < 1:
            total_seconds = int(round(numeric * 86400))
            minutes, seconds = divmod(total_seconds, 60)
            hours, minutes = divmod(minutes, 60)
            if hours == 0:
                return minutes, seconds
            return hours * 60 + minutes, seconds
        return None

    text = str(value).strip()
    if not text or text.upper() in ("N/A", "NA"):
        return None
    normalized = re.sub(r"\s*:\s*", ":", text)
    parts = [p for p in normalized.split(":") if p != ""]
    if len(parts) < 2:
        return None

    def _to_int(part: str) -> int:
        return int(round(float(part)))

    if len(parts) >= 3:
        third = parts[2]
        if third in ("0", "00", "0.0", "0.00") or (third.replace(".", "", 1).isdigit() and int(float(third)) == 0):
            return _to_int(parts[0]), _to_int(parts[1])
        if parts[0] in ("0", "00") and parts[1].replace(".", "", 1).isdigit() and parts[2].replace(".", "", 1).isdigit():
            return _to_int(parts[1]), _to_int(parts[2])
        return _to_int(parts[0]), _to_int(parts[1])

    if len(parts) == 2 and all(p.replace(".", "", 1).isdigit() for p in parts):
        return _to_int(parts[0]), _to_int(parts[1])
    return None


def _quality_export_talk_listen_ratio(row: Dict[str, Any]) -> str:
    """Standardize Talk Listen Ratio for export (e.g. 49:51, 81:19).

    Sources are tried in order: the two speak percentages (when both parse
    and at least one is positive), then the agent/customer talk times, then
    the stored ``talk_listen_ratio`` value. If none yields a pair, the raw
    stored text is returned, with ``N/A``/``NA`` normalised to ``"NA"``.
    """
    agent_share = _parse_speak_percentage(row.get("agent_speak_percentage"))
    customer_share = _parse_speak_percentage(row.get("customer_speak_percentage"))
    have_both = agent_share is not None and customer_share is not None
    if have_both and (agent_share > 0 or customer_share > 0):
        return _export_ratio_mm_ss(agent_share, customer_share)

    pair = _ratio_from_talk_times(
        row.get("agent_talk_time"), row.get("customer_talk_time")
    ) or _parse_talk_listen_ratio_parts(row.get("talk_listen_ratio"))
    if pair:
        return _export_ratio_mm_ss(pair[0], pair[1])

    fallback = str(row.get("talk_listen_ratio") or "").strip()
    return "NA" if fallback.upper() in ("N/A", "NA") else fallback


def _prepare_transcription_export_rows(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    prepared = []
    for row in rows or []:
        new_row = dict(row)
        if "duration_seconds" in new_row:
            new_row["duration_seconds"] = _format_seconds_as_hhmmss(new_row.get("duration_seconds"))
        prepared.append(new_row)
    return prepared


def _prepare_quality_export_rows(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    prepared = []
    for row in rows or []:
        new_row = dict(row)
        if "duration_seconds" in new_row:
            new_row["duration_seconds"] = _format_seconds_as_hhmmss(new_row.get("duration_seconds"))
        new_row["talk_listen_ratio"] = _quality_export_talk_listen_ratio(new_row)
        prepared.append(new_row)
    return prepared


def _add_customer_export_fields(bid: str, rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    if not rows:
        return rows
    phones = [
        str(row.get("customer_number") or row.get("customer_callinfo") or "").strip()
        for row in rows
    ]
    phones = [phone for phone in phones if phone]
    crm_by_phone: Dict[str, Dict[str, Any]] = {}
    if phones:
        try:
            crm_by_phone = db_handler.get_crm_enrichment_for_phones(bid, "leadsquared", phones) or {}
        except Exception as exc:
            logger.warning("Customer name enrichment skipped for export %s: %s", bid, exc)

    enriched = []
    for row in rows:
        new_row = dict(row)
        customer_number = str(new_row.pop("customer_callinfo", None) or new_row.get("customer_number") or "").strip()
        crm_info = crm_by_phone.get(customer_number) or {}
        new_row["customer_number"] = customer_number
        new_row["customer_name"] = new_row.get("customer_name") or crm_info.get("lead_name") or ""
        enriched.append(new_row)
    return enriched


def _format_export_csv_rows(
    rows: List[Dict[str, Any]],
    header_labels: Optional[Dict[str, str]] = None,
) -> List[Dict[str, Any]]:
    labels = header_labels or EXPORT_HEADER_LABELS
    formatted = []
    for row in rows or []:
        ordered_keys = list(EXPORT_FIRST_COLUMNS)
        ordered_keys.extend(key for key in row.keys() if key not in EXPORT_FIRST_COLUMNS)
        formatted.append({
            labels.get(key, str(key).replace("_", " ").upper()): row.get(key, "")
            for key in ordered_keys
        })
    return formatted


def _csv_response(
    rows: List[Dict[str, Any]],
    filename: str,
    header_labels: Optional[Dict[str, str]] = None,
) -> PlainTextResponse:
    """Render export rows as a ``text/csv`` attachment named ``filename``.

    Rows are first re-keyed by display label. The first row's columns define
    the header; extra keys on later rows are ignored. With no rows the body
    is empty.
    """
    formatted = _format_export_csv_rows(rows, header_labels=header_labels)
    buffer = StringIO()
    if formatted:
        columns = list(formatted[0].keys())
        writer = csv.DictWriter(buffer, fieldnames=columns, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(formatted)
    return PlainTextResponse(
        content=buffer.getvalue(),
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename={filename}"},
    )


@app.get("/export/{bid}/transcriptions")
def export_transcriptions(
    bid: str,
    groupname: Optional[str] = Query(default=None),
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
    agent_names: Optional[str] = Query(default=None),
    customer_name: Optional[str] = Query(default=None),
    detailed_calls: Optional[str] = Query(default=None),
    quality_min: Optional[float] = Query(default=None),
    quality_max: Optional[float] = Query(default=None),
    direction: Optional[str] = Query(default=None),
    call_status: Optional[str] = Query(default=None),
):
    """Export up to 10000 transcription rows for ``bid`` as a CSV attachment.

    Rows come from the business's raw-calls table, newest call first. The
    analytics table is joined only when a quality bound is supplied; if that
    table (or the raw-calls table) does not exist the CSV is empty. Rows are
    then passed through the transcription export preparation, customer
    name/number enrichment and duration formatting.

    NOTE(review): ``bid`` is interpolated into back-quoted table names; the
    query only runs after the table-existence check — confirm ``bid`` is
    validated upstream.
    """
    raw_table = f"{bid}_raw_calls"
    sarvam_table = f"{bid}_sarvamresponse"
    analytics_table = f"{bid}_callanalytics"
    filename = f"transcriptions_{bid}_{datetime.now().strftime('%Y%m%d')}.csv"
    needs_quality_join = quality_min is not None or quality_max is not None
    rows: List[Dict[str, Any]] = []
    with db_handler.get_connection() as conn:
        cursor = conn.cursor()
        # The analytics table is only required (and only checked) when a
        # quality filter forces the join.
        have_tables = db_handler._table_exists(cursor, raw_table) and (
            not needs_quality_join or db_handler._table_exists(cursor, analytics_table)
        )
        if have_tables:
            where_sql, params = _export_filters(
                groupname,
                date_from,
                date_to,
                agent_names=agent_names,
                customer_name=customer_name,
                detailed_calls=detailed_calls,
                quality_min=quality_min,
                quality_max=quality_max,
                direction=direction,
                call_status=call_status,
                include_quality_filters=needs_quality_join,
            )
            sarvam_cols = db_handler.sarvam_export_select_exprs(cursor, sarvam_table)
            quality_join_sql = f"INNER JOIN `{analytics_table}` a ON r.callid = a.callid" if needs_quality_join else ""
            cursor.execute(
                f"""
                SELECT
                    r.customer_callinfo AS customer_number,
                    r.callid,
                    r.agentname,
                    r.groupname,
                    r.call_starttime,
                    r.call_endtime,
                    r.direction,
                    r.call_status,
                    TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime) AS duration_seconds,
                    r.transcription_status,
                    {sarvam_cols["num_speakers"]} AS num_speakers,
                    {sarvam_cols["transcript_duration"]} AS transcript_duration,
                    {sarvam_cols["transcript"]} AS transcript,
                    {sarvam_cols["raw_response"]} AS raw_response
                FROM `{raw_table}` r
                {quality_join_sql}
                {where_sql}
                ORDER BY r.call_starttime DESC
                LIMIT 10000
                """,
                params,
            )
            rows = db_handler.prepare_transcription_csv_export_rows(cursor.fetchall() or [])
            rows = _add_customer_export_fields(bid, rows)
            rows = _prepare_transcription_export_rows(rows)
    return _csv_response(rows, filename, header_labels=TRANSCRIPTION_EXPORT_HEADER_LABELS)


@app.get("/export/{bid}/quality")
def export_quality(
    bid: str,
    groupname: Optional[str] = Query(default=None),
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
    agent_names: Optional[str] = Query(default=None),
    customer_name: Optional[str] = Query(default=None),
    detailed_calls: Optional[str] = Query(default=None),
    quality_min: Optional[float] = Query(default=None),
    quality_max: Optional[float] = Query(default=None),
    direction: Optional[str] = Query(default=None),
    call_status: Optional[str] = Query(default=None),
):
    """Export up to 10000 quality rows for ``bid`` as a CSV attachment.

    Raw calls are inner-joined with their analytics rows, newest call first,
    with the quality bounds always eligible as filters. If either table is
    missing the CSV is empty. Rows then get customer name/number enrichment
    and the quality-export formatting (duration, talk/listen ratio).
    """
    raw_table = f"{bid}_raw_calls"
    analytics_table = f"{bid}_callanalytics"
    rows: List[Dict[str, Any]] = []
    with db_handler.get_connection() as conn:
        cursor = conn.cursor()
        tables_ready = db_handler._table_exists(cursor, raw_table) and db_handler._table_exists(
            cursor, analytics_table
        )
        if tables_ready:
            where_sql, params = _export_filters(
                groupname,
                date_from,
                date_to,
                agent_names=agent_names,
                customer_name=customer_name,
                detailed_calls=detailed_calls,
                quality_min=quality_min,
                quality_max=quality_max,
                direction=direction,
                call_status=call_status,
                include_quality_filters=True,
            )
            cursor.execute(
                f"""
                SELECT
                    r.customer_callinfo AS customer_number,
                    r.callid,
                    r.agentname,
                    r.groupname,
                    r.call_starttime,
                    r.call_endtime,
                    r.direction,
                    r.call_status,
                    TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime) AS duration_seconds,
                    a.quality_score,
                    a.sentiment,
                    a.call_purpose,
                    a.objections_concerns,
                    a.objection_type,
                    a.talk_listen_ratio,
                    a.agent_talk_time,
                    a.customer_talk_time,
                    a.agent_speak_percentage,
                    a.customer_speak_percentage,
                    a.dead_air_percentage,
                    a.summary,
                    a.parameter_scores
                FROM `{raw_table}` r
                INNER JOIN `{analytics_table}` a ON r.callid = a.callid
                {where_sql}
                ORDER BY r.call_starttime DESC
                LIMIT 10000
                """,
                params,
            )
            fetched = cursor.fetchall() or []
            with_customers = _add_customer_export_fields(bid, fetched)
            rows = _prepare_quality_export_rows(with_customers)
    stamp = datetime.now().strftime('%Y%m%d')
    return _csv_response(rows, f"quality_{bid}_{stamp}.csv", header_labels=QUALITY_EXPORT_HEADER_LABELS)


def _public_api_base(request: Request) -> str:
    """Return the externally visible base URL for this request, without a trailing slash.

    The scheme is the first ``X-Forwarded-Proto`` value (``https`` when the
    header is absent); the host is the first ``X-Forwarded-Host`` value, else
    ``Host``. With no host header at all, ``request.base_url`` is used.
    """
    headers = request.headers
    scheme = (headers.get("x-forwarded-proto") or "https").split(",")[0].strip()
    raw_host = headers.get("x-forwarded-host") or headers.get("host") or ""
    host = raw_host.split(",")[0].strip()
    if not host:
        return str(request.base_url).rstrip("/")
    return f"{scheme}://{host}".rstrip("/")


def _resolve_ingest_secret(
    x_ingest_secret: Optional[str] = None,
    x_webhook_secret: Optional[str] = None,
) -> Optional[str]:
    return (x_ingest_secret or x_webhook_secret or "").strip() or None


def _call_ingest_response(result: Dict[str, Any]) -> JSONResponse:
    """Turn an ingest result dict into a JSON response.

    ``http_status`` is removed from ``result`` (mutating it) and used as the
    status code, defaulting to 200; the remaining keys form the body.
    """
    status_code = int(result.pop("http_status", 200))
    response = JSONResponse(status_code=status_code, content=result)
    return response


@app.get("/v1/bids/{bid}/calls/ingest/schema")
def call_ingest_schema_v1(bid: str, request: Request):
    """Public JSON schema + URL for Mcube or any external integrator."""
    base_url = _public_api_base(request)
    return ingest_schema_for_bid(bid, base_url)


@app.post("/v1/bids/{bid}/calls/ingest")
def call_ingest_v1(
    bid: str,
    payload: Dict[str, Any] = Body(...),
    x_ingest_secret: Optional[str] = Header(default=None, alias="X-Ingest-Secret"),
    x_webhook_secret: Optional[str] = Header(default=None, alias="X-Webhook-Secret"),
):
    """Universal per-BID call ingest. Opt-in via webhook_ingest_enabled (default off).

    The secret is taken from ``X-Ingest-Secret`` or ``X-Webhook-Secret`` and
    handed to the ingest service, whose result decides the HTTP status.
    """
    secret = _resolve_ingest_secret(x_ingest_secret, x_webhook_secret)
    outcome = call_ingest_service.process(bid, payload, ingest_secret=secret)
    return _call_ingest_response(outcome)


@app.get("/webhook/call-ingest/schema")
def webhook_call_ingest_schema(
    request: Request,
    bid: Optional[str] = Query(default=None),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Schema for settings UI; optional bid returns per-BID ingest URL.

    Without ``bid`` the canonical schema is returned with a usage note; with
    ``bid`` the caller must be authorised for that business.
    """
    if not bid:
        generic_schema = dict(CANONICAL_INGEST_SCHEMA)
        generic_schema["note"] = "Pass ?bid=6004 for per-BID ingest URL and settings."
        return generic_schema
    _auth_user_for_bid(bid, user)
    return ingest_schema_for_bid(bid, _public_api_base(request))


@app.post("/webhook/call-ingest")
def webhook_call_ingest_alias(
    payload: Dict[str, Any] = Body(...),
    x_ingest_secret: Optional[str] = Header(default=None, alias="X-Ingest-Secret"),
    x_webhook_secret: Optional[str] = Header(default=None, alias="X-Webhook-Secret"),
):
    """Alias for /v1/bids/{bid}/calls/ingest — bid must be in JSON body.

    ``bid`` is removed from the payload before it is handed to the ingest
    service.

    Raises:
        HTTPException: 400 when the body has no non-blank ``bid``.
    """
    bid = str(payload.get("bid") or "").strip()
    if not bid:
        raise HTTPException(status_code=400, detail="bid is required in JSON body for this endpoint")
    call_body = dict(payload)
    call_body.pop("bid", None)
    secret = _resolve_ingest_secret(x_ingest_secret, x_webhook_secret)
    outcome = call_ingest_service.process(bid, call_body, ingest_secret=secret)
    return _call_ingest_response(outcome)


@app.post("/webhook/call-update")
def webhook_call_update(payload: Dict[str, Any]):
    """Apply ``payload["data"]`` to a call, then refresh its RAG transcript entry.

    The RAG backfill is best-effort: a failure is logged as a warning and the
    update is still reported as successful.

    Raises:
        HTTPException: 400 when ``bid`` or ``callid`` is missing; 500 when the
            database update reports failure.
    """
    bid = payload.get("bid")
    callid = payload.get("callid")
    if not (bid and callid):
        raise HTTPException(status_code=400, detail="bid and callid are required")
    updated = db_handler.update_call(bid, callid, payload.get("data", {}))
    if not updated:
        raise HTTPException(status_code=500, detail="Failed to update call")
    try:
        rag_handler.backfill_transcripts(
            bid=bid,
            presales_only=False,
            limit=1,
            overwrite_existing=True,
            callids=[callid],
        )
    except Exception as rag_exc:
        logger.warning("RAG auto-ingest skipped for %s/%s: %s", bid, callid, rag_exc)
    return {"message": "Call updated successfully"}


@app.post("/webhook/conversation-summary")
def webhook_conversation_summary(payload: Dict[str, Any]):
    """Store the ``transfer_reason`` from the body as a call's conversation summary.

    Requires ``business_id`` and ``callid`` in the body (400 otherwise) and
    answers 500 when the database handler reports the save failed.
    """
    bid, callid = payload.get("business_id"), payload.get("callid")
    reason = payload.get("transfer_reason")
    if not (bid and callid):
        raise HTTPException(status_code=400, detail="business_id and callid are required")

    if db_handler.save_conversation_summary(bid, callid, reason):
        return {"message": "Summary saved successfully"}
    raise HTTPException(status_code=500, detail="Failed to save summary")


@app.get("/data-capture-fields/{bid}")
def data_capture_fields_list(bid: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Return the data-capture fields stored for ``bid`` under the ``fields`` key.

    The caller's access to the business is checked first.
    """
    _auth_user_for_bid(bid, user)
    return {"fields": db_handler.list_data_capture_fields(bid)}


@app.put("/data-capture-fields/{bid}")
def data_capture_fields_save(bid: str, payload: Dict[str, Any], user: Dict[str, Any] = Depends(_auth_user)):
    """Replace the data-capture fields of ``bid`` with ``payload["fields"]``.

    A missing or empty ``fields`` entry is treated as an empty list. The
    caller must pass both the per-business access check and the
    settings-manage/admin check.
    """
    _auth_user_for_bid(bid, user)
    _require_settings_manage_or_admin(bid, user)

    incoming = (payload or {}).get("fields") or []
    saved = db_handler.replace_data_capture_fields(bid, incoming)
    return {"fields": saved}


@app.get("/quality-parameters/{bid}")
def quality_parameters_list(bid: str):
    """Return whatever the quality-parameters handler reports for ``bid``."""
    parameters = quality_params_handler.get_parameters(bid)
    return parameters


@app.post("/quality-parameters/{bid}")
def quality_parameters_save(bid: str, payload: Dict[str, Any]):
    """Save one quality parameter for ``bid`` and return the id from the handler.

    An empty/falsy payload is forwarded as an empty dict.
    """
    saved_id = quality_params_handler.save_parameter(bid, payload or {})
    response = {"message": "Parameter saved successfully"}
    response["parameter_id"] = saved_id
    return response


@app.get("/admin/default-parameters")
def admin_default_parameters_list(user: Dict[str, Any] = Depends(_auth_user)):
    """Return the parameters of the "default" template (gated by ``_require_master``).

    The handler is asked to seed from business "8329"; when and how seeding
    happens is decided inside the handler.
    """
    _require_master(user)
    template_parameters = quality_params_handler.get_template_parameters(
        "default",
        seed_from_bid="8329",
    )
    return template_parameters


@app.post("/admin/default-parameters")
def admin_default_parameters_save(payload: Dict[str, Any], user: Dict[str, Any] = Depends(_auth_user)):
    """Save one parameter into the "default" template (gated by ``_require_master``).

    An empty/falsy payload is forwarded as an empty dict.
    """
    _require_master(user)
    saved_id = quality_params_handler.save_template_parameter("default", payload or {})
    response = {"message": "Default parameter saved successfully"}
    response["parameter_id"] = saved_id
    return response


@app.delete("/admin/default-parameters/{param_id}")
def admin_default_parameters_delete(param_id: int, user: Dict[str, Any] = Depends(_auth_user)):
    """Delete one parameter from the "default" template (gated by ``_require_master``).

    Answers 404 when the handler reports that nothing was deleted.
    """
    _require_master(user)
    if quality_params_handler.delete_template_parameter("default", param_id):
        return {"message": "Default parameter deleted successfully"}
    raise HTTPException(status_code=404, detail="Default parameter not found")


# The ``:int`` convertor restricts this route to numeric ids. Routes are
# matched in registration order, so an unconstrained ``{param_id}`` segment
# here would also capture GET requests for static sub-paths of
# ``/quality-parameters/{bid}/`` that are registered later in the module
# (e.g. ``groups``, ``total-score``) and reject them with a 422.
@app.get("/quality-parameters/{bid}/{param_id:int}")
def quality_parameter_get(bid: str, param_id: int):
    """Return a single quality parameter of ``bid`` by numeric id.

    Raises:
        HTTPException: 404 when the handler finds no such parameter.
    """
    parameter = quality_params_handler.get_parameter_by_id(bid, param_id)
    if not parameter:
        raise HTTPException(status_code=404, detail="Parameter not found")
    return parameter


@app.delete("/quality-parameters/{bid}/{param_id}")
def quality_parameter_delete(bid: str, param_id: int):
    """Delete one quality parameter of ``bid``; 404 when the handler reports failure."""
    if quality_params_handler.delete_parameter(bid, param_id):
        return {"message": "Parameter deleted successfully"}
    raise HTTPException(status_code=404, detail="Parameter not found or could not be deleted")


@app.get("/quality-parameters/{bid}/groups")
def quality_groups(bid: str):
    """Return the quality-parameter groups the handler reports for ``bid``."""
    groups = quality_params_handler.get_parameter_groups(bid)
    return groups


@app.get("/quality-parameters/{bid}/total-score")
def quality_total_score(bid: str):
    """Return the handler's total possible quality score for ``bid`` as ``total_score``."""
    total = quality_params_handler.calculate_total_possible_score(bid)
    return {"total_score": total}


@app.post("/quality-parameters/{bid}/upload")
async def quality_parameters_upload(bid: str, file: UploadFile = File(...)):
    """Import quality parameters for ``bid`` from an uploaded file.

    The whole upload is read into memory and passed to the handler together
    with its filename ("upload.csv" when the client sent none). A
    ``ValueError`` from the handler is reported to the client as a 400.
    """
    raw_bytes = await file.read()
    filename = file.filename or "upload.csv"
    try:
        summary = quality_params_handler.import_parameters_file(bid, filename, raw_bytes)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return summary


@app.get("/propensity-parameters/{bid}")
def propensity_parameters_list(bid: str, seed: bool = Query(default=False)):
    """List all propensity parameters of ``bid``, including inactive ones.

    ``ensure_table`` is called first; with ``?seed=true`` the handler is also
    asked to seed defaults if none exist before the list is read.
    """
    handler = propensity_params_handler
    handler.ensure_table()
    if seed:
        handler.seed_defaults_if_empty(bid)
    parameters = handler.get_parameters(bid, active_only=False)
    return parameters


@app.post("/propensity-parameters/{bid}")
def propensity_parameters_save(bid: str, payload: Dict[str, Any]):
    """Save one propensity parameter for ``bid`` and return the id from the handler.

    ``ensure_table`` is called first; an empty/falsy payload is forwarded as
    an empty dict.
    """
    handler = propensity_params_handler
    handler.ensure_table()
    saved_id = handler.save_parameter(bid, payload or {})
    response = {"message": "Parameter saved successfully"}
    response["parameter_id"] = saved_id
    return response


@app.post("/propensity-parameters/{bid}/seed")
def propensity_parameters_seed(bid: str):
    """Ask the handler to seed default propensity parameters for ``bid``.

    The handler's return value is passed through as ``inserted``.
    """
    handler = propensity_params_handler
    handler.ensure_table()
    seeded = handler.seed_defaults_if_empty(bid)
    return {"message": "Defaults seeded", "inserted": seeded}


@app.delete("/propensity-parameters/{bid}/{param_id}")
def propensity_parameter_delete(bid: str, param_id: int):
    """Delete one propensity parameter of ``bid``; 404 when the handler reports failure."""
    if propensity_params_handler.delete_parameter(bid, param_id):
        return {"message": "Parameter deleted successfully"}
    raise HTTPException(status_code=404, detail="Parameter not found or could not be deleted")


@app.get("/propensity-parameters/{bid}/total-score")
def propensity_total_score(bid: str):
    """Return the handler's total possible propensity score for ``bid`` as ``total_score``."""
    total = propensity_params_handler.calculate_total_possible_score(bid)
    return {"total_score": total}


@app.post("/propensity-parameters/{bid}/upload")
async def propensity_parameters_upload(bid: str, file: UploadFile = File(...)):
    """Import propensity parameters for ``bid`` from an uploaded file.

    The whole upload is read into memory and passed to the handler together
    with its filename ("upload.csv" when the client sent none). A
    ``ValueError`` from the handler is reported to the client as a 400.
    """
    raw_bytes = await file.read()
    filename = file.filename or "upload.csv"
    try:
        summary = propensity_params_handler.import_parameters_file(bid, filename, raw_bytes)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return summary


@app.get("/objection-classifications/{bid}")
def objection_classifications(bid: str, business_type: Optional[str] = Query(default=None), is_active: Optional[str] = Query(default=None)):
    """List objection classifications of ``bid`` with optional filters.

    ``is_active`` is a tri-state: omitted means no filter (``None``); the
    string "true" in any letter case means ``True``; any other value means
    ``False``.
    """
    if is_active is None:
        active_filter = None
    else:
        active_filter = str(is_active).lower() == "true"
    return objection_handler.get_all_classifications(bid, business_type, active_filter)


# The ``:int`` convertor restricts this route to numeric ids. Routes are
# matched in registration order, so an unconstrained ``{classification_id}``
# segment here would also capture GET requests for static sub-paths of
# ``/objection-classifications/{bid}/`` that are registered later in the
# module (e.g. ``search``, ``statistics``) and reject them with a 422.
@app.get("/objection-classifications/{bid}/{classification_id:int}")
def objection_get_by_id(bid: str, classification_id: int):
    """Return a single objection classification of ``bid`` by numeric id.

    Raises:
        HTTPException: 404 when the handler finds no such classification.
    """
    classification = objection_handler.get_classification_by_id(bid, classification_id)
    if not classification:
        raise HTTPException(status_code=404, detail="Classification not found")
    return classification


@app.post("/objection-classifications/{bid}")
def objection_create(bid: str, payload: Dict[str, Any]):
    """Create an objection classification for ``bid`` and answer 201 with its id.

    ``category_name`` is mandatory (400 otherwise). The author is taken from
    ``created_by`` in the body and defaults to "admin".
    """
    if not (payload and payload.get("category_name")):
        raise HTTPException(status_code=400, detail="category_name is required")

    author = payload.get("created_by", "admin")
    new_id = objection_handler.create_classification(bid, payload, author)
    body = {"id": new_id, "message": "Classification created successfully"}
    return JSONResponse(status_code=201, content=body)


@app.put("/objection-classifications/{bid}/{classification_id}")
def objection_update(bid: str, classification_id: int, payload: Dict[str, Any]):
    """Update an objection classification of ``bid``.

    The editor is taken from ``updated_by`` in the body and defaults to
    "admin". Answers 404 when the handler reports that nothing was updated.
    """
    changes = payload or {}
    editor = changes.get("updated_by", "admin")
    if objection_handler.update_classification(bid, classification_id, changes, editor):
        return {"message": "Classification updated successfully"}
    raise HTTPException(status_code=404, detail="Classification not found or no changes made")


@app.delete("/objection-classifications/{bid}/{classification_id}")
def objection_delete(bid: str, classification_id: int):
    """Delete an objection classification of ``bid``; 404 when the handler reports failure."""
    if objection_handler.delete_classification(bid, classification_id):
        return {"message": "Classification deleted successfully"}
    raise HTTPException(status_code=404, detail="Classification not found")


@app.post("/objection-classifications/{bid}/{classification_id}/toggle")
def objection_toggle(bid: str, classification_id: int):
    """Toggle the active status of a classification; 404 when the handler reports failure."""
    if objection_handler.toggle_active_status(bid, classification_id):
        return {"message": "Classification status toggled successfully"}
    raise HTTPException(status_code=404, detail="Classification not found")


@app.get("/objection-classifications/{bid}/search")
def objection_search(bid: str, q: str = Query(...)):
    """Search the objection classifications of ``bid`` with the required query ``q``."""
    matches = objection_handler.search_classifications(bid, q)
    return matches


@app.get("/objection-classifications/{bid}/by-severity/{severity}")
def objection_by_severity(bid: str, severity: str):
    """Return the classifications of ``bid`` that have the given severity.

    Only the exact lowercase values "low", "medium", "high" and "critical"
    are accepted; anything else is rejected with a 400.
    """
    allowed_levels = ("low", "medium", "high", "critical")
    if severity in allowed_levels:
        return objection_handler.get_classifications_by_severity(bid, severity)
    raise HTTPException(status_code=400, detail="Invalid severity level")


@app.post("/objection-classifications/{bid}/classify")
def objection_classify(bid: str, payload: Dict[str, Any]):
    """Classify ``payload["objection_text"]`` using the handler for ``bid``.

    A missing or empty ``objection_text`` is rejected with a 400.
    """
    body = payload or {}
    objection_text = body.get("objection_text")
    if objection_text:
        return objection_handler.classify_objection(bid, objection_text)
    raise HTTPException(status_code=400, detail="objection_text is required")


@app.get("/objection-classifications/{bid}/statistics")
def objection_statistics(bid: str):
    """Return the objection statistics the handler reports for ``bid``."""
    stats = objection_handler.get_statistics(bid)
    return stats


@app.get("/crm/{bid}/leadsquared/presales-mapping")
def crm_presales_mapping(
    bid: str,
    groupname: Optional[str] = Query(default=None),
    lsq_row_count: int = Query(default=250),
    force_refresh: bool = Query(default=False),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return the presales mapping built from LeadSquared for ``bid``.

    The mapping dict is returned as the response body; the HTTP status is 200
    when its ``success`` entry is truthy and 400 otherwise.
    """
    _auth_user_for_bid(bid, user)
    mapping = _get_presales_mapping_from_leadsquared(
        bid,
        groupname=groupname,
        row_count=lsq_row_count,
        force_refresh=force_refresh,
    )
    if mapping.get("success"):
        http_status = 200
    else:
        http_status = 400
    return JSONResponse(status_code=http_status, content=mapping)


@app.get("/crm/{bid}/leadsquared/integration")
def crm_lsq_get_integration(bid: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Return the stored LeadSquared integration record of ``bid``.

    Answers 404 with a ``success: False`` body when no record exists.
    """
    _auth_user_for_bid(bid, user)
    integration = db_handler.get_crm_integration(bid, "leadsquared")
    if integration:
        return {"success": True, "data": integration}
    not_found_body = {"success": False, "message": "LeadSquared integration not found", "data": None}
    return JSONResponse(status_code=404, content=not_found_body)


@app.post("/crm/{bid}/leadsquared/integration")
def crm_lsq_save_integration(bid: str, payload: Dict[str, Any], user: Dict[str, Any] = Depends(_auth_user)):
    """Create or update the LeadSquared integration of ``bid``.

    Blank, null or omitted ``lsq_access_key`` / ``lsq_secret_key`` values fall
    back to the credentials already stored for the business, so a client can
    update other settings without resending secrets. A blank ``lsq_api_host``
    falls back to the stored host, then to the built-in default endpoint.

    Raises:
        HTTPException: 400 when no access/secret key is supplied or stored.
    """
    _auth_user_for_bid(bid, user)
    _require_business_admin_or_master(bid, user)

    # ``or ""`` matters: an explicit JSON null would otherwise be stringified
    # to the literal "None" and persisted as if it were a real value.
    access_key = str(payload.get("lsq_access_key") or "").strip()
    secret_key = str(payload.get("lsq_secret_key") or "").strip()
    api_host = str(payload.get("lsq_api_host") or "").strip()
    is_active = bool(payload.get("is_active", True))

    existing_integration = db_handler.get_crm_integration(bid, "leadsquared")
    # The credentials lookup may yield nothing even when an integration row
    # exists; normalise to a dict so the fallbacks below cannot fail on None.
    existing_creds = (
        db_handler.get_crm_credentials(bid, "leadsquared") if existing_integration else None
    ) or {}
    if not access_key:
        access_key = existing_creds.get("access_key", "")
    if not secret_key:
        secret_key = existing_creds.get("secret_key", "")

    if not access_key or not secret_key:
        raise HTTPException(status_code=400, detail="lsq_access_key and lsq_secret_key are required")

    if not api_host:
        api_host = (existing_integration or {}).get("api_host") or "https://api-in21.leadsquared.com/v2/"

    db_handler.upsert_crm_integration(
        bid=bid,
        provider="leadsquared",
        access_key=access_key,
        secret_key=secret_key,
        api_host=api_host,
        is_active=is_active,
        config=payload.get("config") or {},
    )
    return {"success": True, "message": "LeadSquared integration saved successfully"}


@app.post("/crm/{bid}/leadsquared/integration/push-activity")
def crm_lsq_push_activity(
    bid: str,
    payload: Dict[str, Any],
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Push one activity payload to LeadSquared and record the attempt.

    Rejected with 403 when CRM writes are disabled and with 400 when the body
    is empty. Writing the push log is best-effort: a failure there is logged
    and does not change the response. The HTTP status is 200 when the service
    result's ``success`` entry is truthy and 400 otherwise.
    """
    _auth_user_for_bid(bid, user)
    if not _crm_write_allowed():
        raise HTTPException(status_code=403, detail="CRM write operations are disabled")
    if not payload:
        raise HTTPException(status_code=400, detail="Activity payload is required")

    service = _get_lsq_service_for_bid_or_error(bid)
    result = service.create_activity(payload)

    if result.get("success"):
        log_status = "pushed"
        log_message = "Manual LeadSquared activity push"
    else:
        log_status = "failed"
        log_message = result.get("message")

    try:
        db_handler.log_crm_push(
            bid=bid,
            provider="leadsquared",
            status=log_status,
            crm_lead_id=payload.get("RelatedProspectId"),
            activity_event=payload.get("ActivityEvent"),
            message=log_message,
            payload_preview=payload,
            response_preview=result.get("data"),
        )
    except Exception:
        logger.exception("Failed to write manual LeadSquared push log for bid=%s", bid)

    http_status = 200 if result.get("success") else 400
    return JSONResponse(status_code=http_status, content=result)


@app.get("/crm/{bid}/leadsquared/push-logs")
def crm_lsq_push_logs(
    bid: str,
    limit: int = Query(default=100, ge=1, le=500),
    offset: int = Query(default=0, ge=0),
    status: Optional[str] = Query(default=None),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return a page of LeadSquared push logs for ``bid``.

    The integration counts as active only when a record exists and its
    ``is_active``, ``has_credentials`` and ``config["enabled"]`` entries are
    all truthy. Otherwise an empty page with ``enabled: False`` is returned
    without querying the logs.
    """
    _auth_user_for_bid(bid, user)
    integration = db_handler.get_crm_integration(bid, "leadsquared")
    settings = (integration or {}).get("config") or {}
    is_live = bool(integration) and all(
        (
            integration.get("is_active"),
            integration.get("has_credentials"),
            settings.get("enabled"),
        )
    )

    if is_live:
        page = db_handler.get_crm_push_logs(bid, provider="leadsquared", limit=limit, offset=offset, status=status)
        return {"success": True, "enabled": True, **page}

    return {
        "success": True,
        "enabled": False,
        "message": "LeadSquared integration is not active for this business",
        "logs": [],
        "total": 0,
        "limit": limit,
        "offset": offset,
    }


@app.post("/crm/{bid}/leadsquared/integration/test")
def crm_lsq_test_integration(bid: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Run a LeadSquared connection test with the stored credentials of ``bid``.

    Answers 404 when credentials are missing or incomplete, 400 with the
    service result when the test fails, and marks the integration as tested
    before returning the result when it succeeds.
    """
    _auth_user_for_bid(bid, user)
    creds = db_handler.get_crm_credentials(bid, "leadsquared")
    if not (creds and creds.get("access_key") and creds.get("secret_key")):
        not_configured = {"success": False, "message": "LeadSquared integration not configured"}
        return JSONResponse(status_code=404, content=not_configured)

    service = LeadSquaredService(
        access_key=creds["access_key"],
        secret_key=creds["secret_key"],
        api_host=creds.get("api_host"),
    )
    result = service.test_connection()
    if not result.get("success"):
        return JSONResponse(status_code=400, content=result)

    db_handler.mark_crm_integration_tested(bid, "leadsquared")
    return result


@app.delete("/crm/{bid}/leadsquared/integration")
def crm_lsq_delete_integration(bid: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Delete the LeadSquared integration of ``bid``.

    Always answers ``success: True``; the message tells whether the handler
    reported that a record was actually removed.
    """
    _auth_user_for_bid(bid, user)
    _require_business_admin_or_master(bid, user)

    if db_handler.delete_crm_integration(bid, "leadsquared"):
        message = "LeadSquared integration deleted successfully"
    else:
        message = "LeadSquared integration did not exist"
    return {"success": True, "message": message}


@app.post("/crm/{bid}/leadsquared/leads/search")
def crm_lsq_search_leads(bid: str, payload: Dict[str, Any], user: Dict[str, Any] = Depends(_auth_user)):
    """Forward a lead search to the LeadSquared service configured for ``bid``.

    The service result is returned as-is; an empty/falsy payload is sent as
    an empty dict.
    """
    _auth_user_for_bid(bid, user)
    lsq = _get_lsq_service_for_bid_or_error(bid)
    criteria = payload or {}
    return lsq.search_leads(criteria)


@app.get("/crm/{bid}/leadsquared/leads/{lead_id}")
def crm_lsq_get_lead(bid: str, lead_id: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Fetch one lead from LeadSquared for ``bid``.

    The HTTP status is 200 when the service result's ``success`` entry is
    truthy and 400 otherwise.
    """
    _auth_user_for_bid(bid, user)
    lsq = _get_lsq_service_for_bid_or_error(bid)
    outcome = lsq.get_lead(lead_id)
    http_status = 200 if outcome.get("success") else 400
    return JSONResponse(status_code=http_status, content=outcome)


@app.post("/crm/{bid}/leadsquared/leads")
def crm_lsq_create_lead(bid: str, payload: Dict[str, Any], user: Dict[str, Any] = Depends(_auth_user)):
    """Create a lead in LeadSquared for ``bid``.

    Rejected with 403 when CRM writes are disabled. The HTTP status is 200
    when the service result's ``success`` entry is truthy and 400 otherwise.
    """
    _auth_user_for_bid(bid, user)
    writes_enabled = _crm_write_allowed()
    if not writes_enabled:
        raise HTTPException(status_code=403, detail="CRM write operations are disabled")

    lsq = _get_lsq_service_for_bid_or_error(bid)
    outcome = lsq.create_lead(payload or {})
    http_status = 200 if outcome.get("success") else 400
    return JSONResponse(status_code=http_status, content=outcome)


@app.put("/crm/{bid}/leadsquared/leads/{lead_id}")
def crm_lsq_update_lead(bid: str, lead_id: str, payload: Dict[str, Any], user: Dict[str, Any] = Depends(_auth_user)):
    """Update a lead in LeadSquared for ``bid``.

    Rejected with 403 when CRM writes are disabled. The HTTP status is 200
    when the service result's ``success`` entry is truthy and 400 otherwise.
    """
    _auth_user_for_bid(bid, user)
    writes_enabled = _crm_write_allowed()
    if not writes_enabled:
        raise HTTPException(status_code=403, detail="CRM write operations are disabled")

    lsq = _get_lsq_service_for_bid_or_error(bid)
    outcome = lsq.update_lead(lead_id, payload or {})
    http_status = 200 if outcome.get("success") else 400
    return JSONResponse(status_code=http_status, content=outcome)


@app.delete("/crm/{bid}/leadsquared/leads/{lead_id}")
def crm_lsq_delete_lead(bid: str, lead_id: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Delete a lead in LeadSquared for ``bid``.

    Rejected with 403 when CRM writes are disabled. The HTTP status is 200
    when the service result's ``success`` entry is truthy and 400 otherwise.
    """
    _auth_user_for_bid(bid, user)
    writes_enabled = _crm_write_allowed()
    if not writes_enabled:
        raise HTTPException(status_code=403, detail="CRM write operations are disabled")

    lsq = _get_lsq_service_for_bid_or_error(bid)
    outcome = lsq.delete_lead(lead_id)
    http_status = 200 if outcome.get("success") else 400
    return JSONResponse(status_code=http_status, content=outcome)


@app.get("/audio/proxy/{bid}/{callid}")
def audio_proxy(bid: str, callid: str):
    """Stream a call recording from its stored ``fileUrl`` through this API.

    Raises:
        HTTPException: 404 when the call has no ``fileUrl``; 502 when the
            upstream request fails or answers with a non-200 status.
    """
    call = db_handler.get_call_by_id(bid, callid)
    file_url = (call or {}).get("fileUrl")
    if not file_url:
        raise HTTPException(status_code=404, detail="Audio file not found")

    try:
        response = requests.get(file_url, stream=True, timeout=60)
    except requests.RequestException as exc:
        # Connection/timeout errors are upstream failures too, not server bugs.
        logger.warning("Audio fetch failed for %s/%s: %s", bid, callid, exc)
        raise HTTPException(status_code=502, detail="Failed to fetch audio file") from exc

    if response.status_code != 200:
        # With stream=True the connection stays open until it is released.
        response.close()
        raise HTTPException(status_code=502, detail="Failed to fetch audio file")

    def _iter():
        # The finally clause releases the upstream connection both when the
        # body is exhausted and when the generator is closed early.
        try:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    yield chunk
        finally:
            response.close()

    return StreamingResponse(
        _iter(),
        media_type=response.headers.get("Content-Type", "audio/mpeg"),
        headers={"Accept-Ranges": "bytes", "Cache-Control": "public, max-age=3600"},
    )


@app.get("/admin/users")
def admin_users(user: Dict[str, Any] = Depends(_auth_user)):
    """Return all users reported by the auth handler; master admins only.

    The HTTP status is whatever the auth handler returns alongside the list.
    """
    is_master = user.get("is_master")
    if not is_master:
        raise HTTPException(status_code=403, detail="Master admin access required")

    all_users, http_status = auth_handler.get_all_users()
    body = jsonable_encoder({"users": all_users})
    return JSONResponse(status_code=http_status, content=body)


@app.post("/admin/users/create")
async def admin_users_create(payload: Dict[str, Any], request: Request, user: Dict[str, Any] = Depends(_auth_user)):
    """Create a user and optionally grant it access to businesses; master admins only.

    ``username``, ``email`` and ``password`` are mandatory (400 names the
    first one missing). Each entry of ``businesses`` is either a bare bid or
    a dict with ``bid`` and an optional ``role``; entries without their own
    role use the payload's ``role`` (default "user"). Access grants and the
    activity log entry happen only when the auth handler answers 201.
    """
    if not user.get("is_master"):
        raise HTTPException(status_code=403, detail="Master admin access required")

    first_missing = next(
        (field for field in ("username", "email", "password") if not payload.get(field)),
        None,
    )
    if first_missing is not None:
        raise HTTPException(status_code=400, detail=f"{first_missing} is required")

    result, status_code = auth_handler.create_user(
        username=payload["username"],
        email=payload["email"],
        password=payload["password"],
        full_name=payload.get("full_name"),
        role=payload.get("role", "user"),
        is_master=payload.get("is_master", False),
    )
    if status_code != 201:
        return JSONResponse(status_code=status_code, content=result)

    created_id = result["id"]
    default_role = payload.get("role", "user")
    for entry in payload.get("businesses") or []:
        if isinstance(entry, dict):
            entry_bid = entry.get("bid")
            entry_role = entry.get("role", default_role)
        else:
            entry_bid, entry_role = entry, default_role
        auth_handler.assign_business_access(created_id, entry_bid, entry_role)

    client = request.client
    auth_handler.log_activity(
        user_id=user.get("id"),
        username=user.get("username"),
        activity_type="create_user",
        description=f"Created new user: {payload['username']}",
        ip_address=client.host if client else None,
        user_agent=request.headers.get("User-Agent"),
    )
    return JSONResponse(status_code=status_code, content=result)


@app.get("/admin/businesses")
def admin_businesses():
    """Return all businesses reported by the auth handler.

    The HTTP status is whatever the auth handler returns alongside the list.
    """
    # NOTE(review): this handler declares no auth dependency - confirm that
    # exposing the business list without authentication is intentional.
    all_businesses, http_status = auth_handler.get_all_businesses()
    body = {"businesses": all_businesses}
    return JSONResponse(status_code=http_status, content=body)


@app.post("/admin/businesses/create")
async def admin_businesses_create(request: Request, user: Dict[str, Any] = Depends(_auth_user)):
    """Create a business record from a JSON body; master admins only.

    The body must be a JSON object with non-empty ``bid`` and ``name``;
    ``description`` is optional. An activity log entry is written only when
    the auth handler answers 201.

    Raises:
        HTTPException: 403 for non-master callers; 400 for a body that is not
            valid JSON, is not an object, or lacks ``bid``/``name``.
    """
    if not user.get("is_master"):
        raise HTTPException(status_code=403, detail="Master admin access required")

    # request.json() raises a ValueError subclass on malformed input; report
    # that as a client error instead of letting it surface as a 500.
    try:
        data = await request.json()
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from exc
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    if not data.get("bid") or not data.get("name"):
        raise HTTPException(status_code=400, detail="bid and name are required")

    result, status_code = auth_handler.create_business(bid=data["bid"], name=data["name"], description=data.get("description"))
    if status_code == 201:
        auth_handler.log_activity(
            user_id=user.get("id"),
            username=user.get("username"),
            activity_type="create_business",
            description=f"Created new business: {data['name']} (ID: {data['bid']})",
            ip_address=request.client.host if request.client else None,
            user_agent=request.headers.get("User-Agent"),
        )
    return JSONResponse(status_code=status_code, content=result)


def _generate_business_bid() -> str:
    """Return the next numeric business id as a string.

    The result is one more than the largest all-digit ``bid`` in the
    ``businesses`` table, with a floor of 1000 (also used when no numeric
    bid exists).
    """
    with _db_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT MAX(CAST(bid AS UNSIGNED)) AS max_bid
            FROM businesses
            WHERE bid REGEXP '^[0-9]+$'
            """
        )
        row = cursor.fetchone() or {}

    highest_existing = int(row.get("max_bid") or 0)
    candidate = highest_existing + 1
    if candidate < 1000:
        candidate = 1000
    return str(candidate)


def _generate_onboarding_password(length: int = 14) -> str:
    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))


def _delete_from_table_if_column_exists(cursor, table_name: str, column_name: str, value: str) -> None:
    cursor.execute(
        """
        SELECT 1
        FROM information_schema.columns
        WHERE table_schema = DATABASE()
          AND table_name = %s
          AND column_name = %s
        LIMIT 1
        """,
        (table_name, column_name),
    )
    if not cursor.fetchone():
        return
    cursor.execute(f"DELETE FROM `{table_name}` WHERE `{column_name}` = %s", (value,))


def _validate_business_password(bid: str, password: str) -> bool:
    """Return True when ``password`` matches any active user linked to ``bid``.

    Candidates are the active ``business_users`` rows joined through
    ``user_business_access``, non-master users first. A row matches when its
    ``plain_password`` is set and equals ``password``, or when
    ``auth_handler.verify_password`` accepts ``password`` for its hash.
    """
    with _db_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT bu.password_hash, bu.plain_password
            FROM business_users bu
            JOIN user_business_access uba ON uba.user_id = bu.id
            WHERE uba.bid = %s
              AND bu.is_active = 1
            ORDER BY bu.is_master ASC, bu.id ASC
            """,
            (bid,),
        )
        candidates = cursor.fetchall() or []

    def _row_matches(row) -> bool:
        stored_plain = row.get("plain_password")
        if stored_plain and password == stored_plain:
            return True
        return bool(auth_handler.verify_password(password, row.get("password_hash")))

    return any(_row_matches(row) for row in candidates)


@app.post("/admin/onboard-business")
async def admin_onboard_business(request: Request, user: Dict[str, Any] = Depends(_auth_user)):
    """Create a business together with its first admin login; master admins only.

    The JSON body must carry ``name``; ``bid``, ``description`` and ``email``
    are optional. A missing ``bid`` is generated, and the admin username,
    fallback e-mail and password are derived/generated here. The generated
    credentials are returned in the response.

    Raises:
        HTTPException: 400 for a body that is not a JSON object, a missing
            name, or an invalid bid; otherwise the status reported by the
            auth handler when creating the business or the user fails.
    """
    _require_master(user)

    # request.json() raises a ValueError subclass on malformed input; report
    # that as a client error instead of letting it surface as a 500.
    try:
        data = await request.json()
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from exc
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    name = str(data.get("name") or "").strip()
    if not name:
        raise HTTPException(status_code=400, detail="Business name is required")

    bid = str(data.get("bid") or "").strip() or _generate_business_bid()
    if not re.fullmatch(r"[A-Za-z0-9_-]{2,20}", bid):
        raise HTTPException(status_code=400, detail="BID must be 2-20 letters, numbers, underscores, or hyphens")

    description = str(data.get("description") or "").strip() or None
    created, status_code = auth_handler.create_business(bid=bid, name=name, description=description)
    if status_code != 201:
        # Fall back to a fixed message when the handler gives no "error" text.
        detail = (created.get("error") if isinstance(created, dict) else None) or "Failed to create business"
        raise HTTPException(status_code=status_code, detail=detail)

    username_base = re.sub(r"[^a-zA-Z0-9]+", "_", name).strip("_").lower() or f"business_{bid}"
    username = f"{username_base}_{bid}_admin"[:100]
    email = str(data.get("email") or f"{username}@mcube.local").strip().lower()
    password = _generate_onboarding_password()

    new_user, user_status = auth_handler.create_user(
        username=username,
        email=email,
        password=password,
        full_name=f"{name} Admin",
        role="admin",
        is_master=False,
    )
    if user_status != 201:
        # NOTE(review): the business created above is not rolled back here.
        detail = (
            new_user.get("error") if isinstance(new_user, dict) else None
        ) or "Business created, but admin user creation failed"
        raise HTTPException(status_code=user_status, detail=detail)

    auth_handler.assign_business_access(new_user["id"], bid, "admin")
    auth_handler.log_activity(
        user_id=user.get("id"),
        username=user.get("username"),
        activity_type="create_business",
        description=f"Onboarded new business: {name} (ID: {bid})",
        ip_address=request.client.host if request.client else None,
        user_agent=request.headers.get("User-Agent"),
    )

    return {
        "bid": bid,
        "name": name,
        "description": description,
        "business_user_id": new_user["id"],
        "credentials": {
            "username": username,
            "password": password,
            "email": email,
        },
        "tables_created": [
            f"{bid}_raw_calls",
            f"{bid}_sarvamresponse",
            f"{bid}_callanalytics",
        ],
    }


@app.put("/admin/businesses/{bid}/credentials")
async def admin_update_business_credentials(
    bid: str,
    request: Request,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Update the login credentials of a business user; master admins only.

    The JSON body may carry ``username``, ``email``, ``password`` and an
    optional ``business_user_id`` (coerced to int). The values are passed to
    the auth handler, whose result is returned on success; an activity log
    entry is written afterwards.

    Raises:
        HTTPException: 400 for an invalid bid, a body that is not a JSON
            object, or a non-integer ``business_user_id``; otherwise the
            status reported by the auth handler when the update fails.
    """
    _require_master(user)
    bid = str(bid).strip()
    if not re.fullmatch(r"[A-Za-z0-9_-]{2,20}", bid):
        raise HTTPException(status_code=400, detail="Invalid business ID")

    # request.json() raises a ValueError subclass on malformed input; report
    # that as a client error instead of letting it surface as a 500.
    try:
        data = await request.json()
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from exc
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    username = str(data.get("username") or "").strip()
    email = str(data.get("email") or "").strip()
    password = str(data.get("password") or "")
    business_user_id = data.get("business_user_id")
    if business_user_id is not None:
        try:
            business_user_id = int(business_user_id)
        except (TypeError, ValueError) as exc:
            raise HTTPException(status_code=400, detail="Invalid business_user_id") from exc

    result, status_code = auth_handler.update_business_user_credentials(
        bid=bid,
        username=username,
        email=email,
        password=password,
        user_id=business_user_id,
    )
    if status_code != 200:
        # Fall back to a fixed message when the handler gives no "error" text.
        detail = (result.get("error") if isinstance(result, dict) else None) or "Failed to update credentials"
        raise HTTPException(status_code=status_code, detail=detail)

    auth_handler.log_activity(
        user_id=user.get("id"),
        username=user.get("username"),
        activity_type="update_business_credentials",
        description=f"Updated login credentials for business {bid}",
        ip_address=request.client.host if request.client else None,
        user_agent=request.headers.get("User-Agent"),
    )
    return result


@app.post("/business/{bid}/validate-password")
async def validate_business_delete_password(
    bid: str,
    payload: Dict[str, Any],
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Check a password against the users of ``bid`` (gated by ``_require_master``).

    Answers 400 when no password is supplied and 401 when
    ``_validate_business_password`` rejects it.
    """
    _require_master(user)
    body = payload or {}
    password = str(body.get("password") or "")
    if password == "":
        raise HTTPException(status_code=400, detail="Password is required")

    if _validate_business_password(str(bid), password):
        return {"success": True}
    raise HTTPException(status_code=401, detail="Incorrect password. Please try again.")


def _business_user_password_policy_error(password: str) -> Optional[str]:
    if not password:
        return "New password is required"
    if len(password) < 5:
        return "New password must be at least 5 characters long"
    if not re.search(r"[A-Z]", password):
        return "New password must include an uppercase letter"
    if not re.search(r"[a-z]", password):
        return "New password must include a lowercase letter"
    if not re.search(r"\d", password):
        return "New password must include a number"
    if not re.search(r"[^A-Za-z0-9]", password):
        return "New password must include a special character"
    return None


@app.post("/pca/businesses/{bid}/users/{user_id}/change-password")
def pca_change_business_user_password(
    bid: str,
    user_id: int,
    payload: Dict[str, Any],
    request: Request,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Change the password of a login user linked to ``bid`` (gated by ``_require_master``).

    The body must carry ``previous_password``, ``new_password`` and a matching
    ``confirm_password``; the new password must satisfy
    ``_business_user_password_policy_error``. The target must be an active
    user linked to the business (404 otherwise). The auth handler's result and
    status are returned as-is; an activity log entry is written only when the
    status is 200.
    """
    _require_master(user)
    bid = str(bid).strip()
    if re.fullmatch(r"[A-Za-z0-9_-]{2,20}", bid) is None:
        raise HTTPException(status_code=400, detail="Invalid business ID")

    form = payload or {}
    previous_password = str(form.get("previous_password") or "")
    new_password = str(form.get("new_password") or "")
    confirm_password = str(form.get("confirm_password") or "")

    # Validation order determines which message the client sees first.
    if previous_password == "":
        raise HTTPException(status_code=400, detail="Previous password is required")
    policy_error = _business_user_password_policy_error(new_password)
    if policy_error:
        raise HTTPException(status_code=400, detail=policy_error)
    if confirm_password == "":
        raise HTTPException(status_code=400, detail="Re-enter new password is required")
    if confirm_password != new_password:
        raise HTTPException(status_code=400, detail="New passwords do not match")

    with _db_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """SELECT bu.username, bu.full_name
            FROM user_business_access uba
            JOIN business_users bu ON bu.id = uba.user_id
            WHERE uba.bid = %s AND uba.user_id = %s AND bu.is_active = TRUE
            LIMIT 1""",
            (bid, user_id),
        )
        target_user = cursor.fetchone()
    if not target_user:
        raise HTTPException(status_code=404, detail="Login user is not linked to this business")

    result, status_code = auth_handler.change_user_password(user_id, previous_password, new_password)
    if status_code == 200:
        client = request.client
        auth_handler.log_activity(
            user_id=user.get("id"),
            username=user.get("username"),
            activity_type="change_password",
            description=f"Changed password for business {bid} user: {target_user.get('username') or user_id}",
            ip_address=client.host if client else None,
            user_agent=request.headers.get("User-Agent"),
        )
    return JSONResponse(status_code=status_code, content=result)


@app.delete("/business/{bid}")
def delete_business(bid: str, request: Request, user: Dict[str, Any] = Depends(_auth_user)):
    """Delete a business, its configuration rows, its exclusive users and its per-business tables.

    Access is gated by ``_require_master``. The ``bid`` is validated against a
    strict pattern because it is later interpolated into table names.

    Steps, all on one connection:
      1. Look up the business (404 if missing).
      2. Collect non-master users whose only business access is this ``bid``.
      3. Remove rows keyed by ``bid`` from the related configuration tables.
      4. Remove the access rows, then the collected users, then the business row.
      5. Drop every existing ``{bid}_{suffix}`` table.
      6. Commit, then write an activity-log entry.

    Returns:
        Dict with ``success``, ``message``, ``dropped_tables`` and ``deleted_users``.

    Raises:
        HTTPException: 400 for an invalid ``bid``, 404 when the business does
            not exist, 500 for any other failure (after a rollback).
    """
    _require_master(user)
    bid = str(bid).strip()
    # Only letters, digits, "_" and "-" (2-20 chars): bid becomes part of SQL identifiers below.
    if not re.fullmatch(r"[A-Za-z0-9_-]{2,20}", bid):
        raise HTTPException(status_code=400, detail="Invalid business ID")

    # Suffixes of the per-business tables, named "{bid}_{suffix}".
    suffixes = [
        "raw_calls",
        "sarvamresponse",
        "callanalytics",
        "leads",
        "lead_notes",
        "lead_tasks",
        "lead_insights",
    ]
    # Shared tables from which rows for this bid are removed.
    related_tables = [
        "business_crm_integrations",
        "business_telephony_integrations",
        "business_pipeline_config",
        "pca_business_allocations",
        "business_agent_config",
        "embed_api_keys",
    ]

    with _db_conn() as conn:
        cursor = conn.cursor()
        try:
            cursor.execute("SELECT name FROM businesses WHERE bid = %s", (bid,))
            business = cursor.fetchone()
            if not business:
                raise HTTPException(status_code=404, detail="Business not found")

            # Non-master users linked to this bid and to no other bid. This must be
            # computed before the access rows are deleted further down.
            cursor.execute(
                """
                SELECT bu.id
                FROM business_users bu
                JOIN user_business_access uba ON uba.user_id = bu.id
                WHERE uba.bid = %s
                  AND bu.is_master = 0
                  AND NOT EXISTS (
                    SELECT 1
                    FROM user_business_access other_uba
                    WHERE other_uba.user_id = bu.id
                      AND other_uba.bid <> %s
                  )
                """,
                (bid, bid),
            )
            business_only_user_ids = [row["id"] for row in (cursor.fetchall() or [])]

            for table_name in related_tables:
                _delete_from_table_if_column_exists(cursor, table_name, "bid", bid)

            cursor.execute("DELETE FROM user_business_access WHERE bid = %s", (bid,))

            deleted_users = 0
            if business_only_user_ids:
                placeholders = ",".join(["%s"] * len(business_only_user_ids))
                # The is_master = 0 guard is repeated so a master account is never removed here.
                cursor.execute(
                    f"DELETE FROM business_users WHERE is_master = 0 AND id IN ({placeholders})",
                    business_only_user_ids,
                )
                deleted_users = cursor.rowcount

            cursor.execute("DELETE FROM businesses WHERE bid = %s", (bid,))

            # NOTE(review): on MySQL, DROP TABLE is documented to cause an implicit
            # commit, so once the first DROP runs the rollback() calls below would no
            # longer undo the DELETEs above - confirm against the target server.
            dropped_tables = []
            for suffix in suffixes:
                table_name = f"{bid}_{suffix}"
                if _table_exists(cursor, table_name):
                    cursor.execute(f"DROP TABLE `{table_name}`")
                    dropped_tables.append(table_name)

            conn.commit()
        except HTTPException:
            conn.rollback()
            raise
        except Exception as exc:
            conn.rollback()
            logger.error("Error deleting business %s: %s", bid, exc)
            # NOTE(review): the raw exception text is returned to the client in `detail`.
            raise HTTPException(status_code=500, detail=f"Failed to delete business: {exc}")

    # Reached only after a successful commit; both except branches above re-raise.
    auth_handler.log_activity(
        user_id=user.get("id"),
        username=user.get("username"),
        activity_type="delete",
        description=f"Deleted business: {business.get('name')} (ID: {bid})",
        ip_address=request.client.host if request.client else None,
        user_agent=request.headers.get("User-Agent"),
    )
    return {
        "success": True,
        "message": "Business deleted successfully",
        "dropped_tables": dropped_tables,
        "deleted_users": deleted_users,
    }


@app.post("/admin/users/{user_id}/businesses")
def admin_assign_business(user_id: int, payload: Dict[str, Any], user: Dict[str, Any] = Depends(_auth_user)):
    """Grant ``user_id`` access to the business named in ``payload["bid"]``.

    The caller must have ``is_master`` set. ``payload["role"]`` is optional and
    defaults to ``"user"``. The handler's result and status code are returned
    as a JSON response.

    Raises:
        HTTPException: 403 when the caller is not a master admin, 400 when
            ``bid`` is missing or empty.
    """
    caller_is_master = user.get("is_master")
    if not caller_is_master:
        raise HTTPException(status_code=403, detail="Master admin access required")

    target_bid = payload.get("bid")
    if not target_bid:
        raise HTTPException(status_code=400, detail="bid is required")

    requested_role = payload.get("role", "user")
    body, http_status = auth_handler.assign_business_access(
        user_id=user_id,
        bid=target_bid,
        role=requested_role,
    )
    return JSONResponse(status_code=http_status, content=body)


@app.get("/admin/activity-log")
def admin_activity_log(
    limit: int = Query(default=100),
    offset: int = Query(default=0),
    user_id: Optional[int] = Query(default=None),
    activity_type: Optional[str] = Query(default=None),
    action_code: Optional[str] = Query(default=None),
    bid: Optional[str] = Query(default=None),
    username_contains: Optional[str] = Query(default=None),
    description_contains: Optional[str] = Query(default=None),
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return activity-log entries matching the given filters (master admins only).

    Every query parameter is forwarded unchanged to
    ``auth_handler.get_activity_log``. Its result is passed through
    ``jsonable_encoder`` and returned with the status code the handler reported,
    for success and failure alike.

    Raises:
        HTTPException: 403 when the caller is not a master admin.
    """
    if not user.get("is_master"):
        raise HTTPException(status_code=403, detail="Master admin access required")
    result, status_code = auth_handler.get_activity_log(
        limit=limit,
        offset=offset,
        user_id=user_id,
        activity_type=activity_type,
        action_code=action_code,
        bid=bid,
        username_contains=username_contains,
        description_contains=description_contains,
        date_from=date_from,
        date_to=date_to,
    )
    # Success and error responses are built identically, so no branch on status_code is needed.
    return JSONResponse(status_code=status_code, content=jsonable_encoder(result))


@app.post("/api/embed/token")
def embed_token(payload: Dict[str, Any], request: Request):
    """Exchange an embed API key for an embed token and URL.

    ``payload`` must carry ``api_key`` and ``bid``. When the key record lists
    ``allowed_origins`` and the request has an ``Origin`` header, the origin
    must be one of the comma-separated entries. A request without an
    ``Origin`` header is not checked against the list.

    Raises:
        HTTPException: 400 for missing fields, 401 for an invalid key,
            403 for a disallowed origin.
    """
    body = payload or {}
    api_key = body.get("api_key")
    bid = body.get("bid")
    if not (api_key and bid):
        raise HTTPException(status_code=400, detail="api_key and bid are required")

    key_record = auth_handler.validate_api_key(api_key, str(bid))
    if not key_record:
        raise HTTPException(status_code=401, detail="Invalid API key or unauthorized for this business")

    origin = request.headers.get("Origin")
    origins_setting = key_record.get("allowed_origins")
    if origins_setting and origin:
        permitted = [entry.strip() for entry in str(origins_setting).split(",")]
        if origin not in permitted:
            raise HTTPException(status_code=403, detail="Origin not allowed")

    token = auth_handler.generate_embed_token(
        bid=str(bid),
        partner_name=key_record["partner_name"],
        api_key_id=key_record["id"],
    )
    embed_base_url = CONFIG.get("EMBED_BASE_URL", "http://localhost:6174")
    response = {
        "token": token,
        "embed_url": f"{embed_base_url}/#/embed?token={token}",
        "expires_in": 3600,
        "bid": str(bid),
        "partner_name": key_record["partner_name"],
    }
    return response


@app.get("/api/embed/validate")
def embed_validate(token: str = Query(...)):
    """Validate an embed token and report the business and partner it belongs to.

    Raises:
        HTTPException: 401 when ``auth_handler.validate_embed_token`` returns
            a falsy value.
    """
    claims = auth_handler.validate_embed_token(token)
    if claims:
        return {"valid": True, "bid": claims["bid"], "partner_name": claims["partner_name"]}
    raise HTTPException(status_code=401, detail="Invalid or expired embed token")


@app.post("/admin/embed-keys")
def admin_embed_keys_create(payload: Dict[str, Any], request: Request, user: Dict[str, Any] = Depends(_auth_user)):
    """Create an embed API key for a business (master admins only).

    ``payload`` must contain ``bid`` and ``partner_name``; ``allowed_origins``
    and ``expires_at`` are optional and forwarded as given. The handler's result
    and status code are returned as a JSON response.

    Raises:
        HTTPException: 403 when the caller is not a master admin, 400 when
            ``bid`` or ``partner_name`` is missing.
    """
    if not user.get("is_master"):
        raise HTTPException(status_code=403, detail="Master admin access required")
    bid = payload.get("bid")
    partner_name = payload.get("partner_name")
    if not bid or not partner_name:
        raise HTTPException(status_code=400, detail="bid and partner_name are required")
    result, status_code = auth_handler.create_embed_api_key(
        bid=str(bid),
        partner_name=partner_name,
        allowed_origins=payload.get("allowed_origins"),
        expires_at=payload.get("expires_at"),
    )
    # Record the audit entry only when the key was actually created; a failed
    # attempt must not appear in the log as "Created embed API key".
    if 200 <= status_code < 300:
        auth_handler.log_activity(
            user_id=user["id"],
            username=user["username"],
            activity_type="create_embed_key",
            description=f"Created embed API key for business {bid} (partner: {partner_name})",
            ip_address=request.client.host if request.client else None,
            user_agent=request.headers.get("User-Agent"),
        )
    return JSONResponse(status_code=status_code, content=result)


@app.get("/admin/embed-keys")
def admin_embed_keys_list(bid: Optional[str] = Query(default=None), user: Dict[str, Any] = Depends(_auth_user)):
    """List embed API keys, optionally restricted to one ``bid`` (master admins only).

    Raises:
        HTTPException: 403 when the caller is not a master admin.
    """
    caller_is_master = user.get("is_master")
    if not caller_is_master:
        raise HTTPException(status_code=403, detail="Master admin access required")
    listing = auth_handler.list_embed_api_keys(bid=bid)
    keys, http_status = listing
    return JSONResponse(status_code=http_status, content={"keys": keys})


@app.delete("/admin/embed-keys/{key_id}")
def admin_embed_keys_revoke(key_id: int, request: Request, user: Dict[str, Any] = Depends(_auth_user)):
    """Revoke the embed API key with the given id (master admins only).

    The handler's result and status code are returned as a JSON response.

    Raises:
        HTTPException: 403 when the caller is not a master admin.
    """
    if not user.get("is_master"):
        raise HTTPException(status_code=403, detail="Master admin access required")
    result, status_code = auth_handler.revoke_embed_api_key(key_id)
    # Record the audit entry only when the revocation succeeded; a failed
    # attempt must not appear in the log as "Revoked embed API key".
    if 200 <= status_code < 300:
        auth_handler.log_activity(
            user_id=user["id"],
            username=user["username"],
            activity_type="revoke_embed_key",
            description=f"Revoked embed API key {key_id}",
            ip_address=request.client.host if request.client else None,
            user_agent=request.headers.get("User-Agent"),
        )
    return JSONResponse(status_code=status_code, content=result)


# =============================================================================
# Telephony integrations + call sync (API expected by frontend)
# =============================================================================

def _get_active_telephony_integration_or_none(bid: str) -> Optional[Dict[str, Any]]:
    """Return the telephony integration to use for ``bid``, or ``None``.

    The first integration whose ``is_active`` equals 1 wins. If none is flagged
    active, the first listed integration is returned as a fallback. ``None`` is
    returned when there are no integrations or when listing them fails.
    """
    try:
        items = db_handler.list_telephony_integrations(bid)
    except Exception as exc:
        # Best-effort lookup: callers treat None as "not connected", but the
        # failure is logged so it is not mistaken for a missing integration.
        logger.warning("Failed to list telephony integrations for bid=%s: %s", bid, exc)
        return None
    if not items:
        return None
    for item in items:
        if int(item.get("is_active") or 0) == 1:
            return item
    return items[0]


def _source_db_config_from_telephony_integration(integration: Dict[str, Any]) -> Dict[str, Any]:
    cfg = (integration or {}).get("config") or {}
    host = cfg.get("host")
    user = cfg.get("user")
    password = cfg.get("password")
    database = cfg.get("database")
    port = int(cfg.get("port") or 3306)
    if not host or not user or not password or not database:
        raise HTTPException(status_code=400, detail="Telephony integration is missing DB connection details")
    return {
        "host": host,
        "port": port,
        "user": user,
        "password": password,
        "database": database,
        "charset": "utf8mb4",
    }


def _source_cols_map(cursor, table_name: str) -> Dict[str, str]:
    """Map each lower-cased column name of ``table_name`` to the name as reported.

    Column names come from ``_existing_table_columns``. When two names differ
    only by case, the later one wins.
    """
    by_lowercase: Dict[str, str] = {}
    for column in _existing_table_columns(cursor, table_name):
        by_lowercase[str(column).lower()] = str(column)
    return by_lowercase


def _source_sql_col(
    alias: str,
    source_cols: Dict[str, str],
    *candidates: str,
    default: str = "NULL",
) -> str:
    for name in candidates:
        key = str(name).lower()
        if key in source_cols:
            return f"{alias}.`{source_cols[key]}`"
    return default


def _customer_callinfo_expr(
    alias: str,
    source_cols: Dict[str, str],
) -> str:
    has_direction = "direction" in source_cols
    customer_candidates = []
    if "customer_callinfo" in source_cols:
        col = source_cols["customer_callinfo"]
        customer_candidates.append(f"NULLIF(TRIM(CAST({alias}.`{col}` AS CHAR)), '')")
    if has_direction and "callto" in source_cols and "callfrom" in source_cols:
        callto = source_cols["callto"]
        callfrom = source_cols["callfrom"]
        direction = source_cols["direction"]
        customer_candidates.append(
            "CASE "
            f"WHEN LOWER(TRIM(CAST({alias}.`{direction}` AS CHAR))) = 'outbound' "
            f"THEN NULLIF(TRIM(CAST({alias}.`{callto}` AS CHAR)), '') "
            f"WHEN LOWER(TRIM(CAST({alias}.`{direction}` AS CHAR))) = 'inbound' "
            f"THEN NULLIF(TRIM(CAST({alias}.`{callfrom}` AS CHAR)), '') "
            "ELSE NULL END"
        )
    if "callfrom" in source_cols:
        col = source_cols["callfrom"]
        customer_candidates.append(f"NULLIF(TRIM(CAST({alias}.`{col}` AS CHAR)), '')")
    if "callto" in source_cols:
        col = source_cols["callto"]
        customer_candidates.append(f"NULLIF(TRIM(CAST({alias}.`{col}` AS CHAR)), '')")
    if "clicktocalldid" in source_cols:
        col = source_cols["clicktocalldid"]
        customer_candidates.append(f"NULLIF(TRIM(CAST({alias}.`{col}` AS CHAR)), '')")
    if not customer_candidates:
        return "NULL"
    return f"COALESCE({', '.join(customer_candidates)})"


def _fetch_source_calls_from_config(
    source_bid: str,
    table_kind: str,
    source_config: Dict[str, Any],
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
    dialstatus_csv: Optional[str] = None,
    direction_csv: Optional[str] = None,
    limit: int = 300,
):
    """Fetch call rows from a source DB reached with explicit connection settings.

    Column names are resolved against the columns actually present in the
    source table, so tables with differing schemas can be read. Missing columns
    are selected as ``NULL``, except ``direction`` (``'inbound'``) and the agent
    name (``''``).

    Args:
        source_bid: Business id used to resolve the table name, returned as
            ``bid`` on every row, and used as a filter when the table has a
            ``bid`` column.
        table_kind: ``"history"`` selects the call-history table; any other
            value selects the archive table (see ``_resolve_source_call_table``).
        source_config: Keyword arguments for ``pymysql.connect``.
        date_from, date_to: Inclusive date range on the start-time column;
            applied only when both are given and that column exists.
        dialstatus_csv: Comma-separated dial statuses to keep.
        direction_csv: Comma-separated directions to keep; applied only when
            the table has a ``direction`` column.
        limit: Maximum number of rows returned.

    Returns:
        ``(rows, table_name)``, where rows are passed through
        ``_serialize_call_sync_row``; ``([], None)`` when no source table exists.
    """
    from mcube_group_util import resolve_mcube_group_sql

    conn = pymysql.connect(**source_config)
    try:
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        table_name = _resolve_source_call_table(cursor, source_bid, table_kind)
        if not table_name:
            return [], None

        alias = "c"
        source_cols = _source_cols_map(cursor, table_name)
        # Each *_col is either a qualified column reference or a SQL literal default.
        start_col = _source_sql_col(alias, source_cols, "starttime", "call_starttime")
        end_col = _source_sql_col(alias, source_cols, "endtime", "call_endtime")
        dialstatus_col = _source_sql_col(alias, source_cols, "dialstatus", "call_status")
        direction_col = _source_sql_col(alias, source_cols, "direction", default="'inbound'")
        filename_col = _source_sql_col(alias, source_cols, "filename", "fileurl", "file_url")
        callid_col = _source_sql_col(alias, source_cols, "callid", "call_id")
        agent_col = _source_sql_col(
            alias, source_cols, "agentname", "callername", "agent_name", default="''"
        )
        agent_sql = f"{agent_col} AS agentname"
        emp_phone_sql = (
            f"{_source_sql_col(alias, source_cols, 'emp_phone', 'callfrom', default='NULL')} AS emp_phone"
        )
        clicktocall_sql = (
            f"{_source_sql_col(alias, source_cols, 'clicktocalldid', default='NULL')} AS clicktocalldid"
        )
        countrycode_sql = (
            f"{_source_sql_col(alias, source_cols, 'countrycode', default='NULL')} AS countrycode"
        )
        # The helper supplies the JOIN clause, its bind parameters, and the SELECT
        # expression for the group name; its fourth return value is unused here.
        group_join_sql, group_join_params, groupname_sql, _ = resolve_mcube_group_sql(
            cursor, source_bid, table_name, source_cols, call_alias=alias
        )
        customer_expr = _customer_callinfo_expr(alias, source_cols)

        # "1 = 1" keeps the WHERE clause valid when no filter applies.
        where = ["1 = 1"]
        where_params: List[Any] = []

        if "bid" in source_cols:
            where.append(f"{alias}.`{source_cols['bid']}` = %s")
            where_params.append(str(source_bid))

        # The date filter is skipped when either bound is missing or there is no start-time column.
        if date_from and date_to and start_col != "NULL":
            where.append(f"DATE({start_col}) BETWEEN %s AND %s")
            where_params.extend([date_from, date_to])

        if dialstatus_csv and dialstatus_col != "NULL":
            vals = [v.strip() for v in str(dialstatus_csv).split(",") if v.strip()]
            if vals:
                where.append(
                    f"{dialstatus_col} IN (" + ", ".join(["%s"] * len(vals)) + ")"
                )
                where_params.extend(vals)

        if direction_csv and "direction" in source_cols:
            vals = [v.strip() for v in str(direction_csv).split(",") if v.strip()]
            if vals:
                where.append(
                    f"{alias}.`{source_cols['direction']}` IN ("
                    + ", ".join(["%s"] * len(vals))
                    + ")"
                )
                where_params.extend(vals)

        # Order by start time when available, otherwise by the call id column.
        # NOTE(review): the fallback only looks up the 'callid' key, not 'call_id',
        # so a table exposing only `call_id` would be ordered by a non-existent column.
        order_col = start_col if start_col != "NULL" else f"{alias}.`{source_cols.get('callid', 'callid')}`"
        query = f"""
            SELECT
                {callid_col} AS callid,
                %s AS bid,
                {agent_sql},
                {groupname_sql},
                {start_col} AS starttime,
                {end_col} AS endtime,
                {dialstatus_col} AS dialstatus,
                {direction_col} AS direction,
                {filename_col} AS filename,
                {customer_expr} AS customer_callinfo,
                {countrycode_sql},
                {emp_phone_sql},
                {clicktocall_sql}
            FROM `{table_name}` {alias}
            {group_join_sql}
            WHERE {" AND ".join(where)}
            ORDER BY {order_col} DESC
            LIMIT %s
        """
        # Bind values follow placeholder order in the text: SELECT-list bid, JOIN
        # parameters, WHERE parameters, LIMIT.
        # NOTE(review): assumes groupname_sql contains no placeholders of its own - confirm.
        params: List[Any] = [str(source_bid)] + list(group_join_params) + where_params + [int(limit)]
        cursor.execute(query, tuple(params))
        rows = cursor.fetchall() or []
        return [_serialize_call_sync_row(r) for r in rows], table_name
    finally:
        conn.close()


# Read from the CALL_SYNC_CACHE_TTL_SECONDS environment variable (default 3600).
# A non-integer value raises ValueError when this module is imported.
# Presumably the lifetime, in seconds, of cached call-sync results - confirm at the use sites.
CALL_SYNC_CACHE_TTL_SECONDS = int(os.getenv("CALL_SYNC_CACHE_TTL_SECONDS", "3600"))
# Presumably part of the cache key, so bumping it invalidates entries written
# with an older row layout - confirm at the use sites.
CALL_SYNC_CACHE_SCHEMA_VERSION = "v5"


def _resolve_source_call_table(cursor, bid: str, table_kind: str):
    candidates = [f"{bid}_callhistory", f"{bid}_call_history"] if table_kind == "history" else [f"{bid}_callarchive", f"{bid}_call_archive"]
    for candidate in candidates:
        cursor.execute("SHOW TABLES LIKE %s", (candidate,))
        if cursor.fetchone():
            return candidate
    return None


def _table_has_column(cursor, table_name: str, column_name: str) -> bool:
    cursor.execute(f"SHOW COLUMNS FROM `{table_name}` LIKE %s", (column_name,))
    return cursor.fetchone() is not None


def _serialize_call_sync_row(row: Dict[str, Any]) -> Dict[str, Any]:
    output = {}
    for key, value in row.items():
        output[key] = value.isoformat() if isinstance(value, datetime) else value
    return output


def _fetch_source_calls(bid: str, table_kind: str, date_from: Optional[str] = None, date_to: Optional[str] = None, limit: int = 500):
    """Fetch call rows for ``bid`` from the env-configured sync source database.

    The customer number is a ``COALESCE`` over whichever of
    ``customer_callinfo``, a direction-based ``callto``/``callfrom`` choice,
    ``callto`` and ``clicktocalldid`` the table provides. The date filter is
    applied only when both ``date_from`` and ``date_to`` are given. Rows are
    ordered by ``starttime`` descending and capped at ``limit``.

    Returns:
        ``(rows, table_name)`` with rows passed through
        ``_serialize_call_sync_row``; ``([], None)`` when no source table exists.
    """
    connection = pymysql.connect(**get_sync_source_db_config())
    try:
        cur = connection.cursor(pymysql.cursors.DictCursor)
        table_name = _resolve_source_call_table(cur, bid, table_kind)
        if not table_name:
            return [], None

        # Probe the optional columns once each, in a fixed order.
        probed = ("direction", "customer_callinfo", "callfrom", "callto", "clicktocalldid")
        present = {column: _table_has_column(cur, table_name, column) for column in probed}

        fallbacks = []
        if present["customer_callinfo"]:
            fallbacks.append("NULLIF(TRIM(CAST(customer_callinfo AS CHAR)), '')")
        if present["direction"] and present["callto"] and present["callfrom"]:
            fallbacks.append(
                "CASE "
                "WHEN LOWER(TRIM(CAST(direction AS CHAR))) = 'outbound' THEN NULLIF(TRIM(CAST(callto AS CHAR)), '') "
                "WHEN LOWER(TRIM(CAST(direction AS CHAR))) = 'inbound' THEN NULLIF(TRIM(CAST(callfrom AS CHAR)), '') "
                "ELSE NULL END"
            )
        if present["callto"]:
            fallbacks.append("NULLIF(TRIM(CAST(callto AS CHAR)), '')")
        if present["clicktocalldid"]:
            fallbacks.append("NULLIF(TRIM(CAST(clicktocalldid AS CHAR)), '')")

        customer_expr = "NULL"
        if fallbacks:
            customer_expr = f"COALESCE({', '.join(fallbacks)})"

        query = f"""
            SELECT
                callid, %s as bid, agentname, groupname, starttime, endtime, dialstatus, direction, filename,
                {customer_expr} as customer_callinfo, countrycode, emp_phone, clicktocalldid
            FROM `{table_name}`
            WHERE 1 = 1
        """
        bind_values: List[Any] = [str(bid)]
        if date_from and date_to:
            query += " AND DATE(starttime) BETWEEN %s AND %s"
            bind_values += [date_from, date_to]
        query += " ORDER BY starttime DESC LIMIT %s"
        bind_values.append(int(limit))

        cur.execute(query, tuple(bind_values))
        fetched = cur.fetchall() or []
        return [_serialize_call_sync_row(record) for record in fetched], table_name
    finally:
        connection.close()


def _fetch_source_count(bid: str, table_kind: str, date_from: Optional[str] = None, date_to: Optional[str] = None) -> int:
    """Count rows in the source call table for ``bid`` in the env-configured sync source DB.

    The date filter on ``starttime`` is applied only when both bounds are
    given. Returns 0 when no source table exists.
    """
    connection = pymysql.connect(**get_sync_source_db_config())
    try:
        cur = connection.cursor(pymysql.cursors.DictCursor)
        table_name = _resolve_source_call_table(cur, bid, table_kind)
        if not table_name:
            return 0

        sql_parts = [f"SELECT COUNT(*) as total FROM `{table_name}` WHERE 1 = 1"]
        bind_values: List[Any] = []
        if date_from and date_to:
            sql_parts.append(" AND DATE(starttime) BETWEEN %s AND %s")
            bind_values += [date_from, date_to]

        cur.execute("".join(sql_parts), tuple(bind_values))
        counted = cur.fetchone() or {}
        return int(counted.get("total") or 0)
    finally:
        connection.close()


@app.get("/telephony/{bid}/integrations")
def telephony_list_integrations(bid: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Return the telephony integrations stored for ``bid``.

    ``_auth_user_for_bid`` is called first to check the caller against ``bid``.
    """
    _auth_user_for_bid(bid, user)
    return {"integrations": db_handler.list_telephony_integrations(bid)}


def _normalize_telephony_source_groups(groups) -> List[Dict[str, Any]]:
    """Return [{groupname, call_count}, ...] for telephony UI consumers."""
    normalized: List[Dict[str, Any]] = []
    for item in groups or []:
        if isinstance(item, str):
            name = item.strip()
            if name:
                normalized.append({"groupname": name, "call_count": None})
            continue
        if not isinstance(item, dict):
            continue
        name = str(item.get("groupname") or item.get("name") or "").strip()
        if not name:
            continue
        count = item.get("call_count")
        if count is None:
            count = item.get("totalCalls")
        normalized.append({"groupname": name, "call_count": count})
    return normalized


def _fetch_telephony_source_groups(source_config: Dict[str, Any], src_bid: str) -> List[Dict[str, Any]]:
    """Return the groups reported by ``fetch_mcube_source_groups`` for the source history table.

    Connects with ``source_config``, resolves the ``"history"`` table for
    ``src_bid`` and delegates to the helper. Returns ``[]`` when no such table
    exists. The connection is always closed.
    """
    from mcube_group_util import fetch_mcube_source_groups

    connection = pymysql.connect(**source_config)
    try:
        cur = connection.cursor()
        history_table = _resolve_source_call_table(cur, src_bid, "history")
        if history_table:
            return fetch_mcube_source_groups(cur, src_bid, history_table)
        return []
    finally:
        connection.close()


def _compute_onboarding_ingest_watermark(ingest_lookback_days: int) -> datetime:
    """Start time for first ingest on a new business (0 = start of today)."""
    now = datetime.now().replace(microsecond=0)
    if ingest_lookback_days <= 0:
        return now.replace(hour=0, minute=0, second=0, microsecond=0)
    return now - timedelta(days=int(ingest_lookback_days))


def _should_bootstrap_ingest_watermark(bid: str) -> bool:
    """Decide whether the ingest watermark for ``bid`` should be seeded.

    Returns ``False`` when a watermark ``orchestrator_ingest_{bid}`` already
    exists or when ``{bid}_raw_calls`` contains at least one row. Any error
    while counting rows is ignored and treated as "no rows", so the function
    then returns ``True``.
    """
    if db_handler.get_sync_watermark(bid, f"orchestrator_ingest_{bid}"):
        return False

    has_rows = False
    try:
        with _db_conn() as conn:
            cur = conn.cursor()
            cur.execute(f"SELECT COUNT(*) AS n FROM `{bid}_raw_calls`")
            counted = cur.fetchone() or {}
            has_rows = int(counted.get("n") or 0) > 0
    except Exception:
        # Best effort: a failed count is handled like an empty table.
        has_rows = False
    return not has_rows


@app.get("/telephony/{bid}/source-groups")
def telephony_source_groups(
    bid: str,
    provider: Optional[str] = Query(default=None),
    source_bid: Optional[str] = Query(default=None),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """List group names for ``bid``, preferring the telephony source database.

    Connection settings come from the active integration when there is one,
    otherwise from the provider's env configuration. Failures to build the
    settings or to query the source are logged and ignored, except that an
    ``HTTPException`` from the provider lookup or from the source query
    propagates. When the source yields nothing, the locally known group names
    from ``db_handler.get_all_groupnames`` are used instead.
    """
    _auth_user_for_bid(bid, user)
    integration = _get_active_telephony_integration_or_none(bid)
    linked = integration or {}
    provider_key = str(provider or linked.get("provider") or "").strip().lower()
    src_bid = str(source_bid or linked.get("source_bid") or bid).strip()

    source_config = None
    if integration:
        try:
            source_config = _source_db_config_from_telephony_integration(integration)
        except Exception as config_error:
            logger.warning("telephony source-groups integration config failed for bid=%s: %s", bid, config_error)
    elif provider_key:
        try:
            source_config = _provider_env_source_config(provider_key)
        except HTTPException:
            raise
        except Exception as config_error:
            logger.warning("telephony source-groups provider config failed for bid=%s: %s", bid, config_error)

    groups: List[Dict[str, Any]] = []
    if source_config:
        try:
            groups = _fetch_telephony_source_groups(source_config, src_bid)
        except HTTPException:
            raise
        except Exception as query_error:
            logger.warning("telephony source-groups query failed for bid=%s: %s", bid, query_error)

    if not groups:
        # Fall back to the group names already known locally for this business.
        try:
            local_names = db_handler.get_all_groupnames(bid) or []
            groups = _normalize_telephony_source_groups(local_names)
        except Exception:
            groups = []

    normalized = _normalize_telephony_source_groups(groups)
    return {"bid": bid, "source_bid": src_bid, "groups": normalized}


@app.post("/telephony/{bid}/integrations")
def telephony_connect_integration(bid: str, payload: TelephonyIntegrationConnectRequest, user: Dict[str, Any] = Depends(_auth_user)):
    """Save a telephony integration for ``bid`` and make its provider the active one.

    When the request carries no ``config``, the connection settings are taken
    from the env-based sync source configuration.

    Raises:
        HTTPException: 400 when ``provider`` or ``source_bid`` is blank.
    """
    _auth_user_for_bid(bid, user)
    _require_settings_manage_or_admin(bid, user)

    provider = str(payload.provider or "").strip().lower()
    source_bid = str(payload.source_bid or "").strip()
    if not (provider and source_bid):
        raise HTTPException(status_code=400, detail="provider and source_bid are required")

    cfg = dict(payload.config or {})
    if not cfg:
        # No explicit settings supplied: reuse the env-based sync source connection.
        env_cfg = get_sync_source_db_config()
        cfg = {
            field: env_cfg.get(field)
            for field in ("host", "port", "user", "password", "database")
        }

    db_handler.upsert_telephony_integration(
        bid=bid,
        provider=provider,
        source_bid=source_bid,
        config=cfg,
        is_active=True,
    )
    db_handler.set_active_telephony_provider(bid=bid, provider=provider)
    return {"success": True, "message": "Telephony integration saved successfully"}


def _copy_quality_parameters(source_bid: str, target_bid: str, custom_parameters: Optional[List[Dict[str, Any]]] = None) -> int:
    """Replace the quality parameters of ``target_bid`` and return how many were saved.

    The new set is ``custom_parameters`` when given (even if empty), otherwise
    the parameters currently stored for ``source_bid``. Existing rows for
    ``target_bid`` are deleted first. ``id``, ``created_at`` and ``updated_at``
    are stripped from each parameter before it is saved.
    """
    params = custom_parameters if custom_parameters is not None else quality_params_handler.get_parameters(source_bid)
    with _db_conn() as conn:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM quality_parameters WHERE bid = %s", (str(target_bid),))
        # Commit explicitly so the old rows are really gone before the new ones are
        # saved; other writers in this module also commit on this connection themselves.
        conn.commit()

    count = 0
    for param in params or []:
        item = dict(param)
        item.pop("id", None)
        item.pop("created_at", None)
        item.pop("updated_at", None)
        quality_params_handler.save_parameter(str(target_bid), item)
        count += 1
    return count


def _copy_default_template_parameters(target_bid: str) -> int:
    """Replace the quality parameters of ``target_bid`` with the ``"default"`` template.

    The template is read with ``seed_from_bid="8329"``. Existing rows for
    ``target_bid`` are deleted first. ``id``, ``template_key``, ``created_at``
    and ``updated_at`` are stripped from each parameter before it is saved.
    Returns the number of parameters saved.
    """
    params = quality_params_handler.get_template_parameters("default", seed_from_bid="8329")
    with _db_conn() as conn:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM quality_parameters WHERE bid = %s", (str(target_bid),))
        # Commit explicitly so the old rows are really gone before the new ones are
        # saved; other writers in this module also commit on this connection themselves.
        conn.commit()

    count = 0
    for param in params or []:
        item = dict(param)
        item.pop("id", None)
        item.pop("template_key", None)
        item.pop("created_at", None)
        item.pop("updated_at", None)
        quality_params_handler.save_parameter(str(target_bid), item)
        count += 1
    return count


def _verify_source_table_exists(source_config: Dict[str, Any], source_bid: str) -> str:
    """Return the name of the source history table for ``source_bid``.

    Connects with ``source_config`` and always closes the connection.

    Raises:
        HTTPException: 400 when no history table exists for ``source_bid``.
    """
    connection = pymysql.connect(**source_config)
    try:
        cur = connection.cursor(pymysql.cursors.DictCursor)
        resolved = _resolve_source_call_table(cur, source_bid, "history")
        if resolved:
            return resolved
        raise HTTPException(
            status_code=400,
            detail=f"Source table not found for BID {source_bid}. Expected {source_bid}_callhistory.",
        )
    finally:
        connection.close()


@app.post("/admin/businesses/{bid}/onboarding-processing")
def admin_business_onboarding_processing(
    bid: str,
    payload: OnboardingProcessingRequest,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Configure call ingestion and analysis for a business during onboarding.

    Access is gated by ``_require_master``. All request fields are validated
    before anything is written, so a rejected request leaves no partial state.

    For a non-CSV provider the env source configuration is looked up, the
    source history table is verified, and the telephony integration is saved
    and activated. Quality parameters are then seeded (default template or the
    supplied custom list), the pipeline configuration is saved, and, for a
    brand-new pipeline, the ingest watermark is set.

    Returns:
        Dict summarising the saved configuration.

    Raises:
        HTTPException: 400 for an invalid analysis mode, an unsupported
            ``ingest_lookback_days``, or an enabled group filter with no groups.
            The helpers called here may raise further HTTP errors.
    """
    _require_master(user)
    bid = str(bid).strip()
    source_bid = str(payload.source_bid or bid).strip()
    provider = str(payload.provider or "").strip().lower()
    analysis_mode = str(payload.analysis_mode or "default").strip().lower()
    if analysis_mode not in {"default", "custom", "transcription_only"}:
        raise HTTPException(status_code=400, detail="Invalid analysis mode")

    # Validate the remaining inputs up front, before any integration is written.
    ingest_lookback_days = int(payload.ingest_lookback_days or 0)
    if ingest_lookback_days not in ONBOARDING_INGEST_LOOKBACK_DAYS:
        raise HTTPException(
            status_code=400,
            detail=f"ingest_lookback_days must be one of: {', '.join(str(v) for v in ONBOARDING_INGEST_LOOKBACK_DAYS)}",
        )

    allowed_groups = [
        str(g).strip() for g in (payload.allowed_groupnames or []) if str(g).strip()
    ]
    if payload.group_filter_enabled and not allowed_groups:
        raise HTTPException(
            status_code=400,
            detail="Select at least one group, or disable group filter.",
        )

    db_handler.ensure_business_pipeline_config_table()
    analytics_enabled = analysis_mode != "transcription_only"
    provider_config = None
    source_table = None

    if provider != "csv":
        provider_config = _provider_env_source_config(provider)
        # Verify the source table first so a missing table does not leave an integration behind.
        source_table = _verify_source_table_exists(provider_config, source_bid)
        db_handler.upsert_telephony_integration(
            bid=bid,
            provider=provider,
            source_bid=source_bid,
            config=provider_config,
            is_active=True,
        )
        db_handler.set_active_telephony_provider(bid=bid, provider=provider)

    params_saved = 0
    if analysis_mode == "default":
        params_saved = _copy_default_template_parameters(bid)
    elif analysis_mode == "custom":
        params_saved = _copy_quality_parameters("", bid, payload.custom_parameters or [])

    cfg_payload = {
        "pipeline_enabled": True,
        "source_type": provider,
        "source_bid": source_bid,
        "source_table": source_table or "",
        "analysis_mode": analysis_mode,
        "analytics_enabled": analytics_enabled,
        "processing_mode": analysis_mode,
        "stt_provider": "sarvam",
        "lookback_days": ingest_lookback_days,
        "group_filter_enabled": 1 if payload.group_filter_enabled else 0,
        "allowed_groupnames": allowed_groups,
    }
    if provider_config:
        cfg_payload.update(
            {
                "source_db_host": provider_config.get("host"),
                "source_db_port": provider_config.get("port"),
                "source_db_user": provider_config.get("user"),
                "source_db_password": provider_config.get("password"),
                "source_db_name": provider_config.get("database"),
            }
        )
    db_handler.save_pipeline_config(bid, cfg_payload)

    # Seed the watermark only for pipelines that have never ingested anything.
    ingest_watermark_set = False
    if provider != "csv" and _should_bootstrap_ingest_watermark(bid):
        wm = _compute_onboarding_ingest_watermark(ingest_lookback_days)
        db_handler.set_sync_watermark(bid, f"orchestrator_ingest_{bid}", wm.isoformat())
        ingest_watermark_set = True

    return {
        "success": True,
        "bid": bid,
        "provider": provider,
        "source_bid": source_bid,
        "source_table": source_table,
        "analysis_mode": analysis_mode,
        "analytics_enabled": analytics_enabled,
        "parameters_saved": params_saved,
        "ingest_lookback_days": ingest_lookback_days,
        "ingest_watermark_set": ingest_watermark_set,
        "group_filter_enabled": bool(payload.group_filter_enabled),
        "allowed_groupnames": allowed_groups,
    }


@app.delete("/telephony/{bid}/integrations/{provider}")
def telephony_disconnect_integration(bid: str, provider: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Ask the DB handler to delete the telephony integration for ``bid``/``provider``.

    The response always carries ``success: True``; only the message tells the
    caller whether the handler reported that something was removed.
    """
    _auth_user_for_bid(bid, user)
    _require_settings_manage_or_admin(bid, user)
    if db_handler.delete_telephony_integration(bid=bid, provider=provider):
        message = "Disconnected successfully"
    else:
        message = "Integration did not exist"
    return {"success": True, "message": message}


@app.get("/telephony/{bid}/preview")
def telephony_preview_calls(
    bid: str,
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
    dialstatus: Optional[str] = Query(default=None),
    direction: Optional[str] = Query(default=None),
    limit: int = Query(default=300),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Preview calls from the telephony source connected to ``bid``.

    Unless both ``date_from`` and ``date_to`` are supplied, only the "history"
    table kind is queried. With a complete range, "archive" and "history" are
    both queried, merged (archive rows first, later rows with an already-seen
    non-empty ``callid`` are dropped), ordered by the string form of
    ``starttime`` descending and truncated to ``limit`` rows.

    Raises:
        HTTPException: 400 when no active telephony integration is found.
    """
    _auth_user_for_bid(bid, user)
    integration = _get_active_telephony_integration_or_none(bid)
    if not integration:
        raise HTTPException(status_code=400, detail="No telephony provider connected")
    source_config = _source_db_config_from_telephony_integration(integration)
    source_bid = str(integration.get("source_bid") or bid)

    def _pull(table_kind: str, **date_range: Any):
        """Fetch ``(records, table)`` for one table kind using the shared filters."""
        return _fetch_source_calls_from_config(
            source_bid=source_bid,
            table_kind=table_kind,
            source_config=source_config,
            dialstatus_csv=dialstatus,
            direction_csv=direction,
            limit=limit,
            **date_range,
        )

    if not (date_from and date_to):
        # Incomplete range: history only, and no date arguments are forwarded.
        history_rows, history_name = _pull("history")
        return {"source": "history", "table": history_name, "count": len(history_rows), "records": history_rows}

    archive_records, archive_table = _pull("archive", date_from=date_from, date_to=date_to)
    history_records, history_table = _pull("history", date_from=date_from, date_to=date_to)

    seen_callids = set()
    merged = []
    for batch in (archive_records, history_records):
        for row in batch:
            callid = str(row.get("callid") or "")
            if callid:
                if callid in seen_callids:
                    continue
                seen_callids.add(callid)
            # Rows without a callid cannot be de-duplicated and are always kept.
            merged.append(row)

    newest_first = sorted(merged, key=lambda row: str(row.get("starttime") or ""), reverse=True)
    page = newest_first[: int(limit)]
    return {
        "source": "archive+history",
        "table": {"archive": archive_table, "history": history_table},
        "count": len(page),
        "records": page,
    }


def _existing_table_columns(cursor, table_name: str) -> set:
    """Return the column names of ``table_name`` as reported by ``SHOW COLUMNS``.

    Args:
        cursor: An open DB-API cursor. Rows may be dicts (keyed by ``"Field"``)
            or sequences (column name at index 0).
        table_name: Name of the table to inspect.

    Returns:
        A set of the truthy column names; empty when the query yields no rows.
    """
    # Identifiers cannot be bound as query parameters, so quote by hand: inside
    # a backtick-quoted identifier a literal backtick is written doubled.
    quoted = str(table_name).replace("`", "``")
    cursor.execute(f"SHOW COLUMNS FROM `{quoted}`")
    rows = cursor.fetchall() or []
    # DictCursor returns dict; normal cursor returns tuple.
    names = (row.get("Field") if isinstance(row, dict) else row[0] for row in rows)
    return {name for name in names if name}


@app.post("/telephony/{bid}/sync-to-db")
def telephony_sync_to_db(bid: str, payload: TelephonySyncToDbRequest, user: Dict[str, Any] = Depends(_auth_user)):
    """Upsert caller-supplied call records into the ``{bid}_raw_calls`` table.

    Source field names are translated to raw_calls column names, records
    without a ``callid`` are skipped, and only columns that exist on the
    destination table (with non-None values) are written. The statement is an
    ``INSERT ... ON DUPLICATE KEY UPDATE`` over the union of columns present
    in the surviving rows.

    Returns:
        ``{"success": True, "inserted": <number of rows submitted>}``.

    Raises:
        HTTPException: 400 when no records are supplied, the destination table
            is missing, no record has a callid, or ``callid`` is not a usable
            column.
    """
    _auth_user_for_bid(bid, user)
    _require_settings_manage_or_admin(bid, user)
    records = payload.records or []
    if not records:
        raise HTTPException(status_code=400, detail="records array is required")

    table_name = f"{bid}_raw_calls"
    conn = _db_conn()
    try:
        cursor = conn.cursor()
        cursor.execute("SHOW TABLES LIKE %s", (table_name,))
        if not cursor.fetchone():
            raise HTTPException(status_code=400, detail=f"Table {table_name} does not exist")

        existing_cols = _existing_table_columns(cursor, table_name)

        # Translate each source record to raw_calls field names.
        mapped_rows: List[Dict[str, Any]] = []
        for record in records:
            src = dict(record or {})
            callid = str(src.get("callid") or "")
            if not callid:
                continue
            translated = {
                "bid": str(bid),
                "callid": callid,
                "agentname": src.get("agentname"),
                "groupname": src.get("groupname"),
                "call_starttime": src.get("starttime") or src.get("call_starttime"),
                "call_endtime": src.get("endtime") or src.get("call_endtime"),
                "call_status": src.get("dialstatus") or src.get("call_status"),
                "direction": src.get("direction"),
                "fileurl": src.get("filename") or src.get("fileurl"),
                "agent_callinfo": src.get("emp_phone") or src.get("agent_callinfo"),
                # The trailing fallback repeats the first lookup; it only matters
                # when both values are falsy, in which case the first one wins.
                "customer_callinfo": src.get("customer_callinfo") or src.get("clicktocalldid") or src.get("customer_callinfo"),
            }
            # Drop anything the destination table cannot store, and unset values.
            usable: Dict[str, Any] = {}
            for col, value in translated.items():
                if value is not None and col in existing_cols:
                    usable[col] = value
            mapped_rows.append(usable)

        if not mapped_rows:
            raise HTTPException(status_code=400, detail="No valid records (missing callid)")

        # The statement covers every column that appears in at least one row.
        column_set = set()
        for mapped in mapped_rows:
            column_set.update(mapped.keys())
        if "callid" not in column_set:
            raise HTTPException(status_code=400, detail="callid is required")
        columns = sorted(column_set)

        cols_sql = ", ".join(f"`{c}`" for c in columns)
        placeholders = ", ".join(["%s"] * len(columns))
        sql = f"INSERT INTO `{table_name}` ({cols_sql}) VALUES ({placeholders})"
        non_key_cols = [c for c in columns if c != "callid"]
        if non_key_cols:
            assignments = ", ".join(f"`{c}` = VALUES(`{c}`)" for c in non_key_cols)
            sql += f" ON DUPLICATE KEY UPDATE {assignments}"

        # Rows lacking a column contribute None for it.
        values = [[mapped.get(c) for c in columns] for mapped in mapped_rows]
        cursor.executemany(sql, values)
        conn.commit()
        return {"success": True, "inserted": len(mapped_rows)}
    finally:
        conn.close()


@app.get("/sync/cache/{bid}/history")
def sync_cache_history(
    bid: str,
    limit: int = Query(default=300),
    refresh: bool = Query(default=False),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return source call-history records for ``bid``, served from cache when possible.

    A cached payload is returned (flagged ``cache_hit: True``) unless
    ``refresh`` is set. Otherwise the source is queried and the fresh payload
    is written to the cache. When no source table is resolved, a descriptive
    empty payload is returned and nothing is cached.
    """
    _auth_user_for_bid(bid, user)
    _require_settings_manage_or_admin(bid, user)
    cache_key = f"callsync:{CALL_SYNC_CACHE_SCHEMA_VERSION}:{bid}:history:limit:{limit}"

    cached = None if refresh else db_handler.get_call_sync_cache(cache_key)
    if cached:
        cached["cache_hit"] = True
        return cached

    records, table_name = _fetch_source_calls(bid, "history", limit=limit)
    if table_name:
        payload = {
            "source": "call_history",
            "table": table_name,
            "count": len(records),
            "records": records,
            "cached_at": datetime.now().isoformat(),
            "cache_hit": False,
        }
        db_handler.upsert_call_sync_cache(
            cache_key=cache_key,
            bid=bid,
            source="call_history",
            payload=payload,
            ttl_seconds=CALL_SYNC_CACHE_TTL_SECONDS,
        )
        return payload

    # No source table: report what was looked for and where.
    return {
        "source": "call_history",
        "table": None,
        "count": 0,
        "records": [],
        "cached_at": datetime.now().isoformat(),
        "cache_hit": False,
        "message": "Source call history table not found for this business",
        "expected_tables": [f"{bid}_call_history", f"{bid}_callhistory"],
        "configured_source_db": {
            "host": get_sync_source_db_config().get("host"),
            "database": get_sync_source_db_config().get("database"),
        },
    }


@app.get("/sync/cache/{bid}/archive")
def sync_cache_archive(
    bid: str,
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
    limit: int = Query(default=500),
    refresh: bool = Query(default=False),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return source call-archive records for ``bid``, served from cache when possible.

    The cache key embeds the date range when both bounds are given and the
    literal ``recent`` otherwise. A cached payload is returned (flagged
    ``cache_hit: True``) unless ``refresh`` is set; a fresh fetch is cached
    only when a source table was resolved.
    """
    _auth_user_for_bid(bid, user)
    _require_settings_manage_or_admin(bid, user)

    if date_from and date_to:
        range_key = f"{date_from}:{date_to}"
    else:
        range_key = "recent"
    cache_key = f"callsync:{CALL_SYNC_CACHE_SCHEMA_VERSION}:{bid}:archive:{range_key}:limit:{limit}"

    cached = None if refresh else db_handler.get_call_sync_cache(cache_key)
    if cached:
        cached["cache_hit"] = True
        return cached

    records, table_name = _fetch_source_calls(bid, "archive", date_from=date_from, date_to=date_to, limit=limit)
    if table_name:
        payload = {
            "source": "call_archive",
            "table": table_name,
            "count": len(records),
            "records": records,
            "date_from": date_from,
            "date_to": date_to,
            "cached_at": datetime.now().isoformat(),
            "cache_hit": False,
        }
        db_handler.upsert_call_sync_cache(
            cache_key=cache_key,
            bid=bid,
            source="call_archive",
            payload=payload,
            ttl_seconds=CALL_SYNC_CACHE_TTL_SECONDS,
        )
        return payload

    # No source table: report what was looked for and where.
    return {
        "source": "call_archive",
        "table": None,
        "count": 0,
        "records": [],
        "date_from": date_from,
        "date_to": date_to,
        "cached_at": datetime.now().isoformat(),
        "cache_hit": False,
        "message": "Source call archive table not found for this business",
        "expected_tables": [f"{bid}_callarchive", f"{bid}_call_archive"],
        "configured_source_db": {
            "host": get_sync_source_db_config().get("host"),
            "database": get_sync_source_db_config().get("database"),
        },
    }


@app.get("/sync/check-count/{bid}")
def sync_check_count(
    bid: str,
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Report source row counts for the history and archive tables of ``bid``.

    The archive count is only fetched when both date bounds are supplied;
    otherwise it is reported as 0.
    """
    _auth_user_for_bid(bid, user)
    _require_settings_manage_or_admin(bid, user)

    history_total = _fetch_source_count(bid, "history")
    if date_from and date_to:
        archive_total = _fetch_source_count(bid, "archive", date_from=date_from, date_to=date_to)
    else:
        archive_total = 0

    combined = int(history_total + archive_total)
    return {
        "callhistory": {"total": history_total},
        "callarchive": {"total": archive_total},
        "total_count": combined,
        "date_from": date_from,
        "date_to": date_to,
    }


@app.post("/sync/calls/{bid}")
def sync_calls(
    bid: str,
    payload: Dict[str, Any],
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Fetch source calls for ``bid`` and store them in the call-sync cache.

    The archive is used when the body carries both ``date_from`` and
    ``date_to``; otherwise the history table is used. ``limit`` defaults to
    500. Nothing is cached when no source table is resolved.
    """
    _auth_user_for_bid(bid, user)
    _require_settings_manage_or_admin(bid, user)

    date_from = payload.get("date_from")
    date_to = payload.get("date_to")
    limit = int(payload.get("limit", 500))
    key_prefix = f"callsync:{CALL_SYNC_CACHE_SCHEMA_VERSION}:{bid}"

    if not (date_from and date_to):
        records, table_name = _fetch_source_calls(bid, "history", limit=limit)
        source = "call_history"
        cache_key = f"{key_prefix}:history:limit:{limit}"
    else:
        records, table_name = _fetch_source_calls(bid, "archive", date_from=date_from, date_to=date_to, limit=limit)
        source = "call_archive"
        cache_key = f"{key_prefix}:archive:{date_from}:{date_to}:limit:{limit}"

    # Assembled before the table check, matching the established evaluation order.
    payload_obj = {
        "source": source,
        "table": table_name,
        "count": len(records),
        "records": records,
        "date_from": date_from,
        "date_to": date_to,
        "cached_at": datetime.now().isoformat(),
    }

    if not table_name:
        return {
            "message": f"Source table not found for {source}",
            "cached_count": 0,
            "source": source,
            "table": None,
            "cache_key": cache_key,
            "stored_in": "mysql",
        }

    db_handler.upsert_call_sync_cache(
        cache_key=cache_key,
        bid=bid,
        source=source,
        payload=payload_obj,
        ttl_seconds=CALL_SYNC_CACHE_TTL_SECONDS,
    )
    return {
        "message": f"Successfully cached {len(records)} calls from {source}",
        "cached_count": len(records),
        "source": source,
        "table": table_name,
        "cache_key": cache_key,
        "stored_in": "mysql",
    }


@app.post("/sync/upload/{bid}")
async def sync_upload_raw_calls(
    bid: str,
    request: Request,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Placeholder for the raw-calls upload route; always answers 501.

    Access checks still run first, so unauthorized callers get the auth error
    rather than the 501.

    Raises:
        HTTPException: 501 for every authorized request.
    """
    _auth_user_for_bid(bid, user)
    _require_settings_manage_or_admin(bid, user)
    # Parsing a multipart body needs the python-multipart dependency.
    unavailable_detail = (
        "Excel multipart upload requires python-multipart in this runtime. "
        "Use admin JSON business create path or install python-multipart."
    )
    raise HTTPException(status_code=501, detail=unavailable_detail)


@app.get("/transcription/calls/{bid}")
def transcription_calls(
    bid: str,
    status: str = Query(default="0"),
    groupname: Optional[str] = Query(default=None),
    agentname: Optional[str] = Query(default=None),
    start_date: Optional[str] = Query(default=None),
    end_date: Optional[str] = Query(default=None),
):
    """List up to 500 answered calls from ``{bid}_raw_calls``, newest first.

    Args:
        bid: Business id; selects the ``{bid}_raw_calls`` table.
        status: Value matched against the ``status`` column; ``"all"``
            disables the status filter.
        groupname: Optional exact match on ``groupname``.
        agentname: Optional exact match on ``agentname``.
        start_date: Optional inclusive lower bound on ``DATE(call_starttime)``.
        end_date: Optional inclusive upper bound on ``DATE(call_starttime)``.

    Returns:
        ``{"calls": [...], "count": <number of rows>}``.
    """
    # NOTE(review): this route declares no auth dependency, unlike the /sync and
    # /telephony routes in this file -- confirm that is intentional.
    # ``bid`` arrives straight from the URL and is used as part of an
    # identifier, which cannot be bound as a parameter; double any backtick so
    # the value cannot terminate the quoted table name.
    table = f"{bid}_raw_calls".replace("`", "``")

    where = ["call_status = 'ANSWER'"]
    params: List[Any] = []
    if status != "all":
        where.append("status = %s")
        params.append(status)
    optional_filters = (
        ("groupname = %s", groupname),
        ("agentname = %s", agentname),
        ("DATE(call_starttime) >= %s", start_date),
        ("DATE(call_starttime) <= %s", end_date),
    )
    for clause, value in optional_filters:
        if value:
            where.append(clause)
            params.append(value)

    query = f"""
            SELECT callid, bid, agentname, groupname, call_starttime, call_endtime, call_status, status,
                   transcription_status, transcription_requested, selected_for_processing
            FROM `{table}`
            WHERE {" AND ".join(where)}
            ORDER BY call_starttime DESC
            LIMIT 500
        """
    conn = _db_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(query, tuple(params))
        calls = cursor.fetchall() or []
        return {"calls": _to_json_safe(calls), "count": len(calls)}
    finally:
        conn.close()


@app.post("/transcription/trigger/{bid}")
def transcription_trigger(bid: str, payload: Dict[str, Any]):
    """Flag the given calls in ``{bid}_raw_calls`` as queued for transcription.

    Sets ``transcription_requested = 1``, ``transcription_status = 'pending'``
    and ``selected_for_processing = 1`` on every row whose ``callid`` is in
    ``payload["callids"]``.

    Returns:
        A message plus ``queued_count``, the row count reported by the UPDATE.

    Raises:
        HTTPException: 400 when ``callids`` is missing, empty, or not a list.
    """
    callids = payload.get("callids", [])
    if not callids:
        raise HTTPException(status_code=400, detail="No call IDs provided")
    if not isinstance(callids, (list, tuple)):
        # A scalar would not line up with the generated placeholders.
        raise HTTPException(status_code=400, detail="callids must be a list")

    # ``bid`` comes from the URL and forms part of an identifier, which cannot
    # be bound as a parameter; double any backtick so it stays inside the quotes.
    table = f"{bid}_raw_calls".replace("`", "``")
    placeholders = ",".join(["%s"] * len(callids))
    query = f"""
            UPDATE `{table}`
            SET transcription_requested = 1, transcription_status = 'pending', selected_for_processing = 1
            WHERE callid IN ({placeholders})
        """
    conn = _db_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(query, list(callids))
        affected = cursor.rowcount
        # Without an explicit commit the UPDATE is discarded on close unless the
        # connection runs in autocommit mode; committing is harmless either way.
        conn.commit()
    finally:
        conn.close()
    return {"message": f"Successfully queued {affected} calls for transcription", "queued_count": affected}


@app.post("/transcription/trigger-combined/{bid}")
def transcription_trigger_combined(bid: str, payload: Dict[str, Any]):
    """Run the transcription trigger and then the analysis trigger for the same call ids.

    Both handlers are invoked directly as functions; an exception from the
    first prevents the second from running.
    """
    callids = payload.get("callids", [])
    transcription_result = transcription_trigger(bid, {"callids": callids})
    analysis_result = analysis_trigger(bid, {"callids": callids})
    return {
        "message": "Combined trigger executed",
        "transcription": transcription_result,
        "analysis": analysis_result,
    }


@app.get("/transcription/status/{bid}/{batch_id}")
def transcription_status(bid: str, batch_id: str):
    """Legacy batch-status route: echoes the ids and always reports ``completed``."""
    status_payload = {
        "bid": str(bid),
        "batch_id": str(batch_id),
        "status": "completed",
        "message": "Legacy batch status endpoint is now immediate",
    }
    return status_payload


@app.post("/analysis/trigger/{bid}")
def analysis_trigger(bid: str, payload: Dict[str, Any]):
    """Run the call analyzer over each requested call id, one at a time.

    A call is skipped with a recorded reason when it is not found, its
    ``call_status`` is not ``ANSWER``, or it has no transcript. Any exception
    while handling a call is recorded as that call's failure and processing
    continues with the next id.

    Returns:
        Success and failure counts, plus at most the first 10 failure messages
        under ``errors`` when any call failed.

    Raises:
        HTTPException: 400 when no call ids are given, 500 when the analyzer
            cannot be imported or constructed.
    """
    callids = payload.get("callids", [])
    if not callids:
        raise HTTPException(status_code=400, detail="No call IDs provided")
    try:
        from analyze_calls_with_parameters import CallAnalyzer
        analyzer = CallAnalyzer(CONFIG)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to initialize analyzer: {e}")

    def _analyze_one(callid: Any) -> Optional[str]:
        """Analyze a single call; return a failure reason, or None on success."""
        call_data = db_handler.get_raw_call_details(bid, callid)
        if not call_data:
            return "Call not found"
        if call_data.get("call_status") != "ANSWER":
            return "Call not answered"
        transcript = call_data.get("transcripts")
        if not transcript:
            return "No transcript available"
        segments = call_data.get("speaker_segments")
        if segments and isinstance(segments, str):
            # Stored as a JSON string; decode before handing to the analyzer.
            segments = json.loads(segments)
        duration = call_data.get("duration") or call_data.get("duration_seconds")
        analyzer.analyze_call(
            bid=bid,
            callid=callid,
            transcript=transcript,
            speaker_segments=segments or [],
            actual_duration=duration,
        )
        return None

    success_count = 0
    errors: List[str] = []
    for callid in callids:
        try:
            reason = _analyze_one(callid)
        except Exception as e:
            reason = f"{e}"
        if reason is None:
            success_count += 1
        else:
            errors.append(f"{callid}: {reason}")
    # Every failure contributes exactly one message, so the list length is the count.
    error_count = len(errors)

    response_data: Dict[str, Any] = {
        "message": f"Analysis completed: {success_count} successful, {error_count} failed",
        "success_count": success_count,
        "error_count": error_count,
    }
    if errors:
        response_data["errors"] = errors[:10]
    return response_data


# =============================================================================
# Pipeline config endpoints
# =============================================================================

class PipelineConfigRequest(BaseModel):
    # Request body for PUT /pipeline/{bid}/config.
    # Every field is optional and defaults to None. The PUT handler drops
    # fields whose value is None or "" before saving, so only the fields a
    # client actually fills in are passed on to the DB handler.
    pipeline_enabled: Optional[bool] = None
    # source_db_*: presumably the connection details of the call source
    # database -- confirm against db_handler.save_pipeline_config.
    source_db_host: Optional[str] = None
    source_db_port: Optional[int] = None
    source_db_user: Optional[str] = None
    source_db_password: Optional[str] = None  # stored encrypted
    source_db_name: Optional[str] = None
    stt_provider: Optional[str] = None
    stt_api_key: Optional[str] = None           # stored encrypted
    min_call_duration_s: Optional[int] = None
    sync_batch: Optional[int] = None
    transcribe_batch: Optional[int] = None
    sync_interval_s: Optional[int] = None
    lead_filter_enabled: Optional[bool] = None
    crm_provider: Optional[str] = None
    lookback_days: Optional[int] = None
    source_type: Optional[str] = None
    source_bid: Optional[str] = None
    analysis_mode: Optional[str] = None
    analytics_enabled: Optional[bool] = None
    group_filter_enabled: Optional[bool] = None
    allowed_groupnames: Optional[List[str]] = None
    propensity_enabled: Optional[bool] = None
    # summary_mode is normalized by the PUT handler via normalize_summary_mode;
    # when the normalized mode is "custom", summary_instructions must be
    # non-empty and at most MAX_SUMMARY_INSTRUCTIONS_LEN characters, and for
    # any other mode the handler saves summary_instructions as None.
    summary_mode: Optional[str] = None
    summary_instructions: Optional[str] = None
    webhook_ingest_enabled: Optional[bool] = None
    ingest_secret: Optional[str] = None


class SummaryConfigRequest(BaseModel):
    # Request body for PUT /pipeline/{bid}/summary-config.
    # summary_mode is required; the handler normalizes it with
    # normalize_summary_mode before use.
    summary_mode: str
    # Required (non-blank) by the handler only when the normalized mode is
    # "custom"; ignored and saved as None for every other mode.
    summary_instructions: Optional[str] = None


@app.put("/pipeline/{bid}/summary-config")
def save_summary_config(
    bid: str,
    body: SummaryConfigRequest,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Persist just the call-summary style fields, separate from the full pipeline config upsert.

    Instructions are kept only for the ``custom`` mode, where they must be
    non-blank and within ``MAX_SUMMARY_INSTRUCTIONS_LEN``; every other mode
    stores no instructions.

    Raises:
        HTTPException: 400 for invalid input or a ``ValueError`` from the DB
            handler, 500 for any other failure while saving.
    """
    _require_settings_manage_or_admin(bid, user)
    from summary_config import MAX_SUMMARY_INSTRUCTIONS_LEN, normalize_summary_mode

    mode = normalize_summary_mode(body.summary_mode)
    instructions = None
    if mode == "custom":
        cleaned = (body.summary_instructions or "").strip()
        if not cleaned:
            raise HTTPException(
                status_code=400,
                detail="summary_instructions is required when summary_mode is custom",
            )
        if len(cleaned) > MAX_SUMMARY_INSTRUCTIONS_LEN:
            raise HTTPException(
                status_code=400,
                detail=f"summary_instructions must be at most {MAX_SUMMARY_INSTRUCTIONS_LEN} characters",
            )
        instructions = cleaned

    try:
        db_handler.ensure_business_pipeline_config_table()
        db_handler.save_summary_config(bid, mode, instructions)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except Exception as exc:
        logger.exception("Failed to save summary config for bid %s: %s", bid, exc)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to save summary config: {exc}",
        ) from exc

    saved = {"success": True, "bid": bid, "summary_mode": mode}
    saved["summary_instructions"] = instructions or ""
    return saved


@app.get("/pipeline/{bid}/config")
def get_pipeline_config(
    bid: str,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return the stored pipeline config for ``bid`` with secrets masked.

    When no config row exists the response is ``{"bid": bid, "exists": False}``.
    Otherwise a copy of the row is returned with encrypted secrets and the
    ingest secret replaced by ``"***"``, timestamps ISO-formatted, flag columns
    coerced to ``bool``, the summary mode normalized, and ``exists: True``.
    """
    _auth_user_for_bid(bid, user)
    db_handler.ensure_business_pipeline_config_table()
    cfg = db_handler.get_pipeline_config(bid)
    if not cfg:
        return {"bid": bid, "exists": False}

    view = dict(cfg)

    # Never hand stored secrets back to the client.
    for secret_col in ("source_db_password_enc", "stt_api_key_enc"):
        if view.get(secret_col):
            view[secret_col] = "***"

    for stamp_col in ("created_at", "updated_at"):
        stamp = view.get(stamp_col)
        if hasattr(stamp, "isoformat"):
            view[stamp_col] = stamp.isoformat()

    view["allowed_groupnames"] = db_handler._decode_allowed_groupnames(view.get("allowed_groupnames"))
    view["group_filter_enabled"] = bool(view.get("group_filter_enabled"))
    view["propensity_enabled"] = bool(int(view.get("propensity_enabled") or 0))

    from summary_config import normalize_summary_mode

    view["summary_mode"] = normalize_summary_mode(view.get("summary_mode"))
    view["summary_instructions"] = view.get("summary_instructions") or ""
    view["webhook_ingest_enabled"] = bool(int(view.get("webhook_ingest_enabled") or 0))

    # Expose only whether an ingest secret is set, not its value.
    has_ingest_secret = bool(view.get("ingest_secret"))
    if has_ingest_secret:
        view["ingest_secret"] = "***"
    else:
        view.pop("ingest_secret", None)
    view["ingest_secret_configured"] = has_ingest_secret

    view["exists"] = True
    return view


@app.post("/pipeline/{bid}/ingest/enable")
def enable_call_ingest_webhook(
    bid: str,
    request: Request,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Turn on webhook ingest for one BID, relying on the shared INGEST_SECRET (polling is left alone).

    Saves ``webhook_ingest_enabled = 1`` with an empty per-BID ``ingest_secret``
    and returns the ingest/schema URLs derived from the request's public base.

    Raises:
        HTTPException: 400 when no shared ingest secret is configured.
    """
    _require_settings_manage_or_admin(bid, user)
    if not shared_ingest_secret():
        raise HTTPException(
            status_code=400,
            detail="Set INGEST_SECRET (or WEBHOOK_SECRET) in server .env before enabling webhook ingest.",
        )

    db_handler.ensure_business_pipeline_config_table()
    enable_patch = {"webhook_ingest_enabled": 1, "ingest_secret": ""}
    db_handler.save_pipeline_config(bid, enable_patch)

    base = _public_api_base(request)
    bid_calls_url = f"{base}/api/v1/bids/{bid}/calls/ingest"
    note = (
        f"Webhook enabled for BID {bid}. Mcube uses the same shared secret for all BIDs "
        "(INGEST_SECRET in server .env). Orchestrator polling remains active as backup."
    )
    return {
        "success": True,
        "bid": bid,
        "webhook_ingest_enabled": True,
        "ingest_secret_configured": True,
        "ingest_url": f"{base}/api/webhook/call-ingest",
        "per_bid_url": bid_calls_url,
        "schema_url": f"{bid_calls_url}/schema",
        "header": "X-Ingest-Secret",
        "note": note,
    }


@app.post("/pipeline/{bid}/ingest/disable")
def disable_call_ingest_webhook(
    bid: str,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Turn off webhook ingest for one BID (Mcube may keep POSTing; PCAA answers 403)."""
    _require_settings_manage_or_admin(bid, user)
    db_handler.ensure_business_pipeline_config_table()
    disable_patch = {"webhook_ingest_enabled": 0}
    db_handler.save_pipeline_config(bid, disable_patch)
    message = f"Webhook ingest disabled for BID {bid}. Polling unchanged."
    return {
        "success": True,
        "bid": bid,
        "webhook_ingest_enabled": False,
        "message": message,
    }


@app.put("/pipeline/{bid}/config")
def save_pipeline_config(
    bid: str,
    body: PipelineConfigRequest,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Save the submitted pipeline config fields for ``bid``.

    Fields that are ``None`` or ``""`` are left out of the save. When
    ``summary_mode`` is submitted it is normalized; ``custom`` requires
    non-blank instructions within ``MAX_SUMMARY_INSTRUCTIONS_LEN`` and any
    other mode saves the instructions as ``None``. Instructions submitted
    without a mode are stripped and length-checked.

    Raises:
        HTTPException: 400 for invalid summary fields, 500 when saving fails.
    """
    _require_settings_manage_or_admin(bid, user)
    db_handler.ensure_business_pipeline_config_table()

    if hasattr(body, "model_dump"):
        submitted = body.model_dump()
    else:
        submitted = body.dict()

    data = {}
    for field, value in submitted.items():
        if value is None or value == "":
            continue
        data[field] = value

    if "summary_mode" in data:
        from summary_config import MAX_SUMMARY_INSTRUCTIONS_LEN, normalize_summary_mode

        mode = normalize_summary_mode(data.get("summary_mode"))
        data["summary_mode"] = mode
        if mode != "custom":
            data["summary_instructions"] = None
        else:
            instructions = (data.get("summary_instructions") or "").strip()
            if not instructions:
                raise HTTPException(
                    status_code=400,
                    detail="summary_instructions is required when summary_mode is custom",
                )
            if len(instructions) > MAX_SUMMARY_INSTRUCTIONS_LEN:
                raise HTTPException(
                    status_code=400,
                    detail=f"summary_instructions must be at most {MAX_SUMMARY_INSTRUCTIONS_LEN} characters",
                )
            # Already stripped, non-empty and within the limit: nothing further to check.
            data["summary_instructions"] = instructions
    elif data.get("summary_instructions") is not None:
        # Instructions supplied without a mode still get trimmed and length-checked.
        from summary_config import MAX_SUMMARY_INSTRUCTIONS_LEN

        text = str(data["summary_instructions"]).strip()
        if len(text) > MAX_SUMMARY_INSTRUCTIONS_LEN:
            raise HTTPException(
                status_code=400,
                detail=f"summary_instructions must be at most {MAX_SUMMARY_INSTRUCTIONS_LEN} characters",
            )
        data["summary_instructions"] = text or None

    try:
        db_handler.save_pipeline_config(bid, data)
    except Exception as exc:
        logger.exception("Failed to save pipeline config for bid %s: %s", bid, exc)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to save pipeline config: {exc}",
        ) from exc
    return {"success": True, "message": f"Pipeline config saved for bid {bid}"}


@app.get("/pipeline/{bid}/status")
def get_pipeline_status(
    bid: str,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return pipeline operational status: enabled flag, watermarks, per-status call record counts.

    The counts are best-effort: if they cannot be loaded the failure is logged
    and ``call_record_counts`` holds whatever was collected (usually ``{}``),
    so the rest of the status is still returned.
    """
    _auth_user_for_bid(bid, user)
    db_handler.ensure_business_pipeline_config_table()
    db_handler.ensure_sync_watermarks_table()

    cfg = db_handler.get_pipeline_config(bid)
    call_sync_wm = db_handler.get_sync_watermark(bid, f"call_sync_{bid}")
    lsq_leads_wm = db_handler.get_sync_watermark(bid, "lsq_leads")

    # Counts from call_records, grouped by status.
    counts: Dict[str, Any] = {}
    try:
        db_handler.ensure_call_records_table(bid)
        table = f"{bid}_call_records"
        with db_handler.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"""SELECT status, COUNT(*) AS cnt
                    FROM `{table}` GROUP BY status"""
            )
            for row in (cursor.fetchall() or []):
                counts[row["status"]] = row["cnt"]
    except Exception as exc:
        # Deliberately non-fatal, but leave a trace so a broken counts query
        # does not masquerade as "no call records".
        logger.warning("get_pipeline_status: could not load call record counts for bid=%s: %s", bid, exc)

    return {
        "bid": bid,
        "pipeline_enabled": bool(cfg.get("pipeline_enabled", False)) if cfg else False,
        "watermarks": {
            "call_sync": str(call_sync_wm) if call_sync_wm else None,
            "lsq_leads": str(lsq_leads_wm) if lsq_leads_wm else None,
        },
        "call_record_counts": counts,
    }


# =============================================================================
# Agent config endpoints
# =============================================================================

class AgentConfigRequest(BaseModel):
    # Request body for POST /agents/{bid}.
    # agent_name is the only required field. The handler forwards every field
    # whose value is not None to db_handler.save_agent_config, so the defaults
    # below are sent along whenever the client omits them.
    agent_name: str
    agent_enabled: Optional[bool] = True
    model_provider: Optional[str] = "bedrock"
    model_id: Optional[str] = "amazon.nova-lite-v1:0"
    # Prompt/schema fields default to None and are then left out of the save.
    system_prompt: Optional[str] = None
    user_prompt_template: Optional[str] = None
    output_schema: Optional[str] = None
    temperature: Optional[float] = 0.1
    max_tokens: Optional[int] = 4096


@app.get("/agents/{bid}")
def list_agent_configs(
    bid: str,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """List the agent configs stored for ``bid``, with timestamps ISO-formatted in place."""
    _auth_user_for_bid(bid, user)
    db_handler.ensure_business_agent_config_table()
    agents = db_handler.get_agent_configs(bid)
    for agent in agents:
        for stamp_col in ("created_at", "updated_at"):
            stamp = agent.get(stamp_col)
            # Only values exposing isoformat() are converted; others pass through.
            if hasattr(stamp, "isoformat"):
                agent[stamp_col] = stamp.isoformat()
    return {"bid": bid, "agents": agents}


@app.post("/agents/{bid}")
def create_or_update_agent_config(
    bid: str,
    body: AgentConfigRequest,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Save an agent config for ``bid`` from the non-None fields of the request body.

    Returns:
        ``{"success": True, "id": <value returned by the DB handler>, "agent_name": ...}``.
    """
    _require_business_admin_or_master(bid, user)
    db_handler.ensure_business_agent_config_table()
    # Same model-dump pattern as the pipeline config PUT handler: prefer
    # model_dump() when the model offers it, fall back to dict() otherwise.
    raw = body.model_dump() if hasattr(body, "model_dump") else body.dict()
    data = {k: v for k, v in raw.items() if v is not None}
    row_id = db_handler.save_agent_config(bid, data)
    return {"success": True, "id": row_id, "agent_name": body.agent_name}


@app.delete("/agents/{bid}/{agent_name}")
def delete_agent_config(
    bid: str,
    agent_name: str,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Delete one agent config for ``bid``.

    Raises:
        HTTPException: 404 when the DB handler reports nothing was deleted.
    """
    _require_business_admin_or_master(bid, user)
    if db_handler.delete_agent_config(bid, agent_name):
        return {"success": True, "deleted": agent_name}
    raise HTTPException(status_code=404, detail=f"Agent '{agent_name}' not found for bid {bid}")


# =============================================================================
# Call records endpoints
# =============================================================================

@app.get("/call-records/{bid}")
def list_call_records(
    bid: str,
    page: int = Query(default=1, ge=1),
    page_size: int = Query(default=20, ge=1, le=200),
    status: Optional[str] = Query(default=None),
    agent_name: Optional[str] = Query(default=None),
    search: Optional[str] = Query(default=None),
    date_from: Optional[str] = Query(default=None),
    date_to: Optional[str] = Query(default=None),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return one page of call records for ``bid``, passing all filters to the DB handler.

    Raises:
        HTTPException: 500 (generic detail) when anything fails while fetching.
    """
    _auth_user_for_bid(bid, user)
    listing_args = {
        "bid": bid,
        "page": page,
        "page_size": page_size,
        "status": status,
        "agent_name": agent_name,
        "search": search,
        "date_from": date_from,
        "date_to": date_to,
    }
    try:
        db_handler.ensure_call_records_table(bid)
        return db_handler.get_call_records_list(**listing_args)
    except Exception as exc:
        logger.error("list_call_records error bid=%s: %s", bid, exc, exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to fetch call records")


@app.get("/call-records/{bid}/{callid:path}")
def get_call_record_detail(
    bid: str,
    callid: str,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return the detail record for one call of ``bid``.

    ``callid`` is declared as a path-type parameter, so it may contain slashes.

    Raises:
        HTTPException: 404 when the record is missing, 500 (generic detail)
            when the lookup itself fails.
    """
    _auth_user_for_bid(bid, user)
    try:
        db_handler.ensure_call_records_table(bid)
        record = db_handler.get_call_record_detail(bid, callid)
    except HTTPException:
        # HTTP errors raised further down pass through untouched.
        raise
    except Exception as exc:
        logger.error("get_call_record_detail error bid=%s callid=%s: %s", bid, callid, exc, exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to fetch call record")

    if record:
        return record
    raise HTTPException(status_code=404, detail=f"Call record {callid} not found")


# ---------------------------------------------------------------------------
# STT Pipeline management endpoints
# ---------------------------------------------------------------------------

class SttPipelineConfigUpdate(BaseModel):
    # Request body for PUT /stt-pipeline/bids/{bid}/config.
    # Every field is optional; the endpoint drops fields left as None before
    # handing the remainder to db_handler.upsert_stt_pipeline_bid_config, so
    # only supplied values are written.
    enabled: Optional[bool] = None
    # Column names in the business's raw-calls table (discovered rows default
    # to "id" / "recording_url" in the bids listing).
    raw_calls_id_col: Optional[str] = None
    raw_calls_url_col: Optional[str] = None
    # NOTE(review): presumably jobs per poll and seconds between polls; the
    # consumer of these values is not visible here — confirm.
    batch_size: Optional[int] = None
    poll_interval_s: Optional[int] = None
    notes: Optional[str] = None


def _stt_bid_row_with_stats(cfg: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of ``cfg`` with a boolean ``enabled`` and a ``job_stats`` block.

    ``job_stats`` holds the pending / processing / done / failed counts that
    ``db_handler.get_stt_job_stats`` reports for ``cfg["bid"]``; counts it does
    not report default to 0.
    """
    job_counts = db_handler.get_stt_job_stats(bid=cfg["bid"])
    row = dict(cfg)
    row["enabled"] = bool(cfg.get("enabled"))
    row["job_stats"] = {
        state: job_counts.get(state, 0)
        for state in ("pending", "processing", "done", "failed")
    }
    return row


@app.get("/stt-pipeline/bids")
def stt_pipeline_list_bids(user: Dict[str, Any] = Depends(_auth_user)):
    """List STT pipeline configuration and job counts for every known business.

    Stored configuration rows are combined with businesses discovered through
    ``db_handler.discover_stt_raw_call_bids``; a discovered business with no
    stored row gets a disabled default entry. Master admins only.

    Raises:
        HTTPException: 403 for non-master users, 500 on any lookup failure.
    """
    if not user.get("is_master"):
        raise HTTPException(status_code=403, detail="Master admin access required")
    try:
        by_bid = {row["bid"]: row for row in db_handler.get_stt_pipeline_bid_configs()}
        for found_bid in db_handler.discover_stt_raw_call_bids():
            # Stored configuration always wins over the synthesized default.
            by_bid.setdefault(
                found_bid,
                {
                    "bid": found_bid,
                    "enabled": False,
                    "raw_calls_id_col": "id",
                    "raw_calls_url_col": "recording_url",
                    "batch_size": 10,
                    "poll_interval_s": 30,
                    "notes": None,
                    "created_at": None,
                    "updated_at": None,
                },
            )
        ordered = sorted(by_bid.values(), key=lambda entry: entry["bid"])
        rows = [_stt_bid_row_with_stats(entry) for entry in ordered]
        return {"bids": rows, "total": len(rows)}
    except Exception as exc:
        logger.error("stt_pipeline_list_bids error: %s", exc, exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to list STT pipeline bids")


@app.post("/stt-pipeline/bids/{bid}/toggle")
def stt_pipeline_toggle_bid(
    bid: str,
    body: Dict[str, Any] = Body(...),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Enable or disable the STT pipeline for one business.

    The new state is the truthiness of ``body["enabled"]`` (missing means
    disabled). Allowed for master admins and admins of this business.

    Raises:
        HTTPException: 403 when the caller lacks access, 500 when the update fails.
    """
    allowed = user.get("is_master") or user_has_business_admin(user, bid)
    if not allowed:
        raise HTTPException(status_code=403, detail="Business admin or master access required")

    enabled = bool(body.get("enabled", False))
    try:
        db_handler.toggle_stt_pipeline_bid(bid, enabled)
    except Exception as exc:
        logger.error("stt_pipeline_toggle_bid error bid=%s: %s", bid, exc, exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to toggle STT pipeline bid")
    return {"bid": bid, "enabled": enabled}


@app.put("/stt-pipeline/bids/{bid}/config")
def stt_pipeline_update_config(
    bid: str,
    body: SttPipelineConfigUpdate,
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Upsert STT pipeline settings for one business and return the refreshed row.

    Only fields supplied with a non-``None`` value are written. The response is
    the stored configuration (or, if none can be read back, the submitted
    values) with job counts attached.

    Raises:
        HTTPException: 403 when the caller lacks access, 500 when the update fails.
    """
    allowed = user.get("is_master") or user_has_business_admin(user, bid)
    if not allowed:
        raise HTTPException(status_code=403, detail="Business admin or master access required")

    changes: Dict[str, Any] = {}
    for field_name, value in body.dict().items():
        if value is not None:
            changes[field_name] = value

    try:
        db_handler.upsert_stt_pipeline_bid_config(bid, changes)
        stored = db_handler.get_stt_pipeline_bid_config(bid)
        if not stored:
            stored = {"bid": bid, **changes}
        return _stt_bid_row_with_stats(stored)
    except Exception as exc:
        logger.error("stt_pipeline_update_config error bid=%s: %s", bid, exc, exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to update STT pipeline config")


@app.get("/stt-pipeline/stats")
def stt_pipeline_stats(user: Dict[str, Any] = Depends(_auth_user)):
    """Return STT job counts across all businesses. Master admins only.

    ``total`` is the sum of every count reported by
    ``db_handler.get_stt_job_stats``, not just the four listed states.

    Raises:
        HTTPException: 403 for non-master users, 500 when the lookup fails.
    """
    if not user.get("is_master"):
        raise HTTPException(status_code=403, detail="Master admin access required")
    try:
        counts = db_handler.get_stt_job_stats()
        summary = {
            state: counts.get(state, 0)
            for state in ("pending", "processing", "done", "failed")
        }
        summary["total"] = sum(counts.values())
        return summary
    except Exception as exc:
        logger.error("stt_pipeline_stats error: %s", exc, exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to get STT pipeline stats")


# ---------------------------------------------------------------------------
# PCA Master Panel endpoints
# ---------------------------------------------------------------------------

def _require_master(user: Dict[str, Any]) -> None:
    if not user.get("is_master"):
        raise HTTPException(status_code=403, detail="Master admin access required")


def _table_exists(cursor, table_name: str) -> bool:
    cursor.execute(
        """
        SELECT 1
        FROM information_schema.tables
        WHERE table_schema = DATABASE() AND table_name = %s
        LIMIT 1
        """,
        (table_name,),
    )
    return cursor.fetchone() is not None


def _mask_key(key: str) -> str:
    key = str(key or "")
    if len(key) <= 8:
        return "****" if key else ""
    return f"{key[:4]}...{key[-4:]}"


def _read_last_lines(path: str, limit: int = 200) -> List[str]:
    if not os.path.exists(path):
        return []
    try:
        with open(path, "r", encoding="utf-8", errors="ignore") as fh:
            lines = fh.readlines()
        return [line.rstrip("\n") for line in lines[-max(1, min(limit, 2000)):]]
    except Exception:
        return []


def _pca_log_key_safe(log_key: str) -> bool:
    return bool(re.match(r"^[a-zA-Z0-9_.-]+$", str(log_key or "")))


def _pca_bid_safe(bid: str) -> str:
    return "".join(c for c in str(bid or "").strip() if c.isalnum() or c in "_-")


def _pca_log_line_matches_bid(line: str, bid: Optional[str]) -> bool:
    """Decide whether a log line should be shown in the view for ``bid``.

    A line is kept when no BID filter is given, when it is empty, when it
    references this BID explicitly, or when it references no per-BID table at
    all. It is dropped only when the first per-BID table name found in it
    belongs to a different BID.

    The explicit references are matched on alphanumeric boundaries. With plain
    substring matching, BID ``12`` would claim a line mentioning
    ``112_raw_calls`` (it contains ``12_``) and return before the table-name
    check could reject it.
    """
    bid_safe = _pca_bid_safe(bid or "")
    if not bid_safe:
        return True
    text = str(line or "")
    if not text:
        return True

    bid_re = re.escape(bid_safe)
    own_reference = re.search(
        # "<bid>_..." (table / file stems), not preceded by another alphanumeric.
        rf"(?<![A-Za-z0-9]){bid_re}_"
        # "bid=<bid>", "bid:<bid>", "BID <bid>", "Business ID: <bid>", not
        # followed by further alphanumerics (so bid=12 does not match bid=123).
        rf"|(?:bid=|bid:|BID |Business ID: ){bid_re}(?![A-Za-z0-9])",
        text,
    )
    if own_reference:
        return True

    table_match = re.search(
        r"\b([A-Za-z0-9_-]+)_(?:raw_calls|callanalytics|sarvamresponse|callhistory|calls)\b",
        text,
    )
    if table_match:
        return table_match.group(1) == bid_safe

    # No recognisable BID reference: treat the line as shared and keep it.
    return True


def _resolve_pca_log_file(backend_dir: str, log_key: str, bid: Optional[str] = None) -> str:
    """Translate an API ``log_key`` into an existing ``<log_key>.log`` path under ``backend_dir``.

    When a BID is given, the shared legacy logs are refused, and
    ``orchestration_*`` / ``analytics_updates_*`` keys must end with
    ``_<bid>``.

    Raises:
        HTTPException: 403 for keys not allowed in the BID view, 400 for an
            unsafe key or a path resolving outside ``backend_dir``, 404 when
            the file does not exist.
    """
    root = os.path.realpath(backend_dir)
    bid_safe = _pca_bid_safe(bid or "")
    legacy_missing_detail = {
        "orchestration": "Legacy orchestration.log not found; use orchestration_<bid> log key from PCA status.",
        "analytics_updates": "Legacy analytics_updates.log not found; use analytics_updates_<bid> log key.",
    }

    if bid_safe:
        if log_key in legacy_missing_detail:
            raise HTTPException(
                status_code=403,
                detail=f"Shared legacy logs are not available inside BID {bid_safe} Manage view",
            )
        is_per_bid_key = log_key.startswith(("orchestration_", "analytics_updates_"))
        if is_per_bid_key and not log_key.endswith(f"_{bid_safe}"):
            raise HTTPException(status_code=403, detail=f"log_key does not belong to BID {bid_safe}")

    # Legacy shared logs carry their own, more helpful 404 messages.
    for legacy_key, missing_detail in legacy_missing_detail.items():
        if log_key == legacy_key:
            legacy_path = os.path.join(root, f"{legacy_key}.log")
            if os.path.isfile(legacy_path):
                return legacy_path
            raise HTTPException(status_code=404, detail=missing_detail)

    if not _pca_log_key_safe(log_key):
        raise HTTPException(status_code=400, detail="Invalid log_key")

    resolved = os.path.realpath(os.path.join(root, f"{log_key}.log"))
    # Reject anything that resolves outside the backend directory (e.g. via symlink).
    if resolved != root and not resolved.startswith(root + os.sep):
        raise HTTPException(status_code=400, detail="Invalid log path")
    if not os.path.isfile(resolved):
        raise HTTPException(status_code=404, detail="Log file not found")
    return resolved


def _pca_collect_log_cards(backend_dir: str, bid: Optional[str] = None) -> List[Dict[str, Any]]:
    """Describe the orchestration / analytics log files available in ``backend_dir``.

    With a BID only that business's two per-BID files are considered; without
    one, every per-BID file is listed, followed by the legacy shared files if
    they exist. Each card carries its log key, display name, description,
    filename, existence flag and size in whole KB.
    """
    bid_safe = _pca_bid_safe(bid or "")
    cards: List[Dict[str, Any]] = []
    seen = set()

    def add_card(log_key: str, filename: str, name: str, description: str) -> None:
        # Each log key is listed at most once.
        if log_key in seen:
            return
        full = os.path.join(backend_dir, filename)
        exists = os.path.isfile(full)
        size_b = os.path.getsize(full) if exists else 0
        size_label = f"{int(size_b / 1024)} KB" if size_b else "0 KB"
        cards.append({
            "log_key": log_key,
            "name": name,
            "description": description,
            "filename": filename,
            "exists": exists,
            "size": size_label,
        })
        seen.add(log_key)

    per_bid_families = (
        ("orchestration_", "Orchestration", "Ingest / queue / orchestration log for this business"),
        ("analytics_updates_", "Analytics updates", "Analytics success/update log for this business"),
    )
    for prefix, label, description in per_bid_families:
        pattern = f"{prefix}{bid_safe}.log" if bid_safe else f"{prefix}*.log"
        for path in sorted(glob.glob(os.path.join(backend_dir, pattern))):
            base = os.path.basename(path)
            stem = base[:-4] if base.endswith(".log") else base
            bid_part = stem[len(prefix):]
            add_card(stem, base, f"{label} ({bid_part})", description)

    if not bid_safe:
        legacy_files = (
            (
                "orchestration",
                "Orchestration (legacy shared)",
                "Legacy log before per-BID files; new runs use orchestration_<bid>.log",
            ),
            (
                "analytics_updates",
                "Analytics updates (legacy shared)",
                "Legacy log before per-BID files",
            ),
        )
        for legacy_key, name, description in legacy_files:
            legacy_name = f"{legacy_key}.log"
            if os.path.isfile(os.path.join(backend_dir, legacy_name)):
                add_card(legacy_key, legacy_name, name, description)

    return cards


def _pca_aggregate_log_errors(
    backend_dir: str,
    pattern: str,
    tail_lines: int = 500,
    bid: Optional[str] = None,
) -> List[str]:
    """Collect recent error / failure / warning lines from logs matching ``pattern``.

    Only the last ``tail_lines`` lines of each file are scanned. Matching is
    case-insensitive on ERROR, FAIL and WARN, and lines that fail the BID
    filter are dropped. Each hit is prefixed with ``[<filename>]``; at most
    the final 300 hits are returned.
    """
    markers = ("ERROR", "FAIL", "WARN")  # "WARN" also covers "WARNING"
    hits: List[str] = []
    for log_path in sorted(glob.glob(os.path.join(backend_dir, pattern))):
        tag = os.path.basename(log_path)
        for entry in _read_last_lines(log_path, tail_lines):
            upper = entry.upper()
            if not any(marker in upper for marker in markers):
                continue
            if _pca_log_line_matches_bid(entry, bid):
                hits.append(f"[{tag}] {entry}")
    return hits[-300:]


def _pca_log_pattern_for_bid(base: str, bid: Optional[str]) -> str:
    """Return the log filename pattern for ``base``.

    With a usable BID this is the exact ``<base>_<bid>.log``; otherwise it is
    the glob ``<base>*.log``.
    """
    bid_safe = _pca_bid_safe(bid or "")
    return f"{base}_{bid_safe}.log" if bid_safe else f"{base}*.log"


_RABBITMQ_CACHE: Dict[str, Any] = {"at": 0.0, "queues": {}}
_RABBITMQ_CACHE_TTL_SEC = 20


def _rabbitmq_queue_stats(queue_name: str, *, timeout_sec: float = 0.35) -> Dict[str, Any]:
    now = time.time()
    cached_queues = _RABBITMQ_CACHE.get("queues") or {}
    cache_age = now - float(_RABBITMQ_CACHE.get("at") or 0)
    if cached_queues and cache_age < _RABBITMQ_CACHE_TTL_SEC:
        cached = cached_queues.get(queue_name)
        if cached is not None:
            return {**cached, "name": queue_name, "cached": True}

    def _cache_and_return(payload: Dict[str, Any]) -> Dict[str, Any]:
        merged = dict(cached_queues)
        merged[queue_name] = payload
        _RABBITMQ_CACHE["at"] = now
        _RABBITMQ_CACHE["queues"] = merged
        return {**payload, "name": queue_name}

    try:
        out = subprocess.check_output(
            ["rabbitmqctl", "list_queues", "name", "messages", "consumers", "-q"],
            stderr=subprocess.STDOUT,
            text=True,
            timeout=timeout_sec,
        )
        parsed: Dict[str, Dict[str, Any]] = {}
        for line in out.splitlines():
            parts = line.strip().split("\t")
            if len(parts) >= 3:
                parsed[parts[0]] = {
                    "messages": int(parts[1]),
                    "consumers": int(parts[2]),
                    "error": None,
                }
        _RABBITMQ_CACHE["at"] = now
        _RABBITMQ_CACHE["queues"] = {
            name: {"name": name, **stats}
            for name, stats in parsed.items()
        }
        if queue_name in parsed:
            return {"name": queue_name, **parsed[queue_name]}
        return _cache_and_return(
            {"messages": 0, "consumers": 0, "error": "Queue not found"}
        )
    except Exception as exc:
        return _cache_and_return(
            {"messages": None, "consumers": None, "error": str(exc)}
        )


def _pca_job_process_snapshot(selected_bid: Optional[str]) -> Dict[str, Dict[str, Any]]:
    """Probe the orchestrator loop, pipeline run and STT worker in parallel.

    With a BID the loop and pipeline probes target that business; without one
    any orchestrator loop counts, and the pipeline is reported as not running.
    A probe that raises is reported as not running.

    Returns:
        A dict keyed ``"loop"``, ``"pipeline"`` and ``"stt"``, each holding a
        ``{"running", "pid"}`` mapping.
    """
    if selected_bid:
        loop_pattern = f"orchestrator_loop_{selected_bid}.sh"
        pipeline_pattern = f"orchestrate_pipeline.py --bid {selected_bid}"
        probes: Dict[str, Any] = {
            "loop": lambda: _process_running(loop_pattern),
            "pipeline": lambda: _process_running(pipeline_pattern),
        }
    else:
        probes = {
            "loop": lambda: _process_running("orchestrator_loop_"),
            "pipeline": lambda: {"running": False, "pid": None},
        }
    probes["stt"] = _stt_worker_process

    snapshot: Dict[str, Dict[str, Any]] = {}
    with ThreadPoolExecutor(max_workers=3) as pool:
        pending = {pool.submit(probe): label for label, probe in probes.items()}
        for finished in as_completed(pending):
            try:
                outcome = finished.result()
            except Exception:
                outcome = {"running": False, "pid": None}
            snapshot[pending[finished]] = outcome
    return snapshot


def _pca_business_user_map(cursor, bids: List[str]) -> Dict[str, Dict[str, Any]]:
    clean_bids = [str(bid).strip() for bid in bids if str(bid or "").strip()]
    if not clean_bids:
        return {}
    placeholders = ", ".join(["%s"] * len(clean_bids))
    cursor.execute(
        f"""
        SELECT
            uba.bid,
            GROUP_CONCAT(
                bu.id
                ORDER BY CASE WHEN uba.role = 'admin' THEN 0 ELSE 1 END, bu.id
                SEPARATOR ', '
            ) AS business_user_ids,
            GROUP_CONCAT(
                bu.username
                ORDER BY CASE WHEN uba.role = 'admin' THEN 0 ELSE 1 END, bu.id
                SEPARATOR ', '
            ) AS business_login_user_ids,
            GROUP_CONCAT(
                COALESCE(NULLIF(bu.full_name, ''), bu.username)
                ORDER BY CASE WHEN uba.role = 'admin' THEN 0 ELSE 1 END, bu.id
                SEPARATOR ', '
            ) AS business_user_names,
            COUNT(*) AS business_user_count
        FROM user_business_access uba
        JOIN business_users bu ON bu.id = uba.user_id
        WHERE uba.bid IN ({placeholders})
          AND bu.is_active = TRUE
        GROUP BY uba.bid
        """,
        clean_bids,
    )
    rows = cursor.fetchall() or []
    result: Dict[str, Dict[str, Any]] = {}
    for row in rows:
        user_ids = str(row.get("business_user_ids") or "").strip()
        login_user_ids = str(row.get("business_login_user_ids") or "").strip()
        names = str(row.get("business_user_names") or "").strip()
        first_user_id = user_ids.split(",", 1)[0].strip() if user_ids else None
        first_login_user_id = login_user_ids.split(",", 1)[0].strip() if login_user_ids else None
        first_name = names.split(",", 1)[0].strip() if names else None
        result[str(row.get("bid"))] = {
            "business_user_id": first_user_id,
            "business_user_ids": user_ids,
            "business_login_user_id": first_login_user_id,
            "business_login_user_ids": login_user_ids,
            "business_user_name": first_name,
            "business_user_names": names,
            "business_user_count": int(row.get("business_user_count") or 0),
        }
    return result


def _pgrep_process_lines(pattern: str) -> List[str]:
    """Run ``pgrep -af <pattern>`` and return its non-blank output lines, stripped.

    Any failure (pgrep missing, timeout after 2 seconds, ...) yields ``[]``.
    """
    command = ["pgrep", "-af", pattern]
    try:
        completed = subprocess.run(command, capture_output=True, text=True, timeout=2)
        stripped = (raw.strip() for raw in (completed.stdout or "").splitlines())
        return [entry for entry in stripped if entry]
    except Exception:
        return []


def _is_noise_process_line(line: str) -> bool:
    lowered = line.lower()
    noise_markers = (
        "grep",
        "pgrep",
        "python3 -c",
        "extglob",
        "cursor",
        "dump_bash_state",
        "snap=$(command cat",
    )
    return any(marker in lowered for marker in noise_markers)


def _process_running(pattern: str) -> Dict[str, Any]:
    """Find the first genuine process whose command line matches ``pattern``.

    Noise lines (pgrep itself, shell wrappers, ...) are skipped. For ``.sh``
    patterns, lines containing more than 20 spaces are skipped as well.

    Returns:
        ``{"running": True, "pid": <int>}`` for the first remaining line that
        starts with a numeric PID, else ``{"running": False, "pid": None}``.
    """
    wants_shell_script = pattern.endswith(".sh")
    for candidate in _pgrep_process_lines(pattern):
        if _is_noise_process_line(candidate):
            continue
        if wants_shell_script and candidate.count(" ") > 20:
            continue
        fields = candidate.split(None, 1)
        if not fields:
            continue
        pid_text = fields[0]
        if pid_text.isdigit():
            return {"running": True, "pid": int(pid_text)}
    return {"running": False, "pid": None}


def _bid_loop_running(bid: str) -> bool:
    """Report whether this BID's orchestrator loop or an orchestrate run is active.

    A blank BID is never running. The pipeline process is only probed when
    the loop script is not found.
    """
    bid_text = str(bid or "").strip()
    if not bid_text:
        return False
    patterns = (
        f"orchestrator_loop_{bid_text}.sh",
        f"orchestrate_pipeline.py --bid {bid_text}",
    )
    # The generator keeps the probes lazy, so the second runs only if needed.
    return any(bool(_process_running(pattern).get("running")) for pattern in patterns)


def _stt_worker_process() -> Dict[str, Any]:
    """Locate the STT worker process.

    Tries the full script path first; if that is not running, returns the
    result of the looser ``run.py --worker`` probe.
    """
    by_path = _process_running("call-proccessing/stt_pipeline/run.py")
    return by_path if by_path.get("running") else _process_running("run.py --worker")


def _collect_bid_stats(cursor, bid: str) -> Dict[str, Any]:
    """Gather call-volume statistics from ``<bid>_raw_calls``.

    Returns the total call count, today's count, counts per status, whole
    minutes of talk time (from call start/end timestamps), daily counts for
    the last 30 days and hourly counts for today. If the table does not
    exist, every figure is zero or empty.
    """
    table = f"{bid}_raw_calls"
    summary: Dict[str, Any] = {
        "bid": bid,
        "total_calls": 0,
        "calls_today": 0,
        "by_status": {},
        "total_minutes": 0,
        "calls_per_day": [],
        "calls_per_hour": [],
    }
    # The existence check is parameterized; the f-string queries below only
    # run for a table name that really exists.
    if not _table_exists(cursor, table):
        return summary

    def _single_int(column: str) -> int:
        # Read one integer column from the pending single-row result.
        first = cursor.fetchone() or {}
        return int(first.get(column) or 0)

    cursor.execute(f"SELECT COUNT(*) AS c FROM `{table}`")
    summary["total_calls"] = _single_int("c")

    cursor.execute(
        f"""
        SELECT COALESCE(CAST(status AS CHAR), '0') AS status_key, COUNT(*) AS c
        FROM `{table}`
        GROUP BY status_key
        """
    )
    status_counts: Dict[str, int] = {}
    for rec in cursor.fetchall() or []:
        status_counts[str(rec["status_key"])] = int(rec["c"])
    summary["by_status"] = status_counts

    cursor.execute(
        f"SELECT COUNT(*) AS c FROM `{table}` WHERE DATE(call_starttime) = CURDATE()"
    )
    summary["calls_today"] = _single_int("c")

    cursor.execute(
        f"""
        SELECT DATE(call_starttime) AS d, COUNT(*) AS c
        FROM `{table}`
        WHERE call_starttime >= DATE_SUB(CURDATE(), INTERVAL 30 DAY)
        GROUP BY DATE(call_starttime)
        ORDER BY d ASC
        """
    )
    daily: List[Dict[str, Any]] = []
    for rec in cursor.fetchall() or []:
        if rec.get("d"):
            daily.append({"date": str(rec["d"]), "count": int(rec["c"])})
    summary["calls_per_day"] = daily

    cursor.execute(
        f"""
        SELECT HOUR(call_starttime) AS h, COUNT(*) AS c
        FROM `{table}`
        WHERE DATE(call_starttime) = CURDATE()
        GROUP BY HOUR(call_starttime)
        ORDER BY h ASC
        """
    )
    hourly: List[Dict[str, Any]] = []
    for rec in cursor.fetchall() or []:
        # Hour 0 is a valid bucket, so test against None rather than truthiness.
        if rec.get("h") is not None:
            hourly.append({"hour": f"{int(rec['h']):02d}:00", "count": int(rec["c"])})
    summary["calls_per_hour"] = hourly

    cursor.execute(
        f"""
        SELECT COALESCE(SUM(TIMESTAMPDIFF(SECOND, call_starttime, call_endtime)), 0) AS sec
        FROM `{table}`
        WHERE call_starttime IS NOT NULL AND call_endtime IS NOT NULL
        """
    )
    total_seconds = _single_int("sec")
    summary["total_minutes"] = int(total_seconds / 60)

    return summary


def _get_transcribed_minutes_for_bid(cursor, bid: str) -> int:
    """Return whole minutes of audio that have a non-empty transcript for ``bid``.

    Each raw call joined to a transcript contributes the transcript's stored
    duration, falling back to the call's start/end difference. Returns 0 when
    either ``<bid>_raw_calls`` or ``<bid>_sarvamresponse`` is missing.
    """
    raw_table = f"{bid}_raw_calls"
    sarvam_table = f"{bid}_sarvamresponse"
    # Short-circuits: the transcript table is only checked if raw calls exist.
    if not (_table_exists(cursor, raw_table) and _table_exists(cursor, sarvam_table)):
        return 0

    cursor.execute(
        f"""
        SELECT COALESCE(
            SUM(
                CASE
                    WHEN s.transcript IS NOT NULL AND s.transcript != ''
                    THEN COALESCE(s.duration, TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime))
                    ELSE 0
                END
            ),
            0
        ) AS transcribed_seconds
        FROM `{raw_table}` r
        LEFT JOIN `{sarvam_table}` s ON r.callid = s.callid
        """
    )
    result_row = cursor.fetchone() or {}
    transcribed_seconds = float(result_row.get("transcribed_seconds") or 0)
    return int(transcribed_seconds / 60)


def _ensure_usage_allocation_table() -> None:
    """Create ``pca_business_allocations`` if needed and add its minute-limit column.

    The ``CREATE TABLE IF NOT EXISTS`` already declares ``monthly_minute_limit``;
    the follow-up check adds the column to tables created before it existed.
    """
    with _db_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            CREATE TABLE IF NOT EXISTS pca_business_allocations (
                bid VARCHAR(64) PRIMARY KEY,
                monthly_call_limit INT NOT NULL DEFAULT 0,
                monthly_minute_limit INT NOT NULL DEFAULT 0,
                notes TEXT NULL,
                updated_by VARCHAR(255) NULL,
                updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
            """
        )
        cursor.execute(
            """
            SELECT 1
            FROM information_schema.columns
            WHERE table_schema = DATABASE()
              AND table_name = 'pca_business_allocations'
              AND column_name = 'monthly_minute_limit'
            LIMIT 1
            """
        )
        column_row = cursor.fetchone()
        if column_row is None:
            # Table predates the minute-based limit: add the column in place.
            cursor.execute(
                """
                ALTER TABLE pca_business_allocations
                ADD COLUMN monthly_minute_limit INT NOT NULL DEFAULT 0
                """
            )


@app.get("/pca/businesses-overview")
def pca_businesses_overview(user: Dict[str, Any] = Depends(_auth_user)):
    """Return the master-panel overview of every business and the global job state.

    Covers registered businesses plus any BID discovered through
    ``db_handler.discover_stt_raw_call_bids`` that is not registered. Each row
    combines user info, call statistics, pipeline / loop flags and the webhook
    ingest flag. Master admins only.
    """
    _require_master(user)
    businesses, _ = auth_handler.get_all_businesses()
    discovered = set(db_handler.discover_stt_raw_call_bids())
    stt_proc = _stt_worker_process()

    user_fields = (
        "business_user_id",
        "business_user_ids",
        "business_login_user_id",
        "business_login_user_ids",
        "business_user_name",
        "business_user_names",
    )
    by_bid: Dict[str, Dict[str, Any]] = {}
    with _db_conn() as conn:
        cursor = conn.cursor()
        known_bids = [str(b.get("bid") or "").strip() for b in businesses or [] if str(b.get("bid") or "").strip()]
        user_by_bid = _pca_business_user_map(cursor, sorted(set(known_bids) | discovered))

        def _overview_row(bid: str, name: str, is_active: bool, has_pipeline: bool) -> Dict[str, Any]:
            # Assemble one business row; key order matches the response layout.
            stats = _collect_bid_stats(cursor, bid)
            user_info = user_by_bid.get(bid, {})
            row: Dict[str, Any] = {"bid": bid, "name": name}
            for field in user_fields:
                row[field] = user_info.get(field)
            row["business_user_count"] = user_info.get("business_user_count", 0)
            row["is_active"] = is_active
            row["has_pipeline"] = has_pipeline
            row["loop_running"] = _bid_loop_running(bid)
            row.update(stats)
            return row

        # Registered businesses first ...
        for business in businesses or []:
            bid = str(business.get("bid") or "").strip()
            if not bid:
                continue
            by_bid[bid] = _overview_row(
                bid,
                business.get("name") or f"Business {bid}",
                bool(business.get("is_active", True)),
                bid in discovered,
            )

        # ... then discovered BIDs that are not registered.
        for bid in discovered:
            if bid not in by_bid:
                by_bid[bid] = _overview_row(bid, f"Business {bid}", True, True)

    any_loop_running = any(row.get("loop_running") for row in by_bid.values())
    generic_loop = _process_running("orchestrator_loop.sh")
    orch_running = any_loop_running or bool(generic_loop.get("running"))

    db_handler.ensure_business_pipeline_config_table()
    webhook_by_bid: Dict[str, bool] = {}
    with _db_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT bid, webhook_ingest_enabled FROM business_pipeline_config"
        )
        for config_row in cursor.fetchall() or []:
            flag = int(config_row.get("webhook_ingest_enabled") or 0)
            webhook_by_bid[str(config_row.get("bid") or "")] = bool(flag)
    for bid, row in by_bid.items():
        row["webhook_ingest_enabled"] = webhook_by_bid.get(bid, False)

    webhook_enabled_count = sum(1 for enabled in webhook_by_bid.values() if enabled)
    # The environment variable takes precedence over the static config value.
    public_base = (
        os.getenv("PUBLIC_BASE_URL", "").strip()
        or CONFIG.get("PUBLIC_BASE_URL", "")
        or ""
    ).rstrip("/")
    webhook_path = "/api/webhook/call-ingest"
    full_url = f"{public_base}{webhook_path}" if public_base else webhook_path

    return {
        "businesses": sorted(by_bid.values(), key=lambda x: str(x.get("bid"))),
        "webhook_ingest_url": webhook_path,
        "webhook_ingest_full_url": full_url,
        "webhook_ingest_header": "X-Ingest-Secret",
        "webhook_ingest_secret_configured": bool(shared_ingest_secret()),
        "webhook_ingest_enabled_count": webhook_enabled_count,
        "global_jobs": {
            "loop_running": orch_running,
            "orch_running": orch_running,
            "stt_running": bool(stt_proc.get("running")),
        },
    }


@app.get("/pca/webhook-ingest-logs")
def pca_webhook_ingest_logs(
    user: Dict[str, Any] = Depends(_auth_user),
    bid: Optional[str] = Query(default=None),
    action: Optional[str] = Query(default=None),
    skip_reason: Optional[str] = Query(default=None),
    signature_valid: Optional[bool] = Query(default=None),
    callid: Optional[str] = Query(default=None),
    date_from: Optional[str] = Query(default=None, description="YYYY-MM-DD"),
    date_to: Optional[str] = Query(default=None, description="YYYY-MM-DD"),
    hours: Optional[int] = Query(default=168, ge=1, le=8760),
    limit: int = Query(default=50, ge=1, le=500),
    offset: int = Query(default=0, ge=0),
):
    """Master-only audit log for Mcube webhook POSTs (pcaa_ingest_log).

    An explicit ``date_from`` / ``date_to`` range replaces the rolling
    ``hours`` window: when either date is given, ``hours`` is passed on as
    ``None``.
    """
    _require_master(user)
    has_date_range = bool(date_from or date_to)
    window_hours = None if has_date_range else hours
    return db_handler.list_webhook_ingest_logs(
        bid=bid,
        action=action,
        skip_reason=skip_reason,
        signature_valid=signature_valid,
        callid=callid,
        date_from=date_from,
        date_to=date_to,
        hours=window_hours,
        limit=limit,
        offset=offset,
    )


@app.get("/pca/usage")
def pca_usage(user: Dict[str, Any] = Depends(_auth_user)):
    """Return transcription usage against the monthly limit for every business.

    The effective limit is ``monthly_minute_limit``, falling back to the
    legacy ``monthly_call_limit`` column; 0 means no limit, in which case
    ``usage_percent`` is ``None``. The percentage is capped at 100. Master
    admins only.
    """
    _require_master(user)
    _ensure_usage_allocation_table()

    businesses, _ = auth_handler.get_all_businesses()
    rows: List[Dict[str, Any]] = []
    with _db_conn() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM pca_business_allocations")
        allocations = {str(rec["bid"]): rec for rec in (cursor.fetchall() or [])}

        for business in businesses or []:
            bid = str(business.get("bid") or "").strip()
            if not bid:
                continue
            stats = _collect_bid_stats(cursor, bid)
            used_minutes = _get_transcribed_minutes_for_bid(cursor, bid)
            allocation = allocations.get(bid, {})
            monthly_limit = int(
                allocation.get("monthly_minute_limit")
                or allocation.get("monthly_call_limit")
                or 0
            )
            if monthly_limit > 0:
                usage_pct = min(round((used_minutes / monthly_limit) * 100, 2), 100.0)
            else:
                usage_pct = None
            rows.append({
                "bid": bid,
                "name": business.get("name") or f"Business {bid}",
                "calls_today": int(stats.get("calls_today") or 0),
                "total_calls": int(stats.get("total_calls") or 0),
                "used_minutes": used_minutes,
                "monthly_minute_limit": monthly_limit,
                "usage_percent": usage_pct,
                "limit_exhausted": bool(monthly_limit > 0 and used_minutes > monthly_limit),
                "notes": allocation.get("notes"),
                "updated_at": allocation.get("updated_at"),
            })

    rows.sort(key=lambda entry: str(entry.get("bid")))
    return {"businesses": rows}


@app.get("/pca/usage/{bid}")
def pca_usage_for_bid(bid: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Return transcription usage against the monthly limit for one business.

    Available to any user authorised for ``bid``. The effective limit is
    ``monthly_minute_limit``, falling back to the legacy ``monthly_call_limit``
    column; 0 means no limit, in which case ``usage_percent`` is ``None``.
    """
    _auth_user_for_bid(bid, user)
    _ensure_usage_allocation_table()

    bid_text = str(bid)
    with _db_conn() as conn:
        cursor = conn.cursor()
        stats = _collect_bid_stats(cursor, bid_text)
        cursor.execute("SELECT * FROM pca_business_allocations WHERE bid = %s", (bid_text,))
        allocation = cursor.fetchone() or {}
        used_minutes = _get_transcribed_minutes_for_bid(cursor, bid_text)

    monthly_limit = int(
        allocation.get("monthly_minute_limit")
        or allocation.get("monthly_call_limit")
        or 0
    )
    if monthly_limit > 0:
        # Cap at 100 so an overrun still reads as "full".
        usage_pct = min(round((used_minutes / monthly_limit) * 100, 2), 100.0)
    else:
        usage_pct = None

    return {
        "bid": bid_text,
        "calls_today": int(stats.get("calls_today") or 0),
        "total_calls": int(stats.get("total_calls") or 0),
        "used_minutes": used_minutes,
        "monthly_minute_limit": monthly_limit,
        "usage_percent": usage_pct,
        "limit_exhausted": bool(monthly_limit > 0 and used_minutes > monthly_limit),
        "notes": allocation.get("notes"),
        "updated_at": allocation.get("updated_at"),
    }


@app.put("/pca/usage/{bid}/allocation")
def pca_update_allocation(
    bid: str,
    payload: Dict[str, Any] = Body(default={}),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Create or replace the monthly usage allocation for one business.

    The limit is read from ``monthly_minute_limit``, falling back to the
    legacy ``monthly_call_limit`` key, and defaults to 0. The value is stored
    in both limit columns on insert as well as on update, so a newly created
    row does not depend on readers falling back to the legacy column. Master
    admins only.

    Raises:
        HTTPException: 403 for non-master users, 400 when the limit is not an
            integer.
    """
    _require_master(user)
    _ensure_usage_allocation_table()
    raw_limit = (
        payload.get("monthly_minute_limit")
        or payload.get("monthly_call_limit")
        or 0
    )
    try:
        monthly_limit = int(raw_limit)
    except (TypeError, ValueError):
        # Bad client input is a 400, not an unhandled 500.
        raise HTTPException(status_code=400, detail="monthly_minute_limit must be an integer")
    notes = (payload.get("notes") or None)
    updated_by = user.get("username")

    with _db_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            INSERT INTO pca_business_allocations
                (bid, monthly_call_limit, monthly_minute_limit, notes, updated_by)
            VALUES (%s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                monthly_call_limit = VALUES(monthly_call_limit),
                monthly_minute_limit = VALUES(monthly_minute_limit),
                notes = VALUES(notes),
                updated_by = VALUES(updated_by)
            """,
            (str(bid), monthly_limit, monthly_limit, notes, updated_by),
        )
    return {"updated": True, "bid": str(bid), "monthly_minute_limit": monthly_limit, "notes": notes}


@app.get("/pca/bid/{bid}/stats")
def pca_bid_stats(bid: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Return the output of `_collect_bid_stats` for one business.

    Access is gated by `_require_master`.
    """
    _require_master(user)
    with _db_conn() as db:
        bid_stats = _collect_bid_stats(db.cursor(), str(bid))
    return bid_stats


@app.get("/pca/status")
def pca_status(
    bid: Optional[str] = Query(default=None),
    lite: bool = Query(default=False),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return an overview of pipeline jobs, the STT queue, DB counts and logs.

    Args:
        bid: Optional business filter. It is passed to the process snapshot
            and log-card helpers; when set, DB statistics are skipped.
        lite: When true, DB statistics are skipped.
        user: Authenticated user; must pass `_require_master`.

    Returns:
        A dict with `jobs` (three fixed job descriptors with running state
        and pid), `queues` (stats for `stt_jobs`), `db_status` and `logs`.
        DB statistics are best-effort: on failure the zeroed default is
        returned and the error is logged.
    """
    _require_master(user)
    selected_bid = str((bid or "")).strip() or None
    skip_db_stats = bool(lite) or bool(selected_bid)
    proc_snapshot = _pca_job_process_snapshot(selected_bid)
    not_running = {"running": False, "pid": None}
    loop_proc = proc_snapshot.get("loop") or dict(not_running)
    pipeline_proc = proc_snapshot.get("pipeline") or dict(not_running)
    stt_proc = proc_snapshot.get("stt") or dict(not_running)
    queue = _rabbitmq_queue_stats("stt_jobs")
    db_status: Dict[str, Any] = {"total": 0, "by_status": {}, "bid": selected_bid}

    if not skip_db_stats:
        try:
            # NOTE: skip_db_stats is true whenever a bid is selected, so in
            # practice selected_bid is None here and discovery always runs.
            default_bid = selected_bid
            if not default_bid:
                discovered = db_handler.discover_stt_raw_call_bids()
                default_bid = discovered[0] if discovered else "1713"
            with _db_conn() as conn:
                cursor = conn.cursor()
                stats = _collect_bid_stats(cursor, default_bid)
                db_status = {
                    "total": stats.get("total_calls", 0),
                    "by_status": stats.get("by_status", {}),
                    "bid": default_bid,
                }
        except Exception:
            # Stay best-effort for the dashboard, but leave a trace instead of
            # silently hiding DB problems.
            logger.warning("pca_status: failed to collect DB stats", exc_info=True)

    backend_dir = os.path.dirname(os.path.abspath(__file__))
    logs = _pca_collect_log_cards(backend_dir, selected_bid)
    loop_running = bool(loop_proc.get("running"))

    return {
        "jobs": [
            {
                "job_key": "orchestrator-loop",
                "name": "Orchestrator Loop",
                "description": "Runs orchestrate pipeline on schedule.",
                "running": loop_running,
                "scheduled": loop_running,
                "required": True,
                "pid": loop_proc.get("pid"),
                "command": "bash dashboard-backend/orchestrator_loop_<bid>.sh",
            },
            {
                "job_key": "stt-worker",
                "name": "STT Worker",
                "description": "Consumes stt_jobs queue and transcribes calls.",
                "running": bool(stt_proc.get("running")),
                "scheduled": False,
                "required": True,
                "pid": stt_proc.get("pid"),
                "command": "call-proccessing/stt_pipeline/venv/bin/python run.py --worker",
            },
            {
                "job_key": "orchestrate-pipeline",
                "name": "One-time Orchestrate Pipeline",
                "description": "Manual on-demand orchestration run.",
                "running": bool(pipeline_proc.get("running")),
                "scheduled": False,
                "required": False,
                "pid": pipeline_proc.get("pid"),
                "command": "python3 dashboard-backend/orchestrate_pipeline.py --bid <bid>",
            },
        ],
        "queues": [
            {
                "name": "stt_jobs",
                "description": "Speech-to-text transcription jobs",
                "messages": queue.get("messages"),
                "consumers": queue.get("consumers"),
                "error": queue.get("error"),
            }
        ],
        "db_status": db_status,
        "logs": logs,
    }


@app.get("/pca/logs/{log_key}")
def pca_logs(
    log_key: str,
    lines: int = Query(default=200),
    filter: Optional[str] = Query(default=None),
    bid: Optional[str] = Query(default=None),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return the tail of a pipeline log, optionally narrowed by bid and severity.

    Args:
        log_key: Identifier resolved to a file by `_resolve_pca_log_file`.
        lines: Number of trailing lines requested from `_read_last_lines`.
        filter: `"errors"` keeps lines containing ERROR/FAIL, `"warnings"`
            keeps lines containing WARN/WARNING (case-insensitive); any other
            value applies no severity filter.
        bid: Optional business filter applied per line via
            `_pca_log_line_matches_bid`.
        user: Authenticated user; must pass `_require_master`.

    Returns:
        `{"lines": [...]}` with the surviving log lines.
    """
    _require_master(user)
    backend_dir = os.path.dirname(os.path.abspath(__file__))
    log_path = _resolve_pca_log_file(backend_dir, log_key, bid=bid)
    tail = _read_last_lines(log_path, limit=lines)
    selected = [entry for entry in tail if _pca_log_line_matches_bid(entry, bid)]

    # Map the requested severity to the substrings that qualify a line.
    if filter == "errors":
        markers = ("ERROR", "FAIL")
    elif filter == "warnings":
        markers = ("WARN", "WARNING")
    else:
        markers = ()

    if markers:
        selected = [
            entry
            for entry in selected
            if any(marker in entry.upper() for marker in markers)
        ]
    return {"lines": selected}


@app.get("/pca/failures")
def pca_failures(bid: Optional[str] = Query(default=None), user: Dict[str, Any] = Depends(_auth_user)):
    """List recent failed calls per failure category, plus aggregated log errors.

    For the selected bid (or every bid returned by
    `db_handler.discover_stt_raw_call_bids()` when none is given) the
    `<bid>_raw_calls` table is queried for rows with status -2 (reported as
    `stt_failed`), -1 (`invalid_url`) and -3 (`analytics_no_transcript`), at
    most 100 rows per status per table. Each category is then sorted newest
    first and truncated to 300 rows.

    Returns:
        A dict with the three row lists, a `summary` of their lengths and
        `log_errors` from the orchestration / analytics_updates logs.
    """
    _require_master(user)
    selected_bid = str((bid or "")).strip() or None

    # Status code -> collected rows; insertion order is the query order.
    rows_by_status: Dict[int, List[Dict[str, Any]]] = {-2: [], -1: [], -3: []}

    # Whitespace is kept exactly as the query text has always been sent.
    failure_sql = """
                SELECT callid, agentname, groupname, call_starttime, fileurl
                FROM `{table}` WHERE status = {status}
                ORDER BY call_starttime DESC LIMIT 100
                """

    with _db_conn() as conn:
        cursor = conn.cursor()
        if selected_bid:
            bids = [selected_bid]
        else:
            bids = db_handler.discover_stt_raw_call_bids()
        for bid_item in bids:
            table = f"{bid_item}_raw_calls"
            if not _table_exists(cursor, table):
                continue
            for status_code, bucket in rows_by_status.items():
                cursor.execute(failure_sql.format(table=table, status=status_code))
                for row in (cursor.fetchall() or []):
                    row["bid"] = bid_item
                    bucket.append(row)

    backend_dir = os.path.dirname(os.path.abspath(__file__))
    log_errors = {}
    for log_name in ("orchestration", "analytics_updates"):
        log_errors[log_name] = _pca_aggregate_log_errors(
            backend_dir,
            _pca_log_pattern_for_bid(log_name, selected_bid),
            bid=selected_bid,
        )

    def _newest_first(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        # Sort on the stringified start time, newest first, capped at 300.
        ordered = sorted(rows, key=lambda r: str(r.get("call_starttime") or ""), reverse=True)
        return ordered[:300]

    stt_failed = _newest_first(rows_by_status[-2])
    invalid_url = _newest_first(rows_by_status[-1])
    analytics_no_transcript = _newest_first(rows_by_status[-3])

    return {
        "stt_failed": stt_failed,
        "invalid_url": invalid_url,
        "analytics_no_transcript": analytics_no_transcript,
        "summary": {
            "stt_failed_count": len(stt_failed),
            "invalid_url_count": len(invalid_url),
            "no_transcript_count": len(analytics_no_transcript),
        },
        "log_errors": {
            "orchestration": log_errors["orchestration"],
            "analytics_updates": log_errors["analytics_updates"],
        },
    }


@app.get("/pca/queue/{queue_name}/details")
def pca_queue_details(
    queue_name: str,
    bid: Optional[str] = Query(default=None),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return broker stats for a queue plus a DB breakdown of call statuses.

    Args:
        queue_name: Queue name passed to `_rabbitmq_queue_stats`.
        bid: Optional business whose `<bid>_raw_calls` table is inspected.
            When omitted, the first discovered bid is used, falling back to
            "1713" if discovery returns nothing.
        user: Authenticated user; must pass `_require_master`.

    Returns:
        A dict with `queue`, `db_breakdown` (total and per-status counts) and
        `queued_calls` (up to 200 rows with status = 1, newest first). The DB
        part is best-effort: on failure the defaults are returned and the
        error is logged.
    """
    _require_master(user)
    selected_bid = str((bid or "")).strip() or None
    queue = _rabbitmq_queue_stats(queue_name)
    db_breakdown = {"total": 0, "by_status": {}, "bid": selected_bid}
    queued_calls: List[Dict[str, Any]] = []
    try:
        if selected_bid:
            # An explicit bid makes discovery unnecessary.
            target_bid = selected_bid
        else:
            discovered = db_handler.discover_stt_raw_call_bids()
            target_bid = discovered[0] if discovered else "1713"
        table = f"{target_bid}_raw_calls"
        with _db_conn() as conn:
            cursor = conn.cursor()
            if _table_exists(cursor, table):
                cursor.execute(
                    f"SELECT COALESCE(CAST(status AS CHAR), '0') AS status_key, COUNT(*) AS c FROM `{table}` GROUP BY status_key"
                )
                by_status = {str(r["status_key"]): int(r["c"]) for r in (cursor.fetchall() or [])}
                cursor.execute(f"SELECT COUNT(*) AS c FROM `{table}`")
                total = int((cursor.fetchone() or {}).get("c") or 0)
                db_breakdown = {"total": total, "by_status": by_status, "bid": target_bid}
                cursor.execute(
                    f"""
                    SELECT callid, agentname, groupname, call_starttime
                    FROM `{table}` WHERE status = 1
                    ORDER BY call_starttime DESC LIMIT 200
                    """
                )
                queued_calls = cursor.fetchall() or []
    except Exception:
        # Keep the endpoint best-effort, but record why the breakdown is empty.
        logger.warning(
            "pca_queue_details: failed to load DB breakdown for queue %s",
            queue_name,
            exc_info=True,
        )
    return {"queue": queue, "db_breakdown": db_breakdown, "queued_calls": queued_calls}


@app.get("/pca/api-credits")
def pca_api_credits(user: Dict[str, Any] = Depends(_auth_user)):
    """Describe the configured external AI providers (Sarvam AI and AWS Bedrock).

    Each setting is read from the environment first and from `CONFIG` second.
    The Sarvam entry reports whether a subscription key is set and exposes it
    only through `_mask_key`; the AWS Bedrock entry is always reported as
    active.
    """
    _require_master(user)

    def _setting(name: str, fallback: str) -> Any:
        # Environment wins over CONFIG, which wins over the literal fallback.
        return os.getenv(name, CONFIG.get(name, fallback))

    sarvam_key = _setting("SARVAM_SUBSCRIPTION_KEY", "")
    aws_model = _setting("AWS_NOVA_MODEL", "amazon.nova-lite-v1:0")
    aws_region = _setting("AWS_REGION", "ap-south-1")

    if sarvam_key:
        sarvam_status, sarvam_label = "active", "Active"
    else:
        sarvam_status, sarvam_label = "not_configured", "Not configured"

    sarvam_entry = {
        "provider": "Sarvam AI",
        "status": sarvam_status,
        "status_label": sarvam_label,
        "key_set": bool(sarvam_key),
        "key_masked": _mask_key(sarvam_key),
        "model": "speech-to-text-translate/job/v1",
        "dashboard_url": "https://dashboard.sarvam.ai/",
    }
    bedrock_entry = {
        "provider": "AWS Bedrock",
        "status": "active",
        "status_label": "Active",
        "billing_type": "pay_per_use",
        "model": aws_model,
        "region": aws_region,
        "dashboard_url": "https://console.aws.amazon.com/cost-management/home",
    }
    return {"providers": [sarvam_entry, bedrock_entry]}


@app.get("/pca/stt/config")
def pca_stt_config(user: Dict[str, Any] = Depends(_auth_user)):
    """Return the speech-to-text provider configuration summary.

    The subscription key is read from the environment, then `CONFIG`, and is
    only exposed through `_mask_key`. The `model` field carries the value of
    `TRANSCRIPTION_PROVIDER` (default "sarvam").
    """
    _require_master(user)
    subscription_key = os.getenv(
        "SARVAM_SUBSCRIPTION_KEY",
        CONFIG.get("SARVAM_SUBSCRIPTION_KEY", ""),
    )
    provider = os.getenv(
        "TRANSCRIPTION_PROVIDER",
        CONFIG.get("TRANSCRIPTION_PROVIDER", "sarvam"),
    )

    if subscription_key:
        state, state_label = "active", "Active"
    else:
        state, state_label = "not_configured", "Not configured"

    return {
        "status": state,
        "status_label": state_label,
        "key_set": bool(subscription_key),
        "key_masked": _mask_key(subscription_key),
        "model": provider,
        "dashboard_url": "https://dashboard.sarvam.ai/",
    }


@app.post("/pca/stt/update-key")
def pca_stt_update_key(payload: Dict[str, Any], user: Dict[str, Any] = Depends(_auth_user)):
    """Persist a new Sarvam subscription key to `.env` and the running process.

    The first `SARVAM_SUBSCRIPTION_KEY=` line in the `.env` file next to this
    module is replaced; if none exists a new line is appended. `CONFIG` and
    `os.environ` are updated as well.

    Args:
        payload: JSON body; `key` is required.
        user: Authenticated user; must pass `_require_master`.

    Returns:
        `{"updated": True, "key_masked": ...}` on success.

    Raises:
        HTTPException: 400 when the key is missing or contains line breaks /
            NUL characters; 500 when the `.env` file cannot be read or written.
    """
    _require_master(user)
    key = str((payload or {}).get("key") or "").strip()
    if not key:
        raise HTTPException(status_code=400, detail="key is required")
    # An embedded line break would let the caller inject extra variables into
    # the .env file, so reject it outright.
    if any(ch in key for ch in "\r\n\x00"):
        raise HTTPException(status_code=400, detail="key must not contain line breaks")

    env_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env")
    new_entry = f"SARVAM_SUBSCRIPTION_KEY={key}\n"
    try:
        lines = []
        if os.path.exists(env_path):
            with open(env_path, "r", encoding="utf-8", errors="ignore") as fh:
                lines = fh.readlines()

        for idx, line in enumerate(lines):
            if line.strip().startswith("SARVAM_SUBSCRIPTION_KEY="):
                lines[idx] = new_entry
                break
        else:
            # Without this, a file whose last line lacks a trailing newline
            # would get the new entry glued onto that line.
            if lines and not lines[-1].endswith("\n"):
                lines[-1] += "\n"
            lines.append(new_entry)

        with open(env_path, "w", encoding="utf-8") as fh:
            fh.writelines(lines)
        CONFIG["SARVAM_SUBSCRIPTION_KEY"] = key
        os.environ["SARVAM_SUBSCRIPTION_KEY"] = key
        return {"updated": True, "key_masked": _mask_key(key)}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Failed to update key: {exc}") from exc


@app.post("/pca/jobs/{job_key}/start")
def pca_start_job(job_key: str, payload: Dict[str, Any] = Body(default={}), user: Dict[str, Any] = Depends(_auth_user)):
    """Start one of the known background jobs as a detached process.

    Supported `job_key` values:
      * `stt-worker` - launches `run.py --worker` with the STT venv python,
        unless `_stt_worker_process()` reports one already running.
      * `orchestrate-pipeline` - one-off `orchestrate_pipeline.py` run for
        `payload["bid"]` with `payload["limit"]` (default 50).
      * `orchestrator-loop` - runs `orchestrator_loop_<bid>.sh`, generating
        the script first if it does not exist.

    Args:
        job_key: Which job to start.
        payload: JSON body; `bid` is required for the two orchestration jobs.
        user: Authenticated user; must pass `_require_master`.

    Returns:
        `{"started": True, "pid": ...}`, or `{"started": False, ...}` when the
        STT worker is already running.

    Raises:
        HTTPException: 400 for a missing/invalid `bid` or non-integer `limit`,
            404 for an unknown job key, 500 when the STT venv python is missing.
    """
    _require_master(user)
    base_dir = os.path.dirname(os.path.abspath(__file__))
    project_root = os.path.abspath(os.path.join(base_dir, ".."))

    def _validated_bid() -> str:
        # The bid ends up in a shell command line, a script filename and the
        # generated script body, so only a conservative character set is
        # allowed to rule out command injection and path traversal.
        candidate = str((payload or {}).get("bid") or "").strip()
        if not candidate:
            raise HTTPException(status_code=400, detail="bid is required")
        if not re.fullmatch(r"[A-Za-z0-9_-]+", candidate):
            raise HTTPException(status_code=400, detail="bid contains invalid characters")
        return candidate

    if job_key == "stt-worker":
        existing = _stt_worker_process()
        if existing.get("running"):
            return {"started": False, "reason": "already_running", "pid": existing.get("pid")}

        stt_dir = os.path.join(project_root, "call-proccessing", "stt_pipeline")
        python_bin = os.path.join(stt_dir, "venv", "bin", "python")
        if not os.path.exists(python_bin):
            raise HTTPException(status_code=500, detail=f"STT venv python not found: {python_bin}")

        # The child inherits the log descriptor; our handle can be closed as
        # soon as the process has been spawned.
        with open("/tmp/pca_stt-worker.log", "ab", buffering=0) as log_fh:
            proc = subprocess.Popen(
                [python_bin, "run.py", "--worker"],
                cwd=stt_dir,
                stdin=subprocess.DEVNULL,
                stdout=log_fh,
                stderr=subprocess.STDOUT,
                start_new_session=True,
                close_fds=True,
            )
        return {"started": True, "pid": proc.pid}
    elif job_key == "orchestrate-pipeline":
        bid = _validated_bid()
        try:
            limit = int((payload or {}).get("limit") or 50)
        except (TypeError, ValueError) as exc:
            raise HTTPException(status_code=400, detail="limit must be an integer") from exc
        cmd = f"python3 \"{base_dir}/orchestrate_pipeline.py\" --bid {bid} --limit {limit}"
    elif job_key == "orchestrator-loop":
        bid = _validated_bid()
        script = os.path.join(base_dir, f"orchestrator_loop_{bid}.sh")
        if not os.path.exists(script):
            with open(script, "w", encoding="utf-8") as fh:
                fh.write("#!/usr/bin/env bash\n")
                fh.write("set -euo pipefail\n")
                fh.write(f"cd \"{base_dir}\"\n")
                fh.write("while true; do\n")
                fh.write(f"  python3 orchestrate_pipeline.py --bid {bid} --limit 50\n")
                fh.write("  sleep 300\n")
                fh.write("done\n")
            os.chmod(script, 0o755)
        cmd = f"bash \"{script}\""
    else:
        raise HTTPException(status_code=404, detail="Unknown job key")

    # job_key is one of the fixed literals above at this point, so it is safe
    # to embed in the log path. The shell echoes the background job's pid.
    proc = subprocess.Popen(
        ["bash", "-lc", f"cd \"{project_root}\" && nohup {cmd} >/tmp/pca_{job_key}.log 2>&1 & echo $!"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    out, _ = proc.communicate(timeout=5)
    pid = int((out or "0").strip().splitlines()[-1]) if (out or "").strip() else None
    return {"started": True, "pid": pid}


@app.post("/admin/businesses/{bid}/orchestration/start")
def admin_orchestration_start(bid: str, payload: Dict[str, Any] = Body(default={}), user: Dict[str, Any] = Depends(_auth_user)):
    """Start the per-business orchestrator loop script in the background.

    If `_process_running` already reports `orchestrator_loop_<bid>.sh`, nothing
    is started. Otherwise the script is generated next to this module when
    missing and launched via `nohup`, logging to `/tmp/orch_<bid>.log`.

    Args:
        bid: Business identifier taken from the URL path.
        payload: Accepted for interface compatibility; not used.
        user: Authenticated user; must pass `_require_master`.

    Returns:
        `{"started": True, "pid": ...}` or
        `{"started": False, "reason": "already_running", "pid": ...}`.

    Raises:
        HTTPException: 400 when `bid` contains characters outside
            `[A-Za-z0-9_-]`.
    """
    _require_master(user)
    # The bid is embedded in a shell command line, a filename and the generated
    # script, so restrict it to characters that cannot alter any of them.
    if not re.fullmatch(r"[A-Za-z0-9_-]+", str(bid)):
        raise HTTPException(status_code=400, detail="bid contains invalid characters")

    existing = _process_running(f"orchestrator_loop_{bid}.sh")
    if existing.get("running"):
        return {"started": False, "reason": "already_running", "pid": existing.get("pid")}
    base_dir = os.path.dirname(os.path.abspath(__file__))
    script = os.path.join(base_dir, f"orchestrator_loop_{bid}.sh")
    if not os.path.exists(script):
        with open(script, "w", encoding="utf-8") as fh:
            fh.write("#!/usr/bin/env bash\nset -euo pipefail\n")
            fh.write(f"cd \"{base_dir}\"\n")
            fh.write("while true; do\n")
            fh.write(f"  python3 orchestrate_pipeline.py --bid {bid} --limit 50\n")
            fh.write("  sleep 300\n")
            fh.write("done\n")
        os.chmod(script, 0o755)
    # The shell backgrounds the loop and echoes its pid on stdout.
    proc = subprocess.Popen(
        ["bash", "-lc", f"nohup bash \"{script}\" >/tmp/orch_{bid}.log 2>&1 & echo $!"],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
    )
    out, _ = proc.communicate(timeout=5)
    pid = int((out or "0").strip().splitlines()[-1]) if (out or "").strip() else None
    return {"started": True, "pid": pid}


@app.get("/api-push/{bid}/config")
def get_api_push_config(bid: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Return the API-push configuration for a business.

    When no configuration is stored, the handler's defaults are returned with
    the bid filled in and the credential flags / effective timestamp cleared.
    """
    _require_master(user)
    db_handler.ensure_api_push_tables()
    stored = db_handler.get_api_push_config(bid)
    if stored:
        return {"success": True, "data": stored}

    # Nothing saved yet: start from the defaults and mark it as unconfigured.
    fallback = dict(db_handler._api_push_config_defaults())
    fallback.update(
        {
            "bid": str(bid),
            "has_auth_token": False,
            "has_api_key_value": False,
            "api_push_effective_at": None,
        }
    )
    return {"success": True, "data": fallback}


@app.put("/api-push/{bid}/config")
def save_api_push_config(
    bid: str,
    payload: Dict[str, Any] = Body(default_factory=dict),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Store API-push settings for a business via `db_handler.save_api_push_config`.

    An empty or missing body is passed on as an empty dict. The handler's
    return value is echoed back under `data`.
    """
    _require_master(user)
    settings = payload or {}
    stored = db_handler.save_api_push_config(bid, settings)
    response = {"success": True, "data": stored, "message": "API push settings saved"}
    return response


@app.get("/api-push/{bid}/logs")
def get_api_push_logs(
    bid: str,
    limit: int = Query(default=25, ge=1, le=200),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Return up to `limit` (1-200, default 25) API-push log entries for a business."""
    _require_master(user)
    entries = db_handler.get_api_push_logs(bid, limit=limit)
    return {"success": True, "logs": entries}


@app.post("/api-push/{bid}/test")
def test_api_push(
    bid: str,
    payload: Dict[str, Any] = Body(default_factory=dict),
    user: Dict[str, Any] = Depends(_auth_user),
):
    """Trigger a forced, manual API push for one call.

    Args:
        bid: Business identifier.
        payload: JSON body; `callid` is required.
        user: Authenticated user; must pass `_require_master`.

    Returns:
        Whatever `api_push_service.push_call_update` returns.

    Raises:
        HTTPException: 400 when `callid` is missing or blank.
    """
    _require_master(user)
    call_ref = str(payload.get("callid") or "").strip()
    if call_ref == "":
        raise HTTPException(status_code=400, detail="callid is required")
    # force=True is passed with the "manual_test" trigger event.
    return api_push_service.push_call_update(
        bid,
        call_ref,
        trigger_event="manual_test",
        force=True,
    )


@app.get("/admin/businesses/{bid}/orchestration/status")
def admin_orchestration_status(bid: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Report whether the orchestrator loop for a business is running.

    Returns the running flag and pid from `_process_running`, plus whether the
    `orchestrator_loop_<bid>.sh` script exists next to this module.
    """
    _require_master(user)
    script_name = f"orchestrator_loop_{bid}.sh"
    proc_info = _process_running(script_name)
    module_dir = os.path.dirname(os.path.abspath(__file__))
    script_path = os.path.join(module_dir, script_name)
    return {
        "running": bool(proc_info.get("running")),
        "pid": proc_info.get("pid"),
        "script_exists": os.path.exists(script_path),
    }


@app.post("/admin/businesses/{bid}/orchestration/stop")
def admin_orchestration_stop(bid: str, user: Dict[str, Any] = Depends(_auth_user)):
    """Stop the orchestrator loop for a business by killing matching processes.

    Runs `pkill -f orchestrator_loop_<bid>.sh`. The call is best-effort: the
    response is `{"stopped": True}` whether or not a process was found.

    Raises:
        HTTPException: 400 when `bid` contains characters outside
            `[A-Za-z0-9_-]`.
    """
    _require_master(user)
    # The bid becomes part of a process-matching pattern; restrict it so it
    # cannot carry shell or regex metacharacters.
    if not re.fullmatch(r"[A-Za-z0-9_-]+", str(bid)):
        raise HTTPException(status_code=400, detail="bid contains invalid characters")
    try:
        # Argument list, no shell: nothing in the pattern is interpreted by bash.
        subprocess.call(["pkill", "-f", f"orchestrator_loop_{bid}.sh"])
    except Exception:
        logger.warning(
            "admin_orchestration_stop: pkill failed for bid %s", bid, exc_info=True
        )
    return {"stopped": True}
