# Mirror of https://github.com/arkorty/B.Tech-Project-III.git
# Synced 2026-04-19 12:41:48 +00:00
# 238 lines · 9.8 KiB · Python
"""Test Milestone 11: Document & PDF ingestion into RAG."""
|
|
import os, sys, tempfile
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
|
|
|
|
|
def test_text_extraction():
    """Test extraction from each supported file type (TXT, DOCX, PDF, other).

    Each sub-test writes a temporary file, runs ``extract_text`` on it,
    checks the returned list of page dicts, then removes the file.  The
    DOCX and PDF sub-tests are skipped gracefully when their optional
    dependencies (python-docx, PyPDF2) are not installed.
    """
    from backend.agents.document_ingestor import extract_text

    # Test 1: Plain text file — the whole file should come back as one page.
    print("Testing TXT extraction...")
    tmp = tempfile.NamedTemporaryFile(suffix=".txt", mode="w", delete=False, encoding="utf-8")
    tmp.write("This is a test document.\nIt has multiple lines.\nThird line about PostgreSQL decisions.")
    tmp.close()

    pages = extract_text(tmp.name)
    assert len(pages) == 1, f"Expected 1 page, got {len(pages)}"
    assert "PostgreSQL" in pages[0]["text"]
    print(f" ✅ TXT extraction works ({len(pages[0]['text'])} chars)")
    os.unlink(tmp.name)

    # Test 2: DOCX file — needs the optional python-docx package.
    print("Testing DOCX extraction...")
    try:
        from docx import Document
        doc = Document()
        doc.add_paragraph("Architecture Decision: We chose Redis for caching.")
        doc.add_paragraph("Tech Debt: The API keys are hardcoded in config.py.")
        doc.add_paragraph("Promise: Dashboard mockups will be ready by Friday March 21st.")
        tmp_docx = tempfile.NamedTemporaryFile(suffix=".docx", delete=False)
        doc.save(tmp_docx.name)
        tmp_docx.close()

        pages = extract_text(tmp_docx.name)
        assert len(pages) == 1, f"Expected 1 page, got {len(pages)}"
        assert "Redis" in pages[0]["text"]
        print(f" ✅ DOCX extraction works ({len(pages[0]['text'])} chars)")
        os.unlink(tmp_docx.name)
    except ImportError:
        print(" ⚠️ python-docx not installed, skipping DOCX test")

    # Test 3: PDF file — needs the optional PyPDF2 package.
    print("Testing PDF extraction...")
    try:
        from PyPDF2 import PdfWriter
        # PyPDF2 can't easily create PDFs with text from scratch,
        # so we test the extractor handles an empty/corrupt file gracefully
        tmp_pdf = tempfile.NamedTemporaryFile(suffix=".pdf", delete=False)
        writer = PdfWriter()
        writer.add_blank_page(width=612, height=792)
        writer.write(tmp_pdf)
        tmp_pdf.close()

        pages = extract_text(tmp_pdf.name)
        # Blank page = no text, should return empty gracefully
        print(f" ✅ PDF extraction handles blank PDF gracefully ({len(pages)} pages with text)")
        os.unlink(tmp_pdf.name)
    except ImportError:
        print(" ⚠️ PyPDF2 not installed, skipping PDF test")

    # Test 4: Unsupported file type should yield no pages rather than raising.
    print("Testing unsupported file type...")
    tmp_bin = tempfile.NamedTemporaryFile(suffix=".exe", delete=False)
    tmp_bin.write(b"binary data")
    tmp_bin.close()
    pages = extract_text(tmp_bin.name)
    assert len(pages) == 0, "Should return empty for unsupported types"
    print(" ✅ Unsupported file type handled gracefully")
    os.unlink(tmp_bin.name)
|
|
|
|
|
|
def test_chunking():
    """Exercise the text chunking logic on short, long, and empty inputs."""
    from backend.agents.document_ingestor import chunk_text

    print("\nTesting chunking...")

    # Test 1: Short text — should NOT be split
    tiny = "This is a short text that fits in one chunk."
    pieces = chunk_text(tiny, max_chars=1500)
    assert len(pieces) == 1, f"Short text should be 1 chunk, got {len(pieces)}"
    print(f" ✅ Short text → 1 chunk")

    # Test 2: Long text — should be split
    paragraphs = []
    for idx in range(20):
        paragraphs.append(f"This is paragraph {idx} with enough content to fill the chunk. " * 5)
    big = "\n".join(paragraphs)
    pieces = chunk_text(big, max_chars=500, overlap_chars=100)
    assert len(pieces) > 1, f"Long text should produce multiple chunks, got {len(pieces)}"
    print(f" ✅ Long text ({len(big)} chars) → {len(pieces)} chunks")

    # Test 3: Every chunk stays within the size limit (with tolerance for overlap)
    for pos, piece in enumerate(pieces):
        # Overlap can push slightly over max_chars, that's fine
        assert len(piece) < 800, f"Chunk {pos} too large: {len(piece)} chars"
    print(f" ✅ All chunks are within size bounds")

    # Test 4: Empty text
    pieces = chunk_text("")
    assert len(pieces) == 1 and pieces[0] == "", "Empty text should return ['']"
    print(f" ✅ Empty text handled")
|
|
|
|
|
|
def test_full_ingestion():
    """Test full ingestion pipeline: file → extract → chunk → signals → store → query.

    Writes a realistic API-spec text file, ingests it under a throwaway
    group id, verifies the produced signal dicts, stores them in ChromaDB,
    runs three semantic queries against the stored content, and finally
    deletes both the temp file and the test collection.
    """
    from backend.agents.document_ingestor import ingest_document
    from backend.db.chroma import store_signals, query_signals

    print("\nTesting full ingestion pipeline...")

    # Create a realistic test document
    tmp = tempfile.NamedTemporaryFile(suffix=".txt", mode="w", delete=False, encoding="utf-8")
    tmp.write("""API Specification v2.0 — Acme Project

Authentication:
All endpoints require OAuth 2.0 Bearer tokens. The recommended flow for SPAs is Authorization Code with PKCE.
Tokens expire after 3600 seconds. Refresh tokens are valid for 30 days.

Endpoints:
POST /api/v2/orders — Create a new order. Requires 'orders:write' scope.
GET /api/v2/orders/{id} — Retrieve order details. Requires 'orders:read' scope.
DELETE /api/v2/orders/{id} — Cancel an order. Only allowed within 24 hours of creation.

Rate Limits:
Standard tier: 100 requests per minute.
Enterprise tier: 1000 requests per minute.
Rate limit headers (X-RateLimit-Remaining) are included in every response.

Compliance:
All data must be encrypted at rest using AES-256.
PII fields are redacted in logs automatically.
GDPR deletion requests must be processed within 72 hours.
The compliance deadline for the new data residency requirements is April 1st 2026.
""")
    tmp.close()

    group_id = "test_doc_m11"

    # Ingest
    signals = ingest_document(tmp.name, group_id, shared_by="Priya", filename="api_spec_v2.txt")
    assert len(signals) > 0, f"Expected signals, got {len(signals)}"
    print(f" ✅ Ingestion produced {len(signals)} signals")

    # Verify signal structure: every signal carries type, group, and provenance.
    for s in signals:
        assert s["type"] == "document_knowledge"
        assert s["group_id"] == group_id
        assert "@Priya" in s["entities"]
        assert "api_spec_v2.txt" in s["entities"]
    print(f" ✅ All signals have correct type and metadata")

    # Store in ChromaDB
    store_signals(group_id, signals)
    print(f" ✅ Stored {len(signals)} document signals in ChromaDB")

    # Query: can we find document content?
    results = query_signals(group_id, "What authentication method is recommended?")
    assert len(results) > 0, "No results for auth query"
    found_auth = any("oauth" in r["document"].lower() or "auth" in r["document"].lower() for r in results)
    assert found_auth, "Expected to find OAuth/auth info in results"
    print(f" ✅ Query 'authentication method' returns relevant results")

    results2 = query_signals(group_id, "What is the compliance deadline?")
    assert len(results2) > 0, "No results for compliance query"
    found_compliance = any("april" in r["document"].lower() or "compliance" in r["document"].lower() for r in results2)
    assert found_compliance, "Expected to find compliance deadline in results"
    print(f" ✅ Query 'compliance deadline' returns relevant results")

    results3 = query_signals(group_id, "rate limits")
    assert len(results3) > 0, "No results for rate limits query"
    print(f" ✅ Query 'rate limits' returns {len(results3)} results")

    # Cleanup: remove the temp file and drop the test collection.
    os.unlink(tmp.name)
    import chromadb
    from backend.config import CHROMA_DB_PATH
    client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
    try:
        client.delete_collection(f"ll_{group_id}")
        print(f" ✅ Cleaned up test collection")
    except Exception:
        # Collection may not exist if an earlier assertion failed before storage;
        # cleanup is best-effort, but we no longer swallow KeyboardInterrupt/SystemExit.
        pass
|
|
|
|
|
|
def test_mixed_query():
    """Test that document signals AND chat signals coexist and are both queryable.

    Ingests a one-line ADR document, then processes two chat messages about a
    different topic into the same group, and verifies that ``query_knowledge``
    can answer questions grounded in either source.
    """
    from backend.agents.document_ingestor import ingest_document
    from backend.pipeline import process_message_batch, query_knowledge
    from backend.db.chroma import store_signals
    import asyncio

    print("\nTesting mixed query (documents + chat signals)...")

    group_id = "test_mixed_m11"

    # 1. Ingest a document
    tmp = tempfile.NamedTemporaryFile(suffix=".txt", mode="w", delete=False, encoding="utf-8")
    tmp.write("Architecture Decision Record: The team has selected Redis for session caching due to sub-millisecond latency.")
    tmp.close()

    doc_signals = ingest_document(tmp.name, group_id, shared_by="Priya", filename="adr_001.txt")
    store_signals(group_id, doc_signals)
    os.unlink(tmp.name)

    # 2. Process some chat messages (that mention a DIFFERENT topic).
    # Run for its side effect of storing chat signals; the return value is unused.
    chat_messages = [
        {"sender": "Alex", "text": "The timeout bug on checkout is back. Third time this sprint.", "timestamp": "2026-03-20T10:00:00Z"},
        {"sender": "Sam", "text": "I think it's a database connection pool issue.", "timestamp": "2026-03-20T10:05:00Z"},
    ]
    asyncio.run(process_message_batch(group_id, chat_messages))

    # 3. Query for document knowledge
    answer1 = asyncio.run(query_knowledge(group_id, "What caching solution was selected?"))
    assert "redis" in answer1.lower() or "caching" in answer1.lower(), f"Expected Redis/caching mention, got: {answer1[:100]}"
    print(f" ✅ Document query works: {answer1[:80]}...")

    # 4. Query for chat knowledge
    answer2 = asyncio.run(query_knowledge(group_id, "What bugs have been reported?"))
    assert "timeout" in answer2.lower() or "bug" in answer2.lower(), f"Expected timeout/bug mention, got: {answer2[:100]}"
    print(f" ✅ Chat query works alongside documents: {answer2[:80]}...")

    # Cleanup: best-effort drop of the test collection.
    import chromadb
    from backend.config import CHROMA_DB_PATH
    client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
    try:
        client.delete_collection(f"ll_{group_id}")
    except Exception:
        # Narrowed from a bare except so interrupts still propagate.
        pass

    print(f" ✅ Mixed query (document + chat) both return correct results")
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the milestone suite in order when executed as a script; the guard
    # keeps importing this module (e.g. by pytest collection) side-effect free.
    test_text_extraction()
    test_chunking()
    test_full_ingestion()
    test_mixed_query()
    print("\n🎉 MILESTONE 11 PASSED — Document & PDF ingestion working")
|