from __future__ import annotations

import json
import logging
import time
from datetime import datetime, timedelta
from typing import Any

from sqlalchemy import text
from sqlalchemy.orm import Session

from app.config import get_settings
from app.repositories.schema_helper import has_column, has_table, table_columns
from app.services.audit_log import audit_trail
from app.utils.cluster_table_names import campaign_contacts_table, campaigns_table
from app.utils.rabbitmq_campaign import delete_queue_for_campaign
from app.utils.timezone_util import format_dt, now_app

logger = logging.getLogger(__name__)


def _parse_waiting_minutes_for_retry(waiting_time_raw: Any) -> int:
    """PHP-style: first numeric segment or cast of string."""
    if waiting_time_raw is None:
        return 0
    s = str(waiting_time_raw).strip()
    if s == "" or s == "0":
        return 0
    if "," in s:
        first = s.split(",")[0].strip()
        try:
            return int(float(first))
        except ValueError:
            return 0
    try:
        return int(float(s))
    except ValueError:
        return 0


def check_remaining_minutes(session: Session, business_id: int) -> dict[str, Any]:
    try:
        row = session.execute(
            text("SELECT remain_min FROM businesses WHERE business_id = :bid LIMIT 1"),
            {"bid": business_id},
        ).first()
        if not row:
            return {"valid": False, "message": "Business not found"}
        remain_min = row[0] if row[0] is not None else 0
        if remain_min <= 0:
            return {
                "valid": False,
                "message": "Insufficient remaining minutes. Please recharge or upgrade your plan to start campaigns.",
                "remain_min": remain_min,
            }
        return {"valid": True, "remain_min": remain_min}
    except Exception as e:  # noqa: BLE001
        logger.error("Error checking remaining minutes: %s", e)
        return {"valid": False, "message": "Error checking remaining minutes. Please try again."}


def check_business_time_window(session: Session, business_id: int) -> dict[str, Any]:
    msg = "Please check the business,if you wan to allow the campaign call edit the business time"
    try:
        row = session.execute(
            text("SELECT business_time FROM businesses WHERE business_id = :bid LIMIT 1"),
            {"bid": business_id},
        ).first()
        if not row:
            return {"valid": False, "message": msg}

        business_time = row[0]
        if isinstance(business_time, str):
            try:
                decoded = json.loads(business_time)
                if isinstance(decoded, dict):
                    business_time = decoded
            except json.JSONDecodeError:
                pass

        if not isinstance(business_time, dict):
            return {"valid": False, "message": msg}

        now = now_app()
        day_key = now.strftime("%A").lower()
        day_config = business_time.get(day_key)
        if not day_config or not isinstance(day_config, dict):
            return {
                "valid": False,
                "message": msg,
                "business_time": business_time,
                "current_day": day_key,
                "current_time": now.strftime("%H:%M"),
            }

        is_enabled = day_config.get("enabled", True)
        if is_enabled is False:
            return {
                "valid": False,
                "message": msg,
                "business_time": business_time,
                "current_day": day_key,
                "current_time": now.strftime("%H:%M"),
            }

        time_slots: list[dict[str, Any]] = []
        if day_config.get("timeSlots") and isinstance(day_config["timeSlots"], list) and len(day_config["timeSlots"]) > 0:
            time_slots = day_config["timeSlots"]
        elif day_config.get("start") and day_config.get("end"):
            time_slots = [{"start": day_config["start"], "end": day_config["end"]}]
        else:
            return {
                "valid": False,
                "message": msg,
                "business_time": business_time,
                "current_day": day_key,
                "current_time": now.strftime("%H:%M"),
            }

        is_within_any_slot = False
        for slot in time_slots:
            start = slot.get("start")
            end = slot.get("end")
            if not start or not end:
                continue
            try:
                st = datetime.strptime(str(start), "%H:%M").replace(
                    year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo
                )
                et = datetime.strptime(str(end), "%H:%M").replace(
                    year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo
                )
            except ValueError:
                continue
            if now >= st and now <= et:
                is_within_any_slot = True
                break

        if not is_within_any_slot:
            return {
                "valid": False,
                "message": msg,
                "business_time": business_time,
                "current_day": day_key,
                "current_time": now.strftime("%H:%M"),
            }

        return {
            "valid": True,
            "business_time": business_time,
            "current_day": day_key,
            "current_time": now.strftime("%H:%M"),
        }
    except Exception as e:  # noqa: BLE001
        logger.error("Error checking business time: %s", e, extra={"business_id": business_id})
        return {"valid": False, "message": msg}


