# Mirror of https://github.com/arkorty/B.Tech-Project-III.git
# (synced 2026-04-19 12:41:48 +00:00; 1501 lines, 54 KiB, Python)
"""ThirdEye Telegram Bot — main entry point."""
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
from backend.config import ENABLE_DOCUMENT_INGESTION
|
|
from backend.agents.document_ingestor import ingest_document
|
|
from backend.db.chroma import store_signals, set_group_name
|
|
from collections import defaultdict
|
|
from telegram import Update
|
|
from telegram.ext import (
|
|
Application,
|
|
MessageHandler,
|
|
CommandHandler,
|
|
filters,
|
|
ContextTypes,
|
|
)
|
|
from backend.config import TELEGRAM_BOT_TOKEN, BATCH_SIZE
|
|
from backend.pipeline import (
|
|
process_message_batch,
|
|
query_knowledge,
|
|
get_lens,
|
|
set_lens,
|
|
detect_and_set_lens,
|
|
)
|
|
from backend.agents.web_search import search_web, format_search_results_for_llm
|
|
from backend.agents.link_fetcher import extract_urls, process_links_from_message
|
|
from backend.config import ENABLE_LINK_FETCH
|
|
|
|
from backend.config import ENABLE_WEB_SEARCH
|
|
|
|
|
|
# Configure logging once at import time with a timestamped, named format.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
logger = logging.getLogger("thirdeye.bot")

# Per-group in-memory state, keyed by the Telegram chat id rendered as a string.
_buffers = defaultdict(list)  # messages waiting to be sent through the pipeline
_group_names = {}  # last chat title seen for each group (used in log lines)
_flush_timers = {}  # pending flush timer handle per group
|
|
|
|
|
|
async def _flush_buffer(group_id: str):
    """Flush the message buffer for a specific group (timer callback).

    Takes ownership of whatever is currently buffered for ``group_id``,
    forgets the timer handle for that group, and sends the batch through
    ``process_message_batch``. Failures are logged and never propagate,
    because this runs as a detached task.
    """
    # .get() avoids creating an empty entry in the defaultdict for unknown groups.
    batch = _buffers.get(group_id)
    if not batch:
        return

    # Swap in a fresh list before awaiting so messages that arrive while the
    # pipeline is running land in the next batch instead of this one.
    _buffers[group_id] = []

    # Clear the timer reference
    _flush_timers.pop(group_id, None)

    try:
        signals = await process_message_batch(group_id, batch)
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Timer flush failed for %s: %s", group_id, e)
        return

    if signals:
        logger.info(
            f"Timer flush: {len(signals)} signals from {_group_names.get(group_id, group_id)} ({len(batch)} messages)"
        )
|
|
|
|
|
|
# Strong references to in-flight flush tasks. The event loop only holds weak
# references to tasks, so an unreferenced task can be garbage-collected
# before it finishes.
_pending_flush_tasks = set()


def _schedule_flush(group_id: str):
    """Schedule a flush for this group after BATCH_TIMEOUT_SECONDS.

    Any timer already pending for the group is cancelled and replaced, and the
    new timer handle is stored in ``_flush_timers``.
    """
    from backend.config import BATCH_TIMEOUT_SECONDS

    # Cancel existing timer if any
    previous = _flush_timers.get(group_id)
    if previous is not None:
        previous.cancel()

    # Prefer the running loop (get_event_loop() is deprecated inside
    # coroutines); fall back for callers that are outside a running loop.
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = asyncio.get_event_loop()

    def _fire():
        # Runs inside the loop when the timer expires.
        task = loop.create_task(_flush_buffer(group_id))
        _pending_flush_tasks.add(task)
        task.add_done_callback(_pending_flush_tasks.discard)

    # Schedule new flush
    _flush_timers[group_id] = loop.call_later(BATCH_TIMEOUT_SECONDS, _fire)
|
|
|
|
|
|
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Process every text message in groups.

    Buffers the message for its group, starts the flush timer on the first
    buffered message, processes the batch immediately once ``BATCH_SIZE`` is
    reached, and (when enabled) kicks off background link fetching.
    """
    message = update.message
    if not message or not message.text:
        return
    if message.chat.type not in ("group", "supergroup"):
        return

    group_id = str(message.chat_id)
    chat_title = message.chat.title or group_id
    _group_names[group_id] = chat_title
    set_group_name(group_id, chat_title)

    text = message.text

    # from_user is optional on a Message, so guard it instead of assuming it.
    user = message.from_user
    sender = "Unknown"
    if user is not None:
        sender = user.first_name or user.username or "Unknown"

    buffer = _buffers[group_id]
    buffer.append(
        {
            "sender": sender,
            "text": text,
            "timestamp": message.date.isoformat(),
            "message_id": message.message_id,
        }
    )

    # The timer is started by the first message of a batch and is NOT reset
    # by later messages, so a buffered message waits at most one timeout.
    if len(buffer) == 1:
        _schedule_flush(group_id)

    # Process immediately when buffer reaches batch size
    if len(buffer) >= BATCH_SIZE:
        # Cancel the timer since we're processing now
        timer = _flush_timers.pop(group_id, None)
        if timer is not None:
            timer.cancel()

        # Hand the full list to the pipeline and start a fresh buffer.
        batch = buffer
        _buffers[group_id] = []

        try:
            signals = await process_message_batch(group_id, batch)
        except Exception as e:
            logger.exception("Pipeline error: %s", e)
        else:
            if signals:
                logger.info(
                    f"Processed batch: {len(signals)} signals from {_group_names.get(group_id, group_id)}"
                )

    # Background: process links if message contains URLs.
    # Application.create_task keeps a reference to the task (a bare
    # asyncio.create_task result may be garbage-collected mid-run).
    if ENABLE_LINK_FETCH and extract_urls(text):
        context.application.create_task(
            _process_links_background(text, group_id, sender), update=update
        )
|
|
|
|
|
|
async def _process_links_background(text: str, group_id: str, sender: str):
    """Process links from a message in the background (non-blocking).

    Fetches signals for the URLs in ``text`` via ``process_links_from_message``
    and stores them for the group. Errors are logged and swallowed because the
    caller does not await this coroutine.
    """
    try:
        link_signals = await process_links_from_message(
            text, group_id, shared_by=sender
        )
        if link_signals:
            store_signals(group_id, link_signals)
            logger.info(f"Stored {len(link_signals)} link signals for {group_id}")
    except Exception as e:
        # Keep the traceback; this is the only place the failure is visible.
        logger.exception("Background link processing failed: %s", e)
|
|
|
|
|
|
async def _process_document_background(
    doc, filename, group_id, shared_by, chat_id, context
):
    """Background task to process document without blocking Telegram.

    Downloads ``doc`` into a private temp directory, ingests it, stores the
    resulting signals and reports the outcome to ``chat_id``. The temp file
    and directory are removed afterwards regardless of outcome.
    """
    file_path = None
    tmp_dir = None

    # The file name comes from the sender. Keep only its last path component
    # so a name such as "../../x.txt" cannot escape the temp directory.
    safe_name = os.path.basename(str(filename).replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        safe_name = "unknown_file"

    try:
        # Download file to temp directory
        tg_file = await doc.get_file()
        tmp_dir = tempfile.mkdtemp()
        file_path = os.path.join(tmp_dir, safe_name)
        await tg_file.download_to_drive(file_path)

        logger.info(
            f"Downloaded {filename} from {shared_by} in {_group_names.get(group_id, group_id)}"
        )

        # Ingest into knowledge base (this is the slow part). ingest_document
        # is a plain synchronous call, so run it in the default executor;
        # calling it inline would stall every other handler until it returns.
        # NOTE(review): assumes ingest_document is safe to call off the main
        # thread — confirm against its implementation.
        loop = asyncio.get_running_loop()
        signals = await loop.run_in_executor(
            None,
            lambda: ingest_document(
                file_path, group_id, shared_by=shared_by, filename=filename
            ),
        )

        if signals:
            store_signals(group_id, signals)
            # Send success message
            await context.bot.send_message(
                chat_id=chat_id,
                text=f"✅ *{filename}* processed — {len(signals)} knowledge chunks stored.\nYou can now `/ask` questions about this document.",
                parse_mode="Markdown",
            )
            logger.info(f"Successfully ingested {filename}: {len(signals)} signals")
        else:
            logger.info(f"No extractable text in {filename}")
            await context.bot.send_message(
                chat_id=chat_id,
                text=f"⚠️ *{filename}* - No text could be extracted from this file.",
                parse_mode="Markdown",
            )

    except Exception as e:
        logger.exception("Document ingestion failed for %s: %s", filename, e)
        try:
            # Sent as plain text: exception messages often contain "_", "*"
            # or "[" which make Telegram reject a Markdown-formatted message.
            await context.bot.send_message(
                chat_id=chat_id,
                text=f"❌ Failed to process {filename}: {str(e)[:100]}",
            )
        except Exception as notify_err:
            # Best effort only, but leave a trace instead of failing silently.
            logger.warning(
                "Could not send failure notice for %s: %s", filename, notify_err
            )
    finally:
        # Cleanup temp file
        try:
            if file_path and os.path.exists(file_path):
                os.remove(file_path)
            if tmp_dir and os.path.exists(tmp_dir):
                os.rmdir(tmp_dir)
        except Exception as e:
            logger.warning(f"Cleanup failed for {filename}: {e}")
|
|
|
|
|
|
async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Process documents/files shared in groups.

    Ignores unsupported extensions, rejects files over 10 MB with a reply,
    acknowledges accepted files immediately and hands the actual work to
    ``_process_document_background``.
    """
    if not ENABLE_DOCUMENT_INGESTION:
        return

    message = update.message
    if not message or not message.document:
        return
    if message.chat.type not in ("group", "supergroup"):
        return

    doc = message.document
    filename = doc.file_name or "unknown_file"
    ext = os.path.splitext(filename)[1].lower()

    # Only process supported file types
    supported = {".pdf", ".docx", ".txt", ".md", ".csv", ".json", ".log"}
    if ext not in supported:
        return

    # Size guard: skip files over 10MB
    if doc.file_size and doc.file_size > 10 * 1024 * 1024:
        await message.reply_text(
            f"⚠️ File too large: *{filename}* ({doc.file_size / 1024 / 1024:.1f} MB). Max size: 10 MB.",
            parse_mode="Markdown",
        )
        return

    group_id = str(message.chat_id)

    # from_user is optional on a Message, so guard it instead of assuming it.
    user = message.from_user
    shared_by = "Unknown"
    if user is not None:
        shared_by = user.first_name or user.username or "Unknown"

    chat_title = message.chat.title or group_id
    _group_names[group_id] = chat_title
    set_group_name(group_id, chat_title)

    # Respond immediately so Telegram doesn't timeout
    await message.reply_text(
        f"📄 Processing *{filename}*...\n_This may take a minute for large files._",
        parse_mode="Markdown",
    )

    # Process in background. Application.create_task keeps a reference to the
    # task (a bare asyncio.create_task result may be garbage-collected).
    context.application.create_task(
        _process_document_background(
            doc, filename, group_id, shared_by, message.chat_id, context
        ),
        update=update,
    )
