feat: add engagement export service and endpoint (md/csv/pdf)

- New module backend/app/services/export.py with render_engagement_markdown,
  render_engagement_csv, render_engagement_pdf, _render_engagement_html helper,
  and _export_filename slugifier (NFKD + fallback "unnamed").
- Extend engagements_bp with GET /api/engagements/<int:eid>/export?format=md|csv|pdf,
  gated @role_required("admin","redteam"). Returns 400 on missing/unknown format,
  404 on unknown engagement, correct Content-Type + Content-Disposition headers.
- Reuses _enrich_techniques and _enrich_tactics from serializers.py; resilient
  to MITRE bundle not loaded (returns empty tactics, no crash).
- Adds weasyprint>=60.0 to backend/requirements.txt.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Knacky
2026-06-08 17:57:22 +02:00
parent cf006a2ba8
commit 87e4409530
3 changed files with 356 additions and 1 deletions

View File

@@ -3,12 +3,19 @@ from __future__ import annotations
from datetime import date from datetime import date
from flask import Blueprint, g, jsonify, request from flask import Blueprint, Response, g, jsonify, request
from backend.app.auth import login_required, role_required from backend.app.auth import login_required, role_required
from backend.app.extensions import db from backend.app.extensions import db
from backend.app.models import Engagement, EngagementStatus from backend.app.models import Engagement, EngagementStatus
from backend.app.models.simulation import Simulation
from backend.app.serializers import serialize_engagement from backend.app.serializers import serialize_engagement
from backend.app.services.export import (
_export_filename,
render_engagement_csv,
render_engagement_markdown,
render_engagement_pdf,
)
engagements_bp = Blueprint("engagements", __name__, url_prefix="/api/engagements") engagements_bp = Blueprint("engagements", __name__, url_prefix="/api/engagements")
@@ -156,3 +163,48 @@ def delete_engagement(engagement_id: int):
db.session.delete(engagement) db.session.delete(engagement)
db.session.commit() db.session.commit()
return "", 204 return "", 204
@engagements_bp.get("/<int:eid>/export")
@role_required("admin", "redteam")
def export_engagement(eid: int):
engagement = db.session.get(Engagement, eid)
if engagement is None:
return jsonify({"error": "Engagement not found"}), 404
fmt = request.args.get("format", "").strip().lower()
if fmt not in ("md", "csv", "pdf"):
return jsonify({"error": "format must be one of: md, csv, pdf"}), 400
simulations = (
Simulation.query.filter_by(engagement_id=eid)
.order_by(Simulation.id.asc())
.all()
)
if fmt == "md":
body = render_engagement_markdown(engagement, simulations)
filename = _export_filename(engagement, "md")
return Response(
body,
mimetype="text/markdown; charset=utf-8",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)
if fmt == "csv":
body = render_engagement_csv(engagement, simulations)
filename = _export_filename(engagement, "csv")
return Response(
body,
mimetype="text/csv; charset=utf-8",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)
# pdf
body_bytes = render_engagement_pdf(engagement, simulations)
filename = _export_filename(engagement, "pdf")
return Response(
body_bytes,
mimetype="application/pdf",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)

View File

