Files
mimic-big/backend/tests/integration/test_user_mgmt_e2e.py

285 lines
9.3 KiB
Python
Raw Permalink Normal View History

"""End-to-end coverage of sprint 2:
- rt_lead creates an rt_operator via `POST /api/v1/users`.
- rt_lead assigns the operator to engagement A (`POST /engagements/A/members`).
- The operator signs in.
- The operator's `GET /engagements` listing shows A and NOT B.
- The operator's `GET /engagements/B` returns 404 (MA6: same 404 as if B
  didn't exist — anti-leak).
- The operator's `GET /engagements/B/members` returns 404 too (anti-leak).
- The audit log records the chain (`user.create`, `engagement_member.add`,
  `auth.login`).
"""
from __future__ import annotations
from uuid import UUID
import pytest
from mimic.auth.password import hash_password
from mimic.db.models import Engagement, Group, User, UserGroup
from mimic.db.types import C2Type, EngagementStatus, UserType
from mimic.rbac.matrix import GroupName
# Module-level pytest marker: tags every test in this file as `integration`.
pytestmark = pytest.mark.integration
def _ensure_group(db, name: GroupName, description: str = "") -> Group:
    """Return the ``Group`` row whose name is ``name.value``, creating it if absent.

    A newly created row is added to ``db.session`` and flushed (not committed);
    an existing row is returned untouched and ``description`` is ignored.
    """
    existing = db.session.query(Group).filter_by(name=name.value).first()
    if existing is not None:
        return existing

    created = Group(name=name.value, description=description)
    db.session.add(created)
    db.session.flush()
    return created
def _seed_rt_lead(db, email: str, password: str) -> UUID:
    """Insert an rt_lead user linked to the RT_LEAD group and return the user's id.

    The user/group link is created with ``engagement_id=None`` and the whole
    seed is committed before returning.
    """
    lead_group = _ensure_group(db, GroupName.RT_LEAD, "Red team lead")

    # NOTE(review): rounds=4 presumably keeps hashing cheap for tests — confirm
    # against hash_password.
    password_hash = hash_password(password, rounds=4)
    lead = User(
        email=email,
        display_name="Lead",
        type=UserType.RT_LEAD,
        local_password_hash=password_hash,
    )
    db.session.add(lead)
    # Flush before building the link row, which reads lead.id.
    db.session.flush()

    link = UserGroup(user_id=lead.id, group_id=lead_group.id, engagement_id=None)
    db.session.add(link)
    db.session.commit()
    return lead.id
def _ensure_operator_group(db) -> None:
    """Make sure the RT_OPERATOR group row exists; the row itself is discarded."""
    _ensure_group(db, GroupName.RT_OPERATOR, description="Red team operator")
def _create_engagement(db, client_name: str) -> UUID:
    """Insert and commit a DRAFT Mythic engagement for ``client_name``; return its id."""
    row = Engagement(
        client_name=client_name,
        c2_type=C2Type.MYTHIC,
        status=EngagementStatus.DRAFT,
    )
    session = db.session
    session.add(row)
    session.commit()
    return row.id
