Addresses spec-reviewer + code-reviewer feedback on the M6 bundle:
Critical:
- frontend/src/lib/missions.ts: add `listPrefix()` so TanStack invalidation
catches every filtered list variant; the previous `list()` returned
`['missions','list',{}]` and only matched the exact empty-filter cache,
leaving filtered tables stale after create/transition/delete.
- backend/app/services/missions.py: acquire the same per-scenario
`pg_advisory_xact_lock` key used by `set_scenario_tests` before
snapshotting; without it a concurrent M5 reorder could freeze a torn
snapshot under READ COMMITTED. Sorted by key to avoid deadlocks with
another snapshotter.
Important:
- backend/app/api/missions.py: `@require_perm("mission.update",
"mission.archive")` on the transition endpoint so users without either
perm get 403 before the body is parsed (no shape leak via 400).
- backend/app/services/missions.py: escape `%` / `_` / `\` in user-typed
`q` / `client` LIKE search; users can no longer trigger wildcard
semantics by typing literal `%`. Added `escape='\\'` arg on every .like().
- backend/app/services/missions.py: filter `MissionTest.deleted_at` and
`MissionScenario.deleted_at` in the list-item and detail counts so M7+
soft-deletes don't drift the totals silently.
Nits:
- backend/app/api/users.py: order `/users/roster` by email for stable
rendering + deterministic e2e selectors.
- frontend/src/pages/MissionDetailPage.tsx: distinct accent per
transition target (cyan/orange/green/teal) matching the status legend.
- e2e/tests/m6-missions.spec.ts: switch fragile `getByRole(name=/In
Progress/i)` to the stable `mission-transition-in_progress` data-testid.
New tests:
- test_create_mission_rejects_soft_deleted_scenario
- test_transition_perm_gate_runs_before_payload_parse
- test_search_treats_wildcards_as_literals
Suite: 106 pytest passing (was 103), 43 Playwright passing.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
782 lines
26 KiB
Python
782 lines
26 KiB
Python
"""M6 — Mission CRUD, snapshot fidelity, membership visibility, transitions.
|
|
|
|
The fixture stack mirrors `test_templates.py`: one shared `app` per module,
|
|
fresh truncate at the start, a minimal MITRE bundle seeded for tag resolution,
|
|
plus a small catalogue of test_templates and scenario_templates created via
|
|
the admin API so the snapshot path is exercised end-to-end (not via raw ORM).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import secrets
|
|
|
|
import pytest
|
|
from sqlalchemy import text
|
|
|
|
from app.core.install_token import regenerate_install_token
|
|
from app.main import create_app
|
|
from app.services import mitre_seed as mitre_svc
|
|
|
|
|
|
def _truncate_all(engine):
|
|
with engine.begin() as conn:
|
|
# Order matches /diag/reset: missions before templates before MITRE.
|
|
conn.execute(
|
|
text(
|
|
"TRUNCATE mission_test_mitre_tags, mission_tests, "
|
|
"mission_scenarios, mission_categories, mission_members, "
|
|
"missions RESTART IDENTITY CASCADE"
|
|
)
|
|
)
|
|
conn.execute(
|
|
text(
|
|
"TRUNCATE scenario_template_tests, scenario_templates, "
|
|
"test_template_mitre_tags, test_templates "
|
|
"RESTART IDENTITY CASCADE"
|
|
)
|
|
)
|
|
conn.execute(
|
|
text(
|
|
"TRUNCATE users, refresh_tokens, invitations, invitation_groups, "
|
|
"user_groups, group_permissions, permissions, settings, groups "
|
|
"RESTART IDENTITY CASCADE"
|
|
)
|
|
)
|
|
conn.execute(
|
|
text(
|
|
"TRUNCATE mitre_technique_tactics, mitre_subtechniques, "
|
|
"mitre_techniques, mitre_tactics RESTART IDENTITY CASCADE"
|
|
)
|
|
)
|
|
|
|
|
|
_MINIMAL_BUNDLE = {
|
|
"type": "bundle",
|
|
"id": "bundle--00000000-0000-0000-0000-000000000006",
|
|
"spec_version": "2.1",
|
|
"objects": [
|
|
{
|
|
"type": "x-mitre-tactic",
|
|
"id": "x-mitre-tactic--ta0002",
|
|
"name": "Execution",
|
|
"x_mitre_shortname": "execution",
|
|
"external_references": [
|
|
{"source_name": "mitre-attack", "external_id": "TA0002"}
|
|
],
|
|
},
|
|
{
|
|
"type": "attack-pattern",
|
|
"id": "attack-pattern--t1059",
|
|
"name": "Command and Scripting Interpreter",
|
|
"kill_chain_phases": [
|
|
{"kill_chain_name": "mitre-attack", "phase_name": "execution"}
|
|
],
|
|
"external_references": [
|
|
{"source_name": "mitre-attack", "external_id": "T1059"}
|
|
],
|
|
},
|
|
{
|
|
"type": "attack-pattern",
|
|
"id": "attack-pattern--t1059-001",
|
|
"name": "PowerShell",
|
|
"x_mitre_is_subtechnique": True,
|
|
"external_references": [
|
|
{"source_name": "mitre-attack", "external_id": "T1059.001"}
|
|
],
|
|
},
|
|
{
|
|
"type": "relationship",
|
|
"id": "relationship--rel1",
|
|
"relationship_type": "subtechnique-of",
|
|
"source_ref": "attack-pattern--t1059-001",
|
|
"target_ref": "attack-pattern--t1059",
|
|
},
|
|
],
|
|
}
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def app(db_engine_or_skip, tmp_path_factory):
|
|
_truncate_all(db_engine_or_skip)
|
|
bundle_path = tmp_path_factory.mktemp("m6") / "stix.json"
|
|
bundle_path.write_text(json.dumps(_MINIMAL_BUNDLE))
|
|
mitre_svc.seed_mitre(source=bundle_path, expected_sha256=None)
|
|
flask_app = create_app()
|
|
flask_app.config.update(TESTING=True)
|
|
return flask_app
|
|
|
|
|
|
@pytest.fixture()
|
|
def client(app):
|
|
return app.test_client()
|
|
|
|
|
|
def _unique_email(prefix: str) -> str:
|
|
return f"{prefix}-{secrets.token_hex(4)}@metamorph.local"
|
|
|
|
|
|
def _bearer(token: str) -> dict[str, str]:
|
|
return {"Authorization": f"Bearer {token}"}
|
|
|
|
|
|
def _login(client, email: str, password: str) -> str:
|
|
r = client.post("/api/v1/auth/login", json={"email": email, "password": password})
|
|
assert r.status_code == 200, r.get_data(as_text=True)
|
|
return r.get_json()["access_token"]
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def admin(app):
|
|
token = regenerate_install_token()
|
|
email = _unique_email("admin")
|
|
password = "AdminPass1234!"
|
|
with app.test_client() as c:
|
|
r = c.post(
|
|
"/api/v1/setup",
|
|
json={"install_token": token, "email": email, "password": password},
|
|
)
|
|
assert r.status_code == 201, r.get_data(as_text=True)
|
|
return {"email": email, "password": password}
|
|
|
|
|
|
@pytest.fixture()
|
|
def admin_token(client, admin) -> str:
|
|
return _login(client, admin["email"], admin["password"])
|
|
|
|
|
|
# ---------------------------------------------------------------- catalogue --
|
|
|
|
|
|
def _mitre_kind(external_id: str) -> str:
|
|
if external_id.startswith("TA"):
|
|
return "tactic"
|
|
if "." in external_id:
|
|
return "subtechnique"
|
|
return "technique"
|
|
|
|
|
|
def _make_test_template(client, admin_token: str, *, name: str, mitre: str = "T1059"):
|
|
body = {
|
|
"name": name,
|
|
"description": "auto",
|
|
"objective": "do thing",
|
|
"procedure_md": f"# {name}\n1. run",
|
|
"expected_result_red_md": "red expectation",
|
|
"expected_detection_blue_md": "blue expectation",
|
|
"opsec_level": "medium",
|
|
"tags": ["fast"],
|
|
"expected_iocs": ["evil.exe"],
|
|
"mitre_tags": [{"kind": _mitre_kind(mitre), "external_id": mitre}],
|
|
}
|
|
r = client.post("/api/v1/test-templates", headers=_bearer(admin_token), json=body)
|
|
assert r.status_code == 201, r.get_data(as_text=True)
|
|
return r.get_json()
|
|
|
|
|
|
def _make_scenario(client, admin_token: str, *, name: str, test_ids: list[str]):
|
|
r = client.post(
|
|
"/api/v1/scenario-templates",
|
|
headers=_bearer(admin_token),
|
|
json={
|
|
"name": name,
|
|
"description": f"auto-{name}",
|
|
"test_template_ids": test_ids,
|
|
},
|
|
)
|
|
assert r.status_code == 201, r.get_data(as_text=True)
|
|
return r.get_json()
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def catalogue(app, admin):
|
|
"""Pre-seeded templates + scenarios so tests can reference them by id."""
|
|
with app.test_client() as c:
|
|
tok = _login(c, admin["email"], admin["password"])
|
|
t1 = _make_test_template(c, tok, name="cat-test-1", mitre="T1059")
|
|
t2 = _make_test_template(
|
|
c, tok, name="cat-test-2", mitre="T1059.001"
|
|
)
|
|
t3 = _make_test_template(c, tok, name="cat-test-3", mitre="T1059")
|
|
sc_one = _make_scenario(
|
|
c, tok, name="cat-scenario-A", test_ids=[t1["id"], t2["id"], t3["id"]]
|
|
)
|
|
sc_solo = _make_scenario(c, tok, name="cat-scenario-B", test_ids=[t1["id"]])
|
|
return {
|
|
"tests": {"t1": t1, "t2": t2, "t3": t3},
|
|
"scenarios": {"a": sc_one, "b": sc_solo},
|
|
}
|
|
|
|
|
|
# --------------------------------------------------------------------- users --
|
|
|
|
|
|
def _invite_user(client, admin_token: str, prefix: str, group_codes: list[str]) -> dict:
|
|
"""Invite a user pre-bound to a freshly-minted group with the listed perm codes."""
|
|
grp = client.post(
|
|
"/api/v1/groups",
|
|
headers=_bearer(admin_token),
|
|
json={"name": f"{prefix}-grp-{secrets.token_hex(2)}"},
|
|
).get_json()
|
|
r_set = client.put(
|
|
f"/api/v1/groups/{grp['id']}/permissions",
|
|
headers=_bearer(admin_token),
|
|
json={"codes": group_codes},
|
|
)
|
|
assert r_set.status_code == 200, r_set.get_data(as_text=True)
|
|
|
|
email = _unique_email(prefix)
|
|
password = "Pass1234!"
|
|
inv = client.post(
|
|
"/api/v1/invitations",
|
|
headers=_bearer(admin_token),
|
|
json={"email_hint": email, "group_ids": [grp["id"]]},
|
|
)
|
|
assert inv.status_code == 201, inv.get_data(as_text=True)
|
|
accept_token = inv.get_json()["token"]
|
|
r = client.post(
|
|
f"/api/v1/invitations/accept/{accept_token}",
|
|
json={"email": email, "password": password},
|
|
)
|
|
assert r.status_code == 201, r.get_data(as_text=True)
|
|
me_token = _login(client, email, password)
|
|
me = client.get("/api/v1/auth/me", headers=_bearer(me_token)).get_json()
|
|
return {"email": email, "password": password, "token": me_token, "id": me["id"]}
|
|
|
|
|
|
@pytest.fixture()
|
|
def red_user(client, admin_token):
|
|
return _invite_user(
|
|
client,
|
|
admin_token,
|
|
"red",
|
|
[
|
|
"mission.read",
|
|
"mission.create",
|
|
"mission.update",
|
|
"mission.archive",
|
|
"mission.write_red_fields",
|
|
],
|
|
)
|
|
|
|
|
|
@pytest.fixture()
|
|
def blue_user(client, admin_token):
|
|
return _invite_user(
|
|
client,
|
|
admin_token,
|
|
"blue",
|
|
["mission.read", "mission.write_blue_fields"],
|
|
)
|
|
|
|
|
|
@pytest.fixture()
|
|
def reader_user(client, admin_token):
|
|
"""A user with mission.read only — for "non-member can't see" checks."""
|
|
return _invite_user(client, admin_token, "reader", ["mission.read"])
|
|
|
|
|
|
@pytest.fixture()
|
|
def noperm_user(client, admin_token):
|
|
"""A user with no mission perms at all."""
|
|
return _invite_user(client, admin_token, "noperm", [])
|
|
|
|
|
|
# ================================================================ snapshot ==
|
|
|
|
|
|
def test_create_mission_snapshots_scenarios_and_tests(client, admin_token, catalogue):
|
|
r = client.post(
|
|
"/api/v1/missions",
|
|
headers=_bearer(admin_token),
|
|
json={
|
|
"name": "snapshot-fidelity",
|
|
"client_target": "Acme Corp",
|
|
"description_md": "## ROE\n- approved\n",
|
|
"scenario_template_ids": [catalogue["scenarios"]["a"]["id"]],
|
|
},
|
|
)
|
|
assert r.status_code == 201, r.get_data(as_text=True)
|
|
body = r.get_json()
|
|
assert body["status"] == "draft"
|
|
assert body["visibility_mode"] == "whitebox"
|
|
assert body["scenarios_count"] == 1
|
|
assert body["tests_count"] == 3
|
|
assert body["members_count"] == 0 # admin creator is not auto-added
|
|
sc = body["scenarios"][0]
|
|
assert sc["position"] == 0
|
|
assert sc["snapshot_name"] == "cat-scenario-A"
|
|
names_in_order = [t["snapshot_name"] for t in sc["tests"]]
|
|
assert names_in_order == ["cat-test-1", "cat-test-2", "cat-test-3"]
|
|
# MITRE denormalised into the snapshot
|
|
t1 = next(t for t in sc["tests"] if t["snapshot_name"] == "cat-test-1")
|
|
kinds = [(tag["kind"], tag["external_id"]) for tag in t1["mitre_tags"]]
|
|
assert kinds == [("technique", "T1059")]
|
|
|
|
|
|
def test_snapshot_is_frozen_after_template_edits(client, admin_token, catalogue):
|
|
create = client.post(
|
|
"/api/v1/missions",
|
|
headers=_bearer(admin_token),
|
|
json={
|
|
"name": "frozen-after-edits",
|
|
"scenario_template_ids": [catalogue["scenarios"]["b"]["id"]],
|
|
},
|
|
)
|
|
assert create.status_code == 201
|
|
mission_id = create.get_json()["id"]
|
|
# Mutate the source test_template: rename + change MITRE
|
|
edit = client.put(
|
|
f"/api/v1/test-templates/{catalogue['tests']['t1']['id']}",
|
|
headers=_bearer(admin_token),
|
|
json={
|
|
"name": "RENAMED-AFTER-SNAPSHOT",
|
|
"mitre_tags": [{"kind": "tactic", "external_id": "TA0002"}],
|
|
},
|
|
)
|
|
assert edit.status_code == 200
|
|
# Mission still sees the pre-edit snapshot
|
|
again = client.get(
|
|
f"/api/v1/missions/{mission_id}", headers=_bearer(admin_token)
|
|
).get_json()
|
|
sc = again["scenarios"][0]
|
|
assert sc["tests"][0]["snapshot_name"] == "cat-test-1"
|
|
assert [(t["kind"], t["external_id"]) for t in sc["tests"][0]["mitre_tags"]] == [
|
|
("technique", "T1059")
|
|
]
|
|
# Revert the rename so other tests still find the original name
|
|
client.put(
|
|
f"/api/v1/test-templates/{catalogue['tests']['t1']['id']}",
|
|
headers=_bearer(admin_token),
|
|
json={
|
|
"name": "cat-test-1",
|
|
"mitre_tags": [{"kind": "technique", "external_id": "T1059"}],
|
|
},
|
|
)
|
|
|
|
|
|
def test_create_mission_rejects_unknown_scenario(client, admin_token):
|
|
r = client.post(
|
|
"/api/v1/missions",
|
|
headers=_bearer(admin_token),
|
|
json={
|
|
"name": "bad-ref",
|
|
"scenario_template_ids": ["00000000-0000-0000-0000-000000000099"],
|
|
},
|
|
)
|
|
assert r.status_code == 400
|
|
assert r.get_json()["error"] == "unknown_scenario_template"
|
|
|
|
|
|
def test_create_mission_rejects_soft_deleted_scenario(client, admin_token):
|
|
# Build a scenario, soft-delete it, then try to snapshot — must 400 with
|
|
# `unknown_scenario_template` so we don't silently freeze a tombstoned
|
|
# template into a new mission.
|
|
t = _make_test_template(client, admin_token, name="sd-rejection-t")
|
|
sc = client.post(
|
|
"/api/v1/scenario-templates",
|
|
headers=_bearer(admin_token),
|
|
json={"name": "sd-rejection-sc", "test_template_ids": [t["id"]]},
|
|
).get_json()
|
|
del_r = client.delete(
|
|
f"/api/v1/scenario-templates/{sc['id']}", headers=_bearer(admin_token)
|
|
)
|
|
assert del_r.status_code == 200
|
|
r = client.post(
|
|
"/api/v1/missions",
|
|
headers=_bearer(admin_token),
|
|
json={
|
|
"name": "sd-rejection-mission",
|
|
"scenario_template_ids": [sc["id"]],
|
|
},
|
|
)
|
|
assert r.status_code == 400
|
|
assert r.get_json()["error"] == "unknown_scenario_template"
|
|
|
|
|
|
def test_create_mission_validates_dates(client, admin_token):
|
|
r = client.post(
|
|
"/api/v1/missions",
|
|
headers=_bearer(admin_token),
|
|
json={
|
|
"name": "date-flip",
|
|
"date_start": "2026-06-01",
|
|
"date_end": "2026-05-01",
|
|
},
|
|
)
|
|
assert r.status_code == 400
|
|
assert "date_end" in r.get_json().get("message", "").lower()
|
|
|
|
|
|
# =================================================== membership visibility ==
|
|
|
|
|
|
def test_non_admin_creator_auto_added(client, red_user, catalogue):
|
|
r = client.post(
|
|
"/api/v1/missions",
|
|
headers=_bearer(red_user["token"]),
|
|
json={
|
|
"name": "red-self-created",
|
|
"scenario_template_ids": [catalogue["scenarios"]["b"]["id"]],
|
|
},
|
|
)
|
|
assert r.status_code == 201, r.get_data(as_text=True)
|
|
body = r.get_json()
|
|
assert body["members_count"] == 1
|
|
assert body["members"][0]["user_id"] == red_user["id"]
|
|
assert body["members"][0]["role_hint"] == "red"
|
|
# And the red user can see it back via /missions
|
|
r2 = client.get("/api/v1/missions", headers=_bearer(red_user["token"]))
|
|
ids = [it["id"] for it in r2.get_json()["items"]]
|
|
assert body["id"] in ids
|
|
|
|
|
|
def test_non_admin_cannot_see_missions_they_are_not_members_of(
|
|
client, admin_token, reader_user, catalogue
|
|
):
|
|
# Admin creates a mission with NO members
|
|
create = client.post(
|
|
"/api/v1/missions",
|
|
headers=_bearer(admin_token),
|
|
json={
|
|
"name": "hidden-from-reader",
|
|
"scenario_template_ids": [catalogue["scenarios"]["b"]["id"]],
|
|
},
|
|
)
|
|
assert create.status_code == 201
|
|
mid = create.get_json()["id"]
|
|
# Reader has mission.read but is not a member → empty list + 404
|
|
r_list = client.get("/api/v1/missions", headers=_bearer(reader_user["token"]))
|
|
ids = [it["id"] for it in r_list.get_json()["items"]]
|
|
assert mid not in ids
|
|
r_get = client.get(
|
|
f"/api/v1/missions/{mid}", headers=_bearer(reader_user["token"])
|
|
)
|
|
assert r_get.status_code == 404
|
|
|
|
|
|
def test_non_member_get_returns_404_not_403(client, admin_token, reader_user):
|
|
"""Existence leak guard: non-members should see 404, not 403."""
|
|
create = client.post(
|
|
"/api/v1/missions",
|
|
headers=_bearer(admin_token),
|
|
json={"name": "stealth-mission"},
|
|
)
|
|
mid = create.get_json()["id"]
|
|
r = client.get(f"/api/v1/missions/{mid}", headers=_bearer(reader_user["token"]))
|
|
assert r.status_code == 404
|
|
|
|
|
|
# ===================================================== perm gating ==========
|
|
|
|
|
|
def test_create_requires_mission_create_perm(client, blue_user):
|
|
"""Blue team users (no mission.create) cannot create missions."""
|
|
r = client.post(
|
|
"/api/v1/missions",
|
|
headers=_bearer(blue_user["token"]),
|
|
json={"name": "no-perm"},
|
|
)
|
|
assert r.status_code == 403
|
|
|
|
|
|
def test_list_requires_mission_read_perm(client, noperm_user):
|
|
r = client.get("/api/v1/missions", headers=_bearer(noperm_user["token"]))
|
|
assert r.status_code == 403
|
|
|
|
|
|
def test_transition_perm_gate_runs_before_payload_parse(client, blue_user):
|
|
"""A user without `mission.update` or `mission.archive` should see 403,
|
|
not 400, even when posting a malformed body — otherwise the endpoint's
|
|
shape leaks via the validation error message."""
|
|
# blue_user only has mission.read + mission.write_blue_fields, so neither
|
|
# mission.update nor mission.archive is held.
|
|
r = client.post(
|
|
"/api/v1/missions/00000000-0000-0000-0000-000000000000/transition",
|
|
headers=_bearer(blue_user["token"]),
|
|
json={"status": "garbage-not-a-valid-shape"},
|
|
)
|
|
assert r.status_code == 403
|
|
|
|
|
|
def test_search_treats_wildcards_as_literals(client, admin_token):
|
|
"""User-typed `%` and `_` must NOT act as SQL LIKE wildcards."""
|
|
client.post(
|
|
"/api/v1/missions",
|
|
headers=_bearer(admin_token),
|
|
json={"name": "no-wildcards-here"},
|
|
)
|
|
# Without escaping, `?q=%` would match every mission. With escaping, it
|
|
# only matches names that literally contain `%`.
|
|
r = client.get("/api/v1/missions?q=%25", headers=_bearer(admin_token))
|
|
assert r.status_code == 200
|
|
names = [it["name"] for it in r.get_json()["items"]]
|
|
assert "no-wildcards-here" not in names
|
|
|
|
|
|
def test_archive_requires_mission_archive_not_just_update(client, admin_token):
|
|
"""A user with mission.update but no mission.archive cannot archive."""
|
|
# blue_user only has mission.read + mission.write_blue_fields — no update either.
|
|
# We'll craft a user with update-only here.
|
|
update_only = _invite_user(client, admin_token, "u-only", ["mission.read", "mission.update"])
|
|
create = client.post(
|
|
"/api/v1/missions",
|
|
headers=_bearer(admin_token),
|
|
json={
|
|
"name": "to-archive",
|
|
"members": [{"user_id": update_only["id"], "role_hint": "red"}],
|
|
},
|
|
)
|
|
assert create.status_code == 201
|
|
mid = create.get_json()["id"]
|
|
# update-only can transition to in_progress (mission.update is enough)
|
|
r1 = client.post(
|
|
f"/api/v1/missions/{mid}/transition",
|
|
headers=_bearer(update_only["token"]),
|
|
json={"status": "in_progress"},
|
|
)
|
|
assert r1.status_code == 200
|
|
# … but cannot archive
|
|
r2 = client.post(
|
|
f"/api/v1/missions/{mid}/transition",
|
|
headers=_bearer(update_only["token"]),
|
|
json={"status": "archived"},
|
|
)
|
|
assert r2.status_code == 403
|
|
|
|
|
|
# ==================================================== status transitions ==
|
|
|
|
|
|
def test_valid_transition_chain(client, admin_token):
|
|
create = client.post(
|
|
"/api/v1/missions",
|
|
headers=_bearer(admin_token),
|
|
json={"name": "chain"},
|
|
)
|
|
mid = create.get_json()["id"]
|
|
for target in ("in_progress", "completed", "archived"):
|
|
r = client.post(
|
|
f"/api/v1/missions/{mid}/transition",
|
|
headers=_bearer(admin_token),
|
|
json={"status": target},
|
|
)
|
|
assert r.status_code == 200, (target, r.get_data(as_text=True))
|
|
assert r.get_json()["status"] == target
|
|
|
|
|
|
def test_invalid_transition_409(client, admin_token):
|
|
create = client.post(
|
|
"/api/v1/missions",
|
|
headers=_bearer(admin_token),
|
|
json={"name": "invalid-jump"},
|
|
)
|
|
mid = create.get_json()["id"]
|
|
# draft → completed is not allowed (must pass through in_progress)
|
|
r = client.post(
|
|
f"/api/v1/missions/{mid}/transition",
|
|
headers=_bearer(admin_token),
|
|
json={"status": "completed"},
|
|
)
|
|
assert r.status_code == 409
|
|
assert r.get_json()["error"] == "invalid_transition"
|
|
|
|
|
|
def test_unknown_target_status_400(client, admin_token):
|
|
create = client.post(
|
|
"/api/v1/missions",
|
|
headers=_bearer(admin_token),
|
|
json={"name": "bad-status"},
|
|
)
|
|
mid = create.get_json()["id"]
|
|
r = client.post(
|
|
f"/api/v1/missions/{mid}/transition",
|
|
headers=_bearer(admin_token),
|
|
json={"status": "delivered"},
|
|
)
|
|
assert r.status_code == 400
|
|
|
|
|
|
def test_idempotent_same_status_transition(client, admin_token):
|
|
create = client.post(
|
|
"/api/v1/missions",
|
|
headers=_bearer(admin_token),
|
|
json={"name": "idempotent"},
|
|
)
|
|
mid = create.get_json()["id"]
|
|
r = client.post(
|
|
f"/api/v1/missions/{mid}/transition",
|
|
headers=_bearer(admin_token),
|
|
json={"status": "draft"},
|
|
)
|
|
assert r.status_code == 200
|
|
assert r.get_json()["status"] == "draft"
|
|
|
|
|
|
# ============================================================ members =====
|
|
|
|
|
|
def test_set_members_replaces_full_set(client, admin_token, red_user, blue_user):
|
|
create = client.post(
|
|
"/api/v1/missions",
|
|
headers=_bearer(admin_token),
|
|
json={
|
|
"name": "members-replace",
|
|
"members": [{"user_id": red_user["id"], "role_hint": "red"}],
|
|
},
|
|
)
|
|
mid = create.get_json()["id"]
|
|
r = client.put(
|
|
f"/api/v1/missions/{mid}/members",
|
|
headers=_bearer(admin_token),
|
|
json={"members": [{"user_id": blue_user["id"], "role_hint": "blue"}]},
|
|
)
|
|
assert r.status_code == 200, r.get_data(as_text=True)
|
|
body = r.get_json()
|
|
assert body["members_count"] == 1
|
|
assert body["members"][0]["user_id"] == blue_user["id"]
|
|
# And red can no longer see it
|
|
r_red = client.get(
|
|
f"/api/v1/missions/{mid}", headers=_bearer(red_user["token"])
|
|
)
|
|
assert r_red.status_code == 404
|
|
|
|
|
|
def test_set_members_rejects_unknown_user(client, admin_token):
|
|
create = client.post(
|
|
"/api/v1/missions", headers=_bearer(admin_token), json={"name": "ghost-member"}
|
|
)
|
|
mid = create.get_json()["id"]
|
|
r = client.put(
|
|
f"/api/v1/missions/{mid}/members",
|
|
headers=_bearer(admin_token),
|
|
json={
|
|
"members": [
|
|
{
|
|
"user_id": "00000000-0000-0000-0000-000000000123",
|
|
"role_hint": "red",
|
|
}
|
|
]
|
|
},
|
|
)
|
|
assert r.status_code == 400
|
|
assert r.get_json()["error"] == "unknown_user"
|
|
|
|
|
|
def test_set_members_rejects_bad_role_hint(client, admin_token, red_user):
|
|
create = client.post(
|
|
"/api/v1/missions", headers=_bearer(admin_token), json={"name": "bad-hint"}
|
|
)
|
|
mid = create.get_json()["id"]
|
|
r = client.put(
|
|
f"/api/v1/missions/{mid}/members",
|
|
headers=_bearer(admin_token),
|
|
json={"members": [{"user_id": red_user["id"], "role_hint": "yellow"}]},
|
|
)
|
|
assert r.status_code == 400
|
|
|
|
|
|
# ==================================================== add scenarios ======
|
|
|
|
|
|
def test_add_scenarios_appends_at_end(client, admin_token, catalogue):
|
|
create = client.post(
|
|
"/api/v1/missions",
|
|
headers=_bearer(admin_token),
|
|
json={
|
|
"name": "appendable",
|
|
"scenario_template_ids": [catalogue["scenarios"]["b"]["id"]],
|
|
},
|
|
)
|
|
mid = create.get_json()["id"]
|
|
r = client.post(
|
|
f"/api/v1/missions/{mid}/scenarios",
|
|
headers=_bearer(admin_token),
|
|
json={"scenario_template_ids": [catalogue["scenarios"]["a"]["id"]]},
|
|
)
|
|
assert r.status_code == 200, r.get_data(as_text=True)
|
|
body = r.get_json()
|
|
assert body["scenarios_count"] == 2
|
|
positions = [sc["position"] for sc in body["scenarios"]]
|
|
assert positions == [0, 1]
|
|
# Second scenario lands at position 1
|
|
sc1 = next(sc for sc in body["scenarios"] if sc["position"] == 1)
|
|
assert sc1["snapshot_name"] == "cat-scenario-A"
|
|
assert len(sc1["tests"]) == 3
|
|
|
|
|
|
# ============================================================= delete =====
|
|
|
|
|
|
def test_soft_delete_hides_from_list(client, admin_token):
|
|
create = client.post(
|
|
"/api/v1/missions", headers=_bearer(admin_token), json={"name": "to-delete"}
|
|
)
|
|
mid = create.get_json()["id"]
|
|
r_del = client.delete(f"/api/v1/missions/{mid}", headers=_bearer(admin_token))
|
|
assert r_del.status_code == 200
|
|
r_list = client.get("/api/v1/missions", headers=_bearer(admin_token))
|
|
ids = [it["id"] for it in r_list.get_json()["items"]]
|
|
assert mid not in ids
|
|
# include_deleted=true brings it back (admin only)
|
|
r_list2 = client.get(
|
|
"/api/v1/missions?include_deleted=true", headers=_bearer(admin_token)
|
|
)
|
|
ids2 = [it["id"] for it in r_list2.get_json()["items"]]
|
|
assert mid in ids2
|
|
|
|
|
|
def test_include_deleted_forbidden_for_non_admin(client, red_user):
|
|
r = client.get(
|
|
"/api/v1/missions?include_deleted=true", headers=_bearer(red_user["token"])
|
|
)
|
|
assert r.status_code == 403
|
|
|
|
|
|
# ============================================================ update ======
|
|
|
|
|
|
def test_update_metadata_partial(client, admin_token):
|
|
create = client.post(
|
|
"/api/v1/missions",
|
|
headers=_bearer(admin_token),
|
|
json={
|
|
"name": "to-rename",
|
|
"client_target": "X",
|
|
"date_start": "2026-06-01",
|
|
"date_end": "2026-06-10",
|
|
},
|
|
)
|
|
mid = create.get_json()["id"]
|
|
r = client.put(
|
|
f"/api/v1/missions/{mid}",
|
|
headers=_bearer(admin_token),
|
|
json={"name": "renamed", "client_target": None},
|
|
)
|
|
assert r.status_code == 200, r.get_data(as_text=True)
|
|
body = r.get_json()
|
|
assert body["name"] == "renamed"
|
|
assert body["client_target"] is None
|
|
# date fields untouched
|
|
assert body["date_start"] == "2026-06-01"
|
|
assert body["date_end"] == "2026-06-10"
|
|
|
|
|
|
def test_update_rejects_inverted_dates(client, admin_token):
|
|
create = client.post(
|
|
"/api/v1/missions",
|
|
headers=_bearer(admin_token),
|
|
json={
|
|
"name": "invert",
|
|
"date_start": "2026-06-01",
|
|
"date_end": "2026-06-10",
|
|
},
|
|
)
|
|
mid = create.get_json()["id"]
|
|
r = client.put(
|
|
f"/api/v1/missions/{mid}",
|
|
headers=_bearer(admin_token),
|
|
json={"date_end": "2026-05-01"},
|
|
)
|
|
assert r.status_code == 400
|