Six post-code-review fixes, applied before opening the PR per project
workflow (spec-review + code-review both gate the merge):
1. SSRF allowlist on `/mitre/sync`. The source host must be in
`MITRE_ALLOWED_HOSTS` (defaults to `raw.githubusercontent.com`,
env-overridable). Closes the hole where an admin holding `mitre.sync` could
pivot the api container at 169.254.169.254 / internal mirrors, whether on
purpose or via a typo'd URL. New `MitreSourceForbidden` → 400
`source_forbidden`; the check runs at the top of `_download()`, so it kicks
in before any I/O.
2. `pg_advisory_xact_lock(hashtext('mitre.seed'))` at the top of the seed
transaction. Two concurrent `/mitre/sync` requests now serialise across
the DELETE+INSERT of `mitre_technique_tactics`; previously they could
both wipe the M2M and one would fail the unique constraint on re-insert.
3. Typed SyncResult contract. Pydantic `SyncResultOut` on the Flask side
`model_validate`s the dict before returning — single source of truth
for the response shape, mirrored by a `MitreSyncResult` TS interface
(next commit). The `as Record<string, unknown>` + `as { duration_ms }`
cast in MitrePage is gone.
4. N+1 in dotted sub-technique fallback removed. Built
`{external_id → technique_id}` once at function entry. Currently a
no-op against MITRE official (0 orphans), but a latent footgun for
partial / older bundles.
5. `SETTING_VERSION` cleared explicitly when `source != MITRE_DEFAULT_URL`.
Previously it kept the stale pin label, so `/mitre/status` lied after
a custom-URL re-sync.
6. `/mitre/sync` 500s no longer echo `str(e)` to the client — URLError /
psycopg / Pydantic text now lives in the JSON log only. Public response
stays `{"error": "internal_error"}`.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
516 lines
18 KiB
Python
516 lines
18 KiB
Python
"""MITRE ATT&CK Enterprise seed + sync.
|
||
|
||
Parses a STIX 2.1 bundle into the `mitre_*` tables. Idempotent: re-running
upserts on `external_id`, refreshes name/description/url, and re-applies the
technique↔tactic mapping. Sub-techniques whose parent is missing in the
bundle are skipped (with a WARNING log).

Defaults pin a specific Enterprise release (see `MITRE_DEFAULT_*`). The pin
is honored by the CLI (`flask metamorph seed-mitre`) and by the
`POST /mitre/sync` admin endpoint; both accept a `--source` / `source_url`
override for air-gapped operators.

The bundle is downloaded with `urllib.request` (stdlib — no extra dep) and
cached under `MITRE_BUNDLE_CACHE_PATH` (default `/data/mitre/`, overridable
via the `MITRE_CACHE_DIR` env), using the URL's basename as the file name.
Pass an absolute path as `source` to bypass the network entirely.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import hashlib
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import urllib.parse
|
||
import urllib.request
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Iterable
|
||
|
||
from sqlalchemy import delete, select, text as sql_text
|
||
|
||
from app.db.session import session_scope
|
||
from app.models.mitre import (
|
||
MitreSubtechnique,
|
||
MitreTactic,
|
||
MitreTechnique,
|
||
MitreTechniqueTactic,
|
||
)
|
||
from app.models.setting import Setting
|
||
|
||
log = logging.getLogger("metamorph.mitre.seed")

# === Default pin =============================================================
#
# MITRE publishes versioned bundles at
# `https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/enterprise-attack/enterprise-attack-<X.Y>.json`.
# The default URL is derived from MITRE_VERSION so the two cannot drift apart;
# when bumping the pin, update MITRE_VERSION and MITRE_DEFAULT_SHA256 in
# lock-step. The SHA256 is verified against the downloaded bytes — a mismatch
# aborts the seed.
#
MITRE_VERSION = "19.0"
MITRE_DEFAULT_URL = (
    "https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/"
    f"enterprise-attack/enterprise-attack-{MITRE_VERSION}.json"
)
MITRE_DEFAULT_SHA256 = "df520ea0775a57db7bff760145b02fed89290802913e056b7ed5970b02f3626a"

# Directory the downloaded bundles are cached in (env: MITRE_CACHE_DIR).
MITRE_BUNDLE_CACHE_PATH = Path(os.environ.get("MITRE_CACHE_DIR", "/data/mitre"))
# Timeout, in seconds, handed to the HTTP fetch of the bundle.
MITRE_DOWNLOAD_TIMEOUT_SECONDS = 120
|
||
|
||
def _parse_allowed_hosts(raw: str) -> frozenset[str]:
    """Turn a comma-separated host list into a normalized allowlist.

    Entries are stripped and lowercased (the URL host is lowercased before
    it is compared against the allowlist, so a mixed-case entry would
    otherwise never match); empty entries are dropped.
    """
    return frozenset(
        entry.strip().lower() for entry in raw.split(",") if entry.strip()
    )


# Hosts authorised as a source for a MITRE sync. An admin holding `mitre.sync`
# could otherwise pivot the in-container HTTP fetch to internal services
# (169.254.169.254, db, internal mirrors). Override via the `MITRE_ALLOWED_HOSTS`
# env (comma-separated) when running against a private mirror.
MITRE_ALLOWED_HOSTS: frozenset[str] = _parse_allowed_hosts(
    os.environ.get("MITRE_ALLOWED_HOSTS", "raw.githubusercontent.com")
)
|
||
|
||
# Settings keys used to expose the seed metadata to the operator UI/CLI.
# Written at the end of a seed and read back by `read_status()`.
SETTING_LAST_SYNC = "mitre_last_sync"
SETTING_VERSION = "mitre_version"
SETTING_SOURCE_URL = "mitre_source_url"

# `source_name` an external reference must carry to be treated as the ATT&CK id.
ATTACK_SOURCE_NAME = "mitre-attack"
# `kill_chain_name` a kill-chain phase must carry to be mapped to a tactic.
KILL_CHAIN_NAME = "mitre-attack"
|
||
|
||
|
||
class MitreSeedError(Exception):
    """Base class for the failures raised by the MITRE seed/sync code."""


class MitreChecksumMismatch(MitreSeedError):
    """The downloaded bundle's SHA-256 differs from the expected digest."""


