// JWT utilities for token expiry checking (no signature verification required) use anyhow::{anyhow, bail}; use base64::{Engine, engine::general_purpose::URL_SAFE_NO_PAD}; use std::time::{SystemTime, UNIX_EPOCH}; fn now_secs() -> i64 { SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_secs() as i64 } /// Decode the `exp` field from a JWT payload without signature verification. pub fn jwt_exp(token: &str) -> Result { let parts: Vec<&str> = token.split('.').collect(); if parts.len() < 2 { bail!("Invalid JWT: expected at least 2 dot-separated parts"); } let payload_bytes = URL_SAFE_NO_PAD.decode(parts[1])?; let json: serde_json::Value = serde_json::from_slice(&payload_bytes)?; let exp = json["exp"] .as_i64() .ok_or_else(|| anyhow!("JWT payload missing 'exp' field"))?; Ok(exp) } /// Returns true when the access token expires within 10 seconds /// and the refresh token has not yet expired (with 10-second buffer). /// This mirrors Python's `Token.is_refresh_required`. pub fn is_refresh_required(access: &str, refresh: &str) -> bool { let t = now_secs(); let access_exp = jwt_exp(access).unwrap_or(0); let refresh_exp = jwt_exp(refresh).unwrap_or(0); (t + 10) > access_exp && (t - 10) < refresh_exp } /// Returns true when the refresh token itself has expired (with 10-second buffer). /// This mirrors Python's `Token.is_expired`. pub fn is_expired(refresh: &str) -> bool { let t = now_secs(); let refresh_exp = jwt_exp(refresh).unwrap_or(0); (t - 10) > refresh_exp }