Skip to content

feat: async endpoint annotations for per-handler interceptor configuration#654

Merged
dgafka merged 5 commits intomainfrom
feat-passing-async-attributes
Apr 4, 2026
Merged

feat: async endpoint annotations for per-handler interceptor configuration#654
dgafka merged 5 commits intomainfrom
feat-passing-async-attributes

Conversation

@dgafka
Copy link
Copy Markdown
Member

@dgafka dgafka commented Apr 4, 2026

Why is this change proposed?

When async handlers (especially ProjectionV2) run on asynchronous endpoints, interceptors like DbalTransactionInterceptor and CollectorSenderInterceptor wrap the entire execution — overriding the handler's own per-batch transaction management. There was no way to configure interceptor behavior per-handler on async endpoints because the Bridge is shared across all handlers on a channel, and handler-specific attributes were invisible at the async consumer level.

This introduces new Enterprise feature: endpointAnnotations on the #[Asynchronous] attribute, allowing per-handler interceptor configuration that is resolved at runtime when the message is fetched from the channel.

Usage Examples

Per-handler interceptor configuration:

#[Asynchronous('async', endpointAnnotations: [new WithoutDatabaseTransaction()])]
#[CommandHandler('processPayment', endpointId: 'payment.endpoint')]
public function handle(ProcessPayment $command): void
{
    // This handler runs without the global DBAL transaction wrapper
}

Custom async endpoint attribute:

#[Attribute]
class MyCustomConfig implements AsynchronousEndpointAttribute
{
    public function __construct(public int $retryCount = 3) {}
}

// Use in handler
#[Asynchronous('async', endpointAnnotations: [new MyCustomConfig(retryCount: 5)])]
#[CommandHandler('doWork', endpointId: 'work.endpoint')]
public function handle(DoWork $command): void {}

// Access in interceptor targeting AsynchronousRunningEndpoint
#[Around(pointcut: AsynchronousRunningEndpoint::class)]
public function intercept(MethodInvocation $inv, ?MyCustomConfig $config = null): mixed
{
    // $config is injected from the handler's endpointAnnotations
    return $inv->proceed();
}

ProjectionV2 — automatic, no user action needed:

For new Projecting system, we honour batch and flush based projecting. Meaning we don't want to start long transaction, which will end up when all events will be projected. As for global tracked projections that could span long running transaction.

#[ProjectionV2('ticket_list')]
#[Asynchronous('projections')]
#[FromStream(Ticket::class)]
class TicketListProjection
{
    // WithoutDatabaseTransaction and WithoutMessageCollector
    // are automatically added by the framework
}

Flow

sequenceDiagram
    participant Queue as Async Channel
    participant Poll as PollToGatewayTaskExecutor
    participant Registry as AsyncHandlerAnnotationRegistry
    participant Context as AsyncEndpointAnnotationContext
    participant Interceptor as DbalTransactionInterceptor
    participant Handler as Message Handler

    Poll->>Queue: poll message
    Poll->>Poll: read routing slip[0]
    Poll->>Registry: getAnnotationsForChannel(executionChannel)
    Poll->>Context: setAnnotations([WithoutDatabaseTransaction])
    Poll->>Interceptor: gateway.execute()
    Interceptor->>Context: resolve ?WithoutDatabaseTransaction
    Note over Interceptor: skip transaction (attribute present)
    Interceptor->>Handler: proceed()
    Handler-->>Poll: done
    Poll->>Context: clear()
Loading

Pull Request Contribution Terms

  • I have read and agree to the contribution terms outlined in CONTRIBUTING.

@dgafka dgafka merged commit 0730121 into main Apr 4, 2026
12 checks passed
@dgafka dgafka deleted the feat-passing-async-attributes branch April 4, 2026 19:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant