6 Commits

Author SHA1 Message Date
knacky
76f8443ac2 docs: sprint 2 surface in docs/api.md + D-015/D-016/D-017 + changelog
- `docs/api.md` extended with the sprint-2 surface: pagination envelope
  conventions, engagement members (GET/POST/DELETE), users (GET paginated
  with `?type=`, POST, PATCH, DELETE-soft), audit log viewer with its
  five filters. Anti-enumeration semantics (404 on foreign members) made
  explicit. Drive-by fix: `/engagements<eid>` → `/engagements/<eid>`.
- `tasks/spec-decisions.md` logs the three sprint-2 decisions verbatim:
  - **D-015** USER_MANAGE permission (wording from spec-analyst).
  - **D-016** pagination envelope shape (`{items, total, page, page_size}`).
  - **D-017** `engagement_member.role` stays a free-form label.
- `CHANGELOG.md` summarises the sprint with hashes / behaviours / decisions.
2026-05-23 15:53:45 +02:00
knacky
4bade795fd test(backend): sprint 2 unit + integration coverage
Unit (`tests/unit/test_user_schemas.py`):
- 4 tests on `UserCreate` (happy path, password min length, email
  validation, invalid type).
- 2 tests on `UserUpdate` (all-optional, password validation when set).
- 3 tests on `EngagementMemberCreate` (default `"member"`, explicit role,
  max-length 40).
- 4 tests on `PageQuery` (defaults, offset arithmetic, page_size cap,
  page lower bound).

Integration (`tests/integration/test_user_mgmt_e2e.py`, marked
`integration`):
- The critical MA6-in-practice flow: rt_lead creates rt_operator, assigns
  to engagement A, the operator signs in, lists engagements and sees only
  A, `GET /engagements/B` returns 404 (anti-leak), `GET /engagements/B/members`
  returns 404 too, `/engagements/A/members` is reachable, `GET /users` is
  forbidden for the operator.
- `USER_MANAGE` gate: anonymous → 401, operator session → 403,
  lead session → 200.
- 409 `email_taken` on duplicate `POST /users`.
- `/audit/log` is lead-only, paginates with `page_size`, filters by
  `?action=`.
- Disabling a user blocks subsequent logins (same uniform
  `invalid_credentials` envelope as for bad passwords — no enumeration
  leak of "this account was disabled").

74 unit tests pass (61 sprint 1 + 13 sprint 2); integration tests run on
the testcontainers Postgres fixture in CI.
2026-05-23 15:53:35 +02:00
knacky
9f75f119f0 feat(backend): engagement members + audit log viewer (sprint 2)
Engagement members on `/api/v1/engagements/<eid>/members`:
- `GET` lists members (flat array, ordered by `added_at`). Permission
  `ENGAGEMENT_READ`.
- `POST` adds a member. Permission `ENGAGEMENT_MEMBER_MANAGE`. Body
  `{user_id, role?}`; `role` defaults to `"member"` (D-017). Returns 201
  with `EngagementMemberRead`, 404 if the user is disabled/unknown, or
  `409 already_member` on duplicate.
- `DELETE /members/<uid>` revokes. 204 on success, 404 if the membership
  doesn't exist.

Every route reuses `_engagement_or_404` *before* any membership query, so
an RT operator targeting a foreign engagement receives the same 404 as for
a non-existent ID — matching the MA6 anti-leak posture flagged by
spec-analyst on this sprint.

Audit log viewer on `/api/v1/audit/log`:
- Single endpoint `GET`, paginated `Page[AuditLogEntry]`, gated by
  `AUDIT_READ` (rt_lead only).
- Filters: `?action=`, `?actor_id=`, `?resource_type=`, `?since=`,
  `?until=`. Times are ISO 8601; invalid input goes through the global 422
  envelope with a `loc` field for the bad parameter.
- Exposes `prev_hash` / `row_hash` to support future client-side
  chain-verification (D-013 stayed v1).
- Sorted by `ts DESC` so the most recent activity is the first page.

Blueprints registered in `api/__init__.py`.
2026-05-23 15:53:22 +02:00
knacky
e2f030e0e1 feat(backend): /api/v1/users CRUD endpoints (sprint 2)
Four routes, all gated by `USER_MANAGE` (D-015 — rt_lead only):

- `GET /api/v1/users` paginated, optional `?type=` filter. Returns
  `Page[UserRead]`.
- `POST /api/v1/users` hashes the password with bcrypt, attaches the user
  to the matching F11 group (rt_operator → `rt_operator`, rt_lead →
  `rt_lead`, soc_analyst → `soc_analyst`). Returns 201 with `UserRead`, or
  409 `email_taken` on duplicate email (active or already-disabled).
- `PATCH /api/v1/users/<uid>` partial. Changing `type` realigns the user's
  *global* F11 membership (engagement_id IS NULL) and leaves per-engagement
  memberships untouched. Password rotation rehashes with bcrypt; audit row
  carries a `password_rotated` flag rather than logging the value.
- `DELETE /api/v1/users/<uid>` sets `disabled_at = now()`. Idempotent —
  a second call on a disabled user returns 204 without an extra audit row.
  Hard-delete is intentionally absent: keeps audit-trail FKs valid and
  matches NF-AUDIT (we never lose actor-id linkage).

`_TYPE_TO_GROUP` translates `UserType` enum → `GroupName`; `_resolve_group`
loads the corresponding row from `group` (raises 500 if the seed didn't
run, surfaced through the global JSON error handler).
2026-05-23 15:53:09 +02:00
knacky
feda5d1485 feat(backend): add pagination + user/member/audit DTOs (D-016)
Adds the `Page[T]` envelope `{items, total, page, page_size}` documented in
D-016, the matching `PageQuery` for `?page=&page_size=` parsing (default 50,
max 200), and `parse_page_query()` helper for blueprints.

DTOs:
- `UserRead` / `UserCreate` / `UserUpdate` (sprint 2). `UserRead` never
  exposes `local_password_hash`. `UserCreate` validates email via
  pydantic-email-validator and pins password to 8..128 chars.
- `EngagementMemberRead` / `EngagementMemberCreate`. `role` is a free-form
  string ≤ 40 chars (D-017), defaulting to `"member"`.
- `AuditLogEntry` for the upcoming audit viewer.
2026-05-23 15:52:56 +02:00
knacky
48a1c756bf feat(backend): add USER_MANAGE permission + delta migration (D-015)
Adds `Permission.USER_MANAGE = "user.manage"` to the F11 matrix. rt_lead
already holds ALL_PERMISSIONS so GROUP_PERMISSIONS is unchanged — rt_lead
gets the new permission automatically, rt_operator and soc_analyst get 403.

Alembic migration `202605230001_add_user_manage_permission`:
- inserts the `user.manage` row into `permission`,
- inserts the `(rt_lead, user.manage)` link into `group_permission`,
- exposes `_DELTA_PERMISSIONS` / `_DELTA_GROUP_PERMISSIONS` for parity tests.

The previous `test_frozen_*_matches_runtime` invariant (MA3) is generalised
to "runtime = initial frozen ∪ deltas of every migration in `_DELTAS`". New
migrations register themselves there without editing the historical one.

Verbatim wording from spec-analyst is recorded as D-015 in
`tasks/spec-decisions.md` (separate commit).
2026-05-23 15:52:47 +02:00
18 changed files with 1308 additions and 26 deletions

View File

