Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5896ae4
Fs cp now upload in parallel
renaudhartert-db Dec 11, 2025
b933398
Improve performance
renaudhartert-db Dec 11, 2025
2fa03e1
Use error value
renaudhartert-db Dec 11, 2025
d393388
Cancel go routines
renaudhartert-db Dec 11, 2025
14cfad9
Properly propagate context
renaudhartert-db Dec 11, 2025
2a9ec1b
Linter
renaudhartert-db Dec 11, 2025
179a944
Test context cancellation
renaudhartert-db Dec 12, 2025
79dd600
Clarified rationale
renaudhartert-db Dec 12, 2025
2d1f19d
Gracefully wait for cancellation to complete
renaudhartert-db Dec 12, 2025
caaff68
Merge branch 'main' into renaud-hartert_data/fs-cp-fast
renaudhartert-db Dec 13, 2025
12dec5c
Add acceptance tests for fs cp
renaudhartert-db Dec 15, 2025
2be2174
Merge branch 'renaud-hartert_data/fs-cp-fast' of github.com-ghec:data…
renaudhartert-db Dec 15, 2025
7db079d
Merge branch 'main' into renaud-hartert_data/fs-cp-fast
renaudhartert-db Dec 16, 2025
e720670
Merge branch 'main' into renaud-hartert_data/fs-cp-fast
renaudhartert-db Dec 16, 2025
4029571
Merge branch 'main' into renaud-hartert_data/fs-cp-fast
renaudhartert-db Dec 29, 2025
6a8077b
Address feedback
renaudhartert-db Jan 8, 2026
1b0ecb2
Merge branch 'renaud-hartert_data/fs-cp-fast' of github.com-ghec:data…
renaudhartert-db Jan 8, 2026
8d0ede6
Enable cloud testing
renaudhartert-db Jan 9, 2026
4bbbe57
Fix whitespace: add trailing newlines to acceptance test files
renaudhartert-db Jan 9, 2026
1f331ff
Fix acceptance test output formatting
renaudhartert-db Jan 9, 2026
f8bf48a
Fix test server HEAD handler for volumes with file extension heuristic
renaudhartert-db Jan 9, 2026
22da2b3
Improve documentation for test server file extension heuristic
renaudhartert-db Jan 9, 2026
674fa2e
Improve test server HEAD handler with volume-specific heuristic
renaudhartert-db Jan 9, 2026
fdfc0e1
Improve test server HEAD handler for Unity Catalog Volumes
renaudhartert-db Jan 9, 2026
2dc735b
Merge remote changes, keep improved HEAD handler logic
renaudhartert-db Jan 9, 2026
ff6c248
Merge branch 'main' into renaud-hartert_data/fs-cp-fast
renaudhartert-db Jan 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ To disable this, set the environment variable DATABRICKS_CACHE_ENABLED to false.

### CLI

* Improve performance of `databricks fs cp` command by parallelizing file uploads when
copying directories with the `--recursive` flag.

