Files
mimic-big/backend/src/mimic/connectors/factory.py

56 lines
1.6 KiB
Python
Raw Normal View History

"""Connector factory keyed on `c2_type`.
Concrete connectors register themselves at import time via the
`@register_connector` decorator. Sprint 0 ships only the interface no real
implementation registers in this codebase yet.
"""
from __future__ import annotations
from collections.abc import Callable
from typing import TypeVar
from mimic.connectors.base import C2Connector
from mimic.db.types import C2Type
ConnectorClass = type[C2Connector]
C = TypeVar("C", bound=C2Connector)
_REGISTRY: dict[C2Type, ConnectorClass] = {}
def register_connector(c2_type: C2Type) -> Callable[[type[C]], type[C]]:
"""Class decorator: register a concrete connector under its C2Type."""
def _wrap(klass: type[C]) -> type[C]:
if c2_type in _REGISTRY:
raise RuntimeError(f"connector already registered for {c2_type.value}")
_REGISTRY[c2_type] = klass
klass.name = c2_type
return klass
return _wrap
class ConnectorFactory:
"""Resolves a connector instance for a given C2 type."""
def __init__(self, config_resolver: Callable[[C2Type], dict[str, object]]):
self._resolver = config_resolver
@staticmethod
def registered() -> dict[C2Type, ConnectorClass]:
return dict(_REGISTRY)
def build(self, c2_type: C2Type) -> C2Connector:
try:
klass = _REGISTRY[c2_type]
except KeyError as exc:
raise NotImplementedError(
f"no connector registered for {c2_type.value}"
) from exc
connector = klass()
connector.authenticate(self._resolver(c2_type))
return connector