fix(backend): freeze F11 matrix inline in the initial migration (MA3)

Code-review MAJOR MA3. The initial Alembic migration imported the live
`mimic.rbac.matrix.GROUP_PERMISSIONS` to seed the `permission` / `group` /
`group_permission` rows. That breaks the Alembic invariant "a migration
produces the same schema regardless of when you replay it": a future tweak
to the runtime matrix would silently change the seeded baseline on a fresh
DB.

Two changes:

1. The migration now carries an *inline frozen snapshot* of the F11 matrix
   (`_PERMISSIONS_FROZEN`, `_GROUP_PERMISSIONS_FROZEN`, `_GROUP_DESCRIPTIONS`).
   The seed reads from these tuples/dicts only. If the canonical matrix
   evolves, the next migration is responsible for the delta.

2. A new unit test `test_migration_seed_matches_current_matrix` enforces
   that the frozen seed equals the runtime `Permission` enum and
   `GROUP_PERMISSIONS` mapping. Drift now fails CI loudly with a hint to
   write a new migration instead of editing the existing one.

Also: docstring no longer mentions `ttp_version` (M8 follow-up).
This commit is contained in:
knacky
2026-05-22 05:24:37 +02:00
parent feadad850b
commit 36c1ed5ffb
2 changed files with 116 additions and 28 deletions

View File

