test: add export endpoint + render unit tests (226 → 249 passing)
- test_export_engagement.py: 13 endpoint tests — RBAC (admin/redteam ok, SOC 403, 401 unauthenticated), CSV column contract, CSV special char escaping, PDF magic bytes, 400 on missing/unknown format, 404 on missing engagement, zero-simulations edge case, filename slugification. - test_export_render.py: 10 unit tests on pure render functions — header fields, simulation order, techniques/tactics enrichment, SOC fields always rendered, backtick safety in commands, CSV header row, multi-technique pipe join, PDF magic bytes, MITRE bundle not loaded does not crash. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
266
backend/tests/test_export_engagement.py
Normal file
266
backend/tests/test_export_engagement.py
Normal file
@@ -0,0 +1,266 @@
|
||||
"""Endpoint tests for GET /api/engagements/<eid>/export."""
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import date
|
||||
|
||||
from flask.testing import FlaskClient
|
||||
|
||||
from backend.app.extensions import db
|
||||
from backend.app.models import Engagement, EngagementStatus, User
|
||||
from backend.app.models.simulation import Simulation, SimulationStatus
|
||||
from backend.app.services.export import _export_filename
|
||||
from backend.tests.conftest import auth_headers as _h
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_engagement(client: FlaskClient, token: str, name: str = "Op Alpha") -> dict:
|
||||
resp = client.post(
|
||||
"/api/engagements",
|
||||
headers=_h(token),
|
||||
json={"name": name, "start_date": "2026-06-01"},
|
||||
)
|
||||
assert resp.status_code == 201, resp.get_json()
|
||||
return resp.get_json()
|
||||
|
||||
|
||||
def _make_sim(client: FlaskClient, token: str, eid: int, name: str = "Sim One") -> dict:
|
||||
resp = client.post(
|
||||
f"/api/engagements/{eid}/simulations",
|
||||
headers=_h(token),
|
||||
json={"name": name},
|
||||
)
|
||||
assert resp.status_code == 201, resp.get_json()
|
||||
return resp.get_json()
|
||||
|
||||
|
||||
def _export(client: FlaskClient, token: str, eid: int, fmt: str):
|
||||
return client.get(
|
||||
f"/api/engagements/{eid}/export?format={fmt}",
|
||||
headers=_h(token),
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# RBAC
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_export_markdown_admin_returns_md_with_engagement_header_and_all_simulations(
|
||||
client: FlaskClient, admin_token: str
|
||||
) -> None:
|
||||
eng = _make_engagement(client, admin_token)
|
||||
_make_sim(client, admin_token, eng["id"], "Lateral Movement")
|
||||
_make_sim(client, admin_token, eng["id"], "Persistence Check")
|
||||
|
||||
resp = _export(client, admin_token, eng["id"], "md")
|
||||
assert resp.status_code == 200
|
||||
assert "text/markdown" in resp.content_type
|
||||
body = resp.data.decode()
|
||||
assert "Op Alpha" in body
|
||||
assert "Lateral Movement" in body
|
||||
assert "Persistence Check" in body
|
||||
|
||||
|
||||
def test_export_markdown_redteam_ok(
|
||||
client: FlaskClient, redteam_token: str
|
||||
) -> None:
|
||||
eng = _make_engagement(client, redteam_token)
|
||||
resp = _export(client, redteam_token, eng["id"], "md")
|
||||
assert resp.status_code == 200
|
||||
|
||||
|
||||
def test_export_markdown_soc_403(
|
||||
client: FlaskClient, soc_token: str, admin_token: str
|
||||
) -> None:
|
||||
eng = _make_engagement(client, admin_token)
|
||||
resp = _export(client, soc_token, eng["id"], "md")
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
def test_export_unauthenticated_401(client: FlaskClient, admin_token: str) -> None:
|
||||
eng = _make_engagement(client, admin_token)
|
||||
resp = client.get(f"/api/engagements/{eng['id']}/export?format=md")
|
||||
assert resp.status_code == 401
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CSV
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_export_csv_returns_csv_with_one_row_per_simulation(
|
||||
client: FlaskClient, admin_token: str
|
||||
) -> None:
|
||||
eng = _make_engagement(client, admin_token)
|
||||
_make_sim(client, admin_token, eng["id"], "S1")
|
||||
_make_sim(client, admin_token, eng["id"], "S2")
|
||||
|
||||
resp = _export(client, admin_token, eng["id"], "csv")
|
||||
assert resp.status_code == 200
|
||||
assert "text/csv" in resp.content_type
|
||||
|
||||
import csv as csv_mod
|
||||
import io
|
||||
|
||||
rows = list(csv_mod.reader(io.StringIO(resp.data.decode())))
|
||||
# 1 header + 2 simulations
|
||||
assert len(rows) == 3
|
||||
|
||||
|
||||
def test_export_csv_columns_match_contract(
|
||||
client: FlaskClient, admin_token: str
|
||||
) -> None:
|
||||
eng = _make_engagement(client, admin_token)
|
||||
resp = _export(client, admin_token, eng["id"], "csv")
|
||||
assert resp.status_code == 200
|
||||
|
||||
import csv as csv_mod
|
||||
import io
|
||||
|
||||
rows = list(csv_mod.reader(io.StringIO(resp.data.decode())))
|
||||
expected_headers = [
|
||||
"id", "name", "status", "techniques", "tactics", "description",
|
||||
"commands", "prerequisites", "executed_at", "execution_result",
|
||||
"log_source", "logs", "soc_comment", "incident_number",
|
||||
"created_at", "updated_at",
|
||||
]
|
||||
assert rows[0] == expected_headers
|
||||
|
||||
|
||||
def test_export_csv_escapes_special_characters(
|
||||
client: FlaskClient, admin_token: str, app
|
||||
) -> None:
|
||||
eng = _make_engagement(client, admin_token)
|
||||
|
||||
with app.app_context():
|
||||
from backend.app.models import User
|
||||
|
||||
admin = User.query.filter_by(username="admin1").first()
|
||||
sim = Simulation(
|
||||
engagement_id=eng["id"],
|
||||
name='Sim "quoted"',
|
||||
commands='cmd1, cmd2\nnewline "here"',
|
||||
status=SimulationStatus.PENDING,
|
||||
created_by_id=admin.id,
|
||||
)
|
||||
db.session.add(sim)
|
||||
db.session.commit()
|
||||
|
||||
resp = _export(client, admin_token, eng["id"], "csv")
|
||||
assert resp.status_code == 200
|
||||
body = resp.data.decode()
|
||||
# csv.writer must have quoted the fields — no raw unquoted double-quotes breaking rows
|
||||
import csv as csv_mod
|
||||
import io
|
||||
|
||||
rows = list(csv_mod.reader(io.StringIO(body)))
|
||||
assert len(rows) == 2 # header + 1 sim
|
||||
name_col = rows[1][1]
|
||||
assert "quoted" in name_col
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# PDF
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_export_pdf_returns_pdf_magic_bytes_and_non_empty(
|
||||
client: FlaskClient, admin_token: str
|
||||
) -> None:
|
||||
eng = _make_engagement(client, admin_token)
|
||||
_make_sim(client, admin_token, eng["id"], "S1")
|
||||
|
||||
resp = _export(client, admin_token, eng["id"], "pdf")
|
||||
assert resp.status_code == 200
|
||||
assert resp.content_type == "application/pdf"
|
||||
assert resp.data[:4] == b"%PDF"
|
||||
assert len(resp.data) > 1024
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 400 / 404
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_export_unknown_format_400(
|
||||
client: FlaskClient, admin_token: str
|
||||
) -> None:
|
||||
eng = _make_engagement(client, admin_token)
|
||||
resp = client.get(
|
||||
f"/api/engagements/{eng['id']}/export?format=xml",
|
||||
headers=_h(admin_token),
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
assert "format must be one of" in resp.get_json()["error"]
|
||||
|
||||
|
||||
def test_export_missing_format_400(
|
||||
client: FlaskClient, admin_token: str
|
||||
) -> None:
|
||||
eng = _make_engagement(client, admin_token)
|
||||
resp = client.get(
|
||||
f"/api/engagements/{eng['id']}/export",
|
||||
headers=_h(admin_token),
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
assert "format must be one of" in resp.get_json()["error"]
|
||||
|
||||
|
||||
def test_export_unknown_engagement_404(
|
||||
client: FlaskClient, admin_token: str
|
||||
) -> None:
|
||||
resp = client.get(
|
||||
"/api/engagements/99999/export?format=md",
|
||||
headers=_h(admin_token),
|
||||
)
|
||||
assert resp.status_code == 404
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Edge cases
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_export_engagement_with_zero_simulations_renders_header_only(
|
||||
client: FlaskClient, admin_token: str
|
||||
) -> None:
|
||||
eng = _make_engagement(client, admin_token, "Empty Engagement")
|
||||
|
||||
resp_md = _export(client, admin_token, eng["id"], "md")
|
||||
assert resp_md.status_code == 200
|
||||
assert "Empty Engagement" in resp_md.data.decode()
|
||||
|
||||
resp_csv = _export(client, admin_token, eng["id"], "csv")
|
||||
assert resp_csv.status_code == 200
|
||||
import csv as csv_mod
|
||||
import io
|
||||
|
||||
rows = list(csv_mod.reader(io.StringIO(resp_csv.data.decode())))
|
||||
assert len(rows) == 1 # header only
|
||||
|
||||
resp_pdf = _export(client, admin_token, eng["id"], "pdf")
|
||||
assert resp_pdf.status_code == 200
|
||||
assert resp_pdf.data[:4] == b"%PDF"
|
||||
|
||||
|
||||
def test_export_filename_slugifies_name_and_carries_date(app, admin_user: User) -> None:
|
||||
with app.app_context():
|
||||
eng = Engagement(
|
||||
name="Opération Spéciale!",
|
||||
start_date=date(2026, 6, 1),
|
||||
status=EngagementStatus.PLANNED,
|
||||
created_by_id=admin_user.id,
|
||||
)
|
||||
db.session.add(eng)
|
||||
db.session.commit()
|
||||
|
||||
fname = _export_filename(eng, "md")
|
||||
from datetime import date as _date
|
||||
|
||||
today = _date.today().strftime("%Y%m%d")
|
||||
assert fname.startswith(f"engagement-{eng.id}-")
|
||||
assert "operation-speciale" in fname
|
||||
assert fname.endswith(f"-{today}.md")
|
||||
204
backend/tests/test_export_render.py
Normal file
204
backend/tests/test_export_render.py
Normal file
@@ -0,0 +1,204 @@
|
||||
"""Unit tests for render functions in backend.app.services.export."""
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import UTC, datetime
|
||||
from types import SimpleNamespace
|
||||
from typing import Any
|
||||
|
||||
from backend.app.services.export import (
|
||||
render_engagement_csv,
|
||||
render_engagement_markdown,
|
||||
render_engagement_pdf,
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures / factories
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_engagement(**kw) -> Any:
|
||||
from datetime import date
|
||||
|
||||
defaults: dict[str, Any] = {
|
||||
"id": 1,
|
||||
"name": "Test Engagement",
|
||||
"description": "A purple team exercise",
|
||||
"start_date": date(2026, 6, 1),
|
||||
"end_date": date(2026, 6, 30),
|
||||
"status": SimpleNamespace(value="active"),
|
||||
"created_at": datetime(2026, 6, 1, 10, 0, tzinfo=UTC),
|
||||
"created_by": SimpleNamespace(username="alice"),
|
||||
}
|
||||
defaults.update(kw)
|
||||
return SimpleNamespace(**defaults)
|
||||
|
||||
|
||||
def _make_sim(sid: int = 1, name: str = "Sim Alpha", **kw) -> Any:
|
||||
defaults: dict[str, Any] = {
|
||||
"id": sid,
|
||||
"name": name,
|
||||
"status": SimpleNamespace(value="pending"),
|
||||
"techniques": [{"id": "T1059", "name": "Command and Scripting Interpreter"}],
|
||||
"tactic_ids": ["TA0002"],
|
||||
"description": "Execute a script",
|
||||
"commands": "whoami",
|
||||
"prerequisites": "local admin",
|
||||
"executed_at": None,
|
||||
"execution_result": None,
|
||||
"log_source": None,
|
||||
"logs": None,
|
||||
"soc_comment": None,
|
||||
"incident_number": None,
|
||||
"created_at": datetime(2026, 6, 1, 10, 0, tzinfo=UTC),
|
||||
"updated_at": None,
|
||||
"created_by": SimpleNamespace(username="bob"),
|
||||
}
|
||||
defaults.update(kw)
|
||||
return SimpleNamespace(**defaults)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Markdown tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_render_engagement_markdown_includes_header_fields(app) -> None:
|
||||
with app.app_context():
|
||||
eng = _make_engagement()
|
||||
result = render_engagement_markdown(eng, [])
|
||||
assert "Test Engagement" in result
|
||||
assert "2026-06-01" in result
|
||||
assert "2026-06-30" in result
|
||||
assert "active" in result
|
||||
assert "alice" in result
|
||||
|
||||
|
||||
def test_render_engagement_markdown_lists_all_simulations_in_order(app) -> None:
|
||||
with app.app_context():
|
||||
eng = _make_engagement()
|
||||
sims = [_make_sim(1, "First Sim"), _make_sim(2, "Second Sim")]
|
||||
result = render_engagement_markdown(eng, sims)
|
||||
first_pos = result.index("First Sim")
|
||||
second_pos = result.index("Second Sim")
|
||||
assert first_pos < second_pos
|
||||
|
||||
|
||||
def test_render_engagement_markdown_includes_techniques_with_id_and_name(app) -> None:
|
||||
with app.app_context():
|
||||
eng = _make_engagement()
|
||||
sim = _make_sim(
|
||||
techniques=[{"id": "T1059", "name": "Command and Scripting Interpreter"}]
|
||||
)
|
||||
result = render_engagement_markdown(eng, [sim])
|
||||
assert "T1059" in result
|
||||
assert "Command and Scripting Interpreter" in result
|
||||
|
||||
|
||||
def test_render_engagement_markdown_includes_tactics(app) -> None:
|
||||
with app.app_context():
|
||||
eng = _make_engagement()
|
||||
sim = _make_sim(tactic_ids=["TA0002"])
|
||||
result = render_engagement_markdown(eng, [sim])
|
||||
# TA0002 = Execution — should appear as tactic name or id
|
||||
assert "TA0002" in result or "Execution" in result
|
||||
|
||||
|
||||
def test_render_engagement_markdown_includes_soc_fields_even_when_blank(app) -> None:
|
||||
with app.app_context():
|
||||
eng = _make_engagement()
|
||||
sim = _make_sim(log_source=None, logs=None, soc_comment=None, incident_number=None)
|
||||
result = render_engagement_markdown(eng, [sim])
|
||||
assert "Log source" in result
|
||||
assert "SOC comment" in result
|
||||
assert "Incident number" in result
|
||||
|
||||
|
||||
def test_render_engagement_markdown_escapes_backticks_in_commands(app) -> None:
|
||||
with app.app_context():
|
||||
eng = _make_engagement()
|
||||
sim = _make_sim(commands="echo `whoami`")
|
||||
result = render_engagement_markdown(eng, [sim])
|
||||
# Commands must be inside a fenced code block using ~~~ (tilde fences)
|
||||
assert "~~~" in result
|
||||
# The backtick content must still be present
|
||||
assert "`whoami`" in result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CSV tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_render_engagement_csv_has_header_row(app) -> None:
|
||||
with app.app_context():
|
||||
eng = _make_engagement()
|
||||
result = render_engagement_csv(eng, [])
|
||||
first_line = result.splitlines()[0]
|
||||
assert "id" in first_line
|
||||
assert "name" in first_line
|
||||
assert "status" in first_line
|
||||
assert "techniques" in first_line
|
||||
|
||||
|
||||
def test_render_engagement_csv_joins_multi_techniques_with_pipe(app) -> None:
|
||||
with app.app_context():
|
||||
import csv
|
||||
import io
|
||||
|
||||
eng = _make_engagement()
|
||||
sim = _make_sim(
|
||||
techniques=[
|
||||
{"id": "T1059", "name": "Command and Scripting Interpreter"},
|
||||
{"id": "T1078", "name": "Valid Accounts"},
|
||||
]
|
||||
)
|
||||
result = render_engagement_csv(eng, [sim])
|
||||
rows = list(csv.reader(io.StringIO(result)))
|
||||
# row[3] = techniques column
|
||||
tech_cell = rows[1][3]
|
||||
assert "|" in tech_cell
|
||||
assert "T1059" in tech_cell
|
||||
assert "T1078" in tech_cell
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# PDF test
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_render_engagement_pdf_starts_with_pdf_magic(app) -> None:
|
||||
with app.app_context():
|
||||
eng = _make_engagement()
|
||||
sim = _make_sim()
|
||||
result = render_engagement_pdf(eng, [sim])
|
||||
assert isinstance(result, bytes)
|
||||
assert result[:4] == b"%PDF"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MITRE bundle not loaded
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_render_engagement_markdown_with_mitre_bundle_not_loaded_does_not_crash(
|
||||
app,
|
||||
) -> None:
|
||||
with app.app_context():
|
||||
import backend.app.services.mitre as mitre_svc
|
||||
|
||||
original = mitre_svc.mitre_loaded
|
||||
original_tactics = mitre_svc._tactics_by_technique.copy()
|
||||
try:
|
||||
mitre_svc.mitre_loaded = False
|
||||
mitre_svc._tactics_by_technique = {}
|
||||
eng = _make_engagement()
|
||||
sim = _make_sim(
|
||||
techniques=[{"id": "T1059", "name": "Command and Scripting Interpreter"}],
|
||||
tactic_ids=["TA0002"],
|
||||
)
|
||||
result = render_engagement_markdown(eng, [sim])
|
||||
# Must not crash and must include the technique id
|
||||
assert "T1059" in result
|
||||
finally:
|
||||
mitre_svc.mitre_loaded = original
|
||||
mitre_svc._tactics_by_technique = original_tactics
|
||||
Reference in New Issue
Block a user