"""Authentication endpoints. `POST /auth/login` returns the access token in the body and sets the refresh token in an HTTPOnly cookie scoped to `/api/v1/auth/`. The cookie is `Secure; SameSite=Strict` and only the matching paths can read it. """ from __future__ import annotations import logging from flask import Blueprint, g, jsonify, make_response, request from pydantic import BaseModel, Field, ValidationError from app.api._validation import Email from app.core.auth_decorators import require_auth from app.core.config import settings from app.core.rate_limit import limiter from app.services import auth as auth_svc bp = Blueprint("auth", __name__, url_prefix="/auth") log = logging.getLogger("metamorph.api.auth") REFRESH_COOKIE_NAME = "metamorph_refresh" REFRESH_COOKIE_PATH = "/api/v1/auth/" class LoginPayload(BaseModel): email: Email password: str = Field(min_length=1) class ChangePasswordPayload(BaseModel): current_password: str = Field(min_length=1) new_password: str = Field(min_length=8) def _set_refresh_cookie(resp, token: str, expires_at) -> None: resp.set_cookie( REFRESH_COOKIE_NAME, token, expires=expires_at, httponly=True, secure=True, # spec §M2; localhost is a secure context for modern browsers samesite="Strict", path=REFRESH_COOKIE_PATH, ) def _clear_refresh_cookie(resp) -> None: resp.set_cookie( REFRESH_COOKIE_NAME, "", expires=0, httponly=True, secure=True, # spec §M2; localhost is a secure context for modern browsers samesite="Strict", path=REFRESH_COOKIE_PATH, ) def _read_refresh_cookie() -> str | None: return request.cookies.get(REFRESH_COOKIE_NAME) def _serialize_pair(pair: auth_svc.TokenPair) -> dict: return { "access_token": pair.access_token, "token_type": "Bearer", "user_id": str(pair.user_id), } @bp.post("/login") @limiter.limit("10 per minute") def login(): try: payload = LoginPayload.model_validate(request.get_json(silent=True) or {}) except ValidationError as e: return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400 try: pair = auth_svc.login(payload.email, payload.password) except auth_svc.InvalidCredentials: return jsonify({"error": "invalid_credentials"}), 401 resp = make_response(jsonify(_serialize_pair(pair))) _set_refresh_cookie(resp, pair.refresh_token, pair.refresh_expires_at) return resp @bp.post("/refresh") @limiter.limit("10 per minute") def refresh_endpoint(): raw = _read_refresh_cookie() if not raw: return jsonify({"error": "no_refresh_cookie"}), 401 try: pair = auth_svc.refresh(raw) except auth_svc.TokenRevoked: resp = make_response(jsonify({"error": "token_revoked"}), 401) _clear_refresh_cookie(resp) return resp except auth_svc.InvalidCredentials: resp = make_response(jsonify({"error": "invalid_refresh"}), 401) _clear_refresh_cookie(resp) return resp resp = make_response(jsonify(_serialize_pair(pair))) _set_refresh_cookie(resp, pair.refresh_token, pair.refresh_expires_at) return resp @bp.post("/logout") def logout(): raw = _read_refresh_cookie() if raw: auth_svc.logout(raw) resp = make_response(jsonify({"ok": True})) _clear_refresh_cookie(resp) return resp @bp.get("/me") @require_auth def me(): u = g.current_user return jsonify( { "id": str(u.id), "email": u.email, "display_name": u.display_name, "locale": u.locale, "is_admin": u.is_admin, "groups": sorted(u.group_names), "permissions": sorted(u.permissions), } ) @bp.post("/change-password") @require_auth @limiter.limit("5 per minute") def change_password(): try: payload = ChangePasswordPayload.model_validate(request.get_json(silent=True) or {}) except ValidationError as e: return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400 try: auth_svc.change_password(g.current_user.id, payload.current_password, payload.new_password) except auth_svc.InvalidCredentials: return jsonify({"error": "current_password_incorrect"}), 400 except ValueError as e: return jsonify({"error": "weak_password", "message": str(e)}), 400 resp = make_response(jsonify({"ok": True})) _clear_refresh_cookie(resp) return resp