feat(backend): wire created_by_id, audit log, F11 scope into CRUD (MA4/5/6)
Three follow-ups on the flat CRUD blueprints triggered by code-review + spec-analyst (MA4, MA5, MA6). **MA4 — `created_by_id`** — engagements, TTPs and scenarios now record the creator from `current_user.id` instead of leaving the FK NULL. The new `api._helpers.current_user_id()` exposes the UUID safely (returns None when the request is unauthenticated, e.g. during /healthz). **MA5 — Audit log integration** — `api._helpers.audit_write(...)` wraps the hash-chained `AuditWriter` and is called after every successful commit in the 4 blueprints (engagement / host / ttp / scenario incl. step), recording the actor, action, resource type/id, IP, user agent, and small metadata (field list, names, engagement scope). F13 "Toute mutation tracée" now holds end-to-end. **MA6 — RT operator scope on engagements** — F11 limits RT operators to "engagements assignés". The previous implementation let them list / read every engagement and every nested resource. Fix: `is_rt_lead()` short- circuits the check for RT leads; otherwise a membership probe against `engagement_member` runs on every list/read and on `_engagement_or_404` in `hosts.py` and `scenarios.py`. Listings now `JOIN engagement_member` and filter by `current_user.id`. `audit_write` casts `db.session` (a `scoped_session` proxy) to the unwrapped `sqlalchemy.orm.Session` that `AuditWriter` expects; the two are interchangeable at runtime. The promotion-perm check on TTPs no longer needs a lazy `flask_login` import since the decorator scope already brings `current_user` in.
This commit is contained in:
@@ -1,11 +1,18 @@
|
|||||||
"""Shared blueprint helpers (pydantic validation, error mapping)."""
|
"""Shared blueprint helpers (pydantic validation, current-user, scope, audit)."""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Any, cast
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
|
||||||
from flask import Response, abort, jsonify, request
|
from flask import Response, abort, jsonify, request
|
||||||
|
from flask_login import current_user
|
||||||
from pydantic import BaseModel, ValidationError
|
from pydantic import BaseModel, ValidationError
|
||||||
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
|
from mimic.audit import AuditWriter
|
||||||
|
from mimic.extensions import db
|
||||||
|
from mimic.rbac.matrix import GroupName
|
||||||
|
|
||||||
|
|
||||||
def parse_body[T: BaseModel](model: type[T]) -> T:
|
def parse_body[T: BaseModel](model: type[T]) -> T:
|
||||||
@@ -27,3 +34,46 @@ def parse_uuid(value: str, *, field: str = "id") -> UUID:
|
|||||||
return UUID(value)
|
return UUID(value)
|
||||||
except (ValueError, TypeError):
|
except (ValueError, TypeError):
|
||||||
abort(404, description=f"invalid {field}")
|
abort(404, description=f"invalid {field}")
|
||||||
|
|
||||||
|
|
||||||
|
def current_user_id() -> UUID | None:
|
||||||
|
"""`current_user.id` if logged in, else None (for `created_by_id` columns)."""
|
||||||
|
user_id = getattr(current_user, "id", None)
|
||||||
|
if isinstance(user_id, UUID):
|
||||||
|
return user_id
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def is_rt_lead() -> bool:
|
||||||
|
"""True iff the authenticated user is a member of `rt_lead`."""
|
||||||
|
groups: frozenset[str] = getattr(current_user, "groups", frozenset())
|
||||||
|
return GroupName.RT_LEAD.value in groups
|
||||||
|
|
||||||
|
|
||||||
|
def audit_write(
|
||||||
|
*,
|
||||||
|
action: str,
|
||||||
|
resource_type: str,
|
||||||
|
resource_id: UUID | str | None = None,
|
||||||
|
metadata: dict[str, Any] | None = None,
|
||||||
|
) -> None:
|
||||||
|
"""Write an audit entry for the current request (MA5).
|
||||||
|
|
||||||
|
Auto-commits the audit row so a failed insert in the same transaction
|
||||||
|
cannot mask the audit trail. Caller MUST have committed the resource
|
||||||
|
change before calling this.
|
||||||
|
"""
|
||||||
|
resource_id_str = str(resource_id) if resource_id is not None else None
|
||||||
|
# `db.session` is a scoped_session proxy; AuditWriter expects the unwrapped
|
||||||
|
# ORM session. They are interchangeable at runtime.
|
||||||
|
session = cast(Session, db.session)
|
||||||
|
AuditWriter(session).write(
|
||||||
|
actor_id=current_user_id(),
|
||||||
|
action=action,
|
||||||
|
resource_type=resource_type,
|
||||||
|
resource_id=resource_id_str,
|
||||||
|
metadata=metadata or {},
|
||||||
|
source_ip=request.remote_addr,
|
||||||
|
user_agent=request.headers.get("User-Agent"),
|
||||||
|
)
|
||||||
|
db.session.commit()
|
||||||
|
|||||||
@@ -2,12 +2,21 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from uuid import UUID
|
||||||
|
|
||||||
from flask import Blueprint, abort, jsonify
|
from flask import Blueprint, abort, jsonify
|
||||||
from flask.typing import ResponseReturnValue
|
from flask.typing import ResponseReturnValue
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
|
|
||||||
from mimic.api._helpers import jsonify_model, parse_body, parse_uuid
|
from mimic.api._helpers import (
|
||||||
from mimic.db.models import Engagement
|
audit_write,
|
||||||
|
current_user_id,
|
||||||
|
is_rt_lead,
|
||||||
|
jsonify_model,
|
||||||
|
parse_body,
|
||||||
|
parse_uuid,
|
||||||
|
)
|
||||||
|
from mimic.db.models import Engagement, EngagementMember
|
||||||
from mimic.db.types import EngagementStatus
|
from mimic.db.types import EngagementStatus
|
||||||
from mimic.extensions import db
|
from mimic.extensions import db
|
||||||
from mimic.rbac import Permission, require_perm
|
from mimic.rbac import Permission, require_perm
|
||||||
@@ -16,10 +25,39 @@ from mimic.schemas import EngagementCreate, EngagementRead, EngagementUpdate
|
|||||||
bp = Blueprint("engagements", __name__)
|
bp = Blueprint("engagements", __name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _engagement_or_404(eid: str) -> Engagement:
|
||||||
|
engagement = db.session.get(Engagement, parse_uuid(eid))
|
||||||
|
if engagement is None:
|
||||||
|
abort(404)
|
||||||
|
# MA6: RT operators may only see engagements they're assigned to.
|
||||||
|
if not is_rt_lead():
|
||||||
|
user_id = current_user_id()
|
||||||
|
if user_id is None or not _is_member(engagement.id, user_id):
|
||||||
|
abort(404)
|
||||||
|
return engagement
|
||||||
|
|
||||||
|
|
||||||
|
def _is_member(engagement_id: UUID, user_id: UUID) -> bool:
|
||||||
|
stmt = select(EngagementMember).where(
|
||||||
|
EngagementMember.engagement_id == engagement_id,
|
||||||
|
EngagementMember.user_id == user_id,
|
||||||
|
)
|
||||||
|
return db.session.execute(stmt).scalar_one_or_none() is not None
|
||||||
|
|
||||||
|
|
||||||
@bp.get("")
|
@bp.get("")
|
||||||
@require_perm(Permission.ENGAGEMENT_READ)
|
@require_perm(Permission.ENGAGEMENT_READ)
|
||||||
def list_engagements() -> ResponseReturnValue:
|
def list_engagements() -> ResponseReturnValue:
|
||||||
stmt = select(Engagement).order_by(Engagement.created_at.desc())
|
stmt = select(Engagement).order_by(Engagement.created_at.desc())
|
||||||
|
# MA6: RT operators see only their assignments.
|
||||||
|
if not is_rt_lead():
|
||||||
|
user_id = current_user_id()
|
||||||
|
if user_id is None:
|
||||||
|
return jsonify([])
|
||||||
|
stmt = stmt.join(
|
||||||
|
EngagementMember,
|
||||||
|
EngagementMember.engagement_id == Engagement.id,
|
||||||
|
).where(EngagementMember.user_id == user_id)
|
||||||
rows = db.session.execute(stmt).scalars().all()
|
rows = db.session.execute(stmt).scalars().all()
|
||||||
return jsonify([EngagementRead.model_validate(row).model_dump(mode="json") for row in rows])
|
return jsonify([EngagementRead.model_validate(row).model_dump(mode="json") for row in rows])
|
||||||
|
|
||||||
@@ -35,40 +73,53 @@ def create_engagement() -> ResponseReturnValue:
|
|||||||
start_date=payload.start_date,
|
start_date=payload.start_date,
|
||||||
end_date=payload.end_date,
|
end_date=payload.end_date,
|
||||||
status=EngagementStatus.DRAFT,
|
status=EngagementStatus.DRAFT,
|
||||||
|
created_by_id=current_user_id(),
|
||||||
)
|
)
|
||||||
db.session.add(engagement)
|
db.session.add(engagement)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
audit_write(
|
||||||
|
action="engagement.create",
|
||||||
|
resource_type="engagement",
|
||||||
|
resource_id=engagement.id,
|
||||||
|
metadata={"client_name": engagement.client_name},
|
||||||
|
)
|
||||||
return jsonify_model(EngagementRead.model_validate(engagement), status=201)
|
return jsonify_model(EngagementRead.model_validate(engagement), status=201)
|
||||||
|
|
||||||
|
|
||||||
@bp.get("/<eid>")
|
@bp.get("/<eid>")
|
||||||
@require_perm(Permission.ENGAGEMENT_READ)
|
@require_perm(Permission.ENGAGEMENT_READ)
|
||||||
def get_engagement(eid: str) -> ResponseReturnValue:
|
def get_engagement(eid: str) -> ResponseReturnValue:
|
||||||
engagement = db.session.get(Engagement, parse_uuid(eid))
|
engagement = _engagement_or_404(eid)
|
||||||
if engagement is None:
|
|
||||||
abort(404)
|
|
||||||
return jsonify_model(EngagementRead.model_validate(engagement))
|
return jsonify_model(EngagementRead.model_validate(engagement))
|
||||||
|
|
||||||
|
|
||||||
@bp.put("/<eid>")
|
@bp.put("/<eid>")
|
||||||
@require_perm(Permission.ENGAGEMENT_UPDATE)
|
@require_perm(Permission.ENGAGEMENT_UPDATE)
|
||||||
def update_engagement(eid: str) -> ResponseReturnValue:
|
def update_engagement(eid: str) -> ResponseReturnValue:
|
||||||
engagement = db.session.get(Engagement, parse_uuid(eid))
|
engagement = _engagement_or_404(eid)
|
||||||
if engagement is None:
|
|
||||||
abort(404)
|
|
||||||
payload = parse_body(EngagementUpdate)
|
payload = parse_body(EngagementUpdate)
|
||||||
for field, value in payload.model_dump(exclude_unset=True).items():
|
changes = payload.model_dump(exclude_unset=True)
|
||||||
|
for field, value in changes.items():
|
||||||
setattr(engagement, field, value)
|
setattr(engagement, field, value)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
audit_write(
|
||||||
|
action="engagement.update",
|
||||||
|
resource_type="engagement",
|
||||||
|
resource_id=engagement.id,
|
||||||
|
metadata={"fields": sorted(changes.keys())},
|
||||||
|
)
|
||||||
return jsonify_model(EngagementRead.model_validate(engagement))
|
return jsonify_model(EngagementRead.model_validate(engagement))
|
||||||
|
|
||||||
|
|
||||||
@bp.delete("/<eid>")
|
@bp.delete("/<eid>")
|
||||||
@require_perm(Permission.ENGAGEMENT_DELETE)
|
@require_perm(Permission.ENGAGEMENT_DELETE)
|
||||||
def delete_engagement(eid: str) -> ResponseReturnValue:
|
def delete_engagement(eid: str) -> ResponseReturnValue:
|
||||||
engagement = db.session.get(Engagement, parse_uuid(eid))
|
engagement = _engagement_or_404(eid)
|
||||||
if engagement is None:
|
|
||||||
abort(404)
|
|
||||||
engagement.status = EngagementStatus.ARCHIVED
|
engagement.status = EngagementStatus.ARCHIVED
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
audit_write(
|
||||||
|
action="engagement.archive",
|
||||||
|
resource_type="engagement",
|
||||||
|
resource_id=engagement.id,
|
||||||
|
)
|
||||||
return "", 204
|
return "", 204
|
||||||
|
|||||||
@@ -6,8 +6,15 @@ from flask import Blueprint, abort, jsonify
|
|||||||
from flask.typing import ResponseReturnValue
|
from flask.typing import ResponseReturnValue
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
|
|
||||||
from mimic.api._helpers import jsonify_model, parse_body, parse_uuid
|
from mimic.api._helpers import (
|
||||||
from mimic.db.models import Engagement, Host
|
audit_write,
|
||||||
|
current_user_id,
|
||||||
|
is_rt_lead,
|
||||||
|
jsonify_model,
|
||||||
|
parse_body,
|
||||||
|
parse_uuid,
|
||||||
|
)
|
||||||
|
from mimic.db.models import Engagement, EngagementMember, Host
|
||||||
from mimic.db.types import HostStatus
|
from mimic.db.types import HostStatus
|
||||||
from mimic.extensions import db
|
from mimic.extensions import db
|
||||||
from mimic.rbac import Permission, require_perm
|
from mimic.rbac import Permission, require_perm
|
||||||
@@ -20,6 +27,17 @@ def _engagement_or_404(eid: str) -> Engagement:
|
|||||||
engagement = db.session.get(Engagement, parse_uuid(eid, field="engagement id"))
|
engagement = db.session.get(Engagement, parse_uuid(eid, field="engagement id"))
|
||||||
if engagement is None:
|
if engagement is None:
|
||||||
abort(404)
|
abort(404)
|
||||||
|
# MA6: RT operators may only access engagements they're assigned to.
|
||||||
|
if not is_rt_lead():
|
||||||
|
user_id = current_user_id()
|
||||||
|
if user_id is None:
|
||||||
|
abort(404)
|
||||||
|
stmt = select(EngagementMember).where(
|
||||||
|
EngagementMember.engagement_id == engagement.id,
|
||||||
|
EngagementMember.user_id == user_id,
|
||||||
|
)
|
||||||
|
if db.session.execute(stmt).scalar_one_or_none() is None:
|
||||||
|
abort(404)
|
||||||
return engagement
|
return engagement
|
||||||
|
|
||||||
|
|
||||||
@@ -48,6 +66,12 @@ def create_host(eid: str) -> ResponseReturnValue:
|
|||||||
)
|
)
|
||||||
db.session.add(host)
|
db.session.add(host)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
audit_write(
|
||||||
|
action="host.create",
|
||||||
|
resource_type="host",
|
||||||
|
resource_id=host.id,
|
||||||
|
metadata={"engagement_id": str(engagement.id), "hostname": host.hostname},
|
||||||
|
)
|
||||||
return jsonify_model(HostRead.model_validate(host), status=201)
|
return jsonify_model(HostRead.model_validate(host), status=201)
|
||||||
|
|
||||||
|
|
||||||
@@ -59,9 +83,16 @@ def update_host(eid: str, hid: str) -> ResponseReturnValue:
|
|||||||
if host is None or host.engagement_id != engagement.id:
|
if host is None or host.engagement_id != engagement.id:
|
||||||
abort(404)
|
abort(404)
|
||||||
payload = parse_body(HostUpdate)
|
payload = parse_body(HostUpdate)
|
||||||
for field, value in payload.model_dump(exclude_unset=True).items():
|
changes = payload.model_dump(exclude_unset=True)
|
||||||
|
for field, value in changes.items():
|
||||||
setattr(host, field, value)
|
setattr(host, field, value)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
audit_write(
|
||||||
|
action="host.update",
|
||||||
|
resource_type="host",
|
||||||
|
resource_id=host.id,
|
||||||
|
metadata={"engagement_id": str(engagement.id), "fields": sorted(changes.keys())},
|
||||||
|
)
|
||||||
return jsonify_model(HostRead.model_validate(host))
|
return jsonify_model(HostRead.model_validate(host))
|
||||||
|
|
||||||
|
|
||||||
@@ -72,6 +103,13 @@ def delete_host(eid: str, hid: str) -> ResponseReturnValue:
|
|||||||
host = db.session.get(Host, parse_uuid(hid, field="host id"))
|
host = db.session.get(Host, parse_uuid(hid, field="host id"))
|
||||||
if host is None or host.engagement_id != engagement.id:
|
if host is None or host.engagement_id != engagement.id:
|
||||||
abort(404)
|
abort(404)
|
||||||
|
host_id = host.id
|
||||||
db.session.delete(host)
|
db.session.delete(host)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
audit_write(
|
||||||
|
action="host.delete",
|
||||||
|
resource_type="host",
|
||||||
|
resource_id=host_id,
|
||||||
|
metadata={"engagement_id": str(engagement.id)},
|
||||||
|
)
|
||||||
return "", 204
|
return "", 204
|
||||||
|
|||||||
@@ -8,8 +8,15 @@ from flask import Blueprint, abort, jsonify
|
|||||||
from flask.typing import ResponseReturnValue
|
from flask.typing import ResponseReturnValue
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
|
|
||||||
from mimic.api._helpers import jsonify_model, parse_body, parse_uuid
|
from mimic.api._helpers import (
|
||||||
from mimic.db.models import Engagement, Host, Scenario, ScenarioStep, Ttp
|
audit_write,
|
||||||
|
current_user_id,
|
||||||
|
is_rt_lead,
|
||||||
|
jsonify_model,
|
||||||
|
parse_body,
|
||||||
|
parse_uuid,
|
||||||
|
)
|
||||||
|
from mimic.db.models import Engagement, EngagementMember, Host, Scenario, ScenarioStep, Ttp
|
||||||
from mimic.extensions import db
|
from mimic.extensions import db
|
||||||
from mimic.rbac import Permission, require_perm
|
from mimic.rbac import Permission, require_perm
|
||||||
from mimic.schemas import (
|
from mimic.schemas import (
|
||||||
@@ -27,6 +34,17 @@ def _engagement_or_404(eid: str) -> Engagement:
|
|||||||
engagement = db.session.get(Engagement, parse_uuid(eid, field="engagement id"))
|
engagement = db.session.get(Engagement, parse_uuid(eid, field="engagement id"))
|
||||||
if engagement is None:
|
if engagement is None:
|
||||||
abort(404)
|
abort(404)
|
||||||
|
# MA6: RT operators may only access engagements they're assigned to.
|
||||||
|
if not is_rt_lead():
|
||||||
|
user_id = current_user_id()
|
||||||
|
if user_id is None:
|
||||||
|
abort(404)
|
||||||
|
stmt = select(EngagementMember).where(
|
||||||
|
EngagementMember.engagement_id == engagement.id,
|
||||||
|
EngagementMember.user_id == user_id,
|
||||||
|
)
|
||||||
|
if db.session.execute(stmt).scalar_one_or_none() is None:
|
||||||
|
abort(404)
|
||||||
return engagement
|
return engagement
|
||||||
|
|
||||||
|
|
||||||
@@ -72,6 +90,7 @@ def create_scenario(eid: str) -> ResponseReturnValue:
|
|||||||
description=payload.description,
|
description=payload.description,
|
||||||
c2_type=payload.c2_type,
|
c2_type=payload.c2_type,
|
||||||
version=payload.version,
|
version=payload.version,
|
||||||
|
created_by_id=current_user_id(),
|
||||||
)
|
)
|
||||||
db.session.add(scenario)
|
db.session.add(scenario)
|
||||||
db.session.flush()
|
db.session.flush()
|
||||||
@@ -88,6 +107,16 @@ def create_scenario(eid: str) -> ResponseReturnValue:
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
audit_write(
|
||||||
|
action="scenario.create",
|
||||||
|
resource_type="scenario",
|
||||||
|
resource_id=scenario.id,
|
||||||
|
metadata={
|
||||||
|
"engagement_id": str(engagement.id),
|
||||||
|
"name": scenario.name,
|
||||||
|
"step_count": len(payload.steps),
|
||||||
|
},
|
||||||
|
)
|
||||||
return jsonify_model(ScenarioRead.model_validate(scenario), status=201)
|
return jsonify_model(ScenarioRead.model_validate(scenario), status=201)
|
||||||
|
|
||||||
|
|
||||||
@@ -105,9 +134,16 @@ def update_scenario(eid: str, sid: str) -> ResponseReturnValue:
|
|||||||
engagement = _engagement_or_404(eid)
|
engagement = _engagement_or_404(eid)
|
||||||
scenario = _scenario_or_404(engagement, sid)
|
scenario = _scenario_or_404(engagement, sid)
|
||||||
payload = parse_body(ScenarioUpdate)
|
payload = parse_body(ScenarioUpdate)
|
||||||
for field, value in payload.model_dump(exclude_unset=True).items():
|
changes = payload.model_dump(exclude_unset=True)
|
||||||
|
for field, value in changes.items():
|
||||||
setattr(scenario, field, value)
|
setattr(scenario, field, value)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
audit_write(
|
||||||
|
action="scenario.update",
|
||||||
|
resource_type="scenario",
|
||||||
|
resource_id=scenario.id,
|
||||||
|
metadata={"engagement_id": str(engagement.id), "fields": sorted(changes.keys())},
|
||||||
|
)
|
||||||
return jsonify_model(ScenarioRead.model_validate(scenario))
|
return jsonify_model(ScenarioRead.model_validate(scenario))
|
||||||
|
|
||||||
|
|
||||||
@@ -116,8 +152,15 @@ def update_scenario(eid: str, sid: str) -> ResponseReturnValue:
|
|||||||
def delete_scenario(eid: str, sid: str) -> ResponseReturnValue:
|
def delete_scenario(eid: str, sid: str) -> ResponseReturnValue:
|
||||||
engagement = _engagement_or_404(eid)
|
engagement = _engagement_or_404(eid)
|
||||||
scenario = _scenario_or_404(engagement, sid)
|
scenario = _scenario_or_404(engagement, sid)
|
||||||
|
scenario_id = scenario.id
|
||||||
db.session.delete(scenario)
|
db.session.delete(scenario)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
audit_write(
|
||||||
|
action="scenario.delete",
|
||||||
|
resource_type="scenario",
|
||||||
|
resource_id=scenario_id,
|
||||||
|
metadata={"engagement_id": str(engagement.id)},
|
||||||
|
)
|
||||||
return "", 204
|
return "", 204
|
||||||
|
|
||||||
|
|
||||||
@@ -138,4 +181,15 @@ def add_step(eid: str, sid: str) -> ResponseReturnValue:
|
|||||||
)
|
)
|
||||||
db.session.add(step)
|
db.session.add(step)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
audit_write(
|
||||||
|
action="scenario_step.create",
|
||||||
|
resource_type="scenario_step",
|
||||||
|
resource_id=step.id,
|
||||||
|
metadata={
|
||||||
|
"scenario_id": str(scenario.id),
|
||||||
|
"ttp_id": str(payload.ttp_id),
|
||||||
|
"host_id": str(payload.host_id),
|
||||||
|
"order_idx": payload.order_idx,
|
||||||
|
},
|
||||||
|
)
|
||||||
return jsonify_model(ScenarioStepRead.model_validate(step), status=201)
|
return jsonify_model(ScenarioStepRead.model_validate(step), status=201)
|
||||||
|
|||||||
@@ -4,9 +4,16 @@ from __future__ import annotations
|
|||||||
|
|
||||||
from flask import Blueprint, abort, jsonify
|
from flask import Blueprint, abort, jsonify
|
||||||
from flask.typing import ResponseReturnValue
|
from flask.typing import ResponseReturnValue
|
||||||
|
from flask_login import current_user
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
|
|
||||||
from mimic.api._helpers import jsonify_model, parse_body, parse_uuid
|
from mimic.api._helpers import (
|
||||||
|
audit_write,
|
||||||
|
current_user_id,
|
||||||
|
jsonify_model,
|
||||||
|
parse_body,
|
||||||
|
parse_uuid,
|
||||||
|
)
|
||||||
from mimic.db.models import Ttp
|
from mimic.db.models import Ttp
|
||||||
from mimic.extensions import db
|
from mimic.extensions import db
|
||||||
from mimic.rbac import Permission, require_perm
|
from mimic.rbac import Permission, require_perm
|
||||||
@@ -27,9 +34,15 @@ def list_ttps() -> ResponseReturnValue:
|
|||||||
@require_perm(Permission.TTP_DRAFT)
|
@require_perm(Permission.TTP_DRAFT)
|
||||||
def create_ttp() -> ResponseReturnValue:
|
def create_ttp() -> ResponseReturnValue:
|
||||||
payload = parse_body(TtpCreate)
|
payload = parse_body(TtpCreate)
|
||||||
ttp = Ttp(**payload.model_dump())
|
ttp = Ttp(**payload.model_dump(), created_by_id=current_user_id())
|
||||||
db.session.add(ttp)
|
db.session.add(ttp)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
audit_write(
|
||||||
|
action="ttp.create",
|
||||||
|
resource_type="ttp",
|
||||||
|
resource_id=ttp.id,
|
||||||
|
metadata={"name": ttp.name, "mitre_technique": ttp.mitre_technique},
|
||||||
|
)
|
||||||
return jsonify_model(TtpRead.model_validate(ttp), status=201)
|
return jsonify_model(TtpRead.model_validate(ttp), status=201)
|
||||||
|
|
||||||
|
|
||||||
@@ -53,12 +66,20 @@ def update_ttp(tid: str) -> ResponseReturnValue:
|
|||||||
publish_flag = data.pop("is_published", None)
|
publish_flag = data.pop("is_published", None)
|
||||||
for field, value in data.items():
|
for field, value in data.items():
|
||||||
setattr(ttp, field, value)
|
setattr(ttp, field, value)
|
||||||
|
promoted = False
|
||||||
if publish_flag is not None:
|
if publish_flag is not None:
|
||||||
# Promotion is a lead-only privilege. Decorator already gates draft
|
# Promotion is a lead-only privilege. Decorator already gates draft
|
||||||
# edits; promotion gets a second-tier check at the call site.
|
# edits; promotion gets a second-tier check at the call site.
|
||||||
_ensure_promote_perm()
|
_ensure_promote_perm()
|
||||||
ttp.is_published = publish_flag
|
ttp.is_published = publish_flag
|
||||||
|
promoted = True
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
audit_write(
|
||||||
|
action="ttp.promote" if promoted else "ttp.update",
|
||||||
|
resource_type="ttp",
|
||||||
|
resource_id=ttp.id,
|
||||||
|
metadata={"fields": sorted(data.keys()), "is_published": ttp.is_published},
|
||||||
|
)
|
||||||
return jsonify_model(TtpRead.model_validate(ttp))
|
return jsonify_model(TtpRead.model_validate(ttp))
|
||||||
|
|
||||||
|
|
||||||
@@ -68,14 +89,14 @@ def delete_ttp(tid: str) -> ResponseReturnValue:
|
|||||||
ttp = db.session.get(Ttp, parse_uuid(tid, field="ttp id"))
|
ttp = db.session.get(Ttp, parse_uuid(tid, field="ttp id"))
|
||||||
if ttp is None:
|
if ttp is None:
|
||||||
abort(404)
|
abort(404)
|
||||||
|
ttp_id = ttp.id
|
||||||
db.session.delete(ttp)
|
db.session.delete(ttp)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
audit_write(action="ttp.delete", resource_type="ttp", resource_id=ttp_id)
|
||||||
return "", 204
|
return "", 204
|
||||||
|
|
||||||
|
|
||||||
def _ensure_promote_perm() -> None:
|
def _ensure_promote_perm() -> None:
|
||||||
from flask_login import current_user # noqa: PLC0415 (lazy: scope-local user only)
|
|
||||||
|
|
||||||
perms: frozenset[Permission] = getattr(current_user, "permissions", frozenset())
|
perms: frozenset[Permission] = getattr(current_user, "permissions", frozenset())
|
||||||
if Permission.TTP_PROMOTE not in perms:
|
if Permission.TTP_PROMOTE not in perms:
|
||||||
abort(403)
|
abort(403)
|
||||||
|
|||||||
Reference in New Issue
Block a user