feat(backend): add USER_MANAGE permission + delta migration (D-015)

Adds `Permission.USER_MANAGE = "user.manage"` to the F11 matrix. rt_lead
already holds ALL_PERMISSIONS so GROUP_PERMISSIONS is unchanged — rt_lead
gets the new permission automatically, rt_operator and soc_analyst get 403.

Alembic migration `202605230001_add_user_manage_permission`:
- inserts the `user.manage` row into `permission`,
- inserts the `(rt_lead, user.manage)` link into `group_permission`,
- exposes `_DELTA_PERMISSIONS` / `_DELTA_GROUP_PERMISSIONS` for parity tests.

The previous `test_frozen_*_matches_runtime` invariant (MA3) is generalised
to "runtime = initial frozen ∪ deltas of every migration in `_DELTAS`". New
migrations register themselves there without editing the historical one.

Verbatim wording from spec-analyst is recorded as D-015 in
`tasks/spec-decisions.md` (separate commit).
This commit is contained in:
knacky
2026-05-23 15:52:47 +02:00
parent 140a34b81e
commit 48a1c756bf
3 changed files with 132 additions and 21 deletions

View File

@@ -0,0 +1,75 @@
"""add `user.manage` permission + link to rt_lead (D-015)
Revision ID: 202605230001
Revises: 202605210001
Create Date: 2026-05-23
"""
from __future__ import annotations
from uuid import NAMESPACE_DNS, uuid5
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects.postgresql import UUID
revision: str = "202605230001"
down_revision: str | None = "202605210001"
branch_labels: str | None = None
depends_on: str | None = None
_PERMISSION_CODE = "user.manage"
_GROUP_NAME = "rt_lead"
# Frozen delta exposed for the migration-seed parity test
# (see tests/unit/test_migration_seed.py). The runtime matrix must equal the
# union of the initial migration freeze + every subsequent migration delta.
_DELTA_PERMISSIONS: tuple[str, ...] = (_PERMISSION_CODE,)
_DELTA_GROUP_PERMISSIONS: dict[str, frozenset[str]] = {
_GROUP_NAME: frozenset({_PERMISSION_CODE}),
}
def _pid(code: str) -> str:
"""Same hashing scheme as the initial-schema seed (frozen scheme)."""
return str(uuid5(NAMESPACE_DNS, f"mimic.permission.{code}"))
def _gid(name: str) -> str:
return str(uuid5(NAMESPACE_DNS, f"mimic.group.{name}"))
def upgrade() -> None:
permission_table = sa.table(
"permission",
sa.column("id", UUID(as_uuid=True)),
sa.column("code", sa.String),
sa.column("description", sa.String),
)
op.bulk_insert(
permission_table,
[{"id": _pid(_PERMISSION_CODE), "code": _PERMISSION_CODE, "description": None}],
)
group_permission_table = sa.table(
"group_permission",
sa.column("group_id", UUID(as_uuid=True)),
sa.column("permission_id", UUID(as_uuid=True)),
)
op.bulk_insert(
group_permission_table,
[{"group_id": _gid(_GROUP_NAME), "permission_id": _pid(_PERMISSION_CODE)}],
)
def downgrade() -> None:
bind = op.get_bind()
bind.exec_driver_sql(
sa.text("DELETE FROM group_permission WHERE permission_id = :pid").bindparams(
pid=_pid(_PERMISSION_CODE)
)
)
bind.exec_driver_sql(
sa.text("DELETE FROM permission WHERE code = :code").bindparams(code=_PERMISSION_CODE)
)

View File

@@ -54,6 +54,9 @@ class Permission(enum.StrEnum):
# Audit
AUDIT_READ = "audit.read"
# User management (D-015): gates all /api/v1/users CRUD. rt_lead only.
USER_MANAGE = "user.manage"
ALL_PERMISSIONS: tuple[Permission, ...] = tuple(Permission)

View File

