Compare commits

...

2 Commits

Author SHA1 Message Date
Knacky
e1b51db25f fix(m6): post-review pass — cache prefix, snapshot lock, perm-before-parse, LIKE escape
Addresses spec-reviewer + code-reviewer feedback on the M6 bundle:

Critical:
- frontend/src/lib/missions.ts: add `listPrefix()` so TanStack invalidation
  catches every filtered list variant; the previous `list()` returned
  `['missions','list',{}]` and only matched the exact empty-filter cache,
  leaving filtered tables stale after create/transition/delete.
- backend/app/services/missions.py: acquire the same per-scenario
  `pg_advisory_xact_lock` key used by `set_scenario_tests` before
  snapshotting; without it a concurrent M5 reorder could freeze a torn
  snapshot under READ COMMITTED. Sorted by key to avoid deadlocks with
  another snapshotter.

Important:
- backend/app/api/missions.py: `@require_perm("mission.update",
  "mission.archive")` on the transition endpoint so users without either
  perm get 403 before the body is parsed (no shape leak via 400).
- backend/app/services/missions.py: escape `%` / `_` / `\` in user-typed
  `q` / `client` LIKE search; users can no longer trigger wildcard
  semantics by typing literal `%`. Added `escape='\\'` arg on every .like().
- backend/app/services/missions.py: filter `MissionTest.deleted_at` and
  `MissionScenario.deleted_at` in the list-item and detail counts so M7+
  soft-deletes don't drift the totals silently.

Nits:
- backend/app/api/users.py: order `/users/roster` by email for stable
  rendering + deterministic e2e selectors.
- frontend/src/pages/MissionDetailPage.tsx: distinct accent per
  transition target (cyan/orange/green/teal) matching the status legend.
- e2e/tests/m6-missions.spec.ts: switch fragile `getByRole(name=/In
  Progress/i)` to the stable `mission-transition-in_progress` data-testid.

New tests:
- test_create_mission_rejects_soft_deleted_scenario
- test_transition_perm_gate_runs_before_payload_parse
- test_search_treats_wildcards_as_literals

Suite: 106 pytest passing (was 103), 43 Playwright passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 15:14:57 +02:00
Knacky
00b7557e30 feat(m6): missions + snapshot CRUD, membership visibility, status state machine
Adds the mission layer that materialises template snapshots, plus the SPA
list / 3-step wizard / detail page.

Backend:
- app/services/missions.py — create_mission snapshots scenarios, tests, MITRE
  tags in a 4-query write; list/get apply a non-admin membership filter that
  collapses to 404 (no existence leak); status state machine enforces
  draft → in_progress → completed → archived with archived as a sink; the
  non-admin creator is auto-added as role_hint='red' to retain visibility.
- app/api/missions.py — 8 endpoints (list, get, create, update, add
  scenarios, set members, transition, soft-delete) with strict pydantic
  schemas. The transition endpoint splits the perm gate manually so
  archive requires mission.archive while other targets use mission.update.
- app/api/users.py — new GET /users/roster returning (id, email,
  display_name) only, gated by user.read OR mission.create OR
  mission.update — lets non-admin wizard users see assignable peers
  without exposing the admin /users payload.
- app/api/diag.py — /diag/reset truncates the mission_* tables before the
  template tables because the source_*_template_id FKs are ON DELETE SET
  NULL, which is cheaper to short-circuit by removing the children first.

Frontend:
- lib/missions.ts — typed client, queryKey factory, status accent map.
- pages/MissionsListPage.tsx — list cards with status accent + filters
  (q, client, status).
- pages/MissionsCreatePage.tsx — 3-step wizard (meta → scenarios → members)
  with member roster fed by /users/roster.
- pages/MissionDetailPage.tsx — header + transition buttons (legal next
  states only) + Tests/Members/Synthesis/Export tabs.
- Routes + nav entry (visible to anyone with mission.read or admin).

Tests:
- backend/tests/test_missions.py — 22 pytest covering snapshot fidelity,
  MITRE propagation, membership visibility, transition state machine,
  perm gating, member set replace, append scenarios, soft-delete, partial
  update, inverted-date rejection.
- e2e/tests/m6-missions.spec.ts — 5 Playwright (snapshot freezing, non-admin
  visibility, status transitions + 409, SPA wizard end-to-end, list filter).

Docs:
- CHANGELOG, tasks/testing-m6.md, tasks/lessons.md (snapshot tradeoffs,
  membership=404 pattern, /diag/reset order, auto-creator add).
- README + tasks/todo.md updated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 15:07:32 +02:00
18 changed files with 3845 additions and 4 deletions

View File

@@ -4,6 +4,40 @@ All notable changes to this project will be documented here. Format: [Keep a Cha
## [Unreleased] ## [Unreleased]
### Fixed (post-M6 review pass — spec-reviewer + code-reviewer)
- **SPA cache invalidation only refreshed the empty-filter list** (`frontend/src/lib/missions.ts:136`): `missionKeys.list()` returns `['missions','list',{}]`. TanStack v5's `invalidateQueries({queryKey})` is prefix-based, but `{}` is treated as an atomic final element — so create / transition / delete called with that key only invalidated the *exact* empty-filter list, leaving any filtered variant stale until manual refetch. Added `missionKeys.listPrefix()` returning `['missions','list']` and switched all three mutation `onSuccess` paths to it.
- **Snapshot lacked the per-scenario advisory lock** (`backend/app/services/missions.py:467`): a concurrent `PUT /scenario-templates/{id}/tests` (M5 reorder, which deletes-then-reinserts join rows) running while `_snapshot_scenarios` walked `sc.tests` could freeze a torn snapshot — `selectinload` re-queries under READ COMMITTED so a partial view was possible. Added `_lock_scenario_ids_for_snapshot` that acquires the same `pg_advisory_xact_lock` key used by `set_scenario_tests` (blake2b digest of the scenario UUID, sorted to avoid deadlocks). Snapshot and reorder now serialise per scenario.
- **Transition endpoint leaked its body shape via 400 before the perm gate** (`backend/app/api/missions.py:441`): a user without `mission.update` or `mission.archive` POSTing `{"status":"x"}` got a Pydantic 400 instead of 403. Added `@require_perm("mission.update", "mission.archive")` so the gate fires before the parse; the inner refinement still enforces the per-target perm. Test `test_transition_perm_gate_runs_before_payload_parse`.
- **LIKE wildcards in user-typed search were honoured as SQL wildcards** (`backend/app/services/missions.py:632,637`): `?q=%` matched every mission. Added `_escape_like` that pre-escapes `%`, `_`, `\` and a matching `escape='\\'` argument on every `.like(...)` call. Test `test_search_treats_wildcards_as_literals`.
- **Counts ignored soft-deleted mission children** (`backend/app/services/missions.py:587,597`): `tests_count` and the detail view summed `len(sc.tests)` without filtering `MissionTest.deleted_at`. Harmless today (M6 doesn't soft-delete mission tests), but would drift silently once M7+ surfaces `state=skipped/blocked`. Added the filter in both `_to_list_item` and `_scenario_views`.
- **`/users/roster` was unordered** (`backend/app/api/users.py:73`): the wizard's member list shuffled rows on every refetch. Sorted by `email` for predictable rendering + stable e2e selectors.
- **Frontend transition button accent collapsed `in_progress` and `completed` into one colour** (`frontend/src/pages/MissionDetailPage.tsx:97`): both rendered cyan, so the status legend in the list didn't match the transition button. Added a `TRANSITION_BUTTON_ACCENT` map mirroring `MISSION_STATUS_ACCENT` (cyan/orange/green/teal).
- **Soft-deleted source scenario was a silent foot-gun**: `_load_scenario_templates_for_snapshot` already rejected it, but no test pinned the behaviour. Added `test_create_mission_rejects_soft_deleted_scenario` so future refactors can't regress to "freeze a tombstoned scenario into a fresh mission".
- **E2E wizard assertion used `getByRole('button', { name: /In Progress/i })`** (`e2e/tests/m6-missions.spec.ts:287`): the accessible name is `→ In Progress` and the arrow Unicode is brittle. Switched to `getByTestId('mission-transition-in_progress')`.
### Added — M6 (Missions & snapshot)
- **CRUD `missions`** (`app/services/missions.py` + `app/api/missions.py`):
- Fields: name, client_target, date_start, date_end, status (`draft/in_progress/completed/archived`), description (markdown), visibility_mode (frozen to `whitebox` in v1).
- On creation/append, the service **snapshots** the selected `scenario_templates` and all their `test_templates` into `mission_scenarios` / `mission_tests` (every template field — including OPSEC level, tags, expected IOCs, MITRE tags). The denormalised `mission_test_mitre_tags` table copies `external_id`, `name`, `url` so a later MITRE re-sync that drops the entry can't alter a mission's tags (spec §11).
- `source_*_template_id` FKs survive template soft-deletes (`ON DELETE SET NULL`); the mission's frozen content is unaffected.
- **Membership visibility**: non-admin viewers see only missions where they are a `mission_members` row. The service maps "not visible" → 404 (no existence leak via 403). Admins bypass via the `admin` group.
- **Status state machine**: `draft → in_progress → completed → archived`; `archived → ∅`. The transition endpoint accepts the target status, validates the move, and rejects invalid jumps with 409. Idempotent (target=current) is a no-op 200.
- Auto-creator-membership: a non-admin caller of `POST /missions` is auto-added as `role_hint='red'` if not already in the `members[]` payload — so they retain visibility on the mission they just created.
- REST: `GET/POST /missions`, `GET/PUT/DELETE /missions/{id}`, `POST /missions/{id}/scenarios` (append snapshots at the end), `PUT /missions/{id}/members` (replace set), `POST /missions/{id}/transition`.
- Filters on list: `q` (LIKE on name/description), `status`, `client` (LIKE on client_target). `include_deleted=true` is admin-only (403 otherwise).
- **`GET /users/roster`** (`app/api/users.py`): a deliberately minimal listing — `id`, `email`, `display_name` of active users only — accessible to any holder of `user.read`, `mission.create`, or `mission.update`. Lets a non-admin red teamer populate the wizard's member picker without exposing the admin-grade `/users` endpoint (which leaks `is_admin`, `is_active`, group memberships).
- **Frontend**:
- `lib/missions.ts` — typed client + queryKey factory + status accent map + filter query-string builder.
- `pages/MissionsListPage.tsx` — list cards (one per mission) with status accent, scenario/test/member counts, date range, plus filters (q, client, status).
- `pages/MissionsCreatePage.tsx`**3-step wizard**: metadata → scenario picker → member roster (red/blue toggles + auto-include the non-admin creator). Submits via `POST /missions` and redirects to the detail page.
- `pages/MissionDetailPage.tsx` — header with transition buttons (only the legal next states are rendered), soft-delete with confirm prompt, and 4 tabs: **Tests** (table of snapshotted tests with MITRE tags, OPSEC, state), **Members** (role-coloured pills), **Synthesis** (placeholder for M10), **Export** (placeholder for M11).
- Nav adds **Missions** link visible to anyone with `mission.read` or admin.
- **/diag/reset** truncates the mission tables before the template tables — `mission_scenarios.source_scenario_template_id` and `mission_tests.source_test_template_id` are `ON DELETE SET NULL`, so wiping missions first avoids the round-trip through the null-update path.
- **Testing**:
- `backend/tests/test_missions.py`**22 pytest** covering snapshot fidelity (rename source template after snapshot → mission unchanged), MITRE tag propagation, membership-based 404, perm gating (create vs read vs archive), status transition chain + invalid jumps (409), member set replace + role-hint validation, scenario append at correct position, soft-delete, partial metadata update, inverted-date rejection, admin-only `include_deleted`.
- `e2e/tests/m6-missions.spec.ts`**5 Playwright** (snapshot freezing, membership visibility for non-admin red, status transition + 409, SPA wizard end-to-end, SPA list + status filter).
- `tasks/testing-m6.md`.
### Added — M5 (Test & scenario templates) ### Added — M5 (Test & scenario templates)
- **CRUD `test_templates`** (`app/services/test_templates.py` + `app/api/test_templates.py`): - **CRUD `test_templates`** (`app/services/test_templates.py` + `app/api/test_templates.py`):
- Fields: name, description, objective, procedure (markdown), prerequisites (markdown), expected result red, expected detection blue, OPSEC level (`low/medium/high`), free tags (TEXT[]), expected IOCs (TEXT[]). - Fields: name, description, objective, procedure (markdown), prerequisites (markdown), expected result red, expected detection blue, OPSEC level (`low/medium/high`), free tags (TEXT[]), expected IOCs (TEXT[]).

View File

@@ -12,6 +12,7 @@ Collaborative purple-team platform. Red team logs the tests they execute (proced
- **RBAC (M3+)**: atomic permissions (31 codes) bundled into custom groups; 3 system groups seeded (`admin` / `redteam` / `blueteam`). - **RBAC (M3+)**: atomic permissions (31 codes) bundled into custom groups; 3 system groups seeded (`admin` / `redteam` / `blueteam`).
- **MITRE ATT&CK (M4+)**: Enterprise reference catalogue pinned to v19.0, seedable via `make seed-mitre`. - **MITRE ATT&CK (M4+)**: Enterprise reference catalogue pinned to v19.0, seedable via `make seed-mitre`.
- **Template catalogue (M5+)**: reusable `test_templates` (markdown procedure, OPSEC level, free tags, expected IOCs, MITRE tags) + ordered `scenario_templates` with drag-and-drop reordering. Admin pages at `/admin/tests` and `/admin/scenarios`. - **Template catalogue (M5+)**: reusable `test_templates` (markdown procedure, OPSEC level, free tags, expected IOCs, MITRE tags) + ordered `scenario_templates` with drag-and-drop reordering. Admin pages at `/admin/tests` and `/admin/scenarios`.
- **Missions (M6+)**: `missions` snapshot one or more scenario templates at creation time; template edits don't drift live missions (`mission_*` tables freeze every field, including MITRE tags). Non-admin members see only their own missions (membership filter, 404 on existence-leak attempts). Status state machine `draft → in_progress → completed → archived`, archive perm gated separately. SPA: list/filter at `/missions`, 3-step create wizard at `/missions/new`, detail page with Tests / Members / Synthesis / Export tabs.
- **Delivery**: docker-compose. TLS termination is expected to be handled by an external reverse proxy in production. - **Delivery**: docker-compose. TLS termination is expected to be handled by an external reverse proxy in production.
## Quickstart ## Quickstart
@@ -94,7 +95,7 @@ See `.env.example`. The most important ones:
## Testing ## Testing
- **Manual + automated checklist for the current milestone**: see [`tasks/testing-m<N>.md`](tasks/testing-m4.md) (current: `testing-m4.md`). - **Manual + automated checklist for the current milestone**: see [`tasks/testing-m<N>.md`](tasks/testing-m6.md) (current: `testing-m6.md`).
- **Backend unit tests**: `make test-api` - **Backend unit tests**: `make test-api`
- **End-to-end (Playwright)**: `make e2e-install` (once), then `make up && make e2e`. Reports land in `e2e/playwright-report/` (HTML + JUnit XML); open with `make e2e-report`. - **End-to-end (Playwright)**: `make e2e-install` (once), then `make up && make e2e`. Reports land in `e2e/playwright-report/` (HTML + JUnit XML); open with `make e2e-report`.
@@ -136,7 +137,7 @@ The hooks run `ruff` + `ruff-format` on the backend and `eslint` / `tsc --noEmit
## Roadmap ## Roadmap
See `tasks/todo.md`. Current milestone: **M4 — MITRE ATT&CK Enterprise**. Next: M5 (test & scenario templates). See `tasks/todo.md`. Current milestone: **M6 — Missions & snapshot** (done). Next: M7 (red/blue execution on a mission test).
## License ## License

View File

@@ -73,6 +73,20 @@ def reset_test_state():
"user_groups, settings, groups RESTART IDENTITY CASCADE" "user_groups, settings, groups RESTART IDENTITY CASCADE"
) )
) )
# Mission catalogue reset (M6). Truncated before the template tables
# below because `mission_scenarios.source_scenario_template_id` and
# `mission_tests.source_test_template_id` are ON DELETE SET NULL — a
# cascade-truncate of templates would attempt to null those columns
# and stall on the constraint check. Wiping the mission tables first
# avoids that round-trip; cascades from `missions` then take care of
# members, scenarios, tests, mitre_tags, categories.
conn.execute(
text(
"TRUNCATE mission_test_mitre_tags, mission_tests, "
"mission_scenarios, mission_categories, mission_members, "
"missions RESTART IDENTITY CASCADE"
)
)
# Template catalogue reset (M5). The MITRE truncate below cascades to # Template catalogue reset (M5). The MITRE truncate below cascades to
# the polymorphic tag join, but the template rows themselves must be # the polymorphic tag join, but the template rows themselves must be
# wiped first because `scenario_template_tests.test_template_id` is # wiped first because `scenario_template_tests.test_template_id` is

498
backend/app/api/missions.py Normal file
View File

