feat(backend): add Jinja2 sandbox + regex_extract filter (B0.5)
- CleanupRenderer wraps jinja2.sandbox.SandboxedEnvironment with
StrictUndefined (no autoescape — shell context, not HTML).
- Custom filter regex_extract(text, pattern, group=1, default='') uses
google-re2 for linear-time matching (ReDoS-safe) and falls back to
re with a 1 MB input cap when re2 is absent.
- StepOutputs exposes {{ outputs.text }} and {{ outputs.blob('name') }}.
blob() decodes UTF-8 with latin-1 fallback, hard-capped at 10 MB
(consistent with F8 evidence limit, D-005).
- render_cleanup() is the module-level convenience wrapper.
This commit is contained in:
5
backend/src/mimic/templating/__init__.py
Normal file
5
backend/src/mimic/templating/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
"""Jinja2 sandboxed templating used for cleanup commands and payloads."""
|
||||||
|
|
||||||
|
from mimic.templating.sandbox import CleanupRenderer, RenderError, render_cleanup
|
||||||
|
|
||||||
|
__all__ = ["CleanupRenderer", "RenderError", "render_cleanup"]
|
||||||
55
backend/src/mimic/templating/filters.py
Normal file
55
backend/src/mimic/templating/filters.py
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
"""Custom Jinja2 filters.
|
||||||
|
|
||||||
|
`regex_extract(text, pattern, group=1, default="")` uses google-re2 for
|
||||||
|
linear-time matching to neutralize ReDoS on adversarial C2 output. If the
|
||||||
|
library isn't installed the implementation falls back to `re` with a hard
|
||||||
|
length cap.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import re
|
||||||
|
|
||||||
|
try:  # pragma: no cover - presence depends on environment
    import re2 as _re2  # type: ignore[import-not-found]

    _HAS_RE2 = True
except ImportError:  # pragma: no cover
    _re2 = None
    _HAS_RE2 = False


# Safety cap used only when re2 is missing. It is applied to `len(str)`, so the
# unit is characters (1 Mi), not encoded bytes.
_FALLBACK_MAX_INPUT = 1 * 1024 * 1024

# Regex-engine error types that are treated like a missing group when reading a
# capture. `re2.error` and `re.error` are distinct classes, so the re2 one is
# appended when that backend is active. Defined ahead of `regex_extract` so the
# function never depends on definition order.
_IndexErrors = (re.error,)
if _HAS_RE2 and isinstance(getattr(_re2, "error", None), type):  # pragma: no cover
    _IndexErrors = (re.error, _re2.error)


def _coerce_text(text: object) -> str:
    """Return `text` as a `str`, decoding bytes instead of taking their repr."""
    if isinstance(text, str):
        return text
    if isinstance(text, (bytes, bytearray)):
        # `str(b"abc")` yields "b'abc'", which would make anchors and escapes
        # match against the repr rather than the content.
        raw = bytes(text)
        try:
            return raw.decode("utf-8")
        except UnicodeDecodeError:
            return raw.decode("latin-1")  # latin-1 maps every byte, never fails
    return str(text)


def regex_extract(
    text: object,
    pattern: str,
    group: int = 1,
    default: str = "",
) -> str:
    """Return capture group `group` of the first match of `pattern` in `text`.

    Args:
        text: Value to search. `None` short-circuits to `default`; `str` is
            used as is; `bytes`/`bytearray` are decoded (UTF-8, then latin-1);
            anything else is converted with `str()`.
        pattern: Regular expression, compiled with re2 when it is installed
            and with the standard `re` module otherwise.
        group: Capture group to return (defaults to the first group).
        default: Value returned when there is no match, the group does not
            exist, or the group did not participate in the match.

    Returns:
        The captured text, or `default`.

    Raises:
        re.error: In the `re` fallback path, when `pattern` does not compile.
            With re2 active the backend's own error type is raised instead
            (presumably `re2.error` - confirm against the installed wrapper).
    """
    if text is None:
        return default
    haystack = _coerce_text(text)

    if _HAS_RE2:
        match = _re2.compile(pattern).search(haystack)
    else:
        # NOTE: the cap only bounds the input length. `re` is a backtracking
        # engine, so a pathological pattern can still be slow on capped input;
        # only the re2 path gives the linear-time guarantee.
        match = re.compile(pattern).search(haystack[:_FALLBACK_MAX_INPUT])

    if match is None:
        return default
    try:
        captured = match.group(group)
    except (IndexError, *_IndexErrors):
        # Unknown group index/name: behave as if nothing was captured.
        return default
    # An optional group that did not participate yields None.
    return captured if captured is not None else default
|
||||||
106
backend/src/mimic/templating/sandbox.py
Normal file
106
backend/src/mimic/templating/sandbox.py
Normal file
@@ -0,0 +1,106 @@
|
|||||||
|
"""Sandboxed Jinja2 environment used to resolve cleanup commands and payloads.
|
||||||
|
|
||||||
|
Spec H26 / D-005: templates see the merged parameters plus two output accessors.
|
||||||
|
|
||||||
|
- `{{ params.<key> }}` — straight from the merged TTP/scenario parameters.
|
||||||
|
- `{{ outputs.text }}` — `run_step.output_text` (stdout / UTF-8 text).
|
||||||
|
- `{{ outputs.blob("name") }}` — decoded `output_blob_ref` content, 10 MB cap,
  UTF-8 with latin-1 fallback; a missing or unreadable blob yields an empty string.
