From bd237fe2a534fe00f7d7860b7f876bd027ac2d44 Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Sat, 30 May 2026 02:03:01 -0400 Subject: [PATCH 1/2] chipingress/batch: incremental batch-split sizing + larger default batchSize splitMessagesByRequestSize re-serialized the entire growing batch on every message (newBatchRequest -> proto.Size), so splitting an n-event batch walked 1+2+...+n events worth of bytes. Accumulate each event's repeated-field contribution incrementally instead, sizing the batch in O(n). eventFieldSize computes the per-event cost via protowire and matches proto.Size exactly; subsequent split batches no longer pre-allocate a full-length backing array. Also raise the default NewBatchClient batchSize from 10 to 100 for ~10x more events per PublishBatch under sustained load. batchInterval (100ms) still bounds latency for partial batches, 100 stays below the default messageBuffer (200) so the size trigger still fires, and larger batches that exceed maxGRPCRequestSize are now split in O(n). Callers can still override via WithBatchSize. Benchmarks (BenchmarkSplitMessagesByRequestSize, ~10 events/batch): n=10 2940ns/1208B/22 allocs -> 580ns/168B/3 allocs n=100 30767ns/15536B/255 allocs -> 6335ns/4160B/56 allocs n=1000 322229ns/157616B/2558 allocs -> 64900ns/43040B/559 allocs Tests assert the incremental total equals proto.Size and that every produced sub-batch fits the limit with no events dropped. --- pkg/chipingress/batch/client.go | 78 ++++++++++++++------- pkg/chipingress/batch/client_test.go | 42 +++++++++++ pkg/chipingress/batch/split_bench_test.go | 85 +++++++++++++++++++++++ 3 files changed, 178 insertions(+), 27 deletions(-) create mode 100644 pkg/chipingress/batch/split_bench_test.go diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index 93c9efd049..0ec9902611 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/otel/attribute" otelmetric "go.opentelemetry.io/otel/metric" "go.uber.org/zap" + "google.golang.org/protobuf/encoding/protowire" "google.golang.org/protobuf/proto" "github.com/smartcontractkit/chainlink-common/pkg/chipingress" @@ -36,18 +37,18 @@ type Client struct { maxGRPCRequestSize int // configured max, used for metrics/error reporting effectiveMaxRequestSize int // maxGRPCRequestSize minus grpcFramingOverhead, used for splitting cloneEvent bool - maxConcurrentSends chan struct{} - batchInterval time.Duration - maxPublishTimeout time.Duration - messageBuffer chan *messageWithCallback - stopCh stopCh - log *zap.SugaredLogger - callbackWg sync.WaitGroup - shutdownTimeout time.Duration - shutdownOnce sync.Once - batcherDone chan struct{} - started bool - counters sync.Map // map[seqnumKey]*atomic.Uint64 for per-(source,type) seqnum, cleared on Stop() + maxConcurrentSends chan struct{} + batchInterval time.Duration + maxPublishTimeout time.Duration + messageBuffer chan *messageWithCallback + stopCh stopCh + log *zap.SugaredLogger + callbackWg sync.WaitGroup + shutdownTimeout time.Duration + shutdownOnce sync.Once + batcherDone chan struct{} + started bool + counters sync.Map // map[seqnumKey]*atomic.Uint64 for per-(source,type) seqnum, cleared on Stop() metrics batchClientMetrics } @@ -73,18 +74,18 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) { c := &Client{ client: client, log: zap.NewNop().Sugar(), - batchSize: 10, + batchSize: 100, maxGRPCRequestSize: 10 * 1024 * 1024, effectiveMaxRequestSize: 10*1024*1024 - grpcFramingOverhead, cloneEvent: true, - maxConcurrentSends: make(chan struct{}, 1), - messageBuffer: make(chan *messageWithCallback, 200), - batchInterval: 100 * time.Millisecond, - maxPublishTimeout: 5 * time.Second, - stopCh: make(chan struct{}), - callbackWg: sync.WaitGroup{}, - shutdownTimeout: 5 * time.Second, - batcherDone: make(chan struct{}), + maxConcurrentSends: make(chan struct{}, 1), + messageBuffer: make(chan *messageWithCallback, 200), + batchInterval: 100 * time.Millisecond, + maxPublishTimeout: 5 * time.Second, + stopCh: make(chan struct{}), + callbackWg: sync.WaitGroup{}, + shutdownTimeout: 5 * time.Second, + batcherDone: make(chan struct{}), } for _, opt := range opts { @@ -322,17 +323,31 @@ func splitMessagesByRequestSize(messages []*messageWithCallback, maxRequestSize return [][]*messageWithCallback{messages} } + // A CloudEventBatch encodes its events as a repeated field, so the serialized + // request size is a fixed base (everything except the events) plus the + // independent contribution of each event. Accumulating per-event sizes keeps + // splitting O(n): previously each message re-serialized the whole growing + // batch via newBatchRequest -> proto.Size, costing O(n^2) bytes walked. + _, baseBytes := newBatchRequest(nil) + var batches [][]*messageWithCallback + // Size the first batch for the common case where everything fits in one + // request. Subsequent batches (only created when splitting) start small and + // grow, so we don't allocate a full-length backing array per split. current := make([]*messageWithCallback, 0, len(messages)) + currentBytes := baseBytes for _, msg := range messages { - candidate := append(current, msg) - _, candidateBytes := newBatchRequest(candidate) - if len(current) > 0 && candidateBytes > maxRequestSize { + eventBytes := eventFieldSize(msg.event) + // Start a new batch once adding this event would exceed the limit, but + // always keep at least one event per batch (an oversized single event is + // rejected later in sendBatch). + if len(current) > 0 && currentBytes+eventBytes > maxRequestSize { batches = append(batches, current) - current = []*messageWithCallback{msg} - continue + current = nil + currentBytes = baseBytes } - current = candidate + current = append(current, msg) + currentBytes += eventBytes } if len(current) > 0 { batches = append(batches, current) @@ -340,6 +355,15 @@ func splitMessagesByRequestSize(messages []*messageWithCallback, maxRequestSize return batches } +// eventFieldSize returns the number of bytes a single event contributes to a +// CloudEventBatch: the repeated events field tag plus the length-delimited +// message. This matches proto.Size's accounting for that field exactly, so the +// running total stays identical to proto.Size(batch). +func eventFieldSize(event *chipingress.CloudEventPb) int { + const eventsFieldNumber = 1 // CloudEventBatch.events + return protowire.SizeTag(eventsFieldNumber) + protowire.SizeBytes(proto.Size(event)) +} + func newBatchRequest(messages []*messageWithCallback) (*chipingress.CloudEventBatch, int) { events := make([]*chipingress.CloudEventPb, len(messages)) for i, msg := range messages { diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go index e698e75c61..e2985056f1 100644 --- a/pkg/chipingress/batch/client_test.go +++ b/pkg/chipingress/batch/client_test.go @@ -1599,6 +1599,48 @@ func TestSplitMessagesByRequestSize(t *testing.T) { assert.Len(t, batch, 1) } }) + + t.Run("eventFieldSize accumulation matches proto.Size for the whole batch", func(t *testing.T) { + msgs := []*messageWithCallback{ + {event: largeTestEvent("a")}, + {event: largeTestEvent("bb")}, + {event: largeTestEvent("ccc")}, + } + events := make([]*chipingress.CloudEventPb, len(msgs)) + _, summed := newBatchRequest(nil) + for i, m := range msgs { + events[i] = m.event + summed += eventFieldSize(m.event) + } + // The incremental total used for splitting must equal the real + // serialized size; otherwise splits could under/over-count. + assert.Equal(t, proto.Size(&chipingress.CloudEventBatch{Events: events}), summed) + }) + + t.Run("every produced batch fits within the limit by proto.Size", func(t *testing.T) { + msgs := make([]*messageWithCallback, 0, 25) + for i := 0; i < 25; i++ { + msgs = append(msgs, &messageWithCallback{event: largeTestEvent(strconv.Itoa(i))}) + } + // A limit that holds two events but forces several splits. + twoEvents := &chipingress.CloudEventBatch{Events: []*chipingress.CloudEventPb{msgs[0].event, msgs[1].event}} + maxRequestSize := proto.Size(twoEvents) + + result := splitMessagesByRequestSize(msgs, maxRequestSize) + require.Greater(t, len(result), 1, "expected the batch to be split") + + var total int + for _, batch := range result { + require.NotEmpty(t, batch) + events := make([]*chipingress.CloudEventPb, len(batch)) + for i, m := range batch { + events[i] = m.event + } + assert.LessOrEqual(t, proto.Size(&chipingress.CloudEventBatch{Events: events}), maxRequestSize) + total += len(batch) + } + assert.Equal(t, len(msgs), total, "no events dropped across splits") + }) } func BenchmarkSendBatch(b *testing.B) { diff --git a/pkg/chipingress/batch/split_bench_test.go b/pkg/chipingress/batch/split_bench_test.go new file mode 100644 index 0000000000..b5c3a467cf --- /dev/null +++ b/pkg/chipingress/batch/split_bench_test.go @@ -0,0 +1,85 @@ +package batch + +import ( + "strconv" + "testing" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "google.golang.org/protobuf/proto" +) + +// splitMessagesByRequestSizeQuadratic is the previous implementation, kept here +// only as a benchmark baseline. It re-serializes the whole growing batch on +// every message (O(n^2) bytes walked), whereas splitMessagesByRequestSize +// accumulates per-event sizes (O(n)). +func splitMessagesByRequestSizeQuadratic(messages []*messageWithCallback, maxRequestSize int) [][]*messageWithCallback { + if len(messages) == 0 { + return nil + } + if maxRequestSize <= 0 { + return [][]*messageWithCallback{messages} + } + + var batches [][]*messageWithCallback + current := make([]*messageWithCallback, 0, len(messages)) + for _, msg := range messages { + candidate := append(current, msg) + _, candidateBytes := newBatchRequest(candidate) + if len(current) > 0 && candidateBytes > maxRequestSize { + batches = append(batches, current) + current = []*messageWithCallback{msg} + continue + } + current = candidate + } + if len(current) > 0 { + batches = append(batches, current) + } + return batches +} + +func benchSplitMessages(n int) []*messageWithCallback { + msgs := make([]*messageWithCallback, n) + for i := range msgs { + msgs[i] = &messageWithCallback{event: largeTestEvent(strconv.Itoa(i))} + } + return msgs +} + +// benchSplitMaxRequestSize returns a limit that holds ~perBatch events, so the +// input is split into many sub-batches (the case where the quadratic cost bites). +func benchSplitMaxRequestSize(msgs []*messageWithCallback, perBatch int) int { + if perBatch > len(msgs) { + perBatch = len(msgs) + } + events := make([]*chipingress.CloudEventPb, perBatch) + for i := 0; i < perBatch; i++ { + events[i] = msgs[i].event + } + return proto.Size(&chipingress.CloudEventBatch{Events: events}) +} + +// BenchmarkSplitMessagesByRequestSize compares the linear (current) and +// quadratic (old) splitters across batch sizes. +// +// go test ./pkg/chipingress/batch -bench BenchmarkSplitMessagesByRequestSize -benchmem -run '^$' +func BenchmarkSplitMessagesByRequestSize(b *testing.B) { + for _, n := range []int{10, 100, 1000} { + msgs := benchSplitMessages(n) + maxRequestSize := benchSplitMaxRequestSize(msgs, 10) + + b.Run("linear/n="+strconv.Itoa(n), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + _ = splitMessagesByRequestSize(msgs, maxRequestSize) + } + }) + + b.Run("quadratic/n="+strconv.Itoa(n), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + _ = splitMessagesByRequestSizeQuadratic(msgs, maxRequestSize) + } + }) + } +} From e1640ff84b89e4860cbedb1635704e555fa092d0 Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Sun, 31 May 2026 18:46:18 -0400 Subject: [PATCH 2/2] chipingress/batch: cover default batchSize and oversized-single-event split Assert NewBatchClient defaults batchSize to 100 (not just that WithBatchSize sets it) so the payload/size-trigger behavior change for callers that don't override is guarded against accidental revert. Add a split test for an event larger than the request limit: it must land in its own batch and never be dropped (sendBatch rejects it against maxGRPCRequestSize). --- pkg/chipingress/batch/client_test.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go index e2985056f1..0792a3f747 100644 --- a/pkg/chipingress/batch/client_test.go +++ b/pkg/chipingress/batch/client_test.go @@ -28,6 +28,10 @@ func TestNewBatchClient(t *testing.T) { client, err := NewBatchClient(mocks.NewClient(t)) require.NoError(t, err) assert.NotNil(t, client) + // Default batchSize is 100. Guard the default explicitly: it changes the + // gRPC payload size and events-per-callback for every caller that doesn't + // pass WithBatchSize, so an accidental revert should fail loudly. + assert.Equal(t, 100, client.batchSize) }) t.Run("WithBatchSize", func(t *testing.T) { @@ -1641,6 +1645,25 @@ func TestSplitMessagesByRequestSize(t *testing.T) { } assert.Equal(t, len(msgs), total, "no events dropped across splits") }) + + t.Run("event larger than the limit gets its own batch and is not dropped", func(t *testing.T) { + big := largeTestEvent("big") + // A limit below a single event's contribution forces every event into its + // own batch. Splitting keeps the event (sendBatch is responsible for + // rejecting it against maxGRPCRequestSize), never drops it. + maxRequestSize := eventFieldSize(big) - 1 + msgs := []*messageWithCallback{ + {event: big}, + {event: largeTestEvent("small")}, + } + + result := splitMessagesByRequestSize(msgs, maxRequestSize) + require.Len(t, result, 2, "each oversized event must land in its own batch") + for _, batch := range result { + require.Len(t, batch, 1) + } + assert.Same(t, big, result[0][0].event, "oversized event preserved, not dropped") + }) } func BenchmarkSendBatch(b *testing.B) {