From 3a3e3ff0ec6d5efbebe5ba256f6572b6a088ab7e Mon Sep 17 00:00:00 2001 From: knacky Date: Fri, 22 May 2026 05:24:54 +0200 Subject: [PATCH] feat(backend): wire created_by_id, audit log, F11 scope into CRUD (MA4/5/6) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three follow-ups on the flat CRUD blueprints triggered by code-review + spec-analyst (MA4, MA5, MA6). **MA4 — `created_by_id`** — engagements, TTPs and scenarios now record the creator from `current_user.id` instead of leaving the FK NULL. The new `api._helpers.current_user_id()` exposes the UUID safely (returns None when the request is unauthenticated, e.g. during /healthz). **MA5 — Audit log integration** — `api._helpers.audit_write(...)` wraps the hash-chained `AuditWriter` and is called after every successful commit in the 4 blueprints (engagement / host / ttp / scenario incl. step), recording the actor, action, resource type/id, IP, user agent, and small metadata (field list, names, engagement scope). F13 "Toute mutation tracée" now holds end-to-end. **MA6 — RT operator scope on engagements** — F11 limits RT operators to "engagements assignés". The previous implementation let them list / read every engagement and every nested resource. Fix: `is_rt_lead()` short- circuits the check for RT leads; otherwise a membership probe against `engagement_member` runs on every list/read and on `_engagement_or_404` in `hosts.py` and `scenarios.py`. Listings now `JOIN engagement_member` and filter by `current_user.id`. `audit_write` casts `db.session` (a `scoped_session` proxy) to the unwrapped `sqlalchemy.orm.Session` that `AuditWriter` expects; the two are interchangeable at runtime. The promotion-perm check on TTPs no longer needs a lazy `flask_login` import since the decorator scope already brings `current_user` in. --- backend/src/mimic/api/_helpers.py | 52 ++++++++++++++++++- backend/src/mimic/api/engagements.py | 75 +++++++++++++++++++++++----- backend/src/mimic/api/hosts.py | 44 ++++++++++++++-- backend/src/mimic/api/scenarios.py | 60 ++++++++++++++++++++-- backend/src/mimic/api/ttps.py | 29 +++++++++-- 5 files changed, 237 insertions(+), 23 deletions(-) diff --git a/backend/src/mimic/api/_helpers.py b/backend/src/mimic/api/_helpers.py index 6d296ce..96750ee 100644 --- a/backend/src/mimic/api/_helpers.py +++ b/backend/src/mimic/api/_helpers.py @@ -1,11 +1,18 @@ -"""Shared blueprint helpers (pydantic validation, error mapping).""" +"""Shared blueprint helpers (pydantic validation, current-user, scope, audit).""" from __future__ import annotations +from typing import Any, cast from uuid import UUID from flask import Response, abort, jsonify, request +from flask_login import current_user from pydantic import BaseModel, ValidationError +from sqlalchemy.orm import Session + +from mimic.audit import AuditWriter +from mimic.extensions import db +from mimic.rbac.matrix import GroupName def parse_body[T: BaseModel](model: type[T]) -> T: @@ -27,3 +34,46 @@ def parse_uuid(value: str, *, field: str = "id") -> UUID: return UUID(value) except (ValueError, TypeError): abort(404, description=f"invalid {field}") + + +def current_user_id() -> UUID | None: + """`current_user.id` if logged in, else None (for `created_by_id` columns).""" + user_id = getattr(current_user, "id", None) + if isinstance(user_id, UUID): + return user_id + return None + + +def is_rt_lead() -> bool: + """True iff the authenticated user is a member of `rt_lead`.""" + groups: frozenset[str] = getattr(current_user, "groups", frozenset()) + return GroupName.RT_LEAD.value in groups + + +def audit_write( + *, + action: str, + resource_type: str, + resource_id: UUID | str | None = None, + metadata: dict[str, Any] | None = None, +) -> None: + """Write an audit entry for the current request (MA5). + + Auto-commits the audit row so a failed insert in the same transaction + cannot mask the audit trail. Caller MUST have committed the resource + change before calling this. + """ + resource_id_str = str(resource_id) if resource_id is not None else None + # `db.session` is a scoped_session proxy; AuditWriter expects the unwrapped + # ORM session. They are interchangeable at runtime. + session = cast(Session, db.session) + AuditWriter(session).write( + actor_id=current_user_id(), + action=action, + resource_type=resource_type, + resource_id=resource_id_str, + metadata=metadata or {}, + source_ip=request.remote_addr, + user_agent=request.headers.get("User-Agent"), + ) + db.session.commit() diff --git a/backend/src/mimic/api/engagements.py b/backend/src/mimic/api/engagements.py index ec73b2d..25ac0f3 100644 --- a/backend/src/mimic/api/engagements.py +++ b/backend/src/mimic/api/engagements.py @@ -2,12 +2,21 @@ from __future__ import annotations +from uuid import UUID + from flask import Blueprint, abort, jsonify from flask.typing import ResponseReturnValue from sqlalchemy import select -from mimic.api._helpers import jsonify_model, parse_body, parse_uuid -from mimic.db.models import Engagement +from mimic.api._helpers import ( + audit_write, + current_user_id, + is_rt_lead, + jsonify_model, + parse_body, + parse_uuid, +) +from mimic.db.models import Engagement, EngagementMember from mimic.db.types import EngagementStatus from mimic.extensions import db from mimic.rbac import Permission, require_perm @@ -16,10 +25,39 @@ from mimic.schemas import EngagementCreate, EngagementRead, EngagementUpdate bp = Blueprint("engagements", __name__) +def _engagement_or_404(eid: str) -> Engagement: + engagement = db.session.get(Engagement, parse_uuid(eid)) + if engagement is None: + abort(404) + # MA6: RT operators may only see engagements they're assigned to. + if not is_rt_lead(): + user_id = current_user_id() + if user_id is None or not _is_member(engagement.id, user_id): + abort(404) + return engagement + + +def _is_member(engagement_id: UUID, user_id: UUID) -> bool: + stmt = select(EngagementMember).where( + EngagementMember.engagement_id == engagement_id, + EngagementMember.user_id == user_id, + ) + return db.session.execute(stmt).scalar_one_or_none() is not None + + @bp.get("") @require_perm(Permission.ENGAGEMENT_READ) def list_engagements() -> ResponseReturnValue: stmt = select(Engagement).order_by(Engagement.created_at.desc()) + # MA6: RT operators see only their assignments. + if not is_rt_lead(): + user_id = current_user_id() + if user_id is None: + return jsonify([]) + stmt = stmt.join( + EngagementMember, + EngagementMember.engagement_id == Engagement.id, + ).where(EngagementMember.user_id == user_id) rows = db.session.execute(stmt).scalars().all() return jsonify([EngagementRead.model_validate(row).model_dump(mode="json") for row in rows]) @@ -35,40 +73,53 @@ def create_engagement() -> ResponseReturnValue: start_date=payload.start_date, end_date=payload.end_date, status=EngagementStatus.DRAFT, + created_by_id=current_user_id(), ) db.session.add(engagement) db.session.commit() + audit_write( + action="engagement.create", + resource_type="engagement", + resource_id=engagement.id, + metadata={"client_name": engagement.client_name}, + ) return jsonify_model(EngagementRead.model_validate(engagement), status=201) @bp.get("/") @require_perm(Permission.ENGAGEMENT_READ) def get_engagement(eid: str) -> ResponseReturnValue: - engagement = db.session.get(Engagement, parse_uuid(eid)) - if engagement is None: - abort(404) + engagement = _engagement_or_404(eid) return jsonify_model(EngagementRead.model_validate(engagement)) @bp.put("/") @require_perm(Permission.ENGAGEMENT_UPDATE) def update_engagement(eid: str) -> ResponseReturnValue: - engagement = db.session.get(Engagement, parse_uuid(eid)) - if engagement is None: - abort(404) + engagement = _engagement_or_404(eid) payload = parse_body(EngagementUpdate) - for field, value in payload.model_dump(exclude_unset=True).items(): + changes = payload.model_dump(exclude_unset=True) + for field, value in changes.items(): setattr(engagement, field, value) db.session.commit() + audit_write( + action="engagement.update", + resource_type="engagement", + resource_id=engagement.id, + metadata={"fields": sorted(changes.keys())}, + ) return jsonify_model(EngagementRead.model_validate(engagement)) @bp.delete("/") @require_perm(Permission.ENGAGEMENT_DELETE) def delete_engagement(eid: str) -> ResponseReturnValue: - engagement = db.session.get(Engagement, parse_uuid(eid)) - if engagement is None: - abort(404) + engagement = _engagement_or_404(eid) engagement.status = EngagementStatus.ARCHIVED db.session.commit() + audit_write( + action="engagement.archive", + resource_type="engagement", + resource_id=engagement.id, + ) return "", 204 diff --git a/backend/src/mimic/api/hosts.py b/backend/src/mimic/api/hosts.py index 984d82c..20b07fb 100644 --- a/backend/src/mimic/api/hosts.py +++ b/backend/src/mimic/api/hosts.py @@ -6,8 +6,15 @@ from flask import Blueprint, abort, jsonify from flask.typing import ResponseReturnValue from sqlalchemy import select -from mimic.api._helpers import jsonify_model, parse_body, parse_uuid -from mimic.db.models import Engagement, Host +from mimic.api._helpers import ( + audit_write, + current_user_id, + is_rt_lead, + jsonify_model, + parse_body, + parse_uuid, +) +from mimic.db.models import Engagement, EngagementMember, Host from mimic.db.types import HostStatus from mimic.extensions import db from mimic.rbac import Permission, require_perm @@ -20,6 +27,17 @@ def _engagement_or_404(eid: str) -> Engagement: engagement = db.session.get(Engagement, parse_uuid(eid, field="engagement id")) if engagement is None: abort(404) + # MA6: RT operators may only access engagements they're assigned to. + if not is_rt_lead(): + user_id = current_user_id() + if user_id is None: + abort(404) + stmt = select(EngagementMember).where( + EngagementMember.engagement_id == engagement.id, + EngagementMember.user_id == user_id, + ) + if db.session.execute(stmt).scalar_one_or_none() is None: + abort(404) return engagement @@ -48,6 +66,12 @@ def create_host(eid: str) -> ResponseReturnValue: ) db.session.add(host) db.session.commit() + audit_write( + action="host.create", + resource_type="host", + resource_id=host.id, + metadata={"engagement_id": str(engagement.id), "hostname": host.hostname}, + ) return jsonify_model(HostRead.model_validate(host), status=201) @@ -59,9 +83,16 @@ def update_host(eid: str, hid: str) -> ResponseReturnValue: if host is None or host.engagement_id != engagement.id: abort(404) payload = parse_body(HostUpdate) - for field, value in payload.model_dump(exclude_unset=True).items(): + changes = payload.model_dump(exclude_unset=True) + for field, value in changes.items(): setattr(host, field, value) db.session.commit() + audit_write( + action="host.update", + resource_type="host", + resource_id=host.id, + metadata={"engagement_id": str(engagement.id), "fields": sorted(changes.keys())}, + ) return jsonify_model(HostRead.model_validate(host)) @@ -72,6 +103,13 @@ def delete_host(eid: str, hid: str) -> ResponseReturnValue: host = db.session.get(Host, parse_uuid(hid, field="host id")) if host is None or host.engagement_id != engagement.id: abort(404) + host_id = host.id db.session.delete(host) db.session.commit() + audit_write( + action="host.delete", + resource_type="host", + resource_id=host_id, + metadata={"engagement_id": str(engagement.id)}, + ) return "", 204 diff --git a/backend/src/mimic/api/scenarios.py b/backend/src/mimic/api/scenarios.py index 5e0c0bf..1336c8f 100644 --- a/backend/src/mimic/api/scenarios.py +++ b/backend/src/mimic/api/scenarios.py @@ -8,8 +8,15 @@ from flask import Blueprint, abort, jsonify from flask.typing import ResponseReturnValue from sqlalchemy import select -from mimic.api._helpers import jsonify_model, parse_body, parse_uuid -from mimic.db.models import Engagement, Host, Scenario, ScenarioStep, Ttp +from mimic.api._helpers import ( + audit_write, + current_user_id, + is_rt_lead, + jsonify_model, + parse_body, + parse_uuid, +) +from mimic.db.models import Engagement, EngagementMember, Host, Scenario, ScenarioStep, Ttp from mimic.extensions import db from mimic.rbac import Permission, require_perm from mimic.schemas import ( @@ -27,6 +34,17 @@ def _engagement_or_404(eid: str) -> Engagement: engagement = db.session.get(Engagement, parse_uuid(eid, field="engagement id")) if engagement is None: abort(404) + # MA6: RT operators may only access engagements they're assigned to. + if not is_rt_lead(): + user_id = current_user_id() + if user_id is None: + abort(404) + stmt = select(EngagementMember).where( + EngagementMember.engagement_id == engagement.id, + EngagementMember.user_id == user_id, + ) + if db.session.execute(stmt).scalar_one_or_none() is None: + abort(404) return engagement @@ -72,6 +90,7 @@ def create_scenario(eid: str) -> ResponseReturnValue: description=payload.description, c2_type=payload.c2_type, version=payload.version, + created_by_id=current_user_id(), ) db.session.add(scenario) db.session.flush() @@ -88,6 +107,16 @@ def create_scenario(eid: str) -> ResponseReturnValue: ) ) db.session.commit() + audit_write( + action="scenario.create", + resource_type="scenario", + resource_id=scenario.id, + metadata={ + "engagement_id": str(engagement.id), + "name": scenario.name, + "step_count": len(payload.steps), + }, + ) return jsonify_model(ScenarioRead.model_validate(scenario), status=201) @@ -105,9 +134,16 @@ def update_scenario(eid: str, sid: str) -> ResponseReturnValue: engagement = _engagement_or_404(eid) scenario = _scenario_or_404(engagement, sid) payload = parse_body(ScenarioUpdate) - for field, value in payload.model_dump(exclude_unset=True).items(): + changes = payload.model_dump(exclude_unset=True) + for field, value in changes.items(): setattr(scenario, field, value) db.session.commit() + audit_write( + action="scenario.update", + resource_type="scenario", + resource_id=scenario.id, + metadata={"engagement_id": str(engagement.id), "fields": sorted(changes.keys())}, + ) return jsonify_model(ScenarioRead.model_validate(scenario)) @@ -116,8 +152,15 @@ def update_scenario(eid: str, sid: str) -> ResponseReturnValue: def delete_scenario(eid: str, sid: str) -> ResponseReturnValue: engagement = _engagement_or_404(eid) scenario = _scenario_or_404(engagement, sid) + scenario_id = scenario.id db.session.delete(scenario) db.session.commit() + audit_write( + action="scenario.delete", + resource_type="scenario", + resource_id=scenario_id, + metadata={"engagement_id": str(engagement.id)}, + ) return "", 204 @@ -138,4 +181,15 @@ def add_step(eid: str, sid: str) -> ResponseReturnValue: ) db.session.add(step) db.session.commit() + audit_write( + action="scenario_step.create", + resource_type="scenario_step", + resource_id=step.id, + metadata={ + "scenario_id": str(scenario.id), + "ttp_id": str(payload.ttp_id), + "host_id": str(payload.host_id), + "order_idx": payload.order_idx, + }, + ) return jsonify_model(ScenarioStepRead.model_validate(step), status=201) diff --git a/backend/src/mimic/api/ttps.py b/backend/src/mimic/api/ttps.py index 929c55c..cb776cd 100644 --- a/backend/src/mimic/api/ttps.py +++ b/backend/src/mimic/api/ttps.py @@ -4,9 +4,16 @@ from __future__ import annotations from flask import Blueprint, abort, jsonify from flask.typing import ResponseReturnValue +from flask_login import current_user from sqlalchemy import select -from mimic.api._helpers import jsonify_model, parse_body, parse_uuid +from mimic.api._helpers import ( + audit_write, + current_user_id, + jsonify_model, + parse_body, + parse_uuid, +) from mimic.db.models import Ttp from mimic.extensions import db from mimic.rbac import Permission, require_perm @@ -27,9 +34,15 @@ def list_ttps() -> ResponseReturnValue: @require_perm(Permission.TTP_DRAFT) def create_ttp() -> ResponseReturnValue: payload = parse_body(TtpCreate) - ttp = Ttp(**payload.model_dump()) + ttp = Ttp(**payload.model_dump(), created_by_id=current_user_id()) db.session.add(ttp) db.session.commit() + audit_write( + action="ttp.create", + resource_type="ttp", + resource_id=ttp.id, + metadata={"name": ttp.name, "mitre_technique": ttp.mitre_technique}, + ) return jsonify_model(TtpRead.model_validate(ttp), status=201) @@ -53,12 +66,20 @@ def update_ttp(tid: str) -> ResponseReturnValue: publish_flag = data.pop("is_published", None) for field, value in data.items(): setattr(ttp, field, value) + promoted = False if publish_flag is not None: # Promotion is a lead-only privilege. Decorator already gates draft # edits; promotion gets a second-tier check at the call site. _ensure_promote_perm() ttp.is_published = publish_flag + promoted = True db.session.commit() + audit_write( + action="ttp.promote" if promoted else "ttp.update", + resource_type="ttp", + resource_id=ttp.id, + metadata={"fields": sorted(data.keys()), "is_published": ttp.is_published}, + ) return jsonify_model(TtpRead.model_validate(ttp)) @@ -68,14 +89,14 @@ def delete_ttp(tid: str) -> ResponseReturnValue: ttp = db.session.get(Ttp, parse_uuid(tid, field="ttp id")) if ttp is None: abort(404) + ttp_id = ttp.id db.session.delete(ttp) db.session.commit() + audit_write(action="ttp.delete", resource_type="ttp", resource_id=ttp_id) return "", 204 def _ensure_promote_perm() -> None: - from flask_login import current_user # noqa: PLC0415 (lazy: scope-local user only) - perms: frozenset[Permission] = getattr(current_user, "permissions", frozenset()) if Permission.TTP_PROMOTE not in perms: abort(403)