Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 28 additions & 23 deletions pkg/schedule/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,29 +91,30 @@ func (m *Manager) EnsureSchedule(ctx context.Context, cfg Config) error {
}

handle := m.scheduleClient.GetHandle(ctx, cfg.ScheduleID)
desc, err := handle.Describe(ctx)
if err != nil {
return fmt.Errorf("failed to describe existing schedule %q: %w", cfg.ScheduleID, err)
}

// Check if the cron expression or jitter has changed
existingSpec := desc.Schedule.Spec
if existingSpec == nil {
existingSpec = &client.ScheduleSpec{}
}
existingCrons := existingSpec.CronExpressions
if len(existingCrons) == 1 && existingCrons[0] == cfg.CronExpression && existingSpec.Jitter == cfg.Jitter {
fmt.Printf(" Schedule %q already configured (cron: %s)\n", cfg.ScheduleID, cfg.CronExpression)
return nil
}

// Update the schedule with the new spec.
// We replace the entire Spec rather than mutating fields because Temporal
// parses CronExpressions into Calendars/StructuredCalendar server-side on
// create. On subsequent describes, the cron lives in Calendars and
// CronExpressions comes back empty — mutating CronExpressions alone would
// leave stale calendars in place, causing the schedule to fire on both
// the old and new cadences after every restart with a changed cron.
// Always refresh existing schedules with the current Spec AND Action
// (Args + TaskQueue) on every startup. The previous "skip when
// cron+jitter match" optimization was unsafe: it only diffed Spec
// and never touched Action.Args, so a schedule created on an older
// code revision (when the orchestrator carried a hardcoded
// fallback resource list) kept a now-stale ResourceTypes:null in
// its Args forever. After the orchestrator started rejecting empty
// ResourceTypes (ErrNoResourceTypes), every cron firing failed
// instantly with no log past "Starting orchestrator workflow".
//
// Args are encoded as opaque payloads in the Temporal Schedule, so
// we cannot reliably diff them against cfg.ResourceTypes here.
// One Update RPC per pod startup is a trivial cost compared to the
// outage risk of silent arg drift; rebuild the schedule
// unconditionally and let Temporal handle the no-op case.
//
// We replace the entire Spec rather than mutating fields because
// Temporal parses CronExpressions into Calendars/StructuredCalendar
// server-side on create. On subsequent describes, the cron lives
// in Calendars and CronExpressions comes back empty — mutating
// CronExpressions alone would leave stale calendars in place,
// causing the schedule to fire on both the old and new cadences
// after every restart with a changed cron.
err = handle.Update(ctx, client.ScheduleUpdateOptions{
DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) {
input.Description.Schedule.Spec = &client.ScheduleSpec{
Expand All @@ -122,6 +123,9 @@ func (m *Manager) EnsureSchedule(ctx context.Context, cfg Config) error {
}
if action, ok := input.Description.Schedule.Action.(*client.ScheduleWorkflowAction); ok {
action.TaskQueue = cfg.TaskQueue
action.Args = []interface{}{orchestrator.WorkflowInput{
ResourceTypes: cfg.ResourceTypes,
}}
}
return &client.ScheduleUpdate{
Schedule: &input.Description.Schedule,
Expand All @@ -132,7 +136,8 @@ func (m *Manager) EnsureSchedule(ctx context.Context, cfg Config) error {
return fmt.Errorf("failed to update schedule %q: %w", cfg.ScheduleID, err)
}

fmt.Printf(" Schedule %q updated (cron: %s)\n", cfg.ScheduleID, cfg.CronExpression)
fmt.Printf(" Schedule %q refreshed (cron: %s, resources: %d)\n",
cfg.ScheduleID, cfg.CronExpression, len(cfg.ResourceTypes))
return nil
}

Expand Down
69 changes: 60 additions & 9 deletions pkg/schedule/schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.temporal.io/sdk/temporal"

"github.com/block/Version-Guard/pkg/types"
"github.com/block/Version-Guard/pkg/workflow/orchestrator"
)

// testResourceTypes is the canonical fixture list for ResourceTypes —
Expand Down Expand Up @@ -133,7 +134,13 @@ func TestEnsureSchedule_CreatesNew(t *testing.T) {
assert.Equal(t, 2*time.Hour, action.WorkflowExecutionTimeout)
}

func TestEnsureSchedule_AlreadyExists_SameCron(t *testing.T) {
// TestEnsureSchedule_AlreadyExists_AlwaysUpdates guards the contract
// that an existing schedule is unconditionally refreshed on every
// startup. The previous "skip when cron+jitter match" optimization
// failed to refresh Action.Args, leaving stale ResourceTypes baked
// into pre-existing schedules — see the doc comment on the update
// path in schedule.go for the full incident background.
func TestEnsureSchedule_AlreadyExists_AlwaysUpdates(t *testing.T) {
handle := &mockScheduleHandle{
id: "test-schedule",
describeOut: &client.ScheduleDescription{
Expand Down Expand Up @@ -161,7 +168,8 @@ func TestEnsureSchedule_AlreadyExists_SameCron(t *testing.T) {
})

require.NoError(t, err)
assert.False(t, handle.updateCalled, "Update should not be called when cron matches")
assert.True(t, handle.updateCalled,
"Update must always run on existing schedules so Action.Args is refreshed")
}

func TestEnsureSchedule_AlreadyExists_DifferentCron(t *testing.T) {
Expand Down Expand Up @@ -308,10 +316,44 @@ func TestEnsureSchedule_Update_ReplacesStaleCalendars(t *testing.T) {
assert.Equal(t, 1*time.Minute, captured.Schedule.Spec.Jitter)
}

func TestEnsureSchedule_DescribeError(t *testing.T) {
// TestEnsureSchedule_Update_RefreshesActionArgs is the regression
// guard for the silent-arg-drift bug. Before the fix, the update path
// only rewrote Spec and left Action.Args untouched, so a schedule
// created on an older code revision (with empty/stale ResourceTypes)
// kept the stale args forever — every cron firing then failed
// instantly with ErrNoResourceTypes. This test verifies that
// EnsureSchedule overwrites Action.Args (and TaskQueue) with the
// current cfg values whenever it touches an existing schedule.
func TestEnsureSchedule_Update_RefreshesActionArgs(t *testing.T) {
staleResourceTypes := []types.ResourceType{"old-resource-from-prior-revision"}
handle := &mockScheduleHandle{
id: "test-schedule",
describeErr: errors.New("not found"),
id: "test-schedule",
describeOut: &client.ScheduleDescription{
Schedule: client.Schedule{
Spec: &client.ScheduleSpec{
CronExpressions: []string{"0 6 * * *"},
},
Action: &client.ScheduleWorkflowAction{
TaskQueue: "old-task-queue",
Args: []interface{}{
// Simulate a stale args payload from an earlier
// schedule revision that had different
// ResourceTypes than the current YAML config.
map[string]interface{}{
"ScanID": "",
"ResourceTypes": staleResourceTypes,
},
},
},
},
},
}
var captured *client.ScheduleUpdate
handle.updateFn = func(opts client.ScheduleUpdateOptions) {
input := client.ScheduleUpdateInput{Description: *handle.describeOut}
result, err := opts.DoUpdate(input)
require.NoError(t, err)
captured = result
}
mock := &mockCreator{
createErr: temporal.ErrScheduleAlreadyRunning,
Expand All @@ -322,11 +364,20 @@ func TestEnsureSchedule_DescribeError(t *testing.T) {
err := mgr.EnsureSchedule(context.Background(), Config{
Enabled: true,
ScheduleID: "test-schedule",
CronExpression: "0 */6 * * *",
TaskQueue: "test-queue",
CronExpression: "0 6 * * *",
TaskQueue: "new-task-queue",
ResourceTypes: testResourceTypes,
})

require.Error(t, err)
assert.Contains(t, err.Error(), "not found")
require.NoError(t, err)
require.NotNil(t, captured)
action, ok := captured.Schedule.Action.(*client.ScheduleWorkflowAction)
require.True(t, ok, "Action must remain a ScheduleWorkflowAction")
assert.Equal(t, "new-task-queue", action.TaskQueue,
"TaskQueue must be refreshed from current cfg")
require.Len(t, action.Args, 1, "Args must be exactly the orchestrator WorkflowInput")
input, ok := action.Args[0].(orchestrator.WorkflowInput)
require.True(t, ok, "Args[0] must be a typed orchestrator.WorkflowInput, not the stale payload")
assert.Equal(t, testResourceTypes, input.ResourceTypes,
"ResourceTypes must be refreshed from current cfg, not preserved from the stale schedule")
}
Loading