Files

118 lines
2.8 KiB
Python
Raw Permalink Normal View History

"""Abstract C2 adapter interface and shared dataclasses."""
from __future__ import annotations
import base64
import binascii
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
class C2Error(Exception):
"""Raised by adapters when the C2 returns an application-level error."""
@dataclass
class C2Health:
ok: bool
error: str | None = None
@dataclass
class C2Callback:
display_id: int
active: bool
host: str
user: str
domain: str
last_checkin: str # ISO-8601 string
@dataclass
class C2TaskStatus:
display_id: int
status: str
completed: bool
completed_at: datetime | None = field(default=None)
feat(backend): c2 callback history + task import (sprint 8 M4) Command source decision: extended C2TaskStatus with command: str | None (default None). Added command_name to _GET_TASK_QUERY so get_task() returns command in a single round-trip — no separate history fetch needed on import. 4-line change, zero cascading test impact. adapter.py: - C2TaskStatus: add command: str | None = None field - C2HistoricalTask: new dataclass (display_id, command, params, status, completed, timestamp) for history rows - C2TaskPage.items: typed as list[C2HistoricalTask] (was list[dict]) mythic.py: - _GET_TASK_QUERY: add command_name field - _LIST_CALLBACK_TASKS_QUERY: new query (order_by id desc, limit/offset) - _COUNT_CALLBACK_TASKS_QUERY: new aggregate query for total - get_task(): surfaces command_name as status.command - list_callback_tasks(): two _post() calls (tasks + count), allow_redirects=False fake.py: - _FAKE_HISTORY: frozen deterministic history (cb1=12, cb2=0, cb3=5 tasks) - list_callback_tasks(): serves from _FAKE_HISTORY, pagination applied - get_task(): returns command from _tasks dict api/c2.py: - GET /api/engagements/<eid>/c2/callbacks/<cid>/history: page+page_size defaults 1/25, cap 100, reject <1, 502 on adapter error - POST /api/simulations/<sid>/c2/import: idempotent per (sim,mythic_id) pair, source=import, completed tasks get output+mapping_applied, incomplete tasks stored for poll-on-read pickup, auto-transition pending→in_progress 60 new tests (456 total); pytest/ruff/mypy all green Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-10 20:09:29 +02:00
# command_name is populated by get_task() so import doesn't need a second round-trip.
command: str | None = field(default=None)
@dataclass
class C2HistoricalTask:
"""A task entry from callback history (carries command + params, unlike C2TaskStatus)."""
display_id: int
command: str
params: str | None
status: str
completed: bool
timestamp: str | None # ISO-8601 or None
@dataclass
class C2TaskPage:
feat(backend): c2 callback history + task import (sprint 8 M4) Command source decision: extended C2TaskStatus with command: str | None (default None). Added command_name to _GET_TASK_QUERY so get_task() returns command in a single round-trip — no separate history fetch needed on import. 4-line change, zero cascading test impact. adapter.py: - C2TaskStatus: add command: str | None = None field - C2HistoricalTask: new dataclass (display_id, command, params, status, completed, timestamp) for history rows - C2TaskPage.items: typed as list[C2HistoricalTask] (was list[dict]) mythic.py: - _GET_TASK_QUERY: add command_name field - _LIST_CALLBACK_TASKS_QUERY: new query (order_by id desc, limit/offset) - _COUNT_CALLBACK_TASKS_QUERY: new aggregate query for total - get_task(): surfaces command_name as status.command - list_callback_tasks(): two _post() calls (tasks + count), allow_redirects=False fake.py: - _FAKE_HISTORY: frozen deterministic history (cb1=12, cb2=0, cb3=5 tasks) - list_callback_tasks(): serves from _FAKE_HISTORY, pagination applied - get_task(): returns command from _tasks dict api/c2.py: - GET /api/engagements/<eid>/c2/callbacks/<cid>/history: page+page_size defaults 1/25, cap 100, reject <1, 502 on adapter error - POST /api/simulations/<sid>/c2/import: idempotent per (sim,mythic_id) pair, source=import, completed tasks get output+mapping_applied, incomplete tasks stored for poll-on-read pickup, auto-transition pending→in_progress 60 new tests (456 total); pytest/ruff/mypy all green Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-10 20:09:29 +02:00
items: list[C2HistoricalTask]
total: int
page: int
page_size: int
def decode_response_text(raw: str) -> str:
"""Decode a base64-encoded Mythic response_text field.
On binascii.Error (binary payload) returns "<binary> " + hex string
so execution_result never silently corrupts.
"""
try:
return base64.b64decode(raw).decode("utf-8")
except binascii.Error:
return "<binary> " + raw.encode().hex()
except UnicodeDecodeError:
raw_bytes = base64.b64decode(raw)
return "<binary> " + raw_bytes.hex()
class C2Adapter(ABC):
"""Thin interface over a C2 backend (Mythic or custom)."""
@abstractmethod
def test_connection(self) -> C2Health:
"""Verify that the C2 is reachable and the token is valid."""
...
@abstractmethod
def list_callbacks(self) -> list[C2Callback]:
"""Return active callbacks visible to this API token."""
...
@abstractmethod
def create_task(
self,
callback_display_id: int,
command: str,
params: str | None = None,
) -> int:
"""Issue a task and return its Mythic display_id."""
...
@abstractmethod
def get_task(self, task_display_id: int) -> C2TaskStatus:
"""Return current status of a task."""
...
@abstractmethod
def get_task_output(self, task_display_id: int) -> str:
"""Return decoded, concatenated output for a completed task."""
...
@abstractmethod
def list_callback_tasks(
self,
callback_display_id: int,
page: int = 1,
page_size: int = 25,
) -> C2TaskPage:
"""Return a paginated history of tasks for a callback."""
...