Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from injection import Module

from cq import CQ, Bus, CommandBus, EventBus, QueryBus
from cq._core.dispatcher.bus import SimpleBus
from cq._core.dispatchers.bus import SimpleBus
from cq.ext.injection import InjectionAdapter
from tests.helpers.history import HistoryMiddleware

Expand Down
14 changes: 11 additions & 3 deletions cq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from ._core.cq import CQ
from ._core.di import DIAdapter
from ._core.di import NoDI as _NoDI
from ._core.dispatcher.base import Dispatcher
from ._core.dispatcher.bus import Bus
from ._core.dispatcher.pipe import ContextPipeline, Pipe
from ._core.dispatchers.abc import Dispatcher
from ._core.dispatchers.bus import Bus
from ._core.dispatchers.pipe import ContextPipeline, Pipe
from ._core.message import (
AnyCommandBus,
Command,
Expand All @@ -15,6 +15,9 @@
)
from ._core.middleware import Middleware, MiddlewareResult, resolve_handler_source
from ._core.pipetools import ContextCommandPipeline as _ContextCommandPipeline
from ._core.pump import Pump
from ._core.queues.abc import Consumer, Producer, Queue
from ._core.queues.memory import MemoryQueue
from ._core.related_events import AnyIORelatedEvents, RelatedEvents

