From 162b6988f8e508e64d1a3b5eea93f545285ad3f3 Mon Sep 17 00:00:00 2001 From: knacky Date: Thu, 21 May 2026 20:44:48 +0200 Subject: [PATCH] fix(backend): align regex_extract + outputs.blob() with D-011/D-012 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit D-011 — `regex_extract(text, pattern, *, group=1, name=None)`: - engine google-re2 (linear-time, ReDoS-safe), `re` fallback with 1 MB cap. - first match only. - no match → raises Jinja2 `TemplateError` (no silent default — cleanup templates must fail loud when source string drifts). - default capture is group 1 with fallback to group(0) when the pattern has no groups; named groups via `name="<group>"`. D-012 — `outputs.blob()`: - reads the gzip-compressed CAS file from `MIMIC_BLOB_ROOT`. - 10 MB cap is applied **after** decompression. - decode UTF-8 with latin-1 fallback; never raises (missing / corrupt / non-gzip blobs return empty string, logged at WARNING). Unit tests rewritten to cover both the new fail-loud regex contract and the gzip read path. 49 unit tests pass; ruff clean. --- backend/src/mimic/templating/filters.py | 62 ++++++++++++++++++------- backend/src/mimic/templating/sandbox.py | 49 +++++++++++++------ backend/tests/unit/test_templating.py | 55 ++++++++++++++++++---- 3 files changed, 127 insertions(+), 39 deletions(-) diff --git a/backend/src/mimic/templating/filters.py b/backend/src/mimic/templating/filters.py index b7be750..0df2f53 100644 --- a/backend/src/mimic/templating/filters.py +++ b/backend/src/mimic/templating/filters.py @@ -1,14 +1,21 @@ """Custom Jinja2 filters. -`regex_extract(text, pattern, group=1, default="")` uses google-re2 for -linear-time matching to neutralize ReDoS on adversarial C2 output. If the -library isn't installed the implementation falls back to `re` with a hard -length cap. +`regex_extract(text, pattern, *, group=1, name=None)` per D-011: +- google-re2 engine (linear-time, no backrefs, ReDoS-safe). 
Falls back to the + stdlib `re` module when re2 is absent, with a 1 MB input cap. +- First match only. +- No match → raises a Jinja2 `TemplateError` (no silent default — cleanup + templates must fail loud when the source string drifts). +- Default capture is group 1, falling back to the full match when the pattern + has no groups. Named groups via `name=""`. """ from __future__ import annotations import re +from typing import Any + +from jinja2 import TemplateError try: # pragma: no cover - presence depends on environment import re2 as _re2 # type: ignore[import-not-found] @@ -23,14 +30,15 @@ _FALLBACK_MAX_INPUT = 1 * 1024 * 1024 # 1 MB safety cap when re2 missing def regex_extract( - text: object, + text: Any, pattern: str, + *, group: int = 1, - default: str = "", + name: str | None = None, ) -> str: - """Return capture group `group` of the first match of `pattern` in `text`.""" + """First-match capture; raise on no match (spec D-011).""" if text is None: - return default + raise TemplateError(f"regex_extract: cannot match against None for /{pattern}/") haystack = text if isinstance(text, str) else str(text) if _HAS_RE2: @@ -39,17 +47,37 @@ def regex_extract( else: if len(haystack) > _FALLBACK_MAX_INPUT: haystack = haystack[:_FALLBACK_MAX_INPUT] - compiled_py = re.compile(pattern) - match = compiled_py.search(haystack) + match = re.compile(pattern).search(haystack) if match is None: - return default + raise TemplateError(f"regex_extract: no match for /{pattern}/") + + if name is not None: + try: + captured = match.group(name) + except IndexError as exc: + raise TemplateError( + f"regex_extract: named group {name!r} not in /{pattern}/" + ) from exc + if captured is None: + raise TemplateError( + f"regex_extract: named group {name!r} captured nothing in /{pattern}/" + ) + return captured + try: captured = match.group(group) - except (IndexError, _IndexErrors): - return default - return captured if captured is not None else default + except IndexError: + if group == 
1: + return match.group(0) + raise TemplateError( + f"regex_extract: group {group} out of range for /{pattern}/" + ) from None - -# `re2.error` is `_re2.error`; `re.error` differs. Tuple them for safe catch. -_IndexErrors = (re.error,) + if captured is None: + if group == 1: + return match.group(0) + raise TemplateError( + f"regex_extract: group {group} captured nothing in /{pattern}/" + ) + return captured diff --git a/backend/src/mimic/templating/sandbox.py b/backend/src/mimic/templating/sandbox.py index 671bd17..5f65588 100644 --- a/backend/src/mimic/templating/sandbox.py +++ b/backend/src/mimic/templating/sandbox.py @@ -1,17 +1,22 @@ """Sandboxed Jinja2 environment used to resolve cleanup commands and payloads. -Spec H26 / D-005: two output accessors are exposed. +Spec H26 / D-005 / D-012: two output accessors are exposed to templates. - `{{ params. }}` — straight from the merged TTP/scenario parameters. - `{{ outputs.text }}` — `run_step.output_text` (stdout / UTF-8 text). -- `{{ outputs.blob("name") }}` — decoded `output_blob_ref` content, 10 MB cap, - UTF-8 with latin-1 fallback, silent empty string on non-decodable data. +- `{{ outputs.blob() }}` — decoded `output_blob_ref` content. Per D-012 the + blob lives in `MIMIC_BLOB_ROOT` as a content-addressed gzip-compressed file; + `StepOutputs` does the decompression and exposes a UTF-8 string with a + latin-1 fallback. Hard cap 10 MB **after decompression** (consistent with + F8 evidence limit). The custom `regex_extract` filter operates on the resulting string only. 
""" from __future__ import annotations +import gzip +import logging from collections.abc import Mapping from dataclasses import dataclass from pathlib import Path @@ -23,6 +28,8 @@ from jinja2.sandbox import SandboxedEnvironment from mimic.config import get_settings from mimic.templating.filters import regex_extract +log = logging.getLogger(__name__) + class RenderError(RuntimeError): """Raised when a cleanup / payload template cannot be rendered safely.""" @@ -37,26 +44,42 @@ class StepOutputs: blob_max_bytes: int = 10 * 1024 * 1024 def blob(self, _name: str = "default") -> str: - """Read the binary output blob, decoded (UTF-8 → latin-1 fallback). + """Read the CAS-gzipped output blob (D-012), decoded UTF-8 with + latin-1 fallback. Returns the empty string when the blob is missing + or undecodable (logged but never raises — templates that need a + present blob should assert via regex_extract instead). The argument is accepted for future multi-blob support but ignored in v1 — a step has at most one blob attachment. 
""" - if self.blob_path is None: - return "" - try: - raw = self.blob_path.read_bytes() - except OSError: + raw = self._read_raw() + if raw is None: return "" if len(raw) > self.blob_max_bytes: raw = raw[: self.blob_max_bytes] try: return raw.decode("utf-8") except UnicodeDecodeError: - try: - return raw.decode("latin-1") - except UnicodeDecodeError: # pragma: no cover - latin-1 never fails - return "" + pass + try: + return raw.decode("latin-1") + except UnicodeDecodeError: # pragma: no cover - latin-1 never fails + log.warning("blob undecodable even as latin-1: %s", self.blob_path) + return "" + + def _read_raw(self) -> bytes | None: + if self.blob_path is None: + return None + try: + with gzip.open(self.blob_path, "rb") as fh: + return fh.read(self.blob_max_bytes + 1) + except FileNotFoundError: + log.warning("blob not found: %s", self.blob_path) + except OSError as exc: + log.warning("blob unreadable %s: %s", self.blob_path, exc) + except gzip.BadGzipFile as exc: + log.warning("blob is not gzip %s: %s", self.blob_path, exc) + return None class CleanupRenderer: diff --git a/backend/tests/unit/test_templating.py b/backend/tests/unit/test_templating.py index d90cb66..df8cb53 100644 --- a/backend/tests/unit/test_templating.py +++ b/backend/tests/unit/test_templating.py @@ -2,7 +2,10 @@ from __future__ import annotations +import gzip + import pytest +from jinja2 import TemplateError from mimic.templating.filters import regex_extract from mimic.templating.sandbox import ( @@ -17,14 +20,23 @@ class TestRegexExtract: def test_returns_capture_group(self) -> None: assert regex_extract("hello world", r"hello (\w+)") == "world" - def test_default_when_no_match(self) -> None: - assert regex_extract("hello", r"foo(\d+)", default="N/A") == "N/A" + def test_no_match_raises(self) -> None: + with pytest.raises(TemplateError, match="no match"): + regex_extract("hello", r"foo(\d+)") - def test_none_input_returns_default(self) -> None: - assert regex_extract(None, r"x", 
default="empty") == "empty" + def test_none_input_raises(self) -> None: + with pytest.raises(TemplateError, match="None"): + regex_extract(None, r"x") - def test_supports_group_zero(self) -> None: - assert regex_extract("abc123", r"\w+\d+", group=0) == "abc123" + def test_no_groups_falls_back_to_full_match(self) -> None: + assert regex_extract("abc123", r"\w+\d+") == "abc123" + + def test_named_group(self) -> None: + assert regex_extract("pid=4242", r"pid=(?P<n>\d+)", name="n") == "4242" + + def test_missing_named_group_raises(self) -> None: + with pytest.raises(TemplateError): + regex_extract("pid=4242", r"pid=(\d+)", name="absent") class TestCleanupRenderer: @@ -52,6 +64,13 @@ class TestCleanupRenderer: ) assert out == "4242" + def test_regex_extract_no_match_propagates_as_render_error(self) -> None: + with pytest.raises(RenderError, match="no match"): + self.renderer.render( + r"{{ outputs.text | regex_extract('pid=(\\d+)') }}", + outputs=StepOutputs(text="nothing"), + ) + def test_strict_undefined_raises(self) -> None: with pytest.raises(RenderError): self.renderer.render("{{ params.does_not_exist }}", params={}) @@ -73,8 +92,26 @@ class TestStepOutputsBlob: out = StepOutputs(text="x") assert out.blob() == "" - def test_blob_caps_size(self, tmp_path) -> None: - blob = tmp_path / "evidence.bin" - blob.write_bytes(b"A" * 1024) + def test_blob_reads_gzipped_file(self, tmp_path) -> None: + blob = tmp_path / "blob.gz" + with gzip.open(blob, "wb") as fh: + fh.write(b"hello") + out = StepOutputs(blob_path=blob) + assert out.blob() == "hello" + + def test_blob_caps_size_after_decompression(self, tmp_path) -> None: + blob = tmp_path / "blob.gz" + with gzip.open(blob, "wb") as fh: + fh.write(b"A" * 1024) out = StepOutputs(blob_path=blob, blob_max_bytes=10) assert out.blob() == "A" * 10 + + def test_blob_missing_file_returns_empty(self, tmp_path) -> None: + out = StepOutputs(blob_path=tmp_path / "absent.gz") + assert out.blob() == "" + + def 
test_blob_non_gzip_returns_empty(self, tmp_path) -> None: + blob = tmp_path / "blob.gz" + blob.write_bytes(b"not actually gzip") + out = StepOutputs(blob_path=blob) + assert out.blob() == ""