- Simulation model: add tactic_ids JSON column (nullable=False, default=[])
- Migration 0004: ADD COLUMN tactic_ids (server_default='[]', no batch needed)
- mitre.py: add _TACTIC_IDS map, lookup_tactic(), get_tactic_name()
- simulation_workflow.py: done guard (409) before RBAC; SOC gate += tactic_ids;
_resolve_tactic_ids() validates against hardcoded map; auto-transition += tactic_ids;
transition done→review_required is Reopen (all 3 roles); _maybe_activate_engagement hook
- serializers.py: _enrich_tactics() → serialize_simulation adds tactics:[{id,name}]
- test_simulations_tactics.py: valid/invalid/dedup/SOC gate/auto-transition/no-bundle
- test_simulations_done_readonly.py: 409 all roles, Reopen all roles, invalid transitions, after-reopen ok
- test_engagement_lifecycle.py: planned→active on auto-transition, already active/closed unchanged, migration 0004 round-trip
- Updated test_simulations_patch.py + test_simulations_workflow.py for AC-18 behavior
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
235 lines
8.4 KiB
Python
235 lines
8.4 KiB
Python
"""Simulation business logic: PATCH rules and state machine transitions."""
|
|
from __future__ import annotations
|
|
|
|
from datetime import UTC, datetime
|
|
from typing import Any
|
|
|
|
from flask import jsonify
|
|
|
|
from backend.app.extensions import db
|
|
from backend.app.models import User
|
|
from backend.app.models.simulation import Simulation, SimulationStatus
|
|
|
|
# Scalar simulation fields reserved for the admin and redteam roles.
# technique_ids and tactic_ids are also redteam-only, but they need
# validation/resolution and are therefore handled separately in apply_patch.
REDTEAM_FIELDS = frozenset(
    (
        "name",
        "description",
        "commands",
        "prerequisites",
        "executed_at",
        "execution_result",
    )
)
|
|
|
|
# Fields the SOC role fills in during review (admin/redteam may write them too).
SOC_FIELDS = frozenset(("log_source", "logs", "soc_comment", "incident_number"))
|
|
|
|
_ALLOWED_TRANSITIONS: dict[str, dict[str, set[str]]] = {
|
|
"review_required": {
|
|
"from": {"pending", "in_progress"},
|
|
"roles": {"admin", "redteam"},
|
|
},
|
|
"done": {
|
|
"from": {"review_required"},
|
|
"roles": {"admin", "redteam", "soc"},
|
|
},
|
|
}
|
|
|
|
|
|
def _is_non_empty(value: Any) -> bool:
|
|
"""Return True if value counts as "filled" for auto-transition purposes."""
|
|
if value is None:
|
|
return False
|
|
if isinstance(value, str) and value == "":
|
|
return False
|
|
return not (isinstance(value, list) and len(value) == 0)
|
|
|
|
|
|
def _resolve_technique_ids(
    technique_ids: list[str],
) -> tuple[list[dict[str, str]] | None, tuple[Any, int] | None]:
    """Validate and resolve technique IDs to [{id, name}] snapshots.

    Args:
        technique_ids: Raw list taken from the request payload. The caller has
            only checked that it is a list, so element types are verified here.

    Returns:
        ``(resolved_list, None)`` on success, or ``(None, (response, status))``
        on failure:

        * 503 when the MITRE bundle is not loaded,
        * 400 when an element is not a string,
        * 400 when an ID is unknown to the MITRE service.

    Duplicates are dropped while the first-seen order is preserved.
    """
    from backend.app.services import mitre as mitre_svc

    if not mitre_svc.mitre_loaded:
        return None, (jsonify({"error": "mitre bundle not loaded"}), 503)

    # Reject non-string elements up front: unhashable JSON values (nested
    # lists/objects) would otherwise make dict.fromkeys() raise TypeError and
    # turn a client error into an unhandled exception.
    if not all(isinstance(tid, str) for tid in technique_ids):
        return None, (
            jsonify({"error": "technique_ids must be a list of strings"}),
            400,
        )

    resolved: list[dict[str, str]] = []
    # dict.fromkeys() deduplicates while keeping insertion order.
    for tid in dict.fromkeys(technique_ids):
        name = mitre_svc.lookup_name(tid)
        if name is None:
            return None, (jsonify({"error": f"unknown technique id: {tid}"}), 400)
        resolved.append({"id": tid, "name": name})
    return resolved, None
