User feedback after the M7 ship: blue team's Excel workflow had 5 extra
fields we didn't capture. Per-test page also doesn't match their
workflow — they need a tabular view, one table per scenario.
Spec
- tasks/spec.md amended (`revised: 2026-05-15`): §4 in-scope, §F6, §8
model bullet. §F6 now pins the column matrix, single-row-edit
semantics, Esc-cancel, blur-confirm, and reconciles detection_level
as a pill inside the Commentaires cell (no 8th column).
- tasks/todo.md M7 section grew an "Amendement 2026-05-15" sub-block
tracking backend ☑ and frontend ☐.
Backend
- Migration c2a8f4b1d6e9: 5 nullable columns on mission_tests
(blue_log_source, blue_siem_logs, blue_incident_at,
blue_incident_number, blue_incident_recipient_email).
- _BLUE_FIELDS extended; update_mission_test_fields propagates each
field; MissionTestDetailView + MissionTestView (the nested view in
GET /missions/{id}) surface every annotation field, plus
last_actor_*, updated_at, detection_level_key — detection-level keys and
last-actor users are each resolved with one batched lookup (a constant
number of queries rather than one per row), which keeps it scalable.
- UpdateMissionTestPayload accepts each field with length caps
(120/200_000/120/255).
Reviewer follow-ups applied
- blue_incident_at + executed_at now reject naïve datetimes
(_ensure_aware_datetime) — Postgres would otherwise interpret
them in the session TZ, defeating the M7 verbatim-time contract.
- blue_incident_recipient_email goes through a permissive RFC-shape
regex (_validate_email_shape) so internal/lab TLDs like .local
/ .corp / .test pass — Pydantic EmailStr is too strict (lessons.md
M2 trap).
- Project-wide: switched `e.errors()` to
`e.errors(include_context=False, include_url=False)` because the
AfterValidator-raised ValueError lands in ctx and Flask can't
serialize it.
Tests
- 5 new pytest cases: blue user writes the 5 new fields, red user is
individually 403'd on each, round-trip via GET, naïve datetime
rejected, email shape validated (.local accepted, bad shape 400).
- 138 pytest green.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1021 lines
35 KiB
Python
1021 lines
35 KiB
Python
"""M7 — per-test execution, evidence upload, activity polling.

Fixture stack mirrors `test_missions.py` so we can reuse the test_template/
scenario_template catalogue and the red/blue/reader user invitations. M7
adds the assumption that detection_levels are seeded (boot does this for
the live API; we re-seed inside the module fixture to cover the truncated
state).
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import io
|
|
import json
|
|
import secrets
|
|
import urllib.parse
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
import pytest
|
|
from sqlalchemy import text
|
|
|
|
from app.core.install_token import regenerate_install_token
|
|
from app.main import create_app
|
|
from app.services import detection_levels as detection_svc
|
|
from app.services import mitre_seed as mitre_svc
|
|
|
|
|
|
# Smallest STIX 2.1 bundle the tests need: one tactic (TA0002, shortname
# "execution") and one technique (T1059) whose kill-chain phase name matches
# that shortname. The `app` fixture writes it to disk and seeds MITRE from it.
_MINIMAL_BUNDLE = {
    "type": "bundle",
    "id": "bundle--00000000-0000-0000-0000-000000000007",
    "spec_version": "2.1",
    "objects": [
        {
            "type": "x-mitre-tactic",
            "id": "x-mitre-tactic--ta0002",
            "name": "Execution",
            "x_mitre_shortname": "execution",
            "external_references": [
                {"source_name": "mitre-attack", "external_id": "TA0002"}
            ],
        },
        {
            "type": "attack-pattern",
            "id": "attack-pattern--t1059",
            "name": "Command and Scripting Interpreter",
            "kill_chain_phases": [
                {"kill_chain_name": "mitre-attack", "phase_name": "execution"}
            ],
            "external_references": [
                {"source_name": "mitre-attack", "external_id": "T1059"}
            ],
        },
    ],
}
|
|
|
|
|
|
def _truncate_all(engine):
    """Wipe every table this module touches so the fixtures start clean.

    Issues four ``TRUNCATE ... RESTART IDENTITY CASCADE`` statements inside a
    single ``engine.begin()`` block: mission tables, template catalogue,
    users/groups/permissions/settings, and the MITRE tables.

    Args:
        engine: SQLAlchemy engine bound to the test database.
    """
    with engine.begin() as conn:
        # Mission data.
        conn.execute(
            text(
                "TRUNCATE mission_test_mitre_tags, mission_tests, "
                "mission_scenarios, mission_categories, mission_members, "
                "missions RESTART IDENTITY CASCADE"
            )
        )
        # Test/scenario template catalogue.
        conn.execute(
            text(
                "TRUNCATE scenario_template_tests, scenario_templates, "
                "test_template_mitre_tags, test_templates "
                "RESTART IDENTITY CASCADE"
            )
        )
        # Accounts, invitations, groups, permissions and settings.
        conn.execute(
            text(
                "TRUNCATE users, refresh_tokens, invitations, invitation_groups, "
                "user_groups, group_permissions, permissions, settings, groups "
                "RESTART IDENTITY CASCADE"
            )
        )
        # MITRE ATT&CK reference data (re-seeded by the `app` fixture).
        conn.execute(
            text(
                "TRUNCATE mitre_technique_tactics, mitre_subtechniques, "
                "mitre_techniques, mitre_tactics RESTART IDENTITY CASCADE"
            )
        )
|
|
|
|
|
|
@pytest.fixture(scope="module")
def app(db_engine_or_skip, tmp_path_factory, monkeypatch_module):
    """Module-scoped Flask app over a truncated, re-seeded database.

    Truncates all tables, re-seeds MITRE data from `_MINIMAL_BUNDLE` and the
    detection levels, redirects evidence storage to a temp directory, then
    builds the app in TESTING mode.
    """
    _truncate_all(db_engine_or_skip)
    # Re-seed catalogues that boot/seed handles in production but `_truncate_all`
    # has just wiped.
    bundle_path = tmp_path_factory.mktemp("m7") / "stix.json"
    bundle_path.write_text(json.dumps(_MINIMAL_BUNDLE), encoding="utf-8")
    mitre_svc.seed_mitre(source=bundle_path, expected_sha256=None)
    detection_svc.seed_detection_levels()
    # Point the evidence dir at a tmp location so test uploads don't pollute /data.
    evidence_root = tmp_path_factory.mktemp("evidence")
    monkeypatch_module.setattr(
        "app.core.config.settings.EVIDENCE_DIR", str(evidence_root)
    )
    flask_app = create_app()
    flask_app.config.update(TESTING=True)
    flask_app.config["EVIDENCE_ROOT"] = str(evidence_root)
    return flask_app
|
|
|
|
|
|
@pytest.fixture(scope="module")
def monkeypatch_module():
    """Module-scoped monkeypatch — pytest's built-in is function-scoped only.

    Yields a `pytest.MonkeyPatch` instance and undoes every patch when the
    module's tests are finished.
    """
    mp = pytest.MonkeyPatch()
    try:
        yield mp
    finally:
        # Always restore patched attributes, even if the generator is closed early.
        mp.undo()
|
|
|
|
|
|
@pytest.fixture()
def client(app):
    """Return a fresh Flask test client for each test."""
    return app.test_client()
|
|
|
|
|
|
def _unique_email(prefix: str) -> str:
    """Return ``<prefix>-<8 random hex chars>@metamorph.local``.

    The random part keeps addresses from colliding across tests that share
    the module-scoped database.
    """
    return f"{prefix}-{secrets.token_hex(4)}@metamorph.local"
|
|
|
|
|
|
def _bearer(token: str) -> dict[str, str]:
    """Build the ``Authorization: Bearer <token>`` header dict for a request."""
    return {"Authorization": f"Bearer {token}"}
|
|
|
|
|
|
def _login(client, email: str, password: str) -> str:
    """Log in through ``POST /api/v1/auth/login`` and return the access token.

    Fails the calling test (with the response body as the message) when the
    endpoint does not answer 200.
    """
    r = client.post("/api/v1/auth/login", json={"email": email, "password": password})
    assert r.status_code == 200, r.get_data(as_text=True)
    return r.get_json()["access_token"]
|
|
|
|
|
|
@pytest.fixture(scope="module")
def admin(app):
    """Create the initial admin account once per module via ``/api/v1/setup``.

    Returns:
        ``{"email": ..., "password": ...}`` for the created admin.
    """
    token = regenerate_install_token()
    email = _unique_email("admin")
    password = "AdminPass1234!"
    with app.test_client() as c:
        r = c.post(
            "/api/v1/setup",
            json={"install_token": token, "email": email, "password": password},
        )
        assert r.status_code == 201, r.get_data(as_text=True)
    return {"email": email, "password": password}
|
|
|
|
|
|
@pytest.fixture()
def admin_token(client, admin) -> str:
    """Return a fresh access token for the module's admin account."""
    return _login(client, admin["email"], admin["password"])
