import pymysql
from pymysql.cursors import DictCursor
import logging
import json
import base64
import hashlib
from datetime import datetime
from contextlib import contextmanager
from cryptography.fernet import Fernet

logger = logging.getLogger(__name__)


class DatabaseHandler:
    """Handle all database operations for the dashboard"""

    def __init__(self, config):
        """Keep *config* and build the keyword arguments passed to ``pymysql.connect``.

        Args:
            config: Mapping-like object exposing ``get(key, default)``.  The keys
                ``DB_HOST``, ``DB_PORT``, ``DB_USER``, ``DB_PASSWORD`` and
                ``DB_NAME`` are read here.

        Raises:
            ValueError: If ``DB_PORT`` is set to text that is not an integer.
        """
        self.config = config
        # PyMySQL rejects a non-int port, and config values sourced from
        # environment variables would be strings, so coerce once here.  A
        # missing or empty value falls back to MySQL's default port.
        port = int(config.get('DB_PORT') or 3306)
        self.db_config = {
            'host': config.get('DB_HOST', '127.0.0.1'),
            'port': port,
            'user': config.get('DB_USER', 'admin'),
            # NOTE(review): a hard-coded fallback password ships in source;
            # confirm every deployment supplies DB_PASSWORD explicitly.
            'password': config.get('DB_PASSWORD', 'mcube@admin123'),
            'database': config.get('DB_NAME', 'voicebot_cluster'),
            'charset': 'utf8mb4',
            # Rows come back as dicts; the query helpers index them by column name.
            'cursorclass': DictCursor,
            'autocommit': True
        }

    def _get_fernet(self):
        """Build the Fernet cipher used for the CRM credential columns.

        The configured ``SECRET_KEY`` (or the built-in fallback string) is
        hashed with SHA-256 and the digest is url-safe base64 encoded to form
        the Fernet key.
        """
        raw_secret = self.config.get('SECRET_KEY', 'dev-secret-key-change-in-production')
        digest = hashlib.sha256(str(raw_secret).encode('utf-8')).digest()
        fernet_key = base64.urlsafe_b64encode(digest)
        return Fernet(fernet_key)

    def _encrypt_text(self, value):
        """Encrypt *value* with the Fernet cipher and return the token as text.

        Falsy input (``None``, ``""``, ``0`` ...) yields ``None``; anything
        else is converted with ``str()`` before being encrypted.
        """
        if value:
            cipher = self._get_fernet()
            token = cipher.encrypt(str(value).encode('utf-8'))
            return token.decode('utf-8')
        return None

    def _decrypt_text(self, value):
        """Decrypt a token produced by ``_encrypt_text``.

        Returns ``None`` for falsy input and also when decryption fails for
        any reason; a failure is logged as a warning instead of being raised.
        """
        if value:
            try:
                cipher = self._get_fernet()
                plaintext = cipher.decrypt(str(value).encode('utf-8'))
                return plaintext.decode('utf-8')
            except Exception:
                logger.warning("Failed to decrypt CRM secret; data may use legacy format")
        return None

    def _normalize_phone_variants(self, phone):
        """Return the lookup variants for *phone* in a stable order.

        All non-digit characters are dropped.  The variants are the digit
        string, the digit string with a leading ``+`` and its trailing ten
        digits; when that trailing part is exactly ten digits long the ``91``,
        ``+91`` and ``0`` prefixed forms are added as well.

        Args:
            phone: Any value; it is converted with ``str()`` and falsy values
                are treated as empty.

        Returns:
            A list of unique variant strings, most literal form first, or an
            empty list when *phone* contains no digits.
        """
        digits = "".join(ch for ch in str(phone or "") if ch.isdigit())
        if not digits:
            return []
        # Slicing already returns the whole string when it has <= 10 digits.
        core10 = digits[-10:]
        candidates = [digits, f"+{digits}", core10]
        if len(core10) == 10:
            candidates += [f"91{core10}", f"+91{core10}", f"0{core10}"]
        # dict.fromkeys de-duplicates while keeping insertion order.  Building
        # the list from a set made the order vary between processes (string
        # hashing is randomised), so callers that stop at the first matching
        # variant could resolve the same phone differently from run to run.
        return list(dict.fromkeys(candidates))

    @contextmanager
    def get_connection(self):
        """Yield a new PyMySQL connection built from ``self.db_config``.

        Any ``pymysql.Error`` raised while connecting or inside the ``with``
        body is logged and re-raised.  The connection, if one was opened, is
        always closed on exit.
        """
        connection = None
        try:
            connection = pymysql.connect(**self.db_config)
            yield connection
        except pymysql.Error as exc:
            logger.error(f"Database connection error: {exc}")
            raise
        finally:
            if connection is not None:
                connection.close()

    def _table_exists(self, cursor, table_name):
        """Return ``True`` when *table_name* exists in the current database.

        The lookup is an exact match against ``INFORMATION_SCHEMA.TABLES``,
        the same approach ``_column_exists`` uses.  ``SHOW TABLES LIKE`` is
        avoided because ``_`` and ``%`` in the name act as wildcards there, so
        a name such as ``a_b`` would also match a table called ``axb``.

        Args:
            cursor: Open cursor on the connection to inspect.
            table_name: Exact table name to look for.
        """
        cursor.execute(
            "SELECT 1 FROM INFORMATION_SCHEMA.TABLES "
            "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s LIMIT 1",
            (table_name,)
        )
        return cursor.fetchone() is not None

    def _column_exists(self, cursor, table_name, column_name):
        """Report whether *column_name* is a column of *table_name* in the current schema."""
        lookup_sql = (
            "SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s AND COLUMN_NAME = %s LIMIT 1"
        )
        cursor.execute(lookup_sql, (table_name, column_name))
        match = cursor.fetchone()
        return match is not None

    def ensure_crm_integrations_table(self):
        """Create ``business_crm_integrations`` unless it already exists.

        Each row stores one (bid, provider) integration: the encrypted access
        and secret keys, an optional API host, an active flag, a JSON config
        blob and bookkeeping timestamps.
        """
        with self.get_connection() as db:
            ddl_cursor = db.cursor()
            create_sql = (
                """
                CREATE TABLE IF NOT EXISTS business_crm_integrations (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(50) NOT NULL,
                    provider VARCHAR(50) NOT NULL,
                    access_key_enc LONGTEXT NOT NULL,
                    secret_key_enc LONGTEXT NOT NULL,
                    api_host VARCHAR(255) DEFAULT NULL,
                    is_active BOOLEAN DEFAULT TRUE,
                    config JSON DEFAULT NULL,
                    last_tested_at DATETIME DEFAULT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uniq_bid_provider (bid, provider),
                    INDEX idx_provider_active (provider, is_active)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
            )
            ddl_cursor.execute(create_sql)

    def ensure_crm_leads_cache_table(self):
        """Create ``crm_leads_cache`` unless it already exists.

        Rows are unique per (bid, provider, external_lead_id) and carry the
        lead's display fields, its phone number plus a JSON list of variants,
        the raw payload and the time of the last sync.
        """
        with self.get_connection() as db:
            ddl_cursor = db.cursor()
            create_sql = (
                """
                CREATE TABLE IF NOT EXISTS crm_leads_cache (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(50) NOT NULL,
                    provider VARCHAR(50) NOT NULL,
                    external_lead_id VARCHAR(128) DEFAULT NULL,
                    lead_name VARCHAR(255) DEFAULT NULL,
                    owner_name VARCHAR(255) DEFAULT NULL,
                    email VARCHAR(255) DEFAULT NULL,
                    phone_primary VARCHAR(32) DEFAULT NULL,
                    phone_variants JSON DEFAULT NULL,
                    lead_status VARCHAR(255) DEFAULT NULL,
                    next_task_due_date VARCHAR(255) DEFAULT NULL,
                    lead_payload JSON DEFAULT NULL,
                    last_synced_at DATETIME NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uniq_bid_provider_external (bid, provider, external_lead_id),
                    INDEX idx_bid_provider (bid, provider),
                    INDEX idx_bid_provider_phone (bid, provider, phone_primary),
                    INDEX idx_last_synced (last_synced_at)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
            )
            ddl_cursor.execute(create_sql)

    def get_crm_integration(self, bid, provider):
        """Fetch the stored integration metadata for one (bid, provider) pair.

        The encrypted keys are not returned; ``has_credentials`` only reports
        whether both are present.  The provider is matched in lower case.

        Returns:
            A dict describing the integration, or ``None`` when no row matches.
        """
        self.ensure_crm_integrations_table()
        with self.get_connection() as db:
            cur = db.cursor()
            cur.execute(
                """
                SELECT id, bid, provider, access_key_enc, secret_key_enc, api_host, is_active, config, last_tested_at, created_at, updated_at
                FROM business_crm_integrations
                WHERE bid = %s AND provider = %s
                LIMIT 1
                """,
                (str(bid), str(provider).lower()),
            )
            record = cur.fetchone()
            if record:
                return {
                    'id': record['id'],
                    'bid': record['bid'],
                    'provider': record['provider'],
                    'api_host': record.get('api_host'),
                    'is_active': bool(record.get('is_active')),
                    'config': self._parse_json_field(record.get('config')),
                    'last_tested_at': record.get('last_tested_at'),
                    'created_at': record.get('created_at'),
                    'updated_at': record.get('updated_at'),
                    'has_credentials': bool(
                        record.get('access_key_enc') and record.get('secret_key_enc')
                    ),
                }
            return None

    def get_active_crm_integrations(self, provider=None):
        """List every active integration with its credentials decrypted.

        Args:
            provider: Optional provider name; when truthy the result is
                limited to that provider (compared in lower case).

        Returns:
            A list of dicts with ``bid``, ``provider``, ``access_key``,
            ``secret_key``, ``api_host``, ``is_active`` and ``config``.
        """
        self.ensure_crm_integrations_table()
        sql = """
            SELECT bid, provider, access_key_enc, secret_key_enc, api_host, is_active, config
            FROM business_crm_integrations
            WHERE is_active = 1
        """
        args = []
        if provider:
            sql += " AND provider = %s"
            args.append(str(provider).lower())
        with self.get_connection() as db:
            cur = db.cursor()
            cur.execute(sql, args)
            return [
                {
                    'bid': str(record.get('bid')),
                    'provider': str(record.get('provider') or '').lower(),
                    'access_key': self._decrypt_text(record.get('access_key_enc')),
                    'secret_key': self._decrypt_text(record.get('secret_key_enc')),
                    'api_host': record.get('api_host'),
                    'is_active': bool(record.get('is_active')),
                    'config': self._parse_json_field(record.get('config')) or {},
                }
                for record in (cur.fetchall() or [])
            ]

    def get_crm_credentials(self, bid, provider):
        """Return the decrypted credentials for one (bid, provider) integration.

        Returns:
            A dict with ``access_key``, ``secret_key``, ``api_host`` and
            ``is_active``, or ``None`` when no row matches.  A key that cannot
            be decrypted comes back as ``None``.
        """
        self.ensure_crm_integrations_table()
        with self.get_connection() as db:
            cur = db.cursor()
            cur.execute(
                """
                SELECT access_key_enc, secret_key_enc, api_host, is_active
                FROM business_crm_integrations
                WHERE bid = %s AND provider = %s
                LIMIT 1
                """,
                (str(bid), str(provider).lower()),
            )
            record = cur.fetchone()
            if record:
                access_key = self._decrypt_text(record.get('access_key_enc'))
                secret_key = self._decrypt_text(record.get('secret_key_enc'))
                return {
                    'access_key': access_key,
                    'secret_key': secret_key,
                    'api_host': record.get('api_host'),
                    'is_active': bool(record.get('is_active')),
                }
            return None

    def upsert_crm_integration(self, bid, provider, access_key, secret_key, api_host=None, is_active=True, config=None):
        """Insert or overwrite the integration row for (bid, provider).

        Both keys are encrypted before being stored, the provider is stored in
        lower case, a falsy ``api_host`` is stored as NULL and ``config`` is
        serialised to JSON (``{}`` when falsy).
        """
        self.ensure_crm_integrations_table()
        provider = str(provider).lower()
        encrypted_access = self._encrypt_text(access_key)
        encrypted_secret = self._encrypt_text(secret_key)
        serialized_config = json.dumps(config or {}, ensure_ascii=True)
        with self.get_connection() as db:
            cur = db.cursor()
            active_flag = 1 if is_active else 0
            cur.execute(
                """
                INSERT INTO business_crm_integrations (
                    bid, provider, access_key_enc, secret_key_enc, api_host, is_active, config
                ) VALUES (%s, %s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    access_key_enc = VALUES(access_key_enc),
                    secret_key_enc = VALUES(secret_key_enc),
                    api_host = VALUES(api_host),
                    is_active = VALUES(is_active),
                    config = VALUES(config),
                    updated_at = CURRENT_TIMESTAMP
                """,
                (
                    str(bid),
                    provider,
                    encrypted_access,
                    encrypted_secret,
                    api_host or None,
                    active_flag,
                    serialized_config,
                ),
            )

    def delete_crm_integration(self, bid, provider):
        """Delete the integration row for (bid, provider).

        Returns:
            ``True`` when the cursor reports at least one deleted row.
        """
        self.ensure_crm_integrations_table()
        with self.get_connection() as db:
            cur = db.cursor()
            cur.execute(
                "DELETE FROM business_crm_integrations WHERE bid = %s AND provider = %s",
                (str(bid), str(provider).lower()),
            )
            removed = cur.rowcount
            return removed > 0

    def mark_crm_integration_tested(self, bid, provider):
        """Set ``last_tested_at`` to the database's ``NOW()`` for (bid, provider)."""
        self.ensure_crm_integrations_table()
        with self.get_connection() as db:
            cur = db.cursor()
            cur.execute(
                """
                UPDATE business_crm_integrations
                SET last_tested_at = NOW()
                WHERE bid = %s AND provider = %s
                """,
                (str(bid), str(provider).lower()),
            )

    def upsert_crm_lead_cache(
        self,
        bid,
        provider,
        external_lead_id,
        lead_name=None,
        owner_name=None,
        email=None,
        phone_primary=None,
        phone_variants=None,
        lead_status=None,
        next_task_due_date=None,
        lead_payload=None,
        last_synced_at=None,
    ):
        """Insert or refresh one cached CRM lead.

        Args:
            bid: Business id; stored as a string.
            provider: CRM provider name; stored in lower case.
            external_lead_id: Lead id in the CRM.  A blank id is rejected.
            lead_name, owner_name, email, phone_primary, lead_status,
            next_task_due_date: Optional text fields; non-``None`` values are
                converted with ``str()`` and stripped.
            phone_variants: Optional iterable of phone strings.  When falsy the
                variants are derived from ``phone_primary``.
            lead_payload: Optional JSON-serialisable payload (``{}`` when falsy).
            last_synced_at: Optional sync time; defaults to the current UTC time.

        Returns:
            ``False`` when ``external_lead_id`` is blank (nothing is written),
            otherwise ``True``.
        """
        self.ensure_crm_leads_cache_table()
        provider = str(provider).lower()
        safe_lead_id = str(external_lead_id or "").strip()
        if not safe_lead_id:
            return False

        def _clean(value):
            # Optional text column: keep NULL as NULL, otherwise store trimmed text.
            return str(value).strip() if value is not None else None

        variants = phone_variants or self._normalize_phone_variants(phone_primary)
        # Store the *stripped* text.  Filtering on strip() while storing the
        # raw value let whitespace-padded numbers through (and kept " 1 " and
        # "1" as distinct entries), which exact-match lookups can never hit.
        cleaned_variants = {text for text in (str(v).strip() for v in variants) if text}
        variants_json = json.dumps(sorted(cleaned_variants), ensure_ascii=True)
        payload_json = json.dumps(lead_payload or {}, ensure_ascii=True)
        sync_dt = last_synced_at or datetime.utcnow()

        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT INTO crm_leads_cache (
                    bid, provider, external_lead_id, lead_name, owner_name, email,
                    phone_primary, phone_variants, lead_status, next_task_due_date,
                    lead_payload, last_synced_at
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    lead_name = VALUES(lead_name),
                    owner_name = VALUES(owner_name),
                    email = VALUES(email),
                    phone_primary = VALUES(phone_primary),
                    phone_variants = VALUES(phone_variants),
                    lead_status = VALUES(lead_status),
                    next_task_due_date = VALUES(next_task_due_date),
                    lead_payload = VALUES(lead_payload),
                    last_synced_at = VALUES(last_synced_at),
                    updated_at = CURRENT_TIMESTAMP
                """,
                (
                    str(bid),
                    provider,
                    safe_lead_id,
                    _clean(lead_name),
                    _clean(owner_name),
                    _clean(email),
                    _clean(phone_primary),
                    variants_json,
                    _clean(lead_status),
                    _clean(next_task_due_date),
                    payload_json,
                    sync_dt,
                ),
            )
        return True

    def get_cached_crm_lead_by_phone(self, bid, provider, phone):
        """Look up a cached lead for (bid, provider) by any variant of *phone*.

        The variants of *phone* are tried one after another; for each, the row
        whose ``phone_primary`` equals it or whose ``phone_variants`` JSON list
        contains it is fetched, preferring the most recently synced one.

        Returns:
            The first matching lead as a dict (``phone_variants`` and
            ``lead_payload`` parsed from JSON), or ``None`` when *phone* has
            no digits or nothing matches.
        """
        self.ensure_crm_leads_cache_table()
        provider = str(provider).lower()
        candidates = self._normalize_phone_variants(phone)
        if not candidates:
            return None

        columns = (
            'id', 'bid', 'provider', 'external_lead_id', 'lead_name', 'owner_name',
            'email', 'phone_primary', 'phone_variants', 'lead_status',
            'next_task_due_date', 'lead_payload', 'last_synced_at',
        )
        with self.get_connection() as db:
            cur = db.cursor()
            for candidate in candidates:
                cur.execute(
                    """
                    SELECT
                        id, bid, provider, external_lead_id, lead_name, owner_name, email,
                        phone_primary, phone_variants, lead_status, next_task_due_date,
                        lead_payload, last_synced_at
                    FROM crm_leads_cache
                    WHERE bid = %s
                      AND provider = %s
                      AND (
                          phone_primary = %s
                          OR JSON_CONTAINS(phone_variants, JSON_QUOTE(%s))
                      )
                    ORDER BY last_synced_at DESC, updated_at DESC
                    LIMIT 1
                    """,
                    (str(bid), provider, str(candidate), str(candidate)),
                )
                record = cur.fetchone()
                if not record:
                    continue
                lead = {}
                for column in columns:
                    value = record.get(column)
                    # The two JSON columns are decoded and given empty defaults.
                    if column == 'phone_variants':
                        value = self._parse_json_field(value) or []
                    elif column == 'lead_payload':
                        value = self._parse_json_field(value) or {}
                    lead[column] = value
                return lead
        return None

    # ── CRM Lead Activities ────────────────────────────────────────────────

    def ensure_crm_lead_activities_table(self):
        """Create ``crm_lead_activities`` unless it already exists.

        Rows are unique per (bid, provider, activity_id) and hold the related
        lead id, event code/name, the activity's created/modified times, two
        JSON blobs and the time of the last sync.
        """
        with self.get_connection() as db:
            ddl_cursor = db.cursor()
            create_sql = (
                """
                CREATE TABLE IF NOT EXISTS crm_lead_activities (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(50) NOT NULL,
                    provider VARCHAR(50) NOT NULL,
                    activity_id VARCHAR(128) NOT NULL,
                    lead_id VARCHAR(128) DEFAULT NULL,
                    event_code VARCHAR(50) DEFAULT NULL,
                    event_name VARCHAR(255) DEFAULT NULL,
                    activity_created_on DATETIME DEFAULT NULL,
                    activity_modified_on DATETIME DEFAULT NULL,
                    activity_data JSON DEFAULT NULL,
                    activity_fields JSON DEFAULT NULL,
                    last_synced_at DATETIME NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uniq_bid_provider_activity (bid, provider, activity_id),
                    INDEX idx_bid_provider (bid, provider),
                    INDEX idx_lead_id (bid, provider, lead_id),
                    INDEX idx_created_on (bid, provider, activity_created_on)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
            )
            ddl_cursor.execute(create_sql)

    def upsert_crm_lead_activity(
        self,
        bid,
        provider,
        activity_id,
        lead_id=None,
        event_code=None,
        event_name=None,
        activity_created_on=None,
        activity_modified_on=None,
        activity_data=None,
        activity_fields=None,
    ):
        """Insert or refresh one CRM lead activity row.

        A blank ``activity_id`` is rejected with ``False`` and nothing is
        written.  Falsy ``lead_id``/``event_code``/``event_name`` are stored
        as NULL, the two JSON arguments are serialised unless ``None``, and
        ``last_synced_at`` is always set to the database's ``NOW()``.

        Returns:
            ``True`` once the statement has been executed.
        """
        self.ensure_crm_lead_activities_table()
        provider = str(provider).lower()
        activity_key = str(activity_id or "").strip()
        if not activity_key:
            return False

        def text_or_null(value):
            return str(value) if value else None

        def json_or_null(value):
            return None if value is None else json.dumps(value, ensure_ascii=True)

        with self.get_connection() as db:
            cur = db.cursor()
            cur.execute(
                """
                INSERT INTO crm_lead_activities (
                    bid, provider, activity_id, lead_id, event_code, event_name,
                    activity_created_on, activity_modified_on,
                    activity_data, activity_fields, last_synced_at
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
                ON DUPLICATE KEY UPDATE
                    lead_id = VALUES(lead_id),
                    event_code = VALUES(event_code),
                    event_name = VALUES(event_name),
                    activity_created_on = VALUES(activity_created_on),
                    activity_modified_on = VALUES(activity_modified_on),
                    activity_data = VALUES(activity_data),
                    activity_fields = VALUES(activity_fields),
                    last_synced_at = NOW(),
                    updated_at = CURRENT_TIMESTAMP
                """,
                (
                    str(bid),
                    provider,
                    activity_key,
                    text_or_null(lead_id),
                    text_or_null(event_code),
                    text_or_null(event_name),
                    activity_created_on,
                    activity_modified_on,
                    json_or_null(activity_data),
                    json_or_null(activity_fields),
                ),
            )
        return True

    # ── Sync Watermarks ───────────────────────────────────────────────────

    def ensure_sync_watermarks_table(self):
        """Create ``sync_watermarks`` unless it already exists.

        The table keeps one ``watermark`` datetime per (bid, sync_type).
        """
        with self.get_connection() as db:
            ddl_cursor = db.cursor()
            create_sql = (
                """
                CREATE TABLE IF NOT EXISTS sync_watermarks (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(50) NOT NULL,
                    sync_type VARCHAR(100) NOT NULL,
                    watermark DATETIME NOT NULL,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uniq_bid_type (bid, sync_type)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
            )
            ddl_cursor.execute(create_sql)

    def get_sync_watermark(self, bid, sync_type):
        """Return the stored watermark for (bid, sync_type), or ``None`` if there is none."""
        self.ensure_sync_watermarks_table()
        with self.get_connection() as db:
            cur = db.cursor()
            cur.execute(
                "SELECT watermark FROM sync_watermarks WHERE bid = %s AND sync_type = %s LIMIT 1",
                (str(bid), str(sync_type)),
            )
            record = cur.fetchone()
            if record:
                return record["watermark"]
            return None

    def set_sync_watermark(self, bid, sync_type, watermark):
        """Store *watermark* for (bid, sync_type), replacing any previous value."""
        self.ensure_sync_watermarks_table()
        with self.get_connection() as db:
            cur = db.cursor()
            cur.execute(
                """
                INSERT INTO sync_watermarks (bid, sync_type, watermark)
                VALUES (%s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    watermark = VALUES(watermark),
                    updated_at = CURRENT_TIMESTAMP
                """,
                (str(bid), str(sync_type), watermark),
            )

    # ── Phone Set Helper ──────────────────────────────────────────────────

    def get_lead_phone_set(self, bid, provider):
        """Collect every cached phone string for a bid/provider into one set.

        Both ``phone_primary`` and each entry of the ``phone_variants`` JSON
        list contribute; values are stripped and blanks are dropped.
        """
        self.ensure_crm_leads_cache_table()
        provider = str(provider).lower()
        with self.get_connection() as db:
            cur = db.cursor()
            cur.execute(
                "SELECT phone_primary, phone_variants FROM crm_leads_cache WHERE bid = %s AND provider = %s",
                (str(bid), provider),
            )
            records = cur.fetchall() or []

        numbers = set()
        for record in records:
            primary = str(record.get("phone_primary") or "").strip()
            if primary:
                numbers.add(primary)
            stored_variants = self._parse_json_field(record.get("phone_variants")) or []
            numbers.update(
                text for text in (str(item).strip() for item in stored_variants) if text
            )
        return numbers

    def get_crm_enrichment_for_phones(self, bid, provider, phones):
        """Batch-fetch CRM lead info for a list of phone strings.

        Returns a dict mapping each input phone → {lead_name, lead_status,
        next_task_due_date, crm_owner_name} by matching against phone_primary
        and phone_variants in crm_leads_cache.  Phones that have no CRM match
        are absent from the returned dict.

        Works for any number of customers (bid-scoped) and any CRM provider.
        """
        if not phones:
            return {}
        provider = str(provider).lower()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT phone_primary, phone_variants, lead_name, owner_name,
                       lead_status, next_task_due_date
                FROM crm_leads_cache
                WHERE bid = %s AND provider = %s
                """,
                (str(bid), provider),
            )
            rows = cursor.fetchall() or []

        # Build variant → enrichment mapping (any variant of a lead maps to it)
        variant_map = {}
        for row in rows:
            info = {
                "lead_name": row.get("lead_name"),
                "lead_status": row.get("lead_status"),
                "next_task_due_date": row.get("next_task_due_date"),
                "crm_owner_name": row.get("owner_name"),
            }
            primary = str(row.get("phone_primary") or "").strip()
            if primary:
                variant_map[primary] = info
            for v in self._parse_json_field(row.get("phone_variants")) or []:
                v = str(v).strip()
                if v:
                    variant_map[v] = info

        result = {}
        for phone in phones:
            if not phone:
                continue
            # Direct hit first
            if phone in variant_map:
                result[phone] = variant_map[phone]
                continue
            # Try normalised variants of the input phone
            for variant in self._normalize_phone_variants(phone):
                if variant in variant_map:
                    result[phone] = variant_map[variant]
                    break
        return result

    def ensure_call_sync_cache_table(self):
        """Create ``call_sync_cache`` unless it already exists.

        Each row is a JSON payload stored under a unique ``cache_key`` together
        with its bid, source and expiry time.
        """
        with self.get_connection() as db:
            ddl_cursor = db.cursor()
            create_sql = (
                """
                CREATE TABLE IF NOT EXISTS call_sync_cache (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    cache_key VARCHAR(255) NOT NULL,
                    bid VARCHAR(50) NOT NULL,
                    source VARCHAR(50) NOT NULL,
                    payload JSON NOT NULL,
                    expires_at DATETIME NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uniq_cache_key (cache_key),
                    INDEX idx_bid_source (bid, source),
                    INDEX idx_expires_at (expires_at)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
            )
            ddl_cursor.execute(create_sql)

    def get_call_sync_cache(self, cache_key):
        """Return the unexpired cached payload stored under *cache_key*.

        Returns:
            The parsed payload, or ``None`` when there is no unexpired row or
            the stored payload is empty/falsy.
        """
        self.ensure_call_sync_cache_table()
        with self.get_connection() as db:
            cur = db.cursor()
            cur.execute(
                """
                SELECT payload
                FROM call_sync_cache
                WHERE cache_key = %s
                  AND expires_at > NOW()
                LIMIT 1
                """,
                (str(cache_key),),
            )
            record = cur.fetchone()
            if record:
                return self._parse_json_field(record.get('payload')) or None
            return None

    def upsert_call_sync_cache(self, cache_key, bid, source, payload, ttl_seconds):
        """Store *payload* under *cache_key*, expiring ``ttl_seconds`` from now.

        The TTL is converted with ``int()`` and raised to at least one second;
        a falsy payload is stored as ``{}``.  An existing row with the same
        key is overwritten.
        """
        self.ensure_call_sync_cache_table()
        ttl = int(ttl_seconds)
        if ttl < 1:
            ttl = 1
        serialized = json.dumps(payload or {}, ensure_ascii=True)
        with self.get_connection() as db:
            cur = db.cursor()
            cur.execute(
                """
                INSERT INTO call_sync_cache (
                    cache_key, bid, source, payload, expires_at
                ) VALUES (
                    %s, %s, %s, %s, DATE_ADD(NOW(), INTERVAL %s SECOND)
                )
                ON DUPLICATE KEY UPDATE
                    bid = VALUES(bid),
                    source = VALUES(source),
                    payload = VALUES(payload),
                    expires_at = VALUES(expires_at),
                    updated_at = CURRENT_TIMESTAMP
                """,
                (
                    str(cache_key),
                    str(bid),
                    str(source),
                    serialized,
                    ttl,
                ),
            )

    def prune_expired_call_sync_cache(self):
        """Delete expired ``call_sync_cache`` rows and return the cursor's row count as an int."""
        self.ensure_call_sync_cache_table()
        with self.get_connection() as db:
            cur = db.cursor()
            cur.execute("DELETE FROM call_sync_cache WHERE expires_at <= NOW()")
            deleted = cur.rowcount or 0
            return int(deleted)

    def _parse_json_field(self, value):
        """Decode a JSON text column.

        Falsy input yields ``None``.  Strings are decoded with ``json.loads``
        and returned unchanged when they are not valid JSON; any other type
        is passed through as-is.
        """
        if not value:
            return None
        if not isinstance(value, str):
            return value
        try:
            decoded = json.loads(value)
        except json.JSONDecodeError:
            return value
        return decoded

    def _resolve_parameter_scores(self, row):
        """Return the row's parameter scores, rebuilding them when the column is empty.

        The parsed ``parameter_scores`` column is used when it is truthy.
        Otherwise the scores are rebuilt from ``raw_response["parameters"]``:
        applicable parameters keep score/max_score/reasoning, non-applicable
        ones get a ``None`` score and a ``reason``.  Returns ``None`` when
        nothing can be rebuilt.
        """
        stored = self._parse_json_field(row.get('parameter_scores'))
        if stored:
            return stored

        raw_response = self._parse_json_field(row.get('raw_response'))
        if not (raw_response and isinstance(raw_response, dict)):
            return None
        raw_parameters = raw_response.get('parameters')
        if not isinstance(raw_parameters, dict):
            return None

        scores = {}
        for name, details in raw_parameters.items():
            # Entries that are not dicts carry no score information.
            if not isinstance(details, dict):
                continue
            # The max score may be stored under either spelling.
            max_score = details.get('max_score', details.get('maxScore', 0))
            if details.get('applicable', True):
                entry = {
                    'score': details.get('score', 0),
                    'max_score': max_score,
                    'applicable': True,
                    'reasoning': details.get('reasoning', ''),
                }
            else:
                entry = {
                    'score': None,
                    'max_score': max_score,
                    'applicable': False,
                    'reason': details.get('reasoning', details.get('reason', 'Not applicable')),
                }
            scores[name] = entry
        return scores or None

    def _format_call_record(self, record):
        """Decode the JSON columns of a call record in place and return it.

        ``keywords``, ``sentiments``, ``emotions`` and ``customer_details`` are
        parsed when present and truthy.  A falsy *record* yields ``None``.
        """
        if not record:
            return None

        for name in ('keywords', 'sentiments', 'emotions', 'customer_details'):
            if name not in record:
                continue
            raw = record[name]
            if raw:
                record[name] = self._parse_json_field(raw)

        return record

    # ========================================================================
    # BUSINESS OPERATIONS
    # ========================================================================

    def get_all_businesses(self):
        """List every business that owns a ``{bid}_calls`` table, with its call count.

        Businesses are derived from table names: each table whose name ends in
        ``_calls`` is reported with the part before that suffix as its bid.

        Returns:
            A list of dicts with ``bid``, ``name`` (``"Business {bid}"``),
            ``totalCalls`` and ``callsTable``.
        """
        suffix = '_calls'
        with self.get_connection() as conn:
            cursor = conn.cursor()

            # LIKE treats "_" as a single-character wildcard, so this pattern
            # is only a coarse pre-filter (it also matches e.g. "recalls");
            # the exact suffix check happens in Python below.
            cursor.execute("SHOW TABLES LIKE '%_calls'")
            tables = cursor.fetchall()

            businesses = []
            for table in tables:
                # Each row has a single column holding the table name.
                table_name = next(iter(table.values()))
                if not table_name.endswith(suffix):
                    continue
                # Strip only the trailing suffix; str.replace would also remove
                # "_calls" occurring earlier in the name and corrupt the bid.
                bid = table_name[:-len(suffix)]
                # NOTE(review): "{bid}_raw_calls" tables (read by
                # get_all_groupnames) also end in "_calls" and are listed here
                # with bid "{bid}_raw"; confirm whether they should be excluded.

                cursor.execute(f"SELECT COUNT(*) as count FROM `{table_name}`")
                count_result = cursor.fetchone()
                count = count_result['count'] if count_result else 0

                # No businesses table is consulted; the bid doubles as the name.
                businesses.append({
                    'bid': bid,
                    'name': f'Business {bid}',
                    'totalCalls': count,
                    'callsTable': table_name
                })

            return businesses

    def get_business_info(self, bid):
        """Get call statistics for one business from its ``{bid}_calls`` table.

        Args:
            bid: Business id used to derive the table name.

        Returns:
            ``None`` when the table does not exist, otherwise a dict with
            ``bid``, ``name`` and ``statistics`` (the aggregate row: total and
            per-status counts, average duration, first/last call start time).
        """
        table_name = f"{bid}_calls"
        # A table name cannot be bound as a query parameter, so it is embedded
        # as a quoted identifier with any backtick doubled; this keeps a
        # crafted bid from breaking out of the identifier.
        quoted_table = "`" + table_name.replace("`", "``") + "`"

        with self.get_connection() as conn:
            cursor = conn.cursor()

            # Existence check goes through the parameterised helper instead of
            # interpolating the name into a quoted SQL string literal.
            if not self._table_exists(cursor, table_name):
                return None

            # Get statistics
            query = f"""
                SELECT
                    COUNT(*) as total_calls,
                    COUNT(CASE WHEN status = 0 THEN 1 END) as unprocessed,
                    COUNT(CASE WHEN status = 1 THEN 1 END) as transcribed,
                    COUNT(CASE WHEN status = 2 THEN 1 END) as analyzed,
                    COUNT(CASE WHEN status = 3 THEN 1 END) as message_sent,
                    AVG(duration) as avg_duration,
                    MIN(call_starttime) as first_call_starttime,
                    MAX(call_starttime) as last_call_starttime
                FROM {quoted_table}
            """

            cursor.execute(query)
            stats = cursor.fetchone()

            return {
                'bid': bid,
                'name': f'Business {bid}',
                'statistics': stats
            }

    def get_all_groupnames(self, bid):
        """Get list of all groupnames for specified business with their call counts"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            # Get distinct groupnames with counts from {bid}_raw_calls
            query = f"""
                SELECT
                    groupname,
                    COUNT(*) as totalCalls
                FROM `{bid}_raw_calls`
                WHERE groupname IS NOT NULL AND groupname != ''
                GROUP BY groupname
                ORDER BY totalCalls DESC
            """

            cursor.execute(query)
            results = cursor.fetchall()

            groupnames = []
            for row in results:
                groupnames.append({
                    'groupname': row['groupname'],
                    'totalCalls': row['totalCalls']
                })

            return groupnames

    def get_group_agent_customer_rows(self, bid, customer_numbers, groupname=None):
        """Get grouped rows for customer->group->agent mapping."""
        if not customer_numbers:
            return []

        normalized_numbers = [str(v).strip() for v in customer_numbers if str(v).strip()]
        if not normalized_numbers:
            return []

        with self.get_connection() as conn:
            cursor = conn.cursor()
            table_name = f"{bid}_raw_calls"
            if not self._table_exists(cursor, table_name):
                return []

            placeholders = ", ".join(["%s"] * len(normalized_numbers))
            params = list(normalized_numbers)

            where_clauses = [
                f"r.customer_callinfo IN ({placeholders})",
                "r.groupname IS NOT NULL",
                "TRIM(r.groupname) != ''",
                "r.agentname IS NOT NULL",
                "TRIM(r.agentname) != ''",
            ]
            if groupname:
                where_clauses.append("r.groupname = %s")
                params.append(groupname)

            where_sql = " AND ".join(where_clauses)
            query = f"""
                SELECT
                    r.groupname,
                    r.agentname,
                    r.customer_callinfo,
                    COUNT(*) AS total_calls,
                    MAX(r.call_starttime) AS last_call
                FROM `{table_name}` r
                WHERE {where_sql}
                GROUP BY r.groupname, r.agentname, r.customer_callinfo
                ORDER BY total_calls DESC, last_call DESC
            """
            cursor.execute(query, params)
            return cursor.fetchall()

    def get_location_stats(self, bid, groupname=None):
        """Get call statistics for specified business filtered by groupname"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            # Build WHERE clause
            where_clause = "WHERE 1=1"
            params = []

            if groupname:
                where_clause += " AND r.groupname = %s"
                params.append(groupname)

            # Get comprehensive statistics
            query = f"""
                SELECT
                    COUNT(*) as total_calls,

                    -- Inbound statistics
                    SUM(CASE WHEN LOWER(r.direction) = 'inbound' THEN 1 ELSE 0 END) as inbound_total,
                    SUM(CASE WHEN LOWER(r.direction) = 'inbound' AND r.call_status = 'ANSWER' THEN 1 ELSE 0 END) as inbound_answered,
                    SUM(CASE WHEN LOWER(r.direction) = 'inbound' AND r.call_status = 'BUSY' THEN 1 ELSE 0 END) as inbound_busy,
                    SUM(CASE WHEN LOWER(r.direction) = 'inbound' AND r.call_status = 'CANCEL' THEN 1 ELSE 0 END) as inbound_cancel,
                    SUM(CASE WHEN LOWER(r.direction) = 'inbound' AND r.call_status = 'NOANSWER' THEN 1 ELSE 0 END) as inbound_not_answered,

                    -- Outbound statistics
                    SUM(CASE WHEN LOWER(r.direction) = 'outbound' THEN 1 ELSE 0 END) as outbound_total,
                    SUM(CASE WHEN LOWER(r.direction) = 'outbound' AND r.call_status = 'ANSWER' THEN 1 ELSE 0 END) as outbound_answered,
                    SUM(CASE WHEN LOWER(r.direction) = 'outbound' AND r.call_status = 'BUSY' THEN 1 ELSE 0 END) as outbound_busy,
                    SUM(CASE WHEN LOWER(r.direction) = 'outbound' AND r.call_status = 'CANCEL' THEN 1 ELSE 0 END) as outbound_cancel,
                    SUM(CASE WHEN LOWER(r.direction) = 'outbound' AND r.call_status = 'NOANSWER' THEN 1 ELSE 0 END) as outbound_not_answered,

                    -- Total answered calls
                    SUM(CASE WHEN r.call_status = 'ANSWER' THEN 1 ELSE 0 END) as answered_total,

                    -- Average duration for answered calls (in seconds)
                    AVG(
                        CASE
                            WHEN r.call_status = 'ANSWER'
                                AND r.call_starttime IS NOT NULL
                                AND r.call_endtime IS NOT NULL
                            THEN TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime)
                            ELSE NULL
                        END
                    ) as avg_answered_duration,

                    -- Total duration for answered calls (seconds)
                    SUM(
                        CASE
                            WHEN r.call_status = 'ANSWER'
                                AND r.call_starttime IS NOT NULL
                                AND r.call_endtime IS NOT NULL
                            THEN TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime)
                            ELSE 0
                        END
                    ) as total_answered_duration_seconds,

                    -- Detailed calls over 2 minutes
                    SUM(
                        CASE
                            WHEN r.call_status = 'ANSWER'
                                AND r.call_starttime IS NOT NULL
                                AND r.call_endtime IS NOT NULL
                                AND TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime) >= 120
                            THEN 1
                            ELSE 0
                        END
                    ) as detailed_calls_over_2min,

                    -- Analyzed (processed) breakdowns
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.direction = 'inbound' AND r.call_status = 'ANSWER' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_inbound_answered,
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.direction = 'inbound' AND r.call_status = 'BUSY' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_inbound_busy,
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.direction = 'inbound' AND r.call_status = 'CANCEL' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_inbound_cancel,
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.direction = 'inbound' AND r.call_status = 'NOANSWER' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_inbound_not_answered,
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.direction = 'outbound' AND r.call_status = 'ANSWER' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_outbound_answered,
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.direction = 'outbound' AND r.call_status = 'BUSY' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_outbound_busy,
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.direction = 'outbound' AND r.call_status = 'CANCEL' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_outbound_cancel,
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.direction = 'outbound' AND r.call_status = 'NOANSWER' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_outbound_not_answered,

                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.call_status = 'ANSWER' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_answered_total,

                    AVG(
                        CASE
                            WHEN a.callid IS NOT NULL
                                AND r.call_status = 'ANSWER'
                                AND r.call_starttime IS NOT NULL
                                AND r.call_endtime IS NOT NULL
                            THEN TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime)
                            ELSE NULL
                        END
                    ) as analyzed_avg_answered_duration,

                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL
                                AND r.call_status = 'ANSWER'
                                AND r.call_starttime IS NOT NULL
                                AND r.call_endtime IS NOT NULL
                                AND TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime) >= 120
                            THEN 1
                            ELSE 0
                        END
                    ) as analyzed_detailed_calls_over_2min,

                    -- Average talk-listen ratio from analytics
                    AVG(CASE WHEN a.talk_listen_ratio IS NOT NULL THEN a.agent_speak_percentage ELSE NULL END) as avg_agent_talk_percentage,
                    AVG(CASE WHEN a.talk_listen_ratio IS NOT NULL THEN a.customer_speak_percentage ELSE NULL END) as avg_customer_talk_percentage,
                    AVG(CASE WHEN a.talk_listen_ratio IS NOT NULL THEN a.dead_air_percentage ELSE NULL END) as avg_dead_air_percentage,

                    -- Average quality score from analytics
                    AVG(CASE WHEN a.quality_score IS NOT NULL THEN a.quality_score ELSE NULL END) as avg_quality_score,

                    -- Total processed durations (seconds)
                    SUM(
                        CASE
                            WHEN s.transcript IS NOT NULL AND s.transcript != ''
                            THEN COALESCE(s.duration, TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime))
                            ELSE 0
                        END
                    ) as transcribed_duration_seconds,
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL
                            THEN COALESCE(s.duration, TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime))
                            ELSE 0
                        END
                    ) as analyzed_duration_seconds,

                    -- Processed calls (analyzed)
                    COUNT(DISTINCT a.callid) as analyzed_total,
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.direction = 'inbound' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_inbound,
                    SUM(
                        CASE
                            WHEN a.callid IS NOT NULL AND r.direction = 'outbound' THEN 1
                            ELSE 0
                        END
                    ) as analyzed_outbound

                FROM `{bid}_raw_calls` r
                LEFT JOIN `{bid}_callanalytics` a ON r.callid = a.callid
                LEFT JOIN `{bid}_sarvamresponse` s ON r.callid = s.callid
                {where_clause}
            """

            cursor.execute(query, params)
            result = cursor.fetchone()

            if not result:
                return {
                    'total_calls': 0,
                    'inbound_total': 0,
                    'inbound_answered': 0,
                    'inbound_busy': 0,
                    'inbound_cancel': 0,
                    'inbound_not_answered': 0,
                    'outbound_total': 0,
                    'outbound_answered': 0,
                    'outbound_busy': 0,
                    'outbound_cancel': 0,
                    'outbound_not_answered': 0,
                    'avg_answered_duration': 0,
                    'avg_agent_talk_percentage': 0,
                    'avg_customer_talk_percentage': 0,
                    'avg_quality_score': 0,
                    'answered_total': 0,
                    'total_answered_duration_seconds': 0,
                    'detailed_calls_over_2min': 0,
                    'detailed_calls_over_2min_percent': 0,
                    'analyzed_inbound_answered': 0,
                    'analyzed_inbound_busy': 0,
                    'analyzed_inbound_cancel': 0,
                    'analyzed_inbound_not_answered': 0,
                    'analyzed_outbound_answered': 0,
                    'analyzed_outbound_busy': 0,
                    'analyzed_outbound_cancel': 0,
                    'analyzed_outbound_not_answered': 0,
                    'analyzed_answered_total': 0,
                    'analyzed_avg_answered_duration': 0,
                    'analyzed_detailed_calls_over_2min': 0,
                    'analyzed_detailed_calls_over_2min_percent': 0,
                    'transcribed_duration_seconds': 0,
                    'analyzed_duration_seconds': 0,
                    'analyzed_total': 0,
                    'analyzed_inbound': 0,
                    'analyzed_outbound': 0
                }

            # Calculate talk-listen ratio in format "X : Y" — always sums to 100.
            #
            # agent_speak_percentage and customer_speak_percentage are each stored as
            # % of *speak time* (they sum to ~100 between the two speakers).
            # dead_air_percentage is stored as % of *total call duration*.
            #
            # To combine them on the same scale (% of total call duration) and then
            # attribute dead air to the agent/talk side:
            #   spoken_fraction = 1 - dead_air / total_call   (the share of call that is speech)
            #   agent_of_call   = agent_speak_pct  * spoken_fraction
            #   customer_of_call = customer_speak_pct * spoken_fraction
            #   talk  = agent_of_call + dead_air_pct   (dead air attributed to agent/talk)
            #   listen = customer_of_call
            #   Normalise so talk + listen = 100 exactly.
            raw_agent    = float(result['avg_agent_talk_percentage']    or 0)   # % of speak time
            raw_customer = float(result['avg_customer_talk_percentage'] or 0)   # % of speak time
            raw_dead_air = float(result['avg_dead_air_percentage']      or 0)   # % of call duration

            spoken_fraction  = max(0.0, 1.0 - raw_dead_air / 100.0)
            agent_of_call    = raw_agent    * spoken_fraction   # agent   % of total call
            customer_of_call = raw_customer * spoken_fraction   # customer % of total call

            talk   = agent_of_call + raw_dead_air   # dead air attributed to talk side
            listen = customer_of_call

            total_parts = talk + listen
            if total_parts > 0:
                agent_pct    = round((talk   / total_parts) * 100)
                customer_pct = 100 - agent_pct
                talk_listen_ratio = f"{agent_pct} : {customer_pct}"
            else:
                agent_pct    = 0
                customer_pct = 0
                talk_listen_ratio = "N/A"

            answered_total = int(result['answered_total'] or 0)
            detailed_calls_over_2min = int(result['detailed_calls_over_2min'] or 0)
            detailed_calls_over_2min_percent = round(
                (detailed_calls_over_2min / answered_total) * 100,
                1
            ) if answered_total > 0 else 0

            analyzed_answered_total = int(result['analyzed_answered_total'] or 0)
            analyzed_detailed_calls_over_2min = int(result['analyzed_detailed_calls_over_2min'] or 0)
            analyzed_detailed_calls_over_2min_percent = round(
                (analyzed_detailed_calls_over_2min / analyzed_answered_total) * 100,
                1
            ) if analyzed_answered_total > 0 else 0

            return {
                'total_calls': int(result['total_calls'] or 0),
                'inbound_total': int(result['inbound_total'] or 0),
                'inbound_answered': int(result['inbound_answered'] or 0),
                'inbound_busy': int(result['inbound_busy'] or 0),
                'inbound_cancel': int(result['inbound_cancel'] or 0),
                'inbound_not_answered': int(result['inbound_not_answered'] or 0),
                'outbound_total': int(result['outbound_total'] or 0),
                'outbound_answered': int(result['outbound_answered'] or 0),
                'outbound_busy': int(result['outbound_busy'] or 0),
                'outbound_cancel': int(result['outbound_cancel'] or 0),
                'outbound_not_answered': int(result['outbound_not_answered'] or 0),
                'avg_answered_duration': int(result['avg_answered_duration'] or 0),
                'total_answered_duration_seconds': int(result['total_answered_duration_seconds'] or 0),
                'avg_agent_talk_percentage': agent_pct,
                'avg_customer_talk_percentage': customer_pct,
                'talk_listen_ratio': talk_listen_ratio,
                'avg_quality_score': round(result['avg_quality_score'] or 0, 2),
                'answered_total': answered_total,
                'detailed_calls_over_2min': detailed_calls_over_2min,
                'detailed_calls_over_2min_percent': detailed_calls_over_2min_percent,
                'analyzed_inbound_answered': int(result['analyzed_inbound_answered'] or 0),
                'analyzed_inbound_busy': int(result['analyzed_inbound_busy'] or 0),
                'analyzed_inbound_cancel': int(result['analyzed_inbound_cancel'] or 0),
                'analyzed_inbound_not_answered': int(result['analyzed_inbound_not_answered'] or 0),
                'analyzed_outbound_answered': int(result['analyzed_outbound_answered'] or 0),
                'analyzed_outbound_busy': int(result['analyzed_outbound_busy'] or 0),
                'analyzed_outbound_cancel': int(result['analyzed_outbound_cancel'] or 0),
                'analyzed_outbound_not_answered': int(result['analyzed_outbound_not_answered'] or 0),
                'analyzed_answered_total': analyzed_answered_total,
                'analyzed_avg_answered_duration': int(result['analyzed_avg_answered_duration'] or 0),
                'analyzed_detailed_calls_over_2min': analyzed_detailed_calls_over_2min,
                'analyzed_detailed_calls_over_2min_percent': analyzed_detailed_calls_over_2min_percent,
                'transcribed_duration_seconds': float(result['transcribed_duration_seconds'] or 0),
                'analyzed_duration_seconds': float(result['analyzed_duration_seconds'] or 0),
                'analyzed_total': int(result['analyzed_total'] or 0),
                'analyzed_inbound': int(result['analyzed_inbound'] or 0),
                'analyzed_outbound': int(result['analyzed_outbound'] or 0)
            }

    def get_filtered_raw_calls(self, bid, groupname=None, direction=None, call_status=None, limit=100, offset=0):
        """Get filtered raw calls from {bid}_raw_calls table with pagination."""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            # Build WHERE clause
            where_clauses = []
            params = []

            if groupname:
                where_clauses.append("r.groupname = %s")
                params.append(groupname)

            if direction:
                where_clauses.append("LOWER(r.direction) = LOWER(%s)")
                params.append(direction)

            if call_status:
                where_clauses.append("r.call_status = %s")
                params.append(call_status)

            where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""

            query = f"""
                SELECT
                    r.callid,
                    r.agentname,
                    r.agent_callinfo,
                    r.call_starttime,
                    r.call_endtime,
                    r.direction,
                    r.call_status,
                    r.groupname,
                    COALESCE(r.duration_seconds, TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime)) as duration_seconds,
                    ca.quality_score,
                    CASE
                        WHEN ca.callid IS NOT NULL THEN 1
                        ELSE 0
                    END as is_analyzed,
                    r.transcription_status,
                    CASE WHEN s.callid IS NOT NULL THEN 1 ELSE 0 END as has_transcript,
                    r.transcription_status,
                    CASE WHEN s.callid IS NOT NULL THEN 1 ELSE 0 END as has_transcript
                FROM `{bid}_raw_calls` r
                LEFT JOIN `{bid}_callanalytics` ca ON r.callid = ca.callid
                LEFT JOIN `{bid}_sarvamresponse` s ON r.callid = s.callid
                {where_sql}
                ORDER BY r.call_starttime DESC
                LIMIT %s OFFSET %s
            """

            params.append(limit)
            params.append(offset)
            cursor.execute(query, params)
            calls = cursor.fetchall()

            return calls

    def get_raw_call_details(self, bid, callid):
        """Get call details from {bid}_raw_calls joined with {bid}_sarvamresponse"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            query = f"""
                SELECT
                    r.callid,
                    r.bid,
                    r.fileurl as fileUrl,
                    r.agentname,
                    r.groupname,
                    r.call_starttime,
                    r.call_endtime,
                    r.call_status,
                    r.agent_callinfo,
                    r.customer_callinfo,
                    r.direction,
                    r.transcription_status,
                    COALESCE(r.duration_seconds, TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime)) as duration_seconds,
                    s.transcript as transcripts,
                    s.speaker_segments,
                    s.num_speakers,
                    s.duration,
                    s.language,
                    s.request_id
                FROM `{bid}_raw_calls` r
                LEFT JOIN `{bid}_sarvamresponse` s ON r.callid = s.callid
                WHERE r.callid = %s
                LIMIT 1
            """

            cursor.execute(query, (callid,))
            call = cursor.fetchone()

            if not call:
                return None

            # Format the call record
            call_dict = dict(call)

            # Set status for compatibility with CallDetail component
            call_dict['status'] = 2 if call_dict['transcripts'] else 0

            return call_dict

    def _get_raw_calls_list(self, cursor, bid, filters=None, limit=100, offset=0):
        raw_table = f"{bid}_raw_calls"
        if not self._table_exists(cursor, raw_table):
            return []

        sarvam_table = f"{bid}_sarvamresponse"
        analytics_table = f"{bid}_callanalytics"
        has_sarvam = self._table_exists(cursor, sarvam_table)
        has_analytics = self._table_exists(cursor, analytics_table)

        joins = []
        status_case = "0"
        quality_score_select = "NULL AS quality_score"
        duration_seconds_select = "TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime)"

        if has_sarvam:
            joins.append(f"LEFT JOIN `{sarvam_table}` s ON r.callid = s.callid")
            status_case = "CASE WHEN s.callid IS NOT NULL THEN 1 ELSE 0 END"
            duration_seconds_select = "COALESCE(s.duration, TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime))"

        if has_analytics:
            joins.append(f"LEFT JOIN `{analytics_table}` a ON r.callid = a.callid")
            quality_score_select = "a.quality_score AS quality_score"
            if has_sarvam:
                status_case = (
                    "CASE WHEN a.callid IS NOT NULL THEN 3 "
                    "WHEN s.callid IS NOT NULL THEN 2 ELSE r.status END"
                )
            else:
                status_case = "CASE WHEN a.callid IS NOT NULL THEN 3 ELSE r.status END"

        where_clauses = []
        params = []

        if filters:
            if 'date_from' in filters:
                where_clauses.append("r.call_starttime >= %s")
                params.append(filters['date_from'])

            if 'date_to' in filters:
                where_clauses.append("r.call_starttime <= %s")
                params.append(filters['date_to'])

            status_filter = filters.get('status')
            if status_filter is not None:
                # Handle Status 3: Analyzed
                if status_filter >= 3:
                    if has_analytics:
                        where_clauses.append("a.callid IS NOT NULL")
                    else:
                        where_clauses.append("0=1")
                # Handle Status 2: Transcribed but not Analyzed
                elif status_filter == 2:
                    if has_sarvam:
                        where_clauses.append("s.callid IS NOT NULL")
                    if has_analytics:
                        where_clauses.append("a.callid IS NULL")
                # Handle Status 1: Queued for Transcription (usually tracked directly in raw_calls)
                elif status_filter == 1:
                    where_clauses.append("r.status = 1")
                    if has_sarvam:
                        where_clauses.append("s.callid IS NULL")
                # Handle Status 0: New/Unprocessed
                else:
                    if has_sarvam:
                        where_clauses.append("s.callid IS NULL")
                    if has_analytics:
                        where_clauses.append("a.callid IS NULL")
                    where_clauses.append("(r.status = 0 OR r.status IS NULL)")

        where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
        join_sql = " ".join(joins)

        query = f"""
            SELECT
                r.callid,
                r.bid,
                r.fileurl as fileUrl,
                r.agentname,
                r.groupname,
                r.call_starttime,
                r.call_endtime,
                r.call_status,
                r.agent_callinfo,
                r.customer_callinfo,
                r.direction,
                CAST(TIME(r.call_starttime) AS CHAR) as call_time,
                {duration_seconds_select} as duration_seconds,
                {quality_score_select},
                {status_case} as status
            FROM `{raw_table}` r
            {join_sql}
            {where_sql}
            ORDER BY r.call_starttime DESC, r.call_endtime DESC
            LIMIT %s OFFSET %s
        """

        params.extend([limit, offset])
        cursor.execute(query, params)
        calls = cursor.fetchall()
        return [self._format_call_record(call) for call in calls]

    def _get_raw_calls_count(self, cursor, bid, filters=None):
        raw_table = f"{bid}_raw_calls"
        if not self._table_exists(cursor, raw_table):
            return 0

        sarvam_table = f"{bid}_sarvamresponse"
        analytics_table = f"{bid}_callanalytics"
        has_sarvam = self._table_exists(cursor, sarvam_table)
        has_analytics = self._table_exists(cursor, analytics_table)

        joins = []
        where_clauses = []
        params = []

        if has_sarvam:
            joins.append(f"LEFT JOIN `{sarvam_table}` s ON r.callid = s.callid")
        if has_analytics:
            joins.append(f"LEFT JOIN `{analytics_table}` a ON r.callid = a.callid")

        if filters:
            if 'date_from' in filters:
                where_clauses.append("r.call_starttime >= %s")
                params.append(filters['date_from'])

            if 'date_to' in filters:
                where_clauses.append("r.call_starttime <= %s")
                params.append(filters['date_to'])

            status_filter = filters.get('status')
            if status_filter is not None:
                # Handle Status 3: Analyzed
                if status_filter >= 3:
                    if has_analytics:
                        where_clauses.append("a.callid IS NOT NULL")
                    else:
                        where_clauses.append("0=1")
                # Handle Status 2: Transcribed but not Analyzed
                elif status_filter == 2:
                    if has_sarvam:
                        where_clauses.append("s.callid IS NOT NULL")
                    if has_analytics:
                        where_clauses.append("a.callid IS NULL")
                # Handle Status 1: Queued for Transcription
                elif status_filter == 1:
                    where_clauses.append("r.status = 1")
                    if has_sarvam:
                        where_clauses.append("s.callid IS NULL")
                # Handle Status 0: New/Unprocessed
                else:
                    if has_sarvam:
                        where_clauses.append("s.callid IS NULL")
                    if has_analytics:
                        where_clauses.append("a.callid IS NULL")
                    where_clauses.append("(r.status = 0 OR r.status IS NULL)")

        where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
        join_sql = " ".join(joins)

        query = f"""
            SELECT COUNT(*) as count
            FROM `{raw_table}` r
            {join_sql}
            {where_sql}
        """

        cursor.execute(query, params)
        result = cursor.fetchone()
        return result['count'] if result else 0

    def _search_raw_calls(self, cursor, bid, query, limit=50):
        raw_table = f"{bid}_raw_calls"
        if not self._table_exists(cursor, raw_table):
            return []

        search_term = f"%{query}%"
        cursor.execute(f"""
            SELECT
                r.callid,
                r.bid,
                r.fileurl as fileUrl,
                r.agentname,
                r.groupname,
                r.call_starttime,
                r.call_endtime,
                r.call_status,
                r.agent_callinfo,
                r.customer_callinfo,
                r.direction,
                CAST(TIME(r.call_starttime) AS CHAR) as call_time,
                COALESCE(r.duration_seconds, TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime)) as duration_seconds
            FROM `{raw_table}` r
            WHERE r.callid LIKE %s
               OR r.agentname LIKE %s
               OR r.customer_callinfo LIKE %s
               OR r.agent_callinfo LIKE %s
            ORDER BY r.call_starttime DESC, r.call_endtime DESC
            LIMIT %s
        """, (search_term, search_term, search_term, search_term, limit))

        calls = cursor.fetchall()
        return [self._format_call_record(call) for call in calls]

    def get_leads_list(self, bid, groupname=None, limit=100, offset=0, transcripts_only=False, direction=None):
        """Get customer-level lead aggregates from raw calls.

        For inbound calls the customer phone is callfrom (agent_callinfo).
        For outbound calls the customer phone is callto (customer_callinfo).
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()

            raw_table = f"{bid}_raw_calls"
            sarvam_table = f"{bid}_sarvamresponse"
            analytics_table = f"{bid}_callanalytics"

            if not self._table_exists(cursor, raw_table):
                return {'leads': [], 'total': 0}

            has_sarvam = self._table_exists(cursor, sarvam_table)
            has_analytics = self._table_exists(cursor, analytics_table)

            # Derive the customer phone based on call direction:
            #   inbound, BID 1713-style: agent_callinfo == LEFT(callid, 10) → use agent_callinfo
            #   inbound, BID 7491-style: agent_callinfo is the agent's own number; customer
            #     phone is encoded as the first 10 digits of the callid → use LEFT(callid, 10)
            #   outbound → customer_callinfo (callto = the called party)
            lead_phone_expr = (
                "CASE WHEN LOWER(r.direction) = 'inbound' THEN "
                "CASE WHEN r.agent_callinfo = LEFT(r.callid, 10) THEN r.agent_callinfo "
                "ELSE LEFT(r.callid, 10) END "
                "ELSE r.customer_callinfo END"
            )

            where_clauses = [
                f"({lead_phone_expr}) IS NOT NULL",
                f"TRIM(({lead_phone_expr})) != ''",
            ]
            params = []

            if direction and direction.lower() in ('inbound', 'outbound'):
                where_clauses.append("LOWER(r.direction) = %s")
                params.append(direction.lower())

            if groupname:
                where_clauses.append("r.groupname = %s")
                params.append(groupname)

            if transcripts_only and has_sarvam:
                where_clauses.append("s.callid IS NOT NULL")
                where_clauses.append("COALESCE(TRIM(s.transcript), '') != ''")

            where_sql = " WHERE " + " AND ".join(where_clauses)
            join_sarvam = f"LEFT JOIN `{sarvam_table}` s ON r.callid = s.callid" if has_sarvam else ""
            join_analytics = f"LEFT JOIN `{analytics_table}` a ON r.callid = a.callid" if has_analytics else ""
            quality_expr = "ROUND(AVG(a.quality_score), 2)" if has_analytics else "NULL"
            transcript_expr = (
                "SUM(CASE WHEN s.callid IS NOT NULL AND COALESCE(TRIM(s.transcript), '') != '' THEN 1 ELSE 0 END)"
                if has_sarvam else
                "0"
            )

            count_query = f"""
                SELECT COUNT(*) AS total
                FROM (
                    SELECT {lead_phone_expr}
                    FROM `{raw_table}` r
                    {join_sarvam}
                    {where_sql}
                    GROUP BY {lead_phone_expr}
                ) t
            """
            cursor.execute(count_query, params)
            total_row = cursor.fetchone() or {'total': 0}
            total = int(total_row.get('total') or 0)

            data_query = f"""
                SELECT
                    {lead_phone_expr} AS lead_phone,
                    COUNT(*) AS conversations,
                    MAX(r.call_starttime) AS last_conversation,
                    SUBSTRING_INDEX(
                        GROUP_CONCAT(COALESCE(r.agentname, '-') ORDER BY r.call_starttime DESC SEPARATOR '||'),
                        '||',
                        1
                    ) AS owner_name,
                    {quality_expr} AS avg_quality_score,
                    SUM(
                        CASE
                            WHEN r.call_starttime IS NOT NULL AND r.call_endtime IS NOT NULL
                            THEN GREATEST(COALESCE(r.duration_seconds, TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime)), 0)
                            ELSE 0
                        END
                    ) AS total_duration_seconds,
                    SUM(CASE WHEN r.call_status = 'ANSWER' THEN 1 ELSE 0 END) AS answered_calls,
                    {transcript_expr} AS transcript_calls
                FROM `{raw_table}` r
                {join_sarvam}
                {join_analytics}
                {where_sql}
                GROUP BY {lead_phone_expr}
                ORDER BY last_conversation DESC
                LIMIT %s OFFSET %s
            """
            cursor.execute(data_query, params + [limit, offset])
            rows = cursor.fetchall()

            leads = []
            for row in rows:
                leads.append({
                    'lead_phone': row.get('lead_phone'),
                    'conversations': int(row.get('conversations') or 0),
                    'last_conversation': row.get('last_conversation'),
                    'owner_name': row.get('owner_name') or '-',
                    'avg_quality_score': float(row.get('avg_quality_score') or 0),
                    'total_duration_seconds': int(row.get('total_duration_seconds') or 0),
                    'answered_calls': int(row.get('answered_calls') or 0),
                    'transcript_calls': int(row.get('transcript_calls') or 0),
                    'lead_name': None,
                    'lead_status': '-',
                    'next_task_due_date': '-',
                })

            # Enrich with CRM lead names / status from crm_leads_cache
            phones = [lead['lead_phone'] for lead in leads if lead.get('lead_phone')]
            if phones:
                crm_map = self.get_crm_enrichment_for_phones(bid, 'leadsquared', phones)
                for lead in leads:
                    phone = lead.get('lead_phone', '')
                    enrichment = crm_map.get(phone, {})
                    lead['lead_name'] = enrichment.get('lead_name') or None
                    if enrichment.get('lead_status'):
                        lead['lead_status'] = enrichment['lead_status']
                    if enrichment.get('next_task_due_date'):
                        lead['next_task_due_date'] = enrichment['next_task_due_date']
                    # Prefer CRM owner over call-derived owner when call owner is absent
                    if lead['owner_name'] == '-' and enrichment.get('crm_owner_name'):
                        lead['owner_name'] = enrichment['crm_owner_name']

            return {'leads': leads, 'total': total}

    def get_lead_details(self, bid, lead_phone, groupname=None):
        """Get customer-level details and call timeline for a given customer phone.

        Runs two queries against ``{bid}_raw_calls`` using the same WHERE
        clause: an aggregate summary (joined to the analytics table only) and
        a per-call listing (joined to the sarvam, analytics and bant tables
        when they exist). Columns from tables or columns that do not exist are
        replaced by SQL constants (``NULL`` / ``0``) so the result shape stays
        the same.

        Args:
            bid: Business id used as the table-name prefix.
            lead_phone: Customer phone to match against the derived lead phone.
            groupname: When truthy, restricts both queries to this group.

        Returns:
            dict | None: ``None`` when the raw calls table does not exist or no
            call matches; otherwise a dict with the lead-level fields
            (``lead_phone``, ``lead_name``, ``lead_status``,
            ``next_task_due_date``, ``owner_name``, ``total_conversations``,
            ``first_conversation``, ``last_conversation``,
            ``avg_quality_score``, ``talk_listen_ratio``, ``intent_score``,
            ``total_duration_seconds``) and ``calls``, a list of per-call
            dicts ordered by ``call_starttime`` descending.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()

            raw_table = f"{bid}_raw_calls"
            sarvam_table = f"{bid}_sarvamresponse"
            analytics_table = f"{bid}_callanalytics"

            # Without the raw calls table there is nothing to report.
            if not self._table_exists(cursor, raw_table):
                return None

            has_sarvam = self._table_exists(cursor, sarvam_table)
            has_analytics = self._table_exists(cursor, analytics_table)

            # Match calls where the derived customer phone equals lead_phone.
            # Uses the same self-detecting expression as get_leads_list:
            #   inbound, agent_callinfo == LEFT(callid,10): use agent_callinfo (BID 1713 pattern)
            #   inbound, agent_callinfo != LEFT(callid,10): use LEFT(callid,10) (BID 7491 pattern)
            #   outbound: use customer_callinfo
            where_clauses = [
                "CASE WHEN LOWER(r.direction) = 'inbound' THEN "
                "CASE WHEN r.agent_callinfo = LEFT(r.callid, 10) THEN r.agent_callinfo "
                "ELSE LEFT(r.callid, 10) END "
                "ELSE r.customer_callinfo END = %s",
            ]
            params = [lead_phone]
            if groupname:
                where_clauses.append("r.groupname = %s")
                params.append(groupname)

            where_sql = " WHERE " + " AND ".join(where_clauses)
            # Optional joins: empty strings when the table is absent.
            join_sarvam = f"LEFT JOIN `{sarvam_table}` s ON r.callid = s.callid" if has_sarvam else ""
            join_analytics = f"LEFT JOIN `{analytics_table}` a ON r.callid = a.callid" if has_analytics else ""
            # Select-list fragments; each falls back to a constant when its source is missing.
            summary_quality_expr = "ROUND(AVG(a.quality_score), 2)" if has_analytics else "NULL"
            talk_expr = "ROUND(AVG(a.agent_speak_percentage), 0)" if has_analytics else "0"
            listen_expr = "ROUND(AVG(a.customer_speak_percentage), 0)" if has_analytics else "0"
            # Hard-coded: the returned intent_score is therefore always 0.
            intent_expr = "0"
            call_quality_expr = "a.quality_score" if has_analytics else "NULL"
            call_summary_expr = "a.summary" if has_analytics else "NULL"
            call_purpose_expr = "a.call_purpose" if has_analytics else "NULL"
            call_objections_expr = "a.objections_concerns" if has_analytics else "NULL"
            # parameter_scores / parameter_detections columns may not exist on older tables
            has_param_scores_col = has_analytics and self._column_exists(cursor, analytics_table, 'parameter_scores')
            has_param_detections_col = has_analytics and self._column_exists(cursor, analytics_table, 'parameter_detections')
            has_raw_response_col = has_analytics and self._column_exists(cursor, analytics_table, 'raw_response')
            has_rich_summary_col = has_analytics and self._column_exists(cursor, analytics_table, 'rich_summary')
            call_parameter_scores_expr = "a.parameter_scores" if has_param_scores_col else "NULL"
            call_parameter_detections_expr = "a.parameter_detections" if has_param_detections_col else "NULL"
            call_raw_response_expr = "a.raw_response" if has_raw_response_col else "NULL"
            call_rich_summary_expr = "a.rich_summary" if has_rich_summary_col else "NULL"
            # bant table join for per-call bant profile
            bant_table = f"{bid}_bant"
            has_bant = self._table_exists(cursor, bant_table)
            # NOTE(review): both sides are converted to utf8mb4, presumably to avoid a
            # charset/collation mismatch between the two callid columns - confirm.
            join_bant = f"LEFT JOIN `{bant_table}` b ON CONVERT(r.callid USING utf8mb4) = CONVERT(b.callid USING utf8mb4)" if has_bant else ""
            bant_profile_expr = "b.profile_json" if has_bant else "NULL"
            bant_summary_expr = "b.profile_summary" if has_bant else "NULL"
            # 1 when the call has a non-blank transcript row, else 0.
            call_has_transcript_expr = (
                "CASE WHEN s.callid IS NOT NULL AND COALESCE(TRIM(s.transcript), '') != '' THEN 1 ELSE 0 END"
                if has_sarvam else
                "0"
            )
            call_transcript_expr = "s.transcript" if has_sarvam else "NULL"
            call_speaker_segments_expr = "s.speaker_segments" if has_sarvam else "NULL"
            call_transcript_duration_expr = "s.duration" if has_sarvam else "NULL"

            # Lead-level aggregates; owner_name is the agent of the most recent call.
            summary_query = f"""
                SELECT
                    COUNT(*) AS total_conversations,
                    MAX(r.call_starttime) AS last_conversation,
                    MIN(r.call_starttime) AS first_conversation,
                    SUBSTRING_INDEX(
                        GROUP_CONCAT(COALESCE(r.agentname, '-') ORDER BY r.call_starttime DESC SEPARATOR '||'),
                        '||',
                        1
                    ) AS owner_name,
                    {summary_quality_expr} AS avg_quality_score,
                    {talk_expr} AS avg_talk_pct,
                    {listen_expr} AS avg_listen_pct,
                    {intent_expr} AS avg_intent_score,
                    SUM(
                        CASE
                            WHEN r.call_starttime IS NOT NULL AND r.call_endtime IS NOT NULL
                            THEN GREATEST(COALESCE(r.duration_seconds, TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime)), 0)
                            ELSE 0
                        END
                    ) AS total_duration_seconds
                FROM `{raw_table}` r
                {join_analytics}
                {where_sql}
            """
            cursor.execute(summary_query, params)
            summary = cursor.fetchone()
            # No matching calls (COUNT(*) is 0): treat the lead as not found.
            if not summary or not summary.get('total_conversations'):
                return None

            # Per-call timeline, newest first, with transcript / analytics / bant columns.
            calls_query = f"""
                SELECT
                    r.callid,
                    r.fileurl AS file_url,
                    r.call_starttime,
                    r.call_endtime,
                    r.call_status,
                    r.direction,
                    r.agentname,
                    r.groupname,
                    COALESCE(r.duration_seconds, TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime)) AS duration_seconds,
                    {call_has_transcript_expr} AS has_transcript,
                    {call_transcript_expr} AS transcript,
                    {call_speaker_segments_expr} AS speaker_segments,
                    {call_transcript_duration_expr} AS transcript_duration,
                    {call_quality_expr} AS quality_score,
                    {call_summary_expr} AS summary,
                    {call_rich_summary_expr} AS rich_summary,
                    {call_parameter_scores_expr} AS parameter_scores,
                    {call_parameter_detections_expr} AS parameter_detections,
                    {call_purpose_expr} AS call_purpose,
                    {call_objections_expr} AS objections_concerns,
                    {call_raw_response_expr} AS raw_response,
                    {bant_profile_expr} AS bant_profile_json,
                    {bant_summary_expr} AS bant_summary
                FROM `{raw_table}` r
                {join_sarvam}
                {join_analytics}
                {join_bant}
                {where_sql}
                ORDER BY r.call_starttime DESC
            """
            cursor.execute(calls_query, params)
            calls = cursor.fetchall()

            # NULL averages become 0; both being 0 yields "N/A" for the ratio below.
            talk_pct = int(summary.get('avg_talk_pct') or 0)
            listen_pct = int(summary.get('avg_listen_pct') or 0)

            # Enrich with CRM lead name / status
            crm_enrichment = self.get_crm_enrichment_for_phones(bid, 'leadsquared', [lead_phone])
            crm_info = crm_enrichment.get(lead_phone, {})
            call_owner = summary.get('owner_name') or '-'

            return {
                'lead_phone': lead_phone,
                'lead_name': crm_info.get('lead_name') or None,
                'lead_status': crm_info.get('lead_status') or '-',
                'next_task_due_date': crm_info.get('next_task_due_date') or '-',
                # The CRM owner is used only when no call-derived owner is available.
                'owner_name': call_owner if call_owner != '-' else (crm_info.get('crm_owner_name') or '-'),
                'total_conversations': int(summary.get('total_conversations') or 0),
                'first_conversation': summary.get('first_conversation'),
                'last_conversation': summary.get('last_conversation'),
                'avg_quality_score': float(summary.get('avg_quality_score') or 0),
                'talk_listen_ratio': f"{talk_pct}:{listen_pct}" if (talk_pct or listen_pct) else "N/A",
                'intent_score': int(summary.get('avg_intent_score') or 0),
                'total_duration_seconds': int(summary.get('total_duration_seconds') or 0),
                'calls': [
                    {
                        'callid': row.get('callid'),
                        'call_starttime': row.get('call_starttime'),
                        'call_endtime': row.get('call_endtime'),
                        'call_status': row.get('call_status'),
                        'direction': row.get('direction'),
                        'agentname': row.get('agentname') or '-',
                        'groupname': row.get('groupname') or '-',
                        'duration_seconds': int(row.get('duration_seconds') or 0),
                        'has_transcript': bool(row.get('has_transcript')),
                        'file_url': row.get('file_url'),
                        'transcript': row.get('transcript') or '',
                        'speaker_segments': self._parse_json_field(row.get('speaker_segments')) or [],
                        'transcript_duration': float(row.get('transcript_duration') or 0),
                        # None (not 0) when the call has no quality score.
                        'quality_score': float(row.get('quality_score') or 0) if row.get('quality_score') is not None else None,
                        'summary': row.get('summary') or '',
                        'rich_summary': self._parse_json_field(row.get('rich_summary')),
                        # Receives the whole row; presumably this is where the selected
                        # raw_response column is consumed - verify in _resolve_parameter_scores.
                        'parameter_scores': self._resolve_parameter_scores(row),
                        'parameter_detections': self._parse_json_field(row.get('parameter_detections')),
                        'call_purpose': row.get('call_purpose') or '',
                        'objections_concerns': row.get('objections_concerns') or '',
                        'bant_profile': self._parse_json_field(row.get('bant_profile_json')),
                        'bant_summary': row.get('bant_summary') or '',
                    } for row in calls
                ]
            }

    # ========================================================================
    # CALL OPERATIONS
    # ========================================================================

    def get_calls(self, bid, filters=None, limit=100, offset=0):
        """Get calls with optional filtering"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            table_name = f"{bid}_calls"
            if not self._table_exists(cursor, table_name):
                return self._get_raw_calls_list(cursor, bid, filters, limit, offset)

            # Build WHERE clause
            where_clauses = []
            params = []

            if filters:
                if 'status' in filters and filters['status'] is not None:
                    where_clauses.append("status = %s")
                    params.append(filters['status'])

                if 'sales_intent' in filters:
                    where_clauses.append("sales_intent = %s")
                    params.append(filters['sales_intent'])

                if 'date_from' in filters:
                    where_clauses.append("call_starttime >= %s")
                    params.append(filters['date_from'])

                if 'date_to' in filters:
                    where_clauses.append("call_starttime <= %s")
                    params.append(filters['date_to'])

            where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""

            query = f"""
                SELECT * FROM `{table_name}`
                {where_sql}
                ORDER BY call_starttime DESC, call_endtime DESC
                LIMIT %s OFFSET %s
            """

            params.extend([limit, offset])
            cursor.execute(query, params)
            calls = cursor.fetchall()

            return [self._format_call_record(call) for call in calls]

    def get_calls_count(self, bid, filters=None):
        """Get total count of calls matching filters"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            table_name = f"{bid}_calls"
            if not self._table_exists(cursor, table_name):
                return self._get_raw_calls_count(cursor, bid, filters)

            where_clauses = []
            params = []

            if filters:
                if 'status' in filters and filters['status'] is not None:
                    where_clauses.append("status = %s")
                    params.append(filters['status'])

                if 'sales_intent' in filters:
                    where_clauses.append("sales_intent = %s")
                    params.append(filters['sales_intent'])

                if 'date_from' in filters:
                    where_clauses.append("call_starttime >= %s")
                    params.append(filters['date_from'])

                if 'date_to' in filters:
                    where_clauses.append("call_starttime <= %s")
                    params.append(filters['date_to'])

            where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""

            query = f"SELECT COUNT(*) as count FROM `{table_name}` {where_sql}"

            cursor.execute(query, params)
            result = cursor.fetchone()
            return result['count'] if result else 0

    def get_call_by_id(self, bid, callid):
        """Get specific call by ID"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            table_name = f"{bid}_calls"
            if not self._table_exists(cursor, table_name):
                return self.get_raw_call_details(bid, callid)

            query = f"SELECT * FROM `{table_name}` WHERE callid = %s LIMIT 1"
            cursor.execute(query, (callid,))
            call = cursor.fetchone()

            return self._format_call_record(call)

    def get_call_transcript(self, bid, callid):
        """Get transcript from sarvamresponse table"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            table_name = f"{bid}_sarvamresponse"

            # Check if table exists
            cursor.execute(f"SHOW TABLES LIKE '{table_name}'")
            if not cursor.fetchone():
                return None

            query = f"""
                SELECT transcript, language, raw_response, speaker_segments, num_speakers, duration
                FROM `{table_name}`
                WHERE callid = %s
                ORDER BY created_at DESC
                LIMIT 1
            """
            cursor.execute(query, (callid,))
            return cursor.fetchone()

    def get_recent_calls(self, bid, limit=10):
        """Get most recent calls"""
        return self.get_calls(bid, filters=None, limit=limit, offset=0)

    def search_calls(self, bid, query, limit=50):
        """Search calls by query string"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            table_name = f"{bid}_calls"
            if not self._table_exists(cursor, table_name):
                return self._search_raw_calls(cursor, bid, query, limit)

            search_query = f"""
                SELECT * FROM `{table_name}`
                WHERE
                    callid LIKE %s OR
                    customer_name LIKE %s OR
                    agent_name LIKE %s OR
                    summary LIKE %s OR
                    transcripts LIKE %s
                ORDER BY call_starttime DESC, call_endtime DESC
                LIMIT %s
            """

            search_term = f"%{query}%"
            cursor.execute(search_query, (search_term, search_term, search_term,
                                         search_term, search_term, limit))
            calls = cursor.fetchall()

            return [self._format_call_record(call) for call in calls]

    def update_call(self, bid, callid, data):
        """Update call record"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            table_name = f"{bid}_calls"

            # Build UPDATE query dynamically based on provided data
            set_clauses = []
            params = []

            for key, value in data.items():
                if key in ['keywords', 'sentiments', 'emotions', 'customer_details']:
                    # Convert dict/list to JSON string
                    value = json.dumps(value) if value else None

                set_clauses.append(f"{key} = %s")
                params.append(value)

            if not set_clauses:
                return False

            set_clauses.append("updated_at = %s")
            params.append(datetime.now())

            params.append(callid)

            query = f"""
                UPDATE `{table_name}`
                SET {', '.join(set_clauses)}
                WHERE callid = %s
            """

            cursor.execute(query, params)
            return cursor.rowcount > 0

    def save_conversation_summary(self, bid, callid, transfer_reason):
        """Save conversation summary to call_history table"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            table_name = f"{bid}_call_history"

            # Check if table exists
            cursor.execute(f"SHOW TABLES LIKE '{table_name}'")
            if not cursor.fetchone():
                logger.warning(f"Table {table_name} does not exist")
                return False

            query = f"""
                INSERT INTO `{table_name}`
                (business_id, callid, transfer_reason, created_at)
                VALUES (%s, %s, %s, %s)
            """

            cursor.execute(query, (
                bid,
                callid,
                json.dumps(transfer_reason) if transfer_reason else None,
                datetime.now()
            ))

            return cursor.rowcount > 0

    # ========================================================================
    # TRANSCRIPT OPERATIONS
    # ========================================================================

    def get_transcripts(self, bid):
        """Get all transcripts with metadata from sarvamresponse table"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            # Check if sarvamresponse table exists
            sarvam_table = f"{bid}_sarvamresponse"
            cursor.execute(f"SHOW TABLES LIKE '{sarvam_table}'")
            if not cursor.fetchone():
                logger.warning(f"Table {sarvam_table} does not exist")
                return []

            # Get all transcripts with their metadata
            query = f"""
                SELECT
                    s.callid as transcript_id,
                    s.callid,
                    s.transcript as full_transcript,
                    s.speaker_segments,
                    s.num_speakers,
                    s.duration,
                    s.language,
                    s.created_at,
                    s.updated_at,
                    c.call_starttime,
                    c.call_endtime,
                    c.customer_callinfo,
                    c.agentname,
                    COALESCE(c.duration_seconds, TIMESTAMPDIFF(SECOND, c.call_starttime, c.call_endtime)) as duration_seconds,
                    CASE
                        WHEN s.transcript IS NOT NULL AND s.transcript != '' THEN TRUE
                        ELSE FALSE
                    END as stored_in_vectordb
                FROM `{sarvam_table}` s
                LEFT JOIN `{bid}_calls` c ON s.callid = c.callid
                ORDER BY s.created_at DESC
            """

            try:
                cursor.execute(query)
                transcripts = cursor.fetchall()

                # Process each transcript to add computed fields
                result = []
                for t in transcripts:
                    transcript_text = t.get('full_transcript', '')

                    # Use actual speaker data from diarization if available
                    num_speakers = t.get('num_speakers')
                    if num_speakers is None:
                        # Fallback estimation
                        num_speakers = 2 if transcript_text else 0

                    # Calculate segments from speaker_segments JSON if available
                    speaker_segments = t.get('speaker_segments')
                    if speaker_segments:
                        try:
                            import json
                            segments_data = json.loads(speaker_segments) if isinstance(speaker_segments, str) else speaker_segments
                            num_segments = len(segments_data) if segments_data else 0
                        except:
                            # Fallback: estimate segments by line breaks
                            num_segments = len([line for line in transcript_text.split('\n') if line.strip()]) if transcript_text else 0
                    else:
                        # Fallback: estimate segments by line breaks
                        num_segments = len([line for line in transcript_text.split('\n') if line.strip()]) if transcript_text else 0

                    # Create filename from callid and timestamp
                    customer_info = t.get('customer_callinfo', 'unknown')
                    filename = f"Call_{t.get('callid', 'unknown')}_{str(customer_info).replace(' ', '_')}.txt"

                    result.append({
                        'transcript_id': t.get('transcript_id'),
                        'callid': t.get('callid'),
                        'filename': filename,
                        'full_transcript': transcript_text,
                        'language': t.get('language'),
                        'num_speakers': num_speakers,
                        'num_segments': num_segments,
                        'duration': t.get('duration') or t.get('duration_seconds', 0),  # Use sarvam duration if available, else calculated
                        'created_at': t.get('created_at').isoformat() if t.get('created_at') else None,
                        'updated_at': t.get('updated_at').isoformat() if t.get('updated_at') else None,
                        'stored_in_vectordb': bool(t.get('stored_in_vectordb')),
                        'customer_name': customer_info,
                        'agent_name': t.get('agentname')
                    })

                return result

            except Exception as e:
                logger.error(f"Error fetching transcripts: {e}")
                return []

    def save_call_analytics(self, bid, callid, analytics_data):
        """Save call analytics to {bid}_callanalytics table"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            query = f"""
                INSERT INTO `{bid}_callanalytics`
                (callid, bid, summary, call_purpose, objections_concerns, objection_type,
                 quality_score, sentiment, analysis_model, raw_response,
                 parameter_scores, parameter_detections, total_possible_score,
                 parameters_not_applicable, talk_listen_ratio, agent_talk_time,
                 customer_talk_time, dead_air_percentage, agent_speak_percentage,
                 customer_speak_percentage, talk_listen_assessment)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    summary = VALUES(summary),
                    call_purpose = VALUES(call_purpose),
                    objections_concerns = VALUES(objections_concerns),
                    objection_type = VALUES(objection_type),
                    quality_score = VALUES(quality_score),
                    sentiment = VALUES(sentiment),
                    analysis_model = VALUES(analysis_model),
                    raw_response = VALUES(raw_response),
                    parameter_scores = VALUES(parameter_scores),
                    parameter_detections = VALUES(parameter_detections),
                    total_possible_score = VALUES(total_possible_score),
                    parameters_not_applicable = VALUES(parameters_not_applicable),
                    talk_listen_ratio = VALUES(talk_listen_ratio),
                    agent_talk_time = VALUES(agent_talk_time),
                    customer_talk_time = VALUES(customer_talk_time),
                    dead_air_percentage = VALUES(dead_air_percentage),
                    agent_speak_percentage = VALUES(agent_speak_percentage),
                    customer_speak_percentage = VALUES(customer_speak_percentage),
                    talk_listen_assessment = VALUES(talk_listen_assessment),
                    updated_at = CURRENT_TIMESTAMP
            """

            cursor.execute(query, (
                callid,
                analytics_data.get('bid', '7987'),
                analytics_data.get('summary'),
                analytics_data.get('call_purpose'),
                analytics_data.get('objections_concerns'),
                analytics_data.get('objection_type', 'None'),
                analytics_data.get('quality_score'),
                analytics_data.get('sentiment'),
                analytics_data.get('analysis_model', 'aws-nova'),
                json.dumps(analytics_data.get('raw_response')) if analytics_data.get('raw_response') else None,
                analytics_data.get('parameter_scores'),
                analytics_data.get('parameter_detections'),
                analytics_data.get('total_possible_score'),
                analytics_data.get('parameters_not_applicable'),
                analytics_data.get('talk_listen_ratio'),
                analytics_data.get('agent_talk_time'),
                analytics_data.get('customer_talk_time'),
                analytics_data.get('dead_air_percentage'),
                analytics_data.get('agent_speak_percentage'),
                analytics_data.get('customer_speak_percentage'),
                analytics_data.get('talk_listen_assessment'),
            ))

            conn.commit()
            analytics_id = cursor.lastrowid

            # Save classified objections if present
            classified_objections = analytics_data.get('classified_objections', [])
            if classified_objections:
                self._save_classified_objections(bid, callid, classified_objections, cursor, conn)

            return analytics_id

    def ensure_bant_table(self, bid):
        """Create the {bid}_bant table if it does not exist."""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            query = f"""
                CREATE TABLE IF NOT EXISTS `{bid}_bant` (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    callid VARCHAR(255) UNIQUE,
                    bid VARCHAR(50),
                    profile_json LONGTEXT,
                    profile_summary TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    INDEX (callid)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
            """
            cursor.execute(query)
            conn.commit()

    def save_bant_analysis(self, bid, callid, profile, summary):
        """Save BANT profile to {bid}_bant table."""
        if not profile and not summary:
            return

        self.ensure_bant_table(bid)

        profile_json = None
        if profile:
            try:
                profile_json = json.dumps(profile)
            except Exception:
                profile_json = None

        with self.get_connection() as conn:
            cursor = conn.cursor()
            query = f"""
                INSERT INTO `{bid}_bant` (callid, bid, profile_json, profile_summary)
                VALUES (%s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    profile_json = VALUES(profile_json),
                    profile_summary = VALUES(profile_summary),
                    updated_at = CURRENT_TIMESTAMP
            """
            cursor.execute(query, (
                callid,
                bid,
                profile_json,
                summary
            ))
            conn.commit()

    def get_bant_analysis(self, bid, callid):
        """Fetch BANT profile for a call."""
        try:
            self.ensure_bant_table(bid)
        except Exception:
            return None

        with self.get_connection() as conn:
            cursor = conn.cursor()
            query = f"""
                SELECT profile_json, profile_summary
                FROM `{bid}_bant`
                WHERE callid = %s
                LIMIT 1
            """
            cursor.execute(query, (callid,))
            result = cursor.fetchone()
            if not result:
                return None

            profile = self._parse_json_field(result.get('profile_json'))
            return {
                'profile': profile,
                'summary': result.get('profile_summary') or ''
            }

    def _save_classified_objections(self, bid, callid, classified_objections, cursor, conn):
        """Save classified objections to call_objections table"""
        try:
            # Delete existing objections for this call
            delete_query = """
                DELETE FROM call_objections
                WHERE bid = %s AND callid = %s
            """
            cursor.execute(delete_query, (bid, callid))

            # Insert new classified objections
            if classified_objections:
                insert_query = """
                    INSERT INTO call_objections
                    (bid, callid, classification_id, objection_text, confidence, created_at)
                    VALUES (%s, %s, %s, %s, %s, NOW())
                """

                for objection in classified_objections:
                    cursor.execute(insert_query, (
                        bid,
                        callid,
                        objection.get('classification_id'),
                        objection.get('objection_text'),
                        objection.get('confidence', 'medium')
                    ))

                conn.commit()
                logger.info(f"Saved {len(classified_objections)} classified objections for call {callid}")

        except Exception as e:
            logger.error(f"Error saving classified objections: {e}")
            # Don't raise - continue even if objection saving fails

    def get_call_analytics(self, bid, callid):
        """Get analytics for a specific call"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            query = f"""
                SELECT * FROM `{bid}_callanalytics`
                WHERE callid = %s
                LIMIT 1
            """

            cursor.execute(query, (callid,))
            result = cursor.fetchone()

            if result and result.get('raw_response'):
                result['raw_response'] = self._parse_json_field(result['raw_response'])

            return result

    def get_calls_for_analysis(self, bid, limit=10):
        """Get calls that have transcripts but no analytics yet"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            query = f"""
                SELECT
                    r.callid,
                    r.bid,
                    r.agentname,
                    r.groupname,
                    r.call_starttime,
                    r.call_endtime,
                    s.transcript,
                    s.speaker_segments,
                    s.num_speakers,
                    s.duration
                FROM `{bid}_raw_calls` r
                INNER JOIN `{bid}_sarvamresponse` s ON r.callid = s.callid
                LEFT JOIN `{bid}_callanalytics` a ON r.callid = a.callid
                WHERE r.call_status = 'ANSWER'
                  AND s.transcript IS NOT NULL
                  AND s.transcript != ''
                  AND a.callid IS NULL
                ORDER BY r.call_starttime DESC
                LIMIT %s
            """

            cursor.execute(query, (limit,))
            results = cursor.fetchall()

            # Parse JSON fields
            for result in results:
                if result.get('speaker_segments'):
                    result['speaker_segments'] = self._parse_json_field(result['speaker_segments'])

            return results

    def get_analytics_overview(self, bid, groupname=None):
        """Get overall analytics summary"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            where_clause = ""
            params = []
            if groupname:
                where_clause = "WHERE r.groupname = %s"
                params.append(groupname)

            query = f"""
                SELECT
                    COUNT(DISTINCT a.callid) as total_analyzed_calls,
                    AVG(a.quality_score) as avg_quality_score,
                    SUM(CASE WHEN a.sentiment = 'positive' THEN 1 ELSE 0 END) as positive_calls,
                    SUM(CASE WHEN a.sentiment = 'neutral' THEN 1 ELSE 0 END) as neutral_calls,
                    SUM(CASE WHEN a.sentiment = 'negative' THEN 1 ELSE 0 END) as negative_calls,
                    COUNT(DISTINCT r.groupname) as total_locations
                FROM `{bid}_callanalytics` a
                INNER JOIN `{bid}_raw_calls` r ON a.callid = r.callid
                {where_clause}
            """

            cursor.execute(query, params)
            result = cursor.fetchone()
            return result if result else {}

    def get_sentiment_by_location(self, bid, groupname=None):
        """Get sentiment distribution by location"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            where_clause = ""
            params = []
            if groupname:
                where_clause = "AND r.groupname = %s"
                params.append(groupname)

            query = f"""
                SELECT
                    r.groupname as location,
                    a.sentiment,
                    COUNT(*) as count
                FROM `{bid}_callanalytics` a
                INNER JOIN `{bid}_raw_calls` r ON a.callid = r.callid
                WHERE a.sentiment IS NOT NULL {where_clause}
                GROUP BY r.groupname, a.sentiment
                ORDER BY r.groupname, a.sentiment
            """

            cursor.execute(query, params)
            return cursor.fetchall()

    def get_quality_by_location(self, bid, groupname=None):
        """Get average quality score by location"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            where_clause = ""
            params = []
            if groupname:
                where_clause = "WHERE r.groupname = %s"
                params.append(groupname)

            query = f"""
                SELECT
                    r.groupname as location,
                    AVG(a.quality_score) as avg_quality_score,
                    MIN(a.quality_score) as min_quality_score,
                    MAX(a.quality_score) as max_quality_score,
                    COUNT(*) as call_count
                FROM `{bid}_callanalytics` a
                INNER JOIN `{bid}_raw_calls` r ON a.callid = r.callid
                {where_clause}
                GROUP BY r.groupname
                ORDER BY avg_quality_score DESC
            """

            cursor.execute(query, params)
            return cursor.fetchall()

    def get_quality_by_agent(self, bid, groupname=None):
        """Get average quality score by agent"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            where_clause = ""
            params = []
            if groupname:
                where_clause = "WHERE r.groupname = %s"
                params.append(groupname)

            query = f"""
                SELECT
                    COALESCE(NULLIF(r.agent_callinfo, ''), NULLIF(r.agentname, ''), 'Unknown') as agent,
                    COALESCE(NULLIF(MAX(r.agentname), ''), MAX(r.agent_callinfo), 'Unknown') as agent_display,
                    AVG(a.quality_score) as avg_quality_score,
                    MIN(a.quality_score) as min_quality_score,
                    MAX(a.quality_score) as max_quality_score,
                    COUNT(*) as call_count
                FROM `{bid}_callanalytics` a
                INNER JOIN `{bid}_raw_calls` r ON a.callid = r.callid
                {where_clause}
                GROUP BY agent
                ORDER BY avg_quality_score DESC
            """

            cursor.execute(query, params)
            return cursor.fetchall()

    def get_agent_leaderboard(self, bid, groupname=None, date_from=None, date_to=None):
        """Return per-agent scoring leaderboard with full stats and date filtering."""
        conditions = ["a.quality_score IS NOT NULL"]
        params = []

        if groupname:
            conditions.append("r.groupname = %s")
            params.append(groupname)
        if date_from:
            conditions.append("r.call_starttime >= %s")
            params.append(date_from)
        if date_to:
            conditions.append("r.call_starttime <= %s")
            params.append(date_to)

        where = "WHERE " + " AND ".join(conditions)

        query = f"""
            SELECT
                COALESCE(NULLIF(r.agentname, ''), 'Unknown') AS agent_name,
                ROUND(AVG(a.quality_score), 1)  AS avg_score,
                ROUND(MIN(a.quality_score), 1)  AS min_score,
                ROUND(MAX(a.quality_score), 1)  AS max_score,
                COUNT(*)                         AS total_calls,
                SUM(CASE WHEN a.quality_score >= 85 THEN 1 ELSE 0 END) AS high_calls,
                SUM(CASE WHEN a.quality_score >= 60
                          AND a.quality_score < 85 THEN 1 ELSE 0 END)  AS mid_calls,
                SUM(CASE WHEN a.quality_score < 60  THEN 1 ELSE 0 END) AS low_calls,
                MAX(r.call_starttime)            AS last_call
            FROM `{bid}_callanalytics` a
            INNER JOIN `{bid}_raw_calls` r ON a.callid = r.callid
            {where}
            GROUP BY agent_name
            ORDER BY avg_score DESC
        """

        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            rows = cursor.fetchall() or []

        result = []
        for rank, row in enumerate(rows, 1):
            last_call = row.get("last_call")
            result.append({
                "rank": rank,
                "agent_name": row["agent_name"],
                "avg_score": float(row["avg_score"] or 0),
                "min_score": float(row["min_score"] or 0),
                "max_score": float(row["max_score"] or 0),
                "total_calls": int(row["total_calls"] or 0),
                "high_calls": int(row["high_calls"] or 0),
                "mid_calls": int(row["mid_calls"] or 0),
                "low_calls": int(row["low_calls"] or 0),
                "last_call": last_call.isoformat() if hasattr(last_call, "isoformat") else str(last_call or ""),
            })
        return result

    def get_call_purpose_frequency(self, bid, groupname=None):
        """Get frequency of different call purposes"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            where_clause = ""
            params = []
            if groupname:
                where_clause = "AND r.groupname = %s"
                params.append(groupname)

            query = f"""
                SELECT
                    a.call_purpose,
                    COUNT(*) as count
                FROM `{bid}_callanalytics` a
                INNER JOIN `{bid}_raw_calls` r ON a.callid = r.callid
                WHERE a.call_purpose IS NOT NULL
                  AND a.call_purpose != '' {where_clause}
                GROUP BY a.call_purpose
                ORDER BY count DESC
                LIMIT 20
            """

            cursor.execute(query, params)
            return cursor.fetchall()

    # -------------------------------------------------------------------------
    # STT Pipeline Bid Config
    # -------------------------------------------------------------------------

    def ensure_stt_pipeline_bid_config_table(self):
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS `stt_pipeline_bid_config` (
                    `bid`               VARCHAR(100) NOT NULL,
                    `enabled`           TINYINT(1)   NOT NULL DEFAULT 0,
                    `raw_calls_id_col`  VARCHAR(100) NOT NULL DEFAULT 'id',
                    `raw_calls_url_col` VARCHAR(100) NOT NULL DEFAULT 'recording_url',
                    `batch_size`        INT          NOT NULL DEFAULT 10,
                    `poll_interval_s`   INT          NOT NULL DEFAULT 30,
                    `notes`             TEXT         NULL,
                    `created_at`        DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    `updated_at`        DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    PRIMARY KEY (`bid`)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
            """)

    def get_stt_pipeline_bid_configs(self):
        self.ensure_stt_pipeline_bid_config_table()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM stt_pipeline_bid_config ORDER BY bid")
            return cursor.fetchall() or []

    def get_stt_pipeline_bid_config(self, bid):
        self.ensure_stt_pipeline_bid_config_table()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM stt_pipeline_bid_config WHERE bid = %s", (bid,))
            return cursor.fetchone()

    def upsert_stt_pipeline_bid_config(self, bid, data: dict):
        self.ensure_stt_pipeline_bid_config_table()
        allowed = {"enabled", "raw_calls_id_col", "raw_calls_url_col", "batch_size",
                   "poll_interval_s", "notes"}
        fields = {k: v for k, v in data.items() if k in allowed}
        if not fields:
            return
        col_list = ", ".join(f"`{k}`" for k in fields)
        ph_list = ", ".join(["%s"] * len(fields))
        dup_set = ", ".join(f"`{k}` = VALUES(`{k}`)" for k in fields)
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"INSERT INTO stt_pipeline_bid_config (bid, {col_list}) "
                f"VALUES (%s, {ph_list}) "
                f"ON DUPLICATE KEY UPDATE {dup_set}",
                [bid] + list(fields.values()),
            )

    def toggle_stt_pipeline_bid(self, bid, enabled: bool):
        self.ensure_stt_pipeline_bid_config_table()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "INSERT INTO stt_pipeline_bid_config (bid, enabled) VALUES (%s, %s) "
                "ON DUPLICATE KEY UPDATE enabled = VALUES(enabled)",
                (bid, 1 if enabled else 0),
            )

    def get_stt_job_stats(self, bid=None):
        """Return job counts by status from stt_jobs table. Returns {} if table missing."""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                if bid:
                    cursor.execute(
                        "SELECT status, COUNT(*) as cnt FROM stt_jobs WHERE bid = %s GROUP BY status",
                        (bid,),
                    )
                else:
                    cursor.execute("SELECT status, COUNT(*) as cnt FROM stt_jobs GROUP BY status")
                rows = cursor.fetchall() or []
            return {r["status"]: r["cnt"] for r in rows}
        except Exception as exc:
            logger.warning("get_stt_job_stats failed: %s", exc)
            return {}

    def discover_stt_raw_call_bids(self):
        """Return all bids that have a *_raw_calls table in this DB."""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT TABLE_NAME FROM information_schema.TABLES "
                "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME LIKE %s",
                ("%_raw_calls",),
            )
            rows = cursor.fetchall() or []
        return [r["TABLE_NAME"].replace("_raw_calls", "") for r in rows]

    def get_concerns_frequency(self, bid, groupname=None):
        """Get frequency of different concerns/objections"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            where_clause = ""
            params = []
            if groupname:
                where_clause = "AND r.groupname = %s"
                params.append(groupname)

            query = f"""
                SELECT
                    a.objections_concerns,
                    a.objection_type,
                    COUNT(*) as count
                FROM `{bid}_callanalytics` a
                INNER JOIN `{bid}_raw_calls` r ON a.callid = r.callid
                WHERE a.objections_concerns IS NOT NULL
                  AND a.objections_concerns != ''
                  AND a.objections_concerns != 'None identified'
                  AND a.objections_concerns != 'None'
                  {where_clause}
                GROUP BY a.objections_concerns, a.objection_type
                ORDER BY count DESC
                LIMIT 20
            """

            cursor.execute(query, params)
            return cursor.fetchall()

    def get_busy_locations(self, bid, groupname=None):
        """Get busiest locations by call volume"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            where_clause = ""
            params = []
            if groupname:
                where_clause = "WHERE r.groupname = %s"
                params.append(groupname)

            query = f"""
                SELECT
                    r.groupname as location,
                    COUNT(DISTINCT a.callid) as analyzed_calls,
                    COUNT(DISTINCT r.callid) as total_calls,
                    ROUND(COUNT(DISTINCT a.callid) * 100.0 / COUNT(DISTINCT r.callid), 2) as analysis_percentage
                FROM `{bid}_raw_calls` r
                LEFT JOIN `{bid}_callanalytics` a ON r.callid = a.callid
                {where_clause}
                GROUP BY r.groupname
                ORDER BY analyzed_calls DESC
                LIMIT 20
            """

            cursor.execute(query, params)
            return cursor.fetchall()

    def get_calls_by_objection(self, bid, objection, groupname=None):
        """Get all calls with a specific objection/concern"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            where_clause = """
                WHERE (
                    LOWER(TRIM(COALESCE(a.objection_type, ''))) = LOWER(TRIM(%s))
                    OR LOWER(TRIM(COALESCE(oc.category_name, ''))) = LOWER(TRIM(%s))
                    OR a.objections_concerns = %s
                )
            """
            params = [bid, bid, objection, objection, objection]

            if groupname:
                where_clause += " AND r.groupname = %s"
                params.append(groupname)

            query = f"""
                SELECT DISTINCT
                    a.callid,
                    r.groupname as location,
                    r.agentname,
                    r.call_starttime,
                    r.call_endtime,
                    r.call_status,
                    r.direction,
                    COALESCE(r.duration_seconds, TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime)) as duration_seconds,
                    a.call_purpose,
                    a.sentiment,
                    a.quality_score,
                    a.objections_concerns,
                    a.objection_type,
                    a.created_at
                FROM `{bid}_callanalytics` a
                INNER JOIN `{bid}_raw_calls` r ON a.callid = r.callid
                LEFT JOIN call_objections co ON co.bid = %s AND co.callid = a.callid
                LEFT JOIN objection_classifications oc ON oc.bid = %s AND oc.id = co.classification_id
                {where_clause}
                ORDER BY COALESCE(r.call_starttime, a.created_at) DESC
                LIMIT 100
            """

            cursor.execute(query, params)
            return cursor.fetchall()

    def delete_transcript(self, bid, callid):
        """Delete transcript from sarvamresponse table"""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            query = f"DELETE FROM `{bid}_sarvamresponse` WHERE callid = %s"
            cursor.execute(query, (callid,))
            conn.commit()
            return cursor.rowcount

    def reset_transcription_status(self, bid, callid):
        """Reset transcription_status to 0 in raw_calls table"""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            query = f"UPDATE `{bid}_raw_calls` SET transcription_status = 0 WHERE callid = %s"
            cursor.execute(query, (callid,))
            conn.commit()
            return cursor.rowcount

    def update_speaker_segment_text(self, bid, callid, segment_index, new_text):
        """Update the text of a specific speaker segment"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            # Get current speaker_segments
            query = f"SELECT speaker_segments FROM `{bid}_sarvamresponse` WHERE callid = %s"
            cursor.execute(query, (callid,))
            result = cursor.fetchone()

            if not result or not result['speaker_segments']:
                raise ValueError(f"No speaker segments found for call {callid}")

            # Parse segments
            segments = self._parse_json_field(result['speaker_segments'])
            if not segments or not isinstance(segments, list):
                raise ValueError(f"Invalid speaker_segments format for call {callid}")

            # Validate segment index
            if segment_index < 0 or segment_index >= len(segments):
                raise ValueError(f"Invalid segment index {segment_index} (total segments: {len(segments)})")

            # Update the text
            segments[segment_index]['text'] = new_text

            # Save back to database
            update_query = f"UPDATE `{bid}_sarvamresponse` SET speaker_segments = %s, updated_at = NOW() WHERE callid = %s"
            cursor.execute(update_query, (json.dumps(segments), callid))
            conn.commit()

            logger.info(f"Updated segment {segment_index} for call {callid}")
            return True

    def get_agent_names(self, bid, groupname=None):
        """Get list of unique agent names, optionally filtered by groupname"""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            if groupname:
                query = f"""
                    SELECT DISTINCT agentname
                    FROM `{bid}_raw_calls`
                    WHERE agentname IS NOT NULL
                    AND agentname != ''
                    AND groupname = %s
                    ORDER BY agentname
                """
                cursor.execute(query, (groupname,))
            else:
                query = f"""
                    SELECT DISTINCT agentname
                    FROM `{bid}_raw_calls`
                    WHERE agentname IS NOT NULL
                    AND agentname != ''
                    ORDER BY agentname
                """
                cursor.execute(query)

            agents = [row['agentname'] for row in cursor.fetchall()]
            return agents

    def import_raw_calls_from_excel(self, bid, file_storage):
        """Import raw calls from an Excel (.xlsx) file into {bid}_raw_calls."""
        try:
            import openpyxl
        except ImportError as e:
            raise ValueError("openpyxl is required for Excel uploads") from e

        if not file_storage or not getattr(file_storage, 'filename', ''):
            raise ValueError("No file provided for upload")

        filename = file_storage.filename.lower()
        if not filename.endswith('.xlsx'):
            raise ValueError("Only .xlsx files are supported")

        try:
            bid_value = int(bid)
        except (TypeError, ValueError) as e:
            raise ValueError("Business ID must be numeric") from e

        try:
            workbook = openpyxl.load_workbook(file_storage, data_only=True)
        except Exception as e:
            raise ValueError(f"Failed to read Excel file: {e}") from e

        worksheet = workbook.active
        rows_iter = worksheet.iter_rows(values_only=True)
        try:
            headers = next(rows_iter)
        except StopIteration:
            raise ValueError("Excel file is empty")

        if not headers:
            raise ValueError("Excel file header row is missing")

        import re

        def normalize_header(value):
            if value is None:
                return ''
            header = str(value).strip().lower()
            header = header.replace(' ', '_').replace('-', '_')
            header = re.sub(r'[^a-z0-9_]', '', header)
            return header

        allowed_columns = [
            'bid',
            'callid',
            'fileurl',
            'status',
            'agentname',
            'groupname',
            'call_starttime',
            'call_endtime',
            'call_status',
            'agent_callinfo',
            'customer_callinfo',
            'direction',
            'transcription_requested',
            'transcription_status',
            'selected_for_processing'
        ]

        aliases = {
            'call_id': 'callid',
            'file_url': 'fileurl',
            'fileurl': 'fileurl',
            'audio_url': 'fileurl',
            'recording_url': 'fileurl',
            'recordingurl': 'fileurl',
            'filename': 'fileurl',
            'file_name': 'fileurl',
            'agent_name': 'agentname',
            'group_name': 'groupname',
            'starttime': 'call_starttime',
            'start_time': 'call_starttime',
            'call_start_time': 'call_starttime',
            'endtime': 'call_endtime',
            'end_time': 'call_endtime',
            'call_end_time': 'call_endtime',
            'dialstatus': 'call_status',
            'agent_phone': 'agent_callinfo',
            'emp_phone': 'agent_callinfo',
            'clicktocalldid': 'customer_callinfo'
        }

        header_map = {}
        for idx, raw_header in enumerate(headers):
            normalized = normalize_header(raw_header)
            if not normalized:
                continue
            column = aliases.get(normalized, normalized)
            if column in allowed_columns:
                header_map[idx] = column

        if 'callid' not in header_map.values():
            raise ValueError("Excel must include a 'callid' column")

        def normalize_callid(value):
            if value is None:
                return None
            if isinstance(value, float) and value.is_integer():
                return str(int(value))
            return str(value).strip()

        records = []
        skipped = 0
        for row in rows_iter:
            if not row or all(cell is None or str(cell).strip() == '' for cell in row):
                continue

            record = {'bid': bid_value}
            for idx, column in header_map.items():
                if idx >= len(row):
                    continue
                value = row[idx]
                if isinstance(value, str):
                    value = value.strip()
                    if value == '':
                        value = None
                record[column] = value

            callid = normalize_callid(record.get('callid'))
            if not callid:
                skipped += 1
                continue
            record['callid'] = callid
            records.append(record)

        if not records:
            raise ValueError("No valid rows found to import")

        present_columns = set()
        for record in records:
            present_columns.update(record.keys())

        columns = [col for col in allowed_columns if col in present_columns]
        if 'bid' not in columns:
            columns.insert(0, 'bid')
        if 'callid' not in columns:
            columns.insert(1, 'callid')

        values = [[record.get(col) for col in columns] for record in records]

        table_name = f"{bid}_raw_calls"
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SHOW TABLES LIKE %s", (table_name,))
            if not cursor.fetchone():
                raise ValueError(f"Table {table_name} does not exist")

            columns_sql = ", ".join(f"`{col}`" for col in columns)
            placeholders = ", ".join(["%s"] * len(columns))
            update_cols = [col for col in columns if col not in ('callid',)]
            update_sql = ", ".join(
                f"`{col}` = VALUES(`{col}`)" for col in update_cols
            )
            query = (
                f"INSERT INTO `{table_name}` ({columns_sql}) "
                f"VALUES ({placeholders})"
            )
            if update_cols:
                query += f" ON DUPLICATE KEY UPDATE {update_sql}"

            chunk_size = 500
            for i in range(0, len(values), chunk_size):
                cursor.executemany(query, values[i:i + chunk_size])

        return {
            'processed': len(records),
            'skipped': skipped,
            'columns': columns
        }

    # =========================================================================
    # business_pipeline_config
    # =========================================================================

    def ensure_business_pipeline_config_table(self) -> None:
        """Make sure the ``business_pipeline_config`` table is available.

        Runs a single ``CREATE TABLE IF NOT EXISTS`` statement.  ``bid`` is
        declared UNIQUE, so the table holds at most one config row per bid.
        """
        create_stmt = """
                CREATE TABLE IF NOT EXISTS business_pipeline_config (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(20) NOT NULL UNIQUE,
                    pipeline_enabled TINYINT(1) NOT NULL DEFAULT 1,
                    source_db_host VARCHAR(255) NOT NULL DEFAULT '',
                    source_db_port INT NOT NULL DEFAULT 3306,
                    source_db_user VARCHAR(255) NOT NULL DEFAULT '',
                    source_db_password_enc TEXT,
                    source_db_name VARCHAR(255) NOT NULL DEFAULT '',
                    stt_provider VARCHAR(50) NOT NULL DEFAULT 'sarvam',
                    stt_api_key_enc TEXT,
                    min_call_duration_s INT NOT NULL DEFAULT 120,
                    sync_batch INT NOT NULL DEFAULT 500,
                    transcribe_batch INT NOT NULL DEFAULT 3,
                    sync_interval_s INT NOT NULL DEFAULT 120,
                    lead_filter_enabled TINYINT(1) NOT NULL DEFAULT 1,
                    crm_provider VARCHAR(50) DEFAULT 'leadsquared',
                    lookback_days INT NOT NULL DEFAULT 90,
                    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """
        with self.get_connection() as connection:
            connection.cursor().execute(create_stmt)

    def get_pipeline_config(self, bid: str) -> dict | None:
        """Return the pipeline config dict for *bid*, or None if not found."""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT * FROM business_pipeline_config WHERE bid = %s",
                (str(bid),),
            )
            return cursor.fetchone()

    def get_enabled_pipeline_bids(self) -> list:
        """List the ``bid`` of every config row with ``pipeline_enabled = 1``.

        The bids come back in the order produced by ``ORDER BY bid``; an empty
        list is returned when no row is enabled.
        """
        query = "SELECT bid FROM business_pipeline_config WHERE pipeline_enabled = 1 ORDER BY bid"
        with self.get_connection() as connection:
            db_cursor = connection.cursor()
            db_cursor.execute(query)
            enabled = []
            for entry in db_cursor.fetchall() or []:
                enabled.append(entry["bid"])
            return enabled

    def save_pipeline_config(self, bid: str, data: dict) -> None:
        """Upsert a pipeline config row for *bid*.

        *data* may contain any subset of the table columns.  ``id``,
        ``created_at`` and ``updated_at`` are dropped, and ``bid`` is always
        taken from the *bid* argument.  Sensitive fields (``stt_api_key``,
        ``source_db_password``) are encrypted on-the-fly: callers pass the
        plain value under the key without the ``_enc`` suffix and it is
        written to the matching ``*_enc`` column.

        When *data* contributes no updatable column the statement still
        inserts a row for a new *bid* and leaves an existing row untouched.

        Raises:
            ValueError: if a key of *data* is not a plain column identifier
                (ASCII letters, digits and underscores only).
        """
        row = dict(data)
        row["bid"] = str(bid)

        # Encrypt secrets if provided in plain form
        for plain_key in ("stt_api_key", "source_db_password"):
            if plain_key in row:
                row[plain_key + "_enc"] = self._encrypt_text(row.pop(plain_key))

        # Remove read-only fields
        for read_only in ("id", "created_at", "updated_at"):
            row.pop(read_only, None)

        cols = list(row)
        # Column names are interpolated into the statement text, so refuse
        # anything that could break out of the backtick quoting.
        for col in cols:
            if not (
                isinstance(col, str)
                and col.isascii()
                and col.replace("_", "").isalnum()
            ):
                raise ValueError(f"Invalid column name: {col!r}")

        placeholders = ", ".join(["%s"] * len(cols))
        cols_sql = ", ".join(f"`{c}`" for c in cols)
        # An empty assignment list is invalid SQL; fall back to a no-op
        # assignment when only ``bid`` is being written.
        update_sql = ", ".join(
            f"`{c}` = VALUES(`{c}`)" for c in cols if c != "bid"
        ) or "`bid` = `bid`"
        sql = (
            f"INSERT INTO business_pipeline_config ({cols_sql}) VALUES ({placeholders})"
            f" ON DUPLICATE KEY UPDATE {update_sql}"
        )
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, [row[c] for c in cols])

    # =========================================================================
    # business_agent_config
    # =========================================================================

    def ensure_business_agent_config_table(self) -> None:
        """Make sure the ``business_agent_config`` table is available.

        Runs a single ``CREATE TABLE IF NOT EXISTS`` statement.  The table is
        keyed by the unique pair ``(bid, agent_name)``.
        """
        create_stmt = """
                CREATE TABLE IF NOT EXISTS business_agent_config (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(20) NOT NULL,
                    agent_name VARCHAR(100) NOT NULL,
                    agent_enabled TINYINT(1) NOT NULL DEFAULT 1,
                    model_provider VARCHAR(50) NOT NULL DEFAULT 'bedrock',
                    model_id VARCHAR(200) NOT NULL DEFAULT 'amazon.nova-lite-v1:0',
                    system_prompt TEXT,
                    user_prompt_template TEXT,
                    output_schema TEXT,
                    temperature FLOAT NOT NULL DEFAULT 0.1,
                    max_tokens INT NOT NULL DEFAULT 4096,
                    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uq_bid_agent (bid, agent_name)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """
        with self.get_connection() as connection:
            connection.cursor().execute(create_stmt)

    def get_agent_configs(self, bid: str) -> list:
        """List every ``business_agent_config`` row stored for *bid*.

        Rows are returned in ascending ``id`` order regardless of their
        ``agent_enabled`` flag; an empty list is returned when nothing matches.
        """
        query = "SELECT * FROM business_agent_config WHERE bid = %s ORDER BY id"
        with self.get_connection() as connection:
            db_cursor = connection.cursor()
            db_cursor.execute(query, (str(bid),))
            found = db_cursor.fetchall()
        return found if found else []

    def save_agent_config(self, bid: str, data: dict) -> int:
        """Upsert an agent config for *bid*; returns the row id.

        *data* holds the column values to write; ``id``, ``created_at`` and
        ``updated_at`` are dropped and ``bid`` is always taken from the *bid*
        argument.  The row is matched on the unique ``(bid, agent_name)``
        pair, so those two columns are never part of the update list.

        Returns:
            The cursor's ``lastrowid`` after the upsert.  The update clause
            assigns ``id = LAST_INSERT_ID(id)`` so that this is the existing
            row's id when the statement hits a duplicate key, not only when a
            new row is inserted.

        Raises:
            ValueError: if a key of *data* is not a plain column identifier
                (ASCII letters, digits and underscores only).
        """
        row = dict(data)
        row["bid"] = str(bid)
        for read_only in ("id", "created_at", "updated_at"):
            row.pop(read_only, None)

        cols = list(row)
        # Column names are interpolated into the statement text, so refuse
        # anything that could break out of the backtick quoting.
        for col in cols:
            if not (
                isinstance(col, str)
                and col.isascii()
                and col.replace("_", "").isalnum()
            ):
                raise ValueError(f"Invalid column name: {col!r}")

        placeholders = ", ".join(["%s"] * len(cols))
        cols_sql = ", ".join(f"`{c}`" for c in cols)
        # Leading with the LAST_INSERT_ID(id) assignment makes the id of an
        # already-existing row available to the client and keeps the clause
        # non-empty even when no other column is being updated.
        update_parts = ["`id` = LAST_INSERT_ID(`id`)"]
        update_parts.extend(
            f"`{c}` = VALUES(`{c}`)" for c in cols if c not in ("bid", "agent_name")
        )
        update_sql = ", ".join(update_parts)
        sql = (
            f"INSERT INTO business_agent_config ({cols_sql}) VALUES ({placeholders})"
            f" ON DUPLICATE KEY UPDATE {update_sql}"
        )
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, [row[c] for c in cols])
            return cursor.lastrowid

    def delete_agent_config(self, bid: str, agent_name: str) -> bool:
        """Remove the agent config identified by ``(bid, agent_name)``.

        Returns ``True`` when the cursor reports at least one deleted row,
        ``False`` otherwise.
        """
        stmt = "DELETE FROM business_agent_config WHERE bid = %s AND agent_name = %s"
        with self.get_connection() as connection:
            db_cursor = connection.cursor()
            db_cursor.execute(stmt, (str(bid), agent_name))
            deleted = db_cursor.rowcount
        return deleted > 0

    # =========================================================================
    # {bid}_call_records — DDL
    # =========================================================================

    def ensure_call_records_table(self, bid: str) -> None:
        """Make sure the per-business ``{bid}_call_records`` table is available.

        Runs a single ``CREATE TABLE IF NOT EXISTS`` statement.  ``callid`` is
        UNIQUE, ``status`` defaults to ``'pending'``, and secondary indexes
        cover ``status``, ``customer_phone``, ``call_start`` and ``agent_name``.
        """
        records_table = f"{bid}_call_records"
        create_stmt = f"""
                CREATE TABLE IF NOT EXISTS `{records_table}` (
                    id BIGINT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(20) NOT NULL,
                    callid VARCHAR(255) NOT NULL UNIQUE,
                    file_url TEXT,
                    status ENUM(
                        'pending','transcribing','transcribed',
                        'analyzing','done','failed'
                    ) NOT NULL DEFAULT 'pending',
                    fail_stage VARCHAR(50),
                    fail_reason TEXT,
                    agent_name VARCHAR(255),
                    group_name VARCHAR(255),
                    direction VARCHAR(20),
                    call_start DATETIME,
                    call_end DATETIME,
                    call_duration_s INT,
                    call_status VARCHAR(50),
                    agent_phone VARCHAR(100),
                    customer_phone VARCHAR(100),
                    stt_provider VARCHAR(50),
                    transcript MEDIUMTEXT,
                    speaker_segments JSON,
                    duration_s FLOAT,
                    analysis JSON,
                    quality_score FLOAT,
                    summary TEXT,
                    crm_provider VARCHAR(50),
                    crm_lead_id VARCHAR(100),
                    crm_lead_name VARCHAR(255),
                    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    INDEX idx_status (status),
                    INDEX idx_customer_phone (customer_phone),
                    INDEX idx_call_start (call_start),
                    INDEX idx_agent_name (agent_name)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """
        with self.get_connection() as connection:
            connection.cursor().execute(create_stmt)

    # =========================================================================
    # {bid}_call_records — write operations
    # =========================================================================

    def upsert_call_record(self, bid: str, record: dict) -> None:
        """Insert or update a call record in ``{bid}_call_records``.

        ``callid`` is the unique key.  *record* maps column names to values;
        ``id``, ``created_at`` and ``updated_at`` are dropped and ``bid`` is
        filled in from the *bid* argument when the record does not carry one.
        Non-string, non-``None`` values for ``speaker_segments`` and
        ``analysis`` are serialised with ``json.dumps``.

        On a duplicate ``callid``, ``transcript``, ``speaker_segments``,
        ``analysis``, ``quality_score`` and ``summary`` are only overwritten
        when the incoming value is not NULL; every other supplied column
        except ``callid`` and ``bid`` is overwritten unconditionally.  When no
        such column is supplied, an existing row is left untouched.

        Raises:
            ValueError: if a key of *record* is not a plain column identifier
                (ASCII letters, digits and underscores only).
        """
        table = f"{bid}_call_records"
        row = dict(record)
        row.setdefault("bid", str(bid))

        # Serialise JSON columns
        for col in ("speaker_segments", "analysis"):
            if col in row and not isinstance(row[col], str) and row[col] is not None:
                row[col] = json.dumps(row[col])

        for read_only in ("id", "created_at", "updated_at"):
            row.pop(read_only, None)

        cols = list(row)
        # Column names are interpolated into the statement text, so refuse
        # anything that could break out of the backtick quoting.
        for col in cols:
            if not (
                isinstance(col, str)
                and col.isascii()
                and col.replace("_", "").isalnum()
            ):
                raise ValueError(f"Invalid column name: {col!r}")

        placeholders = ", ".join(["%s"] * len(cols))
        cols_sql = ", ".join(f"`{c}`" for c in cols)

        # On duplicate key, don't overwrite transcript/analysis if already set
        keep_if_null = ("transcript", "speaker_segments", "analysis", "quality_score", "summary")
        update_parts = []
        for c in cols:
            if c in ("callid", "bid"):
                continue
            if c in keep_if_null:
                update_parts.append(
                    f"`{c}` = IF(VALUES(`{c}`) IS NOT NULL, VALUES(`{c}`), `{c}`)"
                )
            else:
                update_parts.append(f"`{c}` = VALUES(`{c}`)")
        # An empty assignment list is invalid SQL; use a no-op assignment when
        # the record carries nothing beyond its key columns.
        update_sql = ", ".join(update_parts) or "`callid` = `callid`"

        sql = (
            f"INSERT INTO `{table}` ({cols_sql}) VALUES ({placeholders})"
            f" ON DUPLICATE KEY UPDATE {update_sql}"
        )
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, [row[c] for c in cols])

    def set_call_status(self, bid: str, callid: str, status: str) -> None:
        """Set ``status`` on the ``{bid}_call_records`` row matching *callid*.

        No other column is modified by this statement.
        """
        stmt = f"UPDATE `{bid}_call_records` SET status = %s WHERE callid = %s"
        with self.get_connection() as connection:
            connection.cursor().execute(stmt, (status, callid))

    def fail_call(self, bid: str, callid: str, stage: str, reason: str) -> None:
        """Mark a call as failed, recording the stage and reason.

        Sets ``status = 'failed'``, ``fail_stage`` and ``fail_reason`` on the
        ``{bid}_call_records`` row matching *callid*.

        *reason* is coerced to text and truncated to 1000 characters, so an
        exception object or ``None`` (stored as an empty string) can be passed
        without the failure handler itself raising.
        """
        table = f"{bid}_call_records"
        reason_text = "" if reason is None else str(reason)
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"UPDATE `{table}` SET status = 'failed', fail_stage = %s, fail_reason = %s "
                f"WHERE callid = %s",
                (stage, reason_text[:1000], callid),
            )

    def save_call_transcription(self, bid: str, callid: str, stt_result) -> None:
        """Persist an STT result on the call record and mark it 'transcribed'.

        *stt_result* only needs to expose the ``provider``, ``transcript``,
        ``speaker_segments`` and ``duration`` attributes read below.  Speaker
        segments are stored as JSON text, or NULL when empty/missing.  Any
        earlier failure markers (``fail_stage``, ``fail_reason``) on the row
        are cleared.
        """
        table = f"{bid}_call_records"
        segments = stt_result.speaker_segments
        segs_json = json.dumps(segments) if segments else None
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"""UPDATE `{table}`
                    SET stt_provider = %s,
                        transcript = %s,
                        speaker_segments = %s,
                        duration_s = %s,
                        status = 'transcribed',
                        fail_stage = NULL,
                        fail_reason = NULL
                    WHERE callid = %s""",
                (
                    stt_result.provider,
                    stt_result.transcript,
                    segs_json,
                    stt_result.duration,
                    callid,
                ),
            )

    def save_call_analysis(self, bid: str, callid: str, analysis: dict) -> None:
        """Store the agent analysis on a call record and mark it 'done'.

        The whole *analysis* dict is written as JSON text; its
        ``quality_score`` and ``summary`` entries (``None`` when absent) are
        also copied into their own columns.  ``fail_stage`` and
        ``fail_reason`` are reset to NULL.
        """
        records_table = f"{bid}_call_records"
        values = (
            json.dumps(analysis),
            analysis.get("quality_score"),
            analysis.get("summary"),
            callid,
        )
        with self.get_connection() as connection:
            connection.cursor().execute(
                f"""UPDATE `{records_table}`
                    SET analysis = %s,
                        quality_score = %s,
                        summary = %s,
                        status = 'done',
                        fail_stage = NULL,
                        fail_reason = NULL
                    WHERE callid = %s""",
                values,
            )

    # =========================================================================
    # {bid}_call_records — read operations
    # =========================================================================

    def get_calls_to_transcribe(
        self, bid: str, batch: int = 3, min_duration_s: int = 0
    ) -> list:
        """Fetch the oldest 'pending' calls that have a recording URL.

        At most *batch* rows are returned, ordered by ``call_start``
        ascending.  When *min_duration_s* is truthy, rows whose
        ``call_duration_s`` is below it are excluded.  Returns an empty list
        when nothing qualifies.
        """
        records_table = f"{bid}_call_records"
        if min_duration_s:
            # Coerced to int before being placed into the statement text.
            duration_clause = f"AND call_duration_s >= {int(min_duration_s)}"
        else:
            duration_clause = ""
        with self.get_connection() as connection:
            db_cursor = connection.cursor()
            db_cursor.execute(
                f"""SELECT callid, file_url, call_start, call_duration_s, agent_name, customer_phone
                    FROM `{records_table}`
                    WHERE status = 'pending'
                      AND file_url IS NOT NULL AND file_url != ''
                      {duration_clause}
                    ORDER BY call_start ASC
                    LIMIT %s""",
                (batch,),
            )
            pending = db_cursor.fetchall()
        return pending if pending else []

    def get_calls_to_analyze(self, bid: str, batch: int = 3) -> list:
        """Fetch the oldest 'transcribed' calls that have a non-empty transcript.

        At most *batch* rows are returned, ordered by ``call_start``
        ascending.  Returns an empty list when nothing qualifies.
        """
        records_table = f"{bid}_call_records"
        with self.get_connection() as connection:
            db_cursor = connection.cursor()
            db_cursor.execute(
                f"""SELECT callid, transcript, speaker_segments,
                           agent_name, customer_phone, call_start, call_duration_s
                    FROM `{records_table}`
                    WHERE status = 'transcribed'
                      AND transcript IS NOT NULL AND transcript != ''
                    ORDER BY call_start ASC
                    LIMIT %s""",
                (batch,),
            )
            ready = db_cursor.fetchall()
        return ready if ready else []

    def get_call_records_list(
        self,
        bid: str,
        page: int = 1,
        page_size: int = 20,
        status: str | None = None,
        agent_name: str | None = None,
        search: str | None = None,
        date_from: str | None = None,
        date_to: str | None = None,
    ) -> dict:
        """Return paginated call records for the API."""
        table = f"{bid}_call_records"
        conditions = ["1=1"]
        params: list = []

        if status:
            conditions.append("status = %s")
            params.append(status)
        if agent_name:
            conditions.append("agent_name = %s")
            params.append(agent_name)
        if search:
            conditions.append("(customer_phone LIKE %s OR callid LIKE %s OR crm_lead_name LIKE %s)")
            like = f"%{search}%"
            params += [like, like, like]
        if date_from:
            conditions.append("call_start >= %s")
            params.append(date_from)
        if date_to:
            conditions.append("call_start <= %s")
            params.append(date_to)

        where = " AND ".join(conditions)
        offset = (max(page, 1) - 1) * page_size

        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(f"SELECT COUNT(*) AS total FROM `{table}` WHERE {where}", params)
            total = (cursor.fetchone() or {}).get("total", 0)

            cursor.execute(
                f"""SELECT id, callid, bid, status, fail_stage,
                           agent_name, group_name, direction,
                           call_start, call_end, call_duration_s, call_status,
                           agent_phone, customer_phone,
                           stt_provider, duration_s,
                           quality_score, summary,
                           crm_provider, crm_lead_id, crm_lead_name,
                           created_at, updated_at
                    FROM `{table}`
                    WHERE {where}
                    ORDER BY call_start DESC
                    LIMIT %s OFFSET %s""",
                params + [page_size, offset],
            )
            rows = cursor.fetchall() or []

        # Serialise datetimes
        for row in rows:
            for col in ("call_start", "call_end", "created_at", "updated_at"):
                if isinstance(row.get(col), datetime):
                    row[col] = row[col].isoformat()

        return {
            "total": total,
            "page": page,
            "page_size": page_size,
            "pages": max(1, -(-total // page_size)),
            "records": rows,
        }

    def get_call_record_detail(self, bid: str, callid: str) -> dict | None:
        """Return a single call record with full transcript and analysis."""
        table = f"{bid}_call_records"
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"SELECT * FROM `{table}` WHERE callid = %s",
                (callid,),
            )
            row = cursor.fetchone()
        if not row:
            return None

        # Deserialise JSON columns
        for col in ("speaker_segments", "analysis"):
            val = row.get(col)
            if isinstance(val, str):
                try:
                    row[col] = json.loads(val)
                except Exception:
                    pass

        # Serialise datetimes
        for col in ("call_start", "call_end", "created_at", "updated_at"):
            if isinstance(row.get(col), datetime):
                row[col] = row[col].isoformat()

        return row

    # ── Telephony Integration Config ──────────────────────────────────────────

    def ensure_business_telephony_config_table(self):
        """Make sure the ``business_telephony_config`` table is available.

        Runs a single ``CREATE TABLE IF NOT EXISTS`` statement and commits.
        Rows are unique per ``(bid, provider)``.
        """
        create_stmt = """
                CREATE TABLE IF NOT EXISTS business_telephony_config (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    bid VARCHAR(20) NOT NULL,
                    provider VARCHAR(50) NOT NULL,
                    source_bid VARCHAR(20),
                    config_json TEXT,
                    is_active TINYINT(1) NOT NULL DEFAULT 1,
                    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY uq_bid_provider (bid, provider)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """
        with self.get_connection() as connection:
            connection.cursor().execute(create_stmt)
            connection.commit()

    def get_telephony_integrations(self, bid):
        """List the telephony integrations configured for *bid*.

        Ensures the config table exists first.  Each returned dict carries the
        selected columns plus a ``config`` entry holding the decoded
        ``config_json`` (``{}`` when it is empty or cannot be decoded);
        ``created_at`` is converted to an ISO-8601 string when possible.
        """
        self.ensure_business_telephony_config_table()
        with self.get_connection() as connection:
            db_cursor = connection.cursor()
            db_cursor.execute(
                "SELECT provider, source_bid, config_json, is_active, created_at "
                "FROM business_telephony_config WHERE bid = %s",
                (str(bid),)
            )
            integrations = []
            for fetched in db_cursor.fetchall() or []:
                entry = dict(fetched)
                created = entry.get("created_at")
                if created and hasattr(created, "isoformat"):
                    entry["created_at"] = created.isoformat()
                try:
                    parsed = json.loads(entry.get("config_json") or "{}")
                except Exception:
                    # Malformed stored JSON degrades to an empty config.
                    parsed = {}
                entry["config"] = parsed
                integrations.append(entry)
            return integrations

    def save_telephony_integration(self, bid, provider, source_bid=None, config=None):
        """Create or refresh the telephony integration for ``(bid, provider)``.

        Ensures the config table exists first.  *config* is stored as JSON
        text (``{}`` when omitted).  An existing row gets its ``source_bid``
        and ``config_json`` replaced and is re-activated (``is_active = 1``).
        """
        self.ensure_business_telephony_config_table()
        serialized = json.dumps(config or {})
        # source_bid and the JSON payload appear twice: once for the INSERT
        # values and once for the ON DUPLICATE KEY UPDATE assignments.
        bound = (str(bid), provider, source_bid, serialized, source_bid, serialized)
        with self.get_connection() as connection:
            connection.cursor().execute("""
                INSERT INTO business_telephony_config (bid, provider, source_bid, config_json, is_active)
                VALUES (%s, %s, %s, %s, 1)
                ON DUPLICATE KEY UPDATE source_bid=%s, config_json=%s, is_active=1,
                    updated_at=CURRENT_TIMESTAMP
            """, bound)
            connection.commit()

    def delete_telephony_integration(self, bid, provider):
        """Remove the telephony integration stored for ``(bid, provider)``.

        Ensures the config table exists first, then deletes and commits.
        """
        self.ensure_business_telephony_config_table()
        stmt = "DELETE FROM business_telephony_config WHERE bid=%s AND provider=%s"
        with self.get_connection() as connection:
            connection.cursor().execute(stmt, (str(bid), provider))
            connection.commit()

    def get_raw_calls_preview(self, bid, date_from=None, date_to=None, limit=200):
        """Preview raw_calls for Mcube Classic (local table).

        Reads up to *limit* rows from ``{bid}_raw_calls``, newest
        ``call_starttime`` first.  The date filter compares
        ``DATE(call_starttime)`` and is inclusive; either bound may be given
        on its own (a lone *date_from* or *date_to* is applied as an
        open-ended range rather than being ignored).

        Returns:
            A list of row dicts in which every value exposing ``isoformat``
            has been replaced by its ISO-8601 string.
        """
        bid = str(bid)
        table = f"{bid}_raw_calls"
        with self.get_connection() as conn:
            cursor = conn.cursor()
            query = f"SELECT * FROM `{table}` WHERE 1=1"
            params = []
            if date_from and date_to:
                query += " AND DATE(call_starttime) BETWEEN %s AND %s"
                params.extend([date_from, date_to])
            elif date_from:
                query += " AND DATE(call_starttime) >= %s"
                params.append(date_from)
            elif date_to:
                query += " AND DATE(call_starttime) <= %s"
                params.append(date_to)
            query += " ORDER BY call_starttime DESC LIMIT %s"
            params.append(int(limit))
            cursor.execute(query, params)
            rows = cursor.fetchall() or []
            result = []
            for r in rows:
                # Copy the row, converting date/time-like values to text.
                result.append({
                    key: (value.isoformat() if hasattr(value, "isoformat") else value)
                    for key, value in dict(r).items()
                })
            return result

    def insert_raw_calls_from_telephony(self, bid, records):
        """Upsert telephony call records into ``{bid}_raw_calls``.

        Each record is a mapping.  Records with a blank ``callid`` are
        skipped; a record whose statement raises is logged and counted as
        skipped without aborting the batch.  Several fields accept an
        alternate key (``filename``/``fileurl``, ``starttime``/
        ``call_starttime``, ``endtime``/``call_endtime``, ``dialstatus``/
        ``call_status``); ``emp_phone`` feeds ``agent_callinfo``.

        Returns:
            ``{"inserted": n, "skipped": m}`` where ``inserted`` counts every
            statement that executed without error (new or updated rows).
        """
        bid = str(bid)
        upsert_sql = f"""
                        INSERT INTO `{bid}_raw_calls`
                            (callid, bid, fileurl, status, agentname, groupname,
                             call_starttime, call_endtime, call_status,
                             customer_callinfo, direction, agent_callinfo)
                        VALUES (%s, %s, %s, 0, %s, %s, %s, %s, %s, %s, %s, %s)
                        ON DUPLICATE KEY UPDATE
                            fileurl          = VALUES(fileurl),
                            agentname        = VALUES(agentname),
                            groupname        = VALUES(groupname),
                            call_starttime   = VALUES(call_starttime),
                            call_endtime     = VALUES(call_endtime),
                            call_status      = VALUES(call_status),
                            customer_callinfo= VALUES(customer_callinfo),
                            direction        = VALUES(direction),
                            agent_callinfo   = VALUES(agent_callinfo)
                    """
        counts = {"inserted": 0, "skipped": 0}
        with self.get_connection() as connection:
            db_cursor = connection.cursor()
            for rec in records:
                callid = str(rec.get("callid") or "").strip()
                if callid == "":
                    counts["skipped"] += 1
                    continue
                try:
                    db_cursor.execute(upsert_sql, (
                        callid,
                        bid,
                        rec.get("filename") or rec.get("fileurl") or "",
                        rec.get("agentname") or "",
                        rec.get("groupname") or "",
                        rec.get("starttime") or rec.get("call_starttime") or None,
                        rec.get("endtime") or rec.get("call_endtime") or None,
                        rec.get("dialstatus") or rec.get("call_status") or "",
                        rec.get("customer_callinfo") or "",
                        rec.get("direction") or "",
                        rec.get("emp_phone") or "",
                    ))
                except Exception as e:
                    # Best effort: one bad record must not stop the batch.
                    logger.warning(f"Skipped record {callid}: {e}")
                    counts["skipped"] += 1
                else:
                    counts["inserted"] += 1
            connection.commit()
        return counts

    # ========================================================================
    # LEAD INSIGHTS (stored in {bid}_bant with synthetic callid = LEAD_{phone})
    # ========================================================================

    def _ensure_bant_lead_columns(self, bid):
        """Add lead insights columns to {bid}_bant if they don't exist yet."""
        self.ensure_bant_table(bid)
        with self.get_connection() as conn:
            cursor = conn.cursor()
            for col, definition in [
                ("lead_phone", "VARCHAR(50)"),
                ("lead_insights_json", "LONGTEXT"),
                ("lead_insights_at", "DATETIME"),
            ]:
                if not self._column_exists(cursor, f"{bid}_bant", col):
                    cursor.execute(
                        f"ALTER TABLE `{bid}_bant` ADD COLUMN `{col}` {definition}"
                    )
            conn.commit()

    def _lead_insights_callid(self, lead_phone: str) -> str:
        """Synthetic callid used to store lead-level insights in bant table."""
        digits = "".join(ch for ch in str(lead_phone) if ch.isdigit())
        return f"LEAD_{digits}"

    def get_lead_insights(self, bid: str, lead_phone: str):
        """Look up the cached insights stored for a lead.

        Returns ``None`` when the bant table/columns cannot be prepared, when
        no row exists for the lead's synthetic callid, or when the stored JSON
        is empty.  Otherwise returns ``{"insights": ..., "generated_at": ...}``
        with the payload run through ``_parse_json_field`` and the timestamp
        rendered via ``str()`` (empty string when unset).
        """
        try:
            self._ensure_bant_lead_columns(bid)
        except Exception:
            # Best effort: treat an unprepared table as "nothing cached".
            return None

        lookup_sql = (
            f"SELECT lead_insights_json, lead_insights_at FROM `{bid}_bant` WHERE callid = %s LIMIT 1"
        )
        lookup_key = self._lead_insights_callid(lead_phone)
        with self.get_connection() as connection:
            db_cursor = connection.cursor()
            db_cursor.execute(lookup_sql, (lookup_key,))
            cached = db_cursor.fetchone()

        if not cached:
            return None
        payload = cached.get("lead_insights_json")
        if not payload:
            return None
        return {
            "insights": self._parse_json_field(payload),
            "generated_at": str(cached.get("lead_insights_at") or ""),
        }

    def save_lead_insights(self, bid: str, lead_phone: str, insights: dict):
        """Store lead-level insights in ``{bid}_bant`` under a synthetic callid.

        Prepares the lead columns first, then upserts the row keyed by the
        lead's synthetic callid with *insights* serialised as JSON
        (non-ASCII characters kept as-is) and the timestamp set to ``NOW()``.
        """
        self._ensure_bant_lead_columns(bid)
        bound = (
            self._lead_insights_callid(lead_phone),
            bid,
            lead_phone,
            json.dumps(insights, ensure_ascii=False),
        )
        with self.get_connection() as connection:
            connection.cursor().execute(
                f"""
                INSERT INTO `{bid}_bant`
                    (callid, bid, lead_phone, lead_insights_json, lead_insights_at)
                VALUES (%s, %s, %s, %s, NOW())
                ON DUPLICATE KEY UPDATE
                    lead_phone = VALUES(lead_phone),
                    lead_insights_json = VALUES(lead_insights_json),
                    lead_insights_at = NOW()
                """,
                bound,
            )
            connection.commit()

    def get_lead_calls_for_insights(self, bid: str, lead_phone: str, min_duration_seconds: int = 20):
        """Return a lead's transcribed calls lasting at least ``min_duration_seconds``.

        Rows are read from ``{bid}_raw_calls`` joined to ``{bid}_sarvamresponse``
        (transcripts). ``{bid}_callanalytics`` and ``{bid}_bant`` are joined only
        when they exist; the fields they would supply come back empty/``None``
        otherwise.

        Args:
            bid: Identifier used as the prefix of the per-business table names.
            lead_phone: Value compared for equality against the lead number
                derived from each call row (no normalisation is applied here).
            min_duration_seconds: Inclusive lower bound on the call length,
                measured from ``call_starttime`` to ``call_endtime``.

        Returns:
            A list of dicts ordered by ``call_starttime`` ascending with the keys
            ``callid``, ``call_starttime``, ``agentname``, ``direction``,
            ``duration_seconds``, ``transcript``, ``speaker_segments``,
            ``summary``, ``rich_summary``, ``bant_profile``,
            ``objections_concerns`` and ``call_purpose``. The list is empty when
            the raw-calls table or the transcript table does not exist.
        """
        raw_table = f"{bid}_raw_calls"
        sarvam_table = f"{bid}_sarvamresponse"
        analytics_table = f"{bid}_callanalytics"
        bant_table = f"{bid}_bant"

        with self.get_connection() as conn:
            cursor = conn.cursor()

            if not self._table_exists(cursor, raw_table):
                return []
            # Only calls with a non-empty transcript are returned and transcripts
            # live in the sarvam table, so without it no row can ever qualify.
            if not self._table_exists(cursor, sarvam_table):
                return []

            has_analytics = self._table_exists(cursor, analytics_table)
            has_bant = self._table_exists(cursor, bant_table)
            has_rich_summary = has_analytics and self._column_exists(cursor, analytics_table, "rich_summary")

            # Columns from optional tables degrade to NULL when the table
            # (or the rich_summary column) is not there.
            summary_expr = "a.summary" if has_analytics else "NULL"
            rich_summary_expr = "a.rich_summary" if has_rich_summary else "NULL"
            bant_expr = "b.profile_json" if has_bant else "NULL"
            objections_expr = "a.objections_concerns" if has_analytics else "NULL"
            purpose_expr = "a.call_purpose" if has_analytics else "NULL"

            join_analytics = f"LEFT JOIN `{analytics_table}` a ON r.callid = a.callid" if has_analytics else ""
            # Both sides are converted to utf8mb4 before comparing callids.
            join_bant = f"LEFT JOIN `{bant_table}` b ON CONVERT(r.callid USING utf8mb4) = CONVERT(b.callid USING utf8mb4)" if has_bant else ""

            # Lead number per row: the first 10 characters of callid for inbound
            # calls, customer_callinfo for every other direction.
            phone_expr = (
                "CASE WHEN LOWER(r.direction) = 'inbound' THEN "
                "CASE WHEN r.agent_callinfo = LEFT(r.callid, 10) THEN r.agent_callinfo "
                "ELSE LEFT(r.callid, 10) END "
                "ELSE r.customer_callinfo END"
            )
            # NOTE(review): the duration filter uses the start/end timestamps only,
            # while the selected duration prefers r.duration_seconds - confirm this
            # asymmetry is intended.
            cursor.execute(
                f"""
                SELECT
                    r.callid,
                    r.call_starttime,
                    r.agentname,
                    r.direction,
                    COALESCE(r.duration_seconds, TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime)) AS duration_seconds,
                    s.transcript AS transcript,
                    s.speaker_segments AS speaker_segments,
                    {summary_expr} AS summary,
                    {rich_summary_expr} AS rich_summary,
                    {bant_expr} AS bant_profile_json,
                    {objections_expr} AS objections_concerns,
                    {purpose_expr} AS call_purpose
                FROM `{raw_table}` r
                LEFT JOIN `{sarvam_table}` s ON r.callid = s.callid
                {join_analytics}
                {join_bant}
                WHERE {phone_expr} = %s
                  AND TIMESTAMPDIFF(SECOND, r.call_starttime, r.call_endtime) >= %s
                  AND COALESCE(TRIM(s.transcript), '') != ''
                ORDER BY r.call_starttime ASC
                """,
                (lead_phone, min_duration_seconds),
            )
            rows = cursor.fetchall()

        return [
            {
                "callid": row["callid"],
                "call_starttime": str(row.get("call_starttime") or ""),
                "agentname": row.get("agentname") or "",
                "direction": row.get("direction") or "",
                "duration_seconds": int(row.get("duration_seconds") or 0),
                "transcript": row.get("transcript") or "",
                "speaker_segments": self._parse_json_field(row.get("speaker_segments")) or [],
                "summary": row.get("summary") or "",
                "rich_summary": self._parse_json_field(row.get("rich_summary")),
                "bant_profile": self._parse_json_field(row.get("bant_profile_json")),
                "objections_concerns": row.get("objections_concerns") or "",
                "call_purpose": row.get("call_purpose") or "",
            }
            for row in rows
        ]

    # ========================================================================
    # RICH SUMMARY (new column in {bid}_callanalytics, never overwrites summary)
    # ========================================================================

    def ensure_rich_summary_column(self, bid: str):
        """Create the ``rich_summary`` LONGTEXT column on ``{bid}_callanalytics`` when absent.

        Nothing happens if the analytics table itself does not exist or the
        column is already present.
        """
        table = f"{bid}_callanalytics"
        with self.get_connection() as db:
            cur = db.cursor()
            # Guard clauses: only alter an existing table that lacks the column.
            if not self._table_exists(cur, table):
                return
            if self._column_exists(cur, table, "rich_summary"):
                return
            alter_sql = f"ALTER TABLE `{table}` ADD COLUMN `rich_summary` LONGTEXT"
            cur.execute(alter_sql)
            db.commit()

    def save_rich_summary(self, bid: str, callid: str, rich_summary: dict):
        """Store ``rich_summary`` as JSON on the call's ``{bid}_callanalytics`` row.

        The column is created first if needed. Only ``rich_summary`` is written;
        the statement is an UPDATE keyed on ``callid``, so no row is inserted.
        """
        self.ensure_rich_summary_column(bid)
        table = f"{bid}_callanalytics"
        # ensure_ascii=False keeps non-ASCII text readable in the stored JSON.
        payload = json.dumps(rich_summary, ensure_ascii=False)
        with self.get_connection() as db:
            db.cursor().execute(
                f"""
                UPDATE `{table}`
                SET rich_summary = %s
                WHERE callid = %s
                """,
                (payload, callid),
            )
            db.commit()

    def get_rich_summary(self, bid: str, callid: str):
        """Fetch the parsed ``rich_summary`` JSON for a call, or ``None``.

        This is a best-effort read: ``None`` is returned when the
        ``rich_summary`` column is missing, when no row matches ``callid``, and
        when any error occurs while querying or parsing. Errors are logged so
        that database failures are not mistaken for "no summary stored".

        Args:
            bid: Identifier used as the prefix of the analytics table name.
            callid: Call whose summary should be loaded.

        Returns:
            Whatever ``_parse_json_field`` produces for the stored value, or
            ``None`` as described above.
        """
        analytics_table = f"{bid}_callanalytics"
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                if not self._column_exists(cursor, analytics_table, "rich_summary"):
                    return None
                cursor.execute(
                    f"SELECT rich_summary FROM `{analytics_table}` WHERE callid = %s LIMIT 1",
                    (callid,),
                )
                row = cursor.fetchone()
            if not row:
                return None
            return self._parse_json_field(row.get("rich_summary"))
        except Exception:
            # Deliberately non-fatal for callers, but leave a trace for operators.
            logger.exception(
                "Failed to load rich_summary for callid=%s from %s", callid, analytics_table
            )
            return None

    # ─── Lead Notes ────────────────────────────────────────────────────────────

    def _ensure_lead_notes_table(self, cursor):
        """Create the shared ``lead_notes`` table through ``cursor`` if it is missing."""
        # One table for all businesses; rows are looked up via (bid, lead_phone).
        create_notes_sql = """
            CREATE TABLE IF NOT EXISTS `lead_notes` (
                id INT AUTO_INCREMENT PRIMARY KEY,
                bid VARCHAR(50) NOT NULL,
                lead_phone VARCHAR(50) NOT NULL,
                content TEXT NOT NULL,
                created_by VARCHAR(255),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                INDEX idx_lead_notes (bid, lead_phone)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        """
        cursor.execute(create_notes_sql)

    def get_lead_notes(self, bid, lead_phone):
        """Return the notes stored for a lead, newest first.

        The ``lead_notes`` table is created on demand. Each note is a dict with
        ``id``, ``content``, ``created_by``, ``created_at`` and ``updated_at``;
        the two timestamps are ISO-8601 strings, or ``None`` when unset.
        """
        def _iso_or_none(stamp):
            return stamp.isoformat() if stamp else None

        with self.get_connection() as db:
            cur = db.cursor()
            self._ensure_lead_notes_table(cur)
            cur.execute(
                "SELECT id, content, created_by, created_at, updated_at "
                "FROM `lead_notes` WHERE bid=%s AND lead_phone=%s ORDER BY created_at DESC",
                (bid, lead_phone)
            )
            fetched = cur.fetchall()

        return [
            {
                'id': note['id'],
                'content': note['content'],
                'created_by': note['created_by'],
                'created_at': _iso_or_none(note['created_at']),
                'updated_at': _iso_or_none(note['updated_at']),
            }
            for note in fetched
        ]

    def add_lead_note(self, bid, lead_phone, content, created_by=None):
        """Insert a note for a lead and return the new row's id.

        The ``lead_notes`` table is created on demand before the insert.
        """
        insert_sql = "INSERT INTO `lead_notes` (bid, lead_phone, content, created_by) VALUES (%s, %s, %s, %s)"
        values = (bid, lead_phone, content, created_by)
        with self.get_connection() as db:
            cur = db.cursor()
            self._ensure_lead_notes_table(cur)
            cur.execute(insert_sql, values)
            db.commit()
            new_id = cur.lastrowid
        return new_id

    def delete_lead_note(self, note_id, bid):
        """Delete one note belonging to ``bid``.

        The ``bid`` condition prevents deleting a note owned by another
        business. The table is ensured first, like the other note methods, so a
        database where no note was ever stored yields ``False`` instead of a
        missing-table error.

        Returns:
            ``True`` if a row was deleted, ``False`` if nothing matched.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            self._ensure_lead_notes_table(cursor)
            cursor.execute("DELETE FROM `lead_notes` WHERE id=%s AND bid=%s", (note_id, bid))
            conn.commit()
            return cursor.rowcount > 0

    # ─── Lead Tasks ────────────────────────────────────────────────────────────

    def _ensure_lead_tasks_table(self, cursor):
        """Create the shared ``lead_tasks`` table through ``cursor`` if it is missing."""
        # One table for all businesses; rows are looked up via (bid, lead_phone).
        create_tasks_sql = """
            CREATE TABLE IF NOT EXISTS `lead_tasks` (
                id INT AUTO_INCREMENT PRIMARY KEY,
                bid VARCHAR(50) NOT NULL,
                lead_phone VARCHAR(50) NOT NULL,
                title VARCHAR(500) NOT NULL,
                due_date DATE,
                done TINYINT(1) DEFAULT 0,
                created_by VARCHAR(255),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                INDEX idx_lead_tasks (bid, lead_phone)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        """
        cursor.execute(create_tasks_sql)

    def get_lead_tasks(self, bid, lead_phone):
        """Return the tasks stored for a lead, newest first.

        The ``lead_tasks`` table is created on demand. Each task is a dict with
        ``id``, ``title``, ``due_date``, ``done`` (bool), ``created_by`` and
        ``created_at``; dates are ISO-8601 strings, or ``None`` when unset.
        """
        def _iso_or_none(value):
            return value.isoformat() if value else None

        with self.get_connection() as db:
            cur = db.cursor()
            self._ensure_lead_tasks_table(cur)
            cur.execute(
                "SELECT id, title, due_date, done, created_by, created_at "
                "FROM `lead_tasks` WHERE bid=%s AND lead_phone=%s ORDER BY created_at DESC",
                (bid, lead_phone)
            )
            fetched = cur.fetchall()

        return [
            {
                'id': task['id'],
                'title': task['title'],
                'due_date': _iso_or_none(task['due_date']),
                'done': bool(task['done']),
                'created_by': task['created_by'],
                'created_at': _iso_or_none(task['created_at']),
            }
            for task in fetched
        ]

    def add_lead_task(self, bid, lead_phone, title, due_date=None, created_by=None):
        """Insert a task for a lead and return the new row's id.

        The ``lead_tasks`` table is created on demand. A falsy ``due_date``
        (for example an empty string) is stored as NULL.
        """
        insert_sql = "INSERT INTO `lead_tasks` (bid, lead_phone, title, due_date, created_by) VALUES (%s, %s, %s, %s, %s)"
        due = due_date if due_date else None
        with self.get_connection() as db:
            cur = db.cursor()
            self._ensure_lead_tasks_table(cur)
            cur.execute(insert_sql, (bid, lead_phone, title, due, created_by))
            db.commit()
            new_id = cur.lastrowid
        return new_id

    def update_lead_task(self, task_id, bid, done):
        """Set the ``done`` flag of a task belonging to ``bid``.

        Args:
            task_id: Primary key of the task.
            bid: Owner of the task; the update only applies when it matches.
            done: Truthy to mark the task done, falsy to reopen it.

        Returns:
            ``True`` when the task exists for ``bid`` (whether or not the flag
            actually changed), ``False`` when no such task exists.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            # Same on-demand table creation as the other task methods, so a
            # fresh database answers False rather than a missing-table error.
            self._ensure_lead_tasks_table(cursor)
            cursor.execute(
                "UPDATE `lead_tasks` SET done=%s WHERE id=%s AND bid=%s",
                (1 if done else 0, task_id, bid)
            )
            changed = cursor.rowcount > 0
            conn.commit()
            if changed:
                return True
            # MySQL reports rows *changed*, not rows matched: writing the value
            # a task already has gives rowcount 0. Distinguish that no-op from
            # a task that does not exist.
            cursor.execute(
                "SELECT 1 FROM `lead_tasks` WHERE id=%s AND bid=%s LIMIT 1",
                (task_id, bid)
            )
            return cursor.fetchone() is not None

    def delete_lead_task(self, task_id, bid):
        """Delete one task belonging to ``bid``.

        The ``bid`` condition prevents deleting a task owned by another
        business. The table is ensured first, like the other task methods, so a
        database where no task was ever stored yields ``False`` instead of a
        missing-table error.

        Returns:
            ``True`` if a row was deleted, ``False`` if nothing matched.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            self._ensure_lead_tasks_table(cursor)
            cursor.execute("DELETE FROM `lead_tasks` WHERE id=%s AND bid=%s", (task_id, bid))
            conn.commit()
            return cursor.rowcount > 0

    # ========================================================================
    # DATA CAPTURE FIELD DEFINITIONS (Settings → Lead Insights → Data Capture)
    # ========================================================================

    def ensure_data_capture_fields_table(self):
        """Create the global ``data_capture_fields`` table if it is missing.

        A single table holds the custom lead-insights data-capture field
        definitions of every business; rows are keyed by ``bid`` and
        ``field_key`` is unique within a ``bid``.
        """
        with self.get_connection() as db:
            ddl_cursor = db.cursor()
            ddl_cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS data_capture_fields (
                  id INT AUTO_INCREMENT PRIMARY KEY,
                  bid VARCHAR(64) NOT NULL,
                  field_key VARCHAR(128) NOT NULL,
                  display_name VARCHAR(512) NOT NULL,
                  field_type VARCHAR(32) NOT NULL DEFAULT 'text',
                  required_flag TINYINT(1) NOT NULL DEFAULT 0,
                  sort_order INT NOT NULL DEFAULT 0,
                  created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                  UNIQUE KEY uk_bid_field (bid, field_key),
                  KEY idx_bid_sort (bid, sort_order)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """
            )
            db.commit()

    def get_data_capture_fields(self, bid):
        """Return a business's data-capture field definitions in display order.

        Used by lead insights and the settings UI. The table is created on
        demand and ``bid`` is compared as a string. Each definition is a dict
        with ``id``, ``bid``, ``field_key``, ``display_name``, ``field_type``,
        ``required`` (bool) and ``sort_order``, sorted by ``sort_order`` and
        then ``id``.
        """
        self.ensure_data_capture_fields_table()
        bid = str(bid)
        with self.get_connection() as db:
            cur = db.cursor()
            cur.execute(
                """
                SELECT id, bid, field_key, display_name, field_type, required_flag, sort_order
                FROM data_capture_fields
                WHERE bid = %s
                ORDER BY sort_order ASC, id ASC
                """,
                (bid,),
            )
            fetched = cur.fetchall() or []

        return [
            {
                "id": rec["id"],
                "bid": rec["bid"],
                "field_key": rec["field_key"],
                "display_name": rec["display_name"],
                "field_type": rec["field_type"],
                # Stored as TINYINT(1); exposed to callers as a real boolean.
                "required": bool(rec["required_flag"]),
                "sort_order": rec["sort_order"],
            }
            for rec in fetched
        ]

    def replace_data_capture_fields(self, bid, fields):
        """
        Replace all field definitions for a business.
        Each item: name (or display_name), type (or field_type), required (bool).

        Items that are not dicts or have a blank name are skipped. ``field_key``
        is a slug of the name (lowercase, non-alphanumerics collapsed to ``_``,
        capped at 100 characters) and is made unique within the call by adding
        ``_2``, ``_3``, ... Unknown types fall back to ``text``. ``sort_order``
        is the item's position in ``fields``.

        All items are normalised before the database is touched, and the
        delete + inserts run in one transaction, so bad input or a failing
        insert leaves the existing definitions in place.

        Raises:
            Exception: whatever the database layer raises; the transaction is
                rolled back before the error propagates.
        """
        import re

        allowed_types = ("text", "phone", "email", "number", "textarea", "date")

        self.ensure_data_capture_fields_table()
        bid = str(bid)

        # Phase 1: pure-Python validation. Anything that can go wrong with the
        # payload goes wrong here, before the old rows are deleted.
        prepared = []
        seen_keys = set()
        for idx, row in enumerate(fields or []):
            if not isinstance(row, dict):
                continue
            # str() so non-string values (e.g. a numeric name) cannot raise.
            display = str(row.get("name") or row.get("display_name") or "").strip()
            if not display:
                continue
            base = re.sub(r"[^a-z0-9]+", "_", display.lower()).strip("_") or f"field_{idx}"
            # Leaves room for a "_<n>" suffix within the VARCHAR(128) column.
            base = base[:100]
            key = base
            n = 2
            while key in seen_keys:
                key = f"{base}_{n}"
                n += 1
            seen_keys.add(key)
            ftype = str(row.get("type") or row.get("field_type") or "text").strip().lower()
            if ftype not in allowed_types:
                ftype = "text"
            required_flag = 1 if row.get("required") else 0
            prepared.append((bid, key, display, ftype, required_flag, idx))

        # Phase 2: swap the definitions atomically.
        with self.get_connection() as conn:
            cursor = conn.cursor()
            # The connection config enables autocommit, so an explicit
            # transaction is needed for the DELETE to be undoable.
            cursor.execute("START TRANSACTION")
            try:
                cursor.execute("DELETE FROM data_capture_fields WHERE bid = %s", (bid,))
                for params in prepared:
                    cursor.execute(
                        """
                        INSERT INTO data_capture_fields
                            (bid, field_key, display_name, field_type, required_flag, sort_order)
                        VALUES (%s, %s, %s, %s, %s, %s)
                        """,
                        params,
                    )
                conn.commit()
            except Exception:
                conn.rollback()
                raise
