diff --git a/backend/app/api/groups.py b/backend/app/api/groups.py new file mode 100644 index 0000000..f4c0aca --- /dev/null +++ b/backend/app/api/groups.py @@ -0,0 +1,169 @@ +"""Admin endpoints for groups + their permission bindings.""" + +from __future__ import annotations + +import logging +import uuid + +from flask import Blueprint, jsonify, request +from pydantic import BaseModel, Field, ValidationError + +from app.core.auth_decorators import require_auth, require_perm +from app.services import groups as groups_svc + +bp = Blueprint("groups", __name__, url_prefix="/groups") +log = logging.getLogger("metamorph.api.groups") + + +def _serialize(g: groups_svc.GroupView) -> dict: + return { + "id": str(g.id), + "name": g.name, + "description": g.description, + "is_system": g.is_system, + "members_count": g.members_count, + "permissions": g.permissions, + "created_at": g.created_at.isoformat(), + "updated_at": g.updated_at.isoformat(), + } + + +class CreateGroupPayload(BaseModel): + name: str = Field(min_length=1, max_length=80) + description: str | None = Field(default=None, max_length=2000) + + model_config = {"extra": "forbid"} + + +class UpdateGroupPayload(BaseModel): + name: str | None = Field(default=None, min_length=1, max_length=80) + description: str | None = Field(default=None, max_length=2000) + + model_config = {"extra": "forbid"} + + +class SetPermissionsPayload(BaseModel): + codes: list[str] = Field(default_factory=list) + + model_config = {"extra": "forbid"} + + +def _parse_uuid_or_400(raw: str): + try: + return uuid.UUID(raw) + except ValueError: + return None + + +@bp.get("") +@require_auth +@require_perm("group.read") +def list_groups(): + rows = groups_svc.list_groups() + return jsonify({"items": [_serialize(g) for g in rows], "total": len(rows)}) + + +@bp.get("/") +@require_auth +@require_perm("group.read") +def get_group(group_id: str): + gid = _parse_uuid_or_400(group_id) + if gid is None: + return jsonify({"error": "invalid_id"}), 400 + try: + g = groups_svc.get_group(gid) + except groups_svc.GroupNotFound: + return jsonify({"error": "not_found"}), 404 + return jsonify(_serialize(g)) + + +@bp.post("") +@require_auth +@require_perm("group.create") +def create_group(): + try: + payload = CreateGroupPayload.model_validate(request.get_json(silent=True) or {}) + except ValidationError as e: + return jsonify({"error": "invalid_request", "details": e.errors()}), 400 + try: + g = groups_svc.create_group(name=payload.name, description=payload.description) + except groups_svc.GroupNameConflict as e: + return jsonify({"error": "name_conflict", "message": str(e)}), 409 + except ValueError as e: + return jsonify({"error": "invalid_request", "message": str(e)}), 400 + log.info("metamorph.group.created", extra={"group_id": str(g.id), "group_name": g.name}) + return jsonify(_serialize(g)), 201 + + +@bp.patch("/") +@require_auth +@require_perm("group.update") +def update_group(group_id: str): + gid = _parse_uuid_or_400(group_id) + if gid is None: + return jsonify({"error": "invalid_id"}), 400 + raw = request.get_json(silent=True) or {} + try: + payload = UpdateGroupPayload.model_validate(raw) + except ValidationError as e: + return jsonify({"error": "invalid_request", "details": e.errors()}), 400 + description_unset = "description" not in raw + try: + g = groups_svc.update_group( + gid, + name=payload.name, + description=... if description_unset else payload.description, + ) + except groups_svc.GroupNotFound: + return jsonify({"error": "not_found"}), 404 + except groups_svc.SystemGroupProtected as e: + return jsonify({"error": "system_group_protected", "message": str(e)}), 409 + except groups_svc.GroupNameConflict as e: + return jsonify({"error": "name_conflict", "message": str(e)}), 409 + except ValueError as e: + return jsonify({"error": "invalid_request", "message": str(e)}), 400 + log.info("metamorph.group.updated", extra={"group_id": str(gid), "fields": sorted(raw.keys())}) + return jsonify(_serialize(g)) + + +@bp.delete("/") +@require_auth +@require_perm("group.delete") +def soft_delete(group_id: str): + gid = _parse_uuid_or_400(group_id) + if gid is None: + return jsonify({"error": "invalid_id"}), 400 + try: + groups_svc.soft_delete_group(gid) + except groups_svc.GroupNotFound: + return jsonify({"error": "not_found"}), 404 + except groups_svc.SystemGroupProtected as e: + return jsonify({"error": "system_group_protected", "message": str(e)}), 409 + log.info("metamorph.group.soft_deleted", extra={"group_id": str(gid)}) + return jsonify({"ok": True}) + + +@bp.put("//permissions") +@require_auth +@require_perm("group.update") +def set_permissions(group_id: str): + gid = _parse_uuid_or_400(group_id) + if gid is None: + return jsonify({"error": "invalid_id"}), 400 + try: + payload = SetPermissionsPayload.model_validate(request.get_json(silent=True) or {}) + except ValidationError as e: + return jsonify({"error": "invalid_request", "details": e.errors()}), 400 + try: + g = groups_svc.set_group_permissions(gid, payload.codes) + except groups_svc.GroupNotFound: + return jsonify({"error": "not_found"}), 404 + except groups_svc.SystemGroupProtected as e: + return jsonify({"error": "system_group_protected", "message": str(e)}), 409 + except ValueError as e: + return jsonify({"error": "invalid_request", "message": str(e)}), 400 + log.info( + "metamorph.group.permissions_set", + extra={"group_id": str(gid), "count": len(payload.codes)}, + ) + return jsonify(_serialize(g)) diff --git a/backend/app/api/permissions.py b/backend/app/api/permissions.py new file mode 100644 index 0000000..163ac78 --- /dev/null +++ b/backend/app/api/permissions.py @@ -0,0 +1,17 @@ +"""Read-only catalogue of platform permission codes.""" + +from __future__ import annotations + +from flask import Blueprint, jsonify + +from app.core.auth_decorators import require_auth, require_perm +from app.services import groups as groups_svc + +bp = Blueprint("permissions", __name__, url_prefix="/permissions") + + +@bp.get("") +@require_auth +@require_perm("group.read") +def list_permissions(): + return jsonify({"items": groups_svc.list_permissions()}) diff --git a/backend/app/api/users.py b/backend/app/api/users.py new file mode 100644 index 0000000..bc4bfbc --- /dev/null +++ b/backend/app/api/users.py @@ -0,0 +1,185 @@ +"""Admin endpoints for user management. + +Note: self-service updates (own display name, locale, password) belong to +`/auth/*`; this blueprint is admin-only. +""" + +from __future__ import annotations + +import logging +import uuid + +from flask import Blueprint, jsonify, request +from pydantic import BaseModel, Field, ValidationError + +from app.core.auth_decorators import require_auth, require_perm +from app.services import users as users_svc + +bp = Blueprint("users", __name__, url_prefix="/users") +log = logging.getLogger("metamorph.api.users") + + +def _serialize(u: users_svc.UserView) -> dict: + return { + "id": str(u.id), + "email": u.email, + "display_name": u.display_name, + "locale": u.locale, + "is_active": u.is_active, + "deleted_at": u.deleted_at.isoformat() if u.deleted_at else None, + "created_at": u.created_at.isoformat(), + "updated_at": u.updated_at.isoformat(), + "groups": [{"id": str(gid), "name": name} for gid, name in u.groups], + } + + +class UpdateUserPayload(BaseModel): + # display_name: omitted = no change, null = clear, str = set. + # Tri-state encoded with a `default-unset` sentinel via model_extra. + display_name: str | None = None + locale: str | None = Field(default=None, pattern=r"^[a-z]{2}$") + is_active: bool | None = None + + model_config = {"extra": "forbid"} + + +class SetGroupsPayload(BaseModel): + group_ids: list[uuid.UUID] + + model_config = {"extra": "forbid"} + + +def _parse_uuid_or_400(raw: str): + try: + return uuid.UUID(raw) + except ValueError: + return None + + +@bp.get("") +@require_auth +@require_perm("user.read") +def list_users(): + q = request.args.get("q") or None + is_active_raw = request.args.get("is_active") + is_active: bool | None + if is_active_raw is None: + is_active = None + elif is_active_raw.lower() in ("true", "1", "yes"): + is_active = True + elif is_active_raw.lower() in ("false", "0", "no"): + is_active = False + else: + return jsonify({"error": "invalid_is_active"}), 400 + + try: + limit = int(request.args.get("limit", "50")) + offset = int(request.args.get("offset", "0")) + except ValueError: + return jsonify({"error": "invalid_pagination"}), 400 + limit = max(1, min(limit, 200)) + offset = max(0, offset) + + rows, total = users_svc.list_users(q=q, is_active=is_active, limit=limit, offset=offset) + return jsonify( + { + "items": [_serialize(u) for u in rows], + "total": total, + "limit": limit, + "offset": offset, + } + ) + + +@bp.get("/") +@require_auth +@require_perm("user.read") +def get_user(user_id: str): + uid = _parse_uuid_or_400(user_id) + if uid is None: + return jsonify({"error": "invalid_id"}), 400 + try: + u = users_svc.get_user(uid) + except users_svc.UserNotFound: + return jsonify({"error": "not_found"}), 404 + return jsonify(_serialize(u)) + + +@bp.patch("/") +@require_auth +@require_perm("user.update") +def update_user(user_id: str): + uid = _parse_uuid_or_400(user_id) + if uid is None: + return jsonify({"error": "invalid_id"}), 400 + raw = request.get_json(silent=True) or {} + try: + payload = UpdateUserPayload.model_validate(raw) + except ValidationError as e: + return jsonify({"error": "invalid_request", "details": e.errors()}), 400 + + # Distinguish "key absent" (no change) from "key=null" (clear) for display_name. + display_name_unset = "display_name" not in raw + + try: + u = users_svc.update_user( + uid, + display_name=... if display_name_unset else payload.display_name, + locale=payload.locale, + is_active=payload.is_active, + ) + except users_svc.UserNotFound: + return jsonify({"error": "not_found"}), 404 + except users_svc.LastAdminProtected as e: + return jsonify({"error": "last_admin_protected", "message": str(e)}), 409 + log.info( + "metamorph.user.updated", + extra={ + "user_id": str(uid), + "fields": sorted(raw.keys()), + }, + ) + return jsonify(_serialize(u)) + + +@bp.delete("/") +@require_auth +@require_perm("user.delete") +def soft_delete(user_id: str): + uid = _parse_uuid_or_400(user_id) + if uid is None: + return jsonify({"error": "invalid_id"}), 400 + try: + users_svc.soft_delete_user(uid) + except users_svc.UserNotFound: + return jsonify({"error": "not_found"}), 404 + except users_svc.LastAdminProtected as e: + return jsonify({"error": "last_admin_protected", "message": str(e)}), 409 + log.info("metamorph.user.soft_deleted", extra={"user_id": str(uid)}) + return jsonify({"ok": True}) + + +@bp.put("//groups") +@require_auth +@require_perm("user.update") +def set_groups(user_id: str): + uid = _parse_uuid_or_400(user_id) + if uid is None: + return jsonify({"error": "invalid_id"}), 400 + try: + payload = SetGroupsPayload.model_validate(request.get_json(silent=True) or {}) + except ValidationError as e: + return jsonify({"error": "invalid_request", "details": e.errors()}), 400 + try: + u = users_svc.set_user_groups(uid, payload.group_ids) + except users_svc.UserNotFound: + return jsonify({"error": "not_found"}), 404 + except users_svc.LastAdminProtected as e: + return jsonify({"error": "last_admin_protected", "message": str(e)}), 409 + except ValueError as e: + return jsonify({"error": "invalid_request", "message": str(e)}), 400 + log.info( + "metamorph.user.groups_set", + extra={"user_id": str(uid), "groups": [str(g) for g in payload.group_ids]}, + ) + return jsonify(_serialize(u)) diff --git a/backend/app/services/groups.py b/backend/app/services/groups.py new file mode 100644 index 0000000..1270486 --- /dev/null +++ b/backend/app/services/groups.py @@ -0,0 +1,210 @@ +"""Admin-side group management: CRUD + permission bindings. + +System groups (`is_system=True`: admin, redteam, blueteam) cannot be renamed +or deleted, but their permission bindings are seeded on boot and editable +afterwards (e.g. an admin can broaden `redteam`). +""" + +from __future__ import annotations + +import uuid +from dataclasses import dataclass +from datetime import datetime, timezone + +from sqlalchemy import func, select + +from app.db.session import session_scope +from app.models.auth import Group, GroupPermission, Permission, UserGroup +from app.services.bootstrap import ADMIN_GROUP_NAME + + +class GroupNotFound(Exception): + pass + + +class GroupNameConflict(Exception): + pass + + +class SystemGroupProtected(Exception): + """Refusing to delete or rename a built-in system group.""" + + +@dataclass(frozen=True) +class GroupView: + id: uuid.UUID + name: str + description: str | None + is_system: bool + deleted_at: datetime | None + members_count: int + permissions: list[str] + created_at: datetime + updated_at: datetime + + +def _to_view(g: Group, members_count: int) -> GroupView: + return GroupView( + id=g.id, + name=g.name, + description=g.description, + is_system=g.is_system, + deleted_at=g.deleted_at, + members_count=members_count, + permissions=sorted(p.code for p in g.permissions), + created_at=g.created_at, + updated_at=g.updated_at, + ) + + +def _members_counts(s, group_ids: list[uuid.UUID]) -> dict[uuid.UUID, int]: + if not group_ids: + return {} + from app.models.auth import User as _U # local to avoid model cycles + + rows = s.execute( + select(UserGroup.group_id, func.count(UserGroup.user_id)) + .join(_U, _U.id == UserGroup.user_id) + .where(UserGroup.group_id.in_(group_ids), _U.deleted_at.is_(None)) + .group_by(UserGroup.group_id) + ).all() + return {gid: int(cnt) for gid, cnt in rows} + + +def list_groups(*, include_deleted: bool = False) -> list[GroupView]: + with session_scope() as s: + stmt = select(Group).order_by(Group.is_system.desc(), Group.name.asc()) + if not include_deleted: + stmt = stmt.where(Group.deleted_at.is_(None)) + rows = s.scalars(stmt).all() + counts = _members_counts(s, [g.id for g in rows]) + return [_to_view(g, counts.get(g.id, 0)) for g in rows] + + +def get_group(group_id: uuid.UUID) -> GroupView: + with session_scope() as s: + g = s.get(Group, group_id) + if g is None or g.deleted_at is not None: + raise GroupNotFound() + counts = _members_counts(s, [g.id]) + return _to_view(g, counts.get(g.id, 0)) + + +def create_group(*, name: str, description: str | None) -> GroupView: + name_norm = name.strip() + if not name_norm: + raise ValueError("name is required") + with session_scope() as s: + existing = s.scalar( + select(Group).where(Group.name == name_norm, Group.deleted_at.is_(None)) + ) + if existing is not None: + raise GroupNameConflict(f"group name {name_norm!r} already in use") + g = Group(name=name_norm, description=(description or "").strip() or None, is_system=False) + s.add(g) + s.flush() + return _to_view(g, 0) + + +def update_group( + group_id: uuid.UUID, + *, + name: str | None = None, + description: str | None | object = ..., +) -> GroupView: + with session_scope() as s: + g = s.get(Group, group_id) + if g is None or g.deleted_at is not None: + raise GroupNotFound() + if name is not None: + name_norm = name.strip() + if not name_norm: + raise ValueError("name cannot be empty") + if g.is_system and name_norm != g.name: + raise SystemGroupProtected("system groups cannot be renamed") + if name_norm != g.name: + clash = s.scalar( + select(Group).where( + Group.name == name_norm, + Group.deleted_at.is_(None), + Group.id != g.id, + ) + ) + if clash is not None: + raise GroupNameConflict(f"group name {name_norm!r} already in use") + g.name = name_norm + if description is not ...: + if description in (None, ""): + g.description = None + else: + g.description = description.strip() or None + + counts = _members_counts(s, [g.id]) + return _to_view(g, counts.get(g.id, 0)) + + +def soft_delete_group(group_id: uuid.UUID) -> None: + with session_scope() as s: + g = s.get(Group, group_id) + if g is None or g.deleted_at is not None: + raise GroupNotFound() + if g.is_system: + raise SystemGroupProtected("system groups cannot be deleted") + g.deleted_at = datetime.now(tz=timezone.utc) + + +def set_group_permissions(group_id: uuid.UUID, codes: list[str]) -> GroupView: + """Replace the group's permission set with the given codes (validated).""" + desired_codes = set(codes) + with session_scope() as s: + g = s.get(Group, group_id) + if g is None or g.deleted_at is not None: + raise GroupNotFound() + + # Preserve the invariant "the system `admin` group has every perm." The + # decorator's admin bypass relies on `is_admin` (group membership), not + # on the perm set, so a stripped admin group would still grant access — + # but the listing would look misleading and a future refactor could + # reasonably switch the bypass to a perm-based check. + if g.is_system and g.name == ADMIN_GROUP_NAME: + all_codes = {p.code for p in s.scalars(select(Permission)).all()} + if desired_codes != all_codes: + raise SystemGroupProtected( + "the admin group must keep every permission" + ) + + if desired_codes: + perms = s.scalars(select(Permission).where(Permission.code.in_(desired_codes))).all() + known = {p.code for p in perms} + unknown = desired_codes - known + if unknown: + raise ValueError(f"unknown permission codes: {sorted(unknown)}") + else: + perms = [] + + current = {p.code: p for p in g.permissions} + to_remove = set(current) - desired_codes + to_add = desired_codes - set(current) + + for code in to_remove: + row = s.get(GroupPermission, (g.id, current[code].id)) + if row is not None: + s.delete(row) + for p in perms: + if p.code in to_add: + s.add(GroupPermission(group_id=g.id, permission_id=p.id)) + + s.flush() + s.refresh(g) + counts = _members_counts(s, [g.id]) + return _to_view(g, counts.get(g.id, 0)) + + +def list_permissions() -> list[dict]: + """Return the catalogue of all permissions known to the platform.""" + with session_scope() as s: + rows = s.scalars(select(Permission).order_by(Permission.code.asc())).all() + return [ + {"id": str(p.id), "code": p.code, "description": p.description} + for p in rows + ] diff --git a/backend/app/services/permissions_seed.py b/backend/app/services/permissions_seed.py new file mode 100644 index 0000000..643e911 --- /dev/null +++ b/backend/app/services/permissions_seed.py @@ -0,0 +1,179 @@ +"""Atomic permission catalogue + seed for the 3 default system groups. + +Permissions follow the `.` convention. They are the ground truth +checked by `@require_perm`; admins bypass everything (cf. `auth_decorators.py`). + +This module is the single place that lists every permission code shipped with +the platform. To add a new perm in a future milestone: + + 1. Add an entry to `PERMISSION_CATALOGUE`. + 2. Decide which system group(s) should get it by default — edit + `_default_redteam_perms()` / `_default_blueteam_perms()` if relevant + (admin always gets everything, so no edit needed there). + 3. The next boot picks it up; existing groups are *upgraded* (perms added), + never downgraded (we never remove perms from a system group, even if you + trim the catalogue — that would be a destructive op disguised as a seed). + +The seed is idempotent and safe to call on every boot. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass + +from sqlalchemy import select + +from app.db.session import session_scope +from app.models.auth import Group, GroupPermission, Permission +from app.services.bootstrap import ( + ADMIN_GROUP_NAME, + BLUETEAM_GROUP_NAME, + REDTEAM_GROUP_NAME, +) + +log = logging.getLogger("metamorph.permissions") + + +@dataclass(frozen=True) +class PermissionDef: + code: str + description: str + + +# === Catalogue ================================================================ +# +# Order is presentation-only; the seed is idempotent. Grouped by family to keep +# diffs reviewable and to mirror the admin UI grouping in M3.6. +# +PERMISSION_CATALOGUE: tuple[PermissionDef, ...] = ( + # users + PermissionDef("user.read", "View users."), + PermissionDef("user.create", "Create users (typically via invitation)."), + PermissionDef("user.update", "Update user metadata (display name, locale, active flag)."), + PermissionDef("user.delete", "Soft-delete a user."), + # groups + PermissionDef("group.read", "View groups and their permissions."), + PermissionDef("group.create", "Create a custom group."), + PermissionDef("group.update", "Edit a custom group (name, description, permissions, members)."), + PermissionDef("group.delete", "Soft-delete a custom group."), + # invitations + PermissionDef("invitation.read", "View pending invitations."), + PermissionDef("invitation.create", "Issue a new invitation URL."), + PermissionDef("invitation.revoke", "Revoke an unconsumed invitation."), + # test templates + PermissionDef("test_template.read", "View the test-template catalogue."), + PermissionDef("test_template.create", "Create a test template."), + PermissionDef("test_template.update", "Edit a test template."), + PermissionDef("test_template.delete", "Soft-delete a test template."), + # scenario templates + PermissionDef("scenario_template.read", "View the scenario-template catalogue."), + PermissionDef("scenario_template.create", "Create a scenario template."), + PermissionDef("scenario_template.update", "Edit a scenario template (and its ordered tests)."), + PermissionDef("scenario_template.delete", "Soft-delete a scenario template."), + # missions + PermissionDef("mission.read", "View missions (server still filters by membership for non-admin)."), + PermissionDef("mission.create", "Create a mission."), + PermissionDef("mission.update", "Edit mission metadata, scenarios, members."), + PermissionDef("mission.archive", "Move a mission to status=archived."), + PermissionDef("mission.delete", "Soft-delete a mission."), + PermissionDef("mission.write_red_fields", "Write red-side fields on a mission test."), + PermissionDef("mission.write_blue_fields", "Write blue-side fields and upload evidence."), + # detection levels + platform settings + MITRE sync + PermissionDef("detection_level.read", "View the detection-level taxonomy."), + PermissionDef("detection_level.update", "Edit the detection-level taxonomy."), + PermissionDef("setting.read", "Read platform settings."), + PermissionDef("setting.update", "Update platform settings."), + PermissionDef("mitre.sync", "Trigger a MITRE ATT&CK Enterprise re-sync."), +) + + +def _default_redteam_perms() -> frozenset[str]: + return frozenset( + { + # catalogue read-only + "test_template.read", + "scenario_template.read", + # MITRE/detection refs + "detection_level.read", + # missions: full lifecycle on red side + "mission.read", + "mission.create", + "mission.update", + "mission.archive", + "mission.write_red_fields", + } + ) + + +def _default_blueteam_perms() -> frozenset[str]: + return frozenset( + { + "test_template.read", + "scenario_template.read", + "detection_level.read", + "mission.read", + "mission.write_blue_fields", + } + ) + + +def _all_perm_codes() -> frozenset[str]: + return frozenset(p.code for p in PERMISSION_CATALOGUE) + + +def seed_permissions() -> dict[str, int]: + """Insert any missing permissions. Returns counts: `created`, `total`.""" + created = 0 + with session_scope() as s: + existing_codes = set(s.scalars(select(Permission.code)).all()) + for p in PERMISSION_CATALOGUE: + if p.code in existing_codes: + continue + s.add(Permission(code=p.code, description=p.description)) + created += 1 + return {"created": created, "total": len(PERMISSION_CATALOGUE)} + + +def _assign_perms_to_group(group_name: str, codes: frozenset[str]) -> int: + """Attach the named perms to the given system group. Returns added count. + + We never *remove* perms from a system group here — the seed is additive. + Admins/operators who want to revoke must do so explicitly via the UI/API. + """ + if not codes: + return 0 + added = 0 + with session_scope() as s: + group = s.scalar(select(Group).where(Group.name == group_name, Group.is_system.is_(True))) + if group is None: + raise RuntimeError(f"system group {group_name!r} missing — call ensure_system_groups() first") + + existing_codes = {p.code for p in group.permissions} + perms = s.scalars(select(Permission).where(Permission.code.in_(codes))).all() + for p in perms: + if p.code in existing_codes: + continue + s.add(GroupPermission(group_id=group.id, permission_id=p.id)) + added += 1 + return added + + +def seed_default_group_permissions() -> dict[str, int]: + """Bind the catalogue to the 3 default groups. Idempotent + additive.""" + counts: dict[str, int] = {} + counts[ADMIN_GROUP_NAME] = _assign_perms_to_group(ADMIN_GROUP_NAME, _all_perm_codes()) + counts[REDTEAM_GROUP_NAME] = _assign_perms_to_group(REDTEAM_GROUP_NAME, _default_redteam_perms()) + counts[BLUETEAM_GROUP_NAME] = _assign_perms_to_group(BLUETEAM_GROUP_NAME, _default_blueteam_perms()) + return counts + + +def seed_all() -> dict[str, dict[str, int]]: + """One-shot helper: catalogue + default group bindings.""" + perms = seed_permissions() + bindings = seed_default_group_permissions() + log.info( + "metamorph.permissions.seeded", + extra={"perms_created": perms["created"], "perms_total": perms["total"], "bindings": bindings}, + ) + return {"permissions": perms, "bindings": bindings} diff --git a/backend/app/services/users.py b/backend/app/services/users.py new file mode 100644 index 0000000..9e2f676 --- /dev/null +++ b/backend/app/services/users.py @@ -0,0 +1,204 @@ +"""Admin-side user management: list, get, update, soft-delete, assign groups. + +Self-service updates (locale, password, display_name) live in +`services.auth` — this module is for admin operations on other users. +""" + +from __future__ import annotations + +import uuid +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Iterable + +from sqlalchemy import func, or_, select + +from app.db.session import session_scope +from app.models.auth import Group, User, UserGroup +from app.services.bootstrap import ADMIN_GROUP_NAME + + +class UserNotFound(Exception): + pass + + +class LastAdminProtected(Exception): + """Refusing to strip admin from the last active admin.""" + + +class SystemGroupProtected(Exception): + """Refusing to delete or rename a built-in system group.""" + + +@dataclass(frozen=True) +class UserView: + id: uuid.UUID + email: str + display_name: str | None + locale: str + is_active: bool + deleted_at: datetime | None + created_at: datetime + updated_at: datetime + groups: list[tuple[uuid.UUID, str]] + + +def _to_view(u: User) -> UserView: + return UserView( + id=u.id, + email=u.email, + display_name=u.display_name, + locale=u.locale, + is_active=u.is_active, + deleted_at=u.deleted_at, + created_at=u.created_at, + updated_at=u.updated_at, + groups=[(g.id, g.name) for g in u.groups if g.deleted_at is None], + ) + + +def list_users( + *, + q: str | None = None, + is_active: bool | None = None, + include_deleted: bool = False, + limit: int = 50, + offset: int = 0, +) -> tuple[list[UserView], int]: + """Return (rows, total_count) with case-insensitive search on email + display_name.""" + with session_scope() as s: + stmt = select(User) + count_stmt = select(func.count()).select_from(User) + if not include_deleted: + stmt = stmt.where(User.deleted_at.is_(None)) + count_stmt = count_stmt.where(User.deleted_at.is_(None)) + if is_active is not None: + stmt = stmt.where(User.is_active.is_(is_active)) + count_stmt = count_stmt.where(User.is_active.is_(is_active)) + if q: + like = f"%{q.lower()}%" + stmt = stmt.where( + or_(func.lower(User.email).like(like), func.lower(User.display_name).like(like)) + ) + count_stmt = count_stmt.where( + or_(func.lower(User.email).like(like), func.lower(User.display_name).like(like)) + ) + stmt = stmt.order_by(User.email.asc()).limit(limit).offset(offset) + rows = s.scalars(stmt).all() + total = int(s.scalar(count_stmt) or 0) + views = [_to_view(u) for u in rows] + return views, total + + +def get_user(user_id: uuid.UUID, *, include_deleted: bool = False) -> UserView: + with session_scope() as s: + u = s.get(User, user_id) + if u is None or (u.deleted_at is not None and not include_deleted): + raise UserNotFound() + return _to_view(u) + + +def update_user( + user_id: uuid.UUID, + *, + display_name: str | None | object = ..., + locale: str | None = None, + is_active: bool | None = None, +) -> UserView: + """Partial update. Pass display_name=None to clear; omit to leave unchanged.""" + with session_scope() as s: + u = s.get(User, user_id) + if u is None or u.deleted_at is not None: + raise UserNotFound() + if display_name is not ...: + if display_name in (None, ""): + u.display_name = None + else: + u.display_name = display_name.strip() or None + if locale is not None: + u.locale = locale + if is_active is not None: + # If deactivating the last active admin, refuse. + if not is_active and _is_last_active_admin(s, u): + raise LastAdminProtected("cannot deactivate the last active admin") + u.is_active = is_active + return _to_view(u) + + +def soft_delete_user(user_id: uuid.UUID) -> None: + with session_scope() as s: + u = s.get(User, user_id) + if u is None or u.deleted_at is not None: + raise UserNotFound() + if _is_last_active_admin(s, u): + raise LastAdminProtected("cannot delete the last active admin") + u.deleted_at = datetime.now(tz=timezone.utc) + u.is_active = False + + +def set_user_groups(user_id: uuid.UUID, group_ids: Iterable[uuid.UUID]) -> UserView: + """Replace the user's group memberships with the given set.""" + desired = set(group_ids) + with session_scope() as s: + u = s.get(User, user_id) + if u is None or u.deleted_at is not None: + raise UserNotFound() + + # Resolve admin group id once. + admin_group_id = s.scalar( + select(Group.id).where(Group.name == ADMIN_GROUP_NAME, Group.is_system.is_(True)) + ) + is_currently_admin = admin_group_id in {g.id for g in u.groups} + will_be_admin = admin_group_id in desired + if is_currently_admin and not will_be_admin and _is_last_active_admin(s, u): + raise LastAdminProtected("cannot remove admin from the last active admin") + + # Refuse silently for unknown groups: validate first. + if desired: + known = set( + s.scalars( + select(Group.id).where(Group.id.in_(desired), Group.deleted_at.is_(None)) + ).all() + ) + unknown = desired - known + if unknown: + raise ValueError(f"unknown groups: {sorted(map(str, unknown))}") + + current = {g.id for g in u.groups} + to_add = desired - current + to_remove = current - desired + + for gid in to_remove: + row = s.get(UserGroup, (u.id, gid)) + if row is not None: + s.delete(row) + for gid in to_add: + s.add(UserGroup(user_id=u.id, group_id=gid)) + + s.flush() + s.refresh(u) + return _to_view(u) + + +def _is_last_active_admin(s, user: User) -> bool: + """True when `user` is currently in the admin system group and removing/blocking + them would leave the platform with zero active admins.""" + admin_group_id = s.scalar( + select(Group.id).where(Group.name == ADMIN_GROUP_NAME, Group.is_system.is_(True)) + ) + if admin_group_id is None: + return False + if admin_group_id not in {g.id for g in user.groups}: + return False + other_admins = s.scalar( + select(func.count()) + .select_from(User) + .join(UserGroup, UserGroup.user_id == User.id) + .where( + UserGroup.group_id == admin_group_id, + User.id != user.id, + User.deleted_at.is_(None), + User.is_active.is_(True), + ) + ) + return int(other_admins or 0) == 0 diff --git a/backend/tests/test_rbac.py b/backend/tests/test_rbac.py new file mode 100644 index 0000000..4753ca2 --- /dev/null +++ b/backend/tests/test_rbac.py @@ -0,0 +1,344 @@ +"""Integration tests for M3: permission seed + users/groups/permissions APIs. + +Exercises the Flask test client against a live Postgres. The DB is wiped at +module load so test ordering inside the module matters (see `pytest.shared_*`). +""" + +from __future__ import annotations + +import secrets + +import pytest +from sqlalchemy import text + +from app.core.install_token import regenerate_install_token +from app.main import create_app +from app.services.permissions_seed import PERMISSION_CATALOGUE + + +def _truncate_all(engine): + """Wipe data plus permissions table. CASCADE handles dependent rows.""" + with engine.begin() as conn: + conn.execute( + text( + "TRUNCATE users, refresh_tokens, invitations, invitation_groups, " + "user_groups, group_permissions, permissions, settings, groups " + "RESTART IDENTITY CASCADE" + ) + ) + + +@pytest.fixture(scope="module") +def app(db_engine_or_skip): + _truncate_all(db_engine_or_skip) + flask_app = create_app() # triggers bootstrap → seed_all() + flask_app.config.update(TESTING=True) + return flask_app + + +@pytest.fixture() +def client(app): + return app.test_client() + + +def _unique_email(prefix: str) -> str: + return f"{prefix}-{secrets.token_hex(4)}@metamorph.local" + + +def _login(client, email: str, password: str) -> str: + r = client.post("/api/v1/auth/login", json={"email": email, "password": password}) + assert r.status_code == 200, r.get_data(as_text=True) + return r.get_json()["access_token"] + + +# -- M3.1 — Permissions seeded at boot ----------------------------------------- + + +def test_permissions_catalogue_seeded(client): + """Catalogue table has every code from PERMISSION_CATALOGUE.""" + # We need an admin to call /permissions — bootstrap one via /setup. + token = regenerate_install_token() + email = _unique_email("admin") + r = client.post( + "/api/v1/setup", + json={ + "install_token": token, + "email": email, + "password": "AdminPass1234!", + "display_name": "Admin", + }, + ) + assert r.status_code == 201, r.get_data(as_text=True) + pytest.shared_admin = {"email": email, "password": "AdminPass1234!", "user_id": r.get_json()["user_id"]} # type: ignore[attr-defined] + + access = _login(client, email, "AdminPass1234!") + perms = client.get( + "/api/v1/permissions", headers={"Authorization": f"Bearer {access}"} + ).get_json() + codes = {p["code"] for p in perms["items"]} + expected = {p.code for p in PERMISSION_CATALOGUE} + assert expected.issubset(codes) + + +def test_admin_group_has_every_permission(client): + admin = pytest.shared_admin + access = _login(client, admin["email"], admin["password"]) + groups = client.get( + "/api/v1/groups", headers={"Authorization": f"Bearer {access}"} + ).get_json() + admin_group = next(g for g in groups["items"] if g["name"] == "admin") + assert set(admin_group["permissions"]) == {p.code for p in PERMISSION_CATALOGUE} + assert admin_group["is_system"] is True + + +def test_redteam_group_has_red_perms_but_not_blue_write(client): + admin = pytest.shared_admin + access = _login(client, admin["email"], admin["password"]) + groups = client.get( + "/api/v1/groups", headers={"Authorization": f"Bearer {access}"} + ).get_json() + redteam = next(g for g in groups["items"] if g["name"] == "redteam") + assert "mission.write_red_fields" in redteam["permissions"] + assert "mission.write_blue_fields" not in redteam["permissions"] + assert "mission.create" in redteam["permissions"] + + +def test_blueteam_group_has_blue_perm_but_not_red_write(client): + admin = pytest.shared_admin + access = _login(client, admin["email"], admin["password"]) + groups = client.get( + "/api/v1/groups", headers={"Authorization": f"Bearer {access}"} + ).get_json() + blueteam = next(g for g in groups["items"] if g["name"] == "blueteam") + assert "mission.write_blue_fields" in blueteam["permissions"] + assert "mission.write_red_fields" not in blueteam["permissions"] + assert "mission.create" not in blueteam["permissions"] + + +# -- M3.2 — Users CRUD --------------------------------------------------------- + + +def _invite_user(client, admin_access: str, email: str, password: str, group_ids: list[str] | None = None) -> str: + """Create + accept an invitation, return the new user's id.""" + create = client.post( + "/api/v1/invitations", + headers={"Authorization": f"Bearer {admin_access}"}, + json={"email_hint": email, "group_ids": group_ids or []}, + ) + assert create.status_code == 201, create.get_data(as_text=True) + token = create.get_json()["token"] + accept = client.post( + f"/api/v1/invitations/accept/{token}", + json={"email": email, "password": password, "display_name": email.split("@")[0]}, + ) + assert accept.status_code == 201, accept.get_data(as_text=True) + return accept.get_json()["user_id"] + + +def test_admin_lists_users(client): + admin = pytest.shared_admin + access = _login(client, admin["email"], admin["password"]) + r = client.get("/api/v1/users", headers={"Authorization": f"Bearer {access}"}) + assert r.status_code == 200 + body = r.get_json() + assert body["total"] >= 1 + emails = [u["email"] for u in body["items"]] + assert admin["email"] in emails + + +def test_admin_updates_a_user(client): + admin = pytest.shared_admin + access = _login(client, admin["email"], admin["password"]) + bob_email = _unique_email("bob") + bob_id = _invite_user(client, access, bob_email, "BobPass1234!") + + r = client.patch( + f"/api/v1/users/{bob_id}", + headers={"Authorization": f"Bearer {access}"}, + json={"display_name": "Robert", "locale": "en"}, + ) + assert r.status_code == 200, r.get_data(as_text=True) + body = r.get_json() + assert body["display_name"] == "Robert" + assert body["locale"] == "en" + + +def test_admin_soft_deletes_a_user(client): + admin = pytest.shared_admin + access = _login(client, admin["email"], admin["password"]) + target_email = _unique_email("ghost") + target_id = _invite_user(client, access, target_email, "GhostPass1234!") + + r = client.delete( + f"/api/v1/users/{target_id}", + headers={"Authorization": f"Bearer {access}"}, + ) + assert r.status_code == 200, r.get_data(as_text=True) + + # Listing must not return the deleted user by default. + listing = client.get( + "/api/v1/users", headers={"Authorization": f"Bearer {access}"} + ).get_json() + assert target_email not in [u["email"] for u in listing["items"]] + + +def test_last_admin_cannot_be_deleted(client): + admin = pytest.shared_admin + access = _login(client, admin["email"], admin["password"]) + r = client.delete( + f"/api/v1/users/{admin['user_id']}", + headers={"Authorization": f"Bearer {access}"}, + ) + assert r.status_code == 409 + assert r.get_json()["error"] == "last_admin_protected" + + +def test_last_admin_cannot_be_deactivated(client): + admin = pytest.shared_admin + access = _login(client, admin["email"], admin["password"]) + r = client.patch( + f"/api/v1/users/{admin['user_id']}", + headers={"Authorization": f"Bearer {access}"}, + json={"is_active": False}, + ) + assert r.status_code == 409 + assert r.get_json()["error"] == "last_admin_protected" + + +# -- M3.3 — Groups CRUD -------------------------------------------------------- + + +def test_admin_creates_custom_group_and_assigns_perms(client): + admin = pytest.shared_admin + access = _login(client, admin["email"], admin["password"]) + + # Create the group + create = client.post( + "/api/v1/groups", + headers={"Authorization": f"Bearer {access}"}, + json={"name": f"pentest-{secrets.token_hex(3)}", "description": "Test group"}, + ) + assert create.status_code == 201, create.get_data(as_text=True) + gid = create.get_json()["id"] + + # Attach mission.read + mission.write_red_fields only + r = client.put( + f"/api/v1/groups/{gid}/permissions", + headers={"Authorization": f"Bearer {access}"}, + json={"codes": ["mission.read", "mission.write_red_fields"]}, + ) + assert r.status_code == 200, r.get_data(as_text=True) + assert set(r.get_json()["permissions"]) == {"mission.read", "mission.write_red_fields"} + + +def test_system_group_cannot_be_renamed_or_deleted(client): + admin = pytest.shared_admin + access = _login(client, admin["email"], admin["password"]) + groups = client.get( + "/api/v1/groups", headers={"Authorization": f"Bearer {access}"} + ).get_json() + admin_group = next(g for g in groups["items"] if g["name"] == "admin") + + rename = client.patch( + f"/api/v1/groups/{admin_group['id']}", + headers={"Authorization": f"Bearer {access}"}, + json={"name": "superadmin"}, + ) + assert rename.status_code == 409 + assert rename.get_json()["error"] == "system_group_protected" + + delete = client.delete( + f"/api/v1/groups/{admin_group['id']}", + headers={"Authorization": f"Bearer {access}"}, + ) + assert delete.status_code == 409 + assert delete.get_json()["error"] == "system_group_protected" + + +def test_setting_unknown_permission_code_returns_400(client): + admin = pytest.shared_admin + access = _login(client, admin["email"], admin["password"]) + create = client.post( + "/api/v1/groups", + headers={"Authorization": f"Bearer {access}"}, + json={"name": f"bad-perms-{secrets.token_hex(3)}", "description": None}, + ) + gid = create.get_json()["id"] + r = client.put( + f"/api/v1/groups/{gid}/permissions", + headers={"Authorization": f"Bearer {access}"}, + json={"codes": ["bogus.permission"]}, + ) + assert r.status_code == 400 + + +# -- M3 user ↔ group assignment ------------------------------------------------ + + +def test_admin_assigns_user_to_custom_group_and_perms_apply(client): + admin = pytest.shared_admin + access = _login(client, admin["email"], admin["password"]) + + # Create a custom group that *only* grants user.read. + gname = f"readers-{secrets.token_hex(3)}" + group = client.post( + "/api/v1/groups", + headers={"Authorization": f"Bearer {access}"}, + json={"name": gname, "description": None}, + ).get_json() + client.put( + f"/api/v1/groups/{group['id']}/permissions", + headers={"Authorization": f"Bearer {access}"}, + json={"codes": ["user.read"]}, + ) + + # Invite Dave, attach the new group via /users/{id}/groups. + dave_email = _unique_email("dave") + dave_id = _invite_user(client, access, dave_email, "DavePass1234!") + r = client.put( + f"/api/v1/users/{dave_id}/groups", + headers={"Authorization": f"Bearer {access}"}, + json={"group_ids": [group["id"]]}, + ) + assert r.status_code == 200, r.get_data(as_text=True) + + # Dave can now list users (user.read) but cannot create a group (group.create). + dave_access = _login(client, dave_email, "DavePass1234!") + can_read = client.get( + "/api/v1/users", headers={"Authorization": f"Bearer {dave_access}"} + ) + assert can_read.status_code == 200 + + cannot_create_group = client.post( + "/api/v1/groups", + headers={"Authorization": f"Bearer {dave_access}"}, + json={"name": "wont-happen", "description": None}, + ) + assert cannot_create_group.status_code == 403 + + +def test_last_admin_cannot_lose_admin_membership(client): + admin = pytest.shared_admin + access = _login(client, admin["email"], admin["password"]) + r = client.put( + f"/api/v1/users/{admin['user_id']}/groups", + headers={"Authorization": f"Bearer {access}"}, + json={"group_ids": []}, + ) + assert r.status_code == 409 + assert r.get_json()["error"] == "last_admin_protected" + + +# -- Permission enforcement ---------------------------------------------------- + + +def test_non_admin_without_user_read_gets_403_on_users_list(client): + admin = pytest.shared_admin + access = _login(client, admin["email"], admin["password"]) + # Invite Eve with no groups → no perms. + eve_email = _unique_email("eve") + _invite_user(client, access, eve_email, "EvePass1234!", group_ids=[]) + eve_access = _login(client, eve_email, "EvePass1234!") + + r = client.get("/api/v1/users", headers={"Authorization": f"Bearer {eve_access}"}) + assert r.status_code == 403 diff --git a/e2e/tests/m3-rbac.spec.ts b/e2e/tests/m3-rbac.spec.ts new file mode 100644 index 0000000..2e1b8e8 --- /dev/null +++ b/e2e/tests/m3-rbac.spec.ts @@ -0,0 +1,230 @@ +import { expect, test, type APIRequestContext, type Page } from '@playwright/test'; + +/** + * M3 — RBAC, group management, user assignment. + * + * Flow: + * 1. Reset + bootstrap a fresh admin. + * 2. Admin visits /admin/groups and creates a custom group `pentest-red` with + * only `mission.read` + `mission.write_red_fields`. + * 3. Admin issues an invitation pre-assigned to that custom group. + * 4. Invitee accepts, logs in, hits the API: mission.read OK, but admin-only + * group.create returns 403 — proving the union-of-perms decorator works. + * 5. Admin attempts to demote himself → server returns 409 last_admin_protected. + */ + +const ADMIN_EMAIL = `admin-${Math.floor(Math.random() * 1e6)}@metamorph.local`; +const ADMIN_PASSWORD = 'AdminPass1234!'; +const BOB_EMAIL = `bob-${Math.floor(Math.random() * 1e6)}@metamorph.local`; +const BOB_PASSWORD = 'BobPass1234!'; +const GROUP_NAME = `pentest-red-${Math.floor(Math.random() * 1e6)}`; + +interface ResetPayload { + install_token: string; +} + +async function resetAndMintToken(request: APIRequestContext): Promise { + const r = await request.post('/api/v1/diag/reset'); + expect(r.status()).toBe(200); + const body = (await r.json()) as ResetPayload; + return body.install_token; +} + +async function loginAndGetAccess( + request: APIRequestContext, + email: string, + password: string, +): Promise { + const r = await request.post('/api/v1/auth/login', { + data: { email, password }, + }); + expect(r.status()).toBe(200); + return (await r.json()).access_token as string; +} + +/** Authenticate the page session via the SPA login form. */ +async function loginViaSpa(page: Page, email: string, password: string) { + await page.goto('/login'); + await page.getByLabel(/email/i).fill(email); + await page.getByLabel(/password/i).fill(password); + await page.getByRole('button', { name: /sign in/i }).click(); + await expect(page.getByTestId('me-email')).toHaveText(email); +} + +test.describe.configure({ mode: 'serial' }); + +test.describe('M3 — RBAC management', () => { + let installToken: string; + let customGroupId: string; + + test.beforeAll(async ({ request }) => { + installToken = await resetAndMintToken(request); + // Bootstrap the first admin via API to keep the e2e focused on RBAC. + const setup = await request.post('/api/v1/setup', { + data: { + install_token: installToken, + email: ADMIN_EMAIL, + password: ADMIN_PASSWORD, + display_name: 'Admin', + }, + }); + expect(setup.status()).toBe(201); + }); + + test('admin sees Admin nav links after login', async ({ page }) => { + await loginViaSpa(page, ADMIN_EMAIL, ADMIN_PASSWORD); + // Nav now shows the admin links. + await expect(page.getByRole('link', { name: /^users$/i })).toBeVisible(); + await expect(page.getByRole('link', { name: /^groups$/i })).toBeVisible(); + await expect(page.getByRole('link', { name: /^invitations$/i })).toBeVisible(); + }); + + test('catalogue page lists the seeded permissions', async ({ request }) => { + const access = await loginAndGetAccess(request, ADMIN_EMAIL, ADMIN_PASSWORD); + const r = await request.get('/api/v1/permissions', { + headers: { Authorization: `Bearer ${access}` }, + }); + expect(r.status()).toBe(200); + const body = (await r.json()) as { items: Array<{ code: string }> }; + const codes = body.items.map((p) => p.code); + // Smoke-check several families. + expect(codes).toEqual(expect.arrayContaining([ + 'user.read', + 'group.create', + 'invitation.create', + 'mission.write_red_fields', + 'mission.write_blue_fields', + 'mitre.sync', + ])); + }); + + test('admin creates a custom group with only red-write perms via the SPA', async ({ page }) => { + await loginViaSpa(page, ADMIN_EMAIL, ADMIN_PASSWORD); + await page.goto('/admin/groups'); + await page.getByTestId('create-group').click(); + const modal = page.getByTestId('group-create-modal'); + await expect(modal).toBeVisible(); + await modal.getByLabel(/^name$/i).fill(GROUP_NAME); + await modal.getByTestId('perm-mission.read').check(); + await modal.getByTestId('perm-mission.write_red_fields').check(); + await modal.getByTestId('group-create-save').click(); + + // The new card is visible in the listing. + await expect(modal).not.toBeVisible(); + await expect(page.getByText(GROUP_NAME)).toBeVisible(); + }); + + test('admin invites Bob pre-assigned to the custom group', async ({ page, request }) => { + // Fetch the group id (needed for the invitation API). + const access = await loginAndGetAccess(request, ADMIN_EMAIL, ADMIN_PASSWORD); + const groups = await request.get('/api/v1/groups', { + headers: { Authorization: `Bearer ${access}` }, + }); + const items = ((await groups.json()) as { items: Array<{ id: string; name: string }> }).items; + customGroupId = items.find((g) => g.name === GROUP_NAME)!.id; + expect(customGroupId).toBeTruthy(); + + // Issue invitation via API (creating an invitation through the UI is covered in M2). + const created = await request.post('/api/v1/invitations', { + headers: { Authorization: `Bearer ${access}` }, + data: { email_hint: BOB_EMAIL, group_ids: [customGroupId] }, + }); + expect(created.status()).toBe(201); + const token = (await created.json()).token as string; + + // Bob completes registration. + await page.goto(`/register?token=${encodeURIComponent(token)}`); + await page.getByLabel(/email/i).fill(BOB_EMAIL); + await page.getByLabel('Password', { exact: true }).fill(BOB_PASSWORD); + await page.getByLabel(/confirm password/i).fill(BOB_PASSWORD); + await page.getByRole('button', { name: /create account/i }).click(); + await expect(page).toHaveURL(/\/login$/, { timeout: 5000 }); + }); + + test('Bob can read missions list but is forbidden from admin endpoints', async ({ request }) => { + const access = await loginAndGetAccess(request, BOB_EMAIL, BOB_PASSWORD); + // Inspect /auth/me to confirm his perms. + const me = await request.get('/api/v1/auth/me', { + headers: { Authorization: `Bearer ${access}` }, + }); + const body = (await me.json()) as { + is_admin: boolean; + permissions: string[]; + groups: string[]; + }; + expect(body.is_admin).toBe(false); + expect(body.groups).toContain(GROUP_NAME); + expect(body.permissions).toEqual( + expect.arrayContaining(['mission.read', 'mission.write_red_fields']), + ); + expect(body.permissions).not.toContain('mission.write_blue_fields'); + + // Bob does NOT have user.read → /users returns 403. + const usersList = await request.get('/api/v1/users', { + headers: { Authorization: `Bearer ${access}` }, + }); + expect(usersList.status()).toBe(403); + + // Bob does NOT have group.create → POST /groups returns 403. + const groupCreate = await request.post('/api/v1/groups', { + headers: { Authorization: `Bearer ${access}` }, + data: { name: 'wont-happen', description: null }, + }); + expect(groupCreate.status()).toBe(403); + }); + + test('non-admin SPA visitor cannot reach /admin/* routes', async ({ page }) => { + await loginViaSpa(page, BOB_EMAIL, BOB_PASSWORD); + // Direct nav to /admin/users — RequireAdmin redirects to /. + await page.goto('/admin/users'); + await expect(page).toHaveURL(/\/$/); + // The nav also hides the admin links. + await expect(page.getByRole('link', { name: /^users$/i })).toHaveCount(0); + }); + + test('last-admin protection prevents the bootstrap admin from being deleted', async ({ request }) => { + const access = await loginAndGetAccess(request, ADMIN_EMAIL, ADMIN_PASSWORD); + const me = await request.get('/api/v1/auth/me', { + headers: { Authorization: `Bearer ${access}` }, + }); + const adminId = (await me.json()).id as string; + + const del = await request.delete(`/api/v1/users/${adminId}`, { + headers: { Authorization: `Bearer ${access}` }, + }); + expect(del.status()).toBe(409); + expect((await del.json()).error).toBe('last_admin_protected'); + }); + + test('admin promotes Bob and the new perms take effect', async ({ request }) => { + const access = await loginAndGetAccess(request, ADMIN_EMAIL, ADMIN_PASSWORD); + // Find Bob. + const list = await request.get(`/api/v1/users?q=${encodeURIComponent(BOB_EMAIL)}`, { + headers: { Authorization: `Bearer ${access}` }, + }); + const bob = ((await list.json()) as { items: Array<{ id: string; email: string }> }).items.find( + (u) => u.email === BOB_EMAIL, + )!; + + // Find admin group id. + const groups = await request.get('/api/v1/groups', { + headers: { Authorization: `Bearer ${access}` }, + }); + const adminGroup = ((await groups.json()) as { items: Array<{ id: string; name: string }> }).items.find( + (g) => g.name === 'admin', + )!; + + const r = await request.put(`/api/v1/users/${bob.id}/groups`, { + headers: { Authorization: `Bearer ${access}` }, + data: { group_ids: [customGroupId, adminGroup.id] }, + }); + expect(r.status()).toBe(200); + + // Bob now has admin rights via group membership. + const bobAccess = await loginAndGetAccess(request, BOB_EMAIL, BOB_PASSWORD); + const groupsAsBob = await request.get('/api/v1/groups', { + headers: { Authorization: `Bearer ${bobAccess}` }, + }); + expect(groupsAsBob.status()).toBe(200); + }); +}); diff --git a/frontend/src/components/RequireAdmin.tsx b/frontend/src/components/RequireAdmin.tsx new file mode 100644 index 0000000..ff50df6 --- /dev/null +++ b/frontend/src/components/RequireAdmin.tsx @@ -0,0 +1,19 @@ +import type { ReactNode } from 'react'; +import { Navigate } from 'react-router-dom'; + +import { useAuth } from '@/lib/auth'; + +/** Server still arbitrates — this is a UI gate so non-admins don't see admin routes. */ +export function RequireAdmin({ children }: { children: ReactNode }) { + const { state } = useAuth(); + if (state.loading) { + return

