Skip to content

Conversation

@kabir
Copy link
Collaborator

@kabir kabir commented Dec 8, 2025

Cleanup and ensure that we now only listen to ChildQueues.

MainQueue is now only for sending to the central processor, and cannot be subscribed to.

Get rid of the background processing, now that tasks are stored in the central processor rather than the view layer (i.e. DefaultRequestHandler)

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @kabir, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a significant architectural shift in how events are handled, moving from a distributed, queue-per-task model to a centralized event bus. The primary goal is to ensure robust event processing, guaranteeing persistence before any event is made visible to consumers. This change streamlines the event flow, decouples concerns, and removes complex background processing logic from the request handling layer, leading to a more reliable and maintainable system.

Highlights

  • Centralized Event Bus Architecture: Introduced a new MainEventBus and MainEventBusProcessor to centralize event handling. All events are now submitted to this central bus, which then orchestrates persistence to the TaskStore and subsequent distribution to client-facing queues.
  • Decoupled Event Flow: The MainQueue is now exclusively for sending events to the central processor and can no longer be directly subscribed to. Event consumption is strictly handled by ChildQueues, ensuring a clear separation between event production and consumption.
  • Elimination of Background Processing in Request Handler: The DefaultRequestHandler has been refactored to remove its internal background processing logic for event consumption and push notifications. These responsibilities are now fully managed by the dedicated MainEventBusProcessor.
  • Guaranteed Persistence Before Distribution: Events are now guaranteed to be persisted to the TaskStore and push notifications sent before they are distributed to any ChildQueues. This prevents race conditions and ensures clients always receive events that have already been durably stored.
  • Streamlined Queue Management: The EventQueue hierarchy and InMemoryQueueManager have been updated to integrate seamlessly with the new MainEventBus architecture, simplifying event queue creation and lifecycle management.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a major and beneficial architectural change by refactoring the event queue system to use a central MainEventBus. This decouples event production from consumption, and centralizes persistence and distribution logic in the new MainEventBusProcessor. The "persistence first" approach significantly improves reliability and data consistency. The changes are extensive, touching core event queue logic, request handlers, and numerous tests. My review has identified a critical resource leak in the new ChildQueue implementation that could lead to deadlocks, along with some medium-severity suggestions to improve testability and code clarity. Overall, this is a strong refactoring that moves the system in the right direction.

@kabir
Copy link
Collaborator Author

kabir commented Dec 8, 2025

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a significant and well-executed architectural refactoring to centralize event handling via a MainEventBus. The changes correctly decouple event production from persistence and consumption, which should resolve potential race conditions and improve the system's robustness. The core logic is well-encapsulated in the new MainEventBusProcessor, and the related classes like EventQueue, QueueManager, and DefaultRequestHandler have been updated accordingly. The test suite has also been commendably updated to reflect the new asynchronous nature of the event system. I have a few suggestions to further enhance thread safety, error handling, and test practices, but overall, this is a strong improvement.

