"""Document Ingestor — extracts text from PDFs, DOCX, TXT and chunks for RAG storage."""

import logging
import os
import uuid
from datetime import datetime, timezone
from typing import Optional

logger = logging.getLogger("thirdeye.agents.document_ingestor")


# --- Text Extraction ---

def extract_text_from_pdf(file_path: str) -> list[dict]:
    """Extract text from a PDF.

    Returns a list of ``{"page": int, "text": str}`` dicts with 1-based page
    numbers. Pages that yield no (non-whitespace) text are skipped. On any
    extraction error the failure is logged and whatever pages were collected
    before the error are returned.
    """
    from PyPDF2 import PdfReader

    pages = []
    try:
        reader = PdfReader(file_path)
        for i, page in enumerate(reader.pages):
            text = page.extract_text()
            if text and text.strip():
                pages.append({"page": i + 1, "text": text.strip()})
    except Exception as e:
        # Best-effort: keep the pages read so far rather than failing ingestion.
        logger.error("PDF extraction failed for %s: %s", file_path, e)
    return pages


def extract_text_from_docx(file_path: str) -> list[dict]:
    """Extract text from a DOCX file.

    DOCX has no real pages, so all non-blank paragraphs are joined with
    newlines and returned as a single ``{"page": 1, "text": str}`` entry.
    Returns an empty list when the document has no text or extraction fails
    (the failure is logged).
    """
    from docx import Document

    try:
        doc = Document(file_path)
        full_text = "\n".join(p.text for p in doc.paragraphs if p.text.strip())
        if full_text.strip():
            return [{"page": 1, "text": full_text.strip()}]
    except Exception as e:
        logger.error("DOCX extraction failed for %s: %s", file_path, e)
    return []


def extract_text_from_txt(file_path: str) -> list[dict]:
    """Extract text from a plain text file.

    The file is read as UTF-8 with undecodable bytes ignored. Returns a single
    ``{"page": 1, "text": str}`` entry, or an empty list when the file is
    empty/whitespace-only or cannot be read (the failure is logged).
    """
    try:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            text = f.read().strip()
        if text:
            return [{"page": 1, "text": text}]
    except Exception as e:
        logger.error("TXT extraction failed for %s: %s", file_path, e)
    return []


# Maps a lowercase file extension to the extractor that handles it.
EXTRACTORS = {
    ".pdf": extract_text_from_pdf,
    ".docx": extract_text_from_docx,
    ".txt": extract_text_from_txt,
    ".md": extract_text_from_txt,
    ".csv": extract_text_from_txt,
    ".json": extract_text_from_txt,
    ".log": extract_text_from_txt,
}


def extract_text(file_path: str) -> list[dict]:
    """Route to the correct extractor based on the file extension.

    The extension is matched case-insensitively against ``EXTRACTORS``.
    Unsupported extensions log a warning and return an empty list.
    """
    ext = os.path.splitext(file_path)[1].lower()
    extractor = EXTRACTORS.get(ext)
    if not extractor:
        logger.warning("Unsupported file type: %s (%s)", ext, file_path)
        return []
    return extractor(file_path)


# --- Chunking ---

def _hard_split(text: str, size: int) -> list[str]:
    """Cut ``text`` into consecutive pieces of at most ``size`` characters."""
    # A non-positive size cannot be honoured; hand the text back untouched.
    if size <= 0 or len(text) <= size:
        return [text]
    return [text[i:i + size] for i in range(0, len(text), size)]


def _pack_sentences(paragraph: str, max_chars: int) -> tuple[list[str], str]:
    """Pack the sentences of an oversized paragraph into chunks.

    Returns ``(finished_chunks, trailing_chunk)``; the trailing chunk is left
    open so the caller can keep appending following paragraphs to it.
    """
    finished = []
    pending = ""
    # Sentences are delimited by ". " (period followed by a space).
    for sent in paragraph.replace(". ", ".\n").split("\n"):
        if len(pending) + len(sent) + 1 <= max_chars:
            pending = (pending + " " + sent).strip()
            continue
        if pending:
            finished.append(pending)
        # A sentence longer than the limit has no softer boundary left:
        # fall back to hard character splits, keeping the last piece open.
        *full_pieces, pending = _hard_split(sent, max_chars)
        finished.extend(full_pieces)
    return finished, pending


