From 87e44095307cc70d4ac01a99189ab14156a241c8 Mon Sep 17 00:00:00 2001 From: Knacky Date: Mon, 8 Jun 2026 17:57:22 +0200 Subject: [PATCH] feat: add engagement export service and endpoint (md/csv/pdf) - New module backend/app/services/export.py with render_engagement_markdown, render_engagement_csv, render_engagement_pdf, _render_engagement_html helper, and _export_filename slugifier (NFKD + fallback "unnamed"). - Extend engagements_bp with GET /api/engagements//export?format=md|csv|pdf, gated @role_required("admin","redteam"). Returns 400 on missing/unknown format, 404 on unknown engagement, correct Content-Type + Content-Disposition headers. - Reuses _enrich_techniques and _enrich_tactics from serializers.py; resilient to MITRE bundle not loaded (returns empty tactics, no crash). - Adds weasyprint>=60.0 to backend/requirements.txt. Co-Authored-By: Claude Sonnet 4.6 --- backend/app/api/engagements.py | 54 +++++- backend/app/services/export.py | 302 +++++++++++++++++++++++++++++++++ backend/requirements.txt | 1 + 3 files changed, 356 insertions(+), 1 deletion(-) create mode 100644 backend/app/services/export.py diff --git a/backend/app/api/engagements.py b/backend/app/api/engagements.py index 2e57ce8..ee10972 100644 --- a/backend/app/api/engagements.py +++ b/backend/app/api/engagements.py @@ -3,12 +3,19 @@ from __future__ import annotations from datetime import date -from flask import Blueprint, g, jsonify, request +from flask import Blueprint, Response, g, jsonify, request from backend.app.auth import login_required, role_required from backend.app.extensions import db from backend.app.models import Engagement, EngagementStatus +from backend.app.models.simulation import Simulation from backend.app.serializers import serialize_engagement +from backend.app.services.export import ( + _export_filename, + render_engagement_csv, + render_engagement_markdown, + render_engagement_pdf, +) engagements_bp = Blueprint("engagements", __name__, url_prefix="/api/engagements") @@ -156,3 +163,48 @@ def delete_engagement(engagement_id: int): db.session.delete(engagement) db.session.commit() return "", 204 + + +@engagements_bp.get("//export") +@role_required("admin", "redteam") +def export_engagement(eid: int): + engagement = db.session.get(Engagement, eid) + if engagement is None: + return jsonify({"error": "Engagement not found"}), 404 + + fmt = request.args.get("format", "").strip().lower() + if fmt not in ("md", "csv", "pdf"): + return jsonify({"error": "format must be one of: md, csv, pdf"}), 400 + + simulations = ( + Simulation.query.filter_by(engagement_id=eid) + .order_by(Simulation.id.asc()) + .all() + ) + + if fmt == "md": + body = render_engagement_markdown(engagement, simulations) + filename = _export_filename(engagement, "md") + return Response( + body, + mimetype="text/markdown; charset=utf-8", + headers={"Content-Disposition": f'attachment; filename="{filename}"'}, + ) + + if fmt == "csv": + body = render_engagement_csv(engagement, simulations) + filename = _export_filename(engagement, "csv") + return Response( + body, + mimetype="text/csv; charset=utf-8", + headers={"Content-Disposition": f'attachment; filename="{filename}"'}, + ) + + # pdf + body_bytes = render_engagement_pdf(engagement, simulations) + filename = _export_filename(engagement, "pdf") + return Response( + body_bytes, + mimetype="application/pdf", + headers={"Content-Disposition": f'attachment; filename="{filename}"'}, + ) diff --git a/backend/app/services/export.py b/backend/app/services/export.py new file mode 100644 index 0000000..ef15f88 --- /dev/null +++ b/backend/app/services/export.py @@ -0,0 +1,302 @@ +"""Engagement export renderers — Markdown, CSV, PDF.""" +from __future__ import annotations + +import csv +import io +import re +import unicodedata +from datetime import date +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from backend.app.models.engagement import Engagement + from backend.app.models.simulation import Simulation + + +def _export_filename(engagement: Engagement, ext: str) -> str: + name = engagement.name or "" + normalized = unicodedata.normalize("NFKD", name).encode("ascii", "ignore").decode() + slug = re.sub(r"[^a-z0-9]+", "-", normalized.lower()).strip("-")[:60] or "unnamed" + today = date.today().strftime("%Y%m%d") + return f"engagement-{engagement.id}-{slug}-{today}.{ext}" + + +def _technique_names(techniques: list[dict]) -> str: + return " | ".join(t.get("name", t.get("id", "")) for t in (techniques or [])) + + +def _technique_ids(techniques: list[dict]) -> str: + return " | ".join(t.get("id", "") for t in (techniques or [])) + + +def _tactic_names(tactic_ids: list[str]) -> str: + from backend.app.serializers import _enrich_tactics + + enriched = _enrich_tactics(tactic_ids or []) + return " | ".join(e.get("name", e.get("id", "")) for e in enriched) + + +def _enrich_sim_techniques(techniques: list[dict]) -> list[dict]: + from backend.app.serializers import _enrich_techniques + + return _enrich_techniques(techniques or []) + + +def _creator(obj: object) -> str: + """Return username string from an ORM object with a created_by relationship.""" + cb = getattr(obj, "created_by", None) + if cb is None: + return "" + return getattr(cb, "username", "") or "" + + +# --------------------------------------------------------------------------- +# Markdown +# --------------------------------------------------------------------------- + + +def render_engagement_markdown( + engagement: Engagement, simulations: list[Simulation] +) -> str: + lines: list[str] = [] + + lines.append(f"# {engagement.name}") + lines.append("") + if engagement.description: + lines.append(engagement.description) + lines.append("") + lines.append(f"**Status**: {engagement.status.value}") + lines.append( + f"**Start date**: {engagement.start_date.isoformat() if engagement.start_date else 'N/A'}" + ) + lines.append( + f"**End date**: {engagement.end_date.isoformat() if engagement.end_date else 'N/A'}" + ) + lines.append( + f"**Created by**: {_creator(engagement)}" + ) + lines.append( + f"**Created at**: {engagement.created_at.isoformat() if engagement.created_at else 'N/A'}" + ) + lines.append("") + + if not simulations: + return "\n".join(lines) + + lines.append("---") + lines.append("") + lines.append("## Simulations") + lines.append("") + + for sim in simulations: + enriched_techniques = _enrich_sim_techniques(sim.techniques) + tech_str = " | ".join( + f"{t['id']} — {t['name']}" for t in enriched_techniques + ) if enriched_techniques else "N/A" + tactic_str = _tactic_names(sim.tactic_ids) or "N/A" + + lines.append(f"### {sim.name}") + lines.append("") + lines.append(f"**Status**: {sim.status.value}") + lines.append(f"**Techniques**: {tech_str}") + lines.append(f"**Tactics**: {tactic_str}") + if sim.description: + lines.append(f"**Description**: {sim.description}") + lines.append( + f"**Executed at**: {sim.executed_at.isoformat() if sim.executed_at else 'N/A'}" + ) + lines.append(f"**Execution result**: {sim.execution_result or 'N/A'}") + lines.append("") + + lines.append("**Commands**:") + if sim.commands: + safe = sim.commands.replace("~~~", "\\~\\~\\~") + lines.append("~~~bash") + lines.append(safe) + lines.append("~~~") + else: + lines.append("N/A") + lines.append("") + + lines.append(f"**Prerequisites**: {sim.prerequisites or 'N/A'}") + lines.append("") + + lines.append("#### SOC") + lines.append(f"**Log source**: {sim.log_source or 'N/A'}") + lines.append(f"**Logs**: {sim.logs or 'N/A'}") + lines.append(f"**SOC comment**: {sim.soc_comment or 'N/A'}") + lines.append(f"**Incident number**: {sim.incident_number or 'N/A'}") + lines.append("") + + return "\n".join(lines) + + +# --------------------------------------------------------------------------- +# CSV +# --------------------------------------------------------------------------- + +_CSV_HEADERS = [ + "id", + "name", + "status", + "techniques", + "tactics", + "description", + "commands", + "prerequisites", + "executed_at", + "execution_result", + "log_source", + "logs", + "soc_comment", + "incident_number", + "created_at", + "updated_at", +] + + +def render_engagement_csv( + engagement: Engagement, simulations: list[Simulation] +) -> str: + buf = io.StringIO() + writer = csv.writer(buf) + writer.writerow(_CSV_HEADERS) + + for sim in simulations: + enriched_techniques = _enrich_sim_techniques(sim.techniques) + tech_ids = "|".join(t["id"] for t in enriched_techniques) + tactic_str = "|".join(sim.tactic_ids or []) + + writer.writerow([ + sim.id, + sim.name, + sim.status.value, + tech_ids, + tactic_str, + sim.description or "", + sim.commands or "", + sim.prerequisites or "", + sim.executed_at.isoformat() if sim.executed_at else "", + sim.execution_result or "", + sim.log_source or "", + sim.logs or "", + sim.soc_comment or "", + sim.incident_number or "", + sim.created_at.isoformat() if sim.created_at else "", + sim.updated_at.isoformat() if sim.updated_at else "", + ]) + + return buf.getvalue() + + +# --------------------------------------------------------------------------- +# HTML (internal, used by PDF renderer) +# --------------------------------------------------------------------------- + +_CSS = """ +body { font-family: sans-serif; font-size: 13px; color: #1a1a1a; margin: 40px; } +h1 { font-size: 22px; border-bottom: 2px solid #333; padding-bottom: 6px; } +h2 { font-size: 17px; margin-top: 32px; color: #333; } +h3 { font-size: 14px; margin-top: 24px; background: #f0f0f0; padding: 4px 8px; } +h3:nth-of-type(odd) { background: #e8e8e8; } +table { border-collapse: collapse; width: 100%; margin-bottom: 12px; } +th, td { border: 1px solid #ccc; padding: 4px 8px; text-align: left; vertical-align: top; } +th { background: #e0e0e0; } +pre { background: #f5f5f5; padding: 8px; font-size: 11px; white-space: pre-wrap; word-break: break-all; } +.meta { color: #555; margin-bottom: 16px; } +""" + + +def _html_escape(text: str) -> str: + return ( + text.replace("&", "&") + .replace("<", "<") + .replace(">", ">") + .replace('"', """) + ) + + +def _render_engagement_html( + engagement: Engagement, simulations: list[Simulation] +) -> str: + h = _html_escape + parts: list[str] = [] + + parts.append("") + parts.append(f"") + parts.append(f"

