feat(pusher): Add concurrent log publishing with circuit breaker and retry heap#2023
Draft
feat(pusher): Add concurrent log publishing with circuit breaker and retry heap#2023
Conversation
Co-authored-by: Akansha Agarwal <agarakan@users.noreply.github.com>
…ock-on-failure # Conflicts: # plugins/outputs/cloudwatchlogs/cloudwatchlogs.go # plugins/outputs/cloudwatchlogs/internal/pusher/batch.go # plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go # plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go # plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go # plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go # plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go # plugins/outputs/cloudwatchlogs/internal/pusher/retryheap.go # plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_test.go # plugins/outputs/cloudwatchlogs/internal/pusher/sender.go # plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go
…ock-on-failure # Conflicts: # plugins/outputs/cloudwatchlogs/cloudwatchlogs.go # plugins/outputs/cloudwatchlogs/internal/pusher/batch.go # plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go # plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go # plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go # plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go # plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go # plugins/outputs/cloudwatchlogs/internal/pusher/retryheap.go # plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_test.go # plugins/outputs/cloudwatchlogs/internal/pusher/sender.go # plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go
…nder-block-on-failure # Conflicts: # plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go # plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_test.go
- Add mutex protection to Stop() method to prevent race conditions - Add stopped flag checks in Push() to prevent pushing after Stop() - Ensure Push() checks stopped flag both before and after acquiring semaphore - Fix TestRetryHeapStopTwice to verify correct behavior
- Add TestRetryHeapProcessorExpiredBatchShouldResume to demonstrate bug - When a batch expires after 14 days, RetryHeapProcessor calls updateState() but not done(), leaving circuit breaker permanently closed - Target remains blocked forever even though bad batch was dropped - Test currently fails, demonstrating the bug from PR comment
Verifies that startTime and expireAfter are only set once on first call and remain unchanged on subsequent calls, ensuring the 14-day expiration is measured from the first send attempt, not from each retry.
Concurrency is now determined by whether workerPool and retryHeap are provided, making the explicit concurrency parameter redundant. 🤖 Assisted by AI
The retry heap is now unbounded, so maxSize is no longer used. 🤖 Assisted by AI
batch.done() already calls updateState() internally, so the explicit call is unnecessary. 🤖 Assisted by AI
Test had no assertions and was not validating any behavior. 🤖 Assisted by AI
🤖 Assisted by AI
Variable was set but never checked in the test. 🤖 Assisted by AI
Circuit breaker should always block after exactly 1 send attempt, not "at most 1". 🤖 Assisted by AI
The dummyBatch was not connected to the queue's circuit breaker, so calling done() on it had no effect. Simplified test to only verify halt behavior. 🤖 Assisted by AI
- Replace sync.Cond with channel-based halt/resume to prevent shutdown deadlock (waitIfHalted now selects on haltCh and stopCh) - Add mutex to halt/resume/waitIfHalted for thread safety - Add TestQueueStopWhileHalted to verify no shutdown deadlock - Add TestQueueHaltResume with proper resume assertions - Clean up verbose test comments and weak assertions - Remove orphaned TestQueueResumeOnBatchExpiry comment 🤖 Assisted by AI
Verify state file management during retry, expiry, and shutdown: - Successful retry persists file offsets via state callbacks - Expired batch (14d) still persists offsets to prevent re-read - Clean shutdown does not persist state for unprocessed batches 🤖 Assisted by AI
- Fix TestRetryHeapProcessorSendsBatch: add events to batch, verify PutLogEvents is called and done callback fires (was testing empty batch) - Fix TestRetryHeapProcessorExpiredBatch: set expireAfter field so isExpired() actually returns true, verify done() is called - Fix race in TestRetryHeapProcessorSendsBatch: use atomic.Bool - Reduce TestRetryHeap_UnboundedPush sleep from 3s to 100ms 🤖 Assisted by AI
…Groups TestPoisonPillScenario already covers the same scenario (10 denied + 1 allowed with low concurrency). The bounded heap no longer exists so the 'smaller than' framing is no longer meaningful. 🤖 Assisted by AI
🤖 Assisted by AI
a45d9be to
98bdc89
Compare
CRITICAL fixes: - Handle retryHeap.Push() error in sender.Send() when heap is stopped during shutdown. Now calls batch.done() to persist state and resume circuit breaker instead of silently dropping the batch. - Fix Close() ordering: pushers stop before heap to allow in-flight sends to push failed batches. Remove duplicate Stop() calls. HIGH priority fixes: - Remove dead maxRetryDuration field from RetryHeapProcessor (batch expiry is handled by batch.expireAfter set in initializeStartTime) - Remove duplicate maxRetryTimeout constant from cloudwatchlogs.go (canonical definition is in batch.go) - Add clarifying comment about circuit breaker in synchronous mode MEDIUM priority fixes: - Add stopMu mutex to RetryHeapProcessor.Stop() for thread safety - Rename TestPoisonPillScenario to TestRetryHeapProcessorDoesNotStarveAllowedTarget (test doesn't exercise full pipeline) - Delete TestRecoveryAfterSystemRestart (doesn't test actual restart) - Delete TestRecoveryWithMultipleTargets (duplicates TestSingleDeniedLogGroup) LOW priority fixes: - Fix TestQueueHaltResume to avoid race condition - Replace stringPtr/int64Ptr helpers with aws.String()/aws.Int64()
Contributor
Author
|
Incorporated review feedback. Integration tests passing:
|
Contributor
Author
|
Updated PR with review feedback and merged latest from main. Commit to review: 9aabc04 Integration tests passing:
|
Contributor
|
This PR was marked stale due to lack of activity. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Add multi-threaded log publishing to CloudWatch Logs with circuit breaker isolation and retry heap for failed batches.
Problem
The CloudWatch agent publishes logs synchronously — one batch at a time per target. When a target encounters persistent failures (e.g., AccessDenied), the retry loop blocks the entire pipeline, starving healthy log groups.
Solution
Architecture
Key Components
Circuit Breaker Flow
batch.fail()→queue.halt()— queue stops sending new batchesbatch.done()→queue.resume()— queue resumesbatch.done()→ queue resumes, batch droppedPoison Pill Fix
The retry heap is unbounded. A previous bounded implementation (size = concurrency) caused deadlock when failing log groups exceeded the heap size — workers blocked on Push(), starving all targets.
Changes
batch.go: Add retry metadata, state/fail/done callbacks, expiry logicqueue.go: Add channel-based halt/resume circuit breaker with mutex protectionsender.go: Push to retry heap on failure, callbatch.fail()for circuit breakerretryheap.go: Unbounded min-heap + RetryHeapProcessor with periodic retry looppool.go: WorkerPool and SenderPool for concurrent dispatchpusher.go: Wire up retry heap and worker pool when concurrency enabledcloudwatchlogs.go: Create shared WorkerPool and RetryHeapTesting
-raceRelated PRs