feat(m3): RBAC — atomic perms, groups, users, admin SPA pages

Permission catalogue (services/permissions_seed.py)
- 31 atomic codes across 10 families: user.*, group.*, invitation.*,
  test_template.*, scenario_template.*, mission.* (incl.
  mission.write_red_fields + mission.write_blue_fields),
  detection_level.{read,update}, setting.{read,update}, mitre.sync.
- Default bindings: admin = all 31; redteam = 8 (catalogue read + mission.
  {read,create,update,archive,write_red_fields} + detection_level.read);
  blueteam = 5 (catalogue read + mission.{read,write_blue_fields} +
  detection_level.read).
- Seed runs at boot AND after /setup so a freshly truncated DB (via
  /diag/reset) gets the bindings back via the bootstrap path. Idempotent +
  additive (never removes a perm from a system group).

Users admin (services/users.py + api/users.py)
- list (q + is_active filter + pagination), get, patch (display_name /
  locale / is_active with tri-state sentinel for clear-vs-unset),
  soft-delete, set groups.
- Last-admin protection on update (deactivate), delete, and group-strip
  (refusing to remove the admin group from the last active admin).

Groups admin (services/groups.py + api/groups.py)
- Full CRUD with system-group protection (no rename, no delete on
  admin/redteam/blueteam).
- PUT /groups/{id}/permissions sets the perm list.
- Admin system group's perm set is locked to the full catalogue
  (SystemGroupProtected → 409) — preserves the bypass invariant even if a
  future refactor moves to perm-based checks.

Permissions read-only (api/permissions.py)
- GET /permissions returns the catalogue (admin or group.read holders).

/diag/reset extension
- After truncate + token mint, the limiter is also reset (limiter.reset())
  so the Playwright suite doesn't hit 10/min budgets across spec files.
  Guarded by limiter.enabled to no-op in APP_ENV=test.

Rate-limit scope (core/rate_limit.py)
- enabled = APP_ENV in ("prod", "staging"). A staging deployment serves
  humans, so it gets the limits too. Dev/test stay unthrottled for
  Playwright ergonomics. Spec §6 NF-security is an operator-facing
  requirement.

Frontend chrome
- components/RequireAdmin.tsx + ui/Modal.tsx (reusable centered dialog
  with accessible name + Escape + backdrop-click).
