"""
Campaign endpoints expected by the React app (/api/campaigns/...).

These must run on the existing Django API host (localhost:8000) so the frontend
doesn't need a base URL change.

Data sources (matches backend/apps/cluster/dynamic_tables.py):
- Cluster DB: `{business_id}_campaigns`, `{business_id}_campaign_contacts`, `{business_id}_call_history`
- Master DB: `did_numbers`
"""

from __future__ import annotations

import json
import os
import threading
import time
import urllib.request
import urllib.error
from pathlib import Path

from dotenv import load_dotenv
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from typing import Any

from django.db import close_old_connections, connections
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response

from apps.cluster.dynamic_tables import _q_ident, _table_names
from config.reporting_utils import _legacy_profile_for_user, cluster_table_exists


def _resolve_business_id(request) -> int | None:
    """
    Return the business id of the requesting user as an int, or None.

    The id is read from the ``business_id`` key of the mapping returned by
    ``_legacy_profile_for_user(request.user)``. None is returned when there is
    no profile, no ``business_id`` value, or the value cannot be converted to int.
    """
    profile = _legacy_profile_for_user(request.user)
    if not profile:
        return None
    raw_id = profile.get("business_id")
    if raw_id is None:
        return None
    try:
        return int(raw_id)
    except Exception:
        return None


def _parse_int(q: Any, key: str, default: int, *, min_v: int = 1, max_v: int = 10_000) -> int:
    try:
        v = int(q.get(key, default))
    except Exception:
        v = default
    return max(min_v, min(v, max_v))


def _php_truthy(v: Any) -> bool:
    if isinstance(v, bool):
        return v
    s = str(v).strip().lower()
    return s in ("1", "true", "on", "yes")


def _now_ist() -> datetime:
    return datetime.now(ZoneInfo("Asia/Kolkata"))


def _fmt_dt(dt: datetime) -> str:
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=ZoneInfo("Asia/Kolkata"))
    else:
        dt = dt.astimezone(ZoneInfo("Asia/Kolkata"))
    return dt.strftime("%Y-%m-%d %H:%M:%S")


def _parse_dt_any(raw: Any) -> datetime:
    """
    Accepts ISO strings or 'Y-m-d H:i:s'. Best-effort; falls back to now (IST).
    """
    if raw is None or str(raw).strip() == "":
        return _now_ist()
    s = str(raw).strip()
    # Fast path: "YYYY-mm-dd HH:MM:SS"
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S.%f"):
        try:
            dt = datetime.strptime(s.replace("Z", ""), fmt)
            return dt.replace(tzinfo=ZoneInfo("Asia/Kolkata"))
        except Exception:
            pass
    try:
        # Python 3.11 ISO parser
        dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=ZoneInfo("Asia/Kolkata"))
        return dt.astimezone(ZoneInfo("Asia/Kolkata"))
    except Exception:
        return _now_ist()


def _fetch_retry_attempt_values(business_id: int) -> list[int]:
    """
    Return the numeric values stored in ``businesses.retry_attempt`` for a business.

    The column is read from the "default" connection. It is expected to hold a
    JSON object (either already decoded or as a JSON string); its values are
    returned as ints in the object's iteration order. Non-numeric values are
    skipped. A missing row, empty value, invalid JSON or a non-object payload
    yields an empty list.
    """
    with connections["default"].cursor() as cursor:
        cursor.execute("SELECT retry_attempt FROM businesses WHERE business_id = %s LIMIT 1", [business_id])
        record = cursor.fetchone()
    payload = record[0] if record else None

    if payload in (None, ""):
        return []
    if isinstance(payload, str):
        try:
            payload = json.loads(payload)
        except Exception:
            return []
    if not isinstance(payload, dict):
        return []

    values: list[int] = []
    for item in payload.values():
        try:
            # Numbers pass as-is; strings must look like an unsigned decimal
            # (at most one "."), everything else is ignored.
            looks_numeric = isinstance(item, (int, float)) or str(item).replace(".", "", 1).isdigit()
            if looks_numeric:
                values.append(int(float(item)))
        except Exception:
            continue
    return values


def _compute_waiting_time_values(business_id: int, retry_logic: str, custom_attempt_values: Any) -> str:
    if retry_logic == "business_logic":
        vals = _fetch_retry_attempt_values(business_id)
        return ",".join(str(v) for v in vals) if vals else ""
    if retry_logic == "custom_logic":
        if not isinstance(custom_attempt_values, list) or not custom_attempt_values:
            return ""
        minutes: list[int] = []
        for v in custom_attempt_values:
            try:
                n = float(v) if v is not None else 0.0
            except Exception:
                n = 0.0
            minutes.append(int(round(n * 60)))
        return ",".join(str(m) for m in minutes if m >= 0)
    if retry_logic == "not_retry":
        return ""
    return ""


