# Post-Call Analytics Platform — Architecture Overview

## What It Is & What It Should Do

The platform ingests recorded call audio from multiple call centers (businesses), transcribes speech to text, runs configurable AI agents against those transcripts, and surfaces results in a multi-tenant analytics dashboard. The goal is to process **thousands of minutes of audio per hour** across many businesses simultaneously, producing quality scores, BANT signals, sentiment, summaries, and CRM-enriched context — all queryable via a React UI and a RAG chat interface.

---

## Current Architecture

### Module Invocation Map

```mermaid
flowchart TD
    subgraph Sources["External Sources"]
        SRC_DB["Source DB\n(mcube_cl1 @ 10.40.180.35)"]
        LSQ["LeadSquared API\n(CRM leads)"]
        AUDIO["Audio Files\n(S3 / file URLs)"]
        SARVAM["Sarvam AI API\n(saaras:v2.5 + diarization)"]
        BEDROCK["AWS Bedrock\n(Nova lite/pro)"]
        OLLAMA["Ollama\n(local LLM)"]
        QDRANT["Qdrant\n(vector DB)"]
        REDIS["Redis\n(optional cache)"]
    end

    subgraph Pipeline["Pipeline Layer (runs as background process)"]
        CP["call_processor.py\nMulti-bid orchestrator\n--continuous --bid --stage"]
        P6004["pipeline_6004.py\nLegacy BID 6004\nhardcoded config"]
        SCL["sync_crm_leads.py\nLeadSquared lead sync\n--continuous"]
        AR["agent_runner.py\nAgentRunner class\nrenders prompts → LLM"]
        STT_F["stt/__init__.py\nget_stt_provider() factory"]
        STT_B["stt/base.py\nBaseSTT + STTResult"]
        STT_S["stt/sarvam.py\nSarvamSTT\nbatch upload → poll"]
    end

    subgraph Backend["Backend Layer (Flask, port 8002/8001)"]
        APP["dashboard-backend/app.py\n~40 REST endpoints\nmulti-tenant auth"]
        DH["db_handler.py\nDatabaseHandler\nall MySQL ops + encryption"]
        AUTH["auth_handler.py\nJWT + embed keys"]
        RAG["rag_handler.py\nRAGHandler\nembeddings + vector chat"]
        ANA["analytics.py\nAnalyticsService\naggregations + metrics"]
        QP["quality_parameters_handler.py\nscoring templates"]
        OBJ["objection_handler.py\nobjection classifications"]
        LSQS["leadsquared_service.py\nLeadSquared HTTP client"]
    end

    subgraph Legacy["Legacy Services"]
        DP["datapush/api_server.py\nFlask port 2000\n7417_callhistory → POST /update-call"]
        PCA["post call analysis/\nlegacy async pipeline\nport 4567"]
        RAG_MS["ai-call-quality-rag/\nFastAPI microservice\nport 4567 (RAG)"]
    end

    subgraph DB["MySQL (voicebot_cluster @ 10.0.0.109)"]
        CR["{bid}_call_records\npending→done"]
        BPC["business_pipeline_config"]
        BAC["business_agent_config"]
        SW["sync_watermarks"]
        CLA["crm_lead_activities\ncrm_leads_cache"]
        BUS["businesses\nbusiness_users\nuser_sessions"]
    end

    subgraph Frontend["Frontend (React/Vite, port 6174/6173)"]
        FE["dashboard-frontend/\nsrc/App.jsx\nsrc/components/Dashboard.jsx\nsrc/services/api.js"]
    end

    %% CRM sync flow
    LSQ -->|"Leads.Get AdvancedSearch"| SCL
    SCL -->|"upsert_crm_lead_activity\nset_sync_watermark"| DH
    DH --> CLA

    %% Pipeline Stage 1: SYNC
    SRC_DB -->|"SELECT ANSWER calls\nwatermark from sync_watermarks"| CP
    CP -->|"get_pipeline_config\nget_lead_phone_set"| DH
    DH --> BPC
    DH --> CLA
    CP -->|"upsert_call_record (pending)\nset_sync_watermark"| DH
    DH --> CR
    DH --> SW

    %% Pipeline Stage 2: TRANSCRIBE
    CP -->|"get_calls_to_transcribe"| DH
    CP -->|"get_stt_provider('sarvam')"| STT_F
    STT_F -->|"instantiates"| STT_S
    STT_S -->|"inherits"| STT_B
    AUDIO -->|"download audio bytes"| STT_S
    STT_S -->|"batch upload + poll"| SARVAM
    SARVAM -->|"diarized transcript\nspeaker_segments"| STT_S
    STT_S -->|"STTResult"| CP
    CP -->|"save_call_transcription\nstatus=transcribed"| DH
    DH --> CR

    %% Pipeline Stage 3: ANALYZE
    CP -->|"get_calls_to_analyze"| DH
    CP -->|"AgentRunner.run()"| AR
    AR -->|"get_agent_configs"| DH
    DH --> BAC
    AR -->|"Converse API\nmodel=nova-lite"| BEDROCK
    AR -->|"POST /api/chat\n(fallback)"| OLLAMA
    AR -->|"aggregated analysis JSON"| CP
    CP -->|"save_call_analysis\nstatus=done"| DH
    DH --> CR

    %% Legacy BID 6004
    SRC_DB -->|"hardcoded SELECT"| P6004
    P6004 -->|"direct DB writes"| DH

    %% Legacy datapush
    SRC_DB -->|"SELECT 7417_callhistory"| DP
    DP -->|"POST /update-call"| PCA

    %% Backend → DB
    APP -->|"all DB ops"| DH
    APP -->|"validateToken"| AUTH
    APP -->|"rag_search / rag_chat"| RAG
    APP -->|"getMetrics / aggregations"| ANA
    APP -->|"getScoringTemplate"| QP
    APP -->|"getObjections"| OBJ
    APP -->|"LSQ enrichment"| LSQS
    AUTH --> BUS
    RAG -->|"titan-embed-text-v2"| BEDROCK
    RAG -->|"upsert / search"| QDRANT
    RAG -->|"get/set"| REDIS
    RAG -->|"transcript storage"| DH

    %% Frontend → Backend
    FE -->|"axios / api.js\nGET /calls /call-records\nPOST /auth /rag/chat"| APP
```

