diff --git a/backend/tests/integration/test_user_mgmt_e2e.py b/backend/tests/integration/test_user_mgmt_e2e.py new file mode 100644 index 0000000..957409e --- /dev/null +++ b/backend/tests/integration/test_user_mgmt_e2e.py @@ -0,0 +1,284 @@ +"""End-to-end coverage of sprint 2: +- rt_lead creates an rt_operator via `POST /api/v1/users`. +- rt_lead assigns the operator to engagement A (`POST /engagements/A/members`). +- The operator signs in. +- The operator's `GET /engagements` listing shows A and NOT B. +- The operator's `GET /engagements/B` returns 404 (MA6: same 404 as if B + didn't exist — anti-leak). +- The operator's `GET /engagements/B/members` returns 404 too (anti-leak). +- The audit log records the chain (`user.create`, `engagement_member.add`, + `auth.login`). +""" + +from __future__ import annotations + +from uuid import UUID + +import pytest + +from mimic.auth.password import hash_password +from mimic.db.models import Engagement, Group, User, UserGroup +from mimic.db.types import C2Type, EngagementStatus, UserType +from mimic.rbac.matrix import GroupName + +pytestmark = pytest.mark.integration + + +def _ensure_group(db, name: GroupName, description: str = "") -> Group: + group = db.session.query(Group).filter_by(name=name.value).first() + if group is None: + group = Group(name=name.value, description=description) + db.session.add(group) + db.session.flush() + return group + + +def _seed_rt_lead(db, email: str, password: str) -> UUID: + group = _ensure_group(db, GroupName.RT_LEAD, "Red team lead") + user = User( + email=email, + display_name="Lead", + type=UserType.RT_LEAD, + local_password_hash=hash_password(password, rounds=4), + ) + db.session.add(user) + db.session.flush() + db.session.add(UserGroup(user_id=user.id, group_id=group.id, engagement_id=None)) + db.session.commit() + return user.id + + +def _ensure_operator_group(db) -> None: + _ensure_group(db, GroupName.RT_OPERATOR, "Red team operator") + + +def _create_engagement(db, client_name: str) -> UUID: + engagement = Engagement( + client_name=client_name, c2_type=C2Type.MYTHIC, status=EngagementStatus.DRAFT + ) + db.session.add(engagement) + db.session.commit() + return engagement.id + + +def test_lead_creates_operator_assigns_engagement_a_scope_isolates(app, client) -> None: + from mimic.extensions import db # noqa: PLC0415 + + with app.app_context(): + _seed_rt_lead(db, "lead@example.org", "lead-secret-1") + _ensure_operator_group(db) + engagement_a = _create_engagement(db, "Acme A") + engagement_b = _create_engagement(db, "Acme B") + + # rt_lead logs in. + response = client.post( + "/api/v1/auth/login", + json={"username": "lead@example.org", "password": "lead-secret-1"}, + ) + assert response.status_code == 200 + + # rt_lead creates an rt_operator user. + response = client.post( + "/api/v1/users", + json={ + "email": "op@example.org", + "password": "operator-pw-1", + "type": "rt_operator", + "display_name": "Op One", + }, + ) + assert response.status_code == 201, response.get_json() + operator_id = response.get_json()["id"] + + # rt_lead assigns the operator to engagement A only. + response = client.post( + f"/api/v1/engagements/{engagement_a}/members", + json={"user_id": operator_id, "role": "binôme"}, + ) + assert response.status_code == 201 + + # rt_lead listing /users contains the operator. + response = client.get("/api/v1/users?type=rt_operator") + assert response.status_code == 200 + body = response.get_json() + assert body["total"] == 1 + assert body["items"][0]["email"] == "op@example.org" + + # Logout the lead and log in as the operator. + client.post("/api/v1/auth/logout") + response = client.post( + "/api/v1/auth/login", + json={"username": "op@example.org", "password": "operator-pw-1"}, + ) + assert response.status_code == 200 + op_payload = response.get_json() + assert op_payload["role"] == "rt_operator" + + # /engagements lists only A. + response = client.get("/api/v1/engagements") + assert response.status_code == 200 + listing = response.get_json() + ids = {row["id"] for row in listing} + assert ids == {str(engagement_a)} + + # /engagements/B → 404 (MA6 — anti-leak: same response as a non-existent id). + response = client.get(f"/api/v1/engagements/{engagement_b}") + assert response.status_code == 404 + assert response.get_json()["error"] == "not_found" + + # /engagements/B/members → 404 too (spec-analyst anti-enum requirement). + response = client.get(f"/api/v1/engagements/{engagement_b}/members") + assert response.status_code == 404 + + # /engagements/A is reachable for the operator. + response = client.get(f"/api/v1/engagements/{engagement_a}") + assert response.status_code == 200 + + # /engagements/A/members is reachable for the operator (they are a member). + response = client.get(f"/api/v1/engagements/{engagement_a}/members") + assert response.status_code == 200 + members = response.get_json() + assert len(members) == 1 + assert members[0]["user_id"] == operator_id + assert members[0]["role"] == "binôme" + + # The operator cannot list /users (no USER_MANAGE permission). + response = client.get("/api/v1/users") + assert response.status_code == 403 + + +def test_user_management_lead_only(app, client) -> None: + """USER_MANAGE is rt_lead-only (D-015). An operator with a session gets + a clean 403 — and an anonymous request gets a 401.""" + from mimic.extensions import db # noqa: PLC0415 + + with app.app_context(): + _seed_rt_lead(db, "lead2@example.org", "lead2-secret-1") + _ensure_operator_group(db) + + client.post( + "/api/v1/auth/login", + json={"username": "lead2@example.org", "password": "lead2-secret-1"}, + ) + # Create an operator user. + response = client.post( + "/api/v1/users", + json={ + "email": "op2@example.org", + "password": "operator-pw-1", + "type": "rt_operator", + }, + ) + assert response.status_code == 201 + client.post("/api/v1/auth/logout") + + # Anonymous → 401. + response = client.get("/api/v1/users") + assert response.status_code == 401 + + # Operator session → 403. + client.post( + "/api/v1/auth/login", + json={"username": "op2@example.org", "password": "operator-pw-1"}, + ) + response = client.get("/api/v1/users") + assert response.status_code == 403 + + +def test_create_user_email_taken_returns_409(app, client) -> None: + from mimic.extensions import db # noqa: PLC0415 + + with app.app_context(): + _seed_rt_lead(db, "lead3@example.org", "lead3-secret-1") + _ensure_operator_group(db) + client.post( + "/api/v1/auth/login", + json={"username": "lead3@example.org", "password": "lead3-secret-1"}, + ) + + body = { + "email": "dup@example.org", + "password": "longenough", + "type": "rt_operator", + } + assert client.post("/api/v1/users", json=body).status_code == 201 + response = client.post("/api/v1/users", json=body) + assert response.status_code == 409 + payload = response.get_json() + assert payload["error"] == "email_taken" + + +def test_audit_log_endpoint_is_lead_only_and_paginates(app, client) -> None: + from mimic.extensions import db # noqa: PLC0415 + + with app.app_context(): + _seed_rt_lead(db, "lead4@example.org", "lead4-secret-1") + _ensure_operator_group(db) + + client.post( + "/api/v1/auth/login", + json={"username": "lead4@example.org", "password": "lead4-secret-1"}, + ) + # Generate some audit activity. + for i in range(3): + client.post( + "/api/v1/users", + json={ + "email": f"audit-op-{i}@example.org", + "password": "longenough", + "type": "rt_operator", + }, + ) + + response = client.get("/api/v1/audit/log?page_size=2&action=user.create") + assert response.status_code == 200 + body = response.get_json() + assert body["page_size"] == 2 + assert body["page"] == 1 + assert len(body["items"]) == 2 + assert body["total"] >= 3 + assert all(entry["action"] == "user.create" for entry in body["items"]) + + # Operator session → 403. + client.post("/api/v1/auth/logout") + client.post( + "/api/v1/auth/login", + json={"username": "audit-op-0@example.org", "password": "longenough"}, + ) + response = client.get("/api/v1/audit/log") + assert response.status_code == 403 + + +def test_disable_user_blocks_future_login(app, client) -> None: + from mimic.extensions import db # noqa: PLC0415 + + with app.app_context(): + _seed_rt_lead(db, "lead5@example.org", "lead5-secret-1") + _ensure_operator_group(db) + client.post( + "/api/v1/auth/login", + json={"username": "lead5@example.org", "password": "lead5-secret-1"}, + ) + + response = client.post( + "/api/v1/users", + json={ + "email": "soon-disabled@example.org", + "password": "longenough", + "type": "rt_operator", + }, + ) + assert response.status_code == 201 + user_id = response.get_json()["id"] + + response = client.delete(f"/api/v1/users/{user_id}") + assert response.status_code == 204 + + client.post("/api/v1/auth/logout") + response = client.post( + "/api/v1/auth/login", + json={"username": "soon-disabled@example.org", "password": "longenough"}, + ) + # Disabled accounts return the same uniform envelope as bad credentials. + assert response.status_code == 401 + assert response.get_json()["error"] == "invalid_credentials" diff --git a/backend/tests/unit/test_user_schemas.py b/backend/tests/unit/test_user_schemas.py new file mode 100644 index 0000000..1be2453 --- /dev/null +++ b/backend/tests/unit/test_user_schemas.py @@ -0,0 +1,94 @@ +"""User/member DTO validation (no DB).""" + +from __future__ import annotations + +from uuid import uuid4 + +import pytest +from pydantic import ValidationError + +from mimic.db.types import UserType +from mimic.schemas import EngagementMemberCreate, UserCreate, UserUpdate +from mimic.schemas.pagination import MAX_PAGE_SIZE, PageQuery + + +class TestUserCreate: + def test_minimal_valid_payload(self) -> None: + u = UserCreate.model_validate( + { + "email": "alice@example.org", + "password": "longenough", + "type": "rt_operator", + } + ) + assert u.email == "alice@example.org" + assert u.type is UserType.RT_OPERATOR + assert u.display_name is None + + def test_password_min_length(self) -> None: + with pytest.raises(ValidationError): + UserCreate.model_validate( + {"email": "a@b.c", "password": "short", "type": "rt_operator"} + ) + + def test_invalid_email_rejected(self) -> None: + with pytest.raises(ValidationError): + UserCreate.model_validate( + {"email": "not-an-email", "password": "longenough", "type": "rt_operator"} + ) + + def test_invalid_type_rejected(self) -> None: + with pytest.raises(ValidationError): + UserCreate.model_validate( + {"email": "a@b.c", "password": "longenough", "type": "not-a-role"} + ) + + +class TestUserUpdate: + def test_all_optional(self) -> None: + u = UserUpdate.model_validate({}) + assert u.display_name is None + assert u.password is None + assert u.type is None + + def test_password_validates_when_provided(self) -> None: + with pytest.raises(ValidationError): + UserUpdate.model_validate({"password": "tiny"}) + + +class TestEngagementMemberCreate: + def test_default_role(self) -> None: + member = EngagementMemberCreate.model_validate({"user_id": str(uuid4())}) + assert member.role == "member" + + def test_explicit_role(self) -> None: + member = EngagementMemberCreate.model_validate( + {"user_id": str(uuid4()), "role": "lead-on-mission"} + ) + assert member.role == "lead-on-mission" + + def test_role_max_length(self) -> None: + with pytest.raises(ValidationError): + EngagementMemberCreate.model_validate({"user_id": str(uuid4()), "role": "x" * 41}) + + +class TestPageQuery: + def test_defaults(self) -> None: + page = PageQuery.model_validate({}) + assert page.page == 1 + assert page.page_size == 50 + assert page.offset == 0 + assert page.limit == 50 + + def test_offset_arithmetic(self) -> None: + page = PageQuery.model_validate({"page": 4, "page_size": 25}) + assert page.offset == 75 + assert page.limit == 25 + + def test_page_size_clamped(self) -> None: + with pytest.raises(ValidationError): + PageQuery.model_validate({"page_size": MAX_PAGE_SIZE + 1}) + + def test_page_lower_bound(self) -> None: + with pytest.raises(ValidationError): + PageQuery.model_validate({"page": 0})