@@ -0,0 +1,478 @@
""" MITRE ATT&CK Enterprise seed + sync.
Parses a STIX 2.1 bundle into the `mitre_*` tables. Idempotent: re-running
upserts on `external_id`, refreshes name/description/url, and re-applies the
technique↔tactic mapping. Sub-techniques whose parent is missing in the
bundle are skipped (with a WARNING log).
Defaults pin a specific Enterprise release (see `MITRE_DEFAULT_*`). The pin
is honored by the CLI (`flask metamorph seed-mitre`) and by the
`POST /mitre/sync` admin endpoint; both accept a `--source` / `source_url`
override for air-gapped operators.
The bundle is downloaded with `urllib.request` (stdlib — no extra dep) and
cached in the `MITRE_BUNDLE_CACHE_PATH` directory (default `/data/mitre`,
overridable via the `MITRE_CACHE_DIR` environment variable) under the URL's
basename. Pass an absolute path as `source` to bypass the network entirely.
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import re
import urllib . parse
import urllib . request
from dataclasses import dataclass , field
from datetime import datetime , timezone
from pathlib import Path
from typing import Iterable
from sqlalchemy import delete , select
from app . db . session import session_scope
from app . models . mitre import (
MitreSubtechnique ,
MitreTactic ,
MitreTechnique ,
MitreTechniqueTactic ,
)
from app . models . setting import Setting
log = logging.getLogger("metamorph.mitre.seed")

# === Default pin =============================================================
#
# MITRE publishes versioned bundles at
# `https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/enterprise-attack/enterprise-attack-<X.Y>.json`.
# The default URL is derived from `MITRE_VERSION`, so bumping the pin means
# updating `MITRE_VERSION` and `MITRE_DEFAULT_SHA256` together. The SHA256
# is verified against the downloaded bytes — a mismatch aborts the seed.
#
MITRE_VERSION = "19.0"
MITRE_DEFAULT_URL = (
    "https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/"
    f"enterprise-attack/enterprise-attack-{MITRE_VERSION}.json"
)
MITRE_DEFAULT_SHA256 = "df520ea0775a57db7bff760145b02fed89290802913e056b7ed5970b02f3626a"

# Directory (not a file) that downloaded bundles are written into; the
# `MITRE_CACHE_DIR` environment variable overrides the default location.
MITRE_BUNDLE_CACHE_PATH = Path(os.environ.get("MITRE_CACHE_DIR", "/data/mitre"))

# Passed as `timeout=` to `urllib.request.urlopen` when fetching the bundle.
MITRE_DOWNLOAD_TIMEOUT_SECONDS = 120

# Settings keys used to expose the seed metadata to the operator UI/CLI.
SETTING_LAST_SYNC = "mitre_last_sync"
SETTING_VERSION = "mitre_version"
SETTING_SOURCE_URL = "mitre_source_url"

# `source_name` of the external reference that carries the ATT&CK id, and
# `kill_chain_name` of the phases that map a technique to its tactics.
ATTACK_SOURCE_NAME = "mitre-attack"
KILL_CHAIN_NAME = "mitre-attack"
class MitreSeedError(Exception):
    """Root of the exceptions raised by the MITRE seed/sync code in this module."""
class MitreChecksumMismatch(MitreSeedError):
    """Raised when a downloaded bundle's SHA-256 differs from the expected one."""
@dataclass
class ParsedBundle:
    """Normalized contents of one STIX bundle, as produced by `parse_bundle`."""

    # One dict per tactic that passed the filters.
    tactics: list[dict] = field(default_factory=list)
    # Parent techniques only; sub-techniques are kept in their own list.
    techniques: list[dict] = field(default_factory=list)
    subtechniques: list[dict] = field(default_factory=list)
    # Sub-technique attack-pattern STIX id -> parent technique STIX id.
    subtechnique_parents: dict[str, str] = field(default_factory=dict)
    # The bundle's top-level `spec_version`, when it has one.
    spec_version: str | None = None
