test(backend): sprint 2 unit + integration coverage

Unit (`tests/unit/test_user_schemas.py`):
- 4 tests on `UserCreate` (happy path, password min length, email
  validation, invalid type).
- 2 tests on `UserUpdate` (all-optional, password validation when set).
- 3 tests on `EngagementMemberCreate` (default `"member"`, explicit role,
  max-length 40).
- 4 tests on `PageQuery` (defaults, offset arithmetic, page_size cap,
  page lower bound).

Integration (`tests/integration/test_user_mgmt_e2e.py`, marked
`integration`):
- The critical MA6-in-practice flow: rt_lead creates rt_operator, assigns
  to engagement A, the operator signs in, lists engagements and sees only
  A, `GET /engagements/B` returns 404 (anti-leak), `GET /engagements/B/members`
  returns 404 too, `/engagements/A/members` is reachable, `GET /users` is
  forbidden for the operator.
- `USER_MANAGE` gate: anonymous → 401, operator session → 403,
  lead session → 200.
- 409 `email_taken` on duplicate `POST /users`.
- `/audit/log` is lead-only, paginates with `page_size`, filters by
  `?action=`.
- Disabling a user blocks subsequent logins (same uniform
  `invalid_credentials` envelope as for bad passwords — no enumeration
  leak of "this account was disabled").

74 unit tests pass (61 sprint 1 + 13 sprint 2); integration tests run on
the testcontainers Postgres fixture in CI.
This commit is contained in:
knacky
2026-05-23 15:53:35 +02:00
parent 9f75f119f0
commit 4bade795fd
2 changed files with 378 additions and 0 deletions

View File