@@ -0,0 +1,498 @@
"""Missions API.
Per spec §4: a non-admin user can only see (or edit) missions they are a
member of. The decorator stack here gates the *action type* by permission
code; the service layer applies the membership filter. Both layers fail
closed.
Status transitions are routed through a single POST endpoint that accepts a
target status. We accept either `mission.update` or `mission.archive` at the
gate — archiving requires the dedicated perm if the target is `archived`, and
the service enforces the lifecycle graph (`_VALID_TRANSITIONS`).
"""
from __future__ import annotations
import logging
import uuid
from datetime import date
from typing import Any
from flask import Blueprint, abort, g, jsonify, request
from pydantic import BaseModel, Field, ValidationError
from app.core.auth_decorators import AuthenticatedUser, require_auth, require_perm
from app.services import missions as svc
bp = Blueprint("missions", __name__, url_prefix="/missions")
log = logging.getLogger("metamorph.api.missions")
# --------------------------------------------------------------------------- #
# Payloads
# --------------------------------------------------------------------------- #
class MemberPayload(BaseModel):
user_id: uuid.UUID
role_hint: str = Field(min_length=1, max_length=8)
model_config = {"extra": "forbid"}
class CreateMissionPayload(BaseModel):
name: str = Field(min_length=1, max_length=255)
client_target: str | None = Field(default=None, max_length=255)
date_start: date | None = None
date_end: date | None = None
description_md: str | None = Field(default=None, max_length=20_000)
scenario_template_ids: list[uuid.UUID] = Field(default_factory=list, max_length=64)
members: list[MemberPayload] = Field(default_factory=list, max_length=128)
model_config = {"extra": "forbid"}
class UpdateMissionPayload(BaseModel):
name: str | None = Field(default=None, min_length=1, max_length=255)
client_target: str | None = Field(default=None, max_length=255)
date_start: date | None = None
date_end: date | None = None
description_md: str | None = Field(default=None, max_length=20_000)
model_config = {"extra": "forbid"}
class AddScenariosPayload(BaseModel):
scenario_template_ids: list[uuid.UUID] = Field(default_factory=list, max_length=64)
model_config = {"extra": "forbid"}
class SetMembersPayload(BaseModel):
members: list[MemberPayload] = Field(default_factory=list, max_length=128)
model_config = {"extra": "forbid"}
class TransitionPayload(BaseModel):
status: str = Field(min_length=1, max_length=16)
model_config = {"extra": "forbid"}
# --------------------------------------------------------------------------- #
# Serialisers
# --------------------------------------------------------------------------- #
def _serialize_member(m: svc.MissionMemberView) -> dict[str, Any]:
return {
"user_id": str(m.user_id),
"user_email": m.user_email,
"user_display_name": m.user_display_name,
"role_hint": m.role_hint,
}
def _serialize_mitre_tag(tag: svc.MissionMitreTagView) -> dict[str, Any]:
return {
"kind": tag.kind,
"external_id": tag.external_id,
"name": tag.name,
"url": tag.url,
}
def _serialize_test(t: svc.MissionTestView) -> dict[str, Any]:
return {
"id": str(t.id),
"position": t.position,
"snapshot_name": t.snapshot_name,
"snapshot_description": t.snapshot_description,
"snapshot_objective": t.snapshot_objective,
"snapshot_procedure_md": t.snapshot_procedure_md,
"snapshot_prerequisites_md": t.snapshot_prerequisites_md,
"snapshot_expected_red_md": t.snapshot_expected_red_md,
"snapshot_expected_blue_md": t.snapshot_expected_blue_md,
"snapshot_opsec_level": t.snapshot_opsec_level,
"snapshot_tags": t.snapshot_tags,
"snapshot_expected_iocs": t.snapshot_expected_iocs,
"state": t.state,
"executed_at": t.executed_at.isoformat() if t.executed_at else None,
"executed_at_overridden": t.executed_at_overridden,
"mitre_tags": [_serialize_mitre_tag(tag) for tag in t.mitre_tags],
"source_test_template_id": (
str(t.source_test_template_id) if t.source_test_template_id else None
),
}
def _serialize_scenario(sc: svc.MissionScenarioView) -> dict[str, Any]:
return {
"id": str(sc.id),
"position": sc.position,
"snapshot_name": sc.snapshot_name,
"snapshot_description": sc.snapshot_description,
"tests": [_serialize_test(t) for t in sc.tests],
"source_scenario_template_id": (
str(sc.source_scenario_template_id)
if sc.source_scenario_template_id
else None
),
}
def _serialize_list_item(m: svc.MissionListItemView) -> dict[str, Any]:
return {
"id": str(m.id),
"name": m.name,
"client_target": m.client_target,
"date_start": m.date_start.isoformat() if m.date_start else None,
"date_end": m.date_end.isoformat() if m.date_end else None,
"status": m.status,
"description_md": m.description_md,
"visibility_mode": m.visibility_mode,
"scenarios_count": m.scenarios_count,
"tests_count": m.tests_count,
"members_count": m.members_count,
"deleted_at": m.deleted_at.isoformat() if m.deleted_at else None,
"created_at": m.created_at.isoformat(),
"updated_at": m.updated_at.isoformat(),
}
def _serialize_detail(m: svc.MissionView) -> dict[str, Any]:
base = {
"id": str(m.id),
"name": m.name,
"client_target": m.client_target,
"date_start": m.date_start.isoformat() if m.date_start else None,
"date_end": m.date_end.isoformat() if m.date_end else None,
"status": m.status,
"description_md": m.description_md,
"visibility_mode": m.visibility_mode,
"scenarios_count": m.scenarios_count,
"tests_count": m.tests_count,
"members_count": m.members_count,
"deleted_at": m.deleted_at.isoformat() if m.deleted_at else None,
"created_at": m.created_at.isoformat(),
"updated_at": m.updated_at.isoformat(),
}
base["scenarios"] = [_serialize_scenario(sc) for sc in m.scenarios]
base["members"] = [_serialize_member(mb) for mb in m.members]
return base
# --------------------------------------------------------------------------- #
# Helpers
# --------------------------------------------------------------------------- #
def _parse_uuid_or_400(raw: str) -> uuid.UUID | None:
try:
return uuid.UUID(raw)
except ValueError:
return None
def _pagination_args() -> tuple[int, int] | tuple[None, tuple[int, str]]:
try:
limit = int(request.args.get("limit", "100"))
offset = int(request.args.get("offset", "0"))
except ValueError:
return None, (400, "invalid_pagination")
return max(1, min(limit, 500)), max(0, offset)
def _current_user() -> AuthenticatedUser:
user: AuthenticatedUser | None = getattr(g, "current_user", None)
if user is None:
abort(401, description="not authenticated")
assert user is not None # for Pyright; abort raises HTTPException
return user
def _to_assignments(payload_members: list[MemberPayload]) -> list[svc.MemberAssignment]:
return [
svc.MemberAssignment(user_id=m.user_id, role_hint=m.role_hint)
for m in payload_members
]
# --------------------------------------------------------------------------- #
# Endpoints
# --------------------------------------------------------------------------- #
@bp.get("")
@require_auth
@require_perm("mission.read")
def list_missions():
paging = _pagination_args()
if paging[0] is None:
return jsonify({"error": paging[1][1]}), paging[1][0]
limit, offset = paging
q = request.args.get("q") or None
status = request.args.get("status") or None
client = request.args.get("client") or None
include_deleted = request.args.get("include_deleted", "false").lower() == "true"
user = _current_user()
if include_deleted and not user.is_admin:
return jsonify({"error": "forbidden"}), 403
try:
items, total = svc.list_missions(
viewer_id=user.id,
viewer_is_admin=user.is_admin,
q=q,
status=status,
client=client,
include_deleted=include_deleted,
limit=limit,
offset=offset,
)
except ValueError as e:
return jsonify({"error": "invalid_request", "message": str(e)}), 400
return jsonify(
{
"items": [_serialize_list_item(it) for it in items],
"total": total,
"limit": limit,
"offset": offset,
}
)
@bp.get("/<mission_id>")
@require_auth
@require_perm("mission.read")
def get_mission(mission_id: str):
mid = _parse_uuid_or_400(mission_id)
if mid is None:
return jsonify({"error": "invalid_id"}), 400
include_deleted = request.args.get("include_deleted", "false").lower() == "true"
user = _current_user()
if include_deleted and not user.is_admin:
return jsonify({"error": "forbidden"}), 403
try:
view = svc.get_mission(
mid,
viewer_id=user.id,
viewer_is_admin=user.is_admin,
include_deleted=include_deleted,
)
except svc.MissionNotFound:
return jsonify({"error": "not_found"}), 404
return jsonify(_serialize_detail(view))
@bp.post("")
@require_auth
@require_perm("mission.create")
def create_mission():
try:
payload = CreateMissionPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400
user = _current_user()
try:
view = svc.create_mission(
name=payload.name,
creator_id=user.id,
creator_is_admin=user.is_admin,
client_target=payload.client_target,
date_start=payload.date_start,
date_end=payload.date_end,
description_md=payload.description_md,
scenario_template_ids=list(payload.scenario_template_ids),
members=_to_assignments(payload.members),
)
except svc.UnknownScenarioTemplate as e:
return jsonify({"error": "unknown_scenario_template", "message": str(e)}), 400
except svc.UnknownUser as e:
return jsonify({"error": "unknown_user", "message": str(e)}), 400
except svc.InvalidMemberPayload as e:
return jsonify({"error": "invalid_member", "message": str(e)}), 400
except ValueError as e:
return jsonify({"error": "invalid_request", "message": str(e)}), 400
log.info(
"metamorph.mission.created",
extra={
"mission_id": str(view.id),
"scenarios": view.scenarios_count,
"tests": view.tests_count,
"members": view.members_count,
},
)
return jsonify(_serialize_detail(view)), 201
@bp.put("/<mission_id>")
@require_auth
@require_perm("mission.update")
def update_mission(mission_id: str):
mid = _parse_uuid_or_400(mission_id)
if mid is None:
return jsonify({"error": "invalid_id"}), 400
raw = request.get_json(silent=True) or {}
try:
payload = UpdateMissionPayload.model_validate(raw)
except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400
# Distinguish "not provided" from "explicitly null" by looking at the raw body.
kwargs: dict[str, Any] = {}
if "name" in raw and payload.name is not None:
kwargs["name"] = payload.name
if "client_target" in raw:
kwargs["client_target"] = payload.client_target
if "date_start" in raw:
kwargs["date_start"] = payload.date_start
if "date_end" in raw:
kwargs["date_end"] = payload.date_end
if "description_md" in raw:
kwargs["description_md"] = payload.description_md
user = _current_user()
try:
view = svc.update_mission_metadata(
mid,
viewer_id=user.id,
viewer_is_admin=user.is_admin,
**kwargs,
)
except svc.MissionNotFound:
return jsonify({"error": "not_found"}), 404
except ValueError as e:
return jsonify({"error": "invalid_request", "message": str(e)}), 400
return jsonify(_serialize_detail(view))
@bp.post("/<mission_id>/scenarios")
@require_auth
@require_perm("mission.update")
def add_scenarios(mission_id: str):
mid = _parse_uuid_or_400(mission_id)
if mid is None:
return jsonify({"error": "invalid_id"}), 400
try:
payload = AddScenariosPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400
user = _current_user()
try:
view = svc.add_scenarios_to_mission(
mid,
list(payload.scenario_template_ids),
viewer_id=user.id,
viewer_is_admin=user.is_admin,
)
except svc.MissionNotFound:
return jsonify({"error": "not_found"}), 404
except svc.UnknownScenarioTemplate as e:
return jsonify({"error": "unknown_scenario_template", "message": str(e)}), 400
log.info(
"metamorph.mission.scenarios_added",
extra={
"mission_id": str(mid),
"added": len(payload.scenario_template_ids),
},
)
return jsonify(_serialize_detail(view))
@bp.put("/<mission_id>/members")
@require_auth
@require_perm("mission.update")
def set_members(mission_id: str):
mid = _parse_uuid_or_400(mission_id)
if mid is None:
return jsonify({"error": "invalid_id"}), 400
try:
payload = SetMembersPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400
user = _current_user()
try:
view = svc.set_mission_members(
mid,
_to_assignments(payload.members),
viewer_id=user.id,
viewer_is_admin=user.is_admin,
)
except svc.MissionNotFound:
return jsonify({"error": "not_found"}), 404
except svc.UnknownUser as e:
return jsonify({"error": "unknown_user", "message": str(e)}), 400
except svc.InvalidMemberPayload as e:
return jsonify({"error": "invalid_member", "message": str(e)}), 400
return jsonify(_serialize_detail(view))
@bp.post("/<mission_id>/transition")
@require_auth
@require_perm("mission.update", "mission.archive")
def transition(mission_id: str):
"""Status transition. The outer decorator gates the endpoint on holding
EITHER `mission.update` or `mission.archive` — so a request with neither
perm sees 403 before its body is even parsed (no shape leak via 400).
The inner refinement then enforces the per-target rule: `mission.archive`
is required when the target is `archived`; `mission.update` covers the
other transitions. Admins bypass via the decorator's `is_admin` check.
"""
mid = _parse_uuid_or_400(mission_id)
if mid is None:
return jsonify({"error": "invalid_id"}), 400
try:
payload = TransitionPayload.model_validate(request.get_json(silent=True) or {})
except ValidationError as e:
return jsonify({"error": "invalid_request", "details": e.errors()}), 400
user = _current_user()
required = "mission.archive" if payload.status == "archived" else "mission.update"
if not user.is_admin and required not in user.permissions:
log.info(
"metamorph.auth.permission_denied",
extra={
"user_id": str(user.id),
"required": [required],
"had": sorted(user.permissions),
},
)
return jsonify({"error": "forbidden"}), 403
try:
view = svc.transition_mission_status(
mid,
payload.status,
viewer_id=user.id,
viewer_is_admin=user.is_admin,
)
except svc.MissionNotFound:
return jsonify({"error": "not_found"}), 404
except svc.InvalidTransition as e:
return jsonify({"error": "invalid_transition", "message": str(e)}), 409
except ValueError as e:
return jsonify({"error": "invalid_request", "message": str(e)}), 400
log.info(
"metamorph.mission.transitioned",
extra={"mission_id": str(mid), "status": view.status},
)
return jsonify(_serialize_detail(view))
@bp.delete("/<mission_id>")
@require_auth
@require_perm("mission.delete")
def soft_delete_mission(mission_id: str):
mid = _parse_uuid_or_400(mission_id)
if mid is None:
return jsonify({"error": "invalid_id"}), 400
user = _current_user()
try:
svc.soft_delete_mission(
mid,
viewer_id=user.id,
viewer_is_admin=user.is_admin,
)
except svc.MissionNotFound:
return jsonify({"error": "not_found"}), 404
log.info("metamorph.mission.soft_deleted", extra={"mission_id": str(mid)})
return jsonify({"ok": True})

View File

@@ -56,6 +56,37 @@ def _parse_uuid_or_400(raw: str):
return None return None
@bp.get("/roster")
@require_auth
@require_perm("user.read", "mission.create", "mission.update")
def list_roster():
"""Minimal user list for mission member assignment.
Returns only `id`, `email`, `display_name` of active, non-deleted users.
Accessible to anyone who can create or update a mission — strictly lighter
than `GET /users`, which leaks `is_admin` (via groups), `is_active`, and
group memberships and is therefore reserved to `user.read`.
"""
q = request.args.get("q") or None
rows = users_svc.list_users(q=q, is_active=True, limit=200, offset=0)[0]
# Sort by email for predictable rendering and stable e2e selectors.
return jsonify(
{
"items": [
{
"id": str(u.id),
"email": u.email,
"display_name": u.display_name,
}
for u in sorted(
(u for u in rows if u.deleted_at is None),
key=lambda x: x.email,
)
]
}
)
@bp.get("") @bp.get("")
@require_auth @require_auth
@require_perm("user.read") @require_perm("user.read")

View File

@@ -9,6 +9,7 @@ from app.api.diag import bp as diag_bp
from app.api.groups import bp as groups_bp from app.api.groups import bp as groups_bp
from app.api.health import bp as health_bp from app.api.health import bp as health_bp
from app.api.invitations import bp as invitations_bp from app.api.invitations import bp as invitations_bp
from app.api.missions import bp as missions_bp
from app.api.mitre import bp as mitre_bp from app.api.mitre import bp as mitre_bp
from app.api.permissions import bp as permissions_bp from app.api.permissions import bp as permissions_bp
from app.api.scenario_templates import bp as scenario_templates_bp from app.api.scenario_templates import bp as scenario_templates_bp
@@ -28,3 +29,4 @@ bp.register_blueprint(permissions_bp)
bp.register_blueprint(mitre_bp) bp.register_blueprint(mitre_bp)
bp.register_blueprint(test_templates_bp) bp.register_blueprint(test_templates_bp)
bp.register_blueprint(scenario_templates_bp) bp.register_blueprint(scenario_templates_bp)
bp.register_blueprint(missions_bp)

View File

