Files
mimic/backend/tests/test_c2_config.py
Knacky 9a9c98beab feat(backend): c2 crypto + config CRUD + adapter scaffolding (sprint 8 M1)
- Add Fernet crypto service (MIMIC_ENCRYPTION_KEY env, C2Disabled on absent key)
- Add Alembic migration 0006: c2_config + c2_task tables with cascade FKs
- Add C2Config and C2Task SQLAlchemy models
- Add C2Adapter ABC with dataclasses (C2Health, C2Callback, C2TaskStatus, C2TaskPage)
- Add FakeAdapter (deterministic in-memory, MIMIC_C2_ADAPTER=fake)
- Add MythicAdapter scaffold: test_connection() live, M2+ raise NotImplementedError
- Add decode_response_text() helper for base64/binary Mythic responses
- Add GET/PUT/DELETE/POST-test /api/engagements/<id>/c2-config endpoints
- RBAC: admin+redteam OK, SOC 403; 503 guard when encryption key absent
- Token never returned in API responses; stored Fernet-encrypted only
- 42 new tests (300 total, 258 baseline preserved green)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-10 19:20:52 +02:00

353 lines
12 KiB
Python

"""Tests for C2 config CRUD endpoints.
Covers:
- GET 404 when no config exists
- PUT create (api_token required)
- PUT update with omitted token keeps old ciphertext
- GET 200 returns has_token=True, never cleartext
- DELETE 204
- Cascade delete when engagement is deleted
- RBAC: admin OK / redteam OK / SOC 403 on all 4 endpoints
- 503 guard when MIMIC_ENCRYPTION_KEY is unset
- POST /test with fake adapter
"""
from __future__ import annotations
import pytest
from cryptography.fernet import Fernet
from flask import Flask
from flask.testing import FlaskClient
from backend.app.models.c2_config import C2Config
from backend.tests.conftest import auth_headers as _h
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
_FERNET_KEY = Fernet.generate_key().decode()
@pytest.fixture(autouse=True)
def set_encryption_key(monkeypatch):
"""Default: key is present. Individual tests can override."""
monkeypatch.setenv("MIMIC_ENCRYPTION_KEY", _FERNET_KEY)
@pytest.fixture(autouse=True)
def use_fake_adapter(monkeypatch):
monkeypatch.setenv("MIMIC_C2_ADAPTER", "fake")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_engagement(client: FlaskClient, token: str) -> dict:
resp = client.post(
"/api/engagements",
headers=_h(token),
json={"name": "Op Alpha", "start_date": "2026-06-10"},
)
assert resp.status_code == 201, resp.get_json()
return resp.get_json()
def _put_config(
client: FlaskClient,
token: str,
eid: int,
*,
url: str = "https://c2.internal:7443",
api_token: str | None = "s3cr3t",
verify_tls: bool = True,
) -> dict:
payload: dict = {"url": url, "verify_tls": verify_tls}
if api_token is not None:
payload["api_token"] = api_token
resp = client.put(
f"/api/engagements/{eid}/c2-config",
headers=_h(token),
json=payload,
)
return resp
# ---------------------------------------------------------------------------
# GET — 404 when no config
# ---------------------------------------------------------------------------
def test_get_config_not_found(client: FlaskClient, admin_token: str) -> None:
eng = _make_engagement(client, admin_token)
resp = client.get(f"/api/engagements/{eng['id']}/c2-config", headers=_h(admin_token))
assert resp.status_code == 404
def test_get_config_engagement_not_found(client: FlaskClient, admin_token: str) -> None:
resp = client.get("/api/engagements/9999/c2-config", headers=_h(admin_token))
assert resp.status_code == 404
# ---------------------------------------------------------------------------
# PUT — create
# ---------------------------------------------------------------------------
def test_put_creates_config(client: FlaskClient, admin_token: str) -> None:
eng = _make_engagement(client, admin_token)
resp = _put_config(client, admin_token, eng["id"])
assert resp.status_code == 200
body = resp.get_json()
assert body["has_token"] is True
assert body["url"] == "https://c2.internal:7443"
assert body["verify_tls"] is True
def test_put_create_requires_api_token(client: FlaskClient, admin_token: str) -> None:
eng = _make_engagement(client, admin_token)
resp = _put_config(client, admin_token, eng["id"], api_token=None)
assert resp.status_code == 400
def test_put_create_requires_url(client: FlaskClient, admin_token: str) -> None:
eng = _make_engagement(client, admin_token)
resp = client.put(
f"/api/engagements/{eng['id']}/c2-config",
headers=_h(admin_token),
json={"api_token": "tok", "verify_tls": True},
)
assert resp.status_code == 400
# ---------------------------------------------------------------------------
# PUT — update, omitting api_token preserves old ciphertext
# ---------------------------------------------------------------------------
def test_put_update_omits_token_keeps_old(
app: Flask, client: FlaskClient, admin_token: str
) -> None:
eng = _make_engagement(client, admin_token)
_put_config(client, admin_token, eng["id"], api_token="original-token")
# Read ciphertext from DB before update.
with app.app_context():
cfg = C2Config.query.filter_by(engagement_id=eng["id"]).first()
assert cfg is not None
old_cipher = cfg.api_token_encrypted
# Update URL, omit api_token.
resp = _put_config(
client, admin_token, eng["id"],
url="https://new.internal:7443", api_token=None,
)
assert resp.status_code == 200
with app.app_context():
cfg = C2Config.query.filter_by(engagement_id=eng["id"]).first()
assert cfg is not None
assert cfg.api_token_encrypted == old_cipher
assert cfg.url == "https://new.internal:7443"
def test_put_update_with_token_replaces_ciphertext(
app: Flask, client: FlaskClient, admin_token: str
) -> None:
eng = _make_engagement(client, admin_token)
_put_config(client, admin_token, eng["id"], api_token="original-token")
with app.app_context():
cfg = C2Config.query.filter_by(engagement_id=eng["id"]).first()
assert cfg is not None
old_cipher = cfg.api_token_encrypted
_put_config(client, admin_token, eng["id"], api_token="new-token")
with app.app_context():
cfg = C2Config.query.filter_by(engagement_id=eng["id"]).first()
assert cfg is not None
assert cfg.api_token_encrypted != old_cipher
# ---------------------------------------------------------------------------
# GET — 200, has_token=True, never cleartext
# ---------------------------------------------------------------------------
def test_get_config_returns_has_token_not_cleartext(
client: FlaskClient, admin_token: str
) -> None:
eng = _make_engagement(client, admin_token)
_put_config(client, admin_token, eng["id"], api_token="s3cr3t")
resp = client.get(f"/api/engagements/{eng['id']}/c2-config", headers=_h(admin_token))
assert resp.status_code == 200
body = resp.get_json()
assert body["has_token"] is True
assert "api_token" not in body
assert "api_token_encrypted" not in body
assert "s3cr3t" not in str(body)
def test_get_config_verify_tls_default_true(
client: FlaskClient, admin_token: str
) -> None:
eng = _make_engagement(client, admin_token)
_put_config(client, admin_token, eng["id"])
resp = client.get(f"/api/engagements/{eng['id']}/c2-config", headers=_h(admin_token))
assert resp.get_json()["verify_tls"] is True
# ---------------------------------------------------------------------------
# DELETE — 204
# ---------------------------------------------------------------------------
def test_delete_config_204(client: FlaskClient, admin_token: str) -> None:
eng = _make_engagement(client, admin_token)
_put_config(client, admin_token, eng["id"])
resp = client.delete(
f"/api/engagements/{eng['id']}/c2-config", headers=_h(admin_token)
)
assert resp.status_code == 204
# Subsequent GET returns 404.
resp2 = client.get(f"/api/engagements/{eng['id']}/c2-config", headers=_h(admin_token))
assert resp2.status_code == 404
def test_delete_config_not_found(client: FlaskClient, admin_token: str) -> None:
eng = _make_engagement(client, admin_token)
resp = client.delete(
f"/api/engagements/{eng['id']}/c2-config", headers=_h(admin_token)
)
assert resp.status_code == 404
# ---------------------------------------------------------------------------
# CASCADE — delete engagement removes config
# ---------------------------------------------------------------------------
def test_cascade_delete_engagement_removes_config(
app: Flask, client: FlaskClient, admin_token: str
) -> None:
eng = _make_engagement(client, admin_token)
_put_config(client, admin_token, eng["id"])
with app.app_context():
assert C2Config.query.filter_by(engagement_id=eng["id"]).count() == 1
client.delete(f"/api/engagements/{eng['id']}", headers=_h(admin_token))
with app.app_context():
assert C2Config.query.filter_by(engagement_id=eng["id"]).count() == 0
# ---------------------------------------------------------------------------
# RBAC matrix
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("method,path_suffix", [
("GET", "/c2-config"),
("PUT", "/c2-config"),
("DELETE", "/c2-config"),
("POST", "/c2-config/test"),
])
def test_soc_gets_403(
client: FlaskClient, admin_token: str, soc_token: str,
method: str, path_suffix: str,
) -> None:
eng = _make_engagement(client, admin_token)
url = f"/api/engagements/{eng['id']}{path_suffix}"
resp = getattr(client, method.lower())(url, headers=_h(soc_token), json={})
assert resp.status_code == 403
@pytest.mark.parametrize("method,path_suffix", [
("GET", "/c2-config"),
("DELETE", "/c2-config"),
("POST", "/c2-config/test"),
])
def test_redteam_gets_allowed(
client: FlaskClient, admin_token: str, redteam_token: str,
method: str, path_suffix: str,
) -> None:
eng = _make_engagement(client, admin_token)
# Ensure config exists for GET/DELETE/test.
_put_config(client, admin_token, eng["id"])
url = f"/api/engagements/{eng['id']}{path_suffix}"
resp = getattr(client, method.lower())(url, headers=_h(redteam_token), json={})
# Not 403 and not 401.
assert resp.status_code not in (401, 403)
def test_redteam_can_put_config(
client: FlaskClient, admin_token: str, redteam_token: str,
) -> None:
eng = _make_engagement(client, admin_token)
resp = _put_config(client, redteam_token, eng["id"])
assert resp.status_code == 200
# ---------------------------------------------------------------------------
# 503 guard when MIMIC_ENCRYPTION_KEY is unset
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("method,path_suffix", [
("GET", "/c2-config"),
("PUT", "/c2-config"),
("DELETE", "/c2-config"),
("POST", "/c2-config/test"),
])
def test_503_when_key_unset(
monkeypatch,
client: FlaskClient,
admin_token: str,
method: str,
path_suffix: str,
) -> None:
monkeypatch.delenv("MIMIC_ENCRYPTION_KEY", raising=False)
eng = _make_engagement(client, admin_token)
url = f"/api/engagements/{eng['id']}{path_suffix}"
resp = getattr(client, method.lower())(url, headers=_h(admin_token), json={
"url": "https://c2", "api_token": "tok", "verify_tls": True,
})
assert resp.status_code == 503
assert "MIMIC_ENCRYPTION_KEY" in resp.get_json().get("error", "")
# ---------------------------------------------------------------------------
# POST /test — connectivity check via fake adapter
# ---------------------------------------------------------------------------
def test_post_test_returns_ok_true(client: FlaskClient, admin_token: str) -> None:
eng = _make_engagement(client, admin_token)
_put_config(client, admin_token, eng["id"])
resp = client.post(
f"/api/engagements/{eng['id']}/c2-config/test",
headers=_h(admin_token),
json={},
)
assert resp.status_code == 200
body = resp.get_json()
assert body["ok"] is True
assert body["error"] is None
def test_post_test_no_config_returns_404(client: FlaskClient, admin_token: str) -> None:
eng = _make_engagement(client, admin_token)
resp = client.post(
f"/api/engagements/{eng['id']}/c2-config/test",
headers=_h(admin_token),
json={},
)
assert resp.status_code == 404