feat(backend): add Flask app factory, audit writer, flat CRUD + CLI (B0.7)

- Flask app factory wires SQLAlchemy / Migrate / Login / SocketIO and
  registers every blueprint. /healthz smoke endpoint included.
- Pydantic 2 DTOs (request/response) for engagement / host / TTP /
  scenario aggregates with from_attributes=True conversion.
- Flat CRUD blueprints under /api/v1/:
  * engagements (list / create / get / put / delete-as-archive)
  * hosts (engagement-scoped CRUD)
  * library/ttps (CRUD; promote requires the lead-only TTP_PROMOTE)
  * scenarios + steps (F3 invariant enforced: host.c2_type must match
    scenario.c2_type at compose time, 400 otherwise).
- @require_perm guards every endpoint per the F11 matrix.
- audit/ writer is hash-chained from v1 (SHA-256 of canonical record
  plus previous hash). The SQL-level write-only role enforcement ships
  in the deploy playbook (idempotent grants run at migration time).
- mimic-cli (click): user create (seeds RT operator/lead with group
  membership), db dump / db restore (manual pg_dump/pg_restore, R-O1).

No orchestrator, no WebSocket, no report generation — those land after
PR1/PR2/PR3.
This commit is contained in:
knacky
2026-05-21 20:33:45 +02:00
parent 7f4ad85a68
commit 9fa4d61304
17 changed files with 919 additions and 0 deletions

View File

@@ -0,0 +1,17 @@
"""Flask blueprints (REST API)."""
from __future__ import annotations
from flask import Flask
from mimic.api.engagements import bp as engagements_bp
from mimic.api.hosts import bp as hosts_bp
from mimic.api.scenarios import bp as scenarios_bp
from mimic.api.ttps import bp as ttps_bp
def register_blueprints(app: Flask) -> None:
app.register_blueprint(engagements_bp, url_prefix="/api/v1/engagements")
app.register_blueprint(hosts_bp, url_prefix="/api/v1")
app.register_blueprint(ttps_bp, url_prefix="/api/v1/library/ttps")
app.register_blueprint(scenarios_bp, url_prefix="/api/v1")

View File

@@ -0,0 +1,29 @@
"""Shared blueprint helpers (pydantic validation, error mapping)."""
from __future__ import annotations
from uuid import UUID
from flask import Response, abort, jsonify, request
from pydantic import BaseModel, ValidationError
def parse_body[T: BaseModel](model: type[T]) -> T:
payload = request.get_json(silent=True)
if payload is None:
abort(400, description="JSON body required")
try:
return model.model_validate(payload)
except ValidationError as exc:
abort(422, description=exc.errors())
def jsonify_model(model: BaseModel, status: int = 200) -> tuple[Response, int]:
return jsonify(model.model_dump(mode="json")), status
def parse_uuid(value: str, *, field: str = "id") -> UUID:
try:
return UUID(value)
except (ValueError, TypeError):
abort(404, description=f"invalid {field}")

View File

@@ -0,0 +1,73 @@
"""Engagement CRUD endpoints (flat, sprint 0)."""
from __future__ import annotations
from flask import Blueprint, abort, jsonify
from sqlalchemy import select
from mimic.api._helpers import jsonify_model, parse_body, parse_uuid
from mimic.db.models import Engagement
from mimic.db.types import EngagementStatus
from mimic.extensions import db
from mimic.rbac import Permission, require_perm
from mimic.schemas import EngagementCreate, EngagementRead, EngagementUpdate
bp = Blueprint("engagements", __name__)
@bp.get("")
@require_perm(Permission.ENGAGEMENT_READ)
def list_engagements():
stmt = select(Engagement).order_by(Engagement.created_at.desc())
rows = db.session.execute(stmt).scalars().all()
return jsonify([EngagementRead.model_validate(row).model_dump(mode="json") for row in rows])
@bp.post("")
@require_perm(Permission.ENGAGEMENT_CREATE)
def create_engagement():
payload = parse_body(EngagementCreate)
engagement = Engagement(
client_name=payload.client_name,
description=payload.description,
c2_type=payload.c2_type,
start_date=payload.start_date,
end_date=payload.end_date,
status=EngagementStatus.DRAFT,
)
db.session.add(engagement)
db.session.commit()
return jsonify_model(EngagementRead.model_validate(engagement), status=201)
@bp.get("/<eid>")
@require_perm(Permission.ENGAGEMENT_READ)
def get_engagement(eid: str):
engagement = db.session.get(Engagement, parse_uuid(eid))
if engagement is None:
abort(404)
return jsonify_model(EngagementRead.model_validate(engagement))
@bp.put("/<eid>")
@require_perm(Permission.ENGAGEMENT_UPDATE)
def update_engagement(eid: str):
engagement = db.session.get(Engagement, parse_uuid(eid))
if engagement is None:
abort(404)
payload = parse_body(EngagementUpdate)
for field, value in payload.model_dump(exclude_unset=True).items():
setattr(engagement, field, value)
db.session.commit()
return jsonify_model(EngagementRead.model_validate(engagement))
@bp.delete("/<eid>")
@require_perm(Permission.ENGAGEMENT_DELETE)
def delete_engagement(eid: str):
engagement = db.session.get(Engagement, parse_uuid(eid))
if engagement is None:
abort(404)
engagement.status = EngagementStatus.ARCHIVED
db.session.commit()
return "", 204

