feat(backend): add §8 data model + Alembic baseline (B0.2, B0.3)

- SQLAlchemy 2 typed mapped classes for every spec §8 aggregate:
  engagement, c2_credential, host, user, group, group_permission,
  user_group, engagement_member, ttp, ttp_version, scenario,
  scenario_step, run, run_step, run_step_cleanup, detection, evidence,
  report, soc_session, audit_log.
- Shared mixins: UuidPkMixin (PG_UUID(as_uuid=True)) + TimestampsMixin.
- StrEnum types covering every spec enum (C2Type, PayloadType, UserType,
  EngagementStatus, HostStatus, TtpSource, RunStatus, RunStepStatus,
  CleanupStatus, DetectionLevel, DetectionSource, EvidenceStatus).
- Alembic baseline migration 202605210001_initial_schema: creates every
  table, enum, index, and idempotent grants for the audit_log
  write-only Postgres role (mimic_audit_writer).
- Audit log carries prev_hash / row_hash from v1 (D-009).
- ttp_version table coexists with run.snapshot_json (D-008,
  overrides H32).
This commit is contained in:
knacky
2026-05-21 20:32:45 +02:00
parent a93c959444
commit 22d37fb240
23 changed files with 1833 additions and 0 deletions

View File

@@ -0,0 +1,3 @@
"""Mimic backend package."""
__version__ = "0.1.0a0"

View File

@@ -0,0 +1,66 @@
"""Runtime configuration loaded from environment (Pydantic Settings)."""
from __future__ import annotations
from functools import lru_cache
from typing import Literal
from pydantic import Field, SecretStr, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
Environment = Literal["development", "testing", "production"]
class Settings(BaseSettings):
"""Application settings.
All values are env-driven (NF-network: no hardcoded secrets). The `MIMIC_`
prefix isolates Mimic vars from ambient shell vars.
"""
model_config = SettingsConfigDict(
env_prefix="MIMIC_",
env_file=".env",
env_file_encoding="utf-8",
extra="ignore",
case_sensitive=False,
)
env: Environment = "development"
secret_key: SecretStr = Field(default=SecretStr("change-me"))
fernet_key: SecretStr = Field(default=SecretStr(""))
database_url: str = "postgresql+psycopg://mimic_app:mimic_dev_password@localhost:5432/mimic"
database_audit_url: str | None = None
session_cookie_secure: bool = True
session_cookie_samesite: Literal["Lax", "Strict", "None"] = "Lax"
session_lifetime_minutes: int = 60 * 8
cors_origins: list[str] = Field(default_factory=list)
log_level: str = "INFO"
log_json: bool = True
template_render_timeout_ms: int = 250
output_blob_max_bytes: int = 10 * 1024 * 1024
@field_validator("cors_origins", mode="before")
@classmethod
def _split_cors(cls, value: object) -> object:
if isinstance(value, str):
return [origin.strip() for origin in value.split(",") if origin.strip()]
return value
@property
def is_production(self) -> bool:
return self.env == "production"
@property
def is_testing(self) -> bool:
return self.env == "testing"
@lru_cache(maxsize=1)
def get_settings() -> Settings:
return Settings()

View File

@@ -0,0 +1,5 @@
"""Database layer: SQLAlchemy 2 declarative base, models, repositories."""
from mimic.db.base import Base
__all__ = ["Base"]

View File

@@ -0,0 +1,59 @@
"""Declarative base + shared mixins for all ORM models."""
from __future__ import annotations
import uuid
from datetime import UTC, datetime
from sqlalchemy import DateTime, MetaData, func
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
NAMING_CONVENTION = {
"ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_%(constraint_name)s",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s",
}
class Base(DeclarativeBase):
"""Project-wide declarative base.
UUID columns are declared explicitly on each model via `PG_UUID(as_uuid=True)`
rather than through a `type_annotation_map` — Flask-SQLAlchemy injects its
own registry which is incompatible with per-base annotation maps.
"""
metadata = MetaData(naming_convention=NAMING_CONVENTION)
class UuidPkMixin:
"""Mixin: UUID v4 primary key generated client-side."""
id: Mapped[uuid.UUID] = mapped_column(
PG_UUID(as_uuid=True),
primary_key=True,
default=uuid.uuid4,
)
def _utcnow() -> datetime:
return datetime.now(tz=UTC)
class TimestampsMixin:
"""Mixin: `created_at` / `updated_at` columns, UTC timezone-aware."""
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
nullable=False,
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=_utcnow,
nullable=False,
)

View File

