Project source-tree

Below is the layout of the project (to 10 levels), followed by the contents of each key file.

Project directory layout
safezip/
├── src
│   └── safezip
│       ├── cli
│       │   ├── __init__.py
│       │   └── _main.py
│       ├── tests
│       │   ├── __init__.py
│       │   ├── conftest.py
│       │   ├── test_cli.py
│       │   ├── test_guard.py
│       │   ├── test_integration.py
│       │   ├── test_sandbox.py
│       │   └── test_streamer.py
│       ├── __init__.py
│       ├── _core.py
│       ├── _events.py
│       ├── _exceptions.py
│       ├── _guard.py
│       ├── _sandbox.py
│       ├── _streamer.py
│       └── py.typed
├── AGENTS.md
├── conftest.py
├── CONTRIBUTING.rst
├── docker-compose.yml
├── Dockerfile
├── Makefile
├── pyproject.toml
├── README.rst
└── tox.ini

README.rst

README.rst
=======
safezip
=======
.. image:: https://raw.githubusercontent.com/barseghyanartur/safezip/main/docs/_static/safezip_logo.webp
   :alt: SafeZip Logo
   :align: center

Hardened ZIP extraction for Python - secure by default.

.. image:: https://img.shields.io/pypi/v/safezip.svg
   :target: https://pypi.python.org/pypi/safezip
   :alt: PyPI Version

.. image:: https://img.shields.io/pypi/pyversions/safezip.svg
   :target: https://pypi.python.org/pypi/safezip/
   :alt: Supported Python versions

.. image:: https://github.com/barseghyanartur/safezip/actions/workflows/test.yml/badge.svg?branch=main
   :target: https://github.com/barseghyanartur/safezip/actions
   :alt: Build Status

.. image:: https://readthedocs.org/projects/safezip/badge/?version=latest
    :target: http://safezip.readthedocs.io
    :alt: Documentation Status

.. image:: https://img.shields.io/badge/docs-llms.txt-blue
    :target: https://safezip.readthedocs.io/en/latest/llms.txt
    :alt: llms.txt - documentation for LLMs

.. image:: https://img.shields.io/badge/license-MIT-blue.svg
   :target: https://github.com/barseghyanartur/safezip/#License
   :alt: MIT

.. image:: https://coveralls.io/repos/github/barseghyanartur/safezip/badge.svg?branch=main&service=github
    :target: https://coveralls.io/github/barseghyanartur/safezip?branch=main
    :alt: Coverage

``safezip`` is a zero-dependency, production-grade wrapper around Python's
``zipfile`` module that defends against the most common ZIP-based attacks:
ZipSlip path traversal, ZIP bombs, and malformed/crafted archives.

Features
========

- **ZipSlip protection** - relative traversal, absolute paths, Windows UNC
  paths, Unicode lookalike attacks, and null bytes in filenames are all
  blocked.
- **ZIP bomb protection** - per-member and cumulative decompression ratio
  limits abort extraction before runaway decompression can exhaust disk or
  memory.
- **File size limits** - per-member size is checked against the declared header
  value at open time (Guard phase) and again against actual decompressed bytes
  during streaming (Streamer phase).  Total extraction size is enforced
  cumulatively across all members at stream time.
- **ZIP64 consistency checks** - crafted archives with inconsistent ZIP64
  extra fields are rejected before decompression begins.
- **Symlink policy** - configurable: ``REJECT`` (default), ``IGNORE``, or
  ``RESOLVE_INTERNAL`` (symlink entries are extracted as regular files; no OS
  symlink is created on disk).
- **Atomic writes** - every member is written to a temporary file first;
  the destination is only created after all checks pass.  No partial files
  are left on disk after a security abort.
- **Secure by default** - all limits are active without any configuration.
- **Zero dependencies** - standard library only.
- **Environment variable overrides** - all
  limits (including ``symlink_policy``) can be set via ``SAFEZIP_*``
  environment variables for containerised deployments.

Prerequisites
=============

Python 3.10 or later.  No additional packages required.

Installation
============
With ``uv``:

.. code-block:: sh

    uv pip install safezip

Or with ``pip``:

.. code-block:: sh

    pip install safezip

Quick start
===========

Drop-in replacement for the common ``zipfile`` extraction pattern:

.. pytestfixture: file_zip
.. code-block:: python
    :name: test_safe_extract

    from safezip import safe_extract

    safe_extract("path/to/file.zip", "/var/files/extracted/")

Or use the ``SafeZipFile`` context manager for more control:

.. pytestfixture: file_zip
.. code-block:: python
    :name: test_safe_zipfile

    from safezip import SafeZipFile

    with SafeZipFile("path/to/file.zip") as zf:
        print(zf.namelist())
        zf.extractall("/var/files/extracted/")

Custom limits
=============
See the `Default limits`_ for reference.

.. pytestfixture: file_zip
.. code-block:: python
    :name: test_custom_limits

    from safezip import SafeZipFile, SymlinkPolicy

    with SafeZipFile(
        "path/to/file.zip",
        max_file_size=100 * 1024 * 1024,      # 100 MiB per member (default: 1 GiB)
        max_total_size=500 * 1024 * 1024,     # 500 MiB total (default: 5 GiB)
        max_files=1_000,                      # (default: 10 000)
        max_per_member_ratio=50.0,            # (default: 200)
        max_total_ratio=50.0,                 # (default: 200)
        max_nesting_depth=1,                  # (default: 3)
        symlink_policy=SymlinkPolicy.IGNORE,  # (default: SymlinkPolicy.REJECT)
    ) as zf:
        zf.extractall("/var/files/extracted/")

Recursive extraction
====================

When an archive contains nested ``.zip`` files, set ``recursive=True`` to
descend into them automatically. All safety limits apply at every level. Each
nested archive is extracted into a directory named after it (without the
extension). The ``.zip`` file itself is never written to disk.

.. pytestfixture: nested_file_zip
.. code-block:: python
    :name: test_recursive_extraction

    from safezip import SafeZipFile

    # archive.zip
    #   readme.txt
    #   data.zip          ← will be descended into, not extracted as a blob
    #     report.csv

    with SafeZipFile("path/to/archive.zip", recursive=True, max_nesting_depth=3) as zf:
        zf.extractall("/var/files/extracted/")

    # Result on disk:
    #   /var/files/extracted/readme.txt
    #   /var/files/extracted/data/report.csv

With ``max_nesting_depth=0``, opening any nested archive raises
``NestingDepthError`` before extracting a single byte from it:

.. pytestfixture: nested_file_zip
.. code-block:: python
    :name: test_recursive_extraction_depth_limit

    import pytest
    from safezip import SafeZipFile, NestingDepthError

    # archive.zip
    #   readme.txt
    #   data.zip          ← depth 1 exceeds max_nesting_depth=0 → NestingDepthError
    #     report.csv

    with pytest.raises(NestingDepthError):
        with SafeZipFile(
            "path/to/archive.zip", recursive=True, max_nesting_depth=0
        ) as zf:
            zf.extractall("/var/files/extracted/")

Security event monitoring
=========================

.. pytestfixture: file_zip
.. code-block:: python
    :name: test_security_event_monitoring

    from safezip import SafeZipFile, SecurityEvent

    def my_monitor(event: SecurityEvent) -> None:
        print(f"[safezip] {event.event_type} archive={event.archive_hash}")

    with SafeZipFile("path/to/file.zip", on_security_event=my_monitor) as zf:
        zf.extractall("/var/files/extracted/")

Environment variable overrides
==============================
See the `Default limits`_ for reference.

All limits can be overridden without changing code:

.. code-block:: sh

    export SAFEZIP_MAX_FILE_SIZE=104857600    # 100 MiB (default: 1 GiB)
    export SAFEZIP_MAX_TOTAL_SIZE=524288000   # 500 MiB (default: 5 GiB)
    export SAFEZIP_MAX_FILES=1000             # (default: 10 000)
    export SAFEZIP_MAX_PER_MEMBER_RATIO=50    # (default: 200)
    export SAFEZIP_MAX_TOTAL_RATIO=50         # (default: 200)
    export SAFEZIP_MAX_NESTING_DEPTH=1        # (default: 3)
    export SAFEZIP_SYMLINK_POLICY=ignore      # reject | ignore | resolve_internal (default: reject)

Default limits
==============

+--------------------------+------------+
| Parameter                | Default    |
+==========================+============+
| ``max_file_size``        | 1 GiB      |
+--------------------------+------------+
| ``max_total_size``       | 5 GiB      |
+--------------------------+------------+
| ``max_files``            | 10 000     |
+--------------------------+------------+
| ``max_per_member_ratio`` | 200        |
+--------------------------+------------+
| ``max_total_ratio``      | 200        |
+--------------------------+------------+
| ``max_nesting_depth``    | 3          |
+--------------------------+------------+
| ``symlink_policy``       | REJECT     |
+--------------------------+------------+
| ``recursive``            | False      |
+--------------------------+------------+

Testing
=======

All tests run inside Docker to prevent accidental pollution of the host system:

.. code-block:: sh

    make test

To test a specific Python version:

.. code-block:: sh

    make test-env ENV=py312

Writing documentation
=====================

Keep the following hierarchy:

.. code-block:: text

    =====
    title
    =====

    header
    ======

    sub-header
    ----------

    sub-sub-header
    ~~~~~~~~~~~~~~

    sub-sub-sub-header
    ^^^^^^^^^^^^^^^^^^

    sub-sub-sub-sub-header
    ++++++++++++++++++++++

    sub-sub-sub-sub-sub-header
    **************************

License
=======

MIT

Support
=======
For security issues contact me at the e-mail given in the `Author`_ section.

For overall issues, go
to `GitHub <https://github.com/barseghyanartur/safezip/issues>`_.

Author
======

Artur Barseghyan <artur.barseghyan@gmail.com>

CONTRIBUTING.rst

CONTRIBUTING.rst
Contributor guidelines
======================

.. _safezip: https://github.com/barseghyanartur/safezip/
.. _uv: https://docs.astral.sh/uv/
.. _tox: https://tox.wiki
.. _ruff: https://beta.ruff.rs/docs/
.. _doc8: https://doc8.readthedocs.io/
.. _pre-commit: https://pre-commit.com/#installation
.. _issues: https://github.com/barseghyanartur/safezip/issues
.. _discussions: https://github.com/barseghyanartur/safezip/discussions
.. _pull request: https://github.com/barseghyanartur/safezip/pulls
.. _versions manifest: https://github.com/actions/python-versions/blob/main/versions-manifest.json

Developer prerequisites
-----------------------

pre-commit
~~~~~~~~~~

Refer to `pre-commit`_ for installation instructions.

TL;DR:

.. code-block:: sh

    curl -LsSf https://astral.sh/uv/install.sh | sh  # Install uv
    uv tool install pre-commit                        # Install pre-commit
    pre-commit install                                # Install hooks

Installing `pre-commit`_ ensures all contributions adhere to the project's
code quality standards.

Code standards
--------------

`ruff`_ and `doc8`_ are triggered automatically by `pre-commit`_.

To run checks manually:

.. code-block:: sh

    make doc8
    make ruff

Virtual environment
-------------------

.. code-block:: sh

    make create-venv

Installation
------------

.. code-block:: sh

    make install

Testing
-------

**All tests must be run inside Docker.**  This prevents accidental extraction
of malicious test archives from reaching the host filesystem.

.. code-block:: sh

    make test

To test a single environment:

.. code-block:: sh

    make test-env ENV=py312

For an interactive shell inside the container:

.. code-block:: sh

    make shell

In any case, GitHub Actions runs the full matrix automatically on every push.

Releases
--------
**Build the package for releasing:**

.. code-block:: sh

    make package-build

----

**Test the built package:**

.. code-block:: sh

    make check-package-build

----

**Make a test release (test.pypi.org):**

.. code-block:: sh

    make test-release

----

**Release (pypi.org):**

.. code-block:: sh

    make release

Adding tests
------------

- All test archives must be crafted programmatically in ``conftest.py`` using
  Python's ``struct`` module or ``zipfile``.  Do not commit pre-built ``.zip``
  files.
- Every new security check must have a corresponding test in the relevant
  ``test_*.py`` file.
- Integration tests must verify that no partial files remain on disk after a
  security abort (atomic write contract).

Pull requests
-------------

Open a `pull request`_ to the ``dev`` branch only. Never directly to ``main``.

.. note::

    Create pull requests to the ``dev`` branch only!

Examples of welcome contributions:

- Fixing documentation typos or improving explanations.
- Adding test cases for new edge cases.
- Extending support for additional archive attack vectors.
- Improving error messages.

General checklist
~~~~~~~~~~~~~~~~~

- Does your change require documentation updates?
- Does your change require new tests?
- Does your change add any external dependencies?
  If so, reconsider: ``safezip`` is intentionally dependency-free.

When fixing bugs
~~~~~~~~~~~~~~~~

- Add a regression test that reproduces the bug before your fix.

When adding a new feature
~~~~~~~~~~~~~~~~~~~~~~~~~

- Update ``README.rst`` (quick start, default limits table if relevant).
- Update ``plan.rst`` if the architectural design changes.
- Add appropriate tests in the correct ``test_*.py`` file.

GitHub Actions
--------------

Tests run on Python 3.10–3.14 (all non-EOL versions).  See the
`versions manifest`_ for the full list of available Python versions.

Questions
---------

Ask on GitHub `discussions`_.

Issues
------

Report bugs or request features on GitHub `issues`_.

**Do not report security vulnerabilities on GitHub.**
Contact the author directly at artur.barseghyan@gmail.com.

AGENTS.md

AGENTS.md
# AGENTS.md — safezip

**Package version**: See pyproject.toml
**Repository**: https://github.com/barseghyanartur/safezip
**Maintainer**: Artur Barseghyan <artur.barseghyan@gmail.com>

This file is for AI agents and developers using AI assistants to work on or with
safezip. It covers two distinct roles: **using** the package in application code,
and **developing/extending** the package itself.

---

## 1. Project Mission (Never Deviate)

> Hardened ZIP extraction for Python — secure by default, zero dependencies,
> production-grade.

- Secure defaults are never relaxed without an explicit caller decision.
- No external dependencies. Ever.
- The three-phase security model (Guard → Sandbox → Streamer) is preserved.
- No partial files on disk after a security abort.

---

## 2. Using safezip in Application Code

### Simple case

<!-- pytestfixture: file_zip -->
```python name=test_simple_case
from safezip import safe_extract

# Secure defaults protect against all common attacks
safe_extract("path/to/file.zip", "/var/files/extracted/")
```

### With monitoring and custom limits

<!-- pytestfixture: file_zip -->
```python name=test_with_monitoring_and_custom_limits
from safezip import SafeZipFile, SecurityEvent

def monitor(event: SecurityEvent) -> None:
    print(f"Security event: {event.event_type}")

with SafeZipFile(
    "path/to/file.zip",
    max_file_size=100 * 1024 * 1024,  # 100 MiB per member
    on_security_event=monitor,
) as zf:
    zf.extractall("/var/files/extracted/")
```

### Exception handling

All safezip exceptions inherit from `SafezipError`:

<!-- pytestfixture: file_zip -->
```python name=test_exception_handling
from safezip import (
    safe_extract,
    SafezipError,
    UnsafeZipError,          # path traversal or disallowed symlink
    CompressionRatioError,   # ZIP bomb attempt
    FileSizeExceededError,   # member too large
    TotalSizeExceededError,  # cumulative size exceeded
    FileCountExceededError,  # too many entries
    MalformedArchiveError,   # structurally invalid archive
    NestingDepthError,       # nested archive depth exceeded
)

try:
    safe_extract("path/to/file.zip", "/var/files/extracted/")
except UnsafeZipError:
    ...
except CompressionRatioError:
    ...
except SafezipError:
    # catch-all for any safezip violation
    ...
```

### Secure defaults reference

<!-- pytestfixture: file_zip -->
```python name=test_secure_defaults_reference
from safezip import SafeZipFile, SymlinkPolicy

SafeZipFile(
    "path/to/file.zip",
    max_file_size=1 * 1024**3,       # 1 GiB per member
    max_total_size=5 * 1024**3,      # 5 GiB total
    max_files=10_000,
    max_per_member_ratio=200.0,
    max_total_ratio=200.0,
    max_nesting_depth=3,
    symlink_policy=SymlinkPolicy.REJECT,
)
```

All limits are overridable via environment variables:

| Variable | Type | Default |
|---|---|---|
| `SAFEZIP_MAX_FILE_SIZE` | int (bytes) | 1 GiB |
| `SAFEZIP_MAX_TOTAL_SIZE` | int (bytes) | 5 GiB |
| `SAFEZIP_MAX_FILES` | int | 10 000 |
| `SAFEZIP_MAX_PER_MEMBER_RATIO` | float | 200.0 |
| `SAFEZIP_MAX_TOTAL_RATIO` | float | 200.0 |
| `SAFEZIP_MAX_NESTING_DEPTH` | int | 3 |
| `SAFEZIP_SYMLINK_POLICY` | str | reject |

Resolution order: constructor argument > environment variable > hardcoded default.
Invalid env values are logged and silently ignored.

### What safezip does not do

- **Write mode**`SafeZipFile` is read-only. It does not expose `open()`,
  `read()`, or any write-mode methods from `zipfile.ZipFile`.
- **Recursive extraction** — nested `.zip` members are extracted as raw files.
  Recursion, if needed, is the caller's responsibility via `_nesting_depth`.
- **Create OS symlinks**`RESOLVE_INTERNAL` extracts symlink entries as
  regular files containing the target path as bytes. See section 5.

---

## 3. Architecture

Each extraction passes through three phases in order. Each phase owns exactly
one module. When adding a new check, identify the correct phase first.

| Phase | File | Runs | Raises |
|---|---|---|---|
| **Guard** | `_guard.py` | On `SafeZipFile.__init__()`, before any decompression | `FileCountExceededError`, `FileSizeExceededError`, `MalformedArchiveError` |
| **Sandbox** | `_sandbox.py` | Per member, before streaming begins | `UnsafeZipError` |
| **Streamer** | `_streamer.py` | Per member, during decompression | `FileSizeExceededError`, `TotalSizeExceededError`, `CompressionRatioError` |

**Guard** owns: file count limit, declared per-member size, ZIP64 consistency,
null bytes in filenames.

**Sandbox** owns: path traversal detection, absolute/UNC path rejection, Unicode
NFC normalisation, null-byte rejection, path length limit, symlink policy
(REJECT / IGNORE / RESOLVE_INTERNAL).

**Streamer** owns: per-member decompressed size, cumulative total size,
per-member ratio, cumulative ratio, atomic write contract (temp file → rename
on success, unlink on failure).

**Orchestration** (`_core.py`) — `SafeZipFile` and `safe_extract`. `_extract_one`
calls the three phases in order per member. Environment variable resolution,
security event emission, and symlink policy dispatch live here.

### Key files

| File | Purpose |
|---|---|
| `src/safezip/_core.py` | Public API, orchestration, env overrides, event emission |
| `src/safezip/_guard.py` | Phase A: static pre-checks |
| `src/safezip/_sandbox.py` | Phase B: path resolution, symlink policy |
| `src/safezip/_streamer.py` | Phase C: streaming extraction, atomic writes |
| `src/safezip/_exceptions.py` | Exception hierarchy (all inherit `SafezipError`) |
| `src/safezip/_events.py` | `SecurityEvent`, `SymlinkPolicy`, callback type |
| `src/safezip/tests/conftest.py` | All test archive fixtures |
| `pyproject.toml` | Build, ruff, mypy, pytest-cov configuration |
| `README.rst` | End-user documentation; keep in sync with code |

---

## 4. Security Principles

**1. Default limits are sacred.**
Never lower them in examples or generated code. If a user asks you to relax a
limit, warn about the tradeoff explicitly before complying.

**2. Atomicity is non-negotiable.**
Every member must follow: temp file → all checks pass → `replace()` to
destination. On any exception: `unlink(missing_ok=True)` the temp file. The
destination must never be created or modified if a check fails. No partial
files may remain on disk.

**3. Never merge phase responsibilities.**
Path checks belong in `_sandbox.py`. Static header checks in `_guard.py`.
Runtime byte checks in `_streamer.py`. Do not add path logic to the streamer
or size logic to the guard.

**4. Zero external dependencies.**
stdlib only. If you are considering adding an import that is not in the Python
standard library, the answer is no.

**5. Security events must not be suppressible.**
Exceptions raised inside `on_security_event` callbacks are caught and logged,
but the original security exception always propagates. Never let a broken
callback silently swallow a violation.

---

## 5. Known Intentional Behaviors — Do Not Treat as Bugs

### RESOLVE_INTERNAL extracts symlink entries as regular files

ZIP entries flagged as symlinks (via `external_attr` Unix mode `S_IFLNK`) are
written as regular files containing the link target path as bytes. Python's
`zipfile` does not create OS symlinks. The post-extraction `check_symlink` /
`_verify_symlink_chain` code in `_sandbox.py` is only reached if the OS creates
an actual symlink, which does not happen in the current extraction path.

This is **safe**: a regular file containing the text `"../escape.txt"` is
harmless. The README description ("full chain verification") describes intended
future behavior, not current behavior.

**If asked to implement real symlink support:** in `_extract_one`, for
`RESOLVE_INTERNAL` + `is_symlink_entry`, read the target bytes, call
`os.symlink(target, dest)`, then call `check_symlink(dest, base, policy)`,
unlink if unsafe. Add tests for both safe and escaping targets. Update README.

### compress_size == 0 skips the ratio check — this is correct

The ratio check in `_streamer.py` is gated on `compress_size > 0`. This is not
a vulnerability. Python's `zipfile` uses the central directory's `compress_size`
to control how many compressed bytes it reads. The only case where
`compress_size == 0` reaches the streamer for a member that successfully
decompresses is a genuinely empty member (zero bytes), for which skipping the
ratio check is correct behavior.

A crafted archive with `compress_size=0` in the central directory but non-empty
content is rejected by Python's `zipfile` with `BadZipFile` (CRC failure) before
the streamer is reached. This has been empirically verified. **Do not attempt to
"fix" this skip.**

### Nested archives are extracted as raw files

Members with ZIP-like extensions (`.zip`, `.jar`, `.whl`, `.egg`, etc.) are
extracted as opaque blobs. `SafeZipFile` does not auto-recurse. The
`_nesting_depth` parameter and `NestingDepthError` exist to guard against
runaway recursion if a caller implements manual recursion.

---

## 6. Agent Workflow: Adding Features or Fixing Bugs

When asked to add a feature or fix a bug, follow these steps in order:

1. **Check the mission** — Does the change preserve zero deps, secure defaults,
   and the three-phase model?
