chore(backend): mypy strict clean + ruff format pass

Pre-merge sanity per devops checklist (ruff format --check, mypy --strict).

Type fixes:
- ORM models: `Mapped[dict]` → `Mapped[dict[str, Any]]` (audit, scenario, run,
  report, ttp, detection.artifact_files_json). Equivalent on Pydantic DTOs
  (TtpBase.params_schema_json, ScenarioStepBase.params_override_json).
- Rename `TtpRead.current_version` → `TtpRead.version` to mirror the ORM
  column (which itself was renamed in D-009 cleanup).
- Flask blueprints: add `-> ResponseReturnValue` to every view, plus typed
  UUID params on `_validate_step_consistency`.
- `templating/filters.py`: rewrite the conditional re2 import so mypy can
  narrow the union (`ModuleType | None`); the runtime branch on `_re2 is not
  None` removes the unused-ignore that was triggered by warn_unused_ignores.
- `pyproject.toml`: add `flask_login.*` and `pythonjsonlogger.*` to the
  `[[tool.mypy.overrides]]` `ignore_missing_imports` list (both ship without
  typed marker).
- Misc: drop stale `# type: ignore` comments (`app.py:36`,
  `rbac/decorators.py:35`) flagged by `warn_unused_ignores`. Keep
  `logging.JsonFormatter` ignore because the symbol exists at runtime but is
  not re-exported through the typed surface.

Formatting:
- `ruff format` applied (15 files normalized; line-length unchanged at 100).

Verification on this commit:
- `ruff check`  → All checks passed.
- `ruff format --check` → 68 files already formatted.
- `mypy --strict src` → Success: no issues found in 54 source files.
- `pytest tests/unit` → 49 passed.
This commit is contained in:
knacky
2026-05-22 05:10:51 +02:00
parent 12d131c826
commit adab8a58e7
24 changed files with 203 additions and 165 deletions

View File