def _fetch_executive_number(business_id: int, did_no: str, bot_id: Any) -> str:
    """
    Mirrors Laravel fetch of exe_number/did_no from did_numbers (direction='outbound' when exists).
    Falls back to provided did_no.
    """
    if not did_no:
        return ""
    with connections["default"].cursor() as cur:
        cur.execute("SHOW TABLES LIKE 'did_numbers'")
        if cur.fetchone() is None:
            return did_no

        # Optional columns
        cur.execute("SHOW COLUMNS FROM did_numbers LIKE 'direction'")
        has_direction = cur.fetchone() is not None
        cur.execute("SHOW COLUMNS FROM did_numbers LIKE 'exe_number'")
        has_exe_number = cur.fetchone() is not None

        sql = "SELECT did_no"
        if has_exe_number:
            sql += ", exe_number"
        sql += " FROM did_numbers WHERE business_id = %s AND did_no = %s"
        params: list[Any] = [business_id, did_no]
        if bot_id not in (None, "", 0):
            sql += " AND bot_id = %s"
            params.append(int(bot_id))
        if has_direction:
            sql += " AND direction = 'outbound'"
        sql += " LIMIT 1"

        cur.execute(sql, params)
        row = cur.fetchone()
        if not row:
            return did_no
        if has_exe_number and len(row) > 1 and row[1] not in (None, ""):
            return str(row[1])
        return str(row[0])


def _master_table_exists(table: str) -> bool:
    try:
        with connections["default"].cursor() as cur:
            cur.execute("SHOW TABLES LIKE %s", [table])
            return cur.fetchone() is not None
    except Exception:
        return False


def _parse_contacts_from_csv_bytes(content: bytes) -> tuple[list[dict[str, str]], list[str]] | None:
    """
    Laravel-like ultra-robust parsing:
    - tries delimiters: ',', '\\t', ';', '|'
    - detects name + contact columns by header variations
    """
    try:
        text = content.decode("utf-8", errors="replace")
        lines = [ln for ln in text.split("\n") if ln.strip() != ""]
        if len(lines) < 2:
            return None

        headers: list[str] = []
        name_idx = -1
        contact_idx = -1
        contacts: list[dict[str, str]] = []

        delimiters = [",", "\t", ";", "|"]

        for i, line in enumerate(lines):
            line = line.strip()
            if not line:
                continue

            row: list[str] = []
            for d in delimiters:
                if d == ",":
                    # minimal CSV parsing (quotes)
                    import csv
                    import io

                    row = next(csv.reader(io.StringIO(line), delimiter=d))
                else:
                    row = line.split(d)
                if len(row) >= 2:
                    break

            row = [str(x).strip(" \t\n\r\0\x0B\"'") for x in row]

            if i == 0:
                headers = row
                for j, h in enumerate(headers):
                    n = h.lower().strip()
                    if any(k in n for k in ("name", "fullname", "full_name", "customer_name")):
                        name_idx = j
                    if any(
                        k in n
                        for k in (
                            "contact",
                            "phone",
                            "mobile",
                            "number",
                            "phone_number",
                            "mobile_number",
                        )
                    ):
                        contact_idx = j
                continue

            if name_idx >= 0 and contact_idx >= 0:
                name = row[name_idx] if name_idx < len(row) else ""
                contact = row[contact_idx] if contact_idx < len(row) else ""
                if name or contact:
                    contacts.append({"name": name, "contact": contact})

        if name_idx == -1 or contact_idx == -1 or not contacts:
            return None
        return contacts, headers
    except Exception:
        return None


def _parse_contacts_from_xlsx_bytes(content: bytes) -> tuple[list[dict[str, str]], list[str]] | None:
    try:
        import io

        import openpyxl

        wb = openpyxl.load_workbook(io.BytesIO(content), read_only=True, data_only=True)
        ws = wb[wb.sheetnames[0]]
        rows_iter = ws.iter_rows(values_only=True)
        header_row = next(rows_iter, None)
        if not header_row:
            return None
        headers = [str(c).strip() if c is not None else "" for c in header_row]

        name_idx = -1
        contact_idx = -1
        for j, h in enumerate(headers):
            n = h.lower().strip()
            if any(k in n for k in ("name", "fullname", "full_name", "customer_name")):
                name_idx = j
            if any(
                k in n
                for k in (
                    "contact",
                    "phone",
                    "mobile",
                    "number",
                    "phone_number",
                    "mobile_number",
                )
            ):
                contact_idx = j

        if name_idx == -1 or contact_idx == -1:
            return None

        contacts: list[dict[str, str]] = []
        for r in rows_iter:
            row = list(r or [])
            name = str(row[name_idx]).strip() if name_idx < len(row) and row[name_idx] is not None else ""
            contact = (
                str(row[contact_idx]).strip()
                if contact_idx < len(row) and row[contact_idx] is not None
                else ""
            )
            if name or contact:
                contacts.append({"name": name, "contact": contact})

        wb.close()
        if not contacts:
            return None
        return contacts, headers
    except Exception:
        return None


