from __future__ import annotations

import json
import logging
import time
from datetime import datetime
from typing import Any

import httpx
from dateutil import parser as date_parser
from sqlalchemy import text
from sqlalchemy.orm import Session

from app.config import build_mcube_refurl, get_settings
from app.utils.cluster_table_names import call_history_table, campaign_contacts_table, campaigns_table
from app.repositories.schema_helper import has_column, has_table
from app.services import campaign_core
from app.services.campaign_core import (
    check_business_time_window,
    check_remaining_minutes,
    fetch_dial_status_and_update,
    get_bot_concurrent_limit,
    get_bot_connecting_count,
    get_bot_id_from_campaign_did,
)
from app.utils.rabbitmq_campaign import trigger_call_history_sync
from app.utils.timezone_util import format_dt, now_app

logger = logging.getLogger(__name__)


# Concurrency throttling knobs used while dialing.
_MAX_CONNECTING_CALLS = 10  # business-wide cap on rows in CONNECTING state
_CHECK_INTERVAL_SECONDS = 5  # pause between capacity polls
_MAX_WAIT_SECONDS = 300  # give up waiting for capacity after this long


def _is_campaign_paused(session_cluster: Session, campaigns_table_name: str, campaign_id: int | None) -> bool:
    """Return True when the campaign row currently has status ``'paused'``.

    Always False when no ``campaign_id`` is given (nothing to look up).
    """
    if not campaign_id:
        return False
    row = session_cluster.execute(
        text(f"SELECT status FROM `{campaigns_table_name}` WHERE campaign_id = :cid LIMIT 1"),
        {"cid": campaign_id},
    ).first()
    return bool(row and row[0] == "paused")


def _count_connecting(session_cluster: Session, call_history_tbl: str, status_column: str) -> int:
    """Count call-history rows whose ``status_column`` equals ``'CONNECTING'``.

    ``status_column`` is an internal identifier chosen by the caller (never user input).
    """
    return int(
        session_cluster.execute(
            text(f"SELECT COUNT(*) FROM `{call_history_tbl}` WHERE {status_column} IN ('CONNECTING')"),
        ).scalar_one()
    )


def _mark_contact_failed(
    session_cluster: Session,
    table_name: str,
    contact_id: int | None,
    campaign_id: int | None,
) -> None:
    """Best-effort: set a contact's status to ``'failed'``; log (never raise) on error."""
    if not (contact_id and campaign_id):
        return
    try:
        session_cluster.execute(
            text(
                f"UPDATE `{table_name}` SET status = 'failed', updated_at = :u "
                "WHERE id = :id AND campaign_id = :cid"
            ),
            {"u": now_app(), "id": contact_id, "cid": campaign_id},
        )
    except Exception as e:  # noqa: BLE001
        logger.warning("Failed update contact: %s", e)


def _post_call_request(
    url: str,
    auth_token: str,
    payload: dict[str, Any],
    max_retries: int = 1,
) -> tuple[bool, int | None, str | None, str | None]:
    """POST one dial request, retrying only on HTTP 5xx.

    Returns ``(call_successful, http_code, response_text, transport_error)`` describing
    the last attempt. A transport-level exception is reported as ``http_code == 0`` with
    the exception text in ``transport_error`` and is not retried.
    """
    retry_count = 0
    call_successful = False
    response_text: str | None = None
    http_code: int | None = None
    curl_error: str | None = None

    while retry_count <= max_retries and not call_successful:
        try:
            with httpx.Client(timeout=30.0) as client:
                r = client.post(
                    url,
                    json=payload,
                    headers={
                        "Content-Type": "application/json",
                        "Authorization": f"Bearer {auth_token}",
                    },
                )
            response_text = r.text
            http_code = r.status_code
            curl_error = None
        except Exception as e:  # noqa: BLE001
            curl_error = str(e)
            http_code = 0
            response_text = None

        if http_code and 200 <= http_code < 300:
            call_successful = True
        elif http_code and http_code >= 500 and retry_count < max_retries:
            retry_count += 1
            delay = retry_count * 1
            logger.info("Retrying after %s s", delay)
            time.sleep(delay)
        else:
            # 4xx, other non-2xx codes, transport errors, or retries exhausted.
            break

    return call_successful, http_code, response_text, curl_error