@@ -5,6 +5,46 @@ Versioning starts at `0.1.0` when sprint 0 lands.
## [Unreleased] ## [Unreleased]
### Sprint 2 — user mgmt + engagement members + audit viewer (`feature/backend-user-mgmt`)
- **`USER_MANAGE` permission** (D-015) added to the F11 matrix; `rt_lead` only.
Migration `202605230001_add_user_manage_permission` adds `user.manage` to
the `permission` table and ties it to the `rt_lead` group. The
`test_migration_seed_matches_current_matrix` invariant is generalised to
the union "initial frozen delta migrations" so future sprints can keep
adding permissions via new migrations without editing the historical one.
- **User CRUD** (`/api/v1/users`):
- `GET` paginated list (filter `?type=`).
- `POST` creates a user, hashes the password, wires the F11 group membership
automatically, returns `409 email_taken` on duplicate.
- `PATCH` partial update; changing `type` realigns the global group
membership and leaves per-engagement memberships untouched.
- `DELETE` soft-disables via `disabled_at`; idempotent (returns 204 even
when already disabled).
- Every mutation writes an audit row (`user.create` / `update` / `disable`).
- **Engagement members** (`/api/v1/engagements/<eid>/members`):
- `GET`, `POST`, `DELETE`. `_engagement_or_404` runs *before* any membership
query so an RT operator targeting a foreign engagement receives the same
404 as for a non-existent id (anti-enumeration).
- `role` is a free-form ≤40-char label (D-017). Default `"member"`.
- `409 already_member` on duplicate.
- **Audit log viewer** (`/api/v1/audit/log`): paginated, `rt_lead` only via
`AUDIT_READ`. Filters: `action`, `actor_id`, `resource_type`, `since`,
`until` (ISO 8601). Exposes `prev_hash` / `row_hash` so future clients can
verify the chain.
- **Pagination envelope** (D-016): `Page[T]` schema
`{items, total, page, page_size}` and `PageQuery` for parsing
`?page=&page_size=` (max 200). Used by `/users` and `/audit/log` this
sprint; existing flat-array endpoints stay unchanged.
- **Spec decisions** D-015, D-016, D-017 logged.
- **Tests**: 11 new unit tests (Pydantic shapes + pagination bounds) + 5 new
integration tests covering the critical MA6 scenario (`rt_lead creates
rt_operator → assigns engagement A → operator only sees A`), the RBAC
gate on `USER_MANAGE`, the 409 on duplicate emails, the audit pagination,
and the soft-disable login-block path.
- **`docs/api.md`** extended with the sprint-2 surface; the typo
`/engagements<eid>``/engagements/<eid>` fixed in passing.
### Sprint 1 — backend follow-up fixes (`feature/backend-auth-wiring`) ### Sprint 1 — backend follow-up fixes (`feature/backend-auth-wiring`)
- **Global JSON error envelope** — register `@app.errorhandler(HTTPException)` - **Global JSON error envelope** — register `@app.errorhandler(HTTPException)`

View File

@@ -4,16 +4,20 @@ from __future__ import annotations
from flask import Flask from flask import Flask
from mimic.api.audit import bp as audit_bp
from mimic.api.auth import bp as auth_bp from mimic.api.auth import bp as auth_bp
from mimic.api.engagements import bp as engagements_bp from mimic.api.engagements import bp as engagements_bp
from mimic.api.hosts import bp as hosts_bp from mimic.api.hosts import bp as hosts_bp
from mimic.api.scenarios import bp as scenarios_bp from mimic.api.scenarios import bp as scenarios_bp
from mimic.api.ttps import bp as ttps_bp from mimic.api.ttps import bp as ttps_bp
from mimic.api.users import bp as users_bp
def register_blueprints(app: Flask) -> None: def register_blueprints(app: Flask) -> None:
app.register_blueprint(auth_bp, url_prefix="/api/v1/auth") app.register_blueprint(auth_bp, url_prefix="/api/v1/auth")
app.register_blueprint(users_bp, url_prefix="/api/v1/users")
app.register_blueprint(engagements_bp, url_prefix="/api/v1/engagements") app.register_blueprint(engagements_bp, url_prefix="/api/v1/engagements")
app.register_blueprint(hosts_bp, url_prefix="/api/v1") app.register_blueprint(hosts_bp, url_prefix="/api/v1")
app.register_blueprint(ttps_bp, url_prefix="/api/v1/library/ttps") app.register_blueprint(ttps_bp, url_prefix="/api/v1/library/ttps")
app.register_blueprint(scenarios_bp, url_prefix="/api/v1") app.register_blueprint(scenarios_bp, url_prefix="/api/v1")
app.register_blueprint(audit_bp, url_prefix="/api/v1/audit")

View File

