"""M6 — Mission CRUD, snapshot fidelity, membership visibility, transitions. The fixture stack mirrors `test_templates.py`: one shared `app` per module, fresh truncate at the start, a minimal MITRE bundle seeded for tag resolution, plus a small catalogue of test_templates and scenario_templates created via the admin API so the snapshot path is exercised end-to-end (not via raw ORM). """ from __future__ import annotations import json import secrets import pytest from sqlalchemy import text from app.core.install_token import regenerate_install_token from app.main import create_app from app.services import mitre_seed as mitre_svc def _truncate_all(engine): with engine.begin() as conn: # Order matches /diag/reset: missions before templates before MITRE. conn.execute( text( "TRUNCATE mission_test_mitre_tags, mission_tests, " "mission_scenarios, mission_categories, mission_members, " "missions RESTART IDENTITY CASCADE" ) ) conn.execute( text( "TRUNCATE scenario_template_tests, scenario_templates, " "test_template_mitre_tags, test_templates " "RESTART IDENTITY CASCADE" ) ) conn.execute( text( "TRUNCATE users, refresh_tokens, invitations, invitation_groups, " "user_groups, group_permissions, permissions, settings, groups " "RESTART IDENTITY CASCADE" ) ) conn.execute( text( "TRUNCATE mitre_technique_tactics, mitre_subtechniques, " "mitre_techniques, mitre_tactics RESTART IDENTITY CASCADE" ) ) _MINIMAL_BUNDLE = { "type": "bundle", "id": "bundle--00000000-0000-0000-0000-000000000006", "spec_version": "2.1", "objects": [ { "type": "x-mitre-tactic", "id": "x-mitre-tactic--ta0002", "name": "Execution", "x_mitre_shortname": "execution", "external_references": [ {"source_name": "mitre-attack", "external_id": "TA0002"} ], }, { "type": "attack-pattern", "id": "attack-pattern--t1059", "name": "Command and Scripting Interpreter", "kill_chain_phases": [ {"kill_chain_name": "mitre-attack", "phase_name": "execution"} ], "external_references": [ {"source_name": "mitre-attack", "external_id": "T1059"} ], }, { "type": "attack-pattern", "id": "attack-pattern--t1059-001", "name": "PowerShell", "x_mitre_is_subtechnique": True, "external_references": [ {"source_name": "mitre-attack", "external_id": "T1059.001"} ], }, { "type": "relationship", "id": "relationship--rel1", "relationship_type": "subtechnique-of", "source_ref": "attack-pattern--t1059-001", "target_ref": "attack-pattern--t1059", }, ], } @pytest.fixture(scope="module") def app(db_engine_or_skip, tmp_path_factory): _truncate_all(db_engine_or_skip) bundle_path = tmp_path_factory.mktemp("m6") / "stix.json" bundle_path.write_text(json.dumps(_MINIMAL_BUNDLE)) mitre_svc.seed_mitre(source=bundle_path, expected_sha256=None) flask_app = create_app() flask_app.config.update(TESTING=True) return flask_app @pytest.fixture() def client(app): return app.test_client() def _unique_email(prefix: str) -> str: return f"{prefix}-{secrets.token_hex(4)}@metamorph.local" def _bearer(token: str) -> dict[str, str]: return {"Authorization": f"Bearer {token}"} def _login(client, email: str, password: str) -> str: r = client.post("/api/v1/auth/login", json={"email": email, "password": password}) assert r.status_code == 200, r.get_data(as_text=True) return r.get_json()["access_token"] @pytest.fixture(scope="module") def admin(app): token = regenerate_install_token() email = _unique_email("admin") password = "AdminPass1234!" with app.test_client() as c: r = c.post( "/api/v1/setup", json={"install_token": token, "email": email, "password": password}, ) assert r.status_code == 201, r.get_data(as_text=True) return {"email": email, "password": password} @pytest.fixture() def admin_token(client, admin) -> str: return _login(client, admin["email"], admin["password"]) # ---------------------------------------------------------------- catalogue -- def _mitre_kind(external_id: str) -> str: if external_id.startswith("TA"): return "tactic" if "." in external_id: return "subtechnique" return "technique" def _make_test_template(client, admin_token: str, *, name: str, mitre: str = "T1059"): body = { "name": name, "description": "auto", "objective": "do thing", "procedure_md": f"# {name}\n1. run", "expected_result_red_md": "red expectation", "expected_detection_blue_md": "blue expectation", "opsec_level": "medium", "tags": ["fast"], "expected_iocs": ["evil.exe"], "mitre_tags": [{"kind": _mitre_kind(mitre), "external_id": mitre}], } r = client.post("/api/v1/test-templates", headers=_bearer(admin_token), json=body) assert r.status_code == 201, r.get_data(as_text=True) return r.get_json() def _make_scenario(client, admin_token: str, *, name: str, test_ids: list[str]): r = client.post( "/api/v1/scenario-templates", headers=_bearer(admin_token), json={ "name": name, "description": f"auto-{name}", "test_template_ids": test_ids, }, ) assert r.status_code == 201, r.get_data(as_text=True) return r.get_json() @pytest.fixture(scope="module") def catalogue(app, admin): """Pre-seeded templates + scenarios so tests can reference them by id.""" with app.test_client() as c: tok = _login(c, admin["email"], admin["password"]) t1 = _make_test_template(c, tok, name="cat-test-1", mitre="T1059") t2 = _make_test_template( c, tok, name="cat-test-2", mitre="T1059.001" ) t3 = _make_test_template(c, tok, name="cat-test-3", mitre="T1059") sc_one = _make_scenario( c, tok, name="cat-scenario-A", test_ids=[t1["id"], t2["id"], t3["id"]] ) sc_solo = _make_scenario(c, tok, name="cat-scenario-B", test_ids=[t1["id"]]) return { "tests": {"t1": t1, "t2": t2, "t3": t3}, "scenarios": {"a": sc_one, "b": sc_solo}, } # --------------------------------------------------------------------- users -- def _invite_user(client, admin_token: str, prefix: str, group_codes: list[str]) -> dict: """Invite a user pre-bound to a freshly-minted group with the listed perm codes.""" grp = client.post( "/api/v1/groups", headers=_bearer(admin_token), json={"name": f"{prefix}-grp-{secrets.token_hex(2)}"}, ).get_json() r_set = client.put( f"/api/v1/groups/{grp['id']}/permissions", headers=_bearer(admin_token), json={"codes": group_codes}, ) assert r_set.status_code == 200, r_set.get_data(as_text=True) email = _unique_email(prefix) password = "Pass1234!" inv = client.post( "/api/v1/invitations", headers=_bearer(admin_token), json={"email_hint": email, "group_ids": [grp["id"]]}, ) assert inv.status_code == 201, inv.get_data(as_text=True) accept_token = inv.get_json()["token"] r = client.post( f"/api/v1/invitations/accept/{accept_token}", json={"email": email, "password": password}, ) assert r.status_code == 201, r.get_data(as_text=True) me_token = _login(client, email, password) me = client.get("/api/v1/auth/me", headers=_bearer(me_token)).get_json() return {"email": email, "password": password, "token": me_token, "id": me["id"]} @pytest.fixture() def red_user(client, admin_token): return _invite_user( client, admin_token, "red", [ "mission.read", "mission.create", "mission.update", "mission.archive", "mission.write_red_fields", ], ) @pytest.fixture() def blue_user(client, admin_token): return _invite_user( client, admin_token, "blue", ["mission.read", "mission.write_blue_fields"], ) @pytest.fixture() def reader_user(client, admin_token): """A user with mission.read only — for "non-member can't see" checks.""" return _invite_user(client, admin_token, "reader", ["mission.read"]) @pytest.fixture() def noperm_user(client, admin_token): """A user with no mission perms at all.""" return _invite_user(client, admin_token, "noperm", []) # ================================================================ snapshot == def test_create_mission_snapshots_scenarios_and_tests(client, admin_token, catalogue): r = client.post( "/api/v1/missions", headers=_bearer(admin_token), json={ "name": "snapshot-fidelity", "client_target": "Acme Corp", "description_md": "## ROE\n- approved\n", "scenario_template_ids": [catalogue["scenarios"]["a"]["id"]], }, ) assert r.status_code == 201, r.get_data(as_text=True) body = r.get_json() assert body["status"] == "draft" assert body["visibility_mode"] == "whitebox" assert body["scenarios_count"] == 1 assert body["tests_count"] == 3 assert body["members_count"] == 0 # admin creator is not auto-added sc = body["scenarios"][0] assert sc["position"] == 0 assert sc["snapshot_name"] == "cat-scenario-A" names_in_order = [t["snapshot_name"] for t in sc["tests"]] assert names_in_order == ["cat-test-1", "cat-test-2", "cat-test-3"] # MITRE denormalised into the snapshot t1 = next(t for t in sc["tests"] if t["snapshot_name"] == "cat-test-1") kinds = [(tag["kind"], tag["external_id"]) for tag in t1["mitre_tags"]] assert kinds == [("technique", "T1059")] def test_snapshot_is_frozen_after_template_edits(client, admin_token, catalogue): create = client.post( "/api/v1/missions", headers=_bearer(admin_token), json={ "name": "frozen-after-edits", "scenario_template_ids": [catalogue["scenarios"]["b"]["id"]], }, ) assert create.status_code == 201 mission_id = create.get_json()["id"] # Mutate the source test_template: rename + change MITRE edit = client.put( f"/api/v1/test-templates/{catalogue['tests']['t1']['id']}", headers=_bearer(admin_token), json={ "name": "RENAMED-AFTER-SNAPSHOT", "mitre_tags": [{"kind": "tactic", "external_id": "TA0002"}], }, ) assert edit.status_code == 200 # Mission still sees the pre-edit snapshot again = client.get( f"/api/v1/missions/{mission_id}", headers=_bearer(admin_token) ).get_json() sc = again["scenarios"][0] assert sc["tests"][0]["snapshot_name"] == "cat-test-1" assert [(t["kind"], t["external_id"]) for t in sc["tests"][0]["mitre_tags"]] == [ ("technique", "T1059") ] # Revert the rename so other tests still find the original name client.put( f"/api/v1/test-templates/{catalogue['tests']['t1']['id']}", headers=_bearer(admin_token), json={ "name": "cat-test-1", "mitre_tags": [{"kind": "technique", "external_id": "T1059"}], }, ) def test_create_mission_rejects_unknown_scenario(client, admin_token): r = client.post( "/api/v1/missions", headers=_bearer(admin_token), json={ "name": "bad-ref", "scenario_template_ids": ["00000000-0000-0000-0000-000000000099"], }, ) assert r.status_code == 400 assert r.get_json()["error"] == "unknown_scenario_template" def test_create_mission_validates_dates(client, admin_token): r = client.post( "/api/v1/missions", headers=_bearer(admin_token), json={ "name": "date-flip", "date_start": "2026-06-01", "date_end": "2026-05-01", }, ) assert r.status_code == 400 assert "date_end" in r.get_json().get("message", "").lower() # =================================================== membership visibility == def test_non_admin_creator_auto_added(client, red_user, catalogue): r = client.post( "/api/v1/missions", headers=_bearer(red_user["token"]), json={ "name": "red-self-created", "scenario_template_ids": [catalogue["scenarios"]["b"]["id"]], }, ) assert r.status_code == 201, r.get_data(as_text=True) body = r.get_json() assert body["members_count"] == 1 assert body["members"][0]["user_id"] == red_user["id"] assert body["members"][0]["role_hint"] == "red" # And the red user can see it back via /missions r2 = client.get("/api/v1/missions", headers=_bearer(red_user["token"])) ids = [it["id"] for it in r2.get_json()["items"]] assert body["id"] in ids def test_non_admin_cannot_see_missions_they_are_not_members_of( client, admin_token, reader_user, catalogue ): # Admin creates a mission with NO members create = client.post( "/api/v1/missions", headers=_bearer(admin_token), json={ "name": "hidden-from-reader", "scenario_template_ids": [catalogue["scenarios"]["b"]["id"]], }, ) assert create.status_code == 201 mid = create.get_json()["id"] # Reader has mission.read but is not a member → empty list + 404 r_list = client.get("/api/v1/missions", headers=_bearer(reader_user["token"])) ids = [it["id"] for it in r_list.get_json()["items"]] assert mid not in ids r_get = client.get( f"/api/v1/missions/{mid}", headers=_bearer(reader_user["token"]) ) assert r_get.status_code == 404 def test_non_member_get_returns_404_not_403(client, admin_token, reader_user): """Existence leak guard: non-members should see 404, not 403.""" create = client.post( "/api/v1/missions", headers=_bearer(admin_token), json={"name": "stealth-mission"}, ) mid = create.get_json()["id"] r = client.get(f"/api/v1/missions/{mid}", headers=_bearer(reader_user["token"])) assert r.status_code == 404 # ===================================================== perm gating ========== def test_create_requires_mission_create_perm(client, blue_user): """Blue team users (no mission.create) cannot create missions.""" r = client.post( "/api/v1/missions", headers=_bearer(blue_user["token"]), json={"name": "no-perm"}, ) assert r.status_code == 403 def test_list_requires_mission_read_perm(client, noperm_user): r = client.get("/api/v1/missions", headers=_bearer(noperm_user["token"])) assert r.status_code == 403 def test_archive_requires_mission_archive_not_just_update(client, admin_token): """A user with mission.update but no mission.archive cannot archive.""" # blue_user only has mission.read + mission.write_blue_fields — no update either. # We'll craft a user with update-only here. update_only = _invite_user(client, admin_token, "u-only", ["mission.read", "mission.update"]) create = client.post( "/api/v1/missions", headers=_bearer(admin_token), json={ "name": "to-archive", "members": [{"user_id": update_only["id"], "role_hint": "red"}], }, ) assert create.status_code == 201 mid = create.get_json()["id"] # update-only can transition to in_progress (mission.update is enough) r1 = client.post( f"/api/v1/missions/{mid}/transition", headers=_bearer(update_only["token"]), json={"status": "in_progress"}, ) assert r1.status_code == 200 # … but cannot archive r2 = client.post( f"/api/v1/missions/{mid}/transition", headers=_bearer(update_only["token"]), json={"status": "archived"}, ) assert r2.status_code == 403 # ==================================================== status transitions == def test_valid_transition_chain(client, admin_token): create = client.post( "/api/v1/missions", headers=_bearer(admin_token), json={"name": "chain"}, ) mid = create.get_json()["id"] for target in ("in_progress", "completed", "archived"): r = client.post( f"/api/v1/missions/{mid}/transition", headers=_bearer(admin_token), json={"status": target}, ) assert r.status_code == 200, (target, r.get_data(as_text=True)) assert r.get_json()["status"] == target def test_invalid_transition_409(client, admin_token): create = client.post( "/api/v1/missions", headers=_bearer(admin_token), json={"name": "invalid-jump"}, ) mid = create.get_json()["id"] # draft → completed is not allowed (must pass through in_progress) r = client.post( f"/api/v1/missions/{mid}/transition", headers=_bearer(admin_token), json={"status": "completed"}, ) assert r.status_code == 409 assert r.get_json()["error"] == "invalid_transition" def test_unknown_target_status_400(client, admin_token): create = client.post( "/api/v1/missions", headers=_bearer(admin_token), json={"name": "bad-status"}, ) mid = create.get_json()["id"] r = client.post( f"/api/v1/missions/{mid}/transition", headers=_bearer(admin_token), json={"status": "delivered"}, ) assert r.status_code == 400 def test_idempotent_same_status_transition(client, admin_token): create = client.post( "/api/v1/missions", headers=_bearer(admin_token), json={"name": "idempotent"}, ) mid = create.get_json()["id"] r = client.post( f"/api/v1/missions/{mid}/transition", headers=_bearer(admin_token), json={"status": "draft"}, ) assert r.status_code == 200 assert r.get_json()["status"] == "draft" # ============================================================ members ===== def test_set_members_replaces_full_set(client, admin_token, red_user, blue_user): create = client.post( "/api/v1/missions", headers=_bearer(admin_token), json={ "name": "members-replace", "members": [{"user_id": red_user["id"], "role_hint": "red"}], }, ) mid = create.get_json()["id"] r = client.put( f"/api/v1/missions/{mid}/members", headers=_bearer(admin_token), json={"members": [{"user_id": blue_user["id"], "role_hint": "blue"}]}, ) assert r.status_code == 200, r.get_data(as_text=True) body = r.get_json() assert body["members_count"] == 1 assert body["members"][0]["user_id"] == blue_user["id"] # And red can no longer see it r_red = client.get( f"/api/v1/missions/{mid}", headers=_bearer(red_user["token"]) ) assert r_red.status_code == 404 def test_set_members_rejects_unknown_user(client, admin_token): create = client.post( "/api/v1/missions", headers=_bearer(admin_token), json={"name": "ghost-member"} ) mid = create.get_json()["id"] r = client.put( f"/api/v1/missions/{mid}/members", headers=_bearer(admin_token), json={ "members": [ { "user_id": "00000000-0000-0000-0000-000000000123", "role_hint": "red", } ] }, ) assert r.status_code == 400 assert r.get_json()["error"] == "unknown_user" def test_set_members_rejects_bad_role_hint(client, admin_token, red_user): create = client.post( "/api/v1/missions", headers=_bearer(admin_token), json={"name": "bad-hint"} ) mid = create.get_json()["id"] r = client.put( f"/api/v1/missions/{mid}/members", headers=_bearer(admin_token), json={"members": [{"user_id": red_user["id"], "role_hint": "yellow"}]}, ) assert r.status_code == 400 # ==================================================== add scenarios ====== def test_add_scenarios_appends_at_end(client, admin_token, catalogue): create = client.post( "/api/v1/missions", headers=_bearer(admin_token), json={ "name": "appendable", "scenario_template_ids": [catalogue["scenarios"]["b"]["id"]], }, ) mid = create.get_json()["id"] r = client.post( f"/api/v1/missions/{mid}/scenarios", headers=_bearer(admin_token), json={"scenario_template_ids": [catalogue["scenarios"]["a"]["id"]]}, ) assert r.status_code == 200, r.get_data(as_text=True) body = r.get_json() assert body["scenarios_count"] == 2 positions = [sc["position"] for sc in body["scenarios"]] assert positions == [0, 1] # Second scenario lands at position 1 sc1 = next(sc for sc in body["scenarios"] if sc["position"] == 1) assert sc1["snapshot_name"] == "cat-scenario-A" assert len(sc1["tests"]) == 3 # ============================================================= delete ===== def test_soft_delete_hides_from_list(client, admin_token): create = client.post( "/api/v1/missions", headers=_bearer(admin_token), json={"name": "to-delete"} ) mid = create.get_json()["id"] r_del = client.delete(f"/api/v1/missions/{mid}", headers=_bearer(admin_token)) assert r_del.status_code == 200 r_list = client.get("/api/v1/missions", headers=_bearer(admin_token)) ids = [it["id"] for it in r_list.get_json()["items"]] assert mid not in ids # include_deleted=true brings it back (admin only) r_list2 = client.get( "/api/v1/missions?include_deleted=true", headers=_bearer(admin_token) ) ids2 = [it["id"] for it in r_list2.get_json()["items"]] assert mid in ids2 def test_include_deleted_forbidden_for_non_admin(client, red_user): r = client.get( "/api/v1/missions?include_deleted=true", headers=_bearer(red_user["token"]) ) assert r.status_code == 403 # ============================================================ update ====== def test_update_metadata_partial(client, admin_token): create = client.post( "/api/v1/missions", headers=_bearer(admin_token), json={ "name": "to-rename", "client_target": "X", "date_start": "2026-06-01", "date_end": "2026-06-10", }, ) mid = create.get_json()["id"] r = client.put( f"/api/v1/missions/{mid}", headers=_bearer(admin_token), json={"name": "renamed", "client_target": None}, ) assert r.status_code == 200, r.get_data(as_text=True) body = r.get_json() assert body["name"] == "renamed" assert body["client_target"] is None # date fields untouched assert body["date_start"] == "2026-06-01" assert body["date_end"] == "2026-06-10" def test_update_rejects_inverted_dates(client, admin_token): create = client.post( "/api/v1/missions", headers=_bearer(admin_token), json={ "name": "invert", "date_start": "2026-06-01", "date_end": "2026-06-10", }, ) mid = create.get_json()["id"] r = client.put( f"/api/v1/missions/{mid}", headers=_bearer(admin_token), json={"date_end": "2026-05-01"}, ) assert r.status_code == 400