-
Notifications
You must be signed in to change notification settings - Fork 52
Manual PUBACK Control #721
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
790759d
78052e9
b45f3be
1c380bf
d872ccb
7114645
26b5584
a406590
a7d62c1
db9e100
3741824
8cb2568
417d77c
b7245db
94daf2d
82526b2
13ce0f7
89cf2cb
31b1ea2
6f41ace
63f662f
56dda25
11da313
f3d463a
0b725f6
f4585bb
ec83b7c
41d66e6
42ccd01
1e55e8c
60eb5de
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1222,14 +1222,38 @@ def set_done(self, exception=None): | |
| self._done_future.set_exception(exception) | ||
|
|
||
|
|
||
| class PublishAcknowledgementControlHandle: | ||
| """Opaque handle for manually controlling the publish acknowledgement for a received QoS 1 PUBLISH. | ||
|
|
||
| Obtained by calling acquire_publish_acknowledgement_control() within the on_publish_callback_fn callback. | ||
| Pass to Client.invoke_publish_acknowledgement() to send the publish acknowledgement to the broker. | ||
| """ | ||
|
|
||
| def __init__(self, control_id: int): | ||
| self._control_id = control_id | ||
|
|
||
|
|
||
| @dataclass | ||
| class PublishReceivedData: | ||
| """Dataclass containing data related to a Publish Received Callback | ||
|
|
||
| Args: | ||
| publish_packet (PublishPacket): Data model of an `MQTT5 PUBLISH <https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901100>`_ packet. | ||
| acquire_publish_acknowledgement_control (Callable): For QoS 1 messages only: call this function within the | ||
| on_publish_callback_fn callback to take manual control of the publish acknowledgement for this message, | ||
| preventing the client from automatically sending a publish acknowledgement. Returns a | ||
| :class:`PublishAcknowledgementControlHandle` that can be passed to | ||
| Client.invoke_publish_acknowledgement() to send the publish acknowledgement to the broker. | ||
|
|
||
| Important: This function must be called within the on_publish_callback_fn callback. Calling it after the | ||
| callback returns will raise a RuntimeError. This function may only be called once per received PUBLISH; | ||
| calling it a second time will also raise a RuntimeError. If this function is not called, the client will | ||
| automatically send a publish acknowledgement for QoS 1 messages when the callback returns. | ||
|
|
||
| For QoS 0 messages, this field is None. | ||
| """ | ||
| publish_packet: PublishPacket = None | ||
| acquire_publish_acknowledgement_control: Callable = None | ||
|
|
||
|
|
||
| @dataclass | ||
|
|
@@ -1434,9 +1458,11 @@ def _on_publish( | |
| correlation_data, | ||
| subscription_identifiers_tuples, | ||
| content_type, | ||
| user_properties_tuples): | ||
| user_properties_tuples, | ||
| puback_control_id): | ||
| if self._on_publish_cb is None: | ||
| return | ||
| # Indicates that manual puback control was not taken and puback should be invoked automatically. | ||
| return False | ||
|
|
||
| publish_packet = PublishPacket() | ||
| publish_packet.topic = topic | ||
|
|
@@ -1468,9 +1494,40 @@ def _on_publish( | |
| publish_packet.content_type = content_type | ||
| publish_packet.user_properties = _init_user_properties(user_properties_tuples) | ||
|
|
||
| self._on_publish_cb(PublishReceivedData(publish_packet=publish_packet)) | ||
| # For QoS 1 messages, set up the acquire_publish_acknowledgement_control callable. | ||
| # The native side has already called aws_mqtt5_client_acquire_publish_acknowledgement and passed us the | ||
| # puback_control_id. We wrap it in an opaque capsule handle and provide a callable that | ||
| # returns that handle. The callable may only be called once; calling it marks the publish | ||
| # acknowledgement as "taken" so the native side will not auto-invoke it after this callback returns. | ||
| puback_taken = False | ||
| callback_active = True | ||
| acquire_publish_acknowledgement_control = None | ||
|
|
||
| if puback_control_id != 0: | ||
| def acquire_publish_acknowledgement_control(): | ||
| nonlocal puback_taken | ||
| if puback_taken: | ||
| raise RuntimeError( | ||
| "acquire_publish_acknowledgement_control() may only be called once per received PUBLISH.") | ||
| if not callback_active: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does this need nonlocal declaration too? |
||
| raise RuntimeError( | ||
| "acquire_publish_acknowledgement_control() must be called within the on_publish_callback_fn callback.") | ||
| puback_taken = True | ||
| return PublishAcknowledgementControlHandle(puback_control_id) | ||
|
|
||
| # Create PublishReceivedData with the acquire_publish_acknowledgement_control callable (or None for QoS 0) | ||
| publish_data = PublishReceivedData( | ||
| publish_packet=publish_packet, | ||
| acquire_publish_acknowledgement_control=acquire_publish_acknowledgement_control | ||
| ) | ||
|
|
||
| return | ||
| self._on_publish_cb(publish_data) | ||
|
|
||
| callback_active = False | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought we were going to call |
||
| # Return True if the user called acquire_publish_acknowledgement_control(), signalling to the native side | ||
| # that it should NOT auto-invoke the publish acknowledgement (the user is responsible for calling | ||
| # invoke_publish_acknowledgement). | ||
| return puback_taken | ||
|
|
||
| def _on_lifecycle_stopped(self): | ||
| if self._on_lifecycle_stopped_cb: | ||
|
|
@@ -1957,6 +2014,27 @@ def get_stats(self): | |
| result = _awscrt.mqtt5_client_get_stats(self._binding) | ||
| return OperationStatisticsData(result[0], result[1], result[2], result[3]) | ||
|
|
||
| def invoke_publish_acknowledgement( | ||
| self, publish_acknowledgement_control_handle: 'PublishAcknowledgementControlHandle'): | ||
| """Sends a publish acknowledgement for a QoS 1 PUBLISH that was previously acquired for manual control. | ||
|
|
||
| To use manual publish acknowledgement control, call acquire_publish_acknowledgement_control() within | ||
| the on_publish_callback_fn callback to obtain a :class:`PublishAcknowledgementControlHandle`. Then call | ||
| this method to send the publish acknowledgement. | ||
|
|
||
| Args: | ||
| publish_acknowledgement_control_handle (PublishAcknowledgementControlHandle): An opaque handle obtained | ||
| from acquire_publish_acknowledgement_control() within PublishReceivedData. | ||
|
|
||
| Raises: | ||
| Exception: If the native client returns an error when invoking the publish acknowledgement. | ||
| """ | ||
|
|
||
| _awscrt.mqtt5_client_invoke_publish_acknowledgement( | ||
| self._binding, | ||
| publish_acknowledgement_control_handle._control_id | ||
| ) | ||
|
|
||
| def new_connection(self, on_connection_interrupted=None, on_connection_resumed=None, | ||
| on_connection_success=None, on_connection_failure=None, on_connection_closed=None): | ||
| from awscrt.mqtt import Connection | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -234,10 +234,26 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu | |
| PyObject *result = NULL; | ||
| PyObject *subscription_identifier_list = NULL; | ||
| PyObject *user_properties_list = NULL; | ||
| PyObject *puback_control_id_py = NULL; | ||
|
|
||
| size_t subscription_identifier_count = publish_packet->subscription_identifier_count; | ||
| size_t user_property_count = publish_packet->user_property_count; | ||
|
|
||
| /* For QoS 1 messages, take manual control of the publish acknowledgement immediately. | ||
| * This gives us a pub_ack_control_id that we pass to Python as an opaque integer. | ||
| * Python's _on_publish will return True if the user called acquire_publish_acknowledgement_control(), | ||
| * in which case the user is responsible for calling invoke_publish_acknowledgement() later. | ||
| * If _on_publish returns False/None, we auto-invoke the publish acknowledgement here. */ | ||
| uint64_t puback_control_id = 0; | ||
| if (publish_packet->qos == AWS_MQTT5_QOS_AT_LEAST_ONCE) { | ||
| puback_control_id = aws_mqtt5_client_acquire_publish_acknowledgement(client->native, publish_packet); | ||
| puback_control_id_py = PyLong_FromUnsignedLongLong((unsigned long long)puback_control_id); | ||
| if (!puback_control_id_py) { | ||
| PyErr_WriteUnraisable(PyErr_Occurred()); | ||
| goto cleanup; | ||
| } | ||
| } | ||
|
|
||
| /* Create list of uint32_t subscription identifier tuples */ | ||
| subscription_identifier_list = PyList_New(subscription_identifier_count); | ||
| if (!subscription_identifier_list) { | ||
|
|
@@ -261,7 +277,7 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu | |
| result = PyObject_CallMethod( | ||
| client->client_core, | ||
| "_on_publish", | ||
| "(y#iOs#OiOIOHs#y#Os#O)", | ||
| "(y#iOs#OiOIOHs#y#Os#OK)", | ||
| /* y */ publish_packet->payload.ptr, | ||
| /* # */ publish_packet->payload.len, | ||
| /* i */ (int)publish_packet->qos, | ||
|
|
@@ -284,15 +300,29 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu | |
| /* O */ subscription_identifier_count > 0 ? subscription_identifier_list : Py_None, | ||
| /* s */ publish_packet->content_type ? publish_packet->content_type->ptr : NULL, | ||
| /* # */ publish_packet->content_type ? publish_packet->content_type->len : 0, | ||
| /* O */ user_property_count > 0 ? user_properties_list : Py_None); | ||
| /* O */ user_property_count > 0 ? user_properties_list : Py_None, | ||
| /* K */ (unsigned long long)puback_control_id); | ||
|
|
||
| if (!result) { | ||
| PyErr_WriteUnraisable(PyErr_Occurred()); | ||
| /* On error, auto-invoke the publish acknowledgement so it is not lost */ | ||
| if (puback_control_id != 0) { | ||
| aws_mqtt5_client_invoke_publish_acknowledgement(client->native, puback_control_id, NULL); | ||
| } | ||
| goto cleanup; | ||
| } | ||
|
|
||
| /* If _on_publish returned False/None, the user did not take control of the publish acknowledgement. | ||
| * Auto-invoke it now. */ | ||
| if (puback_control_id != 0 && !PyObject_IsTrue(result)) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can't you combine these two (remove the top one) by changing this to |
||
| aws_mqtt5_client_invoke_publish_acknowledgement(client->native, puback_control_id, NULL); | ||
| } | ||
|
|
||
| cleanup: | ||
| Py_XDECREF(result); | ||
| Py_XDECREF(subscription_identifier_list); | ||
| Py_XDECREF(user_properties_list); | ||
| Py_XDECREF(puback_control_id_py); | ||
| PyGILState_Release(state); | ||
| } | ||
|
|
||
|
|
@@ -1683,6 +1713,41 @@ PyObject *aws_py_mqtt5_client_publish(PyObject *self, PyObject *args) { | |
| return NULL; | ||
| } | ||
|
|
||
| /******************************************************************************* | ||
| * Invoke Publish Acknowledgement | ||
| ******************************************************************************/ | ||
|
|
||
| PyObject *aws_py_mqtt5_client_invoke_publish_acknowledgement(PyObject *self, PyObject *args) { | ||
| (void)self; | ||
| bool success = true; | ||
|
|
||
| PyObject *impl_capsule; | ||
| unsigned long long control_id_ull = 0; | ||
|
|
||
| if (!PyArg_ParseTuple( | ||
| args, | ||
| "OK", | ||
| /* O */ &impl_capsule, | ||
| /* K */ &control_id_ull)) { | ||
| return NULL; | ||
| } | ||
|
|
||
| struct mqtt5_client_binding *client = PyCapsule_GetPointer(impl_capsule, s_capsule_name_mqtt5_client); | ||
| if (!client) { | ||
| return NULL; | ||
| } | ||
|
|
||
| if (aws_mqtt5_client_invoke_publish_acknowledgement(client->native, (uint64_t)control_id_ull, NULL)) { | ||
| PyErr_SetAwsLastError(); | ||
| success = false; | ||
| } | ||
|
|
||
| if (success) { | ||
| Py_RETURN_NONE; | ||
| } | ||
| return NULL; | ||
| } | ||
|
|
||
| /******************************************************************************* | ||
| * Subscribe | ||
| ******************************************************************************/ | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need synchronization support to be thread-safe here? Even though we're in the client's callback, it's still possible for this value to cross thread boundaries and this to be invoked concurrently.