Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,18 @@
[![PyPI - Downloads](https://img.shields.io/pypi/dm/python-cq.svg?color=blue)](https://pypistats.org/packages/python-cq)
[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)

Documentation: https://python-cq.remimd.dev
An async-first Python library for structuring code around CQRS (Commands, Queries, Events) with pluggable dependency injection.

**python-cq** is a Python package designed to organize your code following CQRS principles. It provides a `DIAdapter` protocol for dependency injection, with [python-injection](https://github.com/100nm/python-injection) as the default implementation available via the `[injection]` extra.
## Documentation

The full guide lives at **<https://python-cq.remimd.dev>**. Start there: it covers installation, the message model, dispatching, bus configuration, command pipelines, and how to plug in a custom DI framework.

## Installation

⚠️ _Requires Python 3.12 or higher_
Requires Python 3.12 or higher.

Without dependency injection:
```bash
pip install python-cq
```

With [python-injection](https://github.com/100nm/python-injection) as the DI backend (recommended):
```bash
pip install "python-cq[injection]"
```

The `[injection]` extra installs [python-injection](https://github.com/100nm/python-injection) as the default DI backend (recommended). To bring your own DI framework, install `python-cq` without the extra and see the [Custom DI adapter](https://python-cq.remimd.dev/di) guide.
61 changes: 43 additions & 18 deletions docs/di.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
# Custom DI
# Custom DI adapter

**python-cq** abstracts dependency injection behind the `DIAdapter` protocol, allowing you to integrate any DI framework.
!!! note
If you installed `python-cq[injection]`, you can skip this page. The default DI integration with [python-injection](https://github.com/100nm/python-injection) is already wired up. This guide is only useful if you want to plug in a different DI framework (or none at all).

**python-cq** does not depend on any specific dependency injection container. Instead, it talks to DI through the `DIAdapter` protocol. Implement this protocol once and you can use the library with any container you already have in your project.

## The `CQ` class

`CQ` is the central object that binds together the handler registries and the DI adapter. The module-level decorators (`command_handler`, `event_handler`, `query_handler`) and bus factories (`new_command_bus`, `new_event_bus`, `new_query_bus`) all derive from a default `CQ` instance created at import time.
`CQ` ties together the handler registries and the DI adapter. The module-level decorators (`command_handler`, `event_handler`, `query_handler`) and bus factories (`new_command_bus`, `new_event_bus`, `new_query_bus`) all derive from a default `CQ` instance built at import time.

You can create additional isolated instances when you need separate handler registries, for example in tests or in a multi-tenant setup:
You create your own `CQ` instance to wire the library against a custom `DIAdapter`:

```python
from cq import CQ, ContextCommandPipeline
Expand All @@ -22,15 +25,17 @@ new_event_bus = cq.new_event_bus
new_query_bus = cq.new_query_bus
```

When using `ContextCommandPipeline`, pass `cq.di` explicitly so it uses the same DI adapter:
When you build a `ContextCommandPipeline` against a non-default `CQ`, pass its DI adapter explicitly so the pipeline dispatches through the right buses:

```python
ContextCommandPipeline(cq.di)
```

If you use the default `CQ`, `ContextCommandPipeline()` (with no argument) is enough.

## Implementing a `DIAdapter`

`DIAdapter` is a `Protocol`. Implement it to integrate any DI framework:
`DIAdapter` is a `Protocol` with four methods, three of them required:

```python
from collections.abc import Awaitable, Callable
Expand All @@ -39,15 +44,28 @@ from typing import Any, AsyncContextManager

class MyDIAdapter(DIAdapter):
def command_scope(self) -> AsyncContextManager[None]:
# Return an async context manager that:
# - opens a DI scope for the duration of a command dispatch
# - manages the lifecycle of a RelatedEvents instance within that scope
# - silently ignores nested activations (re-entrant calls)
"""
Return an async context manager that delimits a command dispatch.

Responsibilities:
1. Open a DI scope for the duration of the command.
2. Build a `RelatedEvents` instance inside that scope and make it
resolvable, so command handlers can inject it.
3. Silently ignore nested re-entrant activations (see below).

Re-entrancy: `command_scope` is opened twice for a single logical
command when a `ContextCommandPipeline` wraps a command dispatch.
Implementations must detect that case (for example, by checking a
contextvar) and skip opening a second scope.
"""
...

def lazy[T](self, tp: type[T]) -> Callable[[], Awaitable[T]]:
# Ask the DI framework for a resolver for `tp`.
# The returned callable, when called and awaited, performs the resolution.
"""
Return a callable that, when called and awaited, resolves `tp` from
the container. Used to wire up bus references lazily so that buses
configured later in the import graph are still picked up.
"""
...

def register_defaults(
Expand All @@ -56,16 +74,23 @@ class MyDIAdapter(DIAdapter):
event_bus: Callable[..., EventBus],
query_bus: Callable[..., QueryBus[Any]],
) -> None:
# Register the bus factories as default providers so that handlers
# can declare CommandBus, EventBus, or QueryBus as dependencies.
# This method is optional; the default implementation is a no-op.
"""
Register the bus factories with the container so that handlers can
declare `CommandBus`, `EventBus`, or `QueryBus` as dependencies.

Optional: the default implementation is a no-op, which is fine if
your container does not need explicit registration.
"""
...

def wire[T](self, tp: type[T]) -> Callable[..., Awaitable[T]]:
# Return an async factory that instantiates `tp` with injected dependencies.
# Used internally to build handler instances.
"""
Return an async factory that instantiates `tp` with injected
dependencies. Used internally to build handler instances.
"""
...


cq = CQ(MyDIAdapter()).register_defaults()
```

The reference implementation for python-injection lives in `cq.ext.injection.InjectionAdapter`. It is a good starting point if you need to model your own adapter on a working example.
95 changes: 61 additions & 34 deletions docs/guides/configuring.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
# Configuring a Bus
# Configuring a bus

!!! note
This guide assumes the `[injection]` extra is installed.
This guide assumes the `[injection]` extra is installed. If you use a different DI framework, register the factory below with your own container, not with `@injectable`.

Each bus can be customized by attaching listeners and middlewares. The recommended pattern is a factory function that builds a configured bus and registers it in the DI container:

Each bus can be customized with listeners and middlewares. To do so, create a factory function decorated with `@injectable` that returns the configured bus.
```python
from cq import CommandBus, MiddlewareResult, new_command_bus
from cq import CommandBus, new_command_bus
from injection import injectable

async def listener(message: MessageType):
async def listener(message):
...

async def middleware(message: MessageType) -> MiddlewareResult[ReturnType]:
# do something before the handler is executed
return_value = yield
# do something after the handler is executed
async def middleware(message):
# runs before the handler
result = yield
# runs after the handler

@injectable
def command_bus_factory() -> CommandBus:
Expand All @@ -24,68 +25,74 @@ def command_bus_factory() -> CommandBus:
return bus
```

The same pattern applies to `QueryBus` and `EventBus` using `new_query_bus()` and `new_event_bus()`.
The same pattern applies to `QueryBus` and `EventBus`, with `new_query_bus()` and `new_event_bus()`.

## Listeners

Listeners are executed before the handler(s). They receive the message and can perform side effects such as logging or validation.
Listeners are fire-and-forget callables that receive the message. They are useful for logging, metrics, or any side effect that does not need to influence the handler.

```python
async def log_listener(message: MessageType):
async def log_listener(message):
print(f"Received: {message}")
```

Listeners are scheduled in an `anyio` task group, so several listeners run concurrently. The timing depends on the bus type:

* **`CommandBus` and `QueryBus`**: every listener must finish before the handler runs. The handler cannot start until listeners have settled, and `dispatch` returns the handler's value as soon as it completes.
* **`EventBus`**: listeners and handlers share the same task group, so they all run concurrently. `dispatch` returns once everything has finished.

## Middlewares

Middlewares wrap around handler execution, allowing you to run logic before and after a handler processes a message.
A middleware wraps handler execution. Use it to run logic before and after the handler processes the message, or to handle exceptions.

```python
async def timing_middleware(message: Any) -> MiddlewareResult[Any]:
import time

async def timing_middleware(message):
start = time.time()
yield
print(f"Execution time: {time.time() - start}s")
```

For commands and queries, middlewares run once around the single handler. For events, middlewares run around each handler individually.
For commands and queries, the middleware stack wraps the single registered handler once. For events, the stack is applied around each handler independently, so a middleware sees one invocation per event handler.

!!! note
The generator was chosen to keep both the input message and the return value read-only.
The `yield` form makes the middleware look like a try/finally around the handler call. The expression `result = yield` receives the handler's return value, but only for inspection: middlewares of this form cannot replace it. This was a deliberate choice to keep the message and the result read-only by default.

### Classic middlewares

As an alternative, classic middlewares receive `call_next` as their first argument, followed by the handler's arguments. This pattern allows you to read and modify the return value:
If you need to read or substitute the return value, write a "classic" middleware. It takes `call_next` as its first argument and returns the value it wants to expose to the caller:

```python
from collections.abc import Awaitable, Callable
from typing import Any
import time

async def timing_middleware(
call_next: Callable[[Any], Awaitable[Any]],
message: Any,
) -> Any:
async def timing_middleware(call_next, message):
start = time.time()
result = await call_next(message)
print(f"Execution time: {time.time() - start}s")
return result
```

Both styles can be mixed freely in the same bus.

## Class-based listeners and middlewares

For more flexibility, listeners and middlewares can be defined as classes with a `__call__` method. This allows you to inject dependencies and configure their behavior.
Listeners and middlewares can also be classes with a `__call__` method, which is convenient when they need their own dependencies:

```python
from cq import MiddlewareResult
from dataclasses import dataclass

@dataclass
class LogListener:
logger: Logger

async def __call__(self, message: Any):
async def __call__(self, message):
self.logger.info(f"Received: {message}")

@dataclass
class TimingMiddleware:
metrics: MetricsService

async def __call__(self, message: Any) -> MiddlewareResult[Any]:
async def __call__(self, message):
start = time.time()
yield
self.metrics.record(time.time() - start)
Expand All @@ -94,13 +101,33 @@ class TimingMiddleware:
class ClassicTimingMiddleware:
metrics: MetricsService

async def __call__(
self,
call_next: Callable[[Any], Awaitable[Any]],
message: Any,
) -> Any:
async def __call__(self, call_next, message):
start = time.time()
result = await call_next(message)
self.metrics.record(time.time() - start)
return result
```
```

If you build these classes through your DI container, you get the same constructor injection as for handlers.

## Built-in middlewares

### `RetryMiddleware`

`cq.middlewares.retry.RetryMiddleware` retries the wrapped call when it raises one of the configured exception types:

```python
from cq import new_command_bus
from cq.middlewares.retry import RetryMiddleware

bus = new_command_bus()
bus.add_middlewares(RetryMiddleware(retry=3, delay=0.5, exceptions=(TimeoutError,)))
```

The parameters are:

* `retry`: total number of attempts (including the first one). With `retry=3`, the call runs at most three times.
* `delay`: seconds to wait between attempts. Defaults to `0`.
* `exceptions`: the exception types that trigger a retry. Defaults to `(Exception,)`, which retries on any non-`BaseException` failure.

If every attempt fails, the last exception is re-raised.
59 changes: 32 additions & 27 deletions docs/guides/dispatching.md
Original file line number Diff line number Diff line change
@@ -1,52 +1,57 @@
# Dispatching messages

To dispatch messages to their handlers, **python-cq** provides three bus classes: `CommandBus`, `QueryBus`, and `EventBus`.
**python-cq** exposes three bus types to dispatch messages to their handlers: `CommandBus`, `QueryBus`, and `EventBus`. Each takes a generic parameter that types the return value of `dispatch`.

Each bus can take a generic parameter to specify the return type of the `dispatch` method.
A bus instance is obtained from your DI container. The examples below assume the bus has already been resolved; see [Configuring a bus](configuring.md) for how to build and register one.

## Retrieving a bus
## `CommandBus`

Bus instances are resolved through the configured DI adapter. When using the `[injection]` extra:
The `CommandBus` dispatches commands to their single registered handler and returns the handler's value:

```python
from cq import CommandBus
from injection import inject

@inject
async def create_user(bus: CommandBus[None]):
command = CreateUserCommand(name="John", email="john@example.com")
await bus.dispatch(command)
bus: CommandBus[int] = ...
command = CreateUserCommand(name="Ada", email="ada@example.com")
user_id = await bus.dispatch(command)
```

## CommandBus
## `QueryBus`

Use the CommandBus to dispatch commands. It returns the value produced by the handler.
```python
from cq import CommandBus

bus: CommandBus[None]
command = CreateUserCommand(name="John", email="john@example.com")
await bus.dispatch(command)
```
The `QueryBus` dispatches queries to their single registered handler and returns the handler's value:

## QueryBus

Use the QueryBus to dispatch queries. It returns the value produced by the handler.
```python
from cq import QueryBus

bus: QueryBus[User]
query = GetUserByIdQuery(user_id)
bus: QueryBus[User] = ...
query = GetUserByIdQuery(user_id=42)
user = await bus.dispatch(query)
```

## EventBus
## `EventBus`

The `EventBus` fans an event out to every registered handler. It does not return a value, since multiple handlers may produce conflicting results.

Use the EventBus to dispatch events. Since events can be handled by multiple handlers (or none), it does not return a value.
```python
from cq import EventBus

bus: EventBus
event = UserCreatedEvent(user_id)
bus: EventBus = ...
event = UserCreatedEvent(user_id=42)
await bus.dispatch(event)
```

Event handlers run concurrently inside a single `anyio` task group. `dispatch` returns once every handler has finished. If any handler raises (and is not declared with `fail_silently=True`), the exception is propagated through the task group, which may produce an `ExceptionGroup` when several handlers fail.

## When no handler is registered

If no handler matches the message type, `dispatch` returns the `NotImplemented` sentinel instead of raising. This is convenient when a message is optional in some contexts (for example a query that may or may not have a backing handler depending on configuration), but it means you should not assume a `dispatch` result is always meaningful:

```python
result = await bus.dispatch(SomeQuery())

if result is NotImplemented:
# No handler is registered for SomeQuery.
...
```

The same sentinel is returned when a handler declared with `fail_silently=True` raises. For events, there is nothing to return, so the sentinel is not exposed.
Loading