class MitreSourceForbidden(MitreSeedError):
    """The provided source URL points to a host outside the allowlist."""
|
||
|
||
|
||
@dataclass
class ParsedBundle:
    """Normalized view of a STIX bundle, as produced by `parse_bundle`."""

    # Tactic dicts.
    tactics: list[dict] = field(default_factory=list)
    # Parent-technique dicts (attack-patterns that are not sub-techniques).
    techniques: list[dict] = field(default_factory=list)
    # Sub-technique dicts.
    subtechniques: list[dict] = field(default_factory=list)
    # Sub-technique attack-pattern STIX id -> parent technique STIX id.
    subtechnique_parents: dict[str, str] = field(default_factory=dict)
    # `spec_version` declared by the bundle, when present.
    spec_version: str | None = None
|
||
|
||
|
||
@dataclass
class SeedResult:
    """Counters and metadata describing one completed seed run."""

    tactics_upserted: int
    techniques_upserted: int
    subtechniques_upserted: int
    subtechniques_skipped_orphan: int
    technique_tactic_links: int
    version: str | None
    source: str
    started_at: datetime
    finished_at: datetime

    def as_dict(self) -> dict:
        """Return a plain-dict form: timestamps as ISO strings plus `duration_ms`."""
        elapsed = self.finished_at - self.started_at
        # Fields copied verbatim, in the order they appear in the payload.
        passthrough = (
            "tactics_upserted",
            "techniques_upserted",
            "subtechniques_upserted",
            "subtechniques_skipped_orphan",
            "technique_tactic_links",
            "version",
            "source",
        )
        payload: dict = {name: getattr(self, name) for name in passthrough}
        payload["started_at"] = self.started_at.isoformat()
        payload["finished_at"] = self.finished_at.isoformat()
        payload["duration_ms"] = int(elapsed.total_seconds() * 1000)
        return payload
|
||
|
||
|
||
# === I/O =====================================================================
|
||
|
||
|
||
def _is_url(source: str) -> bool:
    """Return True when `source` parses with an `http` or `https` scheme."""
    scheme = urllib.parse.urlparse(source).scheme
    return scheme == "http" or scheme == "https"
|
||
|
||
|
||
def _ensure_host_allowed(url: str) -> None:
    """Raise MitreSourceForbidden if the URL targets a non-allowlisted host.

    Raises:
        MitreSourceForbidden: the URL cannot be parsed, its scheme is not
            `http`/`https`, or its host is not in `MITRE_ALLOWED_HOSTS`.
    """
    try:
        parsed = urllib.parse.urlparse(url)
        host = (parsed.hostname or "").lower()
    except ValueError as exc:
        # urllib raises ValueError on malformed netlocs (e.g. a broken IPv6
        # literal); report it as a forbidden source instead of letting an
        # unrelated exception type escape.
        raise MitreSourceForbidden(f"malformed URL: {url!r}") from exc
    if parsed.scheme not in ("http", "https"):
        raise MitreSourceForbidden(f"unsupported URL scheme: {parsed.scheme!r}")
    # The host is lowercased above, so compare against lowercased entries too;
    # otherwise a mixed-case allowlist entry could never match.
    allowed = {entry.strip().lower() for entry in MITRE_ALLOWED_HOSTS}
    if host not in allowed:
        raise MitreSourceForbidden(
            f"host {host!r} not in MITRE_ALLOWED_HOSTS={sorted(MITRE_ALLOWED_HOSTS)}"
        )
