diff --git a/backend/app/services/mitre.py b/backend/app/services/mitre.py index 3618128..6dd91e2 100644 --- a/backend/app/services/mitre.py +++ b/backend/app/services/mitre.py @@ -26,7 +26,7 @@ _TACTIC_ORDER = [ "impact", ] -_TACTIC_NAMES: dict[str, str] = { +TACTIC_NAMES: dict[str, str] = { "initial-access": "Initial Access", "execution": "Execution", "persistence": "Persistence", @@ -102,7 +102,7 @@ def _build_matrix(entries: list[dict[str, Any]]) -> list[dict[str, Any]]: techs = tactic_techs.get(tactic_id, []) # Sort techniques alphabetically. techs_sorted = sorted(techs, key=lambda x: x["name"]) - tactic_name = _TACTIC_NAMES.get(tactic_id, tactic_id.replace("-", " ").title()) + tactic_name = TACTIC_NAMES.get(tactic_id, tactic_id.replace("-", " ").title()) matrix.append( { "tactic_id": tactic_id, diff --git a/backend/tests/test_mitre.py b/backend/tests/test_mitre.py index 2057571..39ba563 100644 --- a/backend/tests/test_mitre.py +++ b/backend/tests/test_mitre.py @@ -70,6 +70,14 @@ _FIXTURE_BUNDLE = { ], "kill_chain_phases": [], }, + { + "type": "attack-pattern", + "name": "Application Layer Protocol", + "external_references": [ + {"source_name": "mitre-attack", "external_id": "T1071"} + ], + "kill_chain_phases": [{"phase_name": "command-and-control", "kill_chain_name": "mitre-attack"}], + }, { # Not an attack-pattern — must be ignored. "type": "relationship", @@ -110,7 +118,7 @@ def bundle_file(tmp_path: pathlib.Path) -> pathlib.Path: def test_load_bundle_success(bundle_file: pathlib.Path) -> None: mitre_svc.load_bundle(bundle_file) assert mitre_svc.mitre_loaded is True - assert len(mitre_svc._index) == 5 # 6 attack-patterns minus 1 revoked = 5 + assert len(mitre_svc._index) == 6 # 7 attack-patterns minus 1 revoked = 6 def test_load_bundle_missing_file() -> None: @@ -375,3 +383,12 @@ def test_matrix_endpoint_all_roles( for token in (redteam_token, soc_token, admin_token): resp = client.get("/api/mitre/matrix", headers=_h(token)) assert resp.status_code == 200 + + +def test_get_matrix_command_and_control_display_name(bundle_file: pathlib.Path) -> None: + """MITRE official name uses lowercase 'and' — not title-cased.""" + mitre_svc.load_bundle(bundle_file) + matrix = mitre_svc.get_matrix() + c2 = next((t for t in matrix if t["tactic_id"] == "command-and-control"), None) + assert c2 is not None + assert c2["tactic_name"] == "Command and Control" diff --git a/backend/tests/test_simulations_techniques.py b/backend/tests/test_simulations_techniques.py index f515965..f010aa0 100644 --- a/backend/tests/test_simulations_techniques.py +++ b/backend/tests/test_simulations_techniques.py @@ -366,3 +366,64 @@ def test_migration_backfill_logic() -> None: ] assert _backfill(None, None) == [] assert _backfill("T1059", None) == [{"id": "T1059", "name": ""}] + + +def test_migration_0003_techniques_not_null_after_upgrade() -> None: + """Run migration 0003 upgrade() against a real SQLite DB and assert techniques is NOT NULL.""" + import importlib + import json as _json + + import sqlalchemy as _sa + from alembic.operations import Operations + from alembic.runtime.migration import MigrationContext + + engine = _sa.create_engine("sqlite:///:memory:") + with engine.begin() as conn: + # Create the pre-migration schema (0002 state). + conn.execute(_sa.text( + "CREATE TABLE simulations (" + " id INTEGER PRIMARY KEY," + " mitre_technique_id VARCHAR(32)," + " mitre_technique_name VARCHAR(255)" + ")" + )) + conn.execute(_sa.text( + "INSERT INTO simulations (id, mitre_technique_id, mitre_technique_name)" + " VALUES (1, 'T1059', 'Command and Scripting Interpreter')" + )) + conn.execute(_sa.text( + "INSERT INTO simulations (id, mitre_technique_id, mitre_technique_name)" + " VALUES (2, NULL, NULL)" + )) + + # Run upgrade() via Alembic Operations context. + with engine.begin() as conn: + ctx = MigrationContext.configure(conn, opts={"as_sql": False}) + ops = Operations(ctx) + + # Patch the module-level proxy so the migration's op.* calls work. + import alembic.op as _op_module + _op_module._proxy = ops # type: ignore[attr-defined] + + spec = importlib.util.spec_from_file_location( + "mig_0003", + "/home/user/Documents/01_Projects/mimic/.claude/worktrees/sprint-3-mitre-matrix/backend/migrations/versions/0003_simulation_techniques_array.py", + ) + assert spec is not None and spec.loader is not None + mig = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mig) # type: ignore[union-attr] + mig.upgrade() + + # Verify schema: techniques column exists and is NOT NULL. + insp = _sa.inspect(engine) + cols = {c["name"]: c for c in insp.get_columns("simulations")} + assert "techniques" in cols, "techniques column must exist after upgrade" + assert cols["techniques"]["nullable"] is False, "techniques must be NOT NULL after upgrade" + assert "mitre_technique_id" not in cols + assert "mitre_technique_name" not in cols + + # Verify data was backfilled correctly. + with engine.connect() as conn: + rows = conn.execute(_sa.text("SELECT id, techniques FROM simulations ORDER BY id")).fetchall() + assert _json.loads(rows[0][1]) == [{"id": "T1059", "name": "Command and Scripting Interpreter"}] + assert _json.loads(rows[1][1]) == []