def _wait_for_dial_capacity(
    session_cluster: Session,
    campaigns_table_name: str,
    campaign_id: int | None,
    call_history_tbl: str,
    exenumber: str,
    bot_concurrent_limit: Any,
    call_number: int,
) -> bool:
    """Poll until business and bot CONNECTING counts are under their limits.

    Returns True if the campaign was found paused while waiting, otherwise False
    (capacity available, wait timed out, or the capacity query failed — in the last
    two cases the caller proceeds with the call).
    """
    wait_start = time.time()
    while True:
        if _is_campaign_paused(session_cluster, campaigns_table_name, campaign_id):
            return True
        try:
            connecting_count = _count_connecting(session_cluster, call_history_tbl, "dialstatus")
            bot_connecting = (
                get_bot_connecting_count(session_cluster, call_history_tbl, exenumber) if exenumber else 0
            )
            business_ok = connecting_count < _MAX_CONNECTING_CALLS
            bot_ok = bot_concurrent_limit is None or bot_connecting < bot_concurrent_limit
            logger.info(
                "Concurrent check call %s: business=%s/%s bot=%s",
                call_number,
                connecting_count,
                _MAX_CONNECTING_CALLS,
                bot_connecting,
            )
            if business_ok and bot_ok:
                return False
            if (time.time() - wait_start) >= _MAX_WAIT_SECONDS:
                logger.warning("Max wait exceeded, proceeding")
                return False
            time.sleep(_CHECK_INTERVAL_SECONDS)
        except Exception as e:  # noqa: BLE001
            logger.error("CONNECTING check error: %s", e)
            return False


def _wait_after_call(
    session_cluster: Session,
    campaigns_table_name: str,
    campaign_id: int | None,
    call_history_tbl: str,
    exenumber: str,
    bot_concurrent_limit: Any,
) -> bool:
    """After a successful dial, wait until there is capacity for the next call.

    Capacity must be observed twice, two seconds apart, before returning. Returns
    True if the campaign was found paused while waiting, otherwise False. Any error
    is logged and ends the wait.
    """
    # NOTE(review): this post-dial check filters on `call_status`, whereas the pre-dial
    # checks filter on `dialstatus`. Kept as in the original; confirm which column is
    # intended, since a missing column would silently disable this throttle.
    try:
        wait_start = time.time()
        while True:
            if _is_campaign_paused(session_cluster, campaigns_table_name, campaign_id):
                return True

            timed_out = (time.time() - wait_start) >= _MAX_WAIT_SECONDS
            current_connecting = _count_connecting(session_cluster, call_history_tbl, "call_status")
            current_bot = (
                get_bot_connecting_count(session_cluster, call_history_tbl, exenumber) if exenumber else 0
            )
            business_ok = current_connecting < _MAX_CONNECTING_CALLS
            bot_ok = bot_concurrent_limit is None or current_bot < bot_concurrent_limit

            if business_ok and bot_ok:
                # Re-check after a short delay so a call that is just being recorded is seen.
                time.sleep(2)
                final_business = _count_connecting(session_cluster, call_history_tbl, "call_status")
                final_bot = (
                    get_bot_connecting_count(session_cluster, call_history_tbl, exenumber) if exenumber else 0
                )
                if final_business < _MAX_CONNECTING_CALLS and (
                    bot_concurrent_limit is None or final_bot < bot_concurrent_limit
                ):
                    logger.info("Concurrent counts OK; ready for next call")
                    return False
                # Confirmation failed: still honour the overall timeout so a flapping
                # count cannot keep this loop alive forever.
                if timed_out or (time.time() - wait_start) >= _MAX_WAIT_SECONDS:
                    logger.warning("Max wait on disconnect exceeded")
                    return False
                continue

            if (time.time() - wait_start) >= _MAX_WAIT_SECONDS:
                logger.warning("Max wait on disconnect exceeded")
                return False
            time.sleep(_CHECK_INTERVAL_SECONDS)
    except Exception as e:  # noqa: BLE001
        logger.error("Error re-checking CONNECTING: %s", e)
    return False


