
import json
import logging
import os
import pika
import pymysql
import subprocess
import sys
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from urllib.parse import unquote, urlparse

from dotenv import load_dotenv
from pymysql.cursors import DictCursor

from analyze_calls_with_parameters import CallAnalyzer
from config import Config
from db_handler import DatabaseHandler

load_dotenv()


def _sanitize_bid_for_log_filename(bid: str) -> str:
    s = "".join(c for c in str(bid).strip() if c.isalnum() or c in "_-")
    return s or "unknown"


def _setup_per_bid_orchestration_logging(bid: str) -> tuple:
    """Separate orchestration vs analytics logs per business under dashboard-backend/.

    Creates (or appends to) two files next to this module:
    ``orchestration_<bid>.log`` and ``analytics_updates_<bid>.log``. Here
    ``<bid>`` is the value returned by ``_sanitize_bid_for_log_filename``.

    * The module logger and the ``leadsquared_activity_push`` logger share one
      file handler (orchestration file) and one stderr stream handler.
    * The ``analytics_updates`` logger writes only to the analytics file.

    All three loggers are set to INFO and stop propagating to ancestor
    loggers. Handlers already attached to them are closed before being
    replaced. Handlers from a previous call in the same process are
    included, so repeated calls do not leak open log files.

    Args:
        bid: Business identifier used to name the log files.

    Returns:
        ``(orchestration_log_path, analytics_log_path)`` as absolute paths.
    """
    bid_safe = _sanitize_bid_for_log_filename(bid)
    base_dir = os.path.dirname(os.path.abspath(__file__))
    orch_path = os.path.join(base_dir, f"orchestration_{bid_safe}.log")
    analytics_path = os.path.join(base_dir, f"analytics_updates_{bid_safe}.log")

    orch_mod = logging.getLogger(__name__)
    alog = logging.getLogger("analytics_updates")
    lsq_log = logging.getLogger("leadsquared_activity_push")

    # handlers.clear() only detaches handlers; a FileHandler from an earlier call
    # would keep its file open. Collect each distinct handler once (the
    # orchestration handlers are shared by two loggers) and close it.
    stale = []
    for lg in (orch_mod, alog, lsq_log):
        for handler in lg.handlers:
            if handler not in stale:
                stale.append(handler)
        lg.handlers.clear()
    for handler in stale:
        handler.close()

    line_fmt = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    # Explicit UTF-8: orchestration messages include non-ASCII characters
    # (arrows, em dashes) that can fail under an ASCII locale encoding.
    ofh = logging.FileHandler(orch_path, encoding="utf-8")
    ofh.setFormatter(line_fmt)
    osh = logging.StreamHandler(sys.stderr)
    osh.setFormatter(line_fmt)

    # Orchestration and LeadSquared push logs go to the same file + stderr.
    for lg in (orch_mod, lsq_log):
        lg.setLevel(logging.INFO)
        lg.propagate = False
        lg.addHandler(ofh)
        lg.addHandler(osh)

    ah = logging.FileHandler(analytics_path, encoding="utf-8")
    ah.setFormatter(logging.Formatter("%(asctime)s - %(message)s"))
    alog.setLevel(logging.INFO)
    alog.propagate = False
    alog.addHandler(ah)

    return orch_path, analytics_path


# Module-level loggers. Each gets a NullHandler so that, until real handlers
# are attached by _setup_per_bid_orchestration_logging, logging's last-resort
# stderr handler is not used for them.
logger = logging.getLogger(__name__)
analytics_logger = logging.getLogger("analytics_updates")
for _quiet_logger in (logger, analytics_logger):
    _quiet_logger.addHandler(logging.NullHandler())
del _quiet_logger


from min_duration_util import (
    call_duration_seconds as shared_call_duration_seconds,
    evaluate_min_duration_for_ingest,
    is_below_min_duration as shared_is_below_min_duration,
    purge_unprocessed_if_below_min,
    skip_reason as shared_skip_reason,
)
from mcube_group_util import resolve_mcube_group_sql


