From 9ae9e8d5e8a13a15b1c202fe23e40a5dbc03274f Mon Sep 17 00:00:00 2001 From: Ben Apprederisse Date: Thu, 30 Apr 2026 14:41:58 -0700 Subject: [PATCH] fix(schedule): always refresh Action.Args on existing schedules MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous update path skipped when cron+jitter matched and never touched Action.Args, so a schedule created on an older code revision (when the orchestrator carried a hardcoded fallback resource list) kept a stale ResourceTypes:null in its Args forever. After PR #55 made empty ResourceTypes a hard error (ErrNoResourceTypes), every cron firing failed instantly with no log past 'Starting orchestrator workflow' — the workflow returned the error before logging it. Fix: drop the skip-when-same optimization and always rewrite Spec + Action.Args + TaskQueue from the current cfg whenever an existing schedule is found. Args are encoded as opaque payloads in the Schedule and cannot be reliably diffed; one Update RPC per pod startup is a trivial cost compared to the silent-arg-drift outage. Adds a regression test that simulates a stale Args payload and asserts the rewrite produces the current cfg's ResourceTypes. Amp-Thread-ID: https://ampcode.com/threads/T-019ddff6-22a2-764e-bc7e-da55598b7ccc Co-authored-by: Amp --- pkg/schedule/schedule.go | 51 ++++++++++++++------------ pkg/schedule/schedule_test.go | 69 ++++++++++++++++++++++++++++++----- 2 files changed, 88 insertions(+), 32 deletions(-) diff --git a/pkg/schedule/schedule.go b/pkg/schedule/schedule.go index 6df92c1..fdfd584 100644 --- a/pkg/schedule/schedule.go +++ b/pkg/schedule/schedule.go @@ -91,29 +91,30 @@ func (m *Manager) EnsureSchedule(ctx context.Context, cfg Config) error { } handle := m.scheduleClient.GetHandle(ctx, cfg.ScheduleID) - desc, err := handle.Describe(ctx) - if err != nil { - return fmt.Errorf("failed to describe existing schedule %q: %w", cfg.ScheduleID, err) - } - - // Check if the cron expression or jitter has changed - existingSpec := desc.Schedule.Spec - if existingSpec == nil { - existingSpec = &client.ScheduleSpec{} - } - existingCrons := existingSpec.CronExpressions - if len(existingCrons) == 1 && existingCrons[0] == cfg.CronExpression && existingSpec.Jitter == cfg.Jitter { - fmt.Printf(" Schedule %q already configured (cron: %s)\n", cfg.ScheduleID, cfg.CronExpression) - return nil - } - // Update the schedule with the new spec. - // We replace the entire Spec rather than mutating fields because Temporal - // parses CronExpressions into Calendars/StructuredCalendar server-side on - // create. On subsequent describes, the cron lives in Calendars and - // CronExpressions comes back empty — mutating CronExpressions alone would - // leave stale calendars in place, causing the schedule to fire on both - // the old and new cadences after every restart with a changed cron. + // Always refresh existing schedules with the current Spec AND Action + // (Args + TaskQueue) on every startup. The previous "skip when + // cron+jitter match" optimization was unsafe: it only diffed Spec + // and never touched Action.Args, so a schedule created on an older + // code revision (when the orchestrator carried a hardcoded + // fallback resource list) kept a now-stale ResourceTypes:null in + // its Args forever. After the orchestrator started rejecting empty + // ResourceTypes (ErrNoResourceTypes), every cron firing failed + // instantly with no log past "Starting orchestrator workflow". + // + // Args are encoded as opaque payloads in the Temporal Schedule, so + // we cannot reliably diff them against cfg.ResourceTypes here. + // One Update RPC per pod startup is a trivial cost compared to the + // outage risk of silent arg drift; rebuild the schedule + // unconditionally and let Temporal handle the no-op case. + // + // We replace the entire Spec rather than mutating fields because + // Temporal parses CronExpressions into Calendars/StructuredCalendar + // server-side on create. On subsequent describes, the cron lives + // in Calendars and CronExpressions comes back empty — mutating + // CronExpressions alone would leave stale calendars in place, + // causing the schedule to fire on both the old and new cadences + // after every restart with a changed cron. err = handle.Update(ctx, client.ScheduleUpdateOptions{ DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { input.Description.Schedule.Spec = &client.ScheduleSpec{ @@ -122,6 +123,9 @@ func (m *Manager) EnsureSchedule(ctx context.Context, cfg Config) error { } if action, ok := input.Description.Schedule.Action.(*client.ScheduleWorkflowAction); ok { action.TaskQueue = cfg.TaskQueue + action.Args = []interface{}{orchestrator.WorkflowInput{ + ResourceTypes: cfg.ResourceTypes, + }} } return &client.ScheduleUpdate{ Schedule: &input.Description.Schedule, @@ -132,7 +136,8 @@ func (m *Manager) EnsureSchedule(ctx context.Context, cfg Config) error { return fmt.Errorf("failed to update schedule %q: %w", cfg.ScheduleID, err) } - fmt.Printf(" Schedule %q updated (cron: %s)\n", cfg.ScheduleID, cfg.CronExpression) + fmt.Printf(" Schedule %q refreshed (cron: %s, resources: %d)\n", + cfg.ScheduleID, cfg.CronExpression, len(cfg.ResourceTypes)) return nil } diff --git a/pkg/schedule/schedule_test.go b/pkg/schedule/schedule_test.go index be46df3..fda0c88 100644 --- a/pkg/schedule/schedule_test.go +++ b/pkg/schedule/schedule_test.go @@ -12,6 +12,7 @@ import ( "go.temporal.io/sdk/temporal" "github.com/block/Version-Guard/pkg/types" + "github.com/block/Version-Guard/pkg/workflow/orchestrator" ) // testResourceTypes is the canonical fixture list for ResourceTypes — @@ -133,7 +134,13 @@ func TestEnsureSchedule_CreatesNew(t *testing.T) { assert.Equal(t, 2*time.Hour, action.WorkflowExecutionTimeout) } -func TestEnsureSchedule_AlreadyExists_SameCron(t *testing.T) { +// TestEnsureSchedule_AlreadyExists_AlwaysUpdates guards the contract +// that an existing schedule is unconditionally refreshed on every +// startup. The previous "skip when cron+jitter match" optimization +// failed to refresh Action.Args, leaving stale ResourceTypes baked +// into pre-existing schedules — see the doc comment on the update +// path in schedule.go for the full incident background. +func TestEnsureSchedule_AlreadyExists_AlwaysUpdates(t *testing.T) { handle := &mockScheduleHandle{ id: "test-schedule", describeOut: &client.ScheduleDescription{ @@ -161,7 +168,8 @@ func TestEnsureSchedule_AlreadyExists_SameCron(t *testing.T) { }) require.NoError(t, err) - assert.False(t, handle.updateCalled, "Update should not be called when cron matches") + assert.True(t, handle.updateCalled, + "Update must always run on existing schedules so Action.Args is refreshed") } func TestEnsureSchedule_AlreadyExists_DifferentCron(t *testing.T) { @@ -308,10 +316,44 @@ func TestEnsureSchedule_Update_ReplacesStaleCalendars(t *testing.T) { assert.Equal(t, 1*time.Minute, captured.Schedule.Spec.Jitter) } -func TestEnsureSchedule_DescribeError(t *testing.T) { +// TestEnsureSchedule_Update_RefreshesActionArgs is the regression +// guard for the silent-arg-drift bug. Before the fix, the update path +// only rewrote Spec and left Action.Args untouched, so a schedule +// created on an older code revision (with empty/stale ResourceTypes) +// kept the stale args forever — every cron firing then failed +// instantly with ErrNoResourceTypes. This test verifies that +// EnsureSchedule overwrites Action.Args (and TaskQueue) with the +// current cfg values whenever it touches an existing schedule. +func TestEnsureSchedule_Update_RefreshesActionArgs(t *testing.T) { + staleResourceTypes := []types.ResourceType{"old-resource-from-prior-revision"} handle := &mockScheduleHandle{ - id: "test-schedule", - describeErr: errors.New("not found"), + id: "test-schedule", + describeOut: &client.ScheduleDescription{ + Schedule: client.Schedule{ + Spec: &client.ScheduleSpec{ + CronExpressions: []string{"0 6 * * *"}, + }, + Action: &client.ScheduleWorkflowAction{ + TaskQueue: "old-task-queue", + Args: []interface{}{ + // Simulate a stale args payload from an earlier + // schedule revision that had different + // ResourceTypes than the current YAML config. + map[string]interface{}{ + "ScanID": "", + "ResourceTypes": staleResourceTypes, + }, + }, + }, + }, + }, + } + var captured *client.ScheduleUpdate + handle.updateFn = func(opts client.ScheduleUpdateOptions) { + input := client.ScheduleUpdateInput{Description: *handle.describeOut} + result, err := opts.DoUpdate(input) + require.NoError(t, err) + captured = result } mock := &mockCreator{ createErr: temporal.ErrScheduleAlreadyRunning, @@ -322,11 +364,20 @@ func TestEnsureSchedule_DescribeError(t *testing.T) { err := mgr.EnsureSchedule(context.Background(), Config{ Enabled: true, ScheduleID: "test-schedule", - CronExpression: "0 */6 * * *", - TaskQueue: "test-queue", + CronExpression: "0 6 * * *", + TaskQueue: "new-task-queue", ResourceTypes: testResourceTypes, }) - require.Error(t, err) - assert.Contains(t, err.Error(), "not found") + require.NoError(t, err) + require.NotNil(t, captured) + action, ok := captured.Schedule.Action.(*client.ScheduleWorkflowAction) + require.True(t, ok, "Action must remain a ScheduleWorkflowAction") + assert.Equal(t, "new-task-queue", action.TaskQueue, + "TaskQueue must be refreshed from current cfg") + require.Len(t, action.Args, 1, "Args must be exactly the orchestrator WorkflowInput") + input, ok := action.Args[0].(orchestrator.WorkflowInput) + require.True(t, ok, "Args[0] must be a typed orchestrator.WorkflowInput, not the stale payload") + assert.Equal(t, testResourceTypes, input.ResourceTypes, + "ResourceTypes must be refreshed from current cfg, not preserved from the stale schedule") }