Project source-tree ******************* Below is the layout of the project (to 10 levels), followed by the contents of each key file. Project directory layout safezip/ ├── src │ └── safezip │ ├── cli │ │ ├── __init__.py │ │ └── _main.py │ ├── tests │ │ ├── __init__.py │ │ ├── conftest.py │ │ ├── test_cli.py │ │ ├── test_guard.py │ │ ├── test_integration.py │ │ ├── test_sandbox.py │ │ └── test_streamer.py │ ├── __init__.py │ ├── _core.py │ ├── _events.py │ ├── _exceptions.py │ ├── _guard.py │ ├── _sandbox.py │ ├── _streamer.py │ └── py.typed ├── AGENTS.md ├── conftest.py ├── CONTRIBUTING.rst ├── docker-compose.yml ├── Dockerfile ├── Makefile ├── pyproject.toml ├── README.rst └── tox.ini README.rst ========== README.rst ======= safezip ======= .. image:: https://raw.githubusercontent.com/barseghyanartur/safezip/main/docs/_static/safezip_logo.webp :alt: SafeZip Logo :align: center Hardened ZIP extraction for Python - secure by default. .. image:: https://img.shields.io/pypi/v/safezip.svg :target: https://pypi.python.org/pypi/safezip :alt: PyPI Version .. image:: https://img.shields.io/pypi/pyversions/safezip.svg :target: https://pypi.python.org/pypi/safezip/ :alt: Supported Python versions .. image:: https://github.com/barseghyanartur/safezip/actions/workflows/test.yml/badge.svg?branch=main :target: https://github.com/barseghyanartur/safezip/actions :alt: Build Status .. image:: https://readthedocs.org/projects/safezip/badge/?version=latest :target: http://safezip.readthedocs.io :alt: Documentation Status .. image:: https://img.shields.io/badge/docs-llms.txt-blue :target: https://safezip.readthedocs.io/en/latest/llms.txt :alt: llms.txt - documentation for LLMs .. image:: https://img.shields.io/badge/license-MIT-blue.svg :target: https://github.com/barseghyanartur/safezip/#License :alt: MIT .. 
image:: https://coveralls.io/repos/github/barseghyanartur/safezip/badge.svg?branch=main&service=github :target: https://coveralls.io/github/barseghyanartur/safezip?branch=main :alt: Coverage ``safezip`` is a zero-dependency, production-grade wrapper around Python's ``zipfile`` module that defends against the most common ZIP-based attacks: ZipSlip path traversal, ZIP bombs, and malformed/crafted archives. Features ======== - **ZipSlip protection** - relative traversal, absolute paths, Windows UNC paths, Unicode lookalike attacks, and null bytes in filenames are all blocked. - **ZIP bomb protection** - per-member and cumulative decompression ratio limits abort extraction before runaway decompression can exhaust disk or memory. - **File size limits** - per-member size is checked against the declared header value at open time (Guard phase) and again against actual decompressed bytes during streaming (Streamer phase). Total extraction size is enforced cumulatively across all members at stream time. - **ZIP64 consistency checks** - crafted archives with inconsistent ZIP64 extra fields are rejected before decompression begins. - **Symlink policy** - configurable: ``REJECT`` (default), ``IGNORE``, or ``RESOLVE_INTERNAL`` (symlink entries are extracted as regular files; no OS symlink is created on disk). - **Atomic writes** - every member is written to a temporary file first; the destination is only created after all checks pass. No partial files are left on disk after a security abort. - **Secure by default** - all limits are active without any configuration. - **Zero dependencies** - standard library only. - **Environment variable overrides** - all limits (including ``symlink_policy``) can be set via ``SAFEZIP_*`` environment variables for containerised deployments. Prerequisites ============= Python 3.10 or later. No additional packages required. Installation ============ With ``uv``: .. code-block:: sh uv pip install safezip Or with ``pip``: .. 
code-block:: sh pip install safezip Quick start =========== Drop-in replacement for the common ``zipfile`` extraction pattern: .. pytestfixture: file_zip .. code-block:: python :name: test_safe_extract from safezip import safe_extract safe_extract("path/to/file.zip", "/var/files/extracted/") Or use the ``SafeZipFile`` context manager for more control: .. pytestfixture: file_zip .. code-block:: python :name: test_safe_zipfile from safezip import SafeZipFile with SafeZipFile("path/to/file.zip") as zf: print(zf.namelist()) zf.extractall("/var/files/extracted/") Custom limits ============= See the `Default limits`_ for reference. .. pytestfixture: file_zip .. code-block:: python :name: test_custom_limits from safezip import SafeZipFile, SymlinkPolicy with SafeZipFile( "path/to/file.zip", max_file_size=100 * 1024 * 1024, # 100 MiB per member (default: 1 GiB) max_total_size=500 * 1024 * 1024, # 500 MiB total (default: 5 GiB) max_files=1_000, # (default: 10 000) max_per_member_ratio=50.0, # (default: 200) max_total_ratio=50.0, # (default: 200) max_nesting_depth=1, # (default: 3) symlink_policy=SymlinkPolicy.IGNORE, # (default: SymlinkPolicy.REJECT) ) as zf: zf.extractall("/var/files/extracted/") Recursive extraction ==================== When an archive contains nested ``.zip`` files, set ``recursive=True`` to descend into them automatically. All safety limits apply at every level. Each nested archive is extracted into a directory named after it (without the extension). The ``.zip`` file itself is never written to disk. .. pytestfixture: nested_file_zip .. 
code-block:: python :name: test_recursive_extraction from safezip import SafeZipFile # archive.zip # readme.txt # data.zip ← will be descended into, not extracted as a blob # report.csv with SafeZipFile("path/to/archive.zip", recursive=True, max_nesting_depth=3) as zf: zf.extractall("/var/files/extracted/") # Result on disk: # /var/files/extracted/readme.txt # /var/files/extracted/data/report.csv With ``max_nesting_depth=0``, opening any nested archive raises ``NestingDepthError`` before extracting a single byte from it: .. pytestfixture: nested_file_zip .. code-block:: python :name: test_recursive_extraction_depth_limit import pytest from safezip import SafeZipFile, NestingDepthError # archive.zip # readme.txt # data.zip ← depth 1 exceeds max_nesting_depth=0 → NestingDepthError # report.csv with pytest.raises(NestingDepthError): with SafeZipFile( "path/to/archive.zip", recursive=True, max_nesting_depth=0 ) as zf: zf.extractall("/var/files/extracted/") Security event monitoring ========================= .. pytestfixture: file_zip .. code-block:: python :name: test_security_event_monitoring from safezip import SafeZipFile, SecurityEvent def my_monitor(event: SecurityEvent) -> None: print(f"[safezip] {event.event_type} archive={event.archive_hash}") with SafeZipFile("path/to/file.zip", on_security_event=my_monitor) as zf: zf.extractall("/var/files/extracted/") Environment variable overrides ============================== See the `Default limits`_ for reference. All limits can be overridden without changing code: .. 
code-block:: sh export SAFEZIP_MAX_FILE_SIZE=104857600 # 100 MiB (default: 1 GiB) export SAFEZIP_MAX_TOTAL_SIZE=524288000 # 500 MiB (default: 5 GiB) export SAFEZIP_MAX_FILES=1000 # (default: 10 000) export SAFEZIP_MAX_PER_MEMBER_RATIO=50 # (default: 200) export SAFEZIP_MAX_TOTAL_RATIO=50 # (default: 200) export SAFEZIP_MAX_NESTING_DEPTH=1 # (default: 3) export SAFEZIP_SYMLINK_POLICY=ignore # reject | ignore | resolve_internal (default: reject) Default limits ============== +--------------------------+------------+ | Parameter | Default | +==========================+============+ | ``max_file_size`` | 1 GiB | +--------------------------+------------+ | ``max_total_size`` | 5 GiB | +--------------------------+------------+ | ``max_files`` | 10 000 | +--------------------------+------------+ | ``max_per_member_ratio`` | 200 | +--------------------------+------------+ | ``max_total_ratio`` | 200 | +--------------------------+------------+ | ``max_nesting_depth`` | 3 | +--------------------------+------------+ | ``symlink_policy`` | REJECT | +--------------------------+------------+ | ``recursive`` | False | +--------------------------+------------+ Testing ======= All tests run inside Docker to prevent accidental pollution of the host system: .. code-block:: sh make test To test a specific Python version: .. code-block:: sh make test-env ENV=py312 Writing documentation ===================== Keep the following hierarchy: .. code-block:: text ===== title ===== header ====== sub-header ---------- sub-sub-header ~~~~~~~~~~~~~~ sub-sub-sub-header ^^^^^^^^^^^^^^^^^^ sub-sub-sub-sub-header ++++++++++++++++++++++ sub-sub-sub-sub-sub-header ************************** License ======= MIT Support ======= For security issues contact me at the e-mail given in the `Author`_ section. For overall issues, go to `GitHub `_. Author ====== Artur Barseghyan CONTRIBUTING.rst ================ CONTRIBUTING.rst Contributor guidelines ====================== .. 
_safezip: https://github.com/barseghyanartur/safezip/ .. _uv: https://docs.astral.sh/uv/ .. _tox: https://tox.wiki .. _ruff: https://beta.ruff.rs/docs/ .. _doc8: https://doc8.readthedocs.io/ .. _pre-commit: https://pre-commit.com/#installation .. _issues: https://github.com/barseghyanartur/safezip/issues .. _discussions: https://github.com/barseghyanartur/safezip/discussions .. _pull request: https://github.com/barseghyanartur/safezip/pulls .. _versions manifest: https://github.com/actions/python-versions/blob/main/versions-manifest.json Developer prerequisites ----------------------- pre-commit ~~~~~~~~~~ Refer to `pre-commit`_ for installation instructions. TL;DR: .. code-block:: sh curl -LsSf https://astral.sh/uv/install.sh | sh # Install uv uv tool install pre-commit # Install pre-commit pre-commit install # Install hooks Installing `pre-commit`_ ensures all contributions adhere to the project's code quality standards. Code standards -------------- `ruff`_ and `doc8`_ are triggered automatically by `pre-commit`_. To run checks manually: .. code-block:: sh make doc8 make ruff Virtual environment ------------------- .. code-block:: sh make create-venv Installation ------------ .. code-block:: sh make install Testing ------- **All tests must be run inside Docker.** This prevents accidental extraction of malicious test archives from reaching the host filesystem. .. code-block:: sh make test To test a single environment: .. code-block:: sh make test-env ENV=py312 For an interactive shell inside the container: .. code-block:: sh make shell In any case, GitHub Actions runs the full matrix automatically on every push. Releases -------- **Build the package for releasing:** .. code-block:: sh make package-build ---- **Test the built package:** .. code-block:: sh make check-package-build ---- **Make a test release (test.pypi.org):** .. code-block:: sh make test-release ---- **Release (pypi.org):** .. 
code-block:: sh make release Adding tests ------------ - All test archives must be crafted programmatically in ``conftest.py`` using Python's ``struct`` module or ``zipfile``. Do not commit pre-built ``.zip`` files. - Every new security check must have a corresponding test in the relevant ``test_*.py`` file. - Integration tests must verify that no partial files remain on disk after a security abort (atomic write contract). Pull requests ------------- Open a `pull request`_ to the ``dev`` branch only. Never directly to ``main``. .. note:: Create pull requests to the ``dev`` branch only! Examples of welcome contributions: - Fixing documentation typos or improving explanations. - Adding test cases for new edge cases. - Extending support for additional archive attack vectors. - Improving error messages. General checklist ~~~~~~~~~~~~~~~~~ - Does your change require documentation updates? - Does your change require new tests? - Does your change add any external dependencies? If so, reconsider: ``safezip`` is intentionally dependency-free. When fixing bugs ~~~~~~~~~~~~~~~~ - Add a regression test that reproduces the bug before your fix. When adding a new feature ~~~~~~~~~~~~~~~~~~~~~~~~~ - Update ``README.rst`` (quick start, default limits table if relevant). - Update ``plan.rst`` if the architectural design changes. - Add appropriate tests in the correct ``test_*.py`` file. GitHub Actions -------------- Tests run on Python 3.10–3.14 (all non-EOL versions). See the `versions manifest`_ for the full list of available Python versions. Questions --------- Ask on GitHub `discussions`_. Issues ------ Report bugs or request features on GitHub `issues`_. **Do not report security vulnerabilities on GitHub.** Contact the author directly at artur.barseghyan@gmail.com. 
AGENTS.md ========= AGENTS.md # AGENTS.md — safezip **Package version**: See pyproject.toml **Repository**: https://github.com/barseghyanartur/safezip **Maintainer**: Artur Barseghyan This file is for AI agents and developers using AI assistants to work on or with safezip. It covers two distinct roles: **using** the package in application code, and **developing/extending** the package itself. --- ## 1. Project Mission (Never Deviate) > Hardened ZIP extraction for Python — secure by default, zero dependencies, > production-grade. - Secure defaults are never relaxed without an explicit caller decision. - No external dependencies. Ever. - The three-phase security model (Guard → Sandbox → Streamer) is preserved. - No partial files on disk after a security abort. --- ## 2. Using safezip in Application Code ### Simple case ```python name=test_simple_case from safezip import safe_extract # Secure defaults protect against all common attacks safe_extract("path/to/file.zip", "/var/files/extracted/") ``` ### With monitoring and custom limits ```python name=test_with_monitoring_and_custom_limits from safezip import SafeZipFile, SecurityEvent def monitor(event: SecurityEvent) -> None: print(f"Security event: {event.event_type}") with SafeZipFile( "path/to/file.zip", max_file_size=100 * 1024 * 1024, # 100 MiB per member on_security_event=monitor, ) as zf: zf.extractall("/var/files/extracted/") ``` ### Exception handling All safezip exceptions inherit from `SafezipError`: ```python name=test_exception_handling from safezip import ( safe_extract, SafezipError, UnsafeZipError, # path traversal or disallowed symlink CompressionRatioError, # ZIP bomb attempt FileSizeExceededError, # member too large TotalSizeExceededError, # cumulative size exceeded FileCountExceededError, # too many entries MalformedArchiveError, # structurally invalid archive NestingDepthError, # nested archive depth exceeded ) try: safe_extract("path/to/file.zip", "/var/files/extracted/") except UnsafeZipError: 
... except CompressionRatioError: ... except SafezipError: # catch-all for any safezip violation ... ``` ### Secure defaults reference ```python name=test_secure_defaults_reference from safezip import SafeZipFile, SymlinkPolicy SafeZipFile( "path/to/file.zip", max_file_size=1 * 1024**3, # 1 GiB per member max_total_size=5 * 1024**3, # 5 GiB total max_files=10_000, max_per_member_ratio=200.0, max_total_ratio=200.0, max_nesting_depth=3, symlink_policy=SymlinkPolicy.REJECT, ) ``` All limits are overridable via environment variables: | Variable | Type | Default | |---|---|---| | `SAFEZIP_MAX_FILE_SIZE` | int (bytes) | 1 GiB | | `SAFEZIP_MAX_TOTAL_SIZE` | int (bytes) | 5 GiB | | `SAFEZIP_MAX_FILES` | int | 10 000 | | `SAFEZIP_MAX_PER_MEMBER_RATIO` | float | 200.0 | | `SAFEZIP_MAX_TOTAL_RATIO` | float | 200.0 | | `SAFEZIP_MAX_NESTING_DEPTH` | int | 3 | | `SAFEZIP_SYMLINK_POLICY` | str | reject | Resolution order: constructor argument > environment variable > hardcoded default. Invalid env values are logged and otherwise ignored. ### What safezip does not do - **Write mode** — `SafeZipFile` is read-only. It does not expose `open()`, `read()`, or any write-mode methods from `zipfile.ZipFile`. - **Recursive extraction by default** — nested `.zip` members are extracted as raw files unless the caller passes `recursive=True` (see `README.rst`), in which case depth is bounded by `max_nesting_depth`. - **Create OS symlinks** — `RESOLVE_INTERNAL` extracts symlink entries as regular files containing the target path as bytes. See section 5. --- ## 3. Architecture Each extraction passes through three phases in order. Each phase owns exactly one module. When adding a new check, identify the correct phase first.
| Phase | File | Runs | Raises | |---|---|---|---| | **Guard** | `_guard.py` | On `SafeZipFile.__init__()`, before any decompression | `FileCountExceededError`, `FileSizeExceededError`, `MalformedArchiveError` | | **Sandbox** | `_sandbox.py` | Per member, before streaming begins | `UnsafeZipError` | | **Streamer** | `_streamer.py` | Per member, during decompression | `FileSizeExceededError`, `TotalSizeExceededError`, `CompressionRatioError` | **Guard** owns: file count limit, declared per-member size, ZIP64 consistency, null bytes in filenames. **Sandbox** owns: path traversal detection, absolute/UNC path rejection, Unicode NFC normalisation, null-byte rejection, path length limit, symlink policy (REJECT / IGNORE / RESOLVE_INTERNAL). **Streamer** owns: per-member decompressed size, cumulative total size, per-member ratio, cumulative ratio, atomic write contract (temp file → rename on success, unlink on failure). **Orchestration** (`_core.py`) — `SafeZipFile` and `safe_extract`. `_extract_one` calls the three phases in order per member. Environment variable resolution, security event emission, and symlink policy dispatch live here. ### Key files | File | Purpose | |---|---| | `src/safezip/_core.py` | Public API, orchestration, env overrides, event emission | | `src/safezip/_guard.py` | Phase A: static pre-checks | | `src/safezip/_sandbox.py` | Phase B: path resolution, symlink policy | | `src/safezip/_streamer.py` | Phase C: streaming extraction, atomic writes | | `src/safezip/_exceptions.py` | Exception hierarchy (all inherit `SafezipError`) | | `src/safezip/_events.py` | `SecurityEvent`, `SymlinkPolicy`, callback type | | `src/safezip/tests/conftest.py` | All test archive fixtures | | `pyproject.toml` | Build, ruff, mypy, pytest-cov configuration | | `README.rst` | End-user documentation; keep in sync with code | --- ## 4. Security Principles **1. Default limits are sacred.** Never lower them in examples or generated code. 
If a user asks you to relax a limit, warn about the tradeoff explicitly before complying. **2. Atomicity is non-negotiable.** Every member must follow: temp file → all checks pass → `replace()` to destination. On any exception: `unlink(missing_ok=True)` the temp file. The destination must never be created or modified if a check fails. No partial files may remain on disk. **3. Never merge phase responsibilities.** Path checks belong in `_sandbox.py`. Static header checks in `_guard.py`. Runtime byte checks in `_streamer.py`. Do not add path logic to the streamer or size logic to the guard. **4. Zero external dependencies.** stdlib only. If you are considering adding an import that is not in the Python standard library, the answer is no. **5. Security events must not be suppressible.** Exceptions raised inside `on_security_event` callbacks are caught and logged, but the original security exception always propagates. Never let a broken callback silently swallow a violation. --- ## 5. Known Intentional Behaviors — Do Not Treat as Bugs ### RESOLVE_INTERNAL extracts symlink entries as regular files ZIP entries flagged as symlinks (via `external_attr` Unix mode `S_IFLNK`) are written as regular files containing the link target path as bytes. Python's `zipfile` does not create OS symlinks. The post-extraction `check_symlink` / `_verify_symlink_chain` code in `_sandbox.py` is only reached if the OS creates an actual symlink, which does not happen in the current extraction path. This is **safe**: a regular file containing the text `"../escape.txt"` is harmless. The README description ("full chain verification") describes intended future behavior, not current behavior. **If asked to implement real symlink support:** in `_extract_one`, for `RESOLVE_INTERNAL` + `is_symlink_entry`, read the target bytes, call `os.symlink(target, dest)`, then call `check_symlink(dest, base, policy)`, unlink if unsafe. Add tests for both safe and escaping targets. Update README. 
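The symlink handling described above can be illustrated with the standard library alone. The following is a minimal sketch, not safezip's own code: `looks_like_symlink` and `build_demo_archive` are hypothetical helper names introduced only for illustration. It builds an in-memory archive with one entry flagged `S_IFLNK`, detects that flag through `external_attr`, and shows that reading the entry yields the link target as plain bytes.

```python
import io
import stat
import zipfile


def looks_like_symlink(info: zipfile.ZipInfo) -> bool:
    """Hypothetical helper: True if the Unix mode bits mark a symlink."""
    # The upper 16 bits of external_attr carry the Unix st_mode.
    unix_mode = info.external_attr >> 16
    return stat.S_ISLNK(unix_mode)


def build_demo_archive() -> bytes:
    """Hypothetical helper: build a tiny archive with one symlink entry."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        link = zipfile.ZipInfo("link")
        link.create_system = 3  # 3 == Unix
        link.external_attr = (stat.S_IFLNK | 0o777) << 16
        # The entry's payload is the link target, stored as bytes.
        zf.writestr(link, "../escape.txt")
        zf.writestr("plain.txt", "hello")
    return buf.getvalue()


with zipfile.ZipFile(io.BytesIO(build_demo_archive())) as zf:
    # Map each member name to whether it is flagged as a symlink.
    flags = {info.filename: looks_like_symlink(info) for info in zf.infolist()}
    # Reading the flagged entry returns the target path, not a followed link.
    link_payload = zf.read("link")
```

Here `flags` marks only `link` as a symlink entry, and `link_payload` is the literal bytes `b"../escape.txt"`, which is why writing the entry out as a regular file creates nothing that can be followed outside the destination.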
### compress_size == 0 skips the ratio check — this is correct The ratio check in `_streamer.py` is gated on `compress_size > 0`. This is not a vulnerability. Python's `zipfile` uses the central directory's `compress_size` to control how many compressed bytes it reads. The only case where `compress_size == 0` reaches the streamer for a member that successfully decompresses is a genuinely empty member (zero bytes), for which skipping the ratio check is correct behavior. A crafted archive with `compress_size=0` in the central directory but non-empty content is rejected by Python's `zipfile` with `BadZipFile` (CRC failure) before the streamer is reached. This has been empirically verified. **Do not attempt to "fix" this skip.** ### Nested archives are extracted as raw files by default Members with ZIP-like extensions (`.zip`, `.jar`, `.whl`, `.egg`, etc.) are extracted as opaque blobs by default. `SafeZipFile` does not recurse unless the caller passes `recursive=True` (see `README.rst`). The `_nesting_depth` parameter and `NestingDepthError` exist to guard against runaway recursion, whether recursion is driven by `recursive=True` or implemented manually by a caller. --- ## 6. Agent Workflow: Adding Features or Fixing Bugs When asked to add a feature or fix a bug, follow these steps in order: 1. **Check the mission** — Does the change preserve zero deps, secure defaults, and the three-phase model? 2. **Identify the correct phase** — Guard (static/header), Sandbox (path/policy), or Streamer (runtime/bytes). 3. **For bug fixes: write the regression fixture first** — Add a programmatic archive fixture to `src/safezip/tests/conftest.py` that reproduces the bug. The test must fail before your fix. 4. **Implement the change** in the correct phase file. 5. **Add/update exceptions** in `_exceptions.py` if a new error type is needed (inherit from `SafezipError`). 6. **Add event emission** in `_core.py` (`self._emit_event("event_type")`) if the check fires inside `_extract_one`. 7. **Export** new public symbols from `__init__.py` and `__all__`. 8.
**Write tests:** - Unit test in `test_[phase].py` (e.g., `test_streamer.py`). - Integration test in `test_integration.py` verifying no partial files remain. - Legitimate-input test confirming the happy path still works. 9. **Update `README.rst`** if the API or default limits table changed. 10. **Suggest running:** either a single-environment test, `make test-env ENV=py312`, or all environments, `make test`. 11. **Suggest running:** `make pre-commit`. ### Acceptable new features - Windows reserved filename detection (Phase B / Sandbox). - Additional event types for new violation categories. - Optional recursive extraction (caller-controlled, guarded by `_nesting_depth`). - Real OS symlink creation under `RESOLVE_INTERNAL` (see section 5). ### Forbidden - Adding any external dependency. - Lowering default limits. - Bypassing or merging phases. - Writing directly to the destination path (must use temp file). - Exposing write-mode or `open()`/`read()` methods on `SafeZipFile`. --- ## 7. Testing Rules ### All tests must run inside Docker ```sh make test # full matrix (Python 3.10–3.14) make test-env ENV=py312 # single version make shell # interactive shell ``` Do not run `pytest` directly on the host machine. Malicious test archives must not touch the host filesystem. ### Test layout ``` src/safezip/tests/ conftest.py — all archive fixtures (add new ones here) test_guard.py — Phase A tests test_sandbox.py — Phase B tests test_streamer.py — Phase C tests test_integration.py — end-to-end tests ``` The **root `conftest.py`** (project root) is for `pytest-codeblock` documentation testing only. Do not add security fixtures there. ### Fixture rules - Craft all test archives programmatically using `struct` or `zipfile`. Do not commit pre-built `.zip` files. - Use `tmp_path` for all output. Never write to a fixed path. ### Required assertions for every security abort test ```python # 1.
pytest.raises wraps the full operation, not just extractall with pytest.raises(SpecificError): with SafeZipFile(...) as zf: zf.extractall(dest) # 2. Atomicity: no partial files remain remaining = [f for f in dest.rglob("*") if not f.is_dir()] assert not remaining ``` ### Checklist for every new security check - [ ] Fixture in `conftest.py` that triggers the violation - [ ] Test asserting the correct exception is raised - [ ] Test asserting no partial files remain after abort - [ ] Test asserting a legitimate archive still extracts correctly - [ ] Integration test in `test_integration.py` - [ ] Event emission tested if applicable --- ## 8. Coding Conventions Run all linting checks: ```sh make pre-commit ``` ### Formatting - Line length: **88 characters** (ruff). - Import sorting: `isort`; `safezip` is `known-first-party`. - Target: `py310`. Run `make ruff` to check. `ruff fix = true` auto-fixes on commit — do not fight the formatter. ### Ruff rules in effect `B`, `C4`, `E`, `F`, `G`, `I`, `ISC`, `INP`, `N`, `PERF`, `Q`, `SIM`. Explicitly ignored: | Rule | Reason | |---|---| | `G004` | f-strings in logging calls are allowed | | `ISC003` | implicit string concatenation across lines is allowed | | `PERF203` | `try/except` in loops allowed in `conftest.py` only | ### Style - Every non-test module must have `__all__`, `__author__`, `__copyright__`, `__license__` at module level. - Logger: always `logging.getLogger("safezip.security")`. Never use `__name__`. - Log member names truncated to 256 characters in `extra` dicts (privacy). - Always chain exceptions: `raise X(...) from exc`. - Type annotations on all public functions. Use `Optional[X]` (not `X | None`) to match the existing codebase. - `SecurityEvent` must never include member names, paths, or filesystem information — `event_type`, `archive_hash`, and `timestamp` only. ### Pull requests Target the `dev` branch only. Never open a PR directly to `main`. --- ## 9. 
Prompt Templates **Explaining usage to a user:** > You are an expert in secure Python file handling. Explain how to use safezip > for [task]. Start with secure defaults. Include exception handling. Note that > symlink entries are extracted as regular files, not OS symlinks. **Implementing a new feature:** > Extend safezip with [feature]. Follow the AGENTS.md agent workflow (section 6): > identify the correct phase, implement, add tests verifying atomicity and events, > update README. Preserve zero external dependencies and secure defaults. **Fixing a bug:** > Reproduce [bug] with a new programmatic fixture in conftest.py. The test must > fail before the fix. Then fix in the correct phase file. Add tests asserting > the correct exception, no partial files on disk, and that legitimate archives > still extract successfully. **Reviewing a change:** > Review this safezip change against AGENTS.md: Does it preserve zero deps? > Does it maintain the three-phase model? Does it follow the atomic write > contract? Are all new checks tested with both violation and legitimate inputs? conftest.py =========== conftest.py """ Pytest fixtures for documentation testing. DO NOT ADD OTHER FIXTURES HERE. """ import io import zipfile from pathlib import Path import pytest @pytest.fixture() def file_zip(tmp_path): """A valid ZIP file named file.zip.""" buf = io.BytesIO() with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf: zf.writestr("hello.txt", b"Hello, world!\n") p = Path("path/to") / "file.zip" p.write_bytes(buf.getvalue()) return p @pytest.fixture() def nested_file_zip(tmp_path): """archive.zip containing readme.txt and data.zip (which contains report.csv). 
Matches the README 'Recursive extraction' example exactly:: archive.zip readme.txt data.zip report.csv """ inner_buf = io.BytesIO() with zipfile.ZipFile(inner_buf, "w", compression=zipfile.ZIP_DEFLATED) as zf: zf.writestr("report.csv", b"id,value\n1,100\n") buf = io.BytesIO() with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf: zf.writestr("readme.txt", b"Archive readme\n") zf.writestr("data.zip", inner_buf.getvalue()) p = Path("path/to") / "archive.zip" p.write_bytes(buf.getvalue()) return p docker-compose.yml ================== docker-compose.yml services: tox: build: . volumes: - ./htmlcov:/app/htmlcov pyproject.toml ============== pyproject.toml [project] name = "safezip" description = "Hardened ZIP extraction for Python - secure by default." readme = "README.rst" version = "0.1.6" requires-python = ">=3.10" dependencies = [] authors = [ { name = "Artur Barseghyan", email = "artur.barseghyan@gmail.com" }, ] maintainers = [ { name = "Artur Barseghyan", email = "artur.barseghyan@gmail.com" }, ] license = "MIT" classifiers = [ "Development Status :: 4 - Beta", "Intended Audience :: Developers", "Operating System :: OS Independent", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.13", "Programming Language :: Python :: 3.14", "Programming Language :: Python :: 3.15", "Programming Language :: Python", "Topic :: Security", "Topic :: Software Development :: Libraries :: Python Modules", "Topic :: System :: Archiving :: Compression", ] keywords = [ "zip", "security", "zipslip", "zipbomb", "hardened", "safe", ] [project.scripts] safezip = "safezip.cli:main" [project.urls] Homepage = "https://github.com/barseghyanartur/safezip/" Repository = "https://github.com/barseghyanartur/safezip/" Issues = "https://github.com/barseghyanartur/safezip/issues" [project.optional-dependencies] all = ["safezip[dev,test,docs,build]"] dev = [ 
"detect-secrets", "doc8", "ipython", "mypy", "ruff", "uv", ] test = [ "pytest", "pytest-cov", "pytest-codeblock", ] docs = [ "sphinx", "sphinx-autobuild", "sphinx-rtd-theme>=1.3.0", "sphinx-no-pragma", "sphinx-markdown-builder", "sphinx-llms-txt-link", "sphinx-source-tree", ] build = [ "build", "twine", "wheel", ] [tool.setuptools] package-dir = {"" = "src"} [tool.setuptools.packages.find] where = ["src"] include = ["safezip", "safezip.*"] [build-system] requires = ["setuptools>=41.0", "wheel"] build-backend = "setuptools.build_meta" [tool.ruff] line-length = 88 lint.select = [ "B", "C4", "E", "F", "G", "I", "ISC", "INP", "N", "PERF", "Q", "SIM", ] lint.ignore = [ "G004", "ISC003", ] fix = true src = ["src/safezip"] exclude = [ ".bzr", ".direnv", ".eggs", ".git", ".hg", ".mypy_cache", ".nox", ".pants.d", ".ruff_cache", ".svn", ".tox", ".venv", "__pypackages__", "_build", "buck-out", "build", "dist", "node_modules", "venv", "docs", ] target-version = "py310" # Allow unused variables when underscore-prefixed. 
lint.dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" [tool.ruff.lint.isort] known-first-party = ["safezip"] [tool.ruff.lint.per-file-ignores] "conftest.py" = [ "PERF203" # Allow `try`-`except` within a loop incurs performance overhead ] [tool.doc8] ignore-path = [ "docs/requirements.txt", "src/safezip.egg-info/SOURCES.txt", ] [tool.pytest.ini_options] addopts = [ "-ra", "-vvv", "-q", "--cov=safezip", "--ignore=.tox", "--cov-report=html", "--cov-report=term", "--cov-append", "--capture=no", ] testpaths = [ "src/safezip/tests", ".", "**/*.rst", "**/*.md", ] pythonpath = ["src"] norecursedirs = [".git", ".tox"] [tool.coverage.run] relative_files = true omit = [".tox/*"] source = ["safezip"] [tool.coverage.report] show_missing = true exclude_lines = [ "pragma: no cover", "@overload", ] [tool.mypy] check_untyped_defs = true warn_unused_ignores = true warn_redundant_casts = true warn_unused_configs = true ignore_missing_imports = true [tool.sphinx-source-tree] ignore = [ "*.egg-info", "*.py,cover", "*.pyc", "*.pyo", ".DS_Store", ".coverage", ".coverage.*", ".git", ".hg", ".hypothesis", ".idea", ".mypy_cache", ".nox", ".pre-commit-config.yaml", ".pre-commit-hooks.yaml", ".pytest_cache", ".readthedocs.yaml", ".ruff_cache", ".secrets.baseline", ".svn", ".tox", ".venv", ".vscode", "CHANGELOG.rst", "CODE_OF_CONDUCT.rst", "LICENSE", "SECURITY.rst", "Thumbs.db", "__pycache__", "build", "codebin", "dist", "docs/Makefile", "docs/_build", "docs/_static", "docs/changelog.rst", "docs/code_of_conduct.rst", "docs/customization", "docs/make.bat", "docs/requirements.txt", "docs/security.rst", "docs/source_tree.rst", "docs/source_tree_full.rst", "env", "htmlcov", "node_modules", "venv", "ARCHITECTURE.rst", ".coderabbit.yaml", ".coveralls", "docs/full-llms.rst", "docs/llms.rst", "docs/contributor_guidelines.rst", "docs/package.rst", "docs/documentation.rst", "docs/index.rst", ] order = [ "README.rst", "CONTRIBUTING.rst", "AGENTS.md", ] [[tool.sphinx-source-tree.files]] output 
= "docs/full_llms.rst" title = "Full project source-tree" [[tool.sphinx-source-tree.files]] output = "docs/llms.rst" title = "Project source-tree" ignore = [ "*.egg-info", "*.py,cover", "*.pyc", "*.pyo", ".DS_Store", ".coverage", ".coverage.*", ".git", ".hg", ".hypothesis", ".idea", ".mypy_cache", ".nox", ".pre-commit-config.yaml", ".pre-commit-hooks.yaml", ".pytest_cache", ".readthedocs.yaml", ".ruff_cache", ".secrets.baseline", ".svn", ".tox", ".venv", ".vscode", "CHANGELOG.rst", "CODE_OF_CONDUCT.rst", "LICENSE", "SECURITY.rst", "Thumbs.db", "__pycache__", "build", "codebin", "dist", "docs/Makefile", "docs/_build", "docs/_static", "docs/changelog.rst", "docs/code_of_conduct.rst", "docs/customization", "docs/make.bat", "docs/requirements.txt", "docs/security.rst", "docs/source_tree.rst", "docs/source_tree_full.rst", "env", "htmlcov", "node_modules", "venv", "examples", "docs", "ARCHITECTURE.rst", ".coderabbit.yaml", ".coveralls", "docs/full-llms.rst", "docs/llms.rst", "docs/contributor_guidelines.rst", "docs/package.rst", "docs/documentation.rst", "docs/index.rst", ] src/safezip/__init__.py ======================= src/safezip/__init__.py """safezip - Hardened ZIP extraction for Python.""" from ._core import SafeZipFile, safe_extract from ._events import SecurityEvent, SymlinkPolicy from ._exceptions import ( CompressionRatioError, FileCountExceededError, FileSizeExceededError, MalformedArchiveError, NestingDepthError, SafezipError, TotalSizeExceededError, UnsafeZipError, ) __title__ = "safezip" __version__ = "0.1.6" __author__ = "Artur Barseghyan " __copyright__ = "2026 Artur Barseghyan" __license__ = "MIT" __all__ = ( # Core "SafeZipFile", "safe_extract", # Events / policy "SecurityEvent", "SymlinkPolicy", # Exceptions "SafezipError", "UnsafeZipError", "FileSizeExceededError", "TotalSizeExceededError", "CompressionRatioError", "FileCountExceededError", "NestingDepthError", "MalformedArchiveError", ) src/safezip/_core.py ==================== src/safezip/_core.py 
"""SafeZipFile: the public hardened wrapper around zipfile.ZipFile.""" import hashlib import logging import os import stat import zipfile from contextlib import suppress from pathlib import Path from typing import BinaryIO, Optional, Union from ._events import SecurityEvent, SecurityEventCallback, SymlinkPolicy from ._exceptions import ( CompressionRatioError, FileCountExceededError, FileSizeExceededError, MalformedArchiveError, NestingDepthError, TotalSizeExceededError, UnsafeZipError, ) from ._guard import validate_archive from ._sandbox import check_symlink, resolve_member_path from ._streamer import CumulativeCounters, stream_extract_member __author__ = "Artur Barseghyan " __copyright__ = "2026 Artur Barseghyan" __license__ = "MIT" __all__ = ( "SafeZipFile", "safe_extract", ) log = logging.getLogger("safezip.security") _ARCHIVE_EXTENSIONS = frozenset( {".zip", ".jar", ".war", ".ear", ".apk", ".aar", ".whl", ".egg"} ) def _archive_stem(name: str) -> str: """Strip the archive extension from *name*, returning the base stem. Handles single extensions only (ZIP archives do not use compound extensions like .tar.gz), but normalises consistently. 
Examples:: archive.zip → archive lib.whl → lib app.jar → app data.csv → data.csv (non-archive extension unchanged) """ p = Path(name) if p.suffix.lower() in _ARCHIVE_EXTENSIONS: return p.stem return name def _env_int(name: str, default: int) -> int: val = os.environ.get(name) if val is None: return default try: return int(val) except ValueError: return default def _env_float(name: str, default: float) -> float: val = os.environ.get(name) if val is None: return default try: return float(val) except ValueError: return default def _env_bool(name: str, default: bool) -> bool: val = os.environ.get(name) if val is None: return default if val.lower() in ("1", "true", "yes", "on"): return True if val.lower() in ("0", "false", "no", "off"): return False log.warning( "Ignoring unrecognised %s value %r; using default %r.", name, val, default, ) return default def _sanitise_mode(path: Path, *, strip_special_bits: bool = True) -> None: """Strip setuid/setgid/sticky bits from *path* if requested.""" if not strip_special_bits: return try: current = path.stat().st_mode safe = current & ~(stat.S_ISUID | stat.S_ISGID | stat.S_ISVTX) if safe != current: os.chmod(path, safe) except OSError: pass # best-effort; extraction already succeeded def _env_symlink_policy(default: SymlinkPolicy) -> SymlinkPolicy: """Read SAFEZIP_SYMLINK_POLICY from the environment. Accepted values (case-insensitive): ``reject``, ``ignore``, ``resolve_internal``. Any other value is logged and ignored. 
""" val = os.environ.get("SAFEZIP_SYMLINK_POLICY") if val is None: return default mapping = { "reject": SymlinkPolicy.REJECT, "ignore": SymlinkPolicy.IGNORE, "resolve_internal": SymlinkPolicy.RESOLVE_INTERNAL, } resolved = mapping.get(val.lower()) if resolved is None: log.warning( "Ignoring unrecognised SAFEZIP_SYMLINK_POLICY value %r; using default %r.", val, default.value, ) return default return resolved _DEFAULT_MAX_FILE_SIZE: int = _env_int("SAFEZIP_MAX_FILE_SIZE", 1 * 1024**3) _DEFAULT_MAX_TOTAL_SIZE: int = _env_int("SAFEZIP_MAX_TOTAL_SIZE", 5 * 1024**3) _DEFAULT_MAX_FILES: int = _env_int("SAFEZIP_MAX_FILES", 10_000) _DEFAULT_MAX_PER_MEMBER_RATIO: float = _env_float("SAFEZIP_MAX_PER_MEMBER_RATIO", 200.0) _DEFAULT_MAX_TOTAL_RATIO: float = _env_float("SAFEZIP_MAX_TOTAL_RATIO", 200.0) _DEFAULT_MAX_NESTING_DEPTH: int = _env_int("SAFEZIP_MAX_NESTING_DEPTH", 3) _DEFAULT_SYMLINK_POLICY: SymlinkPolicy = _env_symlink_policy(SymlinkPolicy.REJECT) _DEFAULT_RECURSIVE: bool = _env_bool("SAFEZIP_RECURSIVE", False) def _archive_hash(file: Union[str, os.PathLike, BinaryIO]) -> str: """Return first 16 hex characters of SHA-256 of archive content (first 64 KiB). Content-based hashing ensures different files at the same path produce different hashes in SecurityEvent records. """ h = hashlib.sha256() if isinstance(file, (str, os.PathLike)): try: with open(file, "rb") as fh: h.update(fh.read(65536)) except OSError: h.update(str(file).encode()) return h.hexdigest()[:16] pos = file.tell() try: h.update(file.read(65536)) finally: with suppress(OSError): file.seek(pos) return h.hexdigest()[:16] class SafeZipFile: """A hardened, composition-based wrapper around :class:`zipfile.ZipFile`. All defences are enabled by default. Limits can be relaxed by passing explicit constructor arguments or by setting environment variables. .. note:: This class intentionally does **not** expose ``open()``, ``read()``, or any write-mode methods from the underlying ``zipfile.ZipFile``. 
Callers needing lower-level access must use ``zipfile.ZipFile`` directly, accepting the associated risks. """ def __init__( self, file: Union[str, os.PathLike, BinaryIO], mode: str = "r", *, max_file_size: Optional[int] = None, max_total_size: Optional[int] = None, max_files: Optional[int] = None, max_per_member_ratio: Optional[float] = None, max_total_ratio: Optional[float] = None, max_nesting_depth: Optional[int] = None, symlink_policy: Optional[SymlinkPolicy] = None, password: Optional[bytes] = None, on_security_event: SecurityEventCallback = None, _nesting_depth: int = 0, recursive: Optional[bool] = None, strip_special_bits: bool = True, ) -> None: # Resolve limits: constructor arg > env var > module-level default # Env vars are read at runtime to support test monkeypatching self._max_file_size = ( max_file_size if max_file_size is not None else _env_int("SAFEZIP_MAX_FILE_SIZE", _DEFAULT_MAX_FILE_SIZE) ) self._max_total_size = ( max_total_size if max_total_size is not None else _env_int("SAFEZIP_MAX_TOTAL_SIZE", _DEFAULT_MAX_TOTAL_SIZE) ) self._max_files = ( max_files if max_files is not None else _env_int("SAFEZIP_MAX_FILES", _DEFAULT_MAX_FILES) ) self._max_per_member_ratio = ( max_per_member_ratio if max_per_member_ratio is not None else _env_float( "SAFEZIP_MAX_PER_MEMBER_RATIO", _DEFAULT_MAX_PER_MEMBER_RATIO ) ) self._max_total_ratio = ( max_total_ratio if max_total_ratio is not None else _env_float("SAFEZIP_MAX_TOTAL_RATIO", _DEFAULT_MAX_TOTAL_RATIO) ) self._max_nesting_depth = ( max_nesting_depth if max_nesting_depth is not None else _env_int("SAFEZIP_MAX_NESTING_DEPTH", _DEFAULT_MAX_NESTING_DEPTH) ) self._symlink_policy = ( symlink_policy if symlink_policy is not None else _env_symlink_policy(_DEFAULT_SYMLINK_POLICY) ) self._recursive = ( recursive if recursive is not None else _env_bool("SAFEZIP_RECURSIVE", _DEFAULT_RECURSIVE) ) self._strip_special_bits = strip_special_bits self._password = password self._on_security_event = on_security_event 
self._archive_hash = _archive_hash(file) self._nesting_depth = _nesting_depth if _nesting_depth > self._max_nesting_depth: self._emit_event("nesting_depth_exceeded") log.warning( "Nesting depth limit exceeded", extra={ "event": "nesting_depth_exceeded", "nesting_depth": _nesting_depth, "max_nesting_depth": self._max_nesting_depth, "archive_hash": self._archive_hash, }, ) raise NestingDepthError( f"Nested archive depth {_nesting_depth} exceeds " f"max_nesting_depth={self._max_nesting_depth}." ) try: self._zf = zipfile.ZipFile(file, mode) except zipfile.BadZipFile as exc: raise MalformedArchiveError(f"Cannot open archive: {exc}") from exc # Run the Guard immediately on open try: validate_archive( self._zf, self._max_files, self._max_file_size, self._max_total_size ) except FileCountExceededError: self._emit_event("file_count_exceeded") raise except FileSizeExceededError: self._emit_event("declared_size_exceeded") raise except MalformedArchiveError: self._emit_event("malformed_archive") raise # ------------------------------------------------------------------ # Context manager # ------------------------------------------------------------------ def __enter__(self) -> "SafeZipFile": return self def __exit__(self, *args: object) -> None: self.close() def close(self) -> None: """Close the underlying archive.""" self._zf.close() # ------------------------------------------------------------------ # Read-only inspection (safe subset of zipfile.ZipFile) # ------------------------------------------------------------------ def namelist(self) -> list: """Return a list of archive member names.""" return self._zf.namelist() def infolist(self) -> list: """Return a list of ZipInfo objects for all archive members.""" return self._zf.infolist() def getinfo(self, name: str) -> zipfile.ZipInfo: """Return a ZipInfo object for *name*.""" return self._zf.getinfo(name) # ------------------------------------------------------------------ # Extraction # 
------------------------------------------------------------------ def extract( self, member: Union[str, zipfile.ZipInfo], path: Union[str, os.PathLike], *, pwd: Optional[bytes] = None, ) -> str: """Safely extract a single *member* to *path*. :param member: Member name string or ZipInfo object. :param path: Destination directory (required; no default). :param pwd: Optional decryption password. :returns: The path to the extracted file as a string. :raises UnsafeZipError: On path traversal, absolute paths, or symlinks. :raises FileSizeExceededError: If the member is too large. :raises CompressionRatioError: If the compression ratio is too high. :raises TypeError: If path is None. """ if path is None: raise TypeError( "SafeZipFile.extract() requires an explicit 'path' argument." ) base = Path(path).resolve() counters = CumulativeCounters() info = ( member if isinstance(member, zipfile.ZipInfo) else self._zf.getinfo(member) ) dest = self._extract_one(info, base, counters, pwd or self._password) return str(dest) def extractall( self, path: Union[str, os.PathLike], members: Optional[list] = None, *, pwd: Optional[bytes] = None, ) -> None: """Safely extract all (or selected) members to *path*. :param path: Destination directory (required; no default). :param members: Optional list of member names or ZipInfo objects. :param pwd: Optional decryption password. :raises UnsafeZipError: On path traversal, absolute paths, or symlinks. :raises FileSizeExceededError: If any member is too large. :raises TotalSizeExceededError: If total extracted size is too large. :raises CompressionRatioError: If any ratio limit is exceeded. :raises TypeError: If path is None. """ if path is None: raise TypeError( "SafeZipFile.extractall() requires an explicit 'path' argument; " "extraction to the current working directory is not permitted." 
) base = Path(path).resolve() counters = CumulativeCounters() effective_pwd = pwd or self._password if members is None: infos = self._zf.infolist() else: infos = [ m if isinstance(m, zipfile.ZipInfo) else self._zf.getinfo(m) for m in members ] for info in infos: self._extract_one(info, base, counters, effective_pwd) def _extract_one( self, info: zipfile.ZipInfo, base: Path, counters: CumulativeCounters, pwd: Optional[bytes], ) -> Path: """Core per-member extraction logic.""" # Directories - create and skip streaming if info.filename.endswith("/"): dest = resolve_member_path(base, info.filename.rstrip("/")) dest.mkdir(parents=True, exist_ok=True) return dest # Validate and resolve the destination path (Sandbox / Phase B) try: dest = resolve_member_path(base, info.filename) except UnsafeZipError: self._emit_event("zip_slip_detected") log.warning( "Path traversal attempt blocked", extra={ "event": "zip_slip_detected", "member": info.filename[:256], "archive_hash": self._archive_hash, }, ) raise # Check for symlinks in the *source* entry # (detect if the ZIP entry itself is stored as a symlink) attr = (info.external_attr >> 16) & 0xFFFF is_symlink_entry = bool(attr and stat.S_ISLNK(attr)) if is_symlink_entry: if self._symlink_policy == SymlinkPolicy.REJECT: self._emit_event("symlink_rejected") log.warning( "Symlink entry rejected", extra={ "event": "symlink_rejected", "member": info.filename[:256], "archive_hash": self._archive_hash, }, ) raise UnsafeZipError( f"Symlink entry {info.filename!r} rejected (symlink_policy=REJECT)." 
) if self._symlink_policy == SymlinkPolicy.IGNORE: self._emit_event("symlink_ignored") log.warning( "Symlink entry skipped (IGNORE policy)", extra={ "event": "symlink_ignored", "member": info.filename[:256], "archive_hash": self._archive_hash, }, ) return dest # Nested archive guard suffix = Path(info.filename).suffix.lower() is_archive_extension = suffix in _ARCHIVE_EXTENSIONS # Non-recursive: keep the debug log but don't gate on content if not self._recursive: if is_archive_extension: log.debug( "Nested archive detected: %r - extracting as raw file," " not recursing.", info.filename, ) else: # Recursive path: stream to temp first, then content-detect tmp = dest.parent / ( f"{dest.name}.safezip_tmp_{os.getpid()}_{os.urandom(4).hex()}" ) try: try: stream_extract_member( self._zf, info, tmp, max_file_size=self._max_file_size, max_per_member_ratio=self._max_per_member_ratio, max_total_size=self._max_total_size, max_total_ratio=self._max_total_ratio, counters=counters, pwd=pwd, ) except FileSizeExceededError: self._emit_event("file_size_exceeded") log.warning( "Member size limit exceeded during streaming", extra={ "event": "file_size_exceeded", "member": info.filename[:256], "archive_hash": self._archive_hash, }, ) raise except TotalSizeExceededError: self._emit_event("total_size_exceeded") log.warning( "Cumulative extraction size limit exceeded during streaming", extra={ "event": "total_size_exceeded", "member": info.filename[:256], "archive_hash": self._archive_hash, }, ) raise except CompressionRatioError: self._emit_event("compression_ratio_exceeded") log.warning( "Compression ratio limit exceeded during streaming", extra={ "event": "compression_ratio_exceeded", "member": info.filename[:256], "archive_hash": self._archive_hash, }, ) raise # Content-based detection (avoids extension-spoofing) if zipfile.is_zipfile(tmp): nested_dest = dest.parent / _archive_stem(dest.name) nested_dest.mkdir(parents=True, exist_ok=True) with SafeZipFile( tmp, 
max_file_size=self._max_file_size, max_total_size=self._max_total_size, max_files=self._max_files, max_per_member_ratio=self._max_per_member_ratio, max_total_ratio=self._max_total_ratio, max_nesting_depth=self._max_nesting_depth, symlink_policy=self._symlink_policy, password=self._password, on_security_event=self._on_security_event, recursive=True, _nesting_depth=self._nesting_depth + 1, ) as nested_zf: nested_zf.extractall(nested_dest, pwd=pwd) return nested_dest else: # Not a ZIP — rename temp to final destination as a regular file tmp.replace(dest) return dest finally: tmp.unlink(missing_ok=True) # Stream-extract with all runtime monitors (Phase C) try: stream_extract_member( self._zf, info, dest, max_file_size=self._max_file_size, max_per_member_ratio=self._max_per_member_ratio, max_total_size=self._max_total_size, max_total_ratio=self._max_total_ratio, counters=counters, pwd=pwd, ) except FileSizeExceededError: self._emit_event("file_size_exceeded") log.warning( "Member size limit exceeded during streaming", extra={ "event": "file_size_exceeded", "member": info.filename[:256], "archive_hash": self._archive_hash, }, ) raise except TotalSizeExceededError: self._emit_event("total_size_exceeded") log.warning( "Cumulative extraction size limit exceeded during streaming", extra={ "event": "total_size_exceeded", "member": info.filename[:256], "archive_hash": self._archive_hash, }, ) raise except CompressionRatioError: self._emit_event("compression_ratio_exceeded") log.warning( "Compression ratio limit exceeded during streaming", extra={ "event": "compression_ratio_exceeded", "member": info.filename[:256], "archive_hash": self._archive_hash, }, ) raise # Post-extraction permission sanitisation if not dest.is_symlink(): _sanitise_mode(dest, strip_special_bits=self._strip_special_bits) # Post-extraction symlink check (RESOLVE_INTERNAL policy) if dest.is_symlink() and self._symlink_policy == SymlinkPolicy.RESOLVE_INTERNAL: skip = check_symlink(dest, base, 
self._symlink_policy) if skip: dest.unlink(missing_ok=True) return dest def _emit_event(self, event_type: str) -> None: """Emit a SecurityEvent to the configured callback (if any).""" if self._on_security_event is None: return event = SecurityEvent( event_type=event_type, archive_hash=self._archive_hash, ) try: self._on_security_event(event) except Exception: log.exception( "on_security_event callback raised an exception " "(event_type=%r); suppressing to preserve security " "enforcement.", event_type, ) def safe_extract( archive: Union[str, os.PathLike, BinaryIO], destination: Union[str, os.PathLike], **kwargs, ) -> None: """ Convenience function: extract *archive* to *destination* using safe defaults. All keyword arguments are forwarded to :class:`SafeZipFile`. :param archive: Path to the ZIP file, or a file-like binary object. :param destination: Directory to extract into. """ with SafeZipFile(archive, **kwargs) as zf: zf.extractall(destination) src/safezip/_events.py ====================== src/safezip/_events.py """Security event types and symlink policy enum.""" import time from dataclasses import dataclass, field from enum import Enum from typing import Callable, Optional __author__ = "Artur Barseghyan " __copyright__ = "2026 Artur Barseghyan" __license__ = "MIT" __all__ = ( "SecurityEvent", "SymlinkPolicy", "SecurityEventCallback", ) class SymlinkPolicy(Enum): """Controls how symlink members in archives are handled.""" REJECT = "reject" """Any symlink entry raises UnsafeZipError (default).""" IGNORE = "ignore" """Symlink entries are skipped without raising; a warning is logged and a 'symlink_ignored' event is emitted.""" RESOLVE_INTERNAL = "resolve_internal" """Symlink entries are extracted as regular files containing the raw link-target bytes. No OS symlink is created on disk.""" @dataclass class SecurityEvent: """Minimal, privacy-safe payload emitted to the on_security_event callback.
Deliberately excludes filenames, paths, and member names so that forwarding this to a third-party service cannot leak confidential filesystem information. """ event_type: str """Short string identifying what happened, e.g. 'zip_slip_detected'.""" archive_hash: str """First 16 hex characters of the SHA-256 hash of the archive content (first 64 KiB).""" timestamp: float = field(default_factory=time.time) """Wall-clock time at the moment of detection (time.time()).""" # Type alias for the optional callback SecurityEventCallback = Optional[Callable[[SecurityEvent], None]] src/safezip/_exceptions.py ========================== src/safezip/_exceptions.py """Exception hierarchy for safezip.""" __author__ = "Artur Barseghyan " __copyright__ = "2026 Artur Barseghyan" __license__ = "MIT" __all__ = ( "SafezipError", "UnsafeZipError", "FileSizeExceededError", "TotalSizeExceededError", "CompressionRatioError", "FileCountExceededError", "NestingDepthError", "MalformedArchiveError", ) class SafezipError(Exception): """Base class for all safezip security exceptions.""" class UnsafeZipError(SafezipError): """Path traversal, absolute paths, or disallowed symlinks detected.""" class FileSizeExceededError(SafezipError): """A single member's decompressed size exceeds max_file_size.""" class TotalSizeExceededError(SafezipError): """Cumulative decompressed size across all members exceeds max_total_size.""" class CompressionRatioError(SafezipError): """Compression ratio exceeds the configured limit (per-member or total).""" class FileCountExceededError(SafezipError): """Archive entry count exceeds max_files.""" class NestingDepthError(SafezipError): """Nested archive depth exceeds max_nesting_depth.""" class MalformedArchiveError(SafezipError): """Archive is structurally invalid (ZIP64 inconsistency, count mismatch, etc.).""" src/safezip/_guard.py ===================== src/safezip/_guard.py """Phase A: pre-extraction static validation (the Guard).""" import logging import mmap import os import struct
import tempfile import zipfile from contextlib import suppress from dataclasses import dataclass, field from typing import IO, BinaryIO, List, Optional, Tuple from ._exceptions import ( FileCountExceededError, FileSizeExceededError, MalformedArchiveError, ) log = logging.getLogger("safezip.security") __author__ = "Artur Barseghyan " __copyright__ = "2026 Artur Barseghyan" __license__ = "MIT" __all__ = ("validate_archive",) _ZIP64_EXTRA_TAG = 0x0001 _ZIP64_SENTINEL = 0xFFFFFFFF @dataclass class ScanResult: """Three-valued outcome of inspecting a zip file for overlapping records.""" is_bomb: Optional[bool] invalid_reason: Optional[str] = None overlap_detail: Optional[str] = None @classmethod def clean(cls) -> "ScanResult": return cls(is_bomb=False) @classmethod def bomb(cls, detail: str) -> "ScanResult": return cls(is_bomb=True, overlap_detail=detail) @classmethod def invalid(cls, reason: str) -> "ScanResult": return cls(is_bomb=None, invalid_reason=reason) # --------------------------------------------------------------------------- # Comprehensive Zip Bomb Detection (Fifield 2019) # --------------------------------------------------------------------------- ZIP64_EXTRA_ID = 0x0001 COMPRESS_STORED = 0 COMPRESS_DEFLATE = 8 COMPRESS_BZIP2 = 12 SENTINEL_32 = 0xFFFFFFFF SENTINEL_16 = 0xFFFF @dataclass class Config: max_aggregate_ratio: float = 10000.0 # Very high; let Streamer handle ratio checks max_total_uncompressed_bytes: int = ( 10 * 1024**3 ) # 10 GiB; above SafeZipFile default max_file_count: int = 100_000 # Above SafeZipFile default of 10_000 max_deflate_ratio: float = 1_032.0 max_bzip2_ratio: float = 1_434_375.0 @dataclass class FileEntry: filename: str header_offset: int compressed_size: int uncompressed_size: int compress_type: int cdh_extra_len: int = 0 lfh_extra_len: int = -1 data_start: int = 0 data_end: int = 0 @dataclass class Issue: kind: str detail: str @dataclass class DetectionResult: is_bomb: bool = False issues: List[Issue] = 
field(default_factory=list) compression_ratio: float = 0.0 total_uncompressed: int = 0 file_count: int = 0 zip_size: int = 0 zip64: bool = False def _find_eocd(mm: mmap.mmap, file_size: int) -> int: sig = b"PK\x05\x06" search_start = max(0, file_size - 65535 - 22) mm.seek(search_start) tail = mm.read(file_size - search_start) pos = tail.rfind(sig) return search_start + pos if pos != -1 else -1 def _read_eocd(mm: mmap.mmap, file_size: int) -> Tuple[int, int, bool]: eocd_pos = _find_eocd(mm, file_size) if eocd_pos == -1: raise ValueError("No End of Central Directory record found") mm.seek(eocd_pos) eocd = mm.read(22) if len(eocd) < 22: raise ValueError("Truncated EOCD") cd_count_16 = struct.unpack_from("= 20: mm.seek(eocd_pos - 20) locator = mm.read(20) if locator[:4] == b"PK\x06\x07": zip64_eocd_offset = struct.unpack_from("= 56 and eocd64[:4] == b"PK\x06\x06": cd_count_64 = struct.unpack_from(" dict: result: dict = {} i = 0 while i + 4 <= len(extra_bytes): hdr_id = struct.unpack_from(" Tuple[List[FileEntry], bool]: cd_offset, cd_count, is_zip64 = _read_eocd(mm, file_size) entries: List[FileEntry] = [] mm.seek(cd_offset) cdh_sig = b"PK\x01\x02" for _ in range(cd_count): header = mm.read(46) if len(header) < 46: raise ValueError( f"Truncated central directory header: expected 46 bytes, " f"got {len(header)}" ) if header[:4] != cdh_sig: raise ValueError( f"Invalid central directory header signature: " f"expected {cdh_sig!r}, got {header[:4]!r}" ) compress_type = struct.unpack_from(" None: lfh_sig = b"PK\x03\x04" file_size = mm.size() for e in entries: if e.header_offset + LFH_FIXED > file_size: e.data_start = e.header_offset e.data_end = e.header_offset + e.compressed_size continue mm.seek(e.header_offset) lfh = mm.read(LFH_FIXED) if len(lfh) < LFH_FIXED or lfh[:4] != lfh_sig: e.data_start = e.header_offset e.data_end = e.header_offset + e.compressed_size continue lfh_fname_len = struct.unpack_from(" List[Tuple[FileEntry, FileEntry]]: if not entries: return [] 
sorted_e = sorted(entries, key=lambda e: e.data_start) overlaps: List[Tuple[FileEntry, FileEntry]] = [] max_end = sorted_e[0].data_end max_end_entry = sorted_e[0] for e in sorted_e[1:]: if e.data_start < max_end: overlaps.append((max_end_entry, e)) if e.data_end > max_end: max_end = e.data_end max_end_entry = e return overlaps def check_extra_field_quoting(entries: List[FileEntry]) -> List[FileEntry]: if not entries: return [] sorted_e = sorted(entries, key=lambda e: e.header_offset) suspicious: List[FileEntry] = [] for i, e in enumerate(sorted_e[:-1]): next_e = sorted_e[i + 1] eff_extra = e.lfh_extra_len if e.lfh_extra_len >= 0 else e.cdh_extra_len if eff_extra > 0 and e.data_start >= next_e.header_offset: suspicious.append(e) return suspicious def check_compression_ratios( entries: List[FileEntry], cfg: Config ) -> List[Tuple[FileEntry, float]]: suspicious = [] for e in entries: if e.compressed_size <= 0: continue ratio = e.uncompressed_size / e.compressed_size limit = ( cfg.max_bzip2_ratio if e.compress_type == COMPRESS_BZIP2 else cfg.max_deflate_ratio ) if ratio > limit: suspicious.append((e, ratio)) return suspicious def detect_zip_bomb(path: str, cfg: Optional[Config] = None) -> DetectionResult: if cfg is None: cfg = Config() zip_size = os.path.getsize(path) result = DetectionResult(is_bomb=False, zip_size=zip_size) with open(path, "rb") as f, mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm: try: entries, is_zip64 = parse_central_directory(mm, zip_size) except (ValueError, struct.error) as exc: result.issues.append( Issue("parse_error", f"Could not parse central directory: {exc}") ) result.is_bomb = True return result result.zip64 = is_zip64 result.file_count = len(entries) try: resolve_data_intervals(mm, entries) except Exception: for e in entries: if e.data_start == 0: e.data_start = e.header_offset e.data_end = e.header_offset + e.compressed_size overlaps = check_overlapping_files(entries) if overlaps: has_full = any(a.header_offset == 
b.header_offset for a, b in overlaps) kind = "full_overlap" if has_full else "quoted_overlap" sample = [(a.filename, b.filename) for a, b in overlaps[:3]] result.issues.append( Issue( kind, f"Overlapping file data detected ({len(overlaps)} pair(s)). " f"Sample: {sample}. " f"Matches Fifield " f"{'full-overlap' if has_full else 'quoted_overlap (or giant-steps)'} " f"construction.", ) ) result.is_bomb = True extra_q = check_extra_field_quoting(entries) if extra_q: names = [e.filename for e in extra_q[:3]] result.issues.append( Issue( "extra_field_quoting", f"Extra-field quoting detected in {len(extra_q)} file(s): {names}. " "LFH extra fields enclose subsequent local file headers.", ) ) result.is_bomb = True total_uncompressed = sum(e.uncompressed_size for e in entries) result.total_uncompressed = total_uncompressed overall_ratio = total_uncompressed / zip_size if zip_size > 0 else 0.0 result.compression_ratio = overall_ratio if overall_ratio > cfg.max_aggregate_ratio: result.issues.append( Issue( "aggregate_ratio", f"Extreme aggregate compression ratio: {overall_ratio:,.0f}:1 " f"({total_uncompressed / 1024**3:.2f} GiB uncompressed from " f"{zip_size / 1024**2:.2f} MiB zip)", ) ) result.is_bomb = True if total_uncompressed > cfg.max_total_uncompressed_bytes: result.issues.append( Issue( "total_size", f"Total uncompressed size {total_uncompressed / 1024**3:.2f} GiB " f"exceeds limit of {cfg.max_total_uncompressed_bytes / 1024**3:.2f} GiB", ) ) result.is_bomb = True bad_ratios = check_compression_ratios(entries, cfg) if bad_ratios: worst_entry, worst_ratio = max(bad_ratios, key=lambda x: x[1]) cname = { COMPRESS_STORED: "stored", COMPRESS_DEFLATE: "DEFLATE", COMPRESS_BZIP2: "bzip2", }.get(worst_entry.compress_type, str(worst_entry.compress_type)) limit = ( cfg.max_bzip2_ratio if worst_entry.compress_type == COMPRESS_BZIP2 else cfg.max_deflate_ratio ) result.issues.append( Issue( "per_file_ratio", f"File '{worst_entry.filename}' ({cname}) ratio {worst_ratio:,.0f}:1 " f"exceeds the
{cname} theoretical maximum of {limit:,.0f}:1", ) ) result.is_bomb = True if result.file_count > cfg.max_file_count: result.issues.append( Issue( "file_count", f"Suspiciously high file count: {result.file_count:,} " f"(threshold {cfg.max_file_count:,})", ) ) result.is_bomb = True return result class ZipInspector: """Parses a zip file's structural records and checks for overlapping spans. Based on the approach described in David Fifield's zip bomb research: https://www.bamsoftware.com/hacks/zipbomb/ """ _SEARCH_BLOCK = 8192 def __init__(self, fileobj: BinaryIO, verbose: bool = False) -> None: self._fobj = fileobj self._verbose = verbose self._file_size: int = 0 self._record_spans: list[tuple[int, int]] = [] def scan(self) -> ScanResult: """Inspect the zip file and return a ScanResult.""" self._fobj.seek(0, os.SEEK_END) self._file_size = self._fobj.tell() self._record_spans = [] directory = self._locate_central_directory() if directory is None: return ScanResult.invalid("could not locate a valid central directory") num_entries, cd_byte_length, cd_offset = directory local_spans = self._walk_central_directory( num_entries, cd_byte_length, cd_offset ) if local_spans is None: return ScanResult.invalid( "central directory parse error or unsupported feature" ) return self._check_spans(local_spans) def _locate_central_directory(self) -> Optional[tuple[int, int, int]]: """Scan backwards through the file for a valid EOCD record.""" block = self._SEARCH_BLOCK cursor = self._file_size readback = 22 carry = b"" check_count = 1 while True: cursor -= readback if cursor < 0: return None self._fobj.seek(cursor, os.SEEK_SET) window = self._fobj.read(readback) + carry[:21] while check_count > 0: check_count -= 1 if ( window[check_count] == 0x50 and window[check_count + 1] == 0x4B and window[check_count + 2] == 0x05 and window[check_count + 3] == 0x06 ): result = self._validate_eocd( window[check_count + 4 : check_count + 22], cursor + check_count, ) if result is not None: return 
result carry = window readback = ((cursor - 1) & (block - 1)) + 1 check_count = readback def _validate_eocd( self, eocd_body: bytes, eocd_offset: int ) -> Optional[tuple[int, int, int]]: """Validate the EOCD record and handle Zip64 when needed.""" if len(eocd_body) < 18: return None raw = struct.unpack(" self._file_size: return None spans_scratch: list[tuple[int, int]] = [ (eocd_offset, eocd_offset + 22 + comment_len) ] entries_on_disk, total_entries, cd_length, cd_offset = raw[2:6] if ( entries_on_disk == 0xFFFF or total_entries == 0xFFFF or cd_length == 0xFFFFFFFF or cd_offset == 0xFFFFFFFF ): z64 = self._read_zip64_records(eocd_offset, spans_scratch) if z64 is None: return None total_entries, cd_length, cd_offset = z64 else: if total_entries != entries_on_disk: return None if cd_offset + cd_length > self._file_size: return None spans_scratch.append((cd_offset, cd_offset + cd_length)) spans_scratch.reverse() self._record_spans.extend(spans_scratch) return (total_entries, cd_length, cd_offset) def _read_zip64_records( self, eocd_offset: int, spans_scratch: list[tuple[int, int]], ) -> Optional[tuple[int, int, int]]: """Read the Zip64 locator and record.""" if eocd_offset < 20: return None self._fobj.seek(eocd_offset - 20, os.SEEK_SET) loc_sig, loc_disk, z64_eocd_offset, loc_total_disks = struct.unpack( " self._file_size ): return None spans_scratch.append((eocd_offset - 20, eocd_offset)) self._fobj.seek(z64_eocd_offset, os.SEEK_SET) z64 = struct.unpack(" Optional[list[tuple[int, int]]]: """Read every central directory header and resolve to local entry spans.""" self._fobj.seek(cd_offset, os.SEEK_SET) cd_bytes = self._fobj.read(cd_byte_length) local_spans: list[tuple[int, int]] = [] cursor = 0 remaining = num_entries while remaining > 0: if cursor + 46 > cd_byte_length: return None span = self._parse_cdh_entry(cd_bytes, cursor, cd_byte_length) if span is None: return None entry_span, next_cursor = span local_spans.append(entry_span) cursor = next_cursor remaining -= 
1 if cursor != cd_byte_length: return None return local_spans def _parse_cdh_entry( self, cd_bytes: bytes, offset: int, cd_length: int ) -> Optional[tuple[tuple[int, int], int]]: """Parse one central directory header and return local span.""" hdr = struct.unpack(" cd_length: return None compressed_size = hdr[8] uncompressed_size = hdr[9] disk_number = hdr[13] local_hdr_offset = hdr[16] if ( compressed_size == 0xFFFFFFFF or uncompressed_size == 0xFFFFFFFF or disk_number == 0xFFFF or local_hdr_offset == 0xFFFFFFFF ): z64_result = self._resolve_zip64_cdh_fields( cd_bytes, offset + fname_len, offset + fname_len + extra_len, compressed_size, uncompressed_size, disk_number, local_hdr_offset, ) if z64_result is None: return None ( compressed_size, uncompressed_size, disk_number, local_hdr_offset, ) = z64_result offset += fname_len + extra_len + comment_len else: offset += total_variable if disk_number != 0: return None if local_hdr_offset + 30 > self._file_size: return None local_end = self._measure_local_entry( local_hdr_offset, compressed_size, uncompressed_size, hdr[7], ) if local_end is None: return None return ((local_hdr_offset, local_end), offset) @staticmethod def _resolve_zip64_cdh_fields( cd_bytes: bytes, extra_start: int, extra_end: int, compressed_size: int, uncompressed_size: int, disk_number: int, local_hdr_offset: int, ) -> Optional[tuple[int, int, int, int]]: """Walk the extra field looking for Zip64 extended information block.""" pos = extra_start while pos + 4 <= extra_end: field_id, field_data_len = struct.unpack(" extra_end: return None field_end = pos + field_data_len if field_id != 0x0001: pos = field_end continue if uncompressed_size == 0xFFFFFFFF: if pos + 8 > field_end: return None uncompressed_size = struct.unpack(" field_end: return None compressed_size = struct.unpack(" field_end: return None local_hdr_offset = struct.unpack(" field_end: return None disk_number = struct.unpack(" Optional[int]: """Read the local file header and return the byte 
offset after this entry.""" self._fobj.seek(local_offset, os.SEEK_SET) raw = self._fobj.read(30) if len(raw) < 30: return None lfh = struct.unpack(" self._file_size: return None if flags & 0x08: descriptor_end = self._measure_data_descriptor( entry_end, expected_crc, compressed_size, uncompressed_size ) if descriptor_end is None: return None entry_end = descriptor_end return entry_end def _measure_data_descriptor( self, descriptor_offset: int, expected_crc: int, compressed_size: int, uncompressed_size: int, ) -> Optional[int]: """Determine the extent of the optional data descriptor.""" self._fobj.seek(descriptor_offset, os.SEEK_SET) raw = self._fobj.read(24) if len(raw) == 24: d = struct.unpack("= 20: d = struct.unpack("= 16: d = struct.unpack("= 12: d = struct.unpack(" ScanResult: """Merge local entry spans with structural spans and scan for overlaps.""" all_spans = local_spans + self._record_spans all_spans.sort() _, prev_end = all_spans[0] for span_start, span_end in all_spans[1:]: if prev_end > span_start: return ScanResult.bomb( f"records overlap: previous ends at {prev_end}, " f"next starts at {span_start}" ) prev_end = span_end return ScanResult.clean() def _run_overlap_detection(path: str, cfg: Optional[Config]) -> None: """Run detect_zip_bomb against a filesystem path and raise on positive.""" try: result = detect_zip_bomb(path, cfg) except Exception as exc: raise MalformedArchiveError( f"Failed to parse archive for overlap detection: {exc}" ) from exc if result.is_bomb: details = "; ".join(i.detail for i in result.issues[:2]) raise MalformedArchiveError(f"overlapping entries detected: {details}") def _check_overlapping_entries( fileobj: IO[bytes], cfg: Optional[Config] = None ) -> None: """Detect Fifield-style zip bombs using comprehensive detection. This function uses `detect_zip_bomb()` to analyse the archive for overlapping entries, extra-field quoting, and other Fifield 2019 attack vectors. 
For in-memory BinaryIO objects without a filesystem path, the archive is spilled to a temporary file to enable mmap-based detection. :param fileobj: A seekable binary file object. :param cfg: Optional Config with limits. If not provided, uses defaults. :raises MalformedArchiveError: If overlapping entries are detected. """ path = getattr(fileobj, "name", None) if path is not None: _run_overlap_detection(path, cfg) return # BinaryIO input: spill to a temporary file so mmap-based detection # can run. Save and restore position so the caller's zipfile.ZipFile # instance is not disturbed. try: pos = fileobj.tell() except OSError: pos = None try: try: fileobj.seek(0) except OSError: log.warning( "Skipping Fifield-style zip bomb detection: " "in-memory archive is not seekable." ) return tmp_path = None try: with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp: tmp_path = tmp.name tmp.write(fileobj.read()) _run_overlap_detection(tmp_path, cfg) finally: if tmp_path is not None: with suppress(OSError): os.unlink(tmp_path) finally: if pos is not None: with suppress(OSError): fileobj.seek(pos) def _check_zip64_consistency(info: zipfile.ZipInfo) -> None: """Detect ZIP64 inconsistencies and missing ZIP64 blocks. Two classes of problem are caught: 1. **Missing ZIP64 block**: A 32-bit field holds the sentinel value ``0xFFFFFFFF`` (meaning "look in ZIP64 extra field"), but no ZIP64 extra field is present. This is always a malformed archive. 2. **Disagreeing ZIP64 block**: A ZIP64 extra field is present, but the 64-bit value it reports differs from the size that Python's ``zipfile`` resolved from the central directory. A crafted archive can set the 32-bit field to a small non-sentinel value while hiding a huge size in the ZIP64 block; Python uses the small 32-bit value, but we see the discrepancy and reject the archive. 
""" if info.file_size == SENTINEL_32 or info.compress_size == SENTINEL_32: zip64 = _parse_zip64_extra(info.extra) if info.extra else {} if not zip64: raise MalformedArchiveError( f"Entry {info.filename!r} has a ZIP64 sentinel (0xFFFFFFFF) " f"in the 32-bit size field but no ZIP64 extra field is present. " f"Archive is malformed." ) return if not info.extra: return zip64 = _parse_zip64_extra(info.extra) if not zip64: return if "uncompressed_size" in zip64 and zip64["uncompressed_size"] != info.file_size: raise MalformedArchiveError( f"ZIP64 inconsistency in entry {info.filename!r}: " f"extra field reports uncompressed_size=" f"{zip64['uncompressed_size']}, " f"but central directory reports {info.file_size}. " f"Archive may be crafted." ) if "compressed_size" in zip64 and zip64["compressed_size"] != info.compress_size: raise MalformedArchiveError( f"ZIP64 inconsistency in entry {info.filename!r}: " f"extra field reports compressed_size=" f"{zip64['compressed_size']}, " f"but central directory reports {info.compress_size}. " f"Archive may be crafted." ) def _validate_entry(info: zipfile.ZipInfo, max_file_size: int) -> None: """Validate a single ZipInfo entry during the Guard phase.""" # Null bytes in filename if "\x00" in info.filename: raise MalformedArchiveError( f"Entry filename contains a null byte: {info.filename!r}" ) # ZIP64 consistency _check_zip64_consistency(info) # Declared size early-rejection (Streamer enforces at stream time too) if info.file_size > max_file_size: raise FileSizeExceededError( f"Entry {info.filename!r} declares uncompressed size " f"{info.file_size:,} bytes, which exceeds the limit of " f"{max_file_size:,} bytes." ) def validate_archive( zf: zipfile.ZipFile, max_files: int, max_file_size: int, max_total_size: int, ) -> None: """Phase A: run all pre-extraction static checks. :param zf: An open zipfile.ZipFile instance (read-only access). :param max_files: Maximum number of entries permitted. 
    :param max_file_size: Maximum permitted uncompressed size for any entry.
    :param max_total_size: Maximum permitted total uncompressed size.
    :raises FileCountExceededError: If the archive has too many entries.
    :raises FileSizeExceededError: If any entry's declared size is too large.
    :raises MalformedArchiveError: If structural anomalies are detected.
    """
    try:
        entries = zf.infolist()
    except Exception as exc:
        raise MalformedArchiveError(f"Cannot read central directory: {exc}") from exc

    if len(entries) > max_files:
        raise FileCountExceededError(
            f"Archive contains {len(entries):,} entries, "
            f"which exceeds the limit of {max_files:,}."
        )

    if zf.fp is not None:
        cfg = Config(
            max_total_uncompressed_bytes=max_total_size,
            max_file_count=max_files,
        )
        _check_overlapping_entries(zf.fp, cfg)

    for info in entries:
        _validate_entry(info, max_file_size)

src/safezip/_sandbox.py
=======================

src/safezip/_sandbox.py

"""Phase B: path resolution and symlink policy enforcement (the Sandbox)."""

import unicodedata
from pathlib import Path

from ._events import SymlinkPolicy
from ._exceptions import UnsafeZipError

__author__ = "Artur Barseghyan "
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"
__all__ = (
    "resolve_member_path",
    "check_symlink",
)

# Practical upper bound; real OS limits vary but 4096 is a safe conservative cap
_MAX_PATH_LENGTH = 4096


def resolve_member_path(
    base: Path,
    member_filename: str,
) -> Path:
    """Resolve and validate a ZIP member filename against *base*.

    Applies the full path normalisation pipeline:

    1. Unicode NFC normalisation (catch lookalike characters).
    2. Null-byte rejection.
    3. Reject absolute Unix paths (starting with ``/``) and absolute Windows
       paths (drive letter + slash, e.g. ``C:/``).
    4. Reject any ``..`` path component.
    5. Verify the resolved path is inside *base*.
    6. Reject paths whose resolved length exceeds ``_MAX_PATH_LENGTH``.

    :param base: The extraction root directory (must be absolute).
    :param member_filename: Raw filename string from the ZIP central directory.
    :returns: Resolved absolute Path inside *base*.
    :raises UnsafeZipError: If the filename is unsafe for any reason.
    """
    # 1. Unicode NFC normalisation
    try:
        normalized = unicodedata.normalize("NFC", member_filename)
    except (TypeError, ValueError) as err:
        raise UnsafeZipError(f"Cannot normalise filename: {member_filename!r}") from err

    # 2. Null-byte rejection
    if "\x00" in normalized:
        raise UnsafeZipError(f"Filename contains a null byte: {normalized!r}")

    # 3. Normalise separators, then reject absolute paths
    _norm = normalized.replace("\\", "/")

    # Reject absolute Unix paths and UNC paths (start with '/')
    if _norm.startswith("/"):
        raise UnsafeZipError(f"Absolute path detected in filename: {member_filename!r}")

    # Reject absolute Windows paths with drive letters (e.g. "C:/Windows")
    if len(_norm) >= 3 and _norm[1] == ":" and _norm[2] == "/" and _norm[0].isalpha():
        raise UnsafeZipError(
            f"Absolute Windows path detected in filename: {member_filename!r}"
        )

    parts = _norm.split("/")
    clean_parts = []
    for part in parts:
        # Skip empty parts (double-slashes) and current-dir dots
        if not part or part == ".":
            continue
        # 4. Reject parent-directory traversal
        if part == "..":
            raise UnsafeZipError(
                f"Path traversal detected in filename: {member_filename!r}"
            )
        # Strip Windows-style relative drive references
        # (e.g. "C:relpath" → "relpath") as defence-in-depth
        if len(part) >= 2 and part[1] == ":" and part[0].isalpha():
            part = part[2:]
            if not part:
                continue
        clean_parts.append(part)

    if not clean_parts:
        raise UnsafeZipError(f"Filename resolves to empty path: {member_filename!r}")

    # Build the resolved path
    resolved = base
    for part in clean_parts:
        resolved = resolved / part

    # 5. Confirm the resolved path is inside base
    try:
        resolved.relative_to(base)
    except ValueError as err:
        raise UnsafeZipError(
            f"Resolved path escapes base directory: {resolved!r} is not under {base!r}"
        ) from err

    # 6. Path length check
    if len(str(resolved)) > _MAX_PATH_LENGTH:
        raise UnsafeZipError(
            f"Resolved path is too long ({len(str(resolved))} chars): "
            f"{str(resolved)[:120]!r}..."
        )

    return resolved


def check_symlink(
    extracted_path: Path,
    base: Path,
    policy: SymlinkPolicy,
) -> bool:
    """
    Check whether *extracted_path* is a symlink and apply the configured policy.

    :param extracted_path: The path that was just extracted.
    :param base: The extraction root directory.
    :param policy: The configured symlink policy.
    :returns: ``True`` if the member should be skipped (IGNORE policy).
    :raises UnsafeZipError: If the policy is REJECT, or if the symlink chain
        exits the base directory.
    """
    if not extracted_path.is_symlink():
        return False

    if policy == SymlinkPolicy.REJECT:
        raise UnsafeZipError(
            f"Symlink detected and symlink_policy is REJECT: {extracted_path}"
        )

    if policy == SymlinkPolicy.IGNORE:
        return True  # caller should skip this member

    # RESOLVE_INTERNAL: follow the full chain and verify every hop
    _verify_symlink_chain(extracted_path, base)
    return False


def _verify_symlink_chain(link_path: Path, base: Path) -> None:
    """Verify the full symlink chain from *link_path* stays inside *base*.

    Follows every link until a non-symlink is reached or an escape is detected.

    :raises UnsafeZipError: If any link in the chain exits *base*.
    """
    visited = set()
    current = link_path
    while current.is_symlink():
        real = str(current.resolve())
        if real in visited:
            # Cycle detected; treat as unsafe
            raise UnsafeZipError(
                f"Symlink cycle detected at {current}: refusing to follow further."
            )
        visited.add(real)
        try:
            current.resolve().relative_to(base.resolve())
        except ValueError as err:
            raise UnsafeZipError(
                f"Symlink chain for {link_path} exits the base directory "
                f"at {current} → {current.resolve()}"
            ) from err
        current = current.resolve()

src/safezip/_streamer.py
========================

src/safezip/_streamer.py

"""Phase C: streaming extraction with runtime enforcement (the Streamer)."""

import contextlib
import logging
import os
import zipfile
from pathlib import Path
from typing import Optional

from ._exceptions import (
    CompressionRatioError,
    FileSizeExceededError,
    TotalSizeExceededError,
)

__author__ = "Artur Barseghyan "
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"
__all__ = (
    "stream_extract_member",
    "CumulativeCounters",
)

log = logging.getLogger("safezip.security")

_CHUNK_SIZE = 65_536  # 64 KiB


class CumulativeCounters:
    """Tracks totals across all members in a single extractall/extract call."""

    __slots__ = ("bytes_written", "compressed_bytes")

    def __init__(self) -> None:
        self.bytes_written: int = 0
        self.compressed_bytes: int = 0


def stream_extract_member(
    zf: zipfile.ZipFile,
    member: zipfile.ZipInfo,
    dest: Path,
    *,
    max_file_size: int,
    max_per_member_ratio: float,
    max_total_size: int,
    max_total_ratio: float,
    counters: CumulativeCounters,
    pwd: Optional[bytes] = None,
) -> None:
    """
    Stream a single member from *zf* to *dest* with full runtime enforcement.

    Extraction is atomic: bytes are written to a temporary file and renamed
    to *dest* only after all checks pass. If any check raises, the temporary
    file is deleted and *dest* is never created/modified.

    :param zf: Open zipfile.ZipFile instance (internal use only).
    :param member: The ZipInfo entry to extract.
    :param dest: Final destination path (must already be path-validated).
    :param max_file_size: Per-member decompressed size limit in bytes.
    :param max_per_member_ratio: Per-member decompressed/compressed ratio limit.
    :param max_total_size: Cumulative decompressed size limit across all members.
    :param max_total_ratio: Cumulative ratio limit across all members.
    :param counters: Shared counters for cumulative checks.
    :param pwd: Optional decryption password.
    """
    dest.parent.mkdir(parents=True, exist_ok=True)

    tmp_name = f"{dest.name}.safezip_tmp_{os.getpid()}_{os.urandom(4).hex()}"
    tmp_path = dest.parent / tmp_name

    # compress_size may be 0 for data-descriptor archives
    compress_size = member.compress_size
    member_bytes_written = 0

    try:
        with zf.open(member, pwd=pwd) as src, open(tmp_path, "wb") as dst:
            while True:
                chunk = src.read(_CHUNK_SIZE)
                if not chunk:
                    break

                chunk_len = len(chunk)
                member_bytes_written += chunk_len
                counters.bytes_written += chunk_len

                # --- Per-member size check ---
                if member_bytes_written > max_file_size:
                    raise FileSizeExceededError(
                        f"Member {member.filename!r} exceeded max_file_size="
                        f"{max_file_size:,} bytes "
                        f"(decompressed {member_bytes_written:,} bytes so "
                        "far)."
                    )

                # --- Per-member ratio check ---
                # Only when compress_size is known (not a data-descriptor
                # entry).
                if compress_size > 0:
                    ratio = member_bytes_written / compress_size
                    if ratio > max_per_member_ratio:
                        raise CompressionRatioError(
                            f"Member {member.filename!r} compression ratio "
                            f"{ratio:.1f}:1 exceeds "
                            f"max_per_member_ratio={max_per_member_ratio}:1."
                        )

                # --- Cumulative size check ---
                if counters.bytes_written > max_total_size:
                    raise TotalSizeExceededError(
                        f"Cumulative decompressed size "
                        f"{counters.bytes_written:,} bytes exceeds "
                        f"max_total_size={max_total_size:,} bytes."
                    )

                # --- Cumulative ratio check ---
                # Update compressed bytes estimate from the running member.
                if compress_size > 0:
                    counters.compressed_bytes += (
                        chunk_len * compress_size // max(member.file_size, 1)
                    )
                if counters.compressed_bytes > 0:
                    total_ratio = counters.bytes_written / counters.compressed_bytes  # noqa
                    if total_ratio > max_total_ratio:
                        raise CompressionRatioError(
                            f"Cumulative compression ratio {total_ratio:.1f}:1 "
                            f"exceeds max_total_ratio={max_total_ratio}:1."
                        )

                dst.write(chunk)

        # All checks passed - atomic rename to final destination
        tmp_path.replace(dest)

    except Exception:
        # Clean up partial / temporary file on any failure
        with contextlib.suppress(OSError):
            tmp_path.unlink(missing_ok=True)
        raise

src/safezip/cli/__init__.py
===========================

src/safezip/cli/__init__.py

"""safezip.cli — command-line interface for safezip."""

from ._main import main

__all__ = ("main",)

src/safezip/cli/_main.py
========================

src/safezip/cli/_main.py

"""safezip CLI — hardened ZIP extraction from the command line."""

import argparse
import sys
import zipfile
from pathlib import Path

from safezip import SafeZipFile, SymlinkPolicy, safe_extract
from safezip._exceptions import SafezipError

__all__ = ("main",)

_SYMLINK_POLICIES = {
    "reject": SymlinkPolicy.REJECT,
    "ignore": SymlinkPolicy.IGNORE,
    "resolve_internal": SymlinkPolicy.RESOLVE_INTERNAL,
}


def _build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        prog="safezip",
        description="Hardened ZIP extraction — safe by default.",
    )
    parser.add_argument(
        "--version",
        action="version",
        version=f"%(prog)s {_version()}",
    )
    sub = parser.add_subparsers(dest="command", required=True)

    # ------------------------------------------------------------------ extract
    ext = sub.add_parser("extract", help="Extract a ZIP archive safely.")
    ext.add_argument("archive", help="Path to the ZIP file.")
    ext.add_argument("destination", help="Directory to extract into.")
    ext.add_argument(
        "--max-file-size",
        type=int,
        metavar="BYTES",
        help="Max uncompressed size per member (default: 1 GiB).",
    )
    ext.add_argument(
        "--max-total-size",
        type=int,
        metavar="BYTES",
        help="Max total uncompressed size (default: 5 GiB).",
    )
    ext.add_argument(
        "--max-files",
        type=int,
        metavar="N",
        help="Max number of members (default: 10 000).",
    )
    ext.add_argument(
        "--max-per-member-ratio",
        type=float,
        metavar="RATIO",
        help="Max compression ratio per member (default: 200).",
    )
    ext.add_argument(
        "--max-total-ratio",
        type=float,
        metavar="RATIO",
        help="Max overall compression ratio (default: 200).",
    )
    ext.add_argument(
        "--max-nesting-depth",
        type=int,
        metavar="N",
        help="Max nested-archive depth (default: 3).",
    )
    ext.add_argument(
        "--symlink-policy",
        choices=list(_SYMLINK_POLICIES),
        default=None,
        metavar="POLICY",
        help=(
            "How to handle symlink entries: reject (default), ignore, resolve_internal."
        ),
    )
    ext.add_argument(
        "--password",
        metavar="PWD",
        help="Decryption password for encrypted archives.",
    )
    ext.add_argument(
        "--recursive",
        action="store_true",
        default=False,
        help="Enable recursive extraction of nested archives.",
    )

    # --------------------------------------------------------------------- list
    lst = sub.add_parser("list", help="List members of a ZIP archive.")
    lst.add_argument("archive", help="Path to the ZIP file.")

    return parser


def _version() -> str:
    try:
        from safezip import __version__

        return __version__
    except ImportError:
        return "unknown"


def _cmd_extract(args: argparse.Namespace) -> int:
    kwargs: dict = {}
    for attr in (
        "max_file_size",
        "max_total_size",
        "max_files",
        "max_per_member_ratio",
        "max_total_ratio",
        "max_nesting_depth",
        "recursive",
    ):
        val = getattr(args, attr, None)
        if val is not None:
            kwargs[attr] = val

    if args.symlink_policy is not None:
        kwargs["symlink_policy"] = _SYMLINK_POLICIES[args.symlink_policy]

    if args.password is not None:
        kwargs["password"] = args.password.encode()

    dest = Path(args.destination)
    dest.mkdir(parents=True, exist_ok=True)

    try:
        safe_extract(args.archive, dest, **kwargs)
    except SafezipError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1
    except FileNotFoundError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1
    except zipfile.BadZipFile as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1

    print(f"Extracted to {dest.resolve()}")
    return 0


def _cmd_list(args: argparse.Namespace) -> int:
    try:
        with SafeZipFile(args.archive) as zf:
            for name in zf.namelist():
                print(name)
    except SafezipError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1
    except FileNotFoundError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1
    except zipfile.BadZipFile as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1
    return 0


def main() -> None:
    parser = _build_parser()
    args = parser.parse_args()

    if args.command == "extract":
        sys.exit(_cmd_extract(args))
    elif args.command == "list":
        sys.exit(_cmd_list(args))
    else:  # pragma: no cover
        parser.print_help()
        sys.exit(1)

src/safezip/tests/__init__.py
=============================

src/safezip/tests/__init__.py

"""Tests for safezip."""

__author__ = "Artur Barseghyan "
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"

src/safezip/tests/conftest.py
=============================

src/safezip/tests/conftest.py

"""Pytest fixtures: factory functions that craft malicious ZIP archives."""

import io
import stat
import struct
import zipfile
import zlib

import pytest

__author__ = "Artur Barseghyan "
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"
__all__ = (
    "zipslip_archive",
    "absolute_path_archive",
    "unicode_traversal_archive",
    "high_ratio_archive",
    "many_files_archive",
    "null_byte_filename_archive",
    "zip64_inconsistency_archive",
    "legitimate_archive",
    "symlink_archive",
    "fifield_bomb_archive",
)

# ---------------------------------------------------------------------------
# Archive factory helpers
# ---------------------------------------------------------------------------


def _make_zip_bytes(entries: list[tuple[str, bytes]]) -> bytes:
    """Create a ZIP in memory from (filename, content) pairs."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w",
compression=zipfile.ZIP_DEFLATED) as zf: for name, content in entries: info = zipfile.ZipInfo(name) zf.writestr(info, content) return buf.getvalue() def _make_zip_bytes_stored(entries: list[tuple[str, bytes]]) -> bytes: """Create a stored (uncompressed) ZIP in memory.""" buf = io.BytesIO() with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_STORED) as zf: for name, content in entries: info = zipfile.ZipInfo(name) zf.writestr(info, content) return buf.getvalue() # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture() def zipslip_archive(tmp_path): """A ZIP whose sole entry has a path-traversal filename.""" data = _make_zip_bytes([("../../evil.txt", b"evil content")]) p = tmp_path / "zipslip.zip" p.write_bytes(data) return p @pytest.fixture() def absolute_path_archive(tmp_path): """A ZIP with an absolute Unix-style path entry.""" buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as zf: info = zipfile.ZipInfo("/etc/passwd") zf.writestr(info, "root:x:0:0:root:/root:/bin/bash\n") data = buf.getvalue() p = tmp_path / "absolute.zip" p.write_bytes(data) return p @pytest.fixture() def unicode_traversal_archive(tmp_path): """A ZIP with combining Unicode characters that NFC-normalises to a path still containing a ``..`` traversal component. The filename ``e\\u0301vil/../../escape.txt`` uses U+0301 COMBINING ACUTE ACCENT (NFD form of ``é``). After Unicode NFC normalisation the combining accent is folded into the precomposed ``é``, yielding ``évil/../../escape.txt``. The ``..`` components are unaffected by NFC and must still be detected and rejected. 
""" # e + COMBINING ACUTE ACCENT → é after NFC; the traversal stays intact data = _make_zip_bytes([("e\u0301vil/../../escape.txt", b"escaped")]) p = tmp_path / "unicode_traversal.zip" p.write_bytes(data) return p @pytest.fixture() def high_ratio_archive(tmp_path): """A ZIP whose content compresses at a very high ratio (zeros).""" # 2 MiB of zeros → compressed to ~2 KB → ratio ~1000:1 data_bytes = b"\x00" * (2 * 1024 * 1024) buf = io.BytesIO() with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf: zf.writestr("zeros.bin", data_bytes) p = tmp_path / "bomb.zip" p.write_bytes(buf.getvalue()) return p @pytest.fixture() def many_files_archive(tmp_path): """A ZIP with more entries than the default max_files limit allows.""" buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as zf: for i in range(15_000): zf.writestr(f"file_{i:05d}.txt", b"x") p = tmp_path / "many_files.zip" p.write_bytes(buf.getvalue()) return p @pytest.fixture() def null_byte_filename_archive(tmp_path): """A ZIP with a null byte injected into a filename via raw struct manipulation. Python's zipfile won't let us write such names directly, so we craft the raw bytes: a minimal ZIP with one entry whose filename contains \\x00. 
""" # Minimal ZIP structure: # Local file header + file data + central directory + end of central directory filename = b"safe\x00../../etc/passwd" fname_len = len(filename) content = b"evil" content_len = len(content) # Local file header (signature 0x04034b50) local_header = ( struct.pack( "<4s2H3H4s2I2H", b"PK\x03\x04", # signature 20, # version needed 0, # flags 0, # compression (stored) 0, # mod time 0, # mod date b"\x00\x00\x00\x00", # CRC-32 content_len, # compressed size content_len, # uncompressed size fname_len, # filename length 0, # extra field length ) + filename + content ) local_offset = 0 # Central directory header (signature 0x02014b50) # Format: 4s sig | 6H (ver_made,ver_needed,flags,compress,mod_time,mod_date) | # 4s CRC | 2I (comp_size,uncomp_size) | # 5H (fname_len,extra_len,comment_len,disk_start,int_attr) | # 2I (ext_attr, offset) → 17 items, 46 bytes central_header = ( struct.pack( "<4s6H4s2I5H2I", b"PK\x01\x02", # signature 0x031E, # version made by (Unix, v30) 20, # version needed 0, # flags 0, # compression 0, # mod time 0, # mod date b"\x00\x00\x00\x00", # CRC-32 content_len, # compressed size (I) content_len, # uncompressed size (I) fname_len, # filename length 0, # extra field length 0, # file comment length 0, # disk number start 0, # internal file attributes 0, # external file attributes (I) local_offset, # relative offset of local header (I) ) + filename ) central_offset = len(local_header) central_size = len(central_header) # End of central directory record (signature 0x06054b50) eocd = struct.pack( "<4s4H2IH", b"PK\x05\x06", # signature 0, # disk number 0, # disk with central dir 1, # entries on this disk 1, # total entries central_size, # size of central directory central_offset, # offset of central directory 0, # comment length ) data = local_header + central_header + eocd p = tmp_path / "nullbyte.zip" p.write_bytes(data) return p @pytest.fixture() def zip64_inconsistency_archive(tmp_path): """A ZIP with a ZIP64 extra field that 
disagrees with the central directory. We craft a minimal archive where the ZIP64 extra field reports a size of 999_999_999 bytes but the 32-bit central directory field reports 100 bytes. Python will use the 32-bit value (100), but our ZIP64 check sees 999_999_999 and raises MalformedArchiveError. """ filename = b"test.txt" fname_len = len(filename) content = b"A" * 100 # ZIP64 extra field reporting a huge uncompressed size zip64_uncompressed = 999_999_999 zip64_extra = struct.pack( " 0 def test_overlap_check_does_not_decompress(self, fifield_bomb_archive, tmp_path): with pytest.raises(MalformedArchiveError, match="overlapping"): SafeZipFile(fifield_bomb_archive, max_per_member_ratio=100_000.0) def _lfh(filename: bytes, data: bytes, compress_type: int = 0) -> bytes: """Build a Local File Header + data.""" return ( struct.pack( " bytes: """Build a Central Directory Header.""" return ( struct.pack( " bytes: """Build an End of Central Directory record.""" return ( struct.pack( " bytes: """Build a well-formed zip from (filename, data) pairs.""" lfhs, cdhs = [], [] cursor = 0 for fname, data in files: lfh = _lfh(fname, data) cdhs.append(_cdh(fname, data, cursor)) lfhs.append(lfh) cursor += len(lfh) cd = b"".join(cdhs) return b"".join(lfhs) + cd + _eocd(len(files), len(cd), cursor) def _build_overlap_zip(fname_a: bytes, fname_b: bytes, data: bytes) -> bytes: """Build a zip where two CDH entries point to the same LFH offset.""" lfh = _lfh(fname_a, data) cdh1 = _cdh(fname_a, data, 0) cdh2 = _cdh(fname_b, data, 0) cd = cdh1 + cdh2 return lfh + cd + _eocd(2, len(cd), len(lfh)) class TestZipInspector: """Tests for the ZipInspector overlap detection.""" def _scan(self, data: bytes) -> ScanResult: return ZipInspector(io.BytesIO(data)).scan() def test_clean_single_file(self): data = _build_zip((b"readme.txt", b"hello")) result = self._scan(data) assert result.is_bomb is False def test_clean_two_files_sequential(self): data = _build_zip( (b"a.txt", b"first file contents"), 
            (b"b.txt", b"second file contents"),
        )
        assert self._scan(data).is_bomb is False

    def test_clean_many_files(self):
        files = [(f"file{i}.txt".encode(), f"content {i}".encode()) for i in range(50)]
        data = _build_zip(*files)
        assert self._scan(data).is_bomb is False

    def test_clean_empty_file_entry(self):
        data = _build_zip((b"empty", b""))
        assert self._scan(data).is_bomb is False

    def test_overlap_two_cdh_same_offset(self):
        data = _build_overlap_zip(b"a", b"b", b"kernel data")
        assert self._scan(data).is_bomb is True

    def test_overlap_detail_is_populated(self):
        data = _build_overlap_zip(b"x", b"y", b"data")
        result = self._scan(data)
        assert result.is_bomb is True
        assert result.overlap_detail is not None

    def test_overlap_at_offset_zero(self):
        """Entries with data_start=0 should still be detected as overlapping."""
        lfh1 = _lfh(b"a", b"data")
        cdh1 = _cdh(b"a", b"data", 0)
        cdh2 = _cdh(b"b", b"data", 0)
        cd = cdh1 + cdh2
        data = lfh1 + cd + _eocd(2, len(cd), len(lfh1))
        result = self._scan(data)
        assert result.is_bomb is True

    def test_invalid_not_a_zip(self):
        result = self._scan(b"this is not a zip file at all")
        assert result.is_bomb is None

    def test_invalid_empty_bytes(self):
        result = self._scan(b"")
        assert result.is_bomb is None

    def test_invalid_truncated_eocd(self):
        result = self._scan(b"PK\x05\x06\x00\x00")
        assert result.is_bomb is None

    def test_invalid_garbage_with_pk_bytes(self):
        result = self._scan(b"\x00" * 100 + b"PK\x05\x06" + b"\xff" * 18)
        assert result.is_bomb is None

    def test_invalid_cdh_signature_mismatch(self):
        raw = bytearray(_build_zip((b"f", b"data")))
        cdh_pos = raw.find(b"PK\x01\x02")
        raw[cdh_pos] = 0xFF
        assert self._scan(bytes(raw)).is_bomb is None

    def test_invalid_lfh_signature_mismatch(self):
        raw = bytearray(_build_zip((b"f", b"data")))
        raw[0] = 0xFF
        assert self._scan(bytes(raw)).is_bomb is None

    def test_gap_does_not_trigger_bomb(self):
        gap = b"\x00" * 16
        lfh1 = _lfh(b"a", b"data1")
        lfh2 = _lfh(b"b", b"data2")
        off1 = 0
        off2 = len(lfh1) + len(gap)
        cdh1 = _cdh(b"a", b"data1", off1)
        cdh2 = _cdh(b"b", b"data2", off2)
        cd = cdh1 + cdh2
        raw = lfh1 + gap + lfh2 + cd + _eocd(2, len(cd), off2 + len(lfh2))
        assert self._scan(raw).is_bomb is False

    def test_leading_bytes_not_a_bomb(self):
        prefix = b"\x00" * 32
        lfh = _lfh(b"x", b"payload")
        cdh = _cdh(b"x", b"payload", len(prefix))
        cd = cdh
        raw = prefix + lfh + cd + _eocd(1, len(cd), len(prefix) + len(lfh))
        assert self._scan(raw).is_bomb is False

    def test_zip_with_comment(self):
        raw = _build_zip((b"x.txt", b"data"))
        eocd_pos = raw.rfind(b"PK\x05\x06")
        comment = b"this is a zip comment"
        head = raw[:eocd_pos]
        eocd = raw[eocd_pos:]
        new_eocd = eocd[:20] + struct.pack(" max_file_size
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_STORED) as zf:
            zf.writestr("data.bin", b"X" * 2000)
        p = tmp_path / "lie.zip"
        p.write_bytes(buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(FileSizeExceededError),
            SafeZipFile(p, max_file_size=500) as zf,
        ):
            zf.extractall(dest)

    def test_many_files_bomb_blocked(self, many_files_archive, tmp_path):
        """Archive with too many files is blocked at the Guard phase."""
        with pytest.raises(FileCountExceededError):
            SafeZipFile(many_files_archive)


class TestExplicitPathRequirement:
    """extractall must receive an explicit path; CWD is never used silently."""

    def test_extractall_requires_path(self, legitimate_archive, tmp_path):
        """extractall with a valid path works; calling without is a TypeError."""
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(legitimate_archive) as zf:
            zf.extractall(dest)  # must not raise
        assert (dest / "hello.txt").exists()

    def test_extractall_wrong_type_raises(self, legitimate_archive):
        """Passing None as path raises TypeError or AttributeError."""
        with (
            SafeZipFile(legitimate_archive) as zf,
            pytest.raises((TypeError, AttributeError)),
        ):
            zf.extractall(None)

    def test_extract_with_none_path_raises(self, legitimate_archive):
        """Passing None as path to extract() raises TypeError."""
        with SafeZipFile(legitimate_archive) as zf, pytest.raises(TypeError):
            zf.extract("hello.txt", None)

    def test_extractall_with_members_list(self, legitimate_archive, tmp_path):
        """extractall with a members list extracts only those members."""
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(legitimate_archive) as zf:
            zf.extractall(dest, members=["hello.txt"])
        # Only hello.txt should exist
        assert (dest / "hello.txt").exists()
        contents = list(dest.rglob("*"))
        assert len(contents) == 1


class TestMalformedArchive:
    """Structurally invalid archives raise MalformedArchiveError."""

    def test_not_a_zip_raises_malformed(self, tmp_path):
        """A file that is not a ZIP at all raises MalformedArchiveError."""
        bad = tmp_path / "bad.zip"
        bad.write_bytes(b"this is not a zip file")
        with pytest.raises(MalformedArchiveError):
            SafeZipFile(bad)

    def test_zip64_inconsistency_raises(self, zip64_inconsistency_archive):
        """ZIP64 extra field that disagrees with central directory is rejected."""
        with pytest.raises(MalformedArchiveError):
            SafeZipFile(zip64_inconsistency_archive)


class TestFifieldBomb:
    """End-to-end: Fifield-style zip bomb is blocked at Guard phase."""

    def test_fifield_bomb_blocked_end_to_end(self, fifield_bomb_archive, tmp_path):
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(MalformedArchiveError),
            SafeZipFile(fifield_bomb_archive) as zf,
        ):
            zf.extractall(dest)
        remaining = [f for f in dest.rglob("*") if not f.is_dir()]
        assert not remaining

    def test_security_event_fires_on_fifield_bomb(self, fifield_bomb_archive, tmp_path):
        """on_security_event callback receives 'malformed_archive' for Fifield bomb."""
        events = []
        dest = tmp_path / "out"
        dest.mkdir()
        with pytest.raises(MalformedArchiveError):
            SafeZipFile(fifield_bomb_archive, on_security_event=events.append)
        assert any(e.event_type == "malformed_archive" for e in events)

    def test_fifield_bomb_as_bytesio_rejected(self, fifield_bomb_archive):
        """Fifield bomb as BytesIO is rejected."""
        data = fifield_bomb_archive.read_bytes()
        bio = io.BytesIO(data)
        with pytest.raises(MalformedArchiveError):
            SafeZipFile(bio)

    def test_legitimate_archive_as_bytesio_passes(self, legitimate_archive):
        """Legitimate archive as BytesIO passes."""
        data = legitimate_archive.read_bytes()
        bio = io.BytesIO(data)
        with SafeZipFile(bio) as zf:
            assert len(zf.namelist()) > 0

    def test_fifield_bomb_bytesio_event_fires(self, fifield_bomb_archive):
        """on_security_event fires for in-memory Fifield bomb."""
        events = []
        data = fifield_bomb_archive.read_bytes()
        bio = io.BytesIO(data)
        with pytest.raises(MalformedArchiveError):
            SafeZipFile(bio, on_security_event=events.append)
        assert any(e.event_type == "malformed_archive" for e in events)


class TestSecurityEventCoverage:
    """on_security_event callback fires for all security violation types."""

    def test_callback_fires_on_path_traversal(self, zipslip_archive, tmp_path):
        events = []
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(UnsafeZipError),
            SafeZipFile(zipslip_archive, on_security_event=events.append) as zf,
        ):
            zf.extractall(dest)
        assert any(e.event_type == "zip_slip_detected" for e in events)

    def test_callback_fires_on_file_size_exceeded(self, tmp_path):
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_STORED) as zf:
            zf.writestr("data.bin", b"A" * 1000)
        p = tmp_path / "large.zip"
        p.write_bytes(buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()
        events = []
        with (
            pytest.raises(FileSizeExceededError),
            SafeZipFile(p, max_file_size=500, on_security_event=events.append) as zf,
        ):
            zf.extractall(dest)
        # The Guard may fire "declared_size_exceeded" (declared header size >
        # limit) or the Streamer may fire "file_size_exceeded" (actual
        # decompressed bytes > limit). Both indicate a file-size violation.
        size_events = {"file_size_exceeded", "declared_size_exceeded"}
        assert any(e.event_type in size_events for e in events)

    def test_callback_fires_on_ratio_exceeded(self, high_ratio_archive, tmp_path):
        events = []
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(CompressionRatioError),
            SafeZipFile(
                high_ratio_archive,
                max_per_member_ratio=10.0,
                on_security_event=events.append,
            ) as zf,
        ):
            zf.extractall(dest)
        assert any(e.event_type == "compression_ratio_exceeded" for e in events)

    def test_callback_fires_on_file_count_exceeded(self, many_files_archive, tmp_path):
        events = []
        with pytest.raises(FileCountExceededError):
            SafeZipFile(many_files_archive, on_security_event=events.append)
        assert any(e.event_type == "file_count_exceeded" for e in events)

    def test_callback_fires_on_symlink_rejected(self, symlink_archive, tmp_path):
        events = []
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(UnsafeZipError),
            SafeZipFile(symlink_archive, on_security_event=events.append) as zf,
        ):
            zf.extractall(dest)
        assert any(e.event_type == "symlink_rejected" for e in events)


class TestLegitimateExtraction:
    """Well-formed archives extract correctly and completely."""

    def test_all_files_extracted(self, legitimate_archive, tmp_path):
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(legitimate_archive) as zf:
            zf.extractall(dest)
        assert (dest / "hello.txt").read_bytes() == b"Hello, world!\n"
        assert (dest / "subdir" / "data.txt").read_bytes() == b"Some data\n"
        assert (dest / "subdir" / "nested" / "deep.txt").read_bytes() == b"Deep file\n"

    def test_safe_extract_convenience(self, legitimate_archive, tmp_path):
        dest = tmp_path / "out"
        dest.mkdir()
        safe_extract(legitimate_archive, dest)
        assert (dest / "hello.txt").exists()

    def test_context_manager_closes_properly(self, legitimate_archive, tmp_path):
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(legitimate_archive) as zf:
            zf.extractall(dest)
        # After context exit, the underlying ZipFile's fp should be None (closed).
        # zipfile.ZipFile.close() sets self.fp = None.
        assert zf._zf.fp is None


class TestSecurityEventCallback:
    """on_security_event callback is called on security events."""

    def test_callback_called_on_zip_slip(self, zipslip_archive, tmp_path):
        events = []

        def capture(event):
            events.append(event)

        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(UnsafeZipError),
            SafeZipFile(zipslip_archive, on_security_event=capture) as zf,
        ):
            zf.extractall(dest)
        # Note: callback is called for monitored events during extraction;
        # path traversal may be detected in sandbox before callback fires.
        # The test verifies no crash occurs.

    def test_callback_exception_does_not_swallow_security_error(
        self, zipslip_archive, tmp_path
    ):
        def broken_callback(event):
            raise RuntimeError("callback broken")

        dest = tmp_path / "out"
        dest.mkdir()
        # The UnsafeZipError must still propagate even if callback raises
        with (
            pytest.raises(UnsafeZipError),
            SafeZipFile(zipslip_archive, on_security_event=broken_callback) as zf,
        ):
            zf.extractall(dest)


class TestNestingDepthLimit:
    """SafeZipFile refuses instantiation when _nesting_depth exceeds the limit."""

    def test_nesting_depth_exceeded_raises(self, legitimate_archive):
        """_nesting_depth > max_nesting_depth raises NestingDepthError."""
        with pytest.raises(NestingDepthError):
            SafeZipFile(legitimate_archive, max_nesting_depth=3, _nesting_depth=4)

    def test_nesting_depth_at_limit_passes(self, legitimate_archive):
        """_nesting_depth == max_nesting_depth is allowed."""
        with SafeZipFile(legitimate_archive, max_nesting_depth=3, _nesting_depth=3):
            pass

    def test_nesting_depth_zero_always_passes(self, legitimate_archive):
        """Default _nesting_depth=0 never raises."""
        with SafeZipFile(legitimate_archive):
            pass

    def test_nesting_depth_env_var_respected(self, legitimate_archive, monkeypatch):
        """SAFEZIP_MAX_NESTING_DEPTH env var is honoured when no constructor arg is given."""
        monkeypatch.setenv("SAFEZIP_MAX_NESTING_DEPTH", "1")
        # depth=2 > env-var limit of 1 → should raise
        with pytest.raises(NestingDepthError):
            SafeZipFile(legitimate_archive, _nesting_depth=2)

    def test_nesting_depth_exceeded_event(self, legitimate_archive):
        """nesting_depth_exceeded event is emitted when depth exceeds limit."""
        events = []
        with pytest.raises(NestingDepthError):
            SafeZipFile(
                legitimate_archive,
                max_nesting_depth=1,
                _nesting_depth=2,
                on_security_event=events.append,
            )
        assert any(e.event_type == "nesting_depth_exceeded" for e in events)


class TestNestedArchiveGuard:
    """Nested archive members are extracted as raw files, not recursed into."""

    def test_inner_zip_extracted_as_raw_file(self, tmp_path):
        inner_buf = io.BytesIO()
        with zipfile.ZipFile(inner_buf, "w") as inner_zf:
            inner_zf.writestr("secret.txt", b"inner content")
        inner_bytes = inner_buf.getvalue()
        outer_buf = io.BytesIO()
        with zipfile.ZipFile(outer_buf, "w") as outer_zf:
            outer_zf.writestr("readme.txt", b"outer content")
            outer_zf.writestr("nested.zip", inner_bytes)
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(outer_buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(outer_p) as zf:
            zf.extractall(dest)
        # The nested.zip should be present as a raw file, not recursed
        assert (dest / "nested.zip").exists()
        assert (dest / "nested.zip").read_bytes() == inner_bytes
        # The inner secret.txt should NOT be extracted
        assert not (dest / "secret.txt").exists()


class TestRecursiveNestingDepthIntegration:
    """Real zip-within-zip recursion is stopped at max_nesting_depth.

    These tests use an actual nested archive and a realistic recursive
    extraction helper to verify that the guard fires in practice, not just
    when the counter is poked directly.
    """

    @staticmethod
    def _build_nested_zip(levels: int) -> bytes:
        """Return bytes of a zip nested *levels* deep.

        The innermost zip contains ``secret.txt``. Every outer layer wraps the
        previous one as ``inner.zip`` plus a ``readme.txt`` so there is always
        a regular file at each level too.
        """
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w") as zf:
            zf.writestr("secret.txt", b"innermost content")
        data = buf.getvalue()
        for _ in range(levels - 1):
            buf = io.BytesIO()
            with zipfile.ZipFile(buf, "w") as zf:
                zf.writestr("readme.txt", b"outer level content")
                zf.writestr("inner.zip", data)
            data = buf.getvalue()
        return data

    @staticmethod
    def _recursive_extract(zip_path, dest, *, depth=0, max_nesting_depth=2):
        """Minimal recursive extractor that passes *depth* to SafeZipFile.

        This is the pattern a caller must follow to get nesting protection.
        SafeZipFile raises NestingDepthError before opening the archive when
        *depth* exceeds *max_nesting_depth*.
        """
        with SafeZipFile(
            zip_path,
            max_nesting_depth=max_nesting_depth,
            _nesting_depth=depth,
        ) as zf:
            zf.extractall(dest)
            for name in zf.namelist():
                if name.endswith(".zip"):
                    nested_src = dest / name
                    nested_dest = dest / (name[:-4] + "_contents")
                    nested_dest.mkdir()
                    TestRecursiveNestingDepthIntegration._recursive_extract(
                        nested_src,
                        nested_dest,
                        depth=depth + 1,
                        max_nesting_depth=max_nesting_depth,
                    )

    def test_recursive_extraction_stopped_at_depth_limit(self, tmp_path):
        """Recursion into a 3-level archive raises NestingDepthError at level 3.

        Archive layout::

            outer.zip            (depth 0 - opened fine)
              readme.txt
              inner.zip          (depth 1 - opened fine)
                readme.txt
                inner.zip        (depth 2 - raises, exceeds max_nesting_depth=1)
                  secret.txt
        """
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(self._build_nested_zip(3))
        dest = tmp_path / "out"
        dest.mkdir()
        with pytest.raises(NestingDepthError):
            self._recursive_extract(outer_p, dest, max_nesting_depth=1)

    def test_recursive_extraction_succeeds_within_limit(self, tmp_path):
        """Recursion within the depth limit extracts every level successfully.

        With max_nesting_depth=2 and a 3-level archive (depths 0, 1, 2), all
        levels are within the limit and secret.txt reaches disk.
        """
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(self._build_nested_zip(3))
        dest = tmp_path / "out"
        dest.mkdir()
        self._recursive_extract(outer_p, dest, max_nesting_depth=2)
        innermost = dest / "inner_contents" / "inner_contents" / "secret.txt"
        assert innermost.read_bytes() == b"innermost content"


class TestBuiltinRecursiveExtraction:
    """SafeZipFile with recursive=True auto-descends into nested zip members."""

    @staticmethod
    def _build_zip(members: list[tuple[str, bytes]]) -> bytes:
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
            for name, content in members:
                zf.writestr(name, content)
        return buf.getvalue()

    def test_recursive_false_is_default_raw_blob(self, tmp_path):
        """recursive=False (default) leaves nested zips as raw files."""
        inner = self._build_zip([("secret.txt", b"inner")])
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(self._build_zip([("inner.zip", inner)]))
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(outer_p) as zf:
            zf.extractall(dest)
        assert (dest / "inner.zip").exists()
        assert not (dest / "inner" / "secret.txt").exists()

    def test_recursive_extracts_nested_content(self, tmp_path):
        """recursive=True descends into inner.zip and extracts its content."""
        inner = self._build_zip([("secret.txt", b"inner content")])
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(
            self._build_zip([("readme.txt", b"outer"), ("inner.zip", inner)])
        )
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(outer_p, recursive=True) as zf:
            zf.extractall(dest)
        assert (dest / "readme.txt").read_bytes() == b"outer"
        assert (dest / "inner" / "secret.txt").read_bytes() == b"inner content"
        assert not (dest / "inner.zip").exists()

    def test_recursive_depth_limit_raises(self, tmp_path):
        """recursive=True stops at max_nesting_depth and raises NestingDepthError."""
        # 3-level deep: outer -> middle.zip -> inner.zip -> secret.txt
        innermost = self._build_zip([("secret.txt", b"deep")])
        middle = self._build_zip([("inner.zip",
            innermost)])
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(self._build_zip([("middle.zip", middle)]))
        dest = tmp_path / "out"
        dest.mkdir()
        # max_nesting_depth=1 allows depth 0 and 1; opening depth-2 raises
        with (
            pytest.raises(NestingDepthError),
            SafeZipFile(outer_p, recursive=True, max_nesting_depth=1) as zf,
        ):
            zf.extractall(dest)

    def test_recursive_file_size_enforced_in_nested_zip(self, tmp_path):
        """File size limit applies inside nested zips when recursive=True."""
        inner = self._build_zip([("big.txt", b"A" * 2000)])
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(self._build_zip([("inner.zip", inner)]))
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(FileSizeExceededError),
            SafeZipFile(outer_p, recursive=True, max_file_size=500) as zf,
        ):
            zf.extractall(dest)

    def test_recursive_traversal_in_nested_zip_blocked(self, tmp_path):
        """Path traversal inside a nested zip is blocked when recursive=True."""
        inner = self._build_zip([("../../evil.txt", b"escaped")])
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(self._build_zip([("inner.zip", inner)]))
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(UnsafeZipError),
            SafeZipFile(outer_p, recursive=True) as zf,
        ):
            zf.extractall(dest)
        assert not (tmp_path / "evil.txt").exists()

    def test_recursive_mixed_members(self, tmp_path):
        """Regular files and nested zips are both handled correctly."""
        inner = self._build_zip([("data.txt", b"nested data")])
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(
            self._build_zip(
                [
                    ("top.txt", b"top level"),
                    ("pkg.zip", inner),
                ]
            )
        )
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(outer_p, recursive=True) as zf:
            zf.extractall(dest)
        assert (dest / "top.txt").read_bytes() == b"top level"
        assert (dest / "pkg" / "data.txt").read_bytes() == b"nested data"
        assert not (dest / "pkg.zip").exists()

    def test_recursive_content_detection_bypasses_extension(self, tmp_path):
        """A nested ZIP named with a non-ZIP extension is still recursed into
        when recursive=True
        (content-based detection)."""
        inner = self._build_zip([("secret.txt", b"inner content")])
        outer_buf = io.BytesIO()
        with zipfile.ZipFile(outer_buf, "w") as zf:
            zf.writestr("data.csv", inner)
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(outer_buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(outer_p, recursive=True) as zf:
            zf.extractall(dest)
        # .csv is not a known archive extension, so directory name stays as-is
        assert (dest / "data.csv" / "secret.txt").read_bytes() == b"inner content"

    def test_recursive_non_zip_with_zip_extension_not_recursed(self, tmp_path):
        """A file named .zip that is not actually a ZIP is extracted as a plain file."""
        outer_buf = io.BytesIO()
        with zipfile.ZipFile(outer_buf, "w") as zf:
            zf.writestr("fake.zip", b"this is not a zip file at all")
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(outer_buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(outer_p, recursive=True) as zf:
            zf.extractall(dest)
        assert (dest / "fake.zip").read_bytes() == b"this is not a zip file at all"


class TestPermissionSanitisation:
    """Dangerous Unix permission bits are stripped from extracted files."""

    def test_setuid_stripped_by_default(self, setuid_archive, tmp_path):
        """setuid bit is stripped by default."""
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(setuid_archive) as zf:
            zf.extractall(dest)
        mode = (dest / "suid_binary").stat().st_mode
        assert not (mode & stat.S_ISUID), "setuid bit must be stripped by default"

    def test_normal_permissions_unaffected(self, legitimate_archive, tmp_path):
        """Stripping special bits does not affect normal file access."""
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(legitimate_archive) as zf:
            zf.extractall(dest)
        for f in dest.rglob("*"):
            if f.is_file():
                assert f.stat().st_mode & stat.S_IRUSR


class TestSymlinkPolicy:
    """SafeZipFile enforces the configured SymlinkPolicy for ZIP symlink entries.

    A ZIP symlink entry is identified by the upper 16 bits of
    ``ZipInfo.external_attr`` carrying a Unix ``S_IFLNK`` file mode. The entry's
    data bytes contain the link target path.
    """

    def test_reject_is_default(self, symlink_archive, tmp_path):
        """Default policy (REJECT) raises UnsafeZipError on any symlink entry."""
        dest = tmp_path / "out"
        dest.mkdir()
        with pytest.raises(UnsafeZipError), SafeZipFile(symlink_archive) as zf:
            zf.extractall(dest)

    def test_reject_explicit_raises(self, symlink_archive, tmp_path):
        """Explicit REJECT policy raises UnsafeZipError on a symlink entry."""
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(UnsafeZipError),
            SafeZipFile(symlink_archive, symlink_policy=SymlinkPolicy.REJECT) as zf,
        ):
            zf.extractall(dest)

    def test_ignore_skips_symlink_entry(self, symlink_archive, tmp_path):
        """IGNORE policy silently skips symlink entries; no file is created."""
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(symlink_archive, symlink_policy=SymlinkPolicy.IGNORE) as zf:
            zf.extractall(dest)
        # The symlink entry must not appear on disk
        assert not (dest / "link.txt").exists()

    def test_ignore_preserves_regular_files(self, symlink_archive, tmp_path):
        """IGNORE policy skips symlinks but still extracts regular entries."""
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(symlink_archive, symlink_policy=SymlinkPolicy.IGNORE) as zf:
            zf.extractall(dest)
        assert (dest / "readme.txt").read_bytes() == b"safe content\n"

    def test_resolve_internal_extracts_target_as_file(self, symlink_archive, tmp_path):
        """RESOLVE_INTERNAL extracts the symlink target path as a regular file.

        Because the ZIP entry's content is the target string (not an OS
        symlink), the extracted file is a plain file containing that string.
        The post-extraction symlink check only fires when the OS creates an
        actual symlink (not applicable here), so extraction succeeds.
        """
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(
            symlink_archive, symlink_policy=SymlinkPolicy.RESOLVE_INTERNAL
        ) as zf:
            zf.extractall(dest)
        # The entry is written as a regular file containing the target path
        extracted = dest / "link.txt"
        assert extracted.exists()
        assert not extracted.is_symlink()
        assert extracted.read_text() == "../escape.txt"


class TestCompressSizeZero:
    """compress_size == 0 only occurs legitimately for empty members.

    Python's zipfile uses the central directory compress_size to control how
    many bytes it reads during decompression. A non-empty member with
    compress_size=0 in the CD causes zipfile to read 0 bytes and then fail the
    CRC check (BadZipFile), so it never reaches the streamer's ratio logic.
    The only reachable case is a genuinely empty member, for which skipping
    the ratio check is correct - there is nothing to decompress.
    """

    def test_empty_member_skips_ratio_check_correctly(
        self, data_descriptor_empty_archive, tmp_path
    ):
        """Empty member (compress_size=0) extracts successfully even with a
        tight ratio limit. Skipping the ratio check is correct behaviour."""
        dest = tmp_path / "out"
        dest.mkdir()
        with zipfile.ZipFile(data_descriptor_empty_archive) as zf:
            info = zf.infolist()[0]
            assert info.compress_size == 0
            assert info.file_size == 0
        with SafeZipFile(data_descriptor_empty_archive, max_per_member_ratio=1.0) as zf:
            zf.extractall(dest)
        assert (dest / "empty.txt").read_bytes() == b""

    def test_nonempty_with_zero_cd_compress_size_rejected_by_zipfile(
        self, data_descriptor_invalid_bomb_archive, tmp_path
    ):
        """A crafted archive with compress_size=0 in the CD but non-empty data
        is rejected by Python's zipfile with BadZipFile before the streamer's
        ratio logic is even reached. The gap is not exploitable through
        Python's zipfile layer."""
        dest = tmp_path / "out"
        dest.mkdir()
        # Verify the CD does report compress_size=0 despite non-empty content.
        with zipfile.ZipFile(data_descriptor_invalid_bomb_archive) as zf:
            info = zf.infolist()[0]
            assert info.compress_size == 0
            assert info.file_size > 0
        # SafeZipFile opens fine (Guard sees compress_size=0, file_size=2000,
        # both within limits). BadZipFile is raised by zipfile's CRC check
        # during streaming, before safezip's ratio logic is ever reached.
        with (
            pytest.raises(zipfile.BadZipFile),
            SafeZipFile(data_descriptor_invalid_bomb_archive) as zf,
        ):
            zf.extractall(dest)
        # No partial files left.
        remaining = [f for f in dest.rglob("*") if not f.is_dir()]
        assert not remaining


class TestEnvVarHandling:
    """Environment variable parsing edge cases."""

    def test_invalid_symlink_policy_env(self, legitimate_archive, monkeypatch, caplog):
        """Invalid symlink policy is logged and defaults to REJECT."""
        monkeypatch.setenv("SAFEZIP_SYMLINK_POLICY", "invalid_policy")
        with SafeZipFile(legitimate_archive, symlink_policy=None) as zf:
            assert zf._symlink_policy == SymlinkPolicy.REJECT
        assert "Ignoring unrecognised" in caplog.text

    def test_env_var_read_at_import_time(self, monkeypatch):
        """Changing env vars after import does not affect cached defaults.

        The module-level singletons (_DEFAULT_*) are evaluated once at import
        time. Late env changes do not alter limits on new SafeZipFile
        instances.
        """
        import safezip._core as _core

        original_default = _core._DEFAULT_MAX_FILES
        monkeypatch.setenv("SAFEZIP_MAX_FILES", "99")
        assert original_default == _core._DEFAULT_MAX_FILES

src/safezip/tests/test_sandbox.py
=================================

src/safezip/tests/test_sandbox.py

"""Tests for Phase B: path resolution and symlink policy (the Sandbox)."""

import pytest

from safezip import UnsafeZipError
from safezip._sandbox import resolve_member_path

__author__ = "Artur Barseghyan "
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"


class TestPathTraversal:
    """resolve_member_path rejects all forms of path traversal."""

    def test_dotdot_relative(self, tmp_path):
        with pytest.raises(UnsafeZipError, match="traversal"):
            resolve_member_path(tmp_path, "../../evil.txt")

    def test_dotdot_in_middle(self, tmp_path):
        with pytest.raises(UnsafeZipError, match="traversal"):
            resolve_member_path(tmp_path, "subdir/../../../evil.txt")

    def test_dotdot_windows_style(self, tmp_path):
        with pytest.raises(UnsafeZipError, match="traversal"):
            resolve_member_path(tmp_path, "subdir\\..\\..\\evil.txt")

    def test_absolute_unix_path(self, tmp_path):
        with pytest.raises(UnsafeZipError):
            resolve_member_path(tmp_path, "/etc/passwd")

    def test_absolute_windows_path(self, tmp_path):
        with pytest.raises(UnsafeZipError):
            resolve_member_path(tmp_path, "C:\\Windows\\System32\\cmd.exe")

    def test_unc_path(self, tmp_path):
        with pytest.raises(UnsafeZipError):
            resolve_member_path(tmp_path, "//server/share/evil.txt")


class TestNullByte:
    """resolve_member_path rejects filenames with null bytes."""

    def test_null_byte_rejected(self, tmp_path):
        with pytest.raises(UnsafeZipError):
            resolve_member_path(tmp_path, "safe\x00../../etc/passwd")

    def test_null_byte_at_start(self, tmp_path):
        with pytest.raises(UnsafeZipError):
            resolve_member_path(tmp_path, "\x00evil.txt")


class TestLegitimateFilenames:
    """resolve_member_path accepts well-formed filenames."""

    def test_simple_filename(self, tmp_path):
        result = resolve_member_path(tmp_path, "hello.txt")
        assert result == tmp_path / "hello.txt"

    def test_nested_filename(self, tmp_path):
        result = resolve_member_path(tmp_path, "subdir/data.txt")
        assert result == tmp_path / "subdir" / "data.txt"

    def test_deep_nested(self, tmp_path):
        result = resolve_member_path(tmp_path, "a/b/c/d/e.txt")
        assert result == tmp_path / "a" / "b" / "c" / "d" / "e.txt"

    def test_windows_separator_legitimate(self, tmp_path):
        """Windows-style separators are normalised to forward slashes."""
        result = resolve_member_path(tmp_path, "subdir\\file.txt")
        assert result == tmp_path / "subdir" / "file.txt"

    def test_result_is_inside_base(self, tmp_path):
        result = resolve_member_path(tmp_path, "subdir/file.txt")
        assert str(result).startswith(str(tmp_path))

    def test_unicode_filename(self, tmp_path):
        result = resolve_member_path(tmp_path, "données/résumé.txt")
        assert result.name == "résumé.txt"

    def test_leading_slash_rejected(self, tmp_path):
        """A leading slash is treated as an absolute path and rejected."""
        with pytest.raises(UnsafeZipError, match="Absolute path"):
            resolve_member_path(tmp_path, "/file.txt")

    def test_dot_components_stripped(self, tmp_path):
        result = resolve_member_path(tmp_path, "./subdir/./file.txt")
        assert result == tmp_path / "subdir" / "file.txt"

    def test_empty_parts_stripped(self, tmp_path):
        result = resolve_member_path(tmp_path, "subdir//file.txt")
        assert result == tmp_path / "subdir" / "file.txt"


class TestPathLengthLimit:
    """resolve_member_path rejects excessively long paths."""

    def test_very_long_filename_rejected(self, tmp_path):
        long_name = "a" * 5000 + ".txt"
        with pytest.raises(UnsafeZipError, match="too long"):
            resolve_member_path(tmp_path, long_name)

src/safezip/tests/test_streamer.py
==================================

src/safezip/tests/test_streamer.py

"""Tests for Phase C: streaming extraction (the Streamer)."""

import io
import zipfile

import pytest

from safezip import (
    CompressionRatioError,
    FileSizeExceededError,
    MalformedArchiveError,
    SafeZipFile,
)

__author__ = "Artur Barseghyan "
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"


class TestFileSizeLimit:
    """Streamer enforces per-member file size limits at stream time."""

    def test_size_exceeded_raises(self, tmp_path):
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_STORED) as zf:
            zf.writestr("data.bin", b"A" * 1000)
        p = tmp_path / "large.zip"
        p.write_bytes(buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(FileSizeExceededError),
            SafeZipFile(p, max_file_size=500) as zf,
        ):
            zf.extractall(dest)

    def test_no_partial_file_after_size_failure(self, tmp_path):
        """Atomic write: no partial file must remain after FileSizeExceededError."""
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_STORED) as zf:
            zf.writestr("data.bin", b"A" * 1000)
        p = tmp_path / "large.zip"
        p.write_bytes(buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(FileSizeExceededError),
            SafeZipFile(p, max_file_size=500) as zf,
        ):
            zf.extractall(dest)
        # No partial files or temp files should remain
        remaining = list(dest.rglob("*"))
        assert not remaining, f"Partial files found: {remaining}"

    def test_size_at_limit_passes(self, tmp_path):
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_STORED) as zf:
            zf.writestr("data.bin", b"A" * 100)
        p = tmp_path / "ok.zip"
        p.write_bytes(buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(p, max_file_size=100) as zf:
            zf.extractall(dest)
        assert (dest / "data.bin").read_bytes() == b"A" * 100


class TestTotalSizeLimit:
    """Streamer enforces cumulative total size across all members."""

    def test_total_size_exceeded(self, tmp_path):
        """Total size limit enforced during Guard phase when limits are threaded."""
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_STORED) as zf:
            for i in range(5):
                zf.writestr(f"file_{i}.bin", b"A" * 300)
        p = tmp_path / "multi.zip"
        p.write_bytes(buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()
        with pytest.raises(MalformedArchiveError):
            SafeZipFile(p, max_file_size=1000, max_total_size=1000)


class TestCompressionRatioLimit:
    """Streamer enforces per-member and total compression ratio limits."""

    def test_per_member_ratio_exceeded(self, high_ratio_archive, tmp_path):
        """High-ratio archive (zeros) triggers per-member ratio check."""
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(CompressionRatioError),
            SafeZipFile(high_ratio_archive, max_per_member_ratio=10.0) as zf,
        ):
            zf.extractall(dest)

    def test_no_partial_file_after_ratio_failure(self, high_ratio_archive, tmp_path):
        """Atomic write: no partial file must remain after CompressionRatioError."""
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(CompressionRatioError),
            SafeZipFile(high_ratio_archive, max_per_member_ratio=10.0) as zf,
        ):
            zf.extractall(dest)
        remaining = [f for f in dest.rglob("*") if not f.is_dir()]
        assert not remaining, f"Partial files found: {remaining}"

    def test_high_ratio_passes_with_generous_limit(self, high_ratio_archive, tmp_path):
        """Same archive passes if we allow a high ratio (both per-member and total)."""
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(
            high_ratio_archive,
            max_per_member_ratio=2000.0,
            max_total_ratio=2000.0,
            max_file_size=5 * 1024 * 1024,
        ) as zf:
            zf.extractall(dest)
        assert (dest / "zeros.bin").exists()


class TestAtomicWrite:
    """Extraction destinations are created atomically."""

    def test_successful_extraction_creates_file(self, tmp_path):
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w") as zf:
            zf.writestr("output.txt", b"hello safezip")
        p = tmp_path / "ok.zip"
        p.write_bytes(buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(p) as zf:
            zf.extractall(dest)
        assert (dest / "output.txt").read_bytes() == b"hello safezip"

    def test_extract_single_member(self, tmp_path):
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w") as zf:
            zf.writestr("a.txt", b"AAA")
            zf.writestr("b.txt", b"BBB")
        p = tmp_path / "two.zip"
        p.write_bytes(buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(p) as zf:
            zf.extract("a.txt", dest)
        assert (dest / "a.txt").read_bytes() == b"AAA"
        assert not (dest / "b.txt").exists()

    def test_no_temp_files_after_success(self, tmp_path):
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w") as zf:
            zf.writestr("hello.txt", b"world")
        p = tmp_path / "ok.zip"
        p.write_bytes(buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(p) as zf:
            zf.extractall(dest)
        all_files = list(dest.rglob("*"))
        temp_files = [f for f in all_files if ".safezip_tmp_" in f.name]
        assert not temp_files
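
The streamer tests above check two properties: a size violation leaves no partial file behind, and a successful extraction leaves no ``.safezip_tmp_`` file behind. The sketch below illustrates one way such behaviour can be obtained with the standard library alone. It is not safezip's implementation: ``atomic_write_limited`` and ``SizeLimitExceeded`` are hypothetical names invented for this example, and only the ``.safezip_tmp_`` prefix is taken from the tests.

```python
import os
import tempfile
from pathlib import Path


class SizeLimitExceeded(Exception):
    """Hypothetical error type for this sketch (not a safezip exception)."""


def atomic_write_limited(dest: Path, chunks, max_size: int) -> None:
    """Write *chunks* to *dest* through a temp file, enforcing *max_size* bytes.

    Hypothetical helper: safezip's real streamer may work differently.
    """
    # Create the temp file next to the destination so that os.replace()
    # stays on one filesystem and the final rename is atomic.
    fd, tmp_name = tempfile.mkstemp(prefix=".safezip_tmp_", dir=dest.parent)
    written = 0
    try:
        with os.fdopen(fd, "wb") as tmp:
            for chunk in chunks:
                written += len(chunk)
                # Check the running total before the chunk is written.
                if written > max_size:
                    raise SizeLimitExceeded(f"{written} > {max_size} bytes")
                tmp.write(chunk)
        # The destination only appears once every check has passed.
        os.replace(tmp_name, dest)
    except BaseException:
        # Remove the temp file on any failure so nothing partial remains.
        try:
            os.unlink(tmp_name)
        except FileNotFoundError:
            pass
        raise
```

Because the size check runs on the bytes actually produced rather than on a declared header value, a member whose header understates its size would still be stopped at the limit in this sketch.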