- Simulation model: add tactic_ids JSON column (nullable=False, default=[])
- Migration 0004: ADD COLUMN tactic_ids (server_default='[]', no batch needed)
- mitre.py: add _TACTIC_IDS map, lookup_tactic(), get_tactic_name()
- simulation_workflow.py: done guard (409) before RBAC; SOC gate += tactic_ids;
_resolve_tactic_ids() validates against hardcoded map; auto-transition += tactic_ids;
transition done→review_required is Reopen (all 3 roles); _maybe_activate_engagement hook
- serializers.py: _enrich_tactics() → serialize_simulation adds tactics:[{id,name}]
- test_simulations_tactics.py: valid/invalid/dedup/SOC gate/auto-transition/no-bundle
- test_simulations_done_readonly.py: 409 all roles, Reopen all roles, invalid transitions, after-reopen ok
- test_engagement_lifecycle.py: planned→active on auto-transition, already active/closed unchanged, migration 0004 round-trip
- Updated test_simulations_patch.py + test_simulations_workflow.py for AC-18 behavior
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
179 lines
6.4 KiB
Python
179 lines
6.4 KiB
Python
"""Sprint 4 — engagement auto-status planned→active (AC-19)."""
|
|
from __future__ import annotations
|
|
|
|
from flask.testing import FlaskClient
|
|
|
|
from backend.tests.conftest import auth_headers as _h
|
|
|
|
|
|
def _make_engagement(client: FlaskClient, token: str, **kwargs) -> dict:
|
|
payload = {"name": "Eng", "start_date": "2026-01-01", **kwargs}
|
|
resp = client.post("/api/engagements", headers=_h(token), json=payload)
|
|
assert resp.status_code == 201
|
|
return resp.get_json()
|
|
|
|
|
|
def _get_engagement(client: FlaskClient, token: str, eid: int) -> dict:
|
|
resp = client.get(f"/api/engagements/{eid}", headers=_h(token))
|
|
assert resp.status_code == 200
|
|
return resp.get_json()
|
|
|
|
|
|
def _make_sim(client: FlaskClient, token: str, eid: int) -> dict:
|
|
resp = client.post(
|
|
f"/api/engagements/{eid}/simulations",
|
|
headers=_h(token),
|
|
json={"name": "Sim"},
|
|
)
|
|
assert resp.status_code == 201
|
|
return resp.get_json()
|
|
|
|
|
|
def _patch_sim(client: FlaskClient, token: str, sid: int, payload: dict) -> dict:
|
|
resp = client.patch(f"/api/simulations/{sid}", headers=_h(token), json=payload)
|
|
assert resp.status_code == 200
|
|
return resp.get_json()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-19.1 — Auto-activate engagement on first sim in_progress
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_sim_creation_does_not_activate_engagement(
|
|
client: FlaskClient, redteam_token: str
|
|
) -> None:
|
|
eng = _make_engagement(client, redteam_token)
|
|
_make_sim(client, redteam_token, eng["id"])
|
|
|
|
eng_data = _get_engagement(client, redteam_token, eng["id"])
|
|
assert eng_data["status"] == "planned"
|
|
|
|
|
|
def test_patch_rt_field_activates_planned_engagement(
|
|
client: FlaskClient, redteam_token: str
|
|
) -> None:
|
|
eng = _make_engagement(client, redteam_token)
|
|
sim = _make_sim(client, redteam_token, eng["id"])
|
|
assert sim["status"] == "pending"
|
|
|
|
sim_data = _patch_sim(client, redteam_token, sim["id"], {"description": "started"})
|
|
assert sim_data["status"] == "in_progress"
|
|
|
|
eng_data = _get_engagement(client, redteam_token, eng["id"])
|
|
assert eng_data["status"] == "active"
|
|
|
|
|
|
def test_patch_tactic_ids_activates_planned_engagement(
|
|
client: FlaskClient, redteam_token: str
|
|
) -> None:
|
|
eng = _make_engagement(client, redteam_token)
|
|
sim = _make_sim(client, redteam_token, eng["id"])
|
|
|
|
_patch_sim(client, redteam_token, sim["id"], {"tactic_ids": ["TA0007"]})
|
|
|
|
eng_data = _get_engagement(client, redteam_token, eng["id"])
|
|
assert eng_data["status"] == "active"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-19.2 — Already active → stays active (no change)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_patch_rt_field_does_not_change_active_engagement(
|
|
client: FlaskClient, redteam_token: str
|
|
) -> None:
|
|
eng = _make_engagement(client, redteam_token)
|
|
sim = _make_sim(client, redteam_token, eng["id"])
|
|
# First patch triggers activation.
|
|
_patch_sim(client, redteam_token, sim["id"], {"description": "started"})
|
|
|
|
# Second patch: engagement should remain active (no state change).
|
|
_patch_sim(client, redteam_token, sim["id"], {"description": "updated"})
|
|
|
|
eng_data = _get_engagement(client, redteam_token, eng["id"])
|
|
assert eng_data["status"] == "active"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-19.3 — Engagement in closed state → not touched
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_patch_does_not_reopen_closed_engagement(
|
|
client: FlaskClient, redteam_token: str, admin_token: str
|
|
) -> None:
|
|
eng = _make_engagement(client, redteam_token)
|
|
sim = _make_sim(client, redteam_token, eng["id"])
|
|
|
|
# Manually close the engagement via API.
|
|
close_resp = client.patch(
|
|
f"/api/engagements/{eng['id']}",
|
|
headers=_h(admin_token),
|
|
json={"status": "closed"},
|
|
)
|
|
assert close_resp.status_code == 200
|
|
|
|
# PATCH a sim field that would normally trigger in_progress.
|
|
_patch_sim(client, redteam_token, sim["id"], {"description": "new work"})
|
|
|
|
eng_data = _get_engagement(client, redteam_token, eng["id"])
|
|
assert eng_data["status"] == "closed"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Migration 0004 — tactic_ids column NOT NULL after upgrade
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_migration_0004_tactic_ids_not_null_after_upgrade() -> None:
|
|
"""Alembic round-trip: tactic_ids column is NOT NULL after migration 0004."""
|
|
import importlib
|
|
|
|
import sqlalchemy as _sa
|
|
from alembic.operations import Operations
|
|
from alembic.runtime.migration import MigrationContext
|
|
|
|
engine = _sa.create_engine("sqlite:///:memory:")
|
|
|
|
# Create post-0003 schema (simulations with techniques column).
|
|
with engine.begin() as conn:
|
|
conn.execute(_sa.text(
|
|
"CREATE TABLE simulations ("
|
|
" id INTEGER PRIMARY KEY,"
|
|
" techniques TEXT NOT NULL DEFAULT '[]'"
|
|
")"
|
|
))
|
|
conn.execute(_sa.text(
|
|
"INSERT INTO simulations (id, techniques) VALUES (1, '[]')"
|
|
))
|
|
|
|
with engine.begin() as conn:
|
|
ctx = MigrationContext.configure(conn, opts={"as_sql": False})
|
|
ops = Operations(ctx)
|
|
|
|
import alembic.op as _op_module
|
|
_op_module._proxy = ops # type: ignore[attr-defined]
|
|
|
|
spec = importlib.util.spec_from_file_location(
|
|
"mig_0004",
|
|
"/home/user/Documents/01_Projects/mimic/.claude/worktrees/sprint-4-ui-polish/backend/migrations/versions/0004_simulation_tactic_ids.py",
|
|
)
|
|
assert spec is not None and spec.loader is not None
|
|
mig = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(mig) # type: ignore[union-attr]
|
|
mig.upgrade()
|
|
|
|
insp = _sa.inspect(engine)
|
|
cols = {c["name"]: c for c in insp.get_columns("simulations")}
|
|
assert "tactic_ids" in cols, "tactic_ids column must exist after upgrade"
|
|
assert cols["tactic_ids"]["nullable"] is False, "tactic_ids must be NOT NULL"
|
|
|
|
# Existing row should have server_default applied.
|
|
with engine.connect() as conn:
|
|
row = conn.execute(_sa.text("SELECT tactic_ids FROM simulations WHERE id=1")).fetchone()
|
|
assert row is not None
|
|
import json
|
|
assert json.loads(row[0]) == []
|