"""Tests for POST /api/simulations//c2/execute.""" from __future__ import annotations import pytest from cryptography.fernet import Fernet from flask import Flask from flask.testing import FlaskClient from backend.app.extensions import db from backend.app.models.c2_task import C2Task from backend.app.models.simulation import Simulation, SimulationStatus from backend.app.services.c2.adapter import C2Error from backend.tests.conftest import auth_headers as _h _FERNET_KEY = Fernet.generate_key().decode() @pytest.fixture(autouse=True) def set_encryption_key(monkeypatch): monkeypatch.setenv("MIMIC_ENCRYPTION_KEY", _FERNET_KEY) @pytest.fixture(autouse=True) def use_fake_adapter(monkeypatch): monkeypatch.setenv("MIMIC_C2_ADAPTER", "fake") # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_engagement(client: FlaskClient, token: str) -> dict: resp = client.post( "/api/engagements", headers=_h(token), json={"name": "Op Alpha", "start_date": "2026-06-10"}, ) assert resp.status_code == 201 return resp.get_json() def _put_config(client: FlaskClient, token: str, eid: int) -> None: resp = client.put( f"/api/engagements/{eid}/c2-config", headers=_h(token), json={"url": "https://c2.internal:7443", "api_token": "s3cr3t", "verify_tls": True}, ) assert resp.status_code == 200 def _make_sim(client: FlaskClient, token: str, eid: int) -> dict: resp = client.post( f"/api/engagements/{eid}/simulations", headers=_h(token), json={"name": "Sim Alpha"}, ) assert resp.status_code == 201 return resp.get_json() def _execute( client: FlaskClient, token: str, sid: int, commands: list, callback_display_id: int = 1, ): return client.post( f"/api/simulations/{sid}/c2/execute", headers=_h(token), json={"callback_display_id": callback_display_id, "commands": commands}, ) def _advance_to_in_progress(client: FlaskClient, token: str, sid: int) -> None: client.patch( f"/api/simulations/{sid}", headers=_h(token), json={"name": "Sim Alpha"}, ) def _advance_to_review_required(client: FlaskClient, token: str, sid: int) -> None: _advance_to_in_progress(client, token, sid) client.post( f"/api/simulations/{sid}/transition", headers=_h(token), json={"to": "review_required"}, ) def _advance_to_done(client: FlaskClient, redteam_token: str, soc_token: str, sid: int) -> None: _advance_to_review_required(client, redteam_token, sid) client.post( f"/api/simulations/{sid}/transition", headers=_h(soc_token), json={"to": "done"}, ) # --------------------------------------------------------------------------- # Happy path # --------------------------------------------------------------------------- class TestExecuteHappyPath: def test_two_commands_create_two_tasks( self, app: Flask, client: FlaskClient, admin_token: str ) -> None: eng = _make_engagement(client, admin_token) _put_config(client, admin_token, eng["id"]) sim = _make_sim(client, admin_token, eng["id"]) resp = _execute(client, admin_token, sim["id"], ["whoami", "ipconfig"]) assert resp.status_code == 200 body = resp.get_json() assert len(body["tasks"]) == 2 assert body["tasks"][0]["command"] == "whoami" assert body["tasks"][1]["command"] == "ipconfig" with app.app_context(): rows = C2Task.query.filter_by(simulation_id=sim["id"]).all() assert len(rows) == 2 def test_task_response_shape( self, client: FlaskClient, admin_token: str ) -> None: eng = _make_engagement(client, admin_token) _put_config(client, admin_token, eng["id"]) sim = _make_sim(client, admin_token, eng["id"]) resp = _execute(client, admin_token, sim["id"], ["hostname"]) task = resp.get_json()["tasks"][0] assert "id" in task assert "mythic_task_display_id" in task assert "command" in task assert "status" in task assert "completed" in task def test_pending_sim_transitions_to_in_progress( self, app: Flask, client: FlaskClient, admin_token: str ) -> None: eng = _make_engagement(client, admin_token) _put_config(client, admin_token, eng["id"]) sim = _make_sim(client, admin_token, eng["id"]) assert sim["status"] == "pending" _execute(client, admin_token, sim["id"], ["whoami"]) with app.app_context(): updated = db.session.get(Simulation, sim["id"]) assert updated is not None assert updated.status == SimulationStatus.IN_PROGRESS def test_already_in_progress_stays_in_progress( self, app: Flask, client: FlaskClient, admin_token: str ) -> None: eng = _make_engagement(client, admin_token) _put_config(client, admin_token, eng["id"]) sim = _make_sim(client, admin_token, eng["id"]) _advance_to_in_progress(client, admin_token, sim["id"]) resp = _execute(client, admin_token, sim["id"], ["whoami"]) assert resp.status_code == 200 with app.app_context(): updated = db.session.get(Simulation, sim["id"]) assert updated is not None assert updated.status == SimulationStatus.IN_PROGRESS def test_review_required_sim_still_allowed( self, app: Flask, client: FlaskClient, admin_token: str, soc_token: str ) -> None: eng = _make_engagement(client, admin_token) _put_config(client, admin_token, eng["id"]) sim = _make_sim(client, admin_token, eng["id"]) _advance_to_review_required(client, admin_token, sim["id"]) resp = _execute(client, admin_token, sim["id"], ["net use"]) assert resp.status_code == 200 # Status stays review_required — no regression to in_progress. with app.app_context(): updated = db.session.get(Simulation, sim["id"]) assert updated is not None assert updated.status == SimulationStatus.REVIEW_REQUIRED def test_redteam_can_execute( self, client: FlaskClient, admin_token: str, redteam_token: str ) -> None: eng = _make_engagement(client, admin_token) _put_config(client, admin_token, eng["id"]) sim = _make_sim(client, admin_token, eng["id"]) resp = _execute(client, redteam_token, sim["id"], ["whoami"]) assert resp.status_code == 200 def test_mythic_task_display_id_stored( self, app: Flask, client: FlaskClient, admin_token: str ) -> None: eng = _make_engagement(client, admin_token) _put_config(client, admin_token, eng["id"]) sim = _make_sim(client, admin_token, eng["id"]) _execute(client, admin_token, sim["id"], ["whoami"]) with app.app_context(): task = C2Task.query.filter_by(simulation_id=sim["id"]).first() assert task is not None assert task.mythic_task_display_id == 1000 # FakeAdapter starts at 1000 # --------------------------------------------------------------------------- # Error cases # --------------------------------------------------------------------------- class TestExecuteValidation: def test_400_empty_commands( self, client: FlaskClient, admin_token: str ) -> None: eng = _make_engagement(client, admin_token) _put_config(client, admin_token, eng["id"]) sim = _make_sim(client, admin_token, eng["id"]) resp = _execute(client, admin_token, sim["id"], []) assert resp.status_code == 400 def test_400_non_string_command( self, client: FlaskClient, admin_token: str ) -> None: eng = _make_engagement(client, admin_token) _put_config(client, admin_token, eng["id"]) sim = _make_sim(client, admin_token, eng["id"]) resp = client.post( f"/api/simulations/{sim['id']}/c2/execute", headers=_h(admin_token), json={"callback_display_id": 1, "commands": [42]}, ) assert resp.status_code == 400 def test_400_missing_callback_display_id( self, client: FlaskClient, admin_token: str ) -> None: eng = _make_engagement(client, admin_token) _put_config(client, admin_token, eng["id"]) sim = _make_sim(client, admin_token, eng["id"]) resp = client.post( f"/api/simulations/{sim['id']}/c2/execute", headers=_h(admin_token), json={"commands": ["whoami"]}, ) assert resp.status_code == 400 def test_409_done_sim( self, client: FlaskClient, admin_token: str, soc_token: str, ) -> None: eng = _make_engagement(client, admin_token) _put_config(client, admin_token, eng["id"]) sim = _make_sim(client, admin_token, eng["id"]) _advance_to_done(client, admin_token, soc_token, sim["id"]) resp = _execute(client, admin_token, sim["id"], ["whoami"]) assert resp.status_code == 409 assert "done" in resp.get_json().get("error", "").lower() def test_404_simulation_not_found( self, client: FlaskClient, admin_token: str ) -> None: resp = _execute(client, admin_token, 9999, ["whoami"]) assert resp.status_code == 404 def test_404_no_c2_config( self, client: FlaskClient, admin_token: str ) -> None: eng = _make_engagement(client, admin_token) sim = _make_sim(client, admin_token, eng["id"]) resp = _execute(client, admin_token, sim["id"], ["whoami"]) assert resp.status_code == 404 def test_403_soc( self, client: FlaskClient, admin_token: str, soc_token: str ) -> None: eng = _make_engagement(client, admin_token) _put_config(client, admin_token, eng["id"]) sim = _make_sim(client, admin_token, eng["id"]) resp = _execute(client, soc_token, sim["id"], ["whoami"]) assert resp.status_code == 403 def test_503_no_key( self, monkeypatch, client: FlaskClient, admin_token: str ) -> None: monkeypatch.delenv("MIMIC_ENCRYPTION_KEY", raising=False) eng = _make_engagement(client, admin_token) sim = _make_sim(client, admin_token, eng["id"]) resp = _execute(client, admin_token, sim["id"], ["whoami"]) assert resp.status_code == 503 def test_502_adapter_error( self, monkeypatch, client: FlaskClient, admin_token: str ) -> None: from backend.app.services.c2 import fake as fake_mod def _boom(self, callback_display_id, command, params=None): raise C2Error("task queue full") monkeypatch.setattr(fake_mod.FakeAdapter, "create_task", _boom) eng = _make_engagement(client, admin_token) _put_config(client, admin_token, eng["id"]) sim = _make_sim(client, admin_token, eng["id"]) resp = _execute(client, admin_token, sim["id"], ["whoami"]) assert resp.status_code == 502 assert "task queue full" in resp.get_json().get("error", "")