{h(engagement.name)}

") + parts.append("
") + if engagement.description: + parts.append(f"

{h(engagement.description)}

") + parts.append(f"

Status: {h(engagement.status.value)}

") + sd = engagement.start_date.isoformat() if engagement.start_date else "N/A" + ed = engagement.end_date.isoformat() if engagement.end_date else "N/A" + parts.append(f"

Dates: {h(sd)} → {h(ed)}

") + parts.append(f"

Created by: {h(_creator(engagement))}

") + ca = engagement.created_at.isoformat() if engagement.created_at else "N/A" + parts.append(f"

Created at: {h(ca)}

") + parts.append("
") + + if simulations: + parts.append("

Simulations

") + for sim in simulations: + enriched_techniques = _enrich_sim_techniques(sim.techniques) + tech_str = h( + " | ".join(f"{t['id']} — {t['name']}" for t in enriched_techniques) + or "N/A" + ) + tactic_str = h(_tactic_names(sim.tactic_ids) or "N/A") + + parts.append(f"

{h(sim.name)}

") + parts.append("") + parts.append(f"") + parts.append(f"") + parts.append(f"") + parts.append( + f"" + ) + executed = sim.executed_at.isoformat() if sim.executed_at else "N/A" + parts.append(f"") + parts.append( + f"" + ) + parts.append("
Status{h(sim.status.value)}
Techniques{tech_str}
Tactics{tactic_str}
Description{h(sim.description or '')}
Executed at{h(executed)}
Execution result{h(sim.execution_result or '')}
") + + if sim.commands: + parts.append(f"