def get_bot_id_from_campaign_did(session: Session, business_id: int, exenumber: str) -> int | None:
    if exenumber == "":
        return None
    try:
        if not has_table(session, "did_numbers"):
            return None
        cols = table_columns(session, "did_numbers")
        has_bot_id = "bot_id" in cols
        has_exe_number = "exe_number" in cols
        if not has_bot_id:
            return None
        if has_exe_number:
            q = text(
                """
                SELECT bot_id FROM did_numbers
                WHERE business_id = :bid AND (did_no = :ex OR exe_number = :ex)
                LIMIT 1
                """
            )
        else:
            q = text(
                """
                SELECT bot_id FROM did_numbers
                WHERE business_id = :bid AND did_no = :ex
                LIMIT 1
                """
            )
        row = session.execute(q, {"bid": business_id, "ex": exenumber}).first()
        if not row or row[0] in (None, ""):
            return None
        try:
            return int(row[0])
        except (TypeError, ValueError):
            return None
    except Exception as e:  # noqa: BLE001
        logger.debug("getBotIdFromCampaignDid failed: %s", e)
        return None


def get_bot_concurrent_limit(session_cluster: Session, business_id: int, bot_id: int | None) -> int | None:
    if bot_id is None or bot_id == "":
        return None
    bots_table = f"{business_id}_bots"
    try:
        if not has_table(session_cluster, bots_table):
            return None
        row = session_cluster.execute(
            text(f"SELECT concurrent_calls FROM `{bots_table}` WHERE bot_id = :bid LIMIT 1"),
            {"bid": bot_id},
        ).first()
        if not row:
            return None
        val = row[0]
        if has_column(session_cluster, bots_table, "concurrent_calls"):
            if isinstance(val, (int, float)) and int(val) > 0:
                return int(val)
        if isinstance(val, str):
            try:
                decoded = json.loads(val)
                limit = decoded.get("limit") or decoded.get("concurrent_calls_limit")
                if limit is not None and int(limit) > 0:
                    return int(limit)
            except json.JSONDecodeError:
                pass
        return None
    except Exception as e:  # noqa: BLE001
        logger.warning("getBotConcurrentLimit failed: %s", e)
        return None


def get_bot_connecting_count(session_cluster: Session, call_history_table: str, exenumber: str) -> int:
    if exenumber == "":
        return 0
    try:
        n = session_cluster.execute(
            text(
                f"SELECT COUNT(*) FROM `{call_history_table}` WHERE call_status IN ('CONNECTING') AND clicktocalldid = :ex"
            ),
            {"ex": exenumber},
        ).scalar_one()
        return int(n)
    except Exception:
        try:
            n = session_cluster.execute(
                text(
                    f"SELECT COUNT(*) FROM `{call_history_table}` WHERE call_status IN ('CONNECTING') AND exenumber = :ex"
                ),
                {"ex": exenumber},
            ).scalar_one()
            return int(n)
        except Exception as e2:  # noqa: BLE001
            logger.debug("getBotConnectingCount failed: %s", e2)
            return 0


