"""Test-template CRUD endpoints. Reads gated by `test_template.read`. Writes gated by `test_template.{create, update,delete}`. Service layer handles all DB work; this module only validates the wire payload and shapes the JSON response. """ from __future__ import annotations import logging import uuid from typing import Any from flask import Blueprint, jsonify, request from pydantic import BaseModel, Field, StringConstraints, ValidationError from typing import Annotated from app.core.auth_decorators import require_auth, require_perm from app.services import test_templates as svc # Tag and IOC entries are stored as PG ARRAY(String(N)). Cap items at the wire # layer so over-sized inputs return 400 with a useful message rather than the # bare StringDataRightTruncation from the driver. TagStr = Annotated[str, StringConstraints(min_length=1, max_length=64)] IocStr = Annotated[str, StringConstraints(min_length=1, max_length=255)] bp = Blueprint("test_templates", __name__, url_prefix="/test-templates") log = logging.getLogger("metamorph.api.test_templates") # === Payload schemas ========================================================== class MitreTagIn(BaseModel): kind: str = Field(min_length=1) external_id: str = Field(min_length=1, max_length=16) model_config = {"extra": "forbid"} class CreateTestTemplatePayload(BaseModel): name: str = Field(min_length=1, max_length=255) description: str | None = Field(default=None, max_length=4000) objective: str | None = Field(default=None, max_length=4000) procedure_md: str | None = Field(default=None, max_length=32_000) prerequisites_md: str | None = Field(default=None, max_length=32_000) expected_result_red_md: str | None = Field(default=None, max_length=32_000) expected_detection_blue_md: str | None = Field(default=None, max_length=32_000) opsec_level: str = Field(default="medium") tags: list[TagStr] = Field(default_factory=list, max_length=64) expected_iocs: list[IocStr] = Field(default_factory=list, max_length=128) mitre_tags: list[MitreTagIn] = Field(default_factory=list, max_length=64) model_config = {"extra": "forbid"} class UpdateTestTemplatePayload(BaseModel): name: str | None = Field(default=None, min_length=1, max_length=255) description: str | None = Field(default=None, max_length=4000) objective: str | None = Field(default=None, max_length=4000) procedure_md: str | None = Field(default=None, max_length=32_000) prerequisites_md: str | None = Field(default=None, max_length=32_000) expected_result_red_md: str | None = Field(default=None, max_length=32_000) expected_detection_blue_md: str | None = Field(default=None, max_length=32_000) opsec_level: str | None = None tags: list[TagStr] | None = Field(default=None, max_length=64) expected_iocs: list[IocStr] | None = Field(default=None, max_length=128) mitre_tags: list[MitreTagIn] | None = Field(default=None, max_length=64) model_config = {"extra": "forbid"} # === Serializers ============================================================== def _serialize(t: svc.TestTemplateView) -> dict[str, Any]: return { "id": str(t.id), "name": t.name, "description": t.description, "objective": t.objective, "procedure_md": t.procedure_md, "prerequisites_md": t.prerequisites_md, "expected_result_red_md": t.expected_result_red_md, "expected_detection_blue_md": t.expected_detection_blue_md, "opsec_level": t.opsec_level, "tags": list(t.tags), "expected_iocs": list(t.expected_iocs), "mitre_tags": [ {"kind": tag.kind, "external_id": tag.external_id, "name": tag.name, "url": tag.url} for tag in t.mitre_tags ], "deleted_at": t.deleted_at.isoformat() if t.deleted_at else None, "created_at": t.created_at.isoformat(), "updated_at": t.updated_at.isoformat(), } def _parse_uuid_or_400(raw: str): try: return uuid.UUID(raw) except ValueError: return None def _pagination_args() -> tuple[int, int] | tuple[None, tuple[int, str]]: try: limit = int(request.args.get("limit", "100")) offset = int(request.args.get("offset", "0")) except ValueError: return None, (400, "invalid_pagination") return max(1, min(limit, 500)), max(0, offset) # === Endpoints ================================================================ @bp.get("") @require_auth @require_perm("test_template.read") def list_test_templates(): paging = _pagination_args() if paging[0] is None: return jsonify({"error": paging[1][1]}), paging[1][0] limit, offset = paging q = request.args.get("q") or None tactic = request.args.get("tactic") or None technique = request.args.get("technique") or None subtechnique = request.args.get("subtechnique") or None opsec_level = request.args.get("opsec") or None tag = request.args.get("tag") or None include_deleted = request.args.get("include_deleted", "false").lower() == "true" try: items, total = svc.list_test_templates( q=q, tactic=tactic, technique=technique, subtechnique=subtechnique, opsec_level=opsec_level, tag=tag, include_deleted=include_deleted, limit=limit, offset=offset, ) except ValueError as e: return jsonify({"error": "invalid_request", "message": str(e)}), 400 return jsonify( { "items": [_serialize(it) for it in items], "total": total, "limit": limit, "offset": offset, } ) @bp.get("/") @require_auth @require_perm("test_template.read") def get_test_template(template_id: str): tid = _parse_uuid_or_400(template_id) if tid is None: return jsonify({"error": "invalid_id"}), 400 include_deleted = request.args.get("include_deleted", "false").lower() == "true" try: view = svc.get_test_template(tid, include_deleted=include_deleted) except svc.TestTemplateNotFound: return jsonify({"error": "not_found"}), 404 return jsonify(_serialize(view)) @bp.post("") @require_auth @require_perm("test_template.create") def create_test_template(): try: payload = CreateTestTemplatePayload.model_validate(request.get_json(silent=True) or {}) except ValidationError as e: return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400 try: view = svc.create_test_template( name=payload.name, description=payload.description, objective=payload.objective, procedure_md=payload.procedure_md, prerequisites_md=payload.prerequisites_md, expected_result_red_md=payload.expected_result_red_md, expected_detection_blue_md=payload.expected_detection_blue_md, opsec_level=payload.opsec_level, tags=payload.tags, expected_iocs=payload.expected_iocs, mitre_tags=[svc.MitreTagRef(kind=t.kind, external_id=t.external_id) for t in payload.mitre_tags], ) except svc.UnknownMitreTag as e: return jsonify({"error": "unknown_mitre_tag", "message": str(e)}), 400 except ValueError as e: return jsonify({"error": "invalid_request", "message": str(e)}), 400 log.info( "metamorph.test_template.created", extra={"id": str(view.id), "template_name": view.name}, ) return jsonify(_serialize(view)), 201 @bp.put("/") @require_auth @require_perm("test_template.update") def update_test_template(template_id: str): tid = _parse_uuid_or_400(template_id) if tid is None: return jsonify({"error": "invalid_id"}), 400 raw = request.get_json(silent=True) or {} try: payload = UpdateTestTemplatePayload.model_validate(raw) except ValidationError as e: return jsonify({"error": "invalid_request", "details": e.errors(include_context=False, include_url=False)}), 400 # Only forward keys actually present in the body — model_validate leaves # missing fields as None and we can't distinguish "explicitly null" from # "omitted". The set of keys in `raw` is the wire-level intent. kwargs: dict[str, Any] = {} for field_name in ( "name", "description", "objective", "procedure_md", "prerequisites_md", "expected_result_red_md", "expected_detection_blue_md", "opsec_level", "tags", "expected_iocs", ): if field_name in raw: kwargs[field_name] = getattr(payload, field_name) if "mitre_tags" in raw: kwargs["mitre_tags"] = ( [svc.MitreTagRef(kind=t.kind, external_id=t.external_id) for t in (payload.mitre_tags or [])] ) try: view = svc.update_test_template(tid, **kwargs) except svc.TestTemplateNotFound: return jsonify({"error": "not_found"}), 404 except svc.UnknownMitreTag as e: return jsonify({"error": "unknown_mitre_tag", "message": str(e)}), 400 except ValueError as e: return jsonify({"error": "invalid_request", "message": str(e)}), 400 log.info("metamorph.test_template.updated", extra={"id": str(tid), "fields": sorted(kwargs.keys())}) return jsonify(_serialize(view)) @bp.delete("/") @require_auth @require_perm("test_template.delete") def soft_delete_test_template(template_id: str): tid = _parse_uuid_or_400(template_id) if tid is None: return jsonify({"error": "invalid_id"}), 400 try: svc.soft_delete_test_template(tid) except svc.TestTemplateNotFound: return jsonify({"error": "not_found"}), 404 log.info("metamorph.test_template.soft_deleted", extra={"id": str(tid)}) return jsonify({"ok": True})