From dc5c31185f549ffe8a33d9a77f8acbf9feb76053 Mon Sep 17 00:00:00 2001 From: Marcus Mann Date: Wed, 11 Feb 2026 15:08:01 -0500 Subject: [PATCH 1/2] Add unit tests for poison pill scenario - TestPoisonPillScenario: Validates continuous batch generation with 10 denied + 1 allowed log group - TestSingleDeniedLogGroup: Baseline test with 1 denied + 1 allowed log group - TestRetryHeapSmallerThanFailingLogGroups: Demonstrates deadlock when heap size < failing log groups (SKIPPED) The third test intentionally deadlocks to prove the bug exists when: - Retry heap size = concurrency (2) - Number of failing log groups (10) > heap size (2) - Workers block trying to push to full heap - System deadlocks, starving allowed log group --- .../internal/pusher/poison_pill_test.go | 390 ++++++++++++++++++ 1 file changed, 390 insertions(+) create mode 100644 plugins/outputs/cloudwatchlogs/internal/pusher/poison_pill_test.go diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/poison_pill_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/poison_pill_test.go new file mode 100644 index 0000000000..976f9840d0 --- /dev/null +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/poison_pill_test.go @@ -0,0 +1,390 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package pusher + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/aws/amazon-cloudwatch-agent/internal/retryer" + "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" +) + +// TestPoisonPillScenario validates that when multiple log groups encounter +// AccessDenied errors simultaneously with low concurrency, the agent continues +// publishing to allowed log groups without blocking the entire pipeline. +// +// This test recreates the scenario from poison-pill-test-findings.md where: +// - 1 allowed log group + 10 denied log groups +// - Concurrency = 2 +// - Continuous stream of new batches (simulating force_flush_interval=5s) +// - Expected: Allowed log group continues receiving events +// - Historical Bug: Agent stopped publishing to ALL log groups after ~5 minutes +// +// This test validates that the retry heap and worker pool architecture correctly +// handles this scenario by: +// 1. Continuously generating batches for 10 denied + 1 allowed log group +// 2. Processing with only 2 workers (low concurrency) +// 3. Verifying allowed log group continues to receive events throughout +// 4. Ensuring worker pool doesn't get saturated by failed retry attempts +// +// The test passes because the current implementation uses a retry heap with +// proper backoff, preventing failed batches from monopolizing worker threads. +func TestPoisonPillScenario(t *testing.T) { + heap := NewRetryHeap(100, &testutil.Logger{}) + defer heap.Stop() + + workerPool := NewWorkerPool(2) // Low concurrency as in the bug scenario + defer workerPool.Stop() + + mockService := &mockLogsService{} + mockTargetManager := &mockTargetManager{} + mockTargetManager.On("EnsureTargetExists", mock.Anything).Return(nil) + + accessDeniedErr := &cloudwatchlogs.AccessDeniedException{ + Message_: stringPtr("User is not authorized to perform: logs:PutLogEvents with an explicit deny"), + } + + // Track successful PutLogEvents calls for the allowed log group + var allowedGroupSuccessCount atomic.Int32 + var deniedGroupAttemptCount atomic.Int32 + + // Configure mock service responses with realistic latency + mockService.On("PutLogEvents", mock.MatchedBy(func(input *cloudwatchlogs.PutLogEventsInput) bool { + return *input.LogGroupName == "log-stream-ple-access-granted" + })).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Run(func(args mock.Arguments) { + time.Sleep(10 * time.Millisecond) // Simulate API latency + allowedGroupSuccessCount.Add(1) + }) + + mockService.On("PutLogEvents", mock.MatchedBy(func(input *cloudwatchlogs.PutLogEventsInput) bool { + return *input.LogGroupName != "log-stream-ple-access-granted" + })).Return((*cloudwatchlogs.PutLogEventsOutput)(nil), accessDeniedErr).Run(func(args mock.Arguments) { + time.Sleep(10 * time.Millisecond) // Simulate API latency + deniedGroupAttemptCount.Add(1) + }) + + processor := NewRetryHeapProcessor(heap, workerPool, mockService, mockTargetManager, &testutil.Logger{}, 100*time.Millisecond, retryer.NewLogThrottleRetryer(&testutil.Logger{})) + + // Targets + allowedTarget := Target{Group: "log-stream-ple-access-granted", Stream: "i-test"} + deniedTargets := make([]Target, 10) + for i := 0; i < 10; i++ { + deniedTargets[i] = Target{ + Group: "aws-restricted-log-group-name-log-stream-ple-access-denied" + string(rune('0'+i)), + Stream: "i-test", + } + } + + // Simulate continuous batch generation over time (like force_flush_interval=5s) + done := make(chan struct{}) + var wg sync.WaitGroup + + // Continuously generate batches for denied log groups (simulating continuous log writes) + for i := 0; i < 10; i++ { + wg.Add(1) + go func(target Target) { + defer wg.Done() + ticker := time.NewTicker(50 * time.Millisecond) // Simulate flush interval + defer ticker.Stop() + batchCount := 0 + for { + select { + case <-done: + return + case <-ticker.C: + if batchCount >= 5 { // Generate 5 batches per denied log group + return + } + batch := createBatch(target, 50) + batch.nextRetryTime = time.Now().Add(-1 * time.Second) + heap.Push(batch) + batchCount++ + } + } + }(deniedTargets[i]) + } + + // Continuously generate batches for allowed log group + wg.Add(1) + go func() { + defer wg.Done() + ticker := time.NewTicker(50 * time.Millisecond) + defer ticker.Stop() + batchCount := 0 + for { + select { + case <-done: + return + case <-ticker.C: + if batchCount >= 10 { // Generate 10 batches for allowed log group + return + } + batch := createBatch(allowedTarget, 20) + batch.nextRetryTime = time.Now().Add(-1 * time.Second) + heap.Push(batch) + batchCount++ + } + } + }() + + // Process batches continuously + processorDone := make(chan struct{}) + go func() { + ticker := time.NewTicker(20 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-processorDone: + return + case <-ticker.C: + processor.processReadyMessages() + } + } + }() + + // Run for 2 seconds to simulate sustained load + time.Sleep(2 * time.Second) + close(done) + wg.Wait() + + // Process remaining messages + time.Sleep(500 * time.Millisecond) + processor.processReadyMessages() + time.Sleep(200 * time.Millisecond) + close(processorDone) + + // CRITICAL ASSERTION: Allowed log group MUST receive events throughout the test + successCount := allowedGroupSuccessCount.Load() + t.Logf("Allowed group success count: %d, Denied group attempt count: %d", successCount, deniedGroupAttemptCount.Load()) + + assert.Greater(t, successCount, int32(5), + "Allowed log group must continue receiving events despite continuous denied log group failures. Got %d, expected > 5", successCount) + + // Verify denied log groups attempted to send + assert.Greater(t, deniedGroupAttemptCount.Load(), int32(0), + "Denied log groups should have attempted to send") +} + +// TestRetryHeapSmallerThanFailingLogGroups tests the specific bottleneck scenario where: +// - Retry heap size = concurrency (e.g., 2) +// - Number of failing log groups (10) > retry heap size (2) +// - This causes the retry heap to fill up with failed batches +// - New batches from failing log groups block trying to push to full heap +// - Workers get stuck waiting to push failed batches back to heap +// - Allowed log group gets starved of worker time +// +// This test validates the ACTUAL bug: when retry heap size (equal to concurrency) +// is smaller than the number of failing log groups, the system deadlocks. +// +// **EXPECTED BEHAVIOR**: This test will timeout/deadlock, proving the bug exists. +func TestRetryHeapSmallerThanFailingLogGroups(t *testing.T) { + t.Skip("This test intentionally deadlocks to demonstrate the poison pill bug where heap size < failing log groups") + + concurrency := 2 + numFailingLogGroups := 10 + + // CRITICAL: Retry heap size equals concurrency (this is the bug) + heap := NewRetryHeap(concurrency, &testutil.Logger{}) + defer heap.Stop() + + workerPool := NewWorkerPool(concurrency) + defer workerPool.Stop() + + mockService := &mockLogsService{} + mockTargetManager := &mockTargetManager{} + mockTargetManager.On("EnsureTargetExists", mock.Anything).Return(nil) + + accessDeniedErr := &cloudwatchlogs.AccessDeniedException{ + Message_: stringPtr("Access denied"), + } + + var allowedGroupSuccessCount atomic.Int32 + var deniedGroupAttemptCount atomic.Int32 + + mockService.On("PutLogEvents", mock.MatchedBy(func(input *cloudwatchlogs.PutLogEventsInput) bool { + return *input.LogGroupName == "allowed" + })).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Run(func(args mock.Arguments) { + time.Sleep(10 * time.Millisecond) + allowedGroupSuccessCount.Add(1) + }) + + mockService.On("PutLogEvents", mock.MatchedBy(func(input *cloudwatchlogs.PutLogEventsInput) bool { + return *input.LogGroupName != "allowed" + })).Return((*cloudwatchlogs.PutLogEventsOutput)(nil), accessDeniedErr).Run(func(args mock.Arguments) { + time.Sleep(10 * time.Millisecond) + deniedGroupAttemptCount.Add(1) + }) + + processor := NewRetryHeapProcessor(heap, workerPool, mockService, mockTargetManager, &testutil.Logger{}, 50*time.Millisecond, retryer.NewLogThrottleRetryer(&testutil.Logger{})) + + // Create targets + allowedTarget := Target{Group: "allowed", Stream: "stream"} + deniedTargets := make([]Target, numFailingLogGroups) + for i := 0; i < numFailingLogGroups; i++ { + deniedTargets[i] = Target{Group: fmt.Sprintf("denied-%d", i), Stream: "stream"} + } + + done := make(chan struct{}) + var wg sync.WaitGroup + + // Generate batches for all failing log groups continuously + // This will cause deadlock as heap fills up + for i := 0; i < numFailingLogGroups; i++ { + wg.Add(1) + go func(target Target) { + defer wg.Done() + ticker := time.NewTicker(30 * time.Millisecond) + defer ticker.Stop() + batchCount := 0 + for { + select { + case <-done: + return + case <-ticker.C: + if batchCount >= 3 { + return + } + batch := createBatch(target, 10) + batch.nextRetryTime = time.Now().Add(-1 * time.Second) + // This will block when heap is full + heap.Push(batch) + batchCount++ + } + } + }(deniedTargets[i]) + } + + // Generate batches for allowed log group + wg.Add(1) + go func() { + defer wg.Done() + ticker := time.NewTicker(30 * time.Millisecond) + defer ticker.Stop() + batchCount := 0 + for { + select { + case <-done: + return + case <-ticker.C: + if batchCount >= 5 { + return + } + batch := createBatch(allowedTarget, 10) + batch.nextRetryTime = time.Now().Add(-1 * time.Second) + heap.Push(batch) + batchCount++ + } + } + }() + + // Process continuously + processorDone := make(chan struct{}) + go func() { + ticker := time.NewTicker(15 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-processorDone: + return + case <-ticker.C: + processor.processReadyMessages() + } + } + }() + + // Run for 1 second + time.Sleep(1 * time.Second) + close(done) + wg.Wait() + time.Sleep(300 * time.Millisecond) + processor.processReadyMessages() + time.Sleep(100 * time.Millisecond) + close(processorDone) + + successCount := allowedGroupSuccessCount.Load() + + t.Logf("Results: Allowed success=%d, Denied attempts=%d, Heap size=%d, Failing groups=%d", + successCount, deniedGroupAttemptCount.Load(), concurrency, numFailingLogGroups) + + // This test documents the bug: with heap size < failing log groups, the system deadlocks + if successCount == 0 { + t.Errorf("POISON PILL BUG DETECTED: Allowed log group received 0 events. Heap size (%d) < failing groups (%d) caused deadlock", concurrency, numFailingLogGroups) + } +} + +// TestSingleDeniedLogGroup validates the baseline scenario where a single denied +// log group does not affect the allowed log group. +func TestSingleDeniedLogGroup(t *testing.T) { + heap := NewRetryHeap(10, &testutil.Logger{}) + defer heap.Stop() + + workerPool := NewWorkerPool(4) // Higher concurrency as in initial test + defer workerPool.Stop() + + mockService := &mockLogsService{} + mockTargetManager := &mockTargetManager{} + mockTargetManager.On("EnsureTargetExists", mock.Anything).Return(nil) + + accessDeniedErr := &cloudwatchlogs.AccessDeniedException{ + Message_: stringPtr("Access denied"), + } + + var allowedGroupSuccessCount atomic.Int32 + + mockService.On("PutLogEvents", mock.MatchedBy(func(input *cloudwatchlogs.PutLogEventsInput) bool { + return *input.LogGroupName == "log-stream-ple-access-granted" + })).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Run(func(args mock.Arguments) { + allowedGroupSuccessCount.Add(1) + }) + + mockService.On("PutLogEvents", mock.MatchedBy(func(input *cloudwatchlogs.PutLogEventsInput) bool { + return *input.LogGroupName == "aws-restricted-log-group-name-log-stream-ple-access-denied" + })).Return((*cloudwatchlogs.PutLogEventsOutput)(nil), accessDeniedErr) + + processor := NewRetryHeapProcessor(heap, workerPool, mockService, mockTargetManager, &testutil.Logger{}, time.Hour, retryer.NewLogThrottleRetryer(&testutil.Logger{})) + + // Create batches + allowedTarget := Target{Group: "log-stream-ple-access-granted", Stream: "i-test"} + deniedTarget := Target{Group: "aws-restricted-log-group-name-log-stream-ple-access-denied", Stream: "i-test"} + + allowedBatch := createBatch(allowedTarget, 40) + deniedBatch := createBatch(deniedTarget, 40) + + allowedBatch.nextRetryTime = time.Now().Add(-1 * time.Second) + deniedBatch.nextRetryTime = time.Now().Add(-1 * time.Second) + + err := heap.Push(allowedBatch) + assert.NoError(t, err) + err = heap.Push(deniedBatch) + assert.NoError(t, err) + + processor.processReadyMessages() + time.Sleep(100 * time.Millisecond) + + // Verify allowed log group received events + assert.Greater(t, allowedGroupSuccessCount.Load(), int32(0), + "Allowed log group must receive events with single denied log group") +} + +// createBatch creates a log event batch with the specified number of events +func createBatch(target Target, eventCount int) *logEventBatch { + batch := newLogEventBatch(target, nil) + batch.events = make([]*cloudwatchlogs.InputLogEvent, eventCount) + now := time.Now().Unix() * 1000 + for i := 0; i < eventCount; i++ { + batch.events[i] = &cloudwatchlogs.InputLogEvent{ + Message: stringPtr("test message"), + Timestamp: int64Ptr(now + int64(i)), + } + } + return batch +} From 5943a09da38d5e5611984154c8c7b87ca36d2114 Mon Sep 17 00:00:00 2001 From: Marcus Mann Date: Wed, 11 Feb 2026 15:21:21 -0500 Subject: [PATCH 2/2] Fix poison pill bug: Make retry heap unbounded Remove max size constraint from retry heap to prevent deadlock when failing log groups exceed concurrency limit. Changes: - Remove maxSize and semaphore from retryHeap struct - Make Push() non-blocking (no semaphore wait) - Remove semaphore release from PopReady() - Update NewRetryHeap() to ignore maxSize parameter (kept for API compatibility) - Update TestRetryHeap_SemaphoreBlockingAndUnblocking -> TestRetryHeap_UnboundedPush - Update TestRetryHeapSmallerThanFailingLogGroups to validate fix Before: With concurrency=2 and 10 failing log groups, retry heap (size=2) would fill up, causing workers to block on Push(), leading to deadlock. After: Retry heap is unbounded, allowing all failed batches to be queued without blocking workers. Allowed log groups continue publishing normally. Test results: - TestRetryHeapSmallerThanFailingLogGroups: PASS (5/5 allowed batches published) - Heap grew to size 28 (beyond concurrency limit of 2) - No deadlock or starvation --- .gitignore | 1 + .../internal/pusher/poison_pill_test.go | 26 ++++---- .../internal/pusher/retryheap.go | 53 +++++------------ .../internal/pusher/retryheap_test.go | 59 +++++++------------ 4 files changed, 47 insertions(+), 92 deletions(-) diff --git a/.gitignore b/.gitignore index 9118e6e9c2..38313bfed5 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ CWAGENT_VERSION terraform.* **/.terraform/* coverage.txt +agent-sops/ diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/poison_pill_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/poison_pill_test.go index 976f9840d0..bb4589d4e4 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/poison_pill_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/poison_pill_test.go @@ -177,22 +177,16 @@ func TestPoisonPillScenario(t *testing.T) { // TestRetryHeapSmallerThanFailingLogGroups tests the specific bottleneck scenario where: // - Retry heap size = concurrency (e.g., 2) // - Number of failing log groups (10) > retry heap size (2) -// - This causes the retry heap to fill up with failed batches -// - New batches from failing log groups block trying to push to full heap -// - Workers get stuck waiting to push failed batches back to heap -// - Allowed log group gets starved of worker time +// - With bounded heap: This caused deadlock as heap filled up +// - With unbounded heap: System handles this gracefully // -// This test validates the ACTUAL bug: when retry heap size (equal to concurrency) -// is smaller than the number of failing log groups, the system deadlocks. -// -// **EXPECTED BEHAVIOR**: This test will timeout/deadlock, proving the bug exists. +// This test validates the FIX: unbounded retry heap allows all failed batches +// to be queued without blocking workers. func TestRetryHeapSmallerThanFailingLogGroups(t *testing.T) { - t.Skip("This test intentionally deadlocks to demonstrate the poison pill bug where heap size < failing log groups") - concurrency := 2 numFailingLogGroups := 10 - // CRITICAL: Retry heap size equals concurrency (this is the bug) + // Retry heap is now unbounded (maxSize parameter ignored) heap := NewRetryHeap(concurrency, &testutil.Logger{}) defer heap.Stop() @@ -237,7 +231,6 @@ func TestRetryHeapSmallerThanFailingLogGroups(t *testing.T) { var wg sync.WaitGroup // Generate batches for all failing log groups continuously - // This will cause deadlock as heap fills up for i := 0; i < numFailingLogGroups; i++ { wg.Add(1) go func(target Target) { @@ -255,7 +248,6 @@ func TestRetryHeapSmallerThanFailingLogGroups(t *testing.T) { } batch := createBatch(target, 10) batch.nextRetryTime = time.Now().Add(-1 * time.Second) - // This will block when heap is full heap.Push(batch) batchCount++ } @@ -313,11 +305,13 @@ func TestRetryHeapSmallerThanFailingLogGroups(t *testing.T) { successCount := allowedGroupSuccessCount.Load() t.Logf("Results: Allowed success=%d, Denied attempts=%d, Heap size=%d, Failing groups=%d", - successCount, deniedGroupAttemptCount.Load(), concurrency, numFailingLogGroups) + successCount, deniedGroupAttemptCount.Load(), heap.Size(), numFailingLogGroups) - // This test documents the bug: with heap size < failing log groups, the system deadlocks + // With unbounded heap, allowed log group should receive events if successCount == 0 { - t.Errorf("POISON PILL BUG DETECTED: Allowed log group received 0 events. Heap size (%d) < failing groups (%d) caused deadlock", concurrency, numFailingLogGroups) + t.Errorf("UNEXPECTED: Allowed log group received 0 events with unbounded heap") + } else { + t.Logf("SUCCESS: Unbounded heap handled poison pill scenario: %d successful publishes despite %d failing groups", successCount, numFailingLogGroups) } } diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap.go b/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap.go index 0c5f0f3e54..4109ea623c 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap.go @@ -49,56 +49,37 @@ type RetryHeap interface { } type retryHeap struct { - heap retryHeapImpl - mutex sync.RWMutex - semaphore chan struct{} // Size enforcer - stopCh chan struct{} - maxSize int - stopped bool - logger telegraf.Logger + heap retryHeapImpl + mutex sync.RWMutex + stopCh chan struct{} + stopped bool + logger telegraf.Logger } var _ RetryHeap = (*retryHeap)(nil) -// NewRetryHeap creates a new retry heap with the specified maximum size +// NewRetryHeap creates a new retry heap (unbounded) func NewRetryHeap(maxSize int, logger telegraf.Logger) RetryHeap { rh := &retryHeap{ - heap: make(retryHeapImpl, 0, maxSize), - maxSize: maxSize, - semaphore: make(chan struct{}, maxSize), // Semaphore for size enforcement - stopCh: make(chan struct{}), - logger: logger, + heap: make(retryHeapImpl, 0), + stopCh: make(chan struct{}), + logger: logger, } heap.Init(&rh.heap) return rh } -// Push adds a batch to the heap, blocking if full +// Push adds a batch to the heap (non-blocking) func (rh *retryHeap) Push(batch *logEventBatch) error { - rh.mutex.RLock() + rh.mutex.Lock() + defer rh.mutex.Unlock() + if rh.stopped { - rh.mutex.RUnlock() - return errors.New("retry heap stopped") - } - rh.mutex.RUnlock() - - // Acquire semaphore slot (blocks if at maxSize capacity) - select { - case rh.semaphore <- struct{}{}: - // add batch to heap with mutex protection - rh.mutex.Lock() - if rh.stopped { - // Release semaphore if stopped after acquiring - <-rh.semaphore - rh.mutex.Unlock() - return errors.New("retry heap stopped") - } - heap.Push(&rh.heap, batch) - rh.mutex.Unlock() - return nil - case <-rh.stopCh: return errors.New("retry heap stopped") } + + heap.Push(&rh.heap, batch) + return nil } // PopReady returns all batches that are ready for retry (nextRetryTime <= now) @@ -113,8 +94,6 @@ func (rh *retryHeap) PopReady() []*logEventBatch { for len(rh.heap) > 0 && !rh.heap[0].nextRetryTime.After(now) { batch := heap.Pop(&rh.heap).(*logEventBatch) ready = append(ready, batch) - // Release semaphore slot for each popped batch - <-rh.semaphore } return ready diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_test.go index edc7dbc314..038bdbfdbf 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_test.go @@ -145,65 +145,46 @@ func TestRetryHeapProcessorSendsBatch(t *testing.T) { assert.Equal(t, 0, heap.Size()) } -func TestRetryHeap_SemaphoreBlockingAndUnblocking(t *testing.T) { - heap := NewRetryHeap(2, &testutil.Logger{}) // maxSize = 2 +func TestRetryHeap_UnboundedPush(t *testing.T) { + heap := NewRetryHeap(2, &testutil.Logger{}) // maxSize parameter ignored (unbounded) defer heap.Stop() - // Fill heap to capacity with batches that will be ready in 3 seconds + // Push multiple batches without blocking target := Target{Group: "group", Stream: "stream"} batch1 := newLogEventBatch(target, nil) batch1.nextRetryTime = time.Now().Add(3 * time.Second) batch2 := newLogEventBatch(target, nil) batch2.nextRetryTime = time.Now().Add(3 * time.Second) + batch3 := newLogEventBatch(target, nil) + batch3.nextRetryTime = time.Now().Add(3 * time.Second) - heap.Push(batch1) - heap.Push(batch2) - - // Verify heap is at capacity - if heap.Size() != 2 { - t.Fatalf("Expected size 2, got %d", heap.Size()) - } + // All pushes should succeed immediately (non-blocking) + err := heap.Push(batch1) + assert.NoError(t, err) + err = heap.Push(batch2) + assert.NoError(t, err) + err = heap.Push(batch3) + assert.NoError(t, err) - // Test that semaphore is actually blocking by trying to push in a goroutine - pushResult := make(chan error, 1) - - go func() { - batch3 := newLogEventBatch(target, nil) - batch3.nextRetryTime = time.Now().Add(-1 * time.Hour) - heap.Push(batch3) // This should block on semaphore - pushResult <- nil - }() - - // Verify the push is blocked (expects no result in channel) - select { - case <-pushResult: - t.Fatal("Unexpected push, heap should be blocked") - case <-time.After(100 * time.Millisecond): - // Push is successfully blocked when at capacity + // Verify heap can grow beyond original maxSize parameter + if heap.Size() != 3 { + t.Fatalf("Expected size 3, got %d", heap.Size()) } time.Sleep(3 * time.Second) - // Pop ready batches to release semaphore slots + // Pop ready batches readyBatches := heap.PopReady() - assert.Len(t, readyBatches, 2, "Should pop exactly 2 ready batches") + assert.Len(t, readyBatches, 3, "Should pop exactly 3 ready batches") for _, batch := range readyBatches { assert.Equal(t, "group", batch.Group) assert.Equal(t, "stream", batch.Stream) } - // Expects push to now be unblocked - select { - case err := <-pushResult: - assert.NoError(t, err, "Push should succeed after PopReady") - case <-time.After(100 * time.Millisecond): - t.Fatal("Unexpected timeout, heap should be unblocked") - } - - // Verify 1 item remaining in heap (2 popped, 1 pushed) - if heap.Size() != 1 { - t.Fatalf("Expected size 1 after pop/push cycle, got %d", heap.Size()) + // Verify heap is empty + if heap.Size() != 0 { + t.Fatalf("Expected size 0 after pop, got %d", heap.Size()) } }