- Add C2Error exception to adapter ABC
- Add promote_to_in_progress() helper to simulation_workflow (pending→in_progress)
- Flesh out MythicAdapter: list_callbacks() (GraphQL query) + create_task() (mutation)
- Expand FakeAdapter to 3 deterministic callbacks; switch task store to per-instance
- Add GET /api/engagements/<id>/c2/callbacks — lists active callbacks via adapter
- Add POST /api/simulations/<id>/c2/execute — issues tasks, stores C2Task rows, auto-transitions pending→in_progress, blocks on done (409)
- Both endpoints: SOC=403, 503 no-key, 502 adapter error, sanitized error messages
- Add requests-mock==1.12.1 to requirements.txt
- 42 new tests (342 total, 300 M1 baseline preserved green)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
138 lines
4.9 KiB
Python
138 lines
4.9 KiB
Python
"""MythicAdapter unit tests — mocked HTTP with requests-mock."""
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
import requests
|
|
import requests_mock as rm_module
|
|
|
|
from backend.app.services.c2.adapter import C2Error
|
|
from backend.app.services.c2.mythic import MythicAdapter
|
|
|
|
# Placeholder Mythic endpoint and credential shared by every test below.
# All HTTP traffic is intercepted by requests-mock, so nothing is contacted.
_BASE_URL = "https://mythic.lab:7443"
_GQL_URL = f"{_BASE_URL}/graphql"
_TOKEN = "fake-api-token"
|
|
|
|
|
|
@pytest.fixture()
def adapter():
    """Provide a MythicAdapter aimed at the placeholder URL with ``verify_tls=False``."""
    mythic = MythicAdapter(url=_BASE_URL, api_token=_TOKEN, verify_tls=False)
    return mythic
|
|
|
|
|
|
class TestMythicAdapterListCallbacks:
    """Exercise ``MythicAdapter.list_callbacks()`` against a mocked GraphQL endpoint."""

    def test_returns_callbacks_from_graphql(self, adapter):
        """One callback row in the GraphQL reply yields one parsed callback object."""
        payload = {
            "data": {
                "callback": [
                    {
                        "id": 1,
                        "display_id": 1,
                        "active": True,
                        "host": "HOST-01",
                        "user": "jdoe",
                        "domain": "LAB",
                        "last_checkin": "2026-06-10T00:00:00Z",
                    }
                ]
            }
        }
        with rm_module.Mocker() as m:
            m.post(_GQL_URL, json=payload)
            callbacks = adapter.list_callbacks()

        assert len(callbacks) == 1
        assert callbacks[0].display_id == 1
        assert callbacks[0].host == "HOST-01"
        assert callbacks[0].user == "jdoe"

    def test_sends_apitoken_header(self, adapter):
        """The configured API token is sent in the ``apitoken`` request header."""
        payload = {"data": {"callback": []}}
        with rm_module.Mocker() as m:
            m.post(_GQL_URL, json=payload)
            adapter.list_callbacks()
            sent_headers = m.last_request.headers

        assert sent_headers.get("apitoken") == _TOKEN

    def test_verify_tls_flag_passed(self):
        """The adapter's ``verify_tls`` setting must reach requests as ``verify``.

        requests-mock intercepts the call before any TLS handshake, but it
        records the transport options of each captured request, so the
        forwarded ``verify`` value can be inspected directly instead of only
        checking that no error path was triggered.
        """
        payload = {"data": {"callback": []}}

        adapter_tls = MythicAdapter(url=_BASE_URL, api_token=_TOKEN, verify_tls=True)
        with rm_module.Mocker() as m:
            m.post(_GQL_URL, json=payload)
            callbacks = adapter_tls.list_callbacks()
            assert isinstance(callbacks, list)
            # Truthiness rather than ``is True``: requests may replace True with
            # a CA-bundle path taken from the environment.
            assert m.last_request.verify

        # The opposite setting must also be honoured, otherwise the check above
        # would pass trivially on requests' own default of verify=True.
        adapter_no_tls = MythicAdapter(url=_BASE_URL, api_token=_TOKEN, verify_tls=False)
        with rm_module.Mocker() as m:
            m.post(_GQL_URL, json=payload)
            adapter_no_tls.list_callbacks()
            assert m.last_request.verify is False

    def test_network_error_raises_c2error(self, adapter):
        """A transport-level ``ConnectionError`` surfaces as ``C2Error``."""
        with rm_module.Mocker() as m:
            m.post(_GQL_URL, exc=requests.exceptions.ConnectionError("connection refused"))
            with pytest.raises(C2Error):
                adapter.list_callbacks()

    def test_http_error_raises_c2error(self, adapter):
        """An HTTP 500 reply surfaces as ``C2Error``."""
        with rm_module.Mocker() as m:
            m.post(_GQL_URL, status_code=500, text="Internal Server Error")
            with pytest.raises(C2Error):
                adapter.list_callbacks()
|
|
|
|
|
|
class TestMythicAdapterCreateTask:
    """Exercise ``MythicAdapter.create_task()`` against a mocked GraphQL endpoint."""

    @staticmethod
    def _create_task_reply(task_id, display_id, error):
        """Build the mocked GraphQL reply body for a ``createTask`` mutation."""
        task_fields = {"id": task_id, "display_id": display_id, "error": error}
        return {"data": {"createTask": task_fields}}

    def test_returns_display_id_on_success(self, adapter):
        """On success the task's ``display_id`` (not its ``id``) is returned."""
        reply = self._create_task_reply(42, 7, None)
        with rm_module.Mocker() as mocker:
            mocker.post(_GQL_URL, json=reply)
            task_display_id = adapter.create_task(callback_display_id=1, command="whoami")

        assert task_display_id == 7

    def test_sends_apitoken_header(self, adapter):
        """The configured API token is sent in the ``apitoken`` request header."""
        reply = self._create_task_reply(1, 1, None)
        with rm_module.Mocker() as mocker:
            mocker.post(_GQL_URL, json=reply)
            adapter.create_task(1, "cmd")
            request_headers = mocker.last_request.headers

        assert request_headers.get("apitoken") == _TOKEN

    def test_error_field_raises_c2error(self, adapter):
        """A populated ``error`` field in the reply is raised as ``C2Error``."""
        reply = self._create_task_reply(None, None, "callback not found")
        with rm_module.Mocker() as mocker:
            mocker.post(_GQL_URL, json=reply)
            with pytest.raises(C2Error, match="callback not found"):
                adapter.create_task(1, "whoami")

    def test_network_error_raises_c2error(self, adapter):
        """A transport-level ``Timeout`` surfaces as ``C2Error``."""
        timeout_exc = requests.exceptions.Timeout("timeout")
        with rm_module.Mocker() as mocker:
            mocker.post(_GQL_URL, exc=timeout_exc)
            with pytest.raises(C2Error):
                adapter.create_task(1, "whoami")
|
|
|
|
|
|
class TestMythicAdapterNoRedirects:
    """Guard against the adapter chasing redirects away from the Mythic server."""

    def test_does_not_follow_redirect(self, adapter):
        """Adapter must not follow HTTP redirects (allow_redirects=False)."""
        redirect_headers = {"Location": "https://evil.example/graphql"}
        with rm_module.Mocker() as mocker:
            # Answer the GraphQL POST with a 301 that points at a foreign host.
            mocker.post(_GQL_URL, status_code=301, headers=redirect_headers)

            # The redirect must be reported as a failure, not followed.
            with pytest.raises(C2Error):
                adapter.list_callbacks()

            # Only the original POST was recorded — nothing was sent to Location.
            recorded = mocker.request_history
            assert len(recorded) == 1
|