Milestone 3

This commit is contained in:
Knacky
2026-05-11 06:05:27 +02:00
commit 4c25e198fc
125 changed files with 13489 additions and 0 deletions

View File

25
backend/tests/conftest.py Normal file
View File

@@ -0,0 +1,25 @@
"""Shared pytest fixtures.
The DB integration tests need a reachable Postgres on the URL configured in
`app.core.config.settings`. They are skipped automatically when the DB isn't up,
so unit tests still pass on a developer's bare laptop.
"""
from __future__ import annotations
import pytest
from sqlalchemy.exc import OperationalError
from app.db.session import get_engine
@pytest.fixture(scope="session")
def db_engine_or_skip():
"""Yield the SQLAlchemy engine, skipping the test if the DB is unreachable."""
engine = get_engine()
try:
with engine.connect() as conn:
conn.execute.__self__ # touch the connection
except OperationalError as e:
pytest.skip(f"Postgres unreachable: {e}", allow_module_level=False)
return engine

View File

@@ -0,0 +1,299 @@
"""End-to-end auth flow integration test (live DB).
Hits the Flask test client to exercise:
- /setup with the install token
- /auth/login + /auth/me
- /auth/refresh (rotation)
- /auth/logout (revocation, idempotency)
- /auth/change-password (forces logout-all)
- /invitations create + preview + accept
- RBAC: non-admin gets 403 on admin endpoint
The DB schema is left in place between tests; we use unique emails to avoid
collisions across runs. The install token is force-minted at the start.
"""
from __future__ import annotations
import json
import secrets
import uuid
import pytest
from sqlalchemy import text
from app.core.install_token import regenerate_install_token
from app.db.session import get_engine
from app.main import create_app
def _truncate_users(engine):
"""Wipe data so /setup has work to do. CASCADE handles dependent rows."""
with engine.begin() as conn:
conn.execute(
text(
"TRUNCATE users, refresh_tokens, invitations, invitation_groups, "
"user_groups, settings, groups RESTART IDENTITY CASCADE"
)
)
@pytest.fixture(scope="module")
def app(db_engine_or_skip):
_truncate_users(db_engine_or_skip)
flask_app = create_app()
flask_app.config.update(TESTING=True)
return flask_app
@pytest.fixture()
def client(app):
return app.test_client()
@pytest.fixture(scope="module")
def install_token(db_engine_or_skip):
return regenerate_install_token()
def _unique_email(prefix: str) -> str:
return f"{prefix}-{secrets.token_hex(4)}@metamorph.local"
# -- /setup -------------------------------------------------------------------
def test_setup_status_starts_uncompleted(client):
r = client.get("/api/v1/setup")
assert r.status_code == 200
assert r.get_json()["completed"] is False
def test_setup_creates_first_admin(client, install_token):
email = _unique_email("admin")
r = client.post(
"/api/v1/setup",
json={
"install_token": install_token,
"email": email,
"password": "AdminPass1234!",
"display_name": "Init Admin",
},
)
assert r.status_code == 201, r.get_data(as_text=True)
body = r.get_json()
assert "user_id" in body
pytest.shared_admin = {"email": email, "password": "AdminPass1234!", "id": body["user_id"]} # type: ignore[attr-defined]
def test_setup_status_now_completed(client):
assert client.get("/api/v1/setup").get_json()["completed"] is True
def test_setup_replay_is_blocked(client, install_token):
# Token already consumed — a second call must be refused.
r = client.post(
"/api/v1/setup",
json={
"install_token": install_token,
"email": _unique_email("replay"),
"password": "AdminPass1234!",
},
)
assert r.status_code == 409
# -- /auth/login + /auth/me ---------------------------------------------------
@pytest.fixture(scope="module")
def admin_credentials():
return getattr(pytest, "shared_admin") # populated by test_setup_creates_first_admin
def _login(client, email: str, password: str) -> tuple[str, dict]:
r = client.post("/api/v1/auth/login", json={"email": email, "password": password})
assert r.status_code == 200, r.get_data(as_text=True)
body = r.get_json()
return body["access_token"], dict(r.headers)
def test_login_and_me(client, admin_credentials):
access, _ = _login(client, admin_credentials["email"], admin_credentials["password"])
r = client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {access}"})
assert r.status_code == 200
me = r.get_json()
assert me["email"] == admin_credentials["email"]
assert me["is_admin"] is True
assert "admin" in me["groups"]
def test_login_with_wrong_password_returns_401(client, admin_credentials):
r = client.post(
"/api/v1/auth/login",
json={"email": admin_credentials["email"], "password": "wrong"},
)
assert r.status_code == 401
assert r.get_json()["error"] == "invalid_credentials"
def test_me_without_token_returns_401(client):
r = client.get("/api/v1/auth/me")
assert r.status_code == 401
# -- /auth/refresh rotation ---------------------------------------------------
def test_refresh_rotates_and_old_token_is_revoked(client, admin_credentials):
# Login fresh to get a refresh cookie on the test client.
client.post(
"/api/v1/auth/login",
json={"email": admin_credentials["email"], "password": admin_credentials["password"]},
)
# First refresh — should succeed.
r1 = client.post("/api/v1/auth/refresh")
assert r1.status_code == 200, r1.get_data(as_text=True)
new_access1 = r1.get_json()["access_token"]
assert new_access1
# Second refresh — uses the rotated cookie automatically (test client persists cookies).
r2 = client.post("/api/v1/auth/refresh")
assert r2.status_code == 200
assert r2.get_json()["access_token"] != new_access1
def test_refresh_with_no_cookie_returns_401(client):
fresh = client.application.test_client() # blank cookie jar
r = fresh.post("/api/v1/auth/refresh")
assert r.status_code == 401
# -- /auth/logout -------------------------------------------------------------
def test_logout_clears_cookie_and_is_idempotent(client, admin_credentials):
client.post(
"/api/v1/auth/login",
json={"email": admin_credentials["email"], "password": admin_credentials["password"]},
)
r1 = client.post("/api/v1/auth/logout")
assert r1.status_code == 200
# Second logout — no token, still 200.
r2 = client.post("/api/v1/auth/logout")
assert r2.status_code == 200
# -- /invitations -------------------------------------------------------------
def test_admin_creates_invitation_and_invitee_accepts(client, admin_credentials):
access, _ = _login(client, admin_credentials["email"], admin_credentials["password"])
inv_email = _unique_email("alice")
create = client.post(
"/api/v1/invitations",
headers={"Authorization": f"Bearer {access}"},
json={"email_hint": inv_email},
)
assert create.status_code == 201, create.get_data(as_text=True)
token = create.get_json()["token"]
preview = client.get(f"/api/v1/invitations/preview/{token}")
assert preview.status_code == 200
assert preview.get_json()["is_valid"] is True
accept = client.post(
f"/api/v1/invitations/accept/{token}",
json={"email": inv_email, "password": "AlicePass1234!", "display_name": "Alice"},
)
assert accept.status_code == 201, accept.get_data(as_text=True)
# Alice can now log in.
a_access, _ = _login(client, inv_email, "AlicePass1234!")
me = client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {a_access}"}).get_json()
assert me["is_admin"] is False
assert me["email"] == inv_email
def test_unauthenticated_cannot_create_invitation(client):
r = client.post("/api/v1/invitations", json={})
assert r.status_code == 401
def test_non_admin_cannot_create_invitation(client, admin_credentials):
# Create a non-admin user via invitation, then try as them.
access, _ = _login(client, admin_credentials["email"], admin_credentials["password"])
inv_email = _unique_email("bob")
create = client.post(
"/api/v1/invitations",
headers={"Authorization": f"Bearer {access}"},
json={"email_hint": inv_email},
)
token = create.get_json()["token"]
client.post(
f"/api/v1/invitations/accept/{token}",
json={"email": inv_email, "password": "BobPass1234!"},
)
bob_access, _ = _login(client, inv_email, "BobPass1234!")
r = client.post(
"/api/v1/invitations",
headers={"Authorization": f"Bearer {bob_access}"},
json={},
)
assert r.status_code == 403
def test_used_invitation_cannot_be_accepted_twice(client, admin_credentials):
access, _ = _login(client, admin_credentials["email"], admin_credentials["password"])
inv_email = _unique_email("carol")
token = client.post(
"/api/v1/invitations",
headers={"Authorization": f"Bearer {access}"},
json={"email_hint": inv_email},
).get_json()["token"]
first = client.post(
f"/api/v1/invitations/accept/{token}",
json={"email": inv_email, "password": "CarolPass1234!"},
)
assert first.status_code == 201
second = client.post(
f"/api/v1/invitations/accept/{token}",
json={"email": _unique_email("carol2"), "password": "OtherPass1234!"},
)
assert second.status_code == 410
assert second.get_json()["error"] == "invitation_consumed"
# -- change-password forces logout-all ----------------------------------------
def test_change_password_revokes_all_refresh_tokens(client, admin_credentials):
access, _ = _login(client, admin_credentials["email"], admin_credentials["password"])
# Trigger a couple of refreshes so we have multiple chains in DB.
client.post("/api/v1/auth/refresh")
client.post("/api/v1/auth/refresh")
# Change password.
new_pw = "AdminPass5678!"
r = client.post(
"/api/v1/auth/change-password",
headers={"Authorization": f"Bearer {access}"},
json={
"current_password": admin_credentials["password"],
"new_password": new_pw,
},
)
assert r.status_code == 200
admin_credentials["password"] = new_pw
# Existing refresh cookie must now be rejected.
r2 = client.post("/api/v1/auth/refresh")
assert r2.status_code == 401
# New login still works.
_login(client, admin_credentials["email"], admin_credentials["password"])

