B.Tech-Project-III/thirdeye/docs/Voice_milestones.md
2026-04-05 00:43:23 +05:30

ThirdEye — Voice Message Intelligence Milestones (20→22)

Prerequisite: Milestones 0–19 must be COMPLETE and PASSING. This feature layers on top of the existing working system. Same rule: Do NOT skip milestones. Do NOT skip tests. Every test must PASS before moving to the next milestone.


WHAT THIS ADDS

Telegram groups are full of voice notes. Nobody processes them. Every other "chat intelligence" product is text-only. ThirdEye fixes this:

  1. A Groq Whisper transcription client — uses the whisper-large-v3 model already in your provider stack. Zero new API keys. Zero new cost.
  2. A Telegram voice handler — detects voice messages (and round video notes) in any monitored group, downloads the audio, transcribes it, and runs the transcript through the exact same signal extraction pipeline as text messages.
  3. Voice attribution — every signal extracted from a voice note carries full provenance: who said it, when, duration, and the raw transcript. When /ask answers a question using a voice-sourced signal, it cites it: "Based on what @Raj said in a voice note on March 14th...". Plus a /voicelog command to list every voice note decision your team has ever made.

The integration is seamless. Voice transcripts become first-class signals in your existing knowledge graph alongside chat, documents, links, and Meet recordings. The pipeline does not care about the source — it just sees text.

The demo line: "Most teams make their most important decisions in voice notes. Until now, those decisions were invisible to every tool on the market. ThirdEye is the first to change that."


PRE-WORK: Dependencies & Config Updates

Step 0.1 — No new pip packages needed

Groq Whisper uses the same GROQ_API_KEY already in your .env and the same httpx already installed. The Telegram bot already downloads files via python-telegram-bot. There is literally nothing new to install.

Verify httpx is present (it will be):

python -c "import httpx; print('httpx OK:', httpx.__version__)"

Step 0.2 — Add new env vars

Append to thirdeye/.env:

# Voice Message Intelligence (Milestone 20)
ENABLE_VOICE_TRANSCRIPTION=true
VOICE_MAX_DURATION_SECONDS=300        # Skip voice notes longer than 5 minutes (too long for free tier)
VOICE_MIN_DURATION_SECONDS=2          # Skip accidental sub-2-second recordings
VOICE_LANGUAGE=                       # Optional: force a language code e.g. "hi", "en". Leave empty for auto-detect.
VOICE_STORE_TRANSCRIPT=true           # Store the raw transcript text in ChromaDB for full-text search

Step 0.3 — Update config.py

Add these lines at the bottom of thirdeye/backend/config.py:

# Voice Message Intelligence
ENABLE_VOICE_TRANSCRIPTION = os.getenv("ENABLE_VOICE_TRANSCRIPTION", "true").lower() == "true"
VOICE_MAX_DURATION_SECONDS = int(os.getenv("VOICE_MAX_DURATION_SECONDS", "300"))
VOICE_MIN_DURATION_SECONDS = int(os.getenv("VOICE_MIN_DURATION_SECONDS", "2"))
VOICE_LANGUAGE = os.getenv("VOICE_LANGUAGE", "")          # empty string = Whisper auto-detects
VOICE_STORE_TRANSCRIPT = os.getenv("VOICE_STORE_TRANSCRIPT", "true").lower() == "true"
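Note that the `.lower() == "true"` pattern is strict: any value other than the literal string "true" (case-insensitive) disables the flag, so `ENABLE_VOICE_TRANSCRIPTION=1` or `=yes` would silently turn voice off. A quick standalone sketch of the parsing rule used above (the `env_flag` helper name is illustrative, not part of config.py):

```python
import os

def env_flag(name: str, default: str = "true") -> bool:
    # Same rule as config.py: only the literal "true" (any case) enables the flag.
    return os.getenv(name, default).lower() == "true"

os.environ["DEMO_FLAG"] = "True"
assert env_flag("DEMO_FLAG") is True      # case-insensitive match

os.environ["DEMO_FLAG"] = "1"
assert env_flag("DEMO_FLAG") is False     # "1" does NOT count as true

os.environ.pop("DEMO_FLAG")
assert env_flag("DEMO_FLAG") is True      # unset falls back to the default
```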

MILESTONE 20: Groq Whisper Transcription Client (150%)

Goal: A focused, robust async function that takes raw audio bytes (any format Telegram sends — OGG/Opus for voice, MP4 for video notes) and returns a clean transcript string. Uses Groq's free whisper-large-v3 endpoint. Handles rate limits, empty audio, and network failures gracefully. No new API keys. No new dependencies.

Step 20.1 — Create the transcription module

Create file: thirdeye/backend/agents/voice_transcriber.py

"""
Voice Transcriber — Groq Whisper integration.

Uses Groq's whisper-large-v3 model (free, already in provider stack) to transcribe
audio bytes from Telegram voice messages and video notes into plain text.

Groq Whisper endpoint: https://api.groq.com/openai/v1/audio/transcriptions
Supported formats: flac, mp3, mp4, mpeg, mpga, m4a, ogg, opus, wav, webm
Telegram voice messages: OGG/Opus
Telegram video notes:    MP4

Free tier limits: 7,200 seconds of audio / hour on Groq free plan.
At avg 30s per voice note: ~240 voice notes / hour — more than any team sends.
"""
import io
import logging
from typing import Optional

import httpx

from backend.config import (
    GROQ_API_KEY,
    VOICE_LANGUAGE,
    VOICE_MAX_DURATION_SECONDS,
    VOICE_MIN_DURATION_SECONDS,
)

logger = logging.getLogger("thirdeye.agents.voice_transcriber")

GROQ_WHISPER_URL = "https://api.groq.com/openai/v1/audio/transcriptions"
WHISPER_MODEL = "whisper-large-v3"

# Groq file size limit for Whisper: 25 MB
GROQ_MAX_FILE_BYTES = 25 * 1024 * 1024


# --- Main transcription function ---------------------------------------------

async def transcribe_audio(
    audio_bytes: bytes,
    filename: str = "audio.ogg",
    duration_seconds: Optional[int] = None,
) -> dict:
    """
    Transcribe audio bytes using Groq Whisper.

    Args:
        audio_bytes:       Raw audio data (OGG, MP4, WAV, etc.)
        filename:          Filename hint for the API (determines format detection)
        duration_seconds:  Voice message duration from Telegram metadata (for pre-filtering)

    Returns:
        {
            "ok": True,
            "transcript": "The full transcribed text...",
            "language": "en",
            "duration": 45,
            "word_count": 120,
        }
        OR on failure:
        {
            "ok": False,
            "reason": "too_long" | "too_short" | "empty" | "file_too_large" | "api_error" | "no_speech",
            "error": "optional error string",
        }
    """
    # Pre-flight checks
    if not GROQ_API_KEY or len(GROQ_API_KEY) < 5:
        return {"ok": False, "reason": "api_error", "error": "GROQ_API_KEY not set"}

    if not audio_bytes:
        return {"ok": False, "reason": "empty", "error": "No audio bytes received"}

    if len(audio_bytes) > GROQ_MAX_FILE_BYTES:
        return {
            "ok": False,
            "reason": "file_too_large",
            "error": f"Audio is {len(audio_bytes) / 1024 / 1024:.1f}MB — Groq limit is 25MB",
        }

    if duration_seconds is not None:
        if duration_seconds < VOICE_MIN_DURATION_SECONDS:
            return {
                "ok": False,
                "reason": "too_short",
                "error": f"Voice note is {duration_seconds}s — minimum is {VOICE_MIN_DURATION_SECONDS}s",
            }
        if duration_seconds > VOICE_MAX_DURATION_SECONDS:
            return {
                "ok": False,
                "reason": "too_long",
                "error": f"Voice note is {duration_seconds}s — maximum is {VOICE_MAX_DURATION_SECONDS}s",
            }

    # Determine MIME type from filename extension
    ext_to_mime = {
        ".ogg": "audio/ogg",
        ".opus": "audio/ogg",
        ".mp3": "audio/mpeg",
        ".mp4": "video/mp4",
        ".m4a": "audio/mp4",
        ".wav": "audio/wav",
        ".flac": "audio/flac",
        ".webm": "audio/webm",
    }
    ext = ("." + filename.rsplit(".", 1)[-1].lower()) if "." in filename else ".ogg"
    mime_type = ext_to_mime.get(ext, "audio/ogg")

    form_data = {
        "model": WHISPER_MODEL,
        "response_format": "verbose_json",   # returns language detection
        "temperature": "0",                  # deterministic transcription
    }
    if VOICE_LANGUAGE:
        form_data["language"] = VOICE_LANGUAGE

    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            resp = await client.post(
                GROQ_WHISPER_URL,
                headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
                files={"file": (filename, io.BytesIO(audio_bytes), mime_type)},
                data=form_data,
            )
            resp.raise_for_status()
            data = resp.json()

    except httpx.HTTPStatusError as e:
        error_text = ""
        try:
            error_text = e.response.json().get("error", {}).get("message", e.response.text[:200])
        except Exception:
            error_text = e.response.text[:200]

        if e.response.status_code == 429:
            logger.warning("Groq Whisper rate limited")
            return {"ok": False, "reason": "api_error", "error": "Rate limited — try again shortly"}
        logger.error(f"Groq Whisper HTTP error {e.response.status_code}: {error_text}")
        return {"ok": False, "reason": "api_error", "error": f"HTTP {e.response.status_code}: {error_text}"}

    except httpx.TimeoutException:
        logger.warning("Groq Whisper request timed out")
        return {"ok": False, "reason": "api_error", "error": "Request timed out after 60s"}

    except Exception as e:
        logger.error(f"Groq Whisper unexpected error: {e}")
        return {"ok": False, "reason": "api_error", "error": str(e)}

    # Parse response
    transcript = (data.get("text") or "").strip()

    if not transcript:
        return {"ok": False, "reason": "no_speech", "error": "Whisper returned empty transcript"}

    # Detect if Whisper only returned noise markers
    noise_patterns = {"[music]", "[noise]", "[silence]", "[inaudible]", "(music)", "(noise)"}
    if transcript.lower() in noise_patterns:
        return {"ok": False, "reason": "no_speech", "error": f"Only noise detected: {transcript}"}

    detected_language = data.get("language", VOICE_LANGUAGE or "unknown")
    word_count = len(transcript.split())

    logger.info(
        f"Whisper transcribed {duration_seconds or '?'}s audio -> "
        f"{word_count} words [{detected_language}]: {transcript[:60]}..."
    )

    return {
        "ok": True,
        "transcript": transcript,
        "language": detected_language,
        "duration": duration_seconds,
        "word_count": word_count,
    }


# --- Telegram-specific download helper ---------------------------------------

async def download_telegram_audio(bot, file_id: str) -> bytes:
    """
    Download a Telegram file (voice or video_note) and return raw bytes.
    """
    tg_file = await bot.get_file(file_id)
    audio_bytes = await tg_file.download_as_bytearray()
    return bytes(audio_bytes)


def format_duration(seconds: Optional[int]) -> str:
    """Format seconds into human-readable string: '1m 34s' or '45s'."""
    if seconds is None:
        return "?"
    if seconds >= 60:
        return f"{seconds // 60}m {seconds % 60}s"
    return f"{seconds}s"
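
The extension-to-MIME detection can be exercised in isolation. Here is a minimal sketch that mirrors the lookup logic inside transcribe_audio (the `detect_mime` helper name is illustrative, not part of the module):

```python
# Mirrors the extension -> MIME lookup inside transcribe_audio.
EXT_TO_MIME = {
    ".ogg": "audio/ogg", ".opus": "audio/ogg",
    ".mp3": "audio/mpeg", ".mp4": "video/mp4",
    ".m4a": "audio/mp4", ".wav": "audio/wav",
    ".flac": "audio/flac", ".webm": "audio/webm",
}

def detect_mime(filename: str) -> str:
    # No extension (or an unknown one) falls back to Telegram's default, OGG.
    ext = ("." + filename.rsplit(".", 1)[-1].lower()) if "." in filename else ".ogg"
    return EXT_TO_MIME.get(ext, "audio/ogg")

assert detect_mime("voice.OGG") == "audio/ogg"   # case-insensitive
assert detect_mime("note.mp4") == "video/mp4"    # Telegram video notes
assert detect_mime("mystery") == "audio/ogg"     # no extension -> default
```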

TEST MILESTONE 20

Create file: thirdeye/scripts/test_m20.py

"""
Test Milestone 20: Groq Whisper transcription client.

Note: Full transcription tests require real audio bytes.
We test pre-flight filters and API reachability here.
Silent/near-silent audio will return "no_speech" — that is correct behaviour.
To test with real speech: record a short voice note and save as
thirdeye/scripts/test_voice.ogg before running this test.
"""
import asyncio
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))


def _make_minimal_ogg() -> bytes:
    """
    Generate a minimal valid OGG container header (silent).
    Whisper will return no_speech for this — that IS the correct result.
    We use it to confirm the API is reachable and credentials work.
    """
    ogg_magic = b"OggS"
    header = b"\x00\x02" + b"\x00" * 8 + b"\x00\x00\x00\x01" + b"\x00\x00\x00\x00" + b"\x00\x00\x00\x00" + b"\x01\x1e"
    vorbis_id = b"\x01vorbis" + b"\x00" * 23
    return ogg_magic + header + vorbis_id


async def test_config_loaded():
    """Test that GROQ_API_KEY is present (needed for Whisper)."""
    from backend.config import GROQ_API_KEY, ENABLE_VOICE_TRANSCRIPTION

    print("Testing voice transcription config...")
    assert GROQ_API_KEY and len(GROQ_API_KEY) > 5, (
        "GROQ_API_KEY is missing. Groq Whisper uses the same key as your LLM providers."
    )
    print(f"  ✅ GROQ_API_KEY present ({len(GROQ_API_KEY)} chars)")
    print(f"  ✅ ENABLE_VOICE_TRANSCRIPTION: {ENABLE_VOICE_TRANSCRIPTION}")


async def test_pre_flight_filters():
    """Test that duration and size filters work before hitting the API."""
    from backend.agents.voice_transcriber import transcribe_audio

    print("\nTesting pre-flight filters (no API calls made)...")

    result = await transcribe_audio(b"", filename="audio.ogg")
    assert not result["ok"] and result["reason"] == "empty"
    print("  ✅ Empty bytes -> reason='empty'")

    result = await transcribe_audio(b"fake", filename="audio.ogg", duration_seconds=1)
    assert not result["ok"] and result["reason"] == "too_short"
    print("  ✅ 1s audio -> reason='too_short' (min is 2s)")

    result = await transcribe_audio(b"fake", filename="audio.ogg", duration_seconds=9999)
    assert not result["ok"] and result["reason"] == "too_long"
    print("  ✅ 9999s audio -> reason='too_long' (max is 300s)")

    big_bytes = b"x" * (26 * 1024 * 1024)
    result = await transcribe_audio(big_bytes, filename="audio.ogg", duration_seconds=30)
    assert not result["ok"] and result["reason"] == "file_too_large"
    print("  ✅ 26MB audio -> reason='file_too_large' (Groq limit is 25MB)")


async def test_api_reachable():
    """
    Test that Groq Whisper API is reachable and authenticates correctly.
    A 401 means your GROQ_API_KEY is wrong.
    """
    from backend.agents.voice_transcriber import transcribe_audio

    print("\nTesting Groq Whisper API reachability...")
    minimal_ogg = _make_minimal_ogg()
    result = await transcribe_audio(minimal_ogg, filename="test.ogg", duration_seconds=5)

    if result["ok"]:
        print(f"  ✅ API reachable — transcript: '{result['transcript'][:60]}'")
    elif result["reason"] == "no_speech":
        print("  ✅ API reachable — silent audio correctly returned no_speech")
    elif result["reason"] == "api_error" and "401" in result.get("error", ""):
        raise AssertionError(
            f"Authentication failed — check GROQ_API_KEY in .env\nError: {result['error']}"
        )
    else:
        print(f"  ⚠️ API returned: reason={result['reason']}, error={result.get('error')} (non-fatal)")


async def test_real_audio_file():
    """
    Test with a real OGG voice file if one exists at scripts/test_voice.ogg.
    OPTIONAL — skip if file not present.
    """
    from backend.agents.voice_transcriber import transcribe_audio

    test_file = os.path.join(os.path.dirname(__file__), "test_voice.ogg")
    if not os.path.exists(test_file):
        print("\n  ⏭️  Skipping real audio test — place a voice note OGG at scripts/test_voice.ogg to enable")
        return

    print(f"\nTesting with real audio file: {test_file}")
    with open(test_file, "rb") as f:
        audio_bytes = f.read()

    result = await transcribe_audio(audio_bytes, filename="test_voice.ogg", duration_seconds=30)
    assert result["ok"], f"Real audio transcription failed: {result}"
    assert len(result["transcript"]) > 5
    print(f"  ✅ Transcript ({result['word_count']} words): {result['transcript'][:120]}...")
    print(f"     Language detected: {result['language']}")


async def test_format_duration():
    """Test the duration formatting helper."""
    from backend.agents.voice_transcriber import format_duration

    print("\nTesting format_duration()...")
    assert format_duration(45) == "45s"
    assert format_duration(90) == "1m 30s"
    assert format_duration(0) == "0s"
    assert format_duration(None) == "?"
    print("  ✅ 45 -> '45s', 90 -> '1m 30s', None -> '?'")


async def main():
    print("Running Milestone 20 tests...\n")
    await test_config_loaded()
    await test_pre_flight_filters()
    await test_api_reachable()
    await test_real_audio_file()
    await test_format_duration()
    print("\n🎉 MILESTONE 20 PASSED — Groq Whisper client working")


if __name__ == "__main__":
    asyncio.run(main())

Run: cd thirdeye && python scripts/test_m20.py

Expected output:

  ✅ GROQ_API_KEY present (56 chars)
  ✅ ENABLE_VOICE_TRANSCRIPTION: True
  ✅ Empty bytes -> reason='empty'
  ✅ 1s audio -> reason='too_short'
  ✅ 9999s audio -> reason='too_long'
  ✅ 26MB audio -> reason='file_too_large'
  ✅ API reachable — silent audio correctly returned no_speech
  ⏭️  Skipping real audio test — place a voice note OGG at scripts/test_voice.ogg to enable
  ✅ 45 -> '45s', 90 -> '1m 30s', None -> '?'

🎉 MILESTONE 20 PASSED — Groq Whisper client working

MILESTONE 21: Telegram Voice Handler + Pipeline Integration (155%)

Goal: A new Telegram message handler that fires whenever a voice message or round video note is sent to any monitored group. It downloads the audio, transcribes it, and feeds the transcript directly into the existing process_message_batch pipeline — with full voice metadata attached so every extracted signal knows it came from a voice note. The group gets a lightweight acknowledgement. Nothing else changes in the pipeline.

Step 21.1 — Create the voice handler orchestrator

Create file: thirdeye/backend/agents/voice_handler.py

"""
Voice Handler
Orchestrates the full pipeline for Telegram voice messages and video notes:

  Telegram voice/video_note message
    -> download audio bytes
    -> transcribe via Groq Whisper (voice_transcriber.py)
    -> build a voice_transcript signal (stored raw for full-text search)
    -> run transcript through process_message_batch (signal extraction)
    -> all extracted signals carry voice attribution metadata

Voice metadata attached to every extracted signal:
  source:           "voice"
  voice_file_id:    Telegram file ID
  voice_duration:   seconds
  speaker:          sender display name
"""
import logging
import uuid
from datetime import datetime, timezone

from backend.agents.voice_transcriber import (
    transcribe_audio, download_telegram_audio, format_duration
)
from backend.config import ENABLE_VOICE_TRANSCRIPTION, VOICE_STORE_TRANSCRIPT
from backend.db.chroma import store_signals
from backend.pipeline import process_message_batch

logger = logging.getLogger("thirdeye.agents.voice_handler")


# --- Voice transcript signal builder -----------------------------------------

def build_voice_transcript_signal(
    transcript: str,
    sender: str,
    group_id: str,
    voice_file_id: str,
    duration_seconds: int,
    language: str,
    timestamp: str,
) -> dict:
    """
    Build a voice_transcript signal that stores the full raw transcription.
    Always stored alongside extracted signals so the full transcript is
    searchable in ChromaDB even if no structured signals were extracted.
    """
    return {
        "id": str(uuid.uuid4()),
        "type": "voice_transcript",
        "summary": f"[Voice {format_duration(duration_seconds)}] @{sender}: {transcript[:200]}",
        "raw_quote": transcript,
        "severity": "low",
        "status": "transcribed",
        "sentiment": "neutral",
        "urgency": "none",
        "entities": [f"@{sender}"],
        "keywords": _extract_voice_keywords(transcript),
        "timestamp": timestamp,
        "group_id": group_id,
        "lens": "voice",
        "source": "voice",
        "voice_file_id": voice_file_id,
        "voice_duration": duration_seconds,
        "voice_language": language,
        "speaker": sender,
    }


def _extract_voice_keywords(text: str) -> list[str]:
    """Simple keyword extraction from transcript text."""
    stopwords = {
        "the", "a", "an", "is", "are", "was", "were", "will", "to", "of",
        "in", "on", "at", "for", "by", "with", "this", "that", "and", "or",
        "but", "we", "i", "it", "be", "do", "have", "has", "had", "not",
        "so", "just", "like", "yeah", "okay", "um", "uh", "you", "me",
    }
    words = [w.strip(".,!?;:\"'") for w in text.lower().split()]
    keywords = [w for w in words if len(w) > 3 and w not in stopwords]
    return list(dict.fromkeys(keywords))[:12]


def _inject_voice_metadata(signals: list[dict], voice_meta: dict) -> list[dict]:
    """
    Inject voice attribution into every signal extracted from a voice transcript.
    This ensures /ask can cite the voice source in its answers.
    """
    for signal in signals:
        signal["source"] = "voice"
        signal["voice_file_id"] = voice_meta.get("voice_file_id", "")
        signal["voice_duration"] = voice_meta.get("duration_seconds", 0)
        signal["voice_language"] = voice_meta.get("language", "")
        signal["speaker"] = voice_meta.get("sender", "Unknown")
        if "[Voice]" not in signal.get("summary", ""):
            signal["summary"] = f"[Voice @{voice_meta.get('sender', '?')}] {signal['summary']}"
    return signals


# --- Main handler ------------------------------------------------------------

async def handle_voice_message(
    bot,
    group_id: str,
    sender: str,
    file_id: str,
    duration_seconds: int,
    message_date,
    is_video_note: bool = False,
) -> dict:
    """
    Full pipeline for a single voice or video note message.

    Returns:
        {"ok": True, "transcript": "...", "signals_extracted": 3, "duration": 45, ...}
        OR {"ok": False, "reason": "...", "error": "..."}
    """
    if not ENABLE_VOICE_TRANSCRIPTION:
        return {"ok": False, "reason": "disabled", "error": "Voice transcription is disabled"}

    msg_type = "video note" if is_video_note else "voice message"
    logger.info(f"Processing {msg_type} from {sender} in {group_id} ({duration_seconds}s)")

    # 1. Download audio
    try:
        audio_bytes = await download_telegram_audio(bot, file_id)
    except Exception as e:
        logger.error(f"Failed to download audio from {sender}: {e}")
        return {"ok": False, "reason": "download_failed", "error": str(e)}

    # 2. Transcribe
    filename = "audio.mp4" if is_video_note else "audio.ogg"
    transcription = await transcribe_audio(
        audio_bytes,
        filename=filename,
        duration_seconds=duration_seconds,
    )

    if not transcription["ok"]:
        logger.info(f"Transcription skipped for {sender}: {transcription['reason']}")
        return {"ok": False, "reason": transcription["reason"], "error": transcription.get("error", "")}

    transcript = transcription["transcript"]
    language = transcription.get("language", "unknown")
    timestamp = (
        message_date.replace(tzinfo=timezone.utc).isoformat()
        if message_date else datetime.now(timezone.utc).isoformat()
    )

    # 3. Store raw voice transcript signal
    if VOICE_STORE_TRANSCRIPT:
        transcript_signal = build_voice_transcript_signal(
            transcript=transcript,
            sender=sender,
            group_id=group_id,
            voice_file_id=file_id,
            duration_seconds=duration_seconds,
            language=language,
            timestamp=timestamp,
        )
        store_signals(group_id, [transcript_signal])
        logger.info(f"Voice transcript stored for {sender} ({len(transcript)} chars)")

    # 4. Run through signal extraction pipeline — treat as a regular text message
    voice_meta = {
        "sender": sender,
        "voice_file_id": file_id,
        "duration_seconds": duration_seconds,
        "language": language,
    }

    messages = [{
        "sender": sender,
        "text": transcript,
        "timestamp": timestamp,
        "source": "voice",
        "voice_file_id": file_id,
        "voice_duration": duration_seconds,
    }]

    try:
        extracted_signals = await process_message_batch(group_id, messages)
        extracted_signals = _inject_voice_metadata(extracted_signals, voice_meta)
        signals_count = len(extracted_signals)
    except Exception as e:
        logger.error(f"Signal extraction failed for voice from {sender}: {e}")
        signals_count = 0

    logger.info(
        f"Voice pipeline complete: {sender}, {duration_seconds}s, "
        f"{signals_count} signals, transcript={len(transcript)} chars"
    )

    return {
        "ok": True,
        "transcript": transcript,
        "signals_extracted": signals_count,
        "duration": duration_seconds,
        "sender": f"@{sender}",
        "language": language,
    }
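
A small aside on _extract_voice_keywords above: `dict.fromkeys` deduplicates while preserving first-seen order (a plain `set` would scramble it, which matters when the first keywords carry the most context). A standalone sketch of that behaviour:

```python
def dedup_preserve_order(items: list[str], limit: int = 12) -> list[str]:
    # dict keys are unique and (since Python 3.7) insertion-ordered,
    # so this drops duplicates while keeping first-seen order.
    return list(dict.fromkeys(items))[:limit]

words = ["schema", "postgres", "schema", "thursday", "postgres", "deadline"]
assert dedup_preserve_order(words) == ["schema", "postgres", "thursday", "deadline"]
assert dedup_preserve_order(words, limit=2) == ["schema", "postgres"]
```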

Step 21.2 — Add voice and video_note handlers to commands.py

Add to thirdeye/backend/bot/commands.py:

# -----------------------------------------------------------------
#  Voice Message Handlers — add to commands.py
# -----------------------------------------------------------------

async def handle_voice_telegram(update, context):
    """
    Fires for every voice message sent to a monitored group.
    Downloads, transcribes via Groq Whisper, feeds into signal pipeline.
    """
    from backend.agents.voice_handler import handle_voice_message
    from backend.config import ENABLE_VOICE_TRANSCRIPTION

    if not ENABLE_VOICE_TRANSCRIPTION:
        return

    msg = update.message
    if not msg or not msg.voice:
        return

    group_id = str(msg.chat_id)
    sender = (msg.from_user.full_name or msg.from_user.username or "Unknown")
    voice = msg.voice
    duration = voice.duration or 0

    # React with 👂 immediately so team knows ThirdEye is processing
    try:
        await msg.set_reaction("👂")
    except Exception:
        pass  # Reactions need bot admin rights — fail silently

    result = await handle_voice_message(
        bot=context.bot,
        group_id=group_id,
        sender=sender,
        file_id=voice.file_id,
        duration_seconds=duration,
        message_date=msg.date,
        is_video_note=False,
    )

    if result["ok"]:
        signals = result["signals_extracted"]
        reply = (
            f"🎤 *{sender}* ({duration}s) — transcribed\n"
            f"_{result['transcript'][:120]}{'...' if len(result['transcript']) > 120 else ''}_\n"
            f"`{signals} signal{'s' if signals != 1 else ''} extracted`"
        )
        await msg.reply_text(reply, parse_mode="Markdown")
    elif result["reason"] == "too_long":
        await msg.reply_text(
            f"⏭️ Voice note from *{sender}* skipped — too long ({duration}s).",
            parse_mode="Markdown",
        )
    elif result["reason"] in ("no_speech", "too_short", "empty"):
        pass  # Silent skip
    else:
        logger.warning(f"Voice error for {sender}: {result.get('error')}")


async def handle_video_note_telegram(update, context):
    """
    Fires for round video messages (video notes).
    Same pipeline as voice messages — also contain audio.
    """
    from backend.agents.voice_handler import handle_voice_message
    from backend.config import ENABLE_VOICE_TRANSCRIPTION

    if not ENABLE_VOICE_TRANSCRIPTION:
        return

    msg = update.message
    if not msg or not msg.video_note:
        return

    group_id = str(msg.chat_id)
    sender = (msg.from_user.full_name or msg.from_user.username or "Unknown")
    video_note = msg.video_note
    duration = video_note.duration or 0

    try:
        await msg.set_reaction("👂")
    except Exception:
        pass

    result = await handle_voice_message(
        bot=context.bot,
        group_id=group_id,
        sender=sender,
        file_id=video_note.file_id,
        duration_seconds=duration,
        message_date=msg.date,
        is_video_note=True,
    )

    if result["ok"]:
        signals = result["signals_extracted"]
        reply = (
            f"📹 *{sender}* ({duration}s video note) — transcribed\n"
            f"_{result['transcript'][:120]}{'...' if len(result['transcript']) > 120 else ''}_\n"
            f"`{signals} signal{'s' if signals != 1 else ''} extracted`"
        )
        await msg.reply_text(reply, parse_mode="Markdown")

Step 21.3 — Register handlers in bot.py

In thirdeye/backend/bot/bot.py, add these two MessageHandler registrations after your existing text handler:

from telegram.ext import MessageHandler, filters
from backend.bot.commands import handle_voice_telegram, handle_video_note_telegram

application.add_handler(
    MessageHandler(filters.VOICE & ~filters.COMMAND, handle_voice_telegram)
)
application.add_handler(
    MessageHandler(filters.VIDEO_NOTE & ~filters.COMMAND, handle_video_note_telegram)
)

TEST MILESTONE 21

Create file: thirdeye/scripts/test_m21.py

"""
Test Milestone 21: Voice handler pipeline integration.
Uses synthetic transcript text to avoid needing real audio in CI.
"""
import asyncio
import os
import sys
import uuid
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

TRANSCRIPT_ARCHITECTURE = """
So I just wanted to quickly explain the architecture decision. We've been going
back and forth on the database and I think we should just go with PostgreSQL.
The main reason is Raj already knows it and we have less than two weeks to ship.
Final decision — PostgreSQL. Raj can you start the schema by Thursday?
"""

TRANSCRIPT_BLOCKER = """
The thing I wanted to flag is the design specs are still not done. I've been
waiting for two weeks and I literally cannot start the dashboard without them.
This is a hard blocker. If I don't get the specs by Wednesday we'll miss Friday.
"""


async def test_voice_transcript_signal_builder():
    """Test that the voice transcript signal is correctly structured."""
    from backend.agents.voice_handler import build_voice_transcript_signal

    print("Testing voice transcript signal builder...")
    signal = build_voice_transcript_signal(
        transcript=TRANSCRIPT_ARCHITECTURE.strip(),
        sender="Raj",
        group_id="test_voice_m21",
        voice_file_id="fake_file_id_123",
        duration_seconds=45,
        language="en",
        timestamp="2026-03-21T10:00:00Z",
    )

    assert signal["type"] == "voice_transcript"
    assert signal["source"] == "voice"
    assert signal["speaker"] == "Raj"
    assert "@Raj" in signal["entities"]
    assert signal["voice_duration"] == 45
    assert signal["voice_language"] == "en"
    assert len(signal["raw_quote"]) > 50   # full transcript stored
    assert len(signal["keywords"]) > 0
    print(f"  ✅ type: {signal['type']}, source: {signal['source']}, speaker: {signal['speaker']}")
    print(f"  ✅ keywords: {signal['keywords'][:5]}")
    print(f"  ✅ summary: {signal['summary'][:100]}")


async def test_voice_metadata_injection():
    """Test that voice metadata is injected into extracted signals."""
    from backend.agents.voice_handler import _inject_voice_metadata

    print("\nTesting voice metadata injection...")
    raw_signals = [
        {"id": "1", "type": "architecture_decision", "summary": "Use PostgreSQL", "severity": "medium"},
        {"id": "2", "type": "action_item", "summary": "Raj to set up schema by Thursday", "severity": "medium"},
    ]
    voice_meta = {"sender": "Raj", "voice_file_id": "file_abc123", "duration_seconds": 45, "language": "en"}

    enriched = _inject_voice_metadata(raw_signals, voice_meta)
    for sig in enriched:
        assert sig["source"] == "voice"
        assert sig["speaker"] == "Raj"
        assert sig["voice_file_id"] == "file_abc123"
        assert "[Voice @Raj]" in sig["summary"]
        print(f"  ✅ [{sig['type']}] -> {sig['summary'][:80]}")


async def test_full_pipeline_with_transcript():
    """
    Full pipeline test: inject synthetic transcript -> signal extraction -> ChromaDB.
    Bypasses the Whisper API entirely.
    """
    from backend.pipeline import process_message_batch, query_knowledge, set_lens
    from backend.agents.voice_handler import build_voice_transcript_signal, _inject_voice_metadata
    from backend.db.chroma import store_signals
    import chromadb
    from backend.config import CHROMA_DB_PATH

    print("\nTesting full pipeline with synthetic transcript...")
    group_id = "test_voice_m21_pipeline"
    set_lens(group_id, "dev")

    sender = "Raj"
    timestamp = "2026-03-21T10:00:00Z"
    voice_meta = {"sender": sender, "voice_file_id": "test_file_id", "duration_seconds": 45, "language": "en"}

    # Store raw transcript
    transcript_signal = build_voice_transcript_signal(
        transcript=TRANSCRIPT_ARCHITECTURE.strip(),
        sender=sender, group_id=group_id,
        voice_file_id="test_file_id", duration_seconds=45,
        language="en", timestamp=timestamp,
    )
    store_signals(group_id, [transcript_signal])
    print(f"  ✅ Raw voice transcript stored in ChromaDB")

    # Run through signal extraction
    messages = [{"sender": sender, "text": TRANSCRIPT_ARCHITECTURE.strip(), "timestamp": timestamp}]
    extracted = await process_message_batch(group_id, messages)
    enriched = _inject_voice_metadata(extracted, voice_meta)
    print(f"  ✅ {len(enriched)} signal(s) extracted from transcript")

    # Verify voice attribution
    for sig in enriched:
        assert sig.get("source") == "voice"
        assert "[Voice @Raj]" in sig.get("summary", "")
    print(f"  ✅ Voice attribution on all extracted signals")

    # Query knowledge base
    answer = await query_knowledge(group_id, "What database did we decide on?")
    assert any(w in answer.lower() for w in ["postgres", "database", "sql"])
    print(f"  ✅ Knowledge base query answered: {answer[:100]}...")

    # Cleanup
    client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
    try:
        client.delete_collection(f"ll_{group_id}")
    except Exception:
        pass


async def test_handler_functions_importable():
    """Test that the Telegram handler functions import correctly."""
    print("\nTesting handler function imports...")
    from backend.bot.commands import handle_voice_telegram, handle_video_note_telegram
    print("  ✅ handle_voice_telegram importable")
    print("  ✅ handle_video_note_telegram importable")


async def main():
    print("Running Milestone 21 tests...\n")
    await test_voice_transcript_signal_builder()
    await test_voice_metadata_injection()
    await test_full_pipeline_with_transcript()
    await test_handler_functions_importable()
    print("\n🎉 MILESTONE 21 PASSED — Voice handler integrated into signal pipeline")


asyncio.run(main())

Run: cd thirdeye && python scripts/test_m21.py

Expected output:

  ✅ type: voice_transcript, source: voice, speaker: Raj
  ✅ keywords: ['postgres', 'database', 'schema', 'thursday', 'decision']
  ✅ [architecture_decision] -> [Voice @Raj] Use PostgreSQL...
  ✅ [action_item] -> [Voice @Raj] Raj to set up schema by Thursday
  ✅ 3 signal(s) extracted from transcript
  ✅ Voice attribution on all extracted signals
  ✅ Knowledge base query answered: Based on what @Raj said...
  ✅ handle_voice_telegram importable
  ✅ handle_video_note_telegram importable

🎉 MILESTONE 21 PASSED — Voice handler integrated into signal pipeline

MILESTONE 22: Voice Attribution in /ask + /voicelog Command (160%)

Goal: Two things complete the voice intelligence loop. First, update the Query Agent so that when it answers using a voice-sourced signal, the answer cites it explicitly: "Based on what @Raj said in a voice note on March 14th (45s)...". Second, add a /voicelog command — a searchable audit trail of everything your team has ever said aloud in a voice note, filterable by speaker, signal type, or keyword.

Step 22.1 — Update the Query Agent for voice citation

In thirdeye/backend/agents/query_agent.py, add this helper function and wire it into your context-building logic:

# Add to backend/agents/query_agent.py

def _format_signal_for_context(signal: dict) -> str:
    """
    Format a ChromaDB signal as a context snippet for the Query Agent LLM.
    Voice-sourced signals get explicit attribution so the LLM cites them correctly.
    """
    source = signal.get("source", "chat")
    sig_type = signal.get("type", "unknown")
    summary = signal.get("summary", "")
    timestamp = signal.get("timestamp", "")

    date_str = ""
    if timestamp:
        try:
            from datetime import datetime
            dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
            date_str = dt.strftime("%b %d")
        except Exception:
            date_str = timestamp[:10]

    if source == "voice":
        speaker = signal.get("speaker", "Unknown")
        duration = signal.get("voice_duration", 0)
        duration_str = f"{duration}s" if duration else "?"
        return (
            f"[VOICE NOTE — @{speaker} on {date_str} ({duration_str})] "
            f"[{sig_type}] {summary}"
        )

    if source == "document":
        return f"[DOCUMENT — {date_str}] [{sig_type}] {summary}"

    if source == "link":
        return f"[WEB LINK — {date_str}] [{sig_type}] {summary}"

    if sig_type in ("meet_decision", "meet_action_item", "meet_blocker", "meet_summary"):
        meeting_id = signal.get("meeting_id", "")
        return f"[MEETING {meeting_id} — {date_str}] [{sig_type}] {summary}"

    entities = signal.get("entities", [])
    sender_str = entities[0] if entities else ""
    return f"[CHAT — {sender_str} on {date_str}] [{sig_type}] {summary}"
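
As a quick sanity check before wiring it in, the voice branch can be exercised standalone. The snippet below copies just that branch into a throwaway function (illustrative only; project code should import `_format_signal_for_context` from `query_agent.py`):

```python
from datetime import datetime

def format_voice_snippet(signal: dict) -> str:
    """Standalone copy of the voice branch above, for illustration only."""
    dt = datetime.fromisoformat(signal["timestamp"].replace("Z", "+00:00"))
    duration = signal.get("voice_duration", 0)
    duration_str = f"{duration}s" if duration else "?"
    return (
        f"[VOICE NOTE — @{signal.get('speaker', 'Unknown')} on {dt.strftime('%b %d')} "
        f"({duration_str})] [{signal['type']}] {signal['summary']}"
    )

sig = {
    "type": "architecture_decision",
    "summary": "Team decided to use PostgreSQL",
    "speaker": "Raj",
    "voice_duration": 45,
    "timestamp": "2026-03-14T10:00:00Z",
}
print(format_voice_snippet(sig))
# → [VOICE NOTE — @Raj on Mar 14 (45s)] [architecture_decision] Team decided to use PostgreSQL
```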

Then add this line to your existing QUERY_SYSTEM_PROMPT string (or concatenate it):

# Add to your QUERY_SYSTEM_PROMPT in query_agent.py
VOICE_CITATION_INSTRUCTION = """
When context includes [VOICE NOTE — @name on Date (Xs)] signals, ALWAYS cite the voice note explicitly.
Example: "Based on what @Raj said in a voice note on Mar 14 (45s), the team decided to use PostgreSQL."
Never flatten voice signals into generic "the team discussed" language. Always name the speaker and source.
"""
# Then in your query call: system_prompt = EXISTING_PROMPT + VOICE_CITATION_INSTRUCTION

Step 22.2 — Add the /voicelog command

Add to thirdeye/backend/bot/commands.py:

# -----------------------------------------------------------------
#  /voicelog command — add to commands.py
# -----------------------------------------------------------------

async def cmd_voicelog(update, context):
    """
    /voicelog [filter]
    Audit trail of all voice note decisions, actions, and blockers in this group.

    Usage:
      /voicelog                — all voice-sourced signals (last 20)
      /voicelog decisions      — only decisions from voice notes
      /voicelog actions        — only action items from voice notes
      /voicelog blockers       — only blockers from voice notes
      /voicelog @Raj           — only voice notes by Raj
      /voicelog search [query] — search voice note content
    """
    from backend.db.chroma import query_signals, get_all_signals
    from backend.agents.voice_transcriber import format_duration
    from datetime import datetime

    chat_id = str(update.effective_chat.id)
    args = context.args or []

    filter_type = None
    filter_speaker = None
    search_query = None

    if args:
        first = args[0].lower()
        if first == "decisions":
            filter_type = "architecture_decision"
        elif first == "actions":
            filter_type = "action_item"
        elif first == "blockers":
            filter_type = "blocker"
        elif first == "search" and len(args) > 1:
            search_query = " ".join(args[1:])
        elif first.startswith("@"):
            filter_speaker = first[1:]

    await update.message.reply_text("🎤 Searching voice notes...", parse_mode="Markdown")

    if search_query:
        all_signals = query_signals(chat_id, search_query, n_results=30)
    else:
        all_signals = get_all_signals(chat_id)

    # Filter to voice-sourced signals only
    voice_signals = [
        s for s in all_signals
        if s.get("source") == "voice"
        or s.get("type") == "voice_transcript"
        or "[Voice @" in s.get("summary", "")
    ]

    if filter_type:
        voice_signals = [s for s in voice_signals if s.get("type") == filter_type]
    if filter_speaker:
        voice_signals = [
            s for s in voice_signals
            if filter_speaker.lower() in s.get("speaker", "").lower()
            or filter_speaker.lower() in str(s.get("entities", [])).lower()
        ]

    # Prefer structured signals; fall back to raw transcripts if none
    structured = [s for s in voice_signals if s.get("type") != "voice_transcript"]
    display_signals = structured if structured else voice_signals

    # Sort by timestamp descending. fromisoformat("...+00:00") returns
    # timezone-aware datetimes, so the fallback floor must be aware too,
    # otherwise sorting raises "can't compare offset-naive and offset-aware".
    def _ts(s):
        try:
            return datetime.fromisoformat(s.get("timestamp", "").replace("Z", "+00:00"))
        except Exception:
            from datetime import timezone
            return datetime.min.replace(tzinfo=timezone.utc)

    display_signals.sort(key=_ts, reverse=True)
    display_signals = display_signals[:20]

    if not display_signals:
        await update.message.reply_text(
            "📭 No voice note signals found. Voice notes are transcribed automatically when sent here.",
            parse_mode="Markdown",
        )
        return

    type_emoji = {
        "architecture_decision": "🏗️",
        "tech_debt":             "⚠️",
        "action_item":           "📌",
        "blocker":               "🚧",
        "feature_request":       "💡",
        "promise":               "🤝",
        "risk":                  "🔴",
        "recurring_bug":         "🐛",
        "voice_transcript":      "🎤",
    }

    filter_label = ""
    if filter_type:
        filter_label = f" — {filter_type.replace('_', ' ').title()}"
    elif filter_speaker:
        filter_label = f" — @{filter_speaker}"
    elif search_query:
        filter_label = f" — '{search_query}'"

    lines = [f"🎤 *Voice Note Audit Trail*{filter_label}\n_{len(display_signals)} signal(s)_\n"]

    for sig in display_signals:
        ts = sig.get("timestamp", "")
        date_str = ""
        if ts:
            try:
                dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
                date_str = dt.strftime("%b %d")
            except Exception:
                date_str = ts[:10]

        speaker = sig.get("speaker", "")
        duration = sig.get("voice_duration", 0)
        duration_str = format_duration(duration) if duration else ""
        emoji = type_emoji.get(sig.get("type", ""), "🎤")

        summary = sig.get("summary", "")
        if summary.startswith("[Voice @"):
            summary = summary.split("] ", 1)[-1] if "] " in summary else summary

        meta = " · ".join(filter(None, [f"@{speaker}" if speaker else "", date_str, duration_str]))
        lines.append(f"{emoji} *{meta}*\n   _{summary[:100]}_\n")

    await update.message.reply_text("\n".join(lines), parse_mode="Markdown")

Step 22.3 — Register in bot.py

from backend.bot.commands import cmd_voicelog
application.add_handler(CommandHandler("voicelog", cmd_voicelog))

Update your /start message:

"🎤 /voicelog — Audit trail of all voice note decisions\n"
"🎤 /voicelog @name — Voice notes by a specific person\n"
"🎤 /voicelog search [q] — Search voice note content\n"

TEST MILESTONE 22

Create file: thirdeye/scripts/test_m22.py

"""
Test Milestone 22: Voice attribution in /ask + /voicelog.
"""
import asyncio
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

TRANSCRIPT_POSTGRES = "We decided to go with PostgreSQL. Final. Raj will set up the schema by Thursday."
TRANSCRIPT_BLOCKER  = "Dashboard is still blocked on design specs. Two weeks now. Hard blocker for the sprint."
TRANSCRIPT_BUG      = "Checkout timeout is happening again. Critical. Someone needs to investigate today."


async def _seed_voice_signals(group_id: str):
    """Seed a group with voice-sourced signals for testing."""
    from backend.pipeline import process_message_batch, set_lens
    from backend.agents.voice_handler import build_voice_transcript_signal, _inject_voice_metadata
    from backend.db.chroma import store_signals

    set_lens(group_id, "dev")
    sessions = [
        ("Raj",  TRANSCRIPT_POSTGRES, "f1", 22, "2026-03-14T10:00:00Z"),
        ("Alex", TRANSCRIPT_BLOCKER,  "f2", 18, "2026-03-17T11:00:00Z"),
        ("Sam",  TRANSCRIPT_BUG,      "f3", 15, "2026-03-19T09:00:00Z"),
    ]
    for sender, transcript, file_id, duration, timestamp in sessions:
        ts_signal = build_voice_transcript_signal(
            transcript=transcript, sender=sender, group_id=group_id,
            voice_file_id=file_id, duration_seconds=duration,
            language="en", timestamp=timestamp,
        )
        store_signals(group_id, [ts_signal])
        messages = [{"sender": sender, "text": transcript, "timestamp": timestamp}]
        extracted = await process_message_batch(group_id, messages)
        voice_meta = {"sender": sender, "voice_file_id": file_id, "duration_seconds": duration, "language": "en"}
        _inject_voice_metadata(extracted, voice_meta)


async def test_signal_formatter():
    """Test that voice signals format with attribution prefix."""
    from backend.agents.query_agent import _format_signal_for_context

    print("Testing signal formatter with voice attribution...")

    voice_signal = {
        "type": "architecture_decision",
        "summary": "Team decided to use PostgreSQL",
        "source": "voice",
        "speaker": "Raj",
        "voice_duration": 45,
        "timestamp": "2026-03-14T10:00:00Z",
        "entities": ["@Raj"],
    }
    formatted = _format_signal_for_context(voice_signal)
    assert "[VOICE NOTE" in formatted, "Expected [VOICE NOTE] prefix"
    assert "@Raj" in formatted
    assert "Mar 14" in formatted
    assert "45s" in formatted
    print(f"  ✅ Voice: {formatted[:120]}")

    chat_signal = {"type": "tech_debt", "summary": "JWT hardcoded", "source": "chat", "timestamp": "2026-03-15T09:00:00Z", "entities": ["@Alex"]}
    assert "[CHAT" in _format_signal_for_context(chat_signal)
    print(f"  ✅ Chat signal formatted correctly")

    doc_signal = {"type": "document_knowledge", "summary": "OAuth required", "source": "document", "timestamp": "2026-03-16T09:00:00Z", "entities": []}
    assert "[DOCUMENT" in _format_signal_for_context(doc_signal)
    print(f"  ✅ Document signal formatted correctly")


async def test_voice_query_attribution():
    """Test that /ask returns voice attribution in its answer."""
    from backend.pipeline import query_knowledge
    import chromadb
    from backend.config import CHROMA_DB_PATH

    print("\nTesting /ask returns voice attribution...")
    group_id = "test_voice_m22_ask"
    await _seed_voice_signals(group_id)

    answer = await query_knowledge(group_id, "What database did we decide to use?")
    assert len(answer) > 10
    relevant = any(w in answer.lower() for w in ["postgres", "raj", "voice", "database"])
    assert relevant, f"Answer did not surface voice-sourced decision. Got: {answer[:200]}"
    print(f"  ✅ Answer surfaces voice decision: {answer[:150]}...")

    has_citation = any(phrase in answer.lower() for phrase in ["voice note", "@raj", "raj said", "mar 14"])
    if has_citation:
        print(f"  ✅ Explicit voice attribution present in answer")
    else:
        print(f"  ⚠️  Answer correct but attribution phrasing varies by provider (acceptable)")

    # Cleanup
    client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
    try:
        client.delete_collection(f"ll_{group_id}")
    except Exception:
        pass


async def test_voicelog_filtering():
    """Test voicelog retrieval and speaker filtering."""
    from backend.db.chroma import get_all_signals
    import chromadb
    from backend.config import CHROMA_DB_PATH

    print("\nTesting voicelog signal retrieval and filtering...")
    group_id = "test_voice_m22_log"
    await _seed_voice_signals(group_id)

    all_signals = get_all_signals(group_id)
    voice_signals = [
        s for s in all_signals
        if s.get("source") == "voice"
        or s.get("type") == "voice_transcript"
        or "[Voice @" in s.get("summary", "")
    ]
    assert len(voice_signals) > 0, "Expected voice-sourced signals"
    print(f"  ✅ Found {len(voice_signals)} voice-sourced signal(s)")

    raj_signals = [
        s for s in voice_signals
        if "raj" in s.get("speaker", "").lower() or "raj" in str(s.get("entities", [])).lower()
    ]
    assert len(raj_signals) > 0, "Expected signals from Raj"
    print(f"  ✅ Found {len(raj_signals)} signal(s) from @Raj")

    structured = [s for s in voice_signals if s.get("type") != "voice_transcript"]
    print(f"  ✅ {len(structured)} structured, {len(voice_signals) - len(structured)} raw transcripts")

    # Cleanup
    client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
    try:
        client.delete_collection(f"ll_{group_id}")
    except Exception:
        pass


async def test_voicelog_command_importable():
    """Test that cmd_voicelog imports without errors."""
    print("\nTesting cmd_voicelog import...")
    from backend.bot.commands import cmd_voicelog
    print("  ✅ cmd_voicelog importable")


async def test_mixed_source_query():
    """Test that /ask uses voice + chat signals together."""
    from backend.pipeline import process_message_batch, query_knowledge, set_lens
    from backend.agents.voice_handler import build_voice_transcript_signal, _inject_voice_metadata
    from backend.db.chroma import store_signals
    import chromadb
    from backend.config import CHROMA_DB_PATH

    print("\nTesting mixed-source query (voice + chat)...")
    group_id = "test_voice_m22_mixed"
    set_lens(group_id, "dev")

    # Chat signal: Redis
    await process_message_batch(group_id, [
        {"sender": "Alex", "text": "I think we should use Redis for the cache.", "timestamp": "2026-03-10T09:00:00Z"}
    ])

    # Voice signal (more recent): overrides to PostgreSQL
    transcript = "Just to be clear — we're going with PostgreSQL for everything. Redis is off the table."
    ts_signal = build_voice_transcript_signal(
        transcript=transcript, sender="Raj", group_id=group_id,
        voice_file_id="f_override", duration_seconds=20, language="en",
        timestamp="2026-03-21T10:00:00Z",
    )
    store_signals(group_id, [ts_signal])
    extracted = await process_message_batch(group_id, [
        {"sender": "Raj", "text": transcript, "timestamp": "2026-03-21T10:00:00Z"}
    ])
    _inject_voice_metadata(extracted, {"sender": "Raj", "voice_file_id": "f_override", "duration_seconds": 20, "language": "en"})

    answer = await query_knowledge(group_id, "What did we decide about caching?")
    assert any(w in answer.lower() for w in ["postgres", "redis", "cache"])
    print(f"  ✅ Mixed-source query answered: {answer[:120]}...")

    # Cleanup
    client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
    try:
        client.delete_collection(f"ll_{group_id}")
    except Exception:
        pass


async def main():
    print("Running Milestone 22 tests...\n")
    await test_signal_formatter()
    await test_voice_query_attribution()
    await test_voicelog_filtering()
    await test_voicelog_command_importable()
    await test_mixed_source_query()
    print("\n🎉 MILESTONE 22 PASSED — Voice attribution in /ask, /voicelog working")


asyncio.run(main())

Run: cd thirdeye && python scripts/test_m22.py

Expected output:

  ✅ Voice: [VOICE NOTE — @Raj on Mar 14 (45s)] [architecture_decision] Team decided...
  ✅ Chat signal formatted correctly
  ✅ Document signal formatted correctly
  ✅ Answer surfaces voice decision: Based on what @Raj said in a voice note on Mar 14...
  ✅ Explicit voice attribution present in answer
  ✅ Found 9 voice-sourced signal(s)
  ✅ Found 3 signal(s) from @Raj
  ✅ cmd_voicelog importable
  ✅ Mixed-source query answered: The team discussed caching...

🎉 MILESTONE 22 PASSED — Voice attribution in /ask, /voicelog working

MILESTONE SUMMARY (Updated)

#     Milestone                       What You Have                                                %
0–10  Core System                     Full ThirdEye pipeline, Telegram bot, dashboard              0–100%
11    Document Ingestion              PDFs/DOCX/TXT → chunked → RAG                                105%
12    Tavily Web Search               Query agent searches web on fallback                         110%
13    Link Fetch & Ingestion          URLs → fetched → stored as signals                           115%
14    Meet Chrome Extension           Browser captures Meet audio → POSTs chunks                   120%
15    Meet Signal Processing          Transcript → decisions/actions/blockers → ChromaDB           125%
16    Meet Telegram Commands          /meetsum, /meetask, /meetmatch                               130%
17    Jira API Client                 Async Jira REST wrapper                                      135%
18    Jira Signal Agent               LLM converts signals → well-formed tickets                   140%
19    Jira Telegram Commands          /jira, /jirastatus, /jirasearch, /jiraraised, /jirawatch    145%
20    Groq Whisper Client             Audio bytes → transcript. Zero new keys. Zero new cost.      150%
21    Voice Handler + Pipeline        voice/video_note → transcribe → extract signals → ChromaDB   155%
22    Voice Attribution + /voicelog   /ask cites voice notes. /voicelog audits all voice decisions.  160%

FILE CHANGE SUMMARY

New Files Created

thirdeye/backend/agents/voice_transcriber.py   # Milestone 20 — Groq Whisper client
thirdeye/backend/agents/voice_handler.py       # Milestone 21 — pipeline orchestrator
thirdeye/scripts/test_m20.py                   # Milestone 20 test
thirdeye/scripts/test_m21.py                   # Milestone 21 test
thirdeye/scripts/test_m22.py                   # Milestone 22 test

Existing Files Modified

thirdeye/.env                                  # Pre-work: 5 new VOICE_* vars
thirdeye/backend/config.py                     # Pre-work: voice config vars
thirdeye/backend/agents/query_agent.py         # M22: _format_signal_for_context() + citation prompt
thirdeye/backend/bot/commands.py               # M21: handle_voice_telegram, handle_video_note_telegram
                                                # M22: cmd_voicelog
thirdeye/backend/bot/bot.py                    # M21: VOICE + VIDEO_NOTE MessageHandlers
                                                # M22: /voicelog CommandHandler

Updated Repo Structure (additions only)

thirdeye/
├── backend/
│   ├── agents/
│   │   ├── voice_transcriber.py    # NEW — Groq Whisper API client
│   │   ├── voice_handler.py        # NEW — pipeline orchestrator
│   │   └── query_agent.py          # MODIFIED — voice-aware context formatting + citation instruction
│   │
│   └── bot/
│       ├── commands.py             # MODIFIED — voice handlers + cmd_voicelog
│       └── bot.py                  # MODIFIED — VOICE + VIDEO_NOTE handlers, /voicelog
│
└── scripts/
    ├── test_m20.py
    ├── test_m21.py
    └── test_m22.py

UPDATED COMMANDS REFERENCE

NEW — Voice Intelligence:
/voicelog               — Audit trail of all voice note signals (last 20)
/voicelog @name         — Voice notes by a specific team member
/voicelog decisions     — Only decisions extracted from voice notes
/voicelog actions       — Only action items from voice notes
/voicelog blockers      — Only blockers from voice notes
/voicelog search [q]    — Search voice note content by keyword

ENHANCED — existing commands now voice-aware:
/ask [q]    — Now cites voice notes: "Based on what @Raj said in a voice note on Mar 14 (45s)..."

PASSIVE — no command needed:
• Voice messages   → 👂 react → download OGG → Groq Whisper → transcript → signal extraction
• Video notes      → 👂 react → download MP4 → Groq Whisper → transcript → signal extraction

HOW THE FULL VOICE FLOW WORKS (End-to-End)

1. @Raj sends a 45s voice note:
   "So we're going with PostgreSQL. Final decision.
    Raj will set up the schema by Thursday."

2. ThirdEye reacts with 👂 immediately.

3. Bot downloads OGG audio bytes from Telegram CDN.

4. POST to Groq Whisper (whisper-large-v3, same GROQ_API_KEY):
   https://api.groq.com/openai/v1/audio/transcriptions
   ~1 second for a 45s clip. Free.

5. Whisper returns:
   "So we're going with PostgreSQL. Final decision.
    Raj will set up the schema by Thursday."

6. voice_transcript signal stored in ChromaDB:
   type="voice_transcript", source="voice", speaker="Raj",
   voice_duration=45, raw_quote="So we're going with..."

7. Transcript runs through process_message_batch() like any text message.
   Signal extraction finds:
   [Voice @Raj] architecture_decision: Use PostgreSQL  [MEDIUM]
   [Voice @Raj] action_item: Raj to set up schema by Thursday  [MEDIUM]

8. Bot replies:
   🎤 Raj (45s) — transcribed
   "So we're going with PostgreSQL. Final decision..."
   `2 signals extracted`

9. Later: /ask What database are we using?
   "Based on what @Raj said in a voice note on Mar 21 (45s),
    the team decided to use PostgreSQL. Raj is also setting
    up the schema by Thursday."

10. /voicelog shows:
    🏗️ @Raj · Mar 21 · 45s
       Use PostgreSQL for database
    📌 @Raj · Mar 21 · 45s
       Set up schema by Thursday
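
Step 4 above is a single multipart POST. A minimal sketch of that request, assuming Groq's OpenAI-compatible transcription endpoint (the real client is `voice_transcriber.py` from Milestone 20; its function name, timeout, and error handling may differ):

```python
import os

async def transcribe_voice(audio_bytes: bytes, filename: str = "voice.ogg") -> str:
    """POST audio to Groq's Whisper endpoint and return the transcript text."""
    import httpx  # already a project dependency

    async with httpx.AsyncClient(timeout=60) as client:
        resp = await client.post(
            "https://api.groq.com/openai/v1/audio/transcriptions",
            headers={"Authorization": f"Bearer {os.environ['GROQ_API_KEY']}"},
            files={"file": (filename, audio_bytes)},   # OGG/MP4 bytes from Telegram
            data={"model": "whisper-large-v3"},
        )
        resp.raise_for_status()
        return resp.json()["text"]
```

Because the endpoint is OpenAI-compatible, the same multipart shape works unchanged if the provider is ever swapped.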

THE NOVELTY ARGUMENT

Every "team intelligence" tool on the market — Slack AI, Prism, Runbear, Notion AI — processes text. In practice, especially in Indian startup culture and in informal teams everywhere, the most important decisions travel as voice notes. The CTO explaining the architecture in a 2-minute voice note. The PM clarifying the client scope during a commute. The lead engineer flagging a risk off the cuff.

None of that was ever captured. Until now.

ThirdEye is the first team intelligence system to treat voice notes as first-class signals with full attribution, vector search, and automatic Jira ticket raising. The knowledge gap — the one that matters most — is closed.


Every milestone has a test. Every test must pass. No skipping.