|
||
|
||
|
||
def _sha256_of(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at `path`, read in 64 KiB chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while block_bytes := stream.read(1 << 16):
            digest.update(block_bytes)
    return digest.hexdigest()
|
||
|
||
|
||
class _AllowlistRedirectHandler(urllib.request.HTTPRedirectHandler):
    """Redirect handler that re-applies the host allowlist to every hop."""

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        # urllib follows 3xx responses transparently; without this check an
        # allowlisted host could bounce the fetch to a non-allowlisted one.
        _ensure_host_allowed(newurl)
        return super().redirect_request(req, fp, code, msg, headers, newurl)


def _download(url: str, dest: Path, *, expected_sha256: str | None = None) -> Path:
    """Fetch `url` into `dest` via a `.part` temp file and return `dest`.

    The host allowlist is enforced before any I/O and again on every
    redirect. The payload is written next to `dest` with a `.part` suffix
    and only renamed over `dest` once it is complete (and, when
    `expected_sha256` is given, verified).

    Args:
        url: http(s) URL of the bundle.
        dest: final location of the downloaded file; parent dirs are created.
        expected_sha256: hex digest the payload must match; a falsy value
            skips verification. Compared case-insensitively.

    Raises:
        MitreSourceForbidden: `url` (or a redirect target) is not allowlisted.
        MitreChecksumMismatch: the payload digest differs from `expected_sha256`.
    """
    _ensure_host_allowed(url)
    dest.parent.mkdir(parents=True, exist_ok=True)
    tmp = dest.with_suffix(dest.suffix + ".part")
    log.info("metamorph.mitre.download.start", extra={"url": url, "dest": str(dest)})
    req = urllib.request.Request(url, headers={"User-Agent": "metamorph-mitre-seed/1.0"})
    opener = urllib.request.build_opener(_AllowlistRedirectHandler)
    try:
        with opener.open(req, timeout=MITRE_DOWNLOAD_TIMEOUT_SECONDS) as resp:
            with tmp.open("wb") as f:
                for chunk in iter(lambda: resp.read(1 << 16), b""):
                    f.write(chunk)
        if expected_sha256:
            actual = _sha256_of(tmp)
            # hexdigest() is lowercase; tolerate an upper-case or padded pin.
            if actual != expected_sha256.strip().lower():
                raise MitreChecksumMismatch(
                    f"sha256 mismatch for {url}: expected {expected_sha256}, got {actual}"
                )
        tmp.replace(dest)
    except BaseException:
        # Never leave a partial or unverified `.part` file behind.
        tmp.unlink(missing_ok=True)
        raise
    log.info(
        "metamorph.mitre.download.done",
        extra={"url": url, "bytes": dest.stat().st_size},
    )
    return dest
|
||
|
||
|
||
def resolve_source_to_path(
    source: str | Path | None,
    *,
    cache_dir: Path = MITRE_BUNDLE_CACHE_PATH,
    expected_sha256: str | None = MITRE_DEFAULT_SHA256,
) -> tuple[Path, str]:
    """Return (path, source_label). Downloads if `source` is a URL; otherwise
    treats it as a local file. `None` → default URL.

    `source_label` is what we persist in `settings.mitre_source_url`.

    Args:
        source: http(s) URL, local file path, or `None` for `MITRE_DEFAULT_URL`.
        cache_dir: directory the downloaded bundle is stored in.
        expected_sha256: digest forwarded to the download; unused for local files.

    Raises:
        MitreSeedError: a local `source` does not exist or is not a regular file.
    """
    if source is None:
        source = MITRE_DEFAULT_URL
    source_str = str(source)
    if _is_url(source_str):
        # Name the cache file after the URL *path* only, so a query string or
        # fragment never leaks into the file name, and refuse dot segments
        # that would resolve outside of (or onto) the cache directory.
        url_path = urllib.parse.urlparse(source_str).path
        basename = url_path.rsplit("/", 1)[-1]
        if basename in ("", ".", ".."):
            basename = "enterprise-attack.json"
        dest = cache_dir / basename
        _download(source_str, dest, expected_sha256=expected_sha256)
        return dest, source_str
    path = Path(source_str)
    if not path.exists():
        raise MitreSeedError(f"source path does not exist: {path}")
    if not path.is_file():
        # A directory would otherwise only fail later, when opened for parsing.
        raise MitreSeedError(f"source path is not a file: {path}")
    return path, str(path)
|
||
|
||
|
||
# === STIX parsing ============================================================
|
||
|
||
|
||
def _attack_ref(obj: dict) -> dict | None:
    """Return the first external reference of `obj` whose `source_name` is
    `ATTACK_SOURCE_NAME` and which carries a non-empty `external_id`, else None."""
    return next(
        (
            candidate
            for candidate in obj.get("external_references") or ()
            if candidate.get("source_name") == ATTACK_SOURCE_NAME
            and candidate.get("external_id")
        ),
        None,
    )
|
||
|
||
|
||
def parse_bundle(path: Path) -> ParsedBundle:
    """Load the STIX bundle at `path` and normalize it into a `ParsedBundle`.

    Revoked and deprecated objects are dropped, as are tactics and
    attack-patterns that carry no ATT&CK external reference.
    """
    with path.open("r", encoding="utf-8") as handle:
        bundle = json.load(handle)
    stix_objects = bundle.get("objects") or []
    result = ParsedBundle(spec_version=bundle.get("spec_version"))

    # First pass: sub-technique STIX id -> parent STIX id, taken from the
    # non-revoked `subtechnique-of` relationships.
    parent_of: dict[str, str] = {}
    for rel in stix_objects:
        is_live_parent_link = (
            rel.get("type") == "relationship"
            and rel.get("relationship_type") == "subtechnique-of"
            and not rel.get("revoked")
        )
        if is_live_parent_link:
            parent_of[rel["source_ref"]] = rel["target_ref"]
    result.subtechnique_parents = parent_of

    # Second pass: tactics, techniques and sub-techniques.
    for obj in stix_objects:
        if obj.get("revoked") or obj.get("x_mitre_deprecated"):
            continue
        obj_type = obj.get("type")
        if obj_type not in ("x-mitre-tactic", "attack-pattern"):
            continue
        ref = _attack_ref(obj)
        if not ref:
            continue

        if obj_type == "x-mitre-tactic":
            result.tactics.append(
                {
                    "external_id": ref["external_id"],
                    "name": obj.get("name") or "",
                    "short_name": obj.get("x_mitre_shortname") or "",
                    "description": obj.get("description"),
                    "url": ref.get("url"),
                }
            )
            continue

        # attack-pattern: either a sub-technique or a parent technique.
        shared = {
            "external_id": ref["external_id"],
            "name": obj.get("name") or "",
            "description": obj.get("description"),
            "url": ref.get("url"),
        }
        if obj.get("x_mitre_is_subtechnique"):
            parent_stix = parent_of.get(obj["id"])
            result.subtechniques.append(
                {**shared, "stix_id": obj["id"], "parent_stix_id": parent_stix}
            )
            continue

        # Keep the kill-chain phase names; they are matched against tactic
        # short names when the technique↔tactic links are built.
        phase_names = [
            phase.get("phase_name")
            for phase in (obj.get("kill_chain_phases") or ())
            if phase.get("kill_chain_name") == KILL_CHAIN_NAME and phase.get("phase_name")
        ]
        result.techniques.append(
            {**shared, "stix_id": obj["id"], "phase_names": phase_names}
        )
    return result
|
||
|
||
|
||
# === DB upserts ==============================================================
|
||
|
||
|
||
def _upsert_tactics(s, tactics: Iterable[dict]) -> tuple[dict, int]:
    """Insert or refresh every tactic in `tactics`.

    Returns `(short_name → tactic_id, n_upserted)`, where `n_upserted`
    counts inserted and updated rows alike.
    """
    known = {row.external_id: row for row in s.scalars(select(MitreTactic)).all()}
    ids_by_short_name: dict = {}
    count = 0
    for tactic in tactics:
        row = known.get(tactic["external_id"])
        if row is not None:
            row.short_name = tactic["short_name"]
            row.name = tactic["name"]
            row.description = tactic["description"]
            row.url = tactic["url"]
        else:
            row = MitreTactic(
                external_id=tactic["external_id"],
                short_name=tactic["short_name"],
                name=tactic["name"],
                description=tactic["description"],
                url=tactic["url"],
            )
            s.add(row)
            # Flush before `row.id` is read below.
            s.flush()
        count += 1
        ids_by_short_name[tactic["short_name"]] = row.id
    return ids_by_short_name, count
|
||
|
||
|
||
def _upsert_techniques(
    s, techniques: Iterable[dict], short_to_tactic_id: dict
) -> tuple[dict, int, int]:
    """Insert or refresh techniques and rebuild their tactic links.

    Returns `(stix_id → technique_id, n_upserted, n_links)`.
    """
    known = {row.external_id: row for row in s.scalars(select(MitreTechnique)).all()}
    ids_by_stix: dict = {}
    technique_count = 0
    link_count = 0

    # The technique↔tactic mapping is rebuilt from scratch: drop every link,
    # then re-add the ones the bundle declares.
    s.execute(delete(MitreTechniqueTactic))

    for tech in techniques:
        row = known.get(tech["external_id"])
        if row is not None:
            row.name = tech["name"]
            row.description = tech["description"]
            row.url = tech["url"]
        else:
            row = MitreTechnique(
                external_id=tech["external_id"],
                name=tech["name"],
                description=tech["description"],
                url=tech["url"],
            )
            s.add(row)
            # Flush before `row.id` is read below.
            s.flush()
        technique_count += 1
        ids_by_stix[tech["stix_id"]] = row.id

        for phase in tech.get("phase_names", []):
            tactic_id = short_to_tactic_id.get(phase)
            if tactic_id is not None:
                s.add(MitreTechniqueTactic(technique_id=row.id, tactic_id=tactic_id))
                link_count += 1
                continue
            # Phase names a tactic that is absent from the bundle — log + skip.
            log.warning(
                "metamorph.mitre.unknown_tactic_phase",
                extra={"technique": tech["external_id"], "phase": phase},
            )
    return ids_by_stix, technique_count, link_count
|
||
|
||
|
||
def _upsert_subtechniques(
    s,
    subtechniques: Iterable[dict],
    stix_to_tech_id: dict,
) -> tuple[int, int]:
    """Insert or refresh sub-techniques; skip those with no resolvable parent.

    Returns `(n_upserted, n_skipped_orphans)`. `n_upserted` counts every row
    whose state was applied, whether by INSERT or UPDATE.
    """
    known = {row.external_id: row for row in s.scalars(select(MitreSubtechnique)).all()}
    # One query up front: technique external_id → id, used by the dotted-id
    # fallback so it never has to SELECT per sub-technique.
    technique_id_by_external: dict[str, object] = {
        tech.external_id: tech.id for tech in s.scalars(select(MitreTechnique)).all()
    }
    dotted_id = re.compile(r"^(T\d+)\.\d+$")
    applied = 0
    orphans = 0
    for sub in subtechniques:
        parent_stix = sub.get("parent_stix_id")
        parent_id = None
        if parent_stix:
            parent_id = stix_to_tech_id.get(parent_stix)
        if parent_id is None:
            # Fall back to the dotted external_id convention (T1003.001 → T1003).
            hit = dotted_id.match(sub["external_id"])
            if hit:
                parent_id = technique_id_by_external.get(hit.group(1))
        if parent_id is None:
            log.warning(
                "metamorph.mitre.orphan_subtechnique",
                extra={"subtechnique": sub["external_id"]},
            )
            orphans += 1
            continue

        row = known.get(sub["external_id"])
        if row is not None:
            row.name = sub["name"]
            row.description = sub["description"]
            row.url = sub["url"]
            row.technique_id = parent_id
        else:
            s.add(
                MitreSubtechnique(
                    external_id=sub["external_id"],
                    name=sub["name"],
                    description=sub["description"],
                    url=sub["url"],
                    technique_id=parent_id,
                )
            )
        applied += 1
    return applied, orphans
|
||
|
||
|
||
def _upsert_setting(s, key: str, value: object) -> None:
    """Set the `Setting` row for `key` to `value`, adding the row if absent."""
    current = s.scalar(select(Setting).where(Setting.key == key))
    if current is not None:
        current.value = value
        return
    s.add(Setting(key=key, value=value))
|
||
|
||
|
||
# === Entry point =============================================================
|
||
|
||
|
||
def seed_mitre(
    *,
    source: str | Path | None = None,
    expected_sha256: str | None = MITRE_DEFAULT_SHA256,
    cache_dir: Path = MITRE_BUNDLE_CACHE_PATH,
    allow_unverified: bool = False,
) -> SeedResult:
    """Seed the MITRE tables from `source` and record the sync metadata.

    A URL source is downloaded (and verified against `expected_sha256`)
    before parsing; a local path is parsed directly.

    A custom URL (anything other than `MITRE_DEFAULT_URL`) must come with
    its own `expected_sha256`, or with `allow_unverified=True` as an explicit
    opt-out. Otherwise the call fails with `MitreSeedError` rather than
    silently skipping the integrity check for a typo'd or attacker-controlled
    mirror.
    """
    started_at = datetime.now(tz=timezone.utc)

    is_custom_url = (
        source is not None
        and _is_url(str(source))
        and str(source) != MITRE_DEFAULT_URL
    )
    # The pinned digest cannot match a non-default URL, so "left at the
    # default" is treated the same as "not provided".
    digest_not_overridden = (
        expected_sha256 is None or expected_sha256 == MITRE_DEFAULT_SHA256
    )
    if is_custom_url and digest_not_overridden:
        if not allow_unverified:
            raise MitreSeedError(
                "custom URL requires an expected_sha256 (or allow_unverified=True)"
            )
        expected_sha256 = None

    path, source_label = resolve_source_to_path(
        source, cache_dir=cache_dir, expected_sha256=expected_sha256
    )

    parsed = parse_bundle(path)
    parsed_counts = {
        "tactics": len(parsed.tactics),
        "techniques": len(parsed.techniques),
        "subtechniques": len(parsed.subtechniques),
        "spec_version": parsed.spec_version,
    }
    log.info("metamorph.mitre.parsed", extra=parsed_counts)

    with session_scope() as s:
        # Transaction-scoped advisory lock: a second sync arriving while this
        # one is mid-DELETE+INSERT of `mitre_technique_tactics` waits until
        # this transaction ends, instead of racing it into a
        # unique-constraint failure.
        s.execute(sql_text("SELECT pg_advisory_xact_lock(hashtext('mitre.seed'))"))

        short_to_tactic_id, n_tactics = _upsert_tactics(s, parsed.tactics)
        stix_to_tech_id, n_techs, n_links = _upsert_techniques(
            s, parsed.techniques, short_to_tactic_id
        )
        n_subs, n_orphan = _upsert_subtechniques(s, parsed.subtechniques, stix_to_tech_id)

        finished_at = datetime.now(tz=timezone.utc)
        _upsert_setting(s, SETTING_LAST_SYNC, finished_at.isoformat())
        # Only a seed from MITRE_DEFAULT_URL is known to be the pinned
        # release; for any other source the version is cleared explicitly so
        # /mitre/status does not report a stale one.
        if source_label == MITRE_DEFAULT_URL:
            version = MITRE_VERSION
        else:
            version = None
        _upsert_setting(s, SETTING_VERSION, version)
        _upsert_setting(s, SETTING_SOURCE_URL, source_label)

    result = SeedResult(
        tactics_upserted=n_tactics,
        techniques_upserted=n_techs,
        subtechniques_upserted=n_subs,
        subtechniques_skipped_orphan=n_orphan,
        technique_tactic_links=n_links,
        version=version,
        source=source_label,
        started_at=started_at,
        finished_at=finished_at,
    )
    log.info("metamorph.mitre.seed_completed", extra=result.as_dict())
    return result
|
||
|
||
|
||
def read_status() -> dict:
    """Return the persisted seed metadata for `GET /mitre/status`.

    Settings that have no stored row are reported as `None`.
    """
    wanted = {SETTING_LAST_SYNC, SETTING_VERSION, SETTING_SOURCE_URL}
    stored = dict.fromkeys(wanted)
    with session_scope() as s:
        rows = s.scalars(select(Setting).where(Setting.key.in_(wanted))).all()
        for row in rows:
            stored[row.key] = row.value
    status = {
        "last_sync": stored[SETTING_LAST_SYNC],
        "version": stored[SETTING_VERSION],
        "source_url": stored[SETTING_SOURCE_URL],
    }
    status["default_url"] = MITRE_DEFAULT_URL
    status["default_version"] = MITRE_VERSION
    return status
|