Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 94 additions & 12 deletions pkg/clip/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"golang.org/x/sys/unix"

common "github.com/beam-cloud/clip/pkg/common"
"github.com/beam-cloud/clip/pkg/storage"

"github.com/karrick/godirwalk"
"github.com/tidwall/btree"
Expand All @@ -41,11 +42,12 @@ func init() {
}

type ClipArchiverOptions struct {
Compress bool
ArchivePath string
SourcePath string
OutputFile string
OutputPath string
Compress bool
ArchivePath string
SourcePath string
OutputFile string
OutputPath string
ContentCache storage.ContentCache
}

type ClipArchiver struct {
Expand Down Expand Up @@ -651,13 +653,8 @@ func (ca *ClipArchiver) processNode(node *common.ClipNode, writer *bufio.Writer,
// Update data position
node.DataPos = *pos

// Create a multi-writer that writes to both the checksum and the writer
multi := io.MultiWriter(hash, writer)

// Use io.Copy to simultaneously write the file to the output and update the checksum
copied, err := io.Copy(multi, f)
if err != nil {
log.Error().Msgf("error copying file %s: %v", node.Path, err)
copied, ok := ca.copyNodeContent(node, f, writer, hash, opts)
if !ok {
return false
}

Expand All @@ -681,6 +678,91 @@ func (ca *ClipArchiver) processNode(node *common.ClipNode, writer *bufio.Writer,
return true
}

func (ca *ClipArchiver) copyNodeContent(node *common.ClipNode, f *os.File, writer *bufio.Writer, hash io.Writer, opts ClipArchiverOptions) (int64, bool) {
if opts.ContentCache == nil || node.ContentHash == "" {
multi := io.MultiWriter(hash, writer)
copied, err := io.Copy(multi, f)
if err != nil {
log.Error().Msgf("error copying file %s: %v", node.Path, err)
return copied, false
}
return copied, true
}

chunks := make(chan []byte, 2)
type storeResult struct {
hash string
err error
}
storeDone := make(chan storeResult, 1)
go func() {
actualHash, err := opts.ContentCache.StoreContent(chunks, node.ContentHash, struct{ RoutingKey string }{RoutingKey: node.ContentHash})
storeDone <- storeResult{hash: actualHash, err: err}
}()

buf := make([]byte, indexedLayerContentCacheChunkSize)
var copied int64
var copyOK bool
var result *storeResult
for {
n, readErr := f.Read(buf)
if n > 0 {
data := buf[:n]
if _, err := hash.Write(data); err != nil {
log.Error().Err(err).Str("path", node.Path).Msg("error hashing file while archiving")
break
}
if _, err := writer.Write(data); err != nil {
log.Error().Err(err).Str("path", node.Path).Msg("error writing file to archive")
break
}
chunk := make([]byte, n)
copy(chunk, data)
if chunks != nil {
select {
case chunks <- chunk:
case r := <-storeDone:
result = &r
chunks = nil
log.Warn().
Err(r.err).
Str("path", node.Path).
Str("hash", node.ContentHash).
Str("actual_hash", r.hash).
Msg("file content cache store ended before archive copy completed")
}
}
copied += int64(n)
}
if readErr == io.EOF {
copyOK = true
break
}
if readErr != nil {
log.Error().Err(readErr).Str("path", node.Path).Msg("error reading file while archiving")
break
}
}

if chunks != nil {
close(chunks)
}
if result == nil && chunks != nil {
r := <-storeDone
result = &r
}
if result != nil && (result.err != nil || result.hash != node.ContentHash) {
log.Warn().
Err(result.err).
Str("path", node.Path).
Str("hash", node.ContentHash).
Str("actual_hash", result.hash).
Msg("error warming file content cache")
}

return copied, copyOK
}

func (ca *ClipArchiver) EncodeHeader(header *common.ClipArchiveHeader) ([]byte, error) {
buf := new(bytes.Buffer)
if err := binary.Write(buf, binary.LittleEndian, header); err != nil {
Expand Down
40 changes: 27 additions & 13 deletions pkg/clip/clip.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type CreateOptions struct {
OutputPath string
Credentials storage.ClipStorageCredentials
ProgressChan chan<- int
ContentCache storage.ContentCache
}

type CreateRemoteOptions struct {
Expand All @@ -67,6 +68,7 @@ type MountOptions struct {
Credentials storage.ClipStorageCredentials
UseCheckpoints bool // Enable checkpoint-based partial decompression for OCI layers
RegistryCredProvider interface{} // Registry authentication (for OCI archives)
ReadTraceObserver common.ReadTraceObserver
}

type StoreS3Options struct {
Expand All @@ -85,8 +87,9 @@ func CreateArchive(options CreateOptions) error {

a := NewClipArchiver()
err := a.Create(ClipArchiverOptions{
SourcePath: options.InputPath,
OutputFile: options.OutputPath,
SourcePath: options.InputPath,
OutputFile: options.OutputPath,
ContentCache: options.ContentCache,
})
if err != nil {
return err
Expand All @@ -108,8 +111,9 @@ func CreateAndUploadArchive(ctx context.Context, options CreateOptions, si commo

localArchiver := NewClipArchiver()
err = localArchiver.Create(ClipArchiverOptions{
SourcePath: options.InputPath,
OutputFile: tempFile.Name(),
SourcePath: options.InputPath,
OutputFile: tempFile.Name(),
ContentCache: options.ContentCache,
})
if err != nil {
return err
Expand Down Expand Up @@ -184,12 +188,17 @@ func MountArchive(options MountOptions) (func() error, <-chan error, *fuse.Serve
UseCheckpoints: options.UseCheckpoints,
ContentCacheAvailable: options.ContentCacheAvailable,
RegistryCredProvider: options.RegistryCredProvider,
ReadTraceObserver: options.ReadTraceObserver,
})
if err != nil {
return nil, nil, nil, fmt.Errorf("could not load storage: %v", err)
}

clipfs, err := NewFileSystem(storage, ClipFileSystemOpts{ContentCache: options.ContentCache, ContentCacheAvailable: options.ContentCacheAvailable})
clipfs, err := NewFileSystem(storage, ClipFileSystemOpts{
ContentCache: options.ContentCache,
ContentCacheAvailable: options.ContentCacheAvailable,
ReadTraceObserver: options.ReadTraceObserver,
})
if err != nil {
return nil, nil, nil, fmt.Errorf("could not create filesystem: %v", err)
}
Expand All @@ -207,7 +216,8 @@ func MountArchive(options MountOptions) (func() error, <-chan error, *fuse.Serve
EnableSymlinkCaching: true,
SyncRead: false,
RememberInodes: true,
MaxReadAhead: 1024 * 128, // 128KB
MaxWrite: 1024 * 1024,
MaxReadAhead: 1024 * 1024,
})
if err != nil {
return nil, nil, nil, fmt.Errorf("could not create server: %v", err)
Expand Down Expand Up @@ -263,13 +273,15 @@ func StoreS3(storeS3Opts StoreS3Options) error {

// CreateFromOCIImageOptions configures OCI image indexing
type CreateFromOCIImageOptions struct {
ImageRef string // Source image to index (can be local)
StorageImageRef string // Optional: image reference to store in metadata (defaults to ImageRef)
OutputPath string
CheckpointMiB int64
CredProvider interface{}
ProgressChan chan<- OCIIndexProgress // optional channel for progress updates
Platform *v1.Platform // Target platform (defaults to linux/runtime.GOARCH)
ImageRef string // Source image to index (can be local)
StorageImageRef string // Optional: image reference to store in metadata (defaults to ImageRef)
OutputPath string // Path for the metadata-only .clip archive
CheckpointMiB int64 // Gzip checkpoint interval
CredProvider interface{} // Optional registry credential provider
ProgressChan chan<- OCIIndexProgress
Platform *v1.Platform
ContentCache storage.ContentCache // Optional cache to warm with decompressed layer streams
ContentCacheDir string // Optional temp directory for layer cache upload spooling
}

// CreateFromOCIImage creates a metadata-only index (.clip) file from an OCI image
Expand Down Expand Up @@ -300,6 +312,8 @@ func CreateFromOCIImage(ctx context.Context, options CreateFromOCIImageOptions)
CredProvider: credProvider,
ProgressChan: options.ProgressChan,
Platform: options.Platform,
ContentCache: options.ContentCache,
ContentCacheDir: options.ContentCacheDir,
}, options.OutputPath)

if err != nil {
Expand Down
15 changes: 9 additions & 6 deletions pkg/clip/clipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type ClipFileSystemOpts struct {
Verbose bool
ContentCache storage.ContentCache
ContentCacheAvailable bool
ReadTraceObserver common.ReadTraceObserver
}

type ClipFileSystem struct {
Expand All @@ -23,6 +24,7 @@ type ClipFileSystem struct {
lookupCache map[string]*lookupCacheEntry
contentCache storage.ContentCache
contentCacheAvailable bool
readTraceObserver common.ReadTraceObserver
cacheMutex sync.RWMutex
cachingStatus map[string]bool
cacheEventChan chan cacheEvent
Expand All @@ -46,6 +48,7 @@ func NewFileSystem(s storage.ClipStorageInterface, opts ClipFileSystemOpts) (*Cl
cacheEventChan: make(chan cacheEvent, 10000),
cachingStatus: make(map[string]bool),
contentCacheAvailable: opts.ContentCacheAvailable,
readTraceObserver: opts.ReadTraceObserver,
}

metadata := s.Metadata()
Expand Down Expand Up @@ -113,12 +116,12 @@ func (cfs *ClipFileSystem) processCacheEvents() {
chunkSize = clipNode.DataLen - offset
}

fileContent := make([]byte, chunkSize) // Create a new buffer for each chunk
nRead, err := cfs.storage.ReadFile(clipNode, fileContent, offset)
if err != nil {
log.Error().Err(err).Str("path", clipNode.Path).Msg("error reading file for caching")
break
}
fileContent := make([]byte, chunkSize) // Create a new buffer for each chunk
nRead, err := cfs.storage.ReadFile(clipNode, fileContent, offset)
if err != nil {
log.Error().Err(err).Str("path", clipNode.Path).Msg("error reading file for caching")
break
}

chunks <- fileContent[:nRead]
fileContent = nil
Expand Down
Loading
Loading