diff --git a/capiscio_sdk/_rpc/process.py b/capiscio_sdk/_rpc/process.py index abfeb99..50af7c3 100644 --- a/capiscio_sdk/_rpc/process.py +++ b/capiscio_sdk/_rpc/process.py @@ -1,6 +1,7 @@ """Process manager for the capiscio-core gRPC server.""" import atexit +import hashlib import logging import os import platform @@ -168,6 +169,7 @@ def _download_binary(self) -> Path: """Download the capiscio-core binary for the current platform. Downloads from GitHub releases to ~/.capiscio/bin//. + Verifies SHA-256 checksum against published checksums.txt. Retries up to 3 times with exponential backoff. Returns the path to the executable. """ @@ -177,9 +179,7 @@ def _download_binary(self) -> Path: if target_path.exists(): return target_path - ext = ".exe" if os_name == "windows" else "" - filename = f"capiscio-{os_name}-{arch_name}{ext}" - url = f"https://github.com/{GITHUB_REPO}/releases/download/v{CORE_VERSION}/{filename}" + url = f"https://github.com/{GITHUB_REPO}/releases/download/v{CORE_VERSION}/{target_path.name}" sys.stderr.write( f"capiscio-core v{CORE_VERSION} not found. " @@ -198,7 +198,32 @@ def _download_binary(self) -> Path: for chunk in resp.iter_bytes(chunk_size=8192): f.write(chunk) - # Make executable + # Verify checksum integrity BEFORE making executable + require_checksum = os.environ.get("CAPISCIO_REQUIRE_CHECKSUM", "").lower() in ("1", "true", "yes") + expected_hash = self._fetch_expected_checksum(CORE_VERSION, target_path.name) + if expected_hash is not None: + if not self._verify_checksum(target_path, expected_hash): + target_path.unlink() + raise RuntimeError( + f"Binary integrity check failed for {target_path.name}. " + "The downloaded file does not match the published checksum. " + "This may indicate a tampered or corrupted download." + ) + logger.info("Checksum verified for %s", target_path.name) + elif require_checksum: + target_path.unlink() + raise RuntimeError( + f"Checksum verification required (CAPISCIO_REQUIRE_CHECKSUM=true) " + f"but checksums.txt is not available for v{CORE_VERSION}. " + "Cannot verify binary integrity." + ) + else: + logger.warning( + "Could not verify binary integrity (checksums.txt not available). " + "Set CAPISCIO_REQUIRE_CHECKSUM=true to enforce verification." + ) + + # Make executable only after checksum passes st = os.stat(target_path) os.chmod(target_path, st.st_mode | stat.S_IEXEC) @@ -225,6 +250,36 @@ def _download_binary(self) -> Path: ) from e # unreachable, but keeps type checker happy raise RuntimeError("Download failed") + + @staticmethod + def _fetch_expected_checksum(version: str, filename: str) -> Optional[str]: + """Fetch the expected SHA-256 checksum from the release checksums.txt.""" + url = f"https://github.com/{GITHUB_REPO}/releases/download/v{version}/checksums.txt" + try: + resp = httpx.get(url, follow_redirects=True, timeout=30.0) + resp.raise_for_status() + for line in resp.text.strip().splitlines(): + parts = line.split() + if len(parts) == 2 and parts[1] == filename: + return parts[0] + logger.warning("Binary %s not found in checksums.txt", filename) + return None + except httpx.HTTPError as e: + logger.warning("Could not fetch checksums.txt: %s", e) + return None + + @staticmethod + def _verify_checksum(file_path: Path, expected_hash: str) -> bool: + """Verify SHA-256 checksum of a downloaded file.""" + sha256 = hashlib.sha256() + with open(file_path, "rb") as f: + for chunk in iter(lambda: f.read(8192), b""): + sha256.update(chunk) + actual = sha256.hexdigest() + if actual != expected_hash: + logger.error("Checksum mismatch: expected %s, got %s", expected_hash, actual) + return False + return True def ensure_running( self, diff --git a/tests/unit/test_process.py b/tests/unit/test_process.py index 005086b..2eddc57 100644 --- a/tests/unit/test_process.py +++ b/tests/unit/test_process.py @@ -304,3 +304,226 @@ def test_address_property_returns_unix_by_default(self): pm._socket_path = None from capiscio_sdk._rpc.process import DEFAULT_SOCKET_PATH assert pm.address == f"unix://{DEFAULT_SOCKET_PATH}" + + +class TestChecksumVerification: + """Tests for binary checksum verification paths.""" + + @patch("httpx.get") + def test_fetch_expected_checksum_success(self, mock_get): + """Test _fetch_expected_checksum returns hash when file is found.""" + mock_resp = MagicMock() + mock_resp.text = ( + "abc123def456 capiscio-linux-amd64\n" + "789xyz000111 capiscio-darwin-arm64\n" + ) + mock_resp.raise_for_status = MagicMock() + mock_get.return_value = mock_resp + + result = ProcessManager._fetch_expected_checksum("2.5.0", "capiscio-linux-amd64") + assert result == "abc123def456" + + @patch("httpx.get") + def test_fetch_expected_checksum_file_not_in_list(self, mock_get): + """Test _fetch_expected_checksum returns None when filename not in checksums.""" + mock_resp = MagicMock() + mock_resp.text = "abc123 capiscio-linux-amd64\n" + mock_resp.raise_for_status = MagicMock() + mock_get.return_value = mock_resp + + result = ProcessManager._fetch_expected_checksum("2.5.0", "capiscio-darwin-arm64") + assert result is None + + @patch("httpx.get") + def test_fetch_expected_checksum_http_error(self, mock_get): + """Test _fetch_expected_checksum returns None on HTTP error.""" + import httpx as httpx_mod + mock_get.side_effect = httpx_mod.HTTPError("connection failed") + + result = ProcessManager._fetch_expected_checksum("2.5.0", "capiscio-linux-amd64") + assert result is None + + @patch("httpx.get") + @patch("httpx.stream") + @patch("os.chmod") + @patch("os.stat") + def test_download_binary_checksum_match(self, mock_stat, mock_chmod, mock_stream, mock_get): + """Test successful download with matching checksum.""" + pm = ProcessManager() + + with patch("capiscio_sdk._rpc.process.platform.system", return_value="Linux"): + with patch("capiscio_sdk._rpc.process.platform.machine", return_value="x86_64"): + with patch.object(ProcessManager, "_get_cached_binary_path") as mock_cached: + mock_path = MagicMock(spec=Path) + mock_path.exists.return_value = False + mock_path.parent = MagicMock() + mock_path.name = "capiscio-linux-amd64" + mock_cached.return_value = mock_path + + # Mock stream download + mock_response = MagicMock() + mock_response.iter_bytes.return_value = [b"binary_data"] + mock_stream.return_value.__enter__.return_value = mock_response + + # Mock checksum fetch (returns a hash) + mock_get_resp = MagicMock() + mock_get_resp.text = "fakehash123 capiscio-linux-amd64\n" + mock_get_resp.raise_for_status = MagicMock() + mock_get.return_value = mock_get_resp + + # Mock verify_checksum to return True + with patch.object(ProcessManager, "_verify_checksum", return_value=True): + m_open = mock_open() + with patch("builtins.open", m_open): + result = pm._download_binary() + + assert result == mock_path + # chmod should be called (checksum passed) + mock_chmod.assert_called_once() + + @patch("httpx.get") + @patch("httpx.stream") + def test_download_binary_checksum_mismatch_deletes_file(self, mock_stream, mock_get): + """Test that checksum mismatch deletes the file and raises.""" + pm = ProcessManager() + + with patch("capiscio_sdk._rpc.process.platform.system", return_value="Linux"): + with patch("capiscio_sdk._rpc.process.platform.machine", return_value="x86_64"): + with patch.object(ProcessManager, "_get_cached_binary_path") as mock_cached: + mock_path = MagicMock(spec=Path) + mock_path.exists.return_value = False + mock_path.parent = MagicMock() + mock_path.name = "capiscio-linux-amd64" + mock_cached.return_value = mock_path + + mock_response = MagicMock() + mock_response.iter_bytes.return_value = [b"bad_data"] + mock_stream.return_value.__enter__.return_value = mock_response + + mock_get_resp = MagicMock() + mock_get_resp.text = "expected_hash capiscio-linux-amd64\n" + mock_get_resp.raise_for_status = MagicMock() + mock_get.return_value = mock_get_resp + + with patch.object(ProcessManager, "_verify_checksum", return_value=False): + m_open = mock_open() + with patch("builtins.open", m_open): + with pytest.raises(RuntimeError, match="integrity check failed"): + pm._download_binary() + + # File should have been deleted + mock_path.unlink.assert_called() + + @patch("httpx.get") + @patch("httpx.stream") + def test_download_binary_require_checksum_no_checksums_available(self, mock_stream, mock_get): + """Test CAPISCIO_REQUIRE_CHECKSUM fails when checksums.txt unavailable.""" + import httpx as httpx_mod + pm = ProcessManager() + + with patch("capiscio_sdk._rpc.process.platform.system", return_value="Linux"): + with patch("capiscio_sdk._rpc.process.platform.machine", return_value="x86_64"): + with patch.object(ProcessManager, "_get_cached_binary_path") as mock_cached: + mock_path = MagicMock(spec=Path) + mock_path.exists.return_value = False + mock_path.parent = MagicMock() + mock_path.name = "capiscio-linux-amd64" + mock_cached.return_value = mock_path + + mock_response = MagicMock() + mock_response.iter_bytes.return_value = [b"data"] + mock_stream.return_value.__enter__.return_value = mock_response + + # checksums.txt fetch fails + mock_get.side_effect = httpx_mod.HTTPError("404") + + with patch.dict(os.environ, {"CAPISCIO_REQUIRE_CHECKSUM": "true"}): + m_open = mock_open() + with patch("builtins.open", m_open): + with pytest.raises(RuntimeError, match="Checksum verification required"): + pm._download_binary() + + mock_path.unlink.assert_called() + + @patch("httpx.get") + @patch("httpx.stream") + @patch("os.chmod") + @patch("os.stat") + def test_download_binary_checksums_unavailable_without_require( + self, mock_stat, mock_chmod, mock_stream, mock_get + ): + """Test download proceeds with warning when checksums unavailable and not required.""" + import httpx as httpx_mod + pm = ProcessManager() + + with patch("capiscio_sdk._rpc.process.platform.system", return_value="Linux"): + with patch("capiscio_sdk._rpc.process.platform.machine", return_value="x86_64"): + with patch.object(ProcessManager, "_get_cached_binary_path") as mock_cached: + mock_path = MagicMock(spec=Path) + mock_path.exists.return_value = False + mock_path.parent = MagicMock() + mock_path.name = "capiscio-linux-amd64" + mock_cached.return_value = mock_path + + mock_response = MagicMock() + mock_response.iter_bytes.return_value = [b"data"] + mock_stream.return_value.__enter__.return_value = mock_response + + # checksums.txt not available + mock_get.side_effect = httpx_mod.HTTPError("404") + + with patch.dict(os.environ, {}, clear=False): + # Ensure CAPISCIO_REQUIRE_CHECKSUM is not set + os.environ.pop("CAPISCIO_REQUIRE_CHECKSUM", None) + m_open = mock_open() + with patch("builtins.open", m_open): + result = pm._download_binary() + + # Should succeed despite no checksum + assert result == mock_path + mock_chmod.assert_called_once() + + @patch("httpx.get") + @patch("httpx.stream") + @patch("os.chmod") + @patch("os.stat") + def test_download_binary_chmod_after_checksum(self, mock_stat, mock_chmod, mock_stream, mock_get): + """Test that chmod happens AFTER checksum verification, not before.""" + pm = ProcessManager() + call_order = [] + + with patch("capiscio_sdk._rpc.process.platform.system", return_value="Linux"): + with patch("capiscio_sdk._rpc.process.platform.machine", return_value="x86_64"): + with patch.object(ProcessManager, "_get_cached_binary_path") as mock_cached: + mock_path = MagicMock(spec=Path) + mock_path.exists.return_value = False + mock_path.parent = MagicMock() + mock_path.name = "capiscio-linux-amd64" + mock_cached.return_value = mock_path + + mock_response = MagicMock() + mock_response.iter_bytes.return_value = [b"data"] + mock_stream.return_value.__enter__.return_value = mock_response + + mock_get_resp = MagicMock() + mock_get_resp.text = "fakehash capiscio-linux-amd64\n" + mock_get_resp.raise_for_status = MagicMock() + mock_get.return_value = mock_get_resp + + def track_verify(*a, **kw): + call_order.append("verify") + return True + + def track_chmod(*a, **kw): + call_order.append("chmod") + + mock_chmod.side_effect = track_chmod + + with patch.object(ProcessManager, "_verify_checksum", side_effect=track_verify): + m_open = mock_open() + with patch("builtins.open", m_open): + pm._download_binary() + + assert call_order == ["verify", "chmod"], ( + f"Expected verify before chmod, got: {call_order}" + )