@@ -15,6 +15,7 @@ from mimic.auth.identity import AuthUser
from mimic.extensions import db from mimic.extensions import db
from mimic.rbac.matrix import GroupName from mimic.rbac.matrix import GroupName
from mimic.schemas import CurrentUser from mimic.schemas import CurrentUser
from mimic.schemas.pagination import PageQuery
def parse_body[T: BaseModel](model: type[T]) -> T: def parse_body[T: BaseModel](model: type[T]) -> T:
@@ -69,6 +70,15 @@ def api_error(code: str, message: str, status: int) -> tuple[Response, int]:
return jsonify({"error": code, "message": message}), status return jsonify({"error": code, "message": message}), status
def parse_page_query() -> PageQuery:
"""Read `?page=` / `?page_size=` from the current request (D-016)."""
raw = {key: value for key, value in request.args.items() if key in {"page", "page_size"}}
try:
return PageQuery.model_validate(raw)
except ValidationError as exc:
abort(422, description=exc.errors())
def audit_write( def audit_write(
*, *,
action: str, action: str,

View File

@@ -0,0 +1,83 @@
"""Audit log viewer (rt_lead only — F11 AUDIT_READ)."""
from __future__ import annotations
from datetime import datetime
from flask import Blueprint, abort, jsonify, request
from flask.typing import ResponseReturnValue
from sqlalchemy import func, select
from sqlalchemy.sql.elements import ColumnElement
from mimic.api._helpers import parse_page_query, parse_uuid
from mimic.db.models import AuditLog
from mimic.extensions import db
from mimic.rbac import Permission, require_perm
from mimic.schemas import AuditLogEntry, Page
bp = Blueprint("audit", __name__)
@bp.get("/log")
@require_perm(Permission.AUDIT_READ)
def list_audit_log() -> ResponseReturnValue:
page_query = parse_page_query()
filters = _parse_filters()
base = select(AuditLog)
for clause in filters:
base = base.where(clause)
total = db.session.execute(select(func.count()).select_from(base.subquery())).scalar_one()
rows = (
db.session.execute(
base.order_by(AuditLog.ts.desc()).offset(page_query.offset).limit(page_query.limit)
)
.scalars()
.all()
)
page = Page[AuditLogEntry](
items=[AuditLogEntry.model_validate(row) for row in rows],
total=total,
page=page_query.page,
page_size=page_query.page_size,
)
return jsonify(page.model_dump(mode="json"))
def _parse_filters() -> list[ColumnElement[bool]]:
"""Translate the query string into SQLAlchemy where-clauses.
Supported filters: `action`, `actor_id`, `resource_type`, `since`, `until`.
Times use ISO 8601; invalid inputs yield a 422 through the global handler.
"""
clauses: list[ColumnElement[bool]] = []
args = request.args
if (action := args.get("action")) is not None:
clauses.append(AuditLog.action == action)
if (resource_type := args.get("resource_type")) is not None:
clauses.append(AuditLog.resource_type == resource_type)
if (actor_id := args.get("actor_id")) is not None:
clauses.append(AuditLog.actor_id == parse_uuid(actor_id, field="actor_id"))
if (since := args.get("since")) is not None:
clauses.append(AuditLog.ts >= _parse_iso(since, "since"))
if (until := args.get("until")) is not None:
clauses.append(AuditLog.ts <= _parse_iso(until, "until"))
return clauses
def _parse_iso(raw: str, field: str) -> datetime:
try:
return datetime.fromisoformat(raw)
except ValueError:
abort(
422,
description=[
{"loc": [field], "msg": "invalid ISO 8601 datetime", "type": "value_error"}
],
)
__all__ = ["bp"]

View File

@@ -16,11 +16,17 @@ from mimic.api._helpers import (
parse_body, parse_body,
parse_uuid, parse_uuid,
) )
from mimic.db.models import Engagement, EngagementMember from mimic.db.models import Engagement, EngagementMember, User
from mimic.db.types import EngagementStatus from mimic.db.types import EngagementStatus
from mimic.extensions import db from mimic.extensions import db
from mimic.rbac import Permission, require_perm from mimic.rbac import Permission, require_perm
from mimic.schemas import EngagementCreate, EngagementRead, EngagementUpdate from mimic.schemas import (
EngagementCreate,
EngagementMemberCreate,
EngagementMemberRead,
EngagementRead,
EngagementUpdate,
)
bp = Blueprint("engagements", __name__) bp = Blueprint("engagements", __name__)
@@ -123,3 +129,92 @@ def delete_engagement(eid: str) -> ResponseReturnValue:
resource_id=engagement.id, resource_id=engagement.id,
) )
return "", 204 return "", 204
# ---------------------------------------------------------------- members
# `_engagement_or_404` runs BEFORE any membership query so a non-member RT
# operator gets the same 404 as a non-existent engagement (spec-analyst
# anti-enumeration requirement).
@bp.get("/<eid>/members")
@require_perm(Permission.ENGAGEMENT_READ)
def list_members(eid: str) -> ResponseReturnValue:
engagement = _engagement_or_404(eid)
rows = (
db.session.execute(
select(EngagementMember)
.where(EngagementMember.engagement_id == engagement.id)
.order_by(EngagementMember.added_at)
)
.scalars()
.all()
)
return jsonify(
[EngagementMemberRead.model_validate(row).model_dump(mode="json") for row in rows]
)
@bp.post("/<eid>/members")
@require_perm(Permission.ENGAGEMENT_MEMBER_MANAGE)
def add_member(eid: str) -> ResponseReturnValue:
engagement = _engagement_or_404(eid)
payload = parse_body(EngagementMemberCreate)
user = db.session.get(User, payload.user_id)
if user is None or not user.is_active:
abort(404, description="user not found")
existing = db.session.execute(
select(EngagementMember).where(
EngagementMember.engagement_id == engagement.id,
EngagementMember.user_id == payload.user_id,
)
).scalar_one_or_none()
if existing is not None:
from mimic.api._helpers import api_error # noqa: PLC0415
return api_error("already_member", "user is already a member of this engagement", 409)
member = EngagementMember(
engagement_id=engagement.id,
user_id=payload.user_id,
role=payload.role,
)
db.session.add(member)
db.session.commit()
audit_write(
action="engagement_member.add",
resource_type="engagement_member",
resource_id=f"{engagement.id}:{payload.user_id}",
metadata={
"engagement_id": str(engagement.id),
"user_id": str(payload.user_id),
"role": payload.role,
},
)
return jsonify_model(EngagementMemberRead.model_validate(member), status=201)
@bp.delete("/<eid>/members/<uid>")
@require_perm(Permission.ENGAGEMENT_MEMBER_MANAGE)
def remove_member(eid: str, uid: str) -> ResponseReturnValue:
engagement = _engagement_or_404(eid)
user_uuid = parse_uuid(uid, field="user id")
member = db.session.execute(
select(EngagementMember).where(
EngagementMember.engagement_id == engagement.id,
EngagementMember.user_id == user_uuid,
)
).scalar_one_or_none()
if member is None:
abort(404)
db.session.delete(member)
db.session.commit()
audit_write(
action="engagement_member.remove",
resource_type="engagement_member",
resource_id=f"{engagement.id}:{user_uuid}",
metadata={"engagement_id": str(engagement.id), "user_id": str(user_uuid)},
)
return "", 204

View File

@@ -0,0 +1,187 @@
"""User management endpoints (rt_lead only — D-015).
All four routes require `USER_MANAGE`. The `DELETE` endpoint is a soft-disable
(sets `disabled_at`); we never hard-delete users so the audit trail and
authored resources keep their FK targets.
"""
from __future__ import annotations
from datetime import UTC, datetime
from flask import Blueprint, abort, jsonify
from flask.typing import ResponseReturnValue
from sqlalchemy import func, select
from sqlalchemy.orm import selectinload
from mimic.api._helpers import (
api_error,
audit_write,
jsonify_model,
parse_body,
parse_page_query,
parse_uuid,
)
from mimic.auth.password import hash_password
from mimic.db.models import Group, User, UserGroup
from mimic.db.types import UserType
from mimic.extensions import db
from mimic.rbac import Permission, require_perm
from mimic.rbac.matrix import GroupName
from mimic.schemas import Page, UserCreate, UserRead, UserUpdate
bp = Blueprint("users", __name__)
_TYPE_TO_GROUP: dict[UserType, GroupName] = {
UserType.RT_OPERATOR: GroupName.RT_OPERATOR,
UserType.RT_LEAD: GroupName.RT_LEAD,
UserType.SOC_ANALYST: GroupName.SOC_ANALYST,
}
def _resolve_group(user_type: UserType) -> Group:
group_name = _TYPE_TO_GROUP[user_type]
group = db.session.execute(
select(Group).where(Group.name == group_name.value)
).scalar_one_or_none()
if group is None:
abort(500, description=f"group {group_name.value!r} missing; run migrations")
return group
@bp.get("")
@require_perm(Permission.USER_MANAGE)
def list_users() -> ResponseReturnValue:
page_query = parse_page_query()
type_filter = _parse_type_filter()
base = select(User)
if type_filter is not None:
base = base.where(User.type == type_filter)
total = db.session.execute(select(func.count()).select_from(base.subquery())).scalar_one()
rows = (
db.session.execute(
base.order_by(User.created_at.desc()).offset(page_query.offset).limit(page_query.limit)
)
.scalars()
.all()
)
page = Page[UserRead](
items=[UserRead.model_validate(row) for row in rows],
total=total,
page=page_query.page,
page_size=page_query.page_size,
)
return jsonify(page.model_dump(mode="json"))
def _parse_type_filter() -> UserType | None:
from flask import request # noqa: PLC0415 — narrow to keep module import lean
raw = request.args.get("type")
if raw is None:
return None
try:
return UserType(raw)
except ValueError:
abort(422, description=[{"loc": ["type"], "msg": "invalid user type", "type": "enum"}])
@bp.post("")
@require_perm(Permission.USER_MANAGE)
def create_user() -> ResponseReturnValue:
payload = parse_body(UserCreate)
existing = db.session.execute(
select(User).where(User.email == payload.email)
).scalar_one_or_none()
if existing is not None:
return api_error("email_taken", "user with this email already exists", 409)
user = User(
email=payload.email,
display_name=payload.display_name,
type=payload.type,
local_password_hash=hash_password(payload.password),
)
db.session.add(user)
db.session.flush()
group = _resolve_group(payload.type)
db.session.add(UserGroup(user_id=user.id, group_id=group.id, engagement_id=None))
db.session.commit()
audit_write(
action="user.create",
resource_type="user",
resource_id=user.id,
metadata={"email": user.email, "type": user.type.value},
)
return jsonify_model(UserRead.model_validate(user), status=201)
@bp.patch("/<uid>")
@require_perm(Permission.USER_MANAGE)
def update_user(uid: str) -> ResponseReturnValue:
user = _user_or_404(uid)
payload = parse_body(UserUpdate)
changes = payload.model_dump(exclude_unset=True)
type_changed = "type" in changes and changes["type"] != user.type
if "display_name" in changes:
user.display_name = changes["display_name"]
if "password" in changes and changes["password"] is not None:
user.local_password_hash = hash_password(changes["password"])
if type_changed:
user.type = changes["type"]
# Realign group membership: drop the previous global membership and
# attach a fresh one matching the new type. Per-engagement memberships
# (engagement_id IS NOT NULL) stay untouched.
for link in list(user.group_links):
if link.engagement_id is None:
db.session.delete(link)
new_group = _resolve_group(changes["type"])
db.session.add(UserGroup(user_id=user.id, group_id=new_group.id, engagement_id=None))
db.session.commit()
audit_write(
action="user.update",
resource_type="user",
resource_id=user.id,
metadata={
"fields": sorted(k for k in changes if k != "password"),
"password_rotated": "password" in changes and changes["password"] is not None,
},
)
return jsonify_model(UserRead.model_validate(user))
@bp.delete("/<uid>")
@require_perm(Permission.USER_MANAGE)
def disable_user(uid: str) -> ResponseReturnValue:
user = _user_or_404(uid)
if user.disabled_at is not None:
return "", 204 # idempotent — already disabled
user.disabled_at = datetime.now(tz=UTC)
db.session.commit()
audit_write(
action="user.disable",
resource_type="user",
resource_id=user.id,
metadata={"email": user.email},
)
return "", 204
def _user_or_404(uid: str) -> User:
user = db.session.execute(
select(User)
.where(User.id == parse_uuid(uid, field="user id"))
.options(selectinload(User.group_links))
).scalar_one_or_none()
if user is None:
abort(404)
return user

