Files
Metamorph/backend/app/api/test_templates.py

258 lines
9.7 KiB
Python
Raw Permalink Normal View History

"""Test-template CRUD endpoints.
Reads gated by `test_template.read`. Writes gated by `test_template.{create,
update,delete}`. Service layer handles all DB work; this module only validates
the wire payload and shapes the JSON response.
"""
from __future__ import annotations
import logging
import uuid
from typing import Any
from flask import Blueprint, jsonify, request
fix(m5): post-review pass — AND filter, advisory lock, N+1, item caps, mutation cache Spec-reviewer + code-reviewer findings applied: Must-fix - Filter combinator AND-semantics: tactic+technique+subtechnique now intersect (one IN subquery per facet) instead of being pooled into one OR. Reviewers flagged both the wrong default semantics and the theoretical UUID-collision risk of pooling tactic/technique/sub UUIDs into a shared list across three columns. - Front-end mutation cache hygiene: updateMeta + setTests both `onSettled: invalidate` so a partial failure leaves the cache consistent. Should-fix - Per-scenario pg_advisory_xact_lock on set_scenario_tests — serialises concurrent reorders, mirrors M4 /mitre/sync pattern. - Backend/front consistency on duplicate tests in a scenario: the UNIQUE(scenario_id, position) constraint already allows the same test_template multiple times (chained ops), so the catalogue picker no longer excludes already-picked items. Nice-to-have - N+1 eradicated in test_template view rendering: _to_views_batch builds {uuid → MitreRow} maps in 3 queries up-front; list endpoint now issues 4 queries total regardless of list size. - Wire-level item length caps on tags (64) and expected_iocs (255) via Annotated[str, StringConstraints(...)] — returns 400 instead of bubbling up StringDataRightTruncation. - 4 new pytest covering the AND-filter, extra="forbid" rejection, empty mitre_tags clearing, and the 65-char tag cap. Total now 81 pytest + 38 e2e pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 20:05:00 +02:00
from pydantic import BaseModel, Field, StringConstraints, ValidationError
from typing import Annotated
from app.core.auth_decorators import require_auth, require_perm
from app.services import test_templates as svc
fix(m5): post-review pass — AND filter, advisory lock, N+1, item caps, mutation cache Spec-reviewer + code-reviewer findings applied: Must-fix - Filter combinator AND-semantics: tactic+technique+subtechnique now intersect (one IN subquery per facet) instead of being pooled into one OR. Reviewers flagged both the wrong default semantics and the theoretical UUID-collision risk of pooling tactic/technique/sub UUIDs into a shared list across three columns. - Front-end mutation cache hygiene: updateMeta + setTests both `onSettled: invalidate` so a partial failure leaves the cache consistent. Should-fix - Per-scenario pg_advisory_xact_lock on set_scenario_tests — serialises concurrent reorders, mirrors M4 /mitre/sync pattern. - Backend/front consistency on duplicate tests in a scenario: the UNIQUE(scenario_id, position) constraint already allows the same test_template multiple times (chained ops), so the catalogue picker no longer excludes already-picked items. Nice-to-have - N+1 eradicated in test_template view rendering: _to_views_batch builds {uuid → MitreRow} maps in 3 queries up-front; list endpoint now issues 4 queries total regardless of list size. - Wire-level item length caps on tags (64) and expected_iocs (255) via Annotated[str, StringConstraints(...)] — returns 400 instead of bubbling up StringDataRightTruncation. - 4 new pytest covering the AND-filter, extra="forbid" rejection, empty mitre_tags clearing, and the 65-char tag cap. Total now 81 pytest + 38 e2e pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 20:05:00 +02:00
# Tag and IOC entries are stored as PG ARRAY(String(N)). Cap items at the wire
# layer so over-sized inputs return 400 with a useful message rather than the
# bare StringDataRightTruncation from the driver.
TagStr = Annotated[str, StringConstraints(min_length=1, max_length=64)]
IocStr = Annotated[str, StringConstraints(min_length=1, max_length=255)]
bp = Blueprint("test_templates", __name__, url_prefix="/test-templates")
log = logging.getLogger("metamorph.api.test_templates")
# === Payload schemas ==========================================================
class MitreTagIn(BaseModel):
kind: str = Field(min_length=1)
external_id: str = Field(min_length=1, max_length=16)
model_config = {"extra": "forbid"}
class CreateTestTemplatePayload(BaseModel):
name: str = Field(min_length=1, max_length=255)
description: str | None = Field(default=None, max_length=4000)
objective: str | None = Field(default=None, max_length=4000)
procedure_md: str | None = Field(default=None, max_length=32_000)
prerequisites_md: str | None = Field(default=None, max_length=32_000)
expected_result_red_md: str | None = Field(default=None, max_length=32_000)
expected_detection_blue_md: str | None = Field(default=None, max_length=32_000)
opsec_level: str = Field(default="medium")
fix(m5): post-review pass — AND filter, advisory lock, N+1, item caps, mutation cache Spec-reviewer + code-reviewer findings applied: Must-fix - Filter combinator AND-semantics: tactic+technique+subtechnique now intersect (one IN subquery per facet) instead of being pooled into one OR. Reviewers flagged both the wrong default semantics and the theoretical UUID-collision risk of pooling tactic/technique/sub UUIDs into a shared list across three columns. - Front-end mutation cache hygiene: updateMeta + setTests both `onSettled: invalidate` so a partial failure leaves the cache consistent. Should-fix - Per-scenario pg_advisory_xact_lock on set_scenario_tests — serialises concurrent reorders, mirrors M4 /mitre/sync pattern. - Backend/front consistency on duplicate tests in a scenario: the UNIQUE(scenario_id, position) constraint already allows the same test_template multiple times (chained ops), so the catalogue picker no longer excludes already-picked items. Nice-to-have - N+1 eradicated in test_template view rendering: _to_views_batch builds {uuid → MitreRow} maps in 3 queries up-front; list endpoint now issues 4 queries total regardless of list size. - Wire-level item length caps on tags (64) and expected_iocs (255) via Annotated[str, StringConstraints(...)] — returns 400 instead of bubbling up StringDataRightTruncation. - 4 new pytest covering the AND-filter, extra="forbid" rejection, empty mitre_tags clearing, and the 65-char tag cap. Total now 81 pytest + 38 e2e pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 20:05:00 +02:00
tags: list[TagStr] = Field(default_factory=list, max_length=64)
expected_iocs: list[IocStr] = Field(default_factory=list, max_length=128)
mitre_tags: list[MitreTagIn] = Field(default_factory=list, max_length=64)
model_config = {"extra": "forbid"}
class UpdateTestTemplatePayload(BaseModel):
name: str | None = Field(default=None, min_length=1, max_length=255)
description: str | None = Field(default=None, max_length=4000)
objective: str | None = Field(default=None, max_length=4000)
procedure_md: str | None = Field(default=None, max_length=32_000)
prerequisites_md: str | None = Field(default=None, max_length=32_000)
expected_result_red_md: str | None = Field(default=None, max_length=32_000)
expected_detection_blue_md: str | None = Field(default=None, max_length=32_000)
opsec_level: str | None = None
fix(m5): post-review pass — AND filter, advisory lock, N+1, item caps, mutation cache Spec-reviewer + code-reviewer findings applied: Must-fix - Filter combinator AND-semantics: tactic+technique+subtechnique now intersect (one IN subquery per facet) instead of being pooled into one OR. Reviewers flagged both the wrong default semantics and the theoretical UUID-collision risk of pooling tactic/technique/sub UUIDs into a shared list across three columns. - Front-end mutation cache hygiene: updateMeta + setTests both `onSettled: invalidate` so a partial failure leaves the cache consistent. Should-fix - Per-scenario pg_advisory_xact_lock on set_scenario_tests — serialises concurrent reorders, mirrors M4 /mitre/sync pattern. - Backend/front consistency on duplicate tests in a scenario: the UNIQUE(scenario_id, position) constraint already allows the same test_template multiple times (chained ops), so the catalogue picker no longer excludes already-picked items. Nice-to-have - N+1 eradicated in test_template view rendering: _to_views_batch builds {uuid → MitreRow} maps in 3 queries up-front; list endpoint now issues 4 queries total regardless of list size. - Wire-level item length caps on tags (64) and expected_iocs (255) via Annotated[str, StringConstraints(...)] — returns 400 instead of bubbling up StringDataRightTruncation. - 4 new pytest covering the AND-filter, extra="forbid" rejection, empty mitre_tags clearing, and the 65-char tag cap. Total now 81 pytest + 38 e2e pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 20:05:00 +02:00
tags: list[TagStr] | None = Field(default=None, max_length=64)
expected_iocs: list[IocStr] | None = Field(default=None, max_length=128)
mitre_tags: list[MitreTagIn] | None = Field(default=None, max_length=64)
model_config = {"extra": "forbid"}
# === Serializers ==============================================================
def _serialize(t: svc.TestTemplateView) -> dict[str, Any]:
return {
"id": str(t.id),
"name": t.name,
"description": t.description,
"objective": t.objective,
"procedure_md": t.procedure_md,
"prerequisites_md": t.prerequisites_md,
"expected_result_red_md": t.expected_result_red_md,
"expected_detection_blue_md": t.expected_detection_blue_md,
"opsec_level": t.opsec_level,
"tags": list(t.tags),
"expected_iocs": list(t.expected_iocs),
"mitre_tags": [
{"kind": tag.kind, "external_id": tag.external_id, "name": tag.name, "url": tag.url}
for tag in t.mitre_tags
],
"deleted_at": t.deleted_at.isoformat() if t.deleted_at else None,
"created_at": t.created_at.isoformat(),
"updated_at": t.updated_at.isoformat(),
}
def _parse_uuid_or_400(raw: str):
try:
return uuid.UUID(raw)
except ValueError:
return None
def _pagination_args() -> tuple[int, int] | tuple[None, tuple[int, str]]:
try:
limit = int(request.args.get("limit", "100"))
offset = int(request.args.get("offset", "0"))
except ValueError:
return None, (400, "invalid_pagination")
return max(1, min(limit, 500)), max(0, offset)
# === Endpoints ================================================================
@bp.get("")
@require_auth
@require_perm("test_template.read")
def list_test_templates():
paging = _pagination_args()
if paging[0] is None:
return jsonify({"error": paging[1][1]}), paging[1][0]
limit, offset = paging
q = request.args.get("q") or None
tactic = request.args.get("tactic") or None
technique = request.args.get("technique") or None
subtechnique = request.args.get("subtechnique") or None
opsec_level = request.args.get("opsec") or None
tag = request.args.get("tag") or None
include_deleted = request.args.get("include_deleted", "false").lower() == "true"
try:
items, total = svc.list_test_templates(
q=q,
tactic=tactic,
technique=technique,
subtechnique=subtechnique,
opsec_level=opsec_level,
tag=tag,
include_deleted=include_deleted,
limit=limit,
offset=offset,
)
except ValueError as e:
return jsonify({"error": "invalid_request", "message": str(e)}), 400
return jsonify(
{
"items": [_serialize(it) for it in items],
"total": total,
"limit": limit,
"offset": offset,
}
)
@bp.get("/<template_id>")
@require_auth
@require_perm("test_template.read")
def get_test_template(template_id: str):
tid = _parse_uuid_or_400(template_id)
if tid is None:
return jsonify({"error": "invalid_id"}), 400
include_deleted = request.args.get("include_deleted", "false").lower() == "true"
try:
view = svc.get_test_template(tid, include_deleted=include_deleted)
except svc.TestTemplateNotFound:
return jsonify({"error": "not_found"}), 404
return jsonify(_serialize(view))
@bp.post("")
@require_auth
@require_perm("test_template.create")
def create_test_template():
try:
payload = CreateTestTemplatePayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e:
feat(m7): blue review fields + spec amendment + reviewer follow-ups User feedback after the M7 ship: blue team's Excel workflow had 5 extra fields we didn't capture. Per-test page also doesn't match their workflow — they need a tabular view, one table per scenario. Spec - tasks/spec.md amended (`revised: 2026-05-15`): §4 in-scope, §F6, §8 model bullet. §F6 now pins the column matrix, single-row-edit semantics, Esc-cancel, blur-confirm, and reconciles detection_level as a pill inside the Commentaires cell (no 8th column). - tasks/todo.md M7 section grew an "Amendement 2026-05-15" sub-block tracking backend ☑ and frontend ☐. Backend - Migration c2a8f4b1d6e9: 5 nullable columns on mission_tests (blue_log_source, blue_siem_logs, blue_incident_at, blue_incident_number, blue_incident_recipient_email). - _BLUE_FIELDS extended; update_mission_test_fields propagates each field; MissionTestDetailView + MissionTestView (the nested view in GET /missions/{id}) surface every annotation field, plus last_actor_*, updated_at, detection_level_key — O(1) batch lookup for detection-level keys and last-actor users keeps it scalable. - UpdateMissionTestPayload accepts each field with length caps (120/200_000/120/255). Reviewer follow-ups applied - blue_incident_at + executed_at now reject naïve datetimes (_ensure_aware_datetime) — Postgres would otherwise interpret them in the session TZ, defeating the M7 verbatim-time contract. - blue_incident_recipient_email goes through a permissive RFC-shape regex (_validate_email_shape) so internal/lab TLDs like .local / .corp / .test pass — Pydantic EmailStr is too strict (lessons.md M2 trap). - Project-wide: switched `e.errors()` to `e.errors(include_context=False, include_url=False)` because the AfterValidator-raised ValueError lands in ctx and Flask can't serialize it. Tests - 5 new pytest cases: blue user writes the 5 new fields, red user is individually 403'd on each, round-trip via GET, naïve datetime rejected, email shape validated (.local accepted, bad shape 400). - 138 pytest green. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 14:45:18 +02:00
return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
try:
view = svc.create_test_template(
name=payload.name,
description=payload.description,
objective=payload.objective,
procedure_md=payload.procedure_md,
prerequisites_md=payload.prerequisites_md,
expected_result_red_md=payload.expected_result_red_md,
expected_detection_blue_md=payload.expected_detection_blue_md,
opsec_level=payload.opsec_level,
tags=payload.tags,
expected_iocs=payload.expected_iocs,
mitre_tags=[svc.MitreTagRef(kind=t.kind, external_id=t.external_id) for t in payload.mitre_tags],
)
except svc.UnknownMitreTag as e:
return jsonify({"error": "unknown_mitre_tag", "message": str(e)}), 400
except ValueError as e:
return jsonify({"error": "invalid_request", "message": str(e)}), 400
log.info(
"metamorph.test_template.created",
extra={"id": str(view.id), "template_name": view.name},
)
return jsonify(_serialize(view)), 201
@bp.put("/<template_id>")
@require_auth
@require_perm("test_template.update")
def update_test_template(template_id: str):
tid = _parse_uuid_or_400(template_id)
if tid is None:
return jsonify({"error": "invalid_id"}), 400
raw = request.get_json(silent=True) or {}
try:
payload = UpdateTestTemplatePayload.model_validate(raw)
except ValidationError as e:
feat(m7): blue review fields + spec amendment + reviewer follow-ups User feedback after the M7 ship: blue team's Excel workflow had 5 extra fields we didn't capture. Per-test page also doesn't match their workflow — they need a tabular view, one table per scenario. Spec - tasks/spec.md amended (`revised: 2026-05-15`): §4 in-scope, §F6, §8 model bullet. §F6 now pins the column matrix, single-row-edit semantics, Esc-cancel, blur-confirm, and reconciles detection_level as a pill inside the Commentaires cell (no 8th column). - tasks/todo.md M7 section grew an "Amendement 2026-05-15" sub-block tracking backend ☑ and frontend ☐. Backend - Migration c2a8f4b1d6e9: 5 nullable columns on mission_tests (blue_log_source, blue_siem_logs, blue_incident_at, blue_incident_number, blue_incident_recipient_email). - _BLUE_FIELDS extended; update_mission_test_fields propagates each field; MissionTestDetailView + MissionTestView (the nested view in GET /missions/{id}) surface every annotation field, plus last_actor_*, updated_at, detection_level_key — O(1) batch lookup for detection-level keys and last-actor users keeps it scalable. - UpdateMissionTestPayload accepts each field with length caps (120/200_000/120/255). Reviewer follow-ups applied - blue_incident_at + executed_at now reject naïve datetimes (_ensure_aware_datetime) — Postgres would otherwise interpret them in the session TZ, defeating the M7 verbatim-time contract. - blue_incident_recipient_email goes through a permissive RFC-shape regex (_validate_email_shape) so internal/lab TLDs like .local / .corp / .test pass — Pydantic EmailStr is too strict (lessons.md M2 trap). - Project-wide: switched `e.errors()` to `e.errors(include_context=False, include_url=False)` because the AfterValidator-raised ValueError lands in ctx and Flask can't serialize it. Tests - 5 new pytest cases: blue user writes the 5 new fields, red user is individually 403'd on each, round-trip via GET, naïve datetime rejected, email shape validated (.local accepted, bad shape 400). - 138 pytest green. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 14:45:18 +02:00
return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400
# Only forward keys actually present in the body — model_validate leaves
# missing fields as None and we can't distinguish "explicitly null" from
# "omitted". The set of keys in `raw` is the wire-level intent.
kwargs: dict[str, Any] = {}
for field_name in (
"name", "description", "objective", "procedure_md", "prerequisites_md",
"expected_result_red_md", "expected_detection_blue_md",
"opsec_level", "tags", "expected_iocs",
):
if field_name in raw:
kwargs[field_name] = getattr(payload, field_name)
if "mitre_tags" in raw:
kwargs["mitre_tags"] = (
[svc.MitreTagRef(kind=t.kind, external_id=t.external_id) for t in (payload.mitre_tags or [])]
)
try:
view = svc.update_test_template(tid, **kwargs)
except svc.TestTemplateNotFound:
return jsonify({"error": "not_found"}), 404
except svc.UnknownMitreTag as e:
return jsonify({"error": "unknown_mitre_tag", "message": str(e)}), 400
except ValueError as e:
return jsonify({"error": "invalid_request", "message": str(e)}), 400
log.info("metamorph.test_template.updated", extra={"id": str(tid), "fields": sorted(kwargs.keys())})
return jsonify(_serialize(view))
@bp.delete("/<template_id>")
@require_auth
@require_perm("test_template.delete")
def soft_delete_test_template(template_id: str):
tid = _parse_uuid_or_400(template_id)
if tid is None:
return jsonify({"error": "invalid_id"}), 400
try:
svc.soft_delete_test_template(tid)
except svc.TestTemplateNotFound:
return jsonify({"error": "not_found"}), 404
log.info("metamorph.test_template.soft_deleted", extra={"id": str(tid)})
return jsonify({"ok": True})