Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
790759d
bind mqtt5 manual puback feature
sbSteveK Feb 20, 2026
78052e9
Merge branch 'main' into manual-puback
sbSteveK Feb 20, 2026
b45f3be
remove completion options
sbSteveK Mar 2, 2026
1c380bf
remove completion options and lint
sbSteveK Mar 2, 2026
d872ccb
comments
sbSteveK Mar 9, 2026
7114645
lint
sbSteveK Mar 9, 2026
26b5584
unify api language
sbSteveK Mar 9, 2026
a406590
latest submodules
sbSteveK Mar 11, 2026
a7d62c1
main submodules
sbSteveK Mar 11, 2026
db9e100
main aws-c-mqtt
sbSteveK Mar 11, 2026
3741824
bind mqtt5 manual puback feature
sbSteveK Feb 20, 2026
8cb2568
merge with main
sbSteveK Mar 11, 2026
417d77c
point to latest aws-c-mqtt
sbSteveK Mar 11, 2026
b7245db
lint
sbSteveK Mar 11, 2026
94daf2d
restore removal of completion options
sbSteveK Mar 11, 2026
82526b2
more remnants of completion options
sbSteveK Mar 11, 2026
13ce0f7
update documentation and remove completion future from invoke puback
sbSteveK Mar 12, 2026
89cf2cb
more documentation changes
sbSteveK Mar 12, 2026
31b1ea2
prevent double use and use after we are out of the publish received c…
sbSteveK Mar 12, 2026
6f41ace
manual puback tests
sbSteveK Mar 12, 2026
63f662f
more manual puback tests
sbSteveK Mar 13, 2026
56dda25
add more comments
sbSteveK Mar 13, 2026
11da313
bindings always acquires manual puback then sends if user doesn't tak…
sbSteveK Mar 17, 2026
f3d463a
handle calling the callback outside of the publish received callback
sbSteveK Mar 17, 2026
0b725f6
lint
sbSteveK Mar 17, 2026
f4585bb
aws-c-mqtt v0.15.1
sbSteveK Mar 17, 2026
ec83b7c
Use a PubackControlHandle python dataclass to simplify mem management…
sbSteveK Mar 17, 2026
41d66e6
aws-c-mqtt v0.15.2
sbSteveK Mar 18, 2026
42ccd01
puback -> publish acknowledgement
sbSteveK Mar 18, 2026
1e55e8c
lint
sbSteveK Mar 18, 2026
60eb5de
more lint
sbSteveK Mar 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 82 additions & 4 deletions awscrt/mqtt5.py
Original file line number Diff line number Diff line change
Expand Up @@ -1222,14 +1222,38 @@ def set_done(self, exception=None):
self._done_future.set_exception(exception)


class PublishAcknowledgementControlHandle:
"""Opaque handle for manually controlling the publish acknowledgement for a received QoS 1 PUBLISH.

Obtained by calling acquire_publish_acknowledgement_control() within the on_publish_callback_fn callback.
Pass to Client.invoke_publish_acknowledgement() to send the publish acknowledgement to the broker.
"""

def __init__(self, control_id: int):
self._control_id = control_id


@dataclass
class PublishReceivedData:
"""Dataclass containing data related to a Publish Received Callback

Args:
publish_packet (PublishPacket): Data model of an `MQTT5 PUBLISH <https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901100>`_ packet.
acquire_publish_acknowledgement_control (Callable): For QoS 1 messages only: call this function within the
on_publish_callback_fn callback to take manual control of the publish acknowledgement for this message,
preventing the client from automatically sending a publish acknowledgement. Returns a
:class:`PublishAcknowledgementControlHandle` that can be passed to
Client.invoke_publish_acknowledgement() to send the publish acknowledgement to the broker.

Important: This function must be called within the on_publish_callback_fn callback. Calling it after the
callback returns will raise a RuntimeError. This function may only be called once per received PUBLISH;
calling it a second time will also raise a RuntimeError. If this function is not called, the client will
automatically send a publish acknowledgement for QoS 1 messages when the callback returns.

For QoS 0 messages, this field is None.
"""
publish_packet: PublishPacket = None
acquire_publish_acknowledgement_control: Callable = None


@dataclass
Expand Down Expand Up @@ -1434,9 +1458,11 @@ def _on_publish(
correlation_data,
subscription_identifiers_tuples,
content_type,
user_properties_tuples):
user_properties_tuples,
puback_control_id):
if self._on_publish_cb is None:
return
# Indicates that manual puback control was not taken and puback should be invoked automatically.
return False