@@ -0,0 +1,944 @@
"""Mission CRUD + snapshot service.
A mission is a *materialised* run of one or more scenario templates: when the
mission is created (or scenarios are appended later), the service copies the
template rows into `mission_scenarios` / `mission_tests` / `mission_test_mitre_tags`
verbatim. Editing the source templates afterwards does not touch the mission —
that's the snapshot contract from spec §11.
Visibility rule (spec §4, last bullet): a non-admin user can only see a mission
they are a member of. The decorator layer enforces *which type of action* is
allowed (perm codes); this service enforces *which mission* is visible.
"""
from __future__ import annotations
import hashlib
import uuid
from dataclasses import dataclass
from datetime import date, datetime, timezone
from typing import Any, Iterable
from sqlalchemy import func, or_, select, text
from sqlalchemy.orm import Session, selectinload
from app.db.session import session_scope
from app.db.types import (
MISSION_ROLE_HINTS,
MISSION_STATUSES,
)
from app.models.auth import User
from app.models.mission import (
Mission,
MissionMember,
MissionScenario,
MissionTest,
MissionTestMitreTag,
)
from app.models.mitre import MitreSubtechnique, MitreTactic, MitreTechnique
from app.models.template import (
ScenarioTemplate,
TestTemplate,
TestTemplateMitreTag,
)
_UNSET: Any = object()
# Status transition graph. A target status that's not in the source's set is
# rejected as InvalidTransition. `archived` is a one-way sink (un-archiving
# would require an explicit restore endpoint, out of M6 scope).
_VALID_TRANSITIONS: dict[str, frozenset[str]] = {
"draft": frozenset({"in_progress", "archived"}),
"in_progress": frozenset({"completed", "archived"}),
"completed": frozenset({"archived"}),
"archived": frozenset(),
}
# --------------------------------------------------------------------------- #
# Exceptions
# --------------------------------------------------------------------------- #
class MissionNotFound(Exception):
"""Mission missing, soft-deleted, or not visible to the viewer."""
class UnknownScenarioTemplate(Exception):
pass
class UnknownUser(Exception):
pass
class InvalidTransition(Exception):
pass
class InvalidMemberPayload(Exception):
pass
# --------------------------------------------------------------------------- #
# Views (detached dataclasses — safe to return after session_scope exits)
# --------------------------------------------------------------------------- #
@dataclass(frozen=True)
class MemberAssignment:
"""Inbound member spec. The service resolves the user and validates the hint."""
user_id: uuid.UUID
role_hint: str
@dataclass(frozen=True)
class MissionMemberView:
user_id: uuid.UUID
user_email: str
user_display_name: str | None
role_hint: str
@dataclass(frozen=True)
class MissionMitreTagView:
kind: str
external_id: str
name: str
url: str | None
@dataclass(frozen=True)
class MissionTestView:
id: uuid.UUID
position: int
snapshot_name: str
snapshot_description: str | None
snapshot_objective: str | None
snapshot_procedure_md: str | None
snapshot_prerequisites_md: str | None
snapshot_expected_red_md: str | None
snapshot_expected_blue_md: str | None
snapshot_opsec_level: str
snapshot_tags: list[str]
snapshot_expected_iocs: list[str]
state: str
executed_at: datetime | None
executed_at_overridden: bool
mitre_tags: list[MissionMitreTagView]
source_test_template_id: uuid.UUID | None
@dataclass(frozen=True)
class MissionScenarioView:
id: uuid.UUID
position: int
snapshot_name: str
snapshot_description: str | None
tests: list[MissionTestView]
source_scenario_template_id: uuid.UUID | None
@dataclass(frozen=True)
class MissionListItemView:
id: uuid.UUID
name: str
client_target: str | None
date_start: date | None
date_end: date | None
status: str
description_md: str | None
visibility_mode: str
scenarios_count: int
tests_count: int
members_count: int
deleted_at: datetime | None
created_at: datetime
updated_at: datetime
@dataclass(frozen=True)
class MissionView:
id: uuid.UUID
name: str
client_target: str | None
date_start: date | None
date_end: date | None
status: str
description_md: str | None
visibility_mode: str
scenarios_count: int
tests_count: int
members_count: int
deleted_at: datetime | None
created_at: datetime
updated_at: datetime
scenarios: list[MissionScenarioView]
members: list[MissionMemberView]
# --------------------------------------------------------------------------- #
# Helpers
# --------------------------------------------------------------------------- #
def _opt_str(value: str | None) -> str | None:
if value is None:
return None
v = value.strip()
return v or None
def _normalize_name(value: str) -> str:
name = (value or "").strip()
if not name:
raise ValueError("name is required")
if len(name) > 255:
raise ValueError("name must be ≤ 255 characters")
return name
def _validate_dates(date_start: date | None, date_end: date | None) -> None:
if date_start and date_end and date_end < date_start:
raise ValueError("date_end must be on or after date_start")
def _validate_status(value: str) -> str:
if value not in MISSION_STATUSES:
raise ValueError(f"status must be one of {MISSION_STATUSES}")
return value
def _validate_role_hint(value: str) -> str:
if value not in MISSION_ROLE_HINTS:
raise InvalidMemberPayload(f"role_hint must be one of {MISSION_ROLE_HINTS}")
return value
def _escape_like(raw: str) -> str:
"""Escape LIKE wildcards so user-typed `%` / `_` / `\\` stay literal."""
return raw.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
def _lock_scenario_ids_for_snapshot(s: Session, scenario_ids: list[uuid.UUID]) -> None:
"""Acquire a per-scenario `pg_advisory_xact_lock` for every source scenario
we're about to snapshot.
Why: a concurrent admin invoking `set_scenario_tests(scenario_id)` (M5)
deletes-then-reinserts the `scenario_template_tests` join rows mid-transaction.
Under READ COMMITTED, `_snapshot_scenarios` could observe a partial view
(selectinload re-queries) and freeze a torn snapshot. Sharing the same lock
key as `app.services.scenario_templates.set_scenario_tests` makes the
snapshot wait until the reorder commits (and vice versa).
The lock keys are derived deterministically from the scenario UUIDs via
blake2b (cf. lessons: `hash()` is randomised per-worker). We sort the keys
before acquiring to avoid deadlocks with another snapshotter that holds
them in a different order.
"""
if not scenario_ids:
return
keys: list[int] = []
for sid in scenario_ids:
digest = hashlib.blake2b(sid.bytes, digest_size=8).digest()
keys.append(int.from_bytes(digest, "big", signed=True))
for key in sorted(keys):
s.execute(
text("SELECT pg_advisory_xact_lock(CAST(:key AS bigint))"),
{"key": key},
)
def _is_member(s: Session, mission_id: uuid.UUID, viewer_id: uuid.UUID) -> bool:
return (
s.scalar(
select(func.count())
.select_from(MissionMember)
.where(
MissionMember.mission_id == mission_id,
MissionMember.user_id == viewer_id,
)
)
or 0
) > 0
def _membership_filter(viewer_id: uuid.UUID):
"""SQL predicate restricting to missions where viewer_id is a member."""
return Mission.id.in_(
select(MissionMember.mission_id).where(MissionMember.user_id == viewer_id)
)
def _load_users_map(s: Session, ids: Iterable[uuid.UUID]) -> dict[uuid.UUID, User]:
ids_list = [i for i in ids]
if not ids_list:
return {}
rows = s.scalars(
select(User).where(User.id.in_(ids_list), User.deleted_at.is_(None))
).all()
return {u.id: u for u in rows}
# --------------------------------------------------------------------------- #
# MITRE denormalisation
# --------------------------------------------------------------------------- #
def _collect_mitre_ids(
tag_rows: Iterable[TestTemplateMitreTag],
) -> tuple[set[uuid.UUID], set[uuid.UUID], set[uuid.UUID]]:
tactic_ids: set[uuid.UUID] = set()
technique_ids: set[uuid.UUID] = set()
sub_ids: set[uuid.UUID] = set()
for tag in tag_rows:
if tag.mitre_kind == "tactic" and tag.tactic_id is not None:
tactic_ids.add(tag.tactic_id)
elif tag.mitre_kind == "technique" and tag.technique_id is not None:
technique_ids.add(tag.technique_id)
elif tag.mitre_kind == "subtechnique" and tag.subtechnique_id is not None:
sub_ids.add(tag.subtechnique_id)
return tactic_ids, technique_ids, sub_ids
def _resolve_mitre_lookup(
s: Session,
tactic_ids: set[uuid.UUID],
technique_ids: set[uuid.UUID],
sub_ids: set[uuid.UUID],
) -> tuple[
dict[uuid.UUID, MitreTactic],
dict[uuid.UUID, MitreTechnique],
dict[uuid.UUID, MitreSubtechnique],
]:
"""Batch-load all MITRE rows referenced by a snapshot in 3 queries."""
tactic_map: dict[uuid.UUID, MitreTactic] = {}
technique_map: dict[uuid.UUID, MitreTechnique] = {}
sub_map: dict[uuid.UUID, MitreSubtechnique] = {}
if tactic_ids:
tactic_map = {
r.id: r
for r in s.scalars(
select(MitreTactic).where(MitreTactic.id.in_(tactic_ids))
).all()
}
if technique_ids:
technique_map = {
r.id: r
for r in s.scalars(
select(MitreTechnique).where(MitreTechnique.id.in_(technique_ids))
).all()
}
if sub_ids:
sub_map = {
r.id: r
for r in s.scalars(
select(MitreSubtechnique).where(MitreSubtechnique.id.in_(sub_ids))
).all()
}
return tactic_map, technique_map, sub_map
def _snapshot_tag(
tag: TestTemplateMitreTag,
tactic_map: dict[uuid.UUID, MitreTactic],
technique_map: dict[uuid.UUID, MitreTechnique],
sub_map: dict[uuid.UUID, MitreSubtechnique],
) -> MissionTestMitreTag | None:
"""Convert a template's polymorphic MITRE tag into a frozen mission tag.
Returns None if the referenced MITRE row vanished between read and snapshot
(paranoid: should not happen inside one tx).
"""
if tag.mitre_kind == "tactic" and tag.tactic_id in tactic_map:
r = tactic_map[tag.tactic_id]
return MissionTestMitreTag(
mitre_kind="tactic",
mitre_external_id=r.external_id,
mitre_name=r.name,
mitre_url=r.url,
)
if tag.mitre_kind == "technique" and tag.technique_id in technique_map:
r = technique_map[tag.technique_id]
return MissionTestMitreTag(
mitre_kind="technique",
mitre_external_id=r.external_id,
mitre_name=r.name,
mitre_url=r.url,
)
if tag.mitre_kind == "subtechnique" and tag.subtechnique_id in sub_map:
r = sub_map[tag.subtechnique_id]
return MissionTestMitreTag(
mitre_kind="subtechnique",
mitre_external_id=r.external_id,
mitre_name=r.name,
mitre_url=r.url,
)
return None
# --------------------------------------------------------------------------- #
# Snapshot
# --------------------------------------------------------------------------- #
def _load_scenario_templates_for_snapshot(
s: Session, scenario_ids: list[uuid.UUID]
) -> dict[uuid.UUID, ScenarioTemplate]:
"""Load scenarios in eager-load mode and reject unknowns/soft-deleted upfront."""
if not scenario_ids:
return {}
rows = s.scalars(
select(ScenarioTemplate)
.options(selectinload(ScenarioTemplate.tests))
.where(ScenarioTemplate.id.in_(scenario_ids))
).all()
by_id = {sc.id: sc for sc in rows}
missing = set(scenario_ids) - by_id.keys()
if missing:
raise UnknownScenarioTemplate(
f"unknown scenario_template ids: {sorted(str(m) for m in missing)}"
)
deleted = [sc.id for sc in rows if sc.deleted_at is not None]
if deleted:
raise UnknownScenarioTemplate(
f"cannot snapshot soft-deleted scenario_template ids: "
f"{sorted(str(d) for d in deleted)}"
)
return by_id
def _snapshot_scenarios(
s: Session,
mission_id: uuid.UUID,
scenario_ids: list[uuid.UUID],
start_position: int,
) -> None:
"""Append `scenario_ids` as new MissionScenario+MissionTest rows under the mission.
Position counter continues from `start_position`. Each scenario_template's
`tests` order is preserved 1:1. MITRE tags on the source templates are
copied as denormalised `MissionTestMitreTag` rows (frozen external_id/name/url).
"""
if not scenario_ids:
return
_lock_scenario_ids_for_snapshot(s, scenario_ids)
sc_by_id = _load_scenario_templates_for_snapshot(s, scenario_ids)
# Collect the underlying test_template ids in stable order.
ordered_test_ids: list[uuid.UUID] = []
for sid in scenario_ids:
sc = sc_by_id[sid]
for link in sc.tests:
ordered_test_ids.append(link.test_template_id)
test_template_map: dict[uuid.UUID, TestTemplate] = {}
if ordered_test_ids:
test_template_rows = s.scalars(
select(TestTemplate)
.options(selectinload(TestTemplate.mitre_tags))
.where(TestTemplate.id.in_(set(ordered_test_ids)))
).all()
test_template_map = {t.id: t for t in test_template_rows}
# A test_template may be soft-deleted between the scenario authoring and
# the mission creation. We do not refuse the snapshot (the user expects
# the scenario's planned tests to appear); we just freeze the last
# known content, which is what a snapshot is for.
missing_t = set(ordered_test_ids) - test_template_map.keys()
if missing_t:
raise UnknownScenarioTemplate(
f"scenario references missing test_template ids: "
f"{sorted(str(m) for m in missing_t)}"
)
# Pre-load all MITRE rows referenced by any tag across all involved templates.
all_tag_rows: list[TestTemplateMitreTag] = []
for t in test_template_map.values():
all_tag_rows.extend(t.mitre_tags)
tactic_map, technique_map, sub_map = _resolve_mitre_lookup(
s, *_collect_mitre_ids(all_tag_rows)
)
pos = start_position
for sid in scenario_ids:
sc = sc_by_id[sid]
ms = MissionScenario(
mission_id=mission_id,
source_scenario_template_id=sc.id,
snapshot_name=sc.name,
snapshot_description=sc.description,
position=pos,
)
s.add(ms)
s.flush() # populate ms.id for the child tests
test_pos = 0
for link in sc.tests:
tt = test_template_map[link.test_template_id]
mt = MissionTest(
scenario_id=ms.id,
source_test_template_id=tt.id,
position=test_pos,
snapshot_name=tt.name,
snapshot_description=tt.description,
snapshot_objective=tt.objective,
snapshot_procedure_md=tt.procedure_md,
snapshot_prerequisites_md=tt.prerequisites_md,
snapshot_expected_red_md=tt.expected_result_red_md,
snapshot_expected_blue_md=tt.expected_detection_blue_md,
snapshot_opsec_level=tt.opsec_level,
snapshot_tags=list(tt.tags or []),
snapshot_expected_iocs=list(tt.expected_iocs or []),
state="pending",
)
s.add(mt)
s.flush()
for src_tag in tt.mitre_tags:
snap = _snapshot_tag(src_tag, tactic_map, technique_map, sub_map)
if snap is not None:
snap.mission_test_id = mt.id
s.add(snap)
test_pos += 1
pos += 1
s.flush()
# --------------------------------------------------------------------------- #
# View assembly
# --------------------------------------------------------------------------- #
def _member_views(s: Session, mission: Mission) -> list[MissionMemberView]:
if not mission.members:
return []
users = _load_users_map(s, [m.user_id for m in mission.members])
out: list[MissionMemberView] = []
for m in mission.members:
u = users.get(m.user_id)
out.append(
MissionMemberView(
user_id=m.user_id,
user_email=u.email if u else "<deleted>",
user_display_name=(u.display_name if u else None),
role_hint=m.role_hint,
)
)
out.sort(key=lambda mv: (mv.role_hint, mv.user_email))
return out
def _scenario_views(scenarios: list[MissionScenario]) -> list[MissionScenarioView]:
"""Assemble scenario views. `mission_scenarios` and `mission_tests` both
carry `SoftDeleteMixin`; M6 doesn't surface soft-deletion of those rows in
any endpoint, but the filter is applied here so future deletions (M7+)
don't drift the rendered list silently."""
views: list[MissionScenarioView] = []
live = [sc for sc in scenarios if sc.deleted_at is None]
for sc in sorted(live, key=lambda s_: s_.position):
test_views: list[MissionTestView] = []
live_tests = [t for t in sc.tests if t.deleted_at is None]
for t in sorted(live_tests, key=lambda t_: t_.position):
tag_views = [
MissionMitreTagView(
kind=tag.mitre_kind,
external_id=tag.mitre_external_id,
name=tag.mitre_name,
url=tag.mitre_url,
)
for tag in sorted(
t.mitre_tags, key=lambda tg: (tg.mitre_kind, tg.mitre_external_id)
)
]
test_views.append(
MissionTestView(
id=t.id,
position=t.position,
snapshot_name=t.snapshot_name,
snapshot_description=t.snapshot_description,
snapshot_objective=t.snapshot_objective,
snapshot_procedure_md=t.snapshot_procedure_md,
snapshot_prerequisites_md=t.snapshot_prerequisites_md,
snapshot_expected_red_md=t.snapshot_expected_red_md,
snapshot_expected_blue_md=t.snapshot_expected_blue_md,
snapshot_opsec_level=t.snapshot_opsec_level,
snapshot_tags=list(t.snapshot_tags or []),
snapshot_expected_iocs=list(t.snapshot_expected_iocs or []),
state=t.state,
executed_at=t.executed_at,
executed_at_overridden=t.executed_at_overridden,
mitre_tags=tag_views,
source_test_template_id=t.source_test_template_id,
)
)
views.append(
MissionScenarioView(
id=sc.id,
position=sc.position,
snapshot_name=sc.snapshot_name,
snapshot_description=sc.snapshot_description,
tests=test_views,
source_scenario_template_id=sc.source_scenario_template_id,
)
)
return views
def _to_detail_view(s: Session, m: Mission) -> MissionView:
scenarios = [sc for sc in m.scenarios if sc.deleted_at is None]
members = _member_views(s, m)
scenario_views = _scenario_views(scenarios)
tests_count = sum(len(sc.tests) for sc in scenario_views)
return MissionView(
id=m.id,
name=m.name,
client_target=m.client_target,
date_start=m.date_start,
date_end=m.date_end,
status=m.status,
description_md=m.description_md,
visibility_mode=m.visibility_mode,
scenarios_count=len(scenario_views),
tests_count=tests_count,
members_count=len(members),
deleted_at=m.deleted_at,
created_at=m.created_at,
updated_at=m.updated_at,
scenarios=scenario_views,
members=members,
)
def _to_list_item(m: Mission) -> MissionListItemView:
# Cheap counts via the loaded relationships (selectinloaded by the caller).
# We filter soft-deleted children consistently with `_scenario_views` so
# the list and the detail page agree.
live_scenarios = [sc for sc in m.scenarios if sc.deleted_at is None]
tests_count = sum(
len([t for t in sc.tests if t.deleted_at is None]) for sc in live_scenarios
)
return MissionListItemView(
id=m.id,
name=m.name,
client_target=m.client_target,
date_start=m.date_start,
date_end=m.date_end,
status=m.status,
description_md=m.description_md,
visibility_mode=m.visibility_mode,
scenarios_count=len(live_scenarios),
tests_count=tests_count,
members_count=len(m.members),
deleted_at=m.deleted_at,
created_at=m.created_at,
updated_at=m.updated_at,
)
# --------------------------------------------------------------------------- #
# Public API — list / get
# --------------------------------------------------------------------------- #
def list_missions(
*,
viewer_id: uuid.UUID,
viewer_is_admin: bool,
q: str | None = None,
status: str | None = None,
client: str | None = None,
include_deleted: bool = False,
limit: int = 100,
offset: int = 0,
) -> tuple[list[MissionListItemView], int]:
with session_scope() as s:
stmt = (
select(Mission)
.options(
selectinload(Mission.scenarios).selectinload(MissionScenario.tests),
selectinload(Mission.members),
)
.order_by(Mission.created_at.desc(), Mission.id.desc())
)
count_stmt = select(func.count()).select_from(Mission)
if not include_deleted:
stmt = stmt.where(Mission.deleted_at.is_(None))
count_stmt = count_stmt.where(Mission.deleted_at.is_(None))
if not viewer_is_admin:
stmt = stmt.where(_membership_filter(viewer_id))
count_stmt = count_stmt.where(_membership_filter(viewer_id))
if status:
_validate_status(status)
stmt = stmt.where(Mission.status == status)
count_stmt = count_stmt.where(Mission.status == status)
if client:
like = f"%{_escape_like(client.lower())}%"
cond = func.lower(Mission.client_target).like(like, escape="\\")
stmt = stmt.where(cond)
count_stmt = count_stmt.where(cond)
if q:
like = f"%{_escape_like(q.lower())}%"
cond = or_(
func.lower(Mission.name).like(like, escape="\\"),
func.lower(Mission.description_md).like(like, escape="\\"),
)
stmt = stmt.where(cond)
count_stmt = count_stmt.where(cond)
total = s.scalar(count_stmt) or 0
rows = s.scalars(
stmt.limit(max(1, min(limit, 500))).offset(max(0, offset))
).all()
return [_to_list_item(m) for m in rows], int(total)
def get_mission(
mission_id: uuid.UUID,
*,
viewer_id: uuid.UUID,
viewer_is_admin: bool,
include_deleted: bool = False,
) -> MissionView:
with session_scope() as s:
m = s.get(Mission, mission_id)
if m is None:
raise MissionNotFound()
if m.deleted_at is not None and not include_deleted:
raise MissionNotFound()
if not viewer_is_admin and not _is_member(s, mission_id, viewer_id):
raise MissionNotFound()
return _to_detail_view(s, m)
# --------------------------------------------------------------------------- #
# Public API — write
# --------------------------------------------------------------------------- #
def _validate_members(s: Session, members: list[MemberAssignment]) -> None:
"""Reject duplicates, bad role hints, unknown/soft-deleted users."""
seen: set[uuid.UUID] = set()
for m in members:
if m.user_id in seen:
raise InvalidMemberPayload(f"duplicate user_id: {m.user_id}")
seen.add(m.user_id)
_validate_role_hint(m.role_hint)
if not members:
return
user_map = _load_users_map(s, seen)
missing = seen - user_map.keys()
if missing:
raise UnknownUser(f"unknown or deleted user_ids: {sorted(str(u) for u in missing)}")
def create_mission(
*,
name: str,
creator_id: uuid.UUID,
creator_is_admin: bool,
client_target: str | None = None,
date_start: date | None = None,
date_end: date | None = None,
description_md: str | None = None,
scenario_template_ids: list[uuid.UUID] | None = None,
members: list[MemberAssignment] | None = None,
) -> MissionView:
"""Create a mission and snapshot the requested scenarios + their tests.
Side effect: if `creator_is_admin` is False and the creator is not in
`members`, they are added with `role_hint='red'`. This prevents the
non-admin creator from immediately losing visibility on the mission they
just created (membership-based visibility, see spec §4).
"""
name_norm = _normalize_name(name)
_validate_dates(date_start, date_end)
scenarios = list(scenario_template_ids or [])
members_list = list(members or [])
with session_scope() as s:
_validate_members(s, members_list)
# Auto-add the non-admin creator as a member so they retain visibility.
if not creator_is_admin and not any(m.user_id == creator_id for m in members_list):
members_list = [
MemberAssignment(user_id=creator_id, role_hint="red"),
*members_list,
]
# Defensive re-validation in case the creator id was bogus.
_validate_members(s, members_list)
mission = Mission(
name=name_norm,
client_target=_opt_str(client_target),
date_start=date_start,
date_end=date_end,
description_md=_opt_str(description_md),
status="draft",
visibility_mode="whitebox",
)
s.add(mission)
s.flush()
for m in members_list:
s.add(
MissionMember(
mission_id=mission.id,
user_id=m.user_id,
role_hint=m.role_hint,
)
)
if scenarios:
_snapshot_scenarios(s, mission.id, scenarios, start_position=0)
s.flush()
s.refresh(mission)
return _to_detail_view(s, mission)
def update_mission_metadata(
mission_id: uuid.UUID,
*,
viewer_id: uuid.UUID,
viewer_is_admin: bool,
name: str | None = None,
client_target: Any = _UNSET,
date_start: Any = _UNSET,
date_end: Any = _UNSET,
description_md: Any = _UNSET,
) -> MissionView:
with session_scope() as s:
m = s.get(Mission, mission_id)
if m is None or m.deleted_at is not None:
raise MissionNotFound()
if not viewer_is_admin and not _is_member(s, mission_id, viewer_id):
raise MissionNotFound()
if name is not None:
m.name = _normalize_name(name)
if client_target is not _UNSET:
m.client_target = _opt_str(client_target)
if date_start is not _UNSET:
m.date_start = date_start
if date_end is not _UNSET:
m.date_end = date_end
# Validate the combined date pair regardless of which side was passed.
_validate_dates(m.date_start, m.date_end)
if description_md is not _UNSET:
m.description_md = _opt_str(description_md)
s.flush()
s.refresh(m)
return _to_detail_view(s, m)
def add_scenarios_to_mission(
mission_id: uuid.UUID,
scenario_template_ids: list[uuid.UUID],
*,
viewer_id: uuid.UUID,
viewer_is_admin: bool,
) -> MissionView:
"""Append more snapshot scenarios to an existing mission.
They land at `current_max_position + 1` and onwards. Empty list is a no-op
and just returns the current view.
"""
with session_scope() as s:
m = s.get(Mission, mission_id)
if m is None or m.deleted_at is not None:
raise MissionNotFound()
if not viewer_is_admin and not _is_member(s, mission_id, viewer_id):
raise MissionNotFound()
if scenario_template_ids:
max_pos = s.scalar(
select(func.coalesce(func.max(MissionScenario.position), -1)).where(
MissionScenario.mission_id == mission_id
)
)
_snapshot_scenarios(
s,
mission_id,
list(scenario_template_ids),
start_position=int(max_pos) + 1,
)
s.flush()
s.refresh(m)
return _to_detail_view(s, m)
def set_mission_members(
mission_id: uuid.UUID,
members: list[MemberAssignment],
*,
viewer_id: uuid.UUID,
viewer_is_admin: bool,
) -> MissionView:
"""Replace the entire member set. Wipe + insert, like the scenario reorder."""
with session_scope() as s:
m = s.get(Mission, mission_id)
if m is None or m.deleted_at is not None:
raise MissionNotFound()
if not viewer_is_admin and not _is_member(s, mission_id, viewer_id):
raise MissionNotFound()
_validate_members(s, members)
for link in list(m.members):
s.delete(link)
s.flush()
for assignment in members:
s.add(
MissionMember(
mission_id=m.id,
user_id=assignment.user_id,
role_hint=assignment.role_hint,
)
)
s.flush()
s.refresh(m)
return _to_detail_view(s, m)
def transition_mission_status(
mission_id: uuid.UUID,
target_status: str,
*,
viewer_id: uuid.UUID,
viewer_is_admin: bool,
) -> MissionView:
"""Move the mission's status one step along the lifecycle graph."""
_validate_status(target_status)
with session_scope() as s:
m = s.get(Mission, mission_id)
if m is None or m.deleted_at is not None:
raise MissionNotFound()
if not viewer_is_admin and not _is_member(s, mission_id, viewer_id):
raise MissionNotFound()
if target_status == m.status:
# No-op transitions are valid: a client retry should not 409.
s.refresh(m)
return _to_detail_view(s, m)
allowed = _VALID_TRANSITIONS.get(m.status, frozenset())
if target_status not in allowed:
raise InvalidTransition(
f"cannot transition from {m.status!r} to {target_status!r}"
)
m.status = target_status
s.flush()
s.refresh(m)
return _to_detail_view(s, m)
def soft_delete_mission(
mission_id: uuid.UUID,
*,
viewer_id: uuid.UUID,
viewer_is_admin: bool,
) -> None:
with session_scope() as s:
m = s.get(Mission, mission_id)
if m is None or m.deleted_at is not None:
raise MissionNotFound()
if not viewer_is_admin and not _is_member(s, mission_id, viewer_id):
raise MissionNotFound()
m.deleted_at = datetime.now(tz=timezone.utc)

