- New module backend/app/services/export.py with render_engagement_markdown,
render_engagement_csv, render_engagement_pdf, _render_engagement_html helper,
and _export_filename slugifier (NFKD + fallback "unnamed").
- Extend engagements_bp with GET /api/engagements/<int:eid>/export?format=md|csv|pdf,
gated @role_required("admin","redteam"). Returns 400 on missing/unknown format,
404 on unknown engagement, correct Content-Type + Content-Disposition headers.
- Reuses _enrich_techniques and _enrich_tactics from serializers.py; resilient
to MITRE bundle not loaded (returns empty tactics, no crash).
- Adds weasyprint>=60.0 to backend/requirements.txt.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
211 lines
6.5 KiB
Python
211 lines
6.5 KiB
Python
"""Engagement CRUD endpoints."""
|
|
from __future__ import annotations
|
|
|
|
from datetime import date
|
|
|
|
from flask import Blueprint, Response, g, jsonify, request
|
|
|
|
from backend.app.auth import login_required, role_required
|
|
from backend.app.extensions import db
|
|
from backend.app.models import Engagement, EngagementStatus
|
|
from backend.app.models.simulation import Simulation
|
|
from backend.app.serializers import serialize_engagement
|
|
from backend.app.services.export import (
|
|
_export_filename,
|
|
render_engagement_csv,
|
|
render_engagement_markdown,
|
|
render_engagement_pdf,
|
|
)
|
|
|
|
engagements_bp = Blueprint("engagements", __name__, url_prefix="/api/engagements")
|
|
|
|
|
|
def _parse_date(value: object) -> date | None:
|
|
if not isinstance(value, str):
|
|
return None
|
|
try:
|
|
return date.fromisoformat(value)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _parse_status(value: object) -> EngagementStatus | None:
|
|
if not isinstance(value, str):
|
|
return None
|
|
try:
|
|
return EngagementStatus(value)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
@engagements_bp.get("")
|
|
@login_required
|
|
def list_engagements():
|
|
items = Engagement.query.order_by(Engagement.id.asc()).all()
|
|
return jsonify([serialize_engagement(e) for e in items]), 200
|
|
|
|
|
|
@engagements_bp.post("")
|
|
@role_required("admin", "redteam")
|
|
def create_engagement():
|
|
data = request.get_json(silent=True) or {}
|
|
|
|
name = (data.get("name") or "").strip()
|
|
if not name:
|
|
return jsonify({"error": "name is required"}), 400
|
|
|
|
start_raw = data.get("start_date")
|
|
start_date = _parse_date(start_raw) if start_raw else None
|
|
if start_date is None:
|
|
return jsonify({"error": "start_date is required (YYYY-MM-DD)"}), 400
|
|
|
|
end_raw = data.get("end_date")
|
|
end_date: date | None = None
|
|
if end_raw:
|
|
end_date = _parse_date(end_raw)
|
|
if end_date is None:
|
|
return jsonify({"error": "end_date must be YYYY-MM-DD"}), 400
|
|
if end_date < start_date:
|
|
return jsonify({"error": "end_date must be >= start_date"}), 400
|
|
|
|
status = EngagementStatus.PLANNED
|
|
if "status" in data and data.get("status") is not None:
|
|
parsed = _parse_status(data.get("status"))
|
|
if parsed is None:
|
|
return (
|
|
jsonify({"error": "status must be one of: planned, active, closed"}),
|
|
400,
|
|
)
|
|
status = parsed
|
|
|
|
engagement = Engagement(
|
|
name=name,
|
|
description=data.get("description"),
|
|
start_date=start_date,
|
|
end_date=end_date,
|
|
status=status,
|
|
created_by_id=g.current_user.id,
|
|
)
|
|
db.session.add(engagement)
|
|
db.session.commit()
|
|
return jsonify(serialize_engagement(engagement)), 201
|
|
|
|
|
|
@engagements_bp.get("/<int:engagement_id>")
|
|
@login_required
|
|
def get_engagement(engagement_id: int):
|
|
engagement = db.session.get(Engagement, engagement_id)
|
|
if engagement is None:
|
|
return jsonify({"error": "Engagement not found"}), 404
|
|
return jsonify(serialize_engagement(engagement)), 200
|
|
|
|
|
|
@engagements_bp.patch("/<int:engagement_id>")
|
|
@role_required("admin", "redteam")
|
|
def update_engagement(engagement_id: int):
|
|
engagement = db.session.get(Engagement, engagement_id)
|
|
if engagement is None:
|
|
return jsonify({"error": "Engagement not found"}), 404
|
|
|
|
data = request.get_json(silent=True) or {}
|
|
|
|
if "name" in data:
|
|
name = (data.get("name") or "").strip()
|
|
if not name:
|
|
return jsonify({"error": "name must not be empty"}), 400
|
|
engagement.name = name
|
|
|
|
if "description" in data:
|
|
engagement.description = data.get("description")
|
|
|
|
new_start = engagement.start_date
|
|
if "start_date" in data:
|
|
parsed = _parse_date(data.get("start_date"))
|
|
if parsed is None:
|
|
return jsonify({"error": "start_date must be YYYY-MM-DD"}), 400
|
|
new_start = parsed
|
|
|
|
new_end = engagement.end_date
|
|
if "end_date" in data:
|
|
if data.get("end_date") in (None, ""):
|
|
new_end = None
|
|
else:
|
|
parsed = _parse_date(data.get("end_date"))
|
|
if parsed is None:
|
|
return jsonify({"error": "end_date must be YYYY-MM-DD"}), 400
|
|
new_end = parsed
|
|
|
|
if new_end is not None and new_end < new_start:
|
|
return jsonify({"error": "end_date must be >= start_date"}), 400
|
|
|
|
engagement.start_date = new_start
|
|
engagement.end_date = new_end
|
|
|
|
if "status" in data:
|
|
parsed_status = _parse_status((data.get("status") or "").strip())
|
|
if parsed_status is None:
|
|
return (
|
|
jsonify({"error": "status must be one of: planned, active, closed"}),
|
|
400,
|
|
)
|
|
engagement.status = parsed_status
|
|
|
|
db.session.commit()
|
|
return jsonify(serialize_engagement(engagement)), 200
|
|
|
|
|
|
@engagements_bp.delete("/<int:engagement_id>")
|
|
@role_required("admin", "redteam")
|
|
def delete_engagement(engagement_id: int):
|
|
engagement = db.session.get(Engagement, engagement_id)
|
|
if engagement is None:
|
|
return jsonify({"error": "Engagement not found"}), 404
|
|
db.session.delete(engagement)
|
|
db.session.commit()
|
|
return "", 204
|
|
|
|
|
|
@engagements_bp.get("/<int:eid>/export")
|
|
@role_required("admin", "redteam")
|
|
def export_engagement(eid: int):
|
|
engagement = db.session.get(Engagement, eid)
|
|
if engagement is None:
|
|
return jsonify({"error": "Engagement not found"}), 404
|
|
|
|
fmt = request.args.get("format", "").strip().lower()
|
|
if fmt not in ("md", "csv", "pdf"):
|
|
return jsonify({"error": "format must be one of: md, csv, pdf"}), 400
|
|
|
|
simulations = (
|
|
Simulation.query.filter_by(engagement_id=eid)
|
|
.order_by(Simulation.id.asc())
|
|
.all()
|
|
)
|
|
|
|
if fmt == "md":
|
|
body = render_engagement_markdown(engagement, simulations)
|
|
filename = _export_filename(engagement, "md")
|
|
return Response(
|
|
body,
|
|
mimetype="text/markdown; charset=utf-8",
|
|
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
|
|
)
|
|
|
|
if fmt == "csv":
|
|
body = render_engagement_csv(engagement, simulations)
|
|
filename = _export_filename(engagement, "csv")
|
|
return Response(
|
|
body,
|
|
mimetype="text/csv; charset=utf-8",
|
|
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
|
|
)
|
|
|
|
# pdf
|
|
body_bytes = render_engagement_pdf(engagement, simulations)
|
|
filename = _export_filename(engagement, "pdf")
|
|
return Response(
|
|
body_bytes,
|
|
mimetype="application/pdf",
|
|
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
|
|
)
|