def resolve_business_id_for_campaign(
    session_default: Session, session_cluster: Session, campaign_id: int
) -> int | None:
    if has_table(session_default, "scheduled_campaigns"):
        row = session_default.execute(
            text("SELECT business_id FROM scheduled_campaigns WHERE campaign_id = :cid LIMIT 1"),
            {"cid": campaign_id},
        ).first()
        if row and row[0]:
            return int(row[0])

    rows = session_default.execute(
        text("SELECT business_id FROM businesses WHERE status = 'active'")
    ).fetchall()
    for (bid,) in rows:
        tname = campaigns_table(bid)
        if has_table(session_cluster, tname):
            ex = session_cluster.execute(
                text(f"SELECT 1 FROM `{tname}` WHERE campaign_id = :cid LIMIT 1"),
                {"cid": campaign_id},
            ).first()
            if ex:
                return int(bid)
    return None


def compute_waiting_time_for_new_campaign(
    session_default: Session,
    business_id: int,
    retry_logic: str,
    custom_attempt_values: list[Any] | None,
) -> str:
    """Matches CampaignController::store waiting_time / retry_logic branches."""
    waiting_time_values = ""
    if retry_logic == "business_logic":
        try:
            row = session_default.execute(
                text("SELECT retry_attempt FROM businesses WHERE business_id = :bid LIMIT 1"),
                {"bid": business_id},
            ).first()
            ra = row[0] if row else None
            data = ra
            if isinstance(data, str):
                data = json.loads(data)
            if isinstance(data, dict) and data:
                values: list[int] = []
                for _k, v in data.items():
                    if isinstance(v, (int, float)) or (isinstance(v, str) and v.replace(".", "", 1).isdigit()):
                        values.append(int(float(v)))
                if values:
                    waiting_time_values = ",".join(str(x) for x in values)
        except Exception:
            waiting_time_values = ""
    elif retry_logic == "custom_logic":
        custom_values = custom_attempt_values or []
        if isinstance(custom_values, list) and custom_values:
            minutes: list[int] = []
            for v in custom_values:
                try:
                    n = float(v) if v is not None else 0.0
                except (TypeError, ValueError):
                    n = 0.0
                minutes.append(int(round(n * 60)))
            waiting_time_values = ",".join(str(m) for m in minutes if m >= 0)
    elif retry_logic == "not_retry":
        waiting_time_values = ""
    return waiting_time_values


def fetch_executive_number(
    session_default: Session, business_id: int, did_no: str | None, bot_name: str | None
) -> str:
    if not did_no:
        return ""
    try:
        cols = table_columns(session_default, "did_numbers")
        has_direction = "direction" in cols
        has_exe_number = "exe_number" in cols
        sql = [
            "SELECT did_no",
        ]
        if has_exe_number:
            sql.append(", exe_number")
        sql.append(" FROM did_numbers WHERE business_id = :bid AND did_no = :did")
        params: dict[str, Any] = {"bid": business_id, "did": did_no}
        if bot_name:
            sql.append(" AND bot_id = :bot")
            params["bot"] = bot_name
        if has_direction:
            sql.append(" AND direction = 'outbound'")
        sql.append(" LIMIT 1")
        row = session_default.execute(text(" ".join(sql)), params).first()
        if row:
            if has_exe_number and len(row) > 1 and row[1] not in (None, ""):
                return str(row[1])
            return str(row[0])
        return did_no
    except Exception as e:  # noqa: BLE001
        logger.error("Error fetching executive number: %s", e)
        return did_no


