Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions pkg/workflow/orchestrator/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,19 @@ type CreateSnapshotInput struct {
ResourceTypes []types.ResourceType
ScanStartTime time.Time
ScanEndTime time.Time

// ExpectedFindingsCounts is the per-resource-type findings count that
// the corresponding detection workflow reported. CreateSnapshot
// compares it to what it actually finds in the store and refuses to
// persist a snapshot if any count differs. Resource types with no
// entry in this map are not validated.
//
// This guards against the "worker OOM mid-scan" failure mode where
// detection workflows store findings in worker-local memory, the
// worker dies, and a retry of CreateSnapshot on a fresh worker would
// otherwise read an empty store and silently publish a 0-finding
// snapshot to S3. With expected counts the activity fails (and
// Temporal retries it) instead of persisting garbage.
ExpectedFindingsCounts map[types.ResourceType]int
}

type SnapshotResult struct {
Expand Down Expand Up @@ -72,6 +85,21 @@ func (a *Activities) CreateSnapshot(ctx context.Context, input CreateSnapshotInp
return nil, fmt.Errorf("retrieve findings for %s: %w", resourceType, err)
}
logger.Info("Retrieved findings for snapshot", "resourceType", resourceType, "count", len(findings))

// Refuse to persist a snapshot whose findings disagree with what
// the detection workflow reported. This catches the case where
// the worker that ran detection died (e.g. OOMKilled) and a
// retry of CreateSnapshot lands on a fresh worker with an empty
// in-memory store — without this check the activity would
// happily publish a 0-finding snapshot to S3.
if expected, ok := input.ExpectedFindingsCounts[resourceType]; ok && len(findings) != expected {
return nil, fmt.Errorf(
"findings count mismatch for %s: expected %d (from detection workflow), found %d in store; "+
"the worker that ran detection likely restarted before the snapshot was persisted",
resourceType, expected, len(findings),
)
}

builder.AddFindings(resourceType, findings)
}

Expand Down
37 changes: 34 additions & 3 deletions pkg/workflow/orchestrator/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ func TestActivities_CreateSnapshot_PersistFailureReturnsError(t *testing.T) {
}

func TestActivities_CreateSnapshot_EmptyFindings(t *testing.T) {
// No findings in the store → snapshot is built with zero counts but
// the activity still succeeds. This mirrors the workflow's
// "successful child but empty inventory" path.
// No findings in the store and the detection workflow also reported
// 0 → snapshot is built with zero counts and the activity still
// succeeds. Mirrors the "successful child but empty inventory" path.
st := memory.NewStore()
fakeSnap := &fakeSnapshotStore{}
a := NewActivities(st, fakeSnap)
Expand All @@ -155,9 +155,40 @@ func TestActivities_CreateSnapshot_EmptyFindings(t *testing.T) {
ResourceTypes: []types.ResourceType{types.ResourceTypeAurora},
ScanStartTime: time.Now(),
ScanEndTime: time.Now(),
ExpectedFindingsCounts: map[types.ResourceType]int{
types.ResourceTypeAurora: 0,
},
})
require.NoError(t, err)
assert.Equal(t, 0, result.TotalFindings)
assert.Equal(t, "scan-empty", result.SnapshotID)
require.NotNil(t, fakeSnap.saved)
}

// TestActivities_CreateSnapshot_FindingsCountMismatch_FailsWithoutSaving
// verifies that CreateSnapshot returns an error when the expected findings
// count differs from what the findings store holds, and that no snapshot is
// saved in that case.
func TestActivities_CreateSnapshot_FindingsCountMismatch_FailsWithoutSaving(t *testing.T) {
	// Simulates a snapshot retry landing on a fresh worker: the detection
	// workflow reported 5 aurora findings, yet the in-memory store is empty.
	findingsStore := memory.NewStore()
	snapshotStore := &fakeSnapshotStore{}
	activities := NewActivities(findingsStore, snapshotStore)

	input := &CreateSnapshotInput{
		ScanID:        "scan-mismatch",
		ResourceTypes: []types.ResourceType{types.ResourceTypeAurora},
		ScanStartTime: time.Now(),
		ScanEndTime:   time.Now(),
		ExpectedFindingsCounts: map[types.ResourceType]int{
			types.ResourceTypeAurora: 5,
		},
	}

	_, err := runCreateSnapshotActivity(t, activities, input)
	require.Error(t, err)

	// The error must name the mismatch and report both counts.
	for _, fragment := range []string{"findings count mismatch", "expected 5", "found 0"} {
		assert.Contains(t, err.Error(), fragment)
	}

	// Validation failure must short-circuit before any snapshot is saved.
	assert.Equal(t, 0, snapshotStore.saveCallCount, "must not call SaveSnapshot when validation fails")
	assert.Nil(t, snapshotStore.saved)
}
11 changes: 7 additions & 4 deletions pkg/workflow/orchestrator/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func OrchestratorWorkflow(ctx workflow.Context, input WorkflowInput) (*WorkflowO
// order pinned to the input order, which the caller controls.
resourceTypeResults := make(map[types.ResourceType]*ResourceTypeResult, len(resourceTypes))
successfulTypes := make([]types.ResourceType, 0, len(resourceTypes))
expectedFindingsCounts := make(map[types.ResourceType]int, len(resourceTypes))

for _, resourceType := range resourceTypes {
future := futures[resourceType]
Expand Down Expand Up @@ -166,6 +167,7 @@ func OrchestratorWorkflow(ctx workflow.Context, input WorkflowInput) (*WorkflowO

resourceTypeResults[resourceType] = result
successfulTypes = append(successfulTypes, resourceType)
expectedFindingsCounts[resourceType] = output.FindingsCount
}

logger.Info("Stage 1: Detect - All detection workflows completed", "successCount", len(successfulTypes))
Expand All @@ -185,10 +187,11 @@ func OrchestratorWorkflow(ctx workflow.Context, input WorkflowInput) (*WorkflowO
}),
CreateSnapshotActivityName,
CreateSnapshotInput{
ScanID: input.ScanID,
ResourceTypes: successfulTypes,
ScanStartTime: startTime,
ScanEndTime: workflow.Now(ctx),
ScanID: input.ScanID,
ResourceTypes: successfulTypes,
ScanStartTime: startTime,
ScanEndTime: workflow.Now(ctx),
ExpectedFindingsCounts: expectedFindingsCounts,
},
).Get(ctx, &snapshotResult)

Expand Down
Loading