|
|
|
|
|
|
def _resolve_tactic_ids(
    tactic_ids: list[str],
) -> tuple[list[str] | None, tuple[Any, int] | None]:
    """Validate and deduplicate tactic TA-ids.

    Args:
        tactic_ids: Raw list taken from the request payload. The caller has
            only checked that it is a list, so element types are verified here.

    Returns:
        ``(deduped_list, None)`` on success, or ``(None, (response, 400))``
        when an element is not a string or is not a known tactic id.

    Bundle does not need to be loaded — validation is against the hardcoded
    _TACTIC_IDS map. Duplicates are dropped while the first-seen order is
    preserved.
    """
    from backend.app.services import mitre as mitre_svc

    # Reject non-string elements up front: unhashable JSON values (nested
    # lists/objects) would otherwise make dict.fromkeys() raise TypeError and
    # turn a client error into an unhandled exception.
    if not all(isinstance(tid, str) for tid in tactic_ids):
        return None, (
            jsonify({"error": "tactic_ids must be a list of strings"}),
            400,
        )

    # dict.fromkeys() deduplicates while keeping insertion order.
    deduped = list(dict.fromkeys(tactic_ids))
    for tid in deduped:
        if mitre_svc.lookup_tactic(tid) is None:
            return None, (jsonify({"error": f"unknown tactic id: {tid}"}), 400)
    return deduped, None
|
|
|
|
|
|
def _maybe_activate_engagement(simulation: Simulation) -> None:
    """Advance the simulation's engagement from planned to active, if applicable.

    Nothing happens when the simulation has no engagement or the engagement
    is in any status other than planned.

    The session is not committed here; the caller must commit, which avoids
    a double commit.
    """
    from backend.app.models.engagement import Engagement, EngagementStatus

    parent: Engagement | None = getattr(simulation, "engagement", None)
    if parent is None:
        return
    if parent.status != EngagementStatus.PLANNED:
        return

    parent.status = EngagementStatus.ACTIVE
    db.session.add(parent)
