mirror of https://github.com/arkorty/B.Tech-Project-III.git
synced 2026-04-19 12:41:48 +00:00
786 lines · 29 KiB · Python
"""FastAPI routes for the ThirdEye dashboard."""
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from fastapi import FastAPI, HTTPException, Request, BackgroundTasks
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from backend.db.chroma import get_all_signals, query_signals, get_group_ids, get_group_names
|
|
from backend.pipeline import query_knowledge, get_lens, set_lens
|
|
from backend.agents.pattern_detector import detect_patterns
|
|
from backend.agents.cross_group_analyst import analyze_cross_group
|
|
from collections import defaultdict
|
|
|
|
# Module-level logger for all dashboard API routes.
logger = logging.getLogger("thirdeye.api")

# FastAPI application instance; route decorators below attach to it.
app = FastAPI(title="ThirdEye API", version="1.0.0")

# NOTE(review): wildcard CORS allows any origin/method/header — fine for a
# local or demo deployment, but should be tightened before public exposure.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
|
@app.get("/api/groups")
async def list_groups():
    """List every monitored group with its signal count and active lens."""
    display_names = get_group_names()
    groups = [
        {
            "group_id": gid,
            "group_name": display_names.get(gid, gid),
            "signal_count": len(get_all_signals(gid)),
            "lens": get_lens(gid),
        }
        for gid in get_group_ids()
    ]
    return {"groups": groups}
|
|
|
|
|
|
@app.get("/api/groups/{group_id}/signals")
async def get_signals(
    group_id: str,
    signal_type: str = None,
    severity: str = None,
    lens: str = None,
    date_from: str = None,
    date_to: str = None,
):
    """Get signals for a group with optional filters.

    severity/lens must match the stored metadata exactly; date_from and
    date_to are inclusive bounds compared lexically against the ISO
    timestamp string. Results are sorted newest-first.
    """
    signals = get_all_signals(group_id, signal_type=signal_type)

    # Build the list of active filter predicates, each over the metadata dict.
    predicates = []
    if severity:
        predicates.append(lambda m: m.get("severity") == severity)
    if lens:
        predicates.append(lambda m: m.get("lens") == lens)
    if date_from:
        predicates.append(lambda m: m.get("timestamp", "") >= date_from)
    if date_to:
        predicates.append(lambda m: m.get("timestamp", "") <= date_to)

    if predicates:
        signals = [
            s for s in signals
            if all(p(s.get("metadata", {})) for p in predicates)
        ]

    signals.sort(key=lambda s: s.get("metadata", {}).get("timestamp", ""), reverse=True)
    return {"signals": signals, "count": len(signals)}
|
|
|
|
|
|
@app.post("/api/groups/{group_id}/query")
async def query_group(group_id: str, body: dict):
    """Natural language query over a group's knowledge base.

    Expects a JSON body with a non-empty "question" field; delegates to
    the pipeline's query_knowledge and returns its answer.
    """
    question = body.get("question", "")
    if not question:
        raise HTTPException(400, "Missing 'question' field")
    try:
        answer = await query_knowledge(group_id, question)
    except Exception as exc:
        # Hide internal failure details from the client; log for operators.
        logger.warning(f"Query failed for {group_id}: {exc}")
        raise HTTPException(500, "Query processing failed — please try again")
    return {"answer": answer, "question": question}
|
|
|
|
|
|
@app.get("/api/groups/{group_id}/patterns")
async def get_patterns(group_id: str):
    """Detect and return patterns for a group.

    Degrades gracefully: on timeout (25s) or any detector failure it
    logs a warning and returns an empty pattern list instead of erroring.
    """
    detected = []
    try:
        results = await asyncio.wait_for(detect_patterns(group_id), timeout=25.0)
        detected = [p.model_dump() for p in results]
    except asyncio.TimeoutError:
        logger.warning(f"Pattern detection timed out for {group_id}")
    except Exception as e:
        logger.warning(f"Pattern detection failed for {group_id}: {e}")
    return {"patterns": detected}
