"""Admin-side user management: list, get, update, soft-delete, assign groups. Self-service updates (locale, password, display_name) live in `services.auth` — this module is for admin operations on other users. """ from __future__ import annotations import uuid from dataclasses import dataclass from datetime import datetime, timezone from typing import Iterable from sqlalchemy import func, or_, select from app.db.session import session_scope from app.models.auth import Group, User, UserGroup from app.services.bootstrap import ADMIN_GROUP_NAME class UserNotFound(Exception): pass class LastAdminProtected(Exception): """Refusing to strip admin from the last active admin.""" class SystemGroupProtected(Exception): """Refusing to delete or rename a built-in system group.""" @dataclass(frozen=True) class UserView: id: uuid.UUID email: str display_name: str | None locale: str is_active: bool deleted_at: datetime | None created_at: datetime updated_at: datetime groups: list[tuple[uuid.UUID, str]] def _to_view(u: User) -> UserView: return UserView( id=u.id, email=u.email, display_name=u.display_name, locale=u.locale, is_active=u.is_active, deleted_at=u.deleted_at, created_at=u.created_at, updated_at=u.updated_at, groups=[(g.id, g.name) for g in u.groups if g.deleted_at is None], ) def list_users( *, q: str | None = None, is_active: bool | None = None, include_deleted: bool = False, limit: int = 50, offset: int = 0, ) -> tuple[list[UserView], int]: """Return (rows, total_count) with case-insensitive search on email + display_name.""" with session_scope() as s: stmt = select(User) count_stmt = select(func.count()).select_from(User) if not include_deleted: stmt = stmt.where(User.deleted_at.is_(None)) count_stmt = count_stmt.where(User.deleted_at.is_(None)) if is_active is not None: stmt = stmt.where(User.is_active.is_(is_active)) count_stmt = count_stmt.where(User.is_active.is_(is_active)) if q: like = f"%{q.lower()}%" stmt = stmt.where( or_(func.lower(User.email).like(like), func.lower(User.display_name).like(like)) ) count_stmt = count_stmt.where( or_(func.lower(User.email).like(like), func.lower(User.display_name).like(like)) ) stmt = stmt.order_by(User.email.asc()).limit(limit).offset(offset) rows = s.scalars(stmt).all() total = int(s.scalar(count_stmt) or 0) views = [_to_view(u) for u in rows] return views, total def get_user(user_id: uuid.UUID, *, include_deleted: bool = False) -> UserView: with session_scope() as s: u = s.get(User, user_id) if u is None or (u.deleted_at is not None and not include_deleted): raise UserNotFound() return _to_view(u) def update_user( user_id: uuid.UUID, *, display_name: str | None | object = ..., locale: str | None = None, is_active: bool | None = None, ) -> UserView: """Partial update. Pass display_name=None to clear; omit to leave unchanged.""" with session_scope() as s: u = s.get(User, user_id) if u is None or u.deleted_at is not None: raise UserNotFound() if display_name is not ...: if display_name in (None, ""): u.display_name = None else: u.display_name = display_name.strip() or None if locale is not None: u.locale = locale if is_active is not None: # If deactivating the last active admin, refuse. if not is_active and _is_last_active_admin(s, u): raise LastAdminProtected("cannot deactivate the last active admin") u.is_active = is_active return _to_view(u) def soft_delete_user(user_id: uuid.UUID) -> None: with session_scope() as s: u = s.get(User, user_id) if u is None or u.deleted_at is not None: raise UserNotFound() if _is_last_active_admin(s, u): raise LastAdminProtected("cannot delete the last active admin") u.deleted_at = datetime.now(tz=timezone.utc) u.is_active = False def set_user_groups(user_id: uuid.UUID, group_ids: Iterable[uuid.UUID]) -> UserView: """Replace the user's group memberships with the given set.""" desired = set(group_ids) with session_scope() as s: u = s.get(User, user_id) if u is None or u.deleted_at is not None: raise UserNotFound() # Resolve admin group id once. admin_group_id = s.scalar( select(Group.id).where(Group.name == ADMIN_GROUP_NAME, Group.is_system.is_(True)) ) is_currently_admin = admin_group_id in {g.id for g in u.groups} will_be_admin = admin_group_id in desired if is_currently_admin and not will_be_admin and _is_last_active_admin(s, u): raise LastAdminProtected("cannot remove admin from the last active admin") # Refuse silently for unknown groups: validate first. if desired: known = set( s.scalars( select(Group.id).where(Group.id.in_(desired), Group.deleted_at.is_(None)) ).all() ) unknown = desired - known if unknown: raise ValueError(f"unknown groups: {sorted(map(str, unknown))}") current = {g.id for g in u.groups} to_add = desired - current to_remove = current - desired for gid in to_remove: row = s.get(UserGroup, (u.id, gid)) if row is not None: s.delete(row) for gid in to_add: s.add(UserGroup(user_id=u.id, group_id=gid)) s.flush() s.refresh(u) return _to_view(u) def _is_last_active_admin(s, user: User) -> bool: """True when `user` is currently in the admin system group and removing/blocking them would leave the platform with zero active admins.""" admin_group_id = s.scalar( select(Group.id).where(Group.name == ADMIN_GROUP_NAME, Group.is_system.is_(True)) ) if admin_group_id is None: return False if admin_group_id not in {g.id for g in user.groups}: return False other_admins = s.scalar( select(func.count()) .select_from(User) .join(UserGroup, UserGroup.user_id == User.id) .where( UserGroup.group_id == admin_group_id, User.id != user.id, User.deleted_at.is_(None), User.is_active.is_(True), ) ) return int(other_admins or 0) == 0