def initiate_outbound_call(
    session_default: Session,
    session_cluster: Session,
    campaign_id: int | None,
    business_id: int | None,
    retry_only: bool = False,
    priority_only: bool = False,
) -> dict[str, Any]:
    """
    Port of CampaignController::initiateOutboundCall (synchronous, uses sleep/usleep like PHP).

    Validates the business/campaign (remaining minutes, business-hours window, campaign
    status and schedule), selects the contacts to dial, then places calls one at a time
    through ``settings.outbound_call_url`` in batches, throttled by the number of
    CONNECTING rows in the call-history table.

    Args:
        session_default: Session passed to the business-level checks and bot lookup.
        session_cluster: Session used for the per-business campaign, contact and
            call-history tables.
        campaign_id: Campaign to process; when falsy, contacts are selected by
            business only and campaign status/schedule checks are skipped.
        business_id: Owning business; resolved from the campaign when falsy.
        retry_only: Select only non-answered contacts with retries left, and allow any
            campaign status except ``cancelled``/``stopped``.
        priority_only: Restrict to ``priority = 1`` contacts (when the column exists)
            and skip the retry decrement done here after a successful dial.

    Returns:
        A dict with ``success`` plus either an ``error`` (and context keys) or, when at
        least one call was placed, ``message``, ``call_ids``, ``success_count``,
        ``total_calls`` and ``errors``.

    NOTE(review): the sessions are committed here only on the mid-run
    BUSINESS_TIME_BLOCKED exit; other paths presumably rely on the caller (or the
    campaign_core helpers) to commit — confirm.
    """
    settings = get_settings()
    auth_token = settings.outbound_bearer_token

    if not business_id and campaign_id:
        business_id = campaign_core.resolve_business_id_for_campaign(session_default, session_cluster, campaign_id)

    if not business_id:
        err = f"Business ID is required but not found. Campaign ID: {campaign_id}"
        logger.error(err)
        return {"success": False, "error": err, "campaign_id": campaign_id}

    if not (settings.mcube_public_ws_url_base or "").strip():
        return {
            "success": False,
            "error": "MCUBE_PUBLIC_WS_URL_BASE is not set (required for refurl). "
            "Copy MCUBE_PUBLIC_WS_URL_BASE from backend .env into campaign_port environment.",
            "campaign_id": campaign_id,
        }

    bid = int(business_id)

    minutes_check = check_remaining_minutes(session_default, bid)
    if not minutes_check["valid"]:
        return {
            "success": False,
            "error": minutes_check["message"],
            "insufficient_minutes": True,
            "remain_min": minutes_check.get("remain_min", 0),
            "campaign_id": campaign_id,
        }

    business_time_check = check_business_time_window(session_default, bid)
    if not business_time_check["valid"]:
        return {
            "success": False,
            "error": business_time_check["message"],
            "error_code": "BUSINESS_TIME_BLOCKED",
            "business_time": business_time_check.get("business_time"),
            "current_time": business_time_check.get("current_time"),
            "current_day": business_time_check.get("current_day"),
            "campaign_id": campaign_id,
        }

    campaigns_table_name = campaigns_table(bid)
    table_name = campaign_contacts_table(bid)
    call_history_tbl = call_history_table(bid)

    if not has_table(session_cluster, campaigns_table_name):
        err = f"Campaigns table not found: {campaigns_table_name}"
        logger.error(err)
        return {"success": False, "error": err, "table_name": campaigns_table_name}

    campaign = None
    if campaign_id:
        campaign = session_cluster.execute(
            text(f"SELECT * FROM `{campaigns_table_name}` WHERE campaign_id = :cid LIMIT 1"),
            {"cid": campaign_id},
        ).mappings().first()
        if not campaign:
            err = f"Campaign not found: {campaign_id}"
            logger.error(err)
            return {
                "success": False,
                "error": err,
                "campaign_id": campaign_id,
                "table_name": campaigns_table_name,
            }

        campaign_status = campaign.get("status")
        if not retry_only:
            if campaign_status == "paused":
                return {
                    "success": False,
                    "error": "Campaign is paused. Please resume it to continue processing.",
                    "campaign_id": campaign_id,
                    "current_status": campaign_status,
                    "paused": True,
                }
            if campaign_status not in ("active", "processing"):
                return {
                    "success": False,
                    "error": "Campaign status must be active or processing to initiate calls. Current status: "
                    + str(campaign_status),
                    "campaign_id": campaign_id,
                    "current_status": campaign_status,
                }
        else:
            if campaign_status in ("cancelled", "stopped"):
                logger.info("Skipping retry calls for campaign %s - status %s", campaign_id, campaign_status)
                return {
                    "success": False,
                    "error": f"Campaign is {campaign_status}. Retry calls are not allowed.",
                    "campaign_id": campaign_id,
                    "current_status": campaign_status,
                }
            logger.info("Allowing retry calls for campaign %s with status: %s", campaign_id, campaign_status)

        # Schedule window: naive timestamps are interpreted in the app timezone.
        now = now_app()
        start_time = campaign["start_time"]
        end_time = campaign["end_time"]
        if not isinstance(start_time, datetime):
            start_time = date_parser.parse(str(start_time))
        if not isinstance(end_time, datetime):
            end_time = date_parser.parse(str(end_time))
        if start_time.tzinfo is None:
            start_time = start_time.replace(tzinfo=now.tzinfo)
        if end_time.tzinfo is None:
            end_time = end_time.replace(tzinfo=now.tzinfo)

        if int(campaign.get("run_now") or 0) == 0 and now < start_time.astimezone(now.tzinfo):
            return {
                "success": False,
                "error": "Campaign is scheduled to start at " + format_dt(start_time),
                "scheduled": True,
                "start_time": format_dt(start_time),
                "current_time": format_dt(now),
            }
        if now > end_time.astimezone(now.tzinfo):
            return {
                "success": False,
                "error": "Campaign has ended at " + format_dt(end_time),
                "ended": True,
                "end_time": format_dt(end_time),
                "current_time": format_dt(now),
            }

    if not has_table(session_cluster, table_name):
        return {"success": False, "error": "Campaign contacts table not found", "table_name": table_name}

    batch_size = int(campaign.get("batch_size") or 10) if campaign else 10
    batch_interval = int(campaign.get("batch_interval") or 5) if campaign else 5
    exenumber = str(campaign.get("did") or "").strip() if campaign else ""
    # Guard against nonsensical stored values: a non-positive batch size would produce
    # no batches at all, and a negative interval would make time.sleep() raise.
    if batch_size <= 0:
        batch_size = 10
    batch_interval = max(batch_interval, 0)

    has_retry_at = has_column(session_cluster, table_name, "retry_at")
    has_priority = has_column(session_cluster, table_name, "priority")
    now_str = format_dt(now_app())

    # ---- Build the contact selection query -------------------------------------------
    priority_expr = "priority" if has_priority else "0 AS priority"
    q_parts = [
        f"SELECT id, number, name, status, call_id, `retry`, {priority_expr}",
        f" FROM `{table_name}` WHERE business_id = :bid AND number IS NOT NULL AND number != ''",
    ]
    params: dict[str, Any] = {"bid": business_id}

    if campaign_id:
        q_parts.append(" AND campaign_id = :cid")
        params["cid"] = campaign_id

        retry_due = ""
        if has_retry_at:
            # Bound as a parameter rather than interpolated into the SQL text.
            retry_due = " AND (retry_at <= :now OR retry_at IS NULL)"
            params["now"] = now_str

        if retry_only:
            sub = "UPPER(COALESCE(status, '')) != 'ANSWER' AND COALESCE(`retry`, 0) > 0" + retry_due
            q_parts.append(f" AND ({sub})")
        else:
            pending = "(status = 'pending' OR status IS NULL)"
            retry_part = (
                "(UPPER(COALESCE(status, '')) != 'ANSWER' AND call_id IS NOT NULL AND COALESCE(`retry`, 0) > 0"
                + retry_due
                + ")"
            )
            q_parts.append(f" AND (({pending}) OR ({retry_part}))")

    if priority_only and has_priority:
        q_parts.append(" AND priority = 1")

    q_parts.append(" ORDER BY priority DESC, id ASC" if has_priority else " ORDER BY id ASC")

    rows = session_cluster.execute(text("".join(q_parts)), params).mappings().all()

    # (contact_id, number, name) per selected contact, in dialing order.
    contacts = [
        (int(r["id"]), str(r["number"]).strip(), str(r.get("name") or "").strip())
        for r in rows
    ]
    if not contacts:
        return {"success": False, "error": "No customer numbers found for this campaign", "campaign_id": campaign_id}

    base_data: dict[str, Any] = {"Authorization": auth_token, "exenumber": exenumber}

    success_count = 0
    call_ids: list[Any] = []
    errors: list[str] = []
    total_contacts = len(contacts)
    batches = [contacts[i : i + batch_size] for i in range(0, total_contacts, batch_size)]
    total_batches = len(batches)

    campaign_bot_id = None
    bot_concurrent_limit = None
    if exenumber != "":
        campaign_bot_id = get_bot_id_from_campaign_did(session_default, bid, exenumber)
        if campaign_bot_id is not None:
            bot_concurrent_limit = get_bot_concurrent_limit(session_cluster, bid, campaign_bot_id)
            logger.info(
                "Bot concurrent limit campaign_id=%s bot_id=%s exenumber=%s limit=%s",
                campaign_id,
                campaign_bot_id,
                exenumber,
                bot_concurrent_limit,
            )

    # Informational only: log the starting concurrency figures.
    try:
        initial_connecting = _count_connecting(session_cluster, call_history_tbl, "dialstatus")
        logger.info("Initial CONNECTING count: %s", initial_connecting)
        if bot_concurrent_limit is not None and exenumber != "":
            ib = get_bot_connecting_count(session_cluster, call_history_tbl, exenumber)
            logger.info("Initial bot CONNECTING for %s: %s (limit %s)", exenumber, ib, bot_concurrent_limit)
    except Exception as e:  # noqa: BLE001
        logger.error("Error checking initial CONNECTING: %s", e)

    # ---- Dial loop -------------------------------------------------------------------
    is_paused = False
    for batch_index, batch in enumerate(batches):
        if _is_campaign_paused(session_cluster, campaigns_table_name, campaign_id):
            logger.info("Campaign %s paused at batch %s", campaign_id, batch_index + 1)
            is_paused = True
            break

        for index, (contact_id, number, contact_name) in enumerate(batch):
            if _is_campaign_paused(session_cluster, campaigns_table_name, campaign_id):
                logger.info("Campaign %s paused at contact", campaign_id)
                is_paused = True
                break

            # Business hours may close while a long run is in progress.
            btc = check_business_time_window(session_default, bid)
            if not btc["valid"]:
                logger.warning("Campaign %s blocked business time", campaign_id)
                session_cluster.commit()
                session_default.commit()
                return {
                    "success": False,
                    "error": btc["message"],
                    "error_code": "BUSINESS_TIME_BLOCKED",
                    "business_time": btc.get("business_time"),
                    "current_time": btc.get("current_time"),
                    "current_day": btc.get("current_day"),
                    "campaign_id": campaign_id,
                }

            global_index = batch_index * batch_size + index

            # Re-read the contact: its state may have changed since the initial SELECT.
            if contact_id and campaign_id:
                ct = session_cluster.execute(
                    text(
                        f"SELECT id, status, `retry` FROM `{table_name}` WHERE id = :id AND campaign_id = :cid LIMIT 1"
                    ),
                    {"id": contact_id, "cid": campaign_id},
                ).mappings().first()
                if ct:
                    if str(ct.get("status") or "").strip().upper() == "ANSWER":
                        logger.info("Skipping contact %s - ANSWER", contact_id)
                        continue
                    if int(ct.get("retry") or 0) <= 0:
                        logger.info("Skipping contact %s - no retries", contact_id)
                        continue

            if _wait_for_dial_capacity(
                session_cluster,
                campaigns_table_name,
                campaign_id,
                call_history_tbl,
                exenumber,
                bot_concurrent_limit,
                global_index + 1,
            ):
                is_paused = True
                break

            # Final pause check right before dialing (the wait above may have been long).
            if _is_campaign_paused(session_cluster, campaigns_table_name, campaign_id):
                is_paused = True
                break

            customer_name = contact_name if contact_id else ""
            try:
                refurl = build_mcube_refurl(settings)
            except ValueError as e:
                return {"success": False, "error": str(e), "campaign_id": campaign_id}
            data = {
                **base_data,
                "custnumber": number,
                "custname": customer_name,
                "refurl": refurl,
                "refid": campaign_id,
            }

            if not exenumber:
                errm = "Missing required field for call. exenumber: empty"
                logger.error(errm)
                errors.append(errm)
                _mark_contact_failed(session_cluster, table_name, contact_id, campaign_id)
                continue

            if global_index > 0:
                time.sleep(0.5)

            call_successful, http_code, response_text, curl_error = _post_call_request(
                settings.outbound_call_url, auth_token, data
            )

            if call_successful:
                try:
                    response_data = json.loads(response_text or "{}")
                except json.JSONDecodeError:
                    response_data = {}
                if not isinstance(response_data, dict):
                    # A JSON list/scalar has no fields to inspect; treat as an API error.
                    response_data = {}
                api_status = response_data.get("status")
                api_message = response_data.get("msg") or ""
                call_id = response_data.get("callid")

                if api_status == "succ" and call_id:
                    success_count += 1
                    call_ids.append(call_id)
                    logger.info("Call placed successfully for %s: %s", number, response_text)

                    retry_already_decremented = False
                    if not priority_only and contact_id and campaign_id:
                        try:
                            current_contact = session_cluster.execute(
                                text(
                                    f"SELECT `retry` FROM `{table_name}` WHERE id = :id AND campaign_id = :cid LIMIT 1"
                                ),
                                {"id": contact_id, "cid": campaign_id},
                            ).first()
                            old_retry = int(current_contact[0] or 0) if current_contact else 0
                            if old_retry > 0:
                                session_cluster.execute(
                                    text(
                                        f"UPDATE `{table_name}` SET `retry` = :nr, updated_at = :u "
                                        "WHERE id = :id AND campaign_id = :cid"
                                    ),
                                    {"nr": old_retry - 1, "u": now_app(), "id": contact_id, "cid": campaign_id},
                                )
                                retry_already_decremented = True
                        except Exception as e:  # noqa: BLE001
                            logger.error("Decrement retry failed: %s", e)

                    if contact_id and campaign_id:
                        try:
                            fetch_dial_status_and_update(
                                session_cluster,
                                call_id,
                                int(contact_id),
                                int(campaign_id),
                                bid,
                                table_name,
                                campaigns_table_name,
                                number,
                                call_history_tbl,
                                retry_already_decremented,
                            )
                        except Exception as e:  # noqa: BLE001
                            logger.warning("fetchCallStatusAndUpdate failed: %s", e)

                    logger.info("Waiting 5 seconds for call to be recorded...")
                    time.sleep(5)

                    if _wait_after_call(
                        session_cluster,
                        campaigns_table_name,
                        campaign_id,
                        call_history_tbl,
                        exenumber,
                        bot_concurrent_limit,
                    ):
                        is_paused = True
                        break
                else:
                    errm = f"Call {global_index} (Number: {number}) - API Error: {api_message} (Response: {response_text})"
                    errors.append(errm)
                    _mark_contact_failed(session_cluster, table_name, contact_id, campaign_id)
            elif curl_error:
                errors.append(f"Call {global_index} (Number: {number}) - cURL Error: {curl_error}")
                _mark_contact_failed(session_cluster, table_name, contact_id, campaign_id)
            else:
                preview = (response_text or "")[:200] or "empty response"
                errors.append(f"Call {global_index} (Number: {number}) - HTTP Error {http_code}: {preview}")
                if http_code and http_code >= 500:
                    logger.error("Full response failed call %s: %s", number, response_text)
                _mark_contact_failed(session_cluster, table_name, contact_id, campaign_id)

        if is_paused:
            break

        if success_count > 0:
            try:
                trigger_call_history_sync(bid, True)
            except Exception as e:  # noqa: BLE001
                logger.warning("trigger_call_history_sync failed: %s", e)

        if campaign_id:
            was_completed = campaign_core.check_and_mark_campaign_completed(
                session_cluster, session_default, bid, int(campaign_id)
            )
            if was_completed:
                logger.info("Campaign %s marked completed after batch %s", campaign_id, batch_index + 1)
                break

        if batch_index < total_batches - 1:
            logger.info("Batch %s done; waiting %s s", batch_index + 1, batch_interval)
            time.sleep(batch_interval)

    if campaign_id and not is_paused:
        campaign_core.check_and_mark_campaign_completed(
            session_cluster, session_default, bid, int(campaign_id)
        )

    if success_count > 0:
        return {
            "success": True,
            "message": f"Successfully initiated {success_count} out of {total_contacts} calls",
            "call_ids": call_ids,
            "success_count": success_count,
            "total_calls": total_contacts,
            "errors": errors,
        }
    return {"success": False, "error": "All calls failed", "errors": errors}
