diff --git a/pkg/fileservice/aws_sdk_v2.go b/pkg/fileservice/aws_sdk_v2.go index 49bff5d58ad5a..37ab5df3753b0 100644 --- a/pkg/fileservice/aws_sdk_v2.go +++ b/pkg/fileservice/aws_sdk_v2.go @@ -475,16 +475,22 @@ func (a *AwsSDKv2) WriteMultipartParallel( defer wrapSizeMismatchErr(&err) options := normalizeParallelOption(opt) - if sizeHint != nil && *sizeHint < minMultipartPartSize { - return a.Write(ctx, key, r, sizeHint, options.Expire) - } if sizeHint != nil { + r = &exactSizeReader{ + R: r, + Expected: *sizeHint, + Key: key, + } + if *sizeHint < minMultipartPartSize { + return a.Write(ctx, key, r, sizeHint, options.Expire) + } expectedParts := (*sizeHint + options.PartSize - 1) / options.PartSize if expectedParts > maxMultipartParts { return moerr.NewInternalErrorNoCtxf("too many parts for multipart upload: %d", expectedParts) } } + parentCtx := ctx ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -543,7 +549,7 @@ func (a *AwsSDKv2) WriteMultipartParallel( defer func() { if err != nil { - _, abortErr := a.client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ + _, abortErr := a.client.AbortMultipartUpload(context.WithoutCancel(parentCtx), &s3.AbortMultipartUploadInput{ Bucket: ptrTo(a.bucket), Key: ptrTo(key), UploadId: output.UploadId, diff --git a/pkg/fileservice/io.go b/pkg/fileservice/io.go index 83dfa6df26d79..6f2684405087d 100644 --- a/pkg/fileservice/io.go +++ b/pkg/fileservice/io.go @@ -17,6 +17,8 @@ package fileservice import ( "io" "sync/atomic" + + "github.com/matrixorigin/matrixone/pkg/common/moerr" ) type readCloser struct { @@ -47,6 +49,36 @@ func (c *countingReader) Read(data []byte) (int, error) { return n, err } +type exactSizeReader struct { + R io.Reader + Expected int64 + Key string +} + +var _ io.Reader = new(exactSizeReader) + +func (r *exactSizeReader) Read(data []byte) (int, error) { + if len(data) == 0 { + return 0, nil + } + if r.Expected == 0 { + n, err := r.R.Read(data[:1]) + if n > 0 { + return 0, 
moerr.NewSizeNotMatchNoCtx(r.Key) + } + return 0, err + } + if int64(len(data)) > r.Expected { + data = data[:r.Expected] + } + n, err := r.R.Read(data) + r.Expected -= int64(n) + if err == io.EOF && r.Expected > 0 { + return n, moerr.NewSizeNotMatchNoCtx(r.Key) + } + return n, err +} + type writeCloser struct { w io.Writer closeFunc func() error diff --git a/pkg/fileservice/parallel_sdk_test.go b/pkg/fileservice/parallel_sdk_test.go index 5a7d699fa5ace..a107b58743aae 100644 --- a/pkg/fileservice/parallel_sdk_test.go +++ b/pkg/fileservice/parallel_sdk_test.go @@ -17,6 +17,7 @@ package fileservice import ( "bytes" "context" + "errors" "fmt" "io" "net/http" @@ -36,6 +37,28 @@ import ( costypes "github.com/tencentyun/cos-go-sdk-v5" ) +// failAfterBytesReader wraps an io.Reader and returns errAfter once +// failAfter bytes have been read. Reads up to failAfter bytes succeed normally. +type failAfterBytesReader struct { + r io.Reader + readSoFar int64 + failAfter int64 + errAfter error +} + +func (r *failAfterBytesReader) Read(p []byte) (int, error) { + if r.readSoFar >= r.failAfter { + return 0, r.errAfter + } + remaining := r.failAfter - r.readSoFar + if int64(len(p)) > remaining { + p = p[:remaining] + } + n, err := r.r.Read(p) + r.readSoFar += int64(n) + return n, err +} + func newMockAWSServer(t *testing.T, failPart int32) (*httptest.Server, *awsServerState) { t.Helper() state := &awsServerState{ @@ -725,3 +748,73 @@ func TestCOSMultipartCreateFail(t *testing.T) { t.Fatalf("expected create multipart error") } } + +// TestAwsParallelMultipartAbortOnReadError verifies that AbortMultipartUpload +// is sent when a read error occurs after multipart upload has been initiated. +// This covers the case where setErr cancels the derived context and the abort +// defer must use a non-canceled context. 
+func TestAwsParallelMultipartAbortOnReadError(t *testing.T) { + server, state := newMockAWSServer(t, 0) + defer server.Close() + state.uploadID = "uid-read-err" + + sdk := newTestAWSClient(t, server) + + // Provide enough data for the first readChunk to succeed (triggering + // multipart initiation), then fail on the next read. + data := bytes.Repeat([]byte("x"), int(minMultipartPartSize*2)) + r := &failAfterBytesReader{ + r: bytes.NewReader(data), + failAfter: minMultipartPartSize, + errAfter: errors.New("size does not match"), + } + size := int64(len(data)) + err := sdk.WriteMultipartParallel(context.Background(), "object", r, &size, &ParallelMultipartOption{ + PartSize: minMultipartPartSize, + Concurrency: 2, + }) + if err == nil { + t.Fatal("expected read error") + } + if len(state.completeBody) > 0 { + t.Fatal("expected no complete body") + } + if !state.aborted.Load() { + t.Fatal("expected abort request to be sent for size mismatch read error") + } +} + +// TestCOSParallelMultipartAbortOnReadError verifies that AbortMultipartUpload +// is sent when a read error occurs after multipart upload has been initiated +// for the COS SDK. 
+func TestCOSParallelMultipartAbortOnReadError(t *testing.T) { + server, state := newMockCOSServer(t, 0) + defer server.Close() + state.uploadID = "cos-uid-read-err" + + sdk := newTestCOSClient(t, server) + + data := bytes.Repeat([]byte("y"), int(minMultipartPartSize*2)) + r := &failAfterBytesReader{ + r: bytes.NewReader(data), + failAfter: minMultipartPartSize, + errAfter: errors.New("size does not match"), + } + size := int64(len(data)) + err := sdk.WriteMultipartParallel(context.Background(), "object", r, &size, &ParallelMultipartOption{ + PartSize: minMultipartPartSize, + Concurrency: 2, + }) + if err == nil { + t.Fatal("expected read error") + } + if state.completed.Load() { + t.Fatal("expected no complete multipart upload") + } + if len(state.completeBody) > 0 { + t.Fatal("expected no complete body") + } + if !state.aborted.Load() { + t.Fatal("expected abort request to be sent for size mismatch read error") + } +} diff --git a/pkg/fileservice/qcloud_sdk.go b/pkg/fileservice/qcloud_sdk.go index 165199aa37563..687788eb98f4d 100644 --- a/pkg/fileservice/qcloud_sdk.go +++ b/pkg/fileservice/qcloud_sdk.go @@ -106,6 +106,11 @@ func NewQCloudSDK( httpClient, ) + // Disable COS SDK built-in retry — MatrixOne wraps all operations in + // its own DoWithRetry, and SDK retry would double the request count + // and stretch failure latency. 
+ client.Conf.RetryOpt.Count = 0 + logutil.Info("new object storage", zap.Any("sdk", "qcloud"), zap.Any("arguments", args), @@ -113,7 +118,9 @@ func NewQCloudSDK( if !args.NoBucketValidation { // validate bucket - _, err := client.Bucket.Head(ctx, &cos.BucketHeadOptions{}) + _, err := DoWithRetry("cos bucket head", func() (*cos.Response, error) { + return client.Bucket.Head(ctx, &cos.BucketHeadOptions{}) + }, maxRetryAttemps, IsRetryableError) if err != nil { return nil, err } @@ -268,13 +275,34 @@ func (a *QCloudSDK) Write( } } else { - err = a.putObject( - ctx, - key, - r, - sizeHint, - expire, - ) + seeker, ok := r.(io.Seeker) + if !ok { + err := a.WriteMultipartParallel(ctx, key, r, sizeHint, &ParallelMultipartOption{ + PartSize: defaultParallelMultipartPartSize, + Concurrency: 1, + Expire: expire, + }) + if err != nil { + return err + } + return nil + } + offset, err := seeker.Seek(0, io.SeekCurrent) + if err != nil { + return err + } + _, err = DoWithRetry("write", func() (int, error) { + if _, err := seeker.Seek(offset, io.SeekStart); err != nil { + return 0, err + } + return 0, a.putObject( + ctx, + key, + r, + sizeHint, + expire, + ) + }, maxRetryAttemps, IsRetryableError) if err != nil { return err } @@ -297,16 +325,22 @@ func (a *QCloudSDK) WriteMultipartParallel( defer wrapSizeMismatchErr(&err) options := normalizeParallelOption(opt) - if sizeHint != nil && *sizeHint < minMultipartPartSize { - return a.Write(ctx, key, r, sizeHint, options.Expire) - } if sizeHint != nil { + r = &exactSizeReader{ + R: r, + Expected: *sizeHint, + Key: key, + } + if *sizeHint < minMultipartPartSize { + return a.Write(ctx, key, r, sizeHint, options.Expire) + } expectedParts := (*sizeHint + options.PartSize - 1) / options.PartSize if expectedParts > maxMultipartParts { return moerr.NewInternalErrorNoCtxf("too many parts for multipart upload: %d", expectedParts) } } + parentCtx := ctx ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -372,7 +406,10 @@ func (a 
*QCloudSDK) WriteMultipartParallel( defer func() { if err != nil { - _, _ = a.client.Object.AbortMultipartUpload(ctx, key, output.UploadID) + _, _ = DoWithRetry("cos abort multipart upload", func() (*cos.Response, error) { + return a.client.Object.AbortMultipartUpload( + context.WithoutCancel(parentCtx), key, output.UploadID) + }, maxRetryAttemps, IsRetryableError) } }() diff --git a/pkg/fileservice/qcloud_sdk_test.go b/pkg/fileservice/qcloud_sdk_test.go index 409b68e4eaadc..929a9554ee415 100644 --- a/pkg/fileservice/qcloud_sdk_test.go +++ b/pkg/fileservice/qcloud_sdk_test.go @@ -15,11 +15,19 @@ package fileservice import ( + "bytes" "context" + "errors" "fmt" + "io" "math/rand/v2" + "net/http" + "net/url" "strings" "testing" + + "github.com/matrixorigin/matrixone/pkg/common/moerr" + "github.com/tencentyun/cos-go-sdk-v5" ) func TestQCloudSDK(t *testing.T) { @@ -81,3 +89,119 @@ func TestQCloudSDK(t *testing.T) { }) } + +func TestQCloudSDKWriteRetriesSeekablePut(t *testing.T) { + data := bytes.Repeat([]byte("x"), int(smallObjectThreshold)) + size := int64(len(data)) + transport := &qcloudRetryPutTransport{} + reader := &trackingSeekReader{Reader: bytes.NewReader(data)} + + baseURL, err := url.Parse("http://cos.local") + if err != nil { + t.Fatal(err) + } + client := cos.NewClient( + &cos.BaseURL{BucketURL: baseURL}, + &http.Client{Transport: transport}, + ) + client.Conf.EnableCRC = false + client.Conf.RetryOpt.Count = 0 + + sdk := &QCloudSDK{ + name: "qcloud-retry-test", + client: client, + } + if err := sdk.Write(context.Background(), "object", reader, &size, nil); err != nil { + t.Fatal(err) + } + + if transport.calls != 2 { + t.Fatalf("expected 2 put attempts, got %d", transport.calls) + } + for i, body := range transport.bodies { + if !bytes.Equal(body, data) { + t.Fatalf("attempt %d body mismatch: got %d bytes, want %d", i+1, len(body), len(data)) + } + } + if reader.seekStartCount == 0 { + t.Fatalf("expected QCloudSDK.Write to seek back before retry") + } +} + 
+type qcloudRetryPutTransport struct { + calls int + bodies [][]byte +} + +func (t *qcloudRetryPutTransport) RoundTrip(req *http.Request) (*http.Response, error) { + t.calls++ + body, err := io.ReadAll(req.Body) + if err != nil { + return nil, err + } + t.bodies = append(t.bodies, body) + if t.calls == 1 { + return nil, errors.New("write: connection reset by peer") + } + return &http.Response{ + StatusCode: http.StatusOK, + Status: "200 OK", + Header: make(http.Header), + Body: io.NopCloser(bytes.NewReader(nil)), + Request: req, + }, nil +} + +type trackingSeekReader struct { + *bytes.Reader + seekStartCount int +} + +func (r *trackingSeekReader) Seek(offset int64, whence int) (int64, error) { + if offset == 0 && whence == io.SeekStart { + r.seekStartCount++ + } + return r.Reader.Seek(offset, whence) +} + +func TestQCloudSDKWriteNonSeekableFallbackPreservesSizeHint(t *testing.T) { + t.Run("empty", func(t *testing.T) { + server, state := newMockCOSServer(t, 0) + defer server.Close() + state.uploadID = "cos-size-mismatch-empty" + + sdk := newTestCOSClient(t, server) + size := int64(smallObjectThreshold) + err := sdk.Write(context.Background(), "object", nonSeekReader{r: bytes.NewReader(nil)}, &size, nil) + if !moerr.IsMoErrCode(err, moerr.ErrSizeNotMatch) { + t.Fatalf("expected size mismatch, got %v", err) + } + if state.putCount != 0 || len(state.parts) != 0 { + t.Fatalf("expected no successful upload for empty short reader") + } + }) + + t.Run("short", func(t *testing.T) { + server, state := newMockCOSServer(t, 0) + defer server.Close() + state.uploadID = "cos-size-mismatch-short" + + sdk := newTestCOSClient(t, server) + size := int64(smallObjectThreshold) + err := sdk.Write(context.Background(), "object", nonSeekReader{r: bytes.NewReader([]byte("short"))}, &size, nil) + if !moerr.IsMoErrCode(err, moerr.ErrSizeNotMatch) { + t.Fatalf("expected size mismatch, got %v", err) + } + if state.putCount != 0 || len(state.parts) != 0 || state.completed.Load() { + 
t.Fatalf("expected no committed object for short reader") + } + }) +} + +type nonSeekReader struct { + r io.Reader +} + +func (r nonSeekReader) Read(p []byte) (int, error) { + return r.r.Read(p) +}