# File: B.Tech-Project-III/thirdeye/scripts/test_m13.py
# Listing timestamp: 2026-04-05 00:43:23 +05:30
# Listing size: 230 lines, 9.1 KiB (Python)

"""Test Milestone 13: Link fetch & ingestion."""
import asyncio, os, sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
def test_url_extraction():
    """Check extract_urls against five message shapes.

    Covers a single URL, two URLs in one sentence, a URL followed by a
    full stop, a message without links, and a URL carrying query
    parameters.
    """
    from backend.agents.link_fetcher import extract_urls

    print("Testing URL extraction...")

    # Test 1: Simple URL
    found = extract_urls("Check this out https://example.com/article")
    assert len(found) == 1
    assert found[0] == "https://example.com/article"
    print(" ✅ Simple URL extracted")

    # Test 2: Multiple URLs
    two_link_message = (
        "See https://github.com/issue/123 and also "
        "https://docs.python.org/3/library/asyncio.html for reference"
    )
    found = extract_urls(two_link_message)
    assert len(found) == 2
    print(f" ✅ Multiple URLs extracted: {len(found)}")

    # Test 3: URL with trailing punctuation
    found = extract_urls("Visit https://example.com/page.")
    assert len(found) == 1
    # The sentence-ending full stop must not be part of the extracted URL.
    assert not found[0].endswith(".")
    print(" ✅ Trailing punctuation stripped")

    # Test 4: No URLs
    found = extract_urls("This message has no links at all")
    assert len(found) == 0
    print(" ✅ No URLs returns empty list")

    # Test 5: URL with query params
    found = extract_urls("https://example.com/search?q=test&page=2")
    assert len(found) == 1
    assert "q=test" in found[0]
    print(" ✅ URL with query params preserved")
def test_should_fetch():
    """Check which URLs should_fetch accepts and which it rejects.

    Accepted: an issue page, a documentation page and a blog post.
    Rejected: image, archive and video URLs (one of them with a query
    string) and a t.me link.

    The checks are truthiness-based, and every assertion names the URL
    that failed so a regression can be located without a debugger.
    """
    from backend.agents.link_fetcher import should_fetch

    print("\nTesting URL filter (should_fetch)...")

    # Should fetch
    fetchable = (
        "https://github.com/org/repo/issues/347",
        "https://docs.python.org/3/library/asyncio.html",
        "https://blog.example.com/how-to-rate-limit",
    )
    for url in fetchable:
        assert should_fetch(url), f"Expected should_fetch to accept {url}"
    print(" ✅ Valid URLs pass filter")

    # Should NOT fetch
    binary_assets = (
        "https://example.com/photo.png",
        "https://example.com/image.jpg?size=large",
        "https://example.com/release.zip",
        "https://example.com/video.mp4",
    )
    for url in binary_assets:
        assert not should_fetch(url), f"Expected should_fetch to reject {url}"
    print(" ✅ Image/download/media URLs filtered out")

    # Social media skips
    social_url = "https://t.me/somechannel/123"
    assert not should_fetch(social_url), f"Expected should_fetch to reject {social_url}"
    print(" ✅ Social media URLs filtered out")
async def test_fetch_content():
    """Exercise fetch_url_content against live endpoints.

    One successful fetch is attempted (an empty result is tolerated and
    reported as a warning), followed by three failure modes that are each
    expected to yield None: an HTTP 404 response, a request that outlasts
    its timeout, and a domain that does not resolve.
    """
    from backend.agents.link_fetcher import fetch_url_content

    print("\nTesting URL content fetch...")

    # Test 1: Fetch a reliable public page
    html_url = "https://httpbin.org/html"
    page = await fetch_url_content(html_url)
    if not page:
        print(" ⚠️ httpbin.org unreachable (network may be restricted)")
    else:
        assert page["text"], "Expected text content"
        assert page["url"] == html_url
        print(f" ✅ Fetched httpbin.org/html: {len(page['text'])} chars, title='{page['title'][:40]}'")

    # Test 2: Graceful failure on non-existent page
    missing = await fetch_url_content("https://httpbin.org/status/404")
    assert missing is None, "Expected None for 404 page"
    print(" ✅ 404 page returns None (graceful failure)")

    # Test 3: Graceful failure on timeout
    # The endpoint is asked to delay 30 s while the client allows only 2 s.
    timed_out = await fetch_url_content("https://httpbin.org/delay/30", timeout=2.0)
    assert timed_out is None, "Expected None for timeout"
    print(" ✅ Timeout returns None (graceful failure)")

    # Test 4: Graceful failure on invalid domain
    unresolved = await fetch_url_content("https://this-domain-definitely-does-not-exist-12345.com")
    assert unresolved is None, "Expected None for invalid domain"
    print(" ✅ Invalid domain returns None (graceful failure)")
