Skip to content
Draft

Patch #210

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbos/dbos.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ type DBOSContext interface {
SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow
GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) // Get a key-value event from a target workflow
Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery
Patch(_ DBOSContext, patchName string) (bool, error) // Check if workflow should use patched code
DeprecatePatch(_ DBOSContext, patchName string) (bool, error) // Check if patch should be deprecated (skipped)
GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows)
GetStepID() (int, error) // Get the current step ID (only available within workflows)

Expand Down
41 changes: 41 additions & 0 deletions dbos/system_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ type systemDatabase interface {
// Timers (special steps)
sleep(ctx context.Context, input sleepInput) (time.Duration, error)

// Patches
patch(ctx context.Context, input patchDBInput) (bool, error)
doesPatchExists(ctx context.Context, input patchDBInput) (string, error)

// Queues
dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInput) ([]dequeuedWorkflow, error)
clearQueueAssignment(ctx context.Context, workflowID string) (bool, error)
Expand Down Expand Up @@ -1668,6 +1672,43 @@ func (s *sysDB) sleep(ctx context.Context, input sleepInput) (time.Duration, err
return remainingDuration, nil
}

/****************************************/
/******* PATCHES ********/
/****************************************/

type patchDBInput struct {
workflowID string
stepID int
patchName string
}

func (s *sysDB) doesPatchExists(ctx context.Context, input patchDBInput) (string, error) {
var functionName string
query := fmt.Sprintf(`SELECT function_name FROM %s.operation_outputs WHERE workflow_uuid = $1 AND function_id = $2`, pgx.Identifier{s.schema}.Sanitize())
return functionName, s.pool.QueryRow(ctx, query, input.workflowID, input.stepID).Scan(&functionName)
}

func (s *sysDB) patch(ctx context.Context, input patchDBInput) (bool, error) {
functionName, err := s.doesPatchExists(ctx, input)
if err != nil {
// No result means this is a new workflow, or an existing workflow that has not reached this step yet
// Insert the patch marker and return true
if err == pgx.ErrNoRows {
insertQuery := fmt.Sprintf(`INSERT INTO %s.operation_outputs (workflow_uuid, function_id, function_name) VALUES ($1, $2, $3)`, pgx.Identifier{s.schema}.Sanitize())
_, err = s.pool.Exec(ctx, insertQuery, input.workflowID, input.stepID, input.patchName)
if err != nil {
return false, fmt.Errorf("failed to insert patch marker: %w", err)
}
return true, nil
}
return false, fmt.Errorf("failed to check for patch: %w", err)
}

// If functionName != patchName, this is a workflow that existed before the patch was applied
// Else this a new (patched) workflow that is being re-executed (e.g., recovery, or forked at a later step)
return functionName == input.patchName, nil
}

/****************************************/
/******* WORKFLOW COMMUNICATIONS ********/
/****************************************/
Expand Down
100 changes: 99 additions & 1 deletion dbos/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/robfig/cron/v3"
)

Expand Down Expand Up @@ -577,7 +578,7 @@ func RegisterWorkflow[P any, R any](ctx DBOSContext, fn Workflow[P, R], opts ...

// If this is a scheduled workflow, register a cron job
if registrationParams.cronSchedule != "" {
if reflect.TypeOf(p) != reflect.TypeOf(time.Time{}) {
if reflect.TypeOf(p) != reflect.TypeFor[time.Time]() {
panic(fmt.Sprintf("scheduled workflow function must accept a time.Time as input, got %T", p))
}
registerScheduledWorkflow(ctx, fqn, typedErasedWorkflow, registrationParams.cronSchedule)
Expand Down Expand Up @@ -1646,6 +1647,103 @@ func Sleep(ctx DBOSContext, duration time.Duration) (time.Duration, error) {
return ctx.Sleep(ctx, duration)
}

func (c *dbosContext) Patch(_ DBOSContext, patchName string) (bool, error) {
if patchName == "" {
return false, errors.New("patch name cannot be empty")
}

// Get workflow state to determine current step ID
wfState, ok := c.Value(workflowStateKey).(*workflowState)
if !ok || wfState == nil {
return false, errors.New("patch can only be called within a workflow")
}

if wfState.isWithinStep {
return false, newStepExecutionError(wfState.workflowID, patchName, fmt.Errorf("cannot call Patch within a step"))
}

patched, err := retryWithResult(c, func() (bool, error) {
return c.systemDB.patch(c, patchDBInput{
workflowID: wfState.workflowID,
stepID: wfState.stepID + 1, // We are checking if the upcoming step should use the patched code
patchName: patchName,
})
}, withRetrierLogger(c.logger))

if patched && err == nil {
// The patch take its own step ID
wfState.nextStepID()
}

return patched, err
}

// Patch checks if the current workflow should use patched code.
// Returns true if the workflow should use new code, false if it should use old code.
//
// The patch system allows modifying code while long-lived workflows are running:
// - Existing workflows that already passed this patch point continue with old code
// - New workflows use new code
// - Workflows that started but haven't reached this point yet use new code
//
// Example:
//
// if dbos.Patch(ctx, "my-patch") {
// // New code path
// } else {
// // Old code path
// }
func Patch(ctx DBOSContext, patchName string) (bool, error) {
if ctx == nil {
return false, errors.New("ctx cannot be nil")
}
return ctx.Patch(ctx, patchName)
}

func (c *dbosContext) DeprecatePatch(_ DBOSContext, patchName string) (bool, error) {
if patchName == "" {
return false, errors.New("patch name cannot be empty")
}

// Get workflow state to determine current step ID
wfState, ok := c.Value(workflowStateKey).(*workflowState)
if !ok || wfState == nil {
return false, errors.New("deprecate patch can only be called within a workflow")
}

if wfState.isWithinStep {
return false, newStepExecutionError(wfState.workflowID, patchName, fmt.Errorf("cannot call DeprecatePatch within a step"))
}

patchNameFromDB, err := retryWithResult(c, func() (string, error) {
return c.systemDB.doesPatchExists(c, patchDBInput{
workflowID: wfState.workflowID,
stepID: wfState.stepID + 1,
patchName: patchName,
})
}, withRetrierLogger(c.logger))

if patchNameFromDB != patchName || err == pgx.ErrNoRows {
return true, nil
}
wfState.nextStepID()
return false, err
}

// DeprecatePatch allows removing patches from code while ensuring the correct history
// of workflows that were executing before the patch was deprecated.
//
// Example:
//
// dbos.DeprecatePatch(ctx, "my-patch")
// // New code path
func DeprecatePatch(ctx DBOSContext, patchName string) (bool, error) {
if ctx == nil {
return false, errors.New("ctx cannot be nil")
}
return ctx.DeprecatePatch(ctx, patchName)
}

/***********************************/
/******* WORKFLOW MANAGEMENT *******/
/***********************************/
Expand Down
152 changes: 152 additions & 0 deletions dbos/workflows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4610,3 +4610,155 @@ func TestWorkflowHandleContextCancel(t *testing.T) {
"expected error to be detectable as context.Canceled, got: %v", err)
})
}

func TestPatching(t *testing.T) {
step := func(input int) (int, error) {
return input + 1, nil
}

stepPatched := func(input int) (int, error) {
return input + 2, nil
}

wf := func(ctx DBOSContext, input int) (int, error) {
// step < step to patch
RunAsStep(ctx, func(ctx context.Context) (int, error) {
return step(input)
})
// step to patch
res, _ := RunAsStep(ctx, func(ctx context.Context) (int, error) {
return step(input)
}, WithStepName("patch-step"))
// step > step to patch
RunAsStep(ctx, func(ctx context.Context) (int, error) {
return step(input)
})
return res, nil
}

dbosCtx := setupDBOS(t, true, true)
RegisterWorkflow(dbosCtx, wf, WithWorkflowName("wf"))

handle, err := RunWorkflow(dbosCtx, wf, 1)
require.NoError(t, err, "failed to start workflow")
result, err := handle.GetResult()
require.NoError(t, err, "failed to get result")
require.Equal(t, 2, result, "expected result to be 2")

wfPatched := func(ctx DBOSContext, input int) (int, error) {
// step < step to patch
RunAsStep(ctx, func(ctx context.Context) (int, error) {
return step(input)
})

// step to patch
patched, err := Patch(ctx, "my-patch")
if err != nil {
return 0, err
}
var res int
if patched {
res, _ = RunAsStep(ctx, func(ctx context.Context) (int, error) {
return stepPatched(input)
}, WithStepName("patched-step"))
} else {
res, _ = RunAsStep(ctx, func(ctx context.Context) (int, error) {
return step(input)
}, WithStepName("patch-step"))
}

// step > step to patch
RunAsStep(ctx, func(ctx context.Context) (int, error) {
return step(input)
})

return res, nil
}

// (hack) Clear the context registry, reset is_launched, and re-gister the patched wf with the same name
dbosCtx.(*dbosContext).workflowRegistry.Clear()
dbosCtx.(*dbosContext).workflowCustomNametoFQN.Clear()
RegisterWorkflow(dbosCtx, wfPatched, WithWorkflowName("wf"))
dbosCtx.Launch()

// new invocation takes the new code and has the patch step recorded
patchedHandle, err := RunWorkflow(dbosCtx, wfPatched, 1)
require.NoError(t, err, "failed to start workflow")
result, err = patchedHandle.GetResult()
require.NoError(t, err, "failed to get result")
require.Equal(t, 3, result, "expected result to be 3")
steps, err := GetWorkflowSteps(dbosCtx, patchedHandle.GetWorkflowID())
require.NoError(t, err, "failed to get workflow steps")
require.Equal(t, 4, len(steps), "expected 4 steps")
require.Equal(t, "my-patch", steps[1].StepName, "expected step name to be my-patch")

// Fork the workflow at different steps and verify behavior
// Steps 0 and 1 should take the new code (patched), step 2 should take the old code
for startStep := 0; startStep <= 2; startStep++ {
forkHandle, err := ForkWorkflow[int](dbosCtx, ForkWorkflowInput{
OriginalWorkflowID: handle.GetWorkflowID(),
StartStep: uint(startStep),
})
require.NoError(t, err, "failed to fork workflow at step %d", startStep)
result, err := forkHandle.GetResult()
require.NoError(t, err, "failed to get result for fork at step %d", startStep)
steps, err := GetWorkflowSteps(dbosCtx, forkHandle.GetWorkflowID())
require.NoError(t, err, "failed to get workflow steps for fork at step %d", startStep)

if startStep < 2 {
// Forking before step 2 should take the new code
require.Equal(t, 3, result, "expected result to be 3 when forking at step %d", startStep)
require.Equal(t, 4, len(steps), "expected 4 steps when forking at step %d", startStep)
require.Equal(t, "my-patch", steps[1].StepName, "expected step name to be my-patch when forking at step %d", startStep)
} else {
// Forking at step 2 should take the old code
require.Equal(t, 2, result, "expected result to be 2 when forking at step %d", startStep)
require.Equal(t, 3, len(steps), "expected 3 steps when forking at step %d", startStep)
}
}

wfDeprecatePatch := func(ctx DBOSContext, input int) (int, error) {
RunAsStep(ctx, func(ctx context.Context) (int, error) {
return step(input)
})
DeprecatePatch(ctx, "my-patch")
res, err := RunAsStep(ctx, func(ctx context.Context) (int, error) {
return stepPatched(input)
}, WithStepName("patched-step"))
fmt.Println("res", res, "err", err)
RunAsStep(ctx, func(ctx context.Context) (int, error) {
return step(input)
})
return res, nil
}

// (hack) Clear the context registry, reset is_launched, and register the deprecated wf with the same name
dbosCtx.(*dbosContext).workflowRegistry.Clear()
dbosCtx.(*dbosContext).workflowCustomNametoFQN.Clear()
dbosCtx.(*dbosContext).launched.Store(false)
RegisterWorkflow(dbosCtx, wfDeprecatePatch, WithWorkflowName("wf"))

// deprecated invocation skips the patch deprecation entirely
deprecatedHandle, err := RunWorkflow(dbosCtx, wfDeprecatePatch, 1)
require.NoError(t, err, "failed to start workflow")
result, err = deprecatedHandle.GetResult()
require.NoError(t, err, "failed to get result")
require.Equal(t, 3, result, "expected result to be 3")
steps, err = GetWorkflowSteps(dbosCtx, deprecatedHandle.GetWorkflowID())
require.NoError(t, err, "failed to get workflow steps")
require.Equal(t, 3, len(steps), "expected 3 steps")

// Forking an old workflow (patch-time), _after_ the patch step, on the new code should work without underminism error
forkHandle, err := ForkWorkflow[int](dbosCtx, ForkWorkflowInput{
OriginalWorkflowID: patchedHandle.GetWorkflowID(),
StartStep: 3,
})
require.NoError(t, err, "failed to fork workflow")
result, err = forkHandle.GetResult()
require.NoError(t, err, "failed to get result")
require.Equal(t, 3, result, "expected result to be 3")
steps, err = GetWorkflowSteps(dbosCtx, forkHandle.GetWorkflowID())
require.NoError(t, err, "failed to get workflow steps")
require.Equal(t, 4, len(steps), "expected 4 steps")
require.Equal(t, "my-patch", steps[1].StepName, "expected step name to be my-patch")
}