@@ -239,6 +239,8 @@ def __init__(
239239 raise_handler_exceptions : bool = False ,
240240 inline_methods : frozenset [str ] = frozenset (),
241241 on_stream_exception : Callable [[Exception ], Awaitable [None ]] | None = None ,
242+ drain_in_flight_on_read_eof : bool = False ,
243+ read_eof_response_drain_timeout : float = 5.0 ,
242244 ) -> None :
243245 """Wire a dispatcher over a transport's `SessionMessage` stream pair.
244246
@@ -264,12 +266,23 @@ def __init__(
264266 )
265267 self ._peer_cancel_mode : PeerCancelMode = peer_cancel_mode
266268 self ._raise_handler_exceptions = raise_handler_exceptions
269+ self ._drain_in_flight_on_read_eof = drain_in_flight_on_read_eof
270+ self ._read_eof_response_drain_timeout = read_eof_response_drain_timeout
271+ # Request methods handled inline in the read loop (awaited before the
272+ # next message is dequeued) instead of spawned concurrently. Use for
273+ # methods whose side effects must be observable to the next message,
274+ # e.g. `initialize`, so a pipelined follow-up sees the initialized state.
275+ # Only suitable for handlers that complete quickly, since inline handling
276+ # blocks dequeuing; a handler that awaits the peer (`send_raw_request`)
277+ # while inline will deadlock because the parked read loop cannot dequeue
278+ # the response.
267279 self ._inline_methods = inline_methods
268280 self ._on_stream_exception = on_stream_exception
269281
270282 self ._next_id = 0
271283 self ._pending : dict [RequestId , _Pending ] = {}
272284 self ._in_flight : dict [RequestId , _InFlight [TransportT ]] = {}
285+ self ._responses_in_flight : set [RequestId ] = set ()
273286 self ._tg : anyio .abc .TaskGroup | None = None
274287 self ._running = False
275288 self ._closed = False
@@ -451,6 +464,12 @@ async def run(
451464 except anyio .ClosedResourceError :
452465 # Receive end closed under us (stateless SHTTP teardown); same as EOF.
453466 logger .debug ("read stream closed by transport; treating as EOF" )
467+ if self ._drain_in_flight_on_read_eof :
468+ with anyio .move_on_after (self ._read_eof_response_drain_timeout ) as scope :
469+ while self ._in_flight or self ._responses_in_flight :
470+ await anyio .sleep (0 )
471+ if scope .cancelled_caught :
472+ logger .debug ("timed out draining in-flight responses after read EOF" )
454473 # EOF: wake blocked `send_raw_request` waiters with CONNECTION_CLOSED.
455474 self ._running = False
456475 self ._closed = True
@@ -722,16 +741,24 @@ async def _write(self, message: JSONRPCMessage, metadata: MessageMetadata = None
722741 await self ._write_stream .send (SessionMessage (message = message , metadata = metadata ))
723742
724743 async def _write_result (self , request_id : RequestId , result : dict [str , Any ]) -> None :
744+ key = _coerce_id (request_id )
745+ self ._responses_in_flight .add (key )
725746 try :
726747 await self ._write (JSONRPCResponse (jsonrpc = "2.0" , id = request_id , result = result ))
727748 except (anyio .BrokenResourceError , anyio .ClosedResourceError ):
728749 logger .debug ("dropped result for %r: write stream closed" , request_id )
750+ finally :
751+ self ._responses_in_flight .discard (key )
729752
730753 async def _write_error (self , request_id : RequestId , error : ErrorData ) -> None :
754+ key = _coerce_id (request_id )
755+ self ._responses_in_flight .add (key )
731756 try :
732757 await self ._write (JSONRPCError (jsonrpc = "2.0" , id = request_id , error = error ))
733758 except (anyio .BrokenResourceError , anyio .ClosedResourceError ):
734759 logger .debug ("dropped error for %r: write stream closed" , request_id )
760+ finally :
761+ self ._responses_in_flight .discard (key )
735762
736763 async def _final_write (
737764 self ,
0 commit comments