Overview

Contact centers handling financial services, insurance, or healthcare operate under strict regulatory requirements. Agents must follow scripts, disclose specific information, and avoid prohibited language. Traditional QA reviews 2–5% of calls after the fact, so by the time a violation is caught it may already have been repeated across hundreds of calls.

This guide shows you how to build a system that monitors every call in real time. Audio streams to the Vachana WebSocket STT API, and transcripts arrive roughly 300–500ms after each speech segment ends. A compliance and quality engine processes each segment, matches it against rule sets, and fires alerts to your backend while the call is still live.
| Capability | Implementation |
| --- | --- |
| Live transcription | WebSocket stream to wss://api.vachana.ai/stt/v3/stream with per-segment transcript events |
| Compliance detection | Keyword and phrase matching on each transcript event with configurable rule sets |
| Quality monitoring | Silence detection, interruption tracking, escalation phrase matching from segment metadata |
| Real-time alerts | Async alert dispatcher (webhook, queue, or supervisor dashboard) |
| Reconnect handling | Exponential backoff with session continuity across drops |
Which API to use? This use case uses the WebSocket STT API for real-time streaming. For post-call batch analysis, see the Call Analytics Pipeline which uses the Batch STT API.

Architecture

The system has three logical layers: audio ingestion, transcription, and monitoring. Each runs concurrently in an async event loop.
AUDIO SOURCE
│  (Telephony bridge / RTP tap / microphone)
│  PCM 16-bit LE, 16kHz or 8kHz, mono

AUDIO STREAMER
│  Chunks audio into 1024-byte frames (32ms @ 16kHz)
│  Maintains real-time cadence — no burst, no starvation

VACHANA WEBSOCKET STT API  wss://api.vachana.ai/stt/v3/stream
│  VAD detects speech boundaries
│  Returns: connected → processing → transcript events
│  Latency: ~300–500ms from end of speech to transcript

TRANSCRIPT HANDLER → COMPLIANCE ENGINE
                   → QUALITY ENGINE

ALERT DISPATCHER → Webhook / Queue / Supervisor dashboard
Each call owns an isolated session object that tracks the full transcript buffer, a timeline of events, compliance status, quality metrics, and reconnect context. This state survives WebSocket reconnects and is flushed to your store at call end.

Prerequisites

| Requirement | Details |
| --- | --- |
| Vachana API key | Available from the Vachana dashboard. Used as the x-api-key-id header on the WebSocket connection. |
| Python 3.9+ | Required by the SDK. The full example uses asyncio, dataclasses, and typed event classes. |
| Audio source | PCM 16-bit LE, mono. Either 8kHz (PSTN/legacy VoIP) or 16kHz (wideband VoIP). Defaults to 16kHz. |
| Alert target | An HTTP endpoint, message queue, or Redis channel to receive alerts. |
pip install gnani-vachana aiohttp python-dotenv

Authentication

Authentication is performed at connection time via HTTP headers on the WebSocket upgrade request. There is no separate auth step — the connection either opens or returns 401.
| Header | Required | Description |
| --- | --- | --- |
| x-api-key-id | Yes | Your Vachana API key. |
| lang_code | Yes | BCP-47 language code. Defaults to en-IN. Pass comma-separated codes for multilingual auto-detection. |
| x-sample-rate | No | Audio sample rate in Hz. Accepted: 8000, 16000, 44100, 48000. Defaults to 16000. |
| x-format | No | Set transcribe for ITN (numbers, currency, dates in written form). ITN applies to hi-IN and en-IN only. |
.env
GNANI_API_KEY=your-api-key-here
ALERT_WEBHOOK_URL=https://supervisor.internal/alerts
LANG_CODE=hi-IN
SAMPLE_RATE=16000
Never hardcode API keys. Load credentials from environment variables or a secrets manager. The x-api-key-id header is visible in plaintext in WebSocket upgrade logs — ensure those logs are access-controlled.
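A minimal sketch of loading these settings at startup and failing fast on bad values. It assumes the variable names from the .env file above; MonitorConfig and load_config are hypothetical names used for illustration and are not part of the SDK.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

# Accepted rates taken from the x-sample-rate row of the header table
ACCEPTED_SAMPLE_RATES = {8000, 16000, 44100, 48000}


@dataclass(frozen=True)
class MonitorConfig:  # hypothetical container, not an SDK type
    api_key: str
    lang_code: str
    sample_rate: int
    alert_webhook_url: Optional[str]


def load_config(env: Optional[Mapping[str, str]] = None) -> MonitorConfig:
    """Read settings from the environment and raise early on invalid values."""
    env = os.environ if env is None else env
    api_key = env.get("GNANI_API_KEY", "").strip()
    if not api_key:
        raise ValueError("GNANI_API_KEY is not set")
    sample_rate = int(env.get("SAMPLE_RATE", "16000"))
    if sample_rate not in ACCEPTED_SAMPLE_RATES:
        raise ValueError(f"Unsupported SAMPLE_RATE: {sample_rate}")
    return MonitorConfig(
        api_key=api_key,
        lang_code=env.get("LANG_CODE", "en-IN"),
        sample_rate=sample_rate,
        alert_webhook_url=env.get("ALERT_WEBHOOK_URL") or None,
    )
```

Calling load_config() once at process start surfaces a missing key or a mistyped sample rate before any call is connected, rather than as a 401 or garbled transcripts later.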

End-to-End Workflow

1. Call starts: open WebSocket connection

   Your telephony bridge fires a call-start event. The monitor opens a WebSocket to wss://api.vachana.ai/stt/v3/stream with auth headers and language config. A session object is created and keyed to the call ID.

2. Receive connected event: confirm config

   The server returns a connected event confirming sample rate and chunk size. Any mismatch (wrong sample rate, unsupported language) surfaces immediately.

3. Stream audio in 1024-byte frames

   An async producer task reads PCM frames from the telephony tap and sends them at real-time cadence: one 1024-byte frame every 32ms for 16kHz audio. Bursting frames degrades VAD accuracy.

4. VAD triggers: receive processing event

   When VAD detects end-of-speech, the server sends a processing event. Use this timestamp to measure speech-to-transcript latency and to start a silence timer in the quality engine.

5. Transcript arrives: run compliance and quality engines

   The transcript event carries text, segment_index, audio_duration_ms, and latency. Both engines process the text synchronously. Alerts are dispatched asynchronously so they never block the next transcript.

6. Alerts fire: supervisor is notified

   Compliance violations and quality alerts go to the alert dispatcher. Severity determines the channel: CRITICAL hits the supervisor dashboard immediately; WARNING queues for post-call review.

7. Call ends: flush session

   On call end, close the WebSocket gracefully. Run final session-level checks (e.g. required disclosure was never spoken). Flush session state to your store and emit a call-complete summary event.
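The severity routing in step 6 can be sketched as follows. This is an illustration only: route_alert, the asyncio.Queue standing in for the supervisor dashboard channel, and the plain list standing in for the post-call review queue are all assumptions. The guide specifies only CRITICAL and WARNING, so sending every other severity to review is also an assumption.

```python
import asyncio
from typing import Dict, List


async def route_alert(alert: Dict, live_channel: asyncio.Queue, review_queue: List[Dict]) -> str:
    """Send CRITICAL alerts to the live channel; hold everything else for review."""
    if alert.get("severity") == "CRITICAL":
        await live_channel.put(alert)   # supervisor sees it while the call is live
        return "live"
    review_queue.append(alert)          # WARNING (and, by assumption, INFO) wait for post-call review
    return "review"
```

Returning the chosen channel makes the routing decision easy to log and to assert on in tests.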

Connecting to the WebSocket API

The SDK’s GnaniSTTStreamClient wraps the WebSocket connection, frame pacing, and event parsing. Use it as an async context manager.
basic connection
import asyncio, os
from gnani.stt import GnaniSTTStreamClient

async def open_stream():
    async with GnaniSTTStreamClient(
        api_key=os.getenv("GNANI_API_KEY"),
        language_code="hi-IN",     # or comma-separated for auto-detect
        sample_rate=16000,
    ) as stream:
        async for event in stream:
            await handle_event(event)
Multilingual auto-detection: For multilingual contact centers, pass comma-separated codes as lang_code (e.g. hi-IN,ta-IN,en-IN). The API detects the dominant language per segment. Adds minimal latency but removes the need to pre-classify calls by language.

Streaming Audio

Audio format requirements

| Property | 16kHz (wideband VoIP) | 8kHz (PSTN / legacy) |
| --- | --- | --- |
| Encoding | PCM signed 16-bit little-endian | PCM signed 16-bit little-endian |
| Channels | 1 (mono) | 1 (mono) |
| Frame size | 1024 bytes (512 samples = 32ms) | 1024 bytes (512 samples = 64ms) |
| x-sample-rate | 16000 | 8000 |
Each WebSocket frame must be exactly 1024 bytes. Bursting frames (sending faster than real time) degrades VAD accuracy — the VAD model is trained on real-time cadence.
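The frame durations in the table follow directly from the frame size and sample width. A small worked example, where frame_duration_ms is a hypothetical helper name:

```python
def frame_duration_ms(sample_rate: int, frame_bytes: int = 1024, bytes_per_sample: int = 2) -> float:
    """Milliseconds of mono PCM audio carried by one frame."""
    samples = frame_bytes // bytes_per_sample   # 1024 bytes / 2 bytes per sample = 512 samples
    return samples * 1000 / sample_rate


print(frame_duration_ms(16000))  # 32.0
print(frame_duration_ms(8000))   # 64.0
```

The same formula gives the pacing interval for the other accepted sample rates, so the sleep between sends can be computed rather than hardcoded per rate.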
audio producer task
import asyncio

FRAME_SIZE       = 1024   # bytes per WebSocket frame: exactly 512 x 16-bit samples
BYTES_PER_SAMPLE = 2      # 16-bit PCM, mono

async def stream_audio_producer(stream, audio_source, sample_rate=16000, stop_event=None):
    # Real-time duration of one frame in seconds: 0.032 at 16kHz, 0.064 at 8kHz
    frame_interval = FRAME_SIZE / (BYTES_PER_SAMPLE * sample_rate)
    buffer = bytearray()

    async for chunk in audio_source:
        if stop_event and stop_event.is_set():
            break
        buffer.extend(chunk)
        while len(buffer) >= FRAME_SIZE:
            await stream.send_audio(bytes(buffer[:FRAME_SIZE]))
            del buffer[:FRAME_SIZE]              # drop the sent frame in place
            await asyncio.sleep(frame_interval)  # enforce real-time cadence

    # Flush the remaining partial frame, padded with silence (zero bytes)
    if buffer:
        await stream.send_audio(bytes(buffer) + b"\x00" * (FRAME_SIZE - len(buffer)))

WebSocket Event Reference

| Event type | When sent | Key fields |
| --- | --- | --- |
| connected | Once, immediately after handshake. | message, config.sample_rate, config.chunk_size, timestamp |
| processing | Each time VAD detects end-of-speech. | timestamp |
| transcript | After transcription of a VAD segment completes. | text, segment_index, segment_id, audio_duration_ms, latency, timestamp |
| error | Server-side error, recoverable or fatal. | message, timestamp |
transcript event
{
  "type": "transcript",
  "timestamp": "2024-01-15T10:30:05.987Z",
  "text": "guaranteed returns milenge, bilkul risk-free hai",
  "audio_duration_ms": 2340,
  "segment_id": "seg_7f3a92",
  "segment_index": 4,
  "latency": 318
}
The latency field (milliseconds from end of speech to transcript delivery) is your primary observability metric for pipeline health. Track p50, p95, p99 per call session and alert if p95 consistently exceeds your SLA threshold.
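One way to compute those percentiles per call is the nearest-rank method over the latency values collected so far. This is a sketch; latency_percentiles is a hypothetical helper, and a production system might prefer a streaming histogram instead of sorting a list.

```python
import math
from typing import Dict, List


def latency_percentiles(latencies_ms: List[float]) -> Dict[str, float]:
    """Nearest-rank p50/p95/p99 over the transcript latencies seen in a call."""
    if not latencies_ms:
        return {}
    ordered = sorted(latencies_ms)

    def pct(p: int) -> float:
        # Nearest-rank: the smallest value with at least p% of samples at or below it
        rank = max(1, math.ceil(p * len(ordered) / 100))
        return ordered[rank - 1]

    return {"p50": pct(50), "p95": pct(95), "p99": pct(99)}
```

Appending each transcript event's latency to a per-session list and calling this at call end (or every N segments) is enough to compare p95 against an SLA threshold.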

Compliance Detection

The compliance engine runs on each transcript event. It checks segment text against three rule categories: prohibited keywords, risk phrases, and required disclosures. All checks are synchronous string operations — they complete in under 1ms per segment.
rules/compliance.json
{
  "prohibited_keywords": [
    {
      "rule_id": "PROH_001", "severity": "CRITICAL",
      "keywords": ["guaranteed returns", "guaranteed profit", "no risk", "risk-free"],
      "description": "SEBI-prohibited investment language"
    },
    {
      "rule_id": "PROH_002", "severity": "CRITICAL",
      "keywords": ["personal account", "off the books", "my account"],
      "description": "Agent directing customer to off-channel transaction"
    }
  ],
  "risk_phrases": [
    {
      "rule_id": "RISK_001", "severity": "WARNING",
      "phrases": ["cancel my policy", "close my account", "policy cancel"],
      "description": "Churn risk signal"
    },
    {
      "rule_id": "RISK_002", "severity": "WARNING",
      "phrases": ["legal action", "consumer forum", "RBI complaint", "complaint"],
      "description": "Regulatory complaint intent"
    }
  ],
  "required_disclosures": [
    {
      "rule_id": "DISC_001", "severity": "CRITICAL",
      "must_contain_one_of": ["this call is being recorded", "call recording", "recorded for quality"],
      "check_within_segments": 3,
      "description": "Recording disclosure required within first 3 segments"
    }
  ]
}
ComplianceEngine
import json
from pathlib import Path
from typing import List, Dict

class ComplianceEngine:
    def __init__(self, rules_path="rules/compliance.json"):
        rules = json.loads(Path(rules_path).read_text())
        self.prohibited   = rules.get("prohibited_keywords", [])
        self.risk_phrases = rules.get("risk_phrases", [])
        self.disclosures  = rules.get("required_disclosures", [])
        self._disclosed   = set()

    def check(self, segment) -> List[Dict]:
        # Text is lowercased, so rule terms are lowercased too; otherwise a term
        # such as "RBI complaint" could never match.
        text = segment.text.lower()
        hits = []

        for rule in self.prohibited:
            for kw in rule["keywords"]:
                if kw.lower() in text:
                    hits.append({"rule_id": rule["rule_id"], "severity": rule["severity"],
                                  "matched": kw, "description": rule["description"],
                                  "segment_idx": segment.segment_index, "text": segment.text})
                    break  # at most one hit per rule per segment

        for rule in self.risk_phrases:
            for phrase in rule["phrases"]:
                if phrase.lower() in text:
                    hits.append({"rule_id": rule["rule_id"], "severity": rule["severity"],
                                  "matched": phrase, "description": rule["description"],
                                  "segment_idx": segment.segment_index, "text": segment.text})
                    break  # at most one hit per rule per segment

        for rule in self.disclosures:
            rid = rule["rule_id"]
            if rid in self._disclosed:
                continue  # already satisfied, or already reported as missing
            if any(p.lower() in text for p in rule["must_contain_one_of"]):
                self._disclosed.add(rid)
            elif segment.segment_index >= rule["check_within_segments"]:
                hits.append({"rule_id": rid, "severity": rule["severity"],
                              "matched": "MISSING_DISCLOSURE", "description": rule["description"],
                              "segment_idx": segment.segment_index, "text": ""})
                self._disclosed.add(rid)  # mark as handled so the alert fires only once

        return hits
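Plain substring checks can fire inside longer words: "no risk" is a substring of "casino risk", for example. A hedged sketch of a word-boundary variant with patterns compiled once up front; compile_rules and first_match are hypothetical helpers, not part of the engine above. Note that \b depends on what the regex engine treats as a word character, so this works most predictably on Latin-script transcripts and may need a different approach for other scripts.

```python
import re
from typing import Dict, List, Optional


def compile_rules(rules: List[Dict], terms_key: str) -> List[Dict]:
    """Attach one pre-compiled, case-insensitive, word-bounded pattern to each rule."""
    compiled = []
    for rule in rules:
        alternatives = "|".join(re.escape(term) for term in rule[terms_key])
        pattern = re.compile(rf"\b(?:{alternatives})\b", re.IGNORECASE)
        compiled.append({**rule, "pattern": pattern})
    return compiled


def first_match(rule: Dict, text: str) -> Optional[str]:
    """Return the matched term in lowercase, or None if the rule does not fire."""
    m = rule["pattern"].search(text)
    return m.group(0).lower() if m else None
```

Usage would look like compile_rules(rules["prohibited_keywords"], "keywords") in the engine constructor, with first_match called per segment in place of the inner keyword loop.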

Quality Monitoring

rules/quality.json
{
  "silence": { "threshold_seconds": 8 },
  "escalation_phrases": [
    "transfer to supervisor", "let me escalate", "i will get my supervisor"
  ],
  "interruption": { "min_duration_ms": 300 },
  "short_segment_ms": 500
}
QualityEngine
import json
from datetime import datetime, timezone
from pathlib  import Path
from typing   import List, Dict, Optional

class QualityEngine:
    # Holds per-call timing state, so create one instance per call.
    def __init__(self, rules_path="rules/quality.json"):
        rules = json.loads(Path(rules_path).read_text())
        self.silence_threshold  = rules["silence"]["threshold_seconds"]
        self.escalation_phrases = [p.lower() for p in rules["escalation_phrases"]]
        self.interruption_ms    = rules["interruption"]["min_duration_ms"]
        self.short_segment_ms   = rules["short_segment_ms"]
        self._last_processing_ts: Optional[datetime] = None
        self._prev_segment_end:   Optional[datetime] = None

    def on_processing(self, timestamp_str: str):
        # Server timestamps end in "Z"; fromisoformat() on Python 3.9 needs an explicit offset
        self._last_processing_ts = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))

    def check(self, session, segment) -> List[Dict]:
        now, text, events = datetime.now(timezone.utc), segment.text.lower(), []

        # Silence: time since the previous transcript arrived, minus this segment's speech.
        # The engine tracks this itself because monitor.py calls session.add_segment()
        # before check(), which has already moved session.last_segment_end to this segment.
        if self._prev_segment_end:
            silence_s = (now - self._prev_segment_end).total_seconds() - (segment.audio_duration_ms / 1000)
            if silence_s > self.silence_threshold:
                events.append({"event_type": "SILENCE", "severity": "WARNING",
                                "silence_s": round(silence_s, 1), "segment_idx": segment.segment_index,
                                "description": f"Silence gap of {silence_s:.1f}s detected"})
        self._prev_segment_end = now

        for phrase in self.escalation_phrases:
            if phrase in text:
                events.append({"event_type": "ESCALATION", "severity": "WARNING",
                                "matched": phrase, "segment_idx": segment.segment_index,
                                "description": "Supervisor escalation signal"})
                break

        # Interruption heuristic: a short segment whose transcript follows its processing
        # event closely. This compares a server timestamp with the local clock, so it
        # assumes the two clocks are in sync.
        if self._last_processing_ts and segment.audio_duration_ms < self.short_segment_ms:
            gap_ms = (now - self._last_processing_ts).total_seconds() * 1000
            if gap_ms < self.interruption_ms:
                events.append({"event_type": "INTERRUPTION", "severity": "INFO",
                                "gap_ms": round(gap_ms, 1), "segment_idx": segment.segment_index,
                                "description": f"Possible interruption: {gap_ms:.0f}ms gap"})

        return events

Error Handling & Reconnect Logic

WebSocket connections drop. The reconnect loop below uses exponential backoff with jitter (up to 20% added to each wait), caps each wait at MAX_BACKOFF_S, and gives up after MAX_RECONNECTS consecutive failures. Session state is preserved across reconnects, and processed_indices is used to deduplicate segments.
reconnect loop
import asyncio, random, os
from gnani.stt import GnaniSTTStreamClient, StreamConnectionError, StreamClosedError, StreamError

MAX_RECONNECTS = 5
BASE_BACKOFF_S = 1.0
MAX_BACKOFF_S  = 30.0

async def monitor_call_with_reconnect(session, audio_source, compliance_engine, quality_engine, alert_dispatcher):
    sample_rate = int(os.getenv("SAMPLE_RATE", "16000"))
    attempt = 0

    while attempt <= MAX_RECONNECTS:
        try:
            async with GnaniSTTStreamClient(
                api_key=os.getenv("GNANI_API_KEY"),
                language_code=session.language_code,
                sample_rate=sample_rate,
            ) as stream:
                if attempt > 0:
                    session.reconnect_count += 1
                attempt = 0  # reset backoff counter on successful connect

                stop_event = asyncio.Event()
                # Pass sample_rate so the producer paces frames correctly (64ms per frame at 8kHz)
                producer = asyncio.create_task(stream_audio_producer(
                    stream, audio_source, sample_rate=sample_rate, stop_event=stop_event))
                try:
                    async for event in stream:
                        await handle_event(session, event, compliance_engine, quality_engine, alert_dispatcher)
                    stop_event.set()
                    await producer
                    return  # clean exit
                finally:
                    producer.cancel()  # no-op after a clean exit; stops a stale producer after a drop

        except StreamConnectionError:
            print(f"[{session.call_id}] Auth failure. Not retrying.")
            raise

        except (StreamClosedError, ConnectionResetError, OSError) as e:
            attempt += 1
            if attempt > MAX_RECONNECTS:
                raise
            backoff = min(BASE_BACKOFF_S * (2 ** attempt), MAX_BACKOFF_S)
            jitter  = random.uniform(0, backoff * 0.2)
            print(f"[{session.call_id}] Reconnect {attempt}/{MAX_RECONNECTS} in {backoff+jitter:.1f}s ({e!r})")
            await asyncio.sleep(backoff + jitter)
| Error | Cause | Strategy |
| --- | --- | --- |
| StreamConnectionError | 401, invalid API key, unsupported language code. | Do not retry. Fix config and redeploy. |
| StreamClosedError | Server closed cleanly (service restart, session timeout). | Retry with backoff. Session state is preserved. |
| ConnectionResetError / OSError | Network drop, TCP reset, intermediary timeout. | Exponential backoff + jitter. Cap at MAX_RECONNECTS. |
| StreamError | STT engine failure reported in an error event. | Log, retry once. Flag the call for manual review on repeat failures. |
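To see what the backoff schedule looks like with the constants used in this section, here is the delay calculation isolated as a pure function. backoff_delay is a hypothetical helper that mirrors the formula in the reconnect loop; the optional rng parameter is an addition that makes it testable.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, base_s: float = 1.0, cap_s: float = 30.0,
                  jitter_fraction: float = 0.2,
                  rng: Optional[random.Random] = None) -> float:
    """Exponential backoff capped at cap_s, plus up to jitter_fraction extra."""
    source = rng if rng is not None else random
    backoff = min(base_s * (2 ** attempt), cap_s)
    return backoff + source.uniform(0, backoff * jitter_fraction)


# Base waits for attempts 1..5 are 2, 4, 8, 16 and 30 seconds (32 is capped to 30),
# each extended by up to 20% jitter.
for attempt in range(1, 6):
    print(attempt, round(backoff_delay(attempt), 2))
```

Because the jitter is only added on top of the base wait, reconnects from many calls are spread out somewhat but never retry sooner than the exponential schedule.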

Production Best Practices

Each active call runs in its own asyncio.Task. The audio producer and event consumer run concurrently within that task. Do not use threads — the WebSocket library is async-native. A single well-tuned Python process handles 100+ concurrent calls comfortably; the bottleneck is network I/O, not CPU.
Compliance and quality checks run synchronously (sub-millisecond string matching). Alert dispatch — HTTP webhooks, queue publishes, database writes — must always be fire-and-forget via asyncio.create_task(). A slow downstream system under load must never delay the next transcript event.
| Optimization | Impact |
| --- | --- |
| Co-locate with telephony bridge | Run the monitor in the same region as the Vachana API. Cross-region adds 50–150ms RTT per frame delivery. |
| 16kHz over 8kHz when possible | Higher accuracy transcripts mean fewer false positives in compliance matching. |
| Pre-compile compliance patterns | Compile all regex at engine __init__. Never compile inside the hot path. |
| Buffer writes, not reads | Write to an in-memory session buffer. Flush to the database at call end or on CRITICAL alerts only. |
| Metric | Source |
| --- | --- |
| transcript_latency_ms | latency field on each transcript event. Track p50/p95/p99. |
| segment_count | Increment on each transcript event. |
| compliance_hit_rate | Compliance hits / total segments per call. |
| silence_gap_seconds | Max silence gap derived from processing event timestamps. |
| reconnect_count | session.reconnect_count, incremented on each reconnect. |

Debugging

| Symptom | Cause | Fix |
| --- | --- | --- |
| Connection closes immediately, with no connected event | Invalid API key, wrong lang_code, missing required headers. | Log the WebSocket close code; 4001 = auth failure. |
| Transcripts arrive but text is empty or garbled | x-sample-rate does not match the actual audio sample rate. Audio is not mono PCM. | Run ffprobe on the source. Convert stereo to mono before streaming. |
| VAD fires too often and cuts sentences mid-utterance | Frames being burst-sent faster than real time. | Enforce asyncio.sleep(frame_interval) after every send. |
| VAD never fires, with no processing or transcript events | Audio buffer is all zeros. Audio source is not connected. | Print frame[:32].hex(). All zeros = silent source. |
| Compliance rules fire on unrelated text | Substring match without word boundaries. | Switch to word-boundary regex. Lowercase and strip punctuation before matching. |
| Duplicate alerts on reconnect | Session buffer re-processed after reconnect. | Check segment_index in session.processed_indices before dispatching any alert. |
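For the "VAD never fires" row, the zero-frame check can be wrapped in a small helper and called on the first few frames of a call. describe_frame is a hypothetical name; the check itself is just the hex inspection suggested in the table.

```python
def describe_frame(frame: bytes, preview_bytes: int = 32) -> str:
    """Report whether a PCM frame is digital silence, with a hex preview otherwise."""
    if not any(frame):  # every byte is zero
        return "silent (all zero bytes)"
    return f"audio present, first bytes: {frame[:preview_bytes].hex()}"
```

A frame of all zero bytes points at a disconnected or muted source rather than at the STT service.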

Full Runnable Example

monitor.py
"""
monitor.py — Real-time quality and compliance monitoring pipeline.

Usage:
    GNANI_API_KEY=your-key python monitor.py --audio call.pcm --lang hi-IN
    GNANI_API_KEY=your-key python monitor.py --audio call.pcm --lang en-IN --rate 8000

Install:
    pip install gnani-vachana aiohttp python-dotenv
"""

import asyncio, json, os, random, argparse, aiohttp
from dataclasses import dataclass, field
from datetime   import datetime, timezone
from pathlib    import Path
from typing     import List, Dict, Optional, Set, AsyncIterator
from dotenv     import load_dotenv
from gnani.stt  import (
    GnaniSTTStreamClient,
    StreamConnectedEvent, StreamProcessingEvent,
    StreamTranscriptEvent, StreamErrorEvent,
    StreamConnectionError, StreamClosedError, StreamError,
)
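load_dotenv()

# ComplianceEngine and QualityEngine are the classes from the sections above:
# paste them into this file or import them from your own modules.
# AlertDispatcher is your own class; any object with an async
# send(session, segment, alerts) method works.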

FRAME_SIZE     = 1024
MAX_RECONNECTS = 5
BASE_BACKOFF_S = 1.0
MAX_BACKOFF_S  = 30.0
FRAME_MS_16K   = 0.032
FRAME_MS_8K    = 0.064


@dataclass
class TranscriptSegment:
    segment_index:     int
    text:              str
    audio_duration_ms: int
    latency_ms:        int
    timestamp:         datetime
    compliance_flags:  List[str] = field(default_factory=list)
    quality_flags:     List[str] = field(default_factory=list)

@dataclass
class CallSession:
    call_id:           str
    language_code:     str
    started_at:        datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    segments:          List[TranscriptSegment] = field(default_factory=list)
    last_segment_end:  Optional[datetime] = None
    reconnect_count:   int = 0
    processed_indices: Set[int] = field(default_factory=set)

    def add_segment(self, event) -> TranscriptSegment:
        seg = TranscriptSegment(
            segment_index=event.segment_index, text=event.text,
            audio_duration_ms=event.audio_duration_ms, latency_ms=event.latency,
            timestamp=datetime.now(timezone.utc),
        )
        self.segments.append(seg)
        self.processed_indices.add(event.segment_index)
        self.last_segment_end = datetime.now(timezone.utc)
        return seg


async def stream_audio_producer(stream, audio_source, stop_event: asyncio.Event):
    buffer = bytearray()
    async for chunk in audio_source:
        if stop_event.is_set(): break
        buffer.extend(chunk)
        while len(buffer) >= FRAME_SIZE:
            await stream.send_audio(bytes(buffer[:FRAME_SIZE]))
            buffer = buffer[FRAME_SIZE:]
    if buffer:
        # Pad the final partial frame with silence (zero bytes), not spaces
        await stream.send_audio(bytes(buffer) + b"\x00" * (FRAME_SIZE - len(buffer)))


async def handle_event(session, event, compliance_engine, quality_engine, alert_dispatcher):
    if isinstance(event, StreamConnectedEvent):
        print(f"[{session.call_id}] Connected  sample_rate={event.sample_rate}")

    elif isinstance(event, StreamProcessingEvent):
        quality_engine.on_processing(event.timestamp)

    elif isinstance(event, StreamTranscriptEvent):
        if event.segment_index in session.processed_indices:
            return  # deduplicate across reconnects
        seg = session.add_segment(event)
        print(f"[{session.call_id}][{seg.segment_index}] {seg.text}  (latency={seg.latency_ms}ms)")
        c_hits   = compliance_engine.check(seg)
        q_events = quality_engine.check(session, seg)
        seg.compliance_flags = [h["rule_id"]     for h in c_hits]
        seg.quality_flags    = [e["event_type"]  for e in q_events]
        if c_hits or q_events:
            asyncio.create_task(alert_dispatcher.send(session, seg, c_hits + q_events))

    elif isinstance(event, StreamErrorEvent):
        raise RuntimeError(f"STT error: {event.message}")


async def monitor_call(session, audio_source, compliance_engine, quality_engine, alert_dispatcher):
    attempt = 0
    while attempt <= MAX_RECONNECTS:
        try:
            async with GnaniSTTStreamClient(
                api_key=os.getenv("GNANI_API_KEY"),
                language_code=session.language_code,
                sample_rate=int(os.getenv("SAMPLE_RATE", "16000")),
            ) as stream:
                if attempt > 0: session.reconnect_count += 1
                attempt = 0
                stop_event = asyncio.Event()
                producer   = asyncio.create_task(stream_audio_producer(stream, audio_source, stop_event))
                async for event in stream:
                    await handle_event(session, event, compliance_engine, quality_engine, alert_dispatcher)
                stop_event.set()
                await producer
                return
        except StreamConnectionError: raise
        except (StreamClosedError, ConnectionResetError, OSError) as e:
            attempt += 1
            if attempt > MAX_RECONNECTS: raise
            backoff = min(BASE_BACKOFF_S * (2 ** attempt), MAX_BACKOFF_S)
            print(f"[{session.call_id}] Reconnect {attempt}/{MAX_RECONNECTS} in {backoff:.1f}s  {e}")
            await asyncio.sleep(backoff + random.uniform(0, backoff * 0.2))
        except StreamError as e:
            attempt += 1
            print(f"[{session.call_id}] Server error: {e.message}")
            await asyncio.sleep(BASE_BACKOFF_S)


async def file_audio_source(path: str, sample_rate=16000) -> AsyncIterator[bytes]:
    # Seconds of 16-bit mono audio per frame: 0.032 at 16kHz, 0.064 at 8kHz
    frame_interval = FRAME_SIZE / (2 * sample_rate)
    with open(path, "rb") as f:
        while chunk := f.read(FRAME_SIZE):
            yield chunk
            await asyncio.sleep(frame_interval)


async def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--audio", required=True)
    parser.add_argument("--lang",  default="hi-IN")
    parser.add_argument("--rate",  default=16000, type=int)
    args = parser.parse_args()
    os.environ["SAMPLE_RATE"] = str(args.rate)

    session = CallSession(call_id="CALL_001", language_code=args.lang)
    audio   = file_audio_source(args.audio, sample_rate=args.rate)
    await monitor_call(session, audio, ComplianceEngine(), QualityEngine(), AlertDispatcher())

    duration = (datetime.now(timezone.utc) - session.started_at).total_seconds()
    print(f"Duration: {duration:.1f}s  |  Segments: {len(session.segments)}")
    print(f"Compliance hits: {sum(len(s.compliance_flags) for s in session.segments)}")
    print(f"Quality events:  {sum(len(s.quality_flags)    for s in session.segments)}")

if __name__ == "__main__":
    asyncio.run(main())
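monitor.py calls AlertDispatcher() and awaits its send(session, segment, alerts) method, but the class itself is left to you. A minimal sketch under stated assumptions: the payload shape, the constructor parameters, and the injected post coroutine are all hypothetical. In production, post would typically wrap an aiohttp request or a queue publish; here it is injected so the class stays free of network code.

```python
import asyncio
import json
import os
from typing import Awaitable, Callable, Dict, List, Optional

PostFn = Callable[[str, Dict], Awaitable[None]]


class AlertDispatcher:
    """Hypothetical dispatcher exposing the async send() that monitor.py calls."""

    def __init__(self, webhook_url: Optional[str] = None, post: Optional[PostFn] = None):
        self.webhook_url = webhook_url or os.getenv("ALERT_WEBHOOK_URL")
        self._post = post  # async callable (url, payload); None means log only

    async def send(self, session, segment, alerts: List[Dict]) -> None:
        payload = {
            "call_id": session.call_id,
            "segment_index": segment.segment_index,
            "text": segment.text,
            "alerts": alerts,
        }
        if not (self.webhook_url and self._post):
            # No transport configured: fall back to printing the alert
            print(f"[ALERT] {json.dumps(payload, ensure_ascii=False)}")
            return
        try:
            await self._post(self.webhook_url, payload)
        except Exception as exc:  # a failing webhook must not take down the call monitor
            print(f"[{session.call_id}] alert delivery failed: {exc!r}")
```

With no arguments, AlertDispatcher() only prints alerts, which is enough to try monitor.py against a recorded file before wiring up a real alert target.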

What to Build Next

  • Speaker Diarization — Separate agent and customer voices. Attribute compliance hits to the correct speaker.
  • Sentiment Analysis — Feed each segment’s text to a sentiment model. Track the sentiment arc across the call.
  • Agent Assist — On each transcript event, call an LLM with the running conversation context to surface next-best-action suggestions in real time.
  • LLM Summarisation — At call end, send the full session transcript to an LLM for structured output: issue, resolution, action items, disposition.
  • Compliance Scoring — Build a per-call compliance score (0–100) based on rule severity, frequency, and placement in the call.
Related docs: WebSocket STT API · Batch STT for post-call analysis · SDK install: pip install gnani-vachana