"""Simulation business logic: PATCH rules and state machine transitions.""" from __future__ import annotations from datetime import UTC, datetime from typing import Any from flask import jsonify from backend.app.extensions import db from backend.app.models import User from backend.app.models.simulation import Simulation, SimulationStatus # Fields only admin/redteam may write (excluding technique_ids/tactic_ids handled separately). REDTEAM_FIELDS = frozenset( { "name", "description", "commands", "prerequisites", "executed_at", "execution_result", } ) SOC_FIELDS = frozenset({"log_source", "logs", "soc_comment", "incident_number"}) _ALLOWED_TRANSITIONS: dict[str, dict[str, set[str]]] = { "review_required": { "from": {"pending", "in_progress"}, "roles": {"admin", "redteam"}, }, "done": { "from": {"review_required"}, "roles": {"admin", "redteam", "soc"}, }, } def _is_non_empty(value: Any) -> bool: """Return True if value counts as "filled" for auto-transition purposes.""" if value is None: return False if isinstance(value, str) and value == "": return False return not (isinstance(value, list) and len(value) == 0) def _resolve_technique_ids( technique_ids: list[str], ) -> tuple[list[dict[str, str]] | None, tuple[Any, int] | None]: """Validate and resolve technique IDs to [{id, name}] snapshots. Returns (resolved_list, None) on success or (None, error_tuple) on failure. Deduplicates while preserving order. """ from backend.app.services import mitre as mitre_svc if not mitre_svc.mitre_loaded: return None, (jsonify({"error": "mitre bundle not loaded"}), 503) seen: dict[str, None] = dict.fromkeys(technique_ids) resolved: list[dict[str, str]] = [] for tid in seen: name = mitre_svc.lookup_name(tid) if name is None: return None, (jsonify({"error": f"unknown technique id: {tid}"}), 400) resolved.append({"id": tid, "name": name}) return resolved, None def _resolve_tactic_ids( tactic_ids: list[str], ) -> tuple[list[str] | None, tuple[Any, int] | None]: """Validate and deduplicate tactic TA-ids. Returns (deduped_list, None) on success or (None, error_tuple) on failure. Bundle does not need to be loaded — validation is against the hardcoded _TACTIC_IDS map. """ from backend.app.services import mitre as mitre_svc seen: dict[str, None] = dict.fromkeys(tactic_ids) for tid in seen: if mitre_svc.lookup_tactic(tid) is None: return None, (jsonify({"error": f"unknown tactic id: {tid}"}), 400) return list(seen), None def _maybe_activate_engagement(simulation: Simulation) -> None: """If simulation's engagement is planned, advance it to active. Caller must commit — do not commit here to avoid double-commit. """ from backend.app.models.engagement import Engagement, EngagementStatus engagement: Engagement | None = getattr(simulation, "engagement", None) if engagement is not None and engagement.status == EngagementStatus.PLANNED: engagement.status = EngagementStatus.ACTIVE db.session.add(engagement) def apply_patch( simulation: Simulation, payload: dict[str, Any], user: User ) -> tuple[Any, int] | None: """Apply a validated PATCH payload to a simulation. Returns a (response, status_code) tuple on error, or None on success (caller is responsible for committing). """ # Done guard — applies to ALL roles before any RBAC check. if simulation.status == SimulationStatus.DONE: return jsonify({"error": "simulation is done — reopen first"}), 409 role = user.role.value if role == "soc": if simulation.status not in ( SimulationStatus.REVIEW_REQUIRED, SimulationStatus.DONE, ): return jsonify({"error": "simulation not ready for SOC review"}), 403 # SOC must not send redteam fields, technique_ids, or tactic_ids. redteam_keys_in_payload = ( REDTEAM_FIELDS | {"technique_ids", "tactic_ids"} ) & payload.keys() if redteam_keys_in_payload: return jsonify({"error": "soc cannot edit redteam fields"}), 403 for field in SOC_FIELDS: if field in payload: setattr(simulation, field, payload[field]) else: # admin / redteam path. redteam_keys_present = REDTEAM_FIELDS & payload.keys() # Validate executed_at upfront before any writes. executed_at_value: datetime | None = None if "executed_at" in redteam_keys_present: val = payload["executed_at"] if val is not None: if not isinstance(val, str): return jsonify({"error": "invalid executed_at"}), 400 try: executed_at_value = datetime.fromisoformat(val) except ValueError: return jsonify({"error": "invalid executed_at"}), 400 # Validate and resolve technique_ids upfront. resolved_techniques: list[dict[str, str]] | None = None if "technique_ids" in payload: raw_ids = payload["technique_ids"] if not isinstance(raw_ids, list): return jsonify({"error": "technique_ids must be a list"}), 400 resolved_techniques, err = _resolve_technique_ids(raw_ids) if err is not None: return err # Validate and deduplicate tactic_ids upfront. resolved_tactic_ids: list[str] | None = None if "tactic_ids" in payload: raw_tids = payload["tactic_ids"] if not isinstance(raw_tids, list): return jsonify({"error": "tactic_ids must be a list"}), 400 resolved_tactic_ids, err = _resolve_tactic_ids(raw_tids) if err is not None: return err # Apply scalar redteam fields. for field in redteam_keys_present: if field == "executed_at": simulation.executed_at = executed_at_value else: setattr(simulation, field, payload[field]) # Apply resolved techniques. if resolved_techniques is not None: simulation.techniques = resolved_techniques # Apply resolved tactic_ids. if resolved_tactic_ids is not None: simulation.tactic_ids = resolved_tactic_ids # Apply SOC fields (admin/redteam may also write them). for field in SOC_FIELDS: if field in payload: setattr(simulation, field, payload[field]) # Auto-transition pending → in_progress. # Triggers when any redteam scalar has a non-empty value, technique_ids or tactic_ids non-empty. auto_trigger = any(_is_non_empty(payload[k]) for k in redteam_keys_present) if not auto_trigger and "technique_ids" in payload: auto_trigger = len(payload["technique_ids"]) > 0 if not auto_trigger and "tactic_ids" in payload: auto_trigger = len(payload["tactic_ids"]) > 0 if simulation.status == SimulationStatus.PENDING and auto_trigger: simulation.status = SimulationStatus.IN_PROGRESS _maybe_activate_engagement(simulation) simulation.updated_at = datetime.now(UTC) return None def transition( simulation: Simulation, to_status: str, user: User ) -> tuple[Any, int] | None: """Attempt a manual transition. Returns error tuple or None on success.""" # Special case: done → review_required (Reopen), allowed for all 3 roles. if to_status == "review_required" and simulation.status == SimulationStatus.DONE: simulation.status = SimulationStatus.REVIEW_REQUIRED simulation.updated_at = datetime.now(UTC) db.session.commit() return None rule = _ALLOWED_TRANSITIONS.get(to_status) if rule is None: return jsonify({"error": "invalid transition"}), 409 if simulation.status.value not in rule["from"]: return jsonify({"error": "invalid transition"}), 409 if user.role.value not in rule["roles"]: return jsonify({"error": "Forbidden"}), 403 simulation.status = SimulationStatus(to_status) simulation.updated_at = datetime.now(UTC) db.session.commit() return None