|
|
|
|
|
|
# --------------------------------------------------------------- catalogue --
|
|
|
|
|
|
def _make_test_template(client, admin_token: str, name: str):
    """Create a test template tagged with technique T1059 and return its JSON.

    Posts to ``/api/v1/test-templates`` as the admin and asserts a 201.
    """
    body = {
        "name": name,
        "description": "auto",
        "objective": "do thing",
        "procedure_md": f"# {name}",
        "expected_result_red_md": "red expectation",
        "expected_detection_blue_md": "blue expectation",
        "opsec_level": "medium",
        "tags": [],
        "expected_iocs": [],
        "mitre_tags": [{"kind": "technique", "external_id": "T1059"}],
    }
    r = client.post("/api/v1/test-templates", headers=_bearer(admin_token), json=body)
    assert r.status_code == 201, r.get_data(as_text=True)
    return r.get_json()
|
|
|
|
|
|
def _make_scenario(client, admin_token: str, name: str, test_ids: list[str]):
    """Create a scenario template grouping `test_ids` and return its JSON.

    Posts to ``/api/v1/scenario-templates`` as the admin and asserts a 201.
    """
    r = client.post(
        "/api/v1/scenario-templates",
        headers=_bearer(admin_token),
        json={"name": name, "description": None, "test_template_ids": test_ids},
    )
    assert r.status_code == 201, r.get_data(as_text=True)
    return r.get_json()
|
|
|
|
|
|
@pytest.fixture(scope="module")
def catalogue(app, admin):
    """Seed one test template and one scenario containing it, once per module.

    Returns:
        ``{"test": <test template JSON>, "scenario": <scenario template JSON>}``.
    """
    with app.test_client() as c:
        tok = _login(c, admin["email"], admin["password"])
        t1 = _make_test_template(c, tok, "exec-test")
        sc = _make_scenario(c, tok, "exec-scenario", [t1["id"]])
        return {"test": t1, "scenario": sc}
