2026-05-12 19:57:33 +02:00
|
|
|
"""M5 — Template catalogue integration tests.
|
|
|
|
|
|
|
|
|
|
Covers `test_template` and `scenario_template` CRUD + ordering + perm gating.
|
|
|
|
|
Relies on a minimal MITRE seed (T1059 / TA0001 / T1059.001) so the polymorphic
|
|
|
|
|
tag join can be exercised end-to-end.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
import secrets
|
|
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
from sqlalchemy import text
|
|
|
|
|
|
|
|
|
|
from app.core.install_token import regenerate_install_token
|
|
|
|
|
from app.main import create_app
|
|
|
|
|
from app.services import mitre_seed as mitre_svc
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _truncate_all(engine):
    """Empty every table this module touches and restart their identities.

    Issues a single ``TRUNCATE ... RESTART IDENTITY CASCADE`` statement inside
    the transaction opened by ``engine.begin()``.
    """
    truncate_sql = text(
        "TRUNCATE users, refresh_tokens, invitations, invitation_groups, "
        "user_groups, group_permissions, permissions, settings, groups, "
        "scenario_template_tests, scenario_templates, "
        "test_template_mitre_tags, test_templates, "
        "mitre_subtechniques, mitre_technique_tactics, mitre_techniques, "
        "mitre_tactics RESTART IDENTITY CASCADE"
    )
    with engine.begin() as connection:
        connection.execute(truncate_sql)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Same minimal bundle as in test_mitre.py — keeps tag resolution deterministic
# without re-pulling the full enterprise STIX bundle.
# Contents: two tactics (TA0001, TA0002), one technique (T1059), one
# sub-technique (T1059.001) and the relationship linking the last two.
_MINIMAL_BUNDLE = {
    "type": "bundle",
    "id": "bundle--00000000-0000-0000-0000-000000000002",
    "spec_version": "2.1",
    "objects": [
        # Tactic TA0001
        {
            "type": "x-mitre-tactic",
            "id": "x-mitre-tactic--ta0001",
            "name": "Initial Access",
            "x_mitre_shortname": "initial-access",
            "external_references": [{"source_name": "mitre-attack", "external_id": "TA0001"}],
        },
        # Tactic TA0002
        {
            "type": "x-mitre-tactic",
            "id": "x-mitre-tactic--ta0002",
            "name": "Execution",
            "x_mitre_shortname": "execution",
            "external_references": [{"source_name": "mitre-attack", "external_id": "TA0002"}],
        },
        # Technique T1059, attached to the "execution" phase
        {
            "type": "attack-pattern",
            "id": "attack-pattern--t1059",
            "name": "Command and Scripting Interpreter",
            "kill_chain_phases": [{"kill_chain_name": "mitre-attack", "phase_name": "execution"}],
            "external_references": [{"source_name": "mitre-attack", "external_id": "T1059"}],
        },
        # Sub-technique T1059.001
        {
            "type": "attack-pattern",
            "id": "attack-pattern--t1059-001",
            "name": "PowerShell",
            "x_mitre_is_subtechnique": True,
            "external_references": [{"source_name": "mitre-attack", "external_id": "T1059.001"}],
        },
        # T1059.001 is a sub-technique of T1059
        {
            "type": "relationship",
            "id": "relationship--rel1",
            "relationship_type": "subtechnique-of",
            "source_ref": "attack-pattern--t1059-001",
            "target_ref": "attack-pattern--t1059",
        },
    ],
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
def app(db_engine_or_skip, tmp_path_factory):
    """Module-scoped app: truncate the tables, seed the minimal bundle, build the app."""
    _truncate_all(db_engine_or_skip)

    # The bundle is dumped to a temp file and handed to the seeder via `source=`.
    stix_file = tmp_path_factory.mktemp("m5") / "stix.json"
    stix_file.write_text(json.dumps(_MINIMAL_BUNDLE))
    mitre_svc.seed_mitre(source=stix_file, expected_sha256=None)

    application = create_app()
    application.config.update(TESTING=True)
    return application
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture()
def client(app):
    """Return a new test client bound to the `app` fixture."""
    test_client = app.test_client()
    return test_client
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _unique_email(prefix: str) -> str:
    """Return ``<prefix>-<8 random hex chars>@metamorph.local``."""
    random_part = secrets.token_hex(4)
    return f"{prefix}-{random_part}@metamorph.local"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
def admin(app):
    """Create an account through `/api/v1/setup` and return its email and password."""
    install_token = regenerate_install_token()
    credentials = {"email": _unique_email("admin"), "password": "AdminPass1234!"}
    setup_payload = {"install_token": install_token, **credentials}
    with app.test_client() as setup_client:
        response = setup_client.post("/api/v1/setup", json=setup_payload)
        assert response.status_code == 201, response.get_data(as_text=True)
    return credentials
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _login(client, email: str, password: str) -> str:
    """POST the credentials to `/api/v1/auth/login` and return the `access_token`.

    Fails the calling test, with the response body as message, unless the
    endpoint answers 200.
    """
    credentials = {"email": email, "password": password}
    response = client.post("/api/v1/auth/login", json=credentials)
    assert response.status_code == 200, response.get_data(as_text=True)
    payload = response.get_json()
    return payload["access_token"]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _bearer(token: str) -> dict[str, str]:
    """Build the `Authorization: Bearer <token>` header mapping for a request."""
    header_value = f"Bearer {token}"
    return {"Authorization": header_value}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture()
def admin_token(client, admin) -> str:
    """Access token obtained by logging in with the `admin` fixture's credentials."""
    email, password = admin["email"], admin["password"]
    return _login(client, email, password)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# === Invited-user helper: a registered user holding no permissions ==========
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _bootstrap_user_without_perms(client, admin_token: str, prefix: str) -> tuple[str, str]:
    """Invite, register and log in a fresh user; return ``(email, access_token)``.

    The invitation carries only ``email_hint`` (no ``group_ids``), so no group
    is requested for the new account.

    Args:
        client: test client used for every request.
        admin_token: access token allowed to create invitations.
        prefix: local-part prefix passed to ``_unique_email``.
    """
    email = _unique_email(prefix)
    password = "ReaderPass1234!"

    inv = client.post(
        "/api/v1/invitations",
        headers=_bearer(admin_token),
        json={"email_hint": email},
    )
    # Fail here with the response body instead of a later, opaque KeyError.
    assert 200 <= inv.status_code < 300, inv.get_data(as_text=True)
    token = inv.get_json()["token"]

    accepted = client.post(
        f"/api/v1/invitations/accept/{token}",
        json={"email": email, "password": password},
    )
    # A rejected acceptance would otherwise show up as a confusing login failure.
    assert 200 <= accepted.status_code < 300, accepted.get_data(as_text=True)

    return email, _login(client, email, password)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# === test_template CRUD =====================================================
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _make_test(client, admin_token: str, **overrides):
    """Create a test template through the API and return the response JSON.

    Every field has a default; a keyword argument replaces the default of the
    same name, and unknown keywords are sent as additional fields. Fails the
    calling test unless the endpoint answers 201.
    """
    defaults = {
        "name": f"Test {secrets.token_hex(2)}",
        "description": "auto",
        "objective": "do thing",
        "procedure_md": "1. step",
        "expected_result_red_md": "red expectation",
        "expected_detection_blue_md": "blue expectation",
        "opsec_level": "medium",
        "tags": ["fast"],
        "expected_iocs": ["evil.exe"],
        "mitre_tags": [{"kind": "technique", "external_id": "T1059"}],
    }
    payload = {**defaults, **overrides}
    response = client.post(
        "/api/v1/test-templates", headers=_bearer(admin_token), json=payload
    )
    assert response.status_code == 201, response.get_data(as_text=True)
    return response.get_json()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_create_test_template_with_mitre_tags(client, admin_token):
    """A template can carry a tactic, a technique and a sub-technique tag at once."""
    requested_tags = [
        {"kind": "tactic", "external_id": "TA0002"},
        {"kind": "technique", "external_id": "T1059"},
        {"kind": "subtechnique", "external_id": "T1059.001"},
    ]
    created = _make_test(
        client, admin_token, name="PowerShell exec", mitre_tags=requested_tags
    )
    assert created["opsec_level"] == "medium"

    # Sort the returned pairs so the check does not depend on response order.
    returned_pairs = [(tag["kind"], tag["external_id"]) for tag in created["mitre_tags"]]
    returned_pairs.sort()
    assert returned_pairs == [
        ("subtechnique", "T1059.001"),
        ("tactic", "TA0002"),
        ("technique", "T1059"),
    ]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_create_test_template_rejects_unknown_mitre(client, admin_token):
    """A tag with external id `T9999` is refused with 400 / `unknown_mitre_tag`."""
    payload = {
        "name": "Bad",
        "mitre_tags": [{"kind": "technique", "external_id": "T9999"}],
    }
    response = client.post(
        "/api/v1/test-templates", headers=_bearer(admin_token), json=payload
    )
    assert response.status_code == 400
    assert response.get_json()["error"] == "unknown_mitre_tag"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_create_test_template_rejects_bad_opsec(client, admin_token):
    """An `opsec_level` of `burner` is refused with 400."""
    payload = {"name": "Bad", "opsec_level": "burner"}
    response = client.post(
        "/api/v1/test-templates", headers=_bearer(admin_token), json=payload
    )
    assert response.status_code == 400
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_list_test_templates_filter_by_tactic(client, admin_token):
    """`?tactic=TA0002` lists a template tagged with that tactic."""
    tactic_tag = {"kind": "tactic", "external_id": "TA0002"}
    _make_test(client, admin_token, name="filterable-1", mitre_tags=[tactic_tag])

    response = client.get(
        "/api/v1/test-templates?tactic=TA0002", headers=_bearer(admin_token)
    )
    assert response.status_code == 200
    listing = response.get_json()
    listed_names = [item["name"] for item in listing["items"]]
    assert "filterable-1" in listed_names
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_list_test_templates_filter_by_opsec(client, admin_token):
    """`?opsec=high` lists the new template and nothing with another level."""
    _make_test(client, admin_token, name="high-opsec", opsec_level="high")

    response = client.get(
        "/api/v1/test-templates?opsec=high", headers=_bearer(admin_token)
    )
    assert response.status_code == 200
    items = response.get_json()["items"]
    assert "high-opsec" in [item["name"] for item in items]
    # Every listed item, not just ours, must carry the requested level.
    assert all(item["opsec_level"] == "high" for item in items)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_list_test_templates_filter_by_tag(client, admin_token):
    """`?tag=phish` lists a template whose tags include `phish`."""
    _make_test(client, admin_token, name="tagged-fast", tags=["fast", "phish"])

    response = client.get(
        "/api/v1/test-templates?tag=phish", headers=_bearer(admin_token)
    )
    assert response.status_code == 200
    listed_names = [item["name"] for item in response.get_json()["items"]]
    assert "tagged-fast" in listed_names
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_list_test_templates_search_q(client, admin_token):
    """`?q=` finds a template by a fragment of its name, ignoring case."""
    _make_test(client, admin_token, name="unique-token-azertyuiop")

    # The query is upper-case while the stored name is lower-case: case-insensitive
    search_url = "/api/v1/test-templates?q=AZERTYUIOP"
    response = client.get(search_url, headers=_bearer(admin_token))
    assert response.status_code == 200
    listed_names = [item["name"] for item in response.get_json()["items"]]
    assert "unique-token-azertyuiop" in listed_names
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_update_test_template_replaces_mitre_tags(client, admin_token):
    """PUT with `mitre_tags` swaps the existing tag set for the one supplied."""
    original = _make_test(
        client,
        admin_token,
        name="to-update",
        mitre_tags=[{"kind": "tactic", "external_id": "TA0001"}],
    )

    replacement = {"mitre_tags": [{"kind": "technique", "external_id": "T1059"}]}
    response = client.put(
        f"/api/v1/test-templates/{original['id']}",
        headers=_bearer(admin_token),
        json=replacement,
    )
    assert response.status_code == 200, response.get_data(as_text=True)

    after = response.get_json()
    tag_pairs = [(tag["kind"], tag["external_id"]) for tag in after["mitre_tags"]]
    assert tag_pairs == [("technique", "T1059")]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_update_test_template_partial_keeps_unset_fields(client, admin_token):
    """A PUT that only sends `name` leaves `opsec_level` and `tags` as they were."""
    original = _make_test(
        client, admin_token, name="partial-update", opsec_level="low", tags=["a", "b"]
    )

    response = client.put(
        f"/api/v1/test-templates/{original['id']}",
        headers=_bearer(admin_token),
        json={"name": "renamed"},
    )
    assert response.status_code == 200

    after = response.get_json()
    assert after["name"] == "renamed"
    assert after["opsec_level"] == "low"  # untouched
    assert set(after["tags"]) == {"a", "b"}  # untouched
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_soft_delete_then_list_hides_by_default(client, admin_token):
    """A deleted template leaves the default listing but is kept under `include_deleted`."""
    victim = _make_test(client, admin_token, name="to-be-deleted")

    deletion = client.delete(
        f"/api/v1/test-templates/{victim['id']}", headers=_bearer(admin_token)
    )
    assert deletion.status_code == 200

    default_listing = client.get("/api/v1/test-templates", headers=_bearer(admin_token))
    visible_names = [item["name"] for item in default_listing.get_json()["items"]]
    assert "to-be-deleted" not in visible_names

    # And reappears with include_deleted=true
    full_listing = client.get(
        "/api/v1/test-templates?include_deleted=true", headers=_bearer(admin_token)
    )
    all_names = [item["name"] for item in full_listing.get_json()["items"]]
    assert "to-be-deleted" in all_names
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_read_perm_required(client, admin_token):
    """Listing test templates without `test_template.read` is answered with 403."""
    _email, outsider_token = _bootstrap_user_without_perms(
        client, admin_token, "eve-noperm"
    )
    response = client.get("/api/v1/test-templates", headers=_bearer(outsider_token))
    assert response.status_code == 403
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_write_perm_required(client, admin_token):
    """A user with only `test_template.read` cannot create.

    Bootstrap path: create a dedicated group via the admin API, bind only the
    `test_template.read` perm, then invite a user pre-assigned to that group.
    Every setup request is checked so that a broken bootstrap fails at the
    request that broke, with the response body as the assertion message.
    """
    # 1. Create the read-only group + bind the single perm.
    grp_resp = client.post(
        "/api/v1/groups",
        headers=_bearer(admin_token),
        json={"name": f"tpl-reader-{secrets.token_hex(2)}"},
    )
    assert 200 <= grp_resp.status_code < 300, grp_resp.get_data(as_text=True)
    grp = grp_resp.get_json()
    r_set = client.put(
        f"/api/v1/groups/{grp['id']}/permissions",
        headers=_bearer(admin_token),
        json={"codes": ["test_template.read"]},
    )
    assert r_set.status_code == 200, r_set.get_data(as_text=True)

    # 2. Invite a user already attached to that group.
    email = _unique_email("alice-readonly")
    password = "ReaderPass1234!"
    inv_resp = client.post(
        "/api/v1/invitations",
        headers=_bearer(admin_token),
        json={"email_hint": email, "group_ids": [grp["id"]]},
    )
    assert 200 <= inv_resp.status_code < 300, inv_resp.get_data(as_text=True)
    inv = inv_resp.get_json()
    accepted = client.post(
        f"/api/v1/invitations/accept/{inv['token']}",
        json={"email": email, "password": password},
    )
    assert 200 <= accepted.status_code < 300, accepted.get_data(as_text=True)
    token = _login(client, email, password)

    # 3. Reading is allowed, creating is not.
    r = client.get("/api/v1/test-templates", headers=_bearer(token))
    assert r.status_code == 200, r.get_data(as_text=True)
    r2 = client.post(
        "/api/v1/test-templates", headers=_bearer(token), json={"name": "X"}
    )
    assert r2.status_code == 403
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# === scenario_template CRUD =================================================
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_create_scenario_with_ordered_tests(client, admin_token):
    """A scenario keeps its tests in the submitted order, at positions 0, 1, 2."""
    step_names = ["scn-a", "scn-b", "scn-c"]
    step_ids = [
        _make_test(client, admin_token, name=step_name)["id"] for step_name in step_names
    ]

    payload = {
        "name": "phishing-flow",
        "description": "click → exec → persist",
        "test_template_ids": step_ids,
    }
    response = client.post(
        "/api/v1/scenario-templates", headers=_bearer(admin_token), json=payload
    )
    assert response.status_code == 201, response.get_data(as_text=True)

    scenario = response.get_json()
    assert scenario["tests_count"] == 3
    linked = scenario["tests"]
    assert [entry["position"] for entry in linked] == [0, 1, 2]
    assert [entry["test_template_name"] for entry in linked] == step_names
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_reorder_scenario_tests(client, admin_token):
    """PUT on `/tests` reorders a scenario, and a later GET returns the new order."""
    step_names = ["reord-a", "reord-b", "reord-c"]
    step_ids = [
        _make_test(client, admin_token, name=step_name)["id"] for step_name in step_names
    ]
    scenario = client.post(
        "/api/v1/scenario-templates",
        headers=_bearer(admin_token),
        json={"name": "reorder-me", "test_template_ids": step_ids},
    ).get_json()
    scenario_url = f"/api/v1/scenario-templates/{scenario['id']}"
    reversed_names = step_names[::-1]

    # Reverse order.
    response = client.put(
        f"{scenario_url}/tests",
        headers=_bearer(admin_token),
        json={"test_template_ids": step_ids[::-1]},
    )
    assert response.status_code == 200
    reordered = response.get_json()
    assert [entry["test_template_name"] for entry in reordered["tests"]] == reversed_names

    # Re-reading via GET yields the same order — confirms persistence.
    reread = client.get(scenario_url, headers=_bearer(admin_token)).get_json()
    assert [entry["test_template_name"] for entry in reread["tests"]] == reversed_names
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_scenario_rejects_unknown_test_id(client, admin_token):
    """An all-zero test template id is refused with 400 / `unknown_test_template`."""
    payload = {
        "name": "bad",
        "test_template_ids": ["00000000-0000-0000-0000-000000000000"],
    }
    response = client.post(
        "/api/v1/scenario-templates", headers=_bearer(admin_token), json=payload
    )
    assert response.status_code == 400
    assert response.get_json()["error"] == "unknown_test_template"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_scenario_rejects_soft_deleted_test_on_create(client, admin_token):
    """A deleted test template cannot be linked into a new scenario."""
    deleted_test = _make_test(client, admin_token, name="will-be-deleted")
    deleted_id = deleted_test["id"]
    client.delete(f"/api/v1/test-templates/{deleted_id}", headers=_bearer(admin_token))

    payload = {"name": "linked", "test_template_ids": [deleted_id]}
    response = client.post(
        "/api/v1/scenario-templates", headers=_bearer(admin_token), json=payload
    )
    assert response.status_code == 400
    assert response.get_json()["error"] == "unknown_test_template"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_scenario_surfaces_soft_deleted_test_after_link(client, admin_token):
    """Deleting a test that is already linked does not break the scenario.

    The scenario still lists the test, flagged with `test_template_deleted`.
    """
    linked_test = _make_test(client, admin_token, name="linked-then-deleted")
    linked_id = linked_test["id"]
    scenario = client.post(
        "/api/v1/scenario-templates",
        headers=_bearer(admin_token),
        json={"name": "survives", "test_template_ids": [linked_id]},
    ).get_json()

    client.delete(f"/api/v1/test-templates/{linked_id}", headers=_bearer(admin_token))

    reread = client.get(
        f"/api/v1/scenario-templates/{scenario['id']}", headers=_bearer(admin_token)
    ).get_json()
    first_entry = reread["tests"][0]
    assert first_entry["test_template_deleted"] is True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_scenario_soft_delete(client, admin_token):
    """A deleted scenario no longer appears in the scenario listing."""
    scenario = client.post(
        "/api/v1/scenario-templates",
        headers=_bearer(admin_token),
        json={"name": "doomed-scn"},
    ).get_json()

    deletion = client.delete(
        f"/api/v1/scenario-templates/{scenario['id']}", headers=_bearer(admin_token)
    )
    assert deletion.status_code == 200

    listing = client.get("/api/v1/scenario-templates", headers=_bearer(admin_token))
    remaining_names = [item["name"] for item in listing.get_json()["items"]]
    assert "doomed-scn" not in remaining_names
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_scenario_perm_required(client, admin_token):
    """A user bootstrapped without permissions gets 403 on the scenario listing."""
    _email, outsider_token = _bootstrap_user_without_perms(
        client, admin_token, "scn-eve"
    )
    response = client.get("/api/v1/scenario-templates", headers=_bearer(outsider_token))
    assert response.status_code == 403