def _retry_value_for_contact_insert(bid: int, campaign_row: dict[str, Any]) -> tuple[int, bool]:
    """
    Port of Laravel storeCampaignContacts logic (simplified to match our campaign creation):
    - run_now=0 => set retry now based on waiting_time, unless retry_logic=not_retry
    - run_now=1 => retry=0 (scheduled will be set at start)
    Returns (retry_value, is_run_now).
    """
    is_run_now = int(campaign_row.get("run_now") or 0) == 0
    retry_logic = str(campaign_row.get("retry_logic") or "")

    if not is_run_now:
        return 0, False

    if retry_logic == "not_retry":
        return 1, True

    waiting = str(campaign_row.get("waiting_time") or "").strip()
    if waiting:
        parts = [p.strip() for p in waiting.split(",") if p.strip() and p.strip().replace(".", "", 1).isdigit()]
        return (len(parts) + 1) if parts else 1, True

    # waiting_time empty: for business_logic fetch from businesses.retry_attempt
    if retry_logic == "business_logic":
        vals = _fetch_retry_attempt_values(bid)
        return (len(vals) + 1) if vals else 1, True

    return 1, True


def _autocall_update_contact(
    cur,
    contacts_table: str,
    contact_id: int,
    status: str,
    *,
    call_id: str | None = None,
    dialstatus: str | None = None,
) -> None:
    """
    Update one contact row: ``status`` and ``updated_at`` always, ``call_id`` and
    ``dialstatus`` only when a value is passed (callers pass None for columns
    the table does not have).
    """
    assignments = ["status=%s"]
    params: list[Any] = [status]
    if call_id is not None:
        assignments.append("call_id=%s")
        params.append(call_id)
    if dialstatus is not None:
        assignments.append("dialstatus=%s")
        params.append(dialstatus)
    assignments.append("updated_at=%s")
    params.append(_fmt_dt(_now_ist()))
    params.append(int(contact_id))
    cur.execute(
        f"UPDATE {_q_ident(contacts_table)} SET {', '.join(assignments)} WHERE id=%s",
        params,
    )