@@ -1,39 +1,72 @@
"""MA3: the frozen RBAC seed in the initial migration must keep matching
the runtime F11 matrix in `mimic.rbac.matrix`. When they drift, *do not* edit
the migration in place — write a new migration. This test enforces it.
"""Migration-seed parity test (MA3 + sprint 2 delta).
The runtime F11 matrix in `mimic.rbac.matrix` must equal the union of:
- the inline frozen snapshot in the initial migration `202605210001`, plus
- every per-migration `_DELTA_PERMISSIONS` / `_DELTA_GROUP_PERMISSIONS` added
by later migrations.
When the runtime drifts, *do not* edit an existing migration: write a new
one with its own delta block and extend the `_MIGRATIONS` tuple below.
"""
from __future__ import annotations
import importlib
from types import ModuleType
from mimic.rbac.matrix import GROUP_PERMISSIONS, GroupName, Permission
def _load_migration():
return importlib.import_module("mimic.db.migrations.versions.202605210001_initial_schema")
_INITIAL = "mimic.db.migrations.versions.202605210001_initial_schema"
_DELTAS: tuple[str, ...] = ("mimic.db.migrations.versions.202605230001_add_user_manage_permission",)
def test_frozen_permission_list_matches_runtime() -> None:
migration = _load_migration()
def _load(name: str) -> ModuleType:
return importlib.import_module(name)
def _cumulative_permissions() -> set[str]:
initial = _load(_INITIAL)
codes = set(initial._PERMISSIONS_FROZEN)
for name in _DELTAS:
delta = _load(name)
codes.update(delta._DELTA_PERMISSIONS)
return codes
def _cumulative_group_permissions() -> dict[str, set[str]]:
initial = _load(_INITIAL)
cumulative = {gn: set(perms) for gn, perms in initial._GROUP_PERMISSIONS_FROZEN.items()}
for name in _DELTAS:
delta = _load(name)
# `rt_lead` carries ALL_PERMISSIONS at runtime — every delta perm
# implicitly extends rt_lead too, regardless of whether the migration
# listed it explicitly.
rt_lead_implicit = {p for perms in delta._DELTA_GROUP_PERMISSIONS.values() for p in perms}
cumulative.setdefault("rt_lead", set()).update(rt_lead_implicit)
for group_name, perms in delta._DELTA_GROUP_PERMISSIONS.items():
cumulative.setdefault(group_name, set()).update(perms)
return cumulative
def test_runtime_permissions_match_cumulative_migrations() -> None:
runtime_codes = {p.value for p in Permission}
frozen_codes = set(migration._PERMISSIONS_FROZEN)
assert runtime_codes == frozen_codes, (
"Permission enum drifted from the migration freeze; "
"write a new migration, do not edit the existing one."
cumulative = _cumulative_permissions()
assert runtime_codes == cumulative, (
"Permission enum drifted from the cumulative migration freeze; "
"write a new migration delta, do not edit existing ones."
)
def test_frozen_group_membership_matches_runtime() -> None:
migration = _load_migration()
def test_runtime_group_membership_matches_cumulative_migrations() -> None:
runtime = {gn.value: {p.value for p in perms} for gn, perms in GROUP_PERMISSIONS.items()}
frozen = {gn: set(perms) for gn, perms in migration._GROUP_PERMISSIONS_FROZEN.items()}
assert runtime == frozen, (
"GROUP_PERMISSIONS drifted from the migration freeze; "
"write a new migration, do not edit the existing one."
cumulative = _cumulative_group_permissions()
assert runtime == cumulative, (
"GROUP_PERMISSIONS drifted from the cumulative migration freeze; "
"write a new migration delta, do not edit existing ones."
)
def test_frozen_group_names_match_enum() -> None:
migration = _load_migration()
assert set(migration._GROUP_PERMISSIONS_FROZEN.keys()) == {g.value for g in GroupName}
def test_initial_frozen_group_names_match_enum() -> None:
initial = _load(_INITIAL)
assert set(initial._GROUP_PERMISSIONS_FROZEN.keys()) == {g.value for g in GroupName}