|
|
|
|
|
|
# ----------------------------------------------------------------- users --
|
|
|
|
|
|
def _invite_user(client, admin_token: str, prefix: str, group_codes: list[str]) -> dict:
    """Create a user holding exactly `group_codes` via the invitation flow.

    Steps: create a dedicated group, assign the permission codes to it, issue
    an invitation bound to that group, accept it, log in, and read the user id
    from ``/api/v1/auth/me``.

    Returns:
        ``{"email", "password", "token", "id"}`` for the new user.
    """
    # NOTE(review): the group-creation response status is not checked; a
    # failure surfaces as a KeyError on grp["id"] below. Success code for this
    # endpoint is not visible here — confirm before adding an assertion.
    grp = client.post(
        "/api/v1/groups",
        headers=_bearer(admin_token),
        json={"name": f"{prefix}-grp-{secrets.token_hex(2)}"},
    ).get_json()
    r_set = client.put(
        f"/api/v1/groups/{grp['id']}/permissions",
        headers=_bearer(admin_token),
        json={"codes": group_codes},
    )
    assert r_set.status_code == 200, r_set.get_data(as_text=True)

    email = _unique_email(prefix)
    password = "Pass1234!"
    inv = client.post(
        "/api/v1/invitations",
        headers=_bearer(admin_token),
        json={"email_hint": email, "group_ids": [grp["id"]]},
    )
    assert inv.status_code == 201, inv.get_data(as_text=True)
    accept_token = inv.get_json()["token"]
    r = client.post(
        f"/api/v1/invitations/accept/{accept_token}",
        json={"email": email, "password": password},
    )
    assert r.status_code == 201, r.get_data(as_text=True)
    tok = _login(client, email, password)
    # NOTE(review): /auth/me status is likewise unchecked.
    me = client.get("/api/v1/auth/me", headers=_bearer(tok)).get_json()
    return {"email": email, "password": password, "token": tok, "id": me["id"]}
|
|
|
|
|
|
@pytest.fixture()
def red_user(client, admin_token):
    """Invite a red-side user: mission read/create/update, red-field writes,
    and detection-level read."""
    return _invite_user(
        client,
        admin_token,
        "red",
        [
            "mission.read",
            "mission.create",
            "mission.update",
            "mission.write_red_fields",
            "detection_level.read",
        ],
    )
|
|
|
|
|
|
@pytest.fixture()
def blue_user(client, admin_token):
    """Invite a blue-side user: mission read, blue-field writes, and
    detection-level read."""
    return _invite_user(
        client,
        admin_token,
        "blue",
        ["mission.read", "mission.write_blue_fields", "detection_level.read"],
    )
|
|
|
|
|
|
@pytest.fixture()
def reader_user(client, admin_token):
    """Invite a user whose only permission is ``mission.read``."""
    return _invite_user(client, admin_token, "reader", ["mission.read"])
|
|
|
|
|
|
def _make_mission(client, admin_token: str, *, name: str, scenario_id: str,
                  red_id: str | None = None, blue_id: str | None = None) -> dict:
    """Bootstrap a mission from one scenario template, as the admin.

    `red_id` / `blue_id`, when given, are added as members with the matching
    ``role_hint``; omit both to create a mission with no members. Asserts a
    201 and returns the created mission JSON.
    """
    members = []
    if red_id:
        members.append({"user_id": red_id, "role_hint": "red"})
    if blue_id:
        members.append({"user_id": blue_id, "role_hint": "blue"})
    r = client.post(
        "/api/v1/missions",
        headers=_bearer(admin_token),
        json={
            "name": name,
            "client_target": "Acme",
            "scenario_template_ids": [scenario_id],
            "members": members,
        },
    )
    assert r.status_code == 201, r.get_data(as_text=True)
    return r.get_json()
|
|
|
|
|
|
def _first_test_id(mission: dict) -> str:
    """Return the id of the first test of the mission's first scenario."""
    return mission["scenarios"][0]["tests"][0]["id"]
|
|
|
|
|
|
# ================================================================ detection ==
|
|
|
|
|
|
def test_detection_levels_seeded_and_listed(client, admin_token):
    """The four seeded detection levels are listed in order, with
    `not_detected` as the sole default and all flagged as system levels."""
    r = client.get("/api/v1/detection-levels", headers=_bearer(admin_token))
    assert r.status_code == 200
    body = r.get_json()
    keys = [it["key"] for it in body["items"]]
    # All four defaults must be present, in position order.
    assert keys == ["detected_blocked", "detected_alert", "logged_only", "not_detected"]
    # The default flag is on `not_detected` per the seed.
    defaults = [it for it in body["items"] if it["is_default"]]
    assert [d["key"] for d in defaults] == ["not_detected"]
    # All are flagged system so M8 CRUD can distinguish operator-added levels.
    assert all(it["is_system"] for it in body["items"])
|
|
|
|
|
|
def test_detection_levels_requires_perm(client, admin_token, reader_user):
    """Listing detection levels without `detection_level.read` yields 403."""
    # The reader_user fixture has mission.read only — no detection_level.read.
    r = client.get(
        "/api/v1/detection-levels", headers=_bearer(reader_user["token"])
    )
    assert r.status_code == 403
|
|
|
|
|
|
# ===================================================================== test ==
|
|
|
|
|
|
def test_get_mission_test_returns_snapshot_state(
    client, admin_token, catalogue, red_user
):
    """A freshly created mission test is `pending`, with no red/blue
    annotations and no evidence."""
    mission = _make_mission(
        client, admin_token, name="m7-get",
        scenario_id=catalogue["scenario"]["id"], red_id=red_user["id"],
    )
    test_id = _first_test_id(mission)
    r = client.get(
        f"/api/v1/missions/{mission['id']}/tests/{test_id}",
        headers=_bearer(red_user["token"]),
    )
    assert r.status_code == 200, r.get_data(as_text=True)
    body = r.get_json()
    assert body["state"] == "pending"
    assert body["red_command"] is None
    assert body["blue_comment_md"] is None
    assert body["evidence"] == []
    assert body["mission_id"] == mission["id"]
|
|
|
|
|
|
def test_red_user_writes_red_fields(client, admin_token, catalogue, red_user):
    """A red member can PUT red-side fields; the response echoes them and
    records the red user as last actor."""
    mission = _make_mission(
        client, admin_token, name="m7-red-write",
        scenario_id=catalogue["scenario"]["id"], red_id=red_user["id"],
    )
    tid = _first_test_id(mission)
    r = client.put(
        f"/api/v1/missions/{mission['id']}/tests/{tid}",
        headers=_bearer(red_user["token"]),
        json={
            "red_command": "powershell -enc ZAB1AG0AeQA=",
            "red_output": "{stdout}",
            "red_comment_md": "executed via SYSTEM",
        },
    )
    assert r.status_code == 200, r.get_data(as_text=True)
    body = r.get_json()
    assert body["red_command"] == "powershell -enc ZAB1AG0AeQA="
    assert body["red_comment_md"] == "executed via SYSTEM"
    assert body["last_actor_email"] == red_user["email"]