@@ -113,6 +113,8 @@ module = [
"re2",
"flask_socketio.*",
"flask_migrate.*",
"flask_login.*",
"pythonjsonlogger.*",
"gevent.*",
"testcontainers.*",
"authlib.*",

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
from flask import Blueprint, abort, jsonify
from flask.typing import ResponseReturnValue
from sqlalchemy import select
from mimic.api._helpers import jsonify_model, parse_body, parse_uuid
@@ -17,7 +18,7 @@ bp = Blueprint("engagements", __name__)
@bp.get("")
@require_perm(Permission.ENGAGEMENT_READ)
def list_engagements():
def list_engagements() -> ResponseReturnValue:
stmt = select(Engagement).order_by(Engagement.created_at.desc())
rows = db.session.execute(stmt).scalars().all()
return jsonify([EngagementRead.model_validate(row).model_dump(mode="json") for row in rows])
@@ -25,7 +26,7 @@ def list_engagements():
@bp.post("")
@require_perm(Permission.ENGAGEMENT_CREATE)
def create_engagement():
def create_engagement() -> ResponseReturnValue:
payload = parse_body(EngagementCreate)
engagement = Engagement(
client_name=payload.client_name,
@@ -42,7 +43,7 @@ def create_engagement():
@bp.get("/<eid>")
@require_perm(Permission.ENGAGEMENT_READ)
def get_engagement(eid: str):
def get_engagement(eid: str) -> ResponseReturnValue:
engagement = db.session.get(Engagement, parse_uuid(eid))
if engagement is None:
abort(404)
@@ -51,7 +52,7 @@ def get_engagement(eid: str):
@bp.put("/<eid>")
@require_perm(Permission.ENGAGEMENT_UPDATE)
def update_engagement(eid: str):
def update_engagement(eid: str) -> ResponseReturnValue:
engagement = db.session.get(Engagement, parse_uuid(eid))
if engagement is None:
abort(404)
@@ -64,7 +65,7 @@ def update_engagement(eid: str):
@bp.delete("/<eid>")
@require_perm(Permission.ENGAGEMENT_DELETE)
def delete_engagement(eid: str):
def delete_engagement(eid: str) -> ResponseReturnValue:
engagement = db.session.get(Engagement, parse_uuid(eid))
if engagement is None:
abort(404)

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
from flask import Blueprint, abort, jsonify
from flask.typing import ResponseReturnValue
from sqlalchemy import select
from mimic.api._helpers import jsonify_model, parse_body, parse_uuid
@@ -24,7 +25,7 @@ def _engagement_or_404(eid: str) -> Engagement:
@bp.get("/engagements/<eid>/hosts")
@require_perm(Permission.HOST_CRUD)
def list_hosts(eid: str):
def list_hosts(eid: str) -> ResponseReturnValue:
engagement = _engagement_or_404(eid)
stmt = select(Host).where(Host.engagement_id == engagement.id).order_by(Host.hostname)
rows = db.session.execute(stmt).scalars().all()
@@ -33,7 +34,7 @@ def list_hosts(eid: str):
@bp.post("/engagements/<eid>/hosts")
@require_perm(Permission.HOST_CRUD)
def create_host(eid: str):
def create_host(eid: str) -> ResponseReturnValue:
engagement = _engagement_or_404(eid)
payload = parse_body(HostCreate)
host = Host(
@@ -52,7 +53,7 @@ def create_host(eid: str):
@bp.put("/engagements/<eid>/hosts/<hid>")
@require_perm(Permission.HOST_CRUD)
def update_host(eid: str, hid: str):
def update_host(eid: str, hid: str) -> ResponseReturnValue:
engagement = _engagement_or_404(eid)
host = db.session.get(Host, parse_uuid(hid, field="host id"))
if host is None or host.engagement_id != engagement.id:
@@ -66,7 +67,7 @@ def update_host(eid: str, hid: str):
@bp.delete("/engagements/<eid>/hosts/<hid>")
@require_perm(Permission.HOST_CRUD)
def delete_host(eid: str, hid: str):
def delete_host(eid: str, hid: str) -> ResponseReturnValue:
engagement = _engagement_or_404(eid)
host = db.session.get(Host, parse_uuid(hid, field="host id"))
if host is None or host.engagement_id != engagement.id:

View File

@@ -2,7 +2,10 @@
from __future__ import annotations
from uuid import UUID
from flask import Blueprint, abort, jsonify
from flask.typing import ResponseReturnValue
from sqlalchemy import select
from mimic.api._helpers import jsonify_model, parse_body, parse_uuid
@@ -34,7 +37,7 @@ def _scenario_or_404(engagement: Engagement, sid: str) -> Scenario:
return scenario
def _validate_step_consistency(scenario: Scenario, ttp_id, host_id) -> None:
def _validate_step_consistency(scenario: Scenario, ttp_id: UUID, host_id: UUID) -> None:
ttp = db.session.get(Ttp, ttp_id)
host = db.session.get(Host, host_id)
if ttp is None or host is None:
@@ -47,7 +50,7 @@ def _validate_step_consistency(scenario: Scenario, ttp_id, host_id) -> None:
@bp.get("/engagements/<eid>/scenarios")
@require_perm(Permission.SCENARIO_CRUD)
def list_scenarios(eid: str):
def list_scenarios(eid: str) -> ResponseReturnValue:
engagement = _engagement_or_404(eid)
stmt = (
select(Scenario)
@@ -60,7 +63,7 @@ def list_scenarios(eid: str):
@bp.post("/engagements/<eid>/scenarios")
@require_perm(Permission.SCENARIO_CRUD)
def create_scenario(eid: str):
def create_scenario(eid: str) -> ResponseReturnValue:
engagement = _engagement_or_404(eid)
payload = parse_body(ScenarioCreate)
scenario = Scenario(
@@ -90,7 +93,7 @@ def create_scenario(eid: str):
@bp.get("/engagements/<eid>/scenarios/<sid>")
@require_perm(Permission.SCENARIO_CRUD)
def get_scenario(eid: str, sid: str):
def get_scenario(eid: str, sid: str) -> ResponseReturnValue:
engagement = _engagement_or_404(eid)
scenario = _scenario_or_404(engagement, sid)
return jsonify_model(ScenarioRead.model_validate(scenario))
@@ -98,7 +101,7 @@ def get_scenario(eid: str, sid: str):
@bp.put("/engagements/<eid>/scenarios/<sid>")
@require_perm(Permission.SCENARIO_CRUD)
def update_scenario(eid: str, sid: str):
def update_scenario(eid: str, sid: str) -> ResponseReturnValue:
engagement = _engagement_or_404(eid)
scenario = _scenario_or_404(engagement, sid)
payload = parse_body(ScenarioUpdate)
@@ -110,7 +113,7 @@ def update_scenario(eid: str, sid: str):
@bp.delete("/engagements/<eid>/scenarios/<sid>")
@require_perm(Permission.SCENARIO_CRUD)
def delete_scenario(eid: str, sid: str):
def delete_scenario(eid: str, sid: str) -> ResponseReturnValue:
engagement = _engagement_or_404(eid)
scenario = _scenario_or_404(engagement, sid)
db.session.delete(scenario)
@@ -120,7 +123,7 @@ def delete_scenario(eid: str, sid: str):
@bp.post("/engagements/<eid>/scenarios/<sid>/steps")
@require_perm(Permission.SCENARIO_CRUD)
def add_step(eid: str, sid: str):
def add_step(eid: str, sid: str) -> ResponseReturnValue:
engagement = _engagement_or_404(eid)
scenario = _scenario_or_404(engagement, sid)
payload = parse_body(ScenarioStepCreate)

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
from flask import Blueprint, abort, jsonify
from flask.typing import ResponseReturnValue
from sqlalchemy import select
from mimic.api._helpers import jsonify_model, parse_body, parse_uuid
@@ -16,7 +17,7 @@ bp = Blueprint("ttps", __name__)
@bp.get("")
@require_perm(Permission.TTP_READ)
def list_ttps():
def list_ttps() -> ResponseReturnValue:
stmt = select(Ttp).order_by(Ttp.created_at.desc())
rows = db.session.execute(stmt).scalars().all()
return jsonify([TtpRead.model_validate(row).model_dump(mode="json") for row in rows])
@@ -24,7 +25,7 @@ def list_ttps():
@bp.post("")
@require_perm(Permission.TTP_DRAFT)
def create_ttp():
def create_ttp() -> ResponseReturnValue:
payload = parse_body(TtpCreate)
ttp = Ttp(**payload.model_dump())
db.session.add(ttp)
@@ -34,7 +35,7 @@ def create_ttp():
@bp.get("/<tid>")
@require_perm(Permission.TTP_READ)
def get_ttp(tid: str):
def get_ttp(tid: str) -> ResponseReturnValue:
ttp = db.session.get(Ttp, parse_uuid(tid, field="ttp id"))
if ttp is None:
abort(404)
@@ -43,7 +44,7 @@ def get_ttp(tid: str):
@bp.put("/<tid>")
@require_perm(Permission.TTP_DRAFT)
def update_ttp(tid: str):
def update_ttp(tid: str) -> ResponseReturnValue:
ttp = db.session.get(Ttp, parse_uuid(tid, field="ttp id"))
if ttp is None:
abort(404)
@@ -63,7 +64,7 @@ def update_ttp(tid: str):
@bp.delete("/<tid>")
@require_perm(Permission.TTP_DRAFT)
def delete_ttp(tid: str):
def delete_ttp(tid: str) -> ResponseReturnValue:
ttp = db.session.get(Ttp, parse_uuid(tid, field="ttp id"))
if ttp is None:
abort(404)

View File

@@ -33,7 +33,7 @@ def create_app(settings: Settings | None = None) -> Flask:
db.init_app(app)
migrate.init_app(app, db, directory="src/mimic/db/migrations")
login_manager.init_app(app)
login_manager.user_loader(load_user) # type: ignore[arg-type]
login_manager.user_loader(load_user)
socketio.init_app(
app,

View File

@@ -39,9 +39,7 @@ def create_user(email: str, user_type: str, password: str, display_name: str | N
db.session.add(user)
db.session.flush()
group_name = (
GroupName.RT_LEAD if user.type is UserType.RT_LEAD else GroupName.RT_OPERATOR
)
group_name = GroupName.RT_LEAD if user.type is UserType.RT_LEAD else GroupName.RT_OPERATOR
group = db.session.query(Group).filter_by(name=group_name.value).first()
if group is None:
raise click.ClickException(

View File

@@ -46,9 +46,7 @@ class ConnectorFactory:
try:
klass = _REGISTRY[c2_type]
except KeyError as exc:
raise NotImplementedError(
f"no connector registered for {c2_type.value}"
) from exc
raise NotImplementedError(f"no connector registered for {c2_type.value}") from exc
connector = klass()
connector.authenticate(self._resolver(c2_type))

View File

@@ -13,6 +13,7 @@ Revision ID: 202605210001
Revises:
Create Date: 2026-05-21
"""
from __future__ import annotations
import sqlalchemy as sa
@@ -30,9 +31,7 @@ depends_on: str | None = None
# ---------------------------------------------------------------------------
USER_TYPE = sa.Enum("rt_operator", "rt_lead", "soc_analyst", name="user_type")
ENGAGEMENT_STATUS = sa.Enum(
"draft", "active", "closed", "archived", name="engagement_status"
)
ENGAGEMENT_STATUS = sa.Enum("draft", "active", "closed", "archived", name="engagement_status")
C2_TYPE = sa.Enum("mythic", "home", name="c2_type")
HOST_STATUS = sa.Enum("unknown", "alive", "dead", name="host_status")
PAYLOAD_TYPE = sa.Enum(
@@ -64,18 +63,10 @@ RUN_STEP_STATUS = sa.Enum(
"cleanup_failed",
name="run_step_status",
)
CLEANUP_STATUS = sa.Enum(
"pending", "success", "failed", "partial", name="cleanup_status"
)
DETECTION_LEVEL = sa.Enum(
"detected", "partial", "not_detected", name="detection_level"
)
DETECTION_SOURCE = sa.Enum(
"ndr", "edr", "siem", "manual", "other", name="detection_source"
)
EVIDENCE_STATUS = sa.Enum(
"success", "failure", "partial", name="evidence_status"
)
CLEANUP_STATUS = sa.Enum("pending", "success", "failed", "partial", name="cleanup_status")
DETECTION_LEVEL = sa.Enum("detected", "partial", "not_detected", name="detection_level")
DETECTION_SOURCE = sa.Enum("ndr", "edr", "siem", "manual", "other", name="detection_source")
EVIDENCE_STATUS = sa.Enum("success", "failure", "partial", name="evidence_status")
def upgrade() -> None:
@@ -107,8 +98,12 @@ def upgrade() -> None:
sa.Column("local_password_hash", sa.String(255)),
sa.Column("disabled_at", sa.DateTime(timezone=True)),
sa.Column("last_login_at", sa.DateTime(timezone=True)),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column(
"created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
sa.Column(
"updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
sa.UniqueConstraint("email", name="uq_user_email"),
)
@@ -126,8 +121,12 @@ def upgrade() -> None:
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column("name", sa.String(80), nullable=False),
sa.Column("description", sa.String(255)),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column(
"created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
sa.Column(
"updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
sa.UniqueConstraint("name", name="uq_group_name"),
)
@@ -136,13 +135,19 @@ def upgrade() -> None:
sa.Column(
"group_id",
UUID(as_uuid=True),
sa.ForeignKey("group.id", ondelete="CASCADE", name="fk_group_permission_group_id_group"),
sa.ForeignKey(
"group.id", ondelete="CASCADE", name="fk_group_permission_group_id_group"
),
nullable=False,
),
sa.Column(
"permission_id",
UUID(as_uuid=True),
sa.ForeignKey("permission.id", ondelete="CASCADE", name="fk_group_permission_permission_id_permission"),
sa.ForeignKey(
"permission.id",
ondelete="CASCADE",
name="fk_group_permission_permission_id_permission",
),
nullable=False,
),
sa.PrimaryKeyConstraint("group_id", "permission_id", name="pk_group_permission"),
@@ -158,9 +163,15 @@ def upgrade() -> None:
sa.Column("start_date", sa.Date),
sa.Column("end_date", sa.Date),
sa.Column("c2_type", C2_TYPE, nullable=False, server_default="mythic"),
sa.Column("created_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column(
"created_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")
),
sa.Column(
"created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
sa.Column(
"updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
)
op.create_table(
@@ -217,8 +228,12 @@ def upgrade() -> None:
sa.Column("config_fernet", sa.LargeBinary, nullable=False),
sa.Column("version", sa.Integer, nullable=False, server_default="1"),
sa.Column("retired_at", sa.DateTime(timezone=True)),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column(
"created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
sa.Column(
"updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
)
op.create_index(
"ix_c2_credential_engagement_active",
@@ -244,8 +259,12 @@ def upgrade() -> None:
sa.Column("c2_type", C2_TYPE, nullable=False),
sa.Column("status", HOST_STATUS, nullable=False, server_default="unknown"),
sa.Column("last_seen", sa.DateTime(timezone=True)),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column(
"created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
sa.Column(
"updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
)
op.create_index("ix_host_engagement_id", "host", ["engagement_id"])
@@ -267,9 +286,15 @@ def upgrade() -> None:
sa.Column("tags", JSONB, nullable=False, server_default="[]"),
sa.Column("version", sa.Integer, nullable=False, server_default="1"),
sa.Column("is_published", sa.Boolean, nullable=False, server_default=sa.false()),
sa.Column("created_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column(
"created_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")
),
sa.Column(
"created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
sa.Column(
"updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
)
# No `ttp_version` table — H32 / D-009: snapshot lives on run.snapshot_json.
@@ -287,9 +312,15 @@ def upgrade() -> None:
sa.Column("description", sa.Text),
sa.Column("version", sa.Integer, nullable=False, server_default="1"),
sa.Column("c2_type", C2_TYPE, nullable=False),
sa.Column("created_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column(
"created_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")
),
sa.Column(
"created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
sa.Column(
"updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
)
op.create_table(
@@ -316,8 +347,12 @@ def upgrade() -> None:
),
sa.Column("params_override_json", JSONB, nullable=False, server_default="{}"),
sa.Column("delay_after_ms", sa.Integer, nullable=False, server_default="0"),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column(
"created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
sa.Column(
"updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
sa.UniqueConstraint("scenario_id", "order_idx", name="uq_scenario_step_order_idx"),
)
@@ -334,10 +369,16 @@ def upgrade() -> None:
sa.Column("status", RUN_STATUS, nullable=False, server_default="queued"),
sa.Column("started_at", sa.DateTime(timezone=True)),
sa.Column("ended_at", sa.DateTime(timezone=True)),
sa.Column("started_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")),
sa.Column(
"started_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")
),
sa.Column("snapshot_json", JSONB, nullable=False, server_default="{}"),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column(
"created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
sa.Column(
"updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
)
op.create_table(
@@ -363,8 +404,12 @@ def upgrade() -> None:
sa.Column("output_blob_ref", sa.String(512)),
sa.Column("exit_code", sa.Integer),
sa.Column("resolved_payload_text", sa.Text),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column(
"created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
sa.Column(
"updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
)
op.create_table(
@@ -382,9 +427,15 @@ def upgrade() -> None:
sa.Column("ended_at", sa.DateTime(timezone=True)),
sa.Column("resolved_command_text", sa.Text),
sa.Column("output", sa.Text),
sa.Column("executed_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column(
"executed_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")
),
sa.Column(
"created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
sa.Column(
"updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
)
# ----------------------------------------------------- detection / evidence
@@ -408,8 +459,12 @@ def upgrade() -> None:
sa.Column("latency_ms", sa.Integer),
sa.Column("comment", sa.Text),
sa.Column("recorded_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column(
"created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
sa.Column(
"updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
)
op.create_table(
@@ -432,8 +487,12 @@ def upgrade() -> None:
sa.Column("artifact_files_json", JSONB, nullable=False, server_default="[]"),
sa.Column("comment", sa.Text),
sa.Column("recorded_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column(
"created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
sa.Column(
"updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
)
# ----------------------------------------------------------------- report
@@ -452,9 +511,15 @@ def upgrade() -> None:
sa.Column("pdf_path", sa.String(512)),
sa.Column("md_path", sa.String(512)),
sa.Column("generated_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("generated_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column(
"generated_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")
),
sa.Column(
"created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
sa.Column(
"updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
)
# ------------------------------------------------------------- soc_session
@@ -479,8 +544,12 @@ def upgrade() -> None:
sa.Column("last_ip", sa.String(64)),
sa.Column("last_user_agent", sa.String(512)),
sa.Column("last_used_at", sa.DateTime(timezone=True)),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column(
"created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
sa.Column(
"updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
)
# ------------------------------------------------------------ audit_log

View File

@@ -10,6 +10,7 @@ WORM enforcement without a migration; sprint 0 fills the columns at insert.
from __future__ import annotations
from datetime import datetime
from typing import Any
from uuid import UUID
from sqlalchemy import JSON, DateTime, ForeignKey, String, Text, func
@@ -26,13 +27,11 @@ class AuditLog(UuidPkMixin, Base):
nullable=False,
server_default=func.now(),
)
actor_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
actor_id: Mapped[UUID | None] = mapped_column(ForeignKey("user.id", ondelete="SET NULL"))
action: Mapped[str] = mapped_column(String(80), nullable=False)
resource_type: Mapped[str] = mapped_column(String(80), nullable=False)
resource_id: Mapped[str | None] = mapped_column(String(128))
metadata_json: Mapped[dict] = mapped_column(JSON, nullable=False, default=dict)
metadata_json: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False, default=dict)
prev_hash: Mapped[str | None] = mapped_column(String(64))
row_hash: Mapped[str] = mapped_column(String(64), nullable=False)

View File

@@ -3,7 +3,7 @@
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
from uuid import UUID
from sqlalchemy import (
@@ -45,9 +45,7 @@ class Detection(UuidPkMixin, TimestampsMixin, Base):
)
latency_ms: Mapped[int | None] = mapped_column(Integer)
comment: Mapped[str | None] = mapped_column(Text)
recorded_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False
)
recorded_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
run_step: Mapped[RunStep] = relationship()
soc_user: Mapped[User] = relationship()
@@ -69,14 +67,12 @@ class Evidence(UuidPkMixin, TimestampsMixin, Base):
nullable=False,
)
artifacts_text: Mapped[str | None] = mapped_column(Text)
artifact_files_json: Mapped[list[dict]] = mapped_column(
artifact_files_json: Mapped[list[dict[str, Any]]] = mapped_column(
JSON, default=list, nullable=False
)
# Each entry: {"name": str, "ref": str, "sha256": str, "size_bytes": int}
comment: Mapped[str | None] = mapped_column(Text)
recorded_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False
)
recorded_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
run_step: Mapped[RunStep] = relationship()
rt_user: Mapped[User] = relationship()

View File

@@ -37,9 +37,7 @@ class Engagement(UuidPkMixin, TimestampsMixin, Base):
nullable=False,
)
created_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
created_by_id: Mapped[UUID | None] = mapped_column(ForeignKey("user.id", ondelete="SET NULL"))
hosts: Mapped[list[Host]] = relationship(
back_populates="engagement",

View File

@@ -64,14 +64,10 @@ class GroupPermission(Base):
class UserGroup(Base):
__tablename__ = "user_group"
__table_args__ = (
PrimaryKeyConstraint(
"user_id", "group_id", "engagement_id", name="pk_user_group"
),
PrimaryKeyConstraint("user_id", "group_id", "engagement_id", name="pk_user_group"),
)
user_id: Mapped[UUID] = mapped_column(
ForeignKey("user.id", ondelete="CASCADE"), nullable=False
)
user_id: Mapped[UUID] = mapped_column(ForeignKey("user.id", ondelete="CASCADE"), nullable=False)
group_id: Mapped[UUID] = mapped_column(
ForeignKey("group.id", ondelete="CASCADE"), nullable=False
)

View File

@@ -3,7 +3,7 @@
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
from uuid import UUID
from sqlalchemy import (
@@ -30,7 +30,7 @@ class Report(UuidPkMixin, TimestampsMixin, Base):
)
version: Mapped[int] = mapped_column(Integer, default=1, nullable=False)
content_json: Mapped[dict] = mapped_column(JSON, nullable=False, default=dict)
content_json: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False, default=dict)
content_sha256: Mapped[str] = mapped_column(String(64), nullable=False)
# SHA-256 of canonical JSON. Identical hash referenced in PDF footer, JSON
# export and Markdown export (spec H19 / F9 / F14).
@@ -38,11 +38,7 @@ class Report(UuidPkMixin, TimestampsMixin, Base):
pdf_path: Mapped[str | None] = mapped_column(String(512))
md_path: Mapped[str | None] = mapped_column(String(512))
generated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False
)
generated_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
generated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
generated_by_id: Mapped[UUID | None] = mapped_column(ForeignKey("user.id", ondelete="SET NULL"))
engagement: Mapped[Engagement] = relationship()

View File

@@ -3,7 +3,7 @@
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
from uuid import UUID
from sqlalchemy import (
@@ -39,11 +39,9 @@ class Run(UuidPkMixin, TimestampsMixin, Base):
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
ended_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
started_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
started_by_id: Mapped[UUID | None] = mapped_column(ForeignKey("user.id", ondelete="SET NULL"))
snapshot_json: Mapped[dict] = mapped_column(JSON, nullable=False, default=dict)
snapshot_json: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False, default=dict)
# Full self-contained snapshot of scenario + steps + resolved TTPs.
# Source of truth for replay (spec H32).
@@ -111,8 +109,6 @@ class RunStepCleanup(UuidPkMixin, TimestampsMixin, Base):
ended_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
resolved_command_text: Mapped[str | None] = mapped_column(Text)
output: Mapped[str | None] = mapped_column(Text)
executed_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
executed_by_id: Mapped[UUID | None] = mapped_column(ForeignKey("user.id", ondelete="SET NULL"))
run_step: Mapped[RunStep] = relationship(back_populates="cleanup")

View File

@@ -6,7 +6,7 @@ run start that every referenced host matches.
from __future__ import annotations
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
from uuid import UUID
from sqlalchemy import (
@@ -45,9 +45,7 @@ class Scenario(UuidPkMixin, TimestampsMixin, Base):
Enum(C2Type, name="c2_type", create_type=False),
nullable=False,
)
created_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
created_by_id: Mapped[UUID | None] = mapped_column(ForeignKey("user.id", ondelete="SET NULL"))
engagement: Mapped[Engagement] = relationship(back_populates="scenarios")
steps: Mapped[list[ScenarioStep]] = relationship(
@@ -78,7 +76,7 @@ class ScenarioStep(UuidPkMixin, TimestampsMixin, Base):
ForeignKey("host.id", ondelete="RESTRICT"),
nullable=False,
)
params_override_json: Mapped[dict] = mapped_column(JSON, default=dict, nullable=False)
params_override_json: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict, nullable=False)
delay_after_ms: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
scenario: Mapped[Scenario] = relationship(back_populates="steps")

View File

@@ -35,9 +35,7 @@ class SocSession(UuidPkMixin, TimestampsMixin, Base):
token_hash: Mapped[str] = mapped_column(String(255), nullable=False, unique=True)
# bcrypt hash. Plain token returned once at creation.
expires_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False
)
expires_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
revoked_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
last_ip: Mapped[str | None] = mapped_column(String(64))

View File

@@ -8,6 +8,7 @@ counter (§8) — bumped on edit, never referenced for replay.
from __future__ import annotations
from typing import Any
from uuid import UUID
from sqlalchemy import (
@@ -39,7 +40,7 @@ class Ttp(UuidPkMixin, TimestampsMixin, Base):
nullable=False,
)
payload_template: Mapped[str] = mapped_column(Text, nullable=False, default="")
params_schema_json: Mapped[dict | None] = mapped_column(JSON)
params_schema_json: Mapped[dict[str, Any] | None] = mapped_column(JSON)
opsec_notes: Mapped[str | None] = mapped_column(Text)
cleanup_command: Mapped[str | None] = mapped_column(Text)
@@ -57,6 +58,4 @@ class Ttp(UuidPkMixin, TimestampsMixin, Base):
is_published: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
# Promoted to the library (lead RT only — F11).
created_by_id: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL")
)
created_by_id: Mapped[UUID | None] = mapped_column(ForeignKey("user.id", ondelete="SET NULL"))

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
import logging
import sys
from pythonjsonlogger.jsonlogger import JsonFormatter
from pythonjsonlogger.jsonlogger import JsonFormatter # type: ignore[attr-defined]
def configure_logging(level: str = "INFO", *, as_json: bool = True) -> None:
@@ -16,9 +16,7 @@ def configure_logging(level: str = "INFO", *, as_json: bool = True) -> None:
"%(asctime)s %(levelname)s %(name)s %(message)s"
)
else:
formatter = logging.Formatter(
"%(asctime)s %(levelname)-8s %(name)s: %(message)s"
)
formatter = logging.Formatter("%(asctime)s %(levelname)-8s %(name)s: %(message)s")
handler.setFormatter(formatter)
root = logging.getLogger()

View File

@@ -4,16 +4,13 @@ from __future__ import annotations
from collections.abc import Callable
from functools import wraps
from typing import TYPE_CHECKING, ParamSpec, TypeVar
from typing import ParamSpec, TypeVar
from flask import abort
from flask_login import current_user
from mimic.rbac.matrix import Permission
if TYPE_CHECKING:
pass
P = ParamSpec("P")
R = TypeVar("R")
@@ -32,7 +29,7 @@ def require_perm(perm: Permission) -> Callable[[Callable[P, R]], Callable[P, R]]
abort(403)
return view(*args, **kwargs)
return _wrapped # type: ignore[return-value]
return _wrapped
return _decorate

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
from typing import Any
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
@@ -11,7 +12,7 @@ class ScenarioStepBase(BaseModel):
ttp_id: UUID
host_id: UUID
order_idx: int = Field(ge=0)
params_override_json: dict = Field(default_factory=dict)
params_override_json: dict[str, Any] = Field(default_factory=dict)
delay_after_ms: int = Field(default=0, ge=0)

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
from typing import Any
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
@@ -14,7 +15,7 @@ class TtpBase(BaseModel):
mitre_subtechnique: str | None = Field(default=None, max_length=16)
payload_type: PayloadType
payload_template: str = ""
params_schema_json: dict | None = None
params_schema_json: dict[str, Any] | None = None
opsec_notes: str | None = None
cleanup_command: str | None = None
is_stealth_variant: bool = False
@@ -33,7 +34,7 @@ class TtpUpdate(BaseModel):
mitre_subtechnique: str | None = Field(default=None, max_length=16)
payload_type: PayloadType | None = None
payload_template: str | None = None
params_schema_json: dict | None = None
params_schema_json: dict[str, Any] | None = None
opsec_notes: str | None = None
cleanup_command: str | None = None
is_stealth_variant: bool | None = None
@@ -46,4 +47,4 @@ class TtpRead(TtpBase):
id: UUID
is_published: bool
current_version: int
version: int

View File

@@ -13,17 +13,17 @@
from __future__ import annotations
import re
from typing import Any
from types import ModuleType
from typing import Any, cast
from jinja2 import TemplateError
try: # pragma: no cover - presence depends on environment
import re2 as _re2 # type: ignore[import-not-found]
import re2 as _imported_re2
_HAS_RE2 = True
_re2: ModuleType | None = _imported_re2
except ImportError: # pragma: no cover
_re2 = None
_HAS_RE2 = False
_FALLBACK_MAX_INPUT = 1 * 1024 * 1024 # 1 MB safety cap when re2 missing
@@ -41,8 +41,8 @@ def regex_extract(
raise TemplateError(f"regex_extract: cannot match against None for /{pattern}/")
haystack = text if isinstance(text, str) else str(text)
if _HAS_RE2:
compiled = _re2.compile(pattern)
if _re2 is not None:
compiled = cast(Any, _re2).compile(pattern)
match = compiled.search(haystack)
else:
if len(haystack) > _FALLBACK_MAX_INPUT:
@@ -56,28 +56,22 @@ def regex_extract(
try:
captured = match.group(name)
except IndexError as exc:
raise TemplateError(
f"regex_extract: named group {name!r} not in /{pattern}/"
) from exc
raise TemplateError(f"regex_extract: named group {name!r} not in /{pattern}/") from exc
if captured is None:
raise TemplateError(
f"regex_extract: named group {name!r} captured nothing in /{pattern}/"
)
return captured
return str(captured)
try:
captured = match.group(group)
except IndexError:
if group == 1:
return match.group(0)
raise TemplateError(
f"regex_extract: group {group} out of range for /{pattern}/"
) from None
return str(match.group(0))
raise TemplateError(f"regex_extract: group {group} out of range for /{pattern}/") from None
if captured is None:
if group == 1:
return match.group(0)
raise TemplateError(
f"regex_extract: group {group} captured nothing in /{pattern}/"
)
return captured
return str(match.group(0))
raise TemplateError(f"regex_extract: group {group} captured nothing in /{pattern}/")
return str(captured)

View File

@@ -22,9 +22,7 @@ def postgres_dsn() -> Iterator[str]:
if PostgresContainer is None:
pytest.skip("testcontainers not installed")
with PostgresContainer("postgres:16-alpine") as pg:
url = pg.get_connection_url().replace(
"postgresql+psycopg2", "postgresql+psycopg"
)
url = pg.get_connection_url().replace("postgresql+psycopg2", "postgresql+psycopg")
yield url