"""
Campaign endpoints expected by the React app (/api/campaigns/...).

These must run on the existing Django API host (localhost:8000) so the frontend
doesn't need a base URL change.

Data sources (matches backend/apps/cluster/dynamic_tables.py):
- Cluster DB: `{business_id}_campaigns`, `{business_id}_campaign_contacts`, `{business_id}_call_history`
- Master DB: `did_numbers`
"""

from __future__ import annotations

import json
import os
import threading
import time
import urllib.request
import urllib.error
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from typing import Any

from django.db import close_old_connections, connections
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response

from apps.cluster.dynamic_tables import _q_ident, _table_names
from config.reporting_utils import _legacy_profile_for_user, cluster_table_exists


def _resolve_business_id(request) -> int | None:
    legacy = _legacy_profile_for_user(request.user)
    bid = legacy.get("business_id") if legacy else None
    try:
        return int(bid) if bid is not None else None
    except Exception:
        return None


def _parse_int(q: Any, key: str, default: int, *, min_v: int = 1, max_v: int = 10_000) -> int:
    try:
        v = int(q.get(key, default))
    except Exception:
        v = default
    return max(min_v, min(v, max_v))


def _php_truthy(v: Any) -> bool:
    if isinstance(v, bool):
        return v
    s = str(v).strip().lower()
    return s in ("1", "true", "on", "yes")


def _now_ist() -> datetime:
    return datetime.now(ZoneInfo("Asia/Kolkata"))


def _fmt_dt(dt: datetime) -> str:
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=ZoneInfo("Asia/Kolkata"))
    else:
        dt = dt.astimezone(ZoneInfo("Asia/Kolkata"))
    return dt.strftime("%Y-%m-%d %H:%M:%S")


def _parse_dt_any(raw: Any) -> datetime:
    """
    Accepts ISO strings or 'Y-m-d H:i:s'. Best-effort; falls back to now (IST).
    """
    if raw is None or str(raw).strip() == "":
        return _now_ist()
    s = str(raw).strip()
    # Fast path: "YYYY-mm-dd HH:MM:SS"
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S.%f"):
        try:
            dt = datetime.strptime(s.replace("Z", ""), fmt)
            return dt.replace(tzinfo=ZoneInfo("Asia/Kolkata"))
        except Exception:
            pass
    try:
        # Python 3.11 ISO parser
        dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=ZoneInfo("Asia/Kolkata"))
        return dt.astimezone(ZoneInfo("Asia/Kolkata"))
    except Exception:
        return _now_ist()


def _fetch_retry_attempt_values(business_id: int) -> list[int]:
    """
    businesses.retry_attempt is JSON (object). Extract numeric values preserving order.
    """
    with connections["default"].cursor() as cur:
        cur.execute("SELECT retry_attempt FROM businesses WHERE business_id = %s LIMIT 1", [business_id])
        row = cur.fetchone()
        raw = row[0] if row else None
    if raw in (None, ""):
        return []
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except Exception:
            return []
    if not isinstance(raw, dict):
        return []
    out: list[int] = []
    for _k, v in raw.items():
        try:
            if isinstance(v, (int, float)) or str(v).replace(".", "", 1).isdigit():
                out.append(int(float(v)))
        except Exception:
            continue
    return out


def _compute_waiting_time_values(business_id: int, retry_logic: str, custom_attempt_values: Any) -> str:
    if retry_logic == "business_logic":
        vals = _fetch_retry_attempt_values(business_id)
        return ",".join(str(v) for v in vals) if vals else ""
    if retry_logic == "custom_logic":
        if not isinstance(custom_attempt_values, list) or not custom_attempt_values:
            return ""
        minutes: list[int] = []
        for v in custom_attempt_values:
            try:
                n = float(v) if v is not None else 0.0
            except Exception:
                n = 0.0
            minutes.append(int(round(n * 60)))
        return ",".join(str(m) for m in minutes if m >= 0)
    if retry_logic == "not_retry":
        return ""
    return ""


