feat: sprint 3 — multi-technique simulations + MITRE matrix modal #6
@@ -26,7 +26,7 @@ _TACTIC_ORDER = [
|
||||
"impact",
|
||||
]
|
||||
|
||||
_TACTIC_NAMES: dict[str, str] = {
|
||||
TACTIC_NAMES: dict[str, str] = {
|
||||
"initial-access": "Initial Access",
|
||||
"execution": "Execution",
|
||||
"persistence": "Persistence",
|
||||
@@ -102,7 +102,7 @@ def _build_matrix(entries: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
||||
techs = tactic_techs.get(tactic_id, [])
|
||||
# Sort techniques alphabetically.
|
||||
techs_sorted = sorted(techs, key=lambda x: x["name"])
|
||||
tactic_name = _TACTIC_NAMES.get(tactic_id, tactic_id.replace("-", " ").title())
|
||||
tactic_name = TACTIC_NAMES.get(tactic_id, tactic_id.replace("-", " ").title())
|
||||
matrix.append(
|
||||
{
|
||||
"tactic_id": tactic_id,
|
||||
|
||||
@@ -70,6 +70,14 @@ _FIXTURE_BUNDLE = {
|
||||
],
|
||||
"kill_chain_phases": [],
|
||||
},
|
||||
{
|
||||
"type": "attack-pattern",
|
||||
"name": "Application Layer Protocol",
|
||||
"external_references": [
|
||||
{"source_name": "mitre-attack", "external_id": "T1071"}
|
||||
],
|
||||
"kill_chain_phases": [{"phase_name": "command-and-control", "kill_chain_name": "mitre-attack"}],
|
||||
},
|
||||
{
|
||||
# Not an attack-pattern — must be ignored.
|
||||
"type": "relationship",
|
||||
@@ -110,7 +118,7 @@ def bundle_file(tmp_path: pathlib.Path) -> pathlib.Path:
|
||||
def test_load_bundle_success(bundle_file: pathlib.Path) -> None:
|
||||
mitre_svc.load_bundle(bundle_file)
|
||||
assert mitre_svc.mitre_loaded is True
|
||||
assert len(mitre_svc._index) == 5 # 6 attack-patterns minus 1 revoked = 5
|
||||
assert len(mitre_svc._index) == 6 # 7 attack-patterns minus 1 revoked = 6
|
||||
|
||||
|
||||
def test_load_bundle_missing_file() -> None:
|
||||
@@ -375,3 +383,12 @@ def test_matrix_endpoint_all_roles(
|
||||
for token in (redteam_token, soc_token, admin_token):
|
||||
resp = client.get("/api/mitre/matrix", headers=_h(token))
|
||||
assert resp.status_code == 200
|
||||
|
||||
|
||||
def test_get_matrix_command_and_control_display_name(bundle_file: pathlib.Path) -> None:
|
||||
"""MITRE official name uses lowercase 'and' — not title-cased."""
|
||||
mitre_svc.load_bundle(bundle_file)
|
||||
matrix = mitre_svc.get_matrix()
|
||||
c2 = next((t for t in matrix if t["tactic_id"] == "command-and-control"), None)
|
||||
assert c2 is not None
|
||||
assert c2["tactic_name"] == "Command and Control"
|
||||
|
||||
@@ -366,3 +366,64 @@ def test_migration_backfill_logic() -> None:
|
||||
]
|
||||
assert _backfill(None, None) == []
|
||||
assert _backfill("T1059", None) == [{"id": "T1059", "name": ""}]
|
||||
|
||||
|
||||
def test_migration_0003_techniques_not_null_after_upgrade() -> None:
|
||||
"""Run migration 0003 upgrade() against a real SQLite DB and assert techniques is NOT NULL."""
|
||||
import importlib
|
||||
import json as _json
|
||||
|
||||
import sqlalchemy as _sa
|
||||
from alembic.operations import Operations
|
||||
from alembic.runtime.migration import MigrationContext
|
||||
|
||||
engine = _sa.create_engine("sqlite:///:memory:")
|
||||
with engine.begin() as conn:
|
||||
# Create the pre-migration schema (0002 state).
|
||||
conn.execute(_sa.text(
|
||||
"CREATE TABLE simulations ("
|
||||
" id INTEGER PRIMARY KEY,"
|
||||
" mitre_technique_id VARCHAR(32),"
|
||||
" mitre_technique_name VARCHAR(255)"
|
||||
")"
|
||||
))
|
||||
conn.execute(_sa.text(
|
||||
"INSERT INTO simulations (id, mitre_technique_id, mitre_technique_name)"
|
||||
" VALUES (1, 'T1059', 'Command and Scripting Interpreter')"
|
||||
))
|
||||
conn.execute(_sa.text(
|
||||
"INSERT INTO simulations (id, mitre_technique_id, mitre_technique_name)"
|
||||
" VALUES (2, NULL, NULL)"
|
||||
))
|
||||
|
||||
# Run upgrade() via Alembic Operations context.
|
||||
with engine.begin() as conn:
|
||||
ctx = MigrationContext.configure(conn, opts={"as_sql": False})
|
||||
ops = Operations(ctx)
|
||||
|
||||
# Patch the module-level proxy so the migration's op.* calls work.
|
||||
import alembic.op as _op_module
|
||||
_op_module._proxy = ops # type: ignore[attr-defined]
|
||||
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
"mig_0003",
|
||||
"/home/user/Documents/01_Projects/mimic/.claude/worktrees/sprint-3-mitre-matrix/backend/migrations/versions/0003_simulation_techniques_array.py",
|
||||
)
|
||||
assert spec is not None and spec.loader is not None
|
||||
mig = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mig) # type: ignore[union-attr]
|
||||
mig.upgrade()
|
||||
|
||||
# Verify schema: techniques column exists and is NOT NULL.
|
||||
insp = _sa.inspect(engine)
|
||||
cols = {c["name"]: c for c in insp.get_columns("simulations")}
|
||||
assert "techniques" in cols, "techniques column must exist after upgrade"
|
||||
assert cols["techniques"]["nullable"] is False, "techniques must be NOT NULL after upgrade"
|
||||
assert "mitre_technique_id" not in cols
|
||||
assert "mitre_technique_name" not in cols
|
||||
|
||||
# Verify data was backfilled correctly.
|
||||
with engine.connect() as conn:
|
||||
rows = conn.execute(_sa.text("SELECT id, techniques FROM simulations ORDER BY id")).fetchall()
|
||||
assert _json.loads(rows[0][1]) == [{"id": "T1059", "name": "Command and Scripting Interpreter"}]
|
||||
assert _json.loads(rows[1][1]) == []
|
||||
|
||||
Reference in New Issue
Block a user