feat(backend): add local auth + group-based RBAC matching F11 (B0.6)
- Permission enum + GroupName enum + GROUP_PERMISSIONS mapping mirror the F11 matrix in code (verifiable against the spec table in tests). - @require_perm decorator: 401 on anonymous, 403 on missing permission, passes through otherwise. Pure-function user_has() for unit-testing. - AuthUser (Flask-Login wrapper) resolves the permission set from a User's groups; load_user is the Flask-Login user_loader. - bcrypt password hashing helpers (12 rounds by default, configurable). - SOC opaque token (D-006): secrets.token_urlsafe(32), bcrypt-hashed at rest, plain value returned once at creation and never re-displayable. - Group-based RBAC from day one (D-003) — Keycloak OIDC in v2 maps onto the same group model.
This commit is contained in:
6
backend/src/mimic/auth/__init__.py
Normal file
6
backend/src/mimic/auth/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
"""Authentication: local password (Flask-Login) + SOC sessions."""
|
||||||
|
|
||||||
|
from mimic.auth.identity import AuthUser, load_user
|
||||||
|
from mimic.auth.password import check_password, hash_password
|
||||||
|
|
||||||
|
__all__ = ["AuthUser", "check_password", "hash_password", "load_user"]
|
||||||
60
backend/src/mimic/auth/identity.py
Normal file
60
backend/src/mimic/auth/identity.py
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
"""Flask-Login user wrapper that carries the resolved permission set."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from uuid import UUID
|
||||||
|
|
||||||
|
from sqlalchemy import select
|
||||||
|
from sqlalchemy.orm import selectinload
|
||||||
|
|
||||||
|
from mimic.db.models import User, UserGroup
|
||||||
|
from mimic.extensions import db
|
||||||
|
from mimic.rbac.matrix import GROUP_PERMISSIONS, GroupName, Permission
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(slots=True)
|
||||||
|
class AuthUser:
|
||||||
|
"""Lightweight identity attached to ``flask.g`` / ``current_user``."""
|
||||||
|
|
||||||
|
id: UUID
|
||||||
|
email: str
|
||||||
|
permissions: frozenset[Permission] = field(default_factory=frozenset)
|
||||||
|
groups: frozenset[str] = field(default_factory=frozenset)
|
||||||
|
is_authenticated: bool = True
|
||||||
|
is_active: bool = True
|
||||||
|
is_anonymous: bool = False
|
||||||
|
|
||||||
|
def get_id(self) -> str:
|
||||||
|
return str(self.id)
|
||||||
|
|
||||||
|
|
||||||
|
def load_user(user_id: str) -> AuthUser | None:
|
||||||
|
"""Flask-Login `user_loader` callback."""
|
||||||
|
try:
|
||||||
|
uid = UUID(user_id)
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
return None
|
||||||
|
|
||||||
|
stmt = (
|
||||||
|
select(User)
|
||||||
|
.where(User.id == uid)
|
||||||
|
.options(selectinload(User.group_links).selectinload(UserGroup.group))
|
||||||
|
)
|
||||||
|
user = db.session.execute(stmt).scalar_one_or_none()
|
||||||
|
if user is None or not user.is_active:
|
||||||
|
return None
|
||||||
|
|
||||||
|
group_names = {link.group.name for link in user.group_links}
|
||||||
|
perms: set[Permission] = set()
|
||||||
|
for group_name in group_names:
|
||||||
|
try:
|
||||||
|
perms.update(GROUP_PERMISSIONS[GroupName(group_name)])
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
return AuthUser(
|
||||||
|
id=user.id,
|
||||||
|
email=user.email,
|
||||||
|
permissions=frozenset(perms),
|
||||||
|
groups=frozenset(group_names),
|
||||||
|
)
|
||||||
25
backend/src/mimic/auth/password.py
Normal file
25
backend/src/mimic/auth/password.py
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
"""bcrypt password helpers."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import bcrypt
|
||||||
|
|
||||||
|
_DEFAULT_ROUNDS = 12
|
||||||
|
|
||||||
|
|
||||||
|
def hash_password(plain: str, *, rounds: int = _DEFAULT_ROUNDS) -> str:
|
||||||
|
"""Hash a plain password with bcrypt; returns the UTF-8 string."""
|
||||||
|
if not plain:
|
||||||
|
raise ValueError("password must not be empty")
|
||||||
|
salt = bcrypt.gensalt(rounds=rounds)
|
||||||
|
return bcrypt.hashpw(plain.encode("utf-8"), salt).decode("utf-8")
|
||||||
|
|
||||||
|
|
||||||
|
def check_password(plain: str, hashed: str | None) -> bool:
|
||||||
|
"""Constant-time bcrypt verification (False if hash is missing/invalid)."""
|
||||||
|
if not plain or not hashed:
|
||||||
|
return False
|
||||||
|
try:
|
||||||
|
return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8"))
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
return False
|
||||||
44
backend/src/mimic/auth/soc_token.py
Normal file
44
backend/src/mimic/auth/soc_token.py
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
"""SOC opaque token generation + verification.
|
||||||
|
|
||||||
|
Decision D-006: clear token returned once at creation, bcrypt hash persisted.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import hmac
|
||||||
|
import secrets
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
import bcrypt
|
||||||
|
|
||||||
|
_TOKEN_BYTES = 32 # 256 bits of entropy
|
||||||
|
_DEFAULT_ROUNDS = 12
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True, slots=True)
|
||||||
|
class SocTokenMaterial:
|
||||||
|
plain: str
|
||||||
|
hashed: str
|
||||||
|
|
||||||
|
|
||||||
|
def generate_token(*, rounds: int = _DEFAULT_ROUNDS) -> SocTokenMaterial:
|
||||||
|
"""Generate a fresh SOC token and its bcrypt hash."""
|
||||||
|
plain = secrets.token_urlsafe(_TOKEN_BYTES)
|
||||||
|
salt = bcrypt.gensalt(rounds=rounds)
|
||||||
|
hashed = bcrypt.hashpw(plain.encode("utf-8"), salt).decode("utf-8")
|
||||||
|
return SocTokenMaterial(plain=plain, hashed=hashed)
|
||||||
|
|
||||||
|
|
||||||
|
def verify_token(plain: str, hashed: str) -> bool:
|
||||||
|
"""Constant-time bcrypt verification of a SOC token."""
|
||||||
|
if not plain or not hashed:
|
||||||
|
return False
|
||||||
|
try:
|
||||||
|
return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8"))
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def safe_compare(a: str, b: str) -> bool:
|
||||||
|
"""Constant-time string compare for non-hashed contexts."""
|
||||||
|
return hmac.compare_digest(a, b)
|
||||||
17
backend/src/mimic/rbac/__init__.py
Normal file
17
backend/src/mimic/rbac/__init__.py
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
"""Group-based RBAC (spec F11 matrix)."""
|
||||||
|
|
||||||
|
from mimic.rbac.decorators import require_perm
|
||||||
|
from mimic.rbac.matrix import (
|
||||||
|
ALL_PERMISSIONS,
|
||||||
|
DEFAULT_GROUPS,
|
||||||
|
GROUP_PERMISSIONS,
|
||||||
|
Permission,
|
||||||
|
)
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"ALL_PERMISSIONS",
|
||||||
|
"DEFAULT_GROUPS",
|
||||||
|
"GROUP_PERMISSIONS",
|
||||||
|
"Permission",
|
||||||
|
"require_perm",
|
||||||
|
]
|
||||||
42
backend/src/mimic/rbac/decorators.py
Normal file
42
backend/src/mimic/rbac/decorators.py
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
"""`@require_perm` Flask decorator (group-based RBAC)."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from collections.abc import Callable
|
||||||
|
from functools import wraps
|
||||||
|
from typing import TYPE_CHECKING, ParamSpec, TypeVar
|
||||||
|
|
||||||
|
from flask import abort
|
||||||
|
from flask_login import current_user
|
||||||
|
|
||||||
|
from mimic.rbac.matrix import Permission
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
pass
|
||||||
|
|
||||||
|
P = ParamSpec("P")
|
||||||
|
R = TypeVar("R")
|
||||||
|
|
||||||
|
|
||||||
|
def require_perm(perm: Permission) -> Callable[[Callable[P, R]], Callable[P, R]]:
|
||||||
|
"""Reject the request with 401/403 unless the user holds `perm`."""
|
||||||
|
|
||||||
|
def _decorate(view: Callable[P, R]) -> Callable[P, R]:
|
||||||
|
@wraps(view)
|
||||||
|
def _wrapped(*args: P.args, **kwargs: P.kwargs) -> R:
|
||||||
|
user = current_user
|
||||||
|
if not getattr(user, "is_authenticated", False):
|
||||||
|
abort(401)
|
||||||
|
permissions: frozenset[Permission] = getattr(user, "permissions", frozenset())
|
||||||
|
if perm not in permissions:
|
||||||
|
abort(403)
|
||||||
|
return view(*args, **kwargs)
|
||||||
|
|
||||||
|
return _wrapped # type: ignore[return-value]
|
||||||
|
|
||||||
|
return _decorate
|
||||||
|
|
||||||
|
|
||||||
|
def user_has(perm: Permission, permissions: frozenset[Permission]) -> bool:
|
||||||
|
"""Pure helper, easier to unit-test than the decorator."""
|
||||||
|
return perm in permissions
|
||||||
95
backend/src/mimic/rbac/matrix.py
Normal file
95
backend/src/mimic/rbac/matrix.py
Normal file
@@ -0,0 +1,95 @@
|
|||||||
|
"""F11 permission matrix as code.
|
||||||
|
|
||||||
|
This mirrors the spec table 1:1 — keeping it as a single source in code lets
|
||||||
|
us hash-check it against the spec in CI (spec-analyst task S0.2). Three default
|
||||||
|
groups are seeded by Alembic and align with the three user types.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import enum
|
||||||
|
|
||||||
|
|
||||||
|
class Permission(enum.StrEnum):
|
||||||
|
"""Application permissions. Codes are kebab-case + dotted scope."""
|
||||||
|
|
||||||
|
# Engagement
|
||||||
|
ENGAGEMENT_CREATE = "engagement.create"
|
||||||
|
ENGAGEMENT_READ = "engagement.read"
|
||||||
|
ENGAGEMENT_READ_OWN = "engagement.read_own" # scoped to soc_session
|
||||||
|
ENGAGEMENT_UPDATE = "engagement.update"
|
||||||
|
ENGAGEMENT_DELETE = "engagement.delete"
|
||||||
|
ENGAGEMENT_MEMBER_MANAGE = "engagement.member.manage"
|
||||||
|
ENGAGEMENT_SOC_TOKEN_ISSUE = "engagement.soc_token.issue" # noqa: S105
|
||||||
|
|
||||||
|
# Hosts
|
||||||
|
HOST_CRUD = "host.crud"
|
||||||
|
|
||||||
|
# TTPs
|
||||||
|
TTP_READ = "ttp.read"
|
||||||
|
TTP_DRAFT = "ttp.draft"
|
||||||
|
TTP_PROMOTE = "ttp.promote"
|
||||||
|
|
||||||
|
# Imports
|
||||||
|
IMPORT_JOURNAL = "import.journal"
|
||||||
|
|
||||||
|
# Scenarios
|
||||||
|
SCENARIO_CRUD = "scenario.crud"
|
||||||
|
|
||||||
|
# Runs
|
||||||
|
RUN_START = "run.start"
|
||||||
|
RUN_CONTROL = "run.control"
|
||||||
|
|
||||||
|
# Detection / evidence
|
||||||
|
EVIDENCE_ADD = "evidence.add"
|
||||||
|
DETECTION_ADD = "detection.add"
|
||||||
|
|
||||||
|
# Cleanup
|
||||||
|
CLEANUP_TRIGGER = "cleanup.trigger"
|
||||||
|
|
||||||
|
# Reports
|
||||||
|
REPORT_GENERATE = "report.generate"
|
||||||
|
REPORT_READ = "report.read"
|
||||||
|
|
||||||
|
# Audit
|
||||||
|
AUDIT_READ = "audit.read"
|
||||||
|
|
||||||
|
|
||||||
|
ALL_PERMISSIONS: tuple[Permission, ...] = tuple(Permission)
|
||||||
|
|
||||||
|
|
||||||
|
class GroupName(enum.StrEnum):
|
||||||
|
RT_OPERATOR = "rt_operator"
|
||||||
|
RT_LEAD = "rt_lead"
|
||||||
|
SOC_ANALYST = "soc_analyst"
|
||||||
|
|
||||||
|
|
||||||
|
# Source-of-truth mapping derived from spec §F11. Verified by tests against
|
||||||
|
# the spec table.
|
||||||
|
GROUP_PERMISSIONS: dict[GroupName, frozenset[Permission]] = {
|
||||||
|
GroupName.RT_OPERATOR: frozenset(
|
||||||
|
{
|
||||||
|
Permission.ENGAGEMENT_CREATE,
|
||||||
|
Permission.ENGAGEMENT_READ,
|
||||||
|
Permission.HOST_CRUD,
|
||||||
|
Permission.TTP_READ,
|
||||||
|
Permission.TTP_DRAFT,
|
||||||
|
Permission.IMPORT_JOURNAL,
|
||||||
|
Permission.SCENARIO_CRUD,
|
||||||
|
Permission.EVIDENCE_ADD,
|
||||||
|
Permission.CLEANUP_TRIGGER,
|
||||||
|
Permission.REPORT_READ,
|
||||||
|
}
|
||||||
|
),
|
||||||
|
GroupName.RT_LEAD: frozenset(ALL_PERMISSIONS),
|
||||||
|
GroupName.SOC_ANALYST: frozenset(
|
||||||
|
{
|
||||||
|
Permission.ENGAGEMENT_READ_OWN,
|
||||||
|
Permission.DETECTION_ADD,
|
||||||
|
Permission.REPORT_READ,
|
||||||
|
}
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
DEFAULT_GROUPS: tuple[GroupName, ...] = tuple(GroupName)
|
||||||
Reference in New Issue
Block a user