From 5f026c69556c84beed93f24153ef2284ddc06997 Mon Sep 17 00:00:00 2001 From: Alexandre Manhaes Savio Date: Thu, 19 Feb 2026 13:12:06 +0100 Subject: [PATCH] fix: harden multipart copy source-size handling Capture finalSize under lock, use HeadBlob source-size validation, and fail fast on impossible unmodified-copy ranges to avoid copy thrash. Co-Authored-By: Warp --- core/file.go | 66 +++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 57 insertions(+), 9 deletions(-) diff --git a/core/file.go b/core/file.go index 72c90c57..ec18e338 100644 --- a/core/file.go +++ b/core/file.go @@ -753,13 +753,16 @@ func (inode *Inode) sendUpload(priority int) bool { canComplete = canComplete && !inode.IsRangeLocked(0, inode.Attributes.Size, true) if canComplete && (inode.fileHandles == 0 || inode.forceFlush || atomic.LoadInt32(&inode.fs.wantFree) > 0) { + // Capture finalSize while we hold inode.mu and all parts are verified + // flushed. Reading Attributes.Size later can race with concurrent writes. + finalSize := inode.Attributes.Size // Complete the multipart upload inode.IsFlushing += inode.fs.flags.MaxParallelParts atomic.AddInt64(&inode.fs.stats.flushes, 1) atomic.AddInt64(&inode.fs.activeFlushers, 1) go func() { inode.mu.Lock() - inode.completeMultipart() + inode.completeMultipart(finalSize) inode.IsFlushing -= inode.fs.flags.MaxParallelParts inode.mu.Unlock() atomic.AddInt64(&inode.fs.activeFlushers, -1) @@ -1547,24 +1550,31 @@ func (inode *Inode) flushSmallObject() { inode.mu.Unlock() } -func (inode *Inode) copyUnmodifiedParts(numParts uint64) (err error) { +func (inode *Inode) copyUnmodifiedParts(numParts, finalSize uint64) (err error) { maxMerge := inode.fs.flags.MaxMergeCopyMB * 1024 * 1024 - // First collect ranges to be unaffected by sudden parallel changes + // First collect ranges of nil (unuploaded) parts. + // partRange() returns the full partition size for every part, but the + // last part of the file is typically shorter. Cap partEnd to finalSize + // so copy ranges never extend beyond the intended final object boundary. var ranges []uint64 var startPart, endPart uint64 var startOffset, endOffset uint64 + var maxRequiredSource uint64 for i := uint64(0); i < numParts; i++ { partOffset, partSize := inode.fs.partRange(i) partEnd := partOffset + partSize - if partEnd > inode.Attributes.Size { - partEnd = inode.Attributes.Size + if partEnd > finalSize { + partEnd = finalSize } if inode.mpu.Parts[i] == nil { if endPart == 0 { startPart, startOffset = i, partOffset } endPart, endOffset = i+1, partEnd + if endOffset > maxRequiredSource { + maxRequiredSource = endOffset + } if endOffset-startOffset >= maxMerge { ranges = append(ranges, startPart, startOffset, endOffset-startOffset) startPart, endPart = 0, 0 @@ -1588,7 +1598,40 @@ func (inode *Inode) copyUnmodifiedParts(numParts uint64) (err error) { guard := make(chan int, inode.fs.flags.MaxParallelCopy) var wg sync.WaitGroup inode.mu.Unlock() + // HeadBlob to get the actual source object size before UploadPartCopy. + var sourceSize uint64 + headResp, headErr := cloud.HeadBlob(&HeadBlobInput{Key: key}) + if headErr == nil { + sourceSize = headResp.Size + } else if mapAwsError(headErr) == syscall.ENOENT { + // Source object does not exist — nothing to copy + inode.mu.Lock() + return nil + } else { + log.Warnf("HeadBlob failed for %v during copyUnmodifiedParts: %v", key, headErr) + inode.mu.Lock() + return headErr + } + // Fail fast: if pending unmodified-copy ranges require bytes beyond + // the source object, copying cannot produce a consistent final object. + if maxRequiredSource > sourceSize { + log.Warnf( + "Source object %v is smaller than required for unmodified copy (source=%v, required=%v), treating as conflict", + key, sourceSize, maxRequiredSource, + ) + inode.mu.Lock() + return syscall.ERANGE + } for i := 0; i < len(ranges); i += 3 { + offset := ranges[i+1] + size := ranges[i+2] + // Clip copy range to actual source object boundaries. + if offset >= sourceSize { + continue + } + if offset+size > sourceSize { + size = sourceSize - offset + } guard <- i if err != nil { break @@ -1612,6 +1655,12 @@ func (inode *Inode) copyUnmodifiedParts(numParts uint64) (err error) { Size: size, }) if requestErr != nil { + // Treat stale source-size races as conflict, not hard EINVAL. + if mapAwsError(requestErr) == syscall.EINVAL && + strings.Contains(requestErr.Error(), "InvalidArgument") && + strings.Contains(requestErr.Error(), "Range specified is not valid for source object of size") { + requestErr = syscall.ERANGE + } log.Warnf("Failed to copy unmodified range %v-%v MB of object %v: %v", offset/1024/1024, (offset+size+1024*1024-1)/1024/1024, key, requestErr) err = requestErr @@ -1621,7 +1670,7 @@ func (inode *Inode) copyUnmodifiedParts(numParts uint64) (err error) { } wg.Done() <-guard - }(uint64(ranges[i]), uint64(ranges[i+1]), uint64(ranges[i+2])) + }(uint64(ranges[i]), offset, size) } wg.Wait() inode.mu.Lock() @@ -1732,19 +1781,18 @@ func (inode *Inode) flushPart(part uint64) { } // LOCKS_REQUIRED(inode.mu) -func (inode *Inode) completeMultipart() { +func (inode *Inode) completeMultipart(finalSize uint64) { // Server-side copy unmodified parts if inode.mpu == nil { // Multipart upload was canceled in the meantime (by a parallel conflict) => do not complete return } - finalSize := inode.Attributes.Size numParts := inode.fs.partNum(finalSize) numPartOffset, _ := inode.fs.partRange(numParts) if numPartOffset < finalSize { numParts++ } - err := inode.copyUnmodifiedParts(numParts) + err := inode.copyUnmodifiedParts(numParts, finalSize) if !(inode.CacheState == ST_CREATED || inode.CacheState == ST_MODIFIED) { // State changed, abort this flush (even if we get ENOENT) return