Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 51 additions & 27 deletions pkg/chipingress/batch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
"google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
Expand All @@ -36,18 +37,18 @@ type Client struct {
maxGRPCRequestSize int // configured max, used for metrics/error reporting
effectiveMaxRequestSize int // maxGRPCRequestSize minus grpcFramingOverhead, used for splitting
cloneEvent bool
maxConcurrentSends chan struct{}
batchInterval time.Duration
maxPublishTimeout time.Duration
messageBuffer chan *messageWithCallback
stopCh stopCh
log *zap.SugaredLogger
callbackWg sync.WaitGroup
shutdownTimeout time.Duration
shutdownOnce sync.Once
batcherDone chan struct{}
started bool
counters sync.Map // map[seqnumKey]*atomic.Uint64 for per-(source,type) seqnum, cleared on Stop()
maxConcurrentSends chan struct{}
batchInterval time.Duration
maxPublishTimeout time.Duration
messageBuffer chan *messageWithCallback
stopCh stopCh
log *zap.SugaredLogger
callbackWg sync.WaitGroup
shutdownTimeout time.Duration
shutdownOnce sync.Once
batcherDone chan struct{}
started bool
counters sync.Map // map[seqnumKey]*atomic.Uint64 for per-(source,type) seqnum, cleared on Stop()

metrics batchClientMetrics
}
Expand All @@ -73,18 +74,18 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) {
c := &Client{
client: client,
log: zap.NewNop().Sugar(),
batchSize: 10,
batchSize: 100,
maxGRPCRequestSize: 10 * 1024 * 1024,
effectiveMaxRequestSize: 10*1024*1024 - grpcFramingOverhead,
cloneEvent: true,
maxConcurrentSends: make(chan struct{}, 1),
messageBuffer: make(chan *messageWithCallback, 200),
batchInterval: 100 * time.Millisecond,
maxPublishTimeout: 5 * time.Second,
stopCh: make(chan struct{}),
callbackWg: sync.WaitGroup{},
shutdownTimeout: 5 * time.Second,
batcherDone: make(chan struct{}),
maxConcurrentSends: make(chan struct{}, 1),
messageBuffer: make(chan *messageWithCallback, 200),
batchInterval: 100 * time.Millisecond,
maxPublishTimeout: 5 * time.Second,
stopCh: make(chan struct{}),
callbackWg: sync.WaitGroup{},
shutdownTimeout: 5 * time.Second,
batcherDone: make(chan struct{}),
}

for _, opt := range opts {
Expand Down Expand Up @@ -322,24 +323,47 @@ func splitMessagesByRequestSize(messages []*messageWithCallback, maxRequestSize
return [][]*messageWithCallback{messages}
}

// A CloudEventBatch encodes its events as a repeated field, so the serialized
// request size is a fixed base (everything except the events) plus the
// independent contribution of each event. Accumulating per-event sizes keeps
// splitting O(n): previously each message re-serialized the whole growing
// batch via newBatchRequest -> proto.Size, costing O(n^2) bytes walked.
_, baseBytes := newBatchRequest(nil)

var batches [][]*messageWithCallback
// Size the first batch for the common case where everything fits in one
// request. Subsequent batches (only created when splitting) start small and
// grow, so we don't allocate a full-length backing array per split.
current := make([]*messageWithCallback, 0, len(messages))
currentBytes := baseBytes
for _, msg := range messages {
candidate := append(current, msg)
_, candidateBytes := newBatchRequest(candidate)
if len(current) > 0 && candidateBytes > maxRequestSize {
eventBytes := eventFieldSize(msg.event)
// Start a new batch once adding this event would exceed the limit, but
// always keep at least one event per batch (an oversized single event is
// rejected later in sendBatch).
if len(current) > 0 && currentBytes+eventBytes > maxRequestSize {
batches = append(batches, current)
current = []*messageWithCallback{msg}
continue
current = nil
currentBytes = baseBytes
}
current = candidate
current = append(current, msg)
currentBytes += eventBytes
}
if len(current) > 0 {
batches = append(batches, current)
}
return batches
}

// eventFieldSize returns the number of bytes a single event contributes to a
// CloudEventBatch: the repeated events field tag plus the length-delimited
// message. This matches proto.Size's accounting for that field exactly, so the
// running total stays identical to proto.Size(batch).
func eventFieldSize(event *chipingress.CloudEventPb) int {
const eventsFieldNumber = 1 // CloudEventBatch.events
return protowire.SizeTag(eventsFieldNumber) + protowire.SizeBytes(proto.Size(event))
}

func newBatchRequest(messages []*messageWithCallback) (*chipingress.CloudEventBatch, int) {
events := make([]*chipingress.CloudEventPb, len(messages))
for i, msg := range messages {
Expand Down
65 changes: 65 additions & 0 deletions pkg/chipingress/batch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ func TestNewBatchClient(t *testing.T) {
client, err := NewBatchClient(mocks.NewClient(t))
require.NoError(t, err)
assert.NotNil(t, client)
// Default batchSize is 100. Guard the default explicitly: it changes the
// gRPC payload size and events-per-callback for every caller that doesn't
// pass WithBatchSize, so an accidental revert should fail loudly.
assert.Equal(t, 100, client.batchSize)
})

t.Run("WithBatchSize", func(t *testing.T) {
Expand Down Expand Up @@ -1599,6 +1603,67 @@ func TestSplitMessagesByRequestSize(t *testing.T) {
assert.Len(t, batch, 1)
}
})

t.Run("eventFieldSize accumulation matches proto.Size for the whole batch", func(t *testing.T) {
msgs := []*messageWithCallback{
{event: largeTestEvent("a")},
{event: largeTestEvent("bb")},
{event: largeTestEvent("ccc")},
}
events := make([]*chipingress.CloudEventPb, len(msgs))
_, summed := newBatchRequest(nil)
for i, m := range msgs {
events[i] = m.event
summed += eventFieldSize(m.event)
}
// The incremental total used for splitting must equal the real
// serialized size; otherwise splits could under/over-count.
assert.Equal(t, proto.Size(&chipingress.CloudEventBatch{Events: events}), summed)
})

t.Run("every produced batch fits within the limit by proto.Size", func(t *testing.T) {
msgs := make([]*messageWithCallback, 0, 25)
for i := 0; i < 25; i++ {
msgs = append(msgs, &messageWithCallback{event: largeTestEvent(strconv.Itoa(i))})
}
// A limit that holds two events but forces several splits.
twoEvents := &chipingress.CloudEventBatch{Events: []*chipingress.CloudEventPb{msgs[0].event, msgs[1].event}}
maxRequestSize := proto.Size(twoEvents)

result := splitMessagesByRequestSize(msgs, maxRequestSize)
require.Greater(t, len(result), 1, "expected the batch to be split")

var total int
for _, batch := range result {
require.NotEmpty(t, batch)
events := make([]*chipingress.CloudEventPb, len(batch))
for i, m := range batch {
events[i] = m.event
}
assert.LessOrEqual(t, proto.Size(&chipingress.CloudEventBatch{Events: events}), maxRequestSize)
total += len(batch)
}
assert.Equal(t, len(msgs), total, "no events dropped across splits")
})

t.Run("event larger than the limit gets its own batch and is not dropped", func(t *testing.T) {
big := largeTestEvent("big")
// A limit below a single event's contribution forces every event into its
// own batch. Splitting keeps the event (sendBatch is responsible for
// rejecting it against maxGRPCRequestSize), never drops it.
maxRequestSize := eventFieldSize(big) - 1
msgs := []*messageWithCallback{
{event: big},
{event: largeTestEvent("small")},
}

result := splitMessagesByRequestSize(msgs, maxRequestSize)
require.Len(t, result, 2, "each oversized event must land in its own batch")
for _, batch := range result {
require.Len(t, batch, 1)
}
assert.Same(t, big, result[0][0].event, "oversized event preserved, not dropped")
})
}

func BenchmarkSendBatch(b *testing.B) {
Expand Down
85 changes: 85 additions & 0 deletions pkg/chipingress/batch/split_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package batch

import (
"strconv"
"testing"

"github.com/smartcontractkit/chainlink-common/pkg/chipingress"

Check failure on line 7 in pkg/chipingress/batch/split_bench_test.go

View workflow job for this annotation

GitHub Actions / lint-module (pkg/chipingress)

File is not properly formatted (goimports)
"google.golang.org/protobuf/proto"
)

// splitMessagesByRequestSizeQuadratic is the previous implementation, kept here
// only as a benchmark baseline. It re-serializes the whole growing batch on
// every message (O(n^2) bytes walked), whereas splitMessagesByRequestSize
// accumulates per-event sizes (O(n)).
func splitMessagesByRequestSizeQuadratic(messages []*messageWithCallback, maxRequestSize int) [][]*messageWithCallback {
if len(messages) == 0 {
return nil
}
if maxRequestSize <= 0 {
return [][]*messageWithCallback{messages}
}

var batches [][]*messageWithCallback
current := make([]*messageWithCallback, 0, len(messages))
for _, msg := range messages {
candidate := append(current, msg)
_, candidateBytes := newBatchRequest(candidate)
if len(current) > 0 && candidateBytes > maxRequestSize {
batches = append(batches, current)
current = []*messageWithCallback{msg}
continue
}
current = candidate
}
if len(current) > 0 {
batches = append(batches, current)
}
return batches
}

func benchSplitMessages(n int) []*messageWithCallback {
msgs := make([]*messageWithCallback, n)
for i := range msgs {
msgs[i] = &messageWithCallback{event: largeTestEvent(strconv.Itoa(i))}
}
return msgs
}

// benchSplitMaxRequestSize returns a limit that holds ~perBatch events, so the
// input is split into many sub-batches (the case where the quadratic cost bites).
func benchSplitMaxRequestSize(msgs []*messageWithCallback, perBatch int) int {
if perBatch > len(msgs) {
perBatch = len(msgs)
}
events := make([]*chipingress.CloudEventPb, perBatch)
for i := 0; i < perBatch; i++ {
events[i] = msgs[i].event
}
return proto.Size(&chipingress.CloudEventBatch{Events: events})
}

// BenchmarkSplitMessagesByRequestSize compares the linear (current) and
// quadratic (old) splitters across batch sizes.
//
// go test ./pkg/chipingress/batch -bench BenchmarkSplitMessagesByRequestSize -benchmem -run '^$'
func BenchmarkSplitMessagesByRequestSize(b *testing.B) {
for _, n := range []int{10, 100, 1000} {
msgs := benchSplitMessages(n)
maxRequestSize := benchSplitMaxRequestSize(msgs, 10)

b.Run("linear/n="+strconv.Itoa(n), func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
_ = splitMessagesByRequestSize(msgs, maxRequestSize)
}
})

b.Run("quadratic/n="+strconv.Itoa(n), func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
_ = splitMessagesByRequestSizeQuadratic(msgs, maxRequestSize)
}
})
}
}
Loading