def fetch_dial_status_and_update(
    session_cluster: Session,
    call_id: Any,
    contact_id: int,
    campaign_id: int,
    business_id: int,
    table_name: str,
    campaigns_table_name: str,
    number: str,
    call_history_table: str,
    retry_already_decremented: bool = False,
) -> dict[str, Any]:
    _ = retry_already_decremented  # PHP param; decrement happens before this call in outbound flow
    try:
        time.sleep(3)
        call_id_str = str(call_id)
        row = session_cluster.execute(
            text(f"SELECT callid, call_status FROM `{call_history_table}` WHERE callid = :cid LIMIT 1"),
            {"cid": call_id_str},
        ).first()
        if not row:
            row = session_cluster.execute(
                text(f"SELECT callid, call_status FROM `{call_history_table}` WHERE callid = :cid LIMIT 1"),
                {"cid": call_id},
            ).first()

        dial_status = None
        if row and row[1] is not None:
            dial_status = str(row[1]).strip()

        update_data: dict[str, Any] = {"call_id": call_id, "updated_at": now_app()}
        if has_column(session_cluster, table_name, "call_status") and dial_status is not None:
            update_data["call_status"] = dial_status

        if dial_status is not None:
            if dial_status.upper() == "ANSWER":
                update_data["status"] = "ANSWER"
                logger.info("Call answered for contact %s (Number: %s) - no retry needed", contact_id, number)
            else:
                contact = session_cluster.execute(
                    text(f"SELECT id, `retry`, status FROM `{table_name}` WHERE id = :id AND campaign_id = :cid LIMIT 1"),
                    {"id": contact_id, "cid": campaign_id},
                ).mappings().first()
                campaign = session_cluster.execute(
                    text(
                        f"SELECT campaign_id, waiting_time FROM `{campaigns_table_name}` WHERE campaign_id = :cid LIMIT 1"
                    ),
                    {"cid": campaign_id},
                ).mappings().first()

                current_retry = int(contact["retry"] or 0) if contact else 0
                waiting_time_raw = campaign["waiting_time"] if campaign else 0
                waiting_minutes = _parse_waiting_minutes_for_retry(waiting_time_raw)

                update_data["status"] = dial_status

                if current_retry > 0:
                    if has_column(session_cluster, table_name, "retry_at"):
                        if waiting_minutes > 0:
                            retry_at = now_app() + timedelta(minutes=waiting_minutes)
                            update_data["retry_at"] = format_dt(retry_at)
                        else:
                            update_data["retry_at"] = format_dt(now_app())
                else:
                    if has_column(session_cluster, table_name, "retry_at"):
                        update_data["retry_at"] = None
                    logger.info(
                        "No retries remaining for contact %s (Number: %s)",
                        contact_id,
                        number,
                    )
        else:
            update_data["status"] = "pending"

        if has_column(session_cluster, table_name, "priority"):
            update_data["priority"] = 0

        sets = ", ".join(f"`{k}` = :{k}" for k in update_data.keys())
        params = {**update_data, "id": contact_id, "cid": campaign_id}
        res = session_cluster.execute(
            text(f"UPDATE `{table_name}` SET {sets} WHERE id = :id AND campaign_id = :cid"),
            params,
        )
        logger.info(
            "Updated contact %s with call_status from mcube rows=%s",
            contact_id,
            res.rowcount,
        )

        if str(update_data.get("status", "")).upper() == "ANSWER":
            try:
                check_and_mark_campaign_completed(session_cluster, session_default, business_id, campaign_id)
            except Exception as e:  # noqa: BLE001
                logger.warning("Failed to check campaign completion after ANSWER: %s", e)

        return {"success": True, "call_status": dial_status, "status": update_data.get("status"), "updated": True}
    except Exception as e:  # noqa: BLE001
        logger.error("Error fetching call_status from mcube for call %s: %s", call_id, e)
        try:
            fb = {"status": "pending", "call_id": call_id, "updated_at": now_app()}
            if has_column(session_cluster, table_name, "priority"):
                fb["priority"] = 0
            sets = ", ".join(f"`{k}` = :{k}" for k in fb.keys())
            session_cluster.execute(
                text(f"UPDATE `{table_name}` SET {sets} WHERE id = :id AND campaign_id = :cid"),
                {**fb, "id": contact_id, "cid": campaign_id},
            )
        except Exception as e2:  # noqa: BLE001
            logger.error("Failed fallback contact update: %s", e2)
        return {"success": False, "error": str(e)}