def test_lead_creates_operator_assigns_engagement_a_scope_isolates(app, client) -> None:
    """Lead creates an operator, assigns engagement A; the operator sees A and never B."""
    from mimic.extensions import db  # noqa: PLC0415

    lead_credentials = {"username": "lead@example.org", "password": "lead-secret-1"}
    operator_credentials = {"username": "op@example.org", "password": "operator-pw-1"}

    # Seed the lead, the operator group and two engagements straight into the DB.
    with app.app_context():
        _seed_rt_lead(db, "lead@example.org", "lead-secret-1")
        _ensure_operator_group(db)
        engagement_a = _create_engagement(db, "Acme A")
        engagement_b = _create_engagement(db, "Acme B")

    # The lead signs in.
    lead_login = client.post("/api/v1/auth/login", json=lead_credentials)
    assert lead_login.status_code == 200

    # The lead provisions an rt_operator account.
    new_operator = {
        "email": "op@example.org",
        "password": "operator-pw-1",
        "type": "rt_operator",
        "display_name": "Op One",
    }
    created = client.post("/api/v1/users", json=new_operator)
    assert created.status_code == 201, created.get_json()
    operator_id = created.get_json()["id"]

    # Membership is granted on engagement A only.
    membership = client.post(
        f"/api/v1/engagements/{engagement_a}/members",
        json={"user_id": operator_id, "role": "binôme"},
    )
    assert membership.status_code == 201

    # The lead's filtered /users listing contains exactly the new operator.
    operators = client.get("/api/v1/users?type=rt_operator")
    assert operators.status_code == 200
    operators_page = operators.get_json()
    assert operators_page["total"] == 1
    assert operators_page["items"][0]["email"] == "op@example.org"

    # Switch sessions: drop the lead, sign in as the operator.
    client.post("/api/v1/auth/logout")
    operator_login = client.post("/api/v1/auth/login", json=operator_credentials)
    assert operator_login.status_code == 200
    assert operator_login.get_json()["role"] == "rt_operator"

    # The operator's engagement listing holds A and nothing else.
    visible = client.get("/api/v1/engagements")
    assert visible.status_code == 200
    assert {row["id"] for row in visible.get_json()} == {str(engagement_a)}

    # Engagement B answers 404 (MA6 — anti-leak: same response as an unknown id).
    hidden = client.get(f"/api/v1/engagements/{engagement_b}")
    assert hidden.status_code == 404
    assert hidden.get_json()["error"] == "not_found"

    # B's member list answers 404 as well (spec-analyst anti-enum requirement).
    hidden_members = client.get(f"/api/v1/engagements/{engagement_b}/members")
    assert hidden_members.status_code == 404

    # Engagement A itself is reachable for the operator.
    own = client.get(f"/api/v1/engagements/{engagement_a}")
    assert own.status_code == 200

    # A's member list is reachable too, and holds only the operator.
    own_members = client.get(f"/api/v1/engagements/{engagement_a}/members")
    assert own_members.status_code == 200
    roster = own_members.get_json()
    assert len(roster) == 1
    only_member = roster[0]
    assert only_member["user_id"] == operator_id
    assert only_member["role"] == "binôme"

    # Without USER_MANAGE the operator is refused on /users.
    forbidden = client.get("/api/v1/users")
    assert forbidden.status_code == 403
def test_user_management_lead_only(app, client) -> None:
    """USER_MANAGE is rt_lead-only (D-015). An operator with a session gets
    a clean 403 and an anonymous request gets a 401."""
    from mimic.extensions import db  # noqa: PLC0415

    with app.app_context():
        _seed_rt_lead(db, "lead2@example.org", "lead2-secret-1")
        _ensure_operator_group(db)

    # Check the lead login explicitly so a broken setup fails here rather than
    # as a confusing status mismatch on a later request.
    response = client.post(
        "/api/v1/auth/login",
        json={"username": "lead2@example.org", "password": "lead2-secret-1"},
    )
    assert response.status_code == 200, response.get_json()

    # Create an operator user.
    response = client.post(
        "/api/v1/users",
        json={
            "email": "op2@example.org",
            "password": "operator-pw-1",
            "type": "rt_operator",
        },
    )
    assert response.status_code == 201, response.get_json()
    client.post("/api/v1/auth/logout")

    # Anonymous → 401.
    response = client.get("/api/v1/users")
    assert response.status_code == 401

    # Operator session → 403. The login itself must succeed, otherwise the
    # request below would be anonymous and not exercise the permission check.
    response = client.post(
        "/api/v1/auth/login",
        json={"username": "op2@example.org", "password": "operator-pw-1"},
    )
    assert response.status_code == 200, response.get_json()
    response = client.get("/api/v1/users")
    assert response.status_code == 403
