93 lines
2.8 KiB
Python
93 lines
2.8 KiB
Python
|
|
"""C2Connector factory + payload mapping tests."""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import pytest
|
||
|
|
|
||
|
|
from mimic.connectors import (
|
||
|
|
C2Connector,
|
||
|
|
ConnectorFactory,
|
||
|
|
Payload,
|
||
|
|
TaskHandle,
|
||
|
|
TaskResult,
|
||
|
|
TaskStatus,
|
||
|
|
UnsupportedPayloadType,
|
||
|
|
register_connector,
|
||
|
|
)
|
||
|
|
from mimic.connectors.factory import _REGISTRY
|
||
|
|
from mimic.connectors.payload_map import resolve_native, supports
|
||
|
|
from mimic.db.models.host import Host
|
||
|
|
from mimic.db.types import C2Type, PayloadType
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture(autouse=True)
|
||
|
|
def _clear_registry() -> None:
|
||
|
|
"""Each test starts with a clean connector registry."""
|
||
|
|
snapshot = dict(_REGISTRY)
|
||
|
|
_REGISTRY.clear()
|
||
|
|
yield
|
||
|
|
_REGISTRY.clear()
|
||
|
|
_REGISTRY.update(snapshot)
|
||
|
|
|
||
|
|
|
||
|
|
class _NullConnector(C2Connector):
|
||
|
|
def authenticate(self, config: dict[str, object]) -> None: ...
|
||
|
|
def list_hosts(self, engagement_id: str) -> list[Host]:
|
||
|
|
return []
|
||
|
|
|
||
|
|
def execute_task(self, host: Host, payload: Payload) -> TaskHandle:
|
||
|
|
return TaskHandle(
|
||
|
|
c2=self.name,
|
||
|
|
c2_task_id="t-1",
|
||
|
|
host_id="h-1",
|
||
|
|
payload_type=payload.payload_type,
|
||
|
|
)
|
||
|
|
|
||
|
|
def get_task_result(self, handle: TaskHandle) -> TaskResult:
|
||
|
|
return TaskResult(status=TaskStatus.COMPLETED, output_text="ok")
|
||
|
|
|
||
|
|
def cancel_task(self, handle: TaskHandle) -> None: ...
|
||
|
|
|
||
|
|
def execute_cleanup(
|
||
|
|
self, host: Host, resolved_command: str, params: dict[str, object]
|
||
|
|
) -> TaskResult:
|
||
|
|
return TaskResult(status=TaskStatus.COMPLETED)
|
||
|
|
|
||
|
|
|
||
|
|
def test_register_and_build() -> None:
|
||
|
|
register_connector(C2Type.MYTHIC)(_NullConnector)
|
||
|
|
factory = ConnectorFactory(config_resolver=lambda _: {})
|
||
|
|
connector = factory.build(C2Type.MYTHIC)
|
||
|
|
assert isinstance(connector, _NullConnector)
|
||
|
|
assert connector.name is C2Type.MYTHIC
|
||
|
|
|
||
|
|
|
||
|
|
def test_double_registration_rejected() -> None:
|
||
|
|
register_connector(C2Type.MYTHIC)(_NullConnector)
|
||
|
|
with pytest.raises(RuntimeError, match="already registered"):
|
||
|
|
register_connector(C2Type.MYTHIC)(_NullConnector)
|
||
|
|
|
||
|
|
|
||
|
|
def test_build_unknown_raises_not_implemented() -> None:
|
||
|
|
factory = ConnectorFactory(config_resolver=lambda _: {})
|
||
|
|
with pytest.raises(NotImplementedError):
|
||
|
|
factory.build(C2Type.HOME)
|
||
|
|
|
||
|
|
|
||
|
|
def test_task_status_is_terminal() -> None:
|
||
|
|
assert TaskStatus.COMPLETED.is_terminal
|
||
|
|
assert TaskStatus.FAILED.is_terminal
|
||
|
|
assert TaskStatus.CANCELED.is_terminal
|
||
|
|
assert not TaskStatus.RUNNING.is_terminal
|
||
|
|
|
||
|
|
|
||
|
|
def test_mythic_mapping_covers_powershell() -> None:
|
||
|
|
assert resolve_native(C2Type.MYTHIC, PayloadType.POWERSHELL) == "powershell"
|
||
|
|
assert supports(C2Type.MYTHIC, PayloadType.SHELLCODE)
|
||
|
|
|
||
|
|
|
||
|
|
def test_home_mapping_empty_until_pr2() -> None:
|
||
|
|
assert not supports(C2Type.HOME, PayloadType.CMD)
|
||
|
|
with pytest.raises(UnsupportedPayloadType):
|
||
|
|
resolve_native(C2Type.HOME, PayloadType.CMD)
|