View File

@@ -0,0 +1,76 @@
"""Host CRUD endpoints (scoped under an engagement)."""
from __future__ import annotations
from flask import Blueprint, abort, jsonify
from sqlalchemy import select
from mimic.api._helpers import jsonify_model, parse_body, parse_uuid
from mimic.db.models import Engagement, Host
from mimic.db.types import HostStatus
from mimic.extensions import db
from mimic.rbac import Permission, require_perm
from mimic.schemas import HostCreate, HostRead, HostUpdate
bp = Blueprint("hosts", __name__)
def _engagement_or_404(eid: str) -> Engagement:
engagement = db.session.get(Engagement, parse_uuid(eid, field="engagement id"))
if engagement is None:
abort(404)
return engagement
@bp.get("/engagements/<eid>/hosts")
@require_perm(Permission.HOST_CRUD)
def list_hosts(eid: str):
engagement = _engagement_or_404(eid)
stmt = select(Host).where(Host.engagement_id == engagement.id).order_by(Host.hostname)
rows = db.session.execute(stmt).scalars().all()
return jsonify([HostRead.model_validate(row).model_dump(mode="json") for row in rows])
@bp.post("/engagements/<eid>/hosts")
@require_perm(Permission.HOST_CRUD)
def create_host(eid: str):
engagement = _engagement_or_404(eid)
payload = parse_body(HostCreate)
host = Host(
engagement_id=engagement.id,
hostname=payload.hostname,
ip=payload.ip,
os=payload.os,
c2_session_id=payload.c2_session_id,
c2_type=payload.c2_type,
status=HostStatus.UNKNOWN,
)
db.session.add(host)
db.session.commit()
return jsonify_model(HostRead.model_validate(host), status=201)
@bp.put("/engagements/<eid>/hosts/<hid>")
@require_perm(Permission.HOST_CRUD)
def update_host(eid: str, hid: str):
engagement = _engagement_or_404(eid)
host = db.session.get(Host, parse_uuid(hid, field="host id"))
if host is None or host.engagement_id != engagement.id:
abort(404)
payload = parse_body(HostUpdate)
for field, value in payload.model_dump(exclude_unset=True).items():
setattr(host, field, value)
db.session.commit()
return jsonify_model(HostRead.model_validate(host))
@bp.delete("/engagements/<eid>/hosts/<hid>")
@require_perm(Permission.HOST_CRUD)
def delete_host(eid: str, hid: str):
engagement = _engagement_or_404(eid)
host = db.session.get(Host, parse_uuid(hid, field="host id"))
if host is None or host.engagement_id != engagement.id:
abort(404)
db.session.delete(host)
db.session.commit()
return "", 204

View File