2. **Identify the correct phase** — Guard (static/header), Sandbox (path/policy),
   or Streamer (runtime/bytes).
3. **For bug fixes: write the regression fixture first** — Add a programmatic
   archive fixture to `src/safezip/tests/conftest.py` that reproduces the bug.
   The test must fail before your fix.
4. **Implement the change** in the correct phase file.
5. **Add/update exceptions** in `_exceptions.py` if a new error type is needed
   (inherit from `SafezipError`).
6. **Add event emission** in `_core.py` (`self._emit_event("event_type")`) if
   the check fires inside `_extract_one`.
7. **Export** new public symbols from `__init__.py` and `__all__`.
8. **Write tests:**
   - Unit test in `test_[phase].py` (e.g., `test_streamer.py`).
   - Integration test in `test_integration.py` verifying no partial files remain.
   - Legitimate-input test confirming the happy path still works.
9. **Update `README.rst`** if the API or default limits table changed.
10. **Suggest running:** Either single environement
    test `make test-env ENV=py312` or test all envionments `make test`.
11. **Suggest running:** `make pre-commit`.

### Acceptable new features

- Windows reserved filename detection (Phase B / Sandbox).
- Additional event types for new violation categories.
- Optional recursive extraction (caller-controlled, guarded by `_nesting_depth`).
- Real OS symlink creation under `RESOLVE_INTERNAL` (see section 5).

### Forbidden

- Adding any external dependency.
- Lowering default limits.
- Bypassing or merging phases.
- Writing directly to the destination path (must use temp file).
- Exposing write-mode or `open()`/`read()` methods on `SafeZipFile`.

---

## 7. Testing Rules

### All tests must run inside Docker

```sh
make test                   # full matrix (Python 3.10–3.14)
make test-env ENV=py312     # single version
make shell                  # interactive shell
```

Do not run `pytest` directly on the host machine. Malicious test archives must
not touch the host filesystem.

### Test layout

```
src/safezip/tests/
    conftest.py          — all archive fixtures (add new ones here)
    test_guard.py        — Phase A tests
    test_sandbox.py      — Phase B tests
    test_streamer.py     — Phase C tests
    test_integration.py  — end-to-end tests
```

The **root `conftest.py`** (project root) is for `pytest-codeblock` documentation
testing only. Do not add security fixtures there.

### Fixture rules

- Craft all test archives programmatically using `struct` or `zipfile`. Do not
  commit pre-built `.zip` files.
- Use `tmp_path` for all output. Never write to a fixed path.

### Required assertions for every security abort test

```python
# 1. pytest.raises wraps the full operation, not just extractall
with pytest.raises(SpecificError):
    with SafeZipFile(...) as zf:
        zf.extractall(dest)

# 2. Atomicity: no partial files remain
remaining = [f for f in dest.rglob("*") if not f.is_dir()]
assert not remaining
```

### Checklist for every new security check

- [ ] Fixture in `conftest.py` that triggers the violation
- [ ] Test asserting the correct exception is raised
- [ ] Test asserting no partial files remain after abort
- [ ] Test asserting a legitimate archive still extracts correctly
- [ ] Integration test in `test_integration.py`
- [ ] Event emission tested if applicable

---

## 8. Coding Conventions

Run all linting checks:

```sh
make pre-commit
```

### Formatting

- Line length: **88 characters** (ruff).
- Import sorting: `isort`; `safezip` is `known-first-party`.
- Target: `py310`. Run `make ruff` to check. `ruff fix = true` auto-fixes on
  commit — do not fight the formatter.

### Ruff rules in effect

`B`, `C4`, `E`, `F`, `G`, `I`, `ISC`, `INP`, `N`, `PERF`, `Q`, `SIM`.

Explicitly ignored:

| Rule | Reason |
|---|---|
| `G004` | f-strings in logging calls are allowed |
| `ISC003` | implicit string concatenation across lines is allowed |
| `PERF203` | `try/except` in loops allowed in `conftest.py` only |

### Style

- Every non-test module must have `__all__`, `__author__`, `__copyright__`,
  `__license__` at module level.
- Logger: always `logging.getLogger("safezip.security")`. Never use `__name__`.
- Log member names truncated to 256 characters in `extra` dicts (privacy).
- Always chain exceptions: `raise X(...) from exc`.
- Type annotations on all public functions. Use `Optional[X]` (not `X | None`)
  to match the existing codebase.
- `SecurityEvent` must never include member names, paths, or filesystem
  information — `event_type`, `archive_hash`, and `timestamp` only.

### Pull requests

Target the `dev` branch only. Never open a PR directly to `main`.

---

## 9. Prompt Templates

**Explaining usage to a user:**
> You are an expert in secure Python file handling. Explain how to use safezip
> for [task]. Start with secure defaults. Include exception handling. Note that
> symlink entries are extracted as regular files, not OS symlinks.

**Implementing a new feature:**
> Extend safezip with [feature]. Follow the AGENTS.md agent workflow (section 6):
> identify the correct phase, implement, add tests verifying atomicity and events,
> update README. Preserve zero external dependencies and secure defaults.

**Fixing a bug:**
> Reproduce [bug] with a new programmatic fixture in conftest.py. The test must
> fail before the fix. Then fix in the correct phase file. Add tests asserting
> the correct exception, no partial files on disk, and that legitimate archives
> still extract successfully.

**Reviewing a change:**
> Review this safezip change against AGENTS.md: Does it preserve zero deps?
> Does it maintain the three-phase model? Does it follow the atomic write
> contract? Are all new checks tested with both violation and legitimate inputs?

conftest.py

conftest.py
"""
Pytest fixtures for documentation testing.

DO NOT ADD OTHER FIXTURES HERE.
"""

import io
import zipfile
from pathlib import Path

import pytest


@pytest.fixture()
def file_zip(tmp_path):
    """A valid ZIP file named file.zip."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr("hello.txt", b"Hello, world!\n")
    p = Path("path/to") / "file.zip"
    p.write_bytes(buf.getvalue())
    return p


@pytest.fixture()
def nested_file_zip(tmp_path):
    """archive.zip containing readme.txt and data.zip (which contains report.csv).

    Matches the README 'Recursive extraction' example exactly::

        archive.zip
          readme.txt
          data.zip
            report.csv
    """
    inner_buf = io.BytesIO()
    with zipfile.ZipFile(inner_buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr("report.csv", b"id,value\n1,100\n")

    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr("readme.txt", b"Archive readme\n")
        zf.writestr("data.zip", inner_buf.getvalue())

    p = Path("path/to") / "archive.zip"
    p.write_bytes(buf.getvalue())
    return p

docker-compose.yml

docker-compose.yml
services:
  tox:
    build: .
    volumes:
      - ./htmlcov:/app/htmlcov

pyproject.toml

pyproject.toml
[project]
name = "safezip"
description = "Hardened ZIP extraction for Python - secure by default."
readme = "README.rst"
version = "0.1.6"
requires-python = ">=3.10"
dependencies = []
authors = [
    { name = "Artur Barseghyan", email = "artur.barseghyan@gmail.com" },
]
maintainers = [
    { name = "Artur Barseghyan", email = "artur.barseghyan@gmail.com" },
]
license = "MIT"
classifiers = [
    "Development Status :: 4 - Beta",
    "Intended Audience :: Developers",
    "Operating System :: OS Independent",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
    "Programming Language :: Python :: 3.12",
    "Programming Language :: Python :: 3.13",
    "Programming Language :: Python :: 3.14",
    "Programming Language :: Python :: 3.15",
    "Programming Language :: Python",
    "Topic :: Security",
    "Topic :: Software Development :: Libraries :: Python Modules",
    "Topic :: System :: Archiving :: Compression",
]
keywords = [
    "zip",
    "security",
    "zipslip",
    "zipbomb",
    "hardened",
    "safe",
]

[project.scripts]
safezip = "safezip.cli:main"

[project.urls]
Homepage = "https://github.com/barseghyanartur/safezip/"
Repository = "https://github.com/barseghyanartur/safezip/"
Issues = "https://github.com/barseghyanartur/safezip/issues"

[project.optional-dependencies]
all = ["safezip[dev,test,docs,build]"]
dev = [
    "detect-secrets",
    "doc8",
    "ipython",
    "mypy",
    "ruff",
    "uv",
]
test = [
    "pytest",
    "pytest-cov",
    "pytest-codeblock",
]
docs = [
    "sphinx",
    "sphinx-autobuild",
    "sphinx-rtd-theme>=1.3.0",
    "sphinx-no-pragma",
    "sphinx-markdown-builder",
    "sphinx-llms-txt-link",
    "sphinx-source-tree",
]
build = [
    "build",
    "twine",
    "wheel",
]

[tool.setuptools]
package-dir = {"" = "src"}

[tool.setuptools.packages.find]
where = ["src"]
include = ["safezip", "safezip.*"]

[build-system]
requires = ["setuptools>=41.0", "wheel"]
build-backend = "setuptools.build_meta"

[tool.ruff]
line-length = 88
lint.select = [
    "B",
    "C4",
    "E",
    "F",
    "G",
    "I",
    "ISC",
    "INP",
    "N",
    "PERF",
    "Q",
    "SIM",
]
lint.ignore = [
    "G004",
    "ISC003",
]
fix = true
src = ["src/safezip"]
exclude = [
    ".bzr",
    ".direnv",
    ".eggs",
    ".git",
    ".hg",
    ".mypy_cache",
    ".nox",
    ".pants.d",
    ".ruff_cache",
    ".svn",
    ".tox",
    ".venv",
    "__pypackages__",
    "_build",
    "buck-out",
    "build",
    "dist",
    "node_modules",
    "venv",
    "docs",
]
target-version = "py310"
# Allow unused variables when underscore-prefixed.
lint.dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"

[tool.ruff.lint.isort]
known-first-party = ["safezip"]

[tool.ruff.lint.per-file-ignores]
"conftest.py" = [
    "PERF203"  # Allow `try`-`except` within a loop incurs performance overhead
]

[tool.doc8]
ignore-path = [
    "docs/requirements.txt",
    "src/safezip.egg-info/SOURCES.txt",
]

[tool.pytest.ini_options]
addopts = [
    "-ra",
    "-vvv",
    "-q",
    "--cov=safezip",
    "--ignore=.tox",
    "--cov-report=html",
    "--cov-report=term",
    "--cov-append",
    "--capture=no",
]
testpaths = [
    "src/safezip/tests",
    ".",
    "**/*.rst",
    "**/*.md",
]
pythonpath = ["src"]
norecursedirs = [".git", ".tox"]

[tool.coverage.run]
relative_files = true
omit = [".tox/*"]
source = ["safezip"]

[tool.coverage.report]
show_missing = true
exclude_lines = [
    "pragma: no cover",
    "@overload",
]

[tool.mypy]
check_untyped_defs = true
warn_unused_ignores = true
warn_redundant_casts = true
warn_unused_configs = true
ignore_missing_imports = true

[tool.sphinx-source-tree]
ignore = [
    "*.egg-info",
    "*.py,cover",
    "*.pyc",
    "*.pyo",
    ".DS_Store",
    ".coverage",
    ".coverage.*",
    ".git",
    ".hg",
    ".hypothesis",
    ".idea",
    ".mypy_cache",
    ".nox",
    ".pre-commit-config.yaml",
    ".pre-commit-hooks.yaml",
    ".pytest_cache",
    ".readthedocs.yaml",
    ".ruff_cache",
    ".secrets.baseline",
    ".svn",
    ".tox",
    ".venv",
    ".vscode",
    "CHANGELOG.rst",
    "CODE_OF_CONDUCT.rst",
    "LICENSE",
    "SECURITY.rst",
    "Thumbs.db",
    "__pycache__",
    "build",
    "codebin",
    "dist",
    "docs/Makefile",
    "docs/_build",
    "docs/_static",
    "docs/changelog.rst",
    "docs/code_of_conduct.rst",
    "docs/customization",
    "docs/make.bat",
    "docs/requirements.txt",
    "docs/security.rst",
    "docs/source_tree.rst",
    "docs/source_tree_full.rst",
    "env",
    "htmlcov",
    "node_modules",
    "venv",
    "ARCHITECTURE.rst",
    ".coderabbit.yaml",
    ".coveralls",
    "docs/full-llms.rst",
    "docs/llms.rst",
    "docs/contributor_guidelines.rst",
    "docs/package.rst",
    "docs/documentation.rst",
    "docs/index.rst",
]
order = [
    "README.rst",
    "CONTRIBUTING.rst",
    "AGENTS.md",
]

[[tool.sphinx-source-tree.files]]
output = "docs/full_llms.rst"
title = "Full project source-tree"

[[tool.sphinx-source-tree.files]]
output = "docs/llms.rst"
title = "Project source-tree"
ignore = [
    "*.egg-info",
    "*.py,cover",
    "*.pyc",
    "*.pyo",
    ".DS_Store",
    ".coverage",
    ".coverage.*",
    ".git",
    ".hg",
    ".hypothesis",
    ".idea",
    ".mypy_cache",
    ".nox",
    ".pre-commit-config.yaml",
    ".pre-commit-hooks.yaml",
    ".pytest_cache",
    ".readthedocs.yaml",
    ".ruff_cache",
    ".secrets.baseline",
    ".svn",
    ".tox",
    ".venv",
    ".vscode",
    "CHANGELOG.rst",
    "CODE_OF_CONDUCT.rst",
    "LICENSE",
    "SECURITY.rst",
    "Thumbs.db",
    "__pycache__",
    "build",
    "codebin",
    "dist",
    "docs/Makefile",
    "docs/_build",
    "docs/_static",
    "docs/changelog.rst",
    "docs/code_of_conduct.rst",
    "docs/customization",
    "docs/make.bat",
    "docs/requirements.txt",
    "docs/security.rst",
    "docs/source_tree.rst",
    "docs/source_tree_full.rst",
    "env",
    "htmlcov",
    "node_modules",
    "venv",
    "examples",
    "docs",
    "ARCHITECTURE.rst",
    ".coderabbit.yaml",
    ".coveralls",
    "docs/full-llms.rst",
    "docs/llms.rst",
    "docs/contributor_guidelines.rst",
    "docs/package.rst",
    "docs/documentation.rst",
    "docs/index.rst",
]

src/safezip/__init__.py

src/safezip/__init__.py
"""safezip - Hardened ZIP extraction for Python."""

from ._core import SafeZipFile, safe_extract
from ._events import SecurityEvent, SymlinkPolicy
from ._exceptions import (
    CompressionRatioError,
    FileCountExceededError,
    FileSizeExceededError,
    MalformedArchiveError,
    NestingDepthError,
    SafezipError,
    TotalSizeExceededError,
    UnsafeZipError,
)

__title__ = "safezip"
__version__ = "0.1.6"
__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"
__all__ = (
    # Core
    "SafeZipFile",
    "safe_extract",
    # Events / policy
    "SecurityEvent",
    "SymlinkPolicy",
    # Exceptions
    "SafezipError",
    "UnsafeZipError",
    "FileSizeExceededError",
    "TotalSizeExceededError",
    "CompressionRatioError",
    "FileCountExceededError",
    "NestingDepthError",
    "MalformedArchiveError",
)

src/safezip/_core.py

src/safezip/_core.py
"""SafeZipFile: the public hardened wrapper around zipfile.ZipFile."""

import hashlib
import logging
import os
import stat
import zipfile
from contextlib import suppress
from pathlib import Path
from typing import BinaryIO, Optional, Union

from ._events import SecurityEvent, SecurityEventCallback, SymlinkPolicy
from ._exceptions import (
    CompressionRatioError,
    FileCountExceededError,
    FileSizeExceededError,
    MalformedArchiveError,
    NestingDepthError,
    TotalSizeExceededError,
    UnsafeZipError,
)
from ._guard import validate_archive
from ._sandbox import check_symlink, resolve_member_path
from ._streamer import CumulativeCounters, stream_extract_member

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"
__all__ = (
    "SafeZipFile",
    "safe_extract",
)

log = logging.getLogger("safezip.security")

_ARCHIVE_EXTENSIONS = frozenset(
    {".zip", ".jar", ".war", ".ear", ".apk", ".aar", ".whl", ".egg"}
)


def _archive_stem(name: str) -> str:
    """Strip the archive extension from *name*, returning the base stem.

    Handles single extensions only (ZIP archives do not use compound
    extensions like .tar.gz), but normalises consistently.

    Examples::

        archive.zip  → archive
        lib.whl      → lib
        app.jar      → app
        data.csv     → data.csv   (non-archive extension unchanged)
    """
    p = Path(name)
    if p.suffix.lower() in _ARCHIVE_EXTENSIONS:
        return p.stem
    return name


def _env_int(name: str, default: int) -> int:
    val = os.environ.get(name)
    if val is None:
        return default
    try:
        return int(val)
    except ValueError:
        return default


def _env_float(name: str, default: float) -> float:
    val = os.environ.get(name)
    if val is None:
        return default
    try:
        return float(val)
    except ValueError:
        return default


def _env_bool(name: str, default: bool) -> bool:
    val = os.environ.get(name)
    if val is None:
        return default
    if val.lower() in ("1", "true", "yes", "on"):
        return True
    if val.lower() in ("0", "false", "no", "off"):
        return False
    log.warning(
        "Ignoring unrecognised %s value %r; using default %r.",
        name,
        val,
        default,
    )
    return default


def _sanitise_mode(path: Path, *, strip_special_bits: bool = True) -> None:
    """Strip setuid/setgid/sticky bits from *path* if requested."""
    if not strip_special_bits:
        return
    try:
        current = path.stat().st_mode
        safe = current & ~(stat.S_ISUID | stat.S_ISGID | stat.S_ISVTX)
        if safe != current:
            os.chmod(path, safe)
    except OSError:
        pass  # best-effort; extraction already succeeded


def _env_symlink_policy(default: SymlinkPolicy) -> SymlinkPolicy:
    """Read SAFEZIP_SYMLINK_POLICY from the environment.

    Accepted values (case-insensitive): ``reject``, ``ignore``,
    ``resolve_internal``.  Any other value is logged and ignored.
    """
    val = os.environ.get("SAFEZIP_SYMLINK_POLICY")
    if val is None:
        return default
    mapping = {
        "reject": SymlinkPolicy.REJECT,
        "ignore": SymlinkPolicy.IGNORE,
        "resolve_internal": SymlinkPolicy.RESOLVE_INTERNAL,
    }
    resolved = mapping.get(val.lower())
    if resolved is None:
        log.warning(
            "Ignoring unrecognised SAFEZIP_SYMLINK_POLICY value %r; using default %r.",
            val,
            default.value,
        )
        return default
    return resolved


_DEFAULT_MAX_FILE_SIZE: int = _env_int("SAFEZIP_MAX_FILE_SIZE", 1 * 1024**3)
_DEFAULT_MAX_TOTAL_SIZE: int = _env_int("SAFEZIP_MAX_TOTAL_SIZE", 5 * 1024**3)
_DEFAULT_MAX_FILES: int = _env_int("SAFEZIP_MAX_FILES", 10_000)
_DEFAULT_MAX_PER_MEMBER_RATIO: float = _env_float("SAFEZIP_MAX_PER_MEMBER_RATIO", 200.0)
_DEFAULT_MAX_TOTAL_RATIO: float = _env_float("SAFEZIP_MAX_TOTAL_RATIO", 200.0)
_DEFAULT_MAX_NESTING_DEPTH: int = _env_int("SAFEZIP_MAX_NESTING_DEPTH", 3)
_DEFAULT_SYMLINK_POLICY: SymlinkPolicy = _env_symlink_policy(SymlinkPolicy.REJECT)
_DEFAULT_RECURSIVE: bool = _env_bool("SAFEZIP_RECURSIVE", False)


def _archive_hash(file: Union[str, os.PathLike, BinaryIO]) -> str:
    """Return first 16 hex characters of SHA-256 of archive content (first 64 KiB).

    Content-based hashing ensures different files at the same path produce
    different hashes in SecurityEvent records.
    """
    h = hashlib.sha256()
    if isinstance(file, (str, os.PathLike)):
        try:
            with open(file, "rb") as fh:
                h.update(fh.read(65536))
        except OSError:
            h.update(str(file).encode())
        return h.hexdigest()[:16]

    pos = file.tell()
    try:
        h.update(file.read(65536))
    finally:
        with suppress(OSError):
            file.seek(pos)
    return h.hexdigest()[:16]


class SafeZipFile:
    """A hardened, composition-based wrapper around :class:`zipfile.ZipFile`.

    All defences are enabled by default.  Limits can be relaxed by passing
    explicit constructor arguments or by setting environment variables.

    .. note::

        This class intentionally does **not** expose ``open()``, ``read()``,
        or any write-mode methods from the underlying ``zipfile.ZipFile``.
        Callers needing lower-level access must use ``zipfile.ZipFile``
        directly, accepting the associated risks.
    """

    def __init__(
        self,
        file: Union[str, os.PathLike, BinaryIO],
        mode: str = "r",
        *,
        max_file_size: Optional[int] = None,
        max_total_size: Optional[int] = None,
        max_files: Optional[int] = None,
        max_per_member_ratio: Optional[float] = None,
        max_total_ratio: Optional[float] = None,
        max_nesting_depth: Optional[int] = None,
        symlink_policy: Optional[SymlinkPolicy] = None,
        password: Optional[bytes] = None,
        on_security_event: SecurityEventCallback = None,
        _nesting_depth: int = 0,
        recursive: Optional[bool] = None,
        strip_special_bits: bool = True,
    ) -> None:
        # Resolve limits: constructor arg > env var > module-level default
        # Env vars are read at runtime to support test monkeypatching
        self._max_file_size = (
            max_file_size
            if max_file_size is not None
            else _env_int("SAFEZIP_MAX_FILE_SIZE", _DEFAULT_MAX_FILE_SIZE)
        )
        self._max_total_size = (
            max_total_size
            if max_total_size is not None
            else _env_int("SAFEZIP_MAX_TOTAL_SIZE", _DEFAULT_MAX_TOTAL_SIZE)
        )
        self._max_files = (
            max_files
            if max_files is not None
            else _env_int("SAFEZIP_MAX_FILES", _DEFAULT_MAX_FILES)
        )
        self._max_per_member_ratio = (
            max_per_member_ratio
            if max_per_member_ratio is not None
            else _env_float(
                "SAFEZIP_MAX_PER_MEMBER_RATIO", _DEFAULT_MAX_PER_MEMBER_RATIO
            )
        )
        self._max_total_ratio = (
            max_total_ratio
            if max_total_ratio is not None
            else _env_float("SAFEZIP_MAX_TOTAL_RATIO", _DEFAULT_MAX_TOTAL_RATIO)
        )
        self._max_nesting_depth = (
            max_nesting_depth
            if max_nesting_depth is not None
            else _env_int("SAFEZIP_MAX_NESTING_DEPTH", _DEFAULT_MAX_NESTING_DEPTH)
        )
        self._symlink_policy = (
            symlink_policy
            if symlink_policy is not None
            else _env_symlink_policy(_DEFAULT_SYMLINK_POLICY)
        )
        self._recursive = (
            recursive
            if recursive is not None
            else _env_bool("SAFEZIP_RECURSIVE", _DEFAULT_RECURSIVE)
        )
        self._strip_special_bits = strip_special_bits
        self._password = password
        self._on_security_event = on_security_event
        self._archive_hash = _archive_hash(file)
        self._nesting_depth = _nesting_depth

        if _nesting_depth > self._max_nesting_depth:
            self._emit_event("nesting_depth_exceeded")
            log.warning(
                "Nesting depth limit exceeded",
                extra={
                    "event": "nesting_depth_exceeded",
                    "nesting_depth": _nesting_depth,
                    "max_nesting_depth": self._max_nesting_depth,
                    "archive_hash": self._archive_hash,
                },
            )
            raise NestingDepthError(
                f"Nested archive depth {_nesting_depth} exceeds "
                f"max_nesting_depth={self._max_nesting_depth}."
            )

        try:
            self._zf = zipfile.ZipFile(file, mode)
        except zipfile.BadZipFile as exc:
            raise MalformedArchiveError(f"Cannot open archive: {exc}") from exc

        # Run the Guard immediately on open
        try:
            validate_archive(
                self._zf, self._max_files, self._max_file_size, self._max_total_size
            )
        except FileCountExceededError:
            self._emit_event("file_count_exceeded")
            raise
        except FileSizeExceededError:
            self._emit_event("declared_size_exceeded")
            raise
        except MalformedArchiveError:
            self._emit_event("malformed_archive")
            raise

    # ------------------------------------------------------------------
    # Context manager
    # ------------------------------------------------------------------

    def __enter__(self) -> "SafeZipFile":
        return self

    def __exit__(self, *args: object) -> None:
        self.close()

    def close(self) -> None:
        """Close the underlying archive."""
        self._zf.close()

    # ------------------------------------------------------------------
    # Read-only inspection (safe subset of zipfile.ZipFile)
    # ------------------------------------------------------------------

    def namelist(self) -> list:
        """Return a list of archive member names."""
        return self._zf.namelist()

    def infolist(self) -> list:
        """Return a list of ZipInfo objects for all archive members."""
        return self._zf.infolist()

    def getinfo(self, name: str) -> zipfile.ZipInfo:
        """Return a ZipInfo object for *name*."""
        return self._zf.getinfo(name)

    # ------------------------------------------------------------------
    # Extraction
    # ------------------------------------------------------------------

    def extract(
        self,
        member: Union[str, zipfile.ZipInfo],
        path: Union[str, os.PathLike],
        *,
        pwd: Optional[bytes] = None,
    ) -> str:
        """Safely extract a single *member* to *path*.

        :param member: Member name string or ZipInfo object.
        :param path: Destination directory (required; no default).
        :param pwd: Optional decryption password.
        :returns: The path to the extracted file as a string.
        :raises UnsafeZipError: On path traversal, absolute paths, or symlinks.
        :raises FileSizeExceededError: If the member is too large.
        :raises CompressionRatioError: If the compression ratio is too high.
        :raises TypeError: If path is None.
        """
        if path is None:
            raise TypeError(
                "SafeZipFile.extract() requires an explicit 'path' argument."
            )
        base = Path(path).resolve()
        counters = CumulativeCounters()
        info = (
            member if isinstance(member, zipfile.ZipInfo) else self._zf.getinfo(member)
        )
        dest = self._extract_one(info, base, counters, pwd or self._password)
        return str(dest)

    def extractall(
        self,
        path: Union[str, os.PathLike],
        members: Optional[list] = None,
        *,
        pwd: Optional[bytes] = None,
    ) -> None:
        """Safely extract all (or selected) members to *path*.

        :param path: Destination directory (required; no default).
        :param members: Optional list of member names or ZipInfo objects.
        :param pwd: Optional decryption password.
        :raises UnsafeZipError: On path traversal, absolute paths, or symlinks.
        :raises FileSizeExceededError: If any member is too large.
        :raises TotalSizeExceededError: If total extracted size is too large.
        :raises CompressionRatioError: If any ratio limit is exceeded.
        :raises TypeError: If path is None.
        """
        if path is None:
            raise TypeError(
                "SafeZipFile.extractall() requires an explicit 'path' argument; "
                "extraction to the current working directory is not permitted."
            )
        base = Path(path).resolve()
        counters = CumulativeCounters()
        effective_pwd = pwd or self._password

        if members is None:
            infos = self._zf.infolist()
        else:
            infos = [
                m if isinstance(m, zipfile.ZipInfo) else self._zf.getinfo(m)
                for m in members
            ]

        for info in infos:
            self._extract_one(info, base, counters, effective_pwd)

    def _extract_one(
        self,
        info: zipfile.ZipInfo,
        base: Path,
        counters: CumulativeCounters,
        pwd: Optional[bytes],
    ) -> Path:
        """Core per-member extraction logic."""
        # Directories - create and skip streaming
        if info.filename.endswith("/"):
            dest = resolve_member_path(base, info.filename.rstrip("/"))
            dest.mkdir(parents=True, exist_ok=True)
            return dest

        # Validate and resolve the destination path (Sandbox / Phase B)
        try:
            dest = resolve_member_path(base, info.filename)
        except UnsafeZipError:
            self._emit_event("zip_slip_detected")
            log.warning(
                "Path traversal attempt blocked",
                extra={
                    "event": "zip_slip_detected",
                    "member": info.filename[:256],
                    "archive_hash": self._archive_hash,
                },
            )
            raise

        # Check for symlinks in the *source* entry
        # (detect if the ZIP entry itself is stored as a symlink)
        attr = (info.external_attr >> 16) & 0xFFFF
        is_symlink_entry = bool(attr and stat.S_ISLNK(attr))

        if is_symlink_entry:
            if self._symlink_policy == SymlinkPolicy.REJECT:
                self._emit_event("symlink_rejected")
                log.warning(
                    "Symlink entry rejected",
                    extra={
                        "event": "symlink_rejected",
                        "member": info.filename[:256],
                        "archive_hash": self._archive_hash,
                    },
                )
                raise UnsafeZipError(
                    f"Symlink entry {info.filename!r} rejected (symlink_policy=REJECT)."
                )
            if self._symlink_policy == SymlinkPolicy.IGNORE:
                self._emit_event("symlink_ignored")
                log.warning(
                    "Symlink entry skipped (IGNORE policy)",
                    extra={
                        "event": "symlink_ignored",
                        "member": info.filename[:256],
                        "archive_hash": self._archive_hash,
                    },
                )
                return dest

        # Nested archive guard
        suffix = Path(info.filename).suffix.lower()
        is_archive_extension = suffix in _ARCHIVE_EXTENSIONS

        # Non-recursive: keep the debug log but don't gate on content
        if not self._recursive:
            if is_archive_extension:
                log.debug(
                    "Nested archive detected: %r - extracting as raw file,"
                    " not recursing.",
                    info.filename,
                )
        else:
            # Recursive path: stream to temp first, then content-detect
            tmp = dest.parent / (
                f"{dest.name}.safezip_tmp_{os.getpid()}_{os.urandom(4).hex()}"
            )
            try:
                try:
                    stream_extract_member(
                        self._zf,
                        info,
                        tmp,
                        max_file_size=self._max_file_size,
                        max_per_member_ratio=self._max_per_member_ratio,
                        max_total_size=self._max_total_size,
                        max_total_ratio=self._max_total_ratio,
                        counters=counters,
                        pwd=pwd,
                    )
                except FileSizeExceededError:
                    self._emit_event("file_size_exceeded")
                    log.warning(
                        "Member size limit exceeded during streaming",
                        extra={
                            "event": "file_size_exceeded",
                            "member": info.filename[:256],
                            "archive_hash": self._archive_hash,
                        },
                    )
                    raise
                except TotalSizeExceededError:
                    self._emit_event("total_size_exceeded")
                    log.warning(
                        "Cumulative extraction size limit exceeded during streaming",
                        extra={
                            "event": "total_size_exceeded",
                            "member": info.filename[:256],
                            "archive_hash": self._archive_hash,
                        },
                    )
                    raise
                except CompressionRatioError:
                    self._emit_event("compression_ratio_exceeded")
                    log.warning(
                        "Compression ratio limit exceeded during streaming",
                        extra={
                            "event": "compression_ratio_exceeded",
                            "member": info.filename[:256],
                            "archive_hash": self._archive_hash,
                        },
                    )
                    raise
                # Content-based detection (avoids extension-spoofing)
                if zipfile.is_zipfile(tmp):
                    nested_dest = dest.parent / _archive_stem(dest.name)
                    nested_dest.mkdir(parents=True, exist_ok=True)
                    with SafeZipFile(
                        tmp,
                        max_file_size=self._max_file_size,
                        max_total_size=self._max_total_size,
                        max_files=self._max_files,
                        max_per_member_ratio=self._max_per_member_ratio,
                        max_total_ratio=self._max_total_ratio,
                        max_nesting_depth=self._max_nesting_depth,
                        symlink_policy=self._symlink_policy,
                        password=self._password,
                        on_security_event=self._on_security_event,
                        recursive=True,
                        _nesting_depth=self._nesting_depth + 1,
                    ) as nested_zf:
                        nested_zf.extractall(nested_dest, pwd=pwd)
                    return nested_dest
                else:
                    # Not a ZIP — rename temp to final destination as a regular file
                    tmp.replace(dest)
                    return dest
            finally:
                tmp.unlink(missing_ok=True)

        # Stream-extract with all runtime monitors (Phase C)
        try:
            stream_extract_member(
                self._zf,
                info,
                dest,
                max_file_size=self._max_file_size,
                max_per_member_ratio=self._max_per_member_ratio,
                max_total_size=self._max_total_size,
                max_total_ratio=self._max_total_ratio,
                counters=counters,
                pwd=pwd,
            )
        except FileSizeExceededError:
            self._emit_event("file_size_exceeded")
            log.warning(
                "Member size limit exceeded during streaming",
                extra={
                    "event": "file_size_exceeded",
                    "member": info.filename[:256],
                    "archive_hash": self._archive_hash,
                },
            )
            raise
        except TotalSizeExceededError:
            self._emit_event("total_size_exceeded")
            log.warning(
                "Cumulative extraction size limit exceeded during streaming",
                extra={
                    "event": "total_size_exceeded",
                    "member": info.filename[:256],
                    "archive_hash": self._archive_hash,
                },
            )
            raise
        except CompressionRatioError:
            self._emit_event("compression_ratio_exceeded")
            log.warning(
                "Compression ratio limit exceeded during streaming",
                extra={
                    "event": "compression_ratio_exceeded",
                    "member": info.filename[:256],
                    "archive_hash": self._archive_hash,
                },
            )
            raise

        # Post-extraction permission sanitisation
        if not dest.is_symlink():
            _sanitise_mode(dest, strip_special_bits=self._strip_special_bits)

        # Post-extraction symlink check (RESOLVE_INTERNAL policy)
        if dest.is_symlink() and self._symlink_policy == SymlinkPolicy.RESOLVE_INTERNAL:
            skip = check_symlink(dest, base, self._symlink_policy)
            if skip:
                dest.unlink(missing_ok=True)

        return dest

    def _emit_event(self, event_type: str) -> None:
        """Emit a SecurityEvent to the configured callback (if any)."""
        if self._on_security_event is None:
            return
        event = SecurityEvent(
            event_type=event_type,
            archive_hash=self._archive_hash,
        )
        try:
            self._on_security_event(event)
        except Exception:
            log.exception(
                "on_security_event callback raised an exception "
                "(event_type=%r); suppressing to preserve security "
                "enforcement.",
                event_type,
            )


def safe_extract(
    archive: Union[str, os.PathLike, BinaryIO],
    destination: Union[str, os.PathLike],
    **kwargs,
) -> None:
    """
    Convenience func: extract *archive* to *destination* using safe defaults.

    All keyword arguments are forwarded to :class:`SafeZipFile`.

    :param archive: Path to the ZIP file, or a file-like binary object.
    :param destination: Directory to extract into.
    """
    with SafeZipFile(archive, **kwargs) as zf:
        zf.extractall(destination)

src/safezip/_events.py

src/safezip/_events.py
"""Security event types and symlink policy enum."""

import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Optional

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"
__all__ = (
    "SecurityEvent",
    "SymlinkPolicy",
    "SecurityEventCallback",
)


class SymlinkPolicy(Enum):
    """Controls how symlink members in archives are handled."""

    REJECT = "reject"
    """Any symlink entry raises UnsafeZipError (default)."""

    IGNORE = "ignore"
    """Symlink entries are silently skipped."""

    RESOLVE_INTERNAL = "resolve_internal"
    """Symlink entries are extracted as regular files containing the raw link-target
    bytes. No OS symlink is created on disk."""


@dataclass
class SecurityEvent:
    """Minimal, privacy-safe payload emitted to the on_security_event callback.

    Deliberately excludes filenames, paths, and member names so that
    forwarding this to a third-party service cannot leak confidential
    filesystem information.
    """

    event_type: str
    """Short string identifying what happened, e.g. 'zip_slip_detected'."""

    archive_hash: str
    """First 16 hex characters of the SHA-256 hash of the archive path/name."""

    timestamp: float = field(default_factory=time.time)
    """Wall-clock time at the moment of detection (time.time())."""


# Type alias for the optional callback
SecurityEventCallback = Optional[Callable[[SecurityEvent], None]]

src/safezip/_exceptions.py

src/safezip/_exceptions.py
"""Exception hierarchy for safezip."""

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"
__all__ = (
    "SafezipError",
    "UnsafeZipError",
    "FileSizeExceededError",
    "TotalSizeExceededError",
    "CompressionRatioError",
    "FileCountExceededError",
    "NestingDepthError",
    "MalformedArchiveError",
)


class SafezipError(Exception):
    """Base class for all safezip security exceptions."""


class UnsafeZipError(SafezipError):
    """Path traversal, absolute paths, or disallowed symlinks detected."""


class FileSizeExceededError(SafezipError):
    """A single member's decompressed size exceeds max_file_size."""


class TotalSizeExceededError(SafezipError):
    """Cumulative decompressed size across all members exceeds max_total_size."""


class CompressionRatioError(SafezipError):
    """Compression ratio exceeds the configured limit (per-member or total)."""


class FileCountExceededError(SafezipError):
    """Archive entry count exceeds max_files."""


class NestingDepthError(SafezipError):
    """Nested archive depth exceeds max_nesting_depth."""


class MalformedArchiveError(SafezipError):
    """Archive is structurally invalid (ZIP64 inconsistency, count mismatch, etc.)."""

src/safezip/_guard.py

src/safezip/_guard.py
"""Phase A: pre-extraction static validation (the Guard)."""

import logging
import mmap
import os
import struct
import tempfile
import zipfile
from contextlib import suppress
from dataclasses import dataclass, field
from typing import IO, BinaryIO, List, Optional, Tuple

from ._exceptions import (
    FileCountExceededError,
    FileSizeExceededError,
    MalformedArchiveError,
)

log = logging.getLogger("safezip.security")

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"
__all__ = ("validate_archive",)

_ZIP64_EXTRA_TAG = 0x0001
_ZIP64_SENTINEL = 0xFFFFFFFF


@dataclass
class ScanResult:
    """Three-valued outcome of inspecting a zip file for overlapping records."""

    is_bomb: Optional[bool]
    invalid_reason: Optional[str] = None
    overlap_detail: Optional[str] = None

    @classmethod
    def clean(cls) -> "ScanResult":
        return cls(is_bomb=False)

    @classmethod
    def bomb(cls, detail: str) -> "ScanResult":
        return cls(is_bomb=True, overlap_detail=detail)

    @classmethod
    def invalid(cls, reason: str) -> "ScanResult":
        return cls(is_bomb=None, invalid_reason=reason)


# ---------------------------------------------------------------------------
# Comprehensive Zip Bomb Detection (Fifield 2019)
# ---------------------------------------------------------------------------

ZIP64_EXTRA_ID = 0x0001
COMPRESS_STORED = 0
COMPRESS_DEFLATE = 8
COMPRESS_BZIP2 = 12
SENTINEL_32 = 0xFFFFFFFF
SENTINEL_16 = 0xFFFF


@dataclass
class Config:
    max_aggregate_ratio: float = 10000.0  # Very high; let Streamer handle ratio checks
    max_total_uncompressed_bytes: int = (
        10 * 1024**3
    )  # 10 GiB; above SafeZipFile default
    max_file_count: int = 100_000  # Above SafeZipFile default of 10_000
    max_deflate_ratio: float = 1_032.0
    max_bzip2_ratio: float = 1_434_375.0


@dataclass
class FileEntry:
    filename: str
    header_offset: int
    compressed_size: int
    uncompressed_size: int
    compress_type: int
    cdh_extra_len: int = 0
    lfh_extra_len: int = -1
    data_start: int = 0
    data_end: int = 0


@dataclass
class Issue:
    kind: str
    detail: str


@dataclass
class DetectionResult:
    is_bomb: bool = False
    issues: List[Issue] = field(default_factory=list)
    compression_ratio: float = 0.0
    total_uncompressed: int = 0
    file_count: int = 0
    zip_size: int = 0
    zip64: bool = False


def _find_eocd(mm: mmap.mmap, file_size: int) -> int:
    sig = b"PK\x05\x06"
    search_start = max(0, file_size - 65535 - 22)
    mm.seek(search_start)
    tail = mm.read(file_size - search_start)
    pos = tail.rfind(sig)
    return search_start + pos if pos != -1 else -1


def _read_eocd(mm: mmap.mmap, file_size: int) -> Tuple[int, int, bool]:
    eocd_pos = _find_eocd(mm, file_size)
    if eocd_pos == -1:
        raise ValueError("No End of Central Directory record found")

    mm.seek(eocd_pos)
    eocd = mm.read(22)
    if len(eocd) < 22:
        raise ValueError("Truncated EOCD")

    cd_count_16 = struct.unpack_from("<H", eocd, 8)[0]
    cd_offset_32 = struct.unpack_from("<I", eocd, 16)[0]

    if eocd_pos >= 20:
        mm.seek(eocd_pos - 20)
        locator = mm.read(20)
        if locator[:4] == b"PK\x06\x07":
            zip64_eocd_offset = struct.unpack_from("<Q", locator, 8)[0]
            mm.seek(zip64_eocd_offset)
            eocd64 = mm.read(56)
            if len(eocd64) >= 56 and eocd64[:4] == b"PK\x06\x06":
                cd_count_64 = struct.unpack_from("<Q", eocd64, 32)[0]
                cd_offset_64 = struct.unpack_from("<Q", eocd64, 48)[0]
                return cd_offset_64, cd_count_64, True

    return cd_offset_32, cd_count_16, False


def _parse_zip64_extra(extra_bytes: bytes) -> dict:
    result: dict = {}
    i = 0
    while i + 4 <= len(extra_bytes):
        hdr_id = struct.unpack_from("<H", extra_bytes, i)[0]
        data_len = struct.unpack_from("<H", extra_bytes, i + 2)[0]
        i += 4
        if hdr_id == ZIP64_EXTRA_ID:
            j = i
            if j + 8 <= i + data_len:
                result["uncompressed_size"] = struct.unpack_from("<Q", extra_bytes, j)[
                    0
                ]
                j += 8
            if j + 8 <= i + data_len:
                result["compressed_size"] = struct.unpack_from("<Q", extra_bytes, j)[0]
                j += 8
            if j + 8 <= i + data_len:
                result["header_offset"] = struct.unpack_from("<Q", extra_bytes, j)[0]
            break
        i += data_len
    return result


def parse_central_directory(
    mm: mmap.mmap, file_size: int
) -> Tuple[List[FileEntry], bool]:
    cd_offset, cd_count, is_zip64 = _read_eocd(mm, file_size)
    entries: List[FileEntry] = []

    mm.seek(cd_offset)
    cdh_sig = b"PK\x01\x02"

    for _ in range(cd_count):
        header = mm.read(46)
        if len(header) < 46:
            raise ValueError(
                f"Truncated central directory header: expected 46 bytes, "
                f"got {len(header)}"
            )
        if header[:4] != cdh_sig:
            raise ValueError(
                f"Invalid central directory header signature: "
                f"expected {cdh_sig!r}, got {header[:4]!r}"
            )

        compress_type = struct.unpack_from("<H", header, 10)[0]
        compressed_size32 = struct.unpack_from("<I", header, 20)[0]
        uncomp_size32 = struct.unpack_from("<I", header, 24)[0]
        fname_len = struct.unpack_from("<H", header, 28)[0]
        extra_len = struct.unpack_from("<H", header, 30)[0]
        comment_len = struct.unpack_from("<H", header, 32)[0]
        header_offset32 = struct.unpack_from("<I", header, 42)[0]

        fname_bytes = mm.read(fname_len)
        extra_bytes = mm.read(extra_len)
        mm.seek(comment_len, 1)

        filename = fname_bytes.decode("utf-8", errors="replace")

        z64 = _parse_zip64_extra(extra_bytes)

        compressed_size = z64.get("compressed_size", compressed_size32)
        uncompressed_size = z64.get("uncompressed_size", uncomp_size32)
        header_offset = z64.get("header_offset", header_offset32)

        if compressed_size32 == SENTINEL_32 and "compressed_size" not in z64:
            compressed_size = 0
        if uncomp_size32 == SENTINEL_32 and "uncompressed_size" not in z64:
            uncompressed_size = 0
        if header_offset32 == SENTINEL_32 and "header_offset" not in z64:
            header_offset = 0

        entries.append(
            FileEntry(
                filename=filename,
                header_offset=header_offset,
                compressed_size=compressed_size,
                uncompressed_size=uncompressed_size,
                compress_type=compress_type,
                cdh_extra_len=extra_len,
            )
        )

    return entries, is_zip64


LFH_FIXED = 30


def resolve_data_intervals(mm: mmap.mmap, entries: List[FileEntry]) -> None:
    lfh_sig = b"PK\x03\x04"
    file_size = mm.size()

    for e in entries:
        if e.header_offset + LFH_FIXED > file_size:
            e.data_start = e.header_offset
            e.data_end = e.header_offset + e.compressed_size
            continue

        mm.seek(e.header_offset)
        lfh = mm.read(LFH_FIXED)
        if len(lfh) < LFH_FIXED or lfh[:4] != lfh_sig:
            e.data_start = e.header_offset
            e.data_end = e.header_offset + e.compressed_size
            continue

        lfh_fname_len = struct.unpack_from("<H", lfh, 26)[0]
        lfh_extra_len = struct.unpack_from("<H", lfh, 28)[0]
        e.lfh_extra_len = lfh_extra_len

        e.data_start = e.header_offset + LFH_FIXED + lfh_fname_len + lfh_extra_len
        e.data_end = e.data_start + e.compressed_size


