diff --git a/backend/tests/test_export_engagement.py b/backend/tests/test_export_engagement.py new file mode 100644 index 0000000..d264543 --- /dev/null +++ b/backend/tests/test_export_engagement.py @@ -0,0 +1,266 @@ +"""Endpoint tests for GET /api/engagements//export.""" +from __future__ import annotations + +from datetime import date + +from flask.testing import FlaskClient + +from backend.app.extensions import db +from backend.app.models import Engagement, EngagementStatus, User +from backend.app.models.simulation import Simulation, SimulationStatus +from backend.app.services.export import _export_filename +from backend.tests.conftest import auth_headers as _h + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_engagement(client: FlaskClient, token: str, name: str = "Op Alpha") -> dict: + resp = client.post( + "/api/engagements", + headers=_h(token), + json={"name": name, "start_date": "2026-06-01"}, + ) + assert resp.status_code == 201, resp.get_json() + return resp.get_json() + + +def _make_sim(client: FlaskClient, token: str, eid: int, name: str = "Sim One") -> dict: + resp = client.post( + f"/api/engagements/{eid}/simulations", + headers=_h(token), + json={"name": name}, + ) + assert resp.status_code == 201, resp.get_json() + return resp.get_json() + + +def _export(client: FlaskClient, token: str, eid: int, fmt: str): + return client.get( + f"/api/engagements/{eid}/export?format={fmt}", + headers=_h(token), + ) + + +# --------------------------------------------------------------------------- +# RBAC +# --------------------------------------------------------------------------- + + +def test_export_markdown_admin_returns_md_with_engagement_header_and_all_simulations( + client: FlaskClient, admin_token: str +) -> None: + eng = _make_engagement(client, admin_token) + _make_sim(client, admin_token, eng["id"], "Lateral Movement") + _make_sim(client, admin_token, eng["id"], "Persistence Check") + + resp = _export(client, admin_token, eng["id"], "md") + assert resp.status_code == 200 + assert "text/markdown" in resp.content_type + body = resp.data.decode() + assert "Op Alpha" in body + assert "Lateral Movement" in body + assert "Persistence Check" in body + + +def test_export_markdown_redteam_ok( + client: FlaskClient, redteam_token: str +) -> None: + eng = _make_engagement(client, redteam_token) + resp = _export(client, redteam_token, eng["id"], "md") + assert resp.status_code == 200 + + +def test_export_markdown_soc_403( + client: FlaskClient, soc_token: str, admin_token: str +) -> None: + eng = _make_engagement(client, admin_token) + resp = _export(client, soc_token, eng["id"], "md") + assert resp.status_code == 403 + + +def test_export_unauthenticated_401(client: FlaskClient, admin_token: str) -> None: + eng = _make_engagement(client, admin_token) + resp = client.get(f"/api/engagements/{eng['id']}/export?format=md") + assert resp.status_code == 401 + + +# --------------------------------------------------------------------------- +# CSV +# --------------------------------------------------------------------------- + + +def test_export_csv_returns_csv_with_one_row_per_simulation( + client: FlaskClient, admin_token: str +) -> None: + eng = _make_engagement(client, admin_token) + _make_sim(client, admin_token, eng["id"], "S1") + _make_sim(client, admin_token, eng["id"], "S2") + + resp = _export(client, admin_token, eng["id"], "csv") + assert resp.status_code == 200 + assert "text/csv" in resp.content_type + + import csv as csv_mod + import io + + rows = list(csv_mod.reader(io.StringIO(resp.data.decode()))) + # 1 header + 2 simulations + assert len(rows) == 3 + + +def test_export_csv_columns_match_contract( + client: FlaskClient, admin_token: str +) -> None: + eng = _make_engagement(client, admin_token) + resp = _export(client, admin_token, eng["id"], "csv") + assert resp.status_code == 200 + + import csv as csv_mod + import io + + rows = list(csv_mod.reader(io.StringIO(resp.data.decode()))) + expected_headers = [ + "id", "name", "status", "techniques", "tactics", "description", + "commands", "prerequisites", "executed_at", "execution_result", + "log_source", "logs", "soc_comment", "incident_number", + "created_at", "updated_at", + ] + assert rows[0] == expected_headers + + +def test_export_csv_escapes_special_characters( + client: FlaskClient, admin_token: str, app +) -> None: + eng = _make_engagement(client, admin_token) + + with app.app_context(): + from backend.app.models import User + + admin = User.query.filter_by(username="admin1").first() + sim = Simulation( + engagement_id=eng["id"], + name='Sim "quoted"', + commands='cmd1, cmd2\nnewline "here"', + status=SimulationStatus.PENDING, + created_by_id=admin.id, + ) + db.session.add(sim) + db.session.commit() + + resp = _export(client, admin_token, eng["id"], "csv") + assert resp.status_code == 200 + body = resp.data.decode() + # csv.writer must have quoted the fields — no raw unquoted double-quotes breaking rows + import csv as csv_mod + import io + + rows = list(csv_mod.reader(io.StringIO(body))) + assert len(rows) == 2 # header + 1 sim + name_col = rows[1][1] + assert "quoted" in name_col + + +# --------------------------------------------------------------------------- +# PDF +# --------------------------------------------------------------------------- + + +def test_export_pdf_returns_pdf_magic_bytes_and_non_empty( + client: FlaskClient, admin_token: str +) -> None: + eng = _make_engagement(client, admin_token) + _make_sim(client, admin_token, eng["id"], "S1") + + resp = _export(client, admin_token, eng["id"], "pdf") + assert resp.status_code == 200 + assert resp.content_type == "application/pdf" + assert resp.data[:4] == b"%PDF" + assert len(resp.data) > 1024 + + +# --------------------------------------------------------------------------- +# 400 / 404 +# --------------------------------------------------------------------------- + + +def test_export_unknown_format_400( + client: FlaskClient, admin_token: str +) -> None: + eng = _make_engagement(client, admin_token) + resp = client.get( + f"/api/engagements/{eng['id']}/export?format=xml", + headers=_h(admin_token), + ) + assert resp.status_code == 400 + assert "format must be one of" in resp.get_json()["error"] + + +def test_export_missing_format_400( + client: FlaskClient, admin_token: str +) -> None: + eng = _make_engagement(client, admin_token) + resp = client.get( + f"/api/engagements/{eng['id']}/export", + headers=_h(admin_token), + ) + assert resp.status_code == 400 + assert "format must be one of" in resp.get_json()["error"] + + +def test_export_unknown_engagement_404( + client: FlaskClient, admin_token: str +) -> None: + resp = client.get( + "/api/engagements/99999/export?format=md", + headers=_h(admin_token), + ) + assert resp.status_code == 404 + + +# --------------------------------------------------------------------------- +# Edge cases +# --------------------------------------------------------------------------- + + +def test_export_engagement_with_zero_simulations_renders_header_only( + client: FlaskClient, admin_token: str +) -> None: + eng = _make_engagement(client, admin_token, "Empty Engagement") + + resp_md = _export(client, admin_token, eng["id"], "md") + assert resp_md.status_code == 200 + assert "Empty Engagement" in resp_md.data.decode() + + resp_csv = _export(client, admin_token, eng["id"], "csv") + assert resp_csv.status_code == 200 + import csv as csv_mod + import io + + rows = list(csv_mod.reader(io.StringIO(resp_csv.data.decode()))) + assert len(rows) == 1 # header only + + resp_pdf = _export(client, admin_token, eng["id"], "pdf") + assert resp_pdf.status_code == 200 + assert resp_pdf.data[:4] == b"%PDF" + + +def test_export_filename_slugifies_name_and_carries_date(app, admin_user: User) -> None: + with app.app_context(): + eng = Engagement( + name="Opération Spéciale!", + start_date=date(2026, 6, 1), + status=EngagementStatus.PLANNED, + created_by_id=admin_user.id, + ) + db.session.add(eng) + db.session.commit() + + fname = _export_filename(eng, "md") + from datetime import date as _date + + today = _date.today().strftime("%Y%m%d") + assert fname.startswith(f"engagement-{eng.id}-") + assert "operation-speciale" in fname + assert fname.endswith(f"-{today}.md") diff --git a/backend/tests/test_export_render.py b/backend/tests/test_export_render.py new file mode 100644 index 0000000..1194c92 --- /dev/null +++ b/backend/tests/test_export_render.py @@ -0,0 +1,204 @@ +"""Unit tests for render functions in backend.app.services.export.""" +from __future__ import annotations + +from datetime import UTC, datetime +from types import SimpleNamespace +from typing import Any + +from backend.app.services.export import ( + render_engagement_csv, + render_engagement_markdown, + render_engagement_pdf, +) + +# --------------------------------------------------------------------------- +# Fixtures / factories +# --------------------------------------------------------------------------- + + +def _make_engagement(**kw) -> Any: + from datetime import date + + defaults: dict[str, Any] = { + "id": 1, + "name": "Test Engagement", + "description": "A purple team exercise", + "start_date": date(2026, 6, 1), + "end_date": date(2026, 6, 30), + "status": SimpleNamespace(value="active"), + "created_at": datetime(2026, 6, 1, 10, 0, tzinfo=UTC), + "created_by": SimpleNamespace(username="alice"), + } + defaults.update(kw) + return SimpleNamespace(**defaults) + + +def _make_sim(sid: int = 1, name: str = "Sim Alpha", **kw) -> Any: + defaults: dict[str, Any] = { + "id": sid, + "name": name, + "status": SimpleNamespace(value="pending"), + "techniques": [{"id": "T1059", "name": "Command and Scripting Interpreter"}], + "tactic_ids": ["TA0002"], + "description": "Execute a script", + "commands": "whoami", + "prerequisites": "local admin", + "executed_at": None, + "execution_result": None, + "log_source": None, + "logs": None, + "soc_comment": None, + "incident_number": None, + "created_at": datetime(2026, 6, 1, 10, 0, tzinfo=UTC), + "updated_at": None, + "created_by": SimpleNamespace(username="bob"), + } + defaults.update(kw) + return SimpleNamespace(**defaults) + + +# --------------------------------------------------------------------------- +# Markdown tests +# --------------------------------------------------------------------------- + + +def test_render_engagement_markdown_includes_header_fields(app) -> None: + with app.app_context(): + eng = _make_engagement() + result = render_engagement_markdown(eng, []) + assert "Test Engagement" in result + assert "2026-06-01" in result + assert "2026-06-30" in result + assert "active" in result + assert "alice" in result + + +def test_render_engagement_markdown_lists_all_simulations_in_order(app) -> None: + with app.app_context(): + eng = _make_engagement() + sims = [_make_sim(1, "First Sim"), _make_sim(2, "Second Sim")] + result = render_engagement_markdown(eng, sims) + first_pos = result.index("First Sim") + second_pos = result.index("Second Sim") + assert first_pos < second_pos + + +def test_render_engagement_markdown_includes_techniques_with_id_and_name(app) -> None: + with app.app_context(): + eng = _make_engagement() + sim = _make_sim( + techniques=[{"id": "T1059", "name": "Command and Scripting Interpreter"}] + ) + result = render_engagement_markdown(eng, [sim]) + assert "T1059" in result + assert "Command and Scripting Interpreter" in result + + +def test_render_engagement_markdown_includes_tactics(app) -> None: + with app.app_context(): + eng = _make_engagement() + sim = _make_sim(tactic_ids=["TA0002"]) + result = render_engagement_markdown(eng, [sim]) + # TA0002 = Execution — should appear as tactic name or id + assert "TA0002" in result or "Execution" in result + + +def test_render_engagement_markdown_includes_soc_fields_even_when_blank(app) -> None: + with app.app_context(): + eng = _make_engagement() + sim = _make_sim(log_source=None, logs=None, soc_comment=None, incident_number=None) + result = render_engagement_markdown(eng, [sim]) + assert "Log source" in result + assert "SOC comment" in result + assert "Incident number" in result + + +def test_render_engagement_markdown_escapes_backticks_in_commands(app) -> None: + with app.app_context(): + eng = _make_engagement() + sim = _make_sim(commands="echo `whoami`") + result = render_engagement_markdown(eng, [sim]) + # Commands must be inside a fenced code block using ~~~ (tilde fences) + assert "~~~" in result + # The backtick content must still be present + assert "`whoami`" in result + + +# --------------------------------------------------------------------------- +# CSV tests +# --------------------------------------------------------------------------- + + +def test_render_engagement_csv_has_header_row(app) -> None: + with app.app_context(): + eng = _make_engagement() + result = render_engagement_csv(eng, []) + first_line = result.splitlines()[0] + assert "id" in first_line + assert "name" in first_line + assert "status" in first_line + assert "techniques" in first_line + + +def test_render_engagement_csv_joins_multi_techniques_with_pipe(app) -> None: + with app.app_context(): + import csv + import io + + eng = _make_engagement() + sim = _make_sim( + techniques=[ + {"id": "T1059", "name": "Command and Scripting Interpreter"}, + {"id": "T1078", "name": "Valid Accounts"}, + ] + ) + result = render_engagement_csv(eng, [sim]) + rows = list(csv.reader(io.StringIO(result))) + # row[3] = techniques column + tech_cell = rows[1][3] + assert "|" in tech_cell + assert "T1059" in tech_cell + assert "T1078" in tech_cell + + +# --------------------------------------------------------------------------- +# PDF test +# --------------------------------------------------------------------------- + + +def test_render_engagement_pdf_starts_with_pdf_magic(app) -> None: + with app.app_context(): + eng = _make_engagement() + sim = _make_sim() + result = render_engagement_pdf(eng, [sim]) + assert isinstance(result, bytes) + assert result[:4] == b"%PDF" + + +# --------------------------------------------------------------------------- +# MITRE bundle not loaded +# --------------------------------------------------------------------------- + + +def test_render_engagement_markdown_with_mitre_bundle_not_loaded_does_not_crash( + app, +) -> None: + with app.app_context(): + import backend.app.services.mitre as mitre_svc + + original = mitre_svc.mitre_loaded + original_tactics = mitre_svc._tactics_by_technique.copy() + try: + mitre_svc.mitre_loaded = False + mitre_svc._tactics_by_technique = {} + eng = _make_engagement() + sim = _make_sim( + techniques=[{"id": "T1059", "name": "Command and Scripting Interpreter"}], + tactic_ids=["TA0002"], + ) + result = render_engagement_markdown(eng, [sim]) + # Must not crash and must include the technique id + assert "T1059" in result + finally: + mitre_svc.mitre_loaded = original + mitre_svc._tactics_by_technique = original_tactics