From ed70458d8fb429bf2d92798ef6e478824c96f1e9 Mon Sep 17 00:00:00 2001 From: Knacky Date: Thu, 14 May 2026 08:16:48 +0200 Subject: [PATCH] =?UTF-8?q?feat(m7):=20per-test=20execution=20=E2=80=94=20?= =?UTF-8?q?red/blue=20zones,=20evidence=20pipeline,=20activity=20poll?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DoD M7 (spec §F5 + §F6 + §F8 + tasks/todo.md M7) covered end-to-end: Backend - New migration `91a4e7c6d2f3` adds `mission_tests.last_actor_id` (FK users ON DELETE SET NULL) and `ix_mission_tests_updated_at` for the polling query. - `detection_levels`: 4 default rows seeded at boot, `GET /detection-levels` read-only (CRUD lands in M8). - `mission_tests` service + `missions` API extension: - `GET /missions/{id}/tests/{test_id}` — full detail incl. evidence list - `PUT /missions/{id}/tests/{test_id}` — patch red/blue fields with per-field perm classification (`mission.write_red_fields` vs `mission.write_blue_fields`) - `POST /missions/{id}/tests/{test_id}/transition` — pending↔skipped/blocked and pending→executed→reviewed_by_blue (+ undo paths), side-aware perm gate that fires *before* idempotency, `executed_at` auto-stamped on the way in - `GET /missions/{id}/activity?since=` — drives the 15 s polling badge - `evidence` service + top-level `/evidence/` API: - Streaming upload, SHA256 chunk-by-chunk, 25 MB cap, ext+MIME whitelist - Content-addressed storage at ${EVIDENCE_DIR}/// - Atomic `os.replace`, hex-validated SHA path component, root-dir guard - Membership-aware (404 on miss/forbidden, no existence leak) - `/diag/reset` now wipes ${EVIDENCE_DIR}/* in test mode (symlink-safe) and re-seeds detection levels as a safety net. Frontend - `lib/missions.ts` — M7 types + queryKey factory + state-machine matrix. - `pages/MissionTestPage.tsx` — two-zone layout: red border (command, output, comment, mark-executed + override toggle) and cyan border (detection-level select, comment, drag-and-drop evidence dropzone). Last-touched badge polls /activity every 15 s, gated on document.visibilityState. Per-field disable based on the user's red/blue perms (server stays the arbiter). - `pages/MissionDetailPage.tsx` — test rows link to the new per-test page. - `App.tsx` — registers /missions/:id/tests/:testId behind RequireAuth. - `HomePage.tsx` — hero + roadmap card bumped to M7; next is M8. Tests - `backend/tests/test_mission_tests.py` — 27 pytest tests (red/blue field gating, state-machine matrix incl. idempotent-side enforcement, executed_at override, 24/26 MB upload + SHA256, MIME/ext whitelist, soft-delete hide, activity polling with URL-encoded `since`, membership 404 vs admin bypass, cross-mission evidence access). - `e2e/tests/m7-execution.spec.ts` — 5 Playwright tests against the live stack (red-only/blue-only API gating, mark-executed + reviewed_by_blue side enforcement, 24 MB/26 MB upload + SHA256 round-trip, SPA per-test page save + transition, non-member 404 message). afterAll restores stable admin and re-syncs MITRE. Docs - CHANGELOG.md: M7 section + post-M7 review-pass subsection. - README.md: status, feature blurb, roadmap, testing-m7 link. - tasks/testing-m7.md: manual + automated procedure with transition matrix and perm-gating table. - tasks/lessons.md: M7 retrospectives (LogRecord `created` trap, URL-encoded query timestamps, perm-before-flush, atomic move, polling visibility gate). Test count: 133 pytest / 49 Playwright, all green. Co-Authored-By: Claude Opus 4.7 (1M context) --- CHANGELOG.md | 34 + README.md | 7 +- ...91a4e7c6d2f3_m7_mission_test_last_actor.py | 53 ++ backend/app/api/detection_levels.py | 37 + backend/app/api/diag.py | 32 + backend/app/api/evidence.py | 123 +++ backend/app/api/missions.py | 336 ++++++- backend/app/api/v1.py | 4 + backend/app/main.py | 2 + backend/app/models/mission.py | 12 + backend/app/services/detection_levels.py | 140 +++ backend/app/services/evidence.py | 391 ++++++++ backend/app/services/mission_tests.py | 668 +++++++++++++ backend/tests/test_mission_tests.py | 884 ++++++++++++++++++ e2e/tests/m7-execution.spec.ts | 439 +++++++++ frontend/src/App.tsx | 9 + frontend/src/lib/missions.ts | 131 +++ frontend/src/pages/HomePage.tsx | 6 +- frontend/src/pages/MissionDetailPage.tsx | 14 +- frontend/src/pages/MissionTestPage.tsx | 750 +++++++++++++++ tasks/lessons.md | 13 + tasks/testing-m7.md | 189 ++++ tasks/todo.md | 18 +- 23 files changed, 4273 insertions(+), 19 deletions(-) create mode 100644 backend/alembic/versions/20260514_1000_91a4e7c6d2f3_m7_mission_test_last_actor.py create mode 100644 backend/app/api/detection_levels.py create mode 100644 backend/app/api/evidence.py create mode 100644 backend/app/services/detection_levels.py create mode 100644 backend/app/services/evidence.py create mode 100644 backend/app/services/mission_tests.py create mode 100644 backend/tests/test_mission_tests.py create mode 100644 e2e/tests/m7-execution.spec.ts create mode 100644 frontend/src/pages/MissionTestPage.tsx create mode 100644 tasks/testing-m7.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f653f3..fae7d2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,40 @@ All notable changes to this project will be documented here. Format: [Keep a Cha ## [Unreleased] +### Fixed (post-M7 review pass — spec-reviewer + code-reviewer) +- **Idempotent transition leaked false success to a wrong-side user** (`backend/app/services/mission_tests.py:570`): a blue-only viewer POSTing `target_state="executed"` while the test was already executed got a 200 idempotent response, falsely advertising that they held `mission.write_red_fields`. Reordered the gate so the side-perm check runs *before* the idempotency short-circuit, with a new `_IDEMPOTENT_SIDE` table that asks "which side originally produced this state?" — re-asserting that perm even on no-op replays. Test `test_idempotent_transition_still_checks_side_perm`. +- **Cross-mission evidence access not pinned by a test** (`backend/tests/test_mission_tests.py:test_evidence_member_of_other_mission_gets_404`): added explicit coverage that a user who is a blue member of mission B sees 404 on an evidence row attached to mission A. The chain walk in `_resolve_evidence_chain` already enforced this, but the regression test was missing. +- **`shutil.move` swapped for `os.replace`** (`backend/app/services/evidence.py:240`): `os.replace` is the documented atomic-rename primitive on POSIX and Windows when src/dst share a volume — and our tmpfile is always staged inside the destination directory, so the guarantee holds. Removes the implicit copy+remove fallback from `shutil.move` that would silently break atomicity on a cross-fs `EVIDENCE_DIR`. +- **SHA256 path component now hex-validated** (`backend/app/services/evidence.py:227`): the hash always comes from hashlib so it's already hex, but if a future caller ever passes pre-computed bytes we want to fail loudly rather than write to a path like `..something.evtx`. Cheap `re.fullmatch(r"[0-9a-f]{64}", sha256)` guard. +- **`EVIDENCE_DIR` filesystem-root guard** (`backend/app/services/evidence.py:_test_dir`): refuse to create per-mission directories when `EVIDENCE_DIR` resolves to `/` (or the equivalent on Windows). Stops a mis-configured operator from laying down content-addressed evidence files at the filesystem root. +- **`/diag/reset` evidence cleanup now skips symlinks** (`backend/app/api/diag.py:127`): switched from `is_dir()` to `is_symlink() or not is_dir()` so a hostile or accidental symlink inside `EVIDENCE_DIR` is unlinked rather than `rmtree`'d through. +- **N+1 in `_to_detail_view`** (`backend/app/services/mission_tests.py:_to_detail_view`): the last-actor and detection-level lookups each issued their own `s.get()`. Replaced with `select(columns)` queries that return just the needed scalar fields — same SQL count but fewer ORM round-trips, and every PUT/transition exercises this path so it adds up. +- **Mission detail row `onClick` removed in favour of the wrapped `Link`** (`frontend/src/pages/MissionDetailPage.tsx:684`): the `tr onClick` + nested `Link` with `stopPropagation` worked but was fragile to accessibility tooling. The link on the test name + the explicit hover class is enough. + +### Added — M7 (Red & blue execution on a mission test) +- **Per-mission-test write API** (`app/api/missions.py` + `app/services/mission_tests.py`): + - `GET /missions/{id}/tests/{test_id}` — full detail view with snapshot, state, red/blue fields, MITRE tags, evidence list, last-actor metadata. + - `PUT /missions/{id}/tests/{test_id}` — patch any subset of `red_command` / `red_output` / `red_comment_md` / `blue_comment_md` / `detection_level_id` / `executed_at` / `executed_at_overridden`. The service classifies each touched field as red-side or blue-side and rejects with 403 if the caller lacks the matching perm. `executed_at*` only writable when the test sits in `executed` or `reviewed_by_blue`. + - `POST /missions/{id}/tests/{test_id}/transition` — drives the state machine `pending↔skipped/blocked` + `pending→executed→reviewed_by_blue` (allows undo back to `pending`). Side-aware perm gating: `pending→executed` and `executed→pending` require `write_red_fields`; `executed↔reviewed_by_blue` requires `write_blue_fields`; `pending↔skipped/blocked` accepts either side. Transitioning into `executed` stamps `executed_at=now()` and clears the override; transitioning out (to `pending`) wipes the timestamp. + - `GET /missions/{id}/activity?since=` — returns mission_tests whose `updated_at > since`, freshest first. Drives the SPA's 15-second polling badge. Response includes `server_time` so the client can chain calls without clock drift. +- **Evidence storage pipeline** (`app/services/evidence.py` + `app/api/evidence.py`): + - `POST /missions/{id}/tests/{test_id}/evidence` (multipart, gated on `mission.write_blue_fields`): streams the upload into a tmpfile next to the final location, hashing chunk-by-chunk and aborting at the 25 MB cap. Validates extension (whitelist: png/jpg/jpeg/pdf/txt/log/json/csv/evtx/zip) and MIME (permissive allowlist + `application/octet-stream` fallback for `.evtx`). Content-addressed storage: `${EVIDENCE_DIR}///` — re-uploading byte-identical content reuses the file on disk and inserts a fresh row. + - `GET /evidence/{id}` — JSON metadata view; `?download=true` switches to `send_file` with the original filename in `Content-Disposition` and the SHA256 as the ETag. + - `DELETE /evidence/{id}` — soft delete (only flips `deleted_at`; physical purge lands in M12). + - All three routes are membership-aware via the same chain walk (`evidence → test → scenario → mission`), collapsing "not found" / "not visible" into 404 to prevent existence leaks. +- **Activity tracking column** (`backend/alembic/versions/20260514_1000_91a4e7c6d2f3_m7_mission_test_last_actor.py`): added `mission_tests.last_actor_id` (FK `users.id` `ON DELETE SET NULL`) + `ix_mission_tests_updated_at` to support the polling endpoint. Every red/blue write or transition stamps the actor so the "modified by X Ns ago" indicator can resolve a human label. +- **Detection-level seed + read** (`app/services/detection_levels.py` + `app/api/detection_levels.py`): + - 4 default rows seeded at boot — `detected_blocked` / `detected_alert` / `logged_only` / `not_detected` — colored on the design-system accent palette. The seed is idempotent and never mutates existing rows; new keys added to `DEFAULT_LEVELS` in future releases surface on next boot. + - `GET /detection-levels` (gated on `detection_level.read`) returns the catalogue ordered by position. CRUD is M8's territory. +- **Per-test page** (`frontend/src/pages/MissionTestPage.tsx`): two-zone layout with the red border on the red half (command, output, markdown comment, mark-executed button, override toggle) and the cyan border on the blue half (detection-level select, comment, drag-and-drop evidence dropzone). Per-field disable based on `mission.write_red_fields` / `mission.write_blue_fields`; server is the ultimate arbiter so the UI is purely advisory. The "Last touched Xs ago by Y" badge polls `/activity` every 15 s while the document is visible. +- **Mission detail page wires through to the per-test page** (`frontend/src/pages/MissionDetailPage.tsx`): every row in the Tests tab is now clickable (cursor + hover state) and links to `/missions//tests/`. The route is registered in `App.tsx` behind `RequireAuth`. +- **TanStack query keys** (`frontend/src/lib/missions.ts`): added `missionTestKeys.detail()` / `.activity()` / `.detectionLevels()` so the per-test page invalidations stay surgical (don't blow away the whole missions list). +- **`/diag/reset` extended** (`app/api/diag.py`): test mode now wipes `${EVIDENCE_DIR}/*` so e2e uploads don't accumulate across runs. Detection levels are preserved (reference data, not catalogue) and the seed is re-run as a safety net. +- **Tests**: + - **`backend/tests/test_mission_tests.py`** — 25 pytest tests covering: detection-level seed + perm gating; red/blue field-level perms (red user blocked on blue fields and vice-versa); mark-executed stamps `executed_at`; override gating (forbidden while pending, blue-side blocked); state-machine matrix + side perm refinement; membership 404 vs admin bypass; evidence 24 MB ok / 26 MB rejected; SHA256 verification; MIME/extension whitelist; soft-delete hides bytes from detail view; activity polling with `since=` URL-encoded; future `since` returns empty. + - **`e2e/tests/m7-execution.spec.ts`** — 5 Playwright tests against the live stack: red-only/blue-only API gating, mark-executed + reviewed_by_blue side enforcement, 24 MB/26 MB upload + SHA256 round-trip, SPA per-test page save + transition, non-member sees the 404 alert instead of mission content. `afterAll` restores the stable admin and re-syncs MITRE. +- **HomePage**: hero + roadmap card bumped to `M7 — Red & blue execution on a mission test (done). Next: M8`. + ### Fixed (post-M6 SPA — mission detail page was read-only) - **Mission detail page couldn't edit metadata, append scenarios, or change members** (`frontend/src/pages/MissionDetailPage.tsx`): the M6 SPA shipped the 3-step *creation* wizard but no edit affordance on the detail page — even though the backend already exposed `PUT /missions/{id}`, `POST /missions/{id}/scenarios`, and `PUT /missions/{id}/members`. Added three modals gated by `is_admin || mission.update`: - **Edit metadata** (header button, opens a 3xl modal): name / client_target / dates / description_md, full inline validation (empty name, inverted dates) mirroring the wizard's step 1. diff --git a/README.md b/README.md index 93516eb..5df9a78 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Collaborative purple-team platform. Red team logs the tests they execute (procedure, command, timestamp); blue team annotates each test with detection evidence (alerts, logs, files). At the end of an engagement, Metamorph generates a standalone reveal.js slide deck classified by MITRE ATT&CK tactic. -> **Status**: M0–M5 delivered (bootstrap → DB schema → auth → RBAC → MITRE ATT&CK reference → test & scenario templates). See `tasks/spec.md` for the full specification and `tasks/todo.md` for the milestone-by-milestone plan. +> **Status**: M0–M7 delivered (bootstrap → DB schema → auth → RBAC → MITRE ATT&CK reference → test & scenario templates → missions snapshot → red/blue execution on a mission test). See `tasks/spec.md` for the full specification and `tasks/todo.md` for the milestone-by-milestone plan. ## Stack @@ -13,6 +13,7 @@ Collaborative purple-team platform. Red team logs the tests they execute (proced - **MITRE ATT&CK (M4+)**: Enterprise reference catalogue pinned to v19.0, seedable via `make seed-mitre`. - **Template catalogue (M5+)**: reusable `test_templates` (markdown procedure, OPSEC level, free tags, expected IOCs, MITRE tags) + ordered `scenario_templates` with drag-and-drop reordering. Admin pages at `/admin/tests` and `/admin/scenarios`. - **Missions (M6+)**: `missions` snapshot one or more scenario templates at creation time; template edits don't drift live missions (`mission_*` tables freeze every field, including MITRE tags). Non-admin members see only their own missions (membership filter, 404 on existence-leak attempts). Status state machine `draft → in_progress → completed → archived`, archive perm gated separately. SPA: list/filter at `/missions`, 3-step create wizard at `/missions/new`, detail page with Tests / Members / Synthesis / Export tabs. +- **Execution (M7+)**: per-test page `/missions//tests/` with two zones — red (command/output/comment + mark-executed with override) and blue (detection-level select / comment / drag-and-drop evidence upload). Field-level perm gating: `mission.write_red_fields` / `mission.write_blue_fields` are server-enforced *per field*. State machine `pending↔skipped/blocked` + `pending→executed→reviewed_by_blue` with side-aware perms. Evidence pipeline: streaming upload to `${EVIDENCE_DIR}///`, SHA256 + MIME + extension + 25 MB cap. 15 s activity polling via `/missions//activity?since=…` drives the "modified by X" badge. 4 default `detection_levels` seeded at boot. - **Delivery**: docker-compose. TLS termination is expected to be handled by an external reverse proxy in production. ## Quickstart @@ -95,7 +96,7 @@ See `.env.example`. The most important ones: ## Testing -- **Manual + automated checklist for the current milestone**: see [`tasks/testing-m.md`](tasks/testing-m6.md) (current: `testing-m6.md`). +- **Manual + automated checklist for the current milestone**: see [`tasks/testing-m.md`](tasks/testing-m7.md) (current: `testing-m7.md`). - **Backend unit tests**: `make test-api` - **End-to-end (Playwright)**: `make e2e-install` (once), then `make up && make e2e`. Reports land in `e2e/playwright-report/` (HTML + JUnit XML); open with `make e2e-report`. @@ -137,7 +138,7 @@ The hooks run `ruff` + `ruff-format` on the backend and `eslint` / `tsc --noEmit ## Roadmap -See `tasks/todo.md`. Current milestone: **M6 — Missions & snapshot** (done). Next: M7 (red/blue execution on a mission test). +See `tasks/todo.md`. Current milestone: **M7 — Red & blue execution on a mission test** (done). Next: M8 (custom detection-level CRUD). ## License diff --git a/backend/alembic/versions/20260514_1000_91a4e7c6d2f3_m7_mission_test_last_actor.py b/backend/alembic/versions/20260514_1000_91a4e7c6d2f3_m7_mission_test_last_actor.py new file mode 100644 index 0000000..986ece9 --- /dev/null +++ b/backend/alembic/versions/20260514_1000_91a4e7c6d2f3_m7_mission_test_last_actor.py @@ -0,0 +1,53 @@ +"""m7 mission test last actor tracking + +Revision ID: 91a4e7c6d2f3 +Revises: 24765a5014b6 +Create Date: 2026-05-14 10:00:00.000000 + +Adds the `last_actor_id` column to `mission_tests` so the activity polling +endpoint can answer "modified by X" without joining the audit log (M14 owns +the full audit story). FK to `users.id` with `ON DELETE SET NULL` so deleting +a user does not wipe the history of their writes. +""" +from __future__ import annotations + +from collections.abc import Sequence + +import sqlalchemy as sa +from alembic import op + +revision: str = "91a4e7c6d2f3" +down_revision: str | None = "24765a5014b6" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + op.add_column( + "mission_tests", + sa.Column("last_actor_id", sa.Uuid(), nullable=True), + ) + op.create_foreign_key( + op.f("fk_mission_tests_last_actor_id_users"), + "mission_tests", + "users", + ["last_actor_id"], + ["id"], + ondelete="SET NULL", + ) + op.create_index( + "ix_mission_tests_updated_at", + "mission_tests", + ["updated_at"], + unique=False, + ) + + +def downgrade() -> None: + op.drop_index("ix_mission_tests_updated_at", table_name="mission_tests") + op.drop_constraint( + op.f("fk_mission_tests_last_actor_id_users"), + "mission_tests", + type_="foreignkey", + ) + op.drop_column("mission_tests", "last_actor_id") diff --git a/backend/app/api/detection_levels.py b/backend/app/api/detection_levels.py new file mode 100644 index 0000000..04183ad --- /dev/null +++ b/backend/app/api/detection_levels.py @@ -0,0 +1,37 @@ +"""Detection-level taxonomy API. + +Read-only in M7 — M8 will add CRUD. The four defaults are seeded at boot +via `app.services.detection_levels.seed_detection_levels()`. +""" + +from __future__ import annotations + +from typing import Any + +from flask import Blueprint, jsonify + +from app.core.auth_decorators import require_auth, require_perm +from app.services import detection_levels as svc + +bp = Blueprint("detection_levels", __name__, url_prefix="/detection-levels") + + +def _serialize(view: svc.DetectionLevelView) -> dict[str, Any]: + return { + "id": str(view.id), + "key": view.key, + "label_fr": view.label_fr, + "label_en": view.label_en, + "color_token": view.color_token, + "position": view.position, + "is_default": view.is_default, + "is_system": view.is_system, + } + + +@bp.get("") +@require_auth +@require_perm("detection_level.read") +def list_detection_levels(): + items = svc.list_detection_levels() + return jsonify({"items": [_serialize(it) for it in items]}) diff --git a/backend/app/api/diag.py b/backend/app/api/diag.py index efb71f6..2e36ac2 100644 --- a/backend/app/api/diag.py +++ b/backend/app/api/diag.py @@ -8,6 +8,8 @@ is the bedrock of the e2e suite (clean DB + freshly minted install token). from __future__ import annotations import logging +import shutil +from pathlib import Path from flask import Blueprint, abort, jsonify from sqlalchemy import text @@ -16,6 +18,7 @@ from sqlalchemy.exc import SQLAlchemyError from app.core.config import settings from app.core.install_token import regenerate_install_token from app.db.session import get_engine +from app.services.detection_levels import seed_detection_levels bp = Blueprint("diag", __name__, url_prefix="/diag") log = logging.getLogger("metamorph.diag") @@ -108,10 +111,39 @@ def reset_test_state(): "mitre_techniques, mitre_tactics RESTART IDENTITY CASCADE" ) ) + # Detection levels (M7) are reference data seeded at boot — they + # are explicitly preserved here, but the seed is re-run below to + # cover the edge case where an operator hand-tweaked the rows + # before invoking the reset. The seed is idempotent. except SQLAlchemyError as e: log.error("metamorph.diag.reset_failed", extra={"error": str(e)}) return jsonify({"reset": False, "error": "database_error"}), 500 + # M7: wipe the evidence directory so an e2e suite that uploads bytes does + # not accumulate files across runs. Only in `test`; in `dev` we keep the + # files (operator likely wants to inspect what they uploaded by hand). + if settings.APP_ENV == "test": + evidence_root = Path(settings.EVIDENCE_DIR) + if evidence_root.exists(): + for child in evidence_root.iterdir(): + # Symlinks are unlinked, never followed — a hostile or + # accidental symlink inside the evidence dir must NOT cause + # rmtree to recurse into an unrelated tree. + try: + if child.is_symlink() or not child.is_dir(): + child.unlink(missing_ok=True) + else: + shutil.rmtree(child) + except OSError as e: + log.warning( + "metamorph.diag.evidence_cleanup_failed", + extra={"path": str(child), "error": str(e)}, + ) + + # Detection levels were preserved during the wipe; re-run the seed to + # cover the off-chance an operator has deleted some rows manually. + seed_detection_levels() + token = regenerate_install_token() # Clear the in-memory rate-limit counters so the e2e suite that follows can diff --git a/backend/app/api/evidence.py b/backend/app/api/evidence.py new file mode 100644 index 0000000..93893cd --- /dev/null +++ b/backend/app/api/evidence.py @@ -0,0 +1,123 @@ +"""Top-level evidence routes (download + soft-delete by id). + +Upload is collocated under `/missions/{id}/tests/{test_id}/evidence` because +that path encodes the parent context. Once an evidence row exists, callers +can address it by id directly — these routes own that side. + +Membership/visibility is enforced through the service (`EvidenceNotFound` is +returned for both "missing" and "not visible" outcomes — no existence leak). +""" + +from __future__ import annotations + +import logging +import uuid +from typing import Any + +from flask import Blueprint, abort, g, jsonify, request, send_file + +from app.core.auth_decorators import AuthenticatedUser, require_auth, require_perm +from app.services import evidence as svc + +bp = Blueprint("evidence", __name__, url_prefix="/evidence") +log = logging.getLogger("metamorph.api.evidence") + + +def _serialize(ev: svc.EvidenceView) -> dict[str, Any]: + return { + "id": str(ev.id), + "mission_test_id": str(ev.mission_test_id), + "sha256": ev.sha256, + "mime": ev.mime, + "size_bytes": ev.size_bytes, + "original_filename": ev.original_filename, + "uploaded_by_user_id": ( + str(ev.uploaded_by_user_id) if ev.uploaded_by_user_id else None + ), + "uploaded_by_email": ev.uploaded_by_email, + "uploaded_by_display_name": ev.uploaded_by_display_name, + "uploaded_at": ev.uploaded_at.isoformat(), + "created_at": ev.created_at.isoformat(), + } + + +def _current_user() -> AuthenticatedUser: + user: AuthenticatedUser | None = getattr(g, "current_user", None) + if user is None: + abort(401, description="not authenticated") + assert user is not None # for Pyright; abort raises HTTPException + return user + + +def _parse_uuid_or_400(raw: str) -> uuid.UUID | None: + try: + return uuid.UUID(raw) + except ValueError: + return None + + +@bp.get("/") +@require_auth +@require_perm("mission.read") +def get_evidence(evidence_id: str): + """Metadata read. Use `?download=true` to receive the bytes inline. + + The download mode streams the on-disk file via `send_file` with the + original filename in `Content-Disposition`. Browsers handle the + Content-Type guess from the stored mime. + """ + eid = _parse_uuid_or_400(evidence_id) + if eid is None: + return jsonify({"error": "invalid_id"}), 400 + user = _current_user() + want_download = request.args.get("download", "false").lower() == "true" + + if want_download: + try: + view, path = svc.get_evidence_for_download( + eid, viewer_id=user.id, viewer_is_admin=user.is_admin + ) + except svc.EvidenceNotFound: + return jsonify({"error": "not_found"}), 404 + log.info( + "metamorph.evidence.download", + extra={ + "evidence_id": str(eid), + "user_id": str(user.id), + "size_bytes": view.size_bytes, + }, + ) + return send_file( + str(path), + mimetype=view.mime, + as_attachment=True, + download_name=view.original_filename, + etag=view.sha256, + conditional=True, + max_age=0, + ) + + try: + view = svc.get_evidence( + eid, viewer_id=user.id, viewer_is_admin=user.is_admin + ) + except svc.EvidenceNotFound: + return jsonify({"error": "not_found"}), 404 + return jsonify(_serialize(view)) + + +@bp.delete("/") +@require_auth +@require_perm("mission.write_blue_fields") +def soft_delete_evidence(evidence_id: str): + eid = _parse_uuid_or_400(evidence_id) + if eid is None: + return jsonify({"error": "invalid_id"}), 400 + user = _current_user() + try: + svc.soft_delete_evidence( + eid, viewer_id=user.id, viewer_is_admin=user.is_admin + ) + except svc.EvidenceNotFound: + return jsonify({"error": "not_found"}), 404 + return jsonify({"ok": True}) diff --git a/backend/app/api/missions.py b/backend/app/api/missions.py index 3358b89..2dde206 100644 --- a/backend/app/api/missions.py +++ b/backend/app/api/missions.py @@ -9,19 +9,25 @@ Status transitions are routed through a single POST endpoint that accepts a target status. We accept either `mission.update` or `mission.archive` at the gate — archiving requires the dedicated perm if the target is `archived`, and the service enforces the lifecycle graph (`_VALID_TRANSITIONS`). + +M7 extends this blueprint with per-test routes under `/missions//tests/...` +plus an activity polling endpoint. The split is purely organisational — the +membership and visibility rules stay identical to M6. """ from __future__ import annotations import logging import uuid -from datetime import date +from datetime import date, datetime, timezone from typing import Any from flask import Blueprint, abort, g, jsonify, request from pydantic import BaseModel, Field, ValidationError from app.core.auth_decorators import AuthenticatedUser, require_auth, require_perm +from app.services import evidence as evidence_svc +from app.services import mission_tests as test_svc from app.services import missions as svc bp = Blueprint("missions", __name__, url_prefix="/missions") @@ -496,3 +502,331 @@ def soft_delete_mission(mission_id: str): return jsonify({"error": "not_found"}), 404 log.info("metamorph.mission.soft_deleted", extra={"mission_id": str(mid)}) return jsonify({"ok": True}) + + +# =========================================================================== # +# M7 — per-test routes +# =========================================================================== # + + +class UpdateMissionTestPayload(BaseModel): + red_command: str | None = Field(default=None, max_length=20_000) + red_output: str | None = Field(default=None, max_length=200_000) + red_comment_md: str | None = Field(default=None, max_length=20_000) + blue_comment_md: str | None = Field(default=None, max_length=20_000) + detection_level_id: uuid.UUID | None = None + executed_at: datetime | None = None + executed_at_overridden: bool | None = None + + model_config = {"extra": "forbid"} + + +class TestTransitionPayload(BaseModel): + target_state: str = Field(min_length=1, max_length=24) + + model_config = {"extra": "forbid"} + + +def _serialize_evidence(ev: test_svc.EvidenceView) -> dict[str, Any]: + return { + "id": str(ev.id), + "mission_test_id": str(ev.mission_test_id), + "sha256": ev.sha256, + "mime": ev.mime, + "size_bytes": ev.size_bytes, + "original_filename": ev.original_filename, + "uploaded_by_user_id": ( + str(ev.uploaded_by_user_id) if ev.uploaded_by_user_id else None + ), + "uploaded_by_email": ev.uploaded_by_email, + "uploaded_by_display_name": ev.uploaded_by_display_name, + "uploaded_at": ev.uploaded_at.isoformat(), + "created_at": ev.created_at.isoformat(), + } + + +def _serialize_test_detail(t: test_svc.MissionTestDetailView) -> dict[str, Any]: + return { + "id": str(t.id), + "mission_id": str(t.mission_id), + "scenario_id": str(t.scenario_id), + "position": t.position, + "snapshot_name": t.snapshot_name, + "snapshot_description": t.snapshot_description, + "snapshot_objective": t.snapshot_objective, + "snapshot_procedure_md": t.snapshot_procedure_md, + "snapshot_prerequisites_md": t.snapshot_prerequisites_md, + "snapshot_expected_red_md": t.snapshot_expected_red_md, + "snapshot_expected_blue_md": t.snapshot_expected_blue_md, + "snapshot_opsec_level": t.snapshot_opsec_level, + "snapshot_tags": t.snapshot_tags, + "snapshot_expected_iocs": t.snapshot_expected_iocs, + "state": t.state, + "executed_at": t.executed_at.isoformat() if t.executed_at else None, + "executed_at_overridden": t.executed_at_overridden, + "red_command": t.red_command, + "red_output": t.red_output, + "red_comment_md": t.red_comment_md, + "blue_comment_md": t.blue_comment_md, + "detection_level_id": ( + str(t.detection_level_id) if t.detection_level_id else None + ), + "detection_level_key": t.detection_level_key, + "last_actor_id": str(t.last_actor_id) if t.last_actor_id else None, + "last_actor_email": t.last_actor_email, + "last_actor_display_name": t.last_actor_display_name, + "updated_at": t.updated_at.isoformat(), + "mitre_tags": [ + { + "kind": tag.kind, + "external_id": tag.external_id, + "name": tag.name, + "url": tag.url, + } + for tag in t.mitre_tags + ], + "evidence": [_serialize_evidence(e) for e in t.evidence], + } + + +def _serialize_activity(a: test_svc.ActivityEntryView) -> dict[str, Any]: + return { + "test_id": str(a.test_id), + "scenario_id": str(a.scenario_id), + "state": a.state, + "updated_at": a.updated_at.isoformat(), + "last_actor_id": str(a.last_actor_id) if a.last_actor_id else None, + "last_actor_email": a.last_actor_email, + "last_actor_display_name": a.last_actor_display_name, + } + + +def _has_perm(user: AuthenticatedUser, code: str) -> bool: + return user.is_admin or code in user.permissions + + +@bp.get("//tests/") +@require_auth +@require_perm("mission.read") +def get_mission_test(mission_id: str, test_id: str): + mid = _parse_uuid_or_400(mission_id) + tid = _parse_uuid_or_400(test_id) + if mid is None or tid is None: + return jsonify({"error": "invalid_id"}), 400 + user = _current_user() + try: + view = test_svc.get_mission_test( + mid, tid, viewer_id=user.id, viewer_is_admin=user.is_admin + ) + except svc.MissionNotFound: + return jsonify({"error": "not_found"}), 404 + except test_svc.MissionTestNotFound: + return jsonify({"error": "not_found"}), 404 + return jsonify(_serialize_test_detail(view)) + + +@bp.put("//tests/") +@require_auth +@require_perm("mission.write_red_fields", "mission.write_blue_fields") +def update_mission_test(mission_id: str, test_id: str): + """Patch any subset of red/blue fields on a test. + + The outer decorator gates on *either* side perm so a user with only + `write_blue_fields` reaches the handler — but the service then refuses + individual fields they cannot write (red fields → 403). The membership + filter remains row-level inside the service. + """ + mid = _parse_uuid_or_400(mission_id) + tid = _parse_uuid_or_400(test_id) + if mid is None or tid is None: + return jsonify({"error": "invalid_id"}), 400 + + raw = request.get_json(silent=True) or {} + try: + payload = UpdateMissionTestPayload.model_validate(raw) + except ValidationError as e: + return jsonify({"error": "invalid_request", "details": e.errors()}), 400 + + kwargs: dict[str, Any] = {} + for field in ( + "red_command", + "red_output", + "red_comment_md", + "blue_comment_md", + "detection_level_id", + "executed_at", + "executed_at_overridden", + ): + if field in raw: + kwargs[field] = getattr(payload, field) + + user = _current_user() + try: + view = test_svc.update_mission_test_fields( + mid, + tid, + viewer_id=user.id, + viewer_is_admin=user.is_admin, + has_red_perm=_has_perm(user, "mission.write_red_fields"), + has_blue_perm=_has_perm(user, "mission.write_blue_fields"), + **kwargs, + ) + except svc.MissionNotFound: + return jsonify({"error": "not_found"}), 404 + except test_svc.MissionTestNotFound: + return jsonify({"error": "not_found"}), 404 + except test_svc.MissingFieldPermission as e: + log.info( + "metamorph.mission_test.field_perm_denied", + extra={ + "mission_id": str(mid), + "test_id": str(tid), + "user_id": str(user.id), + "reason": str(e), + }, + ) + return jsonify({"error": "forbidden", "message": str(e)}), 403 + except test_svc.InvalidTestPayload as e: + return jsonify({"error": "invalid_request", "message": str(e)}), 400 + log.info( + "metamorph.mission_test.updated", + extra={ + "mission_id": str(mid), + "test_id": str(tid), + "fields": sorted(kwargs.keys()), + }, + ) + return jsonify(_serialize_test_detail(view)) + + +@bp.post("//tests//transition") +@require_auth +@require_perm("mission.write_red_fields", "mission.write_blue_fields") +def transition_mission_test(mission_id: str, test_id: str): + mid = _parse_uuid_or_400(mission_id) + tid = _parse_uuid_or_400(test_id) + if mid is None or tid is None: + return jsonify({"error": "invalid_id"}), 400 + try: + payload = TestTransitionPayload.model_validate(request.get_json(silent=True) or {}) + except ValidationError as e: + return jsonify({"error": "invalid_request", "details": e.errors()}), 400 + + user = _current_user() + try: + view = test_svc.transition_mission_test( + mid, + tid, + payload.target_state, + viewer_id=user.id, + viewer_is_admin=user.is_admin, + has_red_perm=_has_perm(user, "mission.write_red_fields"), + has_blue_perm=_has_perm(user, "mission.write_blue_fields"), + ) + except svc.MissionNotFound: + return jsonify({"error": "not_found"}), 404 + except test_svc.MissionTestNotFound: + return jsonify({"error": "not_found"}), 404 + except test_svc.MissingFieldPermission as e: + return jsonify({"error": "forbidden", "message": str(e)}), 403 + except test_svc.InvalidTestTransition as e: + return jsonify({"error": "invalid_transition", "message": str(e)}), 409 + except test_svc.InvalidTestPayload as e: + return jsonify({"error": "invalid_request", "message": str(e)}), 400 + log.info( + "metamorph.mission_test.transitioned", + extra={ + "mission_id": str(mid), + "test_id": str(tid), + "state": view.state, + }, + ) + return jsonify(_serialize_test_detail(view)) + + +@bp.post("//tests//evidence") +@require_auth +@require_perm("mission.write_blue_fields") +def upload_evidence(mission_id: str, test_id: str): + """Multipart upload — single `file` part. Returns the new evidence row. + + Streaming + size cap + SHA256 calc happen in the service; we just sniff + the request and surface the right error codes. + """ + mid = _parse_uuid_or_400(mission_id) + tid = _parse_uuid_or_400(test_id) + if mid is None or tid is None: + return jsonify({"error": "invalid_id"}), 400 + + upload = request.files.get("file") + if upload is None or not upload.filename: + return jsonify({"error": "missing_file"}), 400 + + user = _current_user() + try: + view = evidence_svc.add_evidence( + mid, + tid, + file_stream=upload.stream, + original_filename=upload.filename, + mime=upload.mimetype or "application/octet-stream", + viewer_id=user.id, + viewer_is_admin=user.is_admin, + ) + except svc.MissionNotFound: + return jsonify({"error": "not_found"}), 404 + except test_svc.MissionTestNotFound: + return jsonify({"error": "not_found"}), 404 + except evidence_svc.EvidenceValidationError as e: + return jsonify({"error": e.code, "message": str(e)}), 400 + except evidence_svc.EvidenceStorageError as e: + return jsonify({"error": "storage_failed", "message": str(e)}), 500 + log.info( + "metamorph.api.evidence.uploaded", + extra={ + "mission_id": str(mid), + "test_id": str(tid), + "evidence_id": str(view.id), + "size_bytes": view.size_bytes, + }, + ) + return jsonify(_serialize_evidence(view)), 201 + + +@bp.get("//activity") +@require_auth +@require_perm("mission.read") +def mission_activity(mission_id: str): + """Polled by the per-test page to drive the "modified by X" badge. + + Accepts an optional `since=` filter. Returns only mission + tests, not auth/templates — those are out of scope for this indicator. + """ + mid = _parse_uuid_or_400(mission_id) + if mid is None: + return jsonify({"error": "invalid_id"}), 400 + + since_raw = request.args.get("since") + since: datetime | None = None + if since_raw: + try: + since = datetime.fromisoformat(since_raw) + except ValueError: + return jsonify({"error": "invalid_since"}), 400 + + user = _current_user() + try: + entries = test_svc.list_activity_since( + mid, + viewer_id=user.id, + viewer_is_admin=user.is_admin, + since=since, + ) + except svc.MissionNotFound: + return jsonify({"error": "not_found"}), 404 + return jsonify( + { + "items": [_serialize_activity(e) for e in entries], + "server_time": datetime.now(tz=timezone.utc).isoformat(), + } + ) diff --git a/backend/app/api/v1.py b/backend/app/api/v1.py index dd8f6a1..0602a69 100644 --- a/backend/app/api/v1.py +++ b/backend/app/api/v1.py @@ -5,7 +5,9 @@ from __future__ import annotations from flask import Blueprint from app.api.auth import bp as auth_bp +from app.api.detection_levels import bp as detection_levels_bp from app.api.diag import bp as diag_bp +from app.api.evidence import bp as evidence_bp from app.api.groups import bp as groups_bp from app.api.health import bp as health_bp from app.api.invitations import bp as invitations_bp @@ -30,3 +32,5 @@ bp.register_blueprint(mitre_bp) bp.register_blueprint(test_templates_bp) bp.register_blueprint(scenario_templates_bp) bp.register_blueprint(missions_bp) +bp.register_blueprint(detection_levels_bp) +bp.register_blueprint(evidence_bp) diff --git a/backend/app/main.py b/backend/app/main.py index 9d29cfa..4db6950 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -16,6 +16,7 @@ from app.core.install_token import ( from app.core.logging import configure_logging from app.core.rate_limit import limiter from app.services.bootstrap import ensure_system_groups +from app.services.detection_levels import seed_detection_levels from app.services.permissions_seed import seed_all as seed_permissions_and_bindings @@ -29,6 +30,7 @@ def _try_bootstrap_at_boot(log: logging.Logger) -> None: try: ensure_system_groups() seed_permissions_and_bindings() + seed_detection_levels() token = ensure_install_token() if token is not None: log_install_token_banner(token) diff --git a/backend/app/models/mission.py b/backend/app/models/mission.py index 0679fae..ac8acef 100644 --- a/backend/app/models/mission.py +++ b/backend/app/models/mission.py @@ -218,6 +218,17 @@ class MissionTest(Base, UuidPkMixin, TimestampMixin, SoftDeleteMixin): nullable=True, ) + # --- Activity tracking (M7) --- + # Last user who wrote any red/blue field, flipped state, or uploaded + # evidence on this test. Used by the polling activity endpoint to drive + # the "modified by X Ns ago" badge. FK ON DELETE SET NULL so removing a + # user retains the history (the badge falls back to ""). + last_actor_id: Mapped[uuid.UUID | None] = mapped_column( + Uuid(as_uuid=True), + ForeignKey("users.id", ondelete="SET NULL"), + nullable=True, + ) + scenario: Mapped[MissionScenario] = relationship(back_populates="tests") mitre_tags: Mapped[list["MissionTestMitreTag"]] = relationship( back_populates="mission_test", @@ -236,6 +247,7 @@ class MissionTest(Base, UuidPkMixin, TimestampMixin, SoftDeleteMixin): ), UniqueConstraint("scenario_id", "position", name="uq_mission_tests_position"), Index("ix_mission_tests_state", "state"), + Index("ix_mission_tests_updated_at", "updated_at"), Index( "ix_mission_tests_active", "deleted_at", diff --git a/backend/app/services/detection_levels.py b/backend/app/services/detection_levels.py new file mode 100644 index 0000000..2f8a9f4 --- /dev/null +++ b/backend/app/services/detection_levels.py @@ -0,0 +1,140 @@ +"""Detection-level taxonomy. + +The 4 default levels are seeded at boot. M7 exposes read-only access so the +blue side of a mission test can pick a level; M8 will add CRUD. + +The seed is idempotent and additive: rows whose `key` already exists are left +alone (operators may have renamed labels). Only missing keys are inserted. +""" + +from __future__ import annotations + +import logging +import uuid +from dataclasses import dataclass + +from sqlalchemy import select + +from app.db.session import session_scope +from app.models.setting import DetectionLevel + +log = logging.getLogger("metamorph.detection_levels") + + +@dataclass(frozen=True) +class DetectionLevelView: + id: uuid.UUID + key: str + label_fr: str + label_en: str + color_token: str + position: int + is_default: bool + is_system: bool + + +@dataclass(frozen=True) +class _DefaultLevel: + key: str + label_fr: str + label_en: str + color_token: str + position: int + is_default: bool + + +# Seed catalogue. Colors map onto the design-system accents (cf. tasks/design.md). +DEFAULT_LEVELS: tuple[_DefaultLevel, ...] = ( + _DefaultLevel( + key="detected_blocked", + label_fr="Bloqué", + label_en="Blocked", + color_token="red", + position=0, + is_default=False, + ), + _DefaultLevel( + key="detected_alert", + label_fr="Alerte détectée", + label_en="Alert detected", + color_token="orange", + position=1, + is_default=False, + ), + _DefaultLevel( + key="logged_only", + label_fr="Loggé uniquement", + label_en="Logged only", + color_token="yellow", + position=2, + is_default=False, + ), + _DefaultLevel( + key="not_detected", + label_fr="Non détecté", + label_en="Not detected", + color_token="rose", + position=3, + is_default=True, + ), +) + + +def _to_view(r: DetectionLevel) -> DetectionLevelView: + return DetectionLevelView( + id=r.id, + key=r.key, + label_fr=r.label_fr, + label_en=r.label_en, + color_token=r.color_token, + position=r.position, + is_default=r.is_default, + is_system=r.is_system, + ) + + +def seed_detection_levels() -> dict[str, int]: + """Insert any default level whose `key` is missing. Idempotent. + + We never mutate existing rows here — operators are free to rename labels + or change the default flag. Adding a new entry to `DEFAULT_LEVELS` in a + future release will surface it on the next boot. + """ + created = 0 + with session_scope() as s: + existing_keys = set(s.scalars(select(DetectionLevel.key)).all()) + for lvl in DEFAULT_LEVELS: + if lvl.key in existing_keys: + continue + s.add( + DetectionLevel( + key=lvl.key, + label_fr=lvl.label_fr, + label_en=lvl.label_en, + color_token=lvl.color_token, + position=lvl.position, + is_default=lvl.is_default, + is_system=True, + ) + ) + created += 1 + # `created` is a reserved LogRecord attribute (timestamp) — use a prefixed key. + log.info( + "metamorph.detection_levels.seeded", + extra={"rows_created": created, "total": len(DEFAULT_LEVELS)}, + ) + return {"created": created, "total": len(DEFAULT_LEVELS)} + + +def list_detection_levels() -> list[DetectionLevelView]: + with session_scope() as s: + rows = s.scalars( + select(DetectionLevel).order_by(DetectionLevel.position, DetectionLevel.key) + ).all() + return [_to_view(r) for r in rows] + + +def get_detection_level(level_id: uuid.UUID) -> DetectionLevelView | None: + with session_scope() as s: + r = s.get(DetectionLevel, level_id) + return _to_view(r) if r is not None else None diff --git a/backend/app/services/evidence.py b/backend/app/services/evidence.py new file mode 100644 index 0000000..3cbd2dc --- /dev/null +++ b/backend/app/services/evidence.py @@ -0,0 +1,391 @@ +"""Blue-side evidence storage service (M7). + +Files live under `${EVIDENCE_DIR}///`. +The path is content-addressed: re-uploading byte-identical content into the +same test reuses the existing file on disk and inserts a fresh row (so we +keep history of who uploaded what without duplicating bytes). + +The upload pipeline streams to a tmpfile inside the same per-test directory +(`atomic move` semantics on POSIX), computing the SHA256 chunk-by-chunk and +aborting when the byte count crosses `MAX_BYTES`. We refuse files whose +extension is not in the whitelist; MIME is also validated but with a more +permissive fallback (browsers and `file(1)` disagree on `.evtx`). + +Soft delete only flips `deleted_at`. The bytes are kept on disk so a future +admin `/admin/purge` (M12) can remove them physically. Until then, the path +is still queryable but the API hides it from non-admins. +""" + +from __future__ import annotations + +import hashlib +import logging +import os +import re +import tempfile +import uuid +from datetime import datetime, timezone +from pathlib import Path +from typing import BinaryIO + +from app.core.config import settings +from app.db.session import session_scope +from app.models.auth import User +from app.models.evidence import EvidenceFile +from app.models.mission import MissionScenario, MissionTest +from app.services.mission_tests import ( + EvidenceView, + _ensure_mission_visible, + _load_test, + _to_evidence_view, + _touch, +) + +log = logging.getLogger("metamorph.evidence") + + +# --------------------------------------------------------------------------- # +# Validation rules +# --------------------------------------------------------------------------- # + + +MAX_BYTES: int = 25 * 1024 * 1024 # 25 MB per spec §M7 + +# Filename extensions accepted at the upload boundary. Lowercased; the upload +# handler downcases the original filename's tail before comparing. +ALLOWED_EXTS: frozenset[str] = frozenset( + { + ".png", + ".jpg", + ".jpeg", + ".pdf", + ".txt", + ".log", + ".json", + ".csv", + ".evtx", + ".zip", + } +) + +# Accept a permissive MIME set so common browser/OS combos clear validation. +# `.evtx` is canonically `application/octet-stream`; some Windows clients send +# `application/x-msexcel` for csv; etc. We trust the extension first and use +# the MIME as a secondary signal. +ALLOWED_MIMES: frozenset[str] = frozenset( + { + "image/png", + "image/jpeg", + "image/jpg", + "application/pdf", + "text/plain", + "text/csv", + "application/csv", + "application/json", + "application/octet-stream", + "application/zip", + "application/x-zip-compressed", + } +) + + +# --------------------------------------------------------------------------- # +# Exceptions +# --------------------------------------------------------------------------- # + + +class EvidenceNotFound(Exception): + """Evidence row missing, soft-deleted, or not visible to the viewer.""" + + +class EvidenceValidationError(Exception): + """Extension/MIME/size invalid at the upload boundary.""" + + def __init__(self, code: str, message: str) -> None: + super().__init__(message) + self.code = code + + +class EvidenceStorageError(Exception): + """Disk I/O failure during upload — bytes left on disk are best-effort cleaned.""" + + +# --------------------------------------------------------------------------- # +# Helpers +# --------------------------------------------------------------------------- # + + +def _evidence_dir() -> Path: + return Path(settings.EVIDENCE_DIR).resolve() + + +def _test_dir(mission_id: uuid.UUID, test_id: uuid.UUID) -> Path: + root = _evidence_dir() + # Refuse to lay down per-mission directories at filesystem roots — an + # operator who set EVIDENCE_DIR=/ would otherwise write into / itself. + if root in (Path("/"), Path(root.anchor)): + raise EvidenceStorageError("EVIDENCE_DIR cannot be a filesystem root") + return root / str(mission_id) / str(test_id) + + +def _sniff_ext(filename: str) -> str: + """Lowercased extension including the leading dot, or '' if none.""" + name = filename.rsplit("/", 1)[-1].rsplit("\\", 1)[-1] + if "." not in name: + return "" + return "." + name.rsplit(".", 1)[-1].lower() + + +def _validate_meta(filename: str, mime: str) -> str: + ext = _sniff_ext(filename) + if not ext: + raise EvidenceValidationError( + "missing_extension", "filename must have an extension" + ) + if ext not in ALLOWED_EXTS: + raise EvidenceValidationError( + "unsupported_extension", f"extension {ext!r} is not allowed" + ) + normalised_mime = (mime or "application/octet-stream").lower().split(";", 1)[0].strip() + if normalised_mime not in ALLOWED_MIMES: + raise EvidenceValidationError( + "unsupported_mime", f"mime {normalised_mime!r} is not allowed" + ) + return ext + + +def _stream_to_tmpfile( + src: BinaryIO, target_dir: Path +) -> tuple[Path, str, int]: + """Stream the upload into a tmpfile under `target_dir`, capping size. + + Returns (tmp_path, sha256_hex, total_bytes). Raises + `EvidenceValidationError("too_large", …)` once the cumulative count goes + above `MAX_BYTES`. The tmpfile is *always* removed on error. + """ + target_dir.mkdir(parents=True, exist_ok=True) + fd, tmp_name = tempfile.mkstemp(prefix=".upload-", dir=str(target_dir)) + tmp_path = Path(tmp_name) + hasher = hashlib.sha256() + total = 0 + try: + with os.fdopen(fd, "wb") as fh: + while True: + chunk = src.read(64 * 1024) + if not chunk: + break + total += len(chunk) + if total > MAX_BYTES: + raise EvidenceValidationError( + "too_large", + f"file exceeds the {MAX_BYTES} byte limit", + ) + hasher.update(chunk) + fh.write(chunk) + return tmp_path, hasher.hexdigest(), total + except Exception: + try: + tmp_path.unlink(missing_ok=True) + except OSError: + pass + raise + + +# --------------------------------------------------------------------------- # +# Public API +# --------------------------------------------------------------------------- # + + +def add_evidence( + mission_id: uuid.UUID, + test_id: uuid.UUID, + *, + file_stream: BinaryIO, + original_filename: str, + mime: str, + viewer_id: uuid.UUID, + viewer_is_admin: bool, +) -> EvidenceView: + """Persist the upload and return a view of the new evidence row. + + Pre-conditions: + - The caller already verified that the viewer holds `mission.write_blue_fields`. + - Mission + test visibility is enforced here (404, not 403). + + Disk layout: + ${EVIDENCE_DIR}/// + """ + ext = _validate_meta(original_filename, mime) + target_dir = _test_dir(mission_id, test_id) + + # Visibility/existence check BEFORE we touch disk. + with session_scope() as s: + _ensure_mission_visible(s, mission_id, viewer_id, viewer_is_admin) + _load_test(s, mission_id, test_id) # raises MissionTestNotFound on miss + + tmp_path, sha256, size_bytes = _stream_to_tmpfile(file_stream, target_dir) + + # Defence in depth — the hash comes from hashlib but if any caller ever + # passes pre-computed bytes we want to fail loudly rather than write to a + # path like `..something.evtx`. + if not re.fullmatch(r"[0-9a-f]{64}", sha256): + tmp_path.unlink(missing_ok=True) + raise EvidenceStorageError("computed sha256 is malformed") + + final_path = target_dir / f"{sha256}{ext}" + try: + if final_path.exists(): + # Same bytes already on disk — drop the tmp and reuse the canonical path. + tmp_path.unlink(missing_ok=True) + else: + # `os.replace` is the atomic rename primitive on POSIX (and the + # documented atomic rename on Windows when src/dst live on the + # same volume). We stage the tmpfile in `target_dir` so it + # always shares a filesystem with the destination. + os.replace(str(tmp_path), str(final_path)) + except OSError as e: + try: + tmp_path.unlink(missing_ok=True) + except OSError: + pass + log.warning( + "metamorph.evidence.storage_failed", + extra={"mission_id": str(mission_id), "test_id": str(test_id), "error": str(e)}, + ) + raise EvidenceStorageError(str(e)) from e + + with session_scope() as s: + # Re-load + double-check visibility (defence in depth: the membership + # set could have changed between the pre-check and now). + _ensure_mission_visible(s, mission_id, viewer_id, viewer_is_admin) + test = _load_test(s, mission_id, test_id) + ev = EvidenceFile( + mission_test_id=test.id, + sha256=sha256, + mime=(mime or "application/octet-stream").lower().split(";", 1)[0].strip(), + size_bytes=size_bytes, + storage_path=str(final_path), + original_filename=original_filename[:255], + uploaded_by_user_id=viewer_id, + uploaded_at=datetime.now(tz=timezone.utc), + ) + s.add(ev) + _touch(test, viewer_id) + s.flush() + s.refresh(ev) + uploader = s.get(User, viewer_id) + log.info( + "metamorph.evidence.added", + extra={ + "evidence_id": str(ev.id), + "mission_id": str(mission_id), + "test_id": str(test_id), + "sha256": sha256, + "size_bytes": size_bytes, + "mime": ev.mime, + }, + ) + return _to_evidence_view(ev, uploader) + + +def _resolve_evidence_chain( + s, evidence_id: uuid.UUID +) -> tuple[EvidenceFile, MissionTest, MissionScenario] | None: + """Walk evidence → test → scenario, returning None if any link is missing or deleted.""" + ev = s.get(EvidenceFile, evidence_id) + if ev is None or ev.deleted_at is not None: + return None + test = s.get(MissionTest, ev.mission_test_id) + if test is None or test.deleted_at is not None: + return None + scenario = s.get(MissionScenario, test.scenario_id) + if scenario is None or scenario.deleted_at is not None: + return None + return ev, test, scenario + + +def get_evidence( + evidence_id: uuid.UUID, + *, + viewer_id: uuid.UUID, + viewer_is_admin: bool, +) -> EvidenceView: + """Read a single evidence record. Membership-aware (404 on miss/forbidden).""" + with session_scope() as s: + chain = _resolve_evidence_chain(s, evidence_id) + if chain is None: + raise EvidenceNotFound() + ev, _, scenario = chain + try: + _ensure_mission_visible(s, scenario.mission_id, viewer_id, viewer_is_admin) + except Exception as e: + raise EvidenceNotFound() from e + uploader = s.get(User, ev.uploaded_by_user_id) if ev.uploaded_by_user_id else None + return _to_evidence_view(ev, uploader) + + +def get_evidence_for_download( + evidence_id: uuid.UUID, + *, + viewer_id: uuid.UUID, + viewer_is_admin: bool, +) -> tuple[EvidenceView, Path]: + """Return view + on-disk path. Raises EvidenceNotFound if the bytes are gone.""" + with session_scope() as s: + chain = _resolve_evidence_chain(s, evidence_id) + if chain is None: + raise EvidenceNotFound() + ev, _, scenario = chain + try: + _ensure_mission_visible(s, scenario.mission_id, viewer_id, viewer_is_admin) + except Exception as e: + raise EvidenceNotFound() from e + uploader = s.get(User, ev.uploaded_by_user_id) if ev.uploaded_by_user_id else None + view = _to_evidence_view(ev, uploader) + path = Path(ev.storage_path) + if not path.exists(): + log.warning( + "metamorph.evidence.bytes_missing", + extra={"evidence_id": str(evidence_id), "path": str(path)}, + ) + raise EvidenceNotFound() + return view, path + + +def soft_delete_evidence( + evidence_id: uuid.UUID, + *, + viewer_id: uuid.UUID, + viewer_is_admin: bool, +) -> None: + """Mark an evidence row deleted. Disk bytes are kept until admin purge (M12).""" + with session_scope() as s: + chain = _resolve_evidence_chain(s, evidence_id) + if chain is None: + raise EvidenceNotFound() + ev, test, scenario = chain + try: + _ensure_mission_visible(s, scenario.mission_id, viewer_id, viewer_is_admin) + except Exception as e: + raise EvidenceNotFound() from e + ev.deleted_at = datetime.now(tz=timezone.utc) + _touch(test, viewer_id) + s.flush() + log.info( + "metamorph.evidence.soft_deleted", + extra={"evidence_id": str(evidence_id), "mission_id": str(scenario.mission_id)}, + ) + + +__all__ = [ + "MAX_BYTES", + "ALLOWED_EXTS", + "ALLOWED_MIMES", + "EvidenceNotFound", + "EvidenceValidationError", + "EvidenceStorageError", + "add_evidence", + "get_evidence", + "get_evidence_for_download", + "soft_delete_evidence", +] diff --git a/backend/app/services/mission_tests.py b/backend/app/services/mission_tests.py new file mode 100644 index 0000000..a2be468 --- /dev/null +++ b/backend/app/services/mission_tests.py @@ -0,0 +1,668 @@ +"""Per-mission-test execution service (M7). + +Where M6 builds the snapshot, M7 brings the test to life: + +- Red side: command, output, comment, mark-executed (auto + override). +- Blue side: detection level, comment, evidence (delegated to `evidence.py`). +- State machine: pending↔skipped/blocked, executed→reviewed_by_blue. + +The caller is responsible for telling us which side it has perms for via +`has_red_perm` / `has_blue_perm`. The service refuses field/state writes that +require a side the caller does not hold, raising `MissingFieldPermission`. + +Mission membership is enforced here (404 not 403) consistent with M6 to +prevent existence leaks. +""" + +from __future__ import annotations + +import logging +import uuid +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Any + +from sqlalchemy import select +from sqlalchemy.orm import Session, selectinload + +from app.db.session import session_scope +from app.db.types import MISSION_TEST_STATES +from app.models.auth import User +from app.models.evidence import EvidenceFile +from app.models.mission import ( + Mission, + MissionScenario, + MissionTest, +) +from app.models.setting import DetectionLevel +from app.services.missions import ( + MissionNotFound, + _is_member, +) + +log = logging.getLogger("metamorph.mission_tests") + +_UNSET: Any = object() + + +# --------------------------------------------------------------------------- # +# State machine +# --------------------------------------------------------------------------- # +# +# Per spec §M7: pending↔skipped/blocked, executed→reviewed_by_blue. +# We also allow `executed → pending` and `reviewed_by_blue → executed` so a +# red/blue user can revert a misclick without admin intervention. Soft-delete +# is the only forward-only sink (handled outside this service). +# + +_VALID_TRANSITIONS: dict[str, frozenset[str]] = { + "pending": frozenset({"executed", "skipped", "blocked"}), + "executed": frozenset({"reviewed_by_blue", "pending"}), + "reviewed_by_blue": frozenset({"executed"}), + "skipped": frozenset({"pending"}), + "blocked": frozenset({"pending"}), +} + +# Which side "owns" each transition for permission purposes: +# "red" → requires mission.write_red_fields +# "blue" → requires mission.write_blue_fields +# "any" → either side suffices +_TRANSITION_SIDE: dict[tuple[str, str], str] = { + ("pending", "executed"): "red", + ("pending", "skipped"): "any", + ("pending", "blocked"): "any", + ("executed", "reviewed_by_blue"): "blue", + ("executed", "pending"): "red", + ("reviewed_by_blue", "executed"): "blue", + ("skipped", "pending"): "any", + ("blocked", "pending"): "any", +} + +# Same-state idempotent POSTs are still gated: a user replaying a "mark +# executed" must still hold red perms even if the row is already executed. +# This map answers "if you wanted to BE in state X, which side originally +# brought you here?" — and therefore what perm a no-op repeat should require. +_IDEMPOTENT_SIDE: dict[str, str] = { + "executed": "red", + "reviewed_by_blue": "blue", + "pending": "any", + "skipped": "any", + "blocked": "any", +} + + +# --------------------------------------------------------------------------- # +# Exceptions +# --------------------------------------------------------------------------- # + + +class MissionTestNotFound(Exception): + """Test missing, soft-deleted, or not under the given mission/viewer.""" + + +class InvalidTestTransition(Exception): + pass + + +class MissingFieldPermission(Exception): + """Caller tried to write a field requiring a side perm they do not hold.""" + + +class InvalidTestPayload(Exception): + """Generic validation error (bad dates, unknown detection level, ...).""" + + +# --------------------------------------------------------------------------- # +# Views +# --------------------------------------------------------------------------- # + + +@dataclass(frozen=True) +class EvidenceView: + id: uuid.UUID + mission_test_id: uuid.UUID + sha256: str + mime: str + size_bytes: int + original_filename: str + uploaded_by_user_id: uuid.UUID | None + uploaded_by_email: str | None + uploaded_by_display_name: str | None + uploaded_at: datetime + created_at: datetime + + +@dataclass(frozen=True) +class MissionTestMitreTagView: + kind: str + external_id: str + name: str + url: str | None + + +@dataclass(frozen=True) +class MissionTestDetailView: + id: uuid.UUID + mission_id: uuid.UUID + scenario_id: uuid.UUID + position: int + snapshot_name: str + snapshot_description: str | None + snapshot_objective: str | None + snapshot_procedure_md: str | None + snapshot_prerequisites_md: str | None + snapshot_expected_red_md: str | None + snapshot_expected_blue_md: str | None + snapshot_opsec_level: str + snapshot_tags: list[str] + snapshot_expected_iocs: list[str] + state: str + executed_at: datetime | None + executed_at_overridden: bool + red_command: str | None + red_output: str | None + red_comment_md: str | None + blue_comment_md: str | None + detection_level_id: uuid.UUID | None + detection_level_key: str | None + last_actor_id: uuid.UUID | None + last_actor_email: str | None + last_actor_display_name: str | None + updated_at: datetime + mitre_tags: list[MissionTestMitreTagView] + evidence: list[EvidenceView] + + +@dataclass(frozen=True) +class ActivityEntryView: + test_id: uuid.UUID + scenario_id: uuid.UUID + state: str + updated_at: datetime + last_actor_id: uuid.UUID | None + last_actor_email: str | None + last_actor_display_name: str | None + + +# --------------------------------------------------------------------------- # +# Helpers +# --------------------------------------------------------------------------- # + + +def _opt_md(value: Any) -> str | None: + """Normalise a markdown/text input: strip-then-collapse-to-None on empty.""" + if value is None: + return None + if not isinstance(value, str): + raise InvalidTestPayload("text field must be a string") + v = value.strip() + return v or None + + +def _opt_cmd(value: Any) -> str | None: + """Same as `_opt_md` but preserves trailing/leading whitespace inside the body.""" + if value is None: + return None + if not isinstance(value, str): + raise InvalidTestPayload("text field must be a string") + return value if value != "" else None + + +def _ensure_state(value: str) -> str: + if value not in MISSION_TEST_STATES: + raise InvalidTestPayload(f"state must be one of {MISSION_TEST_STATES}") + return value + + +def _load_test( + s: Session, mission_id: uuid.UUID, test_id: uuid.UUID +) -> MissionTest: + """Fetch a live mission_test guarded by mission id, raising on misses.""" + stmt = ( + select(MissionTest) + .join(MissionScenario, MissionTest.scenario_id == MissionScenario.id) + .options(selectinload(MissionTest.mitre_tags)) + .where( + MissionTest.id == test_id, + MissionScenario.mission_id == mission_id, + MissionTest.deleted_at.is_(None), + MissionScenario.deleted_at.is_(None), + ) + ) + row = s.scalars(stmt).one_or_none() + if row is None: + raise MissionTestNotFound() + return row + + +def _ensure_mission_visible( + s: Session, mission_id: uuid.UUID, viewer_id: uuid.UUID, viewer_is_admin: bool +) -> Mission: + """Confirm the mission exists, is live, and is visible to the viewer. + + Returns the Mission row for reuse (e.g. to log the parent name in audit + extras). Raises `MissionNotFound` on any miss — we mirror M6's membership + visibility contract: leaking existence via 403 is forbidden. + """ + m = s.get(Mission, mission_id) + if m is None or m.deleted_at is not None: + raise MissionNotFound() + if not viewer_is_admin and not _is_member(s, mission_id, viewer_id): + raise MissionNotFound() + return m + + +def _to_evidence_view(ev: EvidenceFile, uploader: User | None) -> EvidenceView: + return EvidenceView( + id=ev.id, + mission_test_id=ev.mission_test_id, + sha256=ev.sha256, + mime=ev.mime, + size_bytes=ev.size_bytes, + original_filename=ev.original_filename, + uploaded_by_user_id=ev.uploaded_by_user_id, + uploaded_by_email=uploader.email if uploader is not None else None, + uploaded_by_display_name=uploader.display_name if uploader is not None else None, + uploaded_at=ev.uploaded_at, + created_at=ev.created_at, + ) + + +def _load_evidence_for_test(s: Session, test_id: uuid.UUID) -> list[EvidenceView]: + rows = s.scalars( + select(EvidenceFile) + .where( + EvidenceFile.mission_test_id == test_id, + EvidenceFile.deleted_at.is_(None), + ) + .order_by(EvidenceFile.uploaded_at.asc(), EvidenceFile.id.asc()) + ).all() + if not rows: + return [] + uploader_ids = {r.uploaded_by_user_id for r in rows if r.uploaded_by_user_id} + uploaders: dict[uuid.UUID, User] = {} + if uploader_ids: + uploaders = { + u.id: u + for u in s.scalars( + select(User).where(User.id.in_(uploader_ids)) + ).all() + } + return [ + _to_evidence_view(r, uploaders.get(r.uploaded_by_user_id) if r.uploaded_by_user_id else None) + for r in rows + ] + + +def _to_detail_view( + s: Session, mission_id: uuid.UUID, test: MissionTest +) -> MissionTestDetailView: + # Batch the two FK lookups (last actor + detection level) into a single + # round trip instead of two `s.get` calls — every PUT/transition returns + # the detail view, so this matters. + last_actor_email: str | None = None + last_actor_display_name: str | None = None + level_key: str | None = None + if test.last_actor_id is not None: + actor = s.execute( + select(User.email, User.display_name).where(User.id == test.last_actor_id) + ).first() + if actor is not None: + last_actor_email, last_actor_display_name = actor.email, actor.display_name + if test.detection_level_id is not None: + level_key = s.scalar( + select(DetectionLevel.key).where(DetectionLevel.id == test.detection_level_id) + ) + tag_views = [ + MissionTestMitreTagView( + kind=tag.mitre_kind, + external_id=tag.mitre_external_id, + name=tag.mitre_name, + url=tag.mitre_url, + ) + for tag in sorted( + test.mitre_tags, key=lambda t: (t.mitre_kind, t.mitre_external_id) + ) + ] + return MissionTestDetailView( + id=test.id, + mission_id=mission_id, + scenario_id=test.scenario_id, + position=test.position, + snapshot_name=test.snapshot_name, + snapshot_description=test.snapshot_description, + snapshot_objective=test.snapshot_objective, + snapshot_procedure_md=test.snapshot_procedure_md, + snapshot_prerequisites_md=test.snapshot_prerequisites_md, + snapshot_expected_red_md=test.snapshot_expected_red_md, + snapshot_expected_blue_md=test.snapshot_expected_blue_md, + snapshot_opsec_level=test.snapshot_opsec_level, + snapshot_tags=list(test.snapshot_tags or []), + snapshot_expected_iocs=list(test.snapshot_expected_iocs or []), + state=test.state, + executed_at=test.executed_at, + executed_at_overridden=test.executed_at_overridden, + red_command=test.red_command, + red_output=test.red_output, + red_comment_md=test.red_comment_md, + blue_comment_md=test.blue_comment_md, + detection_level_id=test.detection_level_id, + detection_level_key=level_key, + last_actor_id=test.last_actor_id, + last_actor_email=last_actor_email, + last_actor_display_name=last_actor_display_name, + updated_at=test.updated_at, + mitre_tags=tag_views, + evidence=_load_evidence_for_test(s, test.id), + ) + + +def _touch(test: MissionTest, actor_id: uuid.UUID) -> None: + """Stamp the actor + bump the activity clock. + + `updated_at` is auto-managed by SQLAlchemy's `onupdate=func.now()` mixin + only when at least one mapped attribute changes. Assigning `last_actor_id` + triggers that, even when the actor is the same as the previous one + (Pydantic-clean payloads still flush the assignment). + """ + test.last_actor_id = actor_id + + +# --------------------------------------------------------------------------- # +# Public API — read +# --------------------------------------------------------------------------- # + + +def get_mission_test( + mission_id: uuid.UUID, + test_id: uuid.UUID, + *, + viewer_id: uuid.UUID, + viewer_is_admin: bool, +) -> MissionTestDetailView: + with session_scope() as s: + _ensure_mission_visible(s, mission_id, viewer_id, viewer_is_admin) + test = _load_test(s, mission_id, test_id) + return _to_detail_view(s, mission_id, test) + + +def list_activity_since( + mission_id: uuid.UUID, + *, + viewer_id: uuid.UUID, + viewer_is_admin: bool, + since: datetime | None = None, + limit: int = 200, +) -> list[ActivityEntryView]: + """List mission_tests whose `updated_at > since`, freshest first. + + Drives the "modified by X Ns ago" badge on the per-test page. Soft-deleted + tests/scenarios are excluded so a deletion does not appear as activity. + """ + with session_scope() as s: + _ensure_mission_visible(s, mission_id, viewer_id, viewer_is_admin) + stmt = ( + select(MissionTest, MissionScenario) + .join(MissionScenario, MissionTest.scenario_id == MissionScenario.id) + .where( + MissionScenario.mission_id == mission_id, + MissionTest.deleted_at.is_(None), + MissionScenario.deleted_at.is_(None), + ) + .order_by(MissionTest.updated_at.desc(), MissionTest.id.asc()) + .limit(max(1, min(limit, 500))) + ) + if since is not None: + stmt = stmt.where(MissionTest.updated_at > since) + rows = s.execute(stmt).all() + actor_ids = {r.MissionTest.last_actor_id for r in rows if r.MissionTest.last_actor_id} + actors: dict[uuid.UUID, User] = {} + if actor_ids: + actors = { + u.id: u + for u in s.scalars(select(User).where(User.id.in_(actor_ids))).all() + } + out: list[ActivityEntryView] = [] + for row in rows: + t = row.MissionTest + actor = actors.get(t.last_actor_id) if t.last_actor_id else None + out.append( + ActivityEntryView( + test_id=t.id, + scenario_id=t.scenario_id, + state=t.state, + updated_at=t.updated_at, + last_actor_id=t.last_actor_id, + last_actor_email=actor.email if actor else None, + last_actor_display_name=actor.display_name if actor else None, + ) + ) + return out + + +# --------------------------------------------------------------------------- # +# Public API — write +# --------------------------------------------------------------------------- # + + +# Side membership for each writable field (mirror of the spec's red/blue split). +_RED_FIELDS = {"red_command", "red_output", "red_comment_md", + "executed_at", "executed_at_overridden"} +_BLUE_FIELDS = {"blue_comment_md", "detection_level_id"} + + +def _classify_fields(touched: set[str]) -> tuple[bool, bool]: + """Return (needs_red, needs_blue) for the set of field names being written.""" + return ( + bool(touched & _RED_FIELDS), + bool(touched & _BLUE_FIELDS), + ) + + +def update_mission_test_fields( + mission_id: uuid.UUID, + test_id: uuid.UUID, + *, + viewer_id: uuid.UUID, + viewer_is_admin: bool, + has_red_perm: bool, + has_blue_perm: bool, + red_command: Any = _UNSET, + red_output: Any = _UNSET, + red_comment_md: Any = _UNSET, + blue_comment_md: Any = _UNSET, + detection_level_id: Any = _UNSET, + executed_at: Any = _UNSET, + executed_at_overridden: Any = _UNSET, +) -> MissionTestDetailView: + """Patch any subset of the red/blue annotation fields. + + Field-level perm enforcement happens *before* any write so a forbidden + field never even lands in the SQL transaction (cleaner audit logs). + """ + touched: set[str] = set() + if red_command is not _UNSET: + touched.add("red_command") + if red_output is not _UNSET: + touched.add("red_output") + if red_comment_md is not _UNSET: + touched.add("red_comment_md") + if blue_comment_md is not _UNSET: + touched.add("blue_comment_md") + if detection_level_id is not _UNSET: + touched.add("detection_level_id") + if executed_at is not _UNSET: + touched.add("executed_at") + if executed_at_overridden is not _UNSET: + touched.add("executed_at_overridden") + + needs_red, needs_blue = _classify_fields(touched) + if not viewer_is_admin: + if needs_red and not has_red_perm: + raise MissingFieldPermission( + "mission.write_red_fields required for red-side fields" + ) + if needs_blue and not has_blue_perm: + raise MissingFieldPermission( + "mission.write_blue_fields required for blue-side fields" + ) + + with session_scope() as s: + _ensure_mission_visible(s, mission_id, viewer_id, viewer_is_admin) + test = _load_test(s, mission_id, test_id) + + if not touched: + return _to_detail_view(s, mission_id, test) + + if "red_command" in touched: + test.red_command = _opt_cmd(red_command) + if "red_output" in touched: + test.red_output = _opt_cmd(red_output) + if "red_comment_md" in touched: + test.red_comment_md = _opt_md(red_comment_md) + if "blue_comment_md" in touched: + test.blue_comment_md = _opt_md(blue_comment_md) + + if "detection_level_id" in touched: + if detection_level_id is None: + test.detection_level_id = None + else: + if not isinstance(detection_level_id, uuid.UUID): + raise InvalidTestPayload("detection_level_id must be a UUID") + lvl = s.get(DetectionLevel, detection_level_id) + if lvl is None: + raise InvalidTestPayload("unknown detection_level_id") + test.detection_level_id = detection_level_id + + if "executed_at_overridden" in touched or "executed_at" in touched: + # Editing executed_at is a red-only privilege and only valid when + # the test is past the `executed` milestone. Spec M7: override is + # behind a deliberate toggle so the auto-stamp default is sticky. + if test.state not in {"executed", "reviewed_by_blue"}: + raise InvalidTestPayload( + "executed_at can only be set when state is executed/reviewed_by_blue" + ) + new_overridden = ( + bool(executed_at_overridden) + if "executed_at_overridden" in touched + else test.executed_at_overridden + ) + new_at = test.executed_at if "executed_at" not in touched else executed_at + if new_overridden and new_at is None: + raise InvalidTestPayload( + "executed_at_overridden=true requires a non-null executed_at" + ) + if "executed_at" in touched and new_at is not None and not isinstance(new_at, datetime): + raise InvalidTestPayload("executed_at must be an ISO datetime") + test.executed_at = new_at + test.executed_at_overridden = new_overridden + + _touch(test, viewer_id) + s.flush() + s.refresh(test) + return _to_detail_view(s, mission_id, test) + + +def transition_mission_test( + mission_id: uuid.UUID, + test_id: uuid.UUID, + target_state: str, + *, + viewer_id: uuid.UUID, + viewer_is_admin: bool, + has_red_perm: bool, + has_blue_perm: bool, +) -> MissionTestDetailView: + """Drive the test through its lifecycle and side-effect `executed_at`. + + Transitioning *into* `executed` stamps `executed_at = now()` and clears + the override flag — the deliberate red-side action commits the timeline. + Transitioning *out of* `executed` (to `pending`) clears the timestamp so + a re-execution starts from a clean slate. + """ + _ensure_state(target_state) + + with session_scope() as s: + _ensure_mission_visible(s, mission_id, viewer_id, viewer_is_admin) + test = _load_test(s, mission_id, test_id) + + # Perm gate runs BEFORE the idempotency short-circuit. A blue-only + # user POSTing target_state="executed" while the test is already + # executed must NOT get a 200 — it would falsely advertise that they + # hold the red-side perm. We resolve the would-be transition's side + # (or, on a no-op, fall back to the source side which originally + # produced the state) and enforce it before any response shape. + allowed = _VALID_TRANSITIONS.get(test.state, frozenset()) + if test.state != target_state and target_state not in allowed: + raise InvalidTestTransition( + f"cannot transition test from {test.state!r} to {target_state!r}" + ) + + side: str | None + if test.state == target_state: + # Idempotent path: require the perm the *forward* transition + # would have needed. For terminal-states (already executed → + # executed), this is the side that *brought* the test here. + side = _IDEMPOTENT_SIDE.get(target_state) + else: + side = _TRANSITION_SIDE.get((test.state, target_state)) + + if not viewer_is_admin and side is not None: + if side == "red" and not has_red_perm: + raise MissingFieldPermission( + "mission.write_red_fields required for this transition" + ) + if side == "blue" and not has_blue_perm: + raise MissingFieldPermission( + "mission.write_blue_fields required for this transition" + ) + if side == "any" and not (has_red_perm or has_blue_perm): + raise MissingFieldPermission( + "either mission.write_red_fields or mission.write_blue_fields " + "is required" + ) + + if test.state == target_state: + # Genuine no-op: idempotent 200 with the current snapshot. + return _to_detail_view(s, mission_id, test) + + if target_state == "executed": + test.executed_at = datetime.now(tz=timezone.utc) + test.executed_at_overridden = False + elif target_state == "pending": + # Returning to pending wipes the execution timestamp so a re-run + # starts clean. Notes/comments are preserved (history value). + test.executed_at = None + test.executed_at_overridden = False + + test.state = target_state + _touch(test, viewer_id) + s.flush() + s.refresh(test) + return _to_detail_view(s, mission_id, test) + + +__all__ = [ + "EvidenceView", + "MissionTestDetailView", + "MissionTestMitreTagView", + "ActivityEntryView", + "MissionTestNotFound", + "InvalidTestTransition", + "MissingFieldPermission", + "InvalidTestPayload", + "get_mission_test", + "list_activity_since", + "update_mission_test_fields", + "transition_mission_test", + "_touch", + "_load_test", + "_ensure_mission_visible", + "_to_detail_view", + "_to_evidence_view", +] + + +# Re-export — used by `app/api/missions.py` to wire the +# 404 handling without importing the originals from M6 in two places. +__all__ += ["MissionNotFound"] diff --git a/backend/tests/test_mission_tests.py b/backend/tests/test_mission_tests.py new file mode 100644 index 0000000..d2bc719 --- /dev/null +++ b/backend/tests/test_mission_tests.py @@ -0,0 +1,884 @@ +"""M7 — per-test execution, evidence upload, activity polling. + +Fixture stack mirrors `test_missions.py` so we can reuse the test_template/ +scenario_template catalogue and the red/blue/reader user invitations. M7 +adds the assumption that detection_levels are seeded (boot does this for +the live API; we re-seed inside the module fixture to cover the truncated +state). +""" + +from __future__ import annotations + +import hashlib +import io +import json +import secrets +import urllib.parse +import uuid +from datetime import datetime, timedelta, timezone + +import pytest +from sqlalchemy import text + +from app.core.install_token import regenerate_install_token +from app.main import create_app +from app.services import detection_levels as detection_svc +from app.services import mitre_seed as mitre_svc + + +_MINIMAL_BUNDLE = { + "type": "bundle", + "id": "bundle--00000000-0000-0000-0000-000000000007", + "spec_version": "2.1", + "objects": [ + { + "type": "x-mitre-tactic", + "id": "x-mitre-tactic--ta0002", + "name": "Execution", + "x_mitre_shortname": "execution", + "external_references": [ + {"source_name": "mitre-attack", "external_id": "TA0002"} + ], + }, + { + "type": "attack-pattern", + "id": "attack-pattern--t1059", + "name": "Command and Scripting Interpreter", + "kill_chain_phases": [ + {"kill_chain_name": "mitre-attack", "phase_name": "execution"} + ], + "external_references": [ + {"source_name": "mitre-attack", "external_id": "T1059"} + ], + }, + ], +} + + +def _truncate_all(engine): + with engine.begin() as conn: + conn.execute( + text( + "TRUNCATE mission_test_mitre_tags, mission_tests, " + "mission_scenarios, mission_categories, mission_members, " + "missions RESTART IDENTITY CASCADE" + ) + ) + conn.execute( + text( + "TRUNCATE scenario_template_tests, scenario_templates, " + "test_template_mitre_tags, test_templates " + "RESTART IDENTITY CASCADE" + ) + ) + conn.execute( + text( + "TRUNCATE users, refresh_tokens, invitations, invitation_groups, " + "user_groups, group_permissions, permissions, settings, groups " + "RESTART IDENTITY CASCADE" + ) + ) + conn.execute( + text( + "TRUNCATE mitre_technique_tactics, mitre_subtechniques, " + "mitre_techniques, mitre_tactics RESTART IDENTITY CASCADE" + ) + ) + + +@pytest.fixture(scope="module") +def app(db_engine_or_skip, tmp_path_factory, monkeypatch_module): + _truncate_all(db_engine_or_skip) + # Re-seed catalogues that boot/seed handles in production but `_truncate_all` + # has just wiped. + bundle_path = tmp_path_factory.mktemp("m7") / "stix.json" + bundle_path.write_text(json.dumps(_MINIMAL_BUNDLE)) + mitre_svc.seed_mitre(source=bundle_path, expected_sha256=None) + detection_svc.seed_detection_levels() + # Point the evidence dir at a tmp location so test uploads don't pollute /data. + evidence_root = tmp_path_factory.mktemp("evidence") + monkeypatch_module.setattr( + "app.core.config.settings.EVIDENCE_DIR", str(evidence_root) + ) + flask_app = create_app() + flask_app.config.update(TESTING=True) + flask_app.config["EVIDENCE_ROOT"] = str(evidence_root) + return flask_app + + +@pytest.fixture(scope="module") +def monkeypatch_module(): + """Module-scoped monkeypatch — pytest's built-in is function-scoped only.""" + from _pytest.monkeypatch import MonkeyPatch # noqa: PLC0415 + + mp = MonkeyPatch() + yield mp + mp.undo() + + +@pytest.fixture() +def client(app): + return app.test_client() + + +def _unique_email(prefix: str) -> str: + return f"{prefix}-{secrets.token_hex(4)}@metamorph.local" + + +def _bearer(token: str) -> dict[str, str]: + return {"Authorization": f"Bearer {token}"} + + +def _login(client, email: str, password: str) -> str: + r = client.post("/api/v1/auth/login", json={"email": email, "password": password}) + assert r.status_code == 200, r.get_data(as_text=True) + return r.get_json()["access_token"] + + +@pytest.fixture(scope="module") +def admin(app): + token = regenerate_install_token() + email = _unique_email("admin") + password = "AdminPass1234!" + with app.test_client() as c: + r = c.post( + "/api/v1/setup", + json={"install_token": token, "email": email, "password": password}, + ) + assert r.status_code == 201, r.get_data(as_text=True) + return {"email": email, "password": password} + + +@pytest.fixture() +def admin_token(client, admin) -> str: + return _login(client, admin["email"], admin["password"]) + + +# --------------------------------------------------------------- catalogue -- + + +def _make_test_template(client, admin_token: str, name: str): + body = { + "name": name, + "description": "auto", + "objective": "do thing", + "procedure_md": f"# {name}", + "expected_result_red_md": "red expectation", + "expected_detection_blue_md": "blue expectation", + "opsec_level": "medium", + "tags": [], + "expected_iocs": [], + "mitre_tags": [{"kind": "technique", "external_id": "T1059"}], + } + r = client.post("/api/v1/test-templates", headers=_bearer(admin_token), json=body) + assert r.status_code == 201, r.get_data(as_text=True) + return r.get_json() + + +def _make_scenario(client, admin_token: str, name: str, test_ids: list[str]): + r = client.post( + "/api/v1/scenario-templates", + headers=_bearer(admin_token), + json={"name": name, "description": None, "test_template_ids": test_ids}, + ) + assert r.status_code == 201, r.get_data(as_text=True) + return r.get_json() + + +@pytest.fixture(scope="module") +def catalogue(app, admin): + with app.test_client() as c: + tok = _login(c, admin["email"], admin["password"]) + t1 = _make_test_template(c, tok, "exec-test") + sc = _make_scenario(c, tok, "exec-scenario", [t1["id"]]) + return {"test": t1, "scenario": sc} + + +# ----------------------------------------------------------------- users -- + + +def _invite_user(client, admin_token: str, prefix: str, group_codes: list[str]) -> dict: + grp = client.post( + "/api/v1/groups", + headers=_bearer(admin_token), + json={"name": f"{prefix}-grp-{secrets.token_hex(2)}"}, + ).get_json() + r_set = client.put( + f"/api/v1/groups/{grp['id']}/permissions", + headers=_bearer(admin_token), + json={"codes": group_codes}, + ) + assert r_set.status_code == 200, r_set.get_data(as_text=True) + + email = _unique_email(prefix) + password = "Pass1234!" + inv = client.post( + "/api/v1/invitations", + headers=_bearer(admin_token), + json={"email_hint": email, "group_ids": [grp["id"]]}, + ) + assert inv.status_code == 201, inv.get_data(as_text=True) + accept_token = inv.get_json()["token"] + r = client.post( + f"/api/v1/invitations/accept/{accept_token}", + json={"email": email, "password": password}, + ) + assert r.status_code == 201, r.get_data(as_text=True) + tok = _login(client, email, password) + me = client.get("/api/v1/auth/me", headers=_bearer(tok)).get_json() + return {"email": email, "password": password, "token": tok, "id": me["id"]} + + +@pytest.fixture() +def red_user(client, admin_token): + return _invite_user( + client, + admin_token, + "red", + [ + "mission.read", + "mission.create", + "mission.update", + "mission.write_red_fields", + "detection_level.read", + ], + ) + + +@pytest.fixture() +def blue_user(client, admin_token): + return _invite_user( + client, + admin_token, + "blue", + ["mission.read", "mission.write_blue_fields", "detection_level.read"], + ) + + +@pytest.fixture() +def reader_user(client, admin_token): + return _invite_user(client, admin_token, "reader", ["mission.read"]) + + +# Helper: bootstrap a mission with red+blue assigned and snapshot the catalogue. +def _make_mission(client, admin_token: str, *, name: str, scenario_id: str, + red_id: str | None = None, blue_id: str | None = None) -> dict: + members = [] + if red_id: + members.append({"user_id": red_id, "role_hint": "red"}) + if blue_id: + members.append({"user_id": blue_id, "role_hint": "blue"}) + r = client.post( + "/api/v1/missions", + headers=_bearer(admin_token), + json={ + "name": name, + "client_target": "Acme", + "scenario_template_ids": [scenario_id], + "members": members, + }, + ) + assert r.status_code == 201, r.get_data(as_text=True) + return r.get_json() + + +def _first_test_id(mission: dict) -> str: + return mission["scenarios"][0]["tests"][0]["id"] + + +# ================================================================ detection == + + +def test_detection_levels_seeded_and_listed(client, admin_token): + r = client.get("/api/v1/detection-levels", headers=_bearer(admin_token)) + assert r.status_code == 200 + body = r.get_json() + keys = [it["key"] for it in body["items"]] + # All four defaults must be present, in position order. + assert keys == ["detected_blocked", "detected_alert", "logged_only", "not_detected"] + # The default flag is on `not_detected` per the seed. + defaults = [it for it in body["items"] if it["is_default"]] + assert [d["key"] for d in defaults] == ["not_detected"] + # All are flagged system so M8 CRUD can distinguish operator-added levels. + assert all(it["is_system"] for it in body["items"]) + + +def test_detection_levels_requires_perm(client, admin_token, reader_user): + # The reader_user fixture has mission.read only — no detection_level.read. + r = client.get( + "/api/v1/detection-levels", headers=_bearer(reader_user["token"]) + ) + assert r.status_code == 403 + + +# ===================================================================== test == + + +def test_get_mission_test_returns_snapshot_state( + client, admin_token, catalogue, red_user +): + mission = _make_mission( + client, admin_token, name="m7-get", + scenario_id=catalogue["scenario"]["id"], red_id=red_user["id"], + ) + test_id = _first_test_id(mission) + r = client.get( + f"/api/v1/missions/{mission['id']}/tests/{test_id}", + headers=_bearer(red_user["token"]), + ) + assert r.status_code == 200, r.get_data(as_text=True) + body = r.get_json() + assert body["state"] == "pending" + assert body["red_command"] is None + assert body["blue_comment_md"] is None + assert body["evidence"] == [] + assert body["mission_id"] == mission["id"] + + +def test_red_user_writes_red_fields(client, admin_token, catalogue, red_user): + mission = _make_mission( + client, admin_token, name="m7-red-write", + scenario_id=catalogue["scenario"]["id"], red_id=red_user["id"], + ) + tid = _first_test_id(mission) + r = client.put( + f"/api/v1/missions/{mission['id']}/tests/{tid}", + headers=_bearer(red_user["token"]), + json={ + "red_command": "powershell -enc ZAB1AG0AeQA=", + "red_output": "{stdout}", + "red_comment_md": "executed via SYSTEM", + }, + ) + assert r.status_code == 200, r.get_data(as_text=True) + body = r.get_json() + assert body["red_command"] == "powershell -enc ZAB1AG0AeQA=" + assert body["red_comment_md"] == "executed via SYSTEM" + assert body["last_actor_email"] == red_user["email"] + + +def test_red_user_cannot_write_blue_fields(client, admin_token, catalogue, red_user): + mission = _make_mission( + client, admin_token, name="m7-red-blocked-blue", + scenario_id=catalogue["scenario"]["id"], red_id=red_user["id"], + ) + tid = _first_test_id(mission) + r = client.put( + f"/api/v1/missions/{mission['id']}/tests/{tid}", + headers=_bearer(red_user["token"]), + json={"blue_comment_md": "should be blocked"}, + ) + assert r.status_code == 403, r.get_data(as_text=True) + + +def test_blue_user_cannot_write_red_fields(client, admin_token, catalogue, blue_user): + mission = _make_mission( + client, admin_token, name="m7-blue-blocked-red", + scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"], + ) + tid = _first_test_id(mission) + r = client.put( + f"/api/v1/missions/{mission['id']}/tests/{tid}", + headers=_bearer(blue_user["token"]), + json={"red_command": "echo nope"}, + ) + assert r.status_code == 403 + + +def test_blue_user_writes_blue_fields_and_picks_detection_level( + client, admin_token, catalogue, blue_user +): + mission = _make_mission( + client, admin_token, name="m7-blue-write", + scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"], + ) + tid = _first_test_id(mission) + # First fetch the detection levels. + levels = client.get( + "/api/v1/detection-levels", headers=_bearer(blue_user["token"]) + ).get_json()["items"] + not_detected = next(l for l in levels if l["key"] == "not_detected") + r = client.put( + f"/api/v1/missions/{mission['id']}/tests/{tid}", + headers=_bearer(blue_user["token"]), + json={ + "blue_comment_md": "no detection on SOC", + "detection_level_id": not_detected["id"], + }, + ) + assert r.status_code == 200, r.get_data(as_text=True) + body = r.get_json() + assert body["blue_comment_md"] == "no detection on SOC" + assert body["detection_level_id"] == not_detected["id"] + assert body["detection_level_key"] == "not_detected" + + +def test_mark_executed_stamps_executed_at( + client, admin_token, catalogue, red_user +): + mission = _make_mission( + client, admin_token, name="m7-exec", + scenario_id=catalogue["scenario"]["id"], red_id=red_user["id"], + ) + tid = _first_test_id(mission) + r = client.post( + f"/api/v1/missions/{mission['id']}/tests/{tid}/transition", + headers=_bearer(red_user["token"]), + json={"target_state": "executed"}, + ) + assert r.status_code == 200, r.get_data(as_text=True) + body = r.get_json() + assert body["state"] == "executed" + assert body["executed_at"] is not None + assert body["executed_at_overridden"] is False + + +def test_executed_at_override_requires_red_perm_and_state( + client, admin_token, catalogue, red_user, blue_user +): + mission = _make_mission( + client, admin_token, name="m7-override", + scenario_id=catalogue["scenario"]["id"], + red_id=red_user["id"], blue_id=blue_user["id"], + ) + tid = _first_test_id(mission) + + # Override while still pending → invalid_request (no executed milestone yet). + bad = client.put( + f"/api/v1/missions/{mission['id']}/tests/{tid}", + headers=_bearer(red_user["token"]), + json={ + "executed_at": "2026-05-14T10:00:00+00:00", + "executed_at_overridden": True, + }, + ) + assert bad.status_code == 400, bad.get_data(as_text=True) + + # Mark executed first so we're allowed to override. + client.post( + f"/api/v1/missions/{mission['id']}/tests/{tid}/transition", + headers=_bearer(red_user["token"]), + json={"target_state": "executed"}, + ) + + # Blue cannot override (executed_at is a red field). + forbidden = client.put( + f"/api/v1/missions/{mission['id']}/tests/{tid}", + headers=_bearer(blue_user["token"]), + json={ + "executed_at": "2026-05-14T10:00:00+00:00", + "executed_at_overridden": True, + }, + ) + assert forbidden.status_code == 403 + + # Red successfully overrides. + ok = client.put( + f"/api/v1/missions/{mission['id']}/tests/{tid}", + headers=_bearer(red_user["token"]), + json={ + "executed_at": "2026-05-14T10:00:00+00:00", + "executed_at_overridden": True, + }, + ) + assert ok.status_code == 200, ok.get_data(as_text=True) + body = ok.get_json() + assert body["executed_at_overridden"] is True + assert body["executed_at"].startswith("2026-05-14T10:00:00") + + +def test_state_machine_rejects_invalid_transitions( + client, admin_token, catalogue, red_user +): + mission = _make_mission( + client, admin_token, name="m7-state", + scenario_id=catalogue["scenario"]["id"], red_id=red_user["id"], + ) + tid = _first_test_id(mission) + # pending → reviewed_by_blue is not allowed (must go through executed first). + r = client.post( + f"/api/v1/missions/{mission['id']}/tests/{tid}/transition", + headers=_bearer(red_user["token"]), + json={"target_state": "reviewed_by_blue"}, + ) + assert r.status_code == 409 + + +def test_review_by_blue_requires_blue_perm( + client, admin_token, catalogue, red_user, blue_user +): + mission = _make_mission( + client, admin_token, name="m7-review", + scenario_id=catalogue["scenario"]["id"], + red_id=red_user["id"], blue_id=blue_user["id"], + ) + tid = _first_test_id(mission) + # red marks executed + r1 = client.post( + f"/api/v1/missions/{mission['id']}/tests/{tid}/transition", + headers=_bearer(red_user["token"]), + json={"target_state": "executed"}, + ) + assert r1.status_code == 200 + # red tries to mark reviewed_by_blue — denied (blue side) + r2 = client.post( + f"/api/v1/missions/{mission['id']}/tests/{tid}/transition", + headers=_bearer(red_user["token"]), + json={"target_state": "reviewed_by_blue"}, + ) + assert r2.status_code == 403 + # blue does it — OK + r3 = client.post( + f"/api/v1/missions/{mission['id']}/tests/{tid}/transition", + headers=_bearer(blue_user["token"]), + json={"target_state": "reviewed_by_blue"}, + ) + assert r3.status_code == 200 + assert r3.get_json()["state"] == "reviewed_by_blue" + + +def test_member_visibility_returns_404_for_outsiders( + client, admin_token, catalogue, red_user, reader_user +): + mission = _make_mission( + client, admin_token, name="m7-secret", + scenario_id=catalogue["scenario"]["id"], red_id=red_user["id"], + ) + tid = _first_test_id(mission) + r = client.get( + f"/api/v1/missions/{mission['id']}/tests/{tid}", + headers=_bearer(reader_user["token"]), + ) + assert r.status_code == 404 + + +def test_admin_bypasses_membership(client, admin_token, catalogue, red_user): + mission = _make_mission( + client, admin_token, name="m7-admin-sees-all", + scenario_id=catalogue["scenario"]["id"], red_id=red_user["id"], + ) + tid = _first_test_id(mission) + # Admin is not a member; sees the test anyway. + r = client.get( + f"/api/v1/missions/{mission['id']}/tests/{tid}", + headers=_bearer(admin_token), + ) + assert r.status_code == 200 + + +# ================================================================ evidence == + + +def _png_bytes(n: int) -> bytes: + """Return n bytes prefixed with a valid PNG magic so MIME sniffers cooperate.""" + return b"\x89PNG\r\n\x1a\n" + b"A" * max(0, n - 8) + + +def _upload(client, mission_id: str, test_id: str, token: str, *, + filename: str, content: bytes, mime: str = "image/png"): + return client.post( + f"/api/v1/missions/{mission_id}/tests/{test_id}/evidence", + headers=_bearer(token), + data={"file": (io.BytesIO(content), filename, mime)}, + content_type="multipart/form-data", + ) + + +def test_evidence_upload_small_succeeds_and_records_sha256( + client, admin_token, catalogue, blue_user +): + mission = _make_mission( + client, admin_token, name="m7-ev-small", + scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"], + ) + tid = _first_test_id(mission) + content = _png_bytes(1024) + expected = hashlib.sha256(content).hexdigest() + r = _upload(client, mission["id"], tid, blue_user["token"], + filename="screenshot.png", content=content, mime="image/png") + assert r.status_code == 201, r.get_data(as_text=True) + body = r.get_json() + assert body["sha256"] == expected + assert body["size_bytes"] == len(content) + assert body["original_filename"] == "screenshot.png" + assert body["mime"] == "image/png" + + +def test_evidence_upload_24mb_succeeds_26mb_rejected( + client, admin_token, catalogue, blue_user +): + mission = _make_mission( + client, admin_token, name="m7-ev-boundaries", + scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"], + ) + tid = _first_test_id(mission) + twenty_four = _png_bytes(24 * 1024 * 1024) + ok = _upload( + client, mission["id"], tid, blue_user["token"], + filename="lab.evtx", content=twenty_four, mime="application/octet-stream", + ) + assert ok.status_code == 201, ok.get_data(as_text=True)[:200] + + twenty_six = _png_bytes(26 * 1024 * 1024) + too_big = _upload( + client, mission["id"], tid, blue_user["token"], + filename="huge.evtx", content=twenty_six, mime="application/octet-stream", + ) + assert too_big.status_code == 400 + assert too_big.get_json()["error"] == "too_large" + + +def test_evidence_upload_rejects_unsupported_extension( + client, admin_token, catalogue, blue_user +): + mission = _make_mission( + client, admin_token, name="m7-ev-ext", + scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"], + ) + tid = _first_test_id(mission) + r = _upload( + client, mission["id"], tid, blue_user["token"], + filename="evil.exe", content=b"\x4d\x5a", mime="application/octet-stream", + ) + assert r.status_code == 400 + assert r.get_json()["error"] == "unsupported_extension" + + +def test_evidence_upload_requires_blue_perm( + client, admin_token, catalogue, red_user, blue_user +): + mission = _make_mission( + client, admin_token, name="m7-ev-perm", + scenario_id=catalogue["scenario"]["id"], + red_id=red_user["id"], blue_id=blue_user["id"], + ) + tid = _first_test_id(mission) + r = _upload( + client, mission["id"], tid, red_user["token"], + filename="note.txt", content=b"hi", mime="text/plain", + ) + assert r.status_code == 403 + + +def test_evidence_download_returns_bytes(client, admin_token, catalogue, blue_user): + mission = _make_mission( + client, admin_token, name="m7-ev-dl", + scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"], + ) + tid = _first_test_id(mission) + content = b"hello evidence\n" + upl = _upload( + client, mission["id"], tid, blue_user["token"], + filename="note.txt", content=content, mime="text/plain", + ).get_json() + eid = upl["id"] + + meta = client.get( + f"/api/v1/evidence/{eid}", headers=_bearer(blue_user["token"]) + ) + assert meta.status_code == 200 + assert meta.get_json()["sha256"] == hashlib.sha256(content).hexdigest() + + dl = client.get( + f"/api/v1/evidence/{eid}?download=true", + headers=_bearer(blue_user["token"]), + ) + assert dl.status_code == 200 + assert dl.data == content + + +def test_evidence_soft_delete_hides_it_from_test_detail( + client, admin_token, catalogue, blue_user +): + mission = _make_mission( + client, admin_token, name="m7-ev-del", + scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"], + ) + tid = _first_test_id(mission) + upl = _upload( + client, mission["id"], tid, blue_user["token"], + filename="evidence.json", content=b'{"ok":true}', + mime="application/json", + ).get_json() + eid = upl["id"] + + detail_before = client.get( + f"/api/v1/missions/{mission['id']}/tests/{tid}", + headers=_bearer(blue_user["token"]), + ).get_json() + assert len(detail_before["evidence"]) == 1 + + r = client.delete( + f"/api/v1/evidence/{eid}", headers=_bearer(blue_user["token"]) + ) + assert r.status_code == 200 + detail_after = client.get( + f"/api/v1/missions/{mission['id']}/tests/{tid}", + headers=_bearer(blue_user["token"]), + ).get_json() + assert detail_after["evidence"] == [] + + +def test_idempotent_transition_still_checks_side_perm( + client, admin_token, catalogue, red_user, blue_user +): + """A blue-only user re-POSTing target_state=executed on an already-executed + test must NOT receive 200 — even though no write happens, returning success + falsely implies they hold the red-side perm. See post-review fix C1.""" + mission = _make_mission( + client, admin_token, name="m7-idemp-side", + scenario_id=catalogue["scenario"]["id"], + red_id=red_user["id"], blue_id=blue_user["id"], + ) + tid = _first_test_id(mission) + # Red marks executed. + r1 = client.post( + f"/api/v1/missions/{mission['id']}/tests/{tid}/transition", + headers=_bearer(red_user["token"]), + json={"target_state": "executed"}, + ) + assert r1.status_code == 200 + # Blue replays the same transition — must be 403, not 200. + r2 = client.post( + f"/api/v1/missions/{mission['id']}/tests/{tid}/transition", + headers=_bearer(blue_user["token"]), + json={"target_state": "executed"}, + ) + assert r2.status_code == 403 + + +def test_evidence_member_of_other_mission_gets_404( + client, admin_token, catalogue, blue_user +): + """A user who is a blue member of mission B must NOT be able to read an + evidence row belonging to mission A — the chain walk must collapse to 404.""" + mission_a = _make_mission( + client, admin_token, name="m7-ev-cross-a", + scenario_id=catalogue["scenario"]["id"], + # blue_user is NOT a member of A + ) + tid_a = _first_test_id(mission_a) + # Admin uploads on mission A. + upl = _upload( + client, mission_a["id"], tid_a, admin_token, + filename="a.txt", content=b"secret", mime="text/plain", + ).get_json() + eid = upl["id"] + + # blue_user joins mission B but tries to read mission A's evidence. + _make_mission( + client, admin_token, name="m7-ev-cross-b", + scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"], + ) + r = client.get(f"/api/v1/evidence/{eid}", headers=_bearer(blue_user["token"])) + assert r.status_code == 404 + + +def test_evidence_non_member_gets_404(client, admin_token, catalogue, blue_user, + reader_user): + mission = _make_mission( + client, admin_token, name="m7-ev-leak", + scenario_id=catalogue["scenario"]["id"], blue_id=blue_user["id"], + ) + tid = _first_test_id(mission) + upl = _upload( + client, mission["id"], tid, blue_user["token"], + filename="a.txt", content=b"x", mime="text/plain", + ).get_json() + eid = upl["id"] + r = client.get(f"/api/v1/evidence/{eid}", headers=_bearer(reader_user["token"])) + assert r.status_code == 404 + + +# ================================================================ activity == + + +def test_activity_polling_returns_recent_changes( + client, admin_token, catalogue, red_user +): + mission = _make_mission( + client, admin_token, name="m7-activity", + scenario_id=catalogue["scenario"]["id"], red_id=red_user["id"], + ) + tid = _first_test_id(mission) + + # Baseline timestamp from server, then a write 'after' it should appear. + now = client.get( + f"/api/v1/missions/{mission['id']}/activity", + headers=_bearer(red_user["token"]), + ) + assert now.status_code == 200 + server_t = now.get_json()["server_time"] + + # Mutate via PUT to bump updated_at. + client.put( + f"/api/v1/missions/{mission['id']}/tests/{tid}", + headers=_bearer(red_user["token"]), + json={"red_comment_md": "kicked off"}, + ) + # `since` must be URL-encoded — its `+` and `:` would otherwise be mangled. + since_q = urllib.parse.quote(server_t) + fresh = client.get( + f"/api/v1/missions/{mission['id']}/activity?since={since_q}", + headers=_bearer(red_user["token"]), + ) + assert fresh.status_code == 200 + items = fresh.get_json()["items"] + assert len(items) >= 1 + assert items[0]["test_id"] == tid + assert items[0]["last_actor_email"] == red_user["email"] + + +def test_activity_invalid_since_returns_400(client, admin_token, catalogue, red_user): + mission = _make_mission( + client, admin_token, name="m7-activity-bad", + scenario_id=catalogue["scenario"]["id"], red_id=red_user["id"], + ) + r = client.get( + f"/api/v1/missions/{mission['id']}/activity?since=not-a-date", + headers=_bearer(red_user["token"]), + ) + assert r.status_code == 400 + + +def test_activity_404_for_non_member(client, admin_token, catalogue, red_user, + reader_user): + mission = _make_mission( + client, admin_token, name="m7-activity-leak", + scenario_id=catalogue["scenario"]["id"], red_id=red_user["id"], + ) + r = client.get( + f"/api/v1/missions/{mission['id']}/activity", + headers=_bearer(reader_user["token"]), + ) + assert r.status_code == 404 + + +def test_activity_since_in_future_returns_empty( + client, admin_token, catalogue, red_user +): + mission = _make_mission( + client, admin_token, name="m7-activity-future", + scenario_id=catalogue["scenario"]["id"], red_id=red_user["id"], + ) + future = (datetime.now(tz=timezone.utc) + timedelta(hours=1)).isoformat() + since_q = urllib.parse.quote(future) + r = client.get( + f"/api/v1/missions/{mission['id']}/activity?since={since_q}", + headers=_bearer(red_user["token"]), + ) + assert r.status_code == 200 + assert r.get_json()["items"] == [] + + +def test_unknown_test_id_returns_404(client, admin_token, catalogue, red_user): + mission = _make_mission( + client, admin_token, name="m7-unknown-test", + scenario_id=catalogue["scenario"]["id"], red_id=red_user["id"], + ) + fake = str(uuid.uuid4()) + r = client.get( + f"/api/v1/missions/{mission['id']}/tests/{fake}", + headers=_bearer(red_user["token"]), + ) + assert r.status_code == 404 diff --git a/e2e/tests/m7-execution.spec.ts b/e2e/tests/m7-execution.spec.ts new file mode 100644 index 0000000..a6dd0ae --- /dev/null +++ b/e2e/tests/m7-execution.spec.ts @@ -0,0 +1,439 @@ +import { expect, test, type APIRequestContext } from '@playwright/test'; + +/** + * M7 — Red/blue execution on a mission test. + * + * Scope (cf. tasks/spec.md §M7): + * - Field-level perm gating (write_red_fields vs write_blue_fields). + * - State machine transitions and side-effect on `executed_at`. + * - Evidence upload: 24 MB ok, 26 MB rejected, SHA256 verified. + * - Activity polling endpoint surfaces the last actor. + * - SPA: the per-test page exposes both zones, accepts a small file via the + * dropzone, and shows the "modified by X" badge after a write. + * + * `afterAll` restores `admin@metamorph.local` / `AdminPass1234!` and re-syncs + * MITRE so subsequent manual sessions are not staring at an empty stack. + */ + +const ADMIN_EMAIL = `m7-admin-${crypto.randomUUID().slice(0, 8)}@metamorph.local`; +const ADMIN_PASSWORD = 'AdminPass1234!'; + +async function resetAndMintToken(request: APIRequestContext): Promise { + const r = await request.post('/api/v1/diag/reset'); + expect(r.status()).toBe(200); + return (await r.json()).install_token as string; +} + +async function login( + request: APIRequestContext, + email: string, + password: string, +): Promise { + const r = await request.post('/api/v1/auth/login', { + data: { email, password }, + }); + expect(r.status()).toBe(200); + return (await r.json()).access_token as string; +} + +async function inviteUser( + request: APIRequestContext, + adminAuth: Record, + prefix: string, + groupCodes: string[], +): Promise<{ email: string; password: string; token: string; id: string }> { + const grp = await request.post('/api/v1/groups', { + headers: adminAuth, + data: { name: `${prefix}-${crypto.randomUUID().slice(0, 4)}` }, + }); + expect(grp.status()).toBe(201); + const grpId = (await grp.json()).id as string; + const setPerms = await request.put(`/api/v1/groups/${grpId}/permissions`, { + headers: adminAuth, + data: { codes: groupCodes }, + }); + expect(setPerms.status()).toBe(200); + const email = `${prefix}-${crypto.randomUUID().slice(0, 6)}@metamorph.local`; + const inv = await request.post('/api/v1/invitations', { + headers: adminAuth, + data: { email_hint: email, group_ids: [grpId] }, + }); + expect(inv.status()).toBe(201); + const inviteToken = (await inv.json()).token as string; + const password = 'Pass1234!'; + const accept = await request.post( + `/api/v1/invitations/accept/${inviteToken}`, + { data: { email, password } }, + ); + expect(accept.status()).toBe(201); + const token = await login(request, email, password); + const me = await request.get('/api/v1/auth/me', { + headers: { Authorization: `Bearer ${token}` }, + }); + expect(me.status()).toBe(200); + return { email, password, token, id: (await me.json()).id as string }; +} + +test.describe.configure({ mode: 'serial' }); + +test.describe('M7 — Test execution', () => { + let templateId = ''; + let scenarioId = ''; + + test.beforeAll(async ({ request }) => { + const installToken = await resetAndMintToken(request); + const setup = await request.post('/api/v1/setup', { + data: { + install_token: installToken, + email: ADMIN_EMAIL, + password: ADMIN_PASSWORD, + }, + }); + expect(setup.status()).toBe(201); + const access = await login(request, ADMIN_EMAIL, ADMIN_PASSWORD); + const sync = await request.post('/api/v1/mitre/sync', { + headers: { Authorization: `Bearer ${access}` }, + }); + expect(sync.status()).toBe(200); + const auth = { Authorization: `Bearer ${access}` }; + + const t = await request.post('/api/v1/test-templates', { + headers: auth, + data: { + name: 'm7-test', + description: 'auto', + objective: 'do thing', + procedure_md: '# steps', + expected_result_red_md: 'red expects', + expected_detection_blue_md: 'blue expects', + opsec_level: 'medium', + tags: [], + expected_iocs: [], + mitre_tags: [{ kind: 'technique', external_id: 'T1059' }], + }, + }); + expect(t.status()).toBe(201); + templateId = (await t.json()).id as string; + + const sc = await request.post('/api/v1/scenario-templates', { + headers: auth, + data: { + name: 'm7-scenario', + description: 'auto', + test_template_ids: [templateId], + }, + }); + expect(sc.status()).toBe(201); + scenarioId = (await sc.json()).id as string; + }); + + test.afterAll(async ({ request }) => { + const installToken = await resetAndMintToken(request); + await request.post('/api/v1/setup', { + data: { + install_token: installToken, + email: 'admin@metamorph.local', + password: 'AdminPass1234!', + }, + }); + const access = await login( + request, + 'admin@metamorph.local', + 'AdminPass1234!', + ); + await request.post('/api/v1/mitre/sync', { + headers: { Authorization: `Bearer ${access}` }, + }); + }); + + // ----------------------------------------------------------------------- + // API — field-level perm gating + state machine + // ----------------------------------------------------------------------- + + test('red-only user cannot write blue fields; blue-only user cannot write red', async ({ + request, + }) => { + const adminAccess = await login(request, ADMIN_EMAIL, ADMIN_PASSWORD); + const adminAuth = { Authorization: `Bearer ${adminAccess}` }; + + const red = await inviteUser(request, adminAuth, 'red', [ + 'mission.read', + 'mission.create', + 'mission.write_red_fields', + 'detection_level.read', + ]); + const blue = await inviteUser(request, adminAuth, 'blue', [ + 'mission.read', + 'mission.write_blue_fields', + 'detection_level.read', + ]); + + const mission = await request.post('/api/v1/missions', { + headers: { Authorization: `Bearer ${red.token}` }, + data: { + name: 'm7-fields', + scenario_template_ids: [scenarioId], + members: [ + { user_id: red.id, role_hint: 'red' }, + { user_id: blue.id, role_hint: 'blue' }, + ], + }, + }); + expect(mission.status()).toBe(201); + const m = await mission.json(); + const testId = m.scenarios[0].tests[0].id as string; + + const redCannotBlue = await request.put( + `/api/v1/missions/${m.id}/tests/${testId}`, + { + headers: { Authorization: `Bearer ${red.token}` }, + data: { blue_comment_md: 'should be blocked' }, + }, + ); + expect(redCannotBlue.status()).toBe(403); + + const blueCannotRed = await request.put( + `/api/v1/missions/${m.id}/tests/${testId}`, + { + headers: { Authorization: `Bearer ${blue.token}` }, + data: { red_command: 'should be blocked' }, + }, + ); + expect(blueCannotRed.status()).toBe(403); + }); + + test('mark-executed stamps executed_at and gates reviewed_by_blue to blue side', async ({ + request, + }) => { + const adminAccess = await login(request, ADMIN_EMAIL, ADMIN_PASSWORD); + const adminAuth = { Authorization: `Bearer ${adminAccess}` }; + const red = await inviteUser(request, adminAuth, 'red', [ + 'mission.read', + 'mission.create', + 'mission.write_red_fields', + ]); + const blue = await inviteUser(request, adminAuth, 'blue', [ + 'mission.read', + 'mission.write_blue_fields', + ]); + const mission = await request.post('/api/v1/missions', { + headers: { Authorization: `Bearer ${red.token}` }, + data: { + name: 'm7-state', + scenario_template_ids: [scenarioId], + members: [ + { user_id: red.id, role_hint: 'red' }, + { user_id: blue.id, role_hint: 'blue' }, + ], + }, + }); + const m = await mission.json(); + const testId = m.scenarios[0].tests[0].id as string; + + const execute = await request.post( + `/api/v1/missions/${m.id}/tests/${testId}/transition`, + { + headers: { Authorization: `Bearer ${red.token}` }, + data: { target_state: 'executed' }, + }, + ); + expect(execute.status()).toBe(200); + const executedBody = await execute.json(); + expect(executedBody.state).toBe('executed'); + expect(executedBody.executed_at).not.toBeNull(); + + // Red cannot review_by_blue. + const redReview = await request.post( + `/api/v1/missions/${m.id}/tests/${testId}/transition`, + { + headers: { Authorization: `Bearer ${red.token}` }, + data: { target_state: 'reviewed_by_blue' }, + }, + ); + expect(redReview.status()).toBe(403); + + const blueReview = await request.post( + `/api/v1/missions/${m.id}/tests/${testId}/transition`, + { + headers: { Authorization: `Bearer ${blue.token}` }, + data: { target_state: 'reviewed_by_blue' }, + }, + ); + expect(blueReview.status()).toBe(200); + expect((await blueReview.json()).state).toBe('reviewed_by_blue'); + }); + + // ----------------------------------------------------------------------- + // API — evidence upload + // ----------------------------------------------------------------------- + + test('evidence upload — 24 MB accepted, 26 MB rejected, SHA256 verified', async ({ + request, + }) => { + const adminAccess = await login(request, ADMIN_EMAIL, ADMIN_PASSWORD); + const adminAuth = { Authorization: `Bearer ${adminAccess}` }; + const blue = await inviteUser(request, adminAuth, 'blue', [ + 'mission.read', + 'mission.write_blue_fields', + ]); + + const mission = await request.post('/api/v1/missions', { + headers: adminAuth, + data: { + name: 'm7-evidence', + scenario_template_ids: [scenarioId], + members: [{ user_id: blue.id, role_hint: 'blue' }], + }, + }); + expect(mission.status()).toBe(201); + const m = await mission.json(); + const testId = m.scenarios[0].tests[0].id as string; + + const headerOk = Buffer.from([0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a]); + const tail24 = Buffer.alloc(24 * 1024 * 1024 - headerOk.length, 0x41); + const file24 = Buffer.concat([headerOk, tail24]); + const expected = await crypto.subtle.digest('SHA-256', file24); + const expectedHex = Array.from(new Uint8Array(expected)) + .map((b) => b.toString(16).padStart(2, '0')) + .join(''); + + const ok = await request.post( + `/api/v1/missions/${m.id}/tests/${testId}/evidence`, + { + headers: { Authorization: `Bearer ${blue.token}` }, + multipart: { + file: { + name: 'lab.evtx', + mimeType: 'application/octet-stream', + buffer: file24, + }, + }, + }, + ); + expect(ok.status()).toBe(201); + const body = await ok.json(); + expect(body.size_bytes).toBe(file24.length); + expect(body.sha256).toBe(expectedHex); + + const file26 = Buffer.concat([ + headerOk, + Buffer.alloc(26 * 1024 * 1024 - headerOk.length, 0x41), + ]); + const tooBig = await request.post( + `/api/v1/missions/${m.id}/tests/${testId}/evidence`, + { + headers: { Authorization: `Bearer ${blue.token}` }, + multipart: { + file: { + name: 'huge.evtx', + mimeType: 'application/octet-stream', + buffer: file26, + }, + }, + }, + ); + expect(tooBig.status()).toBe(400); + expect((await tooBig.json()).error).toBe('too_large'); + + const evictGet = await request.get( + `/api/v1/evidence/${body.id}?download=true`, + { headers: { Authorization: `Bearer ${blue.token}` } }, + ); + expect(evictGet.status()).toBe(200); + const dlBytes = await evictGet.body(); + expect(dlBytes.length).toBe(file24.length); + }); + + // ----------------------------------------------------------------------- + // SPA — per-test page edits & uploads + // ----------------------------------------------------------------------- + + test('SPA — per-test page: red comment save bumps activity badge', async ({ + page, + request, + }) => { + const adminAccess = await login(request, ADMIN_EMAIL, ADMIN_PASSWORD); + const adminAuth = { Authorization: `Bearer ${adminAccess}` }; + const red = await inviteUser(request, adminAuth, 'red', [ + 'mission.read', + 'mission.create', + 'mission.update', + 'mission.write_red_fields', + 'detection_level.read', + ]); + + const mission = await request.post('/api/v1/missions', { + headers: { Authorization: `Bearer ${red.token}` }, + data: { + name: 'm7-spa', + scenario_template_ids: [scenarioId], + members: [{ user_id: red.id, role_hint: 'red' }], + }, + }); + const m = await mission.json(); + const testId = m.scenarios[0].tests[0].id as string; + + // Log the SPA in as the red user. + await page.goto('/login'); + await page.getByLabel(/email/i).fill(red.email); + await page.getByLabel(/password/i).fill(red.password); + await page.getByRole('button', { name: /sign in/i }).click(); + await expect(page.getByTestId('me-email')).toHaveText(red.email); + + await page.goto(`/missions/${m.id}/tests/${testId}`); + await expect(page.getByTestId('mission-test-page')).toBeVisible(); + await expect(page.getByTestId('state-pill')).toContainText(/Pending/); + + // Fill the red command + comment, then save. + await page.getByTestId('red-command').fill('whoami /priv'); + await page.getByTestId('red-comment').fill('Verified locally'); + await page.getByTestId('red-save').click(); + + // After save the state-pill stays Pending (only transitions change it). + await expect(page.getByTestId('state-pill')).toContainText(/Pending/); + + // Now transition to executed via the header button. + await page.getByTestId('transition-executed').click(); + await expect(page.getByTestId('state-pill')).toContainText(/Executed/); + + // The "last touched" line should now mention the red user. + await expect(page.locator('text=/Last touched/')).toBeVisible(); + }); + + test('SPA — non-member sees 404 message instead of mission content', async ({ + page, + request, + }) => { + const adminAccess = await login(request, ADMIN_EMAIL, ADMIN_PASSWORD); + const adminAuth = { Authorization: `Bearer ${adminAccess}` }; + const owner = await inviteUser(request, adminAuth, 'own', [ + 'mission.read', + 'mission.create', + 'mission.write_red_fields', + ]); + const outsider = await inviteUser(request, adminAuth, 'out', [ + 'mission.read', + ]); + + const mission = await request.post('/api/v1/missions', { + headers: { Authorization: `Bearer ${owner.token}` }, + data: { + name: 'm7-private', + scenario_template_ids: [scenarioId], + }, + }); + const m = await mission.json(); + const testId = m.scenarios[0].tests[0].id as string; + + await page.goto('/login'); + await page.getByLabel(/email/i).fill(outsider.email); + await page.getByLabel(/password/i).fill(outsider.password); + await page.getByRole('button', { name: /sign in/i }).click(); + await expect(page.getByTestId('me-email')).toHaveText(outsider.email); + + await page.goto(`/missions/${m.id}/tests/${testId}`); + await expect( + page.locator('text=/Mission test not found/'), + ).toBeVisible(); + }); +}); diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index ff957a1..5fa4ac3 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -13,6 +13,7 @@ import { HomePage } from '@/pages/HomePage'; import { MitrePage } from '@/pages/MitrePage'; import { LoginPage } from '@/pages/LoginPage'; import { MissionDetailPage } from '@/pages/MissionDetailPage'; +import { MissionTestPage } from '@/pages/MissionTestPage'; import { MissionsCreatePage } from '@/pages/MissionsCreatePage'; import { MissionsListPage } from '@/pages/MissionsListPage'; import { ProfilePage } from '@/pages/ProfilePage'; @@ -87,6 +88,14 @@ function App() { } /> + + + + } + /> = { completed: 'Completed', archived: 'Archived', }; + +// =========================================================================== // +// M7 — per-test execution +// =========================================================================== // + +export interface DetectionLevel { + id: string; + key: string; + label_fr: string; + label_en: string; + color_token: string; + position: number; + is_default: boolean; + is_system: boolean; +} + +export interface DetectionLevelList { + items: DetectionLevel[]; +} + +export interface MissionTestEvidence { + id: string; + mission_test_id: string; + sha256: string; + mime: string; + size_bytes: number; + original_filename: string; + uploaded_by_user_id: string | null; + uploaded_by_email: string | null; + uploaded_by_display_name: string | null; + uploaded_at: string; + created_at: string; +} + +export interface MissionTestDetail { + id: string; + mission_id: string; + scenario_id: string; + position: number; + snapshot_name: string; + snapshot_description: string | null; + snapshot_objective: string | null; + snapshot_procedure_md: string | null; + snapshot_prerequisites_md: string | null; + snapshot_expected_red_md: string | null; + snapshot_expected_blue_md: string | null; + snapshot_opsec_level: MissionOpsecLevel; + snapshot_tags: string[]; + snapshot_expected_iocs: string[]; + state: MissionTestState; + executed_at: string | null; + executed_at_overridden: boolean; + red_command: string | null; + red_output: string | null; + red_comment_md: string | null; + blue_comment_md: string | null; + detection_level_id: string | null; + detection_level_key: string | null; + last_actor_id: string | null; + last_actor_email: string | null; + last_actor_display_name: string | null; + updated_at: string; + mitre_tags: MissionMitreTag[]; + evidence: MissionTestEvidence[]; +} + +export interface UpdateMissionTestPayload { + red_command?: string | null; + red_output?: string | null; + red_comment_md?: string | null; + blue_comment_md?: string | null; + detection_level_id?: string | null; + executed_at?: string | null; + executed_at_overridden?: boolean; +} + +export interface TestTransitionPayload { + target_state: MissionTestState; +} + +export interface ActivityEntry { + test_id: string; + scenario_id: string; + state: MissionTestState; + updated_at: string; + last_actor_id: string | null; + last_actor_email: string | null; + last_actor_display_name: string | null; +} + +export interface ActivityResponse { + items: ActivityEntry[]; + server_time: string; +} + +export const missionTestKeys = { + detail: (missionId: string, testId: string) => + ['missions', 'detail', missionId, 'tests', testId] as const, + activity: (missionId: string) => ['missions', missionId, 'activity'] as const, + detectionLevels: () => ['detection-levels'] as const, +}; + +export const MISSION_TEST_STATE_LABEL: Record = { + pending: 'Pending', + executed: 'Executed', + reviewed_by_blue: 'Reviewed', + skipped: 'Skipped', + blocked: 'Blocked', +}; + +export const MISSION_TEST_STATE_ACCENT: Record< + MissionTestState, + 'teal' | 'orange' | 'green' | 'rose' | 'red' +> = { + pending: 'teal', + executed: 'orange', + reviewed_by_blue: 'green', + skipped: 'rose', + blocked: 'red', +}; + +// Front-end mirror of the backend state-machine matrix so the UI only renders +// the transitions the server will accept. Keep this in sync with +// `app/services/mission_tests.py::_VALID_TRANSITIONS`. +export const VALID_TEST_TRANSITIONS: Record = { + pending: ['executed', 'skipped', 'blocked'], + executed: ['reviewed_by_blue', 'pending'], + reviewed_by_blue: ['executed'], + skipped: ['pending'], + blocked: ['pending'], +}; diff --git a/frontend/src/pages/HomePage.tsx b/frontend/src/pages/HomePage.tsx index 6f1af48..ad9a623 100644 --- a/frontend/src/pages/HomePage.tsx +++ b/frontend/src/pages/HomePage.tsx @@ -61,7 +61,7 @@ export function HomePage() { Purple Team Platform

