Code-review MAJOR MA2. The previous `store_blob(root, data: bytes)` signature forced the entire payload into RAM before the 10 MB cap was checked, so a hostile, oversized output blob could OOM the worker before the limit ever fired. New signature: `store_blob(root, stream, *, max_bytes=10_485_760)`. The implementation:
- reads from `stream` in 64 KB chunks;
- updates the sha256 and writes to `<root>/.tmp-<pid>-<rand>.gz` incrementally;
- raises `BlobTooLarge(max_bytes)` as soon as the running total crosses the cap, and unlinks the partial temp file under `contextlib.suppress`;
- atomically renames the temp file to the CAS path `<aa>/<bb>/<sha256>.gz` once the stream finishes;
- sets `0o750` on the directory and `0o640` on the file with explicit `os.chmod` (it does not rely on the process umask).
Updated unit tests cover: BlobTooLarge enforcement (with temp-file cleanup), the multi-chunk happy path (1.5 MB payload exercising the 64 KB loop), and `max_bytes <= 0` validation.
78 lines
2.5 KiB
Python
78 lines
2.5 KiB
Python
"""Content-addressed gzip blob store (D-012, MA2 streaming)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import gzip
|
|
import hashlib
|
|
import io
|
|
|
|
import pytest
|
|
|
|
from mimic.storage.blob import BlobTooLarge, blob_path, store_blob
|
|
|
|
|
|
def _stream(data: bytes) -> io.BytesIO:
    """Return an in-memory binary stream holding *data*, positioned at offset 0."""
    buffer = io.BytesIO()
    buffer.write(data)
    # Rewind so the consumer reads from the first byte, as with BytesIO(data).
    buffer.seek(0)
    return buffer
|
|
|
|
|
|
def test_blob_path_uses_two_byte_pairs(tmp_path) -> None:
    """blob_path nests the file under the first two hex pairs of the digest."""
    first_pair, second_pair = "ab", "cd"
    # 2 + 2 + 60 characters: the length of a sha256 hex digest.
    digest = first_pair + second_pair + "ef" * 30
    expected = tmp_path / first_pair / second_pair / f"{digest}.gz"
    assert blob_path(tmp_path, digest) == expected
|
|
|
|
|
|
def test_blob_path_rejects_invalid_digest(tmp_path) -> None:
    """A string that is not a sha256 hex digest makes blob_path raise ValueError."""
    bad_digest = "not-a-digest"
    with pytest.raises(ValueError, match="invalid sha256"):
        blob_path(tmp_path, bad_digest)
|
|
|
|
|
|
def test_store_blob_writes_gzip_and_returns_digest(tmp_path) -> None:
    """store_blob returns (sha256, CAS path) and the file gunzips to the payload."""
    payload = b"hello world\n"
    want_digest = hashlib.sha256(payload).hexdigest()
    want_path = tmp_path / want_digest[:2] / want_digest[2:4] / f"{want_digest}.gz"

    got_digest, got_path = store_blob(tmp_path, _stream(payload))

    assert got_digest == want_digest
    assert got_path == want_path
    # Round-trip through gzip to confirm the stored bytes are the payload.
    assert gzip.decompress(got_path.read_bytes()) == payload
|
|
|
|
|
|
def test_store_blob_is_idempotent(tmp_path) -> None:
    """Storing the same content twice returns the same result and keeps the mtime."""
    payload = b"same content"

    first_digest, first_path = store_blob(tmp_path, _stream(payload))
    stamp = first_path.stat().st_mtime_ns
    second_digest, second_path = store_blob(tmp_path, _stream(payload))

    assert second_digest == first_digest
    assert second_path == first_path
    # Nanosecond mtime is compared before/after the second store call.
    assert second_path.stat().st_mtime_ns == stamp
|
|
|
|
|
|
def test_store_blob_dedupes_distinct_payloads(tmp_path) -> None:
    """Two different payloads are stored at two different paths, both on disk."""
    stored_paths = []
    for body in (b"alpha", b"beta"):
        _, stored_at = store_blob(tmp_path, _stream(body))
        stored_paths.append(stored_at)

    alpha_path, beta_path = stored_paths
    assert alpha_path != beta_path
    assert alpha_path.exists()
    assert beta_path.exists()
|
|
|
|
|
|
def test_store_blob_raises_when_stream_exceeds_cap(tmp_path) -> None:
    """A stream one byte over max_bytes raises BlobTooLarge and leaves no temp file."""
    cap = 1024
    oversized = b"A" * (cap + 1)

    with pytest.raises(BlobTooLarge):
        store_blob(tmp_path, _stream(oversized), max_bytes=cap)

    # Nothing named ".tmp-*" may remain directly under the store root.
    stray = [entry for entry in tmp_path.iterdir() if entry.name.startswith(".tmp-")]
    assert stray == []
|
|
|
|
|
|
def test_store_blob_handles_large_stream_in_chunks(tmp_path) -> None:
    """A multi-chunk payload round-trips with the correct digest.

    The payload is slightly over 1.5 MiB: 24 full 64 KiB chunks (the chunk
    size store_blob is described as using) plus a short tail, so the read
    loop sees many full reads followed by a partial one. The byte pattern has
    a period of 251, which does not divide 64 KiB, so consecutive chunks
    differ and a dropped, duplicated or reordered chunk changes both the
    digest and the decompressed content.
    """
    chunk_size = 64 * 1024
    total = 24 * chunk_size + 123
    pattern = bytes(range(251))
    # Repeat past the target length, then trim to exactly `total` bytes.
    payload = (pattern * (total // len(pattern) + 1))[:total]

    digest, path = store_blob(tmp_path, _stream(payload), max_bytes=2 * 1024 * 1024)

    assert digest == hashlib.sha256(payload).hexdigest()
    with gzip.open(path, "rb") as fh:
        assert fh.read() == payload
|
|
|
|
|
|
def test_store_blob_rejects_zero_or_negative_max(tmp_path) -> None:
    """Non-positive max_bytes values make store_blob raise ValueError.

    Both the zero boundary and a negative value are checked, matching the
    cases named in the test's title.
    """
    for bad_cap in (0, -1):
        with pytest.raises(ValueError, match="max_bytes"):
            store_blob(tmp_path, _stream(b"x"), max_bytes=bad_cap)
|