Keep Kafka consumer and DAG-processing threads alive across uncaught exceptions #4188
Closed
DaisyModi wants to merge 1 commit into apache:master
GaaS depends on three long-lived worker loops that previously exited silently
on any uncaught exception, leaving the service running but non-functional
('server up, consumer down'):
1. HighLevelConsumer.startUp() - the Kafka poll loop, whose consume() only
caught InterruptedException.
2. HighLevelConsumer.QueueProcessor.run() - the per-partition record-processing
loop, whose outer catch(Exception) exited run() and killed the worker.
3. DagProcessingEngine.DagProcEngineThread.run() - the DAG-task-processing
loop, whose top-of-iteration blocking calls (dagTaskStream.next(),
dagTask.host()) sat outside any try/catch.
Fix: move the try/catch INSIDE each while loop so an uncaught exception logs
+ marks a meter + iterates to the next cycle, instead of escaping and
terminating the thread. DagActionStoreChangeMonitor.setActive() inlines the
same broken poll-loop pattern; it gets the same wrap.
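
The difference between the two loop shapes can be sketched in isolation. This is a minimal illustration, not the Gobblin code: `InLoopCatchSketch`, `runWithOuterCatch`, `runWithInLoopCatch`, and the `AtomicLong` standing in for a metrics meter are all hypothetical names, and an `Iterator` stands in for the blocking poll / `dagTaskStream.next()` call.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Illustrative only: none of these names come from Gobblin.
final class InLoopCatchSketch {
  // Stand-in for a metrics meter such as loopExceptions.
  static final AtomicLong loopExceptions = new AtomicLong();

  // Old shape: the try/catch wraps the whole loop, so the first exception ends the loop.
  static <T> int runWithOuterCatch(Iterator<T> tasks, Consumer<T> handler) {
    int handled = 0;
    try {
      while (tasks.hasNext()) {
        handler.accept(tasks.next());
        handled++;
      }
    } catch (RuntimeException e) {
      System.err.println("loop exited: " + e.getMessage());
    }
    return handled;
  }

  // New shape: the try/catch sits inside the loop, so a failure costs one iteration only.
  static <T> int runWithInLoopCatch(Iterator<T> tasks, Consumer<T> handler) {
    int handled = 0;
    while (tasks.hasNext()) {
      try {
        // The fetch of the next task is inside the try as well.
        handler.accept(tasks.next());
        handled++;
      } catch (RuntimeException e) {
        System.err.println("iteration failed, continuing: " + e.getMessage());
        loopExceptions.incrementAndGet();
      }
    }
    return handled;
  }

  public static void main(String[] args) {
    List<Integer> tasks = Arrays.asList(1, 2, 3, 4);
    Consumer<Integer> failsOnTwo = t -> {
      if (t == 2) {
        throw new IllegalStateException("boom on " + t);
      }
    };
    System.out.println(runWithOuterCatch(tasks.iterator(), failsOnTwo));  // prints 1
    System.out.println(runWithInLoopCatch(tasks.iterator(), failsOnTwo)); // prints 3
  }
}
```

With the outer catch, tasks 3 and 4 are never handled; with the in-loop catch only task 2 is lost and the counter records one failure.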
Watermark invariant preserved (GOBBLIN-2177): under auto-commit=false the
inner catch still rethrows before partitionOffsetsToCommit is updated for the
failing record, and the new outer in-loop catch handles that without advancing
the offset map for the failing record. The failing record is dropped in
steady state; operators alert on the loopExceptions meter and investigate.
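
A simplified model of that ordering may help. It is a sketch under stated assumptions, not the `HighLevelConsumer` implementation: `OffsetMapSketch`, `Rec`, and `processAll` are hypothetical, the meter is a plain counter, and the map is assumed to hold the last successfully processed offset per partition (the real code may store something different, such as the next offset to read).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Illustrative only: a simplified model, not the HighLevelConsumer code.
final class OffsetMapSketch {
  // Hypothetical record type: partition, offset, payload.
  static final class Rec {
    final int partition;
    final long offset;
    final String value;

    Rec(int partition, long offset, String value) {
      this.partition = partition;
      this.offset = offset;
      this.value = value;
    }
  }

  final Map<Integer, Long> partitionOffsetsToCommit = new HashMap<>();
  long loopExceptions = 0; // stand-in for the loopExceptions meter

  void processAll(List<Rec> records, Consumer<Rec> processMessage) {
    for (Rec rec : records) { // stands in for the worker's while loop
      try {
        try {
          processMessage.accept(rec);
        } catch (RuntimeException e) {
          // Inner catch (auto-commit=false path): rethrow so the map update below is skipped.
          throw e;
        }
        // Reached only on success. Assumption: the map stores the last processed offset.
        partitionOffsetsToCommit.put(rec.partition, rec.offset);
      } catch (RuntimeException e) {
        // Outer in-loop catch: count the failure and move on to the next record.
        loopExceptions++;
      }
    }
  }

  public static void main(String[] args) {
    OffsetMapSketch sketch = new OffsetMapSketch();
    List<Rec> records = Arrays.asList(
        new Rec(0, 10, "a"), new Rec(0, 11, "bad"), new Rec(1, 7, "c"));
    sketch.processAll(records, rec -> {
      if (rec.value.equals("bad")) {
        throw new IllegalStateException("cannot process offset " + rec.offset);
      }
    });
    System.out.println(sketch.partitionOffsetsToCommit); // prints {0=10, 1=7}
    System.out.println(sketch.loopExceptions);           // prints 1
  }
}
```

In this model the failing record at offset 11 never reaches the map, while partition 1 keeps flowing. If a later record on partition 0 succeeded, the map would move past offset 11, which corresponds to the "dropped in steady state" behaviour described above.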
Retry semantics were considered and rejected for now: the specific failure
modes that would kill a worker aren't well-understood yet, so retrying could
add complexity without known benefit. Alert-on-failure is the guardrail.
New metric, wire to on-call paging:
gobblin.kafka.highLevelConsumer.loopExceptions
DagProc-side reuses the existing dagProcessingExceptionMeter.
One integration test verifies the auto-commit=false path continues after a
RuntimeException from processMessage (one record dropped, rest flow through).
Closing this upstream PR. After team discussion, we determined that production GaaS actually runs Xinfra-based consumers from an internal fork, not the apache/gobblin HighLevelConsumer path touched here. The bug still exists upstream, but fixing it here wouldn't affect our production incident driver (ACTIONITEM-16023 / incident 9928). Happy to revisit later for upstream alignment; for now, the fix is landing in the internal fork.
Dear Gobblin maintainers,
Please accept this PR. I understand that until you sign your ICLA, we cannot actually accept your PR.
JIRA
Description of PR
GaaS depends on three long-lived worker loops that previously exited silently on any uncaught exception, leaving the service running but non-functional ("server up, consumer down"):

1. `HighLevelConsumer.startUp()`: the Kafka poll loop. `consume()` only caught `InterruptedException`; any other exception escaped the lambda and the poll thread stopped.
2. `HighLevelConsumer.QueueProcessor.run()`: the per-partition record-processing loop. Its outer `catch (Exception)` exited `run()` and silently killed the worker for that partition.
3. `DagProcessingEngine.DagProcEngineThread.run()`: the DAG-task-processing loop. The top-of-iteration blocking calls (`dagTaskStream.next()`, `dagTask.host()`) sat outside any `try/catch`.

Fix: move the `try/catch` inside each `while` loop so an uncaught exception logs + marks a meter + iterates to the next cycle, instead of escaping and terminating the thread. `DagActionStoreChangeMonitor.setActive()` inlines the same broken poll-loop pattern; it gets the same wrap.

Watermark invariant preserved (GOBBLIN-2177): under `auto-commit=false` the inner catch still rethrows before `partitionOffsetsToCommit` is updated for the failing record, and the new outer in-loop catch handles that without advancing the offset map for the failing record. The failing record is dropped in steady state; operators alert on the `loopExceptions` meter and investigate.

Retry semantics considered and deferred. The specific failure modes that would escape `processMessage` aren't well-understood yet, so retrying could add complexity without known benefit. Alert-on-failure is the intended guardrail; retry can be added later if specific retryable failure modes emerge.

One new metric (wire to on-call paging):

- `gobblin.kafka.highLevelConsumer.loopExceptions`

DagProc-side reuses the existing `dagProcessingExceptionMeter`.

Tests

- `HighLevelConsumerTest#testQueueProcessorContinuesAfterExceptionAutoCommitDisabled`: new integration test. With `auto-commit=false`, `processMessage` throws on the first record and subsequent records still flow through (`NUM_MSGS - 1` observed; the failing record is dropped).
- `testQueueProcessorRuntimeExceptionEncounteredAutoCommitEnabled` continues to pass unchanged.

Commits
Documentation and Formatting