- Simulation model: replace mitre_technique_id/name scalars with techniques JSON column [{id, name}]
- Alembic migration 0003: add techniques, backfill from scalars, drop old columns (reversible)
- MITRE service: add get_tactics(), lookup_name(), get_matrix() with canonical tactic order and sub-technique nesting
- serializer: enrich techniques with tactics from service at serialize time (graceful empty tactics if bundle outdated)
- simulation_workflow: PATCH now accepts technique_ids list, validates against bundle, deduplicates preserving order, auto-transitions on non-empty list
- simulations API: add GET /api/mitre/matrix endpoint (503 if bundle absent)
- test_mitre.py: updated _reset_mitre fixture, added T1059.006 sub-technique, 14 new tests for get_tactics/lookup_name/get_matrix/matrix endpoint
- test_simulations_techniques.py: 20 new tests covering AC-13.1 to AC-13.5 (create, PATCH, dedup, auto-transition, SOC blocked, migration backfill logic)
Total: 161 tests passing. ruff clean. mypy: no new errors.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
70 lines
2.6 KiB
Python
70 lines
2.6 KiB
Python
"""JSON serializers for API responses."""
|
|
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from backend.app.models import Engagement, User
|
|
from backend.app.models.simulation import Simulation
|
|
|
|
|
|
def serialize_user(user: User) -> dict[str, Any]:
    """Return the full JSON-ready representation of *user*.

    The result carries the user's ``id``, ``username``, the ``value`` of its
    ``role`` and ``created_at`` rendered through ``isoformat()``.  A falsy
    ``created_at`` is emitted as ``None``.
    """
    registered = user.created_at
    if registered:
        created_at = registered.isoformat()
    else:
        created_at = None

    return {
        "id": user.id,
        "username": user.username,
        "role": user.role.value,
        "created_at": created_at,
    }
|
|
|
|
|
|
def serialize_user_brief(user: User) -> dict[str, Any]:
    """Return the compact form of *user*: only its ``id`` and ``username``."""
    identifier, login = user.id, user.username
    return {
        "id": identifier,
        "username": login,
    }
|
|
|
|
|
|
def _enrich_techniques(raw: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return the stored technique snapshots with their tactics attached.

    Every entry of *raw* is read for its ``id`` and ``name`` keys; the
    ``tactics`` value is whatever the MITRE service's ``get_tactics`` returns
    for that technique id.  A falsy *raw* produces an empty list.
    """
    # Kept as a function-local import, exactly like the original.
    # NOTE(review): presumably this avoids an import cycle -- confirm.
    from backend.app.services import mitre as mitre_svc

    enriched: list[dict[str, Any]] = []
    for snapshot in raw or []:
        technique_id = snapshot["id"]
        enriched.append(
            {
                "id": technique_id,
                "name": snapshot["name"],
                "tactics": mitre_svc.get_tactics(technique_id),
            }
        )
    return enriched
|
|
|
|
|
|
def serialize_simulation(simulation: Simulation) -> dict[str, Any]:
    """Return the JSON-ready representation of *simulation*.

    Plain columns are copied through unchanged.  ``techniques`` is passed
    through ``_enrich_techniques`` (an unset value is treated as an empty
    list), ``status`` is reduced to its ``value``, the three timestamps are
    rendered with ``isoformat()`` (``None`` when falsy) and the creator is
    reduced to its brief form (``None`` when there is no creator).
    """
    executed = simulation.executed_at
    created = simulation.created_at
    updated = simulation.updated_at

    author: dict[str, Any] | None = None
    if simulation.created_by:
        author = serialize_user_brief(simulation.created_by)  # type: ignore[arg-type]

    return {
        "id": simulation.id,
        "engagement_id": simulation.engagement_id,
        "name": simulation.name,
        "techniques": _enrich_techniques(simulation.techniques or []),
        "description": simulation.description,
        "commands": simulation.commands,
        "prerequisites": simulation.prerequisites,
        "executed_at": executed.isoformat() if executed else None,
        "execution_result": simulation.execution_result,
        "log_source": simulation.log_source,
        "logs": simulation.logs,
        "soc_comment": simulation.soc_comment,
        "incident_number": simulation.incident_number,
        "status": simulation.status.value,
        "created_at": created.isoformat() if created else None,
        "updated_at": updated.isoformat() if updated else None,
        "created_by": author,
    }
|
|
|
|
|
|
def serialize_engagement(engagement: Engagement) -> dict[str, Any]:
    """Return the JSON-ready representation of *engagement*.

    ``start_date``, ``end_date`` and ``created_at`` are rendered with
    ``isoformat()`` (``None`` when falsy), ``status`` is reduced to its
    ``value`` and the creator is reduced to its brief form (``None`` when
    there is no creator).
    """
    begins = engagement.start_date
    finishes = engagement.end_date
    created = engagement.created_at

    owner: dict[str, Any] | None = None
    if engagement.created_by:
        owner = serialize_user_brief(engagement.created_by)  # type: ignore[arg-type]

    return {
        "id": engagement.id,
        "name": engagement.name,
        "description": engagement.description,
        "start_date": begins.isoformat() if begins else None,
        "end_date": finishes.isoformat() if finishes else None,
        "status": engagement.status.value,
        "created_at": created.isoformat() if created else None,
        "created_by": owner,
    }
|