# Orchestrate Pipeline Documentation

This document explains the function and implementation of `orchestrate_pipeline.py`, which serves as the unified orchestrator for call ingestion, transcription, and analytics.

## 1. Overview
The orchestrator "stitches" together four separate pipeline stages:
1.  **Data Ingestion**: Fetching new call records from a source database and inserting them into the local raw calls table.
2.  **Ingestion Verification**: Identifying valid calls from the raw data.
3.  **Transcription Management**: Triggering and monitoring speech-to-text jobs.
4.  **Analytics Automation**: Running AI-driven quality analysis once transcription is available.

## 2. Core Logic Flow

The `Orchestrator` follows a fully asynchronous state machine driven by the `status` field in the `{bid}_raw_calls` table:

*   `status = 0`: Ingested, ready for transcription.
*   `status = 1`: Transcription requested and queued in RabbitMQ.
*   `status = 2`: Transcription completed by worker, transcript available in `{bid}_sarvamresponse`.
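*   `status = 3`: Analytics completed.
*   `status = -1`: Skipped because the audio URL failed validation.
*   `status = -2`: Transcription failed inside the worker.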

```mermaid
graph TD
    Z[Ingest from Source DB] --> A[Scan Raw Calls status = 0]
    A --> B{Valid Audio URL?}
    B -- Yes --> C[Publish Job to RabbitMQ & set status=1]
    B -- No --> D[Skip Call & set status=-1]
    
    C --> |Async Worker| E[Transcribe Audio]
    E --> F[Insert to sarvamresponse & set status=2]
    
    G[Scan Raw Calls status = 2] --> H[Trigger Analytics]
    H --> I[Store Analytics & set status=3]
```
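
The status values and transitions above can be written down as a small enum, which keeps magic numbers out of queries and log messages. This is only a sketch: the member names, the `ALLOWED_TRANSITIONS` table, and `can_transition` are hypothetical and do not come from `orchestrate_pipeline.py`.

```python
from enum import IntEnum


class CallStatus(IntEnum):
    """Hypothetical names for the `status` values described above."""
    WORKER_FAILED = -2         # transcription failed inside the worker
    INVALID = -1               # audio URL failed validation; call skipped
    INGESTED = 0               # ready for transcription
    TRANSCRIPTION_QUEUED = 1   # job published to RabbitMQ
    TRANSCRIBED = 2            # transcript available in {bid}_sarvamresponse
    ANALYZED = 3               # analytics stored


# Transitions from the diagram, plus the idempotency shortcut (0 -> 2)
# and the worker failure exit (1 -> -2).
ALLOWED_TRANSITIONS = {
    CallStatus.INGESTED: {
        CallStatus.TRANSCRIPTION_QUEUED,
        CallStatus.TRANSCRIBED,
        CallStatus.INVALID,
    },
    CallStatus.TRANSCRIPTION_QUEUED: {
        CallStatus.TRANSCRIBED,
        CallStatus.WORKER_FAILED,
    },
    CallStatus.TRANSCRIBED: {CallStatus.ANALYZED},
}


def can_transition(current, new):
    """Return True if moving from `current` to `new` matches the flow above."""
    return new in ALLOWED_TRANSITIONS.get(current, set())
```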

## 3. Class: `Orchestrator`

### Initialization
```python
orch = Orchestrator(bid="1713")
```
- **`bid`**: The Business ID used to scope table names (e.g., `1713_raw_calls`).
- **`config_wrapped`**: A helper that ensures compatibility between the project's `Config` object and the expected interfaces of `DatabaseHandler` and `CallAnalyzer`.

### Key Methods

#### `ingest_calls(limit=10)`
- Identifies the latest `call_starttime` (watermark) in the local `{bid}_raw_calls` table.
- Connects to the synced source database (`SYNC_SOURCE_DB_*` config).
- Fetches new answered calls from `{bid}_callhistory` that occurred after the watermark.
- Inserts these new records into the local `{bid}_raw_calls` table with a status of `0` (ingested) and transcription status of `not_requested`.
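
The watermark approach can be illustrated with a short sketch. It uses the standard-library `sqlite3` module as a stand-in for the MySQL connections so that it runs anywhere; with `pymysql` the placeholders would be `%s` rather than `?`. Apart from `call_starttime`, the column names (`call_id`, `recording_url`, `answered`, `transcription_status`) are assumptions made for illustration.

```python
import sqlite3


def ingest_calls(source, local, bid, limit=10):
    """Copy answered calls newer than the local watermark; return the count."""
    raw = f'"{bid}_raw_calls"'
    history = f'"{bid}_callhistory"'

    # Watermark: latest call_starttime already ingested (None on the first run).
    watermark = local.execute(
        f"SELECT MAX(call_starttime) FROM {raw}"
    ).fetchone()[0]

    # Fetch only answered calls that started after the watermark.
    rows = source.execute(
        f"SELECT call_id, call_starttime, recording_url FROM {history} "
        "WHERE answered = 1 AND (? IS NULL OR call_starttime > ?) "
        "ORDER BY call_starttime LIMIT ?",
        (watermark, watermark, limit),
    ).fetchall()

    # New rows start at status 0 with transcription not yet requested.
    local.executemany(
        f"INSERT INTO {raw} "
        "(call_id, call_starttime, recording_url, status, transcription_status) "
        "VALUES (?, ?, ?, 0, 'not_requested')",
        rows,
    )
    local.commit()
    return len(rows)
```

Because the query filters strictly on `call_starttime > watermark`, running the function twice in a row inserts nothing the second time.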

#### `trigger_transcription(call)`
- Checks idempotency: if `{bid}_sarvamresponse` already contains a transcript for the call, it skips queueing and sets `status = 2`.
- Publishes a job payload to the RabbitMQ queue (`stt_jobs`).
- Updates the local raw call record to `status = 1`.
- **Dependencies**: `pika` (RabbitMQ), `pymysql`.
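
The steps above might look roughly like the following. The payload fields (`bid`, `call_id`, `audio_url`) and the injected `transcript_exists` and `set_status` callables are hypothetical; `channel` is assumed to be a `pika` channel, whose `basic_publish` accepts `exchange`, `routing_key`, and `body`.

```python
import json


def trigger_transcription(call, bid, channel, transcript_exists, set_status):
    """Queue one call for STT unless a transcript already exists.

    Returns the status the call was moved to (1 or 2).
    """
    call_id = call["call_id"]

    # Idempotency: an existing transcript means the work is already done.
    if transcript_exists(call_id):
        set_status(call_id, 2)
        return 2

    payload = json.dumps(
        {"bid": bid, "call_id": call_id, "audio_url": call["recording_url"]}
    )
    # The default exchange ("") routes to the queue named by routing_key.
    channel.basic_publish(exchange="", routing_key="stt_jobs", body=payload)

    set_status(call_id, 1)
    return 1
```

Passing the channel and the database helpers in as arguments keeps the function testable without a running broker.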

#### `trigger_analytics(call_id)`
- Fetches the transcript directly from `{bid}_sarvamresponse`.
- Uses the `CallAnalyzer` class to process the transcript.
- Calls Amazon Nova (via Amazon Bedrock) to score the call against predefined parameters.
- Saves the resulting scores and sentiment to `{bid}_callanalytics`.
- Once finished, updates the local raw call record to `status = 3`.

#### `run(limit=10)`
- The main entry point.
- Calls `ingest_calls()` to sync new calls from the source database.
- **Phase 1 (Transcription Queueing)**: Fetches up to `limit` unprocessed calls (`status = 0`) from `{bid}_raw_calls` and validates each recording URL. External audio URLs are checked over HTTP, and `/recording-uploads/` manual uploads are checked against the local `recording_uploads/` directory. Invalid calls are set to `status = -1`; valid calls are passed to `trigger_transcription()`.
- **Phase 2 (Analytics Triggering)**: Fetches up to `limit` transcribed calls (`status = 2`) from `{bid}_raw_calls` and calls `trigger_analytics()` on each.
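
A condensed sketch of the two phases follows. The ingestion step is omitted, and every collaborator (`fetch_by_status`, `is_valid_audio`, `set_status`, and the two trigger callables) is passed in as a hypothetical stand-in for the corresponding `Orchestrator` method or database helper.

```python
def run(fetch_by_status, is_valid_audio, trigger_transcription,
        trigger_analytics, set_status, limit=10):
    """One orchestrator pass: queue transcriptions, then analyze transcripts."""
    queued = skipped = analyzed = 0

    # Phase 1: status 0 -> 1 (queued) or -1 (invalid audio).
    for call in fetch_by_status(0, limit):
        if is_valid_audio(call["recording_url"]):
            trigger_transcription(call)
            queued += 1
        else:
            set_status(call["call_id"], -1)
            skipped += 1

    # Phase 2: status 2 -> 3 once analytics are stored.
    for call in fetch_by_status(2, limit):
        trigger_analytics(call["call_id"])
        analyzed += 1

    return {"queued": queued, "skipped": skipped, "analyzed": analyzed}
```

Because transcription is asynchronous, a call queued in Phase 1 is normally picked up by Phase 2 on a later pass, after the worker has set `status = 2`.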

## 4. Database Schema Interaction

The script moves data through the following tables, using the `status` flag to track progress:
1.  **`{bid}_callhistory`** (Source DB): The original source of truth for new calls.
2.  **`{bid}_raw_calls`**: Local storage for ingested calls and state tracking (`status`).
3.  **`{bid}_sarvamresponse`**: Destination for transcripts generated by the STT worker.
4.  **`{bid}_callanalytics`**: Destination for final AI analysis results.

## 5. Usage

To run the orchestrator for a specific business:

```bash
python3 orchestrate_pipeline.py --bid 1713 --limit 10
```
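
For reference, the two flags shown above could be parsed with `argparse` along these lines. This is a sketch, not the script's actual parser: making `--bid` required and defaulting `--limit` to 10 (matching `run(limit=10)`) are assumptions.

```python
import argparse


def parse_args(argv=None):
    """Parse the --bid and --limit flags used in the command above."""
    parser = argparse.ArgumentParser(
        description="Run the call pipeline orchestrator for one business."
    )
    parser.add_argument(
        "--bid", required=True,
        help="Business ID used to scope table names",
    )
    parser.add_argument(
        "--limit", type=int, default=10,
        help="Maximum number of calls to process per phase",
    )
    return parser.parse_args(argv)
```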

### Configuration
The script relies on a `.env` file for:
- Database credentials (`DB_HOST`, `DB_USER`, `DB_PASSWORD`).
- Source database connection settings (`SYNC_SOURCE_DB_*`), used by `ingest_calls()`.
- RabbitMQ host (`RABBITMQ_HOST`).
- AWS credentials (used by the `CallAnalyzer` for Nova Lite).
- `PUBLIC_BASE_URL` for generating public URLs for manually uploaded recordings, for example `https://pca.syntheon.in`.
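
One way to read and validate these settings is sketched below. It reads from the process environment and assumes the `.env` file has already been loaded (for example by a library such as `python-dotenv`). The `PipelineSettings` class, `load_settings`, and `public_recording_url` are hypothetical names, and building the public URL as base URL plus `/recording-uploads/` plus file name is an assumption.

```python
import os
from dataclasses import dataclass

REQUIRED = ["DB_HOST", "DB_USER", "DB_PASSWORD", "RABBITMQ_HOST", "PUBLIC_BASE_URL"]


@dataclass(frozen=True)
class PipelineSettings:
    db_host: str
    db_user: str
    db_password: str
    rabbitmq_host: str
    public_base_url: str


def load_settings(env=os.environ):
    """Read required settings, failing fast with the names of any missing ones."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing configuration: {', '.join(missing)}")
    return PipelineSettings(
        db_host=env["DB_HOST"],
        db_user=env["DB_USER"],
        db_password=env["DB_PASSWORD"],
        rabbitmq_host=env["RABBITMQ_HOST"],
        # Strip a trailing slash so joined URLs never contain "//".
        public_base_url=env["PUBLIC_BASE_URL"].rstrip("/"),
    )


def public_recording_url(settings, filename):
    """Build the public URL for a manually uploaded recording."""
    return f"{settings.public_base_url}/recording-uploads/{filename}"
```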

## 6. Error Handling
- **URL Validation**: External recording URLs must return HTTP 200 after redirects. Manual upload URLs under `/recording-uploads/` are checked locally; if the stored file is missing or empty, the call is marked as `status = -1` (invalid/skipped).
- **Idempotency**: Prevents double queueing and duplicate transcription by checking `{bid}_sarvamresponse` for an existing transcript before publishing a job.
- **Worker Errors**: If transcription fails inside the worker, the worker marks the call as `status = -2` so that it can be re-queued or investigated later.
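
The URL validation rule can be sketched with the standard library alone. The function names are hypothetical, and whether the real script issues a GET or a HEAD request is not stated; this version opens the URL with a GET and does not read the body. `urllib.request.urlopen` follows redirects by default.

```python
import os
import urllib.error
import urllib.request
from urllib.parse import urlparse

UPLOAD_PREFIX = "/recording-uploads/"


def http_ok(url, timeout=10):
    """Return True if the URL answers HTTP 200 after following redirects."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, ValueError, OSError):
        # Covers HTTP errors, malformed URLs, and network failures.
        return False


def is_valid_recording(url, uploads_dir="recording_uploads", http_check=http_ok):
    """Check manual uploads on disk and all other URLs over HTTP."""
    if not url:
        return False
    path = urlparse(url).path
    if path.startswith(UPLOAD_PREFIX):
        # basename() drops directory components, which blocks path traversal.
        local_file = os.path.join(uploads_dir, os.path.basename(path))
        return os.path.isfile(local_file) and os.path.getsize(local_file) > 0
    return http_check(url)
```

A call whose URL fails this check would be set to `status = -1` rather than queued.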