Comment on lines +123 to +131
} catch (Exception e) {
LOGGER.error("Error processing event from MainEventBus", e);
// Continue processing despite errors
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The broad catch (Exception e) can mask critical, persistent issues. If a fatal error occurs (e.g., the database for TaskStore is unreachable), the processor will enter a busy-loop, consuming CPU and flooding logs without any mechanism to halt or signal a system-wide failure. Consider implementing more sophisticated error handling, such as a circuit breaker pattern or shutting down the processor after a certain number of consecutive fatal errors, to make the system more resilient and prevent hidden failures.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this works the way it used to (but in a different location), so it is out of scope for this PR. But it is a valid point. I've opened #516 to track it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed so it propagates errors

@kabir
Copy link
Collaborator Author

kabir commented Dec 9, 2025

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a significant and well-executed architectural refactoring by centralizing event processing with a MainEventBus and MainEventBusProcessor. This change greatly improves the system's reliability by ensuring events are persisted before being distributed to consumers, which eliminates a class of race conditions. The decoupling of concerns also simplifies the DefaultRequestHandler considerably. The changes are consistent, and the test suite has been thoroughly updated to accommodate the new asynchronous nature of event handling.

I have found one critical issue and one medium-severity issue. The critical issue is a semaphore leak that could cause the system to hang. The medium-severity issue relates to a test utility that could be improved for better test isolation and clarity. Details are in the specific comments.

Comment on lines 21 to 24
public static EventQueue.EventQueueBuilder getEventQueueBuilder() {
return EventQueue.builder();
return EventQueue.builder(TEST_EVENT_BUS)
.taskId("test-task-" + TASK_ID_COUNTER.incrementAndGet());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The getEventQueueBuilder() method automatically assigns a task ID using a static atomic counter. This can make tests harder to reason about and could lead to flakiness if tests are run in parallel, as the sequence of task IDs is not deterministic. It's better to have tests explicitly set their own task IDs for clarity and reproducibility. Please also remove the TASK_ID_COUNTER field, as it will no longer be needed.

    public static EventQueue.EventQueueBuilder getEventQueueBuilder() {
        return EventQueue.builder(TEST_EVENT_BUS);
    }

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are just there to guarantee a unique id, we don't look up anything using these.

@kabir
Copy link
Collaborator Author

kabir commented Dec 9, 2025

/gemini review

1 similar comment
@kabir
Copy link
Collaborator Author

kabir commented Dec 9, 2025

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a significant architectural change to the event queuing system by centralizing event processing through a new MainEventBus and MainEventBusProcessor. Previously, EventQueue directly managed its internal queue and event distribution. Now, MainQueue (a type of EventQueue) submits events to the MainEventBus, which is then processed asynchronously by MainEventBusProcessor. This processor is responsible for persisting events to the TaskStore, sending push notifications, and finally distributing events to ChildQueue instances. This ensures events are persisted before clients receive them, preventing race conditions and enabling reliable event replay. The EventQueue abstract class has been refactored to separate MainQueue and ChildQueue responsibilities, with ChildQueue now holding its own BlockingQueue for local consumption. The DefaultRequestHandler has been updated to remove direct push notification sending and background task tracking, as these responsibilities are now handled by the MainEventBusProcessor. Test classes across various modules (extras/queue-manager-replicated, server-common, transport/jsonrpc) have been updated to reflect these changes, including initializing MainEventBus and MainEventBusProcessor in their setUp methods, passing MainEventBus to QueueManager constructors, and adjusting EventQueue creation and consumption logic. A new EventQueueUtil class was added for test setup, and MainEventBusContext, MainEventBusProcessorCallback, and MainEventBusProcessorInitializer were introduced to support the new event bus architecture. Review comments suggest converting MainEventBusContext to a Java record for conciseness, consolidating the duplicated EventQueueUtil test utility, and handling CompletableFuture exceptions from cleanupProducer in DefaultRequestHandler to prevent silent failures.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a significant and well-executed architectural refactoring by centralizing event processing into a MainEventBus and a dedicated MainEventBusProcessor. This change greatly improves the system's robustness by enforcing a 'persistence-first' pattern, which eliminates potential race conditions. The decoupling of concerns simplifies DefaultRequestHandler and ResultAggregator, making the codebase easier to maintain. The updates to tests to handle the new asynchronous nature of event processing are thorough and correct. I've identified one minor issue regarding code duplication in a test utility class, but overall, this is an excellent improvement to the event queueing system.

@kabir
Copy link
Collaborator Author

kabir commented Dec 9, 2025

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a significant refactoring of the event queuing and processing mechanism by implementing a MainEventBus and MainEventBusProcessor. The MainEventBus acts as a central hub for events, ensuring that all events are first processed by the MainEventBusProcessor in a dedicated background thread. This processor is responsible for persisting events to the TaskStore and sending push notifications before distributing them to client-facing ChildQueues, thereby guaranteeing persistence before client visibility. The EventQueue class is refactored into MainQueue and ChildQueue types, with MainQueue handling event submission to the MainEventBus and ChildQueues managing local consumption. The DefaultRequestHandler is updated to remove direct push notification sending and background task tracking, delegating these responsibilities to the MainEventBusProcessor. Test cases across ReplicatedQueueManagerTest, EventConsumerTest, EventQueueTest, InMemoryQueueManagerTest, AbstractA2ARequestHandlerTest, DefaultRequestHandlerTest, ResultAggregatorTest, and TaskUpdaterTest are updated to reflect the new asynchronous event processing model, often incorporating retry logic and MainEventBusProcessor setup/teardown. A new EventQueueUtil class is introduced in server-common/src/test/java to assist with test setup. Review comments highlight a redundant EventQueueUtil.java file in extras/queue-manager-replicated/core/src/test/java that needs removal, and a critical issue in MainEventBusProcessor.java where updateTaskStore was swallowing exceptions, violating the 'persistence first' guarantee. The reviewer suggests re-throwing these exceptions to ensure event distribution is aborted if persistence fails.

@kabir
Copy link
Collaborator Author

kabir commented Dec 10, 2025

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new MainEventBus and MainEventBusProcessor to centralize event persistence and distribution, ensuring events are persisted to the TaskStore before being distributed to client ChildQueues. The EventQueue hierarchy has been refactored, making MainQueue responsible for submitting events to the MainEventBus and ChildQueue responsible for local consumption. The MainEventBusProcessor runs in a background thread, handling persistence, push notifications, and distribution, and includes a testing callback for synchronization. Consequently, DefaultRequestHandler and ResultAggregator no longer directly handle TaskStore updates or push notifications, delegating these to the MainEventBusProcessor. Test cases across ReplicatedQueueManager, EventConsumer, EventQueue, InMemoryQueueManager, DefaultRequestHandler, ResultAggregator, and TaskUpdater have been updated to integrate with the new MainEventBus architecture, using new EventQueueUtil helpers and MainEventBusProcessorCallback for deterministic testing of asynchronous event processing. Review comments suggest improving ChildQueue.internalEnqueueItem to use queue.offer() for graceful backpressure handling and removing an unused semaphore release in ChildQueue.doClose for clarity.

Comment on lines 401 to 415
private void internalEnqueueItem(EventQueueItem item) {
super.enqueueItem(item);
// Internal method called by MainEventBusProcessor to add to local queue
// Note: Semaphore is managed by parent MainQueue (acquire/release), not ChildQueue
Event event = item.getEvent();
if (isClosed()) {
LOGGER.warn("ChildQueue is closed. Event will not be enqueued. {} {}", this, event);
return;
}
queue.add(item);
LOGGER.debug("Enqueued event {} {}", event instanceof Throwable ? event.toString() : event, this);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The internalEnqueueItem method uses queue.add(). With a bounded queue (as suggested in another comment), this will throw an IllegalStateException if the queue is full. This exception would propagate up to the MainEventBusProcessor, disrupting event distribution to other child queues for the same task. A more robust approach is to use queue.offer() and handle the backpressure gracefully by closing the full queue.

        private void internalEnqueueItem(EventQueueItem item) {
            // Internal method called by MainEventBusProcessor to add to local queue
            Event event = item.getEvent();
            if (isClosed()) {
                LOGGER.warn("ChildQueue is closed. Event will not be enqueued. {} {}", this, event);
                return;
            }
            // Use offer() to handle backpressure. If the queue is full, close it to prevent memory issues.
            if (!queue.offer(item)) {
                LOGGER.warn("ChildQueue {} is full. Closing immediately.", this);
                close(true); // immediate close
            } else {
                LOGGER.debug("Enqueued event {} {}", event instanceof Throwable ? event.toString() : event, this);
            }
        }

Comment on lines 478 to 479
// Release semaphore permits for cleared events to prevent deadlock
semaphore.release(clearedCount);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The comment on line 478 is misleading. The semaphore being released here belongs to the ChildQueue instance and is never acquired, so there is no risk of deadlock. This release call is on an unused semaphore. This logic appears to be a leftover from a previous design and can be removed for clarity.

@kabir kabir force-pushed the event-queue-bus branch 2 times, most recently from db9e6f3 to e410643 Compare December 10, 2025 14:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant