"""
Meet Ingestor Agent
Processes raw Google Meet transcript chunks and extracts structured signals.

Signal types produced:
  meet_decision     — A decision made during the meeting
  meet_action_item  — A task assigned to someone
  meet_blocker      — A blocker or dependency raised
  meet_risk         — A risk or concern identified
  meet_open_q       — An unresolved question left open
  meet_summary      — Full meeting summary (emitted on is_final=True)
  meet_chunk_raw    — Raw transcript chunk (always stored, for full-text search)
"""
import asyncio
import json
import logging
import re
import uuid
from datetime import datetime

from backend.providers import call_llm
from backend.db.chroma import store_signals

logger = logging.getLogger("thirdeye.agents.meet_ingestor")

# Maximum number of characters of a quote persisted on a signal.
_RAW_QUOTE_LIMIT = 500

# Confidence levels that are good enough to be stored as a signal.
_ACCEPTED_CONFIDENCE = ("high", "medium")

# "@name" mentions inside free text.
_ENTITY_RE = re.compile(r"@[\w]+")

# Punctuation trimmed from both ends of a word before keyword filtering.
_KEYWORD_PUNCTUATION = ".,!?;:\"'"

_STOPWORDS = frozenset({
    "the", "a", "an", "is", "are", "was", "were", "will", "to", "of",
    "in", "on", "at", "for", "by", "with", "this", "that", "and", "or",
    "but", "we", "i", "it", "be", "do", "have", "has", "had", "not",
})


# ─── Extraction prompt ───────────────────────────────────────────────────────

EXTRACTION_SYSTEM_PROMPT = """You are an expert meeting analyst. You receive raw transcript chunks from a Google Meet recording and extract structured signals.

Extract ONLY signals that are clearly present. Do NOT hallucinate or infer beyond what is stated.

Return ONLY a valid JSON object with this exact structure:
{
  "decisions": [
    {"text": "...", "owner": "@name or null", "confidence": "high|medium|low"}
  ],
  "action_items": [
    {"text": "...", "owner": "@name or null", "due": "date string or null", "confidence": "high|medium|low"}
  ],
  "blockers": [
    {"text": "...", "blocking_what": "...", "confidence": "high|medium|low"}
  ],
  "risks": [
    {"text": "...", "severity": "high|medium|low", "confidence": "high|medium|low"}
  ],
  "open_questions": [
    {"text": "...", "confidence": "high|medium|low"}
  ]
}

Rules:
- If a category has nothing, use an empty array []
- owner must start with @ if it's a person's name (e.g. "@Alex")
- text must be a clear, standalone sentence — not a fragment
- Only include confidence "high" if the signal is unambiguous
- Do NOT reproduce filler words, pleasantries, or off-topic banter
- Return JSON only — no markdown, no preamble, no explanation"""

SUMMARY_SYSTEM_PROMPT = """You are a meeting intelligence expert. Given a full meeting transcript (possibly from multiple chunks), write a concise but complete meeting summary.

Structure your summary as:
1. One-sentence overview (what was the meeting about)
2. Key decisions made (bullet points, max 5)
3. Action items assigned (who does what by when)
4. Blockers or risks raised
5. Open questions still unresolved

Keep the summary under 400 words. Be specific. Use names when available. Do NOT use filler phrases like "the team discussed" — just state what was decided/agreed/assigned."""


# ─── Signal builder ─────────────────────────────────────────────────────────

def _build_signal(
    signal_type: str,
    summary: str,
    raw_quote: str,
    severity: str,
    entities: list[str],
    keywords: list[str],
    timestamp: str,
    group_id: str,
    meeting_id: str,
    urgency: str = "none",
    status: str = "open",
) -> dict:
    """Assemble one signal dict with a fresh UUID.

    ``raw_quote`` is truncated to ``_RAW_QUOTE_LIMIT`` characters (empty string
    when falsy). ``sentiment`` is always "neutral" and ``lens`` is always
    "meet"; every other field is copied from the arguments.
    """
    return {
        "id": str(uuid.uuid4()),
        "type": signal_type,
        "summary": summary,
        "raw_quote": raw_quote[:_RAW_QUOTE_LIMIT] if raw_quote else "",
        "severity": severity,
        "status": status,
        "sentiment": "neutral",
        "urgency": urgency,
        "entities": entities,
        "keywords": keywords,
        "timestamp": timestamp,
        "group_id": group_id,
        "lens": "meet",
        "meeting_id": meeting_id,
    }


def _extract_entities(text: str, owner: str | None = None) -> list[str]:
    """Extract entity strings from text (names starting with @).

    ``owner`` is added when it is a string starting with "@". The result is
    de-duplicated and keeps first-seen order.
    """
    entities = _ENTITY_RE.findall(text)
    if isinstance(owner, str) and owner.startswith("@"):
        entities.append(owner)
    return list(dict.fromkeys(entities))