def check_overlapping_files(
    entries: List[FileEntry],
) -> List[Tuple[FileEntry, FileEntry]]:
    if not entries:
        return []

    sorted_e = sorted(entries, key=lambda e: e.data_start)
    overlaps: List[Tuple[FileEntry, FileEntry]] = []
    max_end = sorted_e[0].data_end
    max_end_entry = sorted_e[0]

    for e in sorted_e[1:]:
        if e.data_start < max_end:
            overlaps.append((max_end_entry, e))
        if e.data_end > max_end:
            max_end = e.data_end
            max_end_entry = e

    return overlaps


def check_extra_field_quoting(entries: List[FileEntry]) -> List[FileEntry]:
    if not entries:
        return []

    sorted_e = sorted(entries, key=lambda e: e.header_offset)
    suspicious: List[FileEntry] = []

    for i, e in enumerate(sorted_e[:-1]):
        next_e = sorted_e[i + 1]
        eff_extra = e.lfh_extra_len if e.lfh_extra_len >= 0 else e.cdh_extra_len
        if eff_extra > 0 and e.data_start >= next_e.header_offset:
            suspicious.append(e)

    return suspicious


def check_compression_ratios(
    entries: List[FileEntry], cfg: Config
) -> List[Tuple[FileEntry, float]]:
    suspicious = []
    for e in entries:
        if e.compressed_size <= 0:
            continue
        ratio = e.uncompressed_size / e.compressed_size
        limit = (
            cfg.max_bzip2_ratio
            if e.compress_type == COMPRESS_BZIP2
            else cfg.max_deflate_ratio
        )
        if ratio > limit:
            suspicious.append((e, ratio))
    return suspicious


def detect_zip_bomb(path: str, cfg: Optional[Config] = None) -> DetectionResult:
    if cfg is None:
        cfg = Config()

    zip_size = os.path.getsize(path)
    result = DetectionResult(is_bomb=False, zip_size=zip_size)

    with open(path, "rb") as f, mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        try:
            entries, is_zip64 = parse_central_directory(mm, zip_size)
        except (ValueError, struct.error) as exc:
            result.issues.append(
                Issue("parse_error", f"Could not parse central directory: {exc}")
            )
            result.is_bomb = True
            return result

        result.zip64 = is_zip64
        result.file_count = len(entries)

        try:
            resolve_data_intervals(mm, entries)
        except Exception:
            for e in entries:
                if e.data_start == 0:
                    e.data_start = e.header_offset
                    e.data_end = e.header_offset + e.compressed_size

    overlaps = check_overlapping_files(entries)
    if overlaps:
        has_full = any(a.header_offset == b.header_offset for a, b in overlaps)
        kind = "full_overlap" if has_full else "quoted_overlap"
        sample = [(a.filename, b.filename) for a, b in overlaps[:3]]
        result.issues.append(
            Issue(
                kind,
                f"Overlapping file data detected ({len(overlaps)} pair(s)). "
                f"Sample: {sample}. "
                f"Matches Fifield "
                f"{'full-overlap' if has_full else 'quoted_overlap (or giant-steps)'} "
                f"construction.",
            )
        )
        result.is_bomb = True

    extra_q = check_extra_field_quoting(entries)
    if extra_q:
        names = [e.filename for e in extra_q[:3]]
        result.issues.append(
            Issue(
                "extra_field_quoting",
                f"Extra-field quoting detected in {len(extra_q)} file(s): {names}. "
                "LFH extra fields enclose subsequent local file headers.",
            )
        )
        result.is_bomb = True

    total_uncompressed = sum(e.uncompressed_size for e in entries)
    result.total_uncompressed = total_uncompressed
    overall_ratio = total_uncompressed / zip_size if zip_size > 0 else 0.0
    result.compression_ratio = overall_ratio

    if overall_ratio > cfg.max_aggregate_ratio:
        result.issues.append(
            Issue(
                "aggregate_ratio",
                f"Extreme aggregate compression ratio: {overall_ratio:,.0f}:1 "
                f"({total_uncompressed / 1e9:.2f} GiB uncompressed from "
                f"{zip_size / 1e6:.2f} MiB zip)",
            )
        )
        result.is_bomb = True

    if total_uncompressed > cfg.max_total_uncompressed_bytes:
        result.issues.append(
            Issue(
                "total_size",
                f"Total uncompressed size {total_uncompressed / 1e9:.2f} GiB "
                f"exceeds limit of {cfg.max_total_uncompressed_bytes / 1e9:.2f} GiB",
            )
        )
        result.is_bomb = True

    bad_ratios = check_compression_ratios(entries, cfg)
    if bad_ratios:
        worst_entry, worst_ratio = max(bad_ratios, key=lambda x: x[1])
        cname = {
            COMPRESS_STORED: "stored",
            COMPRESS_DEFLATE: "DEFLATE",
            COMPRESS_BZIP2: "bzip2",
        }.get(worst_entry.compress_type, str(worst_entry.compress_type))
        limit = (
            cfg.max_bzip2_ratio
            if worst_entry.compress_type == COMPRESS_BZIP2
            else cfg.max_deflate_ratio
        )
        result.issues.append(
            Issue(
                "per_file_ratio",
                f"File '{worst_entry.filename}' ({cname}) ratio {worst_ratio:,.0f}:1 "
                f"exceeds the {cname} theoretical maximum of {limit:,.0f}:1",
            )
        )
        result.is_bomb = True

    if result.file_count > cfg.max_file_count:
        result.issues.append(
            Issue(
                "file_count",
                f"Suspiciously high file count: {result.file_count:,} "
                f"(threshold {cfg.max_file_count:,})",
            )
        )
        result.is_bomb = True

    return result


class ZipInspector:
    """Parses a zip file's structural records and checks for overlapping spans.

    Based on the approach described in David Fifield's zip bomb research:
    https://www.bamsoftware.com/hacks/zipbomb/
    """

    _SEARCH_BLOCK = 8192

    def __init__(self, fileobj: BinaryIO, verbose: bool = False) -> None:
        self._fobj = fileobj
        self._verbose = verbose
        self._file_size: int = 0
        self._record_spans: list[tuple[int, int]] = []

    def scan(self) -> ScanResult:
        """Inspect the zip file and return a ScanResult."""
        self._fobj.seek(0, os.SEEK_END)
        self._file_size = self._fobj.tell()
        self._record_spans = []

        directory = self._locate_central_directory()
        if directory is None:
            return ScanResult.invalid("could not locate a valid central directory")

        num_entries, cd_byte_length, cd_offset = directory
        local_spans = self._walk_central_directory(
            num_entries, cd_byte_length, cd_offset
        )
        if local_spans is None:
            return ScanResult.invalid(
                "central directory parse error or unsupported feature"
            )

        return self._check_spans(local_spans)

    def _locate_central_directory(self) -> Optional[tuple[int, int, int]]:
        """Scan backwards through the file for a valid EOCD record."""
        block = self._SEARCH_BLOCK
        cursor = self._file_size
        readback = 22
        carry = b""
        check_count = 1

        while True:
            cursor -= readback
            if cursor < 0:
                return None

            self._fobj.seek(cursor, os.SEEK_SET)
            window = self._fobj.read(readback) + carry[:21]

            while check_count > 0:
                check_count -= 1
                if (
                    window[check_count] == 0x50
                    and window[check_count + 1] == 0x4B
                    and window[check_count + 2] == 0x05
                    and window[check_count + 3] == 0x06
                ):
                    result = self._validate_eocd(
                        window[check_count + 4 : check_count + 22],
                        cursor + check_count,
                    )
                    if result is not None:
                        return result

            carry = window
            readback = ((cursor - 1) & (block - 1)) + 1
            check_count = readback

    def _validate_eocd(
        self, eocd_body: bytes, eocd_offset: int
    ) -> Optional[tuple[int, int, int]]:
        """Validate the EOCD record and handle Zip64 when needed."""
        if len(eocd_body) < 18:
            return None

        raw = struct.unpack("<HHHHLLH", eocd_body)
        disk_num, cd_start_disk, *_, comment_len = raw

        if disk_num != 0 or cd_start_disk != 0:
            return None
        if eocd_offset + 22 + comment_len > self._file_size:
            return None

        spans_scratch: list[tuple[int, int]] = [
            (eocd_offset, eocd_offset + 22 + comment_len)
        ]

        entries_on_disk, total_entries, cd_length, cd_offset = raw[2:6]

        if (
            entries_on_disk == 0xFFFF
            or total_entries == 0xFFFF
            or cd_length == 0xFFFFFFFF
            or cd_offset == 0xFFFFFFFF
        ):
            z64 = self._read_zip64_records(eocd_offset, spans_scratch)
            if z64 is None:
                return None
            total_entries, cd_length, cd_offset = z64
        else:
            if total_entries != entries_on_disk:
                return None
            if cd_offset + cd_length > self._file_size:
                return None

        spans_scratch.append((cd_offset, cd_offset + cd_length))
        spans_scratch.reverse()
        self._record_spans.extend(spans_scratch)
        return (total_entries, cd_length, cd_offset)

    def _read_zip64_records(
        self,
        eocd_offset: int,
        spans_scratch: list[tuple[int, int]],
    ) -> Optional[tuple[int, int, int]]:
        """Read the Zip64 locator and record."""
        if eocd_offset < 20:
            return None

        self._fobj.seek(eocd_offset - 20, os.SEEK_SET)
        loc_sig, loc_disk, z64_eocd_offset, loc_total_disks = struct.unpack(
            "<LLQL", self._fobj.read(20)
        )
        if (
            loc_sig != 0x07064B50
            or loc_disk != 0
            or loc_total_disks != 1
            or z64_eocd_offset + 56 > self._file_size
        ):
            return None

        spans_scratch.append((eocd_offset - 20, eocd_offset))

        self._fobj.seek(z64_eocd_offset, os.SEEK_SET)
        z64 = struct.unpack("<LQHHLLQQQQ", self._fobj.read(56))
        z64_sig, z64_record_size, _, _, z64_disk, z64_cd_disk, *rest = z64
        if (
            z64_sig != 0x06064B50
            or z64_record_size < 44
            or z64_disk != 0
            or z64_cd_disk != 0
        ):
            return None

        spans_scratch.append((z64_eocd_offset, z64_eocd_offset + 12 + z64_record_size))

        total_entries, _, cd_length, cd_offset = rest
        return (total_entries, cd_length, cd_offset)

    def _walk_central_directory(
        self, num_entries: int, cd_byte_length: int, cd_offset: int
    ) -> Optional[list[tuple[int, int]]]:
        """Read every central directory header and resolve to local entry spans."""
        self._fobj.seek(cd_offset, os.SEEK_SET)
        cd_bytes = self._fobj.read(cd_byte_length)

        local_spans: list[tuple[int, int]] = []
        cursor = 0
        remaining = num_entries

        while remaining > 0:
            if cursor + 46 > cd_byte_length:
                return None

            span = self._parse_cdh_entry(cd_bytes, cursor, cd_byte_length)
            if span is None:
                return None

            entry_span, next_cursor = span
            local_spans.append(entry_span)
            cursor = next_cursor
            remaining -= 1

        if cursor != cd_byte_length:
            return None
        return local_spans

    def _parse_cdh_entry(
        self, cd_bytes: bytes, offset: int, cd_length: int
    ) -> Optional[tuple[tuple[int, int], int]]:
        """Parse one central directory header and return local span."""
        hdr = struct.unpack("<LHHHHHHLLLHHHHHLL", cd_bytes[offset : offset + 46])
        offset += 46

        if hdr[0] != 0x02014B50:
            return None

        fname_len, extra_len, comment_len = hdr[10], hdr[11], hdr[12]
        total_variable = fname_len + extra_len + comment_len
        if offset + total_variable > cd_length:
            return None

        compressed_size = hdr[8]
        uncompressed_size = hdr[9]
        disk_number = hdr[13]
        local_hdr_offset = hdr[16]

        if (
            compressed_size == 0xFFFFFFFF
            or uncompressed_size == 0xFFFFFFFF
            or disk_number == 0xFFFF
            or local_hdr_offset == 0xFFFFFFFF
        ):
            z64_result = self._resolve_zip64_cdh_fields(
                cd_bytes,
                offset + fname_len,
                offset + fname_len + extra_len,
                compressed_size,
                uncompressed_size,
                disk_number,
                local_hdr_offset,
            )
            if z64_result is None:
                return None
            (
                compressed_size,
                uncompressed_size,
                disk_number,
                local_hdr_offset,
            ) = z64_result
            offset += fname_len + extra_len + comment_len
        else:
            offset += total_variable

        if disk_number != 0:
            return None
        if local_hdr_offset + 30 > self._file_size:
            return None

        local_end = self._measure_local_entry(
            local_hdr_offset,
            compressed_size,
            uncompressed_size,
            hdr[7],
        )
        if local_end is None:
            return None

        return ((local_hdr_offset, local_end), offset)

    @staticmethod
    def _resolve_zip64_cdh_fields(
        cd_bytes: bytes,
        extra_start: int,
        extra_end: int,
        compressed_size: int,
        uncompressed_size: int,
        disk_number: int,
        local_hdr_offset: int,
    ) -> Optional[tuple[int, int, int, int]]:
        """Walk the extra field looking for Zip64 extended information block."""
        pos = extra_start
        while pos + 4 <= extra_end:
            field_id, field_data_len = struct.unpack("<HH", cd_bytes[pos : pos + 4])
            pos += 4
            if pos + field_data_len > extra_end:
                return None
            field_end = pos + field_data_len

            if field_id != 0x0001:
                pos = field_end
                continue

            if uncompressed_size == 0xFFFFFFFF:
                if pos + 8 > field_end:
                    return None
                uncompressed_size = struct.unpack("<Q", cd_bytes[pos : pos + 8])[0]
                pos += 8
            if compressed_size == 0xFFFFFFFF:
                if pos + 8 > field_end:
                    return None
                compressed_size = struct.unpack("<Q", cd_bytes[pos : pos + 8])[0]
                pos += 8
            if local_hdr_offset == 0xFFFFFFFF:
                if pos + 8 > field_end:
                    return None
                local_hdr_offset = struct.unpack("<Q", cd_bytes[pos : pos + 8])[0]
                pos += 8
            if disk_number == 0xFFFF:
                if pos + 4 > field_end:
                    return None
                disk_number = struct.unpack("<L", cd_bytes[pos : pos + 4])[0]
                pos += 4

            if pos != field_end:
                return None

            return (compressed_size, uncompressed_size, disk_number, local_hdr_offset)

        return None

    def _measure_local_entry(
        self,
        local_offset: int,
        compressed_size: int,
        uncompressed_size: int,
        expected_crc: int,
    ) -> Optional[int]:
        """Read the local file header and return the byte offset after this entry."""
        self._fobj.seek(local_offset, os.SEEK_SET)
        raw = self._fobj.read(30)
        if len(raw) < 30:
            return None

        lfh = struct.unpack("<LHHHHHLLLHH", raw)
        if lfh[0] != 0x04034B50:
            return None

        fname_len, extra_len = lfh[9], lfh[10]
        flags = lfh[2]

        entry_end = local_offset + 30 + fname_len + extra_len + compressed_size
        if entry_end > self._file_size:
            return None

        if flags & 0x08:
            descriptor_end = self._measure_data_descriptor(
                entry_end, expected_crc, compressed_size, uncompressed_size
            )
            if descriptor_end is None:
                return None
            entry_end = descriptor_end

        return entry_end

    def _measure_data_descriptor(
        self,
        descriptor_offset: int,
        expected_crc: int,
        compressed_size: int,
        uncompressed_size: int,
    ) -> Optional[int]:
        """Determine the extent of the optional data descriptor."""
        self._fobj.seek(descriptor_offset, os.SEEK_SET)
        raw = self._fobj.read(24)

        if len(raw) == 24:
            d = struct.unpack("<LLQQ", raw)
            if (
                d[0] == 0x08074B50
                and d[1] == expected_crc
                and d[2] == compressed_size
                and d[3] == uncompressed_size
            ):
                return descriptor_offset + 24

        if len(raw) >= 20:
            d = struct.unpack("<LQQ", raw[:20])
            if (
                d[0] == expected_crc
                and d[1] == compressed_size
                and d[2] == uncompressed_size
            ):
                return descriptor_offset + 20

        if len(raw) >= 16:
            d = struct.unpack("<LLLL", raw[:16])
            if (
                d[0] == 0x08074B50
                and d[1] == expected_crc
                and d[2] == compressed_size
                and d[3] == uncompressed_size
            ):
                return descriptor_offset + 16

        if len(raw) >= 12:
            d = struct.unpack("<LLL", raw[:12])
            if (
                d[0] == expected_crc
                and d[1] == compressed_size
                and d[2] == uncompressed_size
            ):
                return descriptor_offset + 12

        return None

    def _check_spans(self, local_spans: list[tuple[int, int]]) -> ScanResult:
        """Merge local entry spans with structural spans and scan for overlaps."""
        all_spans = local_spans + self._record_spans
        all_spans.sort()

        _, prev_end = all_spans[0]

        for span_start, span_end in all_spans[1:]:
            if prev_end > span_start:
                return ScanResult.bomb(
                    f"records overlap: previous ends at {prev_end}, "
                    f"next starts at {span_start}"
                )
            prev_end = span_end

        return ScanResult.clean()


def _run_overlap_detection(path: str, cfg: Optional[Config]) -> None:
    """Run detect_zip_bomb against a filesystem path and raise on positive."""
    try:
        result = detect_zip_bomb(path, cfg)
    except Exception as exc:
        raise MalformedArchiveError(
            f"Failed to parse archive for overlap detection: {exc}"
        ) from exc
    if result.is_bomb:
        details = "; ".join(i.detail for i in result.issues[:2])
        raise MalformedArchiveError(f"overlapping entries detected: {details}")


def _check_overlapping_entries(
    fileobj: IO[bytes], cfg: Optional[Config] = None
) -> None:
    """Detect Fifield-style zip bombs using comprehensive detection.

    This function uses `detect_zip_bomb()` to analyse the archive for overlapping
    entries, extra-field quoting, and other Fifield 2019 attack vectors.

    For in-memory BinaryIO objects without a filesystem path, the archive is
    spilled to a temporary file to enable mmap-based detection.

    :param fileobj: A seekable binary file object.
    :param cfg: Optional Config with limits. If not provided, uses defaults.
    :raises MalformedArchiveError: If overlapping entries are detected.
    """
    path = getattr(fileobj, "name", None)

    if path is not None:
        _run_overlap_detection(path, cfg)
        return

    # BinaryIO input: spill to a temporary file so mmap-based detection
    # can run. Save and restore position so the caller's zipfile.ZipFile
    # instance is not disturbed.
    try:
        pos = fileobj.tell()
    except OSError:
        pos = None
    try:
        try:
            fileobj.seek(0)
        except OSError:
            log.warning(
                "Skipping Fifield-style zip bomb detection: "
                "in-memory archive is not seekable."
            )
            return

        tmp_path = None
        try:
            with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp:
                tmp_path = tmp.name
                tmp.write(fileobj.read())
            _run_overlap_detection(tmp_path, cfg)
        finally:
            if tmp_path is not None:
                with suppress(OSError):
                    os.unlink(tmp_path)
    finally:
        if pos is not None:
            with suppress(OSError):
                fileobj.seek(pos)


def _check_zip64_consistency(info: zipfile.ZipInfo) -> None:
    """Detect ZIP64 inconsistencies and missing ZIP64 blocks.

    Two classes of problem are caught:

    1. **Missing ZIP64 block**: A 32-bit field holds the sentinel value
       ``0xFFFFFFFF`` (meaning "look in ZIP64 extra field"), but no ZIP64
       extra field is present.  This is always a malformed archive.

    2. **Disagreeing ZIP64 block**: A ZIP64 extra field is present, but the
       64-bit value it reports differs from the size that Python's
       ``zipfile`` resolved from the central directory.  A crafted archive
       can set the 32-bit field to a small non-sentinel value while hiding a
       huge size in the ZIP64 block; Python uses the small 32-bit value, but
       we see the discrepancy and reject the archive.
    """
    if info.file_size == SENTINEL_32 or info.compress_size == SENTINEL_32:
        zip64 = _parse_zip64_extra(info.extra) if info.extra else {}
        if not zip64:
            raise MalformedArchiveError(
                f"Entry {info.filename!r} has a ZIP64 sentinel (0xFFFFFFFF) "
                f"in the 32-bit size field but no ZIP64 extra field is present. "
                f"Archive is malformed."
            )
        return

    if not info.extra:
        return
    zip64 = _parse_zip64_extra(info.extra)
    if not zip64:
        return

    if "uncompressed_size" in zip64 and zip64["uncompressed_size"] != info.file_size:
        raise MalformedArchiveError(
            f"ZIP64 inconsistency in entry {info.filename!r}: "
            f"extra field reports uncompressed_size="
            f"{zip64['uncompressed_size']}, "
            f"but central directory reports {info.file_size}. "
            f"Archive may be crafted."
        )

    if "compressed_size" in zip64 and zip64["compressed_size"] != info.compress_size:
        raise MalformedArchiveError(
            f"ZIP64 inconsistency in entry {info.filename!r}: "
            f"extra field reports compressed_size="
            f"{zip64['compressed_size']}, "
            f"but central directory reports {info.compress_size}. "
            f"Archive may be crafted."
        )


def _validate_entry(info: zipfile.ZipInfo, max_file_size: int) -> None:
    """Validate a single ZipInfo entry during the Guard phase."""
    # Null bytes in filename
    if "\x00" in info.filename:
        raise MalformedArchiveError(
            f"Entry filename contains a null byte: {info.filename!r}"
        )

    # ZIP64 consistency
    _check_zip64_consistency(info)

    # Declared size early-rejection (Streamer enforces at stream time too)
    if info.file_size > max_file_size:
        raise FileSizeExceededError(
            f"Entry {info.filename!r} declares uncompressed size "
            f"{info.file_size:,} bytes, which exceeds the limit of "
            f"{max_file_size:,} bytes."
        )


