Files
B.Tech-Project-III/thirdeye/backend/agents/voice_handler.py
2026-04-05 00:43:23 +05:30

281 lines
10 KiB
Python

"""
Voice Handler
Orchestrates the full pipeline for Telegram voice messages and video notes:
Telegram voice/video_note message
-> download audio bytes
-> transcribe via Groq Whisper (voice_transcriber.py)
-> build a voice_transcript signal (stored raw for full-text search)
-> run transcript through process_message_batch (signal extraction)
-> all extracted signals carry voice attribution metadata
Voice metadata attached to every extracted signal:
source: "voice"
voice_file_id: Telegram file ID
voice_duration: seconds
speaker: sender display name
"""
import logging
import uuid
from datetime import datetime, timezone
from backend.agents.voice_transcriber import (
transcribe_audio, download_telegram_audio, format_duration
)
from backend.config import ENABLE_VOICE_TRANSCRIPTION, VOICE_STORE_TRANSCRIPT
from backend.db.chroma import store_signals
from backend.pipeline import process_message_batch
logger = logging.getLogger("thirdeye.agents.voice_handler")
# --- Voice transcript signal builder -----------------------------------------
def build_voice_transcript_signal(
    transcript: str,
    sender: str,
    group_id: str,
    voice_file_id: str,
    duration_seconds: int,
    language: str,
    timestamp: str,
) -> dict:
    """
    Create the raw-transcript signal for a voice message.

    The full transcription is persisted as its own low-severity signal so
    the text stays full-text searchable in ChromaDB even when structured
    extraction produces nothing.
    """
    speaker_tag = f"@{sender}"
    preview = transcript[:200]
    signal = {
        "id": str(uuid.uuid4()),
        "type": "voice_transcript",
        "summary": f"[Voice {format_duration(duration_seconds)}] {speaker_tag}: {preview}",
        "raw_quote": transcript,
        "severity": "low",
        "status": "transcribed",
        "sentiment": "neutral",
        "urgency": "none",
        "entities": [speaker_tag],
        "keywords": _extract_voice_keywords(transcript),
        "timestamp": timestamp,
        "group_id": group_id,
        "lens": "voice",
        "source": "voice",
        "voice_file_id": voice_file_id,
        "voice_duration": duration_seconds,
        "voice_language": language,
        "speaker": sender,
    }
    return signal