@@ -0,0 +1,138 @@
"""Scenario + step CRUD (flat, no orchestration yet)."""
from __future__ import annotations
from flask import Blueprint, abort, jsonify
from sqlalchemy import select
from mimic.api._helpers import jsonify_model, parse_body, parse_uuid
from mimic.db.models import Engagement, Host, Scenario, ScenarioStep, Ttp
from mimic.extensions import db
from mimic.rbac import Permission, require_perm
from mimic.schemas import (
ScenarioCreate,
ScenarioRead,
ScenarioStepCreate,
ScenarioStepRead,
ScenarioUpdate,
)
bp = Blueprint("scenarios", __name__)
def _engagement_or_404(eid: str) -> Engagement:
engagement = db.session.get(Engagement, parse_uuid(eid, field="engagement id"))
if engagement is None:
abort(404)
return engagement
def _scenario_or_404(engagement: Engagement, sid: str) -> Scenario:
scenario = db.session.get(Scenario, parse_uuid(sid, field="scenario id"))
if scenario is None or scenario.engagement_id != engagement.id:
abort(404)
return scenario
def _validate_step_consistency(scenario: Scenario, ttp_id, host_id) -> None:
ttp = db.session.get(Ttp, ttp_id)
host = db.session.get(Host, host_id)
if ttp is None or host is None:
abort(404, description="ttp or host not found")
if host.engagement_id != scenario.engagement_id:
abort(400, description="host does not belong to the engagement")
if host.c2_type != scenario.c2_type:
abort(400, description="host.c2_type does not match scenario.c2_type")
@bp.get("/engagements/<eid>/scenarios")
@require_perm(Permission.SCENARIO_CRUD)
def list_scenarios(eid: str):
engagement = _engagement_or_404(eid)
stmt = (
select(Scenario)
.where(Scenario.engagement_id == engagement.id)
.order_by(Scenario.created_at.desc())
)
rows = db.session.execute(stmt).scalars().all()
return jsonify([ScenarioRead.model_validate(row).model_dump(mode="json") for row in rows])
@bp.post("/engagements/<eid>/scenarios")
@require_perm(Permission.SCENARIO_CRUD)
def create_scenario(eid: str):
engagement = _engagement_or_404(eid)
payload = parse_body(ScenarioCreate)
scenario = Scenario(
engagement_id=engagement.id,
name=payload.name,
description=payload.description,
c2_type=payload.c2_type,
version=payload.version,
)
db.session.add(scenario)
db.session.flush()
for step in payload.steps:
_validate_step_consistency(scenario, step.ttp_id, step.host_id)
db.session.add(
ScenarioStep(
scenario_id=scenario.id,
ttp_id=step.ttp_id,
host_id=step.host_id,
order_idx=step.order_idx,
params_override_json=step.params_override_json,
delay_after_ms=step.delay_after_ms,
)
)
db.session.commit()
return jsonify_model(ScenarioRead.model_validate(scenario), status=201)
@bp.get("/engagements/<eid>/scenarios/<sid>")
@require_perm(Permission.SCENARIO_CRUD)
def get_scenario(eid: str, sid: str):
engagement = _engagement_or_404(eid)
scenario = _scenario_or_404(engagement, sid)
return jsonify_model(ScenarioRead.model_validate(scenario))
@bp.put("/engagements/<eid>/scenarios/<sid>")
@require_perm(Permission.SCENARIO_CRUD)
def update_scenario(eid: str, sid: str):
engagement = _engagement_or_404(eid)
scenario = _scenario_or_404(engagement, sid)
payload = parse_body(ScenarioUpdate)
for field, value in payload.model_dump(exclude_unset=True).items():
setattr(scenario, field, value)
db.session.commit()
return jsonify_model(ScenarioRead.model_validate(scenario))
@bp.delete("/engagements/<eid>/scenarios/<sid>")
@require_perm(Permission.SCENARIO_CRUD)
def delete_scenario(eid: str, sid: str):
engagement = _engagement_or_404(eid)
scenario = _scenario_or_404(engagement, sid)
db.session.delete(scenario)
db.session.commit()
return "", 204
@bp.post("/engagements/<eid>/scenarios/<sid>/steps")
@require_perm(Permission.SCENARIO_CRUD)
def add_step(eid: str, sid: str):
engagement = _engagement_or_404(eid)
scenario = _scenario_or_404(engagement, sid)
payload = parse_body(ScenarioStepCreate)
_validate_step_consistency(scenario, payload.ttp_id, payload.host_id)
step = ScenarioStep(
scenario_id=scenario.id,
ttp_id=payload.ttp_id,
host_id=payload.host_id,
order_idx=payload.order_idx,
params_override_json=payload.params_override_json,
delay_after_ms=payload.delay_after_ms,
)
db.session.add(step)
db.session.commit()
return jsonify_model(ScenarioStepRead.model_validate(step), status=201)