def check_and_mark_campaign_completed(
    session_cluster: Session,
    session_default: Session,
    business_id: int,
    campaign_id: int,
) -> bool:
    try:
        table_name = campaign_contacts_table(business_id)
        campaigns_table_name = campaigns_table(business_id)

        campaign = session_cluster.execute(
            text(f"SELECT * FROM `{campaigns_table_name}` WHERE campaign_id = :cid LIMIT 1"),
            {"cid": campaign_id},
        ).mappings().first()
        if not campaign:
            return False

        current_status = campaign.get("status") or "unknown"

        if current_status == "completed":
            delete_queue_for_campaign(business_id, campaign_id)
            return False

        total_contacts = session_cluster.execute(
            text(f"SELECT COUNT(*) FROM `{table_name}` WHERE campaign_id = :cid"),
            {"cid": campaign_id},
        ).scalar_one()
        total_contacts = int(total_contacts)
        if total_contacts == 0:
            return False

        waiting_raw = campaign.get("waiting_time")
        wt_trimmed = str(waiting_raw).strip() if waiting_raw is not None else ""
        if wt_trimmed == "" or wt_trimmed == "0":
            session_cluster.execute(
                text(f"UPDATE `{campaigns_table_name}` SET status = 'completed', updated_at = :u WHERE campaign_id = :cid"),
                {"u": now_app(), "cid": campaign_id},
            )
            delete_queue_for_campaign(business_id, campaign_id)
            logger.info(
                "Campaign %s marked completed (waiting_time empty)",
                campaign_id,
            )
            return True

        answer_contacts = session_cluster.execute(
            text(
                f"SELECT COUNT(*) FROM `{table_name}` WHERE campaign_id = :cid "
                "AND UPPER(TRIM(COALESCE(status, ''))) = 'ANSWER'"
            ),
            {"cid": campaign_id},
        ).scalar_one()
        answer_contacts = int(answer_contacts)
        if answer_contacts == total_contacts and total_contacts > 0:
            session_cluster.execute(
                text(f"UPDATE `{campaigns_table_name}` SET status = 'completed', updated_at = :u WHERE campaign_id = :cid"),
                {"u": now_app(), "cid": campaign_id},
            )
            delete_queue_for_campaign(business_id, campaign_id)
            logger.info("Campaign %s marked completed (all ANSWER)", campaign_id)
            return True

        not_attempted = session_cluster.execute(
            text(f"SELECT COUNT(*) FROM `{table_name}` WHERE campaign_id = :cid AND call_id IS NULL"),
            {"cid": campaign_id},
        ).scalar_one()
        attempted = session_cluster.execute(
            text(f"SELECT COUNT(*) FROM `{table_name}` WHERE campaign_id = :cid AND call_id IS NOT NULL"),
            {"cid": campaign_id},
        ).scalar_one()
        not_attempted = int(not_attempted)
        attempted = int(attempted)
        all_attempted = not_attempted == 0 and total_contacts > 0 and attempted == total_contacts

        logger.info(
            "checkAndMarkCampaignCompleted campaign=%s total=%s attempted=%s not_attempted=%s all_attempted=%s",
            campaign_id,
            total_contacts,
            attempted,
            not_attempted,
            all_attempted,
        )

        if all_attempted:
            is_run_now = int(campaign.get("run_now") or 0) == 0
            cur_wt = campaign.get("waiting_time")
            cur_trim = str(cur_wt).strip() if cur_wt is not None else ""
            waiting_time_already_set = cur_trim != "" and cur_trim != "0"

            if current_status != "completed":
                if is_run_now and not waiting_time_already_set:
                    logger.info("Run_now first pass done - updating waiting_time/retry")
                    update_waiting_time_and_retry_after_completion(
                        session_cluster, session_default, business_id, campaign_id
                    )
                elif is_run_now and waiting_time_already_set:
                    logger.debug("Campaign %s already has waiting_time set", campaign_id)

                session_cluster.execute(
                    text(f"UPDATE `{campaigns_table_name}` SET status = 'active', updated_at = :u WHERE campaign_id = :cid"),
                    {"u": now_app(), "cid": campaign_id},
                )
                logger.info("Campaign %s set active (first pass done)", campaign_id)
                return True
            else:
                if is_run_now:
                    if cur_trim == "":
                        logger.info("Campaign completed but waiting_time empty, updating...")
                        update_waiting_time_and_retry_after_completion(
                            session_cluster, session_default, business_id, campaign_id
                        )
                return False

        logger.info("Campaign not ready for completion campaign=%s", campaign_id)
        return False
    except Exception as e:  # noqa: BLE001
        logger.error("checkAndMarkCampaignCompleted error: %s", e)
        return False


