feat(backend): /api/v1/users CRUD endpoints (sprint 2)

Four routes, all gated by `USER_MANAGE` (D-015 — rt_lead only):

- `GET /api/v1/users` paginated, optional `?type=` filter. Returns
  `Page[UserRead]`.
- `POST /api/v1/users` hashes the password with bcrypt, attaches the user
  to the matching F11 group (rt_operator → `rt_operator`, rt_lead →
  `rt_lead`, soc_analyst → `soc_analyst`). Returns 201 with `UserRead`, or
  409 `email_taken` on duplicate email (active or already-disabled).
- `PATCH /api/v1/users/<uid>` partial. Changing `type` realigns the user's
  *global* F11 membership (engagement_id IS NULL) and leaves per-engagement
  memberships untouched. Password rotation rehashes with bcrypt; audit row
  carries a `password_rotated` flag rather than logging the value.
- `DELETE /api/v1/users/<uid>` sets `disabled_at = now()`. Idempotent —
  a second call on a disabled user returns 204 without an extra audit row.
  Hard-delete is intentionally absent: keeps audit-trail FKs valid and
  matches NF-AUDIT (we never lose actor-id linkage).

`_TYPE_TO_GROUP` translates `UserType` enum → `GroupName`; `_resolve_group`
loads the corresponding row from `group` (raises 500 if the seed didn't
run, surfaced through the global JSON error handler).
This commit is contained in:
knacky
2026-05-23 15:53:09 +02:00
parent feda5d1485
commit e2f030e0e1
2 changed files with 191 additions and 0 deletions

View File

@@ -4,16 +4,20 @@ from __future__ import annotations
from flask import Flask from flask import Flask
from mimic.api.audit import bp as audit_bp
from mimic.api.auth import bp as auth_bp from mimic.api.auth import bp as auth_bp
from mimic.api.engagements import bp as engagements_bp from mimic.api.engagements import bp as engagements_bp
from mimic.api.hosts import bp as hosts_bp from mimic.api.hosts import bp as hosts_bp
from mimic.api.scenarios import bp as scenarios_bp from mimic.api.scenarios import bp as scenarios_bp
from mimic.api.ttps import bp as ttps_bp from mimic.api.ttps import bp as ttps_bp
from mimic.api.users import bp as users_bp
def register_blueprints(app: Flask) -> None: def register_blueprints(app: Flask) -> None:
app.register_blueprint(auth_bp, url_prefix="/api/v1/auth") app.register_blueprint(auth_bp, url_prefix="/api/v1/auth")
app.register_blueprint(users_bp, url_prefix="/api/v1/users")
app.register_blueprint(engagements_bp, url_prefix="/api/v1/engagements") app.register_blueprint(engagements_bp, url_prefix="/api/v1/engagements")
app.register_blueprint(hosts_bp, url_prefix="/api/v1") app.register_blueprint(hosts_bp, url_prefix="/api/v1")
app.register_blueprint(ttps_bp, url_prefix="/api/v1/library/ttps") app.register_blueprint(ttps_bp, url_prefix="/api/v1/library/ttps")
app.register_blueprint(scenarios_bp, url_prefix="/api/v1") app.register_blueprint(scenarios_bp, url_prefix="/api/v1")
app.register_blueprint(audit_bp, url_prefix="/api/v1/audit")

View File