def _extract_keywords(text: str) -> list[str]:
    """Simple keyword extraction: lowercase meaningful words.

    Returns at most 10 unique words, in first-seen order, that are longer
    than 3 characters and are not stopwords.
    """
    # Strip punctuation BEFORE filtering, otherwise tokens such as "the," or
    # "this." slip past the length/stopword checks and end up as keywords.
    stripped = (word.strip(_KEYWORD_PUNCTUATION) for word in text.lower().split())
    keywords = [w for w in stripped if len(w) > 3 and w not in _STOPWORDS]
    return list(dict.fromkeys(keywords))[:10]  # deduplicate, keep first 10


# ─── Extraction helpers ──────────────────────────────────────────────────────

def _parse_extraction(content: str) -> dict:
    """Parse the extraction LLM reply into a dict.

    Raises:
        ValueError: if the reply is not valid JSON (``json.JSONDecodeError`` is
            a ``ValueError``) or the decoded value is not a JSON object.
    """
    raw_json = content.strip()
    # Strip markdown code fences if present
    if raw_json.startswith("```"):
        raw_json = raw_json.split("```")[1]
        if raw_json.startswith("json"):
            raw_json = raw_json[4:]
    extracted = json.loads(raw_json)
    if not isinstance(extracted, dict):
        raise ValueError(
            f"expected a JSON object, got {type(extracted).__name__}"
        )
    return extracted


def _iter_items(extracted: dict, category: str, *, filter_confidence: bool = True):
    """Yield the usable items of one category of the extraction payload.

    Items that are not dicts or have no non-blank string ``text`` are skipped,
    so one malformed item cannot abort the whole chunk. With
    ``filter_confidence`` only "high"/"medium" confidence items are yielded.
    """
    items = extracted.get(category) or []  # tolerate a JSON null category
    if not isinstance(items, list):
        return
    for item in items:
        if not isinstance(item, dict):
            continue
        if filter_confidence and item.get("confidence") not in _ACCEPTED_CONFIDENCE:
            continue
        text = item.get("text")
        if isinstance(text, str) and text.strip():
            yield item


def _signals_from_extraction(
    extracted: dict,
    timestamp: str,
    group_id: str,
    meeting_id: str,
) -> list[dict]:
    """Convert the extraction payload into signals.

    Order is: decisions, action items, blockers, risks, open questions.
    """

    def make(signal_type, item, *, severity, summary=None, with_owner=False,
             urgency="none", status="open"):
        text = item["text"]
        owner = item.get("owner") if with_owner else None
        return _build_signal(
            signal_type=signal_type,
            summary=text if summary is None else summary,
            raw_quote=text,
            severity=severity,
            entities=_extract_entities(text, owner),
            keywords=_extract_keywords(text),
            timestamp=timestamp,
            group_id=group_id,
            meeting_id=meeting_id,
            urgency=urgency,
            status=status,
        )

    signals = []

    # Decisions
    for item in _iter_items(extracted, "decisions"):
        signals.append(make(
            "meet_decision", item,
            severity="medium", with_owner=True, status="decided",
        ))

    # Action items
    for item in _iter_items(extracted, "action_items"):
        due = item.get("due")
        due_str = f" Due: {due}." if due else ""
        signals.append(make(
            "meet_action_item", item,
            summary=f"{item['text']}{due_str}",
            severity="medium", with_owner=True,
            urgency="medium" if due else "low",
        ))

    # Blockers
    for item in _iter_items(extracted, "blockers"):
        signals.append(make(
            "meet_blocker", item, severity="high", urgency="high",
        ))

    # Risks
    # NOTE(review): unlike every other category, risks are not filtered by
    # confidence — kept as-is; confirm this is intentional.
    for item in _iter_items(extracted, "risks", filter_confidence=False):
        signals.append(make(
            "meet_risk", item,
            # A JSON null severity falls back to "medium" instead of None.
            severity=item.get("severity") or "medium",
            urgency="medium",
        ))

    # Open questions
    for item in _iter_items(extracted, "open_questions"):
        signals.append(make("meet_open_q", item, severity="low"))

    return signals


# ─── Main processing function ────────────────────────────────────────────────

