Files

345 lines
12 KiB
Python
Raw Permalink Normal View History

feat(m3): RBAC — atomic perms, groups, users, admin SPA pages Permission catalogue (services/permissions_seed.py) - 31 atomic codes across 10 families: user.*, group.*, invitation.*, test_template.*, scenario_template.*, mission.* (incl. mission.write_red_fields + mission.write_blue_fields), detection_level.{read,update}, setting.{read,update}, mitre.sync. - Default bindings: admin = all 31; redteam = 8 (catalogue read + mission. {read,create,update,archive,write_red_fields} + detection_level.read); blueteam = 5 (catalogue read + mission.{read,write_blue_fields} + detection_level.read). - Seed runs at boot AND after /setup so a freshly truncated DB (via /diag/reset) gets the bindings back via the bootstrap path. Idempotent + additive (never removes a perm from a system group). Users admin (services/users.py + api/users.py) - list (q + is_active filter + pagination), get, patch (display_name / locale / is_active with tri-state sentinel for clear-vs-unset), soft-delete, set groups. - Last-admin protection on update (deactivate), delete, and group-strip (refusing to remove the admin group from the last active admin). Groups admin (services/groups.py + api/groups.py) - Full CRUD with system-group protection (no rename, no delete on admin/redteam/blueteam). - PUT /groups/{id}/permissions sets the perm list. - Admin system group's perm set is locked to the full catalogue (SystemGroupProtected → 409) — preserves the bypass invariant even if a future refactor moves to perm-based checks. Permissions read-only (api/permissions.py) - GET /permissions returns the catalogue (admin or group.read holders). /diag/reset extension - After truncate + token mint, the limiter is also reset (limiter.reset()) so the Playwright suite doesn't hit 10/min budgets across spec files. Guarded by limiter.enabled to no-op in APP_ENV=test. Rate-limit scope (core/rate_limit.py) - enabled = APP_ENV in ("prod", "staging"). A staging deployment serves humans, so it gets the limits too. 
Dev/test stay unthrottled for Playwright ergonomics. Spec §6 NF-security is an operator-facing requirement. Frontend chrome - components/RequireAdmin.tsx + ui/Modal.tsx (reusable centered dialog with accessible name + Escape + backdrop-click). - Layout.tsx shows Admin nav links only when is_admin === true. Server remains the arbiter — non-admins hitting /admin/* get redirected to /. Frontend pages - pages/AdminUsersPage.tsx, AdminGroupsPage.tsx, AdminInvitationsPage.tsx with edit modals using TanStack Query mutations + multi-select for perms grouped by family + copy-once invitation URL display. - lib/admin.ts: shared types + query keys + groupPermsByFamily helper. - lib/api.ts: apiPatch / apiPut / apiDelete added. Playwright config (e2e/playwright.config.ts) - workers: 1 + fullyParallel: false: spec files share the live Postgres, so concurrent /diag/reset calls clobber each other. Intra-file order preserved via test.describe.configure({ mode: 'serial' }). Testing - backend/tests/test_rbac.py: 15 integration tests (39 backend total — 1 health + 8 schema + 15 auth + 15 RBAC). - e2e/tests/m3-rbac.spec.ts: 8 Playwright tests covering DoD §10 #2/#3 (28 e2e total — 8 M0 + 4 M1 + 8 M2 + 8 M3). - tasks/testing-m3.md. DoD: make test-api → 39 passed, make e2e → 28 passed. Spec-reviewer pass applied (admin perm invariant + staging rate-limit scope). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 06:17:07 +02:00
"""Integration tests for M3: permission seed + users/groups/permissions APIs.
Exercises the Flask test client against a live Postgres. The DB is wiped once,
when the module-scoped `app` fixture is first set up, and tests share state via
`pytest.shared_*`, so test ordering inside the module matters.
"""
from __future__ import annotations
import secrets
import pytest
from sqlalchemy import text
from app.core.install_token import regenerate_install_token
from app.main import create_app
from app.services.permissions_seed import PERMISSION_CATALOGUE
def _truncate_all(engine):
    """Empty the auth/RBAC tables, the permissions catalogue included.

    Issues one ``TRUNCATE ... RESTART IDENTITY CASCADE`` statement inside an
    ``engine.begin()`` block; CASCADE takes care of dependent rows.
    """
    statement = text(
        "TRUNCATE users, refresh_tokens, invitations, invitation_groups, "
        "user_groups, group_permissions, permissions, settings, groups "
        "RESTART IDENTITY CASCADE"
    )
    with engine.begin() as connection:
        connection.execute(statement)
@pytest.fixture(scope="module")
def app(db_engine_or_skip):
    """Module-scoped Flask app created on top of a freshly truncated database."""
    _truncate_all(db_engine_or_skip)
    # Building the app triggers bootstrap → seed_all(), so the truncated
    # tables are re-seeded before the app is handed to the tests.
    application = create_app()
    application.config.update({"TESTING": True})
    return application
@pytest.fixture()
def client(app):
    """Function-scoped test client obtained from the shared ``app`` fixture."""
    test_client = app.test_client()
    return test_client
def _unique_email(prefix: str) -> str:
return f"{prefix}-{secrets.token_hex(4)}@metamorph.local"
def _login(client, email: str, password: str) -> str:
r = client.post("/api/v1/auth/login", json={"email": email, "password": password})
assert r.status_code == 200, r.get_data(as_text=True)
return r.get_json()["access_token"]
# -- M3.1 — Permissions seeded at boot -----------------------------------------
def test_permissions_catalogue_seeded(client):
    """Catalogue table has every code from PERMISSION_CATALOGUE.

    Side effect: bootstraps the first admin via ``/setup`` and stores its
    credentials on ``pytest.shared_admin``; the later tests in this module
    read that attribute, so this test has to run first.
    """
    password = "AdminPass1234!"
    # We need an admin to call /permissions — bootstrap one via /setup.
    token = regenerate_install_token()
    email = _unique_email("admin")
    r = client.post(
        "/api/v1/setup",
        json={
            "install_token": token,
            "email": email,
            "password": password,
            "display_name": "Admin",
        },
    )
    assert r.status_code == 201, r.get_data(as_text=True)
    pytest.shared_admin = {  # type: ignore[attr-defined]
        "email": email,
        "password": password,
        "user_id": r.get_json()["user_id"],
    }
    access = _login(client, email, password)
    resp = client.get(
        "/api/v1/permissions", headers={"Authorization": f"Bearer {access}"}
    )
    # Check the status first so an auth/route failure is reported as such
    # instead of as a KeyError on the parsed body.
    assert resp.status_code == 200, resp.get_data(as_text=True)
    codes = {p["code"] for p in resp.get_json()["items"]}
    expected = {p.code for p in PERMISSION_CATALOGUE}
    missing = expected - codes
    assert not missing, f"permissions missing from catalogue: {sorted(missing)}"
def test_admin_group_has_every_permission(client):
    """The ``admin`` group is a system group bound to the whole catalogue."""
    creds = pytest.shared_admin
    token = _login(client, creds["email"], creds["password"])
    auth_header = {"Authorization": f"Bearer {token}"}
    listing = client.get("/api/v1/groups", headers=auth_header).get_json()
    admin_group = next(
        filter(lambda grp: grp["name"] == "admin", listing["items"])
    )
    granted = set(admin_group["permissions"])
    full_catalogue = {perm.code for perm in PERMISSION_CATALOGUE}
    assert granted == full_catalogue
    assert admin_group["is_system"] is True
def test_redteam_group_has_red_perms_but_not_blue_write(client):
    """``redteam`` may create missions and write red fields, but not blue ones."""
    creds = pytest.shared_admin
    bearer = _login(client, creds["email"], creds["password"])
    response = client.get(
        "/api/v1/groups", headers={"Authorization": f"Bearer {bearer}"}
    )
    payload = response.get_json()
    redteam_perms = next(
        grp for grp in payload["items"] if grp["name"] == "redteam"
    )["permissions"]
    assert "mission.write_red_fields" in redteam_perms
    assert "mission.write_blue_fields" not in redteam_perms
    assert "mission.create" in redteam_perms
def test_blueteam_group_has_blue_perm_but_not_red_write(client):
    """``blueteam`` may write blue fields only: no red write, no mission create."""
    creds = pytest.shared_admin
    bearer = _login(client, creds["email"], creds["password"])
    response = client.get(
        "/api/v1/groups", headers={"Authorization": f"Bearer {bearer}"}
    )
    payload = response.get_json()
    blueteam_perms = next(
        grp for grp in payload["items"] if grp["name"] == "blueteam"
    )["permissions"]
    assert "mission.write_blue_fields" in blueteam_perms
    assert "mission.write_red_fields" not in blueteam_perms
    assert "mission.create" not in blueteam_perms
# -- M3.2 — Users CRUD ---------------------------------------------------------
def _invite_user(client, admin_access: str, email: str, password: str, group_ids: list[str] | None = None) -> str:
"""Create + accept an invitation, return the new user's id."""
create = client.post(
"/api/v1/invitations",
headers={"Authorization": f"Bearer {admin_access}"},
json={"email_hint": email, "group_ids": group_ids or []},
)
assert create.status_code == 201, create.get_data(as_text=True)
token = create.get_json()["token"]
accept = client.post(
f"/api/v1/invitations/accept/{token}",
json={"email": email, "password": password, "display_name": email.split("@")[0]},
)
assert accept.status_code == 201, accept.get_data(as_text=True)
return accept.get_json()["user_id"]
def test_admin_lists_users(client):
    """GET /users as admin answers 200 and lists the admin's own account."""
    creds = pytest.shared_admin
    token = _login(client, creds["email"], creds["password"])
    response = client.get(
        "/api/v1/users", headers={"Authorization": f"Bearer {token}"}
    )
    assert response.status_code == 200
    payload = response.get_json()
    assert payload["total"] >= 1
    assert any(user["email"] == creds["email"] for user in payload["items"])
def test_admin_updates_a_user(client):
    """PATCH /users/{id} as admin echoes the updated display name and locale."""
    creds = pytest.shared_admin
    token = _login(client, creds["email"], creds["password"])
    invitee_email = _unique_email("bob")
    invitee_id = _invite_user(client, token, invitee_email, "BobPass1234!")

    changes = {"display_name": "Robert", "locale": "en"}
    response = client.patch(
        f"/api/v1/users/{invitee_id}",
        headers={"Authorization": f"Bearer {token}"},
        json=changes,
    )
    assert response.status_code == 200, response.get_data(as_text=True)
    updated = response.get_json()
    # Every field that was sent must come back with the new value.
    for field, value in changes.items():
        assert updated[field] == value
def test_admin_soft_deletes_a_user(client):
    """DELETE /users/{id} answers 200 and hides the user from the default list."""
    creds = pytest.shared_admin
    token = _login(client, creds["email"], creds["password"])
    auth_header = {"Authorization": f"Bearer {token}"}
    ghost_email = _unique_email("ghost")
    ghost_id = _invite_user(client, token, ghost_email, "GhostPass1234!")

    response = client.delete(f"/api/v1/users/{ghost_id}", headers=auth_header)
    assert response.status_code == 200, response.get_data(as_text=True)

    # Listing must not return the deleted user by default.
    listing = client.get("/api/v1/users", headers=auth_header).get_json()
    listed_emails = {user["email"] for user in listing["items"]}
    assert ghost_email not in listed_emails
def test_last_admin_cannot_be_deleted(client):
    """Deleting the only admin is refused with 409 ``last_admin_protected``."""
    creds = pytest.shared_admin
    token = _login(client, creds["email"], creds["password"])
    own_url = f"/api/v1/users/{creds['user_id']}"
    response = client.delete(
        own_url, headers={"Authorization": f"Bearer {token}"}
    )
    assert response.status_code == 409
    error_code = response.get_json()["error"]
    assert error_code == "last_admin_protected"
def test_last_admin_cannot_be_deactivated(client):
    """Setting ``is_active`` to False on the only admin is refused with 409."""
    creds = pytest.shared_admin
    token = _login(client, creds["email"], creds["password"])
    own_url = f"/api/v1/users/{creds['user_id']}"
    response = client.patch(
        own_url,
        headers={"Authorization": f"Bearer {token}"},
        json={"is_active": False},
    )
    assert response.status_code == 409
    error_code = response.get_json()["error"]
    assert error_code == "last_admin_protected"
# -- M3.3 — Groups CRUD --------------------------------------------------------
def test_admin_creates_custom_group_and_assigns_perms(client):
    """An admin can create a group and set its permission list via PUT."""
    creds = pytest.shared_admin
    token = _login(client, creds["email"], creds["password"])
    auth_header = {"Authorization": f"Bearer {token}"}

    # Step 1: create a uniquely named group.
    group_name = f"pentest-{secrets.token_hex(3)}"
    created = client.post(
        "/api/v1/groups",
        headers=auth_header,
        json={"name": group_name, "description": "Test group"},
    )
    assert created.status_code == 201, created.get_data(as_text=True)
    group_id = created.get_json()["id"]

    # Step 2: attach mission.read + mission.write_red_fields only.
    wanted = ["mission.read", "mission.write_red_fields"]
    response = client.put(
        f"/api/v1/groups/{group_id}/permissions",
        headers=auth_header,
        json={"codes": wanted},
    )
    assert response.status_code == 200, response.get_data(as_text=True)
    assert set(response.get_json()["permissions"]) == set(wanted)
def test_system_group_cannot_be_renamed_or_deleted(client):
    """Renaming or deleting ``admin`` is refused with 409 ``system_group_protected``."""
    creds = pytest.shared_admin
    token = _login(client, creds["email"], creds["password"])
    auth_header = {"Authorization": f"Bearer {token}"}

    listing = client.get("/api/v1/groups", headers=auth_header).get_json()
    admin_group = next(
        grp for grp in listing["items"] if grp["name"] == "admin"
    )
    group_url = f"/api/v1/groups/{admin_group['id']}"

    rename = client.patch(
        group_url, headers=auth_header, json={"name": "superadmin"}
    )
    assert rename.status_code == 409
    assert rename.get_json()["error"] == "system_group_protected"

    delete = client.delete(group_url, headers=auth_header)
    assert delete.status_code == 409
    assert delete.get_json()["error"] == "system_group_protected"
def test_setting_unknown_permission_code_returns_400(client):
    """PUT /groups/{id}/permissions with an unknown code is rejected with 400."""
    admin = pytest.shared_admin
    access = _login(client, admin["email"], admin["password"])
    auth_header = {"Authorization": f"Bearer {access}"}
    create = client.post(
        "/api/v1/groups",
        headers=auth_header,
        json={"name": f"bad-perms-{secrets.token_hex(3)}", "description": None},
    )
    # Fail here, with the response body, if the setup step itself broke —
    # otherwise the ["id"] lookup below would raise an unhelpful KeyError.
    assert create.status_code == 201, create.get_data(as_text=True)
    gid = create.get_json()["id"]
    r = client.put(
        f"/api/v1/groups/{gid}/permissions",
        headers=auth_header,
        json={"codes": ["bogus.permission"]},
    )
    assert r.status_code == 400, r.get_data(as_text=True)
# -- M3 user ↔ group assignment ------------------------------------------------
def test_admin_assigns_user_to_custom_group_and_perms_apply(client):
    """Membership in a custom group grants exactly that group's permissions.

    Flow: create a group holding only ``user.read``, invite Dave, attach him
    to the group, then check that Dave can list users but gets 403 when
    trying to create a group.
    """
    admin = pytest.shared_admin
    access = _login(client, admin["email"], admin["password"])
    admin_auth = {"Authorization": f"Bearer {access}"}

    # Create a custom group that *only* grants user.read.
    gname = f"readers-{secrets.token_hex(3)}"
    created = client.post(
        "/api/v1/groups",
        headers=admin_auth,
        json={"name": gname, "description": None},
    )
    assert created.status_code == 201, created.get_data(as_text=True)
    group = created.get_json()
    granted = client.put(
        f"/api/v1/groups/{group['id']}/permissions",
        headers=admin_auth,
        json={"codes": ["user.read"]},
    )
    # Without this check a failed grant would only show up later as a
    # confusing 403 on Dave's /users call.
    assert granted.status_code == 200, granted.get_data(as_text=True)

    # Invite Dave, attach the new group via /users/{id}/groups.
    dave_email = _unique_email("dave")
    dave_id = _invite_user(client, access, dave_email, "DavePass1234!")
    r = client.put(
        f"/api/v1/users/{dave_id}/groups",
        headers=admin_auth,
        json={"group_ids": [group["id"]]},
    )
    assert r.status_code == 200, r.get_data(as_text=True)

    # Dave can now list users (user.read) but cannot create a group (group.create).
    dave_access = _login(client, dave_email, "DavePass1234!")
    dave_auth = {"Authorization": f"Bearer {dave_access}"}
    can_read = client.get("/api/v1/users", headers=dave_auth)
    assert can_read.status_code == 200, can_read.get_data(as_text=True)
    cannot_create_group = client.post(
        "/api/v1/groups",
        headers=dave_auth,
        json={"name": "wont-happen", "description": None},
    )
    assert cannot_create_group.status_code == 403
def test_last_admin_cannot_lose_admin_membership(client):
    """Clearing the only admin's groups is refused with 409 ``last_admin_protected``."""
    creds = pytest.shared_admin
    token = _login(client, creds["email"], creds["password"])
    groups_url = f"/api/v1/users/{creds['user_id']}/groups"
    response = client.put(
        groups_url,
        headers={"Authorization": f"Bearer {token}"},
        json={"group_ids": []},
    )
    assert response.status_code == 409
    error_code = response.get_json()["error"]
    assert error_code == "last_admin_protected"
# -- Permission enforcement ----------------------------------------------------
def test_non_admin_without_user_read_gets_403_on_users_list(client):
    """A user invited with no groups gets 403 on GET /users."""
    creds = pytest.shared_admin
    admin_token = _login(client, creds["email"], creds["password"])

    # Invite Eve with no groups → no perms.
    eve_email = _unique_email("eve")
    eve_password = "EvePass1234!"
    _invite_user(client, admin_token, eve_email, eve_password, group_ids=[])

    eve_token = _login(client, eve_email, eve_password)
    eve_auth = {"Authorization": f"Bearer {eve_token}"}
    response = client.get("/api/v1/users", headers=eve_auth)
    assert response.status_code == 403