From 96f816668d8f9db777c038833d6f4367ac88d9a8 Mon Sep 17 00:00:00 2001 From: Beon de Nood Date: Fri, 27 Mar 2026 23:51:55 -0400 Subject: [PATCH 1/3] fix: add SHA-256 checksum verification for binary downloads (B5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Port checksum verification from capiscio-python to capiscio-sdk-python. The SDK's process.py downloads capiscio-core from GitHub releases but previously had no integrity verification — the binary was downloaded and chmod +x'd without checking its hash. Now fetches checksums.txt from the release and verifies SHA-256 before marking the binary as executable. On mismatch, the downloaded file is deleted and a RuntimeError is raised. --- capiscio_sdk/_rpc/process.py | 49 ++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/capiscio_sdk/_rpc/process.py b/capiscio_sdk/_rpc/process.py index abfeb99..7a13752 100644 --- a/capiscio_sdk/_rpc/process.py +++ b/capiscio_sdk/_rpc/process.py @@ -1,6 +1,7 @@ """Process manager for the capiscio-core gRPC server.""" import atexit +import hashlib import logging import os import platform @@ -168,6 +169,7 @@ def _download_binary(self) -> Path: """Download the capiscio-core binary for the current platform. Downloads from GitHub releases to ~/.capiscio/bin//. + Verifies SHA-256 checksum against published checksums.txt. Retries up to 3 times with exponential backoff. Returns the path to the executable. """ @@ -202,6 +204,23 @@ def _download_binary(self) -> Path: st = os.stat(target_path) os.chmod(target_path, st.st_mode | stat.S_IEXEC) + # Verify checksum integrity + expected_hash = self._fetch_expected_checksum(CORE_VERSION, filename) + if expected_hash is not None: + if not self._verify_checksum(target_path, expected_hash): + target_path.unlink() + raise RuntimeError( + f"Binary integrity check failed for {filename}. " + "The downloaded file does not match the published checksum. " + "This may indicate a tampered or corrupted download." + ) + logger.info("Checksum verified for %s", filename) + else: + logger.warning( + "Could not verify binary integrity (checksums.txt not available). " + "Consider upgrading capiscio-core to a version that publishes checksums." + ) + sys.stderr.write(f"Installed capiscio-core v{CORE_VERSION} at {target_path}\n") sys.stderr.flush() logger.info("Installed capiscio-core v%s at %s", CORE_VERSION, target_path) @@ -225,6 +244,36 @@ def _download_binary(self) -> Path: ) from e # unreachable, but keeps type checker happy raise RuntimeError("Download failed") + + @staticmethod + def _fetch_expected_checksum(version: str, filename: str) -> Optional[str]: + """Fetch the expected SHA-256 checksum from the release checksums.txt.""" + url = f"https://github.com/{GITHUB_REPO}/releases/download/v{version}/checksums.txt" + try: + resp = httpx.get(url, follow_redirects=True, timeout=30.0) + resp.raise_for_status() + for line in resp.text.strip().splitlines(): + parts = line.split() + if len(parts) == 2 and parts[1] == filename: + return parts[0] + logger.warning("Binary %s not found in checksums.txt", filename) + return None + except httpx.HTTPError as e: + logger.warning("Could not fetch checksums.txt: %s", e) + return None + + @staticmethod + def _verify_checksum(file_path: Path, expected_hash: str) -> bool: + """Verify SHA-256 checksum of a downloaded file.""" + sha256 = hashlib.sha256() + with open(file_path, "rb") as f: + for chunk in iter(lambda: f.read(8192), b""): + sha256.update(chunk) + actual = sha256.hexdigest() + if actual != expected_hash: + logger.error("Checksum mismatch: expected %s, got %s", expected_hash, actual) + return False + return True def ensure_running( self, From 1e161cab718c2a3a46606e9d96cecf7cc541d927 Mon Sep 17 00:00:00 2001 From: Beon de Nood Date: Fri, 27 Mar 2026 23:53:17 -0400 Subject: [PATCH 2/3] fix: add CAPISCIO_REQUIRE_CHECKSUM fail-closed mode When CAPISCIO_REQUIRE_CHECKSUM=true, binary download fails if checksums.txt is unavailable or the asset is missing from it. Default behavior remains fail-open (warn and continue) for backward compatibility. --- capiscio_sdk/_rpc/process.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/capiscio_sdk/_rpc/process.py b/capiscio_sdk/_rpc/process.py index 7a13752..8c3c1dc 100644 --- a/capiscio_sdk/_rpc/process.py +++ b/capiscio_sdk/_rpc/process.py @@ -205,6 +205,7 @@ def _download_binary(self) -> Path: os.chmod(target_path, st.st_mode | stat.S_IEXEC) # Verify checksum integrity + require_checksum = os.environ.get("CAPISCIO_REQUIRE_CHECKSUM", "").lower() in ("1", "true", "yes") expected_hash = self._fetch_expected_checksum(CORE_VERSION, filename) if expected_hash is not None: if not self._verify_checksum(target_path, expected_hash): @@ -215,10 +216,17 @@ def _download_binary(self) -> Path: "This may indicate a tampered or corrupted download." ) logger.info("Checksum verified for %s", filename) + elif require_checksum: + target_path.unlink() + raise RuntimeError( + f"Checksum verification required (CAPISCIO_REQUIRE_CHECKSUM=true) " + f"but checksums.txt is not available for v{CORE_VERSION}. " + "Cannot verify binary integrity." + ) else: logger.warning( "Could not verify binary integrity (checksums.txt not available). " - "Consider upgrading capiscio-core to a version that publishes checksums." + "Set CAPISCIO_REQUIRE_CHECKSUM=true to enforce verification." ) sys.stderr.write(f"Installed capiscio-core v{CORE_VERSION} at {target_path}\n") From ecb084c16109c6b4253cab3fc84aec83649699cf Mon Sep 17 00:00:00 2001 From: Beon de Nood Date: Sat, 28 Mar 2026 00:06:01 -0400 Subject: [PATCH 3/3] fix: address PR #50 review comments 1. Verify checksum BEFORE chmod +x (security hardening) 2. Use target_path.name instead of separately computed filename 3. Add unit tests for all checksum verification paths --- capiscio_sdk/_rpc/process.py | 20 ++-- tests/unit/test_process.py | 223 +++++++++++++++++++++++++++++++++++ 2 files changed, 232 insertions(+), 11 deletions(-) diff --git a/capiscio_sdk/_rpc/process.py b/capiscio_sdk/_rpc/process.py index 8c3c1dc..50af7c3 100644 --- a/capiscio_sdk/_rpc/process.py +++ b/capiscio_sdk/_rpc/process.py @@ -179,9 +179,7 @@ def _download_binary(self) -> Path: if target_path.exists(): return target_path - ext = ".exe" if os_name == "windows" else "" - filename = f"capiscio-{os_name}-{arch_name}{ext}" - url = f"https://github.com/{GITHUB_REPO}/releases/download/v{CORE_VERSION}/{filename}" + url = f"https://github.com/{GITHUB_REPO}/releases/download/v{CORE_VERSION}/{target_path.name}" sys.stderr.write( f"capiscio-core v{CORE_VERSION} not found. " @@ -200,22 +198,18 @@ def _download_binary(self) -> Path: for chunk in resp.iter_bytes(chunk_size=8192): f.write(chunk) - # Make executable - st = os.stat(target_path) - os.chmod(target_path, st.st_mode | stat.S_IEXEC) - - # Verify checksum integrity + # Verify checksum integrity BEFORE making executable require_checksum = os.environ.get("CAPISCIO_REQUIRE_CHECKSUM", "").lower() in ("1", "true", "yes") - expected_hash = self._fetch_expected_checksum(CORE_VERSION, filename) + expected_hash = self._fetch_expected_checksum(CORE_VERSION, target_path.name) if expected_hash is not None: if not self._verify_checksum(target_path, expected_hash): target_path.unlink() raise RuntimeError( - f"Binary integrity check failed for {filename}. " + f"Binary integrity check failed for {target_path.name}. " "The downloaded file does not match the published checksum. " "This may indicate a tampered or corrupted download." ) - logger.info("Checksum verified for %s", filename) + logger.info("Checksum verified for %s", target_path.name) elif require_checksum: target_path.unlink() raise RuntimeError( @@ -229,6 +223,10 @@ def _download_binary(self) -> Path: "Set CAPISCIO_REQUIRE_CHECKSUM=true to enforce verification." ) + # Make executable only after checksum passes + st = os.stat(target_path) + os.chmod(target_path, st.st_mode | stat.S_IEXEC) + sys.stderr.write(f"Installed capiscio-core v{CORE_VERSION} at {target_path}\n") sys.stderr.flush() logger.info("Installed capiscio-core v%s at %s", CORE_VERSION, target_path) diff --git a/tests/unit/test_process.py b/tests/unit/test_process.py index 005086b..2eddc57 100644 --- a/tests/unit/test_process.py +++ b/tests/unit/test_process.py @@ -304,3 +304,226 @@ def test_address_property_returns_unix_by_default(self): pm._socket_path = None from capiscio_sdk._rpc.process import DEFAULT_SOCKET_PATH assert pm.address == f"unix://{DEFAULT_SOCKET_PATH}" + + +class TestChecksumVerification: + """Tests for binary checksum verification paths.""" + + @patch("httpx.get") + def test_fetch_expected_checksum_success(self, mock_get): + """Test _fetch_expected_checksum returns hash when file is found.""" + mock_resp = MagicMock() + mock_resp.text = ( + "abc123def456 capiscio-linux-amd64\n" + "789xyz000111 capiscio-darwin-arm64\n" + ) + mock_resp.raise_for_status = MagicMock() + mock_get.return_value = mock_resp + + result = ProcessManager._fetch_expected_checksum("2.5.0", "capiscio-linux-amd64") + assert result == "abc123def456" + + @patch("httpx.get") + def test_fetch_expected_checksum_file_not_in_list(self, mock_get): + """Test _fetch_expected_checksum returns None when filename not in checksums.""" + mock_resp = MagicMock() + mock_resp.text = "abc123 capiscio-linux-amd64\n" + mock_resp.raise_for_status = MagicMock() + mock_get.return_value = mock_resp + + result = ProcessManager._fetch_expected_checksum("2.5.0", "capiscio-darwin-arm64") + assert result is None + + @patch("httpx.get") + def test_fetch_expected_checksum_http_error(self, mock_get): + """Test _fetch_expected_checksum returns None on HTTP error.""" + import httpx as httpx_mod + mock_get.side_effect = httpx_mod.HTTPError("connection failed") + + result = ProcessManager._fetch_expected_checksum("2.5.0", "capiscio-linux-amd64") + assert result is None + + @patch("httpx.get") + @patch("httpx.stream") + @patch("os.chmod") + @patch("os.stat") + def test_download_binary_checksum_match(self, mock_stat, mock_chmod, mock_stream, mock_get): + """Test successful download with matching checksum.""" + pm = ProcessManager() + + with patch("capiscio_sdk._rpc.process.platform.system", return_value="Linux"): + with patch("capiscio_sdk._rpc.process.platform.machine", return_value="x86_64"): + with patch.object(ProcessManager, "_get_cached_binary_path") as mock_cached: + mock_path = MagicMock(spec=Path) + mock_path.exists.return_value = False + mock_path.parent = MagicMock() + mock_path.name = "capiscio-linux-amd64" + mock_cached.return_value = mock_path + + # Mock stream download + mock_response = MagicMock() + mock_response.iter_bytes.return_value = [b"binary_data"] + mock_stream.return_value.__enter__.return_value = mock_response + + # Mock checksum fetch (returns a hash) + mock_get_resp = MagicMock() + mock_get_resp.text = "fakehash123 capiscio-linux-amd64\n" + mock_get_resp.raise_for_status = MagicMock() + mock_get.return_value = mock_get_resp + + # Mock verify_checksum to return True + with patch.object(ProcessManager, "_verify_checksum", return_value=True): + m_open = mock_open() + with patch("builtins.open", m_open): + result = pm._download_binary() + + assert result == mock_path + # chmod should be called (checksum passed) + mock_chmod.assert_called_once() + + @patch("httpx.get") + @patch("httpx.stream") + def test_download_binary_checksum_mismatch_deletes_file(self, mock_stream, mock_get): + """Test that checksum mismatch deletes the file and raises.""" + pm = ProcessManager() + + with patch("capiscio_sdk._rpc.process.platform.system", return_value="Linux"): + with patch("capiscio_sdk._rpc.process.platform.machine", return_value="x86_64"): + with patch.object(ProcessManager, "_get_cached_binary_path") as mock_cached: + mock_path = MagicMock(spec=Path) + mock_path.exists.return_value = False + mock_path.parent = MagicMock() + mock_path.name = "capiscio-linux-amd64" + mock_cached.return_value = mock_path + + mock_response = MagicMock() + mock_response.iter_bytes.return_value = [b"bad_data"] + mock_stream.return_value.__enter__.return_value = mock_response + + mock_get_resp = MagicMock() + mock_get_resp.text = "expected_hash capiscio-linux-amd64\n" + mock_get_resp.raise_for_status = MagicMock() + mock_get.return_value = mock_get_resp + + with patch.object(ProcessManager, "_verify_checksum", return_value=False): + m_open = mock_open() + with patch("builtins.open", m_open): + with pytest.raises(RuntimeError, match="integrity check failed"): + pm._download_binary() + + # File should have been deleted + mock_path.unlink.assert_called() + + @patch("httpx.get") + @patch("httpx.stream") + def test_download_binary_require_checksum_no_checksums_available(self, mock_stream, mock_get): + """Test CAPISCIO_REQUIRE_CHECKSUM fails when checksums.txt unavailable.""" + import httpx as httpx_mod + pm = ProcessManager() + + with patch("capiscio_sdk._rpc.process.platform.system", return_value="Linux"): + with patch("capiscio_sdk._rpc.process.platform.machine", return_value="x86_64"): + with patch.object(ProcessManager, "_get_cached_binary_path") as mock_cached: + mock_path = MagicMock(spec=Path) + mock_path.exists.return_value = False + mock_path.parent = MagicMock() + mock_path.name = "capiscio-linux-amd64" + mock_cached.return_value = mock_path + + mock_response = MagicMock() + mock_response.iter_bytes.return_value = [b"data"] + mock_stream.return_value.__enter__.return_value = mock_response + + # checksums.txt fetch fails + mock_get.side_effect = httpx_mod.HTTPError("404") + + with patch.dict(os.environ, {"CAPISCIO_REQUIRE_CHECKSUM": "true"}): + m_open = mock_open() + with patch("builtins.open", m_open): + with pytest.raises(RuntimeError, match="Checksum verification required"): + pm._download_binary() + + mock_path.unlink.assert_called() + + @patch("httpx.get") + @patch("httpx.stream") + @patch("os.chmod") + @patch("os.stat") + def test_download_binary_checksums_unavailable_without_require( + self, mock_stat, mock_chmod, mock_stream, mock_get + ): + """Test download proceeds with warning when checksums unavailable and not required.""" + import httpx as httpx_mod + pm = ProcessManager() + + with patch("capiscio_sdk._rpc.process.platform.system", return_value="Linux"): + with patch("capiscio_sdk._rpc.process.platform.machine", return_value="x86_64"): + with patch.object(ProcessManager, "_get_cached_binary_path") as mock_cached: + mock_path = MagicMock(spec=Path) + mock_path.exists.return_value = False + mock_path.parent = MagicMock() + mock_path.name = "capiscio-linux-amd64" + mock_cached.return_value = mock_path + + mock_response = MagicMock() + mock_response.iter_bytes.return_value = [b"data"] + mock_stream.return_value.__enter__.return_value = mock_response + + # checksums.txt not available + mock_get.side_effect = httpx_mod.HTTPError("404") + + with patch.dict(os.environ, {}, clear=False): + # Ensure CAPISCIO_REQUIRE_CHECKSUM is not set + os.environ.pop("CAPISCIO_REQUIRE_CHECKSUM", None) + m_open = mock_open() + with patch("builtins.open", m_open): + result = pm._download_binary() + + # Should succeed despite no checksum + assert result == mock_path + mock_chmod.assert_called_once() + + @patch("httpx.get") + @patch("httpx.stream") + @patch("os.chmod") + @patch("os.stat") + def test_download_binary_chmod_after_checksum(self, mock_stat, mock_chmod, mock_stream, mock_get): + """Test that chmod happens AFTER checksum verification, not before.""" + pm = ProcessManager() + call_order = [] + + with patch("capiscio_sdk._rpc.process.platform.system", return_value="Linux"): + with patch("capiscio_sdk._rpc.process.platform.machine", return_value="x86_64"): + with patch.object(ProcessManager, "_get_cached_binary_path") as mock_cached: + mock_path = MagicMock(spec=Path) + mock_path.exists.return_value = False + mock_path.parent = MagicMock() + mock_path.name = "capiscio-linux-amd64" + mock_cached.return_value = mock_path + + mock_response = MagicMock() + mock_response.iter_bytes.return_value = [b"data"] + mock_stream.return_value.__enter__.return_value = mock_response + + mock_get_resp = MagicMock() + mock_get_resp.text = "fakehash capiscio-linux-amd64\n" + mock_get_resp.raise_for_status = MagicMock() + mock_get.return_value = mock_get_resp + + def track_verify(*a, **kw): + call_order.append("verify") + return True + + def track_chmod(*a, **kw): + call_order.append("chmod") + + mock_chmod.side_effect = track_chmod + + with patch.object(ProcessManager, "_verify_checksum", side_effect=track_verify): + m_open = mock_open() + with patch("builtins.open", m_open): + pm._download_binary() + + assert call_order == ["verify", "chmod"], ( + f"Expected verify before chmod, got: {call_order}" + )