"""First-admin install token.

When the `users` table is empty at boot, we mint a one-shot opaque token,
store its hash (as computed by `hash_opaque_token`) in
`settings(key='install_token')`, and log the raw token to stdout. The operator
copies it from the logs and posts it to `/api/v1/setup` with the desired admin
credentials.

Idempotency: only the hash is persisted, so the raw value of a pending token
cannot be recovered. As long as the token row exists, subsequent boots mint
nothing; a fresh token (overwriting the pending one) is issued only when
explicitly requested via `flask metamorph print-install-token`.
"""

from __future__ import annotations

import hmac
import logging
from datetime import datetime, timedelta, timezone

from sqlalchemy import select

from app.core.security import generate_opaque_token, hash_opaque_token
from app.db.session import session_scope
from app.models.auth import User
from app.models.setting import Setting

INSTALL_TOKEN_KEY = "install_token"

log = logging.getLogger("metamorph.bootstrap")

# Setting JSONB shape:
#   {"hash": "<token hash>", "issued_at": ISO, "expires_at": ISO|null, "consumed_at": ISO|null}
# NOTE: `expires_at` is always written as null by this module and is not
# enforced by `verify_install_token`.


def _users_exist() -> bool:
    """Return True when the users table contains at least one row."""
    with session_scope() as s:
        return s.execute(select(User.id).limit(1)).first() is not None


def _read_setting() -> Setting | None:
    """Fetch the install-token `Setting` row, or None when it is absent.

    NOTE(review): the row is handed back after its session scope has exited.
    Whether its attributes are still readable depends on how `session_scope`
    is configured (not visible here); prefer `_read_payload` inside this
    module. Kept for callers that may import it.
    """
    with session_scope() as s:
        return s.get(Setting, INSTALL_TOKEN_KEY)


def _read_payload() -> dict | None:
    """Return a detached copy of the stored token payload.

    Returns None when the setting row does not exist, and an empty dict when
    the row exists but holds no value. The value is copied while the session
    is still open so callers never touch the ORM object afterwards.
    """
    with session_scope() as s:
        row = s.get(Setting, INSTALL_TOKEN_KEY)
        if row is None:
            return None
        return dict(row.value or {})


def _write_setting(payload: dict) -> None:
    """Create the install-token setting row, or overwrite its value."""
    with session_scope() as s:
        existing = s.get(Setting, INSTALL_TOKEN_KEY)
        if existing is None:
            s.add(
                Setting(
                    key=INSTALL_TOKEN_KEY,
                    value=payload,
                    description="One-shot bootstrap token for the first admin (M2).",
                )
            )
        else:
            existing.value = payload


def _force_mint() -> str:
    """Mint a fresh token, persist its hash, and return the raw token.

    Overwrites whatever payload is currently stored. This is the single place
    where the payload for a newly issued token is built.
    """
    token = generate_opaque_token()
    _write_setting(
        {
            "hash": hash_opaque_token(token),
            "issued_at": datetime.now(tz=timezone.utc).isoformat(),
            "expires_at": None,  # never expires until consumed
            "consumed_at": None,
        }
    )
    return token


def ensure_install_token(*, force: bool = False) -> str | None:
    """Mint a token if no users exist and no token row is on file.

    Args:
        force: When True, skip both checks and always mint a new token,
            overwriting any stored one.

    Returns:
        The raw token if newly minted (caller is responsible for logging it),
        or None if the bootstrap is already consumed / pending / not
        applicable.
    """
    if not force:
        if _users_exist():
            return None
        # A row on file means the token is either consumed (never mint again)
        # or pending (raw value is unrecoverable; the CLI must force a new one).
        if _read_payload() is not None:
            return None
    return _force_mint()


def regenerate_install_token() -> str:
    """CLI helper: always mint and persist a fresh token (overwrites any pending one)."""
    return _force_mint()


def verify_install_token(token: str) -> bool:
    """Check `token` against the stored hash using a constant-time comparison.

    Returns False when the token is empty, when no payload is stored, when the
    stored token has already been consumed, or when no usable hash is on file.
    """
    if not token:
        return False
    payload = _read_payload()
    if not payload or payload.get("consumed_at"):
        return False
    expected = payload.get("hash")
    # A non-string hash would make compare_digest raise; treat it as no match.
    if not expected or not isinstance(expected, str):
        return False
    return hmac.compare_digest(hash_opaque_token(token), expected)


def mark_install_token_consumed() -> None:
    """Stamp the stored payload with a `consumed_at` timestamp (UTC, ISO 8601).

    Does nothing when no token row exists. The read and the write happen in
    one session scope so the update is applied to the row that was just read.
    """
    with session_scope() as s:
        row = s.get(Setting, INSTALL_TOKEN_KEY)
        if row is None:
            return
        payload = dict(row.value or {})
        payload["consumed_at"] = datetime.now(tz=timezone.utc).isoformat()
        # Assign a new dict rather than mutating in place, matching how
        # `_write_setting` replaces the value.
        row.value = payload


def log_install_token_banner(raw_token: str) -> None:
    """Pretty banner so the token is unmissable in container logs."""
    sep = "=" * 72
    log.warning(
        "metamorph.install_token.minted",
        extra={
            "banner": sep,
            "message_template": (
                "BOOTSTRAP — copy the token below and POST it to /api/v1/setup "
                "with your desired admin email + password. Save it: it is logged once."
            ),
            "install_token": raw_token,
        },
    )
    # Also dump a plain banner so the token is grep-friendly even if the JSON
    # consumer hides `extra` fields. Emitted as one write so the three lines
    # stay together in the output.
    print("\n".join((sep, f"INSTALL TOKEN: {raw_token}", sep)), flush=True)  # noqa: T201