"""
Test Milestone 21: Voice handler pipeline integration.

Uses synthetic transcript text to avoid needing real audio in CI.
"""
import asyncio
import os
import sys
import uuid

# Make the project root importable when this file is run from the tests/ dir.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

# Synthetic stand-ins for Whisper transcription output, so the signal
# pipeline can be exercised without real audio files or the Whisper API.
TRANSCRIPT_ARCHITECTURE = """
So I just wanted to quickly explain the architecture decision. We've been going
back and forth on the database and I think we should just go with PostgreSQL.
The main reason is Raj already knows it and we have less than two weeks to ship.
Final decision — PostgreSQL. Raj can you start the schema by Thursday?
"""

TRANSCRIPT_BLOCKER = """
The thing I wanted to flag is the design specs are still not done. I've been
waiting for two weeks and I literally cannot start the dashboard without them.
This is a hard blocker. If I don't get the specs by Wednesday we'll miss Friday.
"""


async def test_voice_transcript_signal_builder():
    """Test that the voice transcript signal is correctly structured."""
    from backend.agents.voice_handler import build_voice_transcript_signal

    print("Testing voice transcript signal builder...")
    signal = build_voice_transcript_signal(
        transcript=TRANSCRIPT_ARCHITECTURE.strip(),
        sender="Raj",
        group_id="test_voice_m21",
        voice_file_id="fake_file_id_123",
        duration_seconds=45,
        language="en",
        timestamp="2026-03-21T10:00:00Z",
    )
    assert signal["type"] == "voice_transcript"
    assert signal["source"] == "voice"
    assert signal["speaker"] == "Raj"
    assert "@Raj" in signal["entities"]
    assert signal["voice_duration"] == 45
    assert signal["voice_language"] == "en"
    assert len(signal["raw_quote"]) > 50  # full transcript stored
    assert len(signal["keywords"]) > 0
    print(f" ✅ type: {signal['type']}, source: {signal['source']}, speaker: {signal['speaker']}")
    print(f" ✅ keywords: {signal['keywords'][:5]}")
    print(f" ✅ summary: {signal['summary'][:100]}")


async def test_voice_metadata_injection():
    """Test that voice metadata is injected into extracted signals."""
    from backend.agents.voice_handler import _inject_voice_metadata

    print("\nTesting voice metadata injection...")
    raw_signals = [
        {"id": "1", "type": "architecture_decision", "summary": "Use PostgreSQL", "severity": "medium"},
        {"id": "2", "type": "action_item", "summary": "Raj to set up schema by Thursday", "severity": "medium"},
    ]
    voice_meta = {"sender": "Raj", "voice_file_id": "file_abc123", "duration_seconds": 45, "language": "en"}
    enriched = _inject_voice_metadata(raw_signals, voice_meta)
    for sig in enriched:
        # Every extracted signal must carry voice attribution.
        assert sig["source"] == "voice"
        assert sig["speaker"] == "Raj"
        assert sig["voice_file_id"] == "file_abc123"
        assert "[Voice @Raj]" in sig["summary"]
        print(f" ✅ [{sig['type']}] -> {sig['summary'][:80]}")


async def test_full_pipeline_with_transcript():
    """
    Full pipeline test: inject synthetic transcript -> signal extraction -> ChromaDB.
    Bypasses the Whisper API entirely.
    """
    from backend.pipeline import process_message_batch, query_knowledge, set_lens
    from backend.agents.voice_handler import build_voice_transcript_signal, _inject_voice_metadata
    from backend.db.chroma import store_signals
    import chromadb
    from backend.config import CHROMA_DB_PATH

    print("\nTesting full pipeline with synthetic transcript...")
    group_id = "test_voice_m21_pipeline"
    set_lens(group_id, "dev")
    sender = "Raj"
    timestamp = "2026-03-21T10:00:00Z"
    voice_meta = {"sender": sender, "voice_file_id": "test_file_id", "duration_seconds": 45, "language": "en"}

    # try/finally so a failing assertion cannot leak the test collection
    # into later runs.
    try:
        # Store raw transcript
        transcript_signal = build_voice_transcript_signal(
            transcript=TRANSCRIPT_ARCHITECTURE.strip(),
            sender=sender,
            group_id=group_id,
            voice_file_id="test_file_id",
            duration_seconds=45,
            language="en",
            timestamp=timestamp,
        )
        store_signals(group_id, [transcript_signal])
        print(" ✅ Raw voice transcript stored in ChromaDB")

        # Run through signal extraction
        messages = [{"sender": sender, "text": TRANSCRIPT_ARCHITECTURE.strip(), "timestamp": timestamp}]
        extracted = await process_message_batch(group_id, messages)
        enriched = _inject_voice_metadata(extracted, voice_meta)
        print(f" ✅ {len(enriched)} signal(s) extracted from transcript")

        # Verify voice attribution
        for sig in enriched:
            assert sig.get("source") == "voice"
            assert "[Voice @Raj]" in sig.get("summary", "")
        print(" ✅ Voice attribution on all extracted signals")

        # Query knowledge base
        answer = await query_knowledge(group_id, "What database did we decide on?")
        assert any(w in answer.lower() for w in ["postgres", "database", "sql"])
        print(f" ✅ Knowledge base query answered: {answer[:100]}...")
    finally:
        # Cleanup — best-effort: the collection may not exist if an early
        # step failed before anything was stored.
        client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
        try:
            client.delete_collection(f"ll_{group_id}")
        except Exception:
            pass


async def test_handler_functions_importable():
    """Test that the Telegram handler functions import correctly."""
    print("\nTesting handler function imports...")
    from backend.bot.bot import handle_voice_telegram, handle_video_note_telegram
    print(" ✅ handle_voice_telegram importable")
    print(" ✅ handle_video_note_telegram importable")


async def main():
    # Run all milestone tests sequentially; any assertion failure aborts.
    print("Running Milestone 21 tests...\n")
    await test_voice_transcript_signal_builder()
    await test_voice_metadata_injection()
    await test_full_pipeline_with_transcript()
    await test_handler_functions_importable()
    print("\n🎉 MILESTONE 21 PASSED — Voice handler integrated into signal pipeline")


# Guard so importing this module (e.g. by a test collector) does not
# execute the suite as a side effect.
if __name__ == "__main__":
    asyncio.run(main())