Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 57 additions & 9 deletions core/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,13 +753,16 @@ func (inode *Inode) sendUpload(priority int) bool {
canComplete = canComplete && !inode.IsRangeLocked(0, inode.Attributes.Size, true)

if canComplete && (inode.fileHandles == 0 || inode.forceFlush || atomic.LoadInt32(&inode.fs.wantFree) > 0) {
// Capture finalSize while we hold inode.mu and all parts are verified
// flushed. Reading Attributes.Size later can race with concurrent writes.
finalSize := inode.Attributes.Size
// Complete the multipart upload
inode.IsFlushing += inode.fs.flags.MaxParallelParts
atomic.AddInt64(&inode.fs.stats.flushes, 1)
atomic.AddInt64(&inode.fs.activeFlushers, 1)
go func() {
inode.mu.Lock()
inode.completeMultipart()
inode.completeMultipart(finalSize)
inode.IsFlushing -= inode.fs.flags.MaxParallelParts
inode.mu.Unlock()
atomic.AddInt64(&inode.fs.activeFlushers, -1)
Expand Down Expand Up @@ -1547,24 +1550,31 @@ func (inode *Inode) flushSmallObject() {
inode.mu.Unlock()
}

func (inode *Inode) copyUnmodifiedParts(numParts uint64) (err error) {
func (inode *Inode) copyUnmodifiedParts(numParts, finalSize uint64) (err error) {
maxMerge := inode.fs.flags.MaxMergeCopyMB * 1024 * 1024

// First collect ranges to be unaffected by sudden parallel changes
// First collect ranges of nil (unuploaded) parts.
// partRange() returns the full partition size for every part, but the
// last part of the file is typically shorter. Cap partEnd to finalSize
// so copy ranges never extend beyond the intended final object boundary.
var ranges []uint64
var startPart, endPart uint64
var startOffset, endOffset uint64
var maxRequiredSource uint64
for i := uint64(0); i < numParts; i++ {
partOffset, partSize := inode.fs.partRange(i)
partEnd := partOffset + partSize
if partEnd > inode.Attributes.Size {
partEnd = inode.Attributes.Size
if partEnd > finalSize {
partEnd = finalSize
}
if inode.mpu.Parts[i] == nil {
if endPart == 0 {
startPart, startOffset = i, partOffset
}
endPart, endOffset = i+1, partEnd
if endOffset > maxRequiredSource {
maxRequiredSource = endOffset
}
if endOffset-startOffset >= maxMerge {
ranges = append(ranges, startPart, startOffset, endOffset-startOffset)
startPart, endPart = 0, 0
Expand All @@ -1588,7 +1598,40 @@ func (inode *Inode) copyUnmodifiedParts(numParts uint64) (err error) {
guard := make(chan int, inode.fs.flags.MaxParallelCopy)
var wg sync.WaitGroup
inode.mu.Unlock()
// HeadBlob to get the actual source object size before UploadPartCopy.
var sourceSize uint64
headResp, headErr := cloud.HeadBlob(&HeadBlobInput{Key: key})
if headErr == nil {
sourceSize = headResp.Size
} else if mapAwsError(headErr) == syscall.ENOENT {
// Source object does not exist — nothing to copy
inode.mu.Lock()
return nil
} else {
log.Warnf("HeadBlob failed for %v during copyUnmodifiedParts: %v", key, headErr)
inode.mu.Lock()
return headErr
}
// Fail fast: if pending unmodified-copy ranges require bytes beyond
// the source object, copying cannot produce a consistent final object.
if maxRequiredSource > sourceSize {
log.Warnf(
"Source object %v is smaller than required for unmodified copy (source=%v, required=%v), treating as conflict",
key, sourceSize, maxRequiredSource,
)
inode.mu.Lock()
return syscall.ERANGE
}
for i := 0; i < len(ranges); i += 3 {
offset := ranges[i+1]
size := ranges[i+2]
// Clip copy range to actual source object boundaries.
if offset >= sourceSize {
continue
}
if offset+size > sourceSize {
size = sourceSize - offset
}
guard <- i
if err != nil {
break
Expand All @@ -1612,6 +1655,12 @@ func (inode *Inode) copyUnmodifiedParts(numParts uint64) (err error) {
Size: size,
})
if requestErr != nil {
// Treat stale source-size races as conflict, not hard EINVAL.
if mapAwsError(requestErr) == syscall.EINVAL &&
strings.Contains(requestErr.Error(), "InvalidArgument") &&
strings.Contains(requestErr.Error(), "Range specified is not valid for source object of size") {
requestErr = syscall.ERANGE
}
log.Warnf("Failed to copy unmodified range %v-%v MB of object %v: %v",
offset/1024/1024, (offset+size+1024*1024-1)/1024/1024, key, requestErr)
err = requestErr
Expand All @@ -1621,7 +1670,7 @@ func (inode *Inode) copyUnmodifiedParts(numParts uint64) (err error) {
}
wg.Done()
<-guard
}(uint64(ranges[i]), uint64(ranges[i+1]), uint64(ranges[i+2]))
}(uint64(ranges[i]), offset, size)
}
wg.Wait()
inode.mu.Lock()
Expand Down Expand Up @@ -1732,19 +1781,18 @@ func (inode *Inode) flushPart(part uint64) {
}

// LOCKS_REQUIRED(inode.mu)
func (inode *Inode) completeMultipart() {
func (inode *Inode) completeMultipart(finalSize uint64) {
// Server-side copy unmodified parts
if inode.mpu == nil {
// Multipart upload was canceled in the meantime (by a parallel conflict) => do not complete
return
}
finalSize := inode.Attributes.Size
numParts := inode.fs.partNum(finalSize)
numPartOffset, _ := inode.fs.partRange(numParts)
if numPartOffset < finalSize {
numParts++
}
err := inode.copyUnmodifiedParts(numParts)
err := inode.copyUnmodifiedParts(numParts, finalSize)
if !(inode.CacheState == ST_CREATED || inode.CacheState == ST_MODIFIED) {
// State changed, abort this flush (even if we get ENOENT)
return
Expand Down
Loading