feat(m7): blue review fields + spec amendment + reviewer follow-ups

User feedback after the M7 ship: blue team's Excel workflow had 5 extra
fields we didn't capture. Per-test page also doesn't match their
workflow — they need a tabular view, one table per scenario.

Spec
- tasks/spec.md amended (`revised: 2026-05-15`): §4 in-scope, §F6, §8
  model bullet. §F6 now pins the column matrix, single-row-edit
  semantics, Esc-cancel, blur-confirm, and reconciles detection_level
  as a pill inside the Commentaires cell (no 8th column).
- tasks/todo.md M7 section grew an "Amendement 2026-05-15" sub-block
  tracking backend ☑ and frontend ☐.

Backend
- Migration c2a8f4b1d6e9: 5 nullable columns on mission_tests
  (blue_log_source, blue_siem_logs, blue_incident_at,
  blue_incident_number, blue_incident_recipient_email).
- _BLUE_FIELDS extended; update_mission_test_fields propagates each
  field; MissionTestDetailView + MissionTestView (the nested view in
  GET /missions/{id}) surface every annotation field, plus
  last_actor_*, updated_at, detection_level_key — O(1) batch lookup
  for detection-level keys and last-actor users keeps it scalable.
- UpdateMissionTestPayload accepts each field with length caps
  (120/200_000/120/255).

Reviewer follow-ups applied
- blue_incident_at + executed_at now reject naïve datetimes
  (_ensure_aware_datetime) — Postgres would otherwise interpret
  them in the session TZ, defeating the M7 verbatim-time contract.
- blue_incident_recipient_email goes through a permissive RFC-shape
  regex (_validate_email_shape) so internal/lab TLDs like .local
  / .corp / .test pass — Pydantic EmailStr is too strict (lessons.md
  M2 trap).
- Project-wide: switched `e.errors()` to
  `e.errors(include_context=False, include_url=False)` because the
  AfterValidator-raised ValueError lands in ctx and Flask can't
  serialize it.

Tests
- 5 new pytest cases: blue user writes the 5 new fields, red user is
  individually 403'd on each, round-trip via GET, naïve datetime
  rejected, email shape validated (.local accepted, bad shape 400).
- 138 pytest green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Knacky
2026-05-15 14:45:18 +02:00
parent d679ff34d8
commit 447f15213a
16 changed files with 517 additions and 34 deletions

View File

