Files
mimic/backend/app/services/simulation_workflow.py
Knacky 83bf60fb30 fix(backend): post-review fixes sprint 2
- test_simulations_patch: remove false dict return annotation on _patch helper
- simulation_workflow: validate executed_at upfront before any setattr (prevents partial mutation on bad payload)
- api/simulations: remove unreachable role check in update_simulation (all valid roles are admin/redteam/soc)
- Dockerfile: remove redundant COPY backend/data/ (already covered by COPY backend/)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-26 11:21:32 +02:00

133 lines
4.3 KiB
Python

"""Simulation business logic: PATCH rules and state machine transitions."""
from __future__ import annotations
from datetime import UTC, datetime
from typing import Any
from flask import jsonify
from backend.app.extensions import db
from backend.app.models import User
from backend.app.models.simulation import Simulation, SimulationStatus
# Payload keys treated as red-team fields: apply_patch rejects them when the
# caller's role is "soc" and uses them to decide the pending -> in_progress
# auto-transition.
REDTEAM_FIELDS = frozenset(
    (
        "name",
        "mitre_technique_id",
        "mitre_technique_name",
        "description",
        "commands",
        "prerequisites",
        "executed_at",
        "execution_result",
    )
)

# Payload keys that apply_patch copies onto the simulation for every role.
SOC_FIELDS = frozenset(("log_source", "logs", "soc_comment", "incident_number"))

# Manual transitions accepted by transition(), keyed by target status.
# "from" lists the statuses the simulation may currently be in and "roles"
# lists the user roles allowed to request the move.
# The automatic pending -> in_progress move is not listed here; apply_patch
# performs it.
_ALLOWED_TRANSITIONS: dict[str, dict[str, set[str]]] = {
    "review_required": {
        "from": {"pending", "in_progress"},
        "roles": {"admin", "redteam"},
    },
    "done": {
        "from": {"review_required"},
        "roles": {"admin", "redteam", "soc"},
    },
}
def _is_non_empty(value: Any) -> bool:
    """Tell whether a payload value counts as "filled" for the auto-transition.

    Only three values are considered empty: None, the empty string and the
    empty list. Anything else (including 0, False, an empty dict or a
    whitespace-only string) is reported as filled.
    """
    is_blank_text = isinstance(value, str) and value == ""
    is_empty_list = isinstance(value, list) and len(value) == 0
    return not (value is None or is_blank_text or is_empty_list)
def apply_patch(
    simulation: Simulation, payload: dict[str, Any], user: User
) -> tuple[Any, int] | None:
    """Copy the editable fields of a PATCH payload onto a simulation.

    A user whose role is "soc" may only write SOC_FIELDS, and only while the
    simulation is REVIEW_REQUIRED or DONE; any redteam key in the payload is
    rejected. Every other role may write both REDTEAM_FIELDS and SOC_FIELDS,
    and for those roles a PENDING simulation moves to IN_PROGRESS when the
    payload carries at least one non-empty redteam value.

    Returns:
        A (response, status_code) tuple when the payload is rejected, or None
        once the fields have been written. Nothing is committed here; the
        caller owns the commit.
    """
    if user.role.value == "soc":
        soc_window_open = simulation.status in (
            SimulationStatus.REVIEW_REQUIRED,
            SimulationStatus.DONE,
        )
        if not soc_window_open:
            return jsonify({"error": "simulation not ready for SOC review"}), 403
        if REDTEAM_FIELDS & payload.keys():
            return jsonify({"error": "soc cannot edit redteam fields"}), 403
        # SOC never writes redteam fields, so there is nothing to stage.
        redteam_updates: dict[str, Any] = {}
    else:
        # Stage the redteam values first; executed_at is parsed before any
        # attribute is touched so a bad value leaves the simulation unmodified.
        redteam_updates = {
            key: payload[key] for key in REDTEAM_FIELDS & payload.keys()
        }
        raw_executed_at = redteam_updates.get("executed_at")
        if raw_executed_at is not None:
            if not isinstance(raw_executed_at, str):
                return jsonify({"error": "invalid executed_at"}), 400
            try:
                redteam_updates["executed_at"] = datetime.fromisoformat(
                    raw_executed_at
                )
            except ValueError:
                return jsonify({"error": "invalid executed_at"}), 400

    for name, new_value in redteam_updates.items():
        setattr(simulation, name, new_value)
    for name in SOC_FIELDS & payload.keys():
        setattr(simulation, name, payload[name])

    # Auto-transition pending -> in_progress is driven by the raw values of
    # the *incoming payload*, not by what is already stored on the simulation.
    has_filled_redteam_value = simulation.status == SimulationStatus.PENDING and any(
        _is_non_empty(payload[name]) for name in redteam_updates
    )
    if has_filled_redteam_value:
        simulation.status = SimulationStatus.IN_PROGRESS

    simulation.updated_at = datetime.now(UTC)
    return None
def transition(
    simulation: Simulation, to_status: str, user: User
) -> tuple[Any, int] | None:
    """Perform a manual status change requested by a user.

    The target status must be a key of _ALLOWED_TRANSITIONS, the simulation's
    current status must be listed under that rule's "from" set, and the
    user's role must be listed under its "roles" set.

    Returns:
        A (response, 409) tuple for an unknown target or a disallowed source
        status, a (response, 403) tuple for a disallowed role, or None after
        the new status and updated_at have been written and the database
        session committed.
    """
    rule = _ALLOWED_TRANSITIONS.get(to_status)
    # Unknown target and wrong source status are reported identically; the
    # state check runs before the role check.
    if rule is None or simulation.status.value not in rule["from"]:
        return jsonify({"error": "invalid transition"}), 409

    role_allowed = user.role.value in rule["roles"]
    if not role_allowed:
        return jsonify({"error": "Forbidden"}), 403

    simulation.status = SimulationStatus(to_status)
    simulation.updated_at = datetime.now(UTC)
    db.session.commit()
    return None