View File

@@ -0,0 +1,781 @@
"""M6 — Mission CRUD, snapshot fidelity, membership visibility, transitions.
The fixture stack mirrors `test_templates.py`: one shared `app` per module,
fresh truncate at the start, a minimal MITRE bundle seeded for tag resolution,
plus a small catalogue of test_templates and scenario_templates created via
the admin API so the snapshot path is exercised end-to-end (not via raw ORM).
"""
from __future__ import annotations
import json
import secrets
import pytest
from sqlalchemy import text
from app.core.install_token import regenerate_install_token
from app.main import create_app
from app.services import mitre_seed as mitre_svc
def _truncate_all(engine):
with engine.begin() as conn:
# Order matches /diag/reset: missions before templates before MITRE.
conn.execute(
text(
"TRUNCATE mission_test_mitre_tags, mission_tests, "
"mission_scenarios, mission_categories, mission_members, "
"missions RESTART IDENTITY CASCADE"
)
)
conn.execute(
text(
"TRUNCATE scenario_template_tests, scenario_templates, "
"test_template_mitre_tags, test_templates "
"RESTART IDENTITY CASCADE"
)
)
conn.execute(
text(
"TRUNCATE users, refresh_tokens, invitations, invitation_groups, "
"user_groups, group_permissions, permissions, settings, groups "
"RESTART IDENTITY CASCADE"
)
)
conn.execute(
text(
"TRUNCATE mitre_technique_tactics, mitre_subtechniques, "
"mitre_techniques, mitre_tactics RESTART IDENTITY CASCADE"
)
)
_MINIMAL_BUNDLE = {
"type": "bundle",
"id": "bundle--00000000-0000-0000-0000-000000000006",
"spec_version": "2.1",
"objects": [
{
"type": "x-mitre-tactic",
"id": "x-mitre-tactic--ta0002",
"name": "Execution",
"x_mitre_shortname": "execution",
"external_references": [
{"source_name": "mitre-attack", "external_id": "TA0002"}
],
},
{
"type": "attack-pattern",
"id": "attack-pattern--t1059",
"name": "Command and Scripting Interpreter",
"kill_chain_phases": [
{"kill_chain_name": "mitre-attack", "phase_name": "execution"}
],
"external_references": [
{"source_name": "mitre-attack", "external_id": "T1059"}
],
},
{
"type": "attack-pattern",
"id": "attack-pattern--t1059-001",
"name": "PowerShell",
"x_mitre_is_subtechnique": True,
"external_references": [
{"source_name": "mitre-attack", "external_id": "T1059.001"}
],
},
{
"type": "relationship",
"id": "relationship--rel1",
"relationship_type": "subtechnique-of",
"source_ref": "attack-pattern--t1059-001",
"target_ref": "attack-pattern--t1059",
},
],
}
@pytest.fixture(scope="module")
def app(db_engine_or_skip, tmp_path_factory):
_truncate_all(db_engine_or_skip)
bundle_path = tmp_path_factory.mktemp("m6") / "stix.json"
bundle_path.write_text(json.dumps(_MINIMAL_BUNDLE))
mitre_svc.seed_mitre(source=bundle_path, expected_sha256=None)
flask_app = create_app()
flask_app.config.update(TESTING=True)
return flask_app
@pytest.fixture()
def client(app):
return app.test_client()
def _unique_email(prefix: str) -> str:
return f"{prefix}-{secrets.token_hex(4)}@metamorph.local"
def _bearer(token: str) -> dict[str, str]:
return {"Authorization": f"Bearer {token}"}
def _login(client, email: str, password: str) -> str:
r = client.post("/api/v1/auth/login", json={"email": email, "password": password})
assert r.status_code == 200, r.get_data(as_text=True)
return r.get_json()["access_token"]
@pytest.fixture(scope="module")
def admin(app):
token = regenerate_install_token()
email = _unique_email("admin")
password = "AdminPass1234!"
with app.test_client() as c:
r = c.post(
"/api/v1/setup",
json={"install_token": token, "email": email, "password": password},
)
assert r.status_code == 201, r.get_data(as_text=True)
return {"email": email, "password": password}
@pytest.fixture()
def admin_token(client, admin) -> str:
return _login(client, admin["email"], admin["password"])
# ---------------------------------------------------------------- catalogue --
def _mitre_kind(external_id: str) -> str:
if external_id.startswith("TA"):
return "tactic"
if "." in external_id:
return "subtechnique"
return "technique"
def _make_test_template(client, admin_token: str, *, name: str, mitre: str = "T1059"):
body = {
"name": name,
"description": "auto",
"objective": "do thing",
"procedure_md": f"# {name}\n1. run",
"expected_result_red_md": "red expectation",
"expected_detection_blue_md": "blue expectation",
"opsec_level": "medium",
"tags": ["fast"],
"expected_iocs": ["evil.exe"],
"mitre_tags": [{"kind": _mitre_kind(mitre), "external_id": mitre}],
}
r = client.post("/api/v1/test-templates", headers=_bearer(admin_token), json=body)
assert r.status_code == 201, r.get_data(as_text=True)
return r.get_json()
def _make_scenario(client, admin_token: str, *, name: str, test_ids: list[str]):
r = client.post(
"/api/v1/scenario-templates",
headers=_bearer(admin_token),
json={
"name": name,
"description": f"auto-{name}",
"test_template_ids": test_ids,
},
)
assert r.status_code == 201, r.get_data(as_text=True)
return r.get_json()
@pytest.fixture(scope="module")
def catalogue(app, admin):
"""Pre-seeded templates + scenarios so tests can reference them by id."""
with app.test_client() as c:
tok = _login(c, admin["email"], admin["password"])
t1 = _make_test_template(c, tok, name="cat-test-1", mitre="T1059")
t2 = _make_test_template(
c, tok, name="cat-test-2", mitre="T1059.001"
)
t3 = _make_test_template(c, tok, name="cat-test-3", mitre="T1059")
sc_one = _make_scenario(
c, tok, name="cat-scenario-A", test_ids=[t1["id"], t2["id"], t3["id"]]
)
sc_solo = _make_scenario(c, tok, name="cat-scenario-B", test_ids=[t1["id"]])
return {
"tests": {"t1": t1, "t2": t2, "t3": t3},
"scenarios": {"a": sc_one, "b": sc_solo},
}
# --------------------------------------------------------------------- users --
def _invite_user(client, admin_token: str, prefix: str, group_codes: list[str]) -> dict:
"""Invite a user pre-bound to a freshly-minted group with the listed perm codes."""
grp = client.post(
"/api/v1/groups",
headers=_bearer(admin_token),
json={"name": f"{prefix}-grp-{secrets.token_hex(2)}"},
).get_json()
r_set = client.put(
f"/api/v1/groups/{grp['id']}/permissions",
headers=_bearer(admin_token),
json={"codes": group_codes},
)
assert r_set.status_code == 200, r_set.get_data(as_text=True)
email = _unique_email(prefix)
password = "Pass1234!"
inv = client.post(
"/api/v1/invitations",
headers=_bearer(admin_token),
json={"email_hint": email, "group_ids": [grp["id"]]},
)
assert inv.status_code == 201, inv.get_data(as_text=True)
accept_token = inv.get_json()["token"]
r = client.post(
f"/api/v1/invitations/accept/{accept_token}",
json={"email": email, "password": password},
)
assert r.status_code == 201, r.get_data(as_text=True)
me_token = _login(client, email, password)
me = client.get("/api/v1/auth/me", headers=_bearer(me_token)).get_json()
return {"email": email, "password": password, "token": me_token, "id": me["id"]}
@pytest.fixture()
def red_user(client, admin_token):
return _invite_user(
client,
admin_token,
"red",
[
"mission.read",
"mission.create",
"mission.update",
"mission.archive",
"mission.write_red_fields",
],
)
@pytest.fixture()
def blue_user(client, admin_token):
return _invite_user(
client,
admin_token,
"blue",
["mission.read", "mission.write_blue_fields"],
)
@pytest.fixture()
def reader_user(client, admin_token):
"""A user with mission.read only — for "non-member can't see" checks."""
return _invite_user(client, admin_token, "reader", ["mission.read"])
@pytest.fixture()
def noperm_user(client, admin_token):
"""A user with no mission perms at all."""
return _invite_user(client, admin_token, "noperm", [])
# ================================================================ snapshot ==
def test_create_mission_snapshots_scenarios_and_tests(client, admin_token, catalogue):
r = client.post(
"/api/v1/missions",
headers=_bearer(admin_token),
json={
"name": "snapshot-fidelity",
"client_target": "Acme Corp",
"description_md": "## ROE\n- approved\n",
"scenario_template_ids": [catalogue["scenarios"]["a"]["id"]],
},
)
assert r.status_code == 201, r.get_data(as_text=True)
body = r.get_json()
assert body["status"] == "draft"
assert body["visibility_mode"] == "whitebox"
assert body["scenarios_count"] == 1
assert body["tests_count"] == 3
assert body["members_count"] == 0 # admin creator is not auto-added
sc = body["scenarios"][0]
assert sc["position"] == 0
assert sc["snapshot_name"] == "cat-scenario-A"
names_in_order = [t["snapshot_name"] for t in sc["tests"]]
assert names_in_order == ["cat-test-1", "cat-test-2", "cat-test-3"]
# MITRE denormalised into the snapshot
t1 = next(t for t in sc["tests"] if t["snapshot_name"] == "cat-test-1")
kinds = [(tag["kind"], tag["external_id"]) for tag in t1["mitre_tags"]]
assert kinds == [("technique", "T1059")]
def test_snapshot_is_frozen_after_template_edits(client, admin_token, catalogue):
create = client.post(
"/api/v1/missions",
headers=_bearer(admin_token),
json={
"name": "frozen-after-edits",
"scenario_template_ids": [catalogue["scenarios"]["b"]["id"]],
},
)
assert create.status_code == 201
mission_id = create.get_json()["id"]
# Mutate the source test_template: rename + change MITRE
edit = client.put(
f"/api/v1/test-templates/{catalogue['tests']['t1']['id']}",
headers=_bearer(admin_token),
json={
"name": "RENAMED-AFTER-SNAPSHOT",
"mitre_tags": [{"kind": "tactic", "external_id": "TA0002"}],
},
)
assert edit.status_code == 200
# Mission still sees the pre-edit snapshot
again = client.get(
f"/api/v1/missions/{mission_id}", headers=_bearer(admin_token)
).get_json()
sc = again["scenarios"][0]
assert sc["tests"][0]["snapshot_name"] == "cat-test-1"
assert [(t["kind"], t["external_id"]) for t in sc["tests"][0]["mitre_tags"]] == [
("technique", "T1059")
]
# Revert the rename so other tests still find the original name
client.put(
f"/api/v1/test-templates/{catalogue['tests']['t1']['id']}",
headers=_bearer(admin_token),
json={
"name": "cat-test-1",
"mitre_tags": [{"kind": "technique", "external_id": "T1059"}],
},
)
def test_create_mission_rejects_unknown_scenario(client, admin_token):
r = client.post(
"/api/v1/missions",
headers=_bearer(admin_token),
json={
"name": "bad-ref",
"scenario_template_ids": ["00000000-0000-0000-0000-000000000099"],
},
)
assert r.status_code == 400
assert r.get_json()["error"] == "unknown_scenario_template"
def test_create_mission_rejects_soft_deleted_scenario(client, admin_token):
# Build a scenario, soft-delete it, then try to snapshot — must 400 with
# `unknown_scenario_template` so we don't silently freeze a tombstoned
# template into a new mission.
t = _make_test_template(client, admin_token, name="sd-rejection-t")
sc = client.post(
"/api/v1/scenario-templates",
headers=_bearer(admin_token),
json={"name": "sd-rejection-sc", "test_template_ids": [t["id"]]},
).get_json()
del_r = client.delete(
f"/api/v1/scenario-templates/{sc['id']}", headers=_bearer(admin_token)
)
assert del_r.status_code == 200
r = client.post(
"/api/v1/missions",
headers=_bearer(admin_token),
json={
"name": "sd-rejection-mission",
"scenario_template_ids": [sc["id"]],
},
)
assert r.status_code == 400
assert r.get_json()["error"] == "unknown_scenario_template"
def test_create_mission_validates_dates(client, admin_token):
r = client.post(
"/api/v1/missions",
headers=_bearer(admin_token),
json={
"name": "date-flip",
"date_start": "2026-06-01",
"date_end": "2026-05-01",
},
)
assert r.status_code == 400
assert "date_end" in r.get_json().get("message", "").lower()
# =================================================== membership visibility ==
def test_non_admin_creator_auto_added(client, red_user, catalogue):
r = client.post(
"/api/v1/missions",
headers=_bearer(red_user["token"]),
json={
"name": "red-self-created",
"scenario_template_ids": [catalogue["scenarios"]["b"]["id"]],
},
)
assert r.status_code == 201, r.get_data(as_text=True)
body = r.get_json()
assert body["members_count"] == 1
assert body["members"][0]["user_id"] == red_user["id"]
assert body["members"][0]["role_hint"] == "red"
# And the red user can see it back via /missions
r2 = client.get("/api/v1/missions", headers=_bearer(red_user["token"]))
ids = [it["id"] for it in r2.get_json()["items"]]
assert body["id"] in ids
def test_non_admin_cannot_see_missions_they_are_not_members_of(
client, admin_token, reader_user, catalogue
):
# Admin creates a mission with NO members
create = client.post(
"/api/v1/missions",
headers=_bearer(admin_token),
json={
"name": "hidden-from-reader",
"scenario_template_ids": [catalogue["scenarios"]["b"]["id"]],
},
)
assert create.status_code == 201
mid = create.get_json()["id"]
# Reader has mission.read but is not a member → empty list + 404
r_list = client.get("/api/v1/missions", headers=_bearer(reader_user["token"]))
ids = [it["id"] for it in r_list.get_json()["items"]]
assert mid not in ids
r_get = client.get(
f"/api/v1/missions/{mid}", headers=_bearer(reader_user["token"])
)
assert r_get.status_code == 404
def test_non_member_get_returns_404_not_403(client, admin_token, reader_user):
"""Existence leak guard: non-members should see 404, not 403."""
create = client.post(
"/api/v1/missions",
headers=_bearer(admin_token),
json={"name": "stealth-mission"},
)
mid = create.get_json()["id"]
r = client.get(f"/api/v1/missions/{mid}", headers=_bearer(reader_user["token"]))
assert r.status_code == 404
# ===================================================== perm gating ==========
def test_create_requires_mission_create_perm(client, blue_user):
"""Blue team users (no mission.create) cannot create missions."""
r = client.post(
"/api/v1/missions",
headers=_bearer(blue_user["token"]),
json={"name": "no-perm"},
)
assert r.status_code == 403
def test_list_requires_mission_read_perm(client, noperm_user):
r = client.get("/api/v1/missions", headers=_bearer(noperm_user["token"]))
assert r.status_code == 403
def test_transition_perm_gate_runs_before_payload_parse(client, blue_user):
"""A user without `mission.update` or `mission.archive` should see 403,
not 400, even when posting a malformed body — otherwise the endpoint's
shape leaks via the validation error message."""
# blue_user only has mission.read + mission.write_blue_fields, so neither
# mission.update nor mission.archive is held.
r = client.post(
"/api/v1/missions/00000000-0000-0000-0000-000000000000/transition",
headers=_bearer(blue_user["token"]),
json={"status": "garbage-not-a-valid-shape"},
)
assert r.status_code == 403
def test_search_treats_wildcards_as_literals(client, admin_token):
"""User-typed `%` and `_` must NOT act as SQL LIKE wildcards."""
client.post(
"/api/v1/missions",
headers=_bearer(admin_token),
json={"name": "no-wildcards-here"},
)
# Without escaping, `?q=%` would match every mission. With escaping, it
# only matches names that literally contain `%`.
r = client.get("/api/v1/missions?q=%25", headers=_bearer(admin_token))
assert r.status_code == 200
names = [it["name"] for it in r.get_json()["items"]]
assert "no-wildcards-here" not in names
def test_archive_requires_mission_archive_not_just_update(client, admin_token):
"""A user with mission.update but no mission.archive cannot archive."""
# blue_user only has mission.read + mission.write_blue_fields — no update either.
# We'll craft a user with update-only here.
update_only = _invite_user(client, admin_token, "u-only", ["mission.read", "mission.update"])
create = client.post(
"/api/v1/missions",
headers=_bearer(admin_token),
json={
"name": "to-archive",
"members": [{"user_id": update_only["id"], "role_hint": "red"}],
},
)
assert create.status_code == 201
mid = create.get_json()["id"]
# update-only can transition to in_progress (mission.update is enough)
r1 = client.post(
f"/api/v1/missions/{mid}/transition",
headers=_bearer(update_only["token"]),
json={"status": "in_progress"},
)
assert r1.status_code == 200
# … but cannot archive
r2 = client.post(
f"/api/v1/missions/{mid}/transition",
headers=_bearer(update_only["token"]),
json={"status": "archived"},
)
assert r2.status_code == 403
# ==================================================== status transitions ==
def test_valid_transition_chain(client, admin_token):
create = client.post(
"/api/v1/missions",
headers=_bearer(admin_token),
json={"name": "chain"},
)
mid = create.get_json()["id"]
for target in ("in_progress", "completed", "archived"):
r = client.post(
f"/api/v1/missions/{mid}/transition",
headers=_bearer(admin_token),
json={"status": target},
)
assert r.status_code == 200, (target, r.get_data(as_text=True))
assert r.get_json()["status"] == target
def test_invalid_transition_409(client, admin_token):
create = client.post(
"/api/v1/missions",
headers=_bearer(admin_token),
json={"name": "invalid-jump"},
)
mid = create.get_json()["id"]
# draft → completed is not allowed (must pass through in_progress)
r = client.post(
f"/api/v1/missions/{mid}/transition",
headers=_bearer(admin_token),
json={"status": "completed"},
)
assert r.status_code == 409
assert r.get_json()["error"] == "invalid_transition"
def test_unknown_target_status_400(client, admin_token):
create = client.post(
"/api/v1/missions",
headers=_bearer(admin_token),
json={"name": "bad-status"},
)
mid = create.get_json()["id"]
r = client.post(
f"/api/v1/missions/{mid}/transition",
headers=_bearer(admin_token),
json={"status": "delivered"},
)
assert r.status_code == 400
def test_idempotent_same_status_transition(client, admin_token):
create = client.post(
"/api/v1/missions",
headers=_bearer(admin_token),
json={"name": "idempotent"},
)
mid = create.get_json()["id"]
r = client.post(
f"/api/v1/missions/{mid}/transition",
headers=_bearer(admin_token),
json={"status": "draft"},
)
assert r.status_code == 200
assert r.get_json()["status"] == "draft"
# ============================================================ members =====
def test_set_members_replaces_full_set(client, admin_token, red_user, blue_user):
create = client.post(
"/api/v1/missions",
headers=_bearer(admin_token),
json={
"name": "members-replace",
"members": [{"user_id": red_user["id"], "role_hint": "red"}],
},
)
mid = create.get_json()["id"]
r = client.put(
f"/api/v1/missions/{mid}/members",
headers=_bearer(admin_token),
json={"members": [{"user_id": blue_user["id"], "role_hint": "blue"}]},
)
assert r.status_code == 200, r.get_data(as_text=True)
body = r.get_json()
assert body["members_count"] == 1
assert body["members"][0]["user_id"] == blue_user["id"]
# And red can no longer see it
r_red = client.get(
f"/api/v1/missions/{mid}", headers=_bearer(red_user["token"])
)
assert r_red.status_code == 404
def test_set_members_rejects_unknown_user(client, admin_token):
create = client.post(
"/api/v1/missions", headers=_bearer(admin_token), json={"name": "ghost-member"}
)
mid = create.get_json()["id"]
r = client.put(
f"/api/v1/missions/{mid}/members",
headers=_bearer(admin_token),
json={
"members": [
{
"user_id": "00000000-0000-0000-0000-000000000123",
"role_hint": "red",
}
]
},
)
assert r.status_code == 400
assert r.get_json()["error"] == "unknown_user"
def test_set_members_rejects_bad_role_hint(client, admin_token, red_user):
create = client.post(
"/api/v1/missions", headers=_bearer(admin_token), json={"name": "bad-hint"}
)
mid = create.get_json()["id"]
r = client.put(
f"/api/v1/missions/{mid}/members",
headers=_bearer(admin_token),
json={"members": [{"user_id": red_user["id"], "role_hint": "yellow"}]},
)
assert r.status_code == 400
# ==================================================== add scenarios ======
def test_add_scenarios_appends_at_end(client, admin_token, catalogue):
create = client.post(
"/api/v1/missions",
headers=_bearer(admin_token),
json={
"name": "appendable",
"scenario_template_ids": [catalogue["scenarios"]["b"]["id"]],
},
)
mid = create.get_json()["id"]
r = client.post(
f"/api/v1/missions/{mid}/scenarios",
headers=_bearer(admin_token),
json={"scenario_template_ids": [catalogue["scenarios"]["a"]["id"]]},
)
assert r.status_code == 200, r.get_data(as_text=True)
body = r.get_json()
assert body["scenarios_count"] == 2
positions = [sc["position"] for sc in body["scenarios"]]
assert positions == [0, 1]
# Second scenario lands at position 1
sc1 = next(sc for sc in body["scenarios"] if sc["position"] == 1)
assert sc1["snapshot_name"] == "cat-scenario-A"
assert len(sc1["tests"]) == 3
# ============================================================= delete =====
def test_soft_delete_hides_from_list(client, admin_token):
create = client.post(
"/api/v1/missions", headers=_bearer(admin_token), json={"name": "to-delete"}
)
mid = create.get_json()["id"]
r_del = client.delete(f"/api/v1/missions/{mid}", headers=_bearer(admin_token))
assert r_del.status_code == 200
r_list = client.get("/api/v1/missions", headers=_bearer(admin_token))
ids = [it["id"] for it in r_list.get_json()["items"]]
assert mid not in ids
# include_deleted=true brings it back (admin only)
r_list2 = client.get(
"/api/v1/missions?include_deleted=true", headers=_bearer(admin_token)
)
ids2 = [it["id"] for it in r_list2.get_json()["items"]]
assert mid in ids2
def test_include_deleted_forbidden_for_non_admin(client, red_user):
r = client.get(
"/api/v1/missions?include_deleted=true", headers=_bearer(red_user["token"])
)
assert r.status_code == 403
# ============================================================ update ======
def test_update_metadata_partial(client, admin_token):
create = client.post(
"/api/v1/missions",
headers=_bearer(admin_token),
json={
"name": "to-rename",
"client_target": "X",
"date_start": "2026-06-01",
"date_end": "2026-06-10",
},
)
mid = create.get_json()["id"]
r = client.put(
f"/api/v1/missions/{mid}",
headers=_bearer(admin_token),
json={"name": "renamed", "client_target": None},
)
assert r.status_code == 200, r.get_data(as_text=True)
body = r.get_json()
assert body["name"] == "renamed"
assert body["client_target"] is None
# date fields untouched
assert body["date_start"] == "2026-06-01"
assert body["date_end"] == "2026-06-10"
def test_update_rejects_inverted_dates(client, admin_token):
create = client.post(
"/api/v1/missions",
headers=_bearer(admin_token),
json={
"name": "invert",
"date_start": "2026-06-01",
"date_end": "2026-06-10",
},
)
mid = create.get_json()["id"]
r = client.put(
f"/api/v1/missions/{mid}",
headers=_bearer(admin_token),
json={"date_end": "2026-05-01"},
)
assert r.status_code == 400

