From 35c3823ac79e6961ce629900e4b7da2c2e92d86d Mon Sep 17 00:00:00 2001 From: knacky Date: Thu, 21 May 2026 20:33:06 +0200 Subject: [PATCH] feat(backend): add C2Connector ABC + payload mapping + factory (B0.4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - abstract C2Connector with authenticate / list_hosts / execute_task / get_task_result / cancel_task / execute_cleanup; stream_task_output optional v1 (NotImplementedError). - Payload / TaskHandle / TaskResult / TaskStatus frozen dataclasses. - UnsupportedPayloadType raised when no native command maps to the chosen (c2_type, payload_type) pair. - Mythic payload_type → native command map populated (spec §7 table). - HOME map left empty until PR2 is closed. - ConnectorFactory: register_connector decorator + build(c2_type) that instantiates + authenticates via an injected config resolver. No real Mythic / Home implementations land in this sprint. --- backend/src/mimic/connectors/__init__.py | 27 +++++ backend/src/mimic/connectors/base.py | 122 ++++++++++++++++++++ backend/src/mimic/connectors/factory.py | 55 +++++++++ backend/src/mimic/connectors/payload_map.py | 47 ++++++++ 4 files changed, 251 insertions(+) create mode 100644 backend/src/mimic/connectors/__init__.py create mode 100644 backend/src/mimic/connectors/base.py create mode 100644 backend/src/mimic/connectors/factory.py create mode 100644 backend/src/mimic/connectors/payload_map.py diff --git a/backend/src/mimic/connectors/__init__.py b/backend/src/mimic/connectors/__init__.py new file mode 100644 index 0000000..f55ccba --- /dev/null +++ b/backend/src/mimic/connectors/__init__.py @@ -0,0 +1,27 @@ +"""C2Connector abstraction. + +Sprint 0 ships the interface + dataclasses + payload mapping + factory. +Concrete `MythicConnector` and `HomeConnector` implementations land after +PR1 and PR2 respectively. +""" + +from mimic.connectors.base import ( + C2Connector, + Payload, + TaskHandle, + TaskResult, + TaskStatus, + UnsupportedPayloadType, +) +from mimic.connectors.factory import ConnectorFactory, register_connector + +__all__ = [ + "C2Connector", + "ConnectorFactory", + "Payload", + "TaskHandle", + "TaskResult", + "TaskStatus", + "UnsupportedPayloadType", + "register_connector", +] diff --git a/backend/src/mimic/connectors/base.py b/backend/src/mimic/connectors/base.py new file mode 100644 index 0000000..4a56a6b --- /dev/null +++ b/backend/src/mimic/connectors/base.py @@ -0,0 +1,122 @@ +"""Abstract C2 connector interface (spec §7). + +The orchestrator calls `execute_task` → polls `get_task_result` every 500 ms +until a terminal `TaskStatus`. `stream_task_output` is optional in v1. +`cancel_task` backs F6 Abort. `execute_cleanup` runs the resolved Jinja2 +template against the C2 (spec F15). +""" + +from __future__ import annotations + +import enum +from abc import ABC, abstractmethod +from collections.abc import Iterator +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +from mimic.db.types import C2Type, PayloadType + +if TYPE_CHECKING: + from mimic.db.models.host import Host + + +class UnsupportedPayloadType(RuntimeError): # noqa: N818 (kept for spec wording) + """Raised when the chosen C2 has no native command for a payload kind. + + The name mirrors the exact identifier from spec §7 (`UnsupportedPayloadType`). + The trailing `Error` suffix is intentionally omitted to keep the spec link + one-to-one. + """ + + def __init__(self, c2: C2Type, payload_type: PayloadType) -> None: + super().__init__(f"{c2.value} does not support payload_type={payload_type.value}") + self.c2 = c2 + self.payload_type = payload_type + + +class TaskStatus(enum.StrEnum): + """Terminal and non-terminal task lifecycle states.""" + + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + CANCELED = "canceled" + + @property + def is_terminal(self) -> bool: + return self in {TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELED} + + +@dataclass(frozen=True, slots=True) +class Payload: + """Self-contained payload sent to a C2.""" + + payload_type: PayloadType + template_text: str + params: dict[str, object] = field(default_factory=dict) + mimic_run_id: str | None = None + is_stealth_variant: bool = False + # When True the orchestrator MUST strip the MIMIC marker (NF-OPSEC). + + +@dataclass(frozen=True, slots=True) +class TaskHandle: + """Opaque per-connector reference to a started task.""" + + c2: C2Type + c2_task_id: str + host_id: str + payload_type: PayloadType + + +@dataclass(frozen=True, slots=True) +class TaskResult: + status: TaskStatus + output_text: str = "" + output_blob_ref: str | None = None + exit_code: int | None = None + error_message: str | None = None + + +class C2Connector(ABC): + """Abstract base for every C2 backend.""" + + name: C2Type + + @abstractmethod + def authenticate(self, config: dict[str, object]) -> None: + """Open / refresh the auth context for this connector.""" + + @abstractmethod + def list_hosts(self, engagement_id: str) -> list[Host]: + """Return hosts known by the C2 for the given engagement.""" + + @abstractmethod + def execute_task(self, host: Host, payload: Payload) -> TaskHandle: + """Start a task on `host`. MUST NOT block.""" + + @abstractmethod + def get_task_result(self, handle: TaskHandle) -> TaskResult: + """Poll status (called every 500 ms by the orchestrator).""" + + def stream_task_output(self, handle: TaskHandle) -> Iterator[bytes]: + """Optional v2. Connectors may leave this raising NotImplementedError.""" + raise NotImplementedError + + @abstractmethod + def cancel_task(self, handle: TaskHandle) -> None: + """Abort a running task (F6 Abort).""" + + @abstractmethod + def execute_cleanup( + self, + host: Host, + resolved_command: str, + params: dict[str, object], + ) -> TaskResult: + """Run a fully-resolved cleanup command (F15). + + The Jinja2 template is rendered by the orchestrator BEFORE this call; + connectors never see template variables. + """ diff --git a/backend/src/mimic/connectors/factory.py b/backend/src/mimic/connectors/factory.py new file mode 100644 index 0000000..2c7c2cd --- /dev/null +++ b/backend/src/mimic/connectors/factory.py @@ -0,0 +1,55 @@ +"""Connector factory keyed on `c2_type`. + +Concrete connectors register themselves at import time via the +`@register_connector` decorator. Sprint 0 ships only the interface — no real +implementation registers in this codebase yet. +""" + +from __future__ import annotations + +from collections.abc import Callable +from typing import TypeVar + +from mimic.connectors.base import C2Connector +from mimic.db.types import C2Type + +ConnectorClass = type[C2Connector] +C = TypeVar("C", bound=C2Connector) + +_REGISTRY: dict[C2Type, ConnectorClass] = {} + + +def register_connector(c2_type: C2Type) -> Callable[[type[C]], type[C]]: + """Class decorator: register a concrete connector under its C2Type.""" + + def _wrap(klass: type[C]) -> type[C]: + if c2_type in _REGISTRY: + raise RuntimeError(f"connector already registered for {c2_type.value}") + _REGISTRY[c2_type] = klass + klass.name = c2_type + return klass + + return _wrap + + +class ConnectorFactory: + """Resolves a connector instance for a given C2 type.""" + + def __init__(self, config_resolver: Callable[[C2Type], dict[str, object]]): + self._resolver = config_resolver + + @staticmethod + def registered() -> dict[C2Type, ConnectorClass]: + return dict(_REGISTRY) + + def build(self, c2_type: C2Type) -> C2Connector: + try: + klass = _REGISTRY[c2_type] + except KeyError as exc: + raise NotImplementedError( + f"no connector registered for {c2_type.value}" + ) from exc + + connector = klass() + connector.authenticate(self._resolver(c2_type)) + return connector diff --git a/backend/src/mimic/connectors/payload_map.py b/backend/src/mimic/connectors/payload_map.py new file mode 100644 index 0000000..967c4f3 --- /dev/null +++ b/backend/src/mimic/connectors/payload_map.py @@ -0,0 +1,47 @@ +"""Static `payload_type` → native command mapping per C2 (spec §7 table). + +Concrete connectors consume this map in `execute_task` to translate the +neutral `PayloadType` into the right C2 verb. Unmapped combinations raise +`UnsupportedPayloadType`. +""" + +from __future__ import annotations + +from mimic.connectors.base import UnsupportedPayloadType +from mimic.db.types import C2Type, PayloadType + +MYTHIC_MAP: dict[PayloadType, str] = { + PayloadType.CMD: "shell", + PayloadType.POWERSHELL: "powershell", + PayloadType.BOF: "inline-execute", + PayloadType.DOTNET_ASSEMBLY: "execute-assembly", + PayloadType.DOTNET_EXE: "execute-assembly", + PayloadType.PE_EXE: "spawn", + PayloadType.PE_DLL: "loadlibrary", + PayloadType.SHELLCODE: "inject", + PayloadType.PYTHON: "python", + PayloadType.VBS: "shell", + PayloadType.WMI_QUERY: "wmi_query", + PayloadType.REGISTRY: "reg", + PayloadType.SCRIPT_FILE: "upload_and_exec", +} + +# Home connector mapping is TBD (PR2). Empty dict = nothing supported yet. +HOME_MAP: dict[PayloadType, str] = {} + +_BY_C2: dict[C2Type, dict[PayloadType, str]] = { + C2Type.MYTHIC: MYTHIC_MAP, + C2Type.HOME: HOME_MAP, +} + + +def resolve_native(c2: C2Type, payload_type: PayloadType) -> str: + """Resolve the native command for a (c2, payload_type) pair.""" + mapping = _BY_C2.get(c2, {}) + if payload_type not in mapping: + raise UnsupportedPayloadType(c2, payload_type) + return mapping[payload_type] + + +def supports(c2: C2Type, payload_type: PayloadType) -> bool: + return payload_type in _BY_C2.get(c2, {})