test(m4): pytest parser + endpoints + e2e tag picker
- backend/tests/test_mitre.py: 12 integration tests using a hand-crafted minimal STIX bundle (no network in tests). Covers parser (revoked/deprecated skip, sub-technique parent linkage), seed idempotence, persisted settings, checksum mismatch path, all four read endpoints, perm enforcement on /mitre/sync, ILIKE search. - e2e/tests/m4-mitre.spec.ts: 6 Playwright tests against the live stack. beforeAll calls POST /mitre/sync once (real bundle, ~50 MB, ~1.1 s) then the suite validates tactics ≥14, T1003 has ≥5 sub-techniques, the picker walks tactic→technique→subtechnique with chip multi-select, and non-admin sees /mitre but no Sync card. - tasks/testing-m4.md: manual + automated checklist, air-gapped operator notes, volume-permission caveat for pre-existing root-owned volumes. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
360
backend/tests/test_mitre.py
Normal file
360
backend/tests/test_mitre.py
Normal file
@@ -0,0 +1,360 @@
|
||||
"""Integration tests for M4: STIX parser + seed + /mitre/* endpoints.
|
||||
|
||||
Uses a minimal hand-crafted STIX bundle (no network) so the parser logic and
|
||||
the upsert semantics can be exercised deterministically.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import secrets
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import text
|
||||
|
||||
from app.core.install_token import regenerate_install_token
|
||||
from app.main import create_app
|
||||
from app.services import mitre_seed as mitre_svc
|
||||
|
||||
|
||||
def _truncate_all(engine):
|
||||
with engine.begin() as conn:
|
||||
conn.execute(
|
||||
text(
|
||||
"TRUNCATE users, refresh_tokens, invitations, invitation_groups, "
|
||||
"user_groups, group_permissions, permissions, settings, groups, "
|
||||
"mitre_subtechniques, mitre_technique_tactics, mitre_techniques, "
|
||||
"mitre_tactics RESTART IDENTITY CASCADE"
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def app(db_engine_or_skip):
|
||||
_truncate_all(db_engine_or_skip)
|
||||
flask_app = create_app()
|
||||
flask_app.config.update(TESTING=True)
|
||||
return flask_app
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def client(app):
|
||||
return app.test_client()
|
||||
|
||||
|
||||
def _unique_email(prefix: str) -> str:
|
||||
return f"{prefix}-{secrets.token_hex(4)}@metamorph.local"
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def admin_credentials(app, db_engine_or_skip):
|
||||
"""Bootstrap a fresh admin once for the whole module."""
|
||||
token = regenerate_install_token()
|
||||
email = _unique_email("admin")
|
||||
password = "AdminPass1234!"
|
||||
with app.test_client() as c:
|
||||
r = c.post(
|
||||
"/api/v1/setup",
|
||||
json={"install_token": token, "email": email, "password": password},
|
||||
)
|
||||
assert r.status_code == 201, r.get_data(as_text=True)
|
||||
return {"email": email, "password": password, "user_id": r.get_json()["user_id"]}
|
||||
|
||||
|
||||
def _login(client, email: str, password: str) -> str:
|
||||
r = client.post("/api/v1/auth/login", json={"email": email, "password": password})
|
||||
assert r.status_code == 200, r.get_data(as_text=True)
|
||||
return r.get_json()["access_token"]
|
||||
|
||||
|
||||
# === Fixture STIX bundle =====================================================
|
||||
|
||||
|
||||
MINIMAL_BUNDLE = {
|
||||
"type": "bundle",
|
||||
"id": "bundle--00000000-0000-0000-0000-000000000001",
|
||||
"spec_version": "2.1",
|
||||
"objects": [
|
||||
# Tactic 1 — kept
|
||||
{
|
||||
"type": "x-mitre-tactic",
|
||||
"id": "x-mitre-tactic--ta0001",
|
||||
"name": "Initial Access",
|
||||
"description": "Get a foothold.",
|
||||
"x_mitre_shortname": "initial-access",
|
||||
"external_references": [
|
||||
{
|
||||
"source_name": "mitre-attack",
|
||||
"external_id": "TA0001",
|
||||
"url": "https://attack.mitre.org/tactics/TA0001/",
|
||||
}
|
||||
],
|
||||
},
|
||||
# Tactic 2 — kept
|
||||
{
|
||||
"type": "x-mitre-tactic",
|
||||
"id": "x-mitre-tactic--ta0002",
|
||||
"name": "Execution",
|
||||
"description": "Run code.",
|
||||
"x_mitre_shortname": "execution",
|
||||
"external_references": [
|
||||
{
|
||||
"source_name": "mitre-attack",
|
||||
"external_id": "TA0002",
|
||||
"url": "https://attack.mitre.org/tactics/TA0002/",
|
||||
}
|
||||
],
|
||||
},
|
||||
# Revoked tactic — must be skipped
|
||||
{
|
||||
"type": "x-mitre-tactic",
|
||||
"id": "x-mitre-tactic--ta0099",
|
||||
"name": "Doomed",
|
||||
"x_mitre_shortname": "doomed",
|
||||
"revoked": True,
|
||||
"external_references": [
|
||||
{"source_name": "mitre-attack", "external_id": "TA0099"}
|
||||
],
|
||||
},
|
||||
# Technique T1059 covers both tactics
|
||||
{
|
||||
"type": "attack-pattern",
|
||||
"id": "attack-pattern--t1059",
|
||||
"name": "Command and Scripting Interpreter",
|
||||
"description": "Use shells.",
|
||||
"kill_chain_phases": [
|
||||
{"kill_chain_name": "mitre-attack", "phase_name": "initial-access"},
|
||||
{"kill_chain_name": "mitre-attack", "phase_name": "execution"},
|
||||
],
|
||||
"external_references": [
|
||||
{
|
||||
"source_name": "mitre-attack",
|
||||
"external_id": "T1059",
|
||||
"url": "https://attack.mitre.org/techniques/T1059/",
|
||||
}
|
||||
],
|
||||
},
|
||||
# Technique T1078 only initial-access
|
||||
{
|
||||
"type": "attack-pattern",
|
||||
"id": "attack-pattern--t1078",
|
||||
"name": "Valid Accounts",
|
||||
"description": "Use legit creds.",
|
||||
"kill_chain_phases": [
|
||||
{"kill_chain_name": "mitre-attack", "phase_name": "initial-access"},
|
||||
],
|
||||
"external_references": [
|
||||
{
|
||||
"source_name": "mitre-attack",
|
||||
"external_id": "T1078",
|
||||
"url": "https://attack.mitre.org/techniques/T1078/",
|
||||
}
|
||||
],
|
||||
},
|
||||
# Deprecated technique — skipped
|
||||
{
|
||||
"type": "attack-pattern",
|
||||
"id": "attack-pattern--t1190",
|
||||
"name": "Exploit Public-Facing Application",
|
||||
"x_mitre_deprecated": True,
|
||||
"external_references": [
|
||||
{"source_name": "mitre-attack", "external_id": "T1190"}
|
||||
],
|
||||
},
|
||||
# Sub-technique of T1059
|
||||
{
|
||||
"type": "attack-pattern",
|
||||
"id": "attack-pattern--t1059-001",
|
||||
"name": "PowerShell",
|
||||
"description": "Windows shell.",
|
||||
"x_mitre_is_subtechnique": True,
|
||||
"external_references": [
|
||||
{
|
||||
"source_name": "mitre-attack",
|
||||
"external_id": "T1059.001",
|
||||
"url": "https://attack.mitre.org/techniques/T1059/001/",
|
||||
}
|
||||
],
|
||||
},
|
||||
# Relationship attaching the sub to its parent
|
||||
{
|
||||
"type": "relationship",
|
||||
"id": "relationship--rel1",
|
||||
"relationship_type": "subtechnique-of",
|
||||
"source_ref": "attack-pattern--t1059-001",
|
||||
"target_ref": "attack-pattern--t1059",
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def fixture_bundle_path(tmp_path: Path) -> Path:
|
||||
path = tmp_path / "minimal-stix.json"
|
||||
path.write_text(json.dumps(MINIMAL_BUNDLE))
|
||||
return path
|
||||
|
||||
|
||||
# === Parser unit tests =======================================================
|
||||
|
||||
|
||||
def test_parser_extracts_active_objects(fixture_bundle_path):
|
||||
parsed = mitre_svc.parse_bundle(fixture_bundle_path)
|
||||
assert len(parsed.tactics) == 2 # TA0001 + TA0002 (TA0099 revoked)
|
||||
assert {t["external_id"] for t in parsed.tactics} == {"TA0001", "TA0002"}
|
||||
assert len(parsed.techniques) == 2 # T1059 + T1078 (T1190 deprecated)
|
||||
assert {t["external_id"] for t in parsed.techniques} == {"T1059", "T1078"}
|
||||
assert len(parsed.subtechniques) == 1
|
||||
sb = parsed.subtechniques[0]
|
||||
assert sb["external_id"] == "T1059.001"
|
||||
assert sb["parent_stix_id"] == "attack-pattern--t1059"
|
||||
|
||||
|
||||
# === Seed integration tests ==================================================
|
||||
|
||||
|
||||
def test_seed_against_fixture(app, fixture_bundle_path):
|
||||
result = mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
|
||||
assert result.tactics_upserted == 2
|
||||
assert result.techniques_upserted == 2
|
||||
assert result.subtechniques_upserted == 1
|
||||
assert result.subtechniques_skipped_orphan == 0
|
||||
assert result.technique_tactic_links == 3 # T1059→TA0001, T1059→TA0002, T1078→TA0001
|
||||
|
||||
|
||||
def test_seed_is_idempotent(app, fixture_bundle_path):
|
||||
"""Running twice yields the same row counts and no SQL errors."""
|
||||
first = mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
|
||||
second = mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
|
||||
assert (first.tactics_upserted, first.techniques_upserted, first.subtechniques_upserted) == (
|
||||
second.tactics_upserted,
|
||||
second.techniques_upserted,
|
||||
second.subtechniques_upserted,
|
||||
)
|
||||
|
||||
|
||||
def test_seed_persists_setting(app, fixture_bundle_path):
|
||||
"""settings table records the last sync timestamp + source URL."""
|
||||
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
|
||||
status = mitre_svc.read_status()
|
||||
assert status["last_sync"] is not None
|
||||
# We seeded from a local path so version is None and source_url is the path string.
|
||||
assert status["source_url"] == str(fixture_bundle_path)
|
||||
assert status["version"] is None # only set when source == MITRE_DEFAULT_URL
|
||||
|
||||
|
||||
def test_checksum_mismatch_aborts(tmp_path):
|
||||
"""A wrong sha256 triggers MitreChecksumMismatch and skips DB writes."""
|
||||
path = tmp_path / "tiny.json"
|
||||
path.write_text(json.dumps(MINIMAL_BUNDLE))
|
||||
# Force the URL path so download() is invoked. We mock by passing a file:// URL.
|
||||
# Simpler: call _download() directly with a bogus hash.
|
||||
bogus = "0" * 64
|
||||
with pytest.raises(mitre_svc.MitreChecksumMismatch):
|
||||
mitre_svc._download(
|
||||
f"file://{path}", tmp_path / "out.json", expected_sha256=bogus
|
||||
)
|
||||
|
||||
|
||||
# === API endpoint tests ======================================================
|
||||
|
||||
|
||||
def test_list_tactics_requires_auth(app, fixture_bundle_path):
|
||||
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
|
||||
with app.test_client() as c:
|
||||
r = c.get("/api/v1/mitre/tactics")
|
||||
assert r.status_code == 401
|
||||
|
||||
|
||||
def test_list_tactics_returns_seeded(app, admin_credentials, fixture_bundle_path):
|
||||
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
|
||||
with app.test_client() as c:
|
||||
access = _login(c, admin_credentials["email"], admin_credentials["password"])
|
||||
r = c.get(
|
||||
"/api/v1/mitre/tactics", headers={"Authorization": f"Bearer {access}"}
|
||||
)
|
||||
assert r.status_code == 200
|
||||
body = r.get_json()
|
||||
assert body["total"] == 2
|
||||
ids = [t["external_id"] for t in body["items"]]
|
||||
assert "TA0001" in ids and "TA0002" in ids
|
||||
|
||||
|
||||
def test_filter_techniques_by_tactic(app, admin_credentials, fixture_bundle_path):
|
||||
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
|
||||
with app.test_client() as c:
|
||||
access = _login(c, admin_credentials["email"], admin_credentials["password"])
|
||||
r = c.get(
|
||||
"/api/v1/mitre/techniques?tactic=TA0002",
|
||||
headers={"Authorization": f"Bearer {access}"},
|
||||
)
|
||||
assert r.status_code == 200
|
||||
body = r.get_json()
|
||||
# Only T1059 covers TA0002 (execution); T1078 covers initial-access only.
|
||||
ext_ids = [t["external_id"] for t in body["items"]]
|
||||
assert ext_ids == ["T1059"]
|
||||
|
||||
|
||||
def test_subtechniques_listed_under_parent(app, admin_credentials, fixture_bundle_path):
|
||||
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
|
||||
with app.test_client() as c:
|
||||
access = _login(c, admin_credentials["email"], admin_credentials["password"])
|
||||
r = c.get(
|
||||
"/api/v1/mitre/subtechniques?technique=T1059",
|
||||
headers={"Authorization": f"Bearer {access}"},
|
||||
)
|
||||
assert r.status_code == 200
|
||||
body = r.get_json()
|
||||
ext_ids = [t["external_id"] for t in body["items"]]
|
||||
assert ext_ids == ["T1059.001"]
|
||||
|
||||
|
||||
def test_status_endpoint(app, admin_credentials, fixture_bundle_path):
|
||||
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
|
||||
with app.test_client() as c:
|
||||
access = _login(c, admin_credentials["email"], admin_credentials["password"])
|
||||
r = c.get("/api/v1/mitre/status", headers={"Authorization": f"Bearer {access}"})
|
||||
assert r.status_code == 200
|
||||
body = r.get_json()
|
||||
assert body["last_sync"] is not None
|
||||
assert body["default_url"].startswith("https://")
|
||||
assert body["default_version"]
|
||||
|
||||
|
||||
def test_sync_endpoint_requires_perm(app, admin_credentials, fixture_bundle_path):
|
||||
"""A non-admin without mitre.sync gets 403."""
|
||||
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
|
||||
with app.test_client() as c:
|
||||
# Bootstrap a no-perm user via invitation.
|
||||
admin_access = _login(c, admin_credentials["email"], admin_credentials["password"])
|
||||
eve_email = _unique_email("eve")
|
||||
inv = c.post(
|
||||
"/api/v1/invitations",
|
||||
headers={"Authorization": f"Bearer {admin_access}"},
|
||||
json={"email_hint": eve_email},
|
||||
)
|
||||
token = inv.get_json()["token"]
|
||||
c.post(
|
||||
f"/api/v1/invitations/accept/{token}",
|
||||
json={"email": eve_email, "password": "EvePass1234!"},
|
||||
)
|
||||
eve_access = _login(c, eve_email, "EvePass1234!")
|
||||
r = c.post(
|
||||
"/api/v1/mitre/sync", headers={"Authorization": f"Bearer {eve_access}"}
|
||||
)
|
||||
assert r.status_code == 403
|
||||
|
||||
|
||||
def test_search_filter_on_name(app, admin_credentials, fixture_bundle_path):
|
||||
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
|
||||
with app.test_client() as c:
|
||||
access = _login(c, admin_credentials["email"], admin_credentials["password"])
|
||||
r = c.get(
|
||||
"/api/v1/mitre/techniques?q=valid",
|
||||
headers={"Authorization": f"Bearer {access}"},
|
||||
)
|
||||
assert r.status_code == 200
|
||||
ext_ids = [t["external_id"] for t in r.get_json()["items"]]
|
||||
assert ext_ids == ["T1078"]
|
||||
Reference in New Issue
Block a user