@dataclass
class SeedResult:
    """Counters, provenance and timing describing one seed run."""

    tactics_upserted: int
    techniques_upserted: int
    subtechniques_upserted: int
    subtechniques_skipped_orphan: int
    technique_tactic_links: int
    version: str | None
    source: str
    started_at: datetime
    finished_at: datetime

    def as_dict(self) -> dict:
        """Return the result as a flat dict of plain values.

        Timestamps are rendered with `isoformat()` and a derived
        `duration_ms` (whole milliseconds, truncated) is appended.
        """
        passthrough = (
            "tactics_upserted",
            "techniques_upserted",
            "subtechniques_upserted",
            "subtechniques_skipped_orphan",
            "technique_tactic_links",
            "version",
            "source",
        )
        payload = {name: getattr(self, name) for name in passthrough}
        elapsed = self.finished_at - self.started_at
        payload["started_at"] = self.started_at.isoformat()
        payload["finished_at"] = self.finished_at.isoformat()
        payload["duration_ms"] = int(elapsed.total_seconds() * 1000)
        return payload
# === I/O =====================================================================
def _is_url ( source : str ) - > bool :
parsed = urllib . parse . urlparse ( source )
return parsed . scheme in ( " http " , " https " )
def _sha256_of ( path : Path ) - > str :
h = hashlib . sha256 ( )
with path . open ( " rb " ) as f :
for chunk in iter ( lambda : f . read ( 1 << 16 ) , b " " ) :
h . update ( chunk )
return h . hexdigest ( )
def _download(url: str, dest: Path, *, expected_sha256: str | None = None) -> Path:
    """Stream `url` into `dest`, optionally verifying its SHA-256 first.

    The body is written to a sibling `<dest>.part` file and only renamed over
    `dest` once the transfer (and the checksum, when requested) succeeded, so
    an existing `dest` is never replaced by partial or unverified content.
    The `.part` file is removed on every failure path.

    Args:
        url: Location to fetch; opened with `urllib.request.urlopen`.
        dest: Final file path. Its parent directory is created if missing.
        expected_sha256: Hex digest the downloaded bytes must match. A falsy
            value skips verification. Compared case-insensitively.

    Returns:
        `dest`.

    Raises:
        MitreChecksumMismatch: The downloaded bytes do not hash to
            `expected_sha256`.
    """
    dest.parent.mkdir(parents=True, exist_ok=True)
    tmp = dest.with_suffix(dest.suffix + ".part")
    log.info("metamorph.mitre.download.start", extra={"url": url, "dest": str(dest)})
    req = urllib.request.Request(url, headers={"User-Agent": "metamorph-mitre-seed/1.0"})
    digest = hashlib.sha256()
    try:
        with urllib.request.urlopen(req, timeout=MITRE_DOWNLOAD_TIMEOUT_SECONDS) as resp:
            with tmp.open("wb") as f:
                # Hash while streaming so the file is not read back a second time.
                while chunk := resp.read(1 << 16):
                    f.write(chunk)
                    digest.update(chunk)
        if expected_sha256:
            actual = digest.hexdigest()
            # hexdigest() is lowercase; tolerate an upper-case or padded pin.
            if actual != expected_sha256.strip().lower():
                raise MitreChecksumMismatch(
                    f"sha256 mismatch for {url}: expected {expected_sha256}, got {actual}"
                )
        tmp.replace(dest)
    finally:
        # No-op after a successful rename; otherwise drops the partial file.
        tmp.unlink(missing_ok=True)
    log.info(
        "metamorph.mitre.download.done",
        extra={"url": url, "bytes": dest.stat().st_size},
    )
    return dest
