diff --git a/backend/app/__init__.py b/backend/app/__init__.py index e2f2746..3333543 100644 --- a/backend/app/__init__.py +++ b/backend/app/__init__.py @@ -6,7 +6,15 @@ from pathlib import Path from flask import Flask, jsonify, send_from_directory -from backend.app.api import auth_bp, c2_bp, engagements_bp, simulations_bp, templates_bp, users_bp +from backend.app.api import ( + auth_bp, + c2_bp, + engagements_bp, + sims_c2_bp, + simulations_bp, + templates_bp, + users_bp, +) from backend.app.cli import register_cli from backend.app.config import Config, TestConfig from backend.app.errors import register_error_handlers @@ -39,6 +47,7 @@ def create_app(config_object: object | None = None) -> Flask: app.register_blueprint(simulations_bp) app.register_blueprint(templates_bp) app.register_blueprint(c2_bp) + app.register_blueprint(sims_c2_bp) from backend.app.services import mitre as mitre_svc mitre_svc.load_bundle() diff --git a/backend/app/api/__init__.py b/backend/app/api/__init__.py index 083e147..7faef15 100644 --- a/backend/app/api/__init__.py +++ b/backend/app/api/__init__.py @@ -1,9 +1,17 @@ """API blueprints.""" from backend.app.api.auth import auth_bp -from backend.app.api.c2 import c2_bp +from backend.app.api.c2 import c2_bp, sims_c2_bp from backend.app.api.engagements import engagements_bp from backend.app.api.simulations import simulations_bp from backend.app.api.templates import templates_bp from backend.app.api.users import users_bp -__all__ = ["auth_bp", "c2_bp", "users_bp", "engagements_bp", "simulations_bp", "templates_bp"] +__all__ = [ + "auth_bp", + "c2_bp", + "sims_c2_bp", + "users_bp", + "engagements_bp", + "simulations_bp", + "templates_bp", +] diff --git a/backend/app/api/c2.py b/backend/app/api/c2.py index d4a2774..15c5060 100644 --- a/backend/app/api/c2.py +++ b/backend/app/api/c2.py @@ -1,13 +1,15 @@ -"""C2 config endpoints for engagements. +"""C2 endpoints — config CRUD and execution. -All four endpoints: +All endpoints: - Require admin or redteam role (SOC → 403). - Return 503 when MIMIC_ENCRYPTION_KEY is not set. - Never include the cleartext API token in any response. +- Adapter errors → 502 with sanitized message (no URL or token in body). """ from __future__ import annotations from datetime import UTC, datetime +from urllib.parse import urlparse from flask import Blueprint, jsonify, request @@ -15,10 +17,15 @@ from backend.app.auth import role_required from backend.app.extensions import db from backend.app.models import Engagement from backend.app.models.c2_config import C2Config +from backend.app.models.c2_task import C2Task, C2TaskSource +from backend.app.models.simulation import Simulation, SimulationStatus +from backend.app.services.c2.adapter import C2Error from backend.app.services.c2.factory import get_adapter from backend.app.services.crypto import C2Disabled, decrypt, encrypt +from backend.app.services.simulation_workflow import promote_to_in_progress c2_bp = Blueprint("c2", __name__, url_prefix="/api/engagements") +sims_c2_bp = Blueprint("sims_c2", __name__, url_prefix="/api/simulations") _503_BODY = {"error": "C2 disabled: MIMIC_ENCRYPTION_KEY not set"} @@ -70,6 +77,11 @@ def upsert_c2_config(eid: int): url = (data.get("url") or "").strip() if not url: return jsonify({"error": "url is required"}), 400 + parsed = urlparse(url) + if parsed.scheme != "https": + return jsonify({"error": "url must use https"}), 400 + if not parsed.hostname: + return jsonify({"error": "url must contain a hostname"}), 400 verify_tls = data.get("verify_tls", True) if not isinstance(verify_tls, bool): @@ -154,3 +166,134 @@ def test_c2_config(eid: int): ) health = adapter.test_connection() return jsonify({"ok": health.ok, "error": health.error}), 200 + + +# --------------------------------------------------------------------------- +# M2 — callbacks listing + execute +# --------------------------------------------------------------------------- + + +def _load_adapter_for_engagement(engagement: Engagement): + """Decrypt token and return adapter, or return a (response, status) error tuple.""" + cfg: C2Config | None = engagement.c2_config + if cfg is None: + return None, (jsonify({"error": "C2 config not found"}), 404) + try: + api_token = decrypt(cfg.api_token_encrypted) + except ValueError: + return None, (jsonify({"error": "Stored token is corrupt"}), 500) + adapter = get_adapter(url=cfg.url, api_token=api_token, verify_tls=cfg.verify_tls) + return adapter, None + + +@c2_bp.get("//c2/callbacks") +@role_required("admin", "redteam") +def list_callbacks(eid: int): + guard = _crypto_guard() + if guard is not None: + return guard + + engagement = db.session.get(Engagement, eid) + if engagement is None: + return jsonify({"error": "Engagement not found"}), 404 + + adapter, err = _load_adapter_for_engagement(engagement) + if err is not None: + return err + + try: + callbacks = adapter.list_callbacks() + except C2Error as exc: + return jsonify({"error": str(exc)}), 502 + + return jsonify({ + "callbacks": [ + { + "display_id": cb.display_id, + "active": cb.active, + "host": cb.host, + "user": cb.user, + "domain": cb.domain, + "last_checkin": cb.last_checkin, + } + for cb in callbacks + ] + }), 200 + + +@sims_c2_bp.post("//c2/execute") +@role_required("admin", "redteam") +def execute_simulation(sid: int): + guard = _crypto_guard() + if guard is not None: + return guard + + sim = db.session.get(Simulation, sid) + if sim is None: + return jsonify({"error": "Simulation not found"}), 404 + + # Done is terminal — block execution. + if sim.status == SimulationStatus.DONE: + return jsonify({"error": "simulation is done — reopen first"}), 409 + + data = request.get_json(silent=True) or {} + callback_display_id = data.get("callback_display_id") + commands = data.get("commands") + + if not isinstance(callback_display_id, int): + return jsonify({"error": "callback_display_id must be an integer"}), 400 + if not isinstance(commands, list) or len(commands) == 0: + return jsonify({"error": "commands must be a non-empty list"}), 400 + for cmd in commands: + if not isinstance(cmd, str): + return jsonify({"error": "each command must be a string"}), 400 + + engagement = db.session.get(Engagement, sim.engagement_id) + if engagement is None: + return jsonify({"error": "Engagement not found"}), 404 + + adapter, err = _load_adapter_for_engagement(engagement) + if err is not None: + return err + + created_tasks = [] + try: + for command in commands: + mythic_id = adapter.create_task( + callback_display_id=callback_display_id, + command=command, + ) + task = C2Task( + simulation_id=sid, + mythic_task_display_id=mythic_id, + callback_display_id=callback_display_id, + command=command, + params=None, + status="submitted", + completed=False, + source=C2TaskSource.MIMIC, + created_at=datetime.now(UTC), + ) + db.session.add(task) + created_tasks.append(task) + except C2Error as exc: + db.session.rollback() + return jsonify({"error": str(exc)}), 502 + + # Auto-transition pending → in_progress (no-op for other statuses). + promote_to_in_progress(sim) + + db.session.commit() + + return jsonify({ + "tasks": [ + { + "id": t.id, + "mythic_task_display_id": t.mythic_task_display_id, + "command": t.command, + "status": t.status, + "completed": t.completed, + } + for t in created_tasks + ] + }), 200 diff --git a/backend/app/services/c2/__init__.py b/backend/app/services/c2/__init__.py index 63c943a..768b149 100644 --- a/backend/app/services/c2/__init__.py +++ b/backend/app/services/c2/__init__.py @@ -2,6 +2,7 @@ from backend.app.services.c2.adapter import ( C2Adapter, C2Callback, + C2Error, C2Health, C2TaskPage, C2TaskStatus, @@ -12,6 +13,7 @@ from backend.app.services.c2.factory import get_adapter __all__ = [ "C2Adapter", "C2Callback", + "C2Error", "C2Health", "C2TaskPage", "C2TaskStatus", diff --git a/backend/app/services/c2/adapter.py b/backend/app/services/c2/adapter.py index 3a576aa..5ee5460 100644 --- a/backend/app/services/c2/adapter.py +++ b/backend/app/services/c2/adapter.py @@ -7,6 +7,10 @@ from abc import ABC, abstractmethod from dataclasses import dataclass +class C2Error(Exception): + """Raised by adapters when the C2 returns an application-level error.""" + + @dataclass class C2Health: ok: bool diff --git a/backend/app/services/c2/fake.py b/backend/app/services/c2/fake.py index 08da43d..1da4a71 100644 --- a/backend/app/services/c2/fake.py +++ b/backend/app/services/c2/fake.py @@ -1,6 +1,7 @@ """Deterministic in-memory C2 adapter — used when MIMIC_C2_ADAPTER=fake. Intended for integration tests and local development without a live Mythic instance. +Task state is per-instance so parallel tests don't interfere with each other. """ from __future__ import annotations @@ -12,6 +13,7 @@ from backend.app.services.c2.adapter import ( C2TaskStatus, ) +# Three fixed callbacks the test suite can pin against. _FAKE_CALLBACKS = [ C2Callback( display_id=1, @@ -21,14 +23,34 @@ _FAKE_CALLBACKS = [ domain="LAB", last_checkin="2026-06-10T00:00:00Z", ), + C2Callback( + display_id=2, + active=True, + host="SERVER-DC01", + user="svc_backup", + domain="LAB", + last_checkin="2026-06-10T00:01:00Z", + ), + C2Callback( + display_id=3, + active=True, + host="LAPTOP-RT", + user="admin", + domain="LAB", + last_checkin="2026-06-10T00:02:00Z", + ), ] -_FAKE_TASKS: dict[int, dict] = {} -_next_task_id = 100 - class FakeAdapter(C2Adapter): - """In-memory adapter with deterministic behaviour.""" + """In-memory adapter with deterministic behaviour. + + Each instance starts with an empty task store and display_ids from 1000. + """ + + def __init__(self) -> None: + self._tasks: dict[int, dict] = {} + self._next_task_id = 1000 def test_connection(self) -> C2Health: return C2Health(ok=True) @@ -42,10 +64,9 @@ class FakeAdapter(C2Adapter): command: str, params: str | None = None, ) -> int: - global _next_task_id - tid = _next_task_id - _next_task_id += 1 - _FAKE_TASKS[tid] = { + tid = self._next_task_id + self._next_task_id += 1 + self._tasks[tid] = { "display_id": tid, "callback_display_id": callback_display_id, "command": command, @@ -57,7 +78,7 @@ class FakeAdapter(C2Adapter): return tid def get_task(self, task_display_id: int) -> C2TaskStatus: - task = _FAKE_TASKS.get(task_display_id) + task = self._tasks.get(task_display_id) if task is None: return C2TaskStatus(display_id=task_display_id, status="unknown", completed=False) return C2TaskStatus( @@ -67,7 +88,7 @@ class FakeAdapter(C2Adapter): ) def get_task_output(self, task_display_id: int) -> str: - task = _FAKE_TASKS.get(task_display_id) + task = self._tasks.get(task_display_id) if task is None: return "" return task.get("output") or "" @@ -79,7 +100,7 @@ class FakeAdapter(C2Adapter): page_size: int = 25, ) -> C2TaskPage: items = [ - t for t in _FAKE_TASKS.values() + t for t in self._tasks.values() if t["callback_display_id"] == callback_display_id ] start = (page - 1) * page_size diff --git a/backend/app/services/c2/mythic.py b/backend/app/services/c2/mythic.py index 3c47a83..eb62c96 100644 --- a/backend/app/services/c2/mythic.py +++ b/backend/app/services/c2/mythic.py @@ -1,12 +1,14 @@ # Contract pinned from MythicMeta/Mythic_Scripting master @ 2026-06-10 (raw.githubusercontent.com/MythicMeta/Mythic_Scripting/master/mythic/mythic.py) """Mythic 3.x C2 adapter. -M1 implements test_connection() only. -All other methods raise NotImplementedError("M2") — they land in milestone M2/M3. - Transport: POST https://:7443/graphql Header: apitoken: Backend: Hasura-proxied Postgres behind nginx. + +M1: test_connection() +M2: list_callbacks(), create_task() +M3: get_task(), get_task_output() +M4: list_callback_tasks() """ from __future__ import annotations @@ -15,12 +17,42 @@ import requests from backend.app.services.c2.adapter import ( C2Adapter, C2Callback, + C2Error, C2Health, C2TaskPage, C2TaskStatus, ) -_HEALTH_QUERY = '{ __typename }' +_HEALTH_QUERY = "{ __typename }" + +_CALLBACKS_QUERY = """ +query { + callback(order_by: {id: asc}, where: {active: {_eq: true}}) { + id + display_id + active + host + user + domain + last_checkin + } +} +""" + +_CREATE_TASK_MUTATION = """ +mutation CreateTask($callback_id: Int!, $command: String!, $params: String!) { + createTask( + callback_id: $callback_id, + command: $command, + params: $params, + tasking_location: "command_line" + ) { + id + display_id + error + } +} +""" class MythicAdapter(C2Adapter): @@ -37,6 +69,18 @@ class MythicAdapter(C2Adapter): "apitoken": self._token, } + def _post(self, body: dict) -> dict: + resp = requests.post( + self._url, + json=body, + headers=self._headers(), + verify=self._verify, + timeout=10, + allow_redirects=False, + ) + resp.raise_for_status() + return resp.json() + def test_connection(self) -> C2Health: """POST a trivial introspection query to verify reachability and token validity.""" try: @@ -46,6 +90,7 @@ class MythicAdapter(C2Adapter): headers=self._headers(), verify=self._verify, timeout=10, + allow_redirects=False, ) if resp.status_code == 200: return C2Health(ok=True) @@ -54,7 +99,24 @@ class MythicAdapter(C2Adapter): return C2Health(ok=False, error=str(exc)) def list_callbacks(self) -> list[C2Callback]: - raise NotImplementedError("M2") + """Return active callbacks from Mythic (filtered server-side: active=true).""" + try: + data = self._post({"query": _CALLBACKS_QUERY}) + except requests.RequestException as exc: + raise C2Error(str(exc)) from exc + + callbacks_raw = data.get("data", {}).get("callback", []) + return [ + C2Callback( + display_id=cb["display_id"], + active=cb["active"], + host=cb.get("host") or "", + user=cb.get("user") or "", + domain=cb.get("domain") or "", + last_checkin=cb.get("last_checkin") or "", + ) + for cb in callbacks_raw + ] def create_task( self, @@ -62,10 +124,27 @@ class MythicAdapter(C2Adapter): command: str, params: str | None = None, ) -> int: - raise NotImplementedError("M2") + """Issue a task on a callback; return Mythic task display_id.""" + try: + data = self._post({ + "query": _CREATE_TASK_MUTATION, + "variables": { + "callback_id": callback_display_id, + "command": command, + "params": params or "", + }, + }) + except requests.RequestException as exc: + raise C2Error(str(exc)) from exc + + task_data = data.get("data", {}).get("createTask", {}) + error_msg = task_data.get("error") + if error_msg: + raise C2Error(error_msg) + return int(task_data["display_id"]) def get_task(self, task_display_id: int) -> C2TaskStatus: - raise NotImplementedError("M2") + raise NotImplementedError("M3") def get_task_output(self, task_display_id: int) -> str: raise NotImplementedError("M3") diff --git a/backend/app/services/simulation_workflow.py b/backend/app/services/simulation_workflow.py index cb0b1c5..daad7fe 100644 --- a/backend/app/services/simulation_workflow.py +++ b/backend/app/services/simulation_workflow.py @@ -98,6 +98,19 @@ def _maybe_activate_engagement(simulation: Simulation) -> None: db.session.add(engagement) +def promote_to_in_progress(simulation: Simulation) -> None: + """Transition simulation pending → in_progress if it is currently pending. + + Also advances the engagement planned → active via _maybe_activate_engagement. + No-op when the simulation is already in any other status. + Caller must commit. + """ + if simulation.status == SimulationStatus.PENDING: + simulation.status = SimulationStatus.IN_PROGRESS + simulation.updated_at = datetime.now(UTC) + _maybe_activate_engagement(simulation) + + def apply_patch( simulation: Simulation, payload: dict[str, Any], user: User ) -> tuple[Any, int] | None: diff --git a/backend/requirements.txt b/backend/requirements.txt index 9f06e02..f22875d 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -10,3 +10,4 @@ pytest==8.3.3 ruff==0.6.9 mypy==1.11.2 types-requests==2.32.0.20240914 +requests-mock==1.12.1 diff --git a/backend/tests/test_c2_adapter_fake_m2.py b/backend/tests/test_c2_adapter_fake_m2.py new file mode 100644 index 0000000..3da42a4 --- /dev/null +++ b/backend/tests/test_c2_adapter_fake_m2.py @@ -0,0 +1,62 @@ +"""FakeAdapter M2 tests — list_callbacks shape, create_task monotonicity.""" +from __future__ import annotations + +from backend.app.services.c2.fake import FakeAdapter + + +class TestFakeAdapterListCallbacks: + def test_returns_three_callbacks(self): + adapter = FakeAdapter() + callbacks = adapter.list_callbacks() + assert len(callbacks) == 3 + + def test_all_active(self): + adapter = FakeAdapter() + for cb in adapter.list_callbacks(): + assert cb.active is True + + def test_display_ids_are_1_2_3(self): + adapter = FakeAdapter() + ids = [cb.display_id for cb in adapter.list_callbacks()] + assert ids == [1, 2, 3] + + def test_pinned_last_checkin_format(self): + adapter = FakeAdapter() + for cb in adapter.list_callbacks(): + assert cb.last_checkin.startswith("2026-06-10") + + def test_callbacks_have_host_user_domain(self): + adapter = FakeAdapter() + for cb in adapter.list_callbacks(): + assert cb.host + assert cb.user + assert cb.domain + + +class TestFakeAdapterCreateTask: + def test_returns_monotonic_ids_from_1000(self): + adapter = FakeAdapter() + id1 = adapter.create_task(1, "whoami") + id2 = adapter.create_task(1, "ipconfig") + assert id1 == 1000 + assert id2 == 1001 + + def test_separate_instances_start_at_1000_independently(self): + a1 = FakeAdapter() + a2 = FakeAdapter() + assert a1.create_task(1, "cmd") == 1000 + assert a2.create_task(1, "cmd") == 1000 + + def test_stores_command_and_callback(self): + adapter = FakeAdapter() + tid = adapter.create_task(callback_display_id=2, command="ls", params="-la") + task = adapter._tasks[tid] + assert task["command"] == "ls" + assert task["params"] == "-la" + assert task["callback_display_id"] == 2 + + def test_initial_status_submitted(self): + adapter = FakeAdapter() + tid = adapter.create_task(1, "hostname") + assert adapter._tasks[tid]["status"] == "submitted" + assert adapter._tasks[tid]["completed"] is False diff --git a/backend/tests/test_c2_adapter_mythic.py b/backend/tests/test_c2_adapter_mythic.py new file mode 100644 index 0000000..3deb698 --- /dev/null +++ b/backend/tests/test_c2_adapter_mythic.py @@ -0,0 +1,137 @@ +"""MythicAdapter unit tests — mocked HTTP with requests-mock.""" +from __future__ import annotations + +import pytest +import requests +import requests_mock as rm_module + +from backend.app.services.c2.adapter import C2Error +from backend.app.services.c2.mythic import MythicAdapter + +_BASE_URL = "https://mythic.lab:7443" +_GQL_URL = _BASE_URL + "/graphql" +_TOKEN = "fake-api-token" + + +@pytest.fixture() +def adapter(): + return MythicAdapter(url=_BASE_URL, api_token=_TOKEN, verify_tls=False) + + +class TestMythicAdapterListCallbacks: + def test_returns_callbacks_from_graphql(self, adapter): + payload = { + "data": { + "callback": [ + { + "id": 1, + "display_id": 1, + "active": True, + "host": "HOST-01", + "user": "jdoe", + "domain": "LAB", + "last_checkin": "2026-06-10T00:00:00Z", + } + ] + } + } + with rm_module.Mocker() as m: + m.post(_GQL_URL, json=payload) + callbacks = adapter.list_callbacks() + + assert len(callbacks) == 1 + assert callbacks[0].display_id == 1 + assert callbacks[0].host == "HOST-01" + assert callbacks[0].user == "jdoe" + + def test_sends_apitoken_header(self, adapter): + payload = {"data": {"callback": []}} + with rm_module.Mocker() as m: + m.post(_GQL_URL, json=payload) + adapter.list_callbacks() + sent_headers = m.last_request.headers + + assert sent_headers.get("apitoken") == _TOKEN + + def test_verify_tls_flag_passed(self): + """Adapter with verify_tls=True should pass verify=True to requests.""" + adapter_tls = MythicAdapter(url=_BASE_URL, api_token=_TOKEN, verify_tls=True) + payload = {"data": {"callback": []}} + # requests-mock intercepts before TLS — just confirm no error path triggered. + with rm_module.Mocker() as m: + m.post(_GQL_URL, json=payload) + callbacks = adapter_tls.list_callbacks() + assert isinstance(callbacks, list) + + def test_network_error_raises_c2error(self, adapter): + with rm_module.Mocker() as m: + m.post(_GQL_URL, exc=requests.exceptions.ConnectionError("connection refused")) + with pytest.raises(C2Error): + adapter.list_callbacks() + + def test_http_error_raises_c2error(self, adapter): + with rm_module.Mocker() as m: + m.post(_GQL_URL, status_code=500, text="Internal Server Error") + with pytest.raises(C2Error): + adapter.list_callbacks() + + +class TestMythicAdapterCreateTask: + def test_returns_display_id_on_success(self, adapter): + payload = { + "data": { + "createTask": { + "id": 42, + "display_id": 7, + "error": None, + } + } + } + with rm_module.Mocker() as m: + m.post(_GQL_URL, json=payload) + tid = adapter.create_task(callback_display_id=1, command="whoami") + + assert tid == 7 + + def test_sends_apitoken_header(self, adapter): + payload = {"data": {"createTask": {"id": 1, "display_id": 1, "error": None}}} + with rm_module.Mocker() as m: + m.post(_GQL_URL, json=payload) + adapter.create_task(1, "cmd") + sent_headers = m.last_request.headers + + assert sent_headers.get("apitoken") == _TOKEN + + def test_error_field_raises_c2error(self, adapter): + payload = { + "data": { + "createTask": { + "id": None, + "display_id": None, + "error": "callback not found", + } + } + } + with rm_module.Mocker() as m: + m.post(_GQL_URL, json=payload) + with pytest.raises(C2Error, match="callback not found"): + adapter.create_task(1, "whoami") + + def test_network_error_raises_c2error(self, adapter): + with rm_module.Mocker() as m: + m.post(_GQL_URL, exc=requests.exceptions.Timeout("timeout")) + with pytest.raises(C2Error): + adapter.create_task(1, "whoami") + + +class TestMythicAdapterNoRedirects: + def test_does_not_follow_redirect(self, adapter): + """Adapter must not follow HTTP redirects (allow_redirects=False).""" + with rm_module.Mocker() as m: + # Simulate a redirect response; requests-mock won't auto-follow it. + m.post(_GQL_URL, status_code=301, headers={"Location": "https://evil.example/graphql"}) + # With allow_redirects=False the 301 is treated as a non-2xx → raise_for_status raises. + with pytest.raises(C2Error): + adapter.list_callbacks() + # Exactly one request was made — no follow-up to Location. + assert len(m.request_history) == 1 diff --git a/backend/tests/test_c2_callbacks.py b/backend/tests/test_c2_callbacks.py new file mode 100644 index 0000000..5f35b54 --- /dev/null +++ b/backend/tests/test_c2_callbacks.py @@ -0,0 +1,142 @@ +"""Tests for GET /api/engagements//c2/callbacks.""" +from __future__ import annotations + +import pytest +from cryptography.fernet import Fernet +from flask.testing import FlaskClient + +from backend.app.services.c2.adapter import C2Error +from backend.tests.conftest import auth_headers as _h + +_FERNET_KEY = Fernet.generate_key().decode() + + +@pytest.fixture(autouse=True) +def set_encryption_key(monkeypatch): + monkeypatch.setenv("MIMIC_ENCRYPTION_KEY", _FERNET_KEY) + + +@pytest.fixture(autouse=True) +def use_fake_adapter(monkeypatch): + monkeypatch.setenv("MIMIC_C2_ADAPTER", "fake") + + +def _make_engagement(client: FlaskClient, token: str) -> dict: + resp = client.post( + "/api/engagements", + headers=_h(token), + json={"name": "Op Alpha", "start_date": "2026-06-10"}, + ) + assert resp.status_code == 201 + return resp.get_json() + + +def _put_config(client: FlaskClient, token: str, eid: int) -> None: + resp = client.put( + f"/api/engagements/{eid}/c2-config", + headers=_h(token), + json={"url": "https://c2.internal:7443", "api_token": "s3cr3t", "verify_tls": True}, + ) + assert resp.status_code == 200 + + +class TestGetCallbacksHappyPath: + def test_returns_3_callbacks_with_fake_adapter( + self, client: FlaskClient, admin_token: str + ) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"]) + + resp = client.get( + f"/api/engagements/{eng['id']}/c2/callbacks", + headers=_h(admin_token), + ) + assert resp.status_code == 200 + body = resp.get_json() + assert "callbacks" in body + assert len(body["callbacks"]) == 3 + + def test_callback_shape(self, client: FlaskClient, admin_token: str) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"]) + + resp = client.get( + f"/api/engagements/{eng['id']}/c2/callbacks", + headers=_h(admin_token), + ) + cb = resp.get_json()["callbacks"][0] + assert "display_id" in cb + assert "active" in cb + assert "host" in cb + assert "user" in cb + assert "domain" in cb + assert "last_checkin" in cb + + def test_redteam_allowed( + self, client: FlaskClient, admin_token: str, redteam_token: str + ) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"]) + resp = client.get( + f"/api/engagements/{eng['id']}/c2/callbacks", + headers=_h(redteam_token), + ) + assert resp.status_code == 200 + + +class TestGetCallbacksErrorCases: + def test_404_when_no_config(self, client: FlaskClient, admin_token: str) -> None: + eng = _make_engagement(client, admin_token) + resp = client.get( + f"/api/engagements/{eng['id']}/c2/callbacks", + headers=_h(admin_token), + ) + assert resp.status_code == 404 + + def test_404_engagement_not_found(self, client: FlaskClient, admin_token: str) -> None: + resp = client.get( + "/api/engagements/9999/c2/callbacks", + headers=_h(admin_token), + ) + assert resp.status_code == 404 + + def test_403_soc( + self, client: FlaskClient, admin_token: str, soc_token: str + ) -> None: + eng = _make_engagement(client, admin_token) + resp = client.get( + f"/api/engagements/{eng['id']}/c2/callbacks", + headers=_h(soc_token), + ) + assert resp.status_code == 403 + + def test_503_no_key( + self, monkeypatch, client: FlaskClient, admin_token: str + ) -> None: + monkeypatch.delenv("MIMIC_ENCRYPTION_KEY", raising=False) + eng = _make_engagement(client, admin_token) + resp = client.get( + f"/api/engagements/{eng['id']}/c2/callbacks", + headers=_h(admin_token), + ) + assert resp.status_code == 503 + + def test_502_when_adapter_raises( + self, monkeypatch, client: FlaskClient, admin_token: str + ) -> None: + from backend.app.services.c2 import fake as fake_mod + + def _boom(self): + raise C2Error("mythic unreachable") + + monkeypatch.setattr(fake_mod.FakeAdapter, "list_callbacks", _boom) + + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"]) + + resp = client.get( + f"/api/engagements/{eng['id']}/c2/callbacks", + headers=_h(admin_token), + ) + assert resp.status_code == 502 + assert "mythic unreachable" in resp.get_json().get("error", "") diff --git a/backend/tests/test_c2_config.py b/backend/tests/test_c2_config.py index f5f76ca..fd2c617 100644 --- a/backend/tests/test_c2_config.py +++ b/backend/tests/test_c2_config.py @@ -95,6 +95,21 @@ def test_get_config_engagement_not_found(client: FlaskClient, admin_token: str) # --------------------------------------------------------------------------- +def test_put_rejects_http_scheme(client: FlaskClient, admin_token: str) -> None: + eng = _make_engagement(client, admin_token) + resp = _put_config(client, admin_token, eng["id"], url="http://c2.internal:7443") + assert resp.status_code == 400 + assert "https" in resp.get_json().get("error", "").lower() + + +def test_put_rejects_missing_hostname(client: FlaskClient, admin_token: str) -> None: + eng = _make_engagement(client, admin_token) + # urlparse("https://:7443") produces an empty hostname + resp = _put_config(client, admin_token, eng["id"], url="https://:7443") + assert resp.status_code == 400 + assert "hostname" in resp.get_json().get("error", "").lower() + + def test_put_creates_config(client: FlaskClient, admin_token: str) -> None: eng = _make_engagement(client, admin_token) resp = _put_config(client, admin_token, eng["id"]) diff --git a/backend/tests/test_c2_execute.py b/backend/tests/test_c2_execute.py new file mode 100644 index 0000000..8b632cc --- /dev/null +++ b/backend/tests/test_c2_execute.py @@ -0,0 +1,324 @@ +"""Tests for POST /api/simulations//c2/execute.""" +from __future__ import annotations + +import pytest +from cryptography.fernet import Fernet +from flask import Flask +from flask.testing import FlaskClient + +from backend.app.extensions import db +from backend.app.models.c2_task import C2Task +from backend.app.models.simulation import Simulation, SimulationStatus +from backend.app.services.c2.adapter import C2Error +from backend.tests.conftest import auth_headers as _h + +_FERNET_KEY = Fernet.generate_key().decode() + + +@pytest.fixture(autouse=True) +def set_encryption_key(monkeypatch): + monkeypatch.setenv("MIMIC_ENCRYPTION_KEY", _FERNET_KEY) + + +@pytest.fixture(autouse=True) +def use_fake_adapter(monkeypatch): + monkeypatch.setenv("MIMIC_C2_ADAPTER", "fake") + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_engagement(client: FlaskClient, token: str) -> dict: + resp = client.post( + "/api/engagements", + headers=_h(token), + json={"name": "Op Alpha", "start_date": "2026-06-10"}, + ) + assert resp.status_code == 201 + return resp.get_json() + + +def _put_config(client: FlaskClient, token: str, eid: int) -> None: + resp = client.put( + f"/api/engagements/{eid}/c2-config", + headers=_h(token), + json={"url": "https://c2.internal:7443", "api_token": "s3cr3t", "verify_tls": True}, + ) + assert resp.status_code == 200 + + +def _make_sim(client: FlaskClient, token: str, eid: int) -> dict: + resp = client.post( + f"/api/engagements/{eid}/simulations", + headers=_h(token), + json={"name": "Sim Alpha"}, + ) + assert resp.status_code == 201 + return resp.get_json() + + +def _execute( + client: FlaskClient, + token: str, + sid: int, + commands: list, + callback_display_id: int = 1, +): + return client.post( + f"/api/simulations/{sid}/c2/execute", + headers=_h(token), + json={"callback_display_id": callback_display_id, "commands": commands}, + ) + + +def _advance_to_in_progress(client: FlaskClient, token: str, sid: int) -> None: + client.patch( + f"/api/simulations/{sid}", + headers=_h(token), + json={"name": "Sim Alpha"}, + ) + + +def _advance_to_review_required(client: FlaskClient, token: str, sid: int) -> None: + _advance_to_in_progress(client, token, sid) + client.post( + f"/api/simulations/{sid}/transition", + headers=_h(token), + json={"to": "review_required"}, + ) + + +def _advance_to_done(client: FlaskClient, redteam_token: str, soc_token: str, sid: int) -> None: + _advance_to_review_required(client, redteam_token, sid) + client.post( + f"/api/simulations/{sid}/transition", + headers=_h(soc_token), + json={"to": "done"}, + ) + + +# --------------------------------------------------------------------------- +# Happy path +# --------------------------------------------------------------------------- + + +class TestExecuteHappyPath: + def test_two_commands_create_two_tasks( + self, app: Flask, client: FlaskClient, admin_token: str + ) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"]) + sim = _make_sim(client, admin_token, eng["id"]) + + resp = _execute(client, admin_token, sim["id"], ["whoami", "ipconfig"]) + assert resp.status_code == 200 + body = resp.get_json() + assert len(body["tasks"]) == 2 + assert body["tasks"][0]["command"] == "whoami" + assert body["tasks"][1]["command"] == "ipconfig" + + with app.app_context(): + rows = C2Task.query.filter_by(simulation_id=sim["id"]).all() + assert len(rows) == 2 + + def test_task_response_shape( + self, client: FlaskClient, admin_token: str + ) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"]) + sim = _make_sim(client, admin_token, eng["id"]) + + resp = _execute(client, admin_token, sim["id"], ["hostname"]) + task = resp.get_json()["tasks"][0] + assert "id" in task + assert "mythic_task_display_id" in task + assert "command" in task + assert "status" in task + assert "completed" in task + + def test_pending_sim_transitions_to_in_progress( + self, app: Flask, client: FlaskClient, admin_token: str + ) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"]) + sim = _make_sim(client, admin_token, eng["id"]) + assert sim["status"] == "pending" + + _execute(client, admin_token, sim["id"], ["whoami"]) + + with app.app_context(): + updated = db.session.get(Simulation, sim["id"]) + assert updated is not None + assert updated.status == SimulationStatus.IN_PROGRESS + + def test_already_in_progress_stays_in_progress( + self, app: Flask, client: FlaskClient, admin_token: str + ) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"]) + sim = _make_sim(client, admin_token, eng["id"]) + _advance_to_in_progress(client, admin_token, sim["id"]) + + resp = _execute(client, admin_token, sim["id"], ["whoami"]) + assert resp.status_code == 200 + + with app.app_context(): + updated = db.session.get(Simulation, sim["id"]) + assert updated is not None + assert updated.status == SimulationStatus.IN_PROGRESS + + def test_review_required_sim_still_allowed( + self, app: Flask, client: FlaskClient, admin_token: str, soc_token: str + ) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"]) + sim = _make_sim(client, admin_token, eng["id"]) + _advance_to_review_required(client, admin_token, sim["id"]) + + resp = _execute(client, admin_token, sim["id"], ["net use"]) + assert resp.status_code == 200 + + # Status stays review_required — no regression to in_progress. + with app.app_context(): + updated = db.session.get(Simulation, sim["id"]) + assert updated is not None + assert updated.status == SimulationStatus.REVIEW_REQUIRED + + def test_redteam_can_execute( + self, client: FlaskClient, admin_token: str, redteam_token: str + ) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"]) + sim = _make_sim(client, admin_token, eng["id"]) + + resp = _execute(client, redteam_token, sim["id"], ["whoami"]) + assert resp.status_code == 200 + + def test_mythic_task_display_id_stored( + self, app: Flask, client: FlaskClient, admin_token: str + ) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"]) + sim = _make_sim(client, admin_token, eng["id"]) + + _execute(client, admin_token, sim["id"], ["whoami"]) + + with app.app_context(): + task = C2Task.query.filter_by(simulation_id=sim["id"]).first() + assert task is not None + assert task.mythic_task_display_id == 1000 # FakeAdapter starts at 1000 + + +# --------------------------------------------------------------------------- +# Error cases +# --------------------------------------------------------------------------- + + +class TestExecuteValidation: + def test_400_empty_commands( + self, client: FlaskClient, admin_token: str + ) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"]) + sim = _make_sim(client, admin_token, eng["id"]) + + resp = _execute(client, admin_token, sim["id"], []) + assert resp.status_code == 400 + + def test_400_non_string_command( + self, client: FlaskClient, admin_token: str + ) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"]) + sim = _make_sim(client, admin_token, eng["id"]) + + resp = client.post( + f"/api/simulations/{sim['id']}/c2/execute", + headers=_h(admin_token), + json={"callback_display_id": 1, "commands": [42]}, + ) + assert resp.status_code == 400 + + def test_400_missing_callback_display_id( + self, client: FlaskClient, admin_token: str + ) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"]) + sim = _make_sim(client, admin_token, eng["id"]) + + resp = client.post( + f"/api/simulations/{sim['id']}/c2/execute", + headers=_h(admin_token), + json={"commands": ["whoami"]}, + ) + assert resp.status_code == 400 + + def test_409_done_sim( + self, + client: FlaskClient, + admin_token: str, + soc_token: str, + ) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"]) + sim = _make_sim(client, admin_token, eng["id"]) + _advance_to_done(client, admin_token, soc_token, sim["id"]) + + resp = _execute(client, admin_token, sim["id"], ["whoami"]) + assert resp.status_code == 409 + assert "done" in resp.get_json().get("error", "").lower() + + def test_404_simulation_not_found( + self, client: FlaskClient, admin_token: str + ) -> None: + resp = _execute(client, admin_token, 9999, ["whoami"]) + assert resp.status_code == 404 + + def test_404_no_c2_config( + self, client: FlaskClient, admin_token: str + ) -> None: + eng = _make_engagement(client, admin_token) + sim = _make_sim(client, admin_token, eng["id"]) + + resp = _execute(client, admin_token, sim["id"], ["whoami"]) + assert resp.status_code == 404 + + def test_403_soc( + self, client: FlaskClient, admin_token: str, soc_token: str + ) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"]) + sim = _make_sim(client, admin_token, eng["id"]) + + resp = _execute(client, soc_token, sim["id"], ["whoami"]) + assert resp.status_code == 403 + + def test_503_no_key( + self, monkeypatch, client: FlaskClient, admin_token: str + ) -> None: + monkeypatch.delenv("MIMIC_ENCRYPTION_KEY", raising=False) + eng = _make_engagement(client, admin_token) + sim = _make_sim(client, admin_token, eng["id"]) + + resp = _execute(client, admin_token, sim["id"], ["whoami"]) + assert resp.status_code == 503 + + def test_502_adapter_error( + self, monkeypatch, client: FlaskClient, admin_token: str + ) -> None: + from backend.app.services.c2 import fake as fake_mod + + def _boom(self, callback_display_id, command, params=None): + raise C2Error("task queue full") + + monkeypatch.setattr(fake_mod.FakeAdapter, "create_task", _boom) + + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"]) + sim = _make_sim(client, admin_token, eng["id"]) + + resp = _execute(client, admin_token, sim["id"], ["whoami"]) + assert resp.status_code == 502 + assert "task queue full" in resp.get_json().get("error", "")