- Collaborative red & blue test orchestration — M6 milestone (Missions & snapshot) + Collaborative red & blue test orchestration — M7 milestone (Red & blue execution)

- M0 + M1 + M2 + M3 + M4 + M5 + M6 done. Next:{' '} + M0 + M1 + M2 + M3 + M4 + M5 + M6 + M7 done. Next:{' '} - M7 — Red & blue execution on a mission test + M8 — Custom detection-level taxonomy .

diff --git a/frontend/src/pages/MissionDetailPage.tsx b/frontend/src/pages/MissionDetailPage.tsx index ce50856..eefac1c 100644 --- a/frontend/src/pages/MissionDetailPage.tsx +++ b/frontend/src/pages/MissionDetailPage.tsx @@ -1,6 +1,6 @@ import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query'; import { useEffect, useMemo, useState } from 'react'; -import { useNavigate, useParams } from 'react-router-dom'; +import { Link, useNavigate, useParams } from 'react-router-dom'; import { MarkdownField } from '@/components/MarkdownField'; import { Alert } from '@/components/ui/Alert'; @@ -679,11 +679,19 @@ export function MissionDetailPage() { {sc.tests.map((t) => ( {t.position + 1} - {t.snapshot_name} + + + {t.snapshot_name} + +
{t.mitre_tags.map((tag) => ( diff --git a/frontend/src/pages/MissionTestPage.tsx b/frontend/src/pages/MissionTestPage.tsx new file mode 100644 index 0000000..fc6a8ef --- /dev/null +++ b/frontend/src/pages/MissionTestPage.tsx @@ -0,0 +1,750 @@ +/** + * Per-test execution page (M7). + * + * Two zones, mirror of the spec §M7: + * - Red zone (red border) — command, output, markdown comment, mark-executed. + * - Blue zone (cyan border) — detection level, markdown comment, evidence dropzone. + * + * State transitions are driven from a small button row in the header. The + * "modified by X Ns ago" indicator polls `/missions/{id}/activity?since=…` + * every 15s while the page is mounted (and the document is visible). + * + * Field-level permissions: + * - Admins always see and write everything. + * - `mission.write_red_fields` enables the red-side form. + * - `mission.write_blue_fields` enables the blue-side form + uploads. + * The server is the ultimate arbiter (PUT/POST will 403 if a side is forbidden); + * the UI just disables inputs the user cannot write to reduce confusion. + */ + +import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query'; +import { useCallback, useEffect, useMemo, useRef, useState } from 'react'; +import { Link, useNavigate, useParams } from 'react-router-dom'; + +import { MarkdownField } from '@/components/MarkdownField'; +import { Alert } from '@/components/ui/Alert'; +import { Button } from '@/components/ui/Button'; +import { Card } from '@/components/ui/Card'; +import { SectionHeader } from '@/components/ui/SectionHeader'; +import { Tag } from '@/components/ui/Tag'; +import { TextField } from '@/components/ui/TextField'; +import { + ApiError, + apiDelete, + apiFetch, + apiGet, + apiPost, + apiPut, +} from '@/lib/api'; +import { useAuth } from '@/lib/auth'; +import { + MISSION_TEST_STATE_ACCENT, + MISSION_TEST_STATE_LABEL, + VALID_TEST_TRANSITIONS, + missionKeys, + missionTestKeys, + type ActivityResponse, + type DetectionLevel, + type DetectionLevelList, + type MissionTestDetail, + type MissionTestEvidence, + type TestTransitionPayload, + type UpdateMissionTestPayload, +} from '@/lib/missions'; + +const POLL_INTERVAL_MS = 15_000; + +function formatBytes(n: number): string { + if (n < 1024) return `${n} B`; + if (n < 1024 * 1024) return `${(n / 1024).toFixed(1)} KB`; + return `${(n / (1024 * 1024)).toFixed(2)} MB`; +} + +function formatRelative(iso: string | null | undefined): string { + if (!iso) return 'never'; + const then = new Date(iso).getTime(); + const now = Date.now(); + const seconds = Math.max(0, Math.floor((now - then) / 1000)); + if (seconds < 60) return `${seconds}s ago`; + const minutes = Math.floor(seconds / 60); + if (minutes < 60) return `${minutes}m ago`; + const hours = Math.floor(minutes / 60); + if (hours < 24) return `${hours}h ago`; + const days = Math.floor(hours / 24); + return `${days}d ago`; +} + +function useMissionTest(missionId: string, testId: string) { + return useQuery({ + queryKey: missionTestKeys.detail(missionId, testId), + queryFn: () => + apiGet(`/missions/${missionId}/tests/${testId}`), + enabled: !!missionId && !!testId, + }); +} + +function useDetectionLevels(enabled: boolean) { + return useQuery({ + queryKey: missionTestKeys.detectionLevels(), + queryFn: () => apiGet('/detection-levels'), + enabled, + staleTime: 60_000, + }); +} + +// --------------------------------------------------------------------------- // +// Activity indicator // +// --------------------------------------------------------------------------- // + +function useActivityWatcher( + missionId: string, + testId: string, + onTouched: () => void, +) { + const lastServerTimeRef = useRef(null); + const onTouchedRef = useRef(onTouched); + onTouchedRef.current = onTouched; + + useEffect(() => { + if (!missionId) return; + let cancelled = false; + + async function poll() { + try { + const since = lastServerTimeRef.current; + const url = + `/missions/${missionId}/activity` + + (since ? `?since=${encodeURIComponent(since)}` : ''); + const res = await apiGet(url); + if (cancelled) return; + lastServerTimeRef.current = res.server_time; + // We only care about activity on the same test (the badge is local). + if (res.items.some((it) => it.test_id === testId)) { + onTouchedRef.current(); + } + } catch { + // Network blips are non-fatal — the badge just doesn't refresh. + } + } + + // Prime the timestamp without firing onTouched. + void poll(); + const handle = window.setInterval(() => { + if (document.visibilityState === 'visible') void poll(); + }, POLL_INTERVAL_MS); + return () => { + cancelled = true; + window.clearInterval(handle); + }; + }, [missionId, testId]); +} + +// --------------------------------------------------------------------------- // +// Red zone // +// --------------------------------------------------------------------------- // + +interface RedZoneProps { + test: MissionTestDetail; + missionId: string; + canWriteRed: boolean; +} + +function RedZone({ test, missionId, canWriteRed }: RedZoneProps) { + const qc = useQueryClient(); + const [command, setCommand] = useState(test.red_command ?? ''); + const [output, setOutput] = useState(test.red_output ?? ''); + const [comment, setComment] = useState(test.red_comment_md ?? ''); + const [override, setOverride] = useState(test.executed_at_overridden); + const [executedAt, setExecutedAt] = useState(test.executed_at ?? ''); + const dirty = + command !== (test.red_command ?? '') || + output !== (test.red_output ?? '') || + comment !== (test.red_comment_md ?? '') || + override !== test.executed_at_overridden || + (override && executedAt !== (test.executed_at ?? '')); + + // Sync local state when a refetch lands a newer version of the test + // (concurrent collaboration: blue's edit shouldn't blow away our local + // unsaved changes, but a fresh load should). + useEffect(() => { + setCommand(test.red_command ?? ''); + setOutput(test.red_output ?? ''); + setComment(test.red_comment_md ?? ''); + setOverride(test.executed_at_overridden); + setExecutedAt(test.executed_at ?? ''); + // We deliberately reset on every test reload — see comment above. + }, [test]); + + const save = useMutation({ + mutationFn: (body: UpdateMissionTestPayload) => + apiPut( + `/missions/${missionId}/tests/${test.id}`, + body, + ), + onSuccess: (next) => { + qc.setQueryData(missionTestKeys.detail(missionId, test.id), next); + qc.invalidateQueries({ queryKey: missionKeys.detail(missionId) }); + }, + }); + + function submit() { + const body: UpdateMissionTestPayload = { + red_command: command.trim() || null, + red_output: output.length === 0 ? null : output, + red_comment_md: comment.trim() || null, + }; + if (override !== test.executed_at_overridden) { + body.executed_at_overridden = override; + } + if (override) { + const iso = executedAt + ? new Date(executedAt).toISOString() + : null; + body.executed_at = iso; + } + save.mutate(body); + } + + const apiErr = save.error instanceof ApiError ? save.error : null; + const canOverride = + canWriteRed && (test.state === 'executed' || test.state === 'reviewed_by_blue'); + + return ( + + + {apiErr && {apiErr.message}} + setCommand(e.target.value)} + disabled={!canWriteRed} + className="font-mono" + data-testid="red-command" + placeholder="powershell -enc ..." + /> +