From 82c3abdcab7893ae754b1df31bc3ec9eba3e1b37 Mon Sep 17 00:00:00 2001 From: jiangxinmeng Date: Wed, 10 Jun 2026 15:51:24 +0800 Subject: [PATCH 1/5] fix: retry qcloud put object with seekable reader (cherry picked from commit c1d47a0b167db7fce2d682eb1dc2f3869ea0c68a) --- pkg/fileservice/qcloud_sdk.go | 46 +++++++++-- pkg/fileservice/qcloud_sdk_test.go | 120 +++++++++++++++++++++++++++++ 2 files changed, 159 insertions(+), 7 deletions(-) diff --git a/pkg/fileservice/qcloud_sdk.go b/pkg/fileservice/qcloud_sdk.go index 165199aa37563..95de727758bf3 100644 --- a/pkg/fileservice/qcloud_sdk.go +++ b/pkg/fileservice/qcloud_sdk.go @@ -28,6 +28,7 @@ import ( "sort" "strconv" "sync" + "sync/atomic" "time" "github.com/matrixorigin/matrixone/pkg/common/moerr" @@ -268,13 +269,44 @@ func (a *QCloudSDK) Write( } } else { - err = a.putObject( - ctx, - key, - r, - sizeHint, - expire, - ) + seeker, ok := r.(io.Seeker) + if !ok { + var n atomic.Int64 + if sizeHint != nil { + r = &countingReader{ + R: r, + C: &n, + } + } + err := a.WriteMultipartParallel(ctx, key, r, sizeHint, &ParallelMultipartOption{ + PartSize: defaultParallelMultipartPartSize, + Concurrency: 1, + Expire: expire, + }) + if err != nil { + return err + } + if sizeHint != nil && n.Load() != *sizeHint { + return moerr.NewSizeNotMatchNoCtx(key) + } + return nil + } + offset, err := seeker.Seek(0, io.SeekCurrent) + if err != nil { + return err + } + _, err = DoWithRetry("write", func() (int, error) { + if _, err := seeker.Seek(offset, io.SeekStart); err != nil { + return 0, err + } + return 0, a.putObject( + ctx, + key, + r, + sizeHint, + expire, + ) + }, maxRetryAttemps, IsRetryableError) if err != nil { return err } diff --git a/pkg/fileservice/qcloud_sdk_test.go b/pkg/fileservice/qcloud_sdk_test.go index 409b68e4eaadc..dc40ddfbdd985 100644 --- a/pkg/fileservice/qcloud_sdk_test.go +++ b/pkg/fileservice/qcloud_sdk_test.go @@ -15,11 +15,19 @@ package fileservice import ( + "bytes" "context" + "errors" "fmt" + "io" "math/rand/v2" + "net/http" + "net/url" "strings" "testing" + + "github.com/matrixorigin/matrixone/pkg/common/moerr" + "github.com/tencentyun/cos-go-sdk-v5" ) func TestQCloudSDK(t *testing.T) { @@ -81,3 +89,115 @@ func TestQCloudSDK(t *testing.T) { }) } + +func TestQCloudSDKWriteRetriesSeekablePut(t *testing.T) { + data := bytes.Repeat([]byte("x"), int(smallObjectThreshold)) + size := int64(len(data)) + transport := &qcloudRetryPutTransport{} + reader := &trackingSeekReader{Reader: bytes.NewReader(data)} + + baseURL, err := url.Parse("http://cos.local") + if err != nil { + t.Fatal(err) + } + client := cos.NewClient( + &cos.BaseURL{BucketURL: baseURL}, + &http.Client{Transport: transport}, + ) + client.Conf.EnableCRC = false + client.Conf.RetryOpt.Count = 0 + + sdk := &QCloudSDK{ + name: "qcloud-retry-test", + client: client, + } + if err := sdk.Write(context.Background(), "object", reader, &size, nil); err != nil { + t.Fatal(err) + } + + if transport.calls != 2 { + t.Fatalf("expected 2 put attempts, got %d", transport.calls) + } + for i, body := range transport.bodies { + if !bytes.Equal(body, data) { + t.Fatalf("attempt %d body mismatch: got %d bytes, want %d", i+1, len(body), len(data)) + } + } + if reader.seekStartCount == 0 { + t.Fatalf("expected QCloudSDK.Write to seek back before retry") + } +} + +type qcloudRetryPutTransport struct { + calls int + bodies [][]byte +} + +func (t *qcloudRetryPutTransport) RoundTrip(req *http.Request) (*http.Response, error) { + t.calls++ + body, err := io.ReadAll(req.Body) + if err != nil { + return nil, err + } + t.bodies = append(t.bodies, body) + if t.calls == 1 { + return nil, errors.New("write: connection reset by peer") + } + return &http.Response{ + StatusCode: http.StatusOK, + Status: "200 OK", + Header: make(http.Header), + Body: io.NopCloser(bytes.NewReader(nil)), + Request: req, + }, nil +} + +type trackingSeekReader struct { + *bytes.Reader + seekStartCount int +} + +func (r *trackingSeekReader) Seek(offset int64, whence int) (int64, error) { + if offset == 0 && whence == io.SeekStart { + r.seekStartCount++ + } + return r.Reader.Seek(offset, whence) +} + +func TestQCloudSDKWriteNonSeekableFallbackPreservesSizeHint(t *testing.T) { + t.Run("empty", func(t *testing.T) { + server, state := newMockCOSServer(t, 0) + defer server.Close() + state.uploadID = "cos-size-mismatch-empty" + + sdk := newTestCOSClient(t, server) + size := int64(smallObjectThreshold) + err := sdk.Write(context.Background(), "object", nonSeekReader{r: bytes.NewReader(nil)}, &size, nil) + if !moerr.IsMoErrCode(err, moerr.ErrSizeNotMatch) { + t.Fatalf("expected size mismatch, got %v", err) + } + if state.putCount != 0 || len(state.parts) != 0 { + t.Fatalf("expected no successful upload for empty short reader") + } + }) + + t.Run("short", func(t *testing.T) { + server, _ := newMockCOSServer(t, 0) + defer server.Close() + + sdk := newTestCOSClient(t, server) + size := int64(smallObjectThreshold) + err := sdk.Write(context.Background(), "object", nonSeekReader{r: bytes.NewReader([]byte("short"))}, &size, nil) + if !moerr.IsMoErrCode(err, moerr.ErrSizeNotMatch) { + t.Fatalf("expected size mismatch, got %v", err) + } + }) +} + +type nonSeekReader struct { + r io.Reader +} + +func (r nonSeekReader) Read(p []byte) (int, error) { + return r.r.Read(p) +} From 8c7186eceeaf0a975770097b8891a84c9e40b712 Mon Sep 17 00:00:00 2001 From: jiangxinmeng Date: Wed, 10 Jun 2026 17:46:16 +0800 Subject: [PATCH 2/5] fix: reject multipart size mismatch before upload (cherry picked from commit ec2b815aff5adc88817e1167343e9e25ff388459) --- pkg/fileservice/aws_sdk_v2.go | 11 +++++++--- pkg/fileservice/io.go | 32 ++++++++++++++++++++++++++++++ pkg/fileservice/qcloud_sdk.go | 22 ++++++++------------ pkg/fileservice/qcloud_sdk_test.go | 6 +++++- 4 files changed, 53 insertions(+), 18 deletions(-) diff --git a/pkg/fileservice/aws_sdk_v2.go b/pkg/fileservice/aws_sdk_v2.go index 49bff5d58ad5a..16f8714f99611 100644 --- a/pkg/fileservice/aws_sdk_v2.go +++ b/pkg/fileservice/aws_sdk_v2.go @@ -475,10 +475,15 @@ func (a *AwsSDKv2) WriteMultipartParallel( defer wrapSizeMismatchErr(&err) options := normalizeParallelOption(opt) - if sizeHint != nil && *sizeHint < minMultipartPartSize { - return a.Write(ctx, key, r, sizeHint, options.Expire) - } if sizeHint != nil { + r = &exactSizeReader{ + R: r, + Expected: *sizeHint, + Key: key, + } + if *sizeHint < minMultipartPartSize { + return a.Write(ctx, key, r, sizeHint, options.Expire) + } expectedParts := (*sizeHint + options.PartSize - 1) / options.PartSize if expectedParts > maxMultipartParts { return moerr.NewInternalErrorNoCtxf("too many parts for multipart upload: %d", expectedParts) diff --git a/pkg/fileservice/io.go b/pkg/fileservice/io.go index 83dfa6df26d79..6f2684405087d 100644 --- a/pkg/fileservice/io.go +++ b/pkg/fileservice/io.go @@ -17,6 +17,8 @@ package fileservice import ( "io" "sync/atomic" + + "github.com/matrixorigin/matrixone/pkg/common/moerr" ) type readCloser struct { @@ -47,6 +49,36 @@ func (c *countingReader) Read(data []byte) (int, error) { return n, err } +type exactSizeReader struct { + R io.Reader + Expected int64 + Key string +} + +var _ io.Reader = new(exactSizeReader) + +func (r *exactSizeReader) Read(data []byte) (int, error) { + if len(data) == 0 { + return 0, nil + } + if r.Expected == 0 { + n, err := r.R.Read(data[:1]) + if n > 0 { + return 0, moerr.NewSizeNotMatchNoCtx(r.Key) + } + return 0, err + } + if int64(len(data)) > r.Expected { + data = data[:r.Expected] + } + n, err := r.R.Read(data) + r.Expected -= int64(n) + if err == io.EOF && r.Expected > 0 { + return n, moerr.NewSizeNotMatchNoCtx(r.Key) + } + return n, err +} + type writeCloser struct { w io.Writer closeFunc func() error diff --git a/pkg/fileservice/qcloud_sdk.go b/pkg/fileservice/qcloud_sdk.go index 95de727758bf3..80c86a5ef9558 100644 --- a/pkg/fileservice/qcloud_sdk.go +++ b/pkg/fileservice/qcloud_sdk.go @@ -28,7 +28,6 @@ import ( "sort" "strconv" "sync" - "sync/atomic" "time" "github.com/matrixorigin/matrixone/pkg/common/moerr" @@ -271,13 +270,6 @@ func (a *QCloudSDK) Write( } else { seeker, ok := r.(io.Seeker) if !ok { - var n atomic.Int64 - if sizeHint != nil { - r = &countingReader{ - R: r, - C: &n, - } - } err := a.WriteMultipartParallel(ctx, key, r, sizeHint, &ParallelMultipartOption{ PartSize: defaultParallelMultipartPartSize, Concurrency: 1, @@ -286,9 +278,6 @@ func (a *QCloudSDK) Write( if err != nil { return err } - if sizeHint != nil && n.Load() != *sizeHint { - return moerr.NewSizeNotMatchNoCtx(key) - } return nil } offset, err := seeker.Seek(0, io.SeekCurrent) @@ -329,10 +318,15 @@ func (a *QCloudSDK) WriteMultipartParallel( defer wrapSizeMismatchErr(&err) options := normalizeParallelOption(opt) - if sizeHint != nil && *sizeHint < minMultipartPartSize { - return a.Write(ctx, key, r, sizeHint, options.Expire) - } if sizeHint != nil { + r = &exactSizeReader{ + R: r, + Expected: *sizeHint, + Key: key, + } + if *sizeHint < minMultipartPartSize { + return a.Write(ctx, key, r, sizeHint, options.Expire) + } expectedParts := (*sizeHint + options.PartSize - 1) / options.PartSize if expectedParts > maxMultipartParts { return moerr.NewInternalErrorNoCtxf("too many parts for multipart upload: %d", expectedParts) diff --git a/pkg/fileservice/qcloud_sdk_test.go b/pkg/fileservice/qcloud_sdk_test.go index dc40ddfbdd985..929a9554ee415 100644 --- a/pkg/fileservice/qcloud_sdk_test.go +++ b/pkg/fileservice/qcloud_sdk_test.go @@ -182,8 +182,9 @@ func TestQCloudSDKWriteNonSeekableFallbackPreservesSizeHint(t *testing.T) { }) t.Run("short", func(t *testing.T) { - server, _ := newMockCOSServer(t, 0) + server, state := newMockCOSServer(t, 0) defer server.Close() + state.uploadID = "cos-size-mismatch-short" sdk := newTestCOSClient(t, server) size := int64(smallObjectThreshold) @@ -191,6 +192,9 @@ func TestQCloudSDKWriteNonSeekableFallbackPreservesSizeHint(t *testing.T) { if !moerr.IsMoErrCode(err, moerr.ErrSizeNotMatch) { t.Fatalf("expected size mismatch, got %v", err) } + if state.putCount != 0 || len(state.parts) != 0 || state.completed.Load() { + t.Fatalf("expected no committed object for short reader") + } }) } From 28c855260e92368e62de8465a664987f6cf7647e Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 <51114574+jiangxinmeng1@users.noreply.github.com> Date: Wed, 10 Jun 2026 20:12:12 +0800 Subject: [PATCH 3/5] fix: use non-canceled context for multipart upload abort When setErr cancels the derived context in WriteMultipartParallel, the deferred abort call was using the already-canceled context, causing the AbortMultipartUpload request to be dropped. This leaked incomplete multipart uploads on COS/AWS for read error paths (e.g., size mismatch errors that reach setErr before CompleteMultipartUpload). Save the parent context before WithCancel and use context.WithoutCancel(parentCtx) for the abort defer so the abort request can always proceed regardless of context cancellation. Add tests that assert AbortMultipartUpload is sent for size-mismatch read errors in both QCloud and AWS multipart paths. Co-Authored-By: Claude Fable 5 (cherry picked from commit 698abf712d79127d8521a89714cd726e3c011142) --- pkg/fileservice/aws_sdk_v2.go | 3 +- pkg/fileservice/parallel_sdk_test.go | 93 ++++++++++++++++++++++++++++ pkg/fileservice/qcloud_sdk.go | 4 +- 3 files changed, 98 insertions(+), 2 deletions(-) diff --git a/pkg/fileservice/aws_sdk_v2.go b/pkg/fileservice/aws_sdk_v2.go index 16f8714f99611..37ab5df3753b0 100644 --- a/pkg/fileservice/aws_sdk_v2.go +++ b/pkg/fileservice/aws_sdk_v2.go @@ -490,6 +490,7 @@ func (a *AwsSDKv2) WriteMultipartParallel( } } + parentCtx := ctx ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -548,7 +549,7 @@ func (a *AwsSDKv2) WriteMultipartParallel( defer func() { if err != nil { - _, abortErr := a.client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ + _, abortErr := a.client.AbortMultipartUpload(context.WithoutCancel(parentCtx), &s3.AbortMultipartUploadInput{ Bucket: ptrTo(a.bucket), Key: ptrTo(key), UploadId: output.UploadId, diff --git a/pkg/fileservice/parallel_sdk_test.go b/pkg/fileservice/parallel_sdk_test.go index 5a7d699fa5ace..a107b58743aae 100644 --- a/pkg/fileservice/parallel_sdk_test.go +++ b/pkg/fileservice/parallel_sdk_test.go @@ -17,6 +17,7 @@ package fileservice import ( "bytes" "context" + "errors" "fmt" "io" "net/http" @@ -36,6 +37,28 @@ import ( costypes "github.com/tencentyun/cos-go-sdk-v5" ) +// failAfterBytesReader wraps an io.Reader and returns errAfter once +// totalBytes have been read. Reads up to failAfter bytes succeed normally. +type failAfterBytesReader struct { + r io.Reader + readSoFar int64 + failAfter int64 + errAfter error +} + +func (r *failAfterBytesReader) Read(p []byte) (int, error) { + if r.readSoFar >= r.failAfter { + return 0, r.errAfter + } + remaining := r.failAfter - r.readSoFar + if int64(len(p)) > remaining { + p = p[:remaining] + } + n, err := r.r.Read(p) + r.readSoFar += int64(n) + return n, err +} + func newMockAWSServer(t *testing.T, failPart int32) (*httptest.Server, *awsServerState) { t.Helper() state := &awsServerState{ @@ -725,3 +748,73 @@ func TestCOSMultipartCreateFail(t *testing.T) { t.Fatalf("expected create multipart error") } } + +// TestAwsParallelMultipartAbortOnReadError verifies that AbortMultipartUpload +// is sent when a read error occurs after multipart upload has been initiated. +// This covers the case where setErr cancels the derived context and the abort +// defer must use a non-canceled context. +func TestAwsParallelMultipartAbortOnReadError(t *testing.T) { + server, state := newMockAWSServer(t, 0) + defer server.Close() + state.uploadID = "uid-read-err" + + sdk := newTestAWSClient(t, server) + + // Provide enough data for the first readChunk to succeed (triggering + // multipart initiation), then fail on the next read. + data := bytes.Repeat([]byte("x"), int(minMultipartPartSize*2)) + r := &failAfterBytesReader{ + r: bytes.NewReader(data), + failAfter: minMultipartPartSize, + errAfter: errors.New("size does not match"), + } + size := int64(len(data)) + err := sdk.WriteMultipartParallel(context.Background(), "object", r, &size, &ParallelMultipartOption{ + PartSize: minMultipartPartSize, + Concurrency: 2, + }) + if err == nil { + t.Fatal("expected read error") + } + if len(state.completeBody) > 0 { + t.Fatal("expected no complete body") + } + if !state.aborted.Load() { + t.Fatal("expected abort request to be sent for size mismatch read error") + } +} + +// TestCOSParallelMultipartAbortOnReadError verifies that AbortMultipartUpload +// is sent when a read error occurs after multipart upload has been initiated +// for the COS SDK. +func TestCOSParallelMultipartAbortOnReadError(t *testing.T) { + server, state := newMockCOSServer(t, 0) + defer server.Close() + state.uploadID = "cos-uid-read-err" + + sdk := newTestCOSClient(t, server) + + data := bytes.Repeat([]byte("y"), int(minMultipartPartSize*2)) + r := &failAfterBytesReader{ + r: bytes.NewReader(data), + failAfter: minMultipartPartSize, + errAfter: errors.New("size does not match"), + } + size := int64(len(data)) + err := sdk.WriteMultipartParallel(context.Background(), "object", r, &size, &ParallelMultipartOption{ + PartSize: minMultipartPartSize, + Concurrency: 2, + }) + if err == nil { + t.Fatal("expected read error") + } + if state.completed.Load() { + t.Fatal("expected no complete multipart upload") + } + if len(state.completeBody) > 0 { + t.Fatal("expected no complete body") + } + if !state.aborted.Load() { + t.Fatal("expected abort request to be sent for size mismatch read error") + } +} diff --git a/pkg/fileservice/qcloud_sdk.go b/pkg/fileservice/qcloud_sdk.go index 80c86a5ef9558..8c2fd06287f96 100644 --- a/pkg/fileservice/qcloud_sdk.go +++ b/pkg/fileservice/qcloud_sdk.go @@ -333,6 +333,7 @@ func (a *QCloudSDK) WriteMultipartParallel( } } + parentCtx := ctx ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -398,7 +399,8 @@ func (a *QCloudSDK) WriteMultipartParallel( defer func() { if err != nil { - _, _ = a.client.Object.AbortMultipartUpload(ctx, key, output.UploadID) + _, _ = a.client.Object.AbortMultipartUpload( + context.WithoutCancel(parentCtx), key, output.UploadID) } }() From b1584083e990aaf049c3d2a1fef24f95d6845d99 Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 <51114574+jiangxinmeng1@users.noreply.github.com> Date: Wed, 10 Jun 2026 20:38:40 +0800 Subject: [PATCH 4/5] fix: disable COS SDK built-in retry to avoid double retry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The COS SDK enables retry by default (RetryOpt.Count = 3). MatrixOne wraps all operations in its own DoWithRetry, so one transient failure was retried twice — first by the SDK and then by MatrixOne's outer loop — multiplying request count and stretching failure latency. Set client.Conf.RetryOpt.Count = 0 in NewQCloudSDK so only MatrixOne's DoWithRetry handles retries. Co-Authored-By: Claude Fable 5 (cherry picked from commit f5cb84cb3076d538714d7017020ad5df4a4dd48d) --- pkg/fileservice/qcloud_sdk.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/fileservice/qcloud_sdk.go b/pkg/fileservice/qcloud_sdk.go index 8c2fd06287f96..d402f0d984c60 100644 --- a/pkg/fileservice/qcloud_sdk.go +++ b/pkg/fileservice/qcloud_sdk.go @@ -106,6 +106,11 @@ func NewQCloudSDK( httpClient, ) + // Disable COS SDK built-in retry — MatrixOne wraps all operations in + // its own DoWithRetry, and SDK retry would double the request count + // and stretch failure latency. + client.Conf.RetryOpt.Count = 0 + logutil.Info("new object storage", zap.Any("sdk", "qcloud"), zap.Any("arguments", args), From e0b1539a3336b054a480e26c4c5f5cc25a6e182b Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 <51114574+jiangxinmeng1@users.noreply.github.com> Date: Wed, 10 Jun 2026 21:09:13 +0800 Subject: [PATCH 5/5] fix: wrap Bucket.Head and AbortMultipartUpload in DoWithRetry client.Conf.RetryOpt.Count = 0 disables COS SDK built-in retries globally, but two direct SDK calls were not wrapped in MatrixOne's DoWithRetry: - client.Bucket.Head during NewQCloudSDK initialization - AbortMultipartUpload during WriteMultipartParallel error cleanup Both are now wrapped so transient COS failures won't fail startup or drop cleanup. Co-Authored-By: Claude Fable 5 (cherry picked from commit 5704cd30c8dd72dcdc382b7daab0b0d6418a523a) --- pkg/fileservice/qcloud_sdk.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/fileservice/qcloud_sdk.go b/pkg/fileservice/qcloud_sdk.go index d402f0d984c60..687788eb98f4d 100644 --- a/pkg/fileservice/qcloud_sdk.go +++ b/pkg/fileservice/qcloud_sdk.go @@ -118,7 +118,9 @@ func NewQCloudSDK( if !args.NoBucketValidation { // validate bucket - _, err := client.Bucket.Head(ctx, &cos.BucketHeadOptions{}) + _, err := DoWithRetry("cos bucket head", func() (*cos.Response, error) { + return client.Bucket.Head(ctx, &cos.BucketHeadOptions{}) + }, maxRetryAttemps, IsRetryableError) if err != nil { return nil, err } @@ -404,8 +406,10 @@ func (a *QCloudSDK) WriteMultipartParallel( defer func() { if err != nil { - _, _ = a.client.Object.AbortMultipartUpload( - context.WithoutCancel(parentCtx), key, output.UploadID) + _, _ = DoWithRetry("cos abort multipart upload", func() (*cos.Response, error) { + return a.client.Object.AbortMultipartUpload( + context.WithoutCancel(parentCtx), key, output.UploadID) + }, maxRetryAttemps, IsRetryableError) } }()