def _fetch_executive_number(business_id: int, did_no: str, bot_id: Any) -> str:
    """
    Mirrors Laravel fetch of exe_number/did_no from did_numbers (direction='outbound' when exists).
    Falls back to provided did_no.
    """
    if not did_no:
        return ""
    with connections["default"].cursor() as cur:
        cur.execute("SHOW TABLES LIKE 'did_numbers'")
        if cur.fetchone() is None:
            return did_no

        # Optional columns
        cur.execute("SHOW COLUMNS FROM did_numbers LIKE 'direction'")
        has_direction = cur.fetchone() is not None
        cur.execute("SHOW COLUMNS FROM did_numbers LIKE 'exe_number'")
        has_exe_number = cur.fetchone() is not None

        sql = "SELECT did_no"
        if has_exe_number:
            sql += ", exe_number"
        sql += " FROM did_numbers WHERE business_id = %s AND did_no = %s"
        params: list[Any] = [business_id, did_no]
        if bot_id not in (None, "", 0):
            sql += " AND bot_id = %s"
            params.append(int(bot_id))
        if has_direction:
            sql += " AND direction = 'outbound'"
        sql += " LIMIT 1"

        cur.execute(sql, params)
        row = cur.fetchone()
        if not row:
            return did_no
        if has_exe_number and len(row) > 1 and row[1] not in (None, ""):
            return str(row[1])
        return str(row[0])


def _master_table_exists(table: str) -> bool:
    try:
        with connections["default"].cursor() as cur:
            cur.execute("SHOW TABLES LIKE %s", [table])
            return cur.fetchone() is not None
    except Exception:
        return False


def _parse_contacts_from_csv_bytes(content: bytes) -> tuple[list[dict[str, str]], list[str]] | None:
    """
    Laravel-like ultra-robust parsing:
    - tries delimiters: ',', '\\t', ';', '|'
    - detects name + contact columns by header variations
    """
    try:
        text = content.decode("utf-8", errors="replace")
        lines = [ln for ln in text.split("\n") if ln.strip() != ""]
        if len(lines) < 2:
            return None

        headers: list[str] = []
        name_idx = -1
        contact_idx = -1
        contacts: list[dict[str, str]] = []

        delimiters = [",", "\t", ";", "|"]

        for i, line in enumerate(lines):
            line = line.strip()
            if not line:
                continue

            row: list[str] = []
            for d in delimiters:
                if d == ",":
                    # minimal CSV parsing (quotes)
                    import csv
                    import io

                    row = next(csv.reader(io.StringIO(line), delimiter=d))
                else:
                    row = line.split(d)
                if len(row) >= 2:
                    break

            row = [str(x).strip(" \t\n\r\0\x0B\"'") for x in row]

            if i == 0:
                headers = row
                for j, h in enumerate(headers):
                    n = h.lower().strip()
                    if any(k in n for k in ("name", "fullname", "full_name", "customer_name")):
                        name_idx = j
                    if any(
                        k in n
                        for k in (
                            "contact",
                            "phone",
                            "mobile",
                            "number",
                            "phone_number",
                            "mobile_number",
                        )
                    ):
                        contact_idx = j
                continue

            if name_idx >= 0 and contact_idx >= 0:
                name = row[name_idx] if name_idx < len(row) else ""
                contact = row[contact_idx] if contact_idx < len(row) else ""
                if name or contact:
                    contacts.append({"name": name, "contact": contact})

        if name_idx == -1 or contact_idx == -1 or not contacts:
            return None
        return contacts, headers
    except Exception:
        return None