View File

@@ -0,0 +1,321 @@
import { expect, test, type APIRequestContext, type Page } from '@playwright/test';
/**
* M6 — Mission CRUD, snapshot fidelity, membership visibility, transitions.
*
* The suite covers:
* - Snapshot independence (mutating a template after mission creation must NOT
* propagate into the mission's snapshot).
* - Membership visibility (non-admin viewers see only their own missions).
* - Status transition state machine (draft → in_progress → completed → archived).
* - SPA: list + 3-step create wizard + detail page tabs.
*
* Template + MITRE seed are pulled in `beforeAll`; the `afterAll` hook restores
* the stable admin (memory rule `feedback-metamorph-test-admin`) and re-seeds
* MITRE so subsequent manual sessions don't see an empty matrix.
*/
const ADMIN_EMAIL = `m6-admin-${crypto.randomUUID().slice(0, 8)}@metamorph.local`;
const ADMIN_PASSWORD = 'AdminPass1234!';
async function resetAndMintToken(request: APIRequestContext): Promise<string> {
const r = await request.post('/api/v1/diag/reset');
expect(r.status()).toBe(200);
return (await r.json()).install_token as string;
}
async function loginAndGetAccess(
request: APIRequestContext,
email: string,
password: string,
): Promise<string> {
const r = await request.post('/api/v1/auth/login', { data: { email, password } });
expect(r.status()).toBe(200);
return (await r.json()).access_token as string;
}
async function loginViaSpa(page: Page, email: string, password: string) {
await page.goto('/login');
await page.getByLabel(/email/i).fill(email);
await page.getByLabel(/password/i).fill(password);
await page.getByRole('button', { name: /sign in/i }).click();
await expect(page.getByTestId('me-email')).toHaveText(email);
}
test.describe.configure({ mode: 'serial' });
test.describe('M6 — Missions', () => {
test.beforeAll(async ({ request }) => {
const installToken = await resetAndMintToken(request);
const setup = await request.post('/api/v1/setup', {
data: { install_token: installToken, email: ADMIN_EMAIL, password: ADMIN_PASSWORD },
});
expect(setup.status()).toBe(201);
const access = await loginAndGetAccess(request, ADMIN_EMAIL, ADMIN_PASSWORD);
const sync = await request.post('/api/v1/mitre/sync', {
headers: { Authorization: `Bearer ${access}` },
});
expect(sync.status()).toBe(200);
});
test.afterAll(async ({ request }) => {
const installToken = await resetAndMintToken(request);
await request.post('/api/v1/setup', {
data: {
install_token: installToken,
email: 'admin@metamorph.local',
password: 'AdminPass1234!',
},
});
const access = await loginAndGetAccess(request, 'admin@metamorph.local', 'AdminPass1234!');
await request.post('/api/v1/mitre/sync', {
headers: { Authorization: `Bearer ${access}` },
});
});
// ---------- helpers ----------------------------------------------------
async function adminAuth(request: APIRequestContext): Promise<Record<string, string>> {
const access = await loginAndGetAccess(request, ADMIN_EMAIL, ADMIN_PASSWORD);
return { Authorization: `Bearer ${access}` };
}
async function makeTest(
request: APIRequestContext,
auth: Record<string, string>,
name: string,
mitre = 'T1059',
): Promise<string> {
const r = await request.post('/api/v1/test-templates', {
headers: auth,
data: {
name,
mitre_tags: [{ kind: 'technique', external_id: mitre }],
},
});
expect(r.status(), await r.text()).toBe(201);
return (await r.json()).id as string;
}
async function makeScenario(
request: APIRequestContext,
auth: Record<string, string>,
name: string,
testIds: string[],
): Promise<string> {
const r = await request.post('/api/v1/scenario-templates', {
headers: auth,
data: { name, test_template_ids: testIds },
});
expect(r.status(), await r.text()).toBe(201);
return (await r.json()).id as string;
}
// ---------- API: snapshot fidelity ------------------------------------
test('Snapshot freezes scenario + test fields at creation time', async ({ request }) => {
const auth = await adminAuth(request);
const tid = await makeTest(request, auth, 'snap-t1');
const sid = await makeScenario(request, auth, 'snap-scenario', [tid]);
const create = await request.post('/api/v1/missions', {
headers: auth,
data: {
name: 'snap-mission',
client_target: 'Acme',
scenario_template_ids: [sid],
},
});
expect(create.status(), await create.text()).toBe(201);
const mission = await create.json();
expect(mission.scenarios_count).toBe(1);
expect(mission.tests_count).toBe(1);
expect(mission.scenarios[0].tests[0].snapshot_name).toBe('snap-t1');
// Mutate the source template AFTER snapshot
const edit = await request.put(`/api/v1/test-templates/${tid}`, {
headers: auth,
data: {
name: 'RENAMED-LATER',
mitre_tags: [{ kind: 'tactic', external_id: 'TA0002' }],
},
});
expect(edit.status()).toBe(200);
// Mission still sees the pre-edit snapshot
const refetch = await request.get(`/api/v1/missions/${mission.id}`, { headers: auth });
expect(refetch.status()).toBe(200);
const snapshot = await refetch.json();
expect(snapshot.scenarios[0].tests[0].snapshot_name).toBe('snap-t1');
expect(
snapshot.scenarios[0].tests[0].mitre_tags.map(
(t: { external_id: string }) => t.external_id,
),
).toEqual(['T1059']);
});
// ---------- API: membership visibility --------------------------------
test('Non-admin members see only missions they belong to', async ({ request }) => {
const auth = await adminAuth(request);
// Create a group with mission.* perms and invite a "red" user.
const grp = await request
.post('/api/v1/groups', { headers: auth, data: { name: 'm6-red-grp' } })
.then((r) => r.json());
const setPerms = await request.put(`/api/v1/groups/${grp.id}/permissions`, {
headers: auth,
data: {
codes: ['mission.read', 'mission.create', 'mission.update', 'mission.archive'],
},
});
expect(setPerms.status()).toBe(200);
const redEmail = `m6-red-${crypto.randomUUID().slice(0, 8)}@metamorph.local`;
const redPwd = 'RedPass1234!';
const inv = await request
.post('/api/v1/invitations', {
headers: auth,
data: { email_hint: redEmail, group_ids: [grp.id] },
})
.then((r) => r.json());
const accept = await request.post(`/api/v1/invitations/accept/${inv.token}`, {
data: { email: redEmail, password: redPwd },
});
expect(accept.status()).toBe(201);
const redAccess = await loginAndGetAccess(request, redEmail, redPwd);
const redAuth = { Authorization: `Bearer ${redAccess}` };
// Admin creates a mission with NO members → red should not see it.
const hidden = await request
.post('/api/v1/missions', {
headers: auth,
data: { name: 'm6-admin-hidden' },
})
.then((r) => r.json());
const redList = await request.get('/api/v1/missions', { headers: redAuth });
expect(redList.status()).toBe(200);
const visible = (await redList.json()).items.map((it: { name: string }) => it.name);
expect(visible).not.toContain('m6-admin-hidden');
const redGetHidden = await request.get(`/api/v1/missions/${hidden.id}`, {
headers: redAuth,
});
expect(redGetHidden.status()).toBe(404);
// Red creates their own mission — auto-added as member → visible to them.
const ownResp = await request.post('/api/v1/missions', {
headers: redAuth,
data: { name: 'm6-red-own' },
});
expect(ownResp.status(), await ownResp.text()).toBe(201);
const own = await ownResp.json();
expect(own.members.map((m: { user_id: string }) => m.user_id)).toContain(
own.members[0].user_id,
);
const redListAfter = await request.get('/api/v1/missions', { headers: redAuth });
const namesAfter = (await redListAfter.json()).items.map(
(it: { name: string }) => it.name,
);
expect(namesAfter).toContain('m6-red-own');
});
// ---------- API: transitions ------------------------------------------
test('Status transition chain and rejection of invalid jumps', async ({ request }) => {
const auth = await adminAuth(request);
const m = await request
.post('/api/v1/missions', {
headers: auth,
data: { name: 'm6-status-chain' },
})
.then((r) => r.json());
for (const target of ['in_progress', 'completed', 'archived']) {
const r = await request.post(`/api/v1/missions/${m.id}/transition`, {
headers: auth,
data: { status: target },
});
expect(r.status(), await r.text()).toBe(200);
expect((await r.json()).status).toBe(target);
}
// Re-create + try an invalid jump draft → completed (must be 409)
const m2 = await request
.post('/api/v1/missions', {
headers: auth,
data: { name: 'm6-status-jump' },
})
.then((r) => r.json());
const bad = await request.post(`/api/v1/missions/${m2.id}/transition`, {
headers: auth,
data: { status: 'completed' },
});
expect(bad.status()).toBe(409);
expect((await bad.json()).error).toBe('invalid_transition');
});
// ---------- SPA -------------------------------------------------------
test('SPA — admin creates a mission via the 3-step wizard', async ({ page, request }) => {
const auth = await adminAuth(request);
const ids: string[] = [];
for (const name of ['spa-wizard-t1', 'spa-wizard-t2', 'spa-wizard-t3']) {
ids.push(await makeTest(request, auth, name));
}
const sid = await makeScenario(request, auth, 'spa-wizard-scenario', ids);
await loginViaSpa(page, ADMIN_EMAIL, ADMIN_PASSWORD);
await page.goto('/missions');
await page.getByTestId('missions-new-link').click();
await expect(page).toHaveURL(/\/missions\/new$/);
// Step 1 — Metadata
await page.getByTestId('meta-name').fill('spa-wizard-mission');
await page.getByTestId('meta-client').fill('Acme via SPA');
await page.getByTestId('missions-create-next').click();
// Step 2 — Scenarios
await page.getByTestId(`scenario-toggle-${sid}`).click();
await page.getByTestId('missions-create-next').click();
// Step 3 — Members (admin doesn't need to add themselves; submit straight away)
await page.getByTestId('missions-create-submit').click();
// Should land on the detail page
await expect(page).toHaveURL(/\/missions\/[0-9a-f-]+$/);
await expect(page.getByTestId('mission-transition-in_progress')).toBeVisible();
await expect(page.getByTestId('mission-tab-tests')).toBeVisible();
// Tests tab renders 3 snapshotted tests
await expect(page.getByText('spa-wizard-t1')).toBeVisible();
await expect(page.getByText('spa-wizard-t2')).toBeVisible();
await expect(page.getByText('spa-wizard-t3')).toBeVisible();
});
test('SPA — list page filters by status', async ({ page, request }) => {
const auth = await adminAuth(request);
// Seed two missions with distinct statuses.
const m1 = await request
.post('/api/v1/missions', { headers: auth, data: { name: 'filter-draft' } })
.then((r) => r.json());
const m2 = await request
.post('/api/v1/missions', { headers: auth, data: { name: 'filter-active' } })
.then((r) => r.json());
await request.post(`/api/v1/missions/${m2.id}/transition`, {
headers: auth,
data: { status: 'in_progress' },
});
await loginViaSpa(page, ADMIN_EMAIL, ADMIN_PASSWORD);
await page.goto('/missions');
await expect(page.getByText('filter-draft')).toBeVisible();
await expect(page.getByText('filter-active')).toBeVisible();
await page.getByTestId('missions-filter-status').selectOption('in_progress');
await expect(page.getByText('filter-active')).toBeVisible();
await expect(page.getByText('filter-draft')).toBeHidden();
// Sanity: m1 / m2 ids should match what the list-card test-id encodes.
await expect(page.getByTestId(`mission-card-${m2.id}`)).toBeVisible();
await expect(page.getByTestId(`mission-card-${m1.id}`)).toBeHidden();
});
});

