diff --git a/backend/Dockerfile b/backend/Dockerfile
index e4d1713..6391b28 100644
--- a/backend/Dockerfile
+++ b/backend/Dockerfile
@@ -30,7 +30,7 @@ ENV PYTHONDONTWRITEBYTECODE=1 \
 # Non-root user
 RUN groupadd --gid 10001 metamorph \
     && useradd --uid 10001 --gid metamorph --shell /usr/sbin/nologin --create-home metamorph \
-    && mkdir -p /data/evidence \
+    && mkdir -p /data/evidence /data/mitre \
     && chown -R metamorph:metamorph /data

 COPY --from=deps /opt/venv /opt/venv
diff --git a/backend/app/cli.py b/backend/app/cli.py
index c452d21..663eeed 100644
--- a/backend/app/cli.py
+++ b/backend/app/cli.py
@@ -56,10 +56,66 @@ def print_install_token(force: bool):


 @metamorph.command("seed-mitre")
-def seed_mitre():
-    """Placeholder for M4 — left so `make seed-mitre` doesn't crash."""
-    click.echo("MITRE seeding will land in M4. (no-op for now)", err=True)
-    sys.exit(0)
+@click.option(
+    "--source",
+    default=None,
+    help="STIX bundle source: local path or HTTPS URL. Defaults to the pinned MITRE Enterprise release.",
+)
+@click.option(
+    "--checksum-sha256",
+    "checksum_sha256",
+    default=None,
+    help="Expected sha256 of the bundle (required with a non-default --source URL unless --skip-checksum).",
+)
+@click.option(
+    "--skip-checksum",
+    is_flag=True,
+    help="Skip sha256 verification entirely (escape hatch for testing).",
+)
+def seed_mitre(source: str | None, checksum_sha256: str | None, skip_checksum: bool):
+    """Seed/refresh the MITRE ATT&CK Enterprise reference tables.
+
+    Upserts on `external_id`. Re-running with the same source updates the
+    name/description/url and re-applies the technique↔tactic mapping.
+    """
+    from app.services.mitre_seed import (
+        MITRE_DEFAULT_SHA256,
+        MITRE_DEFAULT_URL,
+        seed_mitre as seed_mitre_svc,
+    )
+
+    if skip_checksum:
+        expected_sha = None
+    elif checksum_sha256:
+        expected_sha = checksum_sha256
+    elif source is None or source == MITRE_DEFAULT_URL:
+        expected_sha = MITRE_DEFAULT_SHA256
+    else:
+        expected_sha = None  # let seed_mitre_svc decide whether to refuse
+
+    click.echo(
+        f"Seeding from {source or MITRE_DEFAULT_URL} "
+        f"(sha256 check: {'off' if skip_checksum else expected_sha or 'unverified'}) ...",
+        err=True,
+    )
+    try:
+        result = seed_mitre_svc(
+            source=source,
+            expected_sha256=expected_sha,
+            allow_unverified=skip_checksum,
+        )
+    except Exception as e:  # noqa: BLE001
+        click.echo(f"seed-mitre failed: {e}", err=True)
+        sys.exit(2)
+    click.echo(
+        f" tactics: {result.tactics_upserted}, "
+        f"techniques: {result.techniques_upserted}, "
+        f"subtechniques: {result.subtechniques_upserted} "
+        f"(skipped orphans: {result.subtechniques_skipped_orphan}), "
+        f"links: {result.technique_tactic_links}, "
+        f"duration: {(result.finished_at - result.started_at).total_seconds():.1f}s",
+        err=True,
+    )


 app.cli.add_command(metamorph)
