-
Notifications
You must be signed in to change notification settings - Fork 102
feat: Rework Event Queues to use central event bus #515
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Summary of ChangesHello @kabir, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a significant architectural shift in how events are handled, moving from a distributed, queue-per-task model to a centralized event bus. The primary goal is to ensure robust event processing, guaranteeing persistence before any event is made visible to consumers. This change streamlines the event flow, decouples concerns, and removes complex background processing logic from the request handling layer, leading to a more reliable and maintainable system. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a major and beneficial architectural change by refactoring the event queue system to use a central MainEventBus. This decouples event production from consumption, and centralizes persistence and distribution logic in the new MainEventBusProcessor. The "persistence first" approach significantly improves reliability and data consistency. The changes are extensive, touching core event queue logic, request handlers, and numerous tests. My review has identified a critical resource leak in the new ChildQueue implementation that could lead to deadlocks, along with some medium-severity suggestions to improve testability and code clarity. Overall, this is a strong refactoring that moves the system in the right direction.
server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
Outdated
Show resolved
Hide resolved
server-common/src/test/java/io/a2a/server/events/EventQueueUtil.java
Outdated
Show resolved
Hide resolved
server-common/src/test/java/io/a2a/server/events/EventQueueTest.java
Outdated
Show resolved
Hide resolved
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a significant and well-executed architectural refactoring to centralize event handling via a MainEventBus. The changes correctly decouple event production from persistence and consumption, which should resolve potential race conditions and improve the system's robustness. The core logic is well-encapsulated in the new MainEventBusProcessor, and the related classes like EventQueue, QueueManager, and DefaultRequestHandler have been updated accordingly. The test suite has also been commendably updated to reflect the new asynchronous nature of the event system. I have a few suggestions to further enhance thread safety, error handling, and test practices, but overall, this is a strong improvement.
server-common/src/test/java/io/a2a/server/events/EventQueueUtil.java
Outdated
Show resolved
Hide resolved
| } catch (Exception e) { | ||
| LOGGER.error("Error processing event from MainEventBus", e); | ||
| // Continue processing despite errors | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The broad catch (Exception e) can mask critical, persistent issues. If a fatal error occurs (e.g., the database for TaskStore is unreachable), the processor will enter a busy-loop, consuming CPU and flooding logs without any mechanism to halt or signal a system-wide failure. Consider implementing more sophisticated error handling, such as a circuit breaker pattern or shutting down the processor after a certain number of consecutive fatal errors, to make the system more resilient and prevent hidden failures.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this works the way it used to (but in a different location), so it is out of scope for this PR. But it is a valid point. I've opened #516 to track it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed so it propagates errors
server-common/src/main/java/io/a2a/server/events/EventQueue.java
Outdated
Show resolved
Hide resolved
server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
Outdated
Show resolved
Hide resolved
6dca4df to
2d012ca
Compare
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a significant and well-executed architectural refactoring by centralizing event processing with a MainEventBus and MainEventBusProcessor. This change greatly improves the system's reliability by ensuring events are persisted before being distributed to consumers, which eliminates a class of race conditions. The decoupling of concerns also simplifies the DefaultRequestHandler considerably. The changes are consistent, and the test suite has been thoroughly updated to accommodate the new asynchronous nature of event handling.
I have found one critical issue and one medium-severity issue. The critical issue is a semaphore leak that could cause the system to hang. The medium-severity issue relates to a test utility that could be improved for better test isolation and clarity. Details are in the specific comments.
server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
Show resolved
Hide resolved
| public static EventQueue.EventQueueBuilder getEventQueueBuilder() { | ||
| return EventQueue.builder(); | ||
| return EventQueue.builder(TEST_EVENT_BUS) | ||
| .taskId("test-task-" + TASK_ID_COUNTER.incrementAndGet()); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The getEventQueueBuilder() method automatically assigns a task ID using a static atomic counter. This can make tests harder to reason about and could lead to flakiness if tests are run in parallel, as the sequence of task IDs is not deterministic. It's better to have tests explicitly set their own task IDs for clarity and reproducibility. Please also remove the TASK_ID_COUNTER field, as it will no longer be needed.
public static EventQueue.EventQueueBuilder getEventQueueBuilder() {
return EventQueue.builder(TEST_EVENT_BUS);
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They are just there to guarantee a unique id, we don't look up anything using these.
2d012ca to
95c3a79
Compare
|
/gemini review |
1 similar comment
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a significant architectural change to the event queuing system by centralizing event processing through a new MainEventBus and MainEventBusProcessor. Previously, EventQueue directly managed its internal queue and event distribution. Now, MainQueue (a type of EventQueue) submits events to the MainEventBus, which is then processed asynchronously by MainEventBusProcessor. This processor is responsible for persisting events to the TaskStore, sending push notifications, and finally distributing events to ChildQueue instances. This ensures events are persisted before clients receive them, preventing race conditions and enabling reliable event replay. The EventQueue abstract class has been refactored to separate MainQueue and ChildQueue responsibilities, with ChildQueue now holding its own BlockingQueue for local consumption. The DefaultRequestHandler has been updated to remove direct push notification sending and background task tracking, as these responsibilities are now handled by the MainEventBusProcessor. Test classes across various modules (extras/queue-manager-replicated, server-common, transport/jsonrpc) have been updated to reflect these changes, including initializing MainEventBus and MainEventBusProcessor in their setUp methods, passing MainEventBus to QueueManager constructors, and adjusting EventQueue creation and consumption logic. A new EventQueueUtil class was added for test setup, and MainEventBusContext, MainEventBusProcessorCallback, and MainEventBusProcessorInitializer were introduced to support the new event bus architecture. Review comments suggest converting MainEventBusContext to a Java record for conciseness, consolidating the duplicated EventQueueUtil test utility, and handling CompletableFuture exceptions from cleanupProducer in DefaultRequestHandler to prevent silent failures.
server-common/src/test/java/io/a2a/server/events/EventQueueUtil.java
Outdated
Show resolved
Hide resolved
extras/queue-manager-replicated/core/src/test/java/io/a2a/server/events/EventQueueUtil.java
Show resolved
Hide resolved
server-common/src/main/java/io/a2a/server/events/MainEventBusContext.java
Outdated
Show resolved
Hide resolved
server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java
Outdated
Show resolved
Hide resolved
server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a significant and well-executed architectural refactoring by centralizing event processing into a MainEventBus and a dedicated MainEventBusProcessor. This change greatly improves the system's robustness by enforcing a 'persistence-first' pattern, which eliminates potential race conditions. The decoupling of concerns simplifies DefaultRequestHandler and ResultAggregator, making the codebase easier to maintain. The updates to tests to handle the new asynchronous nature of event processing are thorough and correct. I've identified one minor issue regarding code duplication in a test utility class, but overall, this is an excellent improvement to the event queueing system.
extras/queue-manager-replicated/core/src/test/java/io/a2a/server/events/EventQueueUtil.java
Show resolved
Hide resolved
940d695 to
2ac771e
Compare
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a significant refactoring of the event queuing and processing mechanism by implementing a MainEventBus and MainEventBusProcessor. The MainEventBus acts as a central hub for events, ensuring that all events are first processed by the MainEventBusProcessor in a dedicated background thread. This processor is responsible for persisting events to the TaskStore and sending push notifications before distributing them to client-facing ChildQueues, thereby guaranteeing persistence before client visibility. The EventQueue class is refactored into MainQueue and ChildQueue types, with MainQueue handling event submission to the MainEventBus and ChildQueues managing local consumption. The DefaultRequestHandler is updated to remove direct push notification sending and background task tracking, delegating these responsibilities to the MainEventBusProcessor. Test cases across ReplicatedQueueManagerTest, EventConsumerTest, EventQueueTest, InMemoryQueueManagerTest, AbstractA2ARequestHandlerTest, DefaultRequestHandlerTest, ResultAggregatorTest, and TaskUpdaterTest are updated to reflect the new asynchronous event processing model, often incorporating retry logic and MainEventBusProcessor setup/teardown. A new EventQueueUtil class is introduced in server-common/src/test/java to assist with test setup. Review comments highlight a redundant EventQueueUtil.java file in extras/queue-manager-replicated/core/src/test/java that needs removal, and a critical issue in MainEventBusProcessor.java where updateTaskStore was swallowing exceptions, violating the 'persistence first' guarantee. The reviewer suggests re-throwing these exceptions to ensure event distribution is aborted if persistence fails.
extras/queue-manager-replicated/core/src/test/java/io/a2a/server/events/EventQueueUtil.java
Show resolved
Hide resolved
server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
Outdated
Show resolved
Hide resolved
f8c0b6d to
306ea56
Compare
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a new MainEventBus and MainEventBusProcessor to centralize event persistence and distribution, ensuring events are persisted to the TaskStore before being distributed to client ChildQueues. The EventQueue hierarchy has been refactored, making MainQueue responsible for submitting events to the MainEventBus and ChildQueue responsible for local consumption. The MainEventBusProcessor runs in a background thread, handling persistence, push notifications, and distribution, and includes a testing callback for synchronization. Consequently, DefaultRequestHandler and ResultAggregator no longer directly handle TaskStore updates or push notifications, delegating these to the MainEventBusProcessor. Test cases across ReplicatedQueueManager, EventConsumer, EventQueue, InMemoryQueueManager, DefaultRequestHandler, ResultAggregator, and TaskUpdater have been updated to integrate with the new MainEventBus architecture, using new EventQueueUtil helpers and MainEventBusProcessorCallback for deterministic testing of asynchronous event processing. Review comments suggest improving ChildQueue.internalEnqueueItem to use queue.offer() for graceful backpressure handling and removing an unused semaphore release in ChildQueue.doClose for clarity.
| private void internalEnqueueItem(EventQueueItem item) { | ||
| super.enqueueItem(item); | ||
| // Internal method called by MainEventBusProcessor to add to local queue | ||
| // Note: Semaphore is managed by parent MainQueue (acquire/release), not ChildQueue | ||
| Event event = item.getEvent(); | ||
| if (isClosed()) { | ||
| LOGGER.warn("ChildQueue is closed. Event will not be enqueued. {} {}", this, event); | ||
| return; | ||
| } | ||
| queue.add(item); | ||
| LOGGER.debug("Enqueued event {} {}", event instanceof Throwable ? event.toString() : event, this); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The internalEnqueueItem method uses queue.add(). With a bounded queue (as suggested in another comment), this will throw an IllegalStateException if the queue is full. This exception would propagate up to the MainEventBusProcessor, disrupting event distribution to other child queues for the same task. A more robust approach is to use queue.offer() and handle the backpressure gracefully by closing the full queue.
private void internalEnqueueItem(EventQueueItem item) {
// Internal method called by MainEventBusProcessor to add to local queue
Event event = item.getEvent();
if (isClosed()) {
LOGGER.warn("ChildQueue is closed. Event will not be enqueued. {} {}", this, event);
return;
}
// Use offer() to handle backpressure. If the queue is full, close it to prevent memory issues.
if (!queue.offer(item)) {
LOGGER.warn("ChildQueue {} is full. Closing immediately.", this);
close(true); // immediate close
} else {
LOGGER.debug("Enqueued event {} {}", event instanceof Throwable ? event.toString() : event, this);
}
}| // Release semaphore permits for cleared events to prevent deadlock | ||
| semaphore.release(clearedCount); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
db9e6f3 to
e410643
Compare
MainQueue is now only for sending to the central processor, and cannot be subscribed to
e410643 to
757312d
Compare
Cleanup and ensure that we now only listen to ChildQueues.
MainQueue is now only for sending to the central processor, and cannot be subscribed to.
Get rid of the background processing, now that tasks are stored in the central processor rather than the view layer (i.e. DefaultRequestHandler)