Files
Metamorph/backend/tests/test_mitre.py

454 lines
18 KiB
Python
Raw Permalink Normal View History

"""Integration tests for M4: STIX parser + seed + /mitre/* endpoints.
Uses a minimal hand-crafted STIX bundle (no network) so the parser logic and
the upsert semantics can be exercised deterministically.
"""
from __future__ import annotations
import json
import secrets
from pathlib import Path
import pytest
from sqlalchemy import text
from app.core.install_token import regenerate_install_token
from app.main import create_app
from app.services import mitre_seed as mitre_svc
def _truncate_all(engine):
with engine.begin() as conn:
conn.execute(
text(
"TRUNCATE users, refresh_tokens, invitations, invitation_groups, "
"user_groups, group_permissions, permissions, settings, groups, "
"mitre_subtechniques, mitre_technique_tactics, mitre_techniques, "
"mitre_tactics RESTART IDENTITY CASCADE"
)
)
@pytest.fixture(scope="module")
def app(db_engine_or_skip):
_truncate_all(db_engine_or_skip)
flask_app = create_app()
flask_app.config.update(TESTING=True)
return flask_app
@pytest.fixture()
def client(app):
return app.test_client()
def _unique_email(prefix: str) -> str:
return f"{prefix}-{secrets.token_hex(4)}@metamorph.local"
@pytest.fixture(scope="module")
def admin_credentials(app, db_engine_or_skip):
"""Bootstrap a fresh admin once for the whole module."""
token = regenerate_install_token()
email = _unique_email("admin")
password = "AdminPass1234!"
with app.test_client() as c:
r = c.post(
"/api/v1/setup",
json={"install_token": token, "email": email, "password": password},
)
assert r.status_code == 201, r.get_data(as_text=True)
return {"email": email, "password": password, "user_id": r.get_json()["user_id"]}
def _login(client, email: str, password: str) -> str:
r = client.post("/api/v1/auth/login", json={"email": email, "password": password})
assert r.status_code == 200, r.get_data(as_text=True)
return r.get_json()["access_token"]
# === Fixture STIX bundle =====================================================
MINIMAL_BUNDLE = {
"type": "bundle",
"id": "bundle--00000000-0000-0000-0000-000000000001",
"spec_version": "2.1",
"objects": [
# Tactic 1 — kept
{
"type": "x-mitre-tactic",
"id": "x-mitre-tactic--ta0001",
"name": "Initial Access",
"description": "Get a foothold.",
"x_mitre_shortname": "initial-access",
"external_references": [
{
"source_name": "mitre-attack",
"external_id": "TA0001",
"url": "https://attack.mitre.org/tactics/TA0001/",
}
],
},
# Tactic 2 — kept
{
"type": "x-mitre-tactic",
"id": "x-mitre-tactic--ta0002",
"name": "Execution",
"description": "Run code.",
"x_mitre_shortname": "execution",
"external_references": [
{
"source_name": "mitre-attack",
"external_id": "TA0002",
"url": "https://attack.mitre.org/tactics/TA0002/",
}
],
},
# Revoked tactic — must be skipped
{
"type": "x-mitre-tactic",
"id": "x-mitre-tactic--ta0099",
"name": "Doomed",
"x_mitre_shortname": "doomed",
"revoked": True,
"external_references": [
{"source_name": "mitre-attack", "external_id": "TA0099"}
],
},
# Technique T1059 covers both tactics
{
"type": "attack-pattern",
"id": "attack-pattern--t1059",
"name": "Command and Scripting Interpreter",
"description": "Use shells.",
"kill_chain_phases": [
{"kill_chain_name": "mitre-attack", "phase_name": "initial-access"},
{"kill_chain_name": "mitre-attack", "phase_name": "execution"},
],
"external_references": [
{
"source_name": "mitre-attack",
"external_id": "T1059",
"url": "https://attack.mitre.org/techniques/T1059/",
}
],
},
# Technique T1078 only initial-access
{
"type": "attack-pattern",
"id": "attack-pattern--t1078",
"name": "Valid Accounts",
"description": "Use legit creds.",
"kill_chain_phases": [
{"kill_chain_name": "mitre-attack", "phase_name": "initial-access"},
],
"external_references": [
{
"source_name": "mitre-attack",
"external_id": "T1078",
"url": "https://attack.mitre.org/techniques/T1078/",
}
],
},
# Deprecated technique — skipped
{
"type": "attack-pattern",
"id": "attack-pattern--t1190",
"name": "Exploit Public-Facing Application",
"x_mitre_deprecated": True,
"external_references": [
{"source_name": "mitre-attack", "external_id": "T1190"}
],
},
# Sub-technique of T1059
{
"type": "attack-pattern",
"id": "attack-pattern--t1059-001",
"name": "PowerShell",
"description": "Windows shell.",
"x_mitre_is_subtechnique": True,
"external_references": [
{
"source_name": "mitre-attack",
"external_id": "T1059.001",
"url": "https://attack.mitre.org/techniques/T1059/001/",
}
],
},
# Relationship attaching the sub to its parent
{
"type": "relationship",
"id": "relationship--rel1",
"relationship_type": "subtechnique-of",
"source_ref": "attack-pattern--t1059-001",
"target_ref": "attack-pattern--t1059",
},
],
}
@pytest.fixture()
def fixture_bundle_path(tmp_path: Path) -> Path:
path = tmp_path / "minimal-stix.json"
path.write_text(json.dumps(MINIMAL_BUNDLE))
return path
# === Parser unit tests =======================================================
def test_parser_extracts_active_objects(fixture_bundle_path):
parsed = mitre_svc.parse_bundle(fixture_bundle_path)
assert len(parsed.tactics) == 2 # TA0001 + TA0002 (TA0099 revoked)
assert {t["external_id"] for t in parsed.tactics} == {"TA0001", "TA0002"}
assert len(parsed.techniques) == 2 # T1059 + T1078 (T1190 deprecated)
assert {t["external_id"] for t in parsed.techniques} == {"T1059", "T1078"}
assert len(parsed.subtechniques) == 1
sb = parsed.subtechniques[0]
assert sb["external_id"] == "T1059.001"
assert sb["parent_stix_id"] == "attack-pattern--t1059"
# === Seed integration tests ==================================================
def test_seed_against_fixture(app, fixture_bundle_path):
result = mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
assert result.tactics_upserted == 2
assert result.techniques_upserted == 2
assert result.subtechniques_upserted == 1
assert result.subtechniques_skipped_orphan == 0
assert result.technique_tactic_links == 3 # T1059→TA0001, T1059→TA0002, T1078→TA0001
def test_seed_is_idempotent(app, fixture_bundle_path):
"""Running twice yields the same row counts and no SQL errors."""
first = mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
second = mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
assert (first.tactics_upserted, first.techniques_upserted, first.subtechniques_upserted) == (
second.tactics_upserted,
second.techniques_upserted,
second.subtechniques_upserted,
)
def test_seed_persists_setting(app, fixture_bundle_path):
"""settings table records the last sync timestamp + source URL."""
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
status = mitre_svc.read_status()
assert status["last_sync"] is not None
# We seeded from a local path so version is None and source_url is the path string.
assert status["source_url"] == str(fixture_bundle_path)
assert status["version"] is None # only set when source == MITRE_DEFAULT_URL
test(m4): cover the new security guards + pin e2e to exact MITRE v19 counts - 5 new pytest covering paths the code-reviewer flagged as un-asserted: * `test_seed_refuses_file_url` — `file://` scheme rejected before I/O (was the SSRF-to-local-FS vector). * `test_seed_refuses_disallowed_https_host` — non-allowlisted HTTPS host rejected with `MitreSourceForbidden`. * `test_seed_refuses_custom_url_without_sha` — end-to-end guard that `seed_mitre(source=<custom URL>, expected_sha256=None, allow_unverified=False)` raises `MitreSeedError`. * `test_dotted_id_fallback_resolves_orphan_subtechnique` — STIX bundle without `relationship[subtechnique-of]` still attaches T1059.001 to T1059 via the dotted-id convention. * `test_seed_clears_version_when_source_is_not_default` — seed from a local path leaves `settings.mitre_version` NULL (no stale pin). - Existing `test_checksum_mismatch_aborts` reworked to monkey-patch `_ensure_host_allowed` so `file://` can drive the test past the allowlist gate (was relying on file:// being accepted before CR1). - Removed unused `uuid` import. - e2e: assertions on `tactics_upserted`/`techniques_upserted`/ `subtechniques_upserted` switched from `>= 14/180/400` thresholds to `=== 15/222/475` exact counts pinned to MITRE Enterprise v19.0 + 0 orphans. Catches parser regressions that would silently include revoked rows. Bump alongside MITRE_VERSION when re-pinning. - e2e: `Math.random()` → `crypto.randomUUID().slice(0, 8)` for unique test-run emails (collision-safe across parallel CI workers). DoD: 58 pytest pass (was 53), 34 Playwright pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 19:19:34 +02:00
def test_checksum_mismatch_aborts(tmp_path, monkeypatch):
"""A wrong sha256 triggers MitreChecksumMismatch and skips DB writes.
We monkey-patch the allowlist to accept `file://` for the duration of the
test file:// is rejected in production by `_ensure_host_allowed` (cf.
`test_seed_refuses_file_url`), but we need to drive `_download` past that
gate to exercise the sha256 path.
"""
path = tmp_path / "tiny.json"
path.write_text(json.dumps(MINIMAL_BUNDLE))
test(m4): cover the new security guards + pin e2e to exact MITRE v19 counts - 5 new pytest covering paths the code-reviewer flagged as un-asserted: * `test_seed_refuses_file_url` — `file://` scheme rejected before I/O (was the SSRF-to-local-FS vector). * `test_seed_refuses_disallowed_https_host` — non-allowlisted HTTPS host rejected with `MitreSourceForbidden`. * `test_seed_refuses_custom_url_without_sha` — end-to-end guard that `seed_mitre(source=<custom URL>, expected_sha256=None, allow_unverified=False)` raises `MitreSeedError`. * `test_dotted_id_fallback_resolves_orphan_subtechnique` — STIX bundle without `relationship[subtechnique-of]` still attaches T1059.001 to T1059 via the dotted-id convention. * `test_seed_clears_version_when_source_is_not_default` — seed from a local path leaves `settings.mitre_version` NULL (no stale pin). - Existing `test_checksum_mismatch_aborts` reworked to monkey-patch `_ensure_host_allowed` so `file://` can drive the test past the allowlist gate (was relying on file:// being accepted before CR1). - Removed unused `uuid` import. - e2e: assertions on `tactics_upserted`/`techniques_upserted`/ `subtechniques_upserted` switched from `>= 14/180/400` thresholds to `=== 15/222/475` exact counts pinned to MITRE Enterprise v19.0 + 0 orphans. Catches parser regressions that would silently include revoked rows. Bump alongside MITRE_VERSION when re-pinning. - e2e: `Math.random()` → `crypto.randomUUID().slice(0, 8)` for unique test-run emails (collision-safe across parallel CI workers). DoD: 58 pytest pass (was 53), 34 Playwright pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 19:19:34 +02:00
monkeypatch.setattr(mitre_svc, "_ensure_host_allowed", lambda _: None)
bogus = "0" * 64
with pytest.raises(mitre_svc.MitreChecksumMismatch):
mitre_svc._download(
f"file://{path}", tmp_path / "out.json", expected_sha256=bogus
)
# === API endpoint tests ======================================================
def test_list_tactics_requires_auth(app, fixture_bundle_path):
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
r = c.get("/api/v1/mitre/tactics")
assert r.status_code == 401
def test_list_tactics_returns_seeded(app, admin_credentials, fixture_bundle_path):
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
access = _login(c, admin_credentials["email"], admin_credentials["password"])
r = c.get(
"/api/v1/mitre/tactics", headers={"Authorization": f"Bearer {access}"}
)
assert r.status_code == 200
body = r.get_json()
assert body["total"] == 2
ids = [t["external_id"] for t in body["items"]]
assert "TA0001" in ids and "TA0002" in ids
def test_filter_techniques_by_tactic(app, admin_credentials, fixture_bundle_path):
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
access = _login(c, admin_credentials["email"], admin_credentials["password"])
r = c.get(
"/api/v1/mitre/techniques?tactic=TA0002",
headers={"Authorization": f"Bearer {access}"},
)
assert r.status_code == 200
body = r.get_json()
# Only T1059 covers TA0002 (execution); T1078 covers initial-access only.
ext_ids = [t["external_id"] for t in body["items"]]
assert ext_ids == ["T1059"]
def test_subtechniques_listed_under_parent(app, admin_credentials, fixture_bundle_path):
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
access = _login(c, admin_credentials["email"], admin_credentials["password"])
r = c.get(
"/api/v1/mitre/subtechniques?technique=T1059",
headers={"Authorization": f"Bearer {access}"},
)
assert r.status_code == 200
body = r.get_json()
ext_ids = [t["external_id"] for t in body["items"]]
assert ext_ids == ["T1059.001"]
def test_status_endpoint(app, admin_credentials, fixture_bundle_path):
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
access = _login(c, admin_credentials["email"], admin_credentials["password"])
r = c.get("/api/v1/mitre/status", headers={"Authorization": f"Bearer {access}"})
assert r.status_code == 200
body = r.get_json()
assert body["last_sync"] is not None
assert body["default_url"].startswith("https://")
assert body["default_version"]
def test_sync_endpoint_requires_perm(app, admin_credentials, fixture_bundle_path):
"""A non-admin without mitre.sync gets 403."""
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
# Bootstrap a no-perm user via invitation.
admin_access = _login(c, admin_credentials["email"], admin_credentials["password"])
eve_email = _unique_email("eve")
inv = c.post(
"/api/v1/invitations",
headers={"Authorization": f"Bearer {admin_access}"},
json={"email_hint": eve_email},
)
token = inv.get_json()["token"]
c.post(
f"/api/v1/invitations/accept/{token}",
json={"email": eve_email, "password": "EvePass1234!"},
)
eve_access = _login(c, eve_email, "EvePass1234!")
r = c.post(
"/api/v1/mitre/sync", headers={"Authorization": f"Bearer {eve_access}"}
)
assert r.status_code == 403
def test_search_filter_on_name(app, admin_credentials, fixture_bundle_path):
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
access = _login(c, admin_credentials["email"], admin_credentials["password"])
r = c.get(
"/api/v1/mitre/techniques?q=valid",
headers={"Authorization": f"Bearer {access}"},
)
assert r.status_code == 200
ext_ids = [t["external_id"] for t in r.get_json()["items"]]
assert ext_ids == ["T1078"]
refactor(m4): flatten the MITRE picker into the attack.mitre.org matrix The hierarchical 3-column drill-down was hard to scan and forced a stateful walk per tag. Replaced with a flat, columns-as-tactics matrix that mirrors attack.mitre.org/# — every cell is a one-click select target, with inline sub-technique expand via a `+N` chevron. - New endpoint GET /api/v1/mitre/matrix returns the full grid (tactics → techniques → sub-techniques nested) in a single ~55 KB response, so the SPA renders the whole matrix without firing 15 parallel queries. Two pytest tests added (nested structure + auth required). - MitreTagPicker.tsx rewritten as a horizontal-scrolling matrix: - Click a tactic header → select the tactic (cyan filled). - Click a technique cell → select the technique (orange filled). - Click the `+N` chevron → expand sub-techniques inline within the column. - Click a sub-technique → select (purple filled). - Single Filter field matches on external_id or name across all kinds. - Selection chips at the top, clickable to remove. - `aria-pressed` on every clickable cell for screen readers and Playwright. - e2e test updated to walk the new flow (click cell → assert aria-pressed, expand chevron, click sub, verify chip + JSON preview, filter to T1078). - Spec §F2 + §F12 + todo.md M4 entry updated to make the matrix layout the canonical UI for MITRE tagging (so future spec-reviewer passes accept it). - testing-m4.md walkthrough rewritten for the flat picker. DoD post-refactor: make test-api → 53 passed (was 51), make e2e → 34 passed. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 18:32:20 +02:00
def test_matrix_endpoint_returns_nested_grid(app, admin_credentials, fixture_bundle_path):
"""GET /mitre/matrix returns the flat tactic→technique→subtechnique grid."""
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
access = _login(c, admin_credentials["email"], admin_credentials["password"])
r = c.get("/api/v1/mitre/matrix", headers={"Authorization": f"Bearer {access}"})
assert r.status_code == 200
body = r.get_json()
tactics = body["tactics"]
assert {t["external_id"] for t in tactics} == {"TA0001", "TA0002"}
# TA0001 has T1059 (multi-tactic) + T1078; T1059 carries its sub.
ta0001 = next(t for t in tactics if t["external_id"] == "TA0001")
techs = {t["external_id"]: t for t in ta0001["techniques"]}
assert set(techs.keys()) == {"T1059", "T1078"}
assert techs["T1059"]["subtechniques"][0]["external_id"] == "T1059.001"
assert techs["T1078"]["subtechniques"] == []
# TA0002 only carries T1059 (no T1078).
ta0002 = next(t for t in tactics if t["external_id"] == "TA0002")
assert [t["external_id"] for t in ta0002["techniques"]] == ["T1059"]
def test_matrix_endpoint_requires_auth(app, fixture_bundle_path):
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
assert c.get("/api/v1/mitre/matrix").status_code == 401
test(m4): cover the new security guards + pin e2e to exact MITRE v19 counts - 5 new pytest covering paths the code-reviewer flagged as un-asserted: * `test_seed_refuses_file_url` — `file://` scheme rejected before I/O (was the SSRF-to-local-FS vector). * `test_seed_refuses_disallowed_https_host` — non-allowlisted HTTPS host rejected with `MitreSourceForbidden`. * `test_seed_refuses_custom_url_without_sha` — end-to-end guard that `seed_mitre(source=<custom URL>, expected_sha256=None, allow_unverified=False)` raises `MitreSeedError`. * `test_dotted_id_fallback_resolves_orphan_subtechnique` — STIX bundle without `relationship[subtechnique-of]` still attaches T1059.001 to T1059 via the dotted-id convention. * `test_seed_clears_version_when_source_is_not_default` — seed from a local path leaves `settings.mitre_version` NULL (no stale pin). - Existing `test_checksum_mismatch_aborts` reworked to monkey-patch `_ensure_host_allowed` so `file://` can drive the test past the allowlist gate (was relying on file:// being accepted before CR1). - Removed unused `uuid` import. - e2e: assertions on `tactics_upserted`/`techniques_upserted`/ `subtechniques_upserted` switched from `>= 14/180/400` thresholds to `=== 15/222/475` exact counts pinned to MITRE Enterprise v19.0 + 0 orphans. Catches parser regressions that would silently include revoked rows. Bump alongside MITRE_VERSION when re-pinning. - e2e: `Math.random()` → `crypto.randomUUID().slice(0, 8)` for unique test-run emails (collision-safe across parallel CI workers). DoD: 58 pytest pass (was 53), 34 Playwright pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 19:19:34 +02:00
# === Security guards ==========================================================
def test_seed_refuses_file_url(tmp_path):
"""file:// (or any scheme outside the allowlist) is rejected — protects
against a privileged operator pivoting the in-container fetch to local
filesystem reads via the URL path."""
path = tmp_path / "bundle.json"
path.write_text(json.dumps(MINIMAL_BUNDLE))
with pytest.raises(mitre_svc.MitreSourceForbidden):
mitre_svc._download(f"file://{path}", tmp_path / "out.json")
def test_seed_refuses_disallowed_https_host(tmp_path):
"""An HTTPS URL outside MITRE_ALLOWED_HOSTS is rejected before any I/O.
Closes the SSRF surface (cloud metadata, internal mirrors)."""
with pytest.raises(mitre_svc.MitreSourceForbidden):
mitre_svc._download("https://attacker.example/bundle.json", tmp_path / "out.json")
def test_seed_refuses_custom_url_without_sha(tmp_path):
"""End-to-end refusal: even an allowlisted custom URL needs a sha or an
explicit allow_unverified=True."""
# Use the default URL with a different sha to simulate "custom" semantics
# without actually hitting the network: pass a different MITRE_DEFAULT_URL.
# The cleanest expression is to call seed_mitre with the same URL but no sha
# — but the default URL gets the default sha auto-set; we need to bypass.
with pytest.raises(mitre_svc.MitreSeedError):
mitre_svc.seed_mitre(
source="https://raw.githubusercontent.com/some-other-path/bundle.json",
expected_sha256=None,
allow_unverified=False,
)
def test_dotted_id_fallback_resolves_orphan_subtechnique(app, tmp_path):
"""When the STIX `subtechnique-of` relationship is missing, the parser
must fall back to the dotted convention (T1003.001 T1003)."""
bundle = json.loads(json.dumps(MINIMAL_BUNDLE)) # deep copy
# Strip the relationship object so the parent_stix_id lookup fails.
bundle["objects"] = [o for o in bundle["objects"] if o.get("type") != "relationship"]
bundle_path = tmp_path / "no-rel.json"
bundle_path.write_text(json.dumps(bundle))
result = mitre_svc.seed_mitre(source=bundle_path, expected_sha256=None)
# The fallback resolves T1059.001 → T1059 via the dotted-id pattern,
# so the subtechnique is still attached (no orphan).
assert result.subtechniques_upserted == 1
assert result.subtechniques_skipped_orphan == 0
def test_seed_clears_version_when_source_is_not_default(app, fixture_bundle_path):
"""A custom source must NULL `mitre_version` so /mitre/status doesn't lie
about a stale upstream pin."""
# First seed from the default URL would set version=19.0; here we seed from
# a local file path, which should write version=None.
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
assert mitre_svc.read_status()["version"] is None