def _parse_contacts_from_xlsx_bytes(content: bytes) -> tuple[list[dict[str, str]], list[str]] | None:
    try:
        import io

        import openpyxl

        wb = openpyxl.load_workbook(io.BytesIO(content), read_only=True, data_only=True)
        ws = wb[wb.sheetnames[0]]
        rows_iter = ws.iter_rows(values_only=True)
        header_row = next(rows_iter, None)
        if not header_row:
            return None
        headers = [str(c).strip() if c is not None else "" for c in header_row]

        name_idx = -1
        contact_idx = -1
        for j, h in enumerate(headers):
            n = h.lower().strip()
            if any(k in n for k in ("name", "fullname", "full_name", "customer_name")):
                name_idx = j
            if any(
                k in n
                for k in (
                    "contact",
                    "phone",
                    "mobile",
                    "number",
                    "phone_number",
                    "mobile_number",
                )
            ):
                contact_idx = j

        if name_idx == -1 or contact_idx == -1:
            return None

        contacts: list[dict[str, str]] = []
        for r in rows_iter:
            row = list(r or [])
            name = str(row[name_idx]).strip() if name_idx < len(row) and row[name_idx] is not None else ""
            contact = (
                str(row[contact_idx]).strip()
                if contact_idx < len(row) and row[contact_idx] is not None
                else ""
            )
            if name or contact:
                contacts.append({"name": name, "contact": contact})

        wb.close()
        if not contacts:
            return None
        return contacts, headers
    except Exception:
        return None


def _retry_value_for_contact_insert(bid: int, campaign_row: dict[str, Any]) -> tuple[int, bool]:
    """
    Port of Laravel storeCampaignContacts logic (simplified to match our campaign creation):
    - run_now=0 => set retry now based on waiting_time, unless retry_logic=not_retry
    - run_now=1 => retry=0 (scheduled will be set at start)
    Returns (retry_value, is_run_now).
    """
    is_run_now = int(campaign_row.get("run_now") or 0) == 0
    retry_logic = str(campaign_row.get("retry_logic") or "")

    if not is_run_now:
        return 0, False

    if retry_logic == "not_retry":
        return 1, True

    waiting = str(campaign_row.get("waiting_time") or "").strip()
    if waiting:
        parts = [p.strip() for p in waiting.split(",") if p.strip() and p.strip().replace(".", "", 1).isdigit()]
        return (len(parts) + 1) if parts else 1, True

    # waiting_time empty: for business_logic fetch from businesses.retry_attempt
    if retry_logic == "business_logic":
        vals = _fetch_retry_attempt_values(bid)
        return (len(vals) + 1) if vals else 1, True

    return 1, True


@api_view(["POST"])
@permission_classes([IsAuthenticated])
def campaign_contacts_upload(request, campaign_id: int):
    """
    Endpoint expected by the React app:
    POST /api/campaigns/{id}/contacts
    multipart/form-data with field `file`.

    Stores contacts into cluster table `{business_id}_campaign_contacts` and increments
    `{business_id}_campaigns.total_count`.
    """
    bid = _resolve_business_id(request)
    if not bid:
        return Response({"detail": "No business profile found for this user."}, status=400)

    campaigns_table = f"{bid}_campaigns"
    contacts_table = f"{bid}_campaign_contacts"

    if not cluster_table_exists(campaigns_table):
        return Response({"message": "Campaign not found"}, status=404)

    if not cluster_table_exists(contacts_table):
        return Response({"message": "Campaign contacts table does not exist"}, status=404)

    up = request.FILES.get("file")
    if not up:
        return Response({"message": "Validation failed", "errors": {"file": ["required"]}}, status=422)

    if up.size and int(up.size) > 10 * 1024 * 1024:
        return Response({"message": "Validation failed", "errors": {"file": ["max 10MB"]}}, status=422)

    filename = (getattr(up, "name", "") or "").lower()
    content = up.read()

    parsed = None
    if filename.endswith(".xlsx"):
        parsed = _parse_contacts_from_xlsx_bytes(content)
    else:
        parsed = _parse_contacts_from_csv_bytes(content)

    if not parsed:
        return Response({"message": "No valid contact data found", "success": False}, status=400)

    contacts, _headers = parsed

    with connections["cluster"].cursor() as cur:
        cur.execute(
            f"SELECT * FROM {_q_ident(campaigns_table)} WHERE campaign_id = %s LIMIT 1",
            [int(campaign_id)],
        )
        camp_row = cur.fetchone()
        if not camp_row:
            return Response({"message": "Campaign not found"}, status=404)
        camp_cols = [c[0] for c in cur.description]
        campaign = dict(zip(camp_cols, camp_row))

        current_total = int(campaign.get("total_count") or 0)

        retry_value, _is_run_now = _retry_value_for_contact_insert(bid, campaign)

        now_s = _fmt_dt(_now_ist())
        inserted_contacts: list[dict[str, Any]] = []

        for c in contacts:
            name = (c.get("name") or "").strip()
            number = (c.get("contact") or "").strip()
            cur.execute(
                f"""
                INSERT INTO {_q_ident(contacts_table)}
                (business_id, campaign_id, name, number, source, status, `retry`, created_at, updated_at)
                VALUES (%s, %s, %s, %s, %s, 'pending', %s, %s, %s)
                """,
                [bid, int(campaign_id), name, number, filename, retry_value, now_s, now_s],
            )
            inserted_contacts.append(
                {
                    "id": cur.lastrowid,
                    "number": number,
                    "status": "pending",
                    "name": name,
                    "campaign_id": int(campaign_id),
                    "business_id": bid,
                }
            )

        new_count = len(contacts)
        updated_total = current_total + new_count
        cur.execute(
            f"UPDATE {_q_ident(campaigns_table)} SET total_count = %s WHERE campaign_id = %s",
            [updated_total, int(campaign_id)],
        )

    # Auto-trigger outbound calls sequentially (2s delay) after upload.
    # This runs in a background thread so the upload response returns immediately.
    def _autocall_worker(
        *,
        business_id: int,
        campaign_id: int,
        contact_ids: list[int],
        agent_name: str,
    ) -> None:
        close_old_connections()
        outbound_url = (
            os.getenv("MCUBE_LOCAL_OUTBOUND_URL")
            or os.getenv("MCUBE_OUTBOUND_CALL_URL")
            or "https://app3.syntheon.in/api/mcube/outbound-call"
        ).strip()

        public_base = (os.getenv("MCUBE_PUBLIC_BASE_URL") or "").strip().rstrip("/")
        public_ws = (os.getenv("MCUBE_PUBLIC_WS_URL_BASE") or "").strip().rstrip("/")
        ws_prefix = (os.getenv("MCUBE_WS_PATH_PREFIX") or "/ws").strip()
        if ws_prefix and not ws_prefix.startswith("/"):
            ws_prefix = "/" + ws_prefix
        webhook_path = (os.getenv("MCUBE_WEBHOOK_PATH") or "/webhooks/mcube").strip()
        if webhook_path and not webhook_path.startswith("/"):
            webhook_path = "/" + webhook_path

        callback_url = f"{public_base}{webhook_path}" if public_base else ""

        # Best-effort: only update optional columns if they exist.
        has_call_id = False
        has_dialstatus = False
        with connections["cluster"].cursor() as cur2:
            cur2.execute(f"SHOW COLUMNS FROM {_q_ident(contacts_table)} LIKE 'call_id'")
            has_call_id = cur2.fetchone() is not None
            cur2.execute(f"SHOW COLUMNS FROM {_q_ident(contacts_table)} LIKE 'dialstatus'")
            has_dialstatus = cur2.fetchone() is not None

        for idx, cid in enumerate(contact_ids):
            try:
                with connections["cluster"].cursor() as cur3:
                    cur3.execute(
                        f"SELECT id, number FROM {_q_ident(contacts_table)} "
                        "WHERE id = %s AND campaign_id = %s AND business_id = %s LIMIT 1",
                        [int(cid), int(campaign_id), int(business_id)],
                    )
                    row = cur3.fetchone()
                    if not row:
                        continue
                    number = str(row[1] or "").strip()
                    if not number:
                        # mark failed
                        if has_dialstatus:
                            cur3.execute(
                                f"UPDATE {_q_ident(contacts_table)} SET status='failed', dialstatus=%s, updated_at=%s "
                                "WHERE id=%s",
                                ["missing_number", _fmt_dt(_now_ist()), int(cid)],
                            )
                        else:
                            cur3.execute(
                                f"UPDATE {_q_ident(contacts_table)} SET status='failed', updated_at=%s WHERE id=%s",
                                [_fmt_dt(_now_ist()), int(cid)],
                            )
                        continue

                    refid = f"cmp_{campaign_id}_contact_{cid}"
                    if has_call_id:
                        if has_dialstatus:
                            cur3.execute(
                                f"UPDATE {_q_ident(contacts_table)} SET status='calling', call_id=%s, dialstatus=%s, updated_at=%s "
                                "WHERE id=%s",
                                [refid, "calling", _fmt_dt(_now_ist()), int(cid)],
                            )
                        else:
                            cur3.execute(
                                f"UPDATE {_q_ident(contacts_table)} SET status='calling', call_id=%s, updated_at=%s WHERE id=%s",
                                [refid, _fmt_dt(_now_ist()), int(cid)],
                            )
                    else:
                        if has_dialstatus:
                            cur3.execute(
                                f"UPDATE {_q_ident(contacts_table)} SET status='calling', dialstatus=%s, updated_at=%s WHERE id=%s",
                                ["calling", _fmt_dt(_now_ist()), int(cid)],
                            )
                        else:
                            cur3.execute(
                                f"UPDATE {_q_ident(contacts_table)} SET status='calling', updated_at=%s WHERE id=%s",
                                [_fmt_dt(_now_ist()), int(cid)],
                            )

                # Build refurl pointing to WS bridge for this call id.
                refurl = f"{public_ws}{ws_prefix}/{refid}" if public_ws else ""

                payload = {
                    "to": number,
                    "agent_name": agent_name,
                    "call_id": refid,
                }
                if callback_url:
                    payload["callback_url"] = callback_url
                if refurl:
                    payload["refurl"] = refurl

                data = json.dumps(payload).encode("utf-8")
                req = urllib.request.Request(
                    outbound_url,
                    data=data,
                    headers={"content-type": "application/json"},
                    method="POST",
                )
                try:
                    with urllib.request.urlopen(req, timeout=20.0) as resp:
                        raw = resp.read()
                        try:
                            resp_json = json.loads(raw.decode("utf-8"))
                        except Exception:
                            resp_json = {}
                        mcube_sid = str(resp_json.get("mcube_call_sid") or "").strip()
                        status_txt = str(resp_json.get("status") or "initiated").strip()
                        ok = bool(resp_json.get("ok")) if isinstance(resp_json, dict) else False
                except urllib.error.HTTPError as e:
                    ok = False
                    status_txt = f"http_{getattr(e, 'code', 500)}"
                    mcube_sid = ""
                except Exception:
                    ok = False
                    status_txt = "request_error"
                    mcube_sid = ""

                # NOTE: Database updates removed - Laravel will handle all database updates via webhook
                # Previously: Python backend updated campaign_contacts table with status, call_id, dialstatus
                # Now: Laravel webhook handler receives MCube callbacks and updates database
                # This prevents conflicts and ensures single source of truth for database updates

            except Exception:
                # Never crash the worker; best-effort only.
                pass

            if idx < len(contact_ids) - 1:
                time.sleep(2.0)

        close_old_connections()

    try:
        contact_ids = [int(c.get("id")) for c in inserted_contacts if c.get("id") is not None]
        agent_name = str(campaign.get("bot_id") or "default").strip() or "default"
        if contact_ids:
            threading.Thread(
                target=_autocall_worker,
                kwargs={
                    "business_id": int(bid),
                    "campaign_id": int(campaign_id),
                    "contact_ids": contact_ids,
                    "agent_name": agent_name,
                },
                daemon=True,
                name=f"campaign_autocall_{bid}_{campaign_id}",
            ).start()
    except Exception:
        # Best-effort only; upload should still succeed.
        pass

    return Response(
        {
            "message": f"Successfully parsed {len(contacts)} contacts",
            "contacts_imported": new_count,
            "total_count": updated_total,
            "previous_count": current_total,
            "contacts_data": contacts,
            "inserted_contacts": inserted_contacts,
            "campaign_id": int(campaign_id),
            "success": True,
        }
    )


@api_view(["GET", "POST"])
@permission_classes([IsAuthenticated])
def campaigns_index(request):
    bid = _resolve_business_id(request)
    if not bid:
        return Response({"detail": "No business profile found for this user."}, status=400)

    # POST /api/campaigns  (create)
    if request.method == "POST":
        data = request.data if isinstance(request.data, dict) else {}

        name = (data.get("name") or "").strip()
        if not name:
            return Response(
                {"message": "Validation failed", "errors": {"name": ["required"]}},
                status=422,
            )

        description = data.get("description")
        run_now_flag = _php_truthy(data.get("run_now", False))

        # IMPORTANT: your UI sends run_now=false for run-now campaigns.
        # Behavior (Laravel parity):
        # - run_now=false => start_time=now, end_time=now+2y, status='active'
        # - run_now=true  => start_time=user-selected (or now if missing), end=start+2y, status='scheduled'
        if run_now_flag:
            start = _parse_dt_any(data.get("start_time"))
        else:
            start = _now_ist()
        end = start + timedelta(days=365 * 2)

        callback_enabled = _php_truthy(data.get("callback_enabled", True))
        retry_time = 1 if callback_enabled else 0

        retry_logic = str(data.get("retry_logic") or "")
        custom_attempt_values = data.get("custom_attempt_values", [])
        waiting_time_values = _compute_waiting_time_values(bid, retry_logic, custom_attempt_values)

        exeuctive_number = str(data.get("exeuctive_number") or "").strip()
        bot_name = data.get("bot_name")
        executive_number = _fetch_executive_number(bid, exeuctive_number, bot_name)

        campaigns_table = f"{bid}_campaigns"
        if not cluster_table_exists(campaigns_table):
            return Response(
                {"message": "Campaign table not found. Please contact administrator.", "error": "Table does not exist"},
                status=500,
            )

        with connections["cluster"].cursor() as cur:
            # Optional columns (backward compatible)
            cur.execute(f"SHOW COLUMNS FROM {_q_ident(campaigns_table)} LIKE 'bot_id'")
            has_bot_id = cur.fetchone() is not None
            cur.execute(f"SHOW COLUMNS FROM {_q_ident(campaigns_table)} LIKE 'exeuctive_number'")
            has_exeuctive_number_col = cur.fetchone() is not None
            cur.execute(f"SHOW COLUMNS FROM {_q_ident(campaigns_table)} LIKE 'retry_logic'")
            has_retry_logic_col = cur.fetchone() is not None

            insert_cols = [
                "business_id",
                "name",
                "description",
                "status",
                "total_count",
                "waiting_time",
                "did",
                "start_time",
                "end_time",
                "start_date",
                "end_date",
                "retry_time",
                "run_now",
                "batch_size",
                "batch_interval",
                "created_at",
                "updated_at",
            ]
            insert_vals: list[Any] = [
                bid,
                name,
                description,
                "scheduled" if run_now_flag else "active",
                0,
                waiting_time_values or "",
                executive_number,
                _fmt_dt(start),
                _fmt_dt(end),
                start.strftime("%Y-%m-%d"),
                end.strftime("%Y-%m-%d"),
                retry_time,
                1 if run_now_flag else 0,
                10,
                5,
                _fmt_dt(_now_ist()),
                _fmt_dt(_now_ist()),
            ]

            if has_bot_id:
                insert_cols.append("bot_id")
                insert_vals.append(bot_name)
            if has_exeuctive_number_col:
                insert_cols.append("exeuctive_number")
                insert_vals.append(exeuctive_number)
            if has_retry_logic_col:
                insert_cols.append("retry_logic")
                insert_vals.append(retry_logic)

            cols_sql = ", ".join(_q_ident(c) for c in insert_cols)
            ph_sql = ", ".join(["%s"] * len(insert_cols))

            cur.execute(
                f"INSERT INTO {_q_ident(campaigns_table)} ({cols_sql}) VALUES ({ph_sql})",
                insert_vals,
            )
            campaign_id = cur.lastrowid

            # Fetch inserted campaign row
            cur.execute(
                f"SELECT * FROM {_q_ident(campaigns_table)} WHERE campaign_id = %s LIMIT 1",
                [campaign_id],
            )
            camp_cols = [c[0] for c in cur.description]
            campaign = dict(zip(camp_cols, cur.fetchone()))

        # Insert scheduled_campaigns row when scheduled
        if run_now_flag:
            # Laravel parity: insertion failure must NOT fail campaign creation.
            # In your environment this table may not exist.
            try:
                if _master_table_exists("scheduled_campaigns"):
                    with connections["default"].cursor() as cur:
                        cur.execute(
                            """
                            INSERT INTO scheduled_campaigns
                            (campaign_id, business_id, start_time, end_time, execute_at, delay_seconds, status, error_message, created_at, updated_at)
                            VALUES (%s, %s, %s, %s, %s, 0, 'pending', NULL, %s, %s)
                            """,
                            [
                                campaign_id,
                                bid,
                                _fmt_dt(start),
                                _fmt_dt(end),
                                _fmt_dt(start),
                                _fmt_dt(_now_ist()),
                                _fmt_dt(_now_ist()),
                            ],
                        )
                else:
                    # Best-effort only; campaign remains scheduled in cluster table.
                    pass
            except Exception:
                # Best-effort only
                pass

        campaign["id"] = campaign.get("campaign_id")
        return Response(
            {
                "message": "Campaign created successfully",
                "campaign": campaign,
                "campaign_id": campaign_id,
                "id": campaign_id,
            },
            status=201,
        )

    campaigns_table = f"{bid}_campaigns"
    if not cluster_table_exists(campaigns_table):
        return Response(
            {
                "data": [],
                "pagination": {"current_page": 1, "last_page": 1, "per_page": 15, "total": 0},
            }
        )

    search = (request.query_params.get("search") or "").strip()
    status = (request.query_params.get("status") or "").strip()
    page = _parse_int(request.query_params, "page", 1)
    per_page = _parse_int(request.query_params, "per_page", 15, min_v=1, max_v=200)
    offset = (page - 1) * per_page

    where_parts: list[str] = []
    params: list[Any] = []

    if search:
        where_parts.append("(name LIKE %s OR description LIKE %s OR did LIKE %s OR status LIKE %s)")
        like = f"%{search}%"
        params.extend([like, like, like, like])

    if status and status != "all":
        where_parts.append("status = %s")
        params.append(status)

    where_sql = (" WHERE " + " AND ".join(where_parts)) if where_parts else ""

    # Contacts + call history tables
    contacts_table = f"{bid}_campaign_contacts"
    has_contacts = cluster_table_exists(contacts_table)
    call_history_table = f"{bid}_call_history"
    has_call_history = cluster_table_exists(call_history_table)

    # Check refid column presence (best-effort)
    has_refid = False
    if has_call_history:
        with connections["cluster"].cursor() as cur:
            cur.execute(f"SHOW COLUMNS FROM {_q_ident(call_history_table)} LIKE 'refid'")
            has_refid = cur.fetchone() is not None

    with connections["cluster"].cursor() as cur:
        cur.execute(f"SELECT COUNT(*) FROM {_q_ident(campaigns_table)}{where_sql}", params)
        total = int((cur.fetchone() or [0])[0] or 0)

        cur.execute(
            f"SELECT * FROM {_q_ident(campaigns_table)}{where_sql} ORDER BY created_at DESC LIMIT %s OFFSET %s",
            params + [per_page, offset],
        )
        cols = [c[0] for c in cur.description]
        rows = [dict(zip(cols, r)) for r in cur.fetchall()]

        out: list[dict[str, Any]] = []
        for camp in rows:
            cid = int(camp.get("campaign_id") or 0)
            camp["id"] = cid
            camp["successful_calls"] = 0
            camp["total_calls"] = 0
            camp["total_attempts"] = 0

            waiting_raw = str(camp.get("waiting_time") or "").strip()
            if waiting_raw == "":
                camp["waiting_time_count"] = 0
            else:
                camp["waiting_time_count"] = len([x for x in waiting_raw.split(",") if x.strip() != ""])

            if has_contacts and cid:
                cur.execute(
                    f"SELECT COUNT(*) FROM {_q_ident(contacts_table)} WHERE campaign_id = %s",
                    [cid],
                )
                camp["total_calls"] = int((cur.fetchone() or [0])[0] or 0)
                cur.execute(
                    f"SELECT COUNT(*) FROM {_q_ident(contacts_table)} WHERE campaign_id = %s AND status = 'ANSWER'",
                    [cid],
                )
                camp["successful_calls"] = int((cur.fetchone() or [0])[0] or 0)

            if has_call_history and has_refid and cid:
                cur.execute(
                    f"SELECT COUNT(*) FROM {_q_ident(call_history_table)} WHERE refid = %s",
                    [str(cid)],
                )
                camp["total_attempts"] = int((cur.fetchone() or [0])[0] or 0)

            out.append(camp)

    last_page = (total + per_page - 1) // per_page if per_page else 1
    return Response(
        {
            "data": out,
            "pagination": {
                "current_page": page,
                "last_page": last_page,
                "per_page": per_page,
                "total": total,
            },
        }
    )


