Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions pkg/chipingress/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ ChipIngress is a gRPC client library for interacting with the ChipIngress servic
## Features

- **CloudEvent Publishing**: Publish single events or batches of events
- **Batch Client with Partial Delivery**: Accumulate and flush events in batches, with per-event delivery results or all-or-nothing semantics
- **Authentication Support**: Multiple authentication methods including basic auth and token-based auth
- **Secure Communication**: Support for both TLS and insecure connections
- **Event Management**: Utilities for creating, converting, and managing CloudEvents
Expand Down Expand Up @@ -85,6 +86,82 @@ if err != nil {
}
```

### Batch Publishing & Partial Delivery

For higher throughput, the `batch` subpackage provides a client that accumulates
events and flushes them by batch size, byte budget, or time interval.

```go
import (
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/batch"
)
Comment on lines +95 to +98

client, err := chipingress.NewClient("example.com:9090", chipingress.WithTLS())
if err != nil {
log.Fatal(err)
}

bc, err := batch.NewBatchClient(client,
batch.WithBatchSize(100),
batch.WithBatchInterval(100*time.Millisecond),
batch.WithMaxGRPCRequestSize(10*1024*1024),
)
if err != nil {
log.Fatal(err)
}
bc.Start(context.Background())
defer bc.Stop() // flushes the pending batch and waits for callbacks

eventPb, _ := chipingress.EventToProto(event)
err = bc.QueueMessage(eventPb, func(err error) {
if err != nil {
// per-event delivery failure (see PublishError below)
}
})
```

`QueueMessage` never blocks: if the internal buffer is full, the message is
dropped and an error is returned. The optional callback is invoked once the batch
containing the event has been sent, with `nil` on success or the delivery error
otherwise.

#### Delivery mode

`PublishBatch` supports two modes, selected by `transaction_enabled`:

- **Partial delivery (default)** — valid events are produced and per-event errors
are returned for the invalid ones, rather than failing the whole batch. The
callback for a failed event receives a `*batch.PublishError`.
- **All-or-nothing** — enable with `batch.WithTransactionEnabled(true)`. Any
per-event failure fails the entire batch and every callback receives the error.

The single-shot client exposes the same option via
`chipingress.EventsToBatchWithOpts(events, chipingress.WithTransactionEnabled(true))`.

#### Per-event error codes

A `*batch.PublishError` carries a `Code` and `Reason`:

| Code | Meaning |
| --- | --- |
| `ErrCodeValidationFailed` | CloudEvent structure is invalid (missing required fields, bad attribute). |
| `ErrCodeSchemaMissing` | No schema registered for the event's subject. Register it first. |
| `ErrCodeEncodeError` | Payload could not be encoded against its registered schema. |
| `ErrCodeDomainMisconfiguration` | Event source does not map to a known domain. |
| `ErrCodeResultsMismatch` | Client-side synthetic code (never sent by the server): the server returned fewer results than events sent. |

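Results are positional within a batch request — `results[i]` corresponds to the
i-th event sent in that request, and callbacks are dispatched by index rather than
by event ID (an ID mismatch is only logged). A gRPC-level error (e.g. connection
failure) fails every callback in the batch regardless of delivery mode.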

#### Sizing

`WithMaxGRPCRequestSize` bounds the serialized request size; oversized batches are
split into multiple requests, and a single event larger than the limit fails its
callback rather than being sent. Each queued event is also stamped with a
monotonic `seqnum` extension per `(source, type)` pair so downstream consumers can
detect gaps.

### OpenTelemetry Integration

The client automatically instruments gRPC calls with OpenTelemetry for distributed tracing and metrics.
Expand Down
28 changes: 18 additions & 10 deletions pkg/chipingress/batch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
chipingresspb "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb"
)

type messageWithCallback struct {
Expand All @@ -43,12 +44,12 @@ func (e *PublishError) Error() string {
// Error codes returned by the server in PublishError.Code.
// Re-exported from the proto package for convenience.
const (
ErrCodeUnknown = chipingress.PublishErrorCode(0) // PUBLISH_ERROR_CODE_UNKNOWN
ErrCodeValidationFailed = chipingress.PublishErrorCode(1) // PUBLISH_ERROR_CODE_VALIDATION_FAILED
ErrCodeSchemaMissing = chipingress.PublishErrorCode(2) // PUBLISH_ERROR_CODE_SCHEMA_MISSING
ErrCodeEncodeError = chipingress.PublishErrorCode(3) // PUBLISH_ERROR_CODE_ENCODE_ERROR
ErrCodeDomainMisconfiguration = chipingress.PublishErrorCode(4) // PUBLISH_ERROR_CODE_DOMAIN_MISCONFIGURATION
ErrCodeResultsMismatch = chipingress.PublishErrorCode(-1) // client-side synthetic code
ErrCodeUnknown = chipingresspb.PublishErrorCode_PUBLISH_ERROR_CODE_UNKNOWN
ErrCodeValidationFailed = chipingresspb.PublishErrorCode_PUBLISH_ERROR_CODE_VALIDATION_FAILED
ErrCodeSchemaMissing = chipingresspb.PublishErrorCode_PUBLISH_ERROR_CODE_SCHEMA_MISSING
ErrCodeEncodeError = chipingresspb.PublishErrorCode_PUBLISH_ERROR_CODE_ENCODE_ERROR
ErrCodeDomainMisconfiguration = chipingresspb.PublishErrorCode_PUBLISH_ERROR_CODE_DOMAIN_MISCONFIGURATION
ErrCodeResultsMismatch = chipingress.PublishErrorCode(-1) // client-side synthetic code; no proto equivalent
)

// Client is a batching client that accumulates messages and sends them in batches.
Expand Down Expand Up @@ -314,8 +315,15 @@ func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback)
b.log.Errorw("failed to publish batch", "error", err)
b.completeBatchCallbacks(batchMessages, err)
} else if !b.transactionEnabled && resp != nil && len(resp.Results) > 0 {
b.completeBatchCallbacksFromResults(batchMessages, resp.Results)
b.completeBatchCallbacksFromResults(ctx, batchMessages, resp.Results)
} else {
// All-or-nothing (transactionEnabled), or partial delivery with no
// per-event results to dispatch. The latter assumes a server in
// partial-delivery mode always returns per-event results for a
// non-empty batch; if it returns an empty/nil response instead
// (e.g. an older server unaware of results), we treat the whole
// batch as succeeded for backwards compatibility. This means a
// silent server-side drop would be reported as success.
b.completeBatchCallbacks(batchMessages, nil)
}
Comment on lines +320 to 328
}
Expand Down Expand Up @@ -344,13 +352,13 @@ func (b *Client) completeBatchCallbacks(messages []*messageWithCallback, err err
// - If len(results) < len(messages): remaining callbacks get a synthetic RESULTS_MISMATCH error.
// - If len(results) > len(messages): extras are ignored.
// - If results[i].EventId != messages[i].event.Id: a warning is logged.
func (b *Client) completeBatchCallbacksFromResults(messages []*messageWithCallback, results []*chipingress.PublishResult) {
func (b *Client) completeBatchCallbacksFromResults(ctx context.Context, messages []*messageWithCallback, results []*chipingress.PublishResult) {
if len(results) != len(messages) {
b.log.Warnw("publish results length mismatch",
"results", len(results),
"messages", len(messages),
)
b.metrics.resultsMismatchTotal.Add(context.Background(), 1)
b.metrics.resultsMismatchTotal.Add(ctx, 1)
}

b.callbackWg.Go(func() {
Expand All @@ -377,7 +385,7 @@ func (b *Client) completeBatchCallbacksFromResults(messages []*messageWithCallba
"expected", msg.event.Id,
"got", result.EventId,
)
b.metrics.resultsMismatchTotal.Add(context.Background(), 1)
b.metrics.resultsMismatchTotal.Add(ctx, 1)
}
if result.Error != nil {
msg.callback(&PublishError{
Expand Down
49 changes: 49 additions & 0 deletions pkg/chipingress/batch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1951,6 +1951,55 @@ func TestTransactionEnabledEdgeCases(t *testing.T) {
assert.Contains(t, results["e3"].Error(), "server returned 1 results for 3 events")
})

t.Run("event_id mismatch still dispatches by index and records mismatch metric", func(t *testing.T) {
reader, restore := useTestMeterProvider(t)
defer restore()

mockClient := mocks.NewClient(t)
mockClient.EXPECT().Close().Return(nil).Maybe()
mockClient.
On("PublishBatch", mock.Anything, mock.Anything).
Return(&chipingress.PublishResponse{
Results: []*chipingress.PublishResult{
{EventId: "e1"},
// EventId disagrees with messages[1].event.Id ("e2"); the
// error must still be dispatched positionally to e2's callback.
{EventId: "wrong-id", Error: &chipingress.PublishError{ErrorCode: chipingress.PublishErrorCode(1), Reason: "bad"}},
},
}, nil)

client, err := NewBatchClient(mockClient, WithTransactionEnabled(false))
require.NoError(t, err)

var mu sync.Mutex
results := make(map[string]error)
messages := []*messageWithCallback{
{event: &chipingress.CloudEventPb{Id: "e1", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { mu.Lock(); results["e1"] = err; mu.Unlock() }},
{event: &chipingress.CloudEventPb{Id: "e2", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { mu.Lock(); results["e2"] = err; mu.Unlock() }},
}

client.sendBatch(t.Context(), messages)
client.maxConcurrentSends <- struct{}{}
<-client.maxConcurrentSends
client.callbackWg.Wait()

mu.Lock()
require.NoError(t, results["e1"])
require.Error(t, results["e2"], "positional dispatch: e2 gets the error despite the event_id mismatch")
var re *PublishError
require.ErrorAs(t, results["e2"], &re)
assert.Equal(t, ErrCodeValidationFailed, re.Code)
mu.Unlock()

client.Stop()
rm := collectResourceMetrics(t, reader)
mismatch := mustMetric(t, rm, "chip_ingress.batch.results_mismatch_total")
mismatchSum, ok := mismatch.Data.(metricdata.Sum[int64])
require.True(t, ok)
require.NotEmpty(t, mismatchSum.DataPoints)
assert.GreaterOrEqual(t, mismatchSum.DataPoints[0].Value, int64(1))
})

t.Run("more results than messages does not panic", func(t *testing.T) {
mockClient := mocks.NewClient(t)
mockClient.EXPECT().Close().Return(nil).Maybe()
Expand Down
Loading