diff --git a/lib/crewai-tools/src/crewai_tools/tools/files_compressor_tool/files_compressor_tool.py b/lib/crewai-tools/src/crewai_tools/tools/files_compressor_tool/files_compressor_tool.py index 379d5c2471..5955ea523b 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/files_compressor_tool/files_compressor_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/files_compressor_tool/files_compressor_tool.py @@ -77,11 +77,19 @@ def _run( "tar.xz": self._compress_tar, } if format == "zip": - format_compression[format](input_path, output_path) # type: ignore[operator] + skipped = format_compression[format](input_path, output_path) # type: ignore[operator] else: - format_compression[format](input_path, output_path, format) # type: ignore[operator] - - return f"Successfully compressed '{input_path}' into '{output_path}'" + skipped = format_compression[format]( # type: ignore[operator] + input_path, output_path, format + ) + + message = f"Successfully compressed '{input_path}' into '{output_path}'" + if isinstance(skipped, list) and skipped: + message += ( + f" ({len(skipped)} item(s) skipped for safety: resolved " + f"outside the allowed directory)" + ) + return message except FileNotFoundError: return f"Error: File not found at path: {input_path}" except PermissionError: @@ -109,8 +117,18 @@ def _prepare_output(output_path: str, overwrite: bool) -> bool: return True @staticmethod - def _compress_zip(input_path: str, output_path: str) -> None: - """Compresses input into a zip archive.""" + def _compress_zip(input_path: str, output_path: str) -> list[str]: + """Compresses input into a zip archive. + + Returns the files skipped because they resolved outside the allow-list. + ``zipfile.write`` dereferences symlinks, so each walked file is + validated to stop a symlink inside the tree from copying the contents + of an out-of-tree target (e.g. ``~/.ssh/id_rsa``) into the archive. + """ + # Defense in depth: validate the write target at the sink, so this is + # safe even if called directly rather than through _run. + output_path = validate_file_path(output_path) + skipped: list[str] = [] with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zipf: if os.path.isfile(input_path): zipf.write(input_path, os.path.basename(input_path)) @@ -118,12 +136,25 @@ def _compress_zip(input_path: str, output_path: str) -> None: for root, _, files in os.walk(input_path): for file in files: full_path = os.path.join(root, file) + try: + validate_file_path(full_path) + except ValueError: + skipped.append(full_path) + continue arcname = os.path.relpath(full_path, start=input_path) zipf.write(full_path, arcname) + return skipped @staticmethod - def _compress_tar(input_path: str, output_path: str, format: str) -> None: - """Compresses input into a tar archive with the given format.""" + def _compress_tar(input_path: str, output_path: str, format: str) -> list[str]: + """Compresses input into a tar archive with the given format. + + Returns the members skipped for safety. ``tarfile`` stores symlinks and + hardlinks as link entries rather than dereferencing them, so the + compress-time content leak that affects zip does not apply here. The + filter drops link members so an out-of-tree symlink target cannot be + shipped inside the archive and resolved at extraction time. + """ format_mode = { "tar": "w", "tar.gz": "w:gz", @@ -135,7 +166,18 @@ def _compress_tar(input_path: str, output_path: str, format: str) -> None: raise ValueError(f"Unsupported tar format: {format}") mode = format_mode[format] + skipped: list[str] = [] + def _drop_links(tarinfo: tarfile.TarInfo) -> tarfile.TarInfo | None: + if tarinfo.issym() or tarinfo.islnk(): + skipped.append(tarinfo.name) + return None + return tarinfo + + # Defense in depth: validate the write target at the sink, so this is + # safe even if called directly rather than through _run. + output_path = validate_file_path(output_path) with tarfile.open(output_path, mode) as tarf: # type: ignore[call-overload] arcname = os.path.basename(input_path) - tarf.add(input_path, arcname=arcname) + tarf.add(input_path, arcname=arcname, filter=_drop_links) + return skipped diff --git a/lib/crewai-tools/tests/tools/files_compressor_tool_test.py b/lib/crewai-tools/tests/tools/files_compressor_tool_test.py index 9c4a4228f6..668ca9780d 100644 --- a/lib/crewai-tools/tests/tools/files_compressor_tool_test.py +++ b/lib/crewai-tools/tests/tools/files_compressor_tool_test.py @@ -1,6 +1,13 @@ +import os +import shutil +import tarfile +import tempfile +import zipfile from unittest.mock import patch -from crewai_tools.tools.files_compressor_tool import FileCompressorTool +from crewai_tools.tools.files_compressor_tool.files_compressor_tool import ( + FileCompressorTool, +) import pytest @@ -129,3 +136,86 @@ def test_prepare_output_makes_dir(mock_exists, mock_makedirs): result = tool._prepare_output("some/missing/path/file.zip", overwrite=True) assert result is True mock_makedirs.assert_called_once() + + +# --- Security: symlink content must not leak out of the allow-list --- + + +@pytest.fixture +def symlink_env(): + """A working dir (the allowed root, via cwd) containing a normal file and a + symlink pointing at a secret file OUTSIDE that root. + + The working directory is the allowed root for path validation, so we chdir + into ``work_dir`` rather than relying on CREWAI_TOOLS_ALLOWED_DIRS. This + keeps the test independent of the configurable-allow-list feature and + exercises the default cwd confinement. + """ + work_dir = os.path.realpath(tempfile.mkdtemp()) + secret_dir = os.path.realpath(tempfile.mkdtemp()) # outside the allowed root + secret_file = os.path.join(secret_dir, "secret.txt") + with open(secret_file, "w") as f: + f.write("TOP_SECRET_PRIVATE_KEY") + + src = os.path.join(work_dir, "src") + os.makedirs(src) + with open(os.path.join(src, "normal.txt"), "w") as f: + f.write("safe content") + os.symlink(secret_file, os.path.join(src, "leak.txt")) + + prev_cwd = os.getcwd() + os.chdir(work_dir) + yield {"work_dir": work_dir, "src": src, "secret_dir": secret_dir} + os.chdir(prev_cwd) + shutil.rmtree(work_dir, ignore_errors=True) + shutil.rmtree(secret_dir, ignore_errors=True) + + +def test_zip_excludes_symlink_to_outside_file(tool, symlink_env): + out = os.path.join(symlink_env["work_dir"], "archive.zip") + result = tool._run( + input_path=symlink_env["src"], output_path=out, format="zip", overwrite=True + ) + assert "Successfully compressed" in result + assert "skipped for safety" in result + + with zipfile.ZipFile(out) as zf: + names = zf.namelist() + assert "normal.txt" in names + assert "leak.txt" not in names + blob = b"".join(zf.read(n) for n in names) + assert b"TOP_SECRET_PRIVATE_KEY" not in blob + + +def test_tar_excludes_symlink_to_outside_file(tool, symlink_env): + out = os.path.join(symlink_env["work_dir"], "archive.tar.gz") + result = tool._run( + input_path=symlink_env["src"], + output_path=out, + format="tar.gz", + overwrite=True, + ) + assert "Successfully compressed" in result + assert "skipped for safety" in result + + with tarfile.open(out) as tf: + members = tf.getnames() + assert any(m.endswith("normal.txt") for m in members) + assert not any(m.endswith("leak.txt") for m in members) + assert all(not (tf.getmember(m).issym() or tf.getmember(m).islnk()) for m in members) + + +def test_compress_zip_validates_output_path_at_sink(symlink_env): + # Calling the compressor directly (bypassing _run) must still refuse to + # write outside the allow-list. + outside = os.path.join(symlink_env["secret_dir"], "evil.zip") + with pytest.raises(ValueError, match="outside the allowed director"): + FileCompressorTool._compress_zip(symlink_env["src"], outside) + assert not os.path.exists(outside) + + +def test_compress_tar_validates_output_path_at_sink(symlink_env): + outside = os.path.join(symlink_env["secret_dir"], "evil.tar.gz") + with pytest.raises(ValueError, match="outside the allowed director"): + FileCompressorTool._compress_tar(symlink_env["src"], outside, "tar.gz") + assert not os.path.exists(outside)