"""Core pipeline: message batch → signals → classified → stored → queryable."""

import asyncio
import logging
import math
from datetime import datetime, timezone

from backend.agents.signal_extractor import extract_signals
from backend.agents.classifier import classify_signal
from backend.agents.context_detector import detect_context
from backend.db.chroma import store_signals, query_signals
from backend.db.models import Signal

logger = logging.getLogger("thirdeye.pipeline")
_auto_raise_logger = logging.getLogger("thirdeye.pipeline.auto_raise")

# In-memory group config store (replace with Redis/DB for production)
_group_configs = {}

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected before
# it finishes.
_background_tasks: set = set()

# A result is "strong" when the top hit has high semantic similarity (≥ 0.40)
_STRONG_THRESHOLD = 0.40

# Only keywords that are clearly external / internet-specific trigger web search.
# Intentionally excludes personal/team words like "update", "current", "what is".
_WEB_KEYWORDS = (
    "latest news", "industry standard", "best practice", "benchmark",
    "security vulnerability", "cve", "public release", "changelog",
    "documentation for", "how to install", "npm package", "pypi",
)


async def detect_and_set_lens(group_id: str, messages_text: str) -> str:
    """Auto-detect the lens for a group from initial messages and remember it.

    Args:
        group_id: Key under which the detected lens is stored.
        messages_text: Text handed to ``detect_context``.

    Returns:
        The ``detected_lens`` value reported by ``detect_context``.
    """
    result = await detect_context(messages_text)
    _group_configs[group_id] = {
        "lens": result["detected_lens"],
        "confidence": result["confidence"],
    }
    logger.info(f"Group {group_id}: lens={result['detected_lens']} (conf={result['confidence']})")
    return result["detected_lens"]


def get_lens(group_id: str) -> str:
    """Get the current lens for a group (``"dev"`` when none is configured)."""
    return _group_configs.get(group_id, {}).get("lens", "dev")


def set_lens(group_id: str, lens: str) -> None:
    """Manually set the lens for a group; stored with confidence 1.0."""
    _group_configs[group_id] = {"lens": lens, "confidence": 1.0}


def _signal_as_dict(signal) -> dict:
    """Return *signal* as a plain dict (accepts a dict or an object with ``model_dump()``)."""
    if isinstance(signal, dict):
        return signal
    return signal.model_dump()


async def _auto_raise_and_notify(group_id: str, signals: list[dict]) -> None:
    """
    Background task: raise Jira tickets for critical signals and log results.

    Called automatically when JIRA_AUTO_RAISE=true in .env.
    Does NOT send Telegram messages (no bot context here) — check logs or /jiraraised.

    Best-effort: any failure is logged (with traceback) and swallowed so that a
    fire-and-forget task never surfaces an unhandled exception.
    """
    try:
        # Imported lazily, as in the rest of this module's optional integrations.
        from backend.agents.jira_agent import bulk_raise_for_group

        results = await bulk_raise_for_group(
            group_id=group_id,
            signals=signals,
            min_severity="high",
            max_tickets=5,
        )
        raised = [r for r in results if r.get("ok")]
        if raised:
            _auto_raise_logger.info(
                f"[Auto-raise] Group {group_id}: {len(raised)} ticket(s) raised — "
                + ", ".join(r.get("key", "?") for r in raised)
            )
    except Exception as e:
        _auto_raise_logger.exception(f"Auto-raise failed: {e}")