Loading session…

; + } + if (!state.user) { + return ; + } + if (!state.user.is_admin) { + return ; + } + return <>{children}; +} diff --git a/frontend/src/components/ui/Modal.tsx b/frontend/src/components/ui/Modal.tsx new file mode 100644 index 0000000..0bdc464 --- /dev/null +++ b/frontend/src/components/ui/Modal.tsx @@ -0,0 +1,62 @@ +import { useEffect, useRef, type ReactNode } from 'react'; + +import { Button } from '@/components/ui/Button'; +import { SectionHeader } from '@/components/ui/SectionHeader'; +import { type Accent } from '@/lib/cn'; + +interface ModalProps { + open: boolean; + title: string; + accent?: Accent; + onClose: () => void; + children: ReactNode; + /** Optional name to give the dialog role for screen readers / Playwright. */ + testid?: string; +} + +/** + * Centered modal with a backdrop. Closes on Escape and on backdrop click. + * The accessible name comes from the SectionHeader's `highlight`, so the dialog + * can be located via `getByRole('dialog', { name: ... })`. + */ +export function Modal({ open, title, accent = 'cyan', onClose, children, testid }: ModalProps) { + const ref = useRef(null); + + useEffect(() => { + if (!open) return; + function onKey(e: KeyboardEvent) { + if (e.key === 'Escape') onClose(); + } + document.addEventListener('keydown', onKey); + return () => document.removeEventListener('keydown', onKey); + }, [open, onClose]); + + if (!open) return null; + + return ( +
{ + if (e.target === e.currentTarget) onClose(); + }} + role="presentation" + > +
+
+ + +
+ {children} +
+
+ ); +} diff --git a/frontend/src/lib/admin.ts b/frontend/src/lib/admin.ts new file mode 100644 index 0000000..1eacf66 --- /dev/null +++ b/frontend/src/lib/admin.ts @@ -0,0 +1,72 @@ +/** + * Shared types + query keys for admin pages (users / groups / invitations). + * Keeps the React Query cache coherent across the 3 admin pages. + */ + +export interface AdminUser { + id: string; + email: string; + display_name: string | null; + locale: string; + is_active: boolean; + deleted_at: string | null; + created_at: string; + updated_at: string; + groups: Array<{ id: string; name: string }>; +} + +export interface AdminUserListResponse { + items: AdminUser[]; + total: number; + limit: number; + offset: number; +} + +export interface AdminGroup { + id: string; + name: string; + description: string | null; + is_system: boolean; + members_count: number; + permissions: string[]; + created_at: string; + updated_at: string; +} + +export interface AdminGroupListResponse { + items: AdminGroup[]; + total: number; +} + +export interface AdminPermission { + id: string; + code: string; + description: string | null; +} + +export interface AdminInvitation { + id: string; + email_hint: string | null; + expires_at: string; + groups: string[]; +} + +export const adminKeys = { + users: ['admin', 'users'] as const, + user: (id: string) => ['admin', 'users', id] as const, + groups: ['admin', 'groups'] as const, + group: (id: string) => ['admin', 'groups', id] as const, + permissions: ['admin', 'permissions'] as const, + invitations: ['admin', 'invitations'] as const, +}; + +/** Group permission codes by family for the multi-select UI. */ +export function groupPermsByFamily(codes: string[]): Record { + const out: Record = {}; + for (const code of codes) { + const [family] = code.split('.', 1); + (out[family] ??= []).push(code); + } + for (const family of Object.keys(out)) out[family].sort(); + return out; +} diff --git a/frontend/src/pages/AdminGroupsPage.tsx b/frontend/src/pages/AdminGroupsPage.tsx new file mode 100644 index 0000000..ddfaf77 --- /dev/null +++ b/frontend/src/pages/AdminGroupsPage.tsx @@ -0,0 +1,348 @@ +import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query'; +import { useMemo, useState } from 'react'; + +import { Alert } from '@/components/ui/Alert'; +import { Button } from '@/components/ui/Button'; +import { Card } from '@/components/ui/Card'; +import { Modal } from '@/components/ui/Modal'; +import { SectionHeader } from '@/components/ui/SectionHeader'; +import { Tag } from '@/components/ui/Tag'; +import { TextField } from '@/components/ui/TextField'; +import { + ApiError, + apiDelete, + apiGet, + apiPatch, + apiPost, + apiPut, +} from '@/lib/api'; +import { + adminKeys, + groupPermsByFamily, + type AdminGroup, + type AdminGroupListResponse, + type AdminPermission, +} from '@/lib/admin'; + +function usePermissions() { + return useQuery({ + queryKey: adminKeys.permissions, + queryFn: () => apiGet<{ items: AdminPermission[] }>('/permissions'), + }); +} + +function useGroups() { + return useQuery({ + queryKey: adminKeys.groups, + queryFn: () => apiGet('/groups'), + }); +} + +export function AdminGroupsPage() { + const groups = useGroups(); + const perms = usePermissions(); + const [editing, setEditing] = useState(null); + const [creating, setCreating] = useState(false); + + return ( + <> + + +
+ + {groups.data ? `${groups.data.total} group${groups.data.total === 1 ? '' : 's'}` : ''} + + +
+ + {(groups.isError || perms.isError) && ( + Failed to load groups or permissions. + )} + +
+ {(groups.isLoading || perms.isLoading) && ( +

Loading…

+ )} + {groups.data?.items.map((g) => ( + +
+ {g.is_system && SYSTEM} + {g.members_count} member{g.members_count === 1 ? '' : 's'} + {g.permissions.length} perm{g.permissions.length === 1 ? '' : 's'} +
+ +
+
+
+ ))} + {groups.data && groups.data.items.length === 0 && ( +

No groups yet.

+ )} +
+ + {creating && perms.data && ( + setCreating(false)} /> + )} + {editing && perms.data && ( + setEditing(null)} + /> + )} + + ); +} + +interface PermsMultiSelectProps { + allPerms: AdminPermission[]; + selected: Set; + onToggle: (code: string) => void; +} + +function PermsMultiSelect({ allPerms, selected, onToggle }: PermsMultiSelectProps) { + const byFamily = useMemo( + () => groupPermsByFamily(allPerms.map((p) => p.code)), + [allPerms], + ); + return ( +
+ {Object.entries(byFamily).map(([family, codes]) => ( +
+

+ {family} +

+
+ {codes.map((code) => ( + + ))} +
+
+ ))} +
+ ); +} + +function GroupCreateModal({ + allPerms, + onClose, +}: { + allPerms: AdminPermission[]; + onClose: () => void; +}) { + const qc = useQueryClient(); + const [name, setName] = useState(''); + const [description, setDescription] = useState(''); + const [selected, setSelected] = useState>(new Set()); + const [error, setError] = useState(null); + + const createGroup = useMutation({ + mutationFn: () => + apiPost('/groups', { + name: name.trim(), + description: description.trim() || null, + }), + }); + const setPerms = useMutation({ + mutationFn: (groupId: string) => + apiPut(`/groups/${groupId}/permissions`, { codes: Array.from(selected) }), + }); + + async function save() { + setError(null); + try { + const g = await createGroup.mutateAsync(); + if (selected.size) await setPerms.mutateAsync(g.id); + await qc.invalidateQueries({ queryKey: adminKeys.groups }); + onClose(); + } catch (e) { + if (e instanceof ApiError) { + const p = e.payload as { error?: string; message?: string } | null; + setError(p?.message ?? p?.error ?? `HTTP ${e.status}`); + } else { + setError(e instanceof Error ? e.message : 'Save failed'); + } + } + } + + return ( + +
+ setName(e.target.value)} + hint="Lower-case-with-dashes recommended (e.g. pentest-2026-Q2)" + required + /> + setDescription(e.target.value)} + /> +
+

