Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* direct: Fix resolving a resource reference that is used more than once within the same field ([#5558](https://github.com/databricks/cli/pull/5558)).
* Bundle variable references now accept Unicode letters in path segments (e.g. `${var.变量}`). ([#5532](https://github.com/databricks/cli/pull/5532))
* Ignore remote changes for vector search direct_access_index_spec.schema_json to prevent drift when the backend normalizes the schema ([#5481](https://github.com/databricks/cli/pull/5481)).
* engine/direct: Fix WAL corruption after two consecutive failed deploys ([#5557](https://github.com/databricks/cli/issues/5557)).

### Dependency updates

Expand Down
14 changes: 14 additions & 0 deletions acceptance/bundle/deploy/wal/two-crashed-deploys/databricks.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
bundle:
name: wal-two-crashed-deploys

resources:
jobs:
test_job:
name: "test-job"
tasks:
- task_key: "test-task"
spark_python_task:
python_file: ./test.py
new_cluster:
spark_version: 15.4.x-scala2.12
node_type_id: i3.xlarge

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions acceptance/bundle/deploy/wal/two-crashed-deploys/output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@

=== First deploy (killed before recording the job, leaves a header-only WAL)
>>> errcode [CLI] bundle deploy
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-two-crashed-deploys/default/files...
Deploying resources...
[PROCESS_KILLED]

Exit code: [KILLED]

>>> cat .databricks/bundle/default/resources.json.wal
{"state_version":2,"cli_version":"[DEV_VERSION]","lineage":"[UUID]","serial":1}

=== Second deploy (killed again, leaves another header-only WAL)
>>> errcode [CLI] bundle deploy --force-lock
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-two-crashed-deploys/default/files...
Deploying resources...
[PROCESS_KILLED]

Exit code: [KILLED]

>>> cat .databricks/bundle/default/resources.json.wal
{"state_version":2,"cli_version":"[DEV_VERSION]","lineage":"[UUID]","serial":1}

=== Third deploy (must recover and succeed, not blocked by the leftover WAL)
>>> errcode [CLI] bundle deploy --force-lock
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-two-crashed-deploys/default/files...
Deploying resources...
Updating deployment state...
Deployment complete!

>>> errcode assert_not_exists.py .databricks/bundle/default/resources.json.wal

>>> errcode cat .databricks/bundle/default/resources.json
{
"serial": 1,
"state_keys": [
"resources.jobs.test_job"
]
}
22 changes: 22 additions & 0 deletions acceptance/bundle/deploy/wal/two-crashed-deploys/script
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Two consecutive deploys are killed mid-apply, after UpgradeToWrite has written
# the WAL header but before Finalize runs (killed on the jobs/create call, before
# the job's state is recorded). A kill cannot be prevented by bailing out early,
# so each crash leaves a header-only WAL behind. Recovery must discard those
# no-op WALs without advancing the serial; otherwise the second crash would write
# its WAL header two serials ahead of the committed state and block every later
# command. Regression test for the dstate recovery fix.
kill_after.py "POST /api/2.2/jobs/create" 0 2

title "First deploy (killed before recording the job, leaves a header-only WAL)"
trace errcode $CLI bundle deploy
trace cat .databricks/bundle/default/resources.json.wal

title "Second deploy (killed again, leaves another header-only WAL)"
trace errcode $CLI bundle deploy --force-lock
trace cat .databricks/bundle/default/resources.json.wal

title "Third deploy (must recover and succeed, not blocked by the leftover WAL)"
trace errcode $CLI bundle deploy --force-lock

trace errcode assert_not_exists.py .databricks/bundle/default/resources.json.wal
trace errcode cat .databricks/bundle/default/resources.json | jq -S '{serial: .serial, state_keys: (.state | keys)}'
1 change: 1 addition & 0 deletions acceptance/bundle/deploy/wal/two-crashed-deploys/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
print("test")
14 changes: 14 additions & 0 deletions acceptance/bundle/deploy/wal/two-failed-deploys/databricks.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
bundle:
name: wal-two-failed-deploys

resources:
jobs:
test_job:
name: "test-job"
tasks:
- task_key: "test-task"
spark_python_task:
python_file: ./test.py
new_cluster:
spark_version: 15.4.x-scala2.12
node_type_id: i3.xlarge
3 changes: 3 additions & 0 deletions acceptance/bundle/deploy/wal/two-failed-deploys/out.test.toml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 48 additions & 0 deletions acceptance/bundle/deploy/wal/two-failed-deploys/output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@

=== Deploy 1 (normal: creates the job and the committed state)
>>> [CLI] bundle deploy
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-two-failed-deploys/default/files...
Deploying resources...
Updating deployment state...
Deployment complete!

=== Deploy 2 (planning fails, must not leave a WAL)
>>> errcode [CLI] bundle deploy
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-two-failed-deploys/default/files...
Error: cannot plan resources.jobs.test_job: reading id="[NUMID]": Fault injected by test. (403 INJECTED)

Endpoint: GET [DATABRICKS_URL]/api/2.2/jobs/get?job_id=[NUMID]
HTTP Status: 403 Forbidden
API error_code: INJECTED
API message: Fault injected by test.

Error: planning failed


Exit code: 1

>>> assert_not_exists.py .databricks/bundle/default/resources.json.wal

=== Deploy 3 (planning fails again, must not leave a WAL)
>>> errcode [CLI] bundle deploy
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-two-failed-deploys/default/files...
Error: cannot plan resources.jobs.test_job: reading id="[NUMID]": Fault injected by test. (403 INJECTED)

Endpoint: GET [DATABRICKS_URL]/api/2.2/jobs/get?job_id=[NUMID]
HTTP Status: 403 Forbidden
API error_code: INJECTED
API message: Fault injected by test.

Error: planning failed


Exit code: 1

>>> assert_not_exists.py .databricks/bundle/default/resources.json.wal

=== Deploy 4 (fault expired: recovers and succeeds)
>>> [CLI] bundle deploy
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-two-failed-deploys/default/files...
Deploying resources...
Updating deployment state...
Deployment complete!
29 changes: 29 additions & 0 deletions acceptance/bundle/deploy/wal/two-failed-deploys/script
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# A failed plan must not leave a write-ahead log behind, so repeated planning
# failures never block a later, healthy deploy. Previously a failed plan still
# opened the WAL for write (UpgradeToWrite) and returned without finalizing,
# leaving a header-only WAL; after two failures the WAL serial drifted two ahead
# of the committed serial and every later command failed WAL recovery until the
# WAL was deleted by hand.
#
# A first deploy creates the job normally. An injected fault then makes the next
# two deploys fail while planning (planning refreshes the existing job via
# jobs/get). The final deploy, with the fault expired, must recover and succeed.
# A non-retried 403 is used so the failure is immediate; a 5xx would be retried
# with backoff.

title "Deploy 1 (normal: creates the job and the committed state)"
trace $CLI bundle deploy

# Fail the plan-stage refresh GET for the next two deploys only.
fault.py "GET /api/2.2/jobs/get" 403 0 2

title "Deploy 2 (planning fails, must not leave a WAL)"
trace errcode $CLI bundle deploy
trace assert_not_exists.py .databricks/bundle/default/resources.json.wal

title "Deploy 3 (planning fails again, must not leave a WAL)"
trace errcode $CLI bundle deploy
trace assert_not_exists.py .databricks/bundle/default/resources.json.wal

title "Deploy 4 (fault expired: recovers and succeeds)"
trace $CLI bundle deploy
1 change: 1 addition & 0 deletions acceptance/bundle/deploy/wal/two-failed-deploys/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
print("test")
17 changes: 15 additions & 2 deletions bundle/direct/dstate/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ func (db *DeploymentState) mergeWalIntoState(ctx context.Context) (bool, error)
scanner.Buffer(make([]byte, 0, initialBufferSize), maxWalEntrySize)
lineNumber := 0
var corruptedLines [][]byte
var newSerial int

for scanner.Scan() {
lineNumber++
Expand All @@ -309,7 +310,7 @@ func (db *DeploymentState) mergeWalIntoState(ctx context.Context) (bool, error)
if header.Serial > expectedSerial {
return false, fmt.Errorf("WAL serial (%d) is ahead of expected (%d), state may be corrupted", header.Serial, expectedSerial)
}
db.Data.Serial = expectedSerial
newSerial = header.Serial
} else {
var entry WALEntry
if err := json.Unmarshal(line, &entry); err != nil {
Expand Down Expand Up @@ -344,7 +345,19 @@ func (db *DeploymentState) mergeWalIntoState(ctx context.Context) (bool, error)
}
}

return lineNumber > 1, nil
hasEntries := lineNumber > 1

// Only advance the serial when the WAL carried entries, because the caller
// (replayWAL) persists the new state file only in that case. A header-only
// WAL is a deploy that started but committed nothing; advancing the serial
// for it leaves the in-memory serial ahead of the persisted one, so the
// next deploy writes its WAL header at serial+2 and recovery rejects it as
// "ahead of expected". See acceptance/bundle/deploy/wal/two-crashed-deploys.
if hasEntries {
db.Data.Serial = newSerial
}

return hasEntries, nil
}

// Finalize replays the WAL (if open for write), captures the resulting state, and resets.
Expand Down
35 changes: 35 additions & 0 deletions bundle/direct/dstate/state_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dstate

import (
"encoding/json"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -55,6 +56,40 @@ func TestPanicOnDoubleOpen(t *testing.T) {
mustFinalize(t, &db)
}

func TestHeaderOnlyWALRecoveryDoesNotAdvanceSerial(t *testing.T) {
path := filepath.Join(t.TempDir(), "state.json")
walPath := path + walSuffix

// Commit serial 1 with one resource.
var db DeploymentState
require.NoError(t, db.Open(t.Context(), path, WithRecovery(true), WithWrite(true)))
require.NoError(t, db.SaveState("jobs.my_job", "123", map[string]string{}, nil))
mustFinalize(t, &db)

var committed DeploymentState
require.NoError(t, committed.Open(t.Context(), path, WithRecovery(false), WithWrite(false)))
lineage := committed.Data.Lineage
require.Equal(t, 1, committed.Data.Serial)
mustFinalize(t, &committed)

// A deploy that opens the WAL for write but commits nothing (e.g. planning
// fails after UpgradeToWrite) leaves a header-only WAL behind, here at the
// expected serial 2. Recovering it must not advance the serial past the
// committed 1, otherwise a second such failed deploy would write its header
// at serial 3 and be rejected as ahead of the committed state.
header := Header{Lineage: lineage, Serial: 2, StateVersion: currentStateVersion}
headerLine, err := json.Marshal(header)
require.NoError(t, err)
require.NoError(t, os.WriteFile(walPath, append(headerLine, '\n'), 0o600))

var recovered DeploymentState
require.NoError(t, recovered.Open(t.Context(), path, WithRecovery(true), WithWrite(false)))
assert.Equal(t, 1, recovered.Data.Serial)
assert.Equal(t, "123", recovered.GetResourceID("jobs.my_job"))
assert.NoFileExists(t, walPath)
mustFinalize(t, &recovered)
}

func TestDeleteState(t *testing.T) {
path := filepath.Join(t.TempDir(), "state.json")

Expand Down
10 changes: 10 additions & 0 deletions bundle/phases/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,13 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand
plan = RunPlan(ctx, b, engine)
}

// Stop before opening the WAL for write if planning failed. UpgradeToWrite
// writes a WAL header that only deployCore's Finalize commits or discards;
// returning past it without finalizing leaves a header-only WAL behind.
if logdiag.HasError(ctx) {
return
}

if engine.IsDirect() {
// Upgrade from read (opened by process.go) to write mode
if err := b.DeploymentBundle.StateDB.UpgradeToWrite(); err != nil {
Expand All @@ -187,6 +194,9 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand
}
}

// InitForApply receives ctx and could log a diagnostic without returning an
// error, so re-check before deploying. (UpgradeToWrite above takes no ctx and
// thus cannot log, so the earlier check is enough to guard the WAL open.)
if logdiag.HasError(ctx) {
return
}
Expand Down
Loading