Files
Metamorph/backend/tests/test_templates.py

568 lines
19 KiB
Python
Raw Permalink Normal View History

"""M5 — Template catalogue integration tests.
Covers `test_template` and `scenario_template` CRUD + ordering + perm gating.
Relies on a minimal MITRE seed (T1059 / TA0001 / T1059.001) so the polymorphic
tag join can be exercised end-to-end.
"""
from __future__ import annotations
import json
import secrets
import pytest
from sqlalchemy import text
from app.core.install_token import regenerate_install_token
from app.main import create_app
from app.services import mitre_seed as mitre_svc
def _truncate_all(engine):
with engine.begin() as conn:
conn.execute(
text(
"TRUNCATE users, refresh_tokens, invitations, invitation_groups, "
"user_groups, group_permissions, permissions, settings, groups, "
"scenario_template_tests, scenario_templates, "
"test_template_mitre_tags, test_templates, "
"mitre_subtechniques, mitre_technique_tactics, mitre_techniques, "
"mitre_tactics RESTART IDENTITY CASCADE"
)
)
# Same minimal bundle as in test_mitre.py — keeps tag resolution deterministic
# without re-pulling the full enterprise STIX bundle.
_MINIMAL_BUNDLE = {
"type": "bundle",
"id": "bundle--00000000-0000-0000-0000-000000000002",
"spec_version": "2.1",
"objects": [
{
"type": "x-mitre-tactic",
"id": "x-mitre-tactic--ta0001",
"name": "Initial Access",
"x_mitre_shortname": "initial-access",
"external_references": [
{"source_name": "mitre-attack", "external_id": "TA0001"}
],
},
{
"type": "x-mitre-tactic",
"id": "x-mitre-tactic--ta0002",
"name": "Execution",
"x_mitre_shortname": "execution",
"external_references": [
{"source_name": "mitre-attack", "external_id": "TA0002"}
],
},
{
"type": "attack-pattern",
"id": "attack-pattern--t1059",
"name": "Command and Scripting Interpreter",
"kill_chain_phases": [
{"kill_chain_name": "mitre-attack", "phase_name": "execution"}
],
"external_references": [
{"source_name": "mitre-attack", "external_id": "T1059"}
],
},
{
"type": "attack-pattern",
"id": "attack-pattern--t1059-001",
"name": "PowerShell",
"x_mitre_is_subtechnique": True,
"external_references": [
{"source_name": "mitre-attack", "external_id": "T1059.001"}
],
},
{
"type": "relationship",
"id": "relationship--rel1",
"relationship_type": "subtechnique-of",
"source_ref": "attack-pattern--t1059-001",
"target_ref": "attack-pattern--t1059",
},
],
}
@pytest.fixture(scope="module")
def app(db_engine_or_skip, tmp_path_factory):
_truncate_all(db_engine_or_skip)
bundle_path = tmp_path_factory.mktemp("m5") / "stix.json"
bundle_path.write_text(json.dumps(_MINIMAL_BUNDLE))
mitre_svc.seed_mitre(source=bundle_path, expected_sha256=None)
flask_app = create_app()
flask_app.config.update(TESTING=True)
return flask_app
@pytest.fixture()
def client(app):
return app.test_client()
def _unique_email(prefix: str) -> str:
return f"{prefix}-{secrets.token_hex(4)}@metamorph.local"
@pytest.fixture(scope="module")
def admin(app):
token = regenerate_install_token()
email = _unique_email("admin")
password = "AdminPass1234!"
with app.test_client() as c:
r = c.post(
"/api/v1/setup",
json={"install_token": token, "email": email, "password": password},
)
assert r.status_code == 201, r.get_data(as_text=True)
return {"email": email, "password": password}
def _login(client, email: str, password: str) -> str:
r = client.post("/api/v1/auth/login", json={"email": email, "password": password})
assert r.status_code == 200, r.get_data(as_text=True)
return r.get_json()["access_token"]
def _bearer(token: str) -> dict[str, str]:
return {"Authorization": f"Bearer {token}"}
@pytest.fixture()
def admin_token(client, admin) -> str:
return _login(client, admin["email"], admin["password"])
# === Reader fixture: an invited user with only `test_template.read` =========
def _bootstrap_user_without_perms(client, admin_token: str, prefix: str) -> tuple[str, str]:
email = _unique_email(prefix)
inv = client.post(
"/api/v1/invitations",
headers=_bearer(admin_token),
json={"email_hint": email},
)
token = inv.get_json()["token"]
password = "ReaderPass1234!"
client.post(
f"/api/v1/invitations/accept/{token}",
json={"email": email, "password": password},
)
return email, _login(client, email, password)
# === test_template CRUD =====================================================
def _make_test(client, admin_token: str, **overrides):
body = {
"name": overrides.pop("name", f"Test {secrets.token_hex(2)}"),
"description": overrides.pop("description", "auto"),
"objective": "do thing",
"procedure_md": "1. step",
"expected_result_red_md": "red expectation",
"expected_detection_blue_md": "blue expectation",
"opsec_level": overrides.pop("opsec_level", "medium"),
"tags": overrides.pop("tags", ["fast"]),
"expected_iocs": ["evil.exe"],
"mitre_tags": overrides.pop("mitre_tags", [{"kind": "technique", "external_id": "T1059"}]),
**overrides,
}
r = client.post("/api/v1/test-templates", headers=_bearer(admin_token), json=body)
assert r.status_code == 201, r.get_data(as_text=True)
return r.get_json()
def test_create_test_template_with_mitre_tags(client, admin_token):
body = _make_test(
client,
admin_token,
name="PowerShell exec",
mitre_tags=[
{"kind": "tactic", "external_id": "TA0002"},
{"kind": "technique", "external_id": "T1059"},
{"kind": "subtechnique", "external_id": "T1059.001"},
],
)
assert body["opsec_level"] == "medium"
kinds = sorted((t["kind"], t["external_id"]) for t in body["mitre_tags"])
assert kinds == [
("subtechnique", "T1059.001"),
("tactic", "TA0002"),
("technique", "T1059"),
]
def test_create_test_template_rejects_unknown_mitre(client, admin_token):
r = client.post(
"/api/v1/test-templates",
headers=_bearer(admin_token),
json={
"name": "Bad",
"mitre_tags": [{"kind": "technique", "external_id": "T9999"}],
},
)
assert r.status_code == 400
assert r.get_json()["error"] == "unknown_mitre_tag"
def test_create_test_template_rejects_bad_opsec(client, admin_token):
r = client.post(
"/api/v1/test-templates",
headers=_bearer(admin_token),
json={"name": "Bad", "opsec_level": "burner"},
)
assert r.status_code == 400
def test_list_test_templates_filter_by_tactic(client, admin_token):
_make_test(
client,
admin_token,
name="filterable-1",
mitre_tags=[{"kind": "tactic", "external_id": "TA0002"}],
)
r = client.get(
"/api/v1/test-templates?tactic=TA0002",
headers=_bearer(admin_token),
)
assert r.status_code == 200
body = r.get_json()
names = [it["name"] for it in body["items"]]
assert "filterable-1" in names
def test_list_test_templates_filter_by_opsec(client, admin_token):
_make_test(client, admin_token, name="high-opsec", opsec_level="high")
r = client.get(
"/api/v1/test-templates?opsec=high",
headers=_bearer(admin_token),
)
assert r.status_code == 200
names = [it["name"] for it in r.get_json()["items"]]
assert "high-opsec" in names
assert all(it["opsec_level"] == "high" for it in r.get_json()["items"])
def test_list_test_templates_filter_by_tag(client, admin_token):
_make_test(client, admin_token, name="tagged-fast", tags=["fast", "phish"])
r = client.get(
"/api/v1/test-templates?tag=phish",
headers=_bearer(admin_token),
)
assert r.status_code == 200
names = [it["name"] for it in r.get_json()["items"]]
assert "tagged-fast" in names
def test_list_test_templates_search_q(client, admin_token):
_make_test(client, admin_token, name="unique-token-azertyuiop")
r = client.get(
"/api/v1/test-templates?q=AZERTYUIOP", # case-insensitive
headers=_bearer(admin_token),
)
assert r.status_code == 200
names = [it["name"] for it in r.get_json()["items"]]
assert "unique-token-azertyuiop" in names
def test_update_test_template_replaces_mitre_tags(client, admin_token):
body = _make_test(
client,
admin_token,
name="to-update",
mitre_tags=[{"kind": "tactic", "external_id": "TA0001"}],
)
r = client.put(
f"/api/v1/test-templates/{body['id']}",
headers=_bearer(admin_token),
json={"mitre_tags": [{"kind": "technique", "external_id": "T1059"}]},
)
assert r.status_code == 200, r.get_data(as_text=True)
updated = r.get_json()
kinds = [(t["kind"], t["external_id"]) for t in updated["mitre_tags"]]
assert kinds == [("technique", "T1059")]
def test_update_test_template_partial_keeps_unset_fields(client, admin_token):
body = _make_test(
client,
admin_token,
name="partial-update",
opsec_level="low",
tags=["a", "b"],
)
r = client.put(
f"/api/v1/test-templates/{body['id']}",
headers=_bearer(admin_token),
json={"name": "renamed"},
)
assert r.status_code == 200
updated = r.get_json()
assert updated["name"] == "renamed"
assert updated["opsec_level"] == "low" # untouched
assert set(updated["tags"]) == {"a", "b"} # untouched
def test_soft_delete_then_list_hides_by_default(client, admin_token):
body = _make_test(client, admin_token, name="to-be-deleted")
r = client.delete(
f"/api/v1/test-templates/{body['id']}", headers=_bearer(admin_token)
)
assert r.status_code == 200
r2 = client.get("/api/v1/test-templates", headers=_bearer(admin_token))
names = [it["name"] for it in r2.get_json()["items"]]
assert "to-be-deleted" not in names
# And reappears with include_deleted=true
r3 = client.get(
"/api/v1/test-templates?include_deleted=true",
headers=_bearer(admin_token),
)
names3 = [it["name"] for it in r3.get_json()["items"]]
assert "to-be-deleted" in names3
def test_read_perm_required(client, admin_token):
"""A user without `test_template.read` gets 403."""
_, eve_token = _bootstrap_user_without_perms(client, admin_token, "eve-noperm")
r = client.get("/api/v1/test-templates", headers=_bearer(eve_token))
assert r.status_code == 403
def test_write_perm_required(client, admin_token):
"""A user with only `test_template.read` cannot create.
Bootstrap path: create a dedicated group via the admin API, bind only the
`test_template.read` perm, then invite a user pre-assigned to that group.
"""
# 1. Create the read-only group + bind the single perm.
grp = client.post(
"/api/v1/groups",
headers=_bearer(admin_token),
json={"name": f"tpl-reader-{secrets.token_hex(2)}"},
).get_json()
r_set = client.put(
f"/api/v1/groups/{grp['id']}/permissions",
headers=_bearer(admin_token),
json={"codes": ["test_template.read"]},
)
assert r_set.status_code == 200, r_set.get_data(as_text=True)
# 2. Invite a user already attached to that group.
email = _unique_email("alice-readonly")
password = "ReaderPass1234!"
inv = client.post(
"/api/v1/invitations",
headers=_bearer(admin_token),
json={"email_hint": email, "group_ids": [grp["id"]]},
).get_json()
client.post(
f"/api/v1/invitations/accept/{inv['token']}",
json={"email": email, "password": password},
)
token = _login(client, email, password)
r = client.get("/api/v1/test-templates", headers=_bearer(token))
assert r.status_code == 200, r.get_data(as_text=True)
r2 = client.post(
"/api/v1/test-templates", headers=_bearer(token), json={"name": "X"}
)
assert r2.status_code == 403
# === scenario_template CRUD =================================================
def test_create_scenario_with_ordered_tests(client, admin_token):
a = _make_test(client, admin_token, name="scn-a")
b = _make_test(client, admin_token, name="scn-b")
c = _make_test(client, admin_token, name="scn-c")
r = client.post(
"/api/v1/scenario-templates",
headers=_bearer(admin_token),
json={
"name": "phishing-flow",
"description": "click → exec → persist",
"test_template_ids": [a["id"], b["id"], c["id"]],
},
)
assert r.status_code == 201, r.get_data(as_text=True)
body = r.get_json()
assert body["tests_count"] == 3
assert [t["position"] for t in body["tests"]] == [0, 1, 2]
assert [t["test_template_name"] for t in body["tests"]] == ["scn-a", "scn-b", "scn-c"]
def test_reorder_scenario_tests(client, admin_token):
a = _make_test(client, admin_token, name="reord-a")
b = _make_test(client, admin_token, name="reord-b")
c = _make_test(client, admin_token, name="reord-c")
created = client.post(
"/api/v1/scenario-templates",
headers=_bearer(admin_token),
json={
"name": "reorder-me",
"test_template_ids": [a["id"], b["id"], c["id"]],
},
).get_json()
# Reverse order.
r = client.put(
f"/api/v1/scenario-templates/{created['id']}/tests",
headers=_bearer(admin_token),
json={"test_template_ids": [c["id"], b["id"], a["id"]]},
)
assert r.status_code == 200
after = r.get_json()
assert [t["test_template_name"] for t in after["tests"]] == ["reord-c", "reord-b", "reord-a"]
# Re-reading via GET yields the same order — confirms persistence.
fresh = client.get(
f"/api/v1/scenario-templates/{created['id']}", headers=_bearer(admin_token)
).get_json()
assert [t["test_template_name"] for t in fresh["tests"]] == ["reord-c", "reord-b", "reord-a"]
def test_scenario_rejects_unknown_test_id(client, admin_token):
r = client.post(
"/api/v1/scenario-templates",
headers=_bearer(admin_token),
json={
"name": "bad",
"test_template_ids": ["00000000-0000-0000-0000-000000000000"],
},
)
assert r.status_code == 400
assert r.get_json()["error"] == "unknown_test_template"
def test_scenario_rejects_soft_deleted_test_on_create(client, admin_token):
a = _make_test(client, admin_token, name="will-be-deleted")
client.delete(f"/api/v1/test-templates/{a['id']}", headers=_bearer(admin_token))
r = client.post(
"/api/v1/scenario-templates",
headers=_bearer(admin_token),
json={"name": "linked", "test_template_ids": [a["id"]]},
)
assert r.status_code == 400
assert r.get_json()["error"] == "unknown_test_template"
def test_scenario_surfaces_soft_deleted_test_after_link(client, admin_token):
"""Once linked, a test can be soft-deleted without breaking the scenario —
the join row stays and the API flags the test as deleted."""
a = _make_test(client, admin_token, name="linked-then-deleted")
sc = client.post(
"/api/v1/scenario-templates",
headers=_bearer(admin_token),
json={"name": "survives", "test_template_ids": [a["id"]]},
).get_json()
client.delete(f"/api/v1/test-templates/{a['id']}", headers=_bearer(admin_token))
fresh = client.get(
f"/api/v1/scenario-templates/{sc['id']}", headers=_bearer(admin_token)
).get_json()
assert fresh["tests"][0]["test_template_deleted"] is True
def test_scenario_soft_delete(client, admin_token):
sc = client.post(
"/api/v1/scenario-templates",
headers=_bearer(admin_token),
json={"name": "doomed-scn"},
).get_json()
r = client.delete(
f"/api/v1/scenario-templates/{sc['id']}", headers=_bearer(admin_token)
)
assert r.status_code == 200
names = [
it["name"]
for it in client.get(
"/api/v1/scenario-templates", headers=_bearer(admin_token)
).get_json()["items"]
]
assert "doomed-scn" not in names
def test_scenario_perm_required(client, admin_token):
_, eve_token = _bootstrap_user_without_perms(client, admin_token, "scn-eve")
r = client.get("/api/v1/scenario-templates", headers=_bearer(eve_token))
assert r.status_code == 403
fix(m5): post-review pass — AND filter, advisory lock, N+1, item caps, mutation cache Spec-reviewer + code-reviewer findings applied: Must-fix - Filter combinator AND-semantics: tactic+technique+subtechnique now intersect (one IN subquery per facet) instead of being pooled into one OR. Reviewers flagged both the wrong default semantics and the theoretical UUID-collision risk of pooling tactic/technique/sub UUIDs into a shared list across three columns. - Front-end mutation cache hygiene: updateMeta + setTests both `onSettled: invalidate` so a partial failure leaves the cache consistent. Should-fix - Per-scenario pg_advisory_xact_lock on set_scenario_tests — serialises concurrent reorders, mirrors M4 /mitre/sync pattern. - Backend/front consistency on duplicate tests in a scenario: the UNIQUE(scenario_id, position) constraint already allows the same test_template multiple times (chained ops), so the catalogue picker no longer excludes already-picked items. Nice-to-have - N+1 eradicated in test_template view rendering: _to_views_batch builds {uuid → MitreRow} maps in 3 queries up-front; list endpoint now issues 4 queries total regardless of list size. - Wire-level item length caps on tags (64) and expected_iocs (255) via Annotated[str, StringConstraints(...)] — returns 400 instead of bubbling up StringDataRightTruncation. - 4 new pytest covering the AND-filter, extra="forbid" rejection, empty mitre_tags clearing, and the 65-char tag cap. Total now 81 pytest + 38 e2e pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 20:05:00 +02:00
# === Post-review fixes ======================================================
def test_list_filter_combines_facets_with_and_semantics(client, admin_token):
"""A template tagged only `TA0002` is NOT in `?tactic=TA0002&technique=T1059`.
Pre-fix the OR-combined query would return it. AND-combined semantics
(one IN subquery per facet) restrict the set to templates matching ALL
requested facets.
"""
a = _make_test(
client,
admin_token,
name="and-tactic-only",
mitre_tags=[{"kind": "tactic", "external_id": "TA0002"}],
)
b = _make_test(
client,
admin_token,
name="and-both-tags",
mitre_tags=[
{"kind": "tactic", "external_id": "TA0002"},
{"kind": "technique", "external_id": "T1059"},
],
)
r = client.get(
"/api/v1/test-templates?tactic=TA0002&technique=T1059",
headers=_bearer(admin_token),
)
assert r.status_code == 200
names = [it["name"] for it in r.get_json()["items"]]
assert "and-both-tags" in names
assert "and-tactic-only" not in names
_ = a, b # silence unused vars from linter
def test_create_test_template_rejects_extra_fields(client, admin_token):
"""`model_config = {"extra": "forbid"}` — unknown fields must 400."""
r = client.post(
"/api/v1/test-templates",
headers=_bearer(admin_token),
json={"name": "extra-test", "rogue_field": "smuggled"},
)
assert r.status_code == 400
def test_update_test_template_explicit_empty_mitre_clears(client, admin_token):
"""`PUT { mitre_tags: [] }` is an explicit clear, not a no-op."""
body = _make_test(
client,
admin_token,
name="clear-tags",
mitre_tags=[{"kind": "technique", "external_id": "T1059"}],
)
assert len(body["mitre_tags"]) == 1
r = client.put(
f"/api/v1/test-templates/{body['id']}",
headers=_bearer(admin_token),
json={"mitre_tags": []},
)
assert r.status_code == 200
assert r.get_json()["mitre_tags"] == []
def test_tag_item_length_capped_at_64(client, admin_token):
"""Individual `tags` items must be ≤ 64 chars at the wire layer."""
long_tag = "x" * 65
r = client.post(
"/api/v1/test-templates",
headers=_bearer(admin_token),
json={"name": "long-tag", "tags": [long_tag]},
)
assert r.status_code == 400