- Add C2Error exception to adapter ABC
- Add promote_to_in_progress() helper to simulation_workflow (pending→in_progress)
- Flesh out MythicAdapter: list_callbacks() (GraphQL query) + create_task() (mutation)
- Expand FakeAdapter to 3 deterministic callbacks; switch task store to per-instance
- Add GET /api/engagements/<id>/c2/callbacks — lists active callbacks via adapter
- Add POST /api/simulations/<id>/c2/execute — issues tasks, stores C2Task rows, auto-transitions pending→in_progress, blocks on done (409)
- Both endpoints: SOC=403, 503 no-key, 502 adapter error, sanitized error messages
- Add requests-mock==1.12.1 to requirements.txt
- 42 new tests (342 total, 300 M1 baseline preserved green)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
325 lines
11 KiB
Python
325 lines
11 KiB
Python
"""Tests for POST /api/simulations/<id>/c2/execute."""
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
from cryptography.fernet import Fernet
|
|
from flask import Flask
|
|
from flask.testing import FlaskClient
|
|
|
|
from backend.app.extensions import db
|
|
from backend.app.models.c2_task import C2Task
|
|
from backend.app.models.simulation import Simulation, SimulationStatus
|
|
from backend.app.services.c2.adapter import C2Error
|
|
from backend.tests.conftest import auth_headers as _h
|
|
|
|
# Fernet key generated once when this module is imported; the
# set_encryption_key fixture exports it as MIMIC_ENCRYPTION_KEY for each test.
_FERNET_KEY = Fernet.generate_key().decode()
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def set_encryption_key(monkeypatch):
    """Export the module-level Fernet key as MIMIC_ENCRYPTION_KEY for every test.

    The variable is set through ``monkeypatch``, so pytest restores the
    previous environment when the test finishes.
    """
    monkeypatch.setenv("MIMIC_ENCRYPTION_KEY", _FERNET_KEY)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def use_fake_adapter(monkeypatch):
    """Set MIMIC_C2_ADAPTER to ``"fake"`` for every test in this module.

    The variable is set through ``monkeypatch``, so pytest restores the
    previous environment when the test finishes.
    """
    monkeypatch.setenv("MIMIC_C2_ADAPTER", "fake")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_engagement(client: FlaskClient, token: str) -> dict:
    """Create the "Op Alpha" engagement through the API and return its JSON body.

    Asserts that ``POST /api/engagements`` answers 201 before returning.
    """
    payload = {"name": "Op Alpha", "start_date": "2026-06-10"}
    created = client.post("/api/engagements", headers=_h(token), json=payload)
    assert created.status_code == 201
    return created.get_json()
|
|
|
|
|
|
def _put_config(client: FlaskClient, token: str, eid: int) -> None:
    """Store a fixed C2 configuration for engagement ``eid``.

    Asserts that ``PUT /api/engagements/<eid>/c2-config`` answers 200.
    """
    c2_settings = {
        "url": "https://c2.internal:7443",
        "api_token": "s3cr3t",
        "verify_tls": True,
    }
    saved = client.put(
        f"/api/engagements/{eid}/c2-config",
        headers=_h(token),
        json=c2_settings,
    )
    assert saved.status_code == 200
|
|
|
|
|
|
def _make_sim(client: FlaskClient, token: str, eid: int) -> dict:
    """Create the "Sim Alpha" simulation under engagement ``eid``.

    Asserts that the creation request answers 201 and returns the JSON body
    of the new simulation.
    """
    endpoint = f"/api/engagements/{eid}/simulations"
    created = client.post(endpoint, headers=_h(token), json={"name": "Sim Alpha"})
    assert created.status_code == 201
    return created.get_json()
|
|
|
|
|
|
def _execute(
    client: FlaskClient,
    token: str,
    sid: int,
    commands: list,
    callback_display_id: int = 1,
):
    """POST ``commands`` to the execute endpoint of simulation ``sid``.

    The response is returned unchecked so that each test can assert on the
    status code and body it expects.
    """
    endpoint = f"/api/simulations/{sid}/c2/execute"
    request_body = {"callback_display_id": callback_display_id, "commands": commands}
    return client.post(endpoint, headers=_h(token), json=request_body)
|
|
|
|
|
|
def _advance_to_in_progress(client: FlaskClient, token: str, sid: int) -> None:
    """PATCH simulation ``sid`` as the setup step for an in-progress simulation.

    The request only re-sends the name "Sim Alpha".
    NOTE(review): the helper name implies the API promotes a pending
    simulation to in_progress on such an edit; that promotion happens
    server-side and is not visible from this file.

    Raises:
        AssertionError: if the PATCH does not answer with a 2xx status, so a
            broken setup step fails loudly instead of leaving the simulation
            in an unexpected state.
    """
    resp = client.patch(
        f"/api/simulations/{sid}",
        headers=_h(token),
        json={"name": "Sim Alpha"},
    )
    assert 200 <= resp.status_code < 300, (
        f"PATCH /api/simulations/{sid} failed with HTTP {resp.status_code}"
    )
|
|
|
|
|
|
def _advance_to_review_required(client: FlaskClient, token: str, sid: int) -> None:
    """Move simulation ``sid`` to ``review_required``.

    Runs ``_advance_to_in_progress`` first, then posts
    ``{"to": "review_required"}`` to the simulation's transition endpoint.

    Raises:
        AssertionError: if the transition request does not answer with a 2xx
            status, so a rejected transition cannot go unnoticed.
    """
    _advance_to_in_progress(client, token, sid)
    resp = client.post(
        f"/api/simulations/{sid}/transition",
        headers=_h(token),
        json={"to": "review_required"},
    )
    assert 200 <= resp.status_code < 300, (
        f"transition of simulation {sid} to review_required failed "
        f"with HTTP {resp.status_code}"
    )
