99 lines
3.3 KiB
Python
99 lines
3.3 KiB
Python
|
|
"""Initial bootstrap : seed `admin` / `redteam` / `blueteam` system groups + first admin.
|
||
|
|
|
||
|
|
The detailed permission seeding lives in M3 (`mitre.sync` etc.); for M2 we only
need an `admin` group that effectively grants full access. We model that as an
absent permission set + a special `is_system` flag on the group, plus the
`@require_perm` decorator that bypasses checks for any user belonging to a
system `admin` group. M3 will fill in the atomic permissions.
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import uuid
|
||
|
|
from dataclasses import dataclass
|
||
|
|
|
||
|
|
from sqlalchemy import select
|
||
|
|
|
||
|
|
from app.core.install_token import (
|
||
|
|
mark_install_token_consumed,
|
||
|
|
verify_install_token,
|
||
|
|
)
|
||
|
|
from app.core.security import hash_password
|
||
|
|
from app.db.session import session_scope
|
||
|
|
from app.models.auth import Group, User, UserGroup
|
||
|
|
|
||
|
|
# Names of the three system groups that `ensure_system_groups()` creates with
# `is_system=True`. The first admin user is attached to ADMIN_GROUP_NAME.
ADMIN_GROUP_NAME = "admin"
REDTEAM_GROUP_NAME = "redteam"
BLUETEAM_GROUP_NAME = "blueteam"
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class BootstrapResult:
    """Identifiers returned by a successful `bootstrap_admin` call."""

    # Id of the first admin user that was just created.
    user_id: uuid.UUID
    # Id of the system `admin` group that user was attached to.
    admin_group_id: uuid.UUID
|
||
|
|
|
||
|
|
|
||
|
|
class BootstrapError(Exception):
    """Raised when the first-admin bootstrap cannot proceed.

    `bootstrap_admin` raises it when the install token is rejected and when
    at least one user already exists.
    """
|
||
|
|
|
||
|
|
|
||
|
|
def ensure_system_groups() -> dict[str, uuid.UUID]:
    """Return the ids of the three system groups, creating any that are missing.

    Idempotent: a group that already exists with `is_system` set is reused
    rather than recreated.

    Returns:
        Mapping of group name (`admin`, `redteam`, `blueteam`) to group id.
    """
    descriptions = {
        ADMIN_GROUP_NAME: "Platform administrators — full access.",
        REDTEAM_GROUP_NAME: "Red team operators.",
        BLUETEAM_GROUP_NAME: "Blue team operators.",
    }
    ids_by_name: dict[str, uuid.UUID] = {}
    with session_scope() as session:
        for group_name, description in descriptions.items():
            # Only system-flagged groups count as a match for the name.
            lookup = select(Group).where(
                Group.name == group_name,
                Group.is_system.is_(True),
            )
            group = session.scalar(lookup)
            if group is None:
                group = Group(name=group_name, description=description, is_system=True)
                session.add(group)
                # Flush so the new row's id is available when read below.
                session.flush()
            ids_by_name[group_name] = group.id
    return ids_by_name
|
||
|
|
|
||
|
|
|
||
|
|
def _raise_if_any_user_exists(session) -> None:
    """Raise `BootstrapError` if the given session can see at least one user row."""
    if session.scalar(select(User.id).limit(1)) is not None:
        raise BootstrapError("setup already done — at least one user exists")


def bootstrap_admin(
    *, install_token: str, email: str, password: str, display_name: str | None = None
) -> BootstrapResult:
    """Consume the install token, create the first admin user, attach to admin group.

    Steps, in order: verify the install token, validate the password length,
    refuse if any user already exists, ensure the system groups, insert the
    user and its `admin` membership, mark the install token consumed, and
    finally seed the permission catalogue.

    Args:
        install_token: One-time token checked with `verify_install_token`.
        email: Address of the first admin; stored stripped and lower-cased.
        password: Plain-text password; must be at least 8 characters.
        display_name: Optional display name; blank or whitespace-only values
            are stored as `None`.

    Returns:
        The ids of the created user and of the system `admin` group.

    Raises:
        BootstrapError: The install token is rejected, or a user already exists.
        ValueError: The password is shorter than 8 characters.
    """
    if not verify_install_token(install_token):
        raise BootstrapError("invalid or already-consumed install token")
    if len(password) < 8:
        raise ValueError("password must be at least 8 characters")

    email_norm = email.strip().lower()
    display_norm = (display_name or "").strip() or None

    # Early refusal: bail out before touching groups or hashing when setup
    # has visibly been completed already.
    with session_scope() as s:
        _raise_if_any_user_exists(s)

    groups = ensure_system_groups()
    admin_id = groups[ADMIN_GROUP_NAME]

    # Hash before opening the write transaction so the window between the
    # re-check and the insert stays as short as possible.
    password_hash = hash_password(password)

    with session_scope() as s:
        # Re-check in the SAME transaction as the insert. The early check ran
        # in a session that has already ended, so on its own it cannot see a
        # user created in the meantime. This narrows the race; fully closing
        # it needs database-level serialization (lock or unique constraint).
        # NOTE(review): assumes session_scope() rolls back when the body
        # raises — confirm against app.db.session.
        _raise_if_any_user_exists(s)

        user = User(
            email=email_norm,
            display_name=display_norm,
            password_hash=password_hash,
        )
        s.add(user)
        # Flush so `user.id` is populated before the membership row uses it.
        s.flush()
        s.add(UserGroup(user_id=user.id, group_id=admin_id))
        # Capture the id while the session is still open.
        user_id = user.id

    mark_install_token_consumed()

    # Re-seed the permission catalogue + system-group bindings. This is called
    # at boot too, but on a fresh DB after `/diag/reset` the groups were just
    # recreated above and have no permissions yet — seeding here keeps the
    # bootstrap path self-contained.
    from app.services.permissions_seed import seed_all  # noqa: PLC0415 — avoid import cycle

    seed_all()

    return BootstrapResult(user_id=user_id, admin_group_id=admin_id)
|