111 lines
4.0 KiB
Python
111 lines
4.0 KiB
Python
|
|
"""FakeAdapter M3 state-progression tests — get_task and get_task_output."""
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import pytest
|
||
|
|
|
||
|
|
from backend.app.services.c2.adapter import C2Error
|
||
|
|
from backend.app.services.c2.fake import FakeAdapter
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture()
|
||
|
|
def adapter() -> FakeAdapter:
|
||
|
|
return FakeAdapter()
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture()
|
||
|
|
def adapter_with_task(adapter: FakeAdapter) -> tuple[FakeAdapter, int]:
|
||
|
|
tid = adapter.create_task(callback_display_id=1, command="whoami")
|
||
|
|
return adapter, tid
|
||
|
|
|
||
|
|
|
||
|
|
class TestFakeAdapterGetTaskProgression:
|
||
|
|
def test_first_call_returns_submitted(self, adapter_with_task):
|
||
|
|
a, tid = adapter_with_task
|
||
|
|
status = a.get_task(tid)
|
||
|
|
assert status.status == "submitted"
|
||
|
|
assert status.completed is False
|
||
|
|
|
||
|
|
def test_second_call_returns_completed(self, adapter_with_task):
|
||
|
|
a, tid = adapter_with_task
|
||
|
|
a.get_task(tid) # first call
|
||
|
|
status = a.get_task(tid) # second call
|
||
|
|
assert status.status == "completed"
|
||
|
|
assert status.completed is True
|
||
|
|
|
||
|
|
def test_subsequent_calls_stay_completed(self, adapter_with_task):
|
||
|
|
a, tid = adapter_with_task
|
||
|
|
for _ in range(5):
|
||
|
|
a.get_task(tid)
|
||
|
|
status = a.get_task(tid)
|
||
|
|
assert status.completed is True
|
||
|
|
|
||
|
|
def test_unknown_task_id_returns_submitted_on_first_call(self, adapter):
|
||
|
|
"""A task ID not created by this instance still goes through submitted→completed."""
|
||
|
|
status = adapter.get_task(9999)
|
||
|
|
assert status.display_id == 9999
|
||
|
|
assert status.status == "submitted"
|
||
|
|
assert status.completed is False
|
||
|
|
|
||
|
|
def test_call_counters_are_per_task(self, adapter):
|
||
|
|
"""Two tasks have independent state — completing one does not affect the other."""
|
||
|
|
t1 = adapter.create_task(callback_display_id=1, command="whoami")
|
||
|
|
t2 = adapter.create_task(callback_display_id=1, command="ipconfig")
|
||
|
|
|
||
|
|
# Advance t1 to completed via two calls.
|
||
|
|
adapter.get_task(t1)
|
||
|
|
adapter.get_task(t1)
|
||
|
|
|
||
|
|
# t2 first call should still be submitted.
|
||
|
|
s2 = adapter.get_task(t2)
|
||
|
|
assert s2.status == "submitted"
|
||
|
|
assert s2.completed is False
|
||
|
|
|
||
|
|
def test_instances_are_isolated(self):
|
||
|
|
"""Per-instance counters — different FakeAdapter instances don't share state."""
|
||
|
|
a1 = FakeAdapter()
|
||
|
|
a2 = FakeAdapter()
|
||
|
|
|
||
|
|
t1 = a1.create_task(1, "cmd")
|
||
|
|
t2 = a2.create_task(1, "cmd")
|
||
|
|
|
||
|
|
a1.get_task(t1)
|
||
|
|
a1.get_task(t1) # a1's task is now completed
|
||
|
|
|
||
|
|
# a2's task with same display_id (both start at 1000) should be independent.
|
||
|
|
assert t1 == t2 == 1000
|
||
|
|
s2 = a2.get_task(t2)
|
||
|
|
assert s2.status == "submitted"
|
||
|
|
|
||
|
|
|
||
|
|
class TestFakeAdapterGetTaskOutput:
|
||
|
|
def test_raises_before_completed(self, adapter_with_task):
|
||
|
|
a, tid = adapter_with_task
|
||
|
|
with pytest.raises(C2Error, match="task not completed"):
|
||
|
|
a.get_task_output(tid)
|
||
|
|
|
||
|
|
def test_raises_after_first_get_task_call_only(self, adapter_with_task):
|
||
|
|
a, tid = adapter_with_task
|
||
|
|
a.get_task(tid) # first call — still submitted
|
||
|
|
with pytest.raises(C2Error, match="task not completed"):
|
||
|
|
a.get_task_output(tid)
|
||
|
|
|
||
|
|
def test_returns_output_after_completed(self, adapter_with_task):
|
||
|
|
a, tid = adapter_with_task
|
||
|
|
a.get_task(tid)
|
||
|
|
a.get_task(tid) # now completed
|
||
|
|
output = a.get_task_output(tid)
|
||
|
|
assert "whoami" in output
|
||
|
|
assert str(tid) in output
|
||
|
|
|
||
|
|
def test_output_format(self, adapter):
|
||
|
|
tid = adapter.create_task(callback_display_id=2, command="ipconfig /all")
|
||
|
|
adapter.get_task(tid)
|
||
|
|
adapter.get_task(tid)
|
||
|
|
output = adapter.get_task_output(tid)
|
||
|
|
assert output == f"output for task {tid}: ipconfig /all\n"
|
||
|
|
|
||
|
|
def test_unknown_task_raises_c2error(self, adapter):
|
||
|
|
"""Task ID never created and never polled — not completed → C2Error."""
|
||
|
|
with pytest.raises(C2Error, match="task not completed"):
|
||
|
|
adapter.get_task_output(9999)
|