diff --git a/pkg/chipingress/README.md b/pkg/chipingress/README.md index b4c64a38aa..53376e6054 100644 --- a/pkg/chipingress/README.md +++ b/pkg/chipingress/README.md @@ -5,6 +5,7 @@ ChipIngress is a gRPC client library for interacting with the ChipIngress servic ## Features - **CloudEvent Publishing**: Publish single events or batches of events +- **Batch Client with Partial Delivery**: Accumulate and flush events in batches, with per-event delivery results or all-or-nothing semantics - **Authentication Support**: Multiple authentication methods including basic auth and token-based auth - **Secure Communication**: Support for both TLS and insecure connections - **Event Management**: Utilities for creating, converting, and managing CloudEvents @@ -85,6 +86,82 @@ if err != nil { } ``` +### Batch Publishing & Partial Delivery + +For higher throughput, the `batch` subpackage provides a client that accumulates +events and flushes them by batch size, byte budget, or time interval. + +```go +import ( + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress/batch" +) + +client, err := chipingress.NewClient("example.com:9090", chipingress.WithTLS()) +if err != nil { + log.Fatal(err) +} + +bc, err := batch.NewBatchClient(client, + batch.WithBatchSize(100), + batch.WithBatchInterval(100*time.Millisecond), + batch.WithMaxGRPCRequestSize(10*1024*1024), +) +if err != nil { + log.Fatal(err) +} +bc.Start(context.Background()) +defer bc.Stop() // flushes the pending batch and waits for callbacks + +eventPb, _ := chipingress.EventToProto(event) +err = bc.QueueMessage(eventPb, func(err error) { + if err != nil { + // per-event delivery failure (see PublishError below) + } +}) +``` + +`QueueMessage` returns immediately and drops the message (returning an error) if +the internal buffer is full. The optional callback is invoked once the batch +containing the event has been sent. + +#### Delivery mode + +`PublishBatch` supports two modes, selected by `transaction_enabled`: + +- **Partial delivery (default)** — valid events are produced and per-event errors + are returned for the invalid ones, rather than failing the whole batch. The + callback for a failed event receives a `*batch.PublishError`. +- **All-or-nothing** — enable with `batch.WithTransactionEnabled(true)`. Any + per-event failure fails the entire batch and every callback receives the error. + +The single-shot client exposes the same option via +`chipingress.EventsToBatchWithOpts(events, chipingress.WithTransactionEnabled(true))`. + +#### Per-event error codes + +A `*batch.PublishError` carries a `Code` and `Reason`: + +| Code | Meaning | +| --- | --- | +| `ErrCodeValidationFailed` | CloudEvent structure is invalid (missing required fields, bad attribute). | +| `ErrCodeSchemaMissing` | No schema registered for the event's subject. Register it first. | +| `ErrCodeEncodeError` | Payload could not be encoded against its registered schema. | +| `ErrCodeDomainMisconfiguration` | Event source does not map to a known domain. | +| `ErrCodeResultsMismatch` | Client-side: server returned fewer results than events sent. | + +Results are positional — `results[i]` corresponds to the i-th queued event. A +gRPC-level error (e.g. connection failure) fails every callback in the batch +regardless of delivery mode. + +#### Sizing + +`WithMaxGRPCRequestSize` bounds the serialized request size; oversized batches are +split into multiple requests, and a single event larger than the limit fails its +callback rather than being sent. Each queued event is also stamped with a +monotonic `seqnum` extension per `(source, type)` pair so downstream consumers can +detect gaps. + ### OpenTelemetry Integration The client automatically instruments gRPC calls with OpenTelemetry for distributed tracing and metrics. diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index 42f9c437d5..8fb44aee73 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -17,6 +17,7 @@ import ( "google.golang.org/protobuf/proto" "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + chipingresspb "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb" ) type messageWithCallback struct { @@ -43,12 +44,12 @@ func (e *PublishError) Error() string { // Error codes returned by the server in PublishError.Code. // Re-exported from the proto package for convenience. const ( - ErrCodeUnknown = chipingress.PublishErrorCode(0) // PUBLISH_ERROR_CODE_UNKNOWN - ErrCodeValidationFailed = chipingress.PublishErrorCode(1) // PUBLISH_ERROR_CODE_VALIDATION_FAILED - ErrCodeSchemaMissing = chipingress.PublishErrorCode(2) // PUBLISH_ERROR_CODE_SCHEMA_MISSING - ErrCodeEncodeError = chipingress.PublishErrorCode(3) // PUBLISH_ERROR_CODE_ENCODE_ERROR - ErrCodeDomainMisconfiguration = chipingress.PublishErrorCode(4) // PUBLISH_ERROR_CODE_DOMAIN_MISCONFIGURATION - ErrCodeResultsMismatch = chipingress.PublishErrorCode(-1) // client-side synthetic code + ErrCodeUnknown = chipingresspb.PublishErrorCode_PUBLISH_ERROR_CODE_UNKNOWN + ErrCodeValidationFailed = chipingresspb.PublishErrorCode_PUBLISH_ERROR_CODE_VALIDATION_FAILED + ErrCodeSchemaMissing = chipingresspb.PublishErrorCode_PUBLISH_ERROR_CODE_SCHEMA_MISSING + ErrCodeEncodeError = chipingresspb.PublishErrorCode_PUBLISH_ERROR_CODE_ENCODE_ERROR + ErrCodeDomainMisconfiguration = chipingresspb.PublishErrorCode_PUBLISH_ERROR_CODE_DOMAIN_MISCONFIGURATION + ErrCodeResultsMismatch = chipingress.PublishErrorCode(-1) // client-side synthetic code; no proto equivalent ) // Client is a batching client that accumulates messages and sends them in batches. @@ -314,8 +315,15 @@ func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback) b.log.Errorw("failed to publish batch", "error", err) b.completeBatchCallbacks(batchMessages, err) } else if !b.transactionEnabled && resp != nil && len(resp.Results) > 0 { - b.completeBatchCallbacksFromResults(batchMessages, resp.Results) + b.completeBatchCallbacksFromResults(ctx, batchMessages, resp.Results) } else { + // All-or-nothing (transactionEnabled), or partial delivery with no + // per-event results to dispatch. The latter assumes a server in + // partial-delivery mode always returns per-event results for a + // non-empty batch; if it returns an empty/nil response instead + // (e.g. an older server unaware of results), we treat the whole + // batch as succeeded for backwards compatibility. This means a + // silent server-side drop would be reported as success. b.completeBatchCallbacks(batchMessages, nil) } } @@ -344,13 +352,13 @@ func (b *Client) completeBatchCallbacks(messages []*messageWithCallback, err err // - If len(results) < len(messages): remaining callbacks get a synthetic RESULTS_MISMATCH error. // - If len(results) > len(messages): extras are ignored. // - If results[i].EventId != messages[i].event.Id: a warning is logged. -func (b *Client) completeBatchCallbacksFromResults(messages []*messageWithCallback, results []*chipingress.PublishResult) { +func (b *Client) completeBatchCallbacksFromResults(ctx context.Context, messages []*messageWithCallback, results []*chipingress.PublishResult) { if len(results) != len(messages) { b.log.Warnw("publish results length mismatch", "results", len(results), "messages", len(messages), ) - b.metrics.resultsMismatchTotal.Add(context.Background(), 1) + b.metrics.resultsMismatchTotal.Add(ctx, 1) } b.callbackWg.Go(func() { @@ -377,7 +385,7 @@ func (b *Client) completeBatchCallbacksFromResults(messages []*messageWithCallba "expected", msg.event.Id, "got", result.EventId, ) - b.metrics.resultsMismatchTotal.Add(context.Background(), 1) + b.metrics.resultsMismatchTotal.Add(ctx, 1) } if result.Error != nil { msg.callback(&PublishError{ diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go index ce29dd0c0e..f8d76d3020 100644 --- a/pkg/chipingress/batch/client_test.go +++ b/pkg/chipingress/batch/client_test.go @@ -1951,6 +1951,55 @@ func TestTransactionEnabledEdgeCases(t *testing.T) { assert.Contains(t, results["e3"].Error(), "server returned 1 results for 3 events") }) + t.Run("event_id mismatch still dispatches by index and records mismatch metric", func(t *testing.T) { + reader, restore := useTestMeterProvider(t) + defer restore() + + mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() + mockClient. + On("PublishBatch", mock.Anything, mock.Anything). + Return(&chipingress.PublishResponse{ + Results: []*chipingress.PublishResult{ + {EventId: "e1"}, + // EventId disagrees with messages[1].event.Id ("e2"); the + // error must still be dispatched positionally to e2's callback. + {EventId: "wrong-id", Error: &chipingress.PublishError{ErrorCode: chipingress.PublishErrorCode(1), Reason: "bad"}}, + }, + }, nil) + + client, err := NewBatchClient(mockClient, WithTransactionEnabled(false)) + require.NoError(t, err) + + var mu sync.Mutex + results := make(map[string]error) + messages := []*messageWithCallback{ + {event: &chipingress.CloudEventPb{Id: "e1", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { mu.Lock(); results["e1"] = err; mu.Unlock() }}, + {event: &chipingress.CloudEventPb{Id: "e2", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { mu.Lock(); results["e2"] = err; mu.Unlock() }}, + } + + client.sendBatch(t.Context(), messages) + client.maxConcurrentSends <- struct{}{} + <-client.maxConcurrentSends + client.callbackWg.Wait() + + mu.Lock() + require.NoError(t, results["e1"]) + require.Error(t, results["e2"], "positional dispatch: e2 gets the error despite the event_id mismatch") + var re *PublishError + require.ErrorAs(t, results["e2"], &re) + assert.Equal(t, ErrCodeValidationFailed, re.Code) + mu.Unlock() + + client.Stop() + rm := collectResourceMetrics(t, reader) + mismatch := mustMetric(t, rm, "chip_ingress.batch.results_mismatch_total") + mismatchSum, ok := mismatch.Data.(metricdata.Sum[int64]) + require.True(t, ok) + require.NotEmpty(t, mismatchSum.DataPoints) + assert.GreaterOrEqual(t, mismatchSum.DataPoints[0].Value, int64(1)) + }) + t.Run("more results than messages does not panic", func(t *testing.T) { mockClient := mocks.NewClient(t) mockClient.EXPECT().Close().Return(nil).Maybe()