def resolve_source_to_path(
    source: str | Path | None,
    *,
    cache_dir: Path = MITRE_BUNDLE_CACHE_PATH,
    expected_sha256: str | None = MITRE_DEFAULT_SHA256,
) -> tuple[Path, str]:
    """Turn `source` into a local bundle file and a label describing its origin.

    Args:
        source: An `http(s)` URL, a local path, or `None` for
            `MITRE_DEFAULT_URL`.
        cache_dir: Directory that downloaded bundles are stored in.
        expected_sha256: Digest a downloaded bundle must match; forwarded to
            `_download`. Not applied to local paths.

    Returns:
        `(path, source_label)`. `source_label` is the URL or the local path as
        a string; it is what gets persisted in `settings.mitre_source_url`.

    Raises:
        MitreSeedError: `source` is a local path that does not exist or is not
            a regular file.
        MitreChecksumMismatch: Propagated from `_download`.
    """
    if source is None:
        source = MITRE_DEFAULT_URL
    source_str = str(source)

    if _is_url(source_str):
        # Name the cache file after the URL *path* only, so a query string or
        # fragment never ends up in the filename, and refuse names that would
        # resolve to the cache directory itself or its parent.
        url_path = urllib.parse.urlparse(source_str).path
        basename = url_path.rsplit("/", 1)[-1]
        if basename in ("", ".", ".."):
            basename = "enterprise-attack.json"
        dest = cache_dir / basename

        # A cached copy that already matches the requested digest is exactly
        # what a fresh download would have to produce; reuse it. Without a
        # digest there is nothing to validate the cache against, so re-fetch.
        if expected_sha256 and dest.is_file():
            if _sha256_of(dest) == expected_sha256.strip().lower():
                log.info(
                    "metamorph.mitre.download.cache_hit",
                    extra={"url": source_str, "dest": str(dest)},
                )
                return dest, source_str

        _download(source_str, dest, expected_sha256=expected_sha256)
        return dest, source_str

    path = Path(source_str)
    if not path.exists():
        raise MitreSeedError(f"source path does not exist: {path}")
    if not path.is_file():
        raise MitreSeedError(f"source path is not a regular file: {path}")
    return path, str(path)
# === STIX parsing ============================================================
def _attack_ref ( obj : dict ) - > dict | None :
for ref in obj . get ( " external_references " ) or ( ) :
if ref . get ( " source_name " ) == ATTACK_SOURCE_NAME and ref . get ( " external_id " ) :
return ref
return None
def parse_bundle(path: Path) -> ParsedBundle:
    """Read the STIX bundle at `path` into normalized dicts ready for SQL upserts.

    Objects flagged `revoked` or `x_mitre_deprecated` are ignored, as are
    tactics/attack-patterns without an ATT&CK external reference or without a
    STIX `id`. Entries of `objects` that are not JSON objects are ignored.

    Each tactic dict has `external_id`, `name`, `short_name`, `description`
    and `url`. Each technique dict has `external_id`, `name`, `description`,
    `url`, `stix_id` and `phase_names`. Each sub-technique dict has the same
    four common keys plus `stix_id` and `parent_stix_id` (which is `None`
    when no `subtechnique-of` relationship names a parent).

    Raises:
        MitreSeedError: The file's top-level JSON value is not an object.
    """
    with path.open("r", encoding="utf-8") as f:
        bundle = json.load(f)
    if not isinstance(bundle, dict):
        raise MitreSeedError(
            f"not a STIX bundle (top-level JSON is {type(bundle).__name__}): {path}"
        )
    objs = [o for o in (bundle.get("objects") or ()) if isinstance(o, dict)]
    parsed = ParsedBundle(spec_version=bundle.get("spec_version"))

    # Pass 1: relationships. These may appear after the attack-patterns they
    # refer to, so the parent map has to be complete before pass 2 starts.
    parents_by_subtech: dict[str, str] = {}
    for o in objs:
        if o.get("type") != "relationship":
            continue
        if o.get("relationship_type") != "subtechnique-of":
            continue
        # Same revoked/deprecated filter that pass 2 applies to objects.
        if o.get("revoked") or o.get("x_mitre_deprecated"):
            continue
        child, parent = o.get("source_ref"), o.get("target_ref")
        if child and parent:
            parents_by_subtech[child] = parent
    parsed.subtechnique_parents = parents_by_subtech

    # Pass 2: tactics and attack-patterns.
    for o in objs:
        if o.get("revoked") or o.get("x_mitre_deprecated"):
            continue
        kind = o.get("type")
        if kind not in ("x-mitre-tactic", "attack-pattern"):
            continue
        ref = _attack_ref(o)
        if not ref:
            continue

        if kind == "x-mitre-tactic":
            parsed.tactics.append(
                {
                    "external_id": ref["external_id"],
                    "name": o.get("name") or "",
                    "short_name": o.get("x_mitre_shortname") or "",
                    "description": o.get("description"),
                    "url": ref.get("url"),
                }
            )
            continue

        stix_id = o.get("id")
        if not stix_id:
            continue
        common = {
            "external_id": ref["external_id"],
            "name": o.get("name") or "",
            "description": o.get("description"),
            "url": ref.get("url"),
        }
        if o.get("x_mitre_is_subtechnique"):
            parsed.subtechniques.append(
                {
                    **common,
                    "stix_id": stix_id,
                    "parent_stix_id": parents_by_subtech.get(stix_id),
                }
            )
        else:
            # Capture kill_chain_phases so we can map to tactics by short_name.
            # dict.fromkeys drops repeated phases while keeping their order, so
            # a technique never yields the same tactic link twice.
            phases = list(
                dict.fromkeys(
                    p.get("phase_name")
                    for p in (o.get("kill_chain_phases") or ())
                    if p.get("kill_chain_name") == KILL_CHAIN_NAME and p.get("phase_name")
                )
            )
            parsed.techniques.append({**common, "stix_id": stix_id, "phase_names": phases})
    return parsed