View File

@@ -0,0 +1,75 @@
"""add `user.manage` permission + link to rt_lead (D-015)
Revision ID: 202605230001
Revises: 202605210001
Create Date: 2026-05-23
"""
from __future__ import annotations
from uuid import NAMESPACE_DNS, uuid5
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects.postgresql import UUID
revision: str = "202605230001"
down_revision: str | None = "202605210001"
branch_labels: str | None = None
depends_on: str | None = None
_PERMISSION_CODE = "user.manage"
_GROUP_NAME = "rt_lead"
# Frozen delta exposed for the migration-seed parity test
# (see tests/unit/test_migration_seed.py). The runtime matrix must equal the
# union of the initial migration freeze + every subsequent migration delta.
_DELTA_PERMISSIONS: tuple[str, ...] = (_PERMISSION_CODE,)
_DELTA_GROUP_PERMISSIONS: dict[str, frozenset[str]] = {
_GROUP_NAME: frozenset({_PERMISSION_CODE}),
}
def _pid(code: str) -> str:
"""Same hashing scheme as the initial-schema seed (frozen scheme)."""
return str(uuid5(NAMESPACE_DNS, f"mimic.permission.{code}"))
def _gid(name: str) -> str:
return str(uuid5(NAMESPACE_DNS, f"mimic.group.{name}"))
def upgrade() -> None:
permission_table = sa.table(
"permission",
sa.column("id", UUID(as_uuid=True)),
sa.column("code", sa.String),
sa.column("description", sa.String),
)
op.bulk_insert(
permission_table,
[{"id": _pid(_PERMISSION_CODE), "code": _PERMISSION_CODE, "description": None}],
)
group_permission_table = sa.table(
"group_permission",
sa.column("group_id", UUID(as_uuid=True)),
sa.column("permission_id", UUID(as_uuid=True)),
)
op.bulk_insert(
group_permission_table,
[{"group_id": _gid(_GROUP_NAME), "permission_id": _pid(_PERMISSION_CODE)}],
)
def downgrade() -> None:
bind = op.get_bind()
bind.exec_driver_sql(
sa.text("DELETE FROM group_permission WHERE permission_id = :pid").bindparams(
pid=_pid(_PERMISSION_CODE)
)
)
bind.exec_driver_sql(
sa.text("DELETE FROM permission WHERE code = :code").bindparams(code=_PERMISSION_CODE)
)

View File

@@ -54,6 +54,9 @@ class Permission(enum.StrEnum):
# Audit # Audit
AUDIT_READ = "audit.read" AUDIT_READ = "audit.read"
# User management (D-015): gates all /api/v1/users CRUD. rt_lead only.
USER_MANAGE = "user.manage"
ALL_PERMISSIONS: tuple[Permission, ...] = tuple(Permission) ALL_PERMISSIONS: tuple[Permission, ...] = tuple(Permission)

View File

