import pymysql
from pymysql.cursors import DictCursor
import logging
import json
import base64
import hashlib
import re
from datetime import datetime, timezone, timedelta
from contextlib import contextmanager
from typing import Any, Dict, List, Optional
from cryptography.fernet import Fernet

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Fixed UTC+05:30 offset used when serializing database datetimes.
_IST = timezone(timedelta(hours=5, minutes=30))


def _serialize_db_datetime(value):
    """Serialize a database datetime as an ISO-8601 string in IST.

    Naive datetimes are labelled as IST (the clock value is kept as-is) so
    clients do not treat them as UTC; aware datetimes are converted to IST.

    Args:
        value: A ``datetime``, ``None``, or any other value.

    Returns:
        ``None`` for ``None``; an ISO-8601 string with second precision and a
        ``+05:30`` offset for datetimes; any other value unchanged.
    """
    if value is None:
        return None
    # Only real datetimes carry ``tzinfo``. Duck-typing on ``replace`` also
    # matched ``str`` and ``date`` and then crashed on the ``tzinfo`` lookup.
    if isinstance(value, datetime):
        if value.tzinfo is None:
            aware = value.replace(tzinfo=_IST)
        else:
            aware = value.astimezone(_IST)
        return aware.isoformat(timespec="seconds")
    return value


