1212from roborock .roborock_message import RoborockMessage
1313
1414from .channel import Channel
15+ from .pending import PendingRpcs
1516
1617_LOGGER = logging .getLogger (__name__ )
1718
@@ -31,10 +32,9 @@ def __init__(self, mqtt_session: MqttSession, duid: str, local_key: str, rriot:
3132 self ._mqtt_params = mqtt_params
3233
3334 # RPC support
34- self ._waiting_queue : dict [int , asyncio . Future [ RoborockMessage ]] = {}
35+ self ._pending_rpcs : PendingRpcs [int , RoborockMessage ] = PendingRpcs ()
3536 self ._decoder = create_mqtt_decoder (local_key )
3637 self ._encoder = create_mqtt_encoder (local_key )
37- self ._queue_lock = asyncio .Lock ()
3838 self ._mqtt_unsub : Callable [[], None ] | None = None
3939
4040 @property
@@ -89,11 +89,7 @@ async def _resolve_future_with_lock(self, message: RoborockMessage) -> None:
8989 if (request_id := message .get_request_id ()) is None :
9090 _LOGGER .debug ("Received message with no request_id" )
9191 return
92- async with self ._queue_lock :
93- if (future := self ._waiting_queue .pop (request_id , None )) is not None :
94- future .set_result (message )
95- else :
96- _LOGGER .debug ("Received message with no waiting handler: request_id=%s" , request_id )
92+ await self ._pending_rpcs .resolve (request_id , message )
9793
9894 async def send_message (self , message : RoborockMessage , timeout : float = 10.0 ) -> RoborockMessage :
9995 """Send a command message and wait for the response message.
@@ -107,11 +103,7 @@ async def send_message(self, message: RoborockMessage, timeout: float = 10.0) ->
107103 _LOGGER .exception ("Error getting request_id from message: %s" , err )
108104 raise RoborockException (f"Invalid message format, Message must have a request_id: { err } " ) from err
109105
110- future : asyncio .Future [RoborockMessage ] = asyncio .Future ()
111- async with self ._queue_lock :
112- if request_id in self ._waiting_queue :
113- raise RoborockException (f"Request ID { request_id } already pending, cannot send command" )
114- self ._waiting_queue [request_id ] = future
106+ future : asyncio .Future [RoborockMessage ] = await self ._pending_rpcs .start (request_id )
115107
116108 try :
117109 encoded_msg = self ._encoder (message )
@@ -120,13 +112,11 @@ async def send_message(self, message: RoborockMessage, timeout: float = 10.0) ->
120112 return await asyncio .wait_for (future , timeout = timeout )
121113
122114 except asyncio .TimeoutError as ex :
123- async with self ._queue_lock :
124- self ._waiting_queue .pop (request_id , None )
115+ await self ._pending_rpcs .pop (request_id )
125116 raise RoborockException (f"Command timed out after { timeout } s" ) from ex
126117 except Exception :
127118 logging .exception ("Uncaught error sending command" )
128- async with self ._queue_lock :
129- self ._waiting_queue .pop (request_id , None )
119+ await self ._pending_rpcs .pop (request_id )
130120 raise
131121
132122
0 commit comments