feat: sprint 1 — auth + CRUD engagements
Ship the first feature end-to-end on the UI: users log in with JWT,
admins manage user accounts, and any authenticated user (per RBAC)
can create, list, view, edit, and delete engagements.
Backend (Flask + SQLAlchemy + SQLite, 63 pytest)
- User / Engagement models, Alembic 0001 initial schema
- argon2 password hashing, JWT bearer (60-min TTL), @login_required
and @role_required decorators
- 13 API endpoints under /api/*, including last-admin protection on
DELETE/PATCH user and JSON 404 on unknown /api/* paths
- `flask create-admin` CLI with duplicate / short-password handling
Frontend (React + Vite + Tailwind + TanStack Query, 20 vitest)
- Inter font bundled locally (no CDN), DESIGN.md tokens in Tailwind
- LoginPage / EngagementsList / EngagementForm / EngagementDetail /
UsersAdmin pages with role-aware UI
- Layout, ProtectedRoute, StatusBadge, FormField, LoadingState,
ErrorState, EmptyState, Toast + provider
- Axios client: Bearer interceptor, 401 → purge + /login + "Session
expirée" toast, 403 → "Accès refusé" toast (declarative <Navigate>
for already-authed users, Fragment-keyed admin user rows)
Deployment
- Single multistage Dockerfile (node:20-alpine → python:3.12-slim)
- docker/entrypoint.sh runs `flask db upgrade` before `flask run`
- Makefile: build/start/stop/restart/update/logs/create-admin/
update-mitre/test-{backend,frontend,e2e}/clean
- .env.example documenting MIMIC_JWT_SECRET / MIMIC_DB_PATH / MIMIC_PORT
- SQLite at /data/mimic.sqlite on named volume mimic-data
Acceptance suite (Playwright, 36 tests, all 27 ACs)
- e2e/ scaffold with playwright.config + auth/api fixtures
- One spec per user story (us1-bootstrap through us6-deployment)
- Portable via MIMIC_CONTAINER_CMD / MIMIC_BASE_URL (docker or podman)
Docs
- README.md with quick-start and architecture overview
- CHANGELOG.md updated with Sprint 1 deliverables
- pyrightconfig.json so the Python LSP sees backend/.venv and
resolves the `backend.app.*` absolute imports
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
0
backend/__init__.py
Normal file
0
backend/__init__.py
Normal file
71
backend/app/__init__.py
Normal file
71
backend/app/__init__.py
Normal file
@@ -0,0 +1,71 @@
|
||||
"""Flask application factory."""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from flask import Flask, jsonify, send_from_directory
|
||||
|
||||
from backend.app.api import auth_bp, engagements_bp, users_bp
|
||||
from backend.app.cli import register_cli
|
||||
from backend.app.config import Config, TestConfig
|
||||
from backend.app.errors import register_error_handlers
|
||||
from backend.app.extensions import db, migrate
|
||||
|
||||
|
||||
def create_app(config_object: object | None = None) -> Flask:
|
||||
"""Application factory.
|
||||
|
||||
`config_object` is an *instance* (not a class) of Config or a subclass.
|
||||
If None, picks TestConfig when MIMIC_TESTING=1, otherwise Config.
|
||||
"""
|
||||
static_folder = str(Path(__file__).parent / "static")
|
||||
app = Flask(__name__, static_folder=static_folder, static_url_path="/static")
|
||||
|
||||
if config_object is None:
|
||||
config_object = TestConfig() if os.environ.get("MIMIC_TESTING") == "1" else Config()
|
||||
app.config.from_object(config_object)
|
||||
|
||||
db.init_app(app)
|
||||
migrations_dir = str(Path(__file__).parent.parent / "migrations")
|
||||
migrate.init_app(app, db, directory=migrations_dir)
|
||||
|
||||
# Ensure models are imported so Alembic/metadata see them.
|
||||
from backend.app import models # noqa: F401
|
||||
|
||||
app.register_blueprint(auth_bp)
|
||||
app.register_blueprint(users_bp)
|
||||
app.register_blueprint(engagements_bp)
|
||||
|
||||
register_error_handlers(app)
|
||||
register_cli(app)
|
||||
|
||||
static_root = Path(static_folder)
|
||||
|
||||
@app.get("/api/health")
|
||||
def health():
|
||||
return {"status": "ok"}, 200
|
||||
|
||||
# Serve the built frontend (Vite output copied into app/static at image build time).
|
||||
@app.get("/")
|
||||
def index():
|
||||
index_path = static_root / "index.html"
|
||||
if index_path.exists():
|
||||
return send_from_directory(static_folder, "index.html")
|
||||
return {"status": "ok", "message": "Mimic API running. Frontend not built."}, 200
|
||||
|
||||
@app.get("/<path:path>")
|
||||
def spa_fallback(path: str):
|
||||
# Unknown /api/* paths must stay JSON 404 — never shadowed by index.html.
|
||||
if path.startswith("api/"):
|
||||
return jsonify({"error": "Not found"}), 404
|
||||
# Serve static assets if present; otherwise hand back index.html for client routing.
|
||||
candidate = static_root / path
|
||||
if candidate.is_file():
|
||||
return send_from_directory(static_folder, path)
|
||||
index_path = static_root / "index.html"
|
||||
if index_path.exists():
|
||||
return send_from_directory(static_folder, "index.html")
|
||||
return jsonify({"error": "Not found"}), 404
|
||||
|
||||
return app
|
||||
6
backend/app/api/__init__.py
Normal file
6
backend/app/api/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
"""API blueprints."""
|
||||
from backend.app.api.auth import auth_bp
|
||||
from backend.app.api.engagements import engagements_bp
|
||||
from backend.app.api.users import users_bp
|
||||
|
||||
__all__ = ["auth_bp", "users_bp", "engagements_bp"]
|
||||
46
backend/app/api/auth.py
Normal file
46
backend/app/api/auth.py
Normal file
@@ -0,0 +1,46 @@
|
||||
"""Auth endpoints: login, logout, me."""
|
||||
from __future__ import annotations
|
||||
|
||||
from flask import Blueprint, g, jsonify, request
|
||||
|
||||
from backend.app.auth import encode_token, login_required, verify_password
|
||||
from backend.app.models import User
|
||||
from backend.app.serializers import serialize_user
|
||||
|
||||
auth_bp = Blueprint("auth", __name__, url_prefix="/api/auth")
|
||||
|
||||
|
||||
@auth_bp.post("/login")
|
||||
def login():
|
||||
data = request.get_json(silent=True) or {}
|
||||
username = (data.get("username") or "").strip()
|
||||
password = data.get("password") or ""
|
||||
|
||||
generic_error = (jsonify({"error": "Invalid credentials"}), 401)
|
||||
if not username or not password:
|
||||
return generic_error
|
||||
|
||||
user = User.query.filter_by(username=username).first()
|
||||
if user is None or not verify_password(user.password_hash, password):
|
||||
return generic_error
|
||||
|
||||
token = encode_token(user.id, user.role.value)
|
||||
return jsonify(
|
||||
{
|
||||
"access_token": token,
|
||||
"user": {"id": user.id, "username": user.username, "role": user.role.value},
|
||||
}
|
||||
), 200
|
||||
|
||||
|
||||
@auth_bp.post("/logout")
|
||||
@login_required
|
||||
def logout():
|
||||
# V1: stateless JWT — client discards the token. No server-side blacklist.
|
||||
return jsonify({"status": "ok"}), 200
|
||||
|
||||
|
||||
@auth_bp.get("/me")
|
||||
@login_required
|
||||
def me():
|
||||
return jsonify(serialize_user(g.current_user)), 200
|
||||
158
backend/app/api/engagements.py
Normal file
158
backend/app/api/engagements.py
Normal file
@@ -0,0 +1,158 @@
|
||||
"""Engagement CRUD endpoints."""
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import date
|
||||
|
||||
from flask import Blueprint, g, jsonify, request
|
||||
|
||||
from backend.app.auth import login_required, role_required
|
||||
from backend.app.extensions import db
|
||||
from backend.app.models import Engagement, EngagementStatus
|
||||
from backend.app.serializers import serialize_engagement
|
||||
|
||||
engagements_bp = Blueprint("engagements", __name__, url_prefix="/api/engagements")
|
||||
|
||||
|
||||
def _parse_date(value: object) -> date | None:
|
||||
if not isinstance(value, str):
|
||||
return None
|
||||
try:
|
||||
return date.fromisoformat(value)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def _parse_status(value: object) -> EngagementStatus | None:
|
||||
if not isinstance(value, str):
|
||||
return None
|
||||
try:
|
||||
return EngagementStatus(value)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
@engagements_bp.get("")
|
||||
@login_required
|
||||
def list_engagements():
|
||||
items = Engagement.query.order_by(Engagement.id.asc()).all()
|
||||
return jsonify([serialize_engagement(e) for e in items]), 200
|
||||
|
||||
|
||||
@engagements_bp.post("")
|
||||
@role_required("admin", "redteam")
|
||||
def create_engagement():
|
||||
data = request.get_json(silent=True) or {}
|
||||
|
||||
name = (data.get("name") or "").strip()
|
||||
if not name:
|
||||
return jsonify({"error": "name is required"}), 400
|
||||
|
||||
start_raw = data.get("start_date")
|
||||
start_date = _parse_date(start_raw) if start_raw else None
|
||||
if start_date is None:
|
||||
return jsonify({"error": "start_date is required (YYYY-MM-DD)"}), 400
|
||||
|
||||
end_raw = data.get("end_date")
|
||||
end_date: date | None = None
|
||||
if end_raw:
|
||||
end_date = _parse_date(end_raw)
|
||||
if end_date is None:
|
||||
return jsonify({"error": "end_date must be YYYY-MM-DD"}), 400
|
||||
if end_date < start_date:
|
||||
return jsonify({"error": "end_date must be >= start_date"}), 400
|
||||
|
||||
status = EngagementStatus.PLANNED
|
||||
if "status" in data and data.get("status") is not None:
|
||||
parsed = _parse_status(data.get("status"))
|
||||
if parsed is None:
|
||||
return (
|
||||
jsonify({"error": "status must be one of: planned, active, closed"}),
|
||||
400,
|
||||
)
|
||||
status = parsed
|
||||
|
||||
engagement = Engagement(
|
||||
name=name,
|
||||
description=data.get("description"),
|
||||
start_date=start_date,
|
||||
end_date=end_date,
|
||||
status=status,
|
||||
created_by_id=g.current_user.id,
|
||||
)
|
||||
db.session.add(engagement)
|
||||
db.session.commit()
|
||||
return jsonify(serialize_engagement(engagement)), 201
|
||||
|
||||
|
||||
@engagements_bp.get("/<int:engagement_id>")
|
||||
@login_required
|
||||
def get_engagement(engagement_id: int):
|
||||
engagement = db.session.get(Engagement, engagement_id)
|
||||
if engagement is None:
|
||||
return jsonify({"error": "Engagement not found"}), 404
|
||||
return jsonify(serialize_engagement(engagement)), 200
|
||||
|
||||
|
||||
@engagements_bp.patch("/<int:engagement_id>")
|
||||
@role_required("admin", "redteam")
|
||||
def update_engagement(engagement_id: int):
|
||||
engagement = db.session.get(Engagement, engagement_id)
|
||||
if engagement is None:
|
||||
return jsonify({"error": "Engagement not found"}), 404
|
||||
|
||||
data = request.get_json(silent=True) or {}
|
||||
|
||||
if "name" in data:
|
||||
name = (data.get("name") or "").strip()
|
||||
if not name:
|
||||
return jsonify({"error": "name must not be empty"}), 400
|
||||
engagement.name = name
|
||||
|
||||
if "description" in data:
|
||||
engagement.description = data.get("description")
|
||||
|
||||
new_start = engagement.start_date
|
||||
if "start_date" in data:
|
||||
parsed = _parse_date(data.get("start_date"))
|
||||
if parsed is None:
|
||||
return jsonify({"error": "start_date must be YYYY-MM-DD"}), 400
|
||||
new_start = parsed
|
||||
|
||||
new_end = engagement.end_date
|
||||
if "end_date" in data:
|
||||
if data.get("end_date") in (None, ""):
|
||||
new_end = None
|
||||
else:
|
||||
parsed = _parse_date(data.get("end_date"))
|
||||
if parsed is None:
|
||||
return jsonify({"error": "end_date must be YYYY-MM-DD"}), 400
|
||||
new_end = parsed
|
||||
|
||||
if new_end is not None and new_end < new_start:
|
||||
return jsonify({"error": "end_date must be >= start_date"}), 400
|
||||
|
||||
engagement.start_date = new_start
|
||||
engagement.end_date = new_end
|
||||
|
||||
if "status" in data:
|
||||
parsed_status = _parse_status((data.get("status") or "").strip())
|
||||
if parsed_status is None:
|
||||
return (
|
||||
jsonify({"error": "status must be one of: planned, active, closed"}),
|
||||
400,
|
||||
)
|
||||
engagement.status = parsed_status
|
||||
|
||||
db.session.commit()
|
||||
return jsonify(serialize_engagement(engagement)), 200
|
||||
|
||||
|
||||
@engagements_bp.delete("/<int:engagement_id>")
|
||||
@role_required("admin", "redteam")
|
||||
def delete_engagement(engagement_id: int):
|
||||
engagement = db.session.get(Engagement, engagement_id)
|
||||
if engagement is None:
|
||||
return jsonify({"error": "Engagement not found"}), 404
|
||||
db.session.delete(engagement)
|
||||
db.session.commit()
|
||||
return "", 204
|
||||
106
backend/app/api/users.py
Normal file
106
backend/app/api/users.py
Normal file
@@ -0,0 +1,106 @@
|
||||
"""User management endpoints (admin only)."""
|
||||
from __future__ import annotations
|
||||
|
||||
from flask import Blueprint, current_app, jsonify, request
|
||||
|
||||
from backend.app.auth import hash_password, role_required
|
||||
from backend.app.extensions import db
|
||||
from backend.app.models import User, UserRole
|
||||
from backend.app.serializers import serialize_user
|
||||
|
||||
users_bp = Blueprint("users", __name__, url_prefix="/api/users")
|
||||
|
||||
|
||||
def _parse_role(value: object) -> UserRole | None:
|
||||
if not isinstance(value, str):
|
||||
return None
|
||||
try:
|
||||
return UserRole(value)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
@users_bp.get("")
|
||||
@role_required("admin")
|
||||
def list_users():
|
||||
users = User.query.order_by(User.id.asc()).all()
|
||||
return jsonify([serialize_user(u) for u in users]), 200
|
||||
|
||||
|
||||
@users_bp.post("")
|
||||
@role_required("admin")
|
||||
def create_user():
|
||||
data = request.get_json(silent=True) or {}
|
||||
username = (data.get("username") or "").strip()
|
||||
password = data.get("password") or ""
|
||||
role_raw = (data.get("role") or "").strip()
|
||||
|
||||
if not username:
|
||||
return jsonify({"error": "username is required"}), 400
|
||||
|
||||
min_len = current_app.config["MIN_PASSWORD_LENGTH"]
|
||||
if len(password) < min_len:
|
||||
return jsonify({"error": f"password must be at least {min_len} characters"}), 400
|
||||
|
||||
role = _parse_role(role_raw)
|
||||
if role is None:
|
||||
return jsonify({"error": "role must be one of: admin, redteam, soc"}), 400
|
||||
|
||||
if User.query.filter_by(username=username).first() is not None:
|
||||
return jsonify({"error": "username already exists"}), 400
|
||||
|
||||
user = User(username=username, password_hash=hash_password(password), role=role)
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
return jsonify(serialize_user(user)), 201
|
||||
|
||||
|
||||
@users_bp.patch("/<int:user_id>")
|
||||
@role_required("admin")
|
||||
def update_user(user_id: int):
|
||||
user = db.session.get(User, user_id)
|
||||
if user is None:
|
||||
return jsonify({"error": "User not found"}), 404
|
||||
|
||||
data = request.get_json(silent=True) or {}
|
||||
|
||||
if "role" in data:
|
||||
new_role = _parse_role((data.get("role") or "").strip())
|
||||
if new_role is None:
|
||||
return jsonify({"error": "role must be one of: admin, redteam, soc"}), 400
|
||||
# Refuse to demote the last admin.
|
||||
if user.role == UserRole.ADMIN and new_role != UserRole.ADMIN:
|
||||
admin_count = User.query.filter_by(role=UserRole.ADMIN).count()
|
||||
if admin_count <= 1:
|
||||
return jsonify({"error": "Cannot demote the last admin"}), 409
|
||||
user.role = new_role
|
||||
|
||||
if "password" in data:
|
||||
password = data.get("password") or ""
|
||||
min_len = current_app.config["MIN_PASSWORD_LENGTH"]
|
||||
if len(password) < min_len:
|
||||
return (
|
||||
jsonify({"error": f"password must be at least {min_len} characters"}),
|
||||
400,
|
||||
)
|
||||
user.password_hash = hash_password(password)
|
||||
|
||||
db.session.commit()
|
||||
return jsonify(serialize_user(user)), 200
|
||||
|
||||
|
||||
@users_bp.delete("/<int:user_id>")
|
||||
@role_required("admin")
|
||||
def delete_user(user_id: int):
|
||||
user = db.session.get(User, user_id)
|
||||
if user is None:
|
||||
return jsonify({"error": "User not found"}), 404
|
||||
|
||||
if user.role == UserRole.ADMIN:
|
||||
admin_count = User.query.filter_by(role=UserRole.ADMIN).count()
|
||||
if admin_count <= 1:
|
||||
return jsonify({"error": "Cannot delete the last admin"}), 409
|
||||
|
||||
db.session.delete(user)
|
||||
db.session.commit()
|
||||
return "", 204
|
||||
13
backend/app/auth/__init__.py
Normal file
13
backend/app/auth/__init__.py
Normal file
@@ -0,0 +1,13 @@
|
||||
"""Auth helpers (JWT, hashing, decorators)."""
|
||||
from backend.app.auth.decorators import login_required, role_required
|
||||
from backend.app.auth.hashing import hash_password, verify_password
|
||||
from backend.app.auth.jwt import decode_token, encode_token
|
||||
|
||||
__all__ = [
|
||||
"hash_password",
|
||||
"verify_password",
|
||||
"encode_token",
|
||||
"decode_token",
|
||||
"login_required",
|
||||
"role_required",
|
||||
]
|
||||
67
backend/app/auth/decorators.py
Normal file
67
backend/app/auth/decorators.py
Normal file
@@ -0,0 +1,67 @@
|
||||
"""Auth decorators that gate routes on a valid JWT and (optionally) role."""
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
from functools import wraps
|
||||
from typing import Any
|
||||
|
||||
import jwt
|
||||
from flask import g, jsonify, request
|
||||
|
||||
from backend.app.auth.jwt import decode_token
|
||||
from backend.app.extensions import db
|
||||
from backend.app.models import User
|
||||
|
||||
|
||||
def _extract_token() -> str | None:
|
||||
header = request.headers.get("Authorization", "")
|
||||
if not header.startswith("Bearer "):
|
||||
return None
|
||||
return header.removeprefix("Bearer ").strip() or None
|
||||
|
||||
|
||||
def login_required(fn: Callable[..., Any]) -> Callable[..., Any]:
|
||||
"""Require a valid JWT. Populates `g.current_user` with the User row."""
|
||||
|
||||
@wraps(fn)
|
||||
def wrapper(*args: Any, **kwargs: Any) -> Any:
|
||||
token = _extract_token()
|
||||
if not token:
|
||||
return jsonify({"error": "Missing or invalid Authorization header"}), 401
|
||||
try:
|
||||
payload = decode_token(token)
|
||||
except jwt.ExpiredSignatureError:
|
||||
return jsonify({"error": "Token expired"}), 401
|
||||
except jwt.PyJWTError:
|
||||
return jsonify({"error": "Invalid token"}), 401
|
||||
|
||||
try:
|
||||
user_id = int(payload["sub"])
|
||||
except (KeyError, TypeError, ValueError):
|
||||
return jsonify({"error": "Invalid token payload"}), 401
|
||||
|
||||
user = db.session.get(User, user_id)
|
||||
if user is None:
|
||||
return jsonify({"error": "User no longer exists"}), 401
|
||||
|
||||
g.current_user = user
|
||||
return fn(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def role_required(*roles: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
|
||||
"""Require the current user to hold one of the given role names."""
|
||||
|
||||
def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
|
||||
@wraps(fn)
|
||||
@login_required
|
||||
def wrapper(*args: Any, **kwargs: Any) -> Any:
|
||||
user = g.current_user
|
||||
if user.role.value not in roles:
|
||||
return jsonify({"error": "Forbidden"}), 403
|
||||
return fn(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
23
backend/app/auth/hashing.py
Normal file
23
backend/app/auth/hashing.py
Normal file
@@ -0,0 +1,23 @@
|
||||
"""Password hashing using argon2."""
|
||||
from __future__ import annotations
|
||||
|
||||
from argon2 import PasswordHasher
|
||||
from argon2.exceptions import VerifyMismatchError
|
||||
|
||||
_hasher = PasswordHasher()
|
||||
|
||||
|
||||
def hash_password(password: str) -> str:
|
||||
"""Return an argon2 hash of `password`."""
|
||||
return _hasher.hash(password)
|
||||
|
||||
|
||||
def verify_password(password_hash: str, password: str) -> bool:
|
||||
"""Return True iff `password` matches `password_hash`."""
|
||||
try:
|
||||
return _hasher.verify(password_hash, password)
|
||||
except VerifyMismatchError:
|
||||
return False
|
||||
except Exception:
|
||||
# Malformed hash or other argon2 error — treat as auth failure.
|
||||
return False
|
||||
34
backend/app/auth/jwt.py
Normal file
34
backend/app/auth/jwt.py
Normal file
@@ -0,0 +1,34 @@
|
||||
"""JWT encode/decode helpers."""
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from typing import Any
|
||||
|
||||
import jwt
|
||||
from flask import current_app
|
||||
|
||||
|
||||
def encode_token(user_id: int, role: str) -> str:
|
||||
"""Return a signed JWT for the given user."""
|
||||
now = datetime.now(UTC)
|
||||
exp_minutes = current_app.config["JWT_EXP_MINUTES"]
|
||||
payload: dict[str, Any] = {
|
||||
"sub": str(user_id),
|
||||
"role": role,
|
||||
"iat": int(now.timestamp()),
|
||||
"exp": int((now + timedelta(minutes=exp_minutes)).timestamp()),
|
||||
}
|
||||
return jwt.encode(
|
||||
payload,
|
||||
current_app.config["JWT_SECRET"],
|
||||
algorithm=current_app.config["JWT_ALGORITHM"],
|
||||
)
|
||||
|
||||
|
||||
def decode_token(token: str) -> dict[str, Any]:
|
||||
"""Decode + validate a JWT. Raises jwt.PyJWTError on failure."""
|
||||
return jwt.decode(
|
||||
token,
|
||||
current_app.config["JWT_SECRET"],
|
||||
algorithms=[current_app.config["JWT_ALGORITHM"]],
|
||||
)
|
||||
38
backend/app/cli.py
Normal file
38
backend/app/cli.py
Normal file
@@ -0,0 +1,38 @@
|
||||
"""Flask CLI commands."""
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
|
||||
import click
|
||||
from flask import Flask, current_app
|
||||
|
||||
from backend.app.auth import hash_password
|
||||
from backend.app.extensions import db
|
||||
from backend.app.models import User, UserRole
|
||||
|
||||
|
||||
def register_cli(app: Flask) -> None:
|
||||
@app.cli.command("create-admin")
|
||||
@click.argument("username")
|
||||
@click.argument("password")
|
||||
def create_admin(username: str, password: str) -> None:
|
||||
"""Create an admin user. Used to bootstrap the first account."""
|
||||
min_len = current_app.config["MIN_PASSWORD_LENGTH"]
|
||||
if len(password) < min_len:
|
||||
click.echo(
|
||||
f"Error: password must be at least {min_len} characters.", err=True
|
||||
)
|
||||
sys.exit(2)
|
||||
|
||||
if User.query.filter_by(username=username).first() is not None:
|
||||
click.echo(f"Error: username '{username}' already exists.", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
user = User(
|
||||
username=username,
|
||||
password_hash=hash_password(password),
|
||||
role=UserRole.ADMIN,
|
||||
)
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
click.echo(f"Admin user '{username}' created (id={user.id}).")
|
||||
35
backend/app/config.py
Normal file
35
backend/app/config.py
Normal file
@@ -0,0 +1,35 @@
|
||||
"""Application configuration loaded from environment variables."""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
|
||||
|
||||
class Config:
|
||||
"""Base configuration. Reads from env vars; fails loud on missing secrets."""
|
||||
|
||||
SQLALCHEMY_TRACK_MODIFICATIONS = False
|
||||
JWT_ALGORITHM = "HS256"
|
||||
JWT_EXP_MINUTES = 60
|
||||
MIN_PASSWORD_LENGTH = 8
|
||||
|
||||
def __init__(self) -> None:
|
||||
db_path = os.environ.get("MIMIC_DB_PATH", "/data/mimic.sqlite")
|
||||
self.SQLALCHEMY_DATABASE_URI = f"sqlite:///{db_path}"
|
||||
|
||||
jwt_secret = os.environ.get("MIMIC_JWT_SECRET")
|
||||
if not jwt_secret:
|
||||
raise RuntimeError(
|
||||
"MIMIC_JWT_SECRET environment variable is required but not set."
|
||||
)
|
||||
self.JWT_SECRET = jwt_secret
|
||||
|
||||
|
||||
class TestConfig(Config):
|
||||
"""Config for pytest. Uses in-memory SQLite + fixed JWT secret."""
|
||||
|
||||
TESTING = True
|
||||
|
||||
def __init__(self) -> None:
|
||||
# Bypass parent's env requirement; tests inject their own secret.
|
||||
self.SQLALCHEMY_DATABASE_URI = "sqlite:///:memory:"
|
||||
self.JWT_SECRET = "test-secret-do-not-use-in-prod"
|
||||
21
backend/app/errors.py
Normal file
21
backend/app/errors.py
Normal file
@@ -0,0 +1,21 @@
|
||||
"""Uniform JSON error handlers."""
|
||||
from __future__ import annotations
|
||||
|
||||
from flask import Flask, jsonify
|
||||
from werkzeug.exceptions import HTTPException
|
||||
|
||||
|
||||
def register_error_handlers(app: Flask) -> None:
|
||||
@app.errorhandler(HTTPException)
|
||||
def handle_http_exception(exc: HTTPException):
|
||||
response = jsonify({"error": exc.description})
|
||||
response.status_code = exc.code or 500
|
||||
return response
|
||||
|
||||
@app.errorhandler(404)
|
||||
def handle_404(_exc):
|
||||
return jsonify({"error": "Not found"}), 404
|
||||
|
||||
@app.errorhandler(405)
|
||||
def handle_405(_exc):
|
||||
return jsonify({"error": "Method not allowed"}), 405
|
||||
6
backend/app/extensions.py
Normal file
6
backend/app/extensions.py
Normal file
@@ -0,0 +1,6 @@
|
||||
"""Shared Flask extension instances."""
|
||||
from flask_migrate import Migrate
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
|
||||
db = SQLAlchemy()
|
||||
migrate = Migrate()
|
||||
5
backend/app/models/__init__.py
Normal file
5
backend/app/models/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
"""SQLAlchemy models."""
|
||||
from backend.app.models.engagement import Engagement, EngagementStatus
|
||||
from backend.app.models.user import User, UserRole
|
||||
|
||||
__all__ = ["User", "UserRole", "Engagement", "EngagementStatus"]
|
||||
39
backend/app/models/engagement.py
Normal file
39
backend/app/models/engagement.py
Normal file
@@ -0,0 +1,39 @@
|
||||
"""Engagement model."""
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from backend.app.extensions import db
|
||||
|
||||
|
||||
class EngagementStatus(str, enum.Enum):
|
||||
PLANNED = "planned"
|
||||
ACTIVE = "active"
|
||||
CLOSED = "closed"
|
||||
|
||||
|
||||
class Engagement(db.Model): # type: ignore[name-defined]
|
||||
__tablename__ = "engagements"
|
||||
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
name = db.Column(db.String(255), nullable=False)
|
||||
description = db.Column(db.Text, nullable=True)
|
||||
start_date = db.Column(db.Date, nullable=False)
|
||||
end_date = db.Column(db.Date, nullable=True)
|
||||
status = db.Column(
|
||||
db.Enum(EngagementStatus, name="engagement_status"),
|
||||
nullable=False,
|
||||
default=EngagementStatus.PLANNED,
|
||||
)
|
||||
created_at = db.Column(
|
||||
db.DateTime, nullable=False, default=lambda: datetime.now(UTC)
|
||||
)
|
||||
created_by_id = db.Column(
|
||||
db.Integer, db.ForeignKey("users.id", ondelete="RESTRICT"), nullable=False
|
||||
)
|
||||
|
||||
created_by = db.relationship("User", backref="engagements", lazy="joined")
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"<Engagement {self.id} {self.name!r}>"
|
||||
28
backend/app/models/user.py
Normal file
28
backend/app/models/user.py
Normal file
@@ -0,0 +1,28 @@
|
||||
"""User model."""
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from backend.app.extensions import db
|
||||
|
||||
|
||||
class UserRole(str, enum.Enum):
|
||||
ADMIN = "admin"
|
||||
REDTEAM = "redteam"
|
||||
SOC = "soc"
|
||||
|
||||
|
||||
class User(db.Model): # type: ignore[name-defined]
|
||||
__tablename__ = "users"
|
||||
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
username = db.Column(db.String(64), unique=True, nullable=False, index=True)
|
||||
password_hash = db.Column(db.String(255), nullable=False)
|
||||
role = db.Column(db.Enum(UserRole, name="user_role"), nullable=False)
|
||||
created_at = db.Column(
|
||||
db.DateTime, nullable=False, default=lambda: datetime.now(UTC)
|
||||
)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"<User {self.username} ({self.role.value})>"
|
||||
34
backend/app/serializers.py
Normal file
34
backend/app/serializers.py
Normal file
@@ -0,0 +1,34 @@
|
||||
"""JSON serializers for API responses."""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from backend.app.models import Engagement, User
|
||||
|
||||
|
||||
def serialize_user(user: User) -> dict[str, Any]:
|
||||
return {
|
||||
"id": user.id,
|
||||
"username": user.username,
|
||||
"role": user.role.value,
|
||||
"created_at": user.created_at.isoformat() if user.created_at else None,
|
||||
}
|
||||
|
||||
|
||||
def serialize_user_brief(user: User) -> dict[str, Any]:
|
||||
return {"id": user.id, "username": user.username}
|
||||
|
||||
|
||||
def serialize_engagement(engagement: Engagement) -> dict[str, Any]:
|
||||
return {
|
||||
"id": engagement.id,
|
||||
"name": engagement.name,
|
||||
"description": engagement.description,
|
||||
"start_date": engagement.start_date.isoformat() if engagement.start_date else None,
|
||||
"end_date": engagement.end_date.isoformat() if engagement.end_date else None,
|
||||
"status": engagement.status.value,
|
||||
"created_at": engagement.created_at.isoformat() if engagement.created_at else None,
|
||||
"created_by": serialize_user_brief(engagement.created_by) # type: ignore[arg-type]
|
||||
if engagement.created_by
|
||||
else None,
|
||||
}
|
||||
1
backend/migrations/README
Normal file
1
backend/migrations/README
Normal file
@@ -0,0 +1 @@
|
||||
Generic single-database configuration for Flask-Migrate / Alembic.
|
||||
40
backend/migrations/alembic.ini
Normal file
40
backend/migrations/alembic.ini
Normal file
@@ -0,0 +1,40 @@
|
||||
# Alembic config. Most settings are injected at runtime by Flask-Migrate.
|
||||
[alembic]
|
||||
script_location = %(here)s
|
||||
sqlalchemy.url = driver://user:pass@localhost/dbname
|
||||
|
||||
[post_write_hooks]
|
||||
|
||||
[loggers]
|
||||
keys = root,sqlalchemy,alembic
|
||||
|
||||
[handlers]
|
||||
keys = console
|
||||
|
||||
[formatters]
|
||||
keys = generic
|
||||
|
||||
[logger_root]
|
||||
level = WARN
|
||||
handlers = console
|
||||
qualname =
|
||||
|
||||
[logger_sqlalchemy]
|
||||
level = WARN
|
||||
handlers =
|
||||
qualname = sqlalchemy.engine
|
||||
|
||||
[logger_alembic]
|
||||
level = INFO
|
||||
handlers =
|
||||
qualname = alembic
|
||||
|
||||
[handler_console]
|
||||
class = StreamHandler
|
||||
args = (sys.stderr,)
|
||||
level = NOTSET
|
||||
formatter = generic
|
||||
|
||||
[formatter_generic]
|
||||
format = %(levelname)-5.5s [%(name)s] %(message)s
|
||||
datefmt = %H:%M:%S
|
||||
73
backend/migrations/env.py
Normal file
73
backend/migrations/env.py
Normal file
@@ -0,0 +1,73 @@
|
||||
"""Alembic environment, wired to Flask-Migrate."""
|
||||
|
||||
import logging
|
||||
from logging.config import fileConfig
|
||||
|
||||
from alembic import context
|
||||
from flask import current_app
|
||||
|
||||
config = context.config
|
||||
fileConfig(config.config_file_name)
|
||||
logger = logging.getLogger("alembic.env")
|
||||
|
||||
|
||||
def get_engine():
|
||||
try:
|
||||
# Flask-SQLAlchemy < 3.x
|
||||
return current_app.extensions["migrate"].db.get_engine()
|
||||
except (TypeError, AttributeError):
|
||||
# Flask-SQLAlchemy >= 3.x
|
||||
return current_app.extensions["migrate"].db.engine
|
||||
|
||||
|
||||
def get_engine_url() -> str:
|
||||
try:
|
||||
return get_engine().url.render_as_string(hide_password=False).replace("%", "%%")
|
||||
except AttributeError:
|
||||
return str(get_engine().url).replace("%", "%%")
|
||||
|
||||
|
||||
config.set_main_option("sqlalchemy.url", get_engine_url())
|
||||
target_db = current_app.extensions["migrate"].db
|
||||
|
||||
|
||||
def get_metadata():
|
||||
if hasattr(target_db, "metadatas"):
|
||||
return target_db.metadatas[None]
|
||||
return target_db.metadata
|
||||
|
||||
|
||||
def run_migrations_offline() -> None:
|
||||
url = config.get_main_option("sqlalchemy.url")
|
||||
context.configure(url=url, target_metadata=get_metadata(), literal_binds=True)
|
||||
with context.begin_transaction():
|
||||
context.run_migrations()
|
||||
|
||||
|
||||
def run_migrations_online() -> None:
|
||||
def process_revision_directives(context, revision, directives): # noqa: ANN001
|
||||
if getattr(config.cmd_opts, "autogenerate", False):
|
||||
script = directives[0]
|
||||
if script.upgrade_ops.is_empty():
|
||||
directives[:] = []
|
||||
logger.info("No changes in schema detected.")
|
||||
|
||||
conf_args = current_app.extensions["migrate"].configure_args
|
||||
if conf_args.get("process_revision_directives") is None:
|
||||
conf_args["process_revision_directives"] = process_revision_directives
|
||||
|
||||
connectable = get_engine()
|
||||
with connectable.connect() as connection:
|
||||
context.configure(
|
||||
connection=connection,
|
||||
target_metadata=get_metadata(),
|
||||
**conf_args,
|
||||
)
|
||||
with context.begin_transaction():
|
||||
context.run_migrations()
|
||||
|
||||
|
||||
if context.is_offline_mode():
|
||||
run_migrations_offline()
|
||||
else:
|
||||
run_migrations_online()
|
||||
24
backend/migrations/script.py.mako
Normal file
24
backend/migrations/script.py.mako
Normal file
@@ -0,0 +1,24 @@
|
||||
"""${message}
|
||||
|
||||
Revision ID: ${up_revision}
|
||||
Revises: ${down_revision | comma,n}
|
||||
Create Date: ${create_date}
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
${imports if imports else ""}
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = ${repr(up_revision)}
|
||||
down_revision = ${repr(down_revision)}
|
||||
branch_labels = ${repr(branch_labels)}
|
||||
depends_on = ${repr(depends_on)}
|
||||
|
||||
|
||||
def upgrade():
|
||||
${upgrades if upgrades else "pass"}
|
||||
|
||||
|
||||
def downgrade():
|
||||
${downgrades if downgrades else "pass"}
|
||||
60
backend/migrations/versions/0001_initial_schema.py
Normal file
60
backend/migrations/versions/0001_initial_schema.py
Normal file
@@ -0,0 +1,60 @@
|
||||
"""initial schema: users + engagements
|
||||
|
||||
Revision ID: 0001
|
||||
Revises:
|
||||
Create Date: 2026-05-26 00:00:00.000000
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "0001"
|
||||
down_revision = None
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_table(
|
||||
"users",
|
||||
sa.Column("id", sa.Integer(), primary_key=True),
|
||||
sa.Column("username", sa.String(length=64), nullable=False),
|
||||
sa.Column("password_hash", sa.String(length=255), nullable=False),
|
||||
sa.Column(
|
||||
"role",
|
||||
sa.Enum("admin", "redteam", "soc", name="user_role"),
|
||||
nullable=False,
|
||||
),
|
||||
sa.Column("created_at", sa.DateTime(), nullable=False),
|
||||
sa.UniqueConstraint("username", name="uq_users_username"),
|
||||
)
|
||||
op.create_index("ix_users_username", "users", ["username"], unique=True)
|
||||
|
||||
op.create_table(
|
||||
"engagements",
|
||||
sa.Column("id", sa.Integer(), primary_key=True),
|
||||
sa.Column("name", sa.String(length=255), nullable=False),
|
||||
sa.Column("description", sa.Text(), nullable=True),
|
||||
sa.Column("start_date", sa.Date(), nullable=False),
|
||||
sa.Column("end_date", sa.Date(), nullable=True),
|
||||
sa.Column(
|
||||
"status",
|
||||
sa.Enum("planned", "active", "closed", name="engagement_status"),
|
||||
nullable=False,
|
||||
),
|
||||
sa.Column("created_at", sa.DateTime(), nullable=False),
|
||||
sa.Column("created_by_id", sa.Integer(), nullable=False),
|
||||
sa.ForeignKeyConstraint(
|
||||
["created_by_id"], ["users.id"], ondelete="RESTRICT",
|
||||
name="fk_engagements_created_by_id_users",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_table("engagements")
|
||||
op.drop_index("ix_users_username", table_name="users")
|
||||
op.drop_table("users")
|
||||
sa.Enum(name="engagement_status").drop(op.get_bind(), checkfirst=True)
|
||||
sa.Enum(name="user_role").drop(op.get_bind(), checkfirst=True)
|
||||
28
backend/pyproject.toml
Normal file
28
backend/pyproject.toml
Normal file
@@ -0,0 +1,28 @@
|
||||
[project]
|
||||
name = "mimic-backend"
|
||||
version = "0.1.0"
|
||||
description = "Mimic BAS backend (Flask API)"
|
||||
requires-python = ">=3.12"
|
||||
|
||||
[tool.ruff]
|
||||
line-length = 100
|
||||
target-version = "py312"
|
||||
extend-exclude = ["migrations/versions"]
|
||||
|
||||
[tool.ruff.lint]
|
||||
select = ["E", "F", "I", "B", "UP", "SIM"]
|
||||
ignore = ["E501"]
|
||||
|
||||
[tool.mypy]
|
||||
python_version = "3.12"
|
||||
strict = false
|
||||
ignore_missing_imports = true
|
||||
warn_unused_ignores = true
|
||||
warn_redundant_casts = true
|
||||
disallow_untyped_defs = false
|
||||
no_implicit_optional = true
|
||||
exclude = ["migrations/", "tests/"]
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
testpaths = ["tests"]
|
||||
addopts = "-ra"
|
||||
8
backend/requirements.txt
Normal file
8
backend/requirements.txt
Normal file
@@ -0,0 +1,8 @@
|
||||
Flask==3.0.3
|
||||
Flask-SQLAlchemy==3.1.1
|
||||
Flask-Migrate==4.0.7
|
||||
PyJWT==2.9.0
|
||||
argon2-cffi==23.1.0
|
||||
pytest==8.3.3
|
||||
ruff==0.6.9
|
||||
mypy==1.11.2
|
||||
0
backend/tests/__init__.py
Normal file
0
backend/tests/__init__.py
Normal file
92
backend/tests/conftest.py
Normal file
92
backend/tests/conftest.py
Normal file
@@ -0,0 +1,92 @@
|
||||
"""Shared pytest fixtures."""
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Generator
|
||||
|
||||
import pytest
|
||||
from flask import Flask
|
||||
from flask.testing import FlaskClient
|
||||
|
||||
from backend.app import create_app
|
||||
from backend.app.auth import hash_password
|
||||
from backend.app.config import TestConfig
|
||||
from backend.app.extensions import db
|
||||
from backend.app.models import User, UserRole
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def app() -> Generator[Flask, None, None]:
|
||||
application = create_app(TestConfig())
|
||||
with application.app_context():
|
||||
db.create_all()
|
||||
yield application
|
||||
db.session.remove()
|
||||
db.drop_all()
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def client(app: Flask) -> FlaskClient:
|
||||
return app.test_client()
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def admin_user(app: Flask) -> User:
|
||||
user = User(
|
||||
username="admin1",
|
||||
password_hash=hash_password("adminpass1"),
|
||||
role=UserRole.ADMIN,
|
||||
)
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
return user
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def redteam_user(app: Flask) -> User:
|
||||
user = User(
|
||||
username="redteam1",
|
||||
password_hash=hash_password("redteampass1"),
|
||||
role=UserRole.REDTEAM,
|
||||
)
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
return user
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def soc_user(app: Flask) -> User:
|
||||
user = User(
|
||||
username="soc1",
|
||||
password_hash=hash_password("socpass1"),
|
||||
role=UserRole.SOC,
|
||||
)
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
return user
|
||||
|
||||
|
||||
def _login(client: FlaskClient, username: str, password: str) -> str:
|
||||
resp = client.post(
|
||||
"/api/auth/login", json={"username": username, "password": password}
|
||||
)
|
||||
assert resp.status_code == 200, resp.get_json()
|
||||
return resp.get_json()["access_token"]
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def admin_token(client: FlaskClient, admin_user: User) -> str:
|
||||
return _login(client, "admin1", "adminpass1")
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def redteam_token(client: FlaskClient, redteam_user: User) -> str:
|
||||
return _login(client, "redteam1", "redteampass1")
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def soc_token(client: FlaskClient, soc_user: User) -> str:
|
||||
return _login(client, "soc1", "socpass1")
|
||||
|
||||
|
||||
def auth_headers(token: str) -> dict[str, str]:
|
||||
return {"Authorization": f"Bearer {token}"}
|
||||
41
backend/tests/test_app.py
Normal file
41
backend/tests/test_app.py
Normal file
@@ -0,0 +1,41 @@
|
||||
"""App-level tests: SPA fallback, health, error shapes."""
|
||||
from __future__ import annotations
|
||||
|
||||
from flask.testing import FlaskClient
|
||||
|
||||
|
||||
def test_health_endpoint(client: FlaskClient) -> None:
|
||||
resp = client.get("/api/health")
|
||||
assert resp.status_code == 200
|
||||
assert resp.get_json() == {"status": "ok"}
|
||||
|
||||
|
||||
def test_unknown_api_path_returns_json_404(client: FlaskClient) -> None:
|
||||
"""SPA fallback must not shadow unknown /api/* routes with index.html."""
|
||||
resp = client.get("/api/nonexistent")
|
||||
assert resp.status_code == 404
|
||||
assert resp.is_json, f"expected JSON, got Content-Type={resp.content_type}"
|
||||
assert resp.get_json() == {"error": "Not found"}
|
||||
|
||||
|
||||
def test_unknown_nested_api_path_returns_json_404(client: FlaskClient) -> None:
|
||||
resp = client.get("/api/foo/bar/baz")
|
||||
assert resp.status_code == 404
|
||||
assert resp.is_json
|
||||
assert resp.get_json() == {"error": "Not found"}
|
||||
|
||||
|
||||
def test_wrong_method_on_api_returns_json(client: FlaskClient) -> None:
|
||||
# PUT is not defined on /api/auth/login — should stay JSON, not HTML.
|
||||
resp = client.put("/api/auth/login")
|
||||
assert resp.status_code in (404, 405)
|
||||
assert resp.is_json
|
||||
|
||||
|
||||
def test_index_without_built_frontend_returns_json(client: FlaskClient) -> None:
|
||||
resp = client.get("/")
|
||||
assert resp.status_code == 200
|
||||
# Tests run with no built frontend → factory falls back to a JSON status payload.
|
||||
assert resp.is_json
|
||||
body = resp.get_json()
|
||||
assert body["status"] == "ok"
|
||||
93
backend/tests/test_auth.py
Normal file
93
backend/tests/test_auth.py
Normal file
@@ -0,0 +1,93 @@
|
||||
"""Auth endpoint tests."""
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
import jwt
|
||||
from flask import Flask
|
||||
from flask.testing import FlaskClient
|
||||
|
||||
from backend.app.models import User
|
||||
|
||||
|
||||
def test_login_success(client: FlaskClient, admin_user: User) -> None:
|
||||
resp = client.post(
|
||||
"/api/auth/login", json={"username": "admin1", "password": "adminpass1"}
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
body = resp.get_json()
|
||||
assert "access_token" in body and body["access_token"]
|
||||
assert body["user"] == {"id": admin_user.id, "username": "admin1", "role": "admin"}
|
||||
|
||||
|
||||
def test_login_wrong_password(client: FlaskClient, admin_user: User) -> None:
|
||||
resp = client.post(
|
||||
"/api/auth/login", json={"username": "admin1", "password": "wrong"}
|
||||
)
|
||||
assert resp.status_code == 401
|
||||
assert resp.get_json() == {"error": "Invalid credentials"}
|
||||
|
||||
|
||||
def test_login_unknown_user(client: FlaskClient) -> None:
|
||||
resp = client.post(
|
||||
"/api/auth/login", json={"username": "ghost", "password": "anything!!"}
|
||||
)
|
||||
assert resp.status_code == 401
|
||||
# Generic message — must not leak whether username exists.
|
||||
assert resp.get_json() == {"error": "Invalid credentials"}
|
||||
|
||||
|
||||
def test_login_missing_fields(client: FlaskClient) -> None:
|
||||
resp = client.post("/api/auth/login", json={})
|
||||
assert resp.status_code == 401
|
||||
|
||||
|
||||
def test_me_requires_token(client: FlaskClient) -> None:
|
||||
resp = client.get("/api/auth/me")
|
||||
assert resp.status_code == 401
|
||||
|
||||
|
||||
def test_me_returns_current_user(
|
||||
client: FlaskClient, admin_user: User, admin_token: str
|
||||
) -> None:
|
||||
resp = client.get("/api/auth/me", headers={"Authorization": f"Bearer {admin_token}"})
|
||||
assert resp.status_code == 200
|
||||
body = resp.get_json()
|
||||
assert body["username"] == "admin1"
|
||||
assert body["role"] == "admin"
|
||||
assert "password_hash" not in body
|
||||
|
||||
|
||||
def test_me_with_invalid_token(client: FlaskClient) -> None:
|
||||
resp = client.get("/api/auth/me", headers={"Authorization": "Bearer not.a.jwt"})
|
||||
assert resp.status_code == 401
|
||||
|
||||
|
||||
def test_me_with_expired_token(
|
||||
app: Flask, client: FlaskClient, admin_user: User
|
||||
) -> None:
|
||||
now = datetime.now(UTC) - timedelta(minutes=120)
|
||||
payload = {
|
||||
"sub": str(admin_user.id),
|
||||
"role": "admin",
|
||||
"iat": int(now.timestamp()),
|
||||
"exp": int((now + timedelta(minutes=1)).timestamp()),
|
||||
}
|
||||
token = jwt.encode(
|
||||
payload, app.config["JWT_SECRET"], algorithm=app.config["JWT_ALGORITHM"]
|
||||
)
|
||||
resp = client.get("/api/auth/me", headers={"Authorization": f"Bearer {token}"})
|
||||
assert resp.status_code == 401
|
||||
assert resp.get_json() == {"error": "Token expired"}
|
||||
|
||||
|
||||
def test_logout_ok_with_token(client: FlaskClient, admin_token: str) -> None:
|
||||
resp = client.post(
|
||||
"/api/auth/logout", headers={"Authorization": f"Bearer {admin_token}"}
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
|
||||
def test_logout_without_token_is_401(client: FlaskClient) -> None:
|
||||
resp = client.post("/api/auth/logout")
|
||||
assert resp.status_code == 401
|
||||
47
backend/tests/test_cli_create_admin.py
Normal file
47
backend/tests/test_cli_create_admin.py
Normal file
@@ -0,0 +1,47 @@
|
||||
"""CLI tests for `flask create-admin`."""
|
||||
from __future__ import annotations
|
||||
|
||||
from flask import Flask
|
||||
|
||||
from backend.app.auth import hash_password
|
||||
from backend.app.extensions import db
|
||||
from backend.app.models import User, UserRole
|
||||
|
||||
|
||||
def test_create_admin_success(app: Flask) -> None:
|
||||
runner = app.test_cli_runner()
|
||||
result = runner.invoke(args=["create-admin", "alice", "p4ssw0rd"])
|
||||
assert result.exit_code == 0, result.output
|
||||
assert "created" in result.output.lower()
|
||||
user = User.query.filter_by(username="alice").first()
|
||||
assert user is not None
|
||||
assert user.role == UserRole.ADMIN
|
||||
|
||||
|
||||
def test_create_admin_duplicate_username(app: Flask) -> None:
|
||||
existing = User(
|
||||
username="bob", password_hash=hash_password("originalpw"), role=UserRole.ADMIN
|
||||
)
|
||||
db.session.add(existing)
|
||||
db.session.commit()
|
||||
|
||||
runner = app.test_cli_runner()
|
||||
result = runner.invoke(args=["create-admin", "bob", "anotherpw1"])
|
||||
assert result.exit_code != 0
|
||||
assert "exists" in (result.output + (result.stderr_bytes.decode() if result.stderr_bytes else "")).lower()
|
||||
|
||||
|
||||
def test_create_admin_short_password(app: Flask) -> None:
|
||||
runner = app.test_cli_runner()
|
||||
result = runner.invoke(args=["create-admin", "charlie", "abc"])
|
||||
assert result.exit_code != 0
|
||||
combined = (result.output + (result.stderr_bytes.decode() if result.stderr_bytes else "")).lower()
|
||||
assert "8 characters" in combined
|
||||
assert User.query.filter_by(username="charlie").first() is None
|
||||
|
||||
|
||||
def test_create_admin_missing_args(app: Flask) -> None:
|
||||
runner = app.test_cli_runner()
|
||||
result = runner.invoke(args=["create-admin"])
|
||||
# Click's UsageError exits with code 2
|
||||
assert result.exit_code != 0
|
||||
281
backend/tests/test_engagements.py
Normal file
281
backend/tests/test_engagements.py
Normal file
@@ -0,0 +1,281 @@
|
||||
"""Engagement endpoint tests."""
|
||||
from __future__ import annotations
|
||||
|
||||
from flask.testing import FlaskClient
|
||||
|
||||
from backend.app.models import User
|
||||
from backend.tests.conftest import auth_headers as _h
|
||||
|
||||
|
||||
def _create(
|
||||
client: FlaskClient, token: str, **overrides: object
|
||||
) -> dict[str, object]:
|
||||
payload: dict[str, object] = {
|
||||
"name": "Op Alpha",
|
||||
"description": "first engagement",
|
||||
"start_date": "2026-06-01",
|
||||
"end_date": "2026-06-10",
|
||||
"status": "planned",
|
||||
}
|
||||
payload.update(overrides)
|
||||
resp = client.post("/api/engagements", headers=_h(token), json=payload)
|
||||
assert resp.status_code == 201, resp.get_json()
|
||||
return resp.get_json()
|
||||
|
||||
|
||||
def test_create_engagement_as_redteam(
|
||||
client: FlaskClient, redteam_user: User, redteam_token: str
|
||||
) -> None:
|
||||
body = _create(client, redteam_token)
|
||||
assert body["name"] == "Op Alpha"
|
||||
assert body["status"] == "planned"
|
||||
assert body["created_by"] == {"id": redteam_user.id, "username": "redteam1"}
|
||||
|
||||
|
||||
def test_create_engagement_as_admin(
|
||||
client: FlaskClient, admin_user: User, admin_token: str
|
||||
) -> None:
|
||||
body = _create(client, admin_token, name="Op Admin")
|
||||
assert body["created_by"]["username"] == "admin1"
|
||||
|
||||
|
||||
def test_create_engagement_soc_forbidden(
|
||||
client: FlaskClient, soc_token: str
|
||||
) -> None:
|
||||
resp = client.post(
|
||||
"/api/engagements",
|
||||
headers=_h(soc_token),
|
||||
json={"name": "x", "start_date": "2026-06-01"},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
def test_create_engagement_unauth(client: FlaskClient) -> None:
|
||||
resp = client.post(
|
||||
"/api/engagements", json={"name": "x", "start_date": "2026-06-01"}
|
||||
)
|
||||
assert resp.status_code == 401
|
||||
|
||||
|
||||
def test_create_engagement_missing_name(
|
||||
client: FlaskClient, redteam_token: str
|
||||
) -> None:
|
||||
resp = client.post(
|
||||
"/api/engagements",
|
||||
headers=_h(redteam_token),
|
||||
json={"start_date": "2026-06-01"},
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
|
||||
def test_create_engagement_bad_date(
|
||||
client: FlaskClient, redteam_token: str
|
||||
) -> None:
|
||||
resp = client.post(
|
||||
"/api/engagements",
|
||||
headers=_h(redteam_token),
|
||||
json={"name": "bad", "start_date": "not-a-date"},
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
|
||||
def test_create_engagement_end_before_start(
|
||||
client: FlaskClient, redteam_token: str
|
||||
) -> None:
|
||||
resp = client.post(
|
||||
"/api/engagements",
|
||||
headers=_h(redteam_token),
|
||||
json={
|
||||
"name": "bad",
|
||||
"start_date": "2026-06-10",
|
||||
"end_date": "2026-06-01",
|
||||
},
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
|
||||
def test_create_engagement_bad_status(
|
||||
client: FlaskClient, redteam_token: str
|
||||
) -> None:
|
||||
resp = client.post(
|
||||
"/api/engagements",
|
||||
headers=_h(redteam_token),
|
||||
json={"name": "bad", "start_date": "2026-06-01", "status": "wat"},
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
|
||||
def test_create_engagement_default_status_planned(
|
||||
client: FlaskClient, redteam_token: str
|
||||
) -> None:
|
||||
resp = client.post(
|
||||
"/api/engagements",
|
||||
headers=_h(redteam_token),
|
||||
json={"name": "Op default", "start_date": "2026-06-01"},
|
||||
)
|
||||
assert resp.status_code == 201
|
||||
assert resp.get_json()["status"] == "planned"
|
||||
|
||||
|
||||
def test_list_engagements_all_roles_can_read(
|
||||
client: FlaskClient,
|
||||
redteam_token: str,
|
||||
soc_token: str,
|
||||
admin_token: str,
|
||||
) -> None:
|
||||
_create(client, redteam_token)
|
||||
for token in (redteam_token, soc_token, admin_token):
|
||||
resp = client.get("/api/engagements", headers=_h(token))
|
||||
assert resp.status_code == 200
|
||||
body = resp.get_json()
|
||||
assert isinstance(body, list)
|
||||
assert len(body) >= 1
|
||||
assert body[0]["created_by"] == {
|
||||
"id": body[0]["created_by"]["id"],
|
||||
"username": body[0]["created_by"]["username"],
|
||||
}
|
||||
|
||||
|
||||
def test_get_engagement_404(client: FlaskClient, redteam_token: str) -> None:
|
||||
resp = client.get("/api/engagements/9999", headers=_h(redteam_token))
|
||||
assert resp.status_code == 404
|
||||
|
||||
|
||||
def test_get_engagement_ok(client: FlaskClient, redteam_token: str) -> None:
|
||||
created = _create(client, redteam_token)
|
||||
resp = client.get(
|
||||
f"/api/engagements/{created['id']}", headers=_h(redteam_token)
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.get_json()["id"] == created["id"]
|
||||
|
||||
|
||||
def test_patch_engagement_redteam(
|
||||
client: FlaskClient, redteam_token: str
|
||||
) -> None:
|
||||
created = _create(client, redteam_token)
|
||||
resp = client.patch(
|
||||
f"/api/engagements/{created['id']}",
|
||||
headers=_h(redteam_token),
|
||||
json={"status": "active", "description": "now in progress"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
body = resp.get_json()
|
||||
assert body["status"] == "active"
|
||||
assert body["description"] == "now in progress"
|
||||
|
||||
|
||||
def test_patch_engagement_admin(
|
||||
client: FlaskClient, redteam_token: str, admin_token: str
|
||||
) -> None:
|
||||
created = _create(client, redteam_token)
|
||||
resp = client.patch(
|
||||
f"/api/engagements/{created['id']}",
|
||||
headers=_h(admin_token),
|
||||
json={"name": "Op renamed"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.get_json()["name"] == "Op renamed"
|
||||
|
||||
|
||||
def test_patch_engagement_bad_status(
|
||||
client: FlaskClient, redteam_token: str
|
||||
) -> None:
|
||||
created = _create(client, redteam_token)
|
||||
resp = client.patch(
|
||||
f"/api/engagements/{created['id']}",
|
||||
headers=_h(redteam_token),
|
||||
json={"status": "wat"},
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
|
||||
def test_patch_engagement_soc_forbidden(
|
||||
client: FlaskClient, redteam_token: str, soc_token: str
|
||||
) -> None:
|
||||
created = _create(client, redteam_token)
|
||||
resp = client.patch(
|
||||
f"/api/engagements/{created['id']}",
|
||||
headers=_h(soc_token),
|
||||
json={"status": "closed"},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
def test_patch_engagement_end_before_start(
|
||||
client: FlaskClient, redteam_token: str
|
||||
) -> None:
|
||||
created = _create(client, redteam_token, start_date="2026-06-01")
|
||||
resp = client.patch(
|
||||
f"/api/engagements/{created['id']}",
|
||||
headers=_h(redteam_token),
|
||||
json={"end_date": "2026-05-30"},
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
|
||||
def test_patch_engagement_clear_end_date(
|
||||
client: FlaskClient, redteam_token: str
|
||||
) -> None:
|
||||
created = _create(client, redteam_token, end_date="2026-06-30")
|
||||
resp = client.patch(
|
||||
f"/api/engagements/{created['id']}",
|
||||
headers=_h(redteam_token),
|
||||
json={"end_date": None},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.get_json()["end_date"] is None
|
||||
|
||||
|
||||
def test_patch_engagement_empty_name_rejected(
|
||||
client: FlaskClient, redteam_token: str
|
||||
) -> None:
|
||||
created = _create(client, redteam_token)
|
||||
resp = client.patch(
|
||||
f"/api/engagements/{created['id']}",
|
||||
headers=_h(redteam_token),
|
||||
json={"name": " "},
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
|
||||
def test_patch_engagement_404(client: FlaskClient, redteam_token: str) -> None:
|
||||
resp = client.patch(
|
||||
"/api/engagements/9999", headers=_h(redteam_token), json={"name": "x"}
|
||||
)
|
||||
assert resp.status_code == 404
|
||||
|
||||
|
||||
def test_delete_engagement_redteam(
|
||||
client: FlaskClient, redteam_token: str
|
||||
) -> None:
|
||||
created = _create(client, redteam_token)
|
||||
resp = client.delete(
|
||||
f"/api/engagements/{created['id']}", headers=_h(redteam_token)
|
||||
)
|
||||
assert resp.status_code == 204
|
||||
|
||||
|
||||
def test_delete_engagement_admin(
|
||||
client: FlaskClient, redteam_token: str, admin_token: str
|
||||
) -> None:
|
||||
created = _create(client, redteam_token)
|
||||
resp = client.delete(
|
||||
f"/api/engagements/{created['id']}", headers=_h(admin_token)
|
||||
)
|
||||
assert resp.status_code == 204
|
||||
|
||||
|
||||
def test_delete_engagement_soc_forbidden(
|
||||
client: FlaskClient, redteam_token: str, soc_token: str
|
||||
) -> None:
|
||||
created = _create(client, redteam_token)
|
||||
resp = client.delete(
|
||||
f"/api/engagements/{created['id']}", headers=_h(soc_token)
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
def test_delete_engagement_404(client: FlaskClient, redteam_token: str) -> None:
|
||||
resp = client.delete("/api/engagements/9999", headers=_h(redteam_token))
|
||||
assert resp.status_code == 404
|
||||
201
backend/tests/test_users.py
Normal file
201
backend/tests/test_users.py
Normal file
@@ -0,0 +1,201 @@
|
||||
"""User management endpoint tests."""
|
||||
from __future__ import annotations
|
||||
|
||||
from flask.testing import FlaskClient
|
||||
|
||||
from backend.app.auth import hash_password
|
||||
from backend.app.extensions import db
|
||||
from backend.app.models import User, UserRole
|
||||
from backend.tests.conftest import auth_headers as _h
|
||||
|
||||
|
||||
def test_list_users_admin_only(
|
||||
client: FlaskClient, admin_user: User, admin_token: str
|
||||
) -> None:
|
||||
resp = client.get("/api/users", headers=_h(admin_token))
|
||||
assert resp.status_code == 200
|
||||
body = resp.get_json()
|
||||
assert isinstance(body, list)
|
||||
assert any(u["username"] == "admin1" for u in body)
|
||||
assert all("password_hash" not in u for u in body)
|
||||
|
||||
|
||||
def test_list_users_forbidden_for_redteam(
|
||||
client: FlaskClient, redteam_token: str
|
||||
) -> None:
|
||||
resp = client.get("/api/users", headers=_h(redteam_token))
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
def test_list_users_forbidden_for_soc(client: FlaskClient, soc_token: str) -> None:
|
||||
resp = client.get("/api/users", headers=_h(soc_token))
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
def test_list_users_unauth(client: FlaskClient) -> None:
|
||||
resp = client.get("/api/users")
|
||||
assert resp.status_code == 401
|
||||
|
||||
|
||||
def test_create_user_success(client: FlaskClient, admin_token: str) -> None:
|
||||
resp = client.post(
|
||||
"/api/users",
|
||||
headers=_h(admin_token),
|
||||
json={"username": "newbie", "password": "longenough1", "role": "redteam"},
|
||||
)
|
||||
assert resp.status_code == 201
|
||||
body = resp.get_json()
|
||||
assert body["username"] == "newbie"
|
||||
assert body["role"] == "redteam"
|
||||
assert "password_hash" not in body
|
||||
|
||||
|
||||
def test_create_user_duplicate_username(
|
||||
client: FlaskClient, admin_user: User, admin_token: str
|
||||
) -> None:
|
||||
resp = client.post(
|
||||
"/api/users",
|
||||
headers=_h(admin_token),
|
||||
json={"username": "admin1", "password": "longenough1", "role": "redteam"},
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
assert "exists" in resp.get_json()["error"]
|
||||
|
||||
|
||||
def test_create_user_short_password(client: FlaskClient, admin_token: str) -> None:
|
||||
resp = client.post(
|
||||
"/api/users",
|
||||
headers=_h(admin_token),
|
||||
json={"username": "short", "password": "abc", "role": "soc"},
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
assert "8 characters" in resp.get_json()["error"]
|
||||
|
||||
|
||||
def test_create_user_invalid_role(client: FlaskClient, admin_token: str) -> None:
|
||||
resp = client.post(
|
||||
"/api/users",
|
||||
headers=_h(admin_token),
|
||||
json={"username": "x", "password": "longenough1", "role": "godmode"},
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
|
||||
def test_create_user_forbidden_for_non_admin(
|
||||
client: FlaskClient, redteam_token: str
|
||||
) -> None:
|
||||
resp = client.post(
|
||||
"/api/users",
|
||||
headers=_h(redteam_token),
|
||||
json={"username": "x", "password": "longenough1", "role": "soc"},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
def test_patch_user_change_role(
|
||||
client: FlaskClient, admin_token: str, soc_user: User
|
||||
) -> None:
|
||||
resp = client.patch(
|
||||
f"/api/users/{soc_user.id}",
|
||||
headers=_h(admin_token),
|
||||
json={"role": "redteam"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.get_json()["role"] == "redteam"
|
||||
|
||||
|
||||
def test_patch_user_change_password(
|
||||
client: FlaskClient, admin_token: str, soc_user: User
|
||||
) -> None:
|
||||
resp = client.patch(
|
||||
f"/api/users/{soc_user.id}",
|
||||
headers=_h(admin_token),
|
||||
json={"password": "anotherone1"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
# New password should now allow login.
|
||||
login = client.post(
|
||||
"/api/auth/login", json={"username": "soc1", "password": "anotherone1"}
|
||||
)
|
||||
assert login.status_code == 200
|
||||
|
||||
|
||||
def test_patch_user_short_password(
|
||||
client: FlaskClient, admin_token: str, soc_user: User
|
||||
) -> None:
|
||||
resp = client.patch(
|
||||
f"/api/users/{soc_user.id}",
|
||||
headers=_h(admin_token),
|
||||
json={"password": "no"},
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
|
||||
def test_patch_user_404(client: FlaskClient, admin_token: str) -> None:
|
||||
resp = client.patch(
|
||||
"/api/users/9999", headers=_h(admin_token), json={"role": "soc"}
|
||||
)
|
||||
assert resp.status_code == 404
|
||||
|
||||
|
||||
def test_patch_user_forbidden_for_redteam(
|
||||
client: FlaskClient, redteam_token: str, soc_user: User
|
||||
) -> None:
|
||||
resp = client.patch(
|
||||
f"/api/users/{soc_user.id}", headers=_h(redteam_token), json={"role": "admin"}
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
def test_delete_user_success(
|
||||
client: FlaskClient, admin_token: str, soc_user: User
|
||||
) -> None:
|
||||
resp = client.delete(f"/api/users/{soc_user.id}", headers=_h(admin_token))
|
||||
assert resp.status_code == 204
|
||||
|
||||
|
||||
def test_delete_last_admin_blocked(
|
||||
client: FlaskClient, admin_user: User, admin_token: str
|
||||
) -> None:
|
||||
resp = client.delete(f"/api/users/{admin_user.id}", headers=_h(admin_token))
|
||||
assert resp.status_code == 409
|
||||
assert "last admin" in resp.get_json()["error"]
|
||||
|
||||
|
||||
def test_delete_admin_when_other_admin_exists(
|
||||
client: FlaskClient, admin_user: User, admin_token: str
|
||||
) -> None:
|
||||
other = User(
|
||||
username="admin2",
|
||||
password_hash=hash_password("adminpass2"),
|
||||
role=UserRole.ADMIN,
|
||||
)
|
||||
db.session.add(other)
|
||||
db.session.commit()
|
||||
other_id = other.id
|
||||
|
||||
resp = client.delete(f"/api/users/{other_id}", headers=_h(admin_token))
|
||||
assert resp.status_code == 204
|
||||
|
||||
|
||||
def test_demote_last_admin_blocked(
|
||||
client: FlaskClient, admin_user: User, admin_token: str
|
||||
) -> None:
|
||||
resp = client.patch(
|
||||
f"/api/users/{admin_user.id}",
|
||||
headers=_h(admin_token),
|
||||
json={"role": "redteam"},
|
||||
)
|
||||
assert resp.status_code == 409
|
||||
|
||||
|
||||
def test_delete_user_404(client: FlaskClient, admin_token: str) -> None:
|
||||
resp = client.delete("/api/users/9999", headers=_h(admin_token))
|
||||
assert resp.status_code == 404
|
||||
|
||||
|
||||
def test_delete_user_forbidden_for_soc(
|
||||
client: FlaskClient, soc_token: str, redteam_user: User
|
||||
) -> None:
|
||||
resp = client.delete(f"/api/users/{redteam_user.id}", headers=_h(soc_token))
|
||||
assert resp.status_code == 403
|
||||
Reference in New Issue
Block a user