diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index e872fcf2b..dbbb92197 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -26,6 +26,7 @@ from google.cloud.pubsub_v1.publisher import futures from google.cloud.pubsub_v1.publisher._batch import base from google.pubsub_v1 import types as gapic_types +import grpc if typing.TYPE_CHECKING: # pragma: NO COVER from google.cloud import pubsub_v1 @@ -118,6 +119,12 @@ def __init__( self._commit_retry = commit_retry self._commit_timeout = commit_timeout + self._enable_grpc_compression = ( + self.client.publisher_options.enable_grpc_compression + ) + self._compression_bytes_threshold = ( + self.client.publisher_options.compression_bytes_threshold + ) @staticmethod def make_lock() -> threading.Lock: @@ -269,6 +276,17 @@ def _commit(self) -> None: start = time.time() batch_transport_succeeded = True + + # Set compression if enabled. + compression = None + + if ( + self._enable_grpc_compression + and gapic_types.PublishRequest(messages=self._messages)._pb.ByteSize() + >= self._compression_bytes_threshold + ): + compression = grpc.Compression.Gzip + try: # Performs retries for errors defined by the retry configuration. response = self._client._gapic_publish( @@ -276,6 +294,7 @@ def _commit(self) -> None: messages=self._messages, retry=self._commit_retry, timeout=self._commit_timeout, + compression=compression, ) except google.api_core.exceptions.GoogleAPIError as exc: # We failed to publish, even after retries, so set the exception on diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 3e668533d..6487b5a83 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -153,6 +153,11 @@ def __init__( # The object controlling the message publishing flow self._flow_controller = FlowController(self.publisher_options.flow_control) + self._enable_grpc_compression = self.publisher_options.enable_grpc_compression + self._compression_bytes_threshold = ( + self.publisher_options.compression_bytes_threshold + ) + @classmethod def from_service_account_file( # type: ignore[override] cls, diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py index 3d071a189..a90615f02 100644 --- a/google/cloud/pubsub_v1/types.py +++ b/google/cloud/pubsub_v1/types.py @@ -173,6 +173,9 @@ class PublisherOptions(NamedTuple): "Timeout settings for message publishing by the client. It should be " "compatible with :class:`~.pubsub_v1.types.TimeoutType`." ) + enable_grpc_compression: bool = False + + compression_bytes_threshold: int = 240 # Define the type class and default values for flow control settings. diff --git a/google/pubsub_v1/services/publisher/client.py b/google/pubsub_v1/services/publisher/client.py index 1a92362c5..489aa3475 100644 --- a/google/pubsub_v1/services/publisher/client.py +++ b/google/pubsub_v1/services/publisher/client.py @@ -737,6 +737,7 @@ def publish( retry: OptionalRetry = gapic_v1.method.DEFAULT, timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), + compression: grpc.Compression = None, ) -> pubsub.PublishResponse: r"""Adds one or more messages to the topic. Returns ``NOT_FOUND`` if the topic does not exist. @@ -833,6 +834,7 @@ def sample_publish(): retry=retry, timeout=timeout, metadata=metadata, + compression=compression, ) # Done; return the response. diff --git a/samples/snippets/main.py b/samples/snippets/main.py new file mode 100644 index 000000000..c26053c51 --- /dev/null +++ b/samples/snippets/main.py @@ -0,0 +1,5 @@ +from google.cloud import pubsub_v1 +pub_sub_client = pubsub_v1.PublisherClient() + +if __name__ == '__main__': + result = pub_sub_client.publish("projects/annaco-python-lib-test/topics/annaco-python-lib-test-topic", bytes("Some message here!", "utf8")).result() \ No newline at end of file diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index e2c63556c..45f71d71b 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -216,6 +216,76 @@ def callback(future: pubsub_v1.publisher.futures.Future) -> None: # [END pubsub_publisher_batch_settings] +def publish_messages_with_default_compression_threshold(project_id: str, topic_id: str) -> None: + """Publishes messages to a Pub/Sub topic with grpc compression enabled.""" + # [START pubsub_publisher_compression_settings] + from concurrent import futures + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + # Configure publisher with compression + publisher = pubsub_v1.PublisherClient(publisher_options=pubsub_v1.types.PublisherOptions(enable_grpc_compression=True)) + topic_path = publisher.topic_path(project_id, topic_id) + publish_futures = [] + + # Resolve the publish future in a separate thread. + def callback(future: pubsub_v1.publisher.futures.Future) -> None: + message_id = future.result() + print(message_id) + + for n in range(1, 10): + data_str = f"Message number {n}" + # Data must be a bytestring + data = data_str.encode("utf-8") + publish_future = publisher.publish(topic_path, data) + # Non-blocking. Allow the publisher client to batch multiple messages. + publish_future.add_done_callback(callback) + publish_futures.append(publish_future) + + futures.wait(publish_futures, return_when=futures.ALL_COMPLETED) + + print(f"Published messages with compression settings to {topic_path}.") + # [END pubsub_publisher_compression_settings] + +def publish_messages_with_low_compression_threshold(project_id: str, topic_id: str) -> None: + """Publishes messages to a Pub/Sub topic with grpc compression enabled.""" + # [START pubsub_publisher_compression_settings] + from concurrent import futures + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + # Configure publisher with compression + publisher = pubsub_v1.PublisherClient(publisher_options=pubsub_v1.types.PublisherOptions(enable_grpc_compression=True, compression_bytes_threshold=0)) + topic_path = publisher.topic_path(project_id, topic_id) + publish_futures = [] + + # Resolve the publish future in a separate thread. + def callback(future: pubsub_v1.publisher.futures.Future) -> None: + message_id = future.result() + print(message_id) + + for n in range(1, 10): + data_str = f"Message number {n}" + # Data must be a bytestring + data = data_str.encode("utf-8") + publish_future = publisher.publish(topic_path, data) + # Non-blocking. Allow the publisher client to batch multiple messages. + publish_future.add_done_callback(callback) + publish_futures.append(publish_future) + + futures.wait(publish_futures, return_when=futures.ALL_COMPLETED) + + print(f"Published messages with compression settings to {topic_path}.") + # [END pubsub_publisher_compression_settings] + + + def publish_messages_with_flow_control_settings(project_id: str, topic_id: str) -> None: """Publishes messages to a Pub/Sub topic with flow control settings.""" # [START pubsub_publisher_flow_control] diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py index 0a6311308..f97605677 100644 --- a/samples/snippets/publisher_test.py +++ b/samples/snippets/publisher_test.py @@ -192,6 +192,24 @@ def test_publish_with_ordering_keys( assert f"Published messages with ordering keys to {topic_path}." in out +def test_publish_with_default_compression( + topic_path: str, capsys: CaptureFixture[str] +) -> None: + publisher.publish_messages_with_default_compression_threshold(PROJECT_ID, TOPIC_ID) + + out, _ = capsys.readouterr() + assert f"Published messages with compression settings to {topic_path}." in out + + +def test_publish_with_low_compression( + topic_path: str, capsys: CaptureFixture[str] +) -> None: + publisher.publish_messages_with_default_compression_threshold(PROJECT_ID, TOPIC_ID) + + out, _ = capsys.readouterr() + assert f"Published messages with compression settings to {topic_path}." in out + + def test_resume_publish_with_error_handler( topic_path: str, capsys: CaptureFixture[str] ) -> None: diff --git a/samples/snippets/sponge_log.xml -v b/samples/snippets/sponge_log.xml -v new file mode 100644 index 000000000..ce5f4a560 --- /dev/null +++ b/samples/snippets/sponge_log.xml -v @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/samples/snippets/sponge_log.xml -v -s b/samples/snippets/sponge_log.xml -v -s new file mode 100644 index 000000000..f79f07b64 --- /dev/null +++ b/samples/snippets/sponge_log.xml -v -s @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index 3fa94761c..3b1297365 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -61,6 +61,13 @@ def publisher_client() -> Generator[pubsub_v1.PublisherClient, None, None]: yield pubsub_v1.PublisherClient() +@pytest.fixture(scope="module") +def publisher_client_with_default_compression() -> Generator[pubsub_v1.PublisherClient, None, None]: + yield pubsub_v1.PublisherClient(publisher_options=pubsub_v1.types.PublisherOptions(enable_grpc_compression=True)) + +@pytest.fixture(scope="module") +def publisher_client_with_low_compression() -> Generator[pubsub_v1.PublisherClient, None, None]: + yield pubsub_v1.PublisherClient(publisher_options=pubsub_v1.types.PublisherOptions(enable_grpc_compression=True, compression_bytes_threshold=0)) @pytest.fixture(scope="module") def regional_publisher_client() -> Generator[pubsub_v1.PublisherClient, None, None]: @@ -974,6 +981,36 @@ def test_listen_for_errors( subscriber_client.delete_subscription(request={"subscription": subscription_path}) +def test_listen_for_errors_default_compression( + publisher_client_with_default_compression: pubsub_v1.PublisherClient, + topic: str, + subscription_async: str, + capsys: CaptureFixture[str], +) -> None: + _ = _publish_messages(publisher_client_with_default_compression, topic) + + subscriber.listen_for_errors(PROJECT_ID, SUBSCRIPTION_ASYNC, 5) + + out, _ = capsys.readouterr() + assert subscription_async in out + assert "threw an exception" in out + + +def test_listen_for_errors_low_compression( + publisher_client_with_low_compression: pubsub_v1.PublisherClient, + topic: str, + subscription_async: str, + capsys: CaptureFixture[str], +) -> None: + _ = _publish_messages(publisher_client_with_low_compression, topic) + + subscriber.listen_for_errors(PROJECT_ID, SUBSCRIPTION_ASYNC, 5) + + out, _ = capsys.readouterr() + assert subscription_async in out + assert "threw an exception" in out + + def test_receive_synchronously( subscriber_client: pubsub_v1.SubscriberClient, publisher_client: pubsub_v1.PublisherClient, diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py index 60658b4ce..2c8a6d65b 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -12,10 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. + import datetime import sys import threading import time +from typing import Sequence, Union # special case python < 3.8 if sys.version_info.major == 3 and sys.version_info.minor < 8: @@ -36,11 +38,12 @@ from google.cloud.pubsub_v1.publisher._batch import thread from google.cloud.pubsub_v1.publisher._batch.thread import Batch from google.pubsub_v1 import types as gapic_types +import grpc -def create_client(): +def create_client(client_options: Union[types.PublisherOptions, Sequence] = ()): creds = mock.Mock(spec=credentials.Credentials) - return publisher.Client(credentials=creds) + return publisher.Client(credentials=creds, publisher_options=client_options) def create_batch( @@ -49,6 +52,7 @@ def create_batch( commit_when_full=True, commit_retry=gapic_v1.method.DEFAULT, commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, + client_options: Union[types.PublisherOptions, Sequence] = (), **batch_settings ): """Return a batch object suitable for testing. @@ -63,13 +67,15 @@ def create_batch( for the batch commit call. commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`): The timeout to apply to the batch commit call. + client_options (Union[types.PublisherOptions, Sequence]): Arguments passed on + to the :class ``~.pubsub_v1.types.publisher.Client`` constructor. batch_settings (Mapping[str, str]): Arguments passed on to the :class:``~.pubsub_v1.types.BatchSettings`` constructor. Returns: ~.pubsub_v1.publisher.batch.thread.Batch: A batch object. """ - client = create_client() + client = create_client(client_options) settings = types.BatchSettings(**batch_settings) return Batch( client, @@ -96,6 +102,14 @@ def test_client(): assert batch.client is client +def test_client_with_compression(): + client = create_client(types.PublisherOptions(enable_grpc_compression=True)) + settings = types.BatchSettings() + batch = Batch(client, "topic_name", settings) + assert batch.client is client + assert batch.client._enable_grpc_compression + + def test_commit(): batch = create_batch() @@ -149,6 +163,125 @@ def test_blocking__commit(): ], retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, + compression=None, + ) + + # Establish that all of the futures are done, and that they have the + # expected values. + assert futures[0].done() + assert futures[0].result() == "a" + assert futures[1].done() + assert futures[1].result() == "b" + + +def test_blocking__commit_with_compression_at_zero_bytes(): + batch = create_batch( + client_options=types.PublisherOptions( + enable_grpc_compression=True, compression_bytes_threshold=0 + ) + ) + futures = ( + batch.publish({"data": b"This is my message."}), + batch.publish({"data": b"This is another message."}), + ) + + # Set up the underlying API publish method to return a PublishResponse. + publish_response = gapic_types.PublishResponse(message_ids=["a", "b"]) + patch = mock.patch.object( + type(batch.client), "_gapic_publish", return_value=publish_response + ) + with patch as publish: + batch._commit() + + # Establish that the underlying API call was made with expected + # arguments. + publish.assert_called_once_with( + topic="topic_name", + messages=[ + gapic_types.PubsubMessage(data=b"This is my message."), + gapic_types.PubsubMessage(data=b"This is another message."), + ], + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, + compression=grpc.Compression.Gzip, + ) + + # Establish that all of the futures are done, and that they have the + # expected values. + assert futures[0].done() + assert futures[0].result() == "a" + assert futures[1].done() + assert futures[1].result() == "b" + + +def test_blocking__commit_with_disabled_compression_at_zero_bytes(): + batch = create_batch( + client_options=types.PublisherOptions( + enable_grpc_compression=False, compression_bytes_threshold=0 + ) + ) + futures = ( + batch.publish({"data": b"This is my message."}), + batch.publish({"data": b"This is another message."}), + ) + + # Set up the underlying API publish method to return a PublishResponse. + publish_response = gapic_types.PublishResponse(message_ids=["a", "b"]) + patch = mock.patch.object( + type(batch.client), "_gapic_publish", return_value=publish_response + ) + with patch as publish: + batch._commit() + + # Establish that the underlying API call was made with expected + # arguments. + publish.assert_called_once_with( + topic="topic_name", + messages=[ + gapic_types.PubsubMessage(data=b"This is my message."), + gapic_types.PubsubMessage(data=b"This is another message."), + ], + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, + compression=None, + ) + + # Establish that all of the futures are done, and that they have the + # expected values. + assert futures[0].done() + assert futures[0].result() == "a" + assert futures[1].done() + assert futures[1].result() == "b" + + +def test_blocking__commit_with_compression_at_default(): + batch = create_batch( + client_options=types.PublisherOptions(enable_grpc_compression=True) + ) + futures = ( + batch.publish({"data": b"This is my message."}), + batch.publish({"data": b"This is another message."}), + ) + + # Set up the underlying API publish method to return a PublishResponse. + publish_response = gapic_types.PublishResponse(message_ids=["a", "b"]) + patch = mock.patch.object( + type(batch.client), "_gapic_publish", return_value=publish_response + ) + with patch as publish: + batch._commit() + + # Establish that the underlying API call was made with expected + # arguments. + publish.assert_called_once_with( + topic="topic_name", + messages=[ + gapic_types.PubsubMessage(data=b"This is my message."), + gapic_types.PubsubMessage(data=b"This is another message."), + ], + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, + compression=None, ) # Establish that all of the futures are done, and that they have the @@ -178,6 +311,7 @@ def test_blocking__commit_custom_retry(): messages=[gapic_types.PubsubMessage(data=b"This is my message.")], retry=mock.sentinel.custom_retry, timeout=gapic_v1.method.DEFAULT, + compression=None, ) @@ -200,6 +334,7 @@ def test_blocking__commit_custom_timeout(): messages=[gapic_types.PubsubMessage(data=b"This is my message.")], retry=gapic_v1.method.DEFAULT, timeout=mock.sentinel.custom_timeout, + compression=None, ) @@ -207,7 +342,9 @@ def test_client_api_publish_not_blocking_additional_publish_calls(): batch = create_batch(max_messages=1) api_publish_called = threading.Event() - def api_publish_delay(topic="", messages=(), retry=None, timeout=None): + def api_publish_delay( + topic="", messages=(), retry=None, timeout=None, compression=None + ): api_publish_called.set() time.sleep(1.0) message_ids = [str(i) for i in range(len(messages))] diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 6c68c3943..fd928f329 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -399,7 +399,9 @@ def test_publish_custom_retry_overrides_configured_retry(creds): client.publish(topic, b"hello!", retry=mock.sentinel.custom_retry) fake_sequencer.publish.assert_called_once_with( - mock.ANY, retry=mock.sentinel.custom_retry, timeout=mock.ANY + mock.ANY, + retry=mock.sentinel.custom_retry, + timeout=mock.ANY, ) message = fake_sequencer.publish.call_args.args[0] assert message.data == b"hello!" @@ -418,7 +420,9 @@ def test_publish_custom_timeout_overrides_configured_timeout(creds): client.publish(topic, b"hello!", timeout=mock.sentinel.custom_timeout) fake_sequencer.publish.assert_called_once_with( - mock.ANY, retry=mock.ANY, timeout=mock.sentinel.custom_timeout + mock.ANY, + retry=mock.ANY, + timeout=mock.sentinel.custom_timeout, ) message = fake_sequencer.publish.call_args.args[0] assert message.data == b"hello!"