"""Link Fetcher — extracts, summarizes, and stores content from URLs shared in chat."""

import re
import uuid
import logging
import asyncio
from datetime import datetime, timezone

import httpx
from bs4 import BeautifulSoup

from backend.providers import call_llm
from backend.config import ENABLE_LINK_FETCH

logger = logging.getLogger("thirdeye.agents.link_fetcher")

# Patterns to skip (images, downloads, social media embeds, etc.)
SKIP_PATTERNS = [
    r"\.(png|jpg|jpeg|gif|svg|webp|ico|bmp)(\?.*)?$",
    r"\.(zip|tar|gz|rar|7z|exe|msi|dmg|apk|deb)(\?.*)?$",
    r"\.(mp3|mp4|avi|mov|mkv|wav|flac)(\?.*)?$",
    r"^https?://(www\.)?(twitter|x)\.com/.*/status/",
    r"^https?://(www\.)?instagram\.com/p/",
    r"^https?://(www\.)?tiktok\.com/",
    r"^https?://(www\.)?youtube\.com/shorts/",
    r"^https?://t\.me/",  # Other Telegram links
]

SKIP_COMPILED = [re.compile(p, re.IGNORECASE) for p in SKIP_PATTERNS]

# A URL runs from the scheme up to the first whitespace or delimiter character.
_URL_PATTERN = re.compile(r"https?://[^\s<>\"')\]},;]+")

# Punctuation that commonly trails a URL in prose and is not part of it.
_TRAILING_PUNCTUATION = ".,;:!?)"

# Extracted URLs of this length or shorter are discarded.
_MIN_URL_LENGTH = 10

# Pages yielding less text than this are treated as having no useful content.
_MIN_TEXT_CHARS = 100

# Extracted page text is cut to this many characters before being returned.
_MAX_TEXT_CHARS = 8000

# Only this many leading characters of the page text are sent to the LLM.
_LLM_PREVIEW_CHARS = 3000

# Length of the raw-text preview used when no LLM summary is available.
_FALLBACK_SUMMARY_CHARS = 200

# Upper bound on links processed per message, to avoid overload.
_MAX_LINKS_PER_MESSAGE = 3

# Elements removed from the document before text extraction.
_BOILERPLATE_TAGS = [
    "script", "style", "nav", "footer", "header", "aside", "noscript", "form",
]

_REQUEST_HEADERS = {
    "User-Agent": "Mozilla/5.0 (compatible; ThirdEye/1.0; +https://thirdeye.dev)",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}


def extract_urls(text: str) -> list[str]:
    """Extract all HTTP/HTTPS URLs from a text string.

    Trailing punctuation is stripped from each match, and matches that are
    10 characters or shorter after stripping are dropped. Order of
    appearance is preserved and duplicates are kept.

    Args:
        text: Arbitrary message text.

    Returns:
        The cleaned URLs, in the order they appear in ``text``.
    """
    stripped = (
        match.rstrip(_TRAILING_PUNCTUATION) for match in _URL_PATTERN.findall(text)
    )
    return [url for url in stripped if len(url) > _MIN_URL_LENGTH]


def should_fetch(url: str) -> bool:
    """Decide if a URL is worth fetching (skip images, downloads, social embeds).

    Args:
        url: The URL to test.

    Returns:
        False if any pattern in ``SKIP_COMPILED`` matches the URL, True otherwise.
    """
    return not any(pattern.search(url) for pattern in SKIP_COMPILED)


def _parse_html(html: str, url: str) -> dict | None:
    """Pull the title and readable text out of an HTML document.

    Returns ``{"title", "text", "url"}``, or None when the page yields fewer
    than 100 characters of text.
    """
    soup = BeautifulSoup(html, "html.parser")

    title = ""
    if soup.title and soup.title.string:
        title = soup.title.string.strip()

    # Drop scripts, styles and page chrome so they do not pollute the text.
    for tag in soup(_BOILERPLATE_TAGS):
        tag.decompose()

    # Prefer a dedicated main-content container; otherwise use the whole page.
    main = (
        soup.find("main")
        or soup.find("article")
        or soup.find("div", {"role": "main"})
    )
    container = main if main else soup
    raw_text = container.get_text(separator="\n", strip=True)

    # Collapse blank lines and per-line padding.
    text = "\n".join(
        line.strip() for line in raw_text.split("\n") if line.strip()
    )

    if len(text) < _MIN_TEXT_CHARS:
        logger.info("Too little text content (%s chars): %s", len(text), url[:80])
        return None

    if len(text) > _MAX_TEXT_CHARS:
        text = text[:_MAX_TEXT_CHARS] + "\n\n[Content truncated]"

    return {
        "title": title or url,  # fall back to the URL when the page has no title
        "text": text,
        "url": url,
    }