Commands:

{h(sim.commands)}
") + + if sim.prerequisites: + parts.append( + f"

Prerequisites: {h(sim.prerequisites)}

" + ) + + parts.append("") + parts.append("") + parts.append( + f"" + ) + parts.append(f"") + parts.append( + f"" + ) + parts.append( + f"" + ) + parts.append("
SOC
Log source{h(sim.log_source or '')}
Logs{h(sim.logs or '')}
SOC comment{h(sim.soc_comment or '')}
Incident number{h(sim.incident_number or '')}
") + + parts.append("") + return "".join(parts) + + +# --------------------------------------------------------------------------- +# PDF +# --------------------------------------------------------------------------- + + +def render_engagement_pdf( + engagement: Engagement, simulations: list[Simulation] +) -> bytes: + from weasyprint import HTML # type: ignore[import-untyped] + + html = _render_engagement_html(engagement, simulations) + return HTML(string=html).write_pdf() # type: ignore[no-any-return] diff --git a/backend/requirements.txt b/backend/requirements.txt index e00316d..878005e 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -3,6 +3,7 @@ Flask-SQLAlchemy==3.1.1 Flask-Migrate==4.0.7 PyJWT==2.9.0 argon2-cffi==23.1.0 +weasyprint>=60.0 pytest==8.3.3 ruff==0.6.9 mypy==1.11.2