87 lines
2.7 KiB
Python
87 lines
2.7 KiB
Python
|
|
"""Scenario aggregate: ordered list of TTP steps.
|
||
|
|
|
||
|
|
Spec §F3: `scenario.c2_type` is the source of truth for the run. Verified at
|
||
|
|
run start that every referenced host matches.
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from typing import TYPE_CHECKING
|
||
|
|
from uuid import UUID
|
||
|
|
|
||
|
|
from sqlalchemy import (
|
||
|
|
JSON,
|
||
|
|
Enum,
|
||
|
|
ForeignKey,
|
||
|
|
Integer,
|
||
|
|
String,
|
||
|
|
Text,
|
||
|
|
UniqueConstraint,
|
||
|
|
)
|
||
|
|
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
||
|
|
|
||
|
|
from mimic.db.base import Base, TimestampsMixin, UuidPkMixin
|
||
|
|
from mimic.db.types import C2Type
|
||
|
|
|
||
|
|
if TYPE_CHECKING:
|
||
|
|
from mimic.db.models.engagement import Engagement
|
||
|
|
from mimic.db.models.host import Host
|
||
|
|
from mimic.db.models.run import Run
|
||
|
|
from mimic.db.models.ttp import Ttp
|
||
|
|
|
||
|
|
|
||
|
|
class Scenario(UuidPkMixin, TimestampsMixin, Base):
|
||
|
|
__tablename__ = "scenario"
|
||
|
|
|
||
|
|
engagement_id: Mapped[UUID] = mapped_column(
|
||
|
|
ForeignKey("engagement.id", ondelete="CASCADE"),
|
||
|
|
nullable=False,
|
||
|
|
)
|
||
|
|
name: Mapped[str] = mapped_column(String(255), nullable=False)
|
||
|
|
description: Mapped[str | None] = mapped_column(Text)
|
||
|
|
version: Mapped[int] = mapped_column(Integer, default=1, nullable=False)
|
||
|
|
|
||
|
|
c2_type: Mapped[C2Type] = mapped_column(
|
||
|
|
Enum(C2Type, name="c2_type", create_type=False),
|
||
|
|
nullable=False,
|
||
|
|
)
|
||
|
|
created_by_id: Mapped[UUID | None] = mapped_column(
|
||
|
|
ForeignKey("user.id", ondelete="SET NULL")
|
||
|
|
)
|
||
|
|
|
||
|
|
engagement: Mapped[Engagement] = relationship(back_populates="scenarios")
|
||
|
|
steps: Mapped[list[ScenarioStep]] = relationship(
|
||
|
|
back_populates="scenario",
|
||
|
|
cascade="all, delete-orphan",
|
||
|
|
order_by="ScenarioStep.order_idx",
|
||
|
|
)
|
||
|
|
runs: Mapped[list[Run]] = relationship(back_populates="scenario")
|
||
|
|
|
||
|
|
|
||
|
|
class ScenarioStep(UuidPkMixin, TimestampsMixin, Base):
|
||
|
|
__tablename__ = "scenario_step"
|
||
|
|
__table_args__ = (
|
||
|
|
UniqueConstraint("scenario_id", "order_idx", name="uq_scenario_step_order_idx"),
|
||
|
|
)
|
||
|
|
|
||
|
|
scenario_id: Mapped[UUID] = mapped_column(
|
||
|
|
ForeignKey("scenario.id", ondelete="CASCADE"),
|
||
|
|
nullable=False,
|
||
|
|
)
|
||
|
|
order_idx: Mapped[int] = mapped_column(Integer, nullable=False)
|
||
|
|
|
||
|
|
ttp_id: Mapped[UUID] = mapped_column(
|
||
|
|
ForeignKey("ttp.id", ondelete="RESTRICT"),
|
||
|
|
nullable=False,
|
||
|
|
)
|
||
|
|
host_id: Mapped[UUID] = mapped_column(
|
||
|
|
ForeignKey("host.id", ondelete="RESTRICT"),
|
||
|
|
nullable=False,
|
||
|
|
)
|
||
|
|
params_override_json: Mapped[dict] = mapped_column(JSON, default=dict, nullable=False)
|
||
|
|
delay_after_ms: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
|
||
|
|
|
||
|
|
scenario: Mapped[Scenario] = relationship(back_populates="steps")
|
||
|
|
ttp: Mapped[Ttp] = relationship()
|
||
|
|
host: Mapped[Host] = relationship()
|