Files
Metamorph/backend/app/cli.py
Knacky ba976959a1 feat(m4): STIX parser + seed service + CLI
- backend/app/services/mitre_seed.py: stdlib-only STIX 2.1 parser (urllib +
  hashlib + json). Pinned to enterprise-attack-19.0.json with sha256
  df520ea0775a57db7bff760145b02fed89290802913e056b7ed5970b02f3626a (~52 MB,
  ~1.1 s parse). Resolves sub-technique parents via
  relationship[subtechnique-of] with a T1003.001→T1003 dotted-id fallback;
  upserts on external_id, rebuilds the technique↔tactic M2M in a single
  transaction so external readers never see an empty join. Persists
  mitre_last_sync, mitre_version, mitre_source_url in the settings table.
- Custom URLs MUST be paired with expected_sha256 OR allow_unverified=true —
  refuses silent integrity bypass.
- CLI: flask metamorph seed-mitre [--source path|url]
  [--checksum-sha256 hex] [--skip-checksum]. Make target wraps it.
- Docker: /data/mitre/ chowned to the metamorph user at build; named volume
  metamorph_mitre mounted from compose for cross-restart cache.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 13:53:53 +02:00

122 lines
3.5 KiB
Python

"""Flask CLI entry point.
Used as `flask --app app.cli metamorph <subcommand>` (or via the make targets).
"""
from __future__ import annotations
import sys
import click
from flask import Flask
from flask.cli import AppGroup
from app.core.install_token import (
ensure_install_token,
log_install_token_banner,
regenerate_install_token,
)
from app.core.logging import configure_logging
from app.services.bootstrap import ensure_system_groups
from app.core.config import settings
def _create_cli_app() -> Flask:
configure_logging(settings.LOG_LEVEL)
return Flask("metamorph-cli")
app = _create_cli_app()
metamorph = AppGroup("metamorph", help="Metamorph admin commands.")
@metamorph.command("print-install-token")
@click.option(
"--force",
is_flag=True,
help="Always mint a fresh token even if one is already pending.",
)
def print_install_token(force: bool):
"""Mint and print the bootstrap install token (idempotent unless --force)."""
ensure_system_groups()
if force:
token = regenerate_install_token()
else:
token = ensure_install_token()
if token is None:
click.echo(
"No install token minted: either at least one user already exists, "
"or a token is already pending (use --force to mint a fresh one).",
err=True,
)
sys.exit(1)
log_install_token_banner(token)
@metamorph.command("seed-mitre")
@click.option(
"--source",
default=None,
help="STIX bundle source: local path or HTTPS URL. Defaults to the pinned MITRE Enterprise release.",
)
@click.option(
"--checksum-sha256",
"checksum_sha256",
default=None,
help="Expected sha256 of the bundle (required with a non-default --source URL unless --skip-checksum).",
)
@click.option(
"--skip-checksum",
is_flag=True,
help="Skip sha256 verification entirely (escape hatch for testing).",
)
def seed_mitre(source: str | None, checksum_sha256: str | None, skip_checksum: bool):
"""Seed/refresh the MITRE ATT&CK Enterprise reference tables.
Upserts on `external_id`. Re-running with the same source updates the
name/description/url and re-applies the technique↔tactic mapping.
"""
from app.services.mitre_seed import (
MITRE_DEFAULT_SHA256,
MITRE_DEFAULT_URL,
seed_mitre as seed_mitre_svc,
)
if skip_checksum:
expected_sha = None
elif checksum_sha256:
expected_sha = checksum_sha256
elif source is None or source == MITRE_DEFAULT_URL:
expected_sha = MITRE_DEFAULT_SHA256
else:
expected_sha = None # let seed_mitre_svc decide whether to refuse
click.echo(
f"Seeding from {source or MITRE_DEFAULT_URL} "
f"(sha256 check: {'off' if skip_checksum else expected_sha or 'unverified'}) ...",
err=True,
)
try:
result = seed_mitre_svc(
source=source,
expected_sha256=expected_sha,
allow_unverified=skip_checksum,
)
except Exception as e: # noqa: BLE001
click.echo(f"seed-mitre failed: {e}", err=True)
sys.exit(2)
click.echo(
f" tactics: {result.tactics_upserted}, "
f"techniques: {result.techniques_upserted}, "
f"subtechniques: {result.subtechniques_upserted} "
f"(skipped orphans: {result.subtechniques_skipped_orphan}), "
f"links: {result.technique_tactic_links}, "
f"duration: {(result.finished_at - result.started_at).total_seconds():.1f}s",
err=True,
)
app.cli.add_command(metamorph)