@api_view(["GET"])
@permission_classes([IsAuthenticated])
def campaigns_stats(request):
    bid = _resolve_business_id(request)
    if not bid:
        return Response({"detail": "No business profile found for this user."}, status=400)

    campaigns_table = f"{bid}_campaigns"
    contacts_table = f"{bid}_campaign_contacts"

    if not cluster_table_exists(campaigns_table):
        return Response(
            {
                "total_campaigns": 0,
                "active_campaigns": 0,
                "completed_campaigns": 0,
                "total_calls": 0,
                "successful_calls": 0,
                "failed_calls": 0,
                "average_call_duration": 0,
            }
        )

    with connections["cluster"].cursor() as cur:
        cur.execute(f"SELECT COUNT(*) FROM {_q_ident(campaigns_table)}")
        total_campaigns = int((cur.fetchone() or [0])[0] or 0)
        cur.execute(f"SELECT COUNT(*) FROM {_q_ident(campaigns_table)} WHERE status = 'active'")
        active_campaigns = int((cur.fetchone() or [0])[0] or 0)
        cur.execute(f"SELECT COUNT(*) FROM {_q_ident(campaigns_table)} WHERE status = 'completed'")
        completed_campaigns = int((cur.fetchone() or [0])[0] or 0)

        total_calls = successful_calls = failed_calls = 0
        if cluster_table_exists(contacts_table):
            cur.execute(f"SELECT COUNT(*) FROM {_q_ident(contacts_table)}")
            total_calls = int((cur.fetchone() or [0])[0] or 0)
            cur.execute(f"SELECT COUNT(*) FROM {_q_ident(contacts_table)} WHERE status IN ('ANSWER')")
            successful_calls = int((cur.fetchone() or [0])[0] or 0)
            cur.execute(f"SELECT COUNT(*) FROM {_q_ident(contacts_table)} WHERE status = 'failed'")
            failed_calls = int((cur.fetchone() or [0])[0] or 0)

    return Response(
        {
            "total_campaigns": total_campaigns,
            "active_campaigns": active_campaigns,
            "completed_campaigns": completed_campaigns,
            "total_calls": total_calls,
            "successful_calls": successful_calls,
            "failed_calls": failed_calls,
            "average_call_duration": 0,
        }
    )


