Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 58 additions & 21 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"maps"
"net/http"
"os"
"sync"
"time"

"github.com/alecthomas/errors"
Expand Down Expand Up @@ -338,11 +339,12 @@ func filterHeaders(headers http.Header, skip ...string) http.Header {

// writeCloser wraps a pipe writer and waits for the HTTP request to complete.
//
// NOTE(review): this listing was captured from a diff view, so the field list
// appears twice. The first run (ending in closed) looks like the pre-change
// layout and the second run (ending in once/closeErr) like the post-change
// layout — confirm against the real file before relying on either.
type writeCloser struct {
// pw is the pipe writer that Close shuts down, either cleanly or with an error.
pw *io.PipeWriter
// done delivers the outcome of the HTTP request; Close receives from it.
done chan error
// ctx is inspected at close time to detect cancellation.
ctx context.Context
// cancel is a cause-carrying cancel function; its call sites are outside this view.
cancel context.CancelCauseFunc
// closed records that Close already ran, so a repeat call returns nil.
closed bool
// pw is the pipe writer that Close shuts down, either cleanly or with an error.
pw *io.PipeWriter
// done delivers the outcome of the HTTP request; the close path receives from it.
done chan error
// ctx is inspected at close time to detect cancellation and its cause.
ctx context.Context
// cancel is a cause-carrying cancel function; its call sites are outside this view.
cancel context.CancelCauseFunc
// once guards the close-and-wait step so it runs at most once.
once sync.Once
// closeErr caches the result of that single run; every Close call returns it.
closeErr error
}

func (wc *writeCloser) Write(p []byte) (int, error) {
Expand All @@ -355,23 +357,58 @@ func (wc *writeCloser) Abort(err error) error {
return wc.Close()
}

// Close is safe to call multiple times and from multiple goroutines (e.g. a
// deferred Close racing an explicit Abort): the underlying close-and-wait
// runs at most once, so the second caller will not deadlock waiting on a
// drained <-wc.done channel.
//
// NOTE(review): as rendered here the body contains two implementations back
// to back and the braces do not balance: first a variant guarded by the
// closed flag, then the sync.Once wrapper around doClose. This looks like
// removed and added diff lines shown together — confirm against the real
// file. The paragraph above describes the sync.Once variant.
func (wc *writeCloser) Close() error {
// Flag-based variant: a repeat call returns nil without waiting on wc.done.
if wc.closed {
return nil
}
wc.closed = true
// Context already cancelled: fail the pipe with that error, wait for the
// request to finish, and report the cancellation. Whatever arrives on
// wc.done is discarded on this path.
if err := wc.ctx.Err(); err != nil {
_ = wc.pw.CloseWithError(err)
<-wc.done
return errors.Wrap(err, "create operation cancelled")
}
// Clean close of the pipe; on failure still wait for the request before
// returning the pipe error.
if err := wc.pw.Close(); err != nil {
<-wc.done
return errors.Wrap(err, "failed to close pipe writer")
}
// Normal path: the request's own outcome decides the result.
err := <-wc.done
if err != nil {
return errors.Wrap(err, "request failed")
// sync.Once variant: run doClose a single time and cache its error so that
// every caller, including concurrent ones, gets the same value back.
wc.once.Do(func() {
wc.closeErr = wc.doClose()
})
return wc.closeErr
}

// doClose performs the real shutdown of an upload: it closes the pipe, waits
// for the HTTP request to finish, and then chooses which error to report.
//
// Error precedence, highest first:
//  1. a request error that carries *HTTPStatusError,
//  2. the cancellation cause of wc.ctx,
//  3. a failure closing the pipe writer,
//  4. any other error produced by the request.
//
// It receives from wc.done, so it is meant to run once per writeCloser; Close
// enforces that with sync.Once.
func (wc *writeCloser) doClose() error {
	// Snapshot the cancellation cause up front. context.Cause is used instead
	// of ctx.Err so that an Abort(cause) surfaces the caller's own reason;
	// when the parent context is cancelled on its own, Cause yields the same
	// value ctx.Err would.
	cause := context.Cause(wc.ctx)

	// Shut the upload pipe so the goroutine driving the request can finish.
	// On cancellation the cause is pushed through the pipe so an in-flight
	// Write observes it rather than EOF.
	var pipeErr error
	if cause == nil {
		pipeErr = wc.pw.Close()
	} else {
		_ = wc.pw.CloseWithError(cause)
	}

	// Block until the request goroutine reports back, both to release
	// connection resources and to learn what the server said.
	reqErr := <-wc.done

	// A typed status error from the server wins over every local symptom:
	// the server may have answered (e.g. with 403) before the context was
	// cancelled or the pipe torn down, and callers classify failures by
	// matching on *HTTPStatusError.
	var httpErr *HTTPStatusError
	switch {
	case errors.As(reqErr, &httpErr):
		return errors.Wrap(reqErr, "request failed")
	case cause != nil:
		return errors.Wrap(cause, "create operation cancelled")
	case pipeErr != nil:
		return errors.Wrap(pipeErr, "failed to close pipe writer")
	case reqErr != nil:
		return errors.Wrap(reqErr, "request failed")
	default:
		return nil
	}
}
101 changes: 101 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client_test

import (
"bytes"
"context"
"encoding/json"
"io"
"maps"
Expand All @@ -14,8 +15,10 @@ import (
"strings"
"sync"
"testing"
"time"

"github.com/alecthomas/assert/v2"
"github.com/alecthomas/errors"

"github.com/block/cachew/client"
)
Expand Down Expand Up @@ -308,3 +311,101 @@ func TestParseNamespaceInvalid(t *testing.T) {
_, err := client.ParseNamespace("_bad")
assert.Error(t, err)
}

// roundTripperFunc lets a plain function stand in for an http.RoundTripper,
// which keeps test transports short.
type roundTripperFunc func(*http.Request) (*http.Response, error)

// RoundTrip implements http.RoundTripper by invoking the function itself with
// the request and returning its results unchanged.
func (fn roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	return fn(req)
}

// TestCreateClosePreservesStatusErrorOnCancelledCtx covers the masked-403
// case in writeCloser.Close: the transport answers 403 and the caller's
// context is cancelled before Close runs. Close must still surface the typed
// *HTTPStatusError, not just the local cancellation, so that callers can
// classify the failure as a permission denial.
func TestCreateClosePreservesStatusErrorOnCancelledCtx(t *testing.T) {
	// responded is closed by the transport just before it returns the 403,
	// which lets the test place its cancel() after the response was built.
	responded := make(chan struct{})

	// The transport answers without reading the request body. That body is
	// the read side of the pipe created by Create; nothing is written to it
	// before Close, so reading it here would deadlock. Answering before or
	// without consuming the body is also what leads to the broken-pipe
	// symptom this test is about.
	forbidden := roundTripperFunc(func(req *http.Request) (*http.Response, error) {
		defer close(responded)
		return &http.Response{
			StatusCode: http.StatusForbidden,
			Body:       io.NopCloser(strings.NewReader("")),
			Header:     http.Header{},
			Request:    req,
		}, nil
	})

	c := client.NewWithHTTPClient("http://example.invalid", &http.Client{Transport: forbidden}).Namespace("test")
	defer c.Close()

	ctx, cancel := context.WithCancel(t.Context())
	defer cancel()

	wc, err := c.Create(ctx, client.NewKey("k"), nil, 0)
	assert.NoError(t, err)

	// Cancel only once the transport has produced its 403. Close then runs
	// with a cancelled context while the request's outcome is a status
	// error, which is the combination that used to hide the 403.
	<-responded
	cancel()

	got := wc.Close()
	var forbiddenErr *client.HTTPStatusError
	isStatusErr := errors.As(got, &forbiddenErr)
	assert.True(t, isStatusErr,
		"Close error must contain *HTTPStatusError, got: %v", got)
	assert.Equal(t, http.StatusForbidden, forbiddenErr.StatusCode)
}

// TestCreateCloseIdempotent checks that calling Close a second time (for
// example a deferred Close following an explicit Abort, or a cleanup
// goroutine running in parallel) returns promptly with the same error as the
// first call instead of blocking on the already-drained wc.done channel.
func TestCreateCloseIdempotent(t *testing.T) {
	// Transport that accepts every request with an empty 200 response.
	okTransport := roundTripperFunc(func(req *http.Request) (*http.Response, error) {
		resp := &http.Response{
			StatusCode: http.StatusOK,
			Body:       io.NopCloser(strings.NewReader("")),
			Header:     http.Header{},
			Request:    req,
		}
		return resp, nil
	})

	c := client.NewWithHTTPClient("http://example.invalid", &http.Client{Transport: okTransport}).Namespace("test")
	defer c.Close()

	wc, err := c.Create(t.Context(), client.NewKey("k"), nil, 0)
	assert.NoError(t, err)

	firstErr := wc.Close()
	assert.NoError(t, firstErr)

	// Run the second Close on its own goroutine and bound the wait, so that
	// a regression which makes it block on <-wc.done shows up as a clear
	// timeout failure rather than a hung test suite.
	result := make(chan error, 1)
	go func() {
		result <- wc.Close()
	}()

	timeout := time.After(2 * time.Second)
	select {
	case <-timeout:
		t.Fatal("second Close blocked; expected idempotent return")
	case secondErr := <-result:
		assert.Equal(t, firstErr, secondErr, "second Close must return the same error as the first")
	}
}