68 lines
2.1 KiB
Python
68 lines
2.1 KiB
Python
|
|
"""Auth decorators that gate routes on a valid JWT and (optionally) role."""
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from collections.abc import Callable
|
||
|
|
from functools import wraps
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
import jwt
|
||
|
|
from flask import g, jsonify, request
|
||
|
|
|
||
|
|
from backend.app.auth.jwt import decode_token
|
||
|
|
from backend.app.extensions import db
|
||
|
|
from backend.app.models import User
|
||
|
|
|
||
|
|
|
||
|
|
def _extract_token() -> str | None:
    """Return the bearer token from the request's ``Authorization`` header.

    The header is expected to look like ``Bearer <token>``. The scheme name
    is compared case-insensitively because HTTP authentication scheme names
    are case-insensitive (RFC 9110, section 11.1), so ``bearer <token>`` is
    accepted as well.

    Returns:
        The token with surrounding whitespace stripped, or ``None`` when the
        header is missing, uses a different scheme, or carries no token.
    """
    header = request.headers.get("Authorization", "")
    # Split on the first space only: everything after it is the credential.
    scheme, _, credentials = header.partition(" ")
    if scheme.lower() != "bearer":
        return None
    # An empty credential (e.g. a bare "Bearer ") is treated as no token.
    return credentials.strip() or None
|
||
|
|
|
||
|
|
|
||
|
|
def login_required(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Decorate a view so it only runs for requests carrying a valid JWT.

    On success the matching ``User`` row is stored on ``g.current_user``
    before the wrapped view is invoked. Every failure short-circuits with a
    JSON ``{"error": ...}`` body and HTTP status 401.
    """

    def _unauthorized(message: str) -> tuple[Any, int]:
        # Single place that shapes the 401 response used by every failure path.
        return jsonify({"error": message}), 401

    @wraps(fn)
    def guarded(*args: Any, **kwargs: Any) -> Any:
        raw_token = _extract_token()
        if not raw_token:
            return _unauthorized("Missing or invalid Authorization header")

        # The expired case is handled first so it gets its own message
        # instead of the generic PyJWTError one.
        try:
            claims = decode_token(raw_token)
        except jwt.ExpiredSignatureError:
            return _unauthorized("Token expired")
        except jwt.PyJWTError:
            return _unauthorized("Invalid token")

        # The "sub" claim must be present and convertible to an integer id.
        try:
            subject_id = int(claims["sub"])
        except (KeyError, TypeError, ValueError):
            return _unauthorized("Invalid token payload")

        account = db.session.get(User, subject_id)
        if account is None:
            return _unauthorized("User no longer exists")

        g.current_user = account
        return fn(*args, **kwargs)

    return guarded
|
||
|
|
|
||
|
|
|
||
|
|
def role_required(*roles: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Build a decorator that admits only users holding one of ``roles``.

    The resulting view is also wrapped by ``login_required``, so
    ``g.current_user`` is set before the role check runs. A user whose
    ``role.value`` is not among ``roles`` receives a JSON
    ``{"error": "Forbidden"}`` body with HTTP status 403.
    """

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        def role_checked(*args: Any, **kwargs: Any) -> Any:
            if g.current_user.role.value in roles:
                return fn(*args, **kwargs)
            return jsonify({"error": "Forbidden"}), 403

        # Authentication wraps the role check; the metadata of ``fn`` is then
        # copied onto the outermost callable.
        return wraps(fn)(login_required(role_checked))

    return decorator
|