async def test_summarization():
    """Summarize a fixed sample article and sanity-check the summary length.

    The summary must be longer than 20 characters and shorter than 1000.
    """
    from backend.agents.link_fetcher import summarize_content

    print("\nTesting content summarization...")

    article_url = "https://example.com/rate-limiting"
    article_title = "Understanding Rate Limiting in FastAPI"
    article_body = """Rate limiting is a technique to control the number of requests a client can make to an API.
In FastAPI, you can implement rate limiting using middleware or third-party packages like slowapi.
The most common approach is the token bucket algorithm, which allows burst traffic while maintaining
an average rate. For production systems, consider using Redis as a backend for distributed rate limiting
across multiple server instances. Key considerations include: setting appropriate limits per endpoint,
using different limits for authenticated vs anonymous users, and returning proper 429 status codes
with Retry-After headers."""

    digest = await summarize_content(article_title, article_body, article_url)

    # Lower bound first: a near-empty summary is reported with its content.
    assert len(digest) > 20, f"Summary too short: {digest}"
    assert len(digest) < 1000, f"Summary too long: {len(digest)} chars"
    print(f" ✅ Summary generated: {digest[:100]}...")
async def test_full_link_pipeline():
    """Test full pipeline: message with URL → fetch → summarize → store → query.

    A message containing a link is passed to process_links_from_message.
    When signals come back, the first one is checked for its type, group
    and sharer entity, then all signals are stored and queried back.
    When nothing comes back, a warning is printed and the test ends
    without failing.

    The test collection is removed in a ``finally`` block so that a failed
    store/query assertion does not leave test data behind.
    """
    from backend.agents.link_fetcher import process_links_from_message
    from backend.db.chroma import store_signals, query_signals

    print("\nTesting full link ingestion pipeline...")
    group_id = "test_links_m13"

    # Simulate a message with a URL
    # Using httpbin.org/html which returns a simple HTML page
    message_text = "Check out this page for reference: https://httpbin.org/html"
    signals = await process_links_from_message(message_text, group_id, shared_by="Sam")

    if not signals:
        print(" ⚠️ No signals produced (httpbin.org may be unreachable in this environment)")
        return

    first_signal = signals[0]
    assert first_signal["type"] == "link_knowledge"
    assert first_signal["group_id"] == group_id
    assert "@Sam" in first_signal["entities"]
    print(f" ✅ Link pipeline produced {len(signals)} signals")

    # Imported before the try block so an import failure cannot mask a
    # test failure raised inside it.
    import chromadb
    from backend.config import CHROMA_DB_PATH

    try:
        # Store and query
        store_signals(group_id, signals)
        results = query_signals(group_id, "what was shared from the web")
        assert len(results) > 0, "Expected query results after storing link signals"
        print(f" ✅ Link signals stored and queryable ({len(results)} results)")
    finally:
        # Cleanup is best-effort: report a failure, but never let it replace
        # the outcome of the test itself.
        # NOTE(review): assumes the store names collections "ll_<group_id>" — confirm.
        client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
        try:
            client.delete_collection(f"ll_{group_id}")
        except Exception as exc:
            print(f" ⚠️ Cleanup of test collection failed: {exc}")
