Files
Metamorph/backend/app/models/template.py

175 lines
6.0 KiB
Python
Raw Normal View History

feat(m1): DB schema, migrations, diag visibility 23 tables + alembic_version covering the v1 data model: - Auth/RBAC (8): users, groups, permissions, user_groups, group_permissions, invitations, invitation_groups, refresh_tokens. - MITRE (4): mitre_tactics, mitre_techniques, mitre_subtechniques + the technique↔tactic many-to-many. - Templates (4): test_templates, test_template_mitre_tags (3 nullable FKs + CHECK exactly_one_mitre_fk), scenario_templates, scenario_template_tests (UUID PK + UNIQUE(scenario_id, position) so a test can appear at multiple positions). - Missions (6): missions, mission_members, mission_scenarios, mission_tests, mission_test_mitre_tags (deliberately denormalised — copies external_id + name + url, no FK to mitre_* — so a re-sync of the catalogue can't purge historical tags), mission_categories. - Evidence/settings/notifications (5): evidence_files, settings (JSONB value), detection_levels, notifications. SQLAlchemy 2.x with Mapped[]/mapped_column(), pk_/fk_/ck_/uq_/ix_ naming convention. Reusable mixins (UuidPkMixin, TimestampMixin, SoftDeleteMixin — no auto __table_args__ since classes silently clobber the mixin's). Soft delete: deleted_at + partial indexes ix_<table>_active WHERE deleted_at IS NULL on 9 tables (users, groups, test_templates, scenario_templates, missions, mission_scenarios, mission_tests, mission_categories, evidence_files). Notifications gets ix_..._unread WHERE read_at IS NULL. CHECK constraints for status / state / opsec_level / mitre_kind enums. New API endpoint GET /api/v1/diag/db: returns alembic_revision (short hash) and the public-schema table_count. 503 with {"reachable": false} on a DB outage. Database card on the SPA home consumes it. Test stage in backend/Dockerfile (--target test): runtime + dev extras + tests/. New make test-api spins an ephemeral pytest container against the live DB on the compose network. backend/tests/test_schema.py: 8 integration tests (tables, FK pairs, CHECK constraints, partial indexes, alembic-at-head, negative INSERT proving the exactly_one_mitre_fk CHECK fires). e2e/tests/m1-db.spec.ts: 4 Playwright tests covering the diag endpoint contract + the Database card + footer/roadmap labels. DoD: make clean && make up && make migrate → 23 tables, 32 FKs, 9 CHECKs, make test-api → 9 passed, make e2e → 12 passed. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 06:16:24 +02:00
"""Reusable templates: test_templates and scenario_templates.
A `mission_scenarios` row is a snapshot copy of a `scenario_templates` row at
mission-creation time. Templates can therefore be edited freely without
disturbing already-running missions.
"""
from __future__ import annotations
import uuid
from typing import Any
from sqlalchemy import (
ARRAY,
CheckConstraint,
ForeignKey,
Index,
Integer,
String,
Text,
UniqueConstraint,
Uuid,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.db.base import Base
from app.db.mixins import SoftDeleteMixin, TimestampMixin, UuidPkMixin
from app.db.types import MITRE_KINDS, OPSEC_LEVELS
class TestTemplate(Base, UuidPkMixin, TimestampMixin, SoftDeleteMixin):
__tablename__ = "test_templates"
name: Mapped[str] = mapped_column(String(255), nullable=False)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
objective: Mapped[str | None] = mapped_column(Text, nullable=True)
procedure_md: Mapped[str | None] = mapped_column(Text, nullable=True)
prerequisites_md: Mapped[str | None] = mapped_column(Text, nullable=True)
expected_result_red_md: Mapped[str | None] = mapped_column(Text, nullable=True)
expected_detection_blue_md: Mapped[str | None] = mapped_column(Text, nullable=True)
opsec_level: Mapped[str] = mapped_column(String(8), default="medium", nullable=False)
tags: Mapped[list[str]] = mapped_column(
ARRAY(String(64)), nullable=False, server_default="{}"
)
expected_iocs: Mapped[list[str]] = mapped_column(
ARRAY(String(255)), nullable=False, server_default="{}"
)
mitre_tags: Mapped[list["TestTemplateMitreTag"]] = relationship(
back_populates="test_template",
cascade="all, delete-orphan",
lazy="selectin",
)
__table_args__ = (
CheckConstraint(
f"opsec_level IN ({', '.join(repr(v) for v in OPSEC_LEVELS)})",
name="opsec_level_valid",
),
Index("ix_test_templates_active", "deleted_at", postgresql_where="deleted_at IS NULL"),
Index("ix_test_templates_name", "name"),
)
class TestTemplateMitreTag(Base):
"""Polymorphic MITRE tag on a test template.
Exactly one of `tactic_id`, `technique_id`, `subtechnique_id` is set
enforced by the CHECK constraint. This keeps FK integrity per MITRE level
while letting a single conceptual table answer "what's tagged on this test".
"""
__tablename__ = "test_template_mitre_tags"
id: Mapped[uuid.UUID] = mapped_column(
Uuid(as_uuid=True), primary_key=True, default=uuid.uuid4, nullable=False
)
test_template_id: Mapped[uuid.UUID] = mapped_column(
Uuid(as_uuid=True),
ForeignKey("test_templates.id", ondelete="CASCADE"),
nullable=False,
)
mitre_kind: Mapped[str] = mapped_column(String(16), nullable=False)
tactic_id: Mapped[uuid.UUID | None] = mapped_column(
Uuid(as_uuid=True), ForeignKey("mitre_tactics.id", ondelete="CASCADE"), nullable=True
)
technique_id: Mapped[uuid.UUID | None] = mapped_column(
Uuid(as_uuid=True), ForeignKey("mitre_techniques.id", ondelete="CASCADE"), nullable=True
)
subtechnique_id: Mapped[uuid.UUID | None] = mapped_column(
Uuid(as_uuid=True),
ForeignKey("mitre_subtechniques.id", ondelete="CASCADE"),
nullable=True,
)
test_template: Mapped[TestTemplate] = relationship(back_populates="mitre_tags")
__table_args__: Any = (
CheckConstraint(
f"mitre_kind IN ({', '.join(repr(v) for v in MITRE_KINDS)})",
name="mitre_kind_valid",
),
CheckConstraint(
"(CASE WHEN tactic_id IS NOT NULL THEN 1 ELSE 0 END) "
"+ (CASE WHEN technique_id IS NOT NULL THEN 1 ELSE 0 END) "
"+ (CASE WHEN subtechnique_id IS NOT NULL THEN 1 ELSE 0 END) = 1",
name="exactly_one_mitre_fk",
),
UniqueConstraint(
"test_template_id",
"tactic_id",
"technique_id",
"subtechnique_id",
name="uq_test_template_mitre_tag",
),
Index("ix_test_template_mitre_tags_template", "test_template_id"),
)
class ScenarioTemplate(Base, UuidPkMixin, TimestampMixin, SoftDeleteMixin):
__tablename__ = "scenario_templates"
name: Mapped[str] = mapped_column(String(255), nullable=False)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
tests: Mapped[list["ScenarioTemplateTest"]] = relationship(
back_populates="scenario_template",
cascade="all, delete-orphan",
order_by="ScenarioTemplateTest.position",
)
__table_args__ = (
Index(
"ix_scenario_templates_active",
"deleted_at",
postgresql_where="deleted_at IS NULL",
),
Index("ix_scenario_templates_name", "name"),
)
class ScenarioTemplateTest(Base, UuidPkMixin):
"""Ordered membership of a test template inside a scenario template.
UUID PK + UNIQUE(scenario_template_id, position) lets the same test appear
multiple times at different positions (chained operations are common in
purple-team scenarios).
"""
__tablename__ = "scenario_template_tests"
scenario_template_id: Mapped[uuid.UUID] = mapped_column(
Uuid(as_uuid=True),
ForeignKey("scenario_templates.id", ondelete="CASCADE"),
nullable=False,
)
test_template_id: Mapped[uuid.UUID] = mapped_column(
Uuid(as_uuid=True),
ForeignKey("test_templates.id", ondelete="RESTRICT"),
nullable=False,
)
position: Mapped[int] = mapped_column(Integer, nullable=False)
scenario_template: Mapped[ScenarioTemplate] = relationship(back_populates="tests")
__table_args__ = (
UniqueConstraint(
"scenario_template_id",
"position",
name="uq_scenario_template_tests_position",
),
Index("ix_scenario_template_tests_scenario", "scenario_template_id"),
Index("ix_scenario_template_tests_test", "test_template_id"),
)