Mirror of https://github.com/arkorty/B.Tech-Project-III.git (synced 2026-04-19 12:41:48 +00:00)
# ThirdEye — Implementation Playbook (Milestone 0→100)

> **Rule: Do NOT skip milestones. Do NOT skip tests. Every test must PASS before moving to the next milestone.**

> **This document is designed to be followed by a human OR a coding agent sequentially.**

---

## MILESTONE 0: Project Scaffolding (0%)

**Goal:** Empty project with the correct folder structure, all dependencies installed, and all env vars configured.
### Step 0.1 — Create project structure

```bash
mkdir -p thirdeye/{backend/{agents,bot,db,mcp,api},dashboard,scripts,demo/demo_messages}
cd thirdeye
```
### Step 0.2 — Create requirements.txt

Create file: `thirdeye/requirements.txt`

```
python-telegram-bot==21.9
openai==1.82.0
chromadb==1.0.7
fastapi==0.115.12
uvicorn==0.34.2
cohere==5.15.0
python-dotenv==1.1.0
sentence-transformers==4.1.0
pydantic==2.11.2
httpx==0.28.1
```
### Step 0.3 — Create .env file

Create file: `thirdeye/.env`

```bash
# Telegram
TELEGRAM_BOT_TOKEN=your_telegram_bot_token_here

# LLM Providers (all free, no credit card needed)
GROQ_API_KEY=your_groq_key_here
CEREBRAS_API_KEY=your_cerebras_key_here
SAMBANOVA_API_KEY=your_sambanova_key_here
OPENROUTER_API_KEY=your_openrouter_key_here
GEMINI_API_KEY=your_gemini_key_here

# Embeddings
COHERE_API_KEY=your_cohere_key_here

# App Config
CHROMA_DB_PATH=./chroma_db
BATCH_SIZE=5
BATCH_TIMEOUT_SECONDS=60
```
### Step 0.4 — Create config.py

Create file: `thirdeye/backend/config.py`

```python
import os
from dotenv import load_dotenv

load_dotenv()

# Telegram
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")

# LLM Providers
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
CEREBRAS_API_KEY = os.getenv("CEREBRAS_API_KEY")
SAMBANOVA_API_KEY = os.getenv("SAMBANOVA_API_KEY")
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

# Embeddings
COHERE_API_KEY = os.getenv("COHERE_API_KEY")

# App
CHROMA_DB_PATH = os.getenv("CHROMA_DB_PATH", "./chroma_db")
BATCH_SIZE = int(os.getenv("BATCH_SIZE", "5"))
BATCH_TIMEOUT_SECONDS = int(os.getenv("BATCH_TIMEOUT_SECONDS", "60"))
```
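The `int(os.getenv(..., default))` pattern means a missing variable silently falls back to the default string, while any value set in the environment wins; a quick standalone check of that behavior:

```python
import os

# Mimic config.py's pattern: use the env value when set, else a string default,
# then coerce to int (a non-numeric value would raise ValueError at import time).
os.environ.pop("BATCH_SIZE", None)           # unset → the default applies
assert int(os.getenv("BATCH_SIZE", "5")) == 5
os.environ["BATCH_SIZE"] = "12"              # set → the env value wins
assert int(os.getenv("BATCH_SIZE", "5")) == 12
```

Note the coercion happens once, at import time, so a malformed value fails fast rather than deep inside the batching logic.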

### Step 0.5 — Install dependencies

```bash
cd thirdeye
pip install -r requirements.txt
```
### Step 0.6 — Get ALL API keys (do this NOW)

Open each URL in browser tabs simultaneously:

1. **Groq:** https://console.groq.com → Sign up → API Keys → Create
2. **Cerebras:** https://cloud.cerebras.ai → Sign up → Portal → API Keys
3. **SambaNova:** https://cloud.sambanova.ai → Sign up → API Keys
4. **OpenRouter:** https://openrouter.ai → Sign up → Dashboard → Keys → Create Key
5. **Cohere:** https://dashboard.cohere.com → Sign up → API Keys
6. **Telegram Bot:** Open Telegram → Search @BotFather → /newbot → Follow prompts → Copy token
7. **Google (last resort):** https://aistudio.google.com → Get API Key

Fill all values in `.env`.
### ✅ TEST MILESTONE 0

Create file: `thirdeye/scripts/test_m0.py`

```python
"""Test Milestone 0: Project structure and env vars."""
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from backend.config import (
    TELEGRAM_BOT_TOKEN, GROQ_API_KEY, CEREBRAS_API_KEY,
    SAMBANOVA_API_KEY, OPENROUTER_API_KEY, COHERE_API_KEY
)

checks = {
    "TELEGRAM_BOT_TOKEN": TELEGRAM_BOT_TOKEN,
    "GROQ_API_KEY": GROQ_API_KEY,
    "CEREBRAS_API_KEY": CEREBRAS_API_KEY,
    "SAMBANOVA_API_KEY": SAMBANOVA_API_KEY,
    "OPENROUTER_API_KEY": OPENROUTER_API_KEY,
    "COHERE_API_KEY": COHERE_API_KEY,
}

all_pass = True
for name, val in checks.items():
    status = "✅" if val and len(val) > 5 else "❌ MISSING"
    if "❌" in status:
        all_pass = False
    print(f"  {status} {name}")

# Check directories exist
for d in ["backend/agents", "backend/bot", "backend/db", "backend/api", "dashboard", "scripts"]:
    path = os.path.join(os.path.dirname(__file__), '..', d)
    exists = os.path.isdir(path)
    status = "✅" if exists else "❌ MISSING"
    if not exists:
        all_pass = False
    print(f"  {status} Directory: {d}")

print(f"\n{'🎉 MILESTONE 0 PASSED' if all_pass else '💥 MILESTONE 0 FAILED — fix issues above'}")
sys.exit(0 if all_pass else 1)  # non-zero exit code so scripts/agents can detect failure
```

Run: `cd thirdeye && python scripts/test_m0.py`

**Expected output:** All ✅ checks, ending with "🎉 MILESTONE 0 PASSED"
---

## MILESTONE 1: Provider Router (10%)

**Goal:** Multi-provider LLM router that falls back across Groq → Cerebras → SambaNova → OpenRouter → Google. Verified working.

### Step 1.1 — Create the provider router

Create file: `thirdeye/backend/providers.py`
```python
"""Multi-provider LLM router with automatic fallback on rate limits."""
import logging
from openai import AsyncOpenAI
from backend.config import (
    GROQ_API_KEY, CEREBRAS_API_KEY, SAMBANOVA_API_KEY,
    OPENROUTER_API_KEY, GEMINI_API_KEY
)

logger = logging.getLogger("thirdeye.providers")

# Initialize provider clients (only those with a plausible API key)
_clients = {}

def _init_client(name, base_url, api_key):
    if api_key and len(api_key) > 5:
        _clients[name] = AsyncOpenAI(base_url=base_url, api_key=api_key)

_init_client("groq", "https://api.groq.com/openai/v1", GROQ_API_KEY)
_init_client("cerebras", "https://api.cerebras.ai/v1", CEREBRAS_API_KEY)
_init_client("sambanova", "https://api.sambanova.ai/v1", SAMBANOVA_API_KEY)
_init_client("openrouter", "https://openrouter.ai/api/v1", OPENROUTER_API_KEY)
_init_client("google", "https://generativelanguage.googleapis.com/v1beta/openai/", GEMINI_API_KEY)


# Model registry — task_type → [(provider, model_id), ...]
MODEL_REGISTRY = {
    "fast_small": [
        ("groq", "llama-3.1-8b-instant"),
        ("cerebras", "llama-3.1-8b"),
        ("openrouter", "openai/gpt-oss-20b:free"),
    ],
    "fast_large": [
        ("groq", "llama-3.3-70b-versatile"),
        ("cerebras", "llama-3.3-70b"),
        ("openrouter", "meta-llama/llama-3.3-70b-instruct:free"),
        ("sambanova", "Meta-Llama-3.3-70B-Instruct"),
    ],
    "reasoning": [
        ("sambanova", "Meta-Llama-3.1-405B-Instruct"),
        ("openrouter", "nvidia/nemotron-3-super-120b-a12b:free"),
        ("openrouter", "openai/gpt-oss-120b:free"),
    ],
    "agentic": [
        ("openrouter", "minimax/minimax-m2.5:free"),
        ("openrouter", "nvidia/nemotron-3-super-120b-a12b:free"),
        ("groq", "llama-3.3-70b-versatile"),
    ],
    "fallback": [
        ("openrouter", "openrouter/free"),
        ("google", "gemini-2.0-flash"),
    ],
}


async def call_llm(
    task_type: str,
    messages: list,
    temperature: float = 0.3,
    max_tokens: int = 2000,
    response_format: dict = None,
) -> dict:
    """
    Route to the best available provider with automatic fallback.

    Args:
        task_type: Key from MODEL_REGISTRY (fast_small, fast_large, reasoning, agentic)
        messages: OpenAI-format messages list
        temperature: Sampling temperature
        max_tokens: Max output tokens
        response_format: Optional {"type": "json_object"} for JSON mode

    Returns:
        {"content": str, "provider": str, "model": str}
    """
    candidates = MODEL_REGISTRY.get(task_type, []) + MODEL_REGISTRY["fallback"]
    errors = []

    for provider_name, model_id in candidates:
        client = _clients.get(provider_name)
        if not client:
            continue

        try:
            kwargs = {
                "model": model_id,
                "messages": messages,
                "temperature": temperature,
                "max_tokens": max_tokens,
                "timeout": 45,
            }
            if response_format and provider_name != "google":
                kwargs["response_format"] = response_format

            response = await client.chat.completions.create(**kwargs)
            content = response.choices[0].message.content

            logger.info(f"LLM call success: {provider_name}/{model_id} ({task_type})")
            return {
                "content": content,
                "provider": provider_name,
                "model": model_id,
            }

        except Exception as e:
            err = str(e).lower()
            is_rate_limit = any(k in err for k in ["429", "rate", "quota", "limit", "exceeded", "capacity"])
            is_timeout = "timeout" in err or "timed out" in err

            if is_rate_limit or is_timeout:
                logger.warning(f"Provider {provider_name}/{model_id} unavailable: {type(e).__name__}")
                errors.append(f"{provider_name}: rate limited")
            else:
                logger.error(f"Provider {provider_name}/{model_id} error: {e}")
                errors.append(f"{provider_name}: {e}")
            continue

    raise Exception(f"All LLM providers exhausted. Errors: {errors}")
```
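The fallback loop decides whether to try the next provider with a plain case-insensitive substring check over the error message. Pulled out in isolation (the function name here is illustrative, not part of `providers.py`):

```python
# Same markers the except-branch in call_llm scans for.
RATE_LIMIT_MARKERS = ["429", "rate", "quota", "limit", "exceeded", "capacity"]

def looks_like_rate_limit(err: str) -> bool:
    # Case-insensitive substring match, exactly as call_llm does it.
    err = err.lower()
    return any(marker in err for marker in RATE_LIMIT_MARKERS)

assert looks_like_rate_limit("Error code: 429 - Too Many Requests") is True
assert looks_like_rate_limit("Rate limit reached for model") is True
assert looks_like_rate_limit("invalid API key") is False
```

The check is deliberately coarse: any error message containing "limit" or "rate" triggers fallback rather than a hard failure, trading a few unnecessary retries for resilience.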

### ✅ TEST MILESTONE 1

Create file: `thirdeye/scripts/test_m1.py`

```python
"""Test Milestone 1: Provider router works with at least one provider."""
import asyncio
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

async def test_providers():
    from backend.providers import call_llm

    test_messages = [
        {"role": "user", "content": "Reply with exactly: THIRDEYE_OK"}
    ]
    all_pass = True

    # Test 1: fast_small (should use Groq 8B)
    print("Testing fast_small (Groq 8B / Cerebras 8B)...")
    try:
        result = await call_llm("fast_small", test_messages, max_tokens=50)
        print(f"  ✅ fast_small → {result['provider']}/{result['model']}")
        print(f"     Response: {result['content'][:80]}")
    except Exception as e:
        all_pass = False
        print(f"  ❌ fast_small failed: {e}")

    # Test 2: fast_large (should use Groq 70B)
    print("Testing fast_large (Groq/Cerebras 70B)...")
    try:
        result = await call_llm("fast_large", test_messages, max_tokens=50)
        print(f"  ✅ fast_large → {result['provider']}/{result['model']}")
        print(f"     Response: {result['content'][:80]}")
    except Exception as e:
        all_pass = False
        print(f"  ❌ fast_large failed: {e}")

    # Test 3: reasoning (should use SambaNova 405B)
    print("Testing reasoning (SambaNova 405B / OpenRouter Nemotron)...")
    try:
        result = await call_llm("reasoning", test_messages, max_tokens=50)
        print(f"  ✅ reasoning → {result['provider']}/{result['model']}")
        print(f"     Response: {result['content'][:80]}")
    except Exception as e:
        all_pass = False
        print(f"  ❌ reasoning failed: {e}")

    # Test 4: JSON mode
    print("Testing JSON mode...")
    try:
        json_messages = [
            {"role": "system", "content": "You respond only in valid JSON."},
            {"role": "user", "content": 'Return: {"status": "ok", "test": true}'},
        ]
        result = await call_llm("fast_small", json_messages, max_tokens=100)
        print(f"  ✅ JSON mode → {result['provider']}/{result['model']}")
        print(f"     Response: {result['content'][:120]}")
    except Exception as e:
        all_pass = False
        print(f"  ❌ JSON mode failed: {e}")

    if all_pass:
        print("\n🎉 MILESTONE 1 PASSED — at least one provider works per task type")
    else:
        print("\n💥 MILESTONE 1 FAILED — some task types have no working provider")
        sys.exit(1)

asyncio.run(test_providers())
```

Run: `cd thirdeye && python scripts/test_m1.py`

**Expected:** At least one ✅ per task type. If a specific provider fails (429), that's fine — the fallback should catch it.
---

## MILESTONE 2: ChromaDB + Embeddings (20%)

**Goal:** ChromaDB running with Cohere embeddings. Can store and retrieve signals.

### Step 2.1 — Create embedding module

Create file: `thirdeye/backend/db/embeddings.py`
```python
"""Embedding provider with Cohere primary and local fallback."""
import cohere
import logging
from backend.config import COHERE_API_KEY

logger = logging.getLogger("thirdeye.embeddings")

_cohere_client = None
_local_model = None

def _get_cohere():
    global _cohere_client
    if _cohere_client is None and COHERE_API_KEY:
        _cohere_client = cohere.Client(COHERE_API_KEY)
    return _cohere_client

def _get_local_model():
    global _local_model
    if _local_model is None:
        from sentence_transformers import SentenceTransformer
        _local_model = SentenceTransformer("all-MiniLM-L6-v2")
        logger.info("Loaded local embedding model: all-MiniLM-L6-v2")
    return _local_model


def embed_texts(texts: list[str]) -> list[list[float]]:
    """Embed a list of texts. Tries Cohere first, falls back to the local model."""
    if not texts:
        return []

    # Try Cohere
    client = _get_cohere()
    if client:
        try:
            response = client.embed(
                texts=texts,
                model="embed-english-v3.0",
                input_type="search_document",
            )
            logger.info(f"Cohere embedded {len(texts)} texts")
            return [list(e) for e in response.embeddings]
        except Exception as e:
            logger.warning(f"Cohere embedding failed: {e}, falling back to local")

    # Fallback to local
    model = _get_local_model()
    embeddings = model.encode(texts).tolist()
    logger.info(f"Local model embedded {len(texts)} texts")
    return embeddings


def embed_query(text: str) -> list[float]:
    """Embed a single query text."""
    client = _get_cohere()
    if client:
        try:
            response = client.embed(
                texts=[text],
                model="embed-english-v3.0",
                input_type="search_query",
            )
            return list(response.embeddings[0])
        except Exception:
            pass

    model = _get_local_model()
    return model.encode([text]).tolist()[0]
```
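Both paths return plain `list[float]` vectors, so two embeddings can be compared directly with cosine similarity. A dependency-free sketch of that comparison (ChromaDB computes this internally, so you rarely need it yourself):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    # dot(a, b) / (|a| * |b|); 1.0 means same direction, 0.0 means orthogonal.
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

assert abs(cosine_similarity([1.0, 0.0], [1.0, 0.0]) - 1.0) < 1e-9
assert abs(cosine_similarity([1.0, 0.0], [0.0, 1.0])) < 1e-9
```

One caveat this makes visible: Cohere and the local MiniLM model produce vectors of different dimensions, so embeddings from the two backends are not comparable with each other — a collection should stick to whichever backend first populated it.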

### Step 2.2 — Create ChromaDB manager

Create file: `thirdeye/backend/db/chroma.py`
```python
"""ChromaDB setup and operations."""
import json
import uuid
import chromadb
import logging
from datetime import datetime
from backend.config import CHROMA_DB_PATH
from backend.db.embeddings import embed_texts, embed_query

logger = logging.getLogger("thirdeye.chroma")

# Initialize persistent client
_chroma_client = chromadb.PersistentClient(path=CHROMA_DB_PATH)


def get_collection(group_id: str) -> chromadb.Collection:
    """Get or create a collection for a specific group."""
    # ChromaDB collection names: 3-63 chars, alphanumeric + underscores
    safe_name = f"ll_{group_id.replace('-', '_')}"[:63]
    return _chroma_client.get_or_create_collection(name=safe_name)


def store_signals(group_id: str, signals: list[dict]):
    """Store extracted signals in ChromaDB with embeddings."""
    if not signals:
        return

    collection = get_collection(group_id)
    documents = []
    metadatas = []
    ids = []

    for signal in signals:
        doc_text = f"{signal['type']}: {signal['summary']}"
        if signal.get('raw_quote'):
            doc_text += f" | Quote: {signal['raw_quote']}"

        documents.append(doc_text)
        metadatas.append({
            "type": signal.get("type", "unknown"),
            "severity": signal.get("severity", "low"),
            "status": signal.get("status", "unknown"),
            "sentiment": signal.get("sentiment", "neutral"),
            "urgency": signal.get("urgency", "none"),
            "entities": json.dumps(signal.get("entities", [])),
            "keywords": json.dumps(signal.get("keywords", [])),
            "raw_quote": signal.get("raw_quote", ""),
            "timestamp": signal.get("timestamp", datetime.utcnow().isoformat()),
            "group_id": group_id,
            "lens": signal.get("lens", "unknown"),
        })
        ids.append(signal.get("id", str(uuid.uuid4())))

    # Generate embeddings
    embeddings = embed_texts(documents)

    collection.add(
        documents=documents,
        metadatas=metadatas,
        embeddings=embeddings,
        ids=ids,
    )
    logger.info(f"Stored {len(signals)} signals for group {group_id}")


def query_signals(group_id: str, query: str, n_results: int = 10, signal_type: str = None) -> list[dict]:
    """Query the knowledge base with natural language."""
    collection = get_collection(group_id)

    query_embedding = embed_query(query)

    where_filter = {"type": signal_type} if signal_type else None

    try:
        results = collection.query(
            query_embeddings=[query_embedding],
            n_results=min(n_results, collection.count() or 1),
            where=where_filter,
        )
    except Exception as e:
        logger.warning(f"Query failed: {e}")
        return []

    # Format results
    output = []
    if results and results["documents"]:
        for i, doc in enumerate(results["documents"][0]):
            meta = results["metadatas"][0][i] if results["metadatas"] else {}
            distance = results["distances"][0][i] if results["distances"] else None
            output.append({
                "document": doc,
                "metadata": meta,
                "relevance_score": 1 - (distance or 0),  # convert distance to similarity
            })

    return output


def get_all_signals(group_id: str, signal_type: str = None) -> list[dict]:
    """Get all signals for a group (for pattern detection)."""
    collection = get_collection(group_id)
    count = collection.count()
    if count == 0:
        return []

    where_filter = {"type": signal_type} if signal_type else None

    try:
        results = collection.get(where=where_filter, limit=count)
    except Exception:
        results = collection.get(limit=count)

    output = []
    if results and results["documents"]:
        for i, doc in enumerate(results["documents"]):
            meta = results["metadatas"][i] if results["metadatas"] else {}
            output.append({"document": doc, "metadata": meta, "id": results["ids"][i]})

    return output


def get_group_ids() -> list[str]:
    """Get all group IDs that have collections."""
    # NOTE: this reversal is lossy for group IDs that contain underscores,
    # since get_collection maps both '-' and '_' to '_' in collection names.
    collections = _chroma_client.list_collections()
    return [c.name.replace("ll_", "").replace("_", "-") for c in collections if c.name.startswith("ll_")]
```
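Telegram group IDs are negative integers (e.g. `-1001234567890`), which is why `get_collection` rewrites `-` before handing the name to ChromaDB. Its effect, extracted into a standalone helper for illustration (the helper name is not part of `chroma.py`):

```python
def collection_name(group_id: str) -> str:
    # Mirrors get_collection: "ll_" prefix, '-' → '_', capped at ChromaDB's 63-char limit.
    return f"ll_{group_id.replace('-', '_')}"[:63]

assert collection_name("-1001234567890") == "ll__1001234567890"
assert len(collection_name("x" * 100)) == 63
```

Because the mapping is not injective, `get_group_ids` can only round-trip IDs that contain no underscores of their own — fine for numeric Telegram IDs, but worth knowing if you ever store synthetic IDs.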

### Step 2.3 — Create data models

Create file: `thirdeye/backend/db/models.py`
```python
"""Data models for ThirdEye."""
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime
import uuid


class Signal(BaseModel):
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    group_id: str
    lens: str = "unknown"  # dev, product, client, community
    type: str  # architecture_decision, tech_debt, etc.
    summary: str
    entities: list[str] = []
    severity: str = "low"  # low, medium, high, critical
    status: str = "unknown"  # proposed, decided, implemented, unresolved
    sentiment: str = "neutral"
    urgency: str = "none"
    raw_quote: str = ""
    source_messages: list[int] = []
    timestamp: str = Field(default_factory=lambda: datetime.utcnow().isoformat())
    keywords: list[str] = []


class Pattern(BaseModel):
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    group_id: str
    type: str  # frequency_spike, knowledge_silo, recurring_issue, sentiment_trend, stale_item
    description: str
    severity: str = "info"  # info, warning, critical
    evidence_signal_ids: list[str] = []
    recommendation: str = ""
    detected_at: str = Field(default_factory=lambda: datetime.utcnow().isoformat())
    is_active: bool = True


class CrossGroupInsight(BaseModel):
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    type: str  # blocked_handoff, conflicting_decision, information_silo, promise_reality_gap, duplicated_effort
    description: str
    group_a: dict = {}  # {name, group_id, evidence}
    group_b: dict = {}
    severity: str = "warning"
    recommendation: str = ""
    detected_at: str = Field(default_factory=lambda: datetime.utcnow().isoformat())
    is_resolved: bool = False


class GroupConfig(BaseModel):
    group_id: str
    group_name: str = ""
    lens_mode: str = "auto"  # auto, dev, product, client, community
    detected_lens: str = "unknown"
    confidence: float = 0.0
    is_active: bool = True
    message_count: int = 0
    signal_count: int = 0
```
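The models use `default_factory` (rather than a plain default) for `id`, `timestamp`, and `detected_at` so each instance gets a fresh value. The same idiom with stdlib dataclasses shows why this matters:

```python
import uuid
from dataclasses import dataclass, field

@dataclass
class Record:
    # default_factory is called once per instance; a plain default like
    # id: str = str(uuid.uuid4()) would be evaluated once at class-definition
    # time and shared by every Record ever created.
    id: str = field(default_factory=lambda: str(uuid.uuid4()))

a, b = Record(), Record()
assert a.id != b.id  # every instance gets its own UUID
```

(Pydantic does copy mutable defaults like `entities: list[str] = []` per instance automatically, so those are safe as written.)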

### ✅ TEST MILESTONE 2

Create file: `thirdeye/scripts/test_m2.py`
```python
"""Test Milestone 2: ChromaDB + Embeddings working."""
import os, sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

def test_embeddings():
    print("Testing embeddings...")
    from backend.db.embeddings import embed_texts, embed_query

    texts = ["Let's use PostgreSQL for the database", "The timeout bug is happening again"]
    embeddings = embed_texts(texts)

    assert len(embeddings) == 2, f"Expected 2 embeddings, got {len(embeddings)}"
    assert len(embeddings[0]) > 10, f"Embedding too short: {len(embeddings[0])}"
    print(f"  ✅ Embedded 2 texts, dimension={len(embeddings[0])}")

    query_emb = embed_query("database decision")
    assert len(query_emb) > 10
    print(f"  ✅ Query embedding works, dimension={len(query_emb)}")

def test_chroma():
    print("Testing ChromaDB...")
    from backend.db.chroma import store_signals, query_signals, get_all_signals

    test_group = "test_group_m2"

    # Store test signals
    signals = [
        {
            "type": "architecture_decision",
            "summary": "Team decided to use PostgreSQL over MongoDB for relational data",
            "entities": ["@alex", "postgresql", "mongodb"],
            "severity": "medium",
            "status": "decided",
            "raw_quote": "Let's go with Postgres, MongoDB is overkill",
            "timestamp": "2026-03-20T10:00:00Z",
            "lens": "dev",
        },
        {
            "type": "tech_debt",
            "summary": "API URL hardcoded instead of using environment variables",
            "entities": ["@priya", "api_url"],
            "severity": "low",
            "status": "unresolved",
            "raw_quote": "Just hardcoding the URL for now",
            "timestamp": "2026-03-20T14:00:00Z",
            "lens": "dev",
        },
        {
            "type": "recurring_bug",
            "summary": "Timeout error occurring repeatedly in payment service",
            "entities": ["payment_service", "timeout"],
            "severity": "high",
            "status": "unresolved",
            "raw_quote": "Timeout error is back again",
            "timestamp": "2026-03-21T09:00:00Z",
            "lens": "dev",
        },
    ]

    store_signals(test_group, signals)
    print(f"  ✅ Stored {len(signals)} signals")

    # Query
    results = query_signals(test_group, "database decision")
    assert len(results) > 0, "No results for 'database decision'"
    assert "postgres" in results[0]["document"].lower() or "database" in results[0]["document"].lower()
    print(f"  ✅ Query 'database decision' returned {len(results)} results")
    print(f"     Top result: {results[0]['document'][:80]}")

    # Query with type filter
    results2 = query_signals(test_group, "bug", signal_type="recurring_bug")
    assert len(results2) > 0, "No results for type=recurring_bug"
    print(f"  ✅ Filtered query (type=recurring_bug) returned {len(results2)} results")

    # Get all
    all_sigs = get_all_signals(test_group)
    assert len(all_sigs) >= 3, f"Expected >=3 signals, got {len(all_sigs)}"
    print(f"  ✅ get_all_signals returned {len(all_sigs)} signals")

    # Cleanup test collection
    import chromadb
    from backend.config import CHROMA_DB_PATH
    client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
    try:
        client.delete_collection(f"ll_{test_group}")
        print("  ✅ Cleaned up test collection")
    except Exception:
        pass

def test_models():
    print("Testing data models...")
    from backend.db.models import Signal, Pattern, CrossGroupInsight

    s = Signal(group_id="test", type="tech_debt", summary="Test signal")
    assert s.id is not None
    assert s.severity == "low"
    print(f"  ✅ Signal model works (id={s.id[:8]}...)")

    p = Pattern(group_id="test", type="frequency_spike", description="Test pattern")
    assert p.is_active is True
    print("  ✅ Pattern model works")

    c = CrossGroupInsight(type="blocked_handoff", description="Test insight")
    assert c.is_resolved is False
    print("  ✅ CrossGroupInsight model works")

test_embeddings()
test_chroma()
test_models()
print("\n🎉 MILESTONE 2 PASSED — ChromaDB + Embeddings working")
```

Run: `cd thirdeye && python scripts/test_m2.py`

---
## MILESTONE 3: Signal Extractor Agent (30%)

**Goal:** The core agent that processes batched messages and extracts structured signals. Verified with real LLM calls.

### Step 3.1 — Create the Signal Extractor Agent

Create file: `thirdeye/backend/agents/signal_extractor.py`
```python
"""Signal Extractor Agent — extracts structured signals from chat messages."""
import json
import logging
from backend.providers import call_llm
from backend.db.models import Signal
from datetime import datetime

logger = logging.getLogger("thirdeye.agents.signal_extractor")

# Lens-specific system prompts
LENS_PROMPTS = {
    "dev": """You are the Signal Extractor for ThirdEye operating in DevLens mode.
You analyze batches of developer team chat messages and extract STRUCTURED SIGNALS.

Extract ONLY signals that represent meaningful technical information. Skip greetings, small talk, emoji reactions, and meta-conversation.

Signal types to look for:
- architecture_decision: Technology choices, design decisions with rationale
- tech_debt: Shortcuts, hardcoded values, "will fix later" patterns
- knowledge_silo_evidence: Only one person discusses a critical topic
- recurring_bug: Same issue mentioned repeatedly
- stack_decision: Technology/framework choices (proposed or decided)
- deployment_risk: Risky deployment practices
- workaround: Temporary fixes being applied repeatedly

For EACH signal found, include it in the JSON array. If NO meaningful signals exist, return empty array.
Be SELECTIVE. Quality over quantity.""",

    "product": """You are the Signal Extractor for ThirdEye operating in ProductLens mode.

Signal types to look for:
- feature_request: Features users or team members are asking for
- user_pain_point: User difficulties, complaints, confusion
- roadmap_drift: Discussion of topics not on the current plan
- priority_conflict: Team members disagreeing on what's most important
- metric_mention: Specific numbers, conversion rates, performance data
- user_quote: Direct quotes from users/customers
- competitor_intel: Mentions of competitor actions or features

Be SELECTIVE. Quality over quantity.""",

    "client": """You are the Signal Extractor for ThirdEye operating in ClientLens mode.

Signal types to look for:
- promise: Commitments made with deadlines (explicit or implicit)
- scope_creep: Additional requests introduced casually without formal change requests
- sentiment_signal: Tone changes (positive praise, growing frustration, formality shifts)
- unanswered_request: Questions or requests that haven't received responses
- satisfaction: Explicit positive or negative feedback
- escalation_risk: Mentions of involving management, expressing deadline concerns
- client_decision: Decisions made by the client

Pay SPECIAL attention to implicit deadlines ("by end of week", "before the meeting").
Be SELECTIVE. Quality over quantity.""",

    "community": """You are the Signal Extractor for ThirdEye operating in CommunityLens mode.

Signal types: recommendation, event, issue, local_knowledge, question
Be SELECTIVE. Quality over quantity.""",
}

EXTRACTION_FORMAT = """
Respond ONLY with valid JSON in this exact format (no markdown, no backticks, no explanation):
{"signals": [{"type": "signal_type_here", "summary": "One clear sentence", "entities": ["@person", "technology"], "severity": "low|medium|high|critical", "status": "proposed|decided|implemented|unresolved", "raw_quote": "Short key phrase from message", "message_index": 0}]}

If no signals found: {"signals": []}
"""


async def extract_signals(messages_text: str, group_id: str, lens: str = "dev") -> list[Signal]:
    """
    Extract structured signals from a batch of formatted chat messages.

    Args:
        messages_text: Formatted string like "[Alex]: Let's use Redis\\n[Bob]: Agreed"
        group_id: Telegram group ID
        lens: Active lens mode (dev, product, client, community)

    Returns:
        List of Signal objects
    """
    system_prompt = LENS_PROMPTS.get(lens, LENS_PROMPTS["dev"])

    messages = [
        {"role": "system", "content": system_prompt + "\n\n" + EXTRACTION_FORMAT},
        {"role": "user", "content": f"Extract signals from these messages:\n\n{messages_text}"},
    ]

    try:
        result = await call_llm("fast_large", messages, temperature=0.2, max_tokens=2000)
        content = result["content"].strip()

        # Clean up common LLM response issues
        if content.startswith("```"):
            content = content.split("```")[1]
        if content.startswith("json"):
            content = content[4:]
        content = content.strip()

        parsed = json.loads(content)
        raw_signals = parsed.get("signals", [])

        # Convert to Signal objects
        signals = []
        for raw in raw_signals:
            try:
                signal = Signal(
                    group_id=group_id,
                    lens=lens,
                    type=raw.get("type", "unknown"),
                    summary=raw.get("summary", ""),
                    entities=raw.get("entities", []),
                    severity=raw.get("severity", "low"),
                    status=raw.get("status", "unknown"),
                    raw_quote=raw.get("raw_quote", ""),
                    timestamp=datetime.utcnow().isoformat(),
                )
                signals.append(signal)
            except Exception as e:
                logger.warning(f"Failed to parse signal: {e}")
                continue

        logger.info(f"Extracted {len(signals)} signals from {group_id} (lens={lens}) via {result['provider']}")
        return signals

    except json.JSONDecodeError as e:
        logger.error(f"JSON parse error in signal extraction: {e}")
        return []
    except Exception as e:
        logger.error(f"Signal extraction failed: {e}")
        return []
```
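The cleanup step above guards against models that wrap JSON in markdown fences despite the "no backticks" instruction. The same logic in isolation (the helper name is illustrative, not part of `signal_extractor.py`):

```python
def strip_code_fences(content: str) -> str:
    # Same cleanup as extract_signals: drop fence wrappers and a leading "json" tag.
    content = content.strip()
    if content.startswith("```"):
        content = content.split("```")[1]
    if content.startswith("json"):
        content = content[4:]
    return content.strip()

assert strip_code_fences('```json\n{"signals": []}\n```') == '{"signals": []}'
assert strip_code_fences('{"signals": []}') == '{"signals": []}'
```

Clean output passes through untouched, so the cleanup is safe to apply unconditionally before `json.loads`.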

### Step 3.2 — Create the Classifier Agent

Create file: `thirdeye/backend/agents/classifier.py`
```python
"""Classifier Agent — adds metadata tags to extracted signals."""
import json
import logging
from backend.providers import call_llm
from backend.db.models import Signal

logger = logging.getLogger("thirdeye.agents.classifier")

SYSTEM_PROMPT = """You are a fast metadata classifier. Given an extracted signal, add classification tags.

Respond ONLY with valid JSON (no markdown, no backticks):
{"sentiment": "positive|neutral|negative|urgent", "urgency": "none|low|medium|high|critical", "keywords": ["3-5 searchable keywords"]}
"""


async def classify_signal(signal: Signal) -> Signal:
    """Add classification metadata to a signal."""
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"Classify this signal:\nType: {signal.type}\nSummary: {signal.summary}\nQuote: {signal.raw_quote}"},
    ]

    try:
        result = await call_llm("fast_small", messages, temperature=0.1, max_tokens=200)
        content = result["content"].strip()
        if content.startswith("```"):
            content = content.split("```")[1]
        if content.startswith("json"):
            content = content[4:]
        content = content.strip()

        parsed = json.loads(content)
        signal.sentiment = parsed.get("sentiment", signal.sentiment)
        signal.urgency = parsed.get("urgency", signal.urgency)
        signal.keywords = parsed.get("keywords", signal.keywords)

    except Exception as e:
        logger.warning(f"Classification failed, using defaults: {e}")
        # Keep defaults — classification failure is non-fatal

    return signal
```

### Step 3.3 — Create the Context Detector Agent

Create file: `thirdeye/backend/agents/context_detector.py`
```python
"""Context Detector Agent — auto-classifies group type from messages."""
import json
import logging
from backend.providers import call_llm

logger = logging.getLogger("thirdeye.agents.context_detector")

SYSTEM_PROMPT = """You analyze a batch of messages from a Telegram group and determine what TYPE of group this is.

CLASSIFY into exactly ONE:
- "dev" — Software engineering team (code, PRs, deployments, bugs, tech stack)
- "product" — Product/business team (features, users, metrics, roadmap, competitors)
- "client" — Client/agency channel (deliverables, timelines, approvals, invoices)
- "community" — Community/interest group (recommendations, events, local info, casual)

Respond ONLY with valid JSON (no markdown, no backticks):
{"detected_lens": "dev|product|client|community", "confidence": 0.0-1.0, "evidence": ["signal1", "signal2", "signal3"]}
"""


async def detect_context(messages_text: str) -> dict:
    """Detect group type from a batch of messages."""
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"Classify this group based on these messages:\n\n{messages_text}"},
    ]

    try:
        result = await call_llm("reasoning", messages, temperature=0.1, max_tokens=300)
        content = result["content"].strip()
        if content.startswith("```"):
            content = content.split("```")[1]
        if content.startswith("json"):
            content = content[4:]

        parsed = json.loads(content.strip())
        return {
            "detected_lens": parsed.get("detected_lens", "dev"),
            "confidence": parsed.get("confidence", 0.5),
            "evidence": parsed.get("evidence", []),
        }
    except Exception as e:
        logger.error(f"Context detection failed: {e}")
        return {"detected_lens": "dev", "confidence": 0.0, "evidence": ["detection_failed"]}
```
|
|
|
|
### ✅ TEST MILESTONE 3

Create file: `thirdeye/scripts/test_m3.py`
```python
"""Test Milestone 3: Signal Extractor, Classifier, and Context Detector working."""
import asyncio, os, sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

DEV_CHAT = """[Alex]: Hey team, I think we should go with PostgreSQL for the main DB. MongoDB is overkill for our relational data.
[Priya]: Agreed on Postgres. I'll set up the schema today.
[Raj]: Payment module webhook integration is looking tricky. I'll handle it myself since I know the Stripe API best.
[Alex]: I'm just gonna hardcode the API URL for now, we'll add env vars when we dockerize.
[Sam]: The timeout error on the checkout endpoint is happening again. Third time this week.
[Alex]: Just restart the pod for now, I'll look at it after the sprint."""

PRODUCT_CHAT = """[Lisa]: Users keep asking about dark mode, it comes up in every demo.
[Mike]: I think we should prioritize the mobile app over the API this sprint.
[Sarah]: No way, API stability is way more important. Two enterprise clients complained last week.
[Lisa]: Sarah from Acme literally said 'I would pay double if you had SSO integration.'
[Mike]: Competitor X just launched a mobile-first version. We're falling behind."""


async def test_signal_extractor():
    from backend.agents.signal_extractor import extract_signals

    print("Testing Signal Extractor (DevLens)...")
    signals = await extract_signals(DEV_CHAT, "test-dev", lens="dev")
    print(f"  Extracted {len(signals)} signals:")
    for s in signals:
        print(f"    - [{s.type}] {s.summary[:70]}...")
    assert len(signals) >= 2, f"Expected >=2 signals, got {len(signals)}"
    print(f"  ✅ DevLens extraction working ({len(signals)} signals)")

    print("\nTesting Signal Extractor (ProductLens)...")
    signals2 = await extract_signals(PRODUCT_CHAT, "test-product", lens="product")
    print(f"  Extracted {len(signals2)} signals:")
    for s in signals2:
        print(f"    - [{s.type}] {s.summary[:70]}...")
    assert len(signals2) >= 2, f"Expected >=2 signals, got {len(signals2)}"
    print(f"  ✅ ProductLens extraction working ({len(signals2)} signals)")


async def test_classifier():
    from backend.agents.signal_extractor import extract_signals
    from backend.agents.classifier import classify_signal

    print("\nTesting Classifier Agent...")
    signals = await extract_signals(DEV_CHAT, "test-classify", lens="dev")
    if signals:
        classified = await classify_signal(signals[0])
        print(f"  Signal: {classified.summary[:60]}")
        print(f"  Sentiment: {classified.sentiment}, Urgency: {classified.urgency}")
        print(f"  Keywords: {classified.keywords}")
        print("  ✅ Classifier working")
    else:
        print("  ⚠️ No signals to classify (extractor returned empty)")


async def test_context_detector():
    from backend.agents.context_detector import detect_context

    print("\nTesting Context Detector...")
    result = await detect_context(DEV_CHAT)
    print(f"  Detected: {result['detected_lens']} (confidence: {result['confidence']})")
    print(f"  Evidence: {result['evidence']}")
    assert result["detected_lens"] == "dev", f"Expected 'dev', got '{result['detected_lens']}'"
    print("  ✅ Correctly detected as 'dev'")

    result2 = await detect_context(PRODUCT_CHAT)
    print(f"  Detected: {result2['detected_lens']} (confidence: {result2['confidence']})")
    assert result2["detected_lens"] == "product", f"Expected 'product', got '{result2['detected_lens']}'"
    print("  ✅ Correctly detected as 'product'")


async def main():
    await test_signal_extractor()
    await test_classifier()
    await test_context_detector()
    print("\n🎉 MILESTONE 3 PASSED — Core agents working")


asyncio.run(main())
```

Run: `cd thirdeye && python scripts/test_m3.py`

**Expected:** Signals extracted from both dev and product chat. Classifier adds metadata. Context detector correctly identifies "dev" and "product".

---
## MILESTONE 4: Full Pipeline Integration (45%)
**Goal:** End-to-end pipeline: Messages → Extract → Classify → Store in ChromaDB → Query back. No Telegram yet.

### Step 4.1 — Create the pipeline orchestrator

Create file: `thirdeye/backend/pipeline.py`
```python
"""Core pipeline: message batch → signals → classified → stored → queryable."""
import asyncio
import logging
from backend.agents.signal_extractor import extract_signals
from backend.agents.classifier import classify_signal
from backend.agents.context_detector import detect_context
from backend.db.chroma import store_signals, query_signals
from backend.db.models import Signal

logger = logging.getLogger("thirdeye.pipeline")

# In-memory group config store (replace with Redis/DB for production)
_group_configs = {}


async def detect_and_set_lens(group_id: str, messages_text: str) -> str:
    """Auto-detect lens for a group from initial messages."""
    result = await detect_context(messages_text)
    _group_configs[group_id] = {
        "lens": result["detected_lens"],
        "confidence": result["confidence"],
    }
    logger.info(f"Group {group_id}: lens={result['detected_lens']} (conf={result['confidence']})")
    return result["detected_lens"]


def get_lens(group_id: str) -> str:
    """Get the current lens for a group."""
    config = _group_configs.get(group_id, {})
    return config.get("lens", "dev")


def set_lens(group_id: str, lens: str):
    """Manually set the lens for a group."""
    _group_configs[group_id] = {"lens": lens, "confidence": 1.0}


async def process_message_batch(group_id: str, messages: list[dict]) -> list[Signal]:
    """
    Process a batch of messages through the full pipeline.

    Args:
        group_id: Telegram group ID
        messages: List of {"sender": str, "text": str, "timestamp": str}

    Returns:
        List of stored Signal objects
    """
    # Format messages for the LLM
    formatted = "\n".join([f"[{m['sender']}]: {m['text']}" for m in messages])

    # Get or detect lens
    lens = get_lens(group_id)
    if lens == "dev" and group_id not in _group_configs:
        # First time seeing this group — auto-detect
        lens = await detect_and_set_lens(group_id, formatted)

    # Step 1: Extract signals
    signals = await extract_signals(formatted, group_id, lens=lens)

    if not signals:
        logger.info(f"No signals extracted from batch in {group_id}")
        return []

    # Step 2: Classify each signal (parallel for speed)
    classified_signals = await asyncio.gather(*[classify_signal(s) for s in signals])

    # Step 3: Store in ChromaDB
    store_signals(group_id, [s.model_dump() for s in classified_signals])

    logger.info(f"Pipeline complete: {len(classified_signals)} signals stored for {group_id}")
    return classified_signals


async def query_knowledge(group_id: str, question: str) -> str:
    """Query the knowledge base with natural language and get a formatted answer."""
    from backend.providers import call_llm

    # Retrieve relevant signals
    results = query_signals(group_id, question, n_results=8)

    if not results:
        return "I don't have any information about that in the knowledge base yet. The group needs more conversation for me to learn from."

    # Format context for the LLM
    context_parts = []
    for i, r in enumerate(results):
        meta = r["metadata"]
        context_parts.append(
            f"[Signal {i+1}] Type: {meta.get('type', 'unknown')} | "
            f"Severity: {meta.get('severity', 'unknown')} | "
            f"Time: {meta.get('timestamp', 'unknown')}\n"
            f"Content: {r['document']}\n"
            f"Entities: {meta.get('entities', '[]')}"
        )
    context = "\n\n".join(context_parts)

    messages = [
        {"role": "system", "content": """You are the Query Agent for ThirdEye. Answer questions about the team's chat history using ONLY the provided context.

RULES:
1. Answer ONLY from the provided context. Never make up information.
2. Cite sources: "Based on a discussion involving @person on [date]..."
3. If info doesn't exist in context, say so clearly.
4. Be concise — 2-4 sentences unless more is needed.
5. Format for Telegram (plain text, no markdown headers)."""},
        {"role": "user", "content": f"Context from knowledge base:\n\n{context}\n\nQuestion: {question}"},
    ]

    try:
        result = await call_llm("fast_large", messages, temperature=0.3, max_tokens=500)
        return result["content"]
    except Exception as e:
        logger.error(f"Query agent failed: {e}")
        return "Sorry, I encountered an error while searching the knowledge base. Please try again."
```

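The in-memory `_group_configs` dict is lost on every restart. As the comment in `pipeline.py` notes, production should use Redis or a DB; for a single-process deployment, a minimal JSON-file persistence sketch could bridge the gap. The file path and helper names below are illustrative, not part of the playbook:

```python
import json
from pathlib import Path

# Hypothetical location, next to the ChromaDB directory
CONFIG_FILE = Path("./group_configs.json")

def load_group_configs() -> dict:
    """Load saved lens configs, or start empty if no file exists yet."""
    if CONFIG_FILE.exists():
        return json.loads(CONFIG_FILE.read_text())
    return {}

def save_group_configs(configs: dict) -> None:
    """Persist configs to disk (fine for a single process, not concurrent writers)."""
    CONFIG_FILE.write_text(json.dumps(configs, indent=2))
```

`load_group_configs()` would seed `_group_configs` at import time, and `save_group_configs(_group_configs)` would be called from `set_lens` and `detect_and_set_lens`.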
### ✅ TEST MILESTONE 4

Create file: `thirdeye/scripts/test_m4.py`
```python
"""Test Milestone 4: Full pipeline — extract → classify → store → query."""
import asyncio, os, sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

DEV_MESSAGES = [
    {"sender": "Alex", "text": "Hey team, I think we should go with PostgreSQL for the main DB. MongoDB is overkill.", "timestamp": "2026-03-20T10:00:00Z"},
    {"sender": "Priya", "text": "Agreed. I'll set up the Postgres schema today.", "timestamp": "2026-03-20T10:05:00Z"},
    {"sender": "Raj", "text": "Payment webhook integration is tricky. I'll handle all the Stripe stuff since I know it best.", "timestamp": "2026-03-20T11:00:00Z"},
    {"sender": "Alex", "text": "I'm just hardcoding the API URL for now. We'll fix it with env vars later.", "timestamp": "2026-03-20T14:00:00Z"},
    {"sender": "Sam", "text": "The timeout error on checkout is back. Third time this week.", "timestamp": "2026-03-21T09:00:00Z"},
    {"sender": "Alex", "text": "Just restart the pod when it happens. I'll investigate after the sprint.", "timestamp": "2026-03-21T09:15:00Z"},
]


async def main():
    from backend.pipeline import process_message_batch, query_knowledge

    group_id = "test_pipeline_m4"

    # Step 1: Process messages through full pipeline
    print("Processing message batch through full pipeline...")
    signals = await process_message_batch(group_id, DEV_MESSAGES)
    print(f"  ✅ Pipeline produced {len(signals)} signals:")
    for s in signals:
        print(f"    [{s.type}] {s.summary[:70]} (severity={s.severity}, sentiment={s.sentiment})")

    assert len(signals) >= 2, f"Expected >=2 signals, got {len(signals)}"

    # Step 2: Query the knowledge base
    print("\nQuerying: 'What database did the team choose?'")
    answer = await query_knowledge(group_id, "What database did the team choose?")
    print(f"  Answer: {answer}")
    assert len(answer) > 20, "Answer too short"
    print("  ✅ Query agent produced meaningful answer")

    print("\nQuerying: 'What tech debt exists?'")
    answer2 = await query_knowledge(group_id, "What tech debt exists?")
    print(f"  Answer: {answer2}")
    print("  ✅ Tech debt query works")

    print("\nQuerying: 'What bugs have been reported?'")
    answer3 = await query_knowledge(group_id, "What bugs or issues keep recurring?")
    print(f"  Answer: {answer3}")
    print("  ✅ Bug query works")

    # Cleanup
    import chromadb
    from backend.config import CHROMA_DB_PATH
    client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
    try:
        client.delete_collection(f"ll_{group_id}")
    except Exception:
        pass

    print("\n🎉 MILESTONE 4 PASSED — Full pipeline working end to end")


asyncio.run(main())
```

Run: `cd thirdeye && python scripts/test_m4.py`

**Expected:** Signals extracted, classified, stored in ChromaDB, and queries return meaningful answers about the team's database decision, tech debt, and bugs.

---
## MILESTONE 5: Pattern Detector + Cross-Group Analyst (60%)
**Goal:** Pattern detection within a group and cross-group blind spot detection working.

### Step 5.1 — Create Pattern Detector Agent

Create file: `thirdeye/backend/agents/pattern_detector.py`
```python
"""Pattern Detector Agent — finds trends and anomalies in accumulated signals."""
import json
import logging
from backend.providers import call_llm
from backend.db.chroma import get_all_signals
from backend.db.models import Pattern

logger = logging.getLogger("thirdeye.agents.pattern_detector")

SYSTEM_PROMPT = """You are the Pattern Detector for ThirdEye. You analyze accumulated signals to find patterns and anomalies.

Detect these pattern types:
- frequency_spike: A signal type mentioned significantly more than usual
- knowledge_silo: Only one person discusses a critical topic (bus factor = 1)
- recurring_issue: Same bug/problem appearing repeatedly
- sentiment_trend: Gradual shift in tone over time
- stale_item: Decisions proposed but never resolved, promises with no follow-up

Respond ONLY with valid JSON (no markdown, no backticks):
{"patterns": [{"type": "pattern_type", "description": "Clear human-readable description", "severity": "info|warning|critical", "evidence_ids": [], "recommendation": "Suggested action"}]}

If no patterns found: {"patterns": []}
Only report patterns that are genuinely concerning. Do NOT manufacture patterns from insufficient data."""


async def detect_patterns(group_id: str) -> list[Pattern]:
    """Analyze all signals in a group and detect patterns."""
    all_signals = get_all_signals(group_id)

    if len(all_signals) < 3:
        logger.info(f"Not enough signals ({len(all_signals)}) for pattern detection in {group_id}")
        return []

    # Format signals for the LLM
    signal_summary = []
    for s in all_signals:
        meta = s["metadata"]
        signal_summary.append(
            f"- [{meta.get('type', '?')}] {s['document'][:100]} "
            f"(severity={meta.get('severity', '?')}, entities={meta.get('entities', '[]')}, "
            f"time={meta.get('timestamp', '?')})"
        )
    signals_text = "\n".join(signal_summary)

    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"Analyze these {len(all_signals)} signals from group '{group_id}':\n\n{signals_text}"},
    ]

    try:
        result = await call_llm("reasoning", messages, temperature=0.2, max_tokens=1500)
        content = result["content"].strip()
        if content.startswith("```"):
            content = content.split("```")[1]
            if content.startswith("json"):
                content = content[4:]

        parsed = json.loads(content.strip())
        patterns = []
        for p in parsed.get("patterns", []):
            patterns.append(Pattern(
                group_id=group_id,
                type=p.get("type", "unknown"),
                description=p.get("description", ""),
                severity=p.get("severity", "info"),
                recommendation=p.get("recommendation", ""),
            ))

        logger.info(f"Detected {len(patterns)} patterns in {group_id}")
        return patterns

    except Exception as e:
        logger.error(f"Pattern detection failed: {e}")
        return []
```

### Step 5.2 — Create Cross-Group Analyst Agent

Create file: `thirdeye/backend/agents/cross_group_analyst.py`
```python
"""Cross-Group Analyst Agent — detects blind spots between multiple teams."""
import json
import logging
from backend.providers import call_llm
from backend.db.chroma import get_all_signals, get_group_ids
from backend.db.models import CrossGroupInsight

logger = logging.getLogger("thirdeye.agents.cross_group_analyst")

SYSTEM_PROMPT = """You are the Cross-Group Intelligence Analyst for ThirdEye. This is the MOST IMPORTANT analysis.

You receive intelligence summaries from MULTIPLE Telegram groups. Your job is to find BLIND SPOTS — information in one group that should be in another.

Detect:
- blocked_handoff: Team A waiting for something from Team B, but Team B doesn't know
- conflicting_decision: Team A decided X, Team B decided the opposite
- information_silo: Critical info in Group A never reached Group B
- promise_reality_gap: Promise made in one group, but another group shows it's blocked
- duplicated_effort: Two teams working on similar things unknowingly

Respond ONLY with valid JSON (no markdown):
{"insights": [{"type": "insight_type", "description": "SPECIFIC description naming the groups, people, and topics", "group_a": {"name": "group_name", "evidence": "what was said"}, "group_b": {"name": "group_name", "evidence": "what was said or NOT said"}, "severity": "warning|critical", "recommendation": "Specific action"}]}

If no cross-group issues: {"insights": []}
Be SPECIFIC. Name the groups, people, topics, and exact conflicts."""


async def analyze_cross_group(group_summaries: dict[str, list[dict]] | None = None) -> list[CrossGroupInsight]:
    """
    Analyze intelligence across all monitored groups to find blind spots.

    Args:
        group_summaries: Optional pre-built summaries. If None, loads from ChromaDB.
    """
    if group_summaries is None:
        group_ids = get_group_ids()
        if len(group_ids) < 2:
            logger.info("Need at least 2 groups for cross-group analysis")
            return []

        group_summaries = {}
        for gid in group_ids:
            signals = get_all_signals(gid)
            group_summaries[gid] = signals

    if len(group_summaries) < 2:
        return []

    # Format summaries for the LLM
    summary_parts = []
    for group_name, signals in group_summaries.items():
        signal_lines = []
        for s in signals[:30]:  # Limit per group to fit context
            meta = s["metadata"]
            signal_lines.append(f"  - [{meta.get('type', '?')}] {s['document'][:120]}")

        summary_parts.append(f"=== GROUP: {group_name} ({len(signals)} total signals) ===\n" + "\n".join(signal_lines))

    full_summary = "\n\n".join(summary_parts)

    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"Analyze cross-group intelligence:\n\n{full_summary}"},
    ]

    try:
        result = await call_llm("reasoning", messages, temperature=0.2, max_tokens=2000)
        content = result["content"].strip()
        if content.startswith("```"):
            content = content.split("```")[1]
            if content.startswith("json"):
                content = content[4:]

        parsed = json.loads(content.strip())
        insights = []
        for i in parsed.get("insights", []):
            insights.append(CrossGroupInsight(
                type=i.get("type", "unknown"),
                description=i.get("description", ""),
                group_a=i.get("group_a", {}),
                group_b=i.get("group_b", {}),
                severity=i.get("severity", "warning"),
                recommendation=i.get("recommendation", ""),
            ))

        logger.info(f"Cross-group analysis found {len(insights)} insights")
        return insights

    except Exception as e:
        logger.error(f"Cross-group analysis failed: {e}")
        return []
```

### ✅ TEST MILESTONE 5

Create file: `thirdeye/scripts/test_m5.py`
```python
"""Test Milestone 5: Pattern detection + Cross-group analysis."""
import asyncio, os, sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from backend.pipeline import process_message_batch

# Dev team messages with PLANTED patterns
DEV_MSGS = [
    {"sender": "Alex", "text": "Let's go with PostgreSQL.", "timestamp": "2026-03-15T10:00:00Z"},
    {"sender": "Raj", "text": "I'll handle the payment module Stripe integration.", "timestamp": "2026-03-15T11:00:00Z"},
    {"sender": "Raj", "text": "Payment webhook setup is done, only I know how this works right now.", "timestamp": "2026-03-16T10:00:00Z"},
    {"sender": "Sam", "text": "Timeout error on checkout again.", "timestamp": "2026-03-17T09:00:00Z"},
    {"sender": "Sam", "text": "Same timeout error. This is the third time.", "timestamp": "2026-03-18T09:00:00Z"},
    {"sender": "Alex", "text": "I'm hardcoding the config for now, no time to do it properly.", "timestamp": "2026-03-18T14:00:00Z"},
    {"sender": "Sam", "text": "We need the design specs for the dashboard. Still waiting.", "timestamp": "2026-03-19T10:00:00Z"},
    {"sender": "Alex", "text": "Dashboard is completely blocked without those design specs.", "timestamp": "2026-03-20T10:00:00Z"},
]

# Product team messages — NOTE: no mention of design specs being needed
PRODUCT_MSGS = [
    {"sender": "Lisa", "text": "Dark mode is the most requested feature by far.", "timestamp": "2026-03-16T10:00:00Z"},
    {"sender": "Mike", "text": "We should go mobile-first this sprint.", "timestamp": "2026-03-17T10:00:00Z"},
    {"sender": "Sarah", "text": "API stability is more important than mobile. Enterprise clients are complaining.", "timestamp": "2026-03-17T10:30:00Z"},
    {"sender": "Lisa", "text": "I told the client we'd have the dashboard demo ready by Friday.", "timestamp": "2026-03-18T10:00:00Z"},
    {"sender": "Mike", "text": "Let's push for the API-first approach this quarter.", "timestamp": "2026-03-19T10:00:00Z"},
]


async def main():
    from backend.agents.pattern_detector import detect_patterns
    from backend.agents.cross_group_analyst import analyze_cross_group

    dev_group = "test_dev_m5"
    product_group = "test_product_m5"

    # Process both groups
    print("Processing dev team messages...")
    dev_signals = await process_message_batch(dev_group, DEV_MSGS)
    print(f"  ✅ Dev team: {len(dev_signals)} signals stored")

    print("Processing product team messages...")
    prod_signals = await process_message_batch(product_group, PRODUCT_MSGS)
    print(f"  ✅ Product team: {len(prod_signals)} signals stored")

    # Test pattern detection
    print("\nRunning pattern detection on dev team...")
    patterns = await detect_patterns(dev_group)
    print(f"  Found {len(patterns)} patterns:")
    for p in patterns:
        print(f"    [{p.severity}] {p.type}: {p.description[:80]}")
    print("  ✅ Pattern detection working")

    # Test cross-group analysis
    print("\nRunning cross-group analysis...")
    from backend.db.chroma import get_all_signals
    summaries = {
        "Acme Dev Team": get_all_signals(dev_group),
        "Acme Product": get_all_signals(product_group),
    }
    insights = await analyze_cross_group(summaries)
    print(f"  Found {len(insights)} cross-group insights:")
    for i in insights:
        print(f"    [{i.severity}] {i.type}: {i.description[:100]}")
    print("  ✅ Cross-group analysis working")

    # Cleanup
    import chromadb
    from backend.config import CHROMA_DB_PATH
    client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
    for name in [dev_group, product_group]:
        try:
            client.delete_collection(f"ll_{name}")
        except Exception:
            pass

    print("\n🎉 MILESTONE 5 PASSED — Pattern detection + cross-group analysis working")


asyncio.run(main())
```

Run: `cd thirdeye && python scripts/test_m5.py`

**Expected:** Pattern detection flags the planted patterns (the recurring checkout timeout, Raj's payment-module knowledge silo); cross-group analysis surfaces the design-spec blocker that never reached the product team.

---
## MILESTONE 6: Telegram Bot (70%)
**Goal:** Working Telegram bot that listens to group messages, processes them, and responds to /ask queries.

### Step 6.1 — Create the Telegram bot

Create file: `thirdeye/backend/bot/bot.py`
```python
"""ThirdEye Telegram Bot — main entry point."""
import asyncio
import logging
from collections import defaultdict
from telegram import Update
from telegram.ext import (
    Application, MessageHandler, CommandHandler, filters, ContextTypes
)
from backend.config import TELEGRAM_BOT_TOKEN, BATCH_SIZE
from backend.pipeline import process_message_batch, query_knowledge, get_lens, set_lens, detect_and_set_lens

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s")
logger = logging.getLogger("thirdeye.bot")

# Message buffers per group
_buffers = defaultdict(list)
_group_names = {}


async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Process every text message in groups."""
    if not update.message or not update.message.text:
        return
    if update.message.chat.type not in ("group", "supergroup"):
        return

    group_id = str(update.message.chat_id)
    _group_names[group_id] = update.message.chat.title or group_id

    msg = {
        "sender": update.message.from_user.first_name or update.message.from_user.username or "Unknown",
        "text": update.message.text,
        "timestamp": update.message.date.isoformat(),
        "message_id": update.message.message_id,
    }

    _buffers[group_id].append(msg)

    # Process when buffer reaches batch size
    if len(_buffers[group_id]) >= BATCH_SIZE:
        batch = _buffers[group_id]
        _buffers[group_id] = []

        try:
            signals = await process_message_batch(group_id, batch)
            if signals:
                logger.info(f"Processed batch: {len(signals)} signals from {_group_names.get(group_id, group_id)}")
        except Exception as e:
            logger.error(f"Pipeline error: {e}")


async def cmd_ask(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /ask [question]."""
    if not context.args:
        await update.message.reply_text("Usage: /ask [your question]\nExample: /ask What database did we choose?")
        return

    question = " ".join(context.args)
    group_id = str(update.message.chat_id)

    await update.message.reply_text("🔍 Searching the knowledge base...")

    try:
        answer = await query_knowledge(group_id, question)
        await update.message.reply_text(f"💡 {answer}")
    except Exception as e:
        await update.message.reply_text(f"Sorry, something went wrong: {str(e)[:100]}")


async def cmd_digest(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Generate a summary digest."""
    group_id = str(update.message.chat_id)
    from backend.db.chroma import get_all_signals

    signals = get_all_signals(group_id)
    if not signals:
        await update.message.reply_text("No intelligence gathered yet. I need more conversation to learn from!")
        return

    # Group by type
    by_type = defaultdict(list)
    for s in signals:
        by_type[s["metadata"].get("type", "other")].append(s)

    parts = [f"📊 *Intelligence Digest* ({len(signals)} total signals)\n"]
    for sig_type, sigs in sorted(by_type.items(), key=lambda x: -len(x[1])):
        parts.append(f"• {sig_type.replace('_', ' ').title()}: {len(sigs)} signals")

    await update.message.reply_text("\n".join(parts))


async def cmd_lens(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Set or check the active lens."""
    group_id = str(update.message.chat_id)

    if context.args:
        mode = context.args[0].lower()
        if mode in ("dev", "product", "client", "community", "auto"):
            set_lens(group_id, mode)
            await update.message.reply_text(f"🔭 Lens set to: {mode}")
        else:
            await update.message.reply_text("Valid lenses: dev, product, client, community, auto")
    else:
        current = get_lens(group_id)
        await update.message.reply_text(f"🔭 Current lens: {current}")


async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Welcome message."""
    await update.message.reply_text(
        "👁️ *ThirdEye* — Conversation Intelligence Engine\n\n"
        "I'm now listening to this group and extracting intelligence from your conversations.\n\n"
        "Commands:\n"
        "/ask [question] — Ask about your team's knowledge\n"
        "/digest — Get an intelligence summary\n"
        "/lens [mode] — Set detection mode (dev/product/client/community)\n"
        "/alerts — View active warnings\n\n"
        "I work passively — no need to tag me. I'll alert you when I spot patterns or issues.",
        parse_mode=None
    )


def run_bot():
    """Start the Telegram bot."""
    app = Application.builder().token(TELEGRAM_BOT_TOKEN).build()

    app.add_handler(CommandHandler("start", cmd_start))
    app.add_handler(CommandHandler("ask", cmd_ask))
    app.add_handler(CommandHandler("digest", cmd_digest))
    app.add_handler(CommandHandler("lens", cmd_lens))
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

    logger.info("ThirdEye bot starting...")
    app.run_polling(allowed_updates=Update.ALL_TYPES)


if __name__ == "__main__":
    run_bot()
```

### Step 6.2 — Create bot entry point

Create file: `thirdeye/run_bot.py`
```python
"""Entry point to run the ThirdEye Telegram bot."""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))

from backend.bot.bot import run_bot

if __name__ == "__main__":
    run_bot()
```

### ✅ TEST MILESTONE 6
|
|
|
|
**This is a manual test — you need to interact with the bot on Telegram.**
|
|
|
|
Pre-check: `cd thirdeye && python scripts/test_m6_precheck.py`
|
|
|
|
Create file: `thirdeye/scripts/test_m6_precheck.py`
|
|
```python
|
|
"""Pre-check for Milestone 6: Verify bot token works before running."""
|
|
import asyncio, os, sys
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
|
|
|
async def test():
|
|
from telegram import Bot
|
|
from backend.config import TELEGRAM_BOT_TOKEN
|
|
|
|
assert TELEGRAM_BOT_TOKEN and len(TELEGRAM_BOT_TOKEN) > 10, "TELEGRAM_BOT_TOKEN not set!"
|
|
|
|
bot = Bot(token=TELEGRAM_BOT_TOKEN)
|
|
me = await bot.get_me()
|
|
print(f" ✅ Bot connected: @{me.username} ({me.first_name})")
|
|
print(f"\n IMPORTANT: Before testing in a group:")
|
|
print(f" 1. Open Telegram → Search @BotFather")
|
|
print(f" 2. Send: /setprivacy")
|
|
print(f" 3. Select @{me.username}")
|
|
print(f" 4. Choose: Disable")
|
|
print(f" 5. Create a test group, add @{me.username} to it")
|
|
print(f"\n Then run: python run_bot.py")
|
|
print(f" Send messages in the group, then try: /ask [question]")
|
|
|
|
asyncio.run(test())
|
|
```
**Manual test steps:**
1. Run `python scripts/test_m6_precheck.py` — verify bot connects
2. Disable privacy mode in BotFather (CRITICAL)
3. Create a test Telegram group
4. Add the bot to the group
5. Run `python run_bot.py`
6. Send 5+ messages in the group (mix of tech discussion)
7. Try `/ask What was discussed?`
8. Try `/digest`
9. Try `/lens`

**Expected:** Bot receives messages, processes them in batches of 5, and `/ask` returns relevant answers.

```
🎉 MILESTONE 6 PASSED — Bot is live and processing messages
```

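The batching behavior above can be sketched as a small buffer that flushes when either `BATCH_SIZE` messages accumulate or `BATCH_TIMEOUT_SECONDS` elapse since the first buffered message. This is a minimal synchronous sketch under those assumptions — the real handler in `backend/bot/bot.py` is async and keeps one buffer per group:

```python
import time

class MessageBatcher:
    """Buffers messages and flushes a batch on size or timeout.

    Hypothetical helper mirroring BATCH_SIZE=5 / BATCH_TIMEOUT_SECONDS=60
    from .env; not the actual bot code.
    """

    def __init__(self, batch_size=5, timeout_seconds=60):
        self.batch_size = batch_size
        self.timeout = timeout_seconds
        self.buffer = []
        self.first_message_at = None

    def add(self, message, now=None):
        """Add a message; return the flushed batch if a flush triggered, else None."""
        now = now if now is not None else time.monotonic()
        if not self.buffer:
            self.first_message_at = now
        self.buffer.append(message)
        if len(self.buffer) >= self.batch_size or (now - self.first_message_at) >= self.timeout:
            batch, self.buffer = self.buffer, []
            self.first_message_at = None
            return batch
        return None
```

The `now` parameter exists only to make the timeout path easy to exercise in tests; in the bot the clock comes from `time.monotonic()`.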
---
## MILESTONE 7: FastAPI Backend + Dashboard (85%)
**Goal:** REST API serving data to a React dashboard.

### Step 7.1 — Create FastAPI routes

Create file: `thirdeye/backend/api/routes.py`
```python
"""FastAPI routes for the ThirdEye dashboard."""
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from backend.db.chroma import get_all_signals, query_signals, get_group_ids
from backend.pipeline import query_knowledge, get_lens, set_lens
from backend.agents.pattern_detector import detect_patterns
from backend.agents.cross_group_analyst import analyze_cross_group

app = FastAPI(title="ThirdEye API", version="1.0.0")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/api/groups")
async def list_groups():
    """List all monitored groups."""
    group_ids = get_group_ids()
    groups = []
    for gid in group_ids:
        signals = get_all_signals(gid)
        groups.append({
            "group_id": gid,
            "signal_count": len(signals),
            "lens": get_lens(gid),
        })
    return {"groups": groups}


@app.get("/api/groups/{group_id}/signals")
async def get_signals(group_id: str, signal_type: str | None = None):
    """Get all signals for a group, optionally filtered by type."""
    signals = get_all_signals(group_id, signal_type=signal_type)
    return {"signals": signals, "count": len(signals)}


@app.post("/api/groups/{group_id}/query")
async def query_group(group_id: str, body: dict):
    """Natural language query over a group's knowledge base."""
    question = body.get("question", "")
    if not question:
        raise HTTPException(400, "Missing 'question' field")
    answer = await query_knowledge(group_id, question)
    return {"answer": answer, "question": question}


@app.get("/api/groups/{group_id}/patterns")
async def get_patterns(group_id: str):
    """Detect and return patterns for a group."""
    patterns = await detect_patterns(group_id)
    return {"patterns": [p.model_dump() for p in patterns]}


@app.get("/api/cross-group/insights")
async def get_cross_group_insights():
    """Run cross-group analysis and return insights."""
    group_ids = get_group_ids()
    if len(group_ids) < 2:
        return {"insights": [], "message": "Need at least 2 monitored groups"}

    summaries = {}
    for gid in group_ids:
        summaries[gid] = get_all_signals(gid)

    insights = await analyze_cross_group(summaries)
    return {"insights": [i.model_dump() for i in insights]}


@app.get("/health")
async def health():
    return {"status": "ok", "service": "thirdeye"}
```

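The `/patterns` and `/cross-group/insights` endpoints serialize Pydantic v2 objects with `.model_dump()`, which yields a plain dict that FastAPI can render as JSON. A minimal sketch of that serialization — the `Pattern` fields here are illustrative; the real model lives in `backend/agents/pattern_detector.py`:

```python
from pydantic import BaseModel

class Pattern(BaseModel):
    # Illustrative fields only — the real Pattern model is defined
    # in backend/agents/pattern_detector.py
    type: str
    severity: str
    description: str

p = Pattern(type="recurring_bug", severity="high",
            description="Checkout timeout reported three times in one week")
payload = p.model_dump()  # plain dict, ready for FastAPI's JSON response
```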
### Step 7.2 — Create API entry point

Create file: `thirdeye/run_api.py`
```python
"""Entry point to run the ThirdEye API server."""
import sys, os
sys.path.insert(0, os.path.dirname(__file__))

import uvicorn

if __name__ == "__main__":
    # reload=True requires an import string (not an app object)
    # so uvicorn's reloader can re-import the app after code changes
    uvicorn.run("backend.api.routes:app", host="0.0.0.0", port=8000, reload=True)
```

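The `/api/groups/{group_id}/query` route above takes the body as a raw `dict` and checks for `question` by hand. An alternative sketch (not what `routes.py` currently does) is a typed Pydantic request model, which lets FastAPI reject bad bodies automatically:

```python
from pydantic import BaseModel, Field, ValidationError

class QueryRequest(BaseModel):
    # min_length=1 rejects empty questions at validation time,
    # so the endpoint no longer needs the manual 400 check
    question: str = Field(min_length=1)

# Declaring `body: QueryRequest` in the route signature would make
# FastAPI return a 422 automatically for a missing or empty question.
req = QueryRequest(question="What database was chosen?")
```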
### ✅ TEST MILESTONE 7

```bash
# Terminal 1: Start the API
cd thirdeye && python run_api.py

# Terminal 2: Test endpoints
curl http://localhost:8000/health
# Expected: {"status":"ok","service":"thirdeye"}

curl http://localhost:8000/api/groups
# Expected: {"groups": [...]}

# If you ran Milestone 4/5 tests and have data:
curl http://localhost:8000/api/groups/test_dev_m5/signals
curl -X POST http://localhost:8000/api/groups/test_dev_m5/query \
  -H "Content-Type: application/json" \
  -d '{"question": "What database was chosen?"}'
```

```
🎉 MILESTONE 7 PASSED — API serving data for dashboard
```

---
## MILESTONE 8: Run Both Bot + API Together (90%)
**Goal:** Bot and API running simultaneously. Bot feeds data, API serves it.

### Step 8.1 — Create unified runner

Create file: `thirdeye/run_all.py`
```python
"""Run both the Telegram bot and FastAPI server together."""
import sys, os, threading, logging
sys.path.insert(0, os.path.dirname(__file__))

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s")

def run_api_server():
    """Run FastAPI in a separate thread."""
    import uvicorn
    from backend.api.routes import app
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="warning")

def run_telegram_bot():
    """Run Telegram bot (blocks)."""
    from backend.bot.bot import run_bot
    run_bot()

if __name__ == "__main__":
    print("🚀 Starting ThirdEye...")
    print(" API: http://localhost:8000")
    print(" Bot: Running on Telegram")
    print(" Dashboard: Open dashboard/index.html or run React dev server\n")

    # Start API in background thread
    api_thread = threading.Thread(target=run_api_server, daemon=True)
    api_thread.start()

    # Run bot in main thread (it has its own event loop)
    run_telegram_bot()
```

### ✅ TEST MILESTONE 8

```bash
cd thirdeye && python run_all.py
```

Then in another terminal:
```bash
# API should be running
curl http://localhost:8000/health

# Bot should be responding in Telegram
# Send messages in your test group, then /ask
```

```
🎉 MILESTONE 8 PASSED — Full system running
```

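When scripting this check instead of curling by hand, a small wait-for-ready loop avoids racing the API's startup in the background thread. A stdlib-only sketch — the URL and timeouts are assumptions:

```python
import time
import urllib.request
import urllib.error

def wait_for_health(url="http://localhost:8000/health", timeout=30.0, interval=0.5):
    """Poll the health endpoint until it answers 200 or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=2) as resp:
                if resp.status == 200:
                    return True
        except (urllib.error.URLError, OSError):
            # Server not up yet (connection refused) — retry after a pause
            pass
        time.sleep(interval)
    return False
```

Call it right after starting `run_all.py` from a test script; it returns `True` as soon as `/health` responds, `False` if the server never came up within the timeout.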
---
## MILESTONE 9: Demo Data Seeding (95%)
**Goal:** Pre-seed 3 groups with realistic messages that demonstrate all features.

### Step 9.1 — Create demo seeding script

Create file: `thirdeye/scripts/seed_demo.py`
```python
"""Seed demo data directly into ChromaDB (bypasses Telegram for speed)."""
import asyncio, os, sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from backend.pipeline import process_message_batch, set_lens

# ====== ACME DEV TEAM ======
DEV_MESSAGES = [
    {"sender": "Alex", "text": "Team, I'm proposing we use PostgreSQL for the main database. Our data is highly relational.", "timestamp": "2026-03-10T10:00:00Z"},
    {"sender": "Priya", "text": "Agreed on Postgres. I'll set up the initial schema and migrations.", "timestamp": "2026-03-10T10:15:00Z"},
    {"sender": "Raj", "text": "I'll take ownership of the payment module. I have experience with Stripe webhooks.", "timestamp": "2026-03-10T14:00:00Z"},
    {"sender": "Alex", "text": "For the auth service, I'm hardcoding the JWT secret for now. We'll move to vault later.", "timestamp": "2026-03-11T09:00:00Z"},
    {"sender": "Sam", "text": "Getting a timeout error on the checkout endpoint. Seems intermittent.", "timestamp": "2026-03-12T10:00:00Z"},
    {"sender": "Raj", "text": "Payment webhook is fully integrated now. Only I know how the retry logic works though.", "timestamp": "2026-03-13T11:00:00Z"},
    {"sender": "Sam", "text": "Timeout error on checkout is back again. Second time this week.", "timestamp": "2026-03-14T09:00:00Z"},
    {"sender": "Alex", "text": "Just restart the pod when it happens. I'll investigate after the sprint.", "timestamp": "2026-03-14T09:30:00Z"},
    {"sender": "Sam", "text": "Has anyone heard back from design about the dashboard specs? We need them to start.", "timestamp": "2026-03-15T10:00:00Z"},
    {"sender": "Alex", "text": "Still no dashboard specs from design. This is blocking my entire sprint work.", "timestamp": "2026-03-17T10:00:00Z"},
    {"sender": "Sam", "text": "Timeout error AGAIN. That's the third time. We have a systemic issue here.", "timestamp": "2026-03-18T09:00:00Z"},
    {"sender": "Priya", "text": "I'm pushing this config change directly to main. It's a small fix, should be fine.", "timestamp": "2026-03-19T14:00:00Z"},
    {"sender": "Sam", "text": "Dashboard is completely blocked without those design specs. Week 2 of waiting.", "timestamp": "2026-03-20T10:00:00Z"},
]

# ====== ACME PRODUCT TEAM ======
PRODUCT_MESSAGES = [
    {"sender": "Lisa", "text": "Users keep asking about dark mode. It comes up in literally every demo.", "timestamp": "2026-03-10T10:00:00Z"},
    {"sender": "Mike", "text": "I think we should prioritize the mobile app this sprint. Mobile traffic is 60%.", "timestamp": "2026-03-11T10:00:00Z"},
    {"sender": "Sarah", "text": "No, API stability is way more important. Two enterprise clients complained last week about downtime.", "timestamp": "2026-03-11T10:30:00Z"},
    {"sender": "Lisa", "text": "Sarah from ClientCo literally said 'I would pay double if you had SSO integration.'", "timestamp": "2026-03-12T14:00:00Z"},
    {"sender": "Mike", "text": "Competitor X just launched a mobile-first version. We're falling behind on mobile.", "timestamp": "2026-03-13T10:00:00Z"},
    {"sender": "Lisa", "text": "I told ClientCo we'd have the dashboard demo ready by Friday March 21st.", "timestamp": "2026-03-14T10:00:00Z"},
    {"sender": "Sarah", "text": "Our conversion rate dropped to 2.3% after the last release. That's concerning.", "timestamp": "2026-03-15T11:00:00Z"},
    {"sender": "Mike", "text": "Let's commit to API-first approach for the rest of Q1.", "timestamp": "2026-03-17T10:00:00Z"},
    {"sender": "Lisa", "text": "Dark mode was mentioned again by three different users at the conference.", "timestamp": "2026-03-19T10:00:00Z"},
    {"sender": "Sarah", "text": "We really need to decide: mobile or API stability? This conflict is slowing us down.", "timestamp": "2026-03-20T10:00:00Z"},
]

# ====== ACME ↔ CLIENT CHANNEL ======
CLIENT_MESSAGES = [
    {"sender": "Lisa", "text": "Hi ClientCo team! Just confirming we'll have the dashboard mockups ready by Friday March 21st.", "timestamp": "2026-03-10T10:00:00Z"},
    {"sender": "Client_CEO", "text": "Great, looking forward to seeing them. This is a key deliverable for our board meeting.", "timestamp": "2026-03-10T10:30:00Z"},
    {"sender": "Lisa", "text": "We'll also share the API documentation by Wednesday March 19th.", "timestamp": "2026-03-11T10:00:00Z"},
    {"sender": "Client_CEO", "text": "Perfect. Oh, could you also add an export-to-PDF feature for the reports? That would be really helpful.", "timestamp": "2026-03-12T10:00:00Z"},
    {"sender": "Lisa", "text": "Sure, we'll look into the PDF export!", "timestamp": "2026-03-12T10:15:00Z"},
    {"sender": "Client_CEO", "text": "Any update on the dashboard mockups?", "timestamp": "2026-03-17T10:00:00Z"},
    {"sender": "Client_CEO", "text": "Also, would it be possible to add a dark mode option?", "timestamp": "2026-03-18T10:00:00Z"},
    {"sender": "Client_CEO", "text": "We really need those mockups before the board meeting on Monday.", "timestamp": "2026-03-19T10:00:00Z"},
    {"sender": "Client_CEO", "text": "I might need to loop in our VP if we can't get the timeline confirmed.", "timestamp": "2026-03-20T10:00:00Z"},
]


async def seed():
    print("🌱 Seeding demo data...\n")

    # Set lenses explicitly
    set_lens("acme_dev", "dev")
    set_lens("acme_product", "product")
    set_lens("acme_client", "client")

    # Process dev team (2 batches)
    print("Processing Acme Dev Team...")
    s1 = await process_message_batch("acme_dev", DEV_MESSAGES[:7])
    s2 = await process_message_batch("acme_dev", DEV_MESSAGES[7:])
    print(f" ✅ Dev team: {len(s1) + len(s2)} signals stored\n")

    # Process product team
    print("Processing Acme Product Team...")
    s3 = await process_message_batch("acme_product", PRODUCT_MESSAGES[:5])
    s4 = await process_message_batch("acme_product", PRODUCT_MESSAGES[5:])
    print(f" ✅ Product team: {len(s3) + len(s4)} signals stored\n")

    # Process client channel
    print("Processing Acme ↔ ClientCo Channel...")
    s5 = await process_message_batch("acme_client", CLIENT_MESSAGES[:5])
    s6 = await process_message_batch("acme_client", CLIENT_MESSAGES[5:])
    print(f" ✅ Client channel: {len(s5) + len(s6)} signals stored\n")

    # Run pattern detection
    print("Running pattern detection on dev team...")
    from backend.agents.pattern_detector import detect_patterns
    patterns = await detect_patterns("acme_dev")
    print(f" Found {len(patterns)} patterns")
    for p in patterns:
        print(f"   [{p.severity}] {p.type}: {p.description[:80]}")

    # Run cross-group analysis
    print("\n🔥 Running CROSS-GROUP ANALYSIS...")
    from backend.agents.cross_group_analyst import analyze_cross_group
    from backend.db.chroma import get_all_signals

    summaries = {
        "Acme Dev Team": get_all_signals("acme_dev"),
        "Acme Product": get_all_signals("acme_product"),
        "Acme ↔ ClientCo": get_all_signals("acme_client"),
    }
    insights = await analyze_cross_group(summaries)
    print(f"\n Found {len(insights)} CROSS-GROUP INSIGHTS:")
    for i in insights:
        print(f" 🚨 [{i.severity}] {i.type}")
        print(f"    {i.description[:120]}")
        print(f"    Recommendation: {i.recommendation[:100]}")
        print()

    print("🎉 Demo data seeded successfully!")
    print(" Start the API with: python run_api.py")
    print(" Then visit: http://localhost:8000/api/groups")
    print(" And: http://localhost:8000/api/cross-group/insights")

asyncio.run(seed())
```

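The seed script slices each message list into two batches by hand (`[:7]` / `[7:]`, and so on). A generic chunking helper — a hypothetical convenience, not part of the pipeline — does the same for any batch size:

```python
def chunked(items, size):
    """Split a list into consecutive batches of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]
```

With it, the dev-team seeding could become `for batch in chunked(DEV_MESSAGES, 7): await process_message_batch("acme_dev", batch)`.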
### ✅ TEST MILESTONE 9

```bash
cd thirdeye && python scripts/seed_demo.py
```

Then verify the API serves the data:
```bash
python run_api.py &
curl http://localhost:8000/api/groups
curl http://localhost:8000/api/groups/acme_dev/signals
curl http://localhost:8000/api/cross-group/insights
```

```
🎉 MILESTONE 9 PASSED — Demo data seeded and queryable
```

---
## MILESTONE 10: Final Polish & Demo Ready (100%)
**Goal:** Everything works. Demo rehearsed. Ready to present.

### Checklist

- [ ] `python run_all.py` starts both bot + API without errors
- [ ] Demo data is seeded (`python scripts/seed_demo.py`)
- [ ] API endpoints return data: `/api/groups`, `/api/groups/{id}/signals`, `/api/cross-group/insights`
- [ ] Telegram bot responds to `/start`, `/ask`, `/digest`, `/lens`
- [ ] Cross-group insights are detected (the wow moment)
- [ ] 90-second demo script is rehearsed (see THIRDEYE_BIBLE.md Section 20)
- [ ] GitHub repo has README with setup instructions
- [ ] Backup: screenshots of dashboard / API output saved in case live demo fails

### Create README.md

Create file: `thirdeye/README.md`
````markdown
# ThirdEye 👁️

**Your team already has the answers. They're just trapped in chat.**

ThirdEye is a multi-agent conversation intelligence engine that passively monitors Telegram group chats, extracts structured knowledge, and detects blind spots across teams.

## Features
- 🔍 Passive intelligence extraction (no commands needed)
- 🔭 Adaptive lens system (DevLens, ProductLens, ClientLens, CommunityLens)
- 🧠 Living knowledge graph that grows over time
- 📊 Pattern detection (tech debt trends, knowledge silos, recurring bugs)
- 🔥 **Cross-group intelligence** — detects blind spots between teams
- 💬 Natural language querying via Telegram

## Quick Start
```bash
pip install -r requirements.txt
cp .env.example .env     # Fill in API keys
python scripts/seed_demo.py   # Seed demo data
python run_all.py             # Start bot + API
```

## Architecture
8 specialized AI agents powered by 5 free LLM providers (Groq, Cerebras, SambaNova, OpenRouter, Gemini) with automatic fallback routing, plus Cohere for embeddings.

Built in 24 hours with $0 in API costs.
````

```
🎉 MILESTONE 10 COMPLETE — ThirdEye is ready to win.
```

---
## MILESTONE SUMMARY

| # | Milestone | What You Have | % |
|---|---|---|---|
| 0 | Scaffolding | Folders, deps, env vars, all API keys | 0% |
| 1 | Provider Router | Multi-provider LLM calls with fallback | 10% |
| 2 | ChromaDB + Embeddings | Store and retrieve signals with vector search | 20% |
| 3 | Core Agents | Signal Extractor + Classifier + Context Detector | 30% |
| 4 | Full Pipeline | Messages → Extract → Classify → Store → Query | 45% |
| 5 | Intelligence Layer | Pattern detection + Cross-group analysis | 60% |
| 6 | Telegram Bot | Live bot processing group messages | 70% |
| 7 | FastAPI + Dashboard API | REST API serving all data | 85% |
| 8 | Unified Runner | Bot + API running together | 90% |
| 9 | Demo Data | 3 groups seeded with realistic data | 95% |
| 10 | Polish & Demo Ready | README, rehearsed demo, everything working | 100% |

**Every milestone has a test. Every test must pass. No skipping.**