fix(backend): align with D-008/D-009 (drop ttp_version, seed F11 matrix)

D-009 reaffirms spec H32: no `ttp_version` table. Replayability lives solely
on `run.snapshot_json`. The previous initial migration introduced a separate
`ttp_version` aggregate by mistake — removed here.

D-008 requires the bootstrap to seed exactly the three F11 groups
(`rt_operator`, `rt_lead`, `soc_analyst`) with exactly the F11 permission
matrix. The migration now:
- inserts every `Permission` enum value into the `permission` table,
- inserts the three groups with deterministic uuid5(NAMESPACE_DNS, ...) ids,
- inserts the matching `group_permission` rows from GROUP_PERMISSIONS.

Also renames `ttp.current_version` to `ttp.version` (matches §8 spec column
name; the value remains informational per H32 / D-009).
This commit is contained in:
knacky
2026-05-21 20:44:37 +02:00
parent 887182cfd7
commit d470db97d9
3 changed files with 65 additions and 88 deletions

View File

@@ -265,29 +265,13 @@ def upgrade() -> None:
sa.Column("is_stealth_variant", sa.Boolean, nullable=False, server_default=sa.false()), sa.Column("is_stealth_variant", sa.Boolean, nullable=False, server_default=sa.false()),
sa.Column("source", TTP_SOURCE, nullable=False, server_default="custom"), sa.Column("source", TTP_SOURCE, nullable=False, server_default="custom"),
sa.Column("tags", JSONB, nullable=False, server_default="[]"), sa.Column("tags", JSONB, nullable=False, server_default="[]"),
sa.Column("current_version", sa.Integer, nullable=False, server_default="1"), sa.Column("version", sa.Integer, nullable=False, server_default="1"),
sa.Column("is_published", sa.Boolean, nullable=False, server_default=sa.false()), sa.Column("is_published", sa.Boolean, nullable=False, server_default=sa.false()),
sa.Column("created_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")), sa.Column("created_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
) )
# No `ttp_version` table — H32 / D-009: snapshot lives on run.snapshot_json.
op.create_table(
"ttp_version",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column(
"ttp_id",
UUID(as_uuid=True),
sa.ForeignKey("ttp.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("version", sa.Integer, nullable=False),
sa.Column("snapshot_json", JSONB, nullable=False),
sa.Column("content_sha256", sa.String(64), nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("created_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")),
sa.UniqueConstraint("ttp_id", "version", name="uq_ttp_version_ttp_id_version"),
)
# -------------------------------------------------------------- scenario # -------------------------------------------------------------- scenario
op.create_table( op.create_table(
@@ -538,34 +522,72 @@ def upgrade() -> None:
) )
# ---------------------------------------------------------------- seed RBAC # ---------------------------------------------------------------- seed RBAC
# D-008: exactly the 3 F11 groups, with exactly the F11 permission matrix.
# The matrix is the authoritative source — see mimic.rbac.matrix.
_seed_rbac()
def _seed_rbac() -> None:
"""Seed `permission` + `group` + `group_permission` from F11 (D-008)."""
from uuid import NAMESPACE_DNS, uuid5 # noqa: PLC0415 (avoid pulling at migration import)
from mimic.rbac.matrix import GROUP_PERMISSIONS, GroupName, Permission # noqa: PLC0415
def _gid(name: GroupName) -> str:
return str(uuid5(NAMESPACE_DNS, f"mimic.group.{name.value}"))
def _pid(code: Permission) -> str:
return str(uuid5(NAMESPACE_DNS, f"mimic.permission.{code.value}"))
permission_table = sa.table(
"permission",
sa.column("id", UUID(as_uuid=True)),
sa.column("code", sa.String),
sa.column("description", sa.String),
)
op.bulk_insert( op.bulk_insert(
sa.table( permission_table,
[{"id": _pid(p), "code": p.value, "description": None} for p in Permission],
)
group_table = sa.table(
"group", "group",
sa.column("id", UUID(as_uuid=True)), sa.column("id", UUID(as_uuid=True)),
sa.column("name", sa.String), sa.column("name", sa.String),
sa.column("description", sa.String), sa.column("description", sa.String),
sa.column("created_at", sa.DateTime(timezone=True)), )
sa.column("updated_at", sa.DateTime(timezone=True)), op.bulk_insert(
), group_table,
[ [
{ {
"id": "11111111-0000-0000-0000-000000000001", "id": _gid(GroupName.RT_OPERATOR),
"name": "rt_operator", "name": GroupName.RT_OPERATOR.value,
"description": "Red team operator (per-engagement scope).", "description": "Red team operator (per-engagement scope).",
}, },
{ {
"id": "11111111-0000-0000-0000-000000000002", "id": _gid(GroupName.RT_LEAD),
"name": "rt_lead", "name": GroupName.RT_LEAD.value,
"description": "Red team lead (full RT privileges).", "description": "Red team lead (full RT privileges).",
}, },
{ {
"id": "11111111-0000-0000-0000-000000000003", "id": _gid(GroupName.SOC_ANALYST),
"name": "soc_analyst", "name": GroupName.SOC_ANALYST.value,
"description": "SOC analyst (per-engagement, scoped via soc_session).", "description": "SOC analyst (scoped via soc_session).",
}, },
], ],
) )
group_permission_table = sa.table(
"group_permission",
sa.column("group_id", UUID(as_uuid=True)),
sa.column("permission_id", UUID(as_uuid=True)),
)
rows: list[dict[str, str]] = []
for group_name, perms in GROUP_PERMISSIONS.items():
for perm in perms:
rows.append({"group_id": _gid(group_name), "permission_id": _pid(perm)})
op.bulk_insert(group_permission_table, rows)
def downgrade() -> None: def downgrade() -> None:
for table in ( for table in (
@@ -579,7 +601,6 @@ def downgrade() -> None:
"run", "run",
"scenario_step", "scenario_step",
"scenario", "scenario",
"ttp_version",
"ttp", "ttp",
"host", "host",
"c2_credential", "c2_credential",

View File

@@ -9,7 +9,7 @@ from mimic.db.models.report import Report
from mimic.db.models.run import Run, RunStep, RunStepCleanup from mimic.db.models.run import Run, RunStep, RunStepCleanup
from mimic.db.models.scenario import Scenario, ScenarioStep from mimic.db.models.scenario import Scenario, ScenarioStep
from mimic.db.models.soc_session import SocSession from mimic.db.models.soc_session import SocSession
from mimic.db.models.ttp import Ttp, TtpVersion from mimic.db.models.ttp import Ttp
from mimic.db.models.user import User from mimic.db.models.user import User
__all__ = [ __all__ = [
@@ -31,7 +31,6 @@ __all__ = [
"ScenarioStep", "ScenarioStep",
"SocSession", "SocSession",
"Ttp", "Ttp",
"TtpVersion",
"User", "User",
"UserGroup", "UserGroup",
] ]

View File

@@ -1,38 +1,29 @@
"""TTP library + immutable version snapshots. """TTP library.
Note (D-T-ttp-version): spec H32 originally said no `ttp_version` table (snapshot Spec H32 / D-009: there is **no** `ttp_version` table. The replayability
lives on `run.snapshot_json`). Sprint 0 reintroduces a `ttp_version` table for snapshot lives solely on `run.snapshot_json` (a self-contained JSONB blob
clean traceability across runs and to honor the kickoff data-model directive captured at run start). `ttp.version` stays here as a purely informational
from the team-lead. `run.snapshot_json` remains the source of truth for replay, counter (§8) — bumped on edit, never referenced for replay.
but each promotion / edit produces an immutable `ttp_version` row that's easier
to diff and reference from imports / audit log.
""" """
from __future__ import annotations from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from uuid import UUID from uuid import UUID
from sqlalchemy import ( from sqlalchemy import (
JSON, JSON,
Boolean, Boolean,
DateTime,
Enum, Enum,
ForeignKey, ForeignKey,
Integer, Integer,
String, String,
Text, Text,
UniqueConstraint,
) )
from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy.orm import Mapped, mapped_column
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
from mimic.db.types import PayloadType, TtpSource from mimic.db.types import PayloadType, TtpSource
if TYPE_CHECKING:
pass
class Ttp(UuidPkMixin, TimestampsMixin, Base): class Ttp(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "ttp" __tablename__ = "ttp"
@@ -61,45 +52,11 @@ class Ttp(UuidPkMixin, TimestampsMixin, Base):
) )
tags: Mapped[list[str]] = mapped_column(JSON, default=list, nullable=False) tags: Mapped[list[str]] = mapped_column(JSON, default=list, nullable=False)
current_version: Mapped[int] = mapped_column(Integer, default=1, nullable=False) version: Mapped[int] = mapped_column(Integer, default=1, nullable=False)
# Informational (§8). Bumped on edit. Never used for replay (H32 / D-009).
is_published: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False) is_published: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
# is_published = promoted to the library (lead RT only — F11). # Promoted to the library (lead RT only — F11).
created_by_id: Mapped[UUID | None] = mapped_column( created_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL") ForeignKey("user.id", ondelete="SET NULL")
) )
versions: Mapped[list[TtpVersion]] = relationship(
back_populates="ttp",
cascade="all, delete-orphan",
order_by="TtpVersion.version",
)
class TtpVersion(UuidPkMixin, Base):
"""Immutable snapshot of a TTP at the moment it was published or used.
Used by importers / audit / report builder. `run.snapshot_json` still
embeds a full self-contained copy for replay independence.
"""
__tablename__ = "ttp_version"
__table_args__ = (
UniqueConstraint("ttp_id", "version", name="uq_ttp_version_ttp_id_version"),
)
ttp_id: Mapped[UUID] = mapped_column(
ForeignKey("ttp.id", ondelete="CASCADE"),
nullable=False,
)
version: Mapped[int] = mapped_column(Integer, nullable=False)
snapshot_json: Mapped[dict] = mapped_column(JSON, nullable=False)
content_sha256: Mapped[str] = mapped_column(String(64), nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False
)
created_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
ttp: Mapped[Ttp] = relationship(back_populates="versions")