# backend/api.py — FastAPI + Socket.IO API server for negoT8 Dashboard (Milestone 6) import json import asyncio from datetime import datetime from fastapi import FastAPI, HTTPException, Request from fastapi.responses import HTMLResponse from fastapi.middleware.cors import CORSMiddleware import socketio import database as db from tools.google_calendar import GoogleCalendarTool # ─── Socket.IO server ────────────────────────────────────────────────────────── sio = socketio.AsyncServer( async_mode="asgi", cors_allowed_origins="*", logger=False, engineio_logger=False, ) # ─── FastAPI app ─────────────────────────────────────────────────────────────── app = FastAPI(title="negoT8 API", version="2.0.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ─── Helpers ────────────────────────────────────────────────────────────────── def _row_to_dict(row) -> dict: """Convert an aiosqlite Row or plain dict, parsing JSON string fields.""" if row is None: return {} d = dict(row) for field in ( "preferences", "proposal", "response", "concessions_made", "resolution", "satisfaction_timeline", "concession_log", ): if field in d and isinstance(d[field], str): try: d[field] = json.loads(d[field]) except (json.JSONDecodeError, TypeError): pass return d async def _build_negotiation_detail(negotiation_id: str) -> dict: """Assemble full negotiation object: meta + participants + rounds + analytics.""" import aiosqlite from config import DATABASE_PATH async with aiosqlite.connect(DATABASE_PATH) as conn: conn.row_factory = aiosqlite.Row # Negotiation meta async with conn.execute( "SELECT * FROM negotiations WHERE id = ?", (negotiation_id,) ) as cur: neg = await cur.fetchone() if neg is None: return None neg_dict = _row_to_dict(neg) # Participants (with user display info) import aiosqlite from config import DATABASE_PATH async with aiosqlite.connect(DATABASE_PATH) as conn: conn.row_factory = aiosqlite.Row async with conn.execute( """SELECT p.*, u.username, u.display_name, u.personality, u.voice_id FROM participants p LEFT JOIN users u ON u.telegram_id = p.user_id WHERE p.negotiation_id = ?""", (negotiation_id,), ) as cur: participants = [_row_to_dict(r) for r in await cur.fetchall()] # Rounds async with conn.execute( "SELECT * FROM rounds WHERE negotiation_id = ? ORDER BY round_number", (negotiation_id,), ) as cur: rounds = [_row_to_dict(r) for r in await cur.fetchall()] # Analytics async with conn.execute( "SELECT * FROM negotiation_analytics WHERE negotiation_id = ?", (negotiation_id,), ) as cur: analytics_row = await cur.fetchone() analytics = _row_to_dict(analytics_row) if analytics_row else {} if analytics.get("satisfaction_timeline") and isinstance( analytics["satisfaction_timeline"], str ): try: analytics["satisfaction_timeline"] = json.loads( analytics["satisfaction_timeline"] ) except Exception: analytics["satisfaction_timeline"] = [] if analytics.get("concession_log") and isinstance( analytics["concession_log"], str ): try: analytics["concession_log"] = json.loads(analytics["concession_log"]) except Exception: analytics["concession_log"] = [] return { **neg_dict, "participants": participants, "rounds": rounds, "analytics": analytics, } # ─── REST Endpoints ──────────────────────────────────────────────────────────── @app.get("/") async def root(): return {"status": "ok", "message": "negoT8 API v2 running"} # ─── Google Calendar OAuth Callback ─────────────────────────────────────────── @app.get("/api/auth/google/callback", response_class=HTMLResponse) async def google_calendar_callback(request: Request): """ Handles the redirect from Google after the user authorises calendar access. - `code` — OAuth authorisation code from Google - `state` — the user's telegram_id (set when building the auth URL) """ params = dict(request.query_params) code = params.get("code") state = params.get("state") # telegram_id if not code or not state: return HTMLResponse( "

❌ Missing code or state. Please try /connectcalendar again.

", status_code=400, ) try: telegram_id = int(state) except ValueError: return HTMLResponse("

❌ Invalid state parameter.

