Files
Metamorph/backend/app/services/detection_levels.py

141 lines
3.8 KiB
Python
Raw Permalink Normal View History

feat(m7): per-test execution — red/blue zones, evidence pipeline, activity poll DoD M7 (spec §F5 + §F6 + §F8 + tasks/todo.md M7) covered end-to-end: Backend - New migration `91a4e7c6d2f3` adds `mission_tests.last_actor_id` (FK users ON DELETE SET NULL) and `ix_mission_tests_updated_at` for the polling query. - `detection_levels`: 4 default rows seeded at boot, `GET /detection-levels` read-only (CRUD lands in M8). - `mission_tests` service + `missions` API extension: - `GET /missions/{id}/tests/{test_id}` — full detail incl. evidence list - `PUT /missions/{id}/tests/{test_id}` — patch red/blue fields with per-field perm classification (`mission.write_red_fields` vs `mission.write_blue_fields`) - `POST /missions/{id}/tests/{test_id}/transition` — pending↔skipped/blocked and pending→executed→reviewed_by_blue (+ undo paths), side-aware perm gate that fires *before* idempotency, `executed_at` auto-stamped on the way in - `GET /missions/{id}/activity?since=<ISO>` — drives the 15 s polling badge - `evidence` service + top-level `/evidence/<id>` API: - Streaming upload, SHA256 chunk-by-chunk, 25 MB cap, ext+MIME whitelist - Content-addressed storage at ${EVIDENCE_DIR}/<mission>/<test>/<sha256><ext> - Atomic `os.replace`, hex-validated SHA path component, root-dir guard - Membership-aware (404 on miss/forbidden, no existence leak) - `/diag/reset` now wipes ${EVIDENCE_DIR}/* in test mode (symlink-safe) and re-seeds detection levels as a safety net. Frontend - `lib/missions.ts` — M7 types + queryKey factory + state-machine matrix. - `pages/MissionTestPage.tsx` — two-zone layout: red border (command, output, comment, mark-executed + override toggle) and cyan border (detection-level select, comment, drag-and-drop evidence dropzone). Last-touched badge polls /activity every 15 s, gated on document.visibilityState. Per-field disable based on the user's red/blue perms (server stays the arbiter). - `pages/MissionDetailPage.tsx` — test rows link to the new per-test page. 
- `App.tsx` — registers /missions/:id/tests/:testId behind RequireAuth. - `HomePage.tsx` — hero + roadmap card bumped to M7; next is M8. Tests - `backend/tests/test_mission_tests.py` — 27 pytest tests (red/blue field gating, state-machine matrix incl. idempotent-side enforcement, executed_at override, 24/26 MB upload + SHA256, MIME/ext whitelist, soft-delete hide, activity polling with URL-encoded `since`, membership 404 vs admin bypass, cross-mission evidence access). - `e2e/tests/m7-execution.spec.ts` — 5 Playwright tests against the live stack (red-only/blue-only API gating, mark-executed + reviewed_by_blue side enforcement, 24 MB/26 MB upload + SHA256 round-trip, SPA per-test page save + transition, non-member 404 message). afterAll restores stable admin and re-syncs MITRE. Docs - CHANGELOG.md: M7 section + post-M7 review-pass subsection. - README.md: status, feature blurb, roadmap, testing-m7 link. - tasks/testing-m7.md: manual + automated procedure with transition matrix and perm-gating table. - tasks/lessons.md: M7 retrospectives (LogRecord `created` trap, URL-encoded query timestamps, perm-before-flush, atomic move, polling visibility gate). Test count: 133 pytest / 49 Playwright, all green. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 08:16:48 +02:00
"""Detection-level taxonomy.
The 4 default levels are seeded at boot. M7 exposes read-only access so the
blue side of a mission test can pick a level; M8 will add CRUD.
The seed is idempotent and additive: rows whose `key` already exists are left
alone (operators may have renamed labels). Only missing keys are inserted.
"""
from __future__ import annotations
import logging
import uuid
from dataclasses import dataclass
from sqlalchemy import select
from app.db.session import session_scope
from app.models.setting import DetectionLevel
log = logging.getLogger("metamorph.detection_levels")
@dataclass(frozen=True)
class DetectionLevelView:
    """Immutable plain-data snapshot of one `DetectionLevel` row.

    Instances are built by `_to_view` from ORM rows and are what the read
    helpers of this module (`list_detection_levels`, `get_detection_level`)
    return to their callers.
    """

    id: uuid.UUID  # identifier used by `get_detection_level` to look the row up
    key: str  # stable key; the seed uses it to decide whether a row already exists
    label_fr: str  # French display label
    label_en: str  # English display label
    color_token: str  # design-system accent name (cf. tasks/design.md)
    position: int  # primary sort key when listing levels (ties broken by `key`)
    is_default: bool  # presumably the level preselected for new tests — TODO confirm
    is_system: bool  # set to True on every row inserted by the boot-time seed
@dataclass(frozen=True)
class _DefaultLevel:
    """Declarative description of one entry of the seed catalogue.

    Carries exactly the column values that `seed_detection_levels` copies
    onto a new `DetectionLevel` row. There is no `is_system` field because
    the seed always writes `is_system=True` itself.
    """

    key: str  # stable key; an entry is skipped when a row with this key exists
    label_fr: str  # French display label
    label_en: str  # English display label
    color_token: str  # design-system accent name (cf. tasks/design.md)
    position: int  # sort position given to the inserted row
    is_default: bool  # value written to the row's `is_default` column
# Seed catalogue, one row per level. Colors map onto the design-system
# accents (cf. tasks/design.md).
# Columns: key, label_fr, label_en, color_token, position, is_default.
DEFAULT_LEVELS: tuple[_DefaultLevel, ...] = tuple(
    _DefaultLevel(
        key=key,
        label_fr=label_fr,
        label_en=label_en,
        color_token=color_token,
        position=position,
        is_default=is_default,
    )
    for key, label_fr, label_en, color_token, position, is_default in (
        ("detected_blocked", "Bloqué", "Blocked", "red", 0, False),
        ("detected_alert", "Alerte détectée", "Alert detected", "orange", 1, False),
        ("logged_only", "Loggé uniquement", "Logged only", "yellow", 2, False),
        ("not_detected", "Non détecté", "Not detected", "rose", 3, True),
    )
)
def _to_view(r: DetectionLevel) -> DetectionLevelView:
    """Copy the exposed columns of an ORM row into a frozen view object.

    Args:
        r: The `DetectionLevel` row to snapshot.

    Returns:
        A `DetectionLevelView` holding the same eight attribute values.
    """
    # Same attributes, read in the same order as the view's field list.
    copied_attrs = (
        "id",
        "key",
        "label_fr",
        "label_en",
        "color_token",
        "position",
        "is_default",
        "is_system",
    )
    return DetectionLevelView(**{name: getattr(r, name) for name in copied_attrs})
def seed_detection_levels() -> dict[str, int]:
    """Insert every default level whose `key` is not in the table yet.

    The seed is idempotent and purely additive: rows that already exist are
    never modified, so operators remain free to rename labels or change the
    default flag. An entry added to `DEFAULT_LEVELS` in a future release is
    picked up on the next boot.

    Returns:
        A dict with `created` (rows inserted by this call) and `total`
        (number of entries in the seed catalogue).
    """
    catalogue_size = len(DEFAULT_LEVELS)
    with session_scope() as s:
        known_keys = set(s.scalars(select(DetectionLevel.key)).all())
        missing = [lvl for lvl in DEFAULT_LEVELS if lvl.key not in known_keys]
        for lvl in missing:
            s.add(
                DetectionLevel(
                    key=lvl.key,
                    label_fr=lvl.label_fr,
                    label_en=lvl.label_en,
                    color_token=lvl.color_token,
                    position=lvl.position,
                    is_default=lvl.is_default,
                    is_system=True,  # seeded rows are always flagged as system rows
                )
            )
    inserted = len(missing)

    # `created` is a reserved LogRecord attribute (the record timestamp), so
    # the `extra` payload has to use a different key for the row count.
    log.info(
        "metamorph.detection_levels.seeded",
        extra={"rows_created": inserted, "total": catalogue_size},
    )
    return {"created": inserted, "total": catalogue_size}
def list_detection_levels() -> list[DetectionLevelView]:
    """Return every detection level as a view, ordered by `position` then `key`."""
    ordered_levels = select(DetectionLevel).order_by(
        DetectionLevel.position, DetectionLevel.key
    )
    with session_scope() as s:
        # Convert to plain views while the session is still open.
        return [_to_view(row) for row in s.scalars(ordered_levels).all()]
def get_detection_level(level_id: uuid.UUID) -> DetectionLevelView | None:
    """Fetch a single detection level by its id.

    Args:
        level_id: Identifier of the `DetectionLevel` row to load.

    Returns:
        The matching level as a `DetectionLevelView`, or `None` when no row
        has that id.
    """
    with session_scope() as s:
        row = s.get(DetectionLevel, level_id)
        if row is None:
            return None
        return _to_view(row)