Command source decision: extended C2TaskStatus with command: str | None (default None). Added command_name to _GET_TASK_QUERY so get_task() returns command in a single round-trip — no separate history fetch needed on import. 4-line change, zero cascading test impact. adapter.py: - C2TaskStatus: add command: str | None = None field - C2HistoricalTask: new dataclass (display_id, command, params, status, completed, timestamp) for history rows - C2TaskPage.items: typed as list[C2HistoricalTask] (was list[dict]) mythic.py: - _GET_TASK_QUERY: add command_name field - _LIST_CALLBACK_TASKS_QUERY: new query (order_by id desc, limit/offset) - _COUNT_CALLBACK_TASKS_QUERY: new aggregate query for total - get_task(): surfaces command_name as status.command - list_callback_tasks(): two _post() calls (tasks + count), allow_redirects=False fake.py: - _FAKE_HISTORY: frozen deterministic history (cb1=12, cb2=0, cb3=5 tasks) - list_callback_tasks(): serves from _FAKE_HISTORY, pagination applied - get_task(): returns command from _tasks dict api/c2.py: - GET /api/engagements/<eid>/c2/callbacks/<cid>/history: page+page_size defaults 1/25, cap 100, reject <1, 502 on adapter error - POST /api/simulations/<sid>/c2/import: idempotent per (sim,mythic_id) pair, source=import, completed tasks get output+mapping_applied, incomplete tasks stored for poll-on-read pickup, auto-transition pending→in_progress 60 new tests (456 total); pytest/ruff/mypy all green Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
216 lines
8.3 KiB
Python
216 lines
8.3 KiB
Python
"""Tests for GET /api/engagements/<id>/c2/callbacks/<cid>/history."""
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
from cryptography.fernet import Fernet
|
|
from flask.testing import FlaskClient
|
|
|
|
from backend.app.services.c2.adapter import C2Error
|
|
from backend.tests.conftest import auth_headers as _h
|
|
|
|
_FERNET_KEY = Fernet.generate_key().decode()
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def set_encryption_key(monkeypatch):
|
|
monkeypatch.setenv("MIMIC_ENCRYPTION_KEY", _FERNET_KEY)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def use_fake_adapter(monkeypatch):
|
|
monkeypatch.setenv("MIMIC_C2_ADAPTER", "fake")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_engagement(client: FlaskClient, token: str) -> dict:
|
|
resp = client.post(
|
|
"/api/engagements",
|
|
headers=_h(token),
|
|
json={"name": "Op Alpha", "start_date": "2026-06-10"},
|
|
)
|
|
assert resp.status_code == 201
|
|
return resp.get_json()
|
|
|
|
|
|
def _put_config(client: FlaskClient, token: str, eid: int) -> None:
|
|
resp = client.put(
|
|
f"/api/engagements/{eid}/c2-config",
|
|
headers=_h(token),
|
|
json={"url": "https://c2.internal:7443", "api_token": "s3cr3t", "verify_tls": True},
|
|
)
|
|
assert resp.status_code == 200
|
|
|
|
|
|
def _history(client: FlaskClient, token: str, eid: int, cid: int, **params):
|
|
return client.get(
|
|
f"/api/engagements/{eid}/c2/callbacks/{cid}/history",
|
|
headers=_h(token),
|
|
query_string=params,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Happy path
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestHistoryHappyPath:
|
|
def test_returns_200(self, client: FlaskClient, admin_token: str) -> None:
|
|
eng = _make_engagement(client, admin_token)
|
|
_put_config(client, admin_token, eng["id"])
|
|
resp = _history(client, admin_token, eng["id"], 1)
|
|
assert resp.status_code == 200
|
|
|
|
def test_response_shape(self, client: FlaskClient, admin_token: str) -> None:
|
|
eng = _make_engagement(client, admin_token)
|
|
_put_config(client, admin_token, eng["id"])
|
|
resp = _history(client, admin_token, eng["id"], 1)
|
|
body = resp.get_json()
|
|
assert "tasks" in body
|
|
assert "total" in body
|
|
assert "page" in body
|
|
assert "page_size" in body
|
|
|
|
def test_task_shape(self, client: FlaskClient, admin_token: str) -> None:
|
|
eng = _make_engagement(client, admin_token)
|
|
_put_config(client, admin_token, eng["id"])
|
|
resp = _history(client, admin_token, eng["id"], 1)
|
|
task = resp.get_json()["tasks"][0]
|
|
for field in ("display_id", "command", "params", "status", "completed", "timestamp"):
|
|
assert field in task, f"missing field: {field}"
|
|
|
|
def test_default_page_is_1(self, client: FlaskClient, admin_token: str) -> None:
|
|
eng = _make_engagement(client, admin_token)
|
|
_put_config(client, admin_token, eng["id"])
|
|
resp = _history(client, admin_token, eng["id"], 1)
|
|
assert resp.get_json()["page"] == 1
|
|
|
|
def test_default_page_size_is_25(self, client: FlaskClient, admin_token: str) -> None:
|
|
eng = _make_engagement(client, admin_token)
|
|
_put_config(client, admin_token, eng["id"])
|
|
resp = _history(client, admin_token, eng["id"], 1)
|
|
assert resp.get_json()["page_size"] == 25
|
|
|
|
def test_callback_1_has_12_total(self, client: FlaskClient, admin_token: str) -> None:
|
|
eng = _make_engagement(client, admin_token)
|
|
_put_config(client, admin_token, eng["id"])
|
|
resp = _history(client, admin_token, eng["id"], 1)
|
|
assert resp.get_json()["total"] == 12
|
|
|
|
def test_callback_2_has_0_tasks(self, client: FlaskClient, admin_token: str) -> None:
|
|
eng = _make_engagement(client, admin_token)
|
|
_put_config(client, admin_token, eng["id"])
|
|
resp = _history(client, admin_token, eng["id"], 2)
|
|
body = resp.get_json()
|
|
assert body["total"] == 0
|
|
assert body["tasks"] == []
|
|
|
|
def test_pagination_page_size_applied(self, client: FlaskClient, admin_token: str) -> None:
|
|
eng = _make_engagement(client, admin_token)
|
|
_put_config(client, admin_token, eng["id"])
|
|
resp = _history(client, admin_token, eng["id"], 1, page=1, page_size=5)
|
|
body = resp.get_json()
|
|
assert len(body["tasks"]) == 5
|
|
assert body["page_size"] == 5
|
|
|
|
def test_redteam_can_view_history(
|
|
self, client: FlaskClient, admin_token: str, redteam_token: str
|
|
) -> None:
|
|
eng = _make_engagement(client, admin_token)
|
|
_put_config(client, admin_token, eng["id"])
|
|
resp = _history(client, redteam_token, eng["id"], 1)
|
|
assert resp.status_code == 200
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Validation errors
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestHistoryValidation:
|
|
def test_400_page_size_too_large(self, client: FlaskClient, admin_token: str) -> None:
|
|
eng = _make_engagement(client, admin_token)
|
|
_put_config(client, admin_token, eng["id"])
|
|
resp = _history(client, admin_token, eng["id"], 1, page_size=101)
|
|
assert resp.status_code == 400
|
|
|
|
def test_400_page_zero(self, client: FlaskClient, admin_token: str) -> None:
|
|
eng = _make_engagement(client, admin_token)
|
|
_put_config(client, admin_token, eng["id"])
|
|
resp = _history(client, admin_token, eng["id"], 1, page=0)
|
|
assert resp.status_code == 400
|
|
|
|
def test_400_page_size_zero(self, client: FlaskClient, admin_token: str) -> None:
|
|
eng = _make_engagement(client, admin_token)
|
|
_put_config(client, admin_token, eng["id"])
|
|
resp = _history(client, admin_token, eng["id"], 1, page_size=0)
|
|
assert resp.status_code == 400
|
|
|
|
def test_400_page_negative(self, client: FlaskClient, admin_token: str) -> None:
|
|
eng = _make_engagement(client, admin_token)
|
|
_put_config(client, admin_token, eng["id"])
|
|
resp = _history(client, admin_token, eng["id"], 1, page=-1)
|
|
assert resp.status_code == 400
|
|
|
|
def test_400_page_size_100_is_ok(self, client: FlaskClient, admin_token: str) -> None:
|
|
eng = _make_engagement(client, admin_token)
|
|
_put_config(client, admin_token, eng["id"])
|
|
resp = _history(client, admin_token, eng["id"], 1, page_size=100)
|
|
assert resp.status_code == 200
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Authorization / error cases
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestHistoryErrors:
|
|
def test_403_soc(
|
|
self, client: FlaskClient, admin_token: str, soc_token: str
|
|
) -> None:
|
|
eng = _make_engagement(client, admin_token)
|
|
_put_config(client, admin_token, eng["id"])
|
|
resp = _history(client, soc_token, eng["id"], 1)
|
|
assert resp.status_code == 403
|
|
|
|
def test_503_no_key(
|
|
self, monkeypatch, client: FlaskClient, admin_token: str
|
|
) -> None:
|
|
monkeypatch.delenv("MIMIC_ENCRYPTION_KEY", raising=False)
|
|
eng = _make_engagement(client, admin_token)
|
|
resp = _history(client, admin_token, eng["id"], 1)
|
|
assert resp.status_code == 503
|
|
|
|
def test_404_engagement_not_found(
|
|
self, client: FlaskClient, admin_token: str
|
|
) -> None:
|
|
resp = _history(client, admin_token, 9999, 1)
|
|
assert resp.status_code == 404
|
|
|
|
def test_404_no_c2_config(
|
|
self, client: FlaskClient, admin_token: str
|
|
) -> None:
|
|
eng = _make_engagement(client, admin_token)
|
|
resp = _history(client, admin_token, eng["id"], 1)
|
|
assert resp.status_code == 404
|
|
|
|
def test_502_adapter_error(
|
|
self, monkeypatch, client: FlaskClient, admin_token: str
|
|
) -> None:
|
|
from backend.app.services.c2 import fake as fake_mod
|
|
|
|
def _boom(self, callback_display_id, page=1, page_size=25):
|
|
raise C2Error("upstream error")
|
|
|
|
monkeypatch.setattr(fake_mod.FakeAdapter, "list_callback_tasks", _boom)
|
|
|
|
eng = _make_engagement(client, admin_token)
|
|
_put_config(client, admin_token, eng["id"])
|
|
resp = _history(client, admin_token, eng["id"], 1)
|
|
assert resp.status_code == 502
|
|
assert "upstream error" in resp.get_json().get("error", "")
|