async def process_message_batch(group_id: str, messages: list[dict]) -> list[Signal]:
    """
    Process a batch of messages through the full pipeline.

    Args:
        group_id: Telegram group ID
        messages: List of {"sender": str, "text": str, "timestamp": str}

    Returns:
        List of stored Signal objects (empty when the batch is empty or no
        signals were extracted).
    """
    if not messages:
        # Nothing to analyse; in particular, do not pin a lens for the group
        # based on empty text.
        return []

    # Format messages for the LLM
    formatted = "\n".join(f"[{m['sender']}]: {m['text']}" for m in messages)

    # Get or detect lens
    if group_id in _group_configs:
        lens = get_lens(group_id)
    else:
        # First time seeing this group — auto-detect
        lens = await detect_and_set_lens(group_id, formatted)

    # Step 1: Extract signals
    signals = await extract_signals(formatted, group_id, lens=lens)
    if not signals:
        logger.info(f"No signals extracted from batch in {group_id}")
        return []

    # Step 2: Classify each signal (parallel for speed)
    classified_signals = await asyncio.gather(*[classify_signal(s) for s in signals])

    # Step 3: Store in ChromaDB
    store_signals(group_id, [s.model_dump() for s in classified_signals])

    # ─── Auto-raise Jira tickets for critical signals ─────────────────────────────
    from backend.config import JIRA_AUTO_RAISE, ENABLE_JIRA

    if ENABLE_JIRA and JIRA_AUTO_RAISE:
        # NOTE(review): severity is read from the extracted (pre-classification)
        # signals, as before — confirm that is the intended source.
        critical_signals = [
            sig
            for sig in map(_signal_as_dict, signals)
            if sig.get("severity", "low") in ("high", "critical")
        ]
        if critical_signals:
            task = asyncio.create_task(
                _auto_raise_and_notify(group_id, critical_signals)
            )
            # Hold a reference until the task completes, then drop it.
            _background_tasks.add(task)
            task.add_done_callback(_background_tasks.discard)

    logger.info(f"Pipeline complete: {len(classified_signals)} signals stored for {group_id}")
    return classified_signals


def _recency_boost(ts_str: str, now: datetime) -> float:
    """Return a score bonus for a signal timestamped *ts_str*, relative to *now*.

    Boost formula: +0.4 for brand-new, decays to ~0 after 7 days.
    Naive timestamps are treated as UTC; unparseable or missing ones get 0.0.
    """
    try:
        ts = datetime.fromisoformat(ts_str)
    except (TypeError, ValueError):
        return 0.0
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    # Clamp so timestamps in the future are not boosted above 0.4.
    age_hours = max(0, (now - ts).total_seconds() / 3600)
    return 0.4 * math.exp(-age_hours / 48)  # half-life ≈ 33 hours


