Command source decision: extended C2TaskStatus with command: str | None (default None). Added command_name to _GET_TASK_QUERY so get_task() returns command in a single round-trip — no separate history fetch needed on import. 4-line change, zero cascading test impact.

adapter.py:
- C2TaskStatus: add command: str | None = None field
- C2HistoricalTask: new dataclass (display_id, command, params, status, completed, timestamp) for history rows
- C2TaskPage.items: typed as list[C2HistoricalTask] (was list[dict])

mythic.py:
- _GET_TASK_QUERY: add command_name field
- _LIST_CALLBACK_TASKS_QUERY: new query (order_by id desc, limit/offset)
- _COUNT_CALLBACK_TASKS_QUERY: new aggregate query for total
- get_task(): surfaces command_name as status.command
- list_callback_tasks(): two _post() calls (tasks + count), allow_redirects=False

fake.py:
- _FAKE_HISTORY: frozen deterministic history (cb1=12, cb2=0, cb3=5 tasks)
- list_callback_tasks(): serves from _FAKE_HISTORY, pagination applied
- get_task(): returns command from _tasks dict

api/c2.py:
- GET /api/engagements/<eid>/c2/callbacks/<cid>/history: page+page_size defaults 1/25, cap 100, reject <1, 502 on adapter error
- POST /api/simulations/<sid>/c2/import: idempotent per (sim,mythic_id) pair, source=import, completed tasks get output+mapping_applied, incomplete tasks stored for poll-on-read pickup, auto-transition pending→in_progress

60 new tests (456 total); pytest/ruff/mypy all green

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
168 lines
5.7 KiB
Python
168 lines
5.7 KiB
Python
"""MythicAdapter M4 tests — list_callback_tasks, mocked HTTP."""
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
import requests
|
|
import requests_mock as rm_module
|
|
|
|
from backend.app.services.c2.adapter import C2Error, C2HistoricalTask
|
|
from backend.app.services.c2.mythic import MythicAdapter
|
|
|
|
_BASE_URL = "https://mythic.lab:7443"
|
|
_GQL_URL = _BASE_URL + "/graphql"
|
|
_TOKEN = "fake-api-token"
|
|
|
|
|
|
@pytest.fixture()
def adapter():
    """Build a MythicAdapter from the module's test URL and token, with verify_tls=False."""
    mythic = MythicAdapter(url=_BASE_URL, api_token=_TOKEN, verify_tls=False)
    return mythic