View File

@@ -12,6 +12,9 @@ import { AdminUsersPage } from '@/pages/AdminUsersPage';
import { HomePage } from '@/pages/HomePage'; import { HomePage } from '@/pages/HomePage';
import { MitrePage } from '@/pages/MitrePage'; import { MitrePage } from '@/pages/MitrePage';
import { LoginPage } from '@/pages/LoginPage'; import { LoginPage } from '@/pages/LoginPage';
import { MissionDetailPage } from '@/pages/MissionDetailPage';
import { MissionsCreatePage } from '@/pages/MissionsCreatePage';
import { MissionsListPage } from '@/pages/MissionsListPage';
import { ProfilePage } from '@/pages/ProfilePage'; import { ProfilePage } from '@/pages/ProfilePage';
import { RegisterPage } from '@/pages/RegisterPage'; import { RegisterPage } from '@/pages/RegisterPage';
import { SetupPage } from '@/pages/SetupPage'; import { SetupPage } from '@/pages/SetupPage';
@@ -60,6 +63,30 @@ function App() {
</RequireAuth> </RequireAuth>
} }
/> />
<Route
path="/missions"
element={
<RequireAuth>
<MissionsListPage />
</RequireAuth>
}
/>
<Route
path="/missions/new"
element={
<RequireAuth>
<MissionsCreatePage />
</RequireAuth>
}
/>
<Route
path="/missions/:id"
element={
<RequireAuth>
<MissionDetailPage />
</RequireAuth>
}
/>
<Route <Route
path="/admin/users" path="/admin/users"
element={ element={

View File

@@ -37,6 +37,9 @@ export function Layout() {
{navItem('/', 'Home')} {navItem('/', 'Home')}
{navItem('/profile', 'Profile')} {navItem('/profile', 'Profile')}
{navItem('/mitre', 'MITRE')} {navItem('/mitre', 'MITRE')}
{(state.user.is_admin ||
state.user.permissions.includes('mission.read')) &&
navItem('/missions', 'Missions')}
{state.user.is_admin && ( {state.user.is_admin && (
<> <>
{navItem('/admin/users', 'Users')} {navItem('/admin/users', 'Users')}
@@ -71,7 +74,7 @@ export function Layout() {
<Outlet /> <Outlet />
<footer className="mt-[60px] py-8 border-t border-border text-center font-mono text-xs text-text-dim"> <footer className="mt-[60px] py-8 border-t border-border text-center font-mono text-xs text-text-dim">
metamorph · M0 bootstrap · M1 db schema · M2 auth · M3 rbac · M4 mitre · M5 templates · design system from tasks/design.md metamorph · M0 bootstrap · M1 db schema · M2 auth · M3 rbac · M4 mitre · M5 templates · M6 missions · design system from tasks/design.md
</footer> </footer>
</div> </div>
</div> </div>

View File

@@ -0,0 +1,167 @@
/**
* Mission types + query-key factory.
*
* A mission is a *snapshot* of one or more scenario templates: the backend
* copies template fields into mission_* tables at creation time, and template
* edits after that point do not propagate. Types here mirror the server-side
* dataclasses in `app/services/missions.py`.
*/
export type MissionStatus = 'draft' | 'in_progress' | 'completed' | 'archived';
export type MissionRoleHint = 'red' | 'blue';
export type MissionTestState =
| 'pending'
| 'executed'
| 'reviewed_by_blue'
| 'skipped'
| 'blocked';
export type MissionVisibilityMode = 'whitebox' | 'titles_only' | 'executed_only';
export type MissionMitreKind = 'tactic' | 'technique' | 'subtechnique';
export type MissionOpsecLevel = 'low' | 'medium' | 'high';
export interface MissionMember {
user_id: string;
user_email: string;
user_display_name: string | null;
role_hint: MissionRoleHint;
}
export interface MissionMitreTag {
kind: MissionMitreKind;
external_id: string;
name: string;
url: string | null;
}
export interface MissionTest {
id: string;
position: number;
snapshot_name: string;
snapshot_description: string | null;
snapshot_objective: string | null;
snapshot_procedure_md: string | null;
snapshot_prerequisites_md: string | null;
snapshot_expected_red_md: string | null;
snapshot_expected_blue_md: string | null;
snapshot_opsec_level: MissionOpsecLevel;
snapshot_tags: string[];
snapshot_expected_iocs: string[];
state: MissionTestState;
executed_at: string | null;
executed_at_overridden: boolean;
mitre_tags: MissionMitreTag[];
source_test_template_id: string | null;
}
export interface MissionScenario {
id: string;
position: number;
snapshot_name: string;
snapshot_description: string | null;
tests: MissionTest[];
source_scenario_template_id: string | null;
}
export interface MissionListItem {
id: string;
name: string;
client_target: string | null;
date_start: string | null;
date_end: string | null;
status: MissionStatus;
description_md: string | null;
visibility_mode: MissionVisibilityMode;
scenarios_count: number;
tests_count: number;
members_count: number;
deleted_at: string | null;
created_at: string;
updated_at: string;
}
export interface Mission extends MissionListItem {
scenarios: MissionScenario[];
members: MissionMember[];
}
export interface MissionListResponse {
items: MissionListItem[];
total: number;
limit: number;
offset: number;
}
export interface MissionFilters {
q?: string;
status?: MissionStatus | '';
client?: string;
}
export interface MemberPayload {
user_id: string;
role_hint: MissionRoleHint;
}
export interface CreateMissionPayload {
name: string;
client_target?: string | null;
date_start?: string | null;
date_end?: string | null;
description_md?: string | null;
scenario_template_ids?: string[];
members?: MemberPayload[];
}
export interface UpdateMissionPayload {
name?: string;
client_target?: string | null;
date_start?: string | null;
date_end?: string | null;
description_md?: string | null;
}
export interface AddScenariosPayload {
scenario_template_ids: string[];
}
export interface SetMembersPayload {
members: MemberPayload[];
}
export interface TransitionPayload {
status: MissionStatus;
}
export const missionKeys = {
/** Prefix-only key — pass this to `invalidateQueries` to refresh every
* filtered variant. Matching is prefix-based: `['missions','list',{q:'x'}]`
* also gets invalidated.
*/
listPrefix: () => ['missions', 'list'] as const,
list: (filters?: MissionFilters) => ['missions', 'list', filters ?? {}] as const,
detail: (id: string) => ['missions', 'detail', id] as const,
};
export function buildMissionQueryString(filters: MissionFilters | undefined): string {
if (!filters) return '';
const params = new URLSearchParams();
if (filters.q) params.set('q', filters.q);
if (filters.status) params.set('status', filters.status);
if (filters.client) params.set('client', filters.client);
const s = params.toString();
return s ? `?${s}` : '';
}
export const MISSION_STATUS_ACCENT: Record<MissionStatus, 'cyan' | 'orange' | 'green' | 'teal'> = {
draft: 'cyan',
in_progress: 'orange',
completed: 'green',
archived: 'teal',
};
export const MISSION_STATUS_LABEL: Record<MissionStatus, string> = {
draft: 'Draft',
in_progress: 'In Progress',
completed: 'Completed',
archived: 'Archived',
};

View File

@@ -0,0 +1,312 @@
import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query';
import { useState } from 'react';
import { useNavigate, useParams } from 'react-router-dom';
import { Alert } from '@/components/ui/Alert';
import { Button } from '@/components/ui/Button';
import { Card } from '@/components/ui/Card';
import { SectionHeader } from '@/components/ui/SectionHeader';
import { Tag } from '@/components/ui/Tag';
import { ApiError, apiDelete, apiGet, apiPost } from '@/lib/api';
import {
MISSION_STATUS_ACCENT,
MISSION_STATUS_LABEL,
missionKeys,
type Mission,
type MissionStatus,
type TransitionPayload,
} from '@/lib/missions';
const TABS = ['tests', 'members', 'synthesis', 'export'] as const;
type Tab = (typeof TABS)[number];
const ALLOWED_TRANSITIONS: Record<MissionStatus, MissionStatus[]> = {
draft: ['in_progress', 'archived'],
in_progress: ['completed', 'archived'],
completed: ['archived'],
archived: [],
};
const TRANSITION_BUTTON_ACCENT: Record<MissionStatus, 'cyan' | 'orange' | 'green' | 'teal'> = {
draft: 'cyan',
in_progress: 'orange',
completed: 'green',
archived: 'teal',
};
function useMission(id: string) {
return useQuery({
queryKey: missionKeys.detail(id),
queryFn: () => apiGet<Mission>(`/missions/${id}`),
enabled: !!id,
});
}
function formatDateRange(start: string | null, end: string | null): string {
if (!start && !end) return 'No dates set';
if (start && end) return `${start}${end}`;
return start ?? end ?? '';
}
export function MissionDetailPage() {
const params = useParams();
const missionId = params.id ?? '';
const navigate = useNavigate();
const qc = useQueryClient();
const [tab, setTab] = useState<Tab>('tests');
const detail = useMission(missionId);
const transition = useMutation({
mutationFn: (body: TransitionPayload) =>
apiPost<Mission>(`/missions/${missionId}/transition`, body),
onSuccess: () => {
qc.invalidateQueries({ queryKey: missionKeys.detail(missionId) });
qc.invalidateQueries({ queryKey: missionKeys.listPrefix() });
},
});
const remove = useMutation({
mutationFn: () => apiDelete<{ ok: true }>(`/missions/${missionId}`),
onSuccess: () => {
qc.invalidateQueries({ queryKey: missionKeys.listPrefix() });
navigate('/missions');
},
});
const apiErr = detail.error instanceof ApiError ? detail.error : null;
const m = detail.data;
if (apiErr) {
return (
<section>
<SectionHeader prefix="Mission" highlight="Not found" accent="rose" />
<Alert accent="rose">{apiErr.message}</Alert>
</section>
);
}
if (!m) {
return <p className="font-mono text-xs text-text-dim">Loading mission</p>;
}
const accent = MISSION_STATUS_ACCENT[m.status];
const allowedNext = ALLOWED_TRANSITIONS[m.status];
return (
<section data-testid={`mission-detail-${m.id}`}>
<div className="flex items-baseline justify-between flex-wrap gap-3">
<SectionHeader prefix="Mission" highlight={m.name} accent={accent} />
<div className="flex items-center gap-2">
<Tag accent={accent}>{MISSION_STATUS_LABEL[m.status]}</Tag>
{allowedNext.map((target) => (
<Button
key={target}
accent={TRANSITION_BUTTON_ACCENT[target]}
onClick={() => transition.mutate({ status: target })}
data-testid={`mission-transition-${target}`}
disabled={transition.isPending}
>
{MISSION_STATUS_LABEL[target]}
</Button>
))}
<Button
accent="rose"
variant="ghost"
onClick={() => {
if (
window.confirm(
`Soft-delete mission "${m.name}"? An admin can restore from the trash.`,
)
) {
remove.mutate();
}
}}
data-testid="mission-delete"
disabled={remove.isPending}
>
Delete
</Button>
</div>
</div>
<Card className="mb-4">
<dl className="grid grid-cols-2 gap-3 md:grid-cols-4 font-mono text-2xs">
<div>
<dt className="text-text-dim uppercase tracking-wider2">Client</dt>
<dd className="text-text-bright">{m.client_target ?? '—'}</dd>
</div>
<div>
<dt className="text-text-dim uppercase tracking-wider2">Dates</dt>
<dd className="text-text-bright">
{formatDateRange(m.date_start, m.date_end)}
</dd>
</div>
<div>
<dt className="text-text-dim uppercase tracking-wider2">Scenarios</dt>
<dd className="text-text-bright">{m.scenarios_count}</dd>
</div>
<div>
<dt className="text-text-dim uppercase tracking-wider2">Tests</dt>
<dd className="text-text-bright">{m.tests_count}</dd>
</div>
</dl>
{m.description_md && (
<pre className="mt-3 whitespace-pre-wrap font-mono text-xs text-text">{m.description_md}</pre>
)}
</Card>
<nav className="flex gap-1 border-b border-border mb-4" aria-label="Mission tabs">
{TABS.map((t) => (
<button
key={t}
type="button"
onClick={() => setTab(t)}
data-testid={`mission-tab-${t}`}
className={`px-3 py-2 font-mono text-2xs uppercase tracking-wider2 ${
tab === t
? 'text-cyan border-b-2 border-cyan -mb-px'
: 'text-text-dim hover:text-text-bright'
}`}
>
{t}
</button>
))}
</nav>
{tab === 'tests' && (
<Card>
{m.scenarios.length === 0 ? (
<p className="font-mono text-xs text-text-dim">
No scenarios snapshotted yet.
</p>
) : (
<div className="flex flex-col gap-4" data-testid="mission-scenarios">
{m.scenarios.map((sc) => (
<div
key={sc.id}
className="rounded-md border border-border bg-bg-card p-3"
data-testid={`mission-scenario-${sc.id}`}
>
<div className="flex items-center gap-2 mb-2">
<Tag accent="cyan">#{sc.position + 1}</Tag>
<p className="font-mono text-xs text-text-bright">
{sc.snapshot_name}
</p>
</div>
{sc.snapshot_description && (
<p className="mb-2 font-mono text-2xs text-text-dim">
{sc.snapshot_description}
</p>
)}
<table className="w-full font-mono text-2xs">
<thead>
<tr className="text-text-dim uppercase tracking-wider2">
<th className="text-left py-1">#</th>
<th className="text-left py-1">Test</th>
<th className="text-left py-1">MITRE</th>
<th className="text-left py-1">OPSEC</th>
<th className="text-left py-1">State</th>
</tr>
</thead>
<tbody>
{sc.tests.map((t) => (
<tr
key={t.id}
className="border-t border-border/40"
data-testid={`mission-test-${t.id}`}
>
<td className="py-1 text-text-dim">{t.position + 1}</td>
<td className="py-1 text-text-bright">{t.snapshot_name}</td>
<td className="py-1">
<div className="flex flex-wrap gap-1">
{t.mitre_tags.map((tag) => (
<Tag
accent="cyan"
key={`${tag.kind}-${tag.external_id}`}
>
{tag.external_id}
</Tag>
))}
</div>
</td>
<td className="py-1 text-text">
{t.snapshot_opsec_level}
</td>
<td className="py-1">
<Tag
accent={
t.state === 'pending'
? 'teal'
: t.state === 'executed'
? 'orange'
: t.state === 'reviewed_by_blue'
? 'green'
: 'rose'
}
>
{t.state}
</Tag>
</td>
</tr>
))}
</tbody>
</table>
</div>
))}
</div>
)}
</Card>
)}
{tab === 'members' && (
<Card>
{m.members.length === 0 ? (
<p className="font-mono text-xs text-text-dim">
No members assigned. An admin can add them via the API.
</p>
) : (
<ul className="flex flex-col gap-2" data-testid="mission-members">
{m.members.map((mb) => (
<li
key={mb.user_id}
className="flex items-center justify-between rounded-md border border-border bg-bg-card p-3"
data-testid={`mission-member-${mb.user_id}`}
>
<div>
<p className="font-mono text-xs text-text-bright">
{mb.user_display_name ?? mb.user_email}
</p>
{mb.user_display_name && (
<p className="font-mono text-2xs text-text-dim">
{mb.user_email}
</p>
)}
</div>
<Tag accent={mb.role_hint === 'red' ? 'red' : 'cyan'}>
{mb.role_hint}
</Tag>
</li>
))}
</ul>
)}
</Card>
)}
{tab === 'synthesis' && (
<Card>
<p className="font-mono text-xs text-text-dim">
Reveal.js slide synthesis lands in M10.
</p>
</Card>
)}
{tab === 'export' && (
<Card>
<p className="font-mono text-xs text-text-dim">
JSON / CSV exports land in M11.
</p>
</Card>
)}
</section>
);
}

View File

@@ -0,0 +1,404 @@
import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query';
import { useMemo, useState } from 'react';
import { useNavigate } from 'react-router-dom';
import { MarkdownField } from '@/components/MarkdownField';
import { Alert } from '@/components/ui/Alert';
import { Button } from '@/components/ui/Button';
import { Card } from '@/components/ui/Card';
import { SectionHeader } from '@/components/ui/SectionHeader';
import { Tag } from '@/components/ui/Tag';
import { TextField } from '@/components/ui/TextField';
import { ApiError, apiGet, apiPost } from '@/lib/api';
import { useAuth } from '@/lib/auth';
import {
missionKeys,
type CreateMissionPayload,
type Mission,
type MissionRoleHint,
} from '@/lib/missions';
import {
templateKeys,
type ScenarioTemplate,
type ScenarioTemplateListResponse,
} from '@/lib/templates';
interface RosterUser {
id: string;
email: string;
display_name: string | null;
}
interface RosterResponse {
items: RosterUser[];
}
interface MetaState {
name: string;
client_target: string;
date_start: string;
date_end: string;
description_md: string;
}
interface MemberSelection {
user_id: string;
role_hint: MissionRoleHint;
}
const STEPS: Array<{ key: 'meta' | 'scenarios' | 'members'; label: string }> = [
{ key: 'meta', label: 'Metadata' },
{ key: 'scenarios', label: 'Scenarios' },
{ key: 'members', label: 'Members' },
];
function blankMeta(): MetaState {
return {
name: '',
client_target: '',
date_start: '',
date_end: '',
description_md: '',
};
}
function useScenarioCatalogue() {
return useQuery({
queryKey: templateKeys.scenarios(''),
queryFn: () =>
apiGet<ScenarioTemplateListResponse>('/scenario-templates?limit=500'),
});
}
function useRoster() {
return useQuery({
queryKey: ['users', 'roster'],
queryFn: () => apiGet<RosterResponse>('/users/roster'),
});
}
export function MissionsCreatePage() {
const { state } = useAuth();
const me = state.user;
const navigate = useNavigate();
const qc = useQueryClient();
const [stepIdx, setStepIdx] = useState(0);
const step = STEPS[stepIdx];
const [meta, setMeta] = useState<MetaState>(blankMeta);
const [scenarioIds, setScenarioIds] = useState<string[]>([]);
const [members, setMembers] = useState<MemberSelection[]>(() =>
me && !me.is_admin
? [{ user_id: me.id, role_hint: 'red' }]
: [],
);
const scenarios = useScenarioCatalogue();
const roster = useRoster();
const createMutation = useMutation({
mutationFn: (body: CreateMissionPayload) => apiPost<Mission>('/missions', body),
onSuccess: (created) => {
qc.invalidateQueries({ queryKey: missionKeys.listPrefix() });
navigate(`/missions/${created.id}`);
},
});
const apiErr =
createMutation.error instanceof ApiError ? createMutation.error : null;
const metaInvalid = meta.name.trim().length === 0;
const datesInvalid =
meta.date_start &&
meta.date_end &&
meta.date_end < meta.date_start;
const scenarioById = useMemo(() => {
const m = new Map<string, ScenarioTemplate>();
for (const sc of scenarios.data?.items ?? []) m.set(sc.id, sc);
return m;
}, [scenarios.data]);
const next = () => setStepIdx((i) => Math.min(i + 1, STEPS.length - 1));
const prev = () => setStepIdx((i) => Math.max(i - 1, 0));
function toggleScenario(id: string) {
setScenarioIds((prev) =>
prev.includes(id) ? prev.filter((x) => x !== id) : [...prev, id],
);
}
function setMemberRole(user_id: string, role_hint: MissionRoleHint) {
setMembers((prev) =>
prev.some((m) => m.user_id === user_id)
? prev.map((m) => (m.user_id === user_id ? { ...m, role_hint } : m))
: [...prev, { user_id, role_hint }],
);
}
function removeMember(user_id: string) {
setMembers((prev) => prev.filter((m) => m.user_id !== user_id));
}
function submit() {
const payload: CreateMissionPayload = {
name: meta.name.trim(),
client_target: meta.client_target.trim() || null,
date_start: meta.date_start || null,
date_end: meta.date_end || null,
description_md: meta.description_md.trim() || null,
scenario_template_ids: scenarioIds,
members,
};
createMutation.mutate(payload);
}
const totalSelectedTests = useMemo(
() =>
scenarioIds.reduce(
(acc, id) => acc + (scenarioById.get(id)?.tests_count ?? 0),
0,
),
[scenarioIds, scenarioById],
);
return (
<section data-testid="missions-create">
<SectionHeader prefix="New" highlight="Mission" accent="cyan" />
<Card className="mb-6">
<ol className="flex items-center gap-2" data-testid="missions-create-steps">
{STEPS.map((s, i) => {
const active = i === stepIdx;
const done = i < stepIdx;
const accent: 'cyan' | 'green' | 'teal' = active
? 'cyan'
: done
? 'green'
: 'teal';
return (
<li key={s.key} className="flex items-center gap-2">
<Tag accent={accent}>{i + 1}. {s.label}</Tag>
{i < STEPS.length - 1 && (
<span className="text-text-dim font-mono"></span>
)}
</li>
);
})}
</ol>
</Card>
{apiErr && (
<div data-testid="missions-create-error" className="mb-4">
<Alert accent="red">{apiErr.message}</Alert>
</div>
)}
{step.key === 'meta' && (
<Card title="Metadata" sub="Identification and scope">
<div className="grid grid-cols-1 gap-3 md:grid-cols-2">
<TextField
label="Name"
required
value={meta.name}
onChange={(e) => setMeta((p) => ({ ...p, name: e.target.value }))}
data-testid="meta-name"
placeholder="purple-q2-2026"
/>
<TextField
label="Client / target"
value={meta.client_target}
onChange={(e) =>
setMeta((p) => ({ ...p, client_target: e.target.value }))
}
data-testid="meta-client"
placeholder="Acme Corp"
/>
<TextField
label="Start date"
type="date"
value={meta.date_start}
onChange={(e) =>
setMeta((p) => ({ ...p, date_start: e.target.value }))
}
data-testid="meta-date-start"
/>
<TextField
label="End date"
type="date"
value={meta.date_end}
onChange={(e) =>
setMeta((p) => ({ ...p, date_end: e.target.value }))
}
data-testid="meta-date-end"
/>
</div>
<div className="mt-4">
<MarkdownField
label="ROE / Description"
value={meta.description_md}
onChange={(v) =>
setMeta((p) => ({ ...p, description_md: v }))
}
data-testid="meta-description"
/>
</div>
{datesInvalid && (
<p className="mt-3 font-mono text-2xs text-red" data-testid="meta-date-error">
End date must be on or after start date.
</p>
)}
</Card>
)}
{step.key === 'scenarios' && (
<Card
title="Scenarios"
sub={`Select reusable scenarios — ${totalSelectedTests} tests will be snapshotted`}
>
{scenarios.isError && (
<Alert accent="red">Failed to load scenarios.</Alert>
)}
{scenarios.isLoading && (
<p className="font-mono text-xs text-text-dim">Loading</p>
)}
<ul
className="grid grid-cols-1 gap-2 md:grid-cols-2"
data-testid="scenarios-picker"
>
{scenarios.data?.items.map((sc) => {
const selected = scenarioIds.includes(sc.id);
return (
<li key={sc.id}>
<button
type="button"
className={`w-full rounded-md border ${
selected ? 'border-cyan text-cyan' : 'border-border text-text'
} bg-bg-card p-3 text-left font-mono text-xs hover:border-cyan`}
onClick={() => toggleScenario(sc.id)}
data-testid={`scenario-toggle-${sc.id}`}
aria-pressed={selected}
>
<div className="flex items-center justify-between">
<span className="text-text-bright">{sc.name}</span>
<Tag accent="purple">{sc.tests_count} tests</Tag>
</div>
{sc.description && (
<p className="mt-1 text-text-dim">{sc.description}</p>
)}
</button>
</li>
);
})}
</ul>
{scenarios.data && scenarios.data.items.length === 0 && (
<p className="font-mono text-xs text-text-dim">
No scenarios in the catalogue yet create one in
{' '}<a href="/admin/scenarios" className="text-cyan underline">Admin Scenarios</a>{' '}
first.
</p>
)}
</Card>
)}
{step.key === 'members' && (
<Card title="Members" sub="Who works on this mission and on which side">
{roster.isError && (
<Alert accent="red">Failed to load roster.</Alert>
)}
{roster.isLoading && (
<p className="font-mono text-xs text-text-dim">Loading users</p>
)}
<ul
className="flex flex-col gap-2"
data-testid="members-picker"
>
{roster.data?.items.map((u) => {
const selected = members.find((m) => m.user_id === u.id);
return (
<li
key={u.id}
className="flex flex-wrap items-center justify-between gap-2 rounded-md border border-border bg-bg-card p-3"
data-testid={`member-row-${u.id}`}
>
<div>
<p className="font-mono text-xs text-text-bright">
{u.display_name ?? u.email}
</p>
{u.display_name && (
<p className="font-mono text-2xs text-text-dim">{u.email}</p>
)}
</div>
<div className="flex items-center gap-2">
<Button
accent="red"
variant={selected?.role_hint === 'red' ? 'solid' : 'outline'}
onClick={() => setMemberRole(u.id, 'red')}
data-testid={`member-${u.id}-red`}
>
Red
</Button>
<Button
accent="cyan"
variant={selected?.role_hint === 'blue' ? 'solid' : 'outline'}
onClick={() => setMemberRole(u.id, 'blue')}
data-testid={`member-${u.id}-blue`}
>
Blue
</Button>
{selected && (
<Button
accent="rose"
variant="ghost"
onClick={() => removeMember(u.id)}
data-testid={`member-${u.id}-clear`}
>
</Button>
)}
</div>
</li>
);
})}
</ul>
</Card>
)}
<div className="mt-6 flex items-center justify-between">
<Button
accent="teal"
variant="ghost"
onClick={prev}
disabled={stepIdx === 0}
data-testid="missions-create-prev"
>
Back
</Button>
{stepIdx < STEPS.length - 1 ? (
<Button
accent="cyan"
onClick={next}
disabled={step.key === 'meta' && (metaInvalid || !!datesInvalid)}
data-testid="missions-create-next"
>
Next
</Button>
) : (
<Button
accent="green"
onClick={submit}
disabled={
createMutation.isPending ||
metaInvalid ||
!!datesInvalid
}
data-testid="missions-create-submit"
>
{createMutation.isPending ? 'Creating…' : 'Create mission'}
</Button>
)}
</div>
</section>
);
}

View File

@@ -0,0 +1,167 @@
import { useQuery } from '@tanstack/react-query';
import { useMemo, useState } from 'react';
import { Link } from 'react-router-dom';
import { Alert } from '@/components/ui/Alert';
import { Button } from '@/components/ui/Button';
import { Card } from '@/components/ui/Card';
import { SectionHeader } from '@/components/ui/SectionHeader';
import { Tag } from '@/components/ui/Tag';
import { TextField } from '@/components/ui/TextField';
import { ApiError, apiGet } from '@/lib/api';
import { useAuth } from '@/lib/auth';
import {
MISSION_STATUS_ACCENT,
MISSION_STATUS_LABEL,
buildMissionQueryString,
missionKeys,
type MissionFilters,
type MissionListResponse,
type MissionStatus,
} from '@/lib/missions';
const STATUS_OPTIONS: Array<{ value: '' | MissionStatus; label: string }> = [
{ value: '', label: 'All statuses' },
{ value: 'draft', label: MISSION_STATUS_LABEL.draft },
{ value: 'in_progress', label: MISSION_STATUS_LABEL.in_progress },
{ value: 'completed', label: MISSION_STATUS_LABEL.completed },
{ value: 'archived', label: MISSION_STATUS_LABEL.archived },
];
function useMissions(filters: MissionFilters) {
return useQuery({
queryKey: missionKeys.list(filters),
queryFn: () =>
apiGet<MissionListResponse>(`/missions${buildMissionQueryString(filters)}`),
});
}
function formatDateRange(start: string | null, end: string | null): string {
if (!start && !end) return '—';
if (start && end) return `${start}${end}`;
return start ?? end ?? '—';
}
export function MissionsListPage() {
const { state } = useAuth();
const canCreate =
state.user?.is_admin || state.user?.permissions.includes('mission.create');
const [q, setQ] = useState('');
const [status, setStatus] = useState<'' | MissionStatus>('');
const [client, setClient] = useState('');
const filters = useMemo<MissionFilters>(
() => ({
q: q.trim() || undefined,
status: status || undefined,
client: client.trim() || undefined,
}),
[q, status, client],
);
const { data, error, isLoading } = useMissions(filters);
const apiErr = error instanceof ApiError ? error : null;
return (
<section data-testid="missions-list">
<div className="flex items-baseline justify-between">
<SectionHeader prefix="Plan" highlight="Missions" accent="cyan" />
{canCreate && (
<Link to="/missions/new" data-testid="missions-new-link">
<Button accent="cyan">+ New mission</Button>
</Link>
)}
</div>
<Card className="mb-6">
<div className="grid grid-cols-1 gap-3 md:grid-cols-3">
<TextField
label="Search"
placeholder="name or description"
value={q}
onChange={(e) => setQ(e.target.value)}
data-testid="missions-filter-q"
/>
<TextField
label="Client"
placeholder="acme corp"
value={client}
onChange={(e) => setClient(e.target.value)}
data-testid="missions-filter-client"
/>
<div className="flex flex-col gap-1">
<label className="font-mono text-2xs uppercase tracking-wider2 text-text-dim">
Status
</label>
<select
value={status}
onChange={(e) => setStatus(e.target.value as '' | MissionStatus)}
data-testid="missions-filter-status"
className="bg-bg-card border border-border rounded px-3 py-2 font-mono text-xs text-text-bright"
>
{STATUS_OPTIONS.map((opt) => (
<option key={opt.value} value={opt.value}>
{opt.label}
</option>
))}
</select>
</div>
</div>
</Card>
{apiErr && (
<div data-testid="missions-error">
<Alert accent="red">{apiErr.message}</Alert>
</div>
)}
{isLoading && (
<p className="font-mono text-xs text-text-dim">Loading missions</p>
)}
{data && data.items.length === 0 && !isLoading && (
<Card>
<p className="font-mono text-xs text-text-dim" data-testid="missions-empty">
No missions match the filters. {canCreate ? 'Create one to get started.' : ''}
</p>
</Card>
)}
<div className="grid grid-cols-1 gap-3 lg:grid-cols-2" data-testid="missions-grid">
{data?.items.map((m) => {
const accent = MISSION_STATUS_ACCENT[m.status];
return (
<Link key={m.id} to={`/missions/${m.id}`} data-testid={`mission-card-${m.id}`}>
<Card
accent={accent}
title={m.name}
sub={m.client_target ?? 'No client'}
className="h-full"
>
<div className="flex flex-wrap items-center gap-2">
<Tag accent={accent}>{MISSION_STATUS_LABEL[m.status]}</Tag>
<Tag accent="cyan">{m.scenarios_count} scenarios</Tag>
<Tag accent="purple">{m.tests_count} tests</Tag>
<Tag accent="teal">{m.members_count} members</Tag>
</div>
<p className="mt-3 font-mono text-2xs text-text-dim">
{formatDateRange(m.date_start, m.date_end)}
</p>
</Card>
</Link>
);
})}
</div>
{data && (
<p
className="mt-4 font-mono text-2xs text-text-dim"
data-testid="missions-total"
>
{data.total} mission{data.total === 1 ? '' : 's'} total
</p>
)}
</section>
);
}

View File

@@ -78,6 +78,17 @@ project: Metamorph
- **`podman compose stop api` puis `up -d api` casse les dépendances** entre containers (`db` healthy → `api` depends on it) : podman-compose ne résout pas la chaîne de deps quand on cible un seul service. Pour un override d'env, mieux vaut `make down && APP_ENV=test make up`. - **`podman compose stop api` puis `up -d api` casse les dépendances** entre containers (`db` healthy → `api` depends on it) : podman-compose ne résout pas la chaîne de deps quand on cible un seul service. Pour un override d'env, mieux vaut `make down && APP_ENV=test make up`.
- **`/diag/reset` test-only** : exposer un endpoint qui truncate la DB est tentant pour les e2e mais ouvre une grosse surface en cas de fuite. Compromise actuel : autorisé en `dev` ET `test` (pas en prod), avec un log `WARNING` à chaque appel. Si jamais on déploie une stack dev publique, **désactiver** l'endpoint via env var. - **`/diag/reset` test-only** : exposer un endpoint qui truncate la DB est tentant pour les e2e mais ouvre une grosse surface en cas de fuite. Compromise actuel : autorisé en `dev` ET `test` (pas en prod), avec un log `WARNING` à chaque appel. Si jamais on déploie une stack dev publique, **désactiver** l'endpoint via env var.
## 2026-05-13 — M6 missions + snapshot
- **Snapshot independence requires more than column copies — denormalise the join tables too.** `mission_tests` copies every scalar template field, but if `mission_test_mitre_tags` kept FKs to `mitre_*` rows, a future re-sync that drops a technique would cascade through `ON DELETE CASCADE` and silently mutate frozen missions. The M1 schema already split `mission_test_mitre_tags` with frozen `(mitre_external_id, mitre_name, mitre_url)` columns and no FK — at snapshot time we denormalise via a 3-query batch lookup (`_resolve_mitre_lookup`) and build the rows in-memory. Pattern to reuse for any "frozen reference" relationship in the future.
- **`/diag/reset` truncate order is FK-aware**: `mission_scenarios.source_scenario_template_id` and `mission_tests.source_test_template_id` are `ON DELETE SET NULL`. Truncating template tables first would force PG to NULL those columns one by one. Reverse the order — wipe mission tables (which cascade to members/scenarios/tests/tags/categories from `missions`) BEFORE the templates. Saves a round-trip + keeps the truncate logically aligned with the dependency graph.
- **Membership visibility = 404, not 403.** Returning 403 for "mission exists but you're not a member" leaks the existence of the mission. The service returns 404 in both "doesn't exist" and "not visible to you" cases via the same `MissionNotFound` exception. The decorator stack handles perm-level 403 (you can't even GET /missions); the service handles row-level 404. Pattern: gate "type of action" via decorator perms, gate "which rows" via service-level membership filters that collapse to 404.
- **Auto-add the non-admin creator as a member.** Without this, a redteamer who creates a mission and forgets to add themselves to `members[]` immediately loses visibility (403 on subsequent GETs because they're not a member). Solved at the service layer: `if not creator_is_admin and creator_id not in members: prepend (creator_id, 'red')`. Admin creators don't auto-add because they bypass membership anyway. Documented in the docstring + tested explicitly (`test_non_admin_creator_auto_added`).
- **Minimum-surface roster endpoint pattern**: `/users` returns admin metadata (is_admin via groups, is_active, group memberships). The mission wizard needs a list of assignable users from a non-admin redteamer's perspective — exposing /users to them would leak admin metadata. Added a dedicated `GET /users/roster` returning only `(id, email, display_name)` and gated by **any of** `user.read`, `mission.create`, `mission.update`. Pattern: when a cross-feature needs a smaller slice of an admin endpoint, create a dedicated lightweight endpoint rather than relaxing the admin one.
- **Pyright is not always wrong about "unused" parameters** — the original `_to_list_item(s: Session, m: Mission)` took `s` but never accessed it (the function uses already-`selectinload`ed relationships). Removed the param. Lesson: when adding a `Session` parameter to a view-assembly helper, audit whether the body actually issues queries through it.
- **`flask.abort()` is not typed `NoReturn` in this project's Pyright config** so `def f() -> X: if x is None: abort(...); return x` raises a return-type error. Workaround: add `assert user is not None` after the abort to narrow the type. Cleaner than `cast(...)`. Pattern to reuse anywhere we abort-and-return.
- **Snapshot of multiple scenarios is a 4-query write** regardless of test count: (1) load N scenario_templates with their join rows, (2) load M test_templates by id with mitre_tags, (3) batch-resolve MITRE rows (3 queries for tactic/technique/sub), (4) insert mission_scenarios + mission_tests + mission_test_mitre_tags via the SQLAlchemy unit of work. Avoid the temptation to query inside per-test loops — it explodes to O(scenarios × tests × tag_kinds) easily.
## 2026-05-12 — M5 templates + scenarios ## 2026-05-12 — M5 templates + scenarios
- **`extra={"name": ...}` dans `log.info()` crash silencieusement** — Python's `logging.LogRecord` réserve `name` (le logger name). Coût : 500 sur le POST, message peu parlant (`KeyError: "Attempt to overwrite 'name' in LogRecord"`). Fix : renommer la clé (`template_name`). Liste réservée à éviter : `name`, `msg`, `args`, `levelname`, `levelno`, `pathname`, `filename`, `module`, `funcName`, `created`, `msecs`, `lineno`, `thread`, `threadName`, `process`. Pattern : préfixer les clés extra par l'entité (`template_name`, `group_id`, `user_id` est OK mais `id` aussi est piégeux dans certains setups). - **`extra={"name": ...}` dans `log.info()` crash silencieusement** — Python's `logging.LogRecord` réserve `name` (le logger name). Coût : 500 sur le POST, message peu parlant (`KeyError: "Attempt to overwrite 'name' in LogRecord"`). Fix : renommer la clé (`template_name`). Liste réservée à éviter : `name`, `msg`, `args`, `levelname`, `levelno`, `pathname`, `filename`, `module`, `funcName`, `created`, `msecs`, `lineno`, `thread`, `threadName`, `process`. Pattern : préfixer les clés extra par l'entité (`template_name`, `group_id`, `user_id` est OK mais `id` aussi est piégeux dans certains setups).

124
tasks/testing-m6.md Normal file
View File

@@ -0,0 +1,124 @@
---
type: testing
milestone: M6
date: "2026-05-13"
project: Metamorph
---
# Testing M6 — Missions & snapshot
## 1. Lancement de la stack
```bash
make clean
make up
make migrate
make seed-mitre # MITRE tags are snapshotted onto mission_tests; without them
# the snapshot will simply have an empty mitre_tags array
```
> L'admin stable `admin@metamorph.local / AdminPass1234!` est restauré
> automatiquement par le hook `afterAll` du spec e2e M6, mais la 1ʳᵉ fois,
> bootstrappe-le via `/setup` (ou laisse les tests faire le travail).
## 2. Tests automatisés
```bash
make test-api # 103 tests pytest dont 22 M6 (snapshot, membership, transitions, members CRUD, perm gating)
make e2e # 43 tests Playwright dont 5 M6 (snapshot freezing, non-admin visibility, transitions, wizard, list filter)
```
Rapport HTML : `e2e/playwright-report/`.
## 3. Smoke navigateur
### Pré-requis
- Stack `make up` + admin loggé.
- MITRE seedé (`/mitre` montre 15 tactics).
- Au moins **1 test_template** et **1 scenario_template** dans le catalogue M5
(pour avoir quelque chose à snapshotter).
### 3.1 Liste & création (`/missions`)
1. Cliquer **Missions** dans la nav (visible si tu as la perm `mission.read` ou tu es admin) → la liste s'affiche avec un message vide la 1ʳᵉ fois.
2. Cliquer **+ New mission** → page wizard `/missions/new`.
3. **Étape 1 — Metadata** :
- `Name` (requis) → `purple-2026-Q2`
- `Client / target``Acme Corp`
- `Start date` / `End date` → si tu inverses, un message en rouge apparaît et **Next** est désactivé.
- `ROE / Description` (markdown) → optionnel.
4. **Next****Étape 2 — Scenarios** :
- Le catalogue M5 s'affiche en grille de boutons. Cliquer un scénario le sélectionne (bordure cyan).
- Le sous-titre du Card affiche le total de tests qui seront snapshotés.
5. **Next****Étape 3 — Members** :
- Le roster (issu de `/users/roster`) liste les utilisateurs actifs.
- Pour chaque user, deux boutons **Red** / **Blue** togglent l'inclusion + le rôle. ✕ retire.
- Si tu es un redteamer non-admin, tu es pré-sélectionné en `red` (auto-add côté backend si tu oublies).
6. **Create mission** → redirection vers `/missions/<id>`. La nouvelle mission apparaît en haut de la liste après retour.
### 3.2 Filtres (`/missions`)
- **Search** : full-text sur `name` / `description_md`.
- **Client** : LIKE sur `client_target`.
- **Status** : select draft / in_progress / completed / archived.
- Les filtres sont combinés en AND (ex : `status=in_progress & client=acme`).
### 3.3 Page détail (`/missions/<id>`)
1. En-tête : nom + status pill + boutons de transition.
- **draft** → boutons `→ In Progress` et `→ Archived`.
- **in_progress** → `→ Completed` et `→ Archived`.
- **completed** → `→ Archived` uniquement.
- **archived** → aucun bouton.
2. Cliquer un bouton → status update immédiat (cache invalidé, badge re-rendu).
3. **Delete** (en rose) → confirm prompt → soft-delete → redirige vers `/missions`. Réapparait via `?include_deleted=true` (admin only).
4. **Tabs** :
- **tests** : tableau par scénario avec `# | Test | MITRE | OPSEC | State`. Les MITRE chips affichent l'external_id frozen.
- **members** : pills Red/Blue avec email + display_name.
- **synthesis** : placeholder « lands in M10 ».
- **export** : placeholder « lands in M11 ».
## 4. Vérification du snapshot (DoD)
1. Crée une mission qui référence un scenario_template `sc1` contenant `test_template_t1`.
2. Aller dans `/admin/tests`, éditer `test_template_t1` : changer le nom et les tags MITRE.
3. Retour sur `/missions/<id>` (rafraîchir si la cache TanStack tient encore) → la table montre **toujours** l'ancien nom et l'ancien tag MITRE. Le snapshot est gelé. ✅
## 5. Vérification visibilité par membership
1. Login en admin, créer 2 missions :
- `m-only-admin` sans aucun membre.
- `m-shared` avec Alice (red) en membre.
2. Login en Alice.
3. `/missions` → seule `m-shared` apparaît dans la liste. `GET /api/v1/missions/<m-only-admin>` retourne **404** (pas 403 — pas de fuite d'existence).
4. Alice tente de PUT/transition/delete sur `m-only-admin` → 404 idem.
## 6. Vérification transitions
| from | to | result |
|-------------|---------------|--------|
| draft | in_progress | 200 |
| draft | archived | 200 |
| draft | completed | **409 invalid_transition** |
| in_progress | completed | 200 |
| in_progress | archived | 200 |
| completed | archived | 200 |
| completed | in_progress | **409** |
| archived | (anything) | **409** |
| any | (same status) | 200 (no-op) |
```bash
curl -X POST -H "Authorization: Bearer $T" -H 'Content-Type: application/json' \
-d '{"status":"completed"}' \
http://localhost:8080/api/v1/missions/<id>/transition
```
## 7. Quick teardown
```bash
make down
# ou pour un reset complet :
curl -X POST http://localhost:8080/api/v1/diag/reset # test-only, wipes everything
```
> Reminder: `make test-api` and `make e2e` **share the dev DB container** —
> running them mid-session WILL wipe user data. The M6 spec's `afterAll`
> restores the stable admin and re-seeds MITRE, but custom templates / missions
> you've created by hand are lost. Cf. `tasks/lessons.md` (M5 lessons section).

View File

@@ -133,7 +133,7 @@ spec: tasks/spec.md
--- ---
## M6 — Missions & snapshot ## M6 — Missions & snapshot
**But** : transformer les templates en missions vivantes. **But** : transformer les templates en missions vivantes.