# Audio Transcription Modules — Reference Guide

> All files related to converting audio call recordings into transcripts, across two pipelines:
> **`dashboard-backend/`** (current/primary) and **`post call analysis/`** (older/alternate).

---

## Table of Contents

1. [Architecture Overview](#1-architecture-overview)
2. [Invocation Flow Diagrams](#2-invocation-flow-diagrams)
3. [Module Reference — dashboard-backend](#3-module-reference--dashboard-backend)
   - [Entry Points (Runners)](#31-entry-points--runners)
   - [STT Provider Abstraction Layer](#32-stt-provider-abstraction-layer-stt)
   - [Transcription Services](#33-transcription-services)
   - [Post-Processing Utilities](#34-post-processing-utilities)
   - [Queue Infrastructure](#35-queue-infrastructure)
   - [Batch / One-off Scripts](#36-batch--one-off-scripts)
4. [Module Reference — post call analysis](#4-module-reference--post-call-analysis-older-pipeline)
5. [STT Providers Quick Reference](#5-stt-providers-quick-reference)

---

## 1. Architecture Overview

```
AUDIO RECORDING URL
        │
        ▼
┌────────────────────┐          ┌─────────────────────────────────────┐
│  call_processor.py │ ───────► │  stt/__init__.py  (Provider Factory) │
│  (primary runner)  │          │  → SarvamSTT / DeepgramTranscriber  │
└────────────────────┘          └─────────────────────────────────────┘
        │                                          │
        │  OR (legacy batch)                       │
        ▼                                          ▼
┌───────────────────────┐        ┌────────────────────────────────────────┐
│ batch_process_calls.py│        │  unified_transcription_service.py      │
│ (Jubilant Foods only) │        │  (multi-provider with quality checks)  │
└───────────────────────┘        └────────────────────────────────────────┘
                                                   │
           ┌───────────────────────────────────────┘
           │
           ▼
┌───────────────────────────────────────────────────────────────────┐
│   STT API Call (Deepgram  OR  Sarvam AI)                          │
│                                                                   │
│   Deepgram: POST https://api.deepgram.com/v1/listen               │
│   Sarvam:   speech_to_text_translate_job (batch, async)           │
└───────────────────────────────────────────────────────────────────┘
           │
           ▼
┌──────────────────────────────────────────┐
│   speaker_splitting.py                   │
│   (fallback diarization if < 2 speakers) │
└──────────────────────────────────────────┘
           │
           ▼
┌──────────────────────────────────────────┐
│   Claude (Anthropic) — translate_and_cleanup │
│   (translate Hindi/mixed → English,      │
│    fix grammar, preserve speaker labels) │
└──────────────────────────────────────────┘
           │
           ▼
┌──────────────────────────────────────────┐
│   clean_transcript_formatter.py          │
│   (merge segments, map roles, clean text)│
└──────────────────────────────────────────┘
           │
           ▼
    DB: {bid}_call_records / sarvamresponse
```

---

## 2. Invocation Flow Diagrams

### 2a. Primary Pipeline — `call_processor.py`

```
main()
  └─► _build_config()
  └─► DatabaseHandler(config)
  └─► AgentRunner(config)
  └─► [for each active bid]:
        run_bid(bid, db, agent_runner, stage_filter)
          ├─► stage_sync(bid, db, pipeline_cfg)
          │     Pulls ANSWER calls from source DB → upserts into {bid}_call_records
          │
          ├─► stage_transcribe(bid, db, pipeline_cfg)
          │     └─► get_stt_provider(provider_name, api_key)   [stt/__init__.py]
          │            └─► SarvamSTT(api_key)                   [stt/sarvam.py]
          │                  └─► SarvamSTT.transcribe(audio_url, callid)
          │                        1. Download audio to /tmp/*.wav
          │                        2. client.speech_to_text_translate_job.create_job()
          │                        3. job.upload_files([tmp_path])  (retry up to 5×)
          │                        4. job.start()
          │                        5. job.wait_until_complete()
          │                        6. get_download_links() → download result JSON
          │                        7. _parse_sarvam_result(json) → STTResult
          │                  └─► Updates {bid}_call_records (transcript, stt_provider, …)
          │
          └─► stage_analyze(bid, db, agent_runner)
                └─► AgentRunner.run(transcript, …) → analysis saved to DB
```

### 2b. Unified/Multi-Provider Flow — `unified_transcription_service.py`

```
UnifiedTranscriptionService(deepgram_key, sarvam_key, anthropic_key)
  └─► transcribe_with_retry(audio_url, expected_duration, force_provider)
        1. LanguageDetector.detect_language_from_text()  ──► 'hi-en' / 'kannada' / etc.
        2. Choose provider:
             LanguageDetector.should_use_deepgram(lang) → DeepgramTranscriber
             LanguageDetector.should_use_sarvam(lang)   → SarvamTranscriber
        3. provider.transcribe_audio(audio_url)
        4. TranscriptQualityValidator.validate_transcript(result, audio_url)
             ├─► _get_audio_duration(audio_url)
             └─► _is_repetitive(text)
        5. If quality FAIL → retry with ALTERNATE provider
        6. DeepgramTranscriber.translate_and_cleanup(raw_transcript)
             └─► Claude API (claude-3-5-haiku) → cleaned/translated text
        7. _format_speaker_segments(utterances)
        8. _extract_translated_segments(cleaned_transcript, raw_segments)
             └─► fallback: _fallback_segment_mapping()

  └─► process_call_with_quality_check(audio_url, expected_duration)
        Thin wrapper over transcribe_with_retry() with error handling
```

### 2c. Deepgram-Specific Flow — `deepgram_transcription.py`

```
DeepgramTranscriber.process_call(audio_url)
  ├─► transcribe_audio(audio_url)
  │     POST https://api.deepgram.com/v1/listen
  │       model=nova-2, language=hi, diarize=true,
  │       min/max_speakers=2, utterances=true
  │     If diarization weak (< 2 speakers):
  │       └─► smart_speaker_split(utterances)   [deepgram_transcription.py]
  │             Uses pause gaps to alternate Speaker 0 ↔ Speaker 1
  │
  ├─► translate_and_cleanup(raw_transcript)
  │     detect_language(text)  → 'hi' / 'en' / 'mixed'
  │     Claude API prompt → English-translated, grammar-fixed transcript
  │
  └─► Returns: { transcript, raw_transcript, speaker_segments,
                 duration, num_speakers, stt_provider, language_detected }
```

### 2d. Legacy Queue Pipeline — `post call analysis/`

```
[Cron / External trigger]
  └─► audio_job_consumer.py  ──► RabbitMQ (queue: audio_jobs)
        └─► callback(ch, method, properties, body)
              parse message: { callid, bid }
              └─► sarvam_processor.process_call_id(call_id, bid)
                    1. Fetch audio_url from DB for call_id
                    2. send_to_sarvam(bid)
                         init_sarvam_job()         → create Sarvam job
                         upload audio file
                         start_sarvam_job(job_id)  → start job
                         poll_sarvam_status(job_id, output_url)
                           polls every N seconds until done
                    3. parse_diarized_transcript(result)
                         → { full_transcript, speaker_segments,
                             num_speakers, duration }
                    4. Save transcript to DB
```

---

## 3. Module Reference — `dashboard-backend/`

### 3.1 Entry Points / Runners

---

#### `call_processor.py` ⭐ Primary Production Runner

Multi-bid, DB-driven pipeline. Replaces hardcoded `pipeline_6004.py`.

| Function | Signature | Description |
|---|---|---|
| `main` | `main()` | Parses CLI args, sets up DB + AgentRunner, loops over active bids |
| `_build_config` | `_build_config()` | Constructs runtime config from env/CLI flags |
| `run_bid` | `run_bid(bid, db, agent_runner, stage_filter=None)` | Runs full pipeline (sync → transcribe → analyze) for one bid; returns `{synced, transcribed, analyzed}` |
| `stage_sync` | `stage_sync(bid, db, pipeline_cfg)` | Pulls ANSWER calls from source DB and upserts into `{bid}_call_records` |
| `stage_transcribe` | `stage_transcribe(bid, db, pipeline_cfg)` | Fetches pending calls, calls STT provider, saves transcript to `{bid}_call_records` |
| `stage_analyze` | `stage_analyze(bid, db, agent_runner)` | Runs AI agents on transcribed calls, saves analysis |
| `_decrypt_stt_key` | `_decrypt_stt_key(db, cfg)` | Decrypts STT API key from pipeline config or falls back to env |
| `_decrypt_source_password` | `_decrypt_source_password(db, cfg)` | Decrypts source DB password |
| `_open_source_conn` | `_open_source_conn(cfg)` | Opens MySQL connection to source (raw calls) database |
| `_detect_source_table` | `_detect_source_table(conn, bid)` | Auto-detects the raw_calls table name for a given bid |
| `_detect_source_columns` | `_detect_source_columns(conn, table)` | Introspects available columns in the source table |
| `_phone_variants` | `_phone_variants(phone)` | Generates phone number variants (10-digit, with country code, etc.) |
| `_pick_customer_phone` | `_pick_customer_phone(call)` | Picks the best customer phone field from a call record |
| `_handle_signal` | `_handle_signal(sig, frame)` | SIGTERM/SIGINT handler for graceful shutdown |

---

#### `batch_process_calls.py` — Batch Runner (Jubilant Foods)

Processes unprocessed calls in continuous batches of 20.

| Function | Signature | Description |
|---|---|---|
| `BatchCallProcessor.__init__` | `__init__(self, bid, batch_size=20)` | Sets up DB config and transcription service |
| `get_connection` | `get_connection(self)` | Returns a MySQL connection |
| `get_calls_needing_transcription` | `get_calls_needing_transcription(self, limit=None)` | Queries `raw_calls` for calls without a transcript |
| `get_calls_needing_analysis` | `get_calls_needing_analysis(self, limit=None)` | Queries for transcribed calls missing analytics |
| `mark_transcription_requested` | `mark_transcription_requested(self, callids)` | Marks calls as transcription-in-progress in DB |
| `request_transcription` | `request_transcription(self, calls)` | Submits calls to transcription service |
| `analyze_calls` | `analyze_calls(self, calls)` | Submits transcribed calls to AI analysis |
| `get_processing_stats` | `get_processing_stats(self)` | Returns counts: pending/transcribed/analyzed |
| `run_single_batch` | `run_single_batch(self)` | Runs one batch cycle (transcription + analysis) |
| `run_continuous` | `run_continuous(self, interval=60)` | Loops `run_single_batch()` every N seconds |
| `main` | `main()` | CLI entry point |

---

### 3.2 STT Provider Abstraction Layer (`stt/`)

---

#### `stt/__init__.py` — Provider Factory

| Function | Signature | Description |
|---|---|---|
| `get_stt_provider` | `get_stt_provider(provider: str, api_key: str) → BaseSTT` | Instantiates and returns the named STT provider (currently: `"sarvam"`). Raises `ValueError` for unknown providers. |
| `_register` | `_register(name: str)` | Decorator that registers a provider class in the `_REGISTRY` dict |

---

#### `stt/base.py` — Abstract Base Classes

| Class / Function | Description |
|---|---|
| `STTResult` (dataclass) | Normalised output from any STT provider. Fields: `transcript: str`, `speaker_segments: List[Dict]`, `duration: float`, `provider: str` |
| `STTResult.to_dict` | Returns a plain dict representation |
| `BaseSTT` (ABC) | Abstract base class all providers must implement |
| `BaseSTT.provider_name` (property) | Short string identifier stored in `stt_provider` DB column |
| `BaseSTT.transcribe` | `transcribe(audio_url, callid) → STTResult` — abstract; raises `RuntimeError` on failure |

---

#### `stt/sarvam.py` — Sarvam AI Provider

Uses Sarvam `saaras:v2.5` batch transcription+translation API with speaker diarisation.

| Function | Signature | Description |
|---|---|---|
| `_parse_sarvam_result` | `_parse_sarvam_result(result: Dict) → STTResult` | Converts raw Sarvam JSON to normalised `STTResult`; maps `speaker_0` → Agent, others → Customer |
| `SarvamSTT.__init__` | `__init__(self, api_key: str)` | Validates and stores the Sarvam subscription key |
| `SarvamSTT.provider_name` | property → `"sarvam"` | Provider identifier |
| `SarvamSTT.transcribe` | `transcribe(self, audio_url, callid) → STTResult` | Full flow: download audio → create job → upload (retry ×5) → start → wait → download result → parse |

**Internal steps in `SarvamSTT.transcribe`:**
1. Download audio URL → `/tmp/*.wav`
2. `client.speech_to_text_translate_job.create_job(model="saaras:v2.5", with_diarization=True, num_speakers=2)`
3. `job.upload_files([tmp_path])` — retries up to 5× on Azure 403 bug
4. `job.start()`
5. `job.wait_until_complete(poll_interval=3s, timeout=300s)`
6. `client.speech_to_text_translate_job.get_download_links(job_id, files=[output_file])`
7. Download result JSON → `_parse_sarvam_result(json)`

---

### 3.3 Transcription Services

---

#### `unified_transcription_service.py` — Multi-Provider Unified Service

Supports Deepgram (Hindi-English) and Sarvam (Indian languages). Includes language detection, automatic provider selection, quality validation, and retry with alternate provider.

**Classes:**

| Class | Description |
|---|---|
| `TranscriptQualityValidator` | Validates transcript quality; detects bad transcriptions |
| `LanguageDetector` | Detects language from text to route to correct STT service |
| `SarvamTranscriber` | Sarvam wrapper with same interface as `DeepgramTranscriber` |
| `UnifiedTranscriptionService` | Orchestrator: detects language, picks provider, validates quality, retries |

**`TranscriptQualityValidator` Functions:**

| Function | Description |
|---|---|
| `validate_transcript(transcript_data, audio_url, expected_duration)` | Returns `(is_valid, [issues])`. Checks duration match, repetition, word count |
| `_get_audio_duration(audio_url)` | Downloads audio and returns duration using `librosa` |
| `_is_repetitive(text, threshold=0.4)` | Returns `True` if text has excessive token repetition |

**`LanguageDetector` Functions:**

| Function | Description |
|---|---|
| `detect_language_from_text(text)` | Returns `'hi-en'`, `'kannada'`, `'tamil'`, `'telugu'`, `'en'`, etc. by scanning Unicode ranges |
| `should_use_deepgram(language)` | Returns `True` for Hindi/English mix (`'hi-en'`, `'hi'`, `'en'`) |
| `should_use_sarvam(language)` | Returns `True` for Kannada, Tamil, Telugu, and other Indian languages |

**`SarvamTranscriber` Functions:**

| Function | Description |
|---|---|
| `__init__(self, sarvam_api_key)` | Inits Sarvam client; creates temp audio dir |
| `transcribe_audio(self, audio_url)` | Downloads audio, submits to Sarvam job API, returns raw JSON result |

**`UnifiedTranscriptionService` Functions:**

| Function | Description |
|---|---|
| `__init__(self, deepgram_api_key, sarvam_api_key, anthropic_api_key)` | Initialises all three sub-clients |
| `transcribe_with_retry(self, audio_url, expected_duration, force_provider)` | Main entry: detect language → pick provider → transcribe → validate quality → retry with alternate if needed → clean/translate via Claude |
| `_format_speaker_segments(self, utterances)` | Converts raw utterances to unified `{speaker, text, start_time, end_time}` format |
| `_extract_translated_segments(self, cleaned_transcript, raw_segments)` | Parses Claude's translated output and re-aligns it with original timing data |
| `_fallback_segment_mapping(self, cleaned_transcript, raw_segments)` | Fallback: uses raw timing but text from cleaned transcript in order |
| `process_call_with_quality_check(self, audio_url, expected_duration)` | Thin wrapper over `transcribe_with_retry` with error handling; returns complete result or `None` |

---

#### `deepgram_transcription.py` — Deepgram Provider

Deepgram Nova-2 STT with deterministic diarization and Claude cleanup.

| Function | Signature | Description |
|---|---|---|
| `smart_speaker_split` | `smart_speaker_split(utterances)` | Deterministic 2-speaker fallback diarization using pause duration + utterance length. Switches speaker on gap ≥ 0.8s or gap ≥ 0.4s with ≥ 3 words |
| `DeepgramTranscriber.__init__` | `__init__(deepgram_api_key, anthropic_api_key)` | Creates Deepgram + Anthropic clients |
| `DeepgramTranscriber.transcribe_audio` | `transcribe_audio(audio_url) → Dict` | POST to Deepgram API (`nova-2`, Hindi, diarize ×2); validates speaker count; applies `smart_speaker_split` if weak; returns `{raw_transcript, utterances, duration, num_speakers, language}` |
| `DeepgramTranscriber.detect_language` | `detect_language(text) → str` | Static. Returns `'hi'`, `'en'`, or `'mixed'` based on Unicode char analysis |
| `DeepgramTranscriber.translate_and_cleanup` | `translate_and_cleanup(transcript) → str` | Sends raw transcript to Claude (`claude-3-5-haiku-20241022`) with strict rules: translate Hindi→English, fix grammar, preserve speaker labels |
| `DeepgramTranscriber.process_call` | `process_call(audio_url) → Dict` | Orchestrates: `transcribe_audio` → `translate_and_cleanup` → assembles final result dict |
| `test_transcription` | `test_transcription(audio_url)` | CLI test runner |

---

### 3.4 Post-Processing Utilities

---

#### `speaker_splitting.py` — Fallback Diarization

Used when the STT provider returns only a single speaker for the entire call.

| Function | Signature | Description |
|---|---|---|
| `split_into_speakers` | `split_into_speakers(utterances, min_silence_gap=2.0) → List[Dict]` | Alternates speaker assignment on silence gaps ≥ `min_silence_gap` seconds |
| `smart_speaker_split` | `smart_speaker_split(utterances, num_utterances_threshold=3) → List[Dict]` | Computes dynamic silence threshold (`max(1.5, avg_gap × 1.5)`) then calls `split_into_speakers`. Returns unchanged if already multi-speaker |

---

#### `clean_transcript_formatter.py` — Transcript Post-Processing

| Function | Signature | Description |
|---|---|---|
| `clean_text` | `clean_text(text) → str` | Removes fillers (yeah/uh/um), fixes spacing, applies domain-specific text replacements |
| `map_speaker` | `map_speaker(speaker_label) → str` | Converts `"Speaker 1"` / `"Speaker 2"` → `"Agent"` / `"Customer"` |
| `merge_segments` | `merge_segments(segments) → List[Dict]` | Merges consecutive segments from the same speaker into one; applies `clean_text` and `map_speaker` |
| `build_clean_transcript` | `build_clean_transcript(segments) → str` | Formats merged segments as `Agent: <text>\n\nCustomer: <text>…` |
| `format_output` | `format_output(raw_transcript, speaker_segments_json) → Dict` | Main entry: parses segments JSON → `merge_segments` → `build_clean_transcript` → returns `{clean_transcript, clean_speaker_diarization}` |

---

### 3.5 Queue Infrastructure

---

#### `call_queue_producer.py` — RabbitMQ Producer

Pushes answered calls (with transcripts) onto the `call_processing_queue` for downstream analysis.

| Function | Signature | Description |
|---|---|---|
| `CallQueueProducer.__init__` | `__init__(self)` | Connects to RabbitMQ (`localhost`), declares durable queue |
| `get_unprocessed_calls` | `get_unprocessed_calls(self, limit=100)` | Queries DB for calls with transcript but no analytics entry |
| `queue_call` | `queue_call(self, call)` | Publishes JSON message `{callid, groupname, agentname, call_starttime, queued_at}` to RabbitMQ with persistent delivery |
| `run` | `run(self)` | Fetches unprocessed calls → queues each → logs results |

---

### 3.6 Batch / One-off Scripts

| File | Purpose |
|---|---|
| `comprehensive_transcribe.py` | Full-pass transcription of all calls |
| `comprehensive_transcribe_v2.py` | Updated version of the above |
| `direct_transcribe_pending.py` | Directly transcribes calls still in `pending` state |
| `process_raw_calls.py` | Processes raw call audio files |
| `process_all_answered_calls.py` | Processes all ANSWER-status calls |
| `process_all_calls_comprehensive.py` | Full pipeline for all calls |
| `process_calls_unified.py` | Unified processing across both STT providers |
| `process_50_calls.py` | One-shot: process exactly 50 calls |
| `reprocess_calls.py` | Re-transcribes already-processed calls |
| `retranslate_segments.py` | Re-translates/re-transcribes specific segments only |
| `pipeline_6004.py` | Hardcoded pipeline for bid 6004 (deprecated, replaced by `call_processor.py`) |
| `analyze_all_calls.py` | Post-transcription: analyzes quality of transcription results |
| `debug_deepgram.py` | Debug/test harness for Deepgram transcription |
| `test_with_timestamps.py` | Tests transcription output with timestamp validation |
| `backfill_rag_transcripts.py` | Backfills RAG (retrieval-augmented generation) index with transcripts |

---

## 4. Module Reference — `post call analysis/` (Older Pipeline)

---

#### `audio_job_consumer.py` — RabbitMQ Consumer

| Function | Description |
|---|---|
| `callback(ch, method, properties, body)` | Receives a `{callid, bid}` JSON message from RabbitMQ queue `audio_jobs`; calls `process_call_id(call_id, bid)`; ACKs the message |
| `start_consumer()` | Connects to RabbitMQ (`10.0.0.109`, vhost `vGadmin`), declares queue, starts `basic_consume` loop |

---

#### `sarvam_processor.py` — Sarvam API Integration (Older)

| Function | Description |
|---|---|
| `parse_diarized_transcript(result)` | Parses Sarvam API response; returns `{full_transcript, speaker_segments, num_speakers, duration}` |
| `send_to_sarvam(bid=None)` | Fetches audio URLs from DB, submits each to Sarvam, saves result |
| `init_sarvam_job()` | Creates a new Sarvam transcription job |
| `start_sarvam_job(job_id)` | Starts a previously created job |
| `poll_sarvam_status(job_id, output_url)` | Polls job status until complete; downloads and returns result JSON |
| `process_call_id(call_id, bid)` | Full flow for one call: fetch audio URL from DB → `send_to_sarvam` → `parse_diarized_transcript` → save to DB. Called by `audio_job_consumer.py` |

---

## 5. STT Providers Quick Reference

| | **Deepgram** | **Sarvam AI** |
|---|---|---|
| **Best For** | Hindi / Hindi-English mixed calls | Kannada, Tamil, Telugu, other Indian languages |
| **Model** | Nova-2 | saaras:v2.5 |
| **Mode** | Synchronous REST API | Async batch job (poll until complete) |
| **Diarization** | Built-in (min/max 2 speakers) | Built-in (`with_diarization=True, num_speakers=2`) |
| **Fallback Diarization** | `smart_speaker_split()` | N/A |
| **Translation** | Via Claude (anthropic) post-processing | Built-in (translates to English) |
| **Primary Module** | `deepgram_transcription.py` | `stt/sarvam.py` |
| **Output Format** | `STTResult` / dict with `speaker_segments` | `STTResult` with `speaker_segments` + roles |
| **Speaker Roles** | Speaker 1, Speaker 2 | speaker_0 → Agent, speaker_N → Customer |
| **Timeout** | 300s per call | 300s per job |
