2026-06-10 19:34:18 +02:00
|
|
|
"""MythicAdapter unit tests — mocked HTTP with requests-mock."""
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
import requests
|
|
|
|
|
import requests_mock as rm_module
|
|
|
|
|
|
|
|
|
|
from backend.app.services.c2.adapter import C2Error
|
|
|
|
|
from backend.app.services.c2.mythic import MythicAdapter
|
|
|
|
|
|
|
|
|
|
# Base address handed to every MythicAdapter built in this module.
_BASE_URL = "https://mythic.lab:7443"
# GraphQL endpoint on which the tests register their mocked POST responses.
_GQL_URL = f"{_BASE_URL}/graphql"
# Placeholder credential; the header tests compare the sent "apitoken" to it.
_TOKEN = "fake-api-token"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture()
def adapter() -> MythicAdapter:
    """Build a MythicAdapter aimed at the mocked base URL.

    TLS verification is switched off and the module's placeholder API token
    is used, so each test gets an identically configured instance.
    """
    instance = MythicAdapter(
        url=_BASE_URL,
        api_token=_TOKEN,
        verify_tls=False,
    )
    return instance
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestMythicAdapterListCallbacks:
    """Exercise ``MythicAdapter.list_callbacks`` against a mocked GraphQL endpoint."""

    def test_returns_callbacks_from_graphql(self, adapter):
        """One callback row in the GraphQL reply yields one parsed callback."""
        payload = {
            "data": {
                "callback": [
                    {
                        "id": 1,
                        "display_id": 1,
                        "active": True,
                        "host": "HOST-01",
                        "user": "jdoe",
                        "domain": "LAB",
                        "last_checkin": "2026-06-10T00:00:00Z",
                    }
                ]
            }
        }
        with rm_module.Mocker() as m:
            m.post(_GQL_URL, json=payload)
            callbacks = adapter.list_callbacks()

        assert len(callbacks) == 1
        assert callbacks[0].display_id == 1
        assert callbacks[0].host == "HOST-01"
        assert callbacks[0].user == "jdoe"

    def test_sends_apitoken_header(self, adapter):
        """The configured API token is sent in the ``apitoken`` request header."""
        payload = {"data": {"callback": []}}
        with rm_module.Mocker() as m:
            m.post(_GQL_URL, json=payload)
            adapter.list_callbacks()
            sent_headers = m.last_request.headers

        assert sent_headers.get("apitoken") == _TOKEN

    def test_verify_tls_flag_passed(self):
        """Adapter with verify_tls=True should pass verify=True to requests."""
        adapter_tls = MythicAdapter(url=_BASE_URL, api_token=_TOKEN, verify_tls=True)
        payload = {"data": {"callback": []}}
        # requests-mock intercepts before any TLS handshake, but it records the
        # transport settings of each captured request, so the flag is checkable.
        with rm_module.Mocker() as m:
            m.post(_GQL_URL, json=payload)
            callbacks = adapter_tls.list_callbacks()
            verify_setting = m.last_request.verify

        assert isinstance(callbacks, list)
        # Truthiness rather than ``is True``: requests replaces True with a
        # CA-bundle path when REQUESTS_CA_BUNDLE / CURL_CA_BUNDLE is set.
        assert verify_setting

    def test_network_error_raises_c2error(self, adapter):
        """A transport-level ConnectionError is surfaced as C2Error."""
        with rm_module.Mocker() as m:
            m.post(_GQL_URL, exc=requests.exceptions.ConnectionError("connection refused"))
            with pytest.raises(C2Error):
                adapter.list_callbacks()

    def test_http_error_raises_c2error(self, adapter):
        """An HTTP 500 reply is surfaced as C2Error."""
        with rm_module.Mocker() as m:
            m.post(_GQL_URL, status_code=500, text="Internal Server Error")
            with pytest.raises(C2Error):
                adapter.list_callbacks()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestMythicAdapterCreateTask:
    """Exercise ``MythicAdapter.create_task`` against a mocked GraphQL endpoint."""

    @staticmethod
    def _reply(task_id, display_id, error):
        """Wrap the given fields in the ``createTask`` GraphQL response envelope."""
        task_fields = {"id": task_id, "display_id": display_id, "error": error}
        return {"data": {"createTask": task_fields}}

    def test_returns_display_id_on_success(self, adapter):
        """The ``display_id`` from the reply is what create_task returns."""
        with rm_module.Mocker() as http_mock:
            http_mock.post(_GQL_URL, json=self._reply(42, 7, None))
            task_display_id = adapter.create_task(callback_display_id=1, command="whoami")

        assert task_display_id == 7

    def test_sends_apitoken_header(self, adapter):
        """The configured API token is sent in the ``apitoken`` request header."""
        with rm_module.Mocker() as http_mock:
            http_mock.post(_GQL_URL, json=self._reply(1, 1, None))
            adapter.create_task(1, "cmd")
            outgoing_headers = http_mock.last_request.headers

        assert outgoing_headers.get("apitoken") == _TOKEN

    def test_error_field_raises_c2error(self, adapter):
        """A populated ``error`` field becomes a C2Error carrying that text."""
        failure_reply = self._reply(None, None, "callback not found")
        with rm_module.Mocker() as http_mock:
            http_mock.post(_GQL_URL, json=failure_reply)
            with pytest.raises(C2Error, match="callback not found"):
                adapter.create_task(1, "whoami")

    def test_network_error_raises_c2error(self, adapter):
        """A transport-level Timeout is surfaced as C2Error."""
        timeout_failure = requests.exceptions.Timeout("timeout")
        with rm_module.Mocker() as http_mock:
            http_mock.post(_GQL_URL, exc=timeout_failure)
            with pytest.raises(C2Error):
                adapter.create_task(1, "whoami")
