# Call Processing Pipeline Documentation

## Overview
This document explains the end-to-end data flow for call ingestion, transcription, and analytics. It follows a recorded call from the Mcube platform into our local database, through transcription by our worker and analysis by the language model, to its display in the UI Dashboard.

## Step-by-Step Data Flow

### 1. Call Recording in Mcube (Source DB)
Calls are initiated or received on the Mcube platform. Once completed, they are logged in the Mcube source database table `{bid}_callhistory`, along with their audio recording URL (`filename`), call duration, statuses, and agent routing details.

### 2. Orchestrator Loop Ingestion `status = 0`
A continuous background bash script (`orchestrator_loop.sh`) runs the `orchestrate_pipeline.py` Python job every 5 minutes.
*   **Watermark Logic**: The Python script queries the local MySQL database table `{bid}_raw_calls` to find the highest `call_starttime`. If no calls exist, it defaults to the start of the current day.
*   **Data Extraction**: The job fetches newly completed calls from the Mcube source database `{bid}_callhistory` that occurred *after* the watermark time and have a dialed status of "ANSWER".
*   **Local Storage**: The newly fetched calls are inserted into our local database table `{bid}_raw_calls` with `status = 0` (Ingested).
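The watermark and extraction logic above can be sketched as follows. This is a minimal illustration, not the code in `orchestrate_pipeline.py`: it uses in-memory `sqlite3` connections as stand-ins for the Mcube source database and the local MySQL database, and it assumes hypothetical column names (`callid`, `dialstatus`, and a `call_starttime` column on `{bid}_callhistory`) and timestamps stored as `YYYY-MM-DD HH:MM:SS` strings.

```python
import sqlite3
from datetime import datetime, time


def get_watermark(local: sqlite3.Connection, bid: str, now: datetime) -> str:
    """Return the highest call_starttime in {bid}_raw_calls, or today's midnight."""
    row = local.execute(f'SELECT MAX(call_starttime) FROM "{bid}_raw_calls"').fetchone()
    if row[0] is not None:
        return row[0]
    # No calls ingested yet: fall back to the start of the current day.
    return datetime.combine(now.date(), time.min).strftime("%Y-%m-%d %H:%M:%S")


def ingest_new_calls(source: sqlite3.Connection, local: sqlite3.Connection,
                     bid: str, now: datetime) -> int:
    """Copy answered calls newer than the watermark into {bid}_raw_calls with status 0."""
    watermark = get_watermark(local, bid, now)
    # Column names on the source table are assumptions for this sketch.
    rows = source.execute(
        f'SELECT callid, call_starttime, filename FROM "{bid}_callhistory" '
        "WHERE call_starttime > ? AND dialstatus = 'ANSWER'",
        (watermark,),
    ).fetchall()
    local.executemany(
        f'INSERT INTO "{bid}_raw_calls" (callid, call_starttime, filename, status) '
        "VALUES (?, ?, ?, 0)",  # status 0 = Ingested
        rows,
    )
    local.commit()
    return len(rows)
```

The table name is built with an f-string because SQL placeholders cannot bind identifiers, so `bid` should come from trusted configuration. The strict `>` comparison avoids re-ingesting the watermark call, but it would also skip a second call that shares exactly the same start time.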

### 3. Queueing for Transcription `status = 1`
During the same 5-minute execution cycle, `orchestrate_pipeline.py`:
*   Finds calls in `{bid}_raw_calls` where `status = 0` and the recording URL is reachable (returns HTTP 200).
*   **Idempotency Check**: It verifies the call doesn't already exist in `{bid}_sarvamresponse`.
*   **Message Queuing**: A message payload containing `bid`, `call_id`, and `recording_url` is pushed to a RabbitMQ message queue named `stt_jobs`.
*   **Status Update**: The call in `{bid}_raw_calls` is updated to `status = 1` (Transcription Requested).
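A minimal sketch of this queueing pass is shown below, assuming a `sqlite3` stand-in for MySQL and hypothetical `callid`/`filename` columns. The URL check and the publish step are passed in as functions so the logic can be exercised without a network or broker; in production, `publish` would send to the `stt_jobs` queue through a RabbitMQ client (for example, pika's `channel.basic_publish(exchange="", routing_key="stt_jobs", body=payload)`). The HEAD-based `url_returns_200` helper is one possible reachability check, not necessarily the one the pipeline uses.

```python
import http.client
import json
import sqlite3
import urllib.request
from typing import Callable, List


def url_returns_200(url: str, timeout: float = 10.0) -> bool:
    """One possible reachability check: a HEAD request that must return 200."""
    try:
        request = urllib.request.Request(url, method="HEAD")  # raises ValueError on bad URLs
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return response.status == 200
    except (OSError, ValueError, http.client.HTTPException):
        # URLError, HTTPError, and timeouts are all OSError subclasses.
        return False


def queue_ingested_calls(db: sqlite3.Connection, bid: str,
                         url_ok: Callable[[str], bool],
                         publish: Callable[[str], None]) -> List[str]:
    """Publish status-0 calls to the STT queue and mark them status 1."""
    queued = []
    rows = db.execute(
        f'SELECT callid, filename FROM "{bid}_raw_calls" WHERE status = 0'
    ).fetchall()
    for callid, url in rows:
        if not url_ok(url):
            continue  # recording not reachable yet; retried on the next cycle
        # Idempotency check: skip calls that already have a stored transcript.
        if db.execute(f'SELECT 1 FROM "{bid}_sarvamresponse" WHERE callid = ?',
                      (callid,)).fetchone():
            continue
        payload = json.dumps({"bid": bid, "call_id": callid, "recording_url": url})
        publish(payload)
        db.execute(f'UPDATE "{bid}_raw_calls" SET status = 1 WHERE callid = ?', (callid,))
        queued.append(callid)
    db.commit()
    return queued
```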

### 4. Audio Transcription by Worker `status = 2`
A continuously running background worker (`rabbitmq_transcription_worker.py`) consumes messages from the RabbitMQ `stt_jobs` queue.
*   **Processing**: For each job, the worker downloads the audio file from the recording URL and sends it to the Sarvam STT (Speech-to-Text) API, which returns a transcript with diarized speaker segments showing when the agent was speaking and when the customer was speaking.
*   **Result Storage**: The full transcript, the number of speakers, and the individual speaker segments are saved to `{bid}_sarvamresponse`.
*   **Transcription Completion**: The worker updates the call in `{bid}_raw_calls` to `status = 2` (Transcription Completed). If an error occurs during transcription, the status is set to `-2` instead.
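The per-message handling in the worker can be sketched as below. This is an illustration under stated assumptions: `sqlite3` stands in for MySQL, the `transcribe` argument stands in for the Sarvam STT call (its real request and response format are not described here), and the `num_speakers` and `speaker_segments` column names are hypothetical. Message acknowledgement with the RabbitMQ client is left out.

```python
import json
import sqlite3
from typing import Any, Callable, Dict


def handle_stt_job(db: sqlite3.Connection, body: bytes,
                   transcribe: Callable[[str], Dict[str, Any]]) -> int:
    """Transcribe one queued call and return the new raw_calls status (2 or -2)."""
    job = json.loads(body)
    bid, callid, url = job["bid"], job["call_id"], job["recording_url"]
    try:
        result = transcribe(url)  # stand-in for the Sarvam STT request
        # Columns are illustrative; per the Data Traceability section the real
        # table also stores call_starttime copied from the raw table.
        db.execute(
            f'INSERT INTO "{bid}_sarvamresponse" '
            "(callid, transcript, num_speakers, speaker_segments) VALUES (?, ?, ?, ?)",
            (callid, result["transcript"], result["num_speakers"],
             json.dumps(result["speaker_segments"])),
        )
        new_status = 2  # Transcription Completed
    except Exception:
        new_status = -2  # any transcription or storage error marks the call as failed
    db.execute(f'UPDATE "{bid}_raw_calls" SET status = ? WHERE callid = ?', (new_status, callid))
    db.commit()
    return new_status
```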

### 5. Final Analytics Loop `status = 3`
During subsequent 5-minute cycles, `orchestrate_pipeline.py` identifies calls that have completed transcription:
*   **Identification**: The script searches for calls in `{bid}_raw_calls` where `status = 2`.
*   **Data Retrieval**: The script fetches the text transcript and speaker segments from `{bid}_sarvamresponse` via `db_handler.py`.
*   **AI Analytics**: The text is passed to `CallAnalyzer` (in `analyze_calls_with_parameters.py`), which runs on AWS Nova or an equivalent LLM and analyzes the conversation for quality score, talk/listen ratios, objections, intent, and summary. Using the `amazon.nova-lite-v1:0` model, it performs NLP classification to extract objections, calculates the global quality score, and extracts the 11 call parameters.
*   **Database Writes**: The analyzer result is written to the `{bid}_callanalytics` table, with all parameters mapped to the respective `callid`.
*   **Logging**: A success entry is written to the dedicated log file `analytics_updates.log`, confirming the write to the analytics database and recording the generated Quality Score.
*   **Complete State**: The call in `{bid}_raw_calls` is ultimately moved to `status = 3` (Analyzed).
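The analytics pass can be sketched as a join between the raw and transcript tables, followed by one analytics write and one status update per call. This is a sketch, not the pipeline's code: `sqlite3` replaces MySQL, the `analyze` argument is a stand-in for `CallAnalyzer` (whose real interface is not documented here), and the `quality_score` and `parameters` columns are hypothetical. A logger named `analytics_updates` stands in for the log file; attaching `logging.FileHandler("analytics_updates.log")` to it would write the entries to that file.

```python
import json
import logging
import sqlite3
from typing import Any, Callable, Dict, List

log = logging.getLogger("analytics_updates")


def run_analytics(db: sqlite3.Connection, bid: str,
                  analyze: Callable[[str, list], Dict[str, Any]]) -> List[str]:
    """Analyze every status-2 call, store the result, and move it to status 3."""
    done = []
    pending = db.execute(
        "SELECT r.callid, r.call_starttime, s.transcript, s.speaker_segments "
        f'FROM "{bid}_raw_calls" r JOIN "{bid}_sarvamresponse" s ON s.callid = r.callid '
        "WHERE r.status = 2"
    ).fetchall()
    for callid, starttime, transcript, segments_json in pending:
        result = analyze(transcript, json.loads(segments_json))  # stand-in for CallAnalyzer
        db.execute(
            f'INSERT INTO "{bid}_callanalytics" '
            "(callid, call_starttime, quality_score, parameters) VALUES (?, ?, ?, ?)",
            (callid, starttime, result["quality_score"], json.dumps(result)),
        )
        db.execute(f'UPDATE "{bid}_raw_calls" SET status = 3 WHERE callid = ?', (callid,))
        db.commit()  # commit per call so one failure does not undo earlier calls
        log.info("callid=%s quality_score=%s", callid, result["quality_score"])
        done.append(callid)
    return done
```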

### Dashboard API Resolution
The API endpoints that serve call data (`api/calls/{bid}`), implemented in `db_handler.py`, map the per-call `status` codes set by the steps above to the aggregate status shown in the dashboard.
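The exact mapping in `db_handler.py` is not described here, so the following is only a hypothetical sketch of how the status codes from Steps 2 to 5 could be turned into labels and per-label counts. The label strings come from the step descriptions; "Transcription Failed" for `-2` and "Unknown" for unexpected codes are assumptions.

```python
from collections import Counter
from typing import Dict, Iterable

# Hypothetical labels for the raw_calls status codes described in Steps 2 to 5.
STATUS_LABELS: Dict[int, str] = {
    0: "Ingested",
    1: "Transcription Requested",
    2: "Transcription Completed",
    3: "Analyzed",
    -2: "Transcription Failed",
}


def label_for(status: int) -> str:
    """Return a display label for one status code."""
    return STATUS_LABELS.get(status, "Unknown")


def aggregate_statuses(statuses: Iterable[int]) -> Dict[str, int]:
    """Count calls per display label, e.g. for a dashboard summary."""
    return dict(Counter(label_for(s) for s in statuses))
```

For example, `aggregate_statuses([0, 3, 3, -2])` yields one "Ingested", two "Analyzed", and one "Transcription Failed" call.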

### Data Traceability (call_starttime)
To ensure calls can be traced back to when they actually occurred in Mcube, the `call_starttime` field is carried across all processing tables (shown here for `bid = 1713`):
1. **`1713_raw_calls`**: Stores the original start time from Mcube.
2. **`1713_sarvamresponse`**: Inherits `call_starttime` from the raw table when the transcript is inserted.
3. **`1713_callanalytics`**: Inherits `call_starttime` from the raw table when the analytics result is inserted.

This ensures that even though `created_at` reflects when the system processed the data, the actual call time is always available for auditing and dashboard rendering.
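One way to implement this inheritance is an `INSERT ... SELECT` that copies `call_starttime` from the raw table in the same statement that writes the new row, while `created_at` defaults to the insertion time. The sketch below shows this for the transcript table using `sqlite3` as a stand-in for MySQL; the helper name and column list are hypothetical.

```python
import sqlite3


def insert_transcript(db: sqlite3.Connection, bid: str, callid: str, transcript: str) -> int:
    """Insert a transcript row, copying call_starttime from {bid}_raw_calls.

    Returns the number of rows inserted (0 if the callid is not in the raw table).
    """
    cur = db.execute(
        f'INSERT INTO "{bid}_sarvamresponse" (callid, transcript, call_starttime) '
        f'SELECT callid, ?, call_starttime FROM "{bid}_raw_calls" WHERE callid = ?',
        (transcript, callid),
    )
    db.commit()
    return cur.rowcount
```

Because `created_at` is left to its column default, it records processing time, while `call_starttime` keeps the original Mcube time.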

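### 6. User Interface Dashboard Synchronization
The local User Interface Dashboard continuously fetches and visualizes this data.
*   The dashboard backend relies on `db_handler.py`.
*   When the client (`http://10.40.180.74:6174/#/`) renders the transcript view, the `get_call_transcript()` API queries the `{bid}_sarvamresponse` table by `callid` and returns the `transcript` and `speaker_segments` JSON created in Step 4.
*   All analytical metadata is joined on the call's unique ID from the local MySQL database instance.

## System Files Involved
- `orchestrator_loop.sh`: Background bash loop that runs `orchestrate_pipeline.py` every 5 minutes.
- `orchestrate_pipeline.py`: The main controller. Checks state, pushes payloads, triggers analytics.
- `rabbitmq_transcription_worker.py`: Consumes the `stt_jobs` queue, calls the Sarvam STT API, and stores transcripts.
- `analyze_calls_with_parameters.py`: Contains `CallAnalyzer`, which runs the LLM analysis.
- `db_handler.py`: Database helper used by the orchestrator to fetch transcripts and by the dashboard backend to serve `api/calls/{bid}`.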
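The `get_call_transcript()` lookup from Step 6 can be sketched as a single parameterized query. This is a hypothetical re-implementation for illustration: the real function's signature and return shape in `db_handler.py` are not shown here, and `sqlite3` stands in for the local MySQL instance.

```python
import json
import sqlite3
from typing import Any, Dict, Optional


def get_call_transcript(db: sqlite3.Connection, bid: str, callid: str) -> Optional[Dict[str, Any]]:
    """Fetch one call's transcript and decoded speaker segments, or None if absent."""
    row = db.execute(
        f'SELECT transcript, speaker_segments FROM "{bid}_sarvamresponse" WHERE callid = ?',
        (callid,),
    ).fetchone()
    if row is None:
        return None
    transcript, segments = row
    # speaker_segments is stored as JSON text; decode it for the client.
    return {"callid": callid, "transcript": transcript,
            "speaker_segments": json.loads(segments or "[]")}
```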
