Files
Metamorph/backend/app/api/scenario_templates.py
Knacky 447f15213a feat(m7): blue review fields + spec amendment + reviewer follow-ups
User feedback after the M7 ship: blue team's Excel workflow had 5 extra
fields we didn't capture. Per-test page also doesn't match their
workflow — they need a tabular view, one table per scenario.

Spec
- tasks/spec.md amended (`revised: 2026-05-15`): §4 in-scope, §F6, §8
  model bullet. §F6 now pins the column matrix, single-row-edit
  semantics, Esc-cancel, blur-confirm, and reconciles detection_level
  as a pill inside the Commentaires cell (no 8th column).
- tasks/todo.md M7 section grew an "Amendement 2026-05-15" sub-block
  tracking backend ☑ and frontend ☐.

Backend
- Migration c2a8f4b1d6e9: 5 nullable columns on mission_tests
  (blue_log_source, blue_siem_logs, blue_incident_at,
  blue_incident_number, blue_incident_recipient_email).
- _BLUE_FIELDS extended; update_mission_test_fields propagates each
  field; MissionTestDetailView + MissionTestView (the nested view in
  GET /missions/{id}) surface every annotation field, plus
  last_actor_*, updated_at, detection_level_key — O(1) batch lookup
  for detection-level keys and last-actor users keeps it scalable.
- UpdateMissionTestPayload accepts each field with length caps
  (120/200_000/120/255).

Reviewer follow-ups applied
- blue_incident_at + executed_at now reject naïve datetimes
  (_ensure_aware_datetime) — Postgres would otherwise interpret
  them in the session TZ, defeating the M7 verbatim-time contract.
- blue_incident_recipient_email goes through a permissive RFC-shape
  regex (_validate_email_shape) so internal/lab TLDs like .local
  / .corp / .test pass — Pydantic EmailStr is too strict (lessons.md
  M2 trap).
- Project-wide: switched `e.errors()` to
  `e.errors(include_context=False, include_url=False)` because the
  AfterValidator-raised ValueError lands in ctx and Flask can't
  serialize it.

Tests
- 5 new pytest cases: blue user writes the 5 new fields, red user is
  individually 403'd on each, round-trip via GET, naïve datetime
  rejected, email shape validated (.local accepted, bad shape 400).
- 138 pytest green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 14:45:18 +02:00

209 lines
7.0 KiB
Python