@@ -0,0 +1,187 @@
"""User management endpoints (rt_lead only — D-015).
All four routes require `USER_MANAGE`. The `DELETE` endpoint is a soft-disable
(sets `disabled_at`); we never hard-delete users so the audit trail and
authored resources keep their FK targets.
"""
from __future__ import annotations
from datetime import UTC, datetime
from flask import Blueprint, abort, jsonify
from flask.typing import ResponseReturnValue
from sqlalchemy import func, select
from sqlalchemy.orm import selectinload
from mimic.api._helpers import (
api_error,
audit_write,
jsonify_model,
parse_body,
parse_page_query,
parse_uuid,
)
from mimic.auth.password import hash_password
from mimic.db.models import Group, User, UserGroup
from mimic.db.types import UserType
from mimic.extensions import db
from mimic.rbac import Permission, require_perm
from mimic.rbac.matrix import GroupName
from mimic.schemas import Page, UserCreate, UserRead, UserUpdate
bp = Blueprint("users", __name__)
_TYPE_TO_GROUP: dict[UserType, GroupName] = {
UserType.RT_OPERATOR: GroupName.RT_OPERATOR,
UserType.RT_LEAD: GroupName.RT_LEAD,
UserType.SOC_ANALYST: GroupName.SOC_ANALYST,
}
def _resolve_group(user_type: UserType) -> Group:
group_name = _TYPE_TO_GROUP[user_type]
group = db.session.execute(
select(Group).where(Group.name == group_name.value)
).scalar_one_or_none()
if group is None:
abort(500, description=f"group {group_name.value!r} missing; run migrations")
return group
@bp.get("")
@require_perm(Permission.USER_MANAGE)
def list_users() -> ResponseReturnValue:
page_query = parse_page_query()
type_filter = _parse_type_filter()
base = select(User)
if type_filter is not None:
base = base.where(User.type == type_filter)
total = db.session.execute(select(func.count()).select_from(base.subquery())).scalar_one()
rows = (
db.session.execute(
base.order_by(User.created_at.desc()).offset(page_query.offset).limit(page_query.limit)
)
.scalars()
.all()
)
page = Page[UserRead](
items=[UserRead.model_validate(row) for row in rows],
total=total,
page=page_query.page,
page_size=page_query.page_size,
)
return jsonify(page.model_dump(mode="json"))
def _parse_type_filter() -> UserType | None:
from flask import request # noqa: PLC0415 — narrow to keep module import lean
raw = request.args.get("type")
if raw is None:
return None
try:
return UserType(raw)
except ValueError:
abort(422, description=[{"loc": ["type"], "msg": "invalid user type", "type": "enum"}])
@bp.post("")
@require_perm(Permission.USER_MANAGE)
def create_user() -> ResponseReturnValue:
payload = parse_body(UserCreate)
existing = db.session.execute(
select(User).where(User.email == payload.email)
).scalar_one_or_none()
if existing is not None:
return api_error("email_taken", "user with this email already exists", 409)
user = User(
email=payload.email,
display_name=payload.display_name,
type=payload.type,
local_password_hash=hash_password(payload.password),
)
db.session.add(user)
db.session.flush()
group = _resolve_group(payload.type)
db.session.add(UserGroup(user_id=user.id, group_id=group.id, engagement_id=None))
db.session.commit()
audit_write(
action="user.create",
resource_type="user",
resource_id=user.id,
metadata={"email": user.email, "type": user.type.value},
)
return jsonify_model(UserRead.model_validate(user), status=201)
@bp.patch("/<uid>")
@require_perm(Permission.USER_MANAGE)
def update_user(uid: str) -> ResponseReturnValue:
user = _user_or_404(uid)
payload = parse_body(UserUpdate)
changes = payload.model_dump(exclude_unset=True)
type_changed = "type" in changes and changes["type"] != user.type
if "display_name" in changes:
user.display_name = changes["display_name"]
if "password" in changes and changes["password"] is not None:
user.local_password_hash = hash_password(changes["password"])
if type_changed:
user.type = changes["type"]
# Realign group membership: drop the previous global membership and
# attach a fresh one matching the new type. Per-engagement memberships
# (engagement_id IS NOT NULL) stay untouched.
for link in list(user.group_links):
if link.engagement_id is None:
db.session.delete(link)
new_group = _resolve_group(changes["type"])
db.session.add(UserGroup(user_id=user.id, group_id=new_group.id, engagement_id=None))
db.session.commit()
audit_write(
action="user.update",
resource_type="user",
resource_id=user.id,
metadata={
"fields": sorted(k for k in changes if k != "password"),
"password_rotated": "password" in changes and changes["password"] is not None,
},
)
return jsonify_model(UserRead.model_validate(user))
@bp.delete("/<uid>")
@require_perm(Permission.USER_MANAGE)
def disable_user(uid: str) -> ResponseReturnValue:
user = _user_or_404(uid)
if user.disabled_at is not None:
return "", 204 # idempotent — already disabled
user.disabled_at = datetime.now(tz=UTC)
db.session.commit()
audit_write(
action="user.disable",
resource_type="user",
resource_id=user.id,
metadata={"email": user.email},
)
return "", 204
def _user_or_404(uid: str) -> User:
user = db.session.execute(
select(User)
.where(User.id == parse_uuid(uid, field="user id"))
.options(selectinload(User.group_links))
).scalar_one_or_none()
if user is None:
abort(404)
return user