Compare commits

...

5 Commits

Author SHA1 Message Date
Knacky
37e9e03f02 docs(m4): CHANGELOG, README, lessons, spec drift fix, todo tick
- CHANGELOG: added M4 section listing endpoints, CLI, volume, persisted
  settings, picker, and the post-spec-review fixes (custom-URL integrity
  requirement + /diag/reset consistency + spec drift). Includes the
  intentional decisions paragraph (seed-time download not image-baked, read
  endpoints unauthenticated-perm-wise, stdlib over httpx).
- README: status bumped to M0–M4, added MITRE quickstart (make seed-mitre +
  air-gapped path with --source /data/mitre/<file> + --skip-checksum),
  testing-m<N>.md pointer updated to testing-m4.md, roadmap line.
- tasks/spec.md §10 #4: amended "14 tactics Enterprise" → "≥14 tactics
  Enterprise (la v19 du pin actuel en ship 15)".
- tasks/lessons.md: 7 M4 lessons captured (stdlib STIX parsing, decoupling
  DoD asserts from upstream versions, subtechnique parent resolution, single-
  transaction safety, custom-URL footgun mitigation, /diag/reset consistency,
  named-volume permission caveat, podman build cache surprise).
- tasks/todo.md: M4 marked ☑.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 13:54:46 +02:00
Knacky
90036437cc test(m4): pytest parser + endpoints + e2e tag picker
- backend/tests/test_mitre.py: 12 integration tests using a hand-crafted
  minimal STIX bundle (no network in tests). Covers parser
  (revoked/deprecated skip, sub-technique parent linkage), seed idempotence,
  persisted settings, checksum mismatch path, all four read endpoints, perm
  enforcement on /mitre/sync, ILIKE search.
- e2e/tests/m4-mitre.spec.ts: 6 Playwright tests against the live stack.
  beforeAll calls POST /mitre/sync once (real bundle, ~50 MB, ~1.1 s) then
  the suite validates tactics ≥14, T1003 has ≥5 sub-techniques, the picker
  walks tactic→technique→subtechnique with chip multi-select, and non-admin
  sees /mitre but no Sync card.
- tasks/testing-m4.md: manual + automated checklist, air-gapped operator
  notes, volume-permission caveat for pre-existing root-owned volumes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 13:54:26 +02:00
Knacky
8a1dd58c83 feat(m4): frontend MitreTagPicker + /mitre showcase page
- lib/mitre.ts: shared types (MitreTactic, Technique, Subtechnique, MitreTag
  kind/id/external_id/name) + TanStack query keys.
- components/MitreTagPicker.tsx: three-column controlled picker (tactic →
  technique → subtechnique), multi-select with chip-removal, autocomplete on
  each column, ARIA labels for screen readers. Returns MitreTag[] via
  value/onChange — drop-in for M5 template forms.
- pages/MitrePage.tsx: status card (version, source URL, last_sync), admin-
  gated Trigger Sync button with success/error alerts, picker showcase, JSON
  preview of the current selection.
- Layout adds MITRE nav link for any logged-in user; App.tsx adds the
  /mitre route under RequireAuth. HomePage roadmap bumped to next: M5
  templates.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 13:54:15 +02:00
Knacky
872f3c046a feat(m4): REST endpoints + admin sync + /diag/reset consistency
- GET /api/v1/mitre/tactics, /techniques?tactic=&q=, /subtechniques?technique=&q=
  (paginated, ILIKE search on name + external_id, @require_auth only — MITRE
  is public reference material).
- GET /api/v1/mitre/status: last_sync, version, source_url + the pinned
  defaults (default_url, default_version) for the SPA badge.
- POST /api/v1/mitre/sync: @require_perm("mitre.sync"). Body supports
  {source, expected_sha256, allow_unverified} — defaults inherit the pin.
- /diag/reset now also TRUNCATEs the mitre_* tables alongside settings so a
  freshly-reset stack has GET /mitre/status and GET /mitre/tactics agree
  ("no data, no last_sync"). Previously the catalogue persisted while the
  metadata was wiped, leaving status to lie. The e2e suite re-syncs in
  beforeAll.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 13:54:03 +02:00
Knacky
ba976959a1 feat(m4): STIX parser + seed service + CLI
- backend/app/services/mitre_seed.py: stdlib-only STIX 2.1 parser (urllib +
  hashlib + json). Pinned to enterprise-attack-19.0.json with sha256
  df520ea0775a57db7bff760145b02fed89290802913e056b7ed5970b02f3626a (~52 MB,
  ~1.1 s parse). Resolves sub-technique parents via
  relationship[subtechnique-of] with a T1003.001→T1003 dotted-id fallback;
  upserts on external_id, rebuilds the technique↔tactic M2M in a single
  transaction so external readers never see an empty join. Persists
  mitre_last_sync, mitre_version, mitre_source_url in the settings table.
- Custom URLs MUST be paired with expected_sha256 OR allow_unverified=true —
  refuses silent integrity bypass.
- CLI: flask metamorph seed-mitre [--source path|url]
  [--checksum-sha256 hex] [--skip-checksum]. Make target wraps it.
- Docker: /data/mitre/ chowned to the metamorph user at build; named volume
  metamorph_mitre mounted from compose for cross-restart cache.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 13:53:53 +02:00
21 changed files with 1928 additions and 14 deletions

View File

