feat(backend): add C2Connector ABC + payload mapping + factory (B0.4)

- abstract C2Connector with authenticate / list_hosts / execute_task /
  get_task_result / cancel_task / execute_cleanup; stream_task_output
  optional v1 (NotImplementedError).
- Payload / TaskHandle / TaskResult / TaskStatus frozen dataclasses.
- UnsupportedPayloadType raised when no native command maps to the
  chosen (c2_type, payload_type) pair.
- Mythic payload_type → native command map populated (spec §7 table).
- HOME map left empty until PR2 is closed.
- ConnectorFactory: register_connector decorator + build(c2_type) that
  instantiates + authenticates via an injected config resolver.

No real Mythic / Home implementations land in this sprint.
This commit is contained in:
knacky
2026-05-21 20:33:06 +02:00
parent 22d37fb240
commit 20112d61ff
4 changed files with 251 additions and 0 deletions

View File

@@ -0,0 +1,27 @@
"""C2Connector abstraction.
Sprint 0 ships the interface + dataclasses + payload mapping + factory.
Concrete `MythicConnector` and `HomeConnector` implementations land after
PR1 and PR2 respectively.
"""
from mimic.connectors.base import (
C2Connector,
Payload,
TaskHandle,
TaskResult,
TaskStatus,
UnsupportedPayloadType,
)
from mimic.connectors.factory import ConnectorFactory, register_connector
__all__ = [
"C2Connector",
"ConnectorFactory",
"Payload",
"TaskHandle",
"TaskResult",
"TaskStatus",
"UnsupportedPayloadType",
"register_connector",
]

View File

@@ -0,0 +1,122 @@
"""Abstract C2 connector interface (spec §7).
The orchestrator calls `execute_task` → polls `get_task_result` every 500 ms
until a terminal `TaskStatus`. `stream_task_output` is optional in v1.
`cancel_task` backs F6 Abort. `execute_cleanup` runs the resolved Jinja2
template against the C2 (spec F15).
"""
from __future__ import annotations
import enum
from abc import ABC, abstractmethod
from collections.abc import Iterator
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from mimic.db.types import C2Type, PayloadType
if TYPE_CHECKING:
from mimic.db.models.host import Host
class UnsupportedPayloadType(RuntimeError): # noqa: N818 (kept for spec wording)
"""Raised when the chosen C2 has no native command for a payload kind.
The name mirrors the exact identifier from spec §7 (`UnsupportedPayloadType`).
The trailing `Error` suffix is intentionally omitted to keep the spec link
one-to-one.
"""
def __init__(self, c2: C2Type, payload_type: PayloadType) -> None:
super().__init__(f"{c2.value} does not support payload_type={payload_type.value}")
self.c2 = c2
self.payload_type = payload_type
class TaskStatus(enum.StrEnum):
"""Terminal and non-terminal task lifecycle states."""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELED = "canceled"
@property
def is_terminal(self) -> bool:
return self in {TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELED}
@dataclass(frozen=True, slots=True)
class Payload:
"""Self-contained payload sent to a C2."""
payload_type: PayloadType
template_text: str
params: dict[str, object] = field(default_factory=dict)
mimic_run_id: str | None = None
is_stealth_variant: bool = False
# When True the orchestrator MUST strip the MIMIC marker (NF-OPSEC).
@dataclass(frozen=True, slots=True)
class TaskHandle:
"""Opaque per-connector reference to a started task."""
c2: C2Type
c2_task_id: str
host_id: str
payload_type: PayloadType
@dataclass(frozen=True, slots=True)
class TaskResult:
status: TaskStatus
output_text: str = ""
output_blob_ref: str | None = None
exit_code: int | None = None
error_message: str | None = None
class C2Connector(ABC):
"""Abstract base for every C2 backend."""
name: C2Type
@abstractmethod
def authenticate(self, config: dict[str, object]) -> None:
"""Open / refresh the auth context for this connector."""
@abstractmethod
def list_hosts(self, engagement_id: str) -> list[Host]:
"""Return hosts known by the C2 for the given engagement."""
@abstractmethod
def execute_task(self, host: Host, payload: Payload) -> TaskHandle:
"""Start a task on `host`. MUST NOT block."""
@abstractmethod
def get_task_result(self, handle: TaskHandle) -> TaskResult:
"""Poll status (called every 500 ms by the orchestrator)."""
def stream_task_output(self, handle: TaskHandle) -> Iterator[bytes]:
"""Optional v2. Connectors may leave this raising NotImplementedError."""
raise NotImplementedError
@abstractmethod
def cancel_task(self, handle: TaskHandle) -> None:
"""Abort a running task (F6 Abort)."""
@abstractmethod
def execute_cleanup(
self,
host: Host,
resolved_command: str,
params: dict[str, object],
) -> TaskResult:
"""Run a fully-resolved cleanup command (F15).
The Jinja2 template is rendered by the orchestrator BEFORE this call;
connectors never see template variables.
"""

View File

@@ -0,0 +1,55 @@
"""Connector factory keyed on `c2_type`.
Concrete connectors register themselves at import time via the
`@register_connector` decorator. Sprint 0 ships only the interface — no real
implementation registers in this codebase yet.
"""
from __future__ import annotations
from collections.abc import Callable
from typing import TypeVar
from mimic.connectors.base import C2Connector
from mimic.db.types import C2Type
ConnectorClass = type[C2Connector]
C = TypeVar("C", bound=C2Connector)
_REGISTRY: dict[C2Type, ConnectorClass] = {}
def register_connector(c2_type: C2Type) -> Callable[[type[C]], type[C]]:
"""Class decorator: register a concrete connector under its C2Type."""
def _wrap(klass: type[C]) -> type[C]:
if c2_type in _REGISTRY:
raise RuntimeError(f"connector already registered for {c2_type.value}")
_REGISTRY[c2_type] = klass
klass.name = c2_type
return klass
return _wrap
class ConnectorFactory:
"""Resolves a connector instance for a given C2 type."""
def __init__(self, config_resolver: Callable[[C2Type], dict[str, object]]):
self._resolver = config_resolver
@staticmethod
def registered() -> dict[C2Type, ConnectorClass]:
return dict(_REGISTRY)
def build(self, c2_type: C2Type) -> C2Connector:
try:
klass = _REGISTRY[c2_type]
except KeyError as exc:
raise NotImplementedError(
f"no connector registered for {c2_type.value}"
) from exc
connector = klass()
connector.authenticate(self._resolver(c2_type))
return connector

View File

@@ -0,0 +1,47 @@
"""Static `payload_type` → native command mapping per C2 (spec §7 table).
Concrete connectors consume this map in `execute_task` to translate the
neutral `PayloadType` into the right C2 verb. Unmapped combinations raise
`UnsupportedPayloadType`.
"""
from __future__ import annotations
from mimic.connectors.base import UnsupportedPayloadType
from mimic.db.types import C2Type, PayloadType
MYTHIC_MAP: dict[PayloadType, str] = {
PayloadType.CMD: "shell",
PayloadType.POWERSHELL: "powershell",
PayloadType.BOF: "inline-execute",
PayloadType.DOTNET_ASSEMBLY: "execute-assembly",
PayloadType.DOTNET_EXE: "execute-assembly",
PayloadType.PE_EXE: "spawn",
PayloadType.PE_DLL: "loadlibrary",
PayloadType.SHELLCODE: "inject",
PayloadType.PYTHON: "python",
PayloadType.VBS: "shell",
PayloadType.WMI_QUERY: "wmi_query",
PayloadType.REGISTRY: "reg",
PayloadType.SCRIPT_FILE: "upload_and_exec",
}
# Home connector mapping is TBD (PR2). Empty dict = nothing supported yet.
HOME_MAP: dict[PayloadType, str] = {}
_BY_C2: dict[C2Type, dict[PayloadType, str]] = {
C2Type.MYTHIC: MYTHIC_MAP,
C2Type.HOME: HOME_MAP,
}
def resolve_native(c2: C2Type, payload_type: PayloadType) -> str:
"""Resolve the native command for a (c2, payload_type) pair."""
mapping = _BY_C2.get(c2, {})
if payload_type not in mapping:
raise UnsupportedPayloadType(c2, payload_type)
return mapping[payload_type]
def supports(c2: C2Type, payload_type: PayloadType) -> bool:
return payload_type in _BY_C2.get(c2, {})