def _extract_voice_keywords(text: str) -> list[str]:
"""Simple keyword extraction from transcript text."""
stopwords = {
"the", "a", "an", "is", "are", "was", "were", "will", "to", "of",
"in", "on", "at", "for", "by", "with", "this", "that", "and", "or",
"but", "we", "i", "it", "be", "do", "have", "has", "had", "not",
"so", "just", "like", "yeah", "okay", "um", "uh", "you", "me",
}
words = text.lower().split()
keywords = [w.strip(".,!?;:\"'") for w in words if len(w) > 3 and w not in stopwords]
return list(dict.fromkeys(keywords))[:12]
def _inject_voice_metadata(signals: list, voice_meta: dict) -> list[dict]:
"""
Inject voice attribution into every signal extracted from a voice transcript.
Accepts both Signal Pydantic model objects and plain dicts.
This ensures /ask can cite the voice source in its answers.
"""
result = []
for signal in signals:
sig = signal.model_dump() if hasattr(signal, "model_dump") else dict(signal)
sig["source"] = "voice"
sig["voice_file_id"] = voice_meta.get("voice_file_id", "")
sig["voice_duration"] = voice_meta.get("duration_seconds", 0)
sig["voice_language"] = voice_meta.get("language", "")
sig["speaker"] = voice_meta.get("sender", "Unknown")
if "[Voice]" not in sig.get("summary", ""):
sig["summary"] = f"[Voice @{voice_meta.get('sender', '?')}] {sig['summary']}"
result.append(sig)
return result
# --- Fallback signal builder -------------------------------------------------
# Keywords that hint at a signal type when the LLM extraction returns nothing
# Maps each candidate signal type to a set of hint words. The fallback
# builder scores a transcript against every set (by word overlap) and
# picks the type with the highest hit count.
_FALLBACK_TYPE_HINTS = {
    # Requests for new/changed functionality or design work.
    "feature_request": {
        "need", "needs", "required", "require", "want", "should", "missing",
        "add", "feature", "ui", "ux", "design", "change", "changes", "update",
        "improve", "improvement", "responsiveness", "responsive",
    },
    # Something is preventing progress right now.
    "blocker": {
        "blocked", "blocking", "blocker", "stuck", "waiting", "can't", "cannot",
        "issue", "problem", "broken", "fails", "failing",
    },
    # A concrete task someone intends to do.
    "action_item": {
        "will", "going", "plan", "todo", "do", "fix", "implement", "setup",
        "create", "build", "deploy", "check",
    },
    # Potential future problems, deadline pressure, worry.
    "risk": {
        "risk", "risky", "concern", "worried", "urgent", "urgently", "critical",
        "deadline", "delay", "late",
    },
}
def _build_fallback_signal(
    transcript: str,
    sender: str,
    group_id: str,
    timestamp: str,
    voice_meta: dict,
) -> dict:
    """
    Construct a best-effort structured signal for a transcript the LLM
    yielded nothing for.

    The signal type is chosen by counting hint-word overlaps per type in
    _FALLBACK_TYPE_HINTS (defaulting to 'feature_request' when nothing
    matches), and severity escalates to 'high' when urgency vocabulary
    appears in the transcript.
    """
    tokens = set(transcript.lower().split())

    # Score every candidate type by how many of its hint words appear.
    hit_counts = {
        candidate: len(tokens & hints)
        for candidate, hints in _FALLBACK_TYPE_HINTS.items()
    }
    if any(hit_counts.values()):
        chosen_type = max(hit_counts, key=hit_counts.get)
    else:
        chosen_type = "feature_request"

    urgency_words = {"urgent", "urgently", "asap", "immediately", "critical", "now"}
    if tokens & urgency_words:
        severity = "high"
    else:
        severity = "medium"

    summary = transcript[:200].strip()
    if len(transcript) > 200:
        summary = summary + "..."

    return {
        "id": str(uuid.uuid4()),
        "type": chosen_type,
        "summary": f"[Voice @{sender}] {summary}",
        "raw_quote": transcript[:500],
        "severity": severity,
        "status": "unresolved",
        "sentiment": "neutral",
        "urgency": "high" if severity == "high" else "medium",
        "entities": [f"@{sender}"],
        "keywords": _extract_voice_keywords(transcript),
        "timestamp": timestamp,
        "group_id": group_id,
        "lens": "voice",
        "source": "voice",
        "speaker": sender,
        "voice_file_id": voice_meta.get("voice_file_id", ""),
        "voice_duration": voice_meta.get("duration_seconds", 0),
        "voice_language": voice_meta.get("language", ""),
    }
