"""Auth decorators that gate routes on a valid JWT and (optionally) role."""

from __future__ import annotations

from collections.abc import Callable
from functools import wraps
from typing import Any

import jwt
from flask import g, jsonify, request

from backend.app.auth.jwt import decode_token
from backend.app.extensions import db
from backend.app.models import User


def _extract_token() -> str | None:
    """Return the bearer credentials from the current request, if any.

    Reads the ``Authorization`` header of the active Flask request and
    returns the part after the ``Bearer`` scheme with surrounding whitespace
    removed.

    Returns:
        The token string, or ``None`` when the header is absent, uses a
        different scheme, or carries no credentials.
    """
    header = request.headers.get("Authorization", "")
    scheme, _, credentials = header.partition(" ")
    # HTTP auth scheme names are case-insensitive, so "bearer"/"BEARER" must
    # be accepted as well as the canonical "Bearer".
    if scheme.lower() != "bearer":
        return None
    return credentials.strip() or None


def login_required(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Require a valid JWT. Populates `g.current_user` with the User row.

    The wrapped view only runs when the request carries a bearer token that
    `decode_token` accepts and whose ``sub`` claim resolves to an existing
    `User`. Otherwise a JSON ``{"error": ...}`` body is returned with
    status 401 and the view is not called.
    """

    @wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        token = _extract_token()
        if not token:
            return jsonify({"error": "Missing or invalid Authorization header"}), 401

        try:
            payload = decode_token(token)
        except jwt.ExpiredSignatureError:
            # Must be caught before PyJWTError so the more specific message
            # is reported for expired tokens.
            return jsonify({"error": "Token expired"}), 401
        except jwt.PyJWTError:
            return jsonify({"error": "Invalid token"}), 401

        try:
            subject = payload["sub"]
            # bool is an int subclass: int(True) == 1 would silently resolve
            # to user id 1, so a boolean subject is rejected explicitly.
            if isinstance(subject, bool):
                raise TypeError("subject claim must not be a boolean")
            user_id = int(subject)
        except (KeyError, TypeError, ValueError, OverflowError):
            # OverflowError covers a non-finite float subject; without it the
            # request would fail with a 500 instead of a 401.
            return jsonify({"error": "Invalid token payload"}), 401

        user = db.session.get(User, user_id)
        if user is None:
            return jsonify({"error": "User no longer exists"}), 401

        g.current_user = user
        return fn(*args, **kwargs)

    return wrapper


def role_required(*roles: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Require the current user to hold one of the given role names.

    Authentication is enforced first via `login_required`; a logged-in user
    whose ``role.value`` is not among ``roles`` gets a JSON
    ``{"error": "Forbidden"}`` body with status 403.

    Args:
        *roles: Accepted role names, compared against ``user.role.value``.

    Returns:
        A decorator to apply to the view function.
    """

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        # Decorators apply bottom-up: login_required wraps the role check so
        # g.current_user is set before it runs, then wraps(fn) restores the
        # view's own metadata on the outermost callable.
        @wraps(fn)
        @login_required
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            user = g.current_user
            if user.role.value not in roles:
                return jsonify({"error": "Forbidden"}), 403
            return fn(*args, **kwargs)

        return wrapper

    return decorator