Files
2026-04-05 00:43:23 +05:30

324 lines
14 KiB
Python

# backend/run.py — THE MAIN ENTRY POINT
import asyncio
import json
import os
import sys
import atexit
# ─── PID lock — prevents two copies of this process running simultaneously ───
import tempfile
_PID_FILE = os.path.join(tempfile.gettempdir(), "negot8_bots.pid")
def _acquire_pid_lock():
"""Write our PID to the lock file. If a previous PID is still alive, kill it first."""
if os.path.exists(_PID_FILE):
try:
old_pid = int(open(_PID_FILE).read().strip())
os.kill(old_pid, 9) # kill the old copy unconditionally
print(f"🔫 Killed previous bot process (PID {old_pid})")
except (ValueError, ProcessLookupError, PermissionError):
pass # already dead — no problem
with open(_PID_FILE, "w") as f:
f.write(str(os.getpid()))
atexit.register(lambda: os.path.exists(_PID_FILE) and os.remove(_PID_FILE))
_acquire_pid_lock()
from telegram.ext import Application, CommandHandler, ConversationHandler, MessageHandler, CallbackQueryHandler, filters
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from agents.personal_agent import PersonalAgent
from agents.negotiation import run_negotiation
from voice.elevenlabs_tts import generate_voice_summary, build_voice_text
from tools.upi_generator import UPIGeneratorTool
import database as db
from config import *
# ─── Socket.IO emitters (optional — only active when api.py is co-running) ───
try:
from api import emit_round_update, emit_negotiation_started, emit_negotiation_resolved
_sio_available = True
except ImportError:
_sio_available = False
async def emit_round_update(*args, **kwargs): pass
async def emit_negotiation_started(*args, **kwargs): pass
async def emit_negotiation_resolved(*args, **kwargs): pass
# ─── Blockchain (Polygon Amoy) ────────────────────────────────────────────────
try:
from blockchain_web3.blockchain import register_agreement_on_chain
_blockchain_available = True
except Exception as _bc_err:
print(f"⚠️ Blockchain module unavailable: {_bc_err}")
_blockchain_available = False
async def register_agreement_on_chain(*args, **kwargs):
return {"success": False, "mock": True, "tx_hash": "0xUNAVAILABLE",
"block_number": 0, "agreement_hash": "0x0",
"explorer_url": "", "gas_used": 0}
personal_agent = PersonalAgent()
upi_tool = UPIGeneratorTool()
pending_coordinations = {}
bot_apps = {}
async def send_to_user(bot, user_id, text, reply_markup=None):
"""Send a message to any user."""
try:
await bot.send_message(chat_id=user_id, text=text, parse_mode="Markdown", reply_markup=reply_markup)
except Exception as e:
print(f"Failed to send to {user_id}: {e}")
async def send_voice_to_user(bot, user_id, audio_path):
"""Send voice note to user."""
try:
with open(audio_path, "rb") as f:
await bot.send_voice(chat_id=user_id, voice=f, caption="🎙 Voice summary from your agent")
except Exception as e:
print(f"Failed to send voice to {user_id}: {e}")
# ─── Resolution handler with voice + UPI ───
async def handle_resolution(negotiation_id, resolution, feature_type,
user_a_id, user_b_id, bot_a, bot_b,
preferences_a, preferences_b):
"""Post-resolution: generate UPI link, voice summary, analytics, send to users."""
status = resolution["status"]
proposal = resolution.get("final_proposal", {})
emoji = "" if status == "resolved" else "⚠️"
summary_text = (
f"{emoji} *Negotiation {'Complete' if status == 'resolved' else 'Needs Input'}!*\n\n"
f"📊 Resolved in {resolution['rounds_taken']} rounds\n\n"
f"📋 *Agreement:*\n{proposal.get('summary', 'See details')}\n\n"
f"*For A:* {proposal.get('for_party_a', 'See details')}\n"
f"*For B:* {proposal.get('for_party_b', 'See details')}"
)
# ─── UPI link (for expense-related features) ───
upi_markup = None
if feature_type in ("expenses", "freelance", "marketplace", "roommate"):
# Try to extract settlement from proposal details
details = proposal.get("details", {})
settlement = details.get("settlement", {})
upi_id = (preferences_a.get("raw_details", {}).get("upi_id") or
preferences_b.get("raw_details", {}).get("upi_id"))
if upi_id and settlement.get("amount"):
upi_result = await upi_tool.execute(
payee_upi=upi_id,
payee_name=settlement.get("payee_name", "User"),
amount=float(settlement["amount"]),
note=f"negoT8: {feature_type} settlement"
)
upi_link = upi_result["upi_link"]
summary_text += f"\n\n💳 *Tap to pay:* ₹{settlement['amount']:,.0f}"
upi_markup = InlineKeyboardMarkup([
[InlineKeyboardButton(
f"💳 Pay ₹{settlement['amount']:,.0f}",
url=upi_link
)]
])
# ─── Send text summary to both ───
await send_to_user(bot_a, user_a_id, summary_text, reply_markup=upi_markup)
await send_to_user(bot_b, user_b_id, summary_text, reply_markup=upi_markup)
# ─── Voice summary ───
voice_text = build_voice_text(feature_type, {
"rounds": resolution["rounds_taken"],
"summary": proposal.get("summary", "resolved"),
**proposal.get("details", {}),
**{k: v for k, v in proposal.items() if k != "details"}
})
voice_path = await generate_voice_summary(voice_text, negotiation_id, VOICE_ID_AGENT_A)
if voice_path:
await send_voice_to_user(bot_a, user_a_id, voice_path)
# Generate with different voice for User B
voice_path_b = await generate_voice_summary(voice_text, f"{negotiation_id}_b", VOICE_ID_AGENT_B)
if voice_path_b:
await send_voice_to_user(bot_b, user_b_id, voice_path_b)
# ─── Compute & store analytics ───
timeline = resolution.get("satisfaction_timeline", [])
concession_log = []
rounds = await db.get_rounds(negotiation_id)
for r in rounds:
concessions = json.loads(r["concessions_made"]) if r["concessions_made"] else []
for c in concessions:
concession_log.append({"round": r["round_number"], "by": "A" if r["proposer_id"] == user_a_id else "B", "gave_up": c})
final_sat_a = timeline[-1]["score_a"] if timeline else 50
final_sat_b = timeline[-1]["score_b"] if timeline else 50
fairness = 100 - abs(final_sat_a - final_sat_b)
await db.store_analytics({
"negotiation_id": negotiation_id,
"satisfaction_timeline": json.dumps(timeline),
"concession_log": json.dumps(concession_log),
"fairness_score": fairness,
"total_concessions_a": sum(1 for c in concession_log if c["by"] == "A"),
"total_concessions_b": sum(1 for c in concession_log if c["by"] == "B"),
})
# ─── Register agreement on Polygon Amoy (invisible to user) ──────────────
blockchain_text = ""
if status == "resolved":
try:
blockchain_result = await register_agreement_on_chain(
negotiation_id = negotiation_id,
feature_type = feature_type,
summary = proposal.get("summary", "Agreement reached"),
resolution_data = resolution,
)
await db.store_blockchain_proof(
negotiation_id = negotiation_id,
tx_hash = blockchain_result["tx_hash"],
block_number = blockchain_result.get("block_number", 0),
agreement_hash = blockchain_result["agreement_hash"],
explorer_url = blockchain_result["explorer_url"],
gas_used = blockchain_result.get("gas_used", 0),
)
if blockchain_result.get("success") and not blockchain_result.get("mock"):
blockchain_text = (
f"\n\n🔗 *Verified on Blockchain*\n"
f"This agreement is permanently recorded on Polygon\\.\n"
f"[View Proof on PolygonScan]({blockchain_result['explorer_url']})"
)
else:
blockchain_text = "\n\n🔗 _Blockchain proof pending\\.\\.\\._"
except Exception as _bc_exc:
print(f"[Blockchain] Non-critical error for {negotiation_id}: {_bc_exc}")
blockchain_text = "\n\n🔗 _Blockchain proof pending\\.\\.\\._"
if blockchain_text:
await send_to_user(bot_a, user_a_id, blockchain_text)
await send_to_user(bot_b, user_b_id, blockchain_text)
# ─── Emit final resolution to dashboard via Socket.IO ───
try:
await emit_negotiation_resolved(negotiation_id, resolution)
except Exception as e:
print(f"[Socket.IO] emit_negotiation_resolved failed (non-critical): {e}")
# ─────────────────────────────────────────────────────────────
# Entry point — starts Bot A + Bot B concurrently
# Per PTB docs (Context7): use context-manager pattern so
# initialize() and shutdown() are always called correctly.
# Source: https://github.com/python-telegram-bot/python-telegram-bot/wiki/Frequently-requested-design-patterns
# ─────────────────────────────────────────────────────────────
async def _reset_telegram_session(name: str, token: str) -> None:
"""
Forcefully clear any stale Telegram polling session for this token.
Steps:
1. deleteWebhook — removes any webhook and drops pending updates
2. getUpdates(offset=-1, timeout=0) — forces the server to close
any open long-poll held by a previously killed process
Both calls are made via asyncio-friendly httpx so we don't block.
"""
import httpx
base = f"https://api.telegram.org/bot{token}"
async with httpx.AsyncClient(timeout=10) as client:
# Step 1 — delete webhook
try:
r = await client.post(f"{base}/deleteWebhook",
json={"drop_pending_updates": True})
desc = r.json().get("description", "ok")
print(f" Bot {name} deleteWebhook: {desc}")
except Exception as e:
print(f" Bot {name} deleteWebhook failed (non-critical): {e}")
# Step 2 — drain pending updates; this causes the Telegram server
# to close any open long-poll connection from a previous process.
try:
r = await client.get(f"{base}/getUpdates",
params={"offset": -1, "timeout": 0,
"limit": 1})
print(f" Bot {name} session drained ✓")
except Exception as e:
print(f" Bot {name} drain failed (non-critical): {e}")
async def _run_single_bot(name: str, app, stop_event: asyncio.Event) -> None:
"""
Run one bot for its full lifecycle using the PTB context-manager pattern.
Context7 source:
async with application: # calls initialize() + shutdown()
await application.start()
await application.updater.start_polling(...)
# … keep alive …
await application.updater.stop()
await application.stop()
"""
from telegram import Update
async with app: # initialize + shutdown
await app.start()
await app.updater.start_polling(
allowed_updates=Update.ALL_TYPES,
drop_pending_updates=True, # ignore stale messages
poll_interval=0.5, # fast reconnect
)
print(f"▶️ Bot {name} polling...")
await stop_event.wait() # block until Ctrl+C
await app.updater.stop()
await app.stop()
print(f" Bot {name} stopped.")
async def run_bots():
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "telegram-bots"))
from bot import create_bot
# ── 1. Database ───────────────────────────────────────────────────────────
await db.init_db()
print("✅ Database initialized")
# ── 2. Collect bot tokens ─────────────────────────────────────────────────
bots_to_run = []
if TELEGRAM_BOT_TOKEN_A:
bots_to_run.append(("A", TELEGRAM_BOT_TOKEN_A))
if TELEGRAM_BOT_TOKEN_B:
bots_to_run.append(("B", TELEGRAM_BOT_TOKEN_B))
if not bots_to_run:
print("❌ No bot tokens found in .env")
return
# ── 3. Reset Telegram sessions ────────────────────────────────────────────
# This clears any long-poll held by a previously killed process.
print("🧹 Resetting Telegram sessions...")
for name, token in bots_to_run:
await _reset_telegram_session(name, token)
# PTB long-poll timeout is 10s — a brief pause lets Telegram's servers
# acknowledge the deleteWebhook + drain before we start polling.
await asyncio.sleep(2)
# ── 4. Signal handler → shared stop event ────────────────────────────────
stop_event = asyncio.Event()
loop = asyncio.get_running_loop()
try:
import signal
loop.add_signal_handler(signal.SIGINT, stop_event.set)
loop.add_signal_handler(signal.SIGTERM, stop_event.set)
except NotImplementedError:
pass # Windows fallback
# ── 5. Build & run all bots concurrently ─────────────────────────────────
apps = [(name, create_bot(token)) for name, token in bots_to_run]
print(f"\n🚀 Starting {len(apps)} bot(s)...\n")
try:
await asyncio.gather(
*[_run_single_bot(name, app, stop_event) for name, app in apps]
)
except (KeyboardInterrupt, asyncio.CancelledError):
stop_event.set()
print("👋 Done.")
if __name__ == "__main__":
asyncio.run(run_bots())