# --- Main handler ------------------------------------------------------------
async def handle_voice_message(
    bot,
    group_id: str,
    sender: str,
    file_id: str,
    duration_seconds: int,
    message_date,
    is_video_note: bool = False,
) -> dict:
    """
    Run the full pipeline for one voice message or video note.

    Steps: download audio -> transcribe via Groq Whisper -> (optionally)
    store the raw transcript signal -> run the transcript through the
    normal signal-extraction pipeline, stamping voice attribution on
    every extracted signal. If the LLM extracts nothing from a
    non-trivial transcript, a keyword-based fallback signal is stored so
    the content remains queryable as structured data.

    Args:
        bot: Telegram bot instance used to download the file.
        group_id: Chat/group identifier the message belongs to.
        sender: Display name of the sender.
        file_id: Telegram file ID of the audio payload.
        duration_seconds: Reported audio duration in seconds.
        message_date: datetime of the Telegram message (assumed to be in
            UTC — TODO confirm against the Telegram handler), or None to
            fall back to the current time.
        is_video_note: True for round video notes (mp4 container).

    Returns:
        {"ok": True, "transcript": ..., "signals_extracted": ..., ...}
        on success, or {"ok": False, "reason": ..., "error": ...}.
    """
    if not ENABLE_VOICE_TRANSCRIPTION:
        return {"ok": False, "reason": "disabled", "error": "Voice transcription is disabled"}
    msg_type = "video note" if is_video_note else "voice message"
    logger.info(f"Processing {msg_type} from {sender} in {group_id} ({duration_seconds}s)")
    # 1. Download audio
    try:
        audio_bytes = await download_telegram_audio(bot, file_id)
    except Exception as e:
        logger.error(f"Failed to download audio from {sender}: {e}")
        return {"ok": False, "reason": "download_failed", "error": str(e)}
    # 2. Transcribe — video notes arrive as mp4, voice messages as ogg/opus
    filename = "audio.mp4" if is_video_note else "audio.ogg"
    transcription = await transcribe_audio(
        audio_bytes,
        filename=filename,
        duration_seconds=duration_seconds,
    )
    if not transcription["ok"]:
        logger.info(f"Transcription skipped for {sender}: {transcription['reason']}")
        return {"ok": False, "reason": transcription["reason"], "error": transcription.get("error", "")}
    transcript = transcription["transcript"]
    language = transcription.get("language", "unknown")
    # Bug fixed: the fallback branch used deprecated, NAIVE datetime.utcnow(),
    # producing ISO strings without a UTC offset while the message_date branch
    # produced aware ones — inconsistent timestamp formats in storage. Both
    # branches now emit timezone-aware UTC timestamps.
    timestamp = (
        message_date.replace(tzinfo=timezone.utc).isoformat()
        if message_date else datetime.now(timezone.utc).isoformat()
    )
    # 3. Store raw voice transcript signal (searchable even with 0 extractions)
    if VOICE_STORE_TRANSCRIPT:
        transcript_signal = build_voice_transcript_signal(
            transcript=transcript,
            sender=sender,
            group_id=group_id,
            voice_file_id=file_id,
            duration_seconds=duration_seconds,
            language=language,
            timestamp=timestamp,
        )
        store_signals(group_id, [transcript_signal])
        logger.info(f"Voice transcript stored for {sender} ({len(transcript)} chars)")
    # 4. Run through signal extraction pipeline — treat as a regular text message
    voice_meta = {
        "sender": sender,
        "voice_file_id": file_id,
        "duration_seconds": duration_seconds,
        "language": language,
    }
    messages = [{
        "sender": sender,
        "text": transcript,
        "timestamp": timestamp,
        "source": "voice",
        "voice_file_id": file_id,
        "voice_duration": duration_seconds,
    }]
    try:
        extracted_signals = await process_message_batch(group_id, messages)
        extracted_signals = _inject_voice_metadata(extracted_signals, voice_meta)
        signals_count = len(extracted_signals)
        # Fallback: if the LLM extracted nothing from a meaningful voice message
        # (>= 5 words), create a generic signal so the content is still
        # searchable as structured data.
        if signals_count == 0 and len(transcript.split()) >= 5:
            fallback = _build_fallback_signal(transcript, sender, group_id, timestamp, voice_meta)
            store_signals(group_id, [fallback])
            signals_count = 1
            logger.info(f"Voice fallback signal created for {sender} (0 from LLM)")
    except Exception as e:
        # Best-effort: extraction failure must not fail the whole pipeline —
        # the raw transcript signal (step 3) may already be stored.
        logger.error(f"Signal extraction failed for voice from {sender}: {e}")
        signals_count = 0
    logger.info(
        f"Voice pipeline complete: {sender}, {duration_seconds}s, "
        f"{signals_count} signals, transcript={len(transcript)} chars"
    )
    return {
        "ok": True,
        "transcript": transcript,
        "signals_extracted": signals_count,
        "duration": duration_seconds,
        "sender": f"@{sender}",
        "language": language,
    }