# LiveKit Multi-Provider Campaign & Click-to-Call Platform — Base Architecture

## 1) Purpose and guiding principles

### Primary goals
- Run outbound voice campaigns (single + bulk) with AI voice bots built on LiveKit Agents.
- Support multiple telephony providers (MCube, Exotel, Twilio) behind a single abstraction layer.
- Provide production-grade scheduling, retry, callbacks, concurrency control, idempotent webhooks, and fast analytics.
- Support click-to-call and ongoing campaigns with the same call lifecycle state machine.

### Non-negotiable constraints (from requirements)
- Multi-tenant isolation: every DB query must scope by `tenant_id` (enforced via a shared tenant dependency and careful model boundaries).
- Async-first: all database operations are async using SQLAlchemy asyncio with an async driver (asyncpg).
- Credentials encryption at rest: `telephony_integrations.credentials` is encrypted using `cryptography.fernet` with `ENCRYPTION_KEY`.
- Idempotent webhooks: use Redis `SET NX EX` with key `webhook_processed:{call_sid}:{event}` before processing provider webhooks.
- No blocking calls: all I/O uses `await`; CPU-heavy work (e.g., CSV parsing) goes through `asyncio.to_thread`.
- RabbitMQ durability: exchanges, queues, and messages are durable/persistent; dead-lettering is used for failure management.
- Fast analytics: never query `campaign_contacts` directly inside analytics endpoints; use Redis cache first, then a materialized view fallback.

## 2) High-level architecture

### Core services
1. **API Service (FastAPI)**
   - Auth + tenant resolution (JWT -> `tenant_id`)
   - Campaign CRUD + campaign start/pause/resume/stop
   - Click-to-call endpoint (immediate initiation)
   - CSV contact upload endpoints (enqueue/stream progress)
   - Analytics endpoints (read from Redis/materialized view)
   - Webhook endpoints (one router per provider)

2. **Worker Service (RabbitMQ consumer)**
   - Consumes call initiation messages (click-to-call/campaign/bulk)
   - Applies business hours logic, concurrency logic, and provider initiation
   - Updates `campaign_contacts` and `call_logs` at dialing time
   - On failures, schedules retry via delayed messaging

3. **Scheduler Service (APScheduler or equivalent)**
   - Enforces business hours at the campaign level (pause/resume)
   - Fires due callbacks
   - Refreshes `campaign_stats` materialized view
   - Starts scheduled campaigns when `scheduled_at <= now`

4. **LiveKit Bridge Dispatch**
   - Provides/hosts the `answer_url` and maps each telephony call to a LiveKit room/agent dispatch.
   - Bridges provider call lifecycle events to LiveKit agent execution and bot behavior.

5. **Provider Adapter Layer**
   - `TelephonyProvider` interface
   - Concrete implementations for MCube, Exotel, Twilio
   - Provider webhook parsing + voicemail detection + call status normalization

### Data stores
- **PostgreSQL**
  - Tenant-isolated relational model in a single shared database; `tenant_id` is the isolation boundary.
  - Encrypted credentials at rest.
  - Materialized view for aggregated stats.
- **Redis**
  - Webhook idempotency keys
  - Campaign/call concurrency counters
  - Cached analytics snapshots
  - Callback due queue (sorted set)
  - CSV upload progress + blocked phone sets
- **RabbitMQ**
  - Durable messaging for call initiation, retries, callbacks
  - Dead-letter exchanges for failed attempts

## 3) Canonical call lifecycle state model

### Campaign-level statuses
- `draft`: campaign configuration exists; no calls running.
- `scheduled`: calls will start at `scheduled_at`.
- `running`: calls are being initiated and managed.
- `paused`: calls are not being initiated; pending contacts remain queued for later resume.
- `completed`: all intended contacts reached a terminal status.
- `failed`: campaign execution aborted due to unrecoverable errors (rare; separate from per-contact failures).

### Contact-level statuses (campaign_contacts.status)
- `pending`: ready to be dialed.
- `dialing`: worker has requested the provider; call_sid is stored in `call_logs` and/or `campaign_contacts`.
- `answered`: provider webhook confirmed answered and call is associated.
- `not_answered`: provider webhook indicates no answer (or max retries reached for that reason).
- `failed`: provider webhook indicates call failed, or max retries reached for generic failures.
- `voicemail`: provider webhook indicates voicemail detection or max retries reached with voicemail reason.
- `blocked`: phone number is blocked (blocked list / policy).
- `callback_scheduled`: call ended; a future callback time is scheduled.
- `completed`: the successful terminal state for reporting (call answered and finished); whether `answered` maps here depends on the product definition.

### Call logs (call_logs)
`call_logs` is the authoritative per-call record:
- unique `call_sid` (enforced uniqueness)
- provider, direction, phone, status
- recording_url, started_at, ended_at
- linked `tenant_id`, `campaign_id`, `contact_id`, and `livekit_room`

## 4) Data model architecture (SQLAlchemy + asyncpg)

### Multi-tenant strategy
- Every table includes `tenant_id` or is reachable by foreign keys from a tenant-scoped table.
- Enforce tenant filtering in:
  - repositories/queries
  - API dependencies
  - webhook processing routes

### Encrypted credentials
- `telephony_integrations.credentials` is stored encrypted.
- Runtime:
  - decrypt credentials in memory when building provider adapter
  - never log decrypted secrets

### Table overview and responsibilities
1. `tenants`
   - Tenant identity + timezone (used for business hours defaults).
2. `telephony_integrations`
   - Which provider is enabled for a tenant and holds credentials.
3. `contact_lists` and `contacts`
   - Reusable phone lists and normalized phone storage in E.164.
4. `campaigns`
   - Bot configuration knobs + references to LiveKit agent id.
5. `campaign_contacts`
   - Dialing lifecycle state per campaign+contact, including attempt counts and retry scheduling.
6. `call_logs`
   - Provider-level call tracking with unique `call_sid`.
7. `business_hours`, `holidays`
   - Tenant-specific allowed calling windows.
8. `sms_templates`
   - Tenant SMS templates (body with placeholder support).

### Stats materialized view: `campaign_stats`
- Must aggregate at least:
  - total_contacts
  - responded_users
  - attempted_calls
  - failed_calls
  - not_answered
  - blocked_calls
  - voicemail_on_hold
- Grouped by `campaign_id`.

### Indexing and performance expectations (design-level)
- Ensure indexes exist for common query patterns:
  - `campaign_contacts(campaign_id, status)`
  - `call_logs(call_sid)` unique
  - `contacts(tenant_id, phone)` and `contacts(tenant_id, list_id)`
  - `campaigns(tenant_id, status, scheduled_at)`
  - `telephony_integrations(tenant_id, provider, is_active)`

## 5) Provider abstraction layer

### Base interface contract
`TelephonyProvider` must provide:
- `initiate_call(to, callback_url, answer_url, caller_id, metadata) -> CallResult`
- `send_sms(to, body, from_) -> SMSResult`
- `get_call_status(call_sid) -> dict`
- `parse_webhook(payload) -> dict` with normalized fields:
  - `{call_sid, status, duration, answered_by}`
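
The contract above could be expressed as an abstract base class. This is a minimal sketch, not the definitive interface: the fields of `CallResult` and `SMSResult` and the `FakeProvider` test double are assumptions, since the source names only the types and method signatures.

```python
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class CallResult:
    # Field names are assumptions; the source only names the type.
    call_sid: str
    status: str


@dataclass(frozen=True)
class SMSResult:
    # Field names are assumptions as well.
    message_sid: str
    status: str


class TelephonyProvider(ABC):
    """Contract that every provider adapter implements."""

    @abstractmethod
    async def initiate_call(self, to: str, callback_url: str, answer_url: str,
                            caller_id: str, metadata: dict[str, Any]) -> CallResult: ...

    @abstractmethod
    async def send_sms(self, to: str, body: str, from_: str) -> SMSResult: ...

    @abstractmethod
    async def get_call_status(self, call_sid: str) -> dict[str, Any]: ...

    @abstractmethod
    def parse_webhook(self, payload: dict[str, Any]) -> dict[str, Any]:
        """Return the normalized dict {call_sid, status, duration, answered_by}."""


class FakeProvider(TelephonyProvider):
    """Hypothetical in-memory adapter, intended for tests only."""

    async def initiate_call(self, to, callback_url, answer_url, caller_id, metadata):
        return CallResult(call_sid=f"fake-{to}", status="queued")

    async def send_sms(self, to, body, from_):
        return SMSResult(message_sid=f"sms-{to}", status="sent")

    async def get_call_status(self, call_sid):
        return {"call_sid": call_sid, "status": "queued"}

    def parse_webhook(self, payload):
        # The payload keys ("sid", "duration", ...) are invented for this fake.
        return {
            "call_sid": payload["sid"],
            "status": payload.get("status", "failed"),
            "duration": int(payload.get("duration", 0)),
            "answered_by": payload.get("answered_by"),
        }
```

A fake adapter like this lets the worker and webhook paths be exercised without contacting a real provider.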

### Factory
`get_provider(integration_row)`:
- decrypt credentials
- return correct provider adapter

### Webhook normalization mapping (canonical fields)
The adapter converts provider-specific fields into:
- `call_sid`: unique provider call identifier
- `status`: normalized call state (answered, no-answer, failed, busy, etc.)
- `duration`: seconds (if available)
- `answered_by`: provider-dependent voice classification mapped to:
  - voicemail/machine vs human

### Voicemail detection policy (common rule)
When the webhook is parsed:
- MCube: `answered_by=machine` => voicemail
- Exotel: `CallType=IVR` => voicemail
- Twilio: `AnsweredBy in [machine_start, fax]` => voicemail
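
As a rough illustration, the three rules above can live in one helper that each adapter's `parse_webhook` calls. The function name `is_voicemail` and the lowercase provider keys are assumptions; the field names and values are the ones listed above.

```python
from __future__ import annotations

from typing import Any, Mapping

# Values taken from the rule above. If Twilio's message-end detection is
# enabled, additional machine_end_* values may need to be added here.
_TWILIO_MACHINE_VALUES = frozenset({"machine_start", "fax"})


def is_voicemail(provider: str, payload: Mapping[str, Any]) -> bool:
    """Apply the per-provider voicemail rule to a raw webhook payload."""
    provider = provider.lower()
    if provider == "mcube":
        return payload.get("answered_by") == "machine"
    if provider == "exotel":
        return payload.get("CallType") == "IVR"
    if provider == "twilio":
        return payload.get("AnsweredBy") in _TWILIO_MACHINE_VALUES
    raise ValueError(f"unknown provider: {provider!r}")
```

Raising on an unknown provider, rather than returning `False`, is a design choice in this sketch so that a misconfigured integration fails loudly.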

### Provider adapters and HTTP client strategy
- Use `httpx.AsyncClient` for provider calls.
- Centralize:
  - timeouts
  - retry policy for provider API calls
  - user-agent / request tracing IDs

## 6) RabbitMQ messaging architecture

### Exchanges and queues
Use a direct exchange scheme for deterministic routing:
- `calls.high_priority`: click-to-call and urgent single call initiation
- `calls.bulk`: bulk dispatch/campaign contact fan-out (optional distinction)
- `calls.retry`: retry attempts via delayed/TTL routing
- `calls.callback`: scheduled callbacks
- DLX:
  - `dlx.campaigns` -> `dlx.failed_calls`

### Message durability requirements
- Exchanges and queues: durable
- Messages: persistent (`delivery_mode=2`)

### Message payload contract (conceptual)
Each message should include:
- `tenant_id`
- `campaign_id` (nullable for click-to-call)
- `contact_id` (nullable for click-to-call)
- `provider_integration_id` or provider key (resolved at worker time)
- `call_sid` correlation placeholder (worker sets after provider initiation)
- `metadata` (bot id, LiveKit agent id hints, custom fields)
- `attempt_count` or enough info for retry_service to fetch attempt_count

### Delayed retry design
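Requirement approach:
- schedule retry by TTL + dead-letter to `calls.retry`
- set the message property `expiration` (a string, in milliseconds) to the time remaining until `next_retry_at`
- caveat: RabbitMQ expires per-message TTLs only when a message reaches the head of its queue, so a long delay can hold back shorter delays queued behind it; if delays vary widely, use one wait queue per delay tier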
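
Computing the `expiration` value is a small but error-prone step, so a sketch may help. The helper name `expiration_ms` is an assumption; it returns a string because the AMQP `expiration` property is a string, and it clamps past timestamps to zero.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def expiration_ms(next_retry_at: datetime, now: datetime | None = None) -> str:
    """Milliseconds from `now` until `next_retry_at`, as an AMQP expiration string."""
    now = now or datetime.now(timezone.utc)
    delta_ms = int((next_retry_at - now).total_seconds() * 1000)
    # A retry time in the past should expire immediately, never go negative.
    return str(max(delta_ms, 0))
```

Both datetimes should be timezone-aware (or both naive) so that the subtraction is well defined.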

### Callback design
- callbacks are stored in Redis sorted set:
  - score = unix timestamp of callback_at
  - worker/scheduler fetches due callbacks periodically and publishes into `calls.callback`

## 7) Worker behavior (call_worker)

### Worker responsibilities
1. Consume messages from RabbitMQ with appropriate prefetch.
2. Enforce business hours before initiating calls.
3. Enforce concurrency limits per tenant+campaign (or per tenant+bot, depending on your config).
4. Initiate call using provider adapter.
5. Update:
   - `campaign_contacts.status = dialing`
   - attempt counters
   - `call_logs` with `call_sid` and correlation to campaign/contact
6. On exceptions:
   - schedule retry based on reason classification
   - ensure concurrency slots are released appropriately

### Business hours enforcement (worker-level)
Before calling provider:
- If outside allowed business hours:
  - compute `next_window` open time
  - requeue/delay message to the appropriate time (using retry-like delay or a callback queue)
  - ack original message once the scheduling is successful

### Concurrency enforcement (Redis)
Concurrency lock must be deterministic:
- `acquire_call_slot(tenant_id, campaign_id, max_concurrent)`
  - increment Redis key: `concurrent_calls:{tenant_id}:{campaign_id}`
  - if value exceeds max:
    - immediately decrement
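    - reject the message and re-publish it with a short delay (e.g., 5 seconds); a plain `nack` with requeue redelivers immediately, so the delay has to come from the TTL/retry path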
  - else:
    - keep slot until webhook ends or worker finalizes state

Release:
- `release_call_slot(...)` in webhook handlers (preferred) or in worker failure finally blocks.
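
The acquire/release pair could look like the sketch below. It is written against a tiny `CounterStore` protocol (an assumption) whose `incr`/`decr` methods mirror the Redis commands of the same name; `InMemoryCounterStore` is a hypothetical stand-in for tests.

```python
from __future__ import annotations

import asyncio
from typing import Protocol


class CounterStore(Protocol):
    async def incr(self, key: str) -> int: ...
    async def decr(self, key: str) -> int: ...


class InMemoryCounterStore:
    """Hypothetical test double; production would use an async Redis client."""

    def __init__(self) -> None:
        self.values: dict[str, int] = {}

    async def incr(self, key: str) -> int:
        self.values[key] = self.values.get(key, 0) + 1
        return self.values[key]

    async def decr(self, key: str) -> int:
        self.values[key] = self.values.get(key, 0) - 1
        return self.values[key]


def slot_key(tenant_id: str, campaign_id: str) -> str:
    return f"concurrent_calls:{tenant_id}:{campaign_id}"


async def acquire_call_slot(store: CounterStore, tenant_id: str,
                            campaign_id: str, max_concurrent: int) -> bool:
    """Increment first, then roll back if the limit was exceeded."""
    key = slot_key(tenant_id, campaign_id)
    if await store.incr(key) > max_concurrent:
        await store.decr(key)
        return False
    return True


async def release_call_slot(store: CounterStore, tenant_id: str, campaign_id: str) -> None:
    """Decrement, undoing the decrement if it would leave a negative count."""
    key = slot_key(tenant_id, campaign_id)
    if await store.decr(key) < 0:
        # Not atomic; a Lua script would be needed for a strict floor at zero.
        await store.incr(key)
```

Incrementing before checking keeps the limit check race-free across workers, at the cost of a brief overshoot that is rolled back immediately.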

### Provider initiation responsibilities
- Worker calls:
  - `provider.initiate_call(to, callback_url, answer_url, caller_id, metadata)`
- `answer_url` must point to the LiveKit bridge dispatch endpoint with:
  - `tenant_id`
  - `agent_id` (or LiveKit agent id)
  - correlation fields to map telephony call to LiveKit execution context

### Error handling classification
Errors should be classified into:
- voicemail-like (provider says answered_by machine/IVR)
- no-answer / busy / failed (reason for retry scheduling)
- transient failures (provider outage, network errors)

This classification determines the `reason` passed to `retry_service.schedule_retry`, and therefore the terminal status a contact receives once retries are exhausted.

## 8) Retry service design

### Retry attempt progression
Inputs:
- `campaign_contact_id`
- `reason`
- `campaign` (to read max_retries and timing knobs)

Algorithm (state-machine description):
1. Read current `attempt_count`.
2. If `attempt_count >= campaign.max_retries`:
   - set terminal status:
     - voicemail -> `voicemail`
     - no-answer -> `not_answered`
     - otherwise -> `failed`
   - stop processing further retries
3. Else compute `next_retry_at`:
   - voicemail -> now + `voicemail_retry_hours` hours
   - otherwise -> now + `retry_interval_mins` minutes
4. Clamp `next_retry_at` into next business hours window.
5. Update:
   - `campaign_contacts.status = pending`
   - `campaign_contacts.next_retry_at = next_retry_at`
   - `attempt_count += 1`
6. Publish to `calls.retry` queue:
   - TTL/expiration set so dead-lettering triggers at `next_retry_at`
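
Steps 1 to 5 reduce to a pure decision function, which is easy to unit test apart from the database and broker. In this sketch the names `plan_retry`, `CampaignRetryConfig`, and `RetryDecision` are assumptions, as are the reason strings `"voicemail"` and `"no-answer"`; `clamp` stands in for the business hours clamp of step 4.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable


@dataclass(frozen=True)
class CampaignRetryConfig:
    max_retries: int
    retry_interval_mins: int
    voicemail_retry_hours: int


@dataclass(frozen=True)
class RetryDecision:
    status: str                      # new campaign_contacts.status
    attempt_count: int               # attempt_count to persist
    next_retry_at: datetime | None   # None once the contact is terminal


# Reasons not listed here fall through to "failed".
TERMINAL_BY_REASON = {"voicemail": "voicemail", "no-answer": "not_answered"}


def plan_retry(attempt_count: int, reason: str, campaign: CampaignRetryConfig,
               now: datetime,
               clamp: Callable[[datetime], datetime] = lambda dt: dt) -> RetryDecision:
    # Step 2: retries exhausted, so pick the terminal status for this reason.
    if attempt_count >= campaign.max_retries:
        return RetryDecision(TERMINAL_BY_REASON.get(reason, "failed"), attempt_count, None)
    # Step 3: voicemail uses the longer, hour-based delay.
    if reason == "voicemail":
        delay = timedelta(hours=campaign.voicemail_retry_hours)
    else:
        delay = timedelta(minutes=campaign.retry_interval_mins)
    # Steps 4 and 5: clamp into business hours, go back to pending, count the attempt.
    return RetryDecision("pending", attempt_count + 1, clamp(now + delay))
```

The caller would persist the decision and, when `next_retry_at` is set, publish the retry message (step 6).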

### Business hours clamping for retry
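Clamping `next_retry_at` at scheduling time ensures retries never fire at forbidden times and spares the worker from repeatedly receiving and re-delaying messages that arrive outside business hours.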

## 9) Callback service design

### When callbacks happen
Typical scenario:
- call ended with a “needs follow-up” category (e.g., callback requested by LiveKit conversation, or business rules).

### Storage
- Store pending callbacks in Redis sorted set:
  - key: `callbacks:{tenant_id}:{campaign_id}`
  - score: unix timestamp

### Scheduling job
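- `fire_due_callbacks` runs every ~30 seconds:
  - reads due callbacks with ZRANGEBYSCORE (score <= now)
  - removes them (ZREM) and treats an entry as claimed only if ZREM reports it was removed, so overlapping runs cannot publish the same callback twice
  - publishes callback messages to `calls.callback` queue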
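
A sketch of the job body, under the assumption that the store exposes `zrangebyscore` and `zrem` with the same meaning as the Redis commands. `InMemorySortedSets` and the `publish` callable are hypothetical stand-ins for the Redis client and the RabbitMQ publisher.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable


class InMemorySortedSets:
    """Hypothetical test double for the Redis sorted-set commands used below."""

    def __init__(self) -> None:
        self.sets: dict[str, dict[str, float]] = {}

    async def zadd(self, key: str, mapping: dict[str, float]) -> None:
        self.sets.setdefault(key, {}).update(mapping)

    async def zrangebyscore(self, key: str, min_score: float, max_score: float) -> list[str]:
        items = sorted(self.sets.get(key, {}).items(), key=lambda kv: kv[1])
        return [member for member, score in items if min_score <= score <= max_score]

    async def zrem(self, key: str, *members: str) -> int:
        items = self.sets.get(key, {})
        return sum(1 for m in members if items.pop(m, None) is not None)


async def fire_due_callbacks(store, publish: Callable[[str], Awaitable[None]],
                             key: str, now_ts: float) -> int:
    """Publish every callback whose score is <= now_ts; return how many fired."""
    fired = 0
    for member in await store.zrangebyscore(key, 0, now_ts):
        # Only the run whose ZREM actually removed the member publishes it.
        if await store.zrem(key, member) == 1:
            await publish(member)
            fired += 1
    return fired
```

Removing before publishing trades a possible lost callback (if publishing fails) for no duplicates; re-adding the member on publish failure would be one way to soften that.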

### Callback message behavior
Worker consumes callback message:
- re-validates business hours window
- re-acquires concurrency slot
- initiates call again as a new dialing attempt

## 10) Business hours service + campaign scheduler

### business_hours_service responsibilities
- `is_within_business_hours(campaign_id) -> bool`
  - loads per-weekday enabled start/end
  - checks tenant timezone
  - checks holiday overrides in `holidays`
- `get_next_business_window(campaign_id) -> datetime`
  - forward-iterates up to 7 days to find next opening
- `clamp_to_business_hours(dt, bh_id) -> datetime`
  - if outside, advance to next open time
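
The window logic can be sketched as pure functions over an in-memory schedule. Note the assumptions: unlike the signatures above, these helpers take the weekly hours and holiday set directly rather than a `campaign_id` or `bh_id`, and the datetime passed in is expected to be already converted to the tenant's timezone.

```python
from __future__ import annotations

from datetime import date, datetime, time, timedelta, timezone

# weekday (Monday=0) -> (open, close) in tenant-local time; a missing weekday is closed.
WeeklyHours = dict[int, tuple[time, time]]


def is_within_business_hours(dt: datetime, hours: WeeklyHours,
                             holidays: frozenset[date] | set[date] = frozenset()) -> bool:
    if dt.date() in holidays:
        return False
    window = hours.get(dt.weekday())
    if window is None:
        return False
    start, end = window
    return start <= dt.time() < end


def clamp_to_business_hours(dt: datetime, hours: WeeklyHours,
                            holidays: frozenset[date] | set[date] = frozenset(),
                            max_days: int = 7) -> datetime:
    """Return dt if it is inside a window, else the next opening time."""
    if is_within_business_hours(dt, hours, holidays):
        return dt
    for offset in range(max_days + 1):
        day = dt.date() + timedelta(days=offset)
        window = hours.get(day.weekday())
        if window is None or day in holidays:
            continue
        opening = datetime.combine(day, window[0], tzinfo=dt.tzinfo)
        if opening > dt:
            return opening
    raise LookupError(f"no business window within {max_days} days of {dt}")
```

The 7-day default matches the forward iteration described above; a long holiday run would raise `LookupError`, which the caller has to handle.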

### Scheduler responsibilities
1. `enforce_business_hours` every 5 minutes
   - pauses running campaigns outside hours
   - resumes campaigns when within hours, but only those this job paused (not campaigns paused manually)
2. `campaign_scheduler` every 1 minute
   - finds campaigns with `scheduled_at <= now` and `status='scheduled'`
   - transitions them to `running` and launches contacts
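3. `refresh_materialized_stats` every 60 seconds
   - refreshes the `campaign_stats` materialized view with `REFRESH MATERIALIZED VIEW CONCURRENTLY`, which requires a unique index on the view (e.g., on `campaign_id`)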

## 11) Campaign service design

### launch_campaign(campaign_id)
Responsibilities:
- validate:
  - integration active
  - bot has required LiveKit agent identity
  - business hours config exists if strict enforcement is enabled
- populate contacts:
  - apply blocked list policies
  - apply ignore-history rules
- set `campaign.status = running`
- publish call initiation messages in batches

### populate_campaign_contacts(campaign_id)
Core logic:
1. Load all contacts for `campaign.list_id` (or single phone for single campaigns).
2. If `contact.is_blocked`:
   - insert `campaign_contacts` with `status='blocked'`
3. Ignore-history handling:
   - If `campaign.ignore_history = False`:
     - join `call_logs` to find whether the same phone had an answered call within the last 30 days
     - if yes, skip creating a pending contact row
   - If `campaign.ignore_history = True`:
     - upsert contact with reset counters and `status='pending'`
4. Attempt counters:
   - initial attempt_count = 0 (or align with your product’s definition)

### pause/resume
- Pause:
  - set `campaign.status='paused'`
  - stop new message publishing; pending contacts remain in DB
  - optionally persist paused reason in Redis
- Resume:
  - set `campaign.status='running'`
  - re-publish pending contacts as call initiation messages

## 12) Webhook handlers (idempotent + concurrency release)

### General webhook requirements
Every provider webhook endpoint must:
1. Verify signature/auth:
   - Twilio: verify the `X-Twilio-Signature` header (HMAC-SHA1 over the request URL and POST parameters, keyed with the account auth token)
   - MCube/Exotel: API key verification or provider-specific header verification
2. Normalize payload:
   - `provider.parse_webhook(payload)` -> `{call_sid, status, duration, answered_by}`
3. Idempotency guard:
   - Redis key: `webhook_processed:{call_sid}:{event}`
   - Use `SET NX EX 30` to prevent double-processing
4. Release concurrency slot:
   - on call-ending statuses only, ensure `release_call_slot(tenant_id, campaign_id)` happens exactly once (or is idempotent by design)
5. Update:
   - `call_logs` record matching `call_sid`
   - `campaign_contacts.status` based on normalized status:
     - answered -> answered/completed pipeline
     - no answer -> not_answered / retry scheduling logic
     - voicemail -> voicemail pipeline
6. If voicemail/no answer requires retry:
   - call `retry_service.schedule_retry(...)`
7. Refresh analytics cache:
   - either refresh Redis cache for this campaign or mark it stale (with TTL)
8. Return HTTP 200 quickly.
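
For step 1, Twilio's signature check can be reproduced with the standard library. This is a sketch of the form-encoded POST case only: the signature is the Base64-encoded HMAC-SHA1 of the full request URL followed by each POST parameter name and value, sorted by name, keyed with the auth token. The function names are assumptions; in practice the request validator shipped with Twilio's helper library is likely the safer choice.

```python
from __future__ import annotations

import base64
import hashlib
import hmac
from typing import Mapping


def twilio_signature(auth_token: str, url: str, params: Mapping[str, str]) -> str:
    """Compute the expected X-Twilio-Signature value for a form-encoded POST."""
    # URL first, then key+value for each parameter in sorted key order.
    payload = url + "".join(f"{key}{params[key]}" for key in sorted(params))
    digest = hmac.new(auth_token.encode("utf-8"), payload.encode("utf-8"), hashlib.sha1).digest()
    return base64.b64encode(digest).decode("ascii")


def is_valid_twilio_request(auth_token: str, url: str, params: Mapping[str, str],
                            header_signature: str) -> bool:
    """Constant-time comparison against the signature sent by the provider."""
    expected = twilio_signature(auth_token, url, params)
    return hmac.compare_digest(expected, header_signature)
```

The `url` must be the exact public URL the provider called, including scheme and query string, which matters when the API sits behind a proxy or load balancer.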

### Status routing strategy
Define a small mapping table from `normalized status` to:
- terminal status
- retry reason
- callback creation rules
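
Such a table might look like the sketch below. Every row is an assumption pending the open decisions on retry reason classification and callback semantics; only the terminal status names come from the contact-level status list.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class StatusRoute:
    terminal_status: str          # status once no retries remain
    retry_reason: Optional[str]   # None means "never retry"
    creates_callback: bool        # all False until callback rules are decided


# Illustrative mapping only; the real rows depend on product decisions.
STATUS_ROUTES: dict[str, StatusRoute] = {
    "answered": StatusRoute("completed", None, False),
    "no-answer": StatusRoute("not_answered", "no-answer", False),
    "busy": StatusRoute("failed", "busy", False),
    "failed": StatusRoute("failed", "failed", False),
    "voicemail": StatusRoute("voicemail", "voicemail", False),
}


def route_status(normalized_status: str) -> StatusRoute:
    """Look up the route, treating unknown statuses as generic failures."""
    return STATUS_ROUTES.get(normalized_status, STATUS_ROUTES["failed"])
```

Keeping the mapping as data means a webhook handler only needs one lookup, and changing policy does not touch handler code.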

## 13) LiveKit integration: agent dispatch and call mapping

### What must be true for LiveKit to work
- When the provider connects the call, it must hit an `answer_url` that:
  - identifies the tenant
  - identifies the agent/bot
  - carries correlation so `call_sid` can be mapped to LiveKit room and later to `call_logs`
- The LiveKit bridge dispatch must:
  - join/create LiveKit room
  - start LiveKit agent with the right bot prompt/config
  - ensure audio routing matches your telephony provider format expectations (codec/sample rate)

### Per-call isolation requirement
Your platform must treat each call as an isolated “session context”:
- one LiveKit room per call (or a stable mapping if provider requires it)
- one WS session (or SIP session) per call (depending on provider integration)

## 14) Analytics architecture

### Fast path (Redis cache)
For `GET /analytics/campaigns/{id}/stats`:
1. Compute cache key: `stats:{campaign_id}`
2. Attempt Redis GET.
3. If present, return.

### Slow path (materialized view)
On cache miss:
- query `campaign_stats` materialized view for aggregated counts
- incorporate any live overlays from Redis (e.g., “live calls now”)
- compute `completion_percent`:
  - `(responded + not_answered + failed + voicemail + blocked) / total * 100`
- store result into Redis with TTL ~60 seconds
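
The two paths combine into a cache-aside read. In this sketch the column names come from the `campaign_stats` view described earlier, while `get_campaign_stats`, `load_view_row`, and `InMemoryCache` are hypothetical names; the cache's `get`/`set(..., ex=...)` methods are assumed to behave like the Redis commands. Live overlays are omitted.

```python
from __future__ import annotations

import asyncio
import json
from typing import Awaitable, Callable

_DONE_COLUMNS = ("responded_users", "not_answered", "failed_calls",
                 "voicemail_on_hold", "blocked_calls")


def completion_percent(stats: dict[str, int]) -> float:
    """(responded + not_answered + failed + voicemail + blocked) / total * 100."""
    total = stats.get("total_contacts", 0)
    if total <= 0:
        return 0.0  # avoid dividing by zero for empty campaigns
    done = sum(stats.get(column, 0) for column in _DONE_COLUMNS)
    return round(min(done / total * 100, 100.0), 2)


class InMemoryCache:
    """Hypothetical test double; it ignores the TTL."""

    def __init__(self) -> None:
        self.data: dict[str, str] = {}

    async def get(self, key: str):
        return self.data.get(key)

    async def set(self, key: str, value: str, ex: int | None = None) -> None:
        self.data[key] = value


async def get_campaign_stats(cache, load_view_row: Callable[[str], Awaitable[dict]],
                             campaign_id: str, ttl_seconds: int = 60) -> dict:
    key = f"stats:{campaign_id}"
    cached = await cache.get(key)
    if cached is not None:
        return json.loads(cached)                    # fast path
    row = await load_view_row(campaign_id)           # slow path: materialized view
    result = {**row, "completion_percent": completion_percent(row)}
    await cache.set(key, json.dumps(result), ex=ttl_seconds)
    return result
```

`load_view_row` is where the query against `campaign_stats` would go; it never touches `campaign_contacts`.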

### Why materialized view is required
- Avoid scanning `campaign_contacts` for each analytics request.
- Keeps analytics stable under load during campaign spikes.

## 15) CSV upload + contact management worker

### Upload responsibilities
- API receives CSV upload request.
- Enqueue background job:
  - download CSV from S3 (or local storage)
  - parse using `csv.DictReader`
  - normalize phone numbers to E.164

### Phone normalization rule
- Always store `contacts.phone` in E.164:
  - parse via `phonenumbers.parse(phone, default_region)`
  - format to E.164

### Blocked list rule
- Check `blocked:{tenant_id}` as a Redis SET:
  - mark blocked contacts accordingly without scheduling calls

### Bulk upsert semantics
- Use bulk insert/upsert:
  - by `(tenant_id, phone)` or `(tenant_id, list_id, phone)` depending on desired uniqueness

### Progress emission
- Emit progress via Redis pub/sub:
  - `csv_progress:{tenant_id}:{list_id}`

## 16) SMS architecture

### Templates + placeholders
- `sms_templates.body` supports `{{name}}` and any metadata placeholders stored in contact metadata.
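
Placeholder rendering can be a single regular-expression substitution. In this sketch the function name `render_sms` is an assumption, and so is the policy of rendering unknown placeholders as an empty string (raising an error would be an equally valid choice).

```python
from __future__ import annotations

import re
from typing import Any, Mapping

# Matches {{name}} and tolerates inner whitespace such as {{ name }}.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_sms(body: str, values: Mapping[str, Any]) -> str:
    """Replace each {{key}} in body with values[key]; unknown keys become ''."""
    return _PLACEHOLDER.sub(lambda match: str(values.get(match.group(1), "")), body)
```

For example, `render_sms("Hi {{name}}", {"name": "Asha"})` returns `"Hi Asha"`.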

### Bulk SMS policy
- Render template per contact.
- Enforce rate limit:
  - token bucket in Redis: `sms_rate:{tenant_id}`
  - target: 10 SMS/sec
- Use provider adapter:
  - `provider.send_sms(to, body, from_)`
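
The token bucket itself is simple arithmetic, shown here in-process with an injected clock so it can be tested deterministically. This is only an illustration of the algorithm: the class name is an assumption, and a Redis-backed version keyed by `sms_rate:{tenant_id}` would need the same refill-and-consume step to run atomically (for example in a Lua script).

```python
from __future__ import annotations


class TokenBucket:
    """Refills at `rate` tokens per second up to `capacity`."""

    def __init__(self, rate: float, capacity: float, now: float) -> None:
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity   # start full
        self.updated_at = now

    def try_consume(self, now: float, cost: float = 1.0) -> bool:
        # Refill according to elapsed time, capped at capacity.
        elapsed = max(now - self.updated_at, 0.0)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated_at = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

With `rate=10` and `capacity=10`, a tenant can burst 10 messages at once and then sustain 10 SMS/sec; callers that get `False` should wait briefly and try again.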

### SMS consistency guarantees
- Persist SMS send attempts in DB if you need reporting (table not listed here; may be added later).
- Keep provider failures retriable via worker retry patterns if required.

## 17) Click-to-call endpoint design

### Required request
- `POST /calls/click-to-call`
- Body:
  - `phone`
  - `provider`
  - `agent_id`
  - `metadata` (optional)

### Flow
1. Resolve active telephony integration for tenant + provider.
2. Create `call_logs` row (and optionally a “synthetic” campaign/contact record if needed for analytics symmetry).
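3. Call the provider adapter immediately, bypassing RabbitMQ, to meet the “immediately” requirement; route through `calls.high_priority` only if click-to-call must also pass through the worker's concurrency and business hours checks.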
4. Return:
   - `{call_sid, status}`

### Answer URL
- Must include tenant + agent identity so LiveKit bridge dispatch can start the correct bot.

## 18) Security, reliability, and idempotency details

### Idempotency for webhooks
- Use Redis key:
  - `webhook_processed:{call_sid}:{event}`
- Use:
  - `SET NX EX 30`
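
A sketch of the guard, assuming a store whose `set(name, value, nx=..., ex=...)` behaves like the Redis command: truthy when the key was created, `None` when it already existed. `claim_webhook` and `InMemoryKeyValueStore` are hypothetical names, and the fake ignores expiry.

```python
from __future__ import annotations

import asyncio
from typing import Optional


class InMemoryKeyValueStore:
    """Hypothetical test double for SET NX EX; the TTL is not simulated."""

    def __init__(self) -> None:
        self.data: dict[str, str] = {}

    async def set(self, name: str, value: str, *, nx: bool = False,
                  ex: Optional[int] = None) -> Optional[bool]:
        if nx and name in self.data:
            return None  # key already exists, nothing written
        self.data[name] = value
        return True


async def claim_webhook(store, call_sid: str, event: str, ttl_seconds: int = 30) -> bool:
    """Return True only for the first delivery of (call_sid, event) within the TTL."""
    key = f"webhook_processed:{call_sid}:{event}"
    return bool(await store.set(key, "1", nx=True, ex=ttl_seconds))
```

A handler would call `claim_webhook` right after parsing and return HTTP 200 without further work when it yields `False`.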

### Concurrency slot idempotency
- Ensure releasing slots does not break if webhooks arrive multiple times:
  - either release only on the first processed webhook event
  - or make release operations safe via internal checks

### Retries and message duplication
- RabbitMQ redelivery can happen.
- Ensure worker operations are safe:
  - verify if `campaign_contacts.status` already transitioned past expected state
  - avoid double call initiation for the same `campaign_contact_id` + attempt

### Observability
- Structured logs at each transition:
  - campaign start -> message dispatch
  - worker consume -> concurrency slot acquired
  - provider initiate success -> call_sid stored
  - webhook -> state transition and retry scheduling
  - materialized stats refresh

## 19) Deployment and scaling model (docker-compose)

### Services to run
- `api`: FastAPI (web + webhooks)
- `worker`: RabbitMQ consumer
- `scheduler`: APScheduler jobs
- `postgres`: primary DB
- `redis`: cache + concurrency + callbacks + idempotency
- `rabbitmq`: broker (with management plugin)

### Scaling approach
- Scale `worker` horizontally:
  - ensure concurrency constraints are global via Redis counters
- Scale `api` behind a load balancer:
  - webhook handlers keep no in-process state; idempotency keys and concurrency counters live in Redis, so any replica can serve any webhook
- Keep scheduler singleton:
  - multiple scheduler instances can be safe if they coordinate; otherwise run one replica.

## 20) Build milestones (implementation order, aligned to your requirements)

1. Database + Alembic migrations
   - tables + encryption fields
   - materialized view `campaign_stats`
2. Provider abstraction layer
   - base interface + factory + MCube adapter (then Exotel, then Twilio)
3. Core campaign CRUD + start/pause/resume/stop
4. CSV upload + contact management pipeline
5. RabbitMQ worker + concurrency + message routing
6. Retry service
7. Webhook handlers (MCube/Exotel/Twilio)
8. Callback service + due-callback scheduler
9. Business hours service + campaign scheduler
10. Click-to-call endpoint (immediate initiation)
11. Analytics + stats caching
12. SMS service (single + bulk)
13. LiveKit bridge dispatch integration
14. Docker Compose hardening (health checks, environment templates, restart policies)

## 21) Open decisions you should confirm early

To finalize the base architecture, the following product/implementation choices must be confirmed:
1. LiveKit mapping:
   - How does `call_sid` map to the LiveKit `room` and agent session?
2. `campaign_contacts` uniqueness:
   - Is uniqueness `(campaign_id, contact_id)` enough, or do you need dedupe by `(campaign_id, phone)`?
3. “Completion” definition:
   - For `campaign_stats` and `completion_percent`, does “answered but still in progress” count or not?
4. Callback semantics:
   - Which statuses trigger callback scheduling? Is callback part of the same retry mechanism or separate?
5. Retry reason classification:
   - Which normalized statuses map to `voicemail`, `not_answered`, and `failed`?
6. Recording and voicemail URLs:
   - Does every provider webhook deliver `recording_url` / `voicemail_url`, or must they be fetched separately?

