mirror of
https://github.com/arkorty/B.Tech-Project-III.git
synced 2026-04-19 12:41:48 +00:00
289 lines
12 KiB
Python
289 lines
12 KiB
Python
"""Core pipeline: message batch → signals → classified → stored → queryable."""
|
|
import asyncio
|
|
import logging
|
|
from backend.agents.signal_extractor import extract_signals
|
|
from backend.agents.classifier import classify_signal
|
|
from backend.agents.context_detector import detect_context
|
|
from backend.db.chroma import store_signals, query_signals
|
|
from backend.db.models import Signal
|
|
|
|
logger = logging.getLogger("thirdeye.pipeline")
|
|
|
|
# In-memory group config store, keyed by group_id → {"lens": str, "confidence": float}.
# (Replace with Redis/DB for production — contents are lost on process restart.)
_group_configs: dict[str, dict] = {}
|
|
|
|
|
|
async def detect_and_set_lens(group_id: str, messages_text: str) -> str:
    """Auto-detect the lens for a group from its initial messages and cache it.

    Runs the context-detector LLM agent over the formatted message text,
    records the detected lens + confidence in the in-memory group config,
    and returns the detected lens string.
    """
    detection = await detect_context(messages_text)
    lens = detection["detected_lens"]
    confidence = detection["confidence"]
    _group_configs[group_id] = {"lens": lens, "confidence": confidence}
    logger.info(f"Group {group_id}: lens={lens} (conf={confidence})")
    return lens
|
|
|
|
|
|
def get_lens(group_id: str) -> str:
    """Return the lens currently configured for a group, defaulting to "dev"."""
    # Chained lookups: unknown group → {}, config without "lens" → "dev".
    return _group_configs.get(group_id, {}).get("lens", "dev")
|
|
|
|
|
|
def set_lens(group_id: str, lens: str):
    """Manually pin the lens for a group (full confidence, overrides auto-detect)."""
    _group_configs[group_id] = dict(lens=lens, confidence=1.0)
|
|
|
|
async def _auto_raise_and_notify(group_id: str, signals: list[dict]):
    """
    Background task: raise Jira tickets for critical signals and log results.

    Called automatically when JIRA_AUTO_RAISE=true in .env.
    Does NOT send Telegram messages (no bot context here) — check logs or /jiraraised.

    Args:
        group_id: Telegram group ID the signals came from.
        signals: Signal dicts to consider for ticket creation.
    """
    # Fix: the original re-imported `logging` (already imported at module top),
    # shadowed the module-level `logger`, and built a *second* logger object in
    # the except clause. Create the dedicated child logger once and reuse it.
    task_logger = logging.getLogger("thirdeye.pipeline.auto_raise")
    try:
        # Imported lazily so the pipeline module loads even when the Jira
        # integration is absent/disabled.
        from backend.agents.jira_agent import bulk_raise_for_group

        results = await bulk_raise_for_group(
            group_id=group_id,
            signals=signals,
            min_severity="high",
            max_tickets=5,  # cap per batch to avoid ticket storms
        )
        raised = [r for r in results if r.get("ok")]
        if raised:
            task_logger.info(
                f"[Auto-raise] Group {group_id}: {len(raised)} ticket(s) raised — "
                + ", ".join(r.get("key", "?") for r in raised)
            )
    except Exception as e:
        # Best-effort background task: never propagate into the event loop.
        task_logger.error(f"Auto-raise failed: {e}")
