"""Host CRUD endpoints (scoped under an engagement).""" from __future__ import annotations from flask import Blueprint, abort, jsonify from flask.typing import ResponseReturnValue from sqlalchemy import select from mimic.api._helpers import ( audit_write, current_user_id, is_rt_lead, jsonify_model, parse_body, parse_uuid, ) from mimic.db.models import Engagement, EngagementMember, Host from mimic.db.types import HostStatus from mimic.extensions import db from mimic.rbac import Permission, require_perm from mimic.schemas import HostCreate, HostRead, HostUpdate bp = Blueprint("hosts", __name__) def _engagement_or_404(eid: str) -> Engagement: engagement = db.session.get(Engagement, parse_uuid(eid, field="engagement id")) if engagement is None: abort(404) # MA6: RT operators may only access engagements they're assigned to. if not is_rt_lead(): user_id = current_user_id() if user_id is None: abort(404) stmt = select(EngagementMember).where( EngagementMember.engagement_id == engagement.id, EngagementMember.user_id == user_id, ) if db.session.execute(stmt).scalar_one_or_none() is None: abort(404) return engagement @bp.get("/engagements//hosts") @require_perm(Permission.HOST_CRUD) def list_hosts(eid: str) -> ResponseReturnValue: engagement = _engagement_or_404(eid) stmt = select(Host).where(Host.engagement_id == engagement.id).order_by(Host.hostname) rows = db.session.execute(stmt).scalars().all() return jsonify([HostRead.model_validate(row).model_dump(mode="json") for row in rows]) @bp.post("/engagements//hosts") @require_perm(Permission.HOST_CRUD) def create_host(eid: str) -> ResponseReturnValue: engagement = _engagement_or_404(eid) payload = parse_body(HostCreate) host = Host( engagement_id=engagement.id, hostname=payload.hostname, ip=payload.ip, os=payload.os, c2_session_id=payload.c2_session_id, c2_type=payload.c2_type, status=HostStatus.UNKNOWN, ) db.session.add(host) db.session.commit() audit_write( action="host.create", resource_type="host", resource_id=host.id, metadata={"engagement_id": str(engagement.id), "hostname": host.hostname}, ) return jsonify_model(HostRead.model_validate(host), status=201) @bp.put("/engagements//hosts/") @require_perm(Permission.HOST_CRUD) def update_host(eid: str, hid: str) -> ResponseReturnValue: engagement = _engagement_or_404(eid) host = db.session.get(Host, parse_uuid(hid, field="host id")) if host is None or host.engagement_id != engagement.id: abort(404) payload = parse_body(HostUpdate) changes = payload.model_dump(exclude_unset=True) for field, value in changes.items(): setattr(host, field, value) db.session.commit() audit_write( action="host.update", resource_type="host", resource_id=host.id, metadata={"engagement_id": str(engagement.id), "fields": sorted(changes.keys())}, ) return jsonify_model(HostRead.model_validate(host)) @bp.delete("/engagements//hosts/") @require_perm(Permission.HOST_CRUD) def delete_host(eid: str, hid: str) -> ResponseReturnValue: engagement = _engagement_or_404(eid) host = db.session.get(Host, parse_uuid(hid, field="host id")) if host is None or host.engagement_id != engagement.id: abort(404) host_id = host.id db.session.delete(host) db.session.commit() audit_write( action="host.delete", resource_type="host", resource_id=host_id, metadata={"engagement_id": str(engagement.id)}, ) return "", 204