def validate_archive(
    zf: zipfile.ZipFile,
    max_files: int,
    max_file_size: int,
    max_total_size: int,
) -> None:
    """Phase A: run all pre-extraction static checks.

    :param zf: An open zipfile.ZipFile instance (read-only access).
    :param max_files: Maximum number of entries permitted.
    :param max_file_size: Maximum permitted uncompressed size for any entry.
    :param max_total_size: Maximum permitted total uncompressed size.
    :raises FileCountExceededError: If the archive has too many entries.
    :raises FileSizeExceededError: If any entry's declared size is too large.
    :raises MalformedArchiveError: If structural anomalies are detected.
    """
    try:
        entries = zf.infolist()
    except Exception as exc:
        raise MalformedArchiveError(f"Cannot read central directory: {exc}") from exc

    if len(entries) > max_files:
        raise FileCountExceededError(
            f"Archive contains {len(entries):,} entries, "
            f"which exceeds the limit of {max_files:,}."
        )

    if zf.fp is not None:
        cfg = Config(
            max_total_uncompressed_bytes=max_total_size,
            max_file_count=max_files,
        )
        _check_overlapping_entries(zf.fp, cfg)

    for info in entries:
        _validate_entry(info, max_file_size)

src/safezip/_sandbox.py

src/safezip/_sandbox.py
"""Phase B: path resolution and symlink policy enforcement (the Sandbox)."""

import unicodedata
from pathlib import Path

from ._events import SymlinkPolicy
from ._exceptions import UnsafeZipError

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"
__all__ = (
    "resolve_member_path",
    "check_symlink",
)

# Practical upper bound; real OS limits vary but 4096 is a safe conservative cap
_MAX_PATH_LENGTH = 4096


def resolve_member_path(
    base: Path,
    member_filename: str,
) -> Path:
    """Resolve and validate a ZIP member filename against *base*.

    Applies the full path normalisation pipeline:

    1. Unicode NFC normalisation (catch lookalike characters).
    2. Null-byte rejection.
    3. Reject absolute Unix paths (starting with ``/``) and absolute Windows
       paths (drive letter + slash, e.g. ``C:/``).
    4. Reject any ``..`` path component.
    5. Verify the resolved path is inside *base*.
    6. Reject paths whose resolved length exceeds ``_MAX_PATH_LENGTH``.

    :param base: The extraction root directory (must be absolute).
    :param member_filename: Raw filename string from the ZIP central directory.
    :returns: Resolved absolute Path inside *base*.
    :raises UnsafeZipError: If the filename is unsafe for any reason.
    """
    # 1. Unicode NFC normalisation
    try:
        normalized = unicodedata.normalize("NFC", member_filename)
    except (TypeError, ValueError) as err:
        raise UnsafeZipError(f"Cannot normalise filename: {member_filename!r}") from err

    # 2. Null-byte rejection
    if "\x00" in normalized:
        raise UnsafeZipError(f"Filename contains a null byte: {normalized!r}")

    # 3. Normalise separators
    _norm = normalized.replace("\\", "/")

    # Reject absolute Unix paths and UNC paths (start with '/')
    if _norm.startswith("/"):
        raise UnsafeZipError(f"Absolute path detected in filename: {member_filename!r}")

    # Reject absolute Windows paths with drive letters (e.g. "C:/Windows")
    if len(_norm) >= 3 and _norm[1] == ":" and _norm[2] == "/" and _norm[0].isalpha():
        raise UnsafeZipError(
            f"Absolute Windows path detected in filename: {member_filename!r}"
        )

    parts = _norm.split("/")

    # Strip Windows-style relative drive
    # references (e.g. "C:relpath") - defence-in-depth
    clean_parts = []
    for part in parts:
        # Skip empty parts (double-slashes) and current-dir dots
        if not part or part == ".":
            continue
        # Reject parent-directory traversal
        if part == "..":
            raise UnsafeZipError(
                f"Path traversal detected in filename: {member_filename!r}"
            )
        # Strip Windows-style relative drive
        # references (e.g. "C:relpath" → "relpath")
        if len(part) >= 2 and part[1] == ":" and part[0].isalpha():
            part = part[2:]
            if not part:
                continue
        clean_parts.append(part)

    if not clean_parts:
        raise UnsafeZipError(f"Filename resolves to empty path: {member_filename!r}")

    # 4. Build the resolved path
    resolved = base
    for part in clean_parts:
        resolved = resolved / part

    # 5. Confirm the resolved path is inside base
    try:
        resolved.relative_to(base)
    except ValueError as err:
        raise UnsafeZipError(
            f"Resolved path escapes base directory: {resolved!r} is not under {base!r}"
        ) from err

    # 6. Path length check
    if len(str(resolved)) > _MAX_PATH_LENGTH:
        raise UnsafeZipError(
            f"Resolved path is too long ({len(str(resolved))} chars): "
            f"{str(resolved)[:120]!r}..."
        )

    return resolved


def check_symlink(
    extracted_path: Path,
    base: Path,
    policy: SymlinkPolicy,
) -> bool:
    """
    Check whether *extracted_path* is (or contains) a symlink, & apply policy.

    :param extracted_path: The path that was just extracted.
    :param base: The extraction root directory.
    :param policy: The configured symlink policy.
    :returns: ``True`` if the member should be skipped (IGNORE policy).
    :raises UnsafeZipError: If REJECT policy or chain exits base directory.
    """
    if not extracted_path.is_symlink():
        return False

    if policy == SymlinkPolicy.REJECT:
        raise UnsafeZipError(
            f"Symlink detected and symlink_policy is REJECT: {extracted_path}"
        )

    if policy == SymlinkPolicy.IGNORE:
        return True  # caller should skip this member

    # RESOLVE_INTERNAL: follow the full chain and verify every hop
    _verify_symlink_chain(extracted_path, base)
    return False


def _verify_symlink_chain(link_path: Path, base: Path) -> None:
    """Verify the full symlink chain from *link_path* stays inside *base*.

    Follows every link until a non-symlink is reached or an escape is detected.

    :raises UnsafeZipError: If any link in the chain exits *base*.
    """
    visited = set()
    current = link_path

    while current.is_symlink():
        real = str(current.resolve())
        if real in visited:
            # Cycle detected; treat as unsafe
            raise UnsafeZipError(
                f"Symlink cycle detected at {current}: refusing to follow further."
            )
        visited.add(real)

        try:
            current.resolve().relative_to(base.resolve())
        except ValueError as err:
            raise UnsafeZipError(
                f"Symlink chain for {link_path} exits the base directory "
                f"at {current}{current.resolve()}"
            ) from err
        current = current.resolve()

src/safezip/_streamer.py

src/safezip/_streamer.py
"""Phase C: streaming extraction with runtime enforcement (the Streamer)."""

import contextlib
import logging
import os
import zipfile
from pathlib import Path
from typing import Optional

from ._exceptions import (
    CompressionRatioError,
    FileSizeExceededError,
    TotalSizeExceededError,
)

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"
__all__ = (
    "stream_extract_member",
    "CumulativeCounters",
)

log = logging.getLogger("safezip.security")

_CHUNK_SIZE = 65_536  # 64 KiB


class CumulativeCounters:
    """Tracks totals across all members in a single extractall/extract call."""

    __slots__ = ("bytes_written", "compressed_bytes")

    def __init__(self) -> None:
        self.bytes_written: int = 0
        self.compressed_bytes: int = 0


def stream_extract_member(
    zf: zipfile.ZipFile,
    member: zipfile.ZipInfo,
    dest: Path,
    *,
    max_file_size: int,
    max_per_member_ratio: float,
    max_total_size: int,
    max_total_ratio: float,
    counters: CumulativeCounters,
    pwd: Optional[bytes] = None,
) -> None:
    """
    Stream a single member from *zf* to *dest* with full runtime enforcement.

    Extraction is atomic: bytes are written to a temporary file and renamed to
    *dest* only after all checks pass.  If any check raises, the temporary file
    is deleted and *dest* is never created/modified.

    :param zf: Open zipfile.ZipFile instance (internal use only).
    :param member: The ZipInfo entry to extract.
    :param dest: Final destination path (must already be path-validated).
    :param max_file_size: Per-member decompressed size limit in bytes.
    :param max_per_member_ratio: Per-member decompressed/compressed ratio
           limit.
    :param max_total_size: Cumulative decompressed size limit across all
           members.
    :param max_total_ratio: Cumulative ratio limit across all members.
    :param counters: Shared counters for cumulative checks.
    :param pwd: Optional decryption password.
    """
    dest.parent.mkdir(parents=True, exist_ok=True)

    tmp_name = f"{dest.name}.safezip_tmp_{os.getpid()}_{os.urandom(4).hex()}"
    tmp_path = dest.parent / tmp_name

    # compress_size may be 0 for data-descriptor archives
    compress_size = member.compress_size
    member_bytes_written = 0

    try:
        with zf.open(member, pwd=pwd) as src, open(tmp_path, "wb") as dst:
            while True:
                chunk = src.read(_CHUNK_SIZE)
                if not chunk:
                    break

                chunk_len = len(chunk)
                member_bytes_written += chunk_len
                counters.bytes_written += chunk_len

                # --- Per-member size check ---
                if member_bytes_written > max_file_size:
                    raise FileSizeExceededError(
                        f"Member {member.filename!r} exceeded max_file_size="
                        f"{max_file_size:,} bytes "
                        f"(decompressed {member_bytes_written:,} bytes so "
                        "far)."
                    )

                # --- Per-member ratio check ---
                # Only when compress_size is known (not a data-descriptor
                # entry).
                if compress_size > 0:
                    ratio = member_bytes_written / compress_size
                    if ratio > max_per_member_ratio:
                        raise CompressionRatioError(
                            f"Member {member.filename!r} compression ratio "
                            f"{ratio:.1f}:1 exceeds "
                            f"max_per_member_ratio={max_per_member_ratio}:1."
                        )

                # --- Cumulative size check ---
                if counters.bytes_written > max_total_size:
                    raise TotalSizeExceededError(
                        f"Cumulative decompressed size "
                        f"{counters.bytes_written:,} bytes exceeds "
                        f"max_total_size={max_total_size:,} bytes."
                    )

                # --- Cumulative ratio check ---
                # Update compressed bytes estimate from the running member.
                if compress_size > 0:
                    counters.compressed_bytes += (
                        chunk_len * compress_size // max(member.file_size, 1)
                    )
                if counters.compressed_bytes > 0:
                    total_ratio = counters.bytes_written / counters.compressed_bytes  # noqa
                    if total_ratio > max_total_ratio:
                        raise CompressionRatioError(
                            f"Cumulative compression ratio {total_ratio:.1f}:1 "
                            f"exceeds max_total_ratio={max_total_ratio}:1."
                        )

                dst.write(chunk)

        # All checks passed - atomic rename to final destination
        tmp_path.replace(dest)

    except Exception:
        # Clean up partial / temporary file on any failure
        with contextlib.suppress(OSError):
            tmp_path.unlink(missing_ok=True)
        raise

src/safezip/cli/__init__.py

src/safezip/cli/__init__.py
"""safezip.cli — command-line interface for safezip."""

from ._main import main

__all__ = ("main",)

src/safezip/cli/_main.py

src/safezip/cli/_main.py
"""safezip CLI — hardened ZIP extraction from the command line."""

import argparse
import sys
import zipfile
from pathlib import Path

from safezip import SafeZipFile, SymlinkPolicy, safe_extract
from safezip._exceptions import SafezipError

__all__ = ("main",)

_SYMLINK_POLICIES = {
    "reject": SymlinkPolicy.REJECT,
    "ignore": SymlinkPolicy.IGNORE,
    "resolve_internal": SymlinkPolicy.RESOLVE_INTERNAL,
}


def _build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        prog="safezip",
        description="Hardened ZIP extraction — safe by default.",
    )
    parser.add_argument(
        "--version",
        action="version",
        version=f"%(prog)s {_version()}",
    )

    sub = parser.add_subparsers(dest="command", required=True)

    # ------------------------------------------------------------------ extract
    ext = sub.add_parser("extract", help="Extract a ZIP archive safely.")
    ext.add_argument("archive", help="Path to the ZIP file.")
    ext.add_argument("destination", help="Directory to extract into.")
    ext.add_argument(
        "--max-file-size",
        type=int,
        metavar="BYTES",
        help="Max uncompressed size per member (default: 1 GiB).",
    )
    ext.add_argument(
        "--max-total-size",
        type=int,
        metavar="BYTES",
        help="Max total uncompressed size (default: 5 GiB).",
    )
    ext.add_argument(
        "--max-files",
        type=int,
        metavar="N",
        help="Max number of members (default: 10 000).",
    )
    ext.add_argument(
        "--max-per-member-ratio",
        type=float,
        metavar="RATIO",
        help="Max compression ratio per member (default: 200).",
    )
    ext.add_argument(
        "--max-total-ratio",
        type=float,
        metavar="RATIO",
        help="Max overall compression ratio (default: 200).",
    )
    ext.add_argument(
        "--max-nesting-depth",
        type=int,
        metavar="N",
        help="Max nested-archive depth (default: 3).",
    )
    ext.add_argument(
        "--symlink-policy",
        choices=list(_SYMLINK_POLICIES),
        default=None,
        metavar="POLICY",
        help=(
            "How to handle symlink entries: reject (default), ignore, resolve_internal."
        ),
    )
    ext.add_argument(
        "--password",
        metavar="PWD",
        help="Decryption password for encrypted archives.",
    )
    ext.add_argument(
        "--recursive",
        action="store_true",
        default=False,
        help="Enable recursive extraction of nested archives.",
    )

    # --------------------------------------------------------------------- list
    lst = sub.add_parser("list", help="List members of a ZIP archive.")
    lst.add_argument("archive", help="Path to the ZIP file.")

    return parser


def _version() -> str:
    try:
        from safezip import __version__

        return __version__
    except ImportError:
        return "unknown"


def _cmd_extract(args: argparse.Namespace) -> int:
    kwargs: dict = {}

    for attr in (
        "max_file_size",
        "max_total_size",
        "max_files",
        "max_per_member_ratio",
        "max_total_ratio",
        "max_nesting_depth",
        "recursive",
    ):
        val = getattr(args, attr, None)
        if val is not None:
            kwargs[attr] = val

    if args.symlink_policy is not None:
        kwargs["symlink_policy"] = _SYMLINK_POLICIES[args.symlink_policy]

    if args.password is not None:
        kwargs["password"] = args.password.encode()

    dest = Path(args.destination)
    dest.mkdir(parents=True, exist_ok=True)

    try:
        safe_extract(args.archive, dest, **kwargs)
    except SafezipError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1
    except FileNotFoundError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1
    except zipfile.BadZipFile as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1

    print(f"Extracted to {dest.resolve()}")
    return 0


def _cmd_list(args: argparse.Namespace) -> int:
    try:
        with SafeZipFile(args.archive) as zf:
            for name in zf.namelist():
                print(name)
    except SafezipError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1
    except FileNotFoundError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1
    except zipfile.BadZipFile as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1

    return 0


def main() -> None:
    parser = _build_parser()
    args = parser.parse_args()

    if args.command == "extract":
        sys.exit(_cmd_extract(args))
    elif args.command == "list":
        sys.exit(_cmd_list(args))
    else:  # pragma: no cover
        parser.print_help()
        sys.exit(1)

src/safezip/tests/__init__.py

src/safezip/tests/__init__.py
"""Tests for safezip."""

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"

src/safezip/tests/conftest.py

src/safezip/tests/conftest.py
"""Pytest fixtures: factory functions that craft malicious ZIP archives."""

import io
import stat
import struct
import zipfile
import zlib

import pytest

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"
__all__ = (
    "zipslip_archive",
    "absolute_path_archive",
    "unicode_traversal_archive",
    "high_ratio_archive",
    "many_files_archive",
    "null_byte_filename_archive",
    "zip64_inconsistency_archive",
    "legitimate_archive",
    "symlink_archive",
    "fifield_bomb_archive",
)


# ---------------------------------------------------------------------------
# Archive factory helpers
# ---------------------------------------------------------------------------


def _make_zip_bytes(entries: list[tuple[str, bytes]]) -> bytes:
    """Create a ZIP in memory from (filename, content) pairs."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for name, content in entries:
            info = zipfile.ZipInfo(name)
            zf.writestr(info, content)
    return buf.getvalue()


def _make_zip_bytes_stored(entries: list[tuple[str, bytes]]) -> bytes:
    """Create a stored (uncompressed) ZIP in memory."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_STORED) as zf:
        for name, content in entries:
            info = zipfile.ZipInfo(name)
            zf.writestr(info, content)
    return buf.getvalue()


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------


@pytest.fixture()
def zipslip_archive(tmp_path):
    """A ZIP whose sole entry has a path-traversal filename."""
    data = _make_zip_bytes([("../../evil.txt", b"evil content")])
    p = tmp_path / "zipslip.zip"
    p.write_bytes(data)
    return p


@pytest.fixture()
def absolute_path_archive(tmp_path):
    """A ZIP with an absolute Unix-style path entry."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        info = zipfile.ZipInfo("/etc/passwd")
        zf.writestr(info, "root:x:0:0:root:/root:/bin/bash\n")
    data = buf.getvalue()
    p = tmp_path / "absolute.zip"
    p.write_bytes(data)
    return p


@pytest.fixture()
def unicode_traversal_archive(tmp_path):
    """A ZIP with combining Unicode characters that NFC-normalises to a path
    still containing a ``..`` traversal component.

    The filename ``e\\u0301vil/../../escape.txt`` uses U+0301 COMBINING ACUTE
    ACCENT (NFD form of ``é``).  After Unicode NFC normalisation the combining
    accent is folded into the precomposed ``é``, yielding
    ``évil/../../escape.txt``.  The ``..`` components are unaffected by NFC
    and must still be detected and rejected.
    """
    # e + COMBINING ACUTE ACCENT → é after NFC; the traversal stays intact
    data = _make_zip_bytes([("e\u0301vil/../../escape.txt", b"escaped")])
    p = tmp_path / "unicode_traversal.zip"
    p.write_bytes(data)
    return p


@pytest.fixture()
def high_ratio_archive(tmp_path):
    """A ZIP whose content compresses at a very high ratio (zeros)."""
    # 2 MiB of zeros → compressed to ~2 KB → ratio ~1000:1
    data_bytes = b"\x00" * (2 * 1024 * 1024)
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr("zeros.bin", data_bytes)
    p = tmp_path / "bomb.zip"
    p.write_bytes(buf.getvalue())
    return p


@pytest.fixture()
def many_files_archive(tmp_path):
    """A ZIP with more entries than the default max_files limit allows."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        for i in range(15_000):
            zf.writestr(f"file_{i:05d}.txt", b"x")
    p = tmp_path / "many_files.zip"
    p.write_bytes(buf.getvalue())
    return p


@pytest.fixture()
def null_byte_filename_archive(tmp_path):
    """A ZIP with a null byte injected into a filename via raw struct manipulation.

    Python's zipfile won't let us write such names directly, so we craft the
    raw bytes: a minimal ZIP with one entry whose filename contains \\x00.
    """
    # Minimal ZIP structure:
    # Local file header + file data + central directory + end of central directory
    filename = b"safe\x00../../etc/passwd"
    fname_len = len(filename)
    content = b"evil"
    content_len = len(content)

    # Local file header (signature 0x04034b50)
    local_header = (
        struct.pack(
            "<4s2H3H4s2I2H",
            b"PK\x03\x04",  # signature
            20,  # version needed
            0,  # flags
            0,  # compression (stored)
            0,  # mod time
            0,  # mod date
            b"\x00\x00\x00\x00",  # CRC-32
            content_len,  # compressed size
            content_len,  # uncompressed size
            fname_len,  # filename length
            0,  # extra field length
        )
        + filename
        + content
    )

    local_offset = 0

    # Central directory header (signature 0x02014b50)
    # Format: 4s sig | 6H (ver_made,ver_needed,flags,compress,mod_time,mod_date) |
    #         4s CRC | 2I (comp_size,uncomp_size) |
    #         5H (fname_len,extra_len,comment_len,disk_start,int_attr) |
    #         2I (ext_attr, offset)  → 17 items, 46 bytes
    central_header = (
        struct.pack(
            "<4s6H4s2I5H2I",
            b"PK\x01\x02",  # signature
            0x031E,  # version made by (Unix, v30)
            20,  # version needed
            0,  # flags
            0,  # compression
            0,  # mod time
            0,  # mod date
            b"\x00\x00\x00\x00",  # CRC-32
            content_len,  # compressed size (I)
            content_len,  # uncompressed size (I)
            fname_len,  # filename length
            0,  # extra field length
            0,  # file comment length
            0,  # disk number start
            0,  # internal file attributes
            0,  # external file attributes (I)
            local_offset,  # relative offset of local header (I)
        )
        + filename
    )

    central_offset = len(local_header)
    central_size = len(central_header)

    # End of central directory record (signature 0x06054b50)
    eocd = struct.pack(
        "<4s4H2IH",
        b"PK\x05\x06",  # signature
        0,  # disk number
        0,  # disk with central dir
        1,  # entries on this disk
        1,  # total entries
        central_size,  # size of central directory
        central_offset,  # offset of central directory
        0,  # comment length
    )

    data = local_header + central_header + eocd
    p = tmp_path / "nullbyte.zip"
    p.write_bytes(data)
    return p


@pytest.fixture()
def zip64_inconsistency_archive(tmp_path):
    """A ZIP with a ZIP64 extra field that disagrees with the central directory.

    We craft a minimal archive where the ZIP64 extra field reports a size of
    999_999_999 bytes but the 32-bit central directory field reports 100 bytes.
    Python will use the 32-bit value (100), but our ZIP64 check sees 999_999_999
    and raises MalformedArchiveError.
    """
    filename = b"test.txt"
    fname_len = len(filename)
    content = b"A" * 100

    # ZIP64 extra field reporting a huge uncompressed size
    zip64_uncompressed = 999_999_999
    zip64_extra = struct.pack(
        "<HHQ",
        0x0001,  # ZIP64 tag
        8,  # size of following data (8 bytes = one uint64)
        zip64_uncompressed,  # uncompressed size (disagrees with 32-bit field below)
    )
    extra_len = len(zip64_extra)

    # Local file header - 32-bit uncompressed size = 100 (not sentinel)
    local_header = (
        struct.pack(
            "<4s2H3H4s2I2H",
            b"PK\x03\x04",
            20,
            0,
            0,
            0,
            0,
            b"\x00\x00\x00\x00",
            len(content),
            len(content),  # 32-bit uncompressed size = 100
            fname_len,
            extra_len,
        )
        + filename
        + zip64_extra
        + content
    )

    local_offset = 0

    # Central directory header - 32-bit uncompressed size = 100 (not sentinel)
    # Format: 4s | 6H | 4s CRC | 2I (comp,uncomp) | 5H | 2I → 17 items, 46 bytes
    central_header = (
        struct.pack(
            "<4s6H4s2I5H2I",
            b"PK\x01\x02",
            0x031E,
            20,
            0,
            0,
            0,
            0,
            b"\x00\x00\x00\x00",
            len(content),  # compressed size (I)
            len(content),  # 32-bit uncompressed size = 100 (I, not sentinel)
            fname_len,
            extra_len,
            0,  # comment length
            0,  # disk number start
            0,  # internal attributes
            0,  # external attributes (I)
            local_offset,  # offset of local header (I)
        )
        + filename
        + zip64_extra
    )

    central_offset = len(local_header)
    central_size = len(central_header)

    eocd = struct.pack(
        "<4s4H2IH",
        b"PK\x05\x06",
        0,
        0,
        1,
        1,
        central_size,
        central_offset,
        0,
    )

    data = local_header + central_header + eocd
    p = tmp_path / "zip64_inconsistency.zip"
    p.write_bytes(data)
    return p


