diff --git a/backend/src/mimic/auth/__init__.py b/backend/src/mimic/auth/__init__.py new file mode 100644 index 0000000..2b0abbd --- /dev/null +++ b/backend/src/mimic/auth/__init__.py @@ -0,0 +1,6 @@ +"""Authentication: local password (Flask-Login) + SOC sessions.""" + +from mimic.auth.identity import AuthUser, load_user +from mimic.auth.password import check_password, hash_password + +__all__ = ["AuthUser", "check_password", "hash_password", "load_user"] diff --git a/backend/src/mimic/auth/identity.py b/backend/src/mimic/auth/identity.py new file mode 100644 index 0000000..10912dc --- /dev/null +++ b/backend/src/mimic/auth/identity.py @@ -0,0 +1,60 @@ +"""Flask-Login user wrapper that carries the resolved permission set.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from uuid import UUID + +from sqlalchemy import select +from sqlalchemy.orm import selectinload + +from mimic.db.models import User, UserGroup +from mimic.extensions import db +from mimic.rbac.matrix import GROUP_PERMISSIONS, GroupName, Permission + + +@dataclass(slots=True) +class AuthUser: + """Lightweight identity attached to ``flask.g`` / ``current_user``.""" + + id: UUID + email: str + permissions: frozenset[Permission] = field(default_factory=frozenset) + groups: frozenset[str] = field(default_factory=frozenset) + is_authenticated: bool = True + is_active: bool = True + is_anonymous: bool = False + + def get_id(self) -> str: + return str(self.id) + + +def load_user(user_id: str) -> AuthUser | None: + """Flask-Login `user_loader` callback.""" + try: + uid = UUID(user_id) + except (ValueError, TypeError): + return None + + stmt = ( + select(User) + .where(User.id == uid) + .options(selectinload(User.group_links).selectinload(UserGroup.group)) + ) + user = db.session.execute(stmt).scalar_one_or_none() + if user is None or not user.is_active: + return None + + group_names = {link.group.name for link in user.group_links} + perms: set[Permission] = set() + for group_name in group_names: + try: + perms.update(GROUP_PERMISSIONS[GroupName(group_name)]) + except ValueError: + continue + return AuthUser( + id=user.id, + email=user.email, + permissions=frozenset(perms), + groups=frozenset(group_names), + ) diff --git a/backend/src/mimic/auth/password.py b/backend/src/mimic/auth/password.py new file mode 100644 index 0000000..7c84ade --- /dev/null +++ b/backend/src/mimic/auth/password.py @@ -0,0 +1,25 @@ +"""bcrypt password helpers.""" + +from __future__ import annotations + +import bcrypt + +_DEFAULT_ROUNDS = 12 + + +def hash_password(plain: str, *, rounds: int = _DEFAULT_ROUNDS) -> str: + """Hash a plain password with bcrypt; returns the UTF-8 string.""" + if not plain: + raise ValueError("password must not be empty") + salt = bcrypt.gensalt(rounds=rounds) + return bcrypt.hashpw(plain.encode("utf-8"), salt).decode("utf-8") + + +def check_password(plain: str, hashed: str | None) -> bool: + """Constant-time bcrypt verification (False if hash is missing/invalid).""" + if not plain or not hashed: + return False + try: + return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8")) + except (ValueError, TypeError): + return False diff --git a/backend/src/mimic/auth/soc_token.py b/backend/src/mimic/auth/soc_token.py new file mode 100644 index 0000000..6e82958 --- /dev/null +++ b/backend/src/mimic/auth/soc_token.py @@ -0,0 +1,44 @@ +"""SOC opaque token generation + verification. + +Decision D-006: clear token returned once at creation, bcrypt hash persisted. +""" + +from __future__ import annotations + +import hmac +import secrets +from dataclasses import dataclass + +import bcrypt + +_TOKEN_BYTES = 32 # 256 bits of entropy +_DEFAULT_ROUNDS = 12 + + +@dataclass(frozen=True, slots=True) +class SocTokenMaterial: + plain: str + hashed: str + + +def generate_token(*, rounds: int = _DEFAULT_ROUNDS) -> SocTokenMaterial: + """Generate a fresh SOC token and its bcrypt hash.""" + plain = secrets.token_urlsafe(_TOKEN_BYTES) + salt = bcrypt.gensalt(rounds=rounds) + hashed = bcrypt.hashpw(plain.encode("utf-8"), salt).decode("utf-8") + return SocTokenMaterial(plain=plain, hashed=hashed) + + +def verify_token(plain: str, hashed: str) -> bool: + """Constant-time bcrypt verification of a SOC token.""" + if not plain or not hashed: + return False + try: + return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8")) + except (ValueError, TypeError): + return False + + +def safe_compare(a: str, b: str) -> bool: + """Constant-time string compare for non-hashed contexts.""" + return hmac.compare_digest(a, b) diff --git a/backend/src/mimic/rbac/__init__.py b/backend/src/mimic/rbac/__init__.py new file mode 100644 index 0000000..183c051 --- /dev/null +++ b/backend/src/mimic/rbac/__init__.py @@ -0,0 +1,17 @@ +"""Group-based RBAC (spec F11 matrix).""" + +from mimic.rbac.decorators import require_perm +from mimic.rbac.matrix import ( + ALL_PERMISSIONS, + DEFAULT_GROUPS, + GROUP_PERMISSIONS, + Permission, +) + +__all__ = [ + "ALL_PERMISSIONS", + "DEFAULT_GROUPS", + "GROUP_PERMISSIONS", + "Permission", + "require_perm", +] diff --git a/backend/src/mimic/rbac/decorators.py b/backend/src/mimic/rbac/decorators.py new file mode 100644 index 0000000..ae342a2 --- /dev/null +++ b/backend/src/mimic/rbac/decorators.py @@ -0,0 +1,42 @@ +"""`@require_perm` Flask decorator (group-based RBAC).""" + +from __future__ import annotations + +from collections.abc import Callable +from functools import wraps +from typing import TYPE_CHECKING, ParamSpec, TypeVar + +from flask import abort +from flask_login import current_user + +from mimic.rbac.matrix import Permission + +if TYPE_CHECKING: + pass + +P = ParamSpec("P") +R = TypeVar("R") + + +def require_perm(perm: Permission) -> Callable[[Callable[P, R]], Callable[P, R]]: + """Reject the request with 401/403 unless the user holds `perm`.""" + + def _decorate(view: Callable[P, R]) -> Callable[P, R]: + @wraps(view) + def _wrapped(*args: P.args, **kwargs: P.kwargs) -> R: + user = current_user + if not getattr(user, "is_authenticated", False): + abort(401) + permissions: frozenset[Permission] = getattr(user, "permissions", frozenset()) + if perm not in permissions: + abort(403) + return view(*args, **kwargs) + + return _wrapped # type: ignore[return-value] + + return _decorate + + +def user_has(perm: Permission, permissions: frozenset[Permission]) -> bool: + """Pure helper, easier to unit-test than the decorator.""" + return perm in permissions diff --git a/backend/src/mimic/rbac/matrix.py b/backend/src/mimic/rbac/matrix.py new file mode 100644 index 0000000..66aadbb --- /dev/null +++ b/backend/src/mimic/rbac/matrix.py @@ -0,0 +1,95 @@ +"""F11 permission matrix as code. + +This mirrors the spec table 1:1 — keeping it as a single source in code lets +us hash-check it against the spec in CI (spec-analyst task S0.2). Three default +groups are seeded by Alembic and align with the three user types. +""" + +from __future__ import annotations + +import enum + + +class Permission(enum.StrEnum): + """Application permissions. Codes are kebab-case + dotted scope.""" + + # Engagement + ENGAGEMENT_CREATE = "engagement.create" + ENGAGEMENT_READ = "engagement.read" + ENGAGEMENT_READ_OWN = "engagement.read_own" # scoped to soc_session + ENGAGEMENT_UPDATE = "engagement.update" + ENGAGEMENT_DELETE = "engagement.delete" + ENGAGEMENT_MEMBER_MANAGE = "engagement.member.manage" + ENGAGEMENT_SOC_TOKEN_ISSUE = "engagement.soc_token.issue" # noqa: S105 + + # Hosts + HOST_CRUD = "host.crud" + + # TTPs + TTP_READ = "ttp.read" + TTP_DRAFT = "ttp.draft" + TTP_PROMOTE = "ttp.promote" + + # Imports + IMPORT_JOURNAL = "import.journal" + + # Scenarios + SCENARIO_CRUD = "scenario.crud" + + # Runs + RUN_START = "run.start" + RUN_CONTROL = "run.control" + + # Detection / evidence + EVIDENCE_ADD = "evidence.add" + DETECTION_ADD = "detection.add" + + # Cleanup + CLEANUP_TRIGGER = "cleanup.trigger" + + # Reports + REPORT_GENERATE = "report.generate" + REPORT_READ = "report.read" + + # Audit + AUDIT_READ = "audit.read" + + +ALL_PERMISSIONS: tuple[Permission, ...] = tuple(Permission) + + +class GroupName(enum.StrEnum): + RT_OPERATOR = "rt_operator" + RT_LEAD = "rt_lead" + SOC_ANALYST = "soc_analyst" + + +# Source-of-truth mapping derived from spec §F11. Verified by tests against +# the spec table. +GROUP_PERMISSIONS: dict[GroupName, frozenset[Permission]] = { + GroupName.RT_OPERATOR: frozenset( + { + Permission.ENGAGEMENT_CREATE, + Permission.ENGAGEMENT_READ, + Permission.HOST_CRUD, + Permission.TTP_READ, + Permission.TTP_DRAFT, + Permission.IMPORT_JOURNAL, + Permission.SCENARIO_CRUD, + Permission.EVIDENCE_ADD, + Permission.CLEANUP_TRIGGER, + Permission.REPORT_READ, + } + ), + GroupName.RT_LEAD: frozenset(ALL_PERMISSIONS), + GroupName.SOC_ANALYST: frozenset( + { + Permission.ENGAGEMENT_READ_OWN, + Permission.DETECTION_ADD, + Permission.REPORT_READ, + } + ), +} + + +DEFAULT_GROUPS: tuple[GroupName, ...] = tuple(GroupName)