From adab8a58e7c4809f18dd01ae56f96678ca699dbc Mon Sep 17 00:00:00 2001 From: knacky Date: Fri, 22 May 2026 05:10:51 +0200 Subject: [PATCH] chore(backend): mypy strict clean + ruff format pass MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-merge sanity per devops checklist (ruff format --check, mypy --strict). Type fixes: - ORM models: `Mapped[dict]` → `Mapped[dict[str, Any]]` (audit, scenario, run, report, ttp, detection.artifact_files_json). Equivalent on Pydantic DTOs (TtpBase.params_schema_json, ScenarioStepBase.params_override_json). - Rename `TtpRead.current_version` → `TtpRead.version` to mirror the ORM column (which itself was renamed in D-009 cleanup). - Flask blueprints: add `-> ResponseReturnValue` to every view, plus typed UUID params on `_validate_step_consistency`. - `templating/filters.py`: rewrite the conditional re2 import so mypy can narrow the union (`ModuleType | None`); the runtime branch on `_re2 is not None` removes the unused-ignore that was triggered by warn_unused_ignores. - `pyproject.toml`: add `flask_login.*` and `pythonjsonlogger.*` to the `[[tool.mypy.overrides]]` `ignore_missing_imports` list (both ship without typed marker). - Misc: drop stale `# type: ignore` comments (`app.py:36`, `rbac/decorators.py:35`) flagged by `warn_unused_ignores`. Keep `logging.JsonFormatter` ignore because the symbol exists at runtime but is not re-exported through the typed surface. Formatting: - `ruff format` applied (15 files normalized; line-length unchanged at 100). Verification on this commit: - `ruff check` → All checks passed. - `ruff format --check` → 68 files already formatted. - `mypy --strict src` → Success: no issues found in 54 source files. - `pytest tests/unit` → 49 passed. --- backend/pyproject.toml | 2 + backend/src/mimic/api/engagements.py | 11 +- backend/src/mimic/api/hosts.py | 9 +- backend/src/mimic/api/scenarios.py | 17 +- backend/src/mimic/api/ttps.py | 11 +- backend/src/mimic/app.py | 2 +- backend/src/mimic/cli/user.py | 4 +- backend/src/mimic/connectors/factory.py | 4 +- .../versions/202605210001_initial_schema.py | 175 ++++++++++++------ backend/src/mimic/db/models/audit.py | 7 +- backend/src/mimic/db/models/detection.py | 12 +- backend/src/mimic/db/models/engagement.py | 4 +- backend/src/mimic/db/models/permission.py | 8 +- backend/src/mimic/db/models/report.py | 12 +- backend/src/mimic/db/models/run.py | 12 +- backend/src/mimic/db/models/scenario.py | 8 +- backend/src/mimic/db/models/soc_session.py | 4 +- backend/src/mimic/db/models/ttp.py | 7 +- backend/src/mimic/logging.py | 6 +- backend/src/mimic/rbac/decorators.py | 7 +- backend/src/mimic/schemas/scenario.py | 3 +- backend/src/mimic/schemas/ttp.py | 7 +- backend/src/mimic/templating/filters.py | 32 ++-- backend/tests/integration/conftest.py | 4 +- 24 files changed, 203 insertions(+), 165 deletions(-) diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 0b38510..1301e16 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -113,6 +113,8 @@ module = [ "re2", "flask_socketio.*", "flask_migrate.*", + "flask_login.*", + "pythonjsonlogger.*", "gevent.*", "testcontainers.*", "authlib.*", diff --git a/backend/src/mimic/api/engagements.py b/backend/src/mimic/api/engagements.py index 0df267c..ec73b2d 100644 --- a/backend/src/mimic/api/engagements.py +++ b/backend/src/mimic/api/engagements.py @@ -3,6 +3,7 @@ from __future__ import annotations from flask import Blueprint, abort, jsonify +from flask.typing import ResponseReturnValue from sqlalchemy import select from mimic.api._helpers import jsonify_model, parse_body, parse_uuid @@ -17,7 +18,7 @@ bp = Blueprint("engagements", __name__) @bp.get("") @require_perm(Permission.ENGAGEMENT_READ) -def list_engagements(): +def list_engagements() -> ResponseReturnValue: stmt = select(Engagement).order_by(Engagement.created_at.desc()) rows = db.session.execute(stmt).scalars().all() return jsonify([EngagementRead.model_validate(row).model_dump(mode="json") for row in rows]) @@ -25,7 +26,7 @@ def list_engagements(): @bp.post("") @require_perm(Permission.ENGAGEMENT_CREATE) -def create_engagement(): +def create_engagement() -> ResponseReturnValue: payload = parse_body(EngagementCreate) engagement = Engagement( client_name=payload.client_name, @@ -42,7 +43,7 @@ def create_engagement(): @bp.get("/") @require_perm(Permission.ENGAGEMENT_READ) -def get_engagement(eid: str): +def get_engagement(eid: str) -> ResponseReturnValue: engagement = db.session.get(Engagement, parse_uuid(eid)) if engagement is None: abort(404) @@ -51,7 +52,7 @@ def get_engagement(eid: str): @bp.put("/") @require_perm(Permission.ENGAGEMENT_UPDATE) -def update_engagement(eid: str): +def update_engagement(eid: str) -> ResponseReturnValue: engagement = db.session.get(Engagement, parse_uuid(eid)) if engagement is None: abort(404) @@ -64,7 +65,7 @@ def update_engagement(eid: str): @bp.delete("/") @require_perm(Permission.ENGAGEMENT_DELETE) -def delete_engagement(eid: str): +def delete_engagement(eid: str) -> ResponseReturnValue: engagement = db.session.get(Engagement, parse_uuid(eid)) if engagement is None: abort(404) diff --git a/backend/src/mimic/api/hosts.py b/backend/src/mimic/api/hosts.py index 0594862..984d82c 100644 --- a/backend/src/mimic/api/hosts.py +++ b/backend/src/mimic/api/hosts.py @@ -3,6 +3,7 @@ from __future__ import annotations from flask import Blueprint, abort, jsonify +from flask.typing import ResponseReturnValue from sqlalchemy import select from mimic.api._helpers import jsonify_model, parse_body, parse_uuid @@ -24,7 +25,7 @@ def _engagement_or_404(eid: str) -> Engagement: @bp.get("/engagements//hosts") @require_perm(Permission.HOST_CRUD) -def list_hosts(eid: str): +def list_hosts(eid: str) -> ResponseReturnValue: engagement = _engagement_or_404(eid) stmt = select(Host).where(Host.engagement_id == engagement.id).order_by(Host.hostname) rows = db.session.execute(stmt).scalars().all() @@ -33,7 +34,7 @@ def list_hosts(eid: str): @bp.post("/engagements//hosts") @require_perm(Permission.HOST_CRUD) -def create_host(eid: str): +def create_host(eid: str) -> ResponseReturnValue: engagement = _engagement_or_404(eid) payload = parse_body(HostCreate) host = Host( @@ -52,7 +53,7 @@ def create_host(eid: str): @bp.put("/engagements//hosts/") @require_perm(Permission.HOST_CRUD) -def update_host(eid: str, hid: str): +def update_host(eid: str, hid: str) -> ResponseReturnValue: engagement = _engagement_or_404(eid) host = db.session.get(Host, parse_uuid(hid, field="host id")) if host is None or host.engagement_id != engagement.id: @@ -66,7 +67,7 @@ def update_host(eid: str, hid: str): @bp.delete("/engagements//hosts/") @require_perm(Permission.HOST_CRUD) -def delete_host(eid: str, hid: str): +def delete_host(eid: str, hid: str) -> ResponseReturnValue: engagement = _engagement_or_404(eid) host = db.session.get(Host, parse_uuid(hid, field="host id")) if host is None or host.engagement_id != engagement.id: diff --git a/backend/src/mimic/api/scenarios.py b/backend/src/mimic/api/scenarios.py index e95de70..5e0c0bf 100644 --- a/backend/src/mimic/api/scenarios.py +++ b/backend/src/mimic/api/scenarios.py @@ -2,7 +2,10 @@ from __future__ import annotations +from uuid import UUID + from flask import Blueprint, abort, jsonify +from flask.typing import ResponseReturnValue from sqlalchemy import select from mimic.api._helpers import jsonify_model, parse_body, parse_uuid @@ -34,7 +37,7 @@ def _scenario_or_404(engagement: Engagement, sid: str) -> Scenario: return scenario -def _validate_step_consistency(scenario: Scenario, ttp_id, host_id) -> None: +def _validate_step_consistency(scenario: Scenario, ttp_id: UUID, host_id: UUID) -> None: ttp = db.session.get(Ttp, ttp_id) host = db.session.get(Host, host_id) if ttp is None or host is None: @@ -47,7 +50,7 @@ def _validate_step_consistency(scenario: Scenario, ttp_id, host_id) -> None: @bp.get("/engagements//scenarios") @require_perm(Permission.SCENARIO_CRUD) -def list_scenarios(eid: str): +def list_scenarios(eid: str) -> ResponseReturnValue: engagement = _engagement_or_404(eid) stmt = ( select(Scenario) @@ -60,7 +63,7 @@ def list_scenarios(eid: str): @bp.post("/engagements//scenarios") @require_perm(Permission.SCENARIO_CRUD) -def create_scenario(eid: str): +def create_scenario(eid: str) -> ResponseReturnValue: engagement = _engagement_or_404(eid) payload = parse_body(ScenarioCreate) scenario = Scenario( @@ -90,7 +93,7 @@ def create_scenario(eid: str): @bp.get("/engagements//scenarios/") @require_perm(Permission.SCENARIO_CRUD) -def get_scenario(eid: str, sid: str): +def get_scenario(eid: str, sid: str) -> ResponseReturnValue: engagement = _engagement_or_404(eid) scenario = _scenario_or_404(engagement, sid) return jsonify_model(ScenarioRead.model_validate(scenario)) @@ -98,7 +101,7 @@ def get_scenario(eid: str, sid: str): @bp.put("/engagements//scenarios/") @require_perm(Permission.SCENARIO_CRUD) -def update_scenario(eid: str, sid: str): +def update_scenario(eid: str, sid: str) -> ResponseReturnValue: engagement = _engagement_or_404(eid) scenario = _scenario_or_404(engagement, sid) payload = parse_body(ScenarioUpdate) @@ -110,7 +113,7 @@ def update_scenario(eid: str, sid: str): @bp.delete("/engagements//scenarios/") @require_perm(Permission.SCENARIO_CRUD) -def delete_scenario(eid: str, sid: str): +def delete_scenario(eid: str, sid: str) -> ResponseReturnValue: engagement = _engagement_or_404(eid) scenario = _scenario_or_404(engagement, sid) db.session.delete(scenario) @@ -120,7 +123,7 @@ def delete_scenario(eid: str, sid: str): @bp.post("/engagements//scenarios//steps") @require_perm(Permission.SCENARIO_CRUD) -def add_step(eid: str, sid: str): +def add_step(eid: str, sid: str) -> ResponseReturnValue: engagement = _engagement_or_404(eid) scenario = _scenario_or_404(engagement, sid) payload = parse_body(ScenarioStepCreate) diff --git a/backend/src/mimic/api/ttps.py b/backend/src/mimic/api/ttps.py index 0e33e9d..929c55c 100644 --- a/backend/src/mimic/api/ttps.py +++ b/backend/src/mimic/api/ttps.py @@ -3,6 +3,7 @@ from __future__ import annotations from flask import Blueprint, abort, jsonify +from flask.typing import ResponseReturnValue from sqlalchemy import select from mimic.api._helpers import jsonify_model, parse_body, parse_uuid @@ -16,7 +17,7 @@ bp = Blueprint("ttps", __name__) @bp.get("") @require_perm(Permission.TTP_READ) -def list_ttps(): +def list_ttps() -> ResponseReturnValue: stmt = select(Ttp).order_by(Ttp.created_at.desc()) rows = db.session.execute(stmt).scalars().all() return jsonify([TtpRead.model_validate(row).model_dump(mode="json") for row in rows]) @@ -24,7 +25,7 @@ def list_ttps(): @bp.post("") @require_perm(Permission.TTP_DRAFT) -def create_ttp(): +def create_ttp() -> ResponseReturnValue: payload = parse_body(TtpCreate) ttp = Ttp(**payload.model_dump()) db.session.add(ttp) @@ -34,7 +35,7 @@ def create_ttp(): @bp.get("/") @require_perm(Permission.TTP_READ) -def get_ttp(tid: str): +def get_ttp(tid: str) -> ResponseReturnValue: ttp = db.session.get(Ttp, parse_uuid(tid, field="ttp id")) if ttp is None: abort(404) @@ -43,7 +44,7 @@ def get_ttp(tid: str): @bp.put("/") @require_perm(Permission.TTP_DRAFT) -def update_ttp(tid: str): +def update_ttp(tid: str) -> ResponseReturnValue: ttp = db.session.get(Ttp, parse_uuid(tid, field="ttp id")) if ttp is None: abort(404) @@ -63,7 +64,7 @@ def update_ttp(tid: str): @bp.delete("/") @require_perm(Permission.TTP_DRAFT) -def delete_ttp(tid: str): +def delete_ttp(tid: str) -> ResponseReturnValue: ttp = db.session.get(Ttp, parse_uuid(tid, field="ttp id")) if ttp is None: abort(404) diff --git a/backend/src/mimic/app.py b/backend/src/mimic/app.py index 9e455e9..86a2738 100644 --- a/backend/src/mimic/app.py +++ b/backend/src/mimic/app.py @@ -33,7 +33,7 @@ def create_app(settings: Settings | None = None) -> Flask: db.init_app(app) migrate.init_app(app, db, directory="src/mimic/db/migrations") login_manager.init_app(app) - login_manager.user_loader(load_user) # type: ignore[arg-type] + login_manager.user_loader(load_user) socketio.init_app( app, diff --git a/backend/src/mimic/cli/user.py b/backend/src/mimic/cli/user.py index 5c3ef54..df4aa38 100644 --- a/backend/src/mimic/cli/user.py +++ b/backend/src/mimic/cli/user.py @@ -39,9 +39,7 @@ def create_user(email: str, user_type: str, password: str, display_name: str | N db.session.add(user) db.session.flush() - group_name = ( - GroupName.RT_LEAD if user.type is UserType.RT_LEAD else GroupName.RT_OPERATOR - ) + group_name = GroupName.RT_LEAD if user.type is UserType.RT_LEAD else GroupName.RT_OPERATOR group = db.session.query(Group).filter_by(name=group_name.value).first() if group is None: raise click.ClickException( diff --git a/backend/src/mimic/connectors/factory.py b/backend/src/mimic/connectors/factory.py index 2c7c2cd..8ff7b1d 100644 --- a/backend/src/mimic/connectors/factory.py +++ b/backend/src/mimic/connectors/factory.py @@ -46,9 +46,7 @@ class ConnectorFactory: try: klass = _REGISTRY[c2_type] except KeyError as exc: - raise NotImplementedError( - f"no connector registered for {c2_type.value}" - ) from exc + raise NotImplementedError(f"no connector registered for {c2_type.value}") from exc connector = klass() connector.authenticate(self._resolver(c2_type)) diff --git a/backend/src/mimic/db/migrations/versions/202605210001_initial_schema.py b/backend/src/mimic/db/migrations/versions/202605210001_initial_schema.py index 61490d1..93d0340 100644 --- a/backend/src/mimic/db/migrations/versions/202605210001_initial_schema.py +++ b/backend/src/mimic/db/migrations/versions/202605210001_initial_schema.py @@ -13,6 +13,7 @@ Revision ID: 202605210001 Revises: Create Date: 2026-05-21 """ + from __future__ import annotations import sqlalchemy as sa @@ -30,9 +31,7 @@ depends_on: str | None = None # --------------------------------------------------------------------------- USER_TYPE = sa.Enum("rt_operator", "rt_lead", "soc_analyst", name="user_type") -ENGAGEMENT_STATUS = sa.Enum( - "draft", "active", "closed", "archived", name="engagement_status" -) +ENGAGEMENT_STATUS = sa.Enum("draft", "active", "closed", "archived", name="engagement_status") C2_TYPE = sa.Enum("mythic", "home", name="c2_type") HOST_STATUS = sa.Enum("unknown", "alive", "dead", name="host_status") PAYLOAD_TYPE = sa.Enum( @@ -64,18 +63,10 @@ RUN_STEP_STATUS = sa.Enum( "cleanup_failed", name="run_step_status", ) -CLEANUP_STATUS = sa.Enum( - "pending", "success", "failed", "partial", name="cleanup_status" -) -DETECTION_LEVEL = sa.Enum( - "detected", "partial", "not_detected", name="detection_level" -) -DETECTION_SOURCE = sa.Enum( - "ndr", "edr", "siem", "manual", "other", name="detection_source" -) -EVIDENCE_STATUS = sa.Enum( - "success", "failure", "partial", name="evidence_status" -) +CLEANUP_STATUS = sa.Enum("pending", "success", "failed", "partial", name="cleanup_status") +DETECTION_LEVEL = sa.Enum("detected", "partial", "not_detected", name="detection_level") +DETECTION_SOURCE = sa.Enum("ndr", "edr", "siem", "manual", "other", name="detection_source") +EVIDENCE_STATUS = sa.Enum("success", "failure", "partial", name="evidence_status") def upgrade() -> None: @@ -107,8 +98,12 @@ def upgrade() -> None: sa.Column("local_password_hash", sa.String(255)), sa.Column("disabled_at", sa.DateTime(timezone=True)), sa.Column("last_login_at", sa.DateTime(timezone=True)), - sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), - sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column( + "created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), + sa.Column( + "updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), sa.UniqueConstraint("email", name="uq_user_email"), ) @@ -126,8 +121,12 @@ def upgrade() -> None: sa.Column("id", UUID(as_uuid=True), primary_key=True), sa.Column("name", sa.String(80), nullable=False), sa.Column("description", sa.String(255)), - sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), - sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column( + "created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), + sa.Column( + "updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), sa.UniqueConstraint("name", name="uq_group_name"), ) @@ -136,13 +135,19 @@ def upgrade() -> None: sa.Column( "group_id", UUID(as_uuid=True), - sa.ForeignKey("group.id", ondelete="CASCADE", name="fk_group_permission_group_id_group"), + sa.ForeignKey( + "group.id", ondelete="CASCADE", name="fk_group_permission_group_id_group" + ), nullable=False, ), sa.Column( "permission_id", UUID(as_uuid=True), - sa.ForeignKey("permission.id", ondelete="CASCADE", name="fk_group_permission_permission_id_permission"), + sa.ForeignKey( + "permission.id", + ondelete="CASCADE", + name="fk_group_permission_permission_id_permission", + ), nullable=False, ), sa.PrimaryKeyConstraint("group_id", "permission_id", name="pk_group_permission"), @@ -158,9 +163,15 @@ def upgrade() -> None: sa.Column("start_date", sa.Date), sa.Column("end_date", sa.Date), sa.Column("c2_type", C2_TYPE, nullable=False, server_default="mythic"), - sa.Column("created_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")), - sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), - sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column( + "created_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL") + ), + sa.Column( + "created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), + sa.Column( + "updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), ) op.create_table( @@ -217,8 +228,12 @@ def upgrade() -> None: sa.Column("config_fernet", sa.LargeBinary, nullable=False), sa.Column("version", sa.Integer, nullable=False, server_default="1"), sa.Column("retired_at", sa.DateTime(timezone=True)), - sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), - sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column( + "created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), + sa.Column( + "updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), ) op.create_index( "ix_c2_credential_engagement_active", @@ -244,8 +259,12 @@ def upgrade() -> None: sa.Column("c2_type", C2_TYPE, nullable=False), sa.Column("status", HOST_STATUS, nullable=False, server_default="unknown"), sa.Column("last_seen", sa.DateTime(timezone=True)), - sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), - sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column( + "created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), + sa.Column( + "updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), ) op.create_index("ix_host_engagement_id", "host", ["engagement_id"]) @@ -267,9 +286,15 @@ def upgrade() -> None: sa.Column("tags", JSONB, nullable=False, server_default="[]"), sa.Column("version", sa.Integer, nullable=False, server_default="1"), sa.Column("is_published", sa.Boolean, nullable=False, server_default=sa.false()), - sa.Column("created_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")), - sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), - sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column( + "created_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL") + ), + sa.Column( + "created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), + sa.Column( + "updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), ) # No `ttp_version` table — H32 / D-009: snapshot lives on run.snapshot_json. @@ -287,9 +312,15 @@ def upgrade() -> None: sa.Column("description", sa.Text), sa.Column("version", sa.Integer, nullable=False, server_default="1"), sa.Column("c2_type", C2_TYPE, nullable=False), - sa.Column("created_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")), - sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), - sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column( + "created_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL") + ), + sa.Column( + "created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), + sa.Column( + "updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), ) op.create_table( @@ -316,8 +347,12 @@ def upgrade() -> None: ), sa.Column("params_override_json", JSONB, nullable=False, server_default="{}"), sa.Column("delay_after_ms", sa.Integer, nullable=False, server_default="0"), - sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), - sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column( + "created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), + sa.Column( + "updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), sa.UniqueConstraint("scenario_id", "order_idx", name="uq_scenario_step_order_idx"), ) @@ -334,10 +369,16 @@ def upgrade() -> None: sa.Column("status", RUN_STATUS, nullable=False, server_default="queued"), sa.Column("started_at", sa.DateTime(timezone=True)), sa.Column("ended_at", sa.DateTime(timezone=True)), - sa.Column("started_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")), + sa.Column( + "started_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL") + ), sa.Column("snapshot_json", JSONB, nullable=False, server_default="{}"), - sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), - sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column( + "created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), + sa.Column( + "updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), ) op.create_table( @@ -363,8 +404,12 @@ def upgrade() -> None: sa.Column("output_blob_ref", sa.String(512)), sa.Column("exit_code", sa.Integer), sa.Column("resolved_payload_text", sa.Text), - sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), - sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column( + "created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), + sa.Column( + "updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), ) op.create_table( @@ -382,9 +427,15 @@ def upgrade() -> None: sa.Column("ended_at", sa.DateTime(timezone=True)), sa.Column("resolved_command_text", sa.Text), sa.Column("output", sa.Text), - sa.Column("executed_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")), - sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), - sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column( + "executed_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL") + ), + sa.Column( + "created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), + sa.Column( + "updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), ) # ----------------------------------------------------- detection / evidence @@ -408,8 +459,12 @@ def upgrade() -> None: sa.Column("latency_ms", sa.Integer), sa.Column("comment", sa.Text), sa.Column("recorded_at", sa.DateTime(timezone=True), nullable=False), - sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), - sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column( + "created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), + sa.Column( + "updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), ) op.create_table( @@ -432,8 +487,12 @@ def upgrade() -> None: sa.Column("artifact_files_json", JSONB, nullable=False, server_default="[]"), sa.Column("comment", sa.Text), sa.Column("recorded_at", sa.DateTime(timezone=True), nullable=False), - sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), - sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column( + "created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), + sa.Column( + "updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), ) # ----------------------------------------------------------------- report @@ -452,9 +511,15 @@ def upgrade() -> None: sa.Column("pdf_path", sa.String(512)), sa.Column("md_path", sa.String(512)), sa.Column("generated_at", sa.DateTime(timezone=True), nullable=False), - sa.Column("generated_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL")), - sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), - sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column( + "generated_by_id", UUID(as_uuid=True), sa.ForeignKey("user.id", ondelete="SET NULL") + ), + sa.Column( + "created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), + sa.Column( + "updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), ) # ------------------------------------------------------------- soc_session @@ -479,8 +544,12 @@ def upgrade() -> None: sa.Column("last_ip", sa.String(64)), sa.Column("last_user_agent", sa.String(512)), sa.Column("last_used_at", sa.DateTime(timezone=True)), - sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), - sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column( + "created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), + sa.Column( + "updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False + ), ) # ------------------------------------------------------------ audit_log diff --git a/backend/src/mimic/db/models/audit.py b/backend/src/mimic/db/models/audit.py index 3c9861f..c1a299a 100644 --- a/backend/src/mimic/db/models/audit.py +++ b/backend/src/mimic/db/models/audit.py @@ -10,6 +10,7 @@ WORM enforcement without a migration; sprint 0 fills the columns at insert. from __future__ import annotations from datetime import datetime +from typing import Any from uuid import UUID from sqlalchemy import JSON, DateTime, ForeignKey, String, Text, func @@ -26,13 +27,11 @@ class AuditLog(UuidPkMixin, Base): nullable=False, server_default=func.now(), ) - actor_id: Mapped[UUID | None] = mapped_column( - ForeignKey("user.id", ondelete="SET NULL") - ) + actor_id: Mapped[UUID | None] = mapped_column(ForeignKey("user.id", ondelete="SET NULL")) action: Mapped[str] = mapped_column(String(80), nullable=False) resource_type: Mapped[str] = mapped_column(String(80), nullable=False) resource_id: Mapped[str | None] = mapped_column(String(128)) - metadata_json: Mapped[dict] = mapped_column(JSON, nullable=False, default=dict) + metadata_json: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False, default=dict) prev_hash: Mapped[str | None] = mapped_column(String(64)) row_hash: Mapped[str] = mapped_column(String(64), nullable=False) diff --git a/backend/src/mimic/db/models/detection.py b/backend/src/mimic/db/models/detection.py index 9f11c29..8edce89 100644 --- a/backend/src/mimic/db/models/detection.py +++ b/backend/src/mimic/db/models/detection.py @@ -3,7 +3,7 @@ from __future__ import annotations from datetime import datetime -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from uuid import UUID from sqlalchemy import ( @@ -45,9 +45,7 @@ class Detection(UuidPkMixin, TimestampsMixin, Base): ) latency_ms: Mapped[int | None] = mapped_column(Integer) comment: Mapped[str | None] = mapped_column(Text) - recorded_at: Mapped[datetime] = mapped_column( - DateTime(timezone=True), nullable=False - ) + recorded_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) run_step: Mapped[RunStep] = relationship() soc_user: Mapped[User] = relationship() @@ -69,14 +67,12 @@ class Evidence(UuidPkMixin, TimestampsMixin, Base): nullable=False, ) artifacts_text: Mapped[str | None] = mapped_column(Text) - artifact_files_json: Mapped[list[dict]] = mapped_column( + artifact_files_json: Mapped[list[dict[str, Any]]] = mapped_column( JSON, default=list, nullable=False ) # Each entry: {"name": str, "ref": str, "sha256": str, "size_bytes": int} comment: Mapped[str | None] = mapped_column(Text) - recorded_at: Mapped[datetime] = mapped_column( - DateTime(timezone=True), nullable=False - ) + recorded_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) run_step: Mapped[RunStep] = relationship() rt_user: Mapped[User] = relationship() diff --git a/backend/src/mimic/db/models/engagement.py b/backend/src/mimic/db/models/engagement.py index 48b326e..d57a174 100644 --- a/backend/src/mimic/db/models/engagement.py +++ b/backend/src/mimic/db/models/engagement.py @@ -37,9 +37,7 @@ class Engagement(UuidPkMixin, TimestampsMixin, Base): nullable=False, ) - created_by_id: Mapped[UUID | None] = mapped_column( - ForeignKey("user.id", ondelete="SET NULL") - ) + created_by_id: Mapped[UUID | None] = mapped_column(ForeignKey("user.id", ondelete="SET NULL")) hosts: Mapped[list[Host]] = relationship( back_populates="engagement", diff --git a/backend/src/mimic/db/models/permission.py b/backend/src/mimic/db/models/permission.py index d6fbeef..81d6255 100644 --- a/backend/src/mimic/db/models/permission.py +++ b/backend/src/mimic/db/models/permission.py @@ -64,14 +64,10 @@ class GroupPermission(Base): class UserGroup(Base): __tablename__ = "user_group" __table_args__ = ( - PrimaryKeyConstraint( - "user_id", "group_id", "engagement_id", name="pk_user_group" - ), + PrimaryKeyConstraint("user_id", "group_id", "engagement_id", name="pk_user_group"), ) - user_id: Mapped[UUID] = mapped_column( - ForeignKey("user.id", ondelete="CASCADE"), nullable=False - ) + user_id: Mapped[UUID] = mapped_column(ForeignKey("user.id", ondelete="CASCADE"), nullable=False) group_id: Mapped[UUID] = mapped_column( ForeignKey("group.id", ondelete="CASCADE"), nullable=False ) diff --git a/backend/src/mimic/db/models/report.py b/backend/src/mimic/db/models/report.py index 36ad5a6..5c6dd2d 100644 --- a/backend/src/mimic/db/models/report.py +++ b/backend/src/mimic/db/models/report.py @@ -3,7 +3,7 @@ from __future__ import annotations from datetime import datetime -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from uuid import UUID from sqlalchemy import ( @@ -30,7 +30,7 @@ class Report(UuidPkMixin, TimestampsMixin, Base): ) version: Mapped[int] = mapped_column(Integer, default=1, nullable=False) - content_json: Mapped[dict] = mapped_column(JSON, nullable=False, default=dict) + content_json: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False, default=dict) content_sha256: Mapped[str] = mapped_column(String(64), nullable=False) # SHA-256 of canonical JSON. Identical hash referenced in PDF footer, JSON # export and Markdown export (spec H19 / F9 / F14). @@ -38,11 +38,7 @@ class Report(UuidPkMixin, TimestampsMixin, Base): pdf_path: Mapped[str | None] = mapped_column(String(512)) md_path: Mapped[str | None] = mapped_column(String(512)) - generated_at: Mapped[datetime] = mapped_column( - DateTime(timezone=True), nullable=False - ) - generated_by_id: Mapped[UUID | None] = mapped_column( - ForeignKey("user.id", ondelete="SET NULL") - ) + generated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + generated_by_id: Mapped[UUID | None] = mapped_column(ForeignKey("user.id", ondelete="SET NULL")) engagement: Mapped[Engagement] = relationship() diff --git a/backend/src/mimic/db/models/run.py b/backend/src/mimic/db/models/run.py index 1efb6a0..c0af64e 100644 --- a/backend/src/mimic/db/models/run.py +++ b/backend/src/mimic/db/models/run.py @@ -3,7 +3,7 @@ from __future__ import annotations from datetime import datetime -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from uuid import UUID from sqlalchemy import ( @@ -39,11 +39,9 @@ class Run(UuidPkMixin, TimestampsMixin, Base): started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) ended_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) - started_by_id: Mapped[UUID | None] = mapped_column( - ForeignKey("user.id", ondelete="SET NULL") - ) + started_by_id: Mapped[UUID | None] = mapped_column(ForeignKey("user.id", ondelete="SET NULL")) - snapshot_json: Mapped[dict] = mapped_column(JSON, nullable=False, default=dict) + snapshot_json: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False, default=dict) # Full self-contained snapshot of scenario + steps + resolved TTPs. # Source of truth for replay (spec H32). @@ -111,8 +109,6 @@ class RunStepCleanup(UuidPkMixin, TimestampsMixin, Base): ended_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) resolved_command_text: Mapped[str | None] = mapped_column(Text) output: Mapped[str | None] = mapped_column(Text) - executed_by_id: Mapped[UUID | None] = mapped_column( - ForeignKey("user.id", ondelete="SET NULL") - ) + executed_by_id: Mapped[UUID | None] = mapped_column(ForeignKey("user.id", ondelete="SET NULL")) run_step: Mapped[RunStep] = relationship(back_populates="cleanup") diff --git a/backend/src/mimic/db/models/scenario.py b/backend/src/mimic/db/models/scenario.py index 19198cd..33dc03a 100644 --- a/backend/src/mimic/db/models/scenario.py +++ b/backend/src/mimic/db/models/scenario.py @@ -6,7 +6,7 @@ run start that every referenced host matches. from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from uuid import UUID from sqlalchemy import ( @@ -45,9 +45,7 @@ class Scenario(UuidPkMixin, TimestampsMixin, Base): Enum(C2Type, name="c2_type", create_type=False), nullable=False, ) - created_by_id: Mapped[UUID | None] = mapped_column( - ForeignKey("user.id", ondelete="SET NULL") - ) + created_by_id: Mapped[UUID | None] = mapped_column(ForeignKey("user.id", ondelete="SET NULL")) engagement: Mapped[Engagement] = relationship(back_populates="scenarios") steps: Mapped[list[ScenarioStep]] = relationship( @@ -78,7 +76,7 @@ class ScenarioStep(UuidPkMixin, TimestampsMixin, Base): ForeignKey("host.id", ondelete="RESTRICT"), nullable=False, ) - params_override_json: Mapped[dict] = mapped_column(JSON, default=dict, nullable=False) + params_override_json: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict, nullable=False) delay_after_ms: Mapped[int] = mapped_column(Integer, default=0, nullable=False) scenario: Mapped[Scenario] = relationship(back_populates="steps") diff --git a/backend/src/mimic/db/models/soc_session.py b/backend/src/mimic/db/models/soc_session.py index 0c29a1a..add38cd 100644 --- a/backend/src/mimic/db/models/soc_session.py +++ b/backend/src/mimic/db/models/soc_session.py @@ -35,9 +35,7 @@ class SocSession(UuidPkMixin, TimestampsMixin, Base): token_hash: Mapped[str] = mapped_column(String(255), nullable=False, unique=True) # bcrypt hash. Plain token returned once at creation. - expires_at: Mapped[datetime] = mapped_column( - DateTime(timezone=True), nullable=False - ) + expires_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) revoked_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) last_ip: Mapped[str | None] = mapped_column(String(64)) diff --git a/backend/src/mimic/db/models/ttp.py b/backend/src/mimic/db/models/ttp.py index d6b82e7..7aada93 100644 --- a/backend/src/mimic/db/models/ttp.py +++ b/backend/src/mimic/db/models/ttp.py @@ -8,6 +8,7 @@ counter (§8) — bumped on edit, never referenced for replay. from __future__ import annotations +from typing import Any from uuid import UUID from sqlalchemy import ( @@ -39,7 +40,7 @@ class Ttp(UuidPkMixin, TimestampsMixin, Base): nullable=False, ) payload_template: Mapped[str] = mapped_column(Text, nullable=False, default="") - params_schema_json: Mapped[dict | None] = mapped_column(JSON) + params_schema_json: Mapped[dict[str, Any] | None] = mapped_column(JSON) opsec_notes: Mapped[str | None] = mapped_column(Text) cleanup_command: Mapped[str | None] = mapped_column(Text) @@ -57,6 +58,4 @@ class Ttp(UuidPkMixin, TimestampsMixin, Base): is_published: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False) # Promoted to the library (lead RT only — F11). - created_by_id: Mapped[UUID | None] = mapped_column( - ForeignKey("user.id", ondelete="SET NULL") - ) + created_by_id: Mapped[UUID | None] = mapped_column(ForeignKey("user.id", ondelete="SET NULL")) diff --git a/backend/src/mimic/logging.py b/backend/src/mimic/logging.py index bb84ffc..1a44fde 100644 --- a/backend/src/mimic/logging.py +++ b/backend/src/mimic/logging.py @@ -5,7 +5,7 @@ from __future__ import annotations import logging import sys -from pythonjsonlogger.jsonlogger import JsonFormatter +from pythonjsonlogger.jsonlogger import JsonFormatter # type: ignore[attr-defined] def configure_logging(level: str = "INFO", *, as_json: bool = True) -> None: @@ -16,9 +16,7 @@ def configure_logging(level: str = "INFO", *, as_json: bool = True) -> None: "%(asctime)s %(levelname)s %(name)s %(message)s" ) else: - formatter = logging.Formatter( - "%(asctime)s %(levelname)-8s %(name)s: %(message)s" - ) + formatter = logging.Formatter("%(asctime)s %(levelname)-8s %(name)s: %(message)s") handler.setFormatter(formatter) root = logging.getLogger() diff --git a/backend/src/mimic/rbac/decorators.py b/backend/src/mimic/rbac/decorators.py index ae342a2..594f47f 100644 --- a/backend/src/mimic/rbac/decorators.py +++ b/backend/src/mimic/rbac/decorators.py @@ -4,16 +4,13 @@ from __future__ import annotations from collections.abc import Callable from functools import wraps -from typing import TYPE_CHECKING, ParamSpec, TypeVar +from typing import ParamSpec, TypeVar from flask import abort from flask_login import current_user from mimic.rbac.matrix import Permission -if TYPE_CHECKING: - pass - P = ParamSpec("P") R = TypeVar("R") @@ -32,7 +29,7 @@ def require_perm(perm: Permission) -> Callable[[Callable[P, R]], Callable[P, R]] abort(403) return view(*args, **kwargs) - return _wrapped # type: ignore[return-value] + return _wrapped return _decorate diff --git a/backend/src/mimic/schemas/scenario.py b/backend/src/mimic/schemas/scenario.py index 52752b3..ceebed6 100644 --- a/backend/src/mimic/schemas/scenario.py +++ b/backend/src/mimic/schemas/scenario.py @@ -1,5 +1,6 @@ from __future__ import annotations +from typing import Any from uuid import UUID from pydantic import BaseModel, ConfigDict, Field @@ -11,7 +12,7 @@ class ScenarioStepBase(BaseModel): ttp_id: UUID host_id: UUID order_idx: int = Field(ge=0) - params_override_json: dict = Field(default_factory=dict) + params_override_json: dict[str, Any] = Field(default_factory=dict) delay_after_ms: int = Field(default=0, ge=0) diff --git a/backend/src/mimic/schemas/ttp.py b/backend/src/mimic/schemas/ttp.py index f1d59db..5d475fd 100644 --- a/backend/src/mimic/schemas/ttp.py +++ b/backend/src/mimic/schemas/ttp.py @@ -1,5 +1,6 @@ from __future__ import annotations +from typing import Any from uuid import UUID from pydantic import BaseModel, ConfigDict, Field @@ -14,7 +15,7 @@ class TtpBase(BaseModel): mitre_subtechnique: str | None = Field(default=None, max_length=16) payload_type: PayloadType payload_template: str = "" - params_schema_json: dict | None = None + params_schema_json: dict[str, Any] | None = None opsec_notes: str | None = None cleanup_command: str | None = None is_stealth_variant: bool = False @@ -33,7 +34,7 @@ class TtpUpdate(BaseModel): mitre_subtechnique: str | None = Field(default=None, max_length=16) payload_type: PayloadType | None = None payload_template: str | None = None - params_schema_json: dict | None = None + params_schema_json: dict[str, Any] | None = None opsec_notes: str | None = None cleanup_command: str | None = None is_stealth_variant: bool | None = None @@ -46,4 +47,4 @@ class TtpRead(TtpBase): id: UUID is_published: bool - current_version: int + version: int diff --git a/backend/src/mimic/templating/filters.py b/backend/src/mimic/templating/filters.py index 0df2f53..950171e 100644 --- a/backend/src/mimic/templating/filters.py +++ b/backend/src/mimic/templating/filters.py @@ -13,17 +13,17 @@ from __future__ import annotations import re -from typing import Any +from types import ModuleType +from typing import Any, cast from jinja2 import TemplateError try: # pragma: no cover - presence depends on environment - import re2 as _re2 # type: ignore[import-not-found] + import re2 as _imported_re2 - _HAS_RE2 = True + _re2: ModuleType | None = _imported_re2 except ImportError: # pragma: no cover _re2 = None - _HAS_RE2 = False _FALLBACK_MAX_INPUT = 1 * 1024 * 1024 # 1 MB safety cap when re2 missing @@ -41,8 +41,8 @@ def regex_extract( raise TemplateError(f"regex_extract: cannot match against None for /{pattern}/") haystack = text if isinstance(text, str) else str(text) - if _HAS_RE2: - compiled = _re2.compile(pattern) + if _re2 is not None: + compiled = cast(Any, _re2).compile(pattern) match = compiled.search(haystack) else: if len(haystack) > _FALLBACK_MAX_INPUT: @@ -56,28 +56,22 @@ def regex_extract( try: captured = match.group(name) except IndexError as exc: - raise TemplateError( - f"regex_extract: named group {name!r} not in /{pattern}/" - ) from exc + raise TemplateError(f"regex_extract: named group {name!r} not in /{pattern}/") from exc if captured is None: raise TemplateError( f"regex_extract: named group {name!r} captured nothing in /{pattern}/" ) - return captured + return str(captured) try: captured = match.group(group) except IndexError: if group == 1: - return match.group(0) - raise TemplateError( - f"regex_extract: group {group} out of range for /{pattern}/" - ) from None + return str(match.group(0)) + raise TemplateError(f"regex_extract: group {group} out of range for /{pattern}/") from None if captured is None: if group == 1: - return match.group(0) - raise TemplateError( - f"regex_extract: group {group} captured nothing in /{pattern}/" - ) - return captured + return str(match.group(0)) + raise TemplateError(f"regex_extract: group {group} captured nothing in /{pattern}/") + return str(captured) diff --git a/backend/tests/integration/conftest.py b/backend/tests/integration/conftest.py index ebbb4f1..a9eea82 100644 --- a/backend/tests/integration/conftest.py +++ b/backend/tests/integration/conftest.py @@ -22,9 +22,7 @@ def postgres_dsn() -> Iterator[str]: if PostgresContainer is None: pytest.skip("testcontainers not installed") with PostgresContainer("postgres:16-alpine") as pg: - url = pg.get_connection_url().replace( - "postgresql+psycopg2", "postgresql+psycopg" - ) + url = pg.get_connection_url().replace("postgresql+psycopg2", "postgresql+psycopg") yield url