async def query_knowledge(group_id: str, question: str, force_web_search: bool = False) -> str:
    """
    Query the knowledge base with natural language, with cross-group fallback and
    conservative web search (only when all internal sources fail).

    Flow:
      1. Search this group's knowledge base (ChromaDB)
      2. If results are weak, also search all other groups (cross-group fallback)
      3. Only hit the web if no internal knowledge is found AND question is clearly external
      4. LLM synthesizes the best available context into a final answer

    Args:
        group_id: Group whose collection is searched first.
        question: Natural-language question.
        force_web_search: When true (and web search is enabled), always add web results.

    Returns:
        The answer text followed by a "Sources" footer, or a fixed fallback
        message when no context is available or the LLM call fails.
    """
    from backend.providers import call_llm
    from backend.agents.web_search import search_web, format_search_results_for_llm
    from backend.config import ENABLE_WEB_SEARCH
    from backend.db.chroma import query_signals_global
    from backend.agents.query_agent import _format_signal_for_context, VOICE_CITATION_INSTRUCTION

    # ── Step 1: search this group's own collection ──────────────────────────────
    results = list(query_signals(group_id, question, n_results=8) or [])
    has_strong_local = bool(results) and results[0].get("relevance_score", 0) >= _STRONG_THRESHOLD

    # ── Step 2: cross-group fallback ────────────────────────────────────────────
    if not has_strong_local:
        cross_results = query_signals_global(question, n_results=8, exclude_group_id=group_id)
        # Combine: local results first, then cross-group ones (de-duplicated by document text)
        seen_docs = {r["document"] for r in results}
        for cr in cross_results or []:
            if cr["document"] not in seen_docs:
                results.append(cr)
                seen_docs.add(cr["document"])

    # ── Recency re-ranking ───────────────────────────────────────────────────────
    # Boost signals that are recent so a fresh update beats an older one on the same topic.
    now = datetime.now(timezone.utc)
    for r in results:
        ts = (r.get("metadata") or {}).get("timestamp", "")
        r["_ranked_score"] = r.get("relevance_score", 0) + _recency_boost(ts, now)
    results.sort(key=lambda x: x["_ranked_score"], reverse=True)

    # Re-evaluate strength after combining and re-ranking
    has_any_internal = bool(results) and results[0].get("relevance_score", 0) >= _STRONG_THRESHOLD

    # ── Build internal context ───────────────────────────────────────────────────
    # Results are already sorted by (relevance + recency). The first result is the
    # best match. We label it explicitly so even small fallback models can't miss it.
    internal_context = ""
    has_voice_signals = False
    if results:
        context_parts = []
        for i, r in enumerate(results):
            meta = r.get("metadata") or {}
            if meta.get("source") == "voice" or meta.get("type") == "voice_transcript":
                has_voice_signals = True

            # Rich source label using voice-aware formatter
            signal_label = _format_signal_for_context(r)

            rank_header = (
                "*** BEST MATCH (use this as your primary answer) ***\n"
                if i == 0
                else f"(supporting context {i+1})\n"
            )
            context_parts.append(
                f"{rank_header}"
                f"{signal_label}\n"
                f"Content: {r['document']}\n"
                f"Entities: {meta.get('entities', '[]')}"
            )
        internal_context = "\n\n---\n\n".join(context_parts)

    # ── Step 3: web search — only when all internal sources fail ────────────────
    question_lower = question.lower()
    wants_external = any(kw in question_lower for kw in _WEB_KEYWORDS)

    # Web search fires only when: explicitly forced, OR no internal knowledge at all
    # AND the question looks like it's asking about something external.
    should_search_web = ENABLE_WEB_SEARCH and (
        force_web_search or (not has_any_internal and wants_external)
    )

    web_context = ""
    used_web = False
    if should_search_web:
        web_results = await search_web(question, max_results=3)
        if web_results:
            web_context = format_search_results_for_llm(web_results)
            used_web = True

    # ── Step 4: build combined prompt ───────────────────────────────────────────
    if not internal_context and not web_context:
        return (
            "I don't have any information about that yet across all team chats. "
            "The relevant group may need more conversation, or try /search for external info."
        )

    combined_context = ""
    if internal_context:
        combined_context += (
            "=== INTERNAL KNOWLEDGE BASE (from team conversations & documents) ===\n\n"
            f"{internal_context}\n\n"
        )
    if web_context:
        combined_context += f"=== WEB SEARCH RESULTS ===\n\n{web_context}\n\n"

    system_prompt = """You are the Query Agent for ThirdEye. Answer the question using the context below.

The context is sorted: the BEST MATCH signal appears first and is your primary source.
Older or supporting signals appear after it — they may be outdated, so prefer the BEST MATCH.

RULES:
- Answer from the BEST MATCH signal first. Only use other signals as supporting context.
- Quote exact numbers, dates, and durations directly — never paraphrase them.
- If a signal has a "Quote:" field, that is the verbatim team message — treat it as ground truth.
- Signals from "other group" are still internal team knowledge.
- Be concise (2-3 sentences). Plain text only, no markdown headers."""

    if has_voice_signals:
        system_prompt += VOICE_CITATION_INSTRUCTION

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"Context:\n\n{combined_context}\n\nQuestion: {question}"},
    ]

    try:
        result = await call_llm("fast_large", messages, temperature=0.3, max_tokens=600)
        answer = result["content"]
    except Exception as e:
        logger.exception(f"Query agent failed: {e}")
        return "Sorry, I encountered an error while searching. Please try again."

    # At least one of these is set, otherwise we returned early above.
    sources = []
    if internal_context:
        sources.append("knowledge base")
    if used_web:
        sources.append("web search")
    answer += f"\n\n📌 Sources: {' + '.join(sources)}"
    return answer