diff --git a/backend/app/__init__.py b/backend/app/__init__.py index 350a0e2..e2f2746 100644 --- a/backend/app/__init__.py +++ b/backend/app/__init__.py @@ -6,7 +6,7 @@ from pathlib import Path from flask import Flask, jsonify, send_from_directory -from backend.app.api import auth_bp, engagements_bp, simulations_bp, templates_bp, users_bp +from backend.app.api import auth_bp, c2_bp, engagements_bp, simulations_bp, templates_bp, users_bp from backend.app.cli import register_cli from backend.app.config import Config, TestConfig from backend.app.errors import register_error_handlers @@ -38,6 +38,7 @@ def create_app(config_object: object | None = None) -> Flask: app.register_blueprint(engagements_bp) app.register_blueprint(simulations_bp) app.register_blueprint(templates_bp) + app.register_blueprint(c2_bp) from backend.app.services import mitre as mitre_svc mitre_svc.load_bundle() diff --git a/backend/app/api/__init__.py b/backend/app/api/__init__.py index 780821a..083e147 100644 --- a/backend/app/api/__init__.py +++ b/backend/app/api/__init__.py @@ -1,8 +1,9 @@ """API blueprints.""" from backend.app.api.auth import auth_bp +from backend.app.api.c2 import c2_bp from backend.app.api.engagements import engagements_bp from backend.app.api.simulations import simulations_bp from backend.app.api.templates import templates_bp from backend.app.api.users import users_bp -__all__ = ["auth_bp", "users_bp", "engagements_bp", "simulations_bp", "templates_bp"] +__all__ = ["auth_bp", "c2_bp", "users_bp", "engagements_bp", "simulations_bp", "templates_bp"] diff --git a/backend/app/api/c2.py b/backend/app/api/c2.py new file mode 100644 index 0000000..d4a2774 --- /dev/null +++ b/backend/app/api/c2.py @@ -0,0 +1,156 @@ +"""C2 config endpoints for engagements. + +All four endpoints: +- Require admin or redteam role (SOC → 403). +- Return 503 when MIMIC_ENCRYPTION_KEY is not set. +- Never include the cleartext API token in any response. +""" +from __future__ import annotations + +from datetime import UTC, datetime + +from flask import Blueprint, jsonify, request + +from backend.app.auth import role_required +from backend.app.extensions import db +from backend.app.models import Engagement +from backend.app.models.c2_config import C2Config +from backend.app.services.c2.factory import get_adapter +from backend.app.services.crypto import C2Disabled, decrypt, encrypt + +c2_bp = Blueprint("c2", __name__, url_prefix="/api/engagements") + +_503_BODY = {"error": "C2 disabled: MIMIC_ENCRYPTION_KEY not set"} + + +def _crypto_guard(): + """Return a 503 Response when crypto key is absent, else None.""" + try: + # Attempt a dummy operation to test key availability. + encrypt("probe") + return None + except C2Disabled: + return jsonify(_503_BODY), 503 + + +@c2_bp.get("//c2-config") +@role_required("admin", "redteam") +def get_c2_config(eid: int): + guard = _crypto_guard() + if guard is not None: + return guard + + engagement = db.session.get(Engagement, eid) + if engagement is None: + return jsonify({"error": "Engagement not found"}), 404 + + cfg: C2Config | None = engagement.c2_config + if cfg is None: + return jsonify({"error": "C2 config not found"}), 404 + + return jsonify({ + "has_token": bool(cfg.api_token_encrypted), + "url": cfg.url, + "verify_tls": cfg.verify_tls, + }), 200 + + +@c2_bp.put("//c2-config") +@role_required("admin", "redteam") +def upsert_c2_config(eid: int): + guard = _crypto_guard() + if guard is not None: + return guard + + engagement = db.session.get(Engagement, eid) + if engagement is None: + return jsonify({"error": "Engagement not found"}), 404 + + data = request.get_json(silent=True) or {} + url = (data.get("url") or "").strip() + if not url: + return jsonify({"error": "url is required"}), 400 + + verify_tls = data.get("verify_tls", True) + if not isinstance(verify_tls, bool): + return jsonify({"error": "verify_tls must be a boolean"}), 400 + + cfg: C2Config | None = engagement.c2_config + + if cfg is None: + # New row — api_token is required on creation. + raw_token = data.get("api_token") or "" + if not raw_token: + return jsonify({"error": "api_token is required when creating a config"}), 400 + encrypted = encrypt(raw_token) + cfg = C2Config( + engagement_id=eid, + url=url, + api_token_encrypted=encrypted, + verify_tls=verify_tls, + ) + db.session.add(cfg) + else: + # Update — omitting api_token keeps the existing ciphertext. + cfg.url = url + cfg.verify_tls = verify_tls + cfg.updated_at = datetime.now(UTC) + raw_token = data.get("api_token") or "" + if raw_token: + cfg.api_token_encrypted = encrypt(raw_token) + + db.session.commit() + return jsonify({ + "has_token": True, + "url": cfg.url, + "verify_tls": cfg.verify_tls, + }), 200 + + +@c2_bp.delete("//c2-config") +@role_required("admin", "redteam") +def delete_c2_config(eid: int): + guard = _crypto_guard() + if guard is not None: + return guard + + engagement = db.session.get(Engagement, eid) + if engagement is None: + return jsonify({"error": "Engagement not found"}), 404 + + cfg: C2Config | None = engagement.c2_config + if cfg is None: + return jsonify({"error": "C2 config not found"}), 404 + + db.session.delete(cfg) + db.session.commit() + return "", 204 + + +@c2_bp.post("//c2-config/test") +@role_required("admin", "redteam") +def test_c2_config(eid: int): + guard = _crypto_guard() + if guard is not None: + return guard + + engagement = db.session.get(Engagement, eid) + if engagement is None: + return jsonify({"error": "Engagement not found"}), 404 + + cfg: C2Config | None = engagement.c2_config + if cfg is None: + return jsonify({"error": "C2 config not found"}), 404 + + try: + api_token = decrypt(cfg.api_token_encrypted) + except ValueError: + return jsonify({"ok": False, "error": "Stored token is corrupt"}), 200 + + adapter = get_adapter( + url=cfg.url, + api_token=api_token, + verify_tls=cfg.verify_tls, + ) + health = adapter.test_connection() + return jsonify({"ok": health.ok, "error": health.error}), 200 diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index e432347..693b091 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -1,4 +1,6 @@ """SQLAlchemy models.""" +from backend.app.models.c2_config import C2Config +from backend.app.models.c2_task import C2Task, C2TaskSource from backend.app.models.engagement import Engagement, EngagementStatus from backend.app.models.simulation import Simulation, SimulationStatus from backend.app.models.simulation_template import SimulationTemplate @@ -12,4 +14,7 @@ __all__ = [ "Simulation", "SimulationStatus", "SimulationTemplate", + "C2Config", + "C2Task", + "C2TaskSource", ] diff --git a/backend/app/models/c2_config.py b/backend/app/models/c2_config.py new file mode 100644 index 0000000..4015a13 --- /dev/null +++ b/backend/app/models/c2_config.py @@ -0,0 +1,34 @@ +"""C2Config model — per-engagement Mythic connection settings.""" +from __future__ import annotations + +from datetime import UTC, datetime + +from backend.app.extensions import db + + +class C2Config(db.Model): # type: ignore[name-defined] + __tablename__ = "c2_config" + + id = db.Column(db.Integer, primary_key=True) + engagement_id = db.Column( + db.Integer, + db.ForeignKey("engagements.id", ondelete="CASCADE"), + nullable=False, + unique=True, + index=True, + ) + url = db.Column(db.Text, nullable=False) + api_token_encrypted = db.Column(db.Text, nullable=False) + verify_tls = db.Column(db.Boolean, nullable=False, default=True) + created_at = db.Column( + db.DateTime, nullable=False, default=lambda: datetime.now(UTC) + ) + updated_at = db.Column(db.DateTime, nullable=True) + + engagement = db.relationship( + "Engagement", + backref=db.backref("c2_config", uselist=False, cascade="all, delete-orphan"), + ) + + def __repr__(self) -> str: + return f"" diff --git a/backend/app/models/c2_task.py b/backend/app/models/c2_task.py new file mode 100644 index 0000000..d87de92 --- /dev/null +++ b/backend/app/models/c2_task.py @@ -0,0 +1,47 @@ +"""C2Task model — link between a Mimic simulation and a Mythic task.""" +from __future__ import annotations + +import enum +from datetime import UTC, datetime + +from backend.app.extensions import db + + +class C2TaskSource(str, enum.Enum): + MIMIC = "mimic" + IMPORT = "import" + + +class C2Task(db.Model): # type: ignore[name-defined] + __tablename__ = "c2_task" + + id = db.Column(db.Integer, primary_key=True) + simulation_id = db.Column( + db.Integer, + db.ForeignKey("simulations.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + mythic_task_display_id = db.Column(db.Integer, nullable=False) + callback_display_id = db.Column(db.Integer, nullable=False) + command = db.Column(db.Text, nullable=False) + params = db.Column(db.Text, nullable=True) + status = db.Column(db.Text, nullable=False) + completed = db.Column(db.Boolean, nullable=False, default=False) + output = db.Column(db.Text, nullable=True) + source = db.Column( + db.Enum(C2TaskSource, name="c2task_source"), + nullable=False, + ) + created_at = db.Column( + db.DateTime, nullable=False, default=lambda: datetime.now(UTC) + ) + completed_at = db.Column(db.DateTime, nullable=True) + + simulation = db.relationship( + "Simulation", + backref=db.backref("c2_tasks", cascade="all, delete-orphan", lazy="dynamic"), + ) + + def __repr__(self) -> str: + return f"" diff --git a/backend/app/services/c2/__init__.py b/backend/app/services/c2/__init__.py new file mode 100644 index 0000000..63c943a --- /dev/null +++ b/backend/app/services/c2/__init__.py @@ -0,0 +1,20 @@ +"""C2 adapter package. Import the factory from here.""" +from backend.app.services.c2.adapter import ( + C2Adapter, + C2Callback, + C2Health, + C2TaskPage, + C2TaskStatus, + decode_response_text, +) +from backend.app.services.c2.factory import get_adapter + +__all__ = [ + "C2Adapter", + "C2Callback", + "C2Health", + "C2TaskPage", + "C2TaskStatus", + "decode_response_text", + "get_adapter", +] diff --git a/backend/app/services/c2/adapter.py b/backend/app/services/c2/adapter.py new file mode 100644 index 0000000..3a576aa --- /dev/null +++ b/backend/app/services/c2/adapter.py @@ -0,0 +1,97 @@ +"""Abstract C2 adapter interface and shared dataclasses.""" +from __future__ import annotations + +import base64 +import binascii +from abc import ABC, abstractmethod +from dataclasses import dataclass + + +@dataclass +class C2Health: + ok: bool + error: str | None = None + + +@dataclass +class C2Callback: + display_id: int + active: bool + host: str + user: str + domain: str + last_checkin: str # ISO-8601 string + + +@dataclass +class C2TaskStatus: + display_id: int + status: str + completed: bool + + +@dataclass +class C2TaskPage: + items: list[dict] # raw task dicts from Mythic + total: int + page: int + page_size: int + + +def decode_response_text(raw: str) -> str: + """Decode a base64-encoded Mythic response_text field. + + On binascii.Error (binary payload) returns " " + hex string + so execution_result never silently corrupts. + """ + try: + return base64.b64decode(raw).decode("utf-8") + except binascii.Error: + return " " + raw.encode().hex() + except UnicodeDecodeError: + raw_bytes = base64.b64decode(raw) + return " " + raw_bytes.hex() + + +class C2Adapter(ABC): + """Thin interface over a C2 backend (Mythic or custom).""" + + @abstractmethod + def test_connection(self) -> C2Health: + """Verify that the C2 is reachable and the token is valid.""" + ... + + @abstractmethod + def list_callbacks(self) -> list[C2Callback]: + """Return active callbacks visible to this API token.""" + ... + + @abstractmethod + def create_task( + self, + callback_display_id: int, + command: str, + params: str | None = None, + ) -> int: + """Issue a task and return its Mythic display_id.""" + ... + + @abstractmethod + def get_task(self, task_display_id: int) -> C2TaskStatus: + """Return current status of a task.""" + ... + + @abstractmethod + def get_task_output(self, task_display_id: int) -> str: + """Return decoded, concatenated output for a completed task.""" + ... + + @abstractmethod + def list_callback_tasks( + self, + callback_display_id: int, + page: int = 1, + page_size: int = 25, + ) -> C2TaskPage: + """Return a paginated history of tasks for a callback.""" + ... diff --git a/backend/app/services/c2/factory.py b/backend/app/services/c2/factory.py new file mode 100644 index 0000000..0c46370 --- /dev/null +++ b/backend/app/services/c2/factory.py @@ -0,0 +1,19 @@ +"""Factory that resolves the C2Adapter implementation from MIMIC_C2_ADAPTER env.""" +from __future__ import annotations + +import os + +from backend.app.services.c2.adapter import C2Adapter + + +def get_adapter(url: str, api_token: str, verify_tls: bool = True) -> C2Adapter: + """Return the correct C2Adapter based on MIMIC_C2_ADAPTER (default: mythic).""" + adapter_name = os.environ.get("MIMIC_C2_ADAPTER", "mythic").lower() + + if adapter_name == "fake": + from backend.app.services.c2.fake import FakeAdapter + return FakeAdapter() + + # Default: real Mythic adapter + from backend.app.services.c2.mythic import MythicAdapter + return MythicAdapter(url=url, api_token=api_token, verify_tls=verify_tls) diff --git a/backend/app/services/c2/fake.py b/backend/app/services/c2/fake.py new file mode 100644 index 0000000..08da43d --- /dev/null +++ b/backend/app/services/c2/fake.py @@ -0,0 +1,91 @@ +"""Deterministic in-memory C2 adapter — used when MIMIC_C2_ADAPTER=fake. + +Intended for integration tests and local development without a live Mythic instance. +""" +from __future__ import annotations + +from backend.app.services.c2.adapter import ( + C2Adapter, + C2Callback, + C2Health, + C2TaskPage, + C2TaskStatus, +) + +_FAKE_CALLBACKS = [ + C2Callback( + display_id=1, + active=True, + host="WORKSTATION-01", + user="jdoe", + domain="LAB", + last_checkin="2026-06-10T00:00:00Z", + ), +] + +_FAKE_TASKS: dict[int, dict] = {} +_next_task_id = 100 + + +class FakeAdapter(C2Adapter): + """In-memory adapter with deterministic behaviour.""" + + def test_connection(self) -> C2Health: + return C2Health(ok=True) + + def list_callbacks(self) -> list[C2Callback]: + return list(_FAKE_CALLBACKS) + + def create_task( + self, + callback_display_id: int, + command: str, + params: str | None = None, + ) -> int: + global _next_task_id + tid = _next_task_id + _next_task_id += 1 + _FAKE_TASKS[tid] = { + "display_id": tid, + "callback_display_id": callback_display_id, + "command": command, + "params": params, + "status": "submitted", + "completed": False, + "output": None, + } + return tid + + def get_task(self, task_display_id: int) -> C2TaskStatus: + task = _FAKE_TASKS.get(task_display_id) + if task is None: + return C2TaskStatus(display_id=task_display_id, status="unknown", completed=False) + return C2TaskStatus( + display_id=task_display_id, + status=task["status"], + completed=task["completed"], + ) + + def get_task_output(self, task_display_id: int) -> str: + task = _FAKE_TASKS.get(task_display_id) + if task is None: + return "" + return task.get("output") or "" + + def list_callback_tasks( + self, + callback_display_id: int, + page: int = 1, + page_size: int = 25, + ) -> C2TaskPage: + items = [ + t for t in _FAKE_TASKS.values() + if t["callback_display_id"] == callback_display_id + ] + start = (page - 1) * page_size + return C2TaskPage( + items=items[start : start + page_size], + total=len(items), + page=page, + page_size=page_size, + ) diff --git a/backend/app/services/c2/mythic.py b/backend/app/services/c2/mythic.py new file mode 100644 index 0000000..3c47a83 --- /dev/null +++ b/backend/app/services/c2/mythic.py @@ -0,0 +1,79 @@ +# Contract pinned from MythicMeta/Mythic_Scripting master @ 2026-06-10 (raw.githubusercontent.com/MythicMeta/Mythic_Scripting/master/mythic/mythic.py) +"""Mythic 3.x C2 adapter. + +M1 implements test_connection() only. +All other methods raise NotImplementedError("M2") — they land in milestone M2/M3. + +Transport: POST https://:7443/graphql +Header: apitoken: +Backend: Hasura-proxied Postgres behind nginx. +""" +from __future__ import annotations + +import requests + +from backend.app.services.c2.adapter import ( + C2Adapter, + C2Callback, + C2Health, + C2TaskPage, + C2TaskStatus, +) + +_HEALTH_QUERY = '{ __typename }' + + +class MythicAdapter(C2Adapter): + """Real Mythic 3.x adapter using GraphQL over HTTP.""" + + def __init__(self, url: str, api_token: str, verify_tls: bool = True) -> None: + self._url = url.rstrip("/") + "/graphql" + self._token = api_token + self._verify = verify_tls + + def _headers(self) -> dict[str, str]: + return { + "Content-Type": "application/json", + "apitoken": self._token, + } + + def test_connection(self) -> C2Health: + """POST a trivial introspection query to verify reachability and token validity.""" + try: + resp = requests.post( + self._url, + json={"query": _HEALTH_QUERY}, + headers=self._headers(), + verify=self._verify, + timeout=10, + ) + if resp.status_code == 200: + return C2Health(ok=True) + return C2Health(ok=False, error=f"HTTP {resp.status_code}") + except requests.RequestException as exc: + return C2Health(ok=False, error=str(exc)) + + def list_callbacks(self) -> list[C2Callback]: + raise NotImplementedError("M2") + + def create_task( + self, + callback_display_id: int, + command: str, + params: str | None = None, + ) -> int: + raise NotImplementedError("M2") + + def get_task(self, task_display_id: int) -> C2TaskStatus: + raise NotImplementedError("M2") + + def get_task_output(self, task_display_id: int) -> str: + raise NotImplementedError("M3") + + def list_callback_tasks( + self, + callback_display_id: int, + page: int = 1, + page_size: int = 25, + ) -> C2TaskPage: + raise NotImplementedError("M4") diff --git a/backend/app/services/crypto.py b/backend/app/services/crypto.py new file mode 100644 index 0000000..4c13854 --- /dev/null +++ b/backend/app/services/crypto.py @@ -0,0 +1,40 @@ +"""Fernet-based encryption service for sensitive fields. + +Key is read from the MIMIC_ENCRYPTION_KEY env var (Fernet base64-urlsafe 32-byte key). +When the key is absent the service raises C2Disabled so callers can return 503. +The key is never logged or returned in any response. +""" +from __future__ import annotations + +import os + +from cryptography.fernet import Fernet, InvalidToken + + +class C2Disabled(Exception): + """Raised when MIMIC_ENCRYPTION_KEY is not set.""" + + +def _get_fernet() -> Fernet: + key = os.environ.get("MIMIC_ENCRYPTION_KEY") + if not key: + raise C2Disabled("C2 disabled: MIMIC_ENCRYPTION_KEY not set") + return Fernet(key.encode() if isinstance(key, str) else key) + + +def encrypt(plaintext: str) -> str: + """Encrypt *plaintext* and return a Fernet token (str).""" + f = _get_fernet() + return f.encrypt(plaintext.encode()).decode() + + +def decrypt(ciphertext: str) -> str: + """Decrypt a Fernet token and return the plaintext string.""" + f = _get_fernet() + try: + return f.decrypt(ciphertext.encode()).decode() + except InvalidToken as exc: + raise ValueError("Invalid ciphertext") from exc + + +__all__ = ["C2Disabled", "encrypt", "decrypt"] diff --git a/backend/migrations/versions/0006_c2_layer.py b/backend/migrations/versions/0006_c2_layer.py new file mode 100644 index 0000000..619a394 --- /dev/null +++ b/backend/migrations/versions/0006_c2_layer.py @@ -0,0 +1,67 @@ +"""create c2_config and c2_task tables + +Revision ID: 0006 +Revises: 0005 +Create Date: 2026-06-10 00:00:00.000000 +""" +import sqlalchemy as sa +from alembic import op + +revision = "0006" +down_revision = "0005" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + "c2_config", + sa.Column("id", sa.Integer(), primary_key=True), + sa.Column( + "engagement_id", + sa.Integer(), + sa.ForeignKey("engagements.id", ondelete="CASCADE"), + nullable=False, + unique=True, + ), + sa.Column("url", sa.Text(), nullable=False), + sa.Column("api_token_encrypted", sa.Text(), nullable=False), + sa.Column("verify_tls", sa.Boolean(), nullable=False, server_default=sa.true()), + sa.Column("created_at", sa.DateTime(), nullable=False), + sa.Column("updated_at", sa.DateTime(), nullable=True), + ) + op.create_index("ix_c2_config_engagement_id", "c2_config", ["engagement_id"]) + + op.create_table( + "c2_task", + sa.Column("id", sa.Integer(), primary_key=True), + sa.Column( + "simulation_id", + sa.Integer(), + sa.ForeignKey("simulations.id", ondelete="CASCADE"), + nullable=False, + index=True, + ), + sa.Column("mythic_task_display_id", sa.Integer(), nullable=False), + sa.Column("callback_display_id", sa.Integer(), nullable=False), + sa.Column("command", sa.Text(), nullable=False), + sa.Column("params", sa.Text(), nullable=True), + sa.Column("status", sa.Text(), nullable=False), + sa.Column("completed", sa.Boolean(), nullable=False, server_default=sa.false()), + sa.Column("output", sa.Text(), nullable=True), + sa.Column( + "source", + sa.Enum("mimic", "import", name="c2task_source"), + nullable=False, + ), + sa.Column("created_at", sa.DateTime(), nullable=False), + sa.Column("completed_at", sa.DateTime(), nullable=True), + ) + + +def downgrade() -> None: + op.drop_table("c2_task") + op.drop_index("ix_c2_config_engagement_id", "c2_config") + op.drop_table("c2_config") + # Remove the enum type (no-op on SQLite, required on Postgres) + sa.Enum(name="c2task_source").drop(op.get_bind(), checkfirst=True) diff --git a/backend/requirements.txt b/backend/requirements.txt index 878005e..9f06e02 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -4,6 +4,9 @@ Flask-Migrate==4.0.7 PyJWT==2.9.0 argon2-cffi==23.1.0 weasyprint>=60.0 +cryptography==44.0.0 +requests==2.32.3 pytest==8.3.3 ruff==0.6.9 mypy==1.11.2 +types-requests==2.32.0.20240914 diff --git a/backend/tests/test_c2_adapter_fake.py b/backend/tests/test_c2_adapter_fake.py new file mode 100644 index 0000000..ca464cf --- /dev/null +++ b/backend/tests/test_c2_adapter_fake.py @@ -0,0 +1,30 @@ +"""Tests for the FakeAdapter deterministic in-memory implementation.""" +from __future__ import annotations + +from backend.app.services.c2.adapter import C2Health +from backend.app.services.c2.fake import FakeAdapter + + +class TestFakeAdapterTestConnection: + def test_returns_ok_true(self): + adapter = FakeAdapter() + health = adapter.test_connection() + assert isinstance(health, C2Health) + assert health.ok is True + assert health.error is None + + def test_list_callbacks_returns_list(self): + adapter = FakeAdapter() + callbacks = adapter.list_callbacks() + assert isinstance(callbacks, list) + assert len(callbacks) >= 1 + + def test_list_callbacks_fields(self): + adapter = FakeAdapter() + cb = adapter.list_callbacks()[0] + assert hasattr(cb, "display_id") + assert hasattr(cb, "active") + assert hasattr(cb, "host") + assert hasattr(cb, "user") + assert hasattr(cb, "domain") + assert hasattr(cb, "last_checkin") diff --git a/backend/tests/test_c2_config.py b/backend/tests/test_c2_config.py new file mode 100644 index 0000000..f5f76ca --- /dev/null +++ b/backend/tests/test_c2_config.py @@ -0,0 +1,352 @@ +"""Tests for C2 config CRUD endpoints. + +Covers: +- GET 404 when no config exists +- PUT create (api_token required) +- PUT update with omitted token keeps old ciphertext +- GET 200 returns has_token=True, never cleartext +- DELETE 204 +- Cascade delete when engagement is deleted +- RBAC: admin OK / redteam OK / SOC 403 on all 4 endpoints +- 503 guard when MIMIC_ENCRYPTION_KEY is unset +- POST /test with fake adapter +""" +from __future__ import annotations + +import pytest +from cryptography.fernet import Fernet +from flask import Flask +from flask.testing import FlaskClient + +from backend.app.models.c2_config import C2Config +from backend.tests.conftest import auth_headers as _h + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +_FERNET_KEY = Fernet.generate_key().decode() + + +@pytest.fixture(autouse=True) +def set_encryption_key(monkeypatch): + """Default: key is present. Individual tests can override.""" + monkeypatch.setenv("MIMIC_ENCRYPTION_KEY", _FERNET_KEY) + + +@pytest.fixture(autouse=True) +def use_fake_adapter(monkeypatch): + monkeypatch.setenv("MIMIC_C2_ADAPTER", "fake") + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_engagement(client: FlaskClient, token: str) -> dict: + resp = client.post( + "/api/engagements", + headers=_h(token), + json={"name": "Op Alpha", "start_date": "2026-06-10"}, + ) + assert resp.status_code == 201, resp.get_json() + return resp.get_json() + + +def _put_config( + client: FlaskClient, + token: str, + eid: int, + *, + url: str = "https://c2.internal:7443", + api_token: str | None = "s3cr3t", + verify_tls: bool = True, +) -> dict: + payload: dict = {"url": url, "verify_tls": verify_tls} + if api_token is not None: + payload["api_token"] = api_token + resp = client.put( + f"/api/engagements/{eid}/c2-config", + headers=_h(token), + json=payload, + ) + return resp + + +# --------------------------------------------------------------------------- +# GET — 404 when no config +# --------------------------------------------------------------------------- + + +def test_get_config_not_found(client: FlaskClient, admin_token: str) -> None: + eng = _make_engagement(client, admin_token) + resp = client.get(f"/api/engagements/{eng['id']}/c2-config", headers=_h(admin_token)) + assert resp.status_code == 404 + + +def test_get_config_engagement_not_found(client: FlaskClient, admin_token: str) -> None: + resp = client.get("/api/engagements/9999/c2-config", headers=_h(admin_token)) + assert resp.status_code == 404 + + +# --------------------------------------------------------------------------- +# PUT — create +# --------------------------------------------------------------------------- + + +def test_put_creates_config(client: FlaskClient, admin_token: str) -> None: + eng = _make_engagement(client, admin_token) + resp = _put_config(client, admin_token, eng["id"]) + assert resp.status_code == 200 + body = resp.get_json() + assert body["has_token"] is True + assert body["url"] == "https://c2.internal:7443" + assert body["verify_tls"] is True + + +def test_put_create_requires_api_token(client: FlaskClient, admin_token: str) -> None: + eng = _make_engagement(client, admin_token) + resp = _put_config(client, admin_token, eng["id"], api_token=None) + assert resp.status_code == 400 + + +def test_put_create_requires_url(client: FlaskClient, admin_token: str) -> None: + eng = _make_engagement(client, admin_token) + resp = client.put( + f"/api/engagements/{eng['id']}/c2-config", + headers=_h(admin_token), + json={"api_token": "tok", "verify_tls": True}, + ) + assert resp.status_code == 400 + + +# --------------------------------------------------------------------------- +# PUT — update, omitting api_token preserves old ciphertext +# --------------------------------------------------------------------------- + + +def test_put_update_omits_token_keeps_old( + app: Flask, client: FlaskClient, admin_token: str +) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"], api_token="original-token") + + # Read ciphertext from DB before update. + with app.app_context(): + cfg = C2Config.query.filter_by(engagement_id=eng["id"]).first() + assert cfg is not None + old_cipher = cfg.api_token_encrypted + + # Update URL, omit api_token. + resp = _put_config( + client, admin_token, eng["id"], + url="https://new.internal:7443", api_token=None, + ) + assert resp.status_code == 200 + + with app.app_context(): + cfg = C2Config.query.filter_by(engagement_id=eng["id"]).first() + assert cfg is not None + assert cfg.api_token_encrypted == old_cipher + assert cfg.url == "https://new.internal:7443" + + +def test_put_update_with_token_replaces_ciphertext( + app: Flask, client: FlaskClient, admin_token: str +) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"], api_token="original-token") + + with app.app_context(): + cfg = C2Config.query.filter_by(engagement_id=eng["id"]).first() + assert cfg is not None + old_cipher = cfg.api_token_encrypted + + _put_config(client, admin_token, eng["id"], api_token="new-token") + + with app.app_context(): + cfg = C2Config.query.filter_by(engagement_id=eng["id"]).first() + assert cfg is not None + assert cfg.api_token_encrypted != old_cipher + + +# --------------------------------------------------------------------------- +# GET — 200, has_token=True, never cleartext +# --------------------------------------------------------------------------- + + +def test_get_config_returns_has_token_not_cleartext( + client: FlaskClient, admin_token: str +) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"], api_token="s3cr3t") + + resp = client.get(f"/api/engagements/{eng['id']}/c2-config", headers=_h(admin_token)) + assert resp.status_code == 200 + body = resp.get_json() + assert body["has_token"] is True + assert "api_token" not in body + assert "api_token_encrypted" not in body + assert "s3cr3t" not in str(body) + + +def test_get_config_verify_tls_default_true( + client: FlaskClient, admin_token: str +) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"]) + resp = client.get(f"/api/engagements/{eng['id']}/c2-config", headers=_h(admin_token)) + assert resp.get_json()["verify_tls"] is True + + +# --------------------------------------------------------------------------- +# DELETE — 204 +# --------------------------------------------------------------------------- + + +def test_delete_config_204(client: FlaskClient, admin_token: str) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"]) + + resp = client.delete( + f"/api/engagements/{eng['id']}/c2-config", headers=_h(admin_token) + ) + assert resp.status_code == 204 + + # Subsequent GET returns 404. + resp2 = client.get(f"/api/engagements/{eng['id']}/c2-config", headers=_h(admin_token)) + assert resp2.status_code == 404 + + +def test_delete_config_not_found(client: FlaskClient, admin_token: str) -> None: + eng = _make_engagement(client, admin_token) + resp = client.delete( + f"/api/engagements/{eng['id']}/c2-config", headers=_h(admin_token) + ) + assert resp.status_code == 404 + + +# --------------------------------------------------------------------------- +# CASCADE — delete engagement removes config +# --------------------------------------------------------------------------- + + +def test_cascade_delete_engagement_removes_config( + app: Flask, client: FlaskClient, admin_token: str +) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"]) + + with app.app_context(): + assert C2Config.query.filter_by(engagement_id=eng["id"]).count() == 1 + + client.delete(f"/api/engagements/{eng['id']}", headers=_h(admin_token)) + + with app.app_context(): + assert C2Config.query.filter_by(engagement_id=eng["id"]).count() == 0 + + +# --------------------------------------------------------------------------- +# RBAC matrix +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("method,path_suffix", [ + ("GET", "/c2-config"), + ("PUT", "/c2-config"), + ("DELETE", "/c2-config"), + ("POST", "/c2-config/test"), +]) +def test_soc_gets_403( + client: FlaskClient, admin_token: str, soc_token: str, + method: str, path_suffix: str, +) -> None: + eng = _make_engagement(client, admin_token) + url = f"/api/engagements/{eng['id']}{path_suffix}" + resp = getattr(client, method.lower())(url, headers=_h(soc_token), json={}) + assert resp.status_code == 403 + + +@pytest.mark.parametrize("method,path_suffix", [ + ("GET", "/c2-config"), + ("DELETE", "/c2-config"), + ("POST", "/c2-config/test"), +]) +def test_redteam_gets_allowed( + client: FlaskClient, admin_token: str, redteam_token: str, + method: str, path_suffix: str, +) -> None: + eng = _make_engagement(client, admin_token) + # Ensure config exists for GET/DELETE/test. + _put_config(client, admin_token, eng["id"]) + url = f"/api/engagements/{eng['id']}{path_suffix}" + resp = getattr(client, method.lower())(url, headers=_h(redteam_token), json={}) + # Not 403 and not 401. + assert resp.status_code not in (401, 403) + + +def test_redteam_can_put_config( + client: FlaskClient, admin_token: str, redteam_token: str, +) -> None: + eng = _make_engagement(client, admin_token) + resp = _put_config(client, redteam_token, eng["id"]) + assert resp.status_code == 200 + + +# --------------------------------------------------------------------------- +# 503 guard when MIMIC_ENCRYPTION_KEY is unset +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("method,path_suffix", [ + ("GET", "/c2-config"), + ("PUT", "/c2-config"), + ("DELETE", "/c2-config"), + ("POST", "/c2-config/test"), +]) +def test_503_when_key_unset( + monkeypatch, + client: FlaskClient, + admin_token: str, + method: str, + path_suffix: str, +) -> None: + monkeypatch.delenv("MIMIC_ENCRYPTION_KEY", raising=False) + eng = _make_engagement(client, admin_token) + url = f"/api/engagements/{eng['id']}{path_suffix}" + resp = getattr(client, method.lower())(url, headers=_h(admin_token), json={ + "url": "https://c2", "api_token": "tok", "verify_tls": True, + }) + assert resp.status_code == 503 + assert "MIMIC_ENCRYPTION_KEY" in resp.get_json().get("error", "") + + +# --------------------------------------------------------------------------- +# POST /test — connectivity check via fake adapter +# --------------------------------------------------------------------------- + + +def test_post_test_returns_ok_true(client: FlaskClient, admin_token: str) -> None: + eng = _make_engagement(client, admin_token) + _put_config(client, admin_token, eng["id"]) + + resp = client.post( + f"/api/engagements/{eng['id']}/c2-config/test", + headers=_h(admin_token), + json={}, + ) + assert resp.status_code == 200 + body = resp.get_json() + assert body["ok"] is True + assert body["error"] is None + + +def test_post_test_no_config_returns_404(client: FlaskClient, admin_token: str) -> None: + eng = _make_engagement(client, admin_token) + resp = client.post( + f"/api/engagements/{eng['id']}/c2-config/test", + headers=_h(admin_token), + json={}, + ) + assert resp.status_code == 404 diff --git a/backend/tests/test_crypto.py b/backend/tests/test_crypto.py new file mode 100644 index 0000000..71da8d3 --- /dev/null +++ b/backend/tests/test_crypto.py @@ -0,0 +1,52 @@ +"""Tests for the Fernet crypto service.""" +from __future__ import annotations + +import pytest +from cryptography.fernet import Fernet + +from backend.app.services.crypto import C2Disabled, decrypt, encrypt + + +@pytest.fixture() +def fernet_key(monkeypatch) -> str: + key = Fernet.generate_key().decode() + monkeypatch.setenv("MIMIC_ENCRYPTION_KEY", key) + return key + + +@pytest.fixture() +def no_key(monkeypatch): + monkeypatch.delenv("MIMIC_ENCRYPTION_KEY", raising=False) + + +class TestEncryptDecrypt: + def test_round_trip(self, fernet_key): + plaintext = "s3cr3t-api-token" + ciphertext = encrypt(plaintext) + assert ciphertext != plaintext + assert decrypt(ciphertext) == plaintext + + def test_different_tokens_for_same_input(self, fernet_key): + # Fernet tokens are non-deterministic (random IV). + t1 = encrypt("same") + t2 = encrypt("same") + assert t1 != t2 + assert decrypt(t1) == decrypt(t2) == "same" + + def test_decrypt_invalid_ciphertext(self, fernet_key): + with pytest.raises(ValueError): + decrypt("not-valid-fernet-token") + + +class TestKeyAbsent: + def test_encrypt_raises_c2disabled(self, no_key): + with pytest.raises(C2Disabled): + encrypt("anything") + + def test_decrypt_raises_c2disabled(self, no_key): + with pytest.raises(C2Disabled): + decrypt("anything") + + def test_c2disabled_message(self, no_key): + with pytest.raises(C2Disabled, match="MIMIC_ENCRYPTION_KEY"): + encrypt("x") diff --git a/backend/tests/test_migration_0006_c2.py b/backend/tests/test_migration_0006_c2.py new file mode 100644 index 0000000..e06277e --- /dev/null +++ b/backend/tests/test_migration_0006_c2.py @@ -0,0 +1,204 @@ +"""Migration round-trip test for 0006_c2_layer. + +Verifies that upgrade() creates c2_config and c2_task with the expected schema, +and that downgrade() removes both tables cleanly. + +Uses the resolved-path pattern (derives path from __file__) to avoid the +hardcoded-path regression documented in lessons.md Sprint 4. +""" +from __future__ import annotations + +import importlib.util +from pathlib import Path + +from alembic.operations import Operations +from alembic.runtime.migration import MigrationContext +from sqlalchemy import create_engine, inspect, text + + +def _load_migration(): + versions_dir = Path(__file__).resolve().parent.parent / "migrations" / "versions" + path = versions_dir / "0006_c2_layer.py" + spec = importlib.util.spec_from_file_location("migration_0006", path) + assert spec and spec.loader + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) # type: ignore[union-attr] + return mod + + +def _fresh_engine(): + """In-memory SQLite with the tables that 0006 depends on already present.""" + engine = create_engine("sqlite:///:memory:") + with engine.begin() as conn: + conn.execute( + text( + """ + CREATE TABLE users ( + id INTEGER PRIMARY KEY, + username TEXT NOT NULL UNIQUE, + password_hash TEXT NOT NULL, + role TEXT NOT NULL, + created_at DATETIME NOT NULL + ) + """ + ) + ) + conn.execute( + text( + """ + CREATE TABLE engagements ( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + description TEXT, + start_date DATE NOT NULL, + end_date DATE, + status TEXT NOT NULL DEFAULT 'planned', + created_at DATETIME NOT NULL, + created_by_id INTEGER NOT NULL REFERENCES users(id) + ) + """ + ) + ) + conn.execute( + text( + """ + CREATE TABLE simulations ( + id INTEGER PRIMARY KEY, + engagement_id INTEGER NOT NULL REFERENCES engagements(id), + name TEXT NOT NULL, + techniques JSON NOT NULL DEFAULT '[]', + tactic_ids JSON NOT NULL DEFAULT '[]', + description TEXT, + commands TEXT, + prerequisites TEXT, + executed_at DATETIME, + execution_result TEXT, + log_source TEXT, + logs TEXT, + soc_comment TEXT, + incident_number TEXT, + status TEXT NOT NULL DEFAULT 'pending', + created_at DATETIME NOT NULL, + updated_at DATETIME, + created_by_id INTEGER NOT NULL REFERENCES users(id) + ) + """ + ) + ) + return engine + + +def _run_upgrade(engine, migration_mod): + with engine.begin() as conn: + ctx = MigrationContext.configure(conn) + ops = Operations(ctx) + # Patch op module for the migration + import alembic.op as alembic_op + original_proxy = alembic_op._proxy # type: ignore[attr-defined] + alembic_op._proxy = ops # type: ignore[attr-defined] + try: + migration_mod.upgrade() + finally: + alembic_op._proxy = original_proxy # type: ignore[attr-defined] + + +def _run_downgrade(engine, migration_mod): + with engine.begin() as conn: + ctx = MigrationContext.configure(conn) + ops = Operations(ctx) + import alembic.op as alembic_op + original_proxy = alembic_op._proxy # type: ignore[attr-defined] + alembic_op._proxy = ops # type: ignore[attr-defined] + try: + migration_mod.downgrade() + finally: + alembic_op._proxy = original_proxy # type: ignore[attr-defined] + + +class TestMigration0006Upgrade: + def test_c2_config_table_created(self): + engine = _fresh_engine() + mod = _load_migration() + _run_upgrade(engine, mod) + + insp = inspect(engine) + assert "c2_config" in insp.get_table_names() + + def test_c2_task_table_created(self): + engine = _fresh_engine() + mod = _load_migration() + _run_upgrade(engine, mod) + + insp = inspect(engine) + assert "c2_task" in insp.get_table_names() + + def test_c2_config_columns(self): + engine = _fresh_engine() + mod = _load_migration() + _run_upgrade(engine, mod) + + insp = inspect(engine) + cols = {c["name"] for c in insp.get_columns("c2_config")} + assert {"id", "engagement_id", "url", "api_token_encrypted", + "verify_tls", "created_at", "updated_at"} <= cols + + def test_c2_config_unique_constraint_on_engagement_id(self): + engine = _fresh_engine() + mod = _load_migration() + _run_upgrade(engine, mod) + + # Insert a user and engagement first. + with engine.begin() as conn: + conn.execute(text( + "INSERT INTO users (id, username, password_hash, role, created_at) " + "VALUES (1, 'u', 'h', 'admin', '2026-01-01')" + )) + conn.execute(text( + "INSERT INTO engagements (id, name, start_date, status, created_at, created_by_id) " + "VALUES (1, 'Op', '2026-01-01', 'planned', '2026-01-01', 1)" + )) + conn.execute(text( + "INSERT INTO c2_config (engagement_id, url, api_token_encrypted, verify_tls, created_at) " + "VALUES (1, 'https://c2', 'tok', 1, '2026-01-01')" + )) + # Second insert on same engagement_id must fail. + try: + conn.execute(text( + "INSERT INTO c2_config (engagement_id, url, api_token_encrypted, verify_tls, created_at) " + "VALUES (1, 'https://c2b', 'tok2', 1, '2026-01-01')" + )) + raised = False + except Exception: + raised = True + assert raised, "UNIQUE constraint on c2_config.engagement_id must be enforced" + + def test_c2_task_columns(self): + engine = _fresh_engine() + mod = _load_migration() + _run_upgrade(engine, mod) + + insp = inspect(engine) + cols = {c["name"] for c in insp.get_columns("c2_task")} + assert {"id", "simulation_id", "mythic_task_display_id", "callback_display_id", + "command", "params", "status", "completed", "output", "source", + "created_at", "completed_at"} <= cols + + +class TestMigration0006Downgrade: + def test_downgrade_removes_c2_config(self): + engine = _fresh_engine() + mod = _load_migration() + _run_upgrade(engine, mod) + _run_downgrade(engine, mod) + + insp = inspect(engine) + assert "c2_config" not in insp.get_table_names() + + def test_downgrade_removes_c2_task(self): + engine = _fresh_engine() + mod = _load_migration() + _run_upgrade(engine, mod) + _run_downgrade(engine, mod) + + insp = inspect(engine) + assert "c2_task" not in insp.get_table_names()