View File

@@ -0,0 +1,14 @@
"""M0 smoke test: the /api/v1/health endpoint returns 200 and the expected payload."""
from __future__ import annotations
from app.main import app
def test_health_returns_ok():
client = app.test_client()
resp = client.get("/api/v1/health")
assert resp.status_code == 200
body = resp.get_json()
assert body["status"] == "ok"
assert "version" in body

344
backend/tests/test_rbac.py Normal file
View File

@@ -0,0 +1,344 @@
"""Integration tests for M3: permission seed + users/groups/permissions APIs.
Exercises the Flask test client against a live Postgres. The DB is wiped at
module load so test ordering inside the module matters (see `pytest.shared_*`).
"""
from __future__ import annotations
import secrets
import pytest
from sqlalchemy import text
from app.core.install_token import regenerate_install_token
from app.main import create_app
from app.services.permissions_seed import PERMISSION_CATALOGUE
def _truncate_all(engine):
"""Wipe data plus permissions table. CASCADE handles dependent rows."""
with engine.begin() as conn:
conn.execute(
text(
"TRUNCATE users, refresh_tokens, invitations, invitation_groups, "
"user_groups, group_permissions, permissions, settings, groups "
"RESTART IDENTITY CASCADE"
)
)
@pytest.fixture(scope="module")
def app(db_engine_or_skip):
_truncate_all(db_engine_or_skip)
flask_app = create_app() # triggers bootstrap → seed_all()
flask_app.config.update(TESTING=True)
return flask_app
@pytest.fixture()
def client(app):
return app.test_client()
def _unique_email(prefix: str) -> str:
return f"{prefix}-{secrets.token_hex(4)}@metamorph.local"
def _login(client, email: str, password: str) -> str:
r = client.post("/api/v1/auth/login", json={"email": email, "password": password})
assert r.status_code == 200, r.get_data(as_text=True)
return r.get_json()["access_token"]
# -- M3.1 — Permissions seeded at boot -----------------------------------------
def test_permissions_catalogue_seeded(client):
"""Catalogue table has every code from PERMISSION_CATALOGUE."""
# We need an admin to call /permissions — bootstrap one via /setup.
token = regenerate_install_token()
email = _unique_email("admin")
r = client.post(
"/api/v1/setup",
json={
"install_token": token,
"email": email,
"password": "AdminPass1234!",
"display_name": "Admin",
},
)
assert r.status_code == 201, r.get_data(as_text=True)
pytest.shared_admin = {"email": email, "password": "AdminPass1234!", "user_id": r.get_json()["user_id"]} # type: ignore[attr-defined]
access = _login(client, email, "AdminPass1234!")
perms = client.get(
"/api/v1/permissions", headers={"Authorization": f"Bearer {access}"}
).get_json()
codes = {p["code"] for p in perms["items"]}
expected = {p.code for p in PERMISSION_CATALOGUE}
assert expected.issubset(codes)
def test_admin_group_has_every_permission(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
groups = client.get(
"/api/v1/groups", headers={"Authorization": f"Bearer {access}"}
).get_json()
admin_group = next(g for g in groups["items"] if g["name"] == "admin")
assert set(admin_group["permissions"]) == {p.code for p in PERMISSION_CATALOGUE}
assert admin_group["is_system"] is True
def test_redteam_group_has_red_perms_but_not_blue_write(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
groups = client.get(
"/api/v1/groups", headers={"Authorization": f"Bearer {access}"}
).get_json()
redteam = next(g for g in groups["items"] if g["name"] == "redteam")
assert "mission.write_red_fields" in redteam["permissions"]
assert "mission.write_blue_fields" not in redteam["permissions"]
assert "mission.create" in redteam["permissions"]
def test_blueteam_group_has_blue_perm_but_not_red_write(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
groups = client.get(
"/api/v1/groups", headers={"Authorization": f"Bearer {access}"}
).get_json()
blueteam = next(g for g in groups["items"] if g["name"] == "blueteam")
assert "mission.write_blue_fields" in blueteam["permissions"]
assert "mission.write_red_fields" not in blueteam["permissions"]
assert "mission.create" not in blueteam["permissions"]
# -- M3.2 — Users CRUD ---------------------------------------------------------
def _invite_user(client, admin_access: str, email: str, password: str, group_ids: list[str] | None = None) -> str:
"""Create + accept an invitation, return the new user's id."""
create = client.post(
"/api/v1/invitations",
headers={"Authorization": f"Bearer {admin_access}"},
json={"email_hint": email, "group_ids": group_ids or []},
)
assert create.status_code == 201, create.get_data(as_text=True)
token = create.get_json()["token"]
accept = client.post(
f"/api/v1/invitations/accept/{token}",
json={"email": email, "password": password, "display_name": email.split("@")[0]},
)
assert accept.status_code == 201, accept.get_data(as_text=True)
return accept.get_json()["user_id"]
def test_admin_lists_users(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
r = client.get("/api/v1/users", headers={"Authorization": f"Bearer {access}"})
assert r.status_code == 200
body = r.get_json()
assert body["total"] >= 1
emails = [u["email"] for u in body["items"]]
assert admin["email"] in emails
def test_admin_updates_a_user(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
bob_email = _unique_email("bob")
bob_id = _invite_user(client, access, bob_email, "BobPass1234!")
r = client.patch(
f"/api/v1/users/{bob_id}",
headers={"Authorization": f"Bearer {access}"},
json={"display_name": "Robert", "locale": "en"},
)
assert r.status_code == 200, r.get_data(as_text=True)
body = r.get_json()
assert body["display_name"] == "Robert"
assert body["locale"] == "en"
def test_admin_soft_deletes_a_user(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
target_email = _unique_email("ghost")
target_id = _invite_user(client, access, target_email, "GhostPass1234!")
r = client.delete(
f"/api/v1/users/{target_id}",
headers={"Authorization": f"Bearer {access}"},
)
assert r.status_code == 200, r.get_data(as_text=True)
# Listing must not return the deleted user by default.
listing = client.get(
"/api/v1/users", headers={"Authorization": f"Bearer {access}"}
).get_json()
assert target_email not in [u["email"] for u in listing["items"]]
def test_last_admin_cannot_be_deleted(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
r = client.delete(
f"/api/v1/users/{admin['user_id']}",
headers={"Authorization": f"Bearer {access}"},
)
assert r.status_code == 409
assert r.get_json()["error"] == "last_admin_protected"
def test_last_admin_cannot_be_deactivated(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
r = client.patch(
f"/api/v1/users/{admin['user_id']}",
headers={"Authorization": f"Bearer {access}"},
json={"is_active": False},
)
assert r.status_code == 409
assert r.get_json()["error"] == "last_admin_protected"
# -- M3.3 — Groups CRUD --------------------------------------------------------
def test_admin_creates_custom_group_and_assigns_perms(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
# Create the group
create = client.post(
"/api/v1/groups",
headers={"Authorization": f"Bearer {access}"},
json={"name": f"pentest-{secrets.token_hex(3)}", "description": "Test group"},
)
assert create.status_code == 201, create.get_data(as_text=True)
gid = create.get_json()["id"]
# Attach mission.read + mission.write_red_fields only
r = client.put(
f"/api/v1/groups/{gid}/permissions",
headers={"Authorization": f"Bearer {access}"},
json={"codes": ["mission.read", "mission.write_red_fields"]},
)
assert r.status_code == 200, r.get_data(as_text=True)
assert set(r.get_json()["permissions"]) == {"mission.read", "mission.write_red_fields"}
def test_system_group_cannot_be_renamed_or_deleted(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
groups = client.get(
"/api/v1/groups", headers={"Authorization": f"Bearer {access}"}
).get_json()
admin_group = next(g for g in groups["items"] if g["name"] == "admin")
rename = client.patch(
f"/api/v1/groups/{admin_group['id']}",
headers={"Authorization": f"Bearer {access}"},
json={"name": "superadmin"},
)
assert rename.status_code == 409
assert rename.get_json()["error"] == "system_group_protected"
delete = client.delete(
f"/api/v1/groups/{admin_group['id']}",
headers={"Authorization": f"Bearer {access}"},
)
assert delete.status_code == 409
assert delete.get_json()["error"] == "system_group_protected"
def test_setting_unknown_permission_code_returns_400(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
create = client.post(
"/api/v1/groups",
headers={"Authorization": f"Bearer {access}"},
json={"name": f"bad-perms-{secrets.token_hex(3)}", "description": None},
)
gid = create.get_json()["id"]
r = client.put(
f"/api/v1/groups/{gid}/permissions",
headers={"Authorization": f"Bearer {access}"},
json={"codes": ["bogus.permission"]},
)
assert r.status_code == 400
# -- M3 user ↔ group assignment ------------------------------------------------
def test_admin_assigns_user_to_custom_group_and_perms_apply(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
# Create a custom group that *only* grants user.read.
gname = f"readers-{secrets.token_hex(3)}"
group = client.post(
"/api/v1/groups",
headers={"Authorization": f"Bearer {access}"},
json={"name": gname, "description": None},
).get_json()
client.put(
f"/api/v1/groups/{group['id']}/permissions",
headers={"Authorization": f"Bearer {access}"},
json={"codes": ["user.read"]},
)
# Invite Dave, attach the new group via /users/{id}/groups.
dave_email = _unique_email("dave")
dave_id = _invite_user(client, access, dave_email, "DavePass1234!")
r = client.put(
f"/api/v1/users/{dave_id}/groups",
headers={"Authorization": f"Bearer {access}"},
json={"group_ids": [group["id"]]},
)
assert r.status_code == 200, r.get_data(as_text=True)
# Dave can now list users (user.read) but cannot create a group (group.create).
dave_access = _login(client, dave_email, "DavePass1234!")
can_read = client.get(
"/api/v1/users", headers={"Authorization": f"Bearer {dave_access}"}
)
assert can_read.status_code == 200
cannot_create_group = client.post(
"/api/v1/groups",
headers={"Authorization": f"Bearer {dave_access}"},
json={"name": "wont-happen", "description": None},
)
assert cannot_create_group.status_code == 403
def test_last_admin_cannot_lose_admin_membership(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
r = client.put(
f"/api/v1/users/{admin['user_id']}/groups",
headers={"Authorization": f"Bearer {access}"},
json={"group_ids": []},
)
assert r.status_code == 409
assert r.get_json()["error"] == "last_admin_protected"
# -- Permission enforcement ----------------------------------------------------
def test_non_admin_without_user_read_gets_403_on_users_list(client):
admin = pytest.shared_admin
access = _login(client, admin["email"], admin["password"])
# Invite Eve with no groups → no perms.
eve_email = _unique_email("eve")
_invite_user(client, access, eve_email, "EvePass1234!", group_ids=[])
eve_access = _login(client, eve_email, "EvePass1234!")
r = client.get("/api/v1/users", headers={"Authorization": f"Bearer {eve_access}"})
assert r.status_code == 403

View File

@@ -0,0 +1,234 @@
"""M1 schema integration test.
Asserts that the migration has produced the expected tables, FK relations,
CHECK constraints, partial indexes, and that the alembic_version row is at head.
Skips automatically when Postgres is unreachable.
"""
from __future__ import annotations
import pytest
from sqlalchemy import inspect, text
EXPECTED_TABLES = {
# Auth / RBAC
"users",
"groups",
"permissions",
"user_groups",
"group_permissions",
"invitations",
"invitation_groups",
"refresh_tokens",
# MITRE
"mitre_tactics",
"mitre_techniques",
"mitre_subtechniques",
"mitre_technique_tactics",
# Templates
"test_templates",
"test_template_mitre_tags",
"scenario_templates",
"scenario_template_tests",
# Missions
"missions",
"mission_members",
"mission_scenarios",
"mission_tests",
"mission_test_mitre_tags",
"mission_categories",
# Evidence / settings / notifications
"evidence_files",
"settings",
"detection_levels",
"notifications",
# Alembic bookkeeping
"alembic_version",
}
# Tables that MUST carry a `deleted_at` column (soft delete).
SOFT_DELETE_TABLES = {
"users",
"groups",
"test_templates",
"scenario_templates",
"missions",
"mission_scenarios",
"mission_tests",
"mission_categories",
"evidence_files",
}
# Tables that MUST carry the standard `created_at` + `updated_at` pair.
TIMESTAMP_TABLES = {
"users",
"groups",
"test_templates",
"scenario_templates",
"missions",
"mission_scenarios",
"mission_tests",
"mission_categories",
"evidence_files",
}
# Spot-checked FK pairs (child_table, child_col, parent_table).
EXPECTED_FKS = {
("evidence_files", "mission_test_id", "mission_tests"),
("evidence_files", "uploaded_by_user_id", "users"),
("mission_members", "mission_id", "missions"),
("mission_members", "user_id", "users"),
("mission_scenarios", "mission_id", "missions"),
("mission_tests", "scenario_id", "mission_scenarios"),
("mission_tests", "detection_level_id", "detection_levels"),
("group_permissions", "group_id", "groups"),
("group_permissions", "permission_id", "permissions"),
("user_groups", "user_id", "users"),
("user_groups", "group_id", "groups"),
("refresh_tokens", "user_id", "users"),
("notifications", "user_id", "users"),
("mitre_subtechniques", "technique_id", "mitre_techniques"),
}
# CHECK constraint names we expect to see (namespace 'public' only).
# `mission_test_mitre_tags` deliberately lacks the exactly_one_mitre_fk check
# because it is denormalised — see app/models/mission.py docstring.
EXPECTED_CHECKS = {
"ck_missions_status_valid",
"ck_missions_visibility_mode_valid",
"ck_mission_tests_state_valid",
"ck_mission_tests_snapshot_opsec_level_valid",
"ck_test_templates_opsec_level_valid",
"ck_mission_members_role_hint_valid",
"ck_test_template_mitre_tags_mitre_kind_valid",
"ck_test_template_mitre_tags_exactly_one_mitre_fk",
"ck_mission_test_mitre_tags_mitre_kind_valid",
}
@pytest.fixture(scope="module")
def insp(db_engine_or_skip):
return inspect(db_engine_or_skip)
def test_all_expected_tables_exist(insp):
actual = set(insp.get_table_names(schema="public"))
missing = EXPECTED_TABLES - actual
assert not missing, f"missing tables: {sorted(missing)}"
def test_soft_delete_columns_present(insp):
for tbl in sorted(SOFT_DELETE_TABLES):
cols = {c["name"] for c in insp.get_columns(tbl)}
assert "deleted_at" in cols, f"{tbl} missing deleted_at"
def test_standard_timestamp_columns_present(insp):
for tbl in sorted(TIMESTAMP_TABLES):
cols = {c["name"] for c in insp.get_columns(tbl)}
assert "created_at" in cols, f"{tbl} missing created_at"
assert "updated_at" in cols, f"{tbl} missing updated_at"
def test_partial_index_for_soft_delete(db_engine_or_skip):
"""Each soft-delete table must carry an `ix_<table>_active` partial index."""
with db_engine_or_skip.connect() as conn:
rows = conn.execute(
text(
"SELECT indexname FROM pg_indexes "
"WHERE schemaname='public' AND indexdef ILIKE '%deleted_at IS NULL%'"
)
).all()
names = {r[0] for r in rows}
for tbl in SOFT_DELETE_TABLES:
assert f"ix_{tbl}_active" in names, f"{tbl}: partial index missing — got {names}"
def test_expected_foreign_keys(insp):
all_fks = set()
for tbl in EXPECTED_TABLES:
if tbl == "alembic_version":
continue
for fk in insp.get_foreign_keys(tbl):
for col in fk["constrained_columns"]:
all_fks.add((tbl, col, fk["referred_table"]))
for triple in EXPECTED_FKS:
assert triple in all_fks, f"missing FK: {triple}"
def test_expected_check_constraints(db_engine_or_skip):
with db_engine_or_skip.connect() as conn:
rows = conn.execute(
text(
"SELECT conname FROM pg_constraint "
"WHERE contype='c' AND connamespace = 'public'::regnamespace"
)
).all()
names = {r[0] for r in rows}
missing = EXPECTED_CHECKS - names
assert not missing, f"missing CHECK constraints: {sorted(missing)}"
def test_alembic_at_head(db_engine_or_skip):
"""The DB must be at the latest migration after `make migrate`."""
with db_engine_or_skip.connect() as conn:
rev = conn.execute(text("SELECT version_num FROM alembic_version")).scalar_one()
assert rev, "alembic_version is empty — migrate didn't run"
assert len(rev) >= 8, f"unexpected alembic version: {rev}"
def test_exactly_one_mitre_fk_check_enforced(db_engine_or_skip):
"""Inserting a tag with two non-null FKs must raise (CHECK constraint)."""
import uuid
from sqlalchemy.exc import IntegrityError
with db_engine_or_skip.begin() as conn:
# Seed a minimal test_template + tactic + technique to reference.
tmpl_id = uuid.uuid4()
tactic_id = uuid.uuid4()
technique_id = uuid.uuid4()
conn.execute(
text(
"INSERT INTO test_templates (id, name, opsec_level) "
"VALUES (:id, 'tmp', 'low')"
),
{"id": tmpl_id},
)
conn.execute(
text(
"INSERT INTO mitre_tactics (id, external_id, short_name, name) "
"VALUES (:id, 'TA0099', 'tmp', 'tmp')"
),
{"id": tactic_id},
)
conn.execute(
text(
"INSERT INTO mitre_techniques (id, external_id, name) "
"VALUES (:id, 'T9999', 'tmp')"
),
{"id": technique_id},
)
# Now try to insert a violating row — must fail.
with pytest.raises(IntegrityError):
with db_engine_or_skip.begin() as conn:
conn.execute(
text(
"INSERT INTO test_template_mitre_tags "
"(id, test_template_id, mitre_kind, tactic_id, technique_id) "
"VALUES (:id, :tmpl, 'tactic', :tac, :tech)"
),
{
"id": uuid.uuid4(),
"tmpl": tmpl_id,
"tac": tactic_id,
"tech": technique_id,
},
)
# Cleanup so the test is rerunnable.
with db_engine_or_skip.begin() as conn:
conn.execute(text("DELETE FROM mitre_techniques WHERE id = :id"), {"id": technique_id})
conn.execute(text("DELETE FROM mitre_tactics WHERE id = :id"), {"id": tactic_id})
conn.execute(text("DELETE FROM test_templates WHERE id = :id"), {"id": tmpl_id})