"""Integration tests for M3: permission seed + users/groups/permissions APIs. Exercises the Flask test client against a live Postgres. The DB is wiped at module load so test ordering inside the module matters (see `pytest.shared_*`). """ from __future__ import annotations import secrets import pytest from sqlalchemy import text from app.core.install_token import regenerate_install_token from app.main import create_app from app.services.permissions_seed import PERMISSION_CATALOGUE def _truncate_all(engine): """Wipe data plus permissions table. CASCADE handles dependent rows.""" with engine.begin() as conn: conn.execute( text( "TRUNCATE users, refresh_tokens, invitations, invitation_groups, " "user_groups, group_permissions, permissions, settings, groups " "RESTART IDENTITY CASCADE" ) ) @pytest.fixture(scope="module") def app(db_engine_or_skip): _truncate_all(db_engine_or_skip) flask_app = create_app() # triggers bootstrap → seed_all() flask_app.config.update(TESTING=True) return flask_app @pytest.fixture() def client(app): return app.test_client() def _unique_email(prefix: str) -> str: return f"{prefix}-{secrets.token_hex(4)}@metamorph.local" def _login(client, email: str, password: str) -> str: r = client.post("/api/v1/auth/login", json={"email": email, "password": password}) assert r.status_code == 200, r.get_data(as_text=True) return r.get_json()["access_token"] # -- M3.1 — Permissions seeded at boot ----------------------------------------- def test_permissions_catalogue_seeded(client): """Catalogue table has every code from PERMISSION_CATALOGUE.""" # We need an admin to call /permissions — bootstrap one via /setup. token = regenerate_install_token() email = _unique_email("admin") r = client.post( "/api/v1/setup", json={ "install_token": token, "email": email, "password": "AdminPass1234!", "display_name": "Admin", }, ) assert r.status_code == 201, r.get_data(as_text=True) pytest.shared_admin = {"email": email, "password": "AdminPass1234!", "user_id": r.get_json()["user_id"]} # type: ignore[attr-defined] access = _login(client, email, "AdminPass1234!") perms = client.get( "/api/v1/permissions", headers={"Authorization": f"Bearer {access}"} ).get_json() codes = {p["code"] for p in perms["items"]} expected = {p.code for p in PERMISSION_CATALOGUE} assert expected.issubset(codes) def test_admin_group_has_every_permission(client): admin = pytest.shared_admin access = _login(client, admin["email"], admin["password"]) groups = client.get( "/api/v1/groups", headers={"Authorization": f"Bearer {access}"} ).get_json() admin_group = next(g for g in groups["items"] if g["name"] == "admin") assert set(admin_group["permissions"]) == {p.code for p in PERMISSION_CATALOGUE} assert admin_group["is_system"] is True def test_redteam_group_has_red_perms_but_not_blue_write(client): admin = pytest.shared_admin access = _login(client, admin["email"], admin["password"]) groups = client.get( "/api/v1/groups", headers={"Authorization": f"Bearer {access}"} ).get_json() redteam = next(g for g in groups["items"] if g["name"] == "redteam") assert "mission.write_red_fields" in redteam["permissions"] assert "mission.write_blue_fields" not in redteam["permissions"] assert "mission.create" in redteam["permissions"] def test_blueteam_group_has_blue_perm_but_not_red_write(client): admin = pytest.shared_admin access = _login(client, admin["email"], admin["password"]) groups = client.get( "/api/v1/groups", headers={"Authorization": f"Bearer {access}"} ).get_json() blueteam = next(g for g in groups["items"] if g["name"] == "blueteam") assert "mission.write_blue_fields" in blueteam["permissions"] assert "mission.write_red_fields" not in blueteam["permissions"] assert "mission.create" not in blueteam["permissions"] # -- M3.2 — Users CRUD --------------------------------------------------------- def _invite_user(client, admin_access: str, email: str, password: str, group_ids: list[str] | None = None) -> str: """Create + accept an invitation, return the new user's id.""" create = client.post( "/api/v1/invitations", headers={"Authorization": f"Bearer {admin_access}"}, json={"email_hint": email, "group_ids": group_ids or []}, ) assert create.status_code == 201, create.get_data(as_text=True) token = create.get_json()["token"] accept = client.post( f"/api/v1/invitations/accept/{token}", json={"email": email, "password": password, "display_name": email.split("@")[0]}, ) assert accept.status_code == 201, accept.get_data(as_text=True) return accept.get_json()["user_id"] def test_admin_lists_users(client): admin = pytest.shared_admin access = _login(client, admin["email"], admin["password"]) r = client.get("/api/v1/users", headers={"Authorization": f"Bearer {access}"}) assert r.status_code == 200 body = r.get_json() assert body["total"] >= 1 emails = [u["email"] for u in body["items"]] assert admin["email"] in emails def test_admin_updates_a_user(client): admin = pytest.shared_admin access = _login(client, admin["email"], admin["password"]) bob_email = _unique_email("bob") bob_id = _invite_user(client, access, bob_email, "BobPass1234!") r = client.patch( f"/api/v1/users/{bob_id}", headers={"Authorization": f"Bearer {access}"}, json={"display_name": "Robert", "locale": "en"}, ) assert r.status_code == 200, r.get_data(as_text=True) body = r.get_json() assert body["display_name"] == "Robert" assert body["locale"] == "en" def test_admin_soft_deletes_a_user(client): admin = pytest.shared_admin access = _login(client, admin["email"], admin["password"]) target_email = _unique_email("ghost") target_id = _invite_user(client, access, target_email, "GhostPass1234!") r = client.delete( f"/api/v1/users/{target_id}", headers={"Authorization": f"Bearer {access}"}, ) assert r.status_code == 200, r.get_data(as_text=True) # Listing must not return the deleted user by default. listing = client.get( "/api/v1/users", headers={"Authorization": f"Bearer {access}"} ).get_json() assert target_email not in [u["email"] for u in listing["items"]] def test_last_admin_cannot_be_deleted(client): admin = pytest.shared_admin access = _login(client, admin["email"], admin["password"]) r = client.delete( f"/api/v1/users/{admin['user_id']}", headers={"Authorization": f"Bearer {access}"}, ) assert r.status_code == 409 assert r.get_json()["error"] == "last_admin_protected" def test_last_admin_cannot_be_deactivated(client): admin = pytest.shared_admin access = _login(client, admin["email"], admin["password"]) r = client.patch( f"/api/v1/users/{admin['user_id']}", headers={"Authorization": f"Bearer {access}"}, json={"is_active": False}, ) assert r.status_code == 409 assert r.get_json()["error"] == "last_admin_protected" # -- M3.3 — Groups CRUD -------------------------------------------------------- def test_admin_creates_custom_group_and_assigns_perms(client): admin = pytest.shared_admin access = _login(client, admin["email"], admin["password"]) # Create the group create = client.post( "/api/v1/groups", headers={"Authorization": f"Bearer {access}"}, json={"name": f"pentest-{secrets.token_hex(3)}", "description": "Test group"}, ) assert create.status_code == 201, create.get_data(as_text=True) gid = create.get_json()["id"] # Attach mission.read + mission.write_red_fields only r = client.put( f"/api/v1/groups/{gid}/permissions", headers={"Authorization": f"Bearer {access}"}, json={"codes": ["mission.read", "mission.write_red_fields"]}, ) assert r.status_code == 200, r.get_data(as_text=True) assert set(r.get_json()["permissions"]) == {"mission.read", "mission.write_red_fields"} def test_system_group_cannot_be_renamed_or_deleted(client): admin = pytest.shared_admin access = _login(client, admin["email"], admin["password"]) groups = client.get( "/api/v1/groups", headers={"Authorization": f"Bearer {access}"} ).get_json() admin_group = next(g for g in groups["items"] if g["name"] == "admin") rename = client.patch( f"/api/v1/groups/{admin_group['id']}", headers={"Authorization": f"Bearer {access}"}, json={"name": "superadmin"}, ) assert rename.status_code == 409 assert rename.get_json()["error"] == "system_group_protected" delete = client.delete( f"/api/v1/groups/{admin_group['id']}", headers={"Authorization": f"Bearer {access}"}, ) assert delete.status_code == 409 assert delete.get_json()["error"] == "system_group_protected" def test_setting_unknown_permission_code_returns_400(client): admin = pytest.shared_admin access = _login(client, admin["email"], admin["password"]) create = client.post( "/api/v1/groups", headers={"Authorization": f"Bearer {access}"}, json={"name": f"bad-perms-{secrets.token_hex(3)}", "description": None}, ) gid = create.get_json()["id"] r = client.put( f"/api/v1/groups/{gid}/permissions", headers={"Authorization": f"Bearer {access}"}, json={"codes": ["bogus.permission"]}, ) assert r.status_code == 400 # -- M3 user ↔ group assignment ------------------------------------------------ def test_admin_assigns_user_to_custom_group_and_perms_apply(client): admin = pytest.shared_admin access = _login(client, admin["email"], admin["password"]) # Create a custom group that *only* grants user.read. gname = f"readers-{secrets.token_hex(3)}" group = client.post( "/api/v1/groups", headers={"Authorization": f"Bearer {access}"}, json={"name": gname, "description": None}, ).get_json() client.put( f"/api/v1/groups/{group['id']}/permissions", headers={"Authorization": f"Bearer {access}"}, json={"codes": ["user.read"]}, ) # Invite Dave, attach the new group via /users/{id}/groups. dave_email = _unique_email("dave") dave_id = _invite_user(client, access, dave_email, "DavePass1234!") r = client.put( f"/api/v1/users/{dave_id}/groups", headers={"Authorization": f"Bearer {access}"}, json={"group_ids": [group["id"]]}, ) assert r.status_code == 200, r.get_data(as_text=True) # Dave can now list users (user.read) but cannot create a group (group.create). dave_access = _login(client, dave_email, "DavePass1234!") can_read = client.get( "/api/v1/users", headers={"Authorization": f"Bearer {dave_access}"} ) assert can_read.status_code == 200 cannot_create_group = client.post( "/api/v1/groups", headers={"Authorization": f"Bearer {dave_access}"}, json={"name": "wont-happen", "description": None}, ) assert cannot_create_group.status_code == 403 def test_last_admin_cannot_lose_admin_membership(client): admin = pytest.shared_admin access = _login(client, admin["email"], admin["password"]) r = client.put( f"/api/v1/users/{admin['user_id']}/groups", headers={"Authorization": f"Bearer {access}"}, json={"group_ids": []}, ) assert r.status_code == 409 assert r.get_json()["error"] == "last_admin_protected" # -- Permission enforcement ---------------------------------------------------- def test_non_admin_without_user_read_gets_403_on_users_list(client): admin = pytest.shared_admin access = _login(client, admin["email"], admin["password"]) # Invite Eve with no groups → no perms. eve_email = _unique_email("eve") _invite_user(client, access, eve_email, "EvePass1234!", group_ids=[]) eve_access = _login(client, eve_email, "EvePass1234!") r = client.get("/api/v1/users", headers={"Authorization": f"Bearer {eve_access}"}) assert r.status_code == 403