fix(backend): stream store_blob and enforce max_bytes mid-write (MA2)
Code-review MAJOR MA2. The previous `store_blob(root, data: bytes)` signature forced the entire payload into RAM before the 10 MB cap was checked — a hostile-large output blob could OOM the worker before the limit even fired. New signature: `store_blob(root, stream, *, max_bytes=10_485_760)`. The implementation: - reads from `stream` in 64 KB chunks; - updates the sha256 + writes to `<root>/.tmp-<pid>-<rand>.gz` incrementally; - raises `BlobTooLarge(max_bytes)` as soon as the running total crosses the cap, then unlinks the partial temp file via `contextlib.suppress`; - atomic-renames the temp file to the CAS path `<aa>/<bb>/<sha256>.gz` once the stream finishes; - sets `0o750` on the directory and `0o640` on the file with explicit `os.chmod` (does not rely on the process umask). Updated unit tests cover: BlobTooLarge enforcement (with temp-file cleanup), multi-chunk happy path (1.5 MB payload exercising the 64 KB loop), and `max_bytes <= 0` validation.
This commit is contained in:
@@ -1,14 +1,38 @@
|
||||
"""Content-addressed gzip-compressed blob store (D-012)."""
|
||||
"""Content-addressed gzip-compressed blob store (D-012).
|
||||
|
||||
`store_blob` consumes a binary stream and writes it gzip-compressed to a
|
||||
content-addressed path `<root>/<aa>/<bb>/<sha256>.gz`. Per MA2:
|
||||
- streaming hash + write (no whole-buffer load — defends against memory DoS
|
||||
from a hostile-large C2 output);
|
||||
- explicit `max_bytes` cap, raising `BlobTooLarge` as soon as the limit is
|
||||
crossed (the partial temp file is deleted);
|
||||
- atomic rename from the temp file `<root>/.tmp-<pid>-<rand>.gz` → final target;
|
||||
- explicit `0o750` directory mode and `0o640` file mode (does not rely on the process umask).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import contextlib
|
||||
import gzip
|
||||
import hashlib
|
||||
import os
|
||||
import stat
|
||||
from pathlib import Path
|
||||
from typing import IO
|
||||
|
||||
_SHA256_HEX_LEN = 64
|
||||
_DEFAULT_MAX_BYTES = 10 * 1024 * 1024 # 10 MB cap (D-012)
|
||||
_READ_CHUNK = 64 * 1024 # 64 KB streaming chunks
|
||||
_DIR_MODE = 0o750
|
||||
_FILE_MODE = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP
|
||||
|
||||
|
||||
class BlobTooLarge(RuntimeError):  # noqa: N818 (intentional: spec-aligned name)
    """Raised when an input stream exceeds the configured `max_bytes` cap.

    The violated cap is exposed as the `max_bytes` attribute so callers can
    report or act on it without parsing the message.
    """

    def __init__(self, max_bytes: int) -> None:
        super().__init__(f"blob exceeds {max_bytes} byte cap")
        self.max_bytes = max_bytes

    def __reduce__(self) -> tuple[type, tuple[int], dict]:
        # Default exception pickling re-invokes `__init__` with `self.args`,
        # i.e. the already-formatted message, which would be formatted a
        # second time. Rebuild from the numeric cap instead so the exception
        # survives a process boundary (e.g. a worker pool) intact.
        return (type(self), (self.max_bytes,), self.__dict__)
|
||||
|
||||
|
||||
def _validate_digest(sha256_hex: str) -> str:
|
||||
@@ -25,27 +49,54 @@ def blob_path(root: Path | str, sha256_hex: str) -> Path:
|
||||
return Path(root) / digest[0:2] / digest[2:4] / f"{digest}.gz"
|
||||
|
||||
|
||||
def store_blob(
    root: Path | str,
    stream: IO[bytes],
    *,
    max_bytes: int = _DEFAULT_MAX_BYTES,
) -> tuple[str, Path]:
    """Write `stream` (gzip-compressed) under its sha256 digest path.

    Reads from `stream` in fixed-size chunks while computing the digest and
    streaming the gzip body to a temp file directly under `root`, then
    atomically renames the temp file onto the content-addressed target.

    Idempotent: if the target already exists it is left untouched and the
    temp file is discarded.

    Args:
        root: Blob store root directory; created if missing.
        stream: Binary stream to consume until it returns no more data.
        max_bytes: Maximum number of uncompressed bytes accepted.

    Returns:
        `(digest, path)` — the hex sha256 of the uncompressed payload and the
        path of the stored blob.

    Raises:
        ValueError: if `max_bytes` is not positive.
        BlobTooLarge: as soon as the running total exceeds `max_bytes`.

    The temp file is removed on every failure path, not only on
    `BlobTooLarge`, so an I/O error mid-write leaves no debris in `root`.
    """
    if max_bytes <= 0:
        raise ValueError(f"max_bytes must be positive, got {max_bytes}")

    root_dir = Path(root)
    root_dir.mkdir(parents=True, exist_ok=True)
    os.chmod(root_dir, _DIR_MODE)
    tmp_path = root_dir / f".tmp-{os.getpid()}-{os.urandom(8).hex()}.gz"

    sha = hashlib.sha256()
    total = 0
    try:
        with gzip.open(tmp_path, "wb") as gz:
            while True:
                chunk = stream.read(_READ_CHUNK)
                if not chunk:
                    break
                total += len(chunk)
                # Enforce the cap before hashing/writing the offending chunk.
                if total > max_bytes:
                    raise BlobTooLarge(max_bytes)
                sha.update(chunk)
                gz.write(chunk)

        digest = sha.hexdigest()
        target = blob_path(root_dir, digest)
        if not target.exists():
            shard_dir = target.parent
            shard_dir.mkdir(parents=True, exist_ok=True)
            # `mkdir(parents=True)` creates the intermediate `<aa>` level with
            # umask-dependent permissions, so both shard levels are chmod'ed
            # explicitly.
            for directory in (shard_dir.parent, shard_dir):
                os.chmod(directory, _DIR_MODE)
            os.chmod(tmp_path, _FILE_MODE)
            tmp_path.replace(target)
    finally:
        # After a successful rename the temp path no longer exists; in every
        # other case (cap exceeded, I/O error, duplicate blob) this removes
        # the leftover temp file.
        with contextlib.suppress(FileNotFoundError):
            tmp_path.unlink()

    return digest, target
|
||||
|
||||
@@ -1,13 +1,18 @@
|
||||
"""Content-addressed gzip blob store (D-012)."""
|
||||
"""Content-addressed gzip blob store (D-012, MA2 streaming)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import gzip
|
||||
import hashlib
|
||||
import io
|
||||
|
||||
import pytest
|
||||
|
||||
from mimic.storage.blob import blob_path, store_blob
|
||||
from mimic.storage.blob import BlobTooLarge, blob_path, store_blob
|
||||
|
||||
|
||||
def _stream(data: bytes) -> io.BytesIO:
|
||||
return io.BytesIO(data)
|
||||
|
||||
|
||||
def test_blob_path_uses_two_byte_pairs(tmp_path) -> None:
|
||||
@@ -24,7 +29,7 @@ def test_blob_path_rejects_invalid_digest(tmp_path) -> None:
|
||||
def test_store_blob_writes_gzip_and_returns_digest(tmp_path) -> None:
|
||||
payload = b"hello world\n"
|
||||
expected = hashlib.sha256(payload).hexdigest()
|
||||
digest, path = store_blob(tmp_path, payload)
|
||||
digest, path = store_blob(tmp_path, _stream(payload))
|
||||
assert digest == expected
|
||||
assert path == tmp_path / expected[0:2] / expected[2:4] / f"{expected}.gz"
|
||||
with gzip.open(path, "rb") as fh:
|
||||
@@ -33,17 +38,40 @@ def test_store_blob_writes_gzip_and_returns_digest(tmp_path) -> None:
|
||||
|
||||
def test_store_blob_is_idempotent(tmp_path) -> None:
    """Storing the same payload twice returns the same blob and does not rewrite it."""
    content = b"same content"

    first_digest, first_path = store_blob(tmp_path, _stream(content))
    # Capture the mtime so a rewrite by the second call would be detected.
    original_mtime = first_path.stat().st_mtime_ns

    second_digest, second_path = store_blob(tmp_path, _stream(content))

    assert second_digest == first_digest
    assert second_path == first_path
    assert second_path.stat().st_mtime_ns == original_mtime