# === DB upserts ==============================================================
def _upsert_tactics(s, tactics: Iterable[dict]) -> tuple[dict, int]:
    """Insert or refresh `MitreTactic` rows keyed on `external_id`.

    Args:
        s: Database session exposing `scalars`, `add` and `flush` (the object
            yielded by `session_scope()`).
        tactics: Dicts with `external_id`, `short_name`, `name`,
            `description` and `url`.

    Returns:
        `(short_name -> tactic id, n_upserted)`, where `n_upserted` counts the
        input entries applied (inserted or updated).
    """
    existing = {t.external_id: t for t in s.scalars(select(MitreTactic)).all()}
    row_by_short: dict = {}
    upserted = 0
    for t in tactics:
        row = existing.get(t["external_id"])
        if row is None:
            row = MitreTactic(
                external_id=t["external_id"],
                short_name=t["short_name"],
                name=t["name"],
                description=t["description"],
                url=t["url"],
            )
            s.add(row)
            # Remember the new row so a repeated external_id later in the same
            # bundle updates it instead of inserting a second copy.
            existing[t["external_id"]] = row
        else:
            row.short_name = t["short_name"]
            row.name = t["name"]
            row.description = t["description"]
            row.url = t["url"]
        upserted += 1
        row_by_short[t["short_name"]] = row
    # One flush for the whole batch; new rows need it before `id` is read.
    s.flush()
    short_to_id = {short: row.id for short, row in row_by_short.items()}
    return short_to_id, upserted
def _upsert_techniques(
    s, techniques: Iterable[dict], short_to_tactic_id: dict
) -> tuple[dict, int, int]:
    """Insert or refresh `MitreTechnique` rows and rebuild their tactic links.

    Args:
        s: Database session exposing `scalars`, `execute`, `add` and `flush`.
        techniques: Dicts with `external_id`, `name`, `description`, `url`,
            `stix_id` and (optionally) `phase_names`.
        short_to_tactic_id: Tactic short name -> tactic id, as returned by
            `_upsert_tactics`.

    Returns:
        `(stix_id -> technique id, n_upserted, n_links)`.
    """
    existing = {t.external_id: t for t in s.scalars(select(MitreTechnique)).all()}

    # The technique↔tactic mapping is rebuilt from scratch (drop + add) rather
    # than diffed; the cost is proportional to the number of links.
    s.execute(delete(MitreTechniqueTactic))

    applied = []
    for t in techniques:
        row = existing.get(t["external_id"])
        if row is None:
            row = MitreTechnique(
                external_id=t["external_id"],
                name=t["name"],
                description=t["description"],
                url=t["url"],
            )
            s.add(row)
            # A repeated external_id in the same bundle must update this row,
            # not insert a second one.
            existing[t["external_id"]] = row
        else:
            row.name = t["name"]
            row.description = t["description"]
            row.url = t["url"]
        applied.append((t, row))

    # One flush for the whole batch; new rows need it before `id` is read.
    s.flush()

    stix_to_id: dict = {}
    seen_links: set = set()
    n_links = 0
    for t, row in applied:
        stix_to_id[t["stix_id"]] = row.id
        for phase in t.get("phase_names", []):
            tac_id = short_to_tactic_id.get(phase)
            if tac_id is None:
                # Tactic referenced but not in bundle — log + skip.
                log.warning(
                    "metamorph.mitre.unknown_tactic_phase",
                    extra={"technique": t["external_id"], "phase": phase},
                )
                continue
            link = (row.id, tac_id)
            if link in seen_links:
                # Same pair already queued (repeated phase or repeated
                # technique entry); adding it again would duplicate the link.
                continue
            seen_links.add(link)
            s.add(MitreTechniqueTactic(technique_id=row.id, tactic_id=tac_id))
            n_links += 1
    return stix_to_id, len(applied), n_links
