From a240b2dfd71750518a2766a71cb755b1ebfbbf7e Mon Sep 17 00:00:00 2001 From: Luke Lombardi Date: Sun, 24 May 2026 09:51:46 -0400 Subject: [PATCH] add some more instrumentation --- pkg/clip/archive.go | 106 ++++++++++-- pkg/clip/clip.go | 40 +++-- pkg/clip/clipfs.go | 15 +- pkg/clip/fsnode.go | 346 ++++++++++++++++++++++++++++++++++++---- pkg/clip/oci_indexer.go | 191 +++++++++++++++++++++- pkg/common/readtrace.go | 45 ++++++ pkg/storage/local.go | 22 +++ pkg/storage/oci.go | 260 ++++++++++++++++++++++++++++-- pkg/storage/oci_test.go | 137 ++++++++++++++-- pkg/storage/s3.go | 21 +++ pkg/storage/storage.go | 38 +++++ 11 files changed, 1132 insertions(+), 89 deletions(-) create mode 100644 pkg/common/readtrace.go diff --git a/pkg/clip/archive.go b/pkg/clip/archive.go index 948e754..a211271 100644 --- a/pkg/clip/archive.go +++ b/pkg/clip/archive.go @@ -23,6 +23,7 @@ import ( "golang.org/x/sys/unix" common "github.com/beam-cloud/clip/pkg/common" + "github.com/beam-cloud/clip/pkg/storage" "github.com/karrick/godirwalk" "github.com/tidwall/btree" @@ -41,11 +42,12 @@ func init() { } type ClipArchiverOptions struct { - Compress bool - ArchivePath string - SourcePath string - OutputFile string - OutputPath string + Compress bool + ArchivePath string + SourcePath string + OutputFile string + OutputPath string + ContentCache storage.ContentCache } type ClipArchiver struct { @@ -651,13 +653,8 @@ func (ca *ClipArchiver) processNode(node *common.ClipNode, writer *bufio.Writer, // Update data position node.DataPos = *pos - // Create a multi-writer that writes to both the checksum and the writer - multi := io.MultiWriter(hash, writer) - - // Use io.Copy to simultaneously write the file to the output and update the checksum - copied, err := io.Copy(multi, f) - if err != nil { - log.Error().Msgf("error copying file %s: %v", node.Path, err) + copied, ok := ca.copyNodeContent(node, f, writer, hash, opts) + if !ok { return false } @@ -681,6 +678,91 @@ func (ca *ClipArchiver) processNode(node *common.ClipNode, writer *bufio.Writer, return true } +func (ca *ClipArchiver) copyNodeContent(node *common.ClipNode, f *os.File, writer *bufio.Writer, hash io.Writer, opts ClipArchiverOptions) (int64, bool) { + if opts.ContentCache == nil || node.ContentHash == "" { + multi := io.MultiWriter(hash, writer) + copied, err := io.Copy(multi, f) + if err != nil { + log.Error().Msgf("error copying file %s: %v", node.Path, err) + return copied, false + } + return copied, true + } + + chunks := make(chan []byte, 2) + type storeResult struct { + hash string + err error + } + storeDone := make(chan storeResult, 1) + go func() { + actualHash, err := opts.ContentCache.StoreContent(chunks, node.ContentHash, struct{ RoutingKey string }{RoutingKey: node.ContentHash}) + storeDone <- storeResult{hash: actualHash, err: err} + }() + + buf := make([]byte, indexedLayerContentCacheChunkSize) + var copied int64 + var copyOK bool + var result *storeResult + for { + n, readErr := f.Read(buf) + if n > 0 { + data := buf[:n] + if _, err := hash.Write(data); err != nil { + log.Error().Err(err).Str("path", node.Path).Msg("error hashing file while archiving") + break + } + if _, err := writer.Write(data); err != nil { + log.Error().Err(err).Str("path", node.Path).Msg("error writing file to archive") + break + } + chunk := make([]byte, n) + copy(chunk, data) + if chunks != nil { + select { + case chunks <- chunk: + case r := <-storeDone: + result = &r + chunks = nil + log.Warn(). + Err(r.err). + Str("path", node.Path). + Str("hash", node.ContentHash). + Str("actual_hash", r.hash). + Msg("file content cache store ended before archive copy completed") + } + } + copied += int64(n) + } + if readErr == io.EOF { + copyOK = true + break + } + if readErr != nil { + log.Error().Err(readErr).Str("path", node.Path).Msg("error reading file while archiving") + break + } + } + + if chunks != nil { + close(chunks) + } + if result == nil && chunks != nil { + r := <-storeDone + result = &r + } + if result != nil && (result.err != nil || result.hash != node.ContentHash) { + log.Warn(). + Err(result.err). + Str("path", node.Path). + Str("hash", node.ContentHash). + Str("actual_hash", result.hash). + Msg("error warming file content cache") + } + + return copied, copyOK +} + func (ca *ClipArchiver) EncodeHeader(header *common.ClipArchiveHeader) ([]byte, error) { buf := new(bytes.Buffer) if err := binary.Write(buf, binary.LittleEndian, header); err != nil { diff --git a/pkg/clip/clip.go b/pkg/clip/clip.go index aa90db1..2d27d0b 100644 --- a/pkg/clip/clip.go +++ b/pkg/clip/clip.go @@ -45,6 +45,7 @@ type CreateOptions struct { OutputPath string Credentials storage.ClipStorageCredentials ProgressChan chan<- int + ContentCache storage.ContentCache } type CreateRemoteOptions struct { @@ -67,6 +68,7 @@ type MountOptions struct { Credentials storage.ClipStorageCredentials UseCheckpoints bool // Enable checkpoint-based partial decompression for OCI layers RegistryCredProvider interface{} // Registry authentication (for OCI archives) + ReadTraceObserver common.ReadTraceObserver } type StoreS3Options struct { @@ -85,8 +87,9 @@ func CreateArchive(options CreateOptions) error { a := NewClipArchiver() err := a.Create(ClipArchiverOptions{ - SourcePath: options.InputPath, - OutputFile: options.OutputPath, + SourcePath: options.InputPath, + OutputFile: options.OutputPath, + ContentCache: options.ContentCache, }) if err != nil { return err @@ -108,8 +111,9 @@ func CreateAndUploadArchive(ctx context.Context, options CreateOptions, si commo localArchiver := NewClipArchiver() err = localArchiver.Create(ClipArchiverOptions{ - SourcePath: options.InputPath, - OutputFile: tempFile.Name(), + SourcePath: options.InputPath, + OutputFile: tempFile.Name(), + ContentCache: options.ContentCache, }) if err != nil { return err @@ -184,12 +188,17 @@ func MountArchive(options MountOptions) (func() error, <-chan error, *fuse.Serve UseCheckpoints: options.UseCheckpoints, ContentCacheAvailable: options.ContentCacheAvailable, RegistryCredProvider: options.RegistryCredProvider, + ReadTraceObserver: options.ReadTraceObserver, }) if err != nil { return nil, nil, nil, fmt.Errorf("could not load storage: %v", err) } - clipfs, err := NewFileSystem(storage, ClipFileSystemOpts{ContentCache: options.ContentCache, ContentCacheAvailable: options.ContentCacheAvailable}) + clipfs, err := NewFileSystem(storage, ClipFileSystemOpts{ + ContentCache: options.ContentCache, + ContentCacheAvailable: options.ContentCacheAvailable, + ReadTraceObserver: options.ReadTraceObserver, + }) if err != nil { return nil, nil, nil, fmt.Errorf("could not create filesystem: %v", err) } @@ -207,7 +216,8 @@ func MountArchive(options MountOptions) (func() error, <-chan error, *fuse.Serve EnableSymlinkCaching: true, SyncRead: false, RememberInodes: true, - MaxReadAhead: 1024 * 128, // 128KB + MaxWrite: 1024 * 1024, + MaxReadAhead: 1024 * 1024, }) if err != nil { return nil, nil, nil, fmt.Errorf("could not create server: %v", err) @@ -263,13 +273,15 @@ func StoreS3(storeS3Opts StoreS3Options) error { // CreateFromOCIImageOptions configures OCI image indexing type CreateFromOCIImageOptions struct { - ImageRef string // Source image to index (can be local) - StorageImageRef string // Optional: image reference to store in metadata (defaults to ImageRef) - OutputPath string - CheckpointMiB int64 - CredProvider interface{} - ProgressChan chan<- OCIIndexProgress // optional channel for progress updates - Platform *v1.Platform // Target platform (defaults to linux/runtime.GOARCH) + ImageRef string // Source image to index (can be local) + StorageImageRef string // Optional: image reference to store in metadata (defaults to ImageRef) + OutputPath string // Path for the metadata-only .clip archive + CheckpointMiB int64 // Gzip checkpoint interval + CredProvider interface{} // Optional registry credential provider + ProgressChan chan<- OCIIndexProgress + Platform *v1.Platform + ContentCache storage.ContentCache // Optional cache to warm with decompressed layer streams + ContentCacheDir string // Optional temp directory for layer cache upload spooling } // CreateFromOCIImage creates a metadata-only index (.clip) file from an OCI image @@ -300,6 +312,8 @@ func CreateFromOCIImage(ctx context.Context, options CreateFromOCIImageOptions) CredProvider: credProvider, ProgressChan: options.ProgressChan, Platform: options.Platform, + ContentCache: options.ContentCache, + ContentCacheDir: options.ContentCacheDir, }, options.OutputPath) if err != nil { diff --git a/pkg/clip/clipfs.go b/pkg/clip/clipfs.go index 7d53aa7..9ecf981 100644 --- a/pkg/clip/clipfs.go +++ b/pkg/clip/clipfs.go @@ -15,6 +15,7 @@ type ClipFileSystemOpts struct { Verbose bool ContentCache storage.ContentCache ContentCacheAvailable bool + ReadTraceObserver common.ReadTraceObserver } type ClipFileSystem struct { @@ -23,6 +24,7 @@ type ClipFileSystem struct { lookupCache map[string]*lookupCacheEntry contentCache storage.ContentCache contentCacheAvailable bool + readTraceObserver common.ReadTraceObserver cacheMutex sync.RWMutex cachingStatus map[string]bool cacheEventChan chan cacheEvent @@ -46,6 +48,7 @@ func NewFileSystem(s storage.ClipStorageInterface, opts ClipFileSystemOpts) (*Cl cacheEventChan: make(chan cacheEvent, 10000), cachingStatus: make(map[string]bool), contentCacheAvailable: opts.ContentCacheAvailable, + readTraceObserver: opts.ReadTraceObserver, } metadata := s.Metadata() @@ -113,12 +116,12 @@ func (cfs *ClipFileSystem) processCacheEvents() { chunkSize = clipNode.DataLen - offset } - fileContent := make([]byte, chunkSize) // Create a new buffer for each chunk - nRead, err := cfs.storage.ReadFile(clipNode, fileContent, offset) - if err != nil { - log.Error().Err(err).Str("path", clipNode.Path).Msg("error reading file for caching") - break - } + fileContent := make([]byte, chunkSize) // Create a new buffer for each chunk + nRead, err := cfs.storage.ReadFile(clipNode, fileContent, offset) + if err != nil { + log.Error().Err(err).Str("path", clipNode.Path).Msg("error reading file for caching") + break + } chunks <- fileContent[:nRead] fileContent = nil diff --git a/pkg/clip/fsnode.go b/pkg/clip/fsnode.go index c55ce50..e9381d3 100644 --- a/pkg/clip/fsnode.go +++ b/pkg/clip/fsnode.go @@ -2,10 +2,15 @@ package clip import ( "context" + "os" "path" + "strconv" + "sync" "syscall" + "time" "github.com/beam-cloud/clip/pkg/common" + "github.com/beam-cloud/clip/pkg/storage" "github.com/hanwen/go-fuse/v2/fs" "github.com/hanwen/go-fuse/v2/fuse" "github.com/rs/zerolog/log" @@ -18,6 +23,98 @@ type FSNode struct { attr fuse.Attr } +const clipFileHandleFDCacheSize = 2048 + +type clipFileHandle struct { + node *FSNode + mu sync.Mutex + files map[string]*os.File +} + +func newClipFileHandle(node *FSNode) *clipFileHandle { + return &clipFileHandle{ + node: node, + files: make(map[string]*os.File), + } +} + +func (fh *clipFileHandle) Read(ctx context.Context, dest []byte, off int64) (fuse.ReadResult, syscall.Errno) { + if caller, ok := fuse.FromContext(ctx); ok && caller != nil { + ctx = common.WithReadTraceCallerPID(ctx, caller.Pid) + } + if res, ok, errno := fh.readLocalRegion(ctx, dest, off); ok || errno != fs.OK { + return res, errno + } + return fh.node.readData(ctx, dest, off) +} + +func (fh *clipFileHandle) Release(ctx context.Context) syscall.Errno { + fh.mu.Lock() + defer fh.mu.Unlock() + var firstErr syscall.Errno + for path, file := range fh.files { + if err := file.Close(); err != nil && firstErr == fs.OK { + firstErr = fs.ToErrno(err) + } + delete(fh.files, path) + } + return firstErr +} + +func (fh *clipFileHandle) openRegionFile(path string) (*os.File, error) { + fh.mu.Lock() + defer fh.mu.Unlock() + if file := fh.files[path]; file != nil { + return file, nil + } + if len(fh.files) >= clipFileHandleFDCacheSize { + return nil, syscall.EMFILE + } + file, err := os.Open(path) + if err != nil { + return nil, err + } + fh.files[path] = file + return file, nil +} + +func (fh *clipFileHandle) readLocalRegion(ctx context.Context, dest []byte, off int64) (fuse.ReadResult, bool, syscall.Errno) { + if fh == nil || fh.node == nil || fh.node.filesystem == nil || len(dest) == 0 { + return nil, false, fs.OK + } + regioner, ok := fh.node.filesystem.storage.(storage.LocalFileRegioner) + if !ok || regioner == nil { + return nil, false, fs.OK + } + + readLen := fh.node.clampedReadLength(off, int64(len(dest))) + if readLen <= 0 { + return fuse.ReadResultData(dest[:0]), true, fs.OK + } + + started := time.Now() + region, ok, err := regioner.LocalFileRegion(ctx, fh.node.clipNode, off, readLen) + if err != nil { + fh.node.observeRead(ctx, fh.node.regionReadTrace(region, off, readLen, 0, started, err)) + return nil, false, fs.OK + } + if !ok || region.Path == "" || region.Length <= 0 { + return nil, false, fs.OK + } + if int64(region.Length) != readLen { + return nil, false, fs.OK + } + + file, err := fh.openRegionFile(region.Path) + if err != nil { + fh.node.observeRead(ctx, fh.node.regionReadTrace(region, off, readLen, 0, started, err)) + return nil, false, fs.OK + } + + fh.node.observeRead(ctx, fh.node.regionReadTrace(region, off, readLen, int64(region.Length), started, nil)) + return fuse.ReadResultFd(file.Fd(), region.Offset, region.Length), true, fs.OK +} + func (n *FSNode) OnAdd(ctx context.Context) { log.Debug().Str("path", n.clipNode.Path).Msg("OnAdd called") } @@ -85,36 +182,53 @@ func (n *FSNode) Opendir(ctx context.Context) syscall.Errno { func (n *FSNode) Open(ctx context.Context, flags uint32) (fh fs.FileHandle, fuseFlags uint32, errno syscall.Errno) { log.Debug().Str("path", n.clipNode.Path).Uint32("flags", flags).Msg("Open called") - return nil, 0, fs.OK + if n.clipNode == nil || n.clipNode.NodeType != common.FileNode { + return nil, 0, fs.OK + } + return newClipFileHandle(n), fuse.FOPEN_KEEP_CACHE, fs.OK } func (n *FSNode) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int64) (fuse.ReadResult, syscall.Errno) { - log.Debug().Str("path", n.clipNode.Path).Int64("offset", off).Msg("Read called") - - // Determine file size (support both legacy and v2 RemoteRef) - var fileSize int64 - if n.clipNode.Remote != nil { - // v2: Use RemoteRef - fileSize = n.clipNode.Remote.ULength - } else { - // Legacy: Use DataLen - fileSize = n.clipNode.DataLen + if fh, ok := f.(*clipFileHandle); ok && fh != nil { + return fh.Read(ctx, dest, off) } + return n.readData(ctx, dest, off) +} - // Immediately return zeroed buffer if read is completely beyond EOF or file is empty - if off >= fileSize || fileSize == 0 { - return fuse.ReadResultData(dest[:0]), fs.OK +func (n *FSNode) readData(ctx context.Context, dest []byte, off int64) (fuse.ReadResult, syscall.Errno) { + log.Debug().Str("path", n.clipNode.Path).Int64("offset", off).Msg("Read called") + if caller, ok := fuse.FromContext(ctx); ok && caller != nil { + ctx = common.WithReadTraceCallerPID(ctx, caller.Pid) } - // Determine readable length - maxReadable := fileSize - off - readLen := int64(len(dest)) - if readLen > maxReadable { - readLen = maxReadable + readLen := n.clampedReadLength(off, int64(len(dest))) + if readLen <= 0 { + return fuse.ReadResultData(dest[:0]), fs.OK } var nRead int var err error + readStart := time.Now() + readSource := "unknown" + + defer func() { + if n.clipNode.Remote != nil { + return + } + n.observeLegacyRead(ctx, common.ReadTraceEvent{ + Operation: "clip.read", + Source: readSource, + Path: n.clipNode.Path, + Offset: off, + Length: readLen, + BytesRead: int64(nRead), + StartedAt: readStart, + Duration: time.Since(readStart), + Success: err == nil, + Error: errorString(err), + Attrs: n.legacyReadAttrs(), + }) + }() // For OCI images (v2 with Remote), delegate ALL caching to the storage layer // The storage layer (oci.go) handles the proper 3-tier cache hierarchy: @@ -123,7 +237,11 @@ func (n *FSNode) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int // 3. OCI registry (download + decompress) if n.clipNode.Remote != nil { // OCI mode - storage layer handles all caching - nRead, err = n.filesystem.storage.ReadFile(n.clipNode, dest[:readLen], off) + if contextStorage, ok := n.filesystem.storage.(storage.ContextClipStorageInterface); ok { + nRead, err = contextStorage.ReadFileContext(ctx, n.clipNode, dest[:readLen], off) + } else { + nRead, err = n.filesystem.storage.ReadFile(n.clipNode, dest[:readLen], off) + } if err != nil { return nil, syscall.EIO } @@ -131,14 +249,54 @@ func (n *FSNode) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int // Legacy mode - use file-level ContentCache // Attempt to read from cache first for legacy archives if n.filesystem.contentCacheAvailable && n.clipNode.ContentHash != "" && !n.filesystem.storage.CachedLocally() { - content, cacheErr := n.filesystem.contentCache.GetContent(n.clipNode.ContentHash, off, readLen, struct{ RoutingKey string }{RoutingKey: n.clipNode.ContentHash}) + var cacheErr error + cacheStart := time.Now() + if readInto, ok := n.filesystem.contentCache.(storage.ContentCacheReadInto); ok { + var n64 int64 + n64, cacheErr = readInto.ReadContentInto(n.clipNode.ContentHash, off, dest[:readLen], struct{ RoutingKey string }{RoutingKey: n.clipNode.ContentHash}) + nRead = int(n64) + if cacheErr == nil && n64 != readLen { + cacheErr = syscall.EIO + } + } else { + var content []byte + content, cacheErr = n.filesystem.contentCache.GetContent(n.clipNode.ContentHash, off, readLen, struct{ RoutingKey string }{RoutingKey: n.clipNode.ContentHash}) + if cacheErr == nil { + nRead = copy(dest, content) + } + } if cacheErr == nil { + readSource = "content_cache" + n.observeLegacyRead(ctx, common.ReadTraceEvent{ + Operation: "clip.content_cache_read", + Source: "content_cache", + Path: n.clipNode.Path, + Offset: off, + Length: readLen, + BytesRead: int64(nRead), + StartedAt: cacheStart, + Duration: time.Since(cacheStart), + Success: true, + Attrs: n.legacyReadAttrsWith("cache_hit", "true"), + }) // Cache hit - use cached content - nRead = copy(dest, content) log.Debug().Str("path", n.clipNode.Path).Int64("offset", off).Int64("length", readLen).Msg("Cache hit") } else { + n.observeLegacyRead(ctx, common.ReadTraceEvent{ + Operation: "clip.content_cache_read", + Source: "content_cache", + Path: n.clipNode.Path, + Offset: off, + Length: readLen, + StartedAt: cacheStart, + Duration: time.Since(cacheStart), + Success: false, + Error: errorString(cacheErr), + Attrs: n.legacyReadAttrsWith("cache_hit", "false"), + }) // Cache miss - read from storage and populate cache - nRead, err = n.filesystem.storage.ReadFile(n.clipNode, dest[:readLen], off) + readSource = n.legacyArchiveSource() + nRead, err = n.readLegacyArchiveObserved(ctx, dest[:readLen], off, readLen) if err != nil { return nil, syscall.EIO } @@ -151,20 +309,150 @@ func (n *FSNode) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int } } else { // No cache available or local storage - read directly - nRead, err = n.filesystem.storage.ReadFile(n.clipNode, dest[:readLen], off) + readSource = n.legacyArchiveSource() + nRead, err = n.readLegacyArchiveObserved(ctx, dest[:readLen], off, readLen) if err != nil { return nil, syscall.EIO } } } - // Null-terminate immediately after last read byte if buffer is not fully filled - if nRead < len(dest) { - dest[nRead] = 0 - nRead++ + return fuse.ReadResultData(dest[:nRead]), fs.OK +} + +func (n *FSNode) clampedReadLength(off int64, requested int64) int64 { + if n == nil || n.clipNode == nil || off < 0 || requested <= 0 { + return 0 + } + fileSize := n.fileSize() + if off >= fileSize || fileSize == 0 { + return 0 + } + maxReadable := fileSize - off + if requested > maxReadable { + return maxReadable + } + return requested +} + +func (n *FSNode) fileSize() int64 { + if n == nil || n.clipNode == nil { + return 0 + } + if n.clipNode.Remote != nil { + return n.clipNode.Remote.ULength + } + return n.clipNode.DataLen +} + +func (n *FSNode) regionReadTrace(region storage.LocalFileRegion, off int64, readLen int64, bytesRead int64, started time.Time, err error) common.ReadTraceEvent { + operation := "clip.read" + attrs := map[string]string{} + if n != nil && n.clipNode != nil && n.clipNode.Remote != nil { + operation = "clip.oci_read" + attrs["storage_mode"] = "oci" + attrs["content_cache_available"] = strconv.FormatBool(n.filesystem != nil && n.filesystem.contentCacheAvailable) + } else if n != nil { + attrs = n.legacyReadAttrs() + } + attrs["fd_fast_path"] = "true" + if region.Path != "" { + attrs["region_path"] = region.Path } - return fuse.ReadResultData(dest[:nRead]), fs.OK + success := err == nil + return common.ReadTraceEvent{ + Operation: operation, + Source: region.Source, + Path: n.clipNode.Path, + LayerDigest: region.LayerDigest, + DecompressedHash: region.DecompressedHash, + Offset: off, + Length: readLen, + BytesRead: bytesRead, + StartedAt: started, + Duration: time.Since(started), + Success: success, + Error: errorString(err), + Attrs: attrs, + } +} + +func (n *FSNode) observeRead(ctx context.Context, event common.ReadTraceEvent) { + if n.filesystem == nil || n.filesystem.readTraceObserver == nil { + return + } + if event.StartedAt.IsZero() { + event.StartedAt = time.Now().Add(-event.Duration) + } + if event.CallerPID == 0 { + event.CallerPID = common.ReadTraceCallerPID(ctx) + } + n.filesystem.readTraceObserver(event) +} + +func (n *FSNode) readLegacyArchiveObserved(ctx context.Context, dest []byte, off int64, readLen int64) (int, error) { + startedAt := time.Now() + nRead, err := n.filesystem.storage.ReadFile(n.clipNode, dest, off) + n.observeLegacyRead(ctx, common.ReadTraceEvent{ + Operation: "clip.archive_read", + Source: n.legacyArchiveSource(), + Path: n.clipNode.Path, + Offset: off, + Length: readLen, + BytesRead: int64(nRead), + StartedAt: startedAt, + Duration: time.Since(startedAt), + Success: err == nil, + Error: errorString(err), + Attrs: n.legacyReadAttrs(), + }) + return nRead, err +} + +func (n *FSNode) observeLegacyRead(ctx context.Context, event common.ReadTraceEvent) { + n.observeRead(ctx, event) +} + +func (n *FSNode) legacyReadAttrs() map[string]string { + attrs := map[string]string{ + "storage_mode": "legacy", + } + if n.filesystem != nil { + attrs["content_cache_available"] = strconv.FormatBool(n.filesystem.contentCacheAvailable) + } + if n.clipNode.ContentHash != "" { + attrs["content_hash"] = n.clipNode.ContentHash + if len(n.clipNode.ContentHash) > 12 { + attrs["content_hash_short"] = n.clipNode.ContentHash[:12] + } + } + if n.filesystem != nil && n.filesystem.storage != nil { + attrs["cached_locally"] = strconv.FormatBool(n.filesystem.storage.CachedLocally()) + } + return attrs +} + +func (n *FSNode) legacyReadAttrsWith(key, value string) map[string]string { + attrs := n.legacyReadAttrs() + if key != "" { + attrs[key] = value + } + return attrs +} + +func (n *FSNode) legacyArchiveSource() string { + if n.filesystem != nil && n.filesystem.storage != nil && n.filesystem.storage.CachedLocally() { + return "local_archive" + } + return "remote_archive" +} + +func errorString(err error) string { + if err == nil { + return "" + } + return err.Error() } func (n *FSNode) Readlink(ctx context.Context) ([]byte, syscall.Errno) { diff --git a/pkg/clip/oci_indexer.go b/pkg/clip/oci_indexer.go index f3c5f2a..0c647e7 100644 --- a/pkg/clip/oci_indexer.go +++ b/pkg/clip/oci_indexer.go @@ -9,6 +9,7 @@ import ( "fmt" "hash/fnv" "io" + "os" "path" "runtime" "strings" @@ -16,6 +17,7 @@ import ( "time" "github.com/beam-cloud/clip/pkg/common" + "github.com/beam-cloud/clip/pkg/storage" "github.com/google/go-containerregistry/pkg/authn" "github.com/google/go-containerregistry/pkg/name" v1 "github.com/google/go-containerregistry/pkg/v1" @@ -43,6 +45,69 @@ type IndexOCIImageOptions struct { CredProvider common.RegistryCredentialProvider // optional credential provider for registry authentication ProgressChan chan<- OCIIndexProgress // optional channel for progress updates Platform *v1.Platform // Target platform (defaults to linux/runtime.GOARCH) + ContentCache storage.ContentCache // optional remote cache for fully decompressed layers + ContentCacheDir string // optional temp directory for cache upload spooling +} + +const indexedLayerContentCacheChunkSize = 4 * 1024 * 1024 + +type indexedLayerContentCacheSpool struct { + file *os.File + path string + err error +} + +func newIndexedLayerContentCacheSpool(dir, layerDigest string) *indexedLayerContentCacheSpool { + if dir != "" { + if err := os.MkdirAll(dir, 0755); err != nil { + log.Warn().Err(err).Str("dir", dir).Msg("failed to create layer content cache temp dir") + } + } + + file, err := os.CreateTemp(dir, "clip-index-layer-*.tar") + if err != nil { + log.Warn().Err(err).Str("layer_digest", layerDigest).Msg("failed to create layer content cache temp file") + return nil + } + + return &indexedLayerContentCacheSpool{file: file, path: file.Name()} +} + +func (s *indexedLayerContentCacheSpool) Write(p []byte) (int, error) { + if s == nil || s.file == nil { + return len(p), nil + } + + n, err := s.file.Write(p) + if err != nil || n != len(p) { + if err == nil { + err = io.ErrShortWrite + } + s.err = err + s.closeAndRemove() + return len(p), nil + } + + return len(p), nil +} + +func (s *indexedLayerContentCacheSpool) close() error { + if s == nil || s.file == nil { + return nil + } + err := s.file.Close() + s.file = nil + return err +} + +func (s *indexedLayerContentCacheSpool) closeAndRemove() { + if s == nil { + return + } + _ = s.close() + if s.path != "" { + _ = os.Remove(s.path) + } } // countingReader tracks bytes read from an io.Reader @@ -115,7 +180,7 @@ func (ca *ClipArchiver) IndexOCIImage(ctx context.Context, opts IndexOCIImageOpt // not the storage reference (which is just stored in metadata) fetchRegistryURL := ref.Context().RegistryStr() fetchRepository := ref.Context().RepositoryStr() - + // Try to get credentials from provider authConfig, err := credProvider.GetCredentials(ctx, fetchRegistryURL, fetchRepository) if err != nil && err != common.ErrNoCredentials { @@ -289,10 +354,21 @@ func (ca *ClipArchiver) indexLayerOptimized( } defer gzr.Close() - // Streaming hash computation via TeeReader (zero-copy) - // TeeReader writes to hasher while tar.Reader consumes data + var cacheSpool *indexedLayerContentCacheSpool + if opts.ContentCache != nil { + cacheSpool = newIndexedLayerContentCacheSpool(opts.ContentCacheDir, layerDigest) + defer cacheSpool.closeAndRemove() + } + + // Streaming hash computation via TeeReader. + // When a content cache is configured, the same decompressed byte stream is + // also spooled once so the runtime does not decompress this layer again. hasher := sha256.New() - hashingReader := io.TeeReader(gzr, hasher) + hashWriter := io.Writer(hasher) + if cacheSpool != nil { + hashWriter = io.MultiWriter(hasher, cacheSpool) + } + hashingReader := io.TeeReader(gzr, hashWriter) uncompressedCounter := &countingReader{r: hashingReader} tr := tar.NewReader(uncompressedCounter) @@ -360,6 +436,25 @@ func (ca *ClipArchiver) indexLayerOptimized( // Finalize hash (includes all bytes: file contents + tar headers + padding) decompressedHash := hex.EncodeToString(hasher.Sum(nil)) + if opts.ContentCache != nil && cacheSpool != nil && cacheSpool.err == nil && cacheSpool.path != "" { + if err := cacheSpool.close(); err != nil { + return nil, "", fmt.Errorf("failed to close layer content cache temp file: %w", err) + } + + if err := ca.storeIndexedLayerInContentCache(ctx, opts.ContentCache, cacheSpool.path, decompressedHash, layerDigest); err != nil { + log.Warn(). + Err(err). + Str("layer_digest", layerDigest). + Str("decompressed_hash", decompressedHash). + Msg("failed to store indexed layer in content cache") + } + } else if cacheSpool != nil && cacheSpool.err != nil { + log.Warn(). + Err(cacheSpool.err). + Str("layer_digest", layerDigest). + Msg("skipping indexed layer content cache store after spool write failure") + } + // Return gzip index and decompressed hash return &common.GzipIndex{ LayerDigest: layerDigest, @@ -367,6 +462,94 @@ func (ca *ClipArchiver) indexLayerOptimized( }, decompressedHash, nil } +func (ca *ClipArchiver) storeIndexedLayerInContentCache(ctx context.Context, contentCache storage.ContentCache, filePath, decompressedHash, layerDigest string) error { + if contentCache == nil { + return nil + } + + if existsCache, ok := contentCache.(storage.ContentCacheExists); ok { + exists, err := existsCache.ContentExists(decompressedHash, struct{ RoutingKey string }{RoutingKey: decompressedHash}) + if err != nil { + log.Warn(). + Err(err). + Str("layer_digest", layerDigest). + Str("decompressed_hash", decompressedHash). + Msg("failed to check indexed layer content cache") + } else if exists { + log.Info(). + Str("layer_digest", layerDigest). + Str("decompressed_hash", decompressedHash). + Msg("indexed layer already present in content cache") + return nil + } + } + + file, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to open indexed layer temp file: %w", err) + } + defer file.Close() + + info, err := file.Stat() + if err != nil { + return fmt.Errorf("failed to stat indexed layer temp file: %w", err) + } + + storeCtx, cancel := context.WithCancel(ctx) + defer cancel() + + chunks := make(chan []byte, 2) + readErrCh := make(chan error, 1) + go func() { + defer close(chunks) + buf := make([]byte, indexedLayerContentCacheChunkSize) + for { + n, readErr := file.Read(buf) + if n > 0 { + chunk := make([]byte, n) + copy(chunk, buf[:n]) + select { + case chunks <- chunk: + case <-storeCtx.Done(): + readErrCh <- storeCtx.Err() + return + } + } + if readErr == io.EOF { + readErrCh <- nil + return + } + if readErr != nil { + readErrCh <- readErr + return + } + } + }() + + started := time.Now() + actualHash, storeErr := contentCache.StoreContent(chunks, decompressedHash, struct{ RoutingKey string }{RoutingKey: decompressedHash}) + cancel() + readErr := <-readErrCh + if storeErr != nil { + return storeErr + } + if readErr != nil { + return readErr + } + if actualHash != "" && actualHash != decompressedHash { + return fmt.Errorf("indexed layer content cache hash mismatch: expected %s, got %s", decompressedHash, actualHash) + } + + log.Info(). + Str("layer_digest", layerDigest). + Str("decompressed_hash", decompressedHash). + Int64("bytes", info.Size()). + Dur("duration", time.Since(started)). + Msg("stored indexed layer in content cache") + + return nil +} + // handleWhiteout processes OCI whiteout files func (ca *ClipArchiver) handleWhiteout(index *btree.BTree, fullPath string) bool { dir := path.Dir(fullPath) diff --git a/pkg/common/readtrace.go b/pkg/common/readtrace.go new file mode 100644 index 0000000..40ece96 --- /dev/null +++ b/pkg/common/readtrace.go @@ -0,0 +1,45 @@ +package common + +import ( + "context" + "time" +) + +type readTraceCallerPIDKey struct{} + +// ReadTraceEvent describes a lazy read operation inside a mounted CLIP image. +type ReadTraceEvent struct { + Operation string + Source string + Path string + LayerDigest string + DecompressedHash string + Offset int64 + Length int64 + BytesRead int64 + StartedAt time.Time + Duration time.Duration + CallerPID uint32 + Success bool + Error string + Attrs map[string]string +} + +type ReadTraceObserver func(ReadTraceEvent) + +func WithReadTraceCallerPID(ctx context.Context, pid uint32) context.Context { + if pid == 0 { + return ctx + } + return context.WithValue(ctx, readTraceCallerPIDKey{}, pid) +} + +func ReadTraceCallerPID(ctx context.Context) uint32 { + if ctx == nil { + return 0 + } + if pid, ok := ctx.Value(readTraceCallerPIDKey{}).(uint32); ok { + return pid + } + return 0 +} diff --git a/pkg/storage/local.go b/pkg/storage/local.go index 5d2d0a2..2e8e670 100644 --- a/pkg/storage/local.go +++ b/pkg/storage/local.go @@ -1,6 +1,7 @@ package storage import ( + "context" "fmt" "os" @@ -38,6 +39,27 @@ func (s *LocalClipStorage) ReadFile(node *common.ClipNode, dest []byte, off int6 return n, nil } +func (s *LocalClipStorage) LocalFileRegion(ctx context.Context, node *common.ClipNode, off int64, length int64) (LocalFileRegion, bool, error) { + if node == nil || node.NodeType != common.FileNode || length <= 0 || off < 0 { + return LocalFileRegion{}, false, nil + } + if off >= node.DataLen { + return LocalFileRegion{}, false, nil + } + if off+length > node.DataLen { + length = node.DataLen - off + } + if length <= 0 || length > int64(int(^uint(0)>>1)) { + return LocalFileRegion{}, false, nil + } + return LocalFileRegion{ + Path: s.archivePath, + Offset: node.DataPos + off, + Length: int(length), + Source: "local_archive_fd", + }, true, nil +} + func (s *LocalClipStorage) CachedLocally() bool { return true } diff --git a/pkg/storage/oci.go b/pkg/storage/oci.go index 86c58c4..c8a424d 100644 --- a/pkg/storage/oci.go +++ b/pkg/storage/oci.go @@ -31,6 +31,7 @@ type OCIClipStorage struct { contentCache ContentCache // Remote content cache (blobcache) contentCacheAvailable bool // is there an available content cache for range reads? useCheckpoints bool // Enable checkpoint-based partial decompression + readTraceObserver common.ReadTraceObserver mu sync.RWMutex layerDecompressMu sync.Mutex // Prevents duplicate decompression layersDecompressing map[string]chan struct{} // Tracks in-progress decompressions @@ -43,6 +44,7 @@ type OCIClipStorageOpts struct { ContentCacheAvailable bool // is there an available content cache for range reads? DiskCacheDir string // optional local disk cache directory UseCheckpoints bool // Enable checkpoint-based partial decompression (default: false) + ReadTraceObserver common.ReadTraceObserver } func NewOCIClipStorage(opts OCIClipStorageOpts) (*OCIClipStorage, error) { @@ -84,6 +86,7 @@ func NewOCIClipStorage(opts OCIClipStorageOpts) (*OCIClipStorage, error) { contentCache: opts.ContentCache, contentCacheAvailable: opts.ContentCacheAvailable, useCheckpoints: opts.UseCheckpoints, + readTraceObserver: opts.ReadTraceObserver, layersDecompressing: make(map[string]chan struct{}), } @@ -193,11 +196,76 @@ func (s *OCIClipStorage) initLayers(ctx context.Context) error { // 2. Check ContentCache (range read) - fast, network but only what we need // 3. Decompress from OCI - with checkpoints if enabled, otherwise full layer func (s *OCIClipStorage) ReadFile(node *common.ClipNode, dest []byte, offset int64) (int, error) { + return s.ReadFileContext(context.Background(), node, dest, offset) +} + +func (s *OCIClipStorage) LocalFileRegion(ctx context.Context, node *common.ClipNode, offset int64, length int64) (LocalFileRegion, bool, error) { + if node == nil || node.Remote == nil || length <= 0 || offset < 0 { + return LocalFileRegion{}, false, nil + } + + remote := node.Remote + wantUStart := remote.UOffset + offset + wantUEnd := remote.UOffset + remote.ULength + readLen := length + if wantUStart+readLen > wantUEnd { + readLen = wantUEnd - wantUStart + } + if readLen <= 0 || readLen > int64(int(^uint(0)>>1)) { + return LocalFileRegion{}, false, nil + } + + decompressedHash := s.getDecompressedHash(remote.LayerDigest) + if decompressedHash == "" { + return LocalFileRegion{}, false, nil + } + + layerPath := s.getDecompressedCachePath(decompressedHash) + if _, err := os.Stat(layerPath); err == nil { + return LocalFileRegion{ + Path: layerPath, + Offset: wantUStart, + Length: int(readLen), + Source: "disk_cache_fd", + LayerDigest: remote.LayerDigest, + DecompressedHash: decompressedHash, + }, true, nil + } else if !os.IsNotExist(err) { + return LocalFileRegion{}, false, err + } + + pageCache, ok := s.contentCache.(ContentCacheLocalPageRegions) + if !ok || pageCache == nil || !s.contentCacheAvailable { + return LocalFileRegion{}, false, nil + } + + regions, err := pageCache.LocalPageRegions(decompressedHash, wantUStart, readLen, struct{ RoutingKey string }{RoutingKey: decompressedHash}) + if err != nil || len(regions) != 1 { + return LocalFileRegion{}, false, err + } + region := regions[0] + if region.Path == "" || region.Offset < 0 || region.Length != int(readLen) { + return LocalFileRegion{}, false, nil + } + + return LocalFileRegion{ + Path: region.Path, + Offset: region.Offset, + Length: region.Length, + Source: "content_cache_page_fd", + LayerDigest: remote.LayerDigest, + DecompressedHash: decompressedHash, + }, true, nil +} + +func (s *OCIClipStorage) ReadFileContext(ctx context.Context, node *common.ClipNode, dest []byte, offset int64) (n int, err error) { if node.Remote == nil { return 0, fmt.Errorf("legacy data storage not supported in OCI mode") } remote := node.Remote + readStart := time.Now() + readSource := "unknown" // Calculate read range in uncompressed layer space wantUStart := remote.UOffset + offset @@ -214,6 +282,26 @@ func (s *OCIClipStorage) ReadFile(node *common.ClipNode, dest []byte, offset int metrics := common.GetGlobalMetrics() metrics.RecordLayerAccess(remote.LayerDigest) + defer func() { + s.observeRead(ctx, common.ReadTraceEvent{ + Operation: "clip.read", + Source: readSource, + Path: node.Path, + LayerDigest: remote.LayerDigest, + DecompressedHash: s.getDecompressedHash(remote.LayerDigest), + Offset: wantUStart, + Length: readLen, + BytesRead: int64(n), + StartedAt: readStart, + Duration: time.Since(readStart), + Success: err == nil, + Error: errorString(err), + Attrs: map[string]string{ + "content_cache_available": fmt.Sprintf("%t", s.contentCacheAvailable), + "storage_mode": "oci", + }, + }) + }() // Get or compute the decompressed hash decompressedHash := s.getDecompressedHash(remote.LayerDigest) @@ -228,23 +316,55 @@ func (s *OCIClipStorage) ReadFile(node *common.ClipNode, dest []byte, offset int Int64("offset", wantUStart). Int64("length", readLen). Msg("disk cache hit - using local decompressed layer") - return s.readFromDiskCache(layerPath, wantUStart, dest[:readLen]) + metrics.RecordReadHit() + readSource = "disk_cache" + return s.readFromDiskCacheObserved(ctx, node.Path, remote.LayerDigest, decompressedHash, layerPath, wantUStart, dest[:readLen]) } } // Try remote ContentCache range read if s.contentCache != nil && decompressedHash != "" && s.contentCacheAvailable { - if data, err := s.tryRangeReadFromContentCache(decompressedHash, wantUStart, readLen); err == nil { + cacheStart := time.Now() + if n, err := s.tryRangeReadFromContentCache(decompressedHash, wantUStart, dest[:readLen]); err == nil { + metrics.RecordReadHit() + metrics.RecordRangeGet(decompressedHash, int64(n)) + readSource = "content_cache" + s.observeRead(ctx, common.ReadTraceEvent{ + Operation: "clip.content_cache_read", + Source: "content_cache", + Path: node.Path, + LayerDigest: remote.LayerDigest, + DecompressedHash: decompressedHash, + Offset: wantUStart, + Length: readLen, + BytesRead: int64(n), + StartedAt: cacheStart, + Duration: time.Since(cacheStart), + Success: true, + }) log.Debug(). Str("layer_digest", remote.LayerDigest). Str("decompressed_hash", decompressedHash). Int64("offset", wantUStart). Int64("length", readLen). - Int("bytes_read", len(data)). + Int("bytes_read", n). Msg("content cache hit - range read from remote") - copy(dest, data) - return len(data), nil + return n, nil } else { + metrics.RecordReadMiss() + s.observeRead(ctx, common.ReadTraceEvent{ + Operation: "clip.content_cache_read", + Source: "content_cache", + Path: node.Path, + LayerDigest: remote.LayerDigest, + DecompressedHash: decompressedHash, + Offset: wantUStart, + Length: readLen, + StartedAt: cacheStart, + Duration: time.Since(cacheStart), + Success: false, + Error: errorString(err), + }) log.Debug(). Err(err). Str("layer_digest", remote.LayerDigest). @@ -255,7 +375,21 @@ func (s *OCIClipStorage) ReadFile(node *common.ClipNode, dest []byte, offset int // Cache miss - try checkpoint-based decompression if enabled if s.useCheckpoints { + checkpointStart := time.Now() if n, err := s.readWithCheckpoint(remote.LayerDigest, wantUStart, dest[:readLen]); err == nil { + readSource = "checkpoint" + s.observeRead(ctx, common.ReadTraceEvent{ + Operation: "clip.checkpoint_read", + Source: "checkpoint", + Path: node.Path, + LayerDigest: remote.LayerDigest, + Offset: wantUStart, + Length: readLen, + BytesRead: int64(n), + StartedAt: checkpointStart, + Duration: time.Since(checkpointStart), + Success: true, + }) log.Debug(). Str("layer_digest", remote.LayerDigest). Int64("offset", wantUStart). @@ -264,6 +398,18 @@ func (s *OCIClipStorage) ReadFile(node *common.ClipNode, dest []byte, offset int Msg("checkpoint-based decompression successful") return n, nil } else { + s.observeRead(ctx, common.ReadTraceEvent{ + Operation: "clip.checkpoint_read", + Source: "checkpoint", + Path: node.Path, + LayerDigest: remote.LayerDigest, + Offset: wantUStart, + Length: readLen, + StartedAt: checkpointStart, + Duration: time.Since(checkpointStart), + Success: false, + Error: errorString(err), + }) log.Debug(). Err(err). Str("layer_digest", remote.LayerDigest). @@ -272,18 +418,19 @@ func (s *OCIClipStorage) ReadFile(node *common.ClipNode, dest []byte, offset int } // Fallback: decompress entire layer and cache (for future range reads) - decompressedHash, layerPath, err := s.ensureLayerCached(remote.LayerDigest) + decompressedHash, layerPath, err := s.ensureLayerCached(ctx, remote.LayerDigest) if err != nil { return 0, err } // Now read the range we need from the newly cached layer - return s.readFromDiskCache(layerPath, wantUStart, dest[:readLen]) + readSource = "decompressed_layer" + return s.readFromDiskCacheObserved(ctx, node.Path, remote.LayerDigest, decompressedHash, layerPath, wantUStart, dest[:readLen]) } // ensureLayerCached ensures the decompressed layer is available on disk // Returns decompressed hash and path -func (s *OCIClipStorage) ensureLayerCached(digest string) (string, string, error) { +func (s *OCIClipStorage) ensureLayerCached(ctx context.Context, digest string) (string, string, error) { // Get pre-computed decompressed hash from metadata decompressedHash := s.getDecompressedHash(digest) if decompressedHash == "" { @@ -313,7 +460,17 @@ func (s *OCIClipStorage) ensureLayerCached(digest string) (string, string, error // Another goroutine is decompressing - wait for it s.layerDecompressMu.Unlock() log.Info().Str("digest", digest).Msg("waiting for in-progress layer decompression") + waitStart := time.Now() <-waitChan + s.observeRead(ctx, common.ReadTraceEvent{ + Operation: "clip.layer_decompress_wait", + Source: "decompressed_layer", + LayerDigest: digest, + DecompressedHash: decompressedHash, + StartedAt: waitStart, + Duration: time.Since(waitStart), + Success: true, + }) // Now it should be on disk if _, err := os.Stat(layerPath); err == nil { @@ -334,7 +491,18 @@ func (s *OCIClipStorage) ensureLayerCached(digest string) (string, string, error Str("decompressed_hash", decompressedHash). Msg("oci cache miss - downloading and decompressing layer from registry") + decompressStart := time.Now() err := s.decompressAndCacheLayer(digest, layerPath) + s.observeRead(ctx, common.ReadTraceEvent{ + Operation: "clip.layer_decompress", + Source: "oci_registry", + LayerDigest: digest, + DecompressedHash: decompressedHash, + StartedAt: decompressStart, + Duration: time.Since(decompressStart), + Success: err == nil, + Error: errorString(err), + }) // Clean up in-progress tracking s.layerDecompressMu.Lock() @@ -401,6 +569,46 @@ func (s *OCIClipStorage) readFromDiskCache(layerPath string, offset int64, dest return n, nil } +func (s *OCIClipStorage) readFromDiskCacheObserved(ctx context.Context, path, layerDigest, decompressedHash, layerPath string, offset int64, dest []byte) (int, error) { + startedAt := time.Now() + n, err := s.readFromDiskCache(layerPath, offset, dest) + s.observeRead(ctx, common.ReadTraceEvent{ + Operation: "clip.disk_cache_read", + Source: "disk_cache", + Path: path, + LayerDigest: layerDigest, + DecompressedHash: decompressedHash, + Offset: offset, + Length: int64(len(dest)), + BytesRead: int64(n), + StartedAt: startedAt, + Duration: time.Since(startedAt), + Success: err == nil, + Error: errorString(err), + }) + return n, err +} + +func (s *OCIClipStorage) observeRead(ctx context.Context, event common.ReadTraceEvent) { + if s.readTraceObserver == nil { + return + } + if event.StartedAt.IsZero() { + event.StartedAt = time.Now().Add(-event.Duration) + } + if event.CallerPID == 0 { + event.CallerPID = common.ReadTraceCallerPID(ctx) + } + s.readTraceObserver(event) +} + +func errorString(err error) string { + if err == nil { + return "" + } + return err.Error() +} + // decompressAndCacheLayer decompresses a layer from OCI registry and caches it // This is called when both disk cache and ContentCache miss // The entire layer is cached so subsequent reads (on this or other nodes) can do range reads @@ -528,35 +736,57 @@ func streamFileInChunks(filePath string, chunks chan []byte) error { // tryRangeReadFromContentCache attempts a ranged read from remote ContentCache // This enables lazy loading: we fetch only the bytes we need, not the entire layer // decompressedHash is the hash of the decompressed layer data -func (s *OCIClipStorage) tryRangeReadFromContentCache(decompressedHash string, offset, length int64) ([]byte, error) { +func (s *OCIClipStorage) tryRangeReadFromContentCache(decompressedHash string, offset int64, dest []byte) (int, error) { // Defensive nil check (should already be checked by caller) if s.contentCache == nil { - return nil, fmt.Errorf("content cache is not available") + return 0, fmt.Errorf("content cache is not available") + } + + length := int64(len(dest)) + if readInto, ok := s.contentCache.(ContentCacheReadInto); ok { + n, err := readInto.ReadContentInto(decompressedHash, offset, dest, struct{ RoutingKey string }{RoutingKey: decompressedHash}) + if err != nil { + return 0, fmt.Errorf("content cache range read failed: %w", err) + } + if n != length { + return 0, fmt.Errorf("content cache short read: want %d, got %d", length, n) + } + return int(n), nil } // Use GetContent for range reads (offset + length) // This is the KEY optimization: we only fetch the bytes we need! data, err := s.contentCache.GetContent(decompressedHash, offset, length, struct{ RoutingKey string }{RoutingKey: decompressedHash}) if err != nil { - return nil, fmt.Errorf("content cache range read failed: %w", err) + return 0, fmt.Errorf("content cache range read failed: %w", err) } - return data, nil + copy(dest, data) + return len(data), nil } // storeDecompressedInRemoteCache uploads decompressed layer to remote cache for cluster sharing. // Streams file in 32MB chunks with constant memory usage O(32MB). func (s *OCIClipStorage) storeDecompressedInRemoteCache(decompressedHash string, diskPath string) { // Guard against nil contentCache or unavailable cache - if s.contentCache == nil || !s.contentCacheAvailable { + if s.contentCache == nil { log.Debug(). Str("hash", decompressedHash). - Bool("cache_nil", s.contentCache == nil). - Bool("cache_available", s.contentCacheAvailable). + Bool("cache_nil", true). Msg("skipping remote cache store - cache not available") return } + if existsCache, ok := s.contentCache.(ContentCacheExists); ok { + exists, err := existsCache.ContentExists(decompressedHash, struct{ RoutingKey string }{RoutingKey: decompressedHash}) + if err != nil { + log.Warn().Err(err).Str("hash", decompressedHash).Msg("failed to check content cache before layer store") + } else if exists { + log.Info().Str("hash", decompressedHash).Msg("decompressed layer already present in content cache") + return + } + } + chunks := make(chan []byte, 1) go func() { defer close(chunks) diff --git a/pkg/storage/oci_test.go b/pkg/storage/oci_test.go index 7fb0516..59e158b 100644 --- a/pkg/storage/oci_test.go +++ b/pkg/storage/oci_test.go @@ -3,12 +3,14 @@ package storage import ( "bytes" "compress/gzip" + "context" "crypto/sha256" "encoding/hex" "errors" "fmt" "io" "os" + "path/filepath" "sync" "testing" "time" @@ -22,21 +24,26 @@ import ( // Mock ContentCache for testing (implements range read interface) type mockCache struct { - mu sync.Mutex - store map[string][]byte + mu sync.Mutex + store map[string][]byte + localRegions map[string][]LocalPageRegion // Error injection getError error setError error // Call tracking - getCalls int - setCalls int + getCalls int + setCalls int + localPageRegionCalls int + localPageRegionOffset int64 + localPageRegionLength int64 } func newMockCache() *mockCache { return &mockCache{ - store: make(map[string][]byte), + store: make(map[string][]byte), + localRegions: make(map[string][]LocalPageRegion), } } @@ -88,13 +95,31 @@ func (m *mockCache) StoreContent(chunks chan []byte, hash string, opts struct{ R return hash, nil } +func (m *mockCache) LocalPageRegions(hash string, offset int64, length int64, opts struct{ RoutingKey string }) ([]LocalPageRegion, error) { + m.mu.Lock() + defer m.mu.Unlock() + + m.localPageRegionCalls++ + m.localPageRegionOffset = offset + m.localPageRegionLength = length + regions := m.localRegions[hash] + if len(regions) == 0 { + return nil, fmt.Errorf("not found in cache") + } + return append([]LocalPageRegion(nil), regions...), nil +} + func (m *mockCache) reset() { m.mu.Lock() defer m.mu.Unlock() m.store = make(map[string][]byte) + m.localRegions = make(map[string][]LocalPageRegion) m.getCalls = 0 m.setCalls = 0 + m.localPageRegionCalls = 0 + m.localPageRegionOffset = 0 + m.localPageRegionLength = 0 m.getError = nil m.setError = nil } @@ -217,6 +242,98 @@ func TestOCIStorage_CacheHit(t *testing.T) { assert.Equal(t, 0, cache.setCalls, "cache.Set should not be called on cache hit") } +func TestOCIStorage_LocalFileRegionUsesDiskCache(t *testing.T) { + testData := []byte("0123456789abcdefghijklmnopqrstuvwxyz") + digest := v1.Hash{Algorithm: "sha256", Hex: "abc123"} + hasher := sha256.New() + hasher.Write(testData) + decompressedHash := hex.EncodeToString(hasher.Sum(nil)) + cacheDir := t.TempDir() + require.NoError(t, os.WriteFile(filepath.Join(cacheDir, decompressedHash), testData, 0644)) + + metadata := &common.ClipArchiveMetadata{ + StorageInfo: &common.OCIStorageInfo{ + GzipIdxByLayer: map[string]*common.GzipIndex{digest.String(): {}}, + DecompressedHashByLayer: map[string]string{ + digest.String(): decompressedHash, + }, + }, + } + storage := &OCIClipStorage{ + metadata: metadata, + storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), + diskCacheDir: cacheDir, + contentCacheAvailable: true, + } + node := &common.ClipNode{ + Remote: &common.RemoteRef{ + LayerDigest: digest.String(), + UOffset: 10, + ULength: 20, + }, + } + + region, ok, err := storage.LocalFileRegion(context.Background(), node, 3, 7) + require.NoError(t, err) + require.True(t, ok) + assert.Equal(t, filepath.Join(cacheDir, decompressedHash), region.Path) + assert.Equal(t, int64(13), region.Offset) + assert.Equal(t, 7, region.Length) + assert.Equal(t, "disk_cache_fd", region.Source) + assert.Equal(t, digest.String(), region.LayerDigest) + assert.Equal(t, decompressedHash, region.DecompressedHash) +} + +func TestOCIStorage_LocalFileRegionUsesContentCachePage(t *testing.T) { + testData := []byte("0123456789abcdefghijklmnopqrstuvwxyz") + digest := v1.Hash{Algorithm: "sha256", Hex: "abc123"} + hasher := sha256.New() + hasher.Write(testData) + decompressedHash := hex.EncodeToString(hasher.Sum(nil)) + pagePath := filepath.Join(t.TempDir(), "page") + require.NoError(t, os.WriteFile(pagePath, testData, 0644)) + + cache := newMockCache() + cache.localRegions[decompressedHash] = []LocalPageRegion{{ + Path: pagePath, + Offset: 1234, + Length: 9, + }} + metadata := &common.ClipArchiveMetadata{ + StorageInfo: &common.OCIStorageInfo{ + GzipIdxByLayer: map[string]*common.GzipIndex{digest.String(): {}}, + DecompressedHashByLayer: map[string]string{ + digest.String(): decompressedHash, + }, + }, + } + storage := &OCIClipStorage{ + metadata: metadata, + storageInfo: metadata.StorageInfo.(*common.OCIStorageInfo), + diskCacheDir: t.TempDir(), + contentCache: cache, + contentCacheAvailable: true, + } + node := &common.ClipNode{ + Remote: &common.RemoteRef{ + LayerDigest: digest.String(), + UOffset: 4096, + ULength: 64, + }, + } + + region, ok, err := storage.LocalFileRegion(context.Background(), node, 5, 9) + require.NoError(t, err) + require.True(t, ok) + assert.Equal(t, pagePath, region.Path) + assert.Equal(t, int64(1234), region.Offset) + assert.Equal(t, 9, region.Length) + assert.Equal(t, "content_cache_page_fd", region.Source) + assert.Equal(t, 1, cache.localPageRegionCalls) + assert.Equal(t, int64(4101), cache.localPageRegionOffset) + assert.Equal(t, int64(9), cache.localPageRegionLength) +} + func TestOCIStorage_CacheMiss(t *testing.T) { // Create test data testData := []byte("Hello, World! This is test data for OCI storage.") @@ -1482,11 +1599,11 @@ func TestNearestCheckpoint(t *testing.T) { } testCases := []struct { - name string - wantUOffset int64 - expectedCOff int64 - expectedUOff int64 - description string + name string + wantUOffset int64 + expectedCOff int64 + expectedUOff int64 + description string }{ {"Before first checkpoint", 0, 100, 0, "should use first checkpoint"}, {"Exactly at checkpoint", 2 * 1024 * 1024, 200, 2 * 1024 * 1024, "should use exact checkpoint"}, diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 27138e0..fb1fa88 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -332,6 +332,27 @@ func (s3c *S3ClipStorage) ReadFile(node *common.ClipNode, dest []byte, off int64 return n, nil } +func (s3c *S3ClipStorage) LocalFileRegion(ctx context.Context, node *common.ClipNode, off int64, length int64) (LocalFileRegion, bool, error) { + if node == nil || node.NodeType != common.FileNode || !s3c.cachedLocally || s3c.localCachePath == "" || length <= 0 || off < 0 { + return LocalFileRegion{}, false, nil + } + if off >= node.DataLen { + return LocalFileRegion{}, false, nil + } + if off+length > node.DataLen { + length = node.DataLen - off + } + if length <= 0 || length > int64(int(^uint(0)>>1)) { + return LocalFileRegion{}, false, nil + } + return LocalFileRegion{ + Path: s3c.localCachePath, + Offset: node.DataPos + off, + Length: int(length), + Source: "local_archive_fd", + }, true, nil +} + func (s3c *S3ClipStorage) downloadChunk(dest []byte, start int64, end int64) (int, error) { rangeHeader := fmt.Sprintf("bytes=%d-%d", start, end) getObjectInput := &s3.GetObjectInput{ diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index c91981c..abc2f63 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -1,6 +1,7 @@ package storage import ( + "context" "errors" "github.com/beam-cloud/clip/pkg/common" @@ -13,6 +14,33 @@ type ContentCache interface { StoreContent(chunks chan []byte, hash string, opts struct{ RoutingKey string }) (string, error) } +type ContentCacheReadInto interface { + ReadContentInto(hash string, offset int64, dest []byte, opts struct{ RoutingKey string }) (int64, error) +} + +type ContentCacheExists interface { + ContentExists(hash string, opts struct{ RoutingKey string }) (bool, error) +} + +type LocalPageRegion struct { + Path string + Offset int64 + Length int +} + +type ContentCacheLocalPageRegions interface { + LocalPageRegions(hash string, offset int64, length int64, opts struct{ RoutingKey string }) ([]LocalPageRegion, error) +} + +type LocalFileRegion struct { + Path string + Offset int64 + Length int + Source string + LayerDigest string + DecompressedHash string +} + type ClipStorageInterface interface { ReadFile(node *common.ClipNode, dest []byte, offset int64) (int, error) Metadata() *common.ClipArchiveMetadata @@ -20,6 +48,14 @@ type ClipStorageInterface interface { Cleanup() error } +type ContextClipStorageInterface interface { + ReadFileContext(ctx context.Context, node *common.ClipNode, dest []byte, offset int64) (int, error) +} + +type LocalFileRegioner interface { + LocalFileRegion(ctx context.Context, node *common.ClipNode, offset int64, length int64) (LocalFileRegion, bool, error) +} + type ClipStorageCredentials struct { S3 *S3ClipStorageCredentials } @@ -34,6 +70,7 @@ type ClipStorageOpts struct { ContentCacheAvailable bool UseCheckpoints bool // Enable checkpoint-based partial decompression for OCI layers RegistryCredProvider interface{} // Registry authentication (for OCI storage) + ReadTraceObserver common.ReadTraceObserver } func NewClipStorage(opts ClipStorageOpts) (ClipStorageInterface, error) { @@ -103,6 +140,7 @@ func NewClipStorage(opts ClipStorageOpts) (ClipStorageInterface, error) { ContentCacheAvailable: opts.ContentCacheAvailable, DiskCacheDir: opts.CachePath, UseCheckpoints: opts.UseCheckpoints, + ReadTraceObserver: opts.ReadTraceObserver, }) case common.StorageModeLocal: storage, err = NewLocalClipStorage(metadata, LocalClipStorageOpts{