@pytest.fixture()
def legitimate_archive(tmp_path):
    """A well-formed, safe archive with a few text files."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr("hello.txt", b"Hello, world!\n")
        zf.writestr("subdir/data.txt", b"Some data\n")
        zf.writestr("subdir/nested/deep.txt", b"Deep file\n")
    p = tmp_path / "legitimate.zip"
    p.write_bytes(buf.getvalue())
    return p


@pytest.fixture()
def symlink_archive(tmp_path):
    """A ZIP containing one regular file and one Unix symlink entry.

    The symlink entry's content (the link target) is ``../escape.txt``,
    which would point outside the extraction root if followed blindly.
    The entry is flagged as a symlink via the upper 16 bits of
    ``ZipInfo.external_attr`` (Unix mode ``S_IFLNK | 0o755``).
    """
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_STORED) as zf:
        # A harmless regular file that must always be extractable
        zf.writestr("readme.txt", b"safe content\n")
        # Symlink entry: mode S_IFLNK | 0o755, content = link target
        sym = zipfile.ZipInfo("link.txt")
        sym.external_attr = (stat.S_IFLNK | 0o755) << 16
        zf.writestr(sym, "../escape.txt")
    p = tmp_path / "symlink.zip"
    p.write_bytes(buf.getvalue())
    return p


@pytest.fixture()
def setuid_archive(tmp_path):
    """A ZIP with a regular file that has setuid bit (04755) in external_attr."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        info = zipfile.ZipInfo("suid_binary")
        info.external_attr = (0o4755 & 0xFFFF) << 16
        zf.writestr(info, b"ELF\x00")
    p = tmp_path / "setuid.zip"
    p.write_bytes(buf.getvalue())
    return p


@pytest.fixture()
def data_descriptor_empty_archive(tmp_path):
    """Valid ZIP with empty member using data descriptor (compress_size=0)."""
    comp_data = b""
    comp_size = 0
    uncomp_size = 0
    crc = 0

    filename = b"empty.txt"
    fname_len = len(filename)

    # Local header: sizes=0, flags=0x08, method=0 (stored, since empty)
    local_header = (
        struct.pack(
            "<4sHHHHHIIIHH",
            b"PK\x03\x04",
            20,
            0x08,
            0,  # stored
            0,
            0,
            0,
            0,
            0,
            fname_len,
            0,
        )
        + filename
    )

    # Data descriptor
    descriptor = struct.pack("<4sIII", b"PK\x07\x08", crc, comp_size, uncomp_size)

    local_with_desc = local_header + comp_data + descriptor

    # Central header: sizes=0, flags=0x08
    central_header = (
        struct.pack(
            "<4sHHHHHHIIIHHHHHII",
            b"PK\x01\x02",
            0x0314,
            20,
            0x08,
            0,
            0,
            0,
            crc,
            comp_size,
            uncomp_size,
            fname_len,
            0,
            0,
            0,
            0,
            0,
            0,
        )
        + filename
    )

    cd_offset = len(local_with_desc)
    cd_size = len(central_header)
    eocd = struct.pack("<4sHHHHIIH", b"PK\x05\x06", 0, 0, 1, 1, cd_size, cd_offset, 0)

    archive_bytes = local_with_desc + central_header + eocd
    p = tmp_path / "dd_empty.zip"
    p.write_bytes(archive_bytes)
    return p


@pytest.fixture()
def data_descriptor_invalid_bomb_archive(tmp_path):
    """
    Invalid ZIP with non-empty member, data descriptor, but CD compress_size=0.
    """
    uncomp_data = b"\x00" * 2000
    compressor = zlib.compressobj(
        zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS
    )
    comp_data = compressor.compress(uncomp_data) + compressor.flush()
    comp_size = len(comp_data)
    uncomp_size = len(uncomp_data)
    crc = zlib.crc32(uncomp_data)

    filename = b"bomb.txt"
    fname_len = len(filename)

    # Local header: sizes=0, flags=0x08, method=8 (deflate)
    local_header = (
        struct.pack(
            "<4sHHHHHIIIHH",
            b"PK\x03\x04",
            20,
            0x08,
            8,
            0,
            0,
            0,
            0,
            0,
            fname_len,
            0,
        )
        + filename
    )

    # Data descriptor with real sizes
    descriptor = struct.pack("<4sIII", b"PK\x07\x08", crc, comp_size, uncomp_size)

    local_with_desc = local_header + comp_data + descriptor

    # Central header: compress_size=0 (invalid mismatch), uncomp_size=real
    central_header = (
        struct.pack(
            "<4sHHHHHHIIIHHHHHII",
            b"PK\x01\x02",
            0x0314,
            20,
            0x08,
            8,
            0,
            0,
            crc,
            0,  # invalid comp_size=0
            uncomp_size,
            fname_len,
            0,
            0,
            0,
            0,
            0,
            0,
        )
        + filename
    )

    cd_offset = len(local_with_desc)
    cd_size = len(central_header)
    eocd = struct.pack("<4sHHHHIIH", b"PK\x05\x06", 0, 0, 1, 1, cd_size, cd_offset, 0)

    archive_bytes = local_with_desc + central_header + eocd
    p = tmp_path / "dd_invalid_bomb.zip"
    p.write_bytes(archive_bytes)
    return p


@pytest.fixture()
def fifield_bomb_archive(tmp_path):
    """A Fifield-style zip bomb: multiple central directory entries all pointing
    to the same compressed local entry (overlapping spans).

    Structure:
      - One real local entry at offset 0 containing 200 bytes of zeros.
      - Central directory with 3 entries, all with local_header_offset=0,
        so their spans all overlap the single local entry.

    This is structurally invalid per the ZIP specification and should be
    detected and rejected by _check_overlapping_entries before any
    decompression occurs.
    """
    content = b"\x00" * 200
    compressed = zlib.compress(content)[2:-4]
    crc = zlib.crc32(content) & 0xFFFFFFFF
    comp_size = len(compressed)
    uncomp_size = len(content)

    fname = b"data.bin"
    fname_len = len(fname)

    local_header = (
        struct.pack(
            "<4s2H3H4s2I2H",
            b"PK\x03\x04",
            20,
            0,
            8,
            0,
            0,
            struct.pack("<I", crc),
            comp_size,
            uncomp_size,
            fname_len,
            0,
        )
        + fname
        + compressed
    )

    local_offset = 0

    def make_cd_entry(offset):
        return (
            struct.pack(
                "<4s6H4s2I5H2I",
                b"PK\x01\x02",
                0x031E,
                20,
                0,
                8,
                0,
                0,
                struct.pack("<I", crc),
                comp_size,
                uncomp_size,
                fname_len,
                0,
                0,
                0,
                0,
                0,
                offset,
            )
            + fname
        )

    cd = make_cd_entry(local_offset) * 3
    cd_offset = len(local_header)
    cd_size = len(cd)

    eocd = struct.pack(
        "<4s4H2IH",
        b"PK\x05\x06",
        0,
        0,
        3,
        3,
        cd_size,
        cd_offset,
        0,
    )

    data = local_header + cd + eocd
    p = tmp_path / "fifield_bomb.zip"
    p.write_bytes(data)
    return p

src/safezip/tests/test_cli.py

src/safezip/tests/test_cli.py
"""Tests for the safezip CLI."""

import io
import zipfile
from unittest.mock import patch

import pytest

from safezip.cli._main import main

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"


@pytest.fixture()
def simple_archive(tmp_path):
    """A simple valid ZIP archive."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr("file1.txt", b"content1\n")
        zf.writestr("dir/file2.txt", b"content2\n")
    p = tmp_path / "simple.zip"
    p.write_bytes(buf.getvalue())
    return p


class TestExtractCommand:
    """Tests for the extract command."""

    def test_extract_basic(self, simple_archive, tmp_path, capsys):
        """Basic extraction works."""
        dest = tmp_path / "out"
        with patch("sys.argv", ["safezip", "extract", str(simple_archive), str(dest)]):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 0

        assert (dest / "file1.txt").read_text() == "content1\n"
        assert (dest / "dir" / "file2.txt").read_text() == "content2\n"
        captured = capsys.readouterr()
        assert "Extracted to" in captured.out

    def test_extract_with_max_file_size(self, simple_archive, tmp_path):
        """Extract with --max-file-size flag."""
        dest = tmp_path / "out"
        with patch(
            "sys.argv",
            [
                "safezip",
                "extract",
                str(simple_archive),
                str(dest),
                "--max-file-size",
                "1000",
            ],
        ):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 0

        assert (dest / "file1.txt").exists()

    def test_extract_with_max_files(self, simple_archive, tmp_path):
        """Extract with --max-files flag."""
        dest = tmp_path / "out"
        with patch(
            "sys.argv",
            [
                "safezip",
                "extract",
                str(simple_archive),
                str(dest),
                "--max-files",
                "10",
            ],
        ):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 0

        assert (dest / "file1.txt").exists()

    def test_extract_with_symlink_policy(self, simple_archive, tmp_path):
        """Extract with --symlink-policy flag."""
        dest = tmp_path / "out"
        with patch(
            "sys.argv",
            [
                "safezip",
                "extract",
                str(simple_archive),
                str(dest),
                "--symlink-policy",
                "reject",
            ],
        ):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 0

        assert (dest / "file1.txt").exists()

    def test_extract_with_recursive_flag(self, simple_archive, tmp_path):
        """Extract with --recursive flag."""
        dest = tmp_path / "out"
        with patch(
            "sys.argv",
            [
                "safezip",
                "extract",
                str(simple_archive),
                str(dest),
                "--recursive",
            ],
        ):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 0

        assert (dest / "file1.txt").exists()

    def test_extract_nonexistent_archive(self, tmp_path, capsys):
        """Extract fails with nonexistent archive."""
        dest = tmp_path / "out"
        with patch("sys.argv", ["safezip", "extract", "/nonexistent.zip", str(dest)]):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 1

        captured = capsys.readouterr()
        assert "error:" in captured.err

    def test_extract_creates_destination(self, simple_archive, tmp_path):
        """Extract creates destination directory if it doesn't exist."""
        dest = tmp_path / "nested" / "out"
        with patch("sys.argv", ["safezip", "extract", str(simple_archive), str(dest)]):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 0

        assert dest.exists()
        assert (dest / "file1.txt").exists()

    def test_extract_zipslip_rejected(self, zipslip_archive, tmp_path, capsys):
        """Extract rejects ZipSlip archive."""
        dest = tmp_path / "out"
        with patch("sys.argv", ["safezip", "extract", str(zipslip_archive), str(dest)]):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 1

        captured = capsys.readouterr()
        assert "error:" in captured.err

    def test_extract_zipbomb_rejected(self, high_ratio_archive, tmp_path, capsys):
        """Extract rejects ZIP bomb."""
        dest = tmp_path / "out"
        with patch(
            "sys.argv",
            [
                "safezip",
                "extract",
                str(high_ratio_archive),
                str(dest),
                "--max-per-member-ratio",
                "10",
            ],
        ):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 1

        captured = capsys.readouterr()
        assert "error:" in captured.err

    def test_extract_too_many_files_rejected(
        self, many_files_archive, tmp_path, capsys
    ):
        """Extract rejects archive with too many files."""
        dest = tmp_path / "out"
        with patch(
            "sys.argv",
            [
                "safezip",
                "extract",
                str(many_files_archive),
                str(dest),
                "--max-files",
                "100",
            ],
        ):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 1

        captured = capsys.readouterr()
        assert "error:" in captured.err

    def test_extract_null_byte_filename_rejected(
        self, null_byte_filename_archive, tmp_path, capsys
    ):
        """Extract rejects archive with null byte in filename."""
        dest = tmp_path / "out"
        with patch(
            "sys.argv",
            ["safezip", "extract", str(null_byte_filename_archive), str(dest)],
        ):
            with pytest.raises(SystemExit) as exc_info:
                main()
            # Can exit with 1 due to either MalformedArchiveError (null byte)
            # or BadZipFile (CRC error from crafted archive)
            assert exc_info.value.code == 1

        captured = capsys.readouterr()
        assert "error:" in captured.err


class TestListCommand:
    """Tests for the list command."""

    def test_list_basic(self, simple_archive, capsys):
        """List command shows archive members."""
        with patch("sys.argv", ["safezip", "list", str(simple_archive)]):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 0

        captured = capsys.readouterr()
        assert "file1.txt" in captured.out
        assert "dir/file2.txt" in captured.out

    def test_list_nonexistent_archive(self, capsys):
        """List fails with nonexistent archive."""
        with patch("sys.argv", ["safezip", "list", "/nonexistent.zip"]):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 1

        captured = capsys.readouterr()
        assert "error:" in captured.err


class TestVersionFlag:
    """Tests for --version flag."""

    def test_version_flag(self, capsys):
        """--version flag displays version."""
        with patch("sys.argv", ["safezip", "--version"]):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 0

        captured = capsys.readouterr()
        assert "safezip" in captured.out

src/safezip/tests/test_guard.py

src/safezip/tests/test_guard.py
"""Tests for Phase A: the Guard (pre-extraction validation)."""

import io
import struct
import zipfile
import zlib

import pytest

from safezip import (
    FileCountExceededError,
    FileSizeExceededError,
    MalformedArchiveError,
    SafeZipFile,
)
from safezip._guard import ScanResult, ZipInspector

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"


class TestFileCountLimit:
    """Guard rejects archives with too many entries."""

    def test_many_files_raises(self, many_files_archive, tmp_path):
        with pytest.raises(FileCountExceededError):
            SafeZipFile(many_files_archive)

    def test_many_files_custom_limit_passes(self, many_files_archive, tmp_path):
        # Allow up to 20 000 files - should open without error
        with SafeZipFile(many_files_archive, max_files=20_000):
            pass

    def test_file_count_exactly_at_limit(self, tmp_path):
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w") as zf:
            for i in range(5):
                zf.writestr(f"file_{i}.txt", b"x")
        p = tmp_path / "five.zip"
        p.write_bytes(buf.getvalue())
        with SafeZipFile(p, max_files=5):
            pass

    def test_file_count_one_over_limit(self, tmp_path):
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w") as zf:
            for i in range(6):
                zf.writestr(f"file_{i}.txt", b"x")
        p = tmp_path / "six.zip"
        p.write_bytes(buf.getvalue())
        with pytest.raises(FileCountExceededError):
            SafeZipFile(p, max_files=5)


class TestDeclaredFileSizeLimit:
    """Guard rejects archives whose declared sizes exceed max_file_size."""

    def test_large_declared_size_raises(self, tmp_path):
        # Declare a very large file but store tiny content
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w") as zf:
            info = zipfile.ZipInfo("big.bin")
            zf.writestr(info, b"tiny")

        # Manually patch the ZipInfo to report a huge size - instead,
        # test via the limit: store a 200-byte file and set limit=100
        buf2 = io.BytesIO()
        with zipfile.ZipFile(buf2, "w") as zf2:
            zf2.writestr("data.bin", b"A" * 200)
        p = tmp_path / "large.zip"
        p.write_bytes(buf2.getvalue())

        with pytest.raises(FileSizeExceededError):
            SafeZipFile(p, max_file_size=100)

    def test_size_exactly_at_limit_passes(self, tmp_path):
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w") as zf:
            zf.writestr("data.bin", b"A" * 100)
        p = tmp_path / "exact.zip"
        p.write_bytes(buf.getvalue())
        with SafeZipFile(p, max_file_size=100):
            pass


class TestNullByteInFilename:
    """Null bytes in ZIP filenames are neutralised by Python's zipfile layer.

    Python 3.x's :mod:`zipfile` truncates filenames at the first null byte
    when reading the central directory (e.g. ``safe\x00../../etc/passwd``
    becomes ``safe``).  Our Guard therefore never sees a null byte in
    ``ZipInfo.filename``; the Sandbox's ``resolve_member_path`` carries the
    defence-in-depth check for callers that bypass ``zipfile``.

    This test verifies the safe outcome: no traversal path survives Python's
    null-byte truncation.
    """

    def test_null_byte_filename_truncated_safely(self, null_byte_filename_archive):
        """Python strips null bytes; the traversal portion is never evaluated."""
        # Python truncates 'safe\x00../../etc/passwd' → 'safe'
        with SafeZipFile(null_byte_filename_archive) as zf:
            names = zf.namelist()
        # No null bytes survive Python's filename decoding
        assert not any("\x00" in n for n in names), (
            f"Null byte survived Python's filename decoding: {names!r}"
        )
        # No directory-traversal components should be present
        assert not any(".." in n for n in names), (
            f"Traversal component present after null-byte truncation: {names!r}"
        )


class TestZip64Inconsistency:
    """Guard detects ZIP64 extra fields that disagree with central directory."""

    def test_zip64_inconsistency_raises(self, zip64_inconsistency_archive):
        with pytest.raises(MalformedArchiveError):
            SafeZipFile(zip64_inconsistency_archive)


class TestLegitimateArchive:
    """Guard passes well-formed archives."""

    def test_legitimate_archive_passes(self, legitimate_archive):
        with SafeZipFile(legitimate_archive) as zf:
            assert len(zf.namelist()) == 3

    def test_namelist_accessible(self, legitimate_archive):
        with SafeZipFile(legitimate_archive) as zf:
            names = zf.namelist()
        assert "hello.txt" in names

    def test_infolist_accessible(self, legitimate_archive):
        with SafeZipFile(legitimate_archive) as zf:
            infos = zf.infolist()
        assert any(i.filename == "hello.txt" for i in infos)

    def test_getinfo_accessible(self, legitimate_archive):
        with SafeZipFile(legitimate_archive) as zf:
            info = zf.getinfo("hello.txt")
        assert info.filename == "hello.txt"


class TestOverlappingEntryDetection:
    """Guard rejects archives with overlapping local entries (Fifield-style bombs)."""

    def test_fifield_bomb_raises_malformed(self, fifield_bomb_archive):
        with pytest.raises(MalformedArchiveError, match="overlapping"):
            SafeZipFile(fifield_bomb_archive)

    def test_fifield_bomb_no_extraction_attempted(self, fifield_bomb_archive, tmp_path):
        dest = tmp_path / "out"
        dest.mkdir()
        with pytest.raises(MalformedArchiveError):
            SafeZipFile(fifield_bomb_archive)
        assert list(dest.iterdir()) == []

    def test_legitimate_archive_passes_overlap_check(self, legitimate_archive):
        with SafeZipFile(legitimate_archive) as zf:
            assert len(zf.namelist()) > 0

    def test_overlap_check_does_not_decompress(self, fifield_bomb_archive, tmp_path):
        with pytest.raises(MalformedArchiveError, match="overlapping"):
            SafeZipFile(fifield_bomb_archive, max_per_member_ratio=100_000.0)


def _lfh(filename: bytes, data: bytes, compress_type: int = 0) -> bytes:
    """Build a Local File Header + data."""
    return (
        struct.pack(
            "<LHHHHHLLLHH",
            0x04034B50,
            20,
            0,
            compress_type,
            0,
            0,
            zlib.crc32(data) & 0xFFFFFFFF,
            len(data),
            len(data),
            len(filename),
            0,
        )
        + filename
        + data
    )


def _cdh(
    filename: bytes, data: bytes, local_offset: int, compress_type: int = 0
) -> bytes:
    """Build a Central Directory Header."""
    return (
        struct.pack(
            "<LHHHHHHLLLHHHHHLL",
            0x02014B50,
            20,
            20,
            0,
            compress_type,
            0,
            0,
            zlib.crc32(data) & 0xFFFFFFFF,
            len(data),
            len(data),
            len(filename),
            0,
            0,
            0,
            0,
            0,
            local_offset,
        )
        + filename
    )


def _eocd(
    num_entries: int, cd_size: int, cd_offset: int, comment: bytes = b""
) -> bytes:
    """Build an End of Central Directory record."""
    return (
        struct.pack(
            "<LHHHHLLH",
            0x06054B50,
            0,
            0,
            num_entries,
            num_entries,
            cd_size,
            cd_offset,
            len(comment),
        )
        + comment
    )


def _build_zip(*files: tuple[bytes, bytes]) -> bytes:
    """Build a well-formed zip from (filename, data) pairs."""
    lfhs, cdhs = [], []
    cursor = 0
    for fname, data in files:
        lfh = _lfh(fname, data)
        cdhs.append(_cdh(fname, data, cursor))
        lfhs.append(lfh)
        cursor += len(lfh)

    cd = b"".join(cdhs)
    return b"".join(lfhs) + cd + _eocd(len(files), len(cd), cursor)


def _build_overlap_zip(fname_a: bytes, fname_b: bytes, data: bytes) -> bytes:
    """Build a zip where two CDH entries point to the same LFH offset."""
    lfh = _lfh(fname_a, data)
    cdh1 = _cdh(fname_a, data, 0)
    cdh2 = _cdh(fname_b, data, 0)
    cd = cdh1 + cdh2
    return lfh + cd + _eocd(2, len(cd), len(lfh))


