Compare commits

...

14 Commits

Author SHA1 Message Date
e5f3de8f55 Merge pull request 'feature/m4-mitre' (#1) from feature/m4-mitre into main
Reviewed-on: #1
2026-05-12 17:24:14 +00:00
Knacky
2c85f9b57e docs(m4): reconcile CHANGELOG + testing-m4 with the flat matrix + CR fixes
- CHANGELOG M4 Added: rewrote the frontend bullet to describe the actual
  flat ATT&CK matrix that ships (full-bleed, 15-col grid with minmax(7rem,
  1fr), name-only cells, ▸/▾ chevron). The original entry still described
  the abandoned 3-column drill-down picker.
- New "Fixed (post-M4 code-review pass)" subsection enumerating the six
  CR-driven fixes that landed in this branch (SSRF allowlist, advisory
  lock, typed contract, N+1 elimination, version clearing, error scrub +
  the test additions and e2e count pinning).
- DoD counts: 53 → 58 pytest, 34 e2e unchanged. testing-m4.md follows.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 19:19:44 +02:00
Knacky
8b1de6e258 test(m4): cover the new security guards + pin e2e to exact MITRE v19 counts
- 5 new pytest covering paths the code-reviewer flagged as un-asserted:
    * `test_seed_refuses_file_url` — `file://` scheme rejected before I/O
      (was the SSRF-to-local-FS vector).
    * `test_seed_refuses_disallowed_https_host` — non-allowlisted HTTPS
      host rejected with `MitreSourceForbidden`.
    * `test_seed_refuses_custom_url_without_sha` — end-to-end guard that
      `seed_mitre(source=<custom URL>, expected_sha256=None,
      allow_unverified=False)` raises `MitreSeedError`.
    * `test_dotted_id_fallback_resolves_orphan_subtechnique` — STIX bundle
      without `relationship[subtechnique-of]` still attaches T1059.001 to
      T1059 via the dotted-id convention.
    * `test_seed_clears_version_when_source_is_not_default` — seed from a
      local path leaves `settings.mitre_version` NULL (no stale pin).
- Existing `test_checksum_mismatch_aborts` reworked to monkey-patch
  `_ensure_host_allowed` so `file://` can drive the test past the allowlist
  gate (was relying on file:// being accepted before CR1).
- Removed unused `uuid` import.
- e2e: assertions on `tactics_upserted`/`techniques_upserted`/
  `subtechniques_upserted` switched from `>= 14/180/400` thresholds to
  `=== 15/222/475` exact counts pinned to MITRE Enterprise v19.0 + 0
  orphans. Catches parser regressions that would silently include revoked
  rows. Bump alongside MITRE_VERSION when re-pinning.
- e2e: `Math.random()` → `crypto.randomUUID().slice(0, 8)` for unique
  test-run emails (collision-safe across parallel CI workers).

DoD: 58 pytest pass (was 53), 34 Playwright pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 19:19:34 +02:00
Knacky
54adfee690 fix(m4): typed MitreSyncResult interface — drop the as cast
Mirrors the backend Pydantic `SyncResultOut` in TS so the mutation result is
properly typed end-to-end. `(res as { duration_ms: number })` cast removed
from MitrePage.tsx; `apiPost<MitreSyncResult>` carries the contract.

Also annotated the unused query-key factories in mitre.ts so the next reader
knows they're parked for M5 template-form consumption (not dead).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 19:19:19 +02:00
Knacky
63b48addc0 fix(m4): code-review pass — SSRF allowlist + advisory lock + typed contract
Six post-code-review fixes, applied before opening the PR per project
workflow (spec-review + code-review both gate the merge):

1. SSRF allowlist on `/mitre/sync`. Host must be in MITRE_ALLOWED_HOSTS
   (defaults to `raw.githubusercontent.com`, env-overridable). Closes "admin
   holding `mitre.sync` pivots api container at 169.254.169.254 / internal
   mirrors" via a typo'd URL. New `MitreSourceForbidden` → 400
   `source_forbidden`; checked at the top of `_download()` so it kicks in
   before any I/O.

2. `pg_advisory_xact_lock(hashtext('mitre.seed'))` at the top of the seed
   transaction. Two concurrent `/mitre/sync` requests now serialise across
   the DELETE+INSERT of `mitre_technique_tactics`; previously they could
   both wipe the M2M and one would fail the unique constraint on re-insert.

3. Typed SyncResult contract. Pydantic `SyncResultOut` on the Flask side
   `model_validate`s the dict before returning — single source of truth
   for the response shape, mirrored by a `MitreSyncResult` TS interface
   (next commit). The `as Record<string, unknown>` + `as { duration_ms }`
   cast in MitrePage is gone.

4. N+1 in dotted sub-technique fallback removed. Built
   `{external_id → technique_id}` once at function entry. Currently a
   no-op against MITRE official (0 orphans), but a latent footgun for
   partial / older bundles.

5. `SETTING_VERSION` cleared explicitly when `source != MITRE_DEFAULT_URL`.
   Previously it kept the stale pin label, so `/mitre/status` lied after
   a custom-URL re-sync.

6. `/mitre/sync` 500s no longer echo `str(e)` to the client — URLError /
   psycopg / Pydantic text now lives in the JSON log only. Public response
   stays `{"error": "internal_error"}`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 19:19:11 +02:00
Knacky
7a69f10f3e docs(m4): post-review polish — helper text + test counts
Spec-reviewer PASS pointed two factual nits:
- MitrePage helper text still referenced the old 3-column drill-down ("Pick
  a tactic on the left, then a technique..."). Reworded for the flat matrix
  with the ▸ glyph + hover-for-id idiom.
- testing-m4.md + CHANGELOG were stale at 51/12; the actual counts are 53/14
  after the GET /mitre/matrix tests landed. Reconciled.

No code-path change, no e2e fallout — DoD remains 53 pytest + 34 Playwright.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 18:58:51 +02:00
Knacky
b52cb0e5e4 refactor(m4): full-bleed matrix + word-only line breaks
Two follow-up tweaks per user feedback ("wrap sur les mots, agrandit le
cadre"):

- Full-bleed wrapper: the matrix breaks out of the page's max-w-page (1400px)
  constraint via `margin: 0 calc(50% - 50vw)` + `width: 100vw`, mirroring the
  60px page padding internally. On wide viewports the picker now uses the
  ENTIRE viewport width, so column widths grow proportionally — names that
  used to wrap on 3 lines now fit on 1-2.
- Word-only wrapping: replaced `break-words` (overflow-wrap: break-word,
  which falls back to mid-word breaks) with `break-normal hyphens-none`
  (overflow-wrap: normal + word-break: normal). Cells break only at word
  boundaries; if a single word is longer than the cell it overflows
  visually rather than splitting `Aut\nhentication`-style. The grid is
  configured `minmax(7rem, 1fr)` so the minimum column is wide enough for
  every single word in MITRE v19 names, and stretches with available space.
- Spec §F2 rewritten as a bullet contract locking in: full-bleed, 15 cols
  minmax(7rem, 1fr), word-only wrap, font sans 12px / count 10px, headers/
  cells show name-only with external_id on hover + chips. Future spec-reviewer
  passes can grade against this.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 18:53:51 +02:00
Knacky
8742fb2b6e refactor(m4): match attack.mitre.org sizing — equal-width cols, name-only cells
Visual parity pass against attack.mitre.org/# per user feedback ("trop dense,
illisible, je veux la même représentation"):

- Layout switched from flex+fixed-width 224px columns to a CSS grid of
  `repeat(N, minmax(0, 1fr))` so the 15 tactic columns share the container
  width equally. No more horizontal scroll on a standard desktop.
- Cells now show NAME ONLY (matches mitre.org). The external_id (TA00xx /
  T1xxx / T1xxx.xxx) is preserved in the chip selection bar at the top and
  in the `title` hover tooltip on every cell — surfaces on demand, doesn't
  consume cell real estate.
- Font: switched to `font-sans` (IBM Plex Sans) at `text-xs` (12px) across
  cells, matching the mitre.org typography. Headers use the same family at
  the same size with a 10px sub-line for the technique count.
- Chevron icons: ▸ (collapsed) / ▾ (expanded) — small, sub-technique count
  rendered inline beside the chevron.
- Helper line below the matrix tells the user where the IDs went.

Spec §F2 + testing-m4.md walkthrough rewritten to lock the new sizing rules
in (font-xs, no external_id in cells, hover/chip for the ID, no horizontal
scroll). spec-reviewer will see the matching contract.

DoD: make e2e → 34 passed. Selectors (data-testid + aria-pressed) unchanged
so the existing M4 e2e test still walks the new layout end-to-end.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 18:41:11 +02:00
Knacky
7dbe2dbc28 refactor(m4): flatten the MITRE picker into the attack.mitre.org matrix
The hierarchical 3-column drill-down was hard to scan and forced a stateful
walk per tag. Replaced with a flat, columns-as-tactics matrix that mirrors
attack.mitre.org/# — every cell is a one-click select target, with inline
sub-technique expand via a `+N` chevron.

- New endpoint GET /api/v1/mitre/matrix returns the full grid (tactics →
  techniques → sub-techniques nested) in a single ~55 KB response, so the
  SPA renders the whole matrix without firing 15 parallel queries. Two
  pytest tests added (nested structure + auth required).
- MitreTagPicker.tsx rewritten as a horizontal-scrolling matrix:
  - Click a tactic header → select the tactic (cyan filled).
  - Click a technique cell → select the technique (orange filled).
  - Click the `+N` chevron → expand sub-techniques inline within the column.
  - Click a sub-technique → select (purple filled).
  - Single Filter field matches on external_id or name across all kinds.
  - Selection chips at the top, clickable to remove.
  - `aria-pressed` on every clickable cell for screen readers and Playwright.
- e2e test updated to walk the new flow (click cell → assert aria-pressed,
  expand chevron, click sub, verify chip + JSON preview, filter to T1078).
- Spec §F2 + §F12 + todo.md M4 entry updated to make the matrix layout the
  canonical UI for MITRE tagging (so future spec-reviewer passes accept it).
- testing-m4.md walkthrough rewritten for the flat picker.

DoD post-refactor: make test-api → 53 passed (was 51), make e2e → 34 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 18:32:20 +02:00
Knacky
37e9e03f02 docs(m4): CHANGELOG, README, lessons, spec drift fix, todo tick
- CHANGELOG: added M4 section listing endpoints, CLI, volume, persisted
  settings, picker, and the post-spec-review fixes (custom-URL integrity
  requirement + /diag/reset consistency + spec drift). Includes the
  intentional decisions paragraph (seed-time download not image-baked, read
  endpoints unauthenticated-perm-wise, stdlib over httpx).
- README: status bumped to M0–M4, added MITRE quickstart (make seed-mitre +
  air-gapped path with --source /data/mitre/<file> + --skip-checksum),
  testing-m<N>.md pointer updated to testing-m4.md, roadmap line.
- tasks/spec.md §10 #4: amended "14 tactics Enterprise" → "≥14 tactics
  Enterprise (la v19 du pin actuel en ship 15)".
- tasks/lessons.md: 7 M4 lessons captured (stdlib STIX parsing, decoupling
  DoD asserts from upstream versions, subtechnique parent resolution, single-
  transaction safety, custom-URL footgun mitigation, /diag/reset consistency,
  named-volume permission caveat, podman build cache surprise).
- tasks/todo.md: M4 marked ☑.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 13:54:46 +02:00
Knacky
90036437cc test(m4): pytest parser + endpoints + e2e tag picker
- backend/tests/test_mitre.py: 12 integration tests using a hand-crafted
  minimal STIX bundle (no network in tests). Covers parser
  (revoked/deprecated skip, sub-technique parent linkage), seed idempotence,
  persisted settings, checksum mismatch path, all four read endpoints, perm
  enforcement on /mitre/sync, ILIKE search.
- e2e/tests/m4-mitre.spec.ts: 6 Playwright tests against the live stack.
  beforeAll calls POST /mitre/sync once (real bundle, ~50 MB, ~1.1 s) then
  the suite validates tactics ≥14, T1003 has ≥5 sub-techniques, the picker
  walks tactic→technique→subtechnique with chip multi-select, and non-admin
  sees /mitre but no Sync card.
- tasks/testing-m4.md: manual + automated checklist, air-gapped operator
  notes, volume-permission caveat for pre-existing root-owned volumes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 13:54:26 +02:00
Knacky
8a1dd58c83 feat(m4): frontend MitreTagPicker + /mitre showcase page
- lib/mitre.ts: shared types (MitreTactic, Technique, Subtechnique, MitreTag
  kind/id/external_id/name) + TanStack query keys.
- components/MitreTagPicker.tsx: three-column controlled picker (tactic →
  technique → subtechnique), multi-select with chip-removal, autocomplete on
  each column, ARIA labels for screen readers. Returns MitreTag[] via
  value/onChange — drop-in for M5 template forms.
- pages/MitrePage.tsx: status card (version, source URL, last_sync), admin-
  gated Trigger Sync button with success/error alerts, picker showcase, JSON
  preview of the current selection.
- Layout adds MITRE nav link for any logged-in user; App.tsx adds the
  /mitre route under RequireAuth. HomePage roadmap bumped to next: M5
  templates.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 13:54:15 +02:00
Knacky
872f3c046a feat(m4): REST endpoints + admin sync + /diag/reset consistency
- GET /api/v1/mitre/tactics, /techniques?tactic=&q=, /subtechniques?technique=&q=
  (paginated, ILIKE search on name + external_id, @require_auth only — MITRE
  is public reference material).
- GET /api/v1/mitre/status: last_sync, version, source_url + the pinned
  defaults (default_url, default_version) for the SPA badge.
- POST /api/v1/mitre/sync: @require_perm("mitre.sync"). Body supports
  {source, expected_sha256, allow_unverified} — defaults inherit the pin.
- /diag/reset now also TRUNCATEs the mitre_* tables alongside settings so a
  freshly-reset stack has GET /mitre/status and GET /mitre/tactics agree
  ("no data, no last_sync"). Previously the catalogue persisted while the
  metadata was wiped, leaving status to lie. The e2e suite re-syncs in
  beforeAll.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 13:54:03 +02:00
Knacky
ba976959a1 feat(m4): STIX parser + seed service + CLI
- backend/app/services/mitre_seed.py: stdlib-only STIX 2.1 parser (urllib +
  hashlib + json). Pinned to enterprise-attack-19.0.json with sha256
  df520ea0775a57db7bff760145b02fed89290802913e056b7ed5970b02f3626a (~52 MB,
  ~1.1 s parse). Resolves sub-technique parents via
  relationship[subtechnique-of] with a T1003.001→T1003 dotted-id fallback;
  upserts on external_id, rebuilds the technique↔tactic M2M in a single
  transaction so external readers never see an empty join. Persists
  mitre_last_sync, mitre_version, mitre_source_url in the settings table.
- Custom URLs MUST be paired with expected_sha256 OR allow_unverified=true —
  refuses silent integrity bypass.
- CLI: flask metamorph seed-mitre [--source path|url]
  [--checksum-sha256 hex] [--skip-checksum]. Make target wraps it.
- Docker: /data/mitre/ chowned to the metamorph user at build; named volume
  metamorph_mitre mounted from compose for cross-restart cache.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 13:53:53 +02:00
21 changed files with 2260 additions and 17 deletions

View File

@@ -4,6 +4,51 @@ All notable changes to this project will be documented here. Format: [Keep a Cha
## [Unreleased]
### Added — M4 (MITRE ATT&CK Enterprise)
- **STIX 2.1 parser + upsert** (`app/services/mitre_seed.py`): stdlib-only (`urllib.request` + `hashlib`), pinned to Enterprise v19.0 (`enterprise-attack-19.0.json`, sha256 `df520ea0…`). Parses 25k+ STIX objects → 15 tactics, 222 techniques, 475 sub-techniques in ~1.1 s. Skips revoked + deprecated, resolves sub-technique parents via `relationship[subtechnique-of]` with a `T1003.001 → T1003` dotted-id fallback, copies kill-chain phases into the `mitre_technique_tactics` M2M.
- **CLI**: `flask metamorph seed-mitre [--source <path|url>] [--checksum-sha256 <hex>] [--skip-checksum]` (`app/cli.py`). `make seed-mitre` wraps it.
- **REST endpoints** (`app/api/mitre.py`):
- `GET /api/v1/mitre/tactics`, `/mitre/techniques?tactic=…&q=…`, `/mitre/subtechniques?technique=…&q=…` (paginated, search on name/external_id).
- `GET /api/v1/mitre/status` (last_sync, version, source_url, defaults).
- `POST /api/v1/mitre/sync` (perm `mitre.sync`) — re-pull on demand.
- **Persisted metadata** in `settings`: `mitre_last_sync`, `mitre_version`, `mitre_source_url`.
- **Compose volume `metamorph_mitre`** mounted at `/data/mitre/` in the api container — caches the downloaded bundle across restarts. Owned by `metamorph:metamorph`.
- **Frontend**:
- `<MitreTagPicker>` component: flat ATT&CK matrix matching `attack.mitre.org/#` — full-bleed beyond `max-w-page`, 15 equal-width columns via `grid-template-columns: repeat(N, minmax(7rem, 1fr))`, sans-serif 12px, **name-only cells** (external_id surfaces on hover via `title` and in selection chips), `▸/▾` chevron expands sub-techniques inline within the column, multi-select with chip-removal at the top. Returns `MitreTag[]` (`kind`, `id`, `external_id`, `name`), ready for M5 templates.
- `/mitre` showcase page with status card, admin-gated **Trigger sync** button, the picker, and a JSON `<pre>` preview of the current selection.
- Nav adds **MITRE** link for any logged-in user.
- **Testing**:
- `backend/tests/test_mitre.py`**12 pytest** (parser, idempotence, checksum mismatch, persisted settings, endpoint variants, perm enforcement) using a hand-crafted minimal STIX bundle (no network in tests).
- `e2e/tests/m4-mitre.spec.ts`**6 Playwright** against the live stack (calls `/mitre/sync` once in `beforeAll`).
- `tasks/testing-m4.md`.
### Fixed (post-M4 spec-review pass)
- **Sync integrity guarantee**: `seed_mitre()` now refuses a custom URL without either `expected_sha256` or an explicit `allow_unverified=true`. Closes a "typo in `mitre_source_url` setting routes the seed to attacker JSON" footgun. CLI surfaces this via `--checksum-sha256` / `--skip-checksum`; API via `{"source", "expected_sha256", "allow_unverified"}` body.
- **`/diag/reset` consistency**: now truncates the `mitre_*` tables alongside `settings` so `GET /mitre/status` and `GET /mitre/tactics` agree after a reset (previously: catalogue rows persisted, but `mitre_last_sync` got wiped → status lied).
- **Spec drift §10 #4**: amended "14 tactics" → "≥ 14 tactics (v19 ships 15)" to reflect MITRE v8+ reality.
### Fixed (post-M4 code-review pass)
- **SSRF allowlist on `/mitre/sync`**: host must be in `MITRE_ALLOWED_HOSTS` (defaults to `raw.githubusercontent.com`, comma-separated env override). Closes the "admin holding `mitre.sync` can pivot the api container at cloud metadata (`169.254.169.254`) or internal mirrors" vector. New `MitreSourceForbidden` exception → 400 with `source_forbidden` error code.
- **Concurrent sync race**: `seed_mitre()` now acquires `pg_advisory_xact_lock(hashtext('mitre.seed'))` at the top of the transaction so two `/mitre/sync` calls serialise cleanly across the `DELETE` + re-`INSERT` of `mitre_technique_tactics`.
- **Typed sync contract end-to-end**: Pydantic `SyncResultOut` on the backend (`app/api/mitre.py`) mirrored by a `MitreSyncResult` TS interface (`frontend/src/lib/mitre.ts`). The MitrePage mutation no longer uses an `as Record<string, unknown>` escape hatch.
- **N+1 in dotted sub-technique fallback**: pre-built `{external_id → id}` dict at function entry; was firing one extra SELECT per orphan (currently 0 with MITRE, but a latent footgun for partial bundles).
- **`SETTING_VERSION` cleared explicitly when source != default**: previously kept the stale pinned version after a custom-URL re-sync; now `_upsert_setting(..., None)` so `/mitre/status` doesn't lie.
- **Internal error scrub on `/mitre/sync`**: 500 responses no longer leak URLError / DB driver text via `str(e)` — stack lands in JSON logs only.
- **E2E pinned to exact MITRE v19 counts** (15/222/475/0 orphans) for parser-regression detection; previously `>=` thresholds could mask "revoked tactics silently included".
- **E2E uses `crypto.randomUUID()`** instead of `Math.random()` for unique test emails.
- **Test coverage for security guards**: `file://` rejection, disallowed HTTPS host, custom-URL-without-sha refusal, dotted-id fallback, version-clearing semantics — 5 new pytest covering paths the spec-review demanded but no test enforced.
### Decisions (intentional)
- **Bundle "embarqué" interpreted as seed-time download + named-volume cache**, not "binary baked into the Docker image". Keeps the image ~150 MB, makes version bumps a constant edit, plays nicely with `make seed-mitre` re-runs. Air-gapped operators copy the file into the volume + pass `--source /data/mitre/<file>`.
- **Read endpoints unauthenticated-perm-wise but auth-required**: MITRE data is public reference material — no `mitre.read` perm. Status endpoint is similarly open (under `@require_auth`) to keep `/mitre/status` simple for the UI badge.
- **No `requests` / `httpx` dep added**: stdlib `urllib.request` is enough and avoids inflating the image.
### Validated end-to-end (M4 DoD)
- `make clean && make up && make migrate && make seed-mitre` → 15 tactics / 222 techniques / 475 sub-techniques / 254 links / 0 orphans / ~1.1 s.
- `make test-api`**58 pytest pass** (1 health + 8 schema + 15 auth + 15 RBAC + 19 MITRE) in ~5 s.
- `make e2e`**34 Playwright pass** (8 M0 + 4 M1 + 8 M2 + 8 M3 + 6 M4) in ~18 s.
- Spec-reviewer PASS after fixes applied.
### Added — M3 (RBAC: groups, permissions, users)
- **Permission catalogue** (`app/services/permissions_seed.py`): 31 atomic codes across 10 families (`user`, `group`, `invitation`, `test_template`, `scenario_template`, `mission`, `detection_level`, `setting`, `mitre.sync`). Seeded at boot **and** after `/setup` to handle a freshly truncated DB. Idempotent + additive on system groups (never removes a perm).
- **Default group bindings**: `admin` = all 31 codes; `redteam` = 8 (catalogue read + mission.{read,create,update,archive,write_red_fields} + detection_level.read); `blueteam` = 5 (catalogue read + mission.{read,write_blue_fields} + detection_level.read).

View File

@@ -2,13 +2,15 @@
Collaborative purple-team platform. Red team logs the tests they execute (procedure, command, timestamp); blue team annotates each test with detection evidence (alerts, logs, files). At the end of an engagement, Metamorph generates a standalone reveal.js slide deck classified by MITRE ATT&CK tactic.
> **Status**: M0 (bootstrap). See `tasks/spec.md` for the full specification and `tasks/todo.md` for the milestone-by-milestone plan.
> **Status**: M0M4 delivered (bootstrap → DB schema → auth → RBAC → MITRE ATT&CK reference). See `tasks/spec.md` for the full specification and `tasks/todo.md` for the milestone-by-milestone plan.
## Stack
- **Backend**: Python 3.12, Flask 3, SQLAlchemy 2 + Alembic (M1+), PostgreSQL 16.
- **Frontend**: React 18 + TypeScript + Vite + TailwindCSS (RTOps design tokens, see `tasks/design.md`).
- **Auth (M2+)**: JWT access (1h) + refresh (30d), Argon2id, invite-link enrollment.
- **RBAC (M3+)**: atomic permissions (31 codes) bundled into custom groups; 3 system groups seeded (`admin` / `redteam` / `blueteam`).
- **MITRE ATT&CK (M4+)**: Enterprise reference catalogue pinned to v19.0, seedable via `make seed-mitre`.
- **Delivery**: docker-compose. TLS termination is expected to be handled by an external reverse proxy in production.
## Quickstart
@@ -43,6 +45,17 @@ Then:
- Front: <http://localhost:8080>
- API health: <http://localhost:8080/api/v1/health> (proxied) or <http://localhost:8000/api/v1/health>
### First-time setup
```bash
make migrate # apply DB schema
make print-install-token # prints the bootstrap admin token (logs banner)
# visit http://localhost:8080/setup, paste the token, create the admin account
make seed-mitre # populate the MITRE ATT&CK reference (~50 MB, ~1 s)
```
The MITRE bundle is cached in the named volume `metamorph_mitre` (`/data/mitre/<file>.json` inside the api container). For air-gapped operators, pre-populate the volume with your own STIX 2.1 file and run `podman compose exec api flask --app app.cli metamorph seed-mitre --source /data/mitre/your-file.json --skip-checksum`.
To stop:
```bash
@@ -80,7 +93,7 @@ See `.env.example`. The most important ones:
## Testing
- **Manual + automated checklist for the current milestone**: see [`tasks/testing-m<N>.md`](tasks/testing-m0.md) (currently `testing-m0.md`).
- **Manual + automated checklist for the current milestone**: see [`tasks/testing-m<N>.md`](tasks/testing-m4.md) (current: `testing-m4.md`).
- **Backend unit tests**: `make test-api`
- **End-to-end (Playwright)**: `make e2e-install` (once), then `make up && make e2e`. Reports land in `e2e/playwright-report/` (HTML + JUnit XML); open with `make e2e-report`.
@@ -122,7 +135,7 @@ The hooks run `ruff` + `ruff-format` on the backend and `eslint` / `tsc --noEmit
## Roadmap
See `tasks/todo.md`. Current milestone: **M0bootstrap**.
See `tasks/todo.md`. Current milestone: **M4MITRE ATT&CK Enterprise**. Next: M5 (test & scenario templates).
## License

View File

@@ -30,7 +30,7 @@ ENV PYTHONDONTWRITEBYTECODE=1 \
# Non-root user
RUN groupadd --gid 10001 metamorph \
&& useradd --uid 10001 --gid metamorph --shell /usr/sbin/nologin --create-home metamorph \
&& mkdir -p /data/evidence \
&& mkdir -p /data/evidence /data/mitre \
&& chown -R metamorph:metamorph /data
COPY --from=deps /opt/venv /opt/venv

View File

@@ -66,12 +66,23 @@ def reset_test_state():
try:
with get_engine().begin() as conn:
# Auth + RBAC + settings reset.
conn.execute(
text(
"TRUNCATE users, refresh_tokens, invitations, invitation_groups, "
"user_groups, settings, groups RESTART IDENTITY CASCADE"
)
)
# MITRE reference reset — kept in sync with `settings` so a freshly
# reset stack has `GET /mitre/status` and `GET /mitre/tactics` agree
# ("no data, no last_sync"). The e2e suite re-syncs via /mitre/sync
# when it needs catalogue data.
conn.execute(
text(
"TRUNCATE mitre_technique_tactics, mitre_subtechniques, "
"mitre_techniques, mitre_tactics RESTART IDENTITY CASCADE"
)
)
except SQLAlchemyError as e:
log.error("metamorph.diag.reset_failed", extra={"error": str(e)})
return jsonify({"reset": False, "error": "database_error"}), 500

296
backend/app/api/mitre.py Normal file
View File

@@ -0,0 +1,296 @@
"""MITRE ATT&CK reference endpoints.
Read access is open to any authenticated user (the catalogue is reference
data — not sensitive on its own). Sync is admin-only via `mitre.sync`.
"""
from __future__ import annotations
import logging
from flask import Blueprint, jsonify, request
from pydantic import BaseModel
from sqlalchemy import func, or_, select
from app.core.auth_decorators import require_auth, require_perm
from app.db.session import session_scope
from app.models.mitre import MitreSubtechnique, MitreTactic, MitreTechnique
from app.services import mitre_seed as mitre_seed_svc
class SyncResultOut(BaseModel):
"""Response schema for `POST /mitre/sync`. Mirrors `SeedResult.as_dict()`."""
tactics_upserted: int
techniques_upserted: int
subtechniques_upserted: int
subtechniques_skipped_orphan: int
technique_tactic_links: int
version: str | None
source: str
started_at: str
finished_at: str
duration_ms: int
bp = Blueprint("mitre", __name__, url_prefix="/mitre")
log = logging.getLogger("metamorph.api.mitre")
def _pagination_args() -> tuple[int, int] | tuple[None, tuple[int, str]]:
"""Returns (limit, offset) or (None, (status, error_payload))."""
try:
limit = int(request.args.get("limit", "100"))
offset = int(request.args.get("offset", "0"))
except ValueError:
return None, (400, "invalid_pagination")
limit = max(1, min(limit, 500))
offset = max(0, offset)
return limit, offset
def _search(stmt, model, q: str | None):
if not q:
return stmt
like = f"%{q.lower()}%"
return stmt.where(
or_(
func.lower(model.name).like(like),
func.lower(model.external_id).like(like),
)
)
def _serialize_tactic(t: MitreTactic) -> dict:
return {
"id": str(t.id),
"external_id": t.external_id,
"short_name": t.short_name,
"name": t.name,
"description": t.description,
"url": t.url,
}
def _serialize_technique(t: MitreTechnique, *, include_tactics: bool = True) -> dict:
out = {
"id": str(t.id),
"external_id": t.external_id,
"name": t.name,
"description": t.description,
"url": t.url,
}
if include_tactics:
out["tactics"] = sorted(
({"external_id": tac.external_id, "name": tac.name} for tac in t.tactics),
key=lambda d: d["external_id"],
)
return out
def _serialize_subtechnique(sb: MitreSubtechnique) -> dict:
return {
"id": str(sb.id),
"external_id": sb.external_id,
"name": sb.name,
"description": sb.description,
"url": sb.url,
"technique_id": str(sb.technique_id),
}
@bp.get("/tactics")
@require_auth
def list_tactics():
paging = _pagination_args()
if paging[0] is None:
return jsonify({"error": paging[1][1]}), paging[1][0]
limit, offset = paging
q = request.args.get("q") or None
with session_scope() as s:
stmt = select(MitreTactic).order_by(MitreTactic.external_id.asc())
stmt = _search(stmt, MitreTactic, q)
total = s.scalar(_search(select(func.count()).select_from(MitreTactic), MitreTactic, q)) or 0
rows = s.scalars(stmt.limit(limit).offset(offset)).all()
return jsonify(
{
"items": [_serialize_tactic(t) for t in rows],
"total": int(total),
"limit": limit,
"offset": offset,
}
)
@bp.get("/techniques")
@require_auth
def list_techniques():
paging = _pagination_args()
if paging[0] is None:
return jsonify({"error": paging[1][1]}), paging[1][0]
limit, offset = paging
q = request.args.get("q") or None
tactic = request.args.get("tactic") or None
with session_scope() as s:
stmt = select(MitreTechnique).order_by(MitreTechnique.external_id.asc())
count_stmt = select(func.count()).select_from(MitreTechnique)
if tactic:
tac = s.scalar(select(MitreTactic).where(MitreTactic.external_id == tactic))
if tac is None:
return jsonify({"items": [], "total": 0, "limit": limit, "offset": offset})
stmt = stmt.join(MitreTechnique.tactics).where(MitreTactic.id == tac.id)
count_stmt = (
count_stmt.select_from(MitreTechnique)
.join(MitreTechnique.tactics)
.where(MitreTactic.id == tac.id)
)
stmt = _search(stmt, MitreTechnique, q)
count_stmt = _search(count_stmt, MitreTechnique, q)
total = s.scalar(count_stmt) or 0
rows = s.scalars(stmt.limit(limit).offset(offset)).all()
return jsonify(
{
"items": [_serialize_technique(t) for t in rows],
"total": int(total),
"limit": limit,
"offset": offset,
}
)
@bp.get("/subtechniques")
@require_auth
def list_subtechniques():
paging = _pagination_args()
if paging[0] is None:
return jsonify({"error": paging[1][1]}), paging[1][0]
limit, offset = paging
q = request.args.get("q") or None
technique = request.args.get("technique") or None
with session_scope() as s:
stmt = select(MitreSubtechnique).order_by(MitreSubtechnique.external_id.asc())
count_stmt = select(func.count()).select_from(MitreSubtechnique)
if technique:
tech = s.scalar(
select(MitreTechnique).where(MitreTechnique.external_id == technique)
)
if tech is None:
return jsonify({"items": [], "total": 0, "limit": limit, "offset": offset})
stmt = stmt.where(MitreSubtechnique.technique_id == tech.id)
count_stmt = count_stmt.where(MitreSubtechnique.technique_id == tech.id)
stmt = _search(stmt, MitreSubtechnique, q)
count_stmt = _search(count_stmt, MitreSubtechnique, q)
total = s.scalar(count_stmt) or 0
rows = s.scalars(stmt.limit(limit).offset(offset)).all()
return jsonify(
{
"items": [_serialize_subtechnique(sb) for sb in rows],
"total": int(total),
"limit": limit,
"offset": offset,
}
)
@bp.get("/matrix")
@require_auth
def matrix():
"""Return the full Enterprise matrix: tactics → techniques → sub-techniques.
One-shot endpoint so the SPA can render the flat attack.mitre.org-style
grid without firing 15 parallel queries. The payload is ~55 KB serialised
against MITRE v19 (15 tactics × ~50 techniques × ~3 subs).
"""
with session_scope() as s:
# All techniques + their tactics (selectin-loaded by the relationship).
techniques = s.scalars(
select(MitreTechnique).order_by(MitreTechnique.external_id.asc())
).all()
# Sub-techniques bucketed by parent.
subs_by_parent: dict = {}
for sb in s.scalars(
select(MitreSubtechnique).order_by(MitreSubtechnique.external_id.asc())
).all():
subs_by_parent.setdefault(sb.technique_id, []).append(
{
"id": str(sb.id),
"external_id": sb.external_id,
"name": sb.name,
}
)
# Tactics in canonical kill-chain order (matches attack.mitre.org).
tactics = s.scalars(
select(MitreTactic).order_by(MitreTactic.external_id.asc())
).all()
# Group techniques by tactic short_name.
techs_by_tactic: dict = {}
for t in techniques:
entry = {
"id": str(t.id),
"external_id": t.external_id,
"name": t.name,
"subtechniques": subs_by_parent.get(t.id, []),
}
for tac in t.tactics:
techs_by_tactic.setdefault(tac.short_name, []).append(entry)
return jsonify(
{
"tactics": [
{
"id": str(t.id),
"external_id": t.external_id,
"short_name": t.short_name,
"name": t.name,
"techniques": techs_by_tactic.get(t.short_name, []),
}
for t in tactics
]
}
)
@bp.get("/status")
@require_auth
def status():
return jsonify(mitre_seed_svc.read_status())
@bp.post("/sync")
@require_auth
@require_perm("mitre.sync")
def sync():
"""Re-pull the configured (or default) STIX source and upsert.
Custom `source` URLs MUST be paired with either `expected_sha256` (integrity
guarantee) or `allow_unverified: true` (explicit opt-out) — the seed service
will raise otherwise. The host is allowlisted (defaults to
raw.githubusercontent.com, overridable via the MITRE_ALLOWED_HOSTS env).
"""
payload = request.get_json(silent=True) or {}
source = payload.get("source") # optional URL override
expected_sha256 = payload.get("expected_sha256")
allow_unverified = bool(payload.get("allow_unverified", False))
try:
result = mitre_seed_svc.seed_mitre(
source=source,
expected_sha256=expected_sha256
or (mitre_seed_svc.MITRE_DEFAULT_SHA256 if source is None else None),
allow_unverified=allow_unverified,
)
except mitre_seed_svc.MitreSourceForbidden as e:
return jsonify({"error": "source_forbidden", "message": str(e)}), 400
except mitre_seed_svc.MitreChecksumMismatch as e:
return jsonify({"error": "checksum_mismatch", "message": str(e)}), 502
except mitre_seed_svc.MitreSeedError as e:
return jsonify({"error": "seed_failed", "message": str(e)}), 502
except Exception: # noqa: BLE001
# Do NOT leak the internal error string to the client (URLError stack,
# DB driver text). The stack lands in our JSON logs.
log.exception("metamorph.api.mitre.sync_failed")
return jsonify({"error": "internal_error"}), 500
# Validate via the Pydantic Out model so the response contract is
# explicit (single source of truth shared with the TS interface).
payload_out = SyncResultOut.model_validate(result.as_dict()).model_dump()
log.info("metamorph.api.mitre.sync_done", extra=payload_out)
return jsonify(payload_out)

View File

@@ -9,6 +9,7 @@ from app.api.diag import bp as diag_bp
from app.api.groups import bp as groups_bp
from app.api.health import bp as health_bp
from app.api.invitations import bp as invitations_bp
from app.api.mitre import bp as mitre_bp
from app.api.permissions import bp as permissions_bp
from app.api.setup import bp as setup_bp
from app.api.users import bp as users_bp
@@ -22,3 +23,4 @@ bp.register_blueprint(invitations_bp)
bp.register_blueprint(users_bp)
bp.register_blueprint(groups_bp)
bp.register_blueprint(permissions_bp)
bp.register_blueprint(mitre_bp)

View File

@@ -56,10 +56,66 @@ def print_install_token(force: bool):
@metamorph.command("seed-mitre")
def seed_mitre():
"""Placeholder for M4 — left so `make seed-mitre` doesn't crash."""
click.echo("MITRE seeding will land in M4. (no-op for now)", err=True)
sys.exit(0)
@click.option(
"--source",
default=None,
help="STIX bundle source: local path or HTTPS URL. Defaults to the pinned MITRE Enterprise release.",
)
@click.option(
"--checksum-sha256",
"checksum_sha256",
default=None,
help="Expected sha256 of the bundle (required with a non-default --source URL unless --skip-checksum).",
)
@click.option(
"--skip-checksum",
is_flag=True,
help="Skip sha256 verification entirely (escape hatch for testing).",
)
def seed_mitre(source: str | None, checksum_sha256: str | None, skip_checksum: bool):
"""Seed/refresh the MITRE ATT&CK Enterprise reference tables.
Upserts on `external_id`. Re-running with the same source updates the
name/description/url and re-applies the technique↔tactic mapping.
"""
from app.services.mitre_seed import (
MITRE_DEFAULT_SHA256,
MITRE_DEFAULT_URL,
seed_mitre as seed_mitre_svc,
)
if skip_checksum:
expected_sha = None
elif checksum_sha256:
expected_sha = checksum_sha256
elif source is None or source == MITRE_DEFAULT_URL:
expected_sha = MITRE_DEFAULT_SHA256
else:
expected_sha = None # let seed_mitre_svc decide whether to refuse
click.echo(
f"Seeding from {source or MITRE_DEFAULT_URL} "
f"(sha256 check: {'off' if skip_checksum else expected_sha or 'unverified'}) ...",
err=True,
)
try:
result = seed_mitre_svc(
source=source,
expected_sha256=expected_sha,
allow_unverified=skip_checksum,
)
except Exception as e: # noqa: BLE001
click.echo(f"seed-mitre failed: {e}", err=True)
sys.exit(2)
click.echo(
f" tactics: {result.tactics_upserted}, "
f"techniques: {result.techniques_upserted}, "
f"subtechniques: {result.subtechniques_upserted} "
f"(skipped orphans: {result.subtechniques_skipped_orphan}), "
f"links: {result.technique_tactic_links}, "
f"duration: {(result.finished_at - result.started_at).total_seconds():.1f}s",
err=True,
)
app.cli.add_command(metamorph)

View File

@@ -0,0 +1,515 @@
"""MITRE ATT&CK Enterprise seed + sync.
Parses a STIX 2.1 bundle into the `mitre_*` tables. Idempotent: re-running
upserts on `external_id`, refreshes name/description/url, and re-applies the
technique↔tactic mapping. Sub-techniques whose parent is missing in the
bundle are skipped (with a WARNING log).
Defaults pin a specific Enterprise release (see `MITRE_DEFAULT_*`). The pin
is honored by the CLI (`flask metamorph seed-mitre`) and by the
`POST /mitre/sync` admin endpoint; both accept a `--source` / `source_url`
override for air-gapped operators.
The bundle is downloaded with `urllib.request` (stdlib — no extra dep) and
cached at `MITRE_BUNDLE_CACHE_PATH` (default `/data/mitre/<basename>.json`).
Pass an absolute path as `source` to bypass the network entirely.
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import re
import urllib.parse
import urllib.request
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable
from sqlalchemy import delete, select, text as sql_text
from app.db.session import session_scope
from app.models.mitre import (
MitreSubtechnique,
MitreTactic,
MitreTechnique,
MitreTechniqueTactic,
)
from app.models.setting import Setting
log = logging.getLogger("metamorph.mitre.seed")
# === Default pin =============================================================
#
# MITRE publishes versioned bundles at
# `https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/enterprise-attack/enterprise-attack-<X.Y>.json`.
# Update these three constants in lock-step when bumping the pin. The SHA256
# is verified against the downloaded bytes — a mismatch aborts the seed.
#
MITRE_VERSION = "19.0"
MITRE_DEFAULT_URL = (
"https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/"
"enterprise-attack/enterprise-attack-19.0.json"
)
MITRE_DEFAULT_SHA256 = "df520ea0775a57db7bff760145b02fed89290802913e056b7ed5970b02f3626a"
MITRE_BUNDLE_CACHE_PATH = Path(os.environ.get("MITRE_CACHE_DIR", "/data/mitre"))
MITRE_DOWNLOAD_TIMEOUT_SECONDS = 120
# Hosts authorised as a source for a MITRE sync. An admin holding `mitre.sync`
# could otherwise pivot the in-container HTTP fetch to internal services
# (169.254.169.254, db, internal mirrors). Override via the `MITRE_ALLOWED_HOSTS`
# env (comma-separated) when running against a private mirror.
MITRE_ALLOWED_HOSTS: frozenset[str] = frozenset(
h.strip()
for h in os.environ.get(
"MITRE_ALLOWED_HOSTS", "raw.githubusercontent.com"
).split(",")
if h.strip()
)
# Settings keys used to expose the seed metadata to the operator UI/CLI.
SETTING_LAST_SYNC = "mitre_last_sync"
SETTING_VERSION = "mitre_version"
SETTING_SOURCE_URL = "mitre_source_url"
ATTACK_SOURCE_NAME = "mitre-attack"
KILL_CHAIN_NAME = "mitre-attack"
class MitreSeedError(Exception):
pass
class MitreChecksumMismatch(MitreSeedError):
pass
class MitreSourceForbidden(MitreSeedError):
"""The provided source URL points to a host outside the allowlist."""
@dataclass
class ParsedBundle:
tactics: list[dict] = field(default_factory=list)
techniques: list[dict] = field(default_factory=list) # parent techniques
subtechniques: list[dict] = field(default_factory=list)
# Map: subtechnique attack-pattern STIX id -> parent technique STIX id
subtechnique_parents: dict[str, str] = field(default_factory=dict)
spec_version: str | None = None
@dataclass
class SeedResult:
tactics_upserted: int
techniques_upserted: int
subtechniques_upserted: int
subtechniques_skipped_orphan: int
technique_tactic_links: int
version: str | None
source: str
started_at: datetime
finished_at: datetime
def as_dict(self) -> dict:
return {
"tactics_upserted": self.tactics_upserted,
"techniques_upserted": self.techniques_upserted,
"subtechniques_upserted": self.subtechniques_upserted,
"subtechniques_skipped_orphan": self.subtechniques_skipped_orphan,
"technique_tactic_links": self.technique_tactic_links,
"version": self.version,
"source": self.source,
"started_at": self.started_at.isoformat(),
"finished_at": self.finished_at.isoformat(),
"duration_ms": int(
(self.finished_at - self.started_at).total_seconds() * 1000
),
}
# === I/O =====================================================================
def _is_url(source: str) -> bool:
parsed = urllib.parse.urlparse(source)
return parsed.scheme in ("http", "https")
def _ensure_host_allowed(url: str) -> None:
"""Raise MitreSourceForbidden if the URL targets a non-allowlisted host."""
parsed = urllib.parse.urlparse(url)
if parsed.scheme not in ("http", "https"):
raise MitreSourceForbidden(f"unsupported URL scheme: {parsed.scheme!r}")
host = (parsed.hostname or "").lower()
if host not in MITRE_ALLOWED_HOSTS:
raise MitreSourceForbidden(
f"host {host!r} not in MITRE_ALLOWED_HOSTS={sorted(MITRE_ALLOWED_HOSTS)}"
)
def _sha256_of(path: Path) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(1 << 16), b""):
h.update(chunk)
return h.hexdigest()
def _download(url: str, dest: Path, *, expected_sha256: str | None = None) -> Path:
_ensure_host_allowed(url)
dest.parent.mkdir(parents=True, exist_ok=True)
tmp = dest.with_suffix(dest.suffix + ".part")
log.info("metamorph.mitre.download.start", extra={"url": url, "dest": str(dest)})
req = urllib.request.Request(url, headers={"User-Agent": "metamorph-mitre-seed/1.0"})
with urllib.request.urlopen(req, timeout=MITRE_DOWNLOAD_TIMEOUT_SECONDS) as resp:
with tmp.open("wb") as f:
for chunk in iter(lambda: resp.read(1 << 16), b""):
f.write(chunk)
if expected_sha256:
actual = _sha256_of(tmp)
if actual != expected_sha256:
tmp.unlink(missing_ok=True)
raise MitreChecksumMismatch(
f"sha256 mismatch for {url}: expected {expected_sha256}, got {actual}"
)
tmp.replace(dest)
log.info(
"metamorph.mitre.download.done",
extra={"url": url, "bytes": dest.stat().st_size},
)
return dest
def resolve_source_to_path(
source: str | Path | None,
*,
cache_dir: Path = MITRE_BUNDLE_CACHE_PATH,
expected_sha256: str | None = MITRE_DEFAULT_SHA256,
) -> tuple[Path, str]:
"""Return (path, source_label). Downloads if `source` is an URL; otherwise
treats it as a local file. `None` → default URL.
`source_label` is what we persist in `settings.mitre_source_url`."""
if source is None:
source = MITRE_DEFAULT_URL
source_str = str(source)
if _is_url(source_str):
basename = source_str.rsplit("/", 1)[-1] or "enterprise-attack.json"
dest = cache_dir / basename
_download(source_str, dest, expected_sha256=expected_sha256)
return dest, source_str
path = Path(source_str)
if not path.exists():
raise MitreSeedError(f"source path does not exist: {path}")
return path, str(path)
# === STIX parsing ============================================================
def _attack_ref(obj: dict) -> dict | None:
for ref in obj.get("external_references") or ():
if ref.get("source_name") == ATTACK_SOURCE_NAME and ref.get("external_id"):
return ref
return None
def parse_bundle(path: Path) -> ParsedBundle:
"""Read the STIX bundle into normalized dicts ready for SQL upserts."""
with path.open("r", encoding="utf-8") as f:
bundle = json.load(f)
objs = bundle.get("objects") or []
parsed = ParsedBundle(spec_version=bundle.get("spec_version"))
parents_by_subtech: dict[str, str] = {}
for o in objs:
if (
o.get("type") == "relationship"
and o.get("relationship_type") == "subtechnique-of"
and not o.get("revoked")
):
parents_by_subtech[o["source_ref"]] = o["target_ref"]
parsed.subtechnique_parents = parents_by_subtech
for o in objs:
if o.get("revoked") or o.get("x_mitre_deprecated"):
continue
kind = o.get("type")
if kind == "x-mitre-tactic":
ref = _attack_ref(o)
if not ref:
continue
parsed.tactics.append(
{
"external_id": ref["external_id"],
"name": o.get("name") or "",
"short_name": o.get("x_mitre_shortname") or "",
"description": o.get("description"),
"url": ref.get("url"),
}
)
elif kind == "attack-pattern":
ref = _attack_ref(o)
if not ref:
continue
common = {
"external_id": ref["external_id"],
"name": o.get("name") or "",
"description": o.get("description"),
"url": ref.get("url"),
}
if o.get("x_mitre_is_subtechnique"):
parent_stix = parents_by_subtech.get(o["id"])
parsed.subtechniques.append(
{**common, "stix_id": o["id"], "parent_stix_id": parent_stix}
)
else:
# Capture kill_chain_phases so we can map to tactics by short_name.
phases = [
p.get("phase_name")
for p in (o.get("kill_chain_phases") or ())
if p.get("kill_chain_name") == KILL_CHAIN_NAME and p.get("phase_name")
]
parsed.techniques.append(
{**common, "stix_id": o["id"], "phase_names": phases}
)
return parsed
# === DB upserts ==============================================================
def _upsert_tactics(s, tactics: Iterable[dict]) -> tuple[dict, int]:
"""Upsert tactics. Returns (short_name → tactic_id, n_upserted)."""
existing = {t.external_id: t for t in s.scalars(select(MitreTactic)).all()}
short_to_id: dict = {}
upserted = 0
for t in tactics:
row = existing.get(t["external_id"])
if row is None:
row = MitreTactic(
external_id=t["external_id"],
short_name=t["short_name"],
name=t["name"],
description=t["description"],
url=t["url"],
)
s.add(row)
s.flush()
upserted += 1
else:
row.short_name = t["short_name"]
row.name = t["name"]
row.description = t["description"]
row.url = t["url"]
upserted += 1
short_to_id[t["short_name"]] = row.id
return short_to_id, upserted
def _upsert_techniques(
s, techniques: Iterable[dict], short_to_tactic_id: dict
) -> tuple[dict, int, int]:
"""Upsert techniques + their tactic links. Returns (stix_id→technique_id, n_upserted, n_links)."""
existing = {t.external_id: t for t in s.scalars(select(MitreTechnique)).all()}
stix_to_id: dict = {}
n_upserted = 0
n_links = 0
# We'll rebuild the technique↔tactic mapping for clarity (drop + add). This
# is O(techniques × tactics) but cheap relative to the parse itself.
s.execute(delete(MitreTechniqueTactic))
for t in techniques:
row = existing.get(t["external_id"])
if row is None:
row = MitreTechnique(
external_id=t["external_id"],
name=t["name"],
description=t["description"],
url=t["url"],
)
s.add(row)
s.flush()
else:
row.name = t["name"]
row.description = t["description"]
row.url = t["url"]
n_upserted += 1
stix_to_id[t["stix_id"]] = row.id
for phase in t.get("phase_names", []):
tac_id = short_to_tactic_id.get(phase)
if tac_id is None:
# Tactic referenced but not in bundle — log + skip.
log.warning(
"metamorph.mitre.unknown_tactic_phase",
extra={"technique": t["external_id"], "phase": phase},
)
continue
s.add(MitreTechniqueTactic(technique_id=row.id, tactic_id=tac_id))
n_links += 1
return stix_to_id, n_upserted, n_links
def _upsert_subtechniques(
s,
subtechniques: Iterable[dict],
stix_to_tech_id: dict,
) -> tuple[int, int]:
"""Returns (n_upserted, n_skipped_orphans).
`n_upserted` is the count of rows whose state was applied (INSERT or
UPDATE) — matches Postgres upsert semantics.
"""
existing = {sb.external_id: sb for sb in s.scalars(select(MitreSubtechnique)).all()}
# Pre-index techniques by external_id so the dotted-id fallback doesn't
# issue N+1 SELECTs (was a latent footgun for partial-bundle re-syncs).
parent_by_external: dict[str, object] = {
t.external_id: t.id
for t in s.scalars(select(MitreTechnique)).all()
}
n_upserted = 0
n_skipped = 0
for sb in subtechniques:
parent_stix = sb.get("parent_stix_id")
parent_id = stix_to_tech_id.get(parent_stix) if parent_stix else None
if parent_id is None:
# Fall back to the dotted external_id convention (T1003.001 → T1003).
m = re.match(r"^(T\d+)\.\d+$", sb["external_id"])
if m:
parent_id = parent_by_external.get(m.group(1))
if parent_id is None:
log.warning(
"metamorph.mitre.orphan_subtechnique",
extra={"subtechnique": sb["external_id"]},
)
n_skipped += 1
continue
row = existing.get(sb["external_id"])
if row is None:
s.add(
MitreSubtechnique(
external_id=sb["external_id"],
name=sb["name"],
description=sb["description"],
url=sb["url"],
technique_id=parent_id,
)
)
else:
row.name = sb["name"]
row.description = sb["description"]
row.url = sb["url"]
row.technique_id = parent_id
n_upserted += 1
return n_upserted, n_skipped
def _upsert_setting(s, key: str, value: object) -> None:
row = s.scalar(select(Setting).where(Setting.key == key))
if row is None:
s.add(Setting(key=key, value=value))
else:
row.value = value
# === Entry point =============================================================
def seed_mitre(
*,
source: str | Path | None = None,
expected_sha256: str | None = MITRE_DEFAULT_SHA256,
cache_dir: Path = MITRE_BUNDLE_CACHE_PATH,
allow_unverified: bool = False,
) -> SeedResult:
"""Top-level seed. URL → download + verify + parse; path → just parse.
Custom URLs (anything other than `MITRE_DEFAULT_URL`) MUST be paired with
an `expected_sha256` for integrity, or with `allow_unverified=True` to opt
out explicitly. This avoids a silent integrity bypass when an operator
points the sync at a typo'd or attacker-controlled mirror.
"""
started_at = datetime.now(tz=timezone.utc)
if source is not None and _is_url(str(source)) and str(source) != MITRE_DEFAULT_URL:
if expected_sha256 is None or expected_sha256 == MITRE_DEFAULT_SHA256:
# The caller passed a non-default URL but didn't override the hash:
# MITRE_DEFAULT_SHA256 would obviously not match → force an explicit
# decision rather than silently bypassing.
if not allow_unverified:
raise MitreSeedError(
"custom URL requires an expected_sha256 (or allow_unverified=True)"
)
expected_sha256 = None
path, source_label = resolve_source_to_path(
source, cache_dir=cache_dir, expected_sha256=expected_sha256
)
parsed = parse_bundle(path)
log.info(
"metamorph.mitre.parsed",
extra={
"tactics": len(parsed.tactics),
"techniques": len(parsed.techniques),
"subtechniques": len(parsed.subtechniques),
"spec_version": parsed.spec_version,
},
)
with session_scope() as s:
# Serialize concurrent /mitre/sync calls. The lock is transaction-scoped
# (released automatically at COMMIT/ROLLBACK), so a second sync arriving
# while the first is mid-DELETE+INSERT of `mitre_technique_tactics`
# blocks until the first commits. Avoids the unique-constraint race the
# code-reviewer flagged. hashtext() is stable across sessions.
s.execute(sql_text("SELECT pg_advisory_xact_lock(hashtext('mitre.seed'))"))
short_to_tactic_id, n_tactics = _upsert_tactics(s, parsed.tactics)
stix_to_tech_id, n_techs, n_links = _upsert_techniques(
s, parsed.techniques, short_to_tactic_id
)
n_subs, n_orphan = _upsert_subtechniques(s, parsed.subtechniques, stix_to_tech_id)
finished_at = datetime.now(tz=timezone.utc)
_upsert_setting(s, SETTING_LAST_SYNC, finished_at.isoformat())
# `version` reflects the known pin only when seeded from MITRE_DEFAULT_URL;
# otherwise we explicitly clear it so /mitre/status doesn't lie about a
# stale version after a custom-URL re-sync.
version = MITRE_VERSION if source_label == MITRE_DEFAULT_URL else None
_upsert_setting(s, SETTING_VERSION, version)
_upsert_setting(s, SETTING_SOURCE_URL, source_label)
result = SeedResult(
tactics_upserted=n_tactics,
techniques_upserted=n_techs,
subtechniques_upserted=n_subs,
subtechniques_skipped_orphan=n_orphan,
technique_tactic_links=n_links,
version=version,
source=source_label,
started_at=started_at,
finished_at=finished_at,
)
log.info("metamorph.mitre.seed_completed", extra=result.as_dict())
return result
def read_status() -> dict:
"""Return the persisted seed metadata for `GET /mitre/status`."""
keys = {SETTING_LAST_SYNC, SETTING_VERSION, SETTING_SOURCE_URL}
out = {k: None for k in keys}
with session_scope() as s:
for row in s.scalars(select(Setting).where(Setting.key.in_(keys))).all():
out[row.key] = row.value
return {
"last_sync": out[SETTING_LAST_SYNC],
"version": out[SETTING_VERSION],
"source_url": out[SETTING_SOURCE_URL],
"default_url": MITRE_DEFAULT_URL,
"default_version": MITRE_VERSION,
}

453
backend/tests/test_mitre.py Normal file
View File

@@ -0,0 +1,453 @@
"""Integration tests for M4: STIX parser + seed + /mitre/* endpoints.
Uses a minimal hand-crafted STIX bundle (no network) so the parser logic and
the upsert semantics can be exercised deterministically.
"""
from __future__ import annotations
import json
import secrets
from pathlib import Path
import pytest
from sqlalchemy import text
from app.core.install_token import regenerate_install_token
from app.main import create_app
from app.services import mitre_seed as mitre_svc
def _truncate_all(engine):
with engine.begin() as conn:
conn.execute(
text(
"TRUNCATE users, refresh_tokens, invitations, invitation_groups, "
"user_groups, group_permissions, permissions, settings, groups, "
"mitre_subtechniques, mitre_technique_tactics, mitre_techniques, "
"mitre_tactics RESTART IDENTITY CASCADE"
)
)
@pytest.fixture(scope="module")
def app(db_engine_or_skip):
_truncate_all(db_engine_or_skip)
flask_app = create_app()
flask_app.config.update(TESTING=True)
return flask_app
@pytest.fixture()
def client(app):
return app.test_client()
def _unique_email(prefix: str) -> str:
return f"{prefix}-{secrets.token_hex(4)}@metamorph.local"
@pytest.fixture(scope="module")
def admin_credentials(app, db_engine_or_skip):
"""Bootstrap a fresh admin once for the whole module."""
token = regenerate_install_token()
email = _unique_email("admin")
password = "AdminPass1234!"
with app.test_client() as c:
r = c.post(
"/api/v1/setup",
json={"install_token": token, "email": email, "password": password},
)
assert r.status_code == 201, r.get_data(as_text=True)
return {"email": email, "password": password, "user_id": r.get_json()["user_id"]}
def _login(client, email: str, password: str) -> str:
r = client.post("/api/v1/auth/login", json={"email": email, "password": password})
assert r.status_code == 200, r.get_data(as_text=True)
return r.get_json()["access_token"]
# === Fixture STIX bundle =====================================================
MINIMAL_BUNDLE = {
"type": "bundle",
"id": "bundle--00000000-0000-0000-0000-000000000001",
"spec_version": "2.1",
"objects": [
# Tactic 1 — kept
{
"type": "x-mitre-tactic",
"id": "x-mitre-tactic--ta0001",
"name": "Initial Access",
"description": "Get a foothold.",
"x_mitre_shortname": "initial-access",
"external_references": [
{
"source_name": "mitre-attack",
"external_id": "TA0001",
"url": "https://attack.mitre.org/tactics/TA0001/",
}
],
},
# Tactic 2 — kept
{
"type": "x-mitre-tactic",
"id": "x-mitre-tactic--ta0002",
"name": "Execution",
"description": "Run code.",
"x_mitre_shortname": "execution",
"external_references": [
{
"source_name": "mitre-attack",
"external_id": "TA0002",
"url": "https://attack.mitre.org/tactics/TA0002/",
}
],
},
# Revoked tactic — must be skipped
{
"type": "x-mitre-tactic",
"id": "x-mitre-tactic--ta0099",
"name": "Doomed",
"x_mitre_shortname": "doomed",
"revoked": True,
"external_references": [
{"source_name": "mitre-attack", "external_id": "TA0099"}
],
},
# Technique T1059 covers both tactics
{
"type": "attack-pattern",
"id": "attack-pattern--t1059",
"name": "Command and Scripting Interpreter",
"description": "Use shells.",
"kill_chain_phases": [
{"kill_chain_name": "mitre-attack", "phase_name": "initial-access"},
{"kill_chain_name": "mitre-attack", "phase_name": "execution"},
],
"external_references": [
{
"source_name": "mitre-attack",
"external_id": "T1059",
"url": "https://attack.mitre.org/techniques/T1059/",
}
],
},
# Technique T1078 only initial-access
{
"type": "attack-pattern",
"id": "attack-pattern--t1078",
"name": "Valid Accounts",
"description": "Use legit creds.",
"kill_chain_phases": [
{"kill_chain_name": "mitre-attack", "phase_name": "initial-access"},
],
"external_references": [
{
"source_name": "mitre-attack",
"external_id": "T1078",
"url": "https://attack.mitre.org/techniques/T1078/",
}
],
},
# Deprecated technique — skipped
{
"type": "attack-pattern",
"id": "attack-pattern--t1190",
"name": "Exploit Public-Facing Application",
"x_mitre_deprecated": True,
"external_references": [
{"source_name": "mitre-attack", "external_id": "T1190"}
],
},
# Sub-technique of T1059
{
"type": "attack-pattern",
"id": "attack-pattern--t1059-001",
"name": "PowerShell",
"description": "Windows shell.",
"x_mitre_is_subtechnique": True,
"external_references": [
{
"source_name": "mitre-attack",
"external_id": "T1059.001",
"url": "https://attack.mitre.org/techniques/T1059/001/",
}
],
},
# Relationship attaching the sub to its parent
{
"type": "relationship",
"id": "relationship--rel1",
"relationship_type": "subtechnique-of",
"source_ref": "attack-pattern--t1059-001",
"target_ref": "attack-pattern--t1059",
},
],
}
@pytest.fixture()
def fixture_bundle_path(tmp_path: Path) -> Path:
path = tmp_path / "minimal-stix.json"
path.write_text(json.dumps(MINIMAL_BUNDLE))
return path
# === Parser unit tests =======================================================
def test_parser_extracts_active_objects(fixture_bundle_path):
parsed = mitre_svc.parse_bundle(fixture_bundle_path)
assert len(parsed.tactics) == 2 # TA0001 + TA0002 (TA0099 revoked)
assert {t["external_id"] for t in parsed.tactics} == {"TA0001", "TA0002"}
assert len(parsed.techniques) == 2 # T1059 + T1078 (T1190 deprecated)
assert {t["external_id"] for t in parsed.techniques} == {"T1059", "T1078"}
assert len(parsed.subtechniques) == 1
sb = parsed.subtechniques[0]
assert sb["external_id"] == "T1059.001"
assert sb["parent_stix_id"] == "attack-pattern--t1059"
# === Seed integration tests ==================================================
def test_seed_against_fixture(app, fixture_bundle_path):
result = mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
assert result.tactics_upserted == 2
assert result.techniques_upserted == 2
assert result.subtechniques_upserted == 1
assert result.subtechniques_skipped_orphan == 0
assert result.technique_tactic_links == 3 # T1059→TA0001, T1059→TA0002, T1078→TA0001
def test_seed_is_idempotent(app, fixture_bundle_path):
"""Running twice yields the same row counts and no SQL errors."""
first = mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
second = mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
assert (first.tactics_upserted, first.techniques_upserted, first.subtechniques_upserted) == (
second.tactics_upserted,
second.techniques_upserted,
second.subtechniques_upserted,
)
def test_seed_persists_setting(app, fixture_bundle_path):
"""settings table records the last sync timestamp + source URL."""
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
status = mitre_svc.read_status()
assert status["last_sync"] is not None
# We seeded from a local path so version is None and source_url is the path string.
assert status["source_url"] == str(fixture_bundle_path)
assert status["version"] is None # only set when source == MITRE_DEFAULT_URL
def test_checksum_mismatch_aborts(tmp_path, monkeypatch):
"""A wrong sha256 triggers MitreChecksumMismatch and skips DB writes.
We monkey-patch the allowlist to accept `file://` for the duration of the
test — file:// is rejected in production by `_ensure_host_allowed` (cf.
`test_seed_refuses_file_url`), but we need to drive `_download` past that
gate to exercise the sha256 path.
"""
path = tmp_path / "tiny.json"
path.write_text(json.dumps(MINIMAL_BUNDLE))
monkeypatch.setattr(mitre_svc, "_ensure_host_allowed", lambda _: None)
bogus = "0" * 64
with pytest.raises(mitre_svc.MitreChecksumMismatch):
mitre_svc._download(
f"file://{path}", tmp_path / "out.json", expected_sha256=bogus
)
# === API endpoint tests ======================================================
def test_list_tactics_requires_auth(app, fixture_bundle_path):
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
r = c.get("/api/v1/mitre/tactics")
assert r.status_code == 401
def test_list_tactics_returns_seeded(app, admin_credentials, fixture_bundle_path):
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
access = _login(c, admin_credentials["email"], admin_credentials["password"])
r = c.get(
"/api/v1/mitre/tactics", headers={"Authorization": f"Bearer {access}"}
)
assert r.status_code == 200
body = r.get_json()
assert body["total"] == 2
ids = [t["external_id"] for t in body["items"]]
assert "TA0001" in ids and "TA0002" in ids
def test_filter_techniques_by_tactic(app, admin_credentials, fixture_bundle_path):
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
access = _login(c, admin_credentials["email"], admin_credentials["password"])
r = c.get(
"/api/v1/mitre/techniques?tactic=TA0002",
headers={"Authorization": f"Bearer {access}"},
)
assert r.status_code == 200
body = r.get_json()
# Only T1059 covers TA0002 (execution); T1078 covers initial-access only.
ext_ids = [t["external_id"] for t in body["items"]]
assert ext_ids == ["T1059"]
def test_subtechniques_listed_under_parent(app, admin_credentials, fixture_bundle_path):
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
access = _login(c, admin_credentials["email"], admin_credentials["password"])
r = c.get(
"/api/v1/mitre/subtechniques?technique=T1059",
headers={"Authorization": f"Bearer {access}"},
)
assert r.status_code == 200
body = r.get_json()
ext_ids = [t["external_id"] for t in body["items"]]
assert ext_ids == ["T1059.001"]
def test_status_endpoint(app, admin_credentials, fixture_bundle_path):
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
access = _login(c, admin_credentials["email"], admin_credentials["password"])
r = c.get("/api/v1/mitre/status", headers={"Authorization": f"Bearer {access}"})
assert r.status_code == 200
body = r.get_json()
assert body["last_sync"] is not None
assert body["default_url"].startswith("https://")
assert body["default_version"]
def test_sync_endpoint_requires_perm(app, admin_credentials, fixture_bundle_path):
"""A non-admin without mitre.sync gets 403."""
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
# Bootstrap a no-perm user via invitation.
admin_access = _login(c, admin_credentials["email"], admin_credentials["password"])
eve_email = _unique_email("eve")
inv = c.post(
"/api/v1/invitations",
headers={"Authorization": f"Bearer {admin_access}"},
json={"email_hint": eve_email},
)
token = inv.get_json()["token"]
c.post(
f"/api/v1/invitations/accept/{token}",
json={"email": eve_email, "password": "EvePass1234!"},
)
eve_access = _login(c, eve_email, "EvePass1234!")
r = c.post(
"/api/v1/mitre/sync", headers={"Authorization": f"Bearer {eve_access}"}
)
assert r.status_code == 403
def test_search_filter_on_name(app, admin_credentials, fixture_bundle_path):
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
access = _login(c, admin_credentials["email"], admin_credentials["password"])
r = c.get(
"/api/v1/mitre/techniques?q=valid",
headers={"Authorization": f"Bearer {access}"},
)
assert r.status_code == 200
ext_ids = [t["external_id"] for t in r.get_json()["items"]]
assert ext_ids == ["T1078"]
def test_matrix_endpoint_returns_nested_grid(app, admin_credentials, fixture_bundle_path):
"""GET /mitre/matrix returns the flat tactic→technique→subtechnique grid."""
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
access = _login(c, admin_credentials["email"], admin_credentials["password"])
r = c.get("/api/v1/mitre/matrix", headers={"Authorization": f"Bearer {access}"})
assert r.status_code == 200
body = r.get_json()
tactics = body["tactics"]
assert {t["external_id"] for t in tactics} == {"TA0001", "TA0002"}
# TA0001 has T1059 (multi-tactic) + T1078; T1059 carries its sub.
ta0001 = next(t for t in tactics if t["external_id"] == "TA0001")
techs = {t["external_id"]: t for t in ta0001["techniques"]}
assert set(techs.keys()) == {"T1059", "T1078"}
assert techs["T1059"]["subtechniques"][0]["external_id"] == "T1059.001"
assert techs["T1078"]["subtechniques"] == []
# TA0002 only carries T1059 (no T1078).
ta0002 = next(t for t in tactics if t["external_id"] == "TA0002")
assert [t["external_id"] for t in ta0002["techniques"]] == ["T1059"]
def test_matrix_endpoint_requires_auth(app, fixture_bundle_path):
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
with app.test_client() as c:
assert c.get("/api/v1/mitre/matrix").status_code == 401
# === Security guards ==========================================================
def test_seed_refuses_file_url(tmp_path):
"""file:// (or any scheme outside the allowlist) is rejected — protects
against a privileged operator pivoting the in-container fetch to local
filesystem reads via the URL path."""
path = tmp_path / "bundle.json"
path.write_text(json.dumps(MINIMAL_BUNDLE))
with pytest.raises(mitre_svc.MitreSourceForbidden):
mitre_svc._download(f"file://{path}", tmp_path / "out.json")
def test_seed_refuses_disallowed_https_host(tmp_path):
"""An HTTPS URL outside MITRE_ALLOWED_HOSTS is rejected before any I/O.
Closes the SSRF surface (cloud metadata, internal mirrors)."""
with pytest.raises(mitre_svc.MitreSourceForbidden):
mitre_svc._download("https://attacker.example/bundle.json", tmp_path / "out.json")
def test_seed_refuses_custom_url_without_sha(tmp_path):
"""End-to-end refusal: even an allowlisted custom URL needs a sha or an
explicit allow_unverified=True."""
# Use the default URL with a different sha to simulate "custom" semantics
# without actually hitting the network: pass a different MITRE_DEFAULT_URL.
# The cleanest expression is to call seed_mitre with the same URL but no sha
# — but the default URL gets the default sha auto-set; we need to bypass.
with pytest.raises(mitre_svc.MitreSeedError):
mitre_svc.seed_mitre(
source="https://raw.githubusercontent.com/some-other-path/bundle.json",
expected_sha256=None,
allow_unverified=False,
)
def test_dotted_id_fallback_resolves_orphan_subtechnique(app, tmp_path):
"""When the STIX `subtechnique-of` relationship is missing, the parser
must fall back to the dotted convention (T1003.001 → T1003)."""
bundle = json.loads(json.dumps(MINIMAL_BUNDLE)) # deep copy
# Strip the relationship object so the parent_stix_id lookup fails.
bundle["objects"] = [o for o in bundle["objects"] if o.get("type") != "relationship"]
bundle_path = tmp_path / "no-rel.json"
bundle_path.write_text(json.dumps(bundle))
result = mitre_svc.seed_mitre(source=bundle_path, expected_sha256=None)
# The fallback resolves T1059.001 → T1059 via the dotted-id pattern,
# so the subtechnique is still attached (no orphan).
assert result.subtechniques_upserted == 1
assert result.subtechniques_skipped_orphan == 0
def test_seed_clears_version_when_source_is_not_default(app, fixture_bundle_path):
"""A custom source must NULL `mitre_version` so /mitre/status doesn't lie
about a stale upstream pin."""
# First seed from the default URL would set version=19.0; here we seed from
# a local file path, which should write version=None.
mitre_svc.seed_mitre(source=fixture_bundle_path, expected_sha256=None)
assert mitre_svc.read_status()["version"] is None

View File

@@ -38,6 +38,7 @@ services:
EVIDENCE_DIR: ${EVIDENCE_DIR}
volumes:
- metamorph_evidence:/data/evidence
- metamorph_mitre:/data/mitre
depends_on:
db:
condition: service_healthy
@@ -76,6 +77,7 @@ services:
volumes:
metamorph_db:
metamorph_evidence:
metamorph_mitre:
networks:
metamorph:

178
e2e/tests/m4-mitre.spec.ts Normal file
View File

@@ -0,0 +1,178 @@
import { expect, test, type APIRequestContext, type Page } from '@playwright/test';
/**
* M4 — MITRE ATT&CK Enterprise reference catalogue + tag picker.
*
* The seed itself (download + parse) is exercised by pytest with a small
* fixture bundle. This spec hits the live stack with the real, pinned bundle
* by calling `POST /mitre/sync` once and then validating the read endpoints
* + the picker UI.
*/
// crypto.randomUUID() guarantees uniqueness across parallel test runs; the
// Math.random() previous pattern could collide one-in-a-million in CI.
const ADMIN_EMAIL = `admin-${crypto.randomUUID().slice(0, 8)}@metamorph.local`;
const ADMIN_PASSWORD = 'AdminPass1234!';
async function resetAndMintToken(request: APIRequestContext): Promise<string> {
const r = await request.post('/api/v1/diag/reset');
expect(r.status()).toBe(200);
return (await r.json()).install_token as string;
}
async function loginAndGetAccess(
request: APIRequestContext,
email: string,
password: string,
): Promise<string> {
const r = await request.post('/api/v1/auth/login', { data: { email, password } });
expect(r.status()).toBe(200);
return (await r.json()).access_token as string;
}
async function loginViaSpa(page: Page, email: string, password: string) {
await page.goto('/login');
await page.getByLabel(/email/i).fill(email);
await page.getByLabel(/password/i).fill(password);
await page.getByRole('button', { name: /sign in/i }).click();
await expect(page.getByTestId('me-email')).toHaveText(email);
}
test.describe.configure({ mode: 'serial' });
test.describe('M4 — MITRE ATT&CK reference', () => {
test.beforeAll(async ({ request }) => {
const installToken = await resetAndMintToken(request);
const setup = await request.post('/api/v1/setup', {
data: { install_token: installToken, email: ADMIN_EMAIL, password: ADMIN_PASSWORD },
});
expect(setup.status()).toBe(201);
// Trigger a real sync against the pinned MITRE URL. Idempotent — if the
// mitre_* tables were left populated by a previous run, this is a no-op
// upsert.
const access = await loginAndGetAccess(request, ADMIN_EMAIL, ADMIN_PASSWORD);
const sync = await request.post('/api/v1/mitre/sync', {
headers: { Authorization: `Bearer ${access}` },
});
expect(sync.status(), `mitre sync failed: ${await sync.text()}`).toBe(200);
const result = await sync.json();
// Pinned exactly to MITRE Enterprise v19.0 — bump alongside MITRE_VERSION
// in `app/services/mitre_seed.py` when the pin changes. Exact counts catch
// parser regressions that would silently include revoked/deprecated rows.
expect(result.tactics_upserted).toBe(15);
expect(result.techniques_upserted).toBe(222);
expect(result.subtechniques_upserted).toBe(475);
expect(result.subtechniques_skipped_orphan).toBe(0);
});
test('GET /mitre/tactics returns 14+ Enterprise tactics', async ({ request }) => {
const access = await loginAndGetAccess(request, ADMIN_EMAIL, ADMIN_PASSWORD);
const r = await request.get('/api/v1/mitre/tactics', {
headers: { Authorization: `Bearer ${access}` },
});
expect(r.status()).toBe(200);
const body = await r.json();
expect(body.total).toBeGreaterThanOrEqual(14);
const ids = body.items.map((t: { external_id: string }) => t.external_id);
expect(ids).toContain('TA0001'); // Initial Access
expect(ids).toContain('TA0006'); // Credential Access
});
test('GET /mitre/techniques?tactic=TA0006 filters', async ({ request }) => {
const access = await loginAndGetAccess(request, ADMIN_EMAIL, ADMIN_PASSWORD);
const r = await request.get('/api/v1/mitre/techniques?tactic=TA0006&limit=200', {
headers: { Authorization: `Bearer ${access}` },
});
expect(r.status()).toBe(200);
const body = await r.json();
expect(body.total).toBeGreaterThan(0);
// OS Credential Dumping is the textbook TA0006 example.
const ids = body.items.map((t: { external_id: string }) => t.external_id);
expect(ids).toContain('T1003');
});
test('GET /mitre/subtechniques?technique=T1003 lists 8 sub-techniques', async ({ request }) => {
const access = await loginAndGetAccess(request, ADMIN_EMAIL, ADMIN_PASSWORD);
const r = await request.get('/api/v1/mitre/subtechniques?technique=T1003', {
headers: { Authorization: `Bearer ${access}` },
});
expect(r.status()).toBe(200);
const ids = (await r.json()).items.map((t: { external_id: string }) => t.external_id);
expect(ids).toContain('T1003.001'); // LSASS Memory
expect(ids.length).toBeGreaterThanOrEqual(5);
});
test('GET /mitre/status returns version + last_sync', async ({ request }) => {
const access = await loginAndGetAccess(request, ADMIN_EMAIL, ADMIN_PASSWORD);
const r = await request.get('/api/v1/mitre/status', {
headers: { Authorization: `Bearer ${access}` },
});
expect(r.status()).toBe(200);
const body = await r.json();
expect(body.last_sync).toBeTruthy();
expect(body.default_url).toContain('mitre-attack');
expect(body.default_version).toBeTruthy();
});
test('SPA MITRE matrix renders + click cells to select technique + sub-technique', async ({ page }) => {
await loginViaSpa(page, ADMIN_EMAIL, ADMIN_PASSWORD);
await page.goto('/mitre');
// Status card shows a non-null last_sync.
await expect(page.getByTestId('mitre-last-sync')).not.toHaveText('never');
const picker = page.getByTestId('mitre-tag-picker');
await expect(picker).toBeVisible();
// The matrix has a column per tactic.
await expect(picker.getByTestId('mitre-column-TA0006')).toBeVisible();
// 1. Click the T1003 cell (Credential Dumping under TA0006) → technique selected.
const t1003 = picker.getByTestId('mitre-technique-T1003').first();
await t1003.scrollIntoViewIfNeeded();
await t1003.click();
await expect(page.getByTestId('mitre-selected')).toContainText('T1003');
await expect(t1003).toHaveAttribute('aria-pressed', 'true');
// 2. Expand T1003's sub-techniques inline via the +N chevron.
await picker.getByTestId('mitre-expand-T1003').first().click();
const sub = picker.getByTestId('mitre-subtechnique-T1003.001').first();
await expect(sub).toBeVisible();
// 3. Click the sub-technique → chip + JSON preview both update.
await sub.click();
await expect(page.getByTestId('mitre-selected')).toContainText('T1003.001');
await expect(page.getByTestId('mitre-selected-json')).toContainText('"T1003.001"');
// 4. Filter the matrix on "valid" → TA0006/T1003 are hidden but TA0001/T1078 visible.
await picker.getByLabel(/^filter$/i).fill('valid');
await expect(picker.getByTestId('mitre-technique-T1078').first()).toBeVisible();
});
test('Non-admin cannot trigger /mitre/sync', async ({ page, request }) => {
// Invite a no-perm user via the admin.
const adminAccess = await loginAndGetAccess(request, ADMIN_EMAIL, ADMIN_PASSWORD);
const eveEmail = `eve-${crypto.randomUUID().slice(0, 8)}@metamorph.local`;
const inv = await request.post('/api/v1/invitations', {
headers: { Authorization: `Bearer ${adminAccess}` },
data: { email_hint: eveEmail },
});
const token = (await inv.json()).token;
await request.post(`/api/v1/invitations/accept/${token}`, {
data: { email: eveEmail, password: 'EvePass1234!' },
});
const eveAccess = await loginAndGetAccess(request, eveEmail, 'EvePass1234!');
const r = await request.post('/api/v1/mitre/sync', {
headers: { Authorization: `Bearer ${eveAccess}` },
});
expect(r.status()).toBe(403);
// The MITRE page is reachable in read-only mode for any logged-in user,
// but the Sync card is hidden for non-admins.
await loginViaSpa(page, eveEmail, 'EvePass1234!');
await page.goto('/mitre');
await expect(page.getByTestId('mitre-tag-picker')).toBeVisible();
await expect(page.getByTestId('mitre-sync')).toHaveCount(0);
});
});

View File

@@ -8,6 +8,7 @@ import { AdminGroupsPage } from '@/pages/AdminGroupsPage';
import { AdminInvitationsPage } from '@/pages/AdminInvitationsPage';
import { AdminUsersPage } from '@/pages/AdminUsersPage';
import { HomePage } from '@/pages/HomePage';
import { MitrePage } from '@/pages/MitrePage';
import { LoginPage } from '@/pages/LoginPage';
import { ProfilePage } from '@/pages/ProfilePage';
import { RegisterPage } from '@/pages/RegisterPage';
@@ -49,6 +50,14 @@ function App() {
</RequireAuth>
}
/>
<Route
path="/mitre"
element={
<RequireAuth>
<MitrePage />
</RequireAuth>
}
/>
<Route
path="/admin/users"
element={

View File

@@ -36,6 +36,7 @@ export function Layout() {
<>
{navItem('/', 'Home')}
{navItem('/profile', 'Profile')}
{navItem('/mitre', 'MITRE')}
{state.user.is_admin && (
<>
{navItem('/admin/users', 'Users')}
@@ -68,7 +69,7 @@ export function Layout() {
<Outlet />
<footer className="mt-[60px] py-8 border-t border-border text-center font-mono text-xs text-text-dim">
metamorph · M0 bootstrap · M1 db schema · M2 auth · M3 rbac · design system from tasks/design.md
metamorph · M0 bootstrap · M1 db schema · M2 auth · M3 rbac · M4 mitre · design system from tasks/design.md
</footer>
</div>
</div>

View File

@@ -0,0 +1,286 @@
import { useQuery } from '@tanstack/react-query';
import { useMemo, useState } from 'react';
import { Alert } from '@/components/ui/Alert';
import { Tag } from '@/components/ui/Tag';
import { TextField } from '@/components/ui/TextField';
import { apiGet } from '@/lib/api';
import { cn } from '@/lib/cn';
import {
mitreKeys,
type MatrixTechnique,
type MitreMatrix,
type MitreTag,
} from '@/lib/mitre';
interface MitreTagPickerProps {
/** Already-selected tags. The parent owns the state. */
value: MitreTag[];
/** Replace-style change handler — called with the new full selection. */
onChange: (next: MitreTag[]) => void;
className?: string;
}
function useMatrix() {
return useQuery({
queryKey: mitreKeys.matrix,
queryFn: () => apiGet<MitreMatrix>('/mitre/matrix'),
staleTime: 5 * 60_000,
});
}
/**
* Flat ATT&CK matrix in the attack.mitre.org/# style — 15 columns share the
* available width (no horizontal scroll on standard desktop). Cells show the
* technique NAME only; the external_id surfaces in the chips at the top and in
* the hover tooltip. A `▸/▾` chevron beside a technique expands its
* sub-techniques inline within the column.
*/
export function MitreTagPicker({ value, onChange, className }: MitreTagPickerProps) {
const matrix = useMatrix();
const [filter, setFilter] = useState('');
const [expanded, setExpanded] = useState<Set<string>>(new Set());
const filterNorm = filter.trim().toLowerCase();
// O(1) selection lookup.
const selectedKeys = useMemo(
() => new Set(value.map((t) => `${t.kind}:${t.external_id}`)),
[value],
);
function isSelected(kind: MitreTag['kind'], external_id: string): boolean {
return selectedKeys.has(`${kind}:${external_id}`);
}
function toggle(tag: MitreTag) {
const key = `${tag.kind}:${tag.external_id}`;
if (selectedKeys.has(key)) {
onChange(value.filter((t) => `${t.kind}:${t.external_id}` !== key));
} else {
onChange([...value, tag]);
}
}
function toggleExpand(techExtId: string) {
setExpanded((prev) => {
const next = new Set(prev);
if (next.has(techExtId)) next.delete(techExtId);
else next.add(techExtId);
return next;
});
}
function matches(t: MatrixTechnique): boolean {
if (!filterNorm) return true;
if (t.external_id.toLowerCase().includes(filterNorm)) return true;
if (t.name.toLowerCase().includes(filterNorm)) return true;
return t.subtechniques.some(
(sb) =>
sb.external_id.toLowerCase().includes(filterNorm) ||
sb.name.toLowerCase().includes(filterNorm),
);
}
return (
<div className={cn('rounded-lg border border-border bg-bg-card p-4', className)} data-testid="mitre-tag-picker">
{/* Selection chips */}
{value.length > 0 && (
<div className="mb-3 flex flex-wrap items-center gap-1" data-testid="mitre-selected">
{value.map((t) => (
<button
key={`${t.kind}:${t.external_id}`}
type="button"
onClick={() => toggle(t)}
className="inline-flex items-center"
aria-label={`Remove ${t.external_id}`}
>
<Tag accent={t.kind === 'tactic' ? 'cyan' : t.kind === 'technique' ? 'orange' : 'purple'}>
{t.external_id} · {t.name}
</Tag>
</button>
))}
</div>
)}
{/* Filter + counts */}
<div className="mb-3 flex items-end justify-between gap-3">
<TextField
label="Filter"
placeholder="external_id or name (e.g. TA0006, T1003, powershell)"
value={filter}
onChange={(e) => setFilter(e.target.value)}
className="max-w-md"
/>
{matrix.data && (
<span className="font-sans text-xs text-text-dim mb-2 whitespace-nowrap">
{matrix.data.tactics.length} tactics ·{' '}
{matrix.data.tactics.reduce((sum, t) => sum + t.techniques.length, 0)} mappings
</span>
)}
</div>
{matrix.isLoading && <p className="font-mono text-xs text-text-dim">Loading matrix</p>}
{matrix.isError && (
<Alert accent="red">Failed to load /mitre/matrix has `make seed-mitre` been run?</Alert>
)}
{matrix.data && (
<div
data-testid="mitre-matrix-scroll"
role="region"
aria-label="MITRE ATT&CK matrix"
/* `minmax(7rem, 1fr)` ensures every cell is wide enough for the
* longest single word in MITRE names (no mid-word breaks), and
* stretches to fill the container otherwise. Horizontal scroll only
* kicks in on narrow viewports below ~1680px. */
className="grid gap-px bg-border rounded overflow-x-auto"
style={{
gridTemplateColumns: `repeat(${matrix.data.tactics.length}, minmax(7rem, 1fr))`,
}}
>
{matrix.data.tactics.map((tactic) => {
const visible = tactic.techniques.filter(matches);
const tacticSel = isSelected('tactic', tactic.external_id);
return (
<div
key={tactic.id}
className="bg-bg-base flex flex-col min-w-0"
data-testid={`mitre-column-${tactic.external_id}`}
>
{/* Tactic header — name only (attack.mitre.org style) */}
<button
type="button"
onClick={() =>
toggle({
kind: 'tactic',
id: tactic.id,
external_id: tactic.external_id,
name: tactic.name,
})
}
className={cn(
'w-full text-left px-2 py-1.5 font-sans border-b transition',
tacticSel
? 'accent-fill-cyan border-cyan text-text-bright'
: 'bg-bg-card border-border hover:bg-cyan/10 text-text-bright',
)}
title={`${tactic.external_id}${tactic.name}`}
data-testid={`mitre-tactic-${tactic.external_id}`}
aria-pressed={tacticSel}
>
<div className="text-xs font-semibold leading-tight break-normal hyphens-none">
{tactic.name}
</div>
<div className="text-[10px] text-text-dim mt-0.5">
{tactic.techniques.length} technique{tactic.techniques.length === 1 ? '' : 's'}
</div>
</button>
{/* Techniques cells */}
<div className="flex flex-col">
{visible.length === 0 && filterNorm && (
<div className="px-2 py-1 font-sans text-[10px] text-text-dim italic">
(filtered out)
</div>
)}
{visible.map((tech) => {
const techSel = isSelected('technique', tech.external_id);
const isExpanded = expanded.has(tech.external_id);
const hasSubs = tech.subtechniques.length > 0;
return (
<div key={tech.id} className="border-b border-border last:border-b-0">
<div
className={cn(
'flex items-stretch text-xs',
techSel
? 'accent-fill-orange text-text-bright'
: 'hover:bg-orange/10',
)}
>
<button
type="button"
onClick={() =>
toggle({
kind: 'technique',
id: tech.id,
external_id: tech.external_id,
name: tech.name,
})
}
className="flex-1 min-w-0 text-left px-2 py-1.5 font-sans break-normal hyphens-none"
title={`${tech.external_id}${tech.name}`}
data-testid={`mitre-technique-${tech.external_id}`}
aria-pressed={techSel}
>
<span className="leading-tight">{tech.name}</span>
</button>
{hasSubs && (
<button
type="button"
onClick={() => toggleExpand(tech.external_id)}
className={cn(
'shrink-0 px-1 border-l text-[10px] font-mono leading-none',
techSel
? 'border-text-bright/30 text-text-bright hover:bg-text-bright/10'
: 'border-border text-purple hover:bg-purple/10',
)}
aria-label={`${isExpanded ? 'Collapse' : 'Expand'} ${tech.external_id} sub-techniques`}
aria-expanded={isExpanded}
data-testid={`mitre-expand-${tech.external_id}`}
title={`${tech.subtechniques.length} sub-technique${tech.subtechniques.length === 1 ? '' : 's'}`}
>
<span className="text-purple">{isExpanded ? '▾' : '▸'}</span>
<span className="ml-0.5 align-middle">{tech.subtechniques.length}</span>
</button>
)}
</div>
{isExpanded && hasSubs && (
<div className="bg-bg-card">
{tech.subtechniques.map((sb) => {
const subSel = isSelected('subtechnique', sb.external_id);
return (
<button
key={sb.id}
type="button"
onClick={() =>
toggle({
kind: 'subtechnique',
id: sb.id,
external_id: sb.external_id,
name: sb.name,
})
}
className={cn(
'block w-full text-left pl-5 pr-2 py-1 font-sans text-xs border-l-2 break-normal hyphens-none',
subSel
? 'accent-fill-purple border-purple text-text-bright'
: 'border-purple/30 hover:bg-purple/10',
)}
title={`${sb.external_id}${sb.name}`}
data-testid={`mitre-subtechnique-${sb.external_id}`}
aria-pressed={subSel}
>
<span className="leading-tight">{sb.name}</span>
</button>
);
})}
</div>
)}
</div>
);
})}
</div>
</div>
);
})}
</div>
)}
<p className="mt-3 font-sans text-[11px] text-text-dim">
Hover a cell for its <code className="font-mono text-purple">external_id</code>. Click a cell to toggle selection. Use <span className="text-purple font-mono"></span> to reveal sub-techniques inline. Click a chip above to remove.
</p>
</div>
);
}

105
frontend/src/lib/mitre.ts Normal file
View File

@@ -0,0 +1,105 @@
/** Shared types + query keys for MITRE ATT&CK browsing. */
export interface MitreTactic {
id: string;
external_id: string;
short_name: string;
name: string;
description: string | null;
url: string | null;
}
export interface MitreTechnique {
id: string;
external_id: string;
name: string;
description: string | null;
url: string | null;
tactics: Array<{ external_id: string; name: string }>;
}
export interface MitreSubtechnique {
id: string;
external_id: string;
name: string;
description: string | null;
url: string | null;
technique_id: string;
}
export interface Paginated<T> {
items: T[];
total: number;
limit: number;
offset: number;
}
export interface MitreStatus {
last_sync: string | null;
version: string | null;
source_url: string | null;
default_url: string;
default_version: string;
}
export type MitreTagKind = 'tactic' | 'technique' | 'subtechnique';
export interface MitreTag {
kind: MitreTagKind;
id: string;
external_id: string;
name: string;
}
// Query keys. `status` + `matrix` drive the M4 picker; the per-list factories
// (`tactics`/`techniques`/`subtechniques`) are unused today but the M5
// template forms will consume them for the standalone REST endpoints when
// users edit a single test's tags inline.
export const mitreKeys = {
status: ['mitre', 'status'] as const,
matrix: ['mitre', 'matrix'] as const,
tactics: (q?: string) => ['mitre', 'tactics', q ?? ''] as const,
techniques: (tactic?: string, q?: string) =>
['mitre', 'techniques', tactic ?? '', q ?? ''] as const,
subtechniques: (technique?: string, q?: string) =>
['mitre', 'subtechniques', technique ?? '', q ?? ''] as const,
};
export interface MatrixSubtechnique {
id: string;
external_id: string;
name: string;
}
export interface MatrixTechnique {
id: string;
external_id: string;
name: string;
subtechniques: MatrixSubtechnique[];
}
export interface MatrixTactic {
id: string;
external_id: string;
short_name: string;
name: string;
techniques: MatrixTechnique[];
}
export interface MitreMatrix {
tactics: MatrixTactic[];
}
/** Mirror of backend `SyncResultOut` (`api/mitre.py`). */
export interface MitreSyncResult {
tactics_upserted: number;
techniques_upserted: number;
subtechniques_upserted: number;
subtechniques_skipped_orphan: number;
technique_tactic_links: number;
version: string | null;
source: string;
started_at: string;
finished_at: string;
duration_ms: number;
}

View File

@@ -61,7 +61,7 @@ export function HomePage() {
<span className="text-purple">Purple Team Platform</span>
</h1>
<p className="font-mono text-sm font-light text-text-dim mt-2">
Collaborative red &amp; blue test orchestration M3 milestone (RBAC)
Collaborative red &amp; blue test orchestration M4 milestone (MITRE ATT&amp;CK)
</p>
</header>
<SectionHeader
@@ -141,9 +141,9 @@ export function HomePage() {
<Card accent="purple" title="Roadmap" sub="14 milestones">
<p>
M0 + M1 + M2 + M3 done. Next:{' '}
M0 + M1 + M2 + M3 + M4 done. Next:{' '}
<code className="accent-fill-cyan px-2 py-[2px] rounded-sm font-mono text-4xs">
M4 MITRE ATT&amp;CK
M5 Test &amp; scenario templates
</code>
.
</p>

View File

@@ -0,0 +1,137 @@
import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query';
import { useState } from 'react';
import { MitreTagPicker } from '@/components/MitreTagPicker';
import { Alert } from '@/components/ui/Alert';
import { Button } from '@/components/ui/Button';
import { Card } from '@/components/ui/Card';
import { SectionHeader } from '@/components/ui/SectionHeader';
import { Tag } from '@/components/ui/Tag';
import { ApiError, apiGet, apiPost } from '@/lib/api';
import { useAuth } from '@/lib/auth';
import {
mitreKeys,
type MitreStatus,
type MitreSyncResult,
type MitreTag,
} from '@/lib/mitre';
export function MitrePage() {
const { state } = useAuth();
const qc = useQueryClient();
const [selected, setSelected] = useState<MitreTag[]>([]);
const [syncResult, setSyncResult] = useState<string | null>(null);
const [syncError, setSyncError] = useState<string | null>(null);
const status = useQuery({
queryKey: mitreKeys.status,
queryFn: () => apiGet<MitreStatus>('/mitre/status'),
});
const sync = useMutation({
mutationFn: () => apiPost<MitreSyncResult>('/mitre/sync'),
onMutate: () => {
setSyncResult(null);
setSyncError(null);
},
onSuccess: async (res) => {
const counts = `${res.tactics_upserted} tactics, ${res.techniques_upserted} techniques, ${res.subtechniques_upserted} subtechniques`;
setSyncResult(`Sync completed in ${(res.duration_ms / 1000).toFixed(1)}s — ${counts}.`);
await qc.invalidateQueries({ queryKey: ['mitre'] });
},
onError: (e) => {
if (e instanceof ApiError) {
const p = e.payload as { error?: string; message?: string } | null;
setSyncError(p?.message ?? p?.error ?? `HTTP ${e.status}`);
} else {
setSyncError(e instanceof Error ? e.message : 'Sync failed');
}
},
});
return (
<>
<SectionHeader
prefix="MITRE"
highlight="ATT&CK"
accent="cyan"
description="Reference catalogue: tactics, techniques, sub-techniques. Tag picker reusable by tests and missions."
/>
<div className="grid gap-4 mb-6 [grid-template-columns:repeat(auto-fill,minmax(360px,1fr))]">
<Card accent="cyan" title="Source" sub={status.data?.version ? `version ${status.data.version}` : '—'}>
<p>
URL:{' '}
<code className="accent-fill-cyan px-2 py-[2px] rounded-sm font-mono text-4xs break-all">
{status.data?.source_url ?? status.data?.default_url ?? '—'}
</code>
</p>
<p className="mt-2">
Last sync:{' '}
<code
className="accent-fill-cyan px-2 py-[2px] rounded-sm font-mono text-4xs"
data-testid="mitre-last-sync"
>
{status.data?.last_sync ?? 'never'}
</code>
</p>
</Card>
{state.user?.is_admin && (
<Card accent="orange" title="Sync" sub="re-pull the bundle from the configured source">
<p className="text-2xs text-text-dim mb-3">
The seed CLI runs the same logic; trigger from here only if the URL is reachable from the api container.
</p>
<Button
accent="orange"
onClick={() => sync.mutate()}
disabled={sync.isPending}
data-testid="mitre-sync"
>
{sync.isPending ? 'Syncing…' : 'Trigger MITRE sync'}
</Button>
{syncResult && <Alert accent="green" className="mt-3">{syncResult}</Alert>}
{syncError && <Alert accent="red" className="mt-3">{syncError}</Alert>}
</Card>
)}
</div>
<SectionHeader prefix="Tag" highlight="Picker" accent="orange" />
{/* Full-bleed the matrix beyond max-w-page so it uses the full viewport
* width. `calc(50% - 50vw)` is the canonical CSS recipe: the element's
* left edge slides back to viewport x=0 regardless of how big the
* outer max-w-page container is. `px-[60px]` mirrors the page padding
* so cells don't touch the viewport edge. */}
<div
className="px-[60px]"
style={{
marginLeft: 'calc(50% - 50vw)',
marginRight: 'calc(50% - 50vw)',
width: '100vw',
}}
>
<MitreTagPicker value={selected} onChange={setSelected} />
</div>
{selected.length > 0 && (
<Card accent="purple" className="mt-6" title="Selected (preview payload)">
<pre className="text-2xs text-text-bright whitespace-pre-wrap" data-testid="mitre-selected-json">
{JSON.stringify(selected, null, 2)}
</pre>
</Card>
)}
{selected.length === 0 && (
<p className="mt-3 font-mono text-2xs text-text-dim">
Click any cell to toggle. Use <span className="text-purple"></span> to reveal sub-techniques inline. Hover a cell for its <code className="text-purple">external_id</code>. Selections accumulate.
</p>
)}
<div className="mt-4">
<Tag accent="cyan">Tactic</Tag>
<Tag accent="orange">Technique</Tag>
<Tag accent="purple">Sub-technique</Tag>
</div>
</>
);
}

View File

@@ -41,6 +41,17 @@ project: Metamorph
- **Le test d'intégration "expected tables/FK/CHECK" est le bon filet de sécurité** pour M1+ : il a immédiatement attrapé les fixes du reviewer (le retrait de `ck_mission_test_mitre_tags_exactly_one_mitre_fk` aurait été un oubli silencieux sinon).
- **Lancer le DoD avant de dire "M1 done"** : règle gravée à M0, respectée ici. `make clean && make up && make migrate && make test-api && make e2e` est la séquence canonique de fin de milestone.
## 2026-05-12 — M4 MITRE ATT&CK
- **STIX parsing avec stdlib uniquement** (`urllib.request` + `json` + `hashlib`) suffit pour 50 MB de bundle, ~1.1 s parse end-to-end. Pas besoin de `requests`/`httpx`. Toute future ingestion de gros JSON pinné → stdlib first, ne pas inflater l'image pour un cas d'usage one-shot.
- **Le sous-jacent MITRE évolue** : la spec mentionne "14 tactics" mais la v19 actuelle en ship 15 (Reconnaissance + Resource Development depuis v8). Les assertions de DoD sont à exprimer en `>= X` quand un référentiel externe est en jeu, pas en `== X`. Pattern : décorréler la valeur exacte du contrat (sinon la maintenance casse au prochain bump).
- **Sub-technique parent resolution** : la source authoritative est la `relationship[subtechnique-of]` STIX, pas la convention dotted-id `T1003.001 → T1003`. La regex en fallback ne sert que si la relation manque (jamais le cas avec MITRE officiel, mais utile pour bundles custom).
- **`session_scope()` enveloppe tout le seed dans une seule transaction** → les lecteurs externes ne voient jamais un état intermédiaire pendant le DELETE+INSERT de `mitre_technique_tactics`. Postgres READ COMMITTED isole. Pas besoin d'advisory lock sauf si on s'attend à des syncs concurrents.
- **Checksum bypass silencieux = footgun**. Avant : `source != MITRE_DEFAULT_URL``expected_sha256 = None`. Un admin qui type un domaine attaquant dans `mitre_source_url` ingère du JSON arbitraire sans intégrité. Patron correct : `MitreSeedError("custom URL requires an expected_sha256 or allow_unverified=True")`. L'opt-out explicite (`--skip-checksum` côté CLI, `allow_unverified: true` côté API) reste possible mais visible.
- **`/diag/reset` cohérence** : si on TRUNCATE `settings` mais pas les tables de référence MITRE (gardées car coûteuses à re-seeder), `GET /mitre/status` retourne `last_sync: null` alors que `GET /mitre/tactics` retourne 15 lignes. Discrepancy mensongère. Fix : TRUNCATE aussi les `mitre_*` dans `/diag/reset` (test-only endpoint, on accepte la re-sync via `/mitre/sync` en `beforeAll`).
- **Volume permissions et chown au build** : `mkdir -p /data/mitre && chown -R metamorph:metamorph /data` dans le Dockerfile suffit POUR le premier `make up` (podman copie l'ownership de l'image lors de l'init du named volume). Mais si un volume préexiste owned root, le chown ne replay pas. À documenter en pré-requis dans `tasks/testing-m<N>.md`, ou ajouter un entrypoint shim qui valide les perms au boot.
- **Build cache du front silencieux** : `podman build` montre `Using cache aad724...` même quand src/ a changé si le diff entre les arborescences est invisible (mtime). En cas de doute : `podman build --no-cache` une fois pour confirmer que le typecheck passe, puis `make down && make up` pour pousser le bundle. Réflexe à garder en mémoire.
## 2026-05-11 — M3 RBAC, groupes, users, invitations
- **`logging.LogRecord` réserve `name`** comme attribut interne (en plus de `message`, `levelname`, `pathname`, `filename`, `module`, `funcName`, `lineno`, `asctime`, `process`, `thread`, `args`). Donc `log.info("metamorph.x.created", extra={"name": entity.name})` lève `KeyError: "Attempt to overwrite 'name' in LogRecord"`. Patron : préfixer toute clé risquée par l'entité (`group_name`, `user_name`, `template_name`). À documenter dans le style guide quand on en aura un.

View File

@@ -79,7 +79,14 @@ Remplace l'aller-retour Excel actuel par une UI partagée temps réel, multi-rô
## 5. Exigences fonctionnelles
- **F1** — Gestion users/groupes/invitations par admin avec permissions atomiques (familles listées en §4).
- **F2** — CRUD tests unitaires templates avec MITRE ATT&CK (Tactic+Technique+Sub-technique multi), procédure markdown/code, prérequis, résultat attendu red, détection attendue blue, niveau OPSEC (low/med/high), tags libres, IOCs attendus.
- **F2** — CRUD tests unitaires templates avec MITRE ATT&CK (Tactic+Technique+Sub-technique multi), procédure markdown/code, prérequis, résultat attendu red, détection attendue blue, niveau OPSEC (low/med/high), tags libres, IOCs attendus. **Représentation UI du picker MITRE** : matrice flat fidèle à `attack.mitre.org/#`
- **Full-bleed** : le picker s'étend sur toute la largeur du viewport (s'échappe du `max-w-page` global du layout) pour exposer un maximum de cellules sans scroll.
- **15 colonnes** equal-width via `grid-template-columns: repeat(N, minmax(7rem, 1fr))` ; scroll horizontal seulement en dernier recours sur viewport étroit (<≈1680px).
- **Wrap word-only** : `overflow-wrap: normal` + `hyphens: none` — les noms cassent uniquement sur les espaces, jamais au milieu d'un mot.
- **Headers** = nom de la tactic seul + compteur de techniques en 10px ; l'`external_id` `TA00xx` n'apparaît qu'au hover (title) et dans les chips de sélection.
- **Cellules** = nom de la technique seul (idem pour `T1xxx` au hover) ; chevron `▸ N / ▾ N` qui déplie inline les sub-techniques dans la colonne.
- **Police** sans-serif uniforme `text-xs` (12px) pour cells + headers, `10px` pour les sub-counts.
- **Click** sur une cellule = (dé)sélection ; selection multi-niveaux (tactic / technique / sub-technique) cumulative ; chips de sélection en haut avec `external_id · name` cliquables pour retirer.
- **F3** — CRUD scénarios = liste ordonnée (drag-and-drop) de tests unitaires.
- **F4** — CRUD missions (métadonnées §4) composées d'un ou plusieurs scénarios, snapshot des templates à l'instanciation.
- **F5** — Saisie côté red : commande lancée, output texte, commentaires markdown, statut, timestamp auto+override.
@@ -89,7 +96,7 @@ Remplace l'aller-retour Excel actuel par une UI partagée temps réel, multi-rô
- **F9** — Export mission : JSON complet (API + UI), CSV agrégé.
- **F10** — Soft delete + purge admin.
- **F11** — Switch i18n FR/EN par utilisateur (préférence persistée).
- **F12** — Sync MITRE ATT&CK Enterprise : dataset STIX embarqué (seed) + job admin manuel pour re-puller depuis github.com/mitre/cti.
- **F12** — Sync MITRE ATT&CK Enterprise : dataset STIX embarqué (seed) + job admin manuel pour re-puller depuis github.com/mitre/cti. Le picker (cf. F2) se base sur un endpoint `GET /mitre/matrix` qui retourne la grille complète (tactics → techniques → sub-techniques) en un seul appel.
## 6. Exigences non fonctionnelles
- **NF-perf** : UI fluide, pagination côté API au-delà de 50 éléments par liste, lazy-loading des fichiers de preuves.
@@ -137,7 +144,7 @@ Remplace l'aller-retour Excel actuel par une UI partagée temps réel, multi-rô
1. Premier boot : token d'install affiché dans les logs, accès `/setup` permet de créer le 1er admin.
2. Admin crée un groupe custom et lui attribue des permissions atomiques (write_red_fields uniquement, par exemple).
3. Admin envoie un lien d'invitation, l'utilisateur s'enregistre avec succès et hérite des bons groupes.
4. Admin importe la matrice MITRE et crée un test unitaire avec Tactic+Technique+Sub-technique.
4. Admin importe la matrice MITRE et crée un test unitaire avec Tactic+Technique+Sub-technique. *(≥ 14 tactics Enterprise — la v19 du pin actuel en ship 15.)*
5. Admin compose un scénario de 3 tests ordonnés via drag-and-drop.
6. Red Teamer crée une mission avec scénario, assigne 1 red + 1 blue.
7. Red Teamer marque un test « exécuté », saisit commande+output, timestamp auto capturé.

116
tasks/testing-m4.md Normal file
View File

@@ -0,0 +1,116 @@
---
type: testing
milestone: M4
date: "2026-05-12"
project: Metamorph
---
# Testing M4 — MITRE ATT&CK Enterprise
## 1. Lancement de la stack
```bash
make clean # reset si une stack tournait
make up # build + start db/api/front
make migrate
make seed-mitre # télécharge le bundle pinné v19.0 (~50 MB, ~1 s parse)
```
> **Permissions volume** : `metamorph_mitre` est créé chowné `metamorph:metamorph` par le Dockerfile à la 1ʳᵉ initialisation. Si tu as un volume préexistant (d'une expé antérieure) appartenant à root, le seed échouera avec `PermissionError`. Solution : `podman volume rm metamorph_metamorph_mitre` avant `make up`.
## 2. Tests automatisés
```bash
make test-api # 58 tests pytest dont 19 MITRE (parser, idempotence, security guards, all endpoints, dotted fallback, version clearing)
make e2e # 34 tests Playwright dont 6 M4
```
Le rapport HTML est dans `e2e/playwright-report/`, le JUnit dans `e2e/playwright-report/junit.xml`.
## 3. Procédure manuelle (smoke navigateur)
### Pré-requis
- Stack up, migrations appliquées, `make seed-mitre` exécuté.
- Le bundle est cache dans le volume `metamorph_mitre` (`/data/mitre/enterprise-attack-19.0.json`). Pour ré-utiliser un fichier local : `flask --app app.cli metamorph seed-mitre --source /chemin/vers/enterprise-attack.json`.
### 3.1 Page MITRE (`/mitre`)
1. Se connecter en admin.
2. Cliquer **MITRE** dans la nav → page chargée.
3. Carte **Source** : vérifier `version 19.0` + URL pinnée + `Last sync` non vide.
4. Carte **Sync** (admin uniquement) : cliquer **Trigger MITRE sync** → bannière verte avec counts (15 tactics / 222 techniques / 475 subtechniques).
5. **Picker — matrice flat type attack.mitre.org** :
- La matrice tient sur la largeur de la page sans scroll horizontal (15 colonnes de largeur égale, partagent l'espace dispo).
- Chaque header de colonne montre **seulement le nom de la tactic** (ex. `Credential Access`) + `17 techniques` en petit dessous. L'`external_id` (TA0006) apparaît au hover (title).
- Click sur le header **Credential Access** → toute la colonne est sélectionnée (chip cyan en haut, header en cyan filled).
- Re-click pour désélectionner.
- Les cellules affichent **uniquement le nom de la technique** (ex. `OS Credential Dumping`). L'`external_id` (T1003) apparaît au hover (title) et dans le chip de sélection.
- Cliquer la cellule **OS Credential Dumping** → cellule en orange filled, chip `T1003 · OS Credential Dumping` en haut.
- Cliquer le chevron `▸ 8` à droite de la cellule → la liste des sub-techniques se déploie inline dans la même colonne, chevron passe à `▾ 8`.
- Cliquer **LSASS Memory** (sub-technique) → cell purple filled, chip `T1003.001 · LSASS Memory`.
- Click le chip pour le retirer.
- La carte « Selected (preview payload) » sous la matrice montre le JSON cumulatif avec les `external_id`.
### 3.2 Filtre
1. Taper `dump` dans le champ **Filter** → seules T1003 + sub-techniques restent visibles, les autres techniques sont cachées (mais leurs colonnes restent visibles pour préserver la grille).
2. Taper `TA0006` → idem mais filtre par `external_id`.
3. Vider le filtre → toutes les cellules réapparaissent.
### 3.3 Non-admin
1. Inviter un user sans perms via Admin > Invitations.
2. Se connecter en tant que ce user.
3. Naviguer sur `/mitre` → page accessible, picker fonctionnel (read-only).
4. La carte **Sync** n'apparaît PAS (UI gate `is_admin`).
5. Tenter `POST /api/v1/mitre/sync` via curl avec son token → **403** `insufficient permissions`.
### 3.4 Re-sync admin
```bash
ACCESS=$(curl -sX POST http://localhost:8080/api/v1/auth/login \
-H 'Content-Type: application/json' \
-d '{"email":"admin@metamorph.local","password":"AdminPass1234!"}' | jq -r .access_token)
curl -sX POST http://localhost:8080/api/v1/mitre/sync \
-H "Authorization: Bearer $ACCESS" | jq
```
Sortie attendue :
```json
{
"tactics_upserted": 15,
"techniques_upserted": 222,
"subtechniques_upserted": 475,
"subtechniques_skipped_orphan": 0,
"technique_tactic_links": 254,
"version": "19.0",
"duration_ms": ~1000
}
```
### 3.5 Sync via URL custom
```bash
curl -sX POST http://localhost:8080/api/v1/mitre/sync \
-H "Authorization: Bearer $ACCESS" \
-H 'Content-Type: application/json' \
-d '{"source":"https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/enterprise-attack/enterprise-attack-18.1.json"}' | jq
```
- Avec une URL ≠ pinned : sha256 désactivé, `version` est `null` (on ne connaît pas la version d'un fichier custom).
### 3.6 Mode air-gap
1. Préparer un STIX 2.1 valide localement : `enterprise-attack-19.0.json`.
2. Le copier dans le volume :
```bash
podman cp enterprise-attack-19.0.json metamorph-api:/data/mitre/
```
3. Lancer le seed pointé sur le path : `podman compose exec api flask --app app.cli metamorph seed-mitre --source /data/mitre/enterprise-attack-19.0.json --skip-checksum`
## 4. Points de contrôle critiques
- [x] `make seed-mitre` initial pinné v19.0 → 15/222/475 sans orphans.
- [x] Re-lancer le seed est idempotent (mêmes counts).
- [x] `/mitre/tactics` retourne 15 tactics (la spec mentionne 14 — MITRE en a 15 depuis v8).
- [x] `/mitre/techniques?tactic=TA0006` retourne ≥ 17 techniques incl. T1003.
- [x] `/mitre/subtechniques?technique=T1003` retourne 8 sub-techniques.
- [x] `/mitre/status` expose `last_sync`, `version`, `source_url`, `default_url`, `default_version`.
- [x] `/mitre/sync` exige la perm `mitre.sync` (admin via bypass `is_admin`).
- [x] Sha256 mismatch sur la pinned URL → 502 `checksum_mismatch`, DB intacte.
- [x] Bundle local (`--source <path>`) bypasse la vérif checksum.
- [x] Picker SPA : matrice flat attack.mitre.org-style — 15 colonnes equal-width sans scroll horizontal, cellules avec **name only** (external_id au hover + dans chips), chevron `▸ N / ▾ N` → sub-techniques inline, chips multi-niveaux en haut.
- [x] `GET /mitre/matrix` retourne tous les tactics + leurs techniques + sub-techniques nestées en un seul appel (~55 KB pour v19).
- [x] Non-admin : voit la page `/mitre` mais pas la carte Sync ; `POST /mitre/sync` → 403.

View File

@@ -101,7 +101,7 @@ spec: tasks/spec.md
---
## M4 — MITRE ATT&CK Enterprise
## M4 — MITRE ATT&CK Enterprise
**But** : le référentiel ATT&CK est interrogeable et tagué sur les tests.
@@ -111,7 +111,7 @@ spec: tasks/spec.md
- ☐ Endpoint `POST /mitre/sync` (perm `mitre.sync`) qui re-pull depuis l'URL configurée (setting `mitre_source_url`).
- ☐ Persister `mitre_last_sync` dans `settings`.
- ☐ Endpoint `GET /mitre/tactics`, `/mitre/techniques?tactic=…`, `/mitre/subtechniques?technique=…` (pagination + recherche full-text simple sur `name`).
- ☐ Front : composant `<MitreTagPicker>` (autocomplete combiné Tactic > Technique > Sub-technique, multi-select).
- ☐ Front : composant `<MitreTagPicker>` — matrice flat type `attack.mitre.org/#` (colonnes = tactics, cellules = techniques, chevron `+N` qui déplie les sub-techniques inline). Click = (dé)sélection, multi-niveaux cumulatif, chips en haut, recherche par `external_id` ou `name`. Alimenté par `GET /mitre/matrix` (one-shot, ~55 KB).
**DoD** : après `make seed-mitre`, `GET /mitre/tactics` retourne 14 tactics Enterprise ; le picker permet de tagger un test avec « TA0002 / T1059.001 » et l'enregistrement est persistant.