205 lines
7.2 KiB
Python
205 lines
7.2 KiB
Python
|
|
"""Migration round-trip test for 0006_c2_layer.
|
||
|
|
|
||
|
|
Verifies that upgrade() creates c2_config and c2_task with the expected schema,
|
||
|
|
and that downgrade() removes both tables cleanly.
|
||
|
|
|
||
|
|
Uses the resolved-path pattern (derives path from __file__) to avoid the
|
||
|
|
hardcoded-path regression documented in lessons.md Sprint 4.
|
||
|
|
"""
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import importlib.util
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
from alembic.operations import Operations
|
||
|
|
from alembic.runtime.migration import MigrationContext
|
||
|
|
from sqlalchemy import create_engine, inspect, text
|
||
|
|
|
||
|
|
|
||
|
|
def _load_migration():
|
||
|
|
versions_dir = Path(__file__).resolve().parent.parent / "migrations" / "versions"
|
||
|
|
path = versions_dir / "0006_c2_layer.py"
|
||
|
|
spec = importlib.util.spec_from_file_location("migration_0006", path)
|
||
|
|
assert spec and spec.loader
|
||
|
|
mod = importlib.util.module_from_spec(spec)
|
||
|
|
spec.loader.exec_module(mod) # type: ignore[union-attr]
|
||
|
|
return mod
|
||
|
|
|
||
|
|
|
||
|
|
def _fresh_engine():
|
||
|
|
"""In-memory SQLite with the tables that 0006 depends on already present."""
|
||
|
|
engine = create_engine("sqlite:///:memory:")
|
||
|
|
with engine.begin() as conn:
|
||
|
|
conn.execute(
|
||
|
|
text(
|
||
|
|
"""
|
||
|
|
CREATE TABLE users (
|
||
|
|
id INTEGER PRIMARY KEY,
|
||
|
|
username TEXT NOT NULL UNIQUE,
|
||
|
|
password_hash TEXT NOT NULL,
|
||
|
|
role TEXT NOT NULL,
|
||
|
|
created_at DATETIME NOT NULL
|
||
|
|
)
|
||
|
|
"""
|
||
|
|
)
|
||
|
|
)
|
||
|
|
conn.execute(
|
||
|
|
text(
|
||
|
|
"""
|
||
|
|
CREATE TABLE engagements (
|
||
|
|
id INTEGER PRIMARY KEY,
|
||
|
|
name TEXT NOT NULL,
|
||
|
|
description TEXT,
|
||
|
|
start_date DATE NOT NULL,
|
||
|
|
end_date DATE,
|
||
|
|
status TEXT NOT NULL DEFAULT 'planned',
|
||
|
|
created_at DATETIME NOT NULL,
|
||
|
|
created_by_id INTEGER NOT NULL REFERENCES users(id)
|
||
|
|
)
|
||
|
|
"""
|
||
|
|
)
|
||
|
|
)
|
||
|
|
conn.execute(
|
||
|
|
text(
|
||
|
|
"""
|
||
|
|
CREATE TABLE simulations (
|
||
|
|
id INTEGER PRIMARY KEY,
|
||
|
|
engagement_id INTEGER NOT NULL REFERENCES engagements(id),
|
||
|
|
name TEXT NOT NULL,
|
||
|
|
techniques JSON NOT NULL DEFAULT '[]',
|
||
|
|
tactic_ids JSON NOT NULL DEFAULT '[]',
|
||
|
|
description TEXT,
|
||
|
|
commands TEXT,
|
||
|
|
prerequisites TEXT,
|
||
|
|
executed_at DATETIME,
|
||
|
|
execution_result TEXT,
|
||
|
|
log_source TEXT,
|
||
|
|
logs TEXT,
|
||
|
|
soc_comment TEXT,
|
||
|
|
incident_number TEXT,
|
||
|
|
status TEXT NOT NULL DEFAULT 'pending',
|
||
|
|
created_at DATETIME NOT NULL,
|
||
|
|
updated_at DATETIME,
|
||
|
|
created_by_id INTEGER NOT NULL REFERENCES users(id)
|
||
|
|
)
|
||
|
|
"""
|
||
|
|
)
|
||
|
|
)
|
||
|
|
return engine
|
||
|
|
|
||
|
|
|
||
|
|
def _run_upgrade(engine, migration_mod):
|
||
|
|
with engine.begin() as conn:
|
||
|
|
ctx = MigrationContext.configure(conn)
|
||
|
|
ops = Operations(ctx)
|
||
|
|
# Patch op module for the migration
|
||
|
|
import alembic.op as alembic_op
|
||
|
|
original_proxy = alembic_op._proxy # type: ignore[attr-defined]
|
||
|
|
alembic_op._proxy = ops # type: ignore[attr-defined]
|
||
|
|
try:
|
||
|
|
migration_mod.upgrade()
|
||
|
|
finally:
|
||
|
|
alembic_op._proxy = original_proxy # type: ignore[attr-defined]
|
||
|
|
|
||
|
|
|
||
|
|
def _run_downgrade(engine, migration_mod):
|
||
|
|
with engine.begin() as conn:
|
||
|
|
ctx = MigrationContext.configure(conn)
|
||
|
|
ops = Operations(ctx)
|
||
|
|
import alembic.op as alembic_op
|
||
|
|
original_proxy = alembic_op._proxy # type: ignore[attr-defined]
|
||
|
|
alembic_op._proxy = ops # type: ignore[attr-defined]
|
||
|
|
try:
|
||
|
|
migration_mod.downgrade()
|
||
|
|
finally:
|
||
|
|
alembic_op._proxy = original_proxy # type: ignore[attr-defined]
|
||
|
|
|
||
|
|
|
||
|
|
class TestMigration0006Upgrade:
|
||
|
|
def test_c2_config_table_created(self):
|
||
|
|
engine = _fresh_engine()
|
||
|
|
mod = _load_migration()
|
||
|
|
_run_upgrade(engine, mod)
|
||
|
|
|
||
|
|
insp = inspect(engine)
|
||
|
|
assert "c2_config" in insp.get_table_names()
|
||
|
|
|
||
|
|
def test_c2_task_table_created(self):
|
||
|
|
engine = _fresh_engine()
|
||
|
|
mod = _load_migration()
|
||
|
|
_run_upgrade(engine, mod)
|
||
|
|
|
||
|
|
insp = inspect(engine)
|
||
|
|
assert "c2_task" in insp.get_table_names()
|
||
|
|
|
||
|
|
def test_c2_config_columns(self):
|
||
|
|
engine = _fresh_engine()
|
||
|
|
mod = _load_migration()
|
||
|
|
_run_upgrade(engine, mod)
|
||
|
|
|
||
|
|
insp = inspect(engine)
|
||
|
|
cols = {c["name"] for c in insp.get_columns("c2_config")}
|
||
|
|
assert {"id", "engagement_id", "url", "api_token_encrypted",
|
||
|
|
"verify_tls", "created_at", "updated_at"} <= cols
|
||
|
|
|
||
|
|
def test_c2_config_unique_constraint_on_engagement_id(self):
|
||
|
|
engine = _fresh_engine()
|
||
|
|
mod = _load_migration()
|
||
|
|
_run_upgrade(engine, mod)
|
||
|
|
|
||
|
|
# Insert a user and engagement first.
|
||
|
|
with engine.begin() as conn:
|
||
|
|
conn.execute(text(
|
||
|
|
"INSERT INTO users (id, username, password_hash, role, created_at) "
|
||
|
|
"VALUES (1, 'u', 'h', 'admin', '2026-01-01')"
|
||
|
|
))
|
||
|
|
conn.execute(text(
|
||
|
|
"INSERT INTO engagements (id, name, start_date, status, created_at, created_by_id) "
|
||
|
|
"VALUES (1, 'Op', '2026-01-01', 'planned', '2026-01-01', 1)"
|
||
|
|
))
|
||
|
|
conn.execute(text(
|
||
|
|
"INSERT INTO c2_config (engagement_id, url, api_token_encrypted, verify_tls, created_at) "
|
||
|
|
"VALUES (1, 'https://c2', 'tok', 1, '2026-01-01')"
|
||
|
|
))
|
||
|
|
# Second insert on same engagement_id must fail.
|
||
|
|
try:
|
||
|
|
conn.execute(text(
|
||
|
|
"INSERT INTO c2_config (engagement_id, url, api_token_encrypted, verify_tls, created_at) "
|
||
|
|
"VALUES (1, 'https://c2b', 'tok2', 1, '2026-01-01')"
|
||
|
|
))
|
||
|
|
raised = False
|
||
|
|
except Exception:
|
||
|
|
raised = True
|
||
|
|
assert raised, "UNIQUE constraint on c2_config.engagement_id must be enforced"
|
||
|
|
|
||
|
|
def test_c2_task_columns(self):
|
||
|
|
engine = _fresh_engine()
|
||
|
|
mod = _load_migration()
|
||
|
|
_run_upgrade(engine, mod)
|
||
|
|
|
||
|
|
insp = inspect(engine)
|
||
|
|
cols = {c["name"] for c in insp.get_columns("c2_task")}
|
||
|
|
assert {"id", "simulation_id", "mythic_task_display_id", "callback_display_id",
|
||
|
|
"command", "params", "status", "completed", "output", "source",
|
||
|
|
"created_at", "completed_at"} <= cols
|
||
|
|
|
||
|
|
|
||
|
|
class TestMigration0006Downgrade:
|
||
|
|
def test_downgrade_removes_c2_config(self):
|
||
|
|
engine = _fresh_engine()
|
||
|
|
mod = _load_migration()
|
||
|
|
_run_upgrade(engine, mod)
|
||
|
|
_run_downgrade(engine, mod)
|
||
|
|
|
||
|
|
insp = inspect(engine)
|
||
|
|
assert "c2_config" not in insp.get_table_names()
|
||
|
|
|
||
|
|
def test_downgrade_removes_c2_task(self):
|
||
|
|
engine = _fresh_engine()
|
||
|
|
mod = _load_migration()
|
||
|
|
_run_upgrade(engine, mod)
|
||
|
|
_run_downgrade(engine, mod)
|
||
|
|
|
||
|
|
insp = inspect(engine)
|
||
|
|
assert "c2_task" not in insp.get_table_names()
|