Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions pkg/fileservice/aws_sdk_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,16 +475,22 @@ func (a *AwsSDKv2) WriteMultipartParallel(
defer wrapSizeMismatchErr(&err)

options := normalizeParallelOption(opt)
if sizeHint != nil && *sizeHint < minMultipartPartSize {
return a.Write(ctx, key, r, sizeHint, options.Expire)
}
if sizeHint != nil {
r = &exactSizeReader{
R: r,
Expected: *sizeHint,
Key: key,
}
if *sizeHint < minMultipartPartSize {
return a.Write(ctx, key, r, sizeHint, options.Expire)
}
expectedParts := (*sizeHint + options.PartSize - 1) / options.PartSize
if expectedParts > maxMultipartParts {
return moerr.NewInternalErrorNoCtxf("too many parts for multipart upload: %d", expectedParts)
}
}

parentCtx := ctx
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand Down Expand Up @@ -543,7 +549,7 @@ func (a *AwsSDKv2) WriteMultipartParallel(

defer func() {
if err != nil {
_, abortErr := a.client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
_, abortErr := a.client.AbortMultipartUpload(context.WithoutCancel(parentCtx), &s3.AbortMultipartUploadInput{
Bucket: ptrTo(a.bucket),
Key: ptrTo(key),
UploadId: output.UploadId,
Expand Down
32 changes: 32 additions & 0 deletions pkg/fileservice/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package fileservice
import (
"io"
"sync/atomic"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
)

type readCloser struct {
Expand Down Expand Up @@ -47,6 +49,36 @@ func (c *countingReader) Read(data []byte) (int, error) {
return n, err
}

type exactSizeReader struct {
R io.Reader
Expected int64
Key string
}

var _ io.Reader = new(exactSizeReader)

func (r *exactSizeReader) Read(data []byte) (int, error) {
if len(data) == 0 {
return 0, nil
}
if r.Expected == 0 {
n, err := r.R.Read(data[:1])
if n > 0 {
return 0, moerr.NewSizeNotMatchNoCtx(r.Key)
}
return 0, err
}
if int64(len(data)) > r.Expected {
data = data[:r.Expected]
}
n, err := r.R.Read(data)
r.Expected -= int64(n)
if err == io.EOF && r.Expected > 0 {
return n, moerr.NewSizeNotMatchNoCtx(r.Key)
}
return n, err
}

type writeCloser struct {
w io.Writer
closeFunc func() error
Expand Down
93 changes: 93 additions & 0 deletions pkg/fileservice/parallel_sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package fileservice
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -36,6 +37,28 @@ import (
costypes "github.com/tencentyun/cos-go-sdk-v5"
)

// failAfterBytesReader wraps an io.Reader and returns errAfter once
// totalBytes have been read. Reads up to failAfter bytes succeed normally.
type failAfterBytesReader struct {
r io.Reader
readSoFar int64
failAfter int64
errAfter error
}

func (r *failAfterBytesReader) Read(p []byte) (int, error) {
if r.readSoFar >= r.failAfter {
return 0, r.errAfter
}
remaining := r.failAfter - r.readSoFar
if int64(len(p)) > remaining {
p = p[:remaining]
}
n, err := r.r.Read(p)
r.readSoFar += int64(n)
return n, err
}

func newMockAWSServer(t *testing.T, failPart int32) (*httptest.Server, *awsServerState) {
t.Helper()
state := &awsServerState{
Expand Down Expand Up @@ -725,3 +748,73 @@ func TestCOSMultipartCreateFail(t *testing.T) {
t.Fatalf("expected create multipart error")
}
}

// TestAwsParallelMultipartAbortOnReadError verifies that AbortMultipartUpload
// is sent when a read error occurs after multipart upload has been initiated.
// This covers the case where setErr cancels the derived context and the abort
// defer must use a non-canceled context.
func TestAwsParallelMultipartAbortOnReadError(t *testing.T) {
server, state := newMockAWSServer(t, 0)
defer server.Close()
state.uploadID = "uid-read-err"

sdk := newTestAWSClient(t, server)

// Provide enough data for the first readChunk to succeed (triggering
// multipart initiation), then fail on the next read.
data := bytes.Repeat([]byte("x"), int(minMultipartPartSize*2))
r := &failAfterBytesReader{
r: bytes.NewReader(data),
failAfter: minMultipartPartSize,
errAfter: errors.New("size does not match"),
}
size := int64(len(data))
err := sdk.WriteMultipartParallel(context.Background(), "object", r, &size, &ParallelMultipartOption{
PartSize: minMultipartPartSize,
Concurrency: 2,
})
if err == nil {
t.Fatal("expected read error")
}
if len(state.completeBody) > 0 {
t.Fatal("expected no complete body")
}
if !state.aborted.Load() {
t.Fatal("expected abort request to be sent for size mismatch read error")
}
}

// TestCOSParallelMultipartAbortOnReadError verifies that AbortMultipartUpload
// is sent when a read error occurs after multipart upload has been initiated
// for the COS SDK.
func TestCOSParallelMultipartAbortOnReadError(t *testing.T) {
server, state := newMockCOSServer(t, 0)
defer server.Close()
state.uploadID = "cos-uid-read-err"

sdk := newTestCOSClient(t, server)

data := bytes.Repeat([]byte("y"), int(minMultipartPartSize*2))
r := &failAfterBytesReader{
r: bytes.NewReader(data),
failAfter: minMultipartPartSize,
errAfter: errors.New("size does not match"),
}
size := int64(len(data))
err := sdk.WriteMultipartParallel(context.Background(), "object", r, &size, &ParallelMultipartOption{
PartSize: minMultipartPartSize,
Concurrency: 2,
})
if err == nil {
t.Fatal("expected read error")
}
if state.completed.Load() {
t.Fatal("expected no complete multipart upload")
}
if len(state.completeBody) > 0 {
t.Fatal("expected no complete body")
}
if !state.aborted.Load() {
t.Fatal("expected abort request to be sent for size mismatch read error")
}
}
61 changes: 49 additions & 12 deletions pkg/fileservice/qcloud_sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,21 @@ func NewQCloudSDK(
httpClient,
)

// Disable COS SDK built-in retry — MatrixOne wraps all operations in
// its own DoWithRetry, and SDK retry would double the request count
// and stretch failure latency.
client.Conf.RetryOpt.Count = 0

logutil.Info("new object storage",
zap.Any("sdk", "qcloud"),
zap.Any("arguments", args),
)

if !args.NoBucketValidation {
// validate bucket
_, err := client.Bucket.Head(ctx, &cos.BucketHeadOptions{})
_, err := DoWithRetry("cos bucket head", func() (*cos.Response, error) {
return client.Bucket.Head(ctx, &cos.BucketHeadOptions{})
}, maxRetryAttemps, IsRetryableError)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -268,13 +275,34 @@ func (a *QCloudSDK) Write(
}

} else {
err = a.putObject(
ctx,
key,
r,
sizeHint,
expire,
)
seeker, ok := r.(io.Seeker)
if !ok {
err := a.WriteMultipartParallel(ctx, key, r, sizeHint, &ParallelMultipartOption{
PartSize: defaultParallelMultipartPartSize,
Concurrency: 1,
Expire: expire,
})
if err != nil {
return err
}
return nil
}
offset, err := seeker.Seek(0, io.SeekCurrent)
if err != nil {
return err
}
_, err = DoWithRetry("write", func() (int, error) {
if _, err := seeker.Seek(offset, io.SeekStart); err != nil {
return 0, err
}
return 0, a.putObject(
ctx,
key,
r,
sizeHint,
expire,
)
}, maxRetryAttemps, IsRetryableError)
if err != nil {
return err
}
Expand All @@ -297,16 +325,22 @@ func (a *QCloudSDK) WriteMultipartParallel(
defer wrapSizeMismatchErr(&err)

options := normalizeParallelOption(opt)
if sizeHint != nil && *sizeHint < minMultipartPartSize {
return a.Write(ctx, key, r, sizeHint, options.Expire)
}
if sizeHint != nil {
r = &exactSizeReader{
R: r,
Expected: *sizeHint,
Key: key,
}
if *sizeHint < minMultipartPartSize {
return a.Write(ctx, key, r, sizeHint, options.Expire)
}
expectedParts := (*sizeHint + options.PartSize - 1) / options.PartSize
if expectedParts > maxMultipartParts {
return moerr.NewInternalErrorNoCtxf("too many parts for multipart upload: %d", expectedParts)
}
}

parentCtx := ctx
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand Down Expand Up @@ -372,7 +406,10 @@ func (a *QCloudSDK) WriteMultipartParallel(

defer func() {
if err != nil {
_, _ = a.client.Object.AbortMultipartUpload(ctx, key, output.UploadID)
_, _ = DoWithRetry("cos abort multipart upload", func() (*cos.Response, error) {
return a.client.Object.AbortMultipartUpload(
context.WithoutCancel(parentCtx), key, output.UploadID)
}, maxRetryAttemps, IsRetryableError)
}
}()

Expand Down
Loading
Loading