"""FastAPI routes for the ThirdEye dashboard."""
import asyncio
import json
import logging
from fastapi import FastAPI, HTTPException, Request, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from backend.db.chroma import get_all_signals, query_signals, get_group_ids, get_group_names
from backend.pipeline import query_knowledge, get_lens, set_lens
from backend.agents.pattern_detector import detect_patterns
from backend.agents.cross_group_analyst import analyze_cross_group
from collections import defaultdict
logger = logging.getLogger("thirdeye.api")
app = FastAPI(title="ThirdEye API", version="1.0.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
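# NOTE: allow_origins=["*"] is convenient for local development. A hedged
# sketch of a stricter production setup (CORS_ORIGINS is an assumed env var,
# not something this codebase defines):
#
#     import os
#     allowed = os.environ.get("CORS_ORIGINS", "http://localhost:3000").split(",")
#     app.add_middleware(CORSMiddleware, allow_origins=allowed,
#                        allow_methods=["*"], allow_headers=["*"])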
@app.get("/api/groups")
async def list_groups():
    """List all monitored groups."""
    group_ids = get_group_ids()
    names = get_group_names()
    groups = []
    for gid in group_ids:
        signals = get_all_signals(gid)
        groups.append({
            "group_id": gid,
            "group_name": names.get(gid, gid),
            "signal_count": len(signals),
            "lens": get_lens(gid),
        })
    return {"groups": groups}
@app.get("/api/groups/{group_id}/signals")
async def get_signals(
    group_id: str,
    signal_type: str | None = None,
    severity: str | None = None,
    lens: str | None = None,
    date_from: str | None = None,
    date_to: str | None = None,
):
    """Get signals for a group with optional filters."""
    signals = get_all_signals(group_id, signal_type=signal_type)
    if severity:
        signals = [s for s in signals if s.get("metadata", {}).get("severity") == severity]
    if lens:
        signals = [s for s in signals if s.get("metadata", {}).get("lens") == lens]
    if date_from:
        signals = [s for s in signals if s.get("metadata", {}).get("timestamp", "") >= date_from]
    if date_to:
        signals = [s for s in signals if s.get("metadata", {}).get("timestamp", "") <= date_to]
    signals.sort(key=lambda s: s.get("metadata", {}).get("timestamp", ""), reverse=True)
    return {"signals": signals, "count": len(signals)}
@app.post("/api/groups/{group_id}/query")
async def query_group(group_id: str, body: dict):
    """Natural language query over a group's knowledge base."""
    question = body.get("question", "")
    if not question:
        raise HTTPException(400, "Missing 'question' field")
    try:
        answer = await query_knowledge(group_id, question)
        return {"answer": answer, "question": question}
    except Exception as e:
        logger.warning(f"Query failed for {group_id}: {e}")
        raise HTTPException(500, "Query processing failed — please try again")
@app.get("/api/groups/{group_id}/patterns")
async def get_patterns(group_id: str):
    """Detect and return patterns for a group."""
    try:
        patterns = await asyncio.wait_for(detect_patterns(group_id), timeout=25.0)
        return {"patterns": [p.model_dump() for p in patterns]}
    except asyncio.TimeoutError:
        logger.warning(f"Pattern detection timed out for {group_id}")
        return {"patterns": []}
    except Exception as e:
        logger.warning(f"Pattern detection failed for {group_id}: {e}")
        return {"patterns": []}
@app.get("/api/cross-group/insights")
async def get_cross_group_insights():
    """Run cross-group analysis and return insights."""
    # Bind summaries before the try block so the TimeoutError handler can
    # always reference it safely.
    summaries = {}
    try:
        group_ids = get_group_ids()
        if len(group_ids) < 2:
            return {"insights": [], "message": "Need at least 2 monitored groups"}
        for gid in group_ids:
            summaries[gid] = get_all_signals(gid)
        insights = await asyncio.wait_for(analyze_cross_group(summaries), timeout=25.0)
        return {"insights": [i.model_dump() for i in insights]}
    except asyncio.TimeoutError:
        logger.warning("Cross-group analysis timed out — returning heuristic fallback")
        from backend.agents.cross_group_analyst import _heuristic_cross_group_insights
        fallback = _heuristic_cross_group_insights(summaries)
        return {"insights": [i.model_dump() for i in fallback]}
    except Exception as e:
        logger.warning(f"Cross-group analysis failed: {e}")
        return {"insights": [], "message": "Analysis temporarily unavailable"}
@app.get("/health")
async def health():
    return {"status": "ok", "service": "thirdeye"}
# ─────────────────────────────────────────────
# Google Meet Ingestion Endpoints
# ─────────────────────────────────────────────
from pydantic import BaseModel
from backend.config import MEET_INGEST_SECRET, ENABLE_MEET_INGESTION
class MeetStartPayload(BaseModel):
    meeting_id: str
    group_id: str = "meet_sessions"
    started_at: str
    speaker: str = "Unknown"
class MeetChunkPayload(BaseModel):
    meeting_id: str
    group_id: str = "meet_sessions"
    chunk_index: int
    text: str
    speaker: str = "Unknown"
    timestamp: str
    is_final: bool = False
def _verify_meet_secret(request: Request):
    secret = request.headers.get("X-ThirdEye-Secret", "")
    if secret != MEET_INGEST_SECRET:
        # HTTPException is already imported at module level.
        raise HTTPException(status_code=403, detail="Invalid Meet ingest secret")
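# Hardening sketch (an assumption, not current behavior): a constant-time
# check via hmac.compare_digest(secret, MEET_INGEST_SECRET) would avoid
# timing side-channels on the shared secret.
#
# Usage sketch (illustrative; the secret value here is an assumption, the
# real value comes from backend.config.MEET_INGEST_SECRET):
def _example_meet_start_request():
    from fastapi.testclient import TestClient
    client = TestClient(app)
    return client.post(
        "/api/meet/start",
        headers={"X-ThirdEye-Secret": "dev-secret"},
        json={"meeting_id": "abc-defg-hij", "started_at": "2026-04-05T00:40:00"},
    )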
@app.post("/api/meet/start")
async def meet_start(payload: MeetStartPayload, request: Request):
    """Called by extension when a new meeting begins."""
    _verify_meet_secret(request)
    if not ENABLE_MEET_INGESTION:
        return {"ok": False, "reason": "Meet ingestion disabled"}
    # Store a meeting-started signal immediately
    from backend.db.chroma import store_signals
    import uuid
    signal = {
        "id": str(uuid.uuid4()),
        "type": "meet_started",
        "summary": f"Meeting {payload.meeting_id} started by {payload.speaker}",
        "raw_quote": "",
        "severity": "low",
        "status": "active",
        "sentiment": "neutral",
        "urgency": "none",
        "entities": [f"@{payload.speaker}", f"#{payload.meeting_id}"],
        "keywords": ["meeting", "started", payload.meeting_id],
        "timestamp": payload.started_at,
        "group_id": payload.group_id,
        "lens": "meet",
        "meeting_id": payload.meeting_id,
    }
    store_signals(payload.group_id, [signal])
    return {"ok": True, "meeting_id": payload.meeting_id}
@app.post("/api/meet/ingest")
async def meet_ingest(payload: MeetChunkPayload, request: Request, background_tasks: BackgroundTasks):
    """Called by extension every 30s with a transcript chunk."""
    _verify_meet_secret(request)
    if not ENABLE_MEET_INGESTION:
        return {"ok": False, "reason": "Meet ingestion disabled"}
    if len(payload.text.strip()) < 10:
        return {"ok": True, "skipped": True, "reason": "chunk too short"}
    # Process asynchronously so the extension gets a fast response
    from backend.agents.meet_ingestor import process_meet_chunk
    background_tasks.add_task(
        process_meet_chunk,
        payload.meeting_id,
        payload.group_id,
        payload.chunk_index,
        payload.text,
        payload.speaker,
        payload.timestamp,
        payload.is_final,
    )
    return {
        "ok": True,
        "meeting_id": payload.meeting_id,
        "chunk_index": payload.chunk_index,
        "queued": True,
    }
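# A sketch of one 30-second chunk as the extension might send it (all field
# values here are illustrative assumptions). Chunks under 10 characters are
# acknowledged but skipped by meet_ingest above:
def _example_meet_chunk() -> MeetChunkPayload:
    return MeetChunkPayload(
        meeting_id="abc-defg-hij",
        chunk_index=4,
        text="Let's move the launch to Friday and flag the billing bug.",
        speaker="Priya",
        timestamp="2026-04-05T00:42:30",
    )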
@app.get("/api/meet/meetings")
async def list_meetings():
    """List all recorded meetings with their signal counts."""
    meetings = {}
    # Check all groups for meet-related signals
    for group_id in get_group_ids():
        for sig in get_all_signals(group_id):
            meta = sig.get("metadata", {})
            # Only process signals that have lens="meet"
            if meta.get("lens") != "meet":
                continue
            mid = meta.get("meeting_id", "unknown")
            if not mid:
                continue
            if mid not in meetings:
                meetings[mid] = {"meeting_id": mid, "signal_count": 0, "types": {}}
            meetings[mid]["signal_count"] += 1
            t = meta.get("type", "unknown")
            meetings[mid]["types"][t] = meetings[mid]["types"].get(t, 0) + 1
    return {"meetings": list(meetings.values())}
@app.get("/api/meet/meetings/{meeting_id}/signals")
async def get_meeting_signals(meeting_id: str):
    """Get all signals for a specific meeting."""
    all_signals = []
    for gid in get_group_ids():
        for sig in get_all_signals(gid):
            meta = sig.get("metadata", {})
            if meta.get("meeting_id") == meeting_id and meta.get("lens") == "meet":
                all_signals.append(sig)
    all_signals.sort(key=lambda s: s.get("metadata", {}).get("timestamp", ""))
    return {"meeting_id": meeting_id, "signals": all_signals, "count": len(all_signals)}
@app.get("/api/meet/meetings/{meeting_id}")
async def get_meeting_detail(meeting_id: str):
    """Get detailed info for a single meeting."""
    signals_by_type: dict = {}
    started_at = ""
    speaker = "Unknown"
    group_id = ""
    for gid in get_group_ids():
        for sig in get_all_signals(gid):
            meta = sig.get("metadata", {})
            if meta.get("meeting_id") != meeting_id or meta.get("lens") != "meet":
                continue
            sig_type = meta.get("type", "unknown")
            signals_by_type.setdefault(sig_type, []).append(sig)
            if sig_type == "meet_started":
                started_at = meta.get("timestamp", "")
                # Fall back to the stored entities when the metadata carries
                # no explicit speaker field.
                speaker = meta.get("speaker") or meta.get("entities", "Unknown")
                group_id = gid
    # Summary signal text (take the first meet_summary, if any)
    summary_text = ""
    for sig in signals_by_type.get("meet_summary", []):
        summary_text = sig.get("metadata", {}).get("summary", "") or sig.get("document", "")
        break
    signal_counts = {k: len(v) for k, v in signals_by_type.items()}
    total_signals = sum(signal_counts.values())
    return {
        "meeting_id": meeting_id,
        "started_at": started_at,
        "speaker": speaker,
        "group_id": group_id,
        "total_signals": total_signals,
        "signal_counts": signal_counts,
        "summary": summary_text,
    }
@app.get("/api/meet/meetings/{meeting_id}/transcript")
async def get_meeting_transcript(meeting_id: str):
    """Get raw transcript chunks for a meeting in chronological order."""
    chunks = []
    for gid in get_group_ids():
        for sig in get_all_signals(gid):
            meta = sig.get("metadata", {})
            if meta.get("meeting_id") == meeting_id and meta.get("type") == "meet_chunk_raw":
                chunks.append({
                    "id": sig.get("id", ""),
                    "text": meta.get("raw_quote", "") or sig.get("document", ""),
                    "speaker": meta.get("speaker", "Unknown"),
                    "timestamp": meta.get("timestamp", ""),
                    "summary": meta.get("summary", ""),
                })
    chunks.sort(key=lambda c: c["timestamp"])
    return {"meeting_id": meeting_id, "transcript": chunks, "chunk_count": len(chunks)}
# ─────────────────────────────────────────────
# Jira Endpoints
# ─────────────────────────────────────────────
class JiraRaisePayload(BaseModel):
    signal_id: str
    group_id: str
    project_key: str | None = None
    force: bool = False
class JiraCreatePayload(BaseModel):
    summary: str
    description: str = ""
    project_key: str | None = None
    issue_type: str = "Task"
    priority: str = "Medium"
    labels: list[str] = []
    assignee_account_id: str | None = None
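# A minimal sketch of how these payloads validate (values are illustrative).
# Leaving project_key as None defers to JIRA_DEFAULT_PROJECT inside
# create_jira_ticket below:
def _example_jira_create_payload() -> JiraCreatePayload:
    return JiraCreatePayload(
        summary="Fix login timeout",
        priority="High",
        labels=["thirdeye"],
    )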
@app.get("/api/jira/tickets")
async def list_jira_tickets(
    group_id: str | None = None,
    date_from: str | None = None,
    date_to: str | None = None,
    live: bool = False,
):
    """List all Jira tickets raised by ThirdEye across all groups."""
    from backend.integrations.jira_client import get_issue, is_configured
    group_ids = [group_id] if group_id else get_group_ids()
    tickets = []
    for gid in group_ids:
        for sig in get_all_signals(gid, signal_type="jira_raised"):
            meta = sig.get("metadata", {})
            ts = meta.get("timestamp", "")
            if date_from and ts < date_from:
                continue
            if date_to and ts > date_to:
                continue
            # Fall back to the first stored entity (the issue key) when the
            # explicit jira_key field is absent.
            jira_key = meta.get("jira_key", "") or (
                json.loads(meta.get("entities", "[]") or "[]") or [""]
            )[0]
            tickets.append({
                "id": sig.get("id", ""),
                "jira_key": jira_key,
                "jira_url": meta.get("jira_url", ""),
                "jira_summary": meta.get("jira_summary", "") or meta.get("summary", ""),
                "jira_priority": meta.get("jira_priority", "Medium"),
                "original_signal_id": meta.get("original_signal_id", "") or meta.get("raw_quote", ""),
                "group_id": gid,
                "raised_at": ts,
                "status": "Unknown",
            })
    # Fetch live status from Jira if requested and configured
    if live and is_configured() and tickets:
        for ticket in tickets:
            if ticket["jira_key"]:
                try:
                    issue_data = await get_issue(ticket["jira_key"])
                    ticket["status"] = issue_data.get("status", "Unknown")
                    ticket["assignee"] = issue_data.get("assignee", "Unassigned")
                    if not ticket["jira_summary"]:
                        ticket["jira_summary"] = issue_data.get("summary", "")
                except Exception:
                    pass
    tickets.sort(key=lambda t: t["raised_at"], reverse=True)
    return {"tickets": tickets, "count": len(tickets)}
@app.get("/api/jira/tickets/{ticket_key}/status")
async def get_jira_ticket_status(ticket_key: str):
    """Fetch live status for a Jira ticket."""
    from backend.integrations.jira_client import get_issue, is_configured
    if not is_configured():
        raise HTTPException(503, "Jira is not configured")
    try:
        return await get_issue(ticket_key)
    except Exception as e:
        raise HTTPException(502, f"Jira API error: {e}")
@app.post("/api/jira/raise")
async def raise_jira_ticket(payload: JiraRaisePayload):
    """Raise a Jira ticket for an existing ThirdEye signal."""
    from backend.agents.jira_agent import raise_ticket_for_signal
    from backend.integrations.jira_client import is_configured
    if not is_configured():
        raise HTTPException(503, "Jira is not configured")
    # Find the signal in the group
    signals = get_all_signals(payload.group_id)
    target = next((s for s in signals if s.get("id") == payload.signal_id), None)
    if not target:
        raise HTTPException(404, f"Signal {payload.signal_id} not found in group {payload.group_id}")
    # Build flat signal dict from stored format
    meta = target.get("metadata", {})
    signal_dict = {
        "id": target.get("id", ""),
        "type": meta.get("type", "unknown"),
        "summary": meta.get("summary", "") or target.get("document", ""),
        "raw_quote": meta.get("raw_quote", ""),
        "severity": meta.get("severity", "medium"),
        "status": meta.get("status", "open"),
        "entities": json.loads(meta.get("entities", "[]") or "[]"),
        "keywords": json.loads(meta.get("keywords", "[]") or "[]"),
        "timestamp": meta.get("timestamp", ""),
        "group_id": payload.group_id,
        "lens": meta.get("lens", ""),
    }
    return await raise_ticket_for_signal(
        signal_dict,
        payload.group_id,
        project_key=payload.project_key,
        force=payload.force,
    )
@app.post("/api/jira/create")
async def create_jira_ticket(payload: JiraCreatePayload):
    """Create a custom Jira ticket directly from the dashboard."""
    from backend.integrations.jira_client import create_issue, is_configured
    from backend.config import JIRA_DEFAULT_PROJECT
    from backend.db.chroma import store_signals
    from datetime import datetime, timezone
    import uuid as _uuid
    if not is_configured():
        raise HTTPException(503, "Jira is not configured")
    result = await create_issue(
        project_key=payload.project_key or JIRA_DEFAULT_PROJECT,
        summary=payload.summary,
        description=payload.description or "(Created from ThirdEye Dashboard)",
        issue_type=payload.issue_type,
        priority=payload.priority,
        labels=payload.labels or ["thirdeye", "dashboard"],
        assignee_account_id=payload.assignee_account_id or None,
    )
    # Persist a jira_raised tracking signal so it appears in the ticket list
    if result.get("ok"):
        jira_key = result["key"]
        jira_url = result.get("url", "")
        tracking_signal = {
            "id": str(_uuid.uuid4()),
            "type": "jira_raised",
            "summary": payload.summary,
            "raw_quote": "manual",
            "severity": payload.priority.lower() if payload.priority else "medium",
            "status": "raised",
            "sentiment": "neutral",
            "urgency": "none",
            "entities": [jira_key],
            "keywords": ["jira", jira_key, "manual", "dashboard"],
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "group_id": "dashboard",
            "lens": "jira",
            "jira_key": jira_key,
            "jira_url": jira_url,
            "jira_summary": payload.summary,
            "jira_priority": payload.priority or "Medium",
            "original_signal_id": "manual",
        }
        store_signals("dashboard", [tracking_signal])
        logger.info(f"Stored manually-created Jira ticket {jira_key} in ChromaDB (group=dashboard)")
    return result
@app.get("/api/jira/users/search")
async def search_jira_users(q: str = ""):
"""Search Jira users by name or email fragment for assignee picker."""
from backend.integrations.jira_client import search_users, is_configured
if not is_configured():
raise HTTPException(503, "Jira is not configured")
if not q or len(q.strip()) < 1:
return {"users": []}
try:
users = await search_users(q.strip(), max_results=8)
return {"users": users}
except Exception as e:
logger.warning(f"Jira user search failed: {e}")
return {"users": []}
@app.get("/api/jira/config")
async def get_jira_config():
    """Check if Jira is configured and return basic project info."""
    from backend.integrations.jira_client import is_configured, test_connection, list_projects
    from backend.config import JIRA_DEFAULT_PROJECT, JIRA_BASE_URL
    if not is_configured():
        return {"configured": False}
    conn = await test_connection()
    projects = []
    if conn.get("ok"):
        try:
            projects = await list_projects()
        except Exception:
            pass
    return {
        "configured": True,
        "connected": conn.get("ok", False),
        "base_url": JIRA_BASE_URL,
        "default_project": JIRA_DEFAULT_PROJECT,
        "projects": projects,
    }
# ─────────────────────────────────────────────
# Knowledge Browser
# ─────────────────────────────────────────────
_BROWSE_EXCLUDED_TYPES = {
    "jira_raised", "meet_started", "meet_chunk_raw", "voice_transcript",
}
_BROWSE_STOPWORDS = {
    "the", "a", "an", "is", "in", "on", "at", "to", "and", "or", "not",
    "for", "of", "it", "this", "that", "be", "with", "as", "was", "are",
    "has", "have", "but", "by", "from", "we", "our", "they", "its",
    "will", "can", "would", "should", "about", "all", "new", "use",
}
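# A self-contained sketch of the keyword-frequency topic derivation used by
# browse_knowledge below (the sample keywords are made up for illustration):
def _example_topic_derivation() -> list[str]:
    freq: dict[str, int] = defaultdict(int)
    sample_signal_keywords = [
        ["billing", "outage", "the"],   # "the" is dropped as a stopword
        ["billing", "retry"],
        ["launch"],
    ]
    for kws in sample_signal_keywords:
        for kw in kws:
            kw = kw.lower().strip()
            if len(kw) > 2 and kw not in _BROWSE_STOPWORDS:
                freq[kw] += 1
    ranked = sorted(freq.items(), key=lambda x: x[1], reverse=True)
    # Keep keywords seen in at least two signals: ["billing"] here.
    return [kw for kw, n in ranked[:25] if n >= 2]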
@app.get("/api/knowledge/browse/{group_id}")
async def browse_knowledge(
    group_id: str,
    date_from: str | None = None,
    date_to: str | None = None,
    topic: str | None = None,
):
    """
    Browse a group's knowledge base organized by AI-clustered topics and date timeline.
    Returns topics (derived from keyword frequency) and a day-by-day timeline of signals.
    Excludes internal system signals (jira_raised, meet_chunk_raw, etc.).
    """
    all_sigs = get_all_signals(group_id)
    # Strip system / tracking signals
    signals = [
        s for s in all_sigs
        if s.get("metadata", {}).get("type", "") not in _BROWSE_EXCLUDED_TYPES
    ]
    # Date filtering (date_to is padded to end-of-day so the bound is inclusive)
    if date_from:
        signals = [s for s in signals if s.get("metadata", {}).get("timestamp", "") >= date_from]
    if date_to:
        signals = [s for s in signals if s.get("metadata", {}).get("timestamp", "") <= date_to + "T23:59:59"]
    # ── Build keyword frequency map ────────────────────────────────────────────
    keyword_freq: dict[str, int] = defaultdict(int)
    for sig in signals:
        raw_kws = sig.get("metadata", {}).get("keywords", "[]")
        try:
            kws: list = json.loads(raw_kws) if isinstance(raw_kws, str) else raw_kws
        except Exception:
            kws = []
        for kw in kws:
            kw_clean = str(kw).lower().strip()
            if len(kw_clean) > 2 and kw_clean not in _BROWSE_STOPWORDS:
                keyword_freq[kw_clean] += 1
    # Top 25 keywords become the selectable topics (must appear in ≥2 signals)
    sorted_kws = sorted(keyword_freq.items(), key=lambda x: x[1], reverse=True)
    top_topics: list[str] = [kw for kw, freq in sorted_kws[:25] if freq >= 2]
    def _primary_topic(sig: dict) -> str:
        """Return the highest-ranked top-topic that this signal's keywords contain."""
        raw_kws = sig.get("metadata", {}).get("keywords", "[]")
        try:
            kw_set = {str(k).lower().strip() for k in (json.loads(raw_kws) if isinstance(raw_kws, str) else raw_kws)}
        except Exception:
            kw_set = set()
        for t in top_topics:
            if t in kw_set:
                return t
        return sig.get("metadata", {}).get("type", "other").replace("_", " ")
    def _all_topics(sig: dict) -> list[str]:
        """Return all top-topics that this signal belongs to."""
        raw_kws = sig.get("metadata", {}).get("keywords", "[]")
        try:
            kw_set = {str(k).lower().strip() for k in (json.loads(raw_kws) if isinstance(raw_kws, str) else raw_kws)}
        except Exception:
            kw_set = set()
        matched = [t for t in top_topics if t in kw_set]
        return matched if matched else [sig.get("metadata", {}).get("type", "other").replace("_", " ")]
    # ── Topic-filter (optional) ────────────────────────────────────────────────
    if topic:
        topic_lower = topic.lower()
        filtered = []
        for sig in signals:
            raw_kws = sig.get("metadata", {}).get("keywords", "[]")
            try:
                kws = [str(k).lower().strip() for k in (json.loads(raw_kws) if isinstance(raw_kws, str) else raw_kws)]
            except Exception:
                kws = []
            sig_type = sig.get("metadata", {}).get("type", "").replace("_", " ")
            if topic_lower in kws or topic_lower == sig_type:
                filtered.append(sig)
        signals = filtered
    # ── Build topics summary list ──────────────────────────────────────────────
    # Bucket the current (filtered) signals by their primary topic.
    topic_buckets: dict[str, list] = defaultdict(list)
    for sig in signals:
        primary = _primary_topic(sig)
        topic_buckets[primary].append(sig)
    def _topic_entry(name: str, bucket: list) -> dict:
        latest_ts = max((s.get("metadata", {}).get("timestamp", "") for s in bucket), default="")
        return {
            "name": name,
            "signal_count": len(bucket),
            "latest": latest_ts,
            "sample_signals": [
                s.get("metadata", {}).get("summary", "") or s.get("document", "")
                for s in bucket[:2]
            ],
        }
    topics_summary = []
    seen_topics: set[str] = set()
    for t in top_topics:
        bucket = topic_buckets.get(t, [])
        if bucket and t not in seen_topics:
            seen_topics.add(t)
            topics_summary.append(_topic_entry(t, bucket))
    # Add leftover types as topics, largest buckets first
    for t, bucket in sorted(topic_buckets.items(), key=lambda x: len(x[1]), reverse=True):
        if t not in seen_topics and bucket:
            seen_topics.add(t)
            topics_summary.append(_topic_entry(t, bucket))
    topics_summary.sort(key=lambda e: e["signal_count"], reverse=True)
    # ── Build day-by-day timeline ──────────────────────────────────────────────
    day_buckets: dict[str, list] = defaultdict(list)
    for sig in signals:
        ts = sig.get("metadata", {}).get("timestamp", "")
        date_key = ts[:10] if ts and len(ts) >= 10 else "unknown"
        day_buckets[date_key].append(sig)
    timeline = []
    for date_key in sorted(day_buckets.keys(), reverse=True):
        day_sigs = sorted(
            day_buckets[date_key],
            key=lambda s: s.get("metadata", {}).get("timestamp", ""),
            reverse=True,
        )
        # dict.fromkeys de-duplicates while preserving first-seen order
        day_topics = list(dict.fromkeys(
            t for s in day_sigs for t in _all_topics(s)
        ))
        timeline.append({
            "date": date_key,
            "signals": day_sigs,
            "topics": day_topics[:6],
            "signal_count": len(day_sigs),
        })
    # ── Date range metadata ────────────────────────────────────────────────────
    all_ts = [
        s.get("metadata", {}).get("timestamp", "")
        for s in signals
        if s.get("metadata", {}).get("timestamp", "")
    ]
    date_range = {
        "earliest": min(all_ts) if all_ts else "",
        "latest": max(all_ts) if all_ts else "",
    }
    names = get_group_names()
    return {
        "group_id": group_id,
        "group_name": names.get(group_id, group_id),
        "total_signals": len(signals),
        "date_range": date_range,
        "topics": topics_summary,
        "timeline": timeline,
    }
# ─────────────────────────────────────────────
# Enhanced Chat / Signals Timeline
# ─────────────────────────────────────────────
@app.get("/api/signals/timeline")
async def get_signals_timeline(
    group_id: str | None = None,
    severity: str | None = None,
    lens: str | None = None,
    signal_type: str | None = None,
    date_from: str | None = None,
    date_to: str | None = None,
    limit: int = 200,
):
    """
    Cross-group signal timeline with full filter support.
    Returns signals sorted newest-first, ready for timeline rendering.
    """
    group_ids = [group_id] if group_id else get_group_ids()
    names = get_group_names()
    all_signals = []
    for gid in group_ids:
        for sig in get_all_signals(gid, signal_type=signal_type):
            meta = sig.get("metadata", {})
            ts = meta.get("timestamp", "")
            if severity and meta.get("severity") != severity:
                continue
            if lens and meta.get("lens") != lens:
                continue
            if date_from and ts < date_from:
                continue
            if date_to and ts > date_to:
                continue
            # Exclude internal tracking signals from timeline
            if meta.get("type") in ("jira_raised", "meet_started"):
                continue
            all_signals.append({
                **sig,
                "group_name": names.get(gid, gid),
            })
    all_signals.sort(key=lambda s: s.get("metadata", {}).get("timestamp", ""), reverse=True)
    return {
        "signals": all_signals[:limit],
        "total": len(all_signals),
        "truncated": len(all_signals) > limit,
    }
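# Minimal local-run sketch (assumes `uvicorn` is installed; the host and port
# values are illustrative defaults, not project configuration):
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)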