|
|
|
|
|
|
def test_red_user_cannot_write_blue_fields(client, admin_token, catalogue, red_user):
    """A red member writing a blue-side field is rejected with 403."""
    mission = _make_mission(
        client, admin_token, name="m7-red-blocked-blue",
        scenario_id=catalogue["scenario"]["id"], red_id=red_user["id"],
    )
    tid = _first_test_id(mission)
    r = client.put(
        f"/api/v1/missions/{mission['id']}/tests/{tid}",
        headers=_bearer(red_user["token"]),
        json={"blue_comment_md": "should be blocked"},
    )
    assert r.status_code == 403, r.get_data(as_text=True)
|
|
|
|
|
|
def test_blue_user_cannot_write_red_fields(client, admin_token, catalogue, blue_user):
    """A blue member writing a red-side field is rejected with 403."""
    mission = _make_mission(
        client, admin_token, name="m7-blue-blocked-red",
        scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"],
    )
    tid = _first_test_id(mission)
    r = client.put(
        f"/api/v1/missions/{mission['id']}/tests/{tid}",
        headers=_bearer(blue_user["token"]),
        json={"red_command": "echo nope"},
    )
    assert r.status_code == 403
|
|
|
|
|
|
def test_blue_user_writes_new_blue_review_fields(
    client, admin_token, catalogue, blue_user
):
    """Post-M7 feedback fields: log source, SIEM excerpt, cyber-incident
    sub-record. All are blue-side, gated by `mission.write_blue_fields`."""
    mission = _make_mission(
        client, admin_token, name="m7-blue-extras",
        scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"],
    )
    tid = _first_test_id(mission)
    body = {
        "blue_log_source": "EDR · CrowdStrike Falcon",
        "blue_siem_logs": "2026-05-15 10:30:42 WIN-DC01 evt=4688 cmd=powershell -enc ...",
        "blue_incident_at": "2026-05-15T11:00:00+00:00",
        "blue_incident_number": "INC-2026-1234",
        "blue_incident_recipient_email": "soc-night@metamorph.local",
    }
    r = client.put(
        f"/api/v1/missions/{mission['id']}/tests/{tid}",
        headers=_bearer(blue_user["token"]),
        json=body,
    )
    assert r.status_code == 200, r.get_data(as_text=True)
    out = r.get_json()
    assert out["blue_log_source"] == body["blue_log_source"]
    assert out["blue_siem_logs"] == body["blue_siem_logs"]
    # Only the prefix is compared: the serialized offset format is not pinned.
    assert out["blue_incident_at"].startswith("2026-05-15T11:00:00")
    assert out["blue_incident_number"] == body["blue_incident_number"]
    assert out["blue_incident_recipient_email"] == body["blue_incident_recipient_email"]
|
|
|
|
|
|
def test_red_user_cannot_write_new_blue_review_fields(
    client, admin_token, catalogue, red_user
):
    """Each of the five new fields is classified as blue-side; a red-only
    caller must receive 403 individually for each one."""
    mission = _make_mission(
        client, admin_token, name="m7-blue-extras-perm",
        scenario_id=catalogue["scenario"]["id"], red_id=red_user["id"],
    )
    tid = _first_test_id(mission)
    bad_bodies = [
        {"blue_log_source": "Firewall"},
        {"blue_siem_logs": "log line"},
        {"blue_incident_at": "2026-05-15T11:00:00+00:00"},
        {"blue_incident_number": "INC-1"},
        {"blue_incident_recipient_email": "x@y.test"},
    ]
    # One request per field so a single misclassified field cannot hide
    # behind another field's rejection.
    for body in bad_bodies:
        r = client.put(
            f"/api/v1/missions/{mission['id']}/tests/{tid}",
            headers=_bearer(red_user["token"]),
            json=body,
        )
        assert r.status_code == 403, (body, r.get_data(as_text=True))
|
|
|
|
|
|
def test_blue_incident_at_rejects_naive_datetime(
    client, admin_token, catalogue, blue_user
):
    """A naïve datetime (no TZ offset) is rejected with 400 — Postgres would
    otherwise interpret it in the session TZ, defeating the M7 verbatim-time
    contract. This test covers `blue_incident_at` only; the equivalent rule
    for `executed_at` is not exercised here."""
    mission = _make_mission(
        client, admin_token, name="m7-naive-incident",
        scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"],
    )
    tid = _first_test_id(mission)
    r = client.put(
        f"/api/v1/missions/{mission['id']}/tests/{tid}",
        headers=_bearer(blue_user["token"]),
        json={"blue_incident_at": "2026-05-15T11:00:00"},  # no offset
    )
    assert r.status_code == 400, r.get_data(as_text=True)
|
|
|
|
|
|
def test_blue_incident_recipient_email_validates_shape(
    client, admin_token, catalogue, blue_user
):
    """Bad-shape email is rejected; well-formed internal address (`.local`,
    `.corp`) is accepted — we deliberately don't use Pydantic EmailStr
    because email-validator rejects internal TLDs."""
    mission = _make_mission(
        client, admin_token, name="m7-email-shape",
        scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"],
    )
    tid = _first_test_id(mission)
    bad = client.put(
        f"/api/v1/missions/{mission['id']}/tests/{tid}",
        headers=_bearer(blue_user["token"]),
        json={"blue_incident_recipient_email": "not-an-email"},
    )
    assert bad.status_code == 400

    ok = client.put(
        f"/api/v1/missions/{mission['id']}/tests/{tid}",
        headers=_bearer(blue_user["token"]),
        json={"blue_incident_recipient_email": "soc@internal.local"},
    )
    assert ok.status_code == 200
    assert ok.get_json()["blue_incident_recipient_email"] == "soc@internal.local"
