Files
mimic/backend/app/api/simulations.py
Knacky 55f993fa24 fix(backend): sprint 5 post-review — name fallback, isinstance guards, 400 tests
- create_simulation: name falls back to template.name when template_id provided
  and name is absent/empty (AC-27.1)
- templates POST/PATCH: isinstance(list) check on technique_ids/tactic_ids
  before resolving, returns 400 with clear message
- 5 new tests: unknown technique_id → 400 (POST+PATCH), unknown tactic_id → 400
  (POST+PATCH), name fallback to template.name
- mypy: merged template branch into if/else to eliminate union-attr false positives

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-28 07:04:25 +02:00

173 lines
5.3 KiB
Python

"""Simulation CRUD + workflow endpoints."""
from __future__ import annotations
from datetime import UTC, datetime
from flask import Blueprint, g, jsonify, request
from backend.app.auth import login_required, role_required
from backend.app.extensions import db
from backend.app.models import Engagement
from backend.app.models.simulation import Simulation, SimulationStatus
from backend.app.serializers import serialize_simulation
from backend.app.services import simulation_workflow
simulations_bp = Blueprint("simulations", __name__)
# ---------------------------------------------------------------------------
# Nested under /api/engagements/<eid>/simulations
# ---------------------------------------------------------------------------
@simulations_bp.get("/api/engagements/<int:eid>/simulations")
@login_required
def list_simulations(eid: int):
engagement = db.session.get(Engagement, eid)
if engagement is None:
return jsonify({"error": "Engagement not found"}), 404
sims = (
Simulation.query.filter_by(engagement_id=eid)
.order_by(Simulation.created_at.desc())
.all()
)
return jsonify([serialize_simulation(s) for s in sims]), 200
@simulations_bp.post("/api/engagements/<int:eid>/simulations")
@role_required("admin", "redteam")
def create_simulation(eid: int):
engagement = db.session.get(Engagement, eid)
if engagement is None:
return jsonify({"error": "Engagement not found"}), 404
data = request.get_json(silent=True) or {}
name = (data.get("name") or "").strip()
template_id = data.get("template_id")
if template_id is not None:
from backend.app.models.simulation_template import SimulationTemplate
tmpl = db.session.get(SimulationTemplate, template_id)
if tmpl is None:
return jsonify({"error": "Template not found"}), 404
if not name:
name = tmpl.name
sim = Simulation(
engagement_id=eid,
name=name,
status=SimulationStatus.PENDING,
created_at=datetime.now(UTC),
created_by_id=g.current_user.id,
)
sim.description = tmpl.description
sim.commands = tmpl.commands
sim.prerequisites = tmpl.prerequisites
sim.techniques = list(tmpl.techniques or [])
sim.tactic_ids = list(tmpl.tactic_ids or [])
else:
if not name:
return jsonify({"error": "name is required"}), 400
sim = Simulation(
engagement_id=eid,
name=name,
status=SimulationStatus.PENDING,
created_at=datetime.now(UTC),
created_by_id=g.current_user.id,
)
db.session.add(sim)
db.session.commit()
return jsonify(serialize_simulation(sim)), 201
# ---------------------------------------------------------------------------
# Flat /api/simulations/<sid>
# ---------------------------------------------------------------------------
@simulations_bp.get("/api/simulations/<int:sid>")
@login_required
def get_simulation(sid: int):
sim = db.session.get(Simulation, sid)
if sim is None:
return jsonify({"error": "Simulation not found"}), 404
return jsonify(serialize_simulation(sim)), 200
@simulations_bp.patch("/api/simulations/<int:sid>")
@login_required
def update_simulation(sid: int):
sim = db.session.get(Simulation, sid)
if sim is None:
return jsonify({"error": "Simulation not found"}), 404
user = g.current_user
data = request.get_json(silent=True) or {}
if not data:
return jsonify(serialize_simulation(sim)), 200
err = simulation_workflow.apply_patch(sim, data, user)
if err is not None:
return err
db.session.commit()
return jsonify(serialize_simulation(sim)), 200
@simulations_bp.delete("/api/simulations/<int:sid>")
@role_required("admin", "redteam")
def delete_simulation(sid: int):
sim = db.session.get(Simulation, sid)
if sim is None:
return jsonify({"error": "Simulation not found"}), 404
db.session.delete(sim)
db.session.commit()
return "", 204
@simulations_bp.post("/api/simulations/<int:sid>/transition")
@login_required
def transition_simulation(sid: int):
sim = db.session.get(Simulation, sid)
if sim is None:
return jsonify({"error": "Simulation not found"}), 404
data = request.get_json(silent=True) or {}
to_status = data.get("to", "")
err = simulation_workflow.transition(sim, to_status, g.current_user)
if err is not None:
return err
return jsonify(serialize_simulation(sim)), 200
# ---------------------------------------------------------------------------
# MITRE autocomplete + matrix
# ---------------------------------------------------------------------------
@simulations_bp.get("/api/mitre/techniques")
@login_required
def mitre_techniques():
from backend.app.services import mitre as mitre_svc
if not mitre_svc.mitre_loaded:
return jsonify({"error": "mitre bundle not loaded"}), 503
q = request.args.get("q", "").strip()
results = mitre_svc.search(q)
return jsonify(results), 200
@simulations_bp.get("/api/mitre/matrix")
@login_required
def mitre_matrix():
from backend.app.services import mitre as mitre_svc
if not mitre_svc.mitre_loaded:
return jsonify({"error": "mitre bundle not loaded"}), 503
return jsonify(mitre_svc.get_matrix()), 200