feat(backend): add §8 data model + Alembic baseline (B0.2, B0.3)

- SQLAlchemy 2 typed mapped classes for every spec §8 aggregate:
  engagement, c2_credential, host, user, group, group_permission,
  user_group, engagement_member, ttp, ttp_version, scenario,
  scenario_step, run, run_step, run_step_cleanup, detection, evidence,
  report, soc_session, audit_log.
- Shared mixins: UuidPkMixin (PG_UUID(as_uuid=True)) + TimestampsMixin.
- StrEnum types covering every spec enum (C2Type, PayloadType, UserType,
  EngagementStatus, HostStatus, TtpSource, RunStatus, RunStepStatus,
  CleanupStatus, DetectionLevel, DetectionSource, EvidenceStatus).
- Alembic baseline migration 202605210001_initial_schema: creates every
  table, enum, index, and idempotent grants for the audit_log
  write-only Postgres role (mimic_audit_writer).
- Audit log carries prev_hash / row_hash from v1 (D-009).
- ttp_version table coexists with run.snapshot_json (D-008,
  overrides H32).
This commit is contained in:
knacky
2026-05-21 20:32:45 +02:00
parent 72a0e857a4
commit df8e4d8ef1
23 changed files with 1833 additions and 0 deletions

View File

@@ -0,0 +1,37 @@
"""SQLAlchemy 2 typed mapped classes for every Mimic aggregate."""
from mimic.db.models.audit import AuditLog
from mimic.db.models.detection import Detection, Evidence
from mimic.db.models.engagement import C2Credential, Engagement, EngagementMember
from mimic.db.models.host import Host
from mimic.db.models.permission import Group, GroupPermission, Permission, UserGroup
from mimic.db.models.report import Report
from mimic.db.models.run import Run, RunStep, RunStepCleanup
from mimic.db.models.scenario import Scenario, ScenarioStep
from mimic.db.models.soc_session import SocSession
from mimic.db.models.ttp import Ttp, TtpVersion
from mimic.db.models.user import User
__all__ = [
"AuditLog",
"C2Credential",
"Detection",
"Engagement",
"EngagementMember",
"Evidence",
"Group",
"GroupPermission",
"Host",
"Permission",
"Report",
"Run",
"RunStep",
"RunStepCleanup",
"Scenario",
"ScenarioStep",
"SocSession",
"Ttp",
"TtpVersion",
"User",
"UserGroup",
]

View File

@@ -0,0 +1,45 @@
"""Append-only audit log.
NF-AUDIT: enforced at SQL level by granting only INSERT to
`mimic_audit_writer` and only SELECT to `mimic_app`. The ORM never updates or
deletes rows on this table from application code. Hash chain (`prev_hash` /
`row_hash`) is wired here at the schema level so v2 can switch to a strict
WORM enforcement without a migration; sprint 0 fills the columns at insert.
"""
from __future__ import annotations
from datetime import datetime
from uuid import UUID
from sqlalchemy import JSON, DateTime, ForeignKey, String, Text, func
from sqlalchemy.orm import Mapped, mapped_column
from mimic.db.base import Base, UuidPkMixin
class AuditLog(UuidPkMixin, Base):
__tablename__ = "audit_log"
ts: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
nullable=False,
server_default=func.now(),
)
actor_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
action: Mapped[str] = mapped_column(String(80), nullable=False)
resource_type: Mapped[str] = mapped_column(String(80), nullable=False)
resource_id: Mapped[str | None] = mapped_column(String(128))
metadata_json: Mapped[dict] = mapped_column(JSON, nullable=False, default=dict)
prev_hash: Mapped[str | None] = mapped_column(String(64))
row_hash: Mapped[str] = mapped_column(String(64), nullable=False)
# SHA-256 of canonical (prev_hash || ts || actor_id || action ||
# resource_type || resource_id || metadata_json). Provides hash-chain
# integrity once verifier runs in v2.
source_ip: Mapped[str | None] = mapped_column(String(64))
user_agent: Mapped[str | None] = mapped_column(String(512))
comment: Mapped[str | None] = mapped_column(Text)

View File