|
|
|
|
|
|
def test_blue_review_fields_survive_round_trip_via_get(
    client, admin_token, catalogue, blue_user
):
    """After a PUT the same values must come back on a fresh GET — guards
    against a future serializer drift that would silently drop one of the
    new columns from the response."""
    mission = _make_mission(
        client, admin_token, name="m7-blue-extras-rt",
        scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"],
    )
    tid = _first_test_id(mission)
    body = {
        "blue_log_source": "Proxy",
        "blue_siem_logs": "raw\n indented\nthird",
        "blue_incident_number": "INC-rt",
    }
    put_r = client.put(
        f"/api/v1/missions/{mission['id']}/tests/{tid}",
        headers=_bearer(blue_user["token"]),
        json=body,
    )
    assert put_r.status_code == 200
    get_r = client.get(
        f"/api/v1/missions/{mission['id']}/tests/{tid}",
        headers=_bearer(blue_user["token"]),
    )
    assert get_r.status_code == 200
    after = get_r.get_json()
    # The key is the assertion message so a failure names the dropped field.
    for k, v in body.items():
        assert after[k] == v, k
|
|
|
|
|
|
def test_blue_user_writes_blue_fields_and_picks_detection_level(
    client, admin_token, catalogue, blue_user
):
    """A blue member can set a blue comment and a detection level; the
    response exposes both the level id and its key."""
    mission = _make_mission(
        client, admin_token, name="m7-blue-write",
        scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"],
    )
    tid = _first_test_id(mission)
    # First fetch the detection levels.
    levels_r = client.get(
        "/api/v1/detection-levels", headers=_bearer(blue_user["token"])
    )
    # Fail here, not on a KeyError below, if the listing itself is refused.
    assert levels_r.status_code == 200, levels_r.get_data(as_text=True)
    levels = levels_r.get_json()["items"]
    not_detected = next(level for level in levels if level["key"] == "not_detected")
    r = client.put(
        f"/api/v1/missions/{mission['id']}/tests/{tid}",
        headers=_bearer(blue_user["token"]),
        json={
            "blue_comment_md": "no detection on SOC",
            "detection_level_id": not_detected["id"],
        },
    )
    assert r.status_code == 200, r.get_data(as_text=True)
    body = r.get_json()
    assert body["blue_comment_md"] == "no detection on SOC"
    assert body["detection_level_id"] == not_detected["id"]
    assert body["detection_level_key"] == "not_detected"
|
|
|
|
|
|
def test_mark_executed_stamps_executed_at(
    client, admin_token, catalogue, red_user
):
    """Transitioning to `executed` sets `executed_at` and leaves the
    override flag False."""
    mission = _make_mission(
        client, admin_token, name="m7-exec",
        scenario_id=catalogue["scenario"]["id"], red_id=red_user["id"],
    )
    tid = _first_test_id(mission)
    r = client.post(
        f"/api/v1/missions/{mission['id']}/tests/{tid}/transition",
        headers=_bearer(red_user["token"]),
        json={"target_state": "executed"},
    )
    assert r.status_code == 200, r.get_data(as_text=True)
    body = r.get_json()
    assert body["state"] == "executed"
    assert body["executed_at"] is not None
    assert body["executed_at_overridden"] is False
|
|
|
|
|
|
def test_executed_at_override_requires_red_perm_and_state(
    client, admin_token, catalogue, red_user, blue_user
):
    """Overriding `executed_at` is refused while pending (400), refused for
    a blue member (403), and accepted for a red member once executed."""
    mission = _make_mission(
        client, admin_token, name="m7-override",
        scenario_id=catalogue["scenario"]["id"],
        red_id=red_user["id"], blue_id=blue_user["id"],
    )
    tid = _first_test_id(mission)

    # Override while still pending → invalid_request (no executed milestone yet).
    bad = client.put(
        f"/api/v1/missions/{mission['id']}/tests/{tid}",
        headers=_bearer(red_user["token"]),
        json={
            "executed_at": "2026-05-14T10:00:00+00:00",
            "executed_at_overridden": True,
        },
    )
    assert bad.status_code == 400, bad.get_data(as_text=True)

    # Mark executed first so we're allowed to override.
    marked = client.post(
        f"/api/v1/missions/{mission['id']}/tests/{tid}/transition",
        headers=_bearer(red_user["token"]),
        json={"target_state": "executed"},
    )
    # Without this check a failed transition would show up later as a
    # confusing 400 on the final override.
    assert marked.status_code == 200, marked.get_data(as_text=True)

    # Blue cannot override (executed_at is a red field).
    forbidden = client.put(
        f"/api/v1/missions/{mission['id']}/tests/{tid}",
        headers=_bearer(blue_user["token"]),
        json={
            "executed_at": "2026-05-14T10:00:00+00:00",
            "executed_at_overridden": True,
        },
    )
    assert forbidden.status_code == 403

    # Red successfully overrides.
    ok = client.put(
        f"/api/v1/missions/{mission['id']}/tests/{tid}",
        headers=_bearer(red_user["token"]),
        json={
            "executed_at": "2026-05-14T10:00:00+00:00",
            "executed_at_overridden": True,
        },
    )
    assert ok.status_code == 200, ok.get_data(as_text=True)
    body = ok.get_json()
    assert body["executed_at_overridden"] is True
    assert body["executed_at"].startswith("2026-05-14T10:00:00")
|
|
|
|
|
|
def test_state_machine_rejects_invalid_transitions(
    client, admin_token, catalogue, red_user
):
    """Skipping `executed` and jumping straight to `reviewed_by_blue`
    yields 409."""
    mission = _make_mission(
        client, admin_token, name="m7-state",
        scenario_id=catalogue["scenario"]["id"], red_id=red_user["id"],
    )
    tid = _first_test_id(mission)
    # pending → reviewed_by_blue is not allowed (must go through executed first).
    r = client.post(
        f"/api/v1/missions/{mission['id']}/tests/{tid}/transition",
        headers=_bearer(red_user["token"]),
        json={"target_state": "reviewed_by_blue"},
    )
    assert r.status_code == 409