async def process_meet_chunk(
    meeting_id: str,
    group_id: str,
    chunk_index: int,
    text: str,
    speaker: str,
    timestamp: str,
    is_final: bool,
) -> list[dict]:
    """
    Full pipeline for a transcript chunk:
    1. Always store raw chunk for full-text search
    2. Extract structured signals via LLM
    3. If is_final, generate a full meeting summary

    Returns the list of signals that were passed to ``store_signals``. An LLM
    failure or an unusable LLM reply is logged and only costs the structured
    signals; the raw chunk is still stored.
    """
    logger.info(
        "Processing meet chunk %s for meeting %s (%d chars)",
        chunk_index, meeting_id, len(text),
    )
    signals_to_store = []

    # 1. Always store the raw chunk (enables full-text similarity search later)
    raw_signal = _build_signal(
        signal_type="meet_chunk_raw",
        summary=f"[{meeting_id}] Chunk {chunk_index}: {text[:120]}...",
        raw_quote=text,
        severity="low",
        entities=[f"@{speaker}"] if speaker and speaker != "Unknown" else [],
        keywords=_extract_keywords(text),
        timestamp=timestamp,
        group_id=group_id,
        meeting_id=meeting_id,
    )
    signals_to_store.append(raw_signal)

    # 2. Extract structured signals via LLM
    try:
        result = await call_llm(
            task_type="fast_large",
            messages=[
                {"role": "system", "content": EXTRACTION_SYSTEM_PROMPT},
                {"role": "user", "content": f"Transcript chunk from speaker '{speaker}':\n\n{text}"},
            ],
            temperature=0.1,
            max_tokens=1500,
            response_format={"type": "json_object"},
        )
        extracted = _parse_extraction(result["content"])
    except Exception as e:
        # Best-effort boundary: extraction problems must not lose the raw chunk.
        logger.warning(
            "Meet extraction LLM failed for chunk %s: %s", chunk_index, e
        )
        extracted = {}

    signals_to_store.extend(
        _signals_from_extraction(extracted, timestamp, group_id, meeting_id)
    )

    # 3. If this is the final chunk, generate a meeting summary
    if is_final:
        summary_signal = await _generate_meeting_summary(
            meeting_id, group_id, text, speaker, timestamp
        )
        if summary_signal:
            signals_to_store.append(summary_signal)

    # Store everything
    if signals_to_store:
        store_signals(group_id, signals_to_store)
        logger.info(
            "Stored %d signals for meeting %s chunk %s",
            len(signals_to_store), meeting_id, chunk_index,
        )

    return signals_to_store


async def _generate_meeting_summary(
    meeting_id: str,
    group_id: str,
    final_chunk_text: str,
    speaker: str,
    timestamp: str,
) -> dict | None:
    """
    Pull all raw chunks for this meeting from ChromaDB and generate a summary.
    Falls back to summarizing just the final chunk if retrieval fails.

    Returns a ``meet_summary`` signal, or ``None`` when the summary LLM call
    fails.
    """
    from backend.db.chroma import query_signals

    full_transcript = ""
    try:
        # Get all raw chunks for this meeting
        raw_chunks = query_signals(
            group_id,
            meeting_id,
            n_results=50,
            signal_type="meet_chunk_raw",
        )
        parts = []
        for chunk in raw_chunks:
            metadata = chunk.get("metadata") or {}
            # Drop hits that explicitly belong to another meeting; hits with
            # no meeting_id in their metadata are kept.
            if metadata.get("meeting_id") not in (None, meeting_id):
                continue
            parts.append(metadata.get("raw_quote") or chunk.get("document") or "")
        full_transcript = "\n\n".join(parts)
    except Exception as e:
        logger.warning(
            "Raw chunk retrieval failed for meeting %s, "
            "summarizing final chunk only: %s",
            meeting_id, e,
        )
        full_transcript = ""

    if not full_transcript.strip():
        full_transcript = final_chunk_text
    elif final_chunk_text and final_chunk_text[:_RAW_QUOTE_LIMIT] not in full_transcript:
        # The caller stores the final chunk only after this function returns,
        # so it is not among the retrieved chunks yet — add it explicitly.
        full_transcript = f"{full_transcript}\n\n{final_chunk_text}"

    try:
        result = await call_llm(
            task_type="fast_large",
            messages=[
                {"role": "system", "content": SUMMARY_SYSTEM_PROMPT},
                {
                    "role": "user",
                    "content": f"Meeting ID: {meeting_id}\n\nFull transcript:\n\n{full_transcript[:6000]}",
                },
            ],
            temperature=0.3,
            max_tokens=600,
        )
        summary_text = result["content"].strip()
    except Exception as e:
        logger.warning("Meeting summary generation failed: %s", e)
        return None

    return _build_signal(
        signal_type="meet_summary",
        summary=summary_text,
        raw_quote=full_transcript[:_RAW_QUOTE_LIMIT],
        severity="medium",
        entities=[f"@{speaker}"] if speaker and speaker != "Unknown" else [],
        keywords=_extract_keywords(summary_text),
        timestamp=timestamp,
        group_id=group_id,
        meeting_id=meeting_id,
        status="completed",
    )