@@ -0,0 +1,82 @@
"""Per-run-step detection (SOC) and offensive evidence (RT)."""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import (
JSON,
DateTime,
Enum,
ForeignKey,
Integer,
Text,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
from mimic.db.types import DetectionLevel, DetectionSource, EvidenceStatus
if TYPE_CHECKING:
from mimic.db.models.run import RunStep
from mimic.db.models.user import User
class Detection(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "detection"
run_step_id: Mapped[UUID] = mapped_column(
ForeignKey("run_step.id", ondelete="CASCADE"),
nullable=False,
)
soc_user_id: Mapped[UUID] = mapped_column(
ForeignKey("user.id", ondelete="RESTRICT"),
nullable=False,
)
level: Mapped[DetectionLevel] = mapped_column(
Enum(DetectionLevel, name="detection_level"),
nullable=False,
)
source: Mapped[DetectionSource] = mapped_column(
Enum(DetectionSource, name="detection_source"),
nullable=False,
)
latency_ms: Mapped[int | None] = mapped_column(Integer)
comment: Mapped[str | None] = mapped_column(Text)
recorded_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False
)
run_step: Mapped[RunStep] = relationship()
soc_user: Mapped[User] = relationship()
class Evidence(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "evidence"
run_step_id: Mapped[UUID] = mapped_column(
ForeignKey("run_step.id", ondelete="CASCADE"),
nullable=False,
)
rt_user_id: Mapped[UUID] = mapped_column(
ForeignKey("user.id", ondelete="RESTRICT"),
nullable=False,
)
status: Mapped[EvidenceStatus] = mapped_column(
Enum(EvidenceStatus, name="evidence_status"),
nullable=False,
)
artifacts_text: Mapped[str | None] = mapped_column(Text)
artifact_files_json: Mapped[list[dict]] = mapped_column(
JSON, default=list, nullable=False
)
# Each entry: {"name": str, "ref": str, "sha256": str, "size_bytes": int}
comment: Mapped[str | None] = mapped_column(Text)
recorded_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False
)
run_step: Mapped[RunStep] = relationship()
rt_user: Mapped[User] = relationship()

View File

@@ -0,0 +1,106 @@
"""Engagement aggregate: tenancy container + C2 credentials."""
from __future__ import annotations
from datetime import date, datetime
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import Date, DateTime, Enum, ForeignKey, Integer, LargeBinary, String, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
from mimic.db.types import C2Type, EngagementStatus
if TYPE_CHECKING:
from mimic.db.models.host import Host
from mimic.db.models.scenario import Scenario
from mimic.db.models.user import User
class Engagement(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "engagement"
client_name: Mapped[str] = mapped_column(String(255), nullable=False)
description: Mapped[str | None] = mapped_column(String(1024))
status: Mapped[EngagementStatus] = mapped_column(
Enum(EngagementStatus, name="engagement_status"),
default=EngagementStatus.DRAFT,
nullable=False,
)
start_date: Mapped[date | None] = mapped_column(Date)
end_date: Mapped[date | None] = mapped_column(Date)
c2_type: Mapped[C2Type] = mapped_column(
Enum(C2Type, name="c2_type"),
default=C2Type.MYTHIC,
nullable=False,
)
created_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
hosts: Mapped[list[Host]] = relationship(
back_populates="engagement",
cascade="all, delete-orphan",
)
scenarios: Mapped[list[Scenario]] = relationship(
back_populates="engagement",
cascade="all, delete-orphan",
)
members: Mapped[list[EngagementMember]] = relationship(
back_populates="engagement",
cascade="all, delete-orphan",
)
c2_credentials: Mapped[list[C2Credential]] = relationship(
back_populates="engagement",
cascade="all, delete-orphan",
)
class EngagementMember(Base):
__tablename__ = "engagement_member"
engagement_id: Mapped[UUID] = mapped_column(
ForeignKey("engagement.id", ondelete="CASCADE"),
primary_key=True,
)
user_id: Mapped[UUID] = mapped_column(
ForeignKey("user.id", ondelete="CASCADE"),
primary_key=True,
)
role: Mapped[str] = mapped_column(String(40), nullable=False)
added_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
nullable=False,
)
engagement: Mapped[Engagement] = relationship(back_populates="members")
user: Mapped[User] = relationship()
class C2Credential(UuidPkMixin, TimestampsMixin, Base):
"""Per-engagement encrypted C2 credentials.
Decision D-004: dedicated table, Fernet-encrypted blob, versioned, rotation
via insert + retire. Active row = `retired_at IS NULL` with the highest
`version` for `(engagement_id, c2_type)`.
"""
__tablename__ = "c2_credential"
engagement_id: Mapped[UUID] = mapped_column(
ForeignKey("engagement.id", ondelete="CASCADE"),
nullable=False,
)
c2_type: Mapped[C2Type] = mapped_column(
Enum(C2Type, name="c2_type", create_type=False),
nullable=False,
)
config_fernet: Mapped[bytes] = mapped_column(LargeBinary, nullable=False)
version: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
retired_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
engagement: Mapped[Engagement] = relationship(back_populates="c2_credentials")

View File

@@ -0,0 +1,41 @@
"""Host inventory (per-engagement target machines)."""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import DateTime, Enum, ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
from mimic.db.types import C2Type, HostStatus
if TYPE_CHECKING:
from mimic.db.models.engagement import Engagement
class Host(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "host"
engagement_id: Mapped[UUID] = mapped_column(
ForeignKey("engagement.id", ondelete="CASCADE"),
nullable=False,
)
hostname: Mapped[str] = mapped_column(String(255), nullable=False)
ip: Mapped[str | None] = mapped_column(String(64))
os: Mapped[str | None] = mapped_column(String(128))
c2_session_id: Mapped[str | None] = mapped_column(String(128))
c2_type: Mapped[C2Type] = mapped_column(
Enum(C2Type, name="c2_type", create_type=False),
nullable=False,
)
status: Mapped[HostStatus] = mapped_column(
Enum(HostStatus, name="host_status"),
default=HostStatus.UNKNOWN,
nullable=False,
)
last_seen: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
engagement: Mapped[Engagement] = relationship(back_populates="hosts")

View File

@@ -0,0 +1,87 @@
"""Group-based RBAC tables.
Decision D-003 (spec-decisions): group-based from day one so OIDC will map
claims to existing groups in v2 without touching application code.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import ForeignKey, PrimaryKeyConstraint, String, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
if TYPE_CHECKING:
from mimic.db.models.engagement import Engagement
from mimic.db.models.user import User
class Permission(UuidPkMixin, Base):
__tablename__ = "permission"
__table_args__ = (UniqueConstraint("code", name="uq_permission_code"),)
code: Mapped[str] = mapped_column(String(80), nullable=False)
description: Mapped[str | None] = mapped_column(String(255))
class Group(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "group"
__table_args__ = (UniqueConstraint("name", name="uq_group_name"),)
name: Mapped[str] = mapped_column(String(80), nullable=False)
description: Mapped[str | None] = mapped_column(String(255))
permission_links: Mapped[list[GroupPermission]] = relationship(
back_populates="group",
cascade="all, delete-orphan",
)
user_links: Mapped[list[UserGroup]] = relationship(
back_populates="group",
cascade="all, delete-orphan",
)
class GroupPermission(Base):
__tablename__ = "group_permission"
__table_args__ = (
PrimaryKeyConstraint("group_id", "permission_id", name="pk_group_permission"),
)
group_id: Mapped[UUID] = mapped_column(
ForeignKey("group.id", ondelete="CASCADE"), nullable=False
)
permission_id: Mapped[UUID] = mapped_column(
ForeignKey("permission.id", ondelete="CASCADE"), nullable=False
)
group: Mapped[Group] = relationship(back_populates="permission_links")
permission: Mapped[Permission] = relationship()
class UserGroup(Base):
__tablename__ = "user_group"
__table_args__ = (
PrimaryKeyConstraint(
"user_id", "group_id", "engagement_id", name="pk_user_group"
),
)
user_id: Mapped[UUID] = mapped_column(
ForeignKey("user.id", ondelete="CASCADE"), nullable=False
)
group_id: Mapped[UUID] = mapped_column(
ForeignKey("group.id", ondelete="CASCADE"), nullable=False
)
engagement_id: Mapped[UUID | None] = mapped_column(
ForeignKey("engagement.id", ondelete="CASCADE"),
nullable=True,
)
# Global membership (e.g. rt_lead enterprise-wide) when engagement_id IS NULL.
# Per-engagement membership otherwise.
user: Mapped[User] = relationship(back_populates="group_links")
group: Mapped[Group] = relationship(back_populates="user_links")
engagement: Mapped[Engagement | None] = relationship()

View File

@@ -0,0 +1,48 @@
"""Mission report (PDF / JSON / Markdown bundle)."""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import (
JSON,
DateTime,
ForeignKey,
Integer,
String,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
if TYPE_CHECKING:
from mimic.db.models.engagement import Engagement
class Report(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "report"
engagement_id: Mapped[UUID] = mapped_column(
ForeignKey("engagement.id", ondelete="CASCADE"),
nullable=False,
)
version: Mapped[int] = mapped_column(Integer, default=1, nullable=False)
content_json: Mapped[dict] = mapped_column(JSON, nullable=False, default=dict)
content_sha256: Mapped[str] = mapped_column(String(64), nullable=False)
# SHA-256 of canonical JSON. Identical hash referenced in PDF footer, JSON
# export and Markdown export (spec H19 / F9 / F14).
pdf_path: Mapped[str | None] = mapped_column(String(512))
md_path: Mapped[str | None] = mapped_column(String(512))
generated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False
)
generated_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
engagement: Mapped[Engagement] = relationship()

View File

@@ -0,0 +1,118 @@
"""Run aggregate: scenario execution + per-step state + cleanup."""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import (
JSON,
DateTime,
Enum,
ForeignKey,
Integer,
String,
Text,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
from mimic.db.types import CleanupStatus, RunStatus, RunStepStatus
if TYPE_CHECKING:
from mimic.db.models.scenario import Scenario, ScenarioStep
class Run(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "run"
scenario_id: Mapped[UUID] = mapped_column(
ForeignKey("scenario.id", ondelete="RESTRICT"),
nullable=False,
)
status: Mapped[RunStatus] = mapped_column(
Enum(RunStatus, name="run_status"),
default=RunStatus.QUEUED,
nullable=False,
)
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
ended_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
started_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
snapshot_json: Mapped[dict] = mapped_column(JSON, nullable=False, default=dict)
# Full self-contained snapshot of scenario + steps + resolved TTPs.
# Source of truth for replay (spec H32).
scenario: Mapped[Scenario] = relationship(back_populates="runs")
steps: Mapped[list[RunStep]] = relationship(
back_populates="run",
cascade="all, delete-orphan",
order_by="RunStep.order_idx",
)
class RunStep(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "run_step"
run_id: Mapped[UUID] = mapped_column(
ForeignKey("run.id", ondelete="CASCADE"),
nullable=False,
)
scenario_step_id: Mapped[UUID | None] = mapped_column(
ForeignKey("scenario_step.id", ondelete="SET NULL")
)
order_idx: Mapped[int] = mapped_column(Integer, nullable=False)
status: Mapped[RunStepStatus] = mapped_column(
Enum(RunStepStatus, name="run_step_status"),
default=RunStepStatus.QUEUED,
nullable=False,
)
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
ended_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
c2_task_id: Mapped[str | None] = mapped_column(String(128))
output_text: Mapped[str | None] = mapped_column(Text)
output_blob_ref: Mapped[str | None] = mapped_column(String(512))
exit_code: Mapped[int | None] = mapped_column(Integer)
resolved_payload_text: Mapped[str | None] = mapped_column(Text)
# The fully-resolved payload (post-Jinja) actually sent to the C2. Useful
# for audit / report. Marker (`# MIMIC-RUN:<id>`) is added here.
run: Mapped[Run] = relationship(back_populates="steps")
scenario_step: Mapped[ScenarioStep | None] = relationship()
cleanup: Mapped[RunStepCleanup | None] = relationship(
back_populates="run_step",
cascade="all, delete-orphan",
uselist=False,
)
class RunStepCleanup(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "run_step_cleanup"
run_step_id: Mapped[UUID] = mapped_column(
ForeignKey("run_step.id", ondelete="CASCADE"),
nullable=False,
unique=True,
)
status: Mapped[CleanupStatus] = mapped_column(
Enum(CleanupStatus, name="cleanup_status"),
default=CleanupStatus.PENDING,
nullable=False,
)
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
ended_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
resolved_command_text: Mapped[str | None] = mapped_column(Text)
output: Mapped[str | None] = mapped_column(Text)
executed_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
run_step: Mapped[RunStep] = relationship(back_populates="cleanup")

View File

@@ -0,0 +1,86 @@
"""Scenario aggregate: ordered list of TTP steps.
Spec §F3: `scenario.c2_type` is the source of truth for the run. Verified at
run start that every referenced host matches.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import (
JSON,
Enum,
ForeignKey,
Integer,
String,
Text,
UniqueConstraint,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
from mimic.db.types import C2Type
if TYPE_CHECKING:
from mimic.db.models.engagement import Engagement
from mimic.db.models.host import Host
from mimic.db.models.run import Run
from mimic.db.models.ttp import Ttp
class Scenario(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "scenario"
engagement_id: Mapped[UUID] = mapped_column(
ForeignKey("engagement.id", ondelete="CASCADE"),
nullable=False,
)
name: Mapped[str] = mapped_column(String(255), nullable=False)
description: Mapped[str | None] = mapped_column(Text)
version: Mapped[int] = mapped_column(Integer, default=1, nullable=False)
c2_type: Mapped[C2Type] = mapped_column(
Enum(C2Type, name="c2_type", create_type=False),
nullable=False,
)
created_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
engagement: Mapped[Engagement] = relationship(back_populates="scenarios")
steps: Mapped[list[ScenarioStep]] = relationship(
back_populates="scenario",
cascade="all, delete-orphan",
order_by="ScenarioStep.order_idx",
)
runs: Mapped[list[Run]] = relationship(back_populates="scenario")
class ScenarioStep(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "scenario_step"
__table_args__ = (
UniqueConstraint("scenario_id", "order_idx", name="uq_scenario_step_order_idx"),
)
scenario_id: Mapped[UUID] = mapped_column(
ForeignKey("scenario.id", ondelete="CASCADE"),
nullable=False,
)
order_idx: Mapped[int] = mapped_column(Integer, nullable=False)
ttp_id: Mapped[UUID] = mapped_column(
ForeignKey("ttp.id", ondelete="RESTRICT"),
nullable=False,
)
host_id: Mapped[UUID] = mapped_column(
ForeignKey("host.id", ondelete="RESTRICT"),
nullable=False,
)
params_override_json: Mapped[dict] = mapped_column(JSON, default=dict, nullable=False)
delay_after_ms: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
scenario: Mapped[Scenario] = relationship(back_populates="steps")
ttp: Mapped[Ttp] = relationship()
host: Mapped[Host] = relationship()

View File

@@ -0,0 +1,48 @@
"""SOC analyst sessions (bcrypt-hashed opaque tokens).
Decision D-006: bcrypt hash stored; the clear token is generated server-side at
session creation, returned **once** in the API response and delivered out-of-band.
Never re-displayable.
"""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import DateTime, ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
if TYPE_CHECKING:
from mimic.db.models.engagement import Engagement
from mimic.db.models.user import User
class SocSession(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "soc_session"
user_id: Mapped[UUID] = mapped_column(
ForeignKey("user.id", ondelete="CASCADE"),
nullable=False,
)
engagement_id: Mapped[UUID] = mapped_column(
ForeignKey("engagement.id", ondelete="CASCADE"),
nullable=False,
)
token_hash: Mapped[str] = mapped_column(String(255), nullable=False, unique=True)
# bcrypt hash. Plain token returned once at creation.
expires_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False
)
revoked_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
last_ip: Mapped[str | None] = mapped_column(String(64))
last_user_agent: Mapped[str | None] = mapped_column(String(512))
last_used_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
user: Mapped[User] = relationship(back_populates="soc_sessions")
engagement: Mapped[Engagement] = relationship()

View File

@@ -0,0 +1,105 @@
"""TTP library + immutable version snapshots.
Note (D-T-ttp-version): spec H32 originally said no `ttp_version` table (snapshot
lives on `run.snapshot_json`). Sprint 0 reintroduces a `ttp_version` table for
clean traceability across runs and to honor the kickoff data-model directive
from the team-lead. `run.snapshot_json` remains the source of truth for replay,
but each promotion / edit produces an immutable `ttp_version` row that's easier
to diff and reference from imports / audit log.
"""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import (
JSON,
Boolean,
DateTime,
Enum,
ForeignKey,
Integer,
String,
Text,
UniqueConstraint,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
from mimic.db.types import PayloadType, TtpSource
if TYPE_CHECKING:
pass
class Ttp(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "ttp"
name: Mapped[str] = mapped_column(String(255), nullable=False)
description: Mapped[str | None] = mapped_column(Text)
mitre_technique: Mapped[str] = mapped_column(String(16), nullable=False)
mitre_subtechnique: Mapped[str | None] = mapped_column(String(16))
payload_type: Mapped[PayloadType] = mapped_column(
Enum(PayloadType, name="payload_type"),
nullable=False,
)
payload_template: Mapped[str] = mapped_column(Text, nullable=False, default="")
params_schema_json: Mapped[dict | None] = mapped_column(JSON)
opsec_notes: Mapped[str | None] = mapped_column(Text)
cleanup_command: Mapped[str | None] = mapped_column(Text)
is_stealth_variant: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
source: Mapped[TtpSource] = mapped_column(
Enum(TtpSource, name="ttp_source"),
default=TtpSource.CUSTOM,
nullable=False,
)
tags: Mapped[list[str]] = mapped_column(JSON, default=list, nullable=False)
current_version: Mapped[int] = mapped_column(Integer, default=1, nullable=False)
is_published: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
# is_published = promoted to the library (lead RT only — F11).
created_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
versions: Mapped[list[TtpVersion]] = relationship(
back_populates="ttp",
cascade="all, delete-orphan",
order_by="TtpVersion.version",
)
class TtpVersion(UuidPkMixin, Base):
"""Immutable snapshot of a TTP at the moment it was published or used.
Used by importers / audit / report builder. `run.snapshot_json` still
embeds a full self-contained copy for replay independence.
"""
__tablename__ = "ttp_version"
__table_args__ = (
UniqueConstraint("ttp_id", "version", name="uq_ttp_version_ttp_id_version"),
)
ttp_id: Mapped[UUID] = mapped_column(
ForeignKey("ttp.id", ondelete="CASCADE"),
nullable=False,
)
version: Mapped[int] = mapped_column(Integer, nullable=False)
snapshot_json: Mapped[dict] = mapped_column(JSON, nullable=False)
content_sha256: Mapped[str] = mapped_column(String(64), nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False
)
created_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
ttp: Mapped[Ttp] = relationship(back_populates="versions")

View File

@@ -0,0 +1,50 @@
"""User accounts (RT operators, RT leads, SOC analysts)."""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import DateTime, Enum, String, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
from mimic.db.types import UserType
if TYPE_CHECKING:
from mimic.db.models.permission import UserGroup
from mimic.db.models.soc_session import SocSession
class User(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "user"
__table_args__ = (UniqueConstraint("email", name="uq_user_email"),)
type: Mapped[UserType] = mapped_column(Enum(UserType, name="user_type"), nullable=False)
email: Mapped[str] = mapped_column(String(255), nullable=False)
display_name: Mapped[str | None] = mapped_column(String(120))
keycloak_sub: Mapped[str | None] = mapped_column(String(255), unique=True)
local_password_hash: Mapped[str | None] = mapped_column(String(255))
disabled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
last_login_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
group_links: Mapped[list[UserGroup]] = relationship(
back_populates="user",
cascade="all, delete-orphan",
)
soc_sessions: Mapped[list[SocSession]] = relationship(
back_populates="user",
cascade="all, delete-orphan",
)
@property
def is_active(self) -> bool:
return self.disabled_at is None
@property
def created_by_id(self) -> UUID | None:
# Placeholder for future audit linkage; kept for symmetry with spec §8.
return None