|
|
|
|
|
|
def test_review_by_blue_requires_blue_perm(
    client, admin_token, catalogue, red_user, blue_user
):
    """Only a blue member may move an executed test to `reviewed_by_blue`;
    a red member attempting it gets 403."""
    mission = _make_mission(
        client, admin_token, name="m7-review",
        scenario_id=catalogue["scenario"]["id"],
        red_id=red_user["id"], blue_id=blue_user["id"],
    )
    tid = _first_test_id(mission)
    # red marks executed
    r1 = client.post(
        f"/api/v1/missions/{mission['id']}/tests/{tid}/transition",
        headers=_bearer(red_user["token"]),
        json={"target_state": "executed"},
    )
    assert r1.status_code == 200
    # red tries to mark reviewed_by_blue — denied (blue side)
    r2 = client.post(
        f"/api/v1/missions/{mission['id']}/tests/{tid}/transition",
        headers=_bearer(red_user["token"]),
        json={"target_state": "reviewed_by_blue"},
    )
    assert r2.status_code == 403
    # blue does it — OK
    r3 = client.post(
        f"/api/v1/missions/{mission['id']}/tests/{tid}/transition",
        headers=_bearer(blue_user["token"]),
        json={"target_state": "reviewed_by_blue"},
    )
    assert r3.status_code == 200
    assert r3.get_json()["state"] == "reviewed_by_blue"
|
|
|
|
|
|
def test_member_visibility_returns_404_for_outsiders(
    client, admin_token, catalogue, red_user, reader_user
):
    """A user with `mission.read` who is not a member of the mission gets
    404 (not 403) on one of its tests."""
    mission = _make_mission(
        client, admin_token, name="m7-secret",
        scenario_id=catalogue["scenario"]["id"], red_id=red_user["id"],
    )
    tid = _first_test_id(mission)
    r = client.get(
        f"/api/v1/missions/{mission['id']}/tests/{tid}",
        headers=_bearer(reader_user["token"]),
    )
    assert r.status_code == 404
|
|
|
|
|
|
def test_admin_bypasses_membership(client, admin_token, catalogue, red_user):
    """The admin can read a mission test without being a mission member."""
    mission = _make_mission(
        client, admin_token, name="m7-admin-sees-all",
        scenario_id=catalogue["scenario"]["id"], red_id=red_user["id"],
    )
    tid = _first_test_id(mission)
    # Admin is not a member; sees the test anyway.
    r = client.get(
        f"/api/v1/missions/{mission['id']}/tests/{tid}",
        headers=_bearer(admin_token),
    )
    assert r.status_code == 200
|
|
|
|
|
|
# ================================================================ evidence ==
|
|
|
|
|
|
def _png_bytes(n: int) -> bytes:
    """Return a payload starting with the 8-byte PNG magic so MIME sniffers
    cooperate.

    The result is `n` bytes long for ``n >= 8``; for smaller `n` it is the
    bare 8-byte magic, so the signature is never truncated.
    """
    return b"\x89PNG\r\n\x1a\n" + b"A" * max(0, n - 8)
|
|
|
|
|
|
def _upload(client, mission_id: str, test_id: str, token: str, *,
            filename: str, content: bytes, mime: str = "image/png"):
    """POST `content` as multipart field ``file`` to the test's evidence
    endpoint and return the raw response (status is left to the caller)."""
    return client.post(
        f"/api/v1/missions/{mission_id}/tests/{test_id}/evidence",
        headers=_bearer(token),
        data={"file": (io.BytesIO(content), filename, mime)},
        content_type="multipart/form-data",
    )
|
|
|
|
|
|
def test_evidence_upload_small_succeeds_and_records_sha256(
    client, admin_token, catalogue, blue_user
):
    """A 1 KiB PNG upload returns 201 with the file's SHA-256, size,
    original filename and MIME type."""
    mission = _make_mission(
        client, admin_token, name="m7-ev-small",
        scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"],
    )
    tid = _first_test_id(mission)
    content = _png_bytes(1024)
    expected = hashlib.sha256(content).hexdigest()
    r = _upload(client, mission["id"], tid, blue_user["token"],
                filename="screenshot.png", content=content, mime="image/png")
    assert r.status_code == 201, r.get_data(as_text=True)
    body = r.get_json()
    assert body["sha256"] == expected
    assert body["size_bytes"] == len(content)
    assert body["original_filename"] == "screenshot.png"
    assert body["mime"] == "image/png"
|
|
|
|
|
|
def test_evidence_upload_24mb_succeeds_26mb_rejected(
    client, admin_token, catalogue, blue_user
):
    """A 24 MiB upload is accepted; a 26 MiB upload is rejected with 400
    and error code `too_large`."""
    mission = _make_mission(
        client, admin_token, name="m7-ev-boundaries",
        scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"],
    )
    tid = _first_test_id(mission)
    twenty_four = _png_bytes(24 * 1024 * 1024)
    ok = _upload(
        client, mission["id"], tid, blue_user["token"],
        filename="lab.evtx", content=twenty_four, mime="application/octet-stream",
    )
    # Truncate the failure message so a large response body doesn't flood the log.
    assert ok.status_code == 201, ok.get_data(as_text=True)[:200]

    twenty_six = _png_bytes(26 * 1024 * 1024)
    too_big = _upload(
        client, mission["id"], tid, blue_user["token"],
        filename="huge.evtx", content=twenty_six, mime="application/octet-stream",
    )
    assert too_big.status_code == 400
    assert too_big.get_json()["error"] == "too_large"
