test(backend): add pytest baseline (B0.8)

Unit (SQLite, pure logic):
- test_templating.py: Jinja2 sandbox, regex_extract, strict-undefined,
  sandbox blocks attribute-access escape, output blob 10 MB cap.
- test_password.py: bcrypt hash + verify, empty / malformed handling.
- test_soc_token.py: 256-bit url-safe token + bcrypt verification.
- test_rbac_matrix.py: F11 invariants (lead ⊇ operator, SOC restricted
  to detection + report-read, audit_read & ttp_promote lead-only).
- test_connector_factory.py: register / build / double-register-rejected,
  TaskStatus terminal helper, Mythic mapping vs empty Home mapping.
- test_audit_hash.py: SHA-256 chain helper is deterministic and reacts
  to prev_hash / metadata changes.

Integration scaffold (testcontainers Postgres):
- tests/integration/conftest.py spins up postgres:16-alpine, monkeypatches
  MIMIC_DATABASE_URL, creates a Flask app + db.create_all.
- test_healthz.py: end-to-end smoke through the Flask test client.

38 unit tests pass; ruff clean.
This commit is contained in:
knacky
2026-05-21 20:34:11 +02:00
parent a6b7502dfa
commit ec52208233
12 changed files with 436 additions and 0 deletions

View File

View File

@@ -0,0 +1,64 @@
"""Audit log hash-chain helper tests (pure function, no DB)."""
from __future__ import annotations
from datetime import UTC, datetime
from uuid import uuid4
from mimic.audit.log import audit_hash
def test_hash_changes_with_metadata() -> None:
ts = datetime(2026, 5, 21, 12, 0, tzinfo=UTC)
actor = uuid4()
base = {
"prev_hash": None,
"ts": ts,
"actor_id": actor,
"action": "ttp.create",
"resource_type": "ttp",
"resource_id": "t-1",
"metadata": {"name": "whoami"},
}
h1 = audit_hash(**base)
h2 = audit_hash(**{**base, "metadata": {"name": "whoami2"}})
assert h1 != h2
assert len(h1) == 64
assert all(c in "0123456789abcdef" for c in h1)
def test_hash_changes_with_prev_hash() -> None:
ts = datetime(2026, 5, 21, 12, 0, tzinfo=UTC)
h_no_prev = audit_hash(
prev_hash=None,
ts=ts,
actor_id=None,
action="x",
resource_type="r",
resource_id="1",
metadata={},
)
h_with_prev = audit_hash(
prev_hash="abc",
ts=ts,
actor_id=None,
action="x",
resource_type="r",
resource_id="1",
metadata={},
)
assert h_no_prev != h_with_prev
def test_hash_stable_for_same_input() -> None:
ts = datetime(2026, 5, 21, 12, 0, tzinfo=UTC)
base = {
"prev_hash": "aa",
"ts": ts,
"actor_id": None,
"action": "x",
"resource_type": "r",
"resource_id": "1",
"metadata": {"k": "v"},
}
assert audit_hash(**base) == audit_hash(**base)

View File