|
|
|
|
|
|
def _advance_to_done(client: FlaskClient, redteam_token: str, soc_token: str, sid: int) -> None:
    """Move simulation ``sid`` to ``done``.

    ``redteam_token`` is used to reach ``review_required`` (via
    ``_advance_to_review_required``); ``soc_token`` is then used to post
    ``{"to": "done"}`` to the transition endpoint.

    Raises:
        AssertionError: if the final transition request does not answer with
            a 2xx status, so a rejected transition cannot go unnoticed.
    """
    _advance_to_review_required(client, redteam_token, sid)
    resp = client.post(
        f"/api/simulations/{sid}/transition",
        headers=_h(soc_token),
        json={"to": "done"},
    )
    assert 200 <= resp.status_code < 300, (
        f"transition of simulation {sid} to done failed "
        f"with HTTP {resp.status_code}"
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Happy path
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestExecuteHappyPath:
    """Successful calls to POST /api/simulations/<id>/c2/execute.

    Every test creates an engagement, stores a C2 config for it and creates a
    simulation before issuing commands. Each test asserts the 200 status
    before inspecting the body or the database, so a failing request is
    reported as such rather than as a KeyError or a missing row.
    """

    def test_two_commands_create_two_tasks(
        self, app: Flask, client: FlaskClient, admin_token: str
    ) -> None:
        """Two commands yield two tasks in the response and two C2Task rows."""
        eng = _make_engagement(client, admin_token)
        _put_config(client, admin_token, eng["id"])
        sim = _make_sim(client, admin_token, eng["id"])

        resp = _execute(client, admin_token, sim["id"], ["whoami", "ipconfig"])
        assert resp.status_code == 200
        body = resp.get_json()
        assert len(body["tasks"]) == 2
        # Tasks come back in the order the commands were submitted.
        assert body["tasks"][0]["command"] == "whoami"
        assert body["tasks"][1]["command"] == "ipconfig"

        with app.app_context():
            rows = C2Task.query.filter_by(simulation_id=sim["id"]).all()
            assert len(rows) == 2

    def test_task_response_shape(
        self, client: FlaskClient, admin_token: str
    ) -> None:
        """Each returned task exposes the expected set of keys."""
        eng = _make_engagement(client, admin_token)
        _put_config(client, admin_token, eng["id"])
        sim = _make_sim(client, admin_token, eng["id"])

        resp = _execute(client, admin_token, sim["id"], ["hostname"])
        assert resp.status_code == 200
        task = resp.get_json()["tasks"][0]
        assert "id" in task
        assert "mythic_task_display_id" in task
        assert "command" in task
        assert "status" in task
        assert "completed" in task

    def test_pending_sim_transitions_to_in_progress(
        self, app: Flask, client: FlaskClient, admin_token: str
    ) -> None:
        """Executing against a pending simulation moves it to in_progress."""
        eng = _make_engagement(client, admin_token)
        _put_config(client, admin_token, eng["id"])
        sim = _make_sim(client, admin_token, eng["id"])
        assert sim["status"] == "pending"

        resp = _execute(client, admin_token, sim["id"], ["whoami"])
        assert resp.status_code == 200

        with app.app_context():
            updated = db.session.get(Simulation, sim["id"])
            assert updated is not None
            assert updated.status == SimulationStatus.IN_PROGRESS

    def test_already_in_progress_stays_in_progress(
        self, app: Flask, client: FlaskClient, admin_token: str
    ) -> None:
        """Executing after the in-progress setup step leaves status in_progress."""
        eng = _make_engagement(client, admin_token)
        _put_config(client, admin_token, eng["id"])
        sim = _make_sim(client, admin_token, eng["id"])
        _advance_to_in_progress(client, admin_token, sim["id"])

        resp = _execute(client, admin_token, sim["id"], ["whoami"])
        assert resp.status_code == 200

        with app.app_context():
            updated = db.session.get(Simulation, sim["id"])
            assert updated is not None
            assert updated.status == SimulationStatus.IN_PROGRESS

    def test_review_required_sim_still_allowed(
        self, app: Flask, client: FlaskClient, admin_token: str, soc_token: str
    ) -> None:
        """Execution is accepted in review_required and does not change status.

        ``soc_token`` is requested as a fixture but not referenced in the body.
        """
        eng = _make_engagement(client, admin_token)
        _put_config(client, admin_token, eng["id"])
        sim = _make_sim(client, admin_token, eng["id"])
        _advance_to_review_required(client, admin_token, sim["id"])

        resp = _execute(client, admin_token, sim["id"], ["net use"])
        assert resp.status_code == 200

        # Status stays review_required — no regression to in_progress.
        with app.app_context():
            updated = db.session.get(Simulation, sim["id"])
            assert updated is not None
            assert updated.status == SimulationStatus.REVIEW_REQUIRED

    def test_redteam_can_execute(
        self, client: FlaskClient, admin_token: str, redteam_token: str
    ) -> None:
        """A red-team token may execute against a simulation set up by an admin."""
        eng = _make_engagement(client, admin_token)
        _put_config(client, admin_token, eng["id"])
        sim = _make_sim(client, admin_token, eng["id"])

        resp = _execute(client, redteam_token, sim["id"], ["whoami"])
        assert resp.status_code == 200

    def test_mythic_task_display_id_stored(
        self, app: Flask, client: FlaskClient, admin_token: str
    ) -> None:
        """The adapter's task display id is persisted on the C2Task row."""
        eng = _make_engagement(client, admin_token)
        _put_config(client, admin_token, eng["id"])
        sim = _make_sim(client, admin_token, eng["id"])

        resp = _execute(client, admin_token, sim["id"], ["whoami"])
        assert resp.status_code == 200

        with app.app_context():
            task = C2Task.query.filter_by(simulation_id=sim["id"]).first()
            assert task is not None
            assert task.mythic_task_display_id == 1000  # FakeAdapter starts at 1000
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Error cases
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestExecuteValidation:
    """Rejected calls to POST /api/simulations/<id>/c2/execute.

    Covers malformed payloads (400), forbidden role (403), missing
    simulation or C2 config (404), finished simulations (409), adapter
    failures (502) and a missing encryption key (503).
    """

    def test_400_empty_commands(
        self, client: FlaskClient, admin_token: str
    ) -> None:
        """An empty command list is rejected with 400."""
        engagement = _make_engagement(client, admin_token)
        _put_config(client, admin_token, engagement["id"])
        simulation = _make_sim(client, admin_token, engagement["id"])

        response = _execute(client, admin_token, simulation["id"], [])
        assert response.status_code == 400

    def test_400_non_string_command(
        self, client: FlaskClient, admin_token: str
    ) -> None:
        """A command that is not a string is rejected with 400."""
        engagement = _make_engagement(client, admin_token)
        _put_config(client, admin_token, engagement["id"])
        simulation = _make_sim(client, admin_token, engagement["id"])

        # _execute sends callback_display_id=1 by default, which matches the
        # payload this case needs: {"callback_display_id": 1, "commands": [42]}.
        response = _execute(client, admin_token, simulation["id"], [42])
        assert response.status_code == 400

    def test_400_missing_callback_display_id(
        self, client: FlaskClient, admin_token: str
    ) -> None:
        """A payload without callback_display_id is rejected with 400."""
        engagement = _make_engagement(client, admin_token)
        _put_config(client, admin_token, engagement["id"])
        simulation = _make_sim(client, admin_token, engagement["id"])

        # Posted by hand because _execute always includes callback_display_id.
        endpoint = f"/api/simulations/{simulation['id']}/c2/execute"
        response = client.post(
            endpoint,
            headers=_h(admin_token),
            json={"commands": ["whoami"]},
        )
        assert response.status_code == 400

    def test_409_done_sim(
        self,
        client: FlaskClient,
        admin_token: str,
        soc_token: str,
    ) -> None:
        """Executing against a done simulation answers 409 and mentions "done"."""
        engagement = _make_engagement(client, admin_token)
        _put_config(client, admin_token, engagement["id"])
        simulation = _make_sim(client, admin_token, engagement["id"])
        _advance_to_done(client, admin_token, soc_token, simulation["id"])

        response = _execute(client, admin_token, simulation["id"], ["whoami"])
        assert response.status_code == 409
        error_text = response.get_json().get("error", "")
        assert "done" in error_text.lower()

    def test_404_simulation_not_found(
        self, client: FlaskClient, admin_token: str
    ) -> None:
        """An id with no simulation behind it (9999, none created here) answers 404."""
        response = _execute(client, admin_token, 9999, ["whoami"])
        assert response.status_code == 404

    def test_404_no_c2_config(
        self, client: FlaskClient, admin_token: str
    ) -> None:
        """A simulation whose engagement has no stored C2 config answers 404."""
        engagement = _make_engagement(client, admin_token)
        simulation = _make_sim(client, admin_token, engagement["id"])

        response = _execute(client, admin_token, simulation["id"], ["whoami"])
        assert response.status_code == 404

    def test_403_soc(
        self, client: FlaskClient, admin_token: str, soc_token: str
    ) -> None:
        """A SOC token is not allowed to execute commands."""
        engagement = _make_engagement(client, admin_token)
        _put_config(client, admin_token, engagement["id"])
        simulation = _make_sim(client, admin_token, engagement["id"])

        response = _execute(client, soc_token, simulation["id"], ["whoami"])
        assert response.status_code == 403

    def test_503_no_key(
        self, monkeypatch, client: FlaskClient, admin_token: str
    ) -> None:
        """Without MIMIC_ENCRYPTION_KEY in the environment the endpoint answers 503."""
        # Undo the autouse fixture's key before any request is made; no C2
        # config is stored in this test.
        monkeypatch.delenv("MIMIC_ENCRYPTION_KEY", raising=False)
        engagement = _make_engagement(client, admin_token)
        simulation = _make_sim(client, admin_token, engagement["id"])

        response = _execute(client, admin_token, simulation["id"], ["whoami"])
        assert response.status_code == 503

    def test_502_adapter_error(
        self, monkeypatch, client: FlaskClient, admin_token: str
    ) -> None:
        """A C2Error raised by the adapter surfaces as 502 with its message."""
        from backend.app.services.c2 import fake as fake_mod

        def _raise_c2_error(self, callback_display_id, command, params=None):
            raise C2Error("task queue full")

        # Replace FakeAdapter.create_task on the class so every call fails.
        monkeypatch.setattr(fake_mod.FakeAdapter, "create_task", _raise_c2_error)

        engagement = _make_engagement(client, admin_token)
        _put_config(client, admin_token, engagement["id"])
        simulation = _make_sim(client, admin_token, engagement["id"])

        response = _execute(client, admin_token, simulation["id"], ["whoami"])
        assert response.status_code == 502
        assert "task queue full" in response.get_json().get("error", "")
|