|
|
|
|
|
|
def test_evidence_upload_rejects_unsupported_extension(
    client, admin_token, catalogue, blue_user
):
    """Uploading a `.exe` is rejected with 400 and error code
    `unsupported_extension`."""
    mission = _make_mission(
        client, admin_token, name="m7-ev-ext",
        scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"],
    )
    tid = _first_test_id(mission)
    r = _upload(
        client, mission["id"], tid, blue_user["token"],
        filename="evil.exe", content=b"\x4d\x5a", mime="application/octet-stream",
    )
    assert r.status_code == 400
    assert r.get_json()["error"] == "unsupported_extension"
|
|
|
|
|
|
def test_evidence_upload_requires_blue_perm(
    client, admin_token, catalogue, red_user, blue_user
):
    """A red member of the mission cannot upload evidence (403)."""
    mission = _make_mission(
        client, admin_token, name="m7-ev-perm",
        scenario_id=catalogue["scenario"]["id"],
        red_id=red_user["id"], blue_id=blue_user["id"],
    )
    tid = _first_test_id(mission)
    r = _upload(
        client, mission["id"], tid, red_user["token"],
        filename="note.txt", content=b"hi", mime="text/plain",
    )
    assert r.status_code == 403
|
|
|
|
|
|
def test_evidence_download_returns_bytes(client, admin_token, catalogue, blue_user):
    """Evidence metadata exposes the SHA-256, and `?download=true` returns
    the uploaded bytes unchanged."""
    mission = _make_mission(
        client, admin_token, name="m7-ev-dl",
        scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"],
    )
    tid = _first_test_id(mission)
    content = b"hello evidence\n"
    upl_r = _upload(
        client, mission["id"], tid, blue_user["token"],
        filename="note.txt", content=content, mime="text/plain",
    )
    # Fail on the upload itself rather than on a KeyError for "id".
    assert upl_r.status_code == 201, upl_r.get_data(as_text=True)
    eid = upl_r.get_json()["id"]

    meta = client.get(
        f"/api/v1/evidence/{eid}", headers=_bearer(blue_user["token"])
    )
    assert meta.status_code == 200
    assert meta.get_json()["sha256"] == hashlib.sha256(content).hexdigest()

    dl = client.get(
        f"/api/v1/evidence/{eid}?download=true",
        headers=_bearer(blue_user["token"]),
    )
    assert dl.status_code == 200
    assert dl.data == content
|
|
|
|
|
|
def test_evidence_soft_delete_hides_it_from_test_detail(
    client, admin_token, catalogue, blue_user
):
    """After DELETE on an evidence row, the test detail no longer lists it."""
    mission = _make_mission(
        client, admin_token, name="m7-ev-del",
        scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"],
    )
    tid = _first_test_id(mission)
    upl_r = _upload(
        client, mission["id"], tid, blue_user["token"],
        filename="evidence.json", content=b'{"ok":true}',
        mime="application/json",
    )
    # Fail on the upload itself rather than on a KeyError for "id".
    assert upl_r.status_code == 201, upl_r.get_data(as_text=True)
    eid = upl_r.get_json()["id"]

    detail_before = client.get(
        f"/api/v1/missions/{mission['id']}/tests/{tid}",
        headers=_bearer(blue_user["token"]),
    ).get_json()
    assert len(detail_before["evidence"]) == 1

    r = client.delete(
        f"/api/v1/evidence/{eid}", headers=_bearer(blue_user["token"])
    )
    assert r.status_code == 200
    detail_after = client.get(
        f"/api/v1/missions/{mission['id']}/tests/{tid}",
        headers=_bearer(blue_user["token"]),
    ).get_json()
    assert detail_after["evidence"] == []
|
|
|
|
|
|
def test_idempotent_transition_still_checks_side_perm(
    client, admin_token, catalogue, red_user, blue_user
):
    """A blue-only user re-POSTing target_state=executed on an already-executed
    test must NOT receive 200 — even though no write happens, returning success
    falsely implies they hold the red-side perm. See post-review fix C1."""
    mission = _make_mission(
        client, admin_token, name="m7-idemp-side",
        scenario_id=catalogue["scenario"]["id"],
        red_id=red_user["id"], blue_id=blue_user["id"],
    )
    tid = _first_test_id(mission)
    # Red marks executed.
    r1 = client.post(
        f"/api/v1/missions/{mission['id']}/tests/{tid}/transition",
        headers=_bearer(red_user["token"]),
        json={"target_state": "executed"},
    )
    assert r1.status_code == 200
    # Blue replays the same transition — must be 403, not 200.
    r2 = client.post(
        f"/api/v1/missions/{mission['id']}/tests/{tid}/transition",
        headers=_bearer(blue_user["token"]),
        json={"target_state": "executed"},
    )
    assert r2.status_code == 403
|
|
|
|
|
|
def test_evidence_member_of_other_mission_gets_404(
    client, admin_token, catalogue, blue_user
):
    """Membership of mission B grants no access to mission A's evidence.

    The evidence row is uploaded by the admin on mission A, which is created
    without ``blue_user``. ``blue_user`` is then attached to a second mission
    and fetches the evidence by id; the lookup must resolve to 404.
    """
    scenario_id = catalogue["scenario"]["id"]

    # Mission A is created without blue_user as a member.
    mission_a = _make_mission(
        client,
        admin_token,
        name="m7-ev-cross-a",
        scenario_id=scenario_id,
    )
    test_id_a = _first_test_id(mission_a)

    # The admin attaches an evidence file to mission A.
    upload_resp = _upload(
        client,
        mission_a["id"],
        test_id_a,
        admin_token,
        filename="a.txt",
        content=b"secret",
        mime="text/plain",
    )
    evidence_id = upload_resp.get_json()["id"]

    # Mission B is the only mission blue_user belongs to.
    _make_mission(
        client,
        admin_token,
        name="m7-ev-cross-b",
        scenario_id=scenario_id,
        blue_id=blue_user["id"],
    )

    resp = client.get(
        f"/api/v1/evidence/{evidence_id}",
        headers=_bearer(blue_user["token"]),
    )
    assert resp.status_code == 404
