fix(backend): complete c2 task→simulation mapping per spec + sanitize adapter errors (sprint 8 code-review)
mapping.py — full §0.11 contract:
1. execution_result: append '$ <command>\n<output>\n' block (previously
wrote raw output without command header, making multi-task blobs
unreadable in exports)
2. executed_at: set from task.completed_at when currently null (was
completely missing — simulation.executed_at stayed null forever)
3. commands: append task.command deduplicated line-by-line (was
completely missing — simulation.commands stayed empty)
mythic.py — sanitize transport errors:
Replace 'raise C2Error(str(exc))' (which leaks the Mythic URL via
requests exception repr) with 'raise C2Error(f"C2 transport error:
{type(exc).__name__}")'. Original exc stays chained for backend logs.
api/c2.py — remove redundant 'task.mapping_applied = True' in import
endpoint (apply_task_to_simulation() already sets it).
test_c2_mapping.py — full rewrite: 19 tests covering command blocks,
executed_at set/preserve, commands dedup, idempotency.
test_c2_adapter_mythic.py — add URL-leak sanitization assertion.
468 passed; ruff + mypy clean.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
"""Unit tests for apply_task_to_simulation() mapping helper."""
|
||||
"""Unit tests for apply_task_to_simulation() mapping helper — §0.11 contract."""
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import UTC, datetime
|
||||
@@ -7,91 +7,202 @@ from unittest.mock import MagicMock
|
||||
from backend.app.services.c2.mapping import apply_task_to_simulation
|
||||
|
||||
|
||||
def _make_task(output: str | None = "whoami output", mapping_applied: bool = False) -> MagicMock:
|
||||
def _make_task(
|
||||
command: str = "whoami",
|
||||
output: str | None = "root",
|
||||
mapping_applied: bool = False,
|
||||
completed_at: datetime | None = None,
|
||||
) -> MagicMock:
|
||||
task = MagicMock()
|
||||
task.command = command
|
||||
task.output = output
|
||||
task.mapping_applied = mapping_applied
|
||||
task.completed_at = completed_at
|
||||
return task
|
||||
|
||||
|
||||
def _make_sim(execution_result: str | None = None) -> MagicMock:
|
||||
def _make_sim(
|
||||
execution_result: str | None = None,
|
||||
executed_at: datetime | None = None,
|
||||
commands: str | None = None,
|
||||
) -> MagicMock:
|
||||
sim = MagicMock()
|
||||
sim.execution_result = execution_result
|
||||
sim.executed_at = executed_at
|
||||
sim.commands = commands
|
||||
sim.updated_at = None
|
||||
return sim
|
||||
|
||||
|
||||
class TestApplyTaskToSimulation:
|
||||
def test_appends_output_to_empty_simulation(self):
|
||||
task = _make_task(output="whoami output")
|
||||
sim = _make_sim(execution_result=None)
|
||||
class TestExecutionResult:
|
||||
def test_first_task_produces_command_block(self):
|
||||
task = _make_task(command="whoami", output="root")
|
||||
sim = _make_sim()
|
||||
|
||||
apply_task_to_simulation(task, sim)
|
||||
|
||||
assert sim.execution_result == "whoami output"
|
||||
assert task.mapping_applied is True
|
||||
assert sim.execution_result == "$ whoami\nroot\n"
|
||||
|
||||
def test_appends_with_newline_separator(self):
|
||||
task = _make_task(output="second result")
|
||||
sim = _make_sim(execution_result="first result")
|
||||
def test_second_task_appended_with_block_separator(self):
|
||||
"""Two tasks → two '$ command\noutput\n' blocks separated by a single newline."""
|
||||
sim = _make_sim()
|
||||
t1 = _make_task(command="whoami", output="root")
|
||||
t2 = _make_task(command="hostname", output="lab-1")
|
||||
|
||||
apply_task_to_simulation(t1, sim)
|
||||
apply_task_to_simulation(t2, sim)
|
||||
|
||||
assert sim.execution_result == "$ whoami\nroot\n$ hostname\nlab-1\n"
|
||||
|
||||
def test_no_double_blank_line_when_existing_ends_with_newline(self):
|
||||
"""If existing result already ends with \n, no extra blank line is inserted."""
|
||||
sim = _make_sim(execution_result="$ id\nuid=0\n")
|
||||
task = _make_task(command="hostname", output="lab-1")
|
||||
|
||||
apply_task_to_simulation(task, sim)
|
||||
|
||||
assert sim.execution_result == "first result\nsecond result"
|
||||
assert sim.execution_result == "$ id\nuid=0\n$ hostname\nlab-1\n"
|
||||
|
||||
def test_idempotent_when_already_applied(self):
|
||||
task = _make_task(output="some output", mapping_applied=True)
|
||||
sim = _make_sim(execution_result="existing")
|
||||
|
||||
apply_task_to_simulation(task, sim)
|
||||
|
||||
# execution_result must not be modified.
|
||||
assert sim.execution_result == "existing"
|
||||
|
||||
def test_no_op_when_output_is_empty_string(self):
|
||||
def test_empty_output_skips_block_but_marks_applied(self):
|
||||
task = _make_task(output="")
|
||||
sim = _make_sim(execution_result="existing")
|
||||
sim = _make_sim(execution_result="$ id\nuid=0\n")
|
||||
|
||||
apply_task_to_simulation(task, sim)
|
||||
|
||||
assert sim.execution_result == "existing"
|
||||
# Still marks mapping_applied so we don't revisit it.
|
||||
assert sim.execution_result == "$ id\nuid=0\n"
|
||||
assert task.mapping_applied is True
|
||||
|
||||
def test_no_op_when_output_is_none(self):
|
||||
def test_none_output_skips_block_but_marks_applied(self):
|
||||
task = _make_task(output=None)
|
||||
sim = _make_sim()
|
||||
|
||||
apply_task_to_simulation(task, sim)
|
||||
|
||||
assert sim.execution_result is None
|
||||
assert task.mapping_applied is True
|
||||
|
||||
def test_command_with_empty_string_produces_dollar_header(self):
|
||||
"""Empty command → block header is '$ \n<output>\n' (consistent, not suppressed)."""
|
||||
task = _make_task(command="", output="some output")
|
||||
sim = _make_sim()
|
||||
|
||||
apply_task_to_simulation(task, sim)
|
||||
|
||||
assert sim.execution_result == "$ \nsome output\n" or sim.execution_result == "$ \nsome output\n"
|
||||
|
||||
|
||||
class TestExecutedAt:
|
||||
def test_sets_executed_at_from_task_when_null(self):
|
||||
ts = datetime(2026, 6, 10, 12, 0, 0, tzinfo=UTC)
|
||||
task = _make_task(completed_at=ts)
|
||||
sim = _make_sim(executed_at=None)
|
||||
|
||||
apply_task_to_simulation(task, sim)
|
||||
|
||||
assert sim.executed_at == ts
|
||||
|
||||
def test_does_not_overwrite_existing_executed_at(self):
|
||||
original_ts = datetime(2026, 6, 1, 0, 0, 0, tzinfo=UTC)
|
||||
later_ts = datetime(2026, 6, 10, 12, 0, 0, tzinfo=UTC)
|
||||
task = _make_task(completed_at=later_ts)
|
||||
sim = _make_sim(executed_at=original_ts)
|
||||
|
||||
apply_task_to_simulation(task, sim)
|
||||
|
||||
assert sim.executed_at == original_ts
|
||||
|
||||
def test_executed_at_stays_null_when_task_completed_at_is_none(self):
|
||||
task = _make_task(completed_at=None)
|
||||
sim = _make_sim(executed_at=None)
|
||||
|
||||
apply_task_to_simulation(task, sim)
|
||||
|
||||
assert sim.executed_at is None
|
||||
|
||||
def test_first_task_sets_executed_at_second_does_not_overwrite(self):
|
||||
ts1 = datetime(2026, 6, 10, 10, 0, 0, tzinfo=UTC)
|
||||
ts2 = datetime(2026, 6, 10, 11, 0, 0, tzinfo=UTC)
|
||||
t1 = _make_task(command="whoami", output="root", completed_at=ts1)
|
||||
t2 = _make_task(command="hostname", output="lab-1", completed_at=ts2)
|
||||
sim = _make_sim(executed_at=None)
|
||||
|
||||
apply_task_to_simulation(t1, sim)
|
||||
apply_task_to_simulation(t2, sim)
|
||||
|
||||
assert sim.executed_at == ts1
|
||||
|
||||
|
||||
class TestCommandsDedup:
|
||||
def test_appends_command_to_empty_commands(self):
|
||||
task = _make_task(command="whoami", output="root")
|
||||
sim = _make_sim(commands=None)
|
||||
|
||||
apply_task_to_simulation(task, sim)
|
||||
|
||||
assert sim.commands == "whoami"
|
||||
|
||||
def test_appends_second_distinct_command(self):
|
||||
sim = _make_sim(commands=None)
|
||||
t1 = _make_task(command="whoami", output="root")
|
||||
t2 = _make_task(command="hostname", output="lab-1")
|
||||
|
||||
apply_task_to_simulation(t1, sim)
|
||||
apply_task_to_simulation(t2, sim)
|
||||
|
||||
assert sim.commands == "whoami\nhostname"
|
||||
|
||||
def test_deduplicates_repeated_command(self):
|
||||
sim = _make_sim(commands=None)
|
||||
t1 = _make_task(command="whoami", output="root")
|
||||
t2 = _make_task(command="whoami", output="root2")
|
||||
|
||||
apply_task_to_simulation(t1, sim)
|
||||
apply_task_to_simulation(t2, sim)
|
||||
|
||||
assert sim.commands == "whoami"
|
||||
|
||||
def test_dedup_is_case_and_whitespace_stripped(self):
|
||||
sim = _make_sim(commands="whoami")
|
||||
task = _make_task(command=" whoami ", output="root")
|
||||
|
||||
apply_task_to_simulation(task, sim)
|
||||
|
||||
# " whoami ".strip() == "whoami" which is already present → no append.
|
||||
assert sim.commands == "whoami"
|
||||
|
||||
def test_empty_command_not_appended(self):
|
||||
task = _make_task(command="", output="output")
|
||||
sim = _make_sim(commands=None)
|
||||
|
||||
apply_task_to_simulation(task, sim)
|
||||
|
||||
# task.command is falsy → commands block skipped.
|
||||
assert sim.commands is None
|
||||
|
||||
|
||||
class TestIdempotency:
|
||||
def test_no_op_when_mapping_already_applied(self):
|
||||
task = _make_task(output="root", mapping_applied=True)
|
||||
sim = _make_sim(execution_result="existing")
|
||||
|
||||
apply_task_to_simulation(task, sim)
|
||||
|
||||
assert sim.execution_result == "existing"
|
||||
assert task.mapping_applied is True
|
||||
|
||||
def test_strips_trailing_newlines_from_existing(self):
|
||||
"""Existing execution_result with trailing newlines should not cause double blank lines."""
|
||||
task = _make_task(output="new output")
|
||||
sim = _make_sim(execution_result="old output\n\n")
|
||||
def test_always_marks_mapping_applied(self):
|
||||
task = _make_task(output="root")
|
||||
sim = _make_sim()
|
||||
|
||||
apply_task_to_simulation(task, sim)
|
||||
|
||||
assert sim.execution_result == "old output\nnew output"
|
||||
assert task.mapping_applied is True
|
||||
|
||||
def test_updated_at_is_set_on_sim(self):
|
||||
task = _make_task(output="something")
|
||||
sim = _make_sim(execution_result=None)
|
||||
def test_updated_at_is_set(self):
|
||||
task = _make_task(output="root")
|
||||
sim = _make_sim()
|
||||
before = datetime.now(UTC)
|
||||
|
||||
apply_task_to_simulation(task, sim)
|
||||
|
||||
assert sim.updated_at is not None
|
||||
assert sim.updated_at >= before
|
||||
|
||||
def test_multiple_tasks_accumulate(self):
|
||||
sim = _make_sim(execution_result=None)
|
||||
tasks = [_make_task(output=f"result {i}") for i in range(3)]
|
||||
|
||||
for t in tasks:
|
||||
apply_task_to_simulation(t, sim)
|
||||
|
||||
lines = sim.execution_result.split("\n")
|
||||
assert lines == ["result 0", "result 1", "result 2"]
|
||||
|
||||
Reference in New Issue
Block a user