Files
Metamorph/backend/app/services/groups.py

211 lines
7.1 KiB
Python
Raw Permalink Normal View History

feat(m3): RBAC — atomic perms, groups, users, admin SPA pages Permission catalogue (services/permissions_seed.py) - 31 atomic codes across 10 families: user.*, group.*, invitation.*, test_template.*, scenario_template.*, mission.* (incl. mission.write_red_fields + mission.write_blue_fields), detection_level.{read,update}, setting.{read,update}, mitre.sync. - Default bindings: admin = all 31; redteam = 8 (catalogue read + mission. {read,create,update,archive,write_red_fields} + detection_level.read); blueteam = 5 (catalogue read + mission.{read,write_blue_fields} + detection_level.read). - Seed runs at boot AND after /setup so a freshly truncated DB (via /diag/reset) gets the bindings back via the bootstrap path. Idempotent + additive (never removes a perm from a system group). Users admin (services/users.py + api/users.py) - list (q + is_active filter + pagination), get, patch (display_name / locale / is_active with tri-state sentinel for clear-vs-unset), soft-delete, set groups. - Last-admin protection on update (deactivate), delete, and group-strip (refusing to remove the admin group from the last active admin). Groups admin (services/groups.py + api/groups.py) - Full CRUD with system-group protection (no rename, no delete on admin/redteam/blueteam). - PUT /groups/{id}/permissions sets the perm list. - Admin system group's perm set is locked to the full catalogue (SystemGroupProtected → 409) — preserves the bypass invariant even if a future refactor moves to perm-based checks. Permissions read-only (api/permissions.py) - GET /permissions returns the catalogue (admin or group.read holders). /diag/reset extension - After truncate + token mint, the limiter is also reset (limiter.reset()) so the Playwright suite doesn't hit 10/min budgets across spec files. Guarded by limiter.enabled to no-op in APP_ENV=test. Rate-limit scope (core/rate_limit.py) - enabled = APP_ENV in ("prod", "staging"). A staging deployment serves humans, so it gets the limits too. 
Dev/test stay unthrottled for Playwright ergonomics. Spec §6 NF-security is an operator-facing requirement. Frontend chrome - components/RequireAdmin.tsx + ui/Modal.tsx (reusable centered dialog with accessible name + Escape + backdrop-click). - Layout.tsx shows Admin nav links only when is_admin === true. Server remains the arbiter — non-admins hitting /admin/* get redirected to /. Frontend pages - pages/AdminUsersPage.tsx, AdminGroupsPage.tsx, AdminInvitationsPage.tsx with edit modals using TanStack Query mutations + multi-select for perms grouped by family + copy-once invitation URL display. - lib/admin.ts: shared types + query keys + groupPermsByFamily helper. - lib/api.ts: apiPatch / apiPut / apiDelete added. Playwright config (e2e/playwright.config.ts) - workers: 1 + fullyParallel: false: spec files share the live Postgres, so concurrent /diag/reset calls clobber each other. Intra-file order preserved via test.describe.configure({ mode: 'serial' }). Testing - backend/tests/test_rbac.py: 15 integration tests (39 backend total — 1 health + 8 schema + 15 auth + 15 RBAC). - e2e/tests/m3-rbac.spec.ts: 8 Playwright tests covering DoD §10 #2/#3 (28 e2e total — 8 M0 + 4 M1 + 8 M2 + 8 M3). - tasks/testing-m3.md. DoD: make test-api → 39 passed, make e2e → 28 passed. Spec-reviewer pass applied (admin perm invariant + staging rate-limit scope). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 06:17:07 +02:00
"""Admin-side group management: CRUD + permission bindings.
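System groups (`is_system=True`: admin, redteam, blueteam) cannot be renamed
or deleted, but their permission bindings are seeded on boot and editable
afterwards (e.g. an admin can broaden `redteam`). The exception is the
`admin` system group, whose permission set must always equal the full
permission catalogue.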
"""
from __future__ import annotations
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from sqlalchemy import func, select
from app.db.session import session_scope
from app.models.auth import Group, GroupPermission, Permission, UserGroup
from app.services.bootstrap import ADMIN_GROUP_NAME
class GroupNotFound(Exception):
    """Raised when a group id matches no row or matches a soft-deleted group."""
class GroupNameConflict(Exception):
    """Raised when another non-deleted group already uses the requested name."""
class SystemGroupProtected(Exception):
    """Raised when an operation would break a system-group guarantee.

    Covers renaming or deleting a built-in system group, and setting the
    `admin` system group's permissions to anything but the full catalogue.
    """
@dataclass(frozen=True)
class GroupView:
id: uuid.UUID
name: str
description: str | None
is_system: bool
deleted_at: datetime | None
members_count: int
permissions: list[str]
created_at: datetime
updated_at: datetime
def _to_view(g: Group, members_count: int) -> GroupView:
    """Build a ``GroupView`` from a ``Group`` row and a precomputed member count.

    The group's permissions are reduced to their codes and sorted so the
    view carries plain strings in a stable order.
    """
    permission_codes = [perm.code for perm in g.permissions]
    permission_codes.sort()
    return GroupView(
        id=g.id,
        name=g.name,
        description=g.description,
        is_system=g.is_system,
        deleted_at=g.deleted_at,
        created_at=g.created_at,
        updated_at=g.updated_at,
        members_count=members_count,
        permissions=permission_codes,
    )
def _members_counts(s, group_ids: list[uuid.UUID]) -> dict[uuid.UUID, int]:
    """Count, per group id, the memberships whose user is not soft-deleted.

    Only groups with at least one such membership appear in the result, so
    callers read it with ``.get(group_id, 0)``. An empty ``group_ids`` returns
    an empty dict without running a query.
    """
    if not group_ids:
        return {}

    from app.models.auth import User as _U  # local to avoid model cycles

    per_group_total = func.count(UserGroup.user_id)
    stmt = (
        select(UserGroup.group_id, per_group_total)
        .join(_U, _U.id == UserGroup.user_id)
        .where(UserGroup.group_id.in_(group_ids), _U.deleted_at.is_(None))
        .group_by(UserGroup.group_id)
    )

    totals: dict[uuid.UUID, int] = {}
    for group_id, total in s.execute(stmt).all():
        totals[group_id] = int(total)
    return totals
def list_groups(*, include_deleted: bool = False) -> list[GroupView]:
    """Return all groups as views, system groups first, then by ascending name.

    Soft-deleted groups are excluded unless ``include_deleted`` is true.
    """
    with session_scope() as s:
        query = select(Group)
        if not include_deleted:
            query = query.where(Group.deleted_at.is_(None))
        query = query.order_by(Group.is_system.desc(), Group.name.asc())

        groups = s.scalars(query).all()
        member_counts = _members_counts(s, [grp.id for grp in groups])

        views = []
        for grp in groups:
            views.append(_to_view(grp, member_counts.get(grp.id, 0)))
        return views
def get_group(group_id: uuid.UUID) -> GroupView:
    """Return the view of one group looked up by primary key.

    Raises:
        GroupNotFound: if no row has that id or the group is soft-deleted.
    """
    with session_scope() as s:
        group = s.get(Group, group_id)
        unavailable = group is None or group.deleted_at is not None
        if unavailable:
            raise GroupNotFound()
        member_counts = _members_counts(s, [group.id])
        return _to_view(group, member_counts.get(group.id, 0))
def create_group(*, name: str, description: str | None) -> GroupView:
    """Create a new non-system group and return its view.

    The name is stripped of surrounding whitespace before use. The
    description is stripped too, and stored as ``None`` when missing or blank.

    Raises:
        ValueError: if the name is empty after stripping.
        GroupNameConflict: if a non-deleted group already has that name.
    """
    name_norm = name.strip()
    if name_norm == "":
        raise ValueError("name is required")

    with session_scope() as s:
        same_name = select(Group).where(
            Group.name == name_norm,
            Group.deleted_at.is_(None),
        )
        if s.scalar(same_name) is not None:
            raise GroupNameConflict(f"group name {name_norm!r} already in use")

        description_norm = (description or "").strip() or None
        group = Group(name=name_norm, description=description_norm, is_system=False)
        s.add(group)
        s.flush()  # flush before building the view, as the original does
        return _to_view(group, 0)
def update_group(
    group_id: uuid.UUID,
    *,
    name: str | None = None,
    description: str | None | object = ...,
) -> GroupView:
    """Update a live group's name and/or description and return its view.

    ``name=None`` leaves the name alone. ``description`` is tri-state: the
    default ``...`` sentinel leaves it alone, ``None`` or ``""`` clears it, and
    any other value is stripped and stored (a blank result is stored as
    ``None``).

    Raises:
        GroupNotFound: if no row has that id or the group is soft-deleted.
        ValueError: if ``name`` is given but empty after stripping.
        SystemGroupProtected: if the change would rename a system group.
        GroupNameConflict: if another non-deleted group has the new name.
    """
    with session_scope() as s:
        group = s.get(Group, group_id)
        if group is None or group.deleted_at is not None:
            raise GroupNotFound()

        if name is not None:
            new_name = name.strip()
            if not new_name:
                raise ValueError("name cannot be empty")

            renaming = new_name != group.name
            if renaming and group.is_system:
                raise SystemGroupProtected("system groups cannot be renamed")
            if renaming:
                # Uniqueness is checked among live groups other than this one.
                taken = s.scalar(
                    select(Group).where(
                        Group.name == new_name,
                        Group.deleted_at.is_(None),
                        Group.id != group.id,
                    )
                )
                if taken is not None:
                    raise GroupNameConflict(f"group name {new_name!r} already in use")
                group.name = new_name

        if description is not ...:
            clearing = description in (None, "")
            group.description = None if clearing else (description.strip() or None)

        member_counts = _members_counts(s, [group.id])
        return _to_view(group, member_counts.get(group.id, 0))
def soft_delete_group(group_id: uuid.UUID) -> None:
    """Soft-delete a group by stamping ``deleted_at`` with the current UTC time.

    Raises:
        GroupNotFound: if no row has that id or the group is already
            soft-deleted.
        SystemGroupProtected: if the group is a system group.
    """
    with session_scope() as s:
        group = s.get(Group, group_id)
        already_gone = group is None or group.deleted_at is not None
        if already_gone:
            raise GroupNotFound()
        if group.is_system:
            raise SystemGroupProtected("system groups cannot be deleted")
        deleted_on = datetime.now(timezone.utc)
        group.deleted_at = deleted_on
def set_group_permissions(group_id: uuid.UUID, codes: list[str]) -> GroupView:
    """Make the group's permission bindings match ``codes`` exactly.

    Duplicate codes are collapsed. Bindings absent from ``codes`` are deleted
    and missing ones are added; bindings already in place are left untouched.

    Raises:
        GroupNotFound: if no row has that id or the group is soft-deleted.
        SystemGroupProtected: if the target is the `admin` system group and
            ``codes`` is not exactly the full permission catalogue.
        ValueError: if any code is not in the permission catalogue.
    """
    wanted = set(codes)
    with session_scope() as s:
        group = s.get(Group, group_id)
        if group is None or group.deleted_at is not None:
            raise GroupNotFound()

        # Keep the invariant "the system `admin` group holds every permission".
        # The decorator's admin bypass keys off `is_admin` (group membership),
        # not the permission set, so a stripped admin group would still grant
        # access. The listing would be misleading, though, and a later refactor
        # could reasonably move the bypass to a permission-based check.
        if group.is_system and group.name == ADMIN_GROUP_NAME:
            catalogue = {perm.code for perm in s.scalars(select(Permission)).all()}
            if wanted != catalogue:
                raise SystemGroupProtected(
                    "the admin group must keep every permission"
                )

        # Resolve requested codes to Permission rows, rejecting unknown ones.
        requested = []
        if wanted:
            requested = s.scalars(
                select(Permission).where(Permission.code.in_(wanted))
            ).all()
            unknown = wanted - {perm.code for perm in requested}
            if unknown:
                raise ValueError(f"unknown permission codes: {sorted(unknown)}")

        # Snapshot the current bindings, then compute both sides of the diff
        # before touching the session.
        bound = {perm.code: perm for perm in group.permissions}
        obsolete = set(bound) - wanted
        missing = wanted - set(bound)

        for code in obsolete:
            link = s.get(GroupPermission, (group.id, bound[code].id))
            if link is not None:
                s.delete(link)

        for perm in requested:
            if perm.code in missing:
                s.add(GroupPermission(group_id=group.id, permission_id=perm.id))

        # Flush the changes and refresh the group before building the view.
        s.flush()
        s.refresh(group)
        member_counts = _members_counts(s, [group.id])
        return _to_view(group, member_counts.get(group.id, 0))
def list_permissions() -> list[dict]:
    """Return every permission as a plain dict, ordered by ascending code.

    Each entry has the keys ``id`` (stringified), ``code`` and ``description``.
    """
    with session_scope() as s:
        by_code = select(Permission).order_by(Permission.code.asc())
        catalogue = []
        for perm in s.scalars(by_code).all():
            entry = {
                "id": str(perm.id),
                "code": perm.code,
                "description": perm.description,
            }
            catalogue.append(entry)
        return catalogue