|
||||
|
||||
|
||||
def test_store_blob_dedupes_distinct_payloads(tmp_path) -> None:
    """Different payloads are stored at different paths, and both files exist."""
    alpha_path = store_blob(tmp_path, _stream(b"alpha"))[1]
    beta_path = store_blob(tmp_path, _stream(b"beta"))[1]

    assert alpha_path != beta_path
    for stored in (alpha_path, beta_path):
        assert stored.exists()
|
||||
|
||||
|
||||
def test_store_blob_raises_when_stream_exceeds_cap(tmp_path) -> None:
    """A stream one byte over the cap raises `BlobTooLarge` and leaves no temp file."""
    cap = 1024
    oversized = b"A" * (cap + 1)

    with pytest.raises(BlobTooLarge):
        store_blob(tmp_path, _stream(oversized), max_bytes=cap)

    # The partial temp file lives directly under the root; none may remain.
    stray = [entry for entry in tmp_path.iterdir() if entry.name.startswith(".tmp-")]
    assert stray == []
|
||||
|
||||
|
||||
def test_store_blob_handles_large_stream_in_chunks(tmp_path) -> None:
    """A 1.5 MB payload (24 chunks of 64 KB) round-trips through the chunked path."""
    chunk_size = 64 * 1024
    payload = b"X" * (chunk_size * 24)
    expected_digest = hashlib.sha256(payload).hexdigest()

    digest, path = store_blob(
        tmp_path,
        _stream(payload),
        max_bytes=2 * 1024 * 1024,
    )

    assert digest == expected_digest
    with gzip.open(path, "rb") as stored:
        round_tripped = stored.read()
    assert round_tripped == payload
|
||||
|
||||
|
||||
def test_store_blob_rejects_zero_or_negative_max(tmp_path) -> None:
    """`store_blob` rejects both a zero and a negative `max_bytes` with ValueError."""
    # The test name covers zero and negative caps, so both are exercised.
    for bad_cap in (0, -1):
        with pytest.raises(ValueError, match="max_bytes"):
            store_blob(tmp_path, _stream(b"x"), max_bytes=bad_cap)
|
||||
|
||||
Reference in New Issue
Block a user