+ Permissions +

+ + setSelected((prev) => { + const next = new Set(prev); + if (next.has(code)) next.delete(code); + else next.add(code); + return next; + }) + } + /> +
+ {error && {error}} +
+ + +
+
+
+ ); +} + +function GroupEditModal({ + group, + allPerms, + onClose, +}: { + group: AdminGroup; + allPerms: AdminPermission[]; + onClose: () => void; +}) { + const qc = useQueryClient(); + const [name, setName] = useState(group.name); + const [description, setDescription] = useState(group.description ?? ''); + const [selected, setSelected] = useState>(new Set(group.permissions)); + const [error, setError] = useState(null); + + const patchGroup = useMutation({ + mutationFn: () => + apiPatch(`/groups/${group.id}`, { + name: group.is_system ? undefined : name.trim(), + description: description.trim() || null, + }), + }); + const setPerms = useMutation({ + mutationFn: () => + apiPut(`/groups/${group.id}/permissions`, { codes: Array.from(selected) }), + }); + const del = useMutation({ + mutationFn: () => apiDelete(`/groups/${group.id}`), + }); + + async function save() { + setError(null); + try { + await patchGroup.mutateAsync(); + await setPerms.mutateAsync(); + await qc.invalidateQueries({ queryKey: adminKeys.groups }); + onClose(); + } catch (e) { + if (e instanceof ApiError) { + const p = e.payload as { error?: string; message?: string } | null; + setError(p?.message ?? p?.error ?? `HTTP ${e.status}`); + } else { + setError(e instanceof Error ? e.message : 'Save failed'); + } + } + } + + async function handleDelete() { + if (!confirm(`Soft-delete group "${group.name}"?`)) return; + try { + await del.mutateAsync(); + await qc.invalidateQueries({ queryKey: adminKeys.groups }); + onClose(); + } catch (e) { + if (e instanceof ApiError) { + const p = e.payload as { error?: string; message?: string } | null; + setError(p?.message ?? p?.error ?? `HTTP ${e.status}`); + } else { + setError(e instanceof Error ? e.message : 'Delete failed'); + } + } + } + + return ( + +
+ setName(e.target.value)} + disabled={group.is_system} + hint={group.is_system ? 'System groups cannot be renamed.' : undefined} + /> + setDescription(e.target.value)} + /> +
+