"""Scenario-template CRUD + reorder endpoints.
`PUT /<id>/tests` is the reorder/replace endpoint — it takes the full ordered
list and rewrites the join rows. There's no partial mutation API for the test
list: the wire contract is simpler and the client (drag-and-drop) already
holds the full ordering.
"""
from __future__ import annotations
import logging
import uuid
from typing import Any
from flask import Blueprint, jsonify, request
from pydantic import BaseModel, Field, ValidationError
from app.core.auth_decorators import require_auth, require_perm
from app.services import scenario_templates as svc
bp = Blueprint("scenario_templates", __name__, url_prefix="/scenario-templates")
log = logging.getLogger("metamorph.api.scenario_templates")
class CreateScenarioPayload(BaseModel):
name: str = Field(min_length=1, max_length=255)
description: str | None = Field(default=None, max_length=4000)
test_template_ids: list[uuid.UUID] = Field(default_factory=list, max_length=512)
model_config = {"extra": "forbid"}
class UpdateScenarioPayload(BaseModel):
name: str | None = Field(default=None, min_length=1, max_length=255)
description: str | None = Field(default=None, max_length=4000)
model_config = {"extra": "forbid"}
class SetTestsPayload(BaseModel):
test_template_ids: list[uuid.UUID] = Field(default_factory=list, max_length=512)
model_config = {"extra": "forbid"}
def _serialize(sc: svc.ScenarioTemplateView) -> dict[str, Any]:
return {
"id": str(sc.id),
"name": sc.name,
"description": sc.description,
"tests": [
{
"position": t.position,
"test_template_id": str(t.test_template_id),
"test_template_name": t.test_template_name,
"test_template_deleted": t.test_template_deleted,
}
for t in sc.tests
],
"tests_count": sc.tests_count,
"deleted_at": sc.deleted_at.isoformat() if sc.deleted_at else None,
"created_at": sc.created_at.isoformat(),
"updated_at": sc.updated_at.isoformat(),
}
def _parse_uuid_or_400(raw: str):
try:
return uuid.UUID(raw)
except ValueError:
return None
def _pagination_args() -> tuple[int, int] | tuple[None, tuple[int, str]]:
try:
limit = int(request.args.get("limit", "100"))
offset = int(request.args.get("offset", "0"))
except ValueError:
return None, (400, "invalid_pagination")
return max(1, min(limit, 500)), max(0, offset)
@bp.get("")
@require_auth
@require_perm("scenario_template.read")
def list_scenario_templates():
paging = _pagination_args()
if paging[0] is None:
return jsonify({"error": paging[1][1]}), paging[1][0]
limit, offset = paging
q = request.args.get("q") or None
include_deleted = request.args.get("include_deleted", "false").lower() == "true"
items, total = svc.list_scenario_templates(
q=q, include_deleted=include_deleted, limit=limit, offset=offset
)
return jsonify(
{
"items": [_serialize(it) for it in items],
"total": total,
"limit": limit,
"offset": offset,
}
)
@bp.get("/<scenario_id>")
@require_auth
@require_perm("scenario_template.read")
def get_scenario_template(scenario_id: str):
sid = _parse_uuid_or_400(scenario_id)
if sid is None:
return jsonify({"error": "invalid_id"}), 400
include_deleted = request.args.get("include_deleted", "false").lower() == "true"
try:
view = svc.get_scenario_template(sid, include_deleted=include_deleted)
except svc.ScenarioTemplateNotFound:
return jsonify({"error": "not_found"}), 404
return jsonify(_serialize(view))
@bp.post("")
@require_auth
@require_perm("scenario_template.create")
def create_scenario_template():
try:
payload = CreateScenarioPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
try:
view = svc.create_scenario_template(
name=payload.name,
description=payload.description,
test_template_ids=payload.test_template_ids,
)
except svc.UnknownTestTemplate as e:
return jsonify({"error": "unknown_test_template", "message": str(e)}), 400
except ValueError as e:
return jsonify({"error": "invalid_request", "message": str(e)}), 400
log.info(
"metamorph.scenario_template.created",
extra={"id": str(view.id), "tests": len(view.tests)},
)
return jsonify(_serialize(view)), 201
@bp.patch("/<scenario_id>")
@require_auth
@require_perm("scenario_template.update")
def update_scenario_template(scenario_id: str):
sid = _parse_uuid_or_400(scenario_id)
if sid is None:
return jsonify({"error": "invalid_id"}), 400
raw = request.get_json(silent=True) or {}
try:
payload = UpdateScenarioPayload.model_validate(raw)
except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
kwargs: dict[str, Any] = {}
if "name" in raw:
kwargs["name"] = payload.name
if "description" in raw:
kwargs["description"] = payload.description
try:
view = svc.update_scenario_template(sid, **kwargs)
except svc.ScenarioTemplateNotFound:
return jsonify({"error": "not_found"}), 404
except ValueError as e:
return jsonify({"error": "invalid_request", "message": str(e)}), 400
return jsonify(_serialize(view))
@bp.put("/<scenario_id>/tests")
@require_auth
@require_perm("scenario_template.update")
def set_scenario_tests(scenario_id: str):
sid = _parse_uuid_or_400(scenario_id)
if sid is None:
return jsonify({"error": "invalid_id"}), 400
try:
payload = SetTestsPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
try:
view = svc.set_scenario_tests(sid, payload.test_template_ids)
except svc.ScenarioTemplateNotFound:
return jsonify({"error": "not_found"}), 404
except svc.UnknownTestTemplate as e:
return jsonify({"error": "unknown_test_template", "message": str(e)}), 400
log.info(
"metamorph.scenario_template.tests_set",
extra={"id": str(sid), "tests": len(view.tests)},
)
return jsonify(_serialize(view))
@bp.delete("/<scenario_id>")
@require_auth
@require_perm("scenario_template.delete")
def soft_delete_scenario_template(scenario_id: str):
sid = _parse_uuid_or_400(scenario_id)
if sid is None:
return jsonify({"error": "invalid_id"}), 400
try:
svc.soft_delete_scenario_template(sid)
except svc.ScenarioTemplateNotFound:
return jsonify({"error": "not_found"}), 404
log.info("metamorph.scenario_template.soft_deleted", extra={"id": str(sid)})
return jsonify({"ok": True})