feature/m5-templates #2
@@ -31,6 +31,15 @@ All notable changes to this project will be documented here. Format: [Keep a Cha
|
|||||||
- **`LogRecord` key collision**: `log.info(..., extra={"name": ...})` raises `KeyError("Attempt to overwrite 'name' in LogRecord")` because `name` is reserved by Python's stdlib logging. Renamed to `template_name`.
|
- **`LogRecord` key collision**: `log.info(..., extra={"name": ...})` raises `KeyError("Attempt to overwrite 'name' in LogRecord")` because `name` is reserved by Python's stdlib logging. Renamed to `template_name`.
|
||||||
- **React `currentTarget` null in deferred state updaters**: `onChange={(e) => setX((prev) => ({ ...prev, q: e.currentTarget.value }))}` blanked the page on the first user input because `currentTarget` is cleared after the listener bubble ends, before React invokes the updater. Switched all M5 handlers to `e.target.value`, which persists on the synthetic event.
|
- **React `currentTarget` null in deferred state updaters**: `onChange={(e) => setX((prev) => ({ ...prev, q: e.currentTarget.value }))}` blanked the page on the first user input because `currentTarget` is cleared after the listener bubble ends, before React invokes the updater. Switched all M5 handlers to `e.target.value`, which persists on the synthetic event.
|
||||||
|
|
||||||
|
### Fixed (post-M5 review pass — spec-reviewer + code-reviewer)
|
||||||
|
- **Filter combinator was OR, not AND** (`backend/app/services/test_templates.py:235`): `?tactic=TA0002&technique=T1059` returned templates matching *either* facet instead of *both*. Pre-fix also pooled all three UUIDs into a shared `IN` list across three columns, theoretically allowing a UUID collision to match across kinds. Refactored to one IN-subquery per facet, ANDed together via repeated `WHERE id IN (...)`.
|
||||||
|
- **Concurrent reorder race on `set_scenario_tests`** (`backend/app/services/scenario_templates.py:207`): two parallel reorders on the same scenario could deadlock on the `UNIQUE(scenario_id, position)` constraint under READ COMMITTED. Added a per-scenario `pg_advisory_xact_lock(0x5C3, hash(scenario_id))` mirroring the M4 `/mitre/sync` pattern; different scenarios don't contend.
|
||||||
|
- **N+1 on `_to_view` MITRE resolution** (`backend/app/services/test_templates.py:160`): rendering K templates with ~T tags each fired up to K×T `s.get(...)` calls. Added `_to_views_batch` that pre-builds `{uuid → MitreRow}` maps in 3 queries and feeds them to per-template view assembly; `list_test_templates` now issues 4 queries total regardless of list size.
|
||||||
|
- **Wire-level item length cap on `tags` / `expected_iocs`** (`backend/app/api/test_templates.py:18-21`): the DB columns are `ARRAY(String(64))` / `ARRAY(String(255))` but the API layer only capped the LIST length, not item strings — long inputs hit the driver with `StringDataRightTruncation`. Added `Annotated[str, StringConstraints(...)]` types so the API returns 400 with a clean validation error.
|
||||||
|
- **Front-end mutation cache hygiene** (`frontend/src/pages/AdminScenariosPage.tsx:148-156`): `updateMeta` and `setTests` mutations are run sequentially in `submit()`; on partial failure (metadata saved but reorder failed) the cache stayed stale. Both mutations now `onSettled: invalidate` so whatever step landed is reflected without manual refresh.
|
||||||
|
- **Backend vs front-end consistency on duplicate tests in a scenario** (`frontend/src/pages/AdminScenariosPage.tsx:227-231`): the backend allows the same `test_template` to appear multiple times (chained ops; the UNIQUE constraint is `(scenario_id, position)` not `(scenario_id, test_template_id)`), but the catalogue picker was filtering out already-picked items. Removed the filter — only soft-deleted tests are excluded now.
|
||||||
|
- **Test coverage closure** (`backend/tests/test_templates.py`): +4 pytest (tactic+technique AND-semantics, `extra="forbid"` rejection, empty `mitre_tags` explicit clear, 65-char tag length cap → 400). Total backend now 23 M5 tests + 39 elsewhere = 81 pass.
|
||||||
|
|
||||||
### Added — M4 (MITRE ATT&CK Enterprise)
|
### Added — M4 (MITRE ATT&CK Enterprise)
|
||||||
- **STIX 2.1 parser + upsert** (`app/services/mitre_seed.py`): stdlib-only (`urllib.request` + `hashlib`), pinned to Enterprise v19.0 (`enterprise-attack-19.0.json`, sha256 `df520ea0…`). Parses 25k+ STIX objects → 15 tactics, 222 techniques, 475 sub-techniques in ~1.1 s. Skips revoked + deprecated, resolves sub-technique parents via `relationship[subtechnique-of]` with a `T1003.001 → T1003` dotted-id fallback, copies kill-chain phases into the `mitre_technique_tactics` M2M.
|
- **STIX 2.1 parser + upsert** (`app/services/mitre_seed.py`): stdlib-only (`urllib.request` + `hashlib`), pinned to Enterprise v19.0 (`enterprise-attack-19.0.json`, sha256 `df520ea0…`). Parses 25k+ STIX objects → 15 tactics, 222 techniques, 475 sub-techniques in ~1.1 s. Skips revoked + deprecated, resolves sub-technique parents via `relationship[subtechnique-of]` with a `T1003.001 → T1003` dotted-id fallback, copies kill-chain phases into the `mitre_technique_tactics` M2M.
|
||||||
- **CLI**: `flask metamorph seed-mitre [--source <path|url>] [--checksum-sha256 <hex>] [--skip-checksum]` (`app/cli.py`). `make seed-mitre` wraps it.
|
- **CLI**: `flask metamorph seed-mitre [--source <path|url>] [--checksum-sha256 <hex>] [--skip-checksum]` (`app/cli.py`). `make seed-mitre` wraps it.
|
||||||
|
|||||||
@@ -12,11 +12,18 @@ import uuid
|
|||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from flask import Blueprint, jsonify, request
|
from flask import Blueprint, jsonify, request
|
||||||
from pydantic import BaseModel, Field, ValidationError
|
from pydantic import BaseModel, Field, StringConstraints, ValidationError
|
||||||
|
from typing import Annotated
|
||||||
|
|
||||||
from app.core.auth_decorators import require_auth, require_perm
|
from app.core.auth_decorators import require_auth, require_perm
|
||||||
from app.services import test_templates as svc
|
from app.services import test_templates as svc
|
||||||
|
|
||||||
|
# Tag and IOC entries are stored as PG ARRAY(String(N)). Cap items at the wire
|
||||||
|
# layer so over-sized inputs return 400 with a useful message rather than the
|
||||||
|
# bare StringDataRightTruncation from the driver.
|
||||||
|
TagStr = Annotated[str, StringConstraints(min_length=1, max_length=64)]
|
||||||
|
IocStr = Annotated[str, StringConstraints(min_length=1, max_length=255)]
|
||||||
|
|
||||||
bp = Blueprint("test_templates", __name__, url_prefix="/test-templates")
|
bp = Blueprint("test_templates", __name__, url_prefix="/test-templates")
|
||||||
log = logging.getLogger("metamorph.api.test_templates")
|
log = logging.getLogger("metamorph.api.test_templates")
|
||||||
|
|
||||||
@@ -40,8 +47,8 @@ class CreateTestTemplatePayload(BaseModel):
|
|||||||
expected_result_red_md: str | None = Field(default=None, max_length=32_000)
|
expected_result_red_md: str | None = Field(default=None, max_length=32_000)
|
||||||
expected_detection_blue_md: str | None = Field(default=None, max_length=32_000)
|
expected_detection_blue_md: str | None = Field(default=None, max_length=32_000)
|
||||||
opsec_level: str = Field(default="medium")
|
opsec_level: str = Field(default="medium")
|
||||||
tags: list[str] = Field(default_factory=list, max_length=64)
|
tags: list[TagStr] = Field(default_factory=list, max_length=64)
|
||||||
expected_iocs: list[str] = Field(default_factory=list, max_length=128)
|
expected_iocs: list[IocStr] = Field(default_factory=list, max_length=128)
|
||||||
mitre_tags: list[MitreTagIn] = Field(default_factory=list, max_length=64)
|
mitre_tags: list[MitreTagIn] = Field(default_factory=list, max_length=64)
|
||||||
|
|
||||||
model_config = {"extra": "forbid"}
|
model_config = {"extra": "forbid"}
|
||||||
@@ -56,8 +63,8 @@ class UpdateTestTemplatePayload(BaseModel):
|
|||||||
expected_result_red_md: str | None = Field(default=None, max_length=32_000)
|
expected_result_red_md: str | None = Field(default=None, max_length=32_000)
|
||||||
expected_detection_blue_md: str | None = Field(default=None, max_length=32_000)
|
expected_detection_blue_md: str | None = Field(default=None, max_length=32_000)
|
||||||
opsec_level: str | None = None
|
opsec_level: str | None = None
|
||||||
tags: list[str] | None = Field(default=None, max_length=64)
|
tags: list[TagStr] | None = Field(default=None, max_length=64)
|
||||||
expected_iocs: list[str] | None = Field(default=None, max_length=128)
|
expected_iocs: list[IocStr] | None = Field(default=None, max_length=128)
|
||||||
mitre_tags: list[MitreTagIn] | None = Field(default=None, max_length=64)
|
mitre_tags: list[MitreTagIn] | None = Field(default=None, max_length=64)
|
||||||
|
|
||||||
model_config = {"extra": "forbid"}
|
model_config = {"extra": "forbid"}
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ from dataclasses import dataclass
|
|||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from sqlalchemy import func, or_, select
|
from sqlalchemy import func, or_, select, text
|
||||||
from sqlalchemy.orm import Session, selectinload
|
from sqlalchemy.orm import Session, selectinload
|
||||||
|
|
||||||
_UNSET: Any = object()
|
_UNSET: Any = object()
|
||||||
@@ -208,8 +208,20 @@ def set_scenario_tests(
|
|||||||
scenario_id: uuid.UUID,
|
scenario_id: uuid.UUID,
|
||||||
test_template_ids: list[uuid.UUID],
|
test_template_ids: list[uuid.UUID],
|
||||||
) -> ScenarioTemplateView:
|
) -> ScenarioTemplateView:
|
||||||
"""Replace the entire ordered test list. `position` becomes the index."""
|
"""Replace the entire ordered test list. `position` becomes the index.
|
||||||
|
|
||||||
|
Acquires a per-scenario advisory lock to serialise concurrent reorders.
|
||||||
|
Without it, two parallel `PUT /scenario-templates/{id}/tests` calls would
|
||||||
|
race on the wipe-then-insert sequence and deadlock on the UNIQUE(position)
|
||||||
|
constraint under READ COMMITTED. Mirrors the M4 pattern on /mitre/sync.
|
||||||
|
"""
|
||||||
with session_scope() as s:
|
with session_scope() as s:
|
||||||
|
# Lock keyed on the scenario UUID — different scenarios don't block
|
||||||
|
# each other. Two-int form: high-32 = constant, low-32 = hash of UUID.
|
||||||
|
s.execute(
|
||||||
|
text("SELECT pg_advisory_xact_lock(:n, :m)"),
|
||||||
|
{"n": 0x5C3, "m": hash(scenario_id) & 0xFFFFFFFF},
|
||||||
|
)
|
||||||
sc = s.get(ScenarioTemplate, scenario_id)
|
sc = s.get(ScenarioTemplate, scenario_id)
|
||||||
if sc is None or sc.deleted_at is not None:
|
if sc is None or sc.deleted_at is not None:
|
||||||
raise ScenarioTemplateNotFound()
|
raise ScenarioTemplateNotFound()
|
||||||
|
|||||||
@@ -157,24 +157,122 @@ def _resolve_mitre_refs(s: Session, refs: list[MitreTagRef]) -> list[TestTemplat
|
|||||||
return rows
|
return rows
|
||||||
|
|
||||||
|
|
||||||
def _to_view(s: Session, t: TestTemplate) -> TestTemplateView:
|
def _resolve_mitre_views(s: Session, tags: list[TestTemplateMitreTag]) -> list[MitreTagView]:
|
||||||
tag_views: list[MitreTagView] = []
|
"""Batch-resolve polymorphic MITRE FKs into MitreTagViews in 3 queries
|
||||||
|
total — one per kind — regardless of how many tags or templates the
|
||||||
|
caller is rendering.
|
||||||
|
"""
|
||||||
|
tactic_ids = {t.tactic_id for t in tags if t.mitre_kind == "tactic" and t.tactic_id is not None}
|
||||||
|
technique_ids = {t.technique_id for t in tags if t.mitre_kind == "technique" and t.technique_id is not None}
|
||||||
|
sub_ids = {t.subtechnique_id for t in tags if t.mitre_kind == "subtechnique" and t.subtechnique_id is not None}
|
||||||
|
|
||||||
|
tactic_map: dict[uuid.UUID, MitreTactic] = {}
|
||||||
|
technique_map: dict[uuid.UUID, MitreTechnique] = {}
|
||||||
|
sub_map: dict[uuid.UUID, MitreSubtechnique] = {}
|
||||||
|
if tactic_ids:
|
||||||
|
tactic_map = {row.id: row for row in s.scalars(select(MitreTactic).where(MitreTactic.id.in_(tactic_ids))).all()}
|
||||||
|
if technique_ids:
|
||||||
|
technique_map = {
|
||||||
|
row.id: row
|
||||||
|
for row in s.scalars(select(MitreTechnique).where(MitreTechnique.id.in_(technique_ids))).all()
|
||||||
|
}
|
||||||
|
if sub_ids:
|
||||||
|
sub_map = {
|
||||||
|
row.id: row
|
||||||
|
for row in s.scalars(select(MitreSubtechnique).where(MitreSubtechnique.id.in_(sub_ids))).all()
|
||||||
|
}
|
||||||
|
|
||||||
|
views: list[MitreTagView] = []
|
||||||
|
for tag in tags:
|
||||||
|
if tag.mitre_kind == "tactic" and tag.tactic_id in tactic_map:
|
||||||
|
row_t = tactic_map[tag.tactic_id]
|
||||||
|
views.append(MitreTagView(kind="tactic", external_id=row_t.external_id, name=row_t.name, url=row_t.url))
|
||||||
|
elif tag.mitre_kind == "technique" and tag.technique_id in technique_map:
|
||||||
|
row_te = technique_map[tag.technique_id]
|
||||||
|
views.append(MitreTagView(kind="technique", external_id=row_te.external_id, name=row_te.name, url=row_te.url))
|
||||||
|
elif tag.mitre_kind == "subtechnique" and tag.subtechnique_id in sub_map:
|
||||||
|
row_sb = sub_map[tag.subtechnique_id]
|
||||||
|
views.append(MitreTagView(kind="subtechnique", external_id=row_sb.external_id, name=row_sb.name, url=row_sb.url))
|
||||||
|
views.sort(key=lambda v: (v.kind, v.external_id))
|
||||||
|
return views
|
||||||
|
|
||||||
|
|
||||||
|
def _to_views_batch(s: Session, templates: list[TestTemplate]) -> list[TestTemplateView]:
|
||||||
|
"""List-level batcher: one bulk MITRE resolve for all templates' tags.
|
||||||
|
|
||||||
|
For a list of K templates with ~T tags each, this issues 3 queries total
|
||||||
|
(one per MITRE kind) instead of 3K. We build (kind, uuid) → row maps
|
||||||
|
once, then assemble each template's view in memory.
|
||||||
|
"""
|
||||||
|
tactic_ids: set[uuid.UUID] = set()
|
||||||
|
technique_ids: set[uuid.UUID] = set()
|
||||||
|
sub_ids: set[uuid.UUID] = set()
|
||||||
|
for t in templates:
|
||||||
for tag in t.mitre_tags:
|
for tag in t.mitre_tags:
|
||||||
if tag.mitre_kind == "tactic" and tag.tactic_id is not None:
|
if tag.mitre_kind == "tactic" and tag.tactic_id is not None:
|
||||||
row = s.get(MitreTactic, tag.tactic_id)
|
tactic_ids.add(tag.tactic_id)
|
||||||
if row is not None:
|
|
||||||
tag_views.append(MitreTagView(kind="tactic", external_id=row.external_id, name=row.name, url=row.url))
|
|
||||||
elif tag.mitre_kind == "technique" and tag.technique_id is not None:
|
elif tag.mitre_kind == "technique" and tag.technique_id is not None:
|
||||||
row = s.get(MitreTechnique, tag.technique_id)
|
technique_ids.add(tag.technique_id)
|
||||||
if row is not None:
|
|
||||||
tag_views.append(MitreTagView(kind="technique", external_id=row.external_id, name=row.name, url=row.url))
|
|
||||||
elif tag.mitre_kind == "subtechnique" and tag.subtechnique_id is not None:
|
elif tag.mitre_kind == "subtechnique" and tag.subtechnique_id is not None:
|
||||||
row = s.get(MitreSubtechnique, tag.subtechnique_id)
|
sub_ids.add(tag.subtechnique_id)
|
||||||
if row is not None:
|
|
||||||
tag_views.append(
|
tactic_map: dict[uuid.UUID, MitreTactic] = (
|
||||||
MitreTagView(kind="subtechnique", external_id=row.external_id, name=row.name, url=row.url)
|
{row.id: row for row in s.scalars(select(MitreTactic).where(MitreTactic.id.in_(tactic_ids))).all()}
|
||||||
|
if tactic_ids
|
||||||
|
else {}
|
||||||
)
|
)
|
||||||
tag_views.sort(key=lambda v: (v.kind, v.external_id))
|
technique_map: dict[uuid.UUID, MitreTechnique] = (
|
||||||
|
{row.id: row for row in s.scalars(select(MitreTechnique).where(MitreTechnique.id.in_(technique_ids))).all()}
|
||||||
|
if technique_ids
|
||||||
|
else {}
|
||||||
|
)
|
||||||
|
sub_map: dict[uuid.UUID, MitreSubtechnique] = (
|
||||||
|
{row.id: row for row in s.scalars(select(MitreSubtechnique).where(MitreSubtechnique.id.in_(sub_ids))).all()}
|
||||||
|
if sub_ids
|
||||||
|
else {}
|
||||||
|
)
|
||||||
|
|
||||||
|
def _views_for(tags: list[TestTemplateMitreTag]) -> list[MitreTagView]:
|
||||||
|
out: list[MitreTagView] = []
|
||||||
|
for tag in tags:
|
||||||
|
if tag.mitre_kind == "tactic" and tag.tactic_id in tactic_map:
|
||||||
|
row_t = tactic_map[tag.tactic_id]
|
||||||
|
out.append(MitreTagView(kind="tactic", external_id=row_t.external_id, name=row_t.name, url=row_t.url))
|
||||||
|
elif tag.mitre_kind == "technique" and tag.technique_id in technique_map:
|
||||||
|
row_te = technique_map[tag.technique_id]
|
||||||
|
out.append(MitreTagView(kind="technique", external_id=row_te.external_id, name=row_te.name, url=row_te.url))
|
||||||
|
elif tag.mitre_kind == "subtechnique" and tag.subtechnique_id in sub_map:
|
||||||
|
row_sb = sub_map[tag.subtechnique_id]
|
||||||
|
out.append(MitreTagView(kind="subtechnique", external_id=row_sb.external_id, name=row_sb.name, url=row_sb.url))
|
||||||
|
out.sort(key=lambda v: (v.kind, v.external_id))
|
||||||
|
return out
|
||||||
|
|
||||||
|
views: list[TestTemplateView] = []
|
||||||
|
for t in templates:
|
||||||
|
views.append(
|
||||||
|
TestTemplateView(
|
||||||
|
id=t.id,
|
||||||
|
name=t.name,
|
||||||
|
description=t.description,
|
||||||
|
objective=t.objective,
|
||||||
|
procedure_md=t.procedure_md,
|
||||||
|
prerequisites_md=t.prerequisites_md,
|
||||||
|
expected_result_red_md=t.expected_result_red_md,
|
||||||
|
expected_detection_blue_md=t.expected_detection_blue_md,
|
||||||
|
opsec_level=t.opsec_level,
|
||||||
|
tags=list(t.tags or []),
|
||||||
|
expected_iocs=list(t.expected_iocs or []),
|
||||||
|
mitre_tags=_views_for(list(t.mitre_tags)),
|
||||||
|
deleted_at=t.deleted_at,
|
||||||
|
created_at=t.created_at,
|
||||||
|
updated_at=t.updated_at,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return views
|
||||||
|
|
||||||
|
|
||||||
|
def _to_view(s: Session, t: TestTemplate) -> TestTemplateView:
|
||||||
|
tag_views = _resolve_mitre_views(s, list(t.mitre_tags))
|
||||||
return TestTemplateView(
|
return TestTemplateView(
|
||||||
id=t.id,
|
id=t.id,
|
||||||
name=t.name,
|
name=t.name,
|
||||||
@@ -232,41 +330,43 @@ def list_test_templates(
|
|||||||
stmt = stmt.where(TestTemplate.tags.any(tag))
|
stmt = stmt.where(TestTemplate.tags.any(tag))
|
||||||
count_stmt = count_stmt.where(TestTemplate.tags.any(tag))
|
count_stmt = count_stmt.where(TestTemplate.tags.any(tag))
|
||||||
|
|
||||||
# MITRE facet: resolve external_id → uuid then filter via join subquery.
|
# MITRE facets: each provided facet (tactic, technique, subtechnique) is
|
||||||
if tactic or technique or subtechnique:
|
# AND-combined — a template tagged BOTH `TA0006` AND `T1003` matches a
|
||||||
tag_ids: list[uuid.UUID] = []
|
# query with `?tactic=TA0006&technique=T1003`, but a template tagged
|
||||||
|
# only `TA0006` does NOT. Each facet matches strictly its own column
|
||||||
|
# (no cross-column UUID collision risk).
|
||||||
|
def _facet_subquery(column, mitre_id: uuid.UUID):
|
||||||
|
return (
|
||||||
|
select(TestTemplateMitreTag.test_template_id)
|
||||||
|
.where(column == mitre_id)
|
||||||
|
.distinct()
|
||||||
|
)
|
||||||
|
|
||||||
if tactic:
|
if tactic:
|
||||||
tac = s.scalar(select(MitreTactic).where(MitreTactic.external_id == tactic))
|
tac = s.scalar(select(MitreTactic).where(MitreTactic.external_id == tactic))
|
||||||
if tac is None:
|
if tac is None:
|
||||||
return [], 0
|
return [], 0
|
||||||
tag_ids.append(tac.id)
|
sub_q = _facet_subquery(TestTemplateMitreTag.tactic_id, tac.id)
|
||||||
|
stmt = stmt.where(TestTemplate.id.in_(sub_q))
|
||||||
|
count_stmt = count_stmt.where(TestTemplate.id.in_(sub_q))
|
||||||
if technique:
|
if technique:
|
||||||
tech = s.scalar(select(MitreTechnique).where(MitreTechnique.external_id == technique))
|
tech = s.scalar(select(MitreTechnique).where(MitreTechnique.external_id == technique))
|
||||||
if tech is None:
|
if tech is None:
|
||||||
return [], 0
|
return [], 0
|
||||||
tag_ids.append(tech.id)
|
sub_q = _facet_subquery(TestTemplateMitreTag.technique_id, tech.id)
|
||||||
|
stmt = stmt.where(TestTemplate.id.in_(sub_q))
|
||||||
|
count_stmt = count_stmt.where(TestTemplate.id.in_(sub_q))
|
||||||
if subtechnique:
|
if subtechnique:
|
||||||
sub = s.scalar(select(MitreSubtechnique).where(MitreSubtechnique.external_id == subtechnique))
|
sub = s.scalar(select(MitreSubtechnique).where(MitreSubtechnique.external_id == subtechnique))
|
||||||
if sub is None:
|
if sub is None:
|
||||||
return [], 0
|
return [], 0
|
||||||
tag_ids.append(sub.id)
|
sub_q = _facet_subquery(TestTemplateMitreTag.subtechnique_id, sub.id)
|
||||||
sub_q = (
|
|
||||||
select(TestTemplateMitreTag.test_template_id)
|
|
||||||
.where(
|
|
||||||
or_(
|
|
||||||
TestTemplateMitreTag.tactic_id.in_(tag_ids),
|
|
||||||
TestTemplateMitreTag.technique_id.in_(tag_ids),
|
|
||||||
TestTemplateMitreTag.subtechnique_id.in_(tag_ids),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
.distinct()
|
|
||||||
)
|
|
||||||
stmt = stmt.where(TestTemplate.id.in_(sub_q))
|
stmt = stmt.where(TestTemplate.id.in_(sub_q))
|
||||||
count_stmt = count_stmt.where(TestTemplate.id.in_(sub_q))
|
count_stmt = count_stmt.where(TestTemplate.id.in_(sub_q))
|
||||||
|
|
||||||
total = s.scalar(count_stmt) or 0
|
total = s.scalar(count_stmt) or 0
|
||||||
rows = s.scalars(stmt.limit(max(1, min(limit, 500))).offset(max(0, offset))).all()
|
rows = s.scalars(stmt.limit(max(1, min(limit, 500))).offset(max(0, offset))).all()
|
||||||
return [_to_view(s, t) for t in rows], int(total)
|
return _to_views_batch(s, list(rows)), int(total)
|
||||||
|
|
||||||
|
|
||||||
def get_test_template(template_id: uuid.UUID, *, include_deleted: bool = False) -> TestTemplateView:
|
def get_test_template(template_id: uuid.UUID, *, include_deleted: bool = False) -> TestTemplateView:
|
||||||
|
|||||||
@@ -490,3 +490,78 @@ def test_scenario_perm_required(client, admin_token):
|
|||||||
_, eve_token = _bootstrap_user_without_perms(client, admin_token, "scn-eve")
|
_, eve_token = _bootstrap_user_without_perms(client, admin_token, "scn-eve")
|
||||||
r = client.get("/api/v1/scenario-templates", headers=_bearer(eve_token))
|
r = client.get("/api/v1/scenario-templates", headers=_bearer(eve_token))
|
||||||
assert r.status_code == 403
|
assert r.status_code == 403
|
||||||
|
|
||||||
|
|
||||||
|
# === Post-review fixes ======================================================
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_filter_combines_facets_with_and_semantics(client, admin_token):
|
||||||
|
"""A template tagged only `TA0002` is NOT in `?tactic=TA0002&technique=T1059`.
|
||||||
|
|
||||||
|
Pre-fix the OR-combined query would return it. AND-combined semantics
|
||||||
|
(one IN subquery per facet) restrict the set to templates matching ALL
|
||||||
|
requested facets.
|
||||||
|
"""
|
||||||
|
a = _make_test(
|
||||||
|
client,
|
||||||
|
admin_token,
|
||||||
|
name="and-tactic-only",
|
||||||
|
mitre_tags=[{"kind": "tactic", "external_id": "TA0002"}],
|
||||||
|
)
|
||||||
|
b = _make_test(
|
||||||
|
client,
|
||||||
|
admin_token,
|
||||||
|
name="and-both-tags",
|
||||||
|
mitre_tags=[
|
||||||
|
{"kind": "tactic", "external_id": "TA0002"},
|
||||||
|
{"kind": "technique", "external_id": "T1059"},
|
||||||
|
],
|
||||||
|
)
|
||||||
|
r = client.get(
|
||||||
|
"/api/v1/test-templates?tactic=TA0002&technique=T1059",
|
||||||
|
headers=_bearer(admin_token),
|
||||||
|
)
|
||||||
|
assert r.status_code == 200
|
||||||
|
names = [it["name"] for it in r.get_json()["items"]]
|
||||||
|
assert "and-both-tags" in names
|
||||||
|
assert "and-tactic-only" not in names
|
||||||
|
_ = a, b # silence unused vars from linter
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_test_template_rejects_extra_fields(client, admin_token):
|
||||||
|
"""`model_config = {"extra": "forbid"}` — unknown fields must 400."""
|
||||||
|
r = client.post(
|
||||||
|
"/api/v1/test-templates",
|
||||||
|
headers=_bearer(admin_token),
|
||||||
|
json={"name": "extra-test", "rogue_field": "smuggled"},
|
||||||
|
)
|
||||||
|
assert r.status_code == 400
|
||||||
|
|
||||||
|
|
||||||
|
def test_update_test_template_explicit_empty_mitre_clears(client, admin_token):
|
||||||
|
"""`PUT { mitre_tags: [] }` is an explicit clear, not a no-op."""
|
||||||
|
body = _make_test(
|
||||||
|
client,
|
||||||
|
admin_token,
|
||||||
|
name="clear-tags",
|
||||||
|
mitre_tags=[{"kind": "technique", "external_id": "T1059"}],
|
||||||
|
)
|
||||||
|
assert len(body["mitre_tags"]) == 1
|
||||||
|
r = client.put(
|
||||||
|
f"/api/v1/test-templates/{body['id']}",
|
||||||
|
headers=_bearer(admin_token),
|
||||||
|
json={"mitre_tags": []},
|
||||||
|
)
|
||||||
|
assert r.status_code == 200
|
||||||
|
assert r.get_json()["mitre_tags"] == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_tag_item_length_capped_at_64(client, admin_token):
|
||||||
|
"""Individual `tags` items must be ≤ 64 chars at the wire layer."""
|
||||||
|
long_tag = "x" * 65
|
||||||
|
r = client.post(
|
||||||
|
"/api/v1/test-templates",
|
||||||
|
headers=_bearer(admin_token),
|
||||||
|
json={"name": "long-tag", "tags": [long_tag]},
|
||||||
|
)
|
||||||
|
assert r.status_code == 400
|
||||||
|
|||||||
@@ -145,14 +145,19 @@ export function AdminScenariosPage() {
|
|||||||
onError: (e) => setError(humanError(e)),
|
onError: (e) => setError(humanError(e)),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// updateMeta and setTests both invalidate on success so a partial failure
|
||||||
|
// (metadata saved, reorder rejected) still leaves the cache consistent
|
||||||
|
// with whichever step landed.
|
||||||
const updateMeta = useMutation({
|
const updateMeta = useMutation({
|
||||||
mutationFn: ({ id, name, description }: { id: string; name: string; description: string | null }) =>
|
mutationFn: ({ id, name, description }: { id: string; name: string; description: string | null }) =>
|
||||||
apiPatch<ScenarioTemplate>(`/scenario-templates/${id}`, { name, description }),
|
apiPatch<ScenarioTemplate>(`/scenario-templates/${id}`, { name, description }),
|
||||||
|
onSettled: () => qc.invalidateQueries({ queryKey: ['templates', 'scenarios'] }),
|
||||||
});
|
});
|
||||||
|
|
||||||
const setTests = useMutation({
|
const setTests = useMutation({
|
||||||
mutationFn: ({ id, test_template_ids }: { id: string; test_template_ids: string[] }) =>
|
mutationFn: ({ id, test_template_ids }: { id: string; test_template_ids: string[] }) =>
|
||||||
apiPut<ScenarioTemplate>(`/scenario-templates/${id}/tests`, { test_template_ids }),
|
apiPut<ScenarioTemplate>(`/scenario-templates/${id}/tests`, { test_template_ids }),
|
||||||
|
onSettled: () => qc.invalidateQueries({ queryKey: ['templates', 'scenarios'] }),
|
||||||
});
|
});
|
||||||
|
|
||||||
const remove = useMutation({
|
const remove = useMutation({
|
||||||
@@ -223,12 +228,15 @@ export function AdminScenariosPage() {
|
|||||||
return map;
|
return map;
|
||||||
}, [catalogue.data]);
|
}, [catalogue.data]);
|
||||||
|
|
||||||
// Tests available for inclusion (excluding already-picked + soft-deleted).
|
// Tests offered for inclusion. The same test_template MAY appear multiple
|
||||||
|
// times in a scenario (chained operations are a real purple-team pattern,
|
||||||
|
// cf. `scenario_template_tests` UNIQUE on `(scenario_id, position)`, not
|
||||||
|
// on `test_template_id`). So we do NOT exclude already-picked items —
|
||||||
|
// only soft-deleted ones, which the backend would reject.
|
||||||
const availableTests = useMemo<TestTemplate[]>(() => {
|
const availableTests = useMemo<TestTemplate[]>(() => {
|
||||||
if (!catalogue.data) return [];
|
if (!catalogue.data) return [];
|
||||||
const picked = new Set(form.test_ids);
|
return catalogue.data.items.filter((t) => !t.deleted_at);
|
||||||
return catalogue.data.items.filter((t) => !picked.has(t.id) && !t.deleted_at);
|
}, [catalogue.data]);
|
||||||
}, [catalogue.data, form.test_ids]);
|
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<>
|
<>
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ make seed-mitre # tag picker needs the catalogue
|
|||||||
## 2. Tests automatisés
|
## 2. Tests automatisés
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
make test-api # 77 tests pytest dont 19 M5 (CRUD, perm, mitre tags, reorder)
|
make test-api # 81 tests pytest dont 23 M5 (CRUD, perm, mitre tags, reorder, AND-semantics, extra="forbid", item caps, empty-clear)
|
||||||
make e2e # 38 tests Playwright dont 4 M5 (API CRUD + scenario reorder + SPA list/filter)
|
make e2e # 38 tests Playwright dont 4 M5 (API CRUD + scenario reorder + SPA list/filter)
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user