"""Pattern Detector Agent — finds trends and anomalies in accumulated signals."""

import json
import logging
from collections import Counter
from collections.abc import Mapping

from backend.providers import call_llm
from backend.db.chroma import get_all_signals
from backend.db.models import Pattern
from backend.agents.json_utils import extract_json_object

logger = logging.getLogger("thirdeye.agents.pattern_detector")

# NOTE: the prompt text (including its whitespace) is sent to the model verbatim;
# it is split across adjacent literals purely for source readability.
SYSTEM_PROMPT = (
    "You are the Pattern Detector for ThirdEye. "
    "You analyze accumulated signals to find patterns and anomalies. "
    "Detect these pattern types: "
    "- frequency_spike: A signal type mentioned significantly more than usual "
    "- knowledge_silo: Only one person discusses a critical topic (bus factor = 1) "
    "- recurring_issue: Same bug/problem appearing repeatedly "
    "- sentiment_trend: Gradual shift in tone over time "
    "- stale_item: Decisions proposed but never resolved, promises with no follow-up "
    "Respond ONLY with valid JSON (no markdown, no backticks): "
    '{"patterns": [{"type": "pattern_type", '
    '"description": "Clear human-readable description", '
    '"severity": "info|warning|critical", "evidence_ids": [], '
    '"recommendation": "Suggested action"}]} '
    'If no patterns found: {"patterns": []} '
    "Only report patterns that are genuinely concerning. \n"
    "Do NOT manufacture patterns from insufficient data."
)

# Signal types whose repetition the heuristic reports as a recurring issue.
_RECURRING_SIGNAL_TYPES = frozenset({"recurring_bug", "workaround", "tech_debt"})
# Substrings that mark an entity as payment-related for the knowledge-silo check.
_PAYMENT_KEYWORDS = ("stripe", "payment")
# A type/entity must be seen at least this many times to count as "repeated".
_MIN_REPEAT_COUNT = 2
# Upper bound on the number of patterns the heuristic fallback returns.
_MAX_HEURISTIC_PATTERNS = 5
# Fewer signals than this and detect_patterns() returns early without analysis.
_MIN_SIGNALS_FOR_DETECTION = 3
# Number of leading characters of each signal document shown to the LLM.
_DOCUMENT_PREVIEW_CHARS = 100


def _signal_metadata(signal: dict) -> Mapping:
    """Return the signal's metadata mapping, or an empty dict if absent/invalid.

    ``dict.get("metadata", {})`` still yields ``None`` when the key exists with
    a ``None`` value, so the result is type-checked instead of defaulted.
    """
    meta = signal.get("metadata")
    return meta if isinstance(meta, Mapping) else {}


def _normalize_entities(raw: object) -> list[str]:
    """Turn a metadata ``entities`` value into a list of non-empty, stripped names.

    Lists are used as-is. A string is decoded when it is a JSON list; any other
    string is kept as a single entity. Every other type yields no entities.
    """
    if isinstance(raw, str):
        text = raw.strip()
        decoded = None
        # Assumption (TODO confirm against the writer side): entities may be
        # stored as a JSON-encoded list, since detect_patterns() defaults the
        # field to the string '[]'. Non-JSON strings keep the old behaviour.
        if text.startswith("["):
            try:
                decoded = json.loads(text)
            except ValueError:
                decoded = None
        raw = decoded if isinstance(decoded, list) else [raw]
    if not isinstance(raw, list):
        return []
    names = (str(entity).strip() for entity in raw)
    return [name for name in names if name]


