diff --git a/.github/workflows/PR-build.yml b/.github/workflows/PR-build.yml index ac764e576c..9a89c0db3d 100644 --- a/.github/workflows/PR-build.yml +++ b/.github/workflows/PR-build.yml @@ -48,7 +48,7 @@ jobs: if: needs.changes.outputs.lint == 'true' uses: actions/setup-go@v4 with: - go-version: ~1.25 + go-version: ~1.25.7 cache: false - name: Check out code @@ -105,7 +105,7 @@ jobs: if: needs.changes.outputs.build == 'true' uses: actions/setup-go@v4 with: - go-version: ~1.25 + go-version: ~1.25.7 cache: false - name: Check out code @@ -152,7 +152,7 @@ jobs: if: needs.changes.outputs.build == 'true' uses: actions/setup-go@v4 with: - go-version: ~1.25 + go-version: ~1.25.7 cache: false - name: Check out code diff --git a/.github/workflows/PR-test.yml b/.github/workflows/PR-test.yml index 7e4f85f7bf..742e9870f2 100644 --- a/.github/workflows/PR-test.yml +++ b/.github/workflows/PR-test.yml @@ -95,7 +95,7 @@ jobs: - name: Set up Go 1.x uses: actions/setup-go@v4 with: - go-version: ~1.25 + go-version: ~1.25.7 StartLocalStack: name: 'StartLocalStack' @@ -138,7 +138,7 @@ jobs: - name: Set up Go 1.x uses: actions/setup-go@v4 with: - go-version: ~1.25 + go-version: ~1.25.7 - name: Install jq run: sudo apt-get install -y jq diff --git a/.github/workflows/application-signals-e2e-test.yml b/.github/workflows/application-signals-e2e-test.yml index 656d75b129..94f4579a58 100644 --- a/.github/workflows/application-signals-e2e-test.yml +++ b/.github/workflows/application-signals-e2e-test.yml @@ -41,13 +41,20 @@ jobs: exit 1 fi - run: | - conclusion=$(gh run view ${{ inputs.build_run_id }} --repo $GITHUB_REPOSITORY --json conclusion -q '.conclusion') - if [[ $conclusion == "success" ]]; then - echo "Run succeeded" - else - echo "Run failed" - exit 1 - fi + for i in {1..6}; do + conclusion=$(gh run view ${{ inputs.build_run_id }} --repo $GITHUB_REPOSITORY --json conclusion -q '.conclusion') + if [[ "$conclusion" == "success" ]]; then + echo "Run succeeded" + exit 0 + elif [[ 
"$conclusion" == "failure" || "$conclusion" == "cancelled" ]]; then + echo "Run failed with: $conclusion" + exit 1 + fi + echo "Waiting for workflow to complete (attempt $i)..." + sleep 5 + done + echo "Timed out waiting for workflow" + exit 1 env: GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/build-test-artifacts.yml b/.github/workflows/build-test-artifacts.yml index ad180deb77..e5b0d793a4 100644 --- a/.github/workflows/build-test-artifacts.yml +++ b/.github/workflows/build-test-artifacts.yml @@ -19,14 +19,14 @@ on: - cron: '0 11 * * 1,2,3,4,5' # Every day at 11:00 UTC on Monday to Friday workflow_dispatch: inputs: - test-image-before-upload: - description: "Run Test on the new container image" + run-tests: + description: "Run test workflows after build" default: true type: boolean workflow_call: inputs: - test-image-before-upload: - description: "Run Test on the new container image" + run-tests: + description: "Run test workflows after build" default: true type: boolean @@ -114,7 +114,7 @@ jobs: StartIntegrationTests: needs: [ BuildAndUploadPackages, BuildAndUploadITAR, BuildAndUploadCN, BuildDocker, BuildDistributor ] - if: ${{ github.event_name == 'push' || inputs.test-image-before-upload }} + if: ${{ github.event_name == 'push' || inputs.run-tests }} runs-on: ubuntu-latest permissions: actions: write @@ -126,7 +126,7 @@ jobs: StartApplicationSignalsE2ETests: needs: [ BuildAndUploadPackages, BuildAndUploadITAR, BuildAndUploadCN, BuildDocker, BuildDistributor ] # Workflow only runs against main - if: ${{ github.event_name == 'push' || inputs.test-image-before-upload }} + if: ${{ github.event_name == 'push' || inputs.run-tests }} runs-on: ubuntu-latest permissions: actions: write @@ -137,7 +137,7 @@ jobs: StartEKSE2ETests: needs: [ BuildAndUploadPackages, BuildAndUploadITAR, BuildAndUploadCN, BuildDocker, BuildDistributor ] - if: ${{ github.event_name == 'push' || inputs.test-image-before-upload }} + if: ${{ github.event_name == 'push' || 
inputs.run-tests }} runs-on: ubuntu-latest permissions: actions: write @@ -148,7 +148,7 @@ jobs: StartWorkloadDiscoveryIntegrationTests: needs: [ BuildAndUploadPackages, BuildAndUploadITAR, BuildAndUploadCN, BuildDocker, BuildDistributor ] - if: ${{ github.event_name == 'push' || inputs.test-image-before-upload }} + if: ${{ github.event_name == 'push' || inputs.run-tests }} runs-on: ubuntu-latest permissions: actions: write diff --git a/.github/workflows/e2e-test.yml b/.github/workflows/e2e-test.yml index 6b907c472b..5397885df6 100644 --- a/.github/workflows/e2e-test.yml +++ b/.github/workflows/e2e-test.yml @@ -69,12 +69,16 @@ jobs: permissions: id-token: write contents: read + actions: write with: - test-image-before-upload: false + run-tests: false BuildOperator: needs: [GetLatestOperatorCommitSHA] uses: aws/amazon-cloudwatch-agent-operator/.github/workflows/build-and-upload.yml@main + permissions: + id-token: write + contents: read concurrency: group: ${{ github.workflow }}-operator-${{ inputs.operator-branch || 'main' }} cancel-in-progress: true @@ -106,7 +110,7 @@ jobs: - name: Set up Go 1.x uses: actions/setup-go@v4 with: - go-version: ~1.25 + go-version: ~1.25.7 - name: SetOutputs id: set-outputs @@ -143,7 +147,7 @@ jobs: - name: Set up Go 1.x uses: actions/setup-go@v4 with: - go-version: ~1.25 + go-version: ~1.25.7 - name: Generate matrix id: set-matrix @@ -159,6 +163,9 @@ jobs: needs: [ GetLatestOperatorCommitSHA, GenerateTestMatrix, OutputEnvVariables ] if: always() && !cancelled() && !contains(needs.*.result, 'failure') name: 'EKSE2EJVMTomcatTestHelm' + permissions: + id-token: write + contents: read uses: ./.github/workflows/eks-e2e-test.yml with: terraform_dir: terraform/eks/e2e @@ -183,6 +190,9 @@ jobs: needs: [ GetLatestOperatorCommitSHA, GenerateTestMatrix, OutputEnvVariables ] if: always() && !cancelled() && !contains(needs.*.result, 'failure') name: 'EKSE2EJVMTomcatTestAddon' + permissions: + id-token: write + contents: read uses: 
./.github/workflows/eks-e2e-test.yml with: terraform_dir: terraform/eks/e2e @@ -207,6 +217,9 @@ jobs: needs: [ GetLatestOperatorCommitSHA, GenerateTestMatrix, OutputEnvVariables ] if: always() && !cancelled() && !contains(needs.*.result, 'failure') name: 'EKSE2EKafkaTestHelm' + permissions: + id-token: write + contents: read uses: ./.github/workflows/eks-e2e-test.yml with: terraform_dir: terraform/eks/e2e @@ -231,6 +244,9 @@ jobs: needs: [ GetLatestOperatorCommitSHA, GenerateTestMatrix, OutputEnvVariables ] if: always() && !cancelled() && !contains(needs.*.result, 'failure') name: 'EKSE2EKafkaTestAddon' + permissions: + id-token: write + contents: read uses: ./.github/workflows/eks-e2e-test.yml with: terraform_dir: terraform/eks/e2e @@ -255,6 +271,9 @@ jobs: needs: [ GetLatestOperatorCommitSHA, GenerateTestMatrix, OutputEnvVariables ] if: always() && !cancelled() && !contains(needs.*.result, 'failure') name: 'EKSE2EJMXContainerInsightsTestHelm' + permissions: + id-token: write + contents: read uses: ./.github/workflows/eks-e2e-test.yml with: terraform_dir: terraform/eks/e2e @@ -279,6 +298,9 @@ jobs: needs: [ GetLatestOperatorCommitSHA, GenerateTestMatrix, OutputEnvVariables ] if: always() && !cancelled() && !contains(needs.*.result, 'failure') name: 'EKSE2EJMXContainerInsightsTestAddon' + permissions: + id-token: write + contents: read uses: ./.github/workflows/eks-e2e-test.yml with: terraform_dir: terraform/eks/e2e @@ -303,6 +325,9 @@ jobs: needs: [ GetLatestOperatorCommitSHA, GenerateTestMatrix, OutputEnvVariables ] if: always() && !cancelled() && !contains(needs.*.result, 'failure') name: 'EKSE2EJVMTomcatTestHelmIPv6' + permissions: + id-token: write + contents: read uses: ./.github/workflows/eks-e2e-test.yml with: terraform_dir: terraform/eks/e2e @@ -329,6 +354,9 @@ jobs: needs: [ GetLatestOperatorCommitSHA, GenerateTestMatrix, OutputEnvVariables ] if: always() && !cancelled() && !contains(needs.*.result, 'failure') name: 'EKSE2EJVMTomcatTestAddonIPv6' + 
permissions: + id-token: write + contents: read uses: ./.github/workflows/eks-e2e-test.yml with: terraform_dir: terraform/eks/e2e @@ -355,6 +383,9 @@ jobs: needs: [ GetLatestOperatorCommitSHA, GenerateTestMatrix, OutputEnvVariables ] if: always() && !cancelled() && !contains(needs.*.result, 'failure') name: 'EKSE2EKafkaTestHelmIPv6' + permissions: + id-token: write + contents: read uses: ./.github/workflows/eks-e2e-test.yml with: terraform_dir: terraform/eks/e2e @@ -381,6 +412,9 @@ jobs: needs: [ GetLatestOperatorCommitSHA, GenerateTestMatrix, OutputEnvVariables ] if: always() && !cancelled() && !contains(needs.*.result, 'failure') name: 'EKSE2EKafkaTestAddonIPv6' + permissions: + id-token: write + contents: read uses: ./.github/workflows/eks-e2e-test.yml with: terraform_dir: terraform/eks/e2e @@ -407,6 +441,9 @@ jobs: needs: [ GetLatestOperatorCommitSHA, GenerateTestMatrix, OutputEnvVariables ] if: always() && !cancelled() && !contains(needs.*.result, 'failure') name: 'EKSE2EJMXContainerInsightsTestHelmIPv6' + permissions: + id-token: write + contents: read uses: ./.github/workflows/eks-e2e-test.yml with: terraform_dir: terraform/eks/e2e @@ -433,6 +470,9 @@ jobs: needs: [ GetLatestOperatorCommitSHA, GenerateTestMatrix, OutputEnvVariables ] if: always() && !cancelled() && !contains(needs.*.result, 'failure') name: 'EKSE2EJMXContainerInsightsTestAddonIPv6' + permissions: + id-token: write + contents: read uses: ./.github/workflows/eks-e2e-test.yml with: terraform_dir: terraform/eks/e2e diff --git a/.github/workflows/eks-performance-cluster-addon-install.yml b/.github/workflows/eks-performance-cluster-addon-install.yml index 60292bf99d..539b4fca86 100644 --- a/.github/workflows/eks-performance-cluster-addon-install.yml +++ b/.github/workflows/eks-performance-cluster-addon-install.yml @@ -122,14 +122,18 @@ jobs: permissions: id-token: write contents: read + actions: write with: - test-image-before-upload: false + run-tests: false # Build and upload operator image to 
ECR repo BuildOperator: needs: [ check-trigger, GetLatestOperatorCommitSHA ] if: ${{ needs.check-trigger.outputs.should_continue == 'true' }} uses: aws/amazon-cloudwatch-agent-operator/.github/workflows/build-and-upload.yml@main + permissions: + id-token: write + contents: read concurrency: group: ${{ github.workflow }}-operator-${{ inputs.operator-branch || 'main' }} cancel-in-progress: true @@ -173,10 +177,9 @@ jobs: run: | aws eks update-kubeconfig --name $CLUSTER_NAME --region $AWS_REGION - # TODO: Revert to using main helm branch when changes from leader-election are merged in - name: Clone Helm Charts Repository env: - HELM_CHARTS_BRANCH: ${{ inputs.helm-charts-branch || 'sky333999/leader-election' }} + HELM_CHARTS_BRANCH: ${{ inputs.helm-charts-branch || 'main' }} run: | rm -rf ./helm-charts git clone -b "$HELM_CHARTS_BRANCH" https://github.com/aws-observability/helm-charts.git ./helm-charts diff --git a/.github/workflows/eks-performance-cluster-tests.yml b/.github/workflows/eks-performance-cluster-tests.yml index 89ce188b2f..90429fab79 100644 --- a/.github/workflows/eks-performance-cluster-tests.yml +++ b/.github/workflows/eks-performance-cluster-tests.yml @@ -122,7 +122,7 @@ jobs: - name: Set up Go 1.x uses: actions/setup-go@v4 with: - go-version: ~1.25 + go-version: ~1.25.7 - uses: actions/checkout@v4 with: diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index 5534cd3888..ef433f8e01 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -57,13 +57,20 @@ jobs: exit 1 fi - run: | - conclusion=$(gh run view ${{ inputs.build_run_id }} --repo $GITHUB_REPOSITORY --json conclusion -q '.conclusion') - if [[ $conclusion == "success" ]]; then - echo "Run succeeded" - else - echo "Run failed" - exit 1 - fi + for i in {1..6}; do + conclusion=$(gh run view ${{ inputs.build_run_id }} --repo $GITHUB_REPOSITORY --json conclusion -q '.conclusion') + if [[ "$conclusion" == "success" 
]]; then + echo "Run succeeded" + exit 0 + elif [[ "$conclusion" == "failure" || "$conclusion" == "cancelled" ]]; then + echo "Run failed with: $conclusion" + exit 1 + fi + echo "Waiting for workflow to complete (attempt $i)..." + sleep 5 + done + echo "Timed out waiting for workflow" + exit 1 env: GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/otel-fork-replace.yml b/.github/workflows/otel-fork-replace.yml index 29707139b4..0f33b732b0 100644 --- a/.github/workflows/otel-fork-replace.yml +++ b/.github/workflows/otel-fork-replace.yml @@ -33,7 +33,7 @@ jobs: - name: Set up Go 1.x uses: actions/setup-go@v4 with: - go-version: ~1.25 + go-version: ~1.25.7 cache: false - name: Update OTel fork components version id: set-matrix diff --git a/.github/workflows/test-artifacts.yml b/.github/workflows/test-artifacts.yml index ff4c70a63d..292adee2f3 100644 --- a/.github/workflows/test-artifacts.yml +++ b/.github/workflows/test-artifacts.yml @@ -145,7 +145,7 @@ jobs: - name: Set up Go 1.x uses: actions/setup-go@v4 with: - go-version: ~1.25 + go-version: ~1.25.7 GenerateTestMatrix: name: 'GenerateTestMatrix' @@ -183,7 +183,7 @@ jobs: - name: Set up Go 1.x uses: actions/setup-go@v4 with: - go-version: ~1.25 + go-version: ~1.25.7 - name: Generate matrix id: set-matrix @@ -303,7 +303,7 @@ jobs: - name: Set up Go 1.x uses: actions/setup-go@v2 with: - go-version: ~1.25 + go-version: ~1.25.7 - name: Configure AWS Credentials uses: aws-actions/configure-aws-credentials@v4 diff --git a/.github/workflows/test-build-docker.yml b/.github/workflows/test-build-docker.yml index 03c5d143c0..7d474f94dc 100644 --- a/.github/workflows/test-build-docker.yml +++ b/.github/workflows/test-build-docker.yml @@ -143,7 +143,7 @@ jobs: - name: Set up Go 1.x uses: actions/setup-go@v4 with: - go-version: ~1.25 + go-version: ~1.25.7 - name: Configure AWS Credentials uses: aws-actions/configure-aws-credentials@v4 diff --git a/.github/workflows/test-build-packages.yml 
b/.github/workflows/test-build-packages.yml index 0814a80047..a14bc4e960 100644 --- a/.github/workflows/test-build-packages.yml +++ b/.github/workflows/test-build-packages.yml @@ -80,7 +80,7 @@ jobs: - name: Set up Go 1.x uses: actions/setup-go@v4 with: - go-version: ~1.25 + go-version: ~1.25.7 - name: Free up disk space working-directory: cwa diff --git a/.github/workflows/test-build.yml b/.github/workflows/test-build.yml index d72f59339c..94e6196f01 100644 --- a/.github/workflows/test-build.yml +++ b/.github/workflows/test-build.yml @@ -75,7 +75,7 @@ jobs: - name: Set up Go 1.x uses: actions/setup-go@v4 with: - go-version: ~1.25 + go-version: ~1.25.7 cache: false - name: Free up disk space diff --git a/.github/workflows/upload-dependencies.yml b/.github/workflows/upload-dependencies.yml index eb4dafb552..14523c62cf 100644 --- a/.github/workflows/upload-dependencies.yml +++ b/.github/workflows/upload-dependencies.yml @@ -45,7 +45,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v3 with: - go-version: ~1.25 + go-version: ~1.25.7 - name: Upload Dependencies and Test Repo env: diff --git a/.github/workflows/wd-integration-test.yml b/.github/workflows/wd-integration-test.yml index b3b35bf202..c5edc3f1f5 100644 --- a/.github/workflows/wd-integration-test.yml +++ b/.github/workflows/wd-integration-test.yml @@ -42,13 +42,20 @@ jobs: exit 1 fi - run: |- - conclusion=$(gh run view ${{ inputs.build_run_id }} --repo $GITHUB_REPOSITORY --json conclusion -q '.conclusion') - if [[ $conclusion == "success" ]]; then - echo "Run succeeded" - else - echo "Run failed" - exit 1 - fi + for i in {1..6}; do + conclusion=$(gh run view ${{ inputs.build_run_id }} --repo $GITHUB_REPOSITORY --json conclusion -q '.conclusion') + if [[ "$conclusion" == "success" ]]; then + echo "Run succeeded" + exit 0 + elif [[ "$conclusion" == "failure" || "$conclusion" == "cancelled" ]]; then + echo "Run failed with: $conclusion" + exit 1 + fi + echo "Waiting for workflow to complete (attempt $i)..." 
+ sleep 5 + done + echo "Timed out waiting for workflow" + exit 1 env: GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go index 2ec06f52c0..854c5b515f 100644 --- a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go +++ b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go @@ -37,8 +37,6 @@ const ( LogEntryField = "value" defaultFlushTimeout = 5 * time.Second - - maxRetryTimeout = 14*24*time.Hour + 10*time.Minute ) var ( @@ -89,16 +87,12 @@ func (c *CloudWatchLogs) Connect() error { } func (c *CloudWatchLogs) Close() error { - // Stop components in specific order to prevent race conditions: - // 1. RetryHeap - stop accepting new batches first - // 2. Pushers - stop all active pushers (queues/senders) - // 3. Wait for pushers to complete - // 4. RetryHeapProcessor - stop retry processing and wait for WorkerPool usage to complete - // 5. WorkerPool - finally stop the worker threads - - if c.retryHeap != nil { - c.retryHeap.Stop() - } + // Shutdown order: + // 1. Stop all pushers (queues stop accepting new events, final send) + // 2. Wait for pushers to complete (in-flight sends finish, failed batches pushed to heap) + // 3. Stop RetryHeap (no more pushes accepted after this point) + // 4. Stop RetryHeapProcessor (flush remaining ready batches, stop goroutine) + // 5. 
Stop WorkerPool (drain worker threads) c.cwDests.Range(func(_, value interface{}) bool { if d, ok := value.(*cwDest); ok { @@ -109,6 +103,10 @@ func (c *CloudWatchLogs) Close() error { c.pusherWaitGroup.Wait() + if c.retryHeap != nil { + c.retryHeap.Stop() + } + if c.retryHeapProcessor != nil { c.retryHeapProcessor.Stop() } @@ -166,16 +164,16 @@ func (c *CloudWatchLogs) getDest(t pusher.Target, logSrc logs.LogSrc) *cwDest { c.once.Do(func() { if c.Concurrency > 1 { c.workerPool = pusher.NewWorkerPool(c.Concurrency) - c.retryHeap = pusher.NewRetryHeap(c.Concurrency, c.Log) + c.retryHeap = pusher.NewRetryHeap(c.Log) retryHeapProcessorRetryer := retryer.NewLogThrottleRetryer(c.Log) retryHeapProcessorClient := c.createClient(retryHeapProcessorRetryer) - c.retryHeapProcessor = pusher.NewRetryHeapProcessor(c.retryHeap, c.workerPool, retryHeapProcessorClient, c.targetManager, c.Log, maxRetryTimeout, retryHeapProcessorRetryer) + c.retryHeapProcessor = pusher.NewRetryHeapProcessor(c.retryHeap, c.workerPool, retryHeapProcessorClient, c.targetManager, c.Log, retryHeapProcessorRetryer) c.retryHeapProcessor.Start() } c.targetManager = pusher.NewTargetManager(c.Log, client) }) - p := pusher.NewPusher(c.Log, t, client, c.targetManager, logSrc, c.workerPool, c.ForceFlushInterval.Duration, maxRetryTimeout, &c.pusherWaitGroup, c.retryHeap) + p := pusher.NewPusher(c.Log, t, client, c.targetManager, logSrc, c.workerPool, c.ForceFlushInterval.Duration, &c.pusherWaitGroup, c.retryHeap) cwd := &cwDest{ pusher: p, retryer: logThrottleRetryer, diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/batch.go b/plugins/outputs/cloudwatchlogs/internal/pusher/batch.go index d99c80838d..d68dfdaddd 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/batch.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/batch.go @@ -18,6 +18,9 @@ import ( // CloudWatch Logs PutLogEvents API limits // Taken from 
https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html const ( + // maxRetryTimeout is the default retry timeout for CloudWatch Logs operations + maxRetryTimeout = 14*24*time.Hour + 10*time.Minute + // The maximum batch size in bytes. This size is calculated as the sum of all event messages in UTF-8, // plus 26 bytes for each log event. reqSizeLimit = 1024 * 1024 @@ -109,6 +112,8 @@ type logEventBatch struct { retryCountLong int // Number of retries using long delay strategy startTime time.Time // Time of first request (for max retry duration calculation) nextRetryTime time.Time // When this batch should be retried next + expireAfter time.Time // When this batch expires and should be dropped + lastError error // Last error encountered } func newLogEventBatch(target Target, entityProvider logs.LogEntityProvider) *logEventBatch { @@ -251,16 +256,20 @@ func (t byTimestamp) Less(i, j int) bool { return *t[i].Timestamp < *t[j].Timestamp } -// initializeStartTime sets the start time if not already set. +// initializeStartTime sets the start time and expiration time if not already set. func (b *logEventBatch) initializeStartTime() { if b.startTime.IsZero() { b.startTime = time.Now() + b.expireAfter = b.startTime.Add(maxRetryTimeout) } } // updateRetryMetadata updates the retry metadata after a failed send attempt. // It increments the appropriate retry counter based on the error type and calculates the next retry time. func (b *logEventBatch) updateRetryMetadata(err error) { + // Store the error + b.lastError = err + // Determine retry strategy and increment counter var wait time.Duration if chooseRetryWaitStrategy(err) == retryLong { @@ -275,12 +284,9 @@ func (b *logEventBatch) updateRetryMetadata(err error) { b.nextRetryTime = time.Now().Add(wait) } -// isExpired checks if the batch has exceeded the maximum retry duration. 
-func (b *logEventBatch) isExpired(maxRetryDuration time.Duration) bool { - if b.startTime.IsZero() { - return false - } - return time.Since(b.startTime) > maxRetryDuration +// isExpired checks if the batch has exceeded its expiration time. +func (b *logEventBatch) isExpired() bool { + return !b.expireAfter.IsZero() && time.Now().After(b.expireAfter) } // isReadyForRetry checks if enough time has passed since the last failure to retry this batch. diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go index 677bb457cd..cc031362ad 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go @@ -411,7 +411,7 @@ func TestBatchRetryMetadata(t *testing.T) { // Test initial state assert.True(t, batch.startTime.IsZero()) assert.True(t, batch.isReadyForRetry()) - assert.False(t, batch.isExpired(time.Hour)) + assert.False(t, batch.isExpired()) // Test initializeStartTime batch.initializeStartTime() @@ -422,6 +422,7 @@ func TestBatchRetryMetadata(t *testing.T) { batch.updateRetryMetadata(err) assert.Equal(t, 1, batch.retryCountShort) assert.Equal(t, 0, batch.retryCountLong) + assert.Equal(t, err, batch.lastError) assert.False(t, batch.nextRetryTime.IsZero()) // Test isReadyForRetry - should be false immediately after retry metadata update @@ -432,6 +433,40 @@ func TestBatchRetryMetadata(t *testing.T) { assert.True(t, batch.isReadyForRetry()) // Test isExpired - batch.startTime = time.Now().Add(-25 * time.Hour) - assert.True(t, batch.isExpired(24*time.Hour)) + batch.expireAfter = time.Now().Add(-1 * time.Hour) + assert.True(t, batch.isExpired()) +} + +func TestBatchInitializeStartTimeIdempotent(t *testing.T) { + batch := newLogEventBatch(Target{Group: "test-group", Stream: "test-stream"}, nil) + + // Verify initial state + assert.True(t, batch.startTime.IsZero()) + assert.True(t, batch.expireAfter.IsZero()) + + // 
First call should set both values + batch.initializeStartTime() + assert.False(t, batch.startTime.IsZero()) + assert.False(t, batch.expireAfter.IsZero()) + + // Capture the values + firstStartTime := batch.startTime + firstExpireAfter := batch.expireAfter + + // Verify expireAfter is set to startTime + maxRetryTimeout + expectedExpireAfter := firstStartTime.Add(maxRetryTimeout) + assert.Equal(t, expectedExpireAfter, firstExpireAfter) + + // Wait a bit to ensure time has passed + time.Sleep(10 * time.Millisecond) + + // Second call should NOT change the values (idempotent) + batch.initializeStartTime() + assert.Equal(t, firstStartTime, batch.startTime, "startTime should not change on second call") + assert.Equal(t, firstExpireAfter, batch.expireAfter, "expireAfter should not change on second call") + + // Third call should also not change the values + batch.initializeStartTime() + assert.Equal(t, firstStartTime, batch.startTime, "startTime should not change on third call") + assert.Equal(t, firstExpireAfter, batch.expireAfter, "expireAfter should not change on third call") } diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/circuitbreaker_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/circuitbreaker_test.go new file mode 100644 index 0000000000..f12e64df1f --- /dev/null +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/circuitbreaker_test.go @@ -0,0 +1,98 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package pusher + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" + "github.com/aws/amazon-cloudwatch-agent/tool/testutil" +) + +// TestCircuitBreakerBlocksTargetAfterFailure verifies that when a batch fails +// for a target, the circuit breaker prevents additional batches from that target +// from being sent until the failing batch is retried successfully. 
+// +// Without a circuit breaker, a problematic target continues producing new batches +// that flood the SenderQueue/WorkerPool, starving healthy targets. +func TestCircuitBreakerBlocksTargetAfterFailure(t *testing.T) { + logger := testutil.NewNopLogger() + + failingTarget := Target{Group: "failing-group", Stream: "stream"} + healthyTarget := Target{Group: "healthy-group", Stream: "stream"} + + var failingTargetSendCount atomic.Int32 + var healthyTargetSendCount atomic.Int32 + + service := &stubLogsService{ + ple: func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + if *input.LogGroupName == failingTarget.Group { + failingTargetSendCount.Add(1) + return nil, &cloudwatchlogs.ServiceUnavailableException{} + } + healthyTargetSendCount.Add(1) + return &cloudwatchlogs.PutLogEventsOutput{}, nil + }, + cls: func(_ *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + return &cloudwatchlogs.CreateLogStreamOutput{}, nil + }, + clg: func(_ *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) { + return &cloudwatchlogs.CreateLogGroupOutput{}, nil + }, + dlg: func(_ *cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error) { + return &cloudwatchlogs.DescribeLogGroupsOutput{}, nil + }, + } + + concurrency := 5 + workerPool := NewWorkerPool(concurrency) + retryHeap := NewRetryHeap(logger) + defer workerPool.Stop() + defer retryHeap.Stop() + + tm := NewTargetManager(logger, service) + + var wg sync.WaitGroup + flushTimeout := 50 * time.Millisecond + + failingPusher := NewPusher(logger, failingTarget, service, tm, nil, workerPool, flushTimeout, &wg, retryHeap) + healthyPusher := NewPusher(logger, healthyTarget, service, tm, nil, workerPool, flushTimeout, &wg, retryHeap) + defer failingPusher.Stop() + defer healthyPusher.Stop() + + now := time.Now() + + // Send events to both targets. 
The failing target will fail on PutLogEvents, + // and the circuit breaker should block it from sending more batches. + for i := 0; i < 10; i++ { + failingPusher.AddEvent(newStubLogEvent("fail", now)) + healthyPusher.AddEvent(newStubLogEvent("ok", now)) + } + + // Wait for flushes to occur + time.Sleep(500 * time.Millisecond) + + // Send more events - the failing target should be blocked by circuit breaker + for i := 0; i < 10; i++ { + failingPusher.AddEvent(newStubLogEvent("fail-more", now)) + healthyPusher.AddEvent(newStubLogEvent("ok-more", now)) + } + + time.Sleep(500 * time.Millisecond) + + // Circuit breaker assertion: after the first failure, the failing target should + // NOT have sent additional batches. Only 1 send attempt should have been made + // before the circuit breaker blocks it. + assert.Equal(t, int32(1), failingTargetSendCount.Load(), + "Circuit breaker should block failing target after exactly 1 send attempt") + + // Healthy target should continue sending successfully + assert.Greater(t, healthyTargetSendCount.Load(), int32(0), + "Healthy target should continue sending while failing target is blocked") +} diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/poison_pill_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/poison_pill_test.go new file mode 100644 index 0000000000..5a322c7caa --- /dev/null +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/poison_pill_test.go @@ -0,0 +1,229 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: MIT + +package pusher + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/aws/amazon-cloudwatch-agent/internal/retryer" + "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" +) + +// TestRetryHeapProcessorDoesNotStarveAllowedTarget validates that when 10 denied + 1 allowed log groups +// share a worker pool with concurrency=2, the allowed log group continues +// publishing without being starved by failed retries. +// Note: This test pushes batches directly to the heap and bypasses the full +// queue → sender → retryHeap → processor pipeline. It validates RetryHeapProcessor +// behavior, not the end-to-end circuit breaker flow. +func TestRetryHeapProcessorDoesNotStarveAllowedTarget(t *testing.T) { + heap := NewRetryHeap(&testutil.Logger{}) + defer heap.Stop() + + workerPool := NewWorkerPool(2) // Low concurrency as in the bug scenario + defer workerPool.Stop() + + mockService := &mockLogsService{} + mockTargetManager := &mockTargetManager{} + mockTargetManager.On("EnsureTargetExists", mock.Anything).Return(nil) + + accessDeniedErr := &cloudwatchlogs.AccessDeniedException{ + Message_: aws.String("User is not authorized to perform: logs:PutLogEvents with an explicit deny"), + } + + // Track successful PutLogEvents calls for the allowed log group + var allowedGroupSuccessCount atomic.Int32 + var deniedGroupAttemptCount atomic.Int32 + + // Configure mock service responses with realistic latency + mockService.On("PutLogEvents", mock.MatchedBy(func(input *cloudwatchlogs.PutLogEventsInput) bool { + return *input.LogGroupName == "log-stream-ple-access-granted" + })).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Run(func(_ mock.Arguments) { + time.Sleep(10 * time.Millisecond) // Simulate API latency + allowedGroupSuccessCount.Add(1) + }) + + 
mockService.On("PutLogEvents", mock.MatchedBy(func(input *cloudwatchlogs.PutLogEventsInput) bool { + return *input.LogGroupName != "log-stream-ple-access-granted" + })).Return((*cloudwatchlogs.PutLogEventsOutput)(nil), accessDeniedErr).Run(func(_ mock.Arguments) { + time.Sleep(10 * time.Millisecond) // Simulate API latency + deniedGroupAttemptCount.Add(1) + }) + + processor := NewRetryHeapProcessor(heap, workerPool, mockService, mockTargetManager, &testutil.Logger{}, retryer.NewLogThrottleRetryer(&testutil.Logger{})) + + // Targets + allowedTarget := Target{Group: "log-stream-ple-access-granted", Stream: "i-test"} + deniedTargets := make([]Target, 10) + for i := 0; i < 10; i++ { + deniedTargets[i] = Target{ + Group: "aws-restricted-log-group-name-log-stream-ple-access-denied" + string(rune('0'+i)), + Stream: "i-test", + } + } + + // Simulate continuous batch generation over time (like force_flush_interval=5s) + done := make(chan struct{}) + var wg sync.WaitGroup + + // Continuously generate batches for denied log groups (simulating continuous log writes) + for i := 0; i < 10; i++ { + wg.Add(1) + go func(target Target) { + defer wg.Done() + ticker := time.NewTicker(50 * time.Millisecond) // Simulate flush interval + defer ticker.Stop() + batchCount := 0 + for { + select { + case <-done: + return + case <-ticker.C: + if batchCount >= 5 { // Generate 5 batches per denied log group + return + } + batch := createBatch(target, 50) + batch.nextRetryTime = time.Now().Add(-1 * time.Second) + heap.Push(batch) + batchCount++ + } + } + }(deniedTargets[i]) + } + + // Continuously generate batches for allowed log group + wg.Add(1) + go func() { + defer wg.Done() + ticker := time.NewTicker(50 * time.Millisecond) + defer ticker.Stop() + batchCount := 0 + for { + select { + case <-done: + return + case <-ticker.C: + if batchCount >= 10 { // Generate 10 batches for allowed log group + return + } + batch := createBatch(allowedTarget, 20) + batch.nextRetryTime = time.Now().Add(-1 * 
time.Second) + heap.Push(batch) + batchCount++ + } + } + }() + + // Process batches continuously + processorDone := make(chan struct{}) + go func() { + ticker := time.NewTicker(20 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-processorDone: + return + case <-ticker.C: + processor.processReadyMessages() + } + } + }() + + // Run for 2 seconds to simulate sustained load + time.Sleep(2 * time.Second) + close(done) + wg.Wait() + + // Process remaining messages + time.Sleep(500 * time.Millisecond) + processor.processReadyMessages() + time.Sleep(200 * time.Millisecond) + close(processorDone) + + // CRITICAL ASSERTION: Allowed log group MUST receive events throughout the test + successCount := allowedGroupSuccessCount.Load() + t.Logf("Allowed group success count: %d, Denied group attempt count: %d", successCount, deniedGroupAttemptCount.Load()) + + assert.Greater(t, successCount, int32(5), + "Allowed log group must continue receiving events despite continuous denied log group failures. Got %d, expected > 5", successCount) + + // Verify denied log groups attempted to send + assert.Greater(t, deniedGroupAttemptCount.Load(), int32(0), + "Denied log groups should have attempted to send") +} + +// TestSingleDeniedLogGroup validates the baseline scenario where a single denied +// log group does not affect the allowed log group. 
+func TestSingleDeniedLogGroup(t *testing.T) { + heap := NewRetryHeap(&testutil.Logger{}) + defer heap.Stop() + + workerPool := NewWorkerPool(4) // Higher concurrency as in initial test + defer workerPool.Stop() + + mockService := &mockLogsService{} + mockTargetManager := &mockTargetManager{} + mockTargetManager.On("EnsureTargetExists", mock.Anything).Return(nil) + + accessDeniedErr := &cloudwatchlogs.AccessDeniedException{ + Message_: aws.String("Access denied"), + } + + var allowedGroupSuccessCount atomic.Int32 + + mockService.On("PutLogEvents", mock.MatchedBy(func(input *cloudwatchlogs.PutLogEventsInput) bool { + return *input.LogGroupName == "log-stream-ple-access-granted" + })).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Run(func(_ mock.Arguments) { + allowedGroupSuccessCount.Add(1) + }) + + mockService.On("PutLogEvents", mock.MatchedBy(func(input *cloudwatchlogs.PutLogEventsInput) bool { + return *input.LogGroupName == "aws-restricted-log-group-name-log-stream-ple-access-denied" + })).Return((*cloudwatchlogs.PutLogEventsOutput)(nil), accessDeniedErr) + + processor := NewRetryHeapProcessor(heap, workerPool, mockService, mockTargetManager, &testutil.Logger{}, retryer.NewLogThrottleRetryer(&testutil.Logger{})) + + // Create batches + allowedTarget := Target{Group: "log-stream-ple-access-granted", Stream: "i-test"} + deniedTarget := Target{Group: "aws-restricted-log-group-name-log-stream-ple-access-denied", Stream: "i-test"} + + allowedBatch := createBatch(allowedTarget, 40) + deniedBatch := createBatch(deniedTarget, 40) + + allowedBatch.nextRetryTime = time.Now().Add(-1 * time.Second) + deniedBatch.nextRetryTime = time.Now().Add(-1 * time.Second) + + err := heap.Push(allowedBatch) + assert.NoError(t, err) + err = heap.Push(deniedBatch) + assert.NoError(t, err) + + processor.processReadyMessages() + time.Sleep(100 * time.Millisecond) + + // Verify allowed log group received events + assert.Greater(t, allowedGroupSuccessCount.Load(), int32(0), + "Allowed log 
group must receive events with single denied log group") +} + +// createBatch creates a log event batch with the specified number of events +func createBatch(target Target, eventCount int) *logEventBatch { + batch := newLogEventBatch(target, nil) + batch.events = make([]*cloudwatchlogs.InputLogEvent, eventCount) + now := time.Now().Unix() * 1000 + for i := 0; i < eventCount; i++ { + batch.events[i] = &cloudwatchlogs.InputLogEvent{ + Message: aws.String("test message"), + Timestamp: aws.Int64(now + int64(i)), + } + } + return batch +} diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go index 1d6edf57e9..fb15ba9fab 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go @@ -6,7 +6,6 @@ package pusher import ( "sync" "sync/atomic" - "time" ) type WorkerPool interface { @@ -113,13 +112,3 @@ func (s *senderPool) Stop() { // workerpool is stopped by the plugin s.sender.Stop() } - -// SetRetryDuration sets the retry duration on the wrapped Sender. -func (s *senderPool) SetRetryDuration(duration time.Duration) { - s.sender.SetRetryDuration(duration) -} - -// RetryDuration returns the retry duration of the wrapped Sender. 
-func (s *senderPool) RetryDuration() time.Duration { - return s.sender.RetryDuration() -} diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go index 7cbbc54256..ed74249250 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go @@ -107,14 +107,10 @@ func TestSenderPool(t *testing.T) { logger := testutil.NewNopLogger() mockService := new(mockLogsService) mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil) - s := newSender(logger, mockService, nil, time.Second, nil) + s := newSender(logger, mockService, nil, nil) p := NewWorkerPool(12) sp := newSenderPool(p, s) - assert.Equal(t, time.Second, sp.RetryDuration()) - sp.SetRetryDuration(time.Minute) - assert.Equal(t, time.Minute, sp.RetryDuration()) - var completed atomic.Int32 var evts []*logEvent for i := 0; i < 200; i++ { @@ -134,23 +130,3 @@ func TestSenderPool(t *testing.T) { s.Stop() assert.Equal(t, int32(200), completed.Load()) } - -func TestSenderPoolRetryHeap(t *testing.T) { - assert.NotPanics(t, func() { - logger := testutil.NewNopLogger() - mockService := new(mockLogsService) - mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil) - - // Create RetryHeap - retryHeap := NewRetryHeap(10, logger) - defer retryHeap.Stop() - - s := newSender(logger, mockService, nil, time.Second, retryHeap) - p := NewWorkerPool(12) - defer p.Stop() - - sp := newSenderPool(p, s) - - sp.Stop() - }) -} diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go index 43310d6861..aa2f4be722 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go @@ -32,11 +32,10 @@ func NewPusher( entityProvider logs.LogEntityProvider, workerPool WorkerPool, 
flushTimeout time.Duration, - retryDuration time.Duration, wg *sync.WaitGroup, retryHeap RetryHeap, ) *Pusher { - s := createSender(logger, service, targetManager, workerPool, retryDuration, retryHeap) + s := createSender(logger, service, targetManager, workerPool, retryHeap) q := newQueue(logger, target, flushTimeout, entityProvider, s, wg) targetManager.PutRetentionPolicy(target) @@ -61,10 +60,9 @@ func createSender( service cloudWatchLogsService, targetManager TargetManager, workerPool WorkerPool, - retryDuration time.Duration, retryHeap RetryHeap, ) Sender { - s := newSender(logger, service, targetManager, retryDuration, retryHeap) + s := newSender(logger, service, targetManager, retryHeap) if workerPool == nil { return s } diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go index ef5f514501..e862c99b64 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go @@ -111,7 +111,6 @@ func setupPusher(t *testing.T, workerPool WorkerPool, wg *sync.WaitGroup) *Pushe nil, workerPool, time.Second, - time.Minute, wg, nil, // retryHeap ) @@ -136,7 +135,7 @@ func TestPusherRetryHeap(t *testing.T) { workerPool := NewWorkerPool(2) defer workerPool.Stop() - retryHeap := NewRetryHeap(10, logger) + retryHeap := NewRetryHeap(logger) defer retryHeap.Stop() var wg sync.WaitGroup @@ -148,7 +147,6 @@ func TestPusherRetryHeap(t *testing.T) { nil, workerPool, time.Second, - time.Minute, &wg, retryHeap, ) diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/queue.go b/plugins/outputs/cloudwatchlogs/internal/pusher/queue.go index e8ad65ffdc..8899554df9 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/queue.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/queue.go @@ -42,6 +42,11 @@ type queue struct { initNonBlockingChOnce sync.Once startNonBlockCh chan struct{} wg *sync.WaitGroup + + // 
Circuit breaker halt/resume functionality + haltMu sync.Mutex + haltCh chan struct{} + halted bool } var _ (Queue) = (*queue)(nil) @@ -67,6 +72,8 @@ func newQueue( stopCh: make(chan struct{}), startNonBlockCh: make(chan struct{}), wg: wg, + haltCh: make(chan struct{}), + halted: false, } q.flushTimeout.Store(flushTimeout) q.wg.Add(1) @@ -175,6 +182,14 @@ func (q *queue) merge(mergeChan chan logs.LogEvent) { func (q *queue) send() { if len(q.batch.events) > 0 { q.batch.addDoneCallback(q.onSuccessCallback(q.batch.bufferedSize)) + q.batch.addFailCallback(q.halt) + + // In synchronous mode (no retryHeap), halt() is never called because + // sender only calls batch.fail() when retryHeap != nil. So waitIfHalted + // is a no-op. The lock acquisition is negligible overhead (~20ns) on + // the uncontended path. + q.waitIfHalted() + q.sender.Send(q.batch) q.batch = newLogEventBatch(q.target, q.entityProvider) } @@ -183,6 +198,7 @@ func (q *queue) send() { // onSuccessCallback returns a callback function to be executed after a successful send. func (q *queue) onSuccessCallback(bufferedSize int) func() { return func() { + q.resume() // Resume queue on success q.lastSentTime.Store(time.Now()) go q.addStats("rawSize", float64(bufferedSize)) q.resetFlushTimer() @@ -245,3 +261,36 @@ func hasValidTime(e logs.LogEvent) bool { } return true } + +// waitIfHalted blocks until the queue is unhalted or stopped. +func (q *queue) waitIfHalted() { + q.haltMu.Lock() + if !q.halted { + q.haltMu.Unlock() + return + } + ch := q.haltCh + q.haltMu.Unlock() + select { + case <-ch: + case <-q.stopCh: + } +} + +// halt stops the queue from sending batches (called on failure). +func (q *queue) halt() { + q.haltMu.Lock() + defer q.haltMu.Unlock() + q.halted = true +} + +// resume allows the queue to send batches again (called on success). 
+func (q *queue) resume() { + q.haltMu.Lock() + defer q.haltMu.Unlock() + if q.halted { + q.halted = false + close(q.haltCh) + q.haltCh = make(chan struct{}) + } +} diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go index 29ea4d9ca1..f2bd145fc0 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go @@ -123,7 +123,7 @@ func TestAddSingleEvent_WithAccountId(t *testing.T) { } ep := newMockEntityProvider(expectedEntity) - q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, ep, &wg) + q, sender := testPreparation(t, -1, &s, 1*time.Hour, ep, &wg) q.AddEvent(newStubLogEvent("MSG", time.Now())) require.False(t, called.Load(), "PutLogEvents has been called too fast, it should wait until FlushTimeout.") @@ -160,7 +160,7 @@ func TestAddSingleEvent_WithoutAccountId(t *testing.T) { } ep := newMockEntityProvider(nil) - q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, ep, &wg) + q, sender := testPreparation(t, -1, &s, 1*time.Hour, ep, &wg) q.AddEvent(newStubLogEvent("MSG", time.Now())) require.False(t, called.Load(), "PutLogEvents has been called too fast, it should wait until FlushTimeout.") @@ -190,7 +190,7 @@ func TestStopQueueWouldDoFinalSend(t *testing.T) { return &cloudwatchlogs.PutLogEventsOutput{}, nil } - q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparation(t, -1, &s, 1*time.Hour, nil, &wg) q.AddEvent(newStubLogEvent("MSG", time.Now())) time.Sleep(10 * time.Millisecond) @@ -214,7 +214,7 @@ func TestStopPusherWouldStopRetries(t *testing.T) { } logSink := testutil.NewLogSink() - q, sender := testPreparationWithLogger(t, logSink, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparationWithLogger(t, logSink, -1, &s, 1*time.Hour, nil, &wg) q.AddEvent(newStubLogEvent("MSG", time.Now())) time.Sleep(10 * 
time.Millisecond) @@ -256,7 +256,7 @@ func TestLongMessageHandling(t *testing.T) { return &cloudwatchlogs.PutLogEventsOutput{}, nil } - q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparation(t, -1, &s, 1*time.Hour, nil, &wg) q.AddEvent(newStubLogEvent(longMsg, time.Now())) triggerSend(t, q) @@ -285,7 +285,7 @@ func TestRequestIsLessThan1MB(t *testing.T) { return &cloudwatchlogs.PutLogEventsOutput{}, nil } - q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparation(t, -1, &s, 1*time.Hour, nil, &wg) for i := 0; i < 8; i++ { q.AddEvent(newStubLogEvent(longMsg, time.Now())) } @@ -311,7 +311,7 @@ func TestRequestIsLessThan10kEvents(t *testing.T) { return &cloudwatchlogs.PutLogEventsOutput{}, nil } - q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparation(t, -1, &s, 1*time.Hour, nil, &wg) for i := 0; i < 30000; i++ { q.AddEvent(newStubLogEvent(msg, time.Now())) } @@ -337,7 +337,7 @@ func TestTimestampPopulation(t *testing.T) { return &cloudwatchlogs.PutLogEventsOutput{}, nil } - q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparation(t, -1, &s, 1*time.Hour, nil, &wg) for i := 0; i < 3; i++ { q.AddEvent(newStubLogEvent("msg", time.Time{})) } @@ -361,7 +361,7 @@ func TestIgnoreOutOfTimeRangeEvent(t *testing.T) { } logSink := testutil.NewLogSink() - q, sender := testPreparationWithLogger(t, logSink, -1, &s, 10*time.Millisecond, 2*time.Hour, nil, &wg) + q, sender := testPreparationWithLogger(t, logSink, -1, &s, 10*time.Millisecond, nil, &wg) q.AddEvent(newStubLogEvent("MSG", time.Now().Add(-15*24*time.Hour))) q.AddEventNonBlocking(newStubLogEvent("MSG", time.Now().Add(2*time.Hour+1*time.Minute))) @@ -414,7 +414,7 @@ func TestAddMultipleEvents(t *testing.T) { )) } evts[10], evts[90] = evts[90], evts[10] // make events out of order - q, sender := testPreparation(t, -1, &s, 
1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparation(t, -1, &s, 1*time.Hour, nil, &wg) for _, e := range evts { q.AddEvent(e) } @@ -466,7 +466,7 @@ func TestSendReqWhenEventsSpanMoreThan24Hrs(t *testing.T) { return nil, nil } - q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparation(t, -1, &s, 1*time.Hour, nil, &wg) q.AddEvent(newStubLogEvent("MSG 25hrs ago", time.Now().Add(-25*time.Hour))) q.AddEvent(newStubLogEvent("MSG 24hrs ago", time.Now().Add(-24*time.Hour))) q.AddEvent(newStubLogEvent("MSG 23hrs ago", time.Now().Add(-23*time.Hour))) @@ -496,7 +496,7 @@ func TestUnhandledErrorWouldNotResend(t *testing.T) { } logSink := testutil.NewLogSink() - q, sender := testPreparationWithLogger(t, logSink, -1, &s, 10*time.Millisecond, 2*time.Hour, nil, &wg) + q, sender := testPreparationWithLogger(t, logSink, -1, &s, 10*time.Millisecond, nil, &wg) q.AddEvent(newStubLogEvent("msg", time.Now())) time.Sleep(2 * time.Second) @@ -542,7 +542,7 @@ func TestCreateLogGroupAndLogStreamWhenNotFound(t *testing.T) { } logSink := testutil.NewLogSink() - q, sender := testPreparationWithLogger(t, logSink, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparationWithLogger(t, logSink, -1, &s, 1*time.Hour, nil, &wg) var eventWG sync.WaitGroup eventWG.Add(1) q.AddEvent(&stubLogEvent{message: "msg", timestamp: time.Now(), done: eventWG.Done}) @@ -580,7 +580,7 @@ func TestLogRejectedLogEntryInfo(t *testing.T) { } logSink := testutil.NewLogSink() - q, sender := testPreparationWithLogger(t, logSink, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparationWithLogger(t, logSink, -1, &s, 1*time.Hour, nil, &wg) var eventWG sync.WaitGroup eventWG.Add(1) q.AddEvent(&stubLogEvent{message: "msg", timestamp: time.Now(), done: eventWG.Done}) @@ -630,7 +630,7 @@ func TestAddEventNonBlocking(t *testing.T) { start.Add(time.Duration(i)*time.Millisecond), )) } - q, sender := testPreparation(t, -1, &s, 
1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparation(t, -1, &s, 1*time.Hour, nil, &wg) time.Sleep(200 * time.Millisecond) // Wait until pusher started, merge channel is blocked for _, e := range evts { @@ -646,32 +646,6 @@ func TestAddEventNonBlocking(t *testing.T) { wg.Wait() } -func TestResendWouldStopAfterExhaustedRetries(t *testing.T) { - t.Parallel() - var wg sync.WaitGroup - var s stubLogsService - var cnt atomic.Int32 - - s.ple = func(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { - cnt.Add(1) - return nil, &cloudwatchlogs.ServiceUnavailableException{} - } - - logSink := testutil.NewLogSink() - q, sender := testPreparationWithLogger(t, logSink, -1, &s, 10*time.Millisecond, time.Second, nil, &wg) - q.AddEvent(newStubLogEvent("msg", time.Now())) - time.Sleep(2 * time.Second) - - logLines := logSink.Lines() - lastLine := logLines[len(logLines)-1] - expected := fmt.Sprintf("All %v retries to G/S failed for PutLogEvents, request dropped.", cnt.Load()-1) - require.True(t, strings.HasSuffix(lastLine, expected), fmt.Sprintf("Expecting error log to end with request dropped, but received '%s' in the log", logSink.String())) - - q.Stop() - sender.Stop() - wg.Wait() -} - // Cannot call q.send() directly as it would cause a race condition. Reset last sent time and trigger flush. 
func triggerSend(t *testing.T, q *queue) { t.Helper() @@ -684,7 +658,6 @@ func testPreparation( retention int, service cloudWatchLogsService, flushTimeout time.Duration, - retryDuration time.Duration, entityProvider logs.LogEntityProvider, wg *sync.WaitGroup, ) (*queue, Sender) { @@ -694,7 +667,6 @@ func testPreparation( retention, service, flushTimeout, - retryDuration, entityProvider, wg, ) @@ -706,13 +678,12 @@ func testPreparationWithLogger( retention int, service cloudWatchLogsService, flushTimeout time.Duration, - retryDuration time.Duration, entityProvider logs.LogEntityProvider, wg *sync.WaitGroup, ) (*queue, Sender) { t.Helper() tm := NewTargetManager(logger, service) - s := newSender(logger, service, tm, retryDuration, nil) + s := newSender(logger, service, tm, nil) q := newQueue( logger, Target{"G", "S", util.StandardLogGroupClass, retention}, @@ -759,6 +730,8 @@ func TestQueueCallbackRegistration(t *testing.T) { flushTimer: time.NewTimer(10 * time.Millisecond), startNonBlockCh: make(chan struct{}), wg: &wg, + haltCh: make(chan struct{}), + halted: false, } q.flushTimeout.Store(10 * time.Millisecond) @@ -801,6 +774,8 @@ func TestQueueCallbackRegistration(t *testing.T) { flushTimer: time.NewTimer(10 * time.Millisecond), startNonBlockCh: make(chan struct{}), wg: &wg, + haltCh: make(chan struct{}), + halted: false, } q.flushTimeout.Store(10 * time.Millisecond) @@ -814,3 +789,91 @@ func TestQueueCallbackRegistration(t *testing.T) { mockSender.AssertExpectations(t) }) } +func TestQueueHaltResume(t *testing.T) { + logger := testutil.NewNopLogger() + + var sendCount atomic.Int32 + mockSender := &mockSender{} + mockSender.On("Send", mock.Anything).Run(func(args mock.Arguments) { + sendCount.Add(1) + batch := args.Get(0).(*logEventBatch) + // Simulate failure on first call, success on subsequent calls + if sendCount.Load() == 1 { + batch.fail() // This should halt the queue + } else { + batch.done() // This should resume the queue + } + }).Return() + + var wg 
sync.WaitGroup + q := newQueue(logger, Target{"G", "S", util.StandardLogGroupClass, -1}, 10*time.Millisecond, nil, mockSender, &wg) + defer q.Stop() + + // Add first event - should trigger send and halt + q.AddEvent(newStubLogEvent("first message", time.Now())) + + // Wait a bit for the first send to complete and halt + time.Sleep(50 * time.Millisecond) + + // Verify queue is halted + queueImpl := q.(*queue) + queueImpl.haltMu.Lock() + assert.True(t, queueImpl.halted, "Queue should be halted after failure") + queueImpl.haltMu.Unlock() + + // Verify only one send happened (queue is halted) + assert.Equal(t, int32(1), sendCount.Load(), "Should have only one send due to halt") + + // Trigger resume by calling the success callback directly + queueImpl.resume() + + // Verify queue is no longer halted + queueImpl.haltMu.Lock() + assert.False(t, queueImpl.halted, "Queue should be resumed after success") + queueImpl.haltMu.Unlock() + + // Add second event - should trigger send since queue is resumed + q.AddEvent(newStubLogEvent("second message", time.Now())) + + // Wait for the second send to complete + time.Sleep(50 * time.Millisecond) + + // Verify second send happened (queue resumed) + assert.Equal(t, int32(2), sendCount.Load(), "Should have two sends after resume") + + mockSender.AssertExpectations(t) +} + +// TestQueueStopWhileHalted verifies that Stop() unblocks a halted queue. +// Without the stopCh select in waitIfHalted, this would deadlock. 
+func TestQueueStopWhileHalted(t *testing.T) { + logger := testutil.NewNopLogger() + + mockSender := &mockSender{} + mockSender.On("Send", mock.Anything).Run(func(args mock.Arguments) { + batch := args.Get(0).(*logEventBatch) + batch.fail() // Halt the queue + }).Return() + mockSender.On("Stop").Return() + + var wg sync.WaitGroup + q := newQueue(logger, Target{"G", "S", util.StandardLogGroupClass, -1}, 10*time.Millisecond, nil, mockSender, &wg) + + // Add event to trigger send → fail → halt + q.AddEvent(newStubLogEvent("msg", time.Now())) + time.Sleep(50 * time.Millisecond) + + // Queue is now halted. Stop must return without deadlocking. + done := make(chan struct{}) + go func() { + q.Stop() + close(done) + }() + + select { + case <-done: + // Success — Stop() returned + case <-time.After(2 * time.Second): + t.Fatal("Stop() deadlocked on halted queue") + } +} diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap.go b/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap.go index 021d513b65..a4c708ad6f 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap.go @@ -5,6 +5,7 @@ package pusher import ( "container/heap" + "errors" "sync" "time" @@ -41,51 +42,44 @@ func (h *retryHeapImpl) Pop() interface{} { // RetryHeap manages failed batches during their retry wait periods type RetryHeap interface { - Push(batch *logEventBatch) + Push(batch *logEventBatch) error PopReady() []*logEventBatch Size() int Stop() } type retryHeap struct { - heap retryHeapImpl - mutex sync.RWMutex - semaphore chan struct{} // Size enforcer - stopCh chan struct{} - maxSize int - stopped bool - logger telegraf.Logger + heap retryHeapImpl + mutex sync.RWMutex + stopCh chan struct{} + stopped bool + logger telegraf.Logger } var _ RetryHeap = (*retryHeap)(nil) -// NewRetryHeap creates a new retry heap with the specified maximum size -func NewRetryHeap(maxSize int, logger telegraf.Logger) RetryHeap { 
+// NewRetryHeap creates a new retry heap (unbounded) +func NewRetryHeap(logger telegraf.Logger) RetryHeap { rh := &retryHeap{ - heap: make(retryHeapImpl, 0, maxSize), - maxSize: maxSize, - semaphore: make(chan struct{}, maxSize), // Semaphore for size enforcement - stopCh: make(chan struct{}), - logger: logger, + heap: make(retryHeapImpl, 0), + stopCh: make(chan struct{}), + logger: logger, } heap.Init(&rh.heap) return rh } -// Push adds a batch to the heap, blocking if full -func (rh *retryHeap) Push(batch *logEventBatch) { - // Acquire semaphore slot (blocks if at maxSize capacity) - select { - case rh.semaphore <- struct{}{}: - // add batch to heap with mutex protection - rh.mutex.Lock() - heap.Push(&rh.heap, batch) - rh.mutex.Unlock() - case <-rh.stopCh: - // RetryHeap is stopped, drop the batch - rh.logger.Errorf("Stop requested for %v/%v failed for PutLogEvents, request dropped.", batch.Group, batch.Stream) - batch.updateState() +// Push adds a batch to the heap (non-blocking) +func (rh *retryHeap) Push(batch *logEventBatch) error { + rh.mutex.Lock() + defer rh.mutex.Unlock() + + if rh.stopped { + return errors.New("retry heap stopped") } + + heap.Push(&rh.heap, batch) + return nil } // PopReady returns all batches that are ready for retry (nextRetryTime <= now) @@ -100,8 +94,6 @@ func (rh *retryHeap) PopReady() []*logEventBatch { for len(rh.heap) > 0 && !rh.heap[0].nextRetryTime.After(now) { batch := heap.Pop(&rh.heap).(*logEventBatch) ready = append(ready, batch) - // Release semaphore slot for each popped batch - <-rh.semaphore } return ready @@ -116,6 +108,9 @@ func (rh *retryHeap) Size() int { // Stop stops the retry heap func (rh *retryHeap) Stop() { + rh.mutex.Lock() + defer rh.mutex.Unlock() + if rh.stopped { return } @@ -125,31 +120,30 @@ func (rh *retryHeap) Stop() { // RetryHeapProcessor manages the retry heap and moves ready batches back to sender queue type RetryHeapProcessor struct { - retryHeap RetryHeap - senderPool Sender - retryer 
*retryer.LogThrottleRetryer - stopCh chan struct{} - logger telegraf.Logger - stopped bool - maxRetryDuration time.Duration - wg sync.WaitGroup + retryHeap RetryHeap + senderPool Sender + retryer *retryer.LogThrottleRetryer + stopCh chan struct{} + logger telegraf.Logger + stopped bool + stopMu sync.Mutex + wg sync.WaitGroup } // NewRetryHeapProcessor creates a new retry heap processor -func NewRetryHeapProcessor(retryHeap RetryHeap, workerPool WorkerPool, service cloudWatchLogsService, targetManager TargetManager, logger telegraf.Logger, maxRetryDuration time.Duration, retryer *retryer.LogThrottleRetryer) *RetryHeapProcessor { +func NewRetryHeapProcessor(retryHeap RetryHeap, workerPool WorkerPool, service cloudWatchLogsService, targetManager TargetManager, logger telegraf.Logger, retryer *retryer.LogThrottleRetryer) *RetryHeapProcessor { // Create processor's own sender and senderPool // Pass retryHeap so failed batches go back to RetryHeap instead of blocking on sync retry - sender := newSender(logger, service, targetManager, maxRetryDuration, retryHeap) + sender := newSender(logger, service, targetManager, retryHeap) senderPool := newSenderPool(workerPool, sender) return &RetryHeapProcessor{ - retryHeap: retryHeap, - senderPool: senderPool, - retryer: retryer, - stopCh: make(chan struct{}), - logger: logger, - stopped: false, - maxRetryDuration: maxRetryDuration, + retryHeap: retryHeap, + senderPool: senderPool, + retryer: retryer, + stopCh: make(chan struct{}), + logger: logger, + stopped: false, } } @@ -161,18 +155,24 @@ func (p *RetryHeapProcessor) Start() { // Stop stops the retry heap processor func (p *RetryHeapProcessor) Stop() { + p.stopMu.Lock() + defer p.stopMu.Unlock() + if p.stopped { return } - // Process any remaining batches before stopping - p.processReadyMessages() + // Flush remaining ready batches before marking as stopped + p.flushReadyBatches() + + p.stopped = true - p.retryer.Stop() + if p.retryer != nil { + p.retryer.Stop() + } 
p.senderPool.Stop() close(p.stopCh) p.wg.Wait() - p.stopped = true } // processLoop runs the main processing loop @@ -193,17 +193,26 @@ func (p *RetryHeapProcessor) processLoop() { // processReadyMessages checks the heap for ready batches and moves them back to sender queue func (p *RetryHeapProcessor) processReadyMessages() { + p.stopMu.Lock() if p.stopped { + p.stopMu.Unlock() return } + p.stopMu.Unlock() + + p.flushReadyBatches() +} +// flushReadyBatches pops ready batches from the heap and sends them. +// Called by both processReadyMessages and Stop. +func (p *RetryHeapProcessor) flushReadyBatches() { readyBatches := p.retryHeap.PopReady() for _, batch := range readyBatches { // Check if batch has expired - if batch.isExpired(p.maxRetryDuration) { + if batch.isExpired() { p.logger.Errorf("Dropping expired batch for %v/%v", batch.Group, batch.Stream) - batch.updateState() + batch.done() // Resume circuit breaker to allow target to process new batches continue } diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_expiry_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_expiry_test.go new file mode 100644 index 0000000000..64bfd588e9 --- /dev/null +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_expiry_test.go @@ -0,0 +1,91 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package pusher + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" + "github.com/aws/amazon-cloudwatch-agent/tool/testutil" +) + +// TestRetryHeapProcessorExpiredBatchShouldResume verifies that expired batches +// resume the circuit breaker, preventing the target from being permanently blocked. 
+func TestRetryHeapProcessorExpiredBatchShouldResume(t *testing.T) { + logger := testutil.NewNopLogger() + + var sendAttempts atomic.Int32 + mockService := &stubLogsService{ + ple: func(_ *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + sendAttempts.Add(1) + // Always fail to simulate a problematic target + return nil, &cloudwatchlogs.ServiceUnavailableException{} + }, + cls: func(_ *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + return &cloudwatchlogs.CreateLogStreamOutput{}, nil + }, + clg: func(_ *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) { + return &cloudwatchlogs.CreateLogGroupOutput{}, nil + }, + dlg: func(_ *cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error) { + return &cloudwatchlogs.DescribeLogGroupsOutput{}, nil + }, + } + + target := Target{Group: "failing-group", Stream: "stream"} + + // Create retry heap and processor with very short expiry for testing + retryHeap := NewRetryHeap(logger) + workerPool := NewWorkerPool(5) + tm := NewTargetManager(logger, mockService) + + retryHeapProcessor := NewRetryHeapProcessor(retryHeap, workerPool, mockService, tm, logger, nil) + retryHeapProcessor.Start() + + defer retryHeap.Stop() + defer workerPool.Stop() + defer retryHeapProcessor.Stop() + + // Create a batch that will expire + batch := newLogEventBatch(target, nil) + batch.append(newLogEvent(time.Now(), "test message", nil)) + + // Set up callback to track circuit breaker resume + var circuitBreakerResumed atomic.Bool + + batch.addDoneCallback(func() { + circuitBreakerResumed.Store(true) + }) + + // Initialize the batch's start time to make it already expired + batch.initializeStartTime() + batch.expireAfter = time.Now().Add(-10 * time.Millisecond) // Already expired + + // Update retry metadata to simulate a failed attempt and make it ready for retry + 
batch.updateRetryMetadata(&cloudwatchlogs.ServiceUnavailableException{}) + // Set nextRetryTime to past so it's ready for retry + batch.nextRetryTime = time.Now().Add(-10 * time.Millisecond) + + // Push the expired batch to the retry heap + err := retryHeap.Push(batch) + assert.NoError(t, err) + + // Verify batch is in the heap + assert.Equal(t, 1, retryHeap.Size()) + + // Wait for RetryHeapProcessor to process the expired batch + time.Sleep(200 * time.Millisecond) + + // The batch should have been removed from the heap + assert.Equal(t, 0, retryHeap.Size(), "Expired batch should be removed from heap") + + // The circuit breaker SHOULD be resumed when the batch expires + // This allows the target to continue processing new batches after the bad batch is dropped + assert.True(t, circuitBreakerResumed.Load(), + "Circuit breaker should resume after batch expiry to unblock the target") +} diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_recovery_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_recovery_test.go new file mode 100644 index 0000000000..7dfe1020c0 --- /dev/null +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_recovery_test.go @@ -0,0 +1,110 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package pusher + +import ( + "sync" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/aws/amazon-cloudwatch-agent/internal/retryer" + "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" +) + +// TestRecoveryWhenPermissionGrantedDuringRetry validates that when PLE permissions +// are missing initially but granted while retry is ongoing, the system recovers +// and successfully publishes logs. 
+func TestRecoveryWhenPermissionGrantedDuringRetry(t *testing.T) { + heap := NewRetryHeap(&testutil.Logger{}) + defer heap.Stop() + + workerPool := NewWorkerPool(2) + defer workerPool.Stop() + + // Mock service that initially returns AccessDenied, then succeeds + mockService := &mockLogsService{} + accessDeniedErr := &cloudwatchlogs.AccessDeniedException{ + Message_: aws.String("Access denied"), + } + + // First call fails with AccessDenied + mockService.On("PutLogEvents", mock.Anything).Return((*cloudwatchlogs.PutLogEventsOutput)(nil), accessDeniedErr).Once() + // Second call succeeds (permission granted) + mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Once() + + mockTargetManager := &mockTargetManager{} + mockTargetManager.On("EnsureTargetExists", mock.Anything).Return(nil) + + processor := NewRetryHeapProcessor(heap, workerPool, mockService, mockTargetManager, &testutil.Logger{}, retryer.NewLogThrottleRetryer(&testutil.Logger{})) + + // Create batch and track circuit breaker state + target := Target{Group: "group", Stream: "stream"} + batch := newLogEventBatch(target, nil) + batch.events = []*cloudwatchlogs.InputLogEvent{ + {Message: aws.String("test message"), Timestamp: aws.Int64(time.Now().Unix() * 1000)}, + } + + var haltCalled, resumeCalled bool + var mu sync.Mutex + + // Register circuit breaker callbacks + batch.addFailCallback(func() { + mu.Lock() + haltCalled = true + mu.Unlock() + }) + batch.addDoneCallback(func() { + mu.Lock() + resumeCalled = true + mu.Unlock() + }) + + // Set batch ready for immediate retry + batch.nextRetryTime = time.Now().Add(-1 * time.Second) + + // Push batch to heap + err := heap.Push(batch) + assert.NoError(t, err) + + // Process first attempt - should fail with AccessDenied + processor.processReadyMessages() + + // Wait for async processing to complete + time.Sleep(100 * time.Millisecond) + + // Verify circuit breaker halted + mu.Lock() + assert.True(t, haltCalled, "Circuit 
breaker should halt on failure") + assert.False(t, resumeCalled, "Circuit breaker should not resume yet") + mu.Unlock() + + // Batch should be back in heap for retry + assert.Equal(t, 1, heap.Size(), "Failed batch should be in retry heap") + + // Simulate permission being granted by waiting for retry time + // Set batch ready for immediate retry + batch.nextRetryTime = time.Now().Add(-1 * time.Second) + + // Process second attempt - should succeed + processor.processReadyMessages() + + // Wait for async processing to complete + time.Sleep(100 * time.Millisecond) + + // Verify circuit breaker resumed + mu.Lock() + assert.True(t, resumeCalled, "Circuit breaker should resume on success") + mu.Unlock() + + // Heap should be empty (batch successfully sent) + assert.Equal(t, 0, heap.Size(), "Heap should be empty after successful retry") + + // Verify both PutLogEvents calls were made + mockService.AssertExpectations(t) +} diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_test.go index 3bfff7a114..d79e388e07 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_test.go @@ -4,6 +4,7 @@ package pusher import ( + "sync/atomic" "testing" "time" @@ -16,7 +17,7 @@ import ( ) func TestRetryHeap(t *testing.T) { - heap := NewRetryHeap(10, &testutil.Logger{}) + heap := NewRetryHeap(&testutil.Logger{}) defer heap.Stop() // Test empty heap @@ -27,17 +28,20 @@ func TestRetryHeap(t *testing.T) { // Create test batches target := Target{Group: "group", Stream: "stream"} batch1 := newLogEventBatch(target, nil) - batch1.nextRetryTime = time.Now().Add(1 * time.Hour) + batch1.nextRetryTime = time.Now().Add(1 * time.Second) batch2 := newLogEventBatch(target, nil) - batch2.nextRetryTime = time.Now().Add(-1 * time.Second) + batch2.nextRetryTime = time.Now().Add(-1 * time.Second) // Ready now // Push batches - 
heap.Push(batch1) - heap.Push(batch2) + err := heap.Push(batch1) + assert.NoError(t, err) + err = heap.Push(batch2) + assert.NoError(t, err) assert.Equal(t, 2, heap.Size()) + // Pop ready batches ready = heap.PopReady() assert.Len(t, ready, 1) assert.Equal(t, batch2, ready[0]) @@ -45,7 +49,7 @@ } func TestRetryHeapOrdering(t *testing.T) { - heap := NewRetryHeap(10, &testutil.Logger{}) + heap := NewRetryHeap(&testutil.Logger{}) defer heap.Stop() target := Target{Group: "group", Stream: "stream"} @@ -77,7 +81,7 @@ } func TestRetryHeapProcessor(t *testing.T) { - heap := NewRetryHeap(10, &testutil.Logger{}) + heap := NewRetryHeap(&testutil.Logger{}) defer heap.Stop() // Create mock components with proper signature @@ -86,18 +90,17 @@ mockService := &mockLogsService{} mockTargetManager := &mockTargetManager{} - processor := NewRetryHeapProcessor(heap, workerPool, mockService, mockTargetManager, &testutil.Logger{}, time.Hour, retryer.NewLogThrottleRetryer(&testutil.Logger{})) + processor := NewRetryHeapProcessor(heap, workerPool, mockService, mockTargetManager, &testutil.Logger{}, retryer.NewLogThrottleRetryer(&testutil.Logger{})) defer processor.Stop() // Test start/stop processor.Start() processor.Stop() -
 assert.True(t, processor.stopped) } func TestRetryHeapProcessorExpiredBatch(t *testing.T) { - heap := NewRetryHeap(10, &testutil.Logger{}) + heap := NewRetryHeap(&testutil.Logger{}) defer heap.Stop() workerPool := NewWorkerPool(2) @@ -105,106 +108,101 @@ mockService := &mockLogsService{} mockTargetManager := &mockTargetManager{} - processor := NewRetryHeapProcessor(heap, workerPool, mockService, mockTargetManager, &testutil.Logger{}, 1*time.Millisecond, retryer.NewLogThrottleRetryer(&testutil.Logger{})) + processor := NewRetryHeapProcessor(heap, workerPool, mockService, mockTargetManager, 
&testutil.Logger{}, retryer.NewLogThrottleRetryer(&testutil.Logger{})) - // Create expired batch target := Target{Group: "group", Stream: "stream"} batch := newLogEventBatch(target, nil) - batch.startTime = time.Now().Add(-1 * time.Hour) + batch.initializeStartTime() + batch.expireAfter = time.Now().Add(-1 * time.Hour) // Already expired batch.nextRetryTime = time.Now().Add(-1 * time.Second) + var doneCalled bool + batch.addDoneCallback(func() { doneCalled = true }) + heap.Push(batch) - // Process should drop expired batch processor.processReadyMessages() - assert.Equal(t, 0, heap.Size()) + assert.Equal(t, 0, heap.Size(), "Expired batch should be removed from heap") + assert.True(t, doneCalled, "done() should be called on expired batch to resume circuit breaker") } func TestRetryHeapProcessorSendsBatch(t *testing.T) { - heap := NewRetryHeap(10, &testutil.Logger{}) + heap := NewRetryHeap(&testutil.Logger{}) defer heap.Stop() workerPool := NewWorkerPool(2) defer workerPool.Stop() + mockService := &mockLogsService{} + mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil) mockTargetManager := &mockTargetManager{} + mockTargetManager.On("EnsureTargetExists", mock.Anything).Return(nil) - processor := NewRetryHeapProcessor(heap, workerPool, mockService, mockTargetManager, &testutil.Logger{}, time.Hour, retryer.NewLogThrottleRetryer(&testutil.Logger{})) + processor := NewRetryHeapProcessor(heap, workerPool, mockService, mockTargetManager, &testutil.Logger{}, retryer.NewLogThrottleRetryer(&testutil.Logger{})) - // Create ready batch (retryTime already past) target := Target{Group: "group", Stream: "stream"} batch := newLogEventBatch(target, nil) + batch.append(newLogEvent(time.Now(), "test message", nil)) batch.nextRetryTime = time.Now().Add(-1 * time.Second) + var doneCalled atomic.Bool + batch.addDoneCallback(func() { doneCalled.Store(true) }) + heap.Push(batch) - // Process should send batch processor.processReadyMessages() + 
time.Sleep(200 * time.Millisecond) + assert.Equal(t, 0, heap.Size()) + assert.True(t, doneCalled.Load(), "Batch done callback should be called on successful send") + mockService.AssertCalled(t, "PutLogEvents", mock.Anything) } -func TestRetryHeap_SemaphoreBlockingAndUnblocking(t *testing.T) { - heap := NewRetryHeap(2, &testutil.Logger{}) // maxSize = 2 +func TestRetryHeap_UnboundedPush(t *testing.T) { + heap := NewRetryHeap(&testutil.Logger{}) // retry heap is unbounded; Push never blocks defer heap.Stop() - // Fill heap to capacity with expired batches + // Push multiple batches without blocking target := Target{Group: "group", Stream: "stream"} batch1 := newLogEventBatch(target, nil) - batch1.nextRetryTime = time.Now().Add(-1 * time.Hour) + batch1.nextRetryTime = time.Now().Add(50 * time.Millisecond) batch2 := newLogEventBatch(target, nil) - batch2.nextRetryTime = time.Now().Add(-1 * time.Hour) - - heap.Push(batch1) - heap.Push(batch2) - - // Verify heap is at capacity - if heap.Size() != 2 { - t.Fatalf("Expected size 2, got %d", heap.Size()) + batch2.nextRetryTime = time.Now().Add(50 * time.Millisecond) + batch3 := newLogEventBatch(target, nil) + batch3.nextRetryTime = time.Now().Add(50 * time.Millisecond) + + // All pushes should succeed immediately (non-blocking) + err := heap.Push(batch1) + assert.NoError(t, err) + err = heap.Push(batch2) + assert.NoError(t, err) + err = heap.Push(batch3) + assert.NoError(t, err) + + // Verify the heap accepted all three batches + if heap.Size() != 3 { + t.Fatalf("Expected size 3, got %d", heap.Size()) } - // Test that semaphore is actually blocking by trying to push in a goroutine - pushResult := make(chan error, 1) - - go func() { - batch3 := newLogEventBatch(target, nil) - batch3.nextRetryTime = time.Now().Add(-1 * time.Hour) - heap.Push(batch3) // This should block on semaphore - pushResult <- nil - }() - - // Verify the push is blocked (expects no result in channel) - select { - case <-pushResult: - 
t.Fatal("Unexpected push, heap should be blocked") - case <-time.After(100 * time.Millisecond): - // Push is successfully blocked when at capacity - } + time.Sleep(100 * time.Millisecond) - // Pop ready batches to release semaphore slots + // Pop ready batches readyBatches := heap.PopReady() - assert.Len(t, readyBatches, 2, "Should pop exactly 2 ready batches") + assert.Len(t, readyBatches, 3, "Should pop exactly 3 ready batches") for _, batch := range readyBatches { assert.Equal(t, "group", batch.Group) assert.Equal(t, "stream", batch.Stream) } - // Expects push to now be unblocked - select { - case err := <-pushResult: - assert.NoError(t, err, "Push should succeed after PopReady") - case <-time.After(100 * time.Millisecond): - t.Fatal("Unexpected timeout, heap should be unblocked") - } - - // Verify 1 item remaining in heap (2 popped, 1 pushed) - if heap.Size() != 1 { - t.Fatalf("Expected size 1 after pop/push cycle, got %d", heap.Size()) + // Verify heap is empty + if heap.Size() != 0 { + t.Fatalf("Expected size 0 after pop, got %d", heap.Size()) } } func TestRetryHeapProcessorNoReadyBatches(t *testing.T) { - heap := NewRetryHeap(10, &testutil.Logger{}) + heap := NewRetryHeap(&testutil.Logger{}) defer heap.Stop() workerPool := NewWorkerPool(2) @@ -212,7 +210,7 @@ func TestRetryHeapProcessorNoReadyBatches(t *testing.T) { mockService := &mockLogsService{} mockTargetManager := &mockTargetManager{} - processor := NewRetryHeapProcessor(heap, workerPool, mockService, mockTargetManager, &testutil.Logger{}, time.Hour, retryer.NewLogThrottleRetryer(&testutil.Logger{})) + processor := NewRetryHeapProcessor(heap, workerPool, mockService, mockTargetManager, &testutil.Logger{}, retryer.NewLogThrottleRetryer(&testutil.Logger{})) // Process with empty heap - should not panic processor.processReadyMessages() @@ -221,7 +219,7 @@ func TestRetryHeapProcessorNoReadyBatches(t *testing.T) { } func TestRetryHeapProcessorFailedBatchGoesBackToHeap(t *testing.T) { - heap := 
NewRetryHeap(10, &testutil.Logger{}) + heap := NewRetryHeap(&testutil.Logger{}) defer heap.Stop() workerPool := NewWorkerPool(2) @@ -234,7 +232,7 @@ func TestRetryHeapProcessorFailedBatchGoesBackToHeap(t *testing.T) { mockTargetManager := &mockTargetManager{} mockTargetManager.On("InitTarget", mock.Anything).Return(nil) - processor := NewRetryHeapProcessor(heap, workerPool, mockService, mockTargetManager, &testutil.Logger{}, time.Hour, retryer.NewLogThrottleRetryer(&testutil.Logger{})) + processor := NewRetryHeapProcessor(heap, workerPool, mockService, mockTargetManager, &testutil.Logger{}, retryer.NewLogThrottleRetryer(&testutil.Logger{})) processor.Start() defer processor.Stop() @@ -261,7 +259,7 @@ func TestRetryHeapProcessorFailedBatchGoesBackToHeap(t *testing.T) { } func TestRetryHeapStopTwice(t *testing.T) { - rh := NewRetryHeap(5, &testutil.Logger{}) + rh := NewRetryHeap(&testutil.Logger{}) // Call Stop twice - should not panic rh.Stop() @@ -271,14 +269,14 @@ func TestRetryHeapStopTwice(t *testing.T) { target := Target{Group: "test-group", Stream: "test-stream"} batch := newLogEventBatch(target, nil) - rh.Push(batch) // Should not panic or return error + rh.Push(batch) // Verify heap is empty (nothing was pushed) assert.Equal(t, 0, rh.Size()) } func TestRetryHeapProcessorStoppedProcessReadyMessages(t *testing.T) { - heap := NewRetryHeap(10, &testutil.Logger{}) + heap := NewRetryHeap(&testutil.Logger{}) defer heap.Stop() workerPool := NewWorkerPool(2) @@ -286,7 +284,7 @@ func TestRetryHeapProcessorStoppedProcessReadyMessages(t *testing.T) { mockService := &mockLogsService{} mockTargetManager := &mockTargetManager{} - processor := NewRetryHeapProcessor(heap, workerPool, mockService, mockTargetManager, &testutil.Logger{}, time.Hour, retryer.NewLogThrottleRetryer(&testutil.Logger{})) + processor := NewRetryHeapProcessor(heap, workerPool, mockService, mockTargetManager, &testutil.Logger{}, retryer.NewLogThrottleRetryer(&testutil.Logger{})) // Add a ready batch to 
the heap target := Target{Group: "group", Stream: "stream"} diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/sender.go b/plugins/outputs/cloudwatchlogs/internal/pusher/sender.go index 1b5f13fd48..902bb166f7 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/sender.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/sender.go @@ -5,7 +5,6 @@ package pusher import ( "errors" - "sync/atomic" "time" "github.com/aws/aws-sdk-go/aws/awserr" @@ -24,14 +23,11 @@ type cloudWatchLogsService interface { type Sender interface { Send(*logEventBatch) - SetRetryDuration(time.Duration) - RetryDuration() time.Duration Stop() } type sender struct { service cloudWatchLogsService - retryDuration atomic.Value targetManager TargetManager logger telegraf.Logger stopCh chan struct{} @@ -45,7 +41,6 @@ func newSender( logger telegraf.Logger, service cloudWatchLogsService, targetManager TargetManager, - retryDuration time.Duration, retryHeap RetryHeap, ) Sender { s := &sender{ @@ -56,7 +51,6 @@ func newSender( stopped: false, retryHeap: retryHeap, } - s.retryDuration.Store(retryDuration) return s } @@ -118,7 +112,7 @@ func (s *sender) Send(batch *logEventBatch) { // Check if retry would exceed max duration totalRetries := batch.retryCountShort + batch.retryCountLong - 1 - if batch.nextRetryTime.After(batch.startTime.Add(s.RetryDuration())) { + if batch.isExpired() { s.logger.Errorf("All %v retries to %v/%v failed for PutLogEvents, request dropped.", totalRetries, batch.Group, batch.Stream) batch.updateState() return @@ -127,7 +121,14 @@ func (s *sender) Send(batch *logEventBatch) { // If RetryHeap available, push to RetryHeap and return // Otherwise, continue with existing busy-wait retry behavior if s.retryHeap != nil { - s.retryHeap.Push(batch) + if err := s.retryHeap.Push(batch); err != nil { + // Heap is stopped (shutdown in progress). 
Persist file offsets + // so these events aren't re-read on restart, then notify the + // circuit breaker so the queue isn't permanently halted. + s.logger.Warnf("RetryHeap stopped, dropping batch for %v/%v: %v", batch.Group, batch.Stream, err) + batch.done() + return + } batch.fail() return } @@ -157,13 +158,3 @@ func (s *sender) Stop() { close(s.stopCh) s.stopped = true } - -// SetRetryDuration sets the maximum duration for retrying failed log sends. -func (s *sender) SetRetryDuration(retryDuration time.Duration) { - s.retryDuration.Store(retryDuration) -} - -// RetryDuration returns the current maximum retry duration. -func (s *sender) RetryDuration() time.Duration { - return s.retryDuration.Load().(time.Duration) -} diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go index 7207283743..973533f3ab 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go @@ -80,7 +80,7 @@ func TestSender(t *testing.T) { mockManager := new(mockTargetManager) mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Once() - s := newSender(logger, mockService, mockManager, time.Second, nil) + s := newSender(logger, mockService, mockManager, nil) s.Send(batch) s.Stop() @@ -103,7 +103,7 @@ func TestSender(t *testing.T) { mockManager := new(mockTargetManager) mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{RejectedLogEventsInfo: rejectedInfo}, nil).Once() - s := newSender(logger, mockService, mockManager, time.Second, nil) + s := newSender(logger, mockService, mockManager, nil) s.Send(batch) s.Stop() @@ -122,7 +122,7 @@ func TestSender(t *testing.T) { mockManager.On("InitTarget", mock.Anything).Return(nil).Once() mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Once() - s := newSender(logger, 
mockService, mockManager, time.Second, nil) + s := newSender(logger, mockService, mockManager, nil) s.Send(batch) s.Stop() @@ -149,7 +149,7 @@ func TestSender(t *testing.T) { mockService.On("PutLogEvents", mock.Anything). Return(&cloudwatchlogs.PutLogEventsOutput{}, &cloudwatchlogs.InvalidParameterException{}).Once() - s := newSender(logger, mockService, mockManager, time.Second, nil) + s := newSender(logger, mockService, mockManager, nil) s.Send(batch) s.Stop() @@ -177,7 +177,7 @@ func TestSender(t *testing.T) { mockService.On("PutLogEvents", mock.Anything). Return(&cloudwatchlogs.PutLogEventsOutput{}, &cloudwatchlogs.DataAlreadyAcceptedException{}).Once() - s := newSender(logger, mockService, mockManager, time.Second, nil) + s := newSender(logger, mockService, mockManager, nil) s.Send(batch) s.Stop() @@ -205,7 +205,7 @@ func TestSender(t *testing.T) { mockService.On("PutLogEvents", mock.Anything). Return(&cloudwatchlogs.PutLogEventsOutput{}, errors.New("test")).Once() - s := newSender(logger, mockService, mockManager, time.Second, nil) + s := newSender(logger, mockService, mockManager, nil) s.Send(batch) s.Stop() @@ -225,7 +225,7 @@ func TestSender(t *testing.T) { mockService.On("PutLogEvents", mock.Anything). Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Once() - s := newSender(logger, mockService, mockManager, time.Second, nil) + s := newSender(logger, mockService, mockManager, nil) s.Send(batch) s.Stop() @@ -251,7 +251,12 @@ func TestSender(t *testing.T) { mockService.On("PutLogEvents", mock.Anything). 
Return(&cloudwatchlogs.PutLogEventsOutput{}, awserr.New("SomeAWSError", "Some AWS error", nil)).Once() - s := newSender(logger, mockService, mockManager, 100*time.Millisecond, nil) + s := newSender(logger, mockService, mockManager, nil) + + // Set expireAfter to past time so batch expires immediately after first retry + batch.initializeStartTime() + batch.expireAfter = time.Now().Add(-1 * time.Hour) + s.Send(batch) s.Stop() @@ -279,7 +284,7 @@ func TestSender(t *testing.T) { mockService.On("PutLogEvents", mock.Anything). Return(&cloudwatchlogs.PutLogEventsOutput{}, awserr.New("SomeAWSError", "Some AWS error", nil)).Once() - s := newSender(logger, mockService, mockManager, time.Second, nil) + s := newSender(logger, mockService, mockManager, nil) go func() { time.Sleep(50 * time.Millisecond) @@ -299,10 +304,10 @@ func TestSenderConcurrencyWithRetryHeap(t *testing.T) { mockManager := new(mockTargetManager) mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, &cloudwatchlogs.ServiceUnavailableException{}).Once() - retryHeap := NewRetryHeap(10, logger) + retryHeap := NewRetryHeap(logger) defer retryHeap.Stop() - s := newSender(logger, mockService, mockManager, time.Hour, retryHeap) + s := newSender(logger, mockService, mockManager, retryHeap) batch := newLogEventBatch(Target{Group: "test-group", Stream: "test-stream"}, nil) batch.append(newLogEvent(time.Now(), "Test message", nil)) @@ -325,7 +330,7 @@ func TestSenderConcurrencyFallbackToSync(t *testing.T) { mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Once() // Concurrency enabled but nil RetryHeap should fall back to sync - s := newSender(logger, mockService, mockManager, 2*time.Second, nil) + s := newSender(logger, mockService, mockManager, nil) batch := newLogEventBatch(Target{Group: "test-group", Stream: "test-stream"}, nil) batch.append(newLogEvent(time.Now(), "Test message", nil)) diff --git 
a/plugins/outputs/cloudwatchlogs/internal/pusher/state_callback_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/state_callback_test.go new file mode 100644 index 0000000000..5450e52e80 --- /dev/null +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/state_callback_test.go @@ -0,0 +1,188 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package pusher + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/aws/amazon-cloudwatch-agent/internal/retryer" + "github.com/aws/amazon-cloudwatch-agent/internal/state" + "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" + "github.com/aws/amazon-cloudwatch-agent/tool/testutil" +) + +type mockFileRangeQueue struct { + mock.Mock +} + +func (m *mockFileRangeQueue) ID() string { + return m.Called().String(0) +} + +func (m *mockFileRangeQueue) Enqueue(r state.Range) { + m.Called(r) +} + +// newStatefulBatch creates a batch with stateful events that register state callbacks. +func newStatefulBatch(target Target, queue *mockFileRangeQueue) *logEventBatch { + batch := newLogEventBatch(target, nil) + now := time.Now() + evt := newStatefulLogEvent(now, "test", nil, &logEventState{ + r: state.NewRange(0, 100), + queue: queue, + }) + batch.append(evt) + return batch +} + +// TestRetryHeapSuccessCallsStateCallback verifies that when a batch succeeds +// on retry through the heap, state callbacks fire to persist file offsets. 
+func TestRetryHeapSuccessCallsStateCallback(t *testing.T) { + logger := testutil.NewNopLogger() + target := Target{Group: "group", Stream: "stream"} + + queue := &mockFileRangeQueue{} + queue.On("ID").Return("file1") + queue.On("Enqueue", mock.Anything).Return() + + service := &stubLogsService{ + ple: func(_ *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + return &cloudwatchlogs.PutLogEventsOutput{}, nil + }, + cls: func(_ *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + return &cloudwatchlogs.CreateLogStreamOutput{}, nil + }, + clg: func(_ *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) { + return &cloudwatchlogs.CreateLogGroupOutput{}, nil + }, + dlg: func(_ *cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error) { + return &cloudwatchlogs.DescribeLogGroupsOutput{}, nil + }, + } + + retryHeap := NewRetryHeap(logger) + workerPool := NewWorkerPool(2) + tm := NewTargetManager(logger, service) + defer retryHeap.Stop() + defer workerPool.Stop() + + processor := NewRetryHeapProcessor(retryHeap, workerPool, service, tm, logger, retryer.NewLogThrottleRetryer(logger)) + + batch := newStatefulBatch(target, queue) + batch.nextRetryTime = time.Now().Add(-1 * time.Second) + + err := retryHeap.Push(batch) + assert.NoError(t, err) + + processor.processReadyMessages() + time.Sleep(200 * time.Millisecond) + + assert.Equal(t, 0, retryHeap.Size(), "Heap should be empty after success") + queue.AssertCalled(t, "Enqueue", mock.Anything) +} + +// TestRetryHeapExpiryCallsStateCallback verifies that when a batch expires +// after 14 days without successfully publishing, state callbacks still fire +// to persist file offsets and prevent re-reading on restart. 
+func TestRetryHeapExpiryCallsStateCallback(t *testing.T) { + logger := testutil.NewNopLogger() + target := Target{Group: "group", Stream: "stream"} + + queue := &mockFileRangeQueue{} + queue.On("ID").Return("file1") + queue.On("Enqueue", mock.Anything).Return() + + service := &stubLogsService{ + ple: func(_ *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + return nil, &cloudwatchlogs.ServiceUnavailableException{} + }, + cls: func(_ *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + return &cloudwatchlogs.CreateLogStreamOutput{}, nil + }, + clg: func(_ *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) { + return &cloudwatchlogs.CreateLogGroupOutput{}, nil + }, + dlg: func(_ *cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error) { + return &cloudwatchlogs.DescribeLogGroupsOutput{}, nil + }, + } + + retryHeap := NewRetryHeap(logger) + workerPool := NewWorkerPool(2) + tm := NewTargetManager(logger, service) + defer retryHeap.Stop() + defer workerPool.Stop() + + processor := NewRetryHeapProcessor(retryHeap, workerPool, service, tm, logger, nil) + + batch := newStatefulBatch(target, queue) + batch.initializeStartTime() + batch.expireAfter = time.Now().Add(-10 * time.Millisecond) // Already expired + batch.updateRetryMetadata(&cloudwatchlogs.ServiceUnavailableException{}) + batch.nextRetryTime = time.Now().Add(-1 * time.Second) // Override to make it ready + + err := retryHeap.Push(batch) + assert.NoError(t, err) + + processor.processReadyMessages() + time.Sleep(200 * time.Millisecond) + + assert.Equal(t, 0, retryHeap.Size(), "Expired batch should be removed") + queue.AssertCalled(t, "Enqueue", mock.Anything) +} + +// TestShutdownDoesNotCallStateCallback verifies that during a clean shutdown +// via Stop(), remaining batches in the retry heap do NOT have their state +// callbacks invoked. 
This prevents marking undelivered data as processed. +func TestShutdownDoesNotCallStateCallback(t *testing.T) { + logger := testutil.NewNopLogger() + target := Target{Group: "group", Stream: "stream"} + + var stateCallCount atomic.Int32 + + retryHeap := NewRetryHeap(logger) + workerPool := NewWorkerPool(2) + defer workerPool.Stop() + + service := &stubLogsService{ + ple: func(_ *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + return nil, &cloudwatchlogs.ServiceUnavailableException{} + }, + cls: func(_ *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + return &cloudwatchlogs.CreateLogStreamOutput{}, nil + }, + clg: func(_ *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) { + return &cloudwatchlogs.CreateLogGroupOutput{}, nil + }, + dlg: func(_ *cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error) { + return &cloudwatchlogs.DescribeLogGroupsOutput{}, nil + }, + } + tm := NewTargetManager(logger, service) + + processor := NewRetryHeapProcessor(retryHeap, workerPool, service, tm, logger, nil) + processor.Start() + + // Push a batch with a future retry time so it won't be processed before Stop + batch := newLogEventBatch(target, nil) + batch.append(newLogEvent(time.Now(), "test", nil)) + batch.addStateCallback(func() { stateCallCount.Add(1) }) + batch.nextRetryTime = time.Now().Add(1 * time.Hour) // Not ready yet + + err := retryHeap.Push(batch) + assert.NoError(t, err) + + // Stop the processor — batch is still in heap, not ready + processor.Stop() + retryHeap.Stop() + + assert.Equal(t, int32(0), stateCallCount.Load(), + "State callback should not be called for unprocessed batches during shutdown") + assert.Equal(t, 1, retryHeap.Size(), "Batch should remain in heap after shutdown") +}