|
|
|
|
|
|
@app.get("/api/cross-group/insights")
async def get_cross_group_insights():
    """Run cross-group analysis and return insights.

    Requires at least two monitored groups. On LLM timeout it falls back
    to the analyst's heuristic insights; on any other failure it returns
    an empty list with an explanatory message.
    """
    summaries = {}
    try:
        group_ids = get_group_ids()
        if len(group_ids) < 2:
            return {"insights": [], "message": "Need at least 2 monitored groups"}

        summaries = {gid: get_all_signals(gid) for gid in group_ids}

        insights = await asyncio.wait_for(analyze_cross_group(summaries), timeout=25.0)
        return {"insights": [i.model_dump() for i in insights]}
    except asyncio.TimeoutError:
        logger.warning("Cross-group analysis timed out — returning heuristic fallback")
        from backend.agents.cross_group_analyst import _heuristic_cross_group_insights
        fallback = _heuristic_cross_group_insights(summaries)
        return {"insights": [i.model_dump() for i in fallback]}
    except Exception as e:
        logger.warning(f"Cross-group analysis failed: {e}")
        return {"insights": [], "message": "Analysis temporarily unavailable"}
|
|
|
|
|
|
@app.get("/health")
async def health():
    """Liveness probe: report that the service is up."""
    status_payload = {"status": "ok", "service": "thirdeye"}
    return status_payload
|
|
|
|
# ─────────────────────────────────────────────
|
|
# Google Meet Ingestion Endpoints
|
|
# ─────────────────────────────────────────────
|
|
|
|
from pydantic import BaseModel
|
|
from backend.config import MEET_INGEST_SECRET, ENABLE_MEET_INGESTION
|
|
|
|
class MeetStartPayload(BaseModel):
    """Request body sent by the browser extension when a meeting begins."""

    meeting_id: str
    # Destination group for the meeting's signals.
    group_id: str = "meet_sessions"
    # ISO timestamp string supplied by the extension.
    started_at: str
    speaker: str = "Unknown"
|
|
|
|
class MeetChunkPayload(BaseModel):
    """A ~30s transcript chunk posted by the extension during a meeting."""

    meeting_id: str
    group_id: str = "meet_sessions"
    # Monotonic position of this chunk within the meeting's transcript.
    chunk_index: int
    text: str
    speaker: str = "Unknown"
    # ISO timestamp string for when the chunk was captured.
    timestamp: str
    # True for the last chunk of the meeting.
    is_final: bool = False
|
|
|
|
def _verify_meet_secret(request: Request):
    """Authenticate a Meet-ingest request via its shared-secret header.

    Compares the ``X-ThirdEye-Secret`` header against MEET_INGEST_SECRET
    using a constant-time comparison so the check cannot leak secret
    contents through response timing. Fails closed when the server-side
    secret is unset/empty instead of accepting an empty header.

    Raises:
        HTTPException: 403 when the secret is missing, wrong, or the
            server has no secret configured.
    """
    import secrets  # stdlib; local import keeps it off the module's hot path

    supplied = request.headers.get("X-ThirdEye-Secret", "")
    # HTTPException is already imported at module level; the original's
    # redundant local re-import has been dropped.
    if not MEET_INGEST_SECRET or not secrets.compare_digest(supplied, MEET_INGEST_SECRET):
        raise HTTPException(status_code=403, detail="Invalid Meet ingest secret")
|
|
|
|
@app.post("/api/meet/start")
async def meet_start(payload: MeetStartPayload, request: Request):
    """Called by extension when a new meeting begins.

    Verifies the shared secret, then immediately persists a low-severity
    ``meet_started`` tracking signal so the meeting appears in the
    dashboard right away. The signal's timestamp is the extension-supplied
    ``started_at`` value, not server time.
    """
    _verify_meet_secret(request)
    if not ENABLE_MEET_INGESTION:
        return {"ok": False, "reason": "Meet ingestion disabled"}

    # Store a meeting-started signal immediately.
    # (The original's unused `from datetime import datetime` has been removed.)
    from backend.db.chroma import store_signals
    import uuid

    signal = {
        "id": str(uuid.uuid4()),
        "type": "meet_started",
        "summary": f"Meeting {payload.meeting_id} started by {payload.speaker}",
        "raw_quote": "",
        "severity": "low",
        "status": "active",
        "sentiment": "neutral",
        "urgency": "none",
        # Entity/keyword conventions match the rest of the pipeline:
        # @-prefixed people, #-prefixed meeting ids.
        "entities": [f"@{payload.speaker}", f"#{payload.meeting_id}"],
        "keywords": ["meeting", "started", payload.meeting_id],
        "timestamp": payload.started_at,
        "group_id": payload.group_id,
        "lens": "meet",
        "meeting_id": payload.meeting_id,
    }
    store_signals(payload.group_id, [signal])
    return {"ok": True, "meeting_id": payload.meeting_id}
