"""Admin-side group management: CRUD + permission bindings. System groups (`is_system=True`: admin, redteam, blueteam) cannot be renamed or deleted, but their permission bindings are seeded on boot and editable afterwards (e.g. an admin can broaden `redteam`). """ from __future__ import annotations import uuid from dataclasses import dataclass from datetime import datetime, timezone from sqlalchemy import func, select from app.db.session import session_scope from app.models.auth import Group, GroupPermission, Permission, UserGroup from app.services.bootstrap import ADMIN_GROUP_NAME class GroupNotFound(Exception): pass class GroupNameConflict(Exception): pass class SystemGroupProtected(Exception): """Refusing to delete or rename a built-in system group.""" @dataclass(frozen=True) class GroupView: id: uuid.UUID name: str description: str | None is_system: bool deleted_at: datetime | None members_count: int permissions: list[str] created_at: datetime updated_at: datetime def _to_view(g: Group, members_count: int) -> GroupView: return GroupView( id=g.id, name=g.name, description=g.description, is_system=g.is_system, deleted_at=g.deleted_at, members_count=members_count, permissions=sorted(p.code for p in g.permissions), created_at=g.created_at, updated_at=g.updated_at, ) def _members_counts(s, group_ids: list[uuid.UUID]) -> dict[uuid.UUID, int]: if not group_ids: return {} from app.models.auth import User as _U # local to avoid model cycles rows = s.execute( select(UserGroup.group_id, func.count(UserGroup.user_id)) .join(_U, _U.id == UserGroup.user_id) .where(UserGroup.group_id.in_(group_ids), _U.deleted_at.is_(None)) .group_by(UserGroup.group_id) ).all() return {gid: int(cnt) for gid, cnt in rows} def list_groups(*, include_deleted: bool = False) -> list[GroupView]: with session_scope() as s: stmt = select(Group).order_by(Group.is_system.desc(), Group.name.asc()) if not include_deleted: stmt = stmt.where(Group.deleted_at.is_(None)) rows = s.scalars(stmt).all() counts = _members_counts(s, [g.id for g in rows]) return [_to_view(g, counts.get(g.id, 0)) for g in rows] def get_group(group_id: uuid.UUID) -> GroupView: with session_scope() as s: g = s.get(Group, group_id) if g is None or g.deleted_at is not None: raise GroupNotFound() counts = _members_counts(s, [g.id]) return _to_view(g, counts.get(g.id, 0)) def create_group(*, name: str, description: str | None) -> GroupView: name_norm = name.strip() if not name_norm: raise ValueError("name is required") with session_scope() as s: existing = s.scalar( select(Group).where(Group.name == name_norm, Group.deleted_at.is_(None)) ) if existing is not None: raise GroupNameConflict(f"group name {name_norm!r} already in use") g = Group(name=name_norm, description=(description or "").strip() or None, is_system=False) s.add(g) s.flush() return _to_view(g, 0) def update_group( group_id: uuid.UUID, *, name: str | None = None, description: str | None | object = ..., ) -> GroupView: with session_scope() as s: g = s.get(Group, group_id) if g is None or g.deleted_at is not None: raise GroupNotFound() if name is not None: name_norm = name.strip() if not name_norm: raise ValueError("name cannot be empty") if g.is_system and name_norm != g.name: raise SystemGroupProtected("system groups cannot be renamed") if name_norm != g.name: clash = s.scalar( select(Group).where( Group.name == name_norm, Group.deleted_at.is_(None), Group.id != g.id, ) ) if clash is not None: raise GroupNameConflict(f"group name {name_norm!r} already in use") g.name = name_norm if description is not ...: if description in (None, ""): g.description = None else: g.description = description.strip() or None counts = _members_counts(s, [g.id]) return _to_view(g, counts.get(g.id, 0)) def soft_delete_group(group_id: uuid.UUID) -> None: with session_scope() as s: g = s.get(Group, group_id) if g is None or g.deleted_at is not None: raise GroupNotFound() if g.is_system: raise SystemGroupProtected("system groups cannot be deleted") g.deleted_at = datetime.now(tz=timezone.utc) def set_group_permissions(group_id: uuid.UUID, codes: list[str]) -> GroupView: """Replace the group's permission set with the given codes (validated).""" desired_codes = set(codes) with session_scope() as s: g = s.get(Group, group_id) if g is None or g.deleted_at is not None: raise GroupNotFound() # Preserve the invariant "the system `admin` group has every perm." The # decorator's admin bypass relies on `is_admin` (group membership), not # on the perm set, so a stripped admin group would still grant access — # but the listing would look misleading and a future refactor could # reasonably switch the bypass to a perm-based check. if g.is_system and g.name == ADMIN_GROUP_NAME: all_codes = {p.code for p in s.scalars(select(Permission)).all()} if desired_codes != all_codes: raise SystemGroupProtected( "the admin group must keep every permission" ) if desired_codes: perms = s.scalars(select(Permission).where(Permission.code.in_(desired_codes))).all() known = {p.code for p in perms} unknown = desired_codes - known if unknown: raise ValueError(f"unknown permission codes: {sorted(unknown)}") else: perms = [] current = {p.code: p for p in g.permissions} to_remove = set(current) - desired_codes to_add = desired_codes - set(current) for code in to_remove: row = s.get(GroupPermission, (g.id, current[code].id)) if row is not None: s.delete(row) for p in perms: if p.code in to_add: s.add(GroupPermission(group_id=g.id, permission_id=p.id)) s.flush() s.refresh(g) counts = _members_counts(s, [g.id]) return _to_view(g, counts.get(g.id, 0)) def list_permissions() -> list[dict]: """Return the catalogue of all permissions known to the platform.""" with session_scope() as s: rows = s.scalars(select(Permission).order_by(Permission.code.asc())).all() return [ {"id": str(p.id), "code": p.code, "description": p.description} for p in rows ]