Files
2026-04-05 00:43:23 +05:30

57 lines
2.6 KiB
Python

"""Test Milestone 4: Full pipeline — extract → classify → store → query."""
import asyncio, os, sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
DEV_MESSAGES = [
{"sender": "Alex", "text": "Hey team, I think we should go with PostgreSQL for the main DB. MongoDB is overkill.", "timestamp": "2026-03-20T10:00:00Z"},
{"sender": "Priya", "text": "Agreed. I'll set up the Postgres schema today.", "timestamp": "2026-03-20T10:05:00Z"},
{"sender": "Raj", "text": "Payment webhook integration is tricky. I'll handle all the Stripe stuff since I know it best.", "timestamp": "2026-03-20T11:00:00Z"},
{"sender": "Alex", "text": "I'm just hardcoding the API URL for now. We'll fix it with env vars later.", "timestamp": "2026-03-20T14:00:00Z"},
{"sender": "Sam", "text": "The timeout error on checkout is back. Third time this week.", "timestamp": "2026-03-21T09:00:00Z"},
{"sender": "Alex", "text": "Just restart the pod when it happens. I'll investigate after the sprint.", "timestamp": "2026-03-21T09:15:00Z"},
]
async def main():
from backend.pipeline import process_message_batch, query_knowledge
group_id = "test_pipeline_m4"
# Step 1: Process messages through full pipeline
print("Processing message batch through full pipeline...")
signals = await process_message_batch(group_id, DEV_MESSAGES)
print(f" ✅ Pipeline produced {len(signals)} signals:")
for s in signals:
print(f" [{s.type}] {s.summary[:70]} (severity={s.severity}, sentiment={s.sentiment})")
assert len(signals) >= 2, f"Expected >=2 signals, got {len(signals)}"
# Step 2: Query the knowledge base
print("\nQuerying: 'What database did the team choose?'")
answer = await query_knowledge(group_id, "What database did the team choose?")
print(f" Answer: {answer}")
assert len(answer) > 20, "Answer too short"
print(f" ✅ Query agent produced meaningful answer")
print("\nQuerying: 'What tech debt exists?'")
answer2 = await query_knowledge(group_id, "What tech debt exists?")
print(f" Answer: {answer2}")
print(f" ✅ Tech debt query works")
print("\nQuerying: 'What bugs have been reported?'")
answer3 = await query_knowledge(group_id, "What bugs or issues keep recurring?")
print(f" Answer: {answer3}")
print(f" ✅ Bug query works")
# Cleanup
import chromadb
from backend.config import CHROMA_DB_PATH
client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
try:
client.delete_collection(f"ll_{group_id}")
except:
pass
print("\n🎉 MILESTONE 4 PASSED — Full pipeline working end to end")
asyncio.run(main())