def test_create_user_email_taken_returns_409(app, client) -> None:
    """Creating a second user with an already-used email yields 409 ``email_taken``."""
    from mimic.extensions import db  # noqa: PLC0415

    with app.app_context():
        _seed_rt_lead(db, "lead3@example.org", "lead3-secret-1")
        _ensure_operator_group(db)

    # Check the lead login explicitly so a setup failure is reported at its source.
    response = client.post(
        "/api/v1/auth/login",
        json={"username": "lead3@example.org", "password": "lead3-secret-1"},
    )
    assert response.status_code == 200, response.get_json()

    body = {
        "email": "dup@example.org",
        "password": "longenough",
        "type": "rt_operator",
    }
    # First creation succeeds.
    response = client.post("/api/v1/users", json=body)
    assert response.status_code == 201, response.get_json()

    # Re-posting the identical payload collides on the email.
    response = client.post("/api/v1/users", json=body)
    assert response.status_code == 409
    payload = response.get_json()
    assert payload["error"] == "email_taken"
def test_audit_log_endpoint_is_lead_only_and_paginates(app, client) -> None:
    """The audit log filters by action and paginates for a lead; an operator gets 403."""
    from mimic.extensions import db  # noqa: PLC0415

    with app.app_context():
        _seed_rt_lead(db, "lead4@example.org", "lead4-secret-1")
        _ensure_operator_group(db)

    # Check the lead login explicitly so a setup failure is reported at its source.
    response = client.post(
        "/api/v1/auth/login",
        json={"username": "lead4@example.org", "password": "lead4-secret-1"},
    )
    assert response.status_code == 200, response.get_json()

    # Generate some audit activity. Each creation is checked so that a failed
    # setup does not masquerade as a pagination bug further down.
    for i in range(3):
        response = client.post(
            "/api/v1/users",
            json={
                "email": f"audit-op-{i}@example.org",
                "password": "longenough",
                "type": "rt_operator",
            },
        )
        assert response.status_code == 201, response.get_json()

    response = client.get("/api/v1/audit/log?page_size=2&action=user.create")
    assert response.status_code == 200
    body = response.get_json()
    assert body["page_size"] == 2
    assert body["page"] == 1
    assert len(body["items"]) == 2
    assert body["total"] >= 3
    assert all(entry["action"] == "user.create" for entry in body["items"])

    # Operator session → 403. The login must succeed, otherwise the request
    # below would be anonymous and not exercise the permission check.
    client.post("/api/v1/auth/logout")
    response = client.post(
        "/api/v1/auth/login",
        json={"username": "audit-op-0@example.org", "password": "longenough"},
    )
    assert response.status_code == 200, response.get_json()
    response = client.get("/api/v1/audit/log")
    assert response.status_code == 403
def test_disable_user_blocks_future_login(app, client) -> None:
    """After ``DELETE /api/v1/users/<id>`` the account can no longer log in."""
    from mimic.extensions import db  # noqa: PLC0415

    with app.app_context():
        _seed_rt_lead(db, "lead5@example.org", "lead5-secret-1")
        _ensure_operator_group(db)

    # Check the lead login explicitly so a setup failure is reported at its source.
    response = client.post(
        "/api/v1/auth/login",
        json={"username": "lead5@example.org", "password": "lead5-secret-1"},
    )
    assert response.status_code == 200, response.get_json()

    response = client.post(
        "/api/v1/users",
        json={
            "email": "soon-disabled@example.org",
            "password": "longenough",
            "type": "rt_operator",
        },
    )
    assert response.status_code == 201, response.get_json()
    user_id = response.get_json()["id"]

    response = client.delete(f"/api/v1/users/{user_id}")
    assert response.status_code == 204
    client.post("/api/v1/auth/logout")

    response = client.post(
        "/api/v1/auth/login",
        json={"username": "soon-disabled@example.org", "password": "longenough"},
    )
    # Disabled accounts return the same uniform envelope as bad credentials.
    assert response.status_code == 401
    assert response.get_json()["error"] == "invalid_credentials"