- SimulationTemplate model + migration 0005 (CREATE TABLE + name index)
- 5 CRUD endpoints under /api/templates (admin|redteam only, SOC 403)
- POST /api/engagements/<eid>/simulations extended with optional template_id
- serialize_template() reusing _enrich_techniques/_enrich_tactics helpers
- IntegrityError → 409 for duplicate name on both POST and PATCH
- 28 new tests (CRUD, RBAC, dedup, instantiation, migration round-trip)
- 221 tests pass; ruff clean; mypy clean

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
103 lines
3.8 KiB
Python
103 lines
3.8 KiB
Python
"""JSON serializers for API responses."""
|
|
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from backend.app.models import Engagement, User
|
|
from backend.app.models.simulation import Simulation
|
|
from backend.app.models.simulation_template import SimulationTemplate
|
|
|
|
|
|
def serialize_user(user: User) -> dict[str, Any]:
    """Return the full JSON-ready representation of *user*.

    Args:
        user: The user to serialize. Its ``id``, ``username``,
            ``role.value`` and ``created_at`` attributes are read.

    Returns:
        A dict with the keys ``id``, ``username``, ``role`` and
        ``created_at``. ``created_at`` is the result of ``isoformat()`` on
        the timestamp, or ``None`` when the timestamp is unset.
    """
    return {
        "id": user.id,
        "username": user.username,
        "role": user.role.value,
        "created_at": user.created_at.isoformat() if user.created_at else None,
    }
|
|
|
|
|
|
def serialize_user_brief(user: User) -> dict[str, Any]:
    """Return a compact representation of *user* for embedding in other payloads.

    Args:
        user: The user to serialize. Only ``id`` and ``username`` are read.

    Returns:
        A dict with exactly the keys ``id`` and ``username``.
    """
    return {"id": user.id, "username": user.username}