|
|
|
|
|
|
|
|
|
|
|
2026-06-10 20:28:49 +02:00
|
|
|
class TestMythicAdapterErrorSanitization:
    """Check that C2Error text does not echo the configured server address."""

    def test_connection_error_message_does_not_contain_url(self, adapter):
        """C2Error message must not expose the configured Mythic URL."""
        # Simulated transport failure whose own message embeds the base URL.
        transport_failure = requests.exceptions.ConnectionError(
            f"HTTPSConnectionPool(host='{_BASE_URL}', port=7443): Max retries exceeded"
        )
        with rm_module.Mocker() as http_mock:
            http_mock.post(_GQL_URL, exc=transport_failure)
            with pytest.raises(C2Error) as exc_info:
                adapter.list_callbacks()

        surfaced_message = str(exc_info.value)
        # The URL must be stripped, while the exception class name is kept.
        assert _BASE_URL not in surfaced_message
        assert "ConnectionError" in surfaced_message
|
|
|
|
|
|
|
|
|
|
|
2026-06-10 19:34:18 +02:00
|
|
|
class TestMythicAdapterNoRedirects:
    """Guard against the adapter chasing a redirect away from the configured host."""

    def test_does_not_follow_redirect(self, adapter):
        """Adapter must not follow HTTP redirects (allow_redirects=False)."""
        redirect_headers = {"Location": "https://evil.example/graphql"}
        with rm_module.Mocker() as http_mock:
            # Only the GraphQL URL is registered; it answers with a 301 that
            # points at a foreign host.
            http_mock.post(_GQL_URL, status_code=301, headers=redirect_headers)

            # The un-followed 301 has to surface as a C2Error.
            # NOTE(review): Response.raise_for_status() only raises for
            # 4xx/5xx, so the rejection presumably comes from another check
            # in the adapter — confirm against the adapter source.
            with pytest.raises(C2Error):
                adapter.list_callbacks()

            # Exactly one request was recorded — no follow-up to Location.
            assert len(http_mock.request_history) == 1
|