23 tables + alembic_version covering the v1 data model:
- Auth/RBAC (8): users, groups, permissions, user_groups, group_permissions,
invitations, invitation_groups, refresh_tokens.
- MITRE (4): mitre_tactics, mitre_techniques, mitre_subtechniques + the
technique↔tactic many-to-many.
- Templates (4): test_templates, test_template_mitre_tags (3 nullable FKs +
CHECK exactly_one_mitre_fk), scenario_templates, scenario_template_tests
(UUID PK + UNIQUE(scenario_id, position) so a test can appear at multiple
positions).
- Missions (6): missions, mission_members, mission_scenarios, mission_tests,
mission_test_mitre_tags (deliberately denormalised — copies external_id +
name + url, no FK to mitre_* — so a re-sync of the catalogue can't purge
historical tags), mission_categories.
- Evidence/settings/notifications (5): evidence_files, settings (JSONB
value), detection_levels, notifications.
SQLAlchemy 2.x with Mapped[]/mapped_column(), pk_/fk_/ck_/uq_/ix_ naming
convention. Reusable mixins (UuidPkMixin, TimestampMixin, SoftDeleteMixin).
The mixins deliberately define no __table_args__ of their own, because a
model class's own __table_args__ silently replaces the mixin's.
Soft delete: deleted_at + partial indexes ix_<table>_active WHERE deleted_at
IS NULL on 9 tables (users, groups, test_templates, scenario_templates,
missions, mission_scenarios, mission_tests, mission_categories,
evidence_files). Notifications gets ix_..._unread WHERE read_at IS NULL.
CHECK constraints for status / state / opsec_level / mitre_kind enums.
New API endpoint GET /api/v1/diag/db: returns alembic_revision (short hash)
and the public-schema table_count. 503 with {"reachable": false} on a DB
outage. Database card on the SPA home consumes it.
Test stage in backend/Dockerfile (--target test): runtime + dev extras +
tests/. New make test-api spins an ephemeral pytest container against the
live DB on the compose network. backend/tests/test_schema.py: 8 integration
tests (tables, FK pairs, CHECK constraints, partial indexes, alembic-at-head,
negative INSERT proving the exactly_one_mitre_fk CHECK fires).
e2e/tests/m1-db.spec.ts: 4 Playwright tests covering the diag endpoint
contract + the Database card + footer/roadmap labels.
DoD: make clean && make up && make migrate → 23 tables, 32 FKs, 9 CHECKs,
make test-api → 9 passed, make e2e → 12 passed.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
175 lines
6.0 KiB
Python
175 lines
6.0 KiB
Python
"""Reusable templates: test_templates and scenario_templates.
|
|
|
|
A `mission_scenarios` row is a snapshot copy of a `scenario_templates` row at
|
|
mission-creation time. Templates can therefore be edited freely without
|
|
disturbing already-running missions.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
from typing import Any
|
|
|
|
from sqlalchemy import (
|
|
ARRAY,
|
|
CheckConstraint,
|
|
ForeignKey,
|
|
Index,
|
|
Integer,
|
|
String,
|
|
Text,
|
|
UniqueConstraint,
|
|
Uuid,
|
|
)
|
|
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
|
|
|
from app.db.base import Base
|
|
from app.db.mixins import SoftDeleteMixin, TimestampMixin, UuidPkMixin
|
|
from app.db.types import MITRE_KINDS, OPSEC_LEVELS
|
|
|
|
|
|
class TestTemplate(Base, UuidPkMixin, TimestampMixin, SoftDeleteMixin):
    """Reusable definition of a single test, stored in `test_templates`.

    The primary key, timestamps and `deleted_at` soft-delete column come from
    the mixins. MITRE tags hang off `mitre_tags` and are owned by the
    template: removing a tag from the collection deletes its row.
    """

    __tablename__ = "test_templates"
    __table_args__ = (
        # Restrict opsec_level to the values listed in OPSEC_LEVELS.
        CheckConstraint(
            "opsec_level IN (" + ", ".join(map(repr, OPSEC_LEVELS)) + ")",
            name="opsec_level_valid",
        ),
        # Partial index limited to rows that are not soft-deleted.
        Index(
            "ix_test_templates_active",
            "deleted_at",
            postgresql_where="deleted_at IS NULL",
        ),
        Index("ix_test_templates_name", "name"),
    )

    name: Mapped[str] = mapped_column(String(255), nullable=False)

    # Free-text fields; the `_md` suffix presumably marks Markdown content —
    # confirm against the API/UI layer.
    description: Mapped[str | None] = mapped_column(Text, nullable=True)
    objective: Mapped[str | None] = mapped_column(Text, nullable=True)
    procedure_md: Mapped[str | None] = mapped_column(Text, nullable=True)
    prerequisites_md: Mapped[str | None] = mapped_column(Text, nullable=True)
    expected_result_red_md: Mapped[str | None] = mapped_column(Text, nullable=True)
    expected_detection_blue_md: Mapped[str | None] = mapped_column(Text, nullable=True)

    # The default is applied by the ORM only (no server-side default).
    opsec_level: Mapped[str] = mapped_column(
        String(8),
        nullable=False,
        default="medium",
    )

    # Array columns default to an empty array on the server side.
    tags: Mapped[list[str]] = mapped_column(
        ARRAY(String(64)),
        server_default="{}",
        nullable=False,
    )
    expected_iocs: Mapped[list[str]] = mapped_column(
        ARRAY(String(255)),
        server_default="{}",
        nullable=False,
    )

    # Loaded with the "selectin" strategy alongside the template.
    mitre_tags: Mapped[list["TestTemplateMitreTag"]] = relationship(
        lazy="selectin",
        cascade="all, delete-orphan",
        back_populates="test_template",
    )
|
|
|
|
|
|
class TestTemplateMitreTag(Base):
    """Polymorphic MITRE tag on a test template.

    Exactly one of `tactic_id`, `technique_id`, `subtechnique_id` is set —
    enforced by the `exactly_one_mitre_fk` CHECK constraint. This keeps FK
    integrity per MITRE level while letting a single conceptual table answer
    "what's tagged on this test".

    Duplicate tags are rejected by three partial unique indexes, one per FK
    column. A single UNIQUE over (test_template_id, tactic_id, technique_id,
    subtechnique_id) cannot do that job: two of the three FK columns are NULL
    on every row, and PostgreSQL treats NULLs as distinct in unique
    constraints by default, so such a constraint never fires.

    NOTE(review): replacing the old `uq_test_template_mitre_tag` constraint
    with the partial unique indexes needs an accompanying Alembic migration.
    NOTE(review): nothing in this table ties `mitre_kind` to the FK column
    that is populated — confirm whether that is enforced elsewhere.
    """

    __tablename__ = "test_template_mitre_tags"

    id: Mapped[uuid.UUID] = mapped_column(
        Uuid(as_uuid=True), primary_key=True, default=uuid.uuid4, nullable=False
    )
    # Tags are removed by the database when their template row is deleted.
    test_template_id: Mapped[uuid.UUID] = mapped_column(
        Uuid(as_uuid=True),
        ForeignKey("test_templates.id", ondelete="CASCADE"),
        nullable=False,
    )
    # Constrained to the values in MITRE_KINDS by the CHECK below.
    mitre_kind: Mapped[str] = mapped_column(String(16), nullable=False)

    # Exactly one of the three nullable FKs below is populated per row.
    tactic_id: Mapped[uuid.UUID | None] = mapped_column(
        Uuid(as_uuid=True), ForeignKey("mitre_tactics.id", ondelete="CASCADE"), nullable=True
    )
    technique_id: Mapped[uuid.UUID | None] = mapped_column(
        Uuid(as_uuid=True), ForeignKey("mitre_techniques.id", ondelete="CASCADE"), nullable=True
    )
    subtechnique_id: Mapped[uuid.UUID | None] = mapped_column(
        Uuid(as_uuid=True),
        ForeignKey("mitre_subtechniques.id", ondelete="CASCADE"),
        nullable=True,
    )

    test_template: Mapped[TestTemplate] = relationship(back_populates="mitre_tags")

    __table_args__: Any = (
        CheckConstraint(
            f"mitre_kind IN ({', '.join(repr(v) for v in MITRE_KINDS)})",
            name="mitre_kind_valid",
        ),
        CheckConstraint(
            "(CASE WHEN tactic_id IS NOT NULL THEN 1 ELSE 0 END) "
            "+ (CASE WHEN technique_id IS NOT NULL THEN 1 ELSE 0 END) "
            "+ (CASE WHEN subtechnique_id IS NOT NULL THEN 1 ELSE 0 END) = 1",
            name="exactly_one_mitre_fk",
        ),
        # One partial unique index per MITRE level: each only covers rows
        # where its FK is populated, so uniqueness is checked on non-NULL
        # values and a given tactic/technique/sub-technique can be attached
        # to a template at most once.
        Index(
            "uq_test_template_mitre_tags_tactic",
            "test_template_id",
            "tactic_id",
            unique=True,
            postgresql_where="tactic_id IS NOT NULL",
        ),
        Index(
            "uq_test_template_mitre_tags_technique",
            "test_template_id",
            "technique_id",
            unique=True,
            postgresql_where="technique_id IS NOT NULL",
        ),
        Index(
            "uq_test_template_mitre_tags_subtechnique",
            "test_template_id",
            "subtechnique_id",
            unique=True,
            postgresql_where="subtechnique_id IS NOT NULL",
        ),
        # Plain lookup index for "all tags of this template".
        Index("ix_test_template_mitre_tags_template", "test_template_id"),
    )
|
|
|
|
|
|
class ScenarioTemplate(Base, UuidPkMixin, TimestampMixin, SoftDeleteMixin):
    """Reusable scenario: a named, ordered list of test-template entries.

    Stored in `scenario_templates`. The primary key, timestamps and the
    `deleted_at` soft-delete column come from the mixins. The `tests`
    collection is returned ordered by each entry's `position`, and entries
    removed from it are deleted along with the link.
    """

    __tablename__ = "scenario_templates"
    __table_args__ = (
        # Partial index limited to rows that are not soft-deleted.
        Index("ix_scenario_templates_active", "deleted_at", postgresql_where="deleted_at IS NULL"),
        Index("ix_scenario_templates_name", "name"),
    )

    name: Mapped[str] = mapped_column(String(255), nullable=False)
    description: Mapped[str | None] = mapped_column(Text, nullable=True)

    tests: Mapped[list["ScenarioTemplateTest"]] = relationship(
        order_by="ScenarioTemplateTest.position",
        cascade="all, delete-orphan",
        back_populates="scenario_template",
    )
|
|
|
|
|
|
class ScenarioTemplateTest(Base, UuidPkMixin):
    """One ordered slot of a scenario template, pointing at a test template.

    Each row has its own UUID primary key, and uniqueness is enforced on
    (scenario_template_id, position) rather than on the test itself. The same
    test template may therefore occupy several positions in one scenario
    (chained operations are common in purple-team scenarios).
    """

    __tablename__ = "scenario_template_tests"
    __table_args__ = (
        # At most one entry per position within a scenario template.
        UniqueConstraint(
            "scenario_template_id",
            "position",
            name="uq_scenario_template_tests_position",
        ),
        Index("ix_scenario_template_tests_scenario", "scenario_template_id"),
        Index("ix_scenario_template_tests_test", "test_template_id"),
    )

    # Slots are removed by the database together with their scenario template.
    scenario_template_id: Mapped[uuid.UUID] = mapped_column(
        Uuid(as_uuid=True),
        ForeignKey("scenario_templates.id", ondelete="CASCADE"),
        nullable=False,
    )
    # RESTRICT: a test template row cannot be deleted while a slot references it.
    test_template_id: Mapped[uuid.UUID] = mapped_column(
        Uuid(as_uuid=True),
        ForeignKey("test_templates.id", ondelete="RESTRICT"),
        nullable=False,
    )
    # Ordering key used by ScenarioTemplate.tests.
    position: Mapped[int] = mapped_column(Integer, nullable=False)

    scenario_template: Mapped[ScenarioTemplate] = relationship(back_populates="tests")
|