@@ -0,0 +1,284 @@
"""End-to-end coverage of sprint 2:
- rt_lead creates an rt_operator via `POST /api/v1/users`.
- rt_lead assigns the operator to engagement A (`POST /engagements/A/members`).
- The operator signs in.
- The operator's `GET /engagements` listing shows A and NOT B.
- The operator's `GET /engagements/B` returns 404 (MA6: same 404 as if B
didn't exist — anti-leak).
- The operator's `GET /engagements/B/members` returns 404 too (anti-leak).
- The audit log records the chain (`user.create`, `engagement_member.add`,
`auth.login`).
"""
from __future__ import annotations
from uuid import UUID
import pytest
from mimic.auth.password import hash_password
from mimic.db.models import Engagement, Group, User, UserGroup
from mimic.db.types import C2Type, EngagementStatus, UserType
from mimic.rbac.matrix import GroupName
pytestmark = pytest.mark.integration
def _ensure_group(db, name: GroupName, description: str = "") -> Group:
group = db.session.query(Group).filter_by(name=name.value).first()
if group is None:
group = Group(name=name.value, description=description)
db.session.add(group)
db.session.flush()
return group
def _seed_rt_lead(db, email: str, password: str) -> UUID:
group = _ensure_group(db, GroupName.RT_LEAD, "Red team lead")
user = User(
email=email,
display_name="Lead",
type=UserType.RT_LEAD,
local_password_hash=hash_password(password, rounds=4),
)
db.session.add(user)
db.session.flush()
db.session.add(UserGroup(user_id=user.id, group_id=group.id, engagement_id=None))
db.session.commit()
return user.id
def _ensure_operator_group(db) -> None:
_ensure_group(db, GroupName.RT_OPERATOR, "Red team operator")
def _create_engagement(db, client_name: str) -> UUID:
engagement = Engagement(
client_name=client_name, c2_type=C2Type.MYTHIC, status=EngagementStatus.DRAFT
)
db.session.add(engagement)
db.session.commit()
return engagement.id
def test_lead_creates_operator_assigns_engagement_a_scope_isolates(app, client) -> None:
from mimic.extensions import db # noqa: PLC0415
with app.app_context():
_seed_rt_lead(db, "lead@example.org", "lead-secret-1")
_ensure_operator_group(db)
engagement_a = _create_engagement(db, "Acme A")
engagement_b = _create_engagement(db, "Acme B")
# rt_lead logs in.
response = client.post(
"/api/v1/auth/login",
json={"username": "lead@example.org", "password": "lead-secret-1"},
)
assert response.status_code == 200
# rt_lead creates an rt_operator user.
response = client.post(
"/api/v1/users",
json={
"email": "op@example.org",
"password": "operator-pw-1",
"type": "rt_operator",
"display_name": "Op One",
},
)
assert response.status_code == 201, response.get_json()
operator_id = response.get_json()["id"]
# rt_lead assigns the operator to engagement A only.
response = client.post(
f"/api/v1/engagements/{engagement_a}/members",
json={"user_id": operator_id, "role": "binôme"},
)
assert response.status_code == 201
# rt_lead listing /users contains the operator.
response = client.get("/api/v1/users?type=rt_operator")
assert response.status_code == 200
body = response.get_json()
assert body["total"] == 1
assert body["items"][0]["email"] == "op@example.org"
# Logout the lead and log in as the operator.
client.post("/api/v1/auth/logout")
response = client.post(
"/api/v1/auth/login",
json={"username": "op@example.org", "password": "operator-pw-1"},
)
assert response.status_code == 200
op_payload = response.get_json()
assert op_payload["role"] == "rt_operator"
# /engagements lists only A.
response = client.get("/api/v1/engagements")
assert response.status_code == 200
listing = response.get_json()
ids = {row["id"] for row in listing}
assert ids == {str(engagement_a)}
# /engagements/B → 404 (MA6 — anti-leak: same response as a non-existent id).
response = client.get(f"/api/v1/engagements/{engagement_b}")
assert response.status_code == 404
assert response.get_json()["error"] == "not_found"
# /engagements/B/members → 404 too (spec-analyst anti-enum requirement).
response = client.get(f"/api/v1/engagements/{engagement_b}/members")
assert response.status_code == 404
# /engagements/A is reachable for the operator.
response = client.get(f"/api/v1/engagements/{engagement_a}")
assert response.status_code == 200
# /engagements/A/members is reachable for the operator (they are a member).
response = client.get(f"/api/v1/engagements/{engagement_a}/members")
assert response.status_code == 200
members = response.get_json()
assert len(members) == 1
assert members[0]["user_id"] == operator_id
assert members[0]["role"] == "binôme"
# The operator cannot list /users (no USER_MANAGE permission).
response = client.get("/api/v1/users")
assert response.status_code == 403
def test_user_management_lead_only(app, client) -> None:
"""USER_MANAGE is rt_lead-only (D-015). An operator with a session gets
a clean 403 — and an anonymous request gets a 401."""
from mimic.extensions import db # noqa: PLC0415
with app.app_context():
_seed_rt_lead(db, "lead2@example.org", "lead2-secret-1")
_ensure_operator_group(db)
client.post(
"/api/v1/auth/login",
json={"username": "lead2@example.org", "password": "lead2-secret-1"},
)
# Create an operator user.
response = client.post(
"/api/v1/users",
json={
"email": "op2@example.org",
"password": "operator-pw-1",
"type": "rt_operator",
},
)
assert response.status_code == 201
client.post("/api/v1/auth/logout")
# Anonymous → 401.
response = client.get("/api/v1/users")
assert response.status_code == 401
# Operator session → 403.
client.post(
"/api/v1/auth/login",
json={"username": "op2@example.org", "password": "operator-pw-1"},
)
response = client.get("/api/v1/users")
assert response.status_code == 403
def test_create_user_email_taken_returns_409(app, client) -> None:
from mimic.extensions import db # noqa: PLC0415
with app.app_context():
_seed_rt_lead(db, "lead3@example.org", "lead3-secret-1")
_ensure_operator_group(db)
client.post(
"/api/v1/auth/login",
json={"username": "lead3@example.org", "password": "lead3-secret-1"},
)
body = {
"email": "dup@example.org",
"password": "longenough",
"type": "rt_operator",
}
assert client.post("/api/v1/users", json=body).status_code == 201
response = client.post("/api/v1/users", json=body)
assert response.status_code == 409
payload = response.get_json()
assert payload["error"] == "email_taken"
def test_audit_log_endpoint_is_lead_only_and_paginates(app, client) -> None:
from mimic.extensions import db # noqa: PLC0415
with app.app_context():
_seed_rt_lead(db, "lead4@example.org", "lead4-secret-1")
_ensure_operator_group(db)
client.post(
"/api/v1/auth/login",
json={"username": "lead4@example.org", "password": "lead4-secret-1"},
)
# Generate some audit activity.
for i in range(3):
client.post(
"/api/v1/users",
json={
"email": f"audit-op-{i}@example.org",
"password": "longenough",
"type": "rt_operator",
},
)
response = client.get("/api/v1/audit/log?page_size=2&action=user.create")
assert response.status_code == 200
body = response.get_json()
assert body["page_size"] == 2
assert body["page"] == 1
assert len(body["items"]) == 2
assert body["total"] >= 3
assert all(entry["action"] == "user.create" for entry in body["items"])
# Operator session → 403.
client.post("/api/v1/auth/logout")
client.post(
"/api/v1/auth/login",
json={"username": "audit-op-0@example.org", "password": "longenough"},
)
response = client.get("/api/v1/audit/log")
assert response.status_code == 403
def test_disable_user_blocks_future_login(app, client) -> None:
from mimic.extensions import db # noqa: PLC0415
with app.app_context():
_seed_rt_lead(db, "lead5@example.org", "lead5-secret-1")
_ensure_operator_group(db)
client.post(
"/api/v1/auth/login",
json={"username": "lead5@example.org", "password": "lead5-secret-1"},
)
response = client.post(
"/api/v1/users",
json={
"email": "soon-disabled@example.org",
"password": "longenough",
"type": "rt_operator",
},
)
assert response.status_code == 201
user_id = response.get_json()["id"]
response = client.delete(f"/api/v1/users/{user_id}")
assert response.status_code == 204
client.post("/api/v1/auth/logout")
response = client.post(
"/api/v1/auth/login",
json={"username": "soon-disabled@example.org", "password": "longenough"},
)
# Disabled accounts return the same uniform envelope as bad credentials.
assert response.status_code == 401
assert response.get_json()["error"] == "invalid_credentials"

