Skip to content

Commit a2d9781

Browse files
authored
fix: add SHA-256 checksum verification with fail-closed mode (B5 hardening)
- Verify binary SHA-256 checksum against published checksums.txt - Checksum verification before chmod (integrity-first) - CAPISCIO_REQUIRE_CHECKSUM=true fail-closed mode - 8 unit tests covering match, mismatch, fetch failure, fail-closed - Use target_path.name to prevent cache/key drift Part of design partner re-evaluation B5 hardening.
1 parent 803e243 commit a2d9781

File tree

2 files changed

+282
-4
lines changed

2 files changed

+282
-4
lines changed

capiscio_sdk/_rpc/process.py

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Process manager for the capiscio-core gRPC server."""
22

33
import atexit
4+
import hashlib
45
import logging
56
import os
67
import platform
@@ -168,6 +169,7 @@ def _download_binary(self) -> Path:
168169
"""Download the capiscio-core binary for the current platform.
169170
170171
Downloads from GitHub releases to ~/.capiscio/bin/<version>/.
172+
Verifies SHA-256 checksum against published checksums.txt.
171173
Retries up to 3 times with exponential backoff.
172174
Returns the path to the executable.
173175
"""
@@ -177,9 +179,7 @@ def _download_binary(self) -> Path:
177179
if target_path.exists():
178180
return target_path
179181

180-
ext = ".exe" if os_name == "windows" else ""
181-
filename = f"capiscio-{os_name}-{arch_name}{ext}"
182-
url = f"https://github.com/{GITHUB_REPO}/releases/download/v{CORE_VERSION}/{filename}"
182+
url = f"https://github.com/{GITHUB_REPO}/releases/download/v{CORE_VERSION}/{target_path.name}"
183183

184184
sys.stderr.write(
185185
f"capiscio-core v{CORE_VERSION} not found. "
@@ -198,7 +198,32 @@ def _download_binary(self) -> Path:
198198
for chunk in resp.iter_bytes(chunk_size=8192):
199199
f.write(chunk)
200200

201-
# Make executable
201+
# Verify checksum integrity BEFORE making executable
202+
require_checksum = os.environ.get("CAPISCIO_REQUIRE_CHECKSUM", "").lower() in ("1", "true", "yes")
203+
expected_hash = self._fetch_expected_checksum(CORE_VERSION, target_path.name)
204+
if expected_hash is not None:
205+
if not self._verify_checksum(target_path, expected_hash):
206+
target_path.unlink()
207+
raise RuntimeError(
208+
f"Binary integrity check failed for {target_path.name}. "
209+
"The downloaded file does not match the published checksum. "
210+
"This may indicate a tampered or corrupted download."
211+
)
212+
logger.info("Checksum verified for %s", target_path.name)
213+
elif require_checksum:
214+
target_path.unlink()
215+
raise RuntimeError(
216+
f"Checksum verification required (CAPISCIO_REQUIRE_CHECKSUM=true) "
217+
f"but checksums.txt is not available for v{CORE_VERSION}. "
218+
"Cannot verify binary integrity."
219+
)
220+
else:
221+
logger.warning(
222+
"Could not verify binary integrity (checksums.txt not available). "
223+
"Set CAPISCIO_REQUIRE_CHECKSUM=true to enforce verification."
224+
)
225+
226+
# Make executable only after checksum passes
202227
st = os.stat(target_path)
203228
os.chmod(target_path, st.st_mode | stat.S_IEXEC)
204229

@@ -225,6 +250,36 @@ def _download_binary(self) -> Path:
225250
) from e
226251
# unreachable, but keeps type checker happy
227252
raise RuntimeError("Download failed")
253+
254+
@staticmethod
255+
def _fetch_expected_checksum(version: str, filename: str) -> Optional[str]:
256+
"""Fetch the expected SHA-256 checksum from the release checksums.txt."""
257+
url = f"https://github.com/{GITHUB_REPO}/releases/download/v{version}/checksums.txt"
258+
try:
259+
resp = httpx.get(url, follow_redirects=True, timeout=30.0)
260+
resp.raise_for_status()
261+
for line in resp.text.strip().splitlines():
262+
parts = line.split()
263+
if len(parts) == 2 and parts[1] == filename:
264+
return parts[0]
265+
logger.warning("Binary %s not found in checksums.txt", filename)
266+
return None
267+
except httpx.HTTPError as e:
268+
logger.warning("Could not fetch checksums.txt: %s", e)
269+
return None
270+
271+
@staticmethod
272+
def _verify_checksum(file_path: Path, expected_hash: str) -> bool:
273+
"""Verify SHA-256 checksum of a downloaded file."""
274+
sha256 = hashlib.sha256()
275+
with open(file_path, "rb") as f:
276+
for chunk in iter(lambda: f.read(8192), b""):
277+
sha256.update(chunk)
278+
actual = sha256.hexdigest()
279+
if actual != expected_hash:
280+
logger.error("Checksum mismatch: expected %s, got %s", expected_hash, actual)
281+
return False
282+
return True
228283

229284
def ensure_running(
230285
self,

tests/unit/test_process.py

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,3 +304,226 @@ def test_address_property_returns_unix_by_default(self):
304304
pm._socket_path = None
305305
from capiscio_sdk._rpc.process import DEFAULT_SOCKET_PATH
306306
assert pm.address == f"unix://{DEFAULT_SOCKET_PATH}"
307+
308+
309+
class TestChecksumVerification:
310+
"""Tests for binary checksum verification paths."""
311+
312+
@patch("httpx.get")
313+
def test_fetch_expected_checksum_success(self, mock_get):
314+
"""Test _fetch_expected_checksum returns hash when file is found."""
315+
mock_resp = MagicMock()
316+
mock_resp.text = (
317+
"abc123def456 capiscio-linux-amd64\n"
318+
"789xyz000111 capiscio-darwin-arm64\n"
319+
)
320+
mock_resp.raise_for_status = MagicMock()
321+
mock_get.return_value = mock_resp
322+
323+
result = ProcessManager._fetch_expected_checksum("2.5.0", "capiscio-linux-amd64")
324+
assert result == "abc123def456"
325+
326+
@patch("httpx.get")
327+
def test_fetch_expected_checksum_file_not_in_list(self, mock_get):
328+
"""Test _fetch_expected_checksum returns None when filename not in checksums."""
329+
mock_resp = MagicMock()
330+
mock_resp.text = "abc123 capiscio-linux-amd64\n"
331+
mock_resp.raise_for_status = MagicMock()
332+
mock_get.return_value = mock_resp
333+
334+
result = ProcessManager._fetch_expected_checksum("2.5.0", "capiscio-darwin-arm64")
335+
assert result is None
336+
337+
@patch("httpx.get")
338+
def test_fetch_expected_checksum_http_error(self, mock_get):
339+
"""Test _fetch_expected_checksum returns None on HTTP error."""
340+
import httpx as httpx_mod
341+
mock_get.side_effect = httpx_mod.HTTPError("connection failed")
342+
343+
result = ProcessManager._fetch_expected_checksum("2.5.0", "capiscio-linux-amd64")
344+
assert result is None
345+
346+
@patch("httpx.get")
347+
@patch("httpx.stream")
348+
@patch("os.chmod")
349+
@patch("os.stat")
350+
def test_download_binary_checksum_match(self, mock_stat, mock_chmod, mock_stream, mock_get):
351+
"""Test successful download with matching checksum."""
352+
pm = ProcessManager()
353+
354+
with patch("capiscio_sdk._rpc.process.platform.system", return_value="Linux"):
355+
with patch("capiscio_sdk._rpc.process.platform.machine", return_value="x86_64"):
356+
with patch.object(ProcessManager, "_get_cached_binary_path") as mock_cached:
357+
mock_path = MagicMock(spec=Path)
358+
mock_path.exists.return_value = False
359+
mock_path.parent = MagicMock()
360+
mock_path.name = "capiscio-linux-amd64"
361+
mock_cached.return_value = mock_path
362+
363+
# Mock stream download
364+
mock_response = MagicMock()
365+
mock_response.iter_bytes.return_value = [b"binary_data"]
366+
mock_stream.return_value.__enter__.return_value = mock_response
367+
368+
# Mock checksum fetch (returns a hash)
369+
mock_get_resp = MagicMock()
370+
mock_get_resp.text = "fakehash123 capiscio-linux-amd64\n"
371+
mock_get_resp.raise_for_status = MagicMock()
372+
mock_get.return_value = mock_get_resp
373+
374+
# Mock verify_checksum to return True
375+
with patch.object(ProcessManager, "_verify_checksum", return_value=True):
376+
m_open = mock_open()
377+
with patch("builtins.open", m_open):
378+
result = pm._download_binary()
379+
380+
assert result == mock_path
381+
# chmod should be called (checksum passed)
382+
mock_chmod.assert_called_once()
383+
384+
@patch("httpx.get")
385+
@patch("httpx.stream")
386+
def test_download_binary_checksum_mismatch_deletes_file(self, mock_stream, mock_get):
387+
"""Test that checksum mismatch deletes the file and raises."""
388+
pm = ProcessManager()
389+
390+
with patch("capiscio_sdk._rpc.process.platform.system", return_value="Linux"):
391+
with patch("capiscio_sdk._rpc.process.platform.machine", return_value="x86_64"):
392+
with patch.object(ProcessManager, "_get_cached_binary_path") as mock_cached:
393+
mock_path = MagicMock(spec=Path)
394+
mock_path.exists.return_value = False
395+
mock_path.parent = MagicMock()
396+
mock_path.name = "capiscio-linux-amd64"
397+
mock_cached.return_value = mock_path
398+
399+
mock_response = MagicMock()
400+
mock_response.iter_bytes.return_value = [b"bad_data"]
401+
mock_stream.return_value.__enter__.return_value = mock_response
402+
403+
mock_get_resp = MagicMock()
404+
mock_get_resp.text = "expected_hash capiscio-linux-amd64\n"
405+
mock_get_resp.raise_for_status = MagicMock()
406+
mock_get.return_value = mock_get_resp
407+
408+
with patch.object(ProcessManager, "_verify_checksum", return_value=False):
409+
m_open = mock_open()
410+
with patch("builtins.open", m_open):
411+
with pytest.raises(RuntimeError, match="integrity check failed"):
412+
pm._download_binary()
413+
414+
# File should have been deleted
415+
mock_path.unlink.assert_called()
416+
417+
@patch("httpx.get")
418+
@patch("httpx.stream")
419+
def test_download_binary_require_checksum_no_checksums_available(self, mock_stream, mock_get):
420+
"""Test CAPISCIO_REQUIRE_CHECKSUM fails when checksums.txt unavailable."""
421+
import httpx as httpx_mod
422+
pm = ProcessManager()
423+
424+
with patch("capiscio_sdk._rpc.process.platform.system", return_value="Linux"):
425+
with patch("capiscio_sdk._rpc.process.platform.machine", return_value="x86_64"):
426+
with patch.object(ProcessManager, "_get_cached_binary_path") as mock_cached:
427+
mock_path = MagicMock(spec=Path)
428+
mock_path.exists.return_value = False
429+
mock_path.parent = MagicMock()
430+
mock_path.name = "capiscio-linux-amd64"
431+
mock_cached.return_value = mock_path
432+
433+
mock_response = MagicMock()
434+
mock_response.iter_bytes.return_value = [b"data"]
435+
mock_stream.return_value.__enter__.return_value = mock_response
436+
437+
# checksums.txt fetch fails
438+
mock_get.side_effect = httpx_mod.HTTPError("404")
439+
440+
with patch.dict(os.environ, {"CAPISCIO_REQUIRE_CHECKSUM": "true"}):
441+
m_open = mock_open()
442+
with patch("builtins.open", m_open):
443+
with pytest.raises(RuntimeError, match="Checksum verification required"):
444+
pm._download_binary()
445+
446+
mock_path.unlink.assert_called()
447+
448+
@patch("httpx.get")
449+
@patch("httpx.stream")
450+
@patch("os.chmod")
451+
@patch("os.stat")
452+
def test_download_binary_checksums_unavailable_without_require(
453+
self, mock_stat, mock_chmod, mock_stream, mock_get
454+
):
455+
"""Test download proceeds with warning when checksums unavailable and not required."""
456+
import httpx as httpx_mod
457+
pm = ProcessManager()
458+
459+
with patch("capiscio_sdk._rpc.process.platform.system", return_value="Linux"):
460+
with patch("capiscio_sdk._rpc.process.platform.machine", return_value="x86_64"):
461+
with patch.object(ProcessManager, "_get_cached_binary_path") as mock_cached:
462+
mock_path = MagicMock(spec=Path)
463+
mock_path.exists.return_value = False
464+
mock_path.parent = MagicMock()
465+
mock_path.name = "capiscio-linux-amd64"
466+
mock_cached.return_value = mock_path
467+
468+
mock_response = MagicMock()
469+
mock_response.iter_bytes.return_value = [b"data"]
470+
mock_stream.return_value.__enter__.return_value = mock_response
471+
472+
# checksums.txt not available
473+
mock_get.side_effect = httpx_mod.HTTPError("404")
474+
475+
with patch.dict(os.environ, {}, clear=False):
476+
# Ensure CAPISCIO_REQUIRE_CHECKSUM is not set
477+
os.environ.pop("CAPISCIO_REQUIRE_CHECKSUM", None)
478+
m_open = mock_open()
479+
with patch("builtins.open", m_open):
480+
result = pm._download_binary()
481+
482+
# Should succeed despite no checksum
483+
assert result == mock_path
484+
mock_chmod.assert_called_once()
485+
486+
@patch("httpx.get")
487+
@patch("httpx.stream")
488+
@patch("os.chmod")
489+
@patch("os.stat")
490+
def test_download_binary_chmod_after_checksum(self, mock_stat, mock_chmod, mock_stream, mock_get):
491+
"""Test that chmod happens AFTER checksum verification, not before."""
492+
pm = ProcessManager()
493+
call_order = []
494+
495+
with patch("capiscio_sdk._rpc.process.platform.system", return_value="Linux"):
496+
with patch("capiscio_sdk._rpc.process.platform.machine", return_value="x86_64"):
497+
with patch.object(ProcessManager, "_get_cached_binary_path") as mock_cached:
498+
mock_path = MagicMock(spec=Path)
499+
mock_path.exists.return_value = False
500+
mock_path.parent = MagicMock()
501+
mock_path.name = "capiscio-linux-amd64"
502+
mock_cached.return_value = mock_path
503+
504+
mock_response = MagicMock()
505+
mock_response.iter_bytes.return_value = [b"data"]
506+
mock_stream.return_value.__enter__.return_value = mock_response
507+
508+
mock_get_resp = MagicMock()
509+
mock_get_resp.text = "fakehash capiscio-linux-amd64\n"
510+
mock_get_resp.raise_for_status = MagicMock()
511+
mock_get.return_value = mock_get_resp
512+
513+
def track_verify(*a, **kw):
514+
call_order.append("verify")
515+
return True
516+
517+
def track_chmod(*a, **kw):
518+
call_order.append("chmod")
519+
520+
mock_chmod.side_effect = track_chmod
521+
522+
with patch.object(ProcessManager, "_verify_checksum", side_effect=track_verify):
523+
m_open = mock_open()
524+
with patch("builtins.open", m_open):
525+
pm._download_binary()
526+
527+
assert call_order == ["verify", "chmod"], (
528+
f"Expected verify before chmod, got: {call_order}"
529+
)

0 commit comments

Comments
 (0)