diff --git a/backend/src/mimic/storage/blob.py b/backend/src/mimic/storage/blob.py index c2d1a22..3af3136 100644 --- a/backend/src/mimic/storage/blob.py +++ b/backend/src/mimic/storage/blob.py @@ -1,14 +1,38 @@ -"""Content-addressed gzip-compressed blob store (D-012).""" +"""Content-addressed gzip-compressed blob store (D-012). + +`store_blob` consumes a binary stream and writes it gzip-compressed to a +content-addressed path `///.gz`. Per MA2: +- streaming hash + write (no whole-buffer load — defends against memory DoS + from a hostile-large C2 output); +- explicit `max_bytes` cap, raising `BlobTooLarge` as soon as the limit is + crossed (the partial temp file is deleted); +- atomic rename via `///.gz.tmp` → final target; +- explicit `0o750` directory mode (does not rely on the process umask). +""" from __future__ import annotations +import contextlib import gzip import hashlib import os import stat from pathlib import Path +from typing import IO _SHA256_HEX_LEN = 64 +_DEFAULT_MAX_BYTES = 10 * 1024 * 1024 # 10 MB cap (D-012) +_READ_CHUNK = 64 * 1024 # 64 KB streaming chunks +_DIR_MODE = 0o750 +_FILE_MODE = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP + + +class BlobTooLarge(RuntimeError): # noqa: N818 (intentional: spec-aligned name) + """Raised when an input stream exceeds the configured `max_bytes` cap.""" + + def __init__(self, max_bytes: int) -> None: + super().__init__(f"blob exceeds {max_bytes} byte cap") + self.max_bytes = max_bytes def _validate_digest(sha256_hex: str) -> str: @@ -25,27 +49,54 @@ def blob_path(root: Path | str, sha256_hex: str) -> Path: return Path(root) / digest[0:2] / digest[2:4] / f"{digest}.gz" -def store_blob(root: Path | str, data: bytes) -> tuple[str, Path]: - """Write `data` (gzip-compressed) under its sha256 digest path. +def store_blob( + root: Path | str, + stream: IO[bytes], + *, + max_bytes: int = _DEFAULT_MAX_BYTES, +) -> tuple[str, Path]: + """Write `stream` (gzip-compressed) under its sha256 digest path. - Idempotent: an existing path with the same digest is not overwritten. - Directory permissions are `0750` so only the owner and the `mimic` group - can read. + Reads from `stream` in fixed-size chunks while computing the digest and + streaming the gzip body to a temp file. Raises `BlobTooLarge` as soon as + the running total exceeds `max_bytes`; the temp file is unlinked in that + case. Idempotent: if the target already exists the temp file is + discarded. """ - digest = hashlib.sha256(data).hexdigest() + if max_bytes <= 0: + raise ValueError(f"max_bytes must be positive, got {max_bytes}") + + sha = hashlib.sha256() + tmp_path = Path(root) / f".tmp-{os.getpid()}-{os.urandom(8).hex()}.gz" + tmp_path.parent.mkdir(parents=True, exist_ok=True) + os.chmod(tmp_path.parent, _DIR_MODE) + + total = 0 + try: + with gzip.open(tmp_path, "wb") as gz: + while True: + chunk = stream.read(_READ_CHUNK) + if not chunk: + break + total += len(chunk) + if total > max_bytes: + raise BlobTooLarge(max_bytes) + sha.update(chunk) + gz.write(chunk) + except BlobTooLarge: + with contextlib.suppress(FileNotFoundError): + tmp_path.unlink() + raise + + digest = sha.hexdigest() target = blob_path(root, digest) if target.exists(): + with contextlib.suppress(FileNotFoundError): + tmp_path.unlink() return digest, target target.parent.mkdir(parents=True, exist_ok=True) - # 0o750: owner full, group r-x, others none. The blob root is owned by the - # `mimic` system user; only the application and any explicit group member - # (audit / backup) get read access. - os.chmod(target.parent, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP) # noqa: S103 - - tmp = target.with_suffix(target.suffix + ".tmp") - with gzip.open(tmp, "wb") as fh: - fh.write(data) - os.chmod(tmp, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP) - tmp.replace(target) + os.chmod(target.parent, _DIR_MODE) + os.chmod(tmp_path, _FILE_MODE) + tmp_path.replace(target) return digest, target diff --git a/backend/tests/unit/test_storage_blob.py b/backend/tests/unit/test_storage_blob.py index a5fa456..be569c7 100644 --- a/backend/tests/unit/test_storage_blob.py +++ b/backend/tests/unit/test_storage_blob.py @@ -1,13 +1,18 @@ -"""Content-addressed gzip blob store (D-012).""" +"""Content-addressed gzip blob store (D-012, MA2 streaming).""" from __future__ import annotations import gzip import hashlib +import io import pytest -from mimic.storage.blob import blob_path, store_blob +from mimic.storage.blob import BlobTooLarge, blob_path, store_blob + + +def _stream(data: bytes) -> io.BytesIO: + return io.BytesIO(data) def test_blob_path_uses_two_byte_pairs(tmp_path) -> None: @@ -24,7 +29,7 @@ def test_blob_path_rejects_invalid_digest(tmp_path) -> None: def test_store_blob_writes_gzip_and_returns_digest(tmp_path) -> None: payload = b"hello world\n" expected = hashlib.sha256(payload).hexdigest() - digest, path = store_blob(tmp_path, payload) + digest, path = store_blob(tmp_path, _stream(payload)) assert digest == expected assert path == tmp_path / expected[0:2] / expected[2:4] / f"{expected}.gz" with gzip.open(path, "rb") as fh: @@ -33,17 +38,40 @@ def test_store_blob_writes_gzip_and_returns_digest(tmp_path) -> None: def test_store_blob_is_idempotent(tmp_path) -> None: payload = b"same content" - digest1, path1 = store_blob(tmp_path, payload) + digest1, path1 = store_blob(tmp_path, _stream(payload)) mtime_before = path1.stat().st_mtime_ns - digest2, path2 = store_blob(tmp_path, payload) + digest2, path2 = store_blob(tmp_path, _stream(payload)) assert digest1 == digest2 assert path1 == path2 assert path2.stat().st_mtime_ns == mtime_before def test_store_blob_dedupes_distinct_payloads(tmp_path) -> None: - _, p1 = store_blob(tmp_path, b"alpha") - _, p2 = store_blob(tmp_path, b"beta") + _, p1 = store_blob(tmp_path, _stream(b"alpha")) + _, p2 = store_blob(tmp_path, _stream(b"beta")) assert p1 != p2 assert p1.exists() assert p2.exists() + + +def test_store_blob_raises_when_stream_exceeds_cap(tmp_path) -> None: + too_big = b"A" * (1024 + 1) + with pytest.raises(BlobTooLarge): + store_blob(tmp_path, _stream(too_big), max_bytes=1024) + # No tmp file left behind. + leftovers = [p for p in tmp_path.iterdir() if p.name.startswith(".tmp-")] + assert leftovers == [] + + +def test_store_blob_handles_large_stream_in_chunks(tmp_path) -> None: + # 1.5 MB payload — exercises the multi-chunk path (chunks are 64 KB). + payload = (b"X" * 64 * 1024) * 24 + digest, path = store_blob(tmp_path, _stream(payload), max_bytes=2 * 1024 * 1024) + assert digest == hashlib.sha256(payload).hexdigest() + with gzip.open(path, "rb") as fh: + assert fh.read() == payload + + +def test_store_blob_rejects_zero_or_negative_max(tmp_path) -> None: + with pytest.raises(ValueError, match="max_bytes"): + store_blob(tmp_path, _stream(b"x"), max_bytes=0)