|
|
|
|
|
|
def _task_list_payload(tasks: list[dict]) -> dict:
|
|
return {"data": {"task": tasks}}
|
|
|
|
|
|
def _count_payload(count: int) -> dict:
|
|
return {"data": {"task_aggregate": {"aggregate": {"count": count}}}}
|
|
|
|
|
|
class TestMythicAdapterListCallbackTasks:
    """Exercise ``list_callback_tasks`` against a mocked GraphQL endpoint."""

    @staticmethod
    def _queue_list_then_count(mock, rows, total):
        """Register two sequential POST responses on *mock*: task list first, count second."""
        responses = [
            {"json": _task_list_payload(rows)},
            {"json": _count_payload(total)},
        ]
        mock.post(_GQL_URL, responses)

    def test_returns_tasks_from_graphql(self, adapter):
        """One row in the mocked response comes back as one C2HistoricalTask."""
        row = {
            "display_id": 7,
            "command_name": "whoami",
            "params": "",
            "status": "completed",
            "completed": True,
            "timestamp": "2026-06-10T12:00:00Z",
        }

        with rm_module.Mocker() as mock:
            self._queue_list_then_count(mock, [row], 1)
            result = adapter.list_callback_tasks(callback_display_id=1, page=1, page_size=25)

        assert result.total == 1
        assert len(result.items) == 1

        first = result.items[0]
        assert isinstance(first, C2HistoricalTask)
        assert first.display_id == 7
        assert first.command == "whoami"
        assert first.completed is True

    def test_pagination_offset_calculation(self, adapter):
        """page=2, page_size=10 → offset=10 must be sent to Mythic."""
        with rm_module.Mocker() as mock:
            self._queue_list_then_count(mock, [], 0)
            adapter.list_callback_tasks(callback_display_id=1, page=2, page_size=10)

        # The first recorded request is the task list; its variables carry the paging.
        sent_variables = mock.request_history[0].json().get("variables", {})

        assert sent_variables.get("offset") == 10
        assert sent_variables.get("limit") == 10

    def test_sends_apitoken_header(self, adapter):
        """Every request issued by the call carries the configured token header."""
        with rm_module.Mocker() as mock:
            self._queue_list_then_count(mock, [], 0)
            adapter.list_callback_tasks(callback_display_id=1)
            seen_tokens = [req.headers.get("apitoken") for req in mock.request_history]

        assert all(token == _TOKEN for token in seen_tokens)

    def test_empty_task_list(self, adapter):
        """No rows and a zero count yield an empty page."""
        with rm_module.Mocker() as mock:
            self._queue_list_then_count(mock, [], 0)
            result = adapter.list_callback_tasks(callback_display_id=1)

        assert result.total == 0
        assert result.items == []

    def test_network_error_raises_c2error(self, adapter):
        """A connection failure on the POST is reported as C2Error."""
        failure = requests.exceptions.ConnectionError("refused")
        with rm_module.Mocker() as mock:
            mock.post(_GQL_URL, exc=failure)
            with pytest.raises(C2Error):
                adapter.list_callback_tasks(callback_display_id=1)

    def test_http_error_raises_c2error(self, adapter):
        """An HTTP 500 response is reported as C2Error."""
        with rm_module.Mocker() as mock:
            mock.post(_GQL_URL, status_code=500, text="error")
            with pytest.raises(C2Error):
                adapter.list_callback_tasks(callback_display_id=1)

    def test_no_redirect_followed(self, adapter):
        """A 301 answer raises C2Error and every recorded request is still a POST."""
        redirect_headers = {"Location": "https://evil.example/"}
        with rm_module.Mocker() as mock:
            mock.post(_GQL_URL, status_code=301, headers=redirect_headers)
            with pytest.raises(C2Error):
                adapter.list_callback_tasks(callback_display_id=1)

        # Both requests (tasks + count) should each only make one attempt.
        recorded_methods = [req.method for req in mock.request_history]
        assert all(method == "POST" for method in recorded_methods)

    def test_page_and_page_size_in_response(self, adapter):
        """The returned page echoes the requested page/page_size and the mocked total."""
        with rm_module.Mocker() as mock:
            self._queue_list_then_count(mock, [], 50)
            result = adapter.list_callback_tasks(callback_display_id=1, page=3, page_size=10)

        assert result.page == 3
        assert result.page_size == 10
        assert result.total == 50
|
|
|
|
|
|
class TestMythicAdapterGetTaskCommandField:
    """Ensure command_name is surfaced via get_task() C2TaskStatus.command."""

    @staticmethod
    def _get_task_7_with_row(adapter, row):
        """Serve *row* as the only task in the mocked response and return ``adapter.get_task(7)``."""
        body = {"data": {"task": [row]}}
        with rm_module.Mocker() as mock:
            mock.post(_GQL_URL, json=body)
            return adapter.get_task(7)

    def test_get_task_returns_command(self, adapter):
        """A string command_name in the response appears as status.command."""
        row = {
            "display_id": 7,
            "command_name": "shell",
            "status": "completed",
            "completed": True,
            "timestamp": "2026-06-10T12:00:00Z",
        }

        result = self._get_task_7_with_row(adapter, row)

        assert result.command == "shell"

    def test_get_task_command_none_when_missing(self, adapter):
        """A null command_name in the response appears as status.command being None."""
        row = {
            "display_id": 7,
            "command_name": None,
            "status": "submitted",
            "completed": False,
            "timestamp": None,
        }

        result = self._get_task_7_with_row(adapter, row)

        assert result.command is None
|