"""MITRE ATT&CK Enterprise seed + sync. Parses a STIX 2.1 bundle into the `mitre_*` tables. Idempotent: re-running upserts on `external_id`, refreshes name/description/url, and re-applies the technique↔tactic mapping. Sub-techniques whose parent is missing in the bundle are skipped (with a WARNING log). Defaults pin a specific Enterprise release (see `MITRE_DEFAULT_*`). The pin is honored by the CLI (`flask metamorph seed-mitre`) and by the `POST /mitre/sync` admin endpoint; both accept a `--source` / `source_url` override for air-gapped operators. The bundle is downloaded with `urllib.request` (stdlib — no extra dep) and cached at `MITRE_BUNDLE_CACHE_PATH` (default `/data/mitre/.json`). Pass an absolute path as `source` to bypass the network entirely. """ from __future__ import annotations import hashlib import json import logging import os import re import urllib.parse import urllib.request from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path from typing import Iterable from sqlalchemy import delete, select from app.db.session import session_scope from app.models.mitre import ( MitreSubtechnique, MitreTactic, MitreTechnique, MitreTechniqueTactic, ) from app.models.setting import Setting log = logging.getLogger("metamorph.mitre.seed") # === Default pin ============================================================= # # MITRE publishes versioned bundles at # `https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/enterprise-attack/enterprise-attack-.json`. # Update these three constants in lock-step when bumping the pin. The SHA256 # is verified against the downloaded bytes — a mismatch aborts the seed. # MITRE_VERSION = "19.0" MITRE_DEFAULT_URL = ( "https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/" "enterprise-attack/enterprise-attack-19.0.json" ) MITRE_DEFAULT_SHA256 = "df520ea0775a57db7bff760145b02fed89290802913e056b7ed5970b02f3626a" MITRE_BUNDLE_CACHE_PATH = Path(os.environ.get("MITRE_CACHE_DIR", "/data/mitre")) MITRE_DOWNLOAD_TIMEOUT_SECONDS = 120 # Settings keys used to expose the seed metadata to the operator UI/CLI. SETTING_LAST_SYNC = "mitre_last_sync" SETTING_VERSION = "mitre_version" SETTING_SOURCE_URL = "mitre_source_url" ATTACK_SOURCE_NAME = "mitre-attack" KILL_CHAIN_NAME = "mitre-attack" class MitreSeedError(Exception): pass class MitreChecksumMismatch(MitreSeedError): pass @dataclass class ParsedBundle: tactics: list[dict] = field(default_factory=list) techniques: list[dict] = field(default_factory=list) # parent techniques subtechniques: list[dict] = field(default_factory=list) # Map: subtechnique attack-pattern STIX id -> parent technique STIX id subtechnique_parents: dict[str, str] = field(default_factory=dict) spec_version: str | None = None @dataclass class SeedResult: tactics_upserted: int techniques_upserted: int subtechniques_upserted: int subtechniques_skipped_orphan: int technique_tactic_links: int version: str | None source: str started_at: datetime finished_at: datetime def as_dict(self) -> dict: return { "tactics_upserted": self.tactics_upserted, "techniques_upserted": self.techniques_upserted, "subtechniques_upserted": self.subtechniques_upserted, "subtechniques_skipped_orphan": self.subtechniques_skipped_orphan, "technique_tactic_links": self.technique_tactic_links, "version": self.version, "source": self.source, "started_at": self.started_at.isoformat(), "finished_at": self.finished_at.isoformat(), "duration_ms": int( (self.finished_at - self.started_at).total_seconds() * 1000 ), } # === I/O ===================================================================== def _is_url(source: str) -> bool: parsed = urllib.parse.urlparse(source) return parsed.scheme in ("http", "https") def _sha256_of(path: Path) -> str: h = hashlib.sha256() with path.open("rb") as f: for chunk in iter(lambda: f.read(1 << 16), b""): h.update(chunk) return h.hexdigest() def _download(url: str, dest: Path, *, expected_sha256: str | None = None) -> Path: dest.parent.mkdir(parents=True, exist_ok=True) tmp = dest.with_suffix(dest.suffix + ".part") log.info("metamorph.mitre.download.start", extra={"url": url, "dest": str(dest)}) req = urllib.request.Request(url, headers={"User-Agent": "metamorph-mitre-seed/1.0"}) with urllib.request.urlopen(req, timeout=MITRE_DOWNLOAD_TIMEOUT_SECONDS) as resp: with tmp.open("wb") as f: for chunk in iter(lambda: resp.read(1 << 16), b""): f.write(chunk) if expected_sha256: actual = _sha256_of(tmp) if actual != expected_sha256: tmp.unlink(missing_ok=True) raise MitreChecksumMismatch( f"sha256 mismatch for {url}: expected {expected_sha256}, got {actual}" ) tmp.replace(dest) log.info( "metamorph.mitre.download.done", extra={"url": url, "bytes": dest.stat().st_size}, ) return dest def resolve_source_to_path( source: str | Path | None, *, cache_dir: Path = MITRE_BUNDLE_CACHE_PATH, expected_sha256: str | None = MITRE_DEFAULT_SHA256, ) -> tuple[Path, str]: """Return (path, source_label). Downloads if `source` is an URL; otherwise treats it as a local file. `None` → default URL. `source_label` is what we persist in `settings.mitre_source_url`.""" if source is None: source = MITRE_DEFAULT_URL source_str = str(source) if _is_url(source_str): basename = source_str.rsplit("/", 1)[-1] or "enterprise-attack.json" dest = cache_dir / basename _download(source_str, dest, expected_sha256=expected_sha256) return dest, source_str path = Path(source_str) if not path.exists(): raise MitreSeedError(f"source path does not exist: {path}") return path, str(path) # === STIX parsing ============================================================ def _attack_ref(obj: dict) -> dict | None: for ref in obj.get("external_references") or (): if ref.get("source_name") == ATTACK_SOURCE_NAME and ref.get("external_id"): return ref return None def parse_bundle(path: Path) -> ParsedBundle: """Read the STIX bundle into normalized dicts ready for SQL upserts.""" with path.open("r", encoding="utf-8") as f: bundle = json.load(f) objs = bundle.get("objects") or [] parsed = ParsedBundle(spec_version=bundle.get("spec_version")) parents_by_subtech: dict[str, str] = {} for o in objs: if ( o.get("type") == "relationship" and o.get("relationship_type") == "subtechnique-of" and not o.get("revoked") ): parents_by_subtech[o["source_ref"]] = o["target_ref"] parsed.subtechnique_parents = parents_by_subtech for o in objs: if o.get("revoked") or o.get("x_mitre_deprecated"): continue kind = o.get("type") if kind == "x-mitre-tactic": ref = _attack_ref(o) if not ref: continue parsed.tactics.append( { "external_id": ref["external_id"], "name": o.get("name") or "", "short_name": o.get("x_mitre_shortname") or "", "description": o.get("description"), "url": ref.get("url"), } ) elif kind == "attack-pattern": ref = _attack_ref(o) if not ref: continue common = { "external_id": ref["external_id"], "name": o.get("name") or "", "description": o.get("description"), "url": ref.get("url"), } if o.get("x_mitre_is_subtechnique"): parent_stix = parents_by_subtech.get(o["id"]) parsed.subtechniques.append( {**common, "stix_id": o["id"], "parent_stix_id": parent_stix} ) else: # Capture kill_chain_phases so we can map to tactics by short_name. phases = [ p.get("phase_name") for p in (o.get("kill_chain_phases") or ()) if p.get("kill_chain_name") == KILL_CHAIN_NAME and p.get("phase_name") ] parsed.techniques.append( {**common, "stix_id": o["id"], "phase_names": phases} ) return parsed # === DB upserts ============================================================== def _upsert_tactics(s, tactics: Iterable[dict]) -> tuple[dict, int]: """Upsert tactics. Returns (short_name → tactic_id, n_upserted).""" existing = {t.external_id: t for t in s.scalars(select(MitreTactic)).all()} short_to_id: dict = {} upserted = 0 for t in tactics: row = existing.get(t["external_id"]) if row is None: row = MitreTactic( external_id=t["external_id"], short_name=t["short_name"], name=t["name"], description=t["description"], url=t["url"], ) s.add(row) s.flush() upserted += 1 else: row.short_name = t["short_name"] row.name = t["name"] row.description = t["description"] row.url = t["url"] upserted += 1 short_to_id[t["short_name"]] = row.id return short_to_id, upserted def _upsert_techniques( s, techniques: Iterable[dict], short_to_tactic_id: dict ) -> tuple[dict, int, int]: """Upsert techniques + their tactic links. Returns (stix_id→technique_id, n_upserted, n_links).""" existing = {t.external_id: t for t in s.scalars(select(MitreTechnique)).all()} stix_to_id: dict = {} n_upserted = 0 n_links = 0 # We'll rebuild the technique↔tactic mapping for clarity (drop + add). This # is O(techniques × tactics) but cheap relative to the parse itself. s.execute(delete(MitreTechniqueTactic)) for t in techniques: row = existing.get(t["external_id"]) if row is None: row = MitreTechnique( external_id=t["external_id"], name=t["name"], description=t["description"], url=t["url"], ) s.add(row) s.flush() else: row.name = t["name"] row.description = t["description"] row.url = t["url"] n_upserted += 1 stix_to_id[t["stix_id"]] = row.id for phase in t.get("phase_names", []): tac_id = short_to_tactic_id.get(phase) if tac_id is None: # Tactic referenced but not in bundle — log + skip. log.warning( "metamorph.mitre.unknown_tactic_phase", extra={"technique": t["external_id"], "phase": phase}, ) continue s.add(MitreTechniqueTactic(technique_id=row.id, tactic_id=tac_id)) n_links += 1 return stix_to_id, n_upserted, n_links def _upsert_subtechniques( s, subtechniques: Iterable[dict], stix_to_tech_id: dict, ) -> tuple[int, int]: """Returns (n_upserted, n_skipped_orphans).""" existing = {sb.external_id: sb for sb in s.scalars(select(MitreSubtechnique)).all()} n_upserted = 0 n_skipped = 0 for sb in subtechniques: parent_stix = sb.get("parent_stix_id") parent_id = stix_to_tech_id.get(parent_stix) if parent_stix else None if parent_id is None: # Fall back to the dotted external_id convention (T1003.001 → T1003). m = re.match(r"^(T\d+)\.\d+$", sb["external_id"]) if m: parent_ext = m.group(1) # We don't have a parent-by-external-id map here; query. parent_row = next( iter( s.scalars( select(MitreTechnique).where(MitreTechnique.external_id == parent_ext) ).all() ), None, ) parent_id = parent_row.id if parent_row else None if parent_id is None: log.warning( "metamorph.mitre.orphan_subtechnique", extra={"subtechnique": sb["external_id"]}, ) n_skipped += 1 continue row = existing.get(sb["external_id"]) if row is None: s.add( MitreSubtechnique( external_id=sb["external_id"], name=sb["name"], description=sb["description"], url=sb["url"], technique_id=parent_id, ) ) else: row.name = sb["name"] row.description = sb["description"] row.url = sb["url"] row.technique_id = parent_id n_upserted += 1 return n_upserted, n_skipped def _upsert_setting(s, key: str, value: object) -> None: row = s.scalar(select(Setting).where(Setting.key == key)) if row is None: s.add(Setting(key=key, value=value)) else: row.value = value # === Entry point ============================================================= def seed_mitre( *, source: str | Path | None = None, expected_sha256: str | None = MITRE_DEFAULT_SHA256, cache_dir: Path = MITRE_BUNDLE_CACHE_PATH, allow_unverified: bool = False, ) -> SeedResult: """Top-level seed. URL → download + verify + parse; path → just parse. Custom URLs (anything other than `MITRE_DEFAULT_URL`) MUST be paired with an `expected_sha256` for integrity, or with `allow_unverified=True` to opt out explicitly. This avoids a silent integrity bypass when an operator points the sync at a typo'd or attacker-controlled mirror. """ started_at = datetime.now(tz=timezone.utc) if source is not None and _is_url(str(source)) and str(source) != MITRE_DEFAULT_URL: if expected_sha256 is None or expected_sha256 == MITRE_DEFAULT_SHA256: # The caller passed a non-default URL but didn't override the hash: # MITRE_DEFAULT_SHA256 would obviously not match → force an explicit # decision rather than silently bypassing. if not allow_unverified: raise MitreSeedError( "custom URL requires an expected_sha256 (or allow_unverified=True)" ) expected_sha256 = None path, source_label = resolve_source_to_path( source, cache_dir=cache_dir, expected_sha256=expected_sha256 ) parsed = parse_bundle(path) log.info( "metamorph.mitre.parsed", extra={ "tactics": len(parsed.tactics), "techniques": len(parsed.techniques), "subtechniques": len(parsed.subtechniques), "spec_version": parsed.spec_version, }, ) with session_scope() as s: short_to_tactic_id, n_tactics = _upsert_tactics(s, parsed.tactics) stix_to_tech_id, n_techs, n_links = _upsert_techniques( s, parsed.techniques, short_to_tactic_id ) n_subs, n_orphan = _upsert_subtechniques(s, parsed.subtechniques, stix_to_tech_id) finished_at = datetime.now(tz=timezone.utc) _upsert_setting(s, SETTING_LAST_SYNC, finished_at.isoformat()) # If the URL is the pinned one, we know the version; otherwise leave None. version = MITRE_VERSION if source_label == MITRE_DEFAULT_URL else None if version: _upsert_setting(s, SETTING_VERSION, version) _upsert_setting(s, SETTING_SOURCE_URL, source_label) result = SeedResult( tactics_upserted=n_tactics, techniques_upserted=n_techs, subtechniques_upserted=n_subs, subtechniques_skipped_orphan=n_orphan, technique_tactic_links=n_links, version=version, source=source_label, started_at=started_at, finished_at=finished_at, ) log.info("metamorph.mitre.seed_completed", extra=result.as_dict()) return result def read_status() -> dict: """Return the persisted seed metadata for `GET /mitre/status`.""" keys = {SETTING_LAST_SYNC, SETTING_VERSION, SETTING_SOURCE_URL} out = {k: None for k in keys} with session_scope() as s: for row in s.scalars(select(Setting).where(Setting.key.in_(keys))).all(): out[row.key] = row.value return { "last_sync": out[SETTING_LAST_SYNC], "version": out[SETTING_VERSION], "source_url": out[SETTING_SOURCE_URL], "default_url": MITRE_DEFAULT_URL, "default_version": MITRE_VERSION, }