diff --git a/awscrt/mqtt5.py b/awscrt/mqtt5.py index 7c5e4f31f..3c7cb9418 100644 --- a/awscrt/mqtt5.py +++ b/awscrt/mqtt5.py @@ -1222,14 +1222,38 @@ def set_done(self, exception=None): self._done_future.set_exception(exception) +class PublishAcknowledgementControlHandle: + """Opaque handle for manually controlling the publish acknowledgement for a received QoS 1 PUBLISH. + + Obtained by calling acquire_publish_acknowledgement_control() within the on_publish_callback_fn callback. + Pass to Client.invoke_publish_acknowledgement() to send the publish acknowledgement to the broker. + """ + + def __init__(self, control_id: int): + self._control_id = control_id + + @dataclass class PublishReceivedData: """Dataclass containing data related to a Publish Received Callback Args: publish_packet (PublishPacket): Data model of an `MQTT5 PUBLISH `_ packet. + acquire_publish_acknowledgement_control (Callable): For QoS 1 messages only: call this function within the + on_publish_callback_fn callback to take manual control of the publish acknowledgement for this message, + preventing the client from automatically sending a publish acknowledgement. Returns a + :class:`PublishAcknowledgementControlHandle` that can be passed to + Client.invoke_publish_acknowledgement() to send the publish acknowledgement to the broker. + + Important: This function must be called within the on_publish_callback_fn callback. Calling it after the + callback returns will raise a RuntimeError. This function may only be called once per received PUBLISH; + calling it a second time will also raise a RuntimeError. If this function is not called, the client will + automatically send a publish acknowledgement for QoS 1 messages when the callback returns. + + For QoS 0 messages, this field is None. """ publish_packet: PublishPacket = None + acquire_publish_acknowledgement_control: Callable = None @dataclass @@ -1434,9 +1458,11 @@ def _on_publish( correlation_data, subscription_identifiers_tuples, content_type, - user_properties_tuples): + user_properties_tuples, + puback_control_id): if self._on_publish_cb is None: - return + # Indicates that manual puback control was not taken and puback should be invoked automatically. + return False publish_packet = PublishPacket() publish_packet.topic = topic @@ -1468,9 +1494,40 @@ def _on_publish( publish_packet.content_type = content_type publish_packet.user_properties = _init_user_properties(user_properties_tuples) - self._on_publish_cb(PublishReceivedData(publish_packet=publish_packet)) + # For QoS 1 messages, set up the acquire_publish_acknowledgement_control callable. + # The native side has already called aws_mqtt5_client_acquire_publish_acknowledgement and passed us the + # puback_control_id. We wrap it in an opaque capsule handle and provide a callable that + # returns that handle. The callable may only be called once; calling it marks the publish + # acknowledgement as "taken" so the native side will not auto-invoke it after this callback returns. + puback_taken = False + callback_active = True + acquire_publish_acknowledgement_control = None + + if puback_control_id != 0: + def acquire_publish_acknowledgement_control(): + nonlocal puback_taken + if puback_taken: + raise RuntimeError( + "acquire_publish_acknowledgement_control() may only be called once per received PUBLISH.") + if not callback_active: + raise RuntimeError( + "acquire_publish_acknowledgement_control() must be called within the on_publish_callback_fn callback.") + puback_taken = True + return PublishAcknowledgementControlHandle(puback_control_id) + + # Create PublishReceivedData with the acquire_publish_acknowledgement_control callable (or None for QoS 0) + publish_data = PublishReceivedData( + publish_packet=publish_packet, + acquire_publish_acknowledgement_control=acquire_publish_acknowledgement_control + ) - return + self._on_publish_cb(publish_data) + + callback_active = False + # Return True if the user called acquire_publish_acknowledgement_control(), signalling to the native side + # that it should NOT auto-invoke the publish acknowledgement (the user is responsible for calling + # invoke_publish_acknowledgement). + return puback_taken def _on_lifecycle_stopped(self): if self._on_lifecycle_stopped_cb: @@ -1957,6 +2014,27 @@ def get_stats(self): result = _awscrt.mqtt5_client_get_stats(self._binding) return OperationStatisticsData(result[0], result[1], result[2], result[3]) + def invoke_publish_acknowledgement( + self, publish_acknowledgement_control_handle: 'PublishAcknowledgementControlHandle'): + """Sends a publish acknowledgement for a QoS 1 PUBLISH that was previously acquired for manual control. + + To use manual publish acknowledgement control, call acquire_publish_acknowledgement_control() within + the on_publish_callback_fn callback to obtain a :class:`PublishAcknowledgementControlHandle`. Then call + this method to send the publish acknowledgement. + + Args: + publish_acknowledgement_control_handle (PublishAcknowledgementControlHandle): An opaque handle obtained + from acquire_publish_acknowledgement_control() within PublishReceivedData. + + Raises: + Exception: If the native client returns an error when invoking the publish acknowledgement. + """ + + _awscrt.mqtt5_client_invoke_publish_acknowledgement( + self._binding, + publish_acknowledgement_control_handle._control_id + ) + def new_connection(self, on_connection_interrupted=None, on_connection_resumed=None, on_connection_success=None, on_connection_failure=None, on_connection_closed=None): from awscrt.mqtt import Connection diff --git a/crt/aws-c-mqtt b/crt/aws-c-mqtt index 41b6a7d6d..3c2ceee52 160000 --- a/crt/aws-c-mqtt +++ b/crt/aws-c-mqtt @@ -1 +1 @@ -Subproject commit 41b6a7d6d566a56eff69743df66c077d56a80c9d +Subproject commit 3c2ceee52b66db42228053a4fb55210c8f8433a0 diff --git a/source/module.c b/source/module.c index 0b752e03d..1228ed7ae 100644 --- a/source/module.c +++ b/source/module.c @@ -810,6 +810,7 @@ static PyMethodDef s_module_methods[] = { AWS_PY_METHOD_DEF(mqtt5_client_subscribe, METH_VARARGS), AWS_PY_METHOD_DEF(mqtt5_client_unsubscribe, METH_VARARGS), AWS_PY_METHOD_DEF(mqtt5_client_get_stats, METH_VARARGS), + AWS_PY_METHOD_DEF(mqtt5_client_invoke_publish_acknowledgement, METH_VARARGS), AWS_PY_METHOD_DEF(mqtt5_ws_handshake_transform_complete, METH_VARARGS), /* MQTT Request Response Client */ diff --git a/source/mqtt5_client.c b/source/mqtt5_client.c index 243af6a0e..ae4ccf0c8 100644 --- a/source/mqtt5_client.c +++ b/source/mqtt5_client.c @@ -234,10 +234,26 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu PyObject *result = NULL; PyObject *subscription_identifier_list = NULL; PyObject *user_properties_list = NULL; + PyObject *puback_control_id_py = NULL; size_t subscription_identifier_count = publish_packet->subscription_identifier_count; size_t user_property_count = publish_packet->user_property_count; + /* For QoS 1 messages, take manual control of the publish acknowledgement immediately. + * This gives us a pub_ack_control_id that we pass to Python as an opaque integer. + * Python's _on_publish will return True if the user called acquire_publish_acknowledgement_control(), + * in which case the user is responsible for calling invoke_publish_acknowledgement() later. + * If _on_publish returns False/None, we auto-invoke the publish acknowledgement here. */ + uint64_t puback_control_id = 0; + if (publish_packet->qos == AWS_MQTT5_QOS_AT_LEAST_ONCE) { + puback_control_id = aws_mqtt5_client_acquire_publish_acknowledgement(client->native, publish_packet); + puback_control_id_py = PyLong_FromUnsignedLongLong((unsigned long long)puback_control_id); + if (!puback_control_id_py) { + PyErr_WriteUnraisable(PyErr_Occurred()); + goto cleanup; + } + } + /* Create list of uint32_t subscription identifier tuples */ subscription_identifier_list = PyList_New(subscription_identifier_count); if (!subscription_identifier_list) { @@ -261,7 +277,7 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu result = PyObject_CallMethod( client->client_core, "_on_publish", - "(y#iOs#OiOIOHs#y#Os#O)", + "(y#iOs#OiOIOHs#y#Os#OK)", /* y */ publish_packet->payload.ptr, /* # */ publish_packet->payload.len, /* i */ (int)publish_packet->qos, @@ -284,15 +300,29 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu /* O */ subscription_identifier_count > 0 ? subscription_identifier_list : Py_None, /* s */ publish_packet->content_type ? publish_packet->content_type->ptr : NULL, /* # */ publish_packet->content_type ? publish_packet->content_type->len : 0, - /* O */ user_property_count > 0 ? user_properties_list : Py_None); + /* O */ user_property_count > 0 ? user_properties_list : Py_None, + /* K */ (unsigned long long)puback_control_id); + if (!result) { PyErr_WriteUnraisable(PyErr_Occurred()); + /* On error, auto-invoke the publish acknowledgement so it is not lost */ + if (puback_control_id != 0) { + aws_mqtt5_client_invoke_publish_acknowledgement(client->native, puback_control_id, NULL); + } + goto cleanup; + } + + /* If _on_publish returned False/None, the user did not take control of the publish acknowledgement. + * Auto-invoke it now. */ + if (puback_control_id != 0 && !PyObject_IsTrue(result)) { + aws_mqtt5_client_invoke_publish_acknowledgement(client->native, puback_control_id, NULL); } cleanup: Py_XDECREF(result); Py_XDECREF(subscription_identifier_list); Py_XDECREF(user_properties_list); + Py_XDECREF(puback_control_id_py); PyGILState_Release(state); } @@ -1683,6 +1713,41 @@ PyObject *aws_py_mqtt5_client_publish(PyObject *self, PyObject *args) { return NULL; } +/******************************************************************************* + * Invoke Publish Acknowledgement + ******************************************************************************/ + +PyObject *aws_py_mqtt5_client_invoke_publish_acknowledgement(PyObject *self, PyObject *args) { + (void)self; + bool success = true; + + PyObject *impl_capsule; + unsigned long long control_id_ull = 0; + + if (!PyArg_ParseTuple( + args, + "OK", + /* O */ &impl_capsule, + /* K */ &control_id_ull)) { + return NULL; + } + + struct mqtt5_client_binding *client = PyCapsule_GetPointer(impl_capsule, s_capsule_name_mqtt5_client); + if (!client) { + return NULL; + } + + if (aws_mqtt5_client_invoke_publish_acknowledgement(client->native, (uint64_t)control_id_ull, NULL)) { + PyErr_SetAwsLastError(); + success = false; + } + + if (success) { + Py_RETURN_NONE; + } + return NULL; +} + /******************************************************************************* * Subscribe ******************************************************************************/ diff --git a/source/mqtt5_client.h b/source/mqtt5_client.h index 46c135f82..d05930177 100644 --- a/source/mqtt5_client.h +++ b/source/mqtt5_client.h @@ -14,6 +14,7 @@ PyObject *aws_py_mqtt5_client_publish(PyObject *self, PyObject *args); PyObject *aws_py_mqtt5_client_subscribe(PyObject *self, PyObject *args); PyObject *aws_py_mqtt5_client_unsubscribe(PyObject *self, PyObject *args); PyObject *aws_py_mqtt5_client_get_stats(PyObject *self, PyObject *args); +PyObject *aws_py_mqtt5_client_invoke_publish_acknowledgement(PyObject *self, PyObject *args); PyObject *aws_py_mqtt5_ws_handshake_transform_complete(PyObject *self, PyObject *args); diff --git a/test/test_mqtt5.py b/test/test_mqtt5.py index 4d8f7d0e2..7da0e7580 100644 --- a/test/test_mqtt5.py +++ b/test/test_mqtt5.py @@ -1574,6 +1574,352 @@ def _test_qos1_happy_path(self): def test_qos1_happy_path(self): test_retry_wrapper(self._test_qos1_happy_path) + # ============================================================== + # MANUAL PUBLISH ACKNOWLEDGEMENT TEST CASES + # ============================================================== + + # Manual publish acknowledgement hold test: hold publish acknowledgement and verify broker re-delivers the message + def _test_manual_publish_acknowledgement_hold(self): + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") + input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") + + client_id = create_client_id() + topic_filter = "test/MQTT5_ManualPuback_Python_" + client_id + payload = str(uuid.uuid4()) + payload_bytes = payload.encode("utf-8") + + PUBACK_HOLD_TIMEOUT = 60.0 + + future_first_delivery = Future() + future_redelivery = Future() + puback_handle_holder = [None] # mutable container so the closure can write to it + + def on_publish_received(publish_received_data: mqtt5.PublishReceivedData): + received_payload = publish_received_data.publish_packet.payload + if not future_first_delivery.done(): + # First delivery: acquire manual publish acknowledgement control to hold the publish acknowledgement + puback_handle_holder[0] = publish_received_data.acquire_publish_acknowledgement_control() + future_first_delivery.set_result(received_payload) + elif not future_redelivery.done(): + # Second delivery: broker re-sent because no publish acknowledgement was received + future_redelivery.set_result(received_payload) + + tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path( + input_cert, + input_key + ) + client_options = mqtt5.ClientOptions( + host_name=input_host_name, + port=8883 + ) + client_options.connect_options = mqtt5.ConnectPacket(client_id=client_id) + client_options.tls_ctx = io.ClientTlsContext(tls_ctx_options) + + callbacks = Mqtt5TestCallbacks() + callbacks.on_publish_received = on_publish_received + + client = self._create_client(client_options=client_options, callbacks=callbacks) + client.start() + callbacks.future_connection_success.result(TIMEOUT) + + # Subscribe to the topic with QoS 1 + subscriptions = [mqtt5.Subscription(topic_filter=topic_filter, qos=mqtt5.QoS.AT_LEAST_ONCE)] + subscribe_packet = mqtt5.SubscribePacket(subscriptions=subscriptions) + subscribe_future = client.subscribe(subscribe_packet=subscribe_packet) + suback_packet = subscribe_future.result(TIMEOUT) + self.assertIsInstance(suback_packet, mqtt5.SubackPacket) + + # Publish a QoS 1 message with a unique UUID payload + publish_packet = mqtt5.PublishPacket( + payload=payload, + topic=topic_filter, + qos=mqtt5.QoS.AT_LEAST_ONCE) + publish_future = client.publish(publish_packet=publish_packet) + publish_completion_data = publish_future.result(TIMEOUT) + self.assertIsInstance(publish_completion_data.puback, mqtt5.PubackPacket) + + # Wait for the first delivery and confirm publish acknowledgement was held + first_payload = future_first_delivery.result(TIMEOUT) + self.assertEqual(first_payload, payload_bytes) + self.assertIsNotNone( + puback_handle_holder[0], + "acquire_publish_acknowledgement_control() should have returned a handle") + + # Wait up to 60 seconds for the broker to re-deliver the message (no publish acknowledgement was sent) + redelivered_payload = future_redelivery.result(PUBACK_HOLD_TIMEOUT) + self.assertEqual(redelivered_payload, payload_bytes, + "Re-delivered payload should match the original UUID payload") + + # Release the held publish acknowledgement now that we've confirmed re-delivery + client.invoke_publish_acknowledgement(puback_handle_holder[0]) + + client.stop() + callbacks.future_stopped.result(TIMEOUT) + + def test_manual_publish_acknowledgement_hold(self): + test_retry_wrapper(self._test_manual_publish_acknowledgement_hold) + + # Manual publish acknowledgement invoke test: acquire and immediately + # invoke publish acknowledgement, verify no re-delivery + def _test_manual_publish_acknowledgement_invoke(self): + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") + input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") + + client_id = create_client_id() + topic_filter = "test/MQTT5_ManualPuback_Python_" + client_id + payload = str(uuid.uuid4()) + payload_bytes = payload.encode("utf-8") + + NO_REDELIVERY_WAIT = 60.0 + + future_first_delivery = Future() + future_unexpected_redelivery = Future() + puback_handle_holder = [None] # mutable container so the closure can write to it + + def on_publish_received(publish_received_data: mqtt5.PublishReceivedData): + received_payload = publish_received_data.publish_packet.payload + if not future_first_delivery.done(): + # First delivery: acquire manual publish acknowledgement control, then immediately invoke it + puback_handle_holder[0] = publish_received_data.acquire_publish_acknowledgement_control() + future_first_delivery.set_result(received_payload) + elif received_payload == payload_bytes and not future_unexpected_redelivery.done(): + # A second delivery of the same payload means the broker re-sent, this should NOT happen + future_unexpected_redelivery.set_result(received_payload) + + tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path( + input_cert, + input_key + ) + client_options = mqtt5.ClientOptions( + host_name=input_host_name, + port=8883 + ) + client_options.connect_options = mqtt5.ConnectPacket(client_id=client_id) + client_options.tls_ctx = io.ClientTlsContext(tls_ctx_options) + + callbacks = Mqtt5TestCallbacks() + callbacks.on_publish_received = on_publish_received + + client = self._create_client(client_options=client_options, callbacks=callbacks) + client.start() + callbacks.future_connection_success.result(TIMEOUT) + + # Subscribe to the topic with QoS 1 + subscriptions = [mqtt5.Subscription(topic_filter=topic_filter, qos=mqtt5.QoS.AT_LEAST_ONCE)] + subscribe_packet = mqtt5.SubscribePacket(subscriptions=subscriptions) + subscribe_future = client.subscribe(subscribe_packet=subscribe_packet) + suback_packet = subscribe_future.result(TIMEOUT) + self.assertIsInstance(suback_packet, mqtt5.SubackPacket) + + # Publish a QoS 1 message with a unique UUID payload + publish_packet = mqtt5.PublishPacket( + payload=payload, + topic=topic_filter, + qos=mqtt5.QoS.AT_LEAST_ONCE) + publish_future = client.publish(publish_packet=publish_packet) + publish_completion_data = publish_future.result(TIMEOUT) + self.assertIsInstance(publish_completion_data.puback, mqtt5.PubackPacket) + + # Wait for the first delivery and confirm publish acknowledgement handle was acquired + first_payload = future_first_delivery.result(TIMEOUT) + self.assertEqual(first_payload, payload_bytes) + self.assertIsNotNone(puback_handle_holder[0], + "acquire_publish_acknowledgement_control() should have returned a handle") + + # Immediately invoke the publish acknowledgement using the acquired handle + client.invoke_publish_acknowledgement(puback_handle_holder[0]) + + # Wait 60 seconds and confirm the broker does NOT re-deliver the message + # (because we sent the publish acknowledgement via invoke_publish_acknowledgement) + redelivered = future_unexpected_redelivery.done() or \ + (not future_unexpected_redelivery.done() and + self._wait_for_future_timeout(future_unexpected_redelivery, NO_REDELIVERY_WAIT)) + self.assertFalse(redelivered, + "Broker should NOT re-deliver the message after invoke_publish_acknowledgement() was called") + + client.stop() + callbacks.future_stopped.result(TIMEOUT) + + def _wait_for_future_timeout(self, future, timeout_sec): + """Returns True if the future completed within timeout_sec, False if it timed out.""" + try: + future.result(timeout_sec) + return True + except Exception: + return False + + def test_manual_publish_acknowledgement_invoke(self): + test_retry_wrapper(self._test_manual_publish_acknowledgement_invoke) + + # Manual publish acknowledgement double-call test: calling + # acquire_publish_acknowledgement_control() twice raises RuntimeError + def _test_manual_publish_acknowledgement_acquire_double_call_raises(self): + """Verify that calling acquire_publish_acknowledgement_control() twice on the same QoS 1 PUBLISH raises RuntimeError.""" + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") + input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") + + client_id = create_client_id() + topic_filter = "test/MQTT5_Binding_Python_" + client_id + payload = str(uuid.uuid4()) + + future_result = Future() + + def on_publish_received(publish_received_data: mqtt5.PublishReceivedData): + try: + # First call should succeed + handle = publish_received_data.acquire_publish_acknowledgement_control() + # Second call on the same message should raise RuntimeError + try: + publish_received_data.acquire_publish_acknowledgement_control() + future_result.set_result("no_error") # Should not reach here + except RuntimeError: + future_result.set_result("double_call_raised") + except Exception as e: + future_result.set_result(f"unexpected_error: {e}") + # Release the handle we acquired + # (handle is valid, invoke it to clean up) + except Exception as e: + future_result.set_result(f"first_call_failed: {e}") + + tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(input_cert, input_key) + client_options = mqtt5.ClientOptions(host_name=input_host_name, port=8883) + client_options.connect_options = mqtt5.ConnectPacket(client_id=client_id) + client_options.tls_ctx = io.ClientTlsContext(tls_ctx_options) + + callbacks = Mqtt5TestCallbacks() + callbacks.on_publish_received = on_publish_received + + client = self._create_client(client_options=client_options, callbacks=callbacks) + client.start() + callbacks.future_connection_success.result(TIMEOUT) + + subscriptions = [mqtt5.Subscription(topic_filter=topic_filter, qos=mqtt5.QoS.AT_LEAST_ONCE)] + subscribe_future = client.subscribe(mqtt5.SubscribePacket(subscriptions=subscriptions)) + subscribe_future.result(TIMEOUT) + + publish_future = client.publish(mqtt5.PublishPacket( + payload=payload, topic=topic_filter, qos=mqtt5.QoS.AT_LEAST_ONCE)) + publish_future.result(TIMEOUT) + + result = future_result.result(TIMEOUT) + self.assertEqual(result, "double_call_raised", + f"Expected RuntimeError on double-call, got: {result}") + + client.stop() + callbacks.future_stopped.result(TIMEOUT) + + def test_manual_publish_acknowledgement_acquire_double_call_raises(self): + test_retry_wrapper(self._test_manual_publish_acknowledgement_acquire_double_call_raises) + + # Manual publish acknowledgement post-callback test: calling + # acquire_publish_acknowledgement_control() after callback returns raises + # RuntimeError + def _test_manual_publish_acknowledgement_acquire_post_callback_raises(self): + """Verify that calling acquire_publish_acknowledgement_control() after the callback has returned raises RuntimeError.""" + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") + input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") + + client_id = create_client_id() + topic_filter = "test/MQTT5_Binding_Python_" + client_id + payload = str(uuid.uuid4()) + + future_callback_done = Future() + saved_acquire_fn_holder = [None] + + def on_publish_received(publish_received_data: mqtt5.PublishReceivedData): + # Save the callable but do NOT call it within the callback + saved_acquire_fn_holder[0] = publish_received_data.acquire_publish_acknowledgement_control + future_callback_done.set_result(True) + + tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(input_cert, input_key) + client_options = mqtt5.ClientOptions(host_name=input_host_name, port=8883) + client_options.connect_options = mqtt5.ConnectPacket(client_id=client_id) + client_options.tls_ctx = io.ClientTlsContext(tls_ctx_options) + + callbacks = Mqtt5TestCallbacks() + callbacks.on_publish_received = on_publish_received + + client = self._create_client(client_options=client_options, callbacks=callbacks) + client.start() + callbacks.future_connection_success.result(TIMEOUT) + + subscriptions = [mqtt5.Subscription(topic_filter=topic_filter, qos=mqtt5.QoS.AT_LEAST_ONCE)] + subscribe_future = client.subscribe(mqtt5.SubscribePacket(subscriptions=subscriptions)) + subscribe_future.result(TIMEOUT) + + publish_future = client.publish(mqtt5.PublishPacket( + payload=payload, topic=topic_filter, qos=mqtt5.QoS.AT_LEAST_ONCE)) + publish_future.result(TIMEOUT) + + # Wait for the callback to complete + future_callback_done.result(TIMEOUT) + + # Now call acquire_publish_acknowledgement_control() after the callback + # has returned, this should raise RuntimeError + acquire_fn = saved_acquire_fn_holder[0] + self.assertIsNotNone(acquire_fn, "acquire_publish_acknowledgement_control should have been saved") + with self.assertRaises(RuntimeError): + acquire_fn() + + client.stop() + callbacks.future_stopped.result(TIMEOUT) + + def test_manual_publish_acknowledgement_acquire_post_callback_raises(self): + test_retry_wrapper(self._test_manual_publish_acknowledgement_acquire_post_callback_raises) + + # Manual publish acknowledgement QoS 0 test: acquire_publish_acknowledgement_control is None for QoS 0 messages + def _test_manual_publish_acknowledgement_qos0_acquire_is_none(self): + """Verify that acquire_publish_acknowledgement_control is None for QoS 0 messages.""" + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") + input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") + + client_id = create_client_id() + topic_filter = "test/MQTT5_Binding_Python_" + client_id + payload = str(uuid.uuid4()) + + future_acquire_value = Future() + + def on_publish_received(publish_received_data: mqtt5.PublishReceivedData): + # For QoS 0, acquire_publish_acknowledgement_control should be None + future_acquire_value.set_result(publish_received_data.acquire_publish_acknowledgement_control) + + tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(input_cert, input_key) + client_options = mqtt5.ClientOptions(host_name=input_host_name, port=8883) + client_options.connect_options = mqtt5.ConnectPacket(client_id=client_id) + client_options.tls_ctx = io.ClientTlsContext(tls_ctx_options) + + callbacks = Mqtt5TestCallbacks() + callbacks.on_publish_received = on_publish_received + + client = self._create_client(client_options=client_options, callbacks=callbacks) + client.start() + callbacks.future_connection_success.result(TIMEOUT) + + # Subscribe with QoS 1 so the broker delivers at QoS 0 (publish at QoS 0) + subscriptions = [mqtt5.Subscription(topic_filter=topic_filter, qos=mqtt5.QoS.AT_LEAST_ONCE)] + subscribe_future = client.subscribe(mqtt5.SubscribePacket(subscriptions=subscriptions)) + subscribe_future.result(TIMEOUT) + + # Publish at QoS 0, there's no PUBACK involved + publish_future = client.publish(mqtt5.PublishPacket( + payload=payload, topic=topic_filter, qos=mqtt5.QoS.AT_MOST_ONCE)) + publish_future.result(TIMEOUT) + + acquire_value = future_acquire_value.result(TIMEOUT) + self.assertIsNone(acquire_value, + "acquire_publish_acknowledgement_control should be None for QoS 0 messages") + + client.stop() + callbacks.future_stopped.result(TIMEOUT) + + def test_manual_publish_acknowledgement_qos0_acquire_is_none(self): + test_retry_wrapper(self._test_manual_publish_acknowledgement_qos0_acquire_is_none) + # ============================================================== # RETAIN TEST CASES # ==============================================================