class TestZipInspector:
    """Tests for the ZipInspector overlap detection."""

    def _scan(self, data: bytes) -> ScanResult:
        return ZipInspector(io.BytesIO(data)).scan()

    def test_clean_single_file(self):
        data = _build_zip((b"readme.txt", b"hello"))
        result = self._scan(data)
        assert result.is_bomb is False

    def test_clean_two_files_sequential(self):
        data = _build_zip(
            (b"a.txt", b"first file contents"),
            (b"b.txt", b"second file contents"),
        )
        assert self._scan(data).is_bomb is False

    def test_clean_many_files(self):
        files = [(f"file{i}.txt".encode(), f"content {i}".encode()) for i in range(50)]
        data = _build_zip(*files)
        assert self._scan(data).is_bomb is False

    def test_clean_empty_file_entry(self):
        data = _build_zip((b"empty", b""))
        assert self._scan(data).is_bomb is False

    def test_overlap_two_cdh_same_offset(self):
        data = _build_overlap_zip(b"a", b"b", b"kernel data")
        assert self._scan(data).is_bomb is True

    def test_overlap_detail_is_populated(self):
        data = _build_overlap_zip(b"x", b"y", b"data")
        result = self._scan(data)
        assert result.is_bomb is True
        assert result.overlap_detail is not None

    def test_overlap_at_offset_zero(self):
        """Entries with data_start=0 should still be detected as overlapping."""
        lfh1 = _lfh(b"a", b"data")
        cdh1 = _cdh(b"a", b"data", 0)
        cdh2 = _cdh(b"b", b"data", 0)
        cd = cdh1 + cdh2
        data = lfh1 + cd + _eocd(2, len(cd), len(lfh1))
        result = self._scan(data)
        assert result.is_bomb is True

    def test_invalid_not_a_zip(self):
        result = self._scan(b"this is not a zip file at all")
        assert result.is_bomb is None

    def test_invalid_empty_bytes(self):
        result = self._scan(b"")
        assert result.is_bomb is None

    def test_invalid_truncated_eocd(self):
        result = self._scan(b"PK\x05\x06\x00\x00")
        assert result.is_bomb is None

    def test_invalid_garbage_with_pk_bytes(self):
        result = self._scan(b"\x00" * 100 + b"PK\x05\x06" + b"\xff" * 18)
        assert result.is_bomb is None

    def test_invalid_cdh_signature_mismatch(self):
        raw = bytearray(_build_zip((b"f", b"data")))
        cdh_pos = raw.find(b"PK\x01\x02")
        raw[cdh_pos] = 0xFF
        assert self._scan(bytes(raw)).is_bomb is None

    def test_invalid_lfh_signature_mismatch(self):
        raw = bytearray(_build_zip((b"f", b"data")))
        raw[0] = 0xFF
        assert self._scan(bytes(raw)).is_bomb is None

    def test_gap_does_not_trigger_bomb(self):
        gap = b"\x00" * 16
        lfh1 = _lfh(b"a", b"data1")
        lfh2 = _lfh(b"b", b"data2")
        off1 = 0
        off2 = len(lfh1) + len(gap)
        cdh1 = _cdh(b"a", b"data1", off1)
        cdh2 = _cdh(b"b", b"data2", off2)
        cd = cdh1 + cdh2
        raw = lfh1 + gap + lfh2 + cd + _eocd(2, len(cd), off2 + len(lfh2))
        assert self._scan(raw).is_bomb is False

    def test_leading_bytes_not_a_bomb(self):
        prefix = b"\x00" * 32
        lfh = _lfh(b"x", b"payload")
        cdh = _cdh(b"x", b"payload", len(prefix))
        cd = cdh
        raw = prefix + lfh + cd + _eocd(1, len(cd), len(prefix) + len(lfh))
        assert self._scan(raw).is_bomb is False

    def test_zip_with_comment(self):
        raw = _build_zip((b"x.txt", b"data"))
        eocd_pos = raw.rfind(b"PK\x05\x06")
        comment = b"this is a zip comment"
        head = raw[:eocd_pos]
        eocd = raw[eocd_pos:]
        new_eocd = eocd[:20] + struct.pack("<H", len(comment)) + comment
        assert self._scan(head + new_eocd).is_bomb is False

    def test_invalid_split_across_disks(self):
        lfh = _lfh(b"a", b"data")
        cdh = _cdh(b"a", b"data", 0)
        cd = cdh
        eocd = struct.pack(
            "<LHHHHLLH",
            0x06054B50,
            0,
            1,
            1,
            1,
            len(cd),
            0,
            0,
        )
        raw = lfh + cd + eocd
        assert self._scan(raw).is_bomb is None

    def test_invalid_eocd_entries_mismatch(self):
        lfh = _lfh(b"a", b"data")
        cdh = _cdh(b"a", b"data", 0)
        cd = cdh
        eocd = struct.pack(
            "<LHHHHLLH",
            0x06054B50,
            0,
            0,
            1,
            2,
            len(cd),
            len(lfh),
            0,
        )
        raw = lfh + cd + eocd
        assert self._scan(raw).is_bomb is None

    def test_invalid_eocd_cd_extends_past_eof(self):
        lfh = _lfh(b"a", b"data")
        cdh = _cdh(b"a", b"data", 0)
        cd = cdh
        eocd = struct.pack(
            "<LHHHHLLH",
            0x06054B50,
            0,
            0,
            1,
            1,
            len(cd) + 1000,
            len(lfh),
            0,
        )
        raw = lfh + cd + eocd
        assert self._scan(raw).is_bomb is None

    def test_invalid_cd_extends_past_eof(self):
        lfh = _lfh(b"a", b"data")
        cdh = _cdh(b"a", b"data", 0)
        cd = cdh
        raw = lfh + cd + cd + _eocd(2, len(cd) * 2, len(lfh))
        result = self._scan(raw)
        assert result.is_bomb is not False

    def test_invalid_local_offset_past_eof(self):
        lfh = _lfh(b"a", b"data")
        cdh_bad = _cdh(b"a", b"data", 999999)
        cd = cdh_bad
        raw = lfh + cd + _eocd(1, len(cd), len(lfh))
        assert self._scan(raw).is_bomb is None

    def test_invalid_cdh_variable_field_truncated(self):
        lfh = _lfh(b"a", b"data")
        cdh_base = struct.pack(
            "<LHHHHHHLLLHHHHHLL",
            0x02014B50,
            20,
            20,
            0,
            0,
            0,
            0,
            zlib.crc32(b"data") & 0xFFFFFFFF,
            4,
            4,
            1,
            0,
            0,
            0,
            0,
            0,
            0,
        )
        cdh = cdh_base + b"a"
        raw = lfh + cdh + _eocd(1, len(cdh), len(lfh))
        assert self._scan(raw).is_bomb is not True

    def test_invalid_lfh_signature_invalid(self):
        lfh = _lfh(b"a", b"data")
        raw_lfh = bytearray(lfh)
        raw_lfh[0] = 0xFF
        cdh = _cdh(b"a", b"data", 0)
        cd = cdh
        raw = bytes(raw_lfh) + cd + _eocd(1, len(cd), len(lfh))
        assert self._scan(raw).is_bomb is None

    def test_invalid_cdh_disk_nonzero(self):
        lfh = _lfh(b"a", b"data")
        cdh_base = (
            struct.pack(
                "<LHHHHHHLLLHHHHHLL",
                0x02014B50,
                20,
                20,
                0,
                0,
                0,
                0,
                zlib.crc32(b"data") & 0xFFFFFFFF,
                4,
                4,
                1,
                0,
                0,
                0,
                0,
                0,
                0,
            )
            + b"a"
        )
        cdh = cdh_base
        cd = cdh
        raw = lfh + cd + _eocd(1, len(cd), len(lfh))
        assert self._scan(raw).is_bomb is not True

    def test_cdh_extra_field_truncated(self):
        lfh = _lfh(b"a", b"data")
        cdh_base = struct.pack(
            "<LHHHHHHLLLHHHHHLL",
            0x02014B50,
            20,
            20,
            0,
            0,
            0,
            0,
            zlib.crc32(b"data") & 0xFFFFFFFF,
            4,
            4,
            5,
            0,
            0,
            0,
            0,
            0,
            0,
        )
        extra = struct.pack("<HH", 0x0001, 4)
        cdh = cdh_base + b"filename" + extra
        cd = cdh
        raw = lfh + cd + _eocd(1, len(cd), len(lfh))
        assert self._scan(raw).is_bomb is None

    def test_cdh_zip64_extra_invalid_size(self):
        lfh = _lfh(b"a", b"data")
        cdh_base = struct.pack(
            "<LHHHHHHLLLHHHHHLL",
            0x02014B50,
            20,
            20,
            0,
            0,
            0,
            0,
            zlib.crc32(b"data") & 0xFFFFFFFF,
            0xFFFFFFFF,
            0xFFFFFFFF,
            10,
            0,
            0,
            0,
            0,
            0,
            0xFFFFFFFF,
        )
        extra = struct.pack("<HHQ", 0x0001, 4, 100)
        cdh = cdh_base + b"filename" + extra
        cd = cdh
        raw = lfh + cd + _eocd(1, len(cd), len(lfh))
        assert self._scan(raw).is_bomb is None

src/safezip/tests/test_integration.py

src/safezip/tests/test_integration.py
"""End-to-end integration tests using real crafted malicious archives."""

import io
import stat
import zipfile

import pytest

from safezip import (
    CompressionRatioError,
    FileCountExceededError,
    FileSizeExceededError,
    MalformedArchiveError,
    NestingDepthError,
    SafeZipFile,
    SymlinkPolicy,
    UnsafeZipError,
    safe_extract,
)

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"


class TestZipSlip:
    """ZipSlip path traversal attacks are blocked before any bytes reach disk."""

    def test_relative_traversal_blocked(self, zipslip_archive, tmp_path):
        dest = tmp_path / "out"
        dest.mkdir()
        with pytest.raises(UnsafeZipError), SafeZipFile(zipslip_archive) as zf:
            zf.extractall(dest)
        # Confirm no file escaped to the parent
        evil = tmp_path / "evil.txt"
        assert not evil.exists()

    def test_absolute_path_blocked(self, absolute_path_archive, tmp_path):
        dest = tmp_path / "out"
        dest.mkdir()
        with pytest.raises(UnsafeZipError), SafeZipFile(absolute_path_archive) as zf:
            zf.extractall(dest)

    def test_traversal_leaves_no_files(self, zipslip_archive, tmp_path):
        dest = tmp_path / "out"
        dest.mkdir()
        with pytest.raises(UnsafeZipError), SafeZipFile(zipslip_archive) as zf:
            zf.extractall(dest)
        assert not list(dest.rglob("*"))

    def test_unicode_traversal_blocked(self, unicode_traversal_archive, tmp_path):
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(UnsafeZipError),
            SafeZipFile(unicode_traversal_archive) as zf,
        ):
            zf.extractall(dest)


class TestZipBomb:
    """ZIP bomb attacks are detected and aborted."""

    def test_high_ratio_bomb_blocked(self, high_ratio_archive, tmp_path):
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(CompressionRatioError),
            SafeZipFile(high_ratio_archive, max_per_member_ratio=10.0) as zf,
        ):
            zf.extractall(dest)

    def test_high_ratio_no_partial_files(self, high_ratio_archive, tmp_path):
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(CompressionRatioError),
            SafeZipFile(high_ratio_archive, max_per_member_ratio=10.0) as zf,
        ):
            zf.extractall(dest)
        remaining = [f for f in dest.rglob("*") if not f.is_dir()]
        assert not remaining

    def test_file_size_lie_blocked(self, tmp_path):
        """Archive that lies about size in header is caught by the streamer."""
        # Store 2000 bytes but set max_file_size=500 in Guard
        # The Guard will reject the archive if declare size > max_file_size
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_STORED) as zf:
            zf.writestr("data.bin", b"X" * 2000)
        p = tmp_path / "lie.zip"
        p.write_bytes(buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()

        with (
            pytest.raises(FileSizeExceededError),
            SafeZipFile(p, max_file_size=500) as zf,
        ):
            zf.extractall(dest)

    def test_many_files_bomb_blocked(self, many_files_archive, tmp_path):
        """Archive with too many files is blocked at the Guard phase."""
        with pytest.raises(FileCountExceededError):
            SafeZipFile(many_files_archive)


class TestExplicitPathRequirement:
    """extractall must receive an explicit path; CWD is never used silently."""

    def test_extractall_requires_path(self, legitimate_archive, tmp_path):
        """extractall with a valid path works; calling without is a TypeError."""
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(legitimate_archive) as zf:
            zf.extractall(dest)  # must not raise
        assert (dest / "hello.txt").exists()

    def test_extractall_wrong_type_raises(self, legitimate_archive):
        """Passing None as path raises TypeError."""
        with (
            SafeZipFile(legitimate_archive) as zf,
            pytest.raises((TypeError, AttributeError)),
        ):
            zf.extractall(None)

    def test_extract_with_none_path_raises(self, legitimate_archive):
        """Passing None as path to extract() raises TypeError."""
        with SafeZipFile(legitimate_archive) as zf, pytest.raises(TypeError):
            zf.extract("hello.txt", None)

    def test_extractall_with_members_list(self, legitimate_archive, tmp_path):
        """extractall with a members list extracts only those members."""
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(legitimate_archive) as zf:
            zf.extractall(dest, members=["hello.txt"])
        # Only hello.txt should exist
        assert (dest / "hello.txt").exists()
        contents = list(dest.rglob("*"))
        assert len(contents) == 1


class TestMalformedArchive:
    """Structurally invalid archives raise MalformedArchiveError."""

    def test_not_a_zip_raises_malformed(self, tmp_path):
        """A file that is not a ZIP at all raises MalformedArchiveError."""
        bad = tmp_path / "bad.zip"
        bad.write_bytes(b"this is not a zip file")
        with pytest.raises(MalformedArchiveError):
            SafeZipFile(bad)

    def test_zip64_inconsistency_raises(self, zip64_inconsistency_archive):
        """ZIP64 extra field that disagrees with central directory is rejected."""
        with pytest.raises(MalformedArchiveError):
            SafeZipFile(zip64_inconsistency_archive)


class TestFifieldBomb:
    """End-to-end: Fifield-style zip bomb is blocked at Guard phase."""

    def test_fifield_bomb_blocked_end_to_end(self, fifield_bomb_archive, tmp_path):
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(MalformedArchiveError),
            SafeZipFile(fifield_bomb_archive) as zf,
        ):
            zf.extractall(dest)
        remaining = [f for f in dest.rglob("*") if not f.is_dir()]
        assert not remaining

    def test_security_event_fires_on_fifield_bomb(self, fifield_bomb_archive, tmp_path):
        """on_security_event callback receives 'malformed_archive' for Fifield bomb."""
        events = []
        dest = tmp_path / "out"
        dest.mkdir()
        with pytest.raises(MalformedArchiveError):
            SafeZipFile(fifield_bomb_archive, on_security_event=events.append)
        assert any(e.event_type == "malformed_archive" for e in events)

    def test_fifield_bomb_as_bytesio_rejected(self, fifield_bomb_archive):
        """Fifield bomb as BytesIO is rejected."""
        data = fifield_bomb_archive.read_bytes()
        bio = io.BytesIO(data)
        with pytest.raises(MalformedArchiveError):
            SafeZipFile(bio)

    def test_legitimate_archive_as_bytesio_passes(self, legitimate_archive):
        """Legitimate archive as BytesIO passes."""
        data = legitimate_archive.read_bytes()
        bio = io.BytesIO(data)
        with SafeZipFile(bio) as zf:
            assert len(zf.namelist()) > 0

    def test_fifield_bomb_bytesio_event_fires(self, fifield_bomb_archive):
        """on_security_event fires for in-memory Fifield bomb."""
        events = []
        data = fifield_bomb_archive.read_bytes()
        bio = io.BytesIO(data)
        with pytest.raises(MalformedArchiveError):
            SafeZipFile(bio, on_security_event=events.append)
        assert any(e.event_type == "malformed_archive" for e in events)


class TestSecurityEventCoverage:
    """on_security_event callback fires for all security violation types."""

    def test_callback_fires_on_path_traversal(self, zipslip_archive, tmp_path):
        events = []
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(UnsafeZipError),
            SafeZipFile(zipslip_archive, on_security_event=events.append) as zf,
        ):
            zf.extractall(dest)
        assert any(e.event_type == "zip_slip_detected" for e in events)

    def test_callback_fires_on_file_size_exceeded(self, tmp_path):
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_STORED) as zf:
            zf.writestr("data.bin", b"A" * 1000)
        p = tmp_path / "large.zip"
        p.write_bytes(buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()
        events = []
        with (
            pytest.raises(FileSizeExceededError),
            SafeZipFile(p, max_file_size=500, on_security_event=events.append) as zf,
        ):
            zf.extractall(dest)
        # The Guard may fire "declared_size_exceeded" (declared header size >
        # limit) or the Streamer may fire "file_size_exceeded" (actual
        # decompressed bytes > limit).  Both indicate a file-size violation.
        size_events = {"file_size_exceeded", "declared_size_exceeded"}
        assert any(e.event_type in size_events for e in events)

    def test_callback_fires_on_ratio_exceeded(self, high_ratio_archive, tmp_path):
        events = []
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(CompressionRatioError),
            SafeZipFile(
                high_ratio_archive,
                max_per_member_ratio=10.0,
                on_security_event=events.append,
            ) as zf,
        ):
            zf.extractall(dest)
        assert any(e.event_type == "compression_ratio_exceeded" for e in events)

    def test_callback_fires_on_file_count_exceeded(self, many_files_archive, tmp_path):
        events = []
        with pytest.raises(FileCountExceededError):
            SafeZipFile(many_files_archive, on_security_event=events.append)
        assert any(e.event_type == "file_count_exceeded" for e in events)

    def test_callback_fires_on_symlink_rejected(self, symlink_archive, tmp_path):
        events = []
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(UnsafeZipError),
            SafeZipFile(symlink_archive, on_security_event=events.append) as zf,
        ):
            zf.extractall(dest)
        assert any(e.event_type == "symlink_rejected" for e in events)


class TestLegitimateExtraction:
    """Well-formed archives extract correctly and completely."""

    def test_all_files_extracted(self, legitimate_archive, tmp_path):
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(legitimate_archive) as zf:
            zf.extractall(dest)
        assert (dest / "hello.txt").read_bytes() == b"Hello, world!\n"
        assert (dest / "subdir" / "data.txt").read_bytes() == b"Some data\n"
        assert (dest / "subdir" / "nested" / "deep.txt").read_bytes() == b"Deep file\n"

    def test_safe_extract_convenience(self, legitimate_archive, tmp_path):
        dest = tmp_path / "out"
        dest.mkdir()
        safe_extract(legitimate_archive, dest)
        assert (dest / "hello.txt").exists()

    def test_context_manager_closes_properly(self, legitimate_archive, tmp_path):
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(legitimate_archive) as zf:
            zf.extractall(dest)
        # After context exit, the underlying ZipFile's fp should be None (closed).
        # zipfile.ZipFile.close() sets self.fp = None.
        assert zf._zf.fp is None


class TestSecurityEventCallback:
    """on_security_event callback is called on security events."""

    def test_callback_called_on_zip_slip(self, zipslip_archive, tmp_path):
        events = []

        def capture(event):
            events.append(event)

        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(UnsafeZipError),
            SafeZipFile(zipslip_archive, on_security_event=capture) as zf,
        ):
            zf.extractall(dest)
        # Note: callback is called for monitored events during extraction;
        # path traversal may be detected in sandbox before callback fires.
        # The test verifies no crash occurs.

    def test_callback_exception_does_not_swallow_security_error(
        self, zipslip_archive, tmp_path
    ):
        def broken_callback(event):
            raise RuntimeError("callback broken")

        dest = tmp_path / "out"
        dest.mkdir()
        # The UnsafeZipError must still propagate even if callback raises
        with (
            pytest.raises(UnsafeZipError),
            SafeZipFile(zipslip_archive, on_security_event=broken_callback) as zf,
        ):
            zf.extractall(dest)


class TestNestingDepthLimit:
    """SafeZipFile refuses instantiation when _nesting_depth exceeds the limit."""

    def test_nesting_depth_exceeded_raises(self, legitimate_archive):
        """_nesting_depth > max_nesting_depth raises NestingDepthError."""
        with pytest.raises(NestingDepthError):
            SafeZipFile(legitimate_archive, max_nesting_depth=3, _nesting_depth=4)

    def test_nesting_depth_at_limit_passes(self, legitimate_archive):
        """_nesting_depth == max_nesting_depth is allowed."""
        with SafeZipFile(legitimate_archive, max_nesting_depth=3, _nesting_depth=3):
            pass

    def test_nesting_depth_zero_always_passes(self, legitimate_archive):
        """Default _nesting_depth=0 never raises."""
        with SafeZipFile(legitimate_archive):
            pass

    def test_nesting_depth_env_var_respected(self, legitimate_archive, monkeypatch):
        """SAFEZIP_MAX_NESTING_DEPTH env var is honoured when no constructor arg
        is given."""
        monkeypatch.setenv("SAFEZIP_MAX_NESTING_DEPTH", "1")
        # depth=2 > env-var limit of 1 → should raise
        with pytest.raises(NestingDepthError):
            SafeZipFile(legitimate_archive, _nesting_depth=2)

    def test_nesting_depth_exceeded_event(self, legitimate_archive):
        """nesting_depth_exceeded event is emitted when depth exceeds limit."""
        events = []
        with pytest.raises(NestingDepthError):
            SafeZipFile(
                legitimate_archive,
                max_nesting_depth=1,
                _nesting_depth=2,
                on_security_event=events.append,
            )
        assert any(e.event_type == "nesting_depth_exceeded" for e in events)


class TestNestedArchiveGuard:
    """Nested archive members are extracted as raw files, not recursed into."""

    def test_inner_zip_extracted_as_raw_file(self, tmp_path):
        inner_buf = io.BytesIO()
        with zipfile.ZipFile(inner_buf, "w") as inner_zf:
            inner_zf.writestr("secret.txt", b"inner content")
        inner_bytes = inner_buf.getvalue()

        outer_buf = io.BytesIO()
        with zipfile.ZipFile(outer_buf, "w") as outer_zf:
            outer_zf.writestr("readme.txt", b"outer content")
            outer_zf.writestr("nested.zip", inner_bytes)
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(outer_buf.getvalue())

        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(outer_p) as zf:
            zf.extractall(dest)

        # The nested.zip should be present as a raw file, not recursed
        assert (dest / "nested.zip").exists()
        assert (dest / "nested.zip").read_bytes() == inner_bytes
        # The inner secret.txt should NOT be extracted
        assert not (dest / "secret.txt").exists()