---

## How Each Module Invokes the Others

| Caller | Calls | Via |
|--------|-------|-----|
| `call_processor.py` | `db_handler.DatabaseHandler` | Direct instantiation — reads `business_pipeline_config`, writes `{bid}_call_records`, `sync_watermarks` |
| `call_processor.py` | `stt/__init__.get_stt_provider()` | Factory call → returns `SarvamSTT` instance |
| `call_processor.py` | `agent_runner.AgentRunner.run()` | Direct method call after transcription completes |
| `agent_runner.py` | AWS Bedrock (`boto3`) | `client.converse()` — sends rendered prompt, receives JSON |
| `agent_runner.py` | Ollama | `requests.post()` to `localhost:11434` |
| `stt/sarvam.py` | Sarvam AI API | `requests.post()` — uploads audio, polls job until done |
| `sync_crm_leads.py` | `leadsquared_service.LeadSquaredService` | HTTP calls to LeadSquared `Leads.Get` + activities endpoints |
| `sync_crm_leads.py` | `db_handler.DatabaseHandler` | Upserts leads, sets watermarks |
| `app.py` | `db_handler.DatabaseHandler` | All reads/writes for every API endpoint |
| `app.py` | `rag_handler.RAGHandler` | On `/rag/search` and `/rag/chat` routes |
| `app.py` | `auth_handler.AuthHandler` | On every protected route — JWT validation |
| `rag_handler.py` | Bedrock (`boto3`) | Embeddings (`titan-embed-text-v2`) + chat (`nova-lite`) |
| `rag_handler.py` | Qdrant (`qdrant_client`) | Vector upsert + similarity search, one collection per bid |
| `datapush/api_server.py` | Legacy `post call analysis` service | `POST /update-call` — fire-and-forget HTTP |
| `dashboard-frontend` | `app.py` | Axios calls in `src/services/api.js` to all REST endpoints |

---

## Current Data Flow (End-to-End)

```
Source DB (call records)
        │
        ▼  [call_processor.py Stage 1 — SYNC]
        Filter by CRM phone list (from crm_leads_cache)
        Watermark via sync_watermarks
        INSERT into {bid}_call_records (status=pending)
        │
        ▼  [call_processor.py Stage 2 — TRANSCRIBE]
        Download audio from file_url
        SarvamSTT: upload → batch job → poll → parse diarized output
        SAVE transcript + speaker_segments → status=transcribed
        │
        ▼  [call_processor.py Stage 3 — ANALYZE]
        Load agent configs from business_agent_config
        AgentRunner: render prompt → Bedrock / Ollama → parse JSON
        SAVE analysis blob → status=done
        │
        ▼  [app.py REST API]
        GET /call-records/{bid} → paginated list
        GET /call-records/{bid}/{callid} → full detail
        GET /pipeline/{bid}/status → watermarks + status counts
        │
        ▼  [dashboard-frontend]
        Dashboard.jsx renders calls, scores, sentiment, BANT
        RAG chat widget queries /rag/{bid}/chat
```

---

## What Is Missing / Broken Today

- **No queue** — `call_processor.py` runs a polling loop with `time.sleep()`. If it crashes, work is lost. No retry, no dead-letter, no backpressure.
- **Single-process, single-threaded per stage** — one worker processes calls one at a time per stage. Cannot scale to thousands of minutes/hour.
- **Transcribe batch = 1** — each audio file is a blocking sequential call to Sarvam. A 5-minute call takes ~30s; 1000 calls = ~8 hours.
- **Analyze batch = 1** — same problem with LLM calls.
- **`pipeline_6004.py` is hardcoded** — duplicates logic from `call_processor.py`, will diverge.
- **`datapush/`** uses hardcoded credentials, pushes to a legacy service, and is disconnected from the new pipeline.
- **`ai-call-quality-rag/`** is a separate unintegrated microservice running its own stack.
- **No observability** — no metrics, no dead-letter queues, no alert on stalled pipeline.
- **Lead filter blocks everything** — if LSQ has 0 leads (BID 6004 case), zero calls are ever synced.

---

## Production Target Architecture

> Handle thousands of minutes of audio per hour across multiple businesses with fault tolerance, horizontal scale, and pluggable providers.