def update_waiting_time_after_retry(
    session_cluster: Session,
    campaign_id: int,
    business_id: int,
    result: dict[str, Any],
) -> None:
    campaigns_table_name = campaigns_table(business_id)
    if not has_table(session_cluster, campaigns_table_name):
        return
    campaign = session_cluster.execute(
        text(f"SELECT * FROM `{campaigns_table_name}` WHERE campaign_id = :cid LIMIT 1"),
        {"cid": campaign_id},
    ).mappings().first()
    if not campaign:
        return
    if not (result.get("success") is True):
        return

    waiting_raw = campaign.get("waiting_time") or "0"
    if isinstance(waiting_raw, str) and "," in waiting_raw:
        arr = [x.strip() for x in waiting_raw.split(",") if x.strip() and x.strip().replace(".", "", 1).isdigit()]
        waiting_arr = [int(float(x)) for x in arr]
    else:
        single = int(float(waiting_raw)) if str(waiting_raw).replace(".", "", 1).isdigit() else 0
        waiting_arr = [single] if single > 0 else []

    if not waiting_arr:
        return
    waiting_arr.pop(0)
    updated_waiting = ",".join(str(x) for x in waiting_arr) if waiting_arr else ""
    payload = {"waiting_time": updated_waiting}
    if updated_waiting == "":
        payload["status"] = "completed"
        payload["updated_at"] = now_app()
    sets = ", ".join(f"`{k}` = :{k}" for k in payload.keys())
    session_cluster.execute(
        text(f"UPDATE `{campaigns_table_name}` SET {sets} WHERE campaign_id = :cid"),
        {**payload, "cid": campaign_id},
    )
    if updated_waiting == "":
        delete_queue_for_campaign(business_id, campaign_id)


def update_waiting_time_and_retry_for_scheduled_campaign(
    session_cluster: Session, session_default: Session, business_id: int, campaign_id: int
) -> None:
    table_name = campaigns_table(business_id)
    contacts_table = campaign_contacts_table(business_id)
    try:
        campaign = session_cluster.execute(
            text(f"SELECT * FROM `{table_name}` WHERE campaign_id = :cid LIMIT 1"),
            {"cid": campaign_id},
        ).mappings().first()
        if not campaign:
            return
        if int(campaign.get("run_now") or 0) != 1:
            return

        waiting_time_values = ""
        retry_attempt_count = 0
        try:
            row = session_default.execute(
                text("SELECT retry_attempt FROM businesses WHERE business_id = :bid LIMIT 1"),
                {"bid": business_id},
            ).first()
            retry_attempt = row[0] if row else None
            data = retry_attempt
            if isinstance(data, str):
                data = json.loads(data)
            if isinstance(data, dict) and data:
                values = []
                for _k, v in data.items():
                    if isinstance(v, (int, float)) or (isinstance(v, str) and v.replace(".", "", 1).isdigit()):
                        values.append(int(float(v)))
                if values:
                    waiting_time_values = ",".join(str(x) for x in values)
                    retry_attempt_count = len(values) + 1
        except Exception as e:  # noqa: BLE001
            logger.error("retry_attempt fetch error: %s", e)

        if waiting_time_values:
            session_cluster.execute(
                text(f"UPDATE `{table_name}` SET waiting_time = :wt, updated_at = :u WHERE campaign_id = :cid"),
                {"wt": waiting_time_values, "u": now_app(), "cid": campaign_id},
            )

        if has_table(session_cluster, contacts_table):
            session_cluster.execute(
                text(
                    f"UPDATE `{contacts_table}` SET `retry` = :r, updated_at = :u "
                    "WHERE campaign_id = :cid AND business_id = :bid"
                ),
                {"r": retry_attempt_count, "u": now_app(), "cid": campaign_id, "bid": business_id},
            )
    except Exception as e:  # noqa: BLE001
        logger.error("update_waiting_time_and_retry_for_scheduled_campaign: %s", e)


def update_waiting_time_and_retry_after_completion(
    session_cluster: Session, session_default: Session, business_id: int, campaign_id: int
) -> None:
    table_name = campaigns_table(business_id)
    contacts_table = campaign_contacts_table(business_id)
    try:
        campaign = session_cluster.execute(
            text(f"SELECT * FROM `{table_name}` WHERE campaign_id = :cid LIMIT 1"),
            {"cid": campaign_id},
        ).mappings().first()
        if not campaign:
            return
        if int(campaign.get("run_now") or 0) == 1:
            return

        waiting_time_values = ""
        retry_attempt_count = 0
        try:
            row = session_default.execute(
                text("SELECT retry_attempt FROM businesses WHERE business_id = :bid LIMIT 1"),
                {"bid": business_id},
            ).first()
            retry_attempt = row[0] if row else None
            data = retry_attempt
            if isinstance(data, str):
                data = json.loads(data)
            if isinstance(data, dict) and data:
                values = []
                for _k, v in data.items():
                    if isinstance(v, (int, float)) or (isinstance(v, str) and v.replace(".", "", 1).isdigit()):
                        values.append(int(float(v)))
                if values:
                    waiting_time_values = ",".join(str(x) for x in values)
                    retry_attempt_count = len(values) + 1
        except Exception as e:  # noqa: BLE001
            logger.error("retry_attempt fetch error: %s", e)

        session_cluster.execute(
            text(f"UPDATE `{table_name}` SET waiting_time = :wt, updated_at = :u WHERE campaign_id = :cid"),
            {"wt": waiting_time_values, "u": now_app(), "cid": campaign_id},
        )

        if has_table(session_cluster, contacts_table):
            session_cluster.execute(
                text(
                    f"UPDATE `{contacts_table}` SET `retry` = :r, updated_at = :u "
                    "WHERE campaign_id = :cid AND business_id = :bid"
                ),
                {"r": retry_attempt_count, "u": now_app(), "cid": campaign_id, "bid": business_id},
            )
    except Exception as e:  # noqa: BLE001
        logger.error("update_waiting_time_and_retry_after_completion: %s", e)


