# mirror of https://github.com/arkorty/B.Tech-Project-III.git
# synced 2026-04-19 12:41:48 +00:00
# 230 lines, 9.1 KiB, Python
"""Test Milestone 13: Link fetch & ingestion."""
|
|
import asyncio
import os
import sys

sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
def test_url_extraction():
    """Test URL extraction from message text.

    Exercises extract_urls() with: a single URL, multiple URLs, a URL with
    trailing punctuation, text with no URLs, and a URL carrying query params.
    """
    from backend.agents.link_fetcher import extract_urls

    print("Testing URL extraction...")

    # Test 1: Simple URL
    urls = extract_urls("Check this out https://example.com/article")
    assert len(urls) == 1
    assert urls[0] == "https://example.com/article"
    print(" ✅ Simple URL extracted")

    # Test 2: Multiple URLs
    urls = extract_urls("See https://github.com/issue/123 and also https://docs.python.org/3/library/asyncio.html for reference")
    assert len(urls) == 2
    print(f" ✅ Multiple URLs extracted: {len(urls)}")

    # Test 3: URL with trailing punctuation — sentence-final "." must be stripped
    urls = extract_urls("Visit https://example.com/page.")
    assert len(urls) == 1
    assert not urls[0].endswith(".")
    print(" ✅ Trailing punctuation stripped")

    # Test 4: No URLs
    urls = extract_urls("This message has no links at all")
    assert len(urls) == 0
    print(" ✅ No URLs returns empty list")

    # Test 5: URL with query params — the query string must survive extraction
    urls = extract_urls("https://example.com/search?q=test&page=2")
    assert len(urls) == 1
    assert "q=test" in urls[0]
    print(" ✅ URL with query params preserved")
def test_should_fetch():
    """Test URL filtering logic.

    Verifies should_fetch() accepts article/docs/repo pages and rejects
    images, downloads, media files, and social-media links.
    """
    from backend.agents.link_fetcher import should_fetch

    print("\nTesting URL filter (should_fetch)...")

    # Should fetch — plain truthiness instead of `== True` (PEP 8 E712)
    assert should_fetch("https://github.com/org/repo/issues/347")
    assert should_fetch("https://docs.python.org/3/library/asyncio.html")
    assert should_fetch("https://blog.example.com/how-to-rate-limit")
    print(" ✅ Valid URLs pass filter")

    # Should NOT fetch — binary/media content is not worth summarizing
    assert not should_fetch("https://example.com/photo.png")
    assert not should_fetch("https://example.com/image.jpg?size=large")
    assert not should_fetch("https://example.com/release.zip")
    assert not should_fetch("https://example.com/video.mp4")
    print(" ✅ Image/download/media URLs filtered out")

    # Social media skips
    assert not should_fetch("https://t.me/somechannel/123")
    print(" ✅ Social media URLs filtered out")
async def test_fetch_content():
    """Test fetching actual web page content.

    Happy path is checked against httpbin.org (skipped with a warning when
    the network is unreachable); the three failure modes — 404, timeout,
    and DNS failure — must all return None rather than raise.
    """
    from backend.agents.link_fetcher import fetch_url_content

    print("\nTesting URL content fetch...")

    # Test 1: Fetch a reliable public page
    page = await fetch_url_content("https://httpbin.org/html")
    if not page:
        print(f" ⚠️ httpbin.org unreachable (network may be restricted)")
    else:
        assert page["text"], "Expected text content"
        assert page["url"] == "https://httpbin.org/html"
        print(f" ✅ Fetched httpbin.org/html: {len(page['text'])} chars, title='{page['title'][:40]}'")

    # Tests 2-4: every failure mode must degrade gracefully to None
    failure_cases = [
        ("https://httpbin.org/status/404", {},
         "Expected None for 404 page",
         f" ✅ 404 page returns None (graceful failure)"),
        ("https://httpbin.org/delay/30", {"timeout": 2.0},
         "Expected None for timeout",
         f" ✅ Timeout returns None (graceful failure)"),
        ("https://this-domain-definitely-does-not-exist-12345.com", {},
         "Expected None for invalid domain",
         f" ✅ Invalid domain returns None (graceful failure)"),
    ]
    for url, kwargs, failure_msg, ok_msg in failure_cases:
        outcome = await fetch_url_content(url, **kwargs)
        assert outcome is None, failure_msg
        print(ok_msg)
async def test_summarization():
    """Test LLM summarization of fetched content.

    Feeds a fixed article title + body to summarize_content() and checks
    the result lands in a sane length band (neither trivially short nor
    an unsummarized dump).
    """
    from backend.agents.link_fetcher import summarize_content

    print("\nTesting content summarization...")

    article_title = "Understanding Rate Limiting in FastAPI"
    article_body = """Rate limiting is a technique to control the number of requests a client can make to an API.
In FastAPI, you can implement rate limiting using middleware or third-party packages like slowapi.
The most common approach is the token bucket algorithm, which allows burst traffic while maintaining
an average rate. For production systems, consider using Redis as a backend for distributed rate limiting
across multiple server instances. Key considerations include: setting appropriate limits per endpoint,
using different limits for authenticated vs anonymous users, and returning proper 429 status codes
with Retry-After headers."""

    result = await summarize_content(article_title, article_body, "https://example.com/rate-limiting")

    # Length band: > 20 chars means it actually said something,
    # < 1000 chars means it actually condensed the input.
    assert len(result) > 20, f"Summary too short: {result}"
    assert len(result) < 1000, f"Summary too long: {len(result)} chars"
    print(f" ✅ Summary generated: {result[:100]}...")
