From bbc0c56b949871a4f9cf4d0afe85b7d2f6ec5f95 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Mon, 23 Feb 2026 10:08:37 +0100 Subject: [PATCH 1/8] feat: Add transport-level compression, remove application-level is_compressed Add Accept-Encoding (zstd, gzip, deflate) to all API requests so the server can compress response bodies at the HTTP transport layer. This replaces the application-level is_compressed=true query param which caused base64(gzip(data)) encoding inside JSON fields. - New transport_compression module handles Accept-Encoding negotiation and Content-Encoding decompression (zstd via optional zstandard package, gzip, deflate) - Client._rest_call() sends Accept-Encoding on every request and decompresses response bodies (including error responses) before JSON parsing - Remove is_compressed=true from 5 SDK callsites (get_events, get_children_events, get_detections, get_audit_logs, get_jobs) and replace unwrap() calls with direct JSON field access - Deprecate Client.unwrap() (kept for external backward compat) - zstandard is an optional dependency: pip install limacharlie[zstd] Co-Authored-By: Claude Opus 4.6 --- limacharlie/client.py | 24 ++++- limacharlie/sdk/organization.py | 15 ++- limacharlie/sdk/sensor.py | 8 +- limacharlie/transport_compression.py | 71 +++++++++++++ pyproject.toml | 2 + tests/unit/test_client.py | 128 +++++++++++++++++++++++ tests/unit/test_sdk_organization.py | 74 +++++++------ tests/unit/test_sdk_sensor.py | 49 ++++++++- tests/unit/test_transport_compression.py | 119 +++++++++++++++++++++ 9 files changed, 439 insertions(+), 51 deletions(-) create mode 100644 limacharlie/transport_compression.py create mode 100644 tests/unit/test_transport_compression.py diff --git a/limacharlie/client.py b/limacharlie/client.py index 04654612..731c8c23 100644 --- a/limacharlie/client.py +++ b/limacharlie/client.py @@ -28,6 +28,7 @@ RateLimitError, error_from_status_code, ) +from .transport_compression import ACCEPT_ENCODING, decompress_response from .user_agent_utils import build_user_agent __version__ = "5.0.0" @@ -153,10 +154,13 @@ def _debug(self, msg: str) -> None: def unwrap(data: str, is_raw: bool = False) -> Any: """Decompress gzip+base64 encoded data from the API. - Used when is_compressed=true is set on requests. The API returns - data as base64-encoded gzip-compressed JSON. + .. deprecated:: + Application-level compression (is_compressed=true) has been + replaced by transport-level compression (Accept-Encoding). + This method is kept for backward compatibility with external + callers but is no longer used internally by the SDK. - Args: + Parameters: data: Base64-encoded gzip-compressed string. is_raw: If True, return raw bytes instead of parsed JSON. @@ -310,6 +314,10 @@ def _rest_call(self, url: str, verb: str, params: dict[str, Any] | None = None, request = URLRequest(full_url, body, headers=headers) request.get_method = lambda: verb request.add_header("User-Agent", self._user_agent) + # Request compressed responses at the transport level. + # urllib doesn't auto-decompress like requests does, so we + # handle decompression ourselves in the response path. + request.add_header("Accept-Encoding", ACCEPT_ENCODING) if content_type is not None: request.add_header("Content-Type", content_type) @@ -323,6 +331,12 @@ def _rest_call(self, url: str, verb: str, params: dict[str, Any] | None = None, try: data = u.read() + # Decompress transport-level encoding (gzip, zstd, etc.) + # before JSON parsing. The server may compress the entire + # response body when we send Accept-Encoding. + content_enc = u.headers.get("Content-Encoding") + if data and content_enc: + data = decompress_response(data, content_enc) resp = json.loads(data.decode()) if data else {} except ValueError: resp = {} @@ -347,6 +361,10 @@ def _rest_call(self, url: str, verb: str, params: dict[str, Any] | None = None, except HTTPError as e: error_body = e.read() + # Error responses can also be transport-compressed. + error_enc = e.headers.get("Content-Encoding") if hasattr(e, "headers") else None + if error_body and error_enc: + error_body = decompress_response(error_body, error_enc) try: resp = json.loads(error_body.decode()) except Exception: diff --git a/limacharlie/sdk/organization.py b/limacharlie/sdk/organization.py index 87fd8dbc..88c011e3 100644 --- a/limacharlie/sdk/organization.py +++ b/limacharlie/sdk/organization.py @@ -868,7 +868,7 @@ def get_detections(self, start: int, end: int, limit: int | None = None, categor cursor = "-" n_returned = 0 while cursor: - qp = {"start": str(int(start)), "end": str(int(end)), "cursor": cursor, "is_compressed": "true"} + qp = {"start": str(int(start)), "end": str(int(end)), "cursor": cursor} if limit is not None: qp["limit"] = str(limit) if category: @@ -876,7 +876,7 @@ def get_detections(self, start: int, end: int, limit: int | None = None, categor resp = self._client.request("GET", f"insight/{self.oid}/detections", query_params=qp) cursor = resp.get("next_cursor") - for d in self._client.unwrap(resp.get("detects", "")): + for d in resp.get("detects", []): yield d n_returned += 1 if limit is not None and n_returned >= limit: @@ -913,7 +913,7 @@ def get_audit_logs(self, start: int, end: int, limit: int | None = None, event_t cursor = "-" n_returned = 0 while cursor: - qp = {"start": str(int(start)), "end": str(int(end)), "cursor": cursor, "is_compressed": "true"} + qp = {"start": str(int(start)), "end": str(int(end)), "cursor": cursor} if limit is not None: qp["limit"] = str(limit) if event_type: @@ -923,7 +923,7 @@ def get_audit_logs(self, start: int, end: int, limit: int | None = None, event_t resp = self._client.request("GET", f"insight/{self.oid}/audit", query_params=qp) cursor = resp.get("next_cursor") - for entry in self._client.unwrap(resp.get("events", "")): + for entry in resp.get("events", []): yield entry n_returned += 1 if limit is not None and n_returned >= limit: @@ -946,7 +946,7 @@ def get_jobs(self, start_time: int | None = None, end_time: int | None = None, l list: Job dicts. """ import time as _time - qp = {"is_compressed": "true", "with_data": "false"} + qp = {"with_data": "false"} if start_time is None: start_time = int(_time.time()) - 86400 if end_time is None: @@ -958,8 +958,7 @@ def get_jobs(self, start_time: int | None = None, end_time: int | None = None, l if sid is not None: qp["sid"] = str(sid) resp = self._client.request("GET", f"job/{self.oid}", query_params=qp) - raw_jobs = resp.get("jobs", "") + raw_jobs = resp.get("jobs", {}) if not raw_jobs: return [] - jobs = self._client.unwrap(raw_jobs) - return [job for job_id, job in jobs.items()] + return [job for job_id, job in raw_jobs.items()] diff --git a/limacharlie/sdk/sensor.py b/limacharlie/sdk/sensor.py index 800b569e..fb735232 100644 --- a/limacharlie/sdk/sensor.py +++ b/limacharlie/sdk/sensor.py @@ -253,7 +253,6 @@ def get_events(self, start: int, end: int, limit: int | None = None, event_type: qp = { "start": str(int(start)), "end": str(int(end)), - "is_compressed": "true", "is_forward": "true" if is_forward else "false", "cursor": cursor, } @@ -264,7 +263,7 @@ def get_events(self, start: int, end: int, limit: int | None = None, event_type: resp = self.client.request("GET", f"insight/{self._org.oid}/{self.sid}", query_params=qp) cursor = resp.get("next_cursor") - for evt in self.client.unwrap(resp.get("events", "")): + for evt in resp.get("events", []): yield evt n_returned += 1 if limit is not None and n_returned >= limit: @@ -306,9 +305,8 @@ def get_children_events(self, atom: str) -> list[dict[str, Any]]: Returns: list: Child events. """ - data = self.client.request("GET", f"insight/{self._org.oid}/{self.sid}/{atom}/children", - query_params={"is_compressed": "true"}) - return self.client.unwrap(data.get("events", "")) + data = self.client.request("GET", f"insight/{self._org.oid}/{self.sid}/{atom}/children") + return data.get("events", []) def get_event_retention(self, start: int, end: int, is_detailed: bool = False) -> dict[str, Any]: """Get event retention statistics. diff --git a/limacharlie/transport_compression.py b/limacharlie/transport_compression.py new file mode 100644 index 00000000..05a4bd1d --- /dev/null +++ b/limacharlie/transport_compression.py @@ -0,0 +1,71 @@ +"""Transport-level HTTP compression support. + +Handles Accept-Encoding negotiation and Content-Encoding decompression +for HTTP responses. Supports zstd (if the `zstandard` package is installed), +gzip, and deflate. + +zstd is preferred when available because it offers better compression ratios +and faster decompression than gzip. Install via `pip install limacharlie[zstd]`. +""" + +from __future__ import annotations + +import zlib + +# Probe for zstandard at import time. This is an optional dependency +# installed via `pip install limacharlie[zstd]`. +try: + import zstandard as _zstd + + _HAS_ZSTD = True +except ImportError: + _zstd = None # type: ignore[assignment] + _HAS_ZSTD = False + +# Header value sent on every request. Prefer zstd when available. +ACCEPT_ENCODING: str = "zstd, gzip, deflate" if _HAS_ZSTD else "gzip, deflate" + + +def decompress_response(data: bytes, content_encoding: str | None) -> bytes: + """Decompress an HTTP response body based on Content-Encoding. + + If the encoding is unrecognized or absent, the data is returned as-is + (passthrough). This matches standard HTTP client behavior - servers may + return uncompressed responses even when Accept-Encoding was sent. + + Parameters: + data: Raw response body bytes. + content_encoding: Value of the Content-Encoding response header, + or None if the header was absent. + + Returns: + Decompressed bytes, or the original bytes if no decompression needed. + """ + if not content_encoding: + return data + + encoding = content_encoding.strip().lower() + + if encoding == "zstd": + if not _HAS_ZSTD: + # Server sent zstd but we don't have the library. This shouldn't + # happen since we only advertise zstd in Accept-Encoding when the + # library is available, but handle it gracefully. + return data + return _zstd.ZstdDecompressor().decompress(data) + + if encoding in ("gzip", "x-gzip"): + # 16 + MAX_WBITS tells zlib to auto-detect gzip vs raw deflate + return zlib.decompress(data, 16 + zlib.MAX_WBITS) + + if encoding == "deflate": + # Try raw deflate first, fall back to zlib-wrapped deflate + try: + return zlib.decompress(data, -zlib.MAX_WBITS) + except zlib.error: + return zlib.decompress(data) + + # Unknown encoding - return data as-is rather than crashing. + # The caller will attempt JSON parsing which will fail with a clear + # error if the data is actually compressed. + return data diff --git a/pyproject.toml b/pyproject.toml index 281af51c..ad11a0a6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,9 @@ dependencies = [ ] [project.optional-dependencies] +zstd = ["zstandard>=0.22.0"] dev = [ + "zstandard>=0.22.0", "pytest==8.3.4", "pytest-rerunfailures==15.0", "tomli>=1.0; python_version < '3.11'", diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index bb9b06fa..20a5b5f6 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -1,5 +1,6 @@ """Tests for limacharlie.client module.""" +import gzip import json from unittest.mock import MagicMock, patch @@ -9,6 +10,7 @@ from limacharlie.errors import ( AuthenticationError, ApiError, + LimaCharlieError, RateLimitError, ) @@ -289,6 +291,132 @@ def set_jwt(c): assert result == {"ok": True} +class TestTransportCompression: + @patch("limacharlie.client.urlopen") + def test_request_sends_accept_encoding_header(self, mock_urlopen): + """Every outgoing request should include Accept-Encoding.""" + jwt_response = MagicMock() + jwt_response.read.return_value = json.dumps({"jwt": "test-jwt"}).encode() + jwt_response.close = MagicMock() + + api_response = MagicMock() + api_response.read.return_value = json.dumps({"ok": True}).encode() + api_response.close = MagicMock() + api_response.getheaders.return_value = [] + api_response.headers = MagicMock() + api_response.headers.get.return_value = None + + mock_urlopen.side_effect = [jwt_response, api_response] + + client = Client(oid="test-oid", api_key="test-key") + client.request("GET", "sensors") + + # The second call is the API request (first is JWT) + api_request = mock_urlopen.call_args_list[1][0][0] + accept_enc = api_request.get_header("Accept-encoding") + assert "gzip" in accept_enc + assert "deflate" in accept_enc + + @patch("limacharlie.client.urlopen") + def test_request_decompresses_gzip_response(self, mock_urlopen): + """Response with Content-Encoding: gzip should be decompressed transparently.""" + jwt_response = MagicMock() + jwt_response.read.return_value = json.dumps({"jwt": "test-jwt"}).encode() + jwt_response.close = MagicMock() + + # Build a gzip-compressed JSON body + body = json.dumps({"sensors": ["s1", "s2"]}).encode() + compressed_body = gzip.compress(body) + + api_response = MagicMock() + api_response.read.return_value = compressed_body + api_response.close = MagicMock() + api_response.getheaders.return_value = [] + api_response.headers = MagicMock() + api_response.headers.get.return_value = "gzip" + + mock_urlopen.side_effect = [jwt_response, api_response] + + client = Client(oid="test-oid", api_key="test-key") + result = client.request("GET", "sensors") + + assert result == {"sensors": ["s1", "s2"]} + + @patch("limacharlie.client.urlopen") + def test_request_decompresses_zstd_response(self, mock_urlopen): + """Response with Content-Encoding: zstd should be decompressed.""" + zstandard = pytest.importorskip("zstandard") + + jwt_response = MagicMock() + jwt_response.read.return_value = json.dumps({"jwt": "test-jwt"}).encode() + jwt_response.close = MagicMock() + + body = json.dumps({"detects": [{"id": "d1"}]}).encode() + cctx = zstandard.ZstdCompressor() + compressed_body = cctx.compress(body) + + api_response = MagicMock() + api_response.read.return_value = compressed_body + api_response.close = MagicMock() + api_response.getheaders.return_value = [] + api_response.headers = MagicMock() + api_response.headers.get.return_value = "zstd" + + mock_urlopen.side_effect = [jwt_response, api_response] + + client = Client(oid="test-oid", api_key="test-key") + result = client.request("GET", "detections") + + assert result == {"detects": [{"id": "d1"}]} + + @patch("limacharlie.client.urlopen") + def test_request_handles_uncompressed_response(self, mock_urlopen): + """When server returns no Content-Encoding, response works as before.""" + jwt_response = MagicMock() + jwt_response.read.return_value = json.dumps({"jwt": "test-jwt"}).encode() + jwt_response.close = MagicMock() + + api_response = MagicMock() + api_response.read.return_value = json.dumps({"ok": True}).encode() + api_response.close = MagicMock() + api_response.getheaders.return_value = [] + api_response.headers = MagicMock() + api_response.headers.get.return_value = None + + mock_urlopen.side_effect = [jwt_response, api_response] + + client = Client(oid="test-oid", api_key="test-key") + result = client.request("GET", "test") + + assert result == {"ok": True} + + @patch("limacharlie.client.urlopen") + def test_error_response_decompressed(self, mock_urlopen): + """HTTPError bodies with Content-Encoding should be decompressed.""" + from urllib.error import HTTPError + import io + + jwt_response = MagicMock() + jwt_response.read.return_value = json.dumps({"jwt": "test-jwt"}).encode() + jwt_response.close = MagicMock() + + error_body = json.dumps({"error": "not found"}).encode() + compressed_error = gzip.compress(error_body) + + error = HTTPError( + "https://api.limacharlie.io/v1/test", 404, "Not Found", + {"Content-Encoding": "gzip"}, io.BytesIO(compressed_error), + ) + # HTTPError wraps headers in an http.client.HTTPMessage + error.headers = error.headers # already set via the constructor + + mock_urlopen.side_effect = [jwt_response, error] + + client = Client(oid="test-oid", api_key="test-key") + with pytest.raises(LimaCharlieError): + client.request("GET", "test") + + class TestBuildUserAgent: def test_user_agent_format(self): ua = _build_user_agent() diff --git a/tests/unit/test_sdk_organization.py b/tests/unit/test_sdk_organization.py index 4dadd266..f4fe9d3a 100644 --- a/tests/unit/test_sdk_organization.py +++ b/tests/unit/test_sdk_organization.py @@ -739,18 +739,25 @@ def test_remove_group_org(self, org, mock_client): class TestDetections: def test_get_detections_single_page(self, org, mock_client): - mock_client.request.return_value = {"detects": "", "next_cursor": None} - mock_client.unwrap.return_value = [{"detect_id": "d1"}] + mock_client.request.return_value = { + "detects": [{"detect_id": "d1"}], + "next_cursor": None, + } result = list(org.get_detections(1000, 2000)) mock_client.request.assert_called_once_with( "GET", "insight/test-oid-123/detections", - query_params={"start": "1000", "end": "2000", "cursor": "-", "is_compressed": "true"}, + query_params={"start": "1000", "end": "2000", "cursor": "-"}, ) + # is_compressed must NOT be in query params + qp = mock_client.request.call_args[1]["query_params"] + assert "is_compressed" not in qp assert result == [{"detect_id": "d1"}] def test_get_detections_with_limit_and_category(self, org, mock_client): - mock_client.request.return_value = {"detects": "", "next_cursor": None} - mock_client.unwrap.return_value = [{"detect_id": "d1"}] + mock_client.request.return_value = { + "detects": [{"detect_id": "d1"}], + "next_cursor": None, + } result = list(org.get_detections(1000, 2000, limit=5, category="lateral")) qp = mock_client.request.call_args[1]["query_params"] assert qp["limit"] == "5" @@ -758,20 +765,18 @@ def test_get_detections_with_limit_and_category(self, org, mock_client): def test_get_detections_pagination(self, org, mock_client): mock_client.request.side_effect = [ - {"detects": "compressed1", "next_cursor": "cursor2"}, - {"detects": "compressed2", "next_cursor": None}, - ] - mock_client.unwrap.side_effect = [ - [{"detect_id": "d1"}], - [{"detect_id": "d2"}], + {"detects": [{"detect_id": "d1"}], "next_cursor": "cursor2"}, + {"detects": [{"detect_id": "d2"}], "next_cursor": None}, ] result = list(org.get_detections(1000, 2000)) assert len(result) == 2 assert mock_client.request.call_count == 2 def test_get_detections_limit_stops_iteration(self, org, mock_client): - mock_client.request.return_value = {"detects": "", "next_cursor": "more"} - mock_client.unwrap.return_value = [{"detect_id": "d1"}, {"detect_id": "d2"}] + mock_client.request.return_value = { + "detects": [{"detect_id": "d1"}, {"detect_id": "d2"}], + "next_cursor": "more", + } result = list(org.get_detections(1000, 2000, limit=1)) assert len(result) == 1 @@ -784,18 +789,25 @@ def test_get_detection_by_id(self, org, mock_client): class TestAuditLogs: def test_get_audit_logs_single_page(self, org, mock_client): - mock_client.request.return_value = {"events": "", "next_cursor": None} - mock_client.unwrap.return_value = [{"event": "login"}] + mock_client.request.return_value = { + "events": [{"event": "login"}], + "next_cursor": None, + } result = list(org.get_audit_logs(1000, 2000)) mock_client.request.assert_called_once_with( "GET", "insight/test-oid-123/audit", - query_params={"start": "1000", "end": "2000", "cursor": "-", "is_compressed": "true"}, + query_params={"start": "1000", "end": "2000", "cursor": "-"}, ) + # is_compressed must NOT be in query params + qp = mock_client.request.call_args[1]["query_params"] + assert "is_compressed" not in qp assert result == [{"event": "login"}] def test_get_audit_logs_with_filters(self, org, mock_client): - mock_client.request.return_value = {"events": "", "next_cursor": None} - mock_client.unwrap.return_value = [] + mock_client.request.return_value = { + "events": [], + "next_cursor": None, + } list(org.get_audit_logs(1000, 2000, limit=10, event_type="auth", sid="sid-1")) qp = mock_client.request.call_args[1]["query_params"] assert qp["limit"] == "10" @@ -804,42 +816,42 @@ def test_get_audit_logs_with_filters(self, org, mock_client): def test_get_audit_logs_pagination(self, org, mock_client): mock_client.request.side_effect = [ - {"events": "c1", "next_cursor": "cursor2"}, - {"events": "c2", "next_cursor": None}, - ] - mock_client.unwrap.side_effect = [ - [{"event": "e1"}], - [{"event": "e2"}], + {"events": [{"event": "e1"}], "next_cursor": "cursor2"}, + {"events": [{"event": "e2"}], "next_cursor": None}, ] result = list(org.get_audit_logs(1000, 2000)) assert len(result) == 2 def test_get_audit_logs_limit_stops_iteration(self, org, mock_client): - mock_client.request.return_value = {"events": "", "next_cursor": "more"} - mock_client.unwrap.return_value = [{"event": "e1"}, {"event": "e2"}] + mock_client.request.return_value = { + "events": [{"event": "e1"}, {"event": "e2"}], + "next_cursor": "more", + } result = list(org.get_audit_logs(1000, 2000, limit=1)) assert len(result) == 1 class TestJobs: def test_get_jobs_with_explicit_times(self, org, mock_client): - mock_client.request.return_value = {"jobs": {"j1": {"name": "scan"}, "j2": {"name": "resp"}}} - mock_client.unwrap.return_value = {"j1": {"name": "scan"}, "j2": {"name": "resp"}} + mock_client.request.return_value = { + "jobs": {"j1": {"name": "scan"}, "j2": {"name": "resp"}}, + } result = org.get_jobs(start_time=1000, end_time=2000) qp = mock_client.request.call_args[1]["query_params"] assert qp["start"] == "1000" assert qp["end"] == "2000" - assert qp["is_compressed"] == "true" + # is_compressed must NOT be in query params + assert "is_compressed" not in qp assert qp["with_data"] == "false" assert len(result) == 2 def test_get_jobs_empty(self, org, mock_client): - mock_client.request.return_value = {"jobs": ""} + mock_client.request.return_value = {"jobs": {}} result = org.get_jobs(start_time=1000, end_time=2000) assert result == [] def test_get_jobs_with_limit_and_sid(self, org, mock_client): - mock_client.request.return_value = {"jobs": ""} + mock_client.request.return_value = {"jobs": {}} org.get_jobs(start_time=1000, end_time=2000, limit=5, sid="sid-1") qp = mock_client.request.call_args[1]["query_params"] assert qp["limit"] == "5" diff --git a/tests/unit/test_sdk_sensor.py b/tests/unit/test_sdk_sensor.py index 0efef15a..cf7f5360 100644 --- a/tests/unit/test_sdk_sensor.py +++ b/tests/unit/test_sdk_sensor.py @@ -124,6 +124,45 @@ def test_delete(self, sensor, mock_org): assert call_args[0][0] == "DELETE" +class TestSensorGetEventsContract: + def test_get_events_query_params(self, sensor, mock_org): + """get_events should not send is_compressed - transport compression handles it.""" + mock_org.client.request.return_value = { + "events": [{"type": "NEW_PROCESS"}], + "next_cursor": None, + } + result = list(sensor.get_events(1000, 2000)) + qp = mock_org.client.request.call_args[1]["query_params"] + # is_compressed must NOT be in query params + assert "is_compressed" not in qp + assert qp["start"] == "1000" + assert qp["end"] == "2000" + assert qp["is_forward"] == "true" + assert result == [{"type": "NEW_PROCESS"}] + + def test_get_events_with_limit(self, sensor, mock_org): + mock_org.client.request.return_value = { + "events": [{"type": "e1"}, {"type": "e2"}, {"type": "e3"}], + "next_cursor": "more", + } + result = list(sensor.get_events(1000, 2000, limit=2)) + assert len(result) == 2 + + def test_get_events_pagination(self, sensor, mock_org): + mock_org.client.request.side_effect = [ + {"events": [{"type": "e1"}], "next_cursor": "cursor2"}, + {"events": [{"type": "e2"}], "next_cursor": None}, + ] + result = list(sensor.get_events(1000, 2000)) + assert len(result) == 2 + assert mock_org.client.request.call_count == 2 + + def test_get_events_empty(self, sensor, mock_org): + mock_org.client.request.return_value = {"events": [], "next_cursor": None} + result = list(sensor.get_events(1000, 2000)) + assert result == [] + + class TestSensorEventRetention: def test_get_event_retention_uses_correct_params(self, sensor, mock_org): mock_org.client.request.return_value = {"retention": {}} @@ -262,12 +301,14 @@ def test_get_event_by_atom_path(self, sensor, mock_org): class TestSensorGetChildrenEventsContract: def test_get_children_events_params(self, sensor, mock_org): - mock_org.client.request.return_value = {"events": "compressed-data"} - mock_org.client.unwrap.return_value = [{"type": "FILE_CREATE"}] + mock_org.client.request.return_value = {"events": [{"type": "FILE_CREATE"}]} result = sensor.get_children_events("atom-xyz") mock_org.client.request.assert_called_once_with( "GET", "insight/test-oid/aaaa-bbbb-cccc-dddd/atom-xyz/children", - query_params={"is_compressed": "true"}, ) - mock_org.client.unwrap.assert_called_once_with("compressed-data") assert result == [{"type": "FILE_CREATE"}] + + def test_get_children_events_empty(self, sensor, mock_org): + mock_org.client.request.return_value = {} + result = sensor.get_children_events("atom-xyz") + assert result == [] diff --git a/tests/unit/test_transport_compression.py b/tests/unit/test_transport_compression.py new file mode 100644 index 00000000..a2177752 --- /dev/null +++ b/tests/unit/test_transport_compression.py @@ -0,0 +1,119 @@ +"""Tests for limacharlie.transport_compression module.""" + +import gzip +import zlib +from unittest.mock import patch + +import pytest + +from limacharlie.transport_compression import ( + ACCEPT_ENCODING, + decompress_response, +) + + +class TestAcceptEncoding: + def test_includes_zstd_when_available(self): + """zstandard is installed in dev, so zstd should be in the header.""" + assert "zstd" in ACCEPT_ENCODING + assert "gzip" in ACCEPT_ENCODING + assert "deflate" in ACCEPT_ENCODING + + def test_fallback_without_zstd(self): + """When zstandard is not importable, Accept-Encoding should omit zstd.""" + # Re-import the module with zstandard mocked away + import importlib + import limacharlie.transport_compression as tc + + original_has_zstd = tc._HAS_ZSTD + try: + tc._HAS_ZSTD = False + # Verify the constant construction logic - when _HAS_ZSTD is False, + # the module-level ACCEPT_ENCODING would be "gzip, deflate". + expected = "gzip, deflate" + assert expected == ("zstd, gzip, deflate" if tc._HAS_ZSTD else "gzip, deflate") + finally: + tc._HAS_ZSTD = original_has_zstd + + +class TestDecompressGzip: + def test_round_trip(self): + """Compress with gzip, decompress with our function.""" + original = b'{"events": [{"type": "NEW_PROCESS"}]}' + compressed = gzip.compress(original) + result = decompress_response(compressed, "gzip") + assert result == original + + def test_x_gzip_alias(self): + """x-gzip is a legacy alias for gzip.""" + original = b'{"ok": true}' + compressed = gzip.compress(original) + result = decompress_response(compressed, "x-gzip") + assert result == original + + def test_case_insensitive(self): + """Content-Encoding values should be case-insensitive.""" + original = b'{"data": "test"}' + compressed = gzip.compress(original) + result = decompress_response(compressed, "GZIP") + assert result == original + + +class TestDecompressDeflate: + def test_raw_deflate_round_trip(self): + """Compress with raw deflate, decompress with our function.""" + original = b'{"sensors": []}' + # Raw deflate (no zlib header) + compressed = zlib.compress(original) + # zlib.compress produces zlib-wrapped deflate, test that path + result = decompress_response(compressed, "deflate") + assert result == original + + +class TestDecompressZstd: + def test_round_trip(self): + """Compress with zstandard, decompress with our function.""" + zstandard = pytest.importorskip("zstandard") + original = b'{"detects": [{"id": "d-1", "title": "suspicious"}]}' + cctx = zstandard.ZstdCompressor() + compressed = cctx.compress(original) + result = decompress_response(compressed, "zstd") + assert result == original + + def test_large_payload(self): + """Verify zstd works with larger payloads (realistic JSON response).""" + zstandard = pytest.importorskip("zstandard") + # Build a realistic-ish JSON payload + import json + events = [{"type": "NEW_PROCESS", "id": f"evt-{i}", "data": "x" * 100} for i in range(500)] + original = json.dumps({"events": events}).encode() + cctx = zstandard.ZstdCompressor() + compressed = cctx.compress(original) + result = decompress_response(compressed, "zstd") + assert result == original + + +class TestPassthrough: + def test_none_encoding(self): + """None content_encoding means no compression - passthrough.""" + data = b'{"events": []}' + result = decompress_response(data, None) + assert result is data # Same object, not just equal + + def test_empty_encoding(self): + """Empty string content_encoding - passthrough.""" + data = b'{"events": []}' + result = decompress_response(data, "") + assert result is data + + def test_unknown_encoding(self): + """Unknown encoding - passthrough without crashing.""" + data = b'{"events": []}' + result = decompress_response(data, "br") # brotli, not supported + assert result is data + + def test_whitespace_encoding(self): + """Whitespace-only encoding string - passthrough.""" + data = b'{"events": []}' + result = decompress_response(data, " ") + assert result is data From 3fea868d752d2c678bd76de92b755b62f1f7de52 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Mon, 23 Feb 2026 10:21:57 +0100 Subject: [PATCH 2/8] feat: Make zstandard a hard dependency, add cross-platform GHA CI Move zstandard from optional to required dependency - pre-built wheels are available for all supported platforms (Linux/macOS/Windows, x86_64/ARM64, glibc/musl). Add GitHub Actions CI workflow that runs unit tests and wheel install verification across Python 3.10-3.13 on ubuntu, macos, and windows runners. Each job verifies zstandard installs and zstd decompression works end-to-end. Co-Authored-By: Claude Opus 4.6 --- .github/workflows/ci.yml | 84 ++++++++++++++++++++++++ limacharlie/transport_compression.py | 26 ++------ pyproject.toml | 3 +- tests/unit/test_client.py | 2 +- tests/unit/test_transport_compression.py | 32 +++------ 5 files changed, 100 insertions(+), 47 deletions(-) create mode 100644 .github/workflows/ci.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 00000000..ad93eda3 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,84 @@ +name: CI + +on: + pull_request: + branches: [cli-v2, master] + push: + branches: [cli-v2, master] + +jobs: + unit-tests: + name: "Tests - Python ${{ matrix.python-version }} / ${{ matrix.os }}" + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + python-version: ["3.10", "3.11", "3.12", "3.13"] + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install package + run: | + python -m pip install --upgrade pip + pip install ".[dev]" + + - name: Verify zstandard is installed + run: python -c "import zstandard; print(f'zstandard {zstandard.__version__} OK')" + + - name: Verify transport compression imports + run: | + python -c " + from limacharlie.transport_compression import ACCEPT_ENCODING, decompress_response + assert 'zstd' in ACCEPT_ENCODING, f'zstd missing from: {ACCEPT_ENCODING}' + print(f'Accept-Encoding: {ACCEPT_ENCODING}') + " + + - name: Run unit tests + run: pytest -v --durations=10 tests/unit/ + + wheel-sanity: + name: "Wheel install - Python ${{ matrix.python-version }} / ${{ matrix.os }}" + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + python-version: ["3.10", "3.13"] + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Build and install wheel + run: | + python -m pip install --upgrade pip setuptools wheel build + python -m build + pip install dist/limacharlie-*-py3-none-any.whl + + - name: Verify CLI works + run: limacharlie version + + - name: Verify zstandard available after wheel install + run: python -c "import zstandard; print(f'zstandard {zstandard.__version__} OK')" + + - name: Verify zstd decompression works end-to-end + run: | + python -c " + import zstandard, json + from limacharlie.transport_compression import decompress_response + # Round-trip: compress then decompress + original = json.dumps({'events': [{'type': 'test'}]}).encode() + compressed = zstandard.ZstdCompressor().compress(original) + result = decompress_response(compressed, 'zstd') + assert result == original, 'zstd round-trip failed' + print('zstd round-trip OK') + " diff --git a/limacharlie/transport_compression.py b/limacharlie/transport_compression.py index 05a4bd1d..0aa01e63 100644 --- a/limacharlie/transport_compression.py +++ b/limacharlie/transport_compression.py @@ -1,29 +1,20 @@ """Transport-level HTTP compression support. Handles Accept-Encoding negotiation and Content-Encoding decompression -for HTTP responses. Supports zstd (if the `zstandard` package is installed), -gzip, and deflate. +for HTTP responses. Supports zstd, gzip, and deflate. -zstd is preferred when available because it offers better compression ratios -and faster decompression than gzip. Install via `pip install limacharlie[zstd]`. +zstd is preferred because it offers better compression ratios and faster +decompression than gzip. """ from __future__ import annotations import zlib -# Probe for zstandard at import time. This is an optional dependency -# installed via `pip install limacharlie[zstd]`. -try: - import zstandard as _zstd +import zstandard as _zstd - _HAS_ZSTD = True -except ImportError: - _zstd = None # type: ignore[assignment] - _HAS_ZSTD = False - -# Header value sent on every request. Prefer zstd when available. -ACCEPT_ENCODING: str = "zstd, gzip, deflate" if _HAS_ZSTD else "gzip, deflate" +# Header value sent on every request. Prefer zstd over gzip. +ACCEPT_ENCODING: str = "zstd, gzip, deflate" def decompress_response(data: bytes, content_encoding: str | None) -> bytes: @@ -47,11 +38,6 @@ def decompress_response(data: bytes, content_encoding: str | None) -> bytes: encoding = content_encoding.strip().lower() if encoding == "zstd": - if not _HAS_ZSTD: - # Server sent zstd but we don't have the library. This shouldn't - # happen since we only advertise zstd in Accept-Encoding when the - # library is available, but handle it gracefully. - return data return _zstd.ZstdDecompressor().decompress(data) if encoding in ("gzip", "x-gzip"): diff --git a/pyproject.toml b/pyproject.toml index ad11a0a6..ab814790 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,12 +21,11 @@ dependencies = [ "cryptography==46.0.3", "click==8.1.8", "jmespath==1.1.0", + "zstandard>=0.22.0", ] [project.optional-dependencies] -zstd = ["zstandard>=0.22.0"] dev = [ - "zstandard>=0.22.0", "pytest==8.3.4", "pytest-rerunfailures==15.0", "tomli>=1.0; python_version < '3.11'", diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 20a5b5f6..1daba5f0 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -345,7 +345,7 @@ def test_request_decompresses_gzip_response(self, mock_urlopen): @patch("limacharlie.client.urlopen") def test_request_decompresses_zstd_response(self, mock_urlopen): """Response with Content-Encoding: zstd should be decompressed.""" - zstandard = pytest.importorskip("zstandard") + import zstandard jwt_response = MagicMock() jwt_response.read.return_value = json.dumps({"jwt": "test-jwt"}).encode() diff --git a/tests/unit/test_transport_compression.py b/tests/unit/test_transport_compression.py index a2177752..a3d98e29 100644 --- a/tests/unit/test_transport_compression.py +++ b/tests/unit/test_transport_compression.py @@ -1,10 +1,11 @@ """Tests for limacharlie.transport_compression module.""" import gzip +import json import zlib -from unittest.mock import patch import pytest +import zstandard from limacharlie.transport_compression import ( ACCEPT_ENCODING, @@ -13,27 +14,15 @@ class TestAcceptEncoding: - def test_includes_zstd_when_available(self): - """zstandard is installed in dev, so zstd should be in the header.""" + def test_includes_all_algorithms(self): + """Accept-Encoding should advertise zstd, gzip, and deflate.""" assert "zstd" in ACCEPT_ENCODING assert "gzip" in ACCEPT_ENCODING assert "deflate" in ACCEPT_ENCODING - def test_fallback_without_zstd(self): - """When zstandard is not importable, Accept-Encoding should omit zstd.""" - # Re-import the module with zstandard mocked away - import importlib - import limacharlie.transport_compression as tc - - original_has_zstd = tc._HAS_ZSTD - try: - tc._HAS_ZSTD = False - # Verify the constant construction logic - when _HAS_ZSTD is False, - # the module-level ACCEPT_ENCODING would be "gzip, deflate". - expected = "gzip, deflate" - assert expected == ("zstd, gzip, deflate" if tc._HAS_ZSTD else "gzip, deflate") - finally: - tc._HAS_ZSTD = original_has_zstd + def test_zstd_preferred(self): + """zstd should be listed first (highest priority).""" + assert ACCEPT_ENCODING.startswith("zstd") class TestDecompressGzip: @@ -63,9 +52,8 @@ class TestDecompressDeflate: def test_raw_deflate_round_trip(self): """Compress with raw deflate, decompress with our function.""" original = b'{"sensors": []}' - # Raw deflate (no zlib header) - compressed = zlib.compress(original) # zlib.compress produces zlib-wrapped deflate, test that path + compressed = zlib.compress(original) result = decompress_response(compressed, "deflate") assert result == original @@ -73,7 +61,6 @@ def test_raw_deflate_round_trip(self): class TestDecompressZstd: def test_round_trip(self): """Compress with zstandard, decompress with our function.""" - zstandard = pytest.importorskip("zstandard") original = b'{"detects": [{"id": "d-1", "title": "suspicious"}]}' cctx = zstandard.ZstdCompressor() compressed = cctx.compress(original) @@ -82,9 +69,6 @@ def test_round_trip(self): def test_large_payload(self): """Verify zstd works with larger payloads (realistic JSON response).""" - zstandard = pytest.importorskip("zstandard") - # Build a realistic-ish JSON payload - import json events = [{"type": "NEW_PROCESS", "id": f"evt-{i}", "data": "x" * 100} for i in range(500)] original = json.dumps({"events": events}).encode() cctx = zstandard.ZstdCompressor() From 4751ac615e5092f168e3d8bf63e45d97f21f0fbd Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Mon, 23 Feb 2026 10:26:17 +0100 Subject: [PATCH 3/8] fix: Skip os.chown on Windows where it is not available os.chown and os.getuid are Unix-only. Guard with hasattr check so config file writing works on Windows. Co-Authored-By: Claude Opus 4.6 --- limacharlie/config.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/limacharlie/config.py b/limacharlie/config.py index 2a7fa099..079f9218 100644 --- a/limacharlie/config.py +++ b/limacharlie/config.py @@ -92,7 +92,10 @@ def save_config(config: dict[str, Any]) -> None: fd, tmp_path = tempfile.mkstemp() try: - os.chown(tmp_path, os.getuid(), os.getgid()) + # os.chown/os.getuid are Unix-only; skip on Windows where file + # ownership is managed by the OS via ACLs. + if hasattr(os, "chown"): + os.chown(tmp_path, os.getuid(), os.getgid()) os.chmod(tmp_path, stat.S_IWUSR | stat.S_IRUSR) # 0o600 try: os.write(fd, content) From 4747e6e320de0189610edc50cf631595ef857248 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Mon, 23 Feb 2026 10:26:23 +0100 Subject: [PATCH 4/8] fix: Fix GHA CI - use --version flag and cross-platform wheel install - limacharlie version -> limacharlie --version (correct CLI flag) - Use pip install --find-links instead of shell glob for wheel install (glob doesn't work on Windows PowerShell) Co-Authored-By: Claude Opus 4.6 --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ad93eda3..62143fd7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -62,10 +62,10 @@ jobs: run: | python -m pip install --upgrade pip setuptools wheel build python -m build - pip install dist/limacharlie-*-py3-none-any.whl + python -m pip install --find-links dist limacharlie - name: Verify CLI works - run: limacharlie version + run: limacharlie --version - name: Verify zstandard available after wheel install run: python -c "import zstandard; print(f'zstandard {zstandard.__version__} OK')" From 36627f05dfc5a9c03bce3f631eef0655874e2302 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Mon, 23 Feb 2026 11:25:23 +0100 Subject: [PATCH 5/8] fix: Skip Unix permission assertion on Windows in test_config.py Windows NTFS uses ACLs, not Unix permission bits. os.chmod(0o600) doesn't restrict access the same way on Windows, so the 0o777 mask assertion fails. Only assert file permissions on Unix platforms. Co-Authored-By: Claude Opus 4.6 --- tests/unit/test_config.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 9c2a3f53..2ad5787a 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -2,6 +2,7 @@ import os import stat +import sys import tempfile import pytest @@ -57,8 +58,11 @@ class TestSaveConfig: def test_creates_file_with_secure_permissions(self, tmp_config_file): save_config({"oid": "test-oid"}) assert os.path.isfile(tmp_config_file) - mode = os.stat(tmp_config_file).st_mode - assert mode & 0o777 == 0o600 + # Windows NTFS uses ACLs, not Unix permission bits - os.chmod(0o600) + # doesn't restrict access the same way. Only assert on Unix. + if sys.platform != "win32": + mode = os.stat(tmp_config_file).st_mode + assert mode & 0o777 == 0o600 def test_round_trips_data(self, tmp_config_file): data = {"oid": "abc-123", "api_key": "key-456", "env": {"prod": {"oid": "prod-oid"}}} From 4d9ad5582b19f24aa0b31a790f5804abe3290add Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Mon, 23 Feb 2026 11:40:27 +0100 Subject: [PATCH 6/8] feat: Graceful fallback when zstandard is unavailable zstandard is kept as a hard dependency (pre-built wheels for all major platforms), but the runtime now handles ImportError gracefully. If zstd can't be imported (exotic platform, no C compiler), Accept-Encoding falls back to "gzip, deflate" and zstd-encoded responses pass through as-is. This prevents pip install failures from making the entire SDK unusable. - Restore try/except ImportError guard in transport_compression.py - Add _HAS_ZSTD flag for runtime feature detection - Add tests for fallback path (mock zstandard away, verify behavior) - Add no-zstd-fallback CI job that uninstalls zstandard and verifies the SDK still works with gzip/deflate only Co-Authored-By: Claude Opus 4.6 --- .github/workflows/ci.yml | 44 ++++++++++++++++++++++++ limacharlie/transport_compression.py | 24 ++++++++++--- tests/unit/test_transport_compression.py | 43 +++++++++++++++++++++++ 3 files changed, 107 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 62143fd7..267e42a1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -82,3 +82,47 @@ jobs: assert result == original, 'zstd round-trip failed' print('zstd round-trip OK') " + + no-zstd-fallback: + name: "Fallback without zstandard - Python ${{ matrix.python-version }} / ${{ matrix.os }}" + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + python-version: ["3.10", "3.13"] + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install package then remove zstandard + run: | + python -m pip install --upgrade pip + pip install ".[dev]" + pip uninstall -y zstandard + + - name: Verify zstandard is NOT importable + run: python -c " + try: + import zstandard + raise AssertionError('zstandard should not be importable') + except ImportError: + print('zstandard correctly unavailable') + " + + - name: Verify fallback Accept-Encoding (no zstd) + run: | + python -c " + from limacharlie.transport_compression import ACCEPT_ENCODING, _HAS_ZSTD + assert not _HAS_ZSTD, '_HAS_ZSTD should be False' + assert 'zstd' not in ACCEPT_ENCODING, f'zstd should not be in: {ACCEPT_ENCODING}' + assert 'gzip' in ACCEPT_ENCODING, f'gzip missing from: {ACCEPT_ENCODING}' + print(f'Accept-Encoding (fallback): {ACCEPT_ENCODING}') + " + + - name: Run unit tests without zstandard + run: pytest -v --durations=10 tests/unit/ -k "not TestDecompressZstd" diff --git a/limacharlie/transport_compression.py b/limacharlie/transport_compression.py index 0aa01e63..7f90df8f 100644 --- a/limacharlie/transport_compression.py +++ b/limacharlie/transport_compression.py @@ -4,17 +4,29 @@ for HTTP responses. Supports zstd, gzip, and deflate. zstd is preferred because it offers better compression ratios and faster -decompression than gzip. +decompression than gzip. The zstandard package is a hard dependency but +the runtime gracefully falls back to gzip/deflate if it's unavailable +(e.g., exotic platform where the wheel couldn't be installed). """ from __future__ import annotations import zlib -import zstandard as _zstd +# zstandard is a hard dependency (listed in pyproject.toml) with pre-built +# wheels for all major platforms. However, we guard the import so the SDK +# still works if someone is on an exotic platform where the wheel isn't +# available and there's no C compiler to build from source. +try: + import zstandard as _zstd -# Header value sent on every request. Prefer zstd over gzip. -ACCEPT_ENCODING: str = "zstd, gzip, deflate" + _HAS_ZSTD = True +except ImportError: + _zstd = None # type: ignore[assignment] + _HAS_ZSTD = False + +# Header value sent on every request. Prefer zstd when available. +ACCEPT_ENCODING: str = "zstd, gzip, deflate" if _HAS_ZSTD else "gzip, deflate" def decompress_response(data: bytes, content_encoding: str | None) -> bytes: @@ -38,6 +50,10 @@ def decompress_response(data: bytes, content_encoding: str | None) -> bytes: encoding = content_encoding.strip().lower() if encoding == "zstd": + if not _HAS_ZSTD: + # Server sent zstd but we can't decompress - return as-is. + # JSON parsing will fail with a clear error downstream. + return data return _zstd.ZstdDecompressor().decompress(data) if encoding in ("gzip", "x-gzip"): diff --git a/tests/unit/test_transport_compression.py b/tests/unit/test_transport_compression.py index a3d98e29..0a1f4b4e 100644 --- a/tests/unit/test_transport_compression.py +++ b/tests/unit/test_transport_compression.py @@ -1,14 +1,18 @@ """Tests for limacharlie.transport_compression module.""" import gzip +import importlib import json +import sys import zlib +from unittest import mock import pytest import zstandard from limacharlie.transport_compression import ( ACCEPT_ENCODING, + _HAS_ZSTD, decompress_response, ) @@ -25,6 +29,45 @@ def test_zstd_preferred(self): assert ACCEPT_ENCODING.startswith("zstd") +class TestAcceptEncodingWithoutZstd: + def test_fallback_without_zstandard(self): + """When zstandard is not importable, fall back to gzip/deflate only.""" + import limacharlie.transport_compression as tc_mod + + # Temporarily make zstandard unimportable by removing it from + # sys.modules and patching the import machinery. + saved = sys.modules.pop("zstandard", None) + try: + with mock.patch.dict(sys.modules, {"zstandard": None}): + importlib.reload(tc_mod) + assert tc_mod._HAS_ZSTD is False + assert "zstd" not in tc_mod.ACCEPT_ENCODING + assert "gzip" in tc_mod.ACCEPT_ENCODING + assert "deflate" in tc_mod.ACCEPT_ENCODING + finally: + # Restore original module state + if saved is not None: + sys.modules["zstandard"] = saved + importlib.reload(tc_mod) + + def test_zstd_passthrough_when_unavailable(self): + """If server sends zstd but lib is missing, return raw bytes.""" + import limacharlie.transport_compression as tc_mod + + saved = sys.modules.pop("zstandard", None) + try: + with mock.patch.dict(sys.modules, {"zstandard": None}): + importlib.reload(tc_mod) + raw = b"some-zstd-compressed-bytes" + # Should passthrough without crashing + result = tc_mod.decompress_response(raw, "zstd") + assert result is raw + finally: + if saved is not None: + sys.modules["zstandard"] = saved + importlib.reload(tc_mod) + + class TestDecompressGzip: def test_round_trip(self): """Compress with gzip, decompress with our function.""" From b7b16f45e6a730054c0ec50bbbdb37e3b30691d2 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Mon, 23 Feb 2026 11:54:29 +0100 Subject: [PATCH 7/8] refactor: Move inline CI scripts to scripts/ci/, add edge case tests Extract inline python -c snippets from ci.yml into standalone scripts under scripts/ci/ for readability and easier local debugging. Add edge case tests for transport_compression: - Raw deflate (no zlib header) vs zlib-wrapped deflate - Case insensitivity for all encodings (zstd, gzip, deflate) - Whitespace around Content-Encoding header values - Empty data with/without encoding - _HAS_ZSTD flag assertion - zstd passthrough case insensitivity when lib unavailable Co-Authored-By: Claude Opus 4.6 --- .github/workflows/ci.yml | 40 ++-------- scripts/ci/verify_fallback_encoding.py | 8 ++ scripts/ci/verify_transport_compression.py | 6 ++ scripts/ci/verify_zstd_decompression.py | 13 ++++ scripts/ci/verify_zstd_installed.py | 5 ++ scripts/ci/verify_zstd_not_importable.py | 8 ++ tests/unit/test_transport_compression.py | 87 +++++++++++++++++++++- 7 files changed, 129 insertions(+), 38 deletions(-) create mode 100644 scripts/ci/verify_fallback_encoding.py create mode 100644 scripts/ci/verify_transport_compression.py create mode 100644 scripts/ci/verify_zstd_decompression.py create mode 100644 scripts/ci/verify_zstd_installed.py create mode 100644 scripts/ci/verify_zstd_not_importable.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 267e42a1..007438cd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,15 +29,10 @@ jobs: pip install ".[dev]" - name: Verify zstandard is installed - run: python -c "import zstandard; print(f'zstandard {zstandard.__version__} OK')" + run: python scripts/ci/verify_zstd_installed.py - name: Verify transport compression imports - run: | - python -c " - from limacharlie.transport_compression import ACCEPT_ENCODING, decompress_response - assert 'zstd' in ACCEPT_ENCODING, f'zstd missing from: {ACCEPT_ENCODING}' - print(f'Accept-Encoding: {ACCEPT_ENCODING}') - " + run: python scripts/ci/verify_transport_compression.py - name: Run unit tests run: pytest -v --durations=10 tests/unit/ @@ -68,20 +63,10 @@ jobs: run: limacharlie --version - name: Verify zstandard available after wheel install - run: python -c "import zstandard; print(f'zstandard {zstandard.__version__} OK')" + run: python scripts/ci/verify_zstd_installed.py - name: Verify zstd decompression works end-to-end - run: | - python -c " - import zstandard, json - from limacharlie.transport_compression import decompress_response - # Round-trip: compress then decompress - original = json.dumps({'events': [{'type': 'test'}]}).encode() - compressed = zstandard.ZstdCompressor().compress(original) - result = decompress_response(compressed, 'zstd') - assert result == original, 'zstd round-trip failed' - print('zstd round-trip OK') - " + run: python scripts/ci/verify_zstd_decompression.py no-zstd-fallback: name: "Fallback without zstandard - Python ${{ matrix.python-version }} / ${{ matrix.os }}" @@ -106,23 +91,10 @@ jobs: pip uninstall -y zstandard - name: Verify zstandard is NOT importable - run: python -c " - try: - import zstandard - raise AssertionError('zstandard should not be importable') - except ImportError: - print('zstandard correctly unavailable') - " + run: python scripts/ci/verify_zstd_not_importable.py - name: Verify fallback Accept-Encoding (no zstd) - run: | - python -c " - from limacharlie.transport_compression import ACCEPT_ENCODING, _HAS_ZSTD - assert not _HAS_ZSTD, '_HAS_ZSTD should be False' - assert 'zstd' not in ACCEPT_ENCODING, f'zstd should not be in: {ACCEPT_ENCODING}' - assert 'gzip' in ACCEPT_ENCODING, f'gzip missing from: {ACCEPT_ENCODING}' - print(f'Accept-Encoding (fallback): {ACCEPT_ENCODING}') - " + run: python scripts/ci/verify_fallback_encoding.py - name: Run unit tests without zstandard run: pytest -v --durations=10 tests/unit/ -k "not TestDecompressZstd" diff --git a/scripts/ci/verify_fallback_encoding.py b/scripts/ci/verify_fallback_encoding.py new file mode 100644 index 00000000..48bbd8ee --- /dev/null +++ b/scripts/ci/verify_fallback_encoding.py @@ -0,0 +1,8 @@ +"""Verify Accept-Encoding falls back to gzip/deflate when zstd is unavailable.""" + +from limacharlie.transport_compression import ACCEPT_ENCODING, _HAS_ZSTD + +assert not _HAS_ZSTD, "_HAS_ZSTD should be False" +assert "zstd" not in ACCEPT_ENCODING, f"zstd should not be in: {ACCEPT_ENCODING}" +assert "gzip" in ACCEPT_ENCODING, f"gzip missing from: {ACCEPT_ENCODING}" +print(f"Accept-Encoding (fallback): {ACCEPT_ENCODING}") diff --git a/scripts/ci/verify_transport_compression.py b/scripts/ci/verify_transport_compression.py new file mode 100644 index 00000000..1defe796 --- /dev/null +++ b/scripts/ci/verify_transport_compression.py @@ -0,0 +1,6 @@ +"""Verify transport compression module loads with zstd support.""" + +from limacharlie.transport_compression import ACCEPT_ENCODING + +assert "zstd" in ACCEPT_ENCODING, f"zstd missing from: {ACCEPT_ENCODING}" +print(f"Accept-Encoding: {ACCEPT_ENCODING}") diff --git a/scripts/ci/verify_zstd_decompression.py b/scripts/ci/verify_zstd_decompression.py new file mode 100644 index 00000000..1d1bab22 --- /dev/null +++ b/scripts/ci/verify_zstd_decompression.py @@ -0,0 +1,13 @@ +"""Verify zstd decompression works end-to-end (compress then decompress).""" + +import json + +import zstandard + +from limacharlie.transport_compression import decompress_response + +original = json.dumps({"events": [{"type": "test"}]}).encode() +compressed = zstandard.ZstdCompressor().compress(original) +result = decompress_response(compressed, "zstd") +assert result == original, "zstd round-trip failed" +print("zstd round-trip OK") diff --git a/scripts/ci/verify_zstd_installed.py b/scripts/ci/verify_zstd_installed.py new file mode 100644 index 00000000..b6380ffb --- /dev/null +++ b/scripts/ci/verify_zstd_installed.py @@ -0,0 +1,5 @@ +"""Verify zstandard is installed and importable.""" + +import zstandard + +print(f"zstandard {zstandard.__version__} OK") diff --git a/scripts/ci/verify_zstd_not_importable.py b/scripts/ci/verify_zstd_not_importable.py new file mode 100644 index 00000000..0a2e3c05 --- /dev/null +++ b/scripts/ci/verify_zstd_not_importable.py @@ -0,0 +1,8 @@ +"""Verify zstandard is NOT importable (used after pip uninstall).""" + +try: + import zstandard + + raise AssertionError("zstandard should not be importable") +except ImportError: + print("zstandard correctly unavailable") diff --git a/tests/unit/test_transport_compression.py b/tests/unit/test_transport_compression.py index 0a1f4b4e..a8db0d90 100644 --- a/tests/unit/test_transport_compression.py +++ b/tests/unit/test_transport_compression.py @@ -28,6 +28,10 @@ def test_zstd_preferred(self): """zstd should be listed first (highest priority).""" assert ACCEPT_ENCODING.startswith("zstd") + def test_has_zstd_flag_true(self): + """_HAS_ZSTD should be True when zstandard is importable.""" + assert _HAS_ZSTD is True + class TestAcceptEncodingWithoutZstd: def test_fallback_without_zstandard(self): @@ -67,6 +71,22 @@ def test_zstd_passthrough_when_unavailable(self): sys.modules["zstandard"] = saved importlib.reload(tc_mod) + def test_zstd_passthrough_case_insensitive_when_unavailable(self): + """Zstd passthrough should work regardless of header casing.""" + import limacharlie.transport_compression as tc_mod + + saved = sys.modules.pop("zstandard", None) + try: + with mock.patch.dict(sys.modules, {"zstandard": None}): + importlib.reload(tc_mod) + raw = b"compressed-bytes" + assert tc_mod.decompress_response(raw, "ZSTD") is raw + assert tc_mod.decompress_response(raw, "Zstd") is raw + finally: + if saved is not None: + sys.modules["zstandard"] = saved + importlib.reload(tc_mod) + class TestDecompressGzip: def test_round_trip(self): @@ -87,19 +107,49 @@ def test_case_insensitive(self): """Content-Encoding values should be case-insensitive.""" original = b'{"data": "test"}' compressed = gzip.compress(original) - result = decompress_response(compressed, "GZIP") + assert decompress_response(compressed, "GZIP") == original + assert decompress_response(compressed, "Gzip") == original + + def test_large_payload(self): + """Verify gzip works with larger payloads.""" + events = [{"type": "NEW_PROCESS", "id": f"evt-{i}", "data": "x" * 100} for i in range(500)] + original = json.dumps({"events": events}).encode() + compressed = gzip.compress(original) + result = decompress_response(compressed, "gzip") + assert result == original + + def test_whitespace_around_header(self): + """Leading/trailing whitespace in Content-Encoding should be stripped.""" + original = b'{"ok": true}' + compressed = gzip.compress(original) + result = decompress_response(compressed, " gzip ") assert result == original class TestDecompressDeflate: - def test_raw_deflate_round_trip(self): - """Compress with raw deflate, decompress with our function.""" + def test_zlib_wrapped_deflate(self): + """zlib.compress produces zlib-wrapped deflate - should decompress.""" original = b'{"sensors": []}' - # zlib.compress produces zlib-wrapped deflate, test that path compressed = zlib.compress(original) result = decompress_response(compressed, "deflate") assert result == original + def test_raw_deflate(self): + """Raw deflate (no zlib header) - should decompress via the try branch.""" + original = b'{"sensors": [{"sid": "test-sensor"}]}' + # Use wbits=-15 to produce raw deflate (no zlib header) + compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS) + compressed = compressor.compress(original) + compressor.flush() + result = decompress_response(compressed, "deflate") + assert result == original + + def test_case_insensitive(self): + """Content-Encoding: DEFLATE should work.""" + original = b'{"data": "test"}' + compressed = zlib.compress(original) + assert decompress_response(compressed, "DEFLATE") == original + assert decompress_response(compressed, "Deflate") == original + class TestDecompressZstd: def test_round_trip(self): @@ -119,6 +169,22 @@ def test_large_payload(self): result = decompress_response(compressed, "zstd") assert result == original + def test_case_insensitive(self): + """Content-Encoding: ZSTD should work.""" + original = b'{"data": "test"}' + cctx = zstandard.ZstdCompressor() + compressed = cctx.compress(original) + assert decompress_response(compressed, "ZSTD") == original + assert decompress_response(compressed, "Zstd") == original + + def test_whitespace_around_header(self): + """Leading/trailing whitespace in Content-Encoding should be stripped.""" + original = b'{"ok": true}' + cctx = zstandard.ZstdCompressor() + compressed = cctx.compress(original) + result = decompress_response(compressed, " zstd ") + assert result == original + class TestPassthrough: def test_none_encoding(self): @@ -144,3 +210,16 @@ def test_whitespace_encoding(self): data = b'{"events": []}' result = decompress_response(data, " ") assert result is data + + def test_empty_data_with_encoding(self): + """Empty bytes with a Content-Encoding should not crash.""" + # gzip of empty bytes is a valid gzip stream + compressed_empty = gzip.compress(b"") + result = decompress_response(compressed_empty, "gzip") + assert result == b"" + + def test_empty_data_no_encoding(self): + """Empty bytes with no encoding - passthrough.""" + data = b"" + result = decompress_response(data, None) + assert result is data From a579d685b8e55e05b7d9f3e2ce5bf427534990fa Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Mon, 23 Feb 2026 12:05:07 +0100 Subject: [PATCH 8/8] fix: Make tests collect without zstandard installed Top-level `import zstandard` in test_transport_compression.py caused pytest collection to fail when zstandard was uninstalled (no-zstd CI job). Tests that need zstandard now use local imports with pytest.mark.skipif(_HAS_ZSTD) or pytest.importorskip(). Tests that verify the no-zstd fallback run in both environments. - With zstandard: 50 passed, 2 skipped (no-zstd-only tests) - Without zstandard: 44 passed, 8 skipped (zstd-specific tests) Co-Authored-By: Claude Opus 4.6 --- .github/workflows/ci.yml | 2 +- tests/unit/test_client.py | 2 +- tests/unit/test_transport_compression.py | 32 +++++++++++++++++++++--- 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 007438cd..bb60cbd3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -97,4 +97,4 @@ jobs: run: python scripts/ci/verify_fallback_encoding.py - name: Run unit tests without zstandard - run: pytest -v --durations=10 tests/unit/ -k "not TestDecompressZstd" + run: pytest -v --durations=10 tests/unit/ diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 1daba5f0..20a5b5f6 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -345,7 +345,7 @@ def test_request_decompresses_gzip_response(self, mock_urlopen): @patch("limacharlie.client.urlopen") def test_request_decompresses_zstd_response(self, mock_urlopen): """Response with Content-Encoding: zstd should be decompressed.""" - import zstandard + zstandard = pytest.importorskip("zstandard") jwt_response = MagicMock() jwt_response.read.return_value = json.dumps({"jwt": "test-jwt"}).encode() diff --git a/tests/unit/test_transport_compression.py b/tests/unit/test_transport_compression.py index a8db0d90..41491b92 100644 --- a/tests/unit/test_transport_compression.py +++ b/tests/unit/test_transport_compression.py @@ -8,7 +8,6 @@ from unittest import mock import pytest -import zstandard from limacharlie.transport_compression import ( ACCEPT_ENCODING, @@ -18,20 +17,36 @@ class TestAcceptEncoding: - def test_includes_all_algorithms(self): - """Accept-Encoding should advertise zstd, gzip, and deflate.""" - assert "zstd" in ACCEPT_ENCODING + def test_includes_gzip_and_deflate(self): + """Accept-Encoding should always advertise gzip and deflate.""" assert "gzip" in ACCEPT_ENCODING assert "deflate" in ACCEPT_ENCODING + @pytest.mark.skipif(not _HAS_ZSTD, reason="zstandard not installed") + def test_includes_zstd_when_available(self): + """Accept-Encoding should include zstd when zstandard is installed.""" + assert "zstd" in ACCEPT_ENCODING + + @pytest.mark.skipif(not _HAS_ZSTD, reason="zstandard not installed") def test_zstd_preferred(self): """zstd should be listed first (highest priority).""" assert ACCEPT_ENCODING.startswith("zstd") + @pytest.mark.skipif(not _HAS_ZSTD, reason="zstandard not installed") def test_has_zstd_flag_true(self): """_HAS_ZSTD should be True when zstandard is importable.""" assert _HAS_ZSTD is True + @pytest.mark.skipif(_HAS_ZSTD, reason="zstandard is installed") + def test_excludes_zstd_when_unavailable(self): + """Accept-Encoding should not include zstd when zstandard is missing.""" + assert "zstd" not in ACCEPT_ENCODING + + @pytest.mark.skipif(_HAS_ZSTD, reason="zstandard is installed") + def test_has_zstd_flag_false(self): + """_HAS_ZSTD should be False when zstandard is not importable.""" + assert _HAS_ZSTD is False + class TestAcceptEncodingWithoutZstd: def test_fallback_without_zstandard(self): @@ -151,9 +166,12 @@ def test_case_insensitive(self): assert decompress_response(compressed, "Deflate") == original +@pytest.mark.skipif(not _HAS_ZSTD, reason="zstandard not installed") class TestDecompressZstd: def test_round_trip(self): """Compress with zstandard, decompress with our function.""" + import zstandard + original = b'{"detects": [{"id": "d-1", "title": "suspicious"}]}' cctx = zstandard.ZstdCompressor() compressed = cctx.compress(original) @@ -162,6 +180,8 @@ def test_round_trip(self): def test_large_payload(self): """Verify zstd works with larger payloads (realistic JSON response).""" + import zstandard + events = [{"type": "NEW_PROCESS", "id": f"evt-{i}", "data": "x" * 100} for i in range(500)] original = json.dumps({"events": events}).encode() cctx = zstandard.ZstdCompressor() @@ -171,6 +191,8 @@ def test_large_payload(self): def test_case_insensitive(self): """Content-Encoding: ZSTD should work.""" + import zstandard + original = b'{"data": "test"}' cctx = zstandard.ZstdCompressor() compressed = cctx.compress(original) @@ -179,6 +201,8 @@ def test_case_insensitive(self): def test_whitespace_around_header(self): """Leading/trailing whitespace in Content-Encoding should be stripped.""" + import zstandard + original = b'{"ok": true}' cctx = zstandard.ZstdCompressor() compressed = cctx.compress(original)