publish_packet = PublishPacket()
publish_packet.topic = topic
Expand Down Expand Up @@ -1468,9 +1494,40 @@ def _on_publish(
publish_packet.content_type = content_type
publish_packet.user_properties = _init_user_properties(user_properties_tuples)

self._on_publish_cb(PublishReceivedData(publish_packet=publish_packet))
# For QoS 1 messages, set up the acquire_publish_acknowledgement_control callable.
# The native side has already called aws_mqtt5_client_acquire_publish_acknowledgement and passed us the
# puback_control_id. We wrap it in an opaque capsule handle and provide a callable that
# returns that handle. The callable may only be called once; calling it marks the publish
# acknowledgement as "taken" so the native side will not auto-invoke it after this callback returns.
puback_taken = False
callback_active = True
acquire_publish_acknowledgement_control = None

if puback_control_id != 0:
def acquire_publish_acknowledgement_control():
nonlocal puback_taken
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need synchronization support to be thread-safe here? Even though we're in the client's callback, it's still possible for this value to cross thread boundaries and this to be invoked concurrently.

if puback_taken:
raise RuntimeError(
"acquire_publish_acknowledgement_control() may only be called once per received PUBLISH.")
if not callback_active:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need nonlocal declaration too?

raise RuntimeError(
"acquire_publish_acknowledgement_control() must be called within the on_publish_callback_fn callback.")
puback_taken = True
return PublishAcknowledgementControlHandle(puback_control_id)

# Create PublishReceivedData with the acquire_publish_acknowledgement_control callable (or None for QoS 0)
publish_data = PublishReceivedData(
publish_packet=publish_packet,
acquire_publish_acknowledgement_control=acquire_publish_acknowledgement_control
)

return
self._on_publish_cb(publish_data)

callback_active = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we were going to call acquire_publish_acknowledgement_control here to prevent calls to it from succeeding after we exit the callback?

# Return True if the user called acquire_publish_acknowledgement_control(), signalling to the native side
# that it should NOT auto-invoke the publish acknowledgement (the user is responsible for calling
# invoke_publish_acknowledgement).
return puback_taken

def _on_lifecycle_stopped(self):
if self._on_lifecycle_stopped_cb:
Expand Down Expand Up @@ -1957,6 +2014,27 @@ def get_stats(self):
result = _awscrt.mqtt5_client_get_stats(self._binding)
return OperationStatisticsData(result[0], result[1], result[2], result[3])

def invoke_publish_acknowledgement(
self, publish_acknowledgement_control_handle: 'PublishAcknowledgementControlHandle'):
"""Sends a publish acknowledgement for a QoS 1 PUBLISH that was previously acquired for manual control.

To use manual publish acknowledgement control, call acquire_publish_acknowledgement_control() within
the on_publish_callback_fn callback to obtain a :class:`PublishAcknowledgementControlHandle`. Then call
this method to send the publish acknowledgement.

Args:
publish_acknowledgement_control_handle (PublishAcknowledgementControlHandle): An opaque handle obtained
from acquire_publish_acknowledgement_control() within PublishReceivedData.

Raises:
Exception: If the native client returns an error when invoking the publish acknowledgement.
"""

_awscrt.mqtt5_client_invoke_publish_acknowledgement(
self._binding,
publish_acknowledgement_control_handle._control_id
)

def new_connection(self, on_connection_interrupted=None, on_connection_resumed=None,
on_connection_success=None, on_connection_failure=None, on_connection_closed=None):
from awscrt.mqtt import Connection
Expand Down
1 change: 1 addition & 0 deletions source/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ static PyMethodDef s_module_methods[] = {
AWS_PY_METHOD_DEF(mqtt5_client_subscribe, METH_VARARGS),
AWS_PY_METHOD_DEF(mqtt5_client_unsubscribe, METH_VARARGS),
AWS_PY_METHOD_DEF(mqtt5_client_get_stats, METH_VARARGS),
AWS_PY_METHOD_DEF(mqtt5_client_invoke_publish_acknowledgement, METH_VARARGS),
AWS_PY_METHOD_DEF(mqtt5_ws_handshake_transform_complete, METH_VARARGS),

/* MQTT Request Response Client */
Expand Down
69 changes: 67 additions & 2 deletions source/mqtt5_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,26 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu
PyObject *result = NULL;
PyObject *subscription_identifier_list = NULL;
PyObject *user_properties_list = NULL;
PyObject *puback_control_id_py = NULL;

size_t subscription_identifier_count = publish_packet->subscription_identifier_count;
size_t user_property_count = publish_packet->user_property_count;

/* For QoS 1 messages, take manual control of the publish acknowledgement immediately.
* This gives us a pub_ack_control_id that we pass to Python as an opaque integer.
* Python's _on_publish will return True if the user called acquire_publish_acknowledgement_control(),
* in which case the user is responsible for calling invoke_publish_acknowledgement() later.
* If _on_publish returns False/None, we auto-invoke the publish acknowledgement here. */
uint64_t puback_control_id = 0;
if (publish_packet->qos == AWS_MQTT5_QOS_AT_LEAST_ONCE) {
puback_control_id = aws_mqtt5_client_acquire_publish_acknowledgement(client->native, publish_packet);
puback_control_id_py = PyLong_FromUnsignedLongLong((unsigned long long)puback_control_id);
if (!puback_control_id_py) {
PyErr_WriteUnraisable(PyErr_Occurred());
goto cleanup;
}
}

/* Create list of uint32_t subscription identifier tuples */
subscription_identifier_list = PyList_New(subscription_identifier_count);
if (!subscription_identifier_list) {
Expand All @@ -261,7 +277,7 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu
result = PyObject_CallMethod(
client->client_core,
"_on_publish",
"(y#iOs#OiOIOHs#y#Os#O)",
"(y#iOs#OiOIOHs#y#Os#OK)",
/* y */ publish_packet->payload.ptr,
/* # */ publish_packet->payload.len,
/* i */ (int)publish_packet->qos,
Expand All @@ -284,15 +300,29 @@ static void s_on_publish_received(const struct aws_mqtt5_packet_publish_view *pu
/* O */ subscription_identifier_count > 0 ? subscription_identifier_list : Py_None,
/* s */ publish_packet->content_type ? publish_packet->content_type->ptr : NULL,
/* # */ publish_packet->content_type ? publish_packet->content_type->len : 0,
/* O */ user_property_count > 0 ? user_properties_list : Py_None);
/* O */ user_property_count > 0 ? user_properties_list : Py_None,
/* K */ (unsigned long long)puback_control_id);

if (!result) {
PyErr_WriteUnraisable(PyErr_Occurred());
/* On error, auto-invoke the publish acknowledgement so it is not lost */
if (puback_control_id != 0) {
aws_mqtt5_client_invoke_publish_acknowledgement(client->native, puback_control_id, NULL);
}
goto cleanup;
}

/* If _on_publish returned False/None, the user did not take control of the publish acknowledgement.
* Auto-invoke it now. */
if (puback_control_id != 0 && !PyObject_IsTrue(result)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't you combine these two (remove the top one) by changing this to

if (puback_control_id != 0 && (!result || !PyObject_IsTrue(result)) {
   ...
}

aws_mqtt5_client_invoke_publish_acknowledgement(client->native, puback_control_id, NULL);
}

cleanup:
Py_XDECREF(result);
Py_XDECREF(subscription_identifier_list);
Py_XDECREF(user_properties_list);
Py_XDECREF(puback_control_id_py);
PyGILState_Release(state);
}

Expand Down Expand Up @@ -1683,6 +1713,41 @@ PyObject *aws_py_mqtt5_client_publish(PyObject *self, PyObject *args) {
return NULL;
}

/*******************************************************************************
* Invoke Publish Acknowledgement
******************************************************************************/

PyObject *aws_py_mqtt5_client_invoke_publish_acknowledgement(PyObject *self, PyObject *args) {
(void)self;
bool success = true;

PyObject *impl_capsule;
unsigned long long control_id_ull = 0;

if (!PyArg_ParseTuple(
args,
"OK",
/* O */ &impl_capsule,
/* K */ &control_id_ull)) {
return NULL;
}

struct mqtt5_client_binding *client = PyCapsule_GetPointer(impl_capsule, s_capsule_name_mqtt5_client);
if (!client) {
return NULL;
}

if (aws_mqtt5_client_invoke_publish_acknowledgement(client->native, (uint64_t)control_id_ull, NULL)) {
PyErr_SetAwsLastError();
success = false;
}

if (success) {
Py_RETURN_NONE;
}
return NULL;
}

/*******************************************************************************
* Subscribe
******************************************************************************/
Expand Down
1 change: 1 addition & 0 deletions source/mqtt5_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ PyObject *aws_py_mqtt5_client_publish(PyObject *self, PyObject *args);
PyObject *aws_py_mqtt5_client_subscribe(PyObject *self, PyObject *args);
PyObject *aws_py_mqtt5_client_unsubscribe(PyObject *self, PyObject *args);
PyObject *aws_py_mqtt5_client_get_stats(PyObject *self, PyObject *args);
PyObject *aws_py_mqtt5_client_invoke_publish_acknowledgement(PyObject *self, PyObject *args);

PyObject *aws_py_mqtt5_ws_handshake_transform_complete(PyObject *self, PyObject *args);

Expand Down
Loading
Loading