@@ -4,6 +4,38 @@ All notable changes to this project will be documented here. Format: [Keep a Cha
## [Unreleased] ## [Unreleased]
### Added — M7 amendment (2026-05-15) — blue review fields + full-width scenario table
User feedback after the M7 ship: the blue team used to maintain 5 extra
fields in Excel that we didn't capture, and the per-test page didn't fit
their workflow — they wanted a tabular view (one table per scenario, one
row per test) with double-click inline edit.
#### Reviewer follow-ups (applied)
- **`blue_incident_at` rejects naïve datetimes** (`backend/app/api/missions.py:_ensure_aware_datetime`): a request with `"2026-05-15T11:00:00"` (no offset) now returns 400 instead of silently letting Postgres interpret it in the session timezone — same rule applied to `executed_at` for consistency. Clients must send `Z` or an explicit `+HH:MM`.
- **`blue_incident_recipient_email` is shape-validated** (`backend/app/api/missions.py:_validate_email_shape`): permissive RFC regex (`/^[^@\s]+@[^@\s]+\.[^@\s]+$/`) that allows `.local` / `.corp` / `.test` internal domains. We deliberately don't use Pydantic `EmailStr``email-validator`'s `globally_deliverable=True` rejects those (lessons.md M2 captured the same trap for the user signup).
- **`MissionTestView` payload expansion documented** as a deliberate F6 enabler — surfacing every annotation in the nested GET means the scenario table renders in a single round trip. Without this, the table would have to call `GET /missions/{id}/tests/{test_id}` once per row.
#### Backend (shipped)
- **Migration `c2a8f4b1d6e9`** adds five nullable columns to `mission_tests`:
- `blue_log_source` (varchar 120) — short text like `Firewall`, `NDR`, `Proxy`, `AV`, `EDR`.
- `blue_siem_logs` (text) — long-form SIEM excerpt (raw log lines).
- `blue_incident_at` (timestamptz) — cyber-incident notification timestamp.
- `blue_incident_number` (varchar 120) — incident reference (`INC-2026-1234`).
- `blue_incident_recipient_email` (varchar 255) — SOC recipient of the alert.
- **All five fields are blue-side** — added to `_BLUE_FIELDS` in `app/services/mission_tests.py` so the existing per-field perm classifier rejects red-only writers with 403, no field-by-field special case.
- **`update_mission_test_fields`** accepts each new field via the same `_UNSET` sentinel pattern; `blue_siem_logs` uses the command-style normaliser (`_opt_cmd`) to preserve leading whitespace in log table excerpts; the other text fields use `_opt_md`.
- **`MissionTestView`** (the nested view returned by `GET /missions/{id}`) now exposes every annotation field plus `last_actor_*` + `updated_at` + `detection_level_key`. The two FK lookups (detection-level keys, last-actor user labels) are batch-loaded once per request so the call stays O(1) regardless of how many tests the mission contains. Lets the front-end scenario table render in a single GET — no per-row round-trip.
- **API**: `UpdateMissionTestPayload` and `_serialize_test` / `_serialize_test_detail` updated. Length caps per spec (120 / 200_000 / 120 / 255).
- **Tests**: 3 new pytest cases — `test_blue_user_writes_new_blue_review_fields`, `test_red_user_cannot_write_new_blue_review_fields` (loops each of the 5 fields), `test_blue_review_fields_survive_round_trip_via_get`. Total: **136 pytest** green.
#### Spec & docs
- **`tasks/spec.md` amended** — §4 in-scope bullet on blue saisie now lists the 5 fields, §F6 describes the tabular UX (full-bleed, one table per scenario, double-click inline edit), §8 model bullet enumerates the new columns. Header carries a `revised: 2026-05-15` note pointing readers at the amendment.
- **`tasks/todo.md` M7 section** carries a dedicated "Amendement 2026-05-15" sub-block tracking the backend (☑) and frontend (☐) items.
#### Frontend (in progress)
- Tabular layout per scenario, full screen width (escape `max-w-page` like the MITRE picker), columns `Test | Procédure | Exécution | Source de log | Commentaires | Logs SIEM | Cyber Incident`. Double-click row → inline edit gated by red/blue perms. Per-test page kept for evidence upload.
### Fixed (post-M7 UX feedback — evidence whitelist visibility) ### Fixed (post-M7 UX feedback — evidence whitelist visibility)
- **Evidence dropzone didn't tell the operator which extensions are accepted, and the OS file picker showed "All files"** (`frontend/src/pages/MissionTestPage.tsx`): an operator could spend the time picking a `.exe` only to receive a 400 back. Surfaced the whitelist in the UI: - **Evidence dropzone didn't tell the operator which extensions are accepted, and the OS file picker showed "All files"** (`frontend/src/pages/MissionTestPage.tsx`): an operator could spend the time picking a `.exe` only to receive a 400 back. Surfaced the whitelist in the UI:
- Dropzone now prints `Accepted: .png · .jpg · .jpeg · .pdf · .txt · .log · .json · .csv · .evtx · .zip · max 25 MB / file` (testid `evidence-allowed-formats`). - Dropzone now prints `Accepted: .png · .jpg · .jpeg · .pdf · .txt · .log · .json · .csv · .evtx · .zip · max 25 MB / file` (testid `evidence-allowed-formats`).

View File

@@ -0,0 +1,59 @@
"""m7 blue review fields on mission_tests
Revision ID: c2a8f4b1d6e9
Revises: 91a4e7c6d2f3
Create Date: 2026-05-15 09:00:00.000000
User feedback after the M7 ship: the blue team's review workflow needs five
more fields they used to maintain in Excel — log source, raw SIEM excerpt,
plus a small cyber-incident sub-record (timestamp, number, recipient email).
All five are blue-side and gated by `mission.write_blue_fields`.
"""
from __future__ import annotations
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
revision: str = "c2a8f4b1d6e9"
down_revision: str | None = "91a4e7c6d2f3"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
op.add_column(
"mission_tests",
sa.Column("blue_log_source", sa.String(length=120), nullable=True),
)
op.add_column(
"mission_tests",
sa.Column("blue_siem_logs", sa.Text(), nullable=True),
)
op.add_column(
"mission_tests",
sa.Column(
"blue_incident_at",
sa.DateTime(timezone=True),
nullable=True,
),
)
op.add_column(
"mission_tests",
sa.Column("blue_incident_number", sa.String(length=120), nullable=True),
)
op.add_column(
"mission_tests",
sa.Column(
"blue_incident_recipient_email", sa.String(length=255), nullable=True
),
)
def downgrade() -> None:
op.drop_column("mission_tests", "blue_incident_recipient_email")
op.drop_column("mission_tests", "blue_incident_number")
op.drop_column("mission_tests", "blue_incident_at")
op.drop_column("mission_tests", "blue_siem_logs")
op.drop_column("mission_tests", "blue_log_source")

View File

@@ -77,7 +77,7 @@ def login():
try: try:
payload = LoginPayload.model_validate(request.get_json(silent=True) or {}) payload = LoginPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
try: try:
pair = auth_svc.login(payload.email, payload.password) pair = auth_svc.login(payload.email, payload.password)
except auth_svc.InvalidCredentials: except auth_svc.InvalidCredentials:
@@ -144,7 +144,7 @@ def change_password():
try: try:
payload = ChangePasswordPayload.model_validate(request.get_json(silent=True) or {}) payload = ChangePasswordPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
try: try:
auth_svc.change_password(g.current_user.id, payload.current_password, payload.new_password) auth_svc.change_password(g.current_user.id, payload.current_password, payload.new_password)
except auth_svc.InvalidCredentials: except auth_svc.InvalidCredentials:

View File

@@ -84,7 +84,7 @@ def create_group():
try: try:
payload = CreateGroupPayload.model_validate(request.get_json(silent=True) or {}) payload = CreateGroupPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
try: try:
g = groups_svc.create_group(name=payload.name, description=payload.description) g = groups_svc.create_group(name=payload.name, description=payload.description)
except groups_svc.GroupNameConflict as e: except groups_svc.GroupNameConflict as e:
@@ -106,7 +106,7 @@ def update_group(group_id: str):
try: try:
payload = UpdateGroupPayload.model_validate(raw) payload = UpdateGroupPayload.model_validate(raw)
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
description_unset = "description" not in raw description_unset = "description" not in raw
try: try:
g = groups_svc.update_group( g = groups_svc.update_group(
@@ -153,7 +153,7 @@ def set_permissions(group_id: str):
try: try:
payload = SetPermissionsPayload.model_validate(request.get_json(silent=True) or {}) payload = SetPermissionsPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
try: try:
g = groups_svc.set_group_permissions(gid, payload.codes) g = groups_svc.set_group_permissions(gid, payload.codes)
except groups_svc.GroupNotFound: except groups_svc.GroupNotFound:

View File

@@ -36,7 +36,7 @@ def create():
try: try:
payload = CreateInvitationPayload.model_validate(request.get_json(silent=True) or {}) payload = CreateInvitationPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
from datetime import timedelta from datetime import timedelta
@@ -124,7 +124,7 @@ def accept(token: str):
try: try:
payload = AcceptInvitationPayload.model_validate(request.get_json(silent=True) or {}) payload = AcceptInvitationPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
try: try:
user_id = inv_svc.accept( user_id = inv_svc.accept(
token, token,

View File

@@ -18,12 +18,13 @@ membership and visibility rules stay identical to M6.
from __future__ import annotations from __future__ import annotations
import logging import logging
import re
import uuid import uuid
from datetime import date, datetime, timezone from datetime import date, datetime, timezone
from typing import Any from typing import Annotated, Any
from flask import Blueprint, abort, g, jsonify, request from flask import Blueprint, abort, g, jsonify, request
from pydantic import BaseModel, Field, ValidationError from pydantic import AfterValidator, BaseModel, Field, ValidationError
from app.core.auth_decorators import AuthenticatedUser, require_auth, require_perm from app.core.auth_decorators import AuthenticatedUser, require_auth, require_perm
from app.services import evidence as evidence_svc from app.services import evidence as evidence_svc
@@ -34,6 +35,39 @@ bp = Blueprint("missions", __name__, url_prefix="/missions")
log = logging.getLogger("metamorph.api.missions") log = logging.getLogger("metamorph.api.missions")
# RFC-shaped email regex — permissive on TLDs so internal/lab domains
# (`.local`, `.corp`, `.test`) pass. We deliberately don't use Pydantic
# `EmailStr`: `email-validator` runs `globally_deliverable=True` by
# default, which rejects everything that's not on the public TLD list
# (lessons.md M2 + the same trap the recipient-email field would hit).
_EMAIL_SHAPE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
def _validate_email_shape(value: str | None) -> str | None:
if value is None or value == "":
return value
v = value.strip()
if not _EMAIL_SHAPE.match(v):
raise ValueError("invalid email shape")
return v
def _ensure_aware_datetime(value: datetime | None) -> datetime | None:
"""Reject naïve datetimes — the column is `timestamptz` and Postgres
would interpret a naïve value in the session timezone, which the M7
fixes deliberately moved away from. Clients must send an explicit
offset (or `Z` suffix). The same rule applies to `executed_at`."""
if value is None:
return value
if value.tzinfo is None:
raise ValueError("datetime must include a timezone offset (e.g. trailing Z)")
return value
_AwareDatetime = Annotated[datetime, AfterValidator(_ensure_aware_datetime)]
_EmailShape = Annotated[str, AfterValidator(_validate_email_shape)]
# --------------------------------------------------------------------------- # # --------------------------------------------------------------------------- #
# Payloads # Payloads
# --------------------------------------------------------------------------- # # --------------------------------------------------------------------------- #
@@ -130,6 +164,28 @@ def _serialize_test(t: svc.MissionTestView) -> dict[str, Any]:
"source_test_template_id": ( "source_test_template_id": (
str(t.source_test_template_id) if t.source_test_template_id else None str(t.source_test_template_id) if t.source_test_template_id else None
), ),
# Annotation fields (post-M7 feedback): included in the nested mission
# detail so the front-end scenario table renders without a per-test
# round trip.
"red_command": t.red_command,
"red_output": t.red_output,
"red_comment_md": t.red_comment_md,
"blue_comment_md": t.blue_comment_md,
"detection_level_id": (
str(t.detection_level_id) if t.detection_level_id else None
),
"detection_level_key": t.detection_level_key,
"blue_log_source": t.blue_log_source,
"blue_siem_logs": t.blue_siem_logs,
"blue_incident_at": (
t.blue_incident_at.isoformat() if t.blue_incident_at else None
),
"blue_incident_number": t.blue_incident_number,
"blue_incident_recipient_email": t.blue_incident_recipient_email,
"last_actor_id": str(t.last_actor_id) if t.last_actor_id else None,
"last_actor_email": t.last_actor_email,
"last_actor_display_name": t.last_actor_display_name,
"updated_at": t.updated_at.isoformat(),
} }
@@ -300,7 +356,7 @@ def create_mission():
try: try:
payload = CreateMissionPayload.model_validate(request.get_json(silent=True) or {}) payload = CreateMissionPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
user = _current_user() user = _current_user()
try: try:
view = svc.create_mission( view = svc.create_mission(
@@ -345,7 +401,7 @@ def update_mission(mission_id: str):
try: try:
payload = UpdateMissionPayload.model_validate(raw) payload = UpdateMissionPayload.model_validate(raw)
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
# Distinguish "not provided" from "explicitly null" by looking at the raw body. # Distinguish "not provided" from "explicitly null" by looking at the raw body.
kwargs: dict[str, Any] = {} kwargs: dict[str, Any] = {}
if "name" in raw and payload.name is not None: if "name" in raw and payload.name is not None:
@@ -383,7 +439,7 @@ def add_scenarios(mission_id: str):
try: try:
payload = AddScenariosPayload.model_validate(request.get_json(silent=True) or {}) payload = AddScenariosPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
user = _current_user() user = _current_user()
try: try:
view = svc.add_scenarios_to_mission( view = svc.add_scenarios_to_mission(
@@ -416,7 +472,7 @@ def set_members(mission_id: str):
try: try:
payload = SetMembersPayload.model_validate(request.get_json(silent=True) or {}) payload = SetMembersPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
user = _current_user() user = _current_user()
try: try:
view = svc.set_mission_members( view = svc.set_mission_members(
@@ -451,7 +507,7 @@ def transition(mission_id: str):
try: try:
payload = TransitionPayload.model_validate(request.get_json(silent=True) or {}) payload = TransitionPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
user = _current_user() user = _current_user()
required = "mission.archive" if payload.status == "archived" else "mission.update" required = "mission.archive" if payload.status == "archived" else "mission.update"
if not user.is_admin and required not in user.permissions: if not user.is_admin and required not in user.permissions:
@@ -515,8 +571,20 @@ class UpdateMissionTestPayload(BaseModel):
red_comment_md: str | None = Field(default=None, max_length=20_000) red_comment_md: str | None = Field(default=None, max_length=20_000)
blue_comment_md: str | None = Field(default=None, max_length=20_000) blue_comment_md: str | None = Field(default=None, max_length=20_000)
detection_level_id: uuid.UUID | None = None detection_level_id: uuid.UUID | None = None
executed_at: datetime | None = None # Both timestamps must be aware (Z or explicit offset). See
# `_ensure_aware_datetime` for why we reject naïve.
executed_at: _AwareDatetime | None = None
executed_at_overridden: bool | None = None executed_at_overridden: bool | None = None
# Post-M7 blue review fields (cf. user feedback 2026-05-15). Free-form
# short text for log_source / incident_number; long text for siem_logs;
# email goes through `_validate_email_shape` (permissive RFC regex —
# we serve internal/lab domains so strict TLD lists are too tight,
# cf. tasks/lessons.md M2).
blue_log_source: str | None = Field(default=None, max_length=120)
blue_siem_logs: str | None = Field(default=None, max_length=200_000)
blue_incident_at: _AwareDatetime | None = None
blue_incident_number: str | None = Field(default=None, max_length=120)
blue_incident_recipient_email: _EmailShape | None = Field(default=None, max_length=255)
model_config = {"extra": "forbid"} model_config = {"extra": "forbid"}
@@ -572,6 +640,13 @@ def _serialize_test_detail(t: test_svc.MissionTestDetailView) -> dict[str, Any]:
str(t.detection_level_id) if t.detection_level_id else None str(t.detection_level_id) if t.detection_level_id else None
), ),
"detection_level_key": t.detection_level_key, "detection_level_key": t.detection_level_key,
"blue_log_source": t.blue_log_source,
"blue_siem_logs": t.blue_siem_logs,
"blue_incident_at": (
t.blue_incident_at.isoformat() if t.blue_incident_at else None
),
"blue_incident_number": t.blue_incident_number,
"blue_incident_recipient_email": t.blue_incident_recipient_email,
"last_actor_id": str(t.last_actor_id) if t.last_actor_id else None, "last_actor_id": str(t.last_actor_id) if t.last_actor_id else None,
"last_actor_email": t.last_actor_email, "last_actor_email": t.last_actor_email,
"last_actor_display_name": t.last_actor_display_name, "last_actor_display_name": t.last_actor_display_name,
@@ -645,7 +720,7 @@ def update_mission_test(mission_id: str, test_id: str):
try: try:
payload = UpdateMissionTestPayload.model_validate(raw) payload = UpdateMissionTestPayload.model_validate(raw)
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
kwargs: dict[str, Any] = {} kwargs: dict[str, Any] = {}
for field in ( for field in (
@@ -656,6 +731,11 @@ def update_mission_test(mission_id: str, test_id: str):
"detection_level_id", "detection_level_id",
"executed_at", "executed_at",
"executed_at_overridden", "executed_at_overridden",
"blue_log_source",
"blue_siem_logs",
"blue_incident_at",
"blue_incident_number",
"blue_incident_recipient_email",
): ):
if field in raw: if field in raw:
kwargs[field] = getattr(payload, field) kwargs[field] = getattr(payload, field)
@@ -710,7 +790,7 @@ def transition_mission_test(mission_id: str, test_id: str):
try: try:
payload = TestTransitionPayload.model_validate(request.get_json(silent=True) or {}) payload = TestTransitionPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
user = _current_user() user = _current_user()
try: try:

View File

@@ -125,7 +125,7 @@ def create_scenario_template():
try: try:
payload = CreateScenarioPayload.model_validate(request.get_json(silent=True) or {}) payload = CreateScenarioPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
try: try:
view = svc.create_scenario_template( view = svc.create_scenario_template(
name=payload.name, name=payload.name,
@@ -154,7 +154,7 @@ def update_scenario_template(scenario_id: str):
try: try:
payload = UpdateScenarioPayload.model_validate(raw) payload = UpdateScenarioPayload.model_validate(raw)
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
kwargs: dict[str, Any] = {} kwargs: dict[str, Any] = {}
if "name" in raw: if "name" in raw:
kwargs["name"] = payload.name kwargs["name"] = payload.name
@@ -179,7 +179,7 @@ def set_scenario_tests(scenario_id: str):
try: try:
payload = SetTestsPayload.model_validate(request.get_json(silent=True) or {}) payload = SetTestsPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
try: try:
view = svc.set_scenario_tests(sid, payload.test_template_ids) view = svc.set_scenario_tests(sid, payload.test_template_ids)
except svc.ScenarioTemplateNotFound: except svc.ScenarioTemplateNotFound:

View File

@@ -47,7 +47,7 @@ def setup():
try: try:
payload = SetupPayload.model_validate(request.get_json(silent=True) or {}) payload = SetupPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
try: try:
result = bootstrap_admin( result = bootstrap_admin(

View File

@@ -176,7 +176,7 @@ def create_test_template():
try: try:
payload = CreateTestTemplatePayload.model_validate(request.get_json(silent=True) or {}) payload = CreateTestTemplatePayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
try: try:
view = svc.create_test_template( view = svc.create_test_template(
name=payload.name, name=payload.name,
@@ -213,7 +213,7 @@ def update_test_template(template_id: str):
try: try:
payload = UpdateTestTemplatePayload.model_validate(raw) payload = UpdateTestTemplatePayload.model_validate(raw)
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
# Only forward keys actually present in the body — model_validate leaves # Only forward keys actually present in the body — model_validate leaves
# missing fields as None and we can't distinguish "explicitly null" from # missing fields as None and we can't distinguish "explicitly null" from

View File

@@ -147,7 +147,7 @@ def update_user(user_id: str):
try: try:
payload = UpdateUserPayload.model_validate(raw) payload = UpdateUserPayload.model_validate(raw)
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
# Distinguish "key absent" (no change) from "key=null" (clear) for display_name. # Distinguish "key absent" (no change) from "key=null" (clear) for display_name.
display_name_unset = "display_name" not in raw display_name_unset = "display_name" not in raw
@@ -200,7 +200,7 @@ def set_groups(user_id: str):
try: try:
payload = SetGroupsPayload.model_validate(request.get_json(silent=True) or {}) payload = SetGroupsPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e: except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400 return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
try: try:
u = users_svc.set_user_groups(uid, payload.group_ids) u = users_svc.set_user_groups(uid, payload.group_ids)
except users_svc.UserNotFound: except users_svc.UserNotFound:

View File

@@ -212,6 +212,21 @@ class MissionTest(Base, UuidPkMixin, TimestampMixin, SoftDeleteMixin):
ForeignKey("detection_levels.id", ondelete="SET NULL"), ForeignKey("detection_levels.id", ondelete="SET NULL"),
nullable=True, nullable=True,
) )
# Post-M7 blue-side review fields (cf. user feedback 2026-05-15). The
# blue team historically maintained these in an Excel sheet — log source,
# raw SIEM excerpt, plus a small cyber-incident sub-record. All five are
# gated by `mission.write_blue_fields` at the service layer.
blue_log_source: Mapped[str | None] = mapped_column(String(120), nullable=True)
blue_siem_logs: Mapped[str | None] = mapped_column(Text, nullable=True)
blue_incident_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True
)
blue_incident_number: Mapped[str | None] = mapped_column(
String(120), nullable=True
)
blue_incident_recipient_email: Mapped[str | None] = mapped_column(
String(255), nullable=True
)
category_id: Mapped[uuid.UUID | None] = mapped_column( category_id: Mapped[uuid.UUID | None] = mapped_column(
Uuid(as_uuid=True), Uuid(as_uuid=True),
ForeignKey("mission_categories.id", ondelete="SET NULL"), ForeignKey("mission_categories.id", ondelete="SET NULL"),

View File

@@ -165,6 +165,12 @@ class MissionTestDetailView:
blue_comment_md: str | None blue_comment_md: str | None
detection_level_id: uuid.UUID | None detection_level_id: uuid.UUID | None
detection_level_key: str | None detection_level_key: str | None
# Post-M7 blue review fields (cf. user feedback 2026-05-15).
blue_log_source: str | None
blue_siem_logs: str | None
blue_incident_at: datetime | None
blue_incident_number: str | None
blue_incident_recipient_email: str | None
last_actor_id: uuid.UUID | None last_actor_id: uuid.UUID | None
last_actor_email: str | None last_actor_email: str | None
last_actor_display_name: str | None last_actor_display_name: str | None
@@ -348,6 +354,11 @@ def _to_detail_view(
blue_comment_md=test.blue_comment_md, blue_comment_md=test.blue_comment_md,
detection_level_id=test.detection_level_id, detection_level_id=test.detection_level_id,
detection_level_key=level_key, detection_level_key=level_key,
blue_log_source=test.blue_log_source,
blue_siem_logs=test.blue_siem_logs,
blue_incident_at=test.blue_incident_at,
blue_incident_number=test.blue_incident_number,
blue_incident_recipient_email=test.blue_incident_recipient_email,
last_actor_id=test.last_actor_id, last_actor_id=test.last_actor_id,
last_actor_email=last_actor_email, last_actor_email=last_actor_email,
last_actor_display_name=last_actor_display_name, last_actor_display_name=last_actor_display_name,
@@ -448,7 +459,15 @@ def list_activity_since(
# Side membership for each writable field (mirror of the spec's red/blue split). # Side membership for each writable field (mirror of the spec's red/blue split).
_RED_FIELDS = {"red_command", "red_output", "red_comment_md", _RED_FIELDS = {"red_command", "red_output", "red_comment_md",
"executed_at", "executed_at_overridden"} "executed_at", "executed_at_overridden"}
_BLUE_FIELDS = {"blue_comment_md", "detection_level_id"} _BLUE_FIELDS = {
"blue_comment_md",
"detection_level_id",
"blue_log_source",
"blue_siem_logs",
"blue_incident_at",
"blue_incident_number",
"blue_incident_recipient_email",
}
def _classify_fields(touched: set[str]) -> tuple[bool, bool]: def _classify_fields(touched: set[str]) -> tuple[bool, bool]:
@@ -474,6 +493,11 @@ def update_mission_test_fields(
detection_level_id: Any = _UNSET, detection_level_id: Any = _UNSET,
executed_at: Any = _UNSET, executed_at: Any = _UNSET,
executed_at_overridden: Any = _UNSET, executed_at_overridden: Any = _UNSET,
blue_log_source: Any = _UNSET,
blue_siem_logs: Any = _UNSET,
blue_incident_at: Any = _UNSET,
blue_incident_number: Any = _UNSET,
blue_incident_recipient_email: Any = _UNSET,
) -> MissionTestDetailView: ) -> MissionTestDetailView:
"""Patch any subset of the red/blue annotation fields. """Patch any subset of the red/blue annotation fields.
@@ -495,6 +519,16 @@ def update_mission_test_fields(
touched.add("executed_at") touched.add("executed_at")
if executed_at_overridden is not _UNSET: if executed_at_overridden is not _UNSET:
touched.add("executed_at_overridden") touched.add("executed_at_overridden")
if blue_log_source is not _UNSET:
touched.add("blue_log_source")
if blue_siem_logs is not _UNSET:
touched.add("blue_siem_logs")
if blue_incident_at is not _UNSET:
touched.add("blue_incident_at")
if blue_incident_number is not _UNSET:
touched.add("blue_incident_number")
if blue_incident_recipient_email is not _UNSET:
touched.add("blue_incident_recipient_email")
needs_red, needs_blue = _classify_fields(touched) needs_red, needs_blue = _classify_fields(touched)
if not viewer_is_admin: if not viewer_is_admin:
@@ -534,6 +568,27 @@ def update_mission_test_fields(
raise InvalidTestPayload("unknown detection_level_id") raise InvalidTestPayload("unknown detection_level_id")
test.detection_level_id = detection_level_id test.detection_level_id = detection_level_id
# Post-M7 blue-side review fields — short text + long text + a small
# cyber-incident sub-record. Email is sanity-checked at the API layer
# via Pydantic; the service just normalises empty strings to NULL.
if "blue_log_source" in touched:
test.blue_log_source = _opt_md(blue_log_source)
if "blue_siem_logs" in touched:
# SIEM excerpts can legitimately have leading whitespace inside
# the body (table-like log lines), so use the command-style
# normaliser that only collapses purely-empty strings to NULL.
test.blue_siem_logs = _opt_cmd(blue_siem_logs)
if "blue_incident_at" in touched:
if blue_incident_at is not None and not isinstance(blue_incident_at, datetime):
raise InvalidTestPayload("blue_incident_at must be an ISO datetime")
test.blue_incident_at = blue_incident_at
if "blue_incident_number" in touched:
test.blue_incident_number = _opt_md(blue_incident_number)
if "blue_incident_recipient_email" in touched:
test.blue_incident_recipient_email = _opt_md(
blue_incident_recipient_email
)
if "executed_at_overridden" in touched or "executed_at" in touched: if "executed_at_overridden" in touched or "executed_at" in touched:
# Editing executed_at is a red-only privilege and only valid when # Editing executed_at is a red-only privilege and only valid when
# the test is past the `executed` milestone. Spec M7: override is # the test is past the `executed` milestone. Spec M7: override is

View File

@@ -129,6 +129,24 @@ class MissionTestView:
executed_at_overridden: bool executed_at_overridden: bool
mitre_tags: list[MissionMitreTagView] mitre_tags: list[MissionMitreTagView]
source_test_template_id: uuid.UUID | None source_test_template_id: uuid.UUID | None
# Annotation fields are surfaced here so the mission detail page can
# render the full scenario table without a per-test round trip (the
# batch lookups for detection_level_key + last_actor below stay O(1)).
red_command: str | None
red_output: str | None
red_comment_md: str | None
blue_comment_md: str | None
detection_level_id: uuid.UUID | None
detection_level_key: str | None
blue_log_source: str | None
blue_siem_logs: str | None
blue_incident_at: datetime | None
blue_incident_number: str | None
blue_incident_recipient_email: str | None
last_actor_id: uuid.UUID | None
last_actor_email: str | None
last_actor_display_name: str | None
updated_at: datetime
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -530,14 +548,60 @@ def _member_views(s: Session, mission: Mission) -> list[MissionMemberView]:
return out return out
def _scenario_views(scenarios: list[MissionScenario]) -> list[MissionScenarioView]: def _scenario_views(
s: Session, scenarios: list[MissionScenario]
) -> list[MissionScenarioView]:
"""Assemble scenario views. `mission_scenarios` and `mission_tests` both """Assemble scenario views. `mission_scenarios` and `mission_tests` both
carry `SoftDeleteMixin`; M6 doesn't surface soft-deletion of those rows in carry `SoftDeleteMixin`; M6 doesn't surface soft-deletion of those rows in
any endpoint, but the filter is applied here so future deletions (M7+) any endpoint, but the filter is applied here so future deletions (M7+)
don't drift the rendered list silently.""" don't drift the rendered list silently.
The annotation fields (red/blue review state) are surfaced too so the
front-end scenario table renders in a single GET. To keep the call O(1)
in the number of tests, the detection-level keys and last-actor labels
are batch-loaded.
"""
from app.models.auth import User as _User # noqa: PLC0415 — local import to avoid a cycle
from app.models.setting import DetectionLevel as _DetectionLevel # noqa: PLC0415
live_scenarios = [sc for sc in scenarios if sc.deleted_at is None]
if not live_scenarios:
return []
# Collect every (test) annotation FK upfront so we can batch the two
# extra queries (detection levels + last-actor users) instead of doing
# one s.get() per row.
level_ids: set[uuid.UUID] = set()
actor_ids: set[uuid.UUID] = set()
for sc in live_scenarios:
for t in sc.tests:
if t.deleted_at is not None:
continue
if t.detection_level_id is not None:
level_ids.add(t.detection_level_id)
if t.last_actor_id is not None:
actor_ids.add(t.last_actor_id)
level_keys: dict[uuid.UUID, str] = {}
if level_ids:
for row in s.execute(
select(_DetectionLevel.id, _DetectionLevel.key).where(
_DetectionLevel.id.in_(level_ids)
)
).all():
level_keys[row.id] = row.key
actors: dict[uuid.UUID, tuple[str, str | None]] = {}
if actor_ids:
for row in s.execute(
select(_User.id, _User.email, _User.display_name).where(
_User.id.in_(actor_ids)
)
).all():
actors[row.id] = (row.email, row.display_name)
views: list[MissionScenarioView] = [] views: list[MissionScenarioView] = []
live = [sc for sc in scenarios if sc.deleted_at is None] for sc in sorted(live_scenarios, key=lambda s_: s_.position):
for sc in sorted(live, key=lambda s_: s_.position):
test_views: list[MissionTestView] = [] test_views: list[MissionTestView] = []
live_tests = [t for t in sc.tests if t.deleted_at is None] live_tests = [t for t in sc.tests if t.deleted_at is None]
for t in sorted(live_tests, key=lambda t_: t_.position): for t in sorted(live_tests, key=lambda t_: t_.position):
@@ -552,6 +616,10 @@ def _scenario_views(scenarios: list[MissionScenario]) -> list[MissionScenarioVie
t.mitre_tags, key=lambda tg: (tg.mitre_kind, tg.mitre_external_id) t.mitre_tags, key=lambda tg: (tg.mitre_kind, tg.mitre_external_id)
) )
] ]
actor_email: str | None = None
actor_display: str | None = None
if t.last_actor_id is not None and t.last_actor_id in actors:
actor_email, actor_display = actors[t.last_actor_id]
test_views.append( test_views.append(
MissionTestView( MissionTestView(
id=t.id, id=t.id,
@@ -571,6 +639,25 @@ def _scenario_views(scenarios: list[MissionScenario]) -> list[MissionScenarioVie
executed_at_overridden=t.executed_at_overridden, executed_at_overridden=t.executed_at_overridden,
mitre_tags=tag_views, mitre_tags=tag_views,
source_test_template_id=t.source_test_template_id, source_test_template_id=t.source_test_template_id,
red_command=t.red_command,
red_output=t.red_output,
red_comment_md=t.red_comment_md,
blue_comment_md=t.blue_comment_md,
detection_level_id=t.detection_level_id,
detection_level_key=(
level_keys.get(t.detection_level_id)
if t.detection_level_id
else None
),
blue_log_source=t.blue_log_source,
blue_siem_logs=t.blue_siem_logs,
blue_incident_at=t.blue_incident_at,
blue_incident_number=t.blue_incident_number,
blue_incident_recipient_email=t.blue_incident_recipient_email,
last_actor_id=t.last_actor_id,
last_actor_email=actor_email,
last_actor_display_name=actor_display,
updated_at=t.updated_at,
) )
) )
views.append( views.append(
@@ -589,7 +676,7 @@ def _scenario_views(scenarios: list[MissionScenario]) -> list[MissionScenarioVie
def _to_detail_view(s: Session, m: Mission) -> MissionView: def _to_detail_view(s: Session, m: Mission) -> MissionView:
scenarios = [sc for sc in m.scenarios if sc.deleted_at is None] scenarios = [sc for sc in m.scenarios if sc.deleted_at is None]
members = _member_views(s, m) members = _member_views(s, m)
scenario_views = _scenario_views(scenarios) scenario_views = _scenario_views(s, scenarios)
tests_count = sum(len(sc.tests) for sc in scenario_views) tests_count = sum(len(sc.tests) for sc in scenario_views)
return MissionView( return MissionView(
id=m.id, id=m.id,

View File

@@ -385,6 +385,142 @@ def test_blue_user_cannot_write_red_fields(client, admin_token, catalogue, blue_
assert r.status_code == 403 assert r.status_code == 403
def test_blue_user_writes_new_blue_review_fields(
client, admin_token, catalogue, blue_user
):
"""Post-M7 feedback fields: log source, SIEM excerpt, cyber-incident
sub-record. All are blue-side, gated by `mission.write_blue_fields`."""
mission = _make_mission(
client, admin_token, name="m7-blue-extras",
scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"],
)
tid = _first_test_id(mission)
body = {
"blue_log_source": "EDR · CrowdStrike Falcon",
"blue_siem_logs": "2026-05-15 10:30:42 WIN-DC01 evt=4688 cmd=powershell -enc ...",
"blue_incident_at": "2026-05-15T11:00:00+00:00",
"blue_incident_number": "INC-2026-1234",
"blue_incident_recipient_email": "soc-night@metamorph.local",
}
r = client.put(
f"/api/v1/missions/{mission['id']}/tests/{tid}",
headers=_bearer(blue_user["token"]),
json=body,
)
assert r.status_code == 200, r.get_data(as_text=True)
out = r.get_json()
assert out["blue_log_source"] == body["blue_log_source"]
assert out["blue_siem_logs"] == body["blue_siem_logs"]
assert out["blue_incident_at"].startswith("2026-05-15T11:00:00")
assert out["blue_incident_number"] == body["blue_incident_number"]
assert out["blue_incident_recipient_email"] == body["blue_incident_recipient_email"]
def test_red_user_cannot_write_new_blue_review_fields(
client, admin_token, catalogue, red_user
):
"""Each of the five new fields is classified as blue-side; a red-only
caller must receive 403 individually for each one."""
mission = _make_mission(
client, admin_token, name="m7-blue-extras-perm",
scenario_id=catalogue["scenario"]["id"], red_id=red_user["id"],
)
tid = _first_test_id(mission)
bad_bodies = [
{"blue_log_source": "Firewall"},
{"blue_siem_logs": "log line"},
{"blue_incident_at": "2026-05-15T11:00:00+00:00"},
{"blue_incident_number": "INC-1"},
{"blue_incident_recipient_email": "x@y.test"},
]
for body in bad_bodies:
r = client.put(
f"/api/v1/missions/{mission['id']}/tests/{tid}",
headers=_bearer(red_user["token"]),
json=body,
)
assert r.status_code == 403, (body, r.get_data(as_text=True))
def test_blue_incident_at_rejects_naive_datetime(
client, admin_token, catalogue, blue_user
):
"""A naïve datetime (no TZ offset) is rejected with 400 — Postgres would
otherwise interpret it in the session TZ, defeating the M7 verbatim-time
contract. Same rule applies to executed_at (covered by a separate red
test below)."""
mission = _make_mission(
client, admin_token, name="m7-naive-incident",
scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"],
)
tid = _first_test_id(mission)
r = client.put(
f"/api/v1/missions/{mission['id']}/tests/{tid}",
headers=_bearer(blue_user["token"]),
json={"blue_incident_at": "2026-05-15T11:00:00"}, # no offset
)
assert r.status_code == 400, r.get_data(as_text=True)
def test_blue_incident_recipient_email_validates_shape(
client, admin_token, catalogue, blue_user
):
"""Bad-shape email is rejected; well-formed internal address (`.local`,
`.corp`) is accepted — we deliberately don't use Pydantic EmailStr
because email-validator rejects internal TLDs."""
mission = _make_mission(
client, admin_token, name="m7-email-shape",
scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"],
)
tid = _first_test_id(mission)
bad = client.put(
f"/api/v1/missions/{mission['id']}/tests/{tid}",
headers=_bearer(blue_user["token"]),
json={"blue_incident_recipient_email": "not-an-email"},
)
assert bad.status_code == 400
ok = client.put(
f"/api/v1/missions/{mission['id']}/tests/{tid}",
headers=_bearer(blue_user["token"]),
json={"blue_incident_recipient_email": "soc@internal.local"},
)
assert ok.status_code == 200
assert ok.get_json()["blue_incident_recipient_email"] == "soc@internal.local"
def test_blue_review_fields_survive_round_trip_via_get(
client, admin_token, catalogue, blue_user
):
"""After a PUT the same values must come back on a fresh GET — guards
against a future serializer drift that would silently drop one of the
new columns from the response."""
mission = _make_mission(
client, admin_token, name="m7-blue-extras-rt",
scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"],
)
tid = _first_test_id(mission)
body = {
"blue_log_source": "Proxy",
"blue_siem_logs": "raw\n indented\nthird",
"blue_incident_number": "INC-rt",
}
put_r = client.put(
f"/api/v1/missions/{mission['id']}/tests/{tid}",
headers=_bearer(blue_user["token"]),
json=body,
)
assert put_r.status_code == 200
get_r = client.get(
f"/api/v1/missions/{mission['id']}/tests/{tid}",
headers=_bearer(blue_user["token"]),
)
assert get_r.status_code == 200
after = get_r.get_json()
for k, v in body.items():
assert after[k] == v, k
def test_blue_user_writes_blue_fields_and_picks_detection_level( def test_blue_user_writes_blue_fields_and_picks_detection_level(
client, admin_token, catalogue, blue_user client, admin_token, catalogue, blue_user
): ):

View File

@@ -1,11 +1,17 @@
--- ---
type: spec type: spec
date: "2026-05-08" date: "2026-05-08"
revised: "2026-05-15"
tags: [spec, ready] tags: [spec, ready]
status: ready status: ready
project: Metamorph project: Metamorph
--- ---
> **2026-05-15 amendment** — added 5 blue-side review fields to the M7
> scope (cf. §4 in-scope bullet on blue saisie, §F6, §8 model) and
> reworked the per-test UX to a full-bleed tabular view with inline edit.
> All other commitments stand. Detailed delta at the bottom of §4.
# Metamorph — Spec # Metamorph — Spec
> Spec finalisée après tour de questions du 2026-05-08. §12 et §13 vides : prête pour l'exécution. Le tracking quotidien bascule sur `Templates/Project.md`. > Spec finalisée après tour de questions du 2026-05-08. §12 et §13 vides : prête pour l'exécution. Le tracking quotidien bascule sur `Templates/Project.md`.
@@ -46,6 +52,7 @@ Remplace l'aller-retour Excel actuel par une UI partagée temps réel, multi-rô
- Snapshot des templates au moment de l'instanciation dans une mission (modifier un template ne touche pas les missions existantes). - Snapshot des templates au moment de l'instanciation dans une mission (modifier un template ne touche pas les missions existantes).
- Saisie des résultats red (texte uniquement : commande, output, commentaires) avec horodatage auto au clic « Marquer exécuté » + override manuel. - Saisie des résultats red (texte uniquement : commande, output, commentaires) avec horodatage auto au clic « Marquer exécuté » + override manuel.
- Saisie des preuves blue : multi-fichiers (PNG/JPG/PDF/TXT/LOG/JSON/CSV/EVTX/ZIP, max 25 Mo/fichier, SHA256 stocké) + commentaires markdown + niveau de détection (taxonomie custom paramétrable par admin, seed par défaut : `detected_blocked / detected_alert / logged_only / not_detected`). - Saisie des preuves blue : multi-fichiers (PNG/JPG/PDF/TXT/LOG/JSON/CSV/EVTX/ZIP, max 25 Mo/fichier, SHA256 stocké) + commentaires markdown + niveau de détection (taxonomie custom paramétrable par admin, seed par défaut : `detected_blocked / detected_alert / logged_only / not_detected`).
- **Saisie côté blue — fiche de review étendue (amendement 2026-05-15)** : en plus du commentaire et du niveau de détection, la fiche de review d'un test capture les 5 champs additionnels que la blue maintenait en Excel — **`log_source`** (texte court : Firewall / NDR / Proxy / AV / EDR / …), **`siem_logs`** (texte long, extrait brut de logs collectés au SIEM), **sous-record cyber-incident** `(incident_at: timestamptz, incident_number: texte court, incident_recipient_email: email)` qui matérialise l'alerte envoyée à l'équipe SOC. Tous ces champs sont blue-side et gated par `mission.write_blue_fields`.
- Workflow par test instance : `pending → executed → reviewed_by_blue` + voies `skipped / blocked`. - Workflow par test instance : `pending → executed → reviewed_by_blue` + voies `skipped / blocked`.
- Visibilité mission : whitebox totale pour la blue team dès la création (pas de masquage des procédures). - Visibilité mission : whitebox totale pour la blue team dès la création (pas de masquage des procédures).
- Édition concurrente : last-write-wins + indicateur « modifié par X il y a Ns » via polling léger. Conflits red/blue impossibles par construction (champs disjoints). - Édition concurrente : last-write-wins + indicateur « modifié par X il y a Ns » via polling léger. Conflits red/blue impossibles par construction (champs disjoints).
@@ -90,7 +97,7 @@ Remplace l'aller-retour Excel actuel par une UI partagée temps réel, multi-rô
- **F3** — CRUD scénarios = liste ordonnée (drag-and-drop) de tests unitaires. - **F3** — CRUD scénarios = liste ordonnée (drag-and-drop) de tests unitaires.
- **F4** — CRUD missions (métadonnées §4) composées d'un ou plusieurs scénarios, snapshot des templates à l'instanciation. - **F4** — CRUD missions (métadonnées §4) composées d'un ou plusieurs scénarios, snapshot des templates à l'instanciation.
- **F5** — Saisie côté red : commande lancée, output texte, commentaires markdown, statut, timestamp auto+override. - **F5** — Saisie côté red : commande lancée, output texte, commentaires markdown, statut, timestamp auto+override.
- **F6** — Saisie côté blue : niveau de détection (enum custom plateforme), commentaires markdown, multi-fichiers (whitelist). - **F6** — Saisie côté blue : niveau de détection (enum custom plateforme), commentaires markdown, multi-fichiers (whitelist), **source de log** (texte court, max 120 caractères, free-form en v1 — promu en taxonomie M8+), **extrait SIEM** (texte long, max 200_000 caractères côté API), **sous-record cyber-incident** dont les 3 champs sont **indépendants et tous optionnels** : `incident_at` (timestamptz, exige un offset explicite — `Z` ou `+HH:MM` ; les naïves sont rejetées en 400), `incident_number` (texte court, max 120), `incident_recipient_email` (texte avec validation RFC-shape permissive — autorise `.local` / `.corp` / `.test` pour les domaines internes). UI: **vue tabulaire pleine largeur d'écran** (échappe le `max-w-page` du layout) à raison d'**un tableau par scénario, une ligne par test**. **Colonnes** : `Test | Procédure | Exécution | Source de log | Commentaires | Logs SIEM | Cyber Incident`. Le `detection_level` est rendu **dans la cellule Commentaires** sous forme de pill colorée au-dessus du commentaire (pas de 8ᵉ colonne). **Édition inline** : double-clic sur une ligne → un *seul* row passe en édition à la fois (les autres restent en lecture, le double-clic d'une autre ligne propose de save/discard la précédente) ; les cellules deviennent des inputs gated par les perms red/blue de l'utilisateur ; **Esc** annule (revert vers le snapshot serveur), **Save** commit, **clic en dehors** prompt si dirty. La page détail d'un test (`/missions/{id}/tests/{test_id}`) reste accessible pour l'upload de preuves (dropzone + table).
- **F7** — Génération slide reveal.js standalone + export PDF client, groupé par MITRE Tactic (custom optionnel). - **F7** — Génération slide reveal.js standalone + export PDF client, groupé par MITRE Tactic (custom optionnel).
- **F8** — Notifications in-app (badge + flux) à chaque transition de statut d'un test concernant l'utilisateur. - **F8** — Notifications in-app (badge + flux) à chaque transition de statut d'un test concernant l'utilisateur.
- **F9** — Export mission : JSON complet (API + UI), CSV agrégé. - **F9** — Export mission : JSON complet (API + UI), CSV agrégé.
@@ -129,7 +136,7 @@ Remplace l'aller-retour Excel actuel par une UI partagée temps réel, multi-rô
- `users`, `groups`, `permissions`, `user_groups`, `group_permissions`, `invitations` - `users`, `groups`, `permissions`, `user_groups`, `group_permissions`, `invitations`
- `mitre_tactics`, `mitre_techniques`, `mitre_subtechniques` - `mitre_tactics`, `mitre_techniques`, `mitre_subtechniques`
- `test_templates`, `scenario_templates`, `scenario_template_tests` (jointure ordonnée) - `test_templates`, `scenario_templates`, `scenario_template_tests` (jointure ordonnée)
- `missions`, `mission_members`, `mission_scenarios` (snapshot), `mission_tests` (snapshot + state d'exécution), `mission_categories` (custom) - `missions`, `mission_members`, `mission_scenarios` (snapshot), `mission_tests` (snapshot + state d'exécution + annotations red `red_command/red_output/red_comment_md` + annotations blue `blue_comment_md/detection_level_id/blue_log_source/blue_siem_logs/blue_incident_at/blue_incident_number/blue_incident_recipient_email`), `mission_categories` (custom)
- `evidence_files` (FK `mission_test_id`, sha256, mime, size, path) - `evidence_files` (FK `mission_test_id`, sha256, mime, size, path)
- `notifications` (in-app) - `notifications` (in-app)
- `detection_levels` (taxonomie custom, seedée avec 4 niveaux par défaut) - `detection_levels` (taxonomie custom, seedée avec 4 niveaux par défaut)

View File

@@ -149,7 +149,7 @@ spec: tasks/spec.md
--- ---
## M7 — Saisie red & blue sur un test ☑ ## M7 — Saisie red & blue sur un test ☑ (+ amendement 2026-05-15 ↻)
**But** : exécution de la mission, le cœur du produit. **But** : exécution de la mission, le cœur du produit.
@@ -164,6 +164,18 @@ spec: tasks/spec.md
**DoD** : red et blue saisissent en parallèle sans conflit ; un user sans `write_blue_fields` reçoit 403 sur les champs blue ; un fichier .evtx de 24 Mo est uploadé, un de 26 Mo est rejeté ; le hash SHA256 est correct. **DoD** : red et blue saisissent en parallèle sans conflit ; un user sans `write_blue_fields` reçoit 403 sur les champs blue ; un fichier .evtx de 24 Mo est uploadé, un de 26 Mo est rejeté ; le hash SHA256 est correct.
### Amendement 2026-05-15 — fiche de review blue étendue + vue tabulaire
- ☑ Migration `c2a8f4b1d6e9` : 5 nouvelles colonnes sur `mission_tests``blue_log_source` (varchar 120), `blue_siem_logs` (text), `blue_incident_at` (timestamptz), `blue_incident_number` (varchar 120), `blue_incident_recipient_email` (varchar 255).
- ☑ Service `mission_tests` : `_BLUE_FIELDS` étendu aux 5 nouveaux champs, `update_mission_test_fields` accepte chaque kwarg, perm gating identique (red user → 403 sur chaque champ).
- ☑ Service `missions` : `MissionTestView` (vue nested dans `GET /missions/{id}`) inclut désormais toutes les annotations red/blue + `last_actor_*` + `updated_at` + `detection_level_key`, avec batch-lookup détection-level/users pour rester O(1) en nombre de tests.
- ☑ API : `UpdateMissionTestPayload` + serializer `_serialize_test` / `_serialize_test_detail` mis à jour, validation length-cap par champ.
- ☑ Tests pytest : 3 nouveaux (`test_blue_user_writes_new_blue_review_fields`, `test_red_user_cannot_write_new_blue_review_fields`, `test_blue_review_fields_survive_round_trip_via_get`) — 136 verts.
- ☐ Frontend : vue tabulaire pleine largeur dans l'onglet **tests** du detail page, un tableau par scénario, colonnes `Test | Procédure | Exécution | Source de log | Commentaires | Logs SIEM | Cyber Incident`, double-clic = mode édition inline gated par perms. La page `/missions/<id>/tests/<test_id>` reste pour l'upload de preuves.
- ☐ Docs : CHANGELOG section dédiée, testing-m7.md mise à jour pour la matrice de colonnes + le workflow d'édition inline.
**DoD amendement** : blue user double-clique sur une ligne, saisit `log_source` + `siem_logs` + le sous-record incident, sauve ; rafraîchir la mission → tout est persisté ; red user double-clique sur la même ligne → ne peut éditer que `Exécution` (`executed_at`, `red_command`).
--- ---
## M8 — Niveaux de détection custom ☐ ## M8 — Niveaux de détection custom ☐