@@ -1,8 +1,9 @@
"""initial schema (sprint 0, §8 spec)
Creates every aggregate listed in spec §8: engagement, user/group RBAC,
host, ttp/ttp_version, scenario/scenario_step, run/run_step/run_step_cleanup,
detection, evidence, report, soc_session, c2_credential, audit_log.
host, ttp (no ttp_version — H32/D-009), scenario/scenario_step,
run/run_step/run_step_cleanup, detection, evidence, report, soc_session,
c2_credential, audit_log.
Postgres-only objects:
- ENUM types created via SQLAlchemy `Enum(..., create_type=True)`.
@@ -592,21 +593,82 @@ def upgrade() -> None:
# ---------------------------------------------------------------- seed RBAC
# D-008: exactly the 3 F11 groups, with exactly the F11 permission matrix.
# The matrix is the authoritative source — see mimic.rbac.matrix.
# MA3: the matrix is frozen *inline* here so this migration produces the
# same schema regardless of future drifts in `mimic.rbac.matrix`. A unit
# test (`test_migration_seed_matches_current_matrix`) guards the
# invariant — if the runtime matrix diverges, write a new migration.
_seed_rbac()
# --- Frozen F11 matrix snapshot (MA3, do not edit -- write a new migration).
_GROUP_DESCRIPTIONS: dict[str, str] = {
"rt_operator": "Red team operator (per-engagement scope).",
"rt_lead": "Red team lead (full RT privileges).",
"soc_analyst": "SOC analyst (scoped via soc_session).",
}
# Frozen permission codes (D-008 canonical list).
_PERMISSIONS_FROZEN: tuple[str, ...] = (
"engagement.create",
"engagement.read",
"engagement.read_own",
"engagement.update",
"engagement.delete",
"engagement.member.manage",
"engagement.soc_token.issue",
"host.crud",
"ttp.read",
"ttp.draft",
"ttp.promote",
"import.journal",
"scenario.crud",
"run.start",
"run.control",
"evidence.add",
"detection.add",
"cleanup.trigger",
"report.generate",
"report.read",
"audit.read",
)
# Frozen group → permissions assignments (D-008 / F11).
_GROUP_PERMISSIONS_FROZEN: dict[str, frozenset[str]] = {
"rt_operator": frozenset(
{
"engagement.create",
"engagement.read",
"host.crud",
"ttp.read",
"ttp.draft",
"import.journal",
"scenario.crud",
"evidence.add",
"cleanup.trigger",
"report.read",
}
),
"rt_lead": frozenset(_PERMISSIONS_FROZEN),
"soc_analyst": frozenset(
{
"engagement.read_own",
"detection.add",
"report.read",
}
),
}
def _seed_rbac() -> None:
"""Seed `permission` + `group` + `group_permission` from F11 (D-008)."""
from uuid import NAMESPACE_DNS, uuid5 # noqa: PLC0415 (avoid pulling at migration import)
"""Seed `permission` + `group` + `group_permission` from the frozen
inline F11 matrix above (D-008 / MA3)."""
from uuid import NAMESPACE_DNS, uuid5 # noqa: PLC0415
from mimic.rbac.matrix import GROUP_PERMISSIONS, GroupName, Permission # noqa: PLC0415
def _gid(name: str) -> str:
return str(uuid5(NAMESPACE_DNS, f"mimic.group.{name}"))
def _gid(name: GroupName) -> str:
return str(uuid5(NAMESPACE_DNS, f"mimic.group.{name.value}"))
def _pid(code: Permission) -> str:
return str(uuid5(NAMESPACE_DNS, f"mimic.permission.{code.value}"))
def _pid(code: str) -> str:
return str(uuid5(NAMESPACE_DNS, f"mimic.permission.{code}"))
permission_table = sa.table(
"permission",
@@ -616,7 +678,7 @@ def _seed_rbac() -> None:
)
op.bulk_insert(
permission_table,
[{"id": _pid(p), "code": p.value, "description": None} for p in Permission],
[{"id": _pid(code), "code": code, "description": None} for code in _PERMISSIONS_FROZEN],
)
group_table = sa.table(
@@ -628,21 +690,8 @@ def _seed_rbac() -> None:
op.bulk_insert(
group_table,
[
{
"id": _gid(GroupName.RT_OPERATOR),
"name": GroupName.RT_OPERATOR.value,
"description": "Red team operator (per-engagement scope).",
},
{
"id": _gid(GroupName.RT_LEAD),
"name": GroupName.RT_LEAD.value,
"description": "Red team lead (full RT privileges).",
},
{
"id": _gid(GroupName.SOC_ANALYST),
"name": GroupName.SOC_ANALYST.value,
"description": "SOC analyst (scoped via soc_session).",
},
{"id": _gid(name), "name": name, "description": _GROUP_DESCRIPTIONS[name]}
for name in _GROUP_PERMISSIONS_FROZEN
],
)
@@ -652,7 +701,7 @@ def _seed_rbac() -> None:
sa.column("permission_id", UUID(as_uuid=True)),
)
rows: list[dict[str, str]] = []
for group_name, perms in GROUP_PERMISSIONS.items():
for group_name, perms in _GROUP_PERMISSIONS_FROZEN.items():
for perm in perms:
rows.append({"group_id": _gid(group_name), "permission_id": _pid(perm)})
op.bulk_insert(group_permission_table, rows)

View File

@@ -0,0 +1,39 @@
"""MA3: the frozen RBAC seed in the initial migration must keep matching
the runtime F11 matrix in `mimic.rbac.matrix`. When they drift, *do not* edit
the migration in place — write a new migration. This test enforces it.
"""
from __future__ import annotations
import importlib
from mimic.rbac.matrix import GROUP_PERMISSIONS, GroupName, Permission
def _load_migration():
return importlib.import_module("mimic.db.migrations.versions.202605210001_initial_schema")
def test_frozen_permission_list_matches_runtime() -> None:
migration = _load_migration()
runtime_codes = {p.value for p in Permission}
frozen_codes = set(migration._PERMISSIONS_FROZEN)
assert runtime_codes == frozen_codes, (
"Permission enum drifted from the migration freeze; "
"write a new migration, do not edit the existing one."
)
def test_frozen_group_membership_matches_runtime() -> None:
migration = _load_migration()
runtime = {gn.value: {p.value for p in perms} for gn, perms in GROUP_PERMISSIONS.items()}
frozen = {gn: set(perms) for gn, perms in migration._GROUP_PERMISSIONS_FROZEN.items()}
assert runtime == frozen, (
"GROUP_PERMISSIONS drifted from the migration freeze; "
"write a new migration, do not edit the existing one."
)
def test_frozen_group_names_match_enum() -> None:
migration = _load_migration()
assert set(migration._GROUP_PERMISSIONS_FROZEN.keys()) == {g.value for g in GroupName}