Command source decision: extended C2TaskStatus with command: str | None (default None). Added command_name to _GET_TASK_QUERY so get_task() returns command in a single round-trip — no separate history fetch needed on import. 4-line change, zero cascading test impact. adapter.py: - C2TaskStatus: add command: str | None = None field - C2HistoricalTask: new dataclass (display_id, command, params, status, completed, timestamp) for history rows - C2TaskPage.items: typed as list[C2HistoricalTask] (was list[dict]) mythic.py: - _GET_TASK_QUERY: add command_name field - _LIST_CALLBACK_TASKS_QUERY: new query (order_by id desc, limit/offset) - _COUNT_CALLBACK_TASKS_QUERY: new aggregate query for total - get_task(): surfaces command_name as status.command - list_callback_tasks(): two _post() calls (tasks + count), allow_redirects=False fake.py: - _FAKE_HISTORY: frozen deterministic history (cb1=12, cb2=0, cb3=5 tasks) - list_callback_tasks(): serves from _FAKE_HISTORY, pagination applied - get_task(): returns command from _tasks dict api/c2.py: - GET /api/engagements/<eid>/c2/callbacks/<cid>/history: page+page_size defaults 1/25, cap 100, reject <1, 502 on adapter error - POST /api/simulations/<sid>/c2/import: idempotent per (sim,mythic_id) pair, source=import, completed tasks get output+mapping_applied, incomplete tasks stored for poll-on-read pickup, auto-transition pending→in_progress 60 new tests (456 total); pytest/ruff/mypy all green Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
118 lines
2.8 KiB
Python
118 lines
2.8 KiB
Python
"""Abstract C2 adapter interface and shared dataclasses."""
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import binascii
|
|
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
|
|
|
|
class C2Error(Exception):
|
|
"""Raised by adapters when the C2 returns an application-level error."""
|
|
|
|
|
|
@dataclass
|
|
class C2Health:
|
|
ok: bool
|
|
error: str | None = None
|
|
|
|
|
|
@dataclass
|
|
class C2Callback:
|
|
display_id: int
|
|
active: bool
|
|
host: str
|
|
user: str
|
|
domain: str
|
|
last_checkin: str # ISO-8601 string
|
|
|
|
|
|
@dataclass
|
|
class C2TaskStatus:
|
|
display_id: int
|
|
status: str
|
|
completed: bool
|
|
completed_at: datetime | None = field(default=None)
|
|
# command_name is populated by get_task() so import doesn't need a second round-trip.
|
|
command: str | None = field(default=None)
|
|
|
|
|
|
@dataclass
|
|
class C2HistoricalTask:
|
|
"""A task entry from callback history (carries command + params, unlike C2TaskStatus)."""
|
|
|
|
display_id: int
|
|
command: str
|
|
params: str | None
|
|
status: str
|
|
completed: bool
|
|
timestamp: str | None # ISO-8601 or None
|
|
|
|
|
|
@dataclass
|
|
class C2TaskPage:
|
|
items: list[C2HistoricalTask]
|
|
total: int
|
|
page: int
|
|
page_size: int
|
|
|
|
|
|
def decode_response_text(raw: str) -> str:
|
|
"""Decode a base64-encoded Mythic response_text field.
|
|
|
|
On binascii.Error (binary payload) returns "<binary> " + hex string
|
|
so execution_result never silently corrupts.
|
|
"""
|
|
try:
|
|
return base64.b64decode(raw).decode("utf-8")
|
|
except binascii.Error:
|
|
return "<binary> " + raw.encode().hex()
|
|
except UnicodeDecodeError:
|
|
raw_bytes = base64.b64decode(raw)
|
|
return "<binary> " + raw_bytes.hex()
|
|
|
|
|
|
class C2Adapter(ABC):
|
|
"""Thin interface over a C2 backend (Mythic or custom)."""
|
|
|
|
@abstractmethod
|
|
def test_connection(self) -> C2Health:
|
|
"""Verify that the C2 is reachable and the token is valid."""
|
|
...
|
|
|
|
@abstractmethod
|
|
def list_callbacks(self) -> list[C2Callback]:
|
|
"""Return active callbacks visible to this API token."""
|
|
...
|
|
|
|
@abstractmethod
|
|
def create_task(
|
|
self,
|
|
callback_display_id: int,
|
|
command: str,
|
|
params: str | None = None,
|
|
) -> int:
|
|
"""Issue a task and return its Mythic display_id."""
|
|
...
|
|
|
|
@abstractmethod
|
|
def get_task(self, task_display_id: int) -> C2TaskStatus:
|
|
"""Return current status of a task."""
|
|
...
|
|
|
|
@abstractmethod
|
|
def get_task_output(self, task_display_id: int) -> str:
|
|
"""Return decoded, concatenated output for a completed task."""
|
|
...
|
|
|
|
@abstractmethod
|
|
def list_callback_tasks(
|
|
self,
|
|
callback_display_id: int,
|
|
page: int = 1,
|
|
page_size: int = 25,
|
|
) -> C2TaskPage:
|
|
"""Return a paginated history of tasks for a callback."""
|
|
...
|