# ThirdEye — Voice Message Intelligence Milestones (20→22)

> **Prerequisite: Milestones 0–19 must be COMPLETE and PASSING. This feature layers on top of the existing working system.**
> **Same rule: Do NOT skip milestones. Do NOT skip tests. Every test must PASS before moving to the next milestone.**

---

## WHAT THIS ADDS

Telegram groups are full of voice notes. Nobody processes them. Every other "chat intelligence" product is text-only. ThirdEye fixes this:

1. A **Groq Whisper transcription client** — uses the `whisper-large-v3` model already in your provider stack. Zero new API keys. Zero new cost.
2. A **Telegram voice handler** — detects voice messages (and round video notes) in any monitored group, downloads the audio, transcribes it, and runs the transcript through the exact same signal extraction pipeline as text messages.
3. **Voice attribution** — every signal extracted from a voice note carries full provenance: who said it, when, duration, and the raw transcript. When `/ask` answers a question using a voice-sourced signal, it cites it: *"Based on what @Raj said in a voice note on March 14th..."*. Plus a `/voicelog` command to list every voice note decision your team has ever made.

**The integration is seamless.** Voice transcripts become first-class signals in your existing knowledge graph alongside chat, documents, links, and Meet recordings. The pipeline does not care about the source — it just sees text.

**The demo line:** *"Most teams make their most important decisions in voice notes. Until now, those decisions were invisible to every tool on the market. ThirdEye is the first to change that."*

---

## PRE-WORK: Dependencies & Config Updates

### Step 0.1 — No new pip packages needed

Groq Whisper uses the same `GROQ_API_KEY` already in your `.env` and the same `httpx` already installed. The Telegram bot already downloads files via `python-telegram-bot`. There is literally nothing new to install.
Verify httpx is present (it will be):

```bash
python -c "import httpx; print('httpx OK:', httpx.__version__)"
```

### Step 0.2 — Add new env vars

Append to `thirdeye/.env`:

```bash
# Voice Message Intelligence (Milestone 20)
ENABLE_VOICE_TRANSCRIPTION=true
VOICE_MAX_DURATION_SECONDS=300  # Skip voice notes longer than 5 minutes (too long for free tier)
VOICE_MIN_DURATION_SECONDS=2    # Skip accidental sub-2-second recordings
VOICE_LANGUAGE=                 # Optional: force a language code e.g. "hi", "en". Leave empty for auto-detect.
VOICE_STORE_TRANSCRIPT=true     # Store the raw transcript text in ChromaDB for full-text search
```

### Step 0.3 — Update config.py

Add these lines at the bottom of `thirdeye/backend/config.py`:

```python
# Voice Message Intelligence
ENABLE_VOICE_TRANSCRIPTION = os.getenv("ENABLE_VOICE_TRANSCRIPTION", "true").lower() == "true"
VOICE_MAX_DURATION_SECONDS = int(os.getenv("VOICE_MAX_DURATION_SECONDS", "300"))
VOICE_MIN_DURATION_SECONDS = int(os.getenv("VOICE_MIN_DURATION_SECONDS", "2"))
VOICE_LANGUAGE = os.getenv("VOICE_LANGUAGE", "")  # empty string = Whisper auto-detects
VOICE_STORE_TRANSCRIPT = os.getenv("VOICE_STORE_TRANSCRIPT", "true").lower() == "true"
```

---

## MILESTONE 20: Groq Whisper Transcription Client (150%)

**Goal:** A focused, robust async function that takes raw audio bytes (any format Telegram sends — OGG/Opus for voice, MP4 for video notes) and returns a clean transcript string. Uses Groq's free `whisper-large-v3` endpoint. Handles rate limits, empty audio, and network failures gracefully. No new API keys. No new dependencies.

### Step 20.1 — Create the transcription module

Create file: `thirdeye/backend/agents/voice_transcriber.py`

```python
"""
Voice Transcriber — Groq Whisper integration.

Uses Groq's whisper-large-v3 model (free, already in provider stack)
to transcribe audio bytes from Telegram voice messages and video notes
into plain text.

Groq Whisper endpoint: https://api.groq.com/openai/v1/audio/transcriptions
Supported formats: flac, mp3, mp4, mpeg, mpga, m4a, ogg, opus, wav, webm
Telegram voice messages: OGG/Opus
Telegram video notes: MP4

Free tier limits: 7,200 seconds of audio / hour on Groq free plan.
At avg 30s per voice note: ~240 voice notes / hour — more than any team sends.
"""
import io
import logging
from typing import Optional

import httpx

from backend.config import (
    GROQ_API_KEY,
    VOICE_LANGUAGE,
    VOICE_MAX_DURATION_SECONDS,
    VOICE_MIN_DURATION_SECONDS,
)

logger = logging.getLogger("thirdeye.agents.voice_transcriber")

GROQ_WHISPER_URL = "https://api.groq.com/openai/v1/audio/transcriptions"
WHISPER_MODEL = "whisper-large-v3"

# Groq file size limit for Whisper: 25 MB
GROQ_MAX_FILE_BYTES = 25 * 1024 * 1024


# --- Main transcription function ---------------------------------------------

async def transcribe_audio(
    audio_bytes: bytes,
    filename: str = "audio.ogg",
    duration_seconds: Optional[int] = None,
) -> dict:
    """
    Transcribe audio bytes using Groq Whisper.

    Args:
        audio_bytes: Raw audio data (OGG, MP4, WAV, etc.)
        filename: Filename hint for the API (determines format detection)
        duration_seconds: Voice message duration from Telegram metadata (for pre-filtering)

    Returns:
        {
            "ok": True,
            "transcript": "The full transcribed text...",
            "language": "en",
            "duration": 45,
            "word_count": 120,
        }
        OR on failure:
        {
            "ok": False,
            "reason": "too_long" | "too_short" | "empty" | "file_too_large" | "api_error" | "no_speech",
            "error": "optional error string",
        }
    """
    # Pre-flight checks
    if not GROQ_API_KEY or len(GROQ_API_KEY) < 5:
        return {"ok": False, "reason": "api_error", "error": "GROQ_API_KEY not set"}

    if not audio_bytes:
        return {"ok": False, "reason": "empty", "error": "No audio bytes received"}

    if len(audio_bytes) > GROQ_MAX_FILE_BYTES:
        return {
            "ok": False,
            "reason": "file_too_large",
            "error": f"Audio is {len(audio_bytes) / 1024 / 1024:.1f}MB — Groq limit is 25MB",
        }

    if duration_seconds is not None:
        if duration_seconds < VOICE_MIN_DURATION_SECONDS:
            return {
                "ok": False,
                "reason": "too_short",
                "error": f"Voice note is {duration_seconds}s — minimum is {VOICE_MIN_DURATION_SECONDS}s",
            }
        if duration_seconds > VOICE_MAX_DURATION_SECONDS:
            return {
                "ok": False,
                "reason": "too_long",
                "error": f"Voice note is {duration_seconds}s — maximum is {VOICE_MAX_DURATION_SECONDS}s",
            }

    # Determine MIME type from filename extension
    ext_to_mime = {
        ".ogg": "audio/ogg",
        ".opus": "audio/ogg",
        ".mp3": "audio/mpeg",
        ".mp4": "video/mp4",
        ".m4a": "audio/mp4",
        ".wav": "audio/wav",
        ".flac": "audio/flac",
        ".webm": "audio/webm",
    }
    ext = "." + filename.rsplit(".", 1)[-1].lower() if "." in filename else ".ogg"
    mime_type = ext_to_mime.get(ext, "audio/ogg")

    form_data = {
        "model": WHISPER_MODEL,
        "response_format": "verbose_json",  # returns language detection
        "temperature": "0",  # deterministic transcription
    }
    if VOICE_LANGUAGE:
        form_data["language"] = VOICE_LANGUAGE

    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            resp = await client.post(
                GROQ_WHISPER_URL,
                headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
                files={"file": (filename, io.BytesIO(audio_bytes), mime_type)},
                data=form_data,
            )
            resp.raise_for_status()
            data = resp.json()
    except httpx.HTTPStatusError as e:
        error_text = ""
        try:
            error_text = e.response.json().get("error", {}).get("message", e.response.text[:200])
        except Exception:
            error_text = e.response.text[:200]
        if e.response.status_code == 429:
            logger.warning("Groq Whisper rate limited")
            return {"ok": False, "reason": "api_error", "error": "Rate limited — try again shortly"}
        logger.error(f"Groq Whisper HTTP error {e.response.status_code}: {error_text}")
        return {"ok": False, "reason": "api_error", "error": f"HTTP {e.response.status_code}: {error_text}"}
    except httpx.TimeoutException:
        logger.warning("Groq Whisper request timed out")
        return {"ok": False, "reason": "api_error", "error": "Request timed out after 60s"}
    except Exception as e:
        logger.error(f"Groq Whisper unexpected error: {e}")
        return {"ok": False, "reason": "api_error", "error": str(e)}

    # Parse response
    transcript = (data.get("text") or "").strip()
    if not transcript:
        return {"ok": False, "reason": "no_speech", "error": "Whisper returned empty transcript"}

    # Detect if Whisper only returned noise markers
    noise_patterns = {"[music]", "[noise]", "[silence]", "[inaudible]", "(music)", "(noise)"}
    if transcript.lower() in noise_patterns:
        return {"ok": False, "reason": "no_speech", "error": f"Only noise detected: {transcript}"}

    detected_language = data.get("language", VOICE_LANGUAGE or "unknown")
    word_count = len(transcript.split())

    logger.info(
        f"Whisper transcribed {duration_seconds or '?'}s audio -> "
        f"{word_count} words [{detected_language}]: {transcript[:60]}..."
    )

    return {
        "ok": True,
        "transcript": transcript,
        "language": detected_language,
        "duration": duration_seconds,
        "word_count": word_count,
    }


# --- Telegram-specific download helper ---------------------------------------

async def download_telegram_audio(bot, file_id: str) -> bytes:
    """Download a Telegram file (voice or video_note) and return raw bytes."""
    tg_file = await bot.get_file(file_id)
    audio_bytes = await tg_file.download_as_bytearray()
    return bytes(audio_bytes)


def format_duration(seconds: Optional[int]) -> str:
    """Format seconds into a human-readable string: '1m 34s' or '45s'."""
    if seconds is None:
        return "?"
    if seconds >= 60:
        return f"{seconds // 60}m {seconds % 60}s"
    return f"{seconds}s"
```

### ✅ TEST MILESTONE 20

Create file: `thirdeye/scripts/test_m20.py`

```python
"""
Test Milestone 20: Groq Whisper transcription client.

Note: Full transcription tests require real audio bytes. We test
pre-flight filters and API reachability here. Silent/near-silent audio
will return "no_speech" — that is correct behaviour.

To test with real speech: record a short voice note and save as
thirdeye/scripts/test_voice.ogg before running this test.
"""
import asyncio
import os
import sys

sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))


def _make_minimal_ogg() -> bytes:
    """
    Generate a minimal OGG-like header (silent, not real audio).
    Whisper will return no_speech for this — that IS the correct result.
    We use it to confirm the API is reachable and credentials work.
    """
    ogg_magic = b"OggS"
    header = b"\x00\x02" + b"\x00" * 8 + b"\x00\x00\x00\x01" + b"\x00\x00\x00\x00" + b"\x00\x00\x00\x00" + b"\x01\x1e"
    vorbis_id = b"\x01vorbis" + b"\x00" * 23
    return ogg_magic + header + vorbis_id


async def test_config_loaded():
    """Test that GROQ_API_KEY is present (needed for Whisper)."""
    from backend.config import GROQ_API_KEY, ENABLE_VOICE_TRANSCRIPTION

    print("Testing voice transcription config...")
    assert GROQ_API_KEY and len(GROQ_API_KEY) > 5, (
        "GROQ_API_KEY is missing. Groq Whisper uses the same key as your LLM providers."
    )
    print(f"  ✅ GROQ_API_KEY present ({len(GROQ_API_KEY)} chars)")
    print(f"  ✅ ENABLE_VOICE_TRANSCRIPTION: {ENABLE_VOICE_TRANSCRIPTION}")


async def test_pre_flight_filters():
    """Test that duration and size filters work before hitting the API."""
    from backend.agents.voice_transcriber import transcribe_audio

    print("\nTesting pre-flight filters (no API calls made)...")

    result = await transcribe_audio(b"", filename="audio.ogg")
    assert not result["ok"] and result["reason"] == "empty"
    print("  ✅ Empty bytes -> reason='empty'")

    result = await transcribe_audio(b"fake", filename="audio.ogg", duration_seconds=1)
    assert not result["ok"] and result["reason"] == "too_short"
    print("  ✅ 1s audio -> reason='too_short' (min is 2s)")

    result = await transcribe_audio(b"fake", filename="audio.ogg", duration_seconds=9999)
    assert not result["ok"] and result["reason"] == "too_long"
    print("  ✅ 9999s audio -> reason='too_long' (max is 300s)")

    big_bytes = b"x" * (26 * 1024 * 1024)
    result = await transcribe_audio(big_bytes, filename="audio.ogg", duration_seconds=30)
    assert not result["ok"] and result["reason"] == "file_too_large"
    print("  ✅ 26MB audio -> reason='file_too_large' (Groq limit is 25MB)")


async def test_api_reachable():
    """
    Test that Groq Whisper API is reachable and authenticates correctly.
    A 401 means your GROQ_API_KEY is wrong.
    """
    from backend.agents.voice_transcriber import transcribe_audio

    print("\nTesting Groq Whisper API reachability...")
    minimal_ogg = _make_minimal_ogg()
    result = await transcribe_audio(minimal_ogg, filename="test.ogg", duration_seconds=5)

    if result["ok"]:
        print(f"  ✅ API reachable — transcript: '{result['transcript'][:60]}'")
    elif result["reason"] == "no_speech":
        print("  ✅ API reachable — silent audio correctly returned no_speech")
    elif result["reason"] == "api_error" and "401" in result.get("error", ""):
        raise AssertionError(
            f"Authentication failed — check GROQ_API_KEY in .env\nError: {result['error']}"
        )
    else:
        print(f"  ⚠️ API returned: reason={result['reason']}, error={result.get('error')} (non-fatal)")


async def test_real_audio_file():
    """
    Test with a real OGG voice file if one exists at scripts/test_voice.ogg.
    OPTIONAL — skip if file not present.
    """
    from backend.agents.voice_transcriber import transcribe_audio

    test_file = os.path.join(os.path.dirname(__file__), "test_voice.ogg")
    if not os.path.exists(test_file):
        print("\n  ⏭️ Skipping real audio test — place a voice note OGG at scripts/test_voice.ogg to enable")
        return

    print(f"\nTesting with real audio file: {test_file}")
    with open(test_file, "rb") as f:
        audio_bytes = f.read()

    result = await transcribe_audio(audio_bytes, filename="test_voice.ogg", duration_seconds=30)
    assert result["ok"], f"Real audio transcription failed: {result}"
    assert len(result["transcript"]) > 5
    print(f"  ✅ Transcript ({result['word_count']} words): {result['transcript'][:120]}...")
    print(f"     Language detected: {result['language']}")


async def test_format_duration():
    """Test the duration formatting helper."""
    from backend.agents.voice_transcriber import format_duration

    print("\nTesting format_duration()...")
    assert format_duration(45) == "45s"
    assert format_duration(90) == "1m 30s"
    assert format_duration(0) == "0s"
    assert format_duration(None) == "?"
    print("  ✅ 45 -> '45s', 90 -> '1m 30s', None -> '?'")


async def main():
    print("Running Milestone 20 tests...\n")
    await test_config_loaded()
    await test_pre_flight_filters()
    await test_api_reachable()
    await test_real_audio_file()
    await test_format_duration()
    print("\n🎉 MILESTONE 20 PASSED — Groq Whisper client working")


asyncio.run(main())
```

Run: `cd thirdeye && python scripts/test_m20.py`

**Expected output:**

```
  ✅ GROQ_API_KEY present (56 chars)
  ✅ ENABLE_VOICE_TRANSCRIPTION: True
  ✅ Empty bytes -> reason='empty'
  ✅ 1s audio -> reason='too_short' (min is 2s)
  ✅ 9999s audio -> reason='too_long' (max is 300s)
  ✅ 26MB audio -> reason='file_too_large' (Groq limit is 25MB)
  ✅ API reachable — silent audio correctly returned no_speech
  ⏭️ Skipping real audio test — place a voice note OGG at scripts/test_voice.ogg to enable
  ✅ 45 -> '45s', 90 -> '1m 30s', None -> '?'

🎉 MILESTONE 20 PASSED — Groq Whisper client working
```

---

## MILESTONE 21: Telegram Voice Handler + Pipeline Integration (155%)

**Goal:** A new Telegram message handler that fires whenever a voice message or round video note is sent to any monitored group. It downloads the audio, transcribes it, and feeds the transcript directly into the existing `process_message_batch` pipeline — with full voice metadata attached so every extracted signal knows it came from a voice note. The group gets a lightweight acknowledgement. Nothing else changes in the pipeline.
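Before building the real module, the metadata-attachment idea can be previewed as a pure function. This is an illustrative sketch only — `tag_with_voice_provenance` is a hypothetical name; the actual helper (`_inject_voice_metadata`) is built in Step 21.1:

```python
def tag_with_voice_provenance(signals: list[dict], speaker: str,
                              file_id: str, duration_s: int) -> list[dict]:
    """Attach voice provenance fields to each extracted signal (sketch only)."""
    for s in signals:
        s["source"] = "voice"
        s["speaker"] = speaker
        s["voice_file_id"] = file_id
        s["voice_duration"] = duration_s
        # Prefix the summary so voice origin is visible at a glance
        if not s.get("summary", "").startswith("[Voice"):
            s["summary"] = f"[Voice @{speaker}] {s['summary']}"
    return signals


demo = tag_with_voice_provenance(
    [{"summary": "Use PostgreSQL", "type": "architecture_decision"}],
    speaker="Raj", file_id="abc123", duration_s=45,
)
print(demo[0]["summary"])  # [Voice @Raj] Use PostgreSQL
```

Because the signals are plain dicts, the pipeline downstream never needs to branch on source type — the provenance rides along for free.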
### Step 21.1 — Create the voice handler orchestrator

Create file: `thirdeye/backend/agents/voice_handler.py`

```python
"""
Voice Handler

Orchestrates the full pipeline for Telegram voice messages and video notes:

    Telegram voice/video_note message
    -> download audio bytes
    -> transcribe via Groq Whisper (voice_transcriber.py)
    -> build a voice_transcript signal (stored raw for full-text search)
    -> run transcript through process_message_batch (signal extraction)
    -> all extracted signals carry voice attribution metadata

Voice metadata attached to every extracted signal:
    source: "voice"
    voice_file_id: Telegram file ID
    voice_duration: seconds
    speaker: sender display name
"""
import logging
import uuid
from datetime import datetime, timezone

from backend.agents.voice_transcriber import (
    transcribe_audio,
    download_telegram_audio,
    format_duration,
)
from backend.config import ENABLE_VOICE_TRANSCRIPTION, VOICE_STORE_TRANSCRIPT
from backend.db.chroma import store_signals
from backend.pipeline import process_message_batch

logger = logging.getLogger("thirdeye.agents.voice_handler")


# --- Voice transcript signal builder -----------------------------------------

def build_voice_transcript_signal(
    transcript: str,
    sender: str,
    group_id: str,
    voice_file_id: str,
    duration_seconds: int,
    language: str,
    timestamp: str,
) -> dict:
    """
    Build a voice_transcript signal that stores the full raw transcription.
    Always stored alongside extracted signals so the full transcript is
    searchable in ChromaDB even if no structured signals were extracted.
    """
    return {
        "id": str(uuid.uuid4()),
        "type": "voice_transcript",
        "summary": f"[Voice {format_duration(duration_seconds)}] @{sender}: {transcript[:200]}",
        "raw_quote": transcript,
        "severity": "low",
        "status": "transcribed",
        "sentiment": "neutral",
        "urgency": "none",
        "entities": [f"@{sender}"],
        "keywords": _extract_voice_keywords(transcript),
        "timestamp": timestamp,
        "group_id": group_id,
        "lens": "voice",
        "source": "voice",
        "voice_file_id": voice_file_id,
        "voice_duration": duration_seconds,
        "voice_language": language,
        "speaker": sender,
    }


def _extract_voice_keywords(text: str) -> list[str]:
    """Simple keyword extraction from transcript text."""
    stopwords = {
        "the", "a", "an", "is", "are", "was", "were", "will", "to", "of",
        "in", "on", "at", "for", "by", "with", "this", "that", "and", "or",
        "but", "we", "i", "it", "be", "do", "have", "has", "had", "not",
        "so", "just", "like", "yeah", "okay", "um", "uh", "you", "me",
    }
    # Strip punctuation before the stopword check so "yeah," and "decision."
    # are handled correctly
    words = [w.strip(".,!?;:\"'") for w in text.lower().split()]
    keywords = [w for w in words if len(w) > 3 and w not in stopwords]
    return list(dict.fromkeys(keywords))[:12]


def _inject_voice_metadata(signals: list[dict], voice_meta: dict) -> list[dict]:
    """
    Inject voice attribution into every signal extracted from a voice transcript.
    This ensures /ask can cite the voice source in its answers.
    """
    for signal in signals:
        signal["source"] = "voice"
        signal["voice_file_id"] = voice_meta.get("voice_file_id", "")
        signal["voice_duration"] = voice_meta.get("duration_seconds", 0)
        signal["voice_language"] = voice_meta.get("language", "")
        signal["speaker"] = voice_meta.get("sender", "Unknown")
        if "[Voice" not in signal.get("summary", ""):
            signal["summary"] = f"[Voice @{voice_meta.get('sender', '?')}] {signal['summary']}"
    return signals


# --- Main handler ------------------------------------------------------------

async def handle_voice_message(
    bot,
    group_id: str,
    sender: str,
    file_id: str,
    duration_seconds: int,
    message_date,
    is_video_note: bool = False,
) -> dict:
    """
    Full pipeline for a single voice or video note message.

    Returns:
        {"ok": True, "transcript": "...", "signals_extracted": 3, "duration": 45, ...}
        OR
        {"ok": False, "reason": "...", "error": "..."}
    """
    if not ENABLE_VOICE_TRANSCRIPTION:
        return {"ok": False, "reason": "disabled", "error": "Voice transcription is disabled"}

    msg_type = "video note" if is_video_note else "voice message"
    logger.info(f"Processing {msg_type} from {sender} in {group_id} ({duration_seconds}s)")

    # 1. Download audio
    try:
        audio_bytes = await download_telegram_audio(bot, file_id)
    except Exception as e:
        logger.error(f"Failed to download audio from {sender}: {e}")
        return {"ok": False, "reason": "download_failed", "error": str(e)}

    # 2. Transcribe
    filename = "audio.mp4" if is_video_note else "audio.ogg"
    transcription = await transcribe_audio(
        audio_bytes,
        filename=filename,
        duration_seconds=duration_seconds,
    )
    if not transcription["ok"]:
        logger.info(f"Transcription skipped for {sender}: {transcription['reason']}")
        return {"ok": False, "reason": transcription["reason"], "error": transcription.get("error", "")}

    transcript = transcription["transcript"]
    language = transcription.get("language", "unknown")
    timestamp = (
        message_date.replace(tzinfo=timezone.utc).isoformat()
        if message_date
        else datetime.now(timezone.utc).isoformat()
    )

    # 3. Store raw voice transcript signal
    if VOICE_STORE_TRANSCRIPT:
        transcript_signal = build_voice_transcript_signal(
            transcript=transcript,
            sender=sender,
            group_id=group_id,
            voice_file_id=file_id,
            duration_seconds=duration_seconds,
            language=language,
            timestamp=timestamp,
        )
        store_signals(group_id, [transcript_signal])
        logger.info(f"Voice transcript stored for {sender} ({len(transcript)} chars)")

    # 4. Run through signal extraction pipeline — treat as a regular text message
    voice_meta = {
        "sender": sender,
        "voice_file_id": file_id,
        "duration_seconds": duration_seconds,
        "language": language,
    }
    messages = [{
        "sender": sender,
        "text": transcript,
        "timestamp": timestamp,
        "source": "voice",
        "voice_file_id": file_id,
        "voice_duration": duration_seconds,
    }]
    try:
        extracted_signals = await process_message_batch(group_id, messages)
        extracted_signals = _inject_voice_metadata(extracted_signals, voice_meta)
        signals_count = len(extracted_signals)
    except Exception as e:
        logger.error(f"Signal extraction failed for voice from {sender}: {e}")
        signals_count = 0

    logger.info(
        f"Voice pipeline complete: {sender}, {duration_seconds}s, "
        f"{signals_count} signals, transcript={len(transcript)} chars"
    )
    return {
        "ok": True,
        "transcript": transcript,
        "signals_extracted": signals_count,
        "duration": duration_seconds,
        "sender": f"@{sender}",
        "language": language,
    }
```

### Step 21.2 — Add voice and video_note handlers to commands.py

Add to `thirdeye/backend/bot/commands.py`:

```python
# -----------------------------------------------------------------
# Voice Message Handlers — add to commands.py
# -----------------------------------------------------------------

async def handle_voice_telegram(update, context):
    """
    Fires for every voice message sent to a monitored group.
    Downloads, transcribes via Groq Whisper, feeds into signal pipeline.
    """
    from backend.agents.voice_handler import handle_voice_message
    from backend.config import ENABLE_VOICE_TRANSCRIPTION

    if not ENABLE_VOICE_TRANSCRIPTION:
        return

    msg = update.message
    if not msg or not msg.voice:
        return

    group_id = str(msg.chat_id)
    sender = (msg.from_user.full_name or msg.from_user.username or "Unknown")
    voice = msg.voice
    duration = voice.duration or 0

    # React with 👂 immediately so team knows ThirdEye is processing
    try:
        await msg.set_reaction("👂")
    except Exception:
        pass  # Reactions need bot admin rights — fail silently

    result = await handle_voice_message(
        bot=context.bot,
        group_id=group_id,
        sender=sender,
        file_id=voice.file_id,
        duration_seconds=duration,
        message_date=msg.date,
        is_video_note=False,
    )

    if result["ok"]:
        signals = result["signals_extracted"]
        reply = (
            f"🎤 *{sender}* ({duration}s) — transcribed\n"
            f"_{result['transcript'][:120]}{'...' if len(result['transcript']) > 120 else ''}_\n"
            f"`{signals} signal{'s' if signals != 1 else ''} extracted`"
        )
        await msg.reply_text(reply, parse_mode="Markdown")
    elif result["reason"] == "too_long":
        await msg.reply_text(
            f"⏭️ Voice note from *{sender}* skipped — too long ({duration}s).",
            parse_mode="Markdown",
        )
    elif result["reason"] in ("no_speech", "too_short", "empty"):
        pass  # Silent skip
    else:
        logger.warning(f"Voice error for {sender}: {result.get('error')}")


async def handle_video_note_telegram(update, context):
    """
    Fires for round video messages (video notes).
    Same pipeline as voice messages — they also contain audio.
    """
    from backend.agents.voice_handler import handle_voice_message
    from backend.config import ENABLE_VOICE_TRANSCRIPTION

    if not ENABLE_VOICE_TRANSCRIPTION:
        return

    msg = update.message
    if not msg or not msg.video_note:
        return

    group_id = str(msg.chat_id)
    sender = (msg.from_user.full_name or msg.from_user.username or "Unknown")
    video_note = msg.video_note
    duration = video_note.duration or 0

    try:
        await msg.set_reaction("👂")
    except Exception:
        pass

    result = await handle_voice_message(
        bot=context.bot,
        group_id=group_id,
        sender=sender,
        file_id=video_note.file_id,
        duration_seconds=duration,
        message_date=msg.date,
        is_video_note=True,
    )

    if result["ok"]:
        signals = result["signals_extracted"]
        reply = (
            f"📹 *{sender}* ({duration}s video note) — transcribed\n"
            f"_{result['transcript'][:120]}{'...' if len(result['transcript']) > 120 else ''}_\n"
            f"`{signals} signal{'s' if signals != 1 else ''} extracted`"
        )
        await msg.reply_text(reply, parse_mode="Markdown")
```

### Step 21.3 — Register handlers in bot.py

In `thirdeye/backend/bot/bot.py`, add these two `MessageHandler` registrations after your existing text handler:

```python
from telegram.ext import MessageHandler, filters
from backend.bot.commands import handle_voice_telegram, handle_video_note_telegram

application.add_handler(
    MessageHandler(filters.VOICE & ~filters.COMMAND, handle_voice_telegram)
)
application.add_handler(
    MessageHandler(filters.VIDEO_NOTE & ~filters.COMMAND, handle_video_note_telegram)
)
```

### ✅ TEST MILESTONE 21

Create file: `thirdeye/scripts/test_m21.py`

```python
"""
Test Milestone 21: Voice handler pipeline integration.
Uses synthetic transcript text to avoid needing real audio in CI.
"""
import asyncio
import os
import sys
import uuid

sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

TRANSCRIPT_ARCHITECTURE = """
So I just wanted to quickly explain the architecture decision. We've been
going back and forth on the database and I think we should just go with
PostgreSQL. The main reason is Raj already knows it and we have less than
two weeks to ship. Final decision — PostgreSQL. Raj can you start the
schema by Thursday?
"""

TRANSCRIPT_BLOCKER = """
The thing I wanted to flag is the design specs are still not done. I've
been waiting for two weeks and I literally cannot start the dashboard
without them. This is a hard blocker. If I don't get the specs by
Wednesday we'll miss Friday.
"""


async def test_voice_transcript_signal_builder():
    """Test that the voice transcript signal is correctly structured."""
    from backend.agents.voice_handler import build_voice_transcript_signal

    print("Testing voice transcript signal builder...")
    signal = build_voice_transcript_signal(
        transcript=TRANSCRIPT_ARCHITECTURE.strip(),
        sender="Raj",
        group_id="test_voice_m21",
        voice_file_id="fake_file_id_123",
        duration_seconds=45,
        language="en",
        timestamp="2026-03-21T10:00:00Z",
    )
    assert signal["type"] == "voice_transcript"
    assert signal["source"] == "voice"
    assert signal["speaker"] == "Raj"
    assert "@Raj" in signal["entities"]
    assert signal["voice_duration"] == 45
    assert signal["voice_language"] == "en"
    assert len(signal["raw_quote"]) > 50  # full transcript stored
    assert len(signal["keywords"]) > 0
    print(f"  ✅ type: {signal['type']}, source: {signal['source']}, speaker: {signal['speaker']}")
    print(f"  ✅ keywords: {signal['keywords'][:5]}")
    print(f"  ✅ summary: {signal['summary'][:100]}")


async def test_voice_metadata_injection():
    """Test that voice metadata is injected into extracted signals."""
    from backend.agents.voice_handler import _inject_voice_metadata

    print("\nTesting voice metadata injection...")
    raw_signals = [
        {"id": "1", "type": "architecture_decision", "summary": "Use PostgreSQL", "severity": "medium"},
        {"id": "2", "type": "action_item", "summary": "Raj to set up schema by Thursday", "severity": "medium"},
    ]
    voice_meta = {"sender": "Raj", "voice_file_id": "file_abc123", "duration_seconds": 45, "language": "en"}

    enriched = _inject_voice_metadata(raw_signals, voice_meta)
    for sig in enriched:
        assert sig["source"] == "voice"
        assert sig["speaker"] == "Raj"
        assert sig["voice_file_id"] == "file_abc123"
        assert "[Voice @Raj]" in sig["summary"]
        print(f"  ✅ [{sig['type']}] -> {sig['summary'][:80]}")


async def test_full_pipeline_with_transcript():
    """
    Full pipeline test: inject synthetic transcript -> signal extraction -> ChromaDB.
    Bypasses the Whisper API entirely.
    """
    from backend.pipeline import process_message_batch, query_knowledge, set_lens
    from backend.agents.voice_handler import build_voice_transcript_signal, _inject_voice_metadata
    from backend.db.chroma import store_signals
    import chromadb
    from backend.config import CHROMA_DB_PATH

    print("\nTesting full pipeline with synthetic transcript...")
    group_id = "test_voice_m21_pipeline"
    set_lens(group_id, "dev")
    sender = "Raj"
    timestamp = "2026-03-21T10:00:00Z"
    voice_meta = {"sender": sender, "voice_file_id": "test_file_id", "duration_seconds": 45, "language": "en"}

    # Store raw transcript
    transcript_signal = build_voice_transcript_signal(
        transcript=TRANSCRIPT_ARCHITECTURE.strip(),
        sender=sender,
        group_id=group_id,
        voice_file_id="test_file_id",
        duration_seconds=45,
        language="en",
        timestamp=timestamp,
    )
    store_signals(group_id, [transcript_signal])
    print("  ✅ Raw voice transcript stored in ChromaDB")

    # Run through signal extraction
    messages = [{"sender": sender, "text": TRANSCRIPT_ARCHITECTURE.strip(), "timestamp": timestamp}]
    extracted = await process_message_batch(group_id, messages)
    enriched = _inject_voice_metadata(extracted, voice_meta)
    print(f"  ✅ {len(enriched)} signal(s) extracted from transcript")

    # Verify voice attribution
    for sig in enriched:
        assert sig.get("source") == "voice"
        assert "[Voice @Raj]" in sig.get("summary", "")
    print("  ✅ Voice attribution on all extracted signals")

    # Query knowledge base
    answer = await query_knowledge(group_id, "What database did we decide on?")
    assert any(w in answer.lower() for w in ["postgres", "database", "sql"])
    print(f"  ✅ Knowledge base query answered: {answer[:100]}...")

    # Cleanup
    client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
    try:
        client.delete_collection(f"ll_{group_id}")
    except Exception:
        pass


async def test_handler_functions_importable():
    """Test that the Telegram handler functions import correctly."""
    print("\nTesting handler function imports...")
    from backend.bot.commands import handle_voice_telegram, handle_video_note_telegram
    print("  ✅ handle_voice_telegram importable")
    print("  ✅ handle_video_note_telegram importable")


async def main():
    print("Running Milestone 21 tests...\n")
    await test_voice_transcript_signal_builder()
    await test_voice_metadata_injection()
    await test_full_pipeline_with_transcript()
    await test_handler_functions_importable()
    print("\n🎉 MILESTONE 21 PASSED — Voice handler integrated into signal pipeline")


asyncio.run(main())
```

Run: `cd thirdeye && python scripts/test_m21.py`

**Expected output:**

```
  ✅ type: voice_transcript, source: voice, speaker: Raj
  ✅ keywords: ['wanted', 'quickly', 'explain', 'architecture', 'decision']
  ✅ [architecture_decision] -> [Voice @Raj] Use PostgreSQL...
  ✅ [action_item] -> [Voice @Raj] Raj to set up schema by Thursday
  ✅ 3 signal(s) extracted from transcript
  ✅ Voice attribution on all extracted signals
  ✅ Knowledge base query answered: Based on what @Raj said...
  ✅ handle_voice_telegram importable
  ✅ handle_video_note_telegram importable

🎉 MILESTONE 21 PASSED — Voice handler integrated into signal pipeline
```

---

## MILESTONE 22: Voice Attribution in /ask + /voicelog Command (160%)

**Goal:** Two things that complete the voice intelligence loop. First, update the Query Agent so when it answers using a voice-sourced signal, the answer cites it explicitly: *"Based on what @Raj said in a voice note on March 14th (45s)..."*. Second, a `/voicelog` command — a searchable audit trail of everything your team has ever said aloud in a voice note, filterable by speaker, signal type, or keyword.
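The citation format described above can be pinned down with a small helper. A minimal sketch, assuming timestamps are stored as ISO-8601 strings with a trailing `Z` (as the voice handler stores them); `voice_citation` is a hypothetical name for illustration, not part of the codebase — the real wiring happens in Step 22.1:

```python
from datetime import datetime


def voice_citation(speaker: str, iso_timestamp: str, duration_s: int) -> str:
    """Build the citation prefix for a voice-sourced signal (sketch only)."""
    # fromisoformat (pre-3.11) does not accept a trailing Z, so normalise it
    dt = datetime.fromisoformat(iso_timestamp.replace("Z", "+00:00"))
    return (
        f"Based on what @{speaker} said in a voice note "
        f"on {dt.strftime('%b %d')} ({duration_s}s)"
    )


print(voice_citation("Raj", "2026-03-14T10:00:00Z", 45))
# Based on what @Raj said in a voice note on Mar 14 (45s)
```

The same string can be prepended to the LLM context snippet or used verbatim in `/ask` replies, so voice provenance survives all the way to the user-facing answer.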
### Step 22.1 — Update the Query Agent for voice citation

In `thirdeye/backend/agents/query_agent.py`, add this helper function and wire it into your context-building logic:

```python
# Add to backend/agents/query_agent.py

def _format_signal_for_context(signal: dict) -> str:
    """
    Format a ChromaDB signal as a context snippet for the Query Agent LLM.
    Voice-sourced signals get explicit attribution so the LLM cites them correctly.
    """
    source = signal.get("source", "chat")
    sig_type = signal.get("type", "unknown")
    summary = signal.get("summary", "")
    timestamp = signal.get("timestamp", "")

    date_str = ""
    if timestamp:
        try:
            from datetime import datetime
            dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
            date_str = dt.strftime("%b %d")
        except Exception:
            date_str = timestamp[:10]

    if source == "voice":
        speaker = signal.get("speaker", "Unknown")
        duration = signal.get("voice_duration", 0)
        duration_str = f"{duration}s" if duration else "?"
        return (
            f"[VOICE NOTE — @{speaker} on {date_str} ({duration_str})] "
            f"[{sig_type}] {summary}"
        )

    if source == "document":
        return f"[DOCUMENT — {date_str}] [{sig_type}] {summary}"

    if source == "link":
        return f"[WEB LINK — {date_str}] [{sig_type}] {summary}"

    if sig_type in ("meet_decision", "meet_action_item", "meet_blocker", "meet_summary"):
        meeting_id = signal.get("meeting_id", "")
        return f"[MEETING {meeting_id} — {date_str}] [{sig_type}] {summary}"

    entities = signal.get("entities", [])
    sender_str = entities[0] if entities else ""
    return f"[CHAT — {sender_str} on {date_str}] [{sig_type}] {summary}"
```

Then add this line to your existing `QUERY_SYSTEM_PROMPT` string (or concatenate it):

```python
# Add to your QUERY_SYSTEM_PROMPT in query_agent.py

VOICE_CITATION_INSTRUCTION = """
When context includes [VOICE NOTE — @name on Date (Xs)] signals, ALWAYS cite
the voice note explicitly. Example: "Based on what @Raj said in a voice note
on Mar 14 (45s), the team decided to use PostgreSQL."
Never flatten voice signals into generic "the team discussed" language.
Always name the speaker and source.
"""

# Then in your query call:
system_prompt = EXISTING_PROMPT + VOICE_CITATION_INSTRUCTION
```

### Step 22.2 — Add the /voicelog command

Add to `thirdeye/backend/bot/commands.py`:

```python
# -----------------------------------------------------------------
# /voicelog command — add to commands.py
# -----------------------------------------------------------------

async def cmd_voicelog(update, context):
    """
    /voicelog [filter]
    Audit trail of all voice note decisions, actions, and blockers in this group.

    Usage:
        /voicelog                 — all voice-sourced signals (last 20)
        /voicelog decisions       — only decisions from voice notes
        /voicelog actions         — only action items from voice notes
        /voicelog blockers        — only blockers from voice notes
        /voicelog @Raj            — only voice notes by Raj
        /voicelog search [query]  — search voice note content
    """
    from backend.db.chroma import query_signals, get_all_signals
    from backend.agents.voice_transcriber import format_duration
    from datetime import datetime

    chat_id = str(update.effective_chat.id)
    args = context.args or []

    filter_type = None
    filter_speaker = None
    search_query = None

    if args:
        first = args[0].lower()
        if first == "decisions":
            filter_type = "architecture_decision"
        elif first == "actions":
            filter_type = "action_item"
        elif first == "blockers":
            filter_type = "blocker"
        elif first == "search" and len(args) > 1:
            search_query = " ".join(args[1:])
        elif first.startswith("@"):
            filter_speaker = first[1:]

    await update.message.reply_text("🎤 Searching voice notes...", parse_mode="Markdown")

    if search_query:
        all_signals = query_signals(chat_id, search_query, n_results=30)
    else:
        all_signals = get_all_signals(chat_id)

    # Filter to voice-sourced signals only
    voice_signals = [
        s for s in all_signals
        if s.get("source") == "voice"
        or s.get("type") == "voice_transcript"
        or "[Voice @" in s.get("summary", "")
    ]

    if filter_type:
        voice_signals = [s for s in voice_signals
if s.get("type") == filter_type] if filter_speaker: voice_signals = [ s for s in voice_signals if filter_speaker.lower() in s.get("speaker", "").lower() or filter_speaker.lower() in str(s.get("entities", [])).lower() ] # Prefer structured signals; fall back to raw transcripts if none structured = [s for s in voice_signals if s.get("type") != "voice_transcript"] display_signals = structured if structured else voice_signals # Sort by timestamp descending def _ts(s): try: return datetime.fromisoformat(s.get("timestamp", "").replace("Z", "+00:00")) except Exception: return datetime.min display_signals.sort(key=_ts, reverse=True) display_signals = display_signals[:20] if not display_signals: await update.message.reply_text( "📭 No voice note signals found. Voice notes are transcribed automatically when sent here.", parse_mode="Markdown", ) return type_emoji = { "architecture_decision": "🏗️", "tech_debt": "⚠️", "action_item": "📌", "blocker": "🚧", "feature_request": "💡", "promise": "🤝", "risk": "🔴", "recurring_bug": "🐛", "voice_transcript": "🎤", } filter_label = "" if filter_type: filter_label = f" — {filter_type.replace('_', ' ').title()}" elif filter_speaker: filter_label = f" — @{filter_speaker}" elif search_query: filter_label = f" — '{search_query}'" lines = [f"🎤 *Voice Note Audit Trail*{filter_label}\n_{len(display_signals)} signal(s)_\n"] for sig in display_signals: ts = sig.get("timestamp", "") date_str = "" if ts: try: dt = datetime.fromisoformat(ts.replace("Z", "+00:00")) date_str = dt.strftime("%b %d") except Exception: date_str = ts[:10] speaker = sig.get("speaker", "") duration = sig.get("voice_duration", 0) duration_str = format_duration(duration) if duration else "" emoji = type_emoji.get(sig.get("type", ""), "🎤") summary = sig.get("summary", "") if summary.startswith("[Voice @"): summary = summary.split("] ", 1)[-1] if "] " in summary else summary meta = " · ".join(filter(None, [f"@{speaker}" if speaker else "", date_str, duration_str])) 
lines.append(f"{emoji} *{meta}*\n _{summary[:100]}_\n") await update.message.reply_text("\n".join(lines), parse_mode="Markdown") ``` ### Step 22.3 — Register in bot.py ```python from backend.bot.commands import cmd_voicelog application.add_handler(CommandHandler("voicelog", cmd_voicelog)) ``` Update your `/start` message: ```python "🎤 /voicelog — Audit trail of all voice note decisions\n" "🎤 /voicelog @name — Voice notes by a specific person\n" "🎤 /voicelog search [q] — Search voice note content\n" ``` ### ✅ TEST MILESTONE 22 Create file: `thirdeye/scripts/test_m22.py` ```python """ Test Milestone 22: Voice attribution in /ask + /voicelog. """ import asyncio import os import sys sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) TRANSCRIPT_POSTGRES = "We decided to go with PostgreSQL. Final. Raj will set up the schema by Thursday." TRANSCRIPT_BLOCKER = "Dashboard is still blocked on design specs. Two weeks now. Hard blocker for the sprint." TRANSCRIPT_BUG = "Checkout timeout is happening again. Critical. Someone needs to investigate today." 
async def _seed_voice_signals(group_id: str): """Seed a group with voice-sourced signals for testing.""" from backend.pipeline import process_message_batch, set_lens from backend.agents.voice_handler import build_voice_transcript_signal, _inject_voice_metadata from backend.db.chroma import store_signals set_lens(group_id, "dev") sessions = [ ("Raj", TRANSCRIPT_POSTGRES, "f1", 22, "2026-03-14T10:00:00Z"), ("Alex", TRANSCRIPT_BLOCKER, "f2", 18, "2026-03-17T11:00:00Z"), ("Sam", TRANSCRIPT_BUG, "f3", 15, "2026-03-19T09:00:00Z"), ] for sender, transcript, file_id, duration, timestamp in sessions: ts_signal = build_voice_transcript_signal( transcript=transcript, sender=sender, group_id=group_id, voice_file_id=file_id, duration_seconds=duration, language="en", timestamp=timestamp, ) store_signals(group_id, [ts_signal]) messages = [{"sender": sender, "text": transcript, "timestamp": timestamp}] extracted = await process_message_batch(group_id, messages) voice_meta = {"sender": sender, "voice_file_id": file_id, "duration_seconds": duration, "language": "en"} _inject_voice_metadata(extracted, voice_meta) async def test_signal_formatter(): """Test that voice signals format with attribution prefix.""" from backend.agents.query_agent import _format_signal_for_context print("Testing signal formatter with voice attribution...") voice_signal = { "type": "architecture_decision", "summary": "Team decided to use PostgreSQL", "source": "voice", "speaker": "Raj", "voice_duration": 45, "timestamp": "2026-03-14T10:00:00Z", "entities": ["@Raj"], } formatted = _format_signal_for_context(voice_signal) assert "[VOICE NOTE" in formatted, "Expected [VOICE NOTE] prefix" assert "@Raj" in formatted assert "Mar 14" in formatted assert "45s" in formatted print(f" ✅ Voice: {formatted[:120]}") chat_signal = {"type": "tech_debt", "summary": "JWT hardcoded", "source": "chat", "timestamp": "2026-03-15T09:00:00Z", "entities": ["@Alex"]} assert "[CHAT" in _format_signal_for_context(chat_signal) print(f" ✅ 
Chat signal formatted correctly") doc_signal = {"type": "document_knowledge", "summary": "OAuth required", "source": "document", "timestamp": "2026-03-16T09:00:00Z", "entities": []} assert "[DOCUMENT" in _format_signal_for_context(doc_signal) print(f" ✅ Document signal formatted correctly") async def test_voice_query_attribution(): """Test that /ask returns voice attribution in its answer.""" from backend.pipeline import query_knowledge import chromadb from backend.config import CHROMA_DB_PATH print("\nTesting /ask returns voice attribution...") group_id = "test_voice_m22_ask" await _seed_voice_signals(group_id) answer = await query_knowledge(group_id, "What database did we decide to use?") assert len(answer) > 10 relevant = any(w in answer.lower() for w in ["postgres", "raj", "voice", "database"]) assert relevant, f"Answer did not surface voice-sourced decision. Got: {answer[:200]}" print(f" ✅ Answer surfaces voice decision: {answer[:150]}...") has_citation = any(phrase in answer.lower() for phrase in ["voice note", "@raj", "raj said", "mar 14"]) if has_citation: print(f" ✅ Explicit voice attribution present in answer") else: print(f" ⚠️ Answer correct but attribution phrasing varies by provider (acceptable)") # Cleanup import chromadb as cdb client = cdb.PersistentClient(path=CHROMA_DB_PATH) try: client.delete_collection(f"ll_{group_id}") except Exception: pass async def test_voicelog_filtering(): """Test voicelog retrieval and speaker filtering.""" from backend.db.chroma import get_all_signals import chromadb from backend.config import CHROMA_DB_PATH print("\nTesting voicelog signal retrieval and filtering...") group_id = "test_voice_m22_log" await _seed_voice_signals(group_id) all_signals = get_all_signals(group_id) voice_signals = [ s for s in all_signals if s.get("source") == "voice" or s.get("type") == "voice_transcript" or "[Voice @" in s.get("summary", "") ] assert len(voice_signals) > 0, "Expected voice-sourced signals" print(f" ✅ Found 
{len(voice_signals)} voice-sourced signal(s)") raj_signals = [ s for s in voice_signals if "raj" in s.get("speaker", "").lower() or "raj" in str(s.get("entities", [])).lower() ] assert len(raj_signals) > 0, "Expected signals from Raj" print(f" ✅ Found {len(raj_signals)} signal(s) from @Raj") structured = [s for s in voice_signals if s.get("type") != "voice_transcript"] print(f" ✅ {len(structured)} structured, {len(voice_signals) - len(structured)} raw transcripts") # Cleanup client = chromadb.PersistentClient(path=CHROMA_DB_PATH) try: client.delete_collection(f"ll_{group_id}") except Exception: pass async def test_voicelog_command_importable(): """Test that cmd_voicelog imports without errors.""" print("\nTesting cmd_voicelog import...") from backend.bot.commands import cmd_voicelog print(" ✅ cmd_voicelog importable") async def test_mixed_source_query(): """Test that /ask uses voice + chat signals together.""" from backend.pipeline import process_message_batch, query_knowledge, set_lens from backend.agents.voice_handler import build_voice_transcript_signal, _inject_voice_metadata from backend.db.chroma import store_signals import chromadb from backend.config import CHROMA_DB_PATH print("\nTesting mixed-source query (voice + chat)...") group_id = "test_voice_m22_mixed" set_lens(group_id, "dev") # Chat signal: Redis await process_message_batch(group_id, [ {"sender": "Alex", "text": "I think we should use Redis for the cache.", "timestamp": "2026-03-10T09:00:00Z"} ]) # Voice signal (more recent): overrides to PostgreSQL transcript = "Just to be clear — we're going with PostgreSQL for everything. Redis is off the table." 
ts_signal = build_voice_transcript_signal( transcript=transcript, sender="Raj", group_id=group_id, voice_file_id="f_override", duration_seconds=20, language="en", timestamp="2026-03-21T10:00:00Z", ) store_signals(group_id, [ts_signal]) extracted = await process_message_batch(group_id, [ {"sender": "Raj", "text": transcript, "timestamp": "2026-03-21T10:00:00Z"} ]) _inject_voice_metadata(extracted, {"sender": "Raj", "voice_file_id": "f_override", "duration_seconds": 20, "language": "en"}) answer = await query_knowledge(group_id, "What did we decide about caching?") assert any(w in answer.lower() for w in ["postgres", "redis", "cache"]) print(f" ✅ Mixed-source query answered: {answer[:120]}...") # Cleanup client = chromadb.PersistentClient(path=CHROMA_DB_PATH) try: client.delete_collection(f"ll_{group_id}") except Exception: pass async def main(): print("Running Milestone 22 tests...\n") await test_signal_formatter() await test_voice_query_attribution() await test_voicelog_filtering() await test_voicelog_command_importable() await test_mixed_source_query() print("\n🎉 MILESTONE 22 PASSED — Voice attribution in /ask, /voicelog working") asyncio.run(main()) ``` Run: `cd thirdeye && python scripts/test_m22.py` **Expected output:** ``` ✅ Voice: [VOICE NOTE — @Raj on Mar 14 (45s)] [architecture_decision] Team decided... ✅ Chat signal formatted correctly ✅ Document signal formatted correctly ✅ Answer surfaces voice decision: Based on what @Raj said in a voice note on Mar 14... ✅ Explicit voice attribution present in answer ✅ Found 9 voice-sourced signal(s) ✅ Found 3 signal(s) from @Raj ✅ cmd_voicelog importable ✅ Mixed-source query answered: The team discussed caching... 
🎉 MILESTONE 22 PASSED — Voice attribution in /ask, /voicelog working ``` --- ## MILESTONE SUMMARY (Updated) | # | Milestone | What You Have | % | |---|---|---|---| | 0–10 | Core System | Full ThirdEye pipeline, Telegram bot, dashboard | 0–100% | | 11 | Document Ingestion | PDFs/DOCX/TXT → chunked → RAG | 105% | | 12 | Tavily Web Search | Query agent searches web on fallback | 110% | | 13 | Link Fetch & Ingestion | URLs → fetched → stored as signals | 115% | | 14 | Meet Chrome Extension | Browser captures Meet audio → POSTs chunks | 120% | | 15 | Meet Signal Processing | Transcript → decisions/actions/blockers → ChromaDB | 125% | | 16 | Meet Telegram Commands | /meetsum, /meetask, /meetmatch | 130% | | 17 | Jira API Client | Async Jira REST wrapper | 135% | | 18 | Jira Signal Agent | LLM converts signals → well-formed tickets | 140% | | 19 | Jira Telegram Commands | /jira, /jirastatus, /jirasearch, /jiraraised, /jirawatch | 145% | | **20** | **Groq Whisper Client** | **Audio bytes → transcript. Zero new keys. Zero new cost.** | **150%** | | **21** | **Voice Handler + Pipeline** | **voice/video_note → transcribe → extract signals → ChromaDB** | **155%** | | **22** | **Voice Attribution + /voicelog** | **/ask cites voice notes. 
/voicelog audits all voice decisions.** | **160%** |

---

## FILE CHANGE SUMMARY

### New Files Created
```
thirdeye/backend/agents/voice_transcriber.py   # Milestone 20 — Groq Whisper client
thirdeye/backend/agents/voice_handler.py       # Milestone 21 — pipeline orchestrator
thirdeye/scripts/test_m20.py                   # Milestone 20 test
thirdeye/scripts/test_m21.py                   # Milestone 21 test
thirdeye/scripts/test_m22.py                   # Milestone 22 test
```

### Existing Files Modified
```
thirdeye/.env                            # Pre-work: 5 new VOICE_* vars
thirdeye/backend/config.py               # Pre-work: voice config vars
thirdeye/backend/agents/query_agent.py   # M22: _format_signal_for_context() + citation prompt
thirdeye/backend/bot/commands.py         # M21: handle_voice_telegram, handle_video_note_telegram
                                         # M22: cmd_voicelog
thirdeye/backend/bot/bot.py              # M21: VOICE + VIDEO_NOTE MessageHandlers
                                         # M22: /voicelog CommandHandler
```

### Updated Repo Structure (additions only)
```
thirdeye/
├── backend/
│   ├── agents/
│   │   ├── voice_transcriber.py   # NEW — Groq Whisper API client
│   │   ├── voice_handler.py       # NEW — pipeline orchestrator
│   │   └── query_agent.py         # MODIFIED — voice-aware context formatting + citation instruction
│   │
│   └── bot/
│       ├── commands.py            # MODIFIED — voice handlers + cmd_voicelog
│       └── bot.py                 # MODIFIED — VOICE + VIDEO_NOTE handlers, /voicelog
│
└── scripts/
    ├── test_m20.py
    ├── test_m21.py
    └── test_m22.py
```

---

## UPDATED COMMANDS REFERENCE
```
NEW — Voice Intelligence:
/voicelog            — Audit trail of all voice note signals (last 20)
/voicelog @name      — Voice notes by a specific team member
/voicelog decisions  — Only decisions extracted from voice notes
/voicelog actions    — Only action items from voice notes
/voicelog blockers   — Only blockers from voice notes
/voicelog search [q] — Search voice note content by keyword

ENHANCED — existing commands now voice-aware:
/ask [q] — Now cites voice notes: "Based on what @Raj said in a voice note on Mar 14 (45s)..."
PASSIVE — no command needed: • Voice messages → 👂 react → download OGG → Groq Whisper → transcript → signal extraction • Video notes → 👂 react → download MP4 → Groq Whisper → transcript → signal extraction ``` --- ## HOW THE FULL VOICE FLOW WORKS (End-to-End) ``` 1. @Raj sends a 45s voice note: "So we're going with PostgreSQL. Final decision. Raj will set up the schema by Thursday." 2. ThirdEye reacts with 👂 immediately. 3. Bot downloads OGG audio bytes from Telegram CDN. 4. POST to Groq Whisper (whisper-large-v3, same GROQ_API_KEY): https://api.groq.com/openai/v1/audio/transcriptions ~1 second for a 45s clip. Free. 5. Whisper returns: "So we're going with PostgreSQL. Final decision. Raj will set up the schema by Thursday." 6. voice_transcript signal stored in ChromaDB: type="voice_transcript", source="voice", speaker="Raj", voice_duration=45, raw_quote="So we're going with..." 7. Transcript runs through process_message_batch() like any text message. Signal extraction finds: [Voice @Raj] architecture_decision: Use PostgreSQL [MEDIUM] [Voice @Raj] action_item: Raj to set up schema by Thursday [MEDIUM] 8. Bot replies: 🎤 Raj (45s) — transcribed "So we're going with PostgreSQL. Final decision..." `2 signals extracted` 9. Later: /ask What database are we using? "Based on what @Raj said in a voice note on Mar 21 (45s), the team decided to use PostgreSQL. Raj is also setting up the schema by Thursday." 10. /voicelog shows: 🏗️ @Raj · Mar 21 · 45s Use PostgreSQL for database 📌 @Raj · Mar 21 · 45s Set up schema by Thursday ``` --- ## THE NOVELTY ARGUMENT Every "team intelligence" tool on the market — Slack AI, Prism, Runbear, Notion AI — processes text. In practice, especially in Indian startup culture and any informal team, **the most important decisions travel as voice notes**. The CTO explaining the architecture in a 2-minute voice note. The PM clarifying the client scope during a commute. The lead engineer flagging a risk off the cuff. None of that was ever captured. 
Until now. ThirdEye is the first team intelligence system to treat voice notes as first-class signals with full attribution, vector search, and automatic Jira ticket raising. The knowledge gap — the one that matters most — is closed. --- *Every milestone has a test. Every test must pass. No skipping.*