", status_code=400) cal = GoogleCalendarTool() success = await cal.exchange_code(telegram_id, code) # Notify the user in Telegram (best-effort via direct Bot API call) try: import httpx from config import TELEGRAM_BOT_TOKEN_A, TELEGRAM_BOT_TOKEN_B msg = ( "✅ *Google Calendar connected!*\n\n" "Your agent will now automatically use your real availability " "when scheduling meetings — no need to mention times manually.\n\n" "_Read-only access. Revoke anytime from myaccount.google.com → Security → Third-party apps._" if success else "❌ Failed to connect Google Calendar. Please try /connectcalendar again." ) async with httpx.AsyncClient() as client: for token in (TELEGRAM_BOT_TOKEN_A, TELEGRAM_BOT_TOKEN_B): if not token: continue resp = await client.post( f"https://api.telegram.org/bot{token}/sendMessage", json={"chat_id": telegram_id, "text": msg, "parse_mode": "Markdown"}, timeout=8.0, ) if resp.status_code == 200: break except Exception as e: print(f"[OAuth] Could not send Telegram confirmation: {e}") if success: return HTMLResponse("""

✅ Google Calendar Connected!

You can close this tab and return to Telegram.

negoT8 now has read-only access to your calendar.

""") else: return HTMLResponse("""

❌ Connection Failed

Please go back to Telegram and try /connectcalendar again.

