feat(backend): add local auth + group-based RBAC matching F11 (B0.6)

- Permission enum + GroupName enum + GROUP_PERMISSIONS mapping mirror
  the F11 matrix in code (verifiable against the spec table in tests).
- @require_perm decorator: 401 on anonymous, 403 on missing permission,
  passes through otherwise. Pure-function user_has() for unit-testing.
- AuthUser (Flask-Login wrapper) resolves the permission set from a
  User's groups; load_user is the Flask-Login user_loader.
- bcrypt password hashing helpers (12 rounds by default, configurable).
- SOC opaque token (D-006): secrets.token_urlsafe(32), bcrypt-hashed at
  rest, plain value returned once at creation and never re-displayable.
- Group-based RBAC from day one (D-003) — Keycloak OIDC in v2 maps onto
  the same group model.
This commit is contained in:
knacky
2026-05-21 20:33:31 +02:00
parent 104d73143a
commit 7f4ad85a68
7 changed files with 289 additions and 0 deletions

View File

@@ -0,0 +1,6 @@
"""Authentication: local password (Flask-Login) + SOC sessions."""
from mimic.auth.identity import AuthUser, load_user
from mimic.auth.password import check_password, hash_password
__all__ = ["AuthUser", "check_password", "hash_password", "load_user"]

View File

@@ -0,0 +1,60 @@
"""Flask-Login user wrapper that carries the resolved permission set."""
from __future__ import annotations
from dataclasses import dataclass, field
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from mimic.db.models import User, UserGroup
from mimic.extensions import db
from mimic.rbac.matrix import GROUP_PERMISSIONS, GroupName, Permission
@dataclass(slots=True)
class AuthUser:
"""Lightweight identity attached to ``flask.g`` / ``current_user``."""
id: UUID
email: str
permissions: frozenset[Permission] = field(default_factory=frozenset)
groups: frozenset[str] = field(default_factory=frozenset)
is_authenticated: bool = True
is_active: bool = True
is_anonymous: bool = False
def get_id(self) -> str:
return str(self.id)
def load_user(user_id: str) -> AuthUser | None:
"""Flask-Login `user_loader` callback."""
try:
uid = UUID(user_id)
except (ValueError, TypeError):
return None
stmt = (
select(User)
.where(User.id == uid)
.options(selectinload(User.group_links).selectinload(UserGroup.group))
)
user = db.session.execute(stmt).scalar_one_or_none()
if user is None or not user.is_active:
return None
group_names = {link.group.name for link in user.group_links}
perms: set[Permission] = set()
for group_name in group_names:
try:
perms.update(GROUP_PERMISSIONS[GroupName(group_name)])
except ValueError:
continue
return AuthUser(
id=user.id,
email=user.email,
permissions=frozenset(perms),
groups=frozenset(group_names),
)

View File

@@ -0,0 +1,25 @@
"""bcrypt password helpers."""
from __future__ import annotations
import bcrypt
_DEFAULT_ROUNDS = 12
def hash_password(plain: str, *, rounds: int = _DEFAULT_ROUNDS) -> str:
"""Hash a plain password with bcrypt; returns the UTF-8 string."""
if not plain:
raise ValueError("password must not be empty")
salt = bcrypt.gensalt(rounds=rounds)
return bcrypt.hashpw(plain.encode("utf-8"), salt).decode("utf-8")
def check_password(plain: str, hashed: str | None) -> bool:
"""Constant-time bcrypt verification (False if hash is missing/invalid)."""
if not plain or not hashed:
return False
try:
return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8"))
except (ValueError, TypeError):
return False

View File

@@ -0,0 +1,44 @@
"""SOC opaque token generation + verification.
Decision D-006: clear token returned once at creation, bcrypt hash persisted.
"""
from __future__ import annotations
import hmac
import secrets
from dataclasses import dataclass
import bcrypt
_TOKEN_BYTES = 32 # 256 bits of entropy
_DEFAULT_ROUNDS = 12
@dataclass(frozen=True, slots=True)
class SocTokenMaterial:
plain: str
hashed: str
def generate_token(*, rounds: int = _DEFAULT_ROUNDS) -> SocTokenMaterial:
"""Generate a fresh SOC token and its bcrypt hash."""
plain = secrets.token_urlsafe(_TOKEN_BYTES)
salt = bcrypt.gensalt(rounds=rounds)
hashed = bcrypt.hashpw(plain.encode("utf-8"), salt).decode("utf-8")
return SocTokenMaterial(plain=plain, hashed=hashed)
def verify_token(plain: str, hashed: str) -> bool:
"""Constant-time bcrypt verification of a SOC token."""
if not plain or not hashed:
return False
try:
return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8"))
except (ValueError, TypeError):
return False
def safe_compare(a: str, b: str) -> bool:
"""Constant-time string compare for non-hashed contexts."""
return hmac.compare_digest(a, b)

