"""M5 — Template catalogue integration tests. Covers `test_template` and `scenario_template` CRUD + ordering + perm gating. Relies on a minimal MITRE seed (T1059 / TA0001 / T1059.001) so the polymorphic tag join can be exercised end-to-end. """ from __future__ import annotations import json import secrets import pytest from sqlalchemy import text from app.core.install_token import regenerate_install_token from app.main import create_app from app.services import mitre_seed as mitre_svc def _truncate_all(engine): with engine.begin() as conn: conn.execute( text( "TRUNCATE users, refresh_tokens, invitations, invitation_groups, " "user_groups, group_permissions, permissions, settings, groups, " "scenario_template_tests, scenario_templates, " "test_template_mitre_tags, test_templates, " "mitre_subtechniques, mitre_technique_tactics, mitre_techniques, " "mitre_tactics RESTART IDENTITY CASCADE" ) ) # Same minimal bundle as in test_mitre.py — keeps tag resolution deterministic # without re-pulling the full enterprise STIX bundle. _MINIMAL_BUNDLE = { "type": "bundle", "id": "bundle--00000000-0000-0000-0000-000000000002", "spec_version": "2.1", "objects": [ { "type": "x-mitre-tactic", "id": "x-mitre-tactic--ta0001", "name": "Initial Access", "x_mitre_shortname": "initial-access", "external_references": [ {"source_name": "mitre-attack", "external_id": "TA0001"} ], }, { "type": "x-mitre-tactic", "id": "x-mitre-tactic--ta0002", "name": "Execution", "x_mitre_shortname": "execution", "external_references": [ {"source_name": "mitre-attack", "external_id": "TA0002"} ], }, { "type": "attack-pattern", "id": "attack-pattern--t1059", "name": "Command and Scripting Interpreter", "kill_chain_phases": [ {"kill_chain_name": "mitre-attack", "phase_name": "execution"} ], "external_references": [ {"source_name": "mitre-attack", "external_id": "T1059"} ], }, { "type": "attack-pattern", "id": "attack-pattern--t1059-001", "name": "PowerShell", "x_mitre_is_subtechnique": True, "external_references": [ {"source_name": "mitre-attack", "external_id": "T1059.001"} ], }, { "type": "relationship", "id": "relationship--rel1", "relationship_type": "subtechnique-of", "source_ref": "attack-pattern--t1059-001", "target_ref": "attack-pattern--t1059", }, ], } @pytest.fixture(scope="module") def app(db_engine_or_skip, tmp_path_factory): _truncate_all(db_engine_or_skip) bundle_path = tmp_path_factory.mktemp("m5") / "stix.json" bundle_path.write_text(json.dumps(_MINIMAL_BUNDLE)) mitre_svc.seed_mitre(source=bundle_path, expected_sha256=None) flask_app = create_app() flask_app.config.update(TESTING=True) return flask_app @pytest.fixture() def client(app): return app.test_client() def _unique_email(prefix: str) -> str: return f"{prefix}-{secrets.token_hex(4)}@metamorph.local" @pytest.fixture(scope="module") def admin(app): token = regenerate_install_token() email = _unique_email("admin") password = "AdminPass1234!" with app.test_client() as c: r = c.post( "/api/v1/setup", json={"install_token": token, "email": email, "password": password}, ) assert r.status_code == 201, r.get_data(as_text=True) return {"email": email, "password": password} def _login(client, email: str, password: str) -> str: r = client.post("/api/v1/auth/login", json={"email": email, "password": password}) assert r.status_code == 200, r.get_data(as_text=True) return r.get_json()["access_token"] def _bearer(token: str) -> dict[str, str]: return {"Authorization": f"Bearer {token}"} @pytest.fixture() def admin_token(client, admin) -> str: return _login(client, admin["email"], admin["password"]) # === Reader fixture: an invited user with only `test_template.read` ========= def _bootstrap_user_without_perms(client, admin_token: str, prefix: str) -> tuple[str, str]: email = _unique_email(prefix) inv = client.post( "/api/v1/invitations", headers=_bearer(admin_token), json={"email_hint": email}, ) token = inv.get_json()["token"] password = "ReaderPass1234!" client.post( f"/api/v1/invitations/accept/{token}", json={"email": email, "password": password}, ) return email, _login(client, email, password) # === test_template CRUD ===================================================== def _make_test(client, admin_token: str, **overrides): body = { "name": overrides.pop("name", f"Test {secrets.token_hex(2)}"), "description": overrides.pop("description", "auto"), "objective": "do thing", "procedure_md": "1. step", "expected_result_red_md": "red expectation", "expected_detection_blue_md": "blue expectation", "opsec_level": overrides.pop("opsec_level", "medium"), "tags": overrides.pop("tags", ["fast"]), "expected_iocs": ["evil.exe"], "mitre_tags": overrides.pop("mitre_tags", [{"kind": "technique", "external_id": "T1059"}]), **overrides, } r = client.post("/api/v1/test-templates", headers=_bearer(admin_token), json=body) assert r.status_code == 201, r.get_data(as_text=True) return r.get_json() def test_create_test_template_with_mitre_tags(client, admin_token): body = _make_test( client, admin_token, name="PowerShell exec", mitre_tags=[ {"kind": "tactic", "external_id": "TA0002"}, {"kind": "technique", "external_id": "T1059"}, {"kind": "subtechnique", "external_id": "T1059.001"}, ], ) assert body["opsec_level"] == "medium" kinds = sorted((t["kind"], t["external_id"]) for t in body["mitre_tags"]) assert kinds == [ ("subtechnique", "T1059.001"), ("tactic", "TA0002"), ("technique", "T1059"), ] def test_create_test_template_rejects_unknown_mitre(client, admin_token): r = client.post( "/api/v1/test-templates", headers=_bearer(admin_token), json={ "name": "Bad", "mitre_tags": [{"kind": "technique", "external_id": "T9999"}], }, ) assert r.status_code == 400 assert r.get_json()["error"] == "unknown_mitre_tag" def test_create_test_template_rejects_bad_opsec(client, admin_token): r = client.post( "/api/v1/test-templates", headers=_bearer(admin_token), json={"name": "Bad", "opsec_level": "burner"}, ) assert r.status_code == 400 def test_list_test_templates_filter_by_tactic(client, admin_token): _make_test( client, admin_token, name="filterable-1", mitre_tags=[{"kind": "tactic", "external_id": "TA0002"}], ) r = client.get( "/api/v1/test-templates?tactic=TA0002", headers=_bearer(admin_token), ) assert r.status_code == 200 body = r.get_json() names = [it["name"] for it in body["items"]] assert "filterable-1" in names def test_list_test_templates_filter_by_opsec(client, admin_token): _make_test(client, admin_token, name="high-opsec", opsec_level="high") r = client.get( "/api/v1/test-templates?opsec=high", headers=_bearer(admin_token), ) assert r.status_code == 200 names = [it["name"] for it in r.get_json()["items"]] assert "high-opsec" in names assert all(it["opsec_level"] == "high" for it in r.get_json()["items"]) def test_list_test_templates_filter_by_tag(client, admin_token): _make_test(client, admin_token, name="tagged-fast", tags=["fast", "phish"]) r = client.get( "/api/v1/test-templates?tag=phish", headers=_bearer(admin_token), ) assert r.status_code == 200 names = [it["name"] for it in r.get_json()["items"]] assert "tagged-fast" in names def test_list_test_templates_search_q(client, admin_token): _make_test(client, admin_token, name="unique-token-azertyuiop") r = client.get( "/api/v1/test-templates?q=AZERTYUIOP", # case-insensitive headers=_bearer(admin_token), ) assert r.status_code == 200 names = [it["name"] for it in r.get_json()["items"]] assert "unique-token-azertyuiop" in names def test_update_test_template_replaces_mitre_tags(client, admin_token): body = _make_test( client, admin_token, name="to-update", mitre_tags=[{"kind": "tactic", "external_id": "TA0001"}], ) r = client.put( f"/api/v1/test-templates/{body['id']}", headers=_bearer(admin_token), json={"mitre_tags": [{"kind": "technique", "external_id": "T1059"}]}, ) assert r.status_code == 200, r.get_data(as_text=True) updated = r.get_json() kinds = [(t["kind"], t["external_id"]) for t in updated["mitre_tags"]] assert kinds == [("technique", "T1059")] def test_update_test_template_partial_keeps_unset_fields(client, admin_token): body = _make_test( client, admin_token, name="partial-update", opsec_level="low", tags=["a", "b"], ) r = client.put( f"/api/v1/test-templates/{body['id']}", headers=_bearer(admin_token), json={"name": "renamed"}, ) assert r.status_code == 200 updated = r.get_json() assert updated["name"] == "renamed" assert updated["opsec_level"] == "low" # untouched assert set(updated["tags"]) == {"a", "b"} # untouched def test_soft_delete_then_list_hides_by_default(client, admin_token): body = _make_test(client, admin_token, name="to-be-deleted") r = client.delete( f"/api/v1/test-templates/{body['id']}", headers=_bearer(admin_token) ) assert r.status_code == 200 r2 = client.get("/api/v1/test-templates", headers=_bearer(admin_token)) names = [it["name"] for it in r2.get_json()["items"]] assert "to-be-deleted" not in names # And reappears with include_deleted=true r3 = client.get( "/api/v1/test-templates?include_deleted=true", headers=_bearer(admin_token), ) names3 = [it["name"] for it in r3.get_json()["items"]] assert "to-be-deleted" in names3 def test_read_perm_required(client, admin_token): """A user without `test_template.read` gets 403.""" _, eve_token = _bootstrap_user_without_perms(client, admin_token, "eve-noperm") r = client.get("/api/v1/test-templates", headers=_bearer(eve_token)) assert r.status_code == 403 def test_write_perm_required(client, admin_token): """A user with only `test_template.read` cannot create. Bootstrap path: create a dedicated group via the admin API, bind only the `test_template.read` perm, then invite a user pre-assigned to that group. """ # 1. Create the read-only group + bind the single perm. grp = client.post( "/api/v1/groups", headers=_bearer(admin_token), json={"name": f"tpl-reader-{secrets.token_hex(2)}"}, ).get_json() r_set = client.put( f"/api/v1/groups/{grp['id']}/permissions", headers=_bearer(admin_token), json={"codes": ["test_template.read"]}, ) assert r_set.status_code == 200, r_set.get_data(as_text=True) # 2. Invite a user already attached to that group. email = _unique_email("alice-readonly") password = "ReaderPass1234!" inv = client.post( "/api/v1/invitations", headers=_bearer(admin_token), json={"email_hint": email, "group_ids": [grp["id"]]}, ).get_json() client.post( f"/api/v1/invitations/accept/{inv['token']}", json={"email": email, "password": password}, ) token = _login(client, email, password) r = client.get("/api/v1/test-templates", headers=_bearer(token)) assert r.status_code == 200, r.get_data(as_text=True) r2 = client.post( "/api/v1/test-templates", headers=_bearer(token), json={"name": "X"} ) assert r2.status_code == 403 # === scenario_template CRUD ================================================= def test_create_scenario_with_ordered_tests(client, admin_token): a = _make_test(client, admin_token, name="scn-a") b = _make_test(client, admin_token, name="scn-b") c = _make_test(client, admin_token, name="scn-c") r = client.post( "/api/v1/scenario-templates", headers=_bearer(admin_token), json={ "name": "phishing-flow", "description": "click → exec → persist", "test_template_ids": [a["id"], b["id"], c["id"]], }, ) assert r.status_code == 201, r.get_data(as_text=True) body = r.get_json() assert body["tests_count"] == 3 assert [t["position"] for t in body["tests"]] == [0, 1, 2] assert [t["test_template_name"] for t in body["tests"]] == ["scn-a", "scn-b", "scn-c"] def test_reorder_scenario_tests(client, admin_token): a = _make_test(client, admin_token, name="reord-a") b = _make_test(client, admin_token, name="reord-b") c = _make_test(client, admin_token, name="reord-c") created = client.post( "/api/v1/scenario-templates", headers=_bearer(admin_token), json={ "name": "reorder-me", "test_template_ids": [a["id"], b["id"], c["id"]], }, ).get_json() # Reverse order. r = client.put( f"/api/v1/scenario-templates/{created['id']}/tests", headers=_bearer(admin_token), json={"test_template_ids": [c["id"], b["id"], a["id"]]}, ) assert r.status_code == 200 after = r.get_json() assert [t["test_template_name"] for t in after["tests"]] == ["reord-c", "reord-b", "reord-a"] # Re-reading via GET yields the same order — confirms persistence. fresh = client.get( f"/api/v1/scenario-templates/{created['id']}", headers=_bearer(admin_token) ).get_json() assert [t["test_template_name"] for t in fresh["tests"]] == ["reord-c", "reord-b", "reord-a"] def test_scenario_rejects_unknown_test_id(client, admin_token): r = client.post( "/api/v1/scenario-templates", headers=_bearer(admin_token), json={ "name": "bad", "test_template_ids": ["00000000-0000-0000-0000-000000000000"], }, ) assert r.status_code == 400 assert r.get_json()["error"] == "unknown_test_template" def test_scenario_rejects_soft_deleted_test_on_create(client, admin_token): a = _make_test(client, admin_token, name="will-be-deleted") client.delete(f"/api/v1/test-templates/{a['id']}", headers=_bearer(admin_token)) r = client.post( "/api/v1/scenario-templates", headers=_bearer(admin_token), json={"name": "linked", "test_template_ids": [a["id"]]}, ) assert r.status_code == 400 assert r.get_json()["error"] == "unknown_test_template" def test_scenario_surfaces_soft_deleted_test_after_link(client, admin_token): """Once linked, a test can be soft-deleted without breaking the scenario — the join row stays and the API flags the test as deleted.""" a = _make_test(client, admin_token, name="linked-then-deleted") sc = client.post( "/api/v1/scenario-templates", headers=_bearer(admin_token), json={"name": "survives", "test_template_ids": [a["id"]]}, ).get_json() client.delete(f"/api/v1/test-templates/{a['id']}", headers=_bearer(admin_token)) fresh = client.get( f"/api/v1/scenario-templates/{sc['id']}", headers=_bearer(admin_token) ).get_json() assert fresh["tests"][0]["test_template_deleted"] is True def test_scenario_soft_delete(client, admin_token): sc = client.post( "/api/v1/scenario-templates", headers=_bearer(admin_token), json={"name": "doomed-scn"}, ).get_json() r = client.delete( f"/api/v1/scenario-templates/{sc['id']}", headers=_bearer(admin_token) ) assert r.status_code == 200 names = [ it["name"] for it in client.get( "/api/v1/scenario-templates", headers=_bearer(admin_token) ).get_json()["items"] ] assert "doomed-scn" not in names def test_scenario_perm_required(client, admin_token): _, eve_token = _bootstrap_user_without_perms(client, admin_token, "scn-eve") r = client.get("/api/v1/scenario-templates", headers=_bearer(eve_token)) assert r.status_code == 403