class TestRecursiveNestingDepthIntegration:
    """Real zip-within-zip recursion is stopped at max_nesting_depth.

    These tests use an actual nested archive and a realistic recursive
    extraction helper to verify that the guard fires in practice, not just
    when the counter is poked directly.
    """

    @staticmethod
    def _build_nested_zip(levels: int) -> bytes:
        """Return bytes of a zip nested *levels* deep.

        The innermost zip contains ``secret.txt``.  Every outer layer wraps
        the previous one as ``inner.zip`` plus a ``readme.txt`` so there is
        always a regular file at each level too.
        """
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w") as zf:
            zf.writestr("secret.txt", b"innermost content")
        data = buf.getvalue()

        for _ in range(levels - 1):
            buf = io.BytesIO()
            with zipfile.ZipFile(buf, "w") as zf:
                zf.writestr("readme.txt", b"outer level content")
                zf.writestr("inner.zip", data)
            data = buf.getvalue()

        return data

    @staticmethod
    def _recursive_extract(zip_path, dest, *, depth=0, max_nesting_depth=2):
        """Minimal recursive extractor that passes *depth* to SafeZipFile.

        This is the pattern a caller must follow to get nesting protection.
        SafeZipFile raises NestingDepthError before opening the archive when
        *depth* exceeds *max_nesting_depth*.
        """
        with SafeZipFile(
            zip_path,
            max_nesting_depth=max_nesting_depth,
            _nesting_depth=depth,
        ) as zf:
            zf.extractall(dest)
            for name in zf.namelist():
                if name.endswith(".zip"):
                    nested_src = dest / name
                    nested_dest = dest / (name[:-4] + "_contents")
                    nested_dest.mkdir()
                    TestRecursiveNestingDepthIntegration._recursive_extract(
                        nested_src,
                        nested_dest,
                        depth=depth + 1,
                        max_nesting_depth=max_nesting_depth,
                    )

    def test_recursive_extraction_stopped_at_depth_limit(self, tmp_path):
        """Recursion into a 3-level archive raises NestingDepthError at level 3.

        Archive layout::

            outer.zip          (depth 0 — opened fine)
              readme.txt
              inner.zip        (depth 1 — opened fine)
                readme.txt
                inner.zip      (depth 2 — raises, exceeds max_nesting_depth=1)
                  secret.txt
        """
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(self._build_nested_zip(3))
        dest = tmp_path / "out"
        dest.mkdir()

        with pytest.raises(NestingDepthError):
            self._recursive_extract(outer_p, dest, max_nesting_depth=1)

    def test_recursive_extraction_succeeds_within_limit(self, tmp_path):
        """Recursion within the depth limit extracts every level successfully.

        With max_nesting_depth=2 and a 3-level archive (depths 0, 1, 2),
        all levels are within the limit and secret.txt reaches disk.
        """
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(self._build_nested_zip(3))
        dest = tmp_path / "out"
        dest.mkdir()

        self._recursive_extract(outer_p, dest, max_nesting_depth=2)

        innermost = dest / "inner_contents" / "inner_contents" / "secret.txt"
        assert innermost.read_bytes() == b"innermost content"


class TestBuiltinRecursiveExtraction:
    """SafeZipFile with recursive=True auto-descends into nested zip members."""

    @staticmethod
    def _build_zip(members: list[tuple[str, bytes]]) -> bytes:
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
            for name, content in members:
                zf.writestr(name, content)
        return buf.getvalue()

    def test_recursive_false_is_default_raw_blob(self, tmp_path):
        """recursive=False (default) leaves nested zips as raw files."""
        inner = self._build_zip([("secret.txt", b"inner")])
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(self._build_zip([("inner.zip", inner)]))
        dest = tmp_path / "out"
        dest.mkdir()

        with SafeZipFile(outer_p) as zf:
            zf.extractall(dest)

        assert (dest / "inner.zip").exists()
        assert not (dest / "inner" / "secret.txt").exists()

    def test_recursive_extracts_nested_content(self, tmp_path):
        """recursive=True descends into inner.zip and extracts its content."""
        inner = self._build_zip([("secret.txt", b"inner content")])
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(
            self._build_zip([("readme.txt", b"outer"), ("inner.zip", inner)])
        )
        dest = tmp_path / "out"
        dest.mkdir()

        with SafeZipFile(outer_p, recursive=True) as zf:
            zf.extractall(dest)

        assert (dest / "readme.txt").read_bytes() == b"outer"
        assert (dest / "inner" / "secret.txt").read_bytes() == b"inner content"
        assert not (dest / "inner.zip").exists()

    def test_recursive_depth_limit_raises(self, tmp_path):
        """recursive=True stops at max_nesting_depth and raises NestingDepthError."""
        # 3-level deep: outer -> middle.zip -> inner.zip -> secret.txt
        innermost = self._build_zip([("secret.txt", b"deep")])
        middle = self._build_zip([("inner.zip", innermost)])
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(self._build_zip([("middle.zip", middle)]))
        dest = tmp_path / "out"
        dest.mkdir()

        # max_nesting_depth=1 allows depth 0 and 1; opening depth-2 raises
        with (
            pytest.raises(NestingDepthError),
            SafeZipFile(outer_p, recursive=True, max_nesting_depth=1) as zf,
        ):
            zf.extractall(dest)

    def test_recursive_file_size_enforced_in_nested_zip(self, tmp_path):
        """File size limit applies inside nested zips when recursive=True."""
        inner = self._build_zip([("big.txt", b"A" * 2000)])
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(self._build_zip([("inner.zip", inner)]))
        dest = tmp_path / "out"
        dest.mkdir()

        with (
            pytest.raises(FileSizeExceededError),
            SafeZipFile(outer_p, recursive=True, max_file_size=500) as zf,
        ):
            zf.extractall(dest)

    def test_recursive_traversal_in_nested_zip_blocked(self, tmp_path):
        """Path traversal inside a nested zip is blocked when recursive=True."""
        inner = self._build_zip([("../../evil.txt", b"escaped")])
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(self._build_zip([("inner.zip", inner)]))
        dest = tmp_path / "out"
        dest.mkdir()

        with (
            pytest.raises(UnsafeZipError),
            SafeZipFile(outer_p, recursive=True) as zf,
        ):
            zf.extractall(dest)

        assert not (tmp_path / "evil.txt").exists()

    def test_recursive_mixed_members(self, tmp_path):
        """Regular files and nested zips are both handled correctly."""
        inner = self._build_zip([("data.txt", b"nested data")])
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(
            self._build_zip(
                [
                    ("top.txt", b"top level"),
                    ("pkg.zip", inner),
                ]
            )
        )
        dest = tmp_path / "out"
        dest.mkdir()

        with SafeZipFile(outer_p, recursive=True) as zf:
            zf.extractall(dest)

        assert (dest / "top.txt").read_bytes() == b"top level"
        assert (dest / "pkg" / "data.txt").read_bytes() == b"nested data"
        assert not (dest / "pkg.zip").exists()

    def test_recursive_content_detection_bypasses_extension(self, tmp_path):
        """A nested ZIP named with a non-ZIP extension is still recursed into
        when recursive=True (content-based detection)."""
        inner = self._build_zip([("secret.txt", b"inner content")])
        outer_buf = io.BytesIO()
        with zipfile.ZipFile(outer_buf, "w") as zf:
            zf.writestr("data.csv", inner)
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(outer_buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()

        with SafeZipFile(outer_p, recursive=True) as zf:
            zf.extractall(dest)

        # .csv is not a known archive extension, so directory name stays as-is
        assert (dest / "data.csv" / "secret.txt").read_bytes() == b"inner content"

    def test_recursive_non_zip_with_zip_extension_not_recursed(self, tmp_path):
        """A file named .zip that is not actually a ZIP is extracted as a plain file."""
        outer_buf = io.BytesIO()
        with zipfile.ZipFile(outer_buf, "w") as zf:
            zf.writestr("fake.zip", b"this is not a zip file at all")
        outer_p = tmp_path / "outer.zip"
        outer_p.write_bytes(outer_buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()

        with SafeZipFile(outer_p, recursive=True) as zf:
            zf.extractall(dest)

        assert (dest / "fake.zip").read_bytes() == b"this is not a zip file at all"


class TestPermissionSanitisation:
    """Dangerous Unix permission bits are stripped from extracted files."""

    def test_setuid_stripped_by_default(self, setuid_archive, tmp_path):
        """setuid bit is stripped by default."""
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(setuid_archive) as zf:
            zf.extractall(dest)
        mode = (dest / "suid_binary").stat().st_mode
        assert not (mode & stat.S_ISUID), "setuid bit must be stripped by default"

    def test_normal_permissions_unaffected(self, legitimate_archive, tmp_path):
        """Stripping special bits does not affect normal file access."""
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(legitimate_archive) as zf:
            zf.extractall(dest)
        for f in dest.rglob("*"):
            if f.is_file():
                assert f.stat().st_mode & stat.S_IRUSR


class TestSymlinkPolicy:
    """SafeZipFile enforces the configured SymlinkPolicy for ZIP symlink entries.

    A ZIP symlink entry is identified by the upper 16 bits of
    ``ZipInfo.external_attr`` carrying a Unix ``S_IFLNK`` file mode.
    The entry's data bytes contain the link target path.
    """

    def test_reject_is_default(self, symlink_archive, tmp_path):
        """Default policy (REJECT) raises UnsafeZipError on any symlink entry."""
        dest = tmp_path / "out"
        dest.mkdir()
        with pytest.raises(UnsafeZipError), SafeZipFile(symlink_archive) as zf:
            zf.extractall(dest)

    def test_reject_explicit_raises(self, symlink_archive, tmp_path):
        """Explicit REJECT policy raises UnsafeZipError on a symlink entry."""
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(UnsafeZipError),
            SafeZipFile(symlink_archive, symlink_policy=SymlinkPolicy.REJECT) as zf,
        ):
            zf.extractall(dest)

    def test_ignore_skips_symlink_entry(self, symlink_archive, tmp_path):
        """IGNORE policy silently skips symlink entries; no file is created."""
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(symlink_archive, symlink_policy=SymlinkPolicy.IGNORE) as zf:
            zf.extractall(dest)
        # The symlink entry must not appear on disk
        assert not (dest / "link.txt").exists()

    def test_ignore_preserves_regular_files(self, symlink_archive, tmp_path):
        """IGNORE policy skips symlinks but still extracts regular entries."""
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(symlink_archive, symlink_policy=SymlinkPolicy.IGNORE) as zf:
            zf.extractall(dest)
        assert (dest / "readme.txt").read_bytes() == b"safe content\n"

    def test_resolve_internal_extracts_target_as_file(self, symlink_archive, tmp_path):
        """RESOLVE_INTERNAL extracts the symlink target path as a regular file.

        Because the ZIP entry's content is the target string (not an OS
        symlink), the extracted file is a plain file containing that string.
        The post-extraction symlink check only fires when the OS creates an
        actual symlink (not applicable here), so extraction succeeds.
        """
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(
            symlink_archive, symlink_policy=SymlinkPolicy.RESOLVE_INTERNAL
        ) as zf:
            zf.extractall(dest)
        # The entry is written as a regular file containing the target path
        extracted = dest / "link.txt"
        assert extracted.exists()
        assert not extracted.is_symlink()
        assert extracted.read_text() == "../escape.txt"


class TestCompressSizeZero:
    """compress_size == 0 only occurs legitimately for empty members.

    Python's zipfile uses the central directory compress_size to control how
    many bytes it reads during decompression.  A non-empty member with
    compress_size=0 in the CD causes zipfile to read 0 bytes and then fail
    the CRC check (BadZipFile), so it never reaches the streamer's ratio logic.

    The only reachable case is a genuinely empty member, for which skipping
    the ratio check is correct — there is nothing to decompress.
    """

    def test_empty_member_skips_ratio_check_correctly(
        self, data_descriptor_empty_archive, tmp_path
    ):
        """Empty member (compress_size=0) extracts successfully even with a
        tight ratio limit.  Skipping the ratio check is correct behaviour."""
        dest = tmp_path / "out"
        dest.mkdir()

        with zipfile.ZipFile(data_descriptor_empty_archive) as zf:
            info = zf.infolist()[0]
            assert info.compress_size == 0
            assert info.file_size == 0

        with SafeZipFile(data_descriptor_empty_archive, max_per_member_ratio=1.0) as zf:
            zf.extractall(dest)

        assert (dest / "empty.txt").read_bytes() == b""

    def test_nonempty_with_zero_cd_compress_size_rejected_by_zipfile(
        self, data_descriptor_invalid_bomb_archive, tmp_path
    ):
        """A crafted archive with compress_size=0 in the CD but non-empty data
        is rejected by Python's zipfile with BadZipFile before the streamer's
        ratio logic is even reached.  The gap is not exploitable through
        Python's zipfile layer."""
        dest = tmp_path / "out"
        dest.mkdir()

        # Verify the CD does report compress_size=0 despite non-empty content.
        with zipfile.ZipFile(data_descriptor_invalid_bomb_archive) as zf:
            info = zf.infolist()[0]
            assert info.compress_size == 0
            assert info.file_size > 0

        # SafeZipFile opens fine (Guard sees compress_size=0, file_size=2000,
        # both within limits).  BadZipFile is raised by zipfile's CRC check
        # during streaming — before safezip's ratio logic is ever reached.
        with (
            pytest.raises(zipfile.BadZipFile),
            SafeZipFile(data_descriptor_invalid_bomb_archive) as zf,
        ):
            zf.extractall(dest)

        # No partial files left.
        remaining = [f for f in dest.rglob("*") if not f.is_dir()]
        assert not remaining


class TestEnvVarHandling:
    """Environment variable parsing edge cases."""

    def test_invalid_symlink_policy_env(self, legitimate_archive, monkeypatch, caplog):
        """Invalid symlink policy is logged and defaults to REJECT."""
        monkeypatch.setenv("SAFEZIP_SYMLINK_POLICY", "invalid_policy")
        with SafeZipFile(legitimate_archive, symlink_policy=None) as zf:
            assert zf._symlink_policy == SymlinkPolicy.REJECT
        assert "Ignoring unrecognised" in caplog.text

    def test_env_var_read_at_import_time(self, monkeypatch):
        """Changing env vars after import does not affect cached defaults.

        The module-level singletons (_DEFAULT_*) are evaluated once at import time.
        Late env changes do not alter limits on new SafeZipFile instances.
        """
        import safezip._core as _core

        original_default = _core._DEFAULT_MAX_FILES
        monkeypatch.setenv("SAFEZIP_MAX_FILES", "99")
        assert original_default == _core._DEFAULT_MAX_FILES

src/safezip/tests/test_sandbox.py

src/safezip/tests/test_sandbox.py
"""Tests for Phase B: path resolution and symlink policy (the Sandbox)."""

import pytest

from safezip import UnsafeZipError
from safezip._sandbox import resolve_member_path

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"


class TestPathTraversal:
    """resolve_member_path rejects all forms of path traversal."""

    def test_dotdot_relative(self, tmp_path):
        with pytest.raises(UnsafeZipError, match="traversal"):
            resolve_member_path(tmp_path, "../../evil.txt")

    def test_dotdot_in_middle(self, tmp_path):
        with pytest.raises(UnsafeZipError, match="traversal"):
            resolve_member_path(tmp_path, "subdir/../../../evil.txt")

    def test_dotdot_windows_style(self, tmp_path):
        with pytest.raises(UnsafeZipError, match="traversal"):
            resolve_member_path(tmp_path, "subdir\\..\\..\\evil.txt")

    def test_absolute_unix_path(self, tmp_path):
        with pytest.raises(UnsafeZipError):
            resolve_member_path(tmp_path, "/etc/passwd")

    def test_absolute_windows_path(self, tmp_path):
        with pytest.raises(UnsafeZipError):
            resolve_member_path(tmp_path, "C:\\Windows\\System32\\cmd.exe")

    def test_unc_path(self, tmp_path):
        with pytest.raises(UnsafeZipError):
            resolve_member_path(tmp_path, "//server/share/evil.txt")


class TestNullByte:
    """resolve_member_path rejects filenames with null bytes."""

    def test_null_byte_rejected(self, tmp_path):
        with pytest.raises(UnsafeZipError):
            resolve_member_path(tmp_path, "safe\x00../../etc/passwd")

    def test_null_byte_at_start(self, tmp_path):
        with pytest.raises(UnsafeZipError):
            resolve_member_path(tmp_path, "\x00evil.txt")


class TestLegitimateFilenames:
    """resolve_member_path accepts well-formed filenames."""

    def test_simple_filename(self, tmp_path):
        result = resolve_member_path(tmp_path, "hello.txt")
        assert result == tmp_path / "hello.txt"

    def test_nested_filename(self, tmp_path):
        result = resolve_member_path(tmp_path, "subdir/data.txt")
        assert result == tmp_path / "subdir" / "data.txt"

    def test_deep_nested(self, tmp_path):
        result = resolve_member_path(tmp_path, "a/b/c/d/e.txt")
        assert result == tmp_path / "a" / "b" / "c" / "d" / "e.txt"

    def test_windows_separator_legitimate(self, tmp_path):
        """Windows-style separators are normalised to forward slashes."""
        result = resolve_member_path(tmp_path, "subdir\\file.txt")
        assert result == tmp_path / "subdir" / "file.txt"

    def test_result_is_inside_base(self, tmp_path):
        result = resolve_member_path(tmp_path, "subdir/file.txt")
        assert str(result).startswith(str(tmp_path))

    def test_unicode_filename(self, tmp_path):
        result = resolve_member_path(tmp_path, "données/résumé.txt")
        assert result.name == "résumé.txt"

    def test_leading_slash_rejected(self, tmp_path):
        """A leading slash is treated as an absolute path and rejected."""
        with pytest.raises(UnsafeZipError, match="Absolute path"):
            resolve_member_path(tmp_path, "/file.txt")

    def test_dot_components_stripped(self, tmp_path):
        result = resolve_member_path(tmp_path, "./subdir/./file.txt")
        assert result == tmp_path / "subdir" / "file.txt"

    def test_empty_parts_stripped(self, tmp_path):
        result = resolve_member_path(tmp_path, "subdir//file.txt")
        assert result == tmp_path / "subdir" / "file.txt"


class TestPathLengthLimit:
    """resolve_member_path rejects excessively long paths."""

    def test_very_long_filename_rejected(self, tmp_path):
        long_name = "a" * 5000 + ".txt"
        with pytest.raises(UnsafeZipError, match="too long"):
            resolve_member_path(tmp_path, long_name)

src/safezip/tests/test_streamer.py

src/safezip/tests/test_streamer.py
"""Tests for Phase C: streaming extraction (the Streamer)."""

import io
import zipfile

import pytest

from safezip import (
    CompressionRatioError,
    FileSizeExceededError,
    MalformedArchiveError,
    SafeZipFile,
)

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"


class TestFileSizeLimit:
    """Streamer enforces per-member file size limits at stream time."""

    def test_size_exceeded_raises(self, tmp_path):
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_STORED) as zf:
            zf.writestr("data.bin", b"A" * 1000)
        p = tmp_path / "large.zip"
        p.write_bytes(buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()

        with (
            pytest.raises(FileSizeExceededError),
            SafeZipFile(p, max_file_size=500) as zf,
        ):
            zf.extractall(dest)

    def test_no_partial_file_after_size_failure(self, tmp_path):
        """Atomic write: no partial file must remain after FileSizeExceededError."""
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_STORED) as zf:
            zf.writestr("data.bin", b"A" * 1000)
        p = tmp_path / "large.zip"
        p.write_bytes(buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()

        with (
            pytest.raises(FileSizeExceededError),
            SafeZipFile(p, max_file_size=500) as zf,
        ):
            zf.extractall(dest)

        # No partial files or temp files should remain
        remaining = list(dest.rglob("*"))
        assert not remaining, f"Partial files found: {remaining}"

    def test_size_at_limit_passes(self, tmp_path):
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_STORED) as zf:
            zf.writestr("data.bin", b"A" * 100)
        p = tmp_path / "ok.zip"
        p.write_bytes(buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()

        with SafeZipFile(p, max_file_size=100) as zf:
            zf.extractall(dest)
        assert (dest / "data.bin").read_bytes() == b"A" * 100


class TestTotalSizeLimit:
    """Streamer enforces cumulative total size across all members."""

    def test_total_size_exceeded(self, tmp_path):
        """Total size limit enforced during Guard phase when limits are threaded."""
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_STORED) as zf:
            for i in range(5):
                zf.writestr(f"file_{i}.bin", b"A" * 300)
        p = tmp_path / "multi.zip"
        p.write_bytes(buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()

        with pytest.raises(MalformedArchiveError):
            SafeZipFile(p, max_file_size=1000, max_total_size=1000)


class TestCompressionRatioLimit:
    """Streamer enforces per-member and total compression ratio limits."""

    def test_per_member_ratio_exceeded(self, high_ratio_archive, tmp_path):
        """High-ratio archive (zeros) triggers per-member ratio check."""
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(CompressionRatioError),
            SafeZipFile(high_ratio_archive, max_per_member_ratio=10.0) as zf,
        ):
            zf.extractall(dest)

    def test_no_partial_file_after_ratio_failure(self, high_ratio_archive, tmp_path):
        """Atomic write: no partial file must remain after CompressionRatioError."""
        dest = tmp_path / "out"
        dest.mkdir()

        with (
            pytest.raises(CompressionRatioError),
            SafeZipFile(high_ratio_archive, max_per_member_ratio=10.0) as zf,
        ):
            zf.extractall(dest)

        remaining = [f for f in dest.rglob("*") if not f.is_dir()]
        assert not remaining, f"Partial files found: {remaining}"

    def test_high_ratio_passes_with_generous_limit(self, high_ratio_archive, tmp_path):
        """Same archive passes if we allow a high ratio (both per-member and total)."""
        dest = tmp_path / "out"
        dest.mkdir()
        with SafeZipFile(
            high_ratio_archive,
            max_per_member_ratio=2000.0,
            max_total_ratio=2000.0,
            max_file_size=5 * 1024 * 1024,
        ) as zf:
            zf.extractall(dest)
        assert (dest / "zeros.bin").exists()


class TestAtomicWrite:
    """Extraction destinations are created atomically."""

    def test_successful_extraction_creates_file(self, tmp_path):
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w") as zf:
            zf.writestr("output.txt", b"hello safezip")
        p = tmp_path / "ok.zip"
        p.write_bytes(buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()

        with SafeZipFile(p) as zf:
            zf.extractall(dest)
        assert (dest / "output.txt").read_bytes() == b"hello safezip"

    def test_extract_single_member(self, tmp_path):
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w") as zf:
            zf.writestr("a.txt", b"AAA")
            zf.writestr("b.txt", b"BBB")
        p = tmp_path / "two.zip"
        p.write_bytes(buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()

        with SafeZipFile(p) as zf:
            zf.extract("a.txt", dest)
        assert (dest / "a.txt").read_bytes() == b"AAA"
        assert not (dest / "b.txt").exists()

    def test_no_temp_files_after_success(self, tmp_path):
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w") as zf:
            zf.writestr("hello.txt", b"world")
        p = tmp_path / "ok.zip"
        p.write_bytes(buf.getvalue())
        dest = tmp_path / "out"
        dest.mkdir()

        with SafeZipFile(p) as zf:
            zf.extractall(dest)

        all_files = list(dest.rglob("*"))
        temp_files = [f for f in all_files if ".safezip_tmp_" in f.name]
        assert not temp_files