def _upsert_subtechniques(
    s,
    subtechniques: Iterable[dict],
    stix_to_tech_id: dict,
) -> tuple[int, int]:
    """Insert or refresh `MitreSubtechnique` rows keyed on `external_id`.

    The parent technique is resolved from `parent_stix_id` via
    `stix_to_tech_id`; when that fails, from the dotted external id
    (`T1003.001` → `T1003`) looked up among the stored techniques.
    Sub-techniques whose parent cannot be resolved are logged and skipped.

    Args:
        s: Database session exposing `scalars` and `add`.
        subtechniques: Dicts with `external_id`, `name`, `description`, `url`
            and (optionally) `parent_stix_id`.
        stix_to_tech_id: Technique STIX id -> technique id, as returned by
            `_upsert_techniques`.

    Returns:
        `(n_upserted, n_skipped_orphans)`.
    """
    existing = {sb.external_id: sb for sb in s.scalars(select(MitreSubtechnique)).all()}
    dotted_id = re.compile(r"^(T\d+)\.\d+$")
    # external_id -> technique id; loaded at most once, and only if some
    # sub-technique actually needs the dotted-id fallback.
    tech_id_by_external: dict | None = None
    n_upserted = 0
    n_skipped = 0
    for sb in subtechniques:
        parent_stix = sb.get("parent_stix_id")
        parent_id = stix_to_tech_id.get(parent_stix) if parent_stix else None
        if parent_id is None:
            # Fall back to the dotted external_id convention (T1003.001 → T1003).
            m = dotted_id.match(sb["external_id"])
            if m:
                if tech_id_by_external is None:
                    tech_id_by_external = {
                        t.external_id: t.id
                        for t in s.scalars(select(MitreTechnique)).all()
                    }
                parent_id = tech_id_by_external.get(m.group(1))
        if parent_id is None:
            log.warning(
                "metamorph.mitre.orphan_subtechnique",
                extra={"subtechnique": sb["external_id"]},
            )
            n_skipped += 1
            continue
        row = existing.get(sb["external_id"])
        if row is None:
            row = MitreSubtechnique(
                external_id=sb["external_id"],
                name=sb["name"],
                description=sb["description"],
                url=sb["url"],
                technique_id=parent_id,
            )
            s.add(row)
            # A repeated external_id in the same bundle must update this row,
            # not insert a second one.
            existing[sb["external_id"]] = row
        else:
            row.name = sb["name"]
            row.description = sb["description"]
            row.url = sb["url"]
            row.technique_id = parent_id
        n_upserted += 1
    return n_upserted, n_skipped
def _upsert_setting(s, key: str, value: object) -> None:
    """Store `value` under `key`: overwrite the `Setting` row if present, else add one."""
    current = s.scalar(select(Setting).where(Setting.key == key))
    if current is not None:
        current.value = value
        return
    s.add(Setting(key=key, value=value))
