feat(pusher): Add concurrent log publishing with circuit breaker and retry heap#2023

Draft
the-mann wants to merge 65 commits into main from fix/poison-pill-deadlock

@the-mann
Contributor

Summary

Add multi-threaded log publishing to CloudWatch Logs with circuit breaker isolation and retry heap for failed batches.

Problem

The CloudWatch agent publishes logs synchronously — one batch at a time per target. When a target encounters persistent failures (e.g., AccessDenied), the retry loop blocks the entire pipeline, starving healthy log groups.

Solution

Architecture

Queue → SenderPool → WorkerPool (N workers) → Sender → PutLogEvents
                                                  ↓ (on failure)
                                            RetryHeap (unbounded min-heap)
                                                  ↓ (on retry time)
                                         RetryHeapProcessor → WorkerPool

Key Components

  • WorkerPool: Shared pool of N goroutines for concurrent PutLogEvents calls
  • RetryHeap: Unbounded min-heap ordered by next retry time for failed batches
  • RetryHeapProcessor: Periodically pops ready batches and resubmits to worker pool
  • Circuit Breaker: Per-target halt/resume — failing target blocks its own queue without affecting others

Circuit Breaker Flow

  1. Batch fails → batch.fail() → queue.halt(): the queue stops sending new batches
  2. Failed batch pushed to RetryHeap with backoff
  3. RetryHeapProcessor retries batch later
  4. Success → batch.done() → queue.resume(): the queue resumes
  5. Expired (14d) → batch.done() → queue resumes, batch dropped
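
The flow above can be sketched as callbacks attached to the batch, so the sender and the retry processor only call `fail()` and `done()` and never touch the queue directly. This is a simplified sketch: the `onFail`/`onDone` fields, `newBatch`, and `isHalted` are assumptions, and the state persistence that `done()` also performs in the PR is omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical per-target queue with a mutex-protected halt flag.
type queue struct {
	mu     sync.Mutex
	halted bool
}

func (q *queue) halt() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.halted = true
}

func (q *queue) resume() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.halted = false
}

func (q *queue) isHalted() bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.halted
}

// batch carries the callbacks registered by the queue that created it.
type batch struct {
	onFail func()
	onDone func()
}

// fail is called after a failed send attempt (step 1).
func (b *batch) fail() {
	if b.onFail != nil {
		b.onFail()
	}
}

// done is called on success (step 4) or expiry (step 5).
func (b *batch) done() {
	if b.onDone != nil {
		b.onDone()
	}
}

// newBatch wires a batch to its own queue's circuit breaker.
func newBatch(q *queue) *batch {
	return &batch{onFail: q.halt, onDone: q.resume}
}

func main() {
	q := &queue{}
	b := newBatch(q)

	b.fail()
	fmt.Println("halted after fail:", q.isHalted())

	b.done()
	fmt.Println("halted after done:", q.isHalted())
}
```

Since each batch is bound to the queue of its own target, halting is scoped to that target, and other queues keep sending.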

Poison Pill Fix

The retry heap is unbounded. A previous bounded implementation (size = concurrency) caused deadlock when failing log groups exceeded the heap size — workers blocked on Push(), starving all targets.

Changes

  • batch.go: Add retry metadata, state/fail/done callbacks, expiry logic
  • queue.go: Add channel-based halt/resume circuit breaker with mutex protection
  • sender.go: Push to retry heap on failure, call batch.fail() for circuit breaker
  • retryheap.go: Unbounded min-heap + RetryHeapProcessor with periodic retry loop
  • pool.go: WorkerPool and SenderPool for concurrent dispatch
  • pusher.go: Wire up retry heap and worker pool when concurrency enabled
  • cloudwatchlogs.go: Create shared WorkerPool and RetryHeap

Testing

  • Unit tests: 27 new tests covering retry heap, circuit breaker, poison pill, state callbacks, shutdown deadlock
  • Race detector: All tests pass with -race
  • Key scenarios tested:
    • 10 denied + 1 allowed log group with concurrency=2 (poison pill)
    • Shutdown while queue is halted (no deadlock)
    • Batch expiry after 14 days resumes circuit breaker and persists state
    • State callbacks fire on success and expiry but not on shutdown

Related PRs

agarakan and others added 30 commits December 30, 2025 05:00
Co-authored-by: Akansha Agarwal <agarakan@users.noreply.github.com>
…ock-on-failure

# Conflicts:
#	plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/batch.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/retryheap.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/sender.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go
…ock-on-failure

# Conflicts:
#	plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/batch.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/retryheap.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/sender.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go
…nder-block-on-failure

# Conflicts:
#	plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_test.go
- Add mutex protection to Stop() method to prevent race conditions
- Add stopped flag checks in Push() to prevent pushing after Stop()
- Ensure Push() checks stopped flag both before and after acquiring semaphore
- Fix TestRetryHeapStopTwice to verify correct behavior
- Add TestRetryHeapProcessorExpiredBatchShouldResume to demonstrate bug
- When a batch expires after 14 days, RetryHeapProcessor calls updateState()
  but not done(), so the queue is never resumed and the circuit breaker
  stays tripped
- Target remains blocked forever even though the bad batch was dropped
- Test currently fails, demonstrating the bug from the PR comment
Verifies that startTime and expireAfter are only set once on first call
and remain unchanged on subsequent calls, ensuring the 14-day expiration
is measured from the first send attempt, not from each retry.
Concurrency is now determined by whether workerPool and retryHeap are
provided, making the explicit concurrency parameter redundant.

🤖 Assisted by AI
The retry heap is now unbounded, so maxSize is no longer used.

🤖 Assisted by AI
batch.done() already calls updateState() internally, so the explicit
call is unnecessary.

🤖 Assisted by AI
Test had no assertions and was not validating any behavior.

🤖 Assisted by AI
Variable was set but never checked in the test.

🤖 Assisted by AI
Circuit breaker should always block after exactly 1 send attempt,
not "at most 1".

🤖 Assisted by AI
The dummyBatch was not connected to the queue's circuit breaker,
so calling done() on it had no effect. Simplified test to only
verify halt behavior.

🤖 Assisted by AI
- Replace sync.Cond with channel-based halt/resume to prevent shutdown
  deadlock (waitIfHalted now selects on haltCh and stopCh)
- Add mutex to halt/resume/waitIfHalted for thread safety
- Add TestQueueStopWhileHalted to verify no shutdown deadlock
- Add TestQueueHaltResume with proper resume assertions
- Clean up verbose test comments and weak assertions
- Remove orphaned TestQueueResumeOnBatchExpiry comment

🤖 Assisted by AI
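
The channel-based halt/resume described in this commit can be sketched as follows. `haltCh`, `stopCh`, and `waitIfHalted` are named in the commit message; the rest (`newQueue`, `stop`, the boolean return value, and the nil-channel convention) is an assumption for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

type queue struct {
	mu     sync.Mutex
	haltCh chan struct{} // nil while running; non-nil and open while halted
	stopCh chan struct{} // closed on shutdown
}

func newQueue() *queue {
	return &queue{stopCh: make(chan struct{})}
}

// halt trips the breaker. Calling it while already halted is a no-op.
func (q *queue) halt() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.haltCh == nil {
		q.haltCh = make(chan struct{})
	}
}

// resume releases every waiter by closing the current halt channel.
func (q *queue) resume() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.haltCh != nil {
		close(q.haltCh)
		q.haltCh = nil
	}
}

// stop signals shutdown. It must be called at most once in this sketch.
func (q *queue) stop() { close(q.stopCh) }

// waitIfHalted blocks while the queue is halted. It returns true if the
// queue may send, and false if shutdown was requested while waiting.
func (q *queue) waitIfHalted() bool {
	q.mu.Lock()
	ch := q.haltCh
	q.mu.Unlock()
	if ch == nil {
		return true
	}
	select {
	case <-ch:
		return true
	case <-q.stopCh:
		return false
	}
}

func main() {
	q := newQueue()
	q.halt()

	result := make(chan bool)
	go func() { result <- q.waitIfHalted() }()

	// Shutdown arrives while the queue is still halted.
	q.stop()
	fmt.Println("may send:", <-result)
}
```

Unlike a `sync.Cond` wait, the `select` gives the waiter a second way out, so a queue that is halted at shutdown does not hang waiting for a resume that will never come.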
Verify state file management during retry, expiry, and shutdown:
- Successful retry persists file offsets via state callbacks
- Expired batch (14d) still persists offsets to prevent re-read
- Clean shutdown does not persist state for unprocessed batches

🤖 Assisted by AI
- Fix TestRetryHeapProcessorSendsBatch: add events to batch, verify
  PutLogEvents is called and done callback fires (was testing empty batch)
- Fix TestRetryHeapProcessorExpiredBatch: set expireAfter field so
  isExpired() actually returns true, verify done() is called
- Fix race in TestRetryHeapProcessorSendsBatch: use atomic.Bool
- Reduce TestRetryHeap_UnboundedPush sleep from 3s to 100ms

🤖 Assisted by AI
…Groups

TestPoisonPillScenario already covers the same scenario (10 denied +
1 allowed with low concurrency). The bounded heap no longer exists so
the 'smaller than' framing is no longer meaningful.

🤖 Assisted by AI
@the-mann added the "ready for testing" label (indicates this PR is ready for integration tests to run) on Feb 13, 2026
@the-mann force-pushed the fix/poison-pill-deadlock branch 3 times, most recently from a45d9be to 98bdc89, on February 19, 2026 18:24
CRITICAL fixes:
- Handle retryHeap.Push() error in sender.Send() when heap is stopped
  during shutdown. Now calls batch.done() to persist state and resume
  circuit breaker instead of silently dropping the batch.
- Fix Close() ordering: pushers stop before heap to allow in-flight
  sends to push failed batches. Remove duplicate Stop() calls.

HIGH priority fixes:
- Remove dead maxRetryDuration field from RetryHeapProcessor (batch
  expiry is handled by batch.expireAfter set in initializeStartTime)
- Remove duplicate maxRetryTimeout constant from cloudwatchlogs.go
  (canonical definition is in batch.go)
- Add clarifying comment about circuit breaker in synchronous mode

MEDIUM priority fixes:
- Add stopMu mutex to RetryHeapProcessor.Stop() for thread safety
- Rename TestPoisonPillScenario to TestRetryHeapProcessorDoesNotStarveAllowedTarget
  (test doesn't exercise full pipeline)
- Delete TestRecoveryAfterSystemRestart (doesn't test actual restart)
- Delete TestRecoveryWithMultipleTargets (duplicates TestSingleDeniedLogGroup)

LOW priority fixes:
- Fix TestQueueHaltResume to avoid race condition
- Replace stringPtr/int64Ptr helpers with aws.String()/aws.Int64()
@the-mann
Contributor Author

Incorporated review feedback. Integration tests passing:

@the-mann
Contributor Author

Updated PR with review feedback and merged latest from main. Commit to review: 9aabc04

Integration tests passing:

@github-actions
Contributor

github-actions bot commented Mar 6, 2026

This PR was marked stale due to lack of activity.

@github-actions github-actions bot added the Stale label Mar 6, 2026