def store_campaign_contacts(
    session_cluster: Session,
    session_default: Session,
    business_id: int,
    campaign_id: int,
    contacts: list[dict[str, Any]],
) -> dict[str, Any]:
    table_name = campaign_contacts_table(business_id)
    campaigns_table_name = campaigns_table(business_id)
    if not has_table(session_cluster, table_name):
        raise RuntimeError(f"Campaign contacts table {table_name} does not exist")

    campaign = session_cluster.execute(
        text(f"SELECT * FROM `{campaigns_table_name}` WHERE campaign_id = :cid LIMIT 1"),
        {"cid": campaign_id},
    ).mappings().first()

    retry_value = 0
    is_run_now = False
    is_not_retry_campaign = False

    if campaign:
        is_run_now = int(campaign.get("run_now") or 0) == 0
        campaign_retry_logic = campaign.get("retry_logic") or ""

        if campaign_retry_logic == "not_retry":
            retry_value = 1
            is_not_retry_campaign = True
        elif is_run_now:
            wt = campaign.get("waiting_time")
            if wt is not None and str(wt).strip() != "":
                waiting_arr = [x.strip() for x in str(wt).split(",") if x.strip() and x.strip().replace(".", "", 1).isdigit()]
                retry_value = len(waiting_arr) + 1 if waiting_arr else 1
            else:
                if campaign_retry_logic == "not_retry" or campaign_retry_logic != "business_logic":
                    retry_value = 1
                    is_not_retry_campaign = True
                    logger.info("storeCampaignContacts - not_retry or empty waiting_time, retry=1")
                else:
                    try:
                        row = session_default.execute(
                            text("SELECT retry_attempt FROM businesses WHERE business_id = :bid LIMIT 1"),
                            {"bid": business_id},
                        ).first()
                        ra = row[0] if row else None
                        data = ra
                        if isinstance(data, str):
                            data = json.loads(data)
                        if isinstance(data, dict) and data:
                            values = []
                            for _k, v in data.items():
                                if isinstance(v, (int, float)) or (isinstance(v, str) and v.replace(".", "", 1).isdigit()):
                                    values.append(int(float(v)))
                            retry_value = len(values) if values else 1
                        else:
                            retry_value = 1
                    except Exception as e:  # noqa: BLE001
                        retry_value = 0
                        logger.warning("storeCampaignContacts business retry error: %s", e)
        else:
            logger.info("Scheduled campaign, retry set when starts")

    contact_rows = []
    for c in contacts:
        name = c.get("name") or ""
        number = c.get("contact") or c.get("phone_number") or ""
        final_retry = retry_value if is_run_now else 0
        contact_rows.append(
            {
                "business_id": business_id,
                "campaign_id": campaign_id,
                "name": name,
                "number": number,
                "source": c.get("source") or "",
                "status": "pending",
                "retry": final_retry,
                "created_at": now_app(),
                "updated_at": now_app(),
            }
        )

    cols = ["business_id", "campaign_id", "name", "number", "source", "status", "retry", "created_at", "updated_at"]
    placeholders = ", ".join(f":{c}" for c in cols)
    collist = ", ".join(f"`{c}`" for c in cols)
    for row in contact_rows:
        session_cluster.execute(
            text(f"INSERT INTO `{table_name}` ({collist}) VALUES ({placeholders})"),
            row,
        )

    if is_run_now and campaign and not is_not_retry_campaign:
        if retry_value <= 0 and campaign:
            wt = campaign.get("waiting_time")
            if wt is not None and str(wt).strip() != "":
                waiting_arr = [x.strip() for x in str(wt).split(",") if x.strip() and x.strip().replace(".", "", 1).isdigit()]
                retry_value = len(waiting_arr) if waiting_arr else 1
            else:
                try:
                    row = session_default.execute(
                        text("SELECT retry_attempt FROM businesses WHERE business_id = :bid LIMIT 1"),
                        {"bid": business_id},
                    ).first()
                    ra = row[0] if row else None
                    data = ra
                    if isinstance(data, str):
                        data = json.loads(data)
                    if isinstance(data, dict) and data:
                        values = []
                        for _k, v in data.items():
                            if isinstance(v, (int, float)) or (isinstance(v, str) and v.replace(".", "", 1).isdigit()):
                                values.append(int(float(v)))
                        retry_value = len(values) if values else 1
                    else:
                        retry_value = 1
                except Exception:
                    retry_value = 1

        if retry_value > 0:
            session_cluster.execute(
                text(
                    f"UPDATE `{table_name}` SET `retry` = :r, updated_at = :u "
                    "WHERE campaign_id = :cid AND business_id = :bid AND (`retry` != :r OR `retry` IS NULL OR `retry` = 0)"
                ),
                {"r": retry_value, "u": now_app(), "cid": campaign_id, "bid": business_id},
            )
        else:
            logger.warning("storeCampaignContacts could not determine retry_value")

    inserted = session_cluster.execute(
        text(
            f"SELECT id, number, status, name, campaign_id, business_id FROM `{table_name}` "
            "WHERE campaign_id = :cid AND business_id = :bid ORDER BY id DESC LIMIT :lim"
        ),
        {"cid": campaign_id, "bid": business_id, "lim": len(contact_rows)},
    ).mappings().all()

    return {"success": True, "inserted_count": len(contact_rows), "contacts": list(inserted)}
