User feedback after the M7 ship: blue team's Excel workflow had 5 extra
fields we didn't capture. Per-test page also doesn't match their
workflow — they need a tabular view, one table per scenario.
Spec
- tasks/spec.md amended (`revised: 2026-05-15`): §4 in-scope, §F6, §8
model bullet. §F6 now pins the column matrix, single-row-edit
semantics, Esc-cancel, blur-confirm, and reconciles detection_level
as a pill inside the Commentaires cell (no 8th column).
- tasks/todo.md M7 section grew an "Amendement 2026-05-15" sub-block
tracking backend ☑ and frontend ☐.
Backend
- Migration c2a8f4b1d6e9: 5 nullable columns on mission_tests
(blue_log_source, blue_siem_logs, blue_incident_at,
blue_incident_number, blue_incident_recipient_email).
- _BLUE_FIELDS extended; update_mission_test_fields propagates each
field; MissionTestDetailView + MissionTestView (the nested view in
GET /missions/{id}) surface every annotation field, plus
last_actor_*, updated_at, detection_level_key — O(1) batch lookup
for detection-level keys and last-actor users keeps it scalable.
- UpdateMissionTestPayload accepts each field with length caps
(120/200_000/120/255).
Reviewer follow-ups applied
- blue_incident_at + executed_at now reject naïve datetimes
(_ensure_aware_datetime) — Postgres would otherwise interpret
them in the session TZ, defeating the M7 verbatim-time contract.
- blue_incident_recipient_email goes through a permissive RFC-shape
regex (_validate_email_shape) so internal/lab TLDs like .local
/ .corp / .test pass — Pydantic EmailStr is too strict (lessons.md
M2 trap).
- Project-wide: switched `e.errors()` to
`e.errors(include_context=False, include_url=False)` because the
AfterValidator-raised ValueError lands in ctx and Flask can't
serialize it.
Tests
- 5 new pytest cases: blue user writes the 5 new fields, red user is
individually 403'd on each, round-trip via GET, naïve datetime
rejected, email shape validated (.local accepted, bad shape 400).
- 138 pytest green.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
344 lines
13 KiB
Python
344 lines
13 KiB
Python
"""Missions and snapshots.
|
|
|
|
A `Mission` references members and a tree of snapshot rows:
|
|
mission ─< mission_scenarios ─< mission_tests ─< (red/blue annotations)
|
|
|
|
Snapshots copy template fields verbatim so editing a template doesn't drift
|
|
already-running missions. `source_*_template_id` keep a soft pointer for
|
|
analytics, but the source rows can be soft-deleted without breaking the mission.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
from datetime import date, datetime
|
|
from typing import Any
|
|
|
|
from sqlalchemy import (
|
|
ARRAY,
|
|
CheckConstraint,
|
|
Date,
|
|
DateTime,
|
|
ForeignKey,
|
|
Index,
|
|
Integer,
|
|
String,
|
|
Text,
|
|
UniqueConstraint,
|
|
Uuid,
|
|
)
|
|
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
|
|
|
from app.db.base import Base
|
|
from app.db.mixins import SoftDeleteMixin, TimestampMixin, UuidPkMixin
|
|
|
|
# DateTime is no longer needed since MissionMember now uses TimestampMixin.
|
|
# The remaining DateTime usages in MissionTest (executed_at) keep the import below.
|
|
from app.db.types import (
|
|
MISSION_ROLE_HINTS,
|
|
MISSION_STATUSES,
|
|
MISSION_TEST_STATES,
|
|
MISSION_VISIBILITY_MODES,
|
|
MITRE_KINDS,
|
|
OPSEC_LEVELS,
|
|
)
|
|
|
|
# `mission_test_mitre_tags` deliberately denormalises the MITRE labels so a
|
|
# mission's tags survive a MITRE re-sync that drops the original entry. The
|
|
# FK columns were removed in favour of frozen `mitre_external_id` + `mitre_name`
|
|
# snapshots — see spec §11 ("snapshot vs reference" risk).
|
|
|
|
|
|
class Mission(Base, UuidPkMixin, TimestampMixin, SoftDeleteMixin):
|
|
__tablename__ = "missions"
|
|
|
|
name: Mapped[str] = mapped_column(String(255), nullable=False)
|
|
client_target: Mapped[str | None] = mapped_column(String(255), nullable=True)
|
|
date_start: Mapped[date | None] = mapped_column(Date, nullable=True)
|
|
date_end: Mapped[date | None] = mapped_column(Date, nullable=True)
|
|
status: Mapped[str] = mapped_column(String(16), default="draft", nullable=False)
|
|
description_md: Mapped[str | None] = mapped_column(Text, nullable=True)
|
|
visibility_mode: Mapped[str] = mapped_column(
|
|
String(16), default="whitebox", nullable=False
|
|
)
|
|
|
|
members: Mapped[list["MissionMember"]] = relationship(
|
|
back_populates="mission",
|
|
cascade="all, delete-orphan",
|
|
)
|
|
scenarios: Mapped[list["MissionScenario"]] = relationship(
|
|
back_populates="mission",
|
|
cascade="all, delete-orphan",
|
|
order_by="MissionScenario.position",
|
|
)
|
|
categories: Mapped[list["MissionCategory"]] = relationship(
|
|
back_populates="mission",
|
|
cascade="all, delete-orphan",
|
|
order_by="MissionCategory.position",
|
|
)
|
|
|
|
__table_args__ = (
|
|
CheckConstraint(
|
|
f"status IN ({', '.join(repr(v) for v in MISSION_STATUSES)})",
|
|
name="status_valid",
|
|
),
|
|
CheckConstraint(
|
|
f"visibility_mode IN ({', '.join(repr(v) for v in MISSION_VISIBILITY_MODES)})",
|
|
name="visibility_mode_valid",
|
|
),
|
|
Index("ix_missions_active", "deleted_at", postgresql_where="deleted_at IS NULL"),
|
|
Index("ix_missions_status", "status"),
|
|
)
|
|
|
|
|
|
class MissionMember(Base, TimestampMixin):
|
|
"""A user's membership in a mission with a hint about their team side."""
|
|
|
|
__tablename__ = "mission_members"
|
|
|
|
mission_id: Mapped[uuid.UUID] = mapped_column(
|
|
Uuid(as_uuid=True),
|
|
ForeignKey("missions.id", ondelete="CASCADE"),
|
|
primary_key=True,
|
|
)
|
|
user_id: Mapped[uuid.UUID] = mapped_column(
|
|
Uuid(as_uuid=True),
|
|
ForeignKey("users.id", ondelete="CASCADE"),
|
|
primary_key=True,
|
|
)
|
|
role_hint: Mapped[str] = mapped_column(String(8), nullable=False)
|
|
|
|
mission: Mapped[Mission] = relationship(back_populates="members")
|
|
|
|
__table_args__ = (
|
|
CheckConstraint(
|
|
f"role_hint IN ({', '.join(repr(v) for v in MISSION_ROLE_HINTS)})",
|
|
name="role_hint_valid",
|
|
),
|
|
Index("ix_mission_members_user", "user_id"),
|
|
)
|
|
|
|
|
|
class MissionScenario(Base, UuidPkMixin, TimestampMixin, SoftDeleteMixin):
|
|
"""Snapshot of a `scenario_template` instantiated within a mission."""
|
|
|
|
__tablename__ = "mission_scenarios"
|
|
|
|
mission_id: Mapped[uuid.UUID] = mapped_column(
|
|
Uuid(as_uuid=True),
|
|
ForeignKey("missions.id", ondelete="CASCADE"),
|
|
nullable=False,
|
|
)
|
|
source_scenario_template_id: Mapped[uuid.UUID | None] = mapped_column(
|
|
Uuid(as_uuid=True),
|
|
ForeignKey("scenario_templates.id", ondelete="SET NULL"),
|
|
nullable=True,
|
|
)
|
|
snapshot_name: Mapped[str] = mapped_column(String(255), nullable=False)
|
|
snapshot_description: Mapped[str | None] = mapped_column(Text, nullable=True)
|
|
position: Mapped[int] = mapped_column(Integer, nullable=False)
|
|
|
|
mission: Mapped[Mission] = relationship(back_populates="scenarios")
|
|
tests: Mapped[list["MissionTest"]] = relationship(
|
|
back_populates="scenario",
|
|
cascade="all, delete-orphan",
|
|
order_by="MissionTest.position",
|
|
)
|
|
|
|
__table_args__ = (
|
|
UniqueConstraint(
|
|
"mission_id", "position", name="uq_mission_scenarios_position"
|
|
),
|
|
Index("ix_mission_scenarios_mission", "mission_id"),
|
|
Index(
|
|
"ix_mission_scenarios_active",
|
|
"deleted_at",
|
|
postgresql_where="deleted_at IS NULL",
|
|
),
|
|
)
|
|
|
|
|
|
class MissionTest(Base, UuidPkMixin, TimestampMixin, SoftDeleteMixin):
|
|
"""Snapshot of a `test_template` + execution state + red/blue annotations."""
|
|
|
|
__tablename__ = "mission_tests"
|
|
|
|
scenario_id: Mapped[uuid.UUID] = mapped_column(
|
|
Uuid(as_uuid=True),
|
|
ForeignKey("mission_scenarios.id", ondelete="CASCADE"),
|
|
nullable=False,
|
|
)
|
|
source_test_template_id: Mapped[uuid.UUID | None] = mapped_column(
|
|
Uuid(as_uuid=True),
|
|
ForeignKey("test_templates.id", ondelete="SET NULL"),
|
|
nullable=True,
|
|
)
|
|
position: Mapped[int] = mapped_column(Integer, nullable=False)
|
|
|
|
# --- Snapshot of the template (immutable after creation) ---
|
|
snapshot_name: Mapped[str] = mapped_column(String(255), nullable=False)
|
|
snapshot_description: Mapped[str | None] = mapped_column(Text, nullable=True)
|
|
snapshot_objective: Mapped[str | None] = mapped_column(Text, nullable=True)
|
|
snapshot_procedure_md: Mapped[str | None] = mapped_column(Text, nullable=True)
|
|
snapshot_prerequisites_md: Mapped[str | None] = mapped_column(Text, nullable=True)
|
|
snapshot_expected_red_md: Mapped[str | None] = mapped_column(Text, nullable=True)
|
|
snapshot_expected_blue_md: Mapped[str | None] = mapped_column(Text, nullable=True)
|
|
snapshot_opsec_level: Mapped[str] = mapped_column(
|
|
String(8), default="medium", nullable=False
|
|
)
|
|
snapshot_tags: Mapped[list[str]] = mapped_column(
|
|
ARRAY(String(64)), nullable=False, server_default="{}"
|
|
)
|
|
snapshot_expected_iocs: Mapped[list[str]] = mapped_column(
|
|
ARRAY(String(255)), nullable=False, server_default="{}"
|
|
)
|
|
|
|
# --- Execution state ---
|
|
state: Mapped[str] = mapped_column(String(24), default="pending", nullable=False)
|
|
executed_at: Mapped[datetime | None] = mapped_column(
|
|
DateTime(timezone=True), nullable=True
|
|
)
|
|
executed_at_overridden: Mapped[bool] = mapped_column(default=False, nullable=False)
|
|
|
|
# --- Red side (text-only per spec §4) ---
|
|
red_command: Mapped[str | None] = mapped_column(Text, nullable=True)
|
|
red_output: Mapped[str | None] = mapped_column(Text, nullable=True)
|
|
red_comment_md: Mapped[str | None] = mapped_column(Text, nullable=True)
|
|
|
|
# --- Blue side ---
|
|
blue_comment_md: Mapped[str | None] = mapped_column(Text, nullable=True)
|
|
detection_level_id: Mapped[uuid.UUID | None] = mapped_column(
|
|
Uuid(as_uuid=True),
|
|
ForeignKey("detection_levels.id", ondelete="SET NULL"),
|
|
nullable=True,
|
|
)
|
|
# Post-M7 blue-side review fields (cf. user feedback 2026-05-15). The
|
|
# blue team historically maintained these in an Excel sheet — log source,
|
|
# raw SIEM excerpt, plus a small cyber-incident sub-record. All five are
|
|
# gated by `mission.write_blue_fields` at the service layer.
|
|
blue_log_source: Mapped[str | None] = mapped_column(String(120), nullable=True)
|
|
blue_siem_logs: Mapped[str | None] = mapped_column(Text, nullable=True)
|
|
blue_incident_at: Mapped[datetime | None] = mapped_column(
|
|
DateTime(timezone=True), nullable=True
|
|
)
|
|
blue_incident_number: Mapped[str | None] = mapped_column(
|
|
String(120), nullable=True
|
|
)
|
|
blue_incident_recipient_email: Mapped[str | None] = mapped_column(
|
|
String(255), nullable=True
|
|
)
|
|
category_id: Mapped[uuid.UUID | None] = mapped_column(
|
|
Uuid(as_uuid=True),
|
|
ForeignKey("mission_categories.id", ondelete="SET NULL"),
|
|
nullable=True,
|
|
)
|
|
|
|
# --- Activity tracking (M7) ---
|
|
# Last user who wrote any red/blue field, flipped state, or uploaded
|
|
# evidence on this test. Used by the polling activity endpoint to drive
|
|
# the "modified by X Ns ago" badge. FK ON DELETE SET NULL so removing a
|
|
# user retains the history (the badge falls back to "<deleted>").
|
|
last_actor_id: Mapped[uuid.UUID | None] = mapped_column(
|
|
Uuid(as_uuid=True),
|
|
ForeignKey("users.id", ondelete="SET NULL"),
|
|
nullable=True,
|
|
)
|
|
|
|
scenario: Mapped[MissionScenario] = relationship(back_populates="tests")
|
|
mitre_tags: Mapped[list["MissionTestMitreTag"]] = relationship(
|
|
back_populates="mission_test",
|
|
cascade="all, delete-orphan",
|
|
lazy="selectin",
|
|
)
|
|
|
|
__table_args__ = (
|
|
CheckConstraint(
|
|
f"snapshot_opsec_level IN ({', '.join(repr(v) for v in OPSEC_LEVELS)})",
|
|
name="snapshot_opsec_level_valid",
|
|
),
|
|
CheckConstraint(
|
|
f"state IN ({', '.join(repr(v) for v in MISSION_TEST_STATES)})",
|
|
name="state_valid",
|
|
),
|
|
UniqueConstraint("scenario_id", "position", name="uq_mission_tests_position"),
|
|
Index("ix_mission_tests_state", "state"),
|
|
Index("ix_mission_tests_updated_at", "updated_at"),
|
|
Index(
|
|
"ix_mission_tests_active",
|
|
"deleted_at",
|
|
postgresql_where="deleted_at IS NULL",
|
|
),
|
|
)
|
|
|
|
|
|
class MissionCategory(Base, UuidPkMixin, TimestampMixin, SoftDeleteMixin):
|
|
"""Optional custom grouping override for the slide synthesis."""
|
|
|
|
__tablename__ = "mission_categories"
|
|
|
|
mission_id: Mapped[uuid.UUID] = mapped_column(
|
|
Uuid(as_uuid=True),
|
|
ForeignKey("missions.id", ondelete="CASCADE"),
|
|
nullable=False,
|
|
)
|
|
name: Mapped[str] = mapped_column(String(120), nullable=False)
|
|
color_token: Mapped[str | None] = mapped_column(String(16), nullable=True)
|
|
position: Mapped[int] = mapped_column(Integer, nullable=False)
|
|
|
|
mission: Mapped[Mission] = relationship(back_populates="categories")
|
|
|
|
__table_args__ = (
|
|
UniqueConstraint(
|
|
"mission_id", "position", name="uq_mission_categories_position"
|
|
),
|
|
UniqueConstraint("mission_id", "name", name="uq_mission_categories_name"),
|
|
Index(
|
|
"ix_mission_categories_active",
|
|
"deleted_at",
|
|
postgresql_where="deleted_at IS NULL",
|
|
),
|
|
)
|
|
|
|
|
|
class MissionTestMitreTag(Base):
|
|
"""Frozen MITRE tag attached to a mission test.
|
|
|
|
DELIBERATELY DENORMALISED — no FK to mitre_* tables. The MITRE
|
|
`external_id` and human label are copied at tag-creation time so that a
|
|
later MITRE re-sync that drops the original entry cannot purge or alter
|
|
a mission's tags. See spec §11 (snapshot vs reference).
|
|
|
|
The companion `test_template_mitre_tags` table keeps the FK relationship
|
|
because templates are editable and admins can re-tag them after a sync.
|
|
"""
|
|
|
|
__tablename__ = "mission_test_mitre_tags"
|
|
|
|
id: Mapped[uuid.UUID] = mapped_column(
|
|
Uuid(as_uuid=True), primary_key=True, default=uuid.uuid4, nullable=False
|
|
)
|
|
mission_test_id: Mapped[uuid.UUID] = mapped_column(
|
|
Uuid(as_uuid=True),
|
|
ForeignKey("mission_tests.id", ondelete="CASCADE"),
|
|
nullable=False,
|
|
)
|
|
mitre_kind: Mapped[str] = mapped_column(String(16), nullable=False)
|
|
mitre_external_id: Mapped[str] = mapped_column(String(16), nullable=False)
|
|
mitre_name: Mapped[str] = mapped_column(String(255), nullable=False)
|
|
mitre_url: Mapped[str | None] = mapped_column(String(512), nullable=True)
|
|
|
|
mission_test: Mapped[MissionTest] = relationship(back_populates="mitre_tags")
|
|
|
|
__table_args__: Any = (
|
|
CheckConstraint(
|
|
f"mitre_kind IN ({', '.join(repr(v) for v in MITRE_KINDS)})",
|
|
name="mitre_kind_valid",
|
|
),
|
|
UniqueConstraint(
|
|
"mission_test_id",
|
|
"mitre_external_id",
|
|
name="uq_mission_test_mitre_tag",
|
|
),
|
|
Index("ix_mission_test_mitre_tags_test", "mission_test_id"),
|
|
)
|