def _heuristic_detect_patterns(group_id: str, all_signals: list[dict]) -> list[Pattern]:
    """Generate conservative patterns from signal metadata when LLM output is unavailable.

    Args:
        group_id: Group identifier copied onto every produced ``Pattern``.
        all_signals: Signal dicts; only each signal's ``metadata`` mapping
            (keys ``type`` and ``entities``) is inspected.

    Returns:
        At most ``_MAX_HEURISTIC_PATTERNS`` patterns: one ``recurring_issue``
        per repeated signal type in ``_RECURRING_SIGNAL_TYPES``, followed by a
        single ``knowledge_silo`` pattern when a repeated entity name contains
        a payment keyword.
    """
    type_counts: Counter[str] = Counter()
    entity_counts: Counter[str] = Counter()
    for signal in all_signals:
        meta = _signal_metadata(signal)
        type_counts[str(meta.get("type", "unknown"))] += 1
        entity_counts.update(_normalize_entities(meta.get("entities", [])))

    # Counter preserves first-seen order, so patterns come out in signal order.
    patterns: list[Pattern] = [
        Pattern(
            group_id=group_id,
            type="recurring_issue",
            description=f"Signal type '{signal_type}' has appeared repeatedly ({count} times).",
            severity="warning",
            recommendation="Create a dedicated action item with owner and due date to stop repeated recurrence.",
        )
        for signal_type, count in type_counts.items()
        if count >= _MIN_REPEAT_COUNT and signal_type in _RECURRING_SIGNAL_TYPES
    ]

    repeated_entities = (
        name.lower() for name, count in entity_counts.items() if count >= _MIN_REPEAT_COUNT
    )
    if any(keyword in name for name in repeated_entities for keyword in _PAYMENT_KEYWORDS):
        patterns.append(Pattern(
            group_id=group_id,
            type="knowledge_silo",
            description="Critical payment-related topics are concentrated in repeated mentions, suggesting low bus factor.",
            severity="warning",
            recommendation="Document payment workflows and assign at least one backup owner.",
        ))

    return patterns[:_MAX_HEURISTIC_PATTERNS]


async def detect_patterns(group_id: str) -> list[Pattern]:
    """Analyze all signals in a group and detect patterns.

    Loads the group's signals, asks the "reasoning" LLM to identify patterns,
    and falls back to ``_heuristic_detect_patterns`` if the LLM call or the
    parsing of its response raises.

    Args:
        group_id: Identifier of the group whose signals are analyzed.

    Returns:
        The detected patterns; an empty list when the group has fewer than
        ``_MIN_SIGNALS_FOR_DETECTION`` signals.
    """
    all_signals = get_all_signals(group_id) or []
    if len(all_signals) < _MIN_SIGNALS_FOR_DETECTION:
        logger.info(
            "Not enough signals (%s) for pattern detection in %s",
            len(all_signals),
            group_id,
        )
        return []

    # Format signals for the LLM. Missing metadata/document must not abort the
    # run here, because this happens before the fallback-protected section.
    signal_summary = []
    for signal in all_signals:
        meta = _signal_metadata(signal)
        document = str(signal.get("document") or "")
        signal_summary.append(
            f"- [{meta.get('type', '?')}] {document[:_DOCUMENT_PREVIEW_CHARS]} "
            f"(severity={meta.get('severity', '?')}, entities={meta.get('entities', '[]')}, "
            f"time={meta.get('timestamp', '?')})"
        )
    signals_text = "\n".join(signal_summary)

    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"Analyze these {len(all_signals)} signals from group '{group_id}':\n\n{signals_text}"},
    ]

    try:
        result = await call_llm("reasoning", messages, temperature=0.2, max_tokens=1500)
        parsed = extract_json_object(result.get("content", ""))
        # Building Pattern objects stays inside the try so that malformed
        # entries in the LLM response also trigger the heuristic fallback.
        patterns = [
            Pattern(
                group_id=group_id,
                type=item.get("type", "unknown"),
                description=item.get("description", ""),
                severity=item.get("severity", "info"),
                recommendation=item.get("recommendation", ""),
            )
            for item in parsed.get("patterns", [])
        ]
    except Exception as exc:  # best-effort boundary: any LLM/parse failure falls back
        logger.warning("Pattern detection LLM parse issue, using fallback: %s", exc)
        fallback = _heuristic_detect_patterns(group_id, all_signals)
        if fallback:
            logger.info(
                "Pattern heuristic fallback produced %s patterns in %s",
                len(fallback),
                group_id,
            )
        return fallback

    logger.info("Detected %s patterns in %s", len(patterns), group_id)
    return patterns