test(backend): add pytest baseline (B0.8)
Unit (SQLite, pure logic): - test_templating.py: Jinja2 sandbox, regex_extract, strict-undefined, sandbox blocks attribute-access escape, output blob 10 MB cap. - test_password.py: bcrypt hash + verify, empty / malformed handling. - test_soc_token.py: 256-bit url-safe token + bcrypt verification. - test_rbac_matrix.py: F11 invariants (lead ⊇ operator, SOC restricted to detection + report-read, audit_read & ttp_promote lead-only). - test_connector_factory.py: register / build / double-register-rejected, TaskStatus terminal helper, Mythic mapping vs empty Home mapping. - test_audit_hash.py: SHA-256 chain helper is deterministic and reacts to prev_hash / metadata changes. Integration scaffold (testcontainers Postgres): - tests/integration/conftest.py spins up postgres:16-alpine, monkeypatches MIMIC_DATABASE_URL, creates a Flask app + db.create_all. - test_healthz.py: end-to-end smoke through the Flask test client. 38 unit tests pass; ruff clean.
This commit is contained in:
0
backend/tests/unit/__init__.py
Normal file
0
backend/tests/unit/__init__.py
Normal file
64
backend/tests/unit/test_audit_hash.py
Normal file
64
backend/tests/unit/test_audit_hash.py
Normal file
@@ -0,0 +1,64 @@
|
||||
"""Audit log hash-chain helper tests (pure function, no DB)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import UTC, datetime
|
||||
from uuid import uuid4
|
||||
|
||||
from mimic.audit.log import audit_hash
|
||||
|
||||
|
||||
def test_hash_changes_with_metadata() -> None:
|
||||
ts = datetime(2026, 5, 21, 12, 0, tzinfo=UTC)
|
||||
actor = uuid4()
|
||||
base = {
|
||||
"prev_hash": None,
|
||||
"ts": ts,
|
||||
"actor_id": actor,
|
||||
"action": "ttp.create",
|
||||
"resource_type": "ttp",
|
||||
"resource_id": "t-1",
|
||||
"metadata": {"name": "whoami"},
|
||||
}
|
||||
h1 = audit_hash(**base)
|
||||
h2 = audit_hash(**{**base, "metadata": {"name": "whoami2"}})
|
||||
assert h1 != h2
|
||||
assert len(h1) == 64
|
||||
assert all(c in "0123456789abcdef" for c in h1)
|
||||
|
||||
|
||||
def test_hash_changes_with_prev_hash() -> None:
|
||||
ts = datetime(2026, 5, 21, 12, 0, tzinfo=UTC)
|
||||
h_no_prev = audit_hash(
|
||||
prev_hash=None,
|
||||
ts=ts,
|
||||
actor_id=None,
|
||||
action="x",
|
||||
resource_type="r",
|
||||
resource_id="1",
|
||||
metadata={},
|
||||
)
|
||||
h_with_prev = audit_hash(
|
||||
prev_hash="abc",
|
||||
ts=ts,
|
||||
actor_id=None,
|
||||
action="x",
|
||||
resource_type="r",
|
||||
resource_id="1",
|
||||
metadata={},
|
||||
)
|
||||
assert h_no_prev != h_with_prev
|
||||
|
||||
|
||||
def test_hash_stable_for_same_input() -> None:
|
||||
ts = datetime(2026, 5, 21, 12, 0, tzinfo=UTC)
|
||||
base = {
|
||||
"prev_hash": "aa",
|
||||
"ts": ts,
|
||||
"actor_id": None,
|
||||
"action": "x",
|
||||
"resource_type": "r",
|
||||
"resource_id": "1",
|
||||
"metadata": {"k": "v"},
|
||||
}
|
||||
assert audit_hash(**base) == audit_hash(**base)
|
||||
92
backend/tests/unit/test_connector_factory.py
Normal file
92
backend/tests/unit/test_connector_factory.py
Normal file
@@ -0,0 +1,92 @@
|
||||
"""C2Connector factory + payload mapping tests."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from mimic.connectors import (
|
||||
C2Connector,
|
||||
ConnectorFactory,
|
||||
Payload,
|
||||
TaskHandle,
|
||||
TaskResult,
|
||||
TaskStatus,
|
||||
UnsupportedPayloadType,
|
||||
register_connector,
|
||||
)
|
||||
from mimic.connectors.factory import _REGISTRY
|
||||
from mimic.connectors.payload_map import resolve_native, supports
|
||||
from mimic.db.models.host import Host
|
||||
from mimic.db.types import C2Type, PayloadType
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _clear_registry() -> None:
|
||||
"""Each test starts with a clean connector registry."""
|
||||
snapshot = dict(_REGISTRY)
|
||||
_REGISTRY.clear()
|
||||
yield
|
||||
_REGISTRY.clear()
|
||||
_REGISTRY.update(snapshot)
|
||||
|
||||
|
||||
class _NullConnector(C2Connector):
|
||||
def authenticate(self, config: dict[str, object]) -> None: ...
|
||||
def list_hosts(self, engagement_id: str) -> list[Host]:
|
||||
return []
|
||||
|
||||
def execute_task(self, host: Host, payload: Payload) -> TaskHandle:
|
||||
return TaskHandle(
|
||||
c2=self.name,
|
||||
c2_task_id="t-1",
|
||||
host_id="h-1",
|
||||
payload_type=payload.payload_type,
|
||||
)
|
||||
|
||||
def get_task_result(self, handle: TaskHandle) -> TaskResult:
|
||||
return TaskResult(status=TaskStatus.COMPLETED, output_text="ok")
|
||||
|
||||
def cancel_task(self, handle: TaskHandle) -> None: ...
|
||||
|
||||
def execute_cleanup(
|
||||
self, host: Host, resolved_command: str, params: dict[str, object]
|
||||
) -> TaskResult:
|
||||
return TaskResult(status=TaskStatus.COMPLETED)
|
||||
|
||||
|
||||
def test_register_and_build() -> None:
|
||||
register_connector(C2Type.MYTHIC)(_NullConnector)
|
||||
factory = ConnectorFactory(config_resolver=lambda _: {})
|
||||
connector = factory.build(C2Type.MYTHIC)
|
||||
assert isinstance(connector, _NullConnector)
|
||||
assert connector.name is C2Type.MYTHIC
|
||||
|
||||
|
||||
def test_double_registration_rejected() -> None:
|
||||
register_connector(C2Type.MYTHIC)(_NullConnector)
|
||||
with pytest.raises(RuntimeError, match="already registered"):
|
||||
register_connector(C2Type.MYTHIC)(_NullConnector)
|
||||
|
||||
|
||||
def test_build_unknown_raises_not_implemented() -> None:
|
||||
factory = ConnectorFactory(config_resolver=lambda _: {})
|
||||
with pytest.raises(NotImplementedError):
|
||||
factory.build(C2Type.HOME)
|
||||
|
||||
|
||||
def test_task_status_is_terminal() -> None:
|
||||
assert TaskStatus.COMPLETED.is_terminal
|
||||
assert TaskStatus.FAILED.is_terminal
|
||||
assert TaskStatus.CANCELED.is_terminal
|
||||
assert not TaskStatus.RUNNING.is_terminal
|
||||
|
||||
|
||||
def test_mythic_mapping_covers_powershell() -> None:
|
||||
assert resolve_native(C2Type.MYTHIC, PayloadType.POWERSHELL) == "powershell"
|
||||
assert supports(C2Type.MYTHIC, PayloadType.SHELLCODE)
|
||||
|
||||
|
||||
def test_home_mapping_empty_until_pr2() -> None:
|
||||
assert not supports(C2Type.HOME, PayloadType.CMD)
|
||||
with pytest.raises(UnsupportedPayloadType):
|
||||
resolve_native(C2Type.HOME, PayloadType.CMD)
|
||||
31
backend/tests/unit/test_password.py
Normal file
31
backend/tests/unit/test_password.py
Normal file
@@ -0,0 +1,31 @@
|
||||
"""Local-auth bcrypt helpers."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from mimic.auth.password import check_password, hash_password
|
||||
|
||||
|
||||
def test_hash_then_check_succeeds() -> None:
|
||||
hashed = hash_password("Sup3rSecret!", rounds=4)
|
||||
assert check_password("Sup3rSecret!", hashed) is True
|
||||
|
||||
|
||||
def test_check_rejects_wrong_password() -> None:
|
||||
hashed = hash_password("right", rounds=4)
|
||||
assert check_password("wrong", hashed) is False
|
||||
|
||||
|
||||
def test_empty_password_raises() -> None:
|
||||
with pytest.raises(ValueError, match="must not be empty"):
|
||||
hash_password("")
|
||||
|
||||
|
||||
def test_check_missing_hash_returns_false() -> None:
|
||||
assert check_password("anything", None) is False
|
||||
assert check_password("anything", "") is False
|
||||
|
||||
|
||||
def test_check_invalid_hash_returns_false() -> None:
|
||||
assert check_password("anything", "not-a-bcrypt-hash") is False
|
||||
55
backend/tests/unit/test_rbac_matrix.py
Normal file
55
backend/tests/unit/test_rbac_matrix.py
Normal file
@@ -0,0 +1,55 @@
|
||||
"""F11 matrix coverage tests."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from mimic.rbac.matrix import GROUP_PERMISSIONS, GroupName, Permission
|
||||
|
||||
|
||||
def test_every_group_has_at_least_one_permission() -> None:
|
||||
for group, perms in GROUP_PERMISSIONS.items():
|
||||
assert perms, f"group {group.value} has no permissions"
|
||||
|
||||
|
||||
def test_rt_lead_is_superset_of_operator() -> None:
|
||||
lead = GROUP_PERMISSIONS[GroupName.RT_LEAD]
|
||||
operator = GROUP_PERMISSIONS[GroupName.RT_OPERATOR]
|
||||
assert operator <= lead
|
||||
|
||||
|
||||
def test_soc_cannot_start_runs() -> None:
|
||||
soc = GROUP_PERMISSIONS[GroupName.SOC_ANALYST]
|
||||
assert Permission.RUN_START not in soc
|
||||
assert Permission.RUN_CONTROL not in soc
|
||||
|
||||
|
||||
def test_only_lead_promotes_ttp() -> None:
|
||||
operator = GROUP_PERMISSIONS[GroupName.RT_OPERATOR]
|
||||
soc = GROUP_PERMISSIONS[GroupName.SOC_ANALYST]
|
||||
assert Permission.TTP_PROMOTE not in operator
|
||||
assert Permission.TTP_PROMOTE not in soc
|
||||
assert Permission.TTP_PROMOTE in GROUP_PERMISSIONS[GroupName.RT_LEAD]
|
||||
|
||||
|
||||
def test_audit_read_lead_only() -> None:
|
||||
for group in (GroupName.RT_OPERATOR, GroupName.SOC_ANALYST):
|
||||
assert Permission.AUDIT_READ not in GROUP_PERMISSIONS[group]
|
||||
assert Permission.AUDIT_READ in GROUP_PERMISSIONS[GroupName.RT_LEAD]
|
||||
|
||||
|
||||
def test_only_lead_issues_soc_tokens() -> None:
|
||||
for group in (GroupName.RT_OPERATOR, GroupName.SOC_ANALYST):
|
||||
assert Permission.ENGAGEMENT_SOC_TOKEN_ISSUE not in GROUP_PERMISSIONS[group]
|
||||
|
||||
|
||||
def test_operator_cannot_control_run() -> None:
|
||||
operator = GROUP_PERMISSIONS[GroupName.RT_OPERATOR]
|
||||
assert Permission.RUN_START not in operator
|
||||
assert Permission.RUN_CONTROL not in operator
|
||||
|
||||
|
||||
def test_soc_can_only_read_report_and_add_detection() -> None:
|
||||
soc = GROUP_PERMISSIONS[GroupName.SOC_ANALYST]
|
||||
assert Permission.DETECTION_ADD in soc
|
||||
assert Permission.REPORT_READ in soc
|
||||
assert Permission.EVIDENCE_ADD not in soc
|
||||
assert Permission.HOST_CRUD not in soc
|
||||
27
backend/tests/unit/test_soc_token.py
Normal file
27
backend/tests/unit/test_soc_token.py
Normal file
@@ -0,0 +1,27 @@
|
||||
"""SOC opaque token generation / verification."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from mimic.auth.soc_token import generate_token, verify_token
|
||||
|
||||
|
||||
def test_generated_token_verifies() -> None:
|
||||
material = generate_token(rounds=4)
|
||||
assert verify_token(material.plain, material.hashed) is True
|
||||
|
||||
|
||||
def test_different_plain_does_not_verify() -> None:
|
||||
material = generate_token(rounds=4)
|
||||
assert verify_token("wrong-token", material.hashed) is False
|
||||
|
||||
|
||||
def test_plain_is_url_safe_and_long() -> None:
|
||||
material = generate_token(rounds=4)
|
||||
# 32 random bytes → ~43 url-safe base64 chars.
|
||||
assert len(material.plain) >= 32
|
||||
assert all(c.isalnum() or c in "-_" for c in material.plain)
|
||||
|
||||
|
||||
def test_verify_with_empty_values() -> None:
|
||||
assert verify_token("", "$2b$04$abc") is False
|
||||
assert verify_token("token", "") is False
|
||||
80
backend/tests/unit/test_templating.py
Normal file
80
backend/tests/unit/test_templating.py
Normal file
@@ -0,0 +1,80 @@
|
||||
"""Jinja2 sandbox + regex_extract tests."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from mimic.templating.filters import regex_extract
|
||||
from mimic.templating.sandbox import (
|
||||
CleanupRenderer,
|
||||
RenderError,
|
||||
StepOutputs,
|
||||
render_cleanup,
|
||||
)
|
||||
|
||||
|
||||
class TestRegexExtract:
|
||||
def test_returns_capture_group(self) -> None:
|
||||
assert regex_extract("hello world", r"hello (\w+)") == "world"
|
||||
|
||||
def test_default_when_no_match(self) -> None:
|
||||
assert regex_extract("hello", r"foo(\d+)", default="N/A") == "N/A"
|
||||
|
||||
def test_none_input_returns_default(self) -> None:
|
||||
assert regex_extract(None, r"x", default="empty") == "empty"
|
||||
|
||||
def test_supports_group_zero(self) -> None:
|
||||
assert regex_extract("abc123", r"\w+\d+", group=0) == "abc123"
|
||||
|
||||
|
||||
class TestCleanupRenderer:
|
||||
def setup_method(self) -> None:
|
||||
self.renderer = CleanupRenderer()
|
||||
|
||||
def test_render_params(self) -> None:
|
||||
out = self.renderer.render(
|
||||
"echo {{ params.target }}",
|
||||
params={"target": "WIN-01"},
|
||||
)
|
||||
assert out == "echo WIN-01"
|
||||
|
||||
def test_render_outputs_text(self) -> None:
|
||||
out = self.renderer.render(
|
||||
'echo "{{ outputs.text }}"',
|
||||
outputs=StepOutputs(text="captured"),
|
||||
)
|
||||
assert out == 'echo "captured"'
|
||||
|
||||
def test_regex_extract_filter(self) -> None:
|
||||
out = self.renderer.render(
|
||||
r"{{ outputs.text | regex_extract('pid=(\\d+)') }}",
|
||||
outputs=StepOutputs(text="status: pid=4242 user=svc"),
|
||||
)
|
||||
assert out == "4242"
|
||||
|
||||
def test_strict_undefined_raises(self) -> None:
|
||||
with pytest.raises(RenderError):
|
||||
self.renderer.render("{{ params.does_not_exist }}", params={})
|
||||
|
||||
def test_sandbox_forbids_attribute_access(self) -> None:
|
||||
with pytest.raises(RenderError):
|
||||
self.renderer.render(
|
||||
"{{ ().__class__.__bases__[0].__subclasses__() }}",
|
||||
params={},
|
||||
)
|
||||
|
||||
def test_module_singleton_round_trip(self) -> None:
|
||||
out = render_cleanup("hello {{ params.x }}", params={"x": "there"})
|
||||
assert out == "hello there"
|
||||
|
||||
|
||||
class TestStepOutputsBlob:
|
||||
def test_blob_returns_empty_when_no_path(self) -> None:
|
||||
out = StepOutputs(text="x")
|
||||
assert out.blob() == ""
|
||||
|
||||
def test_blob_caps_size(self, tmp_path) -> None:
|
||||
blob = tmp_path / "evidence.bin"
|
||||
blob.write_bytes(b"A" * 1024)
|
||||
out = StepOutputs(blob_path=blob, blob_max_bytes=10)
|
||||
assert out.blob() == "A" * 10
|
||||
Reference in New Issue
Block a user