View File

@@ -0,0 +1,17 @@
"""Group-based RBAC (spec F11 matrix)."""
from mimic.rbac.decorators import require_perm
from mimic.rbac.matrix import (
ALL_PERMISSIONS,
DEFAULT_GROUPS,
GROUP_PERMISSIONS,
Permission,
)
__all__ = [
"ALL_PERMISSIONS",
"DEFAULT_GROUPS",
"GROUP_PERMISSIONS",
"Permission",
"require_perm",
]

View File

@@ -0,0 +1,42 @@
"""`@require_perm` Flask decorator (group-based RBAC)."""
from __future__ import annotations
from collections.abc import Callable
from functools import wraps
from typing import TYPE_CHECKING, ParamSpec, TypeVar
from flask import abort
from flask_login import current_user
from mimic.rbac.matrix import Permission
if TYPE_CHECKING:
pass
P = ParamSpec("P")
R = TypeVar("R")
def require_perm(perm: Permission) -> Callable[[Callable[P, R]], Callable[P, R]]:
"""Reject the request with 401/403 unless the user holds `perm`."""
def _decorate(view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def _wrapped(*args: P.args, **kwargs: P.kwargs) -> R:
user = current_user
if not getattr(user, "is_authenticated", False):
abort(401)
permissions: frozenset[Permission] = getattr(user, "permissions", frozenset())
if perm not in permissions:
abort(403)
return view(*args, **kwargs)
return _wrapped # type: ignore[return-value]
return _decorate
def user_has(perm: Permission, permissions: frozenset[Permission]) -> bool:
"""Pure helper, easier to unit-test than the decorator."""
return perm in permissions

View File

@@ -0,0 +1,95 @@
"""F11 permission matrix as code.
This mirrors the spec table 1:1 — keeping it as a single source in code lets
us hash-check it against the spec in CI (spec-analyst task S0.2). Three default
groups are seeded by Alembic and align with the three user types.
"""
from __future__ import annotations
import enum
class Permission(enum.StrEnum):
"""Application permissions. Codes are kebab-case + dotted scope."""
# Engagement
ENGAGEMENT_CREATE = "engagement.create"
ENGAGEMENT_READ = "engagement.read"
ENGAGEMENT_READ_OWN = "engagement.read_own" # scoped to soc_session
ENGAGEMENT_UPDATE = "engagement.update"
ENGAGEMENT_DELETE = "engagement.delete"
ENGAGEMENT_MEMBER_MANAGE = "engagement.member.manage"
ENGAGEMENT_SOC_TOKEN_ISSUE = "engagement.soc_token.issue" # noqa: S105
# Hosts
HOST_CRUD = "host.crud"
# TTPs
TTP_READ = "ttp.read"
TTP_DRAFT = "ttp.draft"
TTP_PROMOTE = "ttp.promote"
# Imports
IMPORT_JOURNAL = "import.journal"
# Scenarios
SCENARIO_CRUD = "scenario.crud"
# Runs
RUN_START = "run.start"
RUN_CONTROL = "run.control"
# Detection / evidence
EVIDENCE_ADD = "evidence.add"
DETECTION_ADD = "detection.add"
# Cleanup
CLEANUP_TRIGGER = "cleanup.trigger"
# Reports
REPORT_GENERATE = "report.generate"
REPORT_READ = "report.read"
# Audit
AUDIT_READ = "audit.read"
ALL_PERMISSIONS: tuple[Permission, ...] = tuple(Permission)
class GroupName(enum.StrEnum):
RT_OPERATOR = "rt_operator"
RT_LEAD = "rt_lead"
SOC_ANALYST = "soc_analyst"
# Source-of-truth mapping derived from spec §F11. Verified by tests against
# the spec table.
GROUP_PERMISSIONS: dict[GroupName, frozenset[Permission]] = {
GroupName.RT_OPERATOR: frozenset(
{
Permission.ENGAGEMENT_CREATE,
Permission.ENGAGEMENT_READ,
Permission.HOST_CRUD,
Permission.TTP_READ,
Permission.TTP_DRAFT,
Permission.IMPORT_JOURNAL,
Permission.SCENARIO_CRUD,
Permission.EVIDENCE_ADD,
Permission.CLEANUP_TRIGGER,
Permission.REPORT_READ,
}
),
GroupName.RT_LEAD: frozenset(ALL_PERMISSIONS),
GroupName.SOC_ANALYST: frozenset(
{
Permission.ENGAGEMENT_READ_OWN,
Permission.DETECTION_ADD,
Permission.REPORT_READ,
}
),
}
DEFAULT_GROUPS: tuple[GroupName, ...] = tuple(GroupName)