```mermaid
flowchart TD
    subgraph Ingestion["Ingestion Layer"]
        SRC["Source DBs\n(per-bid config)"]
        CRM["CRM APIs\n(LeadSquared, Salesforce, etc.)"]
        WEBHOOK["Webhooks / Push APIs\n(future: real-time delivery)"]
    end

    subgraph Queue["Message Queue (RabbitMQ or Kafka)"]
        Q_SYNC["sync.{bid} queue\nnew call records"]
        Q_STT["transcribe.{bid} queue\npending audio URLs"]
        Q_ANALYZE["analyze.{bid} queue\nfinished transcripts"]
        Q_NOTIFY["notify.{bid} queue\ncompleted results"]
        DLQ["Dead Letter Queues\nfailed jobs with retry metadata"]
    end

    subgraph Workers["Worker Pool (horizontally scalable)"]
        W_SYNC["Sync Workers\ncall_processor.stage_sync()\nN processes per bid"]
        W_STT["STT Workers\ncall_processor.stage_transcribe()\nN processes, batched"]
        W_ANALYZE["Analyze Workers\ncall_processor.stage_analyze()\nN processes, batched"]
        W_NOTIFY["Notify Workers\nwebhook / email / CRM write-back"]
    end

    subgraph STT_Providers["STT Provider Pool (stt/)"]
        SARVAM["SarvamSTT\n(saaras:v2.5)"]
        DEEPGRAM["DeepgramSTT\n(nova-3)"]
        WHISPER["WhisperSTT\n(self-hosted)"]
    end

    subgraph LLM_Providers["LLM Provider Pool (agent_runner.py)"]
        BEDROCK["AWS Bedrock\n(Nova pro/lite)"]
        OLLAMA["Ollama\n(local GPU)"]
        OPENAI["OpenAI-compatible\n(future)"]
    end

    subgraph Backend["API Layer (app.py — Flask or FastAPI)"]
        REST["REST API\ncall-records, pipeline, agents\nauth, dashboard, rag"]
        WS["WebSocket / SSE\nreal-time status push"]
        AUTH2["auth_handler.py\nJWT + API keys"]
    end

    subgraph Storage["Storage Layer"]
        MYSQL["MySQL (voicebot_cluster)\nbusiness_pipeline_config\nbusiness_agent_config\n{bid}_call_records\nsync_watermarks"]
        S3["Object Storage (S3)\naudio archive\ntranscript JSON"]
        QDRANT2["Qdrant\nvector collections per bid"]
        REDIS2["Redis\nRAG session cache\nrate limiting\njob status"]
    end

    subgraph Observability["Observability"]
        PROM["Prometheus\njob counters, queue depth"]
        GRAF["Grafana\ndashboards + alerts"]
        LOGS["Structured Logs\n(JSON → ELK or CloudWatch)"]
    end

    subgraph Frontend2["Frontend (dashboard-frontend/)"]
        FE2["React Dashboard\nCall list, scores, sentiment\nRAG chat, pipeline status"]
    end

    %% Ingestion → Queue
    SRC -->|"watermarked poll\nor CDC (Debezium)"| W_SYNC
    CRM -->|"sync_crm_leads.py\ncontinuous"| W_SYNC
    WEBHOOK -->|"push event"| Q_SYNC

    W_SYNC -->|"upsert call_records(pending)"| MYSQL
    W_SYNC -->|"publish callid + audio_url"| Q_STT
    Q_SYNC --> W_SYNC

    %% STT stage
    Q_STT --> W_STT
    W_STT -->|"get_stt_provider()"| SARVAM
    W_STT -->|"get_stt_provider()"| DEEPGRAM
    W_STT -->|"get_stt_provider()"| WHISPER
    W_STT -->|"save transcript\nstatus=transcribed"| MYSQL
    W_STT -->|"store audio + transcript"| S3
    W_STT -->|"publish callid"| Q_ANALYZE
    W_STT -->|"on failure"| DLQ

    %% Analyze stage
    Q_ANALYZE --> W_ANALYZE
    W_ANALYZE -->|"AgentRunner.run()"| BEDROCK
    W_ANALYZE -->|"AgentRunner.run()"| OLLAMA
    W_ANALYZE -->|"AgentRunner.run()"| OPENAI
    W_ANALYZE -->|"save analysis\nstatus=done"| MYSQL
    W_ANALYZE -->|"upsert embeddings"| QDRANT2
    W_ANALYZE -->|"publish result"| Q_NOTIFY
    W_ANALYZE -->|"on failure"| DLQ

    %% Notify
    Q_NOTIFY --> W_NOTIFY
    W_NOTIFY -->|"CRM write-back\nwebhook\nemail"| CRM

    %% API layer
    REST -->|"db_handler.py"| MYSQL
    REST -->|"rag_handler.py"| QDRANT2
    REST -->|"rag_handler.py"| REDIS2
    REST --> AUTH2
    WS -->|"subscribe call status"| REDIS2

    %% Frontend
    FE2 -->|"api.js axios"| REST
    FE2 -->|"SSE / WebSocket"| WS

    %% Observability
    W_SYNC & W_STT & W_ANALYZE -->|"metrics"| PROM
    PROM --> GRAF
    DLQ -->|"alert"| GRAF
```

---

## What Changes and Why

### 1. Replace polling loops with a message queue

**Current:** `call_processor.py` polls MySQL in a `while True` loop with `time.sleep()`.
**Target:** Each stage publishes a message when it finishes. Workers consume from `transcribe.{bid}` and `analyze.{bid}` queues (RabbitMQ already in the stack via `pika`). Work is never lost — queue holds state.

### 2. Make workers stateless and horizontally scalable