|
|
|
|
|
|
def apply_patch(
    simulation: Simulation, payload: dict[str, Any], user: User
) -> tuple[Any, int] | None:
    """Apply a validated PATCH payload to a simulation.

    Rules, in order:

    1. A simulation in status done is read-only for every role (409).
    2. SOC users may only write SOC_FIELDS, only while the simulation is in
       review_required, and may not send any redteam field, technique_ids or
       tactic_ids (403 otherwise).
    3. Any other role (admin / redteam) may write REDTEAM_FIELDS,
       technique_ids, tactic_ids and SOC_FIELDS. executed_at, technique_ids
       and tactic_ids are validated before anything is written, so a rejected
       request leaves the simulation untouched.
    4. On the admin/redteam path a pending simulation moves to in_progress
       when the payload carries a non-empty redteam scalar, technique_ids or
       tactic_ids; the engagement hook runs on that transition.

    Args:
        simulation: The simulation to mutate in place.
        payload: Parsed request body; only known keys are applied.
        user: The authenticated user; ``user.role.value`` selects the path.

    Returns:
        A ``(response, status_code)`` tuple on error, or ``None`` on success
        (caller is responsible for committing).
    """
    # Done guard — applies to ALL roles before any RBAC check.
    if simulation.status == SimulationStatus.DONE:
        return jsonify({"error": "simulation is done — reopen first"}), 409

    role = user.role.value

    if role == "soc":
        # DONE was already rejected above, so review_required is the only
        # status in which SOC may edit.
        if simulation.status != SimulationStatus.REVIEW_REQUIRED:
            return jsonify({"error": "simulation not ready for SOC review"}), 403

        # SOC must not send redteam fields, technique_ids, or tactic_ids.
        forbidden_keys = (REDTEAM_FIELDS | {"technique_ids", "tactic_ids"}) & payload.keys()
        if forbidden_keys:
            return jsonify({"error": "soc cannot edit redteam fields"}), 403

        for field in SOC_FIELDS:
            if field in payload:
                setattr(simulation, field, payload[field])

    else:
        # admin / redteam path.
        redteam_keys_present = REDTEAM_FIELDS & payload.keys()

        # --- Validation phase: nothing is written until all of it passes. ---

        # executed_at must be null or an ISO-8601 string.
        executed_at_value: datetime | None = None
        if "executed_at" in redteam_keys_present:
            raw_executed_at = payload["executed_at"]
            if raw_executed_at is not None:
                if not isinstance(raw_executed_at, str):
                    return jsonify({"error": "invalid executed_at"}), 400
                try:
                    executed_at_value = datetime.fromisoformat(raw_executed_at)
                except ValueError:
                    return jsonify({"error": "invalid executed_at"}), 400

        # technique_ids are resolved to [{id, name}] snapshots.
        resolved_techniques: list[dict[str, str]] | None = None
        if "technique_ids" in payload:
            raw_ids = payload["technique_ids"]
            if not isinstance(raw_ids, list):
                return jsonify({"error": "technique_ids must be a list"}), 400
            resolved_techniques, err = _resolve_technique_ids(raw_ids)
            if err is not None:
                return err

        # tactic_ids are validated and deduplicated.
        resolved_tactic_ids: list[str] | None = None
        if "tactic_ids" in payload:
            raw_tids = payload["tactic_ids"]
            if not isinstance(raw_tids, list):
                return jsonify({"error": "tactic_ids must be a list"}), 400
            resolved_tactic_ids, err = _resolve_tactic_ids(raw_tids)
            if err is not None:
                return err

        # --- Write phase. ---

        # Scalar redteam fields; executed_at uses the parsed datetime.
        for field in redteam_keys_present:
            if field == "executed_at":
                simulation.executed_at = executed_at_value
            else:
                setattr(simulation, field, payload[field])

        if resolved_techniques is not None:
            simulation.techniques = resolved_techniques

        if resolved_tactic_ids is not None:
            simulation.tactic_ids = resolved_tactic_ids

        # Admin/redteam may also write the SOC fields.
        for field in SOC_FIELDS:
            if field in payload:
                setattr(simulation, field, payload[field])

        # Auto-transition pending → in_progress.
        # Triggers when any redteam scalar has a non-empty value, or when
        # technique_ids / tactic_ids were supplied non-empty. A resolved list
        # is non-empty exactly when the submitted list was, because
        # deduplication never empties a non-empty list.
        auto_trigger = (
            any(_is_non_empty(payload[key]) for key in redteam_keys_present)
            or bool(resolved_techniques)
            or bool(resolved_tactic_ids)
        )

        if simulation.status == SimulationStatus.PENDING and auto_trigger:
            simulation.status = SimulationStatus.IN_PROGRESS
            _maybe_activate_engagement(simulation)

    simulation.updated_at = datetime.now(UTC)
    return None
|
|
|
|
|
|
def transition(
    simulation: Simulation, to_status: str, user: User
) -> tuple[Any, int] | None:
    """Perform a manually requested status change and commit it.

    Reopen (done → review_required) is accepted without consulting the
    transition table or the user's role. Every other request must match an
    entry in _ALLOWED_TRANSITIONS: an unknown target or a disallowed source
    status yields 409, a role outside the entry's role set yields 403.

    Returns:
        A ``(response, status_code)`` tuple on error, or ``None`` once the new
        status and ``updated_at`` have been written and the session committed.
    """
    current = simulation.status

    if current == SimulationStatus.DONE and to_status == "review_required":
        # Special case: Reopen, allowed for all 3 roles.
        new_status = SimulationStatus.REVIEW_REQUIRED
    else:
        rule = _ALLOWED_TRANSITIONS.get(to_status)
        if rule is None or current.value not in rule["from"]:
            return jsonify({"error": "invalid transition"}), 409
        if user.role.value not in rule["roles"]:
            return jsonify({"error": "Forbidden"}), 403
        new_status = SimulationStatus(to_status)

    simulation.status = new_status
    simulation.updated_at = datetime.now(UTC)

    # Hook: auto-activate engagement when simulation enters in_progress via
    # manual transition.
    if new_status == SimulationStatus.IN_PROGRESS:
        _maybe_activate_engagement(simulation)

    db.session.commit()
    return None
|