# === Entry point =============================================================
def seed_mitre(
    *,
    source: str | Path | None = None,
    expected_sha256: str | None = MITRE_DEFAULT_SHA256,
    cache_dir: Path = MITRE_BUNDLE_CACHE_PATH,
    allow_unverified: bool = False,
) -> SeedResult:
    """Top-level seed. URL → download + verify + parse; path → just parse.

    Custom URLs (anything other than `MITRE_DEFAULT_URL`) MUST be paired with
    an `expected_sha256` for integrity, or with `allow_unverified=True` to opt
    out explicitly. This avoids a silent integrity bypass when an operator
    points the sync at a typo'd or attacker-controlled mirror.

    Args:
        source: URL or local path of the bundle; `None` means the pinned
            default URL.
        expected_sha256: Digest the downloaded bundle must match.
        cache_dir: Directory downloaded bundles are stored in.
        allow_unverified: Permit a custom URL without its own digest.

    Returns:
        A `SeedResult` with the row counters, the source label and timings.
        `version` is `MITRE_VERSION` only when the pinned default URL was
        used, otherwise `None`.

    Raises:
        MitreSeedError: A custom URL was given without its own digest and
            without `allow_unverified=True`; also propagated from source
            resolution and parsing.
    """
    started_at = datetime.now(tz=timezone.utc)

    is_custom_url = (
        source is not None
        and _is_url(str(source))
        and str(source) != MITRE_DEFAULT_URL
    )
    if is_custom_url and (
        expected_sha256 is None or expected_sha256 == MITRE_DEFAULT_SHA256
    ):
        # The caller passed a non-default URL but didn't override the hash:
        # MITRE_DEFAULT_SHA256 would obviously not match → force an explicit
        # decision rather than silently bypassing.
        if not allow_unverified:
            raise MitreSeedError(
                "custom URL requires an expected_sha256 (or allow_unverified=True)"
            )
        expected_sha256 = None

    path, source_label = resolve_source_to_path(
        source, cache_dir=cache_dir, expected_sha256=expected_sha256
    )
    parsed = parse_bundle(path)
    log.info(
        "metamorph.mitre.parsed",
        extra={
            "tactics": len(parsed.tactics),
            "techniques": len(parsed.techniques),
            "subtechniques": len(parsed.subtechniques),
            "spec_version": parsed.spec_version,
        },
    )

    with session_scope() as s:
        short_to_tactic_id, n_tactics = _upsert_tactics(s, parsed.tactics)
        stix_to_tech_id, n_techs, n_links = _upsert_techniques(
            s, parsed.techniques, short_to_tactic_id
        )
        n_subs, n_orphan = _upsert_subtechniques(s, parsed.subtechniques, stix_to_tech_id)
        finished_at = datetime.now(tz=timezone.utc)
        _upsert_setting(s, SETTING_LAST_SYNC, finished_at.isoformat())
        # If the URL is the pinned one, we know the version; otherwise it is
        # unknown.
        version = MITRE_VERSION if source_label == MITRE_DEFAULT_URL else None
        if version:
            _upsert_setting(s, SETTING_VERSION, version)
        else:
            # Drop any version recorded by an earlier pinned sync: keeping it
            # next to the new source URL would report a version this data was
            # never verified to be. `read_status` yields None for a missing key.
            s.execute(delete(Setting).where(Setting.key == SETTING_VERSION))
        _upsert_setting(s, SETTING_SOURCE_URL, source_label)

    result = SeedResult(
        tactics_upserted=n_tactics,
        techniques_upserted=n_techs,
        subtechniques_upserted=n_subs,
        subtechniques_skipped_orphan=n_orphan,
        technique_tactic_links=n_links,
        version=version,
        source=source_label,
        started_at=started_at,
        finished_at=finished_at,
    )
    log.info("metamorph.mitre.seed_completed", extra=result.as_dict())
    return result
def read_status() -> dict:
    """Return the persisted seed metadata for `GET /mitre/status`.

    Settings that have no stored row are reported as `None`; the pinned
    default URL and version are always included.
    """
    keys = {SETTING_LAST_SYNC, SETTING_VERSION, SETTING_SOURCE_URL}
    stored = dict.fromkeys(keys)
    with session_scope() as s:
        query = select(Setting).where(Setting.key.in_(keys))
        stored.update((row.key, row.value) for row in s.scalars(query).all())
    return {
        "last_sync": stored[SETTING_LAST_SYNC],
        "version": stored[SETTING_VERSION],
        "source_url": stored[SETTING_SOURCE_URL],
        "default_url": MITRE_DEFAULT_URL,
        "default_version": MITRE_VERSION,
    }