### Bundles
* Enable caching user identity by default ([#4202](https://github.com/databricks/cli/pull/4202))

Expand Down
1 change: 1 addition & 0 deletions acceptance/cmd/fs/cp/dir-to-dir/localdir/file1.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
file1 content
1 change: 1 addition & 0 deletions acceptance/cmd/fs/cp/dir-to-dir/localdir/file2.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
file2 content
5 changes: 5 additions & 0 deletions acceptance/cmd/fs/cp/dir-to-dir/out.test.toml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions acceptance/cmd/fs/cp/dir-to-dir/output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@

>>> [CLI] fs cp -r localdir dbfs:/Volumes/main/default/data/uploaded-dir
localdir/file1.txt -> dbfs:/Volumes/main/default/data/uploaded-dir/file1.txt
localdir/file2.txt -> dbfs:/Volumes/main/default/data/uploaded-dir/file2.txt

>>> [CLI] fs cat dbfs:/Volumes/main/default/data/uploaded-dir/file1.txt
file1 content

>>> [CLI] fs cat dbfs:/Volumes/main/default/data/uploaded-dir/file2.txt
file2 content
9 changes: 9 additions & 0 deletions acceptance/cmd/fs/cp/dir-to-dir/script
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Create parent directory.
$CLI fs mkdir dbfs:/Volumes/main/default/data

# Recursive directory copy (output sorted for deterministic ordering).
trace $CLI fs cp -r localdir dbfs:/Volumes/main/default/data/uploaded-dir 2>&1 | sort

# Verify files were uploaded correctly.
trace $CLI fs cat dbfs:/Volumes/main/default/data/uploaded-dir/file1.txt
trace $CLI fs cat dbfs:/Volumes/main/default/data/uploaded-dir/file2.txt
2 changes: 2 additions & 0 deletions acceptance/cmd/fs/cp/dir-to-dir/test.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Local = true
Cloud = true
1 change: 1 addition & 0 deletions acceptance/cmd/fs/cp/file-to-dir/local.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
hello world!
5 changes: 5 additions & 0 deletions acceptance/cmd/fs/cp/file-to-dir/out.test.toml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions acceptance/cmd/fs/cp/file-to-dir/output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

>>> [CLI] fs cp local.txt dbfs:/Volumes/main/default/data/mydir/
local.txt -> dbfs:/Volumes/main/default/data/mydir/local.txt

>>> [CLI] fs cat dbfs:/Volumes/main/default/data/mydir/local.txt
hello world!
8 changes: 8 additions & 0 deletions acceptance/cmd/fs/cp/file-to-dir/script
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Create target directory.
$CLI fs mkdir dbfs:/Volumes/main/default/data/mydir

# Copy file into a directory (trailing slash indicates directory target).
trace $CLI fs cp local.txt dbfs:/Volumes/main/default/data/mydir/

# Verify file was uploaded correctly.
trace $CLI fs cat dbfs:/Volumes/main/default/data/mydir/local.txt
2 changes: 2 additions & 0 deletions acceptance/cmd/fs/cp/file-to-dir/test.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Local = true
Cloud = true
1 change: 1 addition & 0 deletions acceptance/cmd/fs/cp/file-to-file/local.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
hello world!
5 changes: 5 additions & 0 deletions acceptance/cmd/fs/cp/file-to-file/out.test.toml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions acceptance/cmd/fs/cp/file-to-file/output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@

>>> [CLI] fs cp local.txt dbfs:/Volumes/main/default/data/uploaded.txt
local.txt -> dbfs:/Volumes/main/default/data/uploaded.txt

>>> [CLI] fs cat dbfs:/Volumes/main/default/data/uploaded.txt
hello world!

>>> [CLI] fs cp dbfs:/Volumes/main/default/data/uploaded.txt downloaded.txt
dbfs:/Volumes/main/default/data/uploaded.txt -> downloaded.txt

>>> cat downloaded.txt
hello world!
15 changes: 15 additions & 0 deletions acceptance/cmd/fs/cp/file-to-file/script
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Create parent directory.
$CLI fs mkdir dbfs:/Volumes/main/default/data

# Upload local file to volume.
trace $CLI fs cp local.txt dbfs:/Volumes/main/default/data/uploaded.txt

# Verify file was uploaded correctly.
trace $CLI fs cat dbfs:/Volumes/main/default/data/uploaded.txt

# Download the same file back to verify round-trip.
trace $CLI fs cp dbfs:/Volumes/main/default/data/uploaded.txt downloaded.txt

# Verify downloaded content matches original.
trace cat downloaded.txt
rm downloaded.txt
2 changes: 2 additions & 0 deletions acceptance/cmd/fs/cp/file-to-file/test.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Local = true
Cloud = true
5 changes: 5 additions & 0 deletions acceptance/cmd/fs/cp/input-validation/out.test.toml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions acceptance/cmd/fs/cp/input-validation/output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@

>>> errcode [CLI] fs cp src dst --concurrency -1
Error: --concurrency must be at least 1

Exit code: 1

>>> errcode [CLI] fs cp src dst --concurrency 0
Error: --concurrency must be at least 1

Exit code: 1
3 changes: 3 additions & 0 deletions acceptance/cmd/fs/cp/input-validation/script
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Invalid concurrency values should fail.
trace errcode $CLI fs cp src dst --concurrency -1
trace errcode $CLI fs cp src dst --concurrency 0
2 changes: 2 additions & 0 deletions acceptance/cmd/fs/cp/input-validation/test.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Local = true
Cloud = false
131 changes: 94 additions & 37 deletions cmd/fs/cp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,94 +9,141 @@ import (
"path"
"path/filepath"
"strings"
"sync"

"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/filer"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
)

// Default number of concurrent file copy operations. This is a conservative
// default that should be sufficient to fully utilize the available bandwidth
// in most cases.
const defaultConcurrency = 8

// errInvalidConcurrency is returned when the value of the concurrency
// flag is invalid.
var errInvalidConcurrency = errors.New("--concurrency must be at least 1")

type copy struct {
overwrite bool
recursive bool
overwrite bool
recursive bool
concurrency int

ctx context.Context
sourceFiler filer.Filer
targetFiler filer.Filer
sourceScheme string
targetScheme string

mu sync.Mutex // protect output from concurrent writes
}

func (c *copy) cpWriteCallback(sourceDir, targetDir string) fs.WalkDirFunc {
return func(sourcePath string, d fs.DirEntry, err error) error {
// cpDirToDir recursively copies the content of a directory to another
// directory.
//
// There is no guarantee on the order in which the files are copied.
//
// The method does not take care of retrying on error; this is considered to
// be the responsibility of the Filer implementation. If a file copy fails,
// the error is returned and the other copies are cancelled.
func (c *copy) cpDirToDir(ctx context.Context, sourceDir, targetDir string) error {
if !c.recursive {
return fmt.Errorf("source path %s is a directory. Please specify the --recursive flag", sourceDir)
}

// Create a cancellable context purely for the purpose of having a way to
// cancel the goroutines in case of error walking the directory.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Pool of workers to process copy operations in parallel. The created
// context is the real context for this operation. It is shared by the
// walking function and the goroutines and can be cancelled manually
// by calling the cancel() function of its parent context.
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(c.concurrency)

// Walk the source directory, queueing file copy operations for processing.
sourceFs := filer.NewFS(ctx, c.sourceFiler)
err := fs.WalkDir(sourceFs, sourceDir, func(sourcePath string, d fs.DirEntry, err error) error {
if err != nil {
return err
}

// Compute path relative to the target directory
// Compute path relative to the source directory.
relPath, err := filepath.Rel(sourceDir, sourcePath)
if err != nil {
return err
}
relPath = filepath.ToSlash(relPath)

// Compute target path for the file
// Compute target path for the file.
targetPath := path.Join(targetDir, relPath)

// create directory and return early
// Create the directory synchronously. This must happen before files
// are copied into it, and WalkDir guarantees directories are visited
// before their contents.
if d.IsDir() {
return c.targetFiler.Mkdir(c.ctx, targetPath)
return c.targetFiler.Mkdir(ctx, targetPath)
}

return c.cpFileToFile(sourcePath, targetPath)
}
}

func (c *copy) cpDirToDir(sourceDir, targetDir string) error {
if !c.recursive {
return fmt.Errorf("source path %s is a directory. Please specify the --recursive flag", sourceDir)
g.Go(func() error {
// Goroutines are queued and may start after the context is already
// cancelled (e.g. a prior copy failed). This check aims to avoid
// starting work that will inevitably fail.
if ctx.Err() != nil {
return ctx.Err()
}
return c.cpFileToFile(ctx, sourcePath, targetPath)
})
return nil
})
if err != nil {
cancel() // cancel the goroutines
_ = g.Wait() // wait for the goroutines to finish
return err // return the "real" error that led to cancellation
}

sourceFs := filer.NewFS(c.ctx, c.sourceFiler)
return fs.WalkDir(sourceFs, sourceDir, c.cpWriteCallback(sourceDir, targetDir))
return g.Wait()
}

func (c *copy) cpFileToDir(sourcePath, targetDir string) error {
func (c *copy) cpFileToDir(ctx context.Context, sourcePath, targetDir string) error {
fileName := filepath.Base(sourcePath)
targetPath := path.Join(targetDir, fileName)

return c.cpFileToFile(sourcePath, targetPath)
return c.cpFileToFile(ctx, sourcePath, targetPath)
}

func (c *copy) cpFileToFile(sourcePath, targetPath string) error {
func (c *copy) cpFileToFile(ctx context.Context, sourcePath, targetPath string) error {
// Get reader for file at source path
r, err := c.sourceFiler.Read(c.ctx, sourcePath)
r, err := c.sourceFiler.Read(ctx, sourcePath)
if err != nil {
return err
}
defer r.Close()

if c.overwrite {
err = c.targetFiler.Write(c.ctx, targetPath, r, filer.OverwriteIfExists)
err = c.targetFiler.Write(ctx, targetPath, r, filer.OverwriteIfExists)
if err != nil {
return err
}
} else {
err = c.targetFiler.Write(c.ctx, targetPath, r)
err = c.targetFiler.Write(ctx, targetPath, r)
// skip if file already exists
if err != nil && errors.Is(err, fs.ErrExist) {
return c.emitFileSkippedEvent(sourcePath, targetPath)
return c.emitFileSkippedEvent(ctx, sourcePath, targetPath)
}
if err != nil {
return err
}
}
return c.emitFileCopiedEvent(sourcePath, targetPath)
return c.emitFileCopiedEvent(ctx, sourcePath, targetPath)
}

// TODO: emit these events on stderr
// TODO: add integration tests for these events
func (c *copy) emitFileSkippedEvent(sourcePath, targetPath string) error {
func (c *copy) emitFileSkippedEvent(ctx context.Context, sourcePath, targetPath string) error {
fullSourcePath := sourcePath
if c.sourceScheme != "" {
fullSourcePath = path.Join(c.sourceScheme+":", sourcePath)
Expand All @@ -109,10 +156,12 @@ func (c *copy) emitFileSkippedEvent(sourcePath, targetPath string) error {
event := newFileSkippedEvent(fullSourcePath, fullTargetPath)
template := "{{.SourcePath}} -> {{.TargetPath}} (skipped; already exists)\n"

return cmdio.RenderWithTemplate(c.ctx, event, "", template)
c.mu.Lock()
defer c.mu.Unlock()
return cmdio.RenderWithTemplate(ctx, event, "", template)
}

func (c *copy) emitFileCopiedEvent(sourcePath, targetPath string) error {
func (c *copy) emitFileCopiedEvent(ctx context.Context, sourcePath, targetPath string) error {
fullSourcePath := sourcePath
if c.sourceScheme != "" {
fullSourcePath = path.Join(c.sourceScheme+":", sourcePath)
Expand All @@ -125,7 +174,9 @@ func (c *copy) emitFileCopiedEvent(sourcePath, targetPath string) error {
event := newFileCopiedEvent(fullSourcePath, fullTargetPath)
template := "{{.SourcePath}} -> {{.TargetPath}}\n"

return cmdio.RenderWithTemplate(c.ctx, event, "", template)
c.mu.Lock()
defer c.mu.Unlock()
return cmdio.RenderWithTemplate(ctx, event, "", template)
}

// hasTrailingDirSeparator checks if a path ends with a directory separator.
Expand Down Expand Up @@ -153,13 +204,20 @@ func newCpCommand() *cobra.Command {
When copying a file, if TARGET_PATH is a directory, the file will be created
inside the directory, otherwise the file is created at TARGET_PATH.
`,
Args: root.ExactArgs(2),
PreRunE: root.MustWorkspaceClient,
Args: root.ExactArgs(2),
}

var c copy
cmd.Flags().BoolVar(&c.overwrite, "overwrite", false, "overwrite existing files")
cmd.Flags().BoolVarP(&c.recursive, "recursive", "r", false, "recursively copy files from directory")
cmd.Flags().IntVar(&c.concurrency, "concurrency", defaultConcurrency, "number of parallel copy operations")

cmd.PreRunE = func(cmd *cobra.Command, args []string) error {
if c.concurrency <= 0 {
return errInvalidConcurrency
}
return root.MustWorkspaceClient(cmd, args)
}

cmd.RunE = func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
Expand Down Expand Up @@ -187,7 +245,6 @@ func newCpCommand() *cobra.Command {
c.targetScheme = "dbfs"
}

c.ctx = ctx
c.sourceFiler = sourceFiler
c.targetFiler = targetFiler

Expand All @@ -199,7 +256,7 @@ func newCpCommand() *cobra.Command {

// case 1: source path is a directory, then recursively create files at target path
if sourceInfo.IsDir() {
return c.cpDirToDir(sourcePath, targetPath)
return c.cpDirToDir(ctx, sourcePath, targetPath)
}

// If target path has a trailing separator, trim it and let case 2 handle it
Expand All @@ -210,11 +267,11 @@ func newCpCommand() *cobra.Command {
// case 2: source path is a file, and target path is a directory. In this case
// we copy the file to inside the directory
if targetInfo, err := targetFiler.Stat(ctx, targetPath); err == nil && targetInfo.IsDir() {
return c.cpFileToDir(sourcePath, targetPath)
return c.cpFileToDir(ctx, sourcePath, targetPath)
}

// case 3: source path is a file, and target path is a file
return c.cpFileToFile(sourcePath, targetPath)
return c.cpFileToFile(ctx, sourcePath, targetPath)
}

v := newValidArgs()
Expand Down
Loading