Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

During directory compression, the code validates each file path but discards the resolved, canonical path returned by validate_file_path(...). The subsequent write uses the original full_path, creating a TOCTOU window where a symlink can be swapped between validation and use, allowing out-of-tree content to be archived.

try:
    validate_file_path(full_path)
except ValueError:
    skipped.append(full_path)
    continue
arcname = os.path.relpath(full_path, start=input_path)
zipf.write(full_path, arcname)

Project guardrail 59 requires using the canonical path returned by validate_file_path() for the actual I/O to prevent TOCTOU races.

Remediation: Use the resolved path from validate_file_path(...) for the write operation.

Suggested change
zipf.write(validate_file_path(full_path), arcname)

For more details, see the finding in Corridor.

Provide feedback: Reply with whether this is a valid vulnerability or false positive to help improve Corridor's accuracy.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When input_path points to a single file and _compress_zip is called directly (bypassing _run), the code writes the file to the archive without validating it against the allowed directory set. This permits archiving an arbitrary out-of-tree file, violating the path-confinement policy.

if os.path.isfile(input_path):
    zipf.write(input_path, os.path.basename(input_path))

Remediation: Validate input_path and use the returned resolved path for the write per guardrail 59.

Suggested change
zipf.write(validate_file_path(input_path), os.path.basename(input_path))

For more details, see the finding in Corridor.

Provide feedback: Reply with whether this is a valid vulnerability or false positive to help improve Corridor's accuracy.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In _compress_tar, when input_path is a single file or directory and this helper is called directly (bypassing _run), input_path is not validated against the allow-list before being added to the archive. Although link members are filtered, unvalidated regular files can still be archived from outside the allowed directory.

with tarfile.open(output_path, mode) as tarf:  # type: ignore[call-overload]
    arcname = os.path.basename(input_path)
    tarf.add(input_path, arcname=arcname, filter=_drop_links)

Remediation: Validate input_path and use the resolved path for tarf.add(...) in line with guardrail 59.

Suggested change
tarf.add(validate_file_path(input_path), arcname=arcname, filter=_drop_links)

For more details, see the finding in Corridor.

Provide feedback: Reply with whether this is a valid vulnerability or false positive to help improve Corridor's accuracy.

Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,19 @@ def _run(
"tar.xz": self._compress_tar,
}
if format == "zip":
format_compression[format](input_path, output_path) # type: ignore[operator]
skipped = format_compression[format](input_path, output_path) # type: ignore[operator]
else:
format_compression[format](input_path, output_path, format) # type: ignore[operator]

return f"Successfully compressed '{input_path}' into '{output_path}'"
skipped = format_compression[format]( # type: ignore[operator]
input_path, output_path, format
)

message = f"Successfully compressed '{input_path}' into '{output_path}'"
if isinstance(skipped, list) and skipped:
message += (
f" ({len(skipped)} item(s) skipped for safety: resolved "
f"outside the allowed directory)"
)
return message
except FileNotFoundError:
return f"Error: File not found at path: {input_path}"
except PermissionError:
Expand Down Expand Up @@ -109,21 +117,44 @@ def _prepare_output(output_path: str, overwrite: bool) -> bool:
return True

@staticmethod
def _compress_zip(input_path: str, output_path: str) -> None:
"""Compresses input into a zip archive."""
def _compress_zip(input_path: str, output_path: str) -> list[str]:
"""Compresses input into a zip archive.

Returns the files skipped because they resolved outside the allow-list.
``zipfile.write`` dereferences symlinks, so each walked file is
validated to stop a symlink inside the tree from copying the contents
of an out-of-tree target (e.g. ``~/.ssh/id_rsa``) into the archive.
"""
# Defense in depth: validate the write target at the sink, so this is
# safe even if called directly rather than through _run.
output_path = validate_file_path(output_path)
skipped: list[str] = []
with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zipf:
if os.path.isfile(input_path):
zipf.write(input_path, os.path.basename(input_path))
else:
for root, _, files in os.walk(input_path):
for file in files:
full_path = os.path.join(root, file)
try:
validate_file_path(full_path)
except ValueError:
skipped.append(full_path)
continue
arcname = os.path.relpath(full_path, start=input_path)
zipf.write(full_path, arcname)
return skipped