@@ -4,6 +4,40 @@ All notable changes to this project will be documented here. Format: [Keep a Cha
## [Unreleased] ## [Unreleased]
### Added — M4 (MITRE ATT&CK Enterprise)
- **STIX 2.1 parser + upsert** (`app/services/mitre_seed.py`): stdlib-only (`urllib.request` + `hashlib`), pinned to Enterprise v19.0 (`enterprise-attack-19.0.json`, sha256 `df520ea0…`). Parses 25k+ STIX objects → 15 tactics, 222 techniques, 475 sub-techniques in ~1.1 s. Skips revoked + deprecated, resolves sub-technique parents via `relationship[subtechnique-of]` with a `T1003.001 → T1003` dotted-id fallback, copies kill-chain phases into the `mitre_technique_tactics` M2M.
- **CLI**: `flask metamorph seed-mitre [--source <path|url>] [--checksum-sha256 <hex>] [--skip-checksum]` (`app/cli.py`). `make seed-mitre` wraps it.
- **REST endpoints** (`app/api/mitre.py`):
- `GET /api/v1/mitre/tactics`, `/mitre/techniques?tactic=…&q=…`, `/mitre/subtechniques?technique=…&q=…` (paginated, search on name/external_id).
- `GET /api/v1/mitre/status` (last_sync, version, source_url, defaults).
- `POST /api/v1/mitre/sync` (perm `mitre.sync`) — re-pull on demand.
- **Persisted metadata** in `settings`: `mitre_last_sync`, `mitre_version`, `mitre_source_url`.
- **Compose volume `metamorph_mitre`** mounted at `/data/mitre/` in the api container — caches the downloaded bundle across restarts. Owned by `metamorph:metamorph`.
- **Frontend**:
- `<MitreTagPicker>` component: 3-column tactic → technique → sub-technique with multi-select chips, autocomplete on each column. Returns `MitreTag[]` (`kind`, `id`, `external_id`, `name`), ready for M5 templates.
- `/mitre` showcase page with status card, admin-gated **Trigger sync** button, picker preview, and `<pre>` payload preview.
- Nav adds **MITRE** link for any logged-in user.
- **Testing**:
- `backend/tests/test_mitre.py`**12 pytest** (parser, idempotence, checksum mismatch, persisted settings, endpoint variants, perm enforcement) using a hand-crafted minimal STIX bundle (no network in tests).
- `e2e/tests/m4-mitre.spec.ts`**6 Playwright** against the live stack (calls `/mitre/sync` once in `beforeAll`).
- `tasks/testing-m4.md`.
### Fixed (post-M4 spec-review pass)
- **Sync integrity guarantee**: `seed_mitre()` now refuses a custom URL without either `expected_sha256` or an explicit `allow_unverified=true`. Closes a "typo in `mitre_source_url` setting routes the seed to attacker JSON" footgun. CLI surfaces this via `--checksum-sha256` / `--skip-checksum`; API via `{"source", "expected_sha256", "allow_unverified"}` body.
- **`/diag/reset` consistency**: now truncates the `mitre_*` tables alongside `settings` so `GET /mitre/status` and `GET /mitre/tactics` agree after a reset (previously: catalogue rows persisted, but `mitre_last_sync` got wiped → status lied).
- **Spec drift §10 #4**: amended "14 tactics" → "≥ 14 tactics (v19 ships 15)" to reflect MITRE v8+ reality.
### Decisions (intentional)
- **Bundle "embarqué" interpreted as seed-time download + named-volume cache**, not "binary baked into the Docker image". Keeps the image ~150 MB, makes version bumps a constant edit, plays nicely with `make seed-mitre` re-runs. Air-gapped operators copy the file into the volume + pass `--source /data/mitre/<file>`.
- **Read endpoints unauthenticated-perm-wise but auth-required**: MITRE data is public reference material — no `mitre.read` perm. Status endpoint is similarly open (under `@require_auth`) to keep `/mitre/status` simple for the UI badge.
- **No `requests` / `httpx` dep added**: stdlib `urllib.request` is enough and avoids inflating the image.
### Validated end-to-end (M4 DoD)
- `make clean && make up && make migrate && make seed-mitre` → 15 tactics / 222 techniques / 475 sub-techniques / 254 links / 0 orphans / ~1.1 s.
- `make test-api`**51 pytest pass** (1 health + 8 schema + 15 auth + 15 RBAC + 12 MITRE) in ~5 s.
- `make e2e`**34 Playwright pass** (8 M0 + 4 M1 + 8 M2 + 8 M3 + 6 M4) in ~18 s.
- Spec-reviewer PASS after fixes applied.
### Added — M3 (RBAC: groups, permissions, users) ### Added — M3 (RBAC: groups, permissions, users)
- **Permission catalogue** (`app/services/permissions_seed.py`): 31 atomic codes across 10 families (`user`, `group`, `invitation`, `test_template`, `scenario_template`, `mission`, `detection_level`, `setting`, `mitre.sync`). Seeded at boot **and** after `/setup` to handle a freshly truncated DB. Idempotent + additive on system groups (never removes a perm). - **Permission catalogue** (`app/services/permissions_seed.py`): 31 atomic codes across 10 families (`user`, `group`, `invitation`, `test_template`, `scenario_template`, `mission`, `detection_level`, `setting`, `mitre.sync`). Seeded at boot **and** after `/setup` to handle a freshly truncated DB. Idempotent + additive on system groups (never removes a perm).
- **Default group bindings**: `admin` = all 31 codes; `redteam` = 8 (catalogue read + mission.{read,create,update,archive,write_red_fields} + detection_level.read); `blueteam` = 5 (catalogue read + mission.{read,write_blue_fields} + detection_level.read). - **Default group bindings**: `admin` = all 31 codes; `redteam` = 8 (catalogue read + mission.{read,create,update,archive,write_red_fields} + detection_level.read); `blueteam` = 5 (catalogue read + mission.{read,write_blue_fields} + detection_level.read).

View File

@@ -2,13 +2,15 @@
Collaborative purple-team platform. Red team logs the tests they execute (procedure, command, timestamp); blue team annotates each test with detection evidence (alerts, logs, files). At the end of an engagement, Metamorph generates a standalone reveal.js slide deck classified by MITRE ATT&CK tactic. Collaborative purple-team platform. Red team logs the tests they execute (procedure, command, timestamp); blue team annotates each test with detection evidence (alerts, logs, files). At the end of an engagement, Metamorph generates a standalone reveal.js slide deck classified by MITRE ATT&CK tactic.
> **Status**: M0 (bootstrap). See `tasks/spec.md` for the full specification and `tasks/todo.md` for the milestone-by-milestone plan. > **Status**: M0M4 delivered (bootstrap → DB schema → auth → RBAC → MITRE ATT&CK reference). See `tasks/spec.md` for the full specification and `tasks/todo.md` for the milestone-by-milestone plan.
## Stack ## Stack
- **Backend**: Python 3.12, Flask 3, SQLAlchemy 2 + Alembic (M1+), PostgreSQL 16. - **Backend**: Python 3.12, Flask 3, SQLAlchemy 2 + Alembic (M1+), PostgreSQL 16.
- **Frontend**: React 18 + TypeScript + Vite + TailwindCSS (RTOps design tokens, see `tasks/design.md`). - **Frontend**: React 18 + TypeScript + Vite + TailwindCSS (RTOps design tokens, see `tasks/design.md`).
- **Auth (M2+)**: JWT access (1h) + refresh (30d), Argon2id, invite-link enrollment. - **Auth (M2+)**: JWT access (1h) + refresh (30d), Argon2id, invite-link enrollment.
- **RBAC (M3+)**: atomic permissions (31 codes) bundled into custom groups; 3 system groups seeded (`admin` / `redteam` / `blueteam`).
- **MITRE ATT&CK (M4+)**: Enterprise reference catalogue pinned to v19.0, seedable via `make seed-mitre`.
- **Delivery**: docker-compose. TLS termination is expected to be handled by an external reverse proxy in production. - **Delivery**: docker-compose. TLS termination is expected to be handled by an external reverse proxy in production.
## Quickstart ## Quickstart
@@ -43,6 +45,17 @@ Then:
- Front: <http://localhost:8080> - Front: <http://localhost:8080>
- API health: <http://localhost:8080/api/v1/health> (proxied) or <http://localhost:8000/api/v1/health> - API health: <http://localhost:8080/api/v1/health> (proxied) or <http://localhost:8000/api/v1/health>
### First-time setup
```bash
make migrate # apply DB schema
make print-install-token # prints the bootstrap admin token (logs banner)
# visit http://localhost:8080/setup, paste the token, create the admin account
make seed-mitre # populate the MITRE ATT&CK reference (~50 MB, ~1 s)
```
The MITRE bundle is cached in the named volume `metamorph_mitre` (`/data/mitre/<file>.json` inside the api container). For air-gapped operators, pre-populate the volume with your own STIX 2.1 file and run `podman compose exec api flask --app app.cli metamorph seed-mitre --source /data/mitre/your-file.json --skip-checksum`.
To stop: To stop:
```bash ```bash
@@ -80,7 +93,7 @@ See `.env.example`. The most important ones:
## Testing ## Testing
- **Manual + automated checklist for the current milestone**: see [`tasks/testing-m<N>.md`](tasks/testing-m0.md) (currently `testing-m0.md`). - **Manual + automated checklist for the current milestone**: see [`tasks/testing-m<N>.md`](tasks/testing-m4.md) (current: `testing-m4.md`).
- **Backend unit tests**: `make test-api` - **Backend unit tests**: `make test-api`
- **End-to-end (Playwright)**: `make e2e-install` (once), then `make up && make e2e`. Reports land in `e2e/playwright-report/` (HTML + JUnit XML); open with `make e2e-report`. - **End-to-end (Playwright)**: `make e2e-install` (once), then `make up && make e2e`. Reports land in `e2e/playwright-report/` (HTML + JUnit XML); open with `make e2e-report`.
@@ -122,7 +135,7 @@ The hooks run `ruff` + `ruff-format` on the backend and `eslint` / `tsc --noEmit
## Roadmap ## Roadmap
See `tasks/todo.md`. Current milestone: **M0bootstrap**. See `tasks/todo.md`. Current milestone: **M4MITRE ATT&CK Enterprise**. Next: M5 (test & scenario templates).
## License ## License

View File

@@ -30,7 +30,7 @@ ENV PYTHONDONTWRITEBYTECODE=1 \
# Non-root user # Non-root user
RUN groupadd --gid 10001 metamorph \ RUN groupadd --gid 10001 metamorph \
&& useradd --uid 10001 --gid metamorph --shell /usr/sbin/nologin --create-home metamorph \ && useradd --uid 10001 --gid metamorph --shell /usr/sbin/nologin --create-home metamorph \
&& mkdir -p /data/evidence \ && mkdir -p /data/evidence /data/mitre \
&& chown -R metamorph:metamorph /data && chown -R metamorph:metamorph /data
COPY --from=deps /opt/venv /opt/venv COPY --from=deps /opt/venv /opt/venv

View File

@@ -66,12 +66,23 @@ def reset_test_state():
try: try:
with get_engine().begin() as conn: with get_engine().begin() as conn:
# Auth + RBAC + settings reset.
conn.execute( conn.execute(
text( text(
"TRUNCATE users, refresh_tokens, invitations, invitation_groups, " "TRUNCATE users, refresh_tokens, invitations, invitation_groups, "
"user_groups, settings, groups RESTART IDENTITY CASCADE" "user_groups, settings, groups RESTART IDENTITY CASCADE"
) )
) )
# MITRE reference reset — kept in sync with `settings` so a freshly
# reset stack has `GET /mitre/status` and `GET /mitre/tactics` agree
# ("no data, no last_sync"). The e2e suite re-syncs via /mitre/sync
# when it needs catalogue data.
conn.execute(
text(
"TRUNCATE mitre_technique_tactics, mitre_subtechniques, "
"mitre_techniques, mitre_tactics RESTART IDENTITY CASCADE"
)
)
except SQLAlchemyError as e: except SQLAlchemyError as e:
log.error("metamorph.diag.reset_failed", extra={"error": str(e)}) log.error("metamorph.diag.reset_failed", extra={"error": str(e)})
return jsonify({"reset": False, "error": "database_error"}), 500 return jsonify({"reset": False, "error": "database_error"}), 500

213
backend/app/api/mitre.py Normal file
View File

@@ -0,0 +1,213 @@
"""MITRE ATT&CK reference endpoints.
Read access is open to any authenticated user (the catalogue is reference
data — not sensitive on its own). Sync is admin-only via `mitre.sync`.
"""
from __future__ import annotations
import logging
from flask import Blueprint, jsonify, request
from sqlalchemy import func, or_, select
from app.core.auth_decorators import require_auth, require_perm
from app.db.session import session_scope
from app.models.mitre import MitreSubtechnique, MitreTactic, MitreTechnique
from app.services import mitre_seed as mitre_seed_svc
bp = Blueprint("mitre", __name__, url_prefix="/mitre")
log = logging.getLogger("metamorph.api.mitre")
def _pagination_args() -> tuple[int, int] | tuple[None, tuple[int, str]]:
"""Returns (limit, offset) or (None, (status, error_payload))."""
try:
limit = int(request.args.get("limit", "100"))
offset = int(request.args.get("offset", "0"))
except ValueError:
return None, (400, "invalid_pagination")
limit = max(1, min(limit, 500))
offset = max(0, offset)
return limit, offset
def _search(stmt, model, q: str | None):
if not q:
return stmt
like = f"%{q.lower()}%"
return stmt.where(
or_(
func.lower(model.name).like(like),
func.lower(model.external_id).like(like),
)
)
def _serialize_tactic(t: MitreTactic) -> dict:
return {
"id": str(t.id),
"external_id": t.external_id,
"short_name": t.short_name,
"name": t.name,
"description": t.description,
"url": t.url,
}
def _serialize_technique(t: MitreTechnique, *, include_tactics: bool = True) -> dict:
out = {
"id": str(t.id),
"external_id": t.external_id,
"name": t.name,
"description": t.description,
"url": t.url,
}
if include_tactics:
out["tactics"] = sorted(
({"external_id": tac.external_id, "name": tac.name} for tac in t.tactics),
key=lambda d: d["external_id"],
)
return out
def _serialize_subtechnique(sb: MitreSubtechnique) -> dict:
return {
"id": str(sb.id),
"external_id": sb.external_id,
"name": sb.name,
"description": sb.description,
"url": sb.url,
"technique_id": str(sb.technique_id),
}
@bp.get("/tactics")
@require_auth
def list_tactics():
paging = _pagination_args()
if paging[0] is None:
return jsonify({"error": paging[1][1]}), paging[1][0]
limit, offset = paging
q = request.args.get("q") or None
with session_scope() as s:
stmt = select(MitreTactic).order_by(MitreTactic.external_id.asc())
stmt = _search(stmt, MitreTactic, q)
total = s.scalar(_search(select(func.count()).select_from(MitreTactic), MitreTactic, q)) or 0
rows = s.scalars(stmt.limit(limit).offset(offset)).all()
return jsonify(
{
"items": [_serialize_tactic(t) for t in rows],
"total": int(total),
"limit": limit,
"offset": offset,
}
)
@bp.get("/techniques")
@require_auth
def list_techniques():
paging = _pagination_args()
if paging[0] is None:
return jsonify({"error": paging[1][1]}), paging[1][0]
limit, offset = paging
q = request.args.get("q") or None
tactic = request.args.get("tactic") or None
with session_scope() as s:
stmt = select(MitreTechnique).order_by(MitreTechnique.external_id.asc())
count_stmt = select(func.count()).select_from(MitreTechnique)
if tactic:
tac = s.scalar(select(MitreTactic).where(MitreTactic.external_id == tactic))
if tac is None:
return jsonify({"items": [], "total": 0, "limit": limit, "offset": offset})
stmt = stmt.join(MitreTechnique.tactics).where(MitreTactic.id == tac.id)
count_stmt = (
count_stmt.select_from(MitreTechnique)
.join(MitreTechnique.tactics)
.where(MitreTactic.id == tac.id)
)
stmt = _search(stmt, MitreTechnique, q)
count_stmt = _search(count_stmt, MitreTechnique, q)
total = s.scalar(count_stmt) or 0
rows = s.scalars(stmt.limit(limit).offset(offset)).all()
return jsonify(
{
"items": [_serialize_technique(t) for t in rows],
"total": int(total),
"limit": limit,
"offset": offset,
}
)
@bp.get("/subtechniques")
@require_auth
def list_subtechniques():
paging = _pagination_args()
if paging[0] is None:
return jsonify({"error": paging[1][1]}), paging[1][0]
limit, offset = paging
q = request.args.get("q") or None
technique = request.args.get("technique") or None
with session_scope() as s:
stmt = select(MitreSubtechnique).order_by(MitreSubtechnique.external_id.asc())
count_stmt = select(func.count()).select_from(MitreSubtechnique)
if technique:
tech = s.scalar(
select(MitreTechnique).where(MitreTechnique.external_id == technique)
)
if tech is None:
return jsonify({"items": [], "total": 0, "limit": limit, "offset": offset})
stmt = stmt.where(MitreSubtechnique.technique_id == tech.id)
count_stmt = count_stmt.where(MitreSubtechnique.technique_id == tech.id)
stmt = _search(stmt, MitreSubtechnique, q)
count_stmt = _search(count_stmt, MitreSubtechnique, q)
total = s.scalar(count_stmt) or 0
rows = s.scalars(stmt.limit(limit).offset(offset)).all()
return jsonify(
{
"items": [_serialize_subtechnique(sb) for sb in rows],
"total": int(total),
"limit": limit,
"offset": offset,
}
)
@bp.get("/status")
@require_auth
def status():
return jsonify(mitre_seed_svc.read_status())
@bp.post("/sync")
@require_auth
@require_perm("mitre.sync")
def sync():
"""Re-pull the configured (or default) STIX source and upsert.
Custom `source` URLs MUST be paired with either `expected_sha256` (integrity
guarantee) or `allow_unverified: true` (explicit opt-out) — the seed service
will raise otherwise.
"""
payload = request.get_json(silent=True) or {}
source = payload.get("source") # optional URL override
expected_sha256 = payload.get("expected_sha256")
allow_unverified = bool(payload.get("allow_unverified", False))
try:
result = mitre_seed_svc.seed_mitre(
source=source,
expected_sha256=expected_sha256
or (mitre_seed_svc.MITRE_DEFAULT_SHA256 if source is None else None),
allow_unverified=allow_unverified,
)
except mitre_seed_svc.MitreChecksumMismatch as e:
return jsonify({"error": "checksum_mismatch", "message": str(e)}), 502
except mitre_seed_svc.MitreSeedError as e:
return jsonify({"error": "seed_failed", "message": str(e)}), 502
except Exception as e: # noqa: BLE001
log.exception("metamorph.api.mitre.sync_failed")
return jsonify({"error": "internal_error", "message": str(e)}), 500
log.warning("metamorph.api.mitre.sync_done", extra=result.as_dict())
return jsonify(result.as_dict())

View File

@@ -9,6 +9,7 @@ from app.api.diag import bp as diag_bp
from app.api.groups import bp as groups_bp from app.api.groups import bp as groups_bp
from app.api.health import bp as health_bp from app.api.health import bp as health_bp
from app.api.invitations import bp as invitations_bp from app.api.invitations import bp as invitations_bp
from app.api.mitre import bp as mitre_bp
from app.api.permissions import bp as permissions_bp from app.api.permissions import bp as permissions_bp
from app.api.setup import bp as setup_bp from app.api.setup import bp as setup_bp
from app.api.users import bp as users_bp from app.api.users import bp as users_bp
@@ -22,3 +23,4 @@ bp.register_blueprint(invitations_bp)
bp.register_blueprint(users_bp) bp.register_blueprint(users_bp)
bp.register_blueprint(groups_bp) bp.register_blueprint(groups_bp)
bp.register_blueprint(permissions_bp) bp.register_blueprint(permissions_bp)
bp.register_blueprint(mitre_bp)

View File

@@ -56,10 +56,66 @@ def print_install_token(force: bool):
@metamorph.command("seed-mitre") @metamorph.command("seed-mitre")
def seed_mitre(): @click.option(
"""Placeholder for M4 — left so `make seed-mitre` doesn't crash.""" "--source",
click.echo("MITRE seeding will land in M4. (no-op for now)", err=True) default=None,
sys.exit(0) help="STIX bundle source: local path or HTTPS URL. Defaults to the pinned MITRE Enterprise release.",
)
@click.option(
"--checksum-sha256",
"checksum_sha256",
default=None,
help="Expected sha256 of the bundle (required with a non-default --source URL unless --skip-checksum).",
)
@click.option(
"--skip-checksum",
is_flag=True,
help="Skip sha256 verification entirely (escape hatch for testing).",
)
def seed_mitre(source: str | None, checksum_sha256: str | None, skip_checksum: bool):
"""Seed/refresh the MITRE ATT&CK Enterprise reference tables.
Upserts on `external_id`. Re-running with the same source updates the
name/description/url and re-applies the technique↔tactic mapping.
"""
from app.services.mitre_seed import (
MITRE_DEFAULT_SHA256,
MITRE_DEFAULT_URL,
seed_mitre as seed_mitre_svc,
)
if skip_checksum:
expected_sha = None
elif checksum_sha256:
expected_sha = checksum_sha256
elif source is None or source == MITRE_DEFAULT_URL:
expected_sha = MITRE_DEFAULT_SHA256
else:
expected_sha = None # let seed_mitre_svc decide whether to refuse
click.echo(
f"Seeding from {source or MITRE_DEFAULT_URL} "
f"(sha256 check: {'off' if skip_checksum else expected_sha or 'unverified'}) ...",
err=True,
)
try:
result = seed_mitre_svc(
source=source,
expected_sha256=expected_sha,
allow_unverified=skip_checksum,
)
except Exception as e: # noqa: BLE001
click.echo(f"seed-mitre failed: {e}", err=True)
sys.exit(2)
click.echo(
f" tactics: {result.tactics_upserted}, "
f"techniques: {result.techniques_upserted}, "
f"subtechniques: {result.subtechniques_upserted} "
f"(skipped orphans: {result.subtechniques_skipped_orphan}), "
f"links: {result.technique_tactic_links}, "
f"duration: {(result.finished_at - result.started_at).total_seconds():.1f}s",
err=True,
)
app.cli.add_command(metamorph) app.cli.add_command(metamorph)

View File

@@ -0,0 +1,478 @@
"""MITRE ATT&CK Enterprise seed + sync.
Parses a STIX 2.1 bundle into the `mitre_*` tables. Idempotent: re-running
upserts on `external_id`, refreshes name/description/url, and re-applies the
technique↔tactic mapping. Sub-techniques whose parent is missing in the
bundle are skipped (with a WARNING log).
Defaults pin a specific Enterprise release (see `MITRE_DEFAULT_*`). The pin
is honored by the CLI (`flask metamorph seed-mitre`) and by the
`POST /mitre/sync` admin endpoint; both accept a `--source` / `source_url`
override for air-gapped operators.
The bundle is downloaded with `urllib.request` (stdlib — no extra dep) and
cached at `MITRE_BUNDLE_CACHE_PATH` (default `/data/mitre/<basename>.json`).
Pass an absolute path as `source` to bypass the network entirely.
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import re
import urllib.parse
import urllib.request
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable
from sqlalchemy import delete, select
from app.db.session import session_scope
from app.models.mitre import (
MitreSubtechnique,
MitreTactic,
MitreTechnique,
MitreTechniqueTactic,
)
from app.models.setting import Setting
log = logging.getLogger("metamorph.mitre.seed")
# === Default pin =============================================================
#
# MITRE publishes versioned bundles at
# `https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/enterprise-attack/enterprise-attack-<X.Y>.json`.
# Update these three constants in lock-step when bumping the pin. The SHA256
# is verified against the downloaded bytes — a mismatch aborts the seed.
#
MITRE_VERSION = "19.0"
MITRE_DEFAULT_URL = (
"https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/"
"enterprise-attack/enterprise-attack-19.0.json"
)
MITRE_DEFAULT_SHA256 = "df520ea0775a57db7bff760145b02fed89290802913e056b7ed5970b02f3626a"
MITRE_BUNDLE_CACHE_PATH = Path(os.environ.get("MITRE_CACHE_DIR", "/data/mitre"))
MITRE_DOWNLOAD_TIMEOUT_SECONDS = 120
# Settings keys used to expose the seed metadata to the operator UI/CLI.
SETTING_LAST_SYNC = "mitre_last_sync"
SETTING_VERSION = "mitre_version"
SETTING_SOURCE_URL = "mitre_source_url"
ATTACK_SOURCE_NAME = "mitre-attack"
KILL_CHAIN_NAME = "mitre-attack"
class MitreSeedError(Exception):
pass
class MitreChecksumMismatch(MitreSeedError):
pass
@dataclass
class ParsedBundle:
tactics: list[dict] = field(default_factory=list)
techniques: list[dict] = field(default_factory=list) # parent techniques
subtechniques: list[dict] = field(default_factory=list)
# Map: subtechnique attack-pattern STIX id -> parent technique STIX id
subtechnique_parents: dict[str, str] = field(default_factory=dict)
spec_version: str | None = None
@dataclass
class SeedResult:
tactics_upserted: int
techniques_upserted: int
subtechniques_upserted: int
subtechniques_skipped_orphan: int
technique_tactic_links: int
version: str | None
source: str
started_at: datetime
finished_at: datetime
def as_dict(self) -> dict:
return {
"tactics_upserted": self.tactics_upserted,
"techniques_upserted": self.techniques_upserted,
"subtechniques_upserted": self.subtechniques_upserted,
"subtechniques_skipped_orphan": self.subtechniques_skipped_orphan,
"technique_tactic_links": self.technique_tactic_links,
"version": self.version,
"source": self.source,
"started_at": self.started_at.isoformat(),
"finished_at": self.finished_at.isoformat(),
"duration_ms": int(
(self.finished_at - self.started_at).total_seconds() * 1000
),
}
# === I/O =====================================================================
def _is_url(source: str) -> bool:
parsed = urllib.parse.urlparse(source)
return parsed.scheme in ("http", "https")
def _sha256_of(path: Path) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(1 << 16), b""):
h.update(chunk)
return h.hexdigest()
def _download(url: str, dest: Path, *, expected_sha256: str | None = None) -> Path:
dest.parent.mkdir(parents=True, exist_ok=True)
tmp = dest.with_suffix(dest.suffix + ".part")
log.info("metamorph.mitre.download.start", extra={"url": url, "dest": str(dest)})
req = urllib.request.Request(url, headers={"User-Agent": "metamorph-mitre-seed/1.0"})
with urllib.request.urlopen(req, timeout=MITRE_DOWNLOAD_TIMEOUT_SECONDS) as resp:
with tmp.open("wb") as f:
for chunk in iter(lambda: resp.read(1 << 16), b""):
f.write(chunk)
if expected_sha256:
actual = _sha256_of(tmp)
if actual != expected_sha256:
tmp.unlink(missing_ok=True)
raise MitreChecksumMismatch(
f"sha256 mismatch for {url}: expected {expected_sha256}, got {actual}"
)
tmp.replace(dest)
log.info(
"metamorph.mitre.download.done",
extra={"url": url, "bytes": dest.stat().st_size},
)
return dest
def resolve_source_to_path(
source: str | Path | None,
*,
cache_dir: Path = MITRE_BUNDLE_CACHE_PATH,
expected_sha256: str | None = MITRE_DEFAULT_SHA256,
) -> tuple[Path, str]:
"""Return (path, source_label). Downloads if `source` is an URL; otherwise
treats it as a local file. `None` → default URL.
`source_label` is what we persist in `settings.mitre_source_url`."""
if source is None:
source = MITRE_DEFAULT_URL
source_str = str(source)
if _is_url(source_str):
basename = source_str.rsplit("/", 1)[-1] or "enterprise-attack.json"
dest = cache_dir / basename
_download(source_str, dest, expected_sha256=expected_sha256)
return dest, source_str
path = Path(source_str)
if not path.exists():
raise MitreSeedError(f"source path does not exist: {path}")
return path, str(path)
# === STIX parsing ============================================================
def _attack_ref(obj: dict) -> dict | None:
for ref in obj.get("external_references") or ():
if ref.get("source_name") == ATTACK_SOURCE_NAME and ref.get("external_id"):
return ref
return None
def parse_bundle(path: Path) -> ParsedBundle:
"""Read the STIX bundle into normalized dicts ready for SQL upserts."""
with path.open("r", encoding="utf-8") as f:
bundle = json.load(f)
objs = bundle.get("objects") or []
parsed = ParsedBundle(spec_version=bundle.get("spec_version"))
parents_by_subtech: dict[str, str] = {}
for o in objs:
if (
o.get("type") == "relationship"
and o.get("relationship_type") == "subtechnique-of"
and not o.get("revoked")
):
parents_by_subtech[o["source_ref"]] = o["target_ref"]
parsed.subtechnique_parents = parents_by_subtech
for o in objs:
if o.get("revoked") or o.get("x_mitre_deprecated"):
continue
kind = o.get("type")
if kind == "x-mitre-tactic":
ref = _attack_ref(o)
if not ref:
continue
parsed.tactics.append(
{
"external_id": ref["external_id"],
"name": o.get("name") or "",
"short_name": o.get("x_mitre_shortname") or "",
"description": o.get("description"),
"url": ref.get("url"),
}
)
elif kind == "attack-pattern":
ref = _attack_ref(o)
if not ref:
continue
common = {
"external_id": ref["external_id"],
"name": o.get("name") or "",
"description": o.get("description"),
"url": ref.get("url"),
}
if o.get("x_mitre_is_subtechnique"):
parent_stix = parents_by_subtech.get(o["id"])
parsed.subtechniques.append(
{**common, "stix_id": o["id"], "parent_stix_id": parent_stix}
)
else:
# Capture kill_chain_phases so we can map to tactics by short_name.
phases = [
p.get("phase_name")
for p in (o.get("kill_chain_phases") or ())
if p.get("kill_chain_name") == KILL_CHAIN_NAME and p.get("phase_name")
]
parsed.techniques.append(
{**common, "stix_id": o["id"], "phase_names": phases}
)
return parsed
# === DB upserts ==============================================================
def _upsert_tactics(s, tactics: Iterable[dict]) -> tuple[dict, int]:
"""Upsert tactics. Returns (short_name → tactic_id, n_upserted)."""
existing = {t.external_id: t for t in s.scalars(select(MitreTactic)).all()}
short_to_id: dict = {}
upserted = 0
for t in tactics:
row = existing.get(t["external_id"])
if row is None:
row = MitreTactic(
external_id=t["external_id"],
short_name=t["short_name"],
name=t["name"],
description=t["description"],
url=t["url"],
)
s.add(row)
s.flush()
upserted += 1
else:
row.short_name = t["short_name"]
row.name = t["name"]
row.description = t["description"]
row.url = t["url"]
upserted += 1
short_to_id[t["short_name"]] = row.id
return short_to_id, upserted
def _upsert_techniques(
s, techniques: Iterable[dict], short_to_tactic_id: dict
) -> tuple[dict, int, int]:
"""Upsert techniques + their tactic links. Returns (stix_id→technique_id, n_upserted, n_links)."""
existing = {t.external_id: t for t in s.scalars(select(MitreTechnique)).all()}
stix_to_id: dict = {}
n_upserted = 0
n_links = 0
# We'll rebuild the technique↔tactic mapping for clarity (drop + add). This
# is O(techniques × tactics) but cheap relative to the parse itself.
s.execute(delete(MitreTechniqueTactic))
for t in techniques:
row = existing.get(t["external_id"])
if row is None:
row = MitreTechnique(
external_id=t["external_id"],
name=t["name"],
description=t["description"],
url=t["url"],
)
s.add(row)
s.flush()
else:
row.name = t["name"]
row.description = t["description"]
row.url = t["url"]
n_upserted += 1
stix_to_id[t["stix_id"]] = row.id
for phase in t.get("phase_names", []):
tac_id = short_to_tactic_id.get(phase)
if tac_id is None:
# Tactic referenced but not in bundle — log + skip.
log.warning(
"metamorph.mitre.unknown_tactic_phase",
extra={"technique": t["external_id"], "phase": phase},
)
continue
s.add(MitreTechniqueTactic(technique_id=row.id, tactic_id=tac_id))
n_links += 1
return stix_to_id, n_upserted, n_links
def _upsert_subtechniques(
s,
subtechniques: Iterable[dict],
stix_to_tech_id: dict,
) -> tuple[int, int]:
"""Returns (n_upserted, n_skipped_orphans)."""
existing = {sb.external_id: sb for sb in s.scalars(select(MitreSubtechnique)).all()}
n_upserted = 0
n_skipped = 0
for sb in subtechniques:
parent_stix = sb.get("parent_stix_id")
parent_id = stix_to_tech_id.get(parent_stix) if parent_stix else None
if parent_id is None:
# Fall back to the dotted external_id convention (T1003.001 → T1003).
m = re.match(r"^(T\d+)\.\d+$", sb["external_id"])
if m:
parent_ext = m.group(1)
# We don't have a parent-by-external-id map here; query.
parent_row = next(
iter(
s.scalars(
select(MitreTechnique).where(MitreTechnique.external_id == parent_ext)
).all()
),
None,
)
parent_id = parent_row.id if parent_row else None
if parent_id is None:
log.warning(
"metamorph.mitre.orphan_subtechnique",
extra={"subtechnique": sb["external_id"]},
)
n_skipped += 1
continue
row = existing.get(sb["external_id"])
if row is None:
s.add(
MitreSubtechnique(
external_id=sb["external_id"],
name=sb["name"],
description=sb["description"],
url=sb["url"],
technique_id=parent_id,
)
)
else:
row.name = sb["name"]
row.description = sb["description"]
row.url = sb["url"]
row.technique_id = parent_id
n_upserted += 1
return n_upserted, n_skipped
def _upsert_setting(s, key: str, value: object) -> None:
row = s.scalar(select(Setting).where(Setting.key == key))
if row is None:
s.add(Setting(key=key, value=value))
else:
row.value = value
# === Entry point =============================================================
def seed_mitre(
*,
source: str | Path | None = None,
expected_sha256: str | None = MITRE_DEFAULT_SHA256,
cache_dir: Path = MITRE_BUNDLE_CACHE_PATH,
allow_unverified: bool = False,
) -> SeedResult:
"""Top-level seed. URL → download + verify + parse; path → just parse.
Custom URLs (anything other than `MITRE_DEFAULT_URL`) MUST be paired with
an `expected_sha256` for integrity, or with `allow_unverified=True` to opt
out explicitly. This avoids a silent integrity bypass when an operator
points the sync at a typo'd or attacker-controlled mirror.
"""
started_at = datetime.now(tz=timezone.utc)
if source is not None and _is_url(str(source)) and str(source) != MITRE_DEFAULT_URL:
if expected_sha256 is None or expected_sha256 == MITRE_DEFAULT_SHA256:
# The caller passed a non-default URL but didn't override the hash:
# MITRE_DEFAULT_SHA256 would obviously not match → force an explicit
# decision rather than silently bypassing.
if not allow_unverified:
raise MitreSeedError(
"custom URL requires an expected_sha256 (or allow_unverified=True)"
)
expected_sha256 = None
path, source_label = resolve_source_to_path(
source, cache_dir=cache_dir, expected_sha256=expected_sha256
)
parsed = parse_bundle(path)
log.info(
"metamorph.mitre.parsed",
extra={
"tactics": len(parsed.tactics),
"techniques": len(parsed.techniques),
"subtechniques": len(parsed.subtechniques),
"spec_version": parsed.spec_version,
},
)
with session_scope() as s:
short_to_tactic_id, n_tactics = _upsert_tactics(s, parsed.tactics)
stix_to_tech_id, n_techs, n_links = _upsert_techniques(
s, parsed.techniques, short_to_tactic_id
)
n_subs, n_orphan = _upsert_subtechniques(s, parsed.subtechniques, stix_to_tech_id)
finished_at = datetime.now(tz=timezone.utc)
_upsert_setting(s, SETTING_LAST_SYNC, finished_at.isoformat())
# If the URL is the pinned one, we know the version; otherwise leave None.
version = MITRE_VERSION if source_label == MITRE_DEFAULT_URL else None
if version:
_upsert_setting(s, SETTING_VERSION, version)
_upsert_setting(s, SETTING_SOURCE_URL, source_label)
result = SeedResult(
tactics_upserted=n_tactics,
techniques_upserted=n_techs,
subtechniques_upserted=n_subs,
subtechniques_skipped_orphan=n_orphan,
technique_tactic_links=n_links,
version=version,
source=source_label,
started_at=started_at,
finished_at=finished_at,
)
log.info("metamorph.mitre.seed_completed", extra=result.as_dict())
return result
def read_status() -> dict:
"""Return the persisted seed metadata for `GET /mitre/status`."""
keys = {SETTING_LAST_SYNC, SETTING_VERSION, SETTING_SOURCE_URL}
out = {k: None for k in keys}
with session_scope() as s:
for row in s.scalars(select(Setting).where(Setting.key.in_(keys))).all():
out[row.key] = row.value
return {
"last_sync": out[SETTING_LAST_SYNC],
"version": out[SETTING_VERSION],
"source_url": out[SETTING_SOURCE_URL],
"default_url": MITRE_DEFAULT_URL,
"default_version": MITRE_VERSION,
}

360
backend/tests/test_mitre.py Normal file
View File

@@ -0,0 +1,360 @@
"""Integration tests for M4: STIX parser + seed + /mitre/* endpoints.
Uses a minimal hand-crafted STIX bundle (no network) so the parser logic and
the upsert semantics can be exercised deterministically.
"""
from __future__ import annotations
import json
import secrets
import uuid
from pathlib import Path
import pytest
from sqlalchemy import text
from app.core.install_token import regenerate_install_token
from app.main import create_app
from app.services import mitre_seed as mitre_svc
def _truncate_all(engine):
with engine.begin() as conn:
conn.execute(
text(
"TRUNCATE users, refresh_tokens, invitations, invitation_groups, "
"user_groups, group_permissions, permissions, settings, groups, "
"mitre_subtechniques, mitre_technique_tactics, mitre_techniques, "
"mitre_tactics RESTART IDENTITY CASCADE"
)
)
@pytest.fixture(scope="module")
def app(db_engine_or_skip):
_truncate_all(db_engine_or_skip)
flask_app = create_app()
flask_app.config.update(TESTING=True)
return flask_app
@pytest.fixture()
def client(app):
return app.test_client()
def _unique_email(prefix: str) -> str:
return f"{prefix}-{secrets.token_hex(4)}@metamorph.local"
@pytest.fixture(scope="module")
def admin_credentials(app, db_engine_or_skip):
"""Bootstrap a fresh admin once for the whole module."""
token = regenerate_install_token()
email = _unique_email("admin")
password = "AdminPass1234!"
with app.test_client() as c:
r = c.post(
"/api/v1/setup",
json={"install_token": token, "email": email, "password": password},
)
assert r.status_code == 201, r.get_data(as_text=True)
return {"email": email, "password": password, "user_id": r.get_json()["user_id"]}
def _login(client, email: str, password: str) -> str:
r = client.post("/api/v1/auth/login", json={"email": email, "password": password})
assert r.status_code == 200, r.get_data(as_text=True)
return r.get_json()["access_token"]
# === Fixture STIX bundle =====================================================
MINIMAL_BUNDLE = {
"type": "bundle",
"id": "bundle--00000000-0000-0000-0000-000000000001",
"spec_version": "2.1",
"objects": [
# Tactic 1 — kept
{
"type": "x-mitre-tactic",
"id": "x-mitre-tactic--ta0001",
"name": "Initial Access",
"description": "Get a foothold.",
"x_mitre_shortname": "initial-access",
"external_references": [
{
"source_name": "mitre-attack",
"external_id": "TA0001",
"url": "https://attack.mitre.org/tactics/TA0001/",
}
],
},
# Tactic 2 — kept
{
"type": "x-mitre-tactic",
"id": "x-mitre-tactic--ta0002",
"name": "Execution",
"description": "Run code.",
"x_mitre_shortname": "execution",
"external_references": [
{
"source_name": "mitre-attack",
"external_id": "TA0002",
"url": "https://attack.mitre.org/tactics/TA0002/",
}
],
},
# Revoked tactic — must be skipped
{
"type": "x-mitre-tactic",
"id": "x-mitre-tactic--ta0099",
"name": "Doomed",
"x_mitre_shortname": "doomed",
"revoked": True,
"external_references": [
{"source_name": "mitre-attack", "external_id": "TA0099"}
],
},
# Technique T1059 covers both tactics
{
"type": "attack-pattern",
"id": "attack-pattern--t1059",
"name": "Command and Scripting Interpreter",
"description": "Use shells.",
"kill_chain_phases": [
{"kill_chain_name": "mitre-attack", "phase_name": "initial-access"},
{"kill_chain_name": "mitre-attack", "phase_name": "execution"},
],
"external_references": [
{
"source_name": "mitre-attack",
"external_id": "T1059",
"url": "https://attack.mitre.org/techniques/T1059/",
}
],
},
# Technique T1078 only initial-access
{
"type": "attack-pattern",
"id": "attack-pattern--t1078",
"name": "Valid Accounts",
"description": "Use legit creds.",
"kill_chain_phases": [
{"kill_chain_name": "mitre-attack", "phase_name": "initial-access"},
],
"external_references": [
{
"source_name": "mitre-attack",
"external_id": "T1078",
"url": "https://attack.mitre.org/techniques/T1078/",
}
],
},
# Deprecated technique — skipped
{
"type": "attack-pattern",
"id": "attack-pattern--t1190",
"name": "Exploit Public-Facing Application",
"x_mitre_deprecated": True,
"external_references": [
{"source_name": "mitre-attack", "external_id": "T1190"}
],
},
# Sub-technique of T1059
{
"type": "attack-pattern",
"id": "attack-pattern--t1059-001",
"name": "PowerShell",
"description": "Windows shell.",
"x_mitre_is_subtechnique": True,
"external_references": [
{
"source_name": "mitre-attack",
"external_id": "T1059.001",
"url": "https://attack.mitre.org/techniques/T1059/001/",
}
],
},
# Relationship attaching the sub to its parent
{
"type": "relationship",
"id": "relationship--rel1",
"relationship_type": "subtechnique-of",
"source_ref": "attack-pattern--t1059-001",
"target_ref": "attack-pattern--t1059",
},
],
}
@pytest.fixture()
def fixture_bundle_path(tmp_path: Path) -> Path:
path = tmp_path / "minimal-stix.json"
path.write_text(json.dumps(MINIMAL_BUNDLE))
return path
# === Parser unit tests =======================================================
def test_parser_extracts_active_objects(fixture_bundle_path):
parsed = mitre_svc.parse_bundle(fixture_bundle_path)
assert len(parsed.tactics) == 2 # TA0001 + TA0002 (TA0099 revoked)
assert {t["external_id"] for t in parsed.tactics} == {"TA0001", "TA0002"}
assert len(parsed.techniques) == 2 # T1059 + T1078 (T1190 deprecated)
assert {t["external_id"] for t in parsed.techniques} == {"T1059", "T1078"}
assert len(parsed.subtechniques) == 1
sb = parsed.subtechniques[0]
assert sb["external_id"] == "T1059.001"
assert sb["parent_stix_id"] == "attack-pattern--t1059"
# === Seed integration tests ==================================================
def test_seed_against_fixture(app, fixture_bundle_path):
result = mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
assert result.tactics_upserted == 2
assert result.techniques_upserted == 2
assert result.subtechniques_upserted == 1
assert result.subtechniques_skipped_orphan == 0
assert result.technique_tactic_links == 3 # T1059→TA0001, T1059→TA0002, T1078→TA0001
def test_seed_is_idempotent(app, fixture_bundle_path):
"""Running twice yields the same row counts and no SQL errors."""
first = mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
second = mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
assert (first.tactics_upserted, first.techniques_upserted, first.subtechniques_upserted) == (
second.tactics_upserted,
second.techniques_upserted,
second.subtechniques_upserted,
)
def test_seed_persists_setting(app, fixture_bundle_path):
"""settings table records the last sync timestamp + source URL."""
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
status = mitre_svc.read_status()
assert status["last_sync"] is not None
# We seeded from a local path so version is None and source_url is the path string.
assert status["source_url"] == str(fixture_bundle_path)
assert status["version"] is None # only set when source == MITRE_DEFAULT_URL
def test_checksum_mismatch_aborts(tmp_path):
"""A wrong sha256 triggers MitreChecksumMismatch and skips DB writes."""
path = tmp_path / "tiny.json"
path.write_text(json.dumps(MINIMAL_BUNDLE))
# Force the URL path so download() is invoked. We mock by passing a file:// URL.
# Simpler: call _download() directly with a bogus hash.
bogus = "0" * 64
with pytest.raises(mitre_svc.MitreChecksumMismatch):
mitre_svc._download(
f"file://{path}", tmp_path / "out.json", expected_sha256=bogus
)
# === API endpoint tests ======================================================
def test_list_tactics_requires_auth(app, fixture_bundle_path):
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
r = c.get("/api/v1/mitre/tactics")
assert r.status_code == 401
def test_list_tactics_returns_seeded(app, admin_credentials, fixture_bundle_path):
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
access = _login(c, admin_credentials["email"], admin_credentials["password"])
r = c.get(
"/api/v1/mitre/tactics", headers={"Authorization": f"Bearer {access}"}
)
assert r.status_code == 200
body = r.get_json()
assert body["total"] == 2
ids = [t["external_id"] for t in body["items"]]
assert "TA0001" in ids and "TA0002" in ids
def test_filter_techniques_by_tactic(app, admin_credentials, fixture_bundle_path):
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
access = _login(c, admin_credentials["email"], admin_credentials["password"])
r = c.get(
"/api/v1/mitre/techniques?tactic=TA0002",
headers={"Authorization": f"Bearer {access}"},
)
assert r.status_code == 200
body = r.get_json()
# Only T1059 covers TA0002 (execution); T1078 covers initial-access only.
ext_ids = [t["external_id"] for t in body["items"]]
assert ext_ids == ["T1059"]
def test_subtechniques_listed_under_parent(app, admin_credentials, fixture_bundle_path):
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
access = _login(c, admin_credentials["email"], admin_credentials["password"])
r = c.get(
"/api/v1/mitre/subtechniques?technique=T1059",
headers={"Authorization": f"Bearer {access}"},
)
assert r.status_code == 200
body = r.get_json()
ext_ids = [t["external_id"] for t in body["items"]]
assert ext_ids == ["T1059.001"]
def test_status_endpoint(app, admin_credentials, fixture_bundle_path):
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
access = _login(c, admin_credentials["email"], admin_credentials["password"])
r = c.get("/api/v1/mitre/status", headers={"Authorization": f"Bearer {access}"})
assert r.status_code == 200
body = r.get_json()
assert body["last_sync"] is not None
assert body["default_url"].startswith("https://")
assert body["default_version"]
def test_sync_endpoint_requires_perm(app, admin_credentials, fixture_bundle_path):
"""A non-admin without mitre.sync gets 403."""
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
# Bootstrap a no-perm user via invitation.
admin_access = _login(c, admin_credentials["email"], admin_credentials["password"])
eve_email = _unique_email("eve")
inv = c.post(
"/api/v1/invitations",
headers={"Authorization": f"Bearer {admin_access}"},
json={"email_hint": eve_email},
)
token = inv.get_json()["token"]
c.post(
f"/api/v1/invitations/accept/{token}",
json={"email": eve_email, "password": "EvePass1234!"},
)
eve_access = _login(c, eve_email, "EvePass1234!")
r = c.post(
"/api/v1/mitre/sync", headers={"Authorization": f"Bearer {eve_access}"}
)
assert r.status_code == 403
def test_search_filter_on_name(app, admin_credentials, fixture_bundle_path):
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
access = _login(c, admin_credentials["email"], admin_credentials["password"])
r = c.get(
"/api/v1/mitre/techniques?q=valid",
headers={"Authorization": f"Bearer {access}"},
)
assert r.status_code == 200
ext_ids = [t["external_id"] for t in r.get_json()["items"]]
assert ext_ids == ["T1078"]

View File

@@ -38,6 +38,7 @@ services:
EVIDENCE_DIR: ${EVIDENCE_DIR} EVIDENCE_DIR: ${EVIDENCE_DIR}
volumes: volumes:
- metamorph_evidence:/data/evidence - metamorph_evidence:/data/evidence
- metamorph_mitre:/data/mitre
depends_on: depends_on:
db: db:
condition: service_healthy condition: service_healthy
@@ -76,6 +77,7 @@ services:
volumes: volumes:
metamorph_db: metamorph_db:
metamorph_evidence: metamorph_evidence:
metamorph_mitre:
networks: networks:
metamorph: metamorph:

162
e2e/tests/m4-mitre.spec.ts Normal file
View File

@@ -0,0 +1,162 @@
import { expect, test, type APIRequestContext, type Page } from '@playwright/test';
/**
* M4 — MITRE ATT&CK Enterprise reference catalogue + tag picker.
*
* The seed itself (download + parse) is exercised by pytest with a small
* fixture bundle. This spec hits the live stack with the real, pinned bundle
* by calling `POST /mitre/sync` once and then validating the read endpoints
* + the picker UI.
*/
const ADMIN_EMAIL = `admin-${Math.floor(Math.random() * 1e6)}@metamorph.local`;
const ADMIN_PASSWORD = 'AdminPass1234!';
async function resetAndMintToken(request: APIRequestContext): Promise<string> {
const r = await request.post('/api/v1/diag/reset');
expect(r.status()).toBe(200);
return (await r.json()).install_token as string;
}
async function loginAndGetAccess(
request: APIRequestContext,
email: string,
password: string,
): Promise<string> {
const r = await request.post('/api/v1/auth/login', { data: { email, password } });
expect(r.status()).toBe(200);
return (await r.json()).access_token as string;
}
async function loginViaSpa(page: Page, email: string, password: string) {
await page.goto('/login');
await page.getByLabel(/email/i).fill(email);
await page.getByLabel(/password/i).fill(password);
await page.getByRole('button', { name: /sign in/i }).click();
await expect(page.getByTestId('me-email')).toHaveText(email);
}
test.describe.configure({ mode: 'serial' });
test.describe('M4 — MITRE ATT&CK reference', () => {
test.beforeAll(async ({ request }) => {
const installToken = await resetAndMintToken(request);
const setup = await request.post('/api/v1/setup', {
data: { install_token: installToken, email: ADMIN_EMAIL, password: ADMIN_PASSWORD },
});
expect(setup.status()).toBe(201);
// Trigger a real sync against the pinned MITRE URL. Idempotent — if the
// mitre_* tables were left populated by a previous run, this is a no-op
// upsert.
const access = await loginAndGetAccess(request, ADMIN_EMAIL, ADMIN_PASSWORD);
const sync = await request.post('/api/v1/mitre/sync', {
headers: { Authorization: `Bearer ${access}` },
});
expect(sync.status(), `mitre sync failed: ${await sync.text()}`).toBe(200);
const result = await sync.json();
expect(result.tactics_upserted).toBeGreaterThanOrEqual(14);
expect(result.techniques_upserted).toBeGreaterThanOrEqual(180);
expect(result.subtechniques_upserted).toBeGreaterThanOrEqual(400);
});
test('GET /mitre/tactics returns 14+ Enterprise tactics', async ({ request }) => {
const access = await loginAndGetAccess(request, ADMIN_EMAIL, ADMIN_PASSWORD);
const r = await request.get('/api/v1/mitre/tactics', {
headers: { Authorization: `Bearer ${access}` },
});
expect(r.status()).toBe(200);
const body = await r.json();
expect(body.total).toBeGreaterThanOrEqual(14);
const ids = body.items.map((t: { external_id: string }) => t.external_id);
expect(ids).toContain('TA0001'); // Initial Access
expect(ids).toContain('TA0006'); // Credential Access
});
test('GET /mitre/techniques?tactic=TA0006 filters', async ({ request }) => {
const access = await loginAndGetAccess(request, ADMIN_EMAIL, ADMIN_PASSWORD);
const r = await request.get('/api/v1/mitre/techniques?tactic=TA0006&limit=200', {
headers: { Authorization: `Bearer ${access}` },
});
expect(r.status()).toBe(200);
const body = await r.json();
expect(body.total).toBeGreaterThan(0);
// OS Credential Dumping is the textbook TA0006 example.
const ids = body.items.map((t: { external_id: string }) => t.external_id);
expect(ids).toContain('T1003');
});
test('GET /mitre/subtechniques?technique=T1003 lists 8 sub-techniques', async ({ request }) => {
const access = await loginAndGetAccess(request, ADMIN_EMAIL, ADMIN_PASSWORD);
const r = await request.get('/api/v1/mitre/subtechniques?technique=T1003', {
headers: { Authorization: `Bearer ${access}` },
});
expect(r.status()).toBe(200);
const ids = (await r.json()).items.map((t: { external_id: string }) => t.external_id);
expect(ids).toContain('T1003.001'); // LSASS Memory
expect(ids.length).toBeGreaterThanOrEqual(5);
});
test('GET /mitre/status returns version + last_sync', async ({ request }) => {
const access = await loginAndGetAccess(request, ADMIN_EMAIL, ADMIN_PASSWORD);
const r = await request.get('/api/v1/mitre/status', {
headers: { Authorization: `Bearer ${access}` },
});
expect(r.status()).toBe(200);
const body = await r.json();
expect(body.last_sync).toBeTruthy();
expect(body.default_url).toContain('mitre-attack');
expect(body.default_version).toBeTruthy();
});
test('SPA MITRE page renders + picker walks tactic → technique → subtechnique', async ({ page }) => {
await loginViaSpa(page, ADMIN_EMAIL, ADMIN_PASSWORD);
await page.goto('/mitre');
// Status card shows a non-null last_sync.
await expect(page.getByTestId('mitre-last-sync')).not.toHaveText('never');
const picker = page.getByTestId('mitre-tag-picker');
await expect(picker).toBeVisible();
// 1. Click on TA0006 (Credential Access)
await picker.getByTestId('mitre-tactic-TA0006').click();
// 2. Techniques column populates; click T1003
await expect(picker.getByTestId('mitre-technique-T1003')).toBeVisible();
await picker.getByTestId('mitre-technique-T1003').click();
// 3. Sub-techniques column populates with T1003.001 onward
await expect(picker.getByTestId('mitre-subtechnique-T1003.001')).toBeVisible();
// 4. Select the sub-technique → chip appears in the selection bar
await picker.getByTestId('mitre-subtechnique-T1003.001').click();
await expect(page.getByTestId('mitre-selected')).toContainText('T1003.001');
// 5. Preview payload card shows the JSON encoded selection
await expect(page.getByTestId('mitre-selected-json')).toContainText('"T1003.001"');
});
test('Non-admin cannot trigger /mitre/sync', async ({ page, request }) => {
// Invite a no-perm user via the admin.
const adminAccess = await loginAndGetAccess(request, ADMIN_EMAIL, ADMIN_PASSWORD);
const eveEmail = `eve-${Math.floor(Math.random() * 1e6)}@metamorph.local`;
const inv = await request.post('/api/v1/invitations', {
headers: { Authorization: `Bearer ${adminAccess}` },
data: { email_hint: eveEmail },
});
const token = (await inv.json()).token;
await request.post(`/api/v1/invitations/accept/${token}`, {
data: { email: eveEmail, password: 'EvePass1234!' },
});
const eveAccess = await loginAndGetAccess(request, eveEmail, 'EvePass1234!');
const r = await request.post('/api/v1/mitre/sync', {
headers: { Authorization: `Bearer ${eveAccess}` },
});
expect(r.status()).toBe(403);
// The MITRE page is reachable in read-only mode for any logged-in user,
// but the Sync card is hidden for non-admins.
await loginViaSpa(page, eveEmail, 'EvePass1234!');
await page.goto('/mitre');
await expect(page.getByTestId('mitre-tag-picker')).toBeVisible();
await expect(page.getByTestId('mitre-sync')).toHaveCount(0);
});
});

View File

@@ -8,6 +8,7 @@ import { AdminGroupsPage } from '@/pages/AdminGroupsPage';
import { AdminInvitationsPage } from '@/pages/AdminInvitationsPage'; import { AdminInvitationsPage } from '@/pages/AdminInvitationsPage';
import { AdminUsersPage } from '@/pages/AdminUsersPage'; import { AdminUsersPage } from '@/pages/AdminUsersPage';
import { HomePage } from '@/pages/HomePage'; import { HomePage } from '@/pages/HomePage';
import { MitrePage } from '@/pages/MitrePage';
import { LoginPage } from '@/pages/LoginPage'; import { LoginPage } from '@/pages/LoginPage';
import { ProfilePage } from '@/pages/ProfilePage'; import { ProfilePage } from '@/pages/ProfilePage';
import { RegisterPage } from '@/pages/RegisterPage'; import { RegisterPage } from '@/pages/RegisterPage';
@@ -49,6 +50,14 @@ function App() {
</RequireAuth> </RequireAuth>
} }
/> />
<Route
path="/mitre"
element={
<RequireAuth>
<MitrePage />
</RequireAuth>
}
/>
<Route <Route
path="/admin/users" path="/admin/users"
element={ element={

View File

@@ -36,6 +36,7 @@ export function Layout() {
<> <>
{navItem('/', 'Home')} {navItem('/', 'Home')}
{navItem('/profile', 'Profile')} {navItem('/profile', 'Profile')}
{navItem('/mitre', 'MITRE')}
{state.user.is_admin && ( {state.user.is_admin && (
<> <>
{navItem('/admin/users', 'Users')} {navItem('/admin/users', 'Users')}
@@ -68,7 +69,7 @@ export function Layout() {
<Outlet /> <Outlet />
<footer className="mt-[60px] py-8 border-t border-border text-center font-mono text-xs text-text-dim"> <footer className="mt-[60px] py-8 border-t border-border text-center font-mono text-xs text-text-dim">
metamorph · M0 bootstrap · M1 db schema · M2 auth · M3 rbac · design system from tasks/design.md metamorph · M0 bootstrap · M1 db schema · M2 auth · M3 rbac · M4 mitre · design system from tasks/design.md
</footer> </footer>
</div> </div>
</div> </div>

View File

@@ -0,0 +1,272 @@
import { useQuery } from '@tanstack/react-query';
import { useMemo, useState } from 'react';
import { Alert } from '@/components/ui/Alert';
import { Tag } from '@/components/ui/Tag';
import { TextField } from '@/components/ui/TextField';
import { apiGet } from '@/lib/api';
import {
mitreKeys,
type MitreSubtechnique,
type MitreTactic,
type MitreTag,
type MitreTechnique,
type Paginated,
} from '@/lib/mitre';
import { cn } from '@/lib/cn';
interface MitreTagPickerProps {
/** Already-selected tags. The parent owns the state. */
value: MitreTag[];
/** Called whenever the selection changes (replace semantics). */
onChange: (next: MitreTag[]) => void;
/** Hide the search box(es). Useful for compact embed in a sidebar. */
compact?: boolean;
className?: string;
}
function useTactics(q: string) {
return useQuery({
queryKey: mitreKeys.tactics(q),
queryFn: () =>
apiGet<Paginated<MitreTactic>>(
`/mitre/tactics${q ? `?q=${encodeURIComponent(q)}` : ''}`,
),
});
}
function useTechniques(tactic: string | null, q: string) {
return useQuery({
enabled: tactic !== null,
queryKey: mitreKeys.techniques(tactic ?? '', q),
queryFn: () => {
const params = new URLSearchParams();
if (tactic) params.set('tactic', tactic);
if (q) params.set('q', q);
return apiGet<Paginated<MitreTechnique>>(
`/mitre/techniques${params.toString() ? `?${params}` : ''}`,
);
},
});
}
function useSubtechniques(technique: string | null, q: string) {
return useQuery({
enabled: technique !== null,
queryKey: mitreKeys.subtechniques(technique ?? '', q),
queryFn: () => {
const params = new URLSearchParams();
if (technique) params.set('technique', technique);
if (q) params.set('q', q);
return apiGet<Paginated<MitreSubtechnique>>(
`/mitre/subtechniques${params.toString() ? `?${params}` : ''}`,
);
},
});
}
/**
* Three-column picker — Tactic > Technique > Sub-technique — with multi-select.
* Selected tags accumulate in the chips at the top.
*/
export function MitreTagPicker({ value, onChange, compact, className }: MitreTagPickerProps) {
const [activeTactic, setActiveTactic] = useState<string | null>(null);
const [activeTechnique, setActiveTechnique] = useState<string | null>(null);
const [qTactic, setQTactic] = useState('');
const [qTechnique, setQTechnique] = useState('');
const [qSub, setQSub] = useState('');
const tactics = useTactics(qTactic);
const techniques = useTechniques(activeTactic, qTechnique);
const subtechniques = useSubtechniques(activeTechnique, qSub);
const selectedKey = useMemo(() => new Set(value.map((t) => `${t.kind}:${t.external_id}`)), [value]);
function toggle(tag: MitreTag) {
const key = `${tag.kind}:${tag.external_id}`;
if (selectedKey.has(key)) {
onChange(value.filter((t) => `${t.kind}:${t.external_id}` !== key));
} else {
onChange([...value, tag]);
}
}
function colorForKind(kind: 'tactic' | 'technique' | 'subtechnique') {
return kind === 'tactic' ? 'cyan' : kind === 'technique' ? 'orange' : 'purple';
}
return (
<div className={cn('rounded-lg border border-border bg-bg-card p-4', className)} data-testid="mitre-tag-picker">
{value.length > 0 && (
<div className="mb-3 flex flex-wrap items-center gap-1" data-testid="mitre-selected">
{value.map((t) => (
<button
key={`${t.kind}:${t.external_id}`}
type="button"
onClick={() => toggle(t)}
className="inline-flex items-center"
aria-label={`Remove ${t.external_id}`}
>
<Tag accent={colorForKind(t.kind)}>
{t.external_id} · {t.name}
</Tag>
</button>
))}
</div>
)}
<div className="grid gap-3 md:grid-cols-3">
{/* Tactics column */}
<div>
{!compact && (
<TextField
label="Tactic search"
value={qTactic}
onChange={(e) => setQTactic(e.target.value)}
placeholder="e.g. Credential"
/>
)}
<div className="mt-2 max-h-72 overflow-y-auto" data-testid="mitre-tactics-column">
{tactics.isLoading && <p className="text-2xs text-text-dim">Loading</p>}
{tactics.data?.items.map((t) => {
const active = activeTactic === t.external_id;
const selected = selectedKey.has(`tactic:${t.external_id}`);
return (
<div
key={t.id}
className={cn(
'group flex items-center gap-2 rounded border px-2 py-1 cursor-pointer',
active ? 'border-cyan' : 'border-transparent hover:border-cyan',
)}
onClick={() => {
setActiveTactic(t.external_id);
setActiveTechnique(null);
}}
data-testid={`mitre-tactic-${t.external_id}`}
>
<input
type="checkbox"
checked={selected}
onClick={(e) => e.stopPropagation()}
onChange={() =>
toggle({ kind: 'tactic', id: t.id, external_id: t.external_id, name: t.name })
}
aria-label={`Select ${t.external_id}`}
/>
<span className="font-mono text-2xs text-cyan">{t.external_id}</span>
<span className="text-2xs">{t.name}</span>
</div>
);
})}
</div>
</div>
{/* Techniques column */}
<div>
{!compact && (
<TextField
label="Technique search"
value={qTechnique}
onChange={(e) => setQTechnique(e.target.value)}
placeholder="e.g. T1059"
/>
)}
<div className="mt-2 max-h-72 overflow-y-auto" data-testid="mitre-techniques-column">
{activeTactic === null && (
<p className="text-2xs text-text-dim">Select a tactic to list its techniques.</p>
)}
{techniques.isLoading && <p className="text-2xs text-text-dim">Loading</p>}
{techniques.data?.items.map((t) => {
const active = activeTechnique === t.external_id;
const selected = selectedKey.has(`technique:${t.external_id}`);
return (
<div
key={t.id}
className={cn(
'group flex items-center gap-2 rounded border px-2 py-1 cursor-pointer',
active ? 'border-orange' : 'border-transparent hover:border-orange',
)}
onClick={() => setActiveTechnique(t.external_id)}
data-testid={`mitre-technique-${t.external_id}`}
>
<input
type="checkbox"
checked={selected}
onClick={(e) => e.stopPropagation()}
onChange={() =>
toggle({
kind: 'technique',
id: t.id,
external_id: t.external_id,
name: t.name,
})
}
aria-label={`Select ${t.external_id}`}
/>
<span className="font-mono text-2xs text-orange">{t.external_id}</span>
<span className="text-2xs">{t.name}</span>
</div>
);
})}
{techniques.data && techniques.data.items.length === 0 && activeTactic && (
<p className="text-2xs text-text-dim">No techniques for this tactic.</p>
)}
</div>
</div>
{/* Sub-techniques column */}
<div>
{!compact && (
<TextField
label="Sub-technique search"
value={qSub}
onChange={(e) => setQSub(e.target.value)}
placeholder="e.g. Powershell"
/>
)}
<div className="mt-2 max-h-72 overflow-y-auto" data-testid="mitre-subtechniques-column">
{activeTechnique === null && (
<p className="text-2xs text-text-dim">Select a technique to list its sub-techniques.</p>
)}
{subtechniques.isLoading && <p className="text-2xs text-text-dim">Loading</p>}
{subtechniques.data?.items.map((sb) => {
const selected = selectedKey.has(`subtechnique:${sb.external_id}`);
return (
<div
key={sb.id}
className="flex items-center gap-2 rounded border border-transparent hover:border-purple px-2 py-1 cursor-pointer"
onClick={() =>
toggle({
kind: 'subtechnique',
id: sb.id,
external_id: sb.external_id,
name: sb.name,
})
}
data-testid={`mitre-subtechnique-${sb.external_id}`}
>
<input
type="checkbox"
checked={selected}
readOnly
aria-label={`Select ${sb.external_id}`}
/>
<span className="font-mono text-2xs text-purple">{sb.external_id}</span>
<span className="text-2xs">{sb.name}</span>
</div>
);
})}
{subtechniques.data && subtechniques.data.items.length === 0 && activeTechnique && (
<p className="text-2xs text-text-dim">No sub-techniques.</p>
)}
</div>
</div>
</div>
{(tactics.isError || techniques.isError || subtechniques.isError) && (
<Alert accent="red" className="mt-3">
Failed to load MITRE data has `make seed-mitre` been run?
</Alert>
)}
</div>
);
}

61
frontend/src/lib/mitre.ts Normal file
View File

@@ -0,0 +1,61 @@
/** Shared types + query keys for MITRE ATT&CK browsing. */
export interface MitreTactic {
id: string;
external_id: string;
short_name: string;
name: string;
description: string | null;
url: string | null;
}
export interface MitreTechnique {
id: string;
external_id: string;
name: string;
description: string | null;
url: string | null;
tactics: Array<{ external_id: string; name: string }>;
}
export interface MitreSubtechnique {
id: string;
external_id: string;
name: string;
description: string | null;
url: string | null;
technique_id: string;
}
export interface Paginated<T> {
items: T[];
total: number;
limit: number;
offset: number;
}
export interface MitreStatus {
last_sync: string | null;
version: string | null;
source_url: string | null;
default_url: string;
default_version: string;
}
export type MitreTagKind = 'tactic' | 'technique' | 'subtechnique';
export interface MitreTag {
kind: MitreTagKind;
id: string;
external_id: string;
name: string;
}
export const mitreKeys = {
status: ['mitre', 'status'] as const,
tactics: (q?: string) => ['mitre', 'tactics', q ?? ''] as const,
techniques: (tactic?: string, q?: string) =>
['mitre', 'techniques', tactic ?? '', q ?? ''] as const,
subtechniques: (technique?: string, q?: string) =>
['mitre', 'subtechniques', technique ?? '', q ?? ''] as const,
};

View File

@@ -61,7 +61,7 @@ export function HomePage() {
<span className="text-purple">Purple Team Platform</span> <span className="text-purple">Purple Team Platform</span>
</h1> </h1>
<p className="font-mono text-sm font-light text-text-dim mt-2"> <p className="font-mono text-sm font-light text-text-dim mt-2">
Collaborative red &amp; blue test orchestration M3 milestone (RBAC) Collaborative red &amp; blue test orchestration M4 milestone (MITRE ATT&amp;CK)
</p> </p>
</header> </header>
<SectionHeader <SectionHeader
@@ -141,9 +141,9 @@ export function HomePage() {
<Card accent="purple" title="Roadmap" sub="14 milestones"> <Card accent="purple" title="Roadmap" sub="14 milestones">
<p> <p>
M0 + M1 + M2 + M3 done. Next:{' '} M0 + M1 + M2 + M3 + M4 done. Next:{' '}
<code className="accent-fill-cyan px-2 py-[2px] rounded-sm font-mono text-4xs"> <code className="accent-fill-cyan px-2 py-[2px] rounded-sm font-mono text-4xs">
M4 MITRE ATT&amp;CK M5 Test &amp; scenario templates
</code> </code>
. .
</p> </p>

View File

@@ -0,0 +1,118 @@
import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query';
import { useState } from 'react';
import { MitreTagPicker } from '@/components/MitreTagPicker';
import { Alert } from '@/components/ui/Alert';
import { Button } from '@/components/ui/Button';
import { Card } from '@/components/ui/Card';
import { SectionHeader } from '@/components/ui/SectionHeader';
import { Tag } from '@/components/ui/Tag';
import { ApiError, apiGet, apiPost } from '@/lib/api';
import { useAuth } from '@/lib/auth';
import { mitreKeys, type MitreStatus, type MitreTag } from '@/lib/mitre';
export function MitrePage() {
const { state } = useAuth();
const qc = useQueryClient();
const [selected, setSelected] = useState<MitreTag[]>([]);
const [syncResult, setSyncResult] = useState<string | null>(null);
const [syncError, setSyncError] = useState<string | null>(null);
const status = useQuery({
queryKey: mitreKeys.status,
queryFn: () => apiGet<MitreStatus>('/mitre/status'),
});
const sync = useMutation({
mutationFn: () => apiPost<Record<string, unknown>>('/mitre/sync'),
onMutate: () => {
setSyncResult(null);
setSyncError(null);
},
onSuccess: async (res) => {
const counts = `${res.tactics_upserted} tactics, ${res.techniques_upserted} techniques, ${res.subtechniques_upserted} subtechniques`;
setSyncResult(`Sync completed in ${(res as { duration_ms: number }).duration_ms / 1000}s — ${counts}.`);
await qc.invalidateQueries({ queryKey: ['mitre'] });
},
onError: (e) => {
if (e instanceof ApiError) {
const p = e.payload as { error?: string; message?: string } | null;
setSyncError(p?.message ?? p?.error ?? `HTTP ${e.status}`);
} else {
setSyncError(e instanceof Error ? e.message : 'Sync failed');
}
},
});
return (
<>
<SectionHeader
prefix="MITRE"
highlight="ATT&CK"
accent="cyan"
description="Reference catalogue: tactics, techniques, sub-techniques. Tag picker reusable by tests and missions."
/>
<div className="grid gap-4 mb-6 [grid-template-columns:repeat(auto-fill,minmax(360px,1fr))]">
<Card accent="cyan" title="Source" sub={status.data?.version ? `version ${status.data.version}` : '—'}>
<p>
URL:{' '}
<code className="accent-fill-cyan px-2 py-[2px] rounded-sm font-mono text-4xs break-all">
{status.data?.source_url ?? status.data?.default_url ?? '—'}
</code>
</p>
<p className="mt-2">
Last sync:{' '}
<code
className="accent-fill-cyan px-2 py-[2px] rounded-sm font-mono text-4xs"
data-testid="mitre-last-sync"
>
{status.data?.last_sync ?? 'never'}
</code>
</p>
</Card>
{state.user?.is_admin && (
<Card accent="orange" title="Sync" sub="re-pull the bundle from the configured source">
<p className="text-2xs text-text-dim mb-3">
The seed CLI runs the same logic; trigger from here only if the URL is reachable from the api container.
</p>
<Button
accent="orange"
onClick={() => sync.mutate()}
disabled={sync.isPending}
data-testid="mitre-sync"
>
{sync.isPending ? 'Syncing…' : 'Trigger MITRE sync'}
</Button>
{syncResult && <Alert accent="green" className="mt-3">{syncResult}</Alert>}
{syncError && <Alert accent="red" className="mt-3">{syncError}</Alert>}
</Card>
)}
</div>
<SectionHeader prefix="Tag" highlight="Picker" accent="orange" />
<MitreTagPicker value={selected} onChange={setSelected} />
{selected.length > 0 && (
<Card accent="purple" className="mt-6" title="Selected (preview payload)">
<pre className="text-2xs text-text-bright whitespace-pre-wrap" data-testid="mitre-selected-json">
{JSON.stringify(selected, null, 2)}
</pre>
</Card>
)}
{selected.length === 0 && (
<p className="mt-3 font-mono text-2xs text-text-dim">
Pick a tactic on the left, then a technique, then optionally a sub-technique. Selections accumulate.
</p>
)}
<div className="mt-4">
<Tag accent="cyan">Tactic</Tag>
<Tag accent="orange">Technique</Tag>
<Tag accent="purple">Sub-technique</Tag>
</div>
</>
);
}

View File

@@ -41,6 +41,17 @@ project: Metamorph
- **Le test d'intégration "expected tables/FK/CHECK" est le bon filet de sécurité** pour M1+ : il a immédiatement attrapé les fixes du reviewer (le retrait de `ck_mission_test_mitre_tags_exactly_one_mitre_fk` aurait été un oubli silencieux sinon). - **Le test d'intégration "expected tables/FK/CHECK" est le bon filet de sécurité** pour M1+ : il a immédiatement attrapé les fixes du reviewer (le retrait de `ck_mission_test_mitre_tags_exactly_one_mitre_fk` aurait été un oubli silencieux sinon).
- **Lancer le DoD avant de dire "M1 done"** : règle gravée à M0, respectée ici. `make clean && make up && make migrate && make test-api && make e2e` est la séquence canonique de fin de milestone. - **Lancer le DoD avant de dire "M1 done"** : règle gravée à M0, respectée ici. `make clean && make up && make migrate && make test-api && make e2e` est la séquence canonique de fin de milestone.
## 2026-05-12 — M4 MITRE ATT&CK
- **STIX parsing avec stdlib uniquement** (`urllib.request` + `json` + `hashlib`) suffit pour 50 MB de bundle, ~1.1 s parse end-to-end. Pas besoin de `requests`/`httpx`. Toute future ingestion de gros JSON pinné → stdlib first, ne pas inflater l'image pour un cas d'usage one-shot.
- **Le sous-jacent MITRE évolue** : la spec mentionne "14 tactics" mais la v19 actuelle en ship 15 (Reconnaissance + Resource Development depuis v8). Les assertions de DoD sont à exprimer en `>= X` quand un référentiel externe est en jeu, pas en `== X`. Pattern : décorréler la valeur exacte du contrat (sinon la maintenance casse au prochain bump).
- **Sub-technique parent resolution** : la source authoritative est la `relationship[subtechnique-of]` STIX, pas la convention dotted-id `T1003.001 → T1003`. La regex en fallback ne sert que si la relation manque (jamais le cas avec MITRE officiel, mais utile pour bundles custom).
- **`session_scope()` enveloppe tout le seed dans une seule transaction** → les lecteurs externes ne voient jamais un état intermédiaire pendant le DELETE+INSERT de `mitre_technique_tactics`. Postgres READ COMMITTED isole. Pas besoin d'advisory lock sauf si on s'attend à des syncs concurrents.
- **Checksum bypass silencieux = footgun**. Avant : `source != MITRE_DEFAULT_URL``expected_sha256 = None`. Un admin qui type un domaine attaquant dans `mitre_source_url` ingère du JSON arbitraire sans intégrité. Patron correct : `MitreSeedError("custom URL requires an expected_sha256 or allow_unverified=True")`. L'opt-out explicite (`--skip-checksum` côté CLI, `allow_unverified: true` côté API) reste possible mais visible.
- **`/diag/reset` cohérence** : si on TRUNCATE `settings` mais pas les tables de référence MITRE (gardées car coûteuses à re-seeder), `GET /mitre/status` retourne `last_sync: null` alors que `GET /mitre/tactics` retourne 15 lignes. Discrepancy mensongère. Fix : TRUNCATE aussi les `mitre_*` dans `/diag/reset` (test-only endpoint, on accepte la re-sync via `/mitre/sync` en `beforeAll`).
- **Volume permissions et chown au build** : `mkdir -p /data/mitre && chown -R metamorph:metamorph /data` dans le Dockerfile suffit POUR le premier `make up` (podman copie l'ownership de l'image lors de l'init du named volume). Mais si un volume préexiste owned root, le chown ne replay pas. À documenter en pré-requis dans `tasks/testing-m<N>.md`, ou ajouter un entrypoint shim qui valide les perms au boot.
- **Build cache du front silencieux** : `podman build` montre `Using cache aad724...` même quand src/ a changé si le diff entre les arborescences est invisible (mtime). En cas de doute : `podman build --no-cache` une fois pour confirmer que le typecheck passe, puis `make down && make up` pour pousser le bundle. Réflexe à garder en mémoire.
## 2026-05-11 — M3 RBAC, groupes, users, invitations ## 2026-05-11 — M3 RBAC, groupes, users, invitations
- **`logging.LogRecord` réserve `name`** comme attribut interne (en plus de `message`, `levelname`, `pathname`, `filename`, `module`, `funcName`, `lineno`, `asctime`, `process`, `thread`, `args`). Donc `log.info("metamorph.x.created", extra={"name": entity.name})` lève `KeyError: "Attempt to overwrite 'name' in LogRecord"`. Patron : préfixer toute clé risquée par l'entité (`group_name`, `user_name`, `template_name`). À documenter dans le style guide quand on en aura un. - **`logging.LogRecord` réserve `name`** comme attribut interne (en plus de `message`, `levelname`, `pathname`, `filename`, `module`, `funcName`, `lineno`, `asctime`, `process`, `thread`, `args`). Donc `log.info("metamorph.x.created", extra={"name": entity.name})` lève `KeyError: "Attempt to overwrite 'name' in LogRecord"`. Patron : préfixer toute clé risquée par l'entité (`group_name`, `user_name`, `template_name`). À documenter dans le style guide quand on en aura un.

View File

@@ -137,7 +137,7 @@ Remplace l'aller-retour Excel actuel par une UI partagée temps réel, multi-rô
1. Premier boot : token d'install affiché dans les logs, accès `/setup` permet de créer le 1er admin. 1. Premier boot : token d'install affiché dans les logs, accès `/setup` permet de créer le 1er admin.
2. Admin crée un groupe custom et lui attribue des permissions atomiques (write_red_fields uniquement, par exemple). 2. Admin crée un groupe custom et lui attribue des permissions atomiques (write_red_fields uniquement, par exemple).
3. Admin envoie un lien d'invitation, l'utilisateur s'enregistre avec succès et hérite des bons groupes. 3. Admin envoie un lien d'invitation, l'utilisateur s'enregistre avec succès et hérite des bons groupes.
4. Admin importe la matrice MITRE et crée un test unitaire avec Tactic+Technique+Sub-technique. 4. Admin importe la matrice MITRE et crée un test unitaire avec Tactic+Technique+Sub-technique. *(≥ 14 tactics Enterprise — la v19 du pin actuel en ship 15.)*
5. Admin compose un scénario de 3 tests ordonnés via drag-and-drop. 5. Admin compose un scénario de 3 tests ordonnés via drag-and-drop.
6. Red Teamer crée une mission avec scénario, assigne 1 red + 1 blue. 6. Red Teamer crée une mission avec scénario, assigne 1 red + 1 blue.
7. Red Teamer marque un test « exécuté », saisit commande+output, timestamp auto capturé. 7. Red Teamer marque un test « exécuté », saisit commande+output, timestamp auto capturé.

111
tasks/testing-m4.md Normal file
View File

@@ -0,0 +1,111 @@
---
type: testing
milestone: M4
date: "2026-05-12"
project: Metamorph
---
# Testing M4 — MITRE ATT&CK Enterprise
## 1. Lancement de la stack
```bash
make clean # reset si une stack tournait
make up # build + start db/api/front
make migrate
make seed-mitre # télécharge le bundle pinné v19.0 (~50 MB, ~1 s parse)
```
> **Permissions volume** : `metamorph_mitre` est créé chowné `metamorph:metamorph` par le Dockerfile à la 1ʳᵉ initialisation. Si tu as un volume préexistant (d'une expé antérieure) appartenant à root, le seed échouera avec `PermissionError`. Solution : `podman volume rm metamorph_metamorph_mitre` avant `make up`.
## 2. Tests automatisés
```bash
make test-api # 51 tests pytest dont 12 nouveaux MITRE (parser + endpoints)
make e2e # 34 tests Playwright dont 6 M4
```
Le rapport HTML est dans `e2e/playwright-report/`, le JUnit dans `e2e/playwright-report/junit.xml`.
## 3. Procédure manuelle (smoke navigateur)
### Pré-requis
- Stack up, migrations appliquées, `make seed-mitre` exécuté.
- Le bundle est cache dans le volume `metamorph_mitre` (`/data/mitre/enterprise-attack-19.0.json`). Pour ré-utiliser un fichier local : `flask --app app.cli metamorph seed-mitre --source /chemin/vers/enterprise-attack.json`.
### 3.1 Page MITRE (`/mitre`)
1. Se connecter en admin.
2. Cliquer **MITRE** dans la nav → page chargée.
3. Carte **Source** : vérifier `version 19.0` + URL pinnée + `Last sync` non vide.
4. Carte **Sync** (admin uniquement) : cliquer **Trigger MITRE sync** → bannière verte avec counts (15 tactics / 222 techniques / 475 subtechniques).
5. Picker :
- Cliquer **TA0006 — Credential Access** dans la colonne gauche.
- La colonne du milieu liste **T1003 OS Credential Dumping** et 16 autres techniques.
- Cliquer **T1003** → la colonne droite liste **T1003.001** à **T1003.008**.
- Cocher la case en face de **T1003.001 PowerShell**.
- Un chip cyan/orange/purple apparaît en haut + la carte « Selected (preview payload) » montre le JSON.
- Cliquer le chip → désélection.
### 3.2 Filtres / recherche
1. Dans la colonne **Tactic search**, taper `cred` → seule TA0006 reste.
2. Sélectionner TA0006, dans **Technique search** taper `dump` → T1003 apparaît.
3. Vider les recherches, sélectionner T1059 → vérifier T1059.001 à T1059.012 dans les sub-techniques.
### 3.3 Non-admin
1. Inviter un user sans perms via Admin > Invitations.
2. Se connecter en tant que ce user.
3. Naviguer sur `/mitre` → page accessible, picker fonctionnel (read-only).
4. La carte **Sync** n'apparaît PAS (UI gate `is_admin`).
5. Tenter `POST /api/v1/mitre/sync` via curl avec son token → **403** `insufficient permissions`.
### 3.4 Re-sync admin
```bash
ACCESS=$(curl -sX POST http://localhost:8080/api/v1/auth/login \
-H 'Content-Type: application/json' \
-d '{"email":"admin@metamorph.local","password":"AdminPass1234!"}' | jq -r .access_token)
curl -sX POST http://localhost:8080/api/v1/mitre/sync \
-H "Authorization: Bearer $ACCESS" | jq
```
Sortie attendue :
```json
{
"tactics_upserted": 15,
"techniques_upserted": 222,
"subtechniques_upserted": 475,
"subtechniques_skipped_orphan": 0,
"technique_tactic_links": 254,
"version": "19.0",
"duration_ms": ~1000
}
```
### 3.5 Sync via URL custom
```bash
curl -sX POST http://localhost:8080/api/v1/mitre/sync \
-H "Authorization: Bearer $ACCESS" \
-H 'Content-Type: application/json' \
-d '{"source":"https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/enterprise-attack/enterprise-attack-18.1.json"}' | jq
```
- Avec une URL ≠ pinned : sha256 désactivé, `version` est `null` (on ne connaît pas la version d'un fichier custom).
### 3.6 Mode air-gap
1. Préparer un STIX 2.1 valide localement : `enterprise-attack-19.0.json`.
2. Le copier dans le volume :
```bash
podman cp enterprise-attack-19.0.json metamorph-api:/data/mitre/
```
3. Lancer le seed pointé sur le path : `podman compose exec api flask --app app.cli metamorph seed-mitre --source /data/mitre/enterprise-attack-19.0.json --skip-checksum`
## 4. Points de contrôle critiques
- [x] `make seed-mitre` initial pinné v19.0 → 15/222/475 sans orphans.
- [x] Re-lancer le seed est idempotent (mêmes counts).
- [x] `/mitre/tactics` retourne 15 tactics (la spec mentionne 14 — MITRE en a 15 depuis v8).
- [x] `/mitre/techniques?tactic=TA0006` retourne ≥ 17 techniques incl. T1003.
- [x] `/mitre/subtechniques?technique=T1003` retourne 8 sub-techniques.
- [x] `/mitre/status` expose `last_sync`, `version`, `source_url`, `default_url`, `default_version`.
- [x] `/mitre/sync` exige la perm `mitre.sync` (admin via bypass `is_admin`).
- [x] Sha256 mismatch sur la pinned URL → 502 `checksum_mismatch`, DB intacte.
- [x] Bundle local (`--source <path>`) bypasse la vérif checksum.
- [x] Picker SPA : tactic → technique → subtechnique, multi-select, déselection via chip cliquable.
- [x] Non-admin : voit la page `/mitre` mais pas la carte Sync ; `POST /mitre/sync` → 403.

View File

@@ -101,7 +101,7 @@ spec: tasks/spec.md
--- ---
## M4 — MITRE ATT&CK Enterprise ## M4 — MITRE ATT&CK Enterprise
**But** : le référentiel ATT&CK est interrogeable et tagué sur les tests. **But** : le référentiel ATT&CK est interrogeable et tagué sur les tests.