""", status_code=500) @app.get("/api/negotiations") async def list_negotiations(): """Return all negotiations with participant count and latest status.""" import aiosqlite from config import DATABASE_PATH async with aiosqlite.connect(DATABASE_PATH) as conn: conn.row_factory = aiosqlite.Row async with conn.execute( """SELECT n.*, COUNT(p.user_id) AS participant_count FROM negotiations n LEFT JOIN participants p ON p.negotiation_id = n.id GROUP BY n.id ORDER BY n.created_at DESC""" ) as cur: rows = await cur.fetchall() negotiations = [] for row in rows: d = _row_to_dict(row) # Lightweight — don't embed full rounds here negotiations.append(d) return {"negotiations": negotiations, "total": len(negotiations)} @app.get("/api/negotiations/{negotiation_id}") async def get_negotiation(negotiation_id: str): """Return full negotiation detail: meta + participants + rounds + analytics.""" detail = await _build_negotiation_detail(negotiation_id) if detail is None: raise HTTPException(status_code=404, detail=f"Negotiation '{negotiation_id}' not found") return detail @app.get("/api/negotiations/{negotiation_id}/rounds") async def get_negotiation_rounds(negotiation_id: str): """Return just the rounds for a negotiation (useful for live updates).""" rounds = await db.get_rounds(negotiation_id) parsed = [_row_to_dict(r) for r in rounds] return {"negotiation_id": negotiation_id, "rounds": parsed, "count": len(parsed)} @app.get("/api/negotiations/{negotiation_id}/analytics") async def get_negotiation_analytics(negotiation_id: str): """Return analytics for a negotiation.""" analytics = await db.get_analytics(negotiation_id) if analytics is None: raise HTTPException(status_code=404, detail="Analytics not yet available for this negotiation") # Parse JSON strings for field in ("satisfaction_timeline", "concession_log"): if isinstance(analytics.get(field), str): try: analytics[field] = json.loads(analytics[field]) except Exception: analytics[field] = [] return analytics @app.get("/api/users/{telegram_id}") async def get_user(telegram_id: int): """Return a single user by Telegram ID.""" user = await db.get_user(telegram_id) if user is None: raise HTTPException(status_code=404, detail="User not found") return dict(user) @app.get("/api/stats") async def get_stats(): """High-level stats for the dashboard overview page.""" import aiosqlite from config import DATABASE_PATH async with aiosqlite.connect(DATABASE_PATH) as conn: conn.row_factory = aiosqlite.Row async with conn.execute("SELECT COUNT(*) AS c FROM negotiations") as cur: total_neg = (await cur.fetchone())["c"] async with conn.execute( "SELECT COUNT(*) AS c FROM negotiations WHERE status = 'resolved'" ) as cur: resolved = (await cur.fetchone())["c"] async with conn.execute( "SELECT COUNT(*) AS c FROM negotiations WHERE status = 'active'" ) as cur: active = (await cur.fetchone())["c"] async with conn.execute("SELECT COUNT(*) AS c FROM users") as cur: total_users = (await cur.fetchone())["c"] async with conn.execute( "SELECT AVG(fairness_score) AS avg_fs FROM negotiation_analytics" ) as cur: row = await cur.fetchone() avg_fairness = round(row["avg_fs"] or 0, 1) async with conn.execute( """SELECT feature_type, COUNT(*) AS c FROM negotiations GROUP BY feature_type ORDER BY c DESC""" ) as cur: feature_breakdown = [dict(r) for r in await cur.fetchall()] return { "total_negotiations": total_neg, "resolved": resolved, "active": active, "escalated": total_neg - resolved - active, "total_users": total_users, "avg_fairness_score": avg_fairness, "feature_breakdown": feature_breakdown, } # ─── Open Contracts REST Endpoints ──────────────────────────────────────────── @app.get("/api/open-contracts") async def list_open_contracts(status: str = "open"): """ Return open contracts (default: status=open). Pass ?status=all to get every contract regardless of status. """ if status == "all": import aiosqlite from config import DATABASE_PATH async with aiosqlite.connect(DATABASE_PATH) as conn: conn.row_factory = aiosqlite.Row async with conn.execute( """SELECT oc.*, u.username AS poster_username, u.display_name AS poster_name, COUNT(ca.id) AS application_count FROM open_contracts oc LEFT JOIN users u ON u.telegram_id = oc.poster_id LEFT JOIN contract_applications ca ON ca.contract_id = oc.id GROUP BY oc.id ORDER BY oc.created_at DESC""" ) as cur: rows = await cur.fetchall() contracts = [] for r in rows: d = dict(r) if isinstance(d.get("requirements"), str): try: import json as _json d["requirements"] = _json.loads(d["requirements"]) except Exception: pass contracts.append(d) else: contracts = await db.get_open_contracts(status=status) return {"contracts": contracts, "total": len(contracts)} @app.get("/api/open-contracts/{contract_id}") async def get_open_contract(contract_id: str): """ Return full detail for a single open contract including ranked applicants. """ contract = await db.get_open_contract(contract_id) if contract is None: raise HTTPException(status_code=404, detail=f"Contract '{contract_id}' not found") applications = await db.get_applications(contract_id) # applications are already sorted by match_score DESC from the DB helper # Parse preferences in each application for app_row in applications: if isinstance(app_row.get("preferences"), str): try: app_row["preferences"] = json.loads(app_row["preferences"]) except Exception: pass return { **contract, "applications": applications, "application_count": len(applications), } # ─── Socket.IO Events ────────────────────────────────────────────────────────── @sio.event async def connect(sid, environ): print(f"[Socket.IO] Client connected: {sid}") @sio.event async def disconnect(sid): print(f"[Socket.IO] Client disconnected: {sid}") @sio.event async def join_negotiation(sid, data): """ Client emits: { negotiation_id: "abc123" } Server joins the socket into a room named after the negotiation_id. Then immediately sends the current state. """ if isinstance(data, str): negotiation_id = data else: negotiation_id = data.get("negotiation_id") or data.get("id", "") if not negotiation_id: await sio.emit("error", {"message": "negotiation_id required"}, to=sid) return await sio.enter_room(sid, negotiation_id) print(f"[Socket.IO] {sid} joined room: {negotiation_id}") # Send current state immediately detail = await _build_negotiation_detail(negotiation_id) if detail: await sio.emit("negotiation_state", detail, to=sid) else: await sio.emit("error", {"message": f"Negotiation '{negotiation_id}' not found"}, to=sid) @sio.event async def leave_negotiation(sid, data): """Client emits: { negotiation_id: "abc123" }""" if isinstance(data, str): negotiation_id = data else: negotiation_id = data.get("negotiation_id", "") if negotiation_id: await sio.leave_room(sid, negotiation_id) print(f"[Socket.IO] {sid} left room: {negotiation_id}") # ─── Socket.IO emit helpers (called from run.py / negotiation engine) ────────── async def emit_round_update(negotiation_id: str, round_data: dict): """ Called by the negotiation engine after each round completes. Broadcasts to all dashboard clients watching this negotiation. """ await sio.emit( "round_update", { "negotiation_id": negotiation_id, "round": round_data, "timestamp": datetime.utcnow().isoformat(), }, room=negotiation_id, ) async def emit_negotiation_started(negotiation_id: str, feature_type: str, participants: list): """Broadcast when a new negotiation kicks off.""" await sio.emit( "negotiation_started", { "negotiation_id": negotiation_id, "feature_type": feature_type, "participants": participants, "timestamp": datetime.utcnow().isoformat(), }, room=negotiation_id, ) async def emit_negotiation_resolved(negotiation_id: str, resolution: dict): """Broadcast the final resolution to all watchers.""" await sio.emit( "negotiation_resolved", { "negotiation_id": negotiation_id, "resolution": resolution, "timestamp": datetime.utcnow().isoformat(), }, room=negotiation_id, ) # ─── ASGI app (wraps FastAPI with Socket.IO) ─────────────────────────────────── # This is what uvicorn runs — it combines both the REST API and the WS server. socket_app = socketio.ASGIApp(sio, other_asgi_app=app)