|
|
|
|
|
|
def _enrich_techniques(raw: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Attach tactics to each {id, name} snapshot from the MITRE service.

    Args:
        raw: Technique snapshots; each entry must provide the keys ``"id"``
            and ``"name"``. A falsy value (``None`` or an empty list) is
            treated as no techniques.

    Returns:
        One new dict per input entry, in input order, carrying ``id``,
        ``name`` and ``tactics``, where ``tactics`` is whatever
        ``mitre_svc.get_tactics`` returns for that technique id.

    Raises:
        KeyError: If an entry lacks ``"id"`` or ``"name"``.
    """
    # Imported at call time rather than at module import.
    # NOTE(review): presumably to avoid an import cycle — confirm.
    from backend.app.services import mitre as mitre_svc

    return [
        {"id": t["id"], "name": t["name"], "tactics": mitre_svc.get_tactics(t["id"])}
        for t in (raw or [])
    ]
|
|
|
|
|
|
def _enrich_tactics(tactic_ids: list[str]) -> list[dict[str, str]]:
    """Resolve TA-ids to {id, name} at runtime.

    Args:
        tactic_ids: Tactic identifiers to resolve. A falsy value (``None``
            or an empty list) is treated as no tactics.

    Returns:
        One entry per input id, in input order. Each entry is the value
        returned by ``mitre_svc.lookup_tactic``; when that lookup returns
        ``None`` a placeholder ``{"id": <tid>, "name": ""}`` is used, so an
        unresolved id is still reported rather than dropped.
    """
    # Imported at call time rather than at module import.
    # NOTE(review): presumably to avoid an import cycle — confirm.
    from backend.app.services import mitre as mitre_svc

    result: list[dict[str, str]] = []
    for tid in tactic_ids or []:
        entry = mitre_svc.lookup_tactic(tid)
        if entry is not None:
            result.append(entry)
        else:
            # Unresolved id: keep it in the output with an empty name.
            result.append({"id": tid, "name": ""})
    return result
|
|
|
|
|
|
def serialize_simulation(simulation: Simulation) -> dict[str, Any]:
    """Return the JSON-ready representation of *simulation*.

    Args:
        simulation: The simulation to serialize.

    Returns:
        A dict with the simulation's scalar fields copied as-is, plus:

        * ``techniques`` – the stored technique snapshots passed through
          ``_enrich_techniques`` (an unset value is treated as empty);
        * ``tactics`` – the stored ``tactic_ids`` passed through
          ``_enrich_tactics`` (an unset value is treated as empty);
        * ``status`` – ``simulation.status.value``;
        * ``executed_at`` / ``created_at`` / ``updated_at`` – the result of
          ``isoformat()``, or ``None`` when the timestamp is unset;
        * ``created_by`` – the brief user payload from
          ``serialize_user_brief``, or ``None`` when there is no creator.
    """
    return {
        "id": simulation.id,
        "engagement_id": simulation.engagement_id,
        "name": simulation.name,
        "techniques": _enrich_techniques(simulation.techniques or []),
        "tactics": _enrich_tactics(simulation.tactic_ids or []),
        "description": simulation.description,
        "commands": simulation.commands,
        "prerequisites": simulation.prerequisites,
        "executed_at": simulation.executed_at.isoformat() if simulation.executed_at else None,
        "execution_result": simulation.execution_result,
        "log_source": simulation.log_source,
        "logs": simulation.logs,
        "soc_comment": simulation.soc_comment,
        "incident_number": simulation.incident_number,
        "status": simulation.status.value,
        "created_at": simulation.created_at.isoformat() if simulation.created_at else None,
        "updated_at": simulation.updated_at.isoformat() if simulation.updated_at else None,
        "created_by": serialize_user_brief(simulation.created_by)  # type: ignore[arg-type]
        if simulation.created_by
        else None,
    }
|
|
|
|
|
|
def serialize_template(t: SimulationTemplate) -> dict[str, Any]:
    """Return the JSON-ready representation of simulation template *t*.

    Args:
        t: The template to serialize.

    Returns:
        A dict with ``id``, ``name``, ``description``, ``commands`` and
        ``prerequisites`` copied as-is, plus:

        * ``techniques`` – the stored technique snapshots passed through
          ``_enrich_techniques`` (an unset value is treated as empty);
        * ``tactics`` – the stored ``tactic_ids`` passed through
          ``_enrich_tactics`` (an unset value is treated as empty);
        * ``created_at`` / ``updated_at`` – the result of ``isoformat()``,
          or ``None`` when the timestamp is unset;
        * ``created_by`` – the brief user payload from
          ``serialize_user_brief``, or ``None`` when there is no creator.
    """
    return {
        "id": t.id,
        "name": t.name,
        "description": t.description,
        "commands": t.commands,
        "prerequisites": t.prerequisites,
        "techniques": _enrich_techniques(t.techniques or []),
        "tactics": _enrich_tactics(t.tactic_ids or []),
        "created_at": t.created_at.isoformat() if t.created_at else None,
        "updated_at": t.updated_at.isoformat() if t.updated_at else None,
        "created_by": serialize_user_brief(t.created_by)  # type: ignore[arg-type]
        if t.created_by
        else None,
    }
|
|
|
|
|
|
def serialize_engagement(engagement: Engagement) -> dict[str, Any]:
    """Return the JSON-ready representation of *engagement*.

    Args:
        engagement: The engagement to serialize.

    Returns:
        A dict with ``id``, ``name`` and ``description`` copied as-is, plus:

        * ``start_date`` / ``end_date`` / ``created_at`` – the result of
          ``isoformat()``, or ``None`` when the value is unset;
        * ``status`` – ``engagement.status.value``;
        * ``created_by`` – the brief user payload from
          ``serialize_user_brief``, or ``None`` when there is no creator.
    """
    return {
        "id": engagement.id,
        "name": engagement.name,
        "description": engagement.description,
        "start_date": engagement.start_date.isoformat() if engagement.start_date else None,
        "end_date": engagement.end_date.isoformat() if engagement.end_date else None,
        "status": engagement.status.value,
        "created_at": engagement.created_at.isoformat() if engagement.created_at else None,
        "created_by": serialize_user_brief(engagement.created_by)  # type: ignore[arg-type]
        if engagement.created_by
        else None,
    }
|