def _autocall_post(outbound_url: str, payload: dict[str, Any]) -> tuple[bool, str, str]:
    """
    POST ``payload`` as JSON to the outbound-call service.

    Returns ``(ok, status_text, mcube_call_sid)``:
    - HTTP error: ``(False, "http_<code>", "")``.
    - Any other request failure, or a JSON body that is not an object:
      ``(False, "request_error", "")``.
    - Otherwise ``ok`` is the truthiness of the body's "ok" key, the status text
      is its "status" (default "initiated") and the sid its "mcube_call_sid".
      A body that is not valid JSON is treated as an empty object.
    """
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(
        outbound_url,
        data=data,
        headers={"content-type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=20.0) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        return False, f"http_{getattr(e, 'code', 500)}", ""
    except Exception:
        return False, "request_error", ""

    try:
        body = json.loads(raw.decode("utf-8"))
    except Exception:
        body = {}
    if not isinstance(body, dict):
        return False, "request_error", ""

    mcube_sid = str(body.get("mcube_call_sid") or "").strip()
    status_txt = str(body.get("status") or "initiated").strip()
    return bool(body.get("ok")), status_txt, mcube_sid


def _campaign_autocall_worker(
    *,
    contacts_table: str,
    business_id: int,
    campaign_id: int,
    contact_ids: list[int],
    agent_name: str,
    campaign_bot_id: Any = None,
) -> None:
    """
    Background-thread body: place one outbound call per contact, 2 seconds apart.

    For each contact id the row is re-read from ``contacts_table``; a contact
    without a number is marked 'failed', otherwise it is marked 'calling', the
    outbound-call service is invoked and the row is set to 'initiated' or
    'failed' according to the reply. Errors are logged and never propagate;
    one failing contact does not stop the remaining ones.
    """
    import logging

    log = logging.getLogger(__name__)

    close_old_connections()
    try:
        # Reload the .env two directories above this file so URL edits
        # (e.g. a new ngrok address) apply without restarting Django.
        load_dotenv(Path(__file__).resolve().parent.parent / ".env", override=True)
        outbound_url = (
            os.getenv("MCUBE_LOCAL_OUTBOUND_URL")
            or os.getenv("MCUBE_OUTBOUND_CALL_URL")
            or "http://127.0.0.1:8088/api/mcube/outbound-call"
        ).strip()

        # Optional: send explicit public URLs from env (ngrok / prod) so the
        # callback + websocket refurl do not rely only on server-side defaults.
        public_base = (os.getenv("MCUBE_PUBLIC_BASE_URL") or "").strip().rstrip("/")
        public_ws = (os.getenv("MCUBE_PUBLIC_WS_URL_BASE") or "").strip().rstrip("/")
        ws_prefix = (os.getenv("MCUBE_WS_PATH_PREFIX") or "/ws").strip()
        if ws_prefix and not ws_prefix.startswith("/"):
            ws_prefix = "/" + ws_prefix
        webhook_path = (os.getenv("MCUBE_WEBHOOK_PATH") or "/webhooks/mcube").strip()
        if webhook_path and not webhook_path.startswith("/"):
            webhook_path = "/" + webhook_path
        callback_base = f"{public_base}{webhook_path}" if public_base else ""

        # Best-effort: only write optional columns if the table has them.
        with connections["cluster"].cursor() as cur:
            cur.execute(f"SHOW COLUMNS FROM {_q_ident(contacts_table)} LIKE 'call_id'")
            has_call_id = cur.fetchone() is not None
            cur.execute(f"SHOW COLUMNS FROM {_q_ident(contacts_table)} LIKE 'dialstatus'")
            has_dialstatus = cur.fetchone() is not None

        last_idx = len(contact_ids) - 1
        for idx, cid in enumerate(contact_ids):
            try:
                with connections["cluster"].cursor() as cur:
                    cur.execute(
                        f"SELECT id, number FROM {_q_ident(contacts_table)} "
                        "WHERE id = %s AND campaign_id = %s AND business_id = %s LIMIT 1",
                        [int(cid), int(campaign_id), int(business_id)],
                    )
                    row = cur.fetchone()
                    if not row:
                        # Nothing to call; move on without the inter-call delay.
                        continue
                    number = str(row[1] or "").strip()
                    if not number:
                        _autocall_update_contact(
                            cur,
                            contacts_table,
                            cid,
                            "failed",
                            dialstatus="missing_number" if has_dialstatus else None,
                        )
                        continue

                    refid = f"cmp_{campaign_id}_contact_{cid}"
                    _autocall_update_contact(
                        cur,
                        contacts_table,
                        cid,
                        "calling",
                        call_id=refid if has_call_id else None,
                        dialstatus="calling" if has_dialstatus else None,
                    )

                refurl_ws = f"{public_ws}{ws_prefix}/{refid}" if public_ws else ""

                payload: dict[str, Any] = {
                    "to": number,
                    "call_id": refid,
                    "business_id": business_id,
                }
                if campaign_bot_id not in (None, ""):
                    try:
                        numeric_bot = int(campaign_bot_id)
                        payload["bot_id"] = numeric_bot
                        payload["campaign_bot_id"] = numeric_bot
                    except Exception:
                        payload["campaign_bot_id"] = campaign_bot_id
                if agent_name and str(agent_name).strip().lower() not in ("default", ""):
                    payload["agent_name"] = agent_name
                if callback_base:
                    payload["callback_url"] = callback_base
                if refurl_ws:
                    payload["refurl"] = refurl_ws

                ok, status_txt, mcube_sid = _autocall_post(outbound_url, payload)

                with connections["cluster"].cursor() as cur:
                    if ok:
                        _autocall_update_contact(
                            cur,
                            contacts_table,
                            cid,
                            "initiated",
                            # Keep the local refid unless the service returned its own sid.
                            call_id=mcube_sid if (has_call_id and mcube_sid) else None,
                            dialstatus=status_txt[:250] if has_dialstatus else None,
                        )
                    else:
                        _autocall_update_contact(
                            cur,
                            contacts_table,
                            cid,
                            "failed",
                            dialstatus=status_txt[:250] if has_dialstatus else None,
                        )
            except Exception:
                # Never crash the worker; best-effort only - but leave a trace.
                log.exception(
                    "campaign autocall failed for contact %s (campaign %s, business %s)",
                    cid,
                    campaign_id,
                    business_id,
                )

            if idx < last_idx:
                time.sleep(2.0)
    except Exception:
        log.exception(
            "campaign autocall worker aborted (campaign %s, business %s)",
            campaign_id,
            business_id,
        )
    finally:
        close_old_connections()


@api_view(["POST"])
@permission_classes([IsAuthenticated])
def campaign_contacts_upload(request, campaign_id: int):
    """
    Endpoint expected by the React app:
    POST /api/campaigns/{id}/contacts
    multipart/form-data with field `file`.

    Parses the uploaded file (.xlsx by extension, anything else as delimited
    text), stores the contacts into cluster table `{business_id}_campaign_contacts`,
    increments `{business_id}_campaigns.total_count`, then starts a background
    thread that places outbound calls for the inserted contacts.

    Responses:
    - 400: no business profile for the user, or no valid contact data in the file.
    - 404: campaigns/contacts table or the campaign itself not found.
    - 422: file missing or larger than 10MB.
    - 200: summary of the import, including the inserted contacts.
    """
    import logging

    bid = _resolve_business_id(request)
    if not bid:
        return Response({"detail": "No business profile found for this user."}, status=400)

    campaigns_table = f"{bid}_campaigns"
    contacts_table = f"{bid}_campaign_contacts"

    if not cluster_table_exists(campaigns_table):
        return Response({"message": "Campaign not found"}, status=404)

    if not cluster_table_exists(contacts_table):
        return Response({"message": "Campaign contacts table does not exist"}, status=404)

    up = request.FILES.get("file")
    if not up:
        return Response({"message": "Validation failed", "errors": {"file": ["required"]}}, status=422)

    if up.size and int(up.size) > 10 * 1024 * 1024:
        return Response({"message": "Validation failed", "errors": {"file": ["max 10MB"]}}, status=422)

    filename = (getattr(up, "name", "") or "").lower()
    content = up.read()

    if filename.endswith(".xlsx"):
        parsed = _parse_contacts_from_xlsx_bytes(content)
    else:
        parsed = _parse_contacts_from_csv_bytes(content)

    if not parsed:
        return Response({"message": "No valid contact data found", "success": False}, status=400)

    contacts, _headers = parsed

    with connections["cluster"].cursor() as cur:
        cur.execute(
            f"SELECT * FROM {_q_ident(campaigns_table)} WHERE campaign_id = %s LIMIT 1",
            [int(campaign_id)],
        )
        camp_row = cur.fetchone()
        if not camp_row:
            return Response({"message": "Campaign not found"}, status=404)
        camp_cols = [c[0] for c in cur.description]
        campaign = dict(zip(camp_cols, camp_row))

        current_total = int(campaign.get("total_count") or 0)

        retry_value, _is_run_now = _retry_value_for_contact_insert(bid, campaign)

        now_s = _fmt_dt(_now_ist())
        inserted_contacts: list[dict[str, Any]] = []

        for c in contacts:
            name = (c.get("name") or "").strip()
            number = (c.get("contact") or "").strip()
            cur.execute(
                f"""
                INSERT INTO {_q_ident(contacts_table)}
                (business_id, campaign_id, name, number, source, status, `retry`, created_at, updated_at)
                VALUES (%s, %s, %s, %s, %s, 'pending', %s, %s, %s)
                """,
                [bid, int(campaign_id), name, number, filename, retry_value, now_s, now_s],
            )
            inserted_contacts.append(
                {
                    "id": cur.lastrowid,
                    "number": number,
                    "status": "pending",
                    "name": name,
                    "campaign_id": int(campaign_id),
                    "business_id": bid,
                }
            )

        new_count = len(contacts)
        updated_total = current_total + new_count
        # Increment in SQL rather than writing back a value computed from the
        # earlier read, so concurrent uploads to the same campaign cannot
        # overwrite each other's increment.
        cur.execute(
            f"UPDATE {_q_ident(campaigns_table)} "
            "SET total_count = COALESCE(total_count, 0) + %s WHERE campaign_id = %s",
            [new_count, int(campaign_id)],
        )

    # Auto-trigger outbound calls sequentially (2s delay) after upload.
    # This runs in a background thread so the upload response returns immediately.
    try:
        contact_ids = [int(c.get("id")) for c in inserted_contacts if c.get("id") is not None]
        raw_bot = campaign.get("bot_id")
        agent_name = str(raw_bot or "default").strip() or "default"
        if contact_ids:
            threading.Thread(
                target=_campaign_autocall_worker,
                kwargs={
                    "contacts_table": contacts_table,
                    "business_id": int(bid),
                    "campaign_id": int(campaign_id),
                    "contact_ids": contact_ids,
                    "agent_name": agent_name,
                    "campaign_bot_id": raw_bot,
                },
                daemon=True,
                name=f"campaign_autocall_{bid}_{campaign_id}",
            ).start()
    except Exception:
        # Best-effort only; upload should still succeed.
        logging.getLogger(__name__).exception(
            "could not start campaign autocall thread (campaign %s, business %s)",
            campaign_id,
            bid,
        )

    return Response(
        {
            "message": f"Successfully parsed {len(contacts)} contacts",
            "contacts_imported": new_count,
            "total_count": updated_total,
            "previous_count": current_total,
            "contacts_data": contacts,
            "inserted_contacts": inserted_contacts,
            "campaign_id": int(campaign_id),
            "success": True,
        }
    )


def _campaigns_index_create(request, bid: int) -> Response:
    """
    Handle POST /api/campaigns: validate the payload, insert the campaign row
    into `{bid}_campaigns` and, for scheduled campaigns, best-effort insert a
    row into the master `scheduled_campaigns` table.

    Responses: 422 when `name` is missing, 500 when the campaigns table does
    not exist, 201 with the stored campaign otherwise.
    """
    import logging

    data = request.data if isinstance(request.data, dict) else {}

    # str() so a non-string name (e.g. a JSON number) is stored as text
    # instead of failing on .strip(); matches how the other fields are read.
    name = str(data.get("name") or "").strip()
    if not name:
        return Response(
            {"message": "Validation failed", "errors": {"name": ["required"]}},
            status=422,
        )

    description = data.get("description")
    run_now_flag = _php_truthy(data.get("run_now", False))

    # IMPORTANT: the UI sends run_now=false for run-now campaigns.
    # Behavior (Laravel parity):
    # - run_now=false => start_time=now, end_time=now+2y, status='active'
    # - run_now=true  => start_time=user-selected (or now if missing), end=start+2y, status='scheduled'
    if run_now_flag:
        start = _parse_dt_any(data.get("start_time"))
    else:
        start = _now_ist()
    end = start + timedelta(days=365 * 2)

    callback_enabled = _php_truthy(data.get("callback_enabled", True))
    retry_time = 1 if callback_enabled else 0

    retry_logic = str(data.get("retry_logic") or "")
    custom_attempt_values = data.get("custom_attempt_values", [])
    waiting_time_values = _compute_waiting_time_values(bid, retry_logic, custom_attempt_values)

    # "exeuctive_number" (sic) is the spelling used by the payload and the column.
    exeuctive_number = str(data.get("exeuctive_number") or "").strip()
    bot_name = data.get("bot_name")
    executive_number = _fetch_executive_number(bid, exeuctive_number, bot_name)

    campaigns_table = f"{bid}_campaigns"
    if not cluster_table_exists(campaigns_table):
        return Response(
            {"message": "Campaign table not found. Please contact administrator.", "error": "Table does not exist"},
            status=500,
        )

    now_s = _fmt_dt(_now_ist())

    with connections["cluster"].cursor() as cur:
        # Optional columns (backward compatible)
        cur.execute(f"SHOW COLUMNS FROM {_q_ident(campaigns_table)} LIKE 'bot_id'")
        has_bot_id = cur.fetchone() is not None
        cur.execute(f"SHOW COLUMNS FROM {_q_ident(campaigns_table)} LIKE 'exeuctive_number'")
        has_exeuctive_number_col = cur.fetchone() is not None
        cur.execute(f"SHOW COLUMNS FROM {_q_ident(campaigns_table)} LIKE 'retry_logic'")
        has_retry_logic_col = cur.fetchone() is not None

        insert_cols = [
            "business_id",
            "name",
            "description",
            "status",
            "total_count",
            "waiting_time",
            "did",
            "start_time",
            "end_time",
            "start_date",
            "end_date",
            "retry_time",
            "run_now",
            "batch_size",
            "batch_interval",
            "created_at",
            "updated_at",
        ]
        insert_vals: list[Any] = [
            bid,
            name,
            description,
            "scheduled" if run_now_flag else "active",
            0,
            waiting_time_values or "",
            executive_number,
            _fmt_dt(start),
            _fmt_dt(end),
            start.strftime("%Y-%m-%d"),
            end.strftime("%Y-%m-%d"),
            retry_time,
            1 if run_now_flag else 0,
            10,
            5,
            now_s,
            now_s,
        ]

        if has_bot_id:
            insert_cols.append("bot_id")
            insert_vals.append(bot_name)
        if has_exeuctive_number_col:
            insert_cols.append("exeuctive_number")
            insert_vals.append(exeuctive_number)
        if has_retry_logic_col:
            insert_cols.append("retry_logic")
            insert_vals.append(retry_logic)

        cols_sql = ", ".join(_q_ident(c) for c in insert_cols)
        ph_sql = ", ".join(["%s"] * len(insert_cols))

        cur.execute(
            f"INSERT INTO {_q_ident(campaigns_table)} ({cols_sql}) VALUES ({ph_sql})",
            insert_vals,
        )
        campaign_id = cur.lastrowid

        # Fetch the inserted row so the response reflects what was stored.
        cur.execute(
            f"SELECT * FROM {_q_ident(campaigns_table)} WHERE campaign_id = %s LIMIT 1",
            [campaign_id],
        )
        camp_cols = [c[0] for c in cur.description]
        stored = cur.fetchone()
        # The insert already succeeded; if the row cannot be re-read, still
        # answer with the id instead of failing the request.
        campaign = dict(zip(camp_cols, stored)) if stored else {"campaign_id": campaign_id}

    # Insert scheduled_campaigns row when scheduled
    if run_now_flag:
        # Laravel parity: insertion failure must NOT fail campaign creation.
        # The table may not exist in every environment.
        try:
            if _master_table_exists("scheduled_campaigns"):
                with connections["default"].cursor() as cur:
                    cur.execute(
                        """
                            INSERT INTO scheduled_campaigns
                            (campaign_id, business_id, start_time, end_time, execute_at, delay_seconds, status, error_message, created_at, updated_at)
                            VALUES (%s, %s, %s, %s, %s, 0, 'pending', NULL, %s, %s)
                            """,
                        [
                            campaign_id,
                            bid,
                            _fmt_dt(start),
                            _fmt_dt(end),
                            _fmt_dt(start),
                            _fmt_dt(_now_ist()),
                            _fmt_dt(_now_ist()),
                        ],
                    )
        except Exception:
            # Best-effort only; the campaign stays 'scheduled' in the cluster table.
            logging.getLogger(__name__).exception(
                "could not insert scheduled_campaigns row (campaign %s, business %s)",
                campaign_id,
                bid,
            )

    campaign["id"] = campaign.get("campaign_id")
    return Response(
        {
            "message": "Campaign created successfully",
            "campaign": campaign,
            "campaign_id": campaign_id,
            "id": campaign_id,
        },
        status=201,
    )


def _campaigns_index_list(request, bid: int) -> Response:
    """
    Handle GET /api/campaigns: paginated, optionally filtered list of campaigns
    from `{bid}_campaigns`, each enriched with contact and call-history counts.

    Query parameters: `search` (LIKE match on name/description/did/status),
    `status` (exact match; "all" disables the filter), `page` (default 1) and
    `per_page` (default 15, capped at 200).
    """
    campaigns_table = f"{bid}_campaigns"
    if not cluster_table_exists(campaigns_table):
        return Response(
            {
                "data": [],
                "pagination": {"current_page": 1, "last_page": 1, "per_page": 15, "total": 0},
            }
        )

    search = (request.query_params.get("search") or "").strip()
    status_filter = (request.query_params.get("status") or "").strip()
    page = _parse_int(request.query_params, "page", 1)
    per_page = _parse_int(request.query_params, "per_page", 15, min_v=1, max_v=200)
    offset = (page - 1) * per_page

    where_parts: list[str] = []
    params: list[Any] = []

    if search:
        where_parts.append("(name LIKE %s OR description LIKE %s OR did LIKE %s OR status LIKE %s)")
        like = f"%{search}%"
        params.extend([like, like, like, like])

    if status_filter and status_filter != "all":
        where_parts.append("status = %s")
        params.append(status_filter)

    where_sql = (" WHERE " + " AND ".join(where_parts)) if where_parts else ""

    # Contacts + call history tables
    contacts_table = f"{bid}_campaign_contacts"
    has_contacts = cluster_table_exists(contacts_table)
    call_history_table = f"{bid}_call_history"
    has_call_history = cluster_table_exists(call_history_table)

    # Check refid column presence (best-effort)
    has_refid = False
    if has_call_history:
        with connections["cluster"].cursor() as cur:
            cur.execute(f"SHOW COLUMNS FROM {_q_ident(call_history_table)} LIKE 'refid'")
            has_refid = cur.fetchone() is not None

    with connections["cluster"].cursor() as cur:
        cur.execute(f"SELECT COUNT(*) FROM {_q_ident(campaigns_table)}{where_sql}", params)
        total = int((cur.fetchone() or [0])[0] or 0)

        cur.execute(
            f"SELECT * FROM {_q_ident(campaigns_table)}{where_sql} ORDER BY created_at DESC LIMIT %s OFFSET %s",
            params + [per_page, offset],
        )
        cols = [c[0] for c in cur.description]
        rows = [dict(zip(cols, r)) for r in cur.fetchall()]

        out: list[dict[str, Any]] = []
        for camp in rows:
            cid = int(camp.get("campaign_id") or 0)
            camp["id"] = cid
            camp["successful_calls"] = 0
            camp["total_calls"] = 0
            camp["total_attempts"] = 0

            waiting_raw = str(camp.get("waiting_time") or "").strip()
            camp["waiting_time_count"] = len([x for x in waiting_raw.split(",") if x.strip() != ""])

            if has_contacts and cid:
                cur.execute(
                    f"SELECT COUNT(*) FROM {_q_ident(contacts_table)} WHERE campaign_id = %s",
                    [cid],
                )
                camp["total_calls"] = int((cur.fetchone() or [0])[0] or 0)
                cur.execute(
                    f"SELECT COUNT(*) FROM {_q_ident(contacts_table)} WHERE campaign_id = %s AND status = 'ANSWER'",
                    [cid],
                )
                camp["successful_calls"] = int((cur.fetchone() or [0])[0] or 0)

            if has_call_history and has_refid and cid:
                cur.execute(
                    f"SELECT COUNT(*) FROM {_q_ident(call_history_table)} WHERE refid = %s",
                    [str(cid)],
                )
                camp["total_attempts"] = int((cur.fetchone() or [0])[0] or 0)

            out.append(camp)

    # An empty result still has one (empty) page, matching the response above
    # for a missing table.
    last_page = max(1, (total + per_page - 1) // per_page)
    return Response(
        {
            "data": out,
            "pagination": {
                "current_page": page,
                "last_page": last_page,
                "per_page": per_page,
                "total": total,
            },
        }
    )


@api_view(["GET", "POST"])
@permission_classes([IsAuthenticated])
def campaigns_index(request):
    """
    /api/campaigns collection endpoint.

    - POST creates a campaign for the requesting user's business.
    - GET lists that business's campaigns with pagination.

    Returns 400 when the user has no business profile.
    """
    bid = _resolve_business_id(request)
    if not bid:
        return Response({"detail": "No business profile found for this user."}, status=400)

    if request.method == "POST":
        return _campaigns_index_create(request, bid)
    return _campaigns_index_list(request, bid)


@api_view(["GET"])
@permission_classes([IsAuthenticated])
def campaigns_stats(request):
    """Return aggregate campaign and contact counters for the caller's business.

    Responds with HTTP 400 when no business id can be resolved for the user.
    When the per-business campaigns table does not exist, every counter in the
    payload is zero. Contact counters stay at zero when the per-business
    contacts table does not exist. ``average_call_duration`` is always
    reported as 0 by this endpoint.
    """
    business_id = _resolve_business_id(request)
    if not business_id:
        return Response({"detail": "No business profile found for this user."}, status=400)

    campaigns_tbl = f"{business_id}_campaigns"
    contacts_tbl = f"{business_id}_campaign_contacts"

    # Start from an all-zero payload; counters are filled in as tables are found.
    stats: dict[str, int] = {
        "total_campaigns": 0,
        "active_campaigns": 0,
        "completed_campaigns": 0,
        "total_calls": 0,
        "successful_calls": 0,
        "failed_calls": 0,
        "average_call_duration": 0,
    }

    if not cluster_table_exists(campaigns_tbl):
        return Response(stats)

    with connections["cluster"].cursor() as cursor:

        def count_rows(table: str, condition: str = "") -> int:
            # Run a COUNT(*) on `table`; `condition` is a literal SQL suffix
            # (including its leading space) or empty for an unfiltered count.
            cursor.execute(f"SELECT COUNT(*) FROM {_q_ident(table)}{condition}")
            row = cursor.fetchone() or [0]
            return int(row[0] or 0)

        stats["total_campaigns"] = count_rows(campaigns_tbl)
        stats["active_campaigns"] = count_rows(campaigns_tbl, " WHERE status = 'active'")
        stats["completed_campaigns"] = count_rows(campaigns_tbl, " WHERE status = 'completed'")

        # Contact-level counters are only available when the contacts table exists.
        if cluster_table_exists(contacts_tbl):
            stats["total_calls"] = count_rows(contacts_tbl)
            stats["successful_calls"] = count_rows(contacts_tbl, " WHERE status IN ('ANSWER')")
            stats["failed_calls"] = count_rows(contacts_tbl, " WHERE status = 'failed'")

    return Response(stats)


@api_view(["GET"])
@permission_classes([IsAuthenticated])
def executive_numbers(request):
    """List the distinct DID numbers registered for a bot of the caller's business.

    Query parameters:
        bot_id: integer bot identifier. A missing, non-numeric or zero value
            yields an HTTP 400 response.

    Responds with HTTP 400 when no business id can be resolved for the user.
    Returns an empty list when the ``did_numbers`` table is absent. When that
    table has a ``direction`` column, only rows with direction ``outbound``
    are returned. Numbers are ordered ascending and NULL values are dropped.
    """
    business_id = _resolve_business_id(request)
    if not business_id:
        return Response({"detail": "No business profile found for this user."}, status=400)

    raw_bot_id = request.query_params.get("bot_id")
    if raw_bot_id is None:
        bot_id_value = None
    else:
        try:
            bot_id_value = int(raw_bot_id)
        except Exception:
            bot_id_value = None

    # Falsy covers both "not supplied / unparsable" and an explicit 0.
    if not bot_id_value:
        return Response({"success": False, "message": "bot_id is required", "data": []}, status=400)

    # did_numbers is in master DB.
    with connections["default"].cursor() as cursor:
        cursor.execute("SHOW TABLES LIKE 'did_numbers'")
        table_row = cursor.fetchone()
        if table_row is None:
            return Response({"success": True, "data": []})

        cursor.execute("SHOW COLUMNS FROM did_numbers LIKE 'direction'")
        direction_row = cursor.fetchone()

        # Assemble the query; the direction filter is added only if the column exists.
        sql_parts = ["SELECT DISTINCT did_no FROM did_numbers WHERE business_id = %s AND bot_id = %s"]
        if direction_row is not None:
            sql_parts.append(" AND direction = 'outbound'")
        sql_parts.append(" ORDER BY did_no ASC")
        query_args: list[Any] = [business_id, bot_id_value]

        cursor.execute("".join(sql_parts), query_args)

        numbers = []
        for record in cursor.fetchall():
            if record and record[0] is not None:
                numbers.append(record[0])

    return Response({"success": True, "data": numbers})