|
|
|
|
|
|
@app.post("/api/meet/ingest")
async def meet_ingest(payload: MeetChunkPayload, request: Request, background_tasks: BackgroundTasks):
    """Called by extension every 30s with a transcript chunk.

    Authenticates the caller, drops trivially short chunks, and queues
    the real processing on a background task so the extension gets a
    fast acknowledgement.
    """
    _verify_meet_secret(request)
    if not ENABLE_MEET_INGESTION:
        return {"ok": False, "reason": "Meet ingestion disabled"}

    # Skip near-empty chunks (caption noise / silence).
    if len(payload.text.strip()) < 10:
        return {"ok": True, "skipped": True, "reason": "chunk too short"}

    # Process asynchronously so the extension gets a fast response.
    from backend.agents.meet_ingestor import process_meet_chunk

    background_tasks.add_task(
        process_meet_chunk,
        payload.meeting_id,
        payload.group_id,
        payload.chunk_index,
        payload.text,
        payload.speaker,
        payload.timestamp,
        payload.is_final,
    )

    ack = {
        "ok": True,
        "meeting_id": payload.meeting_id,
        "chunk_index": payload.chunk_index,
        "queued": True,
    }
    return ack
|
|
|
|
|
|
@app.get("/api/meet/meetings")
async def list_meetings():
    """List all recorded meetings with their signal counts.

    Scans every group for signals tagged lens="meet" and aggregates them
    by meeting_id, counting signals per type.
    """
    # get_all_signals / get_group_ids are already imported at module level;
    # the original's shadowing local imports have been removed.
    meetings: dict = {}

    for group_id in get_group_ids():
        for sig in get_all_signals(group_id):
            meta = sig.get("metadata", {})  # hoisted: was fetched repeatedly
            # Only process signals that have lens="meet".
            if meta.get("lens") != "meet":
                continue

            mid = meta.get("meeting_id", "unknown")
            if not mid:  # covers both missing-empty value cases
                continue

            entry = meetings.setdefault(
                mid, {"meeting_id": mid, "signal_count": 0, "types": {}}
            )
            entry["signal_count"] += 1
            sig_type = meta.get("type", "unknown")
            entry["types"][sig_type] = entry["types"].get(sig_type, 0) + 1

    return {"meetings": list(meetings.values())}
|
|
|
|
|
|
@app.get("/api/meet/meetings/{meeting_id}/signals")
async def get_meeting_signals(meeting_id: str):
    """Get all signals for a specific meeting, oldest first."""
    from backend.db.chroma import get_all_signals, get_group_ids

    def _belongs_to_meeting(sig: dict) -> bool:
        meta = sig.get("metadata", {})
        return meta.get("meeting_id") == meeting_id and meta.get("lens") == "meet"

    matched = [
        sig
        for gid in get_group_ids()
        for sig in get_all_signals(gid)
        if _belongs_to_meeting(sig)
    ]

    matched.sort(key=lambda s: s.get("metadata", {}).get("timestamp", ""))
    return {"meeting_id": meeting_id, "signals": matched, "count": len(matched)}
