Files
Metamorph/backend/app/services/mitre_seed.py
Knacky 63b48addc0 fix(m4): code-review pass — SSRF allowlist + advisory lock + typed contract
Six post-code-review fixes, applied before opening the PR per project
workflow (spec-review + code-review both gate the merge):

1. SSRF allowlist on `/mitre/sync`. Host must be in MITRE_ALLOWED_HOSTS
   (defaults to `raw.githubusercontent.com`, env-overridable). Closes "admin
   holding `mitre.sync` pivots api container at 169.254.169.254 / internal
   mirrors" via a typo'd URL. New `MitreSourceForbidden` → 400
   `source_forbidden`; checked at the top of `_download()` so it kicks in
   before any I/O.

2. `pg_advisory_xact_lock(hashtext('mitre.seed'))` at the top of the seed
   transaction. Two concurrent `/mitre/sync` requests now serialise across
   the DELETE+INSERT of `mitre_technique_tactics`; previously they could
   both wipe the M2M and one would fail the unique constraint on re-insert.

3. Typed SyncResult contract. Pydantic `SyncResultOut` on the Flask side
   `model_validate`s the dict before returning — single source of truth
   for the response shape, mirrored by a `MitreSyncResult` TS interface
   (next commit). The `as Record<string, unknown>` + `as { duration_ms }`
   cast in MitrePage is gone.

4. N+1 in dotted sub-technique fallback removed. Built
   `{external_id → technique_id}` once at function entry. Currently a
   no-op against MITRE official (0 orphans), but a latent footgun for
   partial / older bundles.

5. `SETTING_VERSION` cleared explicitly when `source != MITRE_DEFAULT_URL`.
   Previously it kept the stale pin label, so `/mitre/status` lied after
   a custom-URL re-sync.

6. `/mitre/sync` 500s no longer echo `str(e)` to the client — URLError /
   psycopg / Pydantic text now lives in the JSON log only. Public response
   stays `{"error": "internal_error"}`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 19:19:11 +02:00

516 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""MITRE ATT&CK Enterprise seed + sync.
Parses a STIX 2.1 bundle into the `mitre_*` tables. Idempotent: re-running
upserts on `external_id`, refreshes name/description/url, and re-applies the
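technique↔tactic mapping. Sub-techniques whose parent cannot be resolved
(neither through a `subtechnique-of` relationship nor through the dotted-ID
fallback, e.g. T1003.001 → T1003) are skipped (with a WARNING log).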
Defaults pin a specific Enterprise release (see `MITRE_DEFAULT_*`). The pin
is honored by the CLI (`flask metamorph seed-mitre`) and by the
`POST /mitre/sync` admin endpoint; both accept a `--source` / `source_url`
override for air-gapped operators.
The bundle is downloaded with `urllib.request` (stdlib — no extra dep) and
cached at `MITRE_BUNDLE_CACHE_PATH` (default `/data/mitre/<basename>.json`).
Pass an absolute path as `source` to bypass the network entirely.
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import re
import urllib.parse
import urllib.request
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable
from sqlalchemy import delete, select, text as sql_text
from app.db.session import session_scope
from app.models.mitre import (
MitreSubtechnique,
MitreTactic,
MitreTechnique,
MitreTechniqueTactic,
)
from app.models.setting import Setting
log = logging.getLogger("metamorph.mitre.seed")
# === Default pin =============================================================
#
# MITRE publishes versioned bundles at
# `https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/enterprise-attack/enterprise-attack-<X.Y>.json`.
# Update these three constants in lock-step when bumping the pin. The SHA256
# is verified against the downloaded bytes — a mismatch aborts the seed.
#
# Release label; persisted under the `mitre_version` setting when the seed
# ran against MITRE_DEFAULT_URL.
MITRE_VERSION = "19.0"
MITRE_DEFAULT_URL = (
    "https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/"
    "enterprise-attack/enterprise-attack-19.0.json"
)
# Expected SHA-256 (hex digest) of the bundle served at MITRE_DEFAULT_URL.
MITRE_DEFAULT_SHA256 = "df520ea0775a57db7bff760145b02fed89290802913e056b7ed5970b02f3626a"
# NOTE: despite the `_PATH` suffix this is the cache *directory* (env
# `MITRE_CACHE_DIR`); downloaded bundles are written inside it.
MITRE_BUNDLE_CACHE_PATH = Path(os.environ.get("MITRE_CACHE_DIR", "/data/mitre"))
# Timeout, in seconds, handed to the bundle download request.
MITRE_DOWNLOAD_TIMEOUT_SECONDS = 120
# Hosts authorised as a source for a MITRE sync. An admin holding `mitre.sync`
# could otherwise pivot the in-container HTTP fetch to internal services
# (169.254.169.254, db, internal mirrors). Override via the `MITRE_ALLOWED_HOSTS`
# env (comma-separated) when running against a private mirror.
def _parse_allowed_hosts(raw: str) -> frozenset[str]:
    """Parse a comma-separated host list into a normalized allowlist.

    Entries are stripped and lower-cased; empty entries are dropped. The
    lower-casing matters because the allowlist check compares against the
    lower-cased URL host, so a mixed-case entry would otherwise never match.
    """
    candidates = (part.strip().lower() for part in raw.split(","))
    return frozenset(host for host in candidates if host)


MITRE_ALLOWED_HOSTS: frozenset[str] = _parse_allowed_hosts(
    os.environ.get("MITRE_ALLOWED_HOSTS", "raw.githubusercontent.com")
)
# Settings keys used to expose the seed metadata to the operator UI/CLI.
SETTING_LAST_SYNC = "mitre_last_sync"
SETTING_VERSION = "mitre_version"
SETTING_SOURCE_URL = "mitre_source_url"
# `source_name` of the external reference that carries an object's ATT&CK id.
ATTACK_SOURCE_NAME = "mitre-attack"
# `kill_chain_name` whose phases are mapped to tactics by short name.
KILL_CHAIN_NAME = "mitre-attack"
class MitreSeedError(Exception):
    """Base class for every error raised by the MITRE seed/sync code."""


class MitreChecksumMismatch(MitreSeedError):
    """The downloaded bundle's SHA-256 differs from the expected digest."""


class MitreSourceForbidden(MitreSeedError):
    """The provided source URL points to a host outside the allowlist."""
@dataclass
class ParsedBundle:
    """Normalized content of one STIX bundle, ready for the SQL upserts."""

    # One dict per tactic.
    tactics: list[dict] = field(default_factory=list)
    # One dict per parent technique.
    techniques: list[dict] = field(default_factory=list)  # parent techniques
    # One dict per sub-technique.
    subtechniques: list[dict] = field(default_factory=list)
    # Map: subtechnique attack-pattern STIX id -> parent technique STIX id
    subtechnique_parents: dict[str, str] = field(default_factory=dict)
    # Bundle-level `spec_version`, when the bundle declares one.
    spec_version: str | None = None
@dataclass
class SeedResult:
    """Outcome of one seed run: row counters plus provenance and timing."""

    tactics_upserted: int
    techniques_upserted: int
    subtechniques_upserted: int
    subtechniques_skipped_orphan: int
    technique_tactic_links: int
    version: str | None
    source: str
    started_at: datetime
    finished_at: datetime

    def as_dict(self) -> dict:
        """Return a JSON-friendly dict of the result.

        Timestamps are rendered with `isoformat()`. `duration_ms` is the whole
        number of milliseconds between `started_at` and `finished_at`,
        truncated toward zero.
        """
        # Function-scope import: the module itself only imports
        # `datetime` / `timezone` from the `datetime` package.
        from datetime import timedelta

        # Divide timedeltas directly instead of `total_seconds() * 1000`:
        # the float product can land just below the true value
        # (1.001 s * 1000 -> 1000.9999999999999) and `int()` then drops a
        # whole millisecond.
        elapsed = self.finished_at - self.started_at
        duration_ms = int(elapsed / timedelta(milliseconds=1))
        return {
            "tactics_upserted": self.tactics_upserted,
            "techniques_upserted": self.techniques_upserted,
            "subtechniques_upserted": self.subtechniques_upserted,
            "subtechniques_skipped_orphan": self.subtechniques_skipped_orphan,
            "technique_tactic_links": self.technique_tactic_links,
            "version": self.version,
            "source": self.source,
            "started_at": self.started_at.isoformat(),
            "finished_at": self.finished_at.isoformat(),
            "duration_ms": duration_ms,
        }
# === I/O =====================================================================
def _is_url(source: str) -> bool:
    """Tell whether `source` is an http(s) URL rather than a local path."""
    return urllib.parse.urlsplit(source).scheme in {"http", "https"}
def _ensure_host_allowed(url: str) -> None:
    """Reject URLs whose scheme or host is not acceptable for a sync.

    Raises:
        MitreSourceForbidden: the scheme is not http/https, or the
            lower-cased host is not a member of `MITRE_ALLOWED_HOSTS`.
    """
    parts = urllib.parse.urlparse(url)
    scheme = parts.scheme
    if scheme != "http" and scheme != "https":
        raise MitreSourceForbidden(f"unsupported URL scheme: {scheme!r}")
    # A URL without a host is compared as the empty string.
    hostname = parts.hostname
    host = hostname.lower() if hostname else ""
    if host in MITRE_ALLOWED_HOSTS:
        return
    raise MitreSourceForbidden(
        f"host {host!r} not in MITRE_ALLOWED_HOSTS={sorted(MITRE_ALLOWED_HOSTS)}"
    )
def _sha256_of(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at `path`, read in 64 KiB chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while block := stream.read(1 << 16):
            digest.update(block)
    return digest.hexdigest()
class _AllowlistRedirectHandler(urllib.request.HTTPRedirectHandler):
    """Redirect handler that re-applies the host allowlist on every hop."""

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        # Without this, an allowlisted host could answer with a 3xx pointing
        # at an internal address and the fetch would silently follow it.
        _ensure_host_allowed(newurl)
        return super().redirect_request(req, fp, code, msg, headers, newurl)


def _download(url: str, dest: Path, *, expected_sha256: str | None = None) -> Path:
    """Download `url` to `dest` and return `dest`.

    The body is streamed into a sibling `<dest>.part` file, which is moved
    over `dest` only after the transfer (and the optional checksum) succeeds.
    The `.part` file is removed on any failure.

    Args:
        url: http(s) URL; its host, and the host of every redirect target,
            must pass `_ensure_host_allowed`.
        dest: final location of the downloaded file; parent directories are
            created as needed.
        expected_sha256: optional hex SHA-256 of the payload. Compared
            case-insensitively; a falsy value skips verification.

    Raises:
        MitreSourceForbidden: the URL or a redirect target is not allowlisted.
        MitreChecksumMismatch: the downloaded bytes do not match
            `expected_sha256`.
    """
    # Checked before any I/O so a forbidden source touches neither the
    # filesystem nor the network.
    _ensure_host_allowed(url)
    dest.parent.mkdir(parents=True, exist_ok=True)
    tmp = dest.with_suffix(dest.suffix + ".part")
    log.info("metamorph.mitre.download.start", extra={"url": url, "dest": str(dest)})
    req = urllib.request.Request(url, headers={"User-Agent": "metamorph-mitre-seed/1.0"})
    opener = urllib.request.build_opener(_AllowlistRedirectHandler)
    try:
        with opener.open(req, timeout=MITRE_DOWNLOAD_TIMEOUT_SECONDS) as resp:
            with tmp.open("wb") as f:
                for chunk in iter(lambda: resp.read(1 << 16), b""):
                    f.write(chunk)
        if expected_sha256:
            actual = _sha256_of(tmp)
            # hexdigest() is lower-case; tolerate an upper-case / padded pin.
            if actual != expected_sha256.strip().lower():
                raise MitreChecksumMismatch(
                    f"sha256 mismatch for {url}: expected {expected_sha256}, got {actual}"
                )
        tmp.replace(dest)
    finally:
        # No-op after a successful replace(); otherwise drops the partial or
        # rejected download instead of leaving it in the cache directory.
        tmp.unlink(missing_ok=True)
    log.info(
        "metamorph.mitre.download.done",
        extra={"url": url, "bytes": dest.stat().st_size},
    )
    return dest
def resolve_source_to_path(
    source: str | Path | None,
    *,
    cache_dir: Path = MITRE_BUNDLE_CACHE_PATH,
    expected_sha256: str | None = MITRE_DEFAULT_SHA256,
) -> tuple[Path, str]:
    """Return `(path, source_label)` for the bundle designated by `source`.

    Downloads into `cache_dir` if `source` is a URL; otherwise treats it as a
    local file. `None` → default URL. `expected_sha256` is only used for
    downloads. `source_label` is what we persist in
    `settings.mitre_source_url`.

    Raises:
        MitreSeedError: the local path does not exist or is not a regular file.
    """
    if source is None:
        source = MITRE_DEFAULT_URL
    source_str = str(source)
    if _is_url(source_str):
        # Name the cache file after the last segment of the URL *path* only,
        # so a query string or fragment (`...json?token=a/b`) neither leaks
        # into the file name nor hijacks the split on "/".
        url_path = urllib.parse.urlparse(source_str).path
        basename = url_path.rsplit("/", 1)[-1]
        if basename in ("", ".", ".."):
            # No usable file name in the URL (or one that would point at a
            # directory): fall back to a fixed name inside the cache dir.
            basename = "enterprise-attack.json"
        dest = cache_dir / basename
        _download(source_str, dest, expected_sha256=expected_sha256)
        return dest, source_str
    path = Path(source_str)
    if not path.exists():
        raise MitreSeedError(f"source path does not exist: {path}")
    if not path.is_file():
        # Fail here with a seed error rather than later when opening it.
        raise MitreSeedError(f"source path is not a regular file: {path}")
    return path, str(path)
# === STIX parsing ============================================================
def _attack_ref(obj: dict) -> dict | None:
    """Return the first ATT&CK external reference of `obj` that carries an
    `external_id`, or `None` when there is none."""
    matching = (
        ref
        for ref in (obj.get("external_references") or ())
        if ref.get("source_name") == ATTACK_SOURCE_NAME and ref.get("external_id")
    )
    return next(matching, None)
def parse_bundle(path: Path) -> ParsedBundle:
    """Load the STIX bundle at `path` and normalize it for the SQL upserts.

    Revoked or deprecated objects are ignored, as are tactics and
    attack-patterns without an ATT&CK external reference.
    """
    with path.open("r", encoding="utf-8") as fh:
        bundle = json.load(fh)
    stix_objects = bundle.get("objects") or []
    result = ParsedBundle(spec_version=bundle.get("spec_version"))

    # Pass 1: index the sub-technique → parent relationships first, so the
    # lookup below does not depend on object order within the bundle.
    parent_of = {
        rel["source_ref"]: rel["target_ref"]
        for rel in stix_objects
        if rel.get("type") == "relationship"
        and rel.get("relationship_type") == "subtechnique-of"
        and not rel.get("revoked")
    }
    result.subtechnique_parents = parent_of

    # Pass 2: collect tactics, techniques and sub-techniques.
    for obj in stix_objects:
        if obj.get("revoked") or obj.get("x_mitre_deprecated"):
            continue
        obj_type = obj.get("type")
        if obj_type not in ("x-mitre-tactic", "attack-pattern"):
            continue
        attack = _attack_ref(obj)
        if not attack:
            continue

        if obj_type == "x-mitre-tactic":
            result.tactics.append(
                {
                    "external_id": attack["external_id"],
                    "name": obj.get("name") or "",
                    "short_name": obj.get("x_mitre_shortname") or "",
                    "description": obj.get("description"),
                    "url": attack.get("url"),
                }
            )
            continue

        # attack-pattern: fields shared by techniques and sub-techniques.
        shared = {
            "external_id": attack["external_id"],
            "name": obj.get("name") or "",
            "description": obj.get("description"),
            "url": attack.get("url"),
        }
        if obj.get("x_mitre_is_subtechnique"):
            # parent_stix_id is None when no relationship names a parent.
            parent_stix_id = parent_of.get(obj["id"])
            result.subtechniques.append(
                {**shared, "stix_id": obj["id"], "parent_stix_id": parent_stix_id}
            )
            continue

        # Keep the kill-chain phase names; they are matched against tactic
        # short names when the technique↔tactic links are built.
        phase_names = []
        for phase in obj.get("kill_chain_phases") or ():
            if phase.get("kill_chain_name") == KILL_CHAIN_NAME and phase.get("phase_name"):
                phase_names.append(phase.get("phase_name"))
        result.techniques.append(
            {**shared, "stix_id": obj["id"], "phase_names": phase_names}
        )
    return result
# === DB upserts ==============================================================
def _upsert_tactics(s, tactics: Iterable[dict]) -> tuple[dict, int]:
    """Insert or refresh every tactic in `tactics`, keyed on `external_id`.

    Returns:
        `(short_name → tactic_id, n_upserted)`, where `n_upserted` counts
        every processed tactic, whether inserted or updated.
    """
    known = {row.external_id: row for row in s.scalars(select(MitreTactic)).all()}
    id_by_short_name: dict = {}
    applied = 0
    for item in tactics:
        record = known.get(item["external_id"])
        if record is not None:
            record.short_name = item["short_name"]
            record.name = item["name"]
            record.description = item["description"]
            record.url = item["url"]
        else:
            record = MitreTactic(
                external_id=item["external_id"],
                short_name=item["short_name"],
                name=item["name"],
                description=item["description"],
                url=item["url"],
            )
            s.add(record)
            # Flush now so `record.id` can be read just below.
            s.flush()
        applied += 1
        id_by_short_name[item["short_name"]] = record.id
    return id_by_short_name, applied
def _upsert_techniques(
    s, techniques: Iterable[dict], short_to_tactic_id: dict
) -> tuple[dict, int, int]:
    """Insert or refresh parent techniques and rebuild their tactic links.

    Returns:
        `(stix_id → technique_id, n_upserted, n_links)`.
    """
    known = {row.external_id: row for row in s.scalars(select(MitreTechnique)).all()}
    id_by_stix: dict = {}
    applied = 0
    links = 0

    # The technique↔tactic mapping is rebuilt from scratch: wipe every link,
    # then re-add the ones the bundle declares.
    s.execute(delete(MitreTechniqueTactic))

    for item in techniques:
        record = known.get(item["external_id"])
        if record is not None:
            record.name = item["name"]
            record.description = item["description"]
            record.url = item["url"]
        else:
            record = MitreTechnique(
                external_id=item["external_id"],
                name=item["name"],
                description=item["description"],
                url=item["url"],
            )
            s.add(record)
            # Flush now so `record.id` can be read just below.
            s.flush()
        applied += 1
        id_by_stix[item["stix_id"]] = record.id

        for phase in item.get("phase_names", []):
            tactic_id = short_to_tactic_id.get(phase)
            if tactic_id is not None:
                s.add(MitreTechniqueTactic(technique_id=record.id, tactic_id=tactic_id))
                links += 1
                continue
            # Phase names a tactic the bundle did not provide: log and skip.
            log.warning(
                "metamorph.mitre.unknown_tactic_phase",
                extra={"technique": item["external_id"], "phase": phase},
            )
    return id_by_stix, applied, links
def _upsert_subtechniques(
    s,
    subtechniques: Iterable[dict],
    stix_to_tech_id: dict,
) -> tuple[int, int]:
    """Insert or refresh sub-techniques, attaching each to its parent.

    Returns:
        `(n_upserted, n_skipped_orphans)`. `n_upserted` counts rows whose
        state was applied (INSERT or UPDATE); orphans are sub-techniques
        whose parent could not be resolved.
    """
    known = {row.external_id: row for row in s.scalars(select(MitreSubtechnique)).all()}
    # Loaded once up front so the dotted-id fallback needs no per-row SELECT.
    technique_id_by_external: dict[str, object] = {
        tech.external_id: tech.id
        for tech in s.scalars(select(MitreTechnique)).all()
    }
    dotted_id = re.compile(r"^(T\d+)\.\d+$")
    applied = 0
    skipped = 0

    for item in subtechniques:
        parent_stix = item.get("parent_stix_id")
        parent_id = stix_to_tech_id.get(parent_stix) if parent_stix else None
        external_id = item["external_id"]

        if parent_id is None:
            # No usable relationship: derive the parent from the dotted
            # external id (T1003.001 → T1003).
            dotted = dotted_id.match(external_id)
            if dotted:
                parent_id = technique_id_by_external.get(dotted.group(1))

        if parent_id is None:
            log.warning(
                "metamorph.mitre.orphan_subtechnique",
                extra={"subtechnique": external_id},
            )
            skipped += 1
            continue

        record = known.get(external_id)
        if record is not None:
            record.name = item["name"]
            record.description = item["description"]
            record.url = item["url"]
            record.technique_id = parent_id
        else:
            s.add(
                MitreSubtechnique(
                    external_id=external_id,
                    name=item["name"],
                    description=item["description"],
                    url=item["url"],
                    technique_id=parent_id,
                )
            )
        applied += 1
    return applied, skipped
def _upsert_setting(s, key: str, value: object) -> None:
    """Store `value` under `key`, creating the setting row when it is absent."""
    current = s.scalar(select(Setting).where(Setting.key == key))
    if current is not None:
        current.value = value
        return
    s.add(Setting(key=key, value=value))
# === Entry point =============================================================
def seed_mitre(
    *,
    source: str | Path | None = None,
    expected_sha256: str | None = MITRE_DEFAULT_SHA256,
    cache_dir: Path = MITRE_BUNDLE_CACHE_PATH,
    allow_unverified: bool = False,
) -> SeedResult:
    """Seed the MITRE tables from a bundle URL or a local bundle file.

    A URL is downloaded (and verified when a digest applies) before parsing;
    a local path is parsed directly.

    A custom URL (anything other than `MITRE_DEFAULT_URL`) must come with its
    own `expected_sha256`, or with `allow_unverified=True` as an explicit
    opt-out. This prevents a silent integrity bypass when the sync is pointed
    at a typo'd or attacker-controlled mirror.

    Raises:
        MitreSeedError: custom URL without a digest and without
            `allow_unverified=True`.
    """
    started_at = datetime.now(tz=timezone.utc)

    is_custom_url = (
        source is not None
        and _is_url(str(source))
        and str(source) != MITRE_DEFAULT_URL
    )
    # The default digest cannot match a custom URL, so leaving it in place
    # counts as "no digest supplied": demand an explicit decision.
    if is_custom_url and (
        expected_sha256 is None or expected_sha256 == MITRE_DEFAULT_SHA256
    ):
        if not allow_unverified:
            raise MitreSeedError(
                "custom URL requires an expected_sha256 (or allow_unverified=True)"
            )
        expected_sha256 = None

    path, source_label = resolve_source_to_path(
        source, cache_dir=cache_dir, expected_sha256=expected_sha256
    )
    parsed = parse_bundle(path)
    parsed_counts = {
        "tactics": len(parsed.tactics),
        "techniques": len(parsed.techniques),
        "subtechniques": len(parsed.subtechniques),
        "spec_version": parsed.spec_version,
    }
    log.info("metamorph.mitre.parsed", extra=parsed_counts)

    # The pinned version label only applies to the default URL; any other
    # source clears it so the stored status never shows a stale version.
    version = MITRE_VERSION if source_label == MITRE_DEFAULT_URL else None

    with session_scope() as s:
        # Transaction-scoped advisory lock: a concurrent sync blocks here
        # until this transaction ends, so two runs never interleave the
        # DELETE+INSERT of the technique↔tactic links.
        s.execute(sql_text("SELECT pg_advisory_xact_lock(hashtext('mitre.seed'))"))
        short_to_tactic_id, n_tactics = _upsert_tactics(s, parsed.tactics)
        stix_to_tech_id, n_techs, n_links = _upsert_techniques(
            s, parsed.techniques, short_to_tactic_id
        )
        n_subs, n_orphan = _upsert_subtechniques(
            s, parsed.subtechniques, stix_to_tech_id
        )
        finished_at = datetime.now(tz=timezone.utc)
        for key, value in (
            (SETTING_LAST_SYNC, finished_at.isoformat()),
            (SETTING_VERSION, version),
            (SETTING_SOURCE_URL, source_label),
        ):
            _upsert_setting(s, key, value)

    result = SeedResult(
        tactics_upserted=n_tactics,
        techniques_upserted=n_techs,
        subtechniques_upserted=n_subs,
        subtechniques_skipped_orphan=n_orphan,
        technique_tactic_links=n_links,
        version=version,
        source=source_label,
        started_at=started_at,
        finished_at=finished_at,
    )
    log.info("metamorph.mitre.seed_completed", extra=result.as_dict())
    return result
def read_status() -> dict:
    """Return the persisted seed metadata for `GET /mitre/status`.

    Settings that were never written are reported as `None`.
    """
    wanted = {SETTING_LAST_SYNC, SETTING_VERSION, SETTING_SOURCE_URL}
    stored = dict.fromkeys(wanted)
    with session_scope() as s:
        rows = s.scalars(select(Setting).where(Setting.key.in_(wanted))).all()
        stored.update((row.key, row.value) for row in rows)
    return {
        "last_sync": stored[SETTING_LAST_SYNC],
        "version": stored[SETTING_VERSION],
        "source_url": stored[SETTING_SOURCE_URL],
        "default_url": MITRE_DEFAULT_URL,
        "default_version": MITRE_VERSION,
    }