class DatabaseHandler:
    """Handle all database operations for the dashboard"""

    def __init__(self, config):
        """Build the PyMySQL connection settings from ``config``.

        Args:
            config: Object exposing ``get(key, default)``. The keys read here
                are ``DB_HOST``, ``DB_PORT``, ``DB_USER``, ``DB_PASSWORD`` and
                ``DB_NAME``; the object is also kept on ``self.config``.

        Raises:
            ValueError: If ``DB_PORT`` is set to a non-numeric string.
        """
        self.config = config
        self.db_config = {
            'host': config.get('DB_HOST', '127.0.0.1'),
            # Values loaded from the environment arrive as strings; coerce so
            # the driver always receives an integer port.
            'port': int(config.get('DB_PORT') or 3306),
            # NOTE(review): hard-coded fallback credentials. Deployments
            # should always supply DB_USER / DB_PASSWORD explicitly.
            'user': config.get('DB_USER', 'admin'),
            'password': config.get('DB_PASSWORD', 'mcube@admin123'),
            'database': config.get('DB_NAME', 'voicebot_cluster'),
            'charset': 'utf8mb4',
            'cursorclass': DictCursor,
            'autocommit': True
        }

    def _get_fernet(self):
        """Return a Fernet cipher keyed by the SHA-256 digest of ``SECRET_KEY``.

        The secret is read from ``self.config`` (with a development fallback),
        hashed, and URL-safe base64-encoded to form the Fernet key.
        """
        raw_secret = self.config.get('SECRET_KEY', 'dev-secret-key-change-in-production')
        digest = hashlib.sha256(str(raw_secret).encode('utf-8')).digest()
        fernet_key = base64.urlsafe_b64encode(digest)
        return Fernet(fernet_key)

    def _encrypt_text(self, value):
        if not value:
            return None
        return self._get_fernet().encrypt(str(value).encode('utf-8')).decode('utf-8')

    def _decrypt_text(self, value):
        if not value:
            return None
        try:
            return self._get_fernet().decrypt(str(value).encode('utf-8')).decode('utf-8')
        except Exception:
            logger.warning("Failed to decrypt CRM secret; data may use legacy format")
            return None

    def _normalize_phone_variants(self, phone):
        digits = "".join(ch for ch in str(phone or "") if ch.isdigit())
        if not digits:
            return []
        core10 = digits[-10:] if len(digits) > 10 else digits
        variants = {digits, f"+{digits}", core10}
        if len(core10) == 10:
            variants.update({f"91{core10}", f"+91{core10}", f"0{core10}"})
        return [v for v in variants if v]

    @contextmanager
    def get_connection(self):
        """Context manager for database connections.

        Opens a new PyMySQL connection from ``self.db_config`` on entry and
        closes it on exit. A ``pymysql.Error`` raised while connecting or
        inside the ``with`` body is logged and re-raised.

        Yields:
            The open connection.
        """
        conn = None
        try:
            conn = pymysql.connect(**self.db_config)
            yield conn
        except pymysql.Error as e:
            # This also fires for driver errors raised inside the caller's
            # ``with`` body, not only for failures to connect.
            logger.error(f"Database connection error: {e}")
            raise
        finally:
            # ``conn`` is still None when connect() itself failed.
            if conn:
                conn.close()

    def _table_exists(self, cursor, table_name):
        cursor.execute("SHOW TABLES LIKE %s", (table_name,))
        return cursor.fetchone() is not None

    def _column_exists(self, cursor, table_name, column_name):
        cursor.execute(
            "SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s AND COLUMN_NAME = %s LIMIT 1",
            (table_name, column_name)
        )
        return cursor.fetchone() is not None

    def ensure_crm_integrations_table(self):
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS business_crm_integrations (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(50) NOT NULL,
                    provider VARCHAR(50) NOT NULL,
                    access_key_enc LONGTEXT NOT NULL,
                    secret_key_enc LONGTEXT NOT NULL,
                    api_host VARCHAR(255) DEFAULT NULL,
                    is_active BOOLEAN DEFAULT TRUE,
                    config JSON DEFAULT NULL,
                    last_tested_at DATETIME DEFAULT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uniq_bid_provider (bid, provider),
                    INDEX idx_provider_active (provider, is_active)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
            )

    def ensure_crm_leads_cache_table(self):
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS crm_leads_cache (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(50) NOT NULL,
                    provider VARCHAR(50) NOT NULL,
                    external_lead_id VARCHAR(128) DEFAULT NULL,
                    lead_name VARCHAR(255) DEFAULT NULL,
                    owner_name VARCHAR(255) DEFAULT NULL,
                    email VARCHAR(255) DEFAULT NULL,
                    phone_primary VARCHAR(32) DEFAULT NULL,
                    phone_variants JSON DEFAULT NULL,
                    lead_status VARCHAR(255) DEFAULT NULL,
                    next_task_due_date VARCHAR(255) DEFAULT NULL,
                    lead_payload JSON DEFAULT NULL,
                    last_synced_at DATETIME NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uniq_bid_provider_external (bid, provider, external_lead_id),
                    INDEX idx_bid_provider (bid, provider),
                    INDEX idx_bid_provider_phone (bid, provider, phone_primary),
                    INDEX idx_last_synced (last_synced_at)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
            )

    def get_crm_integration(self, bid, provider):
        self.ensure_crm_integrations_table()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT id, bid, provider, access_key_enc, secret_key_enc, api_host, is_active, config, last_tested_at, created_at, updated_at
                FROM business_crm_integrations
                WHERE bid = %s AND provider = %s
                LIMIT 1
                """,
                (str(bid), str(provider).lower()),
            )
            row = cursor.fetchone()
            if not row:
                return None
            return {
                'id': row['id'],
                'bid': row['bid'],
                'provider': row['provider'],
                'api_host': row.get('api_host'),
                'is_active': bool(row.get('is_active')),
                'config': self._parse_json_field(row.get('config')),
                'last_tested_at': row.get('last_tested_at'),
                'created_at': row.get('created_at'),
                'updated_at': row.get('updated_at'),
                'has_credentials': bool(row.get('access_key_enc') and row.get('secret_key_enc'))
            }

    def get_active_crm_integrations(self, provider=None):
        self.ensure_crm_integrations_table()
        query = """
            SELECT bid, provider, access_key_enc, secret_key_enc, api_host, is_active, config
            FROM business_crm_integrations
            WHERE is_active = 1
        """
        params = []
        if provider:
            query += " AND provider = %s"
            params.append(str(provider).lower())
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            rows = cursor.fetchall() or []
            result = []
            for row in rows:
                result.append({
                    'bid': str(row.get('bid')),
                    'provider': str(row.get('provider') or '').lower(),
                    'access_key': self._decrypt_text(row.get('access_key_enc')),
                    'secret_key': self._decrypt_text(row.get('secret_key_enc')),
                    'api_host': row.get('api_host'),
                    'is_active': bool(row.get('is_active')),
                    'config': self._parse_json_field(row.get('config')) or {},
                })
            return result

    def get_crm_credentials(self, bid, provider):
        self.ensure_crm_integrations_table()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT access_key_enc, secret_key_enc, api_host, is_active
                FROM business_crm_integrations
                WHERE bid = %s AND provider = %s
                LIMIT 1
                """,
                (str(bid), str(provider).lower()),
            )
            row = cursor.fetchone()
            if not row:
                return None
            return {
                'access_key': self._decrypt_text(row.get('access_key_enc')),
                'secret_key': self._decrypt_text(row.get('secret_key_enc')),
                'api_host': row.get('api_host'),
                'is_active': bool(row.get('is_active')),
            }

    def upsert_crm_integration(self, bid, provider, access_key, secret_key, api_host=None, is_active=True, config=None):
        self.ensure_crm_integrations_table()
        provider = str(provider).lower()
        access_key_enc = self._encrypt_text(access_key)
        secret_key_enc = self._encrypt_text(secret_key)
        config_json = json.dumps(config or {}, ensure_ascii=True)
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT INTO business_crm_integrations (
                    bid, provider, access_key_enc, secret_key_enc, api_host, is_active, config
                ) VALUES (%s, %s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    access_key_enc = VALUES(access_key_enc),
                    secret_key_enc = VALUES(secret_key_enc),
                    api_host = VALUES(api_host),
                    is_active = VALUES(is_active),
                    config = VALUES(config),
                    updated_at = CURRENT_TIMESTAMP
                """,
                (
                    str(bid),
                    provider,
                    access_key_enc,
                    secret_key_enc,
                    (api_host or None),
                    1 if is_active else 0,
                    config_json,
                ),
            )

    def delete_crm_integration(self, bid, provider):
        self.ensure_crm_integrations_table()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "DELETE FROM business_crm_integrations WHERE bid = %s AND provider = %s",
                (str(bid), str(provider).lower()),
            )
            return cursor.rowcount > 0

    def mark_crm_integration_tested(self, bid, provider):
        self.ensure_crm_integrations_table()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                UPDATE business_crm_integrations
                SET last_tested_at = NOW()
                WHERE bid = %s AND provider = %s
                """,
                (str(bid), str(provider).lower()),
            )

    def ensure_crm_push_logs_table(self):
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS crm_push_logs (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(50) NOT NULL,
                    provider VARCHAR(50) NOT NULL,
                    callid VARCHAR(128) DEFAULT NULL,
                    lead_phone VARCHAR(64) DEFAULT NULL,
                    crm_lead_id VARCHAR(128) DEFAULT NULL,
                    activity_key VARCHAR(64) DEFAULT NULL,
                    activity_event VARCHAR(64) DEFAULT NULL,
                    status VARCHAR(32) NOT NULL,
                    message TEXT,
                    payload_preview JSON DEFAULT NULL,
                    response_preview JSON DEFAULT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    INDEX idx_bid_provider_created (bid, provider, created_at),
                    INDEX idx_bid_callid (bid, callid),
                    INDEX idx_status (bid, provider, status)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
            )

    def log_crm_push(
        self,
        bid,
        provider,
        status,
        callid=None,
        lead_phone=None,
        crm_lead_id=None,
        activity_key=None,
        activity_event=None,
        message=None,
        payload_preview=None,
        response_preview=None,
    ):
        self.ensure_crm_push_logs_table()
        payload_json = json.dumps(payload_preview, ensure_ascii=True) if payload_preview is not None else None
        response_json = json.dumps(response_preview, ensure_ascii=True) if response_preview is not None else None
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT INTO crm_push_logs (
                    bid, provider, callid, lead_phone, crm_lead_id,
                    activity_key, activity_event, status, message,
                    payload_preview, response_preview
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """,
                (
                    str(bid),
                    str(provider).lower(),
                    callid,
                    lead_phone,
                    crm_lead_id,
                    activity_key,
                    activity_event,
                    str(status),
                    message,
                    payload_json,
                    response_json,
                ),
            )
            return cursor.lastrowid

    def get_crm_push_logs(self, bid, provider="leadsquared", limit=100, offset=0, status=None):
        self.ensure_crm_push_logs_table()
        limit = max(1, min(int(limit or 100), 500))
        offset = max(0, int(offset or 0))
        where = ["bid = %s", "provider = %s"]
        params = [str(bid), str(provider).lower()]
        if status:
            where.append("status = %s")
            params.append(str(status))
        where_sql = " AND ".join(where)
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"SELECT COUNT(*) AS total FROM crm_push_logs WHERE {where_sql}",
                params,
            )
            total_row = cursor.fetchone() or {}
            total = int(total_row.get("total") or 0)
            cursor.execute(
                f"""
                SELECT id, bid, provider, callid, lead_phone, crm_lead_id,
                       activity_key, activity_event, status, message,
                       payload_preview, response_preview, created_at
                FROM crm_push_logs
                WHERE {where_sql}
                ORDER BY created_at DESC, id DESC
                LIMIT %s OFFSET %s
                """,
                params + [limit, offset],
            )
            rows = cursor.fetchall() or []
            logs = []
            for row in rows:
                logs.append({
                    "id": row.get("id"),
                    "bid": row.get("bid"),
                    "provider": row.get("provider"),
                    "callid": row.get("callid"),
                    "lead_phone": row.get("lead_phone"),
                    "crm_lead_id": row.get("crm_lead_id"),
                    "activity_key": row.get("activity_key"),
                    "activity_event": row.get("activity_event"),
                    "status": row.get("status"),
                    "message": row.get("message"),
                    "payload_preview": self._parse_json_field(row.get("payload_preview")),
                    "response_preview": self._parse_json_field(row.get("response_preview")),
                    "created_at": _serialize_db_datetime(row.get("created_at")),
                })
            return {
                "logs": logs,
                "total": total,
                "limit": limit,
                "offset": offset,
            }

    # =========================================================================
    # Telephony integrations (source DB connectivity for call sync)
    # =========================================================================

    def ensure_telephony_integrations_table(self):
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS business_telephony_integrations (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(50) NOT NULL,
                    provider VARCHAR(50) NOT NULL,
                    source_bid VARCHAR(50) NOT NULL,
                    host VARCHAR(255) DEFAULT NULL,
                    port INT DEFAULT 3306,
                    db_user VARCHAR(255) DEFAULT NULL,
                    db_password_enc LONGTEXT DEFAULT NULL,
                    db_name VARCHAR(255) DEFAULT NULL,
                    config JSON DEFAULT NULL,
                    is_active BOOLEAN DEFAULT TRUE,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uniq_bid_provider (bid, provider),
                    INDEX idx_bid_active (bid, is_active),
                    INDEX idx_provider_active (provider, is_active)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
            )

    def list_telephony_integrations(self, bid):
        self.ensure_telephony_integrations_table()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT bid, provider, source_bid, host, port, db_user, db_password_enc, db_name, config, is_active, created_at
                FROM business_telephony_integrations
                WHERE bid = %s
                ORDER BY provider
                """,
                (str(bid),),
            )
            rows = cursor.fetchall() or []
            integrations = []
            for row in rows:
                config = self._parse_json_field(row.get("config")) or {}
                # Build a stable shape for the frontend
                db_host = config.get("host") or row.get("host")
                db_port = int(config.get("port") or row.get("port") or 3306)
                db_user = config.get("user") or config.get("db_user") or row.get("db_user")
                db_name = config.get("database") or config.get("db_name") or row.get("db_name")
                password = config.get("password") or self._decrypt_text(row.get("db_password_enc"))
                effective_config = {
                    "host": db_host,
                    "port": db_port,
                    "user": db_user,
                    "password": password,
                    "database": db_name,
                }
                integrations.append(
                    {
                        "provider": str(row.get("provider") or "").lower(),
                        "source_bid": str(row.get("source_bid") or ""),
                        "config_json": json.dumps(effective_config or {}, ensure_ascii=True),
                        "is_active": 1 if bool(row.get("is_active")) else 0,
                        "created_at": row.get("created_at").isoformat() if hasattr(row.get("created_at"), "isoformat") else row.get("created_at"),
                        "config": effective_config,
                    }
                )
            return integrations

    def upsert_telephony_integration(self, bid, provider, source_bid, config=None, is_active=True):
        self.ensure_telephony_integrations_table()
        provider = str(provider).lower()
        source_bid = str(source_bid).strip()
        if not source_bid:
            raise ValueError("source_bid is required")

        cfg = dict(config or {})
        host = cfg.get("host")
        port = int(cfg.get("port") or 3306) if cfg.get("port") is not None else 3306
        db_user = cfg.get("user") or cfg.get("db_user")
        db_password = cfg.get("password")
        db_name = cfg.get("database") or cfg.get("db_name")

        db_password_enc = self._encrypt_text(db_password) if db_password else None
        config_json = json.dumps(cfg or {}, ensure_ascii=True)

        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT INTO business_telephony_integrations (
                    bid, provider, source_bid, host, port, db_user, db_password_enc, db_name, config, is_active
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    source_bid = VALUES(source_bid),
                    host = VALUES(host),
                    port = VALUES(port),
                    db_user = VALUES(db_user),
                    db_password_enc = COALESCE(VALUES(db_password_enc), db_password_enc),
                    db_name = VALUES(db_name),
                    config = VALUES(config),
                    is_active = VALUES(is_active),
                    updated_at = CURRENT_TIMESTAMP
                """,
                (
                    str(bid),
                    provider,
                    source_bid,
                    host,
                    port,
                    db_user,
                    db_password_enc,
                    db_name,
                    config_json,
                    1 if is_active else 0,
                ),
            )

    def delete_telephony_integration(self, bid, provider):
        self.ensure_telephony_integrations_table()
        provider = str(provider).lower()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "DELETE FROM business_telephony_integrations WHERE bid = %s AND provider = %s",
                (str(bid), provider),
            )
            return cursor.rowcount > 0

    def set_active_telephony_provider(self, bid, provider):
        """Ensure only one active telephony integration per bid."""
        self.ensure_telephony_integrations_table()
        provider = str(provider).lower()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "UPDATE business_telephony_integrations SET is_active = 0 WHERE bid = %s",
                (str(bid),),
            )
            cursor.execute(
                "UPDATE business_telephony_integrations SET is_active = 1 WHERE bid = %s AND provider = %s",
                (str(bid), provider),
            )
            return cursor.rowcount > 0

    @staticmethod
    def _slug_data_capture_field_key(display_name: str) -> str:
        key = re.sub(r"[^a-z0-9]+", "_", str(display_name or "").strip().lower())
        key = key.strip("_")
        return key or "field"

    def ensure_data_capture_fields_table(self):
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS business_data_capture_fields (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(50) NOT NULL,
                    field_key VARCHAR(80) NOT NULL,
                    display_name VARCHAR(255) NOT NULL,
                    field_type VARCHAR(32) NOT NULL DEFAULT 'text',
                    required_flag TINYINT(1) NOT NULL DEFAULT 0,
                    sort_order INT NOT NULL DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uniq_bid_field_key (bid, field_key),
                    INDEX idx_bid_sort (bid, sort_order)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
            )

    def list_data_capture_fields(self, bid: str) -> List[Dict[str, Any]]:
        self.ensure_data_capture_fields_table()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT id, field_key, display_name, field_type, required_flag, sort_order
                FROM business_data_capture_fields
                WHERE bid = %s
                ORDER BY sort_order ASC, id ASC
                """,
                (str(bid),),
            )
            rows = cursor.fetchall() or []
            return [
                {
                    "id": row.get("id"),
                    "field_key": row.get("field_key"),
                    "display_name": row.get("display_name"),
                    "field_type": row.get("field_type") or "text",
                    "required": bool(row.get("required_flag")),
                    "sort_order": int(row.get("sort_order") or 0),
                }
                for row in rows
            ]

    def replace_data_capture_fields(self, bid: str, fields: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        self.ensure_data_capture_fields_table()
        bid = str(bid).strip()
        normalized: List[Dict[str, Any]] = []
        used_keys = set()

        for index, raw in enumerate(fields or []):
            display_name = str((raw or {}).get("name") or (raw or {}).get("display_name") or "").strip()
            if not display_name:
                continue
            field_type = str((raw or {}).get("type") or (raw or {}).get("field_type") or "text").strip().lower()
            if field_type not in {"text", "phone", "email", "number", "textarea", "date"}:
                field_type = "text"
            required = bool((raw or {}).get("required"))
            field_key = str((raw or {}).get("field_key") or "").strip()
            if not field_key:
                field_key = self._slug_data_capture_field_key(display_name)
            base_key = field_key
            suffix = 2
            while field_key in used_keys:
                field_key = f"{base_key}_{suffix}"
                suffix += 1
            used_keys.add(field_key)
            normalized.append(
                {
                    "field_key": field_key,
                    "display_name": display_name,
                    "field_type": field_type,
                    "required": required,
                    "sort_order": index,
                }
            )

        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("DELETE FROM business_data_capture_fields WHERE bid = %s", (bid,))
            for row in normalized:
                cursor.execute(
                    """
                    INSERT INTO business_data_capture_fields (
                        bid, field_key, display_name, field_type, required_flag, sort_order
                    ) VALUES (%s, %s, %s, %s, %s, %s)
                    """,
                    (
                        bid,
                        row["field_key"],
                        row["display_name"],
                        row["field_type"],
                        1 if row["required"] else 0,
                        row["sort_order"],
                    ),
                )
            conn.commit()

        return self.list_data_capture_fields(bid)

    def upsert_crm_lead_cache(
        self,
        bid,
        provider,
        external_lead_id,
        lead_name=None,
        owner_name=None,
        email=None,
        phone_primary=None,
        phone_variants=None,
        lead_status=None,
        next_task_due_date=None,
        lead_payload=None,
        last_synced_at=None,
    ):
        self.ensure_crm_leads_cache_table()
        provider = str(provider).lower()
        safe_lead_id = str(external_lead_id or "").strip()
        if not safe_lead_id:
            return False

        variants = phone_variants or self._normalize_phone_variants(phone_primary)
        variants_json = json.dumps(sorted(list({str(v) for v in variants if str(v).strip()})), ensure_ascii=True)
        payload_json = json.dumps(lead_payload or {}, ensure_ascii=True)
        sync_dt = last_synced_at or datetime.utcnow()

        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT INTO crm_leads_cache (
                    bid, provider, external_lead_id, lead_name, owner_name, email,
                    phone_primary, phone_variants, lead_status, next_task_due_date,
                    lead_payload, last_synced_at
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    lead_name = VALUES(lead_name),
                    owner_name = VALUES(owner_name),
                    email = VALUES(email),
                    phone_primary = VALUES(phone_primary),
                    phone_variants = VALUES(phone_variants),
                    lead_status = VALUES(lead_status),
                    next_task_due_date = VALUES(next_task_due_date),
                    lead_payload = VALUES(lead_payload),
                    last_synced_at = VALUES(last_synced_at),
                    updated_at = CURRENT_TIMESTAMP
                """,
                (
                    str(bid),
                    provider,
                    safe_lead_id,
                    (str(lead_name).strip() if lead_name is not None else None),
                    (str(owner_name).strip() if owner_name is not None else None),
                    (str(email).strip() if email is not None else None),
                    (str(phone_primary).strip() if phone_primary is not None else None),
                    variants_json,
                    (str(lead_status).strip() if lead_status is not None else None),
                    (str(next_task_due_date).strip() if next_task_due_date is not None else None),
                    payload_json,
                    sync_dt,
                ),
            )
        return True

    def get_cached_crm_lead_by_phone(self, bid, provider, phone):
        """Find the most recently synced cached CRM lead that matches a phone.

        The phone is expanded through ``_normalize_phone_variants`` and the
        variants are tried in the order returned. A row matches when the
        variant equals ``phone_primary`` or is contained in the
        ``phone_variants`` JSON column; the newest row (by ``last_synced_at``,
        then ``updated_at``) of the first matching variant is returned.

        Returns:
            A dict of the lead's cached columns, with ``phone_variants``
            (default ``[]``) and ``lead_payload`` (default ``{}``) parsed from
            JSON, or ``None`` when the phone yields no variants or no row
            matches.
        """
        self.ensure_crm_leads_cache_table()
        provider_key = str(provider).lower()
        candidates = self._normalize_phone_variants(phone)
        if not candidates:
            return None

        # The statement is identical for every variant; only the bound values change.
        lookup_sql = """
                    SELECT
                        id, bid, provider, external_lead_id, lead_name, owner_name, email,
                        phone_primary, phone_variants, lead_status, next_task_due_date,
                        lead_payload, last_synced_at
                    FROM crm_leads_cache
                    WHERE bid = %s
                      AND provider = %s
                      AND (
                          phone_primary = %s
                          OR JSON_CONTAINS(phone_variants, JSON_QUOTE(%s))
                      )
                    ORDER BY last_synced_at DESC, updated_at DESC
                    LIMIT 1
                    """

        # Output keys in the order they are returned to callers.
        output_columns = (
            'id', 'bid', 'provider', 'external_lead_id', 'lead_name',
            'owner_name', 'email', 'phone_primary', 'phone_variants',
            'lead_status', 'next_task_due_date', 'lead_payload',
            'last_synced_at',
        )
        # JSON columns and the empty container used when they parse to nothing.
        json_fallbacks = {'phone_variants': list, 'lead_payload': dict}

        with self.get_connection() as conn:
            cursor = conn.cursor()
            for candidate in candidates:
                needle = str(candidate)
                cursor.execute(lookup_sql, (str(bid), provider_key, needle, needle))
                row = cursor.fetchone()
                if not row:
                    continue

                lead = {}
                for column in output_columns:
                    value = row.get(column)
                    if column in json_fallbacks:
                        value = self._parse_json_field(value) or json_fallbacks[column]()
                    lead[column] = value
                return lead
        return None

    # ── CRM Lead Activities ────────────────────────────────────────────────

    def ensure_crm_lead_activities_table(self):
        """Create the ``crm_lead_activities`` table when it does not exist yet.

        Rows are unique per (bid, provider, activity_id); secondary indexes
        cover (bid, provider), the lead id and the activity creation time.
        """
        create_sql = """
                CREATE TABLE IF NOT EXISTS crm_lead_activities (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(50) NOT NULL,
                    provider VARCHAR(50) NOT NULL,
                    activity_id VARCHAR(128) NOT NULL,
                    lead_id VARCHAR(128) DEFAULT NULL,
                    event_code VARCHAR(50) DEFAULT NULL,
                    event_name VARCHAR(255) DEFAULT NULL,
                    activity_created_on DATETIME DEFAULT NULL,
                    activity_modified_on DATETIME DEFAULT NULL,
                    activity_data JSON DEFAULT NULL,
                    activity_fields JSON DEFAULT NULL,
                    last_synced_at DATETIME NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uniq_bid_provider_activity (bid, provider, activity_id),
                    INDEX idx_bid_provider (bid, provider),
                    INDEX idx_lead_id (bid, provider, lead_id),
                    INDEX idx_created_on (bid, provider, activity_created_on)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
        with self.get_connection() as conn:
            ddl_cursor = conn.cursor()
            ddl_cursor.execute(create_sql)

    def upsert_crm_lead_activity(
        self,
        bid,
        provider,
        activity_id,
        lead_id=None,
        event_code=None,
        event_name=None,
        activity_created_on=None,
        activity_modified_on=None,
        activity_data=None,
        activity_fields=None,
    ):
        """Insert or refresh one CRM activity row keyed by (bid, provider, activity_id).

        The provider is lower-cased and the activity id is stripped; a blank
        activity id is rejected. ``lead_id``, ``event_code`` and ``event_name``
        are stored as strings (falsy values become NULL), and
        ``activity_data`` / ``activity_fields`` are stored as ASCII-escaped
        JSON (``None`` becomes NULL). ``last_synced_at`` is set to ``NOW()``
        on both insert and update.

        Returns:
            ``False`` when the activity id is blank, otherwise ``True`` after
            the statement has been executed.
        """
        self.ensure_crm_lead_activities_table()
        provider_key = str(provider).lower()
        activity_key = str(activity_id or "").strip()
        if not activity_key:
            return False

        def _text_or_null(value):
            # Falsy values (None, "", 0) are stored as NULL.
            return str(value) if value else None

        def _json_or_null(value):
            # Only None maps to NULL; empty containers are still serialized.
            if value is None:
                return None
            return json.dumps(value, ensure_ascii=True)

        upsert_sql = """
                INSERT INTO crm_lead_activities (
                    bid, provider, activity_id, lead_id, event_code, event_name,
                    activity_created_on, activity_modified_on,
                    activity_data, activity_fields, last_synced_at
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
                ON DUPLICATE KEY UPDATE
                    lead_id = VALUES(lead_id),
                    event_code = VALUES(event_code),
                    event_name = VALUES(event_name),
                    activity_created_on = VALUES(activity_created_on),
                    activity_modified_on = VALUES(activity_modified_on),
                    activity_data = VALUES(activity_data),
                    activity_fields = VALUES(activity_fields),
                    last_synced_at = NOW(),
                    updated_at = CURRENT_TIMESTAMP
                """

        with self.get_connection() as conn:
            cursor = conn.cursor()
            bound_values = (
                str(bid),
                provider_key,
                activity_key,
                _text_or_null(lead_id),
                _text_or_null(event_code),
                _text_or_null(event_name),
                activity_created_on,
                activity_modified_on,
                _json_or_null(activity_data),
                _json_or_null(activity_fields),
            )
            cursor.execute(upsert_sql, bound_values)
        return True

    # ── Sync Watermarks ───────────────────────────────────────────────────

    def ensure_sync_watermarks_table(self):
        """Create the ``sync_watermarks`` table when it does not exist yet.

        One watermark row is allowed per (bid, sync_type) pair.
        """
        create_sql = """
                CREATE TABLE IF NOT EXISTS sync_watermarks (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(50) NOT NULL,
                    sync_type VARCHAR(100) NOT NULL,
                    watermark DATETIME NOT NULL,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uniq_bid_type (bid, sync_type)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
        with self.get_connection() as conn:
            ddl_cursor = conn.cursor()
            ddl_cursor.execute(create_sql)

    def get_sync_watermark(self, bid, sync_type):
        """Return the stored watermark for a (bid, sync_type) pair.

        Returns:
            The ``watermark`` column value, or ``None`` when no row exists.
        """
        self.ensure_sync_watermarks_table()
        lookup_sql = "SELECT watermark FROM sync_watermarks WHERE bid = %s AND sync_type = %s LIMIT 1"
        with self.get_connection() as conn:
            cur = conn.cursor()
            cur.execute(lookup_sql, (str(bid), str(sync_type)))
            found = cur.fetchone()
            if not found:
                return None
            return found["watermark"]

    def set_sync_watermark(self, bid, sync_type, watermark):
        """Store the watermark for a (bid, sync_type) pair, replacing any existing value."""
        self.ensure_sync_watermarks_table()
        upsert_sql = """
                INSERT INTO sync_watermarks (bid, sync_type, watermark)
                VALUES (%s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    watermark = VALUES(watermark),
                    updated_at = CURRENT_TIMESTAMP
                """
        bound_values = (str(bid), str(sync_type), watermark)
        with self.get_connection() as conn:
            cur = conn.cursor()
            cur.execute(upsert_sql, bound_values)

    # ── Phone Set Helper ──────────────────────────────────────────────────

    def get_lead_phone_set(self, bid, provider):
        """Collect every phone string cached for a bid/provider into one flat set.

        Both ``phone_primary`` and each entry of the ``phone_variants`` JSON
        column of ``crm_leads_cache`` contribute; values are stripped and
        empty strings are left out.
        """
        self.ensure_crm_leads_cache_table()
        provider_key = str(provider).lower()
        with self.get_connection() as conn:
            cur = conn.cursor()
            cur.execute(
                "SELECT phone_primary, phone_variants FROM crm_leads_cache WHERE bid = %s AND provider = %s",
                (str(bid), provider_key),
            )
            records = cur.fetchall() or []

        collected = set()
        for record in records:
            # A missing/falsy primary collapses to "" and is dropped below.
            collected.add(str(record.get("phone_primary") or "").strip())
            collected.update(
                str(entry).strip()
                for entry in (self._parse_json_field(record.get("phone_variants")) or [])
            )
        # Blank values were added unconditionally above; remove them in one step.
        collected.discard("")
        return collected

    def get_crm_enrichment_for_phones(self, bid, provider, phones):
        """Batch-fetch CRM lead info for a list of phone strings.

        Returns a dict mapping each input phone → {lead_name, lead_status,
        next_task_due_date, crm_owner_name} by matching against phone_primary
        and phone_variants in crm_leads_cache.  Phones that have no CRM match
        are absent from the returned dict.

        When several cached leads share a phone string, the most recently
        synced lead is used (ordered by last_synced_at, then updated_at,
        newest first), which is the same ordering
        get_cached_crm_lead_by_phone applies to its single-phone lookup.

        Works for any number of customers (bid-scoped) and any CRM provider.
        """
        if not phones:
            return {}
        # Run the same table-ensure step as the other crm_leads_cache readers
        # in this class before querying the table.
        self.ensure_crm_leads_cache_table()
        provider = str(provider).lower()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT phone_primary, phone_variants, lead_name, owner_name,
                       lead_status, next_task_due_date
                FROM crm_leads_cache
                WHERE bid = %s AND provider = %s
                ORDER BY last_synced_at DESC, updated_at DESC
                """,
                (str(bid), provider),
            )
            rows = cursor.fetchall() or []

        # Build variant → enrichment mapping (any variant of a lead maps to it).
        # Rows arrive newest first, so setdefault keeps the newest lead for a
        # phone string that appears on more than one lead.
        variant_map = {}
        for row in rows:
            info = {
                "lead_name": row.get("lead_name"),
                "lead_status": row.get("lead_status"),
                "next_task_due_date": row.get("next_task_due_date"),
                "crm_owner_name": row.get("owner_name"),
            }
            primary = str(row.get("phone_primary") or "").strip()
            if primary:
                variant_map.setdefault(primary, info)
            for v in self._parse_json_field(row.get("phone_variants")) or []:
                v = str(v).strip()
                if v:
                    variant_map.setdefault(v, info)

        result = {}
        for phone in phones:
            if not phone:
                continue
            # Direct hit first
            if phone in variant_map:
                result[phone] = variant_map[phone]
                continue
            # Try normalised variants of the input phone
            for variant in self._normalize_phone_variants(phone):
                if variant in variant_map:
                    result[phone] = variant_map[variant]
                    break
        return result

    def ensure_call_sync_cache_table(self):
        """Create the ``call_sync_cache`` table when it does not exist yet.

        Entries are unique per ``cache_key`` and carry a JSON payload plus an
        ``expires_at`` timestamp; indexes cover (bid, source) and expiry.
        """
        create_sql = """
                CREATE TABLE IF NOT EXISTS call_sync_cache (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    cache_key VARCHAR(255) NOT NULL,
                    bid VARCHAR(50) NOT NULL,
                    source VARCHAR(50) NOT NULL,
                    payload JSON NOT NULL,
                    expires_at DATETIME NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uniq_cache_key (cache_key),
                    INDEX idx_bid_source (bid, source),
                    INDEX idx_expires_at (expires_at)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
        with self.get_connection() as conn:
            ddl_cursor = conn.cursor()
            ddl_cursor.execute(create_sql)

    def get_call_sync_cache(self, cache_key):
        """Return the cached payload for ``cache_key`` if it has not expired.

        Returns:
            The parsed JSON payload, or ``None`` when there is no unexpired
            row or the payload parses to a falsy value.
        """
        self.ensure_call_sync_cache_table()
        lookup_sql = """
                SELECT payload
                FROM call_sync_cache
                WHERE cache_key = %s
                  AND expires_at > NOW()
                LIMIT 1
                """
        with self.get_connection() as conn:
            cur = conn.cursor()
            cur.execute(lookup_sql, (str(cache_key),))
            hit = cur.fetchone()
            if hit:
                return self._parse_json_field(hit.get('payload')) or None
            return None

    def upsert_call_sync_cache(self, cache_key, bid, source, payload, ttl_seconds):
        """Insert or replace a cache entry that expires ``ttl_seconds`` from now.

        ``ttl_seconds`` is coerced with ``int()`` and clamped to at least one
        second; a falsy ``payload`` is stored as an empty JSON object.
        """
        self.ensure_call_sync_cache_table()
        lifetime = int(ttl_seconds)
        if lifetime < 1:
            lifetime = 1
        serialized = json.dumps(payload or {}, ensure_ascii=True)
        upsert_sql = """
                INSERT INTO call_sync_cache (
                    cache_key, bid, source, payload, expires_at
                ) VALUES (
                    %s, %s, %s, %s, DATE_ADD(NOW(), INTERVAL %s SECOND)
                )
                ON DUPLICATE KEY UPDATE
                    bid = VALUES(bid),
                    source = VALUES(source),
                    payload = VALUES(payload),
                    expires_at = VALUES(expires_at),
                    updated_at = CURRENT_TIMESTAMP
                """
        with self.get_connection() as conn:
            cur = conn.cursor()
            cur.execute(
                upsert_sql,
                (str(cache_key), str(bid), str(source), serialized, lifetime),
            )

    def prune_expired_call_sync_cache(self):
        """Delete cache rows whose ``expires_at`` has passed and return the cursor's row count as an int."""
        self.ensure_call_sync_cache_table()
        with self.get_connection() as conn:
            cur = conn.cursor()
            cur.execute("DELETE FROM call_sync_cache WHERE expires_at <= NOW()")
            affected = cur.rowcount
            # A missing/zero rowcount is reported as 0.
            return int(affected) if affected else 0

    def _parse_json_field(self, value):
        """Parse JSON string fields"""
        if not value:
            return None
        if isinstance(value, str):
            try:
                return json.loads(value)
            except json.JSONDecodeError:
                return value
        return value

    def _resolve_parameter_scores(self, row):
        """Return parameter_scores from the column, or rebuild from raw_response if the column is NULL."""
        ps = self._parse_json_field(row.get('parameter_scores'))
        if ps:
            return ps
        # Fallback: extract from raw_response.parameters
        raw = self._parse_json_field(row.get('raw_response'))
        if not raw or not isinstance(raw, dict):
            return None
        parameters = raw.get('parameters')
        if not isinstance(parameters, dict):
            return None
        # Rebuild parameter_scores dict from raw_response structure
        rebuilt = {}
        for param_name, data in parameters.items():
            if not isinstance(data, dict):
                continue
            applicable = data.get('applicable', True)
            if applicable:
                rebuilt[param_name] = {
                    'score': data.get('score', 0),
                    'max_score': data.get('max_score', data.get('maxScore', 0)),
                    'applicable': True,
                    'reasoning': data.get('reasoning', '')
                }
            else:
                rebuilt[param_name] = {
                    'score': None,
                    'max_score': data.get('max_score', data.get('maxScore', 0)),
                    'applicable': False,
                    'reason': data.get('reasoning', data.get('reason', 'Not applicable'))
                }
        return rebuilt if rebuilt else None

    def _format_call_record(self, record):
        """Format call record with proper JSON parsing"""
        if not record:
            return None

        # Parse JSON fields
        json_fields = ['keywords', 'sentiments', 'emotions', 'customer_details']
        for field in json_fields:
            if field in record and record[field]:
                record[field] = self._parse_json_field(record[field])

        return record

    # ========================================================================
    # BUSINESS OPERATIONS
    # ========================================================================

    def get_all_businesses(self):
        """Get list of all businesses with their call counts.

        Businesses are discovered by scanning for tables whose name ends in
        the literal suffix ``_calls``; the part of the table name before that
        suffix is reported as the business id.

        Returns:
            list[dict]: one entry per matching table with the keys ``bid``,
            ``name`` (``"Business <bid>"``), ``totalCalls`` and
            ``callsTable``.
        """
        suffix = '_calls'

        with self.get_connection() as conn:
            cursor = conn.cursor()

            # `_` is a single-character wildcard in LIKE, so it is escaped:
            # unescaped, '%_calls' also matches names such as `phonecalls`.
            # NOTE(review): `<bid>_raw_calls` tables (queried elsewhere in this
            # class) also end in `_calls` and are therefore still listed, with
            # bid `<bid>_raw` - confirm whether they should be excluded.
            cursor.execute("SHOW TABLES LIKE '%\\_calls'")
            tables = cursor.fetchall()

            businesses = []
            for table in tables:
                # Each row has a single column; take its value whatever the label is.
                table_name = next(iter(table.values()))

                # Strip only the trailing suffix; str.replace would also remove
                # any earlier "_calls" occurrence and corrupt the bid.
                if table_name.endswith(suffix):
                    bid = table_name[:-len(suffix)]
                else:
                    bid = table_name

                # Double embedded backticks so the name stays a single identifier.
                quoted_name = table_name.replace('`', '``')
                cursor.execute(f"SELECT COUNT(*) as count FROM `{quoted_name}`")
                count_result = cursor.fetchone()
                count = count_result['count'] if count_result else 0

                # No businesses table is consulted here; the BID doubles as the name.
                businesses.append({
                    'bid': bid,
                    'name': f'Business {bid}',
                    'totalCalls': count,
                    'callsTable': table_name
                })

            return businesses

    def get_business_info(self, bid):
        """Get detailed information for a specific business.

        Args:
            bid: Business id; statistics are read from the ``<bid>_calls`` table.

        Returns:
            dict | None: ``{'bid', 'name', 'statistics'}`` where ``statistics``
            is the aggregate row (total calls, a count per ``status`` value
            0-3, average duration, first and last call start time), or
            ``None`` when the table does not exist.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()

            table_name = f"{bid}_calls"

            # Delegate the existence check to _table_exists (as
            # get_group_agent_customer_rows does) instead of interpolating
            # `bid` into a SHOW TABLES LIKE '...' string literal here.
            if not self._table_exists(cursor, table_name):
                return None

            # Double embedded backticks so `bid` cannot break out of the
            # quoted identifier.
            quoted_name = table_name.replace('`', '``')

            # Get statistics
            query = f"""
                SELECT
                    COUNT(*) as total_calls,
                    COUNT(CASE WHEN status = 0 THEN 1 END) as unprocessed,
                    COUNT(CASE WHEN status = 1 THEN 1 END) as transcribed,
                    COUNT(CASE WHEN status = 2 THEN 1 END) as analyzed,
                    COUNT(CASE WHEN status = 3 THEN 1 END) as message_sent,
                    AVG(duration) as avg_duration,
                    MIN(call_starttime) as first_call_starttime,
                    MAX(call_starttime) as last_call_starttime
                FROM `{quoted_name}`
            """

            cursor.execute(query)
            stats = cursor.fetchone()

            return {
                'bid': bid,
                'name': f'Business {bid}',
                'statistics': stats
            }

    def _append_scope_filter(self, clauses, params, scope_where=None, scope_params=None):
        if scope_where:
            clauses.append(f"({scope_where})")
            params.extend(list(scope_params or []))

    def _analytics_where(self, groupname=None, date_from=None, date_to=None, scope_where=None, scope_params=None, base_conditions=None, date_on_datetime=False):
        conditions = list(base_conditions or [])
        params = []
        if groupname:
            conditions.append("r.groupname = %s")
            params.append(groupname)
        if date_from:
            conditions.append("r.call_starttime >= %s" if date_on_datetime else "DATE(r.call_starttime) >= %s")
            params.append(date_from)
        if date_to:
            conditions.append("r.call_starttime <= %s" if date_on_datetime else "DATE(r.call_starttime) <= %s")
            params.append(date_to)
        self._append_scope_filter(conditions, params, scope_where, scope_params)
        if conditions:
            return "WHERE " + " AND ".join(conditions), params
        return "", params

    def get_all_groupnames(self, bid):
        """List the non-empty group names of a business with their call counts.

        Reads ``<bid>_raw_calls`` and returns a list of
        ``{'groupname', 'totalCalls'}`` dicts ordered by call count, highest
        first.
        """
        # Distinct group names with counts, skipping NULL/empty names.
        sql = f"""
                SELECT
                    groupname,
                    COUNT(*) as totalCalls
                FROM `{bid}_raw_calls`
                WHERE groupname IS NOT NULL AND groupname != ''
                GROUP BY groupname
                ORDER BY totalCalls DESC
            """
        with self.get_connection() as conn:
            cur = conn.cursor()
            cur.execute(sql)
            return [
                {'groupname': entry['groupname'], 'totalCalls': entry['totalCalls']}
                for entry in cur.fetchall()
            ]

    def get_group_agent_customer_rows(self, bid, customer_numbers, groupname=None):
        """Aggregate raw calls per (group, agent, customer) for the given customer numbers.

        Customer numbers are stringified and stripped; blanks are dropped.
        Only rows with a non-blank group name and agent name are counted, and
        ``groupname`` optionally restricts the result to one group.

        Returns:
            The fetched rows (``groupname``, ``agentname``,
            ``customer_callinfo``, ``total_calls``, ``last_call``) ordered by
            call count then most recent call, or ``[]`` when there is nothing
            to look up or ``<bid>_raw_calls`` does not exist.
        """
        if not customer_numbers:
            return []

        wanted = []
        for raw_number in customer_numbers:
            text = str(raw_number).strip()
            if text:
                wanted.append(text)
        if not wanted:
            return []

        with self.get_connection() as conn:
            cursor = conn.cursor()
            table_name = f"{bid}_raw_calls"
            if not self._table_exists(cursor, table_name):
                return []

            # One placeholder per customer number for the IN (...) list.
            in_list = ", ".join("%s" for _ in wanted)
            bind_values = list(wanted)
            filters = [
                f"r.customer_callinfo IN ({in_list})",
                "r.groupname IS NOT NULL",
                "TRIM(r.groupname) != ''",
                "r.agentname IS NOT NULL",
                "TRIM(r.agentname) != ''",
            ]
            if groupname:
                filters.append("r.groupname = %s")
                bind_values.append(groupname)

            where_sql = " AND ".join(filters)
            query = f"""
                SELECT
                    r.groupname,
                    r.agentname,
                    r.customer_callinfo,
                    COUNT(*) AS total_calls,
                    MAX(r.call_starttime) AS last_call
                FROM `{table_name}` r
                WHERE {where_sql}
                GROUP BY r.groupname, r.agentname, r.customer_callinfo
                ORDER BY total_calls DESC, last_call DESC
            """
            cursor.execute(query, bind_values)
            return cursor.fetchall()

    def get_location_stats(
        self,
        bid,
        groupname=None,
        date_from=None,
        date_to=None,
        agent_names=None,
        detailed_calls=None,
        detailed_threshold_seconds=120,
        direction=None,
        scope_where=None,
        scope_params=None,
    ):
        """Get call statistics for specified business filtered by groupname."""
        try:
            threshold = max(0, int(detailed_threshold_seconds or 120))
        except (TypeError, ValueError):
            threshold = 120

        agent_list = []
        if agent_names:
            agent_list = [a.strip() for a in str(agent_names).split(",") if a.strip()]

        with self.get_connection() as conn:
            cursor = conn.cursor()

            where_clause = "WHERE 1=1"
            params = []

            if groupname:
                where_clause += " AND r.groupname = %s"
                params.append(groupname)

            if date_from:
                where_clause += " AND DATE(r.call_starttime) >= %s"
                params.append(date_from)

            if date_to:
                where_clause += " AND DATE(r.call_starttime) <= %s"
                params.append(date_to)

            if direction:
                where_clause += " AND LOWER(r.direction) = LOWER(%s)"
                params.append(direction)

            if agent_list:
                placeholders = ", ".join(["%s"] * len(agent_list))
                where_clause += f" AND r.agentname IN ({placeholders})"
                params.extend(agent_list)

            if scope_where:
                where_clause += f" AND ({scope_where})"
                params.extend(list(scope_params or []))

            detailed_filter = str(detailed_calls or "").strip().lower()
            if detailed_filter == "yes":
                where_clause += (
                    " AND r.call_status = 'ANSWER'"
                    " AND r.call_starttime IS NOT NULL"
                    " AND r.call_endtime IS NOT NULL"
                    f" AND TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime) >= {threshold}"
                )
            elif detailed_filter == "no":
                where_clause += (
                    " AND (r.call_status != 'ANSWER'"
                    " OR r.call_starttime IS NULL"
                    " OR r.call_endtime IS NULL"
                    f" OR TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime) < {threshold})"
                )

            propensity_avg_sql = self._avg_propensity_sql(cursor, bid)

            # Get comprehensive statistics
            query = f"""
                SELECT
                    COUNT(*) as total_calls,

                    -- Inbound statistics
                    SUM(CASE WHEN LOWER(r.direction) = 'inbound' THEN 1 ELSE 0 END) as inbound_total,
                    SUM(CASE WHEN LOWER(r.direction) = 'inbound' AND r.call_status = 'ANSWER' THEN 1 ELSE 0 END) as inbound_answered,
                    SUM(CASE WHEN LOWER(r.direction) = 'inbound' AND r.call_status = 'BUSY' THEN 1 ELSE 0 END) as inbound_busy,
                    SUM(CASE WHEN LOWER(r.direction) = 'inbound' AND r.call_status = 'CANCEL' THEN 1 ELSE 0 END) as inbound_cancel,
                    SUM(CASE WHEN LOWER(r.direction) = 'inbound' AND r.call_status = 'NOANSWER' THEN 1 ELSE 0 END) as inbound_not_answered,

                    -- Outbound statistics
                    SUM(CASE WHEN LOWER(r.direction) = 'outbound' THEN 1 ELSE 0 END) as outbound_total,
                    SUM(CASE WHEN LOWER(r.direction) = 'outbound' AND r.call_status = 'ANSWER' THEN 1 ELSE 0 END) as outbound_answered,
                    SUM(CASE WHEN LOWER(r.direction) = 'outbound' AND r.call_status = 'BUSY' THEN 1 ELSE 0 END) as outbound_busy,
                    SUM(CASE WHEN LOWER(r.direction) = 'outbound' AND r.call_status = 'CANCEL' THEN 1 ELSE 0 END) as outbound_cancel,
                    SUM(CASE WHEN LOWER(r.direction) = 'outbound' AND r.call_status = 'NOANSWER' THEN 1 ELSE 0 END) as outbound_not_answered,

                    -- Total answered calls
                    SUM(CASE WHEN r.call_status = 'ANSWER' THEN 1 ELSE 0 END) as answered_total,

                    -- Average duration for answered calls (in seconds)
                    AVG(
                        CASE
                            WHEN r.call_status = 'ANSWER'
                                AND r.call_starttime IS NOT NULL
                                AND r.call_endtime IS NOT NULL
                            THEN TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime)
                            ELSE NULL
                        END
                    ) as avg_answered_duration,

                    -- Total duration for answered calls (seconds)
                    SUM(
                        CASE
                            WHEN r.call_status = 'ANSWER'
                                AND r.call_starttime IS NOT NULL
                                AND r.call_endtime IS NOT NULL
                            THEN TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime)
                            ELSE 0
                        END
                    ) as total_answered_duration_seconds,

                    -- Detailed calls over threshold
                    SUM(
                        CASE
                            WHEN r.call_status = 'ANSWER'
                                AND r.call_starttime IS NOT NULL
                                AND r.call_endtime IS NOT NULL
                                AND TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime) >= {threshold}
                            THEN 1
                            ELSE 0
                        END
                    ) as detailed_calls_over_2min,

                    -- Analyzed (processed) breakdowns
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.direction = 'inbound' AND r.call_status = 'ANSWER' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_inbound_answered,
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.direction = 'inbound' AND r.call_status = 'BUSY' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_inbound_busy,
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.direction = 'inbound' AND r.call_status = 'CANCEL' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_inbound_cancel,
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.direction = 'inbound' AND r.call_status = 'NOANSWER' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_inbound_not_answered,
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.direction = 'outbound' AND r.call_status = 'ANSWER' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_outbound_answered,
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.direction = 'outbound' AND r.call_status = 'BUSY' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_outbound_busy,
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.direction = 'outbound' AND r.call_status = 'CANCEL' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_outbound_cancel,
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.direction = 'outbound' AND r.call_status = 'NOANSWER' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_outbound_not_answered,

                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.call_status = 'ANSWER' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_answered_total,

                    AVG(
                        CASE
                            WHEN a.callid IS NOT NULL
                                AND r.call_status = 'ANSWER'
                                AND r.call_starttime IS NOT NULL
                                AND r.call_endtime IS NOT NULL
                            THEN TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime)
                            ELSE NULL
                        END
                    ) as analyzed_avg_answered_duration,

                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL
                                AND r.call_status = 'ANSWER'
                                AND r.call_starttime IS NOT NULL
                                AND r.call_endtime IS NOT NULL
                                AND TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime) >= {threshold}
                            THEN 1
                            ELSE 0
                        END
                    ) as analyzed_detailed_calls_over_2min,

                    -- Average talk-listen ratio from analytics
                    AVG(CASE WHEN a.talk_listen_ratio IS NOT NULL THEN a.agent_speak_percentage ELSE NULL END) as avg_agent_talk_percentage,
                    AVG(CASE WHEN a.talk_listen_ratio IS NOT NULL THEN a.customer_speak_percentage ELSE NULL END) as avg_customer_talk_percentage,

                    -- Average quality score from analytics
                    AVG(CASE WHEN a.quality_score IS NOT NULL THEN a.quality_score ELSE NULL END) as avg_quality_score
                    {propensity_avg_sql},

                    -- Total processed durations (seconds)
                    SUM(
                        CASE
                            WHEN s.transcript IS NOT NULL AND s.transcript != ''
                            THEN COALESCE(s.duration, TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime))
                            ELSE 0
                        END
                    ) as transcribed_duration_seconds,
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL
                            THEN COALESCE(s.duration, TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime))
                            ELSE 0
                        END
                    ) as analyzed_duration_seconds,

                    -- Processed calls (analyzed)
                    COUNT(DISTINCT a.callid) as analyzed_total,
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.direction = 'inbound' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_inbound,
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.direction = 'outbound' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_outbound

                FROM `{bid}_raw_calls` r
                LEFT JOIN `{bid}_callanalytics` a ON r.callid = a.callid
                LEFT JOIN `{bid}_sarvamresponse` s ON r.callid = s.callid
                {where_clause}
            """

            cursor.execute(query, params)
            result = cursor.fetchone()

            if not result:
                return {
                    'total_calls': 0,
                    'inbound_total': 0,
                    'inbound_answered': 0,
                    'inbound_busy': 0,
                    'inbound_cancel': 0,
                    'inbound_not_answered': 0,
                    'outbound_total': 0,
                    'outbound_answered': 0,
                    'outbound_busy': 0,
                    'outbound_cancel': 0,
                    'outbound_not_answered': 0,
                    'avg_answered_duration': 0,
                    'avg_agent_talk_percentage': 0,
                    'avg_customer_talk_percentage': 0,
                    'avg_quality_score': 0,
                    'avg_propensity_score': 0,
                    'answered_total': 0,
                    'total_answered_duration_seconds': 0,
                    'detailed_calls_over_2min': 0,
                    'detailed_calls_over_2min_percent': 0,
                    'analyzed_inbound_answered': 0,
                    'analyzed_inbound_busy': 0,
                    'analyzed_inbound_cancel': 0,
                    'analyzed_inbound_not_answered': 0,
                    'analyzed_outbound_answered': 0,
                    'analyzed_outbound_busy': 0,
                    'analyzed_outbound_cancel': 0,
                    'analyzed_outbound_not_answered': 0,
                    'analyzed_answered_total': 0,
                    'analyzed_avg_answered_duration': 0,
                    'analyzed_detailed_calls_over_2min': 0,
                    'analyzed_detailed_calls_over_2min_percent': 0,
                    'transcribed_duration_seconds': 0,
                    'analyzed_duration_seconds': 0,
                    'analyzed_total': 0,
                    'analyzed_inbound': 0,
                    'analyzed_outbound': 0
                }

            # Calculate talk-listen ratio in format "X : Y"
            agent_pct = round(result['avg_agent_talk_percentage'] or 0)
            customer_pct = round(result['avg_customer_talk_percentage'] or 0)
            talk_listen_ratio = f"{agent_pct} : {customer_pct}" if agent_pct > 0 or customer_pct > 0 else "N/A"

            answered_total = int(result['answered_total'] or 0)
            detailed_calls_over_2min = int(result['detailed_calls_over_2min'] or 0)
            detailed_calls_over_2min_percent = round(
                (detailed_calls_over_2min / answered_total) * 100,
                1
            ) if answered_total > 0 else 0

            analyzed_answered_total = int(result['analyzed_answered_total'] or 0)
            analyzed_detailed_calls_over_2min = int(result['analyzed_detailed_calls_over_2min'] or 0)
            analyzed_detailed_calls_over_2min_percent = round(
                (analyzed_detailed_calls_over_2min / analyzed_answered_total) * 100,
                1
            ) if analyzed_answered_total > 0 else 0

            return {
                'total_calls': int(result['total_calls'] or 0),
                'inbound_total': int(result['inbound_total'] or 0),
                'inbound_answered': int(result['inbound_answered'] or 0),
                'inbound_busy': int(result['inbound_busy'] or 0),
                'inbound_cancel': int(result['inbound_cancel'] or 0),
                'inbound_not_answered': int(result['inbound_not_answered'] or 0),
                'outbound_total': int(result['outbound_total'] or 0),
                'outbound_answered': int(result['outbound_answered'] or 0),
                'outbound_busy': int(result['outbound_busy'] or 0),
                'outbound_cancel': int(result['outbound_cancel'] or 0),
                'outbound_not_answered': int(result['outbound_not_answered'] or 0),
                'avg_answered_duration': int(result['avg_answered_duration'] or 0),
                'total_answered_duration_seconds': int(result['total_answered_duration_seconds'] or 0),
                'avg_agent_talk_percentage': agent_pct,
                'avg_customer_talk_percentage': customer_pct,
                'talk_listen_ratio': talk_listen_ratio,
                'avg_quality_score': round(result['avg_quality_score'] or 0, 2),
                'avg_propensity_score': round(result.get('avg_propensity_score') or 0, 2),
                'answered_total': answered_total,
                'detailed_calls_over_2min': detailed_calls_over_2min,
                'detailed_calls_over_2min_percent': detailed_calls_over_2min_percent,
                'analyzed_inbound_answered': int(result['analyzed_inbound_answered'] or 0),
                'analyzed_inbound_busy': int(result['analyzed_inbound_busy'] or 0),
                'analyzed_inbound_cancel': int(result['analyzed_inbound_cancel'] or 0),
                'analyzed_inbound_not_answered': int(result['analyzed_inbound_not_answered'] or 0),
                'analyzed_outbound_answered': int(result['analyzed_outbound_answered'] or 0),
                'analyzed_outbound_busy': int(result['analyzed_outbound_busy'] or 0),
                'analyzed_outbound_cancel': int(result['analyzed_outbound_cancel'] or 0),
                'analyzed_outbound_not_answered': int(result['analyzed_outbound_not_answered'] or 0),
                'analyzed_answered_total': analyzed_answered_total,
                'analyzed_avg_answered_duration': int(result['analyzed_avg_answered_duration'] or 0),
                'analyzed_detailed_calls_over_2min': analyzed_detailed_calls_over_2min,
                'analyzed_detailed_calls_over_2min_percent': analyzed_detailed_calls_over_2min_percent,
                'transcribed_duration_seconds': float(result['transcribed_duration_seconds'] or 0),
                'analyzed_duration_seconds': float(result['analyzed_duration_seconds'] or 0),
                'analyzed_total': int(result['analyzed_total'] or 0),
                'analyzed_inbound': int(result['analyzed_inbound'] or 0),
                'analyzed_outbound': int(result['analyzed_outbound'] or 0)
            }

    def get_filtered_raw_calls(
        self,
        bid,
        groupname=None,
        direction=None,
        call_status=None,
        limit=100,
        offset=0,
        date_from=None,
        date_to=None,
        scope_where=None,
        scope_params=None,
    ):
        """Get filtered raw calls from 7987_raw_calls table"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            # Build WHERE clause
            where_clauses = []
            params = []

            if groupname:
                where_clauses.append("r.groupname = %s")
                params.append(groupname)

            if direction:
                where_clauses.append("LOWER(r.direction) = LOWER(%s)")
                params.append(direction)

            if call_status:
                where_clauses.append("r.call_status = %s")
                params.append(call_status)

            if date_from:
                where_clauses.append("DATE(r.call_starttime) >= %s")
                params.append(date_from)

            if date_to:
                where_clauses.append("DATE(r.call_starttime) <= %s")
                params.append(date_to)

            if scope_where:
                where_clauses.append(f"({scope_where})")
                params.extend(list(scope_params or []))

            where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""

            propensity_cols = ""
            analytics_table = f"{bid}_callanalytics"
            if self._table_exists(cursor, analytics_table) and self._table_has_column(
                cursor, analytics_table, "propensity_score"
            ):
                propensity_cols = ", ca.propensity_score, ca.propensity_band"

            query = f"""
                SELECT
                    r.callid,
                    r.agentname,
                    r.agent_callinfo,
                    r.customer_callinfo,
                    r.call_starttime,
                    r.call_endtime,
                    r.direction,
                    r.call_status,
                    r.groupname,
                    TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime) as duration_seconds,
                    ca.quality_score
                    {propensity_cols},
                    CASE
                        WHEN ca.callid IS NOT NULL THEN 1
                        ELSE 0
                    END as is_analyzed,
                    r.transcription_status,
                    CASE WHEN s.callid IS NOT NULL THEN 1 ELSE 0 END as has_transcript
                FROM `{bid}_raw_calls` r
                LEFT JOIN `{bid}_callanalytics` ca ON r.callid = ca.callid
                LEFT JOIN `{bid}_sarvamresponse` s ON r.callid = s.callid
                {where_sql}
                ORDER BY r.call_starttime DESC
                LIMIT %s OFFSET %s
            """

            params.extend([limit, offset])
            cursor.execute(query, params)
            calls = cursor.fetchall()

            return calls

    def get_raw_call_details(self, bid, callid):
        """Get call details from {bid}_raw_calls joined with {bid}_sarvamresponse"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            query = f"""
                SELECT
                    r.callid,
                    r.bid,
                    r.fileurl as fileUrl,
                    r.agentname,
                    r.groupname,
                    r.call_starttime,
                    r.call_endtime,
                    r.call_status,
                    r.agent_callinfo,
                    r.customer_callinfo,
                    r.direction,
                    r.transcription_status,
                    TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime) as duration_seconds,
                    s.transcript as transcripts,
                    s.speaker_segments,
                    s.num_speakers,
                    s.duration,
                    s.language,
                    s.request_id
                FROM `{bid}_raw_calls` r
                LEFT JOIN `{bid}_sarvamresponse` s ON r.callid = s.callid
                WHERE r.callid = %s
                LIMIT 1
            """

            cursor.execute(query, (callid,))
            call = cursor.fetchone()

            if not call:
                return None

            # Format the call record
            call_dict = dict(call)

            # Set status for compatibility with CallDetail component
            call_dict['status'] = 2 if call_dict['transcripts'] else 0

            return call_dict

    def _append_default_call_list_filters(
        self,
        bid: str,
        *,
        has_analytics: bool,
        has_sarvam: bool,
        filters,
        where_clauses: list,
        params: list,
    ) -> None:
        """Default dashboard view: analyzed calls only, excluding sub-min duration."""
        if filters and filters.get("status") is not None:
            return

        if has_analytics:
            where_clauses.append("a.callid IS NOT NULL")
        else:
            where_clauses.append("0=1")

        where_clauses.append("(COALESCE(r.transcription_status, '') != 'skipped_short')")

        cfg = self.get_pipeline_config(bid) or {}
        min_s = max(0, int(cfg.get("min_call_duration_s") or 0))
        if min_s <= 0:
            return

        effective_at = cfg.get("min_call_duration_effective_at") or self.ensure_min_duration_effective_at(bid)
        if not effective_at:
            return

        duration_expr = (
            "COALESCE(s.duration, r.duration_seconds, "
            "TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime))"
            if has_sarvam
            else "COALESCE(r.duration_seconds, TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime))"
        )
        where_clauses.append(f"(r.call_starttime < %s OR ({duration_expr}) >= %s)")
        params.extend([effective_at, min_s])

    def _get_raw_calls_list(self, cursor, bid, filters=None, limit=100, offset=0, scope_where=None, scope_params=None):
        raw_table = f"{bid}_raw_calls"
        if not self._table_exists(cursor, raw_table):
            return []

        sarvam_table = f"{bid}_sarvamresponse"
        analytics_table = f"{bid}_callanalytics"
        has_sarvam = self._table_exists(cursor, sarvam_table)
        has_analytics = self._table_exists(cursor, analytics_table)

        joins = []
        status_case = "0"
        quality_score_select = "NULL AS quality_score"
        propensity_score_select = "NULL AS propensity_score, NULL AS propensity_band"
        duration_seconds_select = "TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime)"

        if has_sarvam:
            joins.append(f"LEFT JOIN `{sarvam_table}` s ON r.callid = s.callid")
            status_case = "CASE WHEN s.callid IS NOT NULL THEN 1 ELSE 0 END"
            duration_seconds_select = "COALESCE(s.duration, TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime))"

        if has_analytics:
            joins.append(f"LEFT JOIN `{analytics_table}` a ON r.callid = a.callid")
            quality_score_select = "a.quality_score AS quality_score"
            if self._table_has_column(cursor, analytics_table, "propensity_score"):
                propensity_score_select = (
                    "a.propensity_score AS propensity_score, a.propensity_band AS propensity_band"
                )
            if has_sarvam:
                status_case = (
                    "CASE WHEN a.callid IS NOT NULL THEN 3 "
                    "WHEN s.callid IS NOT NULL THEN 2 ELSE r.status END"
                )
            else:
                status_case = "CASE WHEN a.callid IS NOT NULL THEN 3 ELSE r.status END"

        where_clauses = []
        params = []

        self._append_default_call_list_filters(
            bid,
            has_analytics=has_analytics,
            has_sarvam=has_sarvam,
            filters=filters,
            where_clauses=where_clauses,
            params=params,
        )

        if filters:
            if 'date_from' in filters:
                where_clauses.append("r.call_starttime >= %s")
                params.append(filters['date_from'])

            if 'date_to' in filters:
                where_clauses.append("r.call_starttime <= %s")
                params.append(filters['date_to'])

            status_filter = filters.get('status')
            if status_filter is not None:
                # Handle Status 3: Analyzed
                if status_filter >= 3:
                    if has_analytics:
                        where_clauses.append("a.callid IS NOT NULL")
                    else:
                        where_clauses.append("0=1")
                # Handle Status 2: Transcribed but not Analyzed
                elif status_filter == 2:
                    if has_sarvam:
                        where_clauses.append("s.callid IS NOT NULL")
                    if has_analytics:
                        where_clauses.append("a.callid IS NULL")
                # Handle Status 1: Queued for Transcription (usually tracked directly in raw_calls)
                elif status_filter == 1:
                    where_clauses.append("r.status = 1")
                    if has_sarvam:
                        where_clauses.append("s.callid IS NULL")
                # Handle Status 0: New/Unprocessed
                else:
                    if has_sarvam:
                        where_clauses.append("s.callid IS NULL")
                    if has_analytics:
                        where_clauses.append("a.callid IS NULL")
                    where_clauses.append("(r.status = 0 OR r.status IS NULL)")

        self._append_scope_filter(where_clauses, params, scope_where, scope_params)

        where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
        join_sql = " ".join(joins)

        query = f"""
            SELECT
                r.callid,
                r.bid,
                r.fileurl as fileUrl,
                r.agentname,
                r.groupname,
                r.call_starttime,
                r.call_endtime,
                r.call_status,
                r.agent_callinfo,
                r.customer_callinfo,
                r.direction,
                CAST(TIME(r.call_starttime) AS CHAR) as call_time,
                {duration_seconds_select} as duration_seconds,
                {quality_score_select},
                {propensity_score_select},
                {status_case} as status
            FROM `{raw_table}` r
            {join_sql}
            {where_sql}
            ORDER BY r.call_starttime DESC, r.call_endtime DESC
            LIMIT %s OFFSET %s
        """

        params.extend([limit, offset])
        cursor.execute(query, params)
        calls = cursor.fetchall()
        return [self._format_call_record(call) for call in calls]

    def _get_raw_calls_count(self, cursor, bid, filters=None, scope_where=None, scope_params=None):
        raw_table = f"{bid}_raw_calls"
        if not self._table_exists(cursor, raw_table):
            return 0

        sarvam_table = f"{bid}_sarvamresponse"
        analytics_table = f"{bid}_callanalytics"
        has_sarvam = self._table_exists(cursor, sarvam_table)
        has_analytics = self._table_exists(cursor, analytics_table)

        joins = []
        where_clauses = []
        params = []

        if has_sarvam:
            joins.append(f"LEFT JOIN `{sarvam_table}` s ON r.callid = s.callid")
        if has_analytics:
            joins.append(f"LEFT JOIN `{analytics_table}` a ON r.callid = a.callid")

        self._append_default_call_list_filters(
            bid,
            has_analytics=has_analytics,
            has_sarvam=has_sarvam,
            filters=filters,
            where_clauses=where_clauses,
            params=params,
        )

        if filters:
            if 'date_from' in filters:
                where_clauses.append("r.call_starttime >= %s")
                params.append(filters['date_from'])

            if 'date_to' in filters:
                where_clauses.append("r.call_starttime <= %s")
                params.append(filters['date_to'])

            status_filter = filters.get('status')
            if status_filter is not None:
                # Handle Status 3: Analyzed
                if status_filter >= 3:
                    if has_analytics:
                        where_clauses.append("a.callid IS NOT NULL")
                    else:
                        where_clauses.append("0=1")
                # Handle Status 2: Transcribed but not Analyzed
                elif status_filter == 2:
                    if has_sarvam:
                        where_clauses.append("s.callid IS NOT NULL")
                    if has_analytics:
                        where_clauses.append("a.callid IS NULL")
                # Handle Status 1: Queued for Transcription
                elif status_filter == 1:
                    where_clauses.append("r.status = 1")
                    if has_sarvam:
                        where_clauses.append("s.callid IS NULL")
                # Handle Status 0: New/Unprocessed
                else:
                    if has_sarvam:
                        where_clauses.append("s.callid IS NULL")
                    if has_analytics:
                        where_clauses.append("a.callid IS NULL")
                    where_clauses.append("(r.status = 0 OR r.status IS NULL)")

        self._append_scope_filter(where_clauses, params, scope_where, scope_params)

        where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
        join_sql = " ".join(joins)

        query = f"""
            SELECT COUNT(*) as count
            FROM `{raw_table}` r
            {join_sql}
            {where_sql}
        """

        cursor.execute(query, params)
        result = cursor.fetchone()
        return result['count'] if result else 0

    def _search_raw_calls(self, cursor, bid, query, limit=50, scope_where=None, scope_params=None):
        raw_table = f"{bid}_raw_calls"
        if not self._table_exists(cursor, raw_table):
            return []

        search_term = f"%{query}%"
        params = [search_term, search_term, search_term, search_term]
        where_sql = (
            " WHERE (r.callid LIKE %s OR r.agentname LIKE %s "
            "OR r.customer_callinfo LIKE %s OR r.agent_callinfo LIKE %s)"
        )
        if scope_where:
            where_sql += f" AND ({scope_where})"
            params.extend(list(scope_params or []))
        cursor.execute(f"""
            SELECT
                r.callid,
                r.bid,
                r.fileurl as fileUrl,
                r.agentname,
                r.groupname,
                r.call_starttime,
                r.call_endtime,
                r.call_status,
                r.agent_callinfo,
                r.customer_callinfo,
                r.direction,
                CAST(TIME(r.call_starttime) AS CHAR) as call_time,
                TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime) as duration_seconds
            FROM `{raw_table}` r
            {where_sql}
            ORDER BY r.call_starttime DESC, r.call_endtime DESC
            LIMIT %s
        """, params + [limit])

        calls = cursor.fetchall()
        return [self._format_call_record(call) for call in calls]

    def get_leads_list(self, bid, groupname=None, limit=100, offset=0, transcripts_only=False, direction=None, scope_where=None, scope_params=None):
        """Get customer-level lead aggregates from raw calls.

        Prefer the stored customer phone. Some integrations store the agent
        extension in agent_callinfo even for inbound calls.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()

            raw_table = f"{bid}_raw_calls"
            sarvam_table = f"{bid}_sarvamresponse"
            analytics_table = f"{bid}_callanalytics"

            if not self._table_exists(cursor, raw_table):
                return {'leads': [], 'total': 0}

            has_sarvam = self._table_exists(cursor, sarvam_table)
            has_analytics = self._table_exists(cursor, analytics_table)

            lead_phone_expr = (
                "CASE "
                "WHEN NULLIF(TRIM(CAST(r.customer_callinfo AS CHAR)), '') REGEXP '^[+]?[0-9]{6,}$' "
                "THEN TRIM(CAST(r.customer_callinfo AS CHAR)) "
                "WHEN NULLIF(TRIM(CAST(r.agent_callinfo AS CHAR)), '') REGEXP '^[+]?[0-9]{6,}$' "
                "THEN TRIM(CAST(r.agent_callinfo AS CHAR)) "
                "WHEN LOWER(r.direction) = 'inbound' THEN TRIM(CAST(r.agent_callinfo AS CHAR)) "
                "ELSE TRIM(CAST(r.customer_callinfo AS CHAR)) "
                "END"
            )

            where_clauses = [
                f"({lead_phone_expr}) IS NOT NULL",
                f"TRIM(({lead_phone_expr})) != ''",
            ]
            params = []

            if direction and direction.lower() in ('inbound', 'outbound'):
                where_clauses.append("LOWER(r.direction) = %s")
                params.append(direction.lower())

            if groupname:
                where_clauses.append("r.groupname = %s")
                params.append(groupname)

            if transcripts_only and has_sarvam:
                where_clauses.append("s.callid IS NOT NULL")
                where_clauses.append("COALESCE(TRIM(s.transcript), '') != ''")

            self._append_scope_filter(where_clauses, params, scope_where, scope_params)

            where_sql = " WHERE " + " AND ".join(where_clauses)
            join_sarvam = f"LEFT JOIN `{sarvam_table}` s ON r.callid = s.callid" if has_sarvam else ""
            join_analytics = f"LEFT JOIN `{analytics_table}` a ON r.callid = a.callid" if has_analytics else ""
            quality_expr = "ROUND(AVG(a.quality_score), 2)" if has_analytics else "NULL"
            has_propensity_col = (
                has_analytics
                and self._table_has_column(cursor, analytics_table, "propensity_score")
            )
            propensity_expr = (
                "ROUND(AVG(CASE WHEN a.propensity_score IS NOT NULL THEN a.propensity_score ELSE NULL END), 2)"
                if has_propensity_col else "NULL"
            )
            transcript_expr = (
                "SUM(CASE WHEN s.callid IS NOT NULL AND COALESCE(TRIM(s.transcript), '') != '' THEN 1 ELSE 0 END)"
                if has_sarvam else
                "0"
            )

            count_query = f"""
                SELECT COUNT(*) AS total
                FROM (
                    SELECT {lead_phone_expr}
                    FROM `{raw_table}` r
                    {join_sarvam}
                    {where_sql}
                    GROUP BY {lead_phone_expr}
                ) t
            """
            cursor.execute(count_query, params)
            total_row = cursor.fetchone() or {'total': 0}
            total = int(total_row.get('total') or 0)

            data_query = f"""
                SELECT
                    {lead_phone_expr} AS lead_phone,
                    COUNT(*) AS conversations,
                    MAX(r.call_starttime) AS last_conversation,
                    SUBSTRING_INDEX(
                        GROUP_CONCAT(COALESCE(r.agentname, '-') ORDER BY r.call_starttime DESC SEPARATOR '||'),
                        '||',
                        1
                    ) AS owner_name,
                    {quality_expr} AS avg_quality_score,
                    {propensity_expr} AS avg_propensity_score,
                    SUM(
                        CASE
                            WHEN r.call_starttime IS NOT NULL AND r.call_endtime IS NOT NULL
                            THEN GREATEST(TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime), 0)
                            ELSE 0
                        END
                    ) AS total_duration_seconds,
                    SUM(CASE WHEN r.call_status = 'ANSWER' THEN 1 ELSE 0 END) AS answered_calls,
                    {transcript_expr} AS transcript_calls
                FROM `{raw_table}` r
                {join_sarvam}
                {join_analytics}
                {where_sql}
                GROUP BY {lead_phone_expr}
                ORDER BY last_conversation DESC
                LIMIT %s OFFSET %s
            """
            cursor.execute(data_query, params + [limit, offset])
            rows = cursor.fetchall()

            leads = []
            for row in rows:
                avg_propensity = row.get('avg_propensity_score')
                leads.append({
                    'lead_phone': row.get('lead_phone'),
                    'conversations': int(row.get('conversations') or 0),
                    'last_conversation': row.get('last_conversation'),
                    'owner_name': row.get('owner_name') or '-',
                    'avg_quality_score': float(row.get('avg_quality_score') or 0),
                    'avg_propensity_score': (
                        round(float(avg_propensity), 2)
                        if avg_propensity is not None else None
                    ),
                    'total_duration_seconds': int(row.get('total_duration_seconds') or 0),
                    'answered_calls': int(row.get('answered_calls') or 0),
                    'transcript_calls': int(row.get('transcript_calls') or 0),
                    'lead_name': None,
                    'lead_status': '-',
                    'next_task_due_date': '-',
                })

            # Enrich with CRM lead names / status from crm_leads_cache
            phones = [lead['lead_phone'] for lead in leads if lead.get('lead_phone')]
            if phones:
                crm_map = self.get_crm_enrichment_for_phones(bid, 'leadsquared', phones)
                for lead in leads:
                    phone = lead.get('lead_phone', '')
                    enrichment = crm_map.get(phone, {})
                    lead['lead_name'] = enrichment.get('lead_name') or None
                    if enrichment.get('lead_status'):
                        lead['lead_status'] = enrichment['lead_status']
                    if enrichment.get('next_task_due_date'):
                        lead['next_task_due_date'] = enrichment['next_task_due_date']
                    # Prefer CRM owner over call-derived owner when call owner is absent
                    if lead['owner_name'] == '-' and enrichment.get('crm_owner_name'):
                        lead['owner_name'] = enrichment['crm_owner_name']

            return {'leads': leads, 'total': total}

    def get_lead_details(self, bid, lead_phone, groupname=None, scope_where=None, scope_params=None):
        """Get customer-level details and call timeline for a given customer phone."""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            raw_table = f"{bid}_raw_calls"
            sarvam_table = f"{bid}_sarvamresponse"
            analytics_table = f"{bid}_callanalytics"

            if not self._table_exists(cursor, raw_table):
                return None

            has_sarvam = self._table_exists(cursor, sarvam_table)
            has_analytics = self._table_exists(cursor, analytics_table)

            lead_phone_expr = (
                "CASE "
                "WHEN NULLIF(TRIM(CAST(r.customer_callinfo AS CHAR)), '') REGEXP '^[+]?[0-9]{6,}$' "
                "THEN TRIM(CAST(r.customer_callinfo AS CHAR)) "
                "WHEN NULLIF(TRIM(CAST(r.agent_callinfo AS CHAR)), '') REGEXP '^[+]?[0-9]{6,}$' "
                "THEN TRIM(CAST(r.agent_callinfo AS CHAR)) "
                "WHEN LOWER(r.direction) = 'inbound' THEN TRIM(CAST(r.agent_callinfo AS CHAR)) "
                "ELSE TRIM(CAST(r.customer_callinfo AS CHAR)) "
                "END"
            )
            where_clauses = [
                f"{lead_phone_expr} = %s",
            ]
            params = [lead_phone]
            if groupname:
                where_clauses.append("r.groupname = %s")
                params.append(groupname)

            self._append_scope_filter(where_clauses, params, scope_where, scope_params)

            where_sql = " WHERE " + " AND ".join(where_clauses)
            join_sarvam = f"LEFT JOIN `{sarvam_table}` s ON r.callid = s.callid" if has_sarvam else ""
            join_analytics = f"LEFT JOIN `{analytics_table}` a ON r.callid = a.callid" if has_analytics else ""
            summary_quality_expr = "ROUND(AVG(a.quality_score), 2)" if has_analytics else "NULL"
            has_propensity_col = (
                has_analytics
                and self._table_has_column(cursor, analytics_table, "propensity_score")
            )
            has_propensity_band_col = (
                has_analytics
                and self._table_has_column(cursor, analytics_table, "propensity_band")
            )
            summary_propensity_expr = (
                "ROUND(AVG(CASE WHEN a.propensity_score IS NOT NULL THEN a.propensity_score ELSE NULL END), 2)"
                if has_propensity_col else "NULL"
            )
            talk_expr = "ROUND(AVG(a.agent_speak_percentage), 0)" if has_analytics else "0"
            listen_expr = "ROUND(AVG(a.customer_speak_percentage), 0)" if has_analytics else "0"
            intent_expr = "0"
            call_quality_expr = "a.quality_score" if has_analytics else "NULL"
            call_propensity_expr = "a.propensity_score" if has_propensity_col else "NULL"
            call_propensity_band_expr = "a.propensity_band" if has_propensity_band_col else "NULL"
            call_summary_expr = "a.summary" if has_analytics else "NULL"
            call_purpose_expr = "a.call_purpose" if has_analytics else "NULL"
            call_objections_expr = "a.objections_concerns" if has_analytics else "NULL"
            call_sentiment_expr = "a.sentiment" if has_analytics else "NULL"
            call_objection_type_expr = "a.objection_type" if has_analytics and self._column_exists(cursor, analytics_table, 'objection_type') else "NULL"
            # parameter_scores / parameter_detections columns may not exist on older tables
            has_param_scores_col = has_analytics and self._column_exists(cursor, analytics_table, 'parameter_scores')
            has_param_detections_col = has_analytics and self._column_exists(cursor, analytics_table, 'parameter_detections')
            has_raw_response_col = has_analytics and self._column_exists(cursor, analytics_table, 'raw_response')
            call_parameter_scores_expr = "a.parameter_scores" if has_param_scores_col else "NULL"
            call_parameter_detections_expr = "a.parameter_detections" if has_param_detections_col else "NULL"
            call_raw_response_expr = "a.raw_response" if has_raw_response_col else "NULL"
            call_has_transcript_expr = (
                "CASE WHEN s.callid IS NOT NULL AND COALESCE(TRIM(s.transcript), '') != '' THEN 1 ELSE 0 END"
                if has_sarvam else
                "0"
            )
            call_transcript_expr = "s.transcript" if has_sarvam else "NULL"
            call_speaker_segments_expr = "s.speaker_segments" if has_sarvam else "NULL"
            call_transcript_duration_expr = "s.duration" if has_sarvam else "NULL"

            summary_query = f"""
                SELECT
                    COUNT(*) AS total_conversations,
                    MAX(r.call_starttime) AS last_conversation,
                    MIN(r.call_starttime) AS first_conversation,
                    SUBSTRING_INDEX(
                        GROUP_CONCAT(COALESCE(r.agentname, '-') ORDER BY r.call_starttime DESC SEPARATOR '||'),
                        '||',
                        1
                    ) AS owner_name,
                    {summary_quality_expr} AS avg_quality_score,
                    {summary_propensity_expr} AS avg_propensity_score,
                    {talk_expr} AS avg_talk_pct,
                    {listen_expr} AS avg_listen_pct,
                    {intent_expr} AS avg_intent_score,
                    SUM(
                        CASE
                            WHEN r.call_starttime IS NOT NULL AND r.call_endtime IS NOT NULL
                            THEN GREATEST(TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime), 0)
                            ELSE 0
                        END
                    ) AS total_duration_seconds
                FROM `{raw_table}` r
                {join_analytics}
                {where_sql}
            """
            cursor.execute(summary_query, params)
            summary = cursor.fetchone()
            if not summary or not summary.get('total_conversations'):
                return None

            calls_query = f"""
                SELECT
                    r.callid,
                    r.fileurl AS file_url,
                    r.call_starttime,
                    r.call_endtime,
                    r.call_status,
                    r.direction,
                    r.agentname,
                    r.groupname,
                    TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime) AS duration_seconds,
                    {call_has_transcript_expr} AS has_transcript,
                    {call_transcript_expr} AS transcript,
                    {call_speaker_segments_expr} AS speaker_segments,
                    {call_transcript_duration_expr} AS transcript_duration,
                    {call_quality_expr} AS quality_score,
                    {call_propensity_expr} AS propensity_score,
                    {call_propensity_band_expr} AS propensity_band,
                    {call_summary_expr} AS summary,
                    {call_parameter_scores_expr} AS parameter_scores,
                    {call_parameter_detections_expr} AS parameter_detections,
                    {call_purpose_expr} AS call_purpose,
                    {call_objections_expr} AS objections_concerns,
                    {call_sentiment_expr} AS sentiment,
                    {call_objection_type_expr} AS objection_type,
                    {call_raw_response_expr} AS raw_response
                FROM `{raw_table}` r
                {join_sarvam}
                {join_analytics}
                {where_sql}
                ORDER BY r.call_starttime DESC
            """
            cursor.execute(calls_query, params)
            calls = cursor.fetchall()

            talk_pct = int(summary.get('avg_talk_pct') or 0)
            listen_pct = int(summary.get('avg_listen_pct') or 0)

            # Enrich with CRM lead name / status
            crm_enrichment = self.get_crm_enrichment_for_phones(bid, 'leadsquared', [lead_phone])
            crm_info = crm_enrichment.get(lead_phone, {})
            call_owner = summary.get('owner_name') or '-'

            return {
                'lead_phone': lead_phone,
                'lead_name': crm_info.get('lead_name') or None,
                'lead_status': crm_info.get('lead_status') or '-',
                'next_task_due_date': crm_info.get('next_task_due_date') or '-',
                'owner_name': call_owner if call_owner != '-' else (crm_info.get('crm_owner_name') or '-'),
                'total_conversations': int(summary.get('total_conversations') or 0),
                'first_conversation': summary.get('first_conversation'),
                'last_conversation': summary.get('last_conversation'),
                'avg_quality_score': float(summary.get('avg_quality_score') or 0),
                'avg_propensity_score': (
                    round(float(summary.get('avg_propensity_score')), 2)
                    if summary.get('avg_propensity_score') is not None else None
                ),
                'talk_listen_ratio': f"{talk_pct}:{listen_pct}" if (talk_pct or listen_pct) else "N/A",
                'intent_score': int(summary.get('avg_intent_score') or 0),
                'total_duration_seconds': int(summary.get('total_duration_seconds') or 0),
                'calls': [
                    {
                        'callid': row.get('callid'),
                        'call_starttime': row.get('call_starttime'),
                        'call_endtime': row.get('call_endtime'),
                        'call_status': row.get('call_status'),
                        'direction': row.get('direction'),
                        'agentname': row.get('agentname') or '-',
                        'groupname': row.get('groupname') or '-',
                        'duration_seconds': int(row.get('duration_seconds') or 0),
                        'has_transcript': bool(row.get('has_transcript')),
                        'file_url': row.get('file_url'),
                        'transcript': row.get('transcript') or '',
                        'speaker_segments': self._parse_json_field(row.get('speaker_segments')) or [],
                        'transcript_duration': float(row.get('transcript_duration') or 0),
                        'quality_score': float(row.get('quality_score') or 0) if row.get('quality_score') is not None else None,
                        'propensity_score': (
                            float(row.get('propensity_score'))
                            if row.get('propensity_score') is not None else None
                        ),
                        'propensity_band': row.get('propensity_band') or None,
                        'summary': row.get('summary') or '',
                        'parameter_scores': self._resolve_parameter_scores(row),
                        'parameter_detections': self._parse_json_field(row.get('parameter_detections')),
                        'call_purpose': row.get('call_purpose') or '',
                        'objections_concerns': row.get('objections_concerns') or '',
                        'sentiment': row.get('sentiment') or '',
                        'objection_type': row.get('objection_type') or '',
                        'raw_response': self._parse_json_field(row.get('raw_response')),
                    } for row in calls
                ]
            }

    # ========================================================================
    # CALL OPERATIONS
    # ========================================================================

    def get_calls(self, bid, filters=None, limit=100, offset=0, scope_where=None, scope_params=None):
        """Get calls with optional filtering"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            table_name = f"{bid}_calls"
            if not self._table_exists(cursor, table_name):
                return self._get_raw_calls_list(cursor, bid, filters, limit, offset, scope_where, scope_params)

            # Build WHERE clause
            where_clauses = []
            params = []

            if filters:
                if 'status' in filters and filters['status'] is not None:
                    where_clauses.append("status = %s")
                    params.append(filters['status'])

                if 'sales_intent' in filters:
                    where_clauses.append("sales_intent = %s")
                    params.append(filters['sales_intent'])

                if 'date_from' in filters:
                    where_clauses.append("call_starttime >= %s")
                    params.append(filters['date_from'])

                if 'date_to' in filters:
                    where_clauses.append("call_starttime <= %s")
                    params.append(filters['date_to'])

            where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""

            query = f"""
                SELECT * FROM `{table_name}`
                {where_sql}
                ORDER BY call_starttime DESC, call_endtime DESC
                LIMIT %s OFFSET %s
            """

            params.extend([limit, offset])
            cursor.execute(query, params)
            calls = cursor.fetchall()

            return [self._format_call_record(call) for call in calls]

    def get_calls_count(self, bid, filters=None, scope_where=None, scope_params=None):
        """Get total count of calls matching filters"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            table_name = f"{bid}_calls"
            if not self._table_exists(cursor, table_name):
                return self._get_raw_calls_count(cursor, bid, filters, scope_where, scope_params)

            where_clauses = []
            params = []

            if filters:
                if 'status' in filters and filters['status'] is not None:
                    where_clauses.append("status = %s")
                    params.append(filters['status'])

                if 'sales_intent' in filters:
                    where_clauses.append("sales_intent = %s")
                    params.append(filters['sales_intent'])

                if 'date_from' in filters:
                    where_clauses.append("call_starttime >= %s")
                    params.append(filters['date_from'])

                if 'date_to' in filters:
                    where_clauses.append("call_starttime <= %s")
                    params.append(filters['date_to'])

            where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""

            query = f"SELECT COUNT(*) as count FROM `{table_name}` {where_sql}"

            cursor.execute(query, params)
            result = cursor.fetchone()
            return result['count'] if result else 0

    def get_call_by_id(self, bid, callid):
        """Get specific call by ID"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            table_name = f"{bid}_calls"
            if not self._table_exists(cursor, table_name):
                return self.get_raw_call_details(bid, callid)

            query = f"SELECT * FROM `{table_name}` WHERE callid = %s LIMIT 1"
            cursor.execute(query, (callid,))
            call = cursor.fetchone()

            return self._format_call_record(call)

    def get_call_transcript(self, bid, callid):
        """Get transcript from sarvamresponse table"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            table_name = f"{bid}_sarvamresponse"

            # Check if table exists
            cursor.execute(f"SHOW TABLES LIKE '{table_name}'")
            if not cursor.fetchone():
                return None

            query = f"""
                SELECT transcript, language, raw_response, speaker_segments, num_speakers, duration
                FROM `{table_name}`
                WHERE callid = %s
                ORDER BY created_at DESC
                LIMIT 1
            """
            cursor.execute(query, (callid,))
            return cursor.fetchone()

    def get_recent_calls(self, bid, limit=10, scope_where=None, scope_params=None):
        """Get most recent calls"""
        return self.get_calls(bid, filters=None, limit=limit, offset=0, scope_where=scope_where, scope_params=scope_params)

    def search_calls(self, bid, query, limit=50, scope_where=None, scope_params=None):
        """Search calls by query string"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            table_name = f"{bid}_calls"
            if not self._table_exists(cursor, table_name):
                return self._search_raw_calls(cursor, bid, query, limit, scope_where, scope_params)

            search_query = f"""
                SELECT * FROM `{table_name}`
                WHERE
                    callid LIKE %s OR
                    customer_name LIKE %s OR
                    agent_name LIKE %s OR
                    summary LIKE %s OR
                    transcripts LIKE %s
                ORDER BY call_starttime DESC, call_endtime DESC
                LIMIT %s
            """

            search_term = f"%{query}%"
            cursor.execute(search_query, (search_term, search_term, search_term,
                                         search_term, search_term, limit))
            calls = cursor.fetchall()

            return [self._format_call_record(call) for call in calls]

    def update_call(self, bid, callid, data):
        """Update call record"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            table_name = f"{bid}_calls"

            # Build UPDATE query dynamically based on provided data
            set_clauses = []
            params = []

            for key, value in data.items():
                if key in ['keywords', 'sentiments', 'emotions', 'customer_details']:
                    # Convert dict/list to JSON string
                    value = json.dumps(value) if value else None

                set_clauses.append(f"{key} = %s")
                params.append(value)

            if not set_clauses:
                return False

            set_clauses.append("updated_at = %s")
            params.append(datetime.now())

            params.append(callid)

            query = f"""
                UPDATE `{table_name}`
                SET {', '.join(set_clauses)}
                WHERE callid = %s
            """

            cursor.execute(query, params)
            return cursor.rowcount > 0

    def save_conversation_summary(self, bid, callid, transfer_reason):
        """Save conversation summary to call_history table"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            table_name = f"{bid}_call_history"

            # Check if table exists
            cursor.execute(f"SHOW TABLES LIKE '{table_name}'")
            if not cursor.fetchone():
                logger.warning(f"Table {table_name} does not exist")
                return False

            query = f"""
                INSERT INTO `{table_name}`
                (business_id, callid, transfer_reason, created_at)
                VALUES (%s, %s, %s, %s)
            """

            cursor.execute(query, (
                bid,
                callid,
                json.dumps(transfer_reason) if transfer_reason else None,
                datetime.now()
            ))

            return cursor.rowcount > 0

    # ========================================================================
    # TRANSCRIPT OPERATIONS
    # ========================================================================

    def get_transcripts(self, bid):
        """Get all transcripts with metadata from sarvamresponse table"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            # Check if sarvamresponse table exists
            sarvam_table = f"{bid}_sarvamresponse"
            cursor.execute(f"SHOW TABLES LIKE '{sarvam_table}'")
            if not cursor.fetchone():
                logger.warning(f"Table {sarvam_table} does not exist")
                return []

            # Get all transcripts with their metadata
            query = f"""
                SELECT
                    s.callid as transcript_id,
                    s.callid,
                    s.transcript as full_transcript,
                    s.speaker_segments,
                    s.num_speakers,
                    s.duration,
                    s.language,
                    s.created_at,
                    s.updated_at,
                    c.call_starttime,
                    c.call_endtime,
                    c.customer_callinfo,
                    c.agentname,
                    TIMESTAMPDIFF(SECOND, c.call_starttime, c.call_endtime) as duration_seconds,
                    CASE
                        WHEN s.transcript IS NOT NULL AND s.transcript != '' THEN TRUE
                        ELSE FALSE
                    END as stored_in_vectordb
                FROM `{sarvam_table}` s
                LEFT JOIN `{bid}_calls` c ON s.callid = c.callid
                ORDER BY s.created_at DESC
            """

            try:
                cursor.execute(query)
                transcripts = cursor.fetchall()

                # Process each transcript to add computed fields
                result = []
                for t in transcripts:
                    transcript_text = t.get('full_transcript', '')

                    # Use actual speaker data from diarization if available
                    num_speakers = t.get('num_speakers')
                    if num_speakers is None:
                        # Fallback estimation
                        num_speakers = 2 if transcript_text else 0

                    # Calculate segments from speaker_segments JSON if available
                    speaker_segments = t.get('speaker_segments')
                    if speaker_segments:
                        try:
                            import json
                            segments_data = json.loads(speaker_segments) if isinstance(speaker_segments, str) else speaker_segments
                            num_segments = len(segments_data) if segments_data else 0
                        except:
                            # Fallback: estimate segments by line breaks
                            num_segments = len([line for line in transcript_text.split('\n') if line.strip()]) if transcript_text else 0
                    else:
                        # Fallback: estimate segments by line breaks
                        num_segments = len([line for line in transcript_text.split('\n') if line.strip()]) if transcript_text else 0

                    # Create filename from callid and timestamp
                    customer_info = t.get('customer_callinfo', 'unknown')
                    filename = f"Call_{t.get('callid', 'unknown')}_{str(customer_info).replace(' ', '_')}.txt"

                    result.append({
                        'transcript_id': t.get('transcript_id'),
                        'callid': t.get('callid'),
                        'filename': filename,
                        'full_transcript': transcript_text,
                        'language': t.get('language'),
                        'num_speakers': num_speakers,
                        'num_segments': num_segments,
                        'duration': t.get('duration') or t.get('duration_seconds', 0),  # Use sarvam duration if available, else calculated
                        'created_at': t.get('created_at').isoformat() if t.get('created_at') else None,
                        'updated_at': t.get('updated_at').isoformat() if t.get('updated_at') else None,
                        'stored_in_vectordb': bool(t.get('stored_in_vectordb')),
                        'customer_name': customer_info,
                        'agent_name': t.get('agentname')
                    })

                return result

            except Exception as e:
                logger.error(f"Error fetching transcripts: {e}")
                return []

    def save_call_analytics(self, bid, callid, analytics_data):
        """Save call analytics to {bid}_callanalytics table"""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT 1
                FROM information_schema.columns
                WHERE table_schema = DATABASE()
                  AND table_name = %s
                  AND column_name = 'call_starttime'
                LIMIT 1
                """,
                (f"{bid}_callanalytics",),
            )
            has_call_starttime_col = cursor.fetchone() is not None

            if has_call_starttime_col:
                query = f"""
                    INSERT INTO `{bid}_callanalytics` 
                    (callid, bid, summary, call_purpose, objections_concerns, objection_type,
                     quality_score, sentiment, analysis_model, raw_response,
                     parameter_scores, parameter_detections, total_possible_score,
                     parameters_not_applicable, talk_listen_ratio, agent_talk_time,
                     customer_talk_time, dead_air_percentage, agent_speak_percentage,
                     customer_speak_percentage, talk_listen_assessment, call_starttime)
                    SELECT
                        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 
                        call_starttime 
                    FROM `{bid}_raw_calls` WHERE callid = %s
                    ON DUPLICATE KEY UPDATE
                        summary = VALUES(summary),
                        call_purpose = VALUES(call_purpose),
                        objections_concerns = VALUES(objections_concerns),
                        objection_type = VALUES(objection_type),
                        quality_score = VALUES(quality_score),
                        sentiment = VALUES(sentiment),
                        analysis_model = VALUES(analysis_model),
                        raw_response = VALUES(raw_response),
                        parameter_scores = VALUES(parameter_scores),
                        parameter_detections = VALUES(parameter_detections),
                        total_possible_score = VALUES(total_possible_score),
                        parameters_not_applicable = VALUES(parameters_not_applicable),
                        talk_listen_ratio = VALUES(talk_listen_ratio),
                        agent_talk_time = VALUES(agent_talk_time),
                        customer_talk_time = VALUES(customer_talk_time),
                        dead_air_percentage = VALUES(dead_air_percentage),
                        agent_speak_percentage = VALUES(agent_speak_percentage),
                        customer_speak_percentage = VALUES(customer_speak_percentage),
                        talk_listen_assessment = VALUES(talk_listen_assessment),
                        updated_at = CURRENT_TIMESTAMP
                """
            else:
                query = f"""
                    INSERT INTO `{bid}_callanalytics` 
                    (callid, bid, summary, call_purpose, objections_concerns, objection_type,
                     quality_score, sentiment, analysis_model, raw_response,
                     parameter_scores, parameter_detections, total_possible_score,
                     parameters_not_applicable, talk_listen_ratio, agent_talk_time,
                     customer_talk_time, dead_air_percentage, agent_speak_percentage,
                     customer_speak_percentage, talk_listen_assessment)
                    VALUES
                    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON DUPLICATE KEY UPDATE
                        summary = VALUES(summary),
                        call_purpose = VALUES(call_purpose),
                        objections_concerns = VALUES(objections_concerns),
                        objection_type = VALUES(objection_type),
                        quality_score = VALUES(quality_score),
                        sentiment = VALUES(sentiment),
                        analysis_model = VALUES(analysis_model),
                        raw_response = VALUES(raw_response),
                        parameter_scores = VALUES(parameter_scores),
                        parameter_detections = VALUES(parameter_detections),
                        total_possible_score = VALUES(total_possible_score),
                        parameters_not_applicable = VALUES(parameters_not_applicable),
                        talk_listen_ratio = VALUES(talk_listen_ratio),
                        agent_talk_time = VALUES(agent_talk_time),
                        customer_talk_time = VALUES(customer_talk_time),
                        dead_air_percentage = VALUES(dead_air_percentage),
                        agent_speak_percentage = VALUES(agent_speak_percentage),
                        customer_speak_percentage = VALUES(customer_speak_percentage),
                        talk_listen_assessment = VALUES(talk_listen_assessment),
                        updated_at = CURRENT_TIMESTAMP
                """

            query_params = [
                callid,
                analytics_data.get('bid', '7987'),
                analytics_data.get('summary'),
                analytics_data.get('call_purpose'),
                analytics_data.get('objections_concerns'),
                analytics_data.get('objection_type', 'None'),
                analytics_data.get('quality_score'),
                analytics_data.get('sentiment'),
                analytics_data.get('analysis_model', 'aws-nova'),
                json.dumps(analytics_data.get('raw_response')) if analytics_data.get('raw_response') else None,
                analytics_data.get('parameter_scores'),
                analytics_data.get('parameter_detections'),
                analytics_data.get('total_possible_score'),
                analytics_data.get('parameters_not_applicable'),
                analytics_data.get('talk_listen_ratio'),
                analytics_data.get('agent_talk_time'),
                analytics_data.get('customer_talk_time'),
                analytics_data.get('dead_air_percentage'),
                analytics_data.get('agent_speak_percentage'),
                analytics_data.get('customer_speak_percentage'),
                analytics_data.get('talk_listen_assessment'),
            ]
            if has_call_starttime_col:
                query_params.append(callid)
            cursor.execute(query, tuple(query_params))
            cursor.execute(
                f"UPDATE `{bid}_raw_calls` SET status = 3 WHERE callid = %s",
                (callid,),
            )

            conn.commit()
            analytics_id = cursor.lastrowid

            # Save classified objections if present
            classified_objections = analytics_data.get('classified_objections', [])
            if classified_objections:
                self._save_classified_objections(bid, callid, classified_objections, cursor, conn)

            return analytics_id

    def _table_has_column(self, cursor, table_name: str, column_name: str) -> bool:
        cursor.execute(
            """
            SELECT 1
            FROM information_schema.columns
            WHERE table_schema = DATABASE()
              AND table_name = %s
              AND column_name = %s
            LIMIT 1
            """,
            (table_name, column_name),
        )
        return cursor.fetchone() is not None

    def ensure_propensity_analytics_columns(self, bid: str) -> None:
        """Add propensity columns to {bid}_callanalytics when missing (lazy migration)."""
        table = f"{bid}_callanalytics"
        columns = {
            "propensity_score": "INT DEFAULT NULL",
            "propensity_parameter_scores": "JSON DEFAULT NULL",
            "propensity_parameter_detections": "JSON DEFAULT NULL",
            "propensity_band": "VARCHAR(16) DEFAULT NULL",
            "propensity_model": "VARCHAR(64) DEFAULT NULL",
        }
        with self.get_connection() as conn:
            cursor = conn.cursor()
            for col, col_type in columns.items():
                if not self._table_has_column(cursor, table, col):
                    cursor.execute(f"ALTER TABLE `{table}` ADD COLUMN `{col}` {col_type}")
            conn.commit()

    def save_propensity_analytics(self, bid: str, callid: str, data: dict) -> bool:
        """Update propensity fields on an existing analytics row (does not touch quality fields)."""
        if not data:
            return False
        self.ensure_propensity_analytics_columns(bid)
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"""
                UPDATE `{bid}_callanalytics`
                SET propensity_score = %s,
                    propensity_parameter_scores = %s,
                    propensity_parameter_detections = %s,
                    propensity_band = %s,
                    propensity_model = %s,
                    updated_at = CURRENT_TIMESTAMP
                WHERE callid = %s
                """,
                (
                    data.get("propensity_score"),
                    data.get("propensity_parameter_scores"),
                    data.get("propensity_parameter_detections"),
                    data.get("propensity_band"),
                    data.get("propensity_model"),
                    callid,
                ),
            )
            conn.commit()
            return cursor.rowcount > 0

    def _avg_propensity_sql(self, cursor, bid: str) -> str:
        table = f"{bid}_callanalytics"
        if self._table_has_column(cursor, table, "propensity_score"):
            return (
                ", AVG(CASE WHEN a.propensity_score IS NOT NULL "
                "THEN a.propensity_score ELSE NULL END) as avg_propensity_score"
            )
        return ", NULL as avg_propensity_score"

    def ensure_bant_table(self, bid):
        """Make sure the per-business ``{bid}_bant`` table exists.

        Issues ``CREATE TABLE IF NOT EXISTS`` and commits, so repeated calls
        leave an existing table untouched.

        Args:
            bid: Business id used as the table-name prefix.
        """
        with self.get_connection() as conn:
            create_sql = f"""
                CREATE TABLE IF NOT EXISTS `{bid}_bant` (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    callid VARCHAR(255) UNIQUE,
                    bid VARCHAR(50),
                    profile_json LONGTEXT,
                    profile_summary TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    INDEX (callid)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
            """
            conn.cursor().execute(create_sql)
            conn.commit()

    def save_bant_analysis(self, bid, callid, profile, summary):
        """Insert or update the BANT profile of a call in ``{bid}_bant``.

        The row is keyed by ``callid``; an existing row has its
        ``profile_json``, ``profile_summary`` and ``updated_at`` overwritten.

        Args:
            bid: Business id; used as the table-name prefix and stored in the
                ``bid`` column.
            callid: Call the profile belongs to.
            profile: JSON-serializable profile object, or a falsy value.
            summary: Text summary, or a falsy value.

        Returns:
            None. Nothing is written when both ``profile`` and ``summary`` are
            falsy.
        """
        if not profile and not summary:
            return

        self.ensure_bant_table(bid)

        profile_json = None
        if profile:
            try:
                profile_json = json.dumps(profile)
            except Exception:
                # Best effort: the summary is still stored, but the failure is
                # logged so a NULL profile_json can be traced to its cause.
                logger.warning(
                    "Could not serialize BANT profile for call %s; storing NULL profile_json",
                    callid,
                    exc_info=True,
                )
                profile_json = None

        with self.get_connection() as conn:
            cursor = conn.cursor()
            query = f"""
                INSERT INTO `{bid}_bant` (callid, bid, profile_json, profile_summary)
                VALUES (%s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    profile_json = VALUES(profile_json),
                    profile_summary = VALUES(profile_summary),
                    updated_at = CURRENT_TIMESTAMP
            """
            cursor.execute(query, (
                callid,
                bid,
                profile_json,
                summary
            ))
            conn.commit()

    def get_bant_analysis(self, bid, callid):
        """Load the stored BANT profile and summary for one call.

        Args:
            bid: Business id used as the table-name prefix.
            callid: Call to look up.

        Returns:
            A dict with ``profile`` (``profile_json`` passed through
            ``_parse_json_field``) and ``summary`` (empty string when unset),
            or None when the table could not be ensured or no row matches.
        """
        try:
            self.ensure_bant_table(bid)
        except Exception:
            return None

        with self.get_connection() as conn:
            cur = conn.cursor()
            lookup_sql = f"""
                SELECT profile_json, profile_summary
                FROM `{bid}_bant`
                WHERE callid = %s
                LIMIT 1
            """
            cur.execute(lookup_sql, (callid,))
            row = cur.fetchone()
            if row:
                parsed_profile = self._parse_json_field(row.get('profile_json'))
                summary_text = row.get('profile_summary') or ''
                return {'profile': parsed_profile, 'summary': summary_text}
            return None

    def _save_classified_objections(self, bid, callid, classified_objections, cursor, conn):
        """Replace the ``call_objections`` rows stored for one call.

        Existing rows for ``(bid, callid)`` are deleted, then one row is
        inserted per entry of ``classified_objections``. The change is
        committed on every successful path, including the delete-only case
        where the new list is empty, so a non-autocommit connection does not
        keep stale objections.

        Failures are logged and swallowed: objection persistence is best
        effort and must not abort the caller.

        Args:
            bid: Business id stored in the ``bid`` column.
            callid: Call whose objections are replaced.
            classified_objections: Iterable of mappings read for
                ``classification_id``, ``objection_text`` and ``confidence``
                (defaults to ``'medium'``); may be empty or None.
            cursor: Open cursor used for the statements.
            conn: Connection the cursor belongs to; used to commit.
        """
        try:
            # Delete existing objections for this call
            delete_query = """
                DELETE FROM call_objections
                WHERE bid = %s AND callid = %s
            """
            cursor.execute(delete_query, (bid, callid))

            rows = []
            if classified_objections:
                insert_query = """
                    INSERT INTO call_objections
                    (bid, callid, classification_id, objection_text, confidence, created_at)
                    VALUES (%s, %s, %s, %s, %s, NOW())
                """
                rows = [
                    (
                        bid,
                        callid,
                        objection.get('classification_id'),
                        objection.get('objection_text'),
                        objection.get('confidence', 'medium'),
                    )
                    for objection in classified_objections
                ]
                cursor.executemany(insert_query, rows)

            # Commit even when nothing was inserted so the DELETE is persisted.
            conn.commit()

            if rows:
                logger.info("Saved %d classified objections for call %s", len(rows), callid)

        except Exception as e:
            logger.error("Error saving classified objections: %s", e)
            # Don't raise - continue even if objection saving fails

    def get_call_analytics(self, bid, callid):
        """Fetch the analytics row stored for a single call.

        Args:
            bid: Business id used as the table-name prefix.
            callid: Call to look up.

        Returns:
            The matching row of ``{bid}_callanalytics`` with a non-empty
            ``raw_response`` passed through ``_parse_json_field``, or whatever
            falsy value ``fetchone`` produced when nothing matched.
        """
        with self.get_connection() as conn:
            cur = conn.cursor()

            lookup_sql = f"""
                SELECT * FROM `{bid}_callanalytics`
                WHERE callid = %s
                LIMIT 1
            """

            cur.execute(lookup_sql, (callid,))
            row = cur.fetchone()

            if not row:
                return row

            raw_payload = row.get('raw_response')
            if raw_payload:
                row['raw_response'] = self._parse_json_field(raw_payload)
            return row

    def get_calls_for_analysis(self, bid, limit=10):
        """List answered calls that have a transcript but no analytics row yet.

        Args:
            bid: Business id used as the table-name prefix.
            limit: Maximum number of calls returned (most recent start time
                first).

        Returns:
            Rows joining ``{bid}_raw_calls`` with ``{bid}_sarvamresponse``;
            a non-empty ``speaker_segments`` value is passed through
            ``_parse_json_field``.
        """
        with self.get_connection() as conn:
            cur = conn.cursor()

            pending_sql = f"""
                SELECT
                    r.callid,
                    r.bid,
                    r.agentname,
                    r.groupname,
                    r.call_starttime,
                    r.call_endtime,
                    s.transcript,
                    s.speaker_segments,
                    s.num_speakers,
                    s.duration
                FROM `{bid}_raw_calls` r
                INNER JOIN `{bid}_sarvamresponse` s ON r.callid = s.callid
                LEFT JOIN `{bid}_callanalytics` a ON r.callid = a.callid
                WHERE r.call_status = 'ANSWER'
                  AND s.transcript IS NOT NULL
                  AND s.transcript != ''
                  AND a.callid IS NULL
                ORDER BY r.call_starttime DESC
                LIMIT %s
            """

            cur.execute(pending_sql, (limit,))
            calls = cur.fetchall()

            # Decode the stored segment payload in place for each call.
            for call in calls:
                segments = call.get('speaker_segments')
                if segments:
                    call['speaker_segments'] = self._parse_json_field(segments)

            return calls

    def get_analytics_overview(self, bid, groupname=None, date_from=None, date_to=None, scope_where=None, scope_params=None):
        """Return headline analytics totals for a business.

        The filters are forwarded to ``_analytics_where``. The result row
        carries the analyzed-call count, average quality score, average
        propensity score (NULL when the column is missing), per-sentiment call
        counts and the number of distinct locations.

        Returns:
            The single aggregate row, or an empty dict when none is returned.
        """
        with self.get_connection() as conn:
            cur = conn.cursor()

            filter_sql, filter_args = self._analytics_where(groupname, date_from, date_to, scope_where, scope_params)

            # Comma-prefixed fragment; see the select list below.
            propensity_column = self._avg_propensity_sql(cur, bid)

            overview_sql = f"""
                SELECT
                    COUNT(DISTINCT a.callid) as total_analyzed_calls,
                    AVG(a.quality_score) as avg_quality_score
                    {propensity_column},
                    SUM(CASE WHEN a.sentiment = 'positive' THEN 1 ELSE 0 END) as positive_calls,
                    SUM(CASE WHEN a.sentiment = 'neutral' THEN 1 ELSE 0 END) as neutral_calls,
                    SUM(CASE WHEN a.sentiment = 'negative' THEN 1 ELSE 0 END) as negative_calls,
                    COUNT(DISTINCT r.groupname) as total_locations
                FROM `{bid}_callanalytics` a
                INNER JOIN `{bid}_raw_calls` r ON a.callid = r.callid
                {filter_sql}
            """

            cur.execute(overview_sql, filter_args)
            return cur.fetchone() or {}

    def get_sentiment_by_location(self, bid, groupname=None, date_from=None, date_to=None, scope_where=None, scope_params=None):
        """Count analyzed calls per location and sentiment.

        Rows without a sentiment are excluded; the remaining filters are
        forwarded to ``_analytics_where``.

        Returns:
            Rows with ``location``, ``sentiment`` and ``count``, ordered by
            location then sentiment.
        """
        required = ["a.sentiment IS NOT NULL"]
        with self.get_connection() as conn:
            cur = conn.cursor()

            filter_sql, filter_args = self._analytics_where(
                groupname, date_from, date_to, scope_where, scope_params, base_conditions=required
            )

            sentiment_sql = f"""
                SELECT
                    r.groupname as location,
                    a.sentiment,
                    COUNT(*) as count
                FROM `{bid}_callanalytics` a
                INNER JOIN `{bid}_raw_calls` r ON a.callid = r.callid
                {filter_sql}
                GROUP BY r.groupname, a.sentiment
                ORDER BY r.groupname, a.sentiment
            """

            cur.execute(sentiment_sql, filter_args)
            return cur.fetchall()

    def get_quality_by_location(self, bid, groupname=None, date_from=None, date_to=None, scope_where=None, scope_params=None):
        """Summarize quality scores per location.

        The filters are forwarded to ``_analytics_where``.

        Returns:
            Rows with ``location``, average/min/max quality score and
            ``call_count``, best average first.
        """
        with self.get_connection() as conn:
            cur = conn.cursor()

            filter_sql, filter_args = self._analytics_where(groupname, date_from, date_to, scope_where, scope_params)

            quality_sql = f"""
                SELECT
                    r.groupname as location,
                    AVG(a.quality_score) as avg_quality_score,
                    MIN(a.quality_score) as min_quality_score,
                    MAX(a.quality_score) as max_quality_score,
                    COUNT(*) as call_count
                FROM `{bid}_callanalytics` a
                INNER JOIN `{bid}_raw_calls` r ON a.callid = r.callid
                {filter_sql}
                GROUP BY r.groupname
                ORDER BY avg_quality_score DESC
            """

            cur.execute(quality_sql, filter_args)
            return cur.fetchall()

    def get_quality_by_agent(self, bid, groupname=None, date_from=None, date_to=None, scope_where=None, scope_params=None):
        """Summarize quality scores for the six best-scoring agents.

        The agent label is the trimmed ``agentname``, falling back to the
        trimmed ``agent_callinfo`` and finally ``'Unknown'``. The filters are
        forwarded to ``_analytics_where``.

        Returns:
            Up to six rows with ``agent``, ``agent_display``, average/min/max
            quality score and ``call_count``, best average first.
        """
        with self.get_connection() as conn:
            cur = conn.cursor()

            filter_sql, filter_args = self._analytics_where(groupname, date_from, date_to, scope_where, scope_params)

            agent_sql = f"""
                SELECT
                    COALESCE(NULLIF(TRIM(r.agentname), ''), NULLIF(TRIM(r.agent_callinfo), ''), 'Unknown') as agent,
                    COALESCE(NULLIF(TRIM(r.agentname), ''), NULLIF(TRIM(r.agent_callinfo), ''), 'Unknown') as agent_display,
                    AVG(a.quality_score) as avg_quality_score,
                    MIN(a.quality_score) as min_quality_score,
                    MAX(a.quality_score) as max_quality_score,
                    COUNT(*) as call_count
                FROM `{bid}_callanalytics` a
                INNER JOIN `{bid}_raw_calls` r ON a.callid = r.callid
                {filter_sql}
                GROUP BY agent, agent_display
                ORDER BY avg_quality_score DESC
                LIMIT 6
            """

            cur.execute(agent_sql, filter_args)
            return cur.fetchall()

    def get_agent_leaderboard(self, bid, groupname=None, date_from=None, date_to=None, scope_where=None, scope_params=None):
        """Build the ranked per-agent quality leaderboard.

        Only calls with a quality score are considered. Agents are ranked by
        average score (rank 1 is the best); calls are bucketed as high
        (>= 85), mid (60 to < 85) and low (< 60).

        Returns:
            A list of dicts with ``rank``, ``agent_name``, ``avg_score``,
            ``min_score``, ``max_score`` (floats), ``total_calls``,
            ``high_calls``, ``mid_calls``, ``low_calls`` (ints) and
            ``last_call`` (ISO string, or the stringified value / empty string
            when it has no ``isoformat``).
        """
        where, params = self._analytics_where(
            groupname,
            date_from,
            date_to,
            scope_where,
            scope_params,
            base_conditions=["a.quality_score IS NOT NULL"],
            date_on_datetime=True,
        )

        leaderboard_sql = f"""
            SELECT
                COALESCE(NULLIF(r.agentname, ''), 'Unknown') AS agent_name,
                ROUND(AVG(a.quality_score), 1)  AS avg_score,
                ROUND(MIN(a.quality_score), 1)  AS min_score,
                ROUND(MAX(a.quality_score), 1)  AS max_score,
                COUNT(*)                         AS total_calls,
                SUM(CASE WHEN a.quality_score >= 85 THEN 1 ELSE 0 END) AS high_calls,
                SUM(CASE WHEN a.quality_score >= 60
                          AND a.quality_score < 85 THEN 1 ELSE 0 END)  AS mid_calls,
                SUM(CASE WHEN a.quality_score < 60  THEN 1 ELSE 0 END) AS low_calls,
                MAX(r.call_starttime)            AS last_call
            FROM `{bid}_callanalytics` a
            INNER JOIN `{bid}_raw_calls` r ON a.callid = r.callid
            {where}
            GROUP BY agent_name
            ORDER BY avg_score DESC
        """

        with self.get_connection() as conn:
            cur = conn.cursor()
            cur.execute(leaderboard_sql, params)
            agent_rows = cur.fetchall() or []

        def _format_last_call(value):
            # Date-like values become ISO strings; anything else is stringified.
            if hasattr(value, "isoformat"):
                return value.isoformat()
            return str(value or "")

        score_keys = ("avg_score", "min_score", "max_score")
        count_keys = ("total_calls", "high_calls", "mid_calls", "low_calls")

        return [
            {
                "rank": position,
                "agent_name": row["agent_name"],
                **{key: float(row[key] or 0) for key in score_keys},
                **{key: int(row[key] or 0) for key in count_keys},
                "last_call": _format_last_call(row.get("last_call")),
            }
            for position, row in enumerate(agent_rows, 1)
        ]

    def get_call_purpose_frequency(self, bid, groupname=None, date_from=None, date_to=None, scope_where=None, scope_params=None):
        """Count analyzed calls per call purpose.

        Rows with a NULL or empty ``call_purpose`` are excluded; the remaining
        filters are forwarded to ``_analytics_where``.

        Returns:
            Up to 20 rows with ``call_purpose`` and ``count``, most frequent
            first.
        """
        required = ["a.call_purpose IS NOT NULL", "a.call_purpose != ''"]
        with self.get_connection() as conn:
            cur = conn.cursor()

            filter_sql, filter_args = self._analytics_where(
                groupname,
                date_from,
                date_to,
                scope_where,
                scope_params,
                base_conditions=required,
            )

            purpose_sql = f"""
                SELECT
                    a.call_purpose,
                    COUNT(*) as count
                FROM `{bid}_callanalytics` a
                INNER JOIN `{bid}_raw_calls` r ON a.callid = r.callid
                {filter_sql}
                GROUP BY a.call_purpose
                ORDER BY count DESC
                LIMIT 20
            """

            cur.execute(purpose_sql, filter_args)
            return cur.fetchall()

    # -------------------------------------------------------------------------
    # STT Pipeline Bid Config
    # -------------------------------------------------------------------------

    def ensure_stt_pipeline_bid_config_table(self):
        """Create the ``stt_pipeline_bid_config`` table when it is missing.

        The table is keyed by ``bid`` and holds the per-business STT pipeline
        settings (enabled flag, raw-calls column names, batch size, poll
        interval and notes).
        """
        with self.get_connection() as conn:
            create_sql = """
                CREATE TABLE IF NOT EXISTS `stt_pipeline_bid_config` (
                    `bid`               VARCHAR(100) NOT NULL,
                    `enabled`           TINYINT(1)   NOT NULL DEFAULT 0,
                    `raw_calls_id_col`  VARCHAR(100) NOT NULL DEFAULT 'id',
                    `raw_calls_url_col` VARCHAR(100) NOT NULL DEFAULT 'recording_url',
                    `batch_size`        INT          NOT NULL DEFAULT 10,
                    `poll_interval_s`   INT          NOT NULL DEFAULT 30,
                    `notes`             TEXT         NULL,
                    `created_at`        DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    `updated_at`        DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    PRIMARY KEY (`bid`)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
            """
            conn.cursor().execute(create_sql)

    def get_stt_pipeline_bid_configs(self):
        """Return every STT pipeline config row, ordered by ``bid``.

        The config table is created first when missing. An empty list is
        returned when there are no rows.
        """
        self.ensure_stt_pipeline_bid_config_table()
        with self.get_connection() as conn:
            cur = conn.cursor()
            cur.execute("SELECT * FROM stt_pipeline_bid_config ORDER BY bid")
            configs = cur.fetchall()
            return configs or []

    def get_stt_pipeline_bid_config(self, bid):
        """Return the STT pipeline config row of one business.

        The config table is created first when missing.

        Args:
            bid: Business id to look up.

        Returns:
            The matching row, or the ``fetchone`` result for "no row".
        """
        self.ensure_stt_pipeline_bid_config_table()
        with self.get_connection() as conn:
            cur = conn.cursor()
            lookup_sql = "SELECT * FROM stt_pipeline_bid_config WHERE bid = %s"
            cur.execute(lookup_sql, (bid,))
            return cur.fetchone()

    def upsert_stt_pipeline_bid_config(self, bid, data: dict):
        """Insert or update the STT pipeline config of one business.

        Only the whitelisted settings (``enabled``, ``raw_calls_id_col``,
        ``raw_calls_url_col``, ``batch_size``, ``poll_interval_s``, ``notes``)
        are taken from ``data``; other keys are ignored. Nothing is written
        when no whitelisted key is present.

        Args:
            bid: Business id (primary key of the config row).
            data: Mapping of setting name to new value.
        """
        self.ensure_stt_pipeline_bid_config_table()

        writable = {"enabled", "raw_calls_id_col", "raw_calls_url_col", "batch_size",
                    "poll_interval_s", "notes"}
        accepted = [(name, value) for name, value in data.items() if name in writable]
        if not accepted:
            return

        # Column names come from the whitelist above, so interpolating them is safe.
        names = [name for name, _ in accepted]
        quoted_columns = ", ".join(f"`{name}`" for name in names)
        placeholders = ", ".join("%s" for _ in names)
        on_duplicate = ", ".join(f"`{name}` = VALUES(`{name}`)" for name in names)
        bound_values = [bid, *(value for _, value in accepted)]

        with self.get_connection() as conn:
            cur = conn.cursor()
            cur.execute(
                f"INSERT INTO stt_pipeline_bid_config (bid, {quoted_columns}) "
                f"VALUES (%s, {placeholders}) "
                f"ON DUPLICATE KEY UPDATE {on_duplicate}",
                bound_values,
            )

    def toggle_stt_pipeline_bid(self, bid, enabled: bool):
        """Set the ``enabled`` flag of a business in the STT pipeline config.

        A config row is created for ``bid`` when none exists yet.

        Args:
            bid: Business id (primary key of the config row).
            enabled: Truthy to store 1, falsy to store 0.
        """
        self.ensure_stt_pipeline_bid_config_table()
        flag = 1 if enabled else 0
        upsert_sql = (
            "INSERT INTO stt_pipeline_bid_config (bid, enabled) VALUES (%s, %s) "
            "ON DUPLICATE KEY UPDATE enabled = VALUES(enabled)"
        )
        with self.get_connection() as conn:
            conn.cursor().execute(upsert_sql, (bid, flag))

    def get_stt_job_stats(self, bid=None):
        """Count ``stt_jobs`` rows per status.

        Args:
            bid: When truthy, only jobs of this business are counted.

        Returns:
            A dict mapping status to row count. Any failure (for example a
            missing ``stt_jobs`` table) is logged and yields an empty dict.
        """
        try:
            with self.get_connection() as conn:
                cur = conn.cursor()
                if not bid:
                    cur.execute("SELECT status, COUNT(*) as cnt FROM stt_jobs GROUP BY status")
                else:
                    cur.execute(
                        "SELECT status, COUNT(*) as cnt FROM stt_jobs WHERE bid = %s GROUP BY status",
                        (bid,),
                    )
                status_rows = cur.fetchall() or []
            counts = {}
            for status_row in status_rows:
                counts[status_row["status"]] = status_row["cnt"]
            return counts
        except Exception as exc:
            logger.warning("get_stt_job_stats failed: %s", exc)
            return {}

    def discover_stt_raw_call_bids(self):
        """Return all bids that have a ``{bid}_raw_calls`` table in this DB.

        The SQL ``LIKE`` filter is only a coarse pre-filter: ``_`` is a
        single-character wildcard there, so names such as ``9Xraw_calls`` also
        match. Each candidate is therefore re-checked against the literal
        suffix, and only the trailing suffix is removed (a bid may itself
        contain ``_raw_calls``). Tables whose name is just the suffix yield no
        bid and are skipped.

        Returns:
            List of bid strings in the order the tables were returned.
        """
        suffix = "_raw_calls"
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT TABLE_NAME FROM information_schema.TABLES "
                "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME LIKE %s",
                ("%_raw_calls",),
            )
            rows = cursor.fetchall() or []

        bids = []
        for row in rows:
            table_name = row["TABLE_NAME"]
            # Case-insensitive to mirror how LIKE matched the name.
            if not table_name.lower().endswith(suffix):
                continue
            bid = table_name[:-len(suffix)]
            if bid:
                bids.append(bid)
        return bids

    def get_concerns_frequency(self, bid, groupname=None, date_from=None, date_to=None, scope_where=None, scope_params=None):
        """Count analyzed calls per objection/concern text and objection type.

        Rows whose ``objections_concerns`` is NULL, empty, ``'None identified'``
        or ``'None'`` are excluded. Optional filters narrow by location, call
        start date range and the caller-supplied scope.

        Returns:
            Up to 20 rows with ``objections_concerns``, ``objection_type`` and
            ``count``, most frequent first.
        """
        with self.get_connection() as conn:
            cur = conn.cursor()

            conditions = [
                "a.objections_concerns IS NOT NULL",
                "a.objections_concerns != ''",
                "a.objections_concerns != 'None identified'",
                "a.objections_concerns != 'None'",
            ]
            params = []

            # Each optional filter contributes one clause and one bound value.
            optional_filters = (
                (groupname, "r.groupname = %s"),
                (date_from, "DATE(r.call_starttime) >= %s"),
                (date_to, "DATE(r.call_starttime) <= %s"),
            )
            for value, clause in optional_filters:
                if value:
                    conditions.append(clause)
                    params.append(value)

            self._append_scope_filter(conditions, params, scope_where, scope_params)
            where_clause = "WHERE " + " AND ".join(conditions)

            concerns_sql = f"""
                SELECT
                    a.objections_concerns,
                    a.objection_type,
                    COUNT(*) as count
                FROM `{bid}_callanalytics` a
                INNER JOIN `{bid}_raw_calls` r ON a.callid = r.callid
                {where_clause}
                GROUP BY a.objections_concerns, a.objection_type
                ORDER BY count DESC
                LIMIT 20
            """

            cur.execute(concerns_sql, params)
            return cur.fetchall()

    def get_busy_locations(self, bid, groupname=None, date_from=None, date_to=None, scope_where=None, scope_params=None):
        """Rank locations by the number of analyzed calls.

        Every raw call counts towards ``total_calls``; calls with an analytics
        row count towards ``analyzed_calls``. Optional filters narrow by
        location, call start date range and the caller-supplied scope.

        Returns:
            Up to 20 rows with ``location``, ``analyzed_calls``,
            ``total_calls`` and ``analysis_percentage``, busiest first.
        """
        with self.get_connection() as conn:
            cur = conn.cursor()

            conditions = []
            params = []

            # Each optional filter contributes one clause and one bound value.
            optional_filters = (
                (groupname, "r.groupname = %s"),
                (date_from, "DATE(r.call_starttime) >= %s"),
                (date_to, "DATE(r.call_starttime) <= %s"),
            )
            for value, clause in optional_filters:
                if value:
                    conditions.append(clause)
                    params.append(value)

            self._append_scope_filter(conditions, params, scope_where, scope_params)

            where_clause = ""
            if conditions:
                where_clause = "WHERE " + " AND ".join(conditions)

            locations_sql = f"""
                SELECT
                    r.groupname as location,
                    COUNT(DISTINCT a.callid) as analyzed_calls,
                    COUNT(DISTINCT r.callid) as total_calls,
                    ROUND(COUNT(DISTINCT a.callid) * 100.0 / COUNT(DISTINCT r.callid), 2) as analysis_percentage
                FROM `{bid}_raw_calls` r
                LEFT JOIN `{bid}_callanalytics` a ON r.callid = a.callid
                {where_clause}
                GROUP BY r.groupname
                ORDER BY analyzed_calls DESC
                LIMIT 20
            """

            cur.execute(locations_sql, params)
            return cur.fetchall()

    def get_calls_by_objection(self, bid, objection, groupname=None, scope_where=None, scope_params=None):
        """List analyzed calls whose objection/concern text equals ``objection``.

        Args:
            bid: Business id used as the table-name prefix.
            objection: Exact ``objections_concerns`` value to match.
            groupname: Optional location filter.
            scope_where: Optional scope filter forwarded to
                ``_append_scope_filter``.
            scope_params: Bound values for ``scope_where``.

        Returns:
            Up to 100 rows (call id, location, purpose, sentiment, quality
            score, objection text, creation time), newest first.
        """
        with self.get_connection() as conn:
            cur = conn.cursor()

            conditions = ["a.objections_concerns = %s"]
            params = [objection]
            if groupname:
                conditions += ["r.groupname = %s"]
                params += [groupname]

            self._append_scope_filter(conditions, params, scope_where, scope_params)
            where_clause = "WHERE " + " AND ".join(conditions)

            calls_sql = f"""
                SELECT
                    a.callid,
                    r.groupname as location,
                    a.call_purpose,
                    a.sentiment,
                    a.quality_score,
                    a.objections_concerns,
                    a.created_at
                FROM `{bid}_callanalytics` a
                INNER JOIN `{bid}_raw_calls` r ON a.callid = r.callid
                {where_clause}
                ORDER BY a.created_at DESC
                LIMIT 100
            """

            cur.execute(calls_sql, params)
            return cur.fetchall()

    def delete_transcript(self, bid, callid):
        """Remove the transcript row of a call from ``{bid}_sarvamresponse``.

        Args:
            bid: Business id used as the table-name prefix.
            callid: Call whose transcript is deleted.

        Returns:
            The cursor's affected-row count for the DELETE.
        """
        with self.get_connection() as conn:
            cur = conn.cursor()
            cur.execute(f"DELETE FROM `{bid}_sarvamresponse` WHERE callid = %s", (callid,))
            conn.commit()
            deleted = cur.rowcount
            return deleted

    def reset_transcription_status(self, bid, callid):
        """Set ``transcription_status`` back to 0 for a call in ``{bid}_raw_calls``.

        Args:
            bid: Business id used as the table-name prefix.
            callid: Call whose status is reset.

        Returns:
            The cursor's affected-row count for the UPDATE.
        """
        with self.get_connection() as conn:
            cur = conn.cursor()
            cur.execute(
                f"UPDATE `{bid}_raw_calls` SET transcription_status = 0 WHERE callid = %s",
                (callid,),
            )
            conn.commit()
            updated = cur.rowcount
            return updated

    def update_speaker_segment_text(self, bid, callid, segment_index, new_text):
        """Replace the text of one speaker segment of a call's transcript.

        The stored ``speaker_segments`` value is loaded, the segment at
        ``segment_index`` gets its ``text`` replaced, and the whole list is
        written back as JSON together with a refreshed ``updated_at``.

        Args:
            bid: Business id used as the table-name prefix.
            callid: Call whose transcript is edited.
            segment_index: Zero-based position of the segment to change.
            new_text: Replacement text.

        Returns:
            True once the update has been committed.

        Raises:
            ValueError: When the call has no segments, the stored value does
                not decode to a non-empty list, or the index is out of range.
        """
        with self.get_connection() as conn:
            cur = conn.cursor()

            # Load the currently stored segments.
            cur.execute(
                f"SELECT speaker_segments FROM `{bid}_sarvamresponse` WHERE callid = %s",
                (callid,),
            )
            stored = cur.fetchone()
            if not stored or not stored['speaker_segments']:
                raise ValueError(f"No speaker segments found for call {callid}")

            segments = self._parse_json_field(stored['speaker_segments'])
            if not segments or not isinstance(segments, list):
                raise ValueError(f"Invalid speaker_segments format for call {callid}")

            total = len(segments)
            if segment_index < 0 or segment_index >= total:
                raise ValueError(f"Invalid segment index {segment_index} (total segments: {total})")

            segments[segment_index]['text'] = new_text

            # Persist the full list; segments are stored as one JSON document.
            serialized = json.dumps(segments)
            cur.execute(
                f"UPDATE `{bid}_sarvamresponse` SET speaker_segments = %s, updated_at = NOW() WHERE callid = %s",
                (serialized, callid),
            )
            conn.commit()

            logger.info(f"Updated segment {segment_index} for call {callid}")
            return True

    def get_agent_names(self, bid, groupname=None):
        """List the distinct, non-empty agent names of a business.

        Args:
            bid: Business id used as the table-name prefix.
            groupname: When truthy, only agents of this location are listed.

        Returns:
            Agent names sorted alphabetically by the database.
        """
        with self.get_connection() as conn:
            cur = conn.cursor()
            if not groupname:
                query = f"""
                    SELECT DISTINCT agentname
                    FROM `{bid}_raw_calls`
                    WHERE agentname IS NOT NULL
                    AND agentname != ''
                    ORDER BY agentname
                """
                cur.execute(query)
            else:
                query = f"""
                    SELECT DISTINCT agentname
                    FROM `{bid}_raw_calls`
                    WHERE agentname IS NOT NULL
                    AND agentname != ''
                    AND groupname = %s
                    ORDER BY agentname
                """
                cur.execute(query, (groupname,))

            return [entry['agentname'] for entry in cur.fetchall()]

    def import_raw_calls_from_excel(self, bid, file_storage):
        """Import raw calls from an Excel (.xlsx) file into ``{bid}_raw_calls``.

        The first row of the active worksheet is the header. Each header is
        normalized (lower-cased, spaces/hyphens turned into underscores, other
        characters dropped), mapped through a set of aliases and matched
        against the importable columns; unrecognized columns are ignored.
        Rows are upserted in chunks with ``INSERT ... ON DUPLICATE KEY UPDATE``.

        The ``bid`` argument is the default for the ``bid`` column: a blank
        ``bid`` cell in the sheet never replaces it with NULL. The target
        table name is built from the validated integer form of ``bid``.

        Args:
            bid: Business id; must be convertible with ``int()``.
            file_storage: Uploaded file object exposing ``filename`` and
                readable by ``openpyxl.load_workbook``.

        Returns:
            dict with ``processed`` (rows sent to the database), ``skipped``
            (non-empty rows without a callid) and ``columns`` (column names
            written, in table order).

        Raises:
            ValueError: openpyxl is unavailable, no file was given, the file
                is not ``.xlsx`` or cannot be read, ``bid`` is not numeric,
                the sheet is empty or lacks a ``callid`` column, no valid row
                exists, or the target table does not exist.
        """
        try:
            import openpyxl
        except ImportError as e:
            raise ValueError("openpyxl is required for Excel uploads") from e

        if not file_storage or not getattr(file_storage, 'filename', ''):
            raise ValueError("No file provided for upload")

        if not file_storage.filename.lower().endswith('.xlsx'):
            raise ValueError("Only .xlsx files are supported")

        try:
            bid_value = int(bid)
        except (TypeError, ValueError) as e:
            raise ValueError("Business ID must be numeric") from e

        try:
            workbook = openpyxl.load_workbook(file_storage, data_only=True)
        except Exception as e:
            raise ValueError(f"Failed to read Excel file: {e}") from e

        worksheet = workbook.active
        if worksheet is None:
            # A workbook without any worksheet has no active sheet.
            raise ValueError("Excel file is empty")

        rows_iter = worksheet.iter_rows(values_only=True)
        headers = next(rows_iter, None)
        if headers is None:
            raise ValueError("Excel file is empty")
        if not headers:
            raise ValueError("Excel file header row is missing")

        def normalize_header(value):
            if value is None:
                return ''
            header = str(value).strip().lower()
            header = header.replace(' ', '_').replace('-', '_')
            return re.sub(r'[^a-z0-9_]', '', header)

        # Table order; also the order of the generated column list.
        allowed_columns = (
            'bid',
            'callid',
            'fileurl',
            'status',
            'agentname',
            'groupname',
            'call_starttime',
            'call_endtime',
            'call_status',
            'agent_callinfo',
            'customer_callinfo',
            'direction',
            'transcription_requested',
            'transcription_status',
            'selected_for_processing',
        )
        allowed_lookup = set(allowed_columns)

        aliases = {
            'call_id': 'callid',
            'file_url': 'fileurl',
            'fileurl': 'fileurl',
            'audio_url': 'fileurl',
            'recording_url': 'fileurl',
            'recordingurl': 'fileurl',
            'filename': 'fileurl',
            'file_name': 'fileurl',
            'agent_name': 'agentname',
            'group_name': 'groupname',
            'starttime': 'call_starttime',
            'start_time': 'call_starttime',
            'call_start_time': 'call_starttime',
            'endtime': 'call_endtime',
            'end_time': 'call_endtime',
            'call_end_time': 'call_endtime',
            'dialstatus': 'call_status',
            'agent_phone': 'agent_callinfo',
            'emp_phone': 'agent_callinfo',
            'clicktocalldid': 'customer_callinfo',
        }

        # Worksheet column index -> table column name.
        header_map = {}
        for idx, raw_header in enumerate(headers):
            normalized = normalize_header(raw_header)
            if not normalized:
                continue
            column = aliases.get(normalized, normalized)
            if column in allowed_lookup:
                header_map[idx] = column

        if 'callid' not in header_map.values():
            raise ValueError("Excel must include a 'callid' column")

        def normalize_callid(value):
            if value is None:
                return None
            # Excel stores numeric ids as floats; 123.0 must become "123".
            if isinstance(value, float) and value.is_integer():
                return str(int(value))
            return str(value).strip()

        records = []
        skipped = 0
        for row in rows_iter:
            if not row or all(cell is None or str(cell).strip() == '' for cell in row):
                continue

            record = {'bid': bid_value}
            for idx, column in header_map.items():
                if idx >= len(row):
                    continue
                value = row[idx]
                if isinstance(value, str):
                    value = value.strip() or None
                if column == 'bid' and value is None:
                    # A blank bid cell must not wipe the default business id.
                    continue
                record[column] = value

            callid = normalize_callid(record.get('callid'))
            if not callid:
                skipped += 1
                continue
            record['callid'] = callid
            records.append(record)

        if not records:
            raise ValueError("No valid rows found to import")

        # Every record carries 'bid' and 'callid', so both are always present.
        present_columns = set()
        for record in records:
            present_columns.update(record)
        columns = [col for col in allowed_columns if col in present_columns]

        values = [[record.get(col) for col in columns] for record in records]

        # Use the validated integer, not the raw argument, for the table name.
        table_name = f"{bid_value}_raw_calls"
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SHOW TABLES LIKE %s", (table_name,))
            if not cursor.fetchone():
                raise ValueError(f"Table {table_name} does not exist")

            columns_sql = ", ".join(f"`{col}`" for col in columns)
            placeholders = ", ".join(["%s"] * len(columns))
            update_cols = [col for col in columns if col != 'callid']
            query = (
                f"INSERT INTO `{table_name}` ({columns_sql}) "
                f"VALUES ({placeholders})"
            )
            if update_cols:
                update_sql = ", ".join(
                    f"`{col}` = VALUES(`{col}`)" for col in update_cols
                )
                query += f" ON DUPLICATE KEY UPDATE {update_sql}"

            chunk_size = 500
            for start in range(0, len(values), chunk_size):
                cursor.executemany(query, values[start:start + chunk_size])
            # Explicit commit, matching the other write paths of this class.
            conn.commit()

        return {
            'processed': len(records),
            'skipped': skipped,
            'columns': columns
        }

    # =========================================================================
    # business_pipeline_config
    # =========================================================================

    def ensure_business_pipeline_config_table(self) -> None:
        """Create ``business_pipeline_config`` when absent, then add late columns.

        Runs a ``CREATE TABLE IF NOT EXISTS`` with the base schema and hands the
        same cursor to ``_ensure_business_pipeline_config_columns``.
        """
        create_sql = """
                CREATE TABLE IF NOT EXISTS business_pipeline_config (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(20) NOT NULL UNIQUE,
                    pipeline_enabled TINYINT(1) NOT NULL DEFAULT 1,
                    source_db_host VARCHAR(255) NOT NULL DEFAULT '',
                    source_db_port INT NOT NULL DEFAULT 3306,
                    source_db_user VARCHAR(255) NOT NULL DEFAULT '',
                    source_db_password_enc TEXT,
                    source_db_name VARCHAR(255) NOT NULL DEFAULT '',
                    stt_provider VARCHAR(50) NOT NULL DEFAULT 'sarvam',
                    stt_api_key_enc TEXT,
                    min_call_duration_s INT NOT NULL DEFAULT 120,
                    sync_batch INT NOT NULL DEFAULT 500,
                    transcribe_batch INT NOT NULL DEFAULT 3,
                    sync_interval_s INT NOT NULL DEFAULT 120,
                    lead_filter_enabled TINYINT(1) NOT NULL DEFAULT 1,
                    crm_provider VARCHAR(50) DEFAULT 'leadsquared',
                    lookback_days INT NOT NULL DEFAULT 90,
                    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """
        with self.get_connection() as db:
            cur = db.cursor()
            cur.execute(create_sql)
            # Columns introduced after the base schema are added separately.
            self._ensure_business_pipeline_config_columns(cur)

    def _ensure_business_pipeline_config_columns(self, cursor) -> None:
        cursor.execute(
            """
            SELECT column_name
            FROM information_schema.columns
            WHERE table_schema = DATABASE()
              AND table_name = 'business_pipeline_config'
            """
        )
        existing = {
            str(row.get("column_name") or row.get("COLUMN_NAME") or "").lower()
            for row in (cursor.fetchall() or [])
        }
        columns = {
            "source_type": "ALTER TABLE business_pipeline_config ADD COLUMN source_type VARCHAR(50) DEFAULT NULL",
            "source_bid": "ALTER TABLE business_pipeline_config ADD COLUMN source_bid VARCHAR(50) DEFAULT NULL",
            "source_table": "ALTER TABLE business_pipeline_config ADD COLUMN source_table VARCHAR(255) NOT NULL DEFAULT ''",
            "analysis_mode": "ALTER TABLE business_pipeline_config ADD COLUMN analysis_mode VARCHAR(50) NOT NULL DEFAULT 'default'",
            "analytics_enabled": "ALTER TABLE business_pipeline_config ADD COLUMN analytics_enabled TINYINT(1) NOT NULL DEFAULT 1",
            "processing_mode": "ALTER TABLE business_pipeline_config ADD COLUMN processing_mode VARCHAR(50) NOT NULL DEFAULT 'default'",
            "group_filter_enabled": "ALTER TABLE business_pipeline_config ADD COLUMN group_filter_enabled TINYINT(1) NOT NULL DEFAULT 0",
            "allowed_groupnames": "ALTER TABLE business_pipeline_config ADD COLUMN allowed_groupnames TEXT",
            "min_call_duration_effective_at": "ALTER TABLE business_pipeline_config ADD COLUMN min_call_duration_effective_at DATETIME DEFAULT NULL",
            "propensity_enabled": "ALTER TABLE business_pipeline_config ADD COLUMN propensity_enabled TINYINT(1) NOT NULL DEFAULT 0",
            "summary_mode": "ALTER TABLE business_pipeline_config ADD COLUMN summary_mode VARCHAR(50) NOT NULL DEFAULT 'default'",
            "summary_instructions": "ALTER TABLE business_pipeline_config ADD COLUMN summary_instructions TEXT DEFAULT NULL",
            "webhook_ingest_enabled": "ALTER TABLE business_pipeline_config ADD COLUMN webhook_ingest_enabled TINYINT(1) NOT NULL DEFAULT 0",
            "ingest_secret": "ALTER TABLE business_pipeline_config ADD COLUMN ingest_secret VARCHAR(255) NOT NULL DEFAULT ''",
        }
        for name, ddl in columns.items():
            if name not in existing:
                cursor.execute(ddl)

        # Older deployments created a stricter table with NOT NULL columns and
        # no defaults. Keep the columns compatible with partial upserts.
        relax_columns = {
            "source_table": "ALTER TABLE business_pipeline_config MODIFY COLUMN source_table VARCHAR(255) NOT NULL DEFAULT ''",
            "source_type": "ALTER TABLE business_pipeline_config MODIFY COLUMN source_type VARCHAR(50) DEFAULT NULL",
            "processing_mode": "ALTER TABLE business_pipeline_config MODIFY COLUMN processing_mode VARCHAR(50) NOT NULL DEFAULT 'default'",
        }
        for name, ddl in relax_columns.items():
            if name in existing or name in columns:
                cursor.execute(ddl)

    def get_pipeline_config(self, bid: str) -> dict | None:
        """Return the pipeline config dict for *bid*, or None if not found."""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT * FROM business_pipeline_config WHERE bid = %s",
                (str(bid),),
            )
            return cursor.fetchone()

    def ensure_min_duration_effective_at(self, bid: str):
        """
        Ensure min duration has a forward-only effective timestamp.

        When ``min_call_duration_s`` is set but ``min_call_duration_effective_at``
        is missing, stamp it to now so older calls are grandfathered.
        """
        cfg = self.get_pipeline_config(bid) or {}
        min_s = max(0, int(cfg.get("min_call_duration_s") or 0))
        if min_s <= 0:
            return cfg.get("min_call_duration_effective_at")
        existing = cfg.get("min_call_duration_effective_at")
        if existing:
            return existing
        now = datetime.now().replace(microsecond=0)
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                UPDATE business_pipeline_config
                SET min_call_duration_effective_at = %s
                WHERE bid = %s
                """,
                (now, str(bid)),
            )
            conn.commit()
        return now

    def is_webhook_ingest_enabled(self, bid: str) -> bool:
        """True only when Master Panel webhook ingest toggle is on for this BID."""
        try:
            cfg = self.get_pipeline_config(bid) or {}
            return bool(int(cfg.get("webhook_ingest_enabled") or 0))
        except Exception:
            return False

    @staticmethod
    def _decode_allowed_groupnames(value) -> list:
        if isinstance(value, list):
            return [str(v).strip() for v in value if str(v).strip()]
        if isinstance(value, str) and value.strip():
            try:
                parsed = json.loads(value)
                if isinstance(parsed, list):
                    return [str(v).strip() for v in parsed if str(v).strip()]
            except Exception:
                pass
            return [part.strip() for part in value.split(",") if part.strip()]
        return []

    def normalize_parameter_score(self, score, max_score):
        """Clamp a parameter score into ``[0, max_score]``.

        A score within ``[0, 1]`` is read as a fraction of ``max_score`` when
        ``max_score`` exceeds 1. Inputs that cannot be converted to float give
        ``0.0``; with a non-positive ``max_score`` only the lower bound applies.
        """
        try:
            value = float(score or 0)
            ceiling = float(max_score or 0)
        except (TypeError, ValueError):
            return 0.0

        if ceiling <= 0:
            return max(0.0, value)

        is_fraction = ceiling > 1 and 0 <= value <= 1
        if is_fraction:
            value *= ceiling
        return max(0.0, min(value, ceiling))

    def compute_quality_score_from_parameter_scores(self, parameter_scores):
        """Compute ``(quality_score_percent, total_possible_points)``.

        *parameter_scores* maps names to detail dicts with ``score``,
        ``max_score`` and an optional ``applicable`` flag (default True).
        Non-dict entries are ignored. Returns ``(None, None)`` when the input
        is not a non-empty dict or when no points are possible.
        """
        if not (isinstance(parameter_scores, dict) and parameter_scores):
            return None, None

        earned = 0.0
        available = 0.0
        for entry in parameter_scores.values():
            if not isinstance(entry, dict):
                continue
            # NOTE(review): entries flagged not applicable still add their
            # max_score to the denominator while earning nothing - confirm
            # this is the intended scoring rule.
            available += float(entry.get("max_score") or 0)
            if entry.get("applicable", True):
                earned += float(entry.get("score") or 0)

        if available <= 0:
            return None, None
        percent = round((earned / available) * 100, 2)
        return percent, available

    def get_enabled_pipeline_bids(self) -> list:
        """Return list of bid strings where pipeline_enabled = 1."""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT bid FROM business_pipeline_config WHERE pipeline_enabled = 1 ORDER BY bid"
            )
            rows = cursor.fetchall() or []
            return [r["bid"] for r in rows]

    @staticmethod
    def _pipeline_config_insert_defaults() -> dict:
        """Defaults for first-time INSERT when only a subset of fields is saved (e.g. summary_mode)."""
        return {
            "pipeline_enabled": 1,
            "source_db_host": "",
            "source_db_port": 3306,
            "source_db_user": "",
            "source_db_name": "",
            "stt_provider": "sarvam",
            "min_call_duration_s": 120,
            "sync_batch": 500,
            "transcribe_batch": 3,
            "sync_interval_s": 120,
            "lead_filter_enabled": 1,
            "crm_provider": "leadsquared",
            "lookback_days": 90,
            "source_table": "",
            "analysis_mode": "default",
            "analytics_enabled": 1,
            "processing_mode": "default",
            "group_filter_enabled": 0,
            "propensity_enabled": 0,
            "summary_mode": "default",
            "summary_instructions": None,
        }

    def save_pipeline_config(self, bid: str, data: dict) -> None:
        """Upsert a pipeline config row for *bid*.

        *data* may contain any subset of the table columns (except ``id``,
        ``bid``, ``created_at``, ``updated_at``).  Sensitive fields
        (``stt_api_key``, ``source_db_password``) are encrypted on-the-fly
        if passed as plain text — callers should pass the plain value under
        the key without the ``_enc`` suffix (e.g. ``stt_api_key``).
        """
        self.ensure_business_pipeline_config_table()
        row_exists = self.get_pipeline_config(bid) is not None
        row = dict(data)
        row["bid"] = str(bid)
        if not row_exists:
            merged = self._pipeline_config_insert_defaults()
            merged.update(row)
            row = merged

        if "allowed_groupnames" in row and isinstance(row["allowed_groupnames"], list):
            row["allowed_groupnames"] = json.dumps(row["allowed_groupnames"])

        if "min_call_duration_s" in row:
            existing = self.get_pipeline_config(bid) or {}
            try:
                new_min = max(0, int(row["min_call_duration_s"]))
            except (TypeError, ValueError):
                new_min = 0
            old_min_raw = existing.get("min_call_duration_s")
            try:
                old_min = max(0, int(old_min_raw)) if old_min_raw is not None else 0
            except (TypeError, ValueError):
                old_min = 0
            if new_min != old_min or not existing.get("min_call_duration_effective_at"):
                row["min_call_duration_effective_at"] = datetime.now().replace(microsecond=0)

        # Encrypt secrets if provided in plain form
        for plain_key in ("stt_api_key", "source_db_password"):
            enc_key = plain_key + "_enc"
            if plain_key in row:
                row[enc_key] = self._encrypt_text(row.pop(plain_key))

        # Remove read-only fields
        for f in ("id", "created_at", "updated_at"):
            row.pop(f, None)

        cols = list(row.keys())
        placeholders = ", ".join(["%s"] * len(cols))
        cols_sql = ", ".join(f"`{c}`" for c in cols)
        update_sql = ", ".join(
            f"`{c}` = VALUES(`{c}`)" for c in cols if c != "bid"
        )
        sql = (
            f"INSERT INTO business_pipeline_config ({cols_sql}) VALUES ({placeholders})"
            f" ON DUPLICATE KEY UPDATE {update_sql}"
        )
        with self.get_connection() as conn:
            cursor = conn.cursor()
            if row_exists:
                update_row = {k: v for k, v in row.items() if k != "bid"}
                if update_row:
                    set_sql = ", ".join(f"`{k}` = %s" for k in update_row)
                    params = list(update_row.values()) + [str(bid)]
                    cursor.execute(
                        f"UPDATE business_pipeline_config SET {set_sql} WHERE bid = %s",
                        params,
                    )
            else:
                cursor.execute(sql, [row[c] for c in cols])
            conn.commit()

    def save_summary_config(self, bid: str, summary_mode: str, summary_instructions=None) -> None:
        """Store only the call-summary settings for a BID.

        Updates ``summary_mode`` / ``summary_instructions`` on an existing row,
        or inserts a new row built from the insert defaults plus these two
        settings.

        Raises:
            ValueError: If the normalised mode is ``custom`` and no
                non-blank instructions were given.
        """
        from summary_config import normalize_summary_mode

        self.ensure_business_pipeline_config_table()
        mode = normalize_summary_mode(summary_mode)
        trimmed = (summary_instructions or "").strip()
        instructions = trimmed if trimmed else None
        if mode == "custom" and instructions is None:
            raise ValueError("summary_instructions is required when summary_mode is custom")

        with self.get_connection() as db:
            cur = db.cursor()
            if self.get_pipeline_config(bid):
                cur.execute(
                    """
                    UPDATE business_pipeline_config
                    SET summary_mode = %s, summary_instructions = %s
                    WHERE bid = %s
                    """,
                    (mode, instructions, str(bid)),
                )
            else:
                fresh = self._pipeline_config_insert_defaults()
                fresh.update(
                    bid=str(bid),
                    summary_mode=mode,
                    summary_instructions=instructions,
                )
                names = list(fresh)
                column_list = ", ".join(f"`{name}`" for name in names)
                marks = ", ".join("%s" for _ in names)
                cur.execute(
                    f"INSERT INTO business_pipeline_config ({column_list}) VALUES ({marks})",
                    [fresh[name] for name in names],
                )
            db.commit()

    def ensure_pcaa_ingest_log_table(self) -> None:
        """Create ``pcaa_ingest_log`` (call ingest webhook audit log) if missing."""
        create_sql = """
                CREATE TABLE IF NOT EXISTS pcaa_ingest_log (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(20) NOT NULL,
                    callid VARCHAR(64) DEFAULT NULL,
                    source VARCHAR(50) DEFAULT NULL,
                    payload JSON DEFAULT NULL,
                    action VARCHAR(50) NOT NULL,
                    skip_reason VARCHAR(255) DEFAULT NULL,
                    queued TINYINT(1) NOT NULL DEFAULT 0,
                    signature_valid TINYINT(1) NOT NULL DEFAULT 0,
                    received_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    INDEX idx_bid_received (bid, received_at),
                    INDEX idx_callid (callid)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                """
        with self.get_connection() as db:
            db.cursor().execute(create_sql)

    def log_call_ingest_event(
        self,
        *,
        bid: str,
        callid: Optional[str],
        source: str,
        payload: dict,
        action: str,
        reason: Optional[str],
        queued: bool,
        signature_valid: bool,
    ) -> None:
        self.ensure_pcaa_ingest_log_table()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT INTO pcaa_ingest_log
                (bid, callid, source, payload, action, skip_reason, queued, signature_valid)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                """,
                (
                    str(bid),
                    callid or None,
                    (source or None),
                    json.dumps(payload or {}),
                    action,
                    reason,
                    1 if queued else 0,
                    1 if signature_valid else 0,
                ),
            )
            conn.commit()

    def list_webhook_ingest_logs(
        self,
        *,
        bid: Optional[str] = None,
        action: Optional[str] = None,
        skip_reason: Optional[str] = None,
        signature_valid: Optional[bool] = None,
        callid: Optional[str] = None,
        date_from: Optional[str] = None,
        date_to: Optional[str] = None,
        hours: Optional[int] = None,
        limit: int = 50,
        offset: int = 0,
    ) -> Dict[str, Any]:
        """Paginated webhook ingest audit log from pcaa_ingest_log."""
        self.ensure_pcaa_ingest_log_table()
        limit = max(1, min(int(limit or 50), 500))
        offset = max(0, int(offset or 0))

        clauses: List[str] = []
        params: List[Any] = []

        if bid and str(bid).strip():
            clauses.append("bid = %s")
            params.append(str(bid).strip())
        if action and str(action).strip():
            clauses.append("action = %s")
            params.append(str(action).strip())
        if skip_reason and str(skip_reason).strip():
            sr = str(skip_reason).strip()
            clauses.append("(skip_reason = %s OR skip_reason LIKE %s)")
            params.extend([sr, f"{sr}%"])
        if signature_valid is not None:
            clauses.append("signature_valid = %s")
            params.append(1 if signature_valid else 0)
        if callid and str(callid).strip():
            clauses.append("callid LIKE %s")
            params.append(f"%{str(callid).strip()}%")
        if date_from and str(date_from).strip():
            clauses.append("received_at >= %s")
            params.append(f"{str(date_from).strip()} 00:00:00")
        if date_to and str(date_to).strip():
            clauses.append("received_at <= %s")
            params.append(f"{str(date_to).strip()} 23:59:59")
        if hours is not None and int(hours) > 0 and not date_from and not date_to:
            clauses.append("received_at >= DATE_SUB(NOW(), INTERVAL %s HOUR)")
            params.append(int(hours))

        where_sql = f" WHERE {' AND '.join(clauses)}" if clauses else ""

        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"SELECT COUNT(*) AS cnt FROM pcaa_ingest_log{where_sql}",
                params,
            )
            total = int((cursor.fetchone() or {}).get("cnt") or 0)
            cursor.execute(
                f"""
                SELECT id, bid, callid, source, action, skip_reason, queued,
                       signature_valid, received_at, payload
                FROM pcaa_ingest_log
                {where_sql}
                ORDER BY received_at DESC, id DESC
                LIMIT %s OFFSET %s
                """,
                params + [limit, offset],
            )
            rows = cursor.fetchall() or []

        logs = []
        for row in rows:
            payload_raw = row.get("payload")
            if isinstance(payload_raw, str):
                try:
                    payload_obj = json.loads(payload_raw)
                except Exception:
                    payload_obj = payload_raw
            else:
                payload_obj = payload_raw
            logs.append(
                {
                    "id": row.get("id"),
                    "bid": row.get("bid"),
                    "callid": row.get("callid"),
                    "source": row.get("source"),
                    "action": row.get("action"),
                    "skip_reason": row.get("skip_reason"),
                    "queued": bool(int(row.get("queued") or 0)),
                    "signature_valid": bool(int(row.get("signature_valid") or 0)),
                    "received_at": (
                        row.get("received_at").isoformat(sep=" ", timespec="seconds")
                        if hasattr(row.get("received_at"), "isoformat")
                        else str(row.get("received_at") or "")
                    ),
                    "payload": payload_obj,
                }
            )
        return {"logs": logs, "total": total, "limit": limit, "offset": offset}

    # =========================================================================
    # business_agent_config
    # =========================================================================

    def ensure_business_agent_config_table(self) -> None:
        """Create ``business_agent_config`` if missing (one row per bid + agent_name)."""
        create_sql = """
                CREATE TABLE IF NOT EXISTS business_agent_config (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(20) NOT NULL,
                    agent_name VARCHAR(100) NOT NULL,
                    agent_enabled TINYINT(1) NOT NULL DEFAULT 1,
                    model_provider VARCHAR(50) NOT NULL DEFAULT 'bedrock',
                    model_id VARCHAR(200) NOT NULL DEFAULT 'amazon.nova-lite-v1:0',
                    system_prompt TEXT,
                    user_prompt_template TEXT,
                    output_schema TEXT,
                    temperature FLOAT NOT NULL DEFAULT 0.1,
                    max_tokens INT NOT NULL DEFAULT 4096,
                    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uq_bid_agent (bid, agent_name)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """
        with self.get_connection() as db:
            db.cursor().execute(create_sql)

    def get_agent_configs(self, bid: str) -> list:
        """Return all agent config rows for *bid* (enabled or not), ordered by id."""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT * FROM business_agent_config WHERE bid = %s ORDER BY id",
                (str(bid),),
            )
            return cursor.fetchall() or []

    def save_agent_config(self, bid: str, data: dict) -> int:
        """Upsert an agent config; returns the row id."""
        row = dict(data)
        row["bid"] = str(bid)
        for f in ("id", "created_at", "updated_at"):
            row.pop(f, None)

        cols = list(row.keys())
        placeholders = ", ".join(["%s"] * len(cols))
        cols_sql = ", ".join(f"`{c}`" for c in cols)
        update_sql = ", ".join(
            f"`{c}` = VALUES(`{c}`)" for c in cols if c not in ("bid", "agent_name")
        )
        sql = (
            f"INSERT INTO business_agent_config ({cols_sql}) VALUES ({placeholders})"
            f" ON DUPLICATE KEY UPDATE {update_sql}"
        )
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, [row[c] for c in cols])
            return cursor.lastrowid

    def delete_agent_config(self, bid: str, agent_name: str) -> bool:
        """Delete a single agent config; returns True if a row was deleted."""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "DELETE FROM business_agent_config WHERE bid = %s AND agent_name = %s",
                (str(bid), agent_name),
            )
            return cursor.rowcount > 0

    # =========================================================================
    # {bid}_call_records — DDL
    # =========================================================================

    def ensure_call_records_table(self, bid: str) -> None:
        """Create the per-business ``{bid}_call_records`` table if missing."""
        table = f"{bid}_call_records"
        create_sql = f"""
                CREATE TABLE IF NOT EXISTS `{table}` (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(20) NOT NULL,
                    callid VARCHAR(255) NOT NULL UNIQUE,
                    file_url TEXT,
                    status ENUM(
                        'pending','transcribing','transcribed',
                        'analyzing','done','failed'
                    ) NOT NULL DEFAULT 'pending',
                    fail_stage VARCHAR(50),
                    fail_reason TEXT,
                    agent_name VARCHAR(255),
                    group_name VARCHAR(255),
                    direction VARCHAR(20),
                    call_start DATETIME,
                    call_end DATETIME,
                    call_duration_s INT,
                    call_status VARCHAR(50),
                    agent_phone VARCHAR(100),
                    customer_phone VARCHAR(100),
                    stt_provider VARCHAR(50),
                    transcript MEDIUMTEXT,
                    speaker_segments JSON,
                    duration_s FLOAT,
                    analysis JSON,
                    quality_score FLOAT,
                    summary TEXT,
                    crm_provider VARCHAR(50),
                    crm_lead_id VARCHAR(100),
                    crm_lead_name VARCHAR(255),
                    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    INDEX idx_status (status),
                    INDEX idx_customer_phone (customer_phone),
                    INDEX idx_call_start (call_start),
                    INDEX idx_agent_name (agent_name)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """
        with self.get_connection() as db:
            db.cursor().execute(create_sql)

    # =========================================================================
    # {bid}_call_records — write operations
    # =========================================================================

    def upsert_call_record(self, bid: str, record: dict) -> None:
        """Insert or update a call record.  ``callid`` is the unique key."""
        table = f"{bid}_call_records"
        row = dict(record)
        row.setdefault("bid", str(bid))

        # Serialise JSON columns
        for col in ("speaker_segments", "analysis"):
            if col in row and not isinstance(row[col], str) and row[col] is not None:
                row[col] = json.dumps(row[col])

        for f in ("id", "created_at", "updated_at"):
            row.pop(f, None)

        cols = list(row.keys())
        placeholders = ", ".join(["%s"] * len(cols))
        cols_sql = ", ".join(f"`{c}`" for c in cols)
        # On duplicate key, don't overwrite transcript/analysis if already set
        update_parts = []
        for c in cols:
            if c in ("callid", "bid"):
                continue
            if c in ("transcript", "speaker_segments", "analysis", "quality_score", "summary"):
                update_parts.append(
                    f"`{c}` = IF(VALUES(`{c}`) IS NOT NULL, VALUES(`{c}`), `{c}`)"
                )
            else:
                update_parts.append(f"`{c}` = VALUES(`{c}`)")
        update_sql = ", ".join(update_parts)

        sql = (
            f"INSERT INTO `{table}` ({cols_sql}) VALUES ({placeholders})"
            f" ON DUPLICATE KEY UPDATE {update_sql}"
        )
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, [row[c] for c in cols])

    def set_call_status(self, bid: str, callid: str, status: str) -> None:
        """Update only the status column for a call record."""
        table = f"{bid}_call_records"
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"UPDATE `{table}` SET status = %s WHERE callid = %s",
                (status, callid),
            )

    def fail_call(self, bid: str, callid: str, stage: str, reason: str) -> None:
        """Mark a call as failed with stage and reason."""
        table = f"{bid}_call_records"
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"UPDATE `{table}` SET status = 'failed', fail_stage = %s, fail_reason = %s "
                f"WHERE callid = %s",
                (stage, reason[:1000], callid),
            )

    def save_call_transcription(self, bid: str, callid: str, stt_result) -> None:
        """Persist STT result into the call record and advance status to 'transcribed'."""
        from stt import STTResult
        table = f"{bid}_call_records"
        segs_json = json.dumps(stt_result.speaker_segments) if stt_result.speaker_segments else None
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"""UPDATE `{table}`
                    SET stt_provider = %s,
                        transcript = %s,
                        speaker_segments = %s,
                        duration_s = %s,
                        status = 'transcribed',
                        fail_stage = NULL,
                        fail_reason = NULL
                    WHERE callid = %s""",
                (
                    stt_result.provider,
                    stt_result.transcript,
                    segs_json,
                    stt_result.duration,
                    callid,
                ),
            )

    def save_call_analysis(self, bid: str, callid: str, analysis: dict) -> None:
        """Persist agent analysis into the call record and advance status to 'done'."""
        table = f"{bid}_call_records"
        quality_score = analysis.get("quality_score")
        summary = analysis.get("summary")
        analysis_json = json.dumps(analysis)
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"""UPDATE `{table}`
                    SET analysis = %s,
                        quality_score = %s,
                        summary = %s,
                        status = 'done',
                        fail_stage = NULL,
                        fail_reason = NULL
                    WHERE callid = %s""",
                (analysis_json, quality_score, summary, callid),
            )

    # =========================================================================
    # {bid}_call_records — read operations
    # =========================================================================

    def get_calls_to_transcribe(
        self,
        bid: str,
        batch: int = 3,
        min_duration_s: int = 0,
        effective_at=None,
    ) -> list:
        """Return call records with status='pending' that have a file URL."""
        table = f"{bid}_call_records"
        duration_clause = ""
        params: list = []
        if min_duration_s > 0:
            if effective_at:
                duration_clause = (
                    "AND (call_start < %s OR COALESCE(call_duration_s, 0) >= %s)"
                )
                params.extend([effective_at, int(min_duration_s)])
            else:
                duration_clause = "AND call_duration_s >= %s"
                params.append(int(min_duration_s))
        params.append(batch)
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"""SELECT callid, file_url, call_start, call_duration_s, agent_name, customer_phone
                    FROM `{table}`
                    WHERE status = 'pending'
                      AND file_url IS NOT NULL AND file_url != ''
                      {duration_clause}
                    ORDER BY call_start ASC
                    LIMIT %s""",
                tuple(params),
            )
            return cursor.fetchall() or []

    def get_calls_to_analyze(self, bid: str, batch: int = 3) -> list:
        """Return call records with status='transcribed' ready for agent analysis."""
        table = f"{bid}_call_records"
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"""SELECT callid, transcript, speaker_segments,
                           agent_name, customer_phone, call_start, call_duration_s
                    FROM `{table}`
                    WHERE status = 'transcribed'
                      AND transcript IS NOT NULL AND transcript != ''
                    ORDER BY call_start ASC
                    LIMIT %s""",
                (batch,),
            )
            return cursor.fetchall() or []

    def get_call_records_list(
        self,
        bid: str,
        page: int = 1,
        page_size: int = 20,
        status: str | None = None,
        agent_name: str | None = None,
        search: str | None = None,
        date_from: str | None = None,
        date_to: str | None = None,
    ) -> dict:
        """Return paginated call records for the API."""
        table = f"{bid}_call_records"
        conditions = ["1=1"]
        params: list = []

        if status:
            conditions.append("status = %s")
            params.append(status)
        if agent_name:
            conditions.append("agent_name = %s")
            params.append(agent_name)
        if search:
            conditions.append("(customer_phone LIKE %s OR callid LIKE %s OR crm_lead_name LIKE %s)")
            like = f"%{search}%"
            params += [like, like, like]
        if date_from:
            conditions.append("call_start >= %s")
            params.append(date_from)
        if date_to:
            conditions.append("call_start <= %s")
            params.append(date_to)

        where = " AND ".join(conditions)
        offset = (max(page, 1) - 1) * page_size

        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(f"SELECT COUNT(*) AS total FROM `{table}` WHERE {where}", params)
            total = (cursor.fetchone() or {}).get("total", 0)

            cursor.execute(
                f"""SELECT id, callid, bid, status, fail_stage,
                           agent_name, group_name, direction,
                           call_start, call_end, call_duration_s, call_status,
                           agent_phone, customer_phone,
                           stt_provider, duration_s,
                           quality_score, summary,
                           crm_provider, crm_lead_id, crm_lead_name,
                           created_at, updated_at
                    FROM `{table}`
                    WHERE {where}
                    ORDER BY call_start DESC
                    LIMIT %s OFFSET %s""",
                params + [page_size, offset],
            )
            rows = cursor.fetchall() or []

        # Serialise datetimes
        for row in rows:
            for col in ("call_start", "call_end", "created_at", "updated_at"):
                if isinstance(row.get(col), datetime):
                    row[col] = row[col].isoformat()

        return {
            "total": total,
            "page": page,
            "page_size": page_size,
            "pages": max(1, -(-total // page_size)),
            "records": rows,
        }

    def get_call_record_detail(self, bid: str, callid: str) -> dict | None:
        """Return a single call record with full transcript and analysis."""
        table = f"{bid}_call_records"
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"SELECT * FROM `{table}` WHERE callid = %s",
                (callid,),
            )
            row = cursor.fetchone()
        if not row:
            return None

        # Deserialise JSON columns
        for col in ("speaker_segments", "analysis"):
            val = row.get(col)
            if isinstance(val, str):
                try:
                    row[col] = json.loads(val)
                except Exception:
                    pass

        # Serialise datetimes
        for col in ("call_start", "call_end", "created_at", "updated_at"):
            if isinstance(row.get(col), datetime):
                row[col] = row[col].isoformat()

        return row

    # ========================================================================
    # Generic API push (per-BID outbound webhook)
    # ========================================================================

    def ensure_api_push_tables(self) -> None:
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS business_api_push_config (
                    bid VARCHAR(50) NOT NULL PRIMARY KEY,
                    is_enabled TINYINT(1) NOT NULL DEFAULT 0,
                    endpoint_url TEXT,
                    http_method VARCHAR(10) NOT NULL DEFAULT 'POST',
                    timeout_seconds INT NOT NULL DEFAULT 8,
                    auth_type VARCHAR(20) NOT NULL DEFAULT 'none',
                    auth_token_enc TEXT,
                    api_key_name VARCHAR(100) DEFAULT NULL,
                    api_key_value_enc TEXT,
                    custom_headers JSON DEFAULT NULL,
                    mapping_key VARCHAR(20) NOT NULL DEFAULT 'phone',
                    mapping_location VARCHAR(20) NOT NULL DEFAULT 'query',
                    mapping_name VARCHAR(100) DEFAULT NULL,
                    selected_fields JSON DEFAULT NULL,
                    field_mappings JSON DEFAULT NULL,
                    include_empty_fields TINYINT(1) NOT NULL DEFAULT 0,
                    payload_format VARCHAR(30) NOT NULL DEFAULT 'flat_json',
                    payload_wrapper_key VARCHAR(100) NOT NULL DEFAULT 'data',
                    payload_template MEDIUMTEXT,
                    content_type VARCHAR(100) NOT NULL DEFAULT 'application/json',
                    api_push_effective_at DATETIME DEFAULT NULL,
                    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                """
            )
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS api_push_log (
                    id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(50) NOT NULL,
                    callid VARCHAR(100) DEFAULT NULL,
                    trigger_event VARCHAR(50) DEFAULT NULL,
                    mapping_key VARCHAR(50) DEFAULT NULL,
                    mapping_value VARCHAR(255) DEFAULT NULL,
                    endpoint_url TEXT,
                    http_method VARCHAR(10) DEFAULT NULL,
                    payload JSON DEFAULT NULL,
                    response_status INT DEFAULT NULL,
                    response_body TEXT,
                    success TINYINT(1) NOT NULL DEFAULT 0,
                    error_message TEXT,
                    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    KEY idx_api_push_log_bid_created (bid, created_at),
                    KEY idx_api_push_log_call (bid, callid, success)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                """
            )
            conn.commit()

    @staticmethod
    def _api_push_config_defaults() -> dict:
        return {
            "is_enabled": 0,
            "endpoint_url": "",
            "http_method": "POST",
            "timeout_seconds": 8,
            "auth_type": "none",
            "mapping_key": "phone",
            "mapping_location": "query",
            "mapping_name": "",
            "selected_fields": ["summary"],
            "field_mappings": {"summary": "summary"},
            "custom_headers": {},
            "include_empty_fields": 0,
            "payload_format": "flat_json",
            "payload_wrapper_key": "data",
            "payload_template": "",
            "content_type": "application/json",
        }

    @staticmethod
    def _decode_json_field(value, fallback):
        if value is None or value == "":
            return fallback
        if isinstance(value, (dict, list)):
            return value
        try:
            return json.loads(value)
        except Exception:
            return fallback

    def _serialize_api_push_config(self, row: dict, *, include_secrets: bool = False) -> dict:
        if not row:
            return {**self._api_push_config_defaults(), "bid": None}
        out = {
            "bid": row.get("bid"),
            "is_enabled": bool(int(row.get("is_enabled") or 0)),
            "endpoint_url": row.get("endpoint_url") or "",
            "http_method": row.get("http_method") or "POST",
            "timeout_seconds": int(row.get("timeout_seconds") or 8),
            "auth_type": row.get("auth_type") or "none",
            "api_key_name": row.get("api_key_name") or "",
            "mapping_key": row.get("mapping_key") or "phone",
            "mapping_location": row.get("mapping_location") or "query",
            "mapping_name": row.get("mapping_name") or "",
            "selected_fields": self._decode_json_field(row.get("selected_fields"), ["summary"]),
            "field_mappings": self._decode_json_field(row.get("field_mappings"), {"summary": "summary"}),
            "custom_headers": self._decode_json_field(row.get("custom_headers"), {}),
            "include_empty_fields": bool(int(row.get("include_empty_fields") or 0)),
            "payload_format": row.get("payload_format") or "flat_json",
            "payload_wrapper_key": row.get("payload_wrapper_key") or "data",
            "payload_template": row.get("payload_template") or "",
            "content_type": row.get("content_type") or "application/json",
            "api_push_effective_at": _serialize_db_datetime(row.get("api_push_effective_at")),
            "has_auth_token": bool(row.get("auth_token_enc")),
            "has_api_key_value": bool(row.get("api_key_value_enc")),
        }
        if include_secrets:
            out["auth_token"] = self._decrypt_text(row.get("auth_token_enc")) or ""
            out["api_key_value"] = self._decrypt_text(row.get("api_key_value_enc")) or ""
        return out

    def get_api_push_config(self, bid: str, *, include_secrets: bool = False) -> dict | None:
        self.ensure_api_push_tables()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT * FROM business_api_push_config WHERE bid = %s",
                (str(bid),),
            )
            row = cursor.fetchone()
        if not row:
            return None
        return self._serialize_api_push_config(row, include_secrets=include_secrets)

    def save_api_push_config(self, bid: str, payload: dict) -> dict:
        self.ensure_api_push_tables()
        bid = str(bid).strip()
        existing_row = None
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT * FROM business_api_push_config WHERE bid = %s",
                (bid,),
            )
            existing_row = cursor.fetchone()

        existing = self._serialize_api_push_config(existing_row, include_secrets=True) if existing_row else {}
        defaults = self._api_push_config_defaults()
        merged = {**defaults, **(payload or {})}

        new_enabled = bool(merged.get("is_enabled"))
        old_enabled = bool(existing.get("is_enabled"))
        now = datetime.now().replace(microsecond=0)
        effective_at = (existing_row or {}).get("api_push_effective_at")
        if new_enabled and (not old_enabled or not effective_at):
            effective_at = now

        auth_token_enc = (existing_row or {}).get("auth_token_enc")
        if "auth_token" in (payload or {}):
            token = str(payload.get("auth_token") or "").strip()
            auth_token_enc = self._encrypt_text(token) if token else None

        api_key_value_enc = (existing_row or {}).get("api_key_value_enc")
        if "api_key_value" in (payload or {}):
            key_val = str(payload.get("api_key_value") or "").strip()
            api_key_value_enc = self._encrypt_text(key_val) if key_val else None

        row = {
            "bid": bid,
            "is_enabled": 1 if new_enabled else 0,
            "endpoint_url": str(merged.get("endpoint_url") or "").strip(),
            "http_method": str(merged.get("http_method") or "POST").upper(),
            "timeout_seconds": max(1, min(int(merged.get("timeout_seconds") or 8), 30)),
            "auth_type": str(merged.get("auth_type") or "none"),
            "auth_token_enc": auth_token_enc,
            "api_key_name": str(merged.get("api_key_name") or "").strip() or None,
            "api_key_value_enc": api_key_value_enc,
            "custom_headers": json.dumps(merged.get("custom_headers") or {}),
            "mapping_key": str(merged.get("mapping_key") or "phone"),
            "mapping_location": str(merged.get("mapping_location") or "query"),
            "mapping_name": str(merged.get("mapping_name") or "").strip() or None,
            "selected_fields": json.dumps(merged.get("selected_fields") or ["summary"]),
            "field_mappings": json.dumps(merged.get("field_mappings") or {"summary": "summary"}),
            "include_empty_fields": 1 if merged.get("include_empty_fields") else 0,
            "payload_format": str(merged.get("payload_format") or "flat_json"),
            "payload_wrapper_key": str(merged.get("payload_wrapper_key") or "data").strip() or "data",
            "payload_template": str(merged.get("payload_template") or ""),
            "content_type": str(merged.get("content_type") or "application/json").strip() or "application/json",
            "api_push_effective_at": effective_at,
        }

        cols = list(row.keys())
        placeholders = ", ".join(["%s"] * len(cols))
        cols_sql = ", ".join(f"`{c}`" for c in cols)
        update_sql = ", ".join(f"`{c}` = VALUES(`{c}`)" for c in cols if c != "bid")
        sql = (
            f"INSERT INTO business_api_push_config ({cols_sql}) VALUES ({placeholders}) "
            f"ON DUPLICATE KEY UPDATE {update_sql}"
        )
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, list(row.values()))
            conn.commit()
        return self.get_api_push_config(bid) or {}

    def log_api_push_attempt(
        self,
        *,
        bid: str,
        callid: str | None,
        trigger_event: str | None,
        mapping_key: str | None = None,
        mapping_value: str | None = None,
        endpoint_url: str | None = None,
        http_method: str | None = None,
        payload: dict | None = None,
        response_status: int | None = None,
        response_body: str | None = None,
        success: bool = False,
        error_message: str | None = None,
    ) -> None:
        self.ensure_api_push_tables()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT INTO api_push_log
                (bid, callid, trigger_event, mapping_key, mapping_value, endpoint_url,
                 http_method, payload, response_status, response_body, success, error_message)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """,
                (
                    str(bid),
                    callid,
                    trigger_event,
                    mapping_key,
                    mapping_value,
                    endpoint_url,
                    http_method,
                    json.dumps(payload or {}, default=str),
                    response_status,
                    (response_body or "")[:5000] or None,
                    1 if success else 0,
                    (error_message or "")[:1000] or None,
                ),
            )
            conn.commit()

    def get_api_push_logs(self, bid: str, limit: int = 25) -> list:
        self.ensure_api_push_tables()
        limit = max(1, min(int(limit or 25), 200))
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT id, bid, callid, trigger_event, mapping_key, mapping_value,
                       endpoint_url, http_method, payload, response_status, response_body,
                       success, error_message, created_at
                FROM api_push_log
                WHERE bid = %s
                ORDER BY id DESC
                LIMIT %s
                """,
                (str(bid), limit),
            )
            rows = cursor.fetchall() or []
        for row in rows:
            row["success"] = bool(int(row.get("success") or 0))
            if isinstance(row.get("created_at"), datetime):
                row["created_at"] = _serialize_db_datetime(row["created_at"])
            payload = row.get("payload")
            if isinstance(payload, str):
                try:
                    row["payload"] = json.loads(payload)
                except Exception:
                    pass
        return rows

    def api_push_already_succeeded(self, bid: str, callid: str) -> bool:
        self.ensure_api_push_tables()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT 1 FROM api_push_log
                WHERE bid = %s AND callid = %s AND success = 1
                LIMIT 1
                """,
                (str(bid), str(callid)),
            )
            return cursor.fetchone() is not None