|
|
|
|
|
|
async def process_message_batch(group_id: str, messages: list[dict]) -> list[Signal]:
    """
    Process a batch of messages through the full pipeline.

    Pipeline: format → (auto-detect lens on first contact) → extract signals →
    classify → store in ChromaDB → optionally auto-raise Jira tickets.

    Args:
        group_id: Telegram group ID
        messages: List of {"sender": str, "text": str, "timestamp": str}

    Returns:
        List of stored Signal objects
    """
    # Format messages for the LLM as "[sender]: text" lines.
    formatted = "\n".join(f"[{m['sender']}]: {m['text']}" for m in messages)

    # Get or detect lens. "dev" is both the default and a legitimate detected
    # value, so the config-presence check distinguishes "never seen" from
    # "detected as dev".
    lens = get_lens(group_id)
    if lens == "dev" and group_id not in _group_configs:
        # First time seeing this group — auto-detect
        lens = await detect_and_set_lens(group_id, formatted)

    # Step 1: Extract signals
    signals = await extract_signals(formatted, group_id, lens=lens)

    if not signals:
        logger.info(f"No signals extracted from batch in {group_id}")
        return []

    # Step 2: Classify each signal (parallel for speed)
    classified_signals = await asyncio.gather(*[classify_signal(s) for s in signals])

    # Step 3: Store in ChromaDB
    signal_dicts = [s.model_dump() for s in classified_signals]
    store_signals(group_id, signal_dicts)

    # Step 4: Auto-raise Jira tickets for critical signals.
    # Imported lazily so config flags are read at call time, not import time.
    from backend.config import JIRA_AUTO_RAISE, ENABLE_JIRA

    if ENABLE_JIRA and JIRA_AUTO_RAISE and signal_dicts:
        # Fix: filter on the *classified* signals — severity is assigned by
        # classify_signal, so the raw extractor dicts may not carry it and the
        # original filter could miss every critical signal. (Also dropped a
        # dead `bulk_raise_for_group` import and a leftover paste-instruction
        # comment here.)
        critical_signals = [
            s for s in signal_dicts
            if s.get("severity", "low") in ("high", "critical")
        ]
        if critical_signals:
            # Fire-and-forget background task. NOTE(review): asyncio only keeps
            # a weak reference to tasks — hold a reference if tasks are ever
            # observed disappearing mid-flight.
            asyncio.create_task(_auto_raise_and_notify(group_id, critical_signals))

    logger.info(f"Pipeline complete: {len(classified_signals)} signals stored for {group_id}")
    return classified_signals
