From 5d9415bb9f8a53bd98ec9616c87a722eae4c454c Mon Sep 17 00:00:00 2001 From: knacky Date: Thu, 21 May 2026 20:34:11 +0200 Subject: [PATCH] test(backend): add pytest baseline (B0.8) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Unit (SQLite, pure logic): - test_templating.py: Jinja2 sandbox, regex_extract, strict-undefined, sandbox blocks attribute-access escape, output blob 10 MB cap. - test_password.py: bcrypt hash + verify, empty / malformed handling. - test_soc_token.py: 256-bit url-safe token + bcrypt verification. - test_rbac_matrix.py: F11 invariants (lead ⊇ operator, SOC restricted to detection + report-read, audit_read & ttp_promote lead-only). - test_connector_factory.py: register / build / double-register-rejected, TaskStatus terminal helper, Mythic mapping vs empty Home mapping. - test_audit_hash.py: SHA-256 chain helper is deterministic and reacts to prev_hash / metadata changes. Integration scaffold (testcontainers Postgres): - tests/integration/conftest.py spins up postgres:16-alpine, monkeypatches MIMIC_DATABASE_URL, creates a Flask app + db.create_all. - test_healthz.py: end-to-end smoke through the Flask test client. 38 unit tests pass; ruff clean. --- backend/tests/__init__.py | 0 backend/tests/conftest.py | 24 +++++ backend/tests/integration/__init__.py | 0 backend/tests/integration/conftest.py | 50 +++++++++++ backend/tests/integration/test_healthz.py | 13 +++ backend/tests/unit/__init__.py | 0 backend/tests/unit/test_audit_hash.py | 64 ++++++++++++++ backend/tests/unit/test_connector_factory.py | 92 ++++++++++++++++++++ backend/tests/unit/test_password.py | 31 +++++++ backend/tests/unit/test_rbac_matrix.py | 55 ++++++++++++ backend/tests/unit/test_soc_token.py | 27 ++++++ backend/tests/unit/test_templating.py | 80 +++++++++++++++++ 12 files changed, 436 insertions(+) create mode 100644 backend/tests/__init__.py create mode 100644 backend/tests/conftest.py create mode 100644 backend/tests/integration/__init__.py create mode 100644 backend/tests/integration/conftest.py create mode 100644 backend/tests/integration/test_healthz.py create mode 100644 backend/tests/unit/__init__.py create mode 100644 backend/tests/unit/test_audit_hash.py create mode 100644 backend/tests/unit/test_connector_factory.py create mode 100644 backend/tests/unit/test_password.py create mode 100644 backend/tests/unit/test_rbac_matrix.py create mode 100644 backend/tests/unit/test_soc_token.py create mode 100644 backend/tests/unit/test_templating.py diff --git a/backend/tests/__init__.py b/backend/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py new file mode 100644 index 0000000..3d799b6 --- /dev/null +++ b/backend/tests/conftest.py @@ -0,0 +1,24 @@ +"""Shared pytest fixtures for unit-level (SQLite) tests.""" + +from __future__ import annotations + +from collections.abc import Iterator + +import pytest + + +@pytest.fixture(autouse=True) +def _ensure_test_env(monkeypatch: pytest.MonkeyPatch) -> Iterator[None]: + """Force MIMIC_ENV=testing so settings load is predictable.""" + monkeypatch.setenv("MIMIC_ENV", "testing") + monkeypatch.setenv("MIMIC_SECRET_KEY", "test-secret-not-real") + monkeypatch.setenv("MIMIC_LOG_JSON", "false") + monkeypatch.setenv("MIMIC_LOG_LEVEL", "WARNING") + # Pydantic Settings is cached via get_settings(); reset the cache. + from mimic import config as cfg # noqa: PLC0415 (must follow env mutation) + + cfg.get_settings.cache_clear() + try: + yield + finally: + cfg.get_settings.cache_clear() diff --git a/backend/tests/integration/__init__.py b/backend/tests/integration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/tests/integration/conftest.py b/backend/tests/integration/conftest.py new file mode 100644 index 0000000..ebbb4f1 --- /dev/null +++ b/backend/tests/integration/conftest.py @@ -0,0 +1,50 @@ +"""Integration-level fixtures: testcontainers Postgres + Flask app + db session. + +NF-state: SQLite is reserved for pure-logic unit tests; anything touching +constraints, RBAC role grants, or audit-log SQL behavior must run on a real +Postgres via testcontainers (spec H38). +""" + +from __future__ import annotations + +from collections.abc import Iterator + +import pytest + +try: + from testcontainers.postgres import PostgresContainer +except ImportError: # pragma: no cover + PostgresContainer = None # type: ignore[assignment] + + +@pytest.fixture(scope="session") +def postgres_dsn() -> Iterator[str]: + if PostgresContainer is None: + pytest.skip("testcontainers not installed") + with PostgresContainer("postgres:16-alpine") as pg: + url = pg.get_connection_url().replace( + "postgresql+psycopg2", "postgresql+psycopg" + ) + yield url + + +@pytest.fixture +def app(postgres_dsn: str, monkeypatch: pytest.MonkeyPatch): + monkeypatch.setenv("MIMIC_DATABASE_URL", postgres_dsn) + monkeypatch.setenv("MIMIC_ENV", "testing") + monkeypatch.setenv("MIMIC_SECRET_KEY", "test-not-real") + + from mimic.app import create_app # noqa: PLC0415 (must follow env mutation) + from mimic.extensions import db # noqa: PLC0415 + + application = create_app() + with application.app_context(): + db.create_all() + yield application + db.session.remove() + db.drop_all() + + +@pytest.fixture +def client(app): + return app.test_client() diff --git a/backend/tests/integration/test_healthz.py b/backend/tests/integration/test_healthz.py new file mode 100644 index 0000000..0f8d046 --- /dev/null +++ b/backend/tests/integration/test_healthz.py @@ -0,0 +1,13 @@ +"""End-to-end smoke test: Flask app + Postgres testcontainer.""" + +from __future__ import annotations + +import pytest + +pytestmark = pytest.mark.integration + + +def test_healthz_returns_ok(client) -> None: + response = client.get("/healthz") + assert response.status_code == 200 + assert response.get_json() == {"status": "ok"} diff --git a/backend/tests/unit/__init__.py b/backend/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/tests/unit/test_audit_hash.py b/backend/tests/unit/test_audit_hash.py new file mode 100644 index 0000000..0203a78 --- /dev/null +++ b/backend/tests/unit/test_audit_hash.py @@ -0,0 +1,64 @@ +"""Audit log hash-chain helper tests (pure function, no DB).""" + +from __future__ import annotations + +from datetime import UTC, datetime +from uuid import uuid4 + +from mimic.audit.log import audit_hash + + +def test_hash_changes_with_metadata() -> None: + ts = datetime(2026, 5, 21, 12, 0, tzinfo=UTC) + actor = uuid4() + base = { + "prev_hash": None, + "ts": ts, + "actor_id": actor, + "action": "ttp.create", + "resource_type": "ttp", + "resource_id": "t-1", + "metadata": {"name": "whoami"}, + } + h1 = audit_hash(**base) + h2 = audit_hash(**{**base, "metadata": {"name": "whoami2"}}) + assert h1 != h2 + assert len(h1) == 64 + assert all(c in "0123456789abcdef" for c in h1) + + +def test_hash_changes_with_prev_hash() -> None: + ts = datetime(2026, 5, 21, 12, 0, tzinfo=UTC) + h_no_prev = audit_hash( + prev_hash=None, + ts=ts, + actor_id=None, + action="x", + resource_type="r", + resource_id="1", + metadata={}, + ) + h_with_prev = audit_hash( + prev_hash="abc", + ts=ts, + actor_id=None, + action="x", + resource_type="r", + resource_id="1", + metadata={}, + ) + assert h_no_prev != h_with_prev + + +def test_hash_stable_for_same_input() -> None: + ts = datetime(2026, 5, 21, 12, 0, tzinfo=UTC) + base = { + "prev_hash": "aa", + "ts": ts, + "actor_id": None, + "action": "x", + "resource_type": "r", + "resource_id": "1", + "metadata": {"k": "v"}, + } + assert audit_hash(**base) == audit_hash(**base) diff --git a/backend/tests/unit/test_connector_factory.py b/backend/tests/unit/test_connector_factory.py new file mode 100644 index 0000000..f946e06 --- /dev/null +++ b/backend/tests/unit/test_connector_factory.py @@ -0,0 +1,92 @@ +"""C2Connector factory + payload mapping tests.""" + +from __future__ import annotations + +import pytest + +from mimic.connectors import ( + C2Connector, + ConnectorFactory, + Payload, + TaskHandle, + TaskResult, + TaskStatus, + UnsupportedPayloadType, + register_connector, +) +from mimic.connectors.factory import _REGISTRY +from mimic.connectors.payload_map import resolve_native, supports +from mimic.db.models.host import Host +from mimic.db.types import C2Type, PayloadType + + +@pytest.fixture(autouse=True) +def _clear_registry() -> None: + """Each test starts with a clean connector registry.""" + snapshot = dict(_REGISTRY) + _REGISTRY.clear() + yield + _REGISTRY.clear() + _REGISTRY.update(snapshot) + + +class _NullConnector(C2Connector): + def authenticate(self, config: dict[str, object]) -> None: ... + def list_hosts(self, engagement_id: str) -> list[Host]: + return [] + + def execute_task(self, host: Host, payload: Payload) -> TaskHandle: + return TaskHandle( + c2=self.name, + c2_task_id="t-1", + host_id="h-1", + payload_type=payload.payload_type, + ) + + def get_task_result(self, handle: TaskHandle) -> TaskResult: + return TaskResult(status=TaskStatus.COMPLETED, output_text="ok") + + def cancel_task(self, handle: TaskHandle) -> None: ... + + def execute_cleanup( + self, host: Host, resolved_command: str, params: dict[str, object] + ) -> TaskResult: + return TaskResult(status=TaskStatus.COMPLETED) + + +def test_register_and_build() -> None: + register_connector(C2Type.MYTHIC)(_NullConnector) + factory = ConnectorFactory(config_resolver=lambda _: {}) + connector = factory.build(C2Type.MYTHIC) + assert isinstance(connector, _NullConnector) + assert connector.name is C2Type.MYTHIC + + +def test_double_registration_rejected() -> None: + register_connector(C2Type.MYTHIC)(_NullConnector) + with pytest.raises(RuntimeError, match="already registered"): + register_connector(C2Type.MYTHIC)(_NullConnector) + + +def test_build_unknown_raises_not_implemented() -> None: + factory = ConnectorFactory(config_resolver=lambda _: {}) + with pytest.raises(NotImplementedError): + factory.build(C2Type.HOME) + + +def test_task_status_is_terminal() -> None: + assert TaskStatus.COMPLETED.is_terminal + assert TaskStatus.FAILED.is_terminal + assert TaskStatus.CANCELED.is_terminal + assert not TaskStatus.RUNNING.is_terminal + + +def test_mythic_mapping_covers_powershell() -> None: + assert resolve_native(C2Type.MYTHIC, PayloadType.POWERSHELL) == "powershell" + assert supports(C2Type.MYTHIC, PayloadType.SHELLCODE) + + +def test_home_mapping_empty_until_pr2() -> None: + assert not supports(C2Type.HOME, PayloadType.CMD) + with pytest.raises(UnsupportedPayloadType): + resolve_native(C2Type.HOME, PayloadType.CMD) diff --git a/backend/tests/unit/test_password.py b/backend/tests/unit/test_password.py new file mode 100644 index 0000000..813994c --- /dev/null +++ b/backend/tests/unit/test_password.py @@ -0,0 +1,31 @@ +"""Local-auth bcrypt helpers.""" + +from __future__ import annotations + +import pytest + +from mimic.auth.password import check_password, hash_password + + +def test_hash_then_check_succeeds() -> None: + hashed = hash_password("Sup3rSecret!", rounds=4) + assert check_password("Sup3rSecret!", hashed) is True + + +def test_check_rejects_wrong_password() -> None: + hashed = hash_password("right", rounds=4) + assert check_password("wrong", hashed) is False + + +def test_empty_password_raises() -> None: + with pytest.raises(ValueError, match="must not be empty"): + hash_password("") + + +def test_check_missing_hash_returns_false() -> None: + assert check_password("anything", None) is False + assert check_password("anything", "") is False + + +def test_check_invalid_hash_returns_false() -> None: + assert check_password("anything", "not-a-bcrypt-hash") is False diff --git a/backend/tests/unit/test_rbac_matrix.py b/backend/tests/unit/test_rbac_matrix.py new file mode 100644 index 0000000..481e371 --- /dev/null +++ b/backend/tests/unit/test_rbac_matrix.py @@ -0,0 +1,55 @@ +"""F11 matrix coverage tests.""" + +from __future__ import annotations + +from mimic.rbac.matrix import GROUP_PERMISSIONS, GroupName, Permission + + +def test_every_group_has_at_least_one_permission() -> None: + for group, perms in GROUP_PERMISSIONS.items(): + assert perms, f"group {group.value} has no permissions" + + +def test_rt_lead_is_superset_of_operator() -> None: + lead = GROUP_PERMISSIONS[GroupName.RT_LEAD] + operator = GROUP_PERMISSIONS[GroupName.RT_OPERATOR] + assert operator <= lead + + +def test_soc_cannot_start_runs() -> None: + soc = GROUP_PERMISSIONS[GroupName.SOC_ANALYST] + assert Permission.RUN_START not in soc + assert Permission.RUN_CONTROL not in soc + + +def test_only_lead_promotes_ttp() -> None: + operator = GROUP_PERMISSIONS[GroupName.RT_OPERATOR] + soc = GROUP_PERMISSIONS[GroupName.SOC_ANALYST] + assert Permission.TTP_PROMOTE not in operator + assert Permission.TTP_PROMOTE not in soc + assert Permission.TTP_PROMOTE in GROUP_PERMISSIONS[GroupName.RT_LEAD] + + +def test_audit_read_lead_only() -> None: + for group in (GroupName.RT_OPERATOR, GroupName.SOC_ANALYST): + assert Permission.AUDIT_READ not in GROUP_PERMISSIONS[group] + assert Permission.AUDIT_READ in GROUP_PERMISSIONS[GroupName.RT_LEAD] + + +def test_only_lead_issues_soc_tokens() -> None: + for group in (GroupName.RT_OPERATOR, GroupName.SOC_ANALYST): + assert Permission.ENGAGEMENT_SOC_TOKEN_ISSUE not in GROUP_PERMISSIONS[group] + + +def test_operator_cannot_control_run() -> None: + operator = GROUP_PERMISSIONS[GroupName.RT_OPERATOR] + assert Permission.RUN_START not in operator + assert Permission.RUN_CONTROL not in operator + + +def test_soc_can_only_read_report_and_add_detection() -> None: + soc = GROUP_PERMISSIONS[GroupName.SOC_ANALYST] + assert Permission.DETECTION_ADD in soc + assert Permission.REPORT_READ in soc + assert Permission.EVIDENCE_ADD not in soc + assert Permission.HOST_CRUD not in soc diff --git a/backend/tests/unit/test_soc_token.py b/backend/tests/unit/test_soc_token.py new file mode 100644 index 0000000..c267ec4 --- /dev/null +++ b/backend/tests/unit/test_soc_token.py @@ -0,0 +1,27 @@ +"""SOC opaque token generation / verification.""" + +from __future__ import annotations + +from mimic.auth.soc_token import generate_token, verify_token + + +def test_generated_token_verifies() -> None: + material = generate_token(rounds=4) + assert verify_token(material.plain, material.hashed) is True + + +def test_different_plain_does_not_verify() -> None: + material = generate_token(rounds=4) + assert verify_token("wrong-token", material.hashed) is False + + +def test_plain_is_url_safe_and_long() -> None: + material = generate_token(rounds=4) + # 32 random bytes → ~43 url-safe base64 chars. + assert len(material.plain) >= 32 + assert all(c.isalnum() or c in "-_" for c in material.plain) + + +def test_verify_with_empty_values() -> None: + assert verify_token("", "$2b$04$abc") is False + assert verify_token("token", "") is False diff --git a/backend/tests/unit/test_templating.py b/backend/tests/unit/test_templating.py new file mode 100644 index 0000000..d90cb66 --- /dev/null +++ b/backend/tests/unit/test_templating.py @@ -0,0 +1,80 @@ +"""Jinja2 sandbox + regex_extract tests.""" + +from __future__ import annotations + +import pytest + +from mimic.templating.filters import regex_extract +from mimic.templating.sandbox import ( + CleanupRenderer, + RenderError, + StepOutputs, + render_cleanup, +) + + +class TestRegexExtract: + def test_returns_capture_group(self) -> None: + assert regex_extract("hello world", r"hello (\w+)") == "world" + + def test_default_when_no_match(self) -> None: + assert regex_extract("hello", r"foo(\d+)", default="N/A") == "N/A" + + def test_none_input_returns_default(self) -> None: + assert regex_extract(None, r"x", default="empty") == "empty" + + def test_supports_group_zero(self) -> None: + assert regex_extract("abc123", r"\w+\d+", group=0) == "abc123" + + +class TestCleanupRenderer: + def setup_method(self) -> None: + self.renderer = CleanupRenderer() + + def test_render_params(self) -> None: + out = self.renderer.render( + "echo {{ params.target }}", + params={"target": "WIN-01"}, + ) + assert out == "echo WIN-01" + + def test_render_outputs_text(self) -> None: + out = self.renderer.render( + 'echo "{{ outputs.text }}"', + outputs=StepOutputs(text="captured"), + ) + assert out == 'echo "captured"' + + def test_regex_extract_filter(self) -> None: + out = self.renderer.render( + r"{{ outputs.text | regex_extract('pid=(\\d+)') }}", + outputs=StepOutputs(text="status: pid=4242 user=svc"), + ) + assert out == "4242" + + def test_strict_undefined_raises(self) -> None: + with pytest.raises(RenderError): + self.renderer.render("{{ params.does_not_exist }}", params={}) + + def test_sandbox_forbids_attribute_access(self) -> None: + with pytest.raises(RenderError): + self.renderer.render( + "{{ ().__class__.__bases__[0].__subclasses__() }}", + params={}, + ) + + def test_module_singleton_round_trip(self) -> None: + out = render_cleanup("hello {{ params.x }}", params={"x": "there"}) + assert out == "hello there" + + +class TestStepOutputsBlob: + def test_blob_returns_empty_when_no_path(self) -> None: + out = StepOutputs(text="x") + assert out.blob() == "" + + def test_blob_caps_size(self, tmp_path) -> None: + blob = tmp_path / "evidence.bin" + blob.write_bytes(b"A" * 1024) + out = StepOutputs(blob_path=blob, blob_max_bytes=10) + assert out.blob() == "A" * 10