"""End-to-end auth flow integration test (live DB). Hits the Flask test client to exercise: - /setup with the install token - /auth/login + /auth/me - /auth/refresh (rotation) - /auth/logout (revocation, idempotency) - /auth/change-password (forces logout-all) - /invitations create + preview + accept - RBAC: non-admin gets 403 on admin endpoint The DB schema is left in place between tests; we use unique emails to avoid collisions across runs. The install token is force-minted at the start. """ from __future__ import annotations import json import secrets import uuid import pytest from sqlalchemy import text from app.core.install_token import regenerate_install_token from app.db.session import get_engine from app.main import create_app def _truncate_users(engine): """Wipe data so /setup has work to do. CASCADE handles dependent rows.""" with engine.begin() as conn: conn.execute( text( "TRUNCATE users, refresh_tokens, invitations, invitation_groups, " "user_groups, settings, groups RESTART IDENTITY CASCADE" ) ) @pytest.fixture(scope="module") def app(db_engine_or_skip): _truncate_users(db_engine_or_skip) flask_app = create_app() flask_app.config.update(TESTING=True) return flask_app @pytest.fixture() def client(app): return app.test_client() @pytest.fixture(scope="module") def install_token(db_engine_or_skip): return regenerate_install_token() def _unique_email(prefix: str) -> str: return f"{prefix}-{secrets.token_hex(4)}@metamorph.local" # -- /setup ------------------------------------------------------------------- def test_setup_status_starts_uncompleted(client): r = client.get("/api/v1/setup") assert r.status_code == 200 assert r.get_json()["completed"] is False def test_setup_creates_first_admin(client, install_token): email = _unique_email("admin") r = client.post( "/api/v1/setup", json={ "install_token": install_token, "email": email, "password": "AdminPass1234!", "display_name": "Init Admin", }, ) assert r.status_code == 201, r.get_data(as_text=True) body = r.get_json() assert "user_id" in body pytest.shared_admin = {"email": email, "password": "AdminPass1234!", "id": body["user_id"]} # type: ignore[attr-defined] def test_setup_status_now_completed(client): assert client.get("/api/v1/setup").get_json()["completed"] is True def test_setup_replay_is_blocked(client, install_token): # Token already consumed — a second call must be refused. r = client.post( "/api/v1/setup", json={ "install_token": install_token, "email": _unique_email("replay"), "password": "AdminPass1234!", }, ) assert r.status_code == 409 # -- /auth/login + /auth/me --------------------------------------------------- @pytest.fixture(scope="module") def admin_credentials(): return getattr(pytest, "shared_admin") # populated by test_setup_creates_first_admin def _login(client, email: str, password: str) -> tuple[str, dict]: r = client.post("/api/v1/auth/login", json={"email": email, "password": password}) assert r.status_code == 200, r.get_data(as_text=True) body = r.get_json() return body["access_token"], dict(r.headers) def test_login_and_me(client, admin_credentials): access, _ = _login(client, admin_credentials["email"], admin_credentials["password"]) r = client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {access}"}) assert r.status_code == 200 me = r.get_json() assert me["email"] == admin_credentials["email"] assert me["is_admin"] is True assert "admin" in me["groups"] def test_login_with_wrong_password_returns_401(client, admin_credentials): r = client.post( "/api/v1/auth/login", json={"email": admin_credentials["email"], "password": "wrong"}, ) assert r.status_code == 401 assert r.get_json()["error"] == "invalid_credentials" def test_me_without_token_returns_401(client): r = client.get("/api/v1/auth/me") assert r.status_code == 401 # -- /auth/refresh rotation --------------------------------------------------- def test_refresh_rotates_and_old_token_is_revoked(client, admin_credentials): # Login fresh to get a refresh cookie on the test client. client.post( "/api/v1/auth/login", json={"email": admin_credentials["email"], "password": admin_credentials["password"]}, ) # First refresh — should succeed. r1 = client.post("/api/v1/auth/refresh") assert r1.status_code == 200, r1.get_data(as_text=True) new_access1 = r1.get_json()["access_token"] assert new_access1 # Second refresh — uses the rotated cookie automatically (test client persists cookies). r2 = client.post("/api/v1/auth/refresh") assert r2.status_code == 200 assert r2.get_json()["access_token"] != new_access1 def test_refresh_with_no_cookie_returns_401(client): fresh = client.application.test_client() # blank cookie jar r = fresh.post("/api/v1/auth/refresh") assert r.status_code == 401 # -- /auth/logout ------------------------------------------------------------- def test_logout_clears_cookie_and_is_idempotent(client, admin_credentials): client.post( "/api/v1/auth/login", json={"email": admin_credentials["email"], "password": admin_credentials["password"]}, ) r1 = client.post("/api/v1/auth/logout") assert r1.status_code == 200 # Second logout — no token, still 200. r2 = client.post("/api/v1/auth/logout") assert r2.status_code == 200 # -- /invitations ------------------------------------------------------------- def test_admin_creates_invitation_and_invitee_accepts(client, admin_credentials): access, _ = _login(client, admin_credentials["email"], admin_credentials["password"]) inv_email = _unique_email("alice") create = client.post( "/api/v1/invitations", headers={"Authorization": f"Bearer {access}"}, json={"email_hint": inv_email}, ) assert create.status_code == 201, create.get_data(as_text=True) token = create.get_json()["token"] preview = client.get(f"/api/v1/invitations/preview/{token}") assert preview.status_code == 200 assert preview.get_json()["is_valid"] is True accept = client.post( f"/api/v1/invitations/accept/{token}", json={"email": inv_email, "password": "AlicePass1234!", "display_name": "Alice"}, ) assert accept.status_code == 201, accept.get_data(as_text=True) # Alice can now log in. a_access, _ = _login(client, inv_email, "AlicePass1234!") me = client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {a_access}"}).get_json() assert me["is_admin"] is False assert me["email"] == inv_email def test_unauthenticated_cannot_create_invitation(client): r = client.post("/api/v1/invitations", json={}) assert r.status_code == 401 def test_non_admin_cannot_create_invitation(client, admin_credentials): # Create a non-admin user via invitation, then try as them. access, _ = _login(client, admin_credentials["email"], admin_credentials["password"]) inv_email = _unique_email("bob") create = client.post( "/api/v1/invitations", headers={"Authorization": f"Bearer {access}"}, json={"email_hint": inv_email}, ) token = create.get_json()["token"] client.post( f"/api/v1/invitations/accept/{token}", json={"email": inv_email, "password": "BobPass1234!"}, ) bob_access, _ = _login(client, inv_email, "BobPass1234!") r = client.post( "/api/v1/invitations", headers={"Authorization": f"Bearer {bob_access}"}, json={}, ) assert r.status_code == 403 def test_used_invitation_cannot_be_accepted_twice(client, admin_credentials): access, _ = _login(client, admin_credentials["email"], admin_credentials["password"]) inv_email = _unique_email("carol") token = client.post( "/api/v1/invitations", headers={"Authorization": f"Bearer {access}"}, json={"email_hint": inv_email}, ).get_json()["token"] first = client.post( f"/api/v1/invitations/accept/{token}", json={"email": inv_email, "password": "CarolPass1234!"}, ) assert first.status_code == 201 second = client.post( f"/api/v1/invitations/accept/{token}", json={"email": _unique_email("carol2"), "password": "OtherPass1234!"}, ) assert second.status_code == 410 assert second.get_json()["error"] == "invitation_consumed" # -- change-password forces logout-all ---------------------------------------- def test_change_password_revokes_all_refresh_tokens(client, admin_credentials): access, _ = _login(client, admin_credentials["email"], admin_credentials["password"]) # Trigger a couple of refreshes so we have multiple chains in DB. client.post("/api/v1/auth/refresh") client.post("/api/v1/auth/refresh") # Change password. new_pw = "AdminPass5678!" r = client.post( "/api/v1/auth/change-password", headers={"Authorization": f"Bearer {access}"}, json={ "current_password": admin_credentials["password"], "new_password": new_pw, }, ) assert r.status_code == 200 admin_credentials["password"] = new_pw # Existing refresh cookie must now be rejected. r2 = client.post("/api/v1/auth/refresh") assert r2.status_code == 401 # New login still works. _login(client, admin_credentials["email"], admin_credentials["password"])