mirror of https://github.com/arkorty/B.Tech-Project-III.git
synced 2026-04-19 12:41:48 +00:00
1622 lines · 59 KiB · Markdown
# ThirdEye — Additional Milestones (11→13)
|
|
|
|
> **Prerequisite: Milestone 10 must be COMPLETE and PASSING. These features layer on top of the existing working system.**
|
|
> **Same rule: Do NOT skip milestones. Do NOT skip tests. Every test must PASS before moving to the next milestone.**
|
|
|
|
---
|
|
|
|
## PRE-WORK: Dependencies & Config Updates
|
|
|
|
### Step 0.1 — Add new dependencies
|
|
|
|
Append to `thirdeye/requirements.txt`:
|
|
```
|
|
python-docx==1.1.2
|
|
PyPDF2==3.0.1
|
|
tavily-python==0.5.0
|
|
beautifulsoup4==4.12.3
|
|
```
|
|
|
|
Install:
|
|
```bash
|
|
cd thirdeye && pip install python-docx PyPDF2 tavily-python beautifulsoup4
|
|
```
|
|
|
|
### Step 0.2 — Add new env vars
|
|
|
|
Append to `thirdeye/.env`:
|
|
```bash
|
|
# Web Search (Milestone 12)
|
|
TAVILY_API_KEY=your_tavily_key_here
|
|
|
|
# Feature Flags
|
|
ENABLE_DOCUMENT_INGESTION=true
|
|
ENABLE_WEB_SEARCH=true
|
|
ENABLE_LINK_FETCH=true
|
|
```
|
|
|
|
**Get the key:** https://tavily.com → Sign up → Dashboard → API Keys (free tier: 1000 searches/month, no credit card)
|
|
|
|
### Step 0.3 — Update config.py
|
|
|
|
Add these lines at the bottom of `thirdeye/backend/config.py`:
|
|
```python
|
|
# Web Search
|
|
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")
|
|
|
|
# Feature Flags
|
|
ENABLE_DOCUMENT_INGESTION = os.getenv("ENABLE_DOCUMENT_INGESTION", "true").lower() == "true"
|
|
ENABLE_WEB_SEARCH = os.getenv("ENABLE_WEB_SEARCH", "true").lower() == "true"
|
|
ENABLE_LINK_FETCH = os.getenv("ENABLE_LINK_FETCH", "true").lower() == "true"
|
|
```
|
|
|
|
---
|
|
|
|
## MILESTONE 11: Document & PDF Ingestion into RAG (105%)
|
|
**Goal:** When a PDF, DOCX, or TXT file is shared in a Telegram group, the bot auto-downloads it, extracts text, chunks it, and stores the chunks in ChromaDB as `document_knowledge` signals — queryable alongside chat signals.
|
|
|
|
### Step 11.1 — Create the Document Ingestor
|
|
|
|
Create file: `thirdeye/backend/agents/document_ingestor.py`
|
|
```python
|
|
"""Document Ingestor — extracts text from PDFs, DOCX, TXT and chunks for RAG storage."""
|
|
import os
|
|
import logging
|
|
import uuid
|
|
from datetime import datetime
|
|
|
|
logger = logging.getLogger("thirdeye.agents.document_ingestor")
|
|
|
|
# --- Text Extraction ---
|
|
|
|
def extract_text_from_pdf(file_path: str) -> list[dict]:
    """Extract text from PDF, returns list of {page: int, text: str}.

    Pages with no extractable text are skipped; extraction errors are
    logged and an empty (or partial) list is returned.
    """
    from PyPDF2 import PdfReader

    extracted = []
    try:
        for page_number, pdf_page in enumerate(PdfReader(file_path).pages, start=1):
            raw = pdf_page.extract_text()
            if raw and raw.strip():
                extracted.append({"page": page_number, "text": raw.strip()})
    except Exception as e:
        logger.error(f"PDF extraction failed for {file_path}: {e}")

    return extracted
|
|
|
|
|
|
def extract_text_from_docx(file_path: str) -> list[dict]:
    """Extract text from DOCX, returns list of {page: 1, text: str} (DOCX has no real pages).

    All non-empty paragraphs are joined with newlines into a single
    "page". Returns [] on error or when the document has no text.
    """
    from docx import Document

    try:
        paragraphs = Document(file_path).paragraphs
        body = "\n".join(p.text for p in paragraphs if p.text.strip())
        if body.strip():
            return [{"page": 1, "text": body.strip()}]
    except Exception as e:
        logger.error(f"DOCX extraction failed for {file_path}: {e}")

    return []
|
|
|
|
|
|
def extract_text_from_txt(file_path: str) -> list[dict]:
    """Extract text from plain text file.

    Undecodable bytes are ignored; returns [] for empty files or on
    read errors (which are logged).
    """
    try:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as handle:
            content = handle.read().strip()
        if content:
            return [{"page": 1, "text": content}]
    except Exception as e:
        logger.error(f"TXT extraction failed for {file_path}: {e}")

    return []
|
|
|
|
|
|
# Maps file extension → extraction function. The plain-text formats
# (.md, .csv, .json, .log) reuse the TXT extractor since their content
# is stored verbatim as a single "page".
EXTRACTORS = {
    ".pdf": extract_text_from_pdf,
    ".docx": extract_text_from_docx,
    ".txt": extract_text_from_txt,
    ".md": extract_text_from_txt,
    ".csv": extract_text_from_txt,
    ".json": extract_text_from_txt,
    ".log": extract_text_from_txt,
}
|
|
|
|
|
|
def extract_text(file_path: str) -> list[dict]:
    """Route to correct extractor based on file extension.

    Unsupported extensions are logged and yield an empty list.
    """
    extension = os.path.splitext(file_path)[1].lower()
    handler = EXTRACTORS.get(extension)
    if handler is None:
        logger.warning(f"Unsupported file type: {extension} ({file_path})")
        return []
    return handler(file_path)
|
|
|
|
|
|
# --- Chunking ---
|
|
|
|
def chunk_text(text: str, max_chars: int = 1500, overlap_chars: int = 200) -> list[str]:
    """
    Split text into overlapping chunks.

    Uses paragraph boundaries when possible, falls back to sentence boundaries,
    then hard character splits. ~1500 chars ≈ ~375 tokens for embedding.

    Args:
        text: The text to split.
        max_chars: Target maximum chunk size in characters.
        overlap_chars: Tail of the previous chunk prepended to the next one
            (trimmed forward to a word boundary). Overlap may push a chunk
            slightly past max_chars.

    Returns:
        List of chunk strings; a single-element list when text already fits.
    """
    if len(text) <= max_chars:
        return [text]

    # Split by paragraphs first
    paragraphs = [p.strip() for p in text.split("\n") if p.strip()]

    chunks = []
    current_chunk = ""

    for para in paragraphs:
        # If adding this paragraph stays under limit, add it
        if len(current_chunk) + len(para) + 1 <= max_chars:
            current_chunk = (current_chunk + "\n" + para).strip()
            continue

        # Save current chunk if it has content
        if current_chunk:
            chunks.append(current_chunk)

        if len(para) <= max_chars:
            current_chunk = para
            continue

        # Single paragraph is too long: split it by sentences.
        sub_chunk = ""
        for sent in para.replace(". ", ".\n").split("\n"):
            # Bug fix: the docstring promised a hard character split, but a
            # single sentence longer than max_chars was previously emitted
            # unsplit. Flush the pending sub-chunk and slice the sentence.
            while len(sent) > max_chars:
                if sub_chunk:
                    chunks.append(sub_chunk)
                    sub_chunk = ""
                chunks.append(sent[:max_chars])
                sent = sent[max_chars:]
            if len(sub_chunk) + len(sent) + 1 <= max_chars:
                sub_chunk = (sub_chunk + " " + sent).strip()
            else:
                if sub_chunk:
                    chunks.append(sub_chunk)
                sub_chunk = sent
        # Carry the remainder forward (may be "" if everything was flushed).
        current_chunk = sub_chunk

    if current_chunk:
        chunks.append(current_chunk)

    # Add overlap: prepend last N chars of previous chunk to each subsequent chunk
    if overlap_chars > 0 and len(chunks) > 1:
        overlapped = [chunks[0]]
        for i in range(1, len(chunks)):
            prev_tail = chunks[i - 1][-overlap_chars:]
            # Find a word boundary in the overlap so we don't start mid-word
            space_idx = prev_tail.find(" ")
            if space_idx > 0:
                prev_tail = prev_tail[space_idx + 1:]
            overlapped.append(prev_tail + " " + chunks[i])
        chunks = overlapped

    return chunks
|
|
|
|
|
|
# --- Main Ingestion ---
|
|
|
|
def ingest_document(
    file_path: str,
    group_id: str,
    shared_by: str = "Unknown",
    filename: str = None,
) -> list[dict]:
    """
    Full pipeline: extract text → chunk → produce signal dicts ready for ChromaDB.

    Args:
        file_path: Path to the downloaded file on disk
        group_id: Telegram group ID
        shared_by: Who shared the file
        filename: Original filename (for metadata); defaults to the
            basename of file_path

    Returns:
        List of signal dicts ready for store_signals()
    """
    if filename is None:
        filename = os.path.basename(file_path)

    # Extract
    pages = extract_text(file_path)
    if not pages:
        # Bug fix: the message contained a literal "(unknown)" placeholder
        # instead of interpolating the actual path.
        logger.warning(f"No text extracted from {file_path}")
        return []

    # Chunk each page
    signals = []
    total_chunks = 0

    for page_data in pages:
        page_num = page_data["page"]
        chunks = chunk_text(page_data["text"])

        for chunk_idx, chunk_text_str in enumerate(chunks):
            if len(chunk_text_str.strip()) < 30:
                continue  # Skip tiny chunks

            signal = {
                "id": str(uuid.uuid4()),
                "type": "document_knowledge",
                # Bug fix: summary now embeds the real filename instead of
                # the literal "(unknown)" placeholder.
                "summary": f"[{filename} p{page_num}] {chunk_text_str[:150]}...",
                "entities": [f"@{shared_by}", filename],
                "severity": "low",
                "status": "reference",
                "sentiment": "neutral",
                "urgency": "none",
                "raw_quote": chunk_text_str,
                # NOTE(review): datetime.utcnow() is naive (no tzinfo) and
                # deprecated in Python 3.12; kept for format compatibility
                # with existing stored signals.
                "timestamp": datetime.utcnow().isoformat(),
                "group_id": group_id,
                "lens": "document",
                "keywords": [filename, f"page_{page_num}", "document", shared_by],
            }
            signals.append(signal)
            total_chunks += 1

    logger.info(f"Ingested {filename}: {len(pages)} pages → {total_chunks} chunks for group {group_id}")
    return signals
|
|
```
|
|
|
|
### Step 11.2 — Add document handler to the Telegram bot
|
|
|
|
Open `thirdeye/backend/bot/bot.py` and add the following.
|
|
|
|
**Add import at the top (after existing imports):**
|
|
```python
|
|
import os
|
|
import tempfile
|
|
from backend.config import ENABLE_DOCUMENT_INGESTION
|
|
from backend.agents.document_ingestor import ingest_document
|
|
from backend.db.chroma import store_signals
|
|
```
|
|
|
|
**Add this handler function (after `handle_message`):**
|
|
```python
|
|
async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Process documents/files shared in groups.

    Downloads a supported document to a temp directory, ingests it into
    the group's knowledge base, confirms in-chat, and always removes the
    temp file/directory (best-effort) in the finally block.
    """
    if not ENABLE_DOCUMENT_INGESTION:
        return
    if not update.message or not update.message.document:
        return
    # Idiom fix: "not x in y" → "x not in y"
    if update.message.chat.type not in ("group", "supergroup"):
        return

    doc = update.message.document
    filename = doc.file_name or "unknown_file"
    ext = os.path.splitext(filename)[1].lower()

    # Only process supported file types
    supported = {".pdf", ".docx", ".txt", ".md", ".csv", ".json", ".log"}
    if ext not in supported:
        return

    # Size guard: skip files over 10MB
    if doc.file_size and doc.file_size > 10 * 1024 * 1024:
        logger.warning(f"Skipping oversized file: {filename} ({doc.file_size} bytes)")
        return

    group_id = str(update.message.chat_id)
    shared_by = update.message.from_user.first_name or update.message.from_user.username or "Unknown"
    _group_names[group_id] = update.message.chat.title or group_id

    # Bug fix: pre-bind these so the finally block cannot hit a NameError
    # (previously swallowed silently, leaking the temp dir) when the
    # download fails before they are assigned.
    tmp_dir = None
    file_path = None
    try:
        # Download file to temp directory
        tg_file = await doc.get_file()
        tmp_dir = tempfile.mkdtemp()
        file_path = os.path.join(tmp_dir, filename)
        await tg_file.download_to_drive(file_path)

        logger.info(f"Downloaded {filename} from {shared_by} in {_group_names.get(group_id, group_id)}")

        # Ingest into knowledge base
        signals = ingest_document(file_path, group_id, shared_by=shared_by, filename=filename)

        if signals:
            store_signals(group_id, signals)
            # Bug fix: the confirmation embedded a literal "(unknown)"
            # placeholder instead of the filename.
            await update.message.reply_text(
                f"📄 Ingested *{filename}* — {len(signals)} knowledge chunks stored.\n"
                f"You can now `/ask` questions about this document.",
                parse_mode=None
            )
        else:
            logger.info(f"No extractable text in {filename}")

    except Exception as e:
        logger.error(f"Document ingestion failed for {filename}: {e}")
    finally:
        # Cleanup temp file and directory (best-effort)
        try:
            if file_path and os.path.exists(file_path):
                os.remove(file_path)
            if tmp_dir and os.path.isdir(tmp_dir):
                os.rmdir(tmp_dir)
        except Exception:
            pass
|
|
```
|
|
|
|
**Register the handler in `run_bot()` — add this line BEFORE the text message handler:**
|
|
```python
|
|
app.add_handler(MessageHandler(filters.Document.ALL, handle_document))
|
|
```
|
|
|
|
So the handler section in `run_bot()` now looks like:
|
|
```python
|
|
app.add_handler(CommandHandler("start", cmd_start))
|
|
app.add_handler(CommandHandler("ask", cmd_ask))
|
|
app.add_handler(CommandHandler("digest", cmd_digest))
|
|
app.add_handler(CommandHandler("lens", cmd_lens))
|
|
app.add_handler(MessageHandler(filters.Document.ALL, handle_document)) # NEW
|
|
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
|
|
```
|
|
|
|
### ✅ TEST MILESTONE 11
|
|
|
|
Create file: `thirdeye/scripts/test_m11.py`
|
|
```python
|
|
"""Test Milestone 11: Document & PDF ingestion into RAG."""
|
|
import os, sys, tempfile
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
|
|
|
|
|
def test_text_extraction():
    """Test extraction from each supported file type.

    Covers TXT (happy path), DOCX (skipped if python-docx missing),
    PDF (graceful handling of a text-free page, skipped if PyPDF2
    missing), and an unsupported extension.
    """
    from backend.agents.document_ingestor import extract_text

    # Test 1: Plain text file
    print("Testing TXT extraction...")
    tmp = tempfile.NamedTemporaryFile(suffix=".txt", mode="w", delete=False, encoding="utf-8")
    tmp.write("This is a test document.\nIt has multiple lines.\nThird line about PostgreSQL decisions.")
    tmp.close()

    pages = extract_text(tmp.name)
    assert len(pages) == 1, f"Expected 1 page, got {len(pages)}"
    assert "PostgreSQL" in pages[0]["text"]
    print(f" ✅ TXT extraction works ({len(pages[0]['text'])} chars)")
    os.unlink(tmp.name)

    # Test 2: DOCX file
    print("Testing DOCX extraction...")
    try:
        from docx import Document
        doc = Document()
        doc.add_paragraph("Architecture Decision: We chose Redis for caching.")
        doc.add_paragraph("Tech Debt: The API keys are hardcoded in config.py.")
        doc.add_paragraph("Promise: Dashboard mockups will be ready by Friday March 21st.")
        tmp_docx = tempfile.NamedTemporaryFile(suffix=".docx", delete=False)
        doc.save(tmp_docx.name)
        tmp_docx.close()

        pages = extract_text(tmp_docx.name)
        assert len(pages) == 1, f"Expected 1 page, got {len(pages)}"
        assert "Redis" in pages[0]["text"]
        print(f" ✅ DOCX extraction works ({len(pages[0]['text'])} chars)")
        os.unlink(tmp_docx.name)
    except ImportError:
        print(" ⚠️ python-docx not installed, skipping DOCX test")

    # Test 3: PDF file
    print("Testing PDF extraction...")
    try:
        from PyPDF2 import PdfWriter
        from io import BytesIO
        # PyPDF2 can't easily create PDFs with text from scratch,
        # so we test the extractor handles an empty/corrupt file gracefully
        tmp_pdf = tempfile.NamedTemporaryFile(suffix=".pdf", delete=False)
        writer = PdfWriter()
        writer.add_blank_page(width=612, height=792)
        writer.write(tmp_pdf)
        tmp_pdf.close()

        pages = extract_text(tmp_pdf.name)
        # Blank page = no text, should return empty gracefully
        print(f" ✅ PDF extraction handles blank PDF gracefully ({len(pages)} pages with text)")
        os.unlink(tmp_pdf.name)
    except ImportError:
        print(" ⚠️ PyPDF2 not installed, skipping PDF test")

    # Test 4: Unsupported file type
    print("Testing unsupported file type...")
    tmp_bin = tempfile.NamedTemporaryFile(suffix=".exe", delete=False)
    tmp_bin.write(b"binary data")
    tmp_bin.close()
    pages = extract_text(tmp_bin.name)
    assert len(pages) == 0, "Should return empty for unsupported types"
    print(f" ✅ Unsupported file type handled gracefully")
    os.unlink(tmp_bin.name)
|
|
|
|
|
|
def test_chunking():
    """Test text chunking logic: no-split, multi-split, size bounds, empty input."""
    from backend.agents.document_ingestor import chunk_text

    print("\nTesting chunking...")

    # Test 1: Short text — should NOT be split
    short = "This is a short text that fits in one chunk."
    chunks = chunk_text(short, max_chars=1500)
    assert len(chunks) == 1, f"Short text should be 1 chunk, got {len(chunks)}"
    print(f" ✅ Short text → 1 chunk")

    # Test 2: Long text — should be split
    long_text = "\n".join([f"This is paragraph {i} with enough content to fill the chunk. " * 5 for i in range(20)])
    chunks = chunk_text(long_text, max_chars=500, overlap_chars=100)
    assert len(chunks) > 1, f"Long text should produce multiple chunks, got {len(chunks)}"
    print(f" ✅ Long text ({len(long_text)} chars) → {len(chunks)} chunks")

    # Test 3: All chunks are within size limit (with some tolerance for overlap)
    for i, c in enumerate(chunks):
        # Overlap can push slightly over max_chars, that's fine
        assert len(c) < 800, f"Chunk {i} too large: {len(c)} chars"
    print(f" ✅ All chunks are within size bounds")

    # Test 4: Empty text
    chunks = chunk_text("")
    assert len(chunks) == 1 and chunks[0] == "", "Empty text should return ['']"
    print(f" ✅ Empty text handled")
|
|
|
|
|
|
def test_full_ingestion():
    """Test full ingestion pipeline: file → extract → chunk → signals → store → query.

    Writes a realistic multi-section spec to a temp file, ingests it, checks
    signal structure, stores in ChromaDB, runs three semantic queries, then
    deletes the test collection.
    """
    from backend.agents.document_ingestor import ingest_document
    from backend.db.chroma import store_signals, query_signals

    print("\nTesting full ingestion pipeline...")

    # Create a realistic test document
    tmp = tempfile.NamedTemporaryFile(suffix=".txt", mode="w", delete=False, encoding="utf-8")
    tmp.write("""API Specification v2.0 — Acme Project

Authentication:
All endpoints require OAuth 2.0 Bearer tokens. The recommended flow for SPAs is Authorization Code with PKCE.
Tokens expire after 3600 seconds. Refresh tokens are valid for 30 days.

Endpoints:
POST /api/v2/orders — Create a new order. Requires 'orders:write' scope.
GET /api/v2/orders/{id} — Retrieve order details. Requires 'orders:read' scope.
DELETE /api/v2/orders/{id} — Cancel an order. Only allowed within 24 hours of creation.

Rate Limits:
Standard tier: 100 requests per minute.
Enterprise tier: 1000 requests per minute.
Rate limit headers (X-RateLimit-Remaining) are included in every response.

Compliance:
All data must be encrypted at rest using AES-256.
PII fields are redacted in logs automatically.
GDPR deletion requests must be processed within 72 hours.
The compliance deadline for the new data residency requirements is April 1st 2026.
""")
    tmp.close()

    group_id = "test_doc_m11"

    # Ingest
    signals = ingest_document(tmp.name, group_id, shared_by="Priya", filename="api_spec_v2.txt")
    assert len(signals) > 0, f"Expected signals, got {len(signals)}"
    print(f" ✅ Ingestion produced {len(signals)} signals")

    # Verify signal structure
    for s in signals:
        assert s["type"] == "document_knowledge"
        assert s["group_id"] == group_id
        assert "@Priya" in s["entities"]
        assert "api_spec_v2.txt" in s["entities"]
    print(f" ✅ All signals have correct type and metadata")

    # Store in ChromaDB
    store_signals(group_id, signals)
    print(f" ✅ Stored {len(signals)} document signals in ChromaDB")

    # Query: can we find document content?
    results = query_signals(group_id, "What authentication method is recommended?")
    assert len(results) > 0, "No results for auth query"
    found_auth = any("oauth" in r["document"].lower() or "auth" in r["document"].lower() for r in results)
    assert found_auth, "Expected to find OAuth/auth info in results"
    print(f" ✅ Query 'authentication method' returns relevant results")

    results2 = query_signals(group_id, "What is the compliance deadline?")
    assert len(results2) > 0, "No results for compliance query"
    found_compliance = any("april" in r["document"].lower() or "compliance" in r["document"].lower() for r in results2)
    assert found_compliance, "Expected to find compliance deadline in results"
    print(f" ✅ Query 'compliance deadline' returns relevant results")

    results3 = query_signals(group_id, "rate limits")
    assert len(results3) > 0, "No results for rate limits query"
    print(f" ✅ Query 'rate limits' returns {len(results3)} results")

    # Cleanup
    os.unlink(tmp.name)
    import chromadb
    from backend.config import CHROMA_DB_PATH
    client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
    try:
        client.delete_collection(f"ll_{group_id}")
        print(f" ✅ Cleaned up test collection")
    except Exception:
        # Bug fix: bare "except:" also swallowed SystemExit/KeyboardInterrupt.
        pass
|
|
|
|
|
|
def test_mixed_query():
    """Test that document signals AND chat signals coexist and are both queryable.

    Ingests a document about Redis, processes chat messages about a timeout
    bug, then verifies each topic is answerable from the shared knowledge
    base, cleaning up the test collection afterwards.
    """
    from backend.agents.document_ingestor import ingest_document
    from backend.pipeline import process_message_batch, query_knowledge
    from backend.db.chroma import store_signals
    import asyncio

    print("\nTesting mixed query (documents + chat signals)...")

    group_id = "test_mixed_m11"

    # 1. Ingest a document
    tmp = tempfile.NamedTemporaryFile(suffix=".txt", mode="w", delete=False, encoding="utf-8")
    tmp.write("Architecture Decision Record: The team has selected Redis for session caching due to sub-millisecond latency.")
    tmp.close()

    doc_signals = ingest_document(tmp.name, group_id, shared_by="Priya", filename="adr_001.txt")
    store_signals(group_id, doc_signals)
    os.unlink(tmp.name)

    # 2. Process some chat messages (that mention a DIFFERENT topic)
    chat_messages = [
        {"sender": "Alex", "text": "The timeout bug on checkout is back. Third time this sprint.", "timestamp": "2026-03-20T10:00:00Z"},
        {"sender": "Sam", "text": "I think it's a database connection pool issue.", "timestamp": "2026-03-20T10:05:00Z"},
    ]
    chat_signals = asyncio.run(process_message_batch(group_id, chat_messages))

    # 3. Query for document knowledge
    answer1 = asyncio.run(query_knowledge(group_id, "What caching solution was selected?"))
    assert "redis" in answer1.lower() or "caching" in answer1.lower(), f"Expected Redis/caching mention, got: {answer1[:100]}"
    print(f" ✅ Document query works: {answer1[:80]}...")

    # 4. Query for chat knowledge
    answer2 = asyncio.run(query_knowledge(group_id, "What bugs have been reported?"))
    assert "timeout" in answer2.lower() or "bug" in answer2.lower(), f"Expected timeout/bug mention, got: {answer2[:100]}"
    print(f" ✅ Chat query works alongside documents: {answer2[:80]}...")

    # Cleanup
    import chromadb
    from backend.config import CHROMA_DB_PATH
    client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
    try:
        client.delete_collection(f"ll_{group_id}")
    except Exception:
        # Bug fix: bare "except:" also swallowed SystemExit/KeyboardInterrupt.
        pass

    print(f" ✅ Mixed query (document + chat) both return correct results")
|
|
|
|
|
|
# Idiom fix: guard the test run so importing this script (e.g. from a test
# collector) does not execute the suite as a side effect.
if __name__ == "__main__":
    test_text_extraction()
    test_chunking()
    test_full_ingestion()
    test_mixed_query()
    print("\n🎉 MILESTONE 11 PASSED — Document & PDF ingestion working")
|
|
```
|
|
|
|
Run: `cd thirdeye && python scripts/test_m11.py`
|
|
|
|
**Expected output:** All ✅ checks. Documents are extracted, chunked, stored in ChromaDB, and queryable alongside chat-extracted signals.
|
|
|
|
---
|
|
|
|
## MILESTONE 12: Tavily Web Search Tool (110%)
|
|
**Goal:** The Query Agent gains a web search fallback. When internal knowledge is insufficient OR the question is clearly about external/general topics, it calls Tavily for fresh web context. Also adds a `/search` command for explicit web search.
|
|
|
|
### Step 12.1 — Create the web search module
|
|
|
|
Create file: `thirdeye/backend/agents/web_search.py`
|
|
```python
|
|
"""Web Search Agent — Tavily integration for real-time web context."""
|
|
import logging
|
|
from backend.config import TAVILY_API_KEY, ENABLE_WEB_SEARCH
|
|
|
|
logger = logging.getLogger("thirdeye.agents.web_search")
|
|
|
|
_tavily_client = None
|
|
|
|
|
|
def _get_client():
    """Lazily build and cache the module-level TavilyClient singleton.

    Returns None when the API key is missing/too short, the tavily package
    is not installed, or client construction fails (all logged).
    """
    global _tavily_client
    if _tavily_client is None and TAVILY_API_KEY and len(TAVILY_API_KEY) > 5:
        try:
            from tavily import TavilyClient
        except ImportError:
            logger.error("tavily-python not installed. Run: pip install tavily-python")
            return _tavily_client
        try:
            _tavily_client = TavilyClient(api_key=TAVILY_API_KEY)
            logger.info("Tavily client initialized")
        except Exception as e:
            logger.error(f"Tavily client init failed: {e}")
    return _tavily_client
|
|
|
|
|
|
async def search_web(query: str, max_results: int = 5) -> list[dict]:
    """
    Search the web using Tavily and return structured results.

    Args:
        query: Search query string
        max_results: Max results to return (1-10)

    Returns:
        List of {title, url, content, score} dicts, sorted by relevance.
        Empty list when the feature flag is off, the client is unavailable,
        or the search fails (all logged, never raised).
    """
    import asyncio

    if not ENABLE_WEB_SEARCH:
        logger.info("Web search is disabled via feature flag")
        return []

    client = _get_client()
    if not client:
        logger.warning("Tavily client not available (missing API key or install)")
        return []

    try:
        # Bug fix: TavilyClient.search is a synchronous HTTP call; invoking
        # it directly inside a coroutine blocks the event loop (freezing the
        # Telegram bot for the duration). Offload it to a worker thread.
        response = await asyncio.to_thread(
            client.search,
            query=query,
            max_results=max_results,
            search_depth="basic",  # "basic" is faster + free-tier friendly; "advanced" for deeper
            include_answer=False,
            include_raw_content=False,
        )

        results = [
            {
                "title": r.get("title", ""),
                "url": r.get("url", ""),
                "content": r.get("content", ""),
                "score": r.get("score", 0.0),
            }
            for r in response.get("results", [])
        ]

        logger.info(f"Tavily returned {len(results)} results for: {query[:60]}")
        return results

    except Exception as e:
        logger.error(f"Tavily search failed: {e}")
        return []
|
|
|
|
|
|
def format_search_results_for_llm(results: list[dict]) -> str:
    """Format Tavily results into context string for the Query Agent.

    Each result becomes a numbered "[Web Result N]" block with its source
    URL and up to 500 chars of content; blocks are joined by blank lines.
    """
    if not results:
        return ""

    blocks = []
    for idx, item in enumerate(results, start=1):
        preview = item["content"][:500] if item["content"] else "No content"
        blocks.append(
            f"[Web Result {idx}] {item['title']}\n"
            f"Source: {item['url']}\n"
            f"Content: {preview}"
        )
    return "\n\n".join(blocks)
|
|
```
|
|
|
|
### Step 12.2 — Update `query_knowledge` in pipeline.py to use web search
|
|
|
|
Open `thirdeye/backend/pipeline.py` and **replace** the existing `query_knowledge` function with:
|
|
|
|
```python
|
|
async def query_knowledge(group_id: str, question: str, force_web_search: bool = False) -> str:
    """
    Query the knowledge base with natural language, with optional web search fallback.

    Flow:
    1. Search internal knowledge base (ChromaDB)
    2. If results are weak OR question is clearly external, also search the web
    3. LLM synthesizes both sources into a final answer

    Args:
        group_id: Telegram group whose collection is queried.
        question: Natural-language question from the user.
        force_web_search: When True, always attempt a web search regardless
            of internal result strength.

    Returns:
        A Telegram-ready plain-text answer string (with a trailing
        "Sources:" footer), or a fallback message on empty context/error.
    """
    from backend.providers import call_llm
    from backend.agents.web_search import search_web, format_search_results_for_llm
    from backend.config import ENABLE_WEB_SEARCH

    # Step 1: Internal RAG search
    results = query_signals(group_id, question, n_results=8)

    # Format internal context
    internal_context = ""
    if results:
        context_parts = []
        for i, r in enumerate(results):
            meta = r["metadata"]
            # Label distinguishes ingested documents from chat-derived signals.
            source_label = "Document" if meta.get("type") == "document_knowledge" else "Chat Signal"
            context_parts.append(
                f"[{source_label} {i+1}] Type: {meta.get('type', 'unknown')} | "
                f"Severity: {meta.get('severity', 'unknown')} | "
                f"Time: {meta.get('timestamp', 'unknown')}\n"
                f"Content: {r['document']}\n"
                f"Entities: {meta.get('entities', '[]')}"
            )
        internal_context = "\n\n".join(context_parts)

    # Step 2: Decide whether to invoke web search
    web_context = ""
    used_web = False

    # Determine if internal results are strong enough
    # NOTE(review): assumes query_signals results carry a "relevance_score"
    # key — if absent this is always False and web search runs more often;
    # verify against the chroma module.
    has_strong_internal = (
        len(results) >= 2
        and results[0].get("relevance_score", 0) > 0.5
    )

    # Heuristics for when web search adds value
    web_keywords = [
        "latest", "current", "best practice", "industry", "how does",
        "compare", "what is", "standard", "benchmark", "trend",
        "security", "vulnerability", "update", "news", "release",
    ]
    question_lower = question.lower()
    wants_external = any(kw in question_lower for kw in web_keywords)

    should_search_web = (
        ENABLE_WEB_SEARCH
        and (force_web_search or not has_strong_internal or wants_external)
    )

    if should_search_web:
        web_results = await search_web(question, max_results=3)
        if web_results:
            web_context = format_search_results_for_llm(web_results)
            used_web = True

    # Step 3: Build combined prompt
    if not internal_context and not web_context:
        return "I don't have any information about that in the knowledge base yet, and web search didn't return relevant results. The group needs more conversation for me to learn from."

    combined_context = ""
    if internal_context:
        combined_context += f"=== INTERNAL KNOWLEDGE BASE (from team conversations & documents) ===\n\n{internal_context}\n\n"
    if web_context:
        combined_context += f"=== WEB SEARCH RESULTS ===\n\n{web_context}\n\n"

    system_prompt = """You are the Query Agent for ThirdEye. Answer questions using the provided context.

RULES:
1. PRIORITIZE internal knowledge base results — they come from the team's own conversations and documents.
2. Use web search results to SUPPLEMENT or provide additional context, not to override team decisions.
3. Clearly distinguish sources: "Based on your team's discussion..." vs "According to web sources..."
4. If info doesn't exist in any context, say so clearly.
5. Be concise — 2-4 sentences unless more is needed.
6. Format for Telegram (plain text, no markdown headers).
7. If you cite web sources, include the source name (not the full URL)."""

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"Context:\n\n{combined_context}\n\nQuestion: {question}"},
    ]

    try:
        result = await call_llm("fast_large", messages, temperature=0.3, max_tokens=600)
        answer = result["content"]

        # Append a subtle indicator of sources used
        sources = []
        if internal_context:
            sources.append("knowledge base")
        if used_web:
            sources.append("web search")
        answer += f"\n\n📌 Sources: {' + '.join(sources)}"

        return answer
    except Exception as e:
        logger.error(f"Query agent failed: {e}")
        return "Sorry, I encountered an error while searching. Please try again."
|
|
```
|
|
|
|
### Step 12.3 — Add `/search` command to the bot
|
|
|
|
Open `thirdeye/backend/bot/bot.py` and add:
|
|
|
|
**Add import at the top:**
|
|
```python
|
|
from backend.agents.web_search import search_web, format_search_results_for_llm
|
|
from backend.config import ENABLE_WEB_SEARCH
|
|
```
|
|
|
|
**Add this command handler (after `cmd_lens`):**
|
|
```python
|
|
async def cmd_search(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /search [query] — explicit web search.

    Validates the feature flag and arguments, runs the Tavily search, and
    replies with a numbered list of up-to-3 results (title, snippet, URL).
    """
    if not ENABLE_WEB_SEARCH:
        await update.message.reply_text("🔍 Web search is currently disabled.")
        return

    if not context.args:
        await update.message.reply_text("Usage: /search [your query]\nExample: /search FastAPI rate limiting best practices")
        return

    query = " ".join(context.args)
    await update.message.reply_text(f"🌐 Searching the web for: {query}...")

    try:
        hits = await search_web(query, max_results=3)
        if not hits:
            await update.message.reply_text("No web results found. Try a different query.")
            return

        lines = [f"🌐 Web Search: {query}\n"]
        for position, hit in enumerate(hits, start=1):
            body = hit["content"]
            snippet = body[:200] + "..." if len(body) > 200 else body
            lines.append(f"{position}. {hit['title']}\n{snippet}\n🔗 {hit['url']}\n")

        await update.message.reply_text("\n".join(lines))

    except Exception as e:
        await update.message.reply_text(f"Search failed: {str(e)[:100]}")
```
|
|
|
|
**Register the handler in `run_bot()` — add this line with the other CommandHandlers:**
|
|
```python
|
|
app.add_handler(CommandHandler("search", cmd_search))
|
|
```
|
|
|
|
**Update the `/start` welcome message to include the new commands:**
|
|
```python
|
|
async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Welcome message."""
    # Sent with parse_mode=None, so the *asterisks* render literally.
    await update.message.reply_text(
        "👁️ *ThirdEye* — Conversation Intelligence Engine\n\n"
        "I'm now listening to this group and extracting intelligence from your conversations.\n\n"
        "Commands:\n"
        "/ask [question] — Ask about your team's knowledge\n"
        "/search [query] — Search the web for external info\n"
        "/digest — Get an intelligence summary\n"
        "/lens [mode] — Set detection mode (dev/product/client/community)\n"
        "/alerts — View active warnings\n\n"
        "📄 Share documents (PDF, DOCX, TXT) — I'll ingest them into the knowledge base.\n"
        "🔗 Share links — I'll fetch and store their content.\n\n"
        "I work passively — no need to tag me. I'll alert you when I spot patterns or issues.",
        parse_mode=None
    )
|
|
```
|
|
|
|
### ✅ TEST MILESTONE 12
|
|
|
|
Create file: `thirdeye/scripts/test_m12.py`
|
|
```python
|
|
"""Test Milestone 12: Tavily web search integration."""
|
|
import asyncio, os, sys
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
|
|
|
|
|
async def test_tavily_connection():
    """Test that Tavily API is reachable and returns results.

    Returns False (rather than failing) when no results come back, since
    a missing API key is an environment problem, not a code defect.
    """
    from backend.agents.web_search import search_web

    print("Testing Tavily API connection...")
    results = await search_web("FastAPI rate limiting best practices", max_results=3)

    if not results:
        print(" ⚠️ No results returned (check TAVILY_API_KEY in .env)")
        print(" ⚠️ If key is missing, get one at: https://tavily.com")
        return False

    assert len(results) > 0, "Expected at least 1 result"
    assert results[0]["title"], "Result missing title"
    assert results[0]["url"], "Result missing URL"
    assert results[0]["content"], "Result missing content"

    print(f" ✅ Tavily returned {len(results)} results")
    for r in results:
        print(f" - {r['title'][:60]} ({r['url'][:50]}...)")

    return True
|
|
|
|
|
|
async def test_format_results() -> None:
    """Test result formatting for LLM context.

    Verifies that format_search_results_for_llm labels each hit
    ("[Web Result N]"), cites its source, and produces non-trivial text.
    Skips silently when the live search returns nothing.
    """
    from backend.agents.web_search import search_web, format_search_results_for_llm

    print("\nTesting result formatting...")
    results = await search_web("Python async programming", max_results=2)

    if results:
        formatted = format_search_results_for_llm(results)
        assert "[Web Result 1]" in formatted
        assert "Source:" in formatted
        assert len(formatted) > 50
        print(f" ✅ Formatted context: {len(formatted)} chars")
    else:
        # No network / no key — formatting cannot be exercised.
        print(" ⚠️ Skipped (no results to format)")
|
|
|
|
|
|
async def test_query_with_web_fallback() -> None:
    """Test that query_knowledge uses web search when internal KB is empty.

    Queries a group id that has never been seeded, so the internal knowledge
    base has nothing to offer and the pipeline must fall back to the web.
    """
    from backend.pipeline import query_knowledge

    print("\nTesting query with web search fallback...")

    # Use a group with no data — forces web search fallback
    empty_group = "test_empty_web_m12"

    answer = await query_knowledge(empty_group, "What is the latest version of Python?")
    print(f" Answer: {answer[:150]}...")

    # Should have used web search since internal KB is empty.
    # The keyword check is deliberately loose — LLM wording varies.
    assert len(answer) > 20, f"Answer too short: {answer}"
    assert "sources" in answer.lower() or "web" in answer.lower() or "python" in answer.lower(), \
        "Expected web-sourced answer about Python"
    print(f" ✅ Web fallback produced a meaningful answer")
|
|
|
|
|
|
async def test_query_prefers_internal() -> None:
    """Test that internal knowledge is preferred over web when available.

    Seeds a group with a very specific team decision (Python 3.11 pinned),
    then verifies the query answer reflects that internal fact rather than
    generic up-to-date web knowledge.

    Fixes vs. original: the ChromaDB cleanup now runs in a ``finally`` block
    (it was skipped whenever the assertion failed, leaving a stale test
    collection behind), and the bare ``except:`` is narrowed to
    ``except Exception:``.
    """
    from backend.pipeline import process_message_batch, query_knowledge, set_lens

    print("\nTesting internal knowledge priority over web...")

    group_id = "test_internal_prio_m12"
    set_lens(group_id, "dev")

    # Seed some very specific internal knowledge
    messages = [
        {"sender": "Alex", "text": "Team decision: We are using Python 3.11 specifically, not 3.12, because of the ML library compatibility issue.", "timestamp": "2026-03-20T10:00:00Z"},
        {"sender": "Priya", "text": "Confirmed, 3.11 is locked in. I've updated the Dockerfile.", "timestamp": "2026-03-20T10:05:00Z"},
    ]

    await process_message_batch(group_id, messages)

    try:
        answer = await query_knowledge(group_id, "What Python version are we using?")
        print(f" Answer: {answer[:150]}...")

        # Should reference internal knowledge (3.11) not latest web info
        assert "3.11" in answer or "python" in answer.lower(), \
            f"Expected internal knowledge about Python 3.11, got: {answer[:100]}"
        print(f" ✅ Internal knowledge (Python 3.11) is prioritized in answer")
    finally:
        # Cleanup always runs so reruns start from an empty collection.
        import chromadb
        from backend.config import CHROMA_DB_PATH
        client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
        try:
            client.delete_collection(f"ll_{group_id}")
        except Exception:
            # Best-effort: the collection may not exist if seeding failed.
            pass
|
|
|
|
|
|
async def test_explicit_search() -> None:
    """Test the /search style direct web search.

    Mirrors what the bot's /search command does: a direct Tavily query with a
    max_results cap. Only checks the cap is honored; exact hits vary.
    """
    from backend.agents.web_search import search_web

    print("\nTesting explicit web search (for /search command)...")
    results = await search_web("OWASP top 10 2025", max_results=3)

    if results:
        # max_results is an upper bound — Tavily may return fewer.
        assert len(results) <= 3
        print(f" ✅ Explicit search returned {len(results)} results")
        for r in results:
            print(f" - {r['title'][:60]}")
    else:
        print(" ⚠️ No results (Tavily key may be missing)")
|
|
|
|
|
|
async def main() -> None:
    """Run the Milestone 12 test suite.

    The connectivity check gates the rest: if Tavily is unreachable (missing
    key), the web-dependent tests are skipped and the run is reported as
    PARTIAL instead of failing with a traceback.
    """
    tavily_ok = await test_tavily_connection()

    if tavily_ok:
        await test_format_results()
        await test_query_with_web_fallback()
        await test_query_prefers_internal()
        await test_explicit_search()
        print("\n🎉 MILESTONE 12 PASSED — Web search integration working")
    else:
        print("\n⚠️ MILESTONE 12 PARTIAL — Tavily API key not configured")
        print(" The code is correct but needs a valid TAVILY_API_KEY in .env")
        print(" Get one free at: https://tavily.com")


# Guarded entry point: without this, importing the script (e.g. from a test
# collector) would immediately run the whole async suite as a side effect.
if __name__ == "__main__":
    asyncio.run(main())
|
|
```
|
|
|
|
Run: `cd thirdeye && python scripts/test_m12.py`
|
|
|
|
**Expected output:** All ✅ checks. Tavily returns results. Internal knowledge is prioritized over web results. Web search fills gaps when knowledge base is empty.
|
|
|
|
---
|
|
|
|
## MILESTONE 13: Link Fetch & Ingestion (115%)
|
|
**Goal:** When a URL is shared in a Telegram group, the bot attempts to fetch the page content, summarize it with an LLM, and store the summary as a `link_knowledge` signal in ChromaDB. Fails gracefully and silently if the link is inaccessible.
|
|
|
|
### Step 13.1 — Create the Link Fetcher
|
|
|
|
Create file: `thirdeye/backend/agents/link_fetcher.py`
|
|
```python
|
|
"""Link Fetcher — extracts, summarizes, and stores content from URLs shared in chat."""
|
|
import re
|
|
import uuid
|
|
import logging
|
|
import asyncio
|
|
from datetime import datetime
|
|
|
|
import httpx
|
|
from bs4 import BeautifulSoup
|
|
|
|
from backend.providers import call_llm
|
|
from backend.config import ENABLE_LINK_FETCH
|
|
|
|
logger = logging.getLogger("thirdeye.agents.link_fetcher")
|
|
|
|
# Patterns to skip (images, downloads, social media embeds, etc.)
|
|
SKIP_PATTERNS = [
    r"\.(png|jpg|jpeg|gif|svg|webp|ico|bmp)(\?.*)?$",
    r"\.(zip|tar|gz|rar|7z|exe|msi|dmg|apk|deb)(\?.*)?$",
    r"\.(mp3|mp4|avi|mov|mkv|wav|flac)(\?.*)?$",
    r"^https?://(www\.)?(twitter|x)\.com/.*/status/",
    r"^https?://(www\.)?instagram\.com/p/",
    r"^https?://(www\.)?tiktok\.com/",
    r"^https?://(www\.)?youtube\.com/shorts/",
    r"^https?://t\.me/",  # Other Telegram links
]

SKIP_COMPILED = [re.compile(p, re.IGNORECASE) for p in SKIP_PATTERNS]


def extract_urls(text: str) -> list[str]:
    """Return every HTTP/HTTPS URL found in *text*.

    Trailing sentence punctuation is stripped from each match, and
    implausibly short matches (10 chars or fewer) are discarded.
    """
    candidates = re.findall(r"https?://[^\s<>\"')\]},;]+", text)
    trimmed = (candidate.rstrip(".,;:!?)") for candidate in candidates)
    return [url for url in trimmed if len(url) > 10]


def should_fetch(url: str) -> bool:
    """Return False for URLs not worth fetching (media, archives, social embeds)."""
    return not any(pattern.search(url) for pattern in SKIP_COMPILED)
|
|
|
|
|
|
async def fetch_url_content(url: str, timeout: float = 15.0) -> dict | None:
    """
    Fetch a URL and extract its main readable text content.

    Args:
        url: Absolute HTTP/HTTPS URL to fetch.
        timeout: Total request timeout in seconds.

    Returns:
        {"title", "text", "url"} on success, or None on any failure
        (non-success status, non-HTML content, timeout, network error,
        unparseable or near-empty page). Failures are logged and never
        raised — link ingestion is strictly best-effort.

    Fix vs. original: the status check was ``!= 200``, which rejected other
    successful responses (e.g. 203, 206) that can occur after redirects;
    the full 2xx range is now accepted.
    """
    try:
        async with httpx.AsyncClient(
            follow_redirects=True,
            timeout=timeout,
            headers={
                "User-Agent": "Mozilla/5.0 (compatible; ThirdEye/1.0; +https://thirdeye.dev)",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            },
        ) as client:
            response = await client.get(url)

            # Accept any 2xx success status, not just 200.
            if not (200 <= response.status_code < 300):
                logger.info(f"URL returned {response.status_code}: {url[:80]}")
                return None

            content_type = response.headers.get("content-type", "")
            if "text/html" not in content_type and "application/xhtml" not in content_type:
                logger.info(f"Skipping non-HTML content ({content_type}): {url[:80]}")
                return None

            html = response.text

    except httpx.TimeoutException:
        logger.info(f"URL timed out: {url[:80]}")
        return None
    except Exception as e:
        # Any other transport/DNS/TLS failure — log the class, stay silent.
        logger.info(f"URL fetch failed ({type(e).__name__}): {url[:80]}")
        return None

    # Parse HTML outside the network block so parser errors log separately.
    try:
        soup = BeautifulSoup(html, "html.parser")

        # Extract title
        title = ""
        if soup.title and soup.title.string:
            title = soup.title.string.strip()

        # Remove boilerplate elements that contribute noise, not content.
        for tag in soup(["script", "style", "nav", "footer", "header", "aside", "noscript", "form"]):
            tag.decompose()

        # Prefer a semantic main-content container when the page provides one.
        main = soup.find("main") or soup.find("article") or soup.find("div", {"role": "main"})
        if main:
            text = main.get_text(separator="\n", strip=True)
        else:
            text = soup.get_text(separator="\n", strip=True)

        # Collapse blank lines and surrounding whitespace.
        lines = [line.strip() for line in text.split("\n") if line.strip()]
        text = "\n".join(lines)

        # Near-empty pages (login walls, JS-only apps) are not worth storing.
        if len(text) < 100:
            logger.info(f"Too little text content ({len(text)} chars): {url[:80]}")
            return None

        # Cap very long pages to bound downstream LLM/storage cost.
        if len(text) > 8000:
            text = text[:8000] + "\n\n[Content truncated]"

        return {
            "title": title or url,
            "text": text,
            "url": url,
        }

    except Exception as e:
        logger.warning(f"HTML parsing failed for {url[:80]}: {e}")
        return None
|
|
|
|
|
|
async def summarize_content(title: str, text: str, url: str) -> str:
    """Condense fetched page text into a short summary via the fast LLM tier.

    Only the first 3000 characters of *text* are sent to the model. If the
    LLM call fails for any reason, a raw 200-character snippet of the page
    text is returned instead, so link ingestion never hard-fails here.
    """
    system_prompt = """You are a content summarizer for ThirdEye.
Given the title and text of a web page, produce a concise 2-4 sentence summary that captures the key information.
Focus on: main topic, key facts, any actionable insights, any deadlines or decisions mentioned.
Respond with ONLY the summary text, nothing else."""

    user_prompt = f"Title: {title}\nURL: {url}\n\nContent:\n{text[:3000]}"

    try:
        result = await call_llm(
            "fast_small",
            [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            temperature=0.2,
            max_tokens=300,
        )
        return result["content"].strip()
    except Exception as e:
        logger.warning(f"Link summarization failed: {e}")
        # Fallback: use first 200 chars of text
        return text[:200] + "..."
|
|
|
|
|
|
async def process_links_from_message(
    text: str,
    group_id: str,
    shared_by: str = "Unknown",
) -> list[dict]:
    """
    Full pipeline: extract URLs from message → fetch → summarize → produce signals.

    Designed to be called in the background (non-blocking to the main message pipeline).

    Args:
        text: Raw message text that may contain URLs.
        group_id: Group the message came from; stamped on every signal.
        shared_by: Display name of the user who shared the link.

    Returns:
        List of signal dicts ready for store_signals()
    """
    # Feature-flag kill switch: disabling ENABLE_LINK_FETCH turns the whole
    # pipeline into a no-op without touching the bot handlers.
    if not ENABLE_LINK_FETCH:
        return []

    urls = extract_urls(text)
    fetchable = [u for u in urls if should_fetch(u)]

    if not fetchable:
        return []

    signals = []

    # Process up to 3 links per message to avoid overload
    for url in fetchable[:3]:
        try:
            content = await fetch_url_content(url)
            if not content:
                # Fetch failed or the page wasn't worth storing — skip silently.
                continue

            summary = await summarize_content(content["title"], content["text"], url)

            signal = {
                "id": str(uuid.uuid4()),
                "type": "link_knowledge",
                # Title is capped at 80 chars and summary at 200 to keep the
                # stored summary field bounded.
                "summary": f"[Link: {content['title'][:80]}] {summary[:200]}",
                "entities": [f"@{shared_by}", url[:100]],
                "severity": "low",
                "status": "reference",
                "sentiment": "neutral",
                "urgency": "none",
                "raw_quote": summary,
                # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
                # datetime.now(timezone.utc) is the modern form — confirm before
                # switching, since it appends a "+00:00" offset to the string.
                "timestamp": datetime.utcnow().isoformat(),
                "group_id": group_id,
                "lens": "link",
                "keywords": [content["title"][:50], "link", "web", shared_by],
            }
            signals.append(signal)
            logger.info(f"Link ingested: {content['title'][:50]} ({url[:60]})")

        except Exception as e:
            # Per-link isolation: one bad URL never aborts the remaining links.
            logger.warning(f"Link processing failed for {url[:60]}: {e}")
            continue

    return signals
|
|
```
|
|
|
|
### Step 13.2 — Integrate link fetching into the Telegram bot
|
|
|
|
Open `thirdeye/backend/bot/bot.py` and add:
|
|
|
|
**Add import at the top:**
|
|
```python
|
|
from backend.agents.link_fetcher import extract_urls, process_links_from_message
|
|
from backend.config import ENABLE_LINK_FETCH
|
|
```
|
|
|
|
**Modify the existing `handle_message` function** to add link detection at the end. Replace the entire `handle_message` function with:
|
|
|
|
```python
|
|
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Process every text message posted in a group or supergroup.

    Messages are buffered per group; once a buffer reaches BATCH_SIZE the
    batch is run through the signal-extraction pipeline. Any URLs in the
    message are additionally handed to a fire-and-forget background task so
    link fetching never blocks message handling.

    Fixes vs. original: the chat-type guard used the unidiomatic
    ``if not x in (...)`` form, and ``message.from_user`` was dereferenced
    unconditionally even though it can be None (e.g. anonymous group admins),
    which raised AttributeError.
    """
    if not update.message or not update.message.text:
        return
    if update.message.chat.type not in ("group", "supergroup"):
        return

    group_id = str(update.message.chat_id)
    _group_names[group_id] = update.message.chat.title or group_id
    text = update.message.text

    # from_user is None for anonymous admins / channel posts — fall back safely.
    user = update.message.from_user
    if user is not None:
        sender = user.first_name or user.username or "Unknown"
    else:
        sender = "Unknown"

    msg = {
        "sender": sender,
        "text": text,
        "timestamp": update.message.date.isoformat(),
        "message_id": update.message.message_id,
    }

    _buffers[group_id].append(msg)

    # Process when buffer reaches batch size
    if len(_buffers[group_id]) >= BATCH_SIZE:
        # Swap the buffer out first so new messages accumulate while we process.
        batch = _buffers[group_id]
        _buffers[group_id] = []

        try:
            signals = await process_message_batch(group_id, batch)
            if signals:
                logger.info(f"Processed batch: {len(signals)} signals from {_group_names.get(group_id, group_id)}")
        except Exception as e:
            # Pipeline failures must never crash the bot's update handler.
            logger.error(f"Pipeline error: {e}")

    # Background: process links if message contains URLs
    if ENABLE_LINK_FETCH and extract_urls(text):
        asyncio.create_task(_process_links_background(text, group_id, sender))
|
|
|
|
|
|
async def _process_links_background(text: str, group_id: str, sender: str):
    """Process links from a message in the background (non-blocking).

    Runs as a fire-and-forget asyncio task created by handle_message; every
    failure is logged and swallowed so it can never crash the bot.
    """
    try:
        link_signals = await process_links_from_message(text, group_id, shared_by=sender)
        if link_signals:
            store_signals(group_id, link_signals)
            logger.info(f"Stored {len(link_signals)} link signals for {group_id}")
    except Exception as e:
        logger.error(f"Background link processing failed: {e}")
|
|
```
|
|
|
|
### ✅ TEST MILESTONE 13
|
|
|
|
Create file: `thirdeye/scripts/test_m13.py`
|
|
```python
|
|
"""Test Milestone 13: Link fetch & ingestion."""
|
|
import asyncio, os, sys
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
|
|
|
|
|
def test_url_extraction() -> None:
    """Test URL extraction from message text.

    Covers: single URL, multiple URLs, trailing punctuation stripping,
    no-URL input, and query-string preservation.
    """
    from backend.agents.link_fetcher import extract_urls

    print("Testing URL extraction...")

    # Test 1: Simple URL
    urls = extract_urls("Check this out https://example.com/article")
    assert len(urls) == 1
    assert urls[0] == "https://example.com/article"
    print(f" ✅ Simple URL extracted")

    # Test 2: Multiple URLs
    urls = extract_urls("See https://github.com/issue/123 and also https://docs.python.org/3/library/asyncio.html for reference")
    assert len(urls) == 2
    print(f" ✅ Multiple URLs extracted: {len(urls)}")

    # Test 3: URL with trailing punctuation
    urls = extract_urls("Visit https://example.com/page.")
    assert len(urls) == 1
    assert not urls[0].endswith(".")
    print(f" ✅ Trailing punctuation stripped")

    # Test 4: No URLs
    urls = extract_urls("This message has no links at all")
    assert len(urls) == 0
    print(f" ✅ No URLs returns empty list")

    # Test 5: URL with query params
    urls = extract_urls("https://example.com/search?q=test&page=2")
    assert len(urls) == 1
    assert "q=test" in urls[0]
    print(f" ✅ URL with query params preserved")
|
|
|
|
|
|
def test_should_fetch() -> None:
    """Test URL filtering logic.

    Checks both directions of the filter: ordinary content URLs pass, while
    media files, archives, and Telegram links are rejected.
    """
    from backend.agents.link_fetcher import should_fetch

    print("\nTesting URL filter (should_fetch)...")

    # Should fetch
    assert should_fetch("https://github.com/org/repo/issues/347") == True
    assert should_fetch("https://docs.python.org/3/library/asyncio.html") == True
    assert should_fetch("https://blog.example.com/how-to-rate-limit") == True
    print(f" ✅ Valid URLs pass filter")

    # Should NOT fetch
    assert should_fetch("https://example.com/photo.png") == False
    assert should_fetch("https://example.com/image.jpg?size=large") == False
    assert should_fetch("https://example.com/release.zip") == False
    assert should_fetch("https://example.com/video.mp4") == False
    print(f" ✅ Image/download/media URLs filtered out")

    # Social media skips
    assert should_fetch("https://t.me/somechannel/123") == False
    print(f" ✅ Social media URLs filtered out")
|
|
|
|
|
|
async def test_fetch_content() -> None:
    """Test fetching actual web page content.

    Uses httpbin.org as a stable target. The happy path is soft (warns if the
    network is restricted); the three failure paths (404, timeout, bad DNS)
    are hard assertions because they must return None without raising.
    """
    from backend.agents.link_fetcher import fetch_url_content

    print("\nTesting URL content fetch...")

    # Test 1: Fetch a reliable public page
    content = await fetch_url_content("https://httpbin.org/html")
    if content:
        assert content["text"], "Expected text content"
        # NOTE(review): assumes httpbin does not redirect this URL — the
        # returned "url" is the one we requested, not a final redirect target.
        assert content["url"] == "https://httpbin.org/html"
        print(f" ✅ Fetched httpbin.org/html: {len(content['text'])} chars, title='{content['title'][:40]}'")
    else:
        print(f" ⚠️ httpbin.org unreachable (network may be restricted)")

    # Test 2: Graceful failure on non-existent page
    content = await fetch_url_content("https://httpbin.org/status/404")
    assert content is None, "Expected None for 404 page"
    print(f" ✅ 404 page returns None (graceful failure)")

    # Test 3: Graceful failure on timeout (server delays 30s, client allows 2s)
    content = await fetch_url_content("https://httpbin.org/delay/30", timeout=2.0)
    assert content is None, "Expected None for timeout"
    print(f" ✅ Timeout returns None (graceful failure)")

    # Test 4: Graceful failure on invalid domain
    content = await fetch_url_content("https://this-domain-definitely-does-not-exist-12345.com")
    assert content is None, "Expected None for invalid domain"
    print(f" ✅ Invalid domain returns None (graceful failure)")
|
|
|
|
|
|
async def test_summarization() -> None:
    """Test LLM summarization of fetched content.

    Only length bounds are asserted, because exact LLM wording varies; the
    bounds also hold for summarize_content's snippet fallback path, so this
    test passes even when the LLM call fails.
    """
    from backend.agents.link_fetcher import summarize_content

    print("\nTesting content summarization...")

    sample_title = "Understanding Rate Limiting in FastAPI"
    sample_text = """Rate limiting is a technique to control the number of requests a client can make to an API.
In FastAPI, you can implement rate limiting using middleware or third-party packages like slowapi.
The most common approach is the token bucket algorithm, which allows burst traffic while maintaining
an average rate. For production systems, consider using Redis as a backend for distributed rate limiting
across multiple server instances. Key considerations include: setting appropriate limits per endpoint,
using different limits for authenticated vs anonymous users, and returning proper 429 status codes
with Retry-After headers."""

    summary = await summarize_content(sample_title, sample_text, "https://example.com/rate-limiting")
    assert len(summary) > 20, f"Summary too short: {summary}"
    assert len(summary) < 1000, f"Summary too long: {len(summary)} chars"
    print(f" ✅ Summary generated: {summary[:100]}...")
|
|
|
|
|
|
async def test_full_link_pipeline() -> None:
    """Test full pipeline: message with URL → fetch → summarize → store → query.

    Fixes vs. original: the ChromaDB cleanup now runs in a ``finally`` block
    (it was skipped whenever the query assertion failed, leaving a stale
    collection behind), and the bare ``except:`` is narrowed to
    ``except Exception:``.
    """
    from backend.agents.link_fetcher import process_links_from_message
    from backend.db.chroma import store_signals, query_signals

    print("\nTesting full link ingestion pipeline...")

    group_id = "test_links_m13"

    # Simulate a message with a URL
    # Using httpbin.org/html which returns a simple HTML page
    message_text = "Check out this page for reference: https://httpbin.org/html"

    signals = await process_links_from_message(message_text, group_id, shared_by="Sam")

    if signals:
        assert len(signals) > 0
        assert signals[0]["type"] == "link_knowledge"
        assert signals[0]["group_id"] == group_id
        assert "@Sam" in signals[0]["entities"]
        print(f" ✅ Link pipeline produced {len(signals)} signals")

        try:
            # Store and query
            store_signals(group_id, signals)
            results = query_signals(group_id, "what was shared from the web")
            assert len(results) > 0, "Expected query results after storing link signals"
            print(f" ✅ Link signals stored and queryable ({len(results)} results)")
        finally:
            # Cleanup always runs so reruns start from an empty collection.
            import chromadb
            from backend.config import CHROMA_DB_PATH
            client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
            try:
                client.delete_collection(f"ll_{group_id}")
            except Exception:
                # Best-effort: the collection may not exist.
                pass
    else:
        print(f" ⚠️ No signals produced (httpbin.org may be unreachable in this environment)")
|
|
|
|
|
|
async def test_mixed_with_chat_and_docs() -> None:
    """Test that link signals coexist with chat and document signals.

    Seeds one group with all three signal sources (chat batch, ingested TXT
    document, fetched link), then queries knowledge that can only come from
    the chat and document sources respectively.

    Fixes vs. original: the bare ``except:`` in cleanup is narrowed to
    ``except Exception:``, and cleanup runs in a ``finally`` block so a
    failed assertion no longer leaves the test collection behind.
    """
    from backend.agents.link_fetcher import process_links_from_message
    from backend.agents.document_ingestor import ingest_document
    from backend.pipeline import process_message_batch, query_knowledge, set_lens
    from backend.db.chroma import store_signals
    import tempfile

    print("\nTesting all three signal types together...")

    group_id = "test_all_sources_m13"
    set_lens(group_id, "dev")

    try:
        # 1. Chat signals
        chat_messages = [
            {"sender": "Alex", "text": "We decided to use PostgreSQL for the main DB.", "timestamp": "2026-03-20T10:00:00Z"},
            {"sender": "Priya", "text": "I'll set up the schema and run migrations today.", "timestamp": "2026-03-20T10:05:00Z"},
        ]
        await process_message_batch(group_id, chat_messages)
        print(f" ✅ Chat signals stored")

        # 2. Document signals — written to a temp file, ingested, then removed.
        tmp = tempfile.NamedTemporaryFile(suffix=".txt", mode="w", delete=False, encoding="utf-8")
        tmp.write("Security Policy: All API endpoints must use OAuth 2.0. JWT tokens expire after 1 hour.")
        tmp.close()
        doc_signals = ingest_document(tmp.name, group_id, shared_by="Priya", filename="security_policy.txt")
        store_signals(group_id, doc_signals)
        os.unlink(tmp.name)
        print(f" ✅ Document signals stored")

        # 3. Link signals (best-effort: may be skipped on restricted networks)
        link_signals = await process_links_from_message(
            "Relevant: https://httpbin.org/html",
            group_id,
            shared_by="Sam"
        )
        if link_signals:
            store_signals(group_id, link_signals)
            print(f" ✅ Link signals stored")
        else:
            print(f" ⚠️ Link signals skipped (network restriction)")

        # 4. Query across all sources
        answer = await query_knowledge(group_id, "What database are we using?")
        assert "postgres" in answer.lower() or "database" in answer.lower()
        print(f" ✅ Chat knowledge queryable: {answer[:80]}...")

        answer2 = await query_knowledge(group_id, "What is the security policy?")
        assert "oauth" in answer2.lower() or "jwt" in answer2.lower() or "security" in answer2.lower()
        print(f" ✅ Document knowledge queryable: {answer2[:80]}...")
    finally:
        # Cleanup always runs so reruns start from an empty collection.
        import chromadb
        from backend.config import CHROMA_DB_PATH
        client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
        try:
            client.delete_collection(f"ll_{group_id}")
        except Exception:
            pass

    print(f" ✅ All three signal types coexist and are queryable")
|
|
|
|
|
|
async def main() -> None:
    """Run the Milestone 13 test suite in dependency order.

    Pure-function tests (extraction, filtering) run first; network- and
    LLM-dependent tests follow and degrade gracefully when offline.
    """
    test_url_extraction()
    test_should_fetch()
    await test_fetch_content()
    await test_summarization()
    await test_full_link_pipeline()
    await test_mixed_with_chat_and_docs()
    print("\n🎉 MILESTONE 13 PASSED — Link fetch & ingestion working")


# Guarded entry point: without this, importing the script (e.g. from a test
# collector) would immediately run the whole async suite as a side effect.
if __name__ == "__main__":
    asyncio.run(main())
|
|
```
|
|
|
|
Run: `cd thirdeye && python scripts/test_m13.py`
|
|
|
|
**Expected output:** All ✅ checks. URLs are extracted, content is fetched (with graceful failures for 404/timeout/invalid), summaries are generated, signals are stored, and they're queryable alongside chat and document signals.
|
|
|
|
---
|
|
|
|
## MILESTONE SUMMARY (Updated)
|
|
|
|
| # | Milestone | What You Have | % |
|
|
|---|---|---|---|
|
|
| 0 | Scaffolding | Folders, deps, env vars, all API keys | 0% |
|
|
| 1 | Provider Router | Multi-provider LLM calls with fallback | 10% |
|
|
| 2 | ChromaDB + Embeddings | Store and retrieve signals with vector search | 20% |
|
|
| 3 | Core Agents | Signal Extractor + Classifier + Context Detector | 30% |
|
|
| 4 | Full Pipeline | Messages → Extract → Classify → Store → Query | 45% |
|
|
| 5 | Intelligence Layer | Pattern detection + Cross-group analysis | 60% |
|
|
| 6 | Telegram Bot | Live bot processing group messages | 70% |
|
|
| 7 | FastAPI + Dashboard API | REST API serving all data | 85% |
|
|
| 8 | Unified Runner | Bot + API running together | 90% |
|
|
| 9 | Demo Data | 3 groups seeded with realistic data | 95% |
|
|
| 10 | Polish & Demo Ready | README, rehearsed demo, everything working | 100% |
|
|
| **11** | **Document & PDF Ingestion** | **PDFs/DOCX/TXT shared in groups → chunked → stored in RAG** | **105%** |
|
|
| **12** | **Tavily Web Search** | **Query Agent searches web when KB is empty or question is external** | **110%** |
|
|
| **13** | **Link Fetch & Ingestion** | **URLs in messages → fetched → summarized → stored as signals** | **115%** |
|
|
|
|
---
|
|
|
|
## FILE CHANGE SUMMARY
|
|
|
|
### New Files Created
|
|
```
|
|
thirdeye/backend/agents/document_ingestor.py # Milestone 11
|
|
thirdeye/backend/agents/web_search.py # Milestone 12
|
|
thirdeye/backend/agents/link_fetcher.py # Milestone 13
|
|
thirdeye/scripts/test_m11.py # Milestone 11 test
|
|
thirdeye/scripts/test_m12.py # Milestone 12 test
|
|
thirdeye/scripts/test_m13.py # Milestone 13 test
|
|
```
|
|
|
|
### Existing Files Modified
|
|
```
|
|
thirdeye/requirements.txt # Pre-work: 4 new deps
|
|
thirdeye/.env # Pre-work: TAVILY_API_KEY + feature flags
|
|
thirdeye/backend/config.py # Pre-work: new config vars
|
|
thirdeye/backend/bot/bot.py # M11: handle_document, M12: cmd_search, M13: link detection
|
|
thirdeye/backend/pipeline.py # M12: updated query_knowledge with web search
|
|
```
|
|
|
|
### Updated Repo Structure (additions only)
|
|
```
|
|
thirdeye/
|
|
├── backend/
|
|
│ ├── agents/
|
|
│ │ ├── document_ingestor.py # NEW — PDF/DOCX/TXT extraction + chunking
|
|
│ │ ├── web_search.py # NEW — Tavily web search integration
|
|
│ │ └── link_fetcher.py # NEW — URL extraction, fetch, summarize
|
|
│ └── bot/
|
|
│ └── bot.py # MODIFIED — document handler, /search cmd, link detection
|
|
│
|
|
└── scripts/
|
|
├── test_m11.py # NEW — document ingestion tests
|
|
├── test_m12.py # NEW — web search tests
|
|
└── test_m13.py # NEW — link fetch tests
|
|
```
|
|
|
|
---
|
|
|
|
## UPDATED COMMANDS REFERENCE
|
|
|
|
```
|
|
/start — Welcome message (updated with new features)
|
|
/ask [q] — Query knowledge base (now with web search fallback)
|
|
/search [q] — NEW: Explicit web search via Tavily
|
|
/digest — Intelligence summary
|
|
/lens [mode] — Set/check detection lens
|
|
/alerts — View active warnings
|
|
|
|
PASSIVE (no command needed):
|
|
• Text messages → batched → signal extraction (existing)
|
|
• Document drops → downloaded → chunked → stored (NEW)
|
|
• URLs in messages → fetched → summarized → stored (NEW)
|
|
```
|
|
|
|
---
|
|
|
|
*Every milestone has a test. Every test must pass. No skipping.* |