|
fix(m5): post-review pass — AND filter, advisory lock, N+1, item caps, mutation cache
Spec-reviewer + code-reviewer findings applied:
Must-fix
- Filter combinator AND-semantics: tactic+technique+subtechnique now intersect
(one IN subquery per facet) instead of being pooled into one OR. Reviewers
flagged both the wrong default semantics and the theoretical UUID-collision
risk of pooling tactic/technique/sub UUIDs into a shared list across
three columns.
- Front-end mutation cache hygiene: updateMeta + setTests both
`onSettled: invalidate` so a partial failure leaves the cache consistent.
Should-fix
- Per-scenario pg_advisory_xact_lock on set_scenario_tests — serialises
concurrent reorders, mirrors M4 /mitre/sync pattern.
- Backend/front consistency on duplicate tests in a scenario: the
UNIQUE(scenario_id, position) constraint already allows the same
test_template multiple times (chained ops), so the catalogue picker no
longer excludes already-picked items.
Nice-to-have
- N+1 eradicated in test_template view rendering: _to_views_batch
builds {uuid → MitreRow} maps in 3 queries up-front; list endpoint
now issues 4 queries total regardless of list size.
- Wire-level item length caps on tags (64) and expected_iocs (255)
via Annotated[str, StringConstraints(...)] — returns 400 instead of
bubbling up StringDataRightTruncation.
- 4 new pytest covering the AND-filter, extra="forbid" rejection,
empty mitre_tags clearing, and the 65-char tag cap. Total now
81 pytest + 38 e2e pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 20:05:00 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# === Post-review fixes ======================================================
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_list_filter_combines_facets_with_and_semantics(client, admin_token):
    """A template tagged only `TA0002` is NOT in `?tactic=TA0002&technique=T1059`.

    Pre-fix the OR-combined query would return it. AND-combined semantics
    (one IN subquery per facet) restrict the set to templates matching ALL
    requested facets.
    """
    # Only the listing matters below, so the created bodies are not kept.
    _make_test(
        client,
        admin_token,
        name="and-tactic-only",
        mitre_tags=[{"kind": "tactic", "external_id": "TA0002"}],
    )
    _make_test(
        client,
        admin_token,
        name="and-both-tags",
        mitre_tags=[
            {"kind": "tactic", "external_id": "TA0002"},
            {"kind": "technique", "external_id": "T1059"},
        ],
    )
    r = client.get(
        "/api/v1/test-templates?tactic=TA0002&technique=T1059",
        headers=_bearer(admin_token),
    )
    assert r.status_code == 200
    names = [it["name"] for it in r.get_json()["items"]]
    assert "and-both-tags" in names
    assert "and-tactic-only" not in names
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_create_test_template_rejects_extra_fields(client, admin_token):
    """`model_config = {"extra": "forbid"}` — unknown fields must 400."""
    payload = {"name": "extra-test", "rogue_field": "smuggled"}
    response = client.post(
        "/api/v1/test-templates", headers=_bearer(admin_token), json=payload
    )
    assert response.status_code == 400
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_update_test_template_explicit_empty_mitre_clears(client, admin_token):
    """`PUT { mitre_tags: [] }` is an explicit clear, not a no-op."""
    original = _make_test(
        client,
        admin_token,
        name="clear-tags",
        mitre_tags=[{"kind": "technique", "external_id": "T1059"}],
    )
    # Start from exactly one tag so the clear below is observable.
    assert len(original["mitre_tags"]) == 1

    response = client.put(
        f"/api/v1/test-templates/{original['id']}",
        headers=_bearer(admin_token),
        json={"mitre_tags": []},
    )
    assert response.status_code == 200
    assert response.get_json()["mitre_tags"] == []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_tag_item_length_capped_at_64(client, admin_token):
    """Individual `tags` items must be ≤ 64 chars at the wire layer."""
    oversized_tag = "x" * (64 + 1)  # one character past the cap
    payload = {"name": "long-tag", "tags": [oversized_tag]}
    response = client.post(
        "/api/v1/test-templates", headers=_bearer(admin_token), json=payload
    )
    assert response.status_code == 400
|