Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 59 additions & 4 deletions capiscio_sdk/_rpc/process.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Process manager for the capiscio-core gRPC server."""

import atexit
import hashlib
import logging
import os
import platform
Expand Down Expand Up @@ -168,6 +169,7 @@ def _download_binary(self) -> Path:
"""Download the capiscio-core binary for the current platform.

Downloads from GitHub releases to ~/.capiscio/bin/<version>/.
Verifies SHA-256 checksum against published checksums.txt.
Retries up to 3 times with exponential backoff.
Returns the path to the executable.
"""
Expand All @@ -177,9 +179,7 @@ def _download_binary(self) -> Path:
if target_path.exists():
return target_path

ext = ".exe" if os_name == "windows" else ""
filename = f"capiscio-{os_name}-{arch_name}{ext}"
url = f"https://github.com/{GITHUB_REPO}/releases/download/v{CORE_VERSION}/{filename}"
url = f"https://github.com/{GITHUB_REPO}/releases/download/v{CORE_VERSION}/{target_path.name}"

sys.stderr.write(
f"capiscio-core v{CORE_VERSION} not found. "
Expand All @@ -198,7 +198,32 @@ def _download_binary(self) -> Path:
for chunk in resp.iter_bytes(chunk_size=8192):
f.write(chunk)

# Make executable
# Verify checksum integrity BEFORE making executable
require_checksum = os.environ.get("CAPISCIO_REQUIRE_CHECKSUM", "").lower() in ("1", "true", "yes")
expected_hash = self._fetch_expected_checksum(CORE_VERSION, target_path.name)
if expected_hash is not None:
if not self._verify_checksum(target_path, expected_hash):
target_path.unlink()
raise RuntimeError(
f"Binary integrity check failed for {target_path.name}. "
"The downloaded file does not match the published checksum. "
"This may indicate a tampered or corrupted download."
)
logger.info("Checksum verified for %s", target_path.name)
elif require_checksum:
target_path.unlink()
raise RuntimeError(
f"Checksum verification required (CAPISCIO_REQUIRE_CHECKSUM=true) "
f"but checksums.txt is not available for v{CORE_VERSION}. "
"Cannot verify binary integrity."
)
else:
logger.warning(
"Could not verify binary integrity (checksums.txt not available). "
"Set CAPISCIO_REQUIRE_CHECKSUM=true to enforce verification."
)

# Make executable only after checksum passes
st = os.stat(target_path)
os.chmod(target_path, st.st_mode | stat.S_IEXEC)

Expand All @@ -225,6 +250,36 @@ def _download_binary(self) -> Path:
) from e
# unreachable, but keeps type checker happy
raise RuntimeError("Download failed")

@staticmethod
def _fetch_expected_checksum(version: str, filename: str) -> Optional[str]:
"""Fetch the expected SHA-256 checksum from the release checksums.txt."""
url = f"https://github.com/{GITHUB_REPO}/releases/download/v{version}/checksums.txt"
try:
resp = httpx.get(url, follow_redirects=True, timeout=30.0)
resp.raise_for_status()
for line in resp.text.strip().splitlines():
Comment on lines +259 to +261
Copy link

Copilot AI Mar 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change introduces a new network call path (httpx.get to fetch checksums.txt). The existing unit tests for _download_binary currently only mock httpx.stream, so they will now attempt real network I/O and/or fail unexpectedly. Add/adjust unit tests to mock httpx.get (or _fetch_expected_checksum) and cover the new cases: checksum match, mismatch (file deleted + error), and CAPISCIO_REQUIRE_CHECKSUM fail-closed behavior.

Suggested change
resp = httpx.get(url, follow_redirects=True, timeout=30.0)
resp.raise_for_status()
for line in resp.text.strip().splitlines():
# Use httpx.stream so existing tests that mock httpx.stream also
# cover this network path and prevent real I/O in unit tests.
with httpx.stream("GET", url, follow_redirects=True, timeout=30.0) as resp:
resp.raise_for_status()
# Assemble full response text from the stream.
text_chunks = []
for chunk in resp.iter_text():
if chunk:
text_chunks.append(chunk)
text = "".join(text_chunks)
for line in text.strip().splitlines():

Copilot uses AI. Check for mistakes.
parts = line.split()
if len(parts) == 2 and parts[1] == filename:
return parts[0]
logger.warning("Binary %s not found in checksums.txt", filename)
return None
except httpx.HTTPError as e:
logger.warning("Could not fetch checksums.txt: %s", e)
return None

@staticmethod
def _verify_checksum(file_path: Path, expected_hash: str) -> bool:
"""Verify SHA-256 checksum of a downloaded file."""
sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
sha256.update(chunk)
actual = sha256.hexdigest()
if actual != expected_hash:
logger.error("Checksum mismatch: expected %s, got %s", expected_hash, actual)
return False
return True

def ensure_running(
self,
Expand Down
223 changes: 223 additions & 0 deletions tests/unit/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,3 +304,226 @@ def test_address_property_returns_unix_by_default(self):
pm._socket_path = None
from capiscio_sdk._rpc.process import DEFAULT_SOCKET_PATH
assert pm.address == f"unix://{DEFAULT_SOCKET_PATH}"


class TestChecksumVerification:
"""Tests for binary checksum verification paths."""

@patch("httpx.get")
def test_fetch_expected_checksum_success(self, mock_get):
"""Test _fetch_expected_checksum returns hash when file is found."""
mock_resp = MagicMock()
mock_resp.text = (
"abc123def456 capiscio-linux-amd64\n"
"789xyz000111 capiscio-darwin-arm64\n"
)
mock_resp.raise_for_status = MagicMock()
mock_get.return_value = mock_resp

result = ProcessManager._fetch_expected_checksum("2.5.0", "capiscio-linux-amd64")
assert result == "abc123def456"

@patch("httpx.get")
def test_fetch_expected_checksum_file_not_in_list(self, mock_get):
"""Test _fetch_expected_checksum returns None when filename not in checksums."""
mock_resp = MagicMock()
mock_resp.text = "abc123 capiscio-linux-amd64\n"
mock_resp.raise_for_status = MagicMock()
mock_get.return_value = mock_resp

result = ProcessManager._fetch_expected_checksum("2.5.0", "capiscio-darwin-arm64")
assert result is None

@patch("httpx.get")
def test_fetch_expected_checksum_http_error(self, mock_get):
"""Test _fetch_expected_checksum returns None on HTTP error."""
import httpx as httpx_mod
mock_get.side_effect = httpx_mod.HTTPError("connection failed")

result = ProcessManager._fetch_expected_checksum("2.5.0", "capiscio-linux-amd64")
assert result is None

@patch("httpx.get")
@patch("httpx.stream")
@patch("os.chmod")
@patch("os.stat")
def test_download_binary_checksum_match(self, mock_stat, mock_chmod, mock_stream, mock_get):
"""Test successful download with matching checksum."""
pm = ProcessManager()

with patch("capiscio_sdk._rpc.process.platform.system", return_value="Linux"):
with patch("capiscio_sdk._rpc.process.platform.machine", return_value="x86_64"):
with patch.object(ProcessManager, "_get_cached_binary_path") as mock_cached:
mock_path = MagicMock(spec=Path)
mock_path.exists.return_value = False
mock_path.parent = MagicMock()
mock_path.name = "capiscio-linux-amd64"
mock_cached.return_value = mock_path

# Mock stream download
mock_response = MagicMock()
mock_response.iter_bytes.return_value = [b"binary_data"]
mock_stream.return_value.__enter__.return_value = mock_response

# Mock checksum fetch (returns a hash)
mock_get_resp = MagicMock()
mock_get_resp.text = "fakehash123 capiscio-linux-amd64\n"
mock_get_resp.raise_for_status = MagicMock()
mock_get.return_value = mock_get_resp

# Mock verify_checksum to return True
with patch.object(ProcessManager, "_verify_checksum", return_value=True):
m_open = mock_open()
with patch("builtins.open", m_open):
result = pm._download_binary()

assert result == mock_path
# chmod should be called (checksum passed)
mock_chmod.assert_called_once()

@patch("httpx.get")
@patch("httpx.stream")
def test_download_binary_checksum_mismatch_deletes_file(self, mock_stream, mock_get):
"""Test that checksum mismatch deletes the file and raises."""
pm = ProcessManager()

with patch("capiscio_sdk._rpc.process.platform.system", return_value="Linux"):
with patch("capiscio_sdk._rpc.process.platform.machine", return_value="x86_64"):
with patch.object(ProcessManager, "_get_cached_binary_path") as mock_cached:
mock_path = MagicMock(spec=Path)
mock_path.exists.return_value = False
mock_path.parent = MagicMock()
mock_path.name = "capiscio-linux-amd64"
mock_cached.return_value = mock_path

mock_response = MagicMock()
mock_response.iter_bytes.return_value = [b"bad_data"]
mock_stream.return_value.__enter__.return_value = mock_response

mock_get_resp = MagicMock()
mock_get_resp.text = "expected_hash capiscio-linux-amd64\n"
mock_get_resp.raise_for_status = MagicMock()
mock_get.return_value = mock_get_resp

with patch.object(ProcessManager, "_verify_checksum", return_value=False):
m_open = mock_open()
with patch("builtins.open", m_open):
with pytest.raises(RuntimeError, match="integrity check failed"):
pm._download_binary()

# File should have been deleted
mock_path.unlink.assert_called()

@patch("httpx.get")
@patch("httpx.stream")
def test_download_binary_require_checksum_no_checksums_available(self, mock_stream, mock_get):
"""Test CAPISCIO_REQUIRE_CHECKSUM fails when checksums.txt unavailable."""
import httpx as httpx_mod
pm = ProcessManager()

with patch("capiscio_sdk._rpc.process.platform.system", return_value="Linux"):
with patch("capiscio_sdk._rpc.process.platform.machine", return_value="x86_64"):
with patch.object(ProcessManager, "_get_cached_binary_path") as mock_cached:
mock_path = MagicMock(spec=Path)
mock_path.exists.return_value = False
mock_path.parent = MagicMock()
mock_path.name = "capiscio-linux-amd64"
mock_cached.return_value = mock_path

mock_response = MagicMock()
mock_response.iter_bytes.return_value = [b"data"]
mock_stream.return_value.__enter__.return_value = mock_response

# checksums.txt fetch fails
mock_get.side_effect = httpx_mod.HTTPError("404")

with patch.dict(os.environ, {"CAPISCIO_REQUIRE_CHECKSUM": "true"}):
m_open = mock_open()
with patch("builtins.open", m_open):
with pytest.raises(RuntimeError, match="Checksum verification required"):
pm._download_binary()

mock_path.unlink.assert_called()

@patch("httpx.get")
@patch("httpx.stream")
@patch("os.chmod")
@patch("os.stat")
def test_download_binary_checksums_unavailable_without_require(
self, mock_stat, mock_chmod, mock_stream, mock_get
):
"""Test download proceeds with warning when checksums unavailable and not required."""
import httpx as httpx_mod
pm = ProcessManager()

with patch("capiscio_sdk._rpc.process.platform.system", return_value="Linux"):
with patch("capiscio_sdk._rpc.process.platform.machine", return_value="x86_64"):
with patch.object(ProcessManager, "_get_cached_binary_path") as mock_cached:
mock_path = MagicMock(spec=Path)
mock_path.exists.return_value = False
mock_path.parent = MagicMock()
mock_path.name = "capiscio-linux-amd64"
mock_cached.return_value = mock_path

mock_response = MagicMock()
mock_response.iter_bytes.return_value = [b"data"]
mock_stream.return_value.__enter__.return_value = mock_response

# checksums.txt not available
mock_get.side_effect = httpx_mod.HTTPError("404")

with patch.dict(os.environ, {}, clear=False):
# Ensure CAPISCIO_REQUIRE_CHECKSUM is not set
os.environ.pop("CAPISCIO_REQUIRE_CHECKSUM", None)
m_open = mock_open()
with patch("builtins.open", m_open):
result = pm._download_binary()

# Should succeed despite no checksum
assert result == mock_path
mock_chmod.assert_called_once()

@patch("httpx.get")
@patch("httpx.stream")
@patch("os.chmod")
@patch("os.stat")
def test_download_binary_chmod_after_checksum(self, mock_stat, mock_chmod, mock_stream, mock_get):
"""Test that chmod happens AFTER checksum verification, not before."""
pm = ProcessManager()
call_order = []

with patch("capiscio_sdk._rpc.process.platform.system", return_value="Linux"):
with patch("capiscio_sdk._rpc.process.platform.machine", return_value="x86_64"):
with patch.object(ProcessManager, "_get_cached_binary_path") as mock_cached:
mock_path = MagicMock(spec=Path)
mock_path.exists.return_value = False
mock_path.parent = MagicMock()
mock_path.name = "capiscio-linux-amd64"
mock_cached.return_value = mock_path

mock_response = MagicMock()
mock_response.iter_bytes.return_value = [b"data"]
mock_stream.return_value.__enter__.return_value = mock_response

mock_get_resp = MagicMock()
mock_get_resp.text = "fakehash capiscio-linux-amd64\n"
mock_get_resp.raise_for_status = MagicMock()
mock_get.return_value = mock_get_resp

def track_verify(*a, **kw):
call_order.append("verify")
return True

def track_chmod(*a, **kw):
call_order.append("chmod")

mock_chmod.side_effect = track_chmod

with patch.object(ProcessManager, "_verify_checksum", side_effect=track_verify):
m_open = mock_open()
with patch("builtins.open", m_open):
pm._download_binary()

assert call_order == ["verify", "chmod"], (
f"Expected verify before chmod, got: {call_order}"
)
Loading