diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index 93c9efd049..0ec9902611 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/otel/attribute" otelmetric "go.opentelemetry.io/otel/metric" "go.uber.org/zap" + "google.golang.org/protobuf/encoding/protowire" "google.golang.org/protobuf/proto" "github.com/smartcontractkit/chainlink-common/pkg/chipingress" @@ -36,18 +37,18 @@ type Client struct { maxGRPCRequestSize int // configured max, used for metrics/error reporting effectiveMaxRequestSize int // maxGRPCRequestSize minus grpcFramingOverhead, used for splitting cloneEvent bool - maxConcurrentSends chan struct{} - batchInterval time.Duration - maxPublishTimeout time.Duration - messageBuffer chan *messageWithCallback - stopCh stopCh - log *zap.SugaredLogger - callbackWg sync.WaitGroup - shutdownTimeout time.Duration - shutdownOnce sync.Once - batcherDone chan struct{} - started bool - counters sync.Map // map[seqnumKey]*atomic.Uint64 for per-(source,type) seqnum, cleared on Stop() + maxConcurrentSends chan struct{} + batchInterval time.Duration + maxPublishTimeout time.Duration + messageBuffer chan *messageWithCallback + stopCh stopCh + log *zap.SugaredLogger + callbackWg sync.WaitGroup + shutdownTimeout time.Duration + shutdownOnce sync.Once + batcherDone chan struct{} + started bool + counters sync.Map // map[seqnumKey]*atomic.Uint64 for per-(source,type) seqnum, cleared on Stop() metrics batchClientMetrics } @@ -73,18 +74,18 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) { c := &Client{ client: client, log: zap.NewNop().Sugar(), - batchSize: 10, + batchSize: 100, maxGRPCRequestSize: 10 * 1024 * 1024, effectiveMaxRequestSize: 10*1024*1024 - grpcFramingOverhead, cloneEvent: true, - maxConcurrentSends: make(chan struct{}, 1), - messageBuffer: make(chan *messageWithCallback, 200), - batchInterval: 100 * time.Millisecond, - maxPublishTimeout: 5 * time.Second, - stopCh: make(chan struct{}), - callbackWg: sync.WaitGroup{}, - shutdownTimeout: 5 * time.Second, - batcherDone: make(chan struct{}), + maxConcurrentSends: make(chan struct{}, 1), + messageBuffer: make(chan *messageWithCallback, 200), + batchInterval: 100 * time.Millisecond, + maxPublishTimeout: 5 * time.Second, + stopCh: make(chan struct{}), + callbackWg: sync.WaitGroup{}, + shutdownTimeout: 5 * time.Second, + batcherDone: make(chan struct{}), } for _, opt := range opts { @@ -322,17 +323,31 @@ func splitMessagesByRequestSize(messages []*messageWithCallback, maxRequestSize return [][]*messageWithCallback{messages} } + // A CloudEventBatch encodes its events as a repeated field, so the serialized + // request size is a fixed base (everything except the events) plus the + // independent contribution of each event. Accumulating per-event sizes keeps + // splitting O(n): previously each message re-serialized the whole growing + // batch via newBatchRequest -> proto.Size, costing O(n^2) bytes walked. + _, baseBytes := newBatchRequest(nil) + var batches [][]*messageWithCallback + // Size the first batch for the common case where everything fits in one + // request. Subsequent batches (only created when splitting) start small and + // grow, so we don't allocate a full-length backing array per split. current := make([]*messageWithCallback, 0, len(messages)) + currentBytes := baseBytes for _, msg := range messages { - candidate := append(current, msg) - _, candidateBytes := newBatchRequest(candidate) - if len(current) > 0 && candidateBytes > maxRequestSize { + eventBytes := eventFieldSize(msg.event) + // Start a new batch once adding this event would exceed the limit, but + // always keep at least one event per batch (an oversized single event is + // rejected later in sendBatch). + if len(current) > 0 && currentBytes+eventBytes > maxRequestSize { batches = append(batches, current) - current = []*messageWithCallback{msg} - continue + current = nil + currentBytes = baseBytes } - current = candidate + current = append(current, msg) + currentBytes += eventBytes } if len(current) > 0 { batches = append(batches, current) @@ -340,6 +355,15 @@ func splitMessagesByRequestSize(messages []*messageWithCallback, maxRequestSize return batches } +// eventFieldSize returns the number of bytes a single event contributes to a +// CloudEventBatch: the repeated events field tag plus the length-delimited +// message. This matches proto.Size's accounting for that field exactly, so the +// running total stays identical to proto.Size(batch). +func eventFieldSize(event *chipingress.CloudEventPb) int { + const eventsFieldNumber = 1 // CloudEventBatch.events + return protowire.SizeTag(eventsFieldNumber) + protowire.SizeBytes(proto.Size(event)) +} + func newBatchRequest(messages []*messageWithCallback) (*chipingress.CloudEventBatch, int) { events := make([]*chipingress.CloudEventPb, len(messages)) for i, msg := range messages { diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go index e698e75c61..0792a3f747 100644 --- a/pkg/chipingress/batch/client_test.go +++ b/pkg/chipingress/batch/client_test.go @@ -28,6 +28,10 @@ func TestNewBatchClient(t *testing.T) { client, err := NewBatchClient(mocks.NewClient(t)) require.NoError(t, err) assert.NotNil(t, client) + // Default batchSize is 100. Guard the default explicitly: it changes the + // gRPC payload size and events-per-callback for every caller that doesn't + // pass WithBatchSize, so an accidental revert should fail loudly. + assert.Equal(t, 100, client.batchSize) }) t.Run("WithBatchSize", func(t *testing.T) { @@ -1599,6 +1603,67 @@ func TestSplitMessagesByRequestSize(t *testing.T) { assert.Len(t, batch, 1) } }) + + t.Run("eventFieldSize accumulation matches proto.Size for the whole batch", func(t *testing.T) { + msgs := []*messageWithCallback{ + {event: largeTestEvent("a")}, + {event: largeTestEvent("bb")}, + {event: largeTestEvent("ccc")}, + } + events := make([]*chipingress.CloudEventPb, len(msgs)) + _, summed := newBatchRequest(nil) + for i, m := range msgs { + events[i] = m.event + summed += eventFieldSize(m.event) + } + // The incremental total used for splitting must equal the real + // serialized size; otherwise splits could under/over-count. + assert.Equal(t, proto.Size(&chipingress.CloudEventBatch{Events: events}), summed) + }) + + t.Run("every produced batch fits within the limit by proto.Size", func(t *testing.T) { + msgs := make([]*messageWithCallback, 0, 25) + for i := 0; i < 25; i++ { + msgs = append(msgs, &messageWithCallback{event: largeTestEvent(strconv.Itoa(i))}) + } + // A limit that holds two events but forces several splits. + twoEvents := &chipingress.CloudEventBatch{Events: []*chipingress.CloudEventPb{msgs[0].event, msgs[1].event}} + maxRequestSize := proto.Size(twoEvents) + + result := splitMessagesByRequestSize(msgs, maxRequestSize) + require.Greater(t, len(result), 1, "expected the batch to be split") + + var total int + for _, batch := range result { + require.NotEmpty(t, batch) + events := make([]*chipingress.CloudEventPb, len(batch)) + for i, m := range batch { + events[i] = m.event + } + assert.LessOrEqual(t, proto.Size(&chipingress.CloudEventBatch{Events: events}), maxRequestSize) + total += len(batch) + } + assert.Equal(t, len(msgs), total, "no events dropped across splits") + }) + + t.Run("event larger than the limit gets its own batch and is not dropped", func(t *testing.T) { + big := largeTestEvent("big") + // A limit below a single event's contribution forces every event into its + // own batch. Splitting keeps the event (sendBatch is responsible for + // rejecting it against maxGRPCRequestSize), never drops it. + maxRequestSize := eventFieldSize(big) - 1 + msgs := []*messageWithCallback{ + {event: big}, + {event: largeTestEvent("small")}, + } + + result := splitMessagesByRequestSize(msgs, maxRequestSize) + require.Len(t, result, 2, "each oversized event must land in its own batch") + for _, batch := range result { + require.Len(t, batch, 1) + } + assert.Same(t, big, result[0][0].event, "oversized event preserved, not dropped") + }) } func BenchmarkSendBatch(b *testing.B) { diff --git a/pkg/chipingress/batch/split_bench_test.go b/pkg/chipingress/batch/split_bench_test.go new file mode 100644 index 0000000000..b5c3a467cf --- /dev/null +++ b/pkg/chipingress/batch/split_bench_test.go @@ -0,0 +1,85 @@ +package batch + +import ( + "strconv" + "testing" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "google.golang.org/protobuf/proto" +) + +// splitMessagesByRequestSizeQuadratic is the previous implementation, kept here +// only as a benchmark baseline. It re-serializes the whole growing batch on +// every message (O(n^2) bytes walked), whereas splitMessagesByRequestSize +// accumulates per-event sizes (O(n)). +func splitMessagesByRequestSizeQuadratic(messages []*messageWithCallback, maxRequestSize int) [][]*messageWithCallback { + if len(messages) == 0 { + return nil + } + if maxRequestSize <= 0 { + return [][]*messageWithCallback{messages} + } + + var batches [][]*messageWithCallback + current := make([]*messageWithCallback, 0, len(messages)) + for _, msg := range messages { + candidate := append(current, msg) + _, candidateBytes := newBatchRequest(candidate) + if len(current) > 0 && candidateBytes > maxRequestSize { + batches = append(batches, current) + current = []*messageWithCallback{msg} + continue + } + current = candidate + } + if len(current) > 0 { + batches = append(batches, current) + } + return batches +} + +func benchSplitMessages(n int) []*messageWithCallback { + msgs := make([]*messageWithCallback, n) + for i := range msgs { + msgs[i] = &messageWithCallback{event: largeTestEvent(strconv.Itoa(i))} + } + return msgs +} + +// benchSplitMaxRequestSize returns a limit that holds ~perBatch events, so the +// input is split into many sub-batches (the case where the quadratic cost bites). +func benchSplitMaxRequestSize(msgs []*messageWithCallback, perBatch int) int { + if perBatch > len(msgs) { + perBatch = len(msgs) + } + events := make([]*chipingress.CloudEventPb, perBatch) + for i := 0; i < perBatch; i++ { + events[i] = msgs[i].event + } + return proto.Size(&chipingress.CloudEventBatch{Events: events}) +} + +// BenchmarkSplitMessagesByRequestSize compares the linear (current) and +// quadratic (old) splitters across batch sizes. +// +// go test ./pkg/chipingress/batch -bench BenchmarkSplitMessagesByRequestSize -benchmem -run '^$' +func BenchmarkSplitMessagesByRequestSize(b *testing.B) { + for _, n := range []int{10, 100, 1000} { + msgs := benchSplitMessages(n) + maxRequestSize := benchSplitMaxRequestSize(msgs, 10) + + b.Run("linear/n="+strconv.Itoa(n), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + _ = splitMessagesByRequestSize(msgs, maxRequestSize) + } + }) + + b.Run("quadratic/n="+strconv.Itoa(n), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + _ = splitMessagesByRequestSizeQuadratic(msgs, maxRequestSize) + } + }) + } +}