@@ -0,0 +1,92 @@
"""C2Connector factory + payload mapping tests."""
from __future__ import annotations
import pytest
from mimic.connectors import (
C2Connector,
ConnectorFactory,
Payload,
TaskHandle,
TaskResult,
TaskStatus,
UnsupportedPayloadType,
register_connector,
)
from mimic.connectors.factory import _REGISTRY
from mimic.connectors.payload_map import resolve_native, supports
from mimic.db.models.host import Host
from mimic.db.types import C2Type, PayloadType
@pytest.fixture(autouse=True)
def _clear_registry() -> None:
"""Each test starts with a clean connector registry."""
snapshot = dict(_REGISTRY)
_REGISTRY.clear()
yield
_REGISTRY.clear()
_REGISTRY.update(snapshot)
class _NullConnector(C2Connector):
def authenticate(self, config: dict[str, object]) -> None: ...
def list_hosts(self, engagement_id: str) -> list[Host]:
return []
def execute_task(self, host: Host, payload: Payload) -> TaskHandle:
return TaskHandle(
c2=self.name,
c2_task_id="t-1",
host_id="h-1",
payload_type=payload.payload_type,
)
def get_task_result(self, handle: TaskHandle) -> TaskResult:
return TaskResult(status=TaskStatus.COMPLETED, output_text="ok")
def cancel_task(self, handle: TaskHandle) -> None: ...
def execute_cleanup(
self, host: Host, resolved_command: str, params: dict[str, object]
) -> TaskResult:
return TaskResult(status=TaskStatus.COMPLETED)
def test_register_and_build() -> None:
register_connector(C2Type.MYTHIC)(_NullConnector)
factory = ConnectorFactory(config_resolver=lambda _: {})
connector = factory.build(C2Type.MYTHIC)
assert isinstance(connector, _NullConnector)
assert connector.name is C2Type.MYTHIC
def test_double_registration_rejected() -> None:
register_connector(C2Type.MYTHIC)(_NullConnector)
with pytest.raises(RuntimeError, match="already registered"):
register_connector(C2Type.MYTHIC)(_NullConnector)
def test_build_unknown_raises_not_implemented() -> None:
factory = ConnectorFactory(config_resolver=lambda _: {})
with pytest.raises(NotImplementedError):
factory.build(C2Type.HOME)
def test_task_status_is_terminal() -> None:
assert TaskStatus.COMPLETED.is_terminal
assert TaskStatus.FAILED.is_terminal
assert TaskStatus.CANCELED.is_terminal
assert not TaskStatus.RUNNING.is_terminal
def test_mythic_mapping_covers_powershell() -> None:
assert resolve_native(C2Type.MYTHIC, PayloadType.POWERSHELL) == "powershell"
assert supports(C2Type.MYTHIC, PayloadType.SHELLCODE)
def test_home_mapping_empty_until_pr2() -> None:
assert not supports(C2Type.HOME, PayloadType.CMD)
with pytest.raises(UnsupportedPayloadType):
resolve_native(C2Type.HOME, PayloadType.CMD)

View File

@@ -0,0 +1,31 @@
"""Local-auth bcrypt helpers."""
from __future__ import annotations
import pytest
from mimic.auth.password import check_password, hash_password
def test_hash_then_check_succeeds() -> None:
hashed = hash_password("Sup3rSecret!", rounds=4)
assert check_password("Sup3rSecret!", hashed) is True
def test_check_rejects_wrong_password() -> None:
hashed = hash_password("right", rounds=4)
assert check_password("wrong", hashed) is False
def test_empty_password_raises() -> None:
with pytest.raises(ValueError, match="must not be empty"):
hash_password("")
def test_check_missing_hash_returns_false() -> None:
assert check_password("anything", None) is False
assert check_password("anything", "") is False
def test_check_invalid_hash_returns_false() -> None:
assert check_password("anything", "not-a-bcrypt-hash") is False

View File

@@ -0,0 +1,55 @@
"""F11 matrix coverage tests."""
from __future__ import annotations
from mimic.rbac.matrix import GROUP_PERMISSIONS, GroupName, Permission
def test_every_group_has_at_least_one_permission() -> None:
for group, perms in GROUP_PERMISSIONS.items():
assert perms, f"group {group.value} has no permissions"
def test_rt_lead_is_superset_of_operator() -> None:
lead = GROUP_PERMISSIONS[GroupName.RT_LEAD]
operator = GROUP_PERMISSIONS[GroupName.RT_OPERATOR]
assert operator <= lead
def test_soc_cannot_start_runs() -> None:
soc = GROUP_PERMISSIONS[GroupName.SOC_ANALYST]
assert Permission.RUN_START not in soc
assert Permission.RUN_CONTROL not in soc
def test_only_lead_promotes_ttp() -> None:
operator = GROUP_PERMISSIONS[GroupName.RT_OPERATOR]
soc = GROUP_PERMISSIONS[GroupName.SOC_ANALYST]
assert Permission.TTP_PROMOTE not in operator
assert Permission.TTP_PROMOTE not in soc
assert Permission.TTP_PROMOTE in GROUP_PERMISSIONS[GroupName.RT_LEAD]
def test_audit_read_lead_only() -> None:
for group in (GroupName.RT_OPERATOR, GroupName.SOC_ANALYST):
assert Permission.AUDIT_READ not in GROUP_PERMISSIONS[group]
assert Permission.AUDIT_READ in GROUP_PERMISSIONS[GroupName.RT_LEAD]
def test_only_lead_issues_soc_tokens() -> None:
for group in (GroupName.RT_OPERATOR, GroupName.SOC_ANALYST):
assert Permission.ENGAGEMENT_SOC_TOKEN_ISSUE not in GROUP_PERMISSIONS[group]
def test_operator_cannot_control_run() -> None:
operator = GROUP_PERMISSIONS[GroupName.RT_OPERATOR]
assert Permission.RUN_START not in operator
assert Permission.RUN_CONTROL not in operator
def test_soc_can_only_read_report_and_add_detection() -> None:
soc = GROUP_PERMISSIONS[GroupName.SOC_ANALYST]
assert Permission.DETECTION_ADD in soc
assert Permission.REPORT_READ in soc
assert Permission.EVIDENCE_ADD not in soc
assert Permission.HOST_CRUD not in soc