@@ -0,0 +1,58 @@
"""Alembic environment.
We import the SQLAlchemy `Base.metadata` directly so migrations are decoupled
from the Flask app object (Alembic can run in CI without spinning a request
context).
"""
from __future__ import annotations
import logging
from logging.config import fileConfig
from alembic import context
from sqlalchemy import engine_from_config, pool
from mimic.config import get_settings
from mimic.db.base import Base
from mimic.db.models import * # noqa: F403 (ensures all tables register)
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
log = logging.getLogger("alembic.env")
target_metadata = Base.metadata
settings = get_settings()
config.set_main_option("sqlalchemy.url", settings.database_url)
def run_migrations_offline() -> None:
context.configure(
url=config.get_main_option("sqlalchemy.url"),
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@@ -0,0 +1,27 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from __future__ import annotations
from typing import Sequence
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
revision: str = ${repr(up_revision)}
down_revision: str | Sequence[str] | None = ${repr(down_revision)}
branch_labels: str | Sequence[str] | None = ${repr(branch_labels)}
depends_on: str | Sequence[str] | None = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@@ -0,0 +1,610 @@
"""initial schema (sprint 0, §8 spec)
Creates every aggregate listed in spec §8: engagement, user/group RBAC,
host, ttp/ttp_version, scenario/scenario_step, run/run_step/run_step_cleanup,
detection, evidence, report, soc_session, c2_credential, audit_log.
Postgres-only objects:
- ENUM types created via SQLAlchemy `Enum(..., create_type=True)`.
- `audit_log` SQL-level append-only via `mimic_audit_writer` role grants
(idempotent, no-op if role missing — deployment playbook is authoritative).
Revision ID: 202605210001
Revises:
Create Date: 2026-05-21
"""
from __future__ import annotations
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects.postgresql import JSONB, UUID
revision: str = "202605210001"
down_revision: str | None = None
branch_labels: str | None = None
depends_on: str | None = None
# ---------------------------------------------------------------------------
# Enums (Postgres CREATE TYPE — done up-front so every table reuses the type)
# ---------------------------------------------------------------------------
USER_TYPE = sa.Enum("rt_operator", "rt_lead", "soc_analyst", name="user_type")
ENGAGEMENT_STATUS = sa.Enum(
"draft", "active", "closed", "archived", name="engagement_status"
)
C2_TYPE = sa.Enum("mythic", "home", name="c2_type")
HOST_STATUS = sa.Enum("unknown", "alive", "dead", name="host_status")
PAYLOAD_TYPE = sa.Enum(
"cmd",
"powershell",
"bof",
"dotnet_assembly",
"dotnet_exe",
"pe_exe",
"pe_dll",
"shellcode",
"python",
"vbs",
"wmi_query",
"registry",
"script_file",
name="payload_type",
)
TTP_SOURCE = sa.Enum("custom", "import_atr", "import_mission", name="ttp_source")
RUN_STATUS = sa.Enum(
"queued", "running", "paused", "completed", "failed", "aborted", name="run_status"
)
RUN_STEP_STATUS = sa.Enum(
"queued",
"running",
"completed",
"failed",
"skipped",
"cleanup_failed",
name="run_step_status",
)
CLEANUP_STATUS = sa.Enum(
"pending", "success", "failed", "partial", name="cleanup_status"
)
DETECTION_LEVEL = sa.Enum(
"detected", "partial", "not_detected", name="detection_level"
)
DETECTION_SOURCE = sa.Enum(
"ndr", "edr", "siem", "manual", "other", name="detection_source"
)
EVIDENCE_STATUS = sa.Enum(
"success", "failure", "partial", name="evidence_status"
)
def upgrade() -> None:
bind = op.get_bind()
for enum_t in (
USER_TYPE,
ENGAGEMENT_STATUS,
C2_TYPE,
HOST_STATUS,
PAYLOAD_TYPE,
TTP_SOURCE,
RUN_STATUS,
RUN_STEP_STATUS,
CLEANUP_STATUS,
DETECTION_LEVEL,
DETECTION_SOURCE,
EVIDENCE_STATUS,
):
enum_t.create(bind, checkfirst=True)
# ------------------------------------------------------------------ user
op.create_table(
"user",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column("type", USER_TYPE, nullable=False),
sa.Column("email", sa.String(255), nullable=False),
sa.Column("display_name", sa.String(120)),
sa.Column("keycloak_sub", sa.String(255), unique=True),
sa.Column("local_password_hash", sa.String(255)),
sa.Column("disabled_at", sa.DateTime(timezone=True)),
sa.Column("last_login_at", sa.DateTime(timezone=True)),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.UniqueConstraint("email", name="uq_user_email"),
)
# ------------------------------------------------------------ permission
op.create_table(
"permission",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column("code", sa.String(80), nullable=False),
sa.Column("description", sa.String(255)),
sa.UniqueConstraint("code", name="uq_permission_code"),
)
op.create_table(
"group",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column("name", sa.String(80), nullable=False),
sa.Column("description", sa.String(255)),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.UniqueConstraint("name", name="uq_group_name"),
)
op.create_table(
"group_permission",
sa.Column(
"group_id",
UUID(as_uuid=True),
sa.ForeignKey("group.id", ondelete="CASCADE", name="fk_group_permission_group_id_group"),
nullable=False,
),
sa.Column(
"permission_id",
UUID(as_uuid=True),
sa.ForeignKey("permission.id", ondelete="CASCADE", name="fk_group_permission_permission_id_permission"),
nullable=False,
),
sa.PrimaryKeyConstraint("group_id", "permission_id", name="pk_group_permission"),
)
# ------------------------------------------------------------ engagement
op.create_table(
"engagement",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column("client_name", sa.String(255), nullable=False),
sa.Column("description", sa.String(1024)),
sa.Column("status", ENGAGEMENT_STATUS, nullable=False, server_default="draft"),
sa.Column("start_date", sa.Date),
sa.Column("end_date", sa.Date),
sa.Column("c2_type", C2_TYPE, nullable=False, server_default="mythic"),
sa.Column("created_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
)
op.create_table(
"engagement_member",
sa.Column(
"engagement_id",
UUID(as_uuid=True),
sa.ForeignKey("engagement.id", ondelete="CASCADE"),
primary_key=True,
),
sa.Column(
"user_id",
UUID(as_uuid=True),
sa.ForeignKey("user.id", ondelete="CASCADE"),
primary_key=True,
),
sa.Column("role", sa.String(40), nullable=False),
sa.Column("added_at", sa.DateTime(timezone=True), nullable=False),
)
op.create_table(
"user_group",
sa.Column(
"user_id",
UUID(as_uuid=True),
sa.ForeignKey("user.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column(
"group_id",
UUID(as_uuid=True),
sa.ForeignKey("group.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column(
"engagement_id",
UUID(as_uuid=True),
sa.ForeignKey("engagement.id", ondelete="CASCADE"),
nullable=True,
),
sa.PrimaryKeyConstraint("user_id", "group_id", "engagement_id", name="pk_user_group"),
)
op.create_table(
"c2_credential",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column(
"engagement_id",
UUID(as_uuid=True),
sa.ForeignKey("engagement.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("c2_type", C2_TYPE, nullable=False),
sa.Column("config_fernet", sa.LargeBinary, nullable=False),
sa.Column("version", sa.Integer, nullable=False, server_default="1"),
sa.Column("retired_at", sa.DateTime(timezone=True)),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
)
op.create_index(
"ix_c2_credential_engagement_active",
"c2_credential",
["engagement_id", "c2_type", "version"],
unique=False,
)
# ------------------------------------------------------------------ host
op.create_table(
"host",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column(
"engagement_id",
UUID(as_uuid=True),
sa.ForeignKey("engagement.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("hostname", sa.String(255), nullable=False),
sa.Column("ip", sa.String(64)),
sa.Column("os", sa.String(128)),
sa.Column("c2_session_id", sa.String(128)),
sa.Column("c2_type", C2_TYPE, nullable=False),
sa.Column("status", HOST_STATUS, nullable=False, server_default="unknown"),
sa.Column("last_seen", sa.DateTime(timezone=True)),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
)
op.create_index("ix_host_engagement_id", "host", ["engagement_id"])
# ------------------------------------------------------------------ ttp
op.create_table(
"ttp",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column("name", sa.String(255), nullable=False),
sa.Column("description", sa.Text),
sa.Column("mitre_technique", sa.String(16), nullable=False),
sa.Column("mitre_subtechnique", sa.String(16)),
sa.Column("payload_type", PAYLOAD_TYPE, nullable=False),
sa.Column("payload_template", sa.Text, nullable=False, server_default=""),
sa.Column("params_schema_json", JSONB),
sa.Column("opsec_notes", sa.Text),
sa.Column("cleanup_command", sa.Text),
sa.Column("is_stealth_variant", sa.Boolean, nullable=False, server_default=sa.false()),
sa.Column("source", TTP_SOURCE, nullable=False, server_default="custom"),
sa.Column("tags", JSONB, nullable=False, server_default="[]"),
sa.Column("current_version", sa.Integer, nullable=False, server_default="1"),
sa.Column("is_published", sa.Boolean, nullable=False, server_default=sa.false()),
sa.Column("created_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
)
op.create_table(
"ttp_version",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column(
"ttp_id",
UUID(as_uuid=True),
sa.ForeignKey("ttp.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("version", sa.Integer, nullable=False),
sa.Column("snapshot_json", JSONB, nullable=False),
sa.Column("content_sha256", sa.String(64), nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("created_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")),
sa.UniqueConstraint("ttp_id", "version", name="uq_ttp_version_ttp_id_version"),
)
# -------------------------------------------------------------- scenario
op.create_table(
"scenario",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column(
"engagement_id",
UUID(as_uuid=True),
sa.ForeignKey("engagement.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("name", sa.String(255), nullable=False),
sa.Column("description", sa.Text),
sa.Column("version", sa.Integer, nullable=False, server_default="1"),
sa.Column("c2_type", C2_TYPE, nullable=False),
sa.Column("created_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
)
op.create_table(
"scenario_step",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column(
"scenario_id",
UUID(as_uuid=True),
sa.ForeignKey("scenario.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("order_idx", sa.Integer, nullable=False),
sa.Column(
"ttp_id",
UUID(as_uuid=True),
sa.ForeignKey("ttp.id", ondelete="RESTRICT"),
nullable=False,
),
sa.Column(
"host_id",
UUID(as_uuid=True),
sa.ForeignKey("host.id", ondelete="RESTRICT"),
nullable=False,
),
sa.Column("params_override_json", JSONB, nullable=False, server_default="{}"),
sa.Column("delay_after_ms", sa.Integer, nullable=False, server_default="0"),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.UniqueConstraint("scenario_id", "order_idx", name="uq_scenario_step_order_idx"),
)
# ------------------------------------------------------------------- run
op.create_table(
"run",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column(
"scenario_id",
UUID(as_uuid=True),
sa.ForeignKey("scenario.id", ondelete="RESTRICT"),
nullable=False,
),
sa.Column("status", RUN_STATUS, nullable=False, server_default="queued"),
sa.Column("started_at", sa.DateTime(timezone=True)),
sa.Column("ended_at", sa.DateTime(timezone=True)),
sa.Column("started_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")),
sa.Column("snapshot_json", JSONB, nullable=False, server_default="{}"),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
)
op.create_table(
"run_step",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column(
"run_id",
UUID(as_uuid=True),
sa.ForeignKey("run.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column(
"scenario_step_id",
UUID(as_uuid=True),
sa.ForeignKey("scenario_step.id", ondelete="SET NULL"),
),
sa.Column("order_idx", sa.Integer, nullable=False),
sa.Column("status", RUN_STEP_STATUS, nullable=False, server_default="queued"),
sa.Column("started_at", sa.DateTime(timezone=True)),
sa.Column("ended_at", sa.DateTime(timezone=True)),
sa.Column("c2_task_id", sa.String(128)),
sa.Column("output_text", sa.Text),
sa.Column("output_blob_ref", sa.String(512)),
sa.Column("exit_code", sa.Integer),
sa.Column("resolved_payload_text", sa.Text),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
)
op.create_table(
"run_step_cleanup",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column(
"run_step_id",
UUID(as_uuid=True),
sa.ForeignKey("run_step.id", ondelete="CASCADE"),
nullable=False,
unique=True,
),
sa.Column("status", CLEANUP_STATUS, nullable=False, server_default="pending"),
sa.Column("started_at", sa.DateTime(timezone=True)),
sa.Column("ended_at", sa.DateTime(timezone=True)),
sa.Column("resolved_command_text", sa.Text),
sa.Column("output", sa.Text),
sa.Column("executed_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
)
# ----------------------------------------------------- detection / evidence
op.create_table(
"detection",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column(
"run_step_id",
UUID(as_uuid=True),
sa.ForeignKey("run_step.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column(
"soc_user_id",
UUID(as_uuid=True),
sa.ForeignKey("user.id", ondelete="RESTRICT"),
nullable=False,
),
sa.Column("level", DETECTION_LEVEL, nullable=False),
sa.Column("source", DETECTION_SOURCE, nullable=False),
sa.Column("latency_ms", sa.Integer),
sa.Column("comment", sa.Text),
sa.Column("recorded_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
)
op.create_table(
"evidence",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column(
"run_step_id",
UUID(as_uuid=True),
sa.ForeignKey("run_step.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column(
"rt_user_id",
UUID(as_uuid=True),
sa.ForeignKey("user.id", ondelete="RESTRICT"),
nullable=False,
),
sa.Column("status", EVIDENCE_STATUS, nullable=False),
sa.Column("artifacts_text", sa.Text),
sa.Column("artifact_files_json", JSONB, nullable=False, server_default="[]"),
sa.Column("comment", sa.Text),
sa.Column("recorded_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
)
# ----------------------------------------------------------------- report
op.create_table(
"report",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column(
"engagement_id",
UUID(as_uuid=True),
sa.ForeignKey("engagement.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("version", sa.Integer, nullable=False, server_default="1"),
sa.Column("content_json", JSONB, nullable=False, server_default="{}"),
sa.Column("content_sha256", sa.String(64), nullable=False),
sa.Column("pdf_path", sa.String(512)),
sa.Column("md_path", sa.String(512)),
sa.Column("generated_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("generated_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
)
# ------------------------------------------------------------- soc_session
op.create_table(
"soc_session",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column(
"user_id",
UUID(as_uuid=True),
sa.ForeignKey("user.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column(
"engagement_id",
UUID(as_uuid=True),
sa.ForeignKey("engagement.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("token_hash", sa.String(255), nullable=False, unique=True),
sa.Column("expires_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("revoked_at", sa.DateTime(timezone=True)),
sa.Column("last_ip", sa.String(64)),
sa.Column("last_user_agent", sa.String(512)),
sa.Column("last_used_at", sa.DateTime(timezone=True)),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
)
# ------------------------------------------------------------ audit_log
op.create_table(
"audit_log",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column("ts", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
sa.Column("actor_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")),
sa.Column("action", sa.String(80), nullable=False),
sa.Column("resource_type", sa.String(80), nullable=False),
sa.Column("resource_id", sa.String(128)),
sa.Column("metadata_json", JSONB, nullable=False, server_default="{}"),
sa.Column("prev_hash", sa.String(64)),
sa.Column("row_hash", sa.String(64), nullable=False),
sa.Column("source_ip", sa.String(64)),
sa.Column("user_agent", sa.String(512)),
sa.Column("comment", sa.Text),
)
op.create_index("ix_audit_log_ts", "audit_log", ["ts"])
op.create_index("ix_audit_log_resource", "audit_log", ["resource_type", "resource_id"])
# ---------------------------------------------- NF-AUDIT role-level grants
# `mimic_audit_writer` (write-only) + `mimic_app` (read-only on audit_log).
# Idempotent: skip silently if roles are absent (dev/test boxes).
bind.exec_driver_sql(
"""
DO $do$
BEGIN
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'mimic_audit_writer') THEN
GRANT INSERT ON TABLE audit_log TO mimic_audit_writer;
REVOKE UPDATE, DELETE, TRUNCATE ON TABLE audit_log FROM mimic_audit_writer;
END IF;
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = current_user) THEN
REVOKE UPDATE, DELETE, TRUNCATE ON TABLE audit_log FROM PUBLIC;
END IF;
END
$do$;
"""
)
# ---------------------------------------------------------------- seed RBAC
op.bulk_insert(
sa.table(
"group",
sa.column("id", UUID(as_uuid=True)),
sa.column("name", sa.String),
sa.column("description", sa.String),
sa.column("created_at", sa.DateTime(timezone=True)),
sa.column("updated_at", sa.DateTime(timezone=True)),
),
[
{
"id": "11111111-0000-0000-0000-000000000001",
"name": "rt_operator",
"description": "Red team operator (per-engagement scope).",
},
{
"id": "11111111-0000-0000-0000-000000000002",
"name": "rt_lead",
"description": "Red team lead (full RT privileges).",
},
{
"id": "11111111-0000-0000-0000-000000000003",
"name": "soc_analyst",
"description": "SOC analyst (per-engagement, scoped via soc_session).",
},
],
)
def downgrade() -> None:
for table in (
"audit_log",
"soc_session",
"report",
"evidence",
"detection",
"run_step_cleanup",
"run_step",
"run",
"scenario_step",
"scenario",
"ttp_version",
"ttp",
"host",
"c2_credential",
"user_group",
"engagement_member",
"engagement",
"group_permission",
"group",
"permission",
"user",
):
op.drop_table(table)
for enum_t in (
EVIDENCE_STATUS,
DETECTION_SOURCE,
DETECTION_LEVEL,
CLEANUP_STATUS,
RUN_STEP_STATUS,
RUN_STATUS,
TTP_SOURCE,
PAYLOAD_TYPE,
HOST_STATUS,
C2_TYPE,
ENGAGEMENT_STATUS,
USER_TYPE,
):
enum_t.drop(op.get_bind(), checkfirst=True)

View File

@@ -0,0 +1,37 @@
"""SQLAlchemy 2 typed mapped classes for every Mimic aggregate."""
from mimic.db.models.audit import AuditLog
from mimic.db.models.detection import Detection, Evidence
from mimic.db.models.engagement import C2Credential, Engagement, EngagementMember
from mimic.db.models.host import Host
from mimic.db.models.permission import Group, GroupPermission, Permission, UserGroup
from mimic.db.models.report import Report
from mimic.db.models.run import Run, RunStep, RunStepCleanup
from mimic.db.models.scenario import Scenario, ScenarioStep
from mimic.db.models.soc_session import SocSession
from mimic.db.models.ttp import Ttp, TtpVersion
from mimic.db.models.user import User
__all__ = [
"AuditLog",
"C2Credential",
"Detection",
"Engagement",
"EngagementMember",
"Evidence",
"Group",
"GroupPermission",
"Host",
"Permission",
"Report",
"Run",
"RunStep",
"RunStepCleanup",
"Scenario",
"ScenarioStep",
"SocSession",
"Ttp",
"TtpVersion",
"User",
"UserGroup",
]

View File

@@ -0,0 +1,45 @@
"""Append-only audit log.
NF-AUDIT: enforced at SQL level by granting only INSERT to
`mimic_audit_writer` and only SELECT to `mimic_app`. The ORM never updates or
deletes rows on this table from application code. Hash chain (`prev_hash` /
`row_hash`) is wired here at the schema level so v2 can switch to a strict
WORM enforcement without a migration; sprint 0 fills the columns at insert.
"""
from __future__ import annotations
from datetime import datetime
from uuid import UUID
from sqlalchemy import JSON, DateTime, ForeignKey, String, Text, func
from sqlalchemy.orm import Mapped, mapped_column
from mimic.db.base import Base, UuidPkMixin
class AuditLog(UuidPkMixin, Base):
__tablename__ = "audit_log"
ts: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
nullable=False,
server_default=func.now(),
)
actor_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
action: Mapped[str] = mapped_column(String(80), nullable=False)
resource_type: Mapped[str] = mapped_column(String(80), nullable=False)
resource_id: Mapped[str | None] = mapped_column(String(128))
metadata_json: Mapped[dict] = mapped_column(JSON, nullable=False, default=dict)
prev_hash: Mapped[str | None] = mapped_column(String(64))
row_hash: Mapped[str] = mapped_column(String(64), nullable=False)
# SHA-256 of canonical (prev_hash || ts || actor_id || action ||
# resource_type || resource_id || metadata_json). Provides hash-chain
# integrity once verifier runs in v2.
source_ip: Mapped[str | None] = mapped_column(String(64))
user_agent: Mapped[str | None] = mapped_column(String(512))
comment: Mapped[str | None] = mapped_column(Text)

View File

@@ -0,0 +1,82 @@
"""Per-run-step detection (SOC) and offensive evidence (RT)."""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import (
JSON,
DateTime,
Enum,
ForeignKey,
Integer,
Text,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
from mimic.db.types import DetectionLevel, DetectionSource, EvidenceStatus
if TYPE_CHECKING:
from mimic.db.models.run import RunStep
from mimic.db.models.user import User
class Detection(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "detection"
run_step_id: Mapped[UUID] = mapped_column(
ForeignKey("run_step.id", ondelete="CASCADE"),
nullable=False,
)
soc_user_id: Mapped[UUID] = mapped_column(
ForeignKey("user.id", ondelete="RESTRICT"),
nullable=False,
)
level: Mapped[DetectionLevel] = mapped_column(
Enum(DetectionLevel, name="detection_level"),
nullable=False,
)
source: Mapped[DetectionSource] = mapped_column(
Enum(DetectionSource, name="detection_source"),
nullable=False,
)
latency_ms: Mapped[int | None] = mapped_column(Integer)
comment: Mapped[str | None] = mapped_column(Text)
recorded_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False
)
run_step: Mapped[RunStep] = relationship()
soc_user: Mapped[User] = relationship()
class Evidence(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "evidence"
run_step_id: Mapped[UUID] = mapped_column(
ForeignKey("run_step.id", ondelete="CASCADE"),
nullable=False,
)
rt_user_id: Mapped[UUID] = mapped_column(
ForeignKey("user.id", ondelete="RESTRICT"),
nullable=False,
)
status: Mapped[EvidenceStatus] = mapped_column(
Enum(EvidenceStatus, name="evidence_status"),
nullable=False,
)
artifacts_text: Mapped[str | None] = mapped_column(Text)
artifact_files_json: Mapped[list[dict]] = mapped_column(
JSON, default=list, nullable=False
)
# Each entry: {"name": str, "ref": str, "sha256": str, "size_bytes": int}
comment: Mapped[str | None] = mapped_column(Text)
recorded_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False
)
run_step: Mapped[RunStep] = relationship()
rt_user: Mapped[User] = relationship()

View File

@@ -0,0 +1,106 @@
"""Engagement aggregate: tenancy container + C2 credentials."""
from __future__ import annotations
from datetime import date, datetime
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import Date, DateTime, Enum, ForeignKey, Integer, LargeBinary, String, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
from mimic.db.types import C2Type, EngagementStatus
if TYPE_CHECKING:
from mimic.db.models.host import Host
from mimic.db.models.scenario import Scenario
from mimic.db.models.user import User
class Engagement(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "engagement"
client_name: Mapped[str] = mapped_column(String(255), nullable=False)
description: Mapped[str | None] = mapped_column(String(1024))
status: Mapped[EngagementStatus] = mapped_column(
Enum(EngagementStatus, name="engagement_status"),
default=EngagementStatus.DRAFT,
nullable=False,
)
start_date: Mapped[date | None] = mapped_column(Date)
end_date: Mapped[date | None] = mapped_column(Date)
c2_type: Mapped[C2Type] = mapped_column(
Enum(C2Type, name="c2_type"),
default=C2Type.MYTHIC,
nullable=False,
)
created_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
hosts: Mapped[list[Host]] = relationship(
back_populates="engagement",
cascade="all, delete-orphan",
)
scenarios: Mapped[list[Scenario]] = relationship(
back_populates="engagement",
cascade="all, delete-orphan",
)
members: Mapped[list[EngagementMember]] = relationship(
back_populates="engagement",
cascade="all, delete-orphan",
)
c2_credentials: Mapped[list[C2Credential]] = relationship(
back_populates="engagement",
cascade="all, delete-orphan",
)
class EngagementMember(Base):
__tablename__ = "engagement_member"
engagement_id: Mapped[UUID] = mapped_column(
ForeignKey("engagement.id", ondelete="CASCADE"),
primary_key=True,
)
user_id: Mapped[UUID] = mapped_column(
ForeignKey("user.id", ondelete="CASCADE"),
primary_key=True,
)
role: Mapped[str] = mapped_column(String(40), nullable=False)
added_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
nullable=False,
)
engagement: Mapped[Engagement] = relationship(back_populates="members")
user: Mapped[User] = relationship()
class C2Credential(UuidPkMixin, TimestampsMixin, Base):
"""Per-engagement encrypted C2 credentials.
Decision D-004: dedicated table, Fernet-encrypted blob, versioned, rotation
via insert + retire. Active row = `retired_at IS NULL` with the highest
`version` for `(engagement_id, c2_type)`.
"""
__tablename__ = "c2_credential"
engagement_id: Mapped[UUID] = mapped_column(
ForeignKey("engagement.id", ondelete="CASCADE"),
nullable=False,
)
c2_type: Mapped[C2Type] = mapped_column(
Enum(C2Type, name="c2_type", create_type=False),
nullable=False,
)
config_fernet: Mapped[bytes] = mapped_column(LargeBinary, nullable=False)
version: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
retired_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
engagement: Mapped[Engagement] = relationship(back_populates="c2_credentials")

View File

@@ -0,0 +1,41 @@
"""Host inventory (per-engagement target machines)."""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import DateTime, Enum, ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
from mimic.db.types import C2Type, HostStatus
if TYPE_CHECKING:
from mimic.db.models.engagement import Engagement
class Host(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "host"
engagement_id: Mapped[UUID] = mapped_column(
ForeignKey("engagement.id", ondelete="CASCADE"),
nullable=False,
)
hostname: Mapped[str] = mapped_column(String(255), nullable=False)
ip: Mapped[str | None] = mapped_column(String(64))
os: Mapped[str | None] = mapped_column(String(128))
c2_session_id: Mapped[str | None] = mapped_column(String(128))
c2_type: Mapped[C2Type] = mapped_column(
Enum(C2Type, name="c2_type", create_type=False),
nullable=False,
)
status: Mapped[HostStatus] = mapped_column(
Enum(HostStatus, name="host_status"),
default=HostStatus.UNKNOWN,
nullable=False,
)
last_seen: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
engagement: Mapped[Engagement] = relationship(back_populates="hosts")

View File

@@ -0,0 +1,87 @@
"""Group-based RBAC tables.
Decision D-003 (spec-decisions): group-based from day one so OIDC will map
claims to existing groups in v2 without touching application code.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import ForeignKey, PrimaryKeyConstraint, String, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
if TYPE_CHECKING:
from mimic.db.models.engagement import Engagement
from mimic.db.models.user import User
class Permission(UuidPkMixin, Base):
__tablename__ = "permission"
__table_args__ = (UniqueConstraint("code", name="uq_permission_code"),)
code: Mapped[str] = mapped_column(String(80), nullable=False)
description: Mapped[str | None] = mapped_column(String(255))
class Group(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "group"
__table_args__ = (UniqueConstraint("name", name="uq_group_name"),)
name: Mapped[str] = mapped_column(String(80), nullable=False)
description: Mapped[str | None] = mapped_column(String(255))
permission_links: Mapped[list[GroupPermission]] = relationship(
back_populates="group",
cascade="all, delete-orphan",
)
user_links: Mapped[list[UserGroup]] = relationship(
back_populates="group",
cascade="all, delete-orphan",
)
class GroupPermission(Base):
__tablename__ = "group_permission"
__table_args__ = (
PrimaryKeyConstraint("group_id", "permission_id", name="pk_group_permission"),
)
group_id: Mapped[UUID] = mapped_column(
ForeignKey("group.id", ondelete="CASCADE"), nullable=False
)
permission_id: Mapped[UUID] = mapped_column(
ForeignKey("permission.id", ondelete="CASCADE"), nullable=False
)
group: Mapped[Group] = relationship(back_populates="permission_links")
permission: Mapped[Permission] = relationship()
class UserGroup(Base):
__tablename__ = "user_group"
__table_args__ = (
PrimaryKeyConstraint(
"user_id", "group_id", "engagement_id", name="pk_user_group"
),
)
user_id: Mapped[UUID] = mapped_column(
ForeignKey("user.id", ondelete="CASCADE"), nullable=False
)
group_id: Mapped[UUID] = mapped_column(
ForeignKey("group.id", ondelete="CASCADE"), nullable=False
)
engagement_id: Mapped[UUID | None] = mapped_column(
ForeignKey("engagement.id", ondelete="CASCADE"),
nullable=True,
)
# Global membership (e.g. rt_lead enterprise-wide) when engagement_id IS NULL.
# Per-engagement membership otherwise.
user: Mapped[User] = relationship(back_populates="group_links")
group: Mapped[Group] = relationship(back_populates="user_links")
engagement: Mapped[Engagement | None] = relationship()

View File

@@ -0,0 +1,48 @@
"""Mission report (PDF / JSON / Markdown bundle)."""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import (
JSON,
DateTime,
ForeignKey,
Integer,
String,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
if TYPE_CHECKING:
from mimic.db.models.engagement import Engagement
class Report(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "report"
engagement_id: Mapped[UUID] = mapped_column(
ForeignKey("engagement.id", ondelete="CASCADE"),
nullable=False,
)
version: Mapped[int] = mapped_column(Integer, default=1, nullable=False)
content_json: Mapped[dict] = mapped_column(JSON, nullable=False, default=dict)
content_sha256: Mapped[str] = mapped_column(String(64), nullable=False)
# SHA-256 of canonical JSON. Identical hash referenced in PDF footer, JSON
# export and Markdown export (spec H19 / F9 / F14).
pdf_path: Mapped[str | None] = mapped_column(String(512))
md_path: Mapped[str | None] = mapped_column(String(512))
generated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False
)
generated_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
engagement: Mapped[Engagement] = relationship()

View File

@@ -0,0 +1,118 @@
"""Run aggregate: scenario execution + per-step state + cleanup."""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import (
JSON,
DateTime,
Enum,
ForeignKey,
Integer,
String,
Text,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
from mimic.db.types import CleanupStatus, RunStatus, RunStepStatus
if TYPE_CHECKING:
from mimic.db.models.scenario import Scenario, ScenarioStep
class Run(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "run"
scenario_id: Mapped[UUID] = mapped_column(
ForeignKey("scenario.id", ondelete="RESTRICT"),
nullable=False,
)
status: Mapped[RunStatus] = mapped_column(
Enum(RunStatus, name="run_status"),
default=RunStatus.QUEUED,
nullable=False,
)
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
ended_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
started_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
snapshot_json: Mapped[dict] = mapped_column(JSON, nullable=False, default=dict)
# Full self-contained snapshot of scenario + steps + resolved TTPs.
# Source of truth for replay (spec H32).
scenario: Mapped[Scenario] = relationship(back_populates="runs")
steps: Mapped[list[RunStep]] = relationship(
back_populates="run",
cascade="all, delete-orphan",
order_by="RunStep.order_idx",
)
class RunStep(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "run_step"
run_id: Mapped[UUID] = mapped_column(
ForeignKey("run.id", ondelete="CASCADE"),
nullable=False,
)
scenario_step_id: Mapped[UUID | None] = mapped_column(
ForeignKey("scenario_step.id", ondelete="SET NULL")
)
order_idx: Mapped[int] = mapped_column(Integer, nullable=False)
status: Mapped[RunStepStatus] = mapped_column(
Enum(RunStepStatus, name="run_step_status"),
default=RunStepStatus.QUEUED,
nullable=False,
)
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
ended_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
c2_task_id: Mapped[str | None] = mapped_column(String(128))
output_text: Mapped[str | None] = mapped_column(Text)
output_blob_ref: Mapped[str | None] = mapped_column(String(512))
exit_code: Mapped[int | None] = mapped_column(Integer)
resolved_payload_text: Mapped[str | None] = mapped_column(Text)
# The fully-resolved payload (post-Jinja) actually sent to the C2. Useful
# for audit / report. Marker (`# MIMIC-RUN:<id>`) is added here.
run: Mapped[Run] = relationship(back_populates="steps")
scenario_step: Mapped[ScenarioStep | None] = relationship()
cleanup: Mapped[RunStepCleanup | None] = relationship(
back_populates="run_step",
cascade="all, delete-orphan",
uselist=False,
)
class RunStepCleanup(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "run_step_cleanup"
run_step_id: Mapped[UUID] = mapped_column(
ForeignKey("run_step.id", ondelete="CASCADE"),
nullable=False,
unique=True,
)
status: Mapped[CleanupStatus] = mapped_column(
Enum(CleanupStatus, name="cleanup_status"),
default=CleanupStatus.PENDING,
nullable=False,
)
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
ended_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
resolved_command_text: Mapped[str | None] = mapped_column(Text)
output: Mapped[str | None] = mapped_column(Text)
executed_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
run_step: Mapped[RunStep] = relationship(back_populates="cleanup")

View File

@@ -0,0 +1,86 @@
"""Scenario aggregate: ordered list of TTP steps.
Spec §F3: `scenario.c2_type` is the source of truth for the run. Verified at
run start that every referenced host matches.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import (
JSON,
Enum,
ForeignKey,
Integer,
String,
Text,
UniqueConstraint,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
from mimic.db.types import C2Type
if TYPE_CHECKING:
from mimic.db.models.engagement import Engagement
from mimic.db.models.host import Host
from mimic.db.models.run import Run
from mimic.db.models.ttp import Ttp
class Scenario(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "scenario"
engagement_id: Mapped[UUID] = mapped_column(
ForeignKey("engagement.id", ondelete="CASCADE"),
nullable=False,
)
name: Mapped[str] = mapped_column(String(255), nullable=False)
description: Mapped[str | None] = mapped_column(Text)
version: Mapped[int] = mapped_column(Integer, default=1, nullable=False)
c2_type: Mapped[C2Type] = mapped_column(
Enum(C2Type, name="c2_type", create_type=False),
nullable=False,
)
created_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
engagement: Mapped[Engagement] = relationship(back_populates="scenarios")
steps: Mapped[list[ScenarioStep]] = relationship(
back_populates="scenario",
cascade="all, delete-orphan",
order_by="ScenarioStep.order_idx",
)
runs: Mapped[list[Run]] = relationship(back_populates="scenario")
class ScenarioStep(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "scenario_step"
__table_args__ = (
UniqueConstraint("scenario_id", "order_idx", name="uq_scenario_step_order_idx"),
)
scenario_id: Mapped[UUID] = mapped_column(
ForeignKey("scenario.id", ondelete="CASCADE"),
nullable=False,
)
order_idx: Mapped[int] = mapped_column(Integer, nullable=False)
ttp_id: Mapped[UUID] = mapped_column(
ForeignKey("ttp.id", ondelete="RESTRICT"),
nullable=False,
)
host_id: Mapped[UUID] = mapped_column(
ForeignKey("host.id", ondelete="RESTRICT"),
nullable=False,
)
params_override_json: Mapped[dict] = mapped_column(JSON, default=dict, nullable=False)
delay_after_ms: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
scenario: Mapped[Scenario] = relationship(back_populates="steps")
ttp: Mapped[Ttp] = relationship()
host: Mapped[Host] = relationship()

View File

@@ -0,0 +1,48 @@
"""SOC analyst sessions (bcrypt-hashed opaque tokens).
Decision D-006: bcrypt hash stored; the clear token is generated server-side at
session creation, returned **once** in the API response and delivered out-of-band.
Never re-displayable.
"""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import DateTime, ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
if TYPE_CHECKING:
from mimic.db.models.engagement import Engagement
from mimic.db.models.user import User
class SocSession(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "soc_session"
user_id: Mapped[UUID] = mapped_column(
ForeignKey("user.id", ondelete="CASCADE"),
nullable=False,
)
engagement_id: Mapped[UUID] = mapped_column(
ForeignKey("engagement.id", ondelete="CASCADE"),
nullable=False,
)
token_hash: Mapped[str] = mapped_column(String(255), nullable=False, unique=True)
# bcrypt hash. Plain token returned once at creation.
expires_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False
)
revoked_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
last_ip: Mapped[str | None] = mapped_column(String(64))
last_user_agent: Mapped[str | None] = mapped_column(String(512))
last_used_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
user: Mapped[User] = relationship(back_populates="soc_sessions")
engagement: Mapped[Engagement] = relationship()

View File

@@ -0,0 +1,105 @@
"""TTP library + immutable version snapshots.
Note (D-T-ttp-version): spec H32 originally said no `ttp_version` table (snapshot
lives on `run.snapshot_json`). Sprint 0 reintroduces a `ttp_version` table for
clean traceability across runs and to honor the kickoff data-model directive
from the team-lead. `run.snapshot_json` remains the source of truth for replay,
but each promotion / edit produces an immutable `ttp_version` row that's easier
to diff and reference from imports / audit log.
"""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import (
JSON,
Boolean,
DateTime,
Enum,
ForeignKey,
Integer,
String,
Text,
UniqueConstraint,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
from mimic.db.types import PayloadType, TtpSource
if TYPE_CHECKING:
pass
class Ttp(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "ttp"
name: Mapped[str] = mapped_column(String(255), nullable=False)
description: Mapped[str | None] = mapped_column(Text)
mitre_technique: Mapped[str] = mapped_column(String(16), nullable=False)
mitre_subtechnique: Mapped[str | None] = mapped_column(String(16))
payload_type: Mapped[PayloadType] = mapped_column(
Enum(PayloadType, name="payload_type"),
nullable=False,
)
payload_template: Mapped[str] = mapped_column(Text, nullable=False, default="")
params_schema_json: Mapped[dict | None] = mapped_column(JSON)
opsec_notes: Mapped[str | None] = mapped_column(Text)
cleanup_command: Mapped[str | None] = mapped_column(Text)
is_stealth_variant: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
source: Mapped[TtpSource] = mapped_column(
Enum(TtpSource, name="ttp_source"),
default=TtpSource.CUSTOM,
nullable=False,
)
tags: Mapped[list[str]] = mapped_column(JSON, default=list, nullable=False)
current_version: Mapped[int] = mapped_column(Integer, default=1, nullable=False)
is_published: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
# is_published = promoted to the library (lead RT only — F11).
created_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
versions: Mapped[list[TtpVersion]] = relationship(
back_populates="ttp",
cascade="all, delete-orphan",
order_by="TtpVersion.version",
)
class TtpVersion(UuidPkMixin, Base):
"""Immutable snapshot of a TTP at the moment it was published or used.
Used by importers / audit / report builder. `run.snapshot_json` still
embeds a full self-contained copy for replay independence.
"""
__tablename__ = "ttp_version"
__table_args__ = (
UniqueConstraint("ttp_id", "version", name="uq_ttp_version_ttp_id_version"),
)
ttp_id: Mapped[UUID] = mapped_column(
ForeignKey("ttp.id", ondelete="CASCADE"),
nullable=False,
)
version: Mapped[int] = mapped_column(Integer, nullable=False)
snapshot_json: Mapped[dict] = mapped_column(JSON, nullable=False)
content_sha256: Mapped[str] = mapped_column(String(64), nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False
)
created_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
ttp: Mapped[Ttp] = relationship(back_populates="versions")

View File

@@ -0,0 +1,50 @@
"""User accounts (RT operators, RT leads, SOC analysts)."""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import DateTime, Enum, String, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
from mimic.db.types import UserType
if TYPE_CHECKING:
from mimic.db.models.permission import UserGroup
from mimic.db.models.soc_session import SocSession
class User(UuidPkMixin, TimestampsMixin, Base):
__tablename__ = "user"
__table_args__ = (UniqueConstraint("email", name="uq_user_email"),)
type: Mapped[UserType] = mapped_column(Enum(UserType, name="user_type"), nullable=False)
email: Mapped[str] = mapped_column(String(255), nullable=False)
display_name: Mapped[str | None] = mapped_column(String(120))
keycloak_sub: Mapped[str | None] = mapped_column(String(255), unique=True)
local_password_hash: Mapped[str | None] = mapped_column(String(255))
disabled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
last_login_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
group_links: Mapped[list[UserGroup]] = relationship(
back_populates="user",
cascade="all, delete-orphan",
)
soc_sessions: Mapped[list[SocSession]] = relationship(
back_populates="user",
cascade="all, delete-orphan",
)
@property
def is_active(self) -> bool:
return self.disabled_at is None
@property
def created_by_id(self) -> UUID | None:
# Placeholder for future audit linkage; kept for symmetry with spec §8.
return None

View File

@@ -0,0 +1,110 @@
"""Reusable column / enum definitions."""
from __future__ import annotations
import enum
class C2Type(enum.StrEnum):
"""C2 backend kind. Source of truth for connector resolution.
Spec §F3 / §F4: `scenario.c2_type` is the authoritative value at run time;
every host used in a scenario must match it.
"""
MYTHIC = "mythic"
HOME = "home"
class PayloadType(enum.StrEnum):
"""Internal neutral payload kind (spec §7 table).
The C2Connector resolves the actual native command on a per-c2 basis.
`UnsupportedPayloadType` is raised when no mapping exists.
"""
CMD = "cmd"
POWERSHELL = "powershell"
BOF = "bof"
DOTNET_ASSEMBLY = "dotnet_assembly"
DOTNET_EXE = "dotnet_exe"
PE_EXE = "pe_exe"
PE_DLL = "pe_dll"
SHELLCODE = "shellcode"
PYTHON = "python"
VBS = "vbs"
WMI_QUERY = "wmi_query"
REGISTRY = "registry"
SCRIPT_FILE = "script_file"
class UserType(enum.StrEnum):
"""Application user kind."""
RT_OPERATOR = "rt_operator"
RT_LEAD = "rt_lead"
SOC_ANALYST = "soc_analyst"
class EngagementStatus(enum.StrEnum):
DRAFT = "draft"
ACTIVE = "active"
CLOSED = "closed"
ARCHIVED = "archived"
class HostStatus(enum.StrEnum):
UNKNOWN = "unknown"
ALIVE = "alive"
DEAD = "dead"
class TtpSource(enum.StrEnum):
CUSTOM = "custom"
IMPORT_ATR = "import_atr"
IMPORT_MISSION = "import_mission"
class RunStatus(enum.StrEnum):
QUEUED = "queued"
RUNNING = "running"
PAUSED = "paused"
COMPLETED = "completed"
FAILED = "failed"
ABORTED = "aborted"
class RunStepStatus(enum.StrEnum):
QUEUED = "queued"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
CLEANUP_FAILED = "cleanup_failed"
class CleanupStatus(enum.StrEnum):
PENDING = "pending"
SUCCESS = "success"
FAILED = "failed"
PARTIAL = "partial"
class DetectionLevel(enum.StrEnum):
DETECTED = "detected"
PARTIAL = "partial"
NOT_DETECTED = "not_detected"
class DetectionSource(enum.StrEnum):
NDR = "ndr"
EDR = "edr"
SIEM = "siem"
MANUAL = "manual"
OTHER = "other"
class EvidenceStatus(enum.StrEnum):
SUCCESS = "success"
FAILURE = "failure"
PARTIAL = "partial"

View File

@@ -0,0 +1,15 @@
"""Singletons wired into the Flask app at create-time."""
from __future__ import annotations
from flask_login import LoginManager
from flask_migrate import Migrate
from flask_socketio import SocketIO
from flask_sqlalchemy import SQLAlchemy
from mimic.db.base import Base
db = SQLAlchemy(model_class=Base)
migrate = Migrate()
login_manager = LoginManager()
socketio = SocketIO(async_mode="gevent", cors_allowed_origins=[])

View File

@@ -0,0 +1,27 @@
"""Structured JSON logging (NF-observability)."""
from __future__ import annotations
import logging
import sys
from pythonjsonlogger.jsonlogger import JsonFormatter
def configure_logging(level: str = "INFO", *, as_json: bool = True) -> None:
"""Configure the root logger once at app start."""
handler = logging.StreamHandler(sys.stdout)
if as_json:
formatter: logging.Formatter = JsonFormatter(
"%(asctime)s %(levelname)s %(name)s %(message)s"
)
else:
formatter = logging.Formatter(
"%(asctime)s %(levelname)-8s %(name)s: %(message)s"
)
handler.setFormatter(formatter)
root = logging.getLogger()
root.handlers.clear()
root.addHandler(handler)
root.setLevel(level.upper())