|
|
|
|
|
|
async def cmd_ask(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /ask [question] — also supports /ask raise jira [topic] to raise tickets inline.

    Routing order: Jira *assign* intent, then Jira *raise* intent, otherwise a
    knowledge-base query. Buffered messages for the group are processed first
    so the answer reflects the latest conversation.
    """
    # effective_message is also set for edited-command updates, where
    # update.message is None and the original code raised AttributeError.
    message = update.effective_message
    if message is None:
        return

    if not context.args:
        await message.reply_text(
            "Usage: /ask [your question]\n"
            "Example: /ask What database did we choose?\n"
            "Tip: /ask raise jira on [topic] — raise Jira tickets matching a topic"
        )
        return

    question = " ".join(context.args)
    group_id = str(message.chat_id)
    lowered = question.lower()

    # ── Jira assign intent detection (checked before raise) ──────────────────
    _ASSIGN_TRIGGERS = (
        "assign jira",
        "assign ticket",
        "assign a jira",
        "assign a ticket",
        "assign the jira",
        "assign the ticket",
    )
    if any(trigger in lowered for trigger in _ASSIGN_TRIGGERS):
        await _handle_ask_jira_assign(update, context, question, group_id)
        return

    # ── Jira raise intent detection ───────────────────────────────────────────
    _RAISE_TRIGGERS = (
        "raise jira",
        "jira raise",
        "create jira",
        "file jira",
        "open jira",
        "raise ticket",
        "create ticket",
        "raise a jira",
        "raise jira ticket",
        "raise jira token",  # common autocorrect/typo variant
    )
    if any(trigger in lowered for trigger in _RAISE_TRIGGERS):
        await _handle_ask_jira_raise(update, context, question, group_id)
        return

    await message.reply_text("🔍 Searching the knowledge base...")

    # Flush any buffered messages for this group so /ask always sees the latest content.
    # Without this, a message sent moments before /ask might still be sitting in the buffer.
    if _buffers.get(group_id):
        batch = _buffers[group_id]
        _buffers[group_id] = []
        timer = _flush_timers.pop(group_id, None)
        if timer is not None:
            timer.cancel()
        try:
            await process_message_batch(group_id, batch)
        except Exception as e:
            logger.warning(f"Pre-ask flush failed for {group_id}: {e}")

    try:
        answer = await query_knowledge(group_id, question)
        await message.reply_text(f"💡 {answer}")
    except Exception as e:
        # Log with traceback; previously the failure only reached the user.
        logger.exception("/ask failed for %s: %s", group_id, e)
        await message.reply_text(f"Sorry, something went wrong: {str(e)[:100]}")
|
|
|
|
|
|
async def _handle_ask_jira_assign(update, context, question: str, group_id: str):
    """
    Handle: /ask assign jira ticket for [topic] to [Name]

    Raises a new Jira ticket for the best matching signal and assigns it to the
    named person. If the signal was already raised, assigns the existing ticket instead.

    The assignee is taken from the LAST "to <Name>" at the end of the question;
    everything before it, minus the "assign ... ticket for" phrase, is the topic.
    """
    import json
    import re

    from backend.integrations.jira_client import is_configured, assign_issue
    from backend.agents.jira_agent import (
        raise_ticket_for_signal,
        RAISEABLE_TYPES,
        resolve_assignee_account_id,
    )
    from backend.db.chroma import query_signals, get_all_signals, get_raised_signal_ids

    # effective_message is also set for edited-command updates.
    message = update.effective_message
    if message is None:
        return

    if not is_configured():
        await message.reply_text(
            "⚙️ Jira is not configured. Add `JIRA_BASE_URL`, `JIRA_EMAIL`, and `JIRA_API_TOKEN` "
            "to your .env and restart.",
            parse_mode="Markdown",
        )
        return

    # Parse assignee from "… to [Name]" at the end of the query.
    # The greedy prefix makes the last " to " win, so a topic that itself
    # contains "to" ("migration to postgres to Alice") is split correctly.
    assignee_name = None
    topic = question
    to_match = re.search(
        r"^(?P<topic>.*)\bto\s+(?P<name>[A-Za-z][A-Za-z\s]{1,40}?)$",
        question,
        re.IGNORECASE,
    )
    if to_match:
        assignee_name = to_match.group("name").strip()
        topic = to_match.group("topic").strip()

    # Strip assign intent keywords from topic. Word boundaries stop the
    # optional parts from eating prefixes of real words ("on" in
    # "onboarding", "the" in "theme"); "tickets?" consumes the plural whole.
    strip_pattern = (
        r"\bassign\s+(?:(?:a|the)\s+)?(?:jira\b\s*)?(?:tickets?\b\s*)?"
        r"(?:(?:for|on|about|regarding)\b\s*)?(?:the\b\s*)?"
    )
    topic = re.sub(strip_pattern, "", topic, flags=re.IGNORECASE).strip()
    topic = topic.strip(" .,!?") or "priority task"

    if not assignee_name:
        await message.reply_text(
            "Usage: `/ask assign jira ticket for [topic] to [Name]`\n"
            "Example: `/ask assign jira ticket for design implementation to Anirban Basak`",
            parse_mode="Markdown",
        )
        return

    await message.reply_text(
        f"🔍 Finding signals for _{topic}_...", parse_mode="Markdown"
    )

    # Resolve Jira account ID before doing anything else
    account_id = await resolve_assignee_account_id(assignee_name)
    if not account_id:
        await message.reply_text(
            f"❌ Could not find *{assignee_name}* in Jira.\n"
            "Make sure the name matches their Jira display name exactly.",
            parse_mode="Markdown",
        )
        return

    # Flush pending buffer so we see the latest messages
    if _buffers.get(group_id):
        batch = _buffers[group_id]
        _buffers[group_id] = []
        timer = _flush_timers.pop(group_id, None)
        if timer is not None:
            timer.cancel()
        try:
            await process_message_batch(group_id, batch)
        except Exception as e:
            logger.warning(f"Pre-assign flush failed for {group_id}: {e}")

    already_raised = get_raised_signal_ids(group_id)

    # Semantic search for the best matching signal
    semantic_hits = query_signals(group_id, topic, n_results=10)
    candidates = [
        hit
        for hit in semantic_hits
        if hit.get("metadata", {}).get("type") in RAISEABLE_TYPES
    ]
    if not candidates:
        # Fallback: first few raiseable signals of the group, regardless of topic.
        candidates = [
            {"id": s["id"], "document": s["document"], "metadata": s["metadata"]}
            for s in get_all_signals(group_id)
            if s.get("metadata", {}).get("type") in RAISEABLE_TYPES
        ][:3]

    if not candidates:
        await message.reply_text(
            f"📭 No raiseable signals found for *{topic}* in this group.",
            parse_mode="Markdown",
        )
        return

    # Take the single best candidate
    best = candidates[0]
    meta = best.get("metadata", {})
    sig_id = best.get("id", "")

    await message.reply_text(
        f"🚀 Raising ticket and assigning to *{assignee_name}*...", parse_mode="Markdown"
    )

    # If already raised, just assign the existing ticket
    if sig_id in already_raised:
        # The jira_key is stored in the raised signal entities
        jira_key = None
        for s in get_all_signals(group_id):
            s_meta = s.get("metadata", {})
            if s_meta.get("signal_id") != sig_id:
                continue
            try:
                ents = json.loads(s_meta.get("entities", "[]"))
            except (TypeError, ValueError):
                ents = []
            if isinstance(ents, list) and ents:
                jira_key = ents[0]
            break

        if not jira_key:
            # Tell the user instead of ending without any reply.
            await message.reply_text(
                "⚠️ This signal was already raised, but its Jira key could not be "
                "found, so it was not reassigned."
            )
            return

        res = await assign_issue(jira_key, account_id)
        if res.get("ok"):
            from backend.config import JIRA_BASE_URL

            # Accept the base URL with or without a scheme / trailing slash.
            base = str(JIRA_BASE_URL).rstrip("/")
            if not base.startswith(("http://", "https://")):
                base = f"https://{base}"
            await message.reply_text(
                f"✅ [{jira_key}]({base}/browse/{jira_key}) assigned to *{assignee_name}*",
                parse_mode="Markdown",
            )
        else:
            # Plain text: raw API error text under Markdown can make Telegram
            # reject the message.
            await message.reply_text(
                f"❌ Could not assign {jira_key}: {res.get('error')}"
            )
        return

    # Raise a new ticket with the assignee baked in
    sig = {
        "id": sig_id,
        "type": meta.get("type", ""),
        "summary": meta.get("summary", ""),
        "severity": meta.get("severity", "high"),
        "raw_quote": best.get("document", ""),
        "entities": meta.get("entities", []),
        "keywords": meta.get("keywords", []),
        "timestamp": meta.get("timestamp", ""),
        "group_id": group_id,
        "lens": meta.get("lens", ""),
        "assignee_override": assignee_name,
    }
    result = await raise_ticket_for_signal(sig, group_id, assignee_account_id=account_id)

    if result.get("ok"):
        await message.reply_text(
            f"✅ [{result['key']}]({result['url']}) raised and assigned to *{assignee_name}*\n"
            f"_{result.get('summary', '')[:90]}_",
            parse_mode="Markdown",
        )
    else:
        reason = result.get("reason", "unknown")
        # Plain text for the same reason as above: the error text is arbitrary.
        await message.reply_text(
            f"❌ Failed to raise ticket: {result.get('error', reason)}"
        )
|
|
|
|
|
|
async def _handle_ask_jira_raise(update, context, question: str, group_id: str):
    """
    Called from cmd_ask when a Jira raise intent is detected.

    Finds signals matching the topic in the query, applies priority override from
    keywords (priority 0 / p0 / critical → Highest; priority / urgent / p1 → High),
    then raises Jira tickets and reports results.
    """
    import re

    from backend.integrations.jira_client import is_configured
    from backend.agents.jira_agent import (
        raise_ticket_for_signal,
        RAISEABLE_TYPES,
        format_raise_result_for_telegram,
    )
    from backend.db.chroma import query_signals, get_all_signals, get_raised_signal_ids

    # effective_message is also set for edited-command updates.
    message = update.effective_message
    if message is None:
        return

    if not is_configured():
        await message.reply_text(
            "⚙️ Jira is not configured. Add `JIRA_BASE_URL`, `JIRA_EMAIL`, and `JIRA_API_TOKEN` "
            "to your .env and restart.",
            parse_mode="Markdown",
        )
        return

    # ── Determine priority override from keywords in the query ────────────────
    q_lower = question.lower()
    if any(kw in q_lower for kw in ("priority 0", "p0", "critical", "blocker")):
        priority_override = "Highest"
    elif any(kw in q_lower for kw in ("priority 1", "p1", "urgent", "high priority")):
        priority_override = "High"
    elif "priority" in q_lower:
        # Generic "priority" keyword without a level → treat as High
        priority_override = "High"
    else:
        priority_override = None  # Use each signal's own severity

    # ── Extract the topic by stripping the raise intent phrase ────────────────
    # Optional "on/for/about/regarding" and "the" that may follow the phrase.
    # The \b anchors stop them from eating prefixes of real words
    # ("on" in "onboarding", "the" in "theme").
    tail = r"\s*(?:(?:on|for|about|regarding)\b\s*)?(?:the\b\s*)?"
    # "tickets?" consumes the plural as a whole; the previous
    # "(ticket|tickets)" alternation matched "ticket" and left a stray "s".
    strip_patterns = [
        r"\braise jira\b\s*(?:(?:token|tickets?)\b)?" + tail,
        r"\bjira raise\b" + tail,
        r"\b(?:create|file|open) jira\b\s*(?:tickets?\b)?" + tail,
        r"\braise\s+(?:a\b\s*)?(?:jira\b\s*)?(?:tickets?\b)?" + tail,
        r"\bcreate\s+(?:a\b\s*)?tickets?\b" + tail,
    ]
    topic = q_lower
    for pattern in strip_patterns:
        topic = re.sub(pattern, "", topic, flags=re.IGNORECASE).strip()
    topic = topic.strip(" .,!?") or "priority fixes"

    # ── Flush pending buffer so we see the latest messages ────────────────────
    if _buffers.get(group_id):
        batch = _buffers[group_id]
        _buffers[group_id] = []
        timer = _flush_timers.pop(group_id, None)
        if timer is not None:
            timer.cancel()
        try:
            await process_message_batch(group_id, batch)
        except Exception as e:
            logger.warning(f"Pre-ask-jira flush failed for {group_id}: {e}")

    await message.reply_text(
        "🔍 Searching signals to raise...", parse_mode="Markdown"
    )

    # ── Find raiseable signals matching the topic ──────────────────────────────
    already_raised = get_raised_signal_ids(group_id)

    # Semantic search on topic
    semantic_hits = query_signals(group_id, topic, n_results=10)
    candidates = [
        hit
        for hit in semantic_hits
        if hit.get("metadata", {}).get("type") in RAISEABLE_TYPES
        and hit.get("id", "") not in already_raised
    ]

    # Fallback: if semantic search found nothing, use all unraised raiseable signals
    if not candidates:
        candidates = [
            {"id": s["id"], "document": s["document"], "metadata": s["metadata"]}
            for s in get_all_signals(group_id)
            if s.get("metadata", {}).get("type") in RAISEABLE_TYPES
            and s.get("id", "") not in already_raised
        ][:5]

    if not candidates:
        await message.reply_text(
            f"📭 No raiseable signals found for *{topic}* in this group.\n\n"
            "Make sure conversations have been processed first, or use `/jira` to see "
            "all pending signals.",
            parse_mode="Markdown",
        )
        return

    prio_label = f" with *{priority_override}* priority" if priority_override else ""
    await message.reply_text(
        f"🚀 Raising {len(candidates)} ticket(s){prio_label} for: _{topic}_",
        parse_mode="Markdown",
    )

    # ── Raise each ticket ─────────────────────────────────────────────────────
    # Maps a Jira priority name back to the severity value stored on a signal.
    _rev_priority = {
        "Highest": "critical",
        "High": "high",
        "Medium": "medium",
        "Low": "low",
    }
    results = []
    for hit in candidates:
        meta = hit.get("metadata", {})
        sig = {
            "id": hit.get("id", ""),
            "type": meta.get("type", ""),
            "summary": meta.get("summary", ""),
            "severity": meta.get("severity", "medium"),
            "raw_quote": hit.get("document", ""),
            "entities": meta.get("entities", []),
            "keywords": meta.get("keywords", []),
            "timestamp": meta.get("timestamp", ""),
            "group_id": group_id,
            "lens": meta.get("lens", ""),
        }
        if priority_override:
            sig["severity"] = _rev_priority.get(priority_override, "high")

        result = await raise_ticket_for_signal(sig, group_id)
        results.append(
            {
                **result,
                "signal_type": sig["type"],
                "signal_summary": sig["summary"][:80],
            }
        )

    raised = [r for r in results if r.get("ok")]
    # Skips ("already_raised" / "not_raiseable") are not counted as failures.
    failed = [
        r
        for r in results
        if not r.get("ok")
        and r.get("reason") not in ("already_raised", "not_raiseable")
    ]

    lines = [f"🎫 *Jira Tickets Raised* ({len(raised)}/{len(candidates)})\n"]
    lines.extend(format_raise_result_for_telegram(r) for r in results)
    if failed:
        lines.append(f"\n⚠️ {len(failed)} failed — check logs for details")
    if not raised and not failed:
        lines.append("⏭️ All matching signals were already raised or not raiseable.")

    await message.reply_text("\n".join(lines), parse_mode="Markdown")
|
|
|
|
|
|
async def cmd_flush(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Manually flush buffered messages for immediate processing.

    Replies with the number of messages processed and signals extracted, or
    with a note that the buffer was already empty.
    """
    # effective_message is also set for edited-command updates, where
    # update.message is None.
    message = update.effective_message
    if message is None:
        return

    group_id = str(message.chat_id)

    # .get() avoids creating an empty entry in the defaultdict.
    batch = _buffers.get(group_id)
    if not batch:
        await message.reply_text("✅ No messages in buffer (already processed)")
        return

    _buffers[group_id] = []

    # Cancel any pending timer
    timer = _flush_timers.pop(group_id, None)
    if timer is not None:
        timer.cancel()

    await message.reply_text(
        f"⚡ Processing {len(batch)} buffered message(s)..."
    )

    try:
        signals = await process_message_batch(group_id, batch)
        if signals:
            await message.reply_text(
                f"✅ Processed {len(batch)} messages → {len(signals)} signals extracted.\nYou can now `/ask` questions about them.",
                parse_mode="Markdown",
            )
            logger.info(
                f"Manual flush: {len(signals)} signals from {_group_names.get(group_id, group_id)}"
            )
        else:
            await message.reply_text(
                "✅ Messages processed (no significant signals detected)"
            )
    except Exception as e:
        # Keep the traceback in the log; the user only gets a short excerpt.
        logger.exception("Manual flush failed: %s", e)
        await message.reply_text(f"❌ Processing failed: {str(e)[:100]}")
|
|
|
|
|
|
async def cmd_debug(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Show recent signals for debugging.

    Lists the type, timestamp and a shortened summary of the last five
    signals returned by ``get_all_signals`` for this chat.
    """
    from telegram.error import BadRequest

    from backend.db.chroma import get_all_signals

    # effective_message is also set for edited-command updates.
    message = update.effective_message
    if message is None:
        return

    group_id = str(message.chat_id)
    signals = get_all_signals(group_id)

    if not signals:
        await message.reply_text("No signals found for this group yet.")
        return

    # Show last 5 signals
    lines = ["🔍 *Debug: Last 5 Signals*\n"]

    for sig in signals[-5:]:
        meta = sig.get("metadata", {})
        doc = sig.get("document", "")

        sig_type = meta.get("type", "unknown")
        # "or ''" guards a stored None, which would otherwise fail on slicing.
        timestamp = (meta.get("timestamp") or "")[:16]  # Just date+time

        # Extract summary from document
        if ": " in doc:
            summary = doc.split(": ", 1)[1].split(" | Quote:")[0]
        else:
            summary = doc

        lines.append(f"• [{sig_type}] {timestamp}")
        lines.append(f"  {summary[:100]}...\n")

    text = "\n".join(lines)
    try:
        await message.reply_text(text, parse_mode="Markdown")
    except BadRequest:
        # Signal text is free-form; unbalanced Markdown characters in it make
        # Telegram reject the message, so resend it unformatted.
        await message.reply_text(text)
|
|
|
|
|
|
async def cmd_digest(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Generate a summary digest.

    Replies with the number of stored signals per signal type for this chat,
    most frequent type first.
    """
    from collections import Counter

    from backend.db.chroma import get_all_signals

    # effective_message is also set for edited-command updates.
    message = update.effective_message
    if message is None:
        return

    group_id = str(message.chat_id)
    signals = get_all_signals(group_id)
    if not signals:
        await message.reply_text(
            "No intelligence gathered yet. I need more conversation to learn from!"
        )
        return

    # Group by type. Missing metadata or a missing/empty type counts as "other"
    # instead of raising.
    counts = Counter(
        str((s.get("metadata") or {}).get("type") or "other") for s in signals
    )

    parts = [f"📊 *Intelligence Digest* ({len(signals)} total signals)\n"]
    # most_common() orders by count descending; ties keep first-seen order.
    parts.extend(
        f"• {sig_type.replace('_', ' ').title()}: {count} signals"
        for sig_type, count in counts.most_common()
    )

    await message.reply_text("\n".join(parts))
|
|
|
|
|
|
async def cmd_lens(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Set or check the active lens.

    With an argument, sets the group's lens if it is one of the valid modes;
    without one, reports the current lens.
    """
    # effective_message is also set for edited-command updates, where
    # update.message is None.
    message = update.effective_message
    if message is None:
        return

    group_id = str(message.chat_id)

    if not context.args:
        current = get_lens(group_id)
        await message.reply_text(f"🔭 Current lens: {current}")
        return

    mode = context.args[0].lower()
    if mode not in ("dev", "product", "client", "community", "auto"):
        await message.reply_text(
            "Valid lenses: dev, product, client, community, auto"
        )
        return

    set_lens(group_id, mode)
    await message.reply_text(f"🔭 Lens set to: {mode}")
|
|
|
|
|
|
async def cmd_search(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /search [query] — explicit web search.

    Replies with up to three results (title, snippet of at most 200
    characters, URL), or with a short error message if the search fails.
    """
    # effective_message is also set for edited-command updates.
    message = update.effective_message
    if message is None:
        return

    if not ENABLE_WEB_SEARCH:
        await message.reply_text("🔍 Web search is currently disabled.")
        return

    if not context.args:
        await message.reply_text(
            "Usage: /search [your query]\nExample: /search FastAPI rate limiting best practices"
        )
        return

    query = " ".join(context.args)
    await message.reply_text(f"🌐 Searching the web for: {query}...")

    try:
        results = await search_web(query, max_results=3)
        if not results:
            await message.reply_text(
                "No web results found. Try a different query."
            )
            return

        parts = [f"🌐 Web Search: {query}\n"]
        for position, result in enumerate(results, start=1):
            # Tolerate results with a missing field rather than reporting the
            # whole search as failed.
            content = result.get("content") or ""
            title = result.get("title") or ""
            url = result.get("url") or ""
            snippet = content[:200] + "..." if len(content) > 200 else content
            parts.append(f"{position}. {title}\n{snippet}\n🔗 {url}\n")

        await message.reply_text("\n".join(parts))

    except Exception as e:
        # Log with traceback; previously the failure only reached the user.
        logger.exception("Web search failed for %r: %s", query, e)
        await message.reply_text(f"Search failed: {str(e)[:100]}")
|
|
|
|
|
|
async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Welcome message.

    Sends the command overview as plain text (``parse_mode=None``).
    """
    # effective_message is also set for edited-command updates, where
    # update.message is None.
    message = update.effective_message
    if message is None:
        return

    await message.reply_text(
        "👁️ *ThirdEye* — Conversation Intelligence Engine\n\n"
        "I'm now listening to this group and extracting intelligence from your conversations.\n\n"
        "Commands:\n"
        "/ask [question] — Ask about your team's knowledge\n"
        "  ↳ /ask raise jira on [topic] — raise Jira tickets matching a topic\n"
        "  ↳ /ask raise jira on priority fixes — raises as High priority automatically\n"
        "  ↳ /ask assign jira ticket for [topic] to [Name] — raise & assign to a person\n"
        "/search [query] — Search the web for external info\n"
        "/digest — Get an intelligence summary\n"
        # "auto" is listed because /lens accepts it as a valid mode.
        "/lens [mode] — Set detection mode (dev/product/client/community/auto)\n"
        "/alerts — View active warnings\n"
        "/flush — Process buffered messages now\n"
        "/debug — Show last 5 signals\n\n"
        "📄 Share documents (PDF, DOCX, TXT) — I'll ingest them into the knowledge base.\n"
        "🔗 Share links — I'll fetch and store their content.\n\n"
        "🎙️ /meetsum [id] — Summarize a meeting\n"
        "🔍 /meetask [id] [q] — Ask about a meeting\n"
        "🔗 /meetmatch [id] — Match meeting to team chats\n"
        "🎫 /jira — Preview & raise Jira tickets from signals\n"
        "📋 /jirastatus [KEY] — Get ticket status (e.g. ENG-42)\n"
        "🔍 /jirasearch [query] — Search Jira in plain English\n"
        "📜 /jiraraised — See all tickets ThirdEye has raised\n"
        "👁️ /jirawatch [on|off] — Toggle auto-raise mode\n\n"
        "🎤 /voicelog — Audit trail of all voice note decisions\n"
        "🎤 /voicelog @name — Voice notes by a specific person\n"
        "🎤 /voicelog decisions|actions|blockers — Filter by type\n"
        "🎤 /voicelog search [q] — Search voice note content\n\n"
        "I work passively — no need to tag me. I'll alert you when I spot patterns or issues.",
        parse_mode=None,
    )
|
|
|
|
|
|
# ─────────────────────────────────────────────
# Meet Commands
# ─────────────────────────────────────────────
|
|
|
|
|
|
async def cmd_meetsum(update, context):
    """
    /meetsum [meeting_id]
    Summarizes a meeting. Uses the most recent meeting if no ID given.

    If a stored ``meet_summary`` signal exists for the meeting, its text is
    sent; otherwise up to ten decision / action / blocker / risk signals are
    listed instead.
    """
    from telegram.error import BadRequest

    from backend.db.chroma import query_signals
    from backend.config import MEET_DEFAULT_GROUP_ID

    # effective_message is also set for edited-command updates.
    message = update.effective_message
    if message is None:
        return

    args = context.args or []
    group_id = MEET_DEFAULT_GROUP_ID

    if args:
        meeting_id = args[0]
    else:
        # Find the most recent meeting ID from meet_summary signals
        all_summaries = query_signals(
            group_id, "meeting summary", n_results=5, signal_type="meet_summary"
        )
        if not all_summaries:
            await message.reply_text(
                "📭 No meeting summaries found yet. Record a meeting with the ThirdEye Meet extension first."
            )
            return
        # Results come back in relevance order, so choose by timestamp.
        # Assumes metadata "timestamp" values sort chronologically as strings
        # (e.g. ISO 8601) — TODO confirm. Without timestamps max() returns
        # the first hit, which is what the previous code always used.
        latest = max(
            all_summaries,
            key=lambda s: str(s.get("metadata", {}).get("timestamp") or ""),
        )
        meeting_id = latest.get("metadata", {}).get("meeting_id", "unknown")

    if not meeting_id or meeting_id == "unknown":
        await message.reply_text("Usage: /meetsum [meeting_id]")
        return

    # The query below is a similarity search on the id text, so it can return
    # signals of other meetings. Drop hits explicitly tagged with a different
    # meeting_id; hits without the tag are kept, as before.
    hits = query_signals(group_id, meeting_id, n_results=20)
    signals = [
        s
        for s in hits
        if s.get("metadata", {}).get("meeting_id") in (None, "", meeting_id)
    ]

    # Fetch summary signal
    summary_signal = next(
        (s for s in signals if s.get("metadata", {}).get("type") == "meet_summary"),
        None,
    )

    if summary_signal:
        # Prefer the raw_quote metadata field; fall back to the document text.
        summary_text = summary_signal.get("metadata", {}).get(
            "raw_quote", ""
        ) or summary_signal.get("document", "")
        reply = f"📋 *Meeting Summary*\nID: `{meeting_id}`\n\n{summary_text}"
        try:
            await message.reply_text(reply, parse_mode="Markdown")
        except BadRequest:
            # The summary is free-form text; unbalanced Markdown characters
            # make Telegram reject the message, so resend it unformatted.
            await message.reply_text(reply)
        return

    # No summary yet — gather structured signals and summarize on-the-fly
    structured = [
        s
        for s in signals
        if s.get("metadata", {}).get("type")
        in ("meet_decision", "meet_action_item", "meet_blocker", "meet_risk")
    ]

    if not structured:
        await message.reply_text(
            f"📭 No signals found for meeting `{meeting_id}`. "
            "Make sure the meeting has been recorded and processed.",
            parse_mode="Markdown",
        )
        return

    lines = [f"📋 *Meeting Signals* for `{meeting_id}`\n"]
    type_labels = {
        "meet_decision": "✅ Decision",
        "meet_action_item": "📌 Action",
        "meet_blocker": "🚧 Blocker",
        "meet_risk": "⚠️ Risk",
    }
    for s in structured[:10]:
        sig_type = s.get("metadata", {}).get("type", "unknown")
        label = type_labels.get(sig_type, sig_type)
        # Get summary from document text (format: "type: summary | Quote: ...")
        doc = s.get("document", "")
        if ": " in doc:
            summary = doc.split(": ", 1)[1].split(" | Quote:")[0]
        else:
            summary = doc
        lines.append(f"{label}: {summary[:120]}")

    text = "\n".join(lines)
    try:
        await message.reply_text(text, parse_mode="Markdown")
    except BadRequest:
        # Same fallback as above for free-form signal text.
        await message.reply_text(text)
|
|
|
|
|
|
async def cmd_meetask(update, context):
    """
    /meetask [meeting_id] [question]

    Ask a question about a specific meeting's knowledge.

    The first argument is the meeting ID; all remaining arguments are joined
    into the question. The question is prefixed with the meeting ID and passed
    to ``query_knowledge`` for ``MEET_DEFAULT_GROUP_ID``.

    Replies are sent with Markdown formatting. The question and the answer are
    free text, so Telegram may reject the message with ``BadRequest`` (for
    example on an unbalanced ``_`` or ``*``). In that case the same text is
    re-sent without a parse mode so the user still receives the answer.
    """
    from telegram.error import BadRequest

    from backend.pipeline import query_knowledge
    from backend.config import MEET_DEFAULT_GROUP_ID

    async def reply_markdown(text):
        # Formatted first; plain text if Telegram refuses the formatted message.
        try:
            await update.message.reply_text(text, parse_mode="Markdown")
        except BadRequest:
            await update.message.reply_text(text)

    args = context.args or []
    if len(args) < 2:
        await update.message.reply_text(
            "Usage: /meetask [meeting_id] [your question]\n"
            "Example: /meetask abc-defg-hij What did we decide about the database?"
        )
        return

    meeting_id = args[0]
    question = " ".join(args[1:])
    group_id = MEET_DEFAULT_GROUP_ID

    await reply_markdown(f"🔍 Searching meeting `{meeting_id}`...")

    # Prefix the question with the meeting ID so retrieval is scoped to it
    scoped_question = f"From meeting {meeting_id}: {question}"
    answer = await query_knowledge(group_id, scoped_question)

    await reply_markdown(
        f"🎙️ *Meet Q&A* — `{meeting_id}`\n\nQ: _{question}_\n\nA: {answer}"
    )
|
|
|
|
|
|
async def cmd_meetmatch(update, context):
    """
    /meetmatch [meeting_id]

    Find where a meeting's decisions/blockers match or conflict with the team
    chat signals.

    With an argument, that value is used as the meeting ID. Without one, the
    first ``meet_summary`` signal returned by ``query_signals`` for
    ``MEET_DEFAULT_GROUP_ID`` supplies the ID. If that signal carries no usable
    ``meeting_id`` the command stops with a usage hint instead of running the
    comparison for the placeholder value "unknown".
    """
    from backend.agents.meet_cross_ref import (
        find_cross_references,
        format_cross_ref_for_telegram,
    )
    from backend.db.chroma import query_signals
    from backend.config import MEET_DEFAULT_GROUP_ID

    args = context.args or []
    group_id = MEET_DEFAULT_GROUP_ID

    if args:
        meeting_id = args[0]
    else:
        # Find most recent meeting
        all_summaries = query_signals(
            group_id, "meeting", n_results=5, signal_type="meet_summary"
        )
        if not all_summaries:
            await update.message.reply_text(
                "📭 No meetings found. Record a meeting first with the ThirdEye Meet extension."
            )
            return
        meeting_id = all_summaries[0].get("metadata", {}).get("meeting_id", "unknown")
        # Same guard as /meetsum: a summary without a meeting_id cannot be
        # cross-referenced, so ask for an explicit ID instead.
        if not meeting_id or meeting_id == "unknown":
            await update.message.reply_text("Usage: /meetmatch [meeting_id]")
            return

    await update.message.reply_text(
        f"🔗 Comparing meeting `{meeting_id}` against your team chats...\n"
        "_This may take a moment._",
        parse_mode="Markdown",
    )

    analysis = await find_cross_references(meeting_id, group_id=group_id)
    formatted = format_cross_ref_for_telegram(analysis, meeting_id)
    await update.message.reply_text(formatted, parse_mode="Markdown")
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
# Jira Commands — add to existing commands.py
|
|
# ─────────────────────────────────────────────
|
|
|
|
|
|
async def cmd_jira(update, context):
    """
    /jira

    Preview or raise this group's unraised high-severity signals as Jira tickets.

    Without 'confirm' the command only lists what would be raised. With
    'confirm' it hands the candidates to ``bulk_raise_for_group``. Any other
    argument that contains an underscore is treated as a signal-type filter and
    applies to both modes.

    Usage:
    /jira — preview unraised signals
    /jira confirm — raise all unraised high+ severity signals now
    /jira [signal_type] — preview only signals of that type (e.g. /jira recurring_bug)
    /jira confirm [signal_type] — raise only signals of that type
    """
    from backend.db.chroma import get_all_signals, get_raised_signal_ids
    from backend.agents.jira_agent import (
        bulk_raise_for_group,
        RAISEABLE_TYPES,
        format_raise_result_for_telegram,
    )
    from backend.integrations.jira_client import is_configured
    from backend.config import JIRA_DEFAULT_PROJECT

    max_tickets = 10  # per-run cap handed to bulk_raise_for_group

    chat_id = str(update.effective_chat.id)
    args = context.args or []
    confirm = any(a.lower() == "confirm" for a in args)
    type_filter = next(
        (a for a in args if a.lower() != "confirm" and "_" in a), None
    )
    # The exact command that raises what the user is currently looking at;
    # it must carry the type filter, otherwise "confirm" would raise every type.
    confirm_cmd = f"/jira confirm {type_filter}" if type_filter else "/jira confirm"

    if not is_configured():
        # parse_mode is required for the backticks to render as code
        await update.message.reply_text(
            "⚙️ Jira is not configured. Add `JIRA_BASE_URL`, `JIRA_EMAIL`, and `JIRA_API_TOKEN` to your .env and restart.",
            parse_mode="Markdown",
        )
        return

    await update.message.reply_text("🔍 Scanning signals...", parse_mode="Markdown")

    all_signals = get_all_signals(chat_id)
    already_raised = get_raised_signal_ids(chat_id)
    severity_rank = {"low": 0, "medium": 1, "high": 2, "critical": 3}

    candidates = [
        s
        for s in all_signals
        if s.get("type") in RAISEABLE_TYPES
        and s.get("id", "") not in already_raised
        and severity_rank.get(s.get("severity", "low"), 0) >= 2  # high+
        and (not type_filter or s.get("type") == type_filter)
    ]

    if not candidates:
        await update.message.reply_text(
            "✅ Nothing to raise — no unraised high-severity signals found in this group.\n"
            "Use `/ask` to query existing knowledge or wait for new signals.",
            parse_mode="Markdown",
        )
        return

    total = len(candidates)

    if not confirm:
        # Preview mode — show what would be raised
        lines = [
            f"📋 *{total} signal(s) ready to raise in `{JIRA_DEFAULT_PROJECT}`*\n"
        ]
        for s in candidates[:8]:
            summary = s.get("summary") or ""  # tolerate a stored None
            lines.append(
                f"• [{s.get('severity','?').upper()}] `{s.get('type','?')}` — "
                f"_{summary[:80]}_"
            )
        if total > 8:
            lines.append(f"_...and {total - 8} more_")
        lines.append(f"\nSend `{confirm_cmd}` to raise all of these as Jira tickets.")
        await update.message.reply_text("\n".join(lines), parse_mode="Markdown")
        return

    # Confirm mode — raise the tickets
    if total > max_tickets:
        announce = (
            f"🚀 Raising up to {max_tickets} of {total} ticket(s) "
            f"in `{JIRA_DEFAULT_PROJECT}`..."
        )
    else:
        announce = f"🚀 Raising {total} ticket(s) in `{JIRA_DEFAULT_PROJECT}`..."
    await update.message.reply_text(announce, parse_mode="Markdown")

    results = await bulk_raise_for_group(
        group_id=chat_id,
        signals=candidates,
        min_severity="high",
        max_tickets=max_tickets,
    )

    raised = [r for r in results if r.get("ok")]
    # Skips that are expected (already raised / not raiseable) are not failures
    failed = [
        r
        for r in results
        if not r.get("ok")
        and r.get("reason") not in ("already_raised", "not_raiseable")
    ]

    lines = [f"🎫 *Jira Tickets Raised* ({len(raised)}/{total})\n"]
    lines.extend(format_raise_result_for_telegram(r) for r in raised)
    if failed:
        lines.append(f"\n⚠️ {len(failed)} failed — check logs")
    if total > max_tickets:
        lines.append(
            f"\nℹ️ At most {max_tickets} tickets are raised per run — "
            f"send `{confirm_cmd}` again for the rest."
        )

    await update.message.reply_text("\n".join(lines), parse_mode="Markdown")
|
|
|
|
|
|
async def cmd_jirastatus(update, context):
    """
    /jirastatus [TICKET-KEY]

    Get the current status of a specific Jira ticket.

    The key is upper-cased and looked up with ``get_issue``. A failure while
    fetching or reading the issue is reported as plain text, because exception
    text is arbitrary and may not be valid Markdown. The success message is
    sent as Markdown and re-sent as plain text if Telegram rejects the
    formatting (``BadRequest``), e.g. when the ticket summary contains a stray
    ``_``.

    Usage:
    /jirastatus ENG-42
    """
    from telegram.error import BadRequest

    from backend.integrations.jira_client import get_issue, is_configured

    if not is_configured():
        await update.message.reply_text("⚙️ Jira not configured.")
        return

    args = context.args or []
    if not args:
        await update.message.reply_text(
            "Usage: `/jirastatus TICKET-KEY`\nExample: `/jirastatus ENG-42`",
            parse_mode="Markdown",
        )
        return

    issue_key = args[0].upper()
    try:
        issue = await get_issue(issue_key)
        status_emoji = {
            "To Do": "🔵",
            "In Progress": "🟡",
            "Done": "✅",
            "Blocked": "🔴",
        }.get(issue["status"], "⚪")
        msg = (
            f"🎫 *{issue['key']}*\n"
            f"{status_emoji} Status: *{issue['status']}*\n"
            f"📌 Type: {issue['issue_type']} | ⚡ Priority: {issue['priority']}\n"
            f"👤 Assignee: {issue['assignee']}\n"
            f"📝 {issue['summary']}\n\n"
            f"🔗 [Open in Jira]({issue['url']})"
        )
    except Exception as e:
        # Boundary for any lookup failure; sent without parse_mode so the
        # error report cannot itself fail on Markdown parsing.
        await update.message.reply_text(f"❌ Could not fetch {issue_key}: {e}")
        return

    # Sending is kept outside the try above so a delivery problem is not
    # misreported as "Could not fetch".
    try:
        await update.message.reply_text(msg, parse_mode="Markdown")
    except BadRequest:
        await update.message.reply_text(msg)
|
|
|
|
|
|
def _clean_llm_jql(raw):
    """Remove wrappers a model may put around a JQL string, leaving the JQL intact."""
    jql = raw.strip()
    if jql.startswith("```"):
        # Fenced block: drop the fence and an optional one-word language tag line
        body = jql.strip("`")
        tag, newline, rest = body.partition("\n")
        jql = rest if newline and " " not in tag.strip() else body
    jql = jql.strip("`").strip()
    for quote in ('"', "'"):
        wrapped = len(jql) >= 2 and jql[0] == quote and jql[-1] == quote
        # Unwrap only a true surrounding pair. A blind strip() would eat the
        # closing quote of valid JQL such as: assignee = "Alex"
        if wrapped and quote not in jql[1:-1]:
            jql = jql[1:-1].strip()
    return jql


def _fallback_text_jql(project, query):
    """Build a plain text-search JQL, escaping the user's text for a quoted JQL string."""
    escaped = query.replace("\\", "\\\\").replace('"', '\\"')
    return f'project = {project} AND text ~ "{escaped}" ORDER BY created DESC'


async def cmd_jirasearch(update, context):
    """
    /jirasearch [query]

    Search Jira using natural language.

    The query is sent to ``call_llm`` with a prompt asking for a JQL string.
    If that call fails or yields nothing, a text-search JQL over
    ``JIRA_DEFAULT_PROJECT`` is used instead. Up to 8 results from
    ``search_issues`` are listed.

    Markdown replies that contain free text (the query, ticket summaries) are
    re-sent as plain text if Telegram rejects the formatting (``BadRequest``).

    Usage:
    /jirasearch open bugs assigned to Alex
    /jirasearch all thirdeye tickets from this sprint
    """
    from telegram.error import BadRequest

    from backend.integrations.jira_client import search_issues, is_configured
    from backend.providers import call_llm
    from backend.config import JIRA_DEFAULT_PROJECT

    async def reply_markdown(text):
        # Formatted first; plain text if Telegram refuses the formatted message.
        try:
            await update.message.reply_text(text, parse_mode="Markdown")
        except BadRequest:
            await update.message.reply_text(text)

    if not is_configured():
        await update.message.reply_text("⚙️ Jira not configured.")
        return

    args = context.args or []
    if not args:
        await update.message.reply_text(
            "Usage: `/jirasearch [natural language query]`\n"
            "Example: `/jirasearch open bugs assigned to Alex`",
            parse_mode="Markdown",
        )
        return

    query = " ".join(args)
    await reply_markdown(f"🔍 Searching Jira for: _{query}_...")

    # Convert natural language → JQL via LLM
    try:
        jql_result = await call_llm(
            task_type="fast_small",
            messages=[
                {
                    "role": "system",
                    "content": (
                        f"Convert the user's natural language query into a valid Jira JQL query. "
                        f"Default project is '{JIRA_DEFAULT_PROJECT}'. "
                        "Return ONLY the JQL string — no explanation, no quotes around it, no markdown."
                    ),
                },
                {"role": "user", "content": query},
            ],
            temperature=0.0,
            max_tokens=150,
        )
        jql = _clean_llm_jql(jql_result["content"])
        if not jql:
            raise ValueError("LLM returned an empty JQL string")
    except Exception as exc:
        # Best-effort: any provider/parsing problem falls back to text search,
        # but is logged so it does not go unnoticed.
        logger.warning(f"JQL generation failed, using text-search fallback: {exc}")
        jql = _fallback_text_jql(JIRA_DEFAULT_PROJECT, query)

    try:
        results = await search_issues(jql, max_results=8)
    except Exception as e:
        await reply_markdown(f"❌ JQL search failed.\nJQL used: `{jql}`\nError: {e}")
        return

    if not results:
        await reply_markdown(f"📭 No results found.\nJQL: `{jql}`")
        return

    # Built once; same mapping as /jirastatus (including "Blocked")
    status_emoji = {
        "To Do": "🔵",
        "In Progress": "🟡",
        "Done": "✅",
        "Blocked": "🔴",
    }
    lines = [f"🔍 *Jira Search Results* ({len(results)} found)\nQuery: _{query}_\n"]
    for r in results:
        emoji = status_emoji.get(r["status"], "⚪")
        lines.append(
            f"{emoji} [{r['key']}]({r['url']}) — _{r['summary'][:70]}_\n"
            f"   {r['status']} | {r['priority']} | {r['assignee']}"
        )
    await reply_markdown("\n".join(lines))
|
|
|
|
|
|
async def cmd_jiraraised(update, context):
    """
    /jiraraised

    Show tickets ThirdEye has previously raised for this group.

    Queries up to 20 ``jira_raised`` signals for the current chat and lists the
    first 10. The ticket key is read from the first element of each signal's
    ``entities`` metadata, which may be a JSON string or an already-decoded
    value. Entries without a usable key are listed without a link rather than
    linking to a non-existent ticket.
    """
    import json

    from backend.db.chroma import query_signals
    from backend.integrations.jira_client import is_configured
    from backend.config import JIRA_BASE_URL

    if not is_configured():
        await update.message.reply_text("⚙️ Jira not configured.")
        return

    chat_id = str(update.effective_chat.id)
    raised_signals = query_signals(
        chat_id, "jira raised ticket", n_results=20, signal_type="jira_raised"
    )

    if not raised_signals:
        await update.message.reply_text(
            "📭 No tickets raised yet for this group. Use `/jira confirm` to raise tickets.",
            parse_mode="Markdown",
        )
        return

    lines = [f"🎫 *Tickets Raised by ThirdEye* ({len(raised_signals)} total)\n"]
    for sig in raised_signals[:10]:
        meta = sig.get("metadata", {})
        # entities is stored as a JSON string in ChromaDB metadata
        entities_raw = meta.get("entities", "[]")
        try:
            entities = (
                json.loads(entities_raw)
                if isinstance(entities_raw, str)
                else (entities_raw or [])
            )
        except ValueError:  # malformed JSON (json.JSONDecodeError is a ValueError)
            entities = []
        # Only a non-empty sequence can supply the key; a decoded dict or
        # scalar would otherwise fail on [0].
        has_key = isinstance(entities, (list, tuple)) and bool(entities)
        jira_key = entities[0] if has_key else None

        label = meta.get("summary", "") or sig.get("document", "")
        if jira_key:
            url = f"{JIRA_BASE_URL}/browse/{jira_key}"
            lines.append(f"• [{jira_key}]({url}) — _{label[:80]}_")
        else:
            # No key recorded: show the entry without a dead /browse/Unknown link
            lines.append(f"• Unknown — _{label[:80]}_")

    if len(raised_signals) > 10:
        lines.append(f"_...and {len(raised_signals) - 10} more_")

    await update.message.reply_text("\n".join(lines), parse_mode="Markdown")
|
|
|
|
|
|
async def cmd_jirawatch(update, context):
    """
    /jirawatch [on|off]

    Enable, disable, or show auto-raise mode for this group.

    The flag is stored in ``context.bot_data`` under ``jirawatch_<chat_id>``
    (in-memory; persists until restart). This handler only records and reports
    the flag; acting on it is presumably done by the signal pipeline elsewhere
    — verify against the code that reads this key.

    Usage:
    /jirawatch on
    /jirawatch off
    /jirawatch — show current status
    """
    args = context.args or []
    chat_id = str(update.effective_chat.id)

    # bot_data is always provided by the framework's callback context and cannot
    # be reassigned, so it is used directly without a hasattr/assignment guard.
    watch_key = f"jirawatch_{chat_id}"

    if not args:
        current = context.bot_data.get(watch_key, False)
        status = "🟢 ON" if current else "🔴 OFF"
        await update.message.reply_text(
            f"👁️ *Jira Auto-Raise* — {status}\n\n"
            "When ON, ThirdEye automatically raises Jira tickets for any high or critical "
            "severity signal detected in this group — no `/jira confirm` needed.\n\n"
            "Use `/jirawatch on` or `/jirawatch off` to toggle.",
            parse_mode="Markdown",
        )
        return

    mode = args[0].lower()
    if mode == "on":
        context.bot_data[watch_key] = True
        await update.message.reply_text(
            "🟢 *Jira Auto-Raise: ON*\n"
            "Any new `high` or `critical` signal detected in this group will automatically "
            "be raised as a Jira ticket. You'll be notified here.\n\n"
            "Use `/jirawatch off` to disable.",
            parse_mode="Markdown",
        )
    elif mode == "off":
        context.bot_data[watch_key] = False
        await update.message.reply_text(
            "🔴 *Jira Auto-Raise: OFF*\n"
            "Use `/jira confirm` to raise tickets manually.",
            parse_mode="Markdown",
        )
    else:
        # Unrecognised argument: leave the stored flag untouched
        await update.message.reply_text(
            "Usage: `/jirawatch on` or `/jirawatch off`", parse_mode="Markdown"
        )
|
|
|
|
|
|
# -----------------------------------------------------------------
|
|
# Voice Message Handlers — add to commands.py
|
|
# -----------------------------------------------------------------
|
|
|
|
|
|
async def handle_voice_telegram(update, context):
    """
    Handler for voice messages.

    Does nothing when ``ENABLE_VOICE_TRANSCRIPTION`` is off or the update has
    no voice payload. Otherwise it reacts with 👂, delegates the work to
    ``handle_voice_message`` and, based on the returned dict, either replies
    with a transcript preview, reports that the note was too long, stays
    silent, or logs a warning.
    """
    from backend.agents.voice_handler import handle_voice_message
    from backend.config import ENABLE_VOICE_TRANSCRIPTION

    if not ENABLE_VOICE_TRANSCRIPTION:
        return

    msg = update.message
    if not msg or not msg.voice:
        return

    voice = msg.voice
    author = msg.from_user
    group_id = str(msg.chat_id)
    sender = author.full_name or author.username or "Unknown"
    duration = voice.duration or 0

    # Acknowledge right away so the team sees ThirdEye is processing
    try:
        await msg.set_reaction("👂")
    except Exception:
        pass  # Reactions need bot admin rights — fail silently

    result = await handle_voice_message(
        bot=context.bot,
        group_id=group_id,
        sender=sender,
        file_id=voice.file_id,
        duration_seconds=duration,
        message_date=msg.date,
        is_video_note=False,
    )

    if result["ok"]:
        signals = result["signals_extracted"]
        transcript = result["transcript"]
        ellipsis = "..." if len(transcript) > 120 else ""
        plural = "s" if signals != 1 else ""
        reply = (
            f"🎤 *{sender}* ({duration}s) — transcribed\n"
            f"_{transcript[:120]}{ellipsis}_\n"
            f"`{signals} signal{plural} extracted`"
        )
        await msg.reply_text(reply, parse_mode="Markdown")
        return

    reason = result["reason"]
    if reason == "too_long":
        await msg.reply_text(
            f"⏭️ Voice note from *{sender}* skipped — too long ({duration}s).",
            parse_mode="Markdown",
        )
        return

    # These outcomes are skipped silently; anything else is logged
    if reason not in ("no_speech", "too_short", "empty"):
        logger.warning(f"Voice error for {sender}: {result.get('error')}")
|
|
|
|
|
|
async def handle_video_note_telegram(update, context):
    """
    Handler for round video messages (video notes).

    Uses the same ``handle_voice_message`` call as the voice handler, with
    ``is_video_note=True``. Does nothing when ``ENABLE_VOICE_TRANSCRIPTION`` is
    off or the update has no video note.

    Outcomes are handled the same way as for voice messages: a transcript
    preview on success, a notice when the note was too long, silence for
    "no_speech"/"too_short"/"empty", and a logged warning for anything else,
    so failures are no longer dropped without a trace.
    """
    from backend.agents.voice_handler import handle_voice_message
    from backend.config import ENABLE_VOICE_TRANSCRIPTION

    if not ENABLE_VOICE_TRANSCRIPTION:
        return

    msg = update.message
    if not msg or not msg.video_note:
        return

    group_id = str(msg.chat_id)
    sender = msg.from_user.full_name or msg.from_user.username or "Unknown"
    video_note = msg.video_note
    duration = video_note.duration or 0

    try:
        await msg.set_reaction("👂")
    except Exception:
        pass  # Best-effort acknowledgement; the handler proceeds without it

    result = await handle_voice_message(
        bot=context.bot,
        group_id=group_id,
        sender=sender,
        file_id=video_note.file_id,
        duration_seconds=duration,
        message_date=msg.date,
        is_video_note=True,
    )

    if result["ok"]:
        signals = result["signals_extracted"]
        reply = (
            f"📹 *{sender}* ({duration}s video note) — transcribed\n"
            f"_{result['transcript'][:120]}{'...' if len(result['transcript']) > 120 else ''}_\n"
            f"`{signals} signal{'s' if signals != 1 else ''} extracted`"
        )
        await msg.reply_text(reply, parse_mode="Markdown")
        return

    # .get(): a failure result without a "reason" falls through to the warning
    reason = result.get("reason")
    if reason == "too_long":
        await msg.reply_text(
            f"⏭️ Video note from *{sender}* skipped — too long ({duration}s).",
            parse_mode="Markdown",
        )
    elif reason in ("no_speech", "too_short", "empty"):
        pass  # Silent skip
    else:
        logger.warning(f"Video note error for {sender}: {result.get('error')}")
|
|
|
|
|
|
def run_bot():
    """Build the Telegram application, register every handler, and start polling."""
    from telegram.request import HTTPXRequest

    # Longer timeouts for slow networks or heavy processing
    http_request = HTTPXRequest(
        connection_pool_size=8,
        connect_timeout=20.0,
        read_timeout=30.0,
        write_timeout=30.0,
        pool_timeout=10.0,
    )
    app = Application.builder().token(TELEGRAM_BOT_TOKEN).request(http_request).build()

    # Handlers are added in exactly this order
    primary_handlers = [
        CommandHandler("start", cmd_start),
        CommandHandler("ask", cmd_ask),
        CommandHandler("digest", cmd_digest),
        CommandHandler("lens", cmd_lens),
        CommandHandler("flush", cmd_flush),
        CommandHandler("debug", cmd_debug),
        MessageHandler(filters.Document.ALL, handle_document),
        CommandHandler("search", cmd_search),
        CommandHandler("meetsum", cmd_meetsum),
        CommandHandler("meetask", cmd_meetask),
        CommandHandler("meetmatch", cmd_meetmatch),
        CommandHandler("jira", cmd_jira),
        CommandHandler("jirastatus", cmd_jirastatus),
        CommandHandler("jirasearch", cmd_jirasearch),
        CommandHandler("jiraraised", cmd_jiraraised),
        CommandHandler("jirawatch", cmd_jirawatch),
    ]
    for handler in primary_handlers:
        app.add_handler(handler)

    from backend.bot.commands import cmd_voicelog

    media_and_text_handlers = [
        CommandHandler("voicelog", cmd_voicelog),
        MessageHandler(filters.VOICE & ~filters.COMMAND, handle_voice_telegram),
        MessageHandler(
            filters.VIDEO_NOTE & ~filters.COMMAND, handle_video_note_telegram
        ),
        MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message),
    ]
    for handler in media_and_text_handlers:
        app.add_handler(handler)

    logger.info("ThirdEye bot starting...")
    app.run_polling(allowed_updates=Update.ALL_TYPES)
|
|
|
|
|
|
# Script entry point: start the bot only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    run_bot()
|