From 66e3d339d81d0e1b7bf816e03ff175901598ad8f Mon Sep 17 00:00:00 2001 From: Beon de Nood Date: Fri, 27 Mar 2026 23:53:06 -0400 Subject: [PATCH 1/2] fix: add CAPISCIO_REQUIRE_CHECKSUM fail-closed mode When CAPISCIO_REQUIRE_CHECKSUM=true, binary download fails if checksums.txt is unavailable or the asset is missing from it. Default behavior remains fail-open (warn and continue) for backward compatibility. --- src/capiscio/manager.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/capiscio/manager.py b/src/capiscio/manager.py index 1be9538..0cd5771 100644 --- a/src/capiscio/manager.py +++ b/src/capiscio/manager.py @@ -140,6 +140,7 @@ def download_binary(version: str) -> Path: os.chmod(target_path, st.st_mode | stat.S_IEXEC) # Verify checksum integrity + require_checksum = os.environ.get("CAPISCIO_REQUIRE_CHECKSUM", "").lower() in ("1", "true", "yes") expected_hash = _fetch_expected_checksum(version, filename) if expected_hash is not None: if not _verify_checksum(target_path, expected_hash): @@ -150,10 +151,17 @@ def download_binary(version: str) -> Path: "This may indicate a tampered or corrupted download." ) logger.info(f"Checksum verified for {filename}") + elif require_checksum: + target_path.unlink() + raise RuntimeError( + f"Checksum verification required (CAPISCIO_REQUIRE_CHECKSUM=true) " + f"but checksums.txt is not available for v{version}. " + "Cannot verify binary integrity." + ) else: logger.warning( "Could not verify binary integrity (checksums.txt not available). " - "Consider upgrading capiscio-core to a version that publishes checksums." + "Set CAPISCIO_REQUIRE_CHECKSUM=true to enforce verification." ) console.print(f"[green]Successfully installed CapiscIO Core v{version}[/green]") From 829d86ba578451066d30fe504cdb720233c075ac Mon Sep 17 00:00:00 2001 From: Beon de Nood Date: Sat, 28 Mar 2026 00:08:27 -0400 Subject: [PATCH 2/2] fix: address PR #16 review comments 1. Close requests.get response with context manager in _fetch_expected_checksum 2. Return (checksum, status) tuple to distinguish fetch_failed vs entry_missing 3. Tailor error messages for each failure mode 4. Document CAPISCIO_REQUIRE_CHECKSUM env var behavior in code comment 5. Move chmod +x after checksum verification (verify before trust) 6. Add unit tests for all checksum paths (match, mismatch, require+fetch_failed, require+entry_missing) --- src/capiscio/manager.py | 81 +++++++++++------- tests/unit/test_manager.py | 163 ++++++++++++++++++++++++++++++++++++- 2 files changed, 215 insertions(+), 29 deletions(-) diff --git a/src/capiscio/manager.py b/src/capiscio/manager.py index 0cd5771..af32e9b 100644 --- a/src/capiscio/manager.py +++ b/src/capiscio/manager.py @@ -69,21 +69,28 @@ def get_binary_path(version: str) -> Path: # For now, let's put it in a versioned folder return get_cache_dir() / version / filename -def _fetch_expected_checksum(version: str, filename: str) -> Optional[str]: - """Fetch the expected SHA-256 checksum from the release checksums.txt.""" +def _fetch_expected_checksum(version: str, filename: str) -> Tuple[Optional[str], str]: + """Fetch the expected SHA-256 checksum from the release checksums.txt. + + Returns: + A tuple of (checksum, status) where status is one of: + - "ok" — checksum found and returned + - "fetch_failed" — could not download checksums.txt + - "entry_missing" — checksums.txt downloaded but no entry for filename + """ url = f"https://github.com/{GITHUB_REPO}/releases/download/v{version}/checksums.txt" try: - resp = requests.get(url, timeout=30) - resp.raise_for_status() - for line in resp.text.strip().splitlines(): - parts = line.split() - if len(parts) == 2 and parts[1] == filename: - return parts[0] - logger.warning(f"Binary {filename} not found in checksums.txt") - return None + with requests.get(url, timeout=30) as resp: + resp.raise_for_status() + for line in resp.text.strip().splitlines(): + parts = line.split() + if len(parts) == 2 and parts[1] == filename: + return parts[0], "ok" + logger.warning(f"Binary {filename} not found in checksums.txt") + return None, "entry_missing" except requests.exceptions.RequestException as e: logger.warning(f"Could not fetch checksums.txt: {e}") - return None + return None, "fetch_failed" def _verify_checksum(file_path: Path, expected_hash: str) -> bool: """Verify SHA-256 checksum of a downloaded file.""" @@ -135,34 +142,52 @@ def download_binary(version: str) -> Path: f.write(chunk) progress.update(task, advance=len(chunk)) - # Make executable - st = os.stat(target_path) - os.chmod(target_path, st.st_mode | stat.S_IEXEC) - - # Verify checksum integrity + # Verify checksum BEFORE making executable (security: validate before trust) + # + # CAPISCIO_REQUIRE_CHECKSUM env var controls strict verification: + # When set to "1", "true", or "yes", the download will fail if + # checksums.txt cannot be fetched OR the binary entry is missing. + # When unset/false, a warning is logged but the binary is still used. require_checksum = os.environ.get("CAPISCIO_REQUIRE_CHECKSUM", "").lower() in ("1", "true", "yes") - expected_hash = _fetch_expected_checksum(version, filename) + expected_hash, checksum_status = _fetch_expected_checksum(version, target_path.name) if expected_hash is not None: if not _verify_checksum(target_path, expected_hash): target_path.unlink() raise RuntimeError( - f"Binary integrity check failed for {filename}. " + f"Binary integrity check failed for {target_path.name}. " "The downloaded file does not match the published checksum. " "This may indicate a tampered or corrupted download." ) - logger.info(f"Checksum verified for {filename}") + logger.info(f"Checksum verified for {target_path.name}") elif require_checksum: target_path.unlink() - raise RuntimeError( - f"Checksum verification required (CAPISCIO_REQUIRE_CHECKSUM=true) " - f"but checksums.txt is not available for v{version}. " - "Cannot verify binary integrity." - ) + if checksum_status == "fetch_failed": + raise RuntimeError( + f"Checksum verification required (CAPISCIO_REQUIRE_CHECKSUM=true) " + f"but checksums.txt could not be fetched for v{version}. " + "Cannot verify binary integrity." + ) + else: + raise RuntimeError( + f"Checksum verification required (CAPISCIO_REQUIRE_CHECKSUM=true) " + f"but no checksum entry found for {target_path.name} in v{version} checksums.txt. " + "Cannot verify binary integrity." + ) else: - logger.warning( - "Could not verify binary integrity (checksums.txt not available). " - "Set CAPISCIO_REQUIRE_CHECKSUM=true to enforce verification." - ) + if checksum_status == "fetch_failed": + logger.warning( + "Could not verify binary integrity (checksums.txt not available). " + "Set CAPISCIO_REQUIRE_CHECKSUM=true to enforce verification." + ) + else: + logger.warning( + f"Could not verify binary integrity (no entry for {target_path.name} in checksums.txt). " + "Set CAPISCIO_REQUIRE_CHECKSUM=true to enforce verification." + ) + + # Make executable only after checksum verification passes + st = os.stat(target_path) + os.chmod(target_path, st.st_mode | stat.S_IEXEC) console.print(f"[green]Successfully installed CapiscIO Core v{version}[/green]") return target_path diff --git a/tests/unit/test_manager.py b/tests/unit/test_manager.py index b7d87cf..b61d8f9 100644 --- a/tests/unit/test_manager.py +++ b/tests/unit/test_manager.py @@ -12,6 +12,8 @@ get_binary_path, download_binary, run_core, + _fetch_expected_checksum, + _verify_checksum, CORE_VERSION, GITHUB_REPO, ) @@ -145,15 +147,17 @@ def test_returns_existing_binary(self, mock_get_path): result = download_binary("1.0.0") assert result == mock_path + @patch('capiscio.manager._fetch_expected_checksum', return_value=(None, "fetch_failed")) @patch('capiscio.manager.get_platform_info', return_value=('linux', 'amd64')) @patch('capiscio.manager.get_binary_path') @patch('capiscio.manager.requests.get') @patch('capiscio.manager.console') - def test_downloads_binary_on_missing(self, mock_console, mock_requests, mock_get_path, mock_platform): + def test_downloads_binary_on_missing(self, mock_console, mock_requests, mock_get_path, mock_platform, mock_fetch_checksum): """Test that binary is downloaded when missing.""" mock_path = MagicMock(spec=Path) mock_path.exists.return_value = False mock_path.parent = MagicMock() + mock_path.name = "capiscio-linux-amd64" mock_get_path.return_value = mock_path # Mock the response @@ -194,6 +198,163 @@ def test_cleans_up_on_download_error(self, mock_console, mock_requests, mock_get mock_path.unlink.assert_called_once() +class TestFetchExpectedChecksum: + """Tests for _fetch_expected_checksum function.""" + + @patch('capiscio.manager.requests.get') + def test_returns_checksum_on_match(self, mock_get): + """Test successful checksum lookup.""" + mock_resp = MagicMock() + mock_resp.text = "abc123 capiscio-linux-amd64\ndef456 capiscio-darwin-arm64\n" + mock_resp.raise_for_status = MagicMock() + mock_resp.__enter__ = MagicMock(return_value=mock_resp) + mock_resp.__exit__ = MagicMock(return_value=False) + mock_get.return_value = mock_resp + + checksum, status = _fetch_expected_checksum("1.0.0", "capiscio-linux-amd64") + assert checksum == "abc123" + assert status == "ok" + + @patch('capiscio.manager.requests.get') + def test_returns_entry_missing_when_not_found(self, mock_get): + """Test that missing entry returns entry_missing status.""" + mock_resp = MagicMock() + mock_resp.text = "abc123 capiscio-linux-amd64\n" + mock_resp.raise_for_status = MagicMock() + mock_resp.__enter__ = MagicMock(return_value=mock_resp) + mock_resp.__exit__ = MagicMock(return_value=False) + mock_get.return_value = mock_resp + + checksum, status = _fetch_expected_checksum("1.0.0", "capiscio-darwin-arm64") + assert checksum is None + assert status == "entry_missing" + + @patch('capiscio.manager.requests.get') + def test_returns_fetch_failed_on_network_error(self, mock_get): + """Test that network errors return fetch_failed status.""" + import requests.exceptions + mock_get.side_effect = requests.exceptions.ConnectionError("timeout") + + checksum, status = _fetch_expected_checksum("1.0.0", "capiscio-linux-amd64") + assert checksum is None + assert status == "fetch_failed" + + +class TestChecksumVerificationIntegration: + """Tests for checksum verification during download.""" + + def _make_download_mocks(self): + """Helper: set up common mocks for download_binary tests.""" + mock_path = MagicMock(spec=Path) + mock_path.exists.return_value = False + mock_path.parent = MagicMock() + mock_path.name = "capiscio-linux-amd64" + return mock_path + + @patch('capiscio.manager._fetch_expected_checksum', return_value=("abc123", "ok")) + @patch('capiscio.manager._verify_checksum', return_value=True) + @patch('capiscio.manager.get_platform_info', return_value=('linux', 'amd64')) + @patch('capiscio.manager.get_binary_path') + @patch('capiscio.manager.requests.get') + @patch('capiscio.manager.console') + def test_checksum_verified_match(self, mock_console, mock_requests, mock_get_path, + mock_platform, mock_verify, mock_fetch): + """Test that download succeeds when checksum matches.""" + mock_path = self._make_download_mocks() + mock_get_path.return_value = mock_path + + mock_response = MagicMock() + mock_response.headers = {'content-length': '1024'} + mock_response.iter_content.return_value = [b'x' * 1024] + mock_response.__enter__ = MagicMock(return_value=mock_response) + mock_response.__exit__ = MagicMock(return_value=False) + mock_requests.return_value = mock_response + + with patch('builtins.open', mock_open()): + with patch.object(os, 'stat') as mock_stat: + with patch.object(os, 'chmod'): + mock_stat.return_value = MagicMock(st_mode=0o644) + result = download_binary("1.0.0") + + assert result == mock_path + mock_verify.assert_called_once_with(mock_path, "abc123") + + @patch('capiscio.manager._fetch_expected_checksum', return_value=("abc123", "ok")) + @patch('capiscio.manager._verify_checksum', return_value=False) + @patch('capiscio.manager.get_platform_info', return_value=('linux', 'amd64')) + @patch('capiscio.manager.get_binary_path') + @patch('capiscio.manager.requests.get') + @patch('capiscio.manager.console') + def test_checksum_mismatch_cleans_up(self, mock_console, mock_requests, mock_get_path, + mock_platform, mock_verify, mock_fetch): + """Test that a checksum mismatch deletes the binary and raises.""" + mock_path = self._make_download_mocks() + mock_get_path.return_value = mock_path + + mock_response = MagicMock() + mock_response.headers = {'content-length': '1024'} + mock_response.iter_content.return_value = [b'x' * 1024] + mock_response.__enter__ = MagicMock(return_value=mock_response) + mock_response.__exit__ = MagicMock(return_value=False) + mock_requests.return_value = mock_response + + with patch('builtins.open', mock_open()): + with pytest.raises(RuntimeError, match="integrity check failed"): + download_binary("1.0.0") + + mock_path.unlink.assert_called() + + @patch.dict(os.environ, {"CAPISCIO_REQUIRE_CHECKSUM": "true"}) + @patch('capiscio.manager._fetch_expected_checksum', return_value=(None, "fetch_failed")) + @patch('capiscio.manager.get_platform_info', return_value=('linux', 'amd64')) + @patch('capiscio.manager.get_binary_path') + @patch('capiscio.manager.requests.get') + @patch('capiscio.manager.console') + def test_require_checksum_fails_on_fetch_failed(self, mock_console, mock_requests, + mock_get_path, mock_platform, mock_fetch): + """Test fail-closed when CAPISCIO_REQUIRE_CHECKSUM=true and fetch fails.""" + mock_path = self._make_download_mocks() + mock_get_path.return_value = mock_path + + mock_response = MagicMock() + mock_response.headers = {'content-length': '1024'} + mock_response.iter_content.return_value = [b'x' * 1024] + mock_response.__enter__ = MagicMock(return_value=mock_response) + mock_response.__exit__ = MagicMock(return_value=False) + mock_requests.return_value = mock_response + + with patch('builtins.open', mock_open()): + with pytest.raises(RuntimeError, match="could not be fetched"): + download_binary("1.0.0") + + mock_path.unlink.assert_called() + + @patch.dict(os.environ, {"CAPISCIO_REQUIRE_CHECKSUM": "true"}) + @patch('capiscio.manager._fetch_expected_checksum', return_value=(None, "entry_missing")) + @patch('capiscio.manager.get_platform_info', return_value=('linux', 'amd64')) + @patch('capiscio.manager.get_binary_path') + @patch('capiscio.manager.requests.get') + @patch('capiscio.manager.console') + def test_require_checksum_fails_on_entry_missing(self, mock_console, mock_requests, + mock_get_path, mock_platform, mock_fetch): + """Test fail-closed when CAPISCIO_REQUIRE_CHECKSUM=true and entry is missing.""" + mock_path = self._make_download_mocks() + mock_get_path.return_value = mock_path + + mock_response = MagicMock() + mock_response.headers = {'content-length': '1024'} + mock_response.iter_content.return_value = [b'x' * 1024] + mock_response.__enter__ = MagicMock(return_value=mock_response) + mock_response.__exit__ = MagicMock(return_value=False) + mock_requests.return_value = mock_response + + with patch('builtins.open', mock_open()): + with pytest.raises(RuntimeError, match="no checksum entry found"): + download_binary("1.0.0") + + mock_path.unlink.assert_called() + + class TestRunCore: """Tests for run_core function."""