@@ -0,0 +1,302 @@
"""Engagement export renderers — Markdown, CSV, PDF."""
from __future__ import annotations
import csv
import io
import re
import unicodedata
from datetime import date
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from backend.app.models.engagement import Engagement
from backend.app.models.simulation import Simulation
def _export_filename(engagement: Engagement, ext: str) -> str:
name = engagement.name or ""
normalized = unicodedata.normalize("NFKD", name).encode("ascii", "ignore").decode()
slug = re.sub(r"[^a-z0-9]+", "-", normalized.lower()).strip("-")[:60] or "unnamed"
today = date.today().strftime("%Y%m%d")
return f"engagement-{engagement.id}-{slug}-{today}.{ext}"
def _technique_names(techniques: list[dict]) -> str:
return " | ".join(t.get("name", t.get("id", "")) for t in (techniques or []))
def _technique_ids(techniques: list[dict]) -> str:
return " | ".join(t.get("id", "") for t in (techniques or []))
def _tactic_names(tactic_ids: list[str]) -> str:
from backend.app.serializers import _enrich_tactics
enriched = _enrich_tactics(tactic_ids or [])
return " | ".join(e.get("name", e.get("id", "")) for e in enriched)
def _enrich_sim_techniques(techniques: list[dict]) -> list[dict]:
from backend.app.serializers import _enrich_techniques
return _enrich_techniques(techniques or [])
def _creator(obj: object) -> str:
"""Return username string from an ORM object with a created_by relationship."""
cb = getattr(obj, "created_by", None)
if cb is None:
return ""
return getattr(cb, "username", "") or ""
# ---------------------------------------------------------------------------
# Markdown
# ---------------------------------------------------------------------------
def render_engagement_markdown(
engagement: Engagement, simulations: list[Simulation]
) -> str:
lines: list[str] = []
lines.append(f"# {engagement.name}")
lines.append("")
if engagement.description:
lines.append(engagement.description)
lines.append("")
lines.append(f"**Status**: {engagement.status.value}")
lines.append(
f"**Start date**: {engagement.start_date.isoformat() if engagement.start_date else 'N/A'}"
)
lines.append(
f"**End date**: {engagement.end_date.isoformat() if engagement.end_date else 'N/A'}"
)
lines.append(
f"**Created by**: {_creator(engagement)}"
)
lines.append(
f"**Created at**: {engagement.created_at.isoformat() if engagement.created_at else 'N/A'}"
)
lines.append("")
if not simulations:
return "\n".join(lines)
lines.append("---")
lines.append("")
lines.append("## Simulations")
lines.append("")
for sim in simulations:
enriched_techniques = _enrich_sim_techniques(sim.techniques)
tech_str = " | ".join(
f"{t['id']}{t['name']}" for t in enriched_techniques
) if enriched_techniques else "N/A"
tactic_str = _tactic_names(sim.tactic_ids) or "N/A"
lines.append(f"### {sim.name}")
lines.append("")
lines.append(f"**Status**: {sim.status.value}")
lines.append(f"**Techniques**: {tech_str}")
lines.append(f"**Tactics**: {tactic_str}")
if sim.description:
lines.append(f"**Description**: {sim.description}")
lines.append(
f"**Executed at**: {sim.executed_at.isoformat() if sim.executed_at else 'N/A'}"
)
lines.append(f"**Execution result**: {sim.execution_result or 'N/A'}")
lines.append("")
lines.append("**Commands**:")
if sim.commands:
safe = sim.commands.replace("~~~", "\\~\\~\\~")
lines.append("~~~bash")
lines.append(safe)
lines.append("~~~")
else:
lines.append("N/A")
lines.append("")
lines.append(f"**Prerequisites**: {sim.prerequisites or 'N/A'}")
lines.append("")
lines.append("#### SOC")
lines.append(f"**Log source**: {sim.log_source or 'N/A'}")
lines.append(f"**Logs**: {sim.logs or 'N/A'}")
lines.append(f"**SOC comment**: {sim.soc_comment or 'N/A'}")
lines.append(f"**Incident number**: {sim.incident_number or 'N/A'}")
lines.append("")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# CSV
# ---------------------------------------------------------------------------
_CSV_HEADERS = [
"id",
"name",
"status",
"techniques",
"tactics",
"description",
"commands",
"prerequisites",
"executed_at",
"execution_result",
"log_source",
"logs",
"soc_comment",
"incident_number",
"created_at",
"updated_at",
]
def render_engagement_csv(
engagement: Engagement, simulations: list[Simulation]
) -> str:
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(_CSV_HEADERS)
for sim in simulations:
enriched_techniques = _enrich_sim_techniques(sim.techniques)
tech_ids = "|".join(t["id"] for t in enriched_techniques)
tactic_str = "|".join(sim.tactic_ids or [])
writer.writerow([
sim.id,
sim.name,
sim.status.value,
tech_ids,
tactic_str,
sim.description or "",
sim.commands or "",
sim.prerequisites or "",
sim.executed_at.isoformat() if sim.executed_at else "",
sim.execution_result or "",
sim.log_source or "",
sim.logs or "",
sim.soc_comment or "",
sim.incident_number or "",
sim.created_at.isoformat() if sim.created_at else "",
sim.updated_at.isoformat() if sim.updated_at else "",
])
return buf.getvalue()
# ---------------------------------------------------------------------------
# HTML (internal, used by PDF renderer)
# ---------------------------------------------------------------------------
_CSS = """
body { font-family: sans-serif; font-size: 13px; color: #1a1a1a; margin: 40px; }
h1 { font-size: 22px; border-bottom: 2px solid #333; padding-bottom: 6px; }
h2 { font-size: 17px; margin-top: 32px; color: #333; }
h3 { font-size: 14px; margin-top: 24px; background: #f0f0f0; padding: 4px 8px; }
h3:nth-of-type(odd) { background: #e8e8e8; }
table { border-collapse: collapse; width: 100%; margin-bottom: 12px; }
th, td { border: 1px solid #ccc; padding: 4px 8px; text-align: left; vertical-align: top; }
th { background: #e0e0e0; }
pre { background: #f5f5f5; padding: 8px; font-size: 11px; white-space: pre-wrap; word-break: break-all; }
.meta { color: #555; margin-bottom: 16px; }
"""
def _html_escape(text: str) -> str:
return (
text.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace('"', "&quot;")
)
def _render_engagement_html(
engagement: Engagement, simulations: list[Simulation]
) -> str:
h = _html_escape
parts: list[str] = []
parts.append("<!DOCTYPE html><html><head><meta charset='utf-8'>")
parts.append(f"<style>{_CSS}</style></head><body>")
parts.append(f"<h1>{h(engagement.name)}</h1>")
parts.append("<div class='meta'>")
if engagement.description:
parts.append(f"<p>{h(engagement.description)}</p>")
parts.append(f"<p><strong>Status:</strong> {h(engagement.status.value)}</p>")
sd = engagement.start_date.isoformat() if engagement.start_date else "N/A"
ed = engagement.end_date.isoformat() if engagement.end_date else "N/A"
parts.append(f"<p><strong>Dates:</strong> {h(sd)}{h(ed)}</p>")
parts.append(f"<p><strong>Created by:</strong> {h(_creator(engagement))}</p>")
ca = engagement.created_at.isoformat() if engagement.created_at else "N/A"
parts.append(f"<p><strong>Created at:</strong> {h(ca)}</p>")
parts.append("</div>")
if simulations:
parts.append("<h2>Simulations</h2>")
for sim in simulations:
enriched_techniques = _enrich_sim_techniques(sim.techniques)
tech_str = h(
" | ".join(f"{t['id']}{t['name']}" for t in enriched_techniques)
or "N/A"
)
tactic_str = h(_tactic_names(sim.tactic_ids) or "N/A")
parts.append(f"<h3>{h(sim.name)}</h3>")
parts.append("<table>")
parts.append(f"<tr><th>Status</th><td>{h(sim.status.value)}</td></tr>")
parts.append(f"<tr><th>Techniques</th><td>{tech_str}</td></tr>")
parts.append(f"<tr><th>Tactics</th><td>{tactic_str}</td></tr>")
parts.append(
f"<tr><th>Description</th><td>{h(sim.description or '')}</td></tr>"
)
executed = sim.executed_at.isoformat() if sim.executed_at else "N/A"
parts.append(f"<tr><th>Executed at</th><td>{h(executed)}</td></tr>")
parts.append(
f"<tr><th>Execution result</th><td>{h(sim.execution_result or '')}</td></tr>"
)
parts.append("</table>")
if sim.commands:
parts.append(f"<p><strong>Commands:</strong></p><pre>{h(sim.commands)}</pre>")
if sim.prerequisites:
parts.append(
f"<p><strong>Prerequisites:</strong> {h(sim.prerequisites)}</p>"
)
parts.append("<table>")
parts.append("<tr><th colspan='2'>SOC</th></tr>")
parts.append(
f"<tr><th>Log source</th><td>{h(sim.log_source or '')}</td></tr>"
)
parts.append(f"<tr><th>Logs</th><td>{h(sim.logs or '')}</td></tr>")
parts.append(
f"<tr><th>SOC comment</th><td>{h(sim.soc_comment or '')}</td></tr>"
)
parts.append(
f"<tr><th>Incident number</th><td>{h(sim.incident_number or '')}</td></tr>"
)
parts.append("</table>")
parts.append("</body></html>")
return "".join(parts)
# ---------------------------------------------------------------------------
# PDF
# ---------------------------------------------------------------------------
def render_engagement_pdf(
engagement: Engagement, simulations: list[Simulation]
) -> bytes:
from weasyprint import HTML # type: ignore[import-untyped]
html = _render_engagement_html(engagement, simulations)
return HTML(string=html).write_pdf() # type: ignore[no-any-return]

View File

@@ -3,6 +3,7 @@ Flask-SQLAlchemy==3.1.1
Flask-Migrate==4.0.7 Flask-Migrate==4.0.7
PyJWT==2.9.0 PyJWT==2.9.0
argon2-cffi==23.1.0 argon2-cffi==23.1.0
weasyprint>=60.0
pytest==8.3.3 pytest==8.3.3
ruff==0.6.9 ruff==0.6.9
mypy==1.11.2 mypy==1.11.2