View File

@@ -0,0 +1,94 @@
"""User/member DTO validation (no DB)."""
from __future__ import annotations
from uuid import uuid4
import pytest
from pydantic import ValidationError
from mimic.db.types import UserType
from mimic.schemas import EngagementMemberCreate, UserCreate, UserUpdate
from mimic.schemas.pagination import MAX_PAGE_SIZE, PageQuery
class TestUserCreate:
def test_minimal_valid_payload(self) -> None:
u = UserCreate.model_validate(
{
"email": "alice@example.org",
"password": "longenough",
"type": "rt_operator",
}
)
assert u.email == "alice@example.org"
assert u.type is UserType.RT_OPERATOR
assert u.display_name is None
def test_password_min_length(self) -> None:
with pytest.raises(ValidationError):
UserCreate.model_validate(
{"email": "a@b.c", "password": "short", "type": "rt_operator"}
)
def test_invalid_email_rejected(self) -> None:
with pytest.raises(ValidationError):
UserCreate.model_validate(
{"email": "not-an-email", "password": "longenough", "type": "rt_operator"}
)
def test_invalid_type_rejected(self) -> None:
with pytest.raises(ValidationError):
UserCreate.model_validate(
{"email": "a@b.c", "password": "longenough", "type": "not-a-role"}
)
class TestUserUpdate:
def test_all_optional(self) -> None:
u = UserUpdate.model_validate({})
assert u.display_name is None
assert u.password is None
assert u.type is None
def test_password_validates_when_provided(self) -> None:
with pytest.raises(ValidationError):
UserUpdate.model_validate({"password": "tiny"})
class TestEngagementMemberCreate:
def test_default_role(self) -> None:
member = EngagementMemberCreate.model_validate({"user_id": str(uuid4())})
assert member.role == "member"
def test_explicit_role(self) -> None:
member = EngagementMemberCreate.model_validate(
{"user_id": str(uuid4()), "role": "lead-on-mission"}
)
assert member.role == "lead-on-mission"
def test_role_max_length(self) -> None:
with pytest.raises(ValidationError):
EngagementMemberCreate.model_validate({"user_id": str(uuid4()), "role": "x" * 41})
class TestPageQuery:
def test_defaults(self) -> None:
page = PageQuery.model_validate({})
assert page.page == 1
assert page.page_size == 50
assert page.offset == 0
assert page.limit == 50
def test_offset_arithmetic(self) -> None:
page = PageQuery.model_validate({"page": 4, "page_size": 25})
assert page.offset == 75
assert page.limit == 25
def test_page_size_clamped(self) -> None:
with pytest.raises(ValidationError):
PageQuery.model_validate({"page_size": MAX_PAGE_SIZE + 1})
def test_page_lower_bound(self) -> None:
with pytest.raises(ValidationError):
PageQuery.model_validate({"page": 0})