"""ThirdEye Telegram Bot — main entry point.""" import asyncio import logging import os import tempfile from backend.config import ENABLE_DOCUMENT_INGESTION from backend.agents.document_ingestor import ingest_document from backend.db.chroma import store_signals, set_group_name from collections import defaultdict from telegram import Update from telegram.ext import ( Application, MessageHandler, CommandHandler, filters, ContextTypes, ) from backend.config import TELEGRAM_BOT_TOKEN, BATCH_SIZE from backend.pipeline import ( process_message_batch, query_knowledge, get_lens, set_lens, detect_and_set_lens, ) from backend.agents.web_search import search_web, format_search_results_for_llm from backend.agents.link_fetcher import extract_urls, process_links_from_message from backend.config import ENABLE_LINK_FETCH from backend.config import ENABLE_WEB_SEARCH logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s" ) logger = logging.getLogger("thirdeye.bot") # Message buffers per group _buffers = defaultdict(list) _group_names = {} _flush_timers = {} # Track pending flush timers per group async def _flush_buffer(group_id: str): """Flush the message buffer for a specific group (timer callback).""" if group_id not in _buffers or not _buffers[group_id]: return batch = _buffers[group_id] _buffers[group_id] = [] # Clear the timer reference if group_id in _flush_timers: del _flush_timers[group_id] try: signals = await process_message_batch(group_id, batch) if signals: logger.info( f"Timer flush: {len(signals)} signals from {_group_names.get(group_id, group_id)} ({len(batch)} messages)" ) except Exception as e: logger.error(f"Timer flush failed for {group_id}: {e}") def _schedule_flush(group_id: str): """Schedule a flush for this group after BATCH_TIMEOUT_SECONDS.""" from backend.config import BATCH_TIMEOUT_SECONDS # Cancel existing timer if any if group_id in _flush_timers: _flush_timers[group_id].cancel() # Schedule new flush loop = asyncio.get_event_loop() timer = loop.call_later( BATCH_TIMEOUT_SECONDS, lambda: asyncio.create_task(_flush_buffer(group_id)) ) _flush_timers[group_id] = timer async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): """Process every text message in groups.""" if not update.message or not update.message.text: return if not update.message.chat.type in ("group", "supergroup"): return group_id = str(update.message.chat_id) chat_title = update.message.chat.title or group_id _group_names[group_id] = chat_title set_group_name(group_id, chat_title) text = update.message.text sender = ( update.message.from_user.first_name or update.message.from_user.username or "Unknown" ) msg = { "sender": sender, "text": text, "timestamp": update.message.date.isoformat(), "message_id": update.message.message_id, } _buffers[group_id].append(msg) # Schedule or reschedule the flush timer # If buffer was empty before, this starts the timer # If buffer had messages, this resets the timer if len(_buffers[group_id]) == 1: # First message in buffer - start the timer _schedule_flush(group_id) # Process immediately when buffer reaches batch size if len(_buffers[group_id]) >= BATCH_SIZE: # Cancel the timer since we're processing now if group_id in _flush_timers: _flush_timers[group_id].cancel() del _flush_timers[group_id] batch = _buffers[group_id] _buffers[group_id] = [] try: signals = await process_message_batch(group_id, batch) if signals: logger.info( f"Processed batch: {len(signals)} signals from {_group_names.get(group_id, group_id)}" ) except Exception as e: logger.error(f"Pipeline error: {e}") # Background: process links if message contains URLs if ENABLE_LINK_FETCH and extract_urls(text): asyncio.create_task(_process_links_background(text, group_id, sender)) async def _process_links_background(text: str, group_id: str, sender: str): """Process links from a message in the background (non-blocking).""" try: link_signals = await process_links_from_message( text, group_id, shared_by=sender ) if link_signals: store_signals(group_id, link_signals) logger.info(f"Stored {len(link_signals)} link signals for {group_id}") except Exception as e: logger.error(f"Background link processing failed: {e}") async def _process_document_background( doc, filename, group_id, shared_by, chat_id, context ): """Background task to process document without blocking Telegram.""" file_path = None tmp_dir = None try: # Download file to temp directory tg_file = await doc.get_file() tmp_dir = tempfile.mkdtemp() file_path = os.path.join(tmp_dir, filename) await tg_file.download_to_drive(file_path) logger.info( f"Downloaded {filename} from {shared_by} in {_group_names.get(group_id, group_id)}" ) # Ingest into knowledge base (this is the slow part) signals = ingest_document( file_path, group_id, shared_by=shared_by, filename=filename ) if signals: store_signals(group_id, signals) # Send success message await context.bot.send_message( chat_id=chat_id, text=f"✅ *{filename}* processed — {len(signals)} knowledge chunks stored.\nYou can now `/ask` questions about this document.", parse_mode="Markdown", ) logger.info(f"Successfully ingested {filename}: {len(signals)} signals") else: logger.info(f"No extractable text in {filename}") await context.bot.send_message( chat_id=chat_id, text=f"⚠️ *{filename}* - No text could be extracted from this file.", parse_mode="Markdown", ) except Exception as e: logger.error(f"Document ingestion failed for {filename}: {e}") try: await context.bot.send_message( chat_id=chat_id, text=f"❌ Failed to process *{filename}*: {str(e)[:100]}", parse_mode="Markdown", ) except Exception: pass finally: # Cleanup temp file try: if file_path and os.path.exists(file_path): os.remove(file_path) if tmp_dir and os.path.exists(tmp_dir): os.rmdir(tmp_dir) except Exception as e: logger.warning(f"Cleanup failed for {filename}: {e}") async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE): """Process documents/files shared in groups.""" if not ENABLE_DOCUMENT_INGESTION: return if not update.message or not update.message.document: return if not update.message.chat.type in ("group", "supergroup"): return doc = update.message.document filename = doc.file_name or "unknown_file" ext = os.path.splitext(filename)[1].lower() # Only process supported file types supported = {".pdf", ".docx", ".txt", ".md", ".csv", ".json", ".log"} if ext not in supported: return # Size guard: skip files over 10MB if doc.file_size and doc.file_size > 10 * 1024 * 1024: await update.message.reply_text( f"⚠️ File too large: *{filename}* ({doc.file_size / 1024 / 1024:.1f} MB). Max size: 10 MB.", parse_mode="Markdown", ) return group_id = str(update.message.chat_id) shared_by = ( update.message.from_user.first_name or update.message.from_user.username or "Unknown" ) chat_title = update.message.chat.title or group_id _group_names[group_id] = chat_title set_group_name(group_id, chat_title) # Respond immediately so Telegram doesn't timeout await update.message.reply_text( f"📄 Processing *{filename}*...\n_This may take a minute for large files._", parse_mode="Markdown", ) # Process in background asyncio.create_task( _process_document_background( doc, filename, group_id, shared_by, update.message.chat_id, context ) ) async def cmd_ask(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle /ask [question] — also supports /ask raise jira [topic] to raise tickets inline.""" if not context.args: await update.message.reply_text( "Usage: /ask [your question]\n" "Example: /ask What database did we choose?\n" "Tip: /ask raise jira on [topic] — raise Jira tickets matching a topic" ) return question = " ".join(context.args) group_id = str(update.message.chat_id) # ── Jira assign intent detection (checked before raise) ────────────────── _ASSIGN_TRIGGERS = ( "assign jira", "assign ticket", "assign a jira", "assign a ticket", "assign the jira", "assign the ticket", ) if any(trigger in question.lower() for trigger in _ASSIGN_TRIGGERS): await _handle_ask_jira_assign(update, context, question, group_id) return # ── Jira raise intent detection ─────────────────────────────────────────── _RAISE_TRIGGERS = ( "raise jira", "jira raise", "create jira", "file jira", "open jira", "raise ticket", "create ticket", "raise a jira", "raise jira ticket", "raise jira token", # common autocorrect/typo variant ) if any(trigger in question.lower() for trigger in _RAISE_TRIGGERS): await _handle_ask_jira_raise(update, context, question, group_id) return await update.message.reply_text("🔍 Searching the knowledge base...") # Flush any buffered messages for this group so /ask always sees the latest content. # Without this, a message sent moments before /ask might still be sitting in the buffer. if _buffers.get(group_id): batch = _buffers[group_id] _buffers[group_id] = [] if group_id in _flush_timers: _flush_timers[group_id].cancel() del _flush_timers[group_id] try: await process_message_batch(group_id, batch) except Exception as e: logger.warning(f"Pre-ask flush failed for {group_id}: {e}") try: answer = await query_knowledge(group_id, question) await update.message.reply_text(f"💡 {answer}") except Exception as e: await update.message.reply_text(f"Sorry, something went wrong: {str(e)[:100]}") async def _handle_ask_jira_assign(update, context, question: str, group_id: str): """ Handle: /ask assign jira ticket for [topic] to [Name] Raises a new Jira ticket for the best matching signal and assigns it to the named person. If the signal was already raised, assigns the existing ticket instead. """ import re from backend.integrations.jira_client import is_configured, assign_issue from backend.agents.jira_agent import ( raise_ticket_for_signal, RAISEABLE_TYPES, resolve_assignee_account_id, ) from backend.db.chroma import query_signals, get_all_signals, get_raised_signal_ids if not is_configured(): await update.message.reply_text( "⚙️ Jira is not configured. Add `JIRA_BASE_URL`, `JIRA_EMAIL`, and `JIRA_API_TOKEN` " "to your .env and restart.", parse_mode="Markdown", ) return # Parse assignee from "… to [Name]" at the end of the query assignee_name = None topic = question to_match = re.search(r'\bto\s+([A-Za-z][A-Za-z\s]{1,40}?)$', question, re.IGNORECASE) if to_match: assignee_name = to_match.group(1).strip() topic = question[: to_match.start()].strip() # Strip assign intent keywords from topic _STRIP_PATTERNS = [ r"assign\s+(a\s+|the\s+)?(jira\s+)?(ticket|tickets)?\s*(for|on|about|regarding)?\s*(the\s*)?", r"assign\s+jira\s*(ticket|tickets)?\s*(for|on|about|regarding)?\s*(the\s*)?", ] for pattern in _STRIP_PATTERNS: topic = re.sub(pattern, "", topic, flags=re.IGNORECASE).strip() topic = topic.strip(" .,!?") or "priority task" if not assignee_name: await update.message.reply_text( "Usage: `/ask assign jira ticket for [topic] to [Name]`\n" "Example: `/ask assign jira ticket for design implementation to Anirban Basak`", parse_mode="Markdown", ) return await update.message.reply_text( f"🔍 Finding signals for _{topic}_...", parse_mode="Markdown" ) # Resolve Jira account ID before doing anything else account_id = await resolve_assignee_account_id(assignee_name) if not account_id: await update.message.reply_text( f"❌ Could not find *{assignee_name}* in Jira.\n" "Make sure the name matches their Jira display name exactly.", parse_mode="Markdown", ) return # Flush pending buffer so we see the latest messages if _buffers.get(group_id): batch = _buffers[group_id] _buffers[group_id] = [] if group_id in _flush_timers: _flush_timers[group_id].cancel() del _flush_timers[group_id] try: await process_message_batch(group_id, batch) except Exception as e: logger.warning(f"Pre-assign flush failed for {group_id}: {e}") already_raised = get_raised_signal_ids(group_id) # Semantic search for the best matching signal semantic_hits = query_signals(group_id, topic, n_results=10) candidates = [ r for r in semantic_hits if r.get("metadata", {}).get("type") in RAISEABLE_TYPES ] if not candidates: all_sigs = get_all_signals(group_id) candidates = [ {"id": s["id"], "document": s["document"], "metadata": s["metadata"]} for s in all_sigs if s.get("metadata", {}).get("type") in RAISEABLE_TYPES ][:3] if not candidates: await update.message.reply_text( f"📭 No raiseable signals found for *{topic}* in this group.", parse_mode="Markdown", ) return # Take the single best candidate best = candidates[0] meta = best.get("metadata", {}) sig_id = best.get("id", "") await update.message.reply_text( f"🚀 Raising ticket and assigning to *{assignee_name}*...", parse_mode="Markdown" ) # If already raised, just assign the existing ticket if sig_id in already_raised: # The jira_key is stored in the raised signal entities from backend.db.chroma import get_all_signals as _gas import json as _json raised_sigs = _gas(group_id) jira_key = None for s in raised_sigs: if s.get("metadata", {}).get("signal_id") == sig_id: try: ents = _json.loads(s["metadata"].get("entities", "[]")) except Exception: ents = [] jira_key = ents[0] if ents else None break if jira_key: res = await assign_issue(jira_key, account_id) if res.get("ok"): from backend.config import JIRA_BASE_URL await update.message.reply_text( f"✅ [{jira_key}](https://{JIRA_BASE_URL}/browse/{jira_key}) assigned to *{assignee_name}*", parse_mode="Markdown", ) else: await update.message.reply_text( f"❌ Could not assign {jira_key}: {res.get('error')}", parse_mode="Markdown" ) return # Raise a new ticket with the assignee baked in sig = { "id": sig_id, "type": meta.get("type", ""), "summary": meta.get("summary", ""), "severity": meta.get("severity", "high"), "raw_quote": best.get("document", ""), "entities": meta.get("entities", []), "keywords": meta.get("keywords", []), "timestamp": meta.get("timestamp", ""), "group_id": group_id, "lens": meta.get("lens", ""), "assignee_override": assignee_name, } result = await raise_ticket_for_signal(sig, group_id, assignee_account_id=account_id) if result.get("ok"): await update.message.reply_text( f"✅ [{result['key']}]({result['url']}) raised and assigned to *{assignee_name}*\n" f"_{result.get('summary', '')[:90]}_", parse_mode="Markdown", ) else: reason = result.get("reason", "unknown") await update.message.reply_text( f"❌ Failed to raise ticket: {result.get('error', reason)}", parse_mode="Markdown" ) async def _handle_ask_jira_raise(update, context, question: str, group_id: str): """ Called from cmd_ask when a Jira raise intent is detected. Finds signals matching the topic in the query, applies priority override from keywords (priority 0 / p0 / critical → Highest; priority / urgent / p1 → High), then raises Jira tickets and reports results. """ import re from backend.integrations.jira_client import is_configured from backend.agents.jira_agent import ( raise_ticket_for_signal, RAISEABLE_TYPES, format_raise_result_for_telegram, SEVERITY_TO_PRIORITY, ) from backend.db.chroma import query_signals, get_all_signals, get_raised_signal_ids from backend.config import JIRA_DEFAULT_PROJECT if not is_configured(): await update.message.reply_text( "⚙️ Jira is not configured. Add `JIRA_BASE_URL`, `JIRA_EMAIL`, and `JIRA_API_TOKEN` " "to your .env and restart.", parse_mode="Markdown", ) return # ── Determine priority override from keywords in the query ──────────────── q_lower = question.lower() if any(kw in q_lower for kw in ("priority 0", "p0", "critical", "blocker")): priority_override = "Highest" elif any(kw in q_lower for kw in ("priority 1", "p1", "urgent", "high priority")): priority_override = "High" elif "priority" in q_lower: # Generic "priority" keyword without a level → treat as High priority_override = "High" else: priority_override = None # Use each signal's own severity # ── Extract the topic by stripping the raise intent phrase ──────────────── _STRIP_PATTERNS = [ r"raise jira (token|ticket|tickets)?\s*(on|for|about|regarding)?\s*(the\s*)?", r"jira raise\s*(on|for|about|regarding)?\s*(the\s*)?", r"create jira (ticket|tickets)?\s*(on|for|about|regarding)?\s*(the\s*)?", r"file jira (ticket|tickets)?\s*(on|for|about|regarding)?\s*(the\s*)?", r"open jira (ticket|tickets)?\s*(on|for|about|regarding)?\s*(the\s*)?", r"raise (a\s*)?(jira\s*)?(ticket|tickets)?\s*(on|for|about|regarding)?\s*(the\s*)?", r"create (a\s*)?ticket\s*(on|for|about|regarding)?\s*(the\s*)?", ] topic = q_lower for pattern in _STRIP_PATTERNS: topic = re.sub(pattern, "", topic, flags=re.IGNORECASE).strip() topic = topic.strip(" .,!?") or "priority fixes" # ── Flush pending buffer so we see the latest messages ──────────────────── if _buffers.get(group_id): batch = _buffers[group_id] _buffers[group_id] = [] if group_id in _flush_timers: _flush_timers[group_id].cancel() del _flush_timers[group_id] try: await process_message_batch(group_id, batch) except Exception as e: logger.warning(f"Pre-ask-jira flush failed for {group_id}: {e}") await update.message.reply_text( "🔍 Searching signals to raise...", parse_mode="Markdown" ) # ── Find raiseable signals matching the topic ────────────────────────────── already_raised = get_raised_signal_ids(group_id) # Semantic search on topic semantic_hits = query_signals(group_id, topic, n_results=10) candidates = [ r for r in semantic_hits if r.get("metadata", {}).get("type") in RAISEABLE_TYPES and r.get("id", "") not in already_raised ] # Fallback: if semantic search found nothing, use all unraised raiseable signals if not candidates: all_sigs = get_all_signals(group_id) candidates = [ {"id": s["id"], "document": s["document"], "metadata": s["metadata"]} for s in all_sigs if s.get("metadata", {}).get("type") in RAISEABLE_TYPES and s.get("id", "") not in already_raised ][:5] if not candidates: await update.message.reply_text( f"📭 No raiseable signals found for *{topic}* in this group.\n\n" "Make sure conversations have been processed first, or use `/jira` to see " "all pending signals.", parse_mode="Markdown", ) return prio_label = f" with *{priority_override}* priority" if priority_override else "" await update.message.reply_text( f"🚀 Raising {len(candidates)} ticket(s){prio_label} for: _{topic}_", parse_mode="Markdown", ) # ── Raise each ticket ───────────────────────────────────────────────────── _rev_priority = { "Highest": "critical", "High": "high", "Medium": "medium", "Low": "low", } results = [] for r in candidates: meta = r.get("metadata", {}) sig = { "id": r.get("id", ""), "type": meta.get("type", ""), "summary": meta.get("summary", ""), "severity": meta.get("severity", "medium"), "raw_quote": r.get("document", ""), "entities": meta.get("entities", []), "keywords": meta.get("keywords", []), "timestamp": meta.get("timestamp", ""), "group_id": group_id, "lens": meta.get("lens", ""), } if priority_override: sig["severity"] = _rev_priority.get(priority_override, "high") result = await raise_ticket_for_signal(sig, group_id) results.append( { **result, "signal_type": sig["type"], "signal_summary": sig["summary"][:80], } ) raised = [r for r in results if r.get("ok")] failed = [ r for r in results if not r.get("ok") and r.get("reason") not in ("already_raised", "not_raiseable") ] lines = [f"🎫 *Jira Tickets Raised* ({len(raised)}/{len(candidates)})\n"] for r in results: lines.append(format_raise_result_for_telegram(r)) if failed: lines.append(f"\n⚠️ {len(failed)} failed — check logs for details") if not raised and not failed: lines.append("⏭️ All matching signals were already raised or not raiseable.") await update.message.reply_text("\n".join(lines), parse_mode="Markdown") async def cmd_flush(update: Update, context: ContextTypes.DEFAULT_TYPE): """Manually flush buffered messages for immediate processing.""" group_id = str(update.message.chat_id) if group_id not in _buffers or not _buffers[group_id]: await update.message.reply_text("✅ No messages in buffer (already processed)") return batch = _buffers[group_id] _buffers[group_id] = [] # Cancel any pending timer if group_id in _flush_timers: _flush_timers[group_id].cancel() del _flush_timers[group_id] await update.message.reply_text( f"⚡ Processing {len(batch)} buffered message(s)..." ) try: signals = await process_message_batch(group_id, batch) if signals: await update.message.reply_text( f"✅ Processed {len(batch)} messages → {len(signals)} signals extracted.\nYou can now `/ask` questions about them.", parse_mode="Markdown", ) logger.info( f"Manual flush: {len(signals)} signals from {_group_names.get(group_id, group_id)}" ) else: await update.message.reply_text( "✅ Messages processed (no significant signals detected)" ) except Exception as e: logger.error(f"Manual flush failed: {e}") await update.message.reply_text(f"❌ Processing failed: {str(e)[:100]}") async def cmd_debug(update: Update, context: ContextTypes.DEFAULT_TYPE): """Show recent signals for debugging.""" group_id = str(update.message.chat_id) from backend.db.chroma import get_all_signals import json signals = get_all_signals(group_id) if not signals: await update.message.reply_text("No signals found for this group yet.") return # Show last 5 signals lines = [f"🔍 *Debug: Last 5 Signals*\n"] for sig in signals[-5:]: meta = sig.get("metadata", {}) doc = sig.get("document", "") sig_type = meta.get("type", "unknown") timestamp = meta.get("timestamp", "")[:16] # Just date+time # Extract summary from document if ": " in doc: summary = doc.split(": ", 1)[1].split(" | Quote:")[0] else: summary = doc lines.append(f"• [{sig_type}] {timestamp}") lines.append(f" {summary[:100]}...\n") await update.message.reply_text("\n".join(lines), parse_mode="Markdown") async def cmd_digest(update: Update, context: ContextTypes.DEFAULT_TYPE): """Generate a summary digest.""" group_id = str(update.message.chat_id) from backend.db.chroma import get_all_signals signals = get_all_signals(group_id) if not signals: await update.message.reply_text( "No intelligence gathered yet. I need more conversation to learn from!" ) return # Group by type by_type = defaultdict(list) for s in signals: by_type[s["metadata"].get("type", "other")].append(s) parts = [f"📊 *Intelligence Digest* ({len(signals)} total signals)\n"] for sig_type, sigs in sorted(by_type.items(), key=lambda x: -len(x[1])): parts.append(f"• {sig_type.replace('_', ' ').title()}: {len(sigs)} signals") await update.message.reply_text("\n".join(parts)) async def cmd_lens(update: Update, context: ContextTypes.DEFAULT_TYPE): """Set or check the active lens.""" group_id = str(update.message.chat_id) if context.args: mode = context.args[0].lower() if mode in ("dev", "product", "client", "community", "auto"): set_lens(group_id, mode) await update.message.reply_text(f"🔭 Lens set to: {mode}") else: await update.message.reply_text( "Valid lenses: dev, product, client, community, auto" ) else: current = get_lens(group_id) await update.message.reply_text(f"🔭 Current lens: {current}") async def cmd_search(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle /search [query] — explicit web search.""" if not ENABLE_WEB_SEARCH: await update.message.reply_text("🔍 Web search is currently disabled.") return if not context.args: await update.message.reply_text( "Usage: /search [your query]\nExample: /search FastAPI rate limiting best practices" ) return query = " ".join(context.args) await update.message.reply_text(f"🌐 Searching the web for: {query}...") try: results = await search_web(query, max_results=3) if not results: await update.message.reply_text( "No web results found. Try a different query." ) return parts = [f"🌐 Web Search: {query}\n"] for i, r in enumerate(results): snippet = ( r["content"][:200] + "..." if len(r["content"]) > 200 else r["content"] ) parts.append(f"{i+1}. {r['title']}\n{snippet}\n🔗 {r['url']}\n") await update.message.reply_text("\n".join(parts)) except Exception as e: await update.message.reply_text(f"Search failed: {str(e)[:100]}") async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE): """Welcome message.""" await update.message.reply_text( "👁️ *ThirdEye* — Conversation Intelligence Engine\n\n" "I'm now listening to this group and extracting intelligence from your conversations.\n\n" "Commands:\n" "/ask [question] — Ask about your team's knowledge\n" " ↳ /ask raise jira on [topic] — raise Jira tickets matching a topic\n" " ↳ /ask raise jira on priority fixes — raises as High priority automatically\n" " ↳ /ask assign jira ticket for [topic] to [Name] — raise & assign to a person\n" "/search [query] — Search the web for external info\n" "/digest — Get an intelligence summary\n" "/lens [mode] — Set detection mode (dev/product/client/community)\n" "/alerts — View active warnings\n" "/flush — Process buffered messages now\n" "/debug — Show last 5 signals\n\n" "📄 Share documents (PDF, DOCX, TXT) — I'll ingest them into the knowledge base.\n" "🔗 Share links — I'll fetch and store their content.\n\n" "🎙️ /meetsum [id] — Summarize a meeting\n" "🔍 /meetask [id] [q] — Ask about a meeting\n" "🔗 /meetmatch [id] — Match meeting to team chats\n" "🎫 /jira — Preview & raise Jira tickets from signals\n" "📋 /jirastatus [KEY] — Get ticket status (e.g. ENG-42)\n" "🔍 /jirasearch [query] — Search Jira in plain English\n" "📜 /jiraraised — See all tickets ThirdEye has raised\n" "👁️ /jirawatch [on|off] — Toggle auto-raise mode\n\n" "🎤 /voicelog — Audit trail of all voice note decisions\n" "🎤 /voicelog @name — Voice notes by a specific person\n" "🎤 /voicelog decisions|actions|blockers — Filter by type\n" "🎤 /voicelog search [q] — Search voice note content\n\n" "I work passively — no need to tag me. I'll alert you when I spot patterns or issues.", parse_mode=None, ) # ───────────────────────────────────────────── # Meet Commands — add to existing commands.py # ───────────────────────────────────────────── async def cmd_meetsum(update, context): """ /meetsum [meeting_id] Summarizes a meeting. Uses the most recent meeting if no ID given. """ from backend.db.chroma import query_signals from backend.config import MEET_DEFAULT_GROUP_ID args = context.args or [] group_id = MEET_DEFAULT_GROUP_ID if args: meeting_id = args[0] else: # Find the most recent meeting ID from meet_summary signals all_summaries = query_signals( group_id, "meeting summary", n_results=5, signal_type="meet_summary" ) if not all_summaries: await update.message.reply_text( "📭 No meeting summaries found yet. Record a meeting with the ThirdEye Meet extension first." ) return # The most recent signal (by position in results, or pick first) meeting_id = all_summaries[0].get("metadata", {}).get("meeting_id", "unknown") if not meeting_id or meeting_id == "unknown": await update.message.reply_text("Usage: /meetsum [meeting_id]") return # Fetch summary signal signals = query_signals(group_id, meeting_id, n_results=20) summary_signal = next( (s for s in signals if s.get("metadata", {}).get("type") == "meet_summary"), None, ) if summary_signal: # Summary text is in the document field summary_text = summary_signal.get("metadata", {}).get( "raw_quote", "" ) or summary_signal.get("document", "") await update.message.reply_text( f"📋 *Meeting Summary*\nID: `{meeting_id}`\n\n{summary_text}", parse_mode="Markdown", ) return # No summary yet — gather structured signals and summarize on-the-fly structured = [ s for s in signals if s.get("metadata", {}).get("type") in ("meet_decision", "meet_action_item", "meet_blocker", "meet_risk") ] if not structured: await update.message.reply_text( f"📭 No signals found for meeting `{meeting_id}`. " "Make sure the meeting has been recorded and processed.", parse_mode="Markdown", ) return lines = [f"📋 *Meeting Signals* for `{meeting_id}`\n"] type_labels = { "meet_decision": "✅ Decision", "meet_action_item": "📌 Action", "meet_blocker": "🚧 Blocker", "meet_risk": "⚠️ Risk", } for s in structured[:10]: sig_type = s.get("metadata", {}).get("type", "unknown") label = type_labels.get(sig_type, sig_type) # Get summary from document text (format: "type: summary | Quote: ...") doc = s.get("document", "") if ": " in doc: summary = doc.split(": ", 1)[1].split(" | Quote:")[0] else: summary = doc lines.append(f"{label}: {summary[:120]}") await update.message.reply_text("\n".join(lines), parse_mode="Markdown") async def cmd_meetask(update, context): """ /meetask [meeting_id] [question] Asks a question about a specific meeting's knowledge. """ from backend.pipeline import query_knowledge from backend.config import MEET_DEFAULT_GROUP_ID args = context.args or [] if len(args) < 2: await update.message.reply_text( "Usage: /meetask [meeting_id] [your question]\n" "Example: /meetask abc-defg-hij What did we decide about the database?" ) return meeting_id = args[0] question = " ".join(args[1:]) group_id = MEET_DEFAULT_GROUP_ID await update.message.reply_text( f"🔍 Searching meeting `{meeting_id}`...", parse_mode="Markdown" ) # Augment the question with meeting ID context so the RAG retrieval is scoped scoped_question = f"From meeting {meeting_id}: {question}" answer = await query_knowledge(group_id, scoped_question) await update.message.reply_text( f"🎙️ *Meet Q&A* — `{meeting_id}`\n\nQ: _{question}_\n\nA: {answer}", parse_mode="Markdown", ) async def cmd_meetmatch(update, context): """ /meetmatch [meeting_id] Finds where a meeting's decisions/blockers match or conflict with your team chat signals. """ from backend.agents.meet_cross_ref import ( find_cross_references, format_cross_ref_for_telegram, ) from backend.db.chroma import query_signals from backend.config import MEET_DEFAULT_GROUP_ID args = context.args or [] group_id = MEET_DEFAULT_GROUP_ID if args: meeting_id = args[0] else: # Find most recent meeting all_summaries = query_signals( group_id, "meeting", n_results=5, signal_type="meet_summary" ) if not all_summaries: await update.message.reply_text( "📭 No meetings found. Record a meeting first with the ThirdEye Meet extension." ) return meeting_id = all_summaries[0].get("metadata", {}).get("meeting_id", "unknown") await update.message.reply_text( f"🔗 Comparing meeting `{meeting_id}` against your team chats...\n" "_This may take a moment._", parse_mode="Markdown", ) analysis = await find_cross_references(meeting_id, group_id=group_id) formatted = format_cross_ref_for_telegram(analysis, meeting_id) await update.message.reply_text(formatted, parse_mode="Markdown") # ───────────────────────────────────────────── # Jira Commands — add to existing commands.py # ───────────────────────────────────────────── async def cmd_jira(update, context): """ /jira Shows unraised high-severity signals in this group and raises them as Jira tickets. With no args: shows a preview of what would be raised and asks for confirmation. With 'confirm': actually raises the tickets. Usage: /jira — preview unraised signals /jira confirm — raise all unraised high+ severity signals now /jira [signal_type] — raise only signals of that type (e.g. /jira recurring_bug) """ from backend.db.chroma import get_all_signals, get_raised_signal_ids from backend.agents.jira_agent import ( bulk_raise_for_group, RAISEABLE_TYPES, SEVERITY_TO_PRIORITY, format_raise_result_for_telegram, ) from backend.integrations.jira_client import is_configured from backend.config import JIRA_DEFAULT_PROJECT chat_id = str(update.effective_chat.id) args = context.args or [] confirm = "confirm" in [a.lower() for a in args] type_filter = next( (a for a in args if a.lower() not in ("confirm",) and "_" in a), None ) if not is_configured(): await update.message.reply_text( "⚙️ Jira is not configured. Add `JIRA_BASE_URL`, `JIRA_EMAIL`, and `JIRA_API_TOKEN` to your .env and restart." ) return await update.message.reply_text("🔍 Scanning signals...", parse_mode="Markdown") all_signals = get_all_signals(chat_id) already_raised = get_raised_signal_ids(chat_id) severity_rank = {"low": 0, "medium": 1, "high": 2, "critical": 3} candidates = [ s for s in all_signals if s.get("type") in RAISEABLE_TYPES and s.get("id", "") not in already_raised and severity_rank.get(s.get("severity", "low"), 0) >= 2 # high+ and (not type_filter or s.get("type") == type_filter) ] if not candidates: await update.message.reply_text( "✅ Nothing to raise — no unraised high-severity signals found in this group.\n" "Use `/ask` to query existing knowledge or wait for new signals.", parse_mode="Markdown", ) return if not confirm: # Preview mode — show what would be raised lines = [ f"📋 *{len(candidates)} signal(s) ready to raise in `{JIRA_DEFAULT_PROJECT}`*\n" ] for s in candidates[:8]: lines.append( f"• [{s.get('severity','?').upper()}] `{s.get('type','?')}` — " f"_{s.get('summary','')[:80]}_" ) if len(candidates) > 8: lines.append(f"_...and {len(candidates) - 8} more_") lines.append(f"\nSend `/jira confirm` to raise all of these as Jira tickets.") await update.message.reply_text("\n".join(lines), parse_mode="Markdown") return # Confirm mode — raise the tickets await update.message.reply_text( f"🚀 Raising {len(candidates)} ticket(s) in `{JIRA_DEFAULT_PROJECT}`...", parse_mode="Markdown", ) results = await bulk_raise_for_group( group_id=chat_id, signals=candidates, min_severity="high", max_tickets=10, ) raised = [r for r in results if r.get("ok")] failed = [ r for r in results if not r.get("ok") and r.get("reason") not in ("already_raised", "not_raiseable") ] lines = [f"🎫 *Jira Tickets Raised* ({len(raised)}/{len(candidates)})\n"] for r in raised: lines.append(format_raise_result_for_telegram(r)) if failed: lines.append(f"\n⚠️ {len(failed)} failed — check logs") await update.message.reply_text("\n".join(lines), parse_mode="Markdown") async def cmd_jirastatus(update, context): """ /jirastatus [TICKET-KEY] Get the current status of a specific Jira ticket. Usage: /jirastatus ENG-42 """ from backend.integrations.jira_client import get_issue, is_configured if not is_configured(): await update.message.reply_text("⚙️ Jira not configured.") return args = context.args or [] if not args: await update.message.reply_text( "Usage: `/jirastatus TICKET-KEY`\nExample: `/jirastatus ENG-42`", parse_mode="Markdown", ) return issue_key = args[0].upper() try: issue = await get_issue(issue_key) status_emoji = { "To Do": "🔵", "In Progress": "🟡", "Done": "✅", "Blocked": "🔴", }.get(issue["status"], "⚪") msg = ( f"🎫 *{issue['key']}*\n" f"{status_emoji} Status: *{issue['status']}*\n" f"📌 Type: {issue['issue_type']} | ⚡ Priority: {issue['priority']}\n" f"👤 Assignee: {issue['assignee']}\n" f"📝 {issue['summary']}\n\n" f"🔗 [Open in Jira]({issue['url']})" ) await update.message.reply_text(msg, parse_mode="Markdown") except Exception as e: await update.message.reply_text( f"❌ Could not fetch `{issue_key}`: {e}", parse_mode="Markdown" ) async def cmd_jirasearch(update, context): """ /jirasearch [query] Search Jira using natural language. ThirdEye converts it to JQL automatically. Usage: /jirasearch open bugs assigned to Alex /jirasearch all thirdeye tickets from this sprint """ from backend.integrations.jira_client import search_issues, is_configured from backend.providers import call_llm from backend.config import JIRA_DEFAULT_PROJECT if not is_configured(): await update.message.reply_text("⚙️ Jira not configured.") return args = context.args or [] if not args: await update.message.reply_text( "Usage: `/jirasearch [natural language query]`\n" "Example: `/jirasearch open bugs assigned to Alex`", parse_mode="Markdown", ) return query = " ".join(args) await update.message.reply_text( f"🔍 Searching Jira for: _{query}_...", parse_mode="Markdown" ) # Convert natural language → JQL via LLM try: jql_result = await call_llm( task_type="fast_small", messages=[ { "role": "system", "content": ( f"Convert the user's natural language query into a valid Jira JQL query. " f"Default project is '{JIRA_DEFAULT_PROJECT}'. " "Return ONLY the JQL string — no explanation, no quotes around it, no markdown." ), }, {"role": "user", "content": query}, ], temperature=0.0, max_tokens=150, ) jql = jql_result["content"].strip().strip('"').strip("'") except Exception: # Fallback: simple text search JQL jql = f'project = {JIRA_DEFAULT_PROJECT} AND text ~ "{query}" ORDER BY created DESC' try: results = await search_issues(jql, max_results=8) except Exception as e: await update.message.reply_text( f"❌ JQL search failed.\nJQL used: `{jql}`\nError: {e}", parse_mode="Markdown", ) return if not results: await update.message.reply_text( f"📭 No results found.\nJQL: `{jql}`", parse_mode="Markdown", ) return status_emoji = {"To Do": "🔵", "In Progress": "🟡", "Done": "✅"}.get lines = [f"🔍 *Jira Search Results* ({len(results)} found)\nQuery: _{query}_\n"] for r in results: emoji = {"To Do": "🔵", "In Progress": "🟡", "Done": "✅"}.get( r["status"], "⚪" ) lines.append( f"{emoji} [{r['key']}]({r['url']}) — _{r['summary'][:70]}_\n" f" {r['status']} | {r['priority']} | {r['assignee']}" ) await update.message.reply_text("\n".join(lines), parse_mode="Markdown") async def cmd_jiraraised(update, context): """ /jiraraised Shows all tickets ThirdEye has previously raised for this group. """ from backend.db.chroma import query_signals from backend.integrations.jira_client import is_configured from backend.config import JIRA_BASE_URL if not is_configured(): await update.message.reply_text("⚙️ Jira not configured.") return chat_id = str(update.effective_chat.id) raised_signals = query_signals( chat_id, "jira raised ticket", n_results=20, signal_type="jira_raised" ) if not raised_signals: await update.message.reply_text( "📭 No tickets raised yet for this group. Use `/jira confirm` to raise tickets.", parse_mode="Markdown", ) return lines = [f"🎫 *Tickets Raised by ThirdEye* ({len(raised_signals)} total)\n"] for sig in raised_signals[:10]: meta = sig.get("metadata", {}) # entities is stored as a JSON string in ChromaDB metadata try: import json as _json entities_raw = meta.get("entities", "[]") entities = _json.loads(entities_raw) if isinstance(entities_raw, str) else (entities_raw or []) except Exception: entities = [] jira_key = entities[0] if entities else "Unknown" url = f"{JIRA_BASE_URL}/browse/{jira_key}" label = meta.get("summary", "") or sig.get("document", "") lines.append(f"• [{jira_key}]({url}) — _{label[:80]}_") if len(raised_signals) > 10: lines.append(f"_...and {len(raised_signals) - 10} more_") await update.message.reply_text("\n".join(lines), parse_mode="Markdown") async def cmd_jirawatch(update, context): """ /jirawatch [on|off] Enable or disable auto-raise mode for this group. When ON: any new signal at or above JIRA_AUTO_RAISE_SEVERITY is automatically filed as a Jira ticket without manual /jira confirm. Usage: /jirawatch on /jirawatch off /jirawatch — show current status """ import json from telegram.ext import ContextTypes args = context.args or [] chat_id = str(update.effective_chat.id) # Store watch state in bot_data (in-memory; persists until restart) if not hasattr(context, "bot_data"): context.bot_data = {} watch_key = f"jirawatch_{chat_id}" if not args: current = context.bot_data.get(watch_key, False) status = "🟢 ON" if current else "🔴 OFF" await update.message.reply_text( f"👁️ *Jira Auto-Raise* — {status}\n\n" "When ON, ThirdEye automatically raises Jira tickets for any high or critical " "severity signal detected in this group — no `/jira confirm` needed.\n\n" "Use `/jirawatch on` or `/jirawatch off` to toggle.", parse_mode="Markdown", ) return mode = args[0].lower() if mode == "on": context.bot_data[watch_key] = True await update.message.reply_text( "🟢 *Jira Auto-Raise: ON*\n" "Any new `high` or `critical` signal detected in this group will automatically " "be raised as a Jira ticket. You'll be notified here.\n\n" "Use `/jirawatch off` to disable.", parse_mode="Markdown", ) elif mode == "off": context.bot_data[watch_key] = False await update.message.reply_text( "🔴 *Jira Auto-Raise: OFF*\n" "Use `/jira confirm` to raise tickets manually.", parse_mode="Markdown", ) else: await update.message.reply_text( "Usage: `/jirawatch on` or `/jirawatch off`", parse_mode="Markdown" ) # ----------------------------------------------------------------- # Voice Message Handlers — add to commands.py # ----------------------------------------------------------------- async def handle_voice_telegram(update, context): """ Fires for every voice message sent to a monitored group. Downloads, transcribes via Groq Whisper, feeds into signal pipeline. """ from backend.agents.voice_handler import handle_voice_message from backend.config import ENABLE_VOICE_TRANSCRIPTION if not ENABLE_VOICE_TRANSCRIPTION: return msg = update.message if not msg or not msg.voice: return group_id = str(msg.chat_id) sender = msg.from_user.full_name or msg.from_user.username or "Unknown" voice = msg.voice duration = voice.duration or 0 # React with 👂 immediately so team knows ThirdEye is processing try: await msg.set_reaction("👂") except Exception: pass # Reactions need bot admin rights — fail silently result = await handle_voice_message( bot=context.bot, group_id=group_id, sender=sender, file_id=voice.file_id, duration_seconds=duration, message_date=msg.date, is_video_note=False, ) if result["ok"]: signals = result["signals_extracted"] reply = ( f"🎤 *{sender}* ({duration}s) — transcribed\n" f"_{result['transcript'][:120]}{'...' if len(result['transcript']) > 120 else ''}_\n" f"`{signals} signal{'s' if signals != 1 else ''} extracted`" ) await msg.reply_text(reply, parse_mode="Markdown") elif result["reason"] == "too_long": await msg.reply_text( f"⏭️ Voice note from *{sender}* skipped — too long ({duration}s).", parse_mode="Markdown", ) elif result["reason"] in ("no_speech", "too_short", "empty"): pass # Silent skip else: logger.warning(f"Voice error for {sender}: {result.get('error')}") async def handle_video_note_telegram(update, context): """ Fires for round video messages (video notes). Same pipeline as voice messages — also contain audio. """ from backend.agents.voice_handler import handle_voice_message from backend.config import ENABLE_VOICE_TRANSCRIPTION if not ENABLE_VOICE_TRANSCRIPTION: return msg = update.message if not msg or not msg.video_note: return group_id = str(msg.chat_id) sender = msg.from_user.full_name or msg.from_user.username or "Unknown" video_note = msg.video_note duration = video_note.duration or 0 try: await msg.set_reaction("👂") except Exception: pass result = await handle_voice_message( bot=context.bot, group_id=group_id, sender=sender, file_id=video_note.file_id, duration_seconds=duration, message_date=msg.date, is_video_note=True, ) if result["ok"]: signals = result["signals_extracted"] reply = ( f"📹 *{sender}* ({duration}s video note) — transcribed\n" f"_{result['transcript'][:120]}{'...' if len(result['transcript']) > 120 else ''}_\n" f"`{signals} signal{'s' if signals != 1 else ''} extracted`" ) await msg.reply_text(reply, parse_mode="Markdown") def run_bot(): """Start the Telegram bot.""" from telegram.request import HTTPXRequest # Configure longer timeouts for slow network or heavy processing request = HTTPXRequest( connection_pool_size=8, connect_timeout=20.0, read_timeout=30.0, write_timeout=30.0, pool_timeout=10.0, ) app = Application.builder().token(TELEGRAM_BOT_TOKEN).request(request).build() app.add_handler(CommandHandler("start", cmd_start)) app.add_handler(CommandHandler("ask", cmd_ask)) app.add_handler(CommandHandler("digest", cmd_digest)) app.add_handler(CommandHandler("lens", cmd_lens)) app.add_handler(CommandHandler("flush", cmd_flush)) app.add_handler(CommandHandler("debug", cmd_debug)) app.add_handler(MessageHandler(filters.Document.ALL, handle_document)) app.add_handler(CommandHandler("search", cmd_search)) app.add_handler(CommandHandler("meetsum", cmd_meetsum)) app.add_handler(CommandHandler("meetask", cmd_meetask)) app.add_handler(CommandHandler("meetmatch", cmd_meetmatch)) app.add_handler(CommandHandler("jira", cmd_jira)) app.add_handler(CommandHandler("jirastatus", cmd_jirastatus)) app.add_handler(CommandHandler("jirasearch", cmd_jirasearch)) app.add_handler(CommandHandler("jiraraised", cmd_jiraraised)) app.add_handler(CommandHandler("jirawatch", cmd_jirawatch)) from backend.bot.commands import cmd_voicelog app.add_handler(CommandHandler("voicelog", cmd_voicelog)) app.add_handler( MessageHandler(filters.VOICE & ~filters.COMMAND, handle_voice_telegram) ) app.add_handler( MessageHandler( filters.VIDEO_NOTE & ~filters.COMMAND, handle_video_note_telegram ) ) app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)) logger.info("ThirdEye bot starting...") app.run_polling(allowed_updates=Update.ALL_TYPES) if __name__ == "__main__": run_bot()