Files
Metamorph/backend/app/core/install_token.py

148 lines
4.6 KiB
Python
Raw Permalink Normal View History

feat(m2): auth, JWT, invitations, bootstrap, RTOps SPA pages Crypto + tokens - app/core/security.py: Argon2id PasswordHasher (time_cost=2, memory_cost= 64 MiB, parallelism=2) + opaque-token SHA-256 helpers (raw token shown once, only the hash lives in the DB). - app/core/jwt_tokens.py: HS256, claims iss/sub/type/jti/iat/exp. Access 1h, refresh 30d. Services - services/auth.py: login, refresh with token rotation + reuse-detection chain revoke, logout (idempotent), change_password (forces logout-all). - services/invitations.py: create, preview, accept, revoke. Default 7d TTL. - services/bootstrap.py: seeds the 3 system groups (admin/redteam/blueteam), consumes the install token, attaches the first user to admin. - core/install_token.py: mints, persists in settings, marks consumed, regenerate hook for /diag/reset. API - POST /setup (consume install token, create 1st admin) + GET /setup (status). - POST /auth/{login,refresh,logout,change-password} + GET /auth/me. - POST /invitations + GET /invitations + GET /invitations/preview/<token> + POST /invitations/accept/<token> + POST /invitations/<id>/revoke. - POST /diag/reset: test-only kill switch (truncate auth tables + mint fresh install token). Allowed in dev too (with WARNING log) so the e2e suite can run against a make-up stack; production locked out. Middleware - @require_auth populates g.current_user (snapshot dataclass, session closed before request handler runs). - @require_perm(*codes): atomic perm union check; admin group bypasses. Perm catalogue lands in M3, scaffolding here. - flask-limiter: 10/min/IP on /auth/login & /auth/refresh, 5/min on /auth/change-password & /setup, 10–20/min on invitation endpoints. Disabled in APP_ENV=test. CLI - flask --app app.cli metamorph print-install-token [--force] - flask --app app.cli metamorph seed-mitre (M4 placeholder) Refresh cookie metamorph_refresh: HttpOnly + Secure (localhost is a secure context for modern browsers) + SameSite=Strict + Path=/api/v1/auth/. Email validation: app.api._validation.Email permissive RFC-shape regex so internal TLDs (.local/.corp/.test) are accepted — pydantic.EmailStr's deliverability check is too strict for red-team labs. Frontend - lib/{api,auth}.ts: access token in module memory, refresh cookie, automatic 401-retry via /auth/refresh, useAuth() hook. - components/{Layout,RequireAuth}.tsx + ui/{TextField,Alert}.tsx. - pages/{Login,Setup,Register,Profile}. Testing - tests/test_auth_flow.py: 15 integration tests (24 backend total). - e2e/tests/m2-auth.spec.ts: 8 Playwright tests (20 e2e total). - tasks/testing-m2.md. DoD: make test-api → 24 passed, make e2e → 20 passed; spec-reviewer pass applied (Secure unconditional, refresh limit 10/min/IP). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 06:16:48 +02:00
"""First-admin install token.
When the `users` table is empty at boot, we mint a one-shot opaque token,
store its SHA-256 in `settings(key='install_token_hash')`, and log the raw
token to stdout. The operator copies it from the logs and posts it to
`/api/v1/setup` with the desired admin credentials.
Idempotency: as long as the token row exists and no admin has consumed it,
subsequent boots reuse the same hash and re-emit the same token only if
explicitly invoked via `flask metamorph print-install-token`.
"""
from __future__ import annotations
import logging
from datetime import datetime, timedelta, timezone
from sqlalchemy import select
from app.core.security import generate_opaque_token, hash_opaque_token
from app.db.session import session_scope
from app.models.auth import User
from app.models.setting import Setting
INSTALL_TOKEN_KEY = "install_token"
log = logging.getLogger("metamorph.bootstrap")
# Setting JSONB shape: {"hash": "<sha256>", "issued_at": ISO, "expires_at": ISO|null, "consumed_at": ISO|null}
def _users_exist() -> bool:
with session_scope() as s:
return s.execute(select(User.id).limit(1)).first() is not None
def _read_setting() -> Setting | None:
with session_scope() as s:
return s.get(Setting, INSTALL_TOKEN_KEY)
def _write_setting(payload: dict) -> None:
with session_scope() as s:
existing = s.get(Setting, INSTALL_TOKEN_KEY)
if existing is None:
s.add(
Setting(
key=INSTALL_TOKEN_KEY,
value=payload,
description="One-shot bootstrap token for the first admin (M2).",
)
)
else:
existing.value = payload
def ensure_install_token(*, force: bool = False) -> str | None:
"""Mint a token if no users exist and no live token is on file.
Returns the raw token if newly minted (caller is responsible for logging it),
or None if the bootstrap is already consumed / not applicable.
"""
if _users_exist() and not force:
return None
setting = _read_setting()
if setting is not None and not force:
value = setting.value or {}
if value.get("consumed_at"):
return None # consumed, do not mint again
# A pending token exists; we don't know its raw value any more.
# Caller must `force=True` to mint a new one (CLI command will do that).
return None
token = generate_opaque_token()
_write_setting(
{
"hash": hash_opaque_token(token),
"issued_at": datetime.now(tz=timezone.utc).isoformat(),
"expires_at": None, # never expires until consumed
"consumed_at": None,
}
)
return token
def regenerate_install_token() -> str:
"""CLI helper: always mint and persist a fresh token (overwrites any pending one)."""
return ensure_install_token(force=True) or _force_mint()
def _force_mint() -> str:
token = generate_opaque_token()
_write_setting(
{
"hash": hash_opaque_token(token),
"issued_at": datetime.now(tz=timezone.utc).isoformat(),
"expires_at": None,
"consumed_at": None,
}
)
return token
def verify_install_token(token: str) -> bool:
"""Constant-time comparison against the stored hash."""
setting = _read_setting()
if setting is None or not setting.value:
return False
payload = setting.value
if payload.get("consumed_at"):
return False
expected = payload.get("hash")
if not expected:
return False
import hmac
return hmac.compare_digest(hash_opaque_token(token), expected)
def mark_install_token_consumed() -> None:
setting = _read_setting()
if setting is None:
return
payload = dict(setting.value or {})
payload["consumed_at"] = datetime.now(tz=timezone.utc).isoformat()
_write_setting(payload)
def log_install_token_banner(raw_token: str) -> None:
"""Pretty banner so the token is unmissable in container logs."""
sep = "=" * 72
log.warning(
"metamorph.install_token.minted",
extra={
"banner": sep,
"message_template": (
"BOOTSTRAP — copy the token below and POST it to /api/v1/setup "
"with your desired admin email + password. Save it: it is logged once."
),
"install_token": raw_token,
},
)
# Also dump a plain banner so the token is grep-friendly even if the JSON
# consumer hides `extra` fields.
print(sep, flush=True) # noqa: T201
print(f"INSTALL TOKEN: {raw_token}", flush=True) # noqa: T201
print(sep, flush=True) # noqa: T201