Files

294 lines
8.4 KiB
Python
Raw Permalink Normal View History

# Contract pinned from MythicMeta/Mythic_Scripting master @ 2026-06-10 (raw.githubusercontent.com/MythicMeta/Mythic_Scripting/master/mythic/mythic.py)
"""Mythic 3.x C2 adapter.
Transport: POST https://<host>:7443/graphql
Header: apitoken: <token>
Backend: Hasura-proxied Postgres behind nginx.
M1: test_connection()
M2: list_callbacks(), create_task()
M3: get_task(), get_task_output()
M4: list_callback_tasks()
"""
from __future__ import annotations
from datetime import datetime
import requests
from backend.app.services.c2.adapter import (
C2Adapter,
C2Callback,
C2Error,
C2Health,
feat(backend): c2 callback history + task import (sprint 8 M4) Command source decision: extended C2TaskStatus with command: str | None (default None). Added command_name to _GET_TASK_QUERY so get_task() returns command in a single round-trip — no separate history fetch needed on import. 4-line change, zero cascading test impact. adapter.py: - C2TaskStatus: add command: str | None = None field - C2HistoricalTask: new dataclass (display_id, command, params, status, completed, timestamp) for history rows - C2TaskPage.items: typed as list[C2HistoricalTask] (was list[dict]) mythic.py: - _GET_TASK_QUERY: add command_name field - _LIST_CALLBACK_TASKS_QUERY: new query (order_by id desc, limit/offset) - _COUNT_CALLBACK_TASKS_QUERY: new aggregate query for total - get_task(): surfaces command_name as status.command - list_callback_tasks(): two _post() calls (tasks + count), allow_redirects=False fake.py: - _FAKE_HISTORY: frozen deterministic history (cb1=12, cb2=0, cb3=5 tasks) - list_callback_tasks(): serves from _FAKE_HISTORY, pagination applied - get_task(): returns command from _tasks dict api/c2.py: - GET /api/engagements/<eid>/c2/callbacks/<cid>/history: page+page_size defaults 1/25, cap 100, reject <1, 502 on adapter error - POST /api/simulations/<sid>/c2/import: idempotent per (sim,mythic_id) pair, source=import, completed tasks get output+mapping_applied, incomplete tasks stored for poll-on-read pickup, auto-transition pending→in_progress 60 new tests (456 total); pytest/ruff/mypy all green Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-10 20:09:29 +02:00
C2HistoricalTask,
C2TaskPage,
C2TaskStatus,
decode_response_text,
)
_HEALTH_QUERY = "{ __typename }"
_CALLBACKS_QUERY = """
query {
callback(order_by: {id: asc}, where: {active: {_eq: true}}) {
id
display_id
active
host
user
domain
last_checkin
}
}
"""
_CREATE_TASK_MUTATION = """
mutation CreateTask($callback_id: Int!, $command: String!, $params: String!) {
createTask(
callback_id: $callback_id,
command: $command,
params: $params,
tasking_location: "command_line"
) {
id
display_id
error
}
}
"""
_GET_TASK_QUERY = """
query GetTask($display_id: Int!) {
task(where: {display_id: {_eq: $display_id}}) {
display_id
feat(backend): c2 callback history + task import (sprint 8 M4) Command source decision: extended C2TaskStatus with command: str | None (default None). Added command_name to _GET_TASK_QUERY so get_task() returns command in a single round-trip — no separate history fetch needed on import. 4-line change, zero cascading test impact. adapter.py: - C2TaskStatus: add command: str | None = None field - C2HistoricalTask: new dataclass (display_id, command, params, status, completed, timestamp) for history rows - C2TaskPage.items: typed as list[C2HistoricalTask] (was list[dict]) mythic.py: - _GET_TASK_QUERY: add command_name field - _LIST_CALLBACK_TASKS_QUERY: new query (order_by id desc, limit/offset) - _COUNT_CALLBACK_TASKS_QUERY: new aggregate query for total - get_task(): surfaces command_name as status.command - list_callback_tasks(): two _post() calls (tasks + count), allow_redirects=False fake.py: - _FAKE_HISTORY: frozen deterministic history (cb1=12, cb2=0, cb3=5 tasks) - list_callback_tasks(): serves from _FAKE_HISTORY, pagination applied - get_task(): returns command from _tasks dict api/c2.py: - GET /api/engagements/<eid>/c2/callbacks/<cid>/history: page+page_size defaults 1/25, cap 100, reject <1, 502 on adapter error - POST /api/simulations/<sid>/c2/import: idempotent per (sim,mythic_id) pair, source=import, completed tasks get output+mapping_applied, incomplete tasks stored for poll-on-read pickup, auto-transition pending→in_progress 60 new tests (456 total); pytest/ruff/mypy all green Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-10 20:09:29 +02:00
command_name
status
completed
timestamp
}
}
"""
feat(backend): c2 callback history + task import (sprint 8 M4) Command source decision: extended C2TaskStatus with command: str | None (default None). Added command_name to _GET_TASK_QUERY so get_task() returns command in a single round-trip — no separate history fetch needed on import. 4-line change, zero cascading test impact. adapter.py: - C2TaskStatus: add command: str | None = None field - C2HistoricalTask: new dataclass (display_id, command, params, status, completed, timestamp) for history rows - C2TaskPage.items: typed as list[C2HistoricalTask] (was list[dict]) mythic.py: - _GET_TASK_QUERY: add command_name field - _LIST_CALLBACK_TASKS_QUERY: new query (order_by id desc, limit/offset) - _COUNT_CALLBACK_TASKS_QUERY: new aggregate query for total - get_task(): surfaces command_name as status.command - list_callback_tasks(): two _post() calls (tasks + count), allow_redirects=False fake.py: - _FAKE_HISTORY: frozen deterministic history (cb1=12, cb2=0, cb3=5 tasks) - list_callback_tasks(): serves from _FAKE_HISTORY, pagination applied - get_task(): returns command from _tasks dict api/c2.py: - GET /api/engagements/<eid>/c2/callbacks/<cid>/history: page+page_size defaults 1/25, cap 100, reject <1, 502 on adapter error - POST /api/simulations/<sid>/c2/import: idempotent per (sim,mythic_id) pair, source=import, completed tasks get output+mapping_applied, incomplete tasks stored for poll-on-read pickup, auto-transition pending→in_progress 60 new tests (456 total); pytest/ruff/mypy all green Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-10 20:09:29 +02:00
_LIST_CALLBACK_TASKS_QUERY = """
query ListCallbackTasks($callback_display_id: Int!, $limit: Int!, $offset: Int!) {
task(
where: {callback: {display_id: {_eq: $callback_display_id}}}
order_by: {id: desc}
limit: $limit
offset: $offset
) {
display_id
command_name
params
status
completed
timestamp
}
}
"""
_COUNT_CALLBACK_TASKS_QUERY = """
query CountCallbackTasks($callback_display_id: Int!) {
task_aggregate(where: {callback: {display_id: {_eq: $callback_display_id}}}) {
aggregate {
count
}
}
}
"""
_GET_TASK_OUTPUT_QUERY = """
query GetTaskOutput($display_id: Int!) {
response(
where: {task: {display_id: {_eq: $display_id}}}
order_by: {id: asc}
) {
response_text
}
}
"""
class MythicAdapter(C2Adapter):
"""Real Mythic 3.x adapter using GraphQL over HTTP."""
def __init__(self, url: str, api_token: str, verify_tls: bool = True) -> None:
self._url = url.rstrip("/") + "/graphql"
self._token = api_token
self._verify = verify_tls
def _headers(self) -> dict[str, str]:
return {
"Content-Type": "application/json",
"apitoken": self._token,
}
def _post(self, body: dict) -> dict:
resp = requests.post(
self._url,
json=body,
headers=self._headers(),
verify=self._verify,
timeout=10,
allow_redirects=False,
)
resp.raise_for_status()
return resp.json()
def test_connection(self) -> C2Health:
"""POST a trivial introspection query to verify reachability and token validity."""
try:
resp = requests.post(
self._url,
json={"query": _HEALTH_QUERY},
headers=self._headers(),
verify=self._verify,
timeout=10,
allow_redirects=False,
)
if resp.status_code == 200:
return C2Health(ok=True)
return C2Health(ok=False, error=f"HTTP {resp.status_code}")
except requests.RequestException as exc:
return C2Health(ok=False, error=str(exc))
def list_callbacks(self) -> list[C2Callback]:
"""Return active callbacks from Mythic (filtered server-side: active=true)."""
try:
data = self._post({"query": _CALLBACKS_QUERY})
except requests.RequestException as exc:
raise C2Error(f"C2 transport error: {type(exc).__name__}") from exc
callbacks_raw = data.get("data", {}).get("callback", [])
return [
C2Callback(
display_id=cb["display_id"],
active=cb["active"],
host=cb.get("host") or "",
user=cb.get("user") or "",
domain=cb.get("domain") or "",
last_checkin=cb.get("last_checkin") or "",
)
for cb in callbacks_raw
]
def create_task(
self,
callback_display_id: int,
command: str,
params: str | None = None,
) -> int:
"""Issue a task on a callback; return Mythic task display_id."""
try:
data = self._post({
"query": _CREATE_TASK_MUTATION,
"variables": {
"callback_id": callback_display_id,
"command": command,
"params": params or "",
},
})
except requests.RequestException as exc:
raise C2Error(f"C2 transport error: {type(exc).__name__}") from exc
task_data = data.get("data", {}).get("createTask", {})
error_msg = task_data.get("error")
if error_msg:
raise C2Error(error_msg)
return int(task_data["display_id"])
def get_task(self, task_display_id: int) -> C2TaskStatus:
"""Return current task status from Mythic."""
try:
data = self._post({
"query": _GET_TASK_QUERY,
"variables": {"display_id": task_display_id},
})
except requests.RequestException as exc:
raise C2Error(f"C2 transport error: {type(exc).__name__}") from exc
rows = data.get("data", {}).get("task", [])
if not rows:
raise C2Error(f"task {task_display_id} not found in Mythic")
row = rows[0]
completed_at: datetime | None = None
if row.get("completed") and row.get("timestamp"):
try:
completed_at = datetime.fromisoformat(
row["timestamp"].replace("Z", "+00:00")
)
except ValueError:
completed_at = None
return C2TaskStatus(
display_id=row["display_id"],
status=row["status"],
completed=bool(row.get("completed", False)),
completed_at=completed_at,
feat(backend): c2 callback history + task import (sprint 8 M4) Command source decision: extended C2TaskStatus with command: str | None (default None). Added command_name to _GET_TASK_QUERY so get_task() returns command in a single round-trip — no separate history fetch needed on import. 4-line change, zero cascading test impact. adapter.py: - C2TaskStatus: add command: str | None = None field - C2HistoricalTask: new dataclass (display_id, command, params, status, completed, timestamp) for history rows - C2TaskPage.items: typed as list[C2HistoricalTask] (was list[dict]) mythic.py: - _GET_TASK_QUERY: add command_name field - _LIST_CALLBACK_TASKS_QUERY: new query (order_by id desc, limit/offset) - _COUNT_CALLBACK_TASKS_QUERY: new aggregate query for total - get_task(): surfaces command_name as status.command - list_callback_tasks(): two _post() calls (tasks + count), allow_redirects=False fake.py: - _FAKE_HISTORY: frozen deterministic history (cb1=12, cb2=0, cb3=5 tasks) - list_callback_tasks(): serves from _FAKE_HISTORY, pagination applied - get_task(): returns command from _tasks dict api/c2.py: - GET /api/engagements/<eid>/c2/callbacks/<cid>/history: page+page_size defaults 1/25, cap 100, reject <1, 502 on adapter error - POST /api/simulations/<sid>/c2/import: idempotent per (sim,mythic_id) pair, source=import, completed tasks get output+mapping_applied, incomplete tasks stored for poll-on-read pickup, auto-transition pending→in_progress 60 new tests (456 total); pytest/ruff/mypy all green Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-10 20:09:29 +02:00
command=row.get("command_name") or None,
)
def get_task_output(self, task_display_id: int) -> str:
"""Return decoded, concatenated output for a task."""
try:
data = self._post({
"query": _GET_TASK_OUTPUT_QUERY,
"variables": {"display_id": task_display_id},
})
except requests.RequestException as exc:
raise C2Error(f"C2 transport error: {type(exc).__name__}") from exc
rows = data.get("data", {}).get("response", [])
return "".join(
decode_response_text(r["response_text"])
for r in rows
if r.get("response_text")
)
def list_callback_tasks(
self,
callback_display_id: int,
page: int = 1,
page_size: int = 25,
) -> C2TaskPage:
feat(backend): c2 callback history + task import (sprint 8 M4) Command source decision: extended C2TaskStatus with command: str | None (default None). Added command_name to _GET_TASK_QUERY so get_task() returns command in a single round-trip — no separate history fetch needed on import. 4-line change, zero cascading test impact. adapter.py: - C2TaskStatus: add command: str | None = None field - C2HistoricalTask: new dataclass (display_id, command, params, status, completed, timestamp) for history rows - C2TaskPage.items: typed as list[C2HistoricalTask] (was list[dict]) mythic.py: - _GET_TASK_QUERY: add command_name field - _LIST_CALLBACK_TASKS_QUERY: new query (order_by id desc, limit/offset) - _COUNT_CALLBACK_TASKS_QUERY: new aggregate query for total - get_task(): surfaces command_name as status.command - list_callback_tasks(): two _post() calls (tasks + count), allow_redirects=False fake.py: - _FAKE_HISTORY: frozen deterministic history (cb1=12, cb2=0, cb3=5 tasks) - list_callback_tasks(): serves from _FAKE_HISTORY, pagination applied - get_task(): returns command from _tasks dict api/c2.py: - GET /api/engagements/<eid>/c2/callbacks/<cid>/history: page+page_size defaults 1/25, cap 100, reject <1, 502 on adapter error - POST /api/simulations/<sid>/c2/import: idempotent per (sim,mythic_id) pair, source=import, completed tasks get output+mapping_applied, incomplete tasks stored for poll-on-read pickup, auto-transition pending→in_progress 60 new tests (456 total); pytest/ruff/mypy all green Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-10 20:09:29 +02:00
"""Return a paginated, most-recent-first history of tasks for a callback."""
offset = (page - 1) * page_size
try:
data = self._post({
"query": _LIST_CALLBACK_TASKS_QUERY,
"variables": {
"callback_display_id": callback_display_id,
"limit": page_size,
"offset": offset,
},
})
count_data = self._post({
"query": _COUNT_CALLBACK_TASKS_QUERY,
"variables": {"callback_display_id": callback_display_id},
})
except requests.RequestException as exc:
raise C2Error(f"C2 transport error: {type(exc).__name__}") from exc
feat(backend): c2 callback history + task import (sprint 8 M4) Command source decision: extended C2TaskStatus with command: str | None (default None). Added command_name to _GET_TASK_QUERY so get_task() returns command in a single round-trip — no separate history fetch needed on import. 4-line change, zero cascading test impact. adapter.py: - C2TaskStatus: add command: str | None = None field - C2HistoricalTask: new dataclass (display_id, command, params, status, completed, timestamp) for history rows - C2TaskPage.items: typed as list[C2HistoricalTask] (was list[dict]) mythic.py: - _GET_TASK_QUERY: add command_name field - _LIST_CALLBACK_TASKS_QUERY: new query (order_by id desc, limit/offset) - _COUNT_CALLBACK_TASKS_QUERY: new aggregate query for total - get_task(): surfaces command_name as status.command - list_callback_tasks(): two _post() calls (tasks + count), allow_redirects=False fake.py: - _FAKE_HISTORY: frozen deterministic history (cb1=12, cb2=0, cb3=5 tasks) - list_callback_tasks(): serves from _FAKE_HISTORY, pagination applied - get_task(): returns command from _tasks dict api/c2.py: - GET /api/engagements/<eid>/c2/callbacks/<cid>/history: page+page_size defaults 1/25, cap 100, reject <1, 502 on adapter error - POST /api/simulations/<sid>/c2/import: idempotent per (sim,mythic_id) pair, source=import, completed tasks get output+mapping_applied, incomplete tasks stored for poll-on-read pickup, auto-transition pending→in_progress 60 new tests (456 total); pytest/ruff/mypy all green Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-10 20:09:29 +02:00
rows = data.get("data", {}).get("task", [])
total: int = (
count_data.get("data", {})
.get("task_aggregate", {})
.get("aggregate", {})
.get("count", 0)
)
items = [
C2HistoricalTask(
display_id=r["display_id"],
command=r.get("command_name") or "",
params=r.get("params") or None,
status=r.get("status") or "",
completed=bool(r.get("completed", False)),
timestamp=r.get("timestamp") or None,
)
for r in rows
]
return C2TaskPage(items=items, total=total, page=page, page_size=page_size)