
Keep Kafka consumer and DAG-processing threads alive across uncaught exceptions#4188

Closed
DaisyModi wants to merge 1 commit into apache:master from DaisyModi:dmodi/supervise-consumer-threads

Conversation

Contributor

@DaisyModi DaisyModi commented Apr 21, 2026

Dear Gobblin maintainers,

Please accept this PR. I understand that until I have signed the ICLA, you cannot actually accept my PR.

  • My name is Daisy Modi
  • I work for LinkedIn
  • I have signed the ICLA

JIRA

  • Filing a GOBBLIN ticket as a follow-up — happy to update the PR title with the ticket number once filed.

Description of PR

GaaS depends on three long-lived worker loops that previously exited silently on any uncaught exception, leaving the service running but non-functional ("server up, consumer down"):

  1. HighLevelConsumer.startUp() — the Kafka poll loop. consume() only caught InterruptedException; any other exception escaped the lambda and the poll thread stopped.
  2. HighLevelConsumer.QueueProcessor.run() — the per-partition record-processing loop. Its outer catch (Exception) exited run() and silently killed the worker for that partition.
  3. DagProcessingEngine.DagProcEngineThread.run() — the DAG-task-processing loop. The top-of-iteration blocking calls (dagTaskStream.next(), dagTask.host()) sat outside any try/catch.

Fix: move the try/catch inside each while loop, so that an uncaught exception is logged, marks a meter, and the loop proceeds to its next iteration instead of escaping and terminating the thread. DagActionStoreChangeMonitor.setActive() inlines the same broken poll-loop pattern; it gets the same wrap.
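For reviewers, a minimal sketch of the before/after shape of the fix. This is not the real QueueProcessor code; the queue, record type, and exception counter are stand-ins for Gobblin's classes and the loopExceptions meter:

```java
// Hypothetical sketch of the in-loop supervision pattern described above.
// The queue, record type, and counter are stand-ins, not the real Gobblin
// QueueProcessor or the loopExceptions meter.
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

public class SupervisedLoopSketch {
    // Stand-in for the gobblin.kafka.highLevelConsumer.loopExceptions meter.
    static final AtomicLong loopExceptions = new AtomicLong();

    static void processMessage(String record) {
        if ("poison".equals(record)) {
            throw new RuntimeException("unexpected failure for " + record);
        }
    }

    // Before the fix: try { while (...) { ... } } catch (Exception e) { ... }
    // -- one bad record unwinds run() and the worker thread dies silently.
    // After the fix: the try/catch sits INSIDE the loop, so a failure is
    // logged, counted, and the loop moves on to the next record.
    static int run(Queue<String> queue) {
        int processed = 0;
        while (!queue.isEmpty()) {
            String record = queue.poll();
            try {
                processMessage(record);
                processed++;
            } catch (Exception e) {
                loopExceptions.incrementAndGet();
                // log.error("Dropping record after uncaught exception", e);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>();
        queue.add("r1");
        queue.add("poison");
        queue.add("r2");
        // The poison record is dropped; the worker keeps going.
        System.out.println("processed=" + run(queue)
            + " loopExceptions=" + loopExceptions.get());
    }
}
```

The same shape applies to all three loops listed above; only the blocking call at the top of each iteration differs.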

Watermark invariant preserved (GOBBLIN-2177): under auto-commit=false the inner catch still rethrows before partitionOffsetsToCommit is updated for the failing record, and the new outer in-loop catch handles that without advancing the offset map for the failing record. The failing record is dropped in steady state; operators alert on the loopExceptions meter and investigate.
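A compressed illustration of that invariant, with assumed shapes (a plain map standing in for the real partitionOffsetsToCommit field, and a boolean standing in for processMessage failing):

```java
// Hypothetical sketch of the offset-advance invariant described above.
// partitionOffsetsToCommit is a plain map here, not the real Gobblin field.
import java.util.HashMap;
import java.util.Map;

public class OffsetInvariantSketch {
    static final Map<Integer, Long> partitionOffsetsToCommit = new HashMap<>();

    static void handleRecord(int partition, long offset, boolean poison) {
        try {
            if (poison) {
                // Inner catch rethrows before the offset map is touched,
                // so the failing record never advances the watermark.
                throw new RuntimeException("processMessage failed");
            }
            // Only a successfully processed record advances the map.
            partitionOffsetsToCommit.put(partition, offset + 1);
        } catch (Exception e) {
            // New in-loop catch: record dropped, offset NOT advanced,
            // meter marked; the loop continues with the next record.
        }
    }

    public static void main(String[] args) {
        handleRecord(0, 41L, false); // success: map advances to 42
        handleRecord(0, 42L, true);  // failure: map still shows 42
        System.out.println(partitionOffsetsToCommit.get(0));
    }
}
```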

Retry semantics considered and deferred. The specific failure modes that would escape processMessage aren't well-understood yet, so retrying could add complexity without known benefit. Alert-on-failure is the intended guardrail; retry can be added later if specific retryable failure modes emerge.

One new metric (wire to on-call paging):

  • gobblin.kafka.highLevelConsumer.loopExceptions

DagProc-side reuses the existing dagProcessingExceptionMeter.

Tests

  • HighLevelConsumerTest#testQueueProcessorContinuesAfterExceptionAutoCommitDisabled — new integration test: auto-commit=false, processMessage throws on the first record, subsequent records still flow through (NUM_MSGS - 1 observed; the failing record is dropped).
  • Existing testQueueProcessorRuntimeExceptionEncounteredAutoCommitEnabled continues to pass unchanged.
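The assertion shape of the new test, reduced to a self-contained sketch (not the real HighLevelConsumerTest; NUM_MSGS and the fail-on-first-record setup are modeled with plain counters):

```java
// Hedged sketch of the new test's NUM_MSGS - 1 assertion: the first record
// throws, the in-loop catch keeps the worker alive, and every remaining
// record still flows through.
import java.util.concurrent.atomic.AtomicInteger;

public class ContinuesAfterExceptionSketch {
    static final int NUM_MSGS = 5;

    public static int runConsumer() {
        AtomicInteger observed = new AtomicInteger();
        for (int i = 0; i < NUM_MSGS; i++) {
            try {
                if (i == 0) {
                    throw new RuntimeException("processMessage fails on first record");
                }
                observed.incrementAndGet(); // record flows through
            } catch (Exception e) {
                // in-loop catch: failing record dropped, worker survives
            }
        }
        return observed.get();
    }

    public static void main(String[] args) {
        // Expect NUM_MSGS - 1: the failing record is dropped, the rest arrive.
        System.out.println(runConsumer());
    }
}
```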

Commits

  • One squashed commit.

Documentation and Formatting

  • No new user-visible config.
  • Minimal restructure — existing patterns and class structure preserved.

@DaisyModi force-pushed the dmodi/supervise-consumer-threads branch from 63d6fe2 to d8ea662 on April 22, 2026 at 09:47
@DaisyModi changed the title from "Supervise long-lived consumer and DAG-processing threads so they respawn on uncaught exceptions" to "Keep Kafka consumer and DAG-processing threads alive across uncaught exceptions" on Apr 22, 2026
@DaisyModi force-pushed the dmodi/supervise-consumer-threads branch from d8ea662 to bb9c55c on April 22, 2026 at 13:18
@DaisyModi changed the title from "Keep Kafka consumer and DAG-processing threads alive across uncaught exceptions" to "Keep Kafka consumer and DAG-processing threads alive; retry records until success" on Apr 22, 2026
…exceptions

GaaS depends on three long-lived worker loops that previously exited silently
on any uncaught exception, leaving the service running but non-functional
('server up, consumer down'):

  1. HighLevelConsumer.startUp() - the Kafka poll loop, whose consume() only
     caught InterruptedException.
  2. HighLevelConsumer.QueueProcessor.run() - the per-partition record-processing
     loop, whose outer catch(Exception) exited run() and killed the worker.
  3. DagProcessingEngine.DagProcEngineThread.run() - the DAG-task-processing
     loop, whose top-of-iteration blocking calls (dagTaskStream.next(),
     dagTask.host()) sat outside any try/catch.

Fix: move the try/catch INSIDE each while loop so an uncaught exception logs
+ marks a meter + iterates to the next cycle, instead of escaping and
terminating the thread. DagActionStoreChangeMonitor.setActive() inlines the
same broken poll-loop pattern; it gets the same wrap.

Watermark invariant preserved (GOBBLIN-2177): under auto-commit=false the
inner catch still rethrows before partitionOffsetsToCommit is updated for the
failing record, and the new outer in-loop catch handles that without advancing
the offset map for the failing record. The failing record is dropped in
steady state; operators alert on the loopExceptions meter and investigate.

Retry semantics were considered and rejected for now: the specific failure
modes that would kill a worker aren't well-understood yet, so retrying could
add complexity without known benefit. Alert-on-failure is the guardrail.

New metric, wire to on-call paging:
  gobblin.kafka.highLevelConsumer.loopExceptions

DagProc-side reuses the existing dagProcessingExceptionMeter.

One integration test verifies the auto-commit=false path continues after a
RuntimeException from processMessage (one record dropped, rest flow through).
@DaisyModi force-pushed the dmodi/supervise-consumer-threads branch from bb9c55c to 993d30b on April 23, 2026 at 05:33
@DaisyModi changed the title from "Keep Kafka consumer and DAG-processing threads alive; retry records until success" to "Keep Kafka consumer and DAG-processing threads alive across uncaught exceptions" on Apr 23, 2026
@DaisyModi
Contributor Author

Closing this upstream PR. After team discussion, we determined that production GaaS actually runs Xinfra-based consumers from an internal fork, not the apache/gobblin HighLevelConsumer path touched here. The bug still exists upstream, but fixing it here wouldn't affect our production incident driver (ACTIONITEM-16023 / incident 9928). Happy to revisit later for upstream alignment — for now, the fix is landing in the internal fork.

@DaisyModi DaisyModi closed this Apr 23, 2026
