diff --git a/backend/src/mimic/api/__init__.py b/backend/src/mimic/api/__init__.py new file mode 100644 index 0000000..be1f66d --- /dev/null +++ b/backend/src/mimic/api/__init__.py @@ -0,0 +1,17 @@ +"""Flask blueprints (REST API).""" + +from __future__ import annotations + +from flask import Flask + +from mimic.api.engagements import bp as engagements_bp +from mimic.api.hosts import bp as hosts_bp +from mimic.api.scenarios import bp as scenarios_bp +from mimic.api.ttps import bp as ttps_bp + + +def register_blueprints(app: Flask) -> None: + app.register_blueprint(engagements_bp, url_prefix="/api/v1/engagements") + app.register_blueprint(hosts_bp, url_prefix="/api/v1") + app.register_blueprint(ttps_bp, url_prefix="/api/v1/library/ttps") + app.register_blueprint(scenarios_bp, url_prefix="/api/v1") diff --git a/backend/src/mimic/api/_helpers.py b/backend/src/mimic/api/_helpers.py new file mode 100644 index 0000000..6d296ce --- /dev/null +++ b/backend/src/mimic/api/_helpers.py @@ -0,0 +1,29 @@ +"""Shared blueprint helpers (pydantic validation, error mapping).""" + +from __future__ import annotations + +from uuid import UUID + +from flask import Response, abort, jsonify, request +from pydantic import BaseModel, ValidationError + + +def parse_body[T: BaseModel](model: type[T]) -> T: + payload = request.get_json(silent=True) + if payload is None: + abort(400, description="JSON body required") + try: + return model.model_validate(payload) + except ValidationError as exc: + abort(422, description=exc.errors()) + + +def jsonify_model(model: BaseModel, status: int = 200) -> tuple[Response, int]: + return jsonify(model.model_dump(mode="json")), status + + +def parse_uuid(value: str, *, field: str = "id") -> UUID: + try: + return UUID(value) + except (ValueError, TypeError): + abort(404, description=f"invalid {field}") diff --git a/backend/src/mimic/api/engagements.py b/backend/src/mimic/api/engagements.py new file mode 100644 index 0000000..0df267c --- /dev/null +++ b/backend/src/mimic/api/engagements.py @@ -0,0 +1,73 @@ +"""Engagement CRUD endpoints (flat, sprint 0).""" + +from __future__ import annotations + +from flask import Blueprint, abort, jsonify +from sqlalchemy import select + +from mimic.api._helpers import jsonify_model, parse_body, parse_uuid +from mimic.db.models import Engagement +from mimic.db.types import EngagementStatus +from mimic.extensions import db +from mimic.rbac import Permission, require_perm +from mimic.schemas import EngagementCreate, EngagementRead, EngagementUpdate + +bp = Blueprint("engagements", __name__) + + +@bp.get("") +@require_perm(Permission.ENGAGEMENT_READ) +def list_engagements(): + stmt = select(Engagement).order_by(Engagement.created_at.desc()) + rows = db.session.execute(stmt).scalars().all() + return jsonify([EngagementRead.model_validate(row).model_dump(mode="json") for row in rows]) + + +@bp.post("") +@require_perm(Permission.ENGAGEMENT_CREATE) +def create_engagement(): + payload = parse_body(EngagementCreate) + engagement = Engagement( + client_name=payload.client_name, + description=payload.description, + c2_type=payload.c2_type, + start_date=payload.start_date, + end_date=payload.end_date, + status=EngagementStatus.DRAFT, + ) + db.session.add(engagement) + db.session.commit() + return jsonify_model(EngagementRead.model_validate(engagement), status=201) + + +@bp.get("/") +@require_perm(Permission.ENGAGEMENT_READ) +def get_engagement(eid: str): + engagement = db.session.get(Engagement, parse_uuid(eid)) + if engagement is None: + abort(404) + return jsonify_model(EngagementRead.model_validate(engagement)) + + +@bp.put("/") +@require_perm(Permission.ENGAGEMENT_UPDATE) +def update_engagement(eid: str): + engagement = db.session.get(Engagement, parse_uuid(eid)) + if engagement is None: + abort(404) + payload = parse_body(EngagementUpdate) + for field, value in payload.model_dump(exclude_unset=True).items(): + setattr(engagement, field, value) + db.session.commit() + return jsonify_model(EngagementRead.model_validate(engagement)) + + +@bp.delete("/") +@require_perm(Permission.ENGAGEMENT_DELETE) +def delete_engagement(eid: str): + engagement = db.session.get(Engagement, parse_uuid(eid)) + if engagement is None: + abort(404) + engagement.status = EngagementStatus.ARCHIVED + db.session.commit() + return "", 204 diff --git a/backend/src/mimic/api/hosts.py b/backend/src/mimic/api/hosts.py new file mode 100644 index 0000000..0594862 --- /dev/null +++ b/backend/src/mimic/api/hosts.py @@ -0,0 +1,76 @@ +"""Host CRUD endpoints (scoped under an engagement).""" + +from __future__ import annotations + +from flask import Blueprint, abort, jsonify +from sqlalchemy import select + +from mimic.api._helpers import jsonify_model, parse_body, parse_uuid +from mimic.db.models import Engagement, Host +from mimic.db.types import HostStatus +from mimic.extensions import db +from mimic.rbac import Permission, require_perm +from mimic.schemas import HostCreate, HostRead, HostUpdate + +bp = Blueprint("hosts", __name__) + + +def _engagement_or_404(eid: str) -> Engagement: + engagement = db.session.get(Engagement, parse_uuid(eid, field="engagement id")) + if engagement is None: + abort(404) + return engagement + + +@bp.get("/engagements//hosts") +@require_perm(Permission.HOST_CRUD) +def list_hosts(eid: str): + engagement = _engagement_or_404(eid) + stmt = select(Host).where(Host.engagement_id == engagement.id).order_by(Host.hostname) + rows = db.session.execute(stmt).scalars().all() + return jsonify([HostRead.model_validate(row).model_dump(mode="json") for row in rows]) + + +@bp.post("/engagements//hosts") +@require_perm(Permission.HOST_CRUD) +def create_host(eid: str): + engagement = _engagement_or_404(eid) + payload = parse_body(HostCreate) + host = Host( + engagement_id=engagement.id, + hostname=payload.hostname, + ip=payload.ip, + os=payload.os, + c2_session_id=payload.c2_session_id, + c2_type=payload.c2_type, + status=HostStatus.UNKNOWN, + ) + db.session.add(host) + db.session.commit() + return jsonify_model(HostRead.model_validate(host), status=201) + + +@bp.put("/engagements//hosts/") +@require_perm(Permission.HOST_CRUD) +def update_host(eid: str, hid: str): + engagement = _engagement_or_404(eid) + host = db.session.get(Host, parse_uuid(hid, field="host id")) + if host is None or host.engagement_id != engagement.id: + abort(404) + payload = parse_body(HostUpdate) + for field, value in payload.model_dump(exclude_unset=True).items(): + setattr(host, field, value) + db.session.commit() + return jsonify_model(HostRead.model_validate(host)) + + +@bp.delete("/engagements//hosts/") +@require_perm(Permission.HOST_CRUD) +def delete_host(eid: str, hid: str): + engagement = _engagement_or_404(eid) + host = db.session.get(Host, parse_uuid(hid, field="host id")) + if host is None or host.engagement_id != engagement.id: + abort(404) + db.session.delete(host) + db.session.commit() + return "", 204 diff --git a/backend/src/mimic/api/scenarios.py b/backend/src/mimic/api/scenarios.py new file mode 100644 index 0000000..e95de70 --- /dev/null +++ b/backend/src/mimic/api/scenarios.py @@ -0,0 +1,138 @@ +"""Scenario + step CRUD (flat, no orchestration yet).""" + +from __future__ import annotations + +from flask import Blueprint, abort, jsonify +from sqlalchemy import select + +from mimic.api._helpers import jsonify_model, parse_body, parse_uuid +from mimic.db.models import Engagement, Host, Scenario, ScenarioStep, Ttp +from mimic.extensions import db +from mimic.rbac import Permission, require_perm +from mimic.schemas import ( + ScenarioCreate, + ScenarioRead, + ScenarioStepCreate, + ScenarioStepRead, + ScenarioUpdate, +) + +bp = Blueprint("scenarios", __name__) + + +def _engagement_or_404(eid: str) -> Engagement: + engagement = db.session.get(Engagement, parse_uuid(eid, field="engagement id")) + if engagement is None: + abort(404) + return engagement + + +def _scenario_or_404(engagement: Engagement, sid: str) -> Scenario: + scenario = db.session.get(Scenario, parse_uuid(sid, field="scenario id")) + if scenario is None or scenario.engagement_id != engagement.id: + abort(404) + return scenario + + +def _validate_step_consistency(scenario: Scenario, ttp_id, host_id) -> None: + ttp = db.session.get(Ttp, ttp_id) + host = db.session.get(Host, host_id) + if ttp is None or host is None: + abort(404, description="ttp or host not found") + if host.engagement_id != scenario.engagement_id: + abort(400, description="host does not belong to the engagement") + if host.c2_type != scenario.c2_type: + abort(400, description="host.c2_type does not match scenario.c2_type") + + +@bp.get("/engagements//scenarios") +@require_perm(Permission.SCENARIO_CRUD) +def list_scenarios(eid: str): + engagement = _engagement_or_404(eid) + stmt = ( + select(Scenario) + .where(Scenario.engagement_id == engagement.id) + .order_by(Scenario.created_at.desc()) + ) + rows = db.session.execute(stmt).scalars().all() + return jsonify([ScenarioRead.model_validate(row).model_dump(mode="json") for row in rows]) + + +@bp.post("/engagements//scenarios") +@require_perm(Permission.SCENARIO_CRUD) +def create_scenario(eid: str): + engagement = _engagement_or_404(eid) + payload = parse_body(ScenarioCreate) + scenario = Scenario( + engagement_id=engagement.id, + name=payload.name, + description=payload.description, + c2_type=payload.c2_type, + version=payload.version, + ) + db.session.add(scenario) + db.session.flush() + for step in payload.steps: + _validate_step_consistency(scenario, step.ttp_id, step.host_id) + db.session.add( + ScenarioStep( + scenario_id=scenario.id, + ttp_id=step.ttp_id, + host_id=step.host_id, + order_idx=step.order_idx, + params_override_json=step.params_override_json, + delay_after_ms=step.delay_after_ms, + ) + ) + db.session.commit() + return jsonify_model(ScenarioRead.model_validate(scenario), status=201) + + +@bp.get("/engagements//scenarios/") +@require_perm(Permission.SCENARIO_CRUD) +def get_scenario(eid: str, sid: str): + engagement = _engagement_or_404(eid) + scenario = _scenario_or_404(engagement, sid) + return jsonify_model(ScenarioRead.model_validate(scenario)) + + +@bp.put("/engagements//scenarios/") +@require_perm(Permission.SCENARIO_CRUD) +def update_scenario(eid: str, sid: str): + engagement = _engagement_or_404(eid) + scenario = _scenario_or_404(engagement, sid) + payload = parse_body(ScenarioUpdate) + for field, value in payload.model_dump(exclude_unset=True).items(): + setattr(scenario, field, value) + db.session.commit() + return jsonify_model(ScenarioRead.model_validate(scenario)) + + +@bp.delete("/engagements//scenarios/") +@require_perm(Permission.SCENARIO_CRUD) +def delete_scenario(eid: str, sid: str): + engagement = _engagement_or_404(eid) + scenario = _scenario_or_404(engagement, sid) + db.session.delete(scenario) + db.session.commit() + return "", 204 + + +@bp.post("/engagements//scenarios//steps") +@require_perm(Permission.SCENARIO_CRUD) +def add_step(eid: str, sid: str): + engagement = _engagement_or_404(eid) + scenario = _scenario_or_404(engagement, sid) + payload = parse_body(ScenarioStepCreate) + _validate_step_consistency(scenario, payload.ttp_id, payload.host_id) + step = ScenarioStep( + scenario_id=scenario.id, + ttp_id=payload.ttp_id, + host_id=payload.host_id, + order_idx=payload.order_idx, + params_override_json=payload.params_override_json, + delay_after_ms=payload.delay_after_ms, + ) + db.session.add(step) + db.session.commit() + return jsonify_model(ScenarioStepRead.model_validate(step), status=201) diff --git a/backend/src/mimic/api/ttps.py b/backend/src/mimic/api/ttps.py new file mode 100644 index 0000000..0e33e9d --- /dev/null +++ b/backend/src/mimic/api/ttps.py @@ -0,0 +1,80 @@ +"""TTP library CRUD endpoints.""" + +from __future__ import annotations + +from flask import Blueprint, abort, jsonify +from sqlalchemy import select + +from mimic.api._helpers import jsonify_model, parse_body, parse_uuid +from mimic.db.models import Ttp +from mimic.extensions import db +from mimic.rbac import Permission, require_perm +from mimic.schemas import TtpCreate, TtpRead, TtpUpdate + +bp = Blueprint("ttps", __name__) + + +@bp.get("") +@require_perm(Permission.TTP_READ) +def list_ttps(): + stmt = select(Ttp).order_by(Ttp.created_at.desc()) + rows = db.session.execute(stmt).scalars().all() + return jsonify([TtpRead.model_validate(row).model_dump(mode="json") for row in rows]) + + +@bp.post("") +@require_perm(Permission.TTP_DRAFT) +def create_ttp(): + payload = parse_body(TtpCreate) + ttp = Ttp(**payload.model_dump()) + db.session.add(ttp) + db.session.commit() + return jsonify_model(TtpRead.model_validate(ttp), status=201) + + +@bp.get("/") +@require_perm(Permission.TTP_READ) +def get_ttp(tid: str): + ttp = db.session.get(Ttp, parse_uuid(tid, field="ttp id")) + if ttp is None: + abort(404) + return jsonify_model(TtpRead.model_validate(ttp)) + + +@bp.put("/") +@require_perm(Permission.TTP_DRAFT) +def update_ttp(tid: str): + ttp = db.session.get(Ttp, parse_uuid(tid, field="ttp id")) + if ttp is None: + abort(404) + payload = parse_body(TtpUpdate) + data = payload.model_dump(exclude_unset=True) + publish_flag = data.pop("is_published", None) + for field, value in data.items(): + setattr(ttp, field, value) + if publish_flag is not None: + # Promotion is a lead-only privilege. Decorator already gates draft + # edits; promotion gets a second-tier check at the call site. + _ensure_promote_perm() + ttp.is_published = publish_flag + db.session.commit() + return jsonify_model(TtpRead.model_validate(ttp)) + + +@bp.delete("/") +@require_perm(Permission.TTP_DRAFT) +def delete_ttp(tid: str): + ttp = db.session.get(Ttp, parse_uuid(tid, field="ttp id")) + if ttp is None: + abort(404) + db.session.delete(ttp) + db.session.commit() + return "", 204 + + +def _ensure_promote_perm() -> None: + from flask_login import current_user # noqa: PLC0415 (lazy: scope-local user only) + + perms: frozenset[Permission] = getattr(current_user, "permissions", frozenset()) + if Permission.TTP_PROMOTE not in perms: + abort(403) diff --git a/backend/src/mimic/app.py b/backend/src/mimic/app.py new file mode 100644 index 0000000..9e455e9 --- /dev/null +++ b/backend/src/mimic/app.py @@ -0,0 +1,50 @@ +"""Flask application factory.""" + +from __future__ import annotations + +from datetime import timedelta + +from flask import Flask, jsonify +from flask.typing import ResponseReturnValue + +from mimic.api import register_blueprints +from mimic.auth.identity import load_user +from mimic.config import Settings, get_settings +from mimic.extensions import db, login_manager, migrate, socketio +from mimic.logging import configure_logging + + +def create_app(settings: Settings | None = None) -> Flask: + settings = settings or get_settings() + configure_logging(settings.log_level, as_json=settings.log_json) + + app = Flask(__name__) + app.config.update( + SECRET_KEY=settings.secret_key.get_secret_value(), + SQLALCHEMY_DATABASE_URI=settings.database_url, + SQLALCHEMY_TRACK_MODIFICATIONS=False, + SESSION_COOKIE_SECURE=settings.session_cookie_secure, + SESSION_COOKIE_SAMESITE=settings.session_cookie_samesite, + SESSION_COOKIE_HTTPONLY=True, + PERMANENT_SESSION_LIFETIME=timedelta(minutes=settings.session_lifetime_minutes), + MIMIC_SETTINGS=settings, + ) + + db.init_app(app) + migrate.init_app(app, db, directory="src/mimic/db/migrations") + login_manager.init_app(app) + login_manager.user_loader(load_user) # type: ignore[arg-type] + + socketio.init_app( + app, + cors_allowed_origins=settings.cors_origins or "*", + async_mode="gevent", + ) + + register_blueprints(app) + + @app.get("/healthz") + def healthz() -> ResponseReturnValue: + return jsonify(status="ok"), 200 + + return app diff --git a/backend/src/mimic/audit/__init__.py b/backend/src/mimic/audit/__init__.py new file mode 100644 index 0000000..149cef2 --- /dev/null +++ b/backend/src/mimic/audit/__init__.py @@ -0,0 +1,5 @@ +"""Append-only audit log writer (NF-AUDIT).""" + +from mimic.audit.log import AuditWriter, audit_hash, write_audit + +__all__ = ["AuditWriter", "audit_hash", "write_audit"] diff --git a/backend/src/mimic/audit/log.py b/backend/src/mimic/audit/log.py new file mode 100644 index 0000000..0a14224 --- /dev/null +++ b/backend/src/mimic/audit/log.py @@ -0,0 +1,102 @@ +"""Hash-chained append-only audit writer. + +Sprint 0 fills `prev_hash` / `row_hash` at insert; verifier (v2) walks the +chain. The SQL-level guard (write-only role on `audit_log`) is provisioned by +the deploy playbook against the Postgres role `mimic_audit_writer`. +""" + +from __future__ import annotations + +import hashlib +import json +from datetime import UTC, datetime +from typing import Any +from uuid import UUID + +from sqlalchemy import select +from sqlalchemy.orm import Session + +from mimic.db.models import AuditLog + + +def _canonical(payload: dict[str, Any]) -> str: + return json.dumps(payload, sort_keys=True, separators=(",", ":"), default=str) + + +def audit_hash( + *, + prev_hash: str | None, + ts: datetime, + actor_id: UUID | None, + action: str, + resource_type: str, + resource_id: str | None, + metadata: dict[str, Any], +) -> str: + """SHA-256 of the canonical record (used both at write and at verify).""" + payload = { + "prev_hash": prev_hash or "", + "ts": ts.isoformat(), + "actor_id": str(actor_id) if actor_id else "", + "action": action, + "resource_type": resource_type, + "resource_id": resource_id or "", + "metadata": metadata, + } + return hashlib.sha256(_canonical(payload).encode("utf-8")).hexdigest() + + +class AuditWriter: + """Append a single audit entry, chaining on the latest row hash.""" + + def __init__(self, session: Session) -> None: + self._session = session + + def write( + self, + *, + action: str, + resource_type: str, + actor_id: UUID | None = None, + resource_id: str | None = None, + metadata: dict[str, Any] | None = None, + source_ip: str | None = None, + user_agent: str | None = None, + comment: str | None = None, + ) -> AuditLog: + metadata = metadata or {} + ts = datetime.now(tz=UTC) + prev_hash = self._latest_hash() + row_hash = audit_hash( + prev_hash=prev_hash, + ts=ts, + actor_id=actor_id, + action=action, + resource_type=resource_type, + resource_id=resource_id, + metadata=metadata, + ) + entry = AuditLog( + ts=ts, + actor_id=actor_id, + action=action, + resource_type=resource_type, + resource_id=resource_id, + metadata_json=metadata, + prev_hash=prev_hash, + row_hash=row_hash, + source_ip=source_ip, + user_agent=user_agent, + comment=comment, + ) + self._session.add(entry) + self._session.flush() + return entry + + def _latest_hash(self) -> str | None: + stmt = select(AuditLog.row_hash).order_by(AuditLog.ts.desc()).limit(1) + return self._session.execute(stmt).scalar_one_or_none() + + +def write_audit(session: Session, **kwargs: Any) -> AuditLog: + return AuditWriter(session).write(**kwargs) diff --git a/backend/src/mimic/cli/__init__.py b/backend/src/mimic/cli/__init__.py new file mode 100644 index 0000000..067fe24 --- /dev/null +++ b/backend/src/mimic/cli/__init__.py @@ -0,0 +1,21 @@ +"""`mimic-cli` command-line interface (click).""" + +from __future__ import annotations + +import click + +from mimic.cli.db import db_group +from mimic.cli.user import user_group + + +@click.group() +def cli() -> None: + """Mimic command-line interface.""" + + +cli.add_command(user_group, name="user") +cli.add_command(db_group, name="db") + + +if __name__ == "__main__": + cli() diff --git a/backend/src/mimic/cli/db.py b/backend/src/mimic/cli/db.py new file mode 100644 index 0000000..2c0c233 --- /dev/null +++ b/backend/src/mimic/cli/db.py @@ -0,0 +1,71 @@ +"""Database CLI: dump / restore stubs (R-O1).""" + +from __future__ import annotations + +import shlex +import subprocess +from pathlib import Path +from urllib.parse import urlparse + +import click + +from mimic.config import get_settings + + +@click.group(help="Database operations (manual dump/restore per R-O1).") +def db_group() -> None: ... + + +def _parse_dsn(dsn: str) -> tuple[str, str, str, str, str]: + parsed = urlparse(dsn) + return ( + parsed.hostname or "localhost", + str(parsed.port or 5432), + parsed.username or "", + parsed.password or "", + (parsed.path or "/").lstrip("/"), + ) + + +@db_group.command("dump") +@click.option("--out", "out_path", type=click.Path(dir_okay=False, path_type=Path), required=True) +def dump(out_path: Path) -> None: + """Manual `pg_dump` of the configured DATABASE_URL.""" + settings = get_settings() + host, port, user, password, dbname = _parse_dsn(settings.database_url) + out_path.parent.mkdir(parents=True, exist_ok=True) + cmd = [ + "pg_dump", + "--format=custom", + f"--host={host}", + f"--port={port}", + f"--username={user}", + f"--dbname={dbname}", + f"--file={out_path}", + ] + env = {"PGPASSWORD": password} if password else None + click.echo(f"running: {shlex.join(cmd)}") + subprocess.run(cmd, check=True, env=env) # noqa: S603 + click.echo(f"dump written to {out_path}") + + +@db_group.command("restore") +@click.option("--file", "in_path", type=click.Path(exists=True, path_type=Path), required=True) +def restore(in_path: Path) -> None: + """Manual `pg_restore` from a dump file.""" + settings = get_settings() + host, port, user, password, dbname = _parse_dsn(settings.database_url) + cmd = [ + "pg_restore", + "--clean", + "--if-exists", + f"--host={host}", + f"--port={port}", + f"--username={user}", + f"--dbname={dbname}", + str(in_path), + ] + env = {"PGPASSWORD": password} if password else None + click.echo(f"running: {shlex.join(cmd)}") + subprocess.run(cmd, check=True, env=env) # noqa: S603 + click.echo("restore complete") diff --git a/backend/src/mimic/cli/user.py b/backend/src/mimic/cli/user.py new file mode 100644 index 0000000..5c3ef54 --- /dev/null +++ b/backend/src/mimic/cli/user.py @@ -0,0 +1,52 @@ +"""User-related CLI commands.""" + +from __future__ import annotations + +import click + +from mimic.app import create_app +from mimic.auth.password import hash_password +from mimic.db.models import Group, User, UserGroup +from mimic.db.types import UserType +from mimic.extensions import db +from mimic.rbac.matrix import GroupName + + +@click.group(help="Manage Mimic user accounts.") +def user_group() -> None: ... + + +@user_group.command("create") +@click.option("--email", required=True) +@click.option( + "--type", + "user_type", + type=click.Choice([u.value for u in UserType]), + required=True, +) +@click.option("--password", prompt=True, hide_input=True, confirmation_prompt=True) +@click.option("--display-name", default=None) +def create_user(email: str, user_type: str, password: str, display_name: str | None) -> None: + """Create a local user (sprint 0: rt_operator or rt_lead).""" + app = create_app() + with app.app_context(): + user = User( + email=email, + display_name=display_name, + type=UserType(user_type), + local_password_hash=hash_password(password), + ) + db.session.add(user) + db.session.flush() + + group_name = ( + GroupName.RT_LEAD if user.type is UserType.RT_LEAD else GroupName.RT_OPERATOR + ) + group = db.session.query(Group).filter_by(name=group_name.value).first() + if group is None: + raise click.ClickException( + f"group {group_name.value} not seeded — run alembic upgrade head first" + ) + db.session.add(UserGroup(user_id=user.id, group_id=group.id, engagement_id=None)) + db.session.commit() + click.echo(f"created user {email} ({user.id}) in group {group_name.value}") diff --git a/backend/src/mimic/schemas/__init__.py b/backend/src/mimic/schemas/__init__.py new file mode 100644 index 0000000..7e84f79 --- /dev/null +++ b/backend/src/mimic/schemas/__init__.py @@ -0,0 +1,33 @@ +"""Pydantic 2 request/response DTOs.""" + +from mimic.schemas.engagement import ( + EngagementCreate, + EngagementRead, + EngagementUpdate, +) +from mimic.schemas.host import HostCreate, HostRead, HostUpdate +from mimic.schemas.scenario import ( + ScenarioCreate, + ScenarioRead, + ScenarioStepCreate, + ScenarioStepRead, + ScenarioUpdate, +) +from mimic.schemas.ttp import TtpCreate, TtpRead, TtpUpdate + +__all__ = [ + "EngagementCreate", + "EngagementRead", + "EngagementUpdate", + "HostCreate", + "HostRead", + "HostUpdate", + "ScenarioCreate", + "ScenarioRead", + "ScenarioStepCreate", + "ScenarioStepRead", + "ScenarioUpdate", + "TtpCreate", + "TtpRead", + "TtpUpdate", +] diff --git a/backend/src/mimic/schemas/engagement.py b/backend/src/mimic/schemas/engagement.py new file mode 100644 index 0000000..17d3a47 --- /dev/null +++ b/backend/src/mimic/schemas/engagement.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +from datetime import date +from uuid import UUID + +from pydantic import BaseModel, ConfigDict, Field + +from mimic.db.types import C2Type, EngagementStatus + + +class EngagementBase(BaseModel): + client_name: str = Field(min_length=1, max_length=255) + description: str | None = Field(default=None, max_length=1024) + c2_type: C2Type = C2Type.MYTHIC + start_date: date | None = None + end_date: date | None = None + + +class EngagementCreate(EngagementBase): + pass + + +class EngagementUpdate(BaseModel): + client_name: str | None = Field(default=None, min_length=1, max_length=255) + description: str | None = Field(default=None, max_length=1024) + status: EngagementStatus | None = None + c2_type: C2Type | None = None + start_date: date | None = None + end_date: date | None = None + + +class EngagementRead(EngagementBase): + model_config = ConfigDict(from_attributes=True) + + id: UUID + status: EngagementStatus diff --git a/backend/src/mimic/schemas/host.py b/backend/src/mimic/schemas/host.py new file mode 100644 index 0000000..128748b --- /dev/null +++ b/backend/src/mimic/schemas/host.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +from uuid import UUID + +from pydantic import BaseModel, ConfigDict, Field + +from mimic.db.types import C2Type, HostStatus + + +class HostBase(BaseModel): + hostname: str = Field(min_length=1, max_length=255) + ip: str | None = Field(default=None, max_length=64) + os: str | None = Field(default=None, max_length=128) + c2_session_id: str | None = Field(default=None, max_length=128) + c2_type: C2Type = C2Type.MYTHIC + + +class HostCreate(HostBase): + pass + + +class HostUpdate(BaseModel): + hostname: str | None = Field(default=None, min_length=1, max_length=255) + ip: str | None = Field(default=None, max_length=64) + os: str | None = Field(default=None, max_length=128) + c2_session_id: str | None = Field(default=None, max_length=128) + c2_type: C2Type | None = None + status: HostStatus | None = None + + +class HostRead(HostBase): + model_config = ConfigDict(from_attributes=True) + + id: UUID + engagement_id: UUID + status: HostStatus diff --git a/backend/src/mimic/schemas/scenario.py b/backend/src/mimic/schemas/scenario.py new file mode 100644 index 0000000..52752b3 --- /dev/null +++ b/backend/src/mimic/schemas/scenario.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +from uuid import UUID + +from pydantic import BaseModel, ConfigDict, Field + +from mimic.db.types import C2Type + + +class ScenarioStepBase(BaseModel): + ttp_id: UUID + host_id: UUID + order_idx: int = Field(ge=0) + params_override_json: dict = Field(default_factory=dict) + delay_after_ms: int = Field(default=0, ge=0) + + +class ScenarioStepCreate(ScenarioStepBase): + pass + + +class ScenarioStepRead(ScenarioStepBase): + model_config = ConfigDict(from_attributes=True) + + id: UUID + scenario_id: UUID + + +class ScenarioBase(BaseModel): + name: str = Field(min_length=1, max_length=255) + description: str | None = None + c2_type: C2Type + version: int = Field(default=1, ge=1) + + +class ScenarioCreate(ScenarioBase): + steps: list[ScenarioStepCreate] = Field(default_factory=list) + + +class ScenarioUpdate(BaseModel): + name: str | None = Field(default=None, min_length=1, max_length=255) + description: str | None = None + c2_type: C2Type | None = None + + +class ScenarioRead(ScenarioBase): + model_config = ConfigDict(from_attributes=True) + + id: UUID + engagement_id: UUID + steps: list[ScenarioStepRead] = Field(default_factory=list) diff --git a/backend/src/mimic/schemas/ttp.py b/backend/src/mimic/schemas/ttp.py new file mode 100644 index 0000000..f1d59db --- /dev/null +++ b/backend/src/mimic/schemas/ttp.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +from uuid import UUID + +from pydantic import BaseModel, ConfigDict, Field + +from mimic.db.types import PayloadType, TtpSource + + +class TtpBase(BaseModel): + name: str = Field(min_length=1, max_length=255) + description: str | None = None + mitre_technique: str = Field(min_length=2, max_length=16) + mitre_subtechnique: str | None = Field(default=None, max_length=16) + payload_type: PayloadType + payload_template: str = "" + params_schema_json: dict | None = None + opsec_notes: str | None = None + cleanup_command: str | None = None + is_stealth_variant: bool = False + source: TtpSource = TtpSource.CUSTOM + tags: list[str] = Field(default_factory=list) + + +class TtpCreate(TtpBase): + pass + + +class TtpUpdate(BaseModel): + name: str | None = Field(default=None, min_length=1, max_length=255) + description: str | None = None + mitre_technique: str | None = Field(default=None, min_length=2, max_length=16) + mitre_subtechnique: str | None = Field(default=None, max_length=16) + payload_type: PayloadType | None = None + payload_template: str | None = None + params_schema_json: dict | None = None + opsec_notes: str | None = None + cleanup_command: str | None = None + is_stealth_variant: bool | None = None + tags: list[str] | None = None + is_published: bool | None = None + + +class TtpRead(TtpBase): + model_config = ConfigDict(from_attributes=True) + + id: UUID + is_published: bool + current_version: int