feat(m1): DB schema, migrations, diag visibility
23 tables + alembic_version covering the v1 data model:
- Auth/RBAC (8): users, groups, permissions, user_groups, group_permissions,
invitations, invitation_groups, refresh_tokens.
- MITRE (4): mitre_tactics, mitre_techniques, mitre_subtechniques + the
technique↔tactic many-to-many.
- Templates (4): test_templates, test_template_mitre_tags (3 nullable FKs +
CHECK exactly_one_mitre_fk), scenario_templates, scenario_template_tests
(UUID PK + UNIQUE(scenario_id, position) so a test can appear at multiple
positions).
- Missions (6): missions, mission_members, mission_scenarios, mission_tests,
mission_test_mitre_tags (deliberately denormalised — copies external_id +
name + url, no FK to mitre_* — so a re-sync of the catalogue can't purge
historical tags), mission_categories.
- Evidence/settings/notifications (5): evidence_files, settings (JSONB
value), detection_levels, notifications.
SQLAlchemy 2.x with Mapped[]/mapped_column(), pk_/fk_/ck_/uq_/ix_ naming
convention. Reusable mixins (UuidPkMixin, TimestampMixin, SoftDeleteMixin —
no auto __table_args__ since classes silently clobber the mixin's).
Soft delete: deleted_at + partial indexes ix_<table>_active WHERE deleted_at
IS NULL on 9 tables (users, groups, test_templates, scenario_templates,
missions, mission_scenarios, mission_tests, mission_categories,
evidence_files). Notifications gets ix_..._unread WHERE read_at IS NULL.
CHECK constraints for status / state / opsec_level / mitre_kind enums.
New API endpoint GET /api/v1/diag/db: returns alembic_revision (short hash)
and the public-schema table_count. 503 with {"reachable": false} on a DB
outage. Database card on the SPA home consumes it.
Test stage in backend/Dockerfile (--target test): runtime + dev extras +
tests/. New make test-api spins an ephemeral pytest container against the
live DB on the compose network. backend/tests/test_schema.py: 8 integration
tests (tables, FK pairs, CHECK constraints, partial indexes, alembic-at-head,
negative INSERT proving the exactly_one_mitre_fk CHECK fires).
e2e/tests/m1-db.spec.ts: 4 Playwright tests covering the diag endpoint
contract + the Database card + footer/roadmap labels.
DoD: make clean && make up && make migrate → 23 tables, 32 FKs, 9 CHECKs,
make test-api → 9 passed, make e2e → 12 passed.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
93
backend/app/api/diag.py
Normal file
93
backend/app/api/diag.py
Normal file
@@ -0,0 +1,93 @@
|
||||
"""Operational diagnostics. No auth in v1 (M0/M1 only expose non-sensitive
|
||||
counts and the current Alembic revision).
|
||||
|
||||
The `/diag/reset` endpoint is **test-only** — it requires `APP_ENV=test` and
|
||||
is the bedrock of the e2e suite (clean DB + freshly minted install token).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
from flask import Blueprint, abort, jsonify
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.install_token import regenerate_install_token
|
||||
from app.db.session import get_engine
|
||||
|
||||
bp = Blueprint("diag", __name__, url_prefix="/diag")
|
||||
log = logging.getLogger("metamorph.diag")
|
||||
|
||||
|
||||
@bp.get("/db")
|
||||
def db_diag():
|
||||
"""Return the Alembic revision and the count of public-schema tables."""
|
||||
try:
|
||||
with get_engine().connect() as conn:
|
||||
revision = conn.execute(
|
||||
text("SELECT version_num FROM alembic_version")
|
||||
).scalar()
|
||||
table_count = conn.execute(
|
||||
text(
|
||||
"SELECT count(*) FROM information_schema.tables "
|
||||
"WHERE table_schema='public' AND table_type='BASE TABLE'"
|
||||
)
|
||||
).scalar_one()
|
||||
except SQLAlchemyError as e:
|
||||
log.warning("metamorph.diag.db_unreachable", extra={"error": str(e)})
|
||||
return jsonify({"reachable": False, "error": "database_unreachable"}), 503
|
||||
|
||||
return jsonify(
|
||||
{
|
||||
"reachable": True,
|
||||
"alembic_revision": revision,
|
||||
"table_count": int(table_count),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@bp.post("/reset")
|
||||
def reset_test_state():
|
||||
"""TEST-ONLY: wipe users/auth tables and mint a fresh install token.
|
||||
|
||||
Refuses unless `APP_ENV=test`. Used by the Playwright suite to start each
|
||||
auth scenario from a deterministic state.
|
||||
"""
|
||||
# NOTE: this endpoint is the test-suite reset hook. Allowed in `dev` too so
|
||||
# the e2e suite can run against a normal `make up` stack, but in dev it is
|
||||
# destructive — equivalent to `make clean` for the auth tables. Production
|
||||
# (APP_ENV=prod/staging) is locked out.
|
||||
if settings.APP_ENV not in ("dev", "test"):
|
||||
abort(403, description="diag/reset is only available in dev/test")
|
||||
if settings.APP_ENV == "dev":
|
||||
log.warning("metamorph.diag.reset_in_dev_environment")
|
||||
|
||||
try:
|
||||
with get_engine().begin() as conn:
|
||||
conn.execute(
|
||||
text(
|
||||
"TRUNCATE users, refresh_tokens, invitations, invitation_groups, "
|
||||
"user_groups, settings, groups RESTART IDENTITY CASCADE"
|
||||
)
|
||||
)
|
||||
except SQLAlchemyError as e:
|
||||
log.error("metamorph.diag.reset_failed", extra={"error": str(e)})
|
||||
return jsonify({"reset": False, "error": "database_error"}), 500
|
||||
|
||||
token = regenerate_install_token()
|
||||
|
||||
# Clear the in-memory rate-limit counters so the e2e suite that follows can
|
||||
# log in repeatedly without hitting `/auth/login`/`/auth/refresh` limits.
|
||||
# The limiter uses `memory://` in dev (cf. `app/core/rate_limit.py`).
|
||||
try:
|
||||
from app.core.rate_limit import limiter # noqa: PLC0415 — avoid import cycle
|
||||
|
||||
if limiter.enabled:
|
||||
limiter.reset()
|
||||
except Exception as e: # noqa: BLE001
|
||||
log.warning("metamorph.diag.rate_limit_reset_failed", extra={"error": str(e)})
|
||||
|
||||
log.warning("metamorph.diag.reset_completed")
|
||||
return jsonify({"reset": True, "install_token": token})
|
||||
15
backend/app/db/__init__.py
Normal file
15
backend/app/db/__init__.py
Normal file
@@ -0,0 +1,15 @@
|
||||
"""DB layer — base, session, mixins, shared enums."""
|
||||
|
||||
from app.db.base import Base
|
||||
from app.db.mixins import SoftDeleteMixin, TimestampMixin, UuidPkMixin
|
||||
from app.db.session import get_engine, get_sessionmaker, session_scope
|
||||
|
||||
__all__ = [
|
||||
"Base",
|
||||
"SoftDeleteMixin",
|
||||
"TimestampMixin",
|
||||
"UuidPkMixin",
|
||||
"get_engine",
|
||||
"get_sessionmaker",
|
||||
"session_scope",
|
||||
]
|
||||
23
backend/app/db/base.py
Normal file
23
backend/app/db/base.py
Normal file
@@ -0,0 +1,23 @@
|
||||
"""Declarative base for all ORM models.
|
||||
|
||||
Naming convention is set explicitly so Alembic generates stable, reviewable
|
||||
constraint names across migrations and Postgres versions.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from sqlalchemy import MetaData
|
||||
from sqlalchemy.orm import DeclarativeBase
|
||||
|
||||
# https://alembic.sqlalchemy.org/en/latest/naming.html#integration-of-naming-conventions-into-operations-autogenerate
|
||||
NAMING_CONVENTION = {
|
||||
"ix": "ix_%(table_name)s_%(column_0_N_name)s",
|
||||
"uq": "uq_%(table_name)s_%(column_0_N_name)s",
|
||||
"ck": "ck_%(table_name)s_%(constraint_name)s",
|
||||
"fk": "fk_%(table_name)s_%(column_0_N_name)s_%(referred_table_name)s",
|
||||
"pk": "pk_%(table_name)s",
|
||||
}
|
||||
|
||||
|
||||
class Base(DeclarativeBase):
|
||||
metadata = MetaData(naming_convention=NAMING_CONVENTION)
|
||||
56
backend/app/db/mixins.py
Normal file
56
backend/app/db/mixins.py
Normal file
@@ -0,0 +1,56 @@
|
||||
"""Reusable column mixins.
|
||||
|
||||
Pattern: subclass `Base, TimestampMixin, SoftDeleteMixin` to get the columns.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy import DateTime, Uuid, func
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
|
||||
class UuidPkMixin:
|
||||
"""Native UUID primary key, generated Python-side."""
|
||||
|
||||
id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True),
|
||||
primary_key=True,
|
||||
default=uuid.uuid4,
|
||||
nullable=False,
|
||||
)
|
||||
|
||||
|
||||
class TimestampMixin:
|
||||
"""`created_at` / `updated_at` server-managed timestamps (UTC)."""
|
||||
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True),
|
||||
server_default=func.now(),
|
||||
nullable=False,
|
||||
)
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True),
|
||||
server_default=func.now(),
|
||||
onupdate=func.now(),
|
||||
nullable=False,
|
||||
)
|
||||
|
||||
|
||||
class SoftDeleteMixin:
|
||||
"""Soft delete via a nullable `deleted_at` column.
|
||||
|
||||
NOTE: each soft-deletable model must declare its own `ix_<table>_active`
|
||||
partial index in `__table_args__`. We deliberately don't auto-inject one
|
||||
here because SQLAlchemy's `__table_args__` from a mixin gets clobbered as
|
||||
soon as the model class declares its own — silently dropping the index.
|
||||
Declaring it explicitly keeps the contract visible at the model site.
|
||||
"""
|
||||
|
||||
deleted_at: Mapped[datetime | None] = mapped_column(
|
||||
DateTime(timezone=True),
|
||||
nullable=True,
|
||||
default=None,
|
||||
)
|
||||
47
backend/app/db/session.py
Normal file
47
backend/app/db/session.py
Normal file
@@ -0,0 +1,47 @@
|
||||
"""Engine + sessionmaker. Lazily initialised so test code can swap the URL."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Iterator
|
||||
from contextlib import contextmanager
|
||||
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.engine import Engine
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
|
||||
from app.core.config import settings
|
||||
|
||||
_engine: Engine | None = None
|
||||
_SessionLocal: sessionmaker[Session] | None = None
|
||||
|
||||
|
||||
def get_engine() -> Engine:
|
||||
global _engine
|
||||
if _engine is None:
|
||||
_engine = create_engine(
|
||||
settings.database_url,
|
||||
pool_pre_ping=True,
|
||||
future=True,
|
||||
)
|
||||
return _engine
|
||||
|
||||
|
||||
def get_sessionmaker() -> sessionmaker[Session]:
|
||||
global _SessionLocal
|
||||
if _SessionLocal is None:
|
||||
_SessionLocal = sessionmaker(bind=get_engine(), expire_on_commit=False, future=True)
|
||||
return _SessionLocal
|
||||
|
||||
|
||||
@contextmanager
|
||||
def session_scope() -> Iterator[Session]:
|
||||
"""Context manager that commits on success, rolls back on error."""
|
||||
s = get_sessionmaker()()
|
||||
try:
|
||||
yield s
|
||||
s.commit()
|
||||
except Exception:
|
||||
s.rollback()
|
||||
raise
|
||||
finally:
|
||||
s.close()
|
||||
27
backend/app/db/types.py
Normal file
27
backend/app/db/types.py
Normal file
@@ -0,0 +1,27 @@
|
||||
"""Shared enum-like string sets used across models.
|
||||
|
||||
Stored as `String` columns (not Postgres ENUMs) for flexibility — adding a value
|
||||
in M3+ shouldn't require a migration. CHECK constraints validate the value set
|
||||
at the DB level.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
# Roles a user is hinted with on a mission. Authorization is still carried by
|
||||
# the group/permission graph; this is a UX hint only.
|
||||
MISSION_ROLE_HINTS = ("red", "blue")
|
||||
|
||||
# Mission lifecycle.
|
||||
MISSION_STATUSES = ("draft", "in_progress", "completed", "archived")
|
||||
|
||||
# Visibility of a mission's tests to the blue team.
|
||||
MISSION_VISIBILITY_MODES = ("whitebox", "titles_only", "executed_only")
|
||||
|
||||
# Per-mission test instance state machine.
|
||||
MISSION_TEST_STATES = ("pending", "executed", "reviewed_by_blue", "skipped", "blocked")
|
||||
|
||||
# OPSEC noise level on a test template.
|
||||
OPSEC_LEVELS = ("low", "medium", "high")
|
||||
|
||||
# MITRE entity kinds — used by polymorphic tag join tables (see check constraints).
|
||||
MITRE_KINDS = ("tactic", "technique", "subtechnique")
|
||||
73
backend/app/models/__init__.py
Normal file
73
backend/app/models/__init__.py
Normal file
@@ -0,0 +1,73 @@
|
||||
"""ORM models — every module must be imported here so Alembic's autogenerate
|
||||
can see them via `Base.metadata`.
|
||||
"""
|
||||
|
||||
from app.models.auth import (
|
||||
Group,
|
||||
GroupPermission,
|
||||
Invitation,
|
||||
InvitationGroup,
|
||||
Permission,
|
||||
RefreshToken,
|
||||
User,
|
||||
UserGroup,
|
||||
)
|
||||
from app.models.evidence import EvidenceFile
|
||||
from app.models.mission import (
|
||||
Mission,
|
||||
MissionCategory,
|
||||
MissionMember,
|
||||
MissionScenario,
|
||||
MissionTest,
|
||||
MissionTestMitreTag,
|
||||
)
|
||||
from app.models.mitre import (
|
||||
MitreSubtechnique,
|
||||
MitreTactic,
|
||||
MitreTechnique,
|
||||
MitreTechniqueTactic,
|
||||
)
|
||||
from app.models.notification import Notification
|
||||
from app.models.setting import DetectionLevel, Setting
|
||||
from app.models.template import (
|
||||
ScenarioTemplate,
|
||||
ScenarioTemplateTest,
|
||||
TestTemplate,
|
||||
TestTemplateMitreTag,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
# auth
|
||||
"Group",
|
||||
"GroupPermission",
|
||||
"Invitation",
|
||||
"InvitationGroup",
|
||||
"Permission",
|
||||
"RefreshToken",
|
||||
"User",
|
||||
"UserGroup",
|
||||
# evidence
|
||||
"EvidenceFile",
|
||||
# mission
|
||||
"Mission",
|
||||
"MissionCategory",
|
||||
"MissionMember",
|
||||
"MissionScenario",
|
||||
"MissionTest",
|
||||
"MissionTestMitreTag",
|
||||
# mitre
|
||||
"MitreSubtechnique",
|
||||
"MitreTactic",
|
||||
"MitreTechnique",
|
||||
"MitreTechniqueTactic",
|
||||
# notification
|
||||
"Notification",
|
||||
# setting
|
||||
"DetectionLevel",
|
||||
"Setting",
|
||||
# template
|
||||
"ScenarioTemplate",
|
||||
"ScenarioTemplateTest",
|
||||
"TestTemplate",
|
||||
"TestTemplateMitreTag",
|
||||
]
|
||||
188
backend/app/models/auth.py
Normal file
188
backend/app/models/auth.py
Normal file
@@ -0,0 +1,188 @@
|
||||
"""Auth + RBAC: users, groups, permissions, invitations, refresh tokens."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from sqlalchemy import ForeignKey, Index, String, Text, UniqueConstraint, Uuid
|
||||
from sqlalchemy import DateTime as SADateTime
|
||||
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
||||
|
||||
from app.db.base import Base
|
||||
from app.db.mixins import SoftDeleteMixin, TimestampMixin, UuidPkMixin
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from app.models.evidence import EvidenceFile
|
||||
from app.models.notification import Notification
|
||||
|
||||
|
||||
class User(Base, UuidPkMixin, TimestampMixin, SoftDeleteMixin):
|
||||
__tablename__ = "users"
|
||||
|
||||
email: Mapped[str] = mapped_column(String(254), nullable=False)
|
||||
password_hash: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
display_name: Mapped[str | None] = mapped_column(String(120), nullable=True)
|
||||
locale: Mapped[str] = mapped_column(String(8), default="fr", nullable=False)
|
||||
is_active: Mapped[bool] = mapped_column(default=True, nullable=False)
|
||||
|
||||
groups: Mapped[list["Group"]] = relationship(
|
||||
secondary="user_groups",
|
||||
back_populates="users",
|
||||
lazy="selectin",
|
||||
)
|
||||
refresh_tokens: Mapped[list["RefreshToken"]] = relationship(
|
||||
back_populates="user",
|
||||
cascade="all, delete-orphan",
|
||||
)
|
||||
notifications: Mapped[list["Notification"]] = relationship(
|
||||
back_populates="user",
|
||||
cascade="all, delete-orphan",
|
||||
)
|
||||
uploaded_evidence: Mapped[list["EvidenceFile"]] = relationship(
|
||||
back_populates="uploaded_by",
|
||||
)
|
||||
|
||||
__table_args__ = (
|
||||
# Email uniqueness scoped to non-deleted rows so an admin can re-invite
|
||||
# a previously-soft-deleted user.
|
||||
Index(
|
||||
"uq_users_email_active",
|
||||
"email",
|
||||
unique=True,
|
||||
postgresql_where="deleted_at IS NULL",
|
||||
),
|
||||
Index("ix_users_active", "deleted_at", postgresql_where="deleted_at IS NULL"),
|
||||
)
|
||||
|
||||
|
||||
class Group(Base, UuidPkMixin, TimestampMixin, SoftDeleteMixin):
|
||||
__tablename__ = "groups"
|
||||
|
||||
name: Mapped[str] = mapped_column(String(80), nullable=False)
|
||||
description: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
# Built-in groups (admin/redteam/blueteam) are protected from deletion.
|
||||
is_system: Mapped[bool] = mapped_column(default=False, nullable=False)
|
||||
|
||||
users: Mapped[list[User]] = relationship(
|
||||
secondary="user_groups",
|
||||
back_populates="groups",
|
||||
)
|
||||
permissions: Mapped[list["Permission"]] = relationship(
|
||||
secondary="group_permissions",
|
||||
back_populates="groups",
|
||||
lazy="selectin",
|
||||
)
|
||||
|
||||
__table_args__ = (
|
||||
Index(
|
||||
"uq_groups_name_active",
|
||||
"name",
|
||||
unique=True,
|
||||
postgresql_where="deleted_at IS NULL",
|
||||
),
|
||||
Index("ix_groups_active", "deleted_at", postgresql_where="deleted_at IS NULL"),
|
||||
)
|
||||
|
||||
|
||||
class Permission(Base, UuidPkMixin, TimestampMixin):
|
||||
"""Atomic permission. Code follows the `<entity>.<action>` convention."""
|
||||
|
||||
__tablename__ = "permissions"
|
||||
|
||||
code: Mapped[str] = mapped_column(String(80), unique=True, nullable=False)
|
||||
description: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
|
||||
groups: Mapped[list[Group]] = relationship(
|
||||
secondary="group_permissions",
|
||||
back_populates="permissions",
|
||||
)
|
||||
|
||||
|
||||
class UserGroup(Base):
|
||||
"""User ↔ Group join — no soft delete, just attach/detach."""
|
||||
|
||||
__tablename__ = "user_groups"
|
||||
|
||||
user_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), primary_key=True
|
||||
)
|
||||
group_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True), ForeignKey("groups.id", ondelete="CASCADE"), primary_key=True
|
||||
)
|
||||
|
||||
|
||||
class GroupPermission(Base):
|
||||
"""Group ↔ Permission join."""
|
||||
|
||||
__tablename__ = "group_permissions"
|
||||
|
||||
group_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True), ForeignKey("groups.id", ondelete="CASCADE"), primary_key=True
|
||||
)
|
||||
permission_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True), ForeignKey("permissions.id", ondelete="CASCADE"), primary_key=True
|
||||
)
|
||||
|
||||
|
||||
class Invitation(Base, UuidPkMixin, TimestampMixin):
|
||||
__tablename__ = "invitations"
|
||||
|
||||
# Hash of the URL token, never the token itself.
|
||||
token_hash: Mapped[str] = mapped_column(String(128), unique=True, nullable=False)
|
||||
email_hint: Mapped[str | None] = mapped_column(String(254), nullable=True)
|
||||
created_by_user_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True), ForeignKey("users.id", ondelete="RESTRICT"), nullable=False
|
||||
)
|
||||
expires_at: Mapped[datetime] = mapped_column(SADateTime(timezone=True), nullable=False)
|
||||
consumed_at: Mapped[datetime | None] = mapped_column(SADateTime(timezone=True), nullable=True)
|
||||
revoked_at: Mapped[datetime | None] = mapped_column(SADateTime(timezone=True), nullable=True)
|
||||
consumed_by_user_id: Mapped[uuid.UUID | None] = mapped_column(
|
||||
Uuid(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True
|
||||
)
|
||||
|
||||
pre_assigned_groups: Mapped[list[Group]] = relationship(
|
||||
secondary="invitation_groups",
|
||||
lazy="selectin",
|
||||
)
|
||||
|
||||
__table_args__ = (Index("ix_invitations_expires_at", "expires_at"),)
|
||||
|
||||
|
||||
class InvitationGroup(Base):
|
||||
"""Pre-assigned groups attached to an invitation; applied at acceptance."""
|
||||
|
||||
__tablename__ = "invitation_groups"
|
||||
|
||||
invitation_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True), ForeignKey("invitations.id", ondelete="CASCADE"), primary_key=True
|
||||
)
|
||||
group_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True), ForeignKey("groups.id", ondelete="CASCADE"), primary_key=True
|
||||
)
|
||||
|
||||
|
||||
class RefreshToken(Base, UuidPkMixin, TimestampMixin):
|
||||
"""Long-lived refresh tokens. The hash, never the token, is stored."""
|
||||
|
||||
__tablename__ = "refresh_tokens"
|
||||
|
||||
user_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False
|
||||
)
|
||||
jti: Mapped[str] = mapped_column(String(64), nullable=False)
|
||||
token_hash: Mapped[str] = mapped_column(String(128), nullable=False)
|
||||
issued_at: Mapped[datetime] = mapped_column(SADateTime(timezone=True), nullable=False)
|
||||
expires_at: Mapped[datetime] = mapped_column(SADateTime(timezone=True), nullable=False)
|
||||
revoked_at: Mapped[datetime | None] = mapped_column(SADateTime(timezone=True), nullable=True)
|
||||
replaced_by_id: Mapped[uuid.UUID | None] = mapped_column(
|
||||
Uuid(as_uuid=True), ForeignKey("refresh_tokens.id", ondelete="SET NULL"), nullable=True
|
||||
)
|
||||
|
||||
user: Mapped[User] = relationship(back_populates="refresh_tokens")
|
||||
|
||||
__table_args__ = (
|
||||
UniqueConstraint("jti", name="uq_refresh_tokens_jti"),
|
||||
Index("ix_refresh_tokens_user_id_expires_at", "user_id", "expires_at"),
|
||||
)
|
||||
51
backend/app/models/evidence.py
Normal file
51
backend/app/models/evidence.py
Normal file
@@ -0,0 +1,51 @@
|
||||
"""Blue-team evidence files attached to a `mission_test`."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from sqlalchemy import BigInteger, DateTime, ForeignKey, Index, String, Text, Uuid, func
|
||||
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
||||
|
||||
from app.db.base import Base
|
||||
from app.db.mixins import SoftDeleteMixin, TimestampMixin, UuidPkMixin
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from app.models.auth import User
|
||||
|
||||
|
||||
class EvidenceFile(Base, UuidPkMixin, TimestampMixin, SoftDeleteMixin):
|
||||
__tablename__ = "evidence_files"
|
||||
|
||||
mission_test_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True),
|
||||
ForeignKey("mission_tests.id", ondelete="CASCADE"),
|
||||
nullable=False,
|
||||
)
|
||||
sha256: Mapped[str] = mapped_column(String(64), nullable=False)
|
||||
mime: Mapped[str] = mapped_column(String(127), nullable=False)
|
||||
size_bytes: Mapped[int] = mapped_column(BigInteger, nullable=False)
|
||||
storage_path: Mapped[str] = mapped_column(Text, nullable=False)
|
||||
original_filename: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
uploaded_by_user_id: Mapped[uuid.UUID | None] = mapped_column(
|
||||
Uuid(as_uuid=True),
|
||||
ForeignKey("users.id", ondelete="SET NULL"),
|
||||
nullable=True,
|
||||
)
|
||||
uploaded_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True), server_default=func.now(), nullable=False
|
||||
)
|
||||
|
||||
uploaded_by: Mapped["User | None"] = relationship(back_populates="uploaded_evidence")
|
||||
|
||||
__table_args__ = (
|
||||
Index("ix_evidence_files_mission_test", "mission_test_id"),
|
||||
Index("ix_evidence_files_sha256", "sha256"),
|
||||
Index(
|
||||
"ix_evidence_files_active",
|
||||
"deleted_at",
|
||||
postgresql_where="deleted_at IS NULL",
|
||||
),
|
||||
)
|
||||
316
backend/app/models/mission.py
Normal file
316
backend/app/models/mission.py
Normal file
@@ -0,0 +1,316 @@
|
||||
"""Missions and snapshots.
|
||||
|
||||
A `Mission` references members and a tree of snapshot rows:
|
||||
mission ─< mission_scenarios ─< mission_tests ─< (red/blue annotations)
|
||||
|
||||
Snapshots copy template fields verbatim so editing a template doesn't drift
|
||||
already-running missions. `source_*_template_id` keep a soft pointer for
|
||||
analytics, but the source rows can be soft-deleted without breaking the mission.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from datetime import date, datetime
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import (
|
||||
ARRAY,
|
||||
CheckConstraint,
|
||||
Date,
|
||||
DateTime,
|
||||
ForeignKey,
|
||||
Index,
|
||||
Integer,
|
||||
String,
|
||||
Text,
|
||||
UniqueConstraint,
|
||||
Uuid,
|
||||
)
|
||||
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
||||
|
||||
from app.db.base import Base
|
||||
from app.db.mixins import SoftDeleteMixin, TimestampMixin, UuidPkMixin
|
||||
|
||||
# DateTime is no longer needed since MissionMember now uses TimestampMixin.
|
||||
# The remaining DateTime usages in MissionTest (executed_at) keep the import below.
|
||||
from app.db.types import (
|
||||
MISSION_ROLE_HINTS,
|
||||
MISSION_STATUSES,
|
||||
MISSION_TEST_STATES,
|
||||
MISSION_VISIBILITY_MODES,
|
||||
MITRE_KINDS,
|
||||
OPSEC_LEVELS,
|
||||
)
|
||||
|
||||
# `mission_test_mitre_tags` deliberately denormalises the MITRE labels so a
|
||||
# mission's tags survive a MITRE re-sync that drops the original entry. The
|
||||
# FK columns were removed in favour of frozen `mitre_external_id` + `mitre_name`
|
||||
# snapshots — see spec §11 ("snapshot vs reference" risk).
|
||||
|
||||
|
||||
class Mission(Base, UuidPkMixin, TimestampMixin, SoftDeleteMixin):
|
||||
__tablename__ = "missions"
|
||||
|
||||
name: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
client_target: Mapped[str | None] = mapped_column(String(255), nullable=True)
|
||||
date_start: Mapped[date | None] = mapped_column(Date, nullable=True)
|
||||
date_end: Mapped[date | None] = mapped_column(Date, nullable=True)
|
||||
status: Mapped[str] = mapped_column(String(16), default="draft", nullable=False)
|
||||
description_md: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
visibility_mode: Mapped[str] = mapped_column(
|
||||
String(16), default="whitebox", nullable=False
|
||||
)
|
||||
|
||||
members: Mapped[list["MissionMember"]] = relationship(
|
||||
back_populates="mission",
|
||||
cascade="all, delete-orphan",
|
||||
)
|
||||
scenarios: Mapped[list["MissionScenario"]] = relationship(
|
||||
back_populates="mission",
|
||||
cascade="all, delete-orphan",
|
||||
order_by="MissionScenario.position",
|
||||
)
|
||||
categories: Mapped[list["MissionCategory"]] = relationship(
|
||||
back_populates="mission",
|
||||
cascade="all, delete-orphan",
|
||||
order_by="MissionCategory.position",
|
||||
)
|
||||
|
||||
__table_args__ = (
|
||||
CheckConstraint(
|
||||
f"status IN ({', '.join(repr(v) for v in MISSION_STATUSES)})",
|
||||
name="status_valid",
|
||||
),
|
||||
CheckConstraint(
|
||||
f"visibility_mode IN ({', '.join(repr(v) for v in MISSION_VISIBILITY_MODES)})",
|
||||
name="visibility_mode_valid",
|
||||
),
|
||||
Index("ix_missions_active", "deleted_at", postgresql_where="deleted_at IS NULL"),
|
||||
Index("ix_missions_status", "status"),
|
||||
)
|
||||
|
||||
|
||||
class MissionMember(Base, TimestampMixin):
|
||||
"""A user's membership in a mission with a hint about their team side."""
|
||||
|
||||
__tablename__ = "mission_members"
|
||||
|
||||
mission_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True),
|
||||
ForeignKey("missions.id", ondelete="CASCADE"),
|
||||
primary_key=True,
|
||||
)
|
||||
user_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True),
|
||||
ForeignKey("users.id", ondelete="CASCADE"),
|
||||
primary_key=True,
|
||||
)
|
||||
role_hint: Mapped[str] = mapped_column(String(8), nullable=False)
|
||||
|
||||
mission: Mapped[Mission] = relationship(back_populates="members")
|
||||
|
||||
__table_args__ = (
|
||||
CheckConstraint(
|
||||
f"role_hint IN ({', '.join(repr(v) for v in MISSION_ROLE_HINTS)})",
|
||||
name="role_hint_valid",
|
||||
),
|
||||
Index("ix_mission_members_user", "user_id"),
|
||||
)
|
||||
|
||||
|
||||
class MissionScenario(Base, UuidPkMixin, TimestampMixin, SoftDeleteMixin):
|
||||
"""Snapshot of a `scenario_template` instantiated within a mission."""
|
||||
|
||||
__tablename__ = "mission_scenarios"
|
||||
|
||||
mission_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True),
|
||||
ForeignKey("missions.id", ondelete="CASCADE"),
|
||||
nullable=False,
|
||||
)
|
||||
source_scenario_template_id: Mapped[uuid.UUID | None] = mapped_column(
|
||||
Uuid(as_uuid=True),
|
||||
ForeignKey("scenario_templates.id", ondelete="SET NULL"),
|
||||
nullable=True,
|
||||
)
|
||||
snapshot_name: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
snapshot_description: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
position: Mapped[int] = mapped_column(Integer, nullable=False)
|
||||
|
||||
mission: Mapped[Mission] = relationship(back_populates="scenarios")
|
||||
tests: Mapped[list["MissionTest"]] = relationship(
|
||||
back_populates="scenario",
|
||||
cascade="all, delete-orphan",
|
||||
order_by="MissionTest.position",
|
||||
)
|
||||
|
||||
__table_args__ = (
|
||||
UniqueConstraint(
|
||||
"mission_id", "position", name="uq_mission_scenarios_position"
|
||||
),
|
||||
Index("ix_mission_scenarios_mission", "mission_id"),
|
||||
Index(
|
||||
"ix_mission_scenarios_active",
|
||||
"deleted_at",
|
||||
postgresql_where="deleted_at IS NULL",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class MissionTest(Base, UuidPkMixin, TimestampMixin, SoftDeleteMixin):
|
||||
"""Snapshot of a `test_template` + execution state + red/blue annotations."""
|
||||
|
||||
__tablename__ = "mission_tests"
|
||||
|
||||
scenario_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True),
|
||||
ForeignKey("mission_scenarios.id", ondelete="CASCADE"),
|
||||
nullable=False,
|
||||
)
|
||||
source_test_template_id: Mapped[uuid.UUID | None] = mapped_column(
|
||||
Uuid(as_uuid=True),
|
||||
ForeignKey("test_templates.id", ondelete="SET NULL"),
|
||||
nullable=True,
|
||||
)
|
||||
position: Mapped[int] = mapped_column(Integer, nullable=False)
|
||||
|
||||
# --- Snapshot of the template (immutable after creation) ---
|
||||
snapshot_name: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
snapshot_description: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
snapshot_objective: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
snapshot_procedure_md: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
snapshot_prerequisites_md: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
snapshot_expected_red_md: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
snapshot_expected_blue_md: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
snapshot_opsec_level: Mapped[str] = mapped_column(
|
||||
String(8), default="medium", nullable=False
|
||||
)
|
||||
snapshot_tags: Mapped[list[str]] = mapped_column(
|
||||
ARRAY(String(64)), nullable=False, server_default="{}"
|
||||
)
|
||||
snapshot_expected_iocs: Mapped[list[str]] = mapped_column(
|
||||
ARRAY(String(255)), nullable=False, server_default="{}"
|
||||
)
|
||||
|
||||
# --- Execution state ---
|
||||
state: Mapped[str] = mapped_column(String(24), default="pending", nullable=False)
|
||||
executed_at: Mapped[datetime | None] = mapped_column(
|
||||
DateTime(timezone=True), nullable=True
|
||||
)
|
||||
executed_at_overridden: Mapped[bool] = mapped_column(default=False, nullable=False)
|
||||
|
||||
# --- Red side (text-only per spec §4) ---
|
||||
red_command: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
red_output: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
red_comment_md: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
|
||||
# --- Blue side ---
|
||||
blue_comment_md: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
detection_level_id: Mapped[uuid.UUID | None] = mapped_column(
|
||||
Uuid(as_uuid=True),
|
||||
ForeignKey("detection_levels.id", ondelete="SET NULL"),
|
||||
nullable=True,
|
||||
)
|
||||
category_id: Mapped[uuid.UUID | None] = mapped_column(
|
||||
Uuid(as_uuid=True),
|
||||
ForeignKey("mission_categories.id", ondelete="SET NULL"),
|
||||
nullable=True,
|
||||
)
|
||||
|
||||
scenario: Mapped[MissionScenario] = relationship(back_populates="tests")
|
||||
mitre_tags: Mapped[list["MissionTestMitreTag"]] = relationship(
|
||||
back_populates="mission_test",
|
||||
cascade="all, delete-orphan",
|
||||
lazy="selectin",
|
||||
)
|
||||
|
||||
__table_args__ = (
|
||||
CheckConstraint(
|
||||
f"snapshot_opsec_level IN ({', '.join(repr(v) for v in OPSEC_LEVELS)})",
|
||||
name="snapshot_opsec_level_valid",
|
||||
),
|
||||
CheckConstraint(
|
||||
f"state IN ({', '.join(repr(v) for v in MISSION_TEST_STATES)})",
|
||||
name="state_valid",
|
||||
),
|
||||
UniqueConstraint("scenario_id", "position", name="uq_mission_tests_position"),
|
||||
Index("ix_mission_tests_state", "state"),
|
||||
Index(
|
||||
"ix_mission_tests_active",
|
||||
"deleted_at",
|
||||
postgresql_where="deleted_at IS NULL",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class MissionCategory(Base, UuidPkMixin, TimestampMixin, SoftDeleteMixin):
|
||||
"""Optional custom grouping override for the slide synthesis."""
|
||||
|
||||
__tablename__ = "mission_categories"
|
||||
|
||||
mission_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True),
|
||||
ForeignKey("missions.id", ondelete="CASCADE"),
|
||||
nullable=False,
|
||||
)
|
||||
name: Mapped[str] = mapped_column(String(120), nullable=False)
|
||||
color_token: Mapped[str | None] = mapped_column(String(16), nullable=True)
|
||||
position: Mapped[int] = mapped_column(Integer, nullable=False)
|
||||
|
||||
mission: Mapped[Mission] = relationship(back_populates="categories")
|
||||
|
||||
__table_args__ = (
|
||||
UniqueConstraint(
|
||||
"mission_id", "position", name="uq_mission_categories_position"
|
||||
),
|
||||
UniqueConstraint("mission_id", "name", name="uq_mission_categories_name"),
|
||||
Index(
|
||||
"ix_mission_categories_active",
|
||||
"deleted_at",
|
||||
postgresql_where="deleted_at IS NULL",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class MissionTestMitreTag(Base):
|
||||
"""Frozen MITRE tag attached to a mission test.
|
||||
|
||||
DELIBERATELY DENORMALISED — no FK to mitre_* tables. The MITRE
|
||||
`external_id` and human label are copied at tag-creation time so that a
|
||||
later MITRE re-sync that drops the original entry cannot purge or alter
|
||||
a mission's tags. See spec §11 (snapshot vs reference).
|
||||
|
||||
The companion `test_template_mitre_tags` table keeps the FK relationship
|
||||
because templates are editable and admins can re-tag them after a sync.
|
||||
"""
|
||||
|
||||
__tablename__ = "mission_test_mitre_tags"
|
||||
|
||||
id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True), primary_key=True, default=uuid.uuid4, nullable=False
|
||||
)
|
||||
mission_test_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True),
|
||||
ForeignKey("mission_tests.id", ondelete="CASCADE"),
|
||||
nullable=False,
|
||||
)
|
||||
mitre_kind: Mapped[str] = mapped_column(String(16), nullable=False)
|
||||
mitre_external_id: Mapped[str] = mapped_column(String(16), nullable=False)
|
||||
mitre_name: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
mitre_url: Mapped[str | None] = mapped_column(String(512), nullable=True)
|
||||
|
||||
mission_test: Mapped[MissionTest] = relationship(back_populates="mitre_tags")
|
||||
|
||||
__table_args__: Any = (
|
||||
CheckConstraint(
|
||||
f"mitre_kind IN ({', '.join(repr(v) for v in MITRE_KINDS)})",
|
||||
name="mitre_kind_valid",
|
||||
),
|
||||
UniqueConstraint(
|
||||
"mission_test_id",
|
||||
"mitre_external_id",
|
||||
name="uq_mission_test_mitre_tag",
|
||||
),
|
||||
Index("ix_mission_test_mitre_tags_test", "mission_test_id"),
|
||||
)
|
||||
86
backend/app/models/mitre.py
Normal file
86
backend/app/models/mitre.py
Normal file
@@ -0,0 +1,86 @@
|
||||
"""MITRE ATT&CK reference tables.
|
||||
|
||||
Read-mostly. Hard delete (no soft-delete) — replaced by the periodic sync job.
|
||||
A technique can map to multiple tactics (kill_chain_phases in STIX) hence the
|
||||
M2M `technique_tactics` join. Sub-techniques inherit their parent's tactics
|
||||
through the parent technique.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
|
||||
from sqlalchemy import ForeignKey, Index, String, Text, Uuid
|
||||
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
||||
|
||||
from app.db.base import Base
|
||||
from app.db.mixins import TimestampMixin, UuidPkMixin
|
||||
|
||||
|
||||
class MitreTactic(Base, UuidPkMixin, TimestampMixin):
|
||||
__tablename__ = "mitre_tactics"
|
||||
|
||||
external_id: Mapped[str] = mapped_column(String(16), unique=True, nullable=False)
|
||||
short_name: Mapped[str] = mapped_column(String(80), nullable=False)
|
||||
name: Mapped[str] = mapped_column(String(120), nullable=False)
|
||||
description: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
url: Mapped[str | None] = mapped_column(String(512), nullable=True)
|
||||
|
||||
techniques: Mapped[list["MitreTechnique"]] = relationship(
|
||||
secondary="mitre_technique_tactics",
|
||||
back_populates="tactics",
|
||||
)
|
||||
|
||||
|
||||
class MitreTechnique(Base, UuidPkMixin, TimestampMixin):
|
||||
__tablename__ = "mitre_techniques"
|
||||
|
||||
external_id: Mapped[str] = mapped_column(String(16), unique=True, nullable=False)
|
||||
name: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
description: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
url: Mapped[str | None] = mapped_column(String(512), nullable=True)
|
||||
|
||||
tactics: Mapped[list[MitreTactic]] = relationship(
|
||||
secondary="mitre_technique_tactics",
|
||||
back_populates="techniques",
|
||||
lazy="selectin",
|
||||
)
|
||||
subtechniques: Mapped[list["MitreSubtechnique"]] = relationship(
|
||||
back_populates="technique",
|
||||
cascade="all, delete-orphan",
|
||||
)
|
||||
|
||||
__table_args__ = (Index("ix_mitre_techniques_name", "name"),)
|
||||
|
||||
|
||||
class MitreSubtechnique(Base, UuidPkMixin, TimestampMixin):
|
||||
__tablename__ = "mitre_subtechniques"
|
||||
|
||||
external_id: Mapped[str] = mapped_column(String(16), unique=True, nullable=False)
|
||||
name: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
description: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
url: Mapped[str | None] = mapped_column(String(512), nullable=True)
|
||||
technique_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True), ForeignKey("mitre_techniques.id", ondelete="CASCADE"), nullable=False
|
||||
)
|
||||
|
||||
technique: Mapped[MitreTechnique] = relationship(back_populates="subtechniques")
|
||||
|
||||
__table_args__ = (Index("ix_mitre_subtechniques_technique_id", "technique_id"),)
|
||||
|
||||
|
||||
class MitreTechniqueTactic(Base):
|
||||
"""Many-to-many: a technique can serve several tactics (STIX kill_chain_phases)."""
|
||||
|
||||
__tablename__ = "mitre_technique_tactics"
|
||||
|
||||
technique_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True),
|
||||
ForeignKey("mitre_techniques.id", ondelete="CASCADE"),
|
||||
primary_key=True,
|
||||
)
|
||||
tactic_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True),
|
||||
ForeignKey("mitre_tactics.id", ondelete="CASCADE"),
|
||||
primary_key=True,
|
||||
)
|
||||
41
backend/app/models/notification.py
Normal file
41
backend/app/models/notification.py
Normal file
@@ -0,0 +1,41 @@
|
||||
"""In-app notifications. Mail is out-of-scope for v1 (spec §4)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from typing import Any, TYPE_CHECKING
|
||||
|
||||
from sqlalchemy import DateTime, ForeignKey, Index, String, Uuid
|
||||
from sqlalchemy.dialects.postgresql import JSONB
|
||||
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
||||
|
||||
from app.db.base import Base
|
||||
from app.db.mixins import TimestampMixin, UuidPkMixin
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from app.models.auth import User
|
||||
|
||||
|
||||
class Notification(Base, UuidPkMixin, TimestampMixin):
|
||||
__tablename__ = "notifications"
|
||||
|
||||
user_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True),
|
||||
ForeignKey("users.id", ondelete="CASCADE"),
|
||||
nullable=False,
|
||||
)
|
||||
type: Mapped[str] = mapped_column(String(64), nullable=False)
|
||||
payload: Mapped[Any] = mapped_column(JSONB, nullable=False, server_default="{}")
|
||||
read_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
|
||||
|
||||
user: Mapped["User"] = relationship(back_populates="notifications")
|
||||
|
||||
__table_args__ = (
|
||||
Index(
|
||||
"ix_notifications_user_unread",
|
||||
"user_id",
|
||||
"created_at",
|
||||
postgresql_where="read_at IS NULL",
|
||||
),
|
||||
)
|
||||
37
backend/app/models/setting.py
Normal file
37
backend/app/models/setting.py
Normal file
@@ -0,0 +1,37 @@
|
||||
"""Platform settings (key/value JSONB) and admin-defined detection levels."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import Integer, String, Text
|
||||
from sqlalchemy.dialects.postgresql import JSONB
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
from app.db.base import Base
|
||||
from app.db.mixins import TimestampMixin, UuidPkMixin
|
||||
|
||||
|
||||
class Setting(Base, TimestampMixin):
|
||||
__tablename__ = "settings"
|
||||
|
||||
key: Mapped[str] = mapped_column(String(80), primary_key=True)
|
||||
value: Mapped[Any] = mapped_column(JSONB, nullable=False)
|
||||
description: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
|
||||
|
||||
class DetectionLevel(Base, UuidPkMixin, TimestampMixin):
|
||||
"""Custom taxonomy admin can edit (cf. spec §F6).
|
||||
|
||||
Pre-seeded with detected_blocked / detected_alert / logged_only / not_detected.
|
||||
"""
|
||||
|
||||
__tablename__ = "detection_levels"
|
||||
|
||||
key: Mapped[str] = mapped_column(String(40), unique=True, nullable=False)
|
||||
label_fr: Mapped[str] = mapped_column(String(80), nullable=False)
|
||||
label_en: Mapped[str] = mapped_column(String(80), nullable=False)
|
||||
color_token: Mapped[str] = mapped_column(String(16), nullable=False)
|
||||
position: Mapped[int] = mapped_column(Integer, nullable=False)
|
||||
is_default: Mapped[bool] = mapped_column(default=False, nullable=False)
|
||||
is_system: Mapped[bool] = mapped_column(default=False, nullable=False)
|
||||
174
backend/app/models/template.py
Normal file
174
backend/app/models/template.py
Normal file
@@ -0,0 +1,174 @@
|
||||
"""Reusable templates: test_templates and scenario_templates.
|
||||
|
||||
A `mission_scenarios` row is a snapshot copy of a `scenario_templates` row at
|
||||
mission-creation time. Templates can therefore be edited freely without
|
||||
disturbing already-running missions.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import (
|
||||
ARRAY,
|
||||
CheckConstraint,
|
||||
ForeignKey,
|
||||
Index,
|
||||
Integer,
|
||||
String,
|
||||
Text,
|
||||
UniqueConstraint,
|
||||
Uuid,
|
||||
)
|
||||
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
||||
|
||||
from app.db.base import Base
|
||||
from app.db.mixins import SoftDeleteMixin, TimestampMixin, UuidPkMixin
|
||||
from app.db.types import MITRE_KINDS, OPSEC_LEVELS
|
||||
|
||||
|
||||
class TestTemplate(Base, UuidPkMixin, TimestampMixin, SoftDeleteMixin):
|
||||
__tablename__ = "test_templates"
|
||||
|
||||
name: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
description: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
objective: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
procedure_md: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
prerequisites_md: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
expected_result_red_md: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
expected_detection_blue_md: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
opsec_level: Mapped[str] = mapped_column(String(8), default="medium", nullable=False)
|
||||
tags: Mapped[list[str]] = mapped_column(
|
||||
ARRAY(String(64)), nullable=False, server_default="{}"
|
||||
)
|
||||
expected_iocs: Mapped[list[str]] = mapped_column(
|
||||
ARRAY(String(255)), nullable=False, server_default="{}"
|
||||
)
|
||||
|
||||
mitre_tags: Mapped[list["TestTemplateMitreTag"]] = relationship(
|
||||
back_populates="test_template",
|
||||
cascade="all, delete-orphan",
|
||||
lazy="selectin",
|
||||
)
|
||||
|
||||
__table_args__ = (
|
||||
CheckConstraint(
|
||||
f"opsec_level IN ({', '.join(repr(v) for v in OPSEC_LEVELS)})",
|
||||
name="opsec_level_valid",
|
||||
),
|
||||
Index("ix_test_templates_active", "deleted_at", postgresql_where="deleted_at IS NULL"),
|
||||
Index("ix_test_templates_name", "name"),
|
||||
)
|
||||
|
||||
|
||||
class TestTemplateMitreTag(Base):
|
||||
"""Polymorphic MITRE tag on a test template.
|
||||
|
||||
Exactly one of `tactic_id`, `technique_id`, `subtechnique_id` is set —
|
||||
enforced by the CHECK constraint. This keeps FK integrity per MITRE level
|
||||
while letting a single conceptual table answer "what's tagged on this test".
|
||||
"""
|
||||
|
||||
__tablename__ = "test_template_mitre_tags"
|
||||
|
||||
id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True), primary_key=True, default=uuid.uuid4, nullable=False
|
||||
)
|
||||
test_template_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True),
|
||||
ForeignKey("test_templates.id", ondelete="CASCADE"),
|
||||
nullable=False,
|
||||
)
|
||||
mitre_kind: Mapped[str] = mapped_column(String(16), nullable=False)
|
||||
tactic_id: Mapped[uuid.UUID | None] = mapped_column(
|
||||
Uuid(as_uuid=True), ForeignKey("mitre_tactics.id", ondelete="CASCADE"), nullable=True
|
||||
)
|
||||
technique_id: Mapped[uuid.UUID | None] = mapped_column(
|
||||
Uuid(as_uuid=True), ForeignKey("mitre_techniques.id", ondelete="CASCADE"), nullable=True
|
||||
)
|
||||
subtechnique_id: Mapped[uuid.UUID | None] = mapped_column(
|
||||
Uuid(as_uuid=True),
|
||||
ForeignKey("mitre_subtechniques.id", ondelete="CASCADE"),
|
||||
nullable=True,
|
||||
)
|
||||
|
||||
test_template: Mapped[TestTemplate] = relationship(back_populates="mitre_tags")
|
||||
|
||||
__table_args__: Any = (
|
||||
CheckConstraint(
|
||||
f"mitre_kind IN ({', '.join(repr(v) for v in MITRE_KINDS)})",
|
||||
name="mitre_kind_valid",
|
||||
),
|
||||
CheckConstraint(
|
||||
"(CASE WHEN tactic_id IS NOT NULL THEN 1 ELSE 0 END) "
|
||||
"+ (CASE WHEN technique_id IS NOT NULL THEN 1 ELSE 0 END) "
|
||||
"+ (CASE WHEN subtechnique_id IS NOT NULL THEN 1 ELSE 0 END) = 1",
|
||||
name="exactly_one_mitre_fk",
|
||||
),
|
||||
UniqueConstraint(
|
||||
"test_template_id",
|
||||
"tactic_id",
|
||||
"technique_id",
|
||||
"subtechnique_id",
|
||||
name="uq_test_template_mitre_tag",
|
||||
),
|
||||
Index("ix_test_template_mitre_tags_template", "test_template_id"),
|
||||
)
|
||||
|
||||
|
||||
class ScenarioTemplate(Base, UuidPkMixin, TimestampMixin, SoftDeleteMixin):
|
||||
__tablename__ = "scenario_templates"
|
||||
|
||||
name: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
description: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
|
||||
tests: Mapped[list["ScenarioTemplateTest"]] = relationship(
|
||||
back_populates="scenario_template",
|
||||
cascade="all, delete-orphan",
|
||||
order_by="ScenarioTemplateTest.position",
|
||||
)
|
||||
|
||||
__table_args__ = (
|
||||
Index(
|
||||
"ix_scenario_templates_active",
|
||||
"deleted_at",
|
||||
postgresql_where="deleted_at IS NULL",
|
||||
),
|
||||
Index("ix_scenario_templates_name", "name"),
|
||||
)
|
||||
|
||||
|
||||
class ScenarioTemplateTest(Base, UuidPkMixin):
|
||||
"""Ordered membership of a test template inside a scenario template.
|
||||
|
||||
UUID PK + UNIQUE(scenario_template_id, position) lets the same test appear
|
||||
multiple times at different positions (chained operations are common in
|
||||
purple-team scenarios).
|
||||
"""
|
||||
|
||||
__tablename__ = "scenario_template_tests"
|
||||
|
||||
scenario_template_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True),
|
||||
ForeignKey("scenario_templates.id", ondelete="CASCADE"),
|
||||
nullable=False,
|
||||
)
|
||||
test_template_id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid(as_uuid=True),
|
||||
ForeignKey("test_templates.id", ondelete="RESTRICT"),
|
||||
nullable=False,
|
||||
)
|
||||
position: Mapped[int] = mapped_column(Integer, nullable=False)
|
||||
|
||||
scenario_template: Mapped[ScenarioTemplate] = relationship(back_populates="tests")
|
||||
|
||||
__table_args__ = (
|
||||
UniqueConstraint(
|
||||
"scenario_template_id",
|
||||
"position",
|
||||
name="uq_scenario_template_tests_position",
|
||||
),
|
||||
Index("ix_scenario_template_tests_scenario", "scenario_template_id"),
|
||||
Index("ix_scenario_template_tests_test", "test_template_id"),
|
||||
)
|
||||
Reference in New Issue
Block a user