class Orchestrator:
    def __init__(self, bid):
        """Build an orchestrator bound to one business (BID).

        Steps, in order:

        1. Set up per-BID log files.
        2. Load application ``Config`` and create the DB handler and call
           analyzer.
        3. Apply any telephony-source override and per-BID pipeline settings
           read through the DB handler.

        Args:
            bid: Customer business identifier. It is also used as the prefix of
                local table names such as ``{bid}_raw_calls``.
        """
        self.bid = bid
        self._orch_log_path, self._analytics_log_path = _setup_per_bid_orchestration_logging(bid)
        self.config = Config()
        # Defaults: read calls from the env-configured source DB under this BID.
        # _load_telephony_source_config() may replace both.
        self.source_bid = self.bid
        self.source_db_override = None

        class ConfigWrapper:
            # Gives the attribute-style Config object a dict-like .get(key, default)
            # while still forwarding plain attribute access.
            # NOTE(review): presumably DatabaseHandler / CallAnalyzer rely on
            # .get() — confirm against those classes.
            def __init__(self, config):
                self._config = config

            def get(self, key, default=None):
                return getattr(self._config, key, default)

            def __getattr__(self, key):
                # Only reached for names not found on the wrapper itself.
                return getattr(self._config, key)

        self.config_wrapped = ConfigWrapper(self.config)
        self.db_handler = DatabaseHandler(self.config_wrapped)
        self.analyzer = CallAnalyzer(self.config_wrapped)
        # Requires self.db_handler, so it must run after the handler exists.
        self._load_telephony_source_config()

        # Pipeline defaults, kept when no per-BID config can be read;
        # _load_processing_config() overrides them from the stored config.
        self.analytics_enabled = True
        self.analysis_mode = "default"
        self.webhook_ingest_enabled = False
        self.group_filter_enabled = False
        self.allowed_groupnames: List[str] = []
        self.min_call_duration_s = 0
        self.min_call_duration_effective_at = None
        self.lookback_days = 90
        self._load_processing_config()

        self.rabbitmq_host = os.getenv("RABBITMQ_HOST", "localhost")
        self.rabbitmq_queue = os.getenv("RABBITMQ_QUEUE", "stt_jobs")

    def _load_processing_config(self):
        """Load optional per-business controls without changing legacy BIDs."""
        try:
            self.db_handler.ensure_business_pipeline_config_table()
            cfg = self.db_handler.get_pipeline_config(self.bid) or {}
        except Exception as e:
            logger.warning(f"Could not read pipeline config for BID {self.bid}: {e}")
            return

        self.analysis_mode = str(cfg.get("analysis_mode") or "default").strip().lower()
        raw_enabled = cfg.get("analytics_enabled")
        if raw_enabled is not None:
            self.analytics_enabled = bool(int(raw_enabled)) if str(raw_enabled).isdigit() else bool(raw_enabled)
        if self.analysis_mode == "transcription_only":
            self.analytics_enabled = False

        self.group_filter_enabled = bool(int(cfg.get("group_filter_enabled") or 0))
        self.allowed_groupnames = self.db_handler._decode_allowed_groupnames(cfg.get("allowed_groupnames"))
        if self.group_filter_enabled:
            logger.info(
                "Group filter enabled for BID %s: %s",
                self.bid,
                ", ".join(self.allowed_groupnames) if self.allowed_groupnames else "(no groups selected)",
            )

        raw_min = cfg.get("min_call_duration_s")
        self.min_call_duration_s = max(0, int(raw_min)) if raw_min is not None else 0
        if self.min_call_duration_s > 0:
            self.min_call_duration_effective_at = self.db_handler.ensure_min_duration_effective_at(self.bid)
            logger.info(
                "Min call duration for BID %s: %ss (applies to calls from %s onward)",
                self.bid,
                self.min_call_duration_s,
                self.min_call_duration_effective_at,
            )
        else:
            self.min_call_duration_effective_at = cfg.get("min_call_duration_effective_at")

        raw_lookback = cfg.get("lookback_days")
        if raw_lookback is not None:
            try:
                self.lookback_days = max(0, int(raw_lookback))
            except (TypeError, ValueError):
                pass

        self.webhook_ingest_enabled = bool(int(cfg.get("webhook_ingest_enabled") or 0))
        if self.webhook_ingest_enabled:
            logger.info(
                "Webhook ingest enabled for BID %s — polling ingest is skipped.",
                self.bid,
            )

    def _load_telephony_source_config(self):
        """Prefer active telephony integration source DB/source_bid when configured."""
        try:
            integrations = self.db_handler.list_telephony_integrations(self.bid) or []
        except Exception as e:
            logger.warning(f"Could not read telephony integrations for BID {self.bid}: {e}")
            return

        if not integrations:
            return

        active = None
        for item in integrations:
            if int(item.get("is_active") or 0) == 1:
                active = item
                break
        if not active:
            active = integrations[0]

        cfg = active.get("config") or {}
        host = cfg.get("host")
        user = cfg.get("user")
        password = cfg.get("password")
        database = cfg.get("database")
        port = int(cfg.get("port") or 3306)
        source_bid = str(active.get("source_bid") or self.bid).strip()

        if host and user and password and database:
            self.source_db_override = {
                "host": host,
                "port": port,
                "user": user,
                "password": password,
                "database": database,
            }
            self.source_bid = source_bid
            logger.info(
                f"Using {active.get('provider', 'telephony')} integration source BID "
                f"{self.source_bid} for customer BID {self.bid}"
            )
        else:
            logger.warning(
                f"Active telephony integration for BID {self.bid} is missing DB fields; "
                "falling back to SYNC_SOURCE_DB_* env config."
            )

    def get_db_connection(self):
        """Open a new connection to the local (destination) database.

        The connection uses ``autocommit=True`` and a ``DictCursor``, so rows
        come back as dicts. The caller owns the connection and must close it.
        """
        cfg = self.config
        params = dict(
            host=cfg.DB_HOST,
            port=cfg.DB_PORT,
            user=cfg.DB_USER,
            password=cfg.DB_PASSWORD,
            database=cfg.DB_NAME,
            cursorclass=DictCursor,
            autocommit=True,
        )
        return pymysql.connect(**params)

    def get_source_db_connection(self):
        """Open a connection to the telephony source database.

        Credentials come from ``self.source_db_override`` when it is set (it is
        filled from a telephony integration). Otherwise they come from the
        ``SYNC_SOURCE_DB_*`` settings on ``Config``. Rows are returned as dicts.
        Autocommit is not set here and stays at the driver default. The caller
        must close the connection.
        """
        override = self.source_db_override
        if override:
            target = {
                "host": override["host"],
                "port": int(override["port"]),
                "user": override["user"],
                "password": override["password"],
                "database": override["database"],
            }
        else:
            cfg = self.config
            target = {
                "host": cfg.SYNC_SOURCE_DB_HOST,
                "port": cfg.SYNC_SOURCE_DB_PORT,
                "user": cfg.SYNC_SOURCE_DB_USER,
                "password": cfg.SYNC_SOURCE_DB_PASSWORD,
                "database": cfg.SYNC_SOURCE_DB_NAME,
            }
        return pymysql.connect(cursorclass=DictCursor, **target)

    def _parse_datetime(self, value):
        if value is None:
            return None
        if isinstance(value, datetime):
            return value.replace(tzinfo=None) if value.tzinfo else value
        try:
            text = str(value).replace("Z", "+00:00")
            parsed = datetime.fromisoformat(text)
            return parsed.replace(tzinfo=None) if parsed.tzinfo else parsed
        except Exception:
            return None

    def _call_starttime(self, call: dict):
        return self._parse_datetime(call.get("call_starttime") or call.get("starttime"))

    def _call_duration_seconds(self, call: dict) -> Optional[int]:
        """Return *call*'s duration in seconds via ``min_duration_util.call_duration_seconds``."""
        seconds = shared_call_duration_seconds(call)
        return seconds

    def _is_below_min_duration(self, call: dict) -> bool:
        """Check *call* against this BID's minimum-duration rule.

        Thin wrapper over ``min_duration_util.is_below_min_duration``. It passes
        ``self.min_call_duration_s`` and ``self.min_call_duration_effective_at``.
        """
        threshold = self.min_call_duration_s
        effective_from = self.min_call_duration_effective_at
        return shared_is_below_min_duration(call, threshold, effective_from)

    def _min_duration_skip_reason(self, call: dict, *, audio_duration_s=None) -> str:
        """Return the min-duration skip reason string for *call*.

        Delegates to ``min_duration_util.skip_reason`` with this BID's
        threshold. *audio_duration_s* is forwarded unchanged.
        """
        threshold = self.min_call_duration_s
        return shared_skip_reason(call, threshold, audio_duration_s=audio_duration_s)

    def _min_duration_blocks_action(
        self,
        call: dict,
        recording_url: str,
        *,
        log_prefix: str = "Not ingested",
    ) -> tuple[bool, Optional[float]]:
        """Return (should_skip, probed_audio_seconds) using WAV probe when configured."""
        if self.min_call_duration_s <= 0:
            return False, None
        skip, reason, probed = evaluate_min_duration_for_ingest(
            call,
            self.min_call_duration_s,
            self.min_call_duration_effective_at,
            recording_url,
            probe_audio=True,
        )
        if skip:
            logger.info(
                "[%s] %s (%s)",
                call.get("callid"),
                log_prefix,
                reason,
            )
            return True, probed
        return False, probed

    def _active_group_filter(self):
        if not self.group_filter_enabled:
            return None
        return [str(g).strip() for g in (self.allowed_groupnames or []) if str(g).strip()]

    def _append_group_filter(self, where_parts: list, query_params: list, group_expr: str) -> None:
        groups = self._active_group_filter()
        if groups is None:
            return
        if not groups:
            where_parts.append("1=0")
            return
        placeholders = ", ".join(["%s"] * len(groups))
        where_parts.append(f"LOWER(TRIM({group_expr})) IN ({placeholders})")
        query_params.extend([g.lower() for g in groups])

    def _resolve_source_table(self, source_cursor):
        candidates = [f"{self.source_bid}_callhistory", f"{self.source_bid}_call_history"]
        for table_name in candidates:
            source_cursor.execute("SHOW TABLES LIKE %s", (table_name,))
            if source_cursor.fetchone():
                logger.info(f"Using source table: {table_name}")
                return table_name
        raise RuntimeError(
            f"No source call history table found for source BID {self.source_bid}. "
            f"Tried: {', '.join(candidates)}"
        )

    def _source_table_columns(self, source_cursor, table_name: str) -> dict:
        source_cursor.execute(
            """
            SELECT column_name
            FROM information_schema.columns
            WHERE table_schema = DATABASE() AND table_name = %s
            """,
            (table_name,),
        )
        cols = {}
        for row in source_cursor.fetchall() or []:
            name = str(row.get("column_name") or row.get("COLUMN_NAME") or "")
            if name:
                cols[name.lower()] = name
        return cols

    def _first_source_col(self, source_cols: dict, *candidates: str) -> Optional[str]:
        for candidate in candidates:
            key = candidate.lower()
            if key in source_cols:
                return source_cols[key]
        return None

    def _source_col_expr(self, col_name: Optional[str], alias: str, default_sql: Optional[str] = None) -> str:
        if col_name:
            return f"c.`{col_name}` AS {alias}"
        if default_sql is not None:
            return f"{default_sql} AS {alias}"
        return f"NULL AS {alias}"

    def _ingest_watermark_key(self) -> str:
        return f"orchestrator_ingest_{self.bid}"

    def _get_ingest_watermark(self):
        db_max = None
        dest_conn = self.get_db_connection()
        try:
            with dest_conn.cursor() as dest_cursor:
                dest_cursor.execute(
                    f"SELECT MAX(call_starttime) AS last_start FROM `{self.bid}_raw_calls`"
                )
                result = dest_cursor.fetchone()
                if result and result.get("last_start"):
                    db_max = self._parse_datetime(result["last_start"])
        finally:
            dest_conn.close()

        stored = self.db_handler.get_sync_watermark(self.bid, self._ingest_watermark_key())
        stored_dt = self._parse_datetime(stored) if stored else None
        candidates = [d for d in (db_max, stored_dt) if d]
        if not candidates:
            return datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        return max(candidates)

    def _advance_ingest_watermark(self, starttime) -> None:
        new_dt = self._parse_datetime(starttime)
        if not new_dt:
            return
        current = self._get_ingest_watermark()
        if new_dt <= current:
            return
        self.db_handler.set_sync_watermark(self.bid, self._ingest_watermark_key(), new_dt.isoformat())

    def _fetch_answer_calls_from_source(
        self,
        source_cursor,
        source_table: str,
        source_cols: dict,
        *,
        after_start=None,
        before_start=None,
        limit: int = 50,
    ) -> list:
        """Fetch answered calls from the source call-history table.

        Source column names are resolved across several naming variants. They
        are selected under fixed aliases: callid, bid, agentname, starttime,
        endtime, dialstatus, direction, filename, emp_phone, clicktocalldid,
        answeredtime and pulse. A group-name item is also selected, built by
        ``resolve_mcube_group_sql``.

        The ``bid`` column is always ``self.source_bid``. Missing optional
        columns become SQL defaults: ``''``, ``'ANSWER'``, ``'inbound'``, or
        ``NULL``.

        Args:
            source_cursor: Open cursor on the source DB (this class opens it
                with ``DictCursor``).
            source_table: Call-history table name.
            source_cols: Lower-cased -> actual column-name map for that table.
            after_start: Exclusive lower bound on start time, if given.
            before_start: Inclusive upper bound on start time, if given.
            limit: Maximum number of rows returned.

        Returns:
            List of row dicts. Rows are oldest-first when only ``after_start``
            is given, and newest-first otherwise. ``[]`` is returned without
            running the call query when a group filter is active but no group
            expression could be resolved.

        Raises:
            RuntimeError: if the call id, start time or recording-file column
                is missing.
        """
        # Resolve actual column names across the naming variants seen in source tables.
        callid_col = self._first_source_col(source_cols, "callid", "call_id")
        start_col = self._first_source_col(
            source_cols, "starttime", "call_starttime", "start_time", "call_start_time"
        )
        end_col = self._first_source_col(
            source_cols, "endtime", "call_endtime", "end_time", "call_end_time"
        )
        dialstatus_col = self._first_source_col(
            source_cols, "dialstatus", "call_status", "callstatus", "status"
        )
        direction_col = self._first_source_col(source_cols, "direction")
        filename_col = self._first_source_col(
            source_cols, "filename", "fileurl", "fileUrl", "file_url", "recording_url", "audio_url"
        )
        agent_col = self._first_source_col(
            source_cols, "agentname", "callername", "caller_name", "agent_name"
        )
        # NOTE(review): from how the results are used below, this returns
        # (JOIN sql, JOIN params, groupname SELECT item, group expression to
        # filter on) — confirm in mcube_group_util.
        group_join_sql, group_join_params, groupname_select_sql, group_filter_expr = (
            resolve_mcube_group_sql(
                source_cursor, self.source_bid, source_table, source_cols, call_alias="c"
            )
        )
        callfrom_col = self._first_source_col(
            source_cols, "callfrom", "emp_phone", "agent_callinfo", "agent_phone"
        )
        callto_col = self._first_source_col(
            source_cols, "callto", "clicktocalldid", "customer_callinfo", "customer_phone"
        )
        answered_col = self._first_source_col(
            source_cols, "answeredtime", "answered_time", "talktime", "talk_time", "billsec"
        )
        pulse_col = self._first_source_col(source_cols, "pulse", "mpulse")

        if not callid_col or not start_col or not filename_col:
            raise RuntimeError(f"Source table {source_table} missing required columns")

        where_parts: list = []
        query_params: list = []
        if after_start is not None:
            where_parts.append(f"c.`{start_col}` > %s")
            query_params.append(after_start)
        if before_start is not None:
            where_parts.append(f"c.`{start_col}` <= %s")
            query_params.append(before_start)
        if dialstatus_col:
            # Answered calls only; tables without a status column are not filtered on status.
            where_parts.append(f"c.`{dialstatus_col}` IN ('ANSWER')")
        active_groups = self._active_group_filter()
        if active_groups is not None and not group_filter_expr:
            # Filter requested but nothing to filter on: return no rows rather than unfiltered ones.
            return []
        if active_groups is not None:
            self._append_group_filter(where_parts, query_params, group_filter_expr)

        where_sql = " AND ".join(where_parts) if where_parts else "1=1"
        # Forward ingest (lower bound only) reads oldest-first so the batch starts
        # right after the watermark; every other window reads newest-first.
        order = "ASC" if after_start is not None and before_start is None else "DESC"
        query = f"""
            SELECT
                c.`{callid_col}` AS callid,
                %s AS bid,
                {self._source_col_expr(agent_col, "agentname", "''")},
                {groupname_select_sql},
                c.`{start_col}` AS starttime,
                {self._source_col_expr(end_col, "endtime")},
                {self._source_col_expr(dialstatus_col, "dialstatus", "'ANSWER'")},
                {self._source_col_expr(direction_col, "direction", "'inbound'")},
                c.`{filename_col}` AS filename,
                {self._source_col_expr(callfrom_col, "emp_phone", "''")},
                {self._source_col_expr(callto_col, "clicktocalldid", "''")},
                {self._source_col_expr(answered_col, "answeredtime")},
                {self._source_col_expr(pulse_col, "pulse")}
            FROM `{source_table}` c
            {group_join_sql}
            WHERE {where_sql}
            ORDER BY c.`{start_col}` {order}
            LIMIT %s
        """
        # Parameters follow placeholder order in the SQL text: bid (SELECT list),
        # JOIN params, WHERE params, then LIMIT.
        # NOTE(review): assumes groupname_select_sql contains no %s placeholders.
        source_cursor.execute(
            query, tuple([self.source_bid] + group_join_params + query_params + [limit])
        )
        return source_cursor.fetchall() or []

    def _load_existing_callids_in_lookback(self, lookback_start: datetime) -> set:
        conn = self.get_db_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(
                    f"SELECT callid FROM `{self.bid}_raw_calls` WHERE call_starttime >= %s",
                    (lookback_start,),
                )
                return {
                    str(row["callid"])
                    for row in (cursor.fetchall() or [])
                    if row.get("callid")
                }
        finally:
            conn.close()

    def _log_ingestion_skip_diagnostic(
        self, source_cursor, source_table, start_col, dialstatus_col, watermark
    ) -> None:
        try:
            source_cursor.execute(
                f"""
                SELECT COUNT(*) AS cnt
                FROM `{source_table}` c
                WHERE c.`{start_col}` > %s
                  AND c.`{dialstatus_col}` = 'ANSWER'
                """,
                (watermark,),
            )
            row = source_cursor.fetchone() or {}
            pending = int(row.get("cnt") or 0)
            if pending:
                logger.info(
                    "No calls ingested for BID %s although %s ANSWER call(s) exist after watermark %s "
                    "(check group filter / min duration)",
                    self.bid,
                    pending,
                    watermark,
                )
        except Exception as e:
            logger.warning("Could not log ingestion skip diagnostic: %s", e)

    def _recording_url_candidates(self, filename, starttime) -> List[str]:
        if not filename:
            return []
        filename = str(filename).strip()
        if filename.startswith("http"):
            return [filename]

        try:
            if isinstance(starttime, str):
                dt = datetime.strptime(starttime, "%Y-%m-%d %H:%M:%S")
            else:
                dt = starttime
            year = dt.strftime("%Y")
            month = dt.strftime("%m")
        except Exception:
            return [filename]

        bid = self.source_bid or self.bid
        name = os.path.basename(filename)
        return [
            f"https://recordings.mcube.com/mcubefiles112/appmcube/{year}/{month}/{bid}/{name}",
            f"https://recordings.mcube.com/mcubefiles112/classic/{year}/{month}/{bid}/inbound/{name}",
        ]

    def _get_full_url(self, filename, starttime):
        candidates = self._recording_url_candidates(filename, starttime)
        return candidates[0] if candidates else str(filename or "")

    def ingest_calls(self, limit=3):
        """Fetch calls from source DB newer than the latest local record.

        Up to *limit* answered calls are gathered in two passes:

        1. Forward: calls starting after the ingest watermark, oldest first.
        2. Gap-fill: runs only if the forward pass returned fewer than *limit*
           calls and ``lookback_days > 0``. It takes calls between the lookback
           start and the watermark, newest first, that are not already in
           ``{bid}_raw_calls``.

        Each call is checked against the minimum-duration rule, which may probe
        the recording audio. Short calls are skipped and never stored; the rest
        are upserted into ``{bid}_raw_calls``. The watermark is then advanced to
        the latest start time in the batch. Skipped calls count here, so the
        forward pass does not fetch them again.

        Args:
            limit: Maximum number of calls fetched and processed in this run.

        Returns:
            The number of upserts whose cursor ``rowcount`` was at least 1.
            Updates of already-stored calls can count too, not only new
            inserts. Returns 0 when nothing was fetched, when a group filter
            cannot be applied, or on any error. Errors are logged with their
            traceback, not raised.
        """
        current_time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        logger.info(f"Orchestration initiated at {current_time_str}")
        logger.info(
            f"Logs for BID {self.bid}: orchestration → {self._orch_log_path}; "
            f"analytics → {self._analytics_log_path}"
        )
        logger.info(f"Initial limit for extraction/ingestion is set to {limit}")

        try:
            watermark = self._get_ingest_watermark()
            lookback_start = datetime.now() - timedelta(days=self.lookback_days)
            logger.info(
                f"Found local watermark: {watermark} (lookback {self.lookback_days}d from {lookback_start})"
            )

            source_conn = self.get_source_db_connection()
            calls = []
            try:
                with source_conn.cursor() as source_cursor:
                    source_table = self._resolve_source_table(source_cursor)
                    source_cols = self._source_table_columns(source_cursor, source_table)
                    callid_col = self._first_source_col(source_cols, "callid", "call_id")
                    start_col = self._first_source_col(
                        source_cols, "starttime", "call_starttime", "start_time", "call_start_time"
                    )
                    dialstatus_col = self._first_source_col(
                        source_cols, "dialstatus", "call_status", "callstatus", "status"
                    )

                    if not callid_col:
                        raise RuntimeError(f"Source table {source_table} has no callid/call_id column")
                    if not start_col:
                        raise RuntimeError(
                            f"Source table {source_table} has no starttime/call_starttime column"
                        )

                    # Refuse to ingest unfiltered calls when a group filter is
                    # configured but cannot be expressed against this table.
                    active_groups = self._active_group_filter()
                    _, _, _, group_filter_expr = resolve_mcube_group_sql(
                        source_cursor, self.source_bid, source_table, source_cols, call_alias="c"
                    )
                    if active_groups is not None and not group_filter_expr:
                        logger.error(
                            "Group filter is enabled for BID %s, but source table %s has no resolvable group column. "
                            "Ingesting 0 calls.",
                            self.bid,
                            source_table,
                        )
                        return 0

                    # Pass 1: forward from the watermark.
                    forward_calls = self._fetch_answer_calls_from_source(
                        source_cursor,
                        source_table,
                        source_cols,
                        after_start=watermark,
                        limit=limit,
                    )
                    calls = list(forward_calls)
                    seen_ids = {str(c["callid"]) for c in calls if c.get("callid")}
                    remaining = max(0, limit - len(calls))

                    # Pass 2: gap-fill inside the lookback window with any spare capacity.
                    # Over-fetch (up to 5x, capped at 250) because already-stored
                    # calls are filtered out client-side.
                    # NOTE(review): calls skipped for min duration are never stored,
                    # so they stay gap-fill candidates (and may be re-probed) on every run.
                    if remaining > 0 and self.lookback_days > 0:
                        existing_ids = self._load_existing_callids_in_lookback(lookback_start)
                        backfill_candidates = self._fetch_answer_calls_from_source(
                            source_cursor,
                            source_table,
                            source_cols,
                            after_start=lookback_start,
                            before_start=watermark,
                            limit=min(remaining * 5, 250),
                        )
                        backfilled = 0
                        for row in backfill_candidates:
                            cid = str(row.get("callid") or "")
                            if not cid or cid in existing_ids or cid in seen_ids:
                                continue
                            calls.append(row)
                            seen_ids.add(cid)
                            backfilled += 1
                            if backfilled >= remaining:
                                break
                        if backfilled:
                            logger.info(
                                "Gap-fill backfill: found %s missed ANSWER call(s) within last %s days "
                                "(before watermark %s)",
                                backfilled,
                                self.lookback_days,
                                watermark,
                            )

                    if not calls:
                        self._log_ingestion_skip_diagnostic(
                            source_cursor, source_table, start_col, dialstatus_col, watermark
                        )
            finally:
                source_conn.close()

            logger.info(f"Number of records to be ingested: {len(calls)}")
            if calls:
                call_ids_str = ", ".join([str(c["callid"]) for c in calls])
                logger.info(f"Call IDs that would be ingested: {call_ids_str}")

            if not calls:
                return 0

            dest_conn = self.get_db_connection()
            try:
                with dest_conn.cursor() as dest_cursor:
                    # Upsert: re-seen calls refresh their metadata but keep
                    # status/transcription/selection columns untouched.
                    insert_query = f"""
                        INSERT INTO `{self.bid}_raw_calls`
                        (bid, callid, fileurl, status, agentname, groupname, call_starttime, call_endtime,
                         call_status, agent_callinfo, customer_callinfo, direction,
                         transcription_requested, transcription_status, selected_for_processing,
                         duration_seconds)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                        ON DUPLICATE KEY UPDATE
                        fileurl = VALUES(fileurl),
                        agentname = VALUES(agentname),
                        groupname = VALUES(groupname),
                        call_starttime = VALUES(call_starttime),
                        call_endtime = VALUES(call_endtime),
                        call_status = VALUES(call_status),
                        agent_callinfo = VALUES(agent_callinfo),
                        customer_callinfo = VALUES(customer_callinfo),
                        direction = VALUES(direction),
                        duration_seconds = VALUES(duration_seconds)
                    """

                    inserted = 0
                    skipped_short = 0
                    max_batch_start = None
                    for call in calls:
                        # Track the batch maximum before the duration check so that
                        # skipped calls still move the watermark forward.
                        starttime = call.get("starttime")
                        parsed_start = self._parse_datetime(starttime)
                        if parsed_start and (max_batch_start is None or parsed_start > max_batch_start):
                            max_batch_start = parsed_start

                        row = {
                            "callid": call["callid"],
                            "starttime": starttime,
                            "endtime": call.get("endtime"),
                            "duration_seconds": call.get("duration_seconds"),
                            "answeredtime": call.get("answeredtime"),
                            "pulse": call.get("pulse"),
                        }
                        call_for_duration = {**call, **row}
                        recording_url = self._get_full_url(call["filename"], call["starttime"])
                        skip_min, probed_audio = self._min_duration_blocks_action(
                            call_for_duration,
                            recording_url,
                            log_prefix="Not ingested",
                        )
                        if skip_min:
                            skipped_short += 1
                            continue

                        # Prefer the probed audio length over source metadata when available.
                        if probed_audio is not None:
                            effective_duration = max(0, int(round(probed_audio)))
                        else:
                            effective_duration = shared_call_duration_seconds(call_for_duration)

                        dest_cursor.execute(
                            insert_query,
                            (
                                # NOTE(review): the source query selects self.source_bid AS bid,
                                # so this stores the source BID (not self.bid) when a telephony
                                # integration maps them differently — confirm that is intended.
                                call.get("bid") or self.bid,
                                call["callid"],
                                recording_url,
                                0,
                                str(call.get("agentname") or ""),
                                str(call.get("groupname") or ""),
                                call["starttime"],
                                call.get("endtime"),
                                call.get("dialstatus") or "ANSWER",
                                call.get("emp_phone") or "",
                                call.get("clicktocalldid") or "",
                                (str(call.get("direction") or "inbound").strip().lower() or "inbound"),
                                None,
                                None,
                                None,
                                effective_duration,
                            ),
                        )
                        if dest_cursor.rowcount >= 1:
                            inserted += 1

                    if max_batch_start:
                        self._advance_ingest_watermark(max_batch_start)

                    if skipped_short:
                        logger.info(
                            "Did not ingest %s call(s) below min duration %ss for BID %s (not stored in DB)",
                            skipped_short,
                            self.min_call_duration_s,
                            self.bid,
                        )

                    logger.info(f"Successfully ingested {inserted} calls into local DB.")
                    return inserted
            finally:
                dest_conn.close()

        except Exception as e:
            # Best effort: never raise to the caller. logger.exception keeps the
            # traceback, which a plain error log of str(e) would lose.
            logger.exception("Error during ingestion: %s", e)
            return 0

    def _is_manual_upload(self, call: dict) -> bool:
        return "/recording-uploads/" in str(call.get("fileurl") or "")

    def _validate_local_recording_upload(self, url: str):
        marker = "/recording-uploads/"
        url = str(url or "")
        if marker not in url:
            return None
        rel = unquote(url.split(marker, 1)[1].split("?", 1)[0])
        base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "recording_uploads"))
        full_path = os.path.abspath(os.path.join(base_dir, rel))
        if not full_path.startswith(base_dir + os.sep) and full_path != base_dir:
            return False
        if not os.path.isfile(full_path) or os.path.getsize(full_path) <= 0:
            return False
        return True

    def _is_trusted_mcube_recording_url(self, url: str) -> bool:
        parsed = urlparse(str(url or ""))
        if (parsed.hostname or "").lower() != "recordings.mcube.com":
            return False
        path = (parsed.path or "").lower()
        return "/mcubefiles" in path and path.endswith(
            (".wav", ".mp3", ".ogg", ".mpeg", ".m4a", ".webm", ".flac")
        )

    def _http_recording_reachable(self, url: str) -> bool:
        url = str(url or "").strip()
        if not url.startswith(("http://", "https://")):
            return False
        try:
            head = subprocess.run(
                ["curl", "-I", "-L", "--max-time", "20", "-s", url],
                capture_output=True,
                text=True,
            )
            if head.returncode == 0 and any(
                line.startswith("HTTP/") and any(code in line for code in (" 200", " 206"))
                for line in (head.stdout or "").splitlines()
            ):
                return True
        except Exception:
            pass
        try:
            ranged = subprocess.run(
                [
                    "curl",
                    "-L",
                    "--max-time",
                    "20",
                    "-s",
                    "-o",
                    "/dev/null",
                    "-w",
                    "%{http_code}",
                    "-r",
                    "0-1023",
                    url,
                ],
                capture_output=True,
                text=True,
            )
            if ranged.returncode == 0 and (ranged.stdout or "").strip() in ("200", "206"):
                return True
        except Exception:
            pass
        return False

    def validate_url(self, url, call=None):
        local_result = self._validate_local_recording_upload(url)
        if local_result is not None:
            return local_result

        if call and self._is_manual_upload(call):
            logger.info("[%s] Trusting manual upload URL after DB insert: %s", call.get("callid"), url)
            return True

        if self._http_recording_reachable(url):
            return True

        trust_mcube = os.getenv("ORCHESTRATOR_TRUST_MCUBE_URLS", "").lower() in ("1", "true", "yes")
        if trust_mcube and self._is_trusted_mcube_recording_url(url):
            logger.info(
                "[%s] HTTP probe inconclusive; trusting Mcube recording URL (ORCHESTRATOR_TRUST_MCUBE_URLS): %s",
                call.get("callid") if call else "?",
                url,
            )
            return True

        if self._is_trusted_mcube_recording_url(url):
            logger.info(
                "[%s] Mcube recording not reachable yet; will retry URL check: %s",
                call.get("callid") if call else "?",
                url,
            )
        return False

    def _has_transcript_sql(self, resp_alias: str = "s") -> str:
        return f"{resp_alias}.transcript IS NOT NULL AND TRIM({resp_alias}.transcript) != ''"

    def _parse_stt_retry_count(self, transcription_status) -> int:
        ts = str(transcription_status or "")
        if ts.startswith("stt_retry:"):
            try:
                return int(ts.split(":", 1)[1])
            except (TypeError, ValueError):
                return 0
        return 0

    def _stt_retry_cooldown_active(self, call: dict) -> bool:
        ts = str(call.get("transcription_status") or "")
        if not ts.startswith("stt_retry:"):
            return False
        updated_at = self._parse_datetime(call.get("updated_at"))
        if not updated_at:
            return False
        cooldown_minutes = max(1, int(os.getenv("STT_RETRY_COOLDOWN_MINUTES", "10")))
        age_minutes = (datetime.now() - updated_at).total_seconds() / 60.0
        if age_minutes < cooldown_minutes:
            logger.info(
                "[%s] STT retry cooldown active (%.0fm / %sm); skipping re-queue",
                call.get("callid"),
                age_minutes,
                cooldown_minutes,
            )
            return True
        return False

    def _is_transient_analytics_error(self, exc: Exception) -> bool:
        msg = str(exc).lower()
        transient_markers = (
            "could not connect",
            "timeout",
            "timed out",
            "throttl",
            "rate limit",
            "service unavailable",
            "connection reset",
            "endpoint url",
        )
        return any(marker in msg for marker in transient_markers)

    def repair_pipeline_state(self, limit: int = 50) -> Dict[str, int]:
        """Fix common status mismatches so STT/analytics can progress.

        Works on this BID's ``<bid>_raw_calls`` / ``<bid>_sarvamresponse`` tables:

        * status 1 or -2 rows that already have a non-blank transcript -> status 2
          (not limited).
        * status 2 rows without a transcript -> status 0 (``reset_phantom_transcribed``).
        * status 1 rows without a transcript -> status 0 (``reset_stuck_queued``).
        * status -2 rows not marked ``backlog_cleared``, whose ``updated_at`` is NULL
          or older than ORCHESTRATOR_RETRY_STT_AFTER_MINUTES (default 15, minimum 5),
          and without a transcript -> status 0 (``reset_stt_failed``).
        * status -1 rows whose ``fileurl`` now passes ``validate_url`` -> status 0
          (``reset_invalid_url_revalidated``).

        Every step except the first touches at most ``limit`` rows, newest ``id`` first.

        Args:
            limit: per-step row cap for the reset and re-validation steps.

        Returns:
            Mapping of step name to number of rows changed.
        """
        raw_table = f"`{self.bid}_raw_calls`"
        resp_table = f"`{self.bid}_sarvamresponse`"
        has_tx = self._has_transcript_sql("s")
        retry_after_minutes = max(5, int(os.getenv("ORCHESTRATOR_RETRY_STT_AFTER_MINUTES", "15")))
        no_transcript = f"""
                      AND NOT EXISTS (
                        SELECT 1 FROM {resp_table} s
                        WHERE s.callid = r.callid AND {has_tx}
                      )
                    """
        stats = {
            "promoted_to_transcribed": 0,
            "reset_phantom_transcribed": 0,
            "reset_stuck_queued": 0,
            "reset_stt_failed": 0,
            "reset_invalid_url_revalidated": 0,
        }
        conn = self.get_db_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(
                    f"""
                    UPDATE {raw_table} r
                    INNER JOIN {resp_table} s ON r.callid = s.callid
                    SET r.status = 2
                    WHERE r.status IN (1, -2) AND {has_tx}
                    """
                )
                stats["promoted_to_transcribed"] = cursor.rowcount

                def _reset_statuses(from_status, stat_key, extra_where=""):
                    """Move up to ``limit`` rows at ``from_status`` (filtered by ``extra_where``) to status 0."""
                    cursor.execute(
                        f"""
                        SELECT r.callid FROM {raw_table} r
                        WHERE r.status = %s
                          AND r.fileurl IS NOT NULL AND r.fileurl != ''
                          {extra_where}
                        ORDER BY r.id DESC
                        LIMIT %s
                        """,
                        (from_status, limit),
                    )
                    call_ids = [str(row["callid"]) for row in (cursor.fetchall() or []) if row.get("callid")]
                    if not call_ids:
                        return
                    placeholders = ", ".join(["%s"] * len(call_ids))
                    # Re-check the status in the UPDATE: the SELECT above is a plain
                    # (non-locking) read, so an STT worker may have advanced a row
                    # (e.g. 1 -> 2) in between and must not be knocked back to 0.
                    cursor.execute(
                        f"UPDATE {raw_table} SET status = 0 WHERE status = %s AND callid IN ({placeholders})",
                        (from_status, *call_ids),
                    )
                    stats[stat_key] = cursor.rowcount

                _reset_statuses(2, "reset_phantom_transcribed", no_transcript)
                # NOTE(review): unlike _heal_phantom_queued_calls there is no age/queue-depth
                # check here, so status-1 calls still waiting in RabbitMQ are reset as well.
                # Confirm this is intended, since it can lead to duplicate STT jobs.
                _reset_statuses(1, "reset_stuck_queued", no_transcript)
                _reset_statuses(
                    -2,
                    "reset_stt_failed",
                    f"""
                      AND COALESCE(r.transcription_status, '') != 'backlog_cleared'
                      AND (r.updated_at IS NULL OR r.updated_at < DATE_SUB(NOW(), INTERVAL {retry_after_minutes} MINUTE))
                    """
                    + no_transcript,
                )

                # Commit the DB-only repairs before re-validating URLs. validate_url may
                # shell out to curl (up to ~40s per row), and the row locks taken above
                # must not be held for that long.
                conn.commit()

                cursor.execute(
                    f"""
                    SELECT callid, fileurl
                    FROM {raw_table}
                    WHERE status = -1
                      AND fileurl IS NOT NULL AND TRIM(fileurl) != ''
                    ORDER BY id DESC
                    LIMIT %s
                    """,
                    (limit,),
                )
                for row in cursor.fetchall() or []:
                    callid = str(row.get("callid") or "")
                    fileurl = row.get("fileurl")
                    if not callid or not fileurl:
                        continue
                    if self.validate_url(fileurl, {"callid": callid, "fileurl": fileurl}):
                        cursor.execute(
                            f"UPDATE {raw_table} SET status = 0 WHERE callid = %s AND status = -1",
                            (callid,),
                        )
                        if cursor.rowcount:
                            stats["reset_invalid_url_revalidated"] += 1
                            logger.info("[%s] Re-validated recording URL; reset status -1 → 0", callid)
            conn.commit()
        finally:
            conn.close()

        if any(stats.values()):
            logger.info("Pipeline state repair for BID %s: %s", self.bid, stats)
        return stats

    def _heal_phantom_queued_calls(self) -> int:
        """Reset status=1 calls that have no transcript and no queue job.

        First passively inspects the RabbitMQ STT queue for its message count and
        consumer count. When messages are waiting AND at least STT_WORKER_COUNT
        (default 4) consumers are attached, only rows whose ``updated_at`` is NULL or
        older than ORCHESTRATOR_PHANTOM_QUEUE_MINUTES (default 12, minimum 5) are
        reset. Otherwise every matching status=1 row is reset. Rows marked
        ``backlog_cleared`` or already holding a non-blank transcript are never
        touched. Reset rows get status 0, ``transcription_status='not_requested'``
        and ``transcription_requested=0``.

        Returns:
            Number of raw_calls rows reset.
        """
        raw_table = f"`{self.bid}_raw_calls`"
        resp_table = f"`{self.bid}_sarvamresponse`"
        has_tx = self._has_transcript_sql("s")
        queue_depth = 0
        consumers = 0
        rmq_conn = None
        try:
            rmq_conn = pika.BlockingConnection(pika.ConnectionParameters(host=self.rabbitmq_host))
            ch = rmq_conn.channel()
            q = ch.queue_declare(queue=self.rabbitmq_queue, durable=True, passive=True)
            # NOTE(review): message_count counts ready messages only, not unacked
            # jobs a worker is processing right now. Those rows can still be reset.
            queue_depth = int(q.method.message_count)
            consumers = int(q.method.consumer_count)
        except Exception as exc:
            # Best-effort probe: depth/consumers stay 0, so the age filter below is
            # not applied and every matching status=1 row is reset.
            logger.warning(
                "BID %s: could not inspect RabbitMQ queue %s: %s",
                self.bid,
                self.rabbitmq_queue,
                exc,
            )
        finally:
            if rmq_conn is not None:
                try:
                    rmq_conn.close()
                except Exception:
                    pass  # connection already closed by the broker/client

        phantom_minutes = max(5, int(os.getenv("ORCHESTRATOR_PHANTOM_QUEUE_MINUTES", "12")))
        conn = self.get_db_connection()
        healed = 0
        try:
            with conn.cursor() as cursor:
                extra = ""
                target_workers = max(1, int(os.getenv("STT_WORKER_COUNT", "4")))
                if queue_depth > 0 and consumers >= target_workers:
                    # Workers are attached and busy: only heal rows that have been
                    # sitting in status 1 for longer than the phantom window.
                    extra = f"""
                      AND (r.updated_at IS NULL
                           OR r.updated_at < DATE_SUB(NOW(), INTERVAL {phantom_minutes} MINUTE))
                    """
                cursor.execute(
                    f"""
                    UPDATE {raw_table} r
                    SET r.status = 0,
                        r.transcription_status = 'not_requested',
                        r.transcription_requested = 0
                    WHERE r.status = 1
                      AND COALESCE(r.transcription_status, '') != 'backlog_cleared'
                      AND NOT EXISTS (
                        SELECT 1 FROM {resp_table} s
                        WHERE s.callid = r.callid AND {has_tx}
                      )
                      {extra}
                    """
                )
                healed = cursor.rowcount
            conn.commit()
        finally:
            conn.close()
        if healed:
            logger.info(
                "BID %s: healed %s phantom queued call(s) (status 1, no transcript, queue depth=%s)",
                self.bid,
                healed,
                queue_depth,
            )
        return healed

    def _log_transcription_backlog_hint(self) -> None:
        """Log how many raw_calls rows sit at status 0, 1, -1 and -2 for this BID.

        Purely informational. Any error (connecting, querying, formatting) is logged
        as a warning and swallowed so it never blocks the orchestration run.
        """
        try:
            conn = self.get_db_connection()
            try:
                with conn.cursor() as cursor:
                    cursor.execute(
                        f"""
                        SELECT status, COUNT(*) AS cnt
                        FROM `{self.bid}_raw_calls`
                        WHERE status IN (0, 1, -1, -2)
                        GROUP BY status
                        """
                    )
                    rows = cursor.fetchall() or []
            finally:
                # Close even when the query fails.
                conn.close()
            if rows:
                summary = ", ".join(f"status {r['status']}={r['cnt']}" for r in rows)
                logger.info("Transcription backlog for BID %s: %s", self.bid, summary)
        except Exception as e:
            logger.warning("Could not log transcription backlog hint for BID %s: %s", self.bid, e)

    def trigger_transcription(self, call):
        """Claim a call for STT and publish a transcription job to RabbitMQ.

        Flow:
          * If ``_min_duration_blocks_action`` says the call is below the minimum
            duration, purge it via ``purge_unprocessed_if_below_min`` and return False.
          * If an audio probe produced a duration, store it in ``duration_seconds``.
          * If a non-blank transcript already exists, set status 2 and return True.
          * If raw_calls status is already 1/2/3, return True without publishing.
            ``backlog_cleared`` calls return False.
          * Otherwise atomically move status 0/-2 -> 1 (the claim), commit it, and
            publish ``{"bid", "call_id", "recording_url"}`` as a persistent message.

        If publishing fails after a successful claim, the status is reverted 1 -> 0
        so a later run can retry.

        Args:
            call: raw_calls row dict; ``callid`` and ``fileurl`` are required.

        Returns:
            True when the call was queued or is already handled, False otherwise.
        """
        call_id = str(call["callid"])
        recording_url = call["fileurl"]

        skip_min, probed_audio = self._min_duration_blocks_action(
            call,
            recording_url,
            log_prefix="Skipping transcription",
        )
        if skip_min:
            conn = self.get_db_connection()
            try:
                with conn.cursor() as cursor:
                    purge_unprocessed_if_below_min(
                        cursor,
                        self.bid,
                        call_id,
                        call,
                        self.min_call_duration_s,
                        self.min_call_duration_effective_at,
                        audio_duration_s=probed_audio,
                    )
                conn.commit()
            finally:
                conn.close()
            return False
        if probed_audio is not None:
            conn = self.get_db_connection()
            try:
                with conn.cursor() as cursor:
                    cursor.execute(
                        f"""
                        UPDATE `{self.bid}_raw_calls`
                        SET duration_seconds = %s
                        WHERE callid = %s
                        """,
                        (max(0, int(round(probed_audio))), call_id),
                    )
                conn.commit()
            finally:
                conn.close()

        logger.info(f"[{call_id}] Triggering transcription...")

        raw_table = f"`{self.bid}_raw_calls`"
        resp_table = f"`{self.bid}_sarvamresponse`"
        claimed = False
        conn = self.get_db_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(
                    f"""
                    SELECT id FROM {resp_table}
                    WHERE callid = %s
                      AND transcript IS NOT NULL
                      AND TRIM(transcript) != ''
                    """,
                    (call_id,),
                )
                if cursor.fetchone():
                    logger.info(f"[{call_id}] Transcription already exists in {resp_table}. Skipping queuing.")
                    cursor.execute(f"UPDATE {raw_table} SET status = 2 WHERE callid = %s", (call_id,))
                    # Commit explicitly: every other writer in this module commits, so
                    # autocommit cannot be assumed and closing would roll this back.
                    conn.commit()
                    return True

                cursor.execute(
                    f"SELECT status, transcription_status FROM {raw_table} WHERE callid = %s LIMIT 1",
                    (call_id,),
                )
                current = cursor.fetchone() or {}
                current_status = int(current.get("status") if current.get("status") is not None else -99)
                if current_status in (1, 2, 3):
                    logger.info(
                        f"[{call_id}] Skip RabbitMQ publish (raw_calls status={current_status} already queued or done)."
                    )
                    return True

                if str(current.get("transcription_status") or "") == "backlog_cleared":
                    logger.info(f"[{call_id}] Skip RabbitMQ publish (backlog_cleared terminal failure).")
                    return False

                # Conditional UPDATE = atomic claim: only one caller can move 0/-2 -> 1.
                cursor.execute(
                    f"""
                    UPDATE {raw_table}
                    SET status = 1
                    WHERE callid = %s
                      AND status IN (0, -2)
                      AND COALESCE(transcription_status, '') != 'backlog_cleared'
                    """,
                    (call_id,),
                )
                if cursor.rowcount < 1:
                    logger.info(f"[{call_id}] Skip RabbitMQ publish (status changed concurrently).")
                    return False
                # Make the claim durable before publishing. Otherwise it is rolled back
                # on close (re-queued every run), and the row lock would be held across
                # RabbitMQ I/O while the STT worker tries to update the same row.
                conn.commit()
                claimed = True

            job_payload = {
                "bid": self.bid,
                "call_id": call_id,
                "recording_url": recording_url,
            }
            rmq_conn = pika.BlockingConnection(pika.ConnectionParameters(host=self.rabbitmq_host))
            try:
                channel = rmq_conn.channel()
                channel.queue_declare(queue=self.rabbitmq_queue, durable=True)
                channel.basic_publish(
                    exchange="",
                    routing_key=self.rabbitmq_queue,
                    body=json.dumps(job_payload),
                    properties=pika.BasicProperties(delivery_mode=2),  # persistent message
                )
            finally:
                try:
                    rmq_conn.close()
                except Exception as close_exc:
                    # A close failure after a successful publish must not trigger the
                    # status revert below.
                    logger.warning("[%s] RabbitMQ connection did not close cleanly: %s", call_id, close_exc)
            logger.info(f"[{call_id}] Successfully queued for transcription.")
            return True
        except Exception as e:
            logger.error(f"[{call_id}] Failed to queue transcription: {e}")
            if claimed:
                # Only undo OUR claim; never reset a status=1 set by another process.
                try:
                    with conn.cursor() as cursor:
                        cursor.execute(
                            f"""
                            UPDATE {raw_table}
                            SET status = 0
                            WHERE callid = %s AND status = 1
                            """,
                            (call_id,),
                        )
                    conn.commit()
                except Exception as revert_exc:
                    logger.warning(
                        "[%s] Could not revert status 1 -> 0 after publish failure: %s",
                        call_id,
                        revert_exc,
                    )
            return False
        finally:
            conn.close()

    def _run_propensity_if_enabled(self, call_id: str, transcript: str) -> None:
        """Score sales propensity for a call after quality analytics (best-effort).

        Does nothing when the BID's pipeline config does not set ``propensity_enabled``
        to 1, when no analytics row exists for the call, or when the scorer returns
        an empty payload. Any exception is logged as a warning and swallowed.
        """
        try:
            pipeline_cfg = self.db_handler.get_pipeline_config(self.bid) or {}
            enabled = int(pipeline_cfg.get("propensity_enabled") or 0) == 1
            if not enabled:
                return

            from propensity_scoring import score_call_propensity

            analytics_row = self.db_handler.get_call_analytics(self.bid, call_id)
            if not analytics_row:
                return

            scored = score_call_propensity(
                self.bid,
                transcript,
                analytics_context=analytics_row,
                config=self.config_wrapped,
            )
            if not scored:
                return

            if self.db_handler.save_propensity_analytics(self.bid, call_id, scored):
                logger.info(
                    "[%s] Propensity score=%s band=%s",
                    call_id,
                    scored.get("propensity_score"),
                    scored.get("propensity_band"),
                )
        except Exception as exc:
            logger.warning("[%s] Propensity scoring skipped (non-fatal): %s", call_id, exc)

    def trigger_analytics(self, call_id):
        """Run quality analytics for one transcribed call and fan out the result.

        Returns True immediately when raw_calls already has status 3 or a
        ``<bid>_callanalytics`` row exists. A failing idempotency check is only logged.
        Otherwise it:

        * loads the transcript via ``db_handler.get_call_transcript``;
        * runs ``analyzer.analyze_call``;
        * sets raw_calls status 3 and, when a duration is known, ``duration_seconds``
          and ``call_endtime``;
        * runs the optional propensity, LeadSquared (ORCHESTRATOR_PUSH_LEADSQUARED)
          and API (ORCHESTRATOR_PUSH_API) follow-ups, whose failures are only logged.

        Errors classified by ``_is_transient_analytics_error`` are retried up to
        ANALYTICS_MAX_RETRIES attempts (default 3), sleeping 1s, 2s, 4s... (max 30s)
        between them.

        Returns:
            True on success or when already analyzed, False otherwise.
        """
        logger.info(f"[{call_id}] Triggering analytics...")
        try:
            conn = self.get_db_connection()
            try:
                with conn.cursor() as cursor:
                    cursor.execute(
                        f"SELECT status FROM `{self.bid}_raw_calls` WHERE callid = %s LIMIT 1",
                        (call_id,),
                    )
                    row = cursor.fetchone()
                    if row and int(row.get("status") or 0) == 3:
                        logger.info(f"[{call_id}] Already analyzed (status=3), skipping")
                        return True
                    cursor.execute(
                        f"SELECT id FROM `{self.bid}_callanalytics` WHERE callid = %s LIMIT 1",
                        (call_id,),
                    )
                    if cursor.fetchone():
                        logger.info(f"[{call_id}] Analytics row already exists, skipping")
                        return True
            finally:
                conn.close()
        except Exception as exc:
            logger.warning(f"[{call_id}] Could not check analytics idempotency: {exc}")

        max_attempts = max(1, int(os.getenv("ANALYTICS_MAX_RETRIES", "3")))
        for attempt in range(max_attempts):
            # NOTE(review): a transient failure AFTER analyze_call (e.g. in the status
            # UPDATE) re-runs the whole analysis on retry. Confirm the analyzer's save
            # is idempotent.
            try:
                transcript_data = self.db_handler.get_call_transcript(self.bid, call_id)
                if not transcript_data or not transcript_data.get("transcript"):
                    logger.error(f"[{call_id}] No transcript found in response table.")
                    return False

                transcript = transcript_data["transcript"]
                speaker_segments = transcript_data.get("speaker_segments")
                if speaker_segments and isinstance(speaker_segments, str):
                    speaker_segments = json.loads(speaker_segments)
                duration = transcript_data.get("duration")

                result = self.analyzer.analyze_call(
                    bid=self.bid,
                    callid=call_id,
                    transcript=transcript,
                    speaker_segments=speaker_segments or [],
                    actual_duration=float(duration) if duration else None,
                )

                logger.info(f"[{call_id}] Analytics complete. Quality Score: {result.get('quality_score')}%")

                conn = self.get_db_connection()
                try:
                    with conn.cursor() as cursor:
                        duration_seconds = None
                        if duration:
                            try:
                                duration_seconds = int(round(float(duration)))
                            except (TypeError, ValueError):
                                duration_seconds = None
                        if duration_seconds is not None:
                            cursor.execute(
                                f"""
                                UPDATE `{self.bid}_raw_calls`
                                SET status = 3,
                                    duration_seconds = %s,
                                    call_endtime = DATE_ADD(call_starttime, INTERVAL %s SECOND)
                                WHERE callid = %s
                                """,
                                (duration_seconds, duration_seconds, call_id),
                            )
                        else:
                            cursor.execute(
                                f"UPDATE `{self.bid}_raw_calls` SET status = 3 WHERE callid = %s",
                                (call_id,),
                            )
                    # Commit explicitly: other writers in this module commit, so
                    # autocommit cannot be assumed and closing would roll status=3 back.
                    conn.commit()

                    analytics_logger.info(
                        f"SUCCESS: Analytics created/updated in {self.bid}_callanalytics for callid={call_id} "
                        f"with Quality Score={result.get('quality_score')}"
                    )
                    logger.info(f"Analytics is completed for record {call_id}")
                finally:
                    conn.close()

                self._run_propensity_if_enabled(call_id, transcript)

                if os.getenv("ORCHESTRATOR_PUSH_LEADSQUARED", "1").lower() not in ("0", "false", "no"):
                    try:
                        from leadsquared_activity_push import call_dict_for_lsq_push, push_leadsquared_activities

                        raw_call = self.db_handler.get_raw_call_details(self.bid, call_id)
                        if raw_call:
                            push_leadsquared_activities(
                                self.bid,
                                call_dict_for_lsq_push(raw_call),
                                result,
                                self.db_handler,
                            )
                    except Exception as exc:
                        logger.exception(f"[{call_id}] LeadSquared push failed: {exc}")

                if os.getenv("ORCHESTRATOR_PUSH_API", "1").lower() not in ("0", "false", "no"):
                    try:
                        from api_push_service import ApiPushService

                        ApiPushService(self.db_handler).push_call_update(
                            self.bid,
                            call_id,
                            analytics_data=result,
                            trigger_event="analytics_saved",
                        )
                    except Exception as exc:
                        logger.exception(f"[{call_id}] API push failed: {exc}")

                return True
            except Exception as e:
                transient = self._is_transient_analytics_error(e)
                if transient and attempt < max_attempts - 1:
                    wait_s = min(30, 2 ** attempt)
                    logger.warning(
                        "[%s] Analytics transient error (attempt %s/%s): %s — retry in %ss",
                        call_id,
                        attempt + 1,
                        max_attempts,
                        e,
                        wait_s,
                    )
                    time.sleep(wait_s)
                    continue
                if transient:
                    logger.error(f"[{call_id}] Analytics failed after {max_attempts} attempts: {e}")
                else:
                    logger.error(f"[{call_id}] Analytics failed: {e}")
                return False
        return False

    def _enqueue_analytics_backlog(self, cursor, limit: int) -> int:
        """Backup: re-queue status=2 calls missed by STT analytics publish."""
        if not self.analytics_enabled:
            return 0
        from analytics_queue import publish_analytics_job

        raw_table = f"`{self.bid}_raw_calls`"
        analytics_table = f"`{self.bid}_callanalytics`"
        cursor.execute(
            f"""
            SELECT r.callid
            FROM {raw_table} r
            LEFT JOIN {analytics_table} a ON r.callid = a.callid
            WHERE r.status = 2 AND a.id IS NULL
            ORDER BY r.id DESC
            LIMIT %s
            """,
            (limit,),
        )
        rows = cursor.fetchall() or []
        queued = 0
        for row in rows:
            call_id = str(row.get("callid") or "").strip()
            if call_id and publish_analytics_job(self.bid, call_id):
                queued += 1
        if queued:
            logger.info(
                "BID %s: enqueued %s status=2 call(s) to analytics_jobs backlog",
                self.bid,
                queued,
            )
        return queued

    def run(self, limit=10):
        """Run one orchestration pass for this BID.

        1. Repair inconsistent statuses, heal phantom queued calls and log the backlog.
        2. Ingest new calls by polling, unless webhook ingest is enabled.
        3. Select up to ``limit`` status-0 calls that have a recording URL, are not
           ``backlog_cleared``, and started within ``lookback_days``. Repair and
           validate their URLs, apply the minimum-duration rule, commit those
           changes, then queue the survivors for transcription.
        4. If analytics is enabled, either top up the event-driven analytics queue
           (event-driven or webhook mode) or run batch analytics on up to ``limit``
           status-2 calls.

        Args:
            limit: per-step row cap for repair, ingest, STT queueing and analytics.
        """
        logger.info(f"Starting orchestration for BID {self.bid}, limit {limit}")

        self.repair_pipeline_state(limit=limit)
        self._heal_phantom_queued_calls()
        self._log_transcription_backlog_hint()
        if self.webhook_ingest_enabled:
            logger.info(
                "Skipping polling ingest for BID %s (webhook ingest enabled)",
                self.bid,
            )
        else:
            self.ingest_calls(limit=limit)

        conn = self.get_db_connection()
        try:
            with conn.cursor() as cursor:
                raw_table = f"`{self.bid}_raw_calls`"
                where_parts = [
                    "status = 0",
                    "fileurl IS NOT NULL",
                    "fileurl != ''",
                    "COALESCE(transcription_status, '') != 'backlog_cleared'",
                    "call_starttime >= DATE_SUB(NOW(), INTERVAL %s DAY)",
                ]
                cursor.execute(
                    f"SELECT * FROM {raw_table} WHERE {' AND '.join(where_parts)} "
                    f"ORDER BY id DESC LIMIT %s",
                    (self.lookback_days, limit),
                )
                candidates = cursor.fetchall() or []

                valid_calls = []
                for call in candidates:
                    if self._prepare_transcription_candidate(cursor, call):
                        valid_calls.append(call)

                # Persist URL repairs, status -1 markings and min-duration purges now.
                # trigger_transcription writes the same rows over its own connection
                # and would otherwise wait on these uncommitted row locks. Without the
                # commit, the changes would also be rolled back when this connection closes.
                conn.commit()

                logger.info(f"Found {len(valid_calls)} valid calls to queue for transcription.")
                for call in valid_calls:
                    self.trigger_transcription(call)

                if not self.analytics_enabled:
                    logger.info(
                        "Analytics is disabled for BID %s (analysis_mode=%s). "
                        "Leaving transcribed calls at status=2.",
                        self.bid,
                        self.analysis_mode,
                    )
                    return

                from analytics_queue import event_driven_analytics_enabled

                if event_driven_analytics_enabled() or self.webhook_ingest_enabled:
                    logger.info(
                        "Skipping orchestrator batch analytics for BID %s "
                        "(event-driven analytics_jobs queue)",
                        self.bid,
                    )
                    self._enqueue_analytics_backlog(cursor, limit)
                    return

                cursor.execute(
                    f"SELECT callid FROM {raw_table} WHERE status = 2 ORDER BY id DESC LIMIT %s",
                    (limit,),
                )
                analyzable_calls = cursor.fetchall()

                logger.info(f"Found {len(analyzable_calls)} calls ready for analytics.")
                for call in analyzable_calls:
                    call_id = str(call["callid"])
                    logger.info(f"Transcription is completed for record {call_id}")
                    self.trigger_analytics(call_id)

        finally:
            conn.close()

        logger.info("This instance of the orchestrator job is completed.")

    def _prepare_transcription_candidate(self, cursor, call) -> bool:
        """Repair/validate one status-0 call's recording URL and apply the min-duration rule.

        Side effects through ``cursor`` (uncommitted; the caller commits):
          * a working alternate URL is written to ``fileurl`` (``call["fileurl"]`` too);
          * a URL still invalid once the call is older than ORCHESTRATOR_URL_RETRY_MINUTES
            (default 45, minimum 5) sets status -1;
          * calls below the minimum duration are purged via ``purge_unprocessed_if_below_min``.

        Returns:
            True if the call should be queued for transcription.
        """
        if self._stt_retry_cooldown_active(call):
            return False

        current_url = call["fileurl"]
        working_url = None
        for candidate_url in self._recording_url_candidates(current_url, call["call_starttime"]):
            if candidate_url != current_url:
                logger.info(f"[{call['callid']}] Trying alternate recording URL: {candidate_url}")
            if self.validate_url(candidate_url, call):
                working_url = candidate_url
                break

        if working_url is not None and working_url != current_url:
            logger.info(f"[{call['callid']}] Repaired truncated URL: {working_url}")
            cursor.execute(
                f"UPDATE `{self.bid}_raw_calls` SET fileurl = %s WHERE callid = %s",
                (working_url, call["callid"]),
            )
            call["fileurl"] = working_url

        # A candidate that just passed validate_url is not probed again. Each probe can
        # cost up to ~40s of curl time, and a flaky second probe could wrongly mark
        # the call invalid.
        if working_url is None and not self.validate_url(call["fileurl"], call):
            retry_minutes = max(5, int(os.getenv("ORCHESTRATOR_URL_RETRY_MINUTES", "45")))
            call_start = self._parse_datetime(call.get("call_starttime"))
            age_minutes = None
            if call_start:
                age_minutes = (datetime.now() - call_start).total_seconds() / 60.0
            if age_minutes is not None and age_minutes < retry_minutes:
                logger.warning(
                    "[%s] Recording URL not ready yet (age=%.0fm < %sm); "
                    "will retry instead of marking invalid: %s",
                    call["callid"],
                    age_minutes,
                    retry_minutes,
                    call["fileurl"],
                )
            else:
                logger.warning(f"[{call['callid']}] Invalid file URL: {call['fileurl']}")
                cursor.execute(
                    f"UPDATE `{self.bid}_raw_calls` SET status = -1 WHERE callid = %s",
                    (call["callid"],),
                )
            return False

        skip_min, probed_audio = self._min_duration_blocks_action(
            call,
            call["fileurl"],
            log_prefix="Skipping STT queue",
        )
        if skip_min:
            purge_unprocessed_if_below_min(
                cursor,
                self.bid,
                call["callid"],
                call,
                self.min_call_duration_s,
                self.min_call_duration_effective_at,
                audio_duration_s=probed_audio,
            )
            return False
        return True


if __name__ == "__main__":
    # CLI entry point: run a single orchestration pass for one business id.
    import argparse

    cli = argparse.ArgumentParser()
    cli.add_argument("--bid", default="1713")
    cli.add_argument("--limit", type=int, default=10)
    options = cli.parse_args()

    Orchestrator(options.bid).run(limit=options.limit)