@api_view(["GET"])
@permission_classes([IsAuthenticated])
def executive_numbers(request):
    bid = _resolve_business_id(request)
    if not bid:
        return Response({"detail": "No business profile found for this user."}, status=400)

    bot_id = request.query_params.get("bot_id")
    try:
        bot_id_int = int(bot_id) if bot_id is not None else None
    except Exception:
        bot_id_int = None

    if not bot_id_int:
        return Response({"success": False, "message": "bot_id is required", "data": []}, status=400)

    # did_numbers is in master DB.
    with connections["default"].cursor() as cur:
        cur.execute("SHOW TABLES LIKE 'did_numbers'")
        if cur.fetchone() is None:
            return Response({"success": True, "data": []})

        cur.execute("SHOW COLUMNS FROM did_numbers LIKE 'direction'")
        has_direction = cur.fetchone() is not None

        sql = "SELECT DISTINCT did_no FROM did_numbers WHERE business_id = %s AND bot_id = %s"
        params: list[Any] = [bid, bot_id_int]
        if has_direction:
            sql += " AND direction = 'outbound'"
        sql += " ORDER BY did_no ASC"

        cur.execute(sql, params)
        nums = [r[0] for r in cur.fetchall() if r and r[0] is not None]

    return Response({"success": True, "data": nums})

