# ThirdEye — Implementation Playbook (Milestone 0→100)

> **Rule: Do NOT skip milestones. Do NOT skip tests. Every test must PASS before moving to the next milestone.**
> **This document is designed to be followed by a human OR a coding agent sequentially.**

---

## MILESTONE 0: Project Scaffolding (0%)

**Goal:** Empty project with correct folder structure, all dependencies installed, all env vars configured.

### Step 0.1 — Create project structure

```bash
mkdir -p thirdeye/{backend/{agents,bot,db,mcp,api},dashboard,scripts,demo/demo_messages}
cd thirdeye
```

### Step 0.2 — Create requirements.txt

Create file: `thirdeye/requirements.txt`

```
python-telegram-bot==21.9
openai==1.82.0
chromadb==1.0.7
fastapi==0.115.12
uvicorn==0.34.2
cohere==5.15.0
python-dotenv==1.1.0
sentence-transformers==4.1.0
pydantic==2.11.2
httpx==0.28.1
```

### Step 0.3 — Create .env file

Create file: `thirdeye/.env`

```bash
# Telegram
TELEGRAM_BOT_TOKEN=your_telegram_bot_token_here

# LLM Providers (all free, no credit card needed)
GROQ_API_KEY=your_groq_key_here
CEREBRAS_API_KEY=your_cerebras_key_here
SAMBANOVA_API_KEY=your_sambanova_key_here
OPENROUTER_API_KEY=your_openrouter_key_here
GEMINI_API_KEY=your_gemini_key_here

# Embeddings
COHERE_API_KEY=your_cohere_key_here

# App Config
CHROMA_DB_PATH=./chroma_db
BATCH_SIZE=5
BATCH_TIMEOUT_SECONDS=60
```

### Step 0.4 — Create config.py

Create file: `thirdeye/backend/config.py`

```python
import os
from dotenv import load_dotenv

load_dotenv()

# Telegram
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")

# LLM Providers
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
CEREBRAS_API_KEY = os.getenv("CEREBRAS_API_KEY")
SAMBANOVA_API_KEY = os.getenv("SAMBANOVA_API_KEY")
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

# Embeddings
COHERE_API_KEY = os.getenv("COHERE_API_KEY")

# App
CHROMA_DB_PATH = os.getenv("CHROMA_DB_PATH", "./chroma_db")
BATCH_SIZE = int(os.getenv("BATCH_SIZE", "5"))
BATCH_TIMEOUT_SECONDS = int(os.getenv("BATCH_TIMEOUT_SECONDS", "60"))
```

### Step 0.5 — Install dependencies

```bash
cd thirdeye
pip install -r requirements.txt
```

### Step 0.6 — Get ALL API keys (do this NOW)

Open each URL in browser tabs simultaneously:

1. **Groq:** https://console.groq.com → Sign up → API Keys → Create
2. **Cerebras:** https://cloud.cerebras.ai → Sign up → Portal → API Keys
3. **SambaNova:** https://cloud.sambanova.ai → Sign up → API Keys
4. **OpenRouter:** https://openrouter.ai → Sign up → Dashboard → Keys → Create Key
5. **Cohere:** https://dashboard.cohere.com → Sign up → API Keys
6. **Telegram Bot:** Open Telegram → Search @BotFather → /newbot → Follow prompts → Copy token
7. **Google (last resort):** https://aistudio.google.com → Get API Key

Fill all values in `.env`.

### ✅ TEST MILESTONE 0

Create file: `thirdeye/scripts/test_m0.py`

```python
"""Test Milestone 0: Project structure and env vars."""
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from backend.config import (
    TELEGRAM_BOT_TOKEN, GROQ_API_KEY, CEREBRAS_API_KEY,
    SAMBANOVA_API_KEY, OPENROUTER_API_KEY, COHERE_API_KEY
)

checks = {
    "TELEGRAM_BOT_TOKEN": TELEGRAM_BOT_TOKEN,
    "GROQ_API_KEY": GROQ_API_KEY,
    "CEREBRAS_API_KEY": CEREBRAS_API_KEY,
    "SAMBANOVA_API_KEY": SAMBANOVA_API_KEY,
    "OPENROUTER_API_KEY": OPENROUTER_API_KEY,
    "COHERE_API_KEY": COHERE_API_KEY,
}

all_pass = True
for name, val in checks.items():
    status = "✅" if val and len(val) > 5 else "❌ MISSING"
    if "❌" in status:
        all_pass = False
    print(f" {status} {name}")

# Check directories exist
for d in ["backend/agents", "backend/bot", "backend/db", "backend/api", "dashboard", "scripts"]:
    path = os.path.join(os.path.dirname(__file__), '..', d)
    exists = os.path.isdir(path)
    status = "✅" if exists else "❌ MISSING"
    if not exists:
        all_pass = False
    print(f" {status} Directory: {d}")

print(f"\n{'🎉 MILESTONE 0 PASSED' if all_pass else '💥 MILESTONE 0 FAILED — fix issues above'}")
```

Run: `cd thirdeye && python scripts/test_m0.py`

**Expected output:** All ✅ checks, ending with "🎉 MILESTONE 0 PASSED"

---

## MILESTONE 1: Provider Router (10%)

**Goal:** Multi-provider LLM router that falls back across Groq → Cerebras → SambaNova → OpenRouter → Google. Verified working.

### Step 1.1 — Create the provider router

Create file: `thirdeye/backend/providers.py`

```python
"""Multi-provider LLM router with automatic fallback on rate limits."""
import asyncio
import logging
from openai import AsyncOpenAI
from backend.config import (
    GROQ_API_KEY, CEREBRAS_API_KEY, SAMBANOVA_API_KEY,
    OPENROUTER_API_KEY, GEMINI_API_KEY
)

logger = logging.getLogger("thirdeye.providers")

# Initialize provider clients
_clients = {}

def _init_client(name, base_url, api_key):
    if api_key and len(api_key) > 5:
        _clients[name] = AsyncOpenAI(base_url=base_url, api_key=api_key)

_init_client("groq", "https://api.groq.com/openai/v1", GROQ_API_KEY)
_init_client("cerebras", "https://api.cerebras.ai/v1", CEREBRAS_API_KEY)
_init_client("sambanova", "https://api.sambanova.ai/v1", SAMBANOVA_API_KEY)
_init_client("openrouter", "https://openrouter.ai/api/v1", OPENROUTER_API_KEY)
_init_client("google", "https://generativelanguage.googleapis.com/v1beta/openai/", GEMINI_API_KEY)

# Model registry — task_type → [(provider, model_id), ...]
MODEL_REGISTRY = {
    "fast_small": [
        ("groq", "llama-3.1-8b-instant"),
        ("cerebras", "llama-3.1-8b"),
        ("openrouter", "openai/gpt-oss-20b:free"),
    ],
    "fast_large": [
        ("groq", "llama-3.3-70b-versatile"),
        ("cerebras", "llama-3.3-70b"),
        ("openrouter", "meta-llama/llama-3.3-70b-instruct:free"),
        ("sambanova", "Meta-Llama-3.3-70B-Instruct"),
    ],
    "reasoning": [
        ("sambanova", "Meta-Llama-3.1-405B-Instruct"),
        ("openrouter", "nvidia/nemotron-3-super-120b-a12b:free"),
        ("openrouter", "openai/gpt-oss-120b:free"),
    ],
    "agentic": [
        ("openrouter", "minimax/minimax-m2.5:free"),
        ("openrouter", "nvidia/nemotron-3-super-120b-a12b:free"),
        ("groq", "llama-3.3-70b-versatile"),
    ],
    "fallback": [
        ("openrouter", "openrouter/free"),
        ("google", "gemini-2.0-flash"),
    ],
}

async def call_llm(
    task_type: str,
    messages: list,
    temperature: float = 0.3,
    max_tokens: int = 2000,
    response_format: dict = None,
) -> dict:
    """
    Route to best available provider with automatic fallback.

    Args:
        task_type: Key from MODEL_REGISTRY (fast_small, fast_large, reasoning, agentic)
        messages: OpenAI-format messages list
        temperature: Sampling temperature
        max_tokens: Max output tokens
        response_format: Optional {"type": "json_object"} for JSON mode

    Returns:
        {"content": str, "provider": str, "model": str}
    """
    candidates = MODEL_REGISTRY.get(task_type, []) + MODEL_REGISTRY["fallback"]
    errors = []

    for provider_name, model_id in candidates:
        client = _clients.get(provider_name)
        if not client:
            continue
        try:
            kwargs = {
                "model": model_id,
                "messages": messages,
                "temperature": temperature,
                "max_tokens": max_tokens,
                "timeout": 45,
            }
            if response_format and provider_name != "google":
                kwargs["response_format"] = response_format

            response = await client.chat.completions.create(**kwargs)
            content = response.choices[0].message.content
            logger.info(f"LLM call success: {provider_name}/{model_id} ({task_type})")
            return {
                "content": content,
                "provider": provider_name,
                "model": model_id,
            }
        except Exception as e:
            err = str(e).lower()
            is_rate_limit = any(k in err for k in ["429", "rate", "quota", "limit", "exceeded", "capacity"])
            is_timeout = "timeout" in err or "timed out" in err
            if is_rate_limit or is_timeout:
                logger.warning(f"Provider {provider_name}/{model_id} unavailable: {type(e).__name__}")
                errors.append(f"{provider_name}: rate limited")
                continue
            else:
                logger.error(f"Provider {provider_name}/{model_id} error: {e}")
                errors.append(f"{provider_name}: {e}")
                continue

    raise Exception(f"All LLM providers exhausted. Errors: {errors}")
```

### ✅ TEST MILESTONE 1

Create file: `thirdeye/scripts/test_m1.py`

```python
"""Test Milestone 1: Provider router works with at least one provider."""
import asyncio
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

async def test_providers():
    from backend.providers import call_llm

    test_messages = [
        {"role": "user", "content": "Reply with exactly: THIRDEYE_OK"}
    ]

    # Test 1: fast_small (should use Groq 8B)
    print("Testing fast_small (Groq 8B / Cerebras 8B)...")
    try:
        result = await call_llm("fast_small", test_messages, max_tokens=50)
        print(f" ✅ fast_small → {result['provider']}/{result['model']}")
        print(f" Response: {result['content'][:80]}")
    except Exception as e:
        print(f" ❌ fast_small failed: {e}")

    # Test 2: fast_large (should use Groq 70B)
    print("Testing fast_large (Groq/Cerebras 70B)...")
    try:
        result = await call_llm("fast_large", test_messages, max_tokens=50)
        print(f" ✅ fast_large → {result['provider']}/{result['model']}")
        print(f" Response: {result['content'][:80]}")
    except Exception as e:
        print(f" ❌ fast_large failed: {e}")

    # Test 3: reasoning (should use SambaNova 405B)
    print("Testing reasoning (SambaNova 405B / OpenRouter Nemotron)...")
    try:
        result = await call_llm("reasoning", test_messages, max_tokens=50)
        print(f" ✅ reasoning → {result['provider']}/{result['model']}")
        print(f" Response: {result['content'][:80]}")
    except Exception as e:
        print(f" ❌ reasoning failed: {e}")

    # Test 4: JSON mode
    print("Testing JSON mode...")
    try:
        json_messages = [
            {"role": "system", "content": "You respond only in valid JSON."},
            {"role": "user", "content": 'Return: {"status": "ok", "test": true}'},
        ]
        result = await call_llm("fast_small", json_messages, max_tokens=100)
        print(f" ✅ JSON mode → {result['provider']}/{result['model']}")
        print(f" Response: {result['content'][:120]}")
    except Exception as e:
        print(f" ❌ JSON mode failed: {e}")

    print("\n🎉 MILESTONE 1 PASSED — At least one provider works per task type")

asyncio.run(test_providers())
```

Run: `cd thirdeye && python scripts/test_m1.py`

**Expected:** At least one ✅ per task type. If a specific provider fails (429), that's fine — the fallback should catch it.

---

## MILESTONE 2: ChromaDB + Embeddings (20%)

**Goal:** ChromaDB running with Cohere embeddings. Can store and retrieve signals.

### Step 2.1 — Create embedding module

Create file: `thirdeye/backend/db/embeddings.py`

```python
"""Embedding provider with Cohere primary and local fallback."""
import cohere
import logging
from backend.config import COHERE_API_KEY

logger = logging.getLogger("thirdeye.embeddings")

_cohere_client = None
_local_model = None

def _get_cohere():
    global _cohere_client
    if _cohere_client is None and COHERE_API_KEY:
        _cohere_client = cohere.Client(COHERE_API_KEY)
    return _cohere_client

def _get_local_model():
    global _local_model
    if _local_model is None:
        from sentence_transformers import SentenceTransformer
        _local_model = SentenceTransformer("all-MiniLM-L6-v2")
        logger.info("Loaded local embedding model: all-MiniLM-L6-v2")
    return _local_model

def embed_texts(texts: list[str]) -> list[list[float]]:
    """Embed a list of texts.
    Tries Cohere first, falls back to local."""
    if not texts:
        return []

    # Try Cohere
    client = _get_cohere()
    if client:
        try:
            response = client.embed(
                texts=texts,
                model="embed-english-v3.0",
                input_type="search_document",
            )
            logger.info(f"Cohere embedded {len(texts)} texts")
            return [list(e) for e in response.embeddings]
        except Exception as e:
            logger.warning(f"Cohere embedding failed: {e}, falling back to local")

    # Fallback to local
    model = _get_local_model()
    embeddings = model.encode(texts).tolist()
    logger.info(f"Local model embedded {len(texts)} texts")
    return embeddings

def embed_query(text: str) -> list[float]:
    """Embed a single query text."""
    client = _get_cohere()
    if client:
        try:
            response = client.embed(
                texts=[text],
                model="embed-english-v3.0",
                input_type="search_query",
            )
            return list(response.embeddings[0])
        except Exception:
            pass
    model = _get_local_model()
    return model.encode([text]).tolist()[0]
```

### Step 2.2 — Create ChromaDB manager

Create file: `thirdeye/backend/db/chroma.py`

```python
"""ChromaDB setup and operations."""
import json
import uuid
import chromadb
import logging
from datetime import datetime
from backend.config import CHROMA_DB_PATH
from backend.db.embeddings import embed_texts, embed_query

logger = logging.getLogger("thirdeye.chroma")

# Initialize persistent client
_chroma_client = chromadb.PersistentClient(path=CHROMA_DB_PATH)

def get_collection(group_id: str) -> chromadb.Collection:
    """Get or create a collection for a specific group."""
    safe_name = f"ll_{group_id.replace('-', '_')}"
    # ChromaDB collection names: 3-63 chars, alphanumeric + underscores
    safe_name = safe_name[:63]
    return _chroma_client.get_or_create_collection(name=safe_name)

def store_signals(group_id: str, signals: list[dict]):
    """Store extracted signals in ChromaDB with embeddings."""
    if not signals:
        return
    collection = get_collection(group_id)

    documents = []
    metadatas = []
    ids = []
    for signal in signals:
        doc_text = f"{signal['type']}: {signal['summary']}"
        if signal.get('raw_quote'):
            doc_text += f" | Quote: {signal['raw_quote']}"
        documents.append(doc_text)
        metadatas.append({
            "type": signal.get("type", "unknown"),
            "severity": signal.get("severity", "low"),
            "status": signal.get("status", "unknown"),
            "sentiment": signal.get("sentiment", "neutral"),
            "urgency": signal.get("urgency", "none"),
            "entities": json.dumps(signal.get("entities", [])),
            "keywords": json.dumps(signal.get("keywords", [])),
            "raw_quote": signal.get("raw_quote", ""),
            "timestamp": signal.get("timestamp", datetime.utcnow().isoformat()),
            "group_id": group_id,
            "lens": signal.get("lens", "unknown"),
        })
        ids.append(signal.get("id", str(uuid.uuid4())))

    # Generate embeddings
    embeddings = embed_texts(documents)

    collection.add(
        documents=documents,
        metadatas=metadatas,
        embeddings=embeddings,
        ids=ids,
    )
    logger.info(f"Stored {len(signals)} signals for group {group_id}")

def query_signals(group_id: str, query: str, n_results: int = 10, signal_type: str = None) -> list[dict]:
    """Query the knowledge base with natural language."""
    collection = get_collection(group_id)
    query_embedding = embed_query(query)

    where_filter = None
    if signal_type:
        where_filter = {"type": signal_type}

    try:
        results = collection.query(
            query_embeddings=[query_embedding],
            n_results=min(n_results, collection.count() or 1),
            where=where_filter,
        )
    except Exception as e:
        logger.warning(f"Query failed: {e}")
        return []

    # Format results
    output = []
    if results and results["documents"]:
        for i, doc in enumerate(results["documents"][0]):
            meta = results["metadatas"][0][i] if results["metadatas"] else {}
            distance = results["distances"][0][i] if results["distances"] else None
            output.append({
                "document": doc,
                "metadata": meta,
                "relevance_score": 1 - (distance or 0),  # Convert distance to similarity
            })
    return output

def get_all_signals(group_id: str, signal_type: str = None) -> list[dict]:
    """Get all signals for a group (for pattern detection)."""
    collection = get_collection(group_id)
    count = collection.count()
    if count == 0:
        return []

    where_filter = {"type": signal_type} if signal_type else None
    try:
        results = collection.get(where=where_filter, limit=count)
    except Exception:
        results = collection.get(limit=count)

    output = []
    if results and results["documents"]:
        for i, doc in enumerate(results["documents"]):
            meta = results["metadatas"][i] if results["metadatas"] else {}
            output.append({"document": doc, "metadata": meta, "id": results["ids"][i]})
    return output

def get_group_ids() -> list[str]:
    """Get all group IDs that have collections."""
    collections = _chroma_client.list_collections()
    return [c.name.replace("ll_", "").replace("_", "-") for c in collections if c.name.startswith("ll_")]
```

### Step 2.3 — Create data models

Create file: `thirdeye/backend/db/models.py`

```python
"""Data models for ThirdEye."""
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime
import uuid

class Signal(BaseModel):
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    group_id: str
    lens: str = "unknown"  # dev, product, client, community
    type: str  # architecture_decision, tech_debt, etc.
    summary: str
    entities: list[str] = []
    severity: str = "low"  # low, medium, high, critical
    status: str = "unknown"  # proposed, decided, implemented, unresolved
    sentiment: str = "neutral"
    urgency: str = "none"
    raw_quote: str = ""
    source_messages: list[int] = []
    timestamp: str = Field(default_factory=lambda: datetime.utcnow().isoformat())
    keywords: list[str] = []

class Pattern(BaseModel):
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    group_id: str
    type: str  # frequency_spike, knowledge_silo, recurring_issue, sentiment_trend, stale_item
    description: str
    severity: str = "info"  # info, warning, critical
    evidence_signal_ids: list[str] = []
    recommendation: str = ""
    detected_at: str = Field(default_factory=lambda: datetime.utcnow().isoformat())
    is_active: bool = True

class CrossGroupInsight(BaseModel):
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    type: str  # blocked_handoff, conflicting_decision, information_silo, promise_reality_gap, duplicated_effort
    description: str
    group_a: dict = {}  # {name, group_id, evidence}
    group_b: dict = {}
    severity: str = "warning"
    recommendation: str = ""
    detected_at: str = Field(default_factory=lambda: datetime.utcnow().isoformat())
    is_resolved: bool = False

class GroupConfig(BaseModel):
    group_id: str
    group_name: str = ""
    lens_mode: str = "auto"  # auto, dev, product, client, community
    detected_lens: str = "unknown"
    confidence: float = 0.0
    is_active: bool = True
    message_count: int = 0
    signal_count: int = 0
```

### ✅ TEST MILESTONE 2

Create file: `thirdeye/scripts/test_m2.py`

```python
"""Test Milestone 2: ChromaDB + Embeddings working."""
import os, sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

def test_embeddings():
    print("Testing embeddings...")
    from backend.db.embeddings import embed_texts, embed_query

    texts = ["Let's use PostgreSQL for the database", "The timeout bug is happening again"]
    embeddings = embed_texts(texts)
    assert len(embeddings) == 2, f"Expected 2 embeddings, got {len(embeddings)}"
    assert len(embeddings[0]) > 10, f"Embedding too short: {len(embeddings[0])}"
    print(f" ✅ Embedded 2 texts, dimension={len(embeddings[0])}")

    query_emb = embed_query("database decision")
    assert len(query_emb) > 10
    print(f" ✅ Query embedding works, dimension={len(query_emb)}")

def test_chroma():
    print("Testing ChromaDB...")
    from backend.db.chroma import store_signals, query_signals, get_all_signals

    test_group = "test_group_m2"

    # Store test signals
    signals = [
        {
            "type": "architecture_decision",
            "summary": "Team decided to use PostgreSQL over MongoDB for relational data",
            "entities": ["@alex", "postgresql", "mongodb"],
            "severity": "medium",
            "status": "decided",
            "raw_quote": "Let's go with Postgres, MongoDB is overkill",
            "timestamp": "2026-03-20T10:00:00Z",
            "lens": "dev",
        },
        {
            "type": "tech_debt",
            "summary": "API URL hardcoded instead of using environment variables",
            "entities": ["@priya", "api_url"],
            "severity": "low",
            "status": "unresolved",
            "raw_quote": "Just hardcoding the URL for now",
            "timestamp": "2026-03-20T14:00:00Z",
            "lens": "dev",
        },
        {
            "type": "recurring_bug",
            "summary": "Timeout error occurring repeatedly in payment service",
            "entities": ["payment_service", "timeout"],
            "severity": "high",
            "status": "unresolved",
            "raw_quote": "Timeout error is back again",
            "timestamp": "2026-03-21T09:00:00Z",
            "lens": "dev",
        },
    ]
    store_signals(test_group, signals)
    print(f" ✅ Stored {len(signals)} signals")

    # Query
    results = query_signals(test_group, "database decision")
    assert len(results) > 0, "No results for 'database decision'"
    assert "postgres" in results[0]["document"].lower() or "database" in results[0]["document"].lower()
    print(f" ✅ Query 'database decision' returned {len(results)} results")
    print(f" Top result: {results[0]['document'][:80]}")

    # Query with type filter
    results2 = query_signals(test_group, "bug", signal_type="recurring_bug")
    assert len(results2) > 0, "No results for type=recurring_bug"
    print(f" ✅ Filtered query (type=recurring_bug) returned {len(results2)} results")

    # Get all
    all_sigs = get_all_signals(test_group)
    assert len(all_sigs) >= 3, f"Expected >=3 signals, got {len(all_sigs)}"
    print(f" ✅ get_all_signals returned {len(all_sigs)} signals")

    # Cleanup test collection
    import chromadb
    from backend.config import CHROMA_DB_PATH
    client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
    try:
        client.delete_collection(f"ll_{test_group}")
        print(f" ✅ Cleaned up test collection")
    except Exception:
        pass

def test_models():
    print("Testing data models...")
    from backend.db.models import Signal, Pattern, CrossGroupInsight

    s = Signal(group_id="test", type="tech_debt", summary="Test signal")
    assert s.id is not None
    assert s.severity == "low"
    print(f" ✅ Signal model works (id={s.id[:8]}...)")

    p = Pattern(group_id="test", type="frequency_spike", description="Test pattern")
    assert p.is_active == True
    print(f" ✅ Pattern model works")

    c = CrossGroupInsight(type="blocked_handoff", description="Test insight")
    assert c.is_resolved == False
    print(f" ✅ CrossGroupInsight model works")

test_embeddings()
test_chroma()
test_models()
print("\n🎉 MILESTONE 2 PASSED — ChromaDB + Embeddings working")
```

Run: `cd thirdeye && python scripts/test_m2.py`

---

## MILESTONE 3: Signal Extractor Agent (30%)

**Goal:** The core agent that processes batched messages and extracts structured signals. Verified with real LLM calls.

### Step 3.1 — Create the Signal Extractor Agent

Create file: `thirdeye/backend/agents/signal_extractor.py`

```python
"""Signal Extractor Agent — extracts structured signals from chat messages."""
import json
import logging
from backend.providers import call_llm
from backend.db.models import Signal
from datetime import datetime

logger = logging.getLogger("thirdeye.agents.signal_extractor")

# Lens-specific system prompts
LENS_PROMPTS = {
    "dev": """You are the Signal Extractor for ThirdEye operating in DevLens mode.
You analyze batches of developer team chat messages and extract STRUCTURED SIGNALS.
Extract ONLY signals that represent meaningful technical information.
Skip greetings, small talk, emoji reactions, and meta-conversation.

Signal types to look for:
- architecture_decision: Technology choices, design decisions with rationale
- tech_debt: Shortcuts, hardcoded values, "will fix later" patterns
- knowledge_silo_evidence: Only one person discusses a critical topic
- recurring_bug: Same issue mentioned repeatedly
- stack_decision: Technology/framework choices (proposed or decided)
- deployment_risk: Risky deployment practices
- workaround: Temporary fixes being applied repeatedly

For EACH signal found, include it in the JSON array.
If NO meaningful signals exist, return empty array.
Be SELECTIVE. Quality over quantity.""",

    "product": """You are the Signal Extractor for ThirdEye operating in ProductLens mode.

Signal types to look for:
- feature_request: Features users or team members are asking for
- user_pain_point: User difficulties, complaints, confusion
- roadmap_drift: Discussion of topics not on the current plan
- priority_conflict: Team members disagreeing on what's most important
- metric_mention: Specific numbers, conversion rates, performance data
- user_quote: Direct quotes from users/customers
- competitor_intel: Mentions of competitor actions or features

Be SELECTIVE. Quality over quantity.""",

    "client": """You are the Signal Extractor for ThirdEye operating in ClientLens mode.
Signal types to look for:
- promise: Commitments made with deadlines (explicit or implicit)
- scope_creep: Additional requests introduced casually without formal change requests
- sentiment_signal: Tone changes (positive praise, growing frustration, formality shifts)
- unanswered_request: Questions or requests that haven't received responses
- satisfaction: Explicit positive or negative feedback
- escalation_risk: Mentions of involving management, expressing deadline concerns
- client_decision: Decisions made by the client

Pay SPECIAL attention to implicit deadlines ("by end of week", "before the meeting").
Be SELECTIVE. Quality over quantity.""",

    "community": """You are the Signal Extractor for ThirdEye operating in CommunityLens mode.

Signal types: recommendation, event, issue, local_knowledge, question

Be SELECTIVE. Quality over quantity.""",
}

EXTRACTION_FORMAT = """
Respond ONLY with valid JSON in this exact format (no markdown, no backticks, no explanation):
{"signals": [{"type": "signal_type_here", "summary": "One clear sentence", "entities": ["@person", "technology"], "severity": "low|medium|high|critical", "status": "proposed|decided|implemented|unresolved", "raw_quote": "Short key phrase from message", "message_index": 0}]}

If no signals found: {"signals": []}
"""

async def extract_signals(messages_text: str, group_id: str, lens: str = "dev") -> list[Signal]:
    """
    Extract structured signals from a batch of formatted chat messages.
    Args:
        messages_text: Formatted string like "[Alex]: Let's use Redis\\n[Bob]: Agreed"
        group_id: Telegram group ID
        lens: Active lens mode (dev, product, client, community)

    Returns:
        List of Signal objects
    """
    system_prompt = LENS_PROMPTS.get(lens, LENS_PROMPTS["dev"])

    messages = [
        {"role": "system", "content": system_prompt + "\n\n" + EXTRACTION_FORMAT},
        {"role": "user", "content": f"Extract signals from these messages:\n\n{messages_text}"},
    ]

    try:
        result = await call_llm("fast_large", messages, temperature=0.2, max_tokens=2000)
        content = result["content"].strip()

        # Clean up common LLM response issues
        if content.startswith("```"):
            content = content.split("```")[1]
        if content.startswith("json"):
            content = content[4:]
        content = content.strip()

        parsed = json.loads(content)
        raw_signals = parsed.get("signals", [])

        # Convert to Signal objects
        signals = []
        for raw in raw_signals:
            try:
                signal = Signal(
                    group_id=group_id,
                    lens=lens,
                    type=raw.get("type", "unknown"),
                    summary=raw.get("summary", ""),
                    entities=raw.get("entities", []),
                    severity=raw.get("severity", "low"),
                    status=raw.get("status", "unknown"),
                    raw_quote=raw.get("raw_quote", ""),
                    timestamp=datetime.utcnow().isoformat(),
                )
                signals.append(signal)
            except Exception as e:
                logger.warning(f"Failed to parse signal: {e}")
                continue

        logger.info(f"Extracted {len(signals)} signals from {group_id} (lens={lens}) via {result['provider']}")
        return signals

    except json.JSONDecodeError as e:
        logger.error(f"JSON parse error in signal extraction: {e}")
        return []
    except Exception as e:
        logger.error(f"Signal extraction failed: {e}")
        return []
```

### Step 3.2 — Create the Classifier Agent

Create file: `thirdeye/backend/agents/classifier.py`

```python
"""Classifier Agent — adds metadata tags to extracted signals."""
import json
import logging
from backend.providers import call_llm
from backend.db.models import Signal

logger = logging.getLogger("thirdeye.agents.classifier")

SYSTEM_PROMPT = """You are a fast metadata classifier.
Given an extracted signal, add classification tags.

Respond ONLY with valid JSON (no markdown, no backticks):
{"sentiment": "positive|neutral|negative|urgent", "urgency": "none|low|medium|high|critical", "keywords": ["3-5 searchable keywords"]}
"""

async def classify_signal(signal: Signal) -> Signal:
    """Add classification metadata to a signal."""
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"Classify this signal:\nType: {signal.type}\nSummary: {signal.summary}\nQuote: {signal.raw_quote}"},
    ]

    try:
        result = await call_llm("fast_small", messages, temperature=0.1, max_tokens=200)
        content = result["content"].strip()
        if content.startswith("```"):
            content = content.split("```")[1]
        if content.startswith("json"):
            content = content[4:]
        content = content.strip()

        parsed = json.loads(content)
        signal.sentiment = parsed.get("sentiment", signal.sentiment)
        signal.urgency = parsed.get("urgency", signal.urgency)
        signal.keywords = parsed.get("keywords", signal.keywords)
    except Exception as e:
        logger.warning(f"Classification failed, using defaults: {e}")
        # Keep defaults — classification failure is non-fatal

    return signal
```

### Step 3.3 — Create the Context Detector Agent

Create file: `thirdeye/backend/agents/context_detector.py`

```python
"""Context Detector Agent — auto-classifies group type from messages."""
import json
import logging
from backend.providers import call_llm

logger = logging.getLogger("thirdeye.agents.context_detector")

SYSTEM_PROMPT = """You analyze a batch of messages from a Telegram group and determine what TYPE of group this is.
CLASSIFY into exactly ONE:
- "dev" — Software engineering team (code, PRs, deployments, bugs, tech stack)
- "product" — Product/business team (features, users, metrics, roadmap, competitors)
- "client" — Client/agency channel (deliverables, timelines, approvals, invoices)
- "community" — Community/interest group (recommendations, events, local info, casual)

Respond ONLY with valid JSON (no markdown, no backticks):
{"detected_lens": "dev|product|client|community", "confidence": 0.0-1.0, "evidence": ["signal1", "signal2", "signal3"]}
"""

async def detect_context(messages_text: str) -> dict:
    """Detect group type from a batch of messages."""
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"Classify this group based on these messages:\n\n{messages_text}"},
    ]

    try:
        result = await call_llm("reasoning", messages, temperature=0.1, max_tokens=300)
        content = result["content"].strip()
        if content.startswith("```"):
            content = content.split("```")[1]
        if content.startswith("json"):
            content = content[4:]
        parsed = json.loads(content.strip())
        return {
            "detected_lens": parsed.get("detected_lens", "dev"),
            "confidence": parsed.get("confidence", 0.5),
            "evidence": parsed.get("evidence", []),
        }
    except Exception as e:
        logger.error(f"Context detection failed: {e}")
        return {"detected_lens": "dev", "confidence": 0.0, "evidence": ["detection_failed"]}
```

### ✅ TEST MILESTONE 3

Create file: `thirdeye/scripts/test_m3.py`

```python
"""Test Milestone 3: Signal Extractor, Classifier, and Context Detector working."""
import asyncio, os, sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

DEV_CHAT = """[Alex]: Hey team, I think we should go with PostgreSQL for the main DB. MongoDB is overkill for our relational data.
[Priya]: Agreed on Postgres. I'll set up the schema today.
[Raj]: Payment module webhook integration is looking tricky. I'll handle it myself since I know the Stripe API best.
[Alex]: I'm just gonna hardcode the API URL for now, we'll add env vars when we dockerize.
[Sam]: The timeout error on the checkout endpoint is happening again. Third time this week.
[Alex]: Just restart the pod for now, I'll look at it after the sprint."""

PRODUCT_CHAT = """[Lisa]: Users keep asking about dark mode, it comes up in every demo.
[Mike]: I think we should prioritize the mobile app over the API this sprint.
[Sarah]: No way, API stability is way more important. Two enterprise clients complained last week.
[Lisa]: Sarah from Acme literally said 'I would pay double if you had SSO integration.'
[Mike]: Competitor X just launched a mobile-first version. We're falling behind."""

async def test_signal_extractor():
    from backend.agents.signal_extractor import extract_signals

    print("Testing Signal Extractor (DevLens)...")
    signals = await extract_signals(DEV_CHAT, "test-dev", lens="dev")
    print(f" Extracted {len(signals)} signals:")
    for s in signals:
        print(f" - [{s.type}] {s.summary[:70]}...")
    assert len(signals) >= 2, f"Expected >=2 signals, got {len(signals)}"
    print(f" ✅ DevLens extraction working ({len(signals)} signals)")

    print("\nTesting Signal Extractor (ProductLens)...")
    signals2 = await extract_signals(PRODUCT_CHAT, "test-product", lens="product")
    print(f" Extracted {len(signals2)} signals:")
    for s in signals2:
        print(f" - [{s.type}] {s.summary[:70]}...")
    assert len(signals2) >= 2, f"Expected >=2 signals, got {len(signals2)}"
    print(f" ✅ ProductLens extraction working ({len(signals2)} signals)")

async def test_classifier():
    from backend.agents.signal_extractor import extract_signals
    from backend.agents.classifier import classify_signal

    print("\nTesting Classifier Agent...")
    signals = await extract_signals(DEV_CHAT, "test-classify", lens="dev")
    if signals:
        classified = await classify_signal(signals[0])
        print(f" Signal: {classified.summary[:60]}")
        print(f" Sentiment: {classified.sentiment}, Urgency: {classified.urgency}")
        print(f" Keywords: {classified.keywords}")
        print(f" ✅ Classifier working")
    else:
        print(f" ⚠️ No signals to classify (extractor returned empty)")

async def test_context_detector():
    from backend.agents.context_detector import detect_context

    print("\nTesting Context Detector...")
    result = await detect_context(DEV_CHAT)
    print(f" Detected: {result['detected_lens']} (confidence: {result['confidence']})")
    print(f" Evidence: {result['evidence']}")
    assert result["detected_lens"] == "dev", f"Expected 'dev', got '{result['detected_lens']}'"
    print(f" ✅ Correctly detected as 'dev'")

    result2 = await detect_context(PRODUCT_CHAT)
    print(f" Detected: {result2['detected_lens']} (confidence: {result2['confidence']})")
    assert result2["detected_lens"] == "product", f"Expected 'product', got '{result2['detected_lens']}'"
    print(f" ✅ Correctly detected as 'product'")

async def main():
    await test_signal_extractor()
    await test_classifier()
    await test_context_detector()
    print("\n🎉 MILESTONE 3 PASSED — Core agents working")

asyncio.run(main())
```

Run: `cd thirdeye && python scripts/test_m3.py`

**Expected:** Signals extracted from both dev and product chat. Classifier adds metadata. Context detector correctly identifies "dev" and "product".

---

## MILESTONE 4: Full Pipeline Integration (45%)

**Goal:** End-to-end pipeline: Messages → Extract → Classify → Store in ChromaDB → Query back. No Telegram yet.
### Step 4.1 — Create the pipeline orchestrator Create file: `thirdeye/backend/pipeline.py` ```python """Core pipeline: message batch → signals → classified → stored → queryable.""" import asyncio import logging from backend.agents.signal_extractor import extract_signals from backend.agents.classifier import classify_signal from backend.agents.context_detector import detect_context from backend.db.chroma import store_signals, query_signals from backend.db.models import Signal logger = logging.getLogger("thirdeye.pipeline") # In-memory group config store (replace with Redis/DB for production) _group_configs = {} async def detect_and_set_lens(group_id: str, messages_text: str) -> str: """Auto-detect lens for a group from initial messages.""" result = await detect_context(messages_text) _group_configs[group_id] = { "lens": result["detected_lens"], "confidence": result["confidence"], } logger.info(f"Group {group_id}: lens={result['detected_lens']} (conf={result['confidence']})") return result["detected_lens"] def get_lens(group_id: str) -> str: """Get the current lens for a group.""" config = _group_configs.get(group_id, {}) return config.get("lens", "dev") def set_lens(group_id: str, lens: str): """Manually set the lens for a group.""" _group_configs[group_id] = {"lens": lens, "confidence": 1.0} async def process_message_batch(group_id: str, messages: list[dict]) -> list[Signal]: """ Process a batch of messages through the full pipeline. 
Args: group_id: Telegram group ID messages: List of {"sender": str, "text": str, "timestamp": str} Returns: List of stored Signal objects """ # Format messages for the LLM formatted = "\n".join([f"[{m['sender']}]: {m['text']}" for m in messages]) # Get or detect lens lens = get_lens(group_id) if lens == "dev" and group_id not in _group_configs: # First time seeing this group — auto-detect lens = await detect_and_set_lens(group_id, formatted) # Step 1: Extract signals signals = await extract_signals(formatted, group_id, lens=lens) if not signals: logger.info(f"No signals extracted from batch in {group_id}") return [] # Step 2: Classify each signal (parallel for speed) classified_signals = await asyncio.gather(*[classify_signal(s) for s in signals]) # Step 3: Store in ChromaDB store_signals(group_id, [s.model_dump() for s in classified_signals]) logger.info(f"Pipeline complete: {len(classified_signals)} signals stored for {group_id}") return classified_signals async def query_knowledge(group_id: str, question: str) -> str: """Query the knowledge base with natural language and get a formatted answer.""" from backend.providers import call_llm # Retrieve relevant signals results = query_signals(group_id, question, n_results=8) if not results: return "I don't have any information about that in the knowledge base yet. The group needs more conversation for me to learn from." # Format context for the LLM context_parts = [] for i, r in enumerate(results): meta = r["metadata"] context_parts.append( f"[Signal {i+1}] Type: {meta.get('type', 'unknown')} | " f"Severity: {meta.get('severity', 'unknown')} | " f"Time: {meta.get('timestamp', 'unknown')}\n" f"Content: {r['document']}\n" f"Entities: {meta.get('entities', '[]')}" ) context = "\n\n".join(context_parts) messages = [ {"role": "system", "content": """You are the Query Agent for ThirdEye. Answer questions about the team's chat history using ONLY the provided context. RULES: 1. Answer ONLY from the provided context. 
Never make up information. 2. Cite sources: "Based on a discussion involving @person on [date]..." 3. If info doesn't exist in context, say so clearly. 4. Be concise — 2-4 sentences unless more is needed. 5. Format for Telegram (plain text, no markdown headers)."""}, {"role": "user", "content": f"Context from knowledge base:\n\n{context}\n\nQuestion: {question}"}, ] try: result = await call_llm("fast_large", messages, temperature=0.3, max_tokens=500) return result["content"] except Exception as e: logger.error(f"Query agent failed: {e}") return "Sorry, I encountered an error while searching the knowledge base. Please try again." ``` ### ✅ TEST MILESTONE 4 Create file: `thirdeye/scripts/test_m4.py` ```python """Test Milestone 4: Full pipeline — extract → classify → store → query.""" import asyncio, os, sys sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) DEV_MESSAGES = [ {"sender": "Alex", "text": "Hey team, I think we should go with PostgreSQL for the main DB. MongoDB is overkill.", "timestamp": "2026-03-20T10:00:00Z"}, {"sender": "Priya", "text": "Agreed. I'll set up the Postgres schema today.", "timestamp": "2026-03-20T10:05:00Z"}, {"sender": "Raj", "text": "Payment webhook integration is tricky. I'll handle all the Stripe stuff since I know it best.", "timestamp": "2026-03-20T11:00:00Z"}, {"sender": "Alex", "text": "I'm just hardcoding the API URL for now. We'll fix it with env vars later.", "timestamp": "2026-03-20T14:00:00Z"}, {"sender": "Sam", "text": "The timeout error on checkout is back. Third time this week.", "timestamp": "2026-03-21T09:00:00Z"}, {"sender": "Alex", "text": "Just restart the pod when it happens. 
I'll investigate after the sprint.", "timestamp": "2026-03-21T09:15:00Z"}, ] async def main(): from backend.pipeline import process_message_batch, query_knowledge group_id = "test_pipeline_m4" # Step 1: Process messages through full pipeline print("Processing message batch through full pipeline...") signals = await process_message_batch(group_id, DEV_MESSAGES) print(f" ✅ Pipeline produced {len(signals)} signals:") for s in signals: print(f" [{s.type}] {s.summary[:70]} (severity={s.severity}, sentiment={s.sentiment})") assert len(signals) >= 2, f"Expected >=2 signals, got {len(signals)}" # Step 2: Query the knowledge base print("\nQuerying: 'What database did the team choose?'") answer = await query_knowledge(group_id, "What database did the team choose?") print(f" Answer: {answer}") assert len(answer) > 20, "Answer too short" print(f" ✅ Query agent produced meaningful answer") print("\nQuerying: 'What tech debt exists?'") answer2 = await query_knowledge(group_id, "What tech debt exists?") print(f" Answer: {answer2}") print(f" ✅ Tech debt query works") print("\nQuerying: 'What bugs have been reported?'") answer3 = await query_knowledge(group_id, "What bugs or issues keep recurring?") print(f" Answer: {answer3}") print(f" ✅ Bug query works") # Cleanup import chromadb from backend.config import CHROMA_DB_PATH client = chromadb.PersistentClient(path=CHROMA_DB_PATH) try: client.delete_collection(f"ll_{group_id}") except: pass print("\n🎉 MILESTONE 4 PASSED — Full pipeline working end to end") asyncio.run(main()) ``` Run: `cd thirdeye && python scripts/test_m4.py` **Expected:** Signals extracted, classified, stored in ChromaDB, and queries return meaningful answers about the team's database decision, tech debt, and bugs. --- ## MILESTONE 5: Pattern Detector + Cross-Group Analyst (60%) **Goal:** Pattern detection within a group and cross-group blind spot detection working. 
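The Pattern Detector below sends every accumulated signal to the LLM on each run. A cheap local pre-check with `collections.Counter` can flag the two count-based patterns (frequency spikes, bus-factor-1 silos) first, so the reasoning model only runs when the numbers look interesting. This is an illustrative sketch, not a playbook file; it assumes a comma-separated `entities` string and a `sender` field in metadata for simplicity (the playbook stores entities as a JSON string):

```python
"""Hypothetical pre-filter: cheap local statistics before the LLM pattern pass."""
from collections import Counter, defaultdict


def local_pattern_hints(signals: list[dict]) -> list[str]:
    """Return hint strings for patterns that are visible from counts alone."""
    hints = []
    type_counts = Counter(s["metadata"].get("type", "other") for s in signals)
    # Frequency spike: one signal type dominates the accumulated set.
    for sig_type, n in type_counts.items():
        if n >= 3 and n / max(len(signals), 1) > 0.4:
            hints.append(f"frequency_spike:{sig_type}")
    # Knowledge silo: an entity only ever co-occurs with one person.
    # (Assumes comma-separated entities and a "sender" key, for illustration.)
    people_by_entity = defaultdict(set)
    for s in signals:
        for ent in s["metadata"].get("entities", "").split(","):
            ent = ent.strip()
            if ent:
                people_by_entity[ent].add(s["metadata"].get("sender", "?"))
    for ent, people in people_by_entity.items():
        if len(people) == 1:
            hints.append(f"knowledge_silo:{ent}")
    return hints
```

The hints could be appended to the Pattern Detector's user prompt as candidate leads, keeping the LLM responsible for judging whether they are genuinely concerning.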
### Step 5.1 — Create Pattern Detector Agent Create file: `thirdeye/backend/agents/pattern_detector.py` ```python """Pattern Detector Agent — finds trends and anomalies in accumulated signals.""" import json import logging from backend.providers import call_llm from backend.db.chroma import get_all_signals from backend.db.models import Pattern logger = logging.getLogger("thirdeye.agents.pattern_detector") SYSTEM_PROMPT = """You are the Pattern Detector for ThirdEye. You analyze accumulated signals to find patterns and anomalies. Detect these pattern types: - frequency_spike: A signal type mentioned significantly more than usual - knowledge_silo: Only one person discusses a critical topic (bus factor = 1) - recurring_issue: Same bug/problem appearing repeatedly - sentiment_trend: Gradual shift in tone over time - stale_item: Decisions proposed but never resolved, promises with no follow-up Respond ONLY with valid JSON (no markdown, no backticks): {"patterns": [{"type": "pattern_type", "description": "Clear human-readable description", "severity": "info|warning|critical", "evidence_ids": [], "recommendation": "Suggested action"}]} If no patterns found: {"patterns": []} Only report patterns that are genuinely concerning. 
Do NOT manufacture patterns from insufficient data.""" async def detect_patterns(group_id: str) -> list[Pattern]: """Analyze all signals in a group and detect patterns.""" all_signals = get_all_signals(group_id) if len(all_signals) < 3: logger.info(f"Not enough signals ({len(all_signals)}) for pattern detection in {group_id}") return [] # Format signals for the LLM signal_summary = [] for s in all_signals: meta = s["metadata"] signal_summary.append( f"- [{meta.get('type', '?')}] {s['document'][:100]} " f"(severity={meta.get('severity', '?')}, entities={meta.get('entities', '[]')}, " f"time={meta.get('timestamp', '?')})" ) signals_text = "\n".join(signal_summary) messages = [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": f"Analyze these {len(all_signals)} signals from group '{group_id}':\n\n{signals_text}"}, ] try: result = await call_llm("reasoning", messages, temperature=0.2, max_tokens=1500) content = result["content"].strip() if content.startswith("```"): content = content.split("```")[1] if content.startswith("json"): content = content[4:] parsed = json.loads(content.strip()) patterns = [] for p in parsed.get("patterns", []): patterns.append(Pattern( group_id=group_id, type=p.get("type", "unknown"), description=p.get("description", ""), severity=p.get("severity", "info"), recommendation=p.get("recommendation", ""), )) logger.info(f"Detected {len(patterns)} patterns in {group_id}") return patterns except Exception as e: logger.error(f"Pattern detection failed: {e}") return [] ``` ### Step 5.2 — Create Cross-Group Analyst Agent Create file: `thirdeye/backend/agents/cross_group_analyst.py` ```python """Cross-Group Analyst Agent — detects blind spots between multiple teams.""" import json import logging from backend.providers import call_llm from backend.db.chroma import get_all_signals, get_group_ids from backend.db.models import CrossGroupInsight logger = logging.getLogger("thirdeye.agents.cross_group_analyst") SYSTEM_PROMPT = """You 
are the Cross-Group Intelligence Analyst for ThirdEye. This is the MOST IMPORTANT analysis. You receive intelligence summaries from MULTIPLE Telegram groups. Your job is to find BLIND SPOTS — information in one group that should be in another. Detect: - blocked_handoff: Team A waiting for something from Team B, but Team B doesn't know - conflicting_decision: Team A decided X, Team B decided the opposite - information_silo: Critical info in Group A never reached Group B - promise_reality_gap: Promise made in one group, but another group shows it's blocked - duplicated_effort: Two teams working on similar things unknowingly Respond ONLY with valid JSON (no markdown): {"insights": [{"type": "insight_type", "description": "SPECIFIC description naming the groups, people, and topics", "group_a": {"name": "group_name", "evidence": "what was said"}, "group_b": {"name": "group_name", "evidence": "what was said or NOT said"}, "severity": "warning|critical", "recommendation": "Specific action"}]} If no cross-group issues: {"insights": []} Be SPECIFIC. Name the groups, people, topics, and exact conflicts.""" async def analyze_cross_group(group_summaries: dict[str, list[dict]] | None = None) -> list[CrossGroupInsight]: """ Analyze intelligence across all monitored groups to find blind spots. Args: group_summaries: Optional pre-built summaries. If None, loads from ChromaDB.

""" if group_summaries is None: group_ids = get_group_ids() if len(group_ids) < 2: logger.info("Need at least 2 groups for cross-group analysis") return [] group_summaries = {} for gid in group_ids: signals = get_all_signals(gid) group_summaries[gid] = signals if len(group_summaries) < 2: return [] # Format summaries for the LLM summary_parts = [] for group_name, signals in group_summaries.items(): signal_lines = [] for s in signals[:30]: # Limit per group to fit context meta = s["metadata"] signal_lines.append(f" - [{meta.get('type', '?')}] {s['document'][:120]}") summary_parts.append(f"=== GROUP: {group_name} ({len(signals)} total signals) ===\n" + "\n".join(signal_lines)) full_summary = "\n\n".join(summary_parts) messages = [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": f"Analyze cross-group intelligence:\n\n{full_summary}"}, ] try: result = await call_llm("reasoning", messages, temperature=0.2, max_tokens=2000) content = result["content"].strip() if content.startswith("```"): content = content.split("```")[1] if content.startswith("json"): content = content[4:] parsed = json.loads(content.strip()) insights = [] for i in parsed.get("insights", []): insights.append(CrossGroupInsight( type=i.get("type", "unknown"), description=i.get("description", ""), group_a=i.get("group_a", {}), group_b=i.get("group_b", {}), severity=i.get("severity", "warning"), recommendation=i.get("recommendation", ""), )) logger.info(f"Cross-group analysis found {len(insights)} insights") return insights except Exception as e: logger.error(f"Cross-group analysis failed: {e}") return [] ``` ### ✅ TEST MILESTONE 5 Create file: `thirdeye/scripts/test_m5.py` ```python """Test Milestone 5: Pattern detection + Cross-group analysis.""" import asyncio, os, sys sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) from backend.pipeline import process_message_batch # Dev team messages with PLANTED patterns DEV_MSGS = [ {"sender": "Alex", "text": "Let's go with 
PostgreSQL.", "timestamp": "2026-03-15T10:00:00Z"}, {"sender": "Raj", "text": "I'll handle the payment module Stripe integration.", "timestamp": "2026-03-15T11:00:00Z"}, {"sender": "Raj", "text": "Payment webhook setup is done, only I know how this works right now.", "timestamp": "2026-03-16T10:00:00Z"}, {"sender": "Sam", "text": "Timeout error on checkout again.", "timestamp": "2026-03-17T09:00:00Z"}, {"sender": "Sam", "text": "Same timeout error. This is the third time.", "timestamp": "2026-03-18T09:00:00Z"}, {"sender": "Alex", "text": "I'm hardcoding the config for now, no time to do it properly.", "timestamp": "2026-03-18T14:00:00Z"}, {"sender": "Sam", "text": "We need the design specs for the dashboard. Still waiting.", "timestamp": "2026-03-19T10:00:00Z"}, {"sender": "Alex", "text": "Dashboard is completely blocked without those design specs.", "timestamp": "2026-03-20T10:00:00Z"}, ] # Product team messages — NOTE: no mention of design specs being needed PRODUCT_MSGS = [ {"sender": "Lisa", "text": "Dark mode is the most requested feature by far.", "timestamp": "2026-03-16T10:00:00Z"}, {"sender": "Mike", "text": "We should go mobile-first this sprint.", "timestamp": "2026-03-17T10:00:00Z"}, {"sender": "Sarah", "text": "API stability is more important than mobile. 
Enterprise clients are complaining.", "timestamp": "2026-03-17T10:30:00Z"}, {"sender": "Lisa", "text": "I told the client we'd have the dashboard demo ready by Friday.", "timestamp": "2026-03-18T10:00:00Z"}, {"sender": "Mike", "text": "Let's push for the API-first approach this quarter.", "timestamp": "2026-03-19T10:00:00Z"}, ] async def main(): from backend.agents.pattern_detector import detect_patterns from backend.agents.cross_group_analyst import analyze_cross_group dev_group = "test_dev_m5" product_group = "test_product_m5" # Process both groups print("Processing dev team messages...") dev_signals = await process_message_batch(dev_group, DEV_MSGS) print(f" ✅ Dev team: {len(dev_signals)} signals stored") print("Processing product team messages...") prod_signals = await process_message_batch(product_group, PRODUCT_MSGS) print(f" ✅ Product team: {len(prod_signals)} signals stored") # Test pattern detection print("\nRunning pattern detection on dev team...") patterns = await detect_patterns(dev_group) print(f" Found {len(patterns)} patterns:") for p in patterns: print(f" [{p.severity}] {p.type}: {p.description[:80]}") print(f" ✅ Pattern detection working") # Test cross-group analysis print("\nRunning cross-group analysis...") from backend.db.chroma import get_all_signals summaries = { "Acme Dev Team": get_all_signals(dev_group), "Acme Product": get_all_signals(product_group), } insights = await analyze_cross_group(summaries) print(f" Found {len(insights)} cross-group insights:") for i in insights: print(f" [{i.severity}] {i.type}: {i.description[:100]}") print(f" ✅ Cross-group analysis working") # Cleanup import chromadb from backend.config import CHROMA_DB_PATH client = chromadb.PersistentClient(path=CHROMA_DB_PATH) for name in [dev_group, product_group]: try: client.delete_collection(f"ll_{name}") except: pass print("\n🎉 MILESTONE 5 PASSED — Pattern detection + cross-group analysis working") asyncio.run(main()) ``` Run: `cd thirdeye && python scripts/test_m5.py` 
--- ## MILESTONE 6: Telegram Bot (70%) **Goal:** Working Telegram bot that listens to group messages, processes them, and responds to /ask queries. ### Step 6.1 — Create the Telegram bot Create file: `thirdeye/backend/bot/bot.py` ```python """ThirdEye Telegram Bot — main entry point.""" import asyncio import logging from collections import defaultdict from telegram import Update from telegram.ext import ( Application, MessageHandler, CommandHandler, filters, ContextTypes ) from backend.config import TELEGRAM_BOT_TOKEN, BATCH_SIZE from backend.pipeline import process_message_batch, query_knowledge, get_lens, set_lens, detect_and_set_lens logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s") logger = logging.getLogger("thirdeye.bot") # Message buffers per group _buffers = defaultdict(list) _group_names = {} async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): """Process every text message in groups.""" if not update.message or not update.message.text: return if update.message.chat.type not in ("group", "supergroup"): return group_id = str(update.message.chat_id) _group_names[group_id] = update.message.chat.title or group_id msg = { "sender": update.message.from_user.first_name or update.message.from_user.username or "Unknown", "text": update.message.text, "timestamp": update.message.date.isoformat(), "message_id": update.message.message_id, } _buffers[group_id].append(msg) # Process when buffer reaches batch size if len(_buffers[group_id]) >= BATCH_SIZE: batch = _buffers[group_id] _buffers[group_id] = [] try: signals = await process_message_batch(group_id, batch) if signals: logger.info(f"Processed batch: {len(signals)} signals from {_group_names.get(group_id, group_id)}") except Exception as e: logger.error(f"Pipeline error: {e}") async def cmd_ask(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle /ask [question].""" if not context.args: await update.message.reply_text("Usage: /ask
[your question]\nExample: /ask What database did we choose?") return question = " ".join(context.args) group_id = str(update.message.chat_id) await update.message.reply_text("🔍 Searching the knowledge base...") try: answer = await query_knowledge(group_id, question) await update.message.reply_text(f"💡 {answer}") except Exception as e: await update.message.reply_text(f"Sorry, something went wrong: {str(e)[:100]}") async def cmd_digest(update: Update, context: ContextTypes.DEFAULT_TYPE): """Generate a summary digest.""" group_id = str(update.message.chat_id) from backend.db.chroma import get_all_signals signals = get_all_signals(group_id) if not signals: await update.message.reply_text("No intelligence gathered yet. I need more conversation to learn from!") return # Group by type by_type = defaultdict(list) for s in signals: by_type[s["metadata"].get("type", "other")].append(s) parts = [f"📊 *Intelligence Digest* ({len(signals)} total signals)\n"] for sig_type, sigs in sorted(by_type.items(), key=lambda x: -len(x[1])): parts.append(f"• {sig_type.replace('_', ' ').title()}: {len(sigs)} signals") await update.message.reply_text("\n".join(parts)) async def cmd_lens(update: Update, context: ContextTypes.DEFAULT_TYPE): """Set or check the active lens.""" group_id = str(update.message.chat_id) if context.args: mode = context.args[0].lower() if mode in ("dev", "product", "client", "community", "auto"): set_lens(group_id, mode) await update.message.reply_text(f"🔭 Lens set to: {mode}") else: await update.message.reply_text("Valid lenses: dev, product, client, community, auto") else: current = get_lens(group_id) await update.message.reply_text(f"🔭 Current lens: {current}") async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE): """Welcome message.""" await update.message.reply_text( "👁️ *ThirdEye* — Conversation Intelligence Engine\n\n" "I'm now listening to this group and extracting intelligence from your conversations.\n\n" "Commands:\n" "/ask [question] — 
Ask about your team's knowledge\n" "/digest — Get an intelligence summary\n" "/lens [mode] — Set detection mode (dev/product/client/community)\n" "/alerts — View active warnings\n\n" "I work passively — no need to tag me. I'll alert you when I spot patterns or issues.", parse_mode="Markdown" ) def run_bot(): """Start the Telegram bot.""" app = Application.builder().token(TELEGRAM_BOT_TOKEN).build() app.add_handler(CommandHandler("start", cmd_start)) app.add_handler(CommandHandler("ask", cmd_ask)) app.add_handler(CommandHandler("digest", cmd_digest)) app.add_handler(CommandHandler("lens", cmd_lens)) app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)) logger.info("ThirdEye bot starting...") app.run_polling(allowed_updates=Update.ALL_TYPES) if __name__ == "__main__": run_bot() ``` ### Step 6.2 — Create bot entry point Create file: `thirdeye/run_bot.py` ```python """Entry point to run the ThirdEye Telegram bot.""" import sys import os sys.path.insert(0, os.path.dirname(__file__)) from backend.bot.bot import run_bot if __name__ == "__main__": run_bot() ``` ### ✅ TEST MILESTONE 6 **This is a manual test — you need to interact with the bot on Telegram.** Pre-check: `cd thirdeye && python scripts/test_m6_precheck.py` Create file: `thirdeye/scripts/test_m6_precheck.py` ```python """Pre-check for Milestone 6: Verify bot token works before running.""" import asyncio, os, sys sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) async def test(): from telegram import Bot from backend.config import TELEGRAM_BOT_TOKEN assert TELEGRAM_BOT_TOKEN and len(TELEGRAM_BOT_TOKEN) > 10, "TELEGRAM_BOT_TOKEN not set!" bot = Bot(token=TELEGRAM_BOT_TOKEN) me = await bot.get_me() print(f" ✅ Bot connected: @{me.username} ({me.first_name})") print(f"\n IMPORTANT: Before testing in a group:") print(f" 1. Open Telegram → Search @BotFather") print(f" 2. Send: /setprivacy") print(f" 3. Select @{me.username}") print(f" 4. Choose: Disable") print(f" 5.
Create a test group, add @{me.username} to it") print(f"\n Then run: python run_bot.py") print(f" Send messages in the group, then try: /ask [question]") asyncio.run(test()) ``` **Manual test steps:** 1. Run `python scripts/test_m6_precheck.py` — verify bot connects 2. Disable privacy mode in BotFather (CRITICAL) 3. Create a test Telegram group 4. Add the bot to the group 5. Run `python run_bot.py` 6. Send 5+ messages in the group (mix of tech discussion) 7. Try `/ask What was discussed?` 8. Try `/digest` 9. Try `/lens` **Expected:** Bot receives messages, processes them in batches of 5, and `/ask` returns relevant answers. ``` 🎉 MILESTONE 6 PASSED — Bot is live and processing messages ``` --- ## MILESTONE 7: FastAPI Backend + Dashboard (85%) **Goal:** REST API serving data to a React dashboard. ### Step 7.1 — Create FastAPI routes Create file: `thirdeye/backend/api/routes.py` ```python """FastAPI routes for the ThirdEye dashboard.""" from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from backend.db.chroma import get_all_signals, query_signals, get_group_ids from backend.pipeline import query_knowledge, get_lens, set_lens from backend.agents.pattern_detector import detect_patterns from backend.agents.cross_group_analyst import analyze_cross_group from collections import defaultdict app = FastAPI(title="ThirdEye API", version="1.0.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) @app.get("/api/groups") async def list_groups(): """List all monitored groups.""" group_ids = get_group_ids() groups = [] for gid in group_ids: signals = get_all_signals(gid) groups.append({ "group_id": gid, "signal_count": len(signals), "lens": get_lens(gid), }) return {"groups": groups} @app.get("/api/groups/{group_id}/signals") async def get_signals(group_id: str, signal_type: str = None): """Get all signals for a group.""" signals = get_all_signals(group_id, signal_type=signal_type) return 
{"signals": signals, "count": len(signals)} @app.post("/api/groups/{group_id}/query") async def query_group(group_id: str, body: dict): """Natural language query over a group's knowledge base.""" question = body.get("question", "") if not question: raise HTTPException(400, "Missing 'question' field") answer = await query_knowledge(group_id, question) return {"answer": answer, "question": question} @app.get("/api/groups/{group_id}/patterns") async def get_patterns(group_id: str): """Detect and return patterns for a group.""" patterns = await detect_patterns(group_id) return {"patterns": [p.model_dump() for p in patterns]} @app.get("/api/cross-group/insights") async def get_cross_group_insights(): """Run cross-group analysis and return insights.""" group_ids = get_group_ids() if len(group_ids) < 2: return {"insights": [], "message": "Need at least 2 monitored groups"} summaries = {} for gid in group_ids: summaries[gid] = get_all_signals(gid) insights = await analyze_cross_group(summaries) return {"insights": [i.model_dump() for i in insights]} @app.get("/health") async def health(): return {"status": "ok", "service": "thirdeye"} ``` ### Step 7.2 — Create API entry point Create file: `thirdeye/run_api.py` ```python """Entry point to run the ThirdEye API server.""" import sys, os sys.path.insert(0, os.path.dirname(__file__)) import uvicorn from backend.api.routes import app if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000, reload=True) ``` ### ✅ TEST MILESTONE 7 ```bash # Terminal 1: Start the API cd thirdeye && python run_api.py # Terminal 2: Test endpoints curl http://localhost:8000/health # Expected: {"status":"ok","service":"thirdeye"} curl http://localhost:8000/api/groups # Expected: {"groups": [...]} # If you ran Milestone 4/5 tests and have data: curl http://localhost:8000/api/groups/test_dev_m5/signals curl -X POST http://localhost:8000/api/groups/test_dev_m5/query \ -H "Content-Type: application/json" \ -d '{"question": "What database was 
chosen?"}' ``` ``` 🎉 MILESTONE 7 PASSED — API serving data for dashboard ``` --- ## MILESTONE 8: Run Both Bot + API Together (90%) **Goal:** Bot and API running simultaneously. Bot feeds data, API serves it. ### Step 8.1 — Create unified runner Create file: `thirdeye/run_all.py` ```python """Run both the Telegram bot and FastAPI server together.""" import sys, os, asyncio, threading, logging sys.path.insert(0, os.path.dirname(__file__)) logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s") def run_api_server(): """Run FastAPI in a separate thread.""" import uvicorn from backend.api.routes import app uvicorn.run(app, host="0.0.0.0", port=8000, log_level="warning") def run_telegram_bot(): """Run Telegram bot (blocks).""" from backend.bot.bot import run_bot run_bot() if __name__ == "__main__": print("🚀 Starting ThirdEye...") print(" API: http://localhost:8000") print(" Bot: Running on Telegram") print(" Dashboard: Open dashboard/index.html or run React dev server\n") # Start API in background thread api_thread = threading.Thread(target=run_api_server, daemon=True) api_thread.start() # Run bot in main thread (it has its own event loop) run_telegram_bot() ``` ### ✅ TEST MILESTONE 8 ```bash cd thirdeye && python run_all.py ``` Then in another terminal: ```bash # API should be running curl http://localhost:8000/health # Bot should be responding in Telegram # Send messages in your test group, then /ask ``` ``` 🎉 MILESTONE 8 PASSED — Full system running ``` --- ## MILESTONE 9: Demo Data Seeding (95%) **Goal:** Pre-seed 3 groups with realistic messages that demonstrate all features. 
### Step 9.1 — Create demo seeding script

Create file: `thirdeye/scripts/seed_demo.py`

```python
"""Seed demo data directly into ChromaDB (bypasses Telegram for speed)."""
import asyncio
import os
import sys

sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from backend.pipeline import process_message_batch, set_lens
from backend.agents.pattern_detector import detect_patterns
from backend.agents.cross_group_analyst import analyze_cross_group
from backend.db.chroma import get_all_signals

# ====== ACME DEV TEAM ======
DEV_MESSAGES = [
    {"sender": "Alex", "text": "Team, I'm proposing we use PostgreSQL for the main database. Our data is highly relational.", "timestamp": "2026-03-10T10:00:00Z"},
    {"sender": "Priya", "text": "Agreed on Postgres. I'll set up the initial schema and migrations.", "timestamp": "2026-03-10T10:15:00Z"},
    {"sender": "Raj", "text": "I'll take ownership of the payment module. I have experience with Stripe webhooks.", "timestamp": "2026-03-10T14:00:00Z"},
    {"sender": "Alex", "text": "For the auth service, I'm hardcoding the JWT secret for now. We'll move to vault later.", "timestamp": "2026-03-11T09:00:00Z"},
    {"sender": "Sam", "text": "Getting a timeout error on the checkout endpoint. Seems intermittent.", "timestamp": "2026-03-12T10:00:00Z"},
    {"sender": "Raj", "text": "Payment webhook is fully integrated now. Only I know how the retry logic works though.", "timestamp": "2026-03-13T11:00:00Z"},
    {"sender": "Sam", "text": "Timeout error on checkout is back again. Second time this week.", "timestamp": "2026-03-14T09:00:00Z"},
    {"sender": "Alex", "text": "Just restart the pod when it happens. I'll investigate after the sprint.", "timestamp": "2026-03-14T09:30:00Z"},
    {"sender": "Sam", "text": "Has anyone heard back from design about the dashboard specs? We need them to start.", "timestamp": "2026-03-15T10:00:00Z"},
    {"sender": "Alex", "text": "Still no dashboard specs from design. This is blocking my entire sprint work.", "timestamp": "2026-03-17T10:00:00Z"},
    {"sender": "Sam", "text": "Timeout error AGAIN. That's the third time. We have a systemic issue here.", "timestamp": "2026-03-18T09:00:00Z"},
    {"sender": "Priya", "text": "I'm pushing this config change directly to main. It's a small fix, should be fine.", "timestamp": "2026-03-19T14:00:00Z"},
    {"sender": "Sam", "text": "Dashboard is completely blocked without those design specs. Week 2 of waiting.", "timestamp": "2026-03-20T10:00:00Z"},
]

# ====== ACME PRODUCT TEAM ======
PRODUCT_MESSAGES = [
    {"sender": "Lisa", "text": "Users keep asking about dark mode. It comes up in literally every demo.", "timestamp": "2026-03-10T10:00:00Z"},
    {"sender": "Mike", "text": "I think we should prioritize the mobile app this sprint. Mobile traffic is 60%.", "timestamp": "2026-03-11T10:00:00Z"},
    {"sender": "Sarah", "text": "No, API stability is way more important. Two enterprise clients complained last week about downtime.", "timestamp": "2026-03-11T10:30:00Z"},
    {"sender": "Lisa", "text": "Sarah from ClientCo literally said 'I would pay double if you had SSO integration.'", "timestamp": "2026-03-12T14:00:00Z"},
    {"sender": "Mike", "text": "Competitor X just launched a mobile-first version. We're falling behind on mobile.", "timestamp": "2026-03-13T10:00:00Z"},
    {"sender": "Lisa", "text": "I told ClientCo we'd have the dashboard demo ready by Friday March 21st.", "timestamp": "2026-03-14T10:00:00Z"},
    {"sender": "Sarah", "text": "Our conversion rate dropped to 2.3% after the last release. That's concerning.", "timestamp": "2026-03-15T11:00:00Z"},
    {"sender": "Mike", "text": "Let's commit to API-first approach for the rest of Q1.", "timestamp": "2026-03-17T10:00:00Z"},
    {"sender": "Lisa", "text": "Dark mode was mentioned again by three different users at the conference.", "timestamp": "2026-03-19T10:00:00Z"},
    {"sender": "Sarah", "text": "We really need to decide: mobile or API stability? This conflict is slowing us down.", "timestamp": "2026-03-20T10:00:00Z"},
]

# ====== ACME ↔ CLIENT CHANNEL ======
CLIENT_MESSAGES = [
    {"sender": "Lisa", "text": "Hi ClientCo team! Just confirming we'll have the dashboard mockups ready by Friday March 21st.", "timestamp": "2026-03-10T10:00:00Z"},
    {"sender": "Client_CEO", "text": "Great, looking forward to seeing them. This is a key deliverable for our board meeting.", "timestamp": "2026-03-10T10:30:00Z"},
    {"sender": "Lisa", "text": "We'll also share the API documentation by Wednesday March 19th.", "timestamp": "2026-03-11T10:00:00Z"},
    {"sender": "Client_CEO", "text": "Perfect. Oh, could you also add an export-to-PDF feature for the reports? That would be really helpful.", "timestamp": "2026-03-12T10:00:00Z"},
    {"sender": "Lisa", "text": "Sure, we'll look into the PDF export!", "timestamp": "2026-03-12T10:15:00Z"},
    {"sender": "Client_CEO", "text": "Any update on the dashboard mockups?", "timestamp": "2026-03-17T10:00:00Z"},
    {"sender": "Client_CEO", "text": "Also, would it be possible to add a dark mode option?", "timestamp": "2026-03-18T10:00:00Z"},
    {"sender": "Client_CEO", "text": "We really need those mockups before the board meeting on Monday.", "timestamp": "2026-03-19T10:00:00Z"},
    {"sender": "Client_CEO", "text": "I might need to loop in our VP if we can't get the timeline confirmed.", "timestamp": "2026-03-20T10:00:00Z"},
]


async def seed():
    print("🌱 Seeding demo data...\n")

    # Set lenses explicitly
    set_lens("acme_dev", "dev")
    set_lens("acme_product", "product")
    set_lens("acme_client", "client")

    # Process dev team (2 batches)
    print("Processing Acme Dev Team...")
    s1 = await process_message_batch("acme_dev", DEV_MESSAGES[:7])
    s2 = await process_message_batch("acme_dev", DEV_MESSAGES[7:])
    print(f"  ✅ Dev team: {len(s1) + len(s2)} signals stored\n")

    # Process product team
    print("Processing Acme Product Team...")
    s3 = await process_message_batch("acme_product", PRODUCT_MESSAGES[:5])
    s4 = await process_message_batch("acme_product", PRODUCT_MESSAGES[5:])
    print(f"  ✅ Product team: {len(s3) + len(s4)} signals stored\n")

    # Process client channel
    print("Processing Acme ↔ ClientCo Channel...")
    s5 = await process_message_batch("acme_client", CLIENT_MESSAGES[:5])
    s6 = await process_message_batch("acme_client", CLIENT_MESSAGES[5:])
    print(f"  ✅ Client channel: {len(s5) + len(s6)} signals stored\n")

    # Run pattern detection
    print("Running pattern detection on dev team...")
    patterns = await detect_patterns("acme_dev")
    print(f"  Found {len(patterns)} patterns")
    for p in patterns:
        print(f"    [{p.severity}] {p.type}: {p.description[:80]}")

    # Run cross-group analysis
    print("\n🔥 Running CROSS-GROUP ANALYSIS...")
    summaries = {
        "Acme Dev Team": get_all_signals("acme_dev"),
        "Acme Product": get_all_signals("acme_product"),
        "Acme ↔ ClientCo": get_all_signals("acme_client"),
    }
    insights = await analyze_cross_group(summaries)
    print(f"\n  Found {len(insights)} CROSS-GROUP INSIGHTS:")
    for i in insights:
        print(f"  🚨 [{i.severity}] {i.type}")
        print(f"     {i.description[:120]}")
        print(f"     Recommendation: {i.recommendation[:100]}")
        print()

    print("🎉 Demo data seeded successfully!")
    print("   Start the API with: python run_api.py")
    print("   Then visit: http://localhost:8000/api/groups")
    print("   And: http://localhost:8000/api/cross-group/insights")


if __name__ == "__main__":
    asyncio.run(seed())
```

### ✅ TEST MILESTONE 9

```bash
cd thirdeye && python scripts/seed_demo.py
```

Then verify the API serves the data:

```bash
python run_api.py &
curl http://localhost:8000/api/groups
curl http://localhost:8000/api/groups/acme_dev/signals
curl http://localhost:8000/api/cross-group/insights
```

```
🎉 MILESTONE 9 PASSED — Demo data seeded and queryable
```

---

## MILESTONE 10: Final Polish & Demo Ready (100%)

**Goal:** Everything works. Demo rehearsed. Ready to present.
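Before walking the checklist, the three API endpoints can be smoke-tested in one go. A minimal sketch using only the standard library; the script name and `check` helper are hypothetical (not one of the playbook's `test_m*.py` scripts), and it assumes the API from Milestone 8 is serving at `http://localhost:8000`:

```python
"""Hypothetical pre-demo smoke test for the ThirdEye API endpoints."""
import json
import urllib.request

# The three endpoints the demo relies on.
ENDPOINTS = [
    "/api/groups",
    "/api/groups/acme_dev/signals",
    "/api/cross-group/insights",
]


def check(base_url: str, timeout: float = 5.0) -> dict:
    """Return {endpoint: bool}, True when the endpoint answers 200 with parseable JSON."""
    results = {}
    for path in ENDPOINTS:
        try:
            with urllib.request.urlopen(base_url + path, timeout=timeout) as resp:
                json.loads(resp.read())  # body must be valid JSON
                results[path] = resp.status == 200
        except (OSError, ValueError):  # connection errors, timeouts, bad JSON
            results[path] = False
    return results


if __name__ == "__main__":
    for path, ok in check("http://localhost:8000").items():
        print(f"{'✅' if ok else '❌'} {path}")
```

Run it after `python run_all.py`; any ❌ line points at the milestone to revisit before rehearsing the demo.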
### Checklist

- [ ] `python run_all.py` starts both bot + API without errors
- [ ] Demo data is seeded (`python scripts/seed_demo.py`)
- [ ] API endpoints return data: `/api/groups`, `/api/groups/{id}/signals`, `/api/cross-group/insights`
- [ ] Telegram bot responds to `/start`, `/ask`, `/digest`, `/lens`
- [ ] Cross-group insights are detected (the wow moment)
- [ ] 90-second demo script is rehearsed (see THIRDEYE_BIBLE.md Section 20)
- [ ] GitHub repo has README with setup instructions
- [ ] Backup: screenshots of dashboard / API output saved in case the live demo fails

### Create README.md

Create file: `thirdeye/README.md`

````markdown
# ThirdEye 👁️

**Your team already has the answers. They're just trapped in chat.**

ThirdEye is a multi-agent conversation intelligence engine that passively monitors Telegram group chats, extracts structured knowledge, and detects blind spots across teams.

## Features

- 🔍 Passive intelligence extraction (no commands needed)
- 🔭 Adaptive lens system (DevLens, ProductLens, ClientLens, CommunityLens)
- 🧠 Living knowledge graph that grows over time
- 📊 Pattern detection (tech debt trends, knowledge silos, recurring bugs)
- 🔥 **Cross-group intelligence** — detects blind spots between teams
- 💬 Natural language querying via Telegram

## Quick Start

```bash
pip install -r requirements.txt
cp .env.example .env          # Fill in API keys
python scripts/seed_demo.py   # Seed demo data
python run_all.py             # Start bot + API
```

## Architecture

8 specialized AI agents powered by 5 free LLM providers (Groq, Cerebras, SambaNova, OpenRouter, Cohere) with automatic fallback routing. Built in 24 hours with $0 in API costs.
````

```
🎉 MILESTONE 10 COMPLETE — ThirdEye is ready to win.
```

---

## MILESTONE SUMMARY

| # | Milestone | What You Have | % |
|---|---|---|---|
| 0 | Scaffolding | Folders, deps, env vars, all API keys | 0% |
| 1 | Provider Router | Multi-provider LLM calls with fallback | 10% |
| 2 | ChromaDB + Embeddings | Store and retrieve signals with vector search | 20% |
| 3 | Core Agents | Signal Extractor + Classifier + Context Detector | 30% |
| 4 | Full Pipeline | Messages → Extract → Classify → Store → Query | 45% |
| 5 | Intelligence Layer | Pattern detection + Cross-group analysis | 60% |
| 6 | Telegram Bot | Live bot processing group messages | 70% |
| 7 | FastAPI + Dashboard API | REST API serving all data | 85% |
| 8 | Unified Runner | Bot + API running together | 90% |
| 9 | Demo Data | 3 groups seeded with realistic data | 95% |
| 10 | Polish & Demo Ready | README, rehearsed demo, everything working | 100% |

**Every milestone has a test. Every test must pass. No skipping.**