|
|
|
|
|
|
@app.get("/api/meet/meetings/{meeting_id}")
async def get_meeting_detail(meeting_id: str):
    """Get detailed info for a single meeting.

    Buckets the meeting's signals by type, pulls header metadata from the
    ``meet_started`` signal and the overview text from the first
    ``meet_summary`` signal (if any).
    """
    from backend.db.chroma import get_all_signals, get_group_ids

    signals_by_type: dict = {}
    started_at, speaker, group_id = "", "Unknown", ""

    for gid in get_group_ids():
        for sig in get_all_signals(gid):
            meta = sig.get("metadata", {})
            if meta.get("meeting_id") != meeting_id or meta.get("lens") != "meet":
                continue
            sig_type = meta.get("type", "unknown")
            signals_by_type.setdefault(sig_type, []).append(sig)
            if sig_type == "meet_started":
                # The start signal carries the meeting's header metadata.
                started_at = meta.get("timestamp", "")
                speaker = meta.get("speaker", "Unknown") or meta.get("entities", "Unknown")
                group_id = gid

    # Overview text comes from the first meet_summary signal, if present.
    summary_text = ""
    summary_sigs = signals_by_type.get("meet_summary", [])
    if summary_sigs:
        first = summary_sigs[0]
        summary_text = first.get("metadata", {}).get("summary", "") or first.get("document", "")

    signal_counts = {k: len(v) for k, v in signals_by_type.items()}

    return {
        "meeting_id": meeting_id,
        "started_at": started_at,
        "speaker": speaker,
        "group_id": group_id,
        "total_signals": sum(signal_counts.values()),
        "signal_counts": signal_counts,
        "summary": summary_text,
    }
|
|
|
|
|
|
@app.get("/api/meet/meetings/{meeting_id}/transcript")
async def get_meeting_transcript(meeting_id: str):
    """Get raw transcript chunks for a meeting in chronological order."""
    from backend.db.chroma import get_all_signals, get_group_ids

    chunks = []
    for gid in get_group_ids():
        for sig in get_all_signals(gid):
            meta = sig.get("metadata", {})
            matches = (
                meta.get("meeting_id") == meeting_id
                and meta.get("type") == "meet_chunk_raw"
            )
            if not matches:
                continue
            chunks.append({
                "id": sig.get("id", ""),
                # Prefer the verbatim quote; fall back to the stored document.
                "text": meta.get("raw_quote", "") or sig.get("document", ""),
                "speaker": meta.get("speaker", "Unknown"),
                "timestamp": meta.get("timestamp", ""),
                "summary": meta.get("summary", ""),
            })

    chunks.sort(key=lambda c: c["timestamp"])
    return {"meeting_id": meeting_id, "transcript": chunks, "chunk_count": len(chunks)}
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
# Jira Endpoints
|
|
# ─────────────────────────────────────────────
|
|
|
|
class JiraRaisePayload(BaseModel):
    """Request body for raising a Jira ticket from an existing signal."""

    signal_id: str
    group_id: str
    # NOTE(review): annotated `str` with default None — pydantic only
    # validates supplied values, so None stands for "use default project".
    project_key: str = None
    # When True, bypass the agent's should-raise checks.
    force: bool = False
|
|
|
|
class JiraCreatePayload(BaseModel):
    """Request body for creating a custom Jira ticket from the dashboard."""

    summary: str
    description: str = ""
    # None means "use JIRA_DEFAULT_PROJECT" (see create_jira_ticket).
    project_key: str = None
    issue_type: str = "Task"
    priority: str = "Medium"
    # Mutable default is safe here: pydantic copies field defaults per
    # instance (unlike plain function defaults) — verify on pydantic v1.
    labels: list = []
    assignee_account_id: str = None
|
|
|
|
|
|
@app.get("/api/jira/tickets")
async def list_jira_tickets(
    group_id: str = None,
    date_from: str = None,
    date_to: str = None,
    live: bool = False,
):
    """List all Jira tickets raised by ThirdEye across all groups.

    Args:
        group_id: restrict to one group (default: every monitored group).
        date_from / date_to: inclusive bounds compared lexically against
            the ISO raised_at timestamp.
        live: when True and Jira is configured, enrich each ticket with
            live status/assignee from the Jira API (best-effort).
    """
    # Chroma helpers are already imported at module level; only the Jira
    # client needs a local import.
    from backend.integrations.jira_client import get_issue, is_configured

    group_ids = [group_id] if group_id else get_group_ids()
    tickets = []

    for gid in group_ids:
        for sig in get_all_signals(gid, signal_type="jira_raised"):
            meta = sig.get("metadata", {})
            ts = meta.get("timestamp", "")
            if date_from and ts < date_from:
                continue
            if date_to and ts > date_to:
                continue

            # Prefer the explicit jira_key; older signals stored the key as
            # the first element of a JSON-encoded entities list. Guard the
            # decode so one malformed record cannot 500 the whole endpoint.
            jira_key = meta.get("jira_key", "")
            if not jira_key:
                try:
                    entities = json.loads(meta.get("entities", "[]") or "[]")
                except (json.JSONDecodeError, TypeError):
                    entities = []
                jira_key = entities[0] if entities else ""

            tickets.append({
                "id": sig.get("id", ""),
                "jira_key": jira_key,
                "jira_url": meta.get("jira_url", ""),
                "jira_summary": meta.get("jira_summary", "") or meta.get("summary", ""),
                "jira_priority": meta.get("jira_priority", "Medium"),
                "original_signal_id": meta.get("original_signal_id", "") or meta.get("raw_quote", ""),
                "group_id": gid,
                "raised_at": ts,
                "status": "Unknown",
            })

    # Fetch live status from Jira if requested and configured.
    if live and is_configured() and tickets:
        for ticket in tickets:
            if ticket["jira_key"]:
                try:
                    issue_data = await get_issue(ticket["jira_key"])
                    ticket["status"] = issue_data.get("status", "Unknown")
                    ticket["assignee"] = issue_data.get("assignee", "Unassigned")
                    if not ticket["jira_summary"]:
                        ticket["jira_summary"] = issue_data.get("summary", "")
                except Exception:
                    # Best-effort enrichment: keep the stored data on failure.
                    pass

    tickets.sort(key=lambda t: t["raised_at"], reverse=True)
    return {"tickets": tickets, "count": len(tickets)}
|
|
|
|
|
|
@app.get("/api/jira/tickets/{ticket_key}/status")
async def get_jira_ticket_status(ticket_key: str):
    """Fetch live status for a Jira ticket."""
    from backend.integrations.jira_client import get_issue, is_configured

    if not is_configured():
        raise HTTPException(503, "Jira is not configured")

    try:
        return await get_issue(ticket_key)
    except Exception as exc:
        # Surface upstream failures as a 502 with the original error text.
        raise HTTPException(502, f"Jira API error: {exc}")
|
|
|
|
|
|
@app.post("/api/jira/raise")
async def raise_jira_ticket(payload: JiraRaisePayload):
    """Raise a Jira ticket for an existing ThirdEye signal.

    Looks up the signal by id in the given group, flattens the stored
    metadata back into the agent's signal format, and delegates ticket
    creation to raise_ticket_for_signal.

    Raises:
        HTTPException: 503 when Jira is not configured; 404 when the
            signal id is not found in the group.
    """
    # get_all_signals is imported at module level; the original's
    # redundant local import has been dropped.
    from backend.agents.jira_agent import raise_ticket_for_signal
    from backend.integrations.jira_client import is_configured

    if not is_configured():
        raise HTTPException(503, "Jira is not configured")

    # Find the signal in the group
    signals = get_all_signals(payload.group_id)
    target = next((s for s in signals if s.get("id") == payload.signal_id), None)
    if not target:
        raise HTTPException(404, f"Signal {payload.signal_id} not found in group {payload.group_id}")

    def _json_list(raw) -> list:
        """Decode a JSON-encoded list stored in metadata; [] on bad data.

        Guarded so one malformed record cannot 500 the endpoint —
        consistent with the parsing in browse_knowledge.
        """
        try:
            return json.loads(raw or "[]")
        except (json.JSONDecodeError, TypeError):
            return []

    # Build flat signal dict from stored format
    meta = target.get("metadata", {})
    signal_dict = {
        "id": target.get("id", ""),
        "type": meta.get("type", "unknown"),
        "summary": meta.get("summary", "") or target.get("document", ""),
        "raw_quote": meta.get("raw_quote", ""),
        "severity": meta.get("severity", "medium"),
        "status": meta.get("status", "open"),
        "entities": _json_list(meta.get("entities", "[]")),
        "keywords": _json_list(meta.get("keywords", "[]")),
        "timestamp": meta.get("timestamp", ""),
        "group_id": payload.group_id,
        "lens": meta.get("lens", ""),
    }

    result = await raise_ticket_for_signal(
        signal_dict,
        payload.group_id,
        project_key=payload.project_key,
        force=payload.force,
    )
    return result
|
|
|
|
|
|
@app.post("/api/jira/create")
async def create_jira_ticket(payload: JiraCreatePayload):
    """Create a custom Jira ticket directly from the dashboard.

    On success, also stores a ``jira_raised`` tracking signal in the
    synthetic "dashboard" group so the ticket shows up in the ticket list.
    """
    from backend.integrations.jira_client import create_issue, is_configured
    from backend.config import JIRA_DEFAULT_PROJECT
    from backend.db.chroma import store_signals
    import uuid as _uuid
    from datetime import datetime

    if not is_configured():
        raise HTTPException(503, "Jira is not configured")

    result = await create_issue(
        project_key=payload.project_key or JIRA_DEFAULT_PROJECT,
        summary=payload.summary,
        description=payload.description or "(Created from ThirdEye Dashboard)",
        issue_type=payload.issue_type,
        priority=payload.priority,
        labels=payload.labels or ["thirdeye", "dashboard"],
        assignee_account_id=payload.assignee_account_id or None,
    )

    # Persist a jira_raised tracking signal so it appears in the ticket list
    if result.get("ok"):
        issue_key = result["key"]
        issue_url = result.get("url", "")
        severity = payload.priority.lower() if payload.priority else "medium"
        tracking_signal = {
            "id": str(_uuid.uuid4()),
            "type": "jira_raised",
            "summary": payload.summary,
            "raw_quote": "manual",
            "severity": severity,
            "status": "raised",
            "sentiment": "neutral",
            "urgency": "none",
            "entities": [issue_key],
            "keywords": ["jira", issue_key, "manual", "dashboard"],
            "timestamp": datetime.utcnow().isoformat(),
            "group_id": "dashboard",
            "lens": "jira",
            "jira_key": issue_key,
            "jira_url": issue_url,
            "jira_summary": payload.summary,
            "jira_priority": payload.priority or "Medium",
            "original_signal_id": "manual",
        }
        store_signals("dashboard", [tracking_signal])
        logger.info(f"Stored manually-created Jira ticket {issue_key} in ChromaDB (group=dashboard)")

    return result
|
|
|
|
|
|
@app.get("/api/jira/users/search")
async def search_jira_users(q: str = ""):
    """Search Jira users by name or email fragment for assignee picker."""
    from backend.integrations.jira_client import search_users, is_configured

    if not is_configured():
        raise HTTPException(503, "Jira is not configured")

    query = q.strip() if q else ""
    if not query:
        return {"users": []}

    try:
        found = await search_users(query, max_results=8)
        return {"users": found}
    except Exception as e:
        # Degrade to an empty result rather than erroring the picker.
        logger.warning(f"Jira user search failed: {e}")
        return {"users": []}
|
|
|
|
|
|
@app.get("/api/jira/config")
async def get_jira_config():
    """Check if Jira is configured and return basic project info."""
    from backend.integrations.jira_client import is_configured, test_connection, list_projects
    from backend.config import JIRA_DEFAULT_PROJECT, JIRA_BASE_URL

    if not is_configured():
        return {"configured": False}

    conn = await test_connection()
    projects = []
    if conn.get("ok"):
        try:
            projects = await list_projects()
        except Exception:
            # Project listing is optional; connection status is enough.
            pass

    return {
        "configured": True,
        "connected": conn.get("ok", False),
        "base_url": JIRA_BASE_URL,
        "default_project": JIRA_DEFAULT_PROJECT,
        "projects": projects,
    }
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
# Knowledge Browser
|
|
# ─────────────────────────────────────────────
|
|
|
|
# Signal types hidden from the knowledge browser: internal tracking and
# raw-ingest records, not user-facing knowledge.
_BROWSE_EXCLUDED_TYPES = {
    "jira_raised", "meet_started", "meet_chunk_raw", "voice_transcript",
}

