import json
import uuid
from datetime import datetime
from typing import Optional

import aiosqlite

from config import DATABASE_PATH


# ─── Internal helpers ────────────────────────────────────────────────────────

def _decode_json_field(record: dict, key: str) -> dict:
    """Replace ``record[key]`` with its parsed JSON value, best-effort.

    Only string values are decoded. Text that is not valid JSON is left
    untouched so that one malformed row cannot break a whole listing.
    Returns the same (mutated) ``record`` for convenient chaining.
    """
    raw = record.get(key)
    if isinstance(raw, str):
        try:
            record[key] = json.loads(raw)
        except ValueError:
            # json.JSONDecodeError subclasses ValueError; keep the raw text.
            pass
    return record


def _as_json_text(value):
    """Return ``value`` as JSON text suitable for a TEXT column.

    Strings (assumed to be JSON already) and ``None`` pass through unchanged;
    anything else (list, dict, ...) is serialized with ``json.dumps``.
    """
    if value is None or isinstance(value, str):
        return value
    return json.dumps(value)


# ─── Schema ──────────────────────────────────────────────────────────────────

async def init_db():
    """Create every table used by the application if it does not exist yet."""
    async with aiosqlite.connect(DATABASE_PATH) as db:
        await db.execute("""
            CREATE TABLE IF NOT EXISTS users (
                telegram_id INTEGER PRIMARY KEY,
                username TEXT,
                display_name TEXT,
                personality TEXT DEFAULT 'balanced',
                voice_id TEXT DEFAULT 'pNInz6obpgDQGcFmaJgB',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        await db.execute("""
            CREATE TABLE IF NOT EXISTS negotiations (
                id TEXT PRIMARY KEY,
                feature_type TEXT,
                status TEXT DEFAULT 'pending',
                initiator_id INTEGER,
                resolution TEXT,
                voice_summary_file TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        await db.execute("""
            CREATE TABLE IF NOT EXISTS participants (
                negotiation_id TEXT REFERENCES negotiations(id),
                user_id INTEGER,
                preferences TEXT,
                personality_used TEXT DEFAULT 'balanced',
                PRIMARY KEY (negotiation_id, user_id)
            )
        """)
        await db.execute("""
            CREATE TABLE IF NOT EXISTS rounds (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                negotiation_id TEXT REFERENCES negotiations(id),
                round_number INTEGER,
                proposer_id INTEGER,
                proposal TEXT,
                response_type TEXT,
                response TEXT,
                reasoning TEXT,
                satisfaction_a REAL,
                satisfaction_b REAL,
                concessions_made TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        await db.execute("""
            CREATE TABLE IF NOT EXISTS tool_calls (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                negotiation_id TEXT REFERENCES negotiations(id),
                tool_name TEXT,
                tool_input TEXT,
                tool_output TEXT,
                called_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        await db.execute("""
            CREATE TABLE IF NOT EXISTS negotiation_analytics (
                negotiation_id TEXT PRIMARY KEY REFERENCES negotiations(id),
                satisfaction_timeline TEXT,
                concession_log TEXT,
                fairness_score REAL,
                total_concessions_a INTEGER DEFAULT 0,
                total_concessions_b INTEGER DEFAULT 0,
                computed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        await db.execute("""
            CREATE TABLE IF NOT EXISTS user_calendar_tokens (
                telegram_id INTEGER PRIMARY KEY,
                token_json TEXT NOT NULL,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        await db.execute("""
            CREATE TABLE IF NOT EXISTS blockchain_proofs (
                negotiation_id TEXT PRIMARY KEY REFERENCES negotiations(id),
                tx_hash TEXT,
                block_number INTEGER,
                agreement_hash TEXT,
                explorer_url TEXT,
                gas_used INTEGER DEFAULT 0,
                recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        # ─── Open Contracts (public marketplace) ─────────────────────────────────
        await db.execute("""
            CREATE TABLE IF NOT EXISTS open_contracts (
                id TEXT PRIMARY KEY,
                poster_id INTEGER NOT NULL,
                contract_type TEXT NOT NULL,
                title TEXT NOT NULL,
                description TEXT NOT NULL,
                requirements TEXT,
                status TEXT DEFAULT 'open',
                matched_applicant_id INTEGER,
                negotiation_id TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        await db.execute("""
            CREATE TABLE IF NOT EXISTS contract_applications (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                contract_id TEXT REFERENCES open_contracts(id),
                applicant_id INTEGER NOT NULL,
                preferences TEXT,
                match_score REAL DEFAULT 0,
                match_reasoning TEXT DEFAULT '',
                status TEXT DEFAULT 'pending',
                applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        await db.commit()
    print("✅ Database initialized (v5 schema — open contracts + applications added)")


# ─── User helpers ───────────────────────────────────────────────────────────

async def create_user(telegram_id: int, username: str, display_name: str):
    """Insert a user row unless one with this ``telegram_id`` already exists.

    Falsy ``username`` / ``display_name`` values are stored as empty strings.
    """
    async with aiosqlite.connect(DATABASE_PATH) as db:
        await db.execute(
            """INSERT OR IGNORE INTO users (telegram_id, username, display_name)
               VALUES (?, ?, ?)""",
            (telegram_id, username or "", display_name or ""),
        )
        await db.commit()


async def get_user(telegram_id: int):
    """Return the ``users`` row (an ``aiosqlite.Row``) for the id, or None."""
    async with aiosqlite.connect(DATABASE_PATH) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            "SELECT * FROM users WHERE telegram_id = ?", (telegram_id,)
        ) as cursor:
            return await cursor.fetchone()


async def update_user_personality(telegram_id: int, personality: str):
    """Set the stored ``personality`` of the given user."""
    async with aiosqlite.connect(DATABASE_PATH) as db:
        await db.execute(
            "UPDATE users SET personality = ? WHERE telegram_id = ?",
            (personality, telegram_id),
        )
        await db.commit()


# ─── Negotiation helpers ─────────────────────────────────────────────────────

async def create_negotiation(feature_type: str, initiator_id: int) -> str:
    """Insert a new ``pending`` negotiation and return its 8-character id.

    The id is the first 8 characters of a random UUID4.
    """
    neg_id = str(uuid.uuid4())[:8]
    async with aiosqlite.connect(DATABASE_PATH) as db:
        await db.execute(
            """INSERT INTO negotiations (id, feature_type, initiator_id, status)
               VALUES (?, ?, ?, 'pending')""",
            (neg_id, feature_type, initiator_id),
        )
        await db.commit()
    return neg_id


async def add_participant(
    negotiation_id: str,
    user_id: int,
    preferences: dict,
    personality_used: str = "balanced",
):
    """Insert or replace a participant row; ``preferences`` is stored as JSON."""
    async with aiosqlite.connect(DATABASE_PATH) as db:
        await db.execute(
            """INSERT OR REPLACE INTO participants
               (negotiation_id, user_id, preferences, personality_used)
               VALUES (?, ?, ?, ?)""",
            (negotiation_id, user_id, json.dumps(preferences), personality_used),
        )
        await db.commit()


async def get_participants(negotiation_id: str):
    """Return all participant rows of a negotiation as a list of dicts.

    ``preferences`` is returned as stored (JSON text), not decoded.
    """
    async with aiosqlite.connect(DATABASE_PATH) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            "SELECT * FROM participants WHERE negotiation_id = ?", (negotiation_id,)
        ) as cursor:
            rows = await cursor.fetchall()
    return [dict(r) for r in rows]


async def save_round(
    negotiation_id: str,
    round_number: int,
    proposer_id: int,
    proposal: dict,
    response_type: Optional[str] = None,
    response: Optional[dict] = None,
    reasoning: Optional[str] = None,
    satisfaction_a: Optional[float] = None,
    satisfaction_b: Optional[float] = None,
    concessions_made: Optional[list] = None,
):
    """Append one negotiation round to the ``rounds`` table.

    ``proposal`` and ``concessions_made`` are stored as JSON text (a missing
    ``concessions_made`` becomes ``[]``). A falsy ``response`` is stored as
    NULL rather than as JSON.
    """
    async with aiosqlite.connect(DATABASE_PATH) as db:
        await db.execute(
            """INSERT INTO rounds
               (negotiation_id, round_number, proposer_id, proposal,
                response_type, response, reasoning,
                satisfaction_a, satisfaction_b, concessions_made)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                negotiation_id,
                round_number,
                proposer_id,
                json.dumps(proposal),
                response_type,
                json.dumps(response) if response else None,
                reasoning,
                satisfaction_a,
                satisfaction_b,
                json.dumps(concessions_made or []),
            ),
        )
        await db.commit()


async def get_rounds(negotiation_id: str):
    """Return the rounds of a negotiation as dicts, ordered by round number.

    JSON columns are returned as stored (text), not decoded.
    """
    async with aiosqlite.connect(DATABASE_PATH) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            "SELECT * FROM rounds WHERE negotiation_id = ? ORDER BY round_number",
            (negotiation_id,),
        ) as cursor:
            rows = await cursor.fetchall()
    return [dict(r) for r in rows]


async def update_negotiation_status(
    negotiation_id: str,
    status: str,
    resolution: Optional[dict] = None,
    voice_file: Optional[str] = None,
):
    """Set the status, resolution and voice summary file of a negotiation.

    NOTE: ``resolution`` and ``voice_summary_file`` are always overwritten.
    Omitting either argument (or passing a falsy ``resolution``) stores NULL
    and discards any previously saved value, so callers that only want to
    change the status must pass the existing values again.
    """
    async with aiosqlite.connect(DATABASE_PATH) as db:
        await db.execute(
            """UPDATE negotiations
               SET status = ?, resolution = ?, voice_summary_file = ?,
                   updated_at = CURRENT_TIMESTAMP
               WHERE id = ?""",
            (
                status,
                json.dumps(resolution) if resolution else None,
                voice_file,
                negotiation_id,
            ),
        )
        await db.commit()


async def store_analytics(analytics: dict):
    """Insert or replace the analytics row of a negotiation.

    ``analytics`` must contain ``negotiation_id`` (``KeyError`` otherwise);
    all other keys are optional and fall back to empty/zero defaults.
    ``satisfaction_timeline`` and ``concession_log`` may be given either as
    JSON text or as plain lists/dicts, which are serialized here because
    SQLite cannot bind Python containers directly.
    """
    async with aiosqlite.connect(DATABASE_PATH) as db:
        await db.execute(
            """INSERT OR REPLACE INTO negotiation_analytics
               (negotiation_id, satisfaction_timeline, concession_log,
                fairness_score, total_concessions_a, total_concessions_b)
               VALUES (?, ?, ?, ?, ?, ?)""",
            (
                analytics["negotiation_id"],
                _as_json_text(analytics.get("satisfaction_timeline", "[]")),
                _as_json_text(analytics.get("concession_log", "[]")),
                analytics.get("fairness_score", 0),
                analytics.get("total_concessions_a", 0),
                analytics.get("total_concessions_b", 0),
            ),
        )
        await db.commit()


# ─── Google Calendar token helpers ─────────────────────────────────────────

async def save_calendar_token(telegram_id: int, token_json: str):
    """Upsert a user's Google OAuth2 credentials (serialized JSON)."""
    async with aiosqlite.connect(DATABASE_PATH) as db:
        await db.execute(
            """INSERT INTO user_calendar_tokens (telegram_id, token_json, updated_at)
               VALUES (?, ?, CURRENT_TIMESTAMP)
               ON CONFLICT(telegram_id) DO UPDATE SET
                   token_json = excluded.token_json,
                   updated_at = CURRENT_TIMESTAMP""",
            (telegram_id, token_json),
        )
        await db.commit()


async def get_calendar_token(telegram_id: int) -> Optional[str]:
    """Return stored token JSON for a user, or None if not connected."""
    async with aiosqlite.connect(DATABASE_PATH) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            "SELECT token_json FROM user_calendar_tokens WHERE telegram_id = ?",
            (telegram_id,),
        ) as cursor:
            row = await cursor.fetchone()
    return row["token_json"] if row else None


async def get_analytics(negotiation_id: str):
    """Return the analytics row of a negotiation as a dict, or None."""
    async with aiosqlite.connect(DATABASE_PATH) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            "SELECT * FROM negotiation_analytics WHERE negotiation_id = ?",
            (negotiation_id,),
        ) as cursor:
            row = await cursor.fetchone()
    return dict(row) if row else None


# ─── Blockchain proof helpers ────────────────────────────────────────────────

async def store_blockchain_proof(
    negotiation_id: str,
    tx_hash: str,
    block_number: int,
    agreement_hash: str,
    explorer_url: str,
    gas_used: int = 0,
):
    """Insert or replace the blockchain proof recorded for a negotiation."""
    async with aiosqlite.connect(DATABASE_PATH) as db:
        await db.execute(
            """INSERT OR REPLACE INTO blockchain_proofs
               (negotiation_id, tx_hash, block_number, agreement_hash,
                explorer_url, gas_used)
               VALUES (?, ?, ?, ?, ?, ?)""",
            (negotiation_id, tx_hash, block_number, agreement_hash,
             explorer_url, gas_used),
        )
        await db.commit()


async def get_blockchain_proof(negotiation_id: str):
    """Return the blockchain proof row of a negotiation as a dict, or None."""
    async with aiosqlite.connect(DATABASE_PATH) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            "SELECT * FROM blockchain_proofs WHERE negotiation_id = ?",
            (negotiation_id,),
        ) as cursor:
            row = await cursor.fetchone()
    return dict(row) if row else None


async def get_negotiation(negotiation_id: str):
    """Return a single negotiation row by ID, or None if not found."""
    async with aiosqlite.connect(DATABASE_PATH) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            "SELECT * FROM negotiations WHERE id = ?",
            (negotiation_id,),
        ) as cursor:
            row = await cursor.fetchone()
    return dict(row) if row else None


async def get_latest_negotiation_for_user(telegram_id: int):
    """Return the id of the user's most recent resolved negotiation, or None.

    A negotiation counts for the user when they have a row in
    ``participants`` for it; recency is decided by ``negotiations.created_at``.
    """
    async with aiosqlite.connect(DATABASE_PATH) as db:
        db.row_factory = aiosqlite.Row
        # Match on participation (not on initiator_id) and keep only the newest
        # negotiation whose status is 'resolved'.
        async with db.execute(
            """SELECT n.id
               FROM negotiations n
               JOIN participants p ON p.negotiation_id = n.id
               WHERE p.user_id = ? AND n.status = 'resolved'
               ORDER BY n.created_at DESC
               LIMIT 1""",
            (telegram_id,),
        ) as cursor:
            row = await cursor.fetchone()
    return row["id"] if row else None


# ─── Open Contracts helpers ──────────────────────────────────────────────────

async def create_open_contract(
    poster_id: int,
    contract_type: str,
    title: str,
    description: str,
    requirements: dict,
) -> str:
    """Create a new open contract. Returns the 8-char UUID id.

    ``requirements`` is stored as JSON text and the status starts as 'open'.
    """
    contract_id = str(uuid.uuid4())[:8]
    async with aiosqlite.connect(DATABASE_PATH) as db:
        await db.execute(
            """INSERT INTO open_contracts
               (id, poster_id, contract_type, title, description,
                requirements, status)
               VALUES (?, ?, ?, ?, ?, ?, 'open')""",
            (contract_id, poster_id, contract_type, title, description,
             json.dumps(requirements)),
        )
        await db.commit()
    return contract_id


async def get_open_contracts(status: str = "open") -> list:
    """Return all contracts with the given status, newest first.

    Each dict also carries ``poster_username``, ``poster_name`` and
    ``application_count``; ``requirements`` is decoded from JSON when possible.
    """
    async with aiosqlite.connect(DATABASE_PATH) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            """SELECT oc.*,
                      u.username AS poster_username,
                      u.display_name AS poster_name,
                      COUNT(ca.id) AS application_count
               FROM open_contracts oc
               LEFT JOIN users u ON u.telegram_id = oc.poster_id
               LEFT JOIN contract_applications ca ON ca.contract_id = oc.id
               WHERE oc.status = ?
               GROUP BY oc.id
               ORDER BY oc.created_at DESC""",
            (status,),
        ) as cur:
            rows = await cur.fetchall()
    return [_decode_json_field(dict(r), "requirements") for r in rows]


async def get_open_contract(contract_id: str):
    """Return a single open contract by ID, or None.

    The dict also carries ``poster_username`` and ``poster_name``;
    ``requirements`` is decoded from JSON when possible.
    """
    async with aiosqlite.connect(DATABASE_PATH) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            """SELECT oc.*,
                      u.username AS poster_username,
                      u.display_name AS poster_name
               FROM open_contracts oc
               LEFT JOIN users u ON u.telegram_id = oc.poster_id
               WHERE oc.id = ?""",
            (contract_id,),
        ) as cur:
            row = await cur.fetchone()
    if not row:
        return None
    return _decode_json_field(dict(row), "requirements")


async def add_application(contract_id: str, applicant_id: int, preferences: dict) -> int:
    """Add an application to an open contract. Returns the new application row id."""
    async with aiosqlite.connect(DATABASE_PATH) as db:
        cursor = await db.execute(
            """INSERT INTO contract_applications
               (contract_id, applicant_id, preferences, status)
               VALUES (?, ?, ?, 'pending')""",
            (contract_id, applicant_id, json.dumps(preferences)),
        )
        await db.commit()
        return cursor.lastrowid


async def get_applications(contract_id: str) -> list:
    """Return all applications for a contract, sorted by match_score descending.

    Each dict also carries the applicant's ``username`` and ``display_name``;
    ``preferences`` is decoded from JSON when possible.
    """
    async with aiosqlite.connect(DATABASE_PATH) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            """SELECT ca.*, u.username, u.display_name
               FROM contract_applications ca
               LEFT JOIN users u ON u.telegram_id = ca.applicant_id
               WHERE ca.contract_id = ?
               ORDER BY ca.match_score DESC""",
            (contract_id,),
        ) as cur:
            rows = await cur.fetchall()
    return [_decode_json_field(dict(r), "preferences") for r in rows]


async def update_application_match_score(app_id: int, score: float, reasoning: str):
    """Update the AI match score + reasoning for an application row."""
    async with aiosqlite.connect(DATABASE_PATH) as db:
        await db.execute(
            """UPDATE contract_applications
               SET match_score = ?, match_reasoning = ?
               WHERE id = ?""",
            (score, reasoning, app_id),
        )
        await db.commit()


async def claim_contract(contract_id: str, applicant_id: int, negotiation_id: str):
    """Mark a contract as 'negotiating', lock in the matched applicant, and link the negotiation.

    The chosen applicant's applications become 'selected' and every other
    application for the contract becomes 'rejected'. All three updates are
    committed together.
    """
    async with aiosqlite.connect(DATABASE_PATH) as db:
        await db.execute(
            """UPDATE open_contracts
               SET status = 'negotiating',
                   matched_applicant_id = ?,
                   negotiation_id = ?,
                   updated_at = CURRENT_TIMESTAMP
               WHERE id = ?""",
            (applicant_id, negotiation_id, contract_id),
        )
        await db.execute(
            "UPDATE contract_applications SET status = 'selected' "
            "WHERE contract_id = ? AND applicant_id = ?",
            (contract_id, applicant_id),
        )
        await db.execute(
            "UPDATE contract_applications SET status = 'rejected' "
            "WHERE contract_id = ? AND applicant_id != ?",
            (contract_id, applicant_id),
        )
        await db.commit()


async def close_contract(contract_id: str):
    """Mark a contract as resolved/closed after negotiation completes."""
    async with aiosqlite.connect(DATABASE_PATH) as db:
        await db.execute(
            "UPDATE open_contracts SET status = 'resolved', "
            "updated_at = CURRENT_TIMESTAMP WHERE id = ?",
            (contract_id,),
        )
        await db.commit()


async def get_contracts_by_poster(poster_id: int) -> list:
    """Return every contract created by a given poster, newest first.

    Contracts of any status are included. Each dict also carries
    ``application_count`` and ``best_score`` (the highest ``match_score``
    among its applications, NULL/None when there are none);
    ``requirements`` is decoded from JSON when possible.
    """
    async with aiosqlite.connect(DATABASE_PATH) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            """SELECT oc.*,
                      COUNT(ca.id) AS application_count,
                      MAX(ca.match_score) AS best_score
               FROM open_contracts oc
               LEFT JOIN contract_applications ca ON ca.contract_id = oc.id
               WHERE oc.poster_id = ?
               GROUP BY oc.id
               ORDER BY oc.created_at DESC""",
            (poster_id,),
        ) as cur:
            rows = await cur.fetchall()
    return [_decode_json_field(dict(r), "requirements") for r in rows]