Unit (SQLite, pure logic): - test_templating.py: Jinja2 sandbox, regex_extract, strict-undefined, sandbox blocks attribute-access escape, output blob 10 MB cap. - test_password.py: bcrypt hash + verify, empty / malformed handling. - test_soc_token.py: 256-bit url-safe token + bcrypt verification. - test_rbac_matrix.py: F11 invariants (lead ⊇ operator, SOC restricted to detection + report-read, audit_read & ttp_promote lead-only). - test_connector_factory.py: register / build / double-register-rejected, TaskStatus terminal helper, Mythic mapping vs empty Home mapping. - test_audit_hash.py: SHA-256 chain helper is deterministic and reacts to prev_hash / metadata changes. Integration scaffold (testcontainers Postgres): - tests/integration/conftest.py spins up postgres:16-alpine, monkeypatches MIMIC_DATABASE_URL, creates a Flask app + db.create_all. - test_healthz.py: end-to-end smoke through the Flask test client. 38 unit tests pass; ruff clean.
93 lines
2.8 KiB
Python
93 lines
2.8 KiB
Python
"""C2Connector factory + payload mapping tests."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
|
|
from mimic.connectors import (
|
|
C2Connector,
|
|
ConnectorFactory,
|
|
Payload,
|
|
TaskHandle,
|
|
TaskResult,
|
|
TaskStatus,
|
|
UnsupportedPayloadType,
|
|
register_connector,
|
|
)
|
|
from mimic.connectors.factory import _REGISTRY
|
|
from mimic.connectors.payload_map import resolve_native, supports
|
|
from mimic.db.models.host import Host
|
|
from mimic.db.types import C2Type, PayloadType
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _clear_registry() -> None:
|
|
"""Each test starts with a clean connector registry."""
|
|
snapshot = dict(_REGISTRY)
|
|
_REGISTRY.clear()
|
|
yield
|
|
_REGISTRY.clear()
|
|
_REGISTRY.update(snapshot)
|
|
|
|
|
|
class _NullConnector(C2Connector):
|
|
def authenticate(self, config: dict[str, object]) -> None: ...
|
|
def list_hosts(self, engagement_id: str) -> list[Host]:
|
|
return []
|
|
|
|
def execute_task(self, host: Host, payload: Payload) -> TaskHandle:
|
|
return TaskHandle(
|
|
c2=self.name,
|
|
c2_task_id="t-1",
|
|
host_id="h-1",
|
|
payload_type=payload.payload_type,
|
|
)
|
|
|
|
def get_task_result(self, handle: TaskHandle) -> TaskResult:
|
|
return TaskResult(status=TaskStatus.COMPLETED, output_text="ok")
|
|
|
|
def cancel_task(self, handle: TaskHandle) -> None: ...
|
|
|
|
def execute_cleanup(
|
|
self, host: Host, resolved_command: str, params: dict[str, object]
|
|
) -> TaskResult:
|
|
return TaskResult(status=TaskStatus.COMPLETED)
|
|
|
|
|
|
def test_register_and_build() -> None:
|
|
register_connector(C2Type.MYTHIC)(_NullConnector)
|
|
factory = ConnectorFactory(config_resolver=lambda _: {})
|
|
connector = factory.build(C2Type.MYTHIC)
|
|
assert isinstance(connector, _NullConnector)
|
|
assert connector.name is C2Type.MYTHIC
|
|
|
|
|
|
def test_double_registration_rejected() -> None:
|
|
register_connector(C2Type.MYTHIC)(_NullConnector)
|
|
with pytest.raises(RuntimeError, match="already registered"):
|
|
register_connector(C2Type.MYTHIC)(_NullConnector)
|
|
|
|
|
|
def test_build_unknown_raises_not_implemented() -> None:
|
|
factory = ConnectorFactory(config_resolver=lambda _: {})
|
|
with pytest.raises(NotImplementedError):
|
|
factory.build(C2Type.HOME)
|
|
|
|
|
|
def test_task_status_is_terminal() -> None:
|
|
assert TaskStatus.COMPLETED.is_terminal
|
|
assert TaskStatus.FAILED.is_terminal
|
|
assert TaskStatus.CANCELED.is_terminal
|
|
assert not TaskStatus.RUNNING.is_terminal
|
|
|
|
|
|
def test_mythic_mapping_covers_powershell() -> None:
|
|
assert resolve_native(C2Type.MYTHIC, PayloadType.POWERSHELL) == "powershell"
|
|
assert supports(C2Type.MYTHIC, PayloadType.SHELLCODE)
|
|
|
|
|
|
def test_home_mapping_empty_until_pr2() -> None:
|
|
assert not supports(C2Type.HOME, PayloadType.CMD)
|
|
with pytest.raises(UnsupportedPayloadType):
|
|
resolve_native(C2Type.HOME, PayloadType.CMD)
|