# Common English words ignored when deriving topics from keyword frequency.
_BROWSE_STOPWORDS = {
    "the", "a", "an", "is", "in", "on", "at", "to", "and", "or", "not",
    "for", "of", "it", "this", "that", "be", "with", "as", "was", "are",
    "has", "have", "but", "by", "from", "we", "our", "they", "its",
    "will", "can", "would", "should", "about", "all", "new", "use",
}
|
|
|
|
|
|
@app.get("/api/knowledge/browse/{group_id}")
async def browse_knowledge(
    group_id: str,
    date_from: str = None,
    date_to: str = None,
    topic: str = None,
):
    """
    Browse a group's knowledge base organized by AI-clustered topics and date timeline.

    Returns topics (derived from keyword frequency) and a day-by-day timeline of signals.
    Excludes internal system signals (jira_raised, meet_chunk_raw, etc.).

    Fixes over the previous version: removed a dead loop that re-read the
    whole group and did nothing, deduplicated the topic-summary building,
    and consolidated the thrice-repeated keyword parsing into one helper.
    """

    def _keywords(sig: dict) -> list[str]:
        """Parse a signal's stored keywords (JSON string or list) into lowercase, stripped strings."""
        raw = sig.get("metadata", {}).get("keywords", "[]")
        try:
            items = json.loads(raw) if isinstance(raw, str) else raw
        except Exception:
            items = []
        return [str(k).lower().strip() for k in items]

    def _fallback_topic(sig: dict) -> str:
        """Topic used when none of the signal's keywords rank: its type, de-underscored."""
        return sig.get("metadata", {}).get("type", "other").replace("_", " ")

    all_sigs = get_all_signals(group_id)

    # Strip system / tracking signals
    signals = [
        s for s in all_sigs
        if s.get("metadata", {}).get("type", "") not in _BROWSE_EXCLUDED_TYPES
    ]

    # Date filtering (ISO timestamps compare lexically)
    if date_from:
        signals = [s for s in signals if s.get("metadata", {}).get("timestamp", "") >= date_from]
    if date_to:
        # Pad to end-of-day so a bare date bound includes that whole day.
        signals = [s for s in signals if s.get("metadata", {}).get("timestamp", "") <= date_to + "T23:59:59"]

    # ── Build keyword frequency map ────────────────────────────────────────────
    keyword_freq: dict[str, int] = defaultdict(int)
    for sig in signals:
        for kw in _keywords(sig):
            if len(kw) > 2 and kw not in _BROWSE_STOPWORDS:
                keyword_freq[kw] += 1

    # Top 25 keywords become the selectable topics.
    sorted_kws = sorted(keyword_freq.items(), key=lambda x: x[1], reverse=True)
    top_topics: list[str] = [kw for kw, freq in sorted_kws[:25] if freq >= 1]

    def _primary_topic(sig: dict) -> str:
        """Return the highest-ranked top-topic that this signal's keywords contain."""
        kw_set = set(_keywords(sig))
        for t in top_topics:
            if t in kw_set:
                return t
        return _fallback_topic(sig)

    def _all_topics(sig: dict) -> list[str]:
        """Return all top-topics that this signal belongs to (type as fallback)."""
        kw_set = set(_keywords(sig))
        matched = [t for t in top_topics if t in kw_set]
        return matched if matched else [_fallback_topic(sig)]

    # ── Topic-filter (optional) ────────────────────────────────────────────────
    if topic:
        topic_lower = topic.lower()
        signals = [
            sig for sig in signals
            if topic_lower in _keywords(sig)
            or topic_lower == sig.get("metadata", {}).get("type", "").replace("_", " ")
        ]

    # ── Build topics summary list ──────────────────────────────────────────────
    topic_buckets: dict[str, list] = defaultdict(list)
    for sig in signals:
        topic_buckets[_primary_topic(sig)].append(sig)

    def _topic_entry(name: str, bucket: list) -> dict:
        """One sidebar entry: count, newest timestamp, two sample summaries."""
        latest_ts = max((s.get("metadata", {}).get("timestamp", "") for s in bucket), default="")
        return {
            "name": name,
            "signal_count": len(bucket),
            "latest": latest_ts,
            "sample_signals": [
                s.get("metadata", {}).get("summary", "") or s.get("document", "")
                for s in bucket[:2]
            ],
        }

    topics_summary = []
    seen_topics: set[str] = set()
    # Ranked keyword topics first…
    for t in top_topics:
        bucket = topic_buckets.get(t, [])
        if bucket and t not in seen_topics:
            seen_topics.add(t)
            topics_summary.append(_topic_entry(t, bucket))
    # …then leftover type-derived topics, largest bucket first.
    for t, bucket in sorted(topic_buckets.items(), key=lambda x: len(x[1]), reverse=True):
        if t not in seen_topics and bucket:
            seen_topics.add(t)
            topics_summary.append(_topic_entry(t, bucket))
    topics_summary.sort(key=lambda t: t["signal_count"], reverse=True)

    # ── Build day-by-day timeline ──────────────────────────────────────────────
    day_buckets: dict[str, list] = defaultdict(list)
    for sig in signals:
        ts = sig.get("metadata", {}).get("timestamp", "")
        date_key = ts[:10] if ts and len(ts) >= 10 else "unknown"
        day_buckets[date_key].append(sig)

    timeline = []
    for date_key in sorted(day_buckets.keys(), reverse=True):
        day_sigs = sorted(
            day_buckets[date_key],
            key=lambda s: s.get("metadata", {}).get("timestamp", ""),
            reverse=True,
        )
        # Order-preserving de-duplicated topic list for the day header.
        day_topics = list(dict.fromkeys(
            t for s in day_sigs for t in _all_topics(s)
        ))
        timeline.append({
            "date": date_key,
            "signals": day_sigs,
            "topics": day_topics[:6],
            "signal_count": len(day_sigs),
        })

    # ── Date range metadata ────────────────────────────────────────────────────
    all_ts = [
        s.get("metadata", {}).get("timestamp", "")
        for s in signals
        if s.get("metadata", {}).get("timestamp", "")
    ]
    date_range = {
        "earliest": min(all_ts) if all_ts else "",
        "latest": max(all_ts) if all_ts else "",
    }

    names = get_group_names()
    return {
        "group_id": group_id,
        "group_name": names.get(group_id, group_id),
        "total_signals": len(signals),
        "date_range": date_range,
        "topics": topics_summary,
        "timeline": timeline,
    }
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
# Enhanced Chat / Signals Timeline
|
|
# ─────────────────────────────────────────────
|
|
|
|
@app.get("/api/signals/timeline")
async def get_signals_timeline(
    group_id: str = None,
    severity: str = None,
    lens: str = None,
    signal_type: str = None,
    date_from: str = None,
    date_to: str = None,
    limit: int = 200,
):
    """
    Cross-group signal timeline with full filter support.
    Returns signals sorted newest-first, ready for timeline rendering.
    """
    from backend.db.chroma import get_all_signals, get_group_ids, get_group_names

    target_groups = [group_id] if group_id else get_group_ids()
    names = get_group_names()

    def _passes_filters(meta: dict) -> bool:
        """Apply every optional filter; internal tracking types never pass."""
        if meta.get("type") in ("jira_raised", "meet_started"):
            return False
        if severity and meta.get("severity") != severity:
            return False
        if lens and meta.get("lens") != lens:
            return False
        ts = meta.get("timestamp", "")
        if date_from and ts < date_from:
            return False
        if date_to and ts > date_to:
            return False
        return True

    collected = [
        {**sig, "group_name": names.get(gid, gid)}
        for gid in target_groups
        for sig in get_all_signals(gid, signal_type=signal_type)
        if _passes_filters(sig.get("metadata", {}))
    ]

    collected.sort(key=lambda s: s.get("metadata", {}).get("timestamp", ""), reverse=True)
    return {
        "signals": collected[:limit],
        "total": len(collected),
        "truncated": len(collected) > limit,
    }
|
|
|