From 38fae029be8435c9deeffc7fd32ff568e324f1c4 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 19 Dec 2025 19:58:03 +0800 Subject: [PATCH 1/6] Support encryption context on Message --- pulsar/__init__.py | 125 +++++++++++++++++++++++++++++++++++++++++++ src/message.cc | 17 +++++- tests/pulsar_test.py | 42 ++++++++++++--- 3 files changed, 177 insertions(+), 7 deletions(-) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 543cd0d..7278bde 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -166,6 +166,122 @@ def wrap(cls, msg_id: _pulsar.MessageId): self._msg_id = msg_id return self + +class EncryptionKey: + """ + The key used for encryption. + """ + + def __init__(self, key: _pulsar.EncryptionKey): + """ + Create EncryptionKey instance. + + Parameters + ---------- + key: _pulsar.EncryptionKey + The underlying EncryptionKey instance from the C extension. + """ + self._key = key + + @property + def key(self) -> str: + """ + Returns the key, which is usually the key file's name. + """ + return self._key.key + + @property + def value(self) -> bytes: + """ + Returns the value, which is usually the key bytes used for encryption. + """ + return self._key.value() + + @property + def metadata(self) -> dict: + """ + Returns the metadata associated with the key. + """ + return self._key.metadata + + def __str__(self) -> str: + return f"EncryptionKey(key={self.key}, value_len={len(self.value)}, metadata={self.metadata})" + + def __repr__(self) -> str: + return self.__str__() + + +class EncryptionContext: + """ + It contains encryption and compression information in it using which application can decrypt + consumed message with encrypted-payload. + """ + + def __init__(self, context: _pulsar.EncryptionContext): + """ + Create EncryptionContext instance. + + Parameters + ---------- + key: _pulsar.EncryptionContext + The underlying EncryptionContext instance from the C extension. + """ + self._context = context + + def keys(self) -> List[EncryptionKey]: + """ + Returns all EncryptionKey instances when performing encryption. + """ + keys = self._context.keys() + return [EncryptionKey(key) for key in keys] + + def param(self) -> bytes: + """ + Returns the encryption param bytes. + """ + return self._context.param() + + def algorithm(self) -> str: + """ + Returns the encryption algorithm. + """ + return self._context.algorithm() + + def compression_type(self) -> CompressionType: + """ + Returns the compression type of the message. + """ + return self._context.compression_type() + + def uncompressed_message_size(self) -> int: + """ + Returns the uncompressed message size or 0 if the compression type is NONE. + """ + return self._context.uncompressed_message_size() + + def batch_size(self) -> int: + """ + Returns the number of messages in the batch or -1 if the message is not batched. + """ + return self._context.batch_size() + + def is_decryption_failed(self) -> bool: + """ + Returns whether decryption has failed for this message. + """ + return self._context.is_decryption_failed() + + def __str__(self) -> str: + return f"EncryptionContext(algorithm={self.algorithm()}, " \ + f"compression_type={self.compression_type().name}, " \ + f"uncompressed_message_size={self.uncompressed_message_size()}, " \ + f"is_decryption_failed={self.is_decryption_failed()}, " \ + f"keys=[{', '.join(str(key) for key in self.keys())}])" + + def __repr__(self) -> str: + return self.__str__() + + class Message: """ Message objects are returned by a consumer, either by calling `receive` or @@ -250,6 +366,15 @@ def producer_name(self) -> str: """ return self._message.producer_name() + def encryption_context(self) -> EncryptionContext: + """ + Get the encryption context for this message or None if it's not encrypted. + + It should be noted that the result should not be accessed after the current Message instance is deleted. + """ + context = self._message.encryption_context() + return None if context is None else EncryptionContext(context) + @staticmethod def _wrap(_message): self = Message() diff --git a/src/message.cc b/src/message.cc index e18861a..f3247e6 100644 --- a/src/message.cc +++ b/src/message.cc @@ -86,6 +86,20 @@ void export_message(py::module_& m) { }) .def_static("deserialize", &MessageId::deserialize); + class_(m, "EncryptionKey") + .def_readonly("key", &EncryptionKey::key) + .def("value", [](const EncryptionKey& key) { return bytes(key.value); }) + .def_readonly("metadata", &EncryptionKey::metadata); + + class_(m, "EncryptionContext") + .def("keys", &EncryptionContext::keys) + .def("param", [](const EncryptionContext& context) { return bytes(context.param()); }) + .def("algorithm", &EncryptionContext::algorithm, return_value_policy::copy) + .def("compression_type", &EncryptionContext::compressionType) + .def("uncompressed_message_size", &EncryptionContext::uncompressedMessageSize) + .def("batch_size", &EncryptionContext::batchSize) + .def("is_decryption_failed", &EncryptionContext::isDecryptionFailed); + class_(m, "Message") .def(init<>()) .def("properties", &Message::getProperties) @@ -106,7 +120,8 @@ void export_message(py::module_& m) { .def("redelivery_count", &Message::getRedeliveryCount) .def("int_schema_version", &Message::getLongSchemaVersion) .def("schema_version", &Message::getSchemaVersion, return_value_policy::copy) - .def("producer_name", &Message::getProducerName, return_value_policy::copy); + .def("producer_name", &Message::getProducerName, return_value_policy::copy) + .def("encryption_context", &Message::getEncryptionContext, return_value_policy::reference); MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const std::string& payload, uint32_t batchSize) = &MessageBatch::parseFrom; diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index 3603d84..b5f0699 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -158,7 +158,7 @@ def send_callback(producer, msg): def test_producer_send(self): client = Client(self.serviceUrl) - topic = "test_producer_send" + topic = f"test_producer_send_{time.time()}" producer = client.create_producer(topic) consumer = client.subscribe(topic, "sub-name") msg_id = producer.send(b"hello") @@ -167,6 +167,7 @@ def test_producer_send(self): consumer.acknowledge(msg) print("receive from {}".format(msg.message_id())) self.assertEqual(msg_id, msg.message_id()) + self.assertIsNone(msg.encryption_context()) client.close() def test_producer_access_mode_exclusive(self): @@ -489,15 +490,37 @@ def test_encryption_failure(self): client = Client(self.serviceUrl) topic = "my-python-test-end-to-end-encryption-failure-" + str(time.time()) producer = client.create_producer( - topic=topic, encryption_key="client-rsa.pem", crypto_key_reader=crypto_key_reader + topic=topic, encryption_key="client-rsa.pem", crypto_key_reader=crypto_key_reader, + compression_type=CompressionType.LZ4 ) producer.send(b"msg-0") + enc_key = None + def verify_encryption_context(context: pulsar.EncryptionContext, failed: bool, batch_size: int): + nonlocal enc_key + keys = context.keys() + self.assertEqual(len(keys), 1) + key = keys[0] + self.assertEqual(key.key, "client-rsa.pem") + self.assertTrue(len(key.value) > 0) + if enc_key is None: + enc_key = key.value + else: + self.assertEqual(key.value, enc_key) + self.assertEqual(key.metadata, {}) + self.assertTrue(len(context.param()) > 0) + self.assertEqual(context.algorithm(), "") + self.assertEqual(context.compression_type(), CompressionType.LZ4) + self.assertEqual(context.uncompressed_message_size(), len(b"msg-0")) + self.assertEqual(context.batch_size(), batch_size) + self.assertEqual(context.is_decryption_failed(), failed) + def verify_next_message(value: bytes): consumer = client.subscribe(topic, subscription, crypto_key_reader=crypto_key_reader) msg = consumer.receive(3000) self.assertEqual(msg.data(), value) + verify_encryption_context(msg.encryption_context(), False, -1) consumer.acknowledge(msg) consumer.close() @@ -520,6 +543,15 @@ def verify_next_message(value: bytes): producer.send(b"msg-2") verify_next_message(b"msg-2") # msg-1 is skipped since the crypto failure action is DISCARD + producer.send_async(b"msg-3", None) + producer.send_async(b"msg-4", None) + producer.flush() + + def verify_undecrypted_message(msg: pulsar.Message, i: int): + self.assertNotEqual(msg.data(), f"msg-{i}".encode()) + self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}") + batch_size = 2 if i >=3 else -1 + verify_encryption_context(msg.encryption_context(), True, batch_size) # Encrypted messages will be consumed since the crypto failure action is CONSUME consumer = client.subscribe(topic, 'another-sub', @@ -527,15 +559,13 @@ def verify_next_message(value: bytes): crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME) for i in range(3): msg = consumer.receive(3000) - self.assertNotEqual(msg.data(), f"msg-{i}".encode()) - self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}") + verify_undecrypted_message(msg, i) reader = client.create_reader(topic, MessageId.earliest, crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME) for i in range(3): msg = reader.read_next(3000) - self.assertNotEqual(msg.data(), f"msg-{i}".encode()) - self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}") + verify_undecrypted_message(msg, i) client.close() From a9e78126d0e9a3d293daca8521bd9263d815d587 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 19 Dec 2025 19:59:28 +0800 Subject: [PATCH 2/6] revert unnecessary change --- tests/pulsar_test.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index b5f0699..d8fe0d7 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -158,7 +158,7 @@ def send_callback(producer, msg): def test_producer_send(self): client = Client(self.serviceUrl) - topic = f"test_producer_send_{time.time()}" + topic = "test_producer_send" producer = client.create_producer(topic) consumer = client.subscribe(topic, "sub-name") msg_id = producer.send(b"hello") @@ -550,8 +550,7 @@ def verify_next_message(value: bytes): def verify_undecrypted_message(msg: pulsar.Message, i: int): self.assertNotEqual(msg.data(), f"msg-{i}".encode()) self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}") - batch_size = 2 if i >=3 else -1 - verify_encryption_context(msg.encryption_context(), True, batch_size) + verify_encryption_context(msg.encryption_context(), True, 2 if i >= 3 else -1) # Encrypted messages will be consumed since the crypto failure action is CONSUME consumer = client.subscribe(topic, 'another-sub', From 315ae48c2a34e973e35afe4206b5b2914b33bf4d Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 23 Dec 2025 15:05:01 +0800 Subject: [PATCH 3/6] fix tests --- tests/pulsar_test.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index d8fe0d7..fc38e39 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -495,23 +495,20 @@ def test_encryption_failure(self): ) producer.send(b"msg-0") - enc_key = None def verify_encryption_context(context: pulsar.EncryptionContext, failed: bool, batch_size: int): - nonlocal enc_key keys = context.keys() self.assertEqual(len(keys), 1) key = keys[0] self.assertEqual(key.key, "client-rsa.pem") self.assertTrue(len(key.value) > 0) - if enc_key is None: - enc_key = key.value - else: - self.assertEqual(key.value, enc_key) self.assertEqual(key.metadata, {}) self.assertTrue(len(context.param()) > 0) self.assertEqual(context.algorithm(), "") self.assertEqual(context.compression_type(), CompressionType.LZ4) - self.assertEqual(context.uncompressed_message_size(), len(b"msg-0")) + if batch_size == 1: + self.assertEqual(context.uncompressed_message_size(), len(b"msg-0")) + else: + self.assertGreater(context.uncompressed_message_size(), len(b"msg-0")) self.assertEqual(context.batch_size(), batch_size) self.assertEqual(context.is_decryption_failed(), failed) @@ -543,6 +540,16 @@ def verify_next_message(value: bytes): producer.send(b"msg-2") verify_next_message(b"msg-2") # msg-1 is skipped since the crypto failure action is DISCARD + producer.close() + + # send batched messages + producer = client.create_producer( + topic=topic, + encryption_key="client-rsa.pem", + crypto_key_reader=crypto_key_reader, + compression_type=CompressionType.LZ4, + batching_enabled=True, + ) producer.send_async(b"msg-3", None) producer.send_async(b"msg-4", None) producer.flush() @@ -553,16 +560,18 @@ def verify_undecrypted_message(msg: pulsar.Message, i: int): verify_encryption_context(msg.encryption_context(), True, 2 if i >= 3 else -1) # Encrypted messages will be consumed since the crypto failure action is CONSUME + # Only 4 messages can be received because msg-3 and msg-4 are sent in batch and they are delivered + # as a single message when decryption fails. consumer = client.subscribe(topic, 'another-sub', initial_position=InitialPosition.Earliest, crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME) - for i in range(3): + for i in range(4): msg = consumer.receive(3000) verify_undecrypted_message(msg, i) reader = client.create_reader(topic, MessageId.earliest, crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME) - for i in range(3): + for i in range(4): msg = reader.read_next(3000) verify_undecrypted_message(msg, i) From 4f158f697b79419ab7b371e560452cdd3d313c52 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 23 Dec 2025 15:07:23 +0800 Subject: [PATCH 4/6] fix docs --- pulsar/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 7278bde..2375d16 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -223,7 +223,7 @@ def __init__(self, context: _pulsar.EncryptionContext): Parameters ---------- - key: _pulsar.EncryptionContext + context: _pulsar.EncryptionContext The underlying EncryptionContext instance from the C extension. """ self._context = context @@ -366,7 +366,7 @@ def producer_name(self) -> str: """ return self._message.producer_name() - def encryption_context(self) -> EncryptionContext: + def encryption_context(self) -> EncryptionContext | None: """ Get the encryption context for this message or None if it's not encrypted. From 3a05b8ecb8c9acc834b1fd1871ac93806bd09171 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 23 Dec 2025 15:15:24 +0800 Subject: [PATCH 5/6] fix tests --- tests/pulsar_test.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index fc38e39..ea4fd06 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -495,7 +495,9 @@ def test_encryption_failure(self): ) producer.send(b"msg-0") - def verify_encryption_context(context: pulsar.EncryptionContext, failed: bool, batch_size: int): + def verify_encryption_context(context: pulsar.EncryptionContext | None, failed: bool, batch_size: int): + if context is None: + self.fail("Encryption context is None") keys = context.keys() self.assertEqual(len(keys), 1) key = keys[0] @@ -505,7 +507,7 @@ def verify_encryption_context(context: pulsar.EncryptionContext, failed: bool, b self.assertTrue(len(context.param()) > 0) self.assertEqual(context.algorithm(), "") self.assertEqual(context.compression_type(), CompressionType.LZ4) - if batch_size == 1: + if batch_size == -1: self.assertEqual(context.uncompressed_message_size(), len(b"msg-0")) else: self.assertGreater(context.uncompressed_message_size(), len(b"msg-0")) From 2cfb4130ced9a9473c68d175a04f95892e3c9966 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 23 Dec 2025 15:17:55 +0800 Subject: [PATCH 6/6] improve tests --- tests/pulsar_test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index ea4fd06..b7f38ed 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -502,9 +502,9 @@ def verify_encryption_context(context: pulsar.EncryptionContext | None, failed: self.assertEqual(len(keys), 1) key = keys[0] self.assertEqual(key.key, "client-rsa.pem") - self.assertTrue(len(key.value) > 0) + self.assertGreater(len(key.value), 0) self.assertEqual(key.metadata, {}) - self.assertTrue(len(context.param()) > 0) + self.assertGreater(len(context.param()), 0) self.assertEqual(context.algorithm(), "") self.assertEqual(context.compression_type(), CompressionType.LZ4) if batch_size == -1: @@ -558,7 +558,7 @@ def verify_next_message(value: bytes): def verify_undecrypted_message(msg: pulsar.Message, i: int): self.assertNotEqual(msg.data(), f"msg-{i}".encode()) - self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}") + self.assertGreater(len(msg.data()), 5, f"msg.data() is {msg.data()}") verify_encryption_context(msg.encryption_context(), True, 2 if i >= 3 else -1) # Encrypted messages will be consumed since the crypto failure action is CONSUME