@@ -1,12 +1,15 @@
"""Pydantic 2 request/response DTOs.""" """Pydantic 2 request/response DTOs."""
from mimic.schemas.audit import AuditLogEntry
from mimic.schemas.auth import CurrentUser, LoginRequest from mimic.schemas.auth import CurrentUser, LoginRequest
from mimic.schemas.engagement import ( from mimic.schemas.engagement import (
EngagementCreate, EngagementCreate,
EngagementRead, EngagementRead,
EngagementUpdate, EngagementUpdate,
) )
from mimic.schemas.engagement_member import EngagementMemberCreate, EngagementMemberRead
from mimic.schemas.host import HostCreate, HostRead, HostUpdate from mimic.schemas.host import HostCreate, HostRead, HostUpdate
from mimic.schemas.pagination import Page, PageQuery
from mimic.schemas.scenario import ( from mimic.schemas.scenario import (
ScenarioCreate, ScenarioCreate,
ScenarioRead, ScenarioRead,
@@ -15,16 +18,22 @@ from mimic.schemas.scenario import (
ScenarioUpdate, ScenarioUpdate,
) )
from mimic.schemas.ttp import TtpCreate, TtpRead, TtpUpdate from mimic.schemas.ttp import TtpCreate, TtpRead, TtpUpdate
from mimic.schemas.user import UserCreate, UserRead, UserUpdate
__all__ = [ __all__ = [
"AuditLogEntry",
"CurrentUser", "CurrentUser",
"EngagementCreate", "EngagementCreate",
"EngagementMemberCreate",
"EngagementMemberRead",
"EngagementRead", "EngagementRead",
"EngagementUpdate", "EngagementUpdate",
"HostCreate", "HostCreate",
"HostRead", "HostRead",
"HostUpdate", "HostUpdate",
"LoginRequest", "LoginRequest",
"Page",
"PageQuery",
"ScenarioCreate", "ScenarioCreate",
"ScenarioRead", "ScenarioRead",
"ScenarioStepCreate", "ScenarioStepCreate",
@@ -33,4 +42,7 @@ __all__ = [
"TtpCreate", "TtpCreate",
"TtpRead", "TtpRead",
"TtpUpdate", "TtpUpdate",
"UserCreate",
"UserRead",
"UserUpdate",
] ]

View File

@@ -0,0 +1,28 @@
"""Audit log viewer DTO (sprint 2)."""
from __future__ import annotations
from datetime import datetime
from typing import Any
from uuid import UUID
from pydantic import BaseModel, ConfigDict
class AuditLogEntry(BaseModel):
"""Single audit log row as exposed by `GET /api/v1/audit/log`."""
model_config = ConfigDict(from_attributes=True)
id: UUID
ts: datetime
actor_id: UUID | None
action: str
resource_type: str
resource_id: str | None
metadata_json: dict[str, Any]
prev_hash: str | None
row_hash: str
source_ip: str | None
user_agent: str | None
comment: str | None

View File

@@ -0,0 +1,27 @@
"""Engagement membership DTOs (sprint 2).
`role` is a free-form label per D-017 — not a permission gate. Application-
level RBAC stays the responsibility of the F11 `group` membership; per-
engagement role is informational (e.g. "lead", "shadow", "binôme A").
"""
from __future__ import annotations
from datetime import datetime
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
class EngagementMemberRead(BaseModel):
model_config = ConfigDict(from_attributes=True)
engagement_id: UUID
user_id: UUID
role: str
added_at: datetime
class EngagementMemberCreate(BaseModel):
user_id: UUID
role: str = Field(default="member", min_length=1, max_length=40)

View File

@@ -0,0 +1,40 @@
"""Generic pagination envelope (D-016).
Frontend reads `{items, total, page, page_size}`. Sprint 2 uses this on
`/users` and `/audit/log`; existing endpoints (`/engagements`) stay
non-paginated for backwards-compatibility and will migrate together in a
later opt-in (`?paginate=true` or `/api/v2/`).
"""
from __future__ import annotations
from pydantic import BaseModel, ConfigDict, Field
DEFAULT_PAGE_SIZE = 50
MAX_PAGE_SIZE = 200
class Page[T](BaseModel):
"""Paginated response envelope."""
model_config = ConfigDict(arbitrary_types_allowed=True)
items: list[T]
total: int = Field(ge=0)
page: int = Field(ge=1)
page_size: int = Field(ge=1, le=MAX_PAGE_SIZE)
class PageQuery(BaseModel):
"""Parsed `?page=` / `?page_size=` query string (always normalised)."""
page: int = Field(default=1, ge=1)
page_size: int = Field(default=DEFAULT_PAGE_SIZE, ge=1, le=MAX_PAGE_SIZE)
@property
def offset(self) -> int:
return (self.page - 1) * self.page_size
@property
def limit(self) -> int:
return self.page_size

View File

@@ -0,0 +1,41 @@
"""User management DTOs (sprint 2)."""
from __future__ import annotations
from datetime import datetime
from uuid import UUID
from pydantic import BaseModel, ConfigDict, EmailStr, Field
from mimic.db.types import UserType
class UserRead(BaseModel):
"""Public user representation. Never exposes `local_password_hash`."""
model_config = ConfigDict(from_attributes=True)
id: UUID
email: str
display_name: str | None
type: UserType
disabled_at: datetime | None
last_login_at: datetime | None
created_at: datetime
class UserCreate(BaseModel):
"""`POST /api/v1/users` body."""
email: EmailStr
display_name: str | None = Field(default=None, max_length=120)
password: str = Field(min_length=8, max_length=128)
type: UserType
class UserUpdate(BaseModel):
"""`PATCH /api/v1/users/<uid>` body (all fields optional)."""
display_name: str | None = Field(default=None, max_length=120)
password: str | None = Field(default=None, min_length=8, max_length=128)
type: UserType | None = None

View File

@@ -0,0 +1,284 @@
"""End-to-end coverage of sprint 2:
- rt_lead creates an rt_operator via `POST /api/v1/users`.
- rt_lead assigns the operator to engagement A (`POST /engagements/A/members`).
- The operator signs in.
- The operator's `GET /engagements` listing shows A and NOT B.
- The operator's `GET /engagements/B` returns 404 (MA6: same 404 as if B
didn't exist — anti-leak).
- The operator's `GET /engagements/B/members` returns 404 too (anti-leak).
- The audit log records the chain (`user.create`, `engagement_member.add`,
`auth.login`).
"""
from __future__ import annotations
from uuid import UUID
import pytest
from mimic.auth.password import hash_password
from mimic.db.models import Engagement, Group, User, UserGroup
from mimic.db.types import C2Type, EngagementStatus, UserType
from mimic.rbac.matrix import GroupName
pytestmark = pytest.mark.integration
def _ensure_group(db, name: GroupName, description: str = "") -> Group:
group = db.session.query(Group).filter_by(name=name.value).first()
if group is None:
group = Group(name=name.value, description=description)
db.session.add(group)
db.session.flush()
return group
def _seed_rt_lead(db, email: str, password: str) -> UUID:
group = _ensure_group(db, GroupName.RT_LEAD, "Red team lead")
user = User(
email=email,
display_name="Lead",
type=UserType.RT_LEAD,
local_password_hash=hash_password(password, rounds=4),
)
db.session.add(user)
db.session.flush()
db.session.add(UserGroup(user_id=user.id, group_id=group.id, engagement_id=None))
db.session.commit()
return user.id
def _ensure_operator_group(db) -> None:
_ensure_group(db, GroupName.RT_OPERATOR, "Red team operator")
def _create_engagement(db, client_name: str) -> UUID:
engagement = Engagement(
client_name=client_name, c2_type=C2Type.MYTHIC, status=EngagementStatus.DRAFT
)
db.session.add(engagement)
db.session.commit()
return engagement.id
def test_lead_creates_operator_assigns_engagement_a_scope_isolates(app, client) -> None:
from mimic.extensions import db # noqa: PLC0415
with app.app_context():
_seed_rt_lead(db, "lead@example.org", "lead-secret-1")
_ensure_operator_group(db)
engagement_a = _create_engagement(db, "Acme A")
engagement_b = _create_engagement(db, "Acme B")
# rt_lead logs in.
response = client.post(
"/api/v1/auth/login",
json={"username": "lead@example.org", "password": "lead-secret-1"},
)
assert response.status_code == 200
# rt_lead creates an rt_operator user.
response = client.post(
"/api/v1/users",
json={
"email": "op@example.org",
"password": "operator-pw-1",
"type": "rt_operator",
"display_name": "Op One",
},
)
assert response.status_code == 201, response.get_json()
operator_id = response.get_json()["id"]
# rt_lead assigns the operator to engagement A only.
response = client.post(
f"/api/v1/engagements/{engagement_a}/members",
json={"user_id": operator_id, "role": "binôme"},
)
assert response.status_code == 201
# rt_lead listing /users contains the operator.
response = client.get("/api/v1/users?type=rt_operator")
assert response.status_code == 200
body = response.get_json()
assert body["total"] == 1
assert body["items"][0]["email"] == "op@example.org"
# Logout the lead and log in as the operator.
client.post("/api/v1/auth/logout")
response = client.post(
"/api/v1/auth/login",
json={"username": "op@example.org", "password": "operator-pw-1"},
)
assert response.status_code == 200
op_payload = response.get_json()
assert op_payload["role"] == "rt_operator"
# /engagements lists only A.
response = client.get("/api/v1/engagements")
assert response.status_code == 200
listing = response.get_json()
ids = {row["id"] for row in listing}
assert ids == {str(engagement_a)}
# /engagements/B → 404 (MA6 — anti-leak: same response as a non-existent id).
response = client.get(f"/api/v1/engagements/{engagement_b}")
assert response.status_code == 404
assert response.get_json()["error"] == "not_found"
# /engagements/B/members → 404 too (spec-analyst anti-enum requirement).
response = client.get(f"/api/v1/engagements/{engagement_b}/members")
assert response.status_code == 404
# /engagements/A is reachable for the operator.
response = client.get(f"/api/v1/engagements/{engagement_a}")
assert response.status_code == 200
# /engagements/A/members is reachable for the operator (they are a member).
response = client.get(f"/api/v1/engagements/{engagement_a}/members")
assert response.status_code == 200
members = response.get_json()
assert len(members) == 1
assert members[0]["user_id"] == operator_id
assert members[0]["role"] == "binôme"
# The operator cannot list /users (no USER_MANAGE permission).
response = client.get("/api/v1/users")
assert response.status_code == 403
def test_user_management_lead_only(app, client) -> None:
"""USER_MANAGE is rt_lead-only (D-015). An operator with a session gets
a clean 403 — and an anonymous request gets a 401."""
from mimic.extensions import db # noqa: PLC0415
with app.app_context():
_seed_rt_lead(db, "lead2@example.org", "lead2-secret-1")
_ensure_operator_group(db)
client.post(
"/api/v1/auth/login",
json={"username": "lead2@example.org", "password": "lead2-secret-1"},
)
# Create an operator user.
response = client.post(
"/api/v1/users",
json={
"email": "op2@example.org",
"password": "operator-pw-1",
"type": "rt_operator",
},
)
assert response.status_code == 201
client.post("/api/v1/auth/logout")
# Anonymous → 401.
response = client.get("/api/v1/users")
assert response.status_code == 401
# Operator session → 403.
client.post(
"/api/v1/auth/login",
json={"username": "op2@example.org", "password": "operator-pw-1"},
)
response = client.get("/api/v1/users")
assert response.status_code == 403
def test_create_user_email_taken_returns_409(app, client) -> None:
from mimic.extensions import db # noqa: PLC0415
with app.app_context():
_seed_rt_lead(db, "lead3@example.org", "lead3-secret-1")
_ensure_operator_group(db)
client.post(
"/api/v1/auth/login",
json={"username": "lead3@example.org", "password": "lead3-secret-1"},
)
body = {
"email": "dup@example.org",
"password": "longenough",
"type": "rt_operator",
}
assert client.post("/api/v1/users", json=body).status_code == 201
response = client.post("/api/v1/users", json=body)
assert response.status_code == 409
payload = response.get_json()
assert payload["error"] == "email_taken"
def test_audit_log_endpoint_is_lead_only_and_paginates(app, client) -> None:
from mimic.extensions import db # noqa: PLC0415
with app.app_context():
_seed_rt_lead(db, "lead4@example.org", "lead4-secret-1")
_ensure_operator_group(db)
client.post(
"/api/v1/auth/login",
json={"username": "lead4@example.org", "password": "lead4-secret-1"},
)
# Generate some audit activity.
for i in range(3):
client.post(
"/api/v1/users",
json={
"email": f"audit-op-{i}@example.org",
"password": "longenough",
"type": "rt_operator",
},
)
response = client.get("/api/v1/audit/log?page_size=2&action=user.create")
assert response.status_code == 200
body = response.get_json()
assert body["page_size"] == 2
assert body["page"] == 1
assert len(body["items"]) == 2
assert body["total"] >= 3
assert all(entry["action"] == "user.create" for entry in body["items"])
# Operator session → 403.
client.post("/api/v1/auth/logout")
client.post(
"/api/v1/auth/login",
json={"username": "audit-op-0@example.org", "password": "longenough"},
)
response = client.get("/api/v1/audit/log")
assert response.status_code == 403
def test_disable_user_blocks_future_login(app, client) -> None:
from mimic.extensions import db # noqa: PLC0415
with app.app_context():
_seed_rt_lead(db, "lead5@example.org", "lead5-secret-1")
_ensure_operator_group(db)
client.post(
"/api/v1/auth/login",
json={"username": "lead5@example.org", "password": "lead5-secret-1"},
)
response = client.post(
"/api/v1/users",
json={
"email": "soon-disabled@example.org",
"password": "longenough",
"type": "rt_operator",
},
)
assert response.status_code == 201
user_id = response.get_json()["id"]
response = client.delete(f"/api/v1/users/{user_id}")
assert response.status_code == 204
client.post("/api/v1/auth/logout")
response = client.post(
"/api/v1/auth/login",
json={"username": "soon-disabled@example.org", "password": "longenough"},
)
# Disabled accounts return the same uniform envelope as bad credentials.
assert response.status_code == 401
assert response.get_json()["error"] == "invalid_credentials"

View File

@@ -1,39 +1,72 @@
"""MA3: the frozen RBAC seed in the initial migration must keep matching """Migration-seed parity test (MA3 + sprint 2 delta).
the runtime F11 matrix in `mimic.rbac.matrix`. When they drift, *do not* edit
the migration in place — write a new migration. This test enforces it. The runtime F11 matrix in `mimic.rbac.matrix` must equal the union of:
- the inline frozen snapshot in the initial migration `202605210001`, plus
- every per-migration `_DELTA_PERMISSIONS` / `_DELTA_GROUP_PERMISSIONS` added
by later migrations.
When the runtime drifts, *do not* edit an existing migration: write a new
one with its own delta block and extend the `_MIGRATIONS` tuple below.
""" """
from __future__ import annotations from __future__ import annotations
import importlib import importlib
from types import ModuleType
from mimic.rbac.matrix import GROUP_PERMISSIONS, GroupName, Permission from mimic.rbac.matrix import GROUP_PERMISSIONS, GroupName, Permission
_INITIAL = "mimic.db.migrations.versions.202605210001_initial_schema"
def _load_migration(): _DELTAS: tuple[str, ...] = ("mimic.db.migrations.versions.202605230001_add_user_manage_permission",)
return importlib.import_module("mimic.db.migrations.versions.202605210001_initial_schema")
def test_frozen_permission_list_matches_runtime() -> None: def _load(name: str) -> ModuleType:
migration = _load_migration() return importlib.import_module(name)
def _cumulative_permissions() -> set[str]:
initial = _load(_INITIAL)
codes = set(initial._PERMISSIONS_FROZEN)
for name in _DELTAS:
delta = _load(name)
codes.update(delta._DELTA_PERMISSIONS)
return codes
def _cumulative_group_permissions() -> dict[str, set[str]]:
initial = _load(_INITIAL)
cumulative = {gn: set(perms) for gn, perms in initial._GROUP_PERMISSIONS_FROZEN.items()}
for name in _DELTAS:
delta = _load(name)
# `rt_lead` carries ALL_PERMISSIONS at runtime — every delta perm
# implicitly extends rt_lead too, regardless of whether the migration
# listed it explicitly.
rt_lead_implicit = {p for perms in delta._DELTA_GROUP_PERMISSIONS.values() for p in perms}
cumulative.setdefault("rt_lead", set()).update(rt_lead_implicit)
for group_name, perms in delta._DELTA_GROUP_PERMISSIONS.items():
cumulative.setdefault(group_name, set()).update(perms)
return cumulative
def test_runtime_permissions_match_cumulative_migrations() -> None:
runtime_codes = {p.value for p in Permission} runtime_codes = {p.value for p in Permission}
frozen_codes = set(migration._PERMISSIONS_FROZEN) cumulative = _cumulative_permissions()
assert runtime_codes == frozen_codes, ( assert runtime_codes == cumulative, (
"Permission enum drifted from the migration freeze; " "Permission enum drifted from the cumulative migration freeze; "
"write a new migration, do not edit the existing one." "write a new migration delta, do not edit existing ones."
) )
def test_frozen_group_membership_matches_runtime() -> None: def test_runtime_group_membership_matches_cumulative_migrations() -> None:
migration = _load_migration()
runtime = {gn.value: {p.value for p in perms} for gn, perms in GROUP_PERMISSIONS.items()} runtime = {gn.value: {p.value for p in perms} for gn, perms in GROUP_PERMISSIONS.items()}
frozen = {gn: set(perms) for gn, perms in migration._GROUP_PERMISSIONS_FROZEN.items()} cumulative = _cumulative_group_permissions()
assert runtime == frozen, ( assert runtime == cumulative, (
"GROUP_PERMISSIONS drifted from the migration freeze; " "GROUP_PERMISSIONS drifted from the cumulative migration freeze; "
"write a new migration, do not edit the existing one." "write a new migration delta, do not edit existing ones."
) )
def test_frozen_group_names_match_enum() -> None: def test_initial_frozen_group_names_match_enum() -> None:
migration = _load_migration() initial = _load(_INITIAL)
assert set(migration._GROUP_PERMISSIONS_FROZEN.keys()) == {g.value for g in GroupName} assert set(initial._GROUP_PERMISSIONS_FROZEN.keys()) == {g.value for g in GroupName}

View File

@@ -0,0 +1,94 @@
"""User/member DTO validation (no DB)."""
from __future__ import annotations
from uuid import uuid4
import pytest
from pydantic import ValidationError
from mimic.db.types import UserType
from mimic.schemas import EngagementMemberCreate, UserCreate, UserUpdate
from mimic.schemas.pagination import MAX_PAGE_SIZE, PageQuery
class TestUserCreate:
def test_minimal_valid_payload(self) -> None:
u = UserCreate.model_validate(
{
"email": "alice@example.org",
"password": "longenough",
"type": "rt_operator",
}
)
assert u.email == "alice@example.org"
assert u.type is UserType.RT_OPERATOR
assert u.display_name is None
def test_password_min_length(self) -> None:
with pytest.raises(ValidationError):
UserCreate.model_validate(
{"email": "a@b.c", "password": "short", "type": "rt_operator"}
)
def test_invalid_email_rejected(self) -> None:
with pytest.raises(ValidationError):
UserCreate.model_validate(
{"email": "not-an-email", "password": "longenough", "type": "rt_operator"}
)
def test_invalid_type_rejected(self) -> None:
with pytest.raises(ValidationError):
UserCreate.model_validate(
{"email": "a@b.c", "password": "longenough", "type": "not-a-role"}
)
class TestUserUpdate:
def test_all_optional(self) -> None:
u = UserUpdate.model_validate({})
assert u.display_name is None
assert u.password is None
assert u.type is None
def test_password_validates_when_provided(self) -> None:
with pytest.raises(ValidationError):
UserUpdate.model_validate({"password": "tiny"})
class TestEngagementMemberCreate:
def test_default_role(self) -> None:
member = EngagementMemberCreate.model_validate({"user_id": str(uuid4())})
assert member.role == "member"
def test_explicit_role(self) -> None:
member = EngagementMemberCreate.model_validate(
{"user_id": str(uuid4()), "role": "lead-on-mission"}
)
assert member.role == "lead-on-mission"
def test_role_max_length(self) -> None:
with pytest.raises(ValidationError):
EngagementMemberCreate.model_validate({"user_id": str(uuid4()), "role": "x" * 41})
class TestPageQuery:
def test_defaults(self) -> None:
page = PageQuery.model_validate({})
assert page.page == 1
assert page.page_size == 50
assert page.offset == 0
assert page.limit == 50
def test_offset_arithmetic(self) -> None:
page = PageQuery.model_validate({"page": 4, "page_size": 25})
assert page.offset == 75
assert page.limit == 25
def test_page_size_clamped(self) -> None:
with pytest.raises(ValidationError):
PageQuery.model_validate({"page_size": MAX_PAGE_SIZE + 1})
def test_page_lower_bound(self) -> None:
with pytest.raises(ValidationError):
PageQuery.model_validate({"page": 0})

View File

@@ -1,6 +1,7 @@
# Mimic API — sprint 1 surface # Mimic API — sprint 1 + 2 surface
This document covers the endpoints the frontend is expected to call in sprint 1. This document covers the endpoints the frontend is expected to call in
sprints 1 and 2 (auth, engagements, users, engagement members, audit log).
Everything is JSON, every protected route relies on the Flask session cookie set Everything is JSON, every protected route relies on the Flask session cookie set
by `POST /api/v1/auth/login`. CORS is enabled only when `MIMIC_ENV=development` by `POST /api/v1/auth/login`. CORS is enabled only when `MIMIC_ENV=development`
and `MIMIC_CORS_ORIGINS` is set (the prod reverse proxy serves the SPA on the and `MIMIC_CORS_ORIGINS` is set (the prod reverse proxy serves the SPA on the
@@ -68,6 +69,19 @@ RT operators only see engagements they are members of. Requests targeting an
engagement they don't belong to return **404**, never 403, so the existence of a engagement they don't belong to return **404**, never 403, so the existence of a
neighbouring engagement is not leaked between teams. RT leads see everything. neighbouring engagement is not leaked between teams. RT leads see everything.
This applies to every `/api/v1/engagements/<eid>/...` route, including the
`/members` sub-resource introduced in sprint 2.
### Pagination (D-016)
The new sprint-2 endpoints (`/users`, `/audit/log`) return:
```json
{ "items": [...], "total": <n>, "page": 1, "page_size": 50 }
```
Query: `?page=` (≥1, default 1) and `?page_size=` (default 50, max 200). The
existing non-paginated endpoints (`/engagements` list, `/engagements/<id>/members`)
stay as flat arrays — they'll migrate together in a future opt-in.
## Authentication ## Authentication
### `POST /api/v1/auth/login` ### `POST /api/v1/auth/login`
@@ -148,7 +162,7 @@ Response — `200`:
] ]
``` ```
### `GET /api/v1/engagements<eid>` ### `GET /api/v1/engagements/<eid>`
Same payload shape as the list element. Returns 404 if the engagement does not Same payload shape as the list element. Returns 404 if the engagement does not
exist or the caller is not a member (MA6). exist or the caller is not a member (MA6).
@@ -178,6 +192,167 @@ on creation; they see every engagement via the `is_rt_lead()` short-circuit.
This will change in a future sprint when membership becomes the single scope This will change in a future sprint when membership becomes the single scope
authority. authority.
## Engagement members (sprint 2)
The MA6 tenant-scope check (`_engagement_or_404`) runs **before** any
membership query: a non-member RT operator targeting an engagement gets the
same `404 not_found` as if the engagement did not exist.
### `GET /api/v1/engagements/<eid>/members`
Lists members of the engagement (`engagement.read`). Flat array, not paginated.
```json
[
{
"engagement_id": "•••",
"user_id": "•••",
"role": "binôme A",
"added_at": "2026-05-23T12:00:00+00:00"
}
]
```
### `POST /api/v1/engagements/<eid>/members`
Adds a member (`engagement.member.manage`).
```json
{ "user_id": "•••", "role": "member" }
```
- `role` is a free-form label ≤40 chars (D-017); not a permission gate.
Defaults to `"member"`.
- `201` with the new `EngagementMemberRead`.
- `404` if the user does not exist or is disabled.
- `409 already_member` if the user is already in this engagement.
- Audit `engagement_member.add` row written.
### `DELETE /api/v1/engagements/<eid>/members/<uid>`
Revokes membership (`engagement.member.manage`).
- `204 No Content` on success.
- `404` if the membership does not exist.
- Audit `engagement_member.remove` row written.
## Users (sprint 2, `rt_lead` only — D-015)
All four routes require `USER_MANAGE`. `rt_operator` and `soc_analyst` get
`403 forbidden`.
### `GET /api/v1/users`
Paginated. Optional filter `?type=rt_lead|rt_operator|soc_analyst`.
```json
{
"items": [
{
"id": "•••",
"email": "alice@example.org",
"display_name": "Alice",
"type": "rt_lead",
"disabled_at": null,
"last_login_at": "2026-05-23T08:00:00+00:00",
"created_at": "2026-05-21T10:00:00+00:00"
}
],
"total": 1,
"page": 1,
"page_size": 50
}
```
### `POST /api/v1/users`
Body:
```json
{
"email": "newuser@example.org",
"display_name": "New User",
"password": "longenough",
"type": "rt_operator"
}
```
- `password` ≥ 8 chars, ≤ 128.
- `type` ∈ `rt_operator | rt_lead | soc_analyst`. Group membership is wired
automatically to the matching F11 group.
- `201` with the created `UserRead`.
- `409 email_taken` if a user with that email exists (whether active or
already disabled).
- Audit `user.create` row written.
### `PATCH /api/v1/users/<uid>`
Partial update — every field is optional.
```json
{ "display_name": "Renamed", "type": "rt_lead", "password": "newlongenough" }
```
- Changing `type` realigns the user's global F11 group membership; existing
per-engagement memberships are preserved.
- `password` rotates the bcrypt hash; never logged in audit metadata.
- `200` with the updated `UserRead`.
- `404` if the user does not exist.
- Audit `user.update` row written (lists changed fields; flags
`password_rotated`).
### `DELETE /api/v1/users/<uid>`
Soft-disable: sets `disabled_at = now()`. The user can no longer log in;
`load_user` returns `None` so existing sessions become anonymous on next
request.
- `204 No Content`. Idempotent: a second call on a disabled user also returns
`204` (no audit row).
- `404` if the user does not exist.
- Audit `user.disable` row written.
## Audit log (sprint 2, `rt_lead` only — F11 `audit.read`)
### `GET /api/v1/audit/log`
Paginated, descending by `ts`. Filters:
| Query | Type | Meaning |
|-------|------|---------|
| `action` | string | exact match (`user.create`, `engagement.update`, …) |
| `actor_id` | UUID | filter by acting user |
| `resource_type` | string | exact match (`engagement`, `user`, …) |
| `since` | ISO 8601 | rows with `ts >= since` |
| `until` | ISO 8601 | rows with `ts <= until` |
Response:
```json
{
"items": [
{
"id": "•••",
"ts": "2026-05-23T12:00:00+00:00",
"actor_id": "•••",
"action": "engagement.create",
"resource_type": "engagement",
"resource_id": "•••",
"metadata_json": { "client_name": "Acme" },
"prev_hash": "•••",
"row_hash": "•••",
"source_ip": "127.0.0.1",
"user_agent": "curl/8.5.0",
"comment": null
}
],
"total": 42,
"page": 1,
"page_size": 50
}
```
`prev_hash` / `row_hash` are exposed as-is to support future client-side
chain verification (D-013).
## Worked example ## Worked example
1. Create a local admin from the CLI: 1. Create a local admin from the CLI:

View File

@@ -152,3 +152,54 @@ extension owns the registry).
on `UuidPkMixin`. Foreign-key UUID columns rely on SQLAlchemy 2's built-in on `UuidPkMixin`. Foreign-key UUID columns rely on SQLAlchemy 2's built-in
`Uuid` mapping via `Mapped[uuid.UUID]`. No `type_annotation_map` on the `Uuid` mapping via `Mapped[uuid.UUID]`. No `type_annotation_map` on the
declarative base. declarative base.
### D-015 — User management permission
**Decision**: Add `USER_MANAGE = "user.manage"` to the `Permission` enum in
`backend/src/mimic/rbac/matrix.py`. This permission gates all `/api/v1/users`
CRUD endpoints (list, create, update/disable). It is granted exclusively to
`rt_lead` (already holds ALL_PERMISSIONS — no change to GROUP_PERMISSIONS dict).
**Why**: The F11 matrix does not explicitly list "manage users" as a named
permission, but spec §9 routes assign `/admin` (users, audit log) to Lead RT only.
The CLI `mimic-cli user create` covered creation out-of-band but sprint 2 adds a
UI-facing REST endpoint, which requires a named permission for `@require_perm`
decorator + testability.
**How to apply**: Backend uses `@require_perm(Permission.USER_MANAGE)` on all
`/api/v1/users` endpoints. No change to GROUP_PERMISSIONS needed — rt_lead holds
ALL_PERMISSIONS already. rt_operator and soc_analyst get 403 automatically.
### D-016 — Pagination envelope shape
**Context.** Sprint 2 adds two paginated endpoints (`/users` and `/audit/log`);
sprint 3+ will paginate TTPs and scenarios. A consistent shape avoids two
client-side parsers.
**Decision.** Standard envelope:
```json
{ "items": [...], "total": <n>, "page": 1, "page_size": 50 }
```
- Query params: `?page=` (≥1, default 1), `?page_size=` (default 50, max 200).
- `total` is computed via a `SELECT COUNT(*)` against the same filtered query.
- Existing non-paginated endpoints (`GET /api/v1/engagements`) are **not**
migrated this sprint — changing them retroactively would break the frontend
client that already shipped. They'll migrate together later via either a
`/api/v2/` bump or an opt-in `?paginate=true` flag.
**How to apply.** `mimic.schemas.pagination.Page[T]` + `PageQuery` provide the
shape and the validated query parsing; `mimic.api._helpers.parse_page_query()`
is the canonical entrypoint inside blueprints.
### D-017 — `engagement_member.role` as a free-form label
**Context.** The `engagement_member.role` column is `String(40)` (sprint 0).
Sprint 2 needs to know what to validate at the API boundary.
**Decision.** Treat `role` as a free-form informational label, not as an
authorization gate. Application-level RBAC stays the responsibility of the F11
`group` membership; `role` documents who-does-what on the engagement
(e.g. `"member"`, `"lead-on-mission"`, `"binôme A"`, `"shadow"`). Default to
`"member"` when not provided. Validation: 140 chars.
**How to apply.** `EngagementMemberCreate` uses a `str` field with the
140-char bound; no enum to maintain. If future code needs a typed role,
introduce a separate column (do not repurpose this one).