From 840eed5a606c33961f1b762babe6a8f50af83ab5 Mon Sep 17 00:00:00 2001 From: remimd Date: Sat, 16 May 2026 10:41:07 +0200 Subject: [PATCH] feat: Make MemoryQueue an async context manager --- cq/_core/queues/memory.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/cq/_core/queues/memory.py b/cq/_core/queues/memory.py index cf6ec60..4f88983 100644 --- a/cq/_core/queues/memory.py +++ b/cq/_core/queues/memory.py @@ -1,5 +1,6 @@ from collections.abc import AsyncIterator, Awaitable, Callable, Sequence from contextlib import asynccontextmanager +from types import TracebackType from typing import Any, Self import anyio @@ -19,6 +20,17 @@ class MemoryQueue[T](Queue[T]): def __init__(self, maxsize: int = 0) -> None: self.__producer, self.__consumer = anyio.create_memory_object_stream(maxsize) + async def __aenter__(self) -> Self: + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> None: + await self.close() + def __aiter__(self) -> AsyncIterator[T]: return aiter(self.__consumer) @@ -40,10 +52,8 @@ async def draining( .add_middlewares(*middlewares) .draining(concurrency=concurrency, graceful=True) ): - try: + async with self: yield self - finally: - await self.close() async def send(self, message: T, /) -> None: await self.__producer.send(message)