diff --git a/backend/app/services/mitre_seed.py b/backend/app/services/mitre_seed.py
new file mode 100644
index 0000000..ee42e43
--- /dev/null
+++ b/backend/app/services/mitre_seed.py
@@ -0,0 +1,502 @@
+"""MITRE ATT&CK Enterprise seed + sync.
+
+Parses a STIX 2.1 bundle into the `mitre_*` tables. Idempotent: re-running
+upserts on `external_id`, refreshes name/description/url, and re-applies the
+technique↔tactic mapping. Sub-techniques whose parent is missing in the
+bundle are skipped (with a WARNING log).
+
+Defaults pin a specific Enterprise release (see `MITRE_DEFAULT_*`). The pin
+is honored by the CLI (`flask metamorph seed-mitre`) and by the
+`POST /mitre/sync` admin endpoint; both accept a `--source` / `source_url`
+override for air-gapped operators.
+
+The bundle is downloaded with `urllib.request` (stdlib — no extra dep) and
+cached under `MITRE_BUNDLE_CACHE_PATH` (default `/data/mitre/{bundle-name}.json`).
+Pass an absolute path as `source` to bypass the network entirely.
+"""
+
+from __future__ import annotations
+
+import hashlib
+import json
+import logging
+import os
+import re
+import urllib.parse
+import urllib.request
+from dataclasses import dataclass, field
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Iterable
+
+from sqlalchemy import delete, select
+
+from app.db.session import session_scope
+from app.models.mitre import (
+    MitreSubtechnique,
+    MitreTactic,
+    MitreTechnique,
+    MitreTechniqueTactic,
+)
+from app.models.setting import Setting
+
+log = logging.getLogger("metamorph.mitre.seed")
+
+# === Default pin =============================================================
+#
+# MITRE publishes versioned bundles at
+# `https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/enterprise-attack/enterprise-attack-{version}.json`.
+# Update these three constants in lock-step when bumping the pin. The SHA256
+# is verified against the downloaded bytes — a mismatch aborts the seed.
+#
+MITRE_VERSION = "19.0"
+MITRE_DEFAULT_URL = (
+    "https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/"
+    "enterprise-attack/enterprise-attack-19.0.json"
+)
+MITRE_DEFAULT_SHA256 = "df520ea0775a57db7bff760145b02fed89290802913e056b7ed5970b02f3626a"
+
+MITRE_BUNDLE_CACHE_PATH = Path(os.environ.get("MITRE_CACHE_DIR", "/data/mitre"))
+MITRE_DOWNLOAD_TIMEOUT_SECONDS = 120
+
+# Settings keys used to expose the seed metadata to the operator UI/CLI.
+SETTING_LAST_SYNC = "mitre_last_sync"
+SETTING_VERSION = "mitre_version"
+SETTING_SOURCE_URL = "mitre_source_url"
+
+ATTACK_SOURCE_NAME = "mitre-attack"
+KILL_CHAIN_NAME = "mitre-attack"
+
+
+class MitreSeedError(Exception):
+    """Base error for seed failures (missing source path, unverified custom URL)."""
+
+
+class MitreChecksumMismatch(MitreSeedError):
+    """Raised when a downloaded bundle's sha256 differs from the expected digest."""
+
+
+@dataclass
+class ParsedBundle:
+    """Normalized tactics / techniques / sub-techniques read from one STIX bundle."""
+
+    tactics: list[dict] = field(default_factory=list)
+    techniques: list[dict] = field(default_factory=list)  # parent techniques
+    subtechniques: list[dict] = field(default_factory=list)
+    # Map: subtechnique attack-pattern STIX id -> parent technique STIX id
+    subtechnique_parents: dict[str, str] = field(default_factory=dict)
+    spec_version: str | None = None
+
+
+@dataclass
+class SeedResult:
+    """Counters, source label and timestamps describing one `seed_mitre` run."""
+
+    tactics_upserted: int
+    techniques_upserted: int
+    subtechniques_upserted: int
+    subtechniques_skipped_orphan: int
+    technique_tactic_links: int
+    version: str | None
+    source: str
+    started_at: datetime
+    finished_at: datetime
+
+    def as_dict(self) -> dict:
+        """Return a plain dict (ISO timestamps plus `duration_ms`) for logging."""
+        return {
+            "tactics_upserted": self.tactics_upserted,
+            "techniques_upserted": self.techniques_upserted,
+            "subtechniques_upserted": self.subtechniques_upserted,
+            "subtechniques_skipped_orphan": self.subtechniques_skipped_orphan,
+            "technique_tactic_links": self.technique_tactic_links,
+            "version": self.version,
+            "source": self.source,
+            "started_at": self.started_at.isoformat(),
+            "finished_at": self.finished_at.isoformat(),
+            "duration_ms": int(
+                (self.finished_at - self.started_at).total_seconds() * 1000
+            ),
+        }
+
+
+# === I/O =====================================================================
+
+
+def _is_url(source: str) -> bool:
+    """Return True when `source` has an `http` or `https` scheme."""
+    parsed = urllib.parse.urlparse(source)
+    return parsed.scheme in ("http", "https")
+
+
+def _sha256_of(path: Path) -> str:
+    """Return the lowercase hex sha256 of the file at `path`, read in 64 KiB chunks."""
+    h = hashlib.sha256()
+    with path.open("rb") as f:
+        for chunk in iter(lambda: f.read(1 << 16), b""):
+            h.update(chunk)
+    return h.hexdigest()
+
+
+def _download(url: str, dest: Path, *, expected_sha256: str | None = None) -> Path:
+    """Stream `url` into `dest`, verifying its sha256 when one is given.
+
+    The body is written to a sibling `.part` file and only moved over `dest`
+    once the (optional) checksum matches, so a rejected download never
+    replaces `dest`. Raises `MitreChecksumMismatch` on a digest mismatch.
+    """
+    dest.parent.mkdir(parents=True, exist_ok=True)
+    tmp = dest.with_suffix(dest.suffix + ".part")
+    log.info("metamorph.mitre.download.start", extra={"url": url, "dest": str(dest)})
+    req = urllib.request.Request(url, headers={"User-Agent": "metamorph-mitre-seed/1.0"})
+    with urllib.request.urlopen(req, timeout=MITRE_DOWNLOAD_TIMEOUT_SECONDS) as resp:
+        with tmp.open("wb") as f:
+            for chunk in iter(lambda: resp.read(1 << 16), b""):
+                f.write(chunk)
+    if expected_sha256:
+        actual = _sha256_of(tmp)
+        # hexdigest() is lowercase; tolerate upper-case / padded operator input.
+        if actual != expected_sha256.strip().lower():
+            tmp.unlink(missing_ok=True)
+            raise MitreChecksumMismatch(
+                f"sha256 mismatch for {url}: expected {expected_sha256}, got {actual}"
+            )
+    tmp.replace(dest)
+    log.info(
+        "metamorph.mitre.download.done",
+        extra={"url": url, "bytes": dest.stat().st_size},
+    )
+    return dest
+
+
+def resolve_source_to_path(
+    source: str | Path | None,
+    *,
+    cache_dir: Path = MITRE_BUNDLE_CACHE_PATH,
+    expected_sha256: str | None = MITRE_DEFAULT_SHA256,
+) -> tuple[Path, str]:
+    """Return (path, source_label). Downloads if `source` is an URL; otherwise
+    treats it as a local file. `None` → default URL.
+
+    Local files are returned as-is: `expected_sha256` only applies to downloads.
+    `source_label` is what we persist in `settings.mitre_source_url`."""
+    if source is None:
+        source = MITRE_DEFAULT_URL
+    source_str = str(source)
+    if _is_url(source_str):
+        # Name the cache file after the URL *path* so a query string or
+        # fragment never ends up in the filename.
+        url_path = urllib.parse.urlparse(source_str).path
+        basename = url_path.rsplit("/", 1)[-1] or "enterprise-attack.json"
+        dest = cache_dir / basename
+        _download(source_str, dest, expected_sha256=expected_sha256)
+        return dest, source_str
+    path = Path(source_str)
+    if not path.exists():
+        raise MitreSeedError(f"source path does not exist: {path}")
+    return path, str(path)
+
+
+# === STIX parsing ============================================================
+
+
+def _attack_ref(obj: dict) -> dict | None:
+    """Return the first `mitre-attack` external reference that has an `external_id`."""
+    for ref in obj.get("external_references") or ():
+        if ref.get("source_name") == ATTACK_SOURCE_NAME and ref.get("external_id"):
+            return ref
+    return None
+
+
+def parse_bundle(path: Path) -> ParsedBundle:
+    """Read the STIX bundle into normalized dicts ready for SQL upserts."""
+    with path.open("r", encoding="utf-8") as f:
+        bundle = json.load(f)
+    objs = bundle.get("objects") or []
+    parsed = ParsedBundle(spec_version=bundle.get("spec_version"))
+
+    parents_by_subtech: dict[str, str] = {}
+    for o in objs:
+        if (
+            o.get("type") == "relationship"
+            and o.get("relationship_type") == "subtechnique-of"
+            and not o.get("revoked")
+        ):
+            parents_by_subtech[o["source_ref"]] = o["target_ref"]
+    parsed.subtechnique_parents = parents_by_subtech
+
+    for o in objs:
+        if o.get("revoked") or o.get("x_mitre_deprecated"):
+            continue
+        kind = o.get("type")
+        if kind == "x-mitre-tactic":
+            ref = _attack_ref(o)
+            if not ref:
+                continue
+            parsed.tactics.append(
+                {
+                    "external_id": ref["external_id"],
+                    "name": o.get("name") or "",
+                    "short_name": o.get("x_mitre_shortname") or "",
+                    "description": o.get("description"),
+                    "url": ref.get("url"),
+                }
+            )
+        elif kind == "attack-pattern":
+            ref = _attack_ref(o)
+            if not ref:
+                continue
+            common = {
+                "external_id": ref["external_id"],
+                "name": o.get("name") or "",
+                "description": o.get("description"),
+                "url": ref.get("url"),
+            }
+            if o.get("x_mitre_is_subtechnique"):
+                parent_stix = parents_by_subtech.get(o["id"])
+                parsed.subtechniques.append(
+                    {**common, "stix_id": o["id"], "parent_stix_id": parent_stix}
+                )
+            else:
+                # Capture kill_chain_phases so we can map to tactics by short_name.
+                phases = [
+                    p.get("phase_name")
+                    for p in (o.get("kill_chain_phases") or ())
+                    if p.get("kill_chain_name") == KILL_CHAIN_NAME and p.get("phase_name")
+                ]
+                parsed.techniques.append(
+                    {**common, "stix_id": o["id"], "phase_names": phases}
+                )
+    return parsed
+
+
+# === DB upserts ==============================================================
+
+
+def _upsert_tactics(s, tactics: Iterable[dict]) -> tuple[dict, int]:
+    """Upsert tactics. Returns (short_name → tactic_id, n_upserted)."""
+    existing = {t.external_id: t for t in s.scalars(select(MitreTactic)).all()}
+    short_to_id: dict = {}
+    upserted = 0
+    for t in tactics:
+        row = existing.get(t["external_id"])
+        if row is None:
+            row = MitreTactic(
+                external_id=t["external_id"],
+                short_name=t["short_name"],
+                name=t["name"],
+                description=t["description"],
+                url=t["url"],
+            )
+            s.add(row)
+            s.flush()
+            upserted += 1
+        else:
+            row.short_name = t["short_name"]
+            row.name = t["name"]
+            row.description = t["description"]
+            row.url = t["url"]
+            upserted += 1
+        short_to_id[t["short_name"]] = row.id
+    return short_to_id, upserted
+
+
+def _upsert_techniques(
+    s, techniques: Iterable[dict], short_to_tactic_id: dict
+) -> tuple[dict, int, int]:
+    """Upsert techniques + their tactic links. Returns (stix_id→technique_id, n_upserted, n_links)."""
+    existing = {t.external_id: t for t in s.scalars(select(MitreTechnique)).all()}
+    stix_to_id: dict = {}
+    n_upserted = 0
+    n_links = 0
+
+    # We'll rebuild the technique↔tactic mapping for clarity (drop + add). This
+    # adds one row per resolved (technique, phase) pair — cheap next to the parse.
+    s.execute(delete(MitreTechniqueTactic))
+
+    for t in techniques:
+        row = existing.get(t["external_id"])
+        if row is None:
+            row = MitreTechnique(
+                external_id=t["external_id"],
+                name=t["name"],
+                description=t["description"],
+                url=t["url"],
+            )
+            s.add(row)
+            s.flush()
+        else:
+            row.name = t["name"]
+            row.description = t["description"]
+            row.url = t["url"]
+        n_upserted += 1
+        stix_to_id[t["stix_id"]] = row.id
+        for phase in t.get("phase_names", []):
+            tac_id = short_to_tactic_id.get(phase)
+            if tac_id is None:
+                # Tactic referenced but not in bundle — log + skip.
+                log.warning(
+                    "metamorph.mitre.unknown_tactic_phase",
+                    extra={"technique": t["external_id"], "phase": phase},
+                )
+                continue
+            s.add(MitreTechniqueTactic(technique_id=row.id, tactic_id=tac_id))
+            n_links += 1
+    return stix_to_id, n_upserted, n_links
+
+
+def _upsert_subtechniques(
+    s,
+    subtechniques: Iterable[dict],
+    stix_to_tech_id: dict,
+) -> tuple[int, int]:
+    """Returns (n_upserted, n_skipped_orphans)."""
+    existing = {sb.external_id: sb for sb in s.scalars(select(MitreSubtechnique)).all()}
+    n_upserted = 0
+    n_skipped = 0
+    for sb in subtechniques:
+        parent_stix = sb.get("parent_stix_id")
+        parent_id = stix_to_tech_id.get(parent_stix) if parent_stix else None
+        if parent_id is None:
+            # Fall back to the dotted external_id convention (T1003.001 → T1003).
+            m = re.match(r"^(T\d+)\.\d+$", sb["external_id"])
+            if m:
+                parent_ext = m.group(1)
+                # We don't have a parent-by-external-id map here; query.
+                parent_row = next(
+                    iter(
+                        s.scalars(
+                            select(MitreTechnique).where(MitreTechnique.external_id == parent_ext)
+                        ).all()
+                    ),
+                    None,
+                )
+                parent_id = parent_row.id if parent_row else None
+        if parent_id is None:
+            log.warning(
+                "metamorph.mitre.orphan_subtechnique",
+                extra={"subtechnique": sb["external_id"]},
+            )
+            n_skipped += 1
+            continue
+        row = existing.get(sb["external_id"])
+        if row is None:
+            s.add(
+                MitreSubtechnique(
+                    external_id=sb["external_id"],
+                    name=sb["name"],
+                    description=sb["description"],
+                    url=sb["url"],
+                    technique_id=parent_id,
+                )
+            )
+        else:
+            row.name = sb["name"]
+            row.description = sb["description"]
+            row.url = sb["url"]
+            row.technique_id = parent_id
+        n_upserted += 1
+    return n_upserted, n_skipped
+
+
+def _upsert_setting(s, key: str, value: object) -> None:
+    """Insert the `Setting` row for `key`, or overwrite its value if it exists."""
+    row = s.scalar(select(Setting).where(Setting.key == key))
+    if row is None:
+        s.add(Setting(key=key, value=value))
+    else:
+        row.value = value
+
+
+# === Entry point =============================================================
+
+
+def seed_mitre(
+    *,
+    source: str | Path | None = None,
+    expected_sha256: str | None = MITRE_DEFAULT_SHA256,
+    cache_dir: Path = MITRE_BUNDLE_CACHE_PATH,
+    allow_unverified: bool = False,
+) -> SeedResult:
+    """Top-level seed. URL → download + verify + parse; path → just parse.
+
+    Custom URLs (anything other than `MITRE_DEFAULT_URL`) MUST be paired with
+    an `expected_sha256` for integrity, or with `allow_unverified=True` to opt
+    out explicitly. This avoids a silent integrity bypass when an operator
+    points the sync at a typo'd or attacker-controlled mirror.
+    """
+    started_at = datetime.now(tz=timezone.utc)
+    if source is not None and _is_url(str(source)) and str(source) != MITRE_DEFAULT_URL:
+        if expected_sha256 is None or expected_sha256 == MITRE_DEFAULT_SHA256:
+            # The caller passed a non-default URL but didn't override the hash:
+            # MITRE_DEFAULT_SHA256 would obviously not match → force an explicit
+            # decision rather than silently bypassing.
+            if not allow_unverified:
+                raise MitreSeedError(
+                    "custom URL requires an expected_sha256 (or allow_unverified=True)"
+                )
+            expected_sha256 = None
+
+    path, source_label = resolve_source_to_path(
+        source, cache_dir=cache_dir, expected_sha256=expected_sha256
+    )
+
+    parsed = parse_bundle(path)
+    log.info(
+        "metamorph.mitre.parsed",
+        extra={
+            "tactics": len(parsed.tactics),
+            "techniques": len(parsed.techniques),
+            "subtechniques": len(parsed.subtechniques),
+            "spec_version": parsed.spec_version,
+        },
+    )
+
+    with session_scope() as s:
+        short_to_tactic_id, n_tactics = _upsert_tactics(s, parsed.tactics)
+        stix_to_tech_id, n_techs, n_links = _upsert_techniques(
+            s, parsed.techniques, short_to_tactic_id
+        )
+        n_subs, n_orphan = _upsert_subtechniques(s, parsed.subtechniques, stix_to_tech_id)
+
+        finished_at = datetime.now(tz=timezone.utc)
+        _upsert_setting(s, SETTING_LAST_SYNC, finished_at.isoformat())
+        # If the URL is the pinned one, we know the version; otherwise leave None.
+        version = MITRE_VERSION if source_label == MITRE_DEFAULT_URL else None
+        if version:
+            _upsert_setting(s, SETTING_VERSION, version)
+        else:
+            # Drop any version recorded by an earlier pinned run so that
+            # `read_status()` doesn't pair it with this unrelated source.
+            s.execute(delete(Setting).where(Setting.key == SETTING_VERSION))
+        _upsert_setting(s, SETTING_SOURCE_URL, source_label)
+
+    result = SeedResult(
+        tactics_upserted=n_tactics,
+        techniques_upserted=n_techs,
+        subtechniques_upserted=n_subs,
+        subtechniques_skipped_orphan=n_orphan,
+        technique_tactic_links=n_links,
+        version=version,
+        source=source_label,
+        started_at=started_at,
+        finished_at=finished_at,
+    )
+    log.info("metamorph.mitre.seed_completed", extra=result.as_dict())
+    return result
+
+
+def read_status() -> dict:
+    """Return the persisted seed metadata for `GET /mitre/status`."""
+    keys = {SETTING_LAST_SYNC, SETTING_VERSION, SETTING_SOURCE_URL}
+    out = {k: None for k in keys}
+    with session_scope() as s:
+        for row in s.scalars(select(Setting).where(Setting.key.in_(keys))).all():
+            out[row.key] = row.value
+    return {
+        "last_sync": out[SETTING_LAST_SYNC],
+        "version": out[SETTING_VERSION],
+        "source_url": out[SETTING_SOURCE_URL],
+        "default_url": MITRE_DEFAULT_URL,
+        "default_version": MITRE_VERSION,
+    }
diff --git a/docker-compose.yml b/docker-compose.yml
index a2d11ba..e9da7d5 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -38,6 +38,7 @@ services:
       EVIDENCE_DIR: ${EVIDENCE_DIR}
     volumes:
       - metamorph_evidence:/data/evidence
+      - metamorph_mitre:/data/mitre
     depends_on:
       db:
         condition: service_healthy
@@ -76,6 +77,7 @@ services:
 volumes:
   metamorph_db:
   metamorph_evidence:
+  metamorph_mitre:

 networks:
   metamorph: