- Service `app/services/test_templates.py`: CRUD with MITRE tag resolution (kind, external_id) → polymorphic join, filters by tactic/technique/subtechnique/opsec/tag, `_UNSET` sentinel for partial-update semantics. - Service `app/services/scenario_templates.py`: ordered test list, reorder via full-replace (atomic w.r.t. UNIQUE(position) constraint), soft-delete. - REST endpoints on /api/v1/test-templates and /scenario-templates with pydantic schemas + perm gating (test_template.* and scenario_template.*). - /diag/reset truncates the 4 new tables before MITRE (FK ordering). - 19 pytest tests covering CRUD, MITRE tag merge, soft-delete chaining, perm enforcement, and reorder atomicity. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
209 lines
6.9 KiB
Python
209 lines
6.9 KiB
Python
"""Scenario-template CRUD + reorder endpoints.
|
|
|
|
`PUT /<id>/tests` is the reorder/replace endpoint — it takes the full ordered
|
|
list and rewrites the join rows. There's no partial mutation API for the test
|
|
list: the wire contract is simpler and the client (drag-and-drop) already
|
|
holds the full ordering.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import uuid
|
|
from typing import Any
|
|
|
|
from flask import Blueprint, jsonify, request
|
|
from pydantic import BaseModel, Field, ValidationError
|
|
|
|
from app.core.auth_decorators import require_auth, require_perm
|
|
from app.services import scenario_templates as svc
|
|
|
|
# Blueprint that owns every route in this module; all paths below are
# declared relative to the "/scenario-templates" prefix.
bp = Blueprint(
    "scenario_templates",
    __name__,
    url_prefix="/scenario-templates",
)

# Module logger on an explicit, fixed channel name.
log = logging.getLogger("metamorph.api.scenario_templates")
|
|
|
|
|
|
class CreateScenarioPayload(BaseModel):
    """Request body validated by the scenario-template create endpoint.

    Keys other than the declared fields are rejected (``extra="forbid"``).
    """

    model_config = {"extra": "forbid"}

    # Required name, 1 to 255 characters.
    name: str = Field(min_length=1, max_length=255)
    # Optional description, at most 4000 characters.
    description: str | None = Field(default=None, max_length=4000)
    # Test-template ids to attach; empty when omitted, at most 512 entries.
    test_template_ids: list[uuid.UUID] = Field(
        default_factory=list,
        max_length=512,
    )
|
|
|
|
|
|
class UpdateScenarioPayload(BaseModel):
    """Request body validated by the scenario-template PATCH endpoint.

    Both fields are optional and default to ``None``; keys other than the
    declared fields are rejected (``extra="forbid"``).
    """

    model_config = {"extra": "forbid"}

    # New name, 1 to 255 characters when given.
    name: str | None = Field(
        default=None,
        min_length=1,
        max_length=255,
    )
    # New description, at most 4000 characters when given.
    description: str | None = Field(
        default=None,
        max_length=4000,
    )
|
|
|
|
|
|
class SetTestsPayload(BaseModel):
    """Request body validated by ``PUT /<id>/tests``.

    Keys other than ``test_template_ids`` are rejected (``extra="forbid"``).
    """

    model_config = {"extra": "forbid"}

    # Replacement list of test-template ids; empty when omitted, at most
    # 512 entries.
    test_template_ids: list[uuid.UUID] = Field(
        default_factory=list,
        max_length=512,
    )
|
|
|
|
|
|
def _serialize(sc: svc.ScenarioTemplateView) -> dict[str, Any]:
    """Convert a scenario-template view into a JSON-ready dict.

    The scenario id and each test-template id are passed through ``str()``,
    ``created_at`` / ``updated_at`` are rendered with ``isoformat()``, and
    ``deleted_at`` is rendered the same way or emitted as ``None`` when it
    is falsy.
    """
    test_rows = []
    for entry in sc.tests:
        test_rows.append(
            {
                "position": entry.position,
                "test_template_id": str(entry.test_template_id),
                "test_template_name": entry.test_template_name,
                "test_template_deleted": entry.test_template_deleted,
            }
        )

    deleted_at = None
    if sc.deleted_at:
        deleted_at = sc.deleted_at.isoformat()

    return {
        "id": str(sc.id),
        "name": sc.name,
        "description": sc.description,
        "tests": test_rows,
        "tests_count": sc.tests_count,
        "deleted_at": deleted_at,
        "created_at": sc.created_at.isoformat(),
        "updated_at": sc.updated_at.isoformat(),
    }
|
|
|
|
|
|
def _parse_uuid_or_400(raw: str) -> uuid.UUID | None:
    """Parse *raw* as a UUID, returning ``None`` when it is not one.

    Despite the name, this helper builds no HTTP response: callers test the
    result for ``None`` and emit the 400 themselves.

    Args:
        raw: Candidate UUID text, normally a URL path segment.

    Returns:
        The parsed ``uuid.UUID``, or ``None`` if *raw* cannot be parsed.
    """
    try:
        return uuid.UUID(raw)
    except (ValueError, TypeError, AttributeError):
        # ValueError covers malformed text. TypeError / AttributeError are
        # what ``uuid.UUID`` raises for ``None`` or other non-string input;
        # treat those as "invalid" too instead of letting them escape.
        return None
|
|
|
|
|
|
def _pagination_args() -> tuple[int, int] | tuple[None, tuple[int, str]]:
    """Read ``limit`` and ``offset`` from the current request's query string.

    Returns:
        ``(limit, offset)`` on success, with ``limit`` clamped to 1..500
        (default 100) and ``offset`` floored at 0 (default 0). If either
        value is not an integer, ``(None, (400, "invalid_pagination"))`` so
        the caller can turn it into an error response.
    """
    query = request.args
    try:
        raw_limit = int(query.get("limit", "100"))
        raw_offset = int(query.get("offset", "0"))
    except ValueError:
        return None, (400, "invalid_pagination")

    # Clamp instead of rejecting out-of-range values.
    limit = min(raw_limit, 500)
    if limit < 1:
        limit = 1
    offset = raw_offset if raw_offset > 0 else 0
    return limit, offset
|
|
|
|
|
|
@bp.get("")
@require_auth
@require_perm("scenario_template.read")
def list_scenario_templates():
    """List scenario templates with pagination and optional filtering.

    Query parameters: ``limit`` / ``offset`` (see ``_pagination_args``),
    ``q`` (an empty value is passed to the service as ``None``) and
    ``include_deleted`` (true only for the case-insensitive text "true").

    Responds with ``items``, ``total``, ``limit`` and ``offset``, or with a
    400 ``invalid_pagination`` error when the paging values are not integers.
    """
    first, second = _pagination_args()
    if first is None:
        status, error_code = second
        return jsonify({"error": error_code}), status
    limit, offset = first, second

    search = request.args.get("q") or None
    flag = request.args.get("include_deleted", "false")
    want_deleted = flag.lower() == "true"

    items, total = svc.list_scenario_templates(
        q=search,
        include_deleted=want_deleted,
        limit=limit,
        offset=offset,
    )
    body = {
        "items": [_serialize(item) for item in items],
        "total": total,
        "limit": limit,
        "offset": offset,
    }
    return jsonify(body)
|
|
|
|
|
|
@bp.get("/<scenario_id>")
@require_auth
@require_perm("scenario_template.read")
def get_scenario_template(scenario_id: str):
    """Return one scenario template by id.

    Responds 400 ``invalid_id`` when the path segment is not a UUID and 404
    ``not_found`` when the service raises ``ScenarioTemplateNotFound``. The
    ``include_deleted`` query flag (true only for the case-insensitive text
    "true") is forwarded to the service.
    """
    sid = _parse_uuid_or_400(scenario_id)
    if sid is None:
        return jsonify({"error": "invalid_id"}), 400

    flag = request.args.get("include_deleted", "false")
    want_deleted = flag.lower() == "true"

    try:
        view = svc.get_scenario_template(sid, include_deleted=want_deleted)
    except svc.ScenarioTemplateNotFound:
        return jsonify({"error": "not_found"}), 404
    return jsonify(_serialize(view))
|
|
|
|
|
|
@bp.post("")
@require_auth
@require_perm("scenario_template.create")
def create_scenario_template():
    """Create a scenario template from a JSON body.

    A missing, unparsable or falsy body is validated as ``{}``. Responds 201
    with the serialized view on success, and 400 when validation fails
    (``invalid_request`` with ``details``), when the service raises
    ``UnknownTestTemplate`` (``unknown_test_template``) or when it raises
    ``ValueError`` (``invalid_request`` with ``message``).
    """
    body = request.get_json(silent=True) or {}
    try:
        payload = CreateScenarioPayload.model_validate(body)
    except ValidationError as exc:
        return jsonify({"error": "invalid_request", "details": exc.errors()}), 400

    try:
        view = svc.create_scenario_template(
            name=payload.name,
            description=payload.description,
            test_template_ids=payload.test_template_ids,
        )
    except svc.UnknownTestTemplate as exc:
        # Handled before ValueError so it keeps its own error code.
        return jsonify({"error": "unknown_test_template", "message": str(exc)}), 400
    except ValueError as exc:
        return jsonify({"error": "invalid_request", "message": str(exc)}), 400

    log_fields = {"id": str(view.id), "tests": len(view.tests)}
    log.info("metamorph.scenario_template.created", extra=log_fields)
    return jsonify(_serialize(view)), 201
|
|
|
|
|
|
@bp.patch("/<scenario_id>")
@require_auth
@require_perm("scenario_template.update")
def update_scenario_template(scenario_id: str):
    """Partially update a scenario template's name and/or description.

    Only keys actually present in the JSON body are forwarded to the
    service, so an omitted key and an explicit ``null`` are distinguishable.
    Responds 400 ``invalid_id`` for a non-UUID path segment, 400
    ``invalid_request`` on validation failure or a service ``ValueError``,
    and 404 ``not_found`` when the service raises
    ``ScenarioTemplateNotFound``.
    """
    sid = _parse_uuid_or_400(scenario_id)
    if sid is None:
        return jsonify({"error": "invalid_id"}), 400

    raw = request.get_json(silent=True) or {}
    try:
        payload = UpdateScenarioPayload.model_validate(raw)
    except ValidationError as exc:
        return jsonify({"error": "invalid_request", "details": exc.errors()}), 400

    # Presence is checked against the raw body, not the model: the model
    # fills absent fields with None.
    kwargs: dict[str, Any] = {
        field: getattr(payload, field)
        for field in ("name", "description")
        if field in raw
    }

    try:
        view = svc.update_scenario_template(sid, **kwargs)
    except svc.ScenarioTemplateNotFound:
        return jsonify({"error": "not_found"}), 404
    except ValueError as exc:
        return jsonify({"error": "invalid_request", "message": str(exc)}), 400
    return jsonify(_serialize(view))
|
|
|
|
|
|
@bp.put("/<scenario_id>/tests")
@require_auth
@require_perm("scenario_template.update")
def set_scenario_tests(scenario_id: str):
    """Replace a scenario template's test list with the submitted one.

    A missing, unparsable or falsy body is validated as ``{}``. Responds 400
    ``invalid_id`` for a non-UUID path segment, 400 ``invalid_request`` on
    validation failure, 404 ``not_found`` when the service raises
    ``ScenarioTemplateNotFound`` and 400 ``unknown_test_template`` when it
    raises ``UnknownTestTemplate``.
    """
    sid = _parse_uuid_or_400(scenario_id)
    if sid is None:
        return jsonify({"error": "invalid_id"}), 400

    body = request.get_json(silent=True) or {}
    try:
        payload = SetTestsPayload.model_validate(body)
    except ValidationError as exc:
        return jsonify({"error": "invalid_request", "details": exc.errors()}), 400

    try:
        view = svc.set_scenario_tests(sid, payload.test_template_ids)
    except svc.ScenarioTemplateNotFound:
        return jsonify({"error": "not_found"}), 404
    except svc.UnknownTestTemplate as exc:
        return jsonify({"error": "unknown_test_template", "message": str(exc)}), 400

    log_fields = {"id": str(sid), "tests": len(view.tests)}
    log.info("metamorph.scenario_template.tests_set", extra=log_fields)
    return jsonify(_serialize(view))
|
|
|
|
|
|
@bp.delete("/<scenario_id>")
@require_auth
@require_perm("scenario_template.delete")
def soft_delete_scenario_template(scenario_id: str):
    """Soft-delete a scenario template via the service layer.

    Responds ``{"ok": true}`` on success, 400 ``invalid_id`` for a non-UUID
    path segment and 404 ``not_found`` when the service raises
    ``ScenarioTemplateNotFound``.
    """
    sid = _parse_uuid_or_400(scenario_id)
    if sid is None:
        return jsonify({"error": "invalid_id"}), 400

    try:
        svc.soft_delete_scenario_template(sid)
    except svc.ScenarioTemplateNotFound:
        return jsonify({"error": "not_found"}), 404

    log_fields = {"id": str(sid)}
    log.info("metamorph.scenario_template.soft_deleted", extra=log_fields)
    return jsonify({"ok": True})
|