async def test_full_link_pipeline():
    """Test full pipeline: message with URL → fetch → summarize → store → query.

    Uses httpbin.org/html as a stable fetch target; when the network is
    restricted and no signals come back, the test warns instead of failing.
    """
    from backend.agents.link_fetcher import process_links_from_message
    from backend.db.chroma import store_signals, query_signals

    print("\nTesting full link ingestion pipeline...")

    group_id = "test_links_m13"

    # Simulate a message with a URL
    # Using httpbin.org/html which returns a simple HTML page
    message_text = "Check out this page for reference: https://httpbin.org/html"

    signals = await process_links_from_message(message_text, group_id, shared_by="Sam")

    if signals:
        assert len(signals) > 0
        assert signals[0]["type"] == "link_knowledge"
        assert signals[0]["group_id"] == group_id
        assert "@Sam" in signals[0]["entities"]
        print(f" ✅ Link pipeline produced {len(signals)} signals")

        # Store and query
        store_signals(group_id, signals)
        results = query_signals(group_id, "what was shared from the web")
        assert len(results) > 0, "Expected query results after storing link signals"
        print(f" ✅ Link signals stored and queryable ({len(results)} results)")

        # Cleanup: drop the test collection; narrow the former bare `except:`
        # so KeyboardInterrupt/SystemExit are no longer swallowed.
        import chromadb
        from backend.config import CHROMA_DB_PATH
        client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
        try:
            client.delete_collection(f"ll_{group_id}")
        except Exception:
            # Best-effort cleanup — collection may not exist.
            pass
    else:
        print(" ⚠️ No signals produced (httpbin.org may be unreachable in this environment)")
async def test_mixed_with_chat_and_docs():
    """Test that link signals coexist with chat and document signals.

    Ingests all three signal types (chat batch, document, link) into one
    group, then queries across them to confirm chat and document knowledge
    are both retrievable. Link ingestion is best-effort (network-dependent).
    """
    from backend.agents.link_fetcher import process_links_from_message
    from backend.agents.document_ingestor import ingest_document
    from backend.pipeline import process_message_batch, query_knowledge, set_lens
    from backend.db.chroma import store_signals
    import tempfile

    print("\nTesting all three signal types together...")

    group_id = "test_all_sources_m13"
    set_lens(group_id, "dev")

    # 1. Chat signals
    chat_messages = [
        {"sender": "Alex", "text": "We decided to use PostgreSQL for the main DB.", "timestamp": "2026-03-20T10:00:00Z"},
        {"sender": "Priya", "text": "I'll set up the schema and run migrations today.", "timestamp": "2026-03-20T10:05:00Z"},
    ]
    await process_message_batch(group_id, chat_messages)
    print(" ✅ Chat signals stored")

    # 2. Document signals — temp file is removed even if ingestion raises
    tmp = tempfile.NamedTemporaryFile(suffix=".txt", mode="w", delete=False, encoding="utf-8")
    tmp.write("Security Policy: All API endpoints must use OAuth 2.0. JWT tokens expire after 1 hour.")
    tmp.close()
    try:
        doc_signals = ingest_document(tmp.name, group_id, shared_by="Priya", filename="security_policy.txt")
        store_signals(group_id, doc_signals)
    finally:
        os.unlink(tmp.name)
    print(" ✅ Document signals stored")

    # 3. Link signals (skipped gracefully when the network is restricted)
    link_signals = await process_links_from_message(
        "Relevant: https://httpbin.org/html",
        group_id,
        shared_by="Sam"
    )
    if link_signals:
        store_signals(group_id, link_signals)
        print(" ✅ Link signals stored")
    else:
        print(" ⚠️ Link signals skipped (network restriction)")

    # 4. Query across all sources
    answer = await query_knowledge(group_id, "What database are we using?")
    assert "postgres" in answer.lower() or "database" in answer.lower()
    print(f" ✅ Chat knowledge queryable: {answer[:80]}...")

    answer2 = await query_knowledge(group_id, "What is the security policy?")
    assert "oauth" in answer2.lower() or "jwt" in answer2.lower() or "security" in answer2.lower()
    print(f" ✅ Document knowledge queryable: {answer2[:80]}...")

    # Cleanup: drop the test collection; narrow the former bare `except:`
    # so KeyboardInterrupt/SystemExit are no longer swallowed.
    import chromadb
    from backend.config import CHROMA_DB_PATH
    client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
    try:
        client.delete_collection(f"ll_{group_id}")
    except Exception:
        # Best-effort cleanup — collection may not exist.
        pass

    print(" ✅ All three signal types coexist and are queryable")
async def main():
    """Run every Milestone 13 test in order; any assertion failure aborts."""
    test_url_extraction()
    test_should_fetch()
    await test_fetch_content()
    await test_summarization()
    await test_full_link_pipeline()
    await test_mixed_with_chat_and_docs()
    print("\n🎉 MILESTONE 13 PASSED — Link fetch & ingestion working")


# Guard the entry point so importing this module does not run the suite.
if __name__ == "__main__":
    asyncio.run(main())