View File

@@ -0,0 +1,27 @@
"""SOC opaque token generation / verification."""
from __future__ import annotations
from mimic.auth.soc_token import generate_token, verify_token
def test_generated_token_verifies() -> None:
material = generate_token(rounds=4)
assert verify_token(material.plain, material.hashed) is True
def test_different_plain_does_not_verify() -> None:
material = generate_token(rounds=4)
assert verify_token("wrong-token", material.hashed) is False
def test_plain_is_url_safe_and_long() -> None:
material = generate_token(rounds=4)
# 32 random bytes → ~43 url-safe base64 chars.
assert len(material.plain) >= 32
assert all(c.isalnum() or c in "-_" for c in material.plain)
def test_verify_with_empty_values() -> None:
assert verify_token("", "$2b$04$abc") is False
assert verify_token("token", "") is False

View File

@@ -0,0 +1,80 @@
"""Jinja2 sandbox + regex_extract tests."""
from __future__ import annotations
import pytest
from mimic.templating.filters import regex_extract
from mimic.templating.sandbox import (
CleanupRenderer,
RenderError,
StepOutputs,
render_cleanup,
)
class TestRegexExtract:
def test_returns_capture_group(self) -> None:
assert regex_extract("hello world", r"hello (\w+)") == "world"
def test_default_when_no_match(self) -> None:
assert regex_extract("hello", r"foo(\d+)", default="N/A") == "N/A"
def test_none_input_returns_default(self) -> None:
assert regex_extract(None, r"x", default="empty") == "empty"
def test_supports_group_zero(self) -> None:
assert regex_extract("abc123", r"\w+\d+", group=0) == "abc123"
class TestCleanupRenderer:
def setup_method(self) -> None:
self.renderer = CleanupRenderer()
def test_render_params(self) -> None:
out = self.renderer.render(
"echo {{ params.target }}",
params={"target": "WIN-01"},
)
assert out == "echo WIN-01"
def test_render_outputs_text(self) -> None:
out = self.renderer.render(
'echo "{{ outputs.text }}"',
outputs=StepOutputs(text="captured"),
)
assert out == 'echo "captured"'
def test_regex_extract_filter(self) -> None:
out = self.renderer.render(
r"{{ outputs.text | regex_extract('pid=(\\d+)') }}",
outputs=StepOutputs(text="status: pid=4242 user=svc"),
)
assert out == "4242"
def test_strict_undefined_raises(self) -> None:
with pytest.raises(RenderError):
self.renderer.render("{{ params.does_not_exist }}", params={})
def test_sandbox_forbids_attribute_access(self) -> None:
with pytest.raises(RenderError):
self.renderer.render(
"{{ ().__class__.__bases__[0].__subclasses__() }}",
params={},
)
def test_module_singleton_round_trip(self) -> None:
out = render_cleanup("hello {{ params.x }}", params={"x": "there"})
assert out == "hello there"
class TestStepOutputsBlob:
def test_blob_returns_empty_when_no_path(self) -> None:
out = StepOutputs(text="x")
assert out.blob() == ""
def test_blob_caps_size(self, tmp_path) -> None:
blob = tmp_path / "evidence.bin"
blob.write_bytes(b"A" * 1024)
out = StepOutputs(blob_path=blob, blob_max_bytes=10)
assert out.blob() == "A" * 10