# File: mimic/backend/tests/test_c2_adapter_mythic_m3.py
# (Repository-viewer residue: 189 lines, 6.1 KiB, Python.)

"""MythicAdapter M3 tests — get_task and get_task_output, mocked HTTP."""
from __future__ import annotations
import pytest
import requests
import requests_mock as rm_module
from backend.app.services.c2.adapter import C2Error
from backend.app.services.c2.mythic import MythicAdapter
# Base URL handed to the adapter under test.
_BASE_URL = "https://mythic.lab:7443"
# GraphQL endpoint the tests register with requests_mock.
_GQL_URL = f"{_BASE_URL}/graphql"
# Placeholder API token; one test asserts it is sent in the "apitoken" header.
_TOKEN = "fake-api-token"
@pytest.fixture()
def adapter():
    """Provide a fresh ``MythicAdapter`` for each test.

    The adapter is built from the module-level base URL and placeholder
    token, with ``verify_tls=False``.
    """
    mythic_adapter = MythicAdapter(
        url=_BASE_URL,
        api_token=_TOKEN,
        verify_tls=False,
    )
    return mythic_adapter
class TestMythicAdapterGetTask:
    """Tests for ``MythicAdapter.get_task`` against a mocked GraphQL endpoint."""

    @staticmethod
    def _single_task_reply(display_id, status, completed, timestamp):
        """Build a GraphQL reply body that contains exactly one task row."""
        task_row = {
            "display_id": display_id,
            "status": status,
            "completed": completed,
            "timestamp": timestamp,
        }
        return {"data": {"task": [task_row]}}

    def test_returns_status_for_incomplete_task(self, adapter):
        """An unfinished task exposes its status and has no completion time."""
        reply = self._single_task_reply(7, "processing", False, None)

        with rm_module.Mocker() as http_mock:
            http_mock.post(_GQL_URL, json=reply)
            task_status = adapter.get_task(7)

        assert task_status.display_id == 7
        assert task_status.status == "processing"
        assert task_status.completed is False
        assert task_status.completed_at is None

    def test_returns_completed_at_for_completed_task(self, adapter):
        """A finished task carries a ``completed_at`` parsed from its timestamp."""
        reply = self._single_task_reply(7, "completed", True, "2026-06-10T12:00:00Z")

        with rm_module.Mocker() as http_mock:
            http_mock.post(_GQL_URL, json=reply)
            task_status = adapter.get_task(7)

        assert task_status.completed is True
        assert task_status.completed_at is not None
        assert task_status.completed_at.year == 2026

    def test_raises_when_task_not_found(self, adapter):
        """An empty task list is reported as a ``C2Error`` matching "not found"."""
        empty_reply = {"data": {"task": []}}

        with rm_module.Mocker() as http_mock:
            http_mock.post(_GQL_URL, json=empty_reply)
            with pytest.raises(C2Error, match="not found"):
                adapter.get_task(999)

    def test_sends_apitoken_header(self, adapter):
        """The request carries the configured token in the ``apitoken`` header."""
        reply = self._single_task_reply(1, "submitted", False, None)

        with rm_module.Mocker() as http_mock:
            http_mock.post(_GQL_URL, json=reply)
            adapter.get_task(1)
            sent_headers = http_mock.last_request.headers
            assert sent_headers.get("apitoken") == _TOKEN

    def test_network_error_raises_c2error(self, adapter):
        """A connection failure from ``requests`` surfaces as ``C2Error``."""
        connection_failure = requests.exceptions.ConnectionError("refused")

        with rm_module.Mocker() as http_mock:
            http_mock.post(_GQL_URL, exc=connection_failure)
            with pytest.raises(C2Error):
                adapter.get_task(1)

    def test_no_redirect_followed(self, adapter):
        """A 301 reply raises ``C2Error`` and exactly one request is recorded."""
        redirect_headers = {"Location": "https://evil.example/"}

        with rm_module.Mocker() as http_mock:
            http_mock.post(_GQL_URL, status_code=301, headers=redirect_headers)
            with pytest.raises(C2Error):
                adapter.get_task(1)
            # Checked after the raising call so the assertion actually runs.
            recorded_requests = http_mock.request_history
            assert len(recorded_requests) == 1

    def test_invalid_timestamp_does_not_crash(self, adapter):
        """A malformed timestamp yields ``completed_at=None`` instead of raising."""
        reply = self._single_task_reply(5, "completed", True, "not-a-date")

        with rm_module.Mocker() as http_mock:
            http_mock.post(_GQL_URL, json=reply)
            task_status = adapter.get_task(5)

        assert task_status.completed is True
        assert task_status.completed_at is None
class TestMythicAdapterGetTaskOutput:
    """Tests for ``MythicAdapter.get_task_output`` against a mocked GraphQL endpoint."""

    @staticmethod
    def _b64_text(raw_bytes):
        """Return *raw_bytes* base64-encoded as a ``str``."""
        import base64

        return base64.b64encode(raw_bytes).decode()

    @staticmethod
    def _responses_reply(*response_texts):
        """Build a GraphQL reply body with one response row per given text."""
        rows = [{"response_text": text} for text in response_texts]
        return {"data": {"response": rows}}

    def test_returns_decoded_output(self, adapter):
        """Base64 response text comes back decoded."""
        reply = self._responses_reply(self._b64_text(b"Administrator\r\n"))

        with rm_module.Mocker() as http_mock:
            http_mock.post(_GQL_URL, json=reply)
            task_output = adapter.get_task_output(7)

        assert "Administrator" in task_output

    def test_concatenates_multiple_responses(self, adapter):
        """Text from every response row appears in the returned output."""
        first_chunk = self._b64_text(b"line one\n")
        second_chunk = self._b64_text(b"line two\n")
        reply = self._responses_reply(first_chunk, second_chunk)

        with rm_module.Mocker() as http_mock:
            http_mock.post(_GQL_URL, json=reply)
            task_output = adapter.get_task_output(7)

        assert "line one" in task_output
        assert "line two" in task_output

    def test_returns_empty_string_when_no_responses(self, adapter):
        """An empty response list produces an empty string."""
        reply = self._responses_reply()

        with rm_module.Mocker() as http_mock:
            http_mock.post(_GQL_URL, json=reply)
            task_output = adapter.get_task_output(7)

        assert task_output == ""

    def test_skips_empty_response_text(self, adapter):
        """A row with empty ``response_text`` contributes nothing to the output."""
        reply = self._responses_reply("", self._b64_text(b"real output"))

        with rm_module.Mocker() as http_mock:
            http_mock.post(_GQL_URL, json=reply)
            task_output = adapter.get_task_output(7)

        assert task_output == "real output"

    def test_network_error_raises_c2error(self, adapter):
        """A timeout from ``requests`` surfaces as ``C2Error``."""
        timeout_failure = requests.exceptions.Timeout("timeout")

        with rm_module.Mocker() as http_mock:
            http_mock.post(_GQL_URL, exc=timeout_failure)
            with pytest.raises(C2Error):
                adapter.get_task_output(7)

    def test_no_redirect_followed(self, adapter):
        """A 302 reply raises ``C2Error`` and exactly one request is recorded."""
        redirect_headers = {"Location": "https://evil.example/"}

        with rm_module.Mocker() as http_mock:
            http_mock.post(_GQL_URL, status_code=302, headers=redirect_headers)
            with pytest.raises(C2Error):
                adapter.get_task_output(1)
            # Checked after the raising call so the assertion actually runs.
            recorded_requests = http_mock.request_history
            assert len(recorded_requests) == 1