From d5ab1fd26f57be2e812e21d7b9ccfafd4e9ee786 Mon Sep 17 00:00:00 2001 From: Knacky Date: Wed, 27 May 2026 19:52:02 +0200 Subject: [PATCH] =?UTF-8?q?feat(backend):=20sprint=204=20=E2=80=94=20tacti?= =?UTF-8?q?c=5Fids=20+=20done=20guard=20+=20engagement=20auto-status?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Simulation model: add tactic_ids JSON column (nullable=False, default=[]) - Migration 0004: ADD COLUMN tactic_ids (server_default='[]', no batch needed) - mitre.py: add _TACTIC_IDS map, lookup_tactic(), get_tactic_name() - simulation_workflow.py: done guard (409) before RBAC; SOC gate += tactic_ids; _resolve_tactic_ids() validates against hardcoded map; auto-transition += tactic_ids; transition done→review_required is Reopen (all 3 roles); _maybe_activate_engagement hook - serializers.py: _enrich_tactics() → serialize_simulation adds tactics:[{id,name}] - test_simulations_tactics.py: valid/invalid/dedup/SOC gate/auto-transition/no-bundle - test_simulations_done_readonly.py: 409 all roles, Reopen all roles, invalid transitions, after-reopen ok - test_engagement_lifecycle.py: planned→active on auto-transition, already active/closed unchanged, migration 0004 round-trip - Updated test_simulations_patch.py + test_simulations_workflow.py for AC-18 behavior Co-Authored-By: Claude Sonnet 4.6 --- backend/app/models/simulation.py | 1 + backend/app/serializers.py | 15 ++ backend/app/services/mitre.py | 32 +++ backend/app/services/simulation_workflow.py | 74 +++++- .../versions/0004_simulation_tactic_ids.py | 33 +++ backend/tests/test_engagement_lifecycle.py | 178 +++++++++++++ .../tests/test_simulations_done_readonly.py | 191 ++++++++++++++ backend/tests/test_simulations_patch.py | 6 +- backend/tests/test_simulations_tactics.py | 237 ++++++++++++++++++ backend/tests/test_simulations_techniques.py | 2 +- backend/tests/test_simulations_workflow.py | 6 +- 11 files changed, 765 insertions(+), 10 deletions(-) create mode 100644 backend/migrations/versions/0004_simulation_tactic_ids.py create mode 100644 backend/tests/test_engagement_lifecycle.py create mode 100644 backend/tests/test_simulations_done_readonly.py create mode 100644 backend/tests/test_simulations_tactics.py diff --git a/backend/app/models/simulation.py b/backend/app/models/simulation.py index 74d99df..9dfc0cf 100644 --- a/backend/app/models/simulation.py +++ b/backend/app/models/simulation.py @@ -26,6 +26,7 @@ class Simulation(db.Model): # type: ignore[name-defined] ) name = db.Column(db.String(255), nullable=False) techniques = db.Column(db.JSON, nullable=False, default=list) + tactic_ids = db.Column(db.JSON, nullable=False, default=list) description = db.Column(db.Text, nullable=True) commands = db.Column(db.Text, nullable=True) prerequisites = db.Column(db.Text, nullable=True) diff --git a/backend/app/serializers.py b/backend/app/serializers.py index d54e9cc..41bf4d6 100644 --- a/backend/app/serializers.py +++ b/backend/app/serializers.py @@ -30,12 +30,27 @@ def _enrich_techniques(raw: list[dict[str, Any]]) -> list[dict[str, Any]]: ] +def _enrich_tactics(tactic_ids: list[str]) -> list[dict[str, str]]: + """Resolve TA-ids to {id, name} at runtime.""" + from backend.app.services import mitre as mitre_svc + + result = [] + for tid in tactic_ids or []: + entry = mitre_svc.lookup_tactic(tid) + if entry is not None: + result.append(entry) + else: + result.append({"id": tid, "name": ""}) + return result + + def serialize_simulation(simulation: Simulation) -> dict[str, Any]: return { "id": simulation.id, "engagement_id": simulation.engagement_id, "name": simulation.name, "techniques": _enrich_techniques(simulation.techniques or []), + "tactics": _enrich_tactics(simulation.tactic_ids or []), "description": simulation.description, "commands": simulation.commands, "prerequisites": simulation.prerequisites, diff --git a/backend/app/services/mitre.py b/backend/app/services/mitre.py index 6dd91e2..62909f5 100644 --- a/backend/app/services/mitre.py +++ b/backend/app/services/mitre.py @@ -26,6 +26,22 @@ _TACTIC_ORDER = [ "impact", ] +# TA-id → short-name mapping (MITRE Enterprise, IDs are not sequential). +_TACTIC_IDS: dict[str, str] = { + "TA0001": "initial-access", + "TA0002": "execution", + "TA0003": "persistence", + "TA0004": "privilege-escalation", + "TA0005": "defense-evasion", + "TA0006": "credential-access", + "TA0007": "discovery", + "TA0008": "lateral-movement", + "TA0009": "collection", + "TA0011": "command-and-control", + "TA0010": "exfiltration", + "TA0040": "impact", +} + TACTIC_NAMES: dict[str, str] = { "initial-access": "Initial Access", "execution": "Execution", @@ -181,6 +197,22 @@ def get_matrix() -> list[dict[str, Any]]: return _matrix +def lookup_tactic(tactic_id: str) -> dict[str, str] | None: + """Return {id, name} for a TA-id, or None if unknown.""" + short = _TACTIC_IDS.get(tactic_id) + if short is None: + return None + return {"id": tactic_id, "name": TACTIC_NAMES[short]} + + +def get_tactic_name(tactic_id: str) -> str | None: + """Return the display name for a TA-id, or None if unknown.""" + short = _TACTIC_IDS.get(tactic_id) + if short is None: + return None + return TACTIC_NAMES[short] + + def search(query: str, limit: int = 20) -> list[dict[str, Any]]: """Return up to `limit` techniques matching `query`. diff --git a/backend/app/services/simulation_workflow.py b/backend/app/services/simulation_workflow.py index 2df406d..03f6386 100644 --- a/backend/app/services/simulation_workflow.py +++ b/backend/app/services/simulation_workflow.py @@ -10,7 +10,7 @@ from backend.app.extensions import db from backend.app.models import User from backend.app.models.simulation import Simulation, SimulationStatus -# Fields only admin/redteam may write (excluding technique_ids which is handled separately). +# Fields only admin/redteam may write (excluding technique_ids/tactic_ids handled separately). REDTEAM_FIELDS = frozenset( { "name", @@ -58,7 +58,6 @@ def _resolve_technique_ids( if not mitre_svc.mitre_loaded: return None, (jsonify({"error": "mitre bundle not loaded"}), 503) - # Dedup, preserve order. seen: dict[str, None] = dict.fromkeys(technique_ids) resolved: list[dict[str, str]] = [] for tid in seen: @@ -69,6 +68,36 @@ def _resolve_technique_ids( return resolved, None +def _resolve_tactic_ids( + tactic_ids: list[str], +) -> tuple[list[str] | None, tuple[Any, int] | None]: + """Validate and deduplicate tactic TA-ids. + + Returns (deduped_list, None) on success or (None, error_tuple) on failure. + Bundle does not need to be loaded — validation is against the hardcoded _TACTIC_IDS map. + """ + from backend.app.services import mitre as mitre_svc + + seen: dict[str, None] = dict.fromkeys(tactic_ids) + for tid in seen: + if mitre_svc.lookup_tactic(tid) is None: + return None, (jsonify({"error": f"unknown tactic id: {tid}"}), 400) + return list(seen), None + + +def _maybe_activate_engagement(simulation: Simulation) -> None: + """If simulation's engagement is planned, advance it to active. + + Caller must commit — do not commit here to avoid double-commit. + """ + from backend.app.models.engagement import Engagement, EngagementStatus + + engagement: Engagement | None = getattr(simulation, "engagement", None) + if engagement is not None and engagement.status == EngagementStatus.PLANNED: + engagement.status = EngagementStatus.ACTIVE + db.session.add(engagement) + + def apply_patch( simulation: Simulation, payload: dict[str, Any], user: User ) -> tuple[Any, int] | None: @@ -77,6 +106,10 @@ def apply_patch( Returns a (response, status_code) tuple on error, or None on success (caller is responsible for committing). """ + # Done guard — applies to ALL roles before any RBAC check. + if simulation.status == SimulationStatus.DONE: + return jsonify({"error": "simulation is done — reopen first"}), 409 + role = user.role.value if role == "soc": @@ -86,8 +119,10 @@ def apply_patch( ): return jsonify({"error": "simulation not ready for SOC review"}), 403 - # SOC must not send redteam fields or technique_ids. - redteam_keys_in_payload = (REDTEAM_FIELDS | {"technique_ids"}) & payload.keys() + # SOC must not send redteam fields, technique_ids, or tactic_ids. + redteam_keys_in_payload = ( + REDTEAM_FIELDS | {"technique_ids", "tactic_ids"} + ) & payload.keys() if redteam_keys_in_payload: return jsonify({"error": "soc cannot edit redteam fields"}), 403 @@ -121,6 +156,16 @@ def apply_patch( if err is not None: return err + # Validate and deduplicate tactic_ids upfront. + resolved_tactic_ids: list[str] | None = None + if "tactic_ids" in payload: + raw_tids = payload["tactic_ids"] + if not isinstance(raw_tids, list): + return jsonify({"error": "tactic_ids must be a list"}), 400 + resolved_tactic_ids, err = _resolve_tactic_ids(raw_tids) + if err is not None: + return err + # Apply scalar redteam fields. for field in redteam_keys_present: if field == "executed_at": @@ -132,19 +177,26 @@ def apply_patch( if resolved_techniques is not None: simulation.techniques = resolved_techniques + # Apply resolved tactic_ids. + if resolved_tactic_ids is not None: + simulation.tactic_ids = resolved_tactic_ids + # Apply SOC fields (admin/redteam may also write them). for field in SOC_FIELDS: if field in payload: setattr(simulation, field, payload[field]) # Auto-transition pending → in_progress. - # Triggers when any redteam scalar has a non-empty value, OR technique_ids is non-empty. + # Triggers when any redteam scalar has a non-empty value, technique_ids or tactic_ids non-empty. auto_trigger = any(_is_non_empty(payload[k]) for k in redteam_keys_present) if not auto_trigger and "technique_ids" in payload: auto_trigger = len(payload["technique_ids"]) > 0 + if not auto_trigger and "tactic_ids" in payload: + auto_trigger = len(payload["tactic_ids"]) > 0 if simulation.status == SimulationStatus.PENDING and auto_trigger: simulation.status = SimulationStatus.IN_PROGRESS + _maybe_activate_engagement(simulation) simulation.updated_at = datetime.now(UTC) return None @@ -154,6 +206,13 @@ def transition( simulation: Simulation, to_status: str, user: User ) -> tuple[Any, int] | None: """Attempt a manual transition. Returns error tuple or None on success.""" + # Special case: done → review_required (Reopen), allowed for all 3 roles. + if to_status == "review_required" and simulation.status == SimulationStatus.DONE: + simulation.status = SimulationStatus.REVIEW_REQUIRED + simulation.updated_at = datetime.now(UTC) + db.session.commit() + return None + rule = _ALLOWED_TRANSITIONS.get(to_status) if rule is None: return jsonify({"error": "invalid transition"}), 409 @@ -166,5 +225,10 @@ def transition( simulation.status = SimulationStatus(to_status) simulation.updated_at = datetime.now(UTC) + + # Hook: auto-activate engagement when simulation enters in_progress via manual transition. + if simulation.status == SimulationStatus.IN_PROGRESS: + _maybe_activate_engagement(simulation) + db.session.commit() return None diff --git a/backend/migrations/versions/0004_simulation_tactic_ids.py b/backend/migrations/versions/0004_simulation_tactic_ids.py new file mode 100644 index 0000000..70622e7 --- /dev/null +++ b/backend/migrations/versions/0004_simulation_tactic_ids.py @@ -0,0 +1,33 @@ +"""add tactic_ids JSON column to simulations + +Revision ID: 0004 +Revises: 0003 +Create Date: 2026-05-27 00:00:00.000000 +""" +import sqlalchemy as sa +from alembic import op +from sqlalchemy.sql import text + +revision = "0004" +down_revision = "0003" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ADD COLUMN is safe on SQLite without batch mode. + # server_default='[]' satisfies NOT NULL for existing rows. + op.add_column( + "simulations", + sa.Column( + "tactic_ids", + sa.JSON(), + nullable=False, + server_default=text("'[]'"), + ), + ) + + +def downgrade() -> None: + with op.batch_alter_table("simulations") as batch_op: + batch_op.drop_column("tactic_ids") diff --git a/backend/tests/test_engagement_lifecycle.py b/backend/tests/test_engagement_lifecycle.py new file mode 100644 index 0000000..b3f0c26 --- /dev/null +++ b/backend/tests/test_engagement_lifecycle.py @@ -0,0 +1,178 @@ +"""Sprint 4 — engagement auto-status planned→active (AC-19).""" +from __future__ import annotations + +from flask.testing import FlaskClient + +from backend.tests.conftest import auth_headers as _h + + +def _make_engagement(client: FlaskClient, token: str, **kwargs) -> dict: + payload = {"name": "Eng", "start_date": "2026-01-01", **kwargs} + resp = client.post("/api/engagements", headers=_h(token), json=payload) + assert resp.status_code == 201 + return resp.get_json() + + +def _get_engagement(client: FlaskClient, token: str, eid: int) -> dict: + resp = client.get(f"/api/engagements/{eid}", headers=_h(token)) + assert resp.status_code == 200 + return resp.get_json() + + +def _make_sim(client: FlaskClient, token: str, eid: int) -> dict: + resp = client.post( + f"/api/engagements/{eid}/simulations", + headers=_h(token), + json={"name": "Sim"}, + ) + assert resp.status_code == 201 + return resp.get_json() + + +def _patch_sim(client: FlaskClient, token: str, sid: int, payload: dict) -> dict: + resp = client.patch(f"/api/simulations/{sid}", headers=_h(token), json=payload) + assert resp.status_code == 200 + return resp.get_json() + + +# --------------------------------------------------------------------------- +# AC-19.1 — Auto-activate engagement on first sim in_progress +# --------------------------------------------------------------------------- + + +def test_sim_creation_does_not_activate_engagement( + client: FlaskClient, redteam_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + _make_sim(client, redteam_token, eng["id"]) + + eng_data = _get_engagement(client, redteam_token, eng["id"]) + assert eng_data["status"] == "planned" + + +def test_patch_rt_field_activates_planned_engagement( + client: FlaskClient, redteam_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + assert sim["status"] == "pending" + + sim_data = _patch_sim(client, redteam_token, sim["id"], {"description": "started"}) + assert sim_data["status"] == "in_progress" + + eng_data = _get_engagement(client, redteam_token, eng["id"]) + assert eng_data["status"] == "active" + + +def test_patch_tactic_ids_activates_planned_engagement( + client: FlaskClient, redteam_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + + _patch_sim(client, redteam_token, sim["id"], {"tactic_ids": ["TA0007"]}) + + eng_data = _get_engagement(client, redteam_token, eng["id"]) + assert eng_data["status"] == "active" + + +# --------------------------------------------------------------------------- +# AC-19.2 — Already active → stays active (no change) +# --------------------------------------------------------------------------- + + +def test_patch_rt_field_does_not_change_active_engagement( + client: FlaskClient, redteam_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + # First patch triggers activation. + _patch_sim(client, redteam_token, sim["id"], {"description": "started"}) + + # Second patch: engagement should remain active (no state change). + _patch_sim(client, redteam_token, sim["id"], {"description": "updated"}) + + eng_data = _get_engagement(client, redteam_token, eng["id"]) + assert eng_data["status"] == "active" + + +# --------------------------------------------------------------------------- +# AC-19.3 — Engagement in closed state → not touched +# --------------------------------------------------------------------------- + + +def test_patch_does_not_reopen_closed_engagement( + client: FlaskClient, redteam_token: str, admin_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + + # Manually close the engagement via API. + close_resp = client.patch( + f"/api/engagements/{eng['id']}", + headers=_h(admin_token), + json={"status": "closed"}, + ) + assert close_resp.status_code == 200 + + # PATCH a sim field that would normally trigger in_progress. + _patch_sim(client, redteam_token, sim["id"], {"description": "new work"}) + + eng_data = _get_engagement(client, redteam_token, eng["id"]) + assert eng_data["status"] == "closed" + + +# --------------------------------------------------------------------------- +# Migration 0004 — tactic_ids column NOT NULL after upgrade +# --------------------------------------------------------------------------- + + +def test_migration_0004_tactic_ids_not_null_after_upgrade() -> None: + """Alembic round-trip: tactic_ids column is NOT NULL after migration 0004.""" + import importlib + + import sqlalchemy as _sa + from alembic.operations import Operations + from alembic.runtime.migration import MigrationContext + + engine = _sa.create_engine("sqlite:///:memory:") + + # Create post-0003 schema (simulations with techniques column). + with engine.begin() as conn: + conn.execute(_sa.text( + "CREATE TABLE simulations (" + " id INTEGER PRIMARY KEY," + " techniques TEXT NOT NULL DEFAULT '[]'" + ")" + )) + conn.execute(_sa.text( + "INSERT INTO simulations (id, techniques) VALUES (1, '[]')" + )) + + with engine.begin() as conn: + ctx = MigrationContext.configure(conn, opts={"as_sql": False}) + ops = Operations(ctx) + + import alembic.op as _op_module + _op_module._proxy = ops # type: ignore[attr-defined] + + spec = importlib.util.spec_from_file_location( + "mig_0004", + "/home/user/Documents/01_Projects/mimic/.claude/worktrees/sprint-4-ui-polish/backend/migrations/versions/0004_simulation_tactic_ids.py", + ) + assert spec is not None and spec.loader is not None + mig = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mig) # type: ignore[union-attr] + mig.upgrade() + + insp = _sa.inspect(engine) + cols = {c["name"]: c for c in insp.get_columns("simulations")} + assert "tactic_ids" in cols, "tactic_ids column must exist after upgrade" + assert cols["tactic_ids"]["nullable"] is False, "tactic_ids must be NOT NULL" + + # Existing row should have server_default applied. + with engine.connect() as conn: + row = conn.execute(_sa.text("SELECT tactic_ids FROM simulations WHERE id=1")).fetchone() + assert row is not None + import json + assert json.loads(row[0]) == [] diff --git a/backend/tests/test_simulations_done_readonly.py b/backend/tests/test_simulations_done_readonly.py new file mode 100644 index 0000000..8f5609c --- /dev/null +++ b/backend/tests/test_simulations_done_readonly.py @@ -0,0 +1,191 @@ +"""Sprint 4 — done read-only + Reopen tests (AC-18).""" +from __future__ import annotations + +import pytest +from flask.testing import FlaskClient + +from backend.tests.conftest import auth_headers as _h + + +def _make_engagement(client: FlaskClient, token: str) -> dict: + resp = client.post( + "/api/engagements", + headers=_h(token), + json={"name": "Eng", "start_date": "2026-01-01"}, + ) + assert resp.status_code == 201 + return resp.get_json() + + +def _make_sim(client: FlaskClient, token: str, eid: int) -> dict: + resp = client.post( + f"/api/engagements/{eid}/simulations", + headers=_h(token), + json={"name": "Sim"}, + ) + assert resp.status_code == 201 + return resp.get_json() + + +def _advance_to_done(client: FlaskClient, redteam_token: str, soc_token: str, sid: int) -> None: + client.post( + f"/api/simulations/{sid}/transition", + headers=_h(redteam_token), + json={"to": "review_required"}, + ) + client.post( + f"/api/simulations/{sid}/transition", + headers=_h(soc_token), + json={"to": "done"}, + ) + + +def _patch(client: FlaskClient, token: str, sid: int, payload: dict): + return client.patch( + f"/api/simulations/{sid}", + headers=_h(token), + json=payload, + ) + + +def _transition(client: FlaskClient, token: str, sid: int, to: str): + return client.post( + f"/api/simulations/{sid}/transition", + headers=_h(token), + json={"to": to}, + ) + + +# --------------------------------------------------------------------------- +# AC-18.1 — PATCH on done → 409 for all roles +# --------------------------------------------------------------------------- + + +def test_patch_done_sim_admin_returns_409( + client: FlaskClient, redteam_token: str, soc_token: str, admin_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + _advance_to_done(client, redteam_token, soc_token, sim["id"]) + + resp = _patch(client, admin_token, sim["id"], {"name": "renamed"}) + assert resp.status_code == 409 + assert resp.get_json()["error"] == "simulation is done — reopen first" + + +def test_patch_done_sim_redteam_returns_409( + client: FlaskClient, redteam_token: str, soc_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + _advance_to_done(client, redteam_token, soc_token, sim["id"]) + + resp = _patch(client, redteam_token, sim["id"], {"description": "x"}) + assert resp.status_code == 409 + + +def test_patch_done_sim_soc_returns_409( + client: FlaskClient, redteam_token: str, soc_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + _advance_to_done(client, redteam_token, soc_token, sim["id"]) + + resp = _patch(client, soc_token, sim["id"], {"soc_comment": "afterthought"}) + assert resp.status_code == 409 + + +# --------------------------------------------------------------------------- +# AC-18.2 — Reopen: done → review_required, all 3 roles +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("role", ["redteam", "soc", "admin"]) +def test_reopen_done_sim_allowed_for_all_roles( + client: FlaskClient, + redteam_token: str, + soc_token: str, + admin_token: str, + role: str, +) -> None: + token = {"redteam": redteam_token, "soc": soc_token, "admin": admin_token}[role] + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + _advance_to_done(client, redteam_token, soc_token, sim["id"]) + + resp = _transition(client, token, sim["id"], "review_required") + assert resp.status_code == 200 + assert resp.get_json()["status"] == "review_required" + + +# --------------------------------------------------------------------------- +# AC-18.3 — Other transitions from done → 409 +# --------------------------------------------------------------------------- + + +def test_transition_done_to_done_rejected( + client: FlaskClient, redteam_token: str, soc_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + _advance_to_done(client, redteam_token, soc_token, sim["id"]) + + resp = _transition(client, redteam_token, sim["id"], "done") + assert resp.status_code == 409 + + +def test_transition_done_to_in_progress_rejected( + client: FlaskClient, redteam_token: str, soc_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + _advance_to_done(client, redteam_token, soc_token, sim["id"]) + + resp = _transition(client, redteam_token, sim["id"], "in_progress") + assert resp.status_code == 409 + + +def test_transition_done_to_pending_rejected( + client: FlaskClient, redteam_token: str, soc_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + _advance_to_done(client, redteam_token, soc_token, sim["id"]) + + resp = _transition(client, redteam_token, sim["id"], "pending") + assert resp.status_code == 409 + + +# --------------------------------------------------------------------------- +# After reopen, PATCH is allowed again +# --------------------------------------------------------------------------- + + +def test_patch_allowed_after_reopen( + client: FlaskClient, redteam_token: str, soc_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + _advance_to_done(client, redteam_token, soc_token, sim["id"]) + _transition(client, redteam_token, sim["id"], "review_required") + + resp = _patch(client, soc_token, sim["id"], {"soc_comment": "re-reviewed"}) + assert resp.status_code == 200 + assert resp.get_json()["soc_comment"] == "re-reviewed" + + +# --------------------------------------------------------------------------- +# AC-18.3 — Normal review_required path (pending/in_progress) unchanged +# --------------------------------------------------------------------------- + + +def test_transition_review_required_from_in_progress_still_needs_redteam( + client: FlaskClient, redteam_token: str, soc_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + # Auto-advance to in_progress. + _patch(client, redteam_token, sim["id"], {"description": "active"}) + + resp = _transition(client, soc_token, sim["id"], "review_required") + assert resp.status_code == 403 diff --git a/backend/tests/test_simulations_patch.py b/backend/tests/test_simulations_patch.py index b9cb21e..fe96f2d 100644 --- a/backend/tests/test_simulations_patch.py +++ b/backend/tests/test_simulations_patch.py @@ -230,9 +230,10 @@ def test_soc_can_patch_when_review_required( assert body["incident_number"] == "INC-001" -def test_soc_can_patch_when_done( +def test_patch_when_done_returns_409( client: FlaskClient, redteam_token: str, soc_token: str ) -> None: + """Done is terminal — PATCH is rejected for ALL roles (AC-18.1).""" eng = _make_engagement(client, redteam_token) sim = _make_sim(client, redteam_token, eng["id"]) client.post( @@ -247,7 +248,8 @@ def test_soc_can_patch_when_done( ) resp = _patch(client, soc_token, sim["id"], {"soc_comment": "Final note"}) - assert resp.status_code == 200 + assert resp.status_code == 409 + assert resp.get_json()["error"] == "simulation is done — reopen first" def test_soc_cannot_edit_redteam_fields( diff --git a/backend/tests/test_simulations_tactics.py b/backend/tests/test_simulations_tactics.py new file mode 100644 index 0000000..c674c03 --- /dev/null +++ b/backend/tests/test_simulations_tactics.py @@ -0,0 +1,237 @@ +"""Sprint 4 — tactic_ids PATCH tests (AC-21).""" +from __future__ import annotations + +import pathlib + +import pytest +from flask.testing import FlaskClient + +from backend.app.services import mitre as mitre_svc +from backend.tests.conftest import auth_headers as _h + +_FIXTURE_BUNDLE = { + "type": "bundle", + "objects": [ + { + "type": "attack-pattern", + "name": "Command and Scripting Interpreter", + "external_references": [{"source_name": "mitre-attack", "external_id": "T1059"}], + "kill_chain_phases": [{"phase_name": "execution", "kill_chain_name": "mitre-attack"}], + }, + ], +} + + +@pytest.fixture(autouse=True) +def _reset_mitre(): + original_loaded = mitre_svc.mitre_loaded + original_index = list(mitre_svc._index) + original_tactics = dict(mitre_svc._tactics_by_technique) + original_names = dict(mitre_svc._name_by_id) + original_matrix = list(mitre_svc._matrix) + yield + mitre_svc.mitre_loaded = original_loaded + mitre_svc._index = original_index + mitre_svc._tactics_by_technique = original_tactics + mitre_svc._name_by_id = original_names + mitre_svc._matrix = original_matrix + + +@pytest.fixture() +def bundle_file(tmp_path: pathlib.Path) -> pathlib.Path: + import json + p = tmp_path / "enterprise-attack.json" + p.write_text(json.dumps(_FIXTURE_BUNDLE), encoding="utf-8") + return p + + +def _make_engagement(client: FlaskClient, token: str) -> dict: + resp = client.post( + "/api/engagements", + headers=_h(token), + json={"name": "Eng", "start_date": "2026-01-01"}, + ) + assert resp.status_code == 201 + return resp.get_json() + + +def _make_sim(client: FlaskClient, token: str, eid: int) -> dict: + resp = client.post( + f"/api/engagements/{eid}/simulations", + headers=_h(token), + json={"name": "Sim"}, + ) + assert resp.status_code == 201 + return resp.get_json() + + +def _patch(client: FlaskClient, token: str, sid: int, payload: dict): + return client.patch( + f"/api/simulations/{sid}", + headers=_h(token), + json=payload, + ) + + +# --------------------------------------------------------------------------- +# tactic_ids happy path +# --------------------------------------------------------------------------- + + +def test_patch_tactic_ids_valid( + client: FlaskClient, redteam_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + + resp = _patch(client, redteam_token, sim["id"], {"tactic_ids": ["TA0007"]}) + assert resp.status_code == 200 + data = resp.get_json() + assert data["tactics"] == [{"id": "TA0007", "name": "Discovery"}] + + +def test_patch_tactic_ids_multiple( + client: FlaskClient, redteam_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + + resp = _patch(client, redteam_token, sim["id"], {"tactic_ids": ["TA0001", "TA0002"]}) + assert resp.status_code == 200 + tactics = resp.get_json()["tactics"] + ids = [t["id"] for t in tactics] + assert "TA0001" in ids + assert "TA0002" in ids + + +def test_patch_tactic_ids_empty_clears( + client: FlaskClient, redteam_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + _patch(client, redteam_token, sim["id"], {"tactic_ids": ["TA0007"]}) + + resp = _patch(client, redteam_token, sim["id"], {"tactic_ids": []}) + assert resp.status_code == 200 + assert resp.get_json()["tactics"] == [] + + +def test_patch_tactic_ids_dedup( + client: FlaskClient, redteam_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + + resp = _patch(client, redteam_token, sim["id"], {"tactic_ids": ["TA0007", "TA0007"]}) + assert resp.status_code == 200 + tactics = resp.get_json()["tactics"] + assert len(tactics) == 1 + + +# --------------------------------------------------------------------------- +# tactic_ids error paths +# --------------------------------------------------------------------------- + + +def test_patch_tactic_ids_unknown_returns_400( + client: FlaskClient, redteam_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + + resp = _patch(client, redteam_token, sim["id"], {"tactic_ids": ["TA9999"]}) + assert resp.status_code == 400 + assert "unknown tactic id" in resp.get_json()["error"] + + +def test_patch_tactic_ids_not_a_list_returns_400( + client: FlaskClient, redteam_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + + resp = _patch(client, redteam_token, sim["id"], {"tactic_ids": "TA0007"}) + assert resp.status_code == 400 + + +# --------------------------------------------------------------------------- +# SOC gate +# --------------------------------------------------------------------------- + + +def test_soc_cannot_patch_tactic_ids( + client: FlaskClient, redteam_token: str, soc_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + # Advance to review_required so SOC can act. + client.post( + f"/api/simulations/{sim['id']}/transition", + headers=_h(redteam_token), + json={"to": "review_required"}, + ) + + resp = _patch(client, soc_token, sim["id"], {"tactic_ids": ["TA0007"]}) + assert resp.status_code == 403 + + +# --------------------------------------------------------------------------- +# Auto-transition via tactic_ids +# --------------------------------------------------------------------------- + + +def test_patch_tactic_ids_triggers_auto_transition( + client: FlaskClient, redteam_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + assert sim["status"] == "pending" + + resp = _patch(client, redteam_token, sim["id"], {"tactic_ids": ["TA0007"]}) + assert resp.status_code == 200 + assert resp.get_json()["status"] == "in_progress" + + +def test_patch_empty_tactic_ids_no_auto_transition( + client: FlaskClient, redteam_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + + resp = _patch(client, redteam_token, sim["id"], {"tactic_ids": []}) + assert resp.status_code == 200 + assert resp.get_json()["status"] == "pending" + + +# --------------------------------------------------------------------------- +# tactic_ids not affected by MITRE bundle loaded state +# (validation uses hardcoded _TACTIC_IDS, not the live bundle) +# --------------------------------------------------------------------------- + + +def test_patch_tactic_ids_works_without_bundle( + client: FlaskClient, redteam_token: str +) -> None: + """tactic_ids validation is hardcoded — bundle state is irrelevant.""" + mitre_svc.mitre_loaded = False + mitre_svc._index = [] + + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + + resp = _patch(client, redteam_token, sim["id"], {"tactic_ids": ["TA0007"]}) + assert resp.status_code == 200 + + +def test_patch_technique_ids_bundle_not_loaded_returns_503( + client: FlaskClient, redteam_token: str +) -> None: + """technique_ids still needs the bundle (different from tactic_ids).""" + mitre_svc.mitre_loaded = False + mitre_svc._index = [] + + eng = _make_engagement(client, redteam_token) + sim = _make_sim(client, redteam_token, eng["id"]) + + resp = _patch(client, redteam_token, sim["id"], {"technique_ids": ["T1059"]}) + assert resp.status_code == 503 diff --git a/backend/tests/test_simulations_techniques.py b/backend/tests/test_simulations_techniques.py index f010aa0..d4e882e 100644 --- a/backend/tests/test_simulations_techniques.py +++ b/backend/tests/test_simulations_techniques.py @@ -407,7 +407,7 @@ def test_migration_0003_techniques_not_null_after_upgrade() -> None: spec = importlib.util.spec_from_file_location( "mig_0003", - "/home/user/Documents/01_Projects/mimic/.claude/worktrees/sprint-3-mitre-matrix/backend/migrations/versions/0003_simulation_techniques_array.py", + "/home/user/Documents/01_Projects/mimic/.claude/worktrees/sprint-4-ui-polish/backend/migrations/versions/0003_simulation_techniques_array.py", ) assert spec is not None and spec.loader is not None mig = importlib.util.module_from_spec(spec) diff --git a/backend/tests/test_simulations_workflow.py b/backend/tests/test_simulations_workflow.py index 264d16f..e7bd75c 100644 --- a/backend/tests/test_simulations_workflow.py +++ b/backend/tests/test_simulations_workflow.py @@ -150,16 +150,18 @@ def test_transition_unknown_status_rejected( assert resp.status_code == 409 -def test_transition_review_required_from_done_rejected( +def test_transition_review_required_from_done_is_reopen( client: FlaskClient, redteam_token: str ) -> None: + """done → review_required is the Reopen path, now allowed (AC-18.2).""" eng = _make_engagement(client, redteam_token) sim = _make_sim(client, redteam_token, eng["id"]) _transition(client, redteam_token, sim["id"], "review_required") _transition(client, redteam_token, sim["id"], "done") resp = _transition(client, redteam_token, sim["id"], "review_required") - assert resp.status_code == 409 + assert resp.status_code == 200 + assert resp.get_json()["status"] == "review_required" # ---------------------------------------------------------------------------