"""End-to-end coverage of sprint 2: - rt_lead creates an rt_operator via `POST /api/v1/users`. - rt_lead assigns the operator to engagement A (`POST /engagements/A/members`). - The operator signs in. - The operator's `GET /engagements` listing shows A and NOT B. - The operator's `GET /engagements/B` returns 404 (MA6: same 404 as if B didn't exist — anti-leak). - The operator's `GET /engagements/B/members` returns 404 too (anti-leak). - The audit log records the chain (`user.create`, `engagement_member.add`, `auth.login`). """ from __future__ import annotations from uuid import UUID import pytest from mimic.auth.password import hash_password from mimic.db.models import Engagement, Group, User, UserGroup from mimic.db.types import C2Type, EngagementStatus, UserType from mimic.rbac.matrix import GroupName pytestmark = pytest.mark.integration def _ensure_group(db, name: GroupName, description: str = "") -> Group: group = db.session.query(Group).filter_by(name=name.value).first() if group is None: group = Group(name=name.value, description=description) db.session.add(group) db.session.flush() return group def _seed_rt_lead(db, email: str, password: str) -> UUID: group = _ensure_group(db, GroupName.RT_LEAD, "Red team lead") user = User( email=email, display_name="Lead", type=UserType.RT_LEAD, local_password_hash=hash_password(password, rounds=4), ) db.session.add(user) db.session.flush() db.session.add(UserGroup(user_id=user.id, group_id=group.id, engagement_id=None)) db.session.commit() return user.id def _ensure_operator_group(db) -> None: _ensure_group(db, GroupName.RT_OPERATOR, "Red team operator") def _create_engagement(db, client_name: str) -> UUID: engagement = Engagement( client_name=client_name, c2_type=C2Type.MYTHIC, status=EngagementStatus.DRAFT ) db.session.add(engagement) db.session.commit() return engagement.id def test_lead_creates_operator_assigns_engagement_a_scope_isolates(app, client) -> None: from mimic.extensions import db # noqa: PLC0415 with app.app_context(): _seed_rt_lead(db, "lead@example.org", "lead-secret-1") _ensure_operator_group(db) engagement_a = _create_engagement(db, "Acme A") engagement_b = _create_engagement(db, "Acme B") # rt_lead logs in. response = client.post( "/api/v1/auth/login", json={"username": "lead@example.org", "password": "lead-secret-1"}, ) assert response.status_code == 200 # rt_lead creates an rt_operator user. response = client.post( "/api/v1/users", json={ "email": "op@example.org", "password": "operator-pw-1", "type": "rt_operator", "display_name": "Op One", }, ) assert response.status_code == 201, response.get_json() operator_id = response.get_json()["id"] # rt_lead assigns the operator to engagement A only. response = client.post( f"/api/v1/engagements/{engagement_a}/members", json={"user_id": operator_id, "role": "binôme"}, ) assert response.status_code == 201 # rt_lead listing /users contains the operator. response = client.get("/api/v1/users?type=rt_operator") assert response.status_code == 200 body = response.get_json() assert body["total"] == 1 assert body["items"][0]["email"] == "op@example.org" # Logout the lead and log in as the operator. client.post("/api/v1/auth/logout") response = client.post( "/api/v1/auth/login", json={"username": "op@example.org", "password": "operator-pw-1"}, ) assert response.status_code == 200 op_payload = response.get_json() assert op_payload["role"] == "rt_operator" # /engagements lists only A. response = client.get("/api/v1/engagements") assert response.status_code == 200 listing = response.get_json() ids = {row["id"] for row in listing} assert ids == {str(engagement_a)} # /engagements/B → 404 (MA6 — anti-leak: same response as a non-existent id). response = client.get(f"/api/v1/engagements/{engagement_b}") assert response.status_code == 404 assert response.get_json()["error"] == "not_found" # /engagements/B/members → 404 too (spec-analyst anti-enum requirement). response = client.get(f"/api/v1/engagements/{engagement_b}/members") assert response.status_code == 404 # /engagements/A is reachable for the operator. response = client.get(f"/api/v1/engagements/{engagement_a}") assert response.status_code == 200 # /engagements/A/members is reachable for the operator (they are a member). response = client.get(f"/api/v1/engagements/{engagement_a}/members") assert response.status_code == 200 members = response.get_json() assert len(members) == 1 assert members[0]["user_id"] == operator_id assert members[0]["role"] == "binôme" # The operator cannot list /users (no USER_MANAGE permission). response = client.get("/api/v1/users") assert response.status_code == 403 def test_user_management_lead_only(app, client) -> None: """USER_MANAGE is rt_lead-only (D-015). An operator with a session gets a clean 403 — and an anonymous request gets a 401.""" from mimic.extensions import db # noqa: PLC0415 with app.app_context(): _seed_rt_lead(db, "lead2@example.org", "lead2-secret-1") _ensure_operator_group(db) client.post( "/api/v1/auth/login", json={"username": "lead2@example.org", "password": "lead2-secret-1"}, ) # Create an operator user. response = client.post( "/api/v1/users", json={ "email": "op2@example.org", "password": "operator-pw-1", "type": "rt_operator", }, ) assert response.status_code == 201 client.post("/api/v1/auth/logout") # Anonymous → 401. response = client.get("/api/v1/users") assert response.status_code == 401 # Operator session → 403. client.post( "/api/v1/auth/login", json={"username": "op2@example.org", "password": "operator-pw-1"}, ) response = client.get("/api/v1/users") assert response.status_code == 403 def test_create_user_email_taken_returns_409(app, client) -> None: from mimic.extensions import db # noqa: PLC0415 with app.app_context(): _seed_rt_lead(db, "lead3@example.org", "lead3-secret-1") _ensure_operator_group(db) client.post( "/api/v1/auth/login", json={"username": "lead3@example.org", "password": "lead3-secret-1"}, ) body = { "email": "dup@example.org", "password": "longenough", "type": "rt_operator", } assert client.post("/api/v1/users", json=body).status_code == 201 response = client.post("/api/v1/users", json=body) assert response.status_code == 409 payload = response.get_json() assert payload["error"] == "email_taken" def test_audit_log_endpoint_is_lead_only_and_paginates(app, client) -> None: from mimic.extensions import db # noqa: PLC0415 with app.app_context(): _seed_rt_lead(db, "lead4@example.org", "lead4-secret-1") _ensure_operator_group(db) client.post( "/api/v1/auth/login", json={"username": "lead4@example.org", "password": "lead4-secret-1"}, ) # Generate some audit activity. for i in range(3): client.post( "/api/v1/users", json={ "email": f"audit-op-{i}@example.org", "password": "longenough", "type": "rt_operator", }, ) response = client.get("/api/v1/audit/log?page_size=2&action=user.create") assert response.status_code == 200 body = response.get_json() assert body["page_size"] == 2 assert body["page"] == 1 assert len(body["items"]) == 2 assert body["total"] >= 3 assert all(entry["action"] == "user.create" for entry in body["items"]) # Operator session → 403. client.post("/api/v1/auth/logout") client.post( "/api/v1/auth/login", json={"username": "audit-op-0@example.org", "password": "longenough"}, ) response = client.get("/api/v1/audit/log") assert response.status_code == 403 def test_disable_user_blocks_future_login(app, client) -> None: from mimic.extensions import db # noqa: PLC0415 with app.app_context(): _seed_rt_lead(db, "lead5@example.org", "lead5-secret-1") _ensure_operator_group(db) client.post( "/api/v1/auth/login", json={"username": "lead5@example.org", "password": "lead5-secret-1"}, ) response = client.post( "/api/v1/users", json={ "email": "soon-disabled@example.org", "password": "longenough", "type": "rt_operator", }, ) assert response.status_code == 201 user_id = response.get_json()["id"] response = client.delete(f"/api/v1/users/{user_id}") assert response.status_code == 204 client.post("/api/v1/auth/logout") response = client.post( "/api/v1/auth/login", json={"username": "soon-disabled@example.org", "password": "longenough"}, ) # Disabled accounts return the same uniform envelope as bad credentials. assert response.status_code == 401 assert response.get_json()["error"] == "invalid_credentials"