Files
mimic/backend/app/api/engagements.py
Knacky 87e4409530 feat: add engagement export service and endpoint (md/csv/pdf)
- New module backend/app/services/export.py with render_engagement_markdown,
  render_engagement_csv, render_engagement_pdf, _render_engagement_html helper,
  and _export_filename slugifier (NFKD + fallback "unnamed").
- Extend engagements_bp with GET /api/engagements/<int:eid>/export?format=md|csv|pdf,
  gated @role_required("admin","redteam"). Returns 400 on missing/unknown format,
  404 on unknown engagement, correct Content-Type + Content-Disposition headers.
- Reuses _enrich_techniques and _enrich_tactics from serializers.py; resilient
  to MITRE bundle not loaded (returns empty tactics, no crash).
- Adds weasyprint>=60.0 to backend/requirements.txt.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-08 17:57:22 +02:00

211 lines
6.5 KiB
Python

"""Engagement CRUD endpoints."""
from __future__ import annotations
from datetime import date
from flask import Blueprint, Response, g, jsonify, request
from backend.app.auth import login_required, role_required
from backend.app.extensions import db
from backend.app.models import Engagement, EngagementStatus
from backend.app.models.simulation import Simulation
from backend.app.serializers import serialize_engagement
from backend.app.services.export import (
_export_filename,
render_engagement_csv,
render_engagement_markdown,
render_engagement_pdf,
)
engagements_bp = Blueprint("engagements", __name__, url_prefix="/api/engagements")
def _parse_date(value: object) -> date | None:
if not isinstance(value, str):
return None
try:
return date.fromisoformat(value)
except ValueError:
return None
def _parse_status(value: object) -> EngagementStatus | None:
if not isinstance(value, str):
return None
try:
return EngagementStatus(value)
except ValueError:
return None
@engagements_bp.get("")
@login_required
def list_engagements():
items = Engagement.query.order_by(Engagement.id.asc()).all()
return jsonify([serialize_engagement(e) for e in items]), 200
@engagements_bp.post("")
@role_required("admin", "redteam")
def create_engagement():
data = request.get_json(silent=True) or {}
name = (data.get("name") or "").strip()
if not name:
return jsonify({"error": "name is required"}), 400
start_raw = data.get("start_date")
start_date = _parse_date(start_raw) if start_raw else None
if start_date is None:
return jsonify({"error": "start_date is required (YYYY-MM-DD)"}), 400
end_raw = data.get("end_date")
end_date: date | None = None
if end_raw:
end_date = _parse_date(end_raw)
if end_date is None:
return jsonify({"error": "end_date must be YYYY-MM-DD"}), 400
if end_date < start_date:
return jsonify({"error": "end_date must be >= start_date"}), 400
status = EngagementStatus.PLANNED
if "status" in data and data.get("status") is not None:
parsed = _parse_status(data.get("status"))
if parsed is None:
return (
jsonify({"error": "status must be one of: planned, active, closed"}),
400,
)
status = parsed
engagement = Engagement(
name=name,
description=data.get("description"),
start_date=start_date,
end_date=end_date,
status=status,
created_by_id=g.current_user.id,
)
db.session.add(engagement)
db.session.commit()
return jsonify(serialize_engagement(engagement)), 201
@engagements_bp.get("/<int:engagement_id>")
@login_required
def get_engagement(engagement_id: int):
engagement = db.session.get(Engagement, engagement_id)
if engagement is None:
return jsonify({"error": "Engagement not found"}), 404
return jsonify(serialize_engagement(engagement)), 200
@engagements_bp.patch("/<int:engagement_id>")
@role_required("admin", "redteam")
def update_engagement(engagement_id: int):
engagement = db.session.get(Engagement, engagement_id)
if engagement is None:
return jsonify({"error": "Engagement not found"}), 404
data = request.get_json(silent=True) or {}
if "name" in data:
name = (data.get("name") or "").strip()
if not name:
return jsonify({"error": "name must not be empty"}), 400
engagement.name = name
if "description" in data:
engagement.description = data.get("description")
new_start = engagement.start_date
if "start_date" in data:
parsed = _parse_date(data.get("start_date"))
if parsed is None:
return jsonify({"error": "start_date must be YYYY-MM-DD"}), 400
new_start = parsed
new_end = engagement.end_date
if "end_date" in data:
if data.get("end_date") in (None, ""):
new_end = None
else:
parsed = _parse_date(data.get("end_date"))
if parsed is None:
return jsonify({"error": "end_date must be YYYY-MM-DD"}), 400
new_end = parsed
if new_end is not None and new_end < new_start:
return jsonify({"error": "end_date must be >= start_date"}), 400
engagement.start_date = new_start
engagement.end_date = new_end
if "status" in data:
parsed_status = _parse_status((data.get("status") or "").strip())
if parsed_status is None:
return (
jsonify({"error": "status must be one of: planned, active, closed"}),
400,
)
engagement.status = parsed_status
db.session.commit()
return jsonify(serialize_engagement(engagement)), 200
@engagements_bp.delete("/<int:engagement_id>")
@role_required("admin", "redteam")
def delete_engagement(engagement_id: int):
engagement = db.session.get(Engagement, engagement_id)
if engagement is None:
return jsonify({"error": "Engagement not found"}), 404
db.session.delete(engagement)
db.session.commit()
return "", 204
@engagements_bp.get("/<int:eid>/export")
@role_required("admin", "redteam")
def export_engagement(eid: int):
engagement = db.session.get(Engagement, eid)
if engagement is None:
return jsonify({"error": "Engagement not found"}), 404
fmt = request.args.get("format", "").strip().lower()
if fmt not in ("md", "csv", "pdf"):
return jsonify({"error": "format must be one of: md, csv, pdf"}), 400
simulations = (
Simulation.query.filter_by(engagement_id=eid)
.order_by(Simulation.id.asc())
.all()
)
if fmt == "md":
body = render_engagement_markdown(engagement, simulations)
filename = _export_filename(engagement, "md")
return Response(
body,
mimetype="text/markdown; charset=utf-8",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)
if fmt == "csv":
body = render_engagement_csv(engagement, simulations)
filename = _export_filename(engagement, "csv")
return Response(
body,
mimetype="text/csv; charset=utf-8",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)
# pdf
body_bytes = render_engagement_pdf(engagement, simulations)
filename = _export_filename(engagement, "pdf")
return Response(
body_bytes,
mimetype="application/pdf",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)