**Current:** One process runs all three stages sequentially for each bid.
**Target:** Separate worker processes for SYNC, STT, ANALYZE. Each reads config from DB — no hardcoded BIDs. Spin up 10 STT workers per bid when queue depth exceeds threshold. Use `call_processor.py` stage functions as the task body, wrapped in a queue consumer.

### 3. Batch STT calls

**Current:** `TRANSCRIBE_BATCH=1` — one audio file processed at a time.
**Target:** STT workers pull N audio URLs, upload concurrently (async or thread pool), collect results. Sarvam supports concurrent jobs. At 10 parallel jobs, a 5-min audio at 30s each → 60 calls processed per minute per worker → 300 calls/min with 5 workers.

### 4. Batch and cache LLM analysis

**Current:** `ANALYZE_BATCH=1` — one Bedrock call per call record.
**Target:** Analyze workers pull N transcripts, send concurrent Bedrock requests. Add Redis cache keyed on `hash(transcript + agent_prompt)` for near-duplicate calls (common in scripted sales).

### 5. Unify `pipeline_6004.py` into `call_processor.py`

Delete `pipeline_6004.py`. All BID-specific config lives in `business_pipeline_config`. Add a seed script or API endpoint to register new bids.

### 6. Upgrade `datapush/` to write to `{bid}_call_records`

Remove the hop through the legacy `post call analysis` service. `datapush/api_server.py` should write directly to `{bid}_call_records` with `status=pending` and publish to the STT queue.

### 7. Integrate `ai-call-quality-rag/` or retire it

`rag_handler.py` inside `dashboard-backend/` already covers the same function. Either merge the `ai-call-quality-rag/` FastAPI service's unique logic into `rag_handler.py`, or make it a proper internal microservice with a documented API contract — not a parallel duplicate.

### 8. Real-time status in the frontend

**Current:** Frontend polls `/pipeline/{bid}/status`.
**Target:** Backend publishes job status events to Redis Pub/Sub. API layer streams updates via SSE (`/call-records/{bid}/stream`). Frontend subscribes and updates the call list in real time without polling.

### 9. Observability

Add Prometheus counters in each worker stage: `calls_synced_total`, `calls_transcribed_total`, `calls_analyzed_total`, `queue_depth`, `stt_duration_seconds`, `llm_duration_seconds`. Wire a Grafana dashboard. Alert on DLQ depth > 0 and queue lag > 500.

---

## File-Level Change Summary

| File / Folder | Current Role | Action |
|---------------|-------------|--------|
| `call_processor.py` | Polling pipeline orchestrator | Keep stage functions, wrap in queue consumer |
| `pipeline_6004.py` | Hardcoded BID 6004 pipeline | Delete — config lives in DB |
| `agent_runner.py` | LLM agent runner | Keep — add concurrency / batch support |
| `stt/__init__.py` + `stt/*.py` | STT factory + Sarvam impl | Keep — add `DeepgramSTT`, `WhisperSTT` |
| `sync_crm_leads.py` | LSQ lead sync | Keep — run as independent process |
| `dashboard-backend/app.py` | Flask REST API | Keep — add SSE endpoint, wire `AgentRunner` for on-demand re-analysis |
| `dashboard-backend/db_handler.py` | All MySQL ops | Keep — add queue-depth query |
| `dashboard-backend/rag_handler.py` | RAG + Qdrant | Keep — replace `ai-call-quality-rag/` |
| `datapush/api_server.py` | Legacy HTTP relay | Rewrite to write directly to `{bid}_call_records` + publish to STT queue |
| `datapush/transfer_data.py` | CLI runner | Keep as a one-shot backfill tool |
| `ai-call-quality-rag/` | Duplicate RAG microservice | Merge into `rag_handler.py` or deprecate |
| `dashboard-frontend/` | React UI | Add real-time status, pipeline config UI, agent builder |
| `workers/` _(new)_ | Queue consumers | New: `sync_worker.py`, `stt_worker.py`, `analyze_worker.py` |
| `infra/` _(new)_ | Docker Compose / Helm | New: RabbitMQ, Redis, Prometheus, Grafana, Qdrant configs |