- Layout.tsx shows Admin nav links only when is_admin === true. Server
  remains the arbiter — non-admins hitting /admin/* get redirected to /.

Frontend pages
- pages/AdminUsersPage.tsx, AdminGroupsPage.tsx, AdminInvitationsPage.tsx
  with edit modals using TanStack Query mutations + multi-select for perms
  grouped by family + copy-once invitation URL display.
- lib/admin.ts: shared types + query keys + groupPermsByFamily helper.
- lib/api.ts: apiPatch / apiPut / apiDelete added.

Playwright config (e2e/playwright.config.ts)
- workers: 1 + fullyParallel: false: spec files share the live Postgres,
  so concurrent /diag/reset calls clobber each other. Intra-file order
  preserved via test.describe.configure({ mode: 'serial' }).

Testing
- backend/tests/test_rbac.py: 15 integration tests (39 backend total — 1
  health + 8 schema + 15 auth + 15 RBAC).
- e2e/tests/m3-rbac.spec.ts: 8 Playwright tests covering DoD §10 #2/#3
  (28 e2e total — 8 M0 + 4 M1 + 8 M2 + 8 M3).
- tasks/testing-m3.md.

DoD: make test-api → 39 passed, make e2e → 28 passed. Spec-reviewer pass
applied (admin perm invariant + staging rate-limit scope).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Knacky
2026-05-11 06:17:07 +02:00
parent 700b563297
commit bb23bf3928
15 changed files with 2634 additions and 0 deletions

169
backend/app/api/groups.py Normal file
View File

@@ -0,0 +1,169 @@
"""Admin endpoints for groups + their permission bindings."""
from __future__ import annotations
import logging
import uuid
from flask import Blueprint, jsonify, request
from pydantic import BaseModel, Field, ValidationError
from app.core.auth_decorators import require_auth, require_perm
from app.services import groups as groups_svc
bp = Blueprint("groups", __name__, url_prefix="/groups")
log = logging.getLogger("metamorph.api.groups")
def _serialize(g: groups_svc.GroupView) -> dict:
return {
"id": str(g.id),
"name": g.name,
"description": g.description,
"is_system": g.is_system,
"members_count": g.members_count,
"permissions": g.permissions,
"created_at": g.created_at.isoformat(),
"updated_at": g.updated_at.isoformat(),
}
class CreateGroupPayload(BaseModel):
name: str = Field(min_length=1, max_length=80)
description: str | None = Field(default=None, max_length=2000)
model_config = {"extra": "forbid"}
class UpdateGroupPayload(BaseModel):
name: str | None = Field(default=None, min_length=1, max_length=80)
description: str | None = Field(default=None, max_length=2000)
model_config = {"extra": "forbid"}
class SetPermissionsPayload(BaseModel):
codes: list[str] = Field(default_factory=list)
model_config = {"extra": "forbid"}
def _parse_uuid_or_400(raw: str):
try:
return uuid.UUID(raw)
except ValueError:
return None
@bp.get("")
@require_auth
@require_perm("group.read")
def list_groups():
rows = groups_svc.list_groups()
return jsonify({"items": [_serialize(g) for g in rows], "total": len(rows)})
@bp.get("/<group_id>")
@require_auth
@require_perm("group.read")
def get_group(group_id: str):
gid = _parse_uuid_or_400(group_id)
if gid is None:
return jsonify({"error": "invalid_id"}), 400
try:
g = groups_svc.get_group(gid)
except groups_svc.GroupNotFound:
return jsonify({"error": "not_found"}), 404
return jsonify(_serialize(g))
@bp.post("")
@require_auth
@require_perm("group.create")
def create_group():
try:
payload = CreateGroupPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400
try:
g = groups_svc.create_group(name=payload.name, description=payload.description)
except groups_svc.GroupNameConflict as e:
return jsonify({"error": "name_conflict", "message": str(e)}), 409
except ValueError as e:
return jsonify({"error": "invalid_request", "message": str(e)}), 400
log.info("metamorph.group.created", extra={"group_id": str(g.id), "group_name": g.name})
return jsonify(_serialize(g)), 201
@bp.patch("/<group_id>")
@require_auth
@require_perm("group.update")
def update_group(group_id: str):
gid = _parse_uuid_or_400(group_id)
if gid is None:
return jsonify({"error": "invalid_id"}), 400
raw = request.get_json(silent=True) or {}
try:
payload = UpdateGroupPayload.model_validate(raw)
except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400
description_unset = "description" not in raw
try:
g = groups_svc.update_group(
gid,
name=payload.name,
description=... if description_unset else payload.description,
)
except groups_svc.GroupNotFound:
return jsonify({"error": "not_found"}), 404
except groups_svc.SystemGroupProtected as e:
return jsonify({"error": "system_group_protected", "message": str(e)}), 409
except groups_svc.GroupNameConflict as e:
return jsonify({"error": "name_conflict", "message": str(e)}), 409
except ValueError as e:
return jsonify({"error": "invalid_request", "message": str(e)}), 400
log.info("metamorph.group.updated", extra={"group_id": str(gid), "fields": sorted(raw.keys())})
return jsonify(_serialize(g))
@bp.delete("/<group_id>")
@require_auth
@require_perm("group.delete")
def soft_delete(group_id: str):
gid = _parse_uuid_or_400(group_id)
if gid is None:
return jsonify({"error": "invalid_id"}), 400
try:
groups_svc.soft_delete_group(gid)
except groups_svc.GroupNotFound:
return jsonify({"error": "not_found"}), 404
except groups_svc.SystemGroupProtected as e:
return jsonify({"error": "system_group_protected", "message": str(e)}), 409
log.info("metamorph.group.soft_deleted", extra={"group_id": str(gid)})
return jsonify({"ok": True})
@bp.put("/<group_id>/permissions")
@require_auth
@require_perm("group.update")
def set_permissions(group_id: str):
gid = _parse_uuid_or_400(group_id)
if gid is None:
return jsonify({"error": "invalid_id"}), 400
try:
payload = SetPermissionsPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400
try:
g = groups_svc.set_group_permissions(gid, payload.codes)
except groups_svc.GroupNotFound:
return jsonify({"error": "not_found"}), 404
except groups_svc.SystemGroupProtected as e:
return jsonify({"error": "system_group_protected", "message": str(e)}), 409
except ValueError as e:
return jsonify({"error": "invalid_request", "message": str(e)}), 400
log.info(
"metamorph.group.permissions_set",
extra={"group_id": str(gid), "count": len(payload.codes)},
)
return jsonify(_serialize(g))

View File

@@ -0,0 +1,17 @@
"""Read-only catalogue of platform permission codes."""
from __future__ import annotations
from flask import Blueprint, jsonify
from app.core.auth_decorators import require_auth, require_perm
from app.services import groups as groups_svc
bp = Blueprint("permissions", __name__, url_prefix="/permissions")
@bp.get("")
@require_auth
@require_perm("group.read")
def list_permissions():
return jsonify({"items": groups_svc.list_permissions()})

185
backend/app/api/users.py Normal file
View File

@@ -0,0 +1,185 @@
"""Admin endpoints for user management.
Note: self-service updates (own display name, locale, password) belong to
`/auth/*`; this blueprint is admin-only.
"""
from __future__ import annotations
import logging
import uuid
from flask import Blueprint, jsonify, request
from pydantic import BaseModel, Field, ValidationError
from app.core.auth_decorators import require_auth, require_perm
from app.services import users as users_svc
bp = Blueprint("users", __name__, url_prefix="/users")
log = logging.getLogger("metamorph.api.users")
def _serialize(u: users_svc.UserView) -> dict:
return {
"id": str(u.id),
"email": u.email,
"display_name": u.display_name,
"locale": u.locale,
"is_active": u.is_active,
"deleted_at": u.deleted_at.isoformat() if u.deleted_at else None,
"created_at": u.created_at.isoformat(),
"updated_at": u.updated_at.isoformat(),
"groups": [{"id": str(gid), "name": name} for gid, name in u.groups],
}
class UpdateUserPayload(BaseModel):
# display_name: omitted = no change, null = clear, str = set.
# Tri-state encoded with a `default-unset` sentinel via model_extra.
display_name: str | None = None
locale: str | None = Field(default=None, pattern=r"^[a-z]{2}$")
is_active: bool | None = None
model_config = {"extra": "forbid"}
class SetGroupsPayload(BaseModel):
group_ids: list[uuid.UUID]
model_config = {"extra": "forbid"}
def _parse_uuid_or_400(raw: str):
try:
return uuid.UUID(raw)
except ValueError:
return None
@bp.get("")
@require_auth
@require_perm("user.read")
def list_users():
q = request.args.get("q") or None
is_active_raw = request.args.get("is_active")
is_active: bool | None
if is_active_raw is None:
is_active = None
elif is_active_raw.lower() in ("true", "1", "yes"):
is_active = True
elif is_active_raw.lower() in ("false", "0", "no"):
is_active = False
else:
return jsonify({"error": "invalid_is_active"}), 400
try:
limit = int(request.args.get("limit", "50"))
offset = int(request.args.get("offset", "0"))
except ValueError:
return jsonify({"error": "invalid_pagination"}), 400
limit = max(1, min(limit, 200))
offset = max(0, offset)
rows, total = users_svc.list_users(q=q, is_active=is_active, limit=limit, offset=offset)
return jsonify(
{
"items": [_serialize(u) for u in rows],
"total": total,
"limit": limit,
"offset": offset,
}
)
@bp.get("/<user_id>")
@require_auth
@require_perm("user.read")
def get_user(user_id: str):
uid = _parse_uuid_or_400(user_id)
if uid is None:
return jsonify({"error": "invalid_id"}), 400
try:
u = users_svc.get_user(uid)
except users_svc.UserNotFound:
return jsonify({"error": "not_found"}), 404
return jsonify(_serialize(u))
@bp.patch("/<user_id>")
@require_auth
@require_perm("user.update")
def update_user(user_id: str):
uid = _parse_uuid_or_400(user_id)
if uid is None:
return jsonify({"error": "invalid_id"}), 400
raw = request.get_json(silent=True) or {}
try:
payload = UpdateUserPayload.model_validate(raw)
except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400
# Distinguish "key absent" (no change) from "key=null" (clear) for display_name.
display_name_unset = "display_name" not in raw
try:
u = users_svc.update_user(
uid,
display_name=... if display_name_unset else payload.display_name,
locale=payload.locale,
is_active=payload.is_active,
)
except users_svc.UserNotFound:
return jsonify({"error": "not_found"}), 404
except users_svc.LastAdminProtected as e:
return jsonify({"error": "last_admin_protected", "message": str(e)}), 409
log.info(
"metamorph.user.updated",
extra={
"user_id": str(uid),
"fields": sorted(raw.keys()),
},
)
return jsonify(_serialize(u))
@bp.delete("/<user_id>")
@require_auth
@require_perm("user.delete")
def soft_delete(user_id: str):
uid = _parse_uuid_or_400(user_id)
if uid is None:
return jsonify({"error": "invalid_id"}), 400
try:
users_svc.soft_delete_user(uid)
except users_svc.UserNotFound:
return jsonify({"error": "not_found"}), 404
except users_svc.LastAdminProtected as e:
return jsonify({"error": "last_admin_protected", "message": str(e)}), 409
log.info("metamorph.user.soft_deleted", extra={"user_id": str(uid)})
return jsonify({"ok": True})
@bp.put("/<user_id>/groups")
@require_auth
@require_perm("user.update")
def set_groups(user_id: str):
uid = _parse_uuid_or_400(user_id)
if uid is None:
return jsonify({"error": "invalid_id"}), 400
try:
payload = SetGroupsPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400
try:
u = users_svc.set_user_groups(uid, payload.group_ids)
except users_svc.UserNotFound:
return jsonify({"error": "not_found"}), 404
except users_svc.LastAdminProtected as e:
return jsonify({"error": "last_admin_protected", "message": str(e)}), 409
except ValueError as e:
return jsonify({"error": "invalid_request", "message": str(e)}), 400
log.info(
"metamorph.user.groups_set",
extra={"user_id": str(uid), "groups": [str(g) for g in payload.group_ids]},
)
return jsonify(_serialize(u))

View File

@@ -0,0 +1,210 @@
"""Admin-side group management: CRUD + permission bindings.
System groups (`is_system=True`: admin, redteam, blueteam) cannot be renamed
or deleted, but their permission bindings are seeded on boot and editable
afterwards (e.g. an admin can broaden `redteam`).
"""
from __future__ import annotations
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from sqlalchemy import func, select
from app.db.session import session_scope
from app.models.auth import Group, GroupPermission, Permission, UserGroup
from app.services.bootstrap import ADMIN_GROUP_NAME
class GroupNotFound(Exception):
pass
class GroupNameConflict(Exception):
pass
class SystemGroupProtected(Exception):
"""Refusing to delete or rename a built-in system group."""
@dataclass(frozen=True)
class GroupView:
id: uuid.UUID
name: str
description: str | None
is_system: bool
deleted_at: datetime | None
members_count: int
permissions: list[str]
created_at: datetime
updated_at: datetime
def _to_view(g: Group, members_count: int) -> GroupView:
return GroupView(
id=g.id,
name=g.name,
description=g.description,
is_system=g.is_system,
deleted_at=g.deleted_at,
members_count=members_count,
permissions=sorted(p.code for p in g.permissions),
created_at=g.created_at,
updated_at=g.updated_at,
)
def _members_counts(s, group_ids: list[uuid.UUID]) -> dict[uuid.UUID, int]:
if not group_ids:
return {}
from app.models.auth import User as _U # local to avoid model cycles
rows = s.execute(
select(UserGroup.group_id, func.count(UserGroup.user_id))
.join(_U, _U.id == UserGroup.user_id)
.where(UserGroup.group_id.in_(group_ids), _U.deleted_at.is_(None))
.group_by(UserGroup.group_id)
).all()
return {gid: int(cnt) for gid, cnt in rows}
def list_groups(*, include_deleted: bool = False) -> list[GroupView]:
with session_scope() as s:
stmt = select(Group).order_by(Group.is_system.desc(), Group.name.asc())
if not include_deleted:
stmt = stmt.where(Group.deleted_at.is_(None))
rows = s.scalars(stmt).all()
counts = _members_counts(s, [g.id for g in rows])
return [_to_view(g, counts.get(g.id, 0)) for g in rows]
def get_group(group_id: uuid.UUID) -> GroupView:
with session_scope() as s:
g = s.get(Group, group_id)
if g is None or g.deleted_at is not None:
raise GroupNotFound()
counts = _members_counts(s, [g.id])
return _to_view(g, counts.get(g.id, 0))
def create_group(*, name: str, description: str | None) -> GroupView:
name_norm = name.strip()
if not name_norm:
raise ValueError("name is required")
with session_scope() as s:
existing = s.scalar(
select(Group).where(Group.name == name_norm, Group.deleted_at.is_(None))
)
if existing is not None:
raise GroupNameConflict(f"group name {name_norm!r} already in use")
g = Group(name=name_norm, description=(description or "").strip() or None, is_system=False)
s.add(g)
s.flush()
return _to_view(g, 0)
def update_group(
group_id: uuid.UUID,
*,
name: str | None = None,
description: str | None | object = ...,
) -> GroupView:
with session_scope() as s:
g = s.get(Group, group_id)
if g is None or g.deleted_at is not None:
raise GroupNotFound()
if name is not None:
name_norm = name.strip()
if not name_norm:
raise ValueError("name cannot be empty")
if g.is_system and name_norm != g.name:
raise SystemGroupProtected("system groups cannot be renamed")
if name_norm != g.name:
clash = s.scalar(
select(Group).where(
Group.name == name_norm,
Group.deleted_at.is_(None),
Group.id != g.id,
)
)
if clash is not None:
raise GroupNameConflict(f"group name {name_norm!r} already in use")
g.name = name_norm
if description is not ...:
if description in (None, ""):
g.description = None
else:
g.description = description.strip() or None
counts = _members_counts(s, [g.id])
return _to_view(g, counts.get(g.id, 0))
def soft_delete_group(group_id: uuid.UUID) -> None:
with session_scope() as s:
g = s.get(Group, group_id)
if g is None or g.deleted_at is not None:
raise GroupNotFound()
if g.is_system:
raise SystemGroupProtected("system groups cannot be deleted")
g.deleted_at = datetime.now(tz=timezone.utc)
def set_group_permissions(group_id: uuid.UUID, codes: list[str]) -> GroupView:
"""Replace the group's permission set with the given codes (validated)."""
desired_codes = set(codes)
with session_scope() as s:
g = s.get(Group, group_id)
if g is None or g.deleted_at is not None:
raise GroupNotFound()
# Preserve the invariant "the system `admin` group has every perm." The
# decorator's admin bypass relies on `is_admin` (group membership), not
# on the perm set, so a stripped admin group would still grant access —
# but the listing would look misleading and a future refactor could
# reasonably switch the bypass to a perm-based check.
if g.is_system and g.name == ADMIN_GROUP_NAME:
all_codes = {p.code for p in s.scalars(select(Permission)).all()}
if desired_codes != all_codes:
raise SystemGroupProtected(
"the admin group must keep every permission"
)
if desired_codes:
perms = s.scalars(select(Permission).where(Permission.code.in_(desired_codes))).all()
known = {p.code for p in perms}
unknown = desired_codes - known
if unknown:
raise ValueError(f"unknown permission codes: {sorted(unknown)}")
else:
perms = []
current = {p.code: p for p in g.permissions}
to_remove = set(current) - desired_codes
to_add = desired_codes - set(current)
for code in to_remove:
row = s.get(GroupPermission, (g.id, current[code].id))
if row is not None:
s.delete(row)
for p in perms:
if p.code in to_add:
s.add(GroupPermission(group_id=g.id, permission_id=p.id))
s.flush()
s.refresh(g)
counts = _members_counts(s, [g.id])
return _to_view(g, counts.get(g.id, 0))
def list_permissions() -> list[dict]:
"""Return the catalogue of all permissions known to the platform."""
with session_scope() as s:
rows = s.scalars(select(Permission).order_by(Permission.code.asc())).all()
return [
{"id": str(p.id), "code": p.code, "description": p.description}
for p in rows
]

View File

@@ -0,0 +1,179 @@
"""Atomic permission catalogue + seed for the 3 default system groups.
Permissions follow the `<entity>.<action>` convention. They are the ground truth
checked by `@require_perm`; admins bypass everything (cf. `auth_decorators.py`).
This module is the single place that lists every permission code shipped with
the platform. To add a new perm in a future milestone:
1. Add an entry to `PERMISSION_CATALOGUE`.
2. Decide which system group(s) should get it by default — edit
`_default_redteam_perms()` / `_default_blueteam_perms()` if relevant
(admin always gets everything, so no edit needed there).
3. The next boot picks it up; existing groups are *upgraded* (perms added),
never downgraded (we never remove perms from a system group, even if you
trim the catalogue — that would be a destructive op disguised as a seed).
The seed is idempotent and safe to call on every boot.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from sqlalchemy import select
from app.db.session import session_scope
from app.models.auth import Group, GroupPermission, Permission
from app.services.bootstrap import (
ADMIN_GROUP_NAME,
BLUETEAM_GROUP_NAME,
REDTEAM_GROUP_NAME,
)
log = logging.getLogger("metamorph.permissions")
@dataclass(frozen=True)
class PermissionDef:
code: str
description: str
# === Catalogue ================================================================
#
# Order is presentation-only; the seed is idempotent. Grouped by family to keep
# diffs reviewable and to mirror the admin UI grouping in M3.6.
#
PERMISSION_CATALOGUE: tuple[PermissionDef, ...] = (
# users
PermissionDef("user.read", "View users."),
PermissionDef("user.create", "Create users (typically via invitation)."),
PermissionDef("user.update", "Update user metadata (display name, locale, active flag)."),
PermissionDef("user.delete", "Soft-delete a user."),
# groups
PermissionDef("group.read", "View groups and their permissions."),
PermissionDef("group.create", "Create a custom group."),
PermissionDef("group.update", "Edit a custom group (name, description, permissions, members)."),
PermissionDef("group.delete", "Soft-delete a custom group."),
# invitations
PermissionDef("invitation.read", "View pending invitations."),
PermissionDef("invitation.create", "Issue a new invitation URL."),
PermissionDef("invitation.revoke", "Revoke an unconsumed invitation."),
# test templates
PermissionDef("test_template.read", "View the test-template catalogue."),
PermissionDef("test_template.create", "Create a test template."),
PermissionDef("test_template.update", "Edit a test template."),
PermissionDef("test_template.delete", "Soft-delete a test template."),
# scenario templates
PermissionDef("scenario_template.read", "View the scenario-template catalogue."),
PermissionDef("scenario_template.create", "Create a scenario template."),
PermissionDef("scenario_template.update", "Edit a scenario template (and its ordered tests)."),
PermissionDef("scenario_template.delete", "Soft-delete a scenario template."),
# missions
PermissionDef("mission.read", "View missions (server still filters by membership for non-admin)."),
PermissionDef("mission.create", "Create a mission."),
PermissionDef("mission.update", "Edit mission metadata, scenarios, members."),
PermissionDef("mission.archive", "Move a mission to status=archived."),
PermissionDef("mission.delete", "Soft-delete a mission."),
PermissionDef("mission.write_red_fields", "Write red-side fields on a mission test."),
PermissionDef("mission.write_blue_fields", "Write blue-side fields and upload evidence."),
# detection levels + platform settings + MITRE sync
PermissionDef("detection_level.read", "View the detection-level taxonomy."),
PermissionDef("detection_level.update", "Edit the detection-level taxonomy."),
PermissionDef("setting.read", "Read platform settings."),
PermissionDef("setting.update", "Update platform settings."),
PermissionDef("mitre.sync", "Trigger a MITRE ATT&CK Enterprise re-sync."),
)
def _default_redteam_perms() -> frozenset[str]:
return frozenset(
{
# catalogue read-only
"test_template.read",
"scenario_template.read",
# MITRE/detection refs
"detection_level.read",
# missions: full lifecycle on red side
"mission.read",
"mission.create",
"mission.update",
"mission.archive",
"mission.write_red_fields",
}
)
def _default_blueteam_perms() -> frozenset[str]:
return frozenset(
{
"test_template.read",
"scenario_template.read",
"detection_level.read",
"mission.read",
"mission.write_blue_fields",
}
)
def _all_perm_codes() -> frozenset[str]:
return frozenset(p.code for p in PERMISSION_CATALOGUE)
def seed_permissions() -> dict[str, int]:
"""Insert any missing permissions. Returns counts: `created`, `total`."""
created = 0
with session_scope() as s:
existing_codes = set(s.scalars(select(Permission.code)).all())
for p in PERMISSION_CATALOGUE:
if p.code in existing_codes:
continue
s.add(Permission(code=p.code, description=p.description))
created += 1
return {"created": created, "total": len(PERMISSION_CATALOGUE)}
def _assign_perms_to_group(group_name: str, codes: frozenset[str]) -> int:
"""Attach the named perms to the given system group. Returns added count.
We never *remove* perms from a system group here — the seed is additive.
Admins/operators who want to revoke must do so explicitly via the UI/API.
"""
if not codes:
return 0
added = 0
with session_scope() as s:
group = s.scalar(select(Group).where(Group.name == group_name, Group.is_system.is_(True)))
if group is None:
raise RuntimeError(f"system group {group_name!r} missing — call ensure_system_groups() first")
existing_codes = {p.code for p in group.permissions}
perms = s.scalars(select(Permission).where(Permission.code.in_(codes))).all()
for p in perms:
if p.code in existing_codes:
continue
s.add(GroupPermission(group_id=group.id, permission_id=p.id))
added += 1
return added
def seed_default_group_permissions() -> dict[str, int]:
"""Bind the catalogue to the 3 default groups. Idempotent + additive."""
counts: dict[str, int] = {}
counts[ADMIN_GROUP_NAME] = _assign_perms_to_group(ADMIN_GROUP_NAME, _all_perm_codes())
counts[REDTEAM_GROUP_NAME] = _assign_perms_to_group(REDTEAM_GROUP_NAME, _default_redteam_perms())
counts[BLUETEAM_GROUP_NAME] = _assign_perms_to_group(BLUETEAM_GROUP_NAME, _default_blueteam_perms())
return counts
def seed_all() -> dict[str, dict[str, int]]:
"""One-shot helper: catalogue + default group bindings."""
perms = seed_permissions()
bindings = seed_default_group_permissions()
log.info(
"metamorph.permissions.seeded",
extra={"perms_created": perms["created"], "perms_total": perms["total"], "bindings": bindings},
)
return {"permissions": perms, "bindings": bindings}

View File

@@ -0,0 +1,204 @@
"""Admin-side user management: list, get, update, soft-delete, assign groups.
Self-service updates (locale, password, display_name) live in
`services.auth` — this module is for admin operations on other users.
"""
from __future__ import annotations
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Iterable
from sqlalchemy import func, or_, select
from app.db.session import session_scope
from app.models.auth import Group, User, UserGroup
from app.services.bootstrap import ADMIN_GROUP_NAME
class UserNotFound(Exception):
pass
class LastAdminProtected(Exception):
"""Refusing to strip admin from the last active admin."""
class SystemGroupProtected(Exception):
"""Refusing to delete or rename a built-in system group."""
@dataclass(frozen=True)
class UserView:
id: uuid.UUID
email: str
display_name: str | None
locale: str
is_active: bool
deleted_at: datetime | None
created_at: datetime
updated_at: datetime
groups: list[tuple[uuid.UUID, str]]
def _to_view(u: User) -> UserView:
return UserView(
id=u.id,
email=u.email,
display_name=u.display_name,
locale=u.locale,
is_active=u.is_active,
deleted_at=u.deleted_at,
created_at=u.created_at,
updated_at=u.updated_at,
groups=[(g.id, g.name) for g in u.groups if g.deleted_at is None],
)
def list_users(
*,
q: str | None = None,
is_active: bool | None = None,
include_deleted: bool = False,
limit: int = 50,
offset: int = 0,
) -> tuple[list[UserView], int]:
"""Return (rows, total_count) with case-insensitive search on email + display_name."""
with session_scope() as s:
stmt = select(User)
count_stmt = select(func.count()).select_from(User)
if not include_deleted:
stmt = stmt.where(User.deleted_at.is_(None))
count_stmt = count_stmt.where(User.deleted_at.is_(None))
if is_active is not None:
stmt = stmt.where(User.is_active.is_(is_active))
count_stmt = count_stmt.where(User.is_active.is_(is_active))
if q:
like = f"%{q.lower()}%"
stmt = stmt.where(
or_(func.lower(User.email).like(like), func.lower(User.display_name).like(like))
)
count_stmt = count_stmt.where(
or_(func.lower(User.email).like(like), func.lower(User.display_name).like(like))
)
stmt = stmt.order_by(User.email.asc()).limit(limit).offset(offset)
rows = s.scalars(stmt).all()
total = int(s.scalar(count_stmt) or 0)
views = [_to_view(u) for u in rows]
return views, total
def get_user(user_id: uuid.UUID, *, include_deleted: bool = False) -> UserView:
with session_scope() as s:
u = s.get(User, user_id)
if u is None or (u.deleted_at is not None and not include_deleted):
raise UserNotFound()
return _to_view(u)
def update_user(
user_id: uuid.UUID,
*,
display_name: str | None | object = ...,
locale: str | None = None,
is_active: bool | None = None,
) -> UserView:
"""Partial update. Pass display_name=None to clear; omit to leave unchanged."""
with session_scope() as s:
u = s.get(User, user_id)
if u is None or u.deleted_at is not None:
raise UserNotFound()
if display_name is not ...:
if display_name in (None, ""):
u.display_name = None
else:
u.display_name = display_name.strip() or None
if locale is not None:
u.locale = locale
if is_active is not None:
# If deactivating the last active admin, refuse.
if not is_active and _is_last_active_admin(s, u):
raise LastAdminProtected("cannot deactivate the last active admin")
u.is_active = is_active
return _to_view(u)
def soft_delete_user(user_id: uuid.UUID) -> None:
with session_scope() as s:
u = s.get(User, user_id)
if u is None or u.deleted_at is not None:
raise UserNotFound()
if _is_last_active_admin(s, u):
raise LastAdminProtected("cannot delete the last active admin")
u.deleted_at = datetime.now(tz=timezone.utc)
u.is_active = False
def set_user_groups(user_id: uuid.UUID, group_ids: Iterable[uuid.UUID]) -> UserView:
"""Replace the user's group memberships with the given set."""
desired = set(group_ids)
with session_scope() as s:
u = s.get(User, user_id)
if u is None or u.deleted_at is not None:
raise UserNotFound()
# Resolve admin group id once.
admin_group_id = s.scalar(
select(Group.id).where(Group.name == ADMIN_GROUP_NAME, Group.is_system.is_(True))
)
is_currently_admin = admin_group_id in {g.id for g in u.groups}
will_be_admin = admin_group_id in desired
if is_currently_admin and not will_be_admin and _is_last_active_admin(s, u):
raise LastAdminProtected("cannot remove admin from the last active admin")
# Refuse silently for unknown groups: validate first.
if desired:
known = set(
s.scalars(
select(Group.id).where(Group.id.in_(desired), Group.deleted_at.is_(None))
).all()
)
unknown = desired - known
if unknown:
raise ValueError(f"unknown groups: {sorted(map(str, unknown))}")
current = {g.id for g in u.groups}
to_add = desired - current
to_remove = current - desired
for gid in to_remove:
row = s.get(UserGroup, (u.id, gid))
if row is not None:
s.delete(row)
for gid in to_add:
s.add(UserGroup(user_id=u.id, group_id=gid))
s.flush()
s.refresh(u)
return _to_view(u)
def _is_last_active_admin(s, user: User) -> bool:
"""True when `user` is currently in the admin system group and removing/blocking
them would leave the platform with zero active admins."""
admin_group_id = s.scalar(
select(Group.id).where(Group.name == ADMIN_GROUP_NAME, Group.is_system.is_(True))
)
if admin_group_id is None:
return False
if admin_group_id not in {g.id for g in user.groups}:
return False
other_admins = s.scalar(
select(func.count())
.select_from(User)
.join(UserGroup, UserGroup.user_id == User.id)
.where(
UserGroup.group_id == admin_group_id,
User.id != user.id,
User.deleted_at.is_(None),
User.is_active.is_(True),
)
)
return int(other_admins or 0) == 0

344
backend/tests/test_rbac.py Normal file
View File

@@ -0,0 +1,344 @@
"""Integration tests for M3: permission seed + users/groups/permissions APIs.
Exercises the Flask test client against a live Postgres. The DB is wiped at
module load so test ordering inside the module matters (see `pytest.shared_*`).
"""
from __future__ import annotations
import secrets
import pytest
from sqlalchemy import text
from app.core.install_token import regenerate_install_token
from app.main import create_app
from app.services.permissions_seed import PERMISSION_CATALOGUE
def _truncate_all(engine):
"""Wipe data plus permissions table. CASCADE handles dependent rows."""
with engine.begin() as conn:
conn.execute(
text(
"TRUNCATE users, refresh_tokens, invitations, invitation_groups, "
"user_groups, group_permissions, permissions, settings, groups "
"RESTART IDENTITY CASCADE"
)
)
@pytest.fixture(scope="module")
def app(db_engine_or_skip):
_truncate_all(db_engine_or_skip)
flask_app = create_app() # triggers bootstrap → seed_all()
flask_app.config.update(TESTING=True)
return flask_app
@pytest.fixture()
def client(app):
return app.test_client()
def _unique_email(prefix: str) -> str:
return f"{prefix}-{secrets.token_hex(4)}@metamorph.local"
def _login(client, email: str, password: str) -> str:
r = client.post("/api/v1/auth/login", json={"email": email, "password": password})
assert r.status_code == 200, r.get_data(as_text=True)
return r.get_json()["access_token"]
# -- M3.1 — Permissions seeded at boot -----------------------------------------
def test_permissions_catalogue_seeded(client):
"""Catalogue table has every code from PERMISSION_CATALOGUE."""
# We need an admin to call /permissions — bootstrap one via /setup.
token = regenerate_install_token()
email = _unique_email("admin")
r = client.post(
"/api/v1/setup",
json={
"install_token": token,
"email": email,
"password": "AdminPass1234!",
"display_name": "Admin",
},
)
assert r.status_code == 201, r.get_data(as_text=True)
pytest.shared_admin = {"email": email, "password": "AdminPass1234!", "user_id": r.get_json()["user_id"]} # type: ignore[attr-defined]
access = _login(client, email, "AdminPass1234!")
perms = client.get(
"/api/v1/permissions", headers={"Authorization": f"Bearer {access}"}
).get_json()
codes = {p["code"] for p in perms["items"]}
expected = {p.code for p in PERMISSION_CATALOGUE}
assert expected.issubset(codes)
def test_admin_group_has_every_permission(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
groups = client.get(
"/api/v1/groups", headers={"Authorization": f"Bearer {access}"}
).get_json()
admin_group = next(g for g in groups["items"] if g["name"] == "admin")
assert set(admin_group["permissions"]) == {p.code for p in PERMISSION_CATALOGUE}
assert admin_group["is_system"] is True
def test_redteam_group_has_red_perms_but_not_blue_write(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
groups = client.get(
"/api/v1/groups", headers={"Authorization": f"Bearer {access}"}
).get_json()
redteam = next(g for g in groups["items"] if g["name"] == "redteam")
assert "mission.write_red_fields" in redteam["permissions"]
assert "mission.write_blue_fields" not in redteam["permissions"]
assert "mission.create" in redteam["permissions"]
def test_blueteam_group_has_blue_perm_but_not_red_write(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
groups = client.get(
"/api/v1/groups", headers={"Authorization": f"Bearer {access}"}
).get_json()
blueteam = next(g for g in groups["items"] if g["name"] == "blueteam")
assert "mission.write_blue_fields" in blueteam["permissions"]
assert "mission.write_red_fields" not in blueteam["permissions"]
assert "mission.create" not in blueteam["permissions"]
# -- M3.2 — Users CRUD ---------------------------------------------------------
def _invite_user(client, admin_access: str, email: str, password: str, group_ids: list[str] | None = None) -> str:
"""Create + accept an invitation, return the new user's id."""
create = client.post(
"/api/v1/invitations",
headers={"Authorization": f"Bearer {admin_access}"},
json={"email_hint": email, "group_ids": group_ids or []},
)
assert create.status_code == 201, create.get_data(as_text=True)
token = create.get_json()["token"]
accept = client.post(
f"/api/v1/invitations/accept/{token}",
json={"email": email, "password": password, "display_name": email.split("@")[0]},
)
assert accept.status_code == 201, accept.get_data(as_text=True)
return accept.get_json()["user_id"]
def test_admin_lists_users(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
r = client.get("/api/v1/users", headers={"Authorization": f"Bearer {access}"})
assert r.status_code == 200
body = r.get_json()
assert body["total"] >= 1
emails = [u["email"] for u in body["items"]]
assert admin["email"] in emails
def test_admin_updates_a_user(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
bob_email = _unique_email("bob")
bob_id = _invite_user(client, access, bob_email, "BobPass1234!")
r = client.patch(
f"/api/v1/users/{bob_id}",
headers={"Authorization": f"Bearer {access}"},
json={"display_name": "Robert", "locale": "en"},
)
assert r.status_code == 200, r.get_data(as_text=True)
body = r.get_json()
assert body["display_name"] == "Robert"
assert body["locale"] == "en"
def test_admin_soft_deletes_a_user(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
target_email = _unique_email("ghost")
target_id = _invite_user(client, access, target_email, "GhostPass1234!")
r = client.delete(
f"/api/v1/users/{target_id}",
headers={"Authorization": f"Bearer {access}"},
)
assert r.status_code == 200, r.get_data(as_text=True)
# Listing must not return the deleted user by default.
listing = client.get(
"/api/v1/users", headers={"Authorization": f"Bearer {access}"}
).get_json()
assert target_email not in [u["email"] for u in listing["items"]]
def test_last_admin_cannot_be_deleted(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
r = client.delete(
f"/api/v1/users/{admin['user_id']}",
headers={"Authorization": f"Bearer {access}"},
)
assert r.status_code == 409
assert r.get_json()["error"] == "last_admin_protected"
def test_last_admin_cannot_be_deactivated(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
r = client.patch(
f"/api/v1/users/{admin['user_id']}",
headers={"Authorization": f"Bearer {access}"},
json={"is_active": False},
)
assert r.status_code == 409
assert r.get_json()["error"] == "last_admin_protected"
# -- M3.3 — Groups CRUD --------------------------------------------------------
def test_admin_creates_custom_group_and_assigns_perms(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
# Create the group
create = client.post(
"/api/v1/groups",
headers={"Authorization": f"Bearer {access}"},
json={"name": f"pentest-{secrets.token_hex(3)}", "description": "Test group"},
)
assert create.status_code == 201, create.get_data(as_text=True)
gid = create.get_json()["id"]
# Attach mission.read + mission.write_red_fields only
r = client.put(
f"/api/v1/groups/{gid}/permissions",
headers={"Authorization": f"Bearer {access}"},
json={"codes": ["mission.read", "mission.write_red_fields"]},
)
assert r.status_code == 200, r.get_data(as_text=True)
assert set(r.get_json()["permissions"]) == {"mission.read", "mission.write_red_fields"}
def test_system_group_cannot_be_renamed_or_deleted(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
groups = client.get(
"/api/v1/groups", headers={"Authorization": f"Bearer {access}"}
).get_json()
admin_group = next(g for g in groups["items"] if g["name"] == "admin")
rename = client.patch(
f"/api/v1/groups/{admin_group['id']}",
headers={"Authorization": f"Bearer {access}"},
json={"name": "superadmin"},
)
assert rename.status_code == 409
assert rename.get_json()["error"] == "system_group_protected"
delete = client.delete(
f"/api/v1/groups/{admin_group['id']}",
headers={"Authorization": f"Bearer {access}"},
)
assert delete.status_code == 409
assert delete.get_json()["error"] == "system_group_protected"
def test_setting_unknown_permission_code_returns_400(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
create = client.post(
"/api/v1/groups",
headers={"Authorization": f"Bearer {access}"},
json={"name": f"bad-perms-{secrets.token_hex(3)}", "description": None},
)
gid = create.get_json()["id"]
r = client.put(
f"/api/v1/groups/{gid}/permissions",
headers={"Authorization": f"Bearer {access}"},
json={"codes": ["bogus.permission"]},
)
assert r.status_code == 400
# -- M3 user ↔ group assignment ------------------------------------------------
def test_admin_assigns_user_to_custom_group_and_perms_apply(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
# Create a custom group that *only* grants user.read.
gname = f"readers-{secrets.token_hex(3)}"
group = client.post(
"/api/v1/groups",
headers={"Authorization": f"Bearer {access}"},
json={"name": gname, "description": None},
).get_json()
client.put(
f"/api/v1/groups/{group['id']}/permissions",
headers={"Authorization": f"Bearer {access}"},
json={"codes": ["user.read"]},
)
# Invite Dave, attach the new group via /users/{id}/groups.
dave_email = _unique_email("dave")
dave_id = _invite_user(client, access, dave_email, "DavePass1234!")
r = client.put(
f"/api/v1/users/{dave_id}/groups",
headers={"Authorization": f"Bearer {access}"},
json={"group_ids": [group["id"]]},
)
assert r.status_code == 200, r.get_data(as_text=True)
# Dave can now list users (user.read) but cannot create a group (group.create).
dave_access = _login(client, dave_email, "DavePass1234!")
can_read = client.get(
"/api/v1/users", headers={"Authorization": f"Bearer {dave_access}"}
)
assert can_read.status_code == 200
cannot_create_group = client.post(
"/api/v1/groups",
headers={"Authorization": f"Bearer {dave_access}"},
json={"name": "wont-happen", "description": None},
)
assert cannot_create_group.status_code == 403
def test_last_admin_cannot_lose_admin_membership(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
r = client.put(
f"/api/v1/users/{admin['user_id']}/groups",
headers={"Authorization": f"Bearer {access}"},
json={"group_ids": []},
)
assert r.status_code == 409
assert r.get_json()["error"] == "last_admin_protected"
# -- Permission enforcement ----------------------------------------------------
def test_non_admin_without_user_read_gets_403_on_users_list(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
# Invite Eve with no groups → no perms.
eve_email = _unique_email("eve")
_invite_user(client, access, eve_email, "EvePass1234!", group_ids=[])
eve_access = _login(client, eve_email, "EvePass1234!")
r = client.get("/api/v1/users", headers={"Authorization": f"Bearer {eve_access}"})
assert r.status_code == 403