View File

@@ -0,0 +1,80 @@
"""TTP library CRUD endpoints."""
from __future__ import annotations
from flask import Blueprint, abort, jsonify
from sqlalchemy import select
from mimic.api._helpers import jsonify_model, parse_body, parse_uuid
from mimic.db.models import Ttp
from mimic.extensions import db
from mimic.rbac import Permission, require_perm
from mimic.schemas import TtpCreate, TtpRead, TtpUpdate
bp = Blueprint("ttps", __name__)
@bp.get("")
@require_perm(Permission.TTP_READ)
def list_ttps():
stmt = select(Ttp).order_by(Ttp.created_at.desc())
rows = db.session.execute(stmt).scalars().all()
return jsonify([TtpRead.model_validate(row).model_dump(mode="json") for row in rows])
@bp.post("")
@require_perm(Permission.TTP_DRAFT)
def create_ttp():
payload = parse_body(TtpCreate)
ttp = Ttp(**payload.model_dump())
db.session.add(ttp)
db.session.commit()
return jsonify_model(TtpRead.model_validate(ttp), status=201)
@bp.get("/<tid>")
@require_perm(Permission.TTP_READ)
def get_ttp(tid: str):
ttp = db.session.get(Ttp, parse_uuid(tid, field="ttp id"))
if ttp is None:
abort(404)
return jsonify_model(TtpRead.model_validate(ttp))
@bp.put("/<tid>")
@require_perm(Permission.TTP_DRAFT)
def update_ttp(tid: str):
ttp = db.session.get(Ttp, parse_uuid(tid, field="ttp id"))
if ttp is None:
abort(404)
payload = parse_body(TtpUpdate)
data = payload.model_dump(exclude_unset=True)
publish_flag = data.pop("is_published", None)
for field, value in data.items():
setattr(ttp, field, value)
if publish_flag is not None:
# Promotion is a lead-only privilege. Decorator already gates draft
# edits; promotion gets a second-tier check at the call site.
_ensure_promote_perm()
ttp.is_published = publish_flag
db.session.commit()
return jsonify_model(TtpRead.model_validate(ttp))
@bp.delete("/<tid>")
@require_perm(Permission.TTP_DRAFT)
def delete_ttp(tid: str):
ttp = db.session.get(Ttp, parse_uuid(tid, field="ttp id"))
if ttp is None:
abort(404)
db.session.delete(ttp)
db.session.commit()
return "", 204
def _ensure_promote_perm() -> None:
from flask_login import current_user # noqa: PLC0415 (lazy: scope-local user only)
perms: frozenset[Permission] = getattr(current_user, "permissions", frozenset())
if Permission.TTP_PROMOTE not in perms:
abort(403)

50
backend/src/mimic/app.py Normal file
View File

@@ -0,0 +1,50 @@
"""Flask application factory."""
from __future__ import annotations
from datetime import timedelta
from flask import Flask, jsonify
from flask.typing import ResponseReturnValue
from mimic.api import register_blueprints
from mimic.auth.identity import load_user
from mimic.config import Settings, get_settings
from mimic.extensions import db, login_manager, migrate, socketio
from mimic.logging import configure_logging
def create_app(settings: Settings | None = None) -> Flask:
settings = settings or get_settings()
configure_logging(settings.log_level, as_json=settings.log_json)
app = Flask(__name__)
app.config.update(
SECRET_KEY=settings.secret_key.get_secret_value(),
SQLALCHEMY_DATABASE_URI=settings.database_url,
SQLALCHEMY_TRACK_MODIFICATIONS=False,
SESSION_COOKIE_SECURE=settings.session_cookie_secure,
SESSION_COOKIE_SAMESITE=settings.session_cookie_samesite,
SESSION_COOKIE_HTTPONLY=True,
PERMANENT_SESSION_LIFETIME=timedelta(minutes=settings.session_lifetime_minutes),
MIMIC_SETTINGS=settings,
)
db.init_app(app)
migrate.init_app(app, db, directory="src/mimic/db/migrations")
login_manager.init_app(app)
login_manager.user_loader(load_user) # type: ignore[arg-type]
socketio.init_app(
app,
cors_allowed_origins=settings.cors_origins or "*",
async_mode="gevent",
)
register_blueprints(app)
@app.get("/healthz")
def healthz() -> ResponseReturnValue:
return jsonify(status="ok"), 200
return app

View File

@@ -0,0 +1,5 @@
"""Append-only audit log writer (NF-AUDIT)."""
from mimic.audit.log import AuditWriter, audit_hash, write_audit
__all__ = ["AuditWriter", "audit_hash", "write_audit"]

View File

@@ -0,0 +1,102 @@
"""Hash-chained append-only audit writer.
Sprint 0 fills `prev_hash` / `row_hash` at insert; verifier (v2) walks the
chain. The SQL-level guard (write-only role on `audit_log`) is provisioned by
the deploy playbook against the Postgres role `mimic_audit_writer`.
"""
from __future__ import annotations
import hashlib
import json
from datetime import UTC, datetime
from typing import Any
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.orm import Session
from mimic.db.models import AuditLog
def _canonical(payload: dict[str, Any]) -> str:
return json.dumps(payload, sort_keys=True, separators=(",", ":"), default=str)
def audit_hash(
*,
prev_hash: str | None,
ts: datetime,
actor_id: UUID | None,
action: str,
resource_type: str,
resource_id: str | None,
metadata: dict[str, Any],
) -> str:
"""SHA-256 of the canonical record (used both at write and at verify)."""
payload = {
"prev_hash": prev_hash or "",
"ts": ts.isoformat(),
"actor_id": str(actor_id) if actor_id else "",
"action": action,
"resource_type": resource_type,
"resource_id": resource_id or "",
"metadata": metadata,
}
return hashlib.sha256(_canonical(payload).encode("utf-8")).hexdigest()
class AuditWriter:
"""Append a single audit entry, chaining on the latest row hash."""
def __init__(self, session: Session) -> None:
self._session = session
def write(
self,
*,
action: str,
resource_type: str,
actor_id: UUID | None = None,
resource_id: str | None = None,
metadata: dict[str, Any] | None = None,
source_ip: str | None = None,
user_agent: str | None = None,
comment: str | None = None,
) -> AuditLog:
metadata = metadata or {}
ts = datetime.now(tz=UTC)
prev_hash = self._latest_hash()
row_hash = audit_hash(
prev_hash=prev_hash,
ts=ts,
actor_id=actor_id,
action=action,
resource_type=resource_type,
resource_id=resource_id,
metadata=metadata,
)
entry = AuditLog(
ts=ts,
actor_id=actor_id,
action=action,
resource_type=resource_type,
resource_id=resource_id,
metadata_json=metadata,
prev_hash=prev_hash,
row_hash=row_hash,
source_ip=source_ip,
user_agent=user_agent,
comment=comment,
)
self._session.add(entry)
self._session.flush()
return entry
def _latest_hash(self) -> str | None:
stmt = select(AuditLog.row_hash).order_by(AuditLog.ts.desc()).limit(1)
return self._session.execute(stmt).scalar_one_or_none()
def write_audit(session: Session, **kwargs: Any) -> AuditLog:
return AuditWriter(session).write(**kwargs)

View File

@@ -0,0 +1,21 @@
"""`mimic-cli` command-line interface (click)."""
from __future__ import annotations
import click
from mimic.cli.db import db_group
from mimic.cli.user import user_group
@click.group()
def cli() -> None:
"""Mimic command-line interface."""
cli.add_command(user_group, name="user")
cli.add_command(db_group, name="db")
if __name__ == "__main__":
cli()

View File

@@ -0,0 +1,71 @@
"""Database CLI: dump / restore stubs (R-O1)."""
from __future__ import annotations
import shlex
import subprocess
from pathlib import Path
from urllib.parse import urlparse
import click
from mimic.config import get_settings
@click.group(help="Database operations (manual dump/restore per R-O1).")
def db_group() -> None: ...
def _parse_dsn(dsn: str) -> tuple[str, str, str, str, str]:
parsed = urlparse(dsn)
return (
parsed.hostname or "localhost",
str(parsed.port or 5432),
parsed.username or "",
parsed.password or "",
(parsed.path or "/").lstrip("/"),
)
@db_group.command("dump")
@click.option("--out", "out_path", type=click.Path(dir_okay=False, path_type=Path), required=True)
def dump(out_path: Path) -> None:
"""Manual `pg_dump` of the configured DATABASE_URL."""
settings = get_settings()
host, port, user, password, dbname = _parse_dsn(settings.database_url)
out_path.parent.mkdir(parents=True, exist_ok=True)
cmd = [
"pg_dump",
"--format=custom",
f"--host={host}",
f"--port={port}",
f"--username={user}",
f"--dbname={dbname}",
f"--file={out_path}",
]
env = {"PGPASSWORD": password} if password else None
click.echo(f"running: {shlex.join(cmd)}")
subprocess.run(cmd, check=True, env=env) # noqa: S603
click.echo(f"dump written to {out_path}")
@db_group.command("restore")
@click.option("--file", "in_path", type=click.Path(exists=True, path_type=Path), required=True)
def restore(in_path: Path) -> None:
"""Manual `pg_restore` from a dump file."""
settings = get_settings()
host, port, user, password, dbname = _parse_dsn(settings.database_url)
cmd = [
"pg_restore",
"--clean",
"--if-exists",
f"--host={host}",
f"--port={port}",
f"--username={user}",
f"--dbname={dbname}",
str(in_path),
]
env = {"PGPASSWORD": password} if password else None
click.echo(f"running: {shlex.join(cmd)}")
subprocess.run(cmd, check=True, env=env) # noqa: S603
click.echo("restore complete")

View File

@@ -0,0 +1,52 @@
"""User-related CLI commands."""
from __future__ import annotations
import click
from mimic.app import create_app
from mimic.auth.password import hash_password
from mimic.db.models import Group, User, UserGroup
from mimic.db.types import UserType
from mimic.extensions import db
from mimic.rbac.matrix import GroupName
@click.group(help="Manage Mimic user accounts.")
def user_group() -> None: ...
@user_group.command("create")
@click.option("--email", required=True)
@click.option(
"--type",
"user_type",
type=click.Choice([u.value for u in UserType]),
required=True,
)
@click.option("--password", prompt=True, hide_input=True, confirmation_prompt=True)
@click.option("--display-name", default=None)
def create_user(email: str, user_type: str, password: str, display_name: str | None) -> None:
"""Create a local user (sprint 0: rt_operator or rt_lead)."""
app = create_app()
with app.app_context():
user = User(
email=email,
display_name=display_name,
type=UserType(user_type),
local_password_hash=hash_password(password),
)
db.session.add(user)
db.session.flush()
group_name = (
GroupName.RT_LEAD if user.type is UserType.RT_LEAD else GroupName.RT_OPERATOR
)
group = db.session.query(Group).filter_by(name=group_name.value).first()
if group is None:
raise click.ClickException(
f"group {group_name.value} not seeded — run alembic upgrade head first"
)
db.session.add(UserGroup(user_id=user.id, group_id=group.id, engagement_id=None))
db.session.commit()
click.echo(f"created user {email} ({user.id}) in group {group_name.value}")

View File

@@ -0,0 +1,33 @@
"""Pydantic 2 request/response DTOs."""
from mimic.schemas.engagement import (
EngagementCreate,
EngagementRead,
EngagementUpdate,
)
from mimic.schemas.host import HostCreate, HostRead, HostUpdate
from mimic.schemas.scenario import (
ScenarioCreate,
ScenarioRead,
ScenarioStepCreate,
ScenarioStepRead,
ScenarioUpdate,
)
from mimic.schemas.ttp import TtpCreate, TtpRead, TtpUpdate
__all__ = [
"EngagementCreate",
"EngagementRead",
"EngagementUpdate",
"HostCreate",
"HostRead",
"HostUpdate",
"ScenarioCreate",
"ScenarioRead",
"ScenarioStepCreate",
"ScenarioStepRead",
"ScenarioUpdate",
"TtpCreate",
"TtpRead",
"TtpUpdate",
]

View File

@@ -0,0 +1,36 @@
from __future__ import annotations
from datetime import date
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
from mimic.db.types import C2Type, EngagementStatus
class EngagementBase(BaseModel):
client_name: str = Field(min_length=1, max_length=255)
description: str | None = Field(default=None, max_length=1024)
c2_type: C2Type = C2Type.MYTHIC
start_date: date | None = None
end_date: date | None = None
class EngagementCreate(EngagementBase):
pass
class EngagementUpdate(BaseModel):
client_name: str | None = Field(default=None, min_length=1, max_length=255)
description: str | None = Field(default=None, max_length=1024)
status: EngagementStatus | None = None
c2_type: C2Type | None = None
start_date: date | None = None
end_date: date | None = None
class EngagementRead(EngagementBase):
model_config = ConfigDict(from_attributes=True)
id: UUID
status: EngagementStatus

View File

@@ -0,0 +1,36 @@
from __future__ import annotations
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
from mimic.db.types import C2Type, HostStatus
class HostBase(BaseModel):
hostname: str = Field(min_length=1, max_length=255)
ip: str | None = Field(default=None, max_length=64)
os: str | None = Field(default=None, max_length=128)
c2_session_id: str | None = Field(default=None, max_length=128)
c2_type: C2Type = C2Type.MYTHIC
class HostCreate(HostBase):
pass
class HostUpdate(BaseModel):
hostname: str | None = Field(default=None, min_length=1, max_length=255)
ip: str | None = Field(default=None, max_length=64)
os: str | None = Field(default=None, max_length=128)
c2_session_id: str | None = Field(default=None, max_length=128)
c2_type: C2Type | None = None
status: HostStatus | None = None
class HostRead(HostBase):
model_config = ConfigDict(from_attributes=True)
id: UUID
engagement_id: UUID
status: HostStatus

View File

@@ -0,0 +1,51 @@
from __future__ import annotations
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
from mimic.db.types import C2Type
class ScenarioStepBase(BaseModel):
ttp_id: UUID
host_id: UUID
order_idx: int = Field(ge=0)
params_override_json: dict = Field(default_factory=dict)
delay_after_ms: int = Field(default=0, ge=0)
class ScenarioStepCreate(ScenarioStepBase):
pass
class ScenarioStepRead(ScenarioStepBase):
model_config = ConfigDict(from_attributes=True)
id: UUID
scenario_id: UUID
class ScenarioBase(BaseModel):
name: str = Field(min_length=1, max_length=255)
description: str | None = None
c2_type: C2Type
version: int = Field(default=1, ge=1)
class ScenarioCreate(ScenarioBase):
steps: list[ScenarioStepCreate] = Field(default_factory=list)
class ScenarioUpdate(BaseModel):
name: str | None = Field(default=None, min_length=1, max_length=255)
description: str | None = None
c2_type: C2Type | None = None
class ScenarioRead(ScenarioBase):
model_config = ConfigDict(from_attributes=True)
id: UUID
engagement_id: UUID
steps: list[ScenarioStepRead] = Field(default_factory=list)

View File

@@ -0,0 +1,49 @@
from __future__ import annotations
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
from mimic.db.types import PayloadType, TtpSource
class TtpBase(BaseModel):
name: str = Field(min_length=1, max_length=255)
description: str | None = None
mitre_technique: str = Field(min_length=2, max_length=16)
mitre_subtechnique: str | None = Field(default=None, max_length=16)
payload_type: PayloadType
payload_template: str = ""
params_schema_json: dict | None = None
opsec_notes: str | None = None
cleanup_command: str | None = None
is_stealth_variant: bool = False
source: TtpSource = TtpSource.CUSTOM
tags: list[str] = Field(default_factory=list)
class TtpCreate(TtpBase):
pass
class TtpUpdate(BaseModel):
name: str | None = Field(default=None, min_length=1, max_length=255)
description: str | None = None
mitre_technique: str | None = Field(default=None, min_length=2, max_length=16)
mitre_subtechnique: str | None = Field(default=None, max_length=16)
payload_type: PayloadType | None = None
payload_template: str | None = None
params_schema_json: dict | None = None
opsec_notes: str | None = None
cleanup_command: str | None = None
is_stealth_variant: bool | None = None
tags: list[str] | None = None
is_published: bool | None = None
class TtpRead(TtpBase):
model_config = ConfigDict(from_attributes=True)
id: UUID
is_published: bool
current_version: int