User: «Enlève également le workflow d'un test, quand on saisit des
informations côtés redteam cela signifie qu'il a été exécuté et donc
en attente d'une review blueteam.»
Backend (update_mission_test_fields)
- At the end of every PUT, inspect the touched-field set:
- any red write on state in {pending, skipped, blocked} → state=executed
+ auto-stamp executed_at=now() if absent
- any blue write on state=executed → state=reviewed_by_blue
- /transition endpoint kept for back-fill/admin use, not called from UI.
Frontend MissionTestPage
- Removed the transition-buttons header block and the `transition`
mutation. State pill stays as a passive indicator.
- New labels: "Not started" / "Awaiting review" / "Reviewed" describe
the implicit lifecycle, no longer exposing the state-machine concept.
E2E
- The SPA test that clicked `transition-executed` now verifies the
implicit promotion: typing red fields and saving flips the pill from
"Not started" → "Awaiting review", no button click required.
Spec
- §4 reword: "Cycle de vie implicite, piloté par les écritures" replaces
the old "Workflow par test instance" bullet.
Tests
- 3 new pytest: red_command-alone implicit execute + auto-stamp,
blue write promotes executed→reviewed, blue write on pending no-op.
- 142 pytest + 49 Playwright green.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
737 lines
26 KiB
Python
737 lines
26 KiB
Python
"""Per-mission-test execution service (M7).
|
|
|
|
Where M6 builds the snapshot, M7 brings the test to life:
|
|
|
|
- Red side: command, output, comment, mark-executed (auto + override).
|
|
- Blue side: detection level, comment, evidence (delegated to `evidence.py`).
|
|
- State machine: pending↔skipped/blocked, executed→reviewed_by_blue.
|
|
|
|
The caller is responsible for telling us which side it has perms for via
|
|
`has_red_perm` / `has_blue_perm`. The service refuses field/state writes that
|
|
require a side the caller does not hold, raising `MissingFieldPermission`.
|
|
|
|
Mission membership is enforced here (404 not 403) consistent with M6 to
|
|
prevent existence leaks.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import Session, selectinload
|
|
|
|
from app.db.session import session_scope
|
|
from app.db.types import MISSION_TEST_STATES
|
|
from app.models.auth import User
|
|
from app.models.evidence import EvidenceFile
|
|
from app.models.mission import (
|
|
Mission,
|
|
MissionScenario,
|
|
MissionTest,
|
|
)
|
|
from app.models.setting import DetectionLevel
|
|
from app.services.missions import (
|
|
MissionNotFound,
|
|
_is_member,
|
|
)
|
|
|
|
log = logging.getLogger("metamorph.mission_tests")
|
|
|
|
_UNSET: Any = object()
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# State machine
|
|
# --------------------------------------------------------------------------- #
|
|
#
|
|
# Per spec §M7: pending↔skipped/blocked, executed→reviewed_by_blue.
|
|
# We also allow `executed → pending` and `reviewed_by_blue → executed` so a
|
|
# red/blue user can revert a misclick without admin intervention. Soft-delete
|
|
# is the only forward-only sink (handled outside this service).
|
|
#
|
|
|
|
_VALID_TRANSITIONS: dict[str, frozenset[str]] = {
|
|
"pending": frozenset({"executed", "skipped", "blocked"}),
|
|
"executed": frozenset({"reviewed_by_blue", "pending"}),
|
|
"reviewed_by_blue": frozenset({"executed"}),
|
|
"skipped": frozenset({"pending"}),
|
|
"blocked": frozenset({"pending"}),
|
|
}
|
|
|
|
# Which side "owns" each transition for permission purposes:
|
|
# "red" → requires mission.write_red_fields
|
|
# "blue" → requires mission.write_blue_fields
|
|
# "any" → either side suffices
|
|
_TRANSITION_SIDE: dict[tuple[str, str], str] = {
|
|
("pending", "executed"): "red",
|
|
("pending", "skipped"): "any",
|
|
("pending", "blocked"): "any",
|
|
("executed", "reviewed_by_blue"): "blue",
|
|
("executed", "pending"): "red",
|
|
("reviewed_by_blue", "executed"): "blue",
|
|
("skipped", "pending"): "any",
|
|
("blocked", "pending"): "any",
|
|
}
|
|
|
|
# Same-state idempotent POSTs are still gated: a user replaying a "mark
|
|
# executed" must still hold red perms even if the row is already executed.
|
|
# This map answers "if you wanted to BE in state X, which side originally
|
|
# brought you here?" — and therefore what perm a no-op repeat should require.
|
|
_IDEMPOTENT_SIDE: dict[str, str] = {
|
|
"executed": "red",
|
|
"reviewed_by_blue": "blue",
|
|
"pending": "any",
|
|
"skipped": "any",
|
|
"blocked": "any",
|
|
}
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Exceptions
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class MissionTestNotFound(Exception):
|
|
"""Test missing, soft-deleted, or not under the given mission/viewer."""
|
|
|
|
|
|
class InvalidTestTransition(Exception):
|
|
pass
|
|
|
|
|
|
class MissingFieldPermission(Exception):
|
|
"""Caller tried to write a field requiring a side perm they do not hold."""
|
|
|
|
|
|
class InvalidTestPayload(Exception):
|
|
"""Generic validation error (bad dates, unknown detection level, ...)."""
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Views
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class EvidenceView:
|
|
id: uuid.UUID
|
|
mission_test_id: uuid.UUID
|
|
sha256: str
|
|
mime: str
|
|
size_bytes: int
|
|
original_filename: str
|
|
uploaded_by_user_id: uuid.UUID | None
|
|
uploaded_by_email: str | None
|
|
uploaded_by_display_name: str | None
|
|
uploaded_at: datetime
|
|
created_at: datetime
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class MissionTestMitreTagView:
|
|
kind: str
|
|
external_id: str
|
|
name: str
|
|
url: str | None
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class MissionTestDetailView:
|
|
id: uuid.UUID
|
|
mission_id: uuid.UUID
|
|
scenario_id: uuid.UUID
|
|
position: int
|
|
snapshot_name: str
|
|
snapshot_description: str | None
|
|
snapshot_objective: str | None
|
|
snapshot_procedure_md: str | None
|
|
snapshot_prerequisites_md: str | None
|
|
snapshot_expected_red_md: str | None
|
|
snapshot_expected_blue_md: str | None
|
|
snapshot_opsec_level: str
|
|
snapshot_tags: list[str]
|
|
snapshot_expected_iocs: list[str]
|
|
state: str
|
|
executed_at: datetime | None
|
|
executed_at_overridden: bool
|
|
red_command: str | None
|
|
red_output: str | None
|
|
red_comment_md: str | None
|
|
blue_comment_md: str | None
|
|
detection_level_id: uuid.UUID | None
|
|
detection_level_key: str | None
|
|
# Post-M7 blue review fields (cf. user feedback 2026-05-15).
|
|
blue_log_source: str | None
|
|
blue_siem_logs: str | None
|
|
blue_incident_at: datetime | None
|
|
blue_incident_number: str | None
|
|
blue_incident_recipient_email: str | None
|
|
last_actor_id: uuid.UUID | None
|
|
last_actor_email: str | None
|
|
last_actor_display_name: str | None
|
|
updated_at: datetime
|
|
mitre_tags: list[MissionTestMitreTagView]
|
|
evidence: list[EvidenceView]
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ActivityEntryView:
|
|
test_id: uuid.UUID
|
|
scenario_id: uuid.UUID
|
|
state: str
|
|
updated_at: datetime
|
|
last_actor_id: uuid.UUID | None
|
|
last_actor_email: str | None
|
|
last_actor_display_name: str | None
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Helpers
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
def _opt_md(value: Any) -> str | None:
|
|
"""Normalise a markdown/text input: strip-then-collapse-to-None on empty."""
|
|
if value is None:
|
|
return None
|
|
if not isinstance(value, str):
|
|
raise InvalidTestPayload("text field must be a string")
|
|
v = value.strip()
|
|
return v or None
|
|
|
|
|
|
def _opt_cmd(value: Any) -> str | None:
|
|
"""Same as `_opt_md` but preserves trailing/leading whitespace inside the body."""
|
|
if value is None:
|
|
return None
|
|
if not isinstance(value, str):
|
|
raise InvalidTestPayload("text field must be a string")
|
|
return value if value != "" else None
|
|
|
|
|
|
def _ensure_state(value: str) -> str:
|
|
if value not in MISSION_TEST_STATES:
|
|
raise InvalidTestPayload(f"state must be one of {MISSION_TEST_STATES}")
|
|
return value
|
|
|
|
|
|
def _load_test(
|
|
s: Session, mission_id: uuid.UUID, test_id: uuid.UUID
|
|
) -> MissionTest:
|
|
"""Fetch a live mission_test guarded by mission id, raising on misses."""
|
|
stmt = (
|
|
select(MissionTest)
|
|
.join(MissionScenario, MissionTest.scenario_id == MissionScenario.id)
|
|
.options(selectinload(MissionTest.mitre_tags))
|
|
.where(
|
|
MissionTest.id == test_id,
|
|
MissionScenario.mission_id == mission_id,
|
|
MissionTest.deleted_at.is_(None),
|
|
MissionScenario.deleted_at.is_(None),
|
|
)
|
|
)
|
|
row = s.scalars(stmt).one_or_none()
|
|
if row is None:
|
|
raise MissionTestNotFound()
|
|
return row
|
|
|
|
|
|
def _ensure_mission_visible(
|
|
s: Session, mission_id: uuid.UUID, viewer_id: uuid.UUID, viewer_is_admin: bool
|
|
) -> Mission:
|
|
"""Confirm the mission exists, is live, and is visible to the viewer.
|
|
|
|
Returns the Mission row for reuse (e.g. to log the parent name in audit
|
|
extras). Raises `MissionNotFound` on any miss — we mirror M6's membership
|
|
visibility contract: leaking existence via 403 is forbidden.
|
|
"""
|
|
m = s.get(Mission, mission_id)
|
|
if m is None or m.deleted_at is not None:
|
|
raise MissionNotFound()
|
|
if not viewer_is_admin and not _is_member(s, mission_id, viewer_id):
|
|
raise MissionNotFound()
|
|
return m
|
|
|
|
|
|
def _to_evidence_view(ev: EvidenceFile, uploader: User | None) -> EvidenceView:
|
|
return EvidenceView(
|
|
id=ev.id,
|
|
mission_test_id=ev.mission_test_id,
|
|
sha256=ev.sha256,
|
|
mime=ev.mime,
|
|
size_bytes=ev.size_bytes,
|
|
original_filename=ev.original_filename,
|
|
uploaded_by_user_id=ev.uploaded_by_user_id,
|
|
uploaded_by_email=uploader.email if uploader is not None else None,
|
|
uploaded_by_display_name=uploader.display_name if uploader is not None else None,
|
|
uploaded_at=ev.uploaded_at,
|
|
created_at=ev.created_at,
|
|
)
|
|
|
|
|
|
def _load_evidence_for_test(s: Session, test_id: uuid.UUID) -> list[EvidenceView]:
|
|
rows = s.scalars(
|
|
select(EvidenceFile)
|
|
.where(
|
|
EvidenceFile.mission_test_id == test_id,
|
|
EvidenceFile.deleted_at.is_(None),
|
|
)
|
|
.order_by(EvidenceFile.uploaded_at.asc(), EvidenceFile.id.asc())
|
|
).all()
|
|
if not rows:
|
|
return []
|
|
uploader_ids = {r.uploaded_by_user_id for r in rows if r.uploaded_by_user_id}
|
|
uploaders: dict[uuid.UUID, User] = {}
|
|
if uploader_ids:
|
|
uploaders = {
|
|
u.id: u
|
|
for u in s.scalars(
|
|
select(User).where(User.id.in_(uploader_ids))
|
|
).all()
|
|
}
|
|
return [
|
|
_to_evidence_view(r, uploaders.get(r.uploaded_by_user_id) if r.uploaded_by_user_id else None)
|
|
for r in rows
|
|
]
|
|
|
|
|
|
def _to_detail_view(
|
|
s: Session, mission_id: uuid.UUID, test: MissionTest
|
|
) -> MissionTestDetailView:
|
|
# Batch the two FK lookups (last actor + detection level) into a single
|
|
# round trip instead of two `s.get` calls — every PUT/transition returns
|
|
# the detail view, so this matters.
|
|
last_actor_email: str | None = None
|
|
last_actor_display_name: str | None = None
|
|
level_key: str | None = None
|
|
if test.last_actor_id is not None:
|
|
actor = s.execute(
|
|
select(User.email, User.display_name).where(User.id == test.last_actor_id)
|
|
).first()
|
|
if actor is not None:
|
|
last_actor_email, last_actor_display_name = actor.email, actor.display_name
|
|
if test.detection_level_id is not None:
|
|
level_key = s.scalar(
|
|
select(DetectionLevel.key).where(DetectionLevel.id == test.detection_level_id)
|
|
)
|
|
tag_views = [
|
|
MissionTestMitreTagView(
|
|
kind=tag.mitre_kind,
|
|
external_id=tag.mitre_external_id,
|
|
name=tag.mitre_name,
|
|
url=tag.mitre_url,
|
|
)
|
|
for tag in sorted(
|
|
test.mitre_tags, key=lambda t: (t.mitre_kind, t.mitre_external_id)
|
|
)
|
|
]
|
|
return MissionTestDetailView(
|
|
id=test.id,
|
|
mission_id=mission_id,
|
|
scenario_id=test.scenario_id,
|
|
position=test.position,
|
|
snapshot_name=test.snapshot_name,
|
|
snapshot_description=test.snapshot_description,
|
|
snapshot_objective=test.snapshot_objective,
|
|
snapshot_procedure_md=test.snapshot_procedure_md,
|
|
snapshot_prerequisites_md=test.snapshot_prerequisites_md,
|
|
snapshot_expected_red_md=test.snapshot_expected_red_md,
|
|
snapshot_expected_blue_md=test.snapshot_expected_blue_md,
|
|
snapshot_opsec_level=test.snapshot_opsec_level,
|
|
snapshot_tags=list(test.snapshot_tags or []),
|
|
snapshot_expected_iocs=list(test.snapshot_expected_iocs or []),
|
|
state=test.state,
|
|
executed_at=test.executed_at,
|
|
executed_at_overridden=test.executed_at_overridden,
|
|
red_command=test.red_command,
|
|
red_output=test.red_output,
|
|
red_comment_md=test.red_comment_md,
|
|
blue_comment_md=test.blue_comment_md,
|
|
detection_level_id=test.detection_level_id,
|
|
detection_level_key=level_key,
|
|
blue_log_source=test.blue_log_source,
|
|
blue_siem_logs=test.blue_siem_logs,
|
|
blue_incident_at=test.blue_incident_at,
|
|
blue_incident_number=test.blue_incident_number,
|
|
blue_incident_recipient_email=test.blue_incident_recipient_email,
|
|
last_actor_id=test.last_actor_id,
|
|
last_actor_email=last_actor_email,
|
|
last_actor_display_name=last_actor_display_name,
|
|
updated_at=test.updated_at,
|
|
mitre_tags=tag_views,
|
|
evidence=_load_evidence_for_test(s, test.id),
|
|
)
|
|
|
|
|
|
def _touch(test: MissionTest, actor_id: uuid.UUID) -> None:
|
|
"""Stamp the actor + bump the activity clock.
|
|
|
|
`updated_at` is auto-managed by SQLAlchemy's `onupdate=func.now()` mixin
|
|
only when at least one mapped attribute changes. Assigning `last_actor_id`
|
|
triggers that, even when the actor is the same as the previous one
|
|
(Pydantic-clean payloads still flush the assignment).
|
|
"""
|
|
test.last_actor_id = actor_id
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Public API — read
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
def get_mission_test(
|
|
mission_id: uuid.UUID,
|
|
test_id: uuid.UUID,
|
|
*,
|
|
viewer_id: uuid.UUID,
|
|
viewer_is_admin: bool,
|
|
) -> MissionTestDetailView:
|
|
with session_scope() as s:
|
|
_ensure_mission_visible(s, mission_id, viewer_id, viewer_is_admin)
|
|
test = _load_test(s, mission_id, test_id)
|
|
return _to_detail_view(s, mission_id, test)
|
|
|
|
|
|
def list_activity_since(
|
|
mission_id: uuid.UUID,
|
|
*,
|
|
viewer_id: uuid.UUID,
|
|
viewer_is_admin: bool,
|
|
since: datetime | None = None,
|
|
limit: int = 200,
|
|
) -> list[ActivityEntryView]:
|
|
"""List mission_tests whose `updated_at > since`, freshest first.
|
|
|
|
Drives the "modified by X Ns ago" badge on the per-test page. Soft-deleted
|
|
tests/scenarios are excluded so a deletion does not appear as activity.
|
|
"""
|
|
with session_scope() as s:
|
|
_ensure_mission_visible(s, mission_id, viewer_id, viewer_is_admin)
|
|
stmt = (
|
|
select(MissionTest, MissionScenario)
|
|
.join(MissionScenario, MissionTest.scenario_id == MissionScenario.id)
|
|
.where(
|
|
MissionScenario.mission_id == mission_id,
|
|
MissionTest.deleted_at.is_(None),
|
|
MissionScenario.deleted_at.is_(None),
|
|
)
|
|
.order_by(MissionTest.updated_at.desc(), MissionTest.id.asc())
|
|
.limit(max(1, min(limit, 500)))
|
|
)
|
|
if since is not None:
|
|
stmt = stmt.where(MissionTest.updated_at > since)
|
|
rows = s.execute(stmt).all()
|
|
actor_ids = {r.MissionTest.last_actor_id for r in rows if r.MissionTest.last_actor_id}
|
|
actors: dict[uuid.UUID, User] = {}
|
|
if actor_ids:
|
|
actors = {
|
|
u.id: u
|
|
for u in s.scalars(select(User).where(User.id.in_(actor_ids))).all()
|
|
}
|
|
out: list[ActivityEntryView] = []
|
|
for row in rows:
|
|
t = row.MissionTest
|
|
actor = actors.get(t.last_actor_id) if t.last_actor_id else None
|
|
out.append(
|
|
ActivityEntryView(
|
|
test_id=t.id,
|
|
scenario_id=t.scenario_id,
|
|
state=t.state,
|
|
updated_at=t.updated_at,
|
|
last_actor_id=t.last_actor_id,
|
|
last_actor_email=actor.email if actor else None,
|
|
last_actor_display_name=actor.display_name if actor else None,
|
|
)
|
|
)
|
|
return out
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Public API — write
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
# Side membership for each writable field (mirror of the spec's red/blue split).
|
|
_RED_FIELDS = {"red_command", "red_output", "red_comment_md",
|
|
"executed_at", "executed_at_overridden"}
|
|
_BLUE_FIELDS = {
|
|
"blue_comment_md",
|
|
"detection_level_id",
|
|
"blue_log_source",
|
|
"blue_siem_logs",
|
|
"blue_incident_at",
|
|
"blue_incident_number",
|
|
"blue_incident_recipient_email",
|
|
}
|
|
|
|
|
|
def _classify_fields(touched: set[str]) -> tuple[bool, bool]:
|
|
"""Return (needs_red, needs_blue) for the set of field names being written."""
|
|
return (
|
|
bool(touched & _RED_FIELDS),
|
|
bool(touched & _BLUE_FIELDS),
|
|
)
|
|
|
|
|
|
def update_mission_test_fields(
|
|
mission_id: uuid.UUID,
|
|
test_id: uuid.UUID,
|
|
*,
|
|
viewer_id: uuid.UUID,
|
|
viewer_is_admin: bool,
|
|
has_red_perm: bool,
|
|
has_blue_perm: bool,
|
|
red_command: Any = _UNSET,
|
|
red_output: Any = _UNSET,
|
|
red_comment_md: Any = _UNSET,
|
|
blue_comment_md: Any = _UNSET,
|
|
detection_level_id: Any = _UNSET,
|
|
executed_at: Any = _UNSET,
|
|
executed_at_overridden: Any = _UNSET,
|
|
blue_log_source: Any = _UNSET,
|
|
blue_siem_logs: Any = _UNSET,
|
|
blue_incident_at: Any = _UNSET,
|
|
blue_incident_number: Any = _UNSET,
|
|
blue_incident_recipient_email: Any = _UNSET,
|
|
) -> MissionTestDetailView:
|
|
"""Patch any subset of the red/blue annotation fields.
|
|
|
|
Field-level perm enforcement happens *before* any write so a forbidden
|
|
field never even lands in the SQL transaction (cleaner audit logs).
|
|
"""
|
|
touched: set[str] = set()
|
|
if red_command is not _UNSET:
|
|
touched.add("red_command")
|
|
if red_output is not _UNSET:
|
|
touched.add("red_output")
|
|
if red_comment_md is not _UNSET:
|
|
touched.add("red_comment_md")
|
|
if blue_comment_md is not _UNSET:
|
|
touched.add("blue_comment_md")
|
|
if detection_level_id is not _UNSET:
|
|
touched.add("detection_level_id")
|
|
if executed_at is not _UNSET:
|
|
touched.add("executed_at")
|
|
if executed_at_overridden is not _UNSET:
|
|
touched.add("executed_at_overridden")
|
|
if blue_log_source is not _UNSET:
|
|
touched.add("blue_log_source")
|
|
if blue_siem_logs is not _UNSET:
|
|
touched.add("blue_siem_logs")
|
|
if blue_incident_at is not _UNSET:
|
|
touched.add("blue_incident_at")
|
|
if blue_incident_number is not _UNSET:
|
|
touched.add("blue_incident_number")
|
|
if blue_incident_recipient_email is not _UNSET:
|
|
touched.add("blue_incident_recipient_email")
|
|
|
|
needs_red, needs_blue = _classify_fields(touched)
|
|
if not viewer_is_admin:
|
|
if needs_red and not has_red_perm:
|
|
raise MissingFieldPermission(
|
|
"mission.write_red_fields required for red-side fields"
|
|
)
|
|
if needs_blue and not has_blue_perm:
|
|
raise MissingFieldPermission(
|
|
"mission.write_blue_fields required for blue-side fields"
|
|
)
|
|
|
|
with session_scope() as s:
|
|
_ensure_mission_visible(s, mission_id, viewer_id, viewer_is_admin)
|
|
test = _load_test(s, mission_id, test_id)
|
|
|
|
if not touched:
|
|
return _to_detail_view(s, mission_id, test)
|
|
|
|
if "red_command" in touched:
|
|
test.red_command = _opt_cmd(red_command)
|
|
if "red_output" in touched:
|
|
test.red_output = _opt_cmd(red_output)
|
|
if "red_comment_md" in touched:
|
|
test.red_comment_md = _opt_md(red_comment_md)
|
|
if "blue_comment_md" in touched:
|
|
test.blue_comment_md = _opt_md(blue_comment_md)
|
|
|
|
if "detection_level_id" in touched:
|
|
if detection_level_id is None:
|
|
test.detection_level_id = None
|
|
else:
|
|
if not isinstance(detection_level_id, uuid.UUID):
|
|
raise InvalidTestPayload("detection_level_id must be a UUID")
|
|
lvl = s.get(DetectionLevel, detection_level_id)
|
|
if lvl is None:
|
|
raise InvalidTestPayload("unknown detection_level_id")
|
|
test.detection_level_id = detection_level_id
|
|
|
|
# Post-M7 blue-side review fields — short text + long text + a small
|
|
# cyber-incident sub-record. Email is sanity-checked at the API layer
|
|
# via Pydantic; the service just normalises empty strings to NULL.
|
|
if "blue_log_source" in touched:
|
|
test.blue_log_source = _opt_md(blue_log_source)
|
|
if "blue_siem_logs" in touched:
|
|
# SIEM excerpts can legitimately have leading whitespace inside
|
|
# the body (table-like log lines), so use the command-style
|
|
# normaliser that only collapses purely-empty strings to NULL.
|
|
test.blue_siem_logs = _opt_cmd(blue_siem_logs)
|
|
if "blue_incident_at" in touched:
|
|
if blue_incident_at is not None and not isinstance(blue_incident_at, datetime):
|
|
raise InvalidTestPayload("blue_incident_at must be an ISO datetime")
|
|
test.blue_incident_at = blue_incident_at
|
|
if "blue_incident_number" in touched:
|
|
test.blue_incident_number = _opt_md(blue_incident_number)
|
|
if "blue_incident_recipient_email" in touched:
|
|
test.blue_incident_recipient_email = _opt_md(
|
|
blue_incident_recipient_email
|
|
)
|
|
|
|
if "executed_at_overridden" in touched or "executed_at" in touched:
|
|
# Editing executed_at is a red-only privilege (gated above via
|
|
# _RED_FIELDS). State auto-promotion is handled by the general
|
|
# block below; this branch just validates and applies the
|
|
# timestamp + override flag.
|
|
new_overridden = (
|
|
bool(executed_at_overridden)
|
|
if "executed_at_overridden" in touched
|
|
else test.executed_at_overridden
|
|
)
|
|
new_at = test.executed_at if "executed_at" not in touched else executed_at
|
|
if new_overridden and new_at is None:
|
|
raise InvalidTestPayload(
|
|
"executed_at_overridden=true requires a non-null executed_at"
|
|
)
|
|
if "executed_at" in touched and new_at is not None and not isinstance(new_at, datetime):
|
|
raise InvalidTestPayload("executed_at must be an ISO datetime")
|
|
test.executed_at = new_at
|
|
test.executed_at_overridden = new_overridden
|
|
|
|
# Implicit lifecycle (post-amendement 2026-05-15 bis): the explicit
|
|
# workflow is gone from the UI. A write to ANY red field implies the
|
|
# test was executed (auto-stamp executed_at if the operator didn't
|
|
# supply one); a write to ANY blue field on an executed test implies
|
|
# the blue team reviewed it. The /transition endpoint stays for
|
|
# back-fill but is no longer the primary path.
|
|
red_touched = bool(touched & _RED_FIELDS)
|
|
blue_touched = bool(touched & _BLUE_FIELDS)
|
|
if red_touched and test.state in {"pending", "skipped", "blocked"}:
|
|
if test.executed_at is None:
|
|
test.executed_at = datetime.now(tz=timezone.utc)
|
|
test.executed_at_overridden = False
|
|
test.state = "executed"
|
|
if blue_touched and test.state == "executed":
|
|
test.state = "reviewed_by_blue"
|
|
|
|
_touch(test, viewer_id)
|
|
s.flush()
|
|
s.refresh(test)
|
|
return _to_detail_view(s, mission_id, test)
|
|
|
|
|
|
def transition_mission_test(
|
|
mission_id: uuid.UUID,
|
|
test_id: uuid.UUID,
|
|
target_state: str,
|
|
*,
|
|
viewer_id: uuid.UUID,
|
|
viewer_is_admin: bool,
|
|
has_red_perm: bool,
|
|
has_blue_perm: bool,
|
|
) -> MissionTestDetailView:
|
|
"""Drive the test through its lifecycle and side-effect `executed_at`.
|
|
|
|
Transitioning *into* `executed` stamps `executed_at = now()` and clears
|
|
the override flag — the deliberate red-side action commits the timeline.
|
|
Transitioning *out of* `executed` (to `pending`) clears the timestamp so
|
|
a re-execution starts from a clean slate.
|
|
"""
|
|
_ensure_state(target_state)
|
|
|
|
with session_scope() as s:
|
|
_ensure_mission_visible(s, mission_id, viewer_id, viewer_is_admin)
|
|
test = _load_test(s, mission_id, test_id)
|
|
|
|
# Perm gate runs BEFORE the idempotency short-circuit. A blue-only
|
|
# user POSTing target_state="executed" while the test is already
|
|
# executed must NOT get a 200 — it would falsely advertise that they
|
|
# hold the red-side perm. We resolve the would-be transition's side
|
|
# (or, on a no-op, fall back to the source side which originally
|
|
# produced the state) and enforce it before any response shape.
|
|
allowed = _VALID_TRANSITIONS.get(test.state, frozenset())
|
|
if test.state != target_state and target_state not in allowed:
|
|
raise InvalidTestTransition(
|
|
f"cannot transition test from {test.state!r} to {target_state!r}"
|
|
)
|
|
|
|
side: str | None
|
|
if test.state == target_state:
|
|
# Idempotent path: require the perm the *forward* transition
|
|
# would have needed. For terminal-states (already executed →
|
|
# executed), this is the side that *brought* the test here.
|
|
side = _IDEMPOTENT_SIDE.get(target_state)
|
|
else:
|
|
side = _TRANSITION_SIDE.get((test.state, target_state))
|
|
|
|
if not viewer_is_admin and side is not None:
|
|
if side == "red" and not has_red_perm:
|
|
raise MissingFieldPermission(
|
|
"mission.write_red_fields required for this transition"
|
|
)
|
|
if side == "blue" and not has_blue_perm:
|
|
raise MissingFieldPermission(
|
|
"mission.write_blue_fields required for this transition"
|
|
)
|
|
if side == "any" and not (has_red_perm or has_blue_perm):
|
|
raise MissingFieldPermission(
|
|
"either mission.write_red_fields or mission.write_blue_fields "
|
|
"is required"
|
|
)
|
|
|
|
if test.state == target_state:
|
|
# Genuine no-op: idempotent 200 with the current snapshot.
|
|
return _to_detail_view(s, mission_id, test)
|
|
|
|
if target_state == "executed":
|
|
test.executed_at = datetime.now(tz=timezone.utc)
|
|
test.executed_at_overridden = False
|
|
elif target_state == "pending":
|
|
# Returning to pending wipes the execution timestamp so a re-run
|
|
# starts clean. Notes/comments are preserved (history value).
|
|
test.executed_at = None
|
|
test.executed_at_overridden = False
|
|
|
|
test.state = target_state
|
|
_touch(test, viewer_id)
|
|
s.flush()
|
|
s.refresh(test)
|
|
return _to_detail_view(s, mission_id, test)
|
|
|
|
|
|
__all__ = [
|
|
"EvidenceView",
|
|
"MissionTestDetailView",
|
|
"MissionTestMitreTagView",
|
|
"ActivityEntryView",
|
|
"MissionTestNotFound",
|
|
"InvalidTestTransition",
|
|
"MissingFieldPermission",
|
|
"InvalidTestPayload",
|
|
"get_mission_test",
|
|
"list_activity_since",
|
|
"update_mission_test_fields",
|
|
"transition_mission_test",
|
|
"_touch",
|
|
"_load_test",
|
|
"_ensure_mission_visible",
|
|
"_to_detail_view",
|
|
"_to_evidence_view",
|
|
]
|
|
|
|
|
|
# Re-export — used by `app/api/missions.py` to wire the
|
|
# 404 handling without importing the originals from M6 in two places.
|
|
__all__ += ["MissionNotFound"]
|