__all__ = (
Expand All @@ -24,17 +27,22 @@
"CQ",
"Command",
"CommandBus",
"Consumer",
"ContextCommandPipeline",
"ContextPipeline",
"DIAdapter",
"Dispatcher",
"Event",
"EventBus",
"MemoryQueue",
"Middleware",
"MiddlewareResult",
"Pipe",
"Producer",
"Pump",
"Query",
"QueryBus",
"Queue",
"RelatedEvents",
"command_handler",
"event_handler",
Expand Down
2 changes: 1 addition & 1 deletion cq/_core/cq.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any, Self

from cq._core.di import DIAdapter
from cq._core.dispatcher.bus import Bus, SimpleBus, TaskBus
from cq._core.dispatchers.bus import Bus, SimpleBus, TaskBus
from cq._core.handler import (
HandlerDecorator,
HandlerRegistry,
Expand Down
2 changes: 1 addition & 1 deletion cq/_core/di.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from contextlib import nullcontext
from typing import TYPE_CHECKING, Any, AsyncContextManager, Protocol, runtime_checkable

if TYPE_CHECKING:
if TYPE_CHECKING: # pragma: no cover
from cq import CommandBus, EventBus, QueryBus


Expand Down
File renamed without changes.
10 changes: 5 additions & 5 deletions cq/_core/dispatcher/base.py → cq/_core/dispatchers/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
class Dispatcher[I, O](Protocol):
__slots__ = ()

async def __call__(self, input_value: I, /) -> O:
return await self.dispatch(input_value)
async def __call__(self, message: I, /) -> O:
return await self.dispatch(message)

@abstractmethod
async def dispatch(self, input_value: I, /) -> O:
async def dispatch(self, message: I, /) -> O:
raise NotImplementedError


Expand All @@ -32,12 +32,12 @@ def add_middlewares(self, *middlewares: Middleware[[I], O]) -> Self:
async def _invoke_with_middlewares(
self,
handler: Callable[[I], Awaitable[O]],
input_value: I,
message: I,
/,
fail_silently: bool = False,
) -> O:
try:
return await self.__middleware_group.invoke(handler, input_value)
return await self.__middleware_group.invoke(handler, message)
except Exception:
if fail_silently:
return NotImplemented
Expand Down
32 changes: 16 additions & 16 deletions cq/_core/dispatcher/bus.py → cq/_core/dispatchers/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import anyio
from anyio.abc import TaskGroup

from cq._core.dispatcher.base import BaseDispatcher, Dispatcher
from cq._core.dispatchers.abc import BaseDispatcher, Dispatcher
from cq._core.handler import (
HandleFunction,
HandlerFactory,
Expand Down Expand Up @@ -33,7 +33,7 @@ def add_middlewares(self, *middlewares: Middleware[[I], O]) -> Self:
@abstractmethod
def subscribe(
self,
input_type: type[I],
message_type: type[I],
factory: HandlerFactory[[I], O],
fail_silently: bool = ...,
) -> Self:
Expand All @@ -57,19 +57,19 @@ def add_listeners(self, *listeners: Listener[I]) -> Self:

def subscribe(
self,
input_type: type[I],
message_type: type[I],
factory: HandlerFactory[[I], O],
fail_silently: bool = False,
) -> Self:
self.__registry.subscribe(input_type, factory, fail_silently=fail_silently)
self.__registry.subscribe(message_type, factory, fail_silently=fail_silently)
return self

def _handlers_from(self, input_type: type[I]) -> Iterator[HandleFunction[[I], O]]:
return self.__registry.handlers_from(input_type)
def _handlers_from(self, message_type: type[I]) -> Iterator[HandleFunction[[I], O]]:
return self.__registry.handlers_from(message_type)

def _trigger_listeners(self, input_value: I, /, task_group: TaskGroup) -> None:
def _trigger_listeners(self, message: I, /, task_group: TaskGroup) -> None:
for listener in self.__listeners:
task_group.start_soon(listener, input_value)
task_group.start_soon(listener, message)


class SimpleBus[I, O](BaseBus[I, O]):
Expand All @@ -78,14 +78,14 @@ class SimpleBus[I, O](BaseBus[I, O]):
def __init__(self, registry: HandlerRegistry[I, O] | None = None, /) -> None:
super().__init__(registry or SingleHandlerRegistry())

async def dispatch(self, input_value: I, /) -> O:
async def dispatch(self, message: I, /) -> O:
async with anyio.create_task_group() as task_group:
self._trigger_listeners(input_value, task_group)
self._trigger_listeners(message, task_group)

for handler in self._handlers_from(type(input_value)):
for handler in self._handlers_from(type(message)):
return await self._invoke_with_middlewares(
handler,
input_value,
message,
handler.fail_silently,
)

Expand All @@ -98,14 +98,14 @@ class TaskBus[I](BaseBus[I, None]):
def __init__(self, registry: HandlerRegistry[I, None] | None = None, /) -> None:
super().__init__(registry or MultipleHandlerRegistry())

async def dispatch(self, input_value: I, /) -> None:
async def dispatch(self, message: I, /) -> None:
async with anyio.create_task_group() as task_group:
self._trigger_listeners(input_value, task_group)
self._trigger_listeners(message, task_group)

for handler in self._handlers_from(type(input_value)):
for handler in self._handlers_from(type(message)):
task_group.start_soon(
self._invoke_with_middlewares,
handler,
input_value,
message,
handler.fail_silently,
)
6 changes: 3 additions & 3 deletions cq/_core/dispatcher/lazy.py → cq/_core/dispatchers/lazy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import TypeAliasType

from cq._core.di import DIAdapter
from cq._core.dispatcher.base import Dispatcher
from cq._core.dispatchers.abc import Dispatcher


class LazyDispatcher[I, O](Dispatcher[I, O]):
Expand All @@ -19,6 +19,6 @@ def __init__(
) -> None:
self.__resolve = di.lazy(dispatcher_type) # type: ignore[arg-type]

async def dispatch(self, input_value: I, /) -> O:
async def dispatch(self, message: I, /) -> O:
dispatcher = await self.__resolve()
return await dispatcher.dispatch(input_value)
return await dispatcher.dispatch(message)
68 changes: 32 additions & 36 deletions cq/_core/dispatcher/pipe.py → cq/_core/dispatchers/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
)

from cq._core.common.typing import Decorator, Method
from cq._core.dispatcher.base import BaseDispatcher, Dispatcher
from cq._core.dispatchers.abc import BaseDispatcher, Dispatcher
from cq._core.middleware import Middleware, MiddlewareGroup

type ConvertAsync[**P, I, O] = Callable[Concatenate[O, P], Awaitable[I]]
Expand All @@ -31,7 +31,7 @@ class PipelineConverter[**P, I, O](Protocol):
__slots__ = ()

@abstractmethod
async def convert(self, output_value: O, /, *args: P.args, **kwargs: P.kwargs) -> I:
async def convert(self, result: O, /, *args: P.args, **kwargs: P.kwargs) -> I:
raise NotImplementedError


Expand All @@ -54,28 +54,24 @@ def add[T](
self.__steps.append(PipelineStep(converter, dispatcher))
return self

def add_static[T](
self,
input_value: T,
dispatcher: Dispatcher[T, Any] | None,
) -> Self:
converter = _StaticPipelineConverter(input_value)
def add_static[T](self, message: T, dispatcher: Dispatcher[T, Any] | None) -> Self:
converter = _StaticPipelineConverter(message)
self.add(converter, dispatcher) # type: ignore[arg-type]
return self

async def execute(self, input_value: I, /, *args: P.args, **kwargs: P.kwargs) -> O:
async def execute(self, message: I, /, *args: P.args, **kwargs: P.kwargs) -> O:
dispatcher = self.default_dispatcher

for step in self.__steps:
output_value = await dispatcher.dispatch(input_value)
input_value = await step.converter.convert(output_value, *args, **kwargs)
result = await dispatcher.dispatch(message)
message = await step.converter.convert(result, *args, **kwargs)

if input_value is None:
if message is None:
return NotImplemented

dispatcher = step.dispatcher or self.default_dispatcher

return await dispatcher.dispatch(input_value)
return await dispatcher.dispatch(message)


class Pipe[I, O](BaseDispatcher[I, O]):
Expand Down Expand Up @@ -136,15 +132,15 @@ def decorator(wp: Convert[[], T, Any]) -> Convert[[], T, Any]:

def add_static_step[T](
self,
input_value: T,
message: T,
/,
dispatcher: Dispatcher[T, Any] | None = None,
) -> Self:
self.__steps.add_static(input_value, dispatcher)
self.__steps.add_static(message, dispatcher)
return self

async def dispatch(self, input_value: I, /) -> O:
return await self._invoke_with_middlewares(self.__steps.execute, input_value)
async def dispatch(self, message: I, /) -> O:
return await self._invoke_with_middlewares(self.__steps.execute, message)


class ContextPipeline[I]:
Expand Down Expand Up @@ -199,11 +195,11 @@ def add_middlewares(self, *middlewares: Middleware[[I], Any]) -> Self:

def add_static_step[T](
self,
input_value: T,
message: T,
/,
dispatcher: Dispatcher[T, Any] | None = None,
) -> Self:
self.__steps.add_static(input_value, dispatcher)
self.__steps.add_static(message, dispatcher)
return self

if TYPE_CHECKING: # pragma: no cover
Expand Down Expand Up @@ -255,49 +251,49 @@ def decorator(wp: ConvertMethod[T, Any]) -> ConvertMethod[T, Any]:

async def __execute[Context](
self,
input_value: I,
message: I,
/,
*,
context: Context,
context_type: type[Context] | None,
) -> Context:
async def handler(i: I, /) -> Context:
await self.__steps.execute(i, context, context_type)
async def handler(first_message: I, /) -> Context:
await self.__steps.execute(first_message, context, context_type)
return context

return await self.__middleware_group.invoke(handler, input_value)
return await self.__middleware_group.invoke(handler, message)


@dataclass(repr=False, eq=False, frozen=True, slots=True)
class BoundContextPipeline[I, O](Dispatcher[I, O]):
dispatch_method: Callable[[I], Awaitable[O]]

async def dispatch(self, input_value: I, /) -> O:
return await self.dispatch_method(input_value)
async def dispatch(self, message: I, /) -> O:
return await self.dispatch_method(message)


@dataclass(repr=False, eq=False, frozen=True, slots=True)
class _AsyncPipelineConverter[**P, I, O](PipelineConverter[P, I, O]):
converter: ConvertAsync[P, I, O]

async def convert(self, output_value: O, /, *args: P.args, **kwargs: P.kwargs) -> I:
return await self.converter(output_value, *args, **kwargs)
async def convert(self, result: O, /, *args: P.args, **kwargs: P.kwargs) -> I:
return await self.converter(result, *args, **kwargs)


@dataclass(repr=False, eq=False, frozen=True, slots=True)
class _SyncPipelineConverter[**P, I, O](PipelineConverter[P, I, O]):
converter: ConvertSync[P, I, O]

async def convert(self, output_value: O, /, *args: P.args, **kwargs: P.kwargs) -> I:
return self.converter(output_value, *args, **kwargs)
async def convert(self, result: O, /, *args: P.args, **kwargs: P.kwargs) -> I:
return self.converter(result, *args, **kwargs)


@dataclass(repr=False, eq=False, frozen=True, slots=True)
class _StaticPipelineConverter[I](PipelineConverter[..., I, Any]):
input_value: I
message: I

async def convert(self, output_value: Any, /, *args: Any, **kwargs: Any) -> I:
return self.input_value
async def convert(self, result: Any, /, *args: Any, **kwargs: Any) -> I:
return self.message


@dataclass(repr=False, eq=False, frozen=True, slots=True)
Expand All @@ -308,13 +304,13 @@ class _AsyncContextPipelineConverter[I, O](

async def convert(
self,
output_value: O,
result: O,
/,
context: object,
context_type: type | None,
) -> I:
method = self.converter.__get__(context, context_type)
return await method(output_value)
return await method(result)


@dataclass(repr=False, eq=False, frozen=True, slots=True)
Expand All @@ -325,10 +321,10 @@ class _SyncContextPipelineConverter[I, O](

async def convert(
self,
output_value: O,
result: O,
/,
context: object,
context_type: type | None,
) -> I:
method = self.converter.__get__(context, context_type)
return method(output_value)
return method(result)
Loading