"""Integration tests for M4: STIX parser + seed + /mitre/* endpoints. Uses a minimal hand-crafted STIX bundle (no network) so the parser logic and the upsert semantics can be exercised deterministically. """ from __future__ import annotations import json import secrets import uuid from pathlib import Path import pytest from sqlalchemy import text from app.core.install_token import regenerate_install_token from app.main import create_app from app.services import mitre_seed as mitre_svc def _truncate_all(engine): with engine.begin() as conn: conn.execute( text( "TRUNCATE users, refresh_tokens, invitations, invitation_groups, " "user_groups, group_permissions, permissions, settings, groups, " "mitre_subtechniques, mitre_technique_tactics, mitre_techniques, " "mitre_tactics RESTART IDENTITY CASCADE" ) ) @pytest.fixture(scope="module") def app(db_engine_or_skip): _truncate_all(db_engine_or_skip) flask_app = create_app() flask_app.config.update(TESTING=True) return flask_app @pytest.fixture() def client(app): return app.test_client() def _unique_email(prefix: str) -> str: return f"{prefix}-{secrets.token_hex(4)}@metamorph.local" @pytest.fixture(scope="module") def admin_credentials(app, db_engine_or_skip): """Bootstrap a fresh admin once for the whole module.""" token = regenerate_install_token() email = _unique_email("admin") password = "AdminPass1234!" with app.test_client() as c: r = c.post( "/api/v1/setup", json={"install_token": token, "email": email, "password": password}, ) assert r.status_code == 201, r.get_data(as_text=True) return {"email": email, "password": password, "user_id": r.get_json()["user_id"]} def _login(client, email: str, password: str) -> str: r = client.post("/api/v1/auth/login", json={"email": email, "password": password}) assert r.status_code == 200, r.get_data(as_text=True) return r.get_json()["access_token"] # === Fixture STIX bundle ===================================================== MINIMAL_BUNDLE = { "type": "bundle", "id": "bundle--00000000-0000-0000-0000-000000000001", "spec_version": "2.1", "objects": [ # Tactic 1 — kept { "type": "x-mitre-tactic", "id": "x-mitre-tactic--ta0001", "name": "Initial Access", "description": "Get a foothold.", "x_mitre_shortname": "initial-access", "external_references": [ { "source_name": "mitre-attack", "external_id": "TA0001", "url": "https://attack.mitre.org/tactics/TA0001/", } ], }, # Tactic 2 — kept { "type": "x-mitre-tactic", "id": "x-mitre-tactic--ta0002", "name": "Execution", "description": "Run code.", "x_mitre_shortname": "execution", "external_references": [ { "source_name": "mitre-attack", "external_id": "TA0002", "url": "https://attack.mitre.org/tactics/TA0002/", } ], }, # Revoked tactic — must be skipped { "type": "x-mitre-tactic", "id": "x-mitre-tactic--ta0099", "name": "Doomed", "x_mitre_shortname": "doomed", "revoked": True, "external_references": [ {"source_name": "mitre-attack", "external_id": "TA0099"} ], }, # Technique T1059 covers both tactics { "type": "attack-pattern", "id": "attack-pattern--t1059", "name": "Command and Scripting Interpreter", "description": "Use shells.", "kill_chain_phases": [ {"kill_chain_name": "mitre-attack", "phase_name": "initial-access"}, {"kill_chain_name": "mitre-attack", "phase_name": "execution"}, ], "external_references": [ { "source_name": "mitre-attack", "external_id": "T1059", "url": "https://attack.mitre.org/techniques/T1059/", } ], }, # Technique T1078 only initial-access { "type": "attack-pattern", "id": "attack-pattern--t1078", "name": "Valid Accounts", "description": "Use legit creds.", "kill_chain_phases": [ {"kill_chain_name": "mitre-attack", "phase_name": "initial-access"}, ], "external_references": [ { "source_name": "mitre-attack", "external_id": "T1078", "url": "https://attack.mitre.org/techniques/T1078/", } ], }, # Deprecated technique — skipped { "type": "attack-pattern", "id": "attack-pattern--t1190", "name": "Exploit Public-Facing Application", "x_mitre_deprecated": True, "external_references": [ {"source_name": "mitre-attack", "external_id": "T1190"} ], }, # Sub-technique of T1059 { "type": "attack-pattern", "id": "attack-pattern--t1059-001", "name": "PowerShell", "description": "Windows shell.", "x_mitre_is_subtechnique": True, "external_references": [ { "source_name": "mitre-attack", "external_id": "T1059.001", "url": "https://attack.mitre.org/techniques/T1059/001/", } ], }, # Relationship attaching the sub to its parent { "type": "relationship", "id": "relationship--rel1", "relationship_type": "subtechnique-of", "source_ref": "attack-pattern--t1059-001", "target_ref": "attack-pattern--t1059", }, ], } @pytest.fixture() def fixture_bundle_path(tmp_path: Path) -> Path: path = tmp_path / "minimal-stix.json" path.write_text(json.dumps(MINIMAL_BUNDLE)) return path # === Parser unit tests ======================================================= def test_parser_extracts_active_objects(fixture_bundle_path): parsed = mitre_svc.parse_bundle(fixture_bundle_path) assert len(parsed.tactics) == 2 # TA0001 + TA0002 (TA0099 revoked) assert {t["external_id"] for t in parsed.tactics} == {"TA0001", "TA0002"} assert len(parsed.techniques) == 2 # T1059 + T1078 (T1190 deprecated) assert {t["external_id"] for t in parsed.techniques} == {"T1059", "T1078"} assert len(parsed.subtechniques) == 1 sb = parsed.subtechniques[0] assert sb["external_id"] == "T1059.001" assert sb["parent_stix_id"] == "attack-pattern--t1059" # === Seed integration tests ================================================== def test_seed_against_fixture(app, fixture_bundle_path): result = mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None) assert result.tactics_upserted == 2 assert result.techniques_upserted == 2 assert result.subtechniques_upserted == 1 assert result.subtechniques_skipped_orphan == 0 assert result.technique_tactic_links == 3 # T1059→TA0001, T1059→TA0002, T1078→TA0001 def test_seed_is_idempotent(app, fixture_bundle_path): """Running twice yields the same row counts and no SQL errors.""" first = mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None) second = mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None) assert (first.tactics_upserted, first.techniques_upserted, first.subtechniques_upserted) == ( second.tactics_upserted, second.techniques_upserted, second.subtechniques_upserted, ) def test_seed_persists_setting(app, fixture_bundle_path): """settings table records the last sync timestamp + source URL.""" mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None) status = mitre_svc.read_status() assert status["last_sync"] is not None # We seeded from a local path so version is None and source_url is the path string. assert status["source_url"] == str(fixture_bundle_path) assert status["version"] is None # only set when source == MITRE_DEFAULT_URL def test_checksum_mismatch_aborts(tmp_path): """A wrong sha256 triggers MitreChecksumMismatch and skips DB writes.""" path = tmp_path / "tiny.json" path.write_text(json.dumps(MINIMAL_BUNDLE)) # Force the URL path so download() is invoked. We mock by passing a file:// URL. # Simpler: call _download() directly with a bogus hash. bogus = "0" * 64 with pytest.raises(mitre_svc.MitreChecksumMismatch): mitre_svc._download( f"file://{path}", tmp_path / "out.json", expected_sha256=bogus ) # === API endpoint tests ====================================================== def test_list_tactics_requires_auth(app, fixture_bundle_path): mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None) with app.test_client() as c: r = c.get("/api/v1/mitre/tactics") assert r.status_code == 401 def test_list_tactics_returns_seeded(app, admin_credentials, fixture_bundle_path): mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None) with app.test_client() as c: access = _login(c, admin_credentials["email"], admin_credentials["password"]) r = c.get( "/api/v1/mitre/tactics", headers={"Authorization": f"Bearer {access}"} ) assert r.status_code == 200 body = r.get_json() assert body["total"] == 2 ids = [t["external_id"] for t in body["items"]] assert "TA0001" in ids and "TA0002" in ids def test_filter_techniques_by_tactic(app, admin_credentials, fixture_bundle_path): mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None) with app.test_client() as c: access = _login(c, admin_credentials["email"], admin_credentials["password"]) r = c.get( "/api/v1/mitre/techniques?tactic=TA0002", headers={"Authorization": f"Bearer {access}"}, ) assert r.status_code == 200 body = r.get_json() # Only T1059 covers TA0002 (execution); T1078 covers initial-access only. ext_ids = [t["external_id"] for t in body["items"]] assert ext_ids == ["T1059"] def test_subtechniques_listed_under_parent(app, admin_credentials, fixture_bundle_path): mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None) with app.test_client() as c: access = _login(c, admin_credentials["email"], admin_credentials["password"]) r = c.get( "/api/v1/mitre/subtechniques?technique=T1059", headers={"Authorization": f"Bearer {access}"}, ) assert r.status_code == 200 body = r.get_json() ext_ids = [t["external_id"] for t in body["items"]] assert ext_ids == ["T1059.001"] def test_status_endpoint(app, admin_credentials, fixture_bundle_path): mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None) with app.test_client() as c: access = _login(c, admin_credentials["email"], admin_credentials["password"]) r = c.get("/api/v1/mitre/status", headers={"Authorization": f"Bearer {access}"}) assert r.status_code == 200 body = r.get_json() assert body["last_sync"] is not None assert body["default_url"].startswith("https://") assert body["default_version"] def test_sync_endpoint_requires_perm(app, admin_credentials, fixture_bundle_path): """A non-admin without mitre.sync gets 403.""" mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None) with app.test_client() as c: # Bootstrap a no-perm user via invitation. admin_access = _login(c, admin_credentials["email"], admin_credentials["password"]) eve_email = _unique_email("eve") inv = c.post( "/api/v1/invitations", headers={"Authorization": f"Bearer {admin_access}"}, json={"email_hint": eve_email}, ) token = inv.get_json()["token"] c.post( f"/api/v1/invitations/accept/{token}", json={"email": eve_email, "password": "EvePass1234!"}, ) eve_access = _login(c, eve_email, "EvePass1234!") r = c.post( "/api/v1/mitre/sync", headers={"Authorization": f"Bearer {eve_access}"} ) assert r.status_code == 403 def test_search_filter_on_name(app, admin_credentials, fixture_bundle_path): mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None) with app.test_client() as c: access = _login(c, admin_credentials["email"], admin_credentials["password"]) r = c.get( "/api/v1/mitre/techniques?q=valid", headers={"Authorization": f"Bearer {access}"}, ) assert r.status_code == 200 ext_ids = [t["external_id"] for t in r.get_json()["items"]] assert ext_ids == ["T1078"] def test_matrix_endpoint_returns_nested_grid(app, admin_credentials, fixture_bundle_path): """GET /mitre/matrix returns the flat tactic→technique→subtechnique grid.""" mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None) with app.test_client() as c: access = _login(c, admin_credentials["email"], admin_credentials["password"]) r = c.get("/api/v1/mitre/matrix", headers={"Authorization": f"Bearer {access}"}) assert r.status_code == 200 body = r.get_json() tactics = body["tactics"] assert {t["external_id"] for t in tactics} == {"TA0001", "TA0002"} # TA0001 has T1059 (multi-tactic) + T1078; T1059 carries its sub. ta0001 = next(t for t in tactics if t["external_id"] == "TA0001") techs = {t["external_id"]: t for t in ta0001["techniques"]} assert set(techs.keys()) == {"T1059", "T1078"} assert techs["T1059"]["subtechniques"][0]["external_id"] == "T1059.001" assert techs["T1078"]["subtechniques"] == [] # TA0002 only carries T1059 (no T1078). ta0002 = next(t for t in tactics if t["external_id"] == "TA0002") assert [t["external_id"] for t in ta0002["techniques"]] == ["T1059"] def test_matrix_endpoint_requires_auth(app, fixture_bundle_path): mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None) with app.test_client() as c: assert c.get("/api/v1/mitre/matrix").status_code == 401