|
||||||
|
|
||||||
|
The custom `regex_extract` filter operates on the resulting string only.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from collections.abc import Mapping
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from jinja2 import StrictUndefined, TemplateError
|
||||||
|
from jinja2.sandbox import SandboxedEnvironment
|
||||||
|
|
||||||
|
from mimic.config import get_settings
|
||||||
|
from mimic.templating.filters import regex_extract
|
||||||
|
|
||||||
|
|
||||||
|
class RenderError(RuntimeError):
    """Signal that a cleanup or payload template could not be rendered safely."""
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True, slots=True)
|
||||||
|
class StepOutputs:
|
||||||
|
"""Read-only view of the previous step's outputs exposed to templates."""
|
||||||
|
|
||||||
|
text: str = ""
|
||||||
|
blob_path: Path | None = None
|
||||||
|
blob_max_bytes: int = 10 * 1024 * 1024
|
||||||
|
|
||||||
|
def blob(self, _name: str = "default") -> str:
|
||||||
|
"""Read the binary output blob, decoded (UTF-8 → latin-1 fallback).
|
||||||
|
|
||||||
|
The argument is accepted for future multi-blob support but ignored in
|
||||||
|
v1 — a step has at most one blob attachment.
|
||||||
|
"""
|
||||||
|
if self.blob_path is None:
|
||||||
|
return ""
|
||||||
|
try:
|
||||||
|
raw = self.blob_path.read_bytes()
|
||||||
|
except OSError:
|
||||||
|
return ""
|
||||||
|
if len(raw) > self.blob_max_bytes:
|
||||||
|
raw = raw[: self.blob_max_bytes]
|
||||||
|
try:
|
||||||
|
return raw.decode("utf-8")
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
try:
|
||||||
|
return raw.decode("latin-1")
|
||||||
|
except UnicodeDecodeError: # pragma: no cover - latin-1 never fails
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
class CleanupRenderer:
    """Render cleanup-command and payload templates in a Jinja2 sandbox."""

    def __init__(self) -> None:
        """Build the sandboxed environment and register `regex_extract`."""
        self._env = SandboxedEnvironment(
            undefined=StrictUndefined,
            autoescape=False,
            trim_blocks=False,
            lstrip_blocks=False,
            keep_trailing_newline=False,
        )
        self._env.filters["regex_extract"] = regex_extract

    def render(
        self,
        template_text: str,
        *,
        params: Mapping[str, Any] | None = None,
        outputs: StepOutputs | None = None,
    ) -> str:
        """Render `template_text` with `params` and `outputs` in scope.

        Args:
            template_text: Jinja2 template source.
            params: Values exposed as `params`; copied into a fresh dict, and
                an empty dict is used when omitted or empty.
            outputs: Object exposed as `outputs`; a default `StepOutputs()` is
                used when omitted.

        Returns:
            The rendered string.

        Raises:
            RenderError: Wraps any Jinja2 `TemplateError` raised while
                compiling or rendering. Other exception types propagate.
        """
        try:
            compiled = self._env.from_string(template_text)
            context = {
                "params": dict(params) if params else {},
                "outputs": outputs if outputs else StepOutputs(),
            }
            return compiled.render(**context)
        except TemplateError as exc:
            raise RenderError(str(exc)) from exc
|
||||||
|
|
||||||
|
|
||||||
|
# Shared renderer instance used by `render_cleanup`.
_RENDERER = CleanupRenderer()


def render_cleanup(
    template_text: str,
    *,
    params: Mapping[str, Any] | None = None,
    outputs: StepOutputs | None = None,
) -> str:
    """Render `template_text` through the module-level renderer.

    When `outputs` is omitted, an empty `StepOutputs` is created whose blob cap
    is taken from `get_settings().output_blob_max_bytes`.
    """
    effective_outputs = outputs
    if effective_outputs is None:
        blob_cap = get_settings().output_blob_max_bytes
        effective_outputs = StepOutputs(blob_max_bytes=blob_cap)
    return _RENDERER.render(
        template_text,
        params=params,
        outputs=effective_outputs,
    )
|
||||||
Reference in New Issue
Block a user