@staticmethod
def _compress_tar(input_path: str, output_path: str, format: str) -> None:
"""Compresses input into a tar archive with the given format."""
def _compress_tar(input_path: str, output_path: str, format: str) -> list[str]:
"""Compresses input into a tar archive with the given format.

Returns the members skipped for safety. ``tarfile`` stores symlinks and
hardlinks as link entries rather than dereferencing them, so the
compress-time content leak that affects zip does not apply here. The
filter drops link members so an out-of-tree symlink target cannot be
shipped inside the archive and resolved at extraction time.
"""
format_mode = {
"tar": "w",
"tar.gz": "w:gz",
Expand All @@ -135,7 +166,18 @@ def _compress_tar(input_path: str, output_path: str, format: str) -> None:
raise ValueError(f"Unsupported tar format: {format}")

mode = format_mode[format]
skipped: list[str] = []

def _drop_links(tarinfo: tarfile.TarInfo) -> tarfile.TarInfo | None:
if tarinfo.issym() or tarinfo.islnk():
skipped.append(tarinfo.name)
return None
return tarinfo

# Defense in depth: validate the write target at the sink, so this is
# safe even if called directly rather than through _run.
output_path = validate_file_path(output_path)
with tarfile.open(output_path, mode) as tarf: # type: ignore[call-overload]
arcname = os.path.basename(input_path)
tarf.add(input_path, arcname=arcname)
tarf.add(input_path, arcname=arcname, filter=_drop_links)
return skipped
92 changes: 91 additions & 1 deletion lib/crewai-tools/tests/tools/files_compressor_tool_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import os
import shutil
import tarfile
import tempfile
import zipfile
from unittest.mock import patch

from crewai_tools.tools.files_compressor_tool import FileCompressorTool
from crewai_tools.tools.files_compressor_tool.files_compressor_tool import (
FileCompressorTool,
)
import pytest


Expand Down Expand Up @@ -129,3 +136,86 @@ def test_prepare_output_makes_dir(mock_exists, mock_makedirs):
result = tool._prepare_output("some/missing/path/file.zip", overwrite=True)
assert result is True
mock_makedirs.assert_called_once()


# --- Security: symlink content must not leak out of the allow-list ---


@pytest.fixture
def symlink_env():
"""A working dir (the allowed root, via cwd) containing a normal file and a
symlink pointing at a secret file OUTSIDE that root.

The working directory is the allowed root for path validation, so we chdir
into ``work_dir`` rather than relying on CREWAI_TOOLS_ALLOWED_DIRS. This
keeps the test independent of the configurable-allow-list feature and
exercises the default cwd confinement.
"""
work_dir = os.path.realpath(tempfile.mkdtemp())
secret_dir = os.path.realpath(tempfile.mkdtemp()) # outside the allowed root
secret_file = os.path.join(secret_dir, "secret.txt")
with open(secret_file, "w") as f:
f.write("TOP_SECRET_PRIVATE_KEY")

src = os.path.join(work_dir, "src")
os.makedirs(src)
with open(os.path.join(src, "normal.txt"), "w") as f:
f.write("safe content")
os.symlink(secret_file, os.path.join(src, "leak.txt"))

prev_cwd = os.getcwd()
os.chdir(work_dir)
yield {"work_dir": work_dir, "src": src, "secret_dir": secret_dir}
os.chdir(prev_cwd)
shutil.rmtree(work_dir, ignore_errors=True)
shutil.rmtree(secret_dir, ignore_errors=True)


def test_zip_excludes_symlink_to_outside_file(tool, symlink_env):
out = os.path.join(symlink_env["work_dir"], "archive.zip")
result = tool._run(
input_path=symlink_env["src"], output_path=out, format="zip", overwrite=True
)
assert "Successfully compressed" in result
assert "skipped for safety" in result

with zipfile.ZipFile(out) as zf:
names = zf.namelist()
assert "normal.txt" in names
assert "leak.txt" not in names
blob = b"".join(zf.read(n) for n in names)
assert b"TOP_SECRET_PRIVATE_KEY" not in blob


def test_tar_excludes_symlink_to_outside_file(tool, symlink_env):
out = os.path.join(symlink_env["work_dir"], "archive.tar.gz")
result = tool._run(
input_path=symlink_env["src"],
output_path=out,
format="tar.gz",
overwrite=True,
)
assert "Successfully compressed" in result
assert "skipped for safety" in result

with tarfile.open(out) as tf:
members = tf.getnames()
assert any(m.endswith("normal.txt") for m in members)
assert not any(m.endswith("leak.txt") for m in members)
assert all(not (tf.getmember(m).issym() or tf.getmember(m).islnk()) for m in members)


def test_compress_zip_validates_output_path_at_sink(symlink_env):
# Calling the compressor directly (bypassing _run) must still refuse to
# write outside the allow-list.
outside = os.path.join(symlink_env["secret_dir"], "evil.zip")
with pytest.raises(ValueError, match="outside the allowed director"):
FileCompressorTool._compress_zip(symlink_env["src"], outside)
assert not os.path.exists(outside)


def test_compress_tar_validates_output_path_at_sink(symlink_env):
outside = os.path.join(symlink_env["secret_dir"], "evil.tar.gz")
with pytest.raises(ValueError, match="outside the allowed director"):
FileCompressorTool._compress_tar(symlink_env["src"], outside, "tar.gz")
assert not os.path.exists(outside)
Loading