diff --git a/.gitignore b/.gitignore
index 9118e6e9c2..38313bfed5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,3 +10,4 @@ CWAGENT_VERSION
 terraform.*
 **/.terraform/*
 coverage.txt
+agent-sops/
diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/poison_pill_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/poison_pill_test.go
new file mode 100644
index 0000000000..bb4589d4e4
--- /dev/null
+++ b/plugins/outputs/cloudwatchlogs/internal/pusher/poison_pill_test.go
@@ -0,0 +1,384 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: MIT
+
+package pusher
+
+import (
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/influxdata/telegraf/testutil"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+
+	"github.com/aws/amazon-cloudwatch-agent/internal/retryer"
+	"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
+)
+
+// TestPoisonPillScenario validates that when multiple log groups encounter
+// AccessDenied errors simultaneously with low concurrency, the agent continues
+// publishing to allowed log groups without blocking the entire pipeline.
+//
+// This test recreates the scenario from poison-pill-test-findings.md where:
+// - 1 allowed log group + 10 denied log groups
+// - Concurrency = 2
+// - Continuous stream of new batches (simulating force_flush_interval=5s)
+// - Expected: Allowed log group continues receiving events
+// - Historical Bug: Agent stopped publishing to ALL log groups after ~5 minutes
+//
+// This test validates that the retry heap and worker pool architecture correctly
+// handles this scenario by:
+// 1. Continuously generating batches for 10 denied + 1 allowed log group
+// 2. Processing with only 2 workers (low concurrency)
+// 3. Verifying allowed log group continues to receive events throughout
+// 4. Ensuring worker pool doesn't get saturated by failed retry attempts
+//
+// The test passes because the current implementation uses a retry heap with
+// proper backoff, preventing failed batches from monopolizing worker threads.
+func TestPoisonPillScenario(t *testing.T) {
+	heap := NewRetryHeap(100, &testutil.Logger{})
+	defer heap.Stop()
+
+	workerPool := NewWorkerPool(2) // Low concurrency as in the bug scenario
+	defer workerPool.Stop()
+
+	mockService := &mockLogsService{}
+	mockTargetManager := &mockTargetManager{}
+	mockTargetManager.On("EnsureTargetExists", mock.Anything).Return(nil)
+
+	accessDeniedErr := &cloudwatchlogs.AccessDeniedException{
+		Message_: stringPtr("User is not authorized to perform: logs:PutLogEvents with an explicit deny"),
+	}
+
+	// Track PutLogEvents calls per class of log group
+	var allowedGroupSuccessCount atomic.Int32
+	var deniedGroupAttemptCount atomic.Int32
+
+	// Configure mock service responses with realistic latency
+	mockService.On("PutLogEvents", mock.MatchedBy(func(input *cloudwatchlogs.PutLogEventsInput) bool {
+		return *input.LogGroupName == "log-stream-ple-access-granted"
+	})).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Run(func(_ mock.Arguments) {
+		time.Sleep(10 * time.Millisecond) // Simulate API latency
+		allowedGroupSuccessCount.Add(1)
+	})
+
+	mockService.On("PutLogEvents", mock.MatchedBy(func(input *cloudwatchlogs.PutLogEventsInput) bool {
+		return *input.LogGroupName != "log-stream-ple-access-granted"
+	})).Return((*cloudwatchlogs.PutLogEventsOutput)(nil), accessDeniedErr).Run(func(_ mock.Arguments) {
+		time.Sleep(10 * time.Millisecond) // Simulate API latency
+		deniedGroupAttemptCount.Add(1)
+	})
+
+	processor := NewRetryHeapProcessor(heap, workerPool, mockService, mockTargetManager, &testutil.Logger{}, 100*time.Millisecond, retryer.NewLogThrottleRetryer(&testutil.Logger{}))
+
+	// Targets. fmt.Sprintf keeps the denied group names unique for any count;
+	// the previous string(rune('0'+i)) only produced digits for i in 0..9.
+	const numDeniedGroups = 10
+	allowedTarget := Target{Group: "log-stream-ple-access-granted", Stream: "i-test"}
+	deniedTargets := make([]Target, numDeniedGroups)
+	for i := range deniedTargets {
+		deniedTargets[i] = Target{
+			Group:  fmt.Sprintf("aws-restricted-log-group-name-log-stream-ple-access-denied%d", i),
+			Stream: "i-test",
+		}
+	}
+
+	// Simulate continuous batch generation over time (like force_flush_interval=5s)
+	done := make(chan struct{})
+	var wg sync.WaitGroup
+
+	// Continuously generate batches for denied log groups (simulating continuous log writes)
+	for _, target := range deniedTargets {
+		wg.Add(1)
+		go func(target Target) {
+			defer wg.Done()
+			ticker := time.NewTicker(50 * time.Millisecond) // Simulate flush interval
+			defer ticker.Stop()
+			batchCount := 0
+			for {
+				select {
+				case <-done:
+					return
+				case <-ticker.C:
+					if batchCount >= 5 { // Generate 5 batches per denied log group
+						return
+					}
+					batch := createBatch(target, 50)
+					batch.nextRetryTime = time.Now().Add(-1 * time.Second)
+					// t.Errorf is safe here: wg.Wait() keeps the test alive until this goroutine exits
+					if err := heap.Push(batch); err != nil {
+						t.Errorf("pushing denied batch for %s: %v", target.Group, err)
+						return
+					}
+					batchCount++
+				}
+			}
+		}(target)
+	}
+
+	// Continuously generate batches for allowed log group
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		ticker := time.NewTicker(50 * time.Millisecond)
+		defer ticker.Stop()
+		batchCount := 0
+		for {
+			select {
+			case <-done:
+				return
+			case <-ticker.C:
+				if batchCount >= 10 { // Generate 10 batches for allowed log group
+					return
+				}
+				batch := createBatch(allowedTarget, 20)
+				batch.nextRetryTime = time.Now().Add(-1 * time.Second)
+				if err := heap.Push(batch); err != nil {
+					t.Errorf("pushing allowed batch: %v", err)
+					return
+				}
+				batchCount++
+			}
+		}
+	}()
+
+	// Process batches continuously
+	processorDone := make(chan struct{})
+	processorExited := make(chan struct{})
+	go func() {
+		defer close(processorExited)
+		ticker := time.NewTicker(20 * time.Millisecond)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-processorDone:
+				return
+			case <-ticker.C:
+				processor.processReadyMessages()
+			}
+		}
+	}()
+
+	// Run for 2 seconds to simulate sustained load
+	time.Sleep(2 * time.Second)
+	close(done)
+	wg.Wait()
+
+	// Let in-flight work settle, then stop the ticker loop and wait for it to
+	// exit so the final drain below does not run concurrently with it and the
+	// goroutine cannot outlive the test (and the deferred Stop calls).
+	time.Sleep(500 * time.Millisecond)
+	close(processorDone)
+	<-processorExited
+
+	// Process remaining messages
+	processor.processReadyMessages()
+	time.Sleep(200 * time.Millisecond)
+
+	// CRITICAL ASSERTION: Allowed log group MUST receive events throughout the test
+	successCount := allowedGroupSuccessCount.Load()
+	t.Logf("Allowed group success count: %d, Denied group attempt count: %d", successCount, deniedGroupAttemptCount.Load())
+
+	assert.Greater(t, successCount, int32(5),
+		"Allowed log group must continue receiving events despite continuous denied log group failures. Got %d, expected > 5", successCount)
+
+	// Verify denied log groups attempted to send
+	assert.Greater(t, deniedGroupAttemptCount.Load(), int32(0),
+		"Denied log groups should have attempted to send")
+}
+
+// TestRetryHeapSmallerThanFailingLogGroups tests the specific bottleneck scenario where:
+// - Retry heap size = concurrency (e.g., 2)
+// - Number of failing log groups (10) > retry heap size (2)
+// - With bounded heap: This caused deadlock as heap filled up
+// - With unbounded heap: System handles this gracefully
+//
+// This test validates the FIX: unbounded retry heap allows all failed batches
+// to be queued without blocking workers.
+func TestRetryHeapSmallerThanFailingLogGroups(t *testing.T) {
+	concurrency := 2
+	numFailingLogGroups := 10
+
+	// The retry heap is unbounded: the size argument does not cap how many
+	// batches can be queued, so it may be smaller than the failing group count.
+	heap := NewRetryHeap(concurrency, &testutil.Logger{})
+	defer heap.Stop()
+
+	workerPool := NewWorkerPool(concurrency)
+	defer workerPool.Stop()
+
+	mockService := &mockLogsService{}
+	mockTargetManager := &mockTargetManager{}
+	mockTargetManager.On("EnsureTargetExists", mock.Anything).Return(nil)
+
+	accessDeniedErr := &cloudwatchlogs.AccessDeniedException{
+		Message_: stringPtr("Access denied"),
+	}
+
+	var allowedGroupSuccessCount atomic.Int32
+	var deniedGroupAttemptCount atomic.Int32
+
+	mockService.On("PutLogEvents", mock.MatchedBy(func(input *cloudwatchlogs.PutLogEventsInput) bool {
+		return *input.LogGroupName == "allowed"
+	})).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Run(func(_ mock.Arguments) {
+		time.Sleep(10 * time.Millisecond) // Simulate API latency
+		allowedGroupSuccessCount.Add(1)
+	})
+
+	mockService.On("PutLogEvents", mock.MatchedBy(func(input *cloudwatchlogs.PutLogEventsInput) bool {
+		return *input.LogGroupName != "allowed"
+	})).Return((*cloudwatchlogs.PutLogEventsOutput)(nil), accessDeniedErr).Run(func(_ mock.Arguments) {
+		time.Sleep(10 * time.Millisecond) // Simulate API latency
+		deniedGroupAttemptCount.Add(1)
+	})
+
+	processor := NewRetryHeapProcessor(heap, workerPool, mockService, mockTargetManager, &testutil.Logger{}, 50*time.Millisecond, retryer.NewLogThrottleRetryer(&testutil.Logger{}))
+
+	// Create targets
+	allowedTarget := Target{Group: "allowed", Stream: "stream"}
+	deniedTargets := make([]Target, numFailingLogGroups)
+	for i := range deniedTargets {
+		deniedTargets[i] = Target{Group: fmt.Sprintf("denied-%d", i), Stream: "stream"}
+	}
+
+	done := make(chan struct{})
+	var wg sync.WaitGroup
+
+	// Generate batches for all failing log groups continuously
+	for _, target := range deniedTargets {
+		wg.Add(1)
+		go func(target Target) {
+			defer wg.Done()
+			ticker := time.NewTicker(30 * time.Millisecond)
+			defer ticker.Stop()
+			batchCount := 0
+			for {
+				select {
+				case <-done:
+					return
+				case <-ticker.C:
+					if batchCount >= 3 {
+						return
+					}
+					batch := createBatch(target, 10)
+					batch.nextRetryTime = time.Now().Add(-1 * time.Second)
+					// t.Errorf is safe here: wg.Wait() keeps the test alive until this goroutine exits
+					if err := heap.Push(batch); err != nil {
+						t.Errorf("pushing denied batch for %s: %v", target.Group, err)
+						return
+					}
+					batchCount++
+				}
+			}
+		}(target)
+	}
+
+	// Generate batches for allowed log group
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		ticker := time.NewTicker(30 * time.Millisecond)
+		defer ticker.Stop()
+		batchCount := 0
+		for {
+			select {
+			case <-done:
+				return
+			case <-ticker.C:
+				if batchCount >= 5 {
+					return
+				}
+				batch := createBatch(allowedTarget, 10)
+				batch.nextRetryTime = time.Now().Add(-1 * time.Second)
+				if err := heap.Push(batch); err != nil {
+					t.Errorf("pushing allowed batch: %v", err)
+					return
+				}
+				batchCount++
+			}
+		}
+	}()
+
+	// Process continuously
+	processorDone := make(chan struct{})
+	processorExited := make(chan struct{})
+	go func() {
+		defer close(processorExited)
+		ticker := time.NewTicker(15 * time.Millisecond)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-processorDone:
+				return
+			case <-ticker.C:
+				processor.processReadyMessages()
+			}
+		}
+	}()
+
+	// Run for 1 second
+	time.Sleep(1 * time.Second)
+	close(done)
+	wg.Wait()
+
+	// Stop the ticker loop and wait for it to exit before the final drain, so
+	// processReadyMessages is never invoked concurrently and the goroutine
+	// does not outlive the test.
+	time.Sleep(300 * time.Millisecond)
+	close(processorDone)
+	<-processorExited
+	processor.processReadyMessages()
+	time.Sleep(100 * time.Millisecond)
+
+	successCount := allowedGroupSuccessCount.Load()
+	deniedAttempts := deniedGroupAttemptCount.Load()
+
+	t.Logf("Results: Allowed success=%d, Denied attempts=%d, Heap size=%d, Failing groups=%d",
+		successCount, deniedAttempts, heap.Size(), numFailingLogGroups)
+
+	// With unbounded heap, allowed log group should receive events
+	assert.Greater(t, successCount, int32(0),
+		"Allowed log group received 0 events with unbounded heap despite %d failing groups", numFailingLogGroups)
+
+	// Guard against a vacuous pass: the failing path must actually have been exercised
+	assert.Greater(t, deniedAttempts, int32(0),
+		"Denied log groups should have attempted to send")
+}
+
+// TestSingleDeniedLogGroup validates the baseline scenario where a single denied
+// log group does not affect the allowed log group.
+func TestSingleDeniedLogGroup(t *testing.T) {
+	heap := NewRetryHeap(10, &testutil.Logger{})
+	defer heap.Stop()
+
+	workerPool := NewWorkerPool(4) // Higher concurrency as in initial test
+	defer workerPool.Stop()
+
+	mockService := &mockLogsService{}
+	mockTargetManager := &mockTargetManager{}
+	mockTargetManager.On("EnsureTargetExists", mock.Anything).Return(nil)
+
+	accessDeniedErr := &cloudwatchlogs.AccessDeniedException{
+		Message_: stringPtr("Access denied"),
+	}
+
+	var allowedGroupSuccessCount atomic.Int32
+	var deniedGroupAttemptCount atomic.Int32
+
+	mockService.On("PutLogEvents", mock.MatchedBy(func(input *cloudwatchlogs.PutLogEventsInput) bool {
+		return *input.LogGroupName == "log-stream-ple-access-granted"
+	})).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Run(func(_ mock.Arguments) {
+		allowedGroupSuccessCount.Add(1)
+	})
+
+	mockService.On("PutLogEvents", mock.MatchedBy(func(input *cloudwatchlogs.PutLogEventsInput) bool {
+		return *input.LogGroupName == "aws-restricted-log-group-name-log-stream-ple-access-denied"
+	})).Return((*cloudwatchlogs.PutLogEventsOutput)(nil), accessDeniedErr).Run(func(_ mock.Arguments) {
+		deniedGroupAttemptCount.Add(1)
+	})
+
+	processor := NewRetryHeapProcessor(heap, workerPool, mockService, mockTargetManager, &testutil.Logger{}, time.Hour, retryer.NewLogThrottleRetryer(&testutil.Logger{}))
+
+	// Create batches
+	allowedTarget := Target{Group: "log-stream-ple-access-granted", Stream: "i-test"}
+	deniedTarget := Target{Group: "aws-restricted-log-group-name-log-stream-ple-access-denied", Stream: "i-test"}
+
+	allowedBatch := createBatch(allowedTarget, 40)
+	deniedBatch := createBatch(deniedTarget, 40)
+
+	allowedBatch.nextRetryTime = time.Now().Add(-1 * time.Second)
+	deniedBatch.nextRetryTime = time.Now().Add(-1 * time.Second)
+
+	err := heap.Push(allowedBatch)
+	assert.NoError(t, err)
+	err = heap.Push(deniedBatch)
+	assert.NoError(t, err)
+
+	processor.processReadyMessages()
+
+	// Poll for the asynchronous sends instead of sleeping a fixed interval, so
+	// the test is fast when workers are quick and tolerant when CI is slow.
+	assert.Eventually(t, func() bool {
+		return allowedGroupSuccessCount.Load() > 0 && deniedGroupAttemptCount.Load() > 0
+	}, 2*time.Second, 10*time.Millisecond,
+		"Allowed log group must receive events with single denied log group")
+}
+
+// createBatch creates a log event batch for target holding eventCount events.
+// Every event carries the message "test message"; timestamps are in epoch
+// milliseconds and increase by one per event starting from the current time.
+func createBatch(target Target, eventCount int) *logEventBatch {
+	batch := newLogEventBatch(target, nil)
+	batch.events = make([]*cloudwatchlogs.InputLogEvent, eventCount)
+	now := time.Now().UnixMilli()
+	for i := range batch.events {
+		batch.events[i] = &cloudwatchlogs.InputLogEvent{
+			Message:   stringPtr("test message"),
+			Timestamp: int64Ptr(now + int64(i)),
+		}
+	}
+	return batch
+}
diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap.go b/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap.go
index 0c5f0f3e54..4109ea623c 100644
--- a/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap.go
+++ b/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap.go
@@ -49,56 +49,45 @@ type RetryHeap interface {
 }
 
 type retryHeap struct {
-	heap      retryHeapImpl
-	mutex     sync.RWMutex
-	semaphore chan struct{} // Size enforcer
-	stopCh    chan struct{}
-	maxSize   int
-	stopped   bool
-	logger    telegraf.Logger
+	heap    retryHeapImpl
+	mutex   sync.RWMutex
+	stopCh  chan struct{}
+	stopped bool
+	logger  telegraf.Logger
 }
 
 var _ RetryHeap = (*retryHeap)(nil)
 
-// NewRetryHeap creates a new retry heap with the specified maximum size
+// NewRetryHeap creates a new retry heap. The heap is unbounded: maxSize no
+// longer caps the number of queued batches and is used only as the initial
+// capacity hint for the backing slice.
 func NewRetryHeap(maxSize int, logger telegraf.Logger) RetryHeap {
+	// A negative hint would make the slice allocation panic, so clamp it.
+	initialCap := maxSize
+	if initialCap < 0 {
+		initialCap = 0
+	}
 	rh := &retryHeap{
-		heap:      make(retryHeapImpl, 0, maxSize),
-		maxSize:   maxSize,
-		semaphore: make(chan struct{}, maxSize), // Semaphore for size enforcement
-		stopCh:    make(chan struct{}),
-		logger:    logger,
+		heap:   make(retryHeapImpl, 0, initialCap),
+		stopCh: make(chan struct{}),
+		logger: logger,
 	}
 	heap.Init(&rh.heap)
 	return rh
 }
 
-// Push adds a batch to the heap, blocking if full
+// Push adds a batch to the heap without blocking. It returns an error once
+// the heap has been stopped.
 func (rh *retryHeap) Push(batch *logEventBatch) error {
-	rh.mutex.RLock()
+	rh.mutex.Lock()
+	defer rh.mutex.Unlock()
+
 	if rh.stopped {
-		rh.mutex.RUnlock()
-		return errors.New("retry heap stopped")
-	}
-	rh.mutex.RUnlock()
-
-	// Acquire semaphore slot (blocks if at maxSize capacity)
-	select {
-	case rh.semaphore <- struct{}{}:
-		// add batch to heap with mutex protection
-		rh.mutex.Lock()
-		if rh.stopped {
-			// Release semaphore if stopped after acquiring
-			<-rh.semaphore
-			rh.mutex.Unlock()
-			return errors.New("retry heap stopped")
-		}
-		heap.Push(&rh.heap, batch)
-		rh.mutex.Unlock()
-		return nil
-	case <-rh.stopCh:
 		return errors.New("retry heap stopped")
 	}
+
+	heap.Push(&rh.heap, batch)
+	return nil
 }
 
 // PopReady returns all batches that are ready for retry (nextRetryTime <= now)
@@ -113,8 +102,6 @@ func (rh *retryHeap) PopReady() []*logEventBatch {
 	for len(rh.heap) > 0 && !rh.heap[0].nextRetryTime.After(now) {
 		batch := heap.Pop(&rh.heap).(*logEventBatch)
 		ready = append(ready, batch)
-		// Release semaphore slot for each popped batch
-		<-rh.semaphore
 	}
 
 	return ready
diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_test.go
index edc7dbc314..038bdbfdbf 100644
--- a/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_test.go
+++ b/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_test.go
@@ -145,65 +145,44 @@ func TestRetryHeapProcessorSendsBatch(t *testing.T) {
 	assert.Equal(t, 0, heap.Size())
 }
 
-func TestRetryHeap_SemaphoreBlockingAndUnblocking(t *testing.T) {
-	heap := NewRetryHeap(2, &testutil.Logger{}) // maxSize = 2
+func TestRetryHeap_UnboundedPush(t *testing.T) {
+	heap := NewRetryHeap(2, &testutil.Logger{}) // heap may grow past the maxSize argument
 	defer heap.Stop()
 
-	// Fill heap to capacity with batches that will be ready in 3 seconds
+	// Push more batches than maxSize; they are already due, so no sleep is needed before PopReady
 	target := Target{Group: "group", Stream: "stream"}
 	batch1 := newLogEventBatch(target, nil)
-	batch1.nextRetryTime = time.Now().Add(3 * time.Second)
+	batch1.nextRetryTime = time.Now().Add(-1 * time.Second)
 	batch2 := newLogEventBatch(target, nil)
-	batch2.nextRetryTime = time.Now().Add(3 * time.Second)
+	batch2.nextRetryTime = time.Now().Add(-1 * time.Second)
+	batch3 := newLogEventBatch(target, nil)
+	batch3.nextRetryTime = time.Now().Add(-1 * time.Second)
 
-	heap.Push(batch1)
-	heap.Push(batch2)
-
-	// Verify heap is at capacity
-	if heap.Size() != 2 {
-		t.Fatalf("Expected size 2, got %d", heap.Size())
-	}
+	// All pushes should succeed immediately (non-blocking)
+	err := heap.Push(batch1)
+	assert.NoError(t, err)
+	err = heap.Push(batch2)
+	assert.NoError(t, err)
+	err = heap.Push(batch3)
+	assert.NoError(t, err)
 
-	// Test that semaphore is actually blocking by trying to push in a goroutine
-	pushResult := make(chan error, 1)
-
-	go func() {
-		batch3 := newLogEventBatch(target, nil)
-		batch3.nextRetryTime = time.Now().Add(-1 * time.Hour)
-		heap.Push(batch3) // This should block on semaphore
-		pushResult <- nil
-	}()
-
-	// Verify the push is blocked (expects no result in channel)
-	select {
-	case <-pushResult:
-		t.Fatal("Unexpected push, heap should be blocked")
-	case <-time.After(100 * time.Millisecond):
-		// Push is successfully blocked when at capacity
+	// Verify heap can grow beyond original maxSize parameter
+	if heap.Size() != 3 {
+		t.Fatalf("Expected size 3, got %d", heap.Size())
 	}
 
-	time.Sleep(3 * time.Second)
-
-	// Pop ready batches to release semaphore slots
+	// Pop ready batches
 	readyBatches := heap.PopReady()
-	assert.Len(t, readyBatches, 2, "Should pop exactly 2 ready batches")
+	assert.Len(t, readyBatches, 3, "Should pop exactly 3 ready batches")
 
 	for _, batch := range readyBatches {
 		assert.Equal(t, "group", batch.Group)
 		assert.Equal(t, "stream", batch.Stream)
 	}
 
-	// Expects push to now be unblocked
-	select {
-	case err := <-pushResult:
-		assert.NoError(t, err, "Push should succeed after PopReady")
-	case <-time.After(100 * time.Millisecond):
-		t.Fatal("Unexpected timeout, heap should be unblocked")
-	}
-
-	// Verify 1 item remaining in heap (2 popped, 1 pushed)
-	if heap.Size() != 1 {
-		t.Fatalf("Expected size 1 after pop/push cycle, got %d", heap.Size())
+	// Verify heap is empty
+	if heap.Size() != 0 {
+		t.Fatalf("Expected size 0 after pop, got %d", heap.Size())
 	}
 }
 