From 8e6d5fb38bbc8c1b0a97b46f5dba9ef8d435fa54 Mon Sep 17 00:00:00 2001 From: Rip&Tear <84775494+theCyberTech@users.noreply.github.com> Date: Sat, 20 Jun 2026 01:38:12 +0800 Subject: [PATCH 1/2] fix: stop FileCompressorTool from archiving out-of-tree symlink targets MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit input_path/output_path were already allow-list validated, but when compressing a directory the per-file members were not. zipfile.write() dereferences symlinks, so a symlink inside an allowed directory pointing at an out-of-tree secret (e.g. ~/.ssh/id_rsa, /etc/passwd) had its contents copied into the archive — a confinement bypass / info-disclosure. - _compress_zip: validate each walked file against the allow-list; skip and record any whose resolved path escapes it. - _compress_tar: filter drops symlink/hardlink members (tar stores links rather than dereferencing, so this prevents shipping an out-of-tree link that resolves at extraction time). - _run: surface a "N item(s) skipped for safety" note rather than swallowing the exclusions. - tests: regression tests asserting the secret content never lands in the zip or tar archive. Also fix a pre-existing broken import that made the whole compressor test module error at collection (empty package __init__; aligned to the module-path import used by sibling tools). Co-Authored-By: Claude Opus 4.8 --- .../files_compressor_tool.py | 54 +++++++++++--- .../tests/tools/files_compressor_tool_test.py | 73 ++++++++++++++++++- 2 files changed, 117 insertions(+), 10 deletions(-) diff --git a/lib/crewai-tools/src/crewai_tools/tools/files_compressor_tool/files_compressor_tool.py b/lib/crewai-tools/src/crewai_tools/tools/files_compressor_tool/files_compressor_tool.py index 379d5c2471..4b52537c8d 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/files_compressor_tool/files_compressor_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/files_compressor_tool/files_compressor_tool.py @@ -77,11 +77,19 @@ def _run( "tar.xz": self._compress_tar, } if format == "zip": - format_compression[format](input_path, output_path) # type: ignore[operator] + skipped = format_compression[format](input_path, output_path) # type: ignore[operator] else: - format_compression[format](input_path, output_path, format) # type: ignore[operator] - - return f"Successfully compressed '{input_path}' into '{output_path}'" + skipped = format_compression[format]( # type: ignore[operator] + input_path, output_path, format + ) + + message = f"Successfully compressed '{input_path}' into '{output_path}'" + if isinstance(skipped, list) and skipped: + message += ( + f" ({len(skipped)} item(s) skipped for safety: resolved " + f"outside the allowed directory)" + ) + return message except FileNotFoundError: return f"Error: File not found at path: {input_path}" except PermissionError: @@ -109,8 +117,15 @@ def _prepare_output(output_path: str, overwrite: bool) -> bool: return True @staticmethod - def _compress_zip(input_path: str, output_path: str) -> None: - """Compresses input into a zip archive.""" + def _compress_zip(input_path: str, output_path: str) -> list[str]: + """Compresses input into a zip archive. + + Returns the files skipped because they resolved outside the allow-list. + ``zipfile.write`` dereferences symlinks, so each walked file is + validated to stop a symlink inside the tree from copying the contents + of an out-of-tree target (e.g. ``~/.ssh/id_rsa``) into the archive. + """ + skipped: list[str] = [] with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zipf: if os.path.isfile(input_path): zipf.write(input_path, os.path.basename(input_path)) @@ -118,12 +133,25 @@ def _compress_zip(input_path: str, output_path: str) -> None: for root, _, files in os.walk(input_path): for file in files: full_path = os.path.join(root, file) + try: + validate_file_path(full_path) + except ValueError: + skipped.append(full_path) + continue arcname = os.path.relpath(full_path, start=input_path) zipf.write(full_path, arcname) + return skipped @staticmethod - def _compress_tar(input_path: str, output_path: str, format: str) -> None: - """Compresses input into a tar archive with the given format.""" + def _compress_tar(input_path: str, output_path: str, format: str) -> list[str]: + """Compresses input into a tar archive with the given format. + + Returns the members skipped for safety. ``tarfile`` stores symlinks and + hardlinks as link entries rather than dereferencing them, so the + compress-time content leak that affects zip does not apply here. The + filter drops link members so an out-of-tree symlink target cannot be + shipped inside the archive and resolved at extraction time. + """ format_mode = { "tar": "w", "tar.gz": "w:gz", @@ -135,7 +163,15 @@ def _compress_tar(input_path: str, output_path: str, format: str) -> None: raise ValueError(f"Unsupported tar format: {format}") mode = format_mode[format] + skipped: list[str] = [] + + def _drop_links(tarinfo: tarfile.TarInfo) -> tarfile.TarInfo | None: + if tarinfo.issym() or tarinfo.islnk(): + skipped.append(tarinfo.name) + return None + return tarinfo with tarfile.open(output_path, mode) as tarf: # type: ignore[call-overload] arcname = os.path.basename(input_path) - tarf.add(input_path, arcname=arcname) + tarf.add(input_path, arcname=arcname, filter=_drop_links) + return skipped diff --git a/lib/crewai-tools/tests/tools/files_compressor_tool_test.py b/lib/crewai-tools/tests/tools/files_compressor_tool_test.py index 9c4a4228f6..b7845f0605 100644 --- a/lib/crewai-tools/tests/tools/files_compressor_tool_test.py +++ b/lib/crewai-tools/tests/tools/files_compressor_tool_test.py @@ -1,6 +1,13 @@ +import os +import shutil +import tarfile +import tempfile +import zipfile from unittest.mock import patch -from crewai_tools.tools.files_compressor_tool import FileCompressorTool +from crewai_tools.tools.files_compressor_tool.files_compressor_tool import ( + FileCompressorTool, +) import pytest @@ -129,3 +136,67 @@ def test_prepare_output_makes_dir(mock_exists, mock_makedirs): result = tool._prepare_output("some/missing/path/file.zip", overwrite=True) assert result is True mock_makedirs.assert_called_once() + + +# --- Security: symlink content must not leak out of the allow-list --- + + +@pytest.fixture +def symlink_env(): + """A working dir (allow-listed) containing a normal file and a symlink that + points to a secret file OUTSIDE the allow-list.""" + work_dir = tempfile.mkdtemp() + secret_dir = tempfile.mkdtemp() # deliberately NOT allow-listed + secret_file = os.path.join(secret_dir, "secret.txt") + with open(secret_file, "w") as f: + f.write("TOP_SECRET_PRIVATE_KEY") + + src = os.path.join(work_dir, "src") + os.makedirs(src) + with open(os.path.join(src, "normal.txt"), "w") as f: + f.write("safe content") + os.symlink(secret_file, os.path.join(src, "leak.txt")) + + prev = os.environ.get("CREWAI_TOOLS_ALLOWED_DIRS") + os.environ["CREWAI_TOOLS_ALLOWED_DIRS"] = work_dir + yield {"work_dir": work_dir, "src": src, "secret_dir": secret_dir} + if prev is None: + os.environ.pop("CREWAI_TOOLS_ALLOWED_DIRS", None) + else: + os.environ["CREWAI_TOOLS_ALLOWED_DIRS"] = prev + shutil.rmtree(work_dir, ignore_errors=True) + shutil.rmtree(secret_dir, ignore_errors=True) + + +def test_zip_excludes_symlink_to_outside_file(tool, symlink_env): + out = os.path.join(symlink_env["work_dir"], "archive.zip") + result = tool._run( + input_path=symlink_env["src"], output_path=out, format="zip", overwrite=True + ) + assert "Successfully compressed" in result + assert "skipped for safety" in result + + with zipfile.ZipFile(out) as zf: + names = zf.namelist() + assert "normal.txt" in names + assert "leak.txt" not in names + blob = b"".join(zf.read(n) for n in names) + assert b"TOP_SECRET_PRIVATE_KEY" not in blob + + +def test_tar_excludes_symlink_to_outside_file(tool, symlink_env): + out = os.path.join(symlink_env["work_dir"], "archive.tar.gz") + result = tool._run( + input_path=symlink_env["src"], + output_path=out, + format="tar.gz", + overwrite=True, + ) + assert "Successfully compressed" in result + assert "skipped for safety" in result + + with tarfile.open(out) as tf: + members = tf.getnames() + assert any(m.endswith("normal.txt") for m in members) + assert not any(m.endswith("leak.txt") for m in members) + assert all(not (tf.getmember(m).issym() or tf.getmember(m).islnk()) for m in members) From 034b119d347d4c6241a2aec065d06e33d32ba1b7 Mon Sep 17 00:00:00 2001 From: Rip&Tear <84775494+theCyberTech@users.noreply.github.com> Date: Sat, 20 Jun 2026 01:54:50 +0800 Subject: [PATCH 2/2] fix: validate compressor output_path at the write sink (defense in depth) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Corridor scan flagged output_path reaching the archive writer. _run does validate output_path before dispatch, but _compress_zip/_compress_tar are staticmethods reachable independently of _run — called directly they would write unvalidated. Validate at the sink so every call site is confined to the allow-list, not just the _run entrypoint. Also decouple the symlink tests from the configurable-allow-list feature (PR #6248): chdir into the working dir so the allowed root is cwd, instead of setting CREWAI_TOOLS_ALLOWED_DIRS (which this branch, off main, does not yet support). Adds direct-call tests asserting the sink rejects an out-of-root output path. Co-Authored-By: Claude Opus 4.8 --- .../files_compressor_tool.py | 6 +++ .../tests/tools/files_compressor_tool_test.py | 39 ++++++++++++++----- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/lib/crewai-tools/src/crewai_tools/tools/files_compressor_tool/files_compressor_tool.py b/lib/crewai-tools/src/crewai_tools/tools/files_compressor_tool/files_compressor_tool.py index 4b52537c8d..5955ea523b 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/files_compressor_tool/files_compressor_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/files_compressor_tool/files_compressor_tool.py @@ -125,6 +125,9 @@ def _compress_zip(input_path: str, output_path: str) -> list[str]: validated to stop a symlink inside the tree from copying the contents of an out-of-tree target (e.g. ``~/.ssh/id_rsa``) into the archive. """ + # Defense in depth: validate the write target at the sink, so this is + # safe even if called directly rather than through _run. + output_path = validate_file_path(output_path) skipped: list[str] = [] with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zipf: if os.path.isfile(input_path): @@ -171,6 +174,9 @@ def _drop_links(tarinfo: tarfile.TarInfo) -> tarfile.TarInfo | None: return None return tarinfo + # Defense in depth: validate the write target at the sink, so this is + # safe even if called directly rather than through _run. + output_path = validate_file_path(output_path) with tarfile.open(output_path, mode) as tarf: # type: ignore[call-overload] arcname = os.path.basename(input_path) tarf.add(input_path, arcname=arcname, filter=_drop_links) diff --git a/lib/crewai-tools/tests/tools/files_compressor_tool_test.py b/lib/crewai-tools/tests/tools/files_compressor_tool_test.py index b7845f0605..668ca9780d 100644 --- a/lib/crewai-tools/tests/tools/files_compressor_tool_test.py +++ b/lib/crewai-tools/tests/tools/files_compressor_tool_test.py @@ -143,10 +143,16 @@ def test_prepare_output_makes_dir(mock_exists, mock_makedirs): @pytest.fixture def symlink_env(): - """A working dir (allow-listed) containing a normal file and a symlink that - points to a secret file OUTSIDE the allow-list.""" - work_dir = tempfile.mkdtemp() - secret_dir = tempfile.mkdtemp() # deliberately NOT allow-listed + """A working dir (the allowed root, via cwd) containing a normal file and a + symlink pointing at a secret file OUTSIDE that root. + + The working directory is the allowed root for path validation, so we chdir + into ``work_dir`` rather than relying on CREWAI_TOOLS_ALLOWED_DIRS. This + keeps the test independent of the configurable-allow-list feature and + exercises the default cwd confinement. + """ + work_dir = os.path.realpath(tempfile.mkdtemp()) + secret_dir = os.path.realpath(tempfile.mkdtemp()) # outside the allowed root secret_file = os.path.join(secret_dir, "secret.txt") with open(secret_file, "w") as f: f.write("TOP_SECRET_PRIVATE_KEY") @@ -157,13 +163,10 @@ def symlink_env(): f.write("safe content") os.symlink(secret_file, os.path.join(src, "leak.txt")) - prev = os.environ.get("CREWAI_TOOLS_ALLOWED_DIRS") - os.environ["CREWAI_TOOLS_ALLOWED_DIRS"] = work_dir + prev_cwd = os.getcwd() + os.chdir(work_dir) yield {"work_dir": work_dir, "src": src, "secret_dir": secret_dir} - if prev is None: - os.environ.pop("CREWAI_TOOLS_ALLOWED_DIRS", None) - else: - os.environ["CREWAI_TOOLS_ALLOWED_DIRS"] = prev + os.chdir(prev_cwd) shutil.rmtree(work_dir, ignore_errors=True) shutil.rmtree(secret_dir, ignore_errors=True) @@ -200,3 +203,19 @@ def test_tar_excludes_symlink_to_outside_file(tool, symlink_env): assert any(m.endswith("normal.txt") for m in members) assert not any(m.endswith("leak.txt") for m in members) assert all(not (tf.getmember(m).issym() or tf.getmember(m).islnk()) for m in members) + + +def test_compress_zip_validates_output_path_at_sink(symlink_env): + # Calling the compressor directly (bypassing _run) must still refuse to + # write outside the allow-list. + outside = os.path.join(symlink_env["secret_dir"], "evil.zip") + with pytest.raises(ValueError, match="outside the allowed director"): + FileCompressorTool._compress_zip(symlink_env["src"], outside) + assert not os.path.exists(outside) + + +def test_compress_tar_validates_output_path_at_sink(symlink_env): + outside = os.path.join(symlink_env["secret_dir"], "evil.tar.gz") + with pytest.raises(ValueError, match="outside the allowed director"): + FileCompressorTool._compress_tar(symlink_env["src"], outside, "tar.gz") + assert not os.path.exists(outside)