|
|
|
|
|
|
def test_evidence_non_member_gets_404(
    client, admin_token, catalogue, blue_user, reader_user
):
    """Evidence fetched by a user not attached to the mission yields 404.

    ``blue_user`` uploads a file on a mission created with only a blue member;
    ``reader_user`` (never passed to ``_make_mission``) then requests it by id.
    """
    mission = _make_mission(
        client,
        admin_token,
        name="m7-ev-leak",
        scenario_id=catalogue["scenario"]["id"],
        blue_id=blue_user["id"],
    )
    test_id = _first_test_id(mission)

    upload_resp = _upload(
        client,
        mission["id"],
        test_id,
        blue_user["token"],
        filename="a.txt",
        content=b"x",
        mime="text/plain",
    )
    evidence_id = upload_resp.get_json()["id"]

    evidence_url = f"/api/v1/evidence/{evidence_id}"
    resp = client.get(evidence_url, headers=_bearer(reader_user["token"]))
    assert resp.status_code == 404
|
|
|
|
|
|
# ================================================================ activity ==
|
|
|
|
|
|
def test_activity_polling_returns_recent_changes(
    client, admin_token, catalogue, red_user
):
    """Polling ``/activity?since=<server_time>`` surfaces a later write.

    Flow:
      1. Read the activity feed once to obtain the server's ``server_time``.
      2. PUT a red comment on the mission's first test.
      3. Poll again with ``since`` set to the baseline and check that the
         first returned item is the modified test, attributed to the red user.
    """
    mission = _make_mission(
        client,
        admin_token,
        name="m7-activity",
        scenario_id=catalogue["scenario"]["id"],
        red_id=red_user["id"],
    )
    tid = _first_test_id(mission)
    activity_url = f"/api/v1/missions/{mission['id']}/activity"

    # Baseline timestamp from the server; a write made after it should appear.
    baseline = client.get(activity_url, headers=_bearer(red_user["token"]))
    assert baseline.status_code == 200
    server_t = baseline.get_json()["server_time"]

    # Mutate via PUT to bump updated_at.
    write = client.put(
        f"/api/v1/missions/{mission['id']}/tests/{tid}",
        headers=_bearer(red_user["token"]),
        json={"red_comment_md": "kicked off"},
    )
    # Fail here, with the real cause, if the write was rejected; otherwise the
    # test would only fail later on an empty `items` list. Any 2xx is accepted
    # because the exact success code of this endpoint is not pinned here.
    assert 200 <= write.status_code < 300, write.get_data(as_text=True)

    # `since` must be URL-encoded — a raw `+` (UTC offset) in a query string
    # would otherwise be decoded as a space.
    since_q = urllib.parse.quote(server_t)
    fresh = client.get(
        f"{activity_url}?since={since_q}",
        headers=_bearer(red_user["token"]),
    )
    assert fresh.status_code == 200

    items = fresh.get_json()["items"]
    assert len(items) >= 1
    assert items[0]["test_id"] == tid
    assert items[0]["last_actor_email"] == red_user["email"]
|
|
|
|
|
|
def test_activity_invalid_since_returns_400(
    client, admin_token, catalogue, red_user
):
    """An unparseable ``since`` query value is rejected with 400."""
    scenario_id = catalogue["scenario"]["id"]
    mission = _make_mission(
        client,
        admin_token,
        name="m7-activity-bad",
        scenario_id=scenario_id,
        red_id=red_user["id"],
    )

    bad_url = f"/api/v1/missions/{mission['id']}/activity?since=not-a-date"
    auth = _bearer(red_user["token"])
    resp = client.get(bad_url, headers=auth)
    assert resp.status_code == 400
|
|
|
|
|
|
def test_activity_404_for_non_member(
    client, admin_token, catalogue, red_user, reader_user
):
    """The activity feed answers 404 to a user not attached to the mission.

    The mission is created with ``red_user`` only; the request is made with
    ``reader_user``'s token.
    """
    mission = _make_mission(
        client,
        admin_token,
        name="m7-activity-leak",
        scenario_id=catalogue["scenario"]["id"],
        red_id=red_user["id"],
    )

    activity_url = f"/api/v1/missions/{mission['id']}/activity"
    outsider_auth = _bearer(reader_user["token"])
    resp = client.get(activity_url, headers=outsider_auth)
    assert resp.status_code == 404
|
|
|
|
|
|
def test_activity_since_in_future_returns_empty(
    client, admin_token, catalogue, red_user
):
    """A ``since`` one hour ahead of now yields 200 with an empty item list."""
    mission = _make_mission(
        client,
        admin_token,
        name="m7-activity-future",
        scenario_id=catalogue["scenario"]["id"],
        red_id=red_user["id"],
    )

    # Timezone-aware UTC instant one hour from now, in ISO-8601 form.
    one_hour_ahead = datetime.now(tz=timezone.utc) + timedelta(hours=1)
    future = one_hour_ahead.isoformat()
    # Percent-encode so the offset's `+` survives query-string parsing.
    since_q = urllib.parse.quote(future)

    activity_url = f"/api/v1/missions/{mission['id']}/activity?since={since_q}"
    resp = client.get(activity_url, headers=_bearer(red_user["token"]))

    assert resp.status_code == 200
    body = resp.get_json()
    assert body["items"] == []
|
|
|
|
|
|
def test_unknown_test_id_returns_404(client, admin_token, catalogue, red_user):
    """Fetching a freshly generated random UUID as test id gives 404."""
    scenario_id = catalogue["scenario"]["id"]
    mission = _make_mission(
        client,
        admin_token,
        name="m7-unknown-test",
        scenario_id=scenario_id,
        red_id=red_user["id"],
    )

    # A random UUID4 stands in for a test id that does not exist.
    missing_test_id = str(uuid.uuid4())
    detail_url = f"/api/v1/missions/{mission['id']}/tests/{missing_test_id}"

    resp = client.get(detail_url, headers=_bearer(red_user["token"]))
    assert resp.status_code == 404
|