# PCAA — Full Integration Architecture Plan
## Post-Call Analytics: Mcube-Integrated, Automated, End-to-End

**Document Type:** Architecture & Implementation Plan
**Status:** Planning — No code changes made
**Date:** 2026-03-11
**Scope:** All changes required to transform the current manual/scripted PCAA system into a fully automated, Mcube-integrated, production-grade post-call analytics platform.

---

## Table of Contents

1. [Current State Assessment](#1-current-state-assessment)
2. [Target State Vision](#2-target-state-vision)
3. [Target Architecture Diagram](#3-target-architecture-diagram)
4. [Gap Analysis](#4-gap-analysis)
5. [Phase 1 — Mcube Webhook Integration](#5-phase-1--mcube-webhook-integration)
6. [Phase 2 — Subscription & BID Activation](#6-phase-2--subscription--bid-activation)
7. [Phase 3 — Automated Processing Pipeline](#7-phase-3--automated-processing-pipeline)
8. [Phase 4 — Agent Feedback & Coaching System](#8-phase-4--agent-feedback--coaching-system)
9. [Phase 5 — Data Storage, Retention & Archival](#9-phase-5--data-storage-retention--archival)
10. [Phase 6 — Observability & Operations](#10-phase-6--observability--operations)
11. [Database Schema Changes Required](#11-database-schema-changes-required)
12. [API Changes Required](#12-api-changes-required)
13. [New Services & Workers Required](#13-new-services--workers-required)
14. [Mcube-Side Requirements](#14-mcube-side-requirements)
15. [Complete Activity Checklist](#15-complete-activity-checklist)
16. [Dependency & Sequencing Map](#16-dependency--sequencing-map)

---

## 1. Current State Assessment

### What exists today

| Component | Status | Problem |
|---|---|---|
| `pipeline_6004.py` | Working | Hardcoded to BID 6004 only, run manually |
| `call_self_service.py` | Working | CLI only, manually invoked per-call |
| `6004_raw_calls` table | Working | Populated by manual sync from source DB |
| Sarvam transcription | Working | Triggered manually, one call at a time |
| AWS Bedrock analysis | Working | Triggered manually after transcription |
| `6004_callanalytics` table | Working | Results land here after manual run |
| `6004_bant_analysis` table | Working | BANT results stored manually |
| `business_pipeline_config` | Exists | Not auto-used; manually seeded per BID |
| `business_agent_config` | Exists | Not auto-used; manually seeded per BID |
| Dashboard frontend | Working | Reads analytics but not linked to pipeline |
| RabbitMQ | Installed | Not used by pipeline at all |
| FastAPI (port 8002) | Running | Has endpoints but no webhook receiver |

### What does NOT exist

- No webhook endpoint to receive call completion events from Mcube
- No subscription or payment-based BID activation/deactivation
- No automatic trigger when a call recording becomes available
- No multi-BID automated processing (everything is BID 6004 specific)
- No message queue between call receipt and transcription/analysis
- No retry or dead-letter handling for failed transcriptions/analyses
- No agent feedback or coaching delivery mechanism
- No data retention or archival policy or implementation
- No notification system (email/WhatsApp) for quality alerts
- No self-service onboarding flow for new business customers
- No monitoring, alerting, or pipeline health dashboard
- No audit trail for processing events

---

## 2. Target State Vision

When fully built, the system should behave as follows:

### Customer Journey (Automated)

```
Business signs up on Mcube
    │
    ▼
Business selects "Post-Call Analytics" add-on & makes payment
    │
    ▼
Mcube marks BID as PCAA-active → sends activation event to PCAA
    │
    ▼
PCAA creates BID configuration (STT provider, quality params, agents)
    │
    ▼
Every call made by that business ends
    │
    ▼
Mcube sends webhook: { bid, callid, agent, duration, recording_url, ... }
    │
    ▼
PCAA queues call for processing (RabbitMQ)
    │
    ▼
Transcription worker picks up → Sarvam → saves transcript
    │
    ▼
Analysis worker picks up → AWS Bedrock → quality score + BANT + coaching
    │
    ▼
Results stored in DB → dashboard shows live scores
    │
    ▼
Agent & supervisor notified (email / WhatsApp / dashboard alert)
    │
    ▼
After retention period → audio archived to cold storage, DB compressed
```

---

## 3. Target Architecture Diagram

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                          MCUBE TELEPHONY PLATFORM                           │
│                                                                             │
│  ┌──────────────┐    ┌──────────────┐    ┌────────────────────────────┐    │
│  │  Call Engine │───▶│  Recording   │───▶│  Webhook Dispatcher        │    │
│  │  (SIP/VoIP)  │    │  Storage     │    │  POST /pcaa/webhook/call   │    │
│  └──────────────┘    └──────────────┘    └────────────┬───────────────┘    │
│                                                        │                    │
│  ┌──────────────────────────────────────┐             │                    │
│  │  Subscription & Billing System       │             │                    │
│  │  POST /pcaa/webhook/bid-activated    │─────────────┤                    │
│  │  POST /pcaa/webhook/bid-deactivated  │             │                    │
│  └──────────────────────────────────────┘             │                    │
└───────────────────────────────────────────────────────┼────────────────────┘
                                                         │
                              ┌──────────────────────────▼───────────────────┐
                              │              PCAA PLATFORM                    │
                              │                                               │
                              │  ┌─────────────────────────────────────────┐ │
                              │  │  API Gateway (FastAPI — port 8002)      │ │
                              │  │                                         │ │
                              │  │  POST /webhook/call-completed           │ │
                              │  │  POST /webhook/bid-activated            │ │
                              │  │  POST /webhook/bid-deactivated          │ │
                              │  │  GET  /pipeline/{bid}/status            │ │
                              │  │  GET  /call-records/{bid}               │ │
                              │  └───────────────────┬─────────────────────┘ │
                              │                      │                        │
                              │          ┌───────────▼───────────┐           │
                              │          │   RabbitMQ            │           │
                              │          │                       │           │
                              │          │   call.received       │           │
                              │          │   call.transcribe     │           │
                              │          │   call.analyze        │           │
                              │          │   call.notify         │           │
                              │          │   call.archive        │           │
                              │          └─────┬──────┬──────────┘           │
                              │                │      │                       │
                              │   ┌────────────▼─┐  ┌▼────────────────┐     │
                              │   │ Transcription │  │ Analysis Worker │     │
                              │   │ Worker        │  │                 │     │
                              │   │               │  │  AWS Bedrock    │     │
                              │   │  Sarvam STT   │  │  Nova-Lite      │     │
                              │   │  Deepgram     │  │  Quality Score  │     │
                              │   │  (per BID)    │  │  BANT           │     │
                              │   └──────────┬────┘  └────────┬────────┘     │
                              │              │                │               │
                              │   ┌──────────▼────────────────▼────────────┐ │
                              │   │              MySQL                      │ │
                              │   │  voicebot_cluster @ 127.0.0.1          │ │
                              │   │                                         │ │
                              │   │  pcaa_subscriptions (new)               │ │
                              │   │  pcaa_bid_config (new)                  │ │
                              │   │  pcaa_webhook_log (new)                 │ │
                              │   │  {bid}_raw_calls                        │ │
                              │   │  {bid}_sarvamresponse                   │ │
                              │   │  {bid}_callanalytics                    │ │
                              │   │  {bid}_bant_analysis                    │ │
                              │   │  pcaa_processing_log (new)              │ │
                              │   │  pcaa_archive_manifest (new)            │ │
                              │   └─────────────────────────────────────────┘ │
                              │                                               │
                              │   ┌─────────────────────────────────────────┐ │
                              │   │  Notification Worker                    │ │
                              │   │  Email / WhatsApp Business API          │ │
                              │   │  Per-agent coaching reports             │ │
                              │   │  Per-supervisor team summaries          │ │
                              │   └─────────────────────────────────────────┘ │
                              │                                               │
                              │   ┌─────────────────────────────────────────┐ │
                              │   │  Archive Worker                         │ │
                              │   │  AWS S3 / compatible object storage     │ │
                              │   │  Configurable retention (30/60/90 days) │ │
                              │   └─────────────────────────────────────────┘ │
                              └───────────────────────────────────────────────┘
```

---

## 4. Gap Analysis

The table below maps every gap between the current system and the target state.

| # | Gap | Current State | Required State |
|---|---|---|---|
| G1 | Call receipt trigger | Manual DB sync or CSV import | Mcube webhook → instant queue |
| G2 | BID activation | Manually seeded in DB | Automated via Mcube billing event |
| G3 | BID deactivation | Not implemented | Mcube sends event → pipeline stops |
| G4 | Multi-BID pipeline | BID 6004 hardcoded | All active BIDs processed automatically |
| G5 | Transcription trigger | Manual CLI or cron | Queue-driven worker, fires on call receipt |
| G6 | Analysis trigger | Manual CLI after transcription | Queue-driven worker, fires after transcription |
| G7 | Failure handling | None — failed calls stay stuck | Dead-letter queue, retry with backoff, failure alerts |
| G8 | Queue system | RabbitMQ installed but unused | RabbitMQ wired into all pipeline stages |
| G9 | Agent feedback | Not implemented | Post-analysis notification to agent email/WhatsApp |
| G10 | Supervisor reports | Dashboard only | Daily digest email/WhatsApp with team scores |
| G11 | Coaching suggestions | Not implemented | AI-generated improvement tips per call sent to agent |
| G12 | Data retention | No policy — data grows forever | Configurable per BID (default 90 days active, then archive) |
| G13 | Audio archival | Audio stays on Mcube servers | Copy to S3/object store, expire from hot storage |
| G14 | Transcript archival | Never compressed or archived | After retention period, compress + move to cold tier |
| G15 | Onboarding flow | Manual DB inserts | Self-service API + UI to configure BID quality params |
| G16 | Webhook security | No webhook endpoint exists | HMAC signature verification on all incoming webhooks |
| G17 | Monitoring | No visibility into pipeline health | Processing lag, error rate, queue depth metrics |
| G18 | Idempotency | Re-running overwrites data | Webhook deduplication plus idempotent workers, so at-least-once queue delivery yields effectively-once processing |
| G19 | Payment tracking | Not tracked | `pcaa_subscriptions` table with plan, payment date, expiry |
| G20 | Audit trail | No logging of who triggered what | `pcaa_processing_log` and `pcaa_webhook_log` tables |

---

## 5. Phase 1 — Mcube Webhook Integration

**Goal:** Replace manual sync/polling with event-driven call receipt from Mcube.

### 5.1 Mcube-side webhook configuration

Mcube must be configured to POST to a PCAA endpoint whenever a call recording is ready. This is a Mcube platform configuration, not a PCAA code change.

**Webhook event: Call Completed**
```json
POST https://pcaa.mcube.in/webhook/call-completed
Content-Type: application/json
X-Mcube-Signature: <HMAC-SHA256 of body>

{
  "event": "call.completed",
  "bid": "6004",
  "callid": "97393902541773212620",
  "agent_name": "Richa vishwakarma",
  "agent_id": "emp_123",
  "agent_email": "richa@company.com",
  "agent_phone": "9XXXXXXXXX",
  "customer_phone": "9XXXXXXXXX",
  "call_starttime": "2026-03-11T12:33:40Z",
  "call_endtime": "2026-03-11T12:50:36Z",
  "duration_seconds": 1016,
  "call_status": "ANSWER",
  "direction": "outbound",
  "recording_url": "https://recordings.mcube.com/.../97393902541773212620.wav",
  "group_name": "Presales Team"
}
```

**Webhook event: BID Activated for PCAA**
```json
POST https://pcaa.mcube.in/webhook/bid-activated
{
  "event": "pcaa.activated",
  "bid": "6004",
  "business_name": "Mcube Presales",
  "plan": "standard",
  "activated_at": "2026-03-11T10:00:00Z",
  "paid_until": "2026-04-11T10:00:00Z"
}
```

**Webhook event: BID Deactivated**
```json
POST https://pcaa.mcube.in/webhook/bid-deactivated
{
  "event": "pcaa.deactivated",
  "bid": "6004",
  "reason": "payment_lapsed"
}
```

### 5.2 New PCAA API endpoints to add

| Endpoint | Method | Purpose |
|---|---|---|
| `/webhook/call-completed` | POST | Receive call completion from Mcube |
| `/webhook/bid-activated` | POST | BID subscription activation |
| `/webhook/bid-deactivated` | POST | BID subscription deactivation |

### 5.3 Webhook security: HMAC verification

Every incoming webhook from Mcube must include an `X-Mcube-Signature` header containing an HMAC-SHA256 of the raw request body, signed with a shared secret stored per BID (`pcaa_subscriptions.webhook_secret`).

**Required:** A shared secret must be established between Mcube and PCAA at BID activation time. PCAA rejects any webhook where the signature does not match.
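
A minimal sketch of the verification step, assuming the signature is sent as a hex digest (the encoding is not specified above and would need to be agreed with Mcube). The function names are illustrative, not part of the existing codebase.

```python
import hashlib
import hmac


def sign_body(secret: str, raw_body: bytes) -> str:
    """Return the hex HMAC-SHA256 of the raw request body."""
    return hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()


def signature_is_valid(secret: str, raw_body: bytes, received_signature: str) -> bool:
    """Check the X-Mcube-Signature value against the expected digest."""
    expected = sign_body(secret, raw_body)
    received = received_signature.strip().lower()  # assumption: hex encoding
    # compare_digest runs in constant time, so it does not leak how many
    # leading characters of the signature matched.
    return hmac.compare_digest(expected.encode("ascii"), received.encode("utf-8"))
```

The digest has to be computed over the raw bytes as received, before any JSON parsing, because re-serializing the payload can change whitespace or key order and invalidate the signature.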

### 5.4 Webhook processing flow

```
Webhook arrives at /webhook/call-completed
    │
    ├── Verify HMAC signature → reject if invalid (HTTP 401)
    ├── Check BID is active in pcaa_subscriptions → reject if not (HTTP 403)
    ├── Check callid not already processed (idempotency) → skip if duplicate
    ├── Insert into {bid}_raw_calls (if not present)
    ├── Check call_status == 'ANSWER' and duration_seconds >= min_call_duration_s
    │       └── If not eligible → return HTTP 200 (acknowledged, not queued)
    ├── Publish message to RabbitMQ queue: call.transcribe
    │       └── Return HTTP 200
    └── On every exit path above, insert a row into pcaa_webhook_log
            (signature_valid, queued, skip_reason record the outcome)
```
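
The decision logic in this flow can be sketched as a pure function, which keeps it easy to unit test apart from FastAPI, MySQL, and RabbitMQ. The `Decision` type, the `skip_reason` strings, and the HTTP 200 response for duplicates are assumptions for illustration; the flow itself only states that duplicates are skipped.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Decision:
    http_status: int                   # status code returned to Mcube
    queue: bool                        # True if the call goes to call.transcribe
    skip_reason: Optional[str] = None  # value for pcaa_webhook_log.skip_reason


def decide(signature_valid: bool, bid_active: bool, already_seen: bool,
           call_status: str, duration_seconds: int,
           min_call_duration_s: int) -> Decision:
    """Apply the webhook checks in the same order as the flow above."""
    if not signature_valid:
        return Decision(401, False, "invalid_signature")
    if not bid_active:
        return Decision(403, False, "bid_inactive")
    if already_seen:
        # Assumption: duplicates are acknowledged so Mcube does not retry them.
        return Decision(200, False, "duplicate")
    if call_status != "ANSWER":
        return Decision(200, False, "not_answered")
    if duration_seconds < min_call_duration_s:
        return Decision(200, False, "short_call")
    return Decision(200, True)
```

The endpoint handler would gather the inputs (signature check, subscription lookup, duplicate lookup), call `decide`, and then perform the database writes and the queue publish according to the result.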

### 5.5 Webhook log table (`pcaa_webhook_log`) — new

| Column | Type | Description |
|---|---|---|
| `id` | BIGINT AUTO_INCREMENT | PK |
| `bid` | VARCHAR(20) | Business ID |
| `event_type` | VARCHAR(50) | e.g. `call.completed` |
| `callid` | VARCHAR(50) | Call ID from payload |
| `payload` | JSON | Full raw webhook payload |
| `signature_valid` | TINYINT | 1 = valid, 0 = rejected |
| `queued` | TINYINT | 1 = pushed to RabbitMQ |
| `skip_reason` | VARCHAR(100) | Why not queued (short call, duplicate, etc.) |
| `received_at` | DATETIME | Timestamp of receipt |

---

## 6. Phase 2 — Subscription & BID Activation

**Goal:** Automatically enable/disable PCAA processing for a BID based on Mcube's subscription/payment status.

### 6.1 New table: `pcaa_subscriptions`

| Column | Type | Description |
|---|---|---|
| `id` | INT AUTO_INCREMENT | PK |
| `bid` | VARCHAR(20) UNIQUE | Business ID |
| `business_name` | VARCHAR(255) | Business name |
| `plan` | VARCHAR(50) | e.g. `standard`, `premium` |
| `status` | ENUM | `active`, `suspended`, `cancelled` |
| `activated_at` | DATETIME | When PCAA was first activated |
| `paid_until` | DATETIME | Current paid period end |
| `webhook_secret` | VARCHAR(255) | HMAC secret for this BID's webhooks |
| `created_at` | DATETIME | |
| `updated_at` | DATETIME | |

### 6.2 BID activation sequence

When Mcube sends `pcaa.activated`:

1. Create/update row in `pcaa_subscriptions` with `status = 'active'`
2. Auto-create default `business_pipeline_config` row for the BID (if not exists)
3. Auto-create default `business_agent_config` row (quality_analyzer agent)
4. Auto-create per-BID DB tables: `{bid}_raw_calls`, `{bid}_sarvamresponse`, `{bid}_callanalytics`, `{bid}_bant_analysis`
5. Copy the standard BID 6004 quality parameters as a template for the new BID (into `quality_parameters` table)
6. Send welcome email/notification to business admin
7. Log the activation event in `pcaa_processing_log`
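
Because step 4 interpolates the BID into table names, and SQL identifiers cannot be bound as query parameters, the BID should be validated before any DDL is built. A small sketch, assuming BIDs are purely numeric (as in `6004`); the helper name and the length limit are hypothetical.

```python
import re
from typing import List

# Suffixes of the per-BID tables listed in step 4.
PER_BID_SUFFIXES = ("raw_calls", "sarvamresponse", "callanalytics", "bant_analysis")

# Assumption: a BID is 1 to 20 digits (the bid column is VARCHAR(20)).
_BID_PATTERN = re.compile(r"[0-9]{1,20}")


def per_bid_tables(bid: str) -> List[str]:
    """Return the per-BID table names, refusing anything that is not a plain number."""
    if not _BID_PATTERN.fullmatch(bid):
        raise ValueError(f"refusing to build table names from BID {bid!r}")
    return [f"{bid}_{suffix}" for suffix in PER_BID_SUFFIXES]
```

Running the table creation with `CREATE TABLE IF NOT EXISTS` would keep the whole activation sequence safe to repeat if Mcube delivers `pcaa.activated` more than once.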

### 6.3 BID deactivation sequence

When Mcube sends `pcaa.deactivated`:

1. Update `pcaa_subscriptions.status = 'suspended'`
2. All new webhooks for this BID are rejected with HTTP 403
3. In-flight calls already in the queue are processed (do not drop mid-flight)
4. Log the deactivation event
5. (Optional) Notify business admin

### 6.4 Subscription status check

Every webhook handler checks `pcaa_subscriptions.status = 'active'` before accepting an event. This is the single gate that controls whether new work is admitted for a BID; calls that were queued before deactivation still run to completion (see 6.3).

### 6.5 Onboarding API endpoints to add

| Endpoint | Method | Purpose |
|---|---|---|
| `/admin/bids/{bid}/activate` | POST | Manual override to activate a BID |
| `/admin/bids/{bid}/deactivate` | POST | Manual override to deactivate |
| `/admin/bids/{bid}/configure` | POST/PUT | Set pipeline config, quality params, agents |
| `/admin/bids` | GET | List all BIDs and their subscription status |

---

## 7. Phase 3 — Automated Processing Pipeline

**Goal:** Replace all manual CLI scripts with a continuously-running, queue-driven, multi-BID pipeline.

### 7.1 Message queue topology (RabbitMQ)

```
Exchange: pcaa.direct (type: direct)

Queues:
  call.transcribe         ← calls waiting for STT
  call.analyze            ← transcribed calls waiting for analysis
  call.notify             ← analyzed calls waiting for notification dispatch
  call.archive            ← calls past retention period, waiting for archival

Dead-letter queues (messages land here once automatic retries with exponential backoff are exhausted):
  call.transcribe.failed  ← transcriptions that failed 3 times
  call.analyze.failed     ← analyses that failed 3 times
  call.notify.failed      ← notifications that failed
```
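
One way to express this topology is as data that a startup script applies to the broker. The sketch below only builds the queue arguments; `x-dead-letter-exchange` and `x-dead-letter-routing-key` are standard RabbitMQ queue arguments, while binding each queue with a routing key equal to its own name is an assumption. It does not implement the backoff delay itself.

```python
from typing import Dict

EXCHANGE = "pcaa.direct"
WORK_QUEUES = ("call.transcribe", "call.analyze", "call.notify", "call.archive")
# Queues that have a matching ".failed" dead-letter queue in the plan above.
QUEUES_WITH_DLQ = ("call.transcribe", "call.analyze", "call.notify")


def queue_declarations() -> Dict[str, Dict[str, str]]:
    """Map each queue name to the arguments it should be declared with."""
    declarations: Dict[str, Dict[str, str]] = {}
    for name in WORK_QUEUES:
        args: Dict[str, str] = {}
        if name in QUEUES_WITH_DLQ:
            # A message rejected with requeue=False is republished by the
            # broker to this exchange with this routing key.
            args = {
                "x-dead-letter-exchange": EXCHANGE,
                "x-dead-letter-routing-key": f"{name}.failed",
            }
        declarations[name] = args
    for name in QUEUES_WITH_DLQ:
        declarations[f"{name}.failed"] = {}
    return declarations


# With pika (an assumed client library), each entry could be applied as:
#   channel.exchange_declare(exchange=EXCHANGE, exchange_type="direct", durable=True)
#   channel.queue_declare(queue=name, durable=True, arguments=args)
#   channel.queue_bind(queue=name, exchange=EXCHANGE, routing_key=name)
```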

**Message format (call.transcribe):**
```json
{
  "bid": "6004",
  "callid": "97393902541773212620",
  "recording_url": "https://recordings.mcube.com/.../97393902541773212620.wav",
  "duration_seconds": 1016,
  "agent_name": "Richa vishwakarma",
  "agent_email": "richa@company.com",
  "attempt": 1,
  "queued_at": "2026-03-11T12:55:00Z"
}
```

### 7.2 New worker services

#### Transcription Worker (`transcription_worker.py`)
- Consumes from `call.transcribe`
- Loads BID's STT provider from `business_pipeline_config` (sarvam/deepgram)
- Downloads audio, calls STT API, saves to `{bid}_sarvamresponse`
- On success: publishes message to `call.analyze`
- On failure: increments attempt counter; if attempt < 3, requeue with delay; else route to dead-letter queue + alert
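
The retry rule can be sketched as a small function. It reads the rule as "a call reaches the dead-letter queue after its third failed attempt", which matches the queue descriptions in 7.1; the 30-second base delay and the doubling factor are assumptions, since the plan only says "exponential backoff".

```python
from typing import Tuple


def next_step(attempt: int, max_attempts: int = 3,
              base_delay_s: int = 30) -> Tuple[str, int, int]:
    """Decide what to do after attempt number `attempt` (1-based) has failed.

    Returns (action, attempt_for_next_message, delay_seconds).
    """
    if attempt < max_attempts:
        # Exponential backoff: 30s after the 1st failure, 60s after the 2nd.
        delay_s = base_delay_s * 2 ** (attempt - 1)
        return ("retry", attempt + 1, delay_s)
    # Retries exhausted: hand the message to the dead-letter queue and alert.
    return ("dead_letter", attempt, 0)
```

The worker would copy the returned attempt number into the `attempt` field of the republished message, so the count survives worker restarts.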

#### Analysis Worker (`analysis_worker.py`)
- Consumes from `call.analyze`
- Loads BID's agent configs from `business_agent_config`
- Runs `CallAnalyzer.analyze_call()` — quality scores + BANT + coaching suggestions
- Saves to `{bid}_callanalytics` + `{bid}_bant_analysis`
- On success: publishes message to `call.notify`
- On failure: retry up to 3 attempts (same rule as the transcription worker), then route to dead-letter queue + alert

#### Notification Worker (`notification_worker.py`)
- Consumes from `call.notify`
- Loads notification preferences per BID
- Sends coaching report to agent (email/WhatsApp)
- Sends daily summary digest to supervisor (batched, not per-call)
- On failure: dead-letter + log (non-critical, best-effort delivery)

#### Archive Worker (`archive_worker.py`)
- Consumes from `call.archive`
- Downloads audio from Mcube recording URL → uploads to S3/object storage
- Compresses transcript and analysis rows to archive tables
- Marks original rows as archived
- Deletes from hot storage if configured

### 7.3 Pipeline orchestrator (`pipeline_orchestrator.py`)

A lightweight scheduler (replaces `pipeline_6004.py`) that:
- Runs on a configurable interval (default: every 60 seconds)
- Queries `pcaa_subscriptions` for all `status = 'active'` BIDs
- For each active BID, checks `business_pipeline_config.pipeline_enabled`
- Picks up any calls stuck in a non-terminal state (missed webhooks, server restarts) and re-queues them
- Publishes calls due for archival to `call.archive`
- Acts only as the safety net; webhooks remain the primary trigger
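
A sketch of the "stuck call" sweep, written as a pure function over rows already read from `{bid}_raw_calls`. The mapping from state to queue, the `state_changed_at` field, and the 30-minute threshold (borrowed from the alerting rules in section 10) are assumptions; failed states are left to the explicit retry endpoint.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

# Assumption: which queue a call should re-enter, given the state it is stuck in.
REQUEUE_TARGET: Dict[str, str] = {
    "received": "call.transcribe",
    "queued": "call.transcribe",
    "transcribing": "call.transcribe",
    "transcribed": "call.analyze",
    "analyzing": "call.analyze",
    "analyzed": "call.notify",
}


def plan_requeues(calls: List[dict], now: datetime,
                  stuck_after: timedelta = timedelta(minutes=30)
                  ) -> List[Tuple[str, str]]:
    """Return (callid, queue) pairs for calls that have not moved for too long."""
    plan = []
    for call in calls:
        target = REQUEUE_TARGET.get(call["processing_state"])
        if target is None:
            continue  # terminal, failed, or skipped: not handled by the sweep
        if now - call["state_changed_at"] > stuck_after:
            plan.append((call["callid"], target))
    return plan
```

Since a slow but healthy worker can look stuck, the workers need to tolerate receiving the same call twice, which is the idempotency requirement in gap G18.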

### 7.4 Processing state machine

Each call transitions through these states, tracked in `{bid}_raw_calls`:

```
received → queued → transcribing → transcribed → analyzing → analyzed → notified → archived
                                                                                        │
                         ↑                  ↑                                      (terminal)
                     failed_stt         failed_analysis
```

All state transitions are logged in `pcaa_processing_log`.
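
The state machine can be encoded as a transition table that workers consult before updating `processing_state`. This is one reading of the diagram: it assumes `failed_stt` is entered from `transcribing` and retried via `queued`, `failed_analysis` is entered from `analyzing` and retried via `transcribed`, and `skipped` (from the ENUM in section 11) is reached directly from `received`.

```python
from typing import Dict, Set

ALLOWED_TRANSITIONS: Dict[str, Set[str]] = {
    "received": {"queued", "skipped"},
    "queued": {"transcribing"},
    "transcribing": {"transcribed", "failed_stt"},
    "transcribed": {"analyzing"},
    "analyzing": {"analyzed", "failed_analysis"},
    "analyzed": {"notified"},
    "notified": {"archived"},
    "failed_stt": {"queued"},            # assumption: a retry re-enters the STT queue
    "failed_analysis": {"transcribed"},  # assumption: a retry re-enters analysis
    "archived": set(),                   # terminal
    "skipped": set(),                    # terminal
}


def can_transition(current: str, new: str) -> bool:
    """True if moving from `current` to `new` is allowed by the table."""
    return new in ALLOWED_TRANSITIONS.get(current, set())
```

A worker could pair this check with a conditional update (`UPDATE ... WHERE processing_state = <current>`) so that two workers racing on the same call cannot both advance it.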

### 7.5 New table: `pcaa_processing_log`

| Column | Type | Description |
|---|---|---|
| `id` | BIGINT | PK |
| `bid` | VARCHAR(20) | |
| `callid` | VARCHAR(50) | |
| `stage` | VARCHAR(50) | `received`, `transcribing`, `analyzing`, `notified`, etc. |
| `status` | VARCHAR(20) | `started`, `success`, `failed` |
| `worker_id` | VARCHAR(100) | Which worker instance handled it |
| `duration_ms` | INT | Processing time |
| `error_message` | TEXT | Error if failed |
| `attempt` | INT | Retry attempt number |
| `created_at` | DATETIME | |

---

## 8. Phase 4 — Agent Feedback & Coaching System

**Goal:** Automatically deliver actionable quality insights to agents and supervisors after every call.

### 8.1 What gets delivered per call (to agent)

After analysis completes, each agent receives (via email or WhatsApp):

```
📞 Call Quality Report — [Date] [Time]
Call with: [Customer Phone]
Duration: 16m 56s

Your Score: 87/100

✅ Strong areas:
   • Product knowledge (14/15) — Great explanation of pricing and features
   • Active listening (14/15) — You understood the spam concern clearly

⚠️ Areas to improve:
   • Greeting (8/10) — Introduce yourself by name and designation upfront
   • Objection handling (12/15) — Offer specific proof points when cost is questioned

BANT Status:
   Need: Identified ✓ (voice broadcasting without spam)
   Authority: Likely decision maker ✓
   Budget: Not discussed ✗
   Timeline: Not discussed ✗

💡 Coaching Tip:
   "Try asking 'What's your timeline for this decision?' before the closing.
   You naturally built good rapport — use that to ask budget questions earlier."
```

### 8.2 What gets delivered per team (to supervisor)

A daily digest (8 AM, configurable) per supervisor group:

```
📊 Team Quality Report — 2026-03-11

Team: Presales | BID: 6004

Today's calls: 24 | Analyzed: 24 | Avg Score: 81.4/100

Top performers:
  1. Richa vishwakarma   — 87.0 avg  (3 calls)
  2. Pooja Kumar         — 84.0 avg  (5 calls)

Needs attention:
  1. Shubham Kumari      — 72.0 avg  (6 calls) — Closing & objection handling

Today's objection themes:
  • Spam concerns (8 calls)
  • Pricing vs competitors (5 calls)
  • DID registration complexity (3 calls)

BANT pipeline:
  Needs clearly identified: 18/24 calls
  Budget discussed: 6/24 calls ← Opportunity to improve
```

### 8.3 New table: `pcaa_notification_config`

| Column | Type | Description |
|---|---|---|
| `id` | INT | PK |
| `bid` | VARCHAR(20) | |
| `notification_type` | ENUM | `agent_report`, `supervisor_digest` |
| `channel` | ENUM | `email`, `whatsapp`, `both`, `none` |
| `recipient_type` | ENUM | `agent_self`, `supervisor`, `custom_email` |
| `custom_email` | VARCHAR(255) | For `custom_email` recipient type |
| `send_time` | TIME | For digest: time of day to send |
| `send_days` | VARCHAR(50) | e.g. `mon,tue,wed,thu,fri` |
| `enabled` | TINYINT | On/off |
| `created_at` | DATETIME | |

### 8.4 Coaching suggestion generation

Within the analysis worker (introduced in Phase 3), after quality scoring, an additional AI prompt is run:

- **Input:** Quality parameter scores + transcript + BANT gaps
- **Output:** 2–3 bullet coaching suggestions for the agent, specific to this call
- **Stored in:** `{bid}_callanalytics.coaching_suggestions` (new column)
- **Used for:** Agent notification and dashboard display

### 8.5 Notification delivery options

**Email:** Uses SMTP or AWS SES. Agent email is sourced from `{bid}_raw_calls.agent_email` (requires Mcube webhook to include it) or from a new `pcaa_agent_profiles` table.

**WhatsApp:** Uses WhatsApp Business API. Agent phone sourced from `{bid}_raw_calls.agent_callinfo` or `pcaa_agent_profiles`.

---

## 9. Phase 5 — Data Storage, Retention & Archival

**Goal:** Prevent unbounded data growth, comply with business retention requirements, and archive efficiently.

### 9.1 Data tiers

| Tier | Storage | Retention | Content |
|---|---|---|---|
| **Hot** | MySQL (`voicebot_cluster`) | 0–90 days | All call metadata, transcripts, analytics |
| **Warm** | MySQL compressed / separate schema | 90–365 days | Compressed analytics only (no audio) |
| **Cold** | AWS S3 (or compatible) | 1–7 years | Gzipped JSON exports per call |
| **Audio** | Mcube recordings server (source) | Per Mcube policy | Not PCAA's responsibility unless explicitly archived |

### 9.2 Retention policy config

Retention is configurable per BID in `business_pipeline_config`. Proposed new columns:

| Column | Default | Description |
|---|---|---|
| `hot_retention_days` | 90 | Days to keep in MySQL hot tier |
| `warm_retention_days` | 365 | Days to keep in warm tier |
| `archive_audio` | 0 | Whether to copy audio to S3 |
| `s3_bucket` | NULL | S3 bucket name for this BID's archives |

### 9.3 Archival process (per call, after `hot_retention_days`)

1. Export call record to gzipped JSON: `s3://{bucket}/{bid}/{year}/{month}/{callid}.json.gz`
   - Includes: raw_call metadata, transcript, speaker_segments, analytics, BANT
2. Insert row in `pcaa_archive_manifest`
3. Delete from `{bid}_sarvamresponse` (largest table — transcripts)
4. Soft-delete from `{bid}_callanalytics` (mark archived, keep summary row)
5. If `archive_audio = 1`: copy the WAV from the Mcube URL to S3 and record its location in `pcaa_archive_manifest.audio_path`; do this before the call is marked `archived`
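
Step 1 can be sketched with the standard library alone. The key layout follows the path template above; zero-padding the month and the exact fields in the record are assumptions. The upload itself is left out; with boto3 it would typically be a `put_object` call using this key and blob.

```python
import gzip
import json
from datetime import datetime


def archive_key(bid: str, callid: str, call_date: datetime) -> str:
    """Object key under the bucket: {bid}/{year}/{month}/{callid}.json.gz"""
    return f"{bid}/{call_date:%Y}/{call_date:%m}/{callid}.json.gz"


def build_archive_blob(record: dict) -> bytes:
    """Serialize one call record to gzipped UTF-8 JSON."""
    # default=str lets datetime values from MySQL rows serialize as text.
    payload = json.dumps(record, ensure_ascii=False, sort_keys=True, default=str)
    return gzip.compress(payload.encode("utf-8"))
```

`len(build_archive_blob(record))` gives the value for `pcaa_archive_manifest.archive_size_bytes`.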

### 9.4 New table: `pcaa_archive_manifest`

| Column | Type | Description |
|---|---|---|
| `id` | BIGINT | PK |
| `bid` | VARCHAR(20) | |
| `callid` | VARCHAR(50) | |
| `archive_path` | TEXT | S3 path of archived JSON |
| `audio_path` | TEXT | S3 path of archived audio (if applicable) |
| `archived_at` | DATETIME | |
| `original_call_date` | DATETIME | For audit |
| `archive_size_bytes` | BIGINT | Compressed size |

### 9.5 Dashboard "archived call" retrieval

When a user views a call that has been archived, the dashboard backend:
1. Checks `pcaa_archive_manifest` for the callid
2. Fetches the JSON from S3 on demand
3. Returns it as if it were a live record (transparent to the user)
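
The lookup can be sketched with the storage calls injected as plain functions, so the fallback logic is testable without MySQL or S3. The three callables and their signatures are hypothetical stand-ins for the real data-access code.

```python
import gzip
import json
from typing import Callable, Optional


def load_call(callid: str,
              fetch_manifest: Callable[[str], Optional[dict]],
              fetch_archive: Callable[[str], bytes],
              fetch_live: Callable[[str], Optional[dict]]) -> Optional[dict]:
    """Return a call record from the archive if it was archived, else from MySQL."""
    manifest = fetch_manifest(callid)       # row from pcaa_archive_manifest, or None
    if manifest is None:
        return fetch_live(callid)           # not archived: serve the live row
    blob = fetch_archive(manifest["archive_path"])  # gzipped JSON bytes from S3
    return json.loads(gzip.decompress(blob).decode("utf-8"))
```

Fetching from S3 adds latency to each view of an archived call, so a short-lived cache in front of `fetch_archive` may be worth considering if archived calls are opened often.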

---

## 10. Phase 6 — Observability & Operations

**Goal:** Make the pipeline visible, alertable, and self-healing.

### 10.1 Metrics to track (in `pcaa_processing_log` and a new `pcaa_pipeline_metrics` table)

| Metric | Description |
|---|---|
| `calls_received_per_hour` | Inbound webhook volume |
| `transcription_lag_seconds` | Time from call completion to transcript ready |
| `analysis_lag_seconds` | Time from transcript to analysis ready |
| `transcription_success_rate` | % of calls successfully transcribed |
| `analysis_success_rate` | % of calls successfully analyzed |
| `queue_depth` | Number of messages in each RabbitMQ queue |
| `failed_calls_count` | Calls stuck in failed state per BID |
| `avg_quality_score_per_bid` | Rolling 7-day average |

### 10.2 Alerting rules

| Condition | Alert Channel | Severity |
|---|---|---|
| Queue depth > 100 for > 5 minutes | Ops email/Slack | High |
| Transcription failure rate > 20% in 1 hour | Ops email | High |
| Any call stuck in `transcribing` state > 30 minutes | Ops email | Medium |
| BID analysis failure 3 consecutive calls | Ops + BID admin email | High |
| Sarvam API returns non-200 3 consecutive times | Ops | Critical |
| AWS Bedrock quota error | Ops | Critical |
| DB disk usage > 80% | Ops | Critical |
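
Two of these rules, sketched as pure functions over metrics that have already been collected. The thresholds come from the table; the input shapes (a list of timestamped depth samples, success and failure counts for the last hour) are assumptions.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def queue_depth_alert(samples: List[Tuple[datetime, int]], threshold: int = 100,
                      window: timedelta = timedelta(minutes=5)) -> bool:
    """True if depth has stayed above `threshold` for longer than `window`.

    `samples` are (timestamp, depth) pairs in ascending time order.
    """
    breach_start = None
    for timestamp, depth in samples:
        if depth > threshold:
            if breach_start is None:
                breach_start = timestamp
        else:
            breach_start = None  # the queue recovered, so restart the clock
    if breach_start is None:
        return False
    return samples[-1][0] - breach_start > window


def failure_rate_alert(succeeded: int, failed: int, threshold: float = 0.20) -> bool:
    """True if more than `threshold` of the attempts in the period failed."""
    total = succeeded + failed
    return total > 0 and failed / total > threshold
```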

### 10.3 New API endpoints for operations

| Endpoint | Description |
|---|---|
| `GET /ops/pipeline/health` | Overall pipeline health (queue depths, error rates) |
| `GET /ops/pipeline/{bid}/status` | Per-BID status: queued/processing/failed counts |
| `POST /ops/pipeline/{bid}/retry-failed` | Re-queue failed calls for a BID |
| `GET /ops/pipeline/stuck-calls` | Calls stuck in non-terminal state > 30 min |
| `GET /ops/metrics` | Aggregated metrics for all BIDs |

### 10.4 Structured logging

All workers log in JSON format to a centralized log file (and optionally to a log aggregator like Elasticsearch or CloudWatch):

```json
{
  "timestamp": "2026-03-11T12:55:00Z",
  "service": "transcription_worker",
  "bid": "6004",
  "callid": "97393902541773212620",
  "event": "transcription_complete",
  "duration_ms": 45000,
  "transcript_chars": 17020,
  "segments": 254,
  "stt_provider": "sarvam",
  "attempt": 1
}
```
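
A log line of this shape can be produced with the standard `logging` module and a small formatter. This is a sketch: passing per-event fields through `extra={"fields": {...}}` is a convention chosen here for illustration, not an existing project pattern.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def __init__(self, service: str):
        super().__init__()
        self.service = service

    def format(self, record: logging.LogRecord) -> str:
        created = datetime.fromtimestamp(record.created, tz=timezone.utc)
        entry = {
            "timestamp": created.strftime("%Y-%m-%dT%H:%M:%SZ"),
            "service": self.service,
            "event": record.getMessage(),
        }
        # Per-event fields (bid, callid, duration_ms, ...) arrive via `extra`.
        entry.update(getattr(record, "fields", {}))
        return json.dumps(entry, ensure_ascii=False)


def build_logger(service: str) -> logging.Logger:
    """Create a logger that writes JSON lines to stderr."""
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter(service))
    logger = logging.getLogger(service)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger
```

A worker would then log with, for example, `logger.info("transcription_complete", extra={"fields": {"bid": "6004", "attempt": 1}})`.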

---

## 11. Database Schema Changes Required

### New tables to create

| Table | Purpose | Phase |
|---|---|---|
| `pcaa_subscriptions` | BID payment/activation status | Phase 2 |
| `pcaa_webhook_log` | Log all inbound webhooks | Phase 1 |
| `pcaa_processing_log` | Log every pipeline stage per call | Phase 3 |
| `pcaa_notification_config` | Per-BID notification settings | Phase 4 |
| `pcaa_agent_profiles` | Agent email/phone for notifications | Phase 4 |
| `pcaa_archive_manifest` | Tracks archived calls and S3 locations | Phase 5 |
| `pcaa_pipeline_metrics` | Time-series metrics snapshots | Phase 6 |

### Existing tables to alter

| Table | Change | Phase |
|---|---|---|
| `businesses` | Add `pcaa_active` TINYINT, `pcaa_plan` VARCHAR | Phase 2 |
| `business_pipeline_config` | Add `hot_retention_days`, `warm_retention_days`, `archive_audio`, `s3_bucket` (the per-BID `webhook_secret` lives in `pcaa_subscriptions`) | Phase 5 |
| `{bid}_raw_calls` | Add `processing_state` ENUM, `webhook_received_at` DATETIME, `agent_email` VARCHAR | Phase 1 + 3 |
| `{bid}_callanalytics` | Add `coaching_suggestions` JSON, `notified_at` DATETIME, `archived` TINYINT | Phase 4 + 5 |
| `quality_parameters` | No structural change; need default template rows for auto-provisioned BIDs | Phase 2 |

### Processing state column (add to `{bid}_raw_calls`)

```sql
ALTER TABLE `{bid}_raw_calls`
  ADD COLUMN processing_state ENUM(
    'received',
    'queued',
    'transcribing',
    'transcribed',
    'analyzing',
    'analyzed',
    'notified',
    'archived',
    'failed_stt',
    'failed_analysis',
    'skipped'
  ) DEFAULT 'received' AFTER transcription_status,
  ADD COLUMN webhook_received_at DATETIME NULL,
  ADD COLUMN agent_email VARCHAR(255) NULL,
  ADD INDEX idx_processing_state (processing_state);
```

---

## 12. API Changes Required

### New endpoints (all in `fastapi_app.py` or a new `webhook_router.py`)

#### Inbound Webhooks (from Mcube)

| Method | Path | Auth | Description |
|---|---|---|---|
| POST | `/webhook/call-completed` | HMAC signature | Receive call completion event |
| POST | `/webhook/bid-activated` | HMAC signature | BID subscription activated |
| POST | `/webhook/bid-deactivated` | HMAC signature | BID subscription deactivated |

#### Admin / Ops

| Method | Path | Auth | Description |
|---|---|---|---|
| GET | `/admin/bids` | JWT admin | List all BIDs and status |
| POST | `/admin/bids/{bid}/activate` | JWT admin | Manually activate BID |
| POST | `/admin/bids/{bid}/deactivate` | JWT admin | Manually deactivate BID |
| POST | `/admin/bids/{bid}/configure` | JWT admin | Set pipeline + quality config |
| POST | `/ops/pipeline/{bid}/retry-failed` | JWT admin | Re-queue failed calls |
| GET | `/ops/pipeline/health` | JWT admin | Pipeline health summary |
| GET | `/ops/pipeline/stuck-calls` | JWT admin | Calls stuck > 30 min |
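
The stuck-calls endpoint needs a rule for what counts as stuck. A minimal sketch of that rule follows, assuming each row carries a `state_changed_at` timestamp (a hypothetical field; the plan only defines `processing_state` and `webhook_received_at`) and that the in-flight states are the ones listed in `IN_FLIGHT_STATES`.

```python
from datetime import datetime, timedelta
from typing import Dict, List

# Assumed set of non-terminal states; terminal and failed states are excluded.
IN_FLIGHT_STATES = {"received", "queued", "transcribing", "transcribed", "analyzing"}
STUCK_AFTER = timedelta(minutes=30)  # threshold taken from the endpoint description


def find_stuck_calls(calls: List[Dict], now: datetime) -> List[str]:
    """Return callids that have sat in an in-flight state longer than STUCK_AFTER."""
    return [
        call["callid"]
        for call in calls
        if call["processing_state"] in IN_FLIGHT_STATES
        and now - call["state_changed_at"] > STUCK_AFTER
    ]
```

In production the same predicate would more likely be expressed as a SQL `WHERE` clause against `{bid}_raw_calls`; the in-memory version here only illustrates the logic.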

#### Agent / Supervisor (for notifications)

| Method | Path | Auth | Description |
|---|---|---|---|
| GET | `/agents/{bid}/{agent_id}/reports` | JWT | Agent's call reports |
| GET | `/supervisor/{bid}/team-report` | JWT | Daily team digest |
| GET | `/call-records/{bid}/{callid}/coaching` | JWT | Coaching tips for a call |

### Existing endpoints to modify

| Path | Change |
|---|---|
| `GET /pipeline/{bid}/status` | Add `processing_state` breakdown counts |
| `GET /call-records/{bid}` | Add `processing_state` filter + archived call lookup |
| `GET /call-records/{bid}/{callid}` | Transparently serve from S3 if archived |
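
The "transparently serve from S3" behaviour can be thought of as a two-tier lookup. The sketch below illustrates the idea with injected fetch functions so it stays independent of any database or S3 client; `get_call_record`, `fetch_hot`, and `fetch_archived` are hypothetical names, not existing code.

```python
from typing import Callable, Dict, Optional

Fetcher = Callable[[str, str], Optional[Dict]]


def get_call_record(
    bid: str, callid: str, fetch_hot: Fetcher, fetch_archived: Fetcher
) -> Optional[Dict]:
    """Look in the hot tier (MySQL) first, then fall back to the archive (S3)."""
    record = fetch_hot(bid, callid)
    if record is not None:
        return record
    # Only reached when the row has been removed from the hot tier.
    return fetch_archived(bid, callid)
```

Because both tiers return the same record shape, API callers would not need to know whether a call has been archived.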

---

## 13. New Services & Workers Required

### Summary of new processes

| Service | Language | Trigger | Deployment |
|---|---|---|---|
| `webhook_receiver` | Python/FastAPI (existing app) | HTTP POST from Mcube | Part of port 8002 |
| `transcription_worker.py` | Python | RabbitMQ consumer | Background process / systemd |
| `analysis_worker.py` | Python | RabbitMQ consumer | Background process / systemd |
| `notification_worker.py` | Python | RabbitMQ consumer | Background process / systemd |
| `archive_worker.py` | Python | RabbitMQ consumer | Background process / systemd |
| `pipeline_orchestrator.py` | Python | Cron (every 60s) | Cron job / systemd timer |
| `bid_provisioner.py` | Python | Called by webhook_receiver | Library (not standalone) |

### Scaling model

Initially, each worker runs as a single process. Workers are designed as independent consumers, so additional instances can be started without code changes when volume grows:

```bash
# Run 3 transcription workers in parallel
python3 transcription_worker.py --worker-id transcriber-1 &
python3 transcription_worker.py --worker-id transcriber-2 &
python3 transcription_worker.py --worker-id transcriber-3 &
```

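By default, RabbitMQ dispatches messages round-robin across all consumers of the same queue. Setting a per-consumer prefetch limit keeps one slow call from holding messages that an idle worker could otherwise pick up.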
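
For the `--worker-id` flag used in the commands above, each worker script would need a small entry point that parses it. A minimal sketch using `argparse` is shown here; `parse_args` is a hypothetical helper, and the plan does not specify any other flags.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse worker command-line flags; only --worker-id comes from the plan."""
    parser = argparse.ArgumentParser(description="PCAA queue worker")
    parser.add_argument(
        "--worker-id",
        required=True,
        help="Unique name for this worker instance, e.g. transcriber-1",
    )
    return parser.parse_args(argv)
```

The parsed `worker_id` can then be attached to every log line so that output from parallel instances can be told apart.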

---

## 14. Mcube-Side Requirements

These are changes that must be made **by the Mcube platform team** (not PCAA), listed here so they can be coordinated:

| # | Requirement | Details |
|---|---|---|
| M1 | Webhook delivery | Mcube call engine must POST to `/webhook/call-completed` when recording is ready |
| M2 | Webhook payload fields | Must include: `bid`, `callid`, `recording_url`, `agent_email`, `agent_phone`, `duration_seconds`, `call_status`, `direction`, `call_starttime`, `call_endtime` |
| M3 | Billing event webhook | Mcube billing must POST `pcaa.activated` / `pcaa.deactivated` on payment events |
| M4 | Webhook reliability | Mcube must retry failed webhooks with backoff (3 attempts minimum) |
| M5 | HMAC signing | Mcube must sign all webhooks with the shared secret issued at BID activation |
| M6 | Recording URL lifetime | Recording URLs must remain accessible for at least 24 hours (time needed for PCAA to download and archive audio) |
| M7 | BID as identifier | Every webhook must include `bid` to allow PCAA to route correctly |
| M8 | Agent identifier | Agent email or employee ID must be included in webhook payload for notification routing |
| M9 | PCAA flag in Mcube UI | Mcube admin panel should show which BIDs have PCAA active, and let business admins manage it |
| M10 | Sandbox / test BID | Mcube to provide a sandbox BID for PCAA integration testing before production |
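
On the PCAA side, requirement M2 translates into a payload check at the webhook boundary. The sketch below lists the fields from M2 and reports any that are absent or empty; `missing_fields` is a hypothetical helper, and the field types and formats remain to be confirmed with Mcube.

```python
from typing import Dict, List

# Field names taken from requirement M2.
REQUIRED_FIELDS = (
    "bid",
    "callid",
    "recording_url",
    "agent_email",
    "agent_phone",
    "duration_seconds",
    "call_status",
    "direction",
    "call_starttime",
    "call_endtime",
)


def missing_fields(payload: Dict) -> List[str]:
    """Return required field names that are absent, None, or empty strings."""
    return [name for name in REQUIRED_FIELDS if payload.get(name) in (None, "")]
```

A value of `0` for `duration_seconds` is treated as present, since a zero-length call is still a valid event that the pipeline may choose to skip.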

---

## 15. Complete Activity Checklist

All activities required to implement the full integration, grouped by phase.

### Phase 1 — Webhook Integration

- [ ] **1.1** Confirm webhook payload schema with Mcube platform team (fields, format, timing)
- [ ] **1.2** Agree on HMAC signing mechanism and key exchange process
- [ ] **1.3** Create `pcaa_webhook_log` table in MySQL
- [ ] **1.4** Add `processing_state`, `webhook_received_at`, and `agent_email` columns to all `{bid}_raw_calls` tables
- [ ] **1.5** Create `webhook_router.py` module within FastAPI app with 3 new endpoints
- [ ] **1.6** Implement HMAC signature verification middleware
- [ ] **1.7** Implement webhook idempotency check (if the `callid` was already processed, return HTTP 200 and skip)
- [ ] **1.8** Implement webhook → RabbitMQ publish logic
- [ ] **1.9** Write integration tests for webhook endpoint (valid payload, invalid signature, duplicate, short call)
- [ ] **1.10** Deploy and test against Mcube sandbox BID
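
The idempotency check in 1.7 can lean on a unique key so that duplicate deliveries are detected atomically. The sketch below demonstrates the pattern with an in-memory SQLite database as a stand-in for MySQL; the `webhook_seen` table and both function names are hypothetical, and in MySQL the equivalent statement would be `INSERT IGNORE` against a table with a unique `(bid, callid)` key.

```python
import sqlite3


def make_demo_db() -> sqlite3.Connection:
    """Create an in-memory stand-in table with a unique (bid, callid) key."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE webhook_seen ("
        " bid INTEGER NOT NULL,"
        " callid TEXT NOT NULL,"
        " PRIMARY KEY (bid, callid))"
    )
    return conn


def record_webhook_once(conn: sqlite3.Connection, bid: int, callid: str) -> bool:
    """Return True on first delivery, False if this (bid, callid) was already seen."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO webhook_seen (bid, callid) VALUES (?, ?)",
        (bid, callid),
    )
    conn.commit()
    # rowcount is 1 when the row was inserted and 0 when it was ignored.
    return cur.rowcount == 1
```

When `record_webhook_once` returns `False`, the handler would respond with HTTP 200 and skip publishing to RabbitMQ, so that Mcube's retries do not cause duplicate processing.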

### Phase 2 — Subscription & BID Activation

- [ ] **2.1** Create `pcaa_subscriptions` table
- [ ] **2.2** Implement `bid-activated` webhook handler (creates subscription + provisions BID)
- [ ] **2.3** Implement `bid-deactivated` webhook handler (suspends subscription)
- [ ] **2.4** Implement `bid_provisioner.py`: auto-creates tables, default pipeline config, default agent config, copies quality parameter template
- [ ] **2.5** Add admin override endpoints (`/admin/bids/*`)
- [ ] **2.6** Add `pcaa_active` column to `businesses` table
- [ ] **2.7** Create default quality parameter template rows (one per BID type: presales, support, collections)
- [ ] **2.8** Test full BID onboarding: activation → config provisioned → first call processed end-to-end
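
Because `bid_provisioner.py` builds table names such as `{bid}_raw_calls` from a value that arrives in a webhook, the BID should be validated before it is interpolated into any SQL. A minimal sketch follows, assuming BIDs are purely numeric (as with 6004); `table_names_for_bid` is a hypothetical helper.

```python
import re
from typing import Dict, Union

# Assumption: BIDs are short, purely numeric identifiers.
_BID_PATTERN = re.compile(r"[0-9]{1,10}")


def table_names_for_bid(bid: Union[int, str]) -> Dict[str, str]:
    """Validate the BID, then derive its per-BID table names."""
    bid_str = str(bid)
    if not _BID_PATTERN.fullmatch(bid_str):
        raise ValueError(f"invalid BID: {bid_str!r}")
    return {
        "raw_calls": f"{bid_str}_raw_calls",
        "callanalytics": f"{bid_str}_callanalytics",
        "sarvamresponse": f"{bid_str}_sarvamresponse",
    }
```

Table names cannot be bound as query parameters, so a strict allow-list check like this is the main defence against SQL injection through the `bid` field.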

### Phase 3 — Automated Pipeline

- [ ] **3.1** Set up RabbitMQ exchanges and queues (`call.transcribe`, `call.analyze`, `call.notify`, `call.archive`) with dead-letter queues
- [ ] **3.2** Write `transcription_worker.py` as RabbitMQ consumer (replaces manual pipeline step)
- [ ] **3.3** Write `analysis_worker.py` as RabbitMQ consumer
- [ ] **3.4** Implement retry logic with exponential backoff in both workers (max 3 attempts)
- [ ] **3.5** Implement dead-letter queue handler (alerts ops on permanently failed calls)
- [ ] **3.6** Write `pipeline_orchestrator.py` for safety-net polling and archival scheduling
- [ ] **3.7** Create `pcaa_processing_log` table + structured logging in all workers
- [ ] **3.8** Update `{bid}_raw_calls.processing_state` at every stage transition
- [ ] **3.9** Write systemd unit files for all worker processes
- [ ] **3.10** Load test: simulate 100 calls/hour across 5 BIDs, verify end-to-end processing
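
Activity 3.4 calls for exponential backoff with a maximum of three attempts. One possible shape for that logic is sketched below; the base delay of 2 seconds is an assumption, and `run_with_retry` is a hypothetical helper. A real deployment might instead implement the delay at the broker level (for example with delayed re-queueing) rather than sleeping inside the worker.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")
MAX_ATTEMPTS = 3  # from activity 3.4


def backoff_delay(attempt: int, base_seconds: float = 2.0) -> float:
    """Delay before the next try: base, 2x base, 4x base, ... (attempt is 1-based)."""
    return base_seconds * 2 ** (attempt - 1)


def run_with_retry(
    task: Callable[[], T],
    max_attempts: int = MAX_ATTEMPTS,
    base_seconds: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run `task`, retrying on any exception; re-raise after the final attempt."""
    for attempt in range(1, max_attempts + 1):
        try:
            return task()
        except Exception:
            if attempt == max_attempts:
                raise  # caller routes the message to the dead-letter queue
            sleep(backoff_delay(attempt, base_seconds))
    raise RuntimeError("unreachable: max_attempts must be at least 1")
```

The `sleep` parameter is injectable so the retry behaviour can be unit-tested without real delays.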

### Phase 4 — Agent Feedback & Coaching

- [ ] **4.1** Create `pcaa_notification_config` table
- [ ] **4.2** Create `pcaa_agent_profiles` table (maps agent name/phone → email for routing)
- [ ] **4.3** Add `coaching_suggestions` JSON column to `{bid}_callanalytics`
- [ ] **4.4** Add coaching suggestion AI prompt to the analysis step (runs within analysis worker)
- [ ] **4.5** Write `notification_worker.py` as RabbitMQ consumer
- [ ] **4.6** Implement email delivery via SMTP/AWS SES (per-call agent report)
- [ ] **4.7** Implement WhatsApp Business API delivery (per-call agent report)
- [ ] **4.8** Implement daily supervisor digest (aggregated team report, scheduled send)
- [ ] **4.9** Add `/call-records/{bid}/{callid}/coaching` API endpoint
- [ ] **4.10** Update dashboard frontend to show coaching tips on call detail page
- [ ] **4.11** Test notification delivery end-to-end (email + WhatsApp)

### Phase 5 — Data Retention & Archival

- [ ] **5.1** Create `pcaa_archive_manifest` table
- [ ] **5.2** Add retention config columns to `business_pipeline_config` (`hot_retention_days`, `warm_retention_days`, `archive_audio`, `s3_bucket`)
- [ ] **5.3** Provision S3 bucket (or compatible object store) with BID-based prefix structure
- [ ] **5.4** Write `archive_worker.py` as RabbitMQ consumer
- [ ] **5.5** Implement call export: compress call record + transcript + analytics to JSON.gz → upload to S3
- [ ] **5.6** Implement audio archival (optional per BID config): download WAV → upload to S3
- [ ] **5.7** Implement hot-tier cleanup (delete `{bid}_sarvamresponse` row after archive + retention period)
- [ ] **5.8** Implement archived call retrieval in `/call-records/{bid}/{callid}` API (transparent S3 lookup)
- [ ] **5.9** Update `pipeline_orchestrator.py` to queue calls due for archival
- [ ] **5.10** Test archival and retrieval cycle end-to-end
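
The export step in 5.5 bundles the call record, transcript, and analytics into one gzip-compressed JSON object. The sketch below shows the packing and unpacking with the standard library only. The S3 key layout in `archive_key` is an assumption (the plan only says "BID-based prefix structure"), all three function names are hypothetical, and the actual upload call is left out.

```python
import gzip
import json
from datetime import date
from typing import Dict


def archive_key(bid: str, callid: str, call_date: date) -> str:
    """Build an object key under a BID prefix (assumed layout: bid/YYYY/MM/DD/)."""
    return f"{bid}/{call_date:%Y/%m/%d}/{callid}.json.gz"


def pack_call(record: Dict, transcript: Dict, analytics: Dict) -> bytes:
    """Serialise the three parts into one gzip-compressed JSON blob."""
    payload = {"record": record, "transcript": transcript, "analytics": analytics}
    # default=str lets datetime and Decimal values from the DB serialise as text.
    text = json.dumps(payload, sort_keys=True, default=str)
    return gzip.compress(text.encode("utf-8"))


def unpack_call(blob: bytes) -> Dict:
    """Reverse of pack_call, used when serving an archived call."""
    return json.loads(gzip.decompress(blob).decode("utf-8"))
```

Keeping pack and unpack side by side makes the round trip easy to test before the hot-tier rows in 5.7 are deleted.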

### Phase 6 — Observability

- [ ] **6.1** Create `pcaa_pipeline_metrics` table for time-series snapshots
- [ ] **6.2** Implement metrics collector (runs alongside orchestrator, snapshots every 5 min)
- [ ] **6.3** Add ops API endpoints (`/ops/pipeline/health`, `/ops/pipeline/stuck-calls`, etc.)
- [ ] **6.4** Add RabbitMQ queue depth monitoring to health endpoint
- [ ] **6.5** Implement alerting rules (email/Slack on critical conditions)
- [ ] **6.6** Implement stuck-call detection and auto-requeue logic in orchestrator
- [ ] **6.7** Add structured JSON logging to all workers
- [ ] **6.8** Build ops monitoring section in dashboard frontend (or separate internal page)
- [ ] **6.9** Document runbook for common operational issues
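
For the structured JSON logging in 6.7, the standard `logging` module can be extended with a small formatter. The sketch below is one possible approach; the context keys `bid`, `callid`, `stage`, and `worker_id` are assumptions about what operators will want to filter on, and `JsonFormatter` is a hypothetical class name.

```python
import json
import logging

# Assumed context fields; callers pass them via logging's `extra=` argument.
CONTEXT_KEYS = ("bid", "callid", "stage", "worker_id")


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "ts": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        for key in CONTEXT_KEYS:
            if hasattr(record, key):
                entry[key] = getattr(record, key)
        return json.dumps(entry, default=str)
```

A worker would attach it with `handler.setFormatter(JsonFormatter())` and log with, for example, `logger.info("transcribed", extra={"bid": 6004, "callid": "abc", "stage": "stt"})`.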

---

## 16. Dependency & Sequencing Map

Phases 1 to 3 must be executed in order due to hard dependencies. Phases 4 to 6 each depend on Phase 3 but can proceed in parallel with one another:

```
Phase 1 (Webhooks)
    └── Required before Phase 2 (need webhook infrastructure to receive activation events)

Phase 2 (Subscription/BID Activation)
    └── Required before Phase 3 (workers must check subscription status)

Phase 3 (Automated Pipeline)
    └── Required before Phase 4 (notifications fire at end of pipeline)
    └── Required before Phase 5 (archival integrates with pipeline orchestrator)
    └── Required before Phase 6 (observability wraps the pipeline)

Phase 4 (Notifications) ─── can run in parallel with Phase 5 and Phase 6
Phase 5 (Archival) ─────── can run in parallel with Phase 4 and Phase 6
Phase 6 (Observability) ─── can run in parallel with Phase 4 and Phase 5
```
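
The dependency map above can also be expressed as a graph and resolved mechanically, which is handy if phases are later split into finer-grained work items. A small sketch using the standard-library `graphlib` module (Python 3.9+) follows; the `PHASE_DEPENDENCIES` mapping mirrors the diagram, and `delivery_waves` is a hypothetical helper.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Each phase maps to the set of phases it depends on, as in the diagram above.
PHASE_DEPENDENCIES: Dict[int, Set[int]] = {
    1: set(),
    2: {1},
    3: {2},
    4: {3},
    5: {3},
    6: {3},
}


def delivery_waves(deps: Dict[int, Set[int]]) -> List[List[int]]:
    """Group phases into waves; phases within a wave can run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the map has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves
```

With the mapping above, `delivery_waves(PHASE_DEPENDENCIES)` yields four waves: Phases 1, 2, and 3 alone, then Phases 4, 5, and 6 together.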

**Minimum viable delivery order:**
1. Phase 1 → Phase 2 → Phase 3 → **Go Live (automated, multi-BID pipeline)**
2. Phase 4 → **Agent feedback live**
3. Phase 5 → **Data management live**
4. Phase 6 → **Full operational visibility**

---

## Summary: What Needs to Change

| Category | Count | Examples |
|---|---|---|
| New DB tables | 7 | `pcaa_subscriptions`, `pcaa_webhook_log`, `pcaa_processing_log`, `pcaa_notification_config`, `pcaa_agent_profiles`, `pcaa_archive_manifest`, `pcaa_pipeline_metrics` |
| Altered DB tables | 5 | `businesses`, `business_pipeline_config`, `{bid}_raw_calls`, `{bid}_callanalytics`, `quality_parameters` |
| New API endpoints | 13 | Webhooks (3), admin/ops (7), agent/supervisor (3) |
| Modified API endpoints | 3 | Pipeline status, call-records list, call-record detail |
| New Python services | 6 | `transcription_worker`, `analysis_worker`, `notification_worker`, `archive_worker`, `pipeline_orchestrator`, `bid_provisioner` |
| New Python modules | 1 | `webhook_router.py` in FastAPI app |
| External dependencies | 3 | RabbitMQ (already installed), AWS S3, WhatsApp Business API |
| Mcube platform changes | 10 | Webhook delivery, HMAC signing, billing events, agent email in payload |
| Total activities | 58 | Across 6 phases |

---

*This document describes the target architecture only. No code changes have been made.*
*All existing scripts (`pipeline_6004.py`, `call_self_service.py`, `run_single_transcription.py`, etc.) remain in place and functional as the interim manual process during the transition period.*
