feat(backend): sprint 2 — simulations + MITRE ATT&CK

- Simulation model with full field set (redteam + SOC sides) and cascade delete
- Alembic migration 0002 for simulations table
- simulation_workflow service: PATCH RBAC field-level + auto-transition pending→in_progress + state machine
- mitre service: STIX bundle loader (boot-safe) + ranked search (exact-id > prefix-id > name)
- 7 new API endpoints: list/create/get/patch/delete simulations, transition, MITRE autocomplete
- serialize_simulation added to serializers.py
- Makefile update-mitre target with real curl + optional docker restart
- Dockerfile updated to copy backend/data/ into image
- MITRE enterprise-attack.json bundle committed (~45 MB)
- 67 new tests (total 130 passing), ruff clean, mypy introduces no new errors

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Knacky
2026-05-26 10:59:14 +02:00
parent 7fc79cc5a6
commit 006c4c2c5f
17 changed files with 796686 additions and 4 deletions

247
backend/tests/test_mitre.py Normal file
View File

@@ -0,0 +1,247 @@
"""MITRE service and endpoint tests. Uses a tiny fixture bundle, not the 40 MB file."""
from __future__ import annotations
import json
import pathlib
import pytest
from flask.testing import FlaskClient
from backend.app.services import mitre as mitre_svc
from backend.tests.conftest import auth_headers as _h
# ---------------------------------------------------------------------------
# Fixture STIX bundle (minimal, 4 techniques including one sub-technique)
# ---------------------------------------------------------------------------
_FIXTURE_BUNDLE = {
"type": "bundle",
"objects": [
{
"type": "attack-pattern",
"name": "Command and Scripting Interpreter",
"external_references": [
{"source_name": "mitre-attack", "external_id": "T1059"}
],
"kill_chain_phases": [{"phase_name": "execution", "kill_chain_name": "mitre-attack"}],
},
{
"type": "attack-pattern",
"name": "PowerShell",
"external_references": [
{"source_name": "mitre-attack", "external_id": "T1059.001"}
],
"kill_chain_phases": [{"phase_name": "execution", "kill_chain_name": "mitre-attack"}],
},
{
"type": "attack-pattern",
"name": "Phishing",
"external_references": [
{"source_name": "mitre-attack", "external_id": "T1566"}
],
"kill_chain_phases": [{"phase_name": "initial-access", "kill_chain_name": "mitre-attack"}],
},
{
"type": "attack-pattern",
"name": "Valid Accounts",
"external_references": [
{"source_name": "mitre-attack", "external_id": "T1078"}
],
"kill_chain_phases": [
{"phase_name": "initial-access", "kill_chain_name": "mitre-attack"},
{"phase_name": "persistence", "kill_chain_name": "mitre-attack"},
],
},
{
# Revoked — must be excluded from index.
"type": "attack-pattern",
"name": "Old Technique",
"revoked": True,
"external_references": [
{"source_name": "mitre-attack", "external_id": "T9999"}
],
"kill_chain_phases": [],
},
{
# Not an attack-pattern — must be ignored.
"type": "relationship",
"name": "Ignored",
},
],
}
@pytest.fixture(autouse=True)
def _reset_mitre():
"""Reset the MITRE service state between tests."""
original_loaded = mitre_svc.mitre_loaded
original_index = list(mitre_svc._index)
yield
mitre_svc.mitre_loaded = original_loaded
mitre_svc._index = original_index
@pytest.fixture()
def bundle_file(tmp_path: pathlib.Path) -> pathlib.Path:
p = tmp_path / "enterprise-attack.json"
p.write_text(json.dumps(_FIXTURE_BUNDLE), encoding="utf-8")
return p
# ---------------------------------------------------------------------------
# Unit tests for load_bundle
# ---------------------------------------------------------------------------
def test_load_bundle_success(bundle_file: pathlib.Path) -> None:
mitre_svc.load_bundle(bundle_file)
assert mitre_svc.mitre_loaded is True
assert len(mitre_svc._index) == 4 # 5 objects minus 1 revoked = 4
def test_load_bundle_missing_file() -> None:
mitre_svc.load_bundle(pathlib.Path("/nonexistent/path.json"))
assert mitre_svc.mitre_loaded is False
def test_load_bundle_invalid_json(tmp_path: pathlib.Path) -> None:
bad = tmp_path / "bad.json"
bad.write_text("{ not json }", encoding="utf-8")
mitre_svc.load_bundle(bad)
assert mitre_svc.mitre_loaded is False
def test_load_bundle_excludes_revoked(bundle_file: pathlib.Path) -> None:
mitre_svc.load_bundle(bundle_file)
ids = [e["id"] for e in mitre_svc._index]
assert "T9999" not in ids
def test_load_bundle_includes_subtechniques(bundle_file: pathlib.Path) -> None:
mitre_svc.load_bundle(bundle_file)
ids = [e["id"] for e in mitre_svc._index]
assert "T1059.001" in ids
def test_load_bundle_extracts_tactics(bundle_file: pathlib.Path) -> None:
mitre_svc.load_bundle(bundle_file)
t1078 = next(e for e in mitre_svc._index if e["id"] == "T1078")
assert "initial-access" in t1078["tactics"]
assert "persistence" in t1078["tactics"]
# ---------------------------------------------------------------------------
# Unit tests for search
# ---------------------------------------------------------------------------
def test_search_exact_id_first(bundle_file: pathlib.Path) -> None:
mitre_svc.load_bundle(bundle_file)
results = mitre_svc.search("T1059")
assert results[0]["id"] == "T1059"
def test_search_prefix_id(bundle_file: pathlib.Path) -> None:
mitre_svc.load_bundle(bundle_file)
results = mitre_svc.search("T105")
ids = [r["id"] for r in results]
assert "T1059" in ids
assert "T1059.001" in ids
def test_search_name_substring(bundle_file: pathlib.Path) -> None:
mitre_svc.load_bundle(bundle_file)
results = mitre_svc.search("phish")
assert any(r["id"] == "T1566" for r in results)
def test_search_case_insensitive(bundle_file: pathlib.Path) -> None:
mitre_svc.load_bundle(bundle_file)
results = mitre_svc.search("POWERSHELL")
assert any(r["id"] == "T1059.001" for r in results)
def test_search_limit(bundle_file: pathlib.Path) -> None:
mitre_svc.load_bundle(bundle_file)
results = mitre_svc.search("T", limit=2)
assert len(results) <= 2
def test_search_empty_query(bundle_file: pathlib.Path) -> None:
mitre_svc.load_bundle(bundle_file)
assert mitre_svc.search("") == []
def test_search_ranking_order(bundle_file: pathlib.Path) -> None:
"""exact-id > prefix-id > name match."""
mitre_svc.load_bundle(bundle_file)
results = mitre_svc.search("T1059")
# T1059 must come before T1059.001 (prefix match)
ids = [r["id"] for r in results]
assert ids.index("T1059") < ids.index("T1059.001")
# ---------------------------------------------------------------------------
# Endpoint tests
# ---------------------------------------------------------------------------
def test_mitre_endpoint_503_when_not_loaded(
client: FlaskClient, redteam_token: str
) -> None:
mitre_svc.mitre_loaded = False
mitre_svc._index = []
resp = client.get("/api/mitre/techniques?q=T1059", headers=_h(redteam_token))
assert resp.status_code == 503
assert resp.get_json()["error"] == "mitre bundle not loaded"
def test_mitre_endpoint_returns_results(
client: FlaskClient, redteam_token: str, bundle_file: pathlib.Path
) -> None:
mitre_svc.load_bundle(bundle_file)
resp = client.get("/api/mitre/techniques?q=T1059", headers=_h(redteam_token))
assert resp.status_code == 200
data = resp.get_json()
assert isinstance(data, list)
assert any(r["id"] == "T1059" for r in data)
def test_mitre_endpoint_requires_auth(client: FlaskClient) -> None:
resp = client.get("/api/mitre/techniques?q=T1059")
assert resp.status_code == 401
def test_mitre_endpoint_all_roles_can_access(
client: FlaskClient,
redteam_token: str,
soc_token: str,
admin_token: str,
bundle_file: pathlib.Path,
) -> None:
mitre_svc.load_bundle(bundle_file)
for token in (redteam_token, soc_token, admin_token):
resp = client.get("/api/mitre/techniques?q=T1059", headers=_h(token))
assert resp.status_code == 200
def test_mitre_endpoint_max_20_results(
client: FlaskClient, redteam_token: str, bundle_file: pathlib.Path
) -> None:
mitre_svc.load_bundle(bundle_file)
resp = client.get("/api/mitre/techniques?q=T", headers=_h(redteam_token))
assert resp.status_code == 200
assert len(resp.get_json()) <= 20
def test_mitre_endpoint_includes_tactics(
client: FlaskClient, redteam_token: str, bundle_file: pathlib.Path
) -> None:
mitre_svc.load_bundle(bundle_file)
resp = client.get("/api/mitre/techniques?q=T1566", headers=_h(redteam_token))
assert resp.status_code == 200
data = resp.get_json()
assert len(data) >= 1
phishing = next((r for r in data if r["id"] == "T1566"), None)
assert phishing is not None
assert "initial-access" in phishing["tactics"]

View File

@@ -0,0 +1,236 @@
"""Simulation CRUD tests: create, list, get, delete, cascade."""
from __future__ import annotations
from flask.testing import FlaskClient
from backend.app.extensions import db
from backend.app.models import User
from backend.app.models.simulation import Simulation
from backend.tests.conftest import auth_headers as _h
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_engagement(client: FlaskClient, token: str) -> dict:
resp = client.post(
"/api/engagements",
headers=_h(token),
json={"name": "Op Alpha", "start_date": "2026-06-01"},
)
assert resp.status_code == 201, resp.get_json()
return resp.get_json()
def _make_sim(client: FlaskClient, token: str, eid: int, **kw) -> dict:
payload = {"name": "Sim 1", **kw}
resp = client.post(
f"/api/engagements/{eid}/simulations", headers=_h(token), json=payload
)
assert resp.status_code == 201, resp.get_json()
return resp.get_json()
# ---------------------------------------------------------------------------
# Create
# ---------------------------------------------------------------------------
def test_create_simulation_as_redteam(
client: FlaskClient, redteam_user: User, redteam_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
body = _make_sim(client, redteam_token, eng["id"])
assert body["name"] == "Sim 1"
assert body["status"] == "pending"
assert body["engagement_id"] == eng["id"]
assert body["created_by"] == {"id": redteam_user.id, "username": "redteam1"}
def test_create_simulation_as_admin(
client: FlaskClient, admin_user: User, admin_token: str
) -> None:
eng = _make_engagement(client, admin_token)
body = _make_sim(client, admin_token, eng["id"])
assert body["created_by"]["username"] == "admin1"
def test_create_simulation_soc_forbidden(
client: FlaskClient, redteam_token: str, soc_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
resp = client.post(
f"/api/engagements/{eng['id']}/simulations",
headers=_h(soc_token),
json={"name": "x"},
)
assert resp.status_code == 403
def test_create_simulation_unauth(client: FlaskClient, redteam_token: str) -> None:
eng = _make_engagement(client, redteam_token)
resp = client.post(
f"/api/engagements/{eng['id']}/simulations", json={"name": "x"}
)
assert resp.status_code == 401
def test_create_simulation_missing_name(
client: FlaskClient, redteam_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
resp = client.post(
f"/api/engagements/{eng['id']}/simulations",
headers=_h(redteam_token),
json={"name": ""},
)
assert resp.status_code == 400
def test_create_simulation_engagement_not_found(
client: FlaskClient, redteam_token: str
) -> None:
resp = client.post(
"/api/engagements/9999/simulations",
headers=_h(redteam_token),
json={"name": "x"},
)
assert resp.status_code == 404
# ---------------------------------------------------------------------------
# List
# ---------------------------------------------------------------------------
def test_list_simulations_empty(client: FlaskClient, redteam_token: str) -> None:
eng = _make_engagement(client, redteam_token)
resp = client.get(
f"/api/engagements/{eng['id']}/simulations", headers=_h(redteam_token)
)
assert resp.status_code == 200
assert resp.get_json() == []
def test_list_simulations_ordered_desc(
client: FlaskClient, redteam_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
_make_sim(client, redteam_token, eng["id"], name="First")
_make_sim(client, redteam_token, eng["id"], name="Second")
resp = client.get(
f"/api/engagements/{eng['id']}/simulations", headers=_h(redteam_token)
)
assert resp.status_code == 200
items = resp.get_json()
assert len(items) == 2
# Most recent first
assert items[0]["name"] == "Second"
assert items[1]["name"] == "First"
def test_list_simulations_soc_can_read(
client: FlaskClient, redteam_token: str, soc_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
_make_sim(client, redteam_token, eng["id"])
resp = client.get(
f"/api/engagements/{eng['id']}/simulations", headers=_h(soc_token)
)
assert resp.status_code == 200
assert len(resp.get_json()) == 1
def test_list_simulations_engagement_not_found(
client: FlaskClient, redteam_token: str
) -> None:
resp = client.get(
"/api/engagements/9999/simulations", headers=_h(redteam_token)
)
assert resp.status_code == 404
# ---------------------------------------------------------------------------
# Get
# ---------------------------------------------------------------------------
def test_get_simulation_ok(client: FlaskClient, redteam_token: str) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
resp = client.get(f"/api/simulations/{sim['id']}", headers=_h(redteam_token))
assert resp.status_code == 200
assert resp.get_json()["id"] == sim["id"]
def test_get_simulation_404(client: FlaskClient, redteam_token: str) -> None:
resp = client.get("/api/simulations/9999", headers=_h(redteam_token))
assert resp.status_code == 404
def test_get_simulation_soc_can_read(
client: FlaskClient, redteam_token: str, soc_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
resp = client.get(f"/api/simulations/{sim['id']}", headers=_h(soc_token))
assert resp.status_code == 200
# ---------------------------------------------------------------------------
# Delete
# ---------------------------------------------------------------------------
def test_delete_simulation_redteam(client: FlaskClient, redteam_token: str) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
resp = client.delete(
f"/api/simulations/{sim['id']}", headers=_h(redteam_token)
)
assert resp.status_code == 204
def test_delete_simulation_admin(
client: FlaskClient, redteam_token: str, admin_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
resp = client.delete(f"/api/simulations/{sim['id']}", headers=_h(admin_token))
assert resp.status_code == 204
def test_delete_simulation_soc_forbidden(
client: FlaskClient, redteam_token: str, soc_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
resp = client.delete(f"/api/simulations/{sim['id']}", headers=_h(soc_token))
assert resp.status_code == 403
def test_delete_simulation_404(client: FlaskClient, redteam_token: str) -> None:
resp = client.delete("/api/simulations/9999", headers=_h(redteam_token))
assert resp.status_code == 404
# ---------------------------------------------------------------------------
# Cascade delete
# ---------------------------------------------------------------------------
def test_cascade_delete_engagement_removes_simulations(
app, client: FlaskClient, redteam_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
sim_id = sim["id"]
resp = client.delete(
f"/api/engagements/{eng['id']}", headers=_h(redteam_token)
)
assert resp.status_code == 204
with app.app_context():
assert db.session.get(Simulation, sim_id) is None

View File

@@ -0,0 +1,272 @@
"""Simulation PATCH tests: auto-transition, RBAC field-level, SOC restrictions."""
from __future__ import annotations
from flask.testing import FlaskClient
from backend.tests.conftest import auth_headers as _h
def _make_engagement(client: FlaskClient, token: str) -> dict:
resp = client.post(
"/api/engagements",
headers=_h(token),
json={"name": "Op Beta", "start_date": "2026-06-01"},
)
assert resp.status_code == 201
return resp.get_json()
def _make_sim(client: FlaskClient, token: str, eid: int) -> dict:
resp = client.post(
f"/api/engagements/{eid}/simulations",
headers=_h(token),
json={"name": "Test Sim"},
)
assert resp.status_code == 201
return resp.get_json()
def _patch(client: FlaskClient, token: str, sid: int, payload: dict) -> dict:
resp = client.patch(
f"/api/simulations/{sid}", headers=_h(token), json=payload
)
return resp
# ---------------------------------------------------------------------------
# Auto-transition pending → in_progress (AC-8.2)
# ---------------------------------------------------------------------------
def test_patch_triggers_auto_transition(
client: FlaskClient, redteam_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
assert sim["status"] == "pending"
resp = _patch(client, redteam_token, sim["id"], {"description": "some desc"})
assert resp.status_code == 200
assert resp.get_json()["status"] == "in_progress"
def test_patch_name_triggers_auto_transition(
client: FlaskClient, redteam_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
resp = _patch(client, redteam_token, sim["id"], {"name": "Updated name"})
assert resp.status_code == 200
assert resp.get_json()["status"] == "in_progress"
def test_patch_commands_triggers_auto_transition(
client: FlaskClient, redteam_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
resp = _patch(client, redteam_token, sim["id"], {"commands": "cmd1\ncmd2"})
assert resp.status_code == 200
assert resp.get_json()["status"] == "in_progress"
def test_patch_null_value_does_not_trigger_auto_transition(
client: FlaskClient, redteam_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
resp = _patch(client, redteam_token, sim["id"], {"description": None})
assert resp.status_code == 200
assert resp.get_json()["status"] == "pending"
def test_patch_empty_string_does_not_trigger_auto_transition(
client: FlaskClient, redteam_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
resp = _patch(client, redteam_token, sim["id"], {"description": ""})
assert resp.status_code == 200
assert resp.get_json()["status"] == "pending"
def test_patch_admin_triggers_auto_transition(
client: FlaskClient, admin_token: str
) -> None:
eng = _make_engagement(client, admin_token)
sim = _make_sim(client, admin_token, eng["id"])
resp = _patch(client, admin_token, sim["id"], {"execution_result": "success"})
assert resp.status_code == 200
assert resp.get_json()["status"] == "in_progress"
def test_patch_soc_does_not_trigger_auto_transition(
client: FlaskClient, redteam_token: str, soc_token: str
) -> None:
"""SOC patch on review_required must not trigger auto-transition."""
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
# Manually advance to review_required.
client.post(
f"/api/simulations/{sim['id']}/transition",
headers=_h(redteam_token),
json={"to": "review_required"},
)
resp = _patch(client, soc_token, sim["id"], {"soc_comment": "looks good"})
assert resp.status_code == 200
assert resp.get_json()["status"] == "review_required"
# ---------------------------------------------------------------------------
# Field updates
# ---------------------------------------------------------------------------
def test_patch_updates_commands_as_text(
client: FlaskClient, redteam_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
commands = "whoami\nnet user\nipconfig"
resp = _patch(client, redteam_token, sim["id"], {"commands": commands})
assert resp.status_code == 200
assert resp.get_json()["commands"] == commands
def test_patch_updates_executed_at(
client: FlaskClient, redteam_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
resp = _patch(
client, redteam_token, sim["id"], {"executed_at": "2026-06-01T12:00:00"}
)
assert resp.status_code == 200
assert "2026-06-01" in resp.get_json()["executed_at"]
def test_patch_invalid_executed_at(
client: FlaskClient, redteam_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
resp = _patch(
client, redteam_token, sim["id"], {"executed_at": "not-a-date"}
)
assert resp.status_code == 400
assert resp.get_json()["error"] == "invalid executed_at"
def test_patch_clear_executed_at(
client: FlaskClient, redteam_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
_patch(client, redteam_token, sim["id"], {"executed_at": "2026-06-01T12:00:00"})
resp = _patch(client, redteam_token, sim["id"], {"executed_at": None})
assert resp.status_code == 200
assert resp.get_json()["executed_at"] is None
# ---------------------------------------------------------------------------
# SOC RBAC field-level (AC-9.1, AC-9.2)
# ---------------------------------------------------------------------------
def test_soc_cannot_patch_before_review_required(
client: FlaskClient, redteam_token: str, soc_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
assert sim["status"] == "pending"
resp = _patch(client, soc_token, sim["id"], {"soc_comment": "hello"})
assert resp.status_code == 403
assert "not ready" in resp.get_json()["error"]
def test_soc_cannot_patch_in_progress(
client: FlaskClient, redteam_token: str, soc_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
_patch(client, redteam_token, sim["id"], {"description": "in progress now"})
resp = _patch(client, soc_token, sim["id"], {"soc_comment": "hello"})
assert resp.status_code == 403
def test_soc_can_patch_when_review_required(
client: FlaskClient, redteam_token: str, soc_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
client.post(
f"/api/simulations/{sim['id']}/transition",
headers=_h(redteam_token),
json={"to": "review_required"},
)
resp = _patch(
client,
soc_token,
sim["id"],
{"soc_comment": "Detected", "log_source": "SIEM", "incident_number": "INC-001"},
)
assert resp.status_code == 200
body = resp.get_json()
assert body["soc_comment"] == "Detected"
assert body["log_source"] == "SIEM"
assert body["incident_number"] == "INC-001"
def test_soc_can_patch_when_done(
client: FlaskClient, redteam_token: str, soc_token: str, admin_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
client.post(
f"/api/simulations/{sim['id']}/transition",
headers=_h(redteam_token),
json={"to": "review_required"},
)
client.post(
f"/api/simulations/{sim['id']}/transition",
headers=_h(soc_token),
json={"to": "done"},
)
resp = _patch(client, soc_token, sim["id"], {"soc_comment": "Final note"})
assert resp.status_code == 200
def test_soc_cannot_edit_redteam_fields(
client: FlaskClient, redteam_token: str, soc_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
client.post(
f"/api/simulations/{sim['id']}/transition",
headers=_h(redteam_token),
json={"to": "review_required"},
)
resp = _patch(client, soc_token, sim["id"], {"description": "redteam field"})
assert resp.status_code == 403
assert resp.get_json()["error"] == "soc cannot edit redteam fields"
def test_patch_simulation_404(client: FlaskClient, redteam_token: str) -> None:
resp = _patch(client, redteam_token, 9999, {"name": "x"})
assert resp.status_code == 404

View File

@@ -0,0 +1,192 @@
"""Simulation workflow / state machine tests."""
from __future__ import annotations
from flask.testing import FlaskClient
from backend.tests.conftest import auth_headers as _h
def _make_engagement(client: FlaskClient, token: str) -> dict:
resp = client.post(
"/api/engagements",
headers=_h(token),
json={"name": "Op Gamma", "start_date": "2026-06-01"},
)
assert resp.status_code == 201
return resp.get_json()
def _make_sim(client: FlaskClient, token: str, eid: int) -> dict:
resp = client.post(
f"/api/engagements/{eid}/simulations",
headers=_h(token),
json={"name": "Workflow Sim"},
)
assert resp.status_code == 201
return resp.get_json()
def _transition(client: FlaskClient, token: str, sid: int, to: str):
return client.post(
f"/api/simulations/{sid}/transition",
headers=_h(token),
json={"to": to},
)
# ---------------------------------------------------------------------------
# Valid transitions
# ---------------------------------------------------------------------------
def test_transition_to_review_required_from_pending(
client: FlaskClient, redteam_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
assert sim["status"] == "pending"
resp = _transition(client, redteam_token, sim["id"], "review_required")
assert resp.status_code == 200
assert resp.get_json()["status"] == "review_required"
def test_transition_to_review_required_from_in_progress(
client: FlaskClient, redteam_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
# Auto-advance to in_progress
client.patch(
f"/api/simulations/{sim['id']}",
headers=_h(redteam_token),
json={"description": "started"},
)
resp = _transition(client, redteam_token, sim["id"], "review_required")
assert resp.status_code == 200
assert resp.get_json()["status"] == "review_required"
def test_transition_to_done_by_redteam(
client: FlaskClient, redteam_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
_transition(client, redteam_token, sim["id"], "review_required")
resp = _transition(client, redteam_token, sim["id"], "done")
assert resp.status_code == 200
assert resp.get_json()["status"] == "done"
def test_transition_to_done_by_soc(
client: FlaskClient, redteam_token: str, soc_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
_transition(client, redteam_token, sim["id"], "review_required")
resp = _transition(client, soc_token, sim["id"], "done")
assert resp.status_code == 200
assert resp.get_json()["status"] == "done"
def test_transition_to_done_by_admin(
client: FlaskClient, admin_token: str
) -> None:
eng = _make_engagement(client, admin_token)
sim = _make_sim(client, admin_token, eng["id"])
_transition(client, admin_token, sim["id"], "review_required")
resp = _transition(client, admin_token, sim["id"], "done")
assert resp.status_code == 200
assert resp.get_json()["status"] == "done"
# ---------------------------------------------------------------------------
# Invalid transitions (AC-11.3)
# ---------------------------------------------------------------------------
def test_transition_done_from_pending_rejected(
client: FlaskClient, redteam_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
resp = _transition(client, redteam_token, sim["id"], "done")
assert resp.status_code == 409
def test_transition_to_pending_rejected(
client: FlaskClient, redteam_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
_transition(client, redteam_token, sim["id"], "review_required")
resp = _transition(client, redteam_token, sim["id"], "pending")
assert resp.status_code == 409
def test_transition_to_in_progress_rejected(
client: FlaskClient, redteam_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
resp = _transition(client, redteam_token, sim["id"], "in_progress")
assert resp.status_code == 409
def test_transition_unknown_status_rejected(
client: FlaskClient, redteam_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
resp = _transition(client, redteam_token, sim["id"], "nonexistent")
assert resp.status_code == 409
def test_transition_review_required_from_done_rejected(
client: FlaskClient, redteam_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
_transition(client, redteam_token, sim["id"], "review_required")
_transition(client, redteam_token, sim["id"], "done")
resp = _transition(client, redteam_token, sim["id"], "review_required")
assert resp.status_code == 409
# ---------------------------------------------------------------------------
# RBAC by role
# ---------------------------------------------------------------------------
def test_soc_cannot_transition_to_review_required(
client: FlaskClient, redteam_token: str, soc_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
resp = _transition(client, soc_token, sim["id"], "review_required")
assert resp.status_code == 403
def test_soc_cannot_transition_to_done_from_pending(
client: FlaskClient, redteam_token: str, soc_token: str
) -> None:
eng = _make_engagement(client, redteam_token)
sim = _make_sim(client, redteam_token, eng["id"])
resp = _transition(client, soc_token, sim["id"], "done")
assert resp.status_code == 409
def test_transition_simulation_404(client: FlaskClient, redteam_token: str) -> None:
resp = _transition(client, redteam_token, 9999, "review_required")
assert resp.status_code == 404