async def test_mixed_with_chat_and_docs():
    """Test that link signals coexist with chat and document signals.

    Steps:
      1. Process a two-message chat batch.
      2. Ingest a temporary text document and store its signals.
      3. Process a message containing a link and store any signals produced
         (an empty result is reported as a warning, not a failure).
      4. Query the group and check the answers mention the chat and
         document content.

    The temporary file and the test collection are both removed in
    ``finally`` blocks, so a failing step does not leak either of them.
    """
    from backend.agents.link_fetcher import process_links_from_message
    from backend.agents.document_ingestor import ingest_document
    from backend.pipeline import process_message_batch, query_knowledge, set_lens
    from backend.db.chroma import store_signals
    from backend.config import CHROMA_DB_PATH
    import chromadb
    import tempfile

    print("\nTesting all three signal types together...")
    group_id = "test_all_sources_m13"
    set_lens(group_id, "dev")

    try:
        # 1. Chat signals
        chat_messages = [
            {"sender": "Alex", "text": "We decided to use PostgreSQL for the main DB.", "timestamp": "2026-03-20T10:00:00Z"},
            {"sender": "Priya", "text": "I'll set up the schema and run migrations today.", "timestamp": "2026-03-20T10:05:00Z"},
        ]
        await process_message_batch(group_id, chat_messages)
        print(" ✅ Chat signals stored")

        # 2. Document signals
        # delete=False keeps the file on disk after the handle is closed so
        # it can be handed to ingest_document by path; it is unlinked below.
        with tempfile.NamedTemporaryFile(suffix=".txt", mode="w", delete=False, encoding="utf-8") as tmp:
            tmp.write("Security Policy: All API endpoints must use OAuth 2.0. JWT tokens expire after 1 hour.")
        try:
            doc_signals = ingest_document(tmp.name, group_id, shared_by="Priya", filename="security_policy.txt")
            store_signals(group_id, doc_signals)
        finally:
            os.unlink(tmp.name)
        print(" ✅ Document signals stored")

        # 3. Link signals
        link_signals = await process_links_from_message(
            "Relevant: https://httpbin.org/html",
            group_id,
            shared_by="Sam",
        )
        if link_signals:
            store_signals(group_id, link_signals)
            print(" ✅ Link signals stored")
        else:
            print(" ⚠️ Link signals skipped (network restriction)")

        # 4. Query across all sources
        answer = await query_knowledge(group_id, "What database are we using?")
        assert "postgres" in answer.lower() or "database" in answer.lower(), (
            f"Chat knowledge missing from answer: {answer}"
        )
        print(f" ✅ Chat knowledge queryable: {answer[:80]}...")

        answer2 = await query_knowledge(group_id, "What is the security policy?")
        answer2_lower = answer2.lower()
        assert any(term in answer2_lower for term in ("oauth", "jwt", "security")), (
            f"Document knowledge missing from answer: {answer2}"
        )
        print(f" ✅ Document knowledge queryable: {answer2[:80]}...")
    finally:
        # Cleanup is best-effort: report a failure, but never let it replace
        # the outcome of the test itself.
        # NOTE(review): assumes the store names collections "ll_<group_id>" — confirm.
        client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
        try:
            client.delete_collection(f"ll_{group_id}")
        except Exception as exc:
            print(f" ⚠️ Cleanup of test collection failed: {exc}")

    print(" ✅ All three signal types coexist and are queryable")
async def main():
    """Run every Milestone 13 check in order, then print the pass banner.

    The two synchronous checks run first, followed by the four coroutine
    checks, each awaited to completion before the next one starts.
    """
    for sync_check in (test_url_extraction, test_should_fetch):
        sync_check()

    async_checks = (
        test_fetch_content,
        test_summarization,
        test_full_link_pipeline,
        test_mixed_with_chat_and_docs,
    )
    for async_check in async_checks:
        await async_check()

    print("\n🎉 MILESTONE 13 PASSED — Link fetch & ingestion working")
# Run the suite only when this file is executed as a script, so that merely
# importing the module does not start the run.
if __name__ == "__main__":
    asyncio.run(main())