|
|
|
|
|
|
async def query_knowledge(group_id: str, question: str, force_web_search: bool = False) -> str:
    """
    Query the knowledge base with natural language, with cross-group fallback and
    conservative web search (only when all internal sources fail).

    Flow:
    1. Search this group's knowledge base (ChromaDB)
    2. If results are weak, also search all other groups (cross-group fallback)
    3. Only hit the web if no internal knowledge is found AND question is clearly external
    4. LLM synthesizes the best available context into a final answer

    Args:
        group_id: Telegram group ID whose collection is searched first.
        question: Natural-language question from the user.
        force_web_search: When True, always run the web search step.

    Returns:
        Plain-text answer string (or a canned "no information" / error message).
    """
    # Lazy imports keep module load light and avoid import cycles with agents.
    from backend.providers import call_llm
    from backend.agents.web_search import search_web, format_search_results_for_llm
    from backend.config import ENABLE_WEB_SEARCH
    from backend.db.chroma import query_signals_global

    # ── Step 1: search this group's own collection ──────────────────────────────
    results = query_signals(group_id, question, n_results=8)

    # A result is "strong" when the top hit has high semantic similarity (≥ 0.40)
    STRONG_THRESHOLD = 0.40
    has_strong_local = bool(results) and results[0].get("relevance_score", 0) >= STRONG_THRESHOLD

    # ── Step 2: cross-group fallback ────────────────────────────────────────────
    # Only consulted when the local top hit is weak; own group is excluded.
    cross_results = []
    if not has_strong_local:
        cross_results = query_signals_global(question, n_results=8, exclude_group_id=group_id)

    # Combine: local results first, then cross-group ones (de-duplicated by document text)
    seen_docs = {r["document"] for r in results}
    for cr in cross_results:
        if cr["document"] not in seen_docs:
            results.append(cr)
            seen_docs.add(cr["document"])

    # ── Recency re-ranking ───────────────────────────────────────────────────────
    # Boost signals that are recent so a fresh update beats an older one on the same topic.
    # Boost formula: +0.4 for brand-new, decays to ~0 after 7 days.
    from datetime import datetime, timezone
    import math
    now = datetime.now(timezone.utc)

    def _recency_boost(ts_str: str) -> float:
        # Returns an additive score boost in [0, 0.4] based on signal age.
        try:
            ts = datetime.fromisoformat(ts_str)
            if ts.tzinfo is None:
                # Naive timestamps are assumed UTC so subtraction is valid.
                ts = ts.replace(tzinfo=timezone.utc)
            age_hours = max(0, (now - ts).total_seconds() / 3600)
            return 0.4 * math.exp(-age_hours / 48)  # half-life ≈ 33 hours
        except Exception:
            # Missing/unparseable timestamp → no boost rather than a crash.
            return 0.0

    for r in results:
        ts = r.get("metadata", {}).get("timestamp", "")
        r["_ranked_score"] = r.get("relevance_score", 0) + _recency_boost(ts)

    results.sort(key=lambda x: x["_ranked_score"], reverse=True)

    # Re-evaluate strength after combining and re-ranking.
    # NOTE(review): despite the name, this still checks raw relevance_score
    # (not _ranked_score) of the new top hit against STRONG_THRESHOLD — so a
    # purely recency-boosted top result does not count as "internal knowledge".
    # Confirm this is intentional.
    has_any_internal = bool(results) and results[0].get("relevance_score", 0) >= STRONG_THRESHOLD

    # ── Build internal context ───────────────────────────────────────────────────
    # Results are already sorted by (relevance + recency). The first result is the
    # best match. We label it explicitly so even small fallback models can't miss it.
    from backend.agents.query_agent import _format_signal_for_context, VOICE_CITATION_INSTRUCTION

    internal_context = ""
    has_voice_signals = False
    if results:
        context_parts = []
        for i, r in enumerate(results):
            meta = r["metadata"]
            # NOTE(review): source_group is read but never used below — dead
            # variable, candidate for removal or for labeling cross-group hits.
            source_group = r.get("source_group_id")

            # Voice-derived signals trigger the extra citation instruction later.
            if meta.get("source") == "voice" or meta.get("type") == "voice_transcript":
                has_voice_signals = True

            # Rich source label using voice-aware formatter
            signal_label = _format_signal_for_context(r)

            # Rank 0 gets the loud BEST MATCH banner; the rest are supporting.
            rank_header = (
                "*** BEST MATCH (use this as your primary answer) ***\n"
                if i == 0 else
                f"(supporting context {i+1})\n"
            )
            context_parts.append(
                f"{rank_header}"
                f"{signal_label}\n"
                f"Content: {r['document']}\n"
                f"Entities: {meta.get('entities', '[]')}"
            )
        internal_context = "\n\n---\n\n".join(context_parts)

    # ── Step 3: web search — only when all internal sources fail ────────────────
    # Only keywords that are clearly external / internet-specific trigger web search.
    # Intentionally excludes personal/team words like "update", "current", "what is".
    web_keywords = [
        "latest news", "industry standard", "best practice", "benchmark",
        "security vulnerability", "cve", "public release", "changelog",
        "documentation for", "how to install", "npm package", "pypi",
    ]
    question_lower = question.lower()
    wants_external = any(kw in question_lower for kw in web_keywords)

    # Web search fires only when: explicitly forced, OR no internal knowledge at all
    # AND the question looks like it's asking about something external.
    should_search_web = ENABLE_WEB_SEARCH and (
        force_web_search
        or (not has_any_internal and wants_external)
    )

    web_context = ""
    used_web = False
    if should_search_web:
        web_results = await search_web(question, max_results=3)
        if web_results:
            web_context = format_search_results_for_llm(web_results)
            used_web = True

    # ── Step 4: build combined prompt ───────────────────────────────────────────
    # Nothing internal and nothing from the web → honest "don't know" reply.
    if not internal_context and not web_context:
        return (
            "I don't have any information about that yet across all team chats. "
            "The relevant group may need more conversation, or try /search for external info."
        )

    combined_context = ""
    if internal_context:
        combined_context += (
            "=== INTERNAL KNOWLEDGE BASE (from team conversations & documents) ===\n\n"
            f"{internal_context}\n\n"
        )
    if web_context:
        combined_context += f"=== WEB SEARCH RESULTS ===\n\n{web_context}\n\n"

    system_prompt = """You are the Query Agent for ThirdEye. Answer the question using the context below.

The context is sorted: the BEST MATCH signal appears first and is your primary source.
Older or supporting signals appear after it — they may be outdated, so prefer the BEST MATCH.

RULES:
- Answer from the BEST MATCH signal first. Only use other signals as supporting context.
- Quote exact numbers, dates, and durations directly — never paraphrase them.
- If a signal has a "Quote:" field, that is the verbatim team message — treat it as ground truth.
- Signals from "other group" are still internal team knowledge.
- Be concise (2-3 sentences). Plain text only, no markdown headers."""

    if has_voice_signals:
        system_prompt += VOICE_CITATION_INSTRUCTION

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"Context:\n\n{combined_context}\n\nQuestion: {question}"},
    ]

    try:
        result = await call_llm("fast_large", messages, temperature=0.3, max_tokens=600)
        answer = result["content"]

        # Append a source attribution footer so users know where the answer came from.
        sources = []
        if internal_context:
            sources.append("knowledge base")
        if used_web:
            sources.append("web search")
        answer += f"\n\n📌 Sources: {' + '.join(sources)}"

        return answer
    except Exception as e:
        # Degrade gracefully: the caller gets a user-facing message, never a traceback.
        logger.error(f"Query agent failed: {e}")
        return "Sorry, I encountered an error while searching. Please try again."
|
|
|
|
|