async def fetch_url_content(url: str, timeout: float = 15.0) -> dict | None:
    """Fetch a URL and extract its main text content.

    Redirects are followed. Only responses with status 200 and an HTML or
    XHTML content type are parsed. All failures are logged and reported as
    None; this function does not raise for network or parsing errors.

    Args:
        url: The URL to download.
        timeout: Timeout passed to the HTTP client.

    Returns:
        ``{"title", "text", "url"}`` on success, where ``text`` is at most
        8000 characters plus a truncation marker, or None if the fetch
        fails, the response is not HTML, or the page has under 100
        characters of text.
    """
    # NOTE(review): URLs originate from chat messages and are requested as-is
    # with redirects followed; nothing here rejects loopback/private/link-local
    # hosts. Consider a host check before the request if this service can
    # reach internal endpoints.
    # NOTE(review): `response.text` loads the entire body into memory; there
    # is no cap on the download size.
    try:
        async with httpx.AsyncClient(
            follow_redirects=True,
            timeout=timeout,
            headers=_REQUEST_HEADERS,
        ) as client:
            response = await client.get(url)

            if response.status_code != 200:
                logger.info("URL returned %s: %s", response.status_code, url[:80])
                return None

            content_type = response.headers.get("content-type", "")
            # Media types are case-insensitive, so compare in lowercase.
            media_type = content_type.lower()
            if "text/html" not in media_type and "application/xhtml" not in media_type:
                logger.info(
                    "Skipping non-HTML content (%s): %s", content_type, url[:80]
                )
                return None

            html = response.text

    except httpx.TimeoutException:
        logger.info("URL timed out: %s", url[:80])
        return None
    except Exception as e:
        logger.info("URL fetch failed (%s): %s", type(e).__name__, url[:80])
        return None

    try:
        return _parse_html(html, url)
    except Exception as e:
        logger.warning("HTML parsing failed for %s: %s", url[:80], e)
        return None


async def summarize_content(title: str, text: str, url: str) -> str:
    """Use the LLM to create a concise summary of fetched content.

    Only the first 3000 characters of ``text`` are sent to the model. If the
    LLM call raises, or returns an empty/whitespace-only reply, the first 200
    characters of ``text`` followed by ``"..."`` are returned instead.

    Args:
        title: Page title, included in the prompt.
        text: Extracted page text.
        url: Source URL, included in the prompt.

    Returns:
        The stripped model output, or the raw-text fallback described above.
    """
    text_preview = text[:_LLM_PREVIEW_CHARS]
    fallback = text[:_FALLBACK_SUMMARY_CHARS] + "..."

    messages = [
        {
            "role": "system",
            "content": (
                "You are a content summarizer for ThirdEye. "
                "Given the title and text of a web page, produce a concise "
                "2-4 sentence summary that captures the key information. "
                "Focus on: main topic, key facts, any actionable insights, "
                "any deadlines or decisions mentioned. "
                "Respond with ONLY the summary text, nothing else."
            ),
        },
        {
            "role": "user",
            "content": f"Title: {title}\nURL: {url}\n\nContent:\n{text_preview}",
        },
    ]

    try:
        result = await call_llm("fast_small", messages, temperature=0.2, max_tokens=300)
        summary = result["content"].strip()
    except Exception as e:
        logger.warning("Link summarization failed: %s", e)
        return fallback

    if not summary:
        # An empty reply would yield a signal with no information in it.
        logger.warning("Link summarization returned empty content")
        return fallback
    return summary


async def process_links_from_message(
    text: str,
    group_id: str,
    shared_by: str = "Unknown",
) -> list[dict]:
    """Run the full pipeline: extract URLs → fetch → summarize → build signals.

    Returns an empty list immediately when ``ENABLE_LINK_FETCH`` is falsy.
    Repeated URLs are processed once, and at most 3 distinct fetchable URLs
    are handled per message. A failure on one URL is logged and does not
    stop the remaining URLs from being processed.

    Args:
        text: The chat message text to scan for links.
        group_id: Stored on each signal as ``group_id``.
        shared_by: Name of the sharer; recorded in ``entities`` and ``keywords``.

    Returns:
        A list of ``link_knowledge`` signal dicts, one per successfully
        fetched link, in order of first appearance in ``text``.
    """
    if not ENABLE_LINK_FETCH:
        return []

    # dict.fromkeys de-duplicates while keeping first-seen order, so a link
    # pasted twice does not consume two slots or yield duplicate signals.
    fetchable = [u for u in dict.fromkeys(extract_urls(text)) if should_fetch(u)]
    if not fetchable:
        return []

    signals = []

    for url in fetchable[:_MAX_LINKS_PER_MESSAGE]:
        try:
            content = await fetch_url_content(url)
            if not content:
                continue

            summary = await summarize_content(content["title"], content["text"], url)

            # Naive UTC timestamp, same format as before; built this way
            # because datetime.utcnow() is deprecated since Python 3.12.
            timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

            signals.append(
                {
                    "id": str(uuid.uuid4()),
                    "type": "link_knowledge",
                    "summary": f"[Link: {content['title'][:80]}] {summary[:200]}",
                    "entities": [f"@{shared_by}", url[:100]],
                    "severity": "low",
                    "status": "reference",
                    "sentiment": "neutral",
                    "urgency": "none",
                    "raw_quote": summary,
                    "timestamp": timestamp,
                    "group_id": group_id,
                    "lens": "link",
                    "keywords": [content["title"][:50], "link", "web", shared_by],
                }
            )
            logger.info("Link ingested: %s (%s)", content["title"][:50], url[:60])

        except Exception as e:
            logger.warning("Link processing failed for %s: %s", url[:60], e)
            continue

    return signals