"""Simulation business logic: PATCH rules and state machine transitions.""" from __future__ import annotations from datetime import UTC, datetime from typing import Any from flask import jsonify from backend.app.extensions import db from backend.app.models import User from backend.app.models.simulation import Simulation, SimulationStatus REDTEAM_FIELDS = frozenset( { "name", "mitre_technique_id", "mitre_technique_name", "description", "commands", "prerequisites", "executed_at", "execution_result", } ) SOC_FIELDS = frozenset({"log_source", "logs", "soc_comment", "incident_number"}) # Transitions allowed via POST /transition endpoint (manual only). # auto pending→in_progress is handled in apply_patch, not here. _ALLOWED_TRANSITIONS: dict[str, dict[str, set[str]]] = { "review_required": { "from": {"pending", "in_progress"}, "roles": {"admin", "redteam"}, }, "done": { "from": {"review_required"}, "roles": {"admin", "redteam", "soc"}, }, } def _is_non_empty(value: Any) -> bool: """Return True if value counts as "filled" for auto-transition purposes.""" if value is None: return False if isinstance(value, str) and value == "": return False return not (isinstance(value, list) and len(value) == 0) def apply_patch( simulation: Simulation, payload: dict[str, Any], user: User ) -> tuple[Any, int] | None: """Apply a validated PATCH payload to a simulation. Returns a (response, status_code) tuple on error, or None on success (caller is responsible for committing). """ role = user.role.value if role == "soc": # SOC can only patch when status allows it. if simulation.status not in ( SimulationStatus.REVIEW_REQUIRED, SimulationStatus.DONE, ): return jsonify({"error": "simulation not ready for SOC review"}), 403 # SOC must not send redteam fields. redteam_keys_in_payload = REDTEAM_FIELDS & payload.keys() if redteam_keys_in_payload: return jsonify({"error": "soc cannot edit redteam fields"}), 403 for field in SOC_FIELDS: if field in payload: setattr(simulation, field, payload[field]) else: # admin / redteam: apply all fields present. redteam_keys_present = REDTEAM_FIELDS & payload.keys() for field in redteam_keys_present: if field == "executed_at": val = payload["executed_at"] if val is None: simulation.executed_at = None else: if not isinstance(val, str): return jsonify({"error": "invalid executed_at"}), 400 try: simulation.executed_at = datetime.fromisoformat(val) except ValueError: return jsonify({"error": "invalid executed_at"}), 400 else: setattr(simulation, field, payload[field]) for field in SOC_FIELDS: if field in payload: setattr(simulation, field, payload[field]) # Auto-transition pending → in_progress: at least one redteam field with # a non-empty value in the *incoming payload*. if simulation.status == SimulationStatus.PENDING and any( _is_non_empty(payload[k]) for k in redteam_keys_present ): simulation.status = SimulationStatus.IN_PROGRESS simulation.updated_at = datetime.now(UTC) return None def transition( simulation: Simulation, to_status: str, user: User ) -> tuple[Any, int] | None: """Attempt a manual transition. Returns error tuple or None on success.""" rule = _ALLOWED_TRANSITIONS.get(to_status) if rule is None: return jsonify({"error": "invalid transition"}), 409 if simulation.status.value not in rule["from"]: return jsonify({"error": "invalid transition"}), 409 if user.role.value not in rule["roles"]: return jsonify({"error": "Forbidden"}), 403 simulation.status = SimulationStatus(to_status) simulation.updated_at = datetime.now(UTC) db.session.commit() return None