From 681f01b867e08ed6c4ee490c117de8998279491b Mon Sep 17 00:00:00 2001 From: Ben Apprederisse Date: Mon, 4 May 2026 10:39:35 -0700 Subject: [PATCH] orchestrator: refuse to persist a snapshot when findings are missing Background: on 2026-05-04 a scheduled scan in cash-utility-stage silently produced an empty snapshot.json in S3. The pod was OOMKilled mid-scan (exit 137 at 512Mi); after restart, Temporal retried the `CreateSnapshot` activity on a fresh worker whose in-memory finding store was empty. The activity happily reported `count 0` for every resource type and persisted an all-null snapshot, overwriting the previous good `latest.json`. Fix: `CreateSnapshot` now compares the per-resource-type findings count it reads from the in-memory store against the count each detection workflow reported. If they disagree the activity errors out *before* calling SaveSnapshot, so nothing is written to S3. Temporal retries the activity (per the existing retry policy); when a fresh worker can't recover the lost findings, the workflow fails and the next scheduled run starts cleanly. The orchestrator workflow already collects each detection workflow's `FindingsCount` from its WorkflowOutput, so the expected counts are populated for free without any new RPCs. Tests: - TestActivities_CreateSnapshot_FindingsCountMismatch_FailsWithoutSaving asserts the new failure path doesn't call SaveSnapshot. - The existing happy-path / empty-findings tests are updated to pass matching expected counts. Amp-Thread-ID: https://ampcode.com/threads/T-019df3d1-962c-7051-a156-8cf596d7dc1d Co-authored-by: Amp --- pkg/workflow/orchestrator/activities.go | 28 +++++++++++++++ pkg/workflow/orchestrator/activities_test.go | 37 ++++++++++++++++++-- pkg/workflow/orchestrator/workflow.go | 11 +++--- 3 files changed, 69 insertions(+), 7 deletions(-) diff --git a/pkg/workflow/orchestrator/activities.go b/pkg/workflow/orchestrator/activities.go index ea0ac4a..708815b 100644 --- a/pkg/workflow/orchestrator/activities.go +++ b/pkg/workflow/orchestrator/activities.go @@ -25,6 +25,19 @@ type CreateSnapshotInput struct { ResourceTypes []types.ResourceType ScanStartTime time.Time ScanEndTime time.Time + + // ExpectedFindingsCounts is the per-resource-type findings count that + // the corresponding detection workflow reported. CreateSnapshot + // compares it to what it actually finds in the store and refuses to + // persist a snapshot if any expected count is missing. + // + // This guards against the "worker OOM mid-scan" failure mode where + // detection workflows store findings in worker-local memory, the + // worker dies, and a retry of CreateSnapshot on a fresh worker would + // otherwise read an empty store and silently publish a 0-finding + // snapshot to S3. With expected counts the activity fails (and + // Temporal retries it) instead of persisting garbage. + ExpectedFindingsCounts map[types.ResourceType]int } type SnapshotResult struct { @@ -72,6 +85,21 @@ func (a *Activities) CreateSnapshot(ctx context.Context, input CreateSnapshotInp return nil, fmt.Errorf("retrieve findings for %s: %w", resourceType, err) } logger.Info("Retrieved findings for snapshot", "resourceType", resourceType, "count", len(findings)) + + // Refuse to persist a snapshot whose findings disagree with what + // the detection workflow reported. This catches the case where + // the worker that ran detection died (e.g. OOMKilled) and a + // retry of CreateSnapshot lands on a fresh worker with an empty + // in-memory store — without this check the activity would + // happily publish a 0-finding snapshot to S3. + if expected, ok := input.ExpectedFindingsCounts[resourceType]; ok && len(findings) != expected { + return nil, fmt.Errorf( + "findings count mismatch for %s: expected %d (from detection workflow), found %d in store; "+ + "the worker that ran detection likely restarted before the snapshot was persisted", + resourceType, expected, len(findings), + ) + } + builder.AddFindings(resourceType, findings) } diff --git a/pkg/workflow/orchestrator/activities_test.go b/pkg/workflow/orchestrator/activities_test.go index 2cd6534..4d874f7 100644 --- a/pkg/workflow/orchestrator/activities_test.go +++ b/pkg/workflow/orchestrator/activities_test.go @@ -143,9 +143,9 @@ func TestActivities_CreateSnapshot_PersistFailureReturnsError(t *testing.T) { } func TestActivities_CreateSnapshot_EmptyFindings(t *testing.T) { - // No findings in the store → snapshot is built with zero counts but - // the activity still succeeds. This mirrors the workflow's - // "successful child but empty inventory" path. + // No findings in the store and the detection workflow also reported + // 0 → snapshot is built with zero counts and the activity still + // succeeds. Mirrors the "successful child but empty inventory" path. st := memory.NewStore() fakeSnap := &fakeSnapshotStore{} a := NewActivities(st, fakeSnap) @@ -155,9 +155,40 @@ func TestActivities_CreateSnapshot_EmptyFindings(t *testing.T) { ResourceTypes: []types.ResourceType{types.ResourceTypeAurora}, ScanStartTime: time.Now(), ScanEndTime: time.Now(), + ExpectedFindingsCounts: map[types.ResourceType]int{ + types.ResourceTypeAurora: 0, + }, }) require.NoError(t, err) assert.Equal(t, 0, result.TotalFindings) assert.Equal(t, "scan-empty", result.SnapshotID) require.NotNil(t, fakeSnap.saved) } + +func TestActivities_CreateSnapshot_FindingsCountMismatch_FailsWithoutSaving(t *testing.T) { + // Detection workflow reported 5 aurora findings but the in-memory + // store has none — this is the "worker died after detection + // stored findings, snapshot retried on a fresh worker" scenario. + // The activity must error out and NOT persist anything to S3. + st := memory.NewStore() + fakeSnap := &fakeSnapshotStore{} + a := NewActivities(st, fakeSnap) + + _, err := runCreateSnapshotActivity(t, a, &CreateSnapshotInput{ + ScanID: "scan-mismatch", + ResourceTypes: []types.ResourceType{types.ResourceTypeAurora}, + ScanStartTime: time.Now(), + ScanEndTime: time.Now(), + ExpectedFindingsCounts: map[types.ResourceType]int{ + types.ResourceTypeAurora: 5, + }, + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "findings count mismatch") + assert.Contains(t, err.Error(), "expected 5") + assert.Contains(t, err.Error(), "found 0") + + // And critically: nothing was persisted to S3. + assert.Equal(t, 0, fakeSnap.saveCallCount, "must not call SaveSnapshot when validation fails") + assert.Nil(t, fakeSnap.saved) +} diff --git a/pkg/workflow/orchestrator/workflow.go b/pkg/workflow/orchestrator/workflow.go index 34044be..ca212ba 100644 --- a/pkg/workflow/orchestrator/workflow.go +++ b/pkg/workflow/orchestrator/workflow.go @@ -137,6 +137,7 @@ func OrchestratorWorkflow(ctx workflow.Context, input WorkflowInput) (*WorkflowO // order pinned to the input order, which the caller controls. resourceTypeResults := make(map[types.ResourceType]*ResourceTypeResult, len(resourceTypes)) successfulTypes := make([]types.ResourceType, 0, len(resourceTypes)) + expectedFindingsCounts := make(map[types.ResourceType]int, len(resourceTypes)) for _, resourceType := range resourceTypes { future := futures[resourceType] @@ -166,6 +167,7 @@ func OrchestratorWorkflow(ctx workflow.Context, input WorkflowInput) (*WorkflowO resourceTypeResults[resourceType] = result successfulTypes = append(successfulTypes, resourceType) + expectedFindingsCounts[resourceType] = output.FindingsCount } logger.Info("Stage 1: Detect - All detection workflows completed", "successCount", len(successfulTypes)) @@ -185,10 +187,11 @@ func OrchestratorWorkflow(ctx workflow.Context, input WorkflowInput) (*WorkflowO }), CreateSnapshotActivityName, CreateSnapshotInput{ - ScanID: input.ScanID, - ResourceTypes: successfulTypes, - ScanStartTime: startTime, - ScanEndTime: workflow.Now(ctx), + ScanID: input.ScanID, + ResourceTypes: successfulTypes, + ScanStartTime: startTime, + ScanEndTime: workflow.Now(ctx), + ExpectedFindingsCounts: expectedFindingsCounts, }, ).Get(ctx, &snapshotResult)