def _add_overlap(chunks: list[str], overlap_chars: int) -> list[str]:
    """Prefix every chunk after the first with the tail of its predecessor."""
    overlapped = [chunks[0]]
    for previous, current in zip(chunks, chunks[1:]):
        tail = previous[-overlap_chars:]
        # Start the overlap at a word boundary when the tail contains a space.
        space_idx = tail.find(" ")
        if space_idx > 0:
            tail = tail[space_idx + 1:]
        overlapped.append(tail + " " + current)
    return overlapped


def chunk_text(text: str, max_chars: int = 1500, overlap_chars: int = 200) -> list[str]:
    """
    Split text into overlapping chunks.

    Uses paragraph (newline) boundaries when possible, falls back to sentence
    boundaries for paragraphs longer than ``max_chars``, then to hard character
    splits for sentences that are still too long.
    ~1500 chars ≈ ~375 tokens for embedding.

    Args:
        text: Text to split. Returned unchanged as a single chunk when it
            already fits within ``max_chars``.
        max_chars: Maximum size of a chunk before overlap is added.
        overlap_chars: Number of trailing characters of the previous chunk to
            prepend to each subsequent chunk; ``0`` disables overlap. The
            overlap is added on top of ``max_chars``.

    Returns:
        List of chunk strings (empty if the text contains only whitespace and
        exceeds ``max_chars``).
    """
    if len(text) <= max_chars:
        return [text]

    paragraphs = [p.strip() for p in text.split("\n") if p.strip()]

    chunks = []
    current = ""
    for para in paragraphs:
        # Keep growing the current chunk while the paragraph still fits.
        if len(current) + len(para) + 1 <= max_chars:
            current = (current + "\n" + para).strip()
            continue

        if current:
            chunks.append(current)

        if len(para) > max_chars:
            finished, current = _pack_sentences(para, max_chars)
            chunks.extend(finished)
        else:
            current = para

    if current:
        chunks.append(current)

    if overlap_chars > 0 and len(chunks) > 1:
        chunks = _add_overlap(chunks, overlap_chars)

    return chunks


# --- Main Ingestion ---

def ingest_document(
    file_path: str,
    group_id: str,
    shared_by: str = "Unknown",
    filename: Optional[str] = None,
) -> list[dict]:
    """
    Full pipeline: extract text → chunk → produce signal dicts ready for ChromaDB.

    Args:
        file_path: Path to the downloaded file on disk
        group_id: Telegram group ID
        shared_by: Who shared the file
        filename: Original filename (for metadata); defaults to the basename
            of ``file_path``

    Returns:
        List of signal dicts ready for store_signals(). Empty when no text
        could be extracted.
    """
    if filename is None:
        filename = os.path.basename(file_path)

    pages = extract_text(file_path)
    if not pages:
        logger.warning("No text extracted from %s", filename)
        return []

    signals = []
    for page_data in pages:
        page_num = page_data["page"]
        for chunk_text_str in chunk_text(page_data["text"]):
            if len(chunk_text_str.strip()) < 30:
                continue  # Skip tiny chunks

            # Naive UTC timestamp, same format datetime.utcnow() produced,
            # without relying on the deprecated utcnow().
            timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

            signals.append({
                "id": str(uuid.uuid4()),
                "type": "document_knowledge",
                "summary": f"[{filename} p{page_num}] {chunk_text_str[:150]}...",
                "entities": [f"@{shared_by}", filename],
                "severity": "low",
                "status": "reference",
                "sentiment": "neutral",
                "urgency": "none",
                "raw_quote": chunk_text_str,
                "timestamp": timestamp,
                "group_id": group_id,
                "lens": "document",
                "keywords": [filename, f"page_{page_num}", "document", shared_by],
            })

    logger.info(
        "Ingested %s: %d pages → %d chunks for group %s",
        filename, len(pages), len(signals), group_id,
    )
    return signals