107 lines
3.4 KiB
Python
107 lines
3.4 KiB
Python
|
|
"""User management endpoints (admin only)."""
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from flask import Blueprint, current_app, jsonify, request
|
||
|
|
|
||
|
|
from backend.app.auth import hash_password, role_required
|
||
|
|
from backend.app.extensions import db
|
||
|
|
from backend.app.models import User, UserRole
|
||
|
|
from backend.app.serializers import serialize_user
|
||
|
|
|
||
|
|
users_bp = Blueprint("users", __name__, url_prefix="/api/users")
|
||
|
|
|
||
|
|
|
||
|
|
def _parse_role(value: object) -> UserRole | None:
|
||
|
|
if not isinstance(value, str):
|
||
|
|
return None
|
||
|
|
try:
|
||
|
|
return UserRole(value)
|
||
|
|
except ValueError:
|
||
|
|
return None
|
||
|
|
|
||
|
|
|
||
|
|
@users_bp.get("")
@role_required("admin")
def list_users():
    """Return every user as a JSON array, ordered by ascending id."""
    ordered_query = User.query.order_by(User.id.asc())
    payload = [serialize_user(account) for account in ordered_query.all()]
    return jsonify(payload), 200
|
||
|
|
|
||
|
|
|
||
|
|
@users_bp.post("")
@role_required("admin")
def create_user():
    """Create a user from a JSON body with ``username``, ``password`` and ``role``.

    Responds with 201 and the serialized user on success. Responds with 400
    and an ``error`` message when the username is missing, the password is
    not a string or is shorter than ``MIN_PASSWORD_LENGTH``, the role is not
    recognised, or the username is already taken.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Missing, unparsable, or non-object JSON (e.g. a list): fall back to
        # an empty payload so the field checks below answer with 400 instead
        # of ``data.get`` raising AttributeError.
        data = {}

    raw_username = data.get("username")
    # A non-string username cannot be stripped; treat it like a missing one.
    username = raw_username.strip() if isinstance(raw_username, str) else ""
    if not username:
        return jsonify({"error": "username is required"}), 400

    password = data.get("password") or ""
    if not isinstance(password, str):
        # e.g. a JSON number: ``len()`` would raise, and a list would slip
        # through the length check and reach ``hash_password``.
        return jsonify({"error": "password must be a string"}), 400

    min_len = current_app.config["MIN_PASSWORD_LENGTH"]
    if len(password) < min_len:
        return (
            jsonify({"error": f"password must be at least {min_len} characters"}),
            400,
        )

    role_raw = data.get("role")
    if isinstance(role_raw, str):
        role_raw = role_raw.strip()
    # ``_parse_role`` returns None for non-strings and unknown role names.
    role = _parse_role(role_raw)
    if role is None:
        return jsonify({"error": "role must be one of: admin, redteam, soc"}), 400

    # NOTE(review): this check-then-insert is not atomic; if ``username`` has
    # a unique constraint, a concurrent request could make ``commit`` raise.
    # Confirm against the model before relying on this check alone.
    if User.query.filter_by(username=username).first() is not None:
        return jsonify({"error": "username already exists"}), 400

    user = User(username=username, password_hash=hash_password(password), role=role)
    db.session.add(user)
    db.session.commit()
    return jsonify(serialize_user(user)), 201
|
||
|
|
|
||
|
|
|
||
|
|
@users_bp.patch("/<int:user_id>")
@role_required("admin")
def update_user(user_id: int):
    """Update the role and/or password of the user identified by *user_id*.

    Only the keys present in the JSON body are changed. Responds with 200 and
    the serialized user on success, 404 when the user does not exist, 400 when
    a supplied role or password is invalid, and 409 when the change would
    demote the only remaining admin.
    """
    user = db.session.get(User, user_id)
    if user is None:
        return jsonify({"error": "User not found"}), 404

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Missing, unparsable, or non-object JSON (e.g. a list): treat as an
        # empty payload rather than letting ``in`` / ``.get`` misbehave.
        data = {}

    new_role = None
    if "role" in data:
        role_raw = data.get("role")
        if isinstance(role_raw, str):
            role_raw = role_raw.strip()
        # ``_parse_role`` returns None for non-strings and unknown role names.
        new_role = _parse_role(role_raw)
        if new_role is None:
            return jsonify({"error": "role must be one of: admin, redteam, soc"}), 400
        # Refuse to demote the last admin.
        if user.role == UserRole.ADMIN and new_role != UserRole.ADMIN:
            admin_count = User.query.filter_by(role=UserRole.ADMIN).count()
            if admin_count <= 1:
                return jsonify({"error": "Cannot demote the last admin"}), 409

    new_password = None
    if "password" in data:
        new_password = data.get("password") or ""
        if not isinstance(new_password, str):
            # e.g. a JSON number: ``len()`` would raise TypeError otherwise.
            return jsonify({"error": "password must be a string"}), 400
        min_len = current_app.config["MIN_PASSWORD_LENGTH"]
        if len(new_password) < min_len:
            return (
                jsonify({"error": f"password must be at least {min_len} characters"}),
                400,
            )

    # Mutate only after every supplied field has validated, so a rejected
    # request never leaves a half-applied change pending in the session.
    if new_role is not None:
        user.role = new_role
    if new_password is not None:
        user.password_hash = hash_password(new_password)

    db.session.commit()
    return jsonify(serialize_user(user)), 200
|
||
|
|
|
||
|
|
|
||
|
|
@users_bp.delete("/<int:user_id>")
@role_required("admin")
def delete_user(user_id: int):
    """Delete the user identified by *user_id*.

    Responds with 204 and an empty body on success, 404 when the user does
    not exist, and 409 when the user is the only remaining admin.
    """
    target = db.session.get(User, user_id)
    if target is None:
        return jsonify({"error": "User not found"}), 404

    # Short-circuit keeps the count query from running for non-admin targets.
    removing_last_admin = (
        target.role == UserRole.ADMIN
        and User.query.filter_by(role=UserRole.ADMIN).count() <= 1
    )
    if removing_last_admin:
        return jsonify({"error": "Cannot delete the last admin"}), 409

    db.session.delete(target)
    db.session.commit()
    return "", 204
|