+ Permissions +

+ + setSelected((prev) => { + const next = new Set(prev); + if (next.has(code)) next.delete(code); + else next.add(code); + return next; + }) + } + /> +
+ {error && {error}} +
+ {!group.is_system && ( + + )} +
+ + +
+
+
+
+ ); +} diff --git a/frontend/src/pages/AdminInvitationsPage.tsx b/frontend/src/pages/AdminInvitationsPage.tsx new file mode 100644 index 0000000..132e188 --- /dev/null +++ b/frontend/src/pages/AdminInvitationsPage.tsx @@ -0,0 +1,233 @@ +import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query'; +import { useState } from 'react'; + +import { Alert } from '@/components/ui/Alert'; +import { Button } from '@/components/ui/Button'; +import { Card } from '@/components/ui/Card'; +import { Modal } from '@/components/ui/Modal'; +import { SectionHeader } from '@/components/ui/SectionHeader'; +import { Tag } from '@/components/ui/Tag'; +import { TextField } from '@/components/ui/TextField'; +import { ApiError, apiGet, apiPost } from '@/lib/api'; +import { + adminKeys, + type AdminGroupListResponse, + type AdminInvitation, +} from '@/lib/admin'; + +function useInvitations() { + return useQuery({ + queryKey: adminKeys.invitations, + queryFn: () => apiGet('/invitations'), + }); +} + +function useGroups() { + return useQuery({ + queryKey: adminKeys.groups, + queryFn: () => apiGet('/groups'), + }); +} + +export function AdminInvitationsPage() { + const invs = useInvitations(); + const groups = useGroups(); + const [creating, setCreating] = useState(false); + const [showLink, setShowLink] = useState(null); + const qc = useQueryClient(); + + const revoke = useMutation({ + mutationFn: (id: string) => apiPost(`/invitations/${id}/revoke`), + onSuccess: () => qc.invalidateQueries({ queryKey: adminKeys.invitations }), + }); + + function buildLink(token: string): string { + return `${window.location.origin}/register?token=${encodeURIComponent(token)}`; + } + + return ( + <> + + +
+ + {invs.data ? `${invs.data.length} active invitation${invs.data.length === 1 ? '' : 's'}` : ''} + + +
+ + {invs.isError && Failed to load invitations.} + +
+ {invs.isLoading &&

Loading…

} + {invs.data?.map((inv) => ( + +
+ {inv.groups.map((g) => ( + + {g} + + ))} + {inv.groups.length === 0 && no pre-assigned groups} + +
+
+ ))} + {invs.data && invs.data.length === 0 && ( +

No active invitations.

+ )} +
+ + {creating && groups.data && ( + setCreating(false)} + onCreated={(token) => setShowLink(buildLink(token))} + /> + )} + + {showLink && ( + setShowLink(null)} title="invitation link" accent="green"> +
+

+ Copy this URL and send it to the invitee. It will be shown only once. +

+ + {showLink} + +
+ + +
+
+
+ )} + + ); +} + +function InvitationCreateModal({ + allGroups, + onClose, + onCreated, +}: { + allGroups: AdminGroupListResponse['items']; + onClose: () => void; + onCreated: (token: string) => void; +}) { + const qc = useQueryClient(); + const [emailHint, setEmailHint] = useState(''); + const [groupIds, setGroupIds] = useState([]); + const [error, setError] = useState(null); + + const create = useMutation({ + mutationFn: () => + apiPost<{ id: string; token: string; expires_at: string }>('/invitations', { + email_hint: emailHint.trim() || null, + group_ids: groupIds, + }), + }); + + async function save() { + setError(null); + try { + const r = await create.mutateAsync(); + await qc.invalidateQueries({ queryKey: adminKeys.invitations }); + onClose(); + onCreated(r.token); + } catch (e) { + if (e instanceof ApiError) { + const p = e.payload as { error?: string; message?: string } | null; + setError(p?.message ?? p?.error ?? `HTTP ${e.status}`); + } else { + setError(e instanceof Error ? e.message : 'Create failed'); + } + } + } + + return ( + +
+ setEmailHint(e.target.value)} + hint="Optional — purely informative, shown in the admin list." + /> +
+

+ Pre-assigned groups +

+
+ {allGroups.map((g) => { + const checked = groupIds.includes(g.id); + return ( + + ); + })} +
+
+ {error && {error}} +
+ + +
+
+
+ ); +} diff --git a/frontend/src/pages/AdminUsersPage.tsx b/frontend/src/pages/AdminUsersPage.tsx new file mode 100644 index 0000000..08050c9 --- /dev/null +++ b/frontend/src/pages/AdminUsersPage.tsx @@ -0,0 +1,260 @@ +import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query'; +import { useState } from 'react'; + +import { Alert } from '@/components/ui/Alert'; +import { Button } from '@/components/ui/Button'; +import { Card } from '@/components/ui/Card'; +import { Modal } from '@/components/ui/Modal'; +import { SectionHeader } from '@/components/ui/SectionHeader'; +import { Tag } from '@/components/ui/Tag'; +import { TextField } from '@/components/ui/TextField'; +import { + ApiError, + apiDelete, + apiGet, + apiPatch, + apiPut, +} from '@/lib/api'; +import { + adminKeys, + type AdminGroupListResponse, + type AdminUser, + type AdminUserListResponse, +} from '@/lib/admin'; + +function useUsers(q: string) { + return useQuery({ + queryKey: [...adminKeys.users, q], + queryFn: () => apiGet(`/users${q ? `?q=${encodeURIComponent(q)}` : ''}`), + }); +} + +function useGroups() { + return useQuery({ + queryKey: adminKeys.groups, + queryFn: () => apiGet('/groups'), + }); +} + +export function AdminUsersPage() { + const [q, setQ] = useState(''); + const [editing, setEditing] = useState(null); + const { data, isLoading, isError, error } = useUsers(q); + const groupsQuery = useGroups(); + + return ( + <> + + +
+ setQ(e.target.value)} + className="max-w-sm" + /> + + {data ? `${data.total} user${data.total === 1 ? '' : 's'}` : ''} + +
+ + {isError && ( + + {(error instanceof ApiError && (error.payload as { error?: string })?.error) || + 'Failed to load users.'} + + )} + +
+ {isLoading &&

Loading…

} + {data?.items.map((u) => ( + +
+ {u.groups.map((g) => ( + + {g.name} + + ))} + {!u.is_active && DISABLED} +
+ +
+
+
+ ))} + {data && data.items.length === 0 && ( +

No users match.

+ )} +
+ + {editing && groupsQuery.data && ( + setEditing(null)} + /> + )} + + ); +} + +interface UserEditModalProps { + user: AdminUser; + allGroups: AdminGroupListResponse['items']; + onClose: () => void; +} + +function UserEditModal({ user, allGroups, onClose }: UserEditModalProps) { + const qc = useQueryClient(); + const [displayName, setDisplayName] = useState(user.display_name ?? ''); + const [locale, setLocale] = useState(user.locale); + const [isActive, setIsActive] = useState(user.is_active); + const [groupIds, setGroupIds] = useState(user.groups.map((g) => g.id)); + const [error, setError] = useState(null); + + const invalidate = () => + Promise.all([ + qc.invalidateQueries({ queryKey: adminKeys.users }), + qc.invalidateQueries({ queryKey: adminKeys.groups }), + ]); + + const updateMeta = useMutation({ + mutationFn: () => + apiPatch(`/users/${user.id}`, { + display_name: displayName.trim() || null, + locale, + is_active: isActive, + }), + onSuccess: invalidate, + }); + + const updateGroups = useMutation({ + mutationFn: () => apiPut(`/users/${user.id}/groups`, { group_ids: groupIds }), + onSuccess: invalidate, + }); + + const softDelete = useMutation({ + mutationFn: () => apiDelete(`/users/${user.id}`), + onSuccess: () => invalidate().then(onClose), + }); + + async function handleSave() { + setError(null); + try { + await updateMeta.mutateAsync(); + await updateGroups.mutateAsync(); + onClose(); + } catch (e) { + if (e instanceof ApiError) { + const p = e.payload as { error?: string; message?: string } | null; + setError(p?.message ?? p?.error ?? `HTTP ${e.status}`); + } else { + setError(e instanceof Error ? e.message : 'Save failed'); + } + } + } + + async function handleDelete() { + setError(null); + if (!confirm(`Soft-delete user ${user.email}? They will be deactivated and hidden.`)) return; + try { + await softDelete.mutateAsync(); + } catch (e) { + if (e instanceof ApiError) { + const p = e.payload as { error?: string; message?: string } | null; + setError(p?.message ?? p?.error ?? `HTTP ${e.status}`); + } else { + setError(e instanceof Error ? e.message : 'Delete failed'); + } + } + } + + return ( + +
+ setDisplayName(e.target.value)} + /> + setLocale(e.target.value)} + hint="ISO-639-1 (fr or en)" + /> + + +
+

+ Groups +

+
+ {allGroups.map((g) => { + const checked = groupIds.includes(g.id); + return ( + + ); + })} +
+
+ + {error && {error}} + +
+ +
+ + +
+
+
+
+ ); +} diff --git a/tasks/testing-m3.md b/tasks/testing-m3.md new file mode 100644 index 0000000..5f89e1d --- /dev/null +++ b/tasks/testing-m3.md @@ -0,0 +1,102 @@ +--- +type: testing +milestone: M3 +date: "2026-05-11" +project: Metamorph +--- + +# Testing M3 — RBAC, groups, users, invitations + +## 1. Lancement de la stack + +```bash +make clean # reset complet si une stack tournait déjà +make up # build + start db/api/front +make migrate # applique le schéma (head) +``` + +DoD réussie quand : +- `http://localhost:8080` répond et la home indique « M3 milestone (RBAC) » +- `make logs-api | grep INSTALL` montre le bandeau du token d'install (1ʳᵉ fois) +- `make logs-api | grep metamorph.permissions.seeded` confirme `perms_total: 31` au boot + +## 2. Tests automatisés + +```bash +make test-api # 39 tests pytest, dont 15 nouveaux sur RBAC +make e2e # 28 tests Playwright, dont 8 M3 +make e2e-report # ouvre le rapport HTML +``` + +Le rapport JUnit est dans `e2e/playwright-report/junit.xml`. + +## 3. Procédure manuelle (smoke navigateur) + +### Pré-requis +1. Stack up via `make up`. +2. `make print-install-token` → noter le token (ou `make logs-api` pour le bandeau). + +### 3.1 Bootstrap admin +1. Visiter `http://localhost:8080/setup` → coller le token, créer `admin@metamorph.local` / `AdminPass1234!`. +2. Redirection auto vers `/login` après ~1.5 s. +3. Se connecter avec les mêmes identifiants. +4. La barre de nav fait apparaître les liens **Users / Groups / Invitations** (visibles uniquement quand `is_admin === true`). + +### 3.2 Page Groups (`/admin/groups`) +1. Cliquer **+ New group** → modale `new group`. +2. Nom : `pentest-2026-Q2`, description libre. +3. Cocher uniquement `mission.read` et `mission.write_red_fields` dans la grille de permissions. +4. **Create group** → la carte apparaît dans la liste avec les badges `2 perms`. +5. Ouvrir le groupe `admin` (système) — le champ `Name` doit être désactivé et la mention « System groups cannot be renamed. » visible. +6. Tenter de supprimer le groupe `admin` → bouton **Delete** invisible (les groupes système sont protégés côté UI **et** serveur). + +### 3.3 Page Invitations (`/admin/invitations`) +1. Cliquer **+ New invitation** → modale `new invitation`. +2. Email hint : `alice@metamorph.local`. Cocher `pentest-2026-Q2`. +3. **Generate link** → modale `invitation link` montrant l'URL one-shot ; bouton **Copy** disponible. +4. Coller l'URL dans un onglet privé → page `/register` pré-remplie avec l'email hint. Compléter le mot de passe, valider. + +### 3.4 Page Users (`/admin/users`) +1. Liste les comptes existants. Chercher `alice` dans le champ **Search**. +2. Cliquer **Edit** sur Alice → modale `alice@metamorph.local`. Vérifier ses groupes (cochés : `pentest-2026-Q2`). +3. Décocher `pentest-2026-Q2`, cocher `redteam`. **Save changes**. +4. Recharger la page → Alice a maintenant le badge `redteam`. + +### 3.5 Last-admin protection +1. Tenter de soft-delete l'admin courant via l'API : + ```bash + ACCESS=$(curl -s -X POST http://localhost:8080/api/v1/auth/login \ + -H 'Content-Type: application/json' \ + -d '{"email":"admin@metamorph.local","password":"AdminPass1234!"}' | jq -r .access_token) + MY_ID=$(curl -s http://localhost:8080/api/v1/auth/me -H "Authorization: Bearer $ACCESS" | jq -r .id) + curl -i -X DELETE http://localhost:8080/api/v1/users/$MY_ID -H "Authorization: Bearer $ACCESS" + ``` + Réponse : **409** `{"error":"last_admin_protected"}`. + +### 3.6 Vérification RBAC côté serveur (non-admin) +1. Se connecter en tant qu'Alice (membre uniquement de `redteam` après l'étape 3.4) : + ```bash + ACCESS=$(curl -s -X POST http://localhost:8080/api/v1/auth/login \ + -H 'Content-Type: application/json' \ + -d '{"email":"alice@metamorph.local","password":""}' | jq -r .access_token) + ``` +2. `/users` → **403** : `redteam` n'a pas `user.read`. +3. `POST /groups` → **403** : `redteam` n'a pas `group.create`. +4. Naviguer dans le browser sur `/admin/users` → redirige vers `/` (RequireAdmin client-side). + +### 3.7 Catalogue permissions +```bash +curl -s http://localhost:8080/api/v1/permissions -H "Authorization: Bearer $ACCESS_ADMIN" | jq '.items | length' +``` +Doit retourner `31` (cf. `app/services/permissions_seed.py:PERMISSION_CATALOGUE`). + +## 4. Points de contrôle critiques + +- [x] Les 3 groupes système (`admin`, `redteam`, `blueteam`) existent avec `is_system=true` et leurs perms bind par défaut. +- [x] Le seed est idempotent : booter, `make migrate`, rebooter → toujours `perms_total: 31`, `perms_created: 0`. +- [x] Un non-admin reçoit 403 sur tout endpoint protégé par `@require_perm` qu'il ne couvre pas. +- [x] L'admin bypass est effectif (`is_admin` → toujours OK même sans perm explicite). +- [x] Les noms et perms des groupes système ne sont pas modifiables côté UI (champs disabled, bouton Delete masqué). +- [x] Le dernier admin ne peut pas être désactivé / supprimé / retiré du groupe `admin`. +- [x] `/admin/users`, `/admin/groups`, `/admin/invitations` masqués pour les non-admin (nav + RequireAdmin). +- [x] `/diag/reset` réinitialise aussi les compteurs rate-limit (utile entre tests Playwright).