Files
B.Tech-Project-III/negot8/backend/tools/google_calendar.py
2026-04-05 00:43:23 +05:30

280 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Google Calendar Tool for negoT8.
Allows the scheduling negotiation agent to query a user's real Google Calendar
when they haven't specified available times in their message.
Flow:
1. User runs /connectcalendar in Telegram.
2. Bot sends the OAuth URL (get_oauth_url).
3. User authorises in browser → Google redirects to /api/auth/google/callback.
4. exchange_code() stores the token in the DB.
5. get_free_slots() is called automatically by SchedulingFeature.get_context()
whenever a scheduling negotiation starts without explicit times.
"""
import asyncio
import json
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo
import database as db
from config import (
GOOGLE_CLIENT_ID,
GOOGLE_CLIENT_SECRET,
GOOGLE_REDIRECT_URI,
GOOGLE_CALENDAR_SCOPES,
)
# ── Module-level PKCE verifier store ──────────────────────────────────────
# Maps telegram_id → PKCE code_verifier string awaiting token exchange.
# exchange_code() pops the entry for a user when it starts the exchange and
# pops again in its failure handler, so an entry never outlives one attempt.
# NOTE(review): nothing in the code shown here inserts into this dict —
# get_oauth_url() builds its URL by hand without PKCE — so unless another
# module populates it, exchange_code() never sends a code_verifier.
# TODO confirm whether this store is still needed.
_pending_verifiers: dict[int, str] = {}
def _build_flow():
    """
    Create a google-auth-oauthlib ``Flow`` for the configured web client.

    The client id, secret, redirect URI and scopes all come from ``config``.
    The library import is done lazily inside the function so importing this
    module does not require google-auth-oauthlib to be loaded.
    """
    from google_auth_oauthlib.flow import Flow

    # "web"-type OAuth client settings in the layout from_client_config expects.
    web_client = {
        "client_id": GOOGLE_CLIENT_ID,
        "client_secret": GOOGLE_CLIENT_SECRET,
        "redirect_uris": [GOOGLE_REDIRECT_URI],
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token",
    }
    client_config = {"web": web_client}

    flow = Flow.from_client_config(
        client_config,
        scopes=GOOGLE_CALENDAR_SCOPES,
        redirect_uri=GOOGLE_REDIRECT_URI,
    )
    return flow
class GoogleCalendarTool:
    """
    Google Calendar integration for the scheduling negotiation agent.

    Provides the OAuth connect flow (authorisation URL + code exchange) and
    free-slot discovery backed by the Calendar freeBusy API.
    """

    name = "google_calendar"

    # Local wall-clock hours bounding the window searched for free slots.
    BUSINESS_START_HOUR = 9
    BUSINESS_END_HOUR = 19
    # Maximum number of slot labels returned by a single query.
    MAX_SLOTS = 6
    # How far to move forward after a candidate slot hits a busy interval.
    RETRY_STEP = timedelta(minutes=30)

    # ── OAuth helpers ────────────────────────────────────────────────────────
    async def is_connected(self, user_id: int) -> bool:
        """Return True when an OAuth token is stored in the DB for `user_id`."""
        token_json = await db.get_calendar_token(user_id)
        return token_json is not None

    async def get_oauth_url(self, user_id: int) -> str:
        """
        Build the Google OAuth2 authorisation URL manually — no PKCE.

        Building it by hand avoids the library silently injecting a
        code_challenge that we can't recover during token exchange.
        The user id is carried through the round trip in the `state`
        parameter.
        """
        from urllib.parse import urlencode

        # NOTE(review): `state` is the bare user id, which is guessable. If
        # the callback trusts it to decide whose account receives the token,
        # a forged callback could attach a grant to another user — consider a
        # random, server-stored state value. TODO confirm against the
        # callback handler.
        params = {
            "client_id": GOOGLE_CLIENT_ID,
            "redirect_uri": GOOGLE_REDIRECT_URI,
            "response_type": "code",
            "scope": " ".join(GOOGLE_CALENDAR_SCOPES),
            "access_type": "offline",
            "prompt": "consent",
            "state": str(user_id),
        }
        return "https://accounts.google.com/o/oauth2/auth?" + urlencode(params)

    async def exchange_code(self, user_id: int, code: str) -> bool:
        """
        Exchange the OAuth `code` (received in the callback) for credentials
        and persist them in the DB.

        The request is posted straight to Google's token endpoint. A PKCE
        code_verifier is included only if one is pending for this user in
        `_pending_verifiers`; the URL built by get_oauth_url() does not use
        PKCE, so normally none is sent.

        Returns True on success, False on any HTTP or unexpected error
        (never raises).
        """
        try:
            import httpx

            # Pop up front so the verifier is single-use whatever happens next.
            verifier = _pending_verifiers.pop(user_id, None)
            post_data: dict = {
                "code": code,
                "client_id": GOOGLE_CLIENT_ID,
                "client_secret": GOOGLE_CLIENT_SECRET,
                "redirect_uri": GOOGLE_REDIRECT_URI,
                "grant_type": "authorization_code",
            }
            if verifier:
                post_data["code_verifier"] = verifier

            async with httpx.AsyncClient() as client:
                resp = await client.post(
                    "https://oauth2.googleapis.com/token",
                    data=post_data,
                    timeout=15.0,
                )
            if resp.status_code != 200:
                print(f"[GoogleCalendar] token exchange HTTP {resp.status_code}: {resp.text}")
                return False

            token_data = resp.json()
            expires_in = token_data.get("expires_in", 3600)
            # Stored as a naive-UTC ISO timestamp with a trailing "Z" — the
            # same shape the previous utcnow()-based code produced — but
            # computed without the deprecated datetime.utcnow().
            expiry_utc = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(
                seconds=expires_in
            )
            expiry = expiry_utc.isoformat() + "Z"

            # Credentials-compatible JSON, reloaded later through
            # Credentials.from_authorized_user_info().
            creds_json = json.dumps({
                "token": token_data["access_token"],
                "refresh_token": token_data.get("refresh_token"),
                "token_uri": "https://oauth2.googleapis.com/token",
                "client_id": GOOGLE_CLIENT_ID,
                "client_secret": GOOGLE_CLIENT_SECRET,
                "scopes": GOOGLE_CALENDAR_SCOPES,
                "expiry": expiry,
            })
            await db.save_calendar_token(user_id, creds_json)
            return True
        except Exception as e:
            _pending_verifiers.pop(user_id, None)  # clean up on failure
            print(f"[GoogleCalendar] exchange_code failed for user {user_id}: {e}")
            return False

    # ── Main tool: free slot discovery ──────────────────────────────────────
    async def get_free_slots(
        self,
        user_id: int,
        days_ahead: int = 7,
        duration_minutes: int = 60,
        timezone_str: str = "Asia/Kolkata",
    ) -> list[str]:
        """
        Return up to MAX_SLOTS free time slots for the user over the next
        `days_ahead` days, each exactly `duration_minutes` long.

        Slots lie within business hours (BUSINESS_START_HOUR to
        BUSINESS_END_HOUR, local time in `timezone_str`) and avoid the busy
        intervals reported by the freeBusy API.

        Returns [] if the user hasn't connected their calendar, if
        `days_ahead` or `duration_minutes` is not positive, or on any
        API / network error — never raises, so negotiations always continue.
        """
        # A non-positive window or duration cannot yield a meaningful slot.
        if days_ahead <= 0 or duration_minutes <= 0:
            return []

        token_json = await db.get_calendar_token(user_id)
        if not token_json:
            return []  # user hasn't connected calendar

        try:
            # The Google client is synchronous; run it on the default
            # executor so the event loop is not blocked.
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                None,
                self._sync_get_free_slots,
                token_json,
                user_id,
                days_ahead,
                duration_minutes,
                timezone_str,
            )
        except Exception as e:
            print(f"[GoogleCalendar] get_free_slots failed for user {user_id}: {e}")
            return []

    def _sync_get_free_slots(
        self,
        token_json: str,
        user_id: int,
        days_ahead: int,
        duration_minutes: int,
        timezone_str: str,
    ) -> list[str]:
        """
        Synchronous implementation of get_free_slots (runs in an executor).

        Loads the stored credentials, refreshes them if expired, queries
        freeBusy for the primary calendar and turns the busy intervals into
        free-slot labels. Exceptions propagate to the async caller.
        """
        from google.auth.transport.requests import Request
        from google.oauth2.credentials import Credentials
        from googleapiclient.discovery import build

        # ── Load & refresh credentials ───────────────────────────────────
        creds = Credentials.from_authorized_user_info(
            json.loads(token_json), GOOGLE_CALENDAR_SCOPES
        )
        if creds.expired and creds.refresh_token:
            creds.refresh(Request())
            # Best-effort persistence of the refreshed token. This runs on a
            # worker thread, so the coroutine is driven by a short-lived
            # event loop; asyncio.run() also closes that loop on failure.
            try:
                asyncio.run(db.save_calendar_token(user_id, creds.to_json()))
            except Exception as e:
                # Non-critical: the in-memory credentials are still valid.
                print(
                    f"[GoogleCalendar] could not persist refreshed token "
                    f"for user {user_id}: {e}"
                )

        service = build("calendar", "v3", credentials=creds, cache_discovery=False)

        tz = ZoneInfo(timezone_str)
        now = datetime.now(tz)
        # Start from the next whole hour.
        query_start = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
        query_end = query_start + timedelta(days=days_ahead)

        # ── freeBusy query ───────────────────────────────────────────────
        body = {
            "timeMin": query_start.astimezone(timezone.utc).isoformat(),
            "timeMax": query_end.astimezone(timezone.utc).isoformat(),
            "timeZone": timezone_str,
            "items": [{"id": "primary"}],
        }
        result = service.freebusy().query(body=body).execute()
        busy_intervals = result.get("calendars", {}).get("primary", {}).get("busy", [])

        # Convert busy intervals to (start, end) datetime pairs in local time.
        busy = [
            (
                self._parse_rfc3339(interval["start"]).astimezone(tz),
                self._parse_rfc3339(interval["end"]).astimezone(tz),
            )
            for interval in busy_intervals
        ]

        return self._find_free_slots(
            busy, query_start, query_end, timedelta(minutes=duration_minutes)
        )

    @staticmethod
    def _parse_rfc3339(value: str) -> datetime:
        """Parse an ISO/RFC 3339 timestamp, accepting a trailing 'Z' for UTC."""
        # datetime.fromisoformat() only understands "Z" from Python 3.11 on.
        if value.endswith(("Z", "z")):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)

    @staticmethod
    def _format_slot(start: datetime, end: datetime) -> str:
        """Render a slot label such as 'Mon Apr 6 9:00–10:00 AM'."""
        # Day and 12-hour values are built by hand: the "%-d" / "%-I"
        # strftime flags are a platform extension and fail on Windows.
        start_hour = start.hour % 12 or 12
        end_hour = end.hour % 12 or 12
        return (
            f"{start:%a %b} {start.day} {start_hour}:{start:%M}"
            "\u2013"  # en dash between start and end time
            f"{end_hour}:{end:%M %p}"
        )

    def _find_free_slots(
        self,
        busy: list,
        query_start: datetime,
        query_end: datetime,
        slot_duration: timedelta,
    ) -> list[str]:
        """
        Scan [query_start, query_end] for business-hours slots that do not
        overlap any (start, end) pair in `busy`; return up to MAX_SLOTS labels.
        """
        free_slots: list[str] = []
        cursor = query_start
        while cursor < query_end and len(free_slots) < self.MAX_SLOTS:
            day_start = cursor.replace(
                hour=self.BUSINESS_START_HOUR, minute=0, second=0, microsecond=0
            )
            day_end = cursor.replace(
                hour=self.BUSINESS_END_HOUR, minute=0, second=0, microsecond=0
            )
            # Jump forward to opening time when starting before it.
            if cursor < day_start:
                cursor = day_start

            slot_end = cursor + slot_duration
            if cursor >= day_end or slot_end > day_end:
                # Slot does not fit in today's window: next day, opening time.
                cursor = (cursor + timedelta(days=1)).replace(
                    hour=self.BUSINESS_START_HOUR, minute=0, second=0, microsecond=0
                )
                continue
            if slot_end > query_end:
                # Busy data was only fetched up to query_end; a slot reaching
                # past it cannot be verified as free, and neither can any later.
                break

            # Two intervals overlap when each starts before the other ends.
            conflict = any(
                cursor < b_end and slot_end > b_start for b_start, b_end in busy
            )
            if conflict:
                cursor += self.RETRY_STEP  # try the next block
            else:
                free_slots.append(self._format_slot(cursor, slot_end))
                cursor = slot_end  # advance by one slot (non-overlapping)
        return free_slots