Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/smartcontractkit/chain-selectors v1.0.101
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-ccip/chains/evm v0.0.0-20260506144252-c100eabfda74
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260527110243-883689d933be
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260601205228-7587adfeb329
github.com/smartcontractkit/chainlink-common/keystore v1.1.0
github.com/smartcontractkit/chainlink-data-streams v0.1.15-0.20260522094612-5f9f748bd87a
github.com/smartcontractkit/chainlink-deployments-framework v0.105.0
Expand Down Expand Up @@ -500,7 +500,7 @@ require (
github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 // indirect
github.com/smartcontractkit/chainlink-protos/svr v1.2.0 // indirect
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260323124644-faea187e6997 // indirect
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528173149-f5b8336b19d9 // indirect
github.com/smartcontractkit/chainlink-solana/contracts v0.0.0-20260513123719-d347eaf314e1 // indirect
github.com/smartcontractkit/chainlink-sui v0.0.0-20260527160341-aa3adc0abf67 // indirect
github.com/smartcontractkit/chainlink-testing-framework/framework/components/chiprouter v1.0.4 // indirect
Expand Down Expand Up @@ -598,12 +598,12 @@ require (
go.yaml.in/yaml/v3 v3.0.4 // indirect
go.yaml.in/yaml/v4 v4.0.0-rc.4 // indirect
golang.org/x/arch v0.11.0 // indirect
golang.org/x/crypto v0.51.0 // indirect
golang.org/x/crypto v0.52.0 // indirect
golang.org/x/exp v0.0.0-20260508232706-74f9aab9d74a // indirect
golang.org/x/mod v0.36.0 // indirect
golang.org/x/net v0.54.0 // indirect
golang.org/x/net v0.55.0 // indirect
golang.org/x/oauth2 v0.36.0 // indirect
golang.org/x/sys v0.44.0 // indirect
golang.org/x/sys v0.45.0 // indirect
golang.org/x/term v0.43.0 // indirect
golang.org/x/time v0.15.0 // indirect
golang.org/x/tools v0.45.0 // indirect
Expand Down
20 changes: 10 additions & 10 deletions core/scripts/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 22 additions & 4 deletions core/services/ocr2/plugins/vault/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type ReadKVStore interface {
GetSecret(ctx context.Context, id *vault.SecretIdentifier) (*vault.StoredSecret, error)
GetMetadata(ctx context.Context, owner string) (*vault.StoredMetadata, error)
GetSecretIdentifiersCountForOwner(ctx context.Context, owner string) (int, error)
GetPendingQueueIndex(ctx context.Context) (*vault.StoredPendingQueueIndex, error)
GetPendingQueue(ctx context.Context) ([]*vault.StoredPendingQueueItem, error)
}

Expand All @@ -43,7 +44,7 @@ type WriteKVStore interface {
WriteSecret(ctx context.Context, id *vault.SecretIdentifier, secret *vault.StoredSecret) error
WriteMetadata(ctx context.Context, owner string, metadata *vault.StoredMetadata) error
DeleteSecret(ctx context.Context, id *vault.SecretIdentifier) error
WritePendingQueue(ctx context.Context, pending []*vault.StoredPendingQueueItem) error
WritePendingQueue(ctx context.Context, pending []*vault.StoredPendingQueueItem, writtenSeqNr uint64) error
}

func NewReadStore(reader ocr3_1types.KeyValueStateReader, metrics *pluginMetrics) *KVStore {
Expand Down Expand Up @@ -268,8 +269,8 @@ func (s *KVStore) DeleteSecret(ctx context.Context, id *vault.SecretIdentifier)
return nil
}

func (s *KVStore) GetPendingQueue(ctx context.Context) ([]*vault.StoredPendingQueueItem, error) {
defer s.trackDuration(ctx, "GetPendingQueue", time.Now())
func (s *KVStore) GetPendingQueueIndex(ctx context.Context) (*vault.StoredPendingQueueIndex, error) {
defer s.trackDuration(ctx, "GetPendingQueueIndex", time.Now())
indexBytes, err := s.reader.Read([]byte(pendingQueueIndex))
if err != nil {
return nil, fmt.Errorf("failed to read pending queue index: %w", err)
Expand All @@ -285,6 +286,20 @@ func (s *KVStore) GetPendingQueue(ctx context.Context) ([]*vault.StoredPendingQu
return nil, fmt.Errorf("failed to unmarshal pending queue index: %w", err)
}

return index, nil
}

func (s *KVStore) GetPendingQueue(ctx context.Context) ([]*vault.StoredPendingQueueItem, error) {
defer s.trackDuration(ctx, "GetPendingQueue", time.Now())
index, err := s.GetPendingQueueIndex(ctx)
if err != nil {
return nil, err
}

if index == nil {
return nil, nil
}

items := make([]*vault.StoredPendingQueueItem, 0, index.Length)
for i := range index.Length {
itemBytes, err := s.reader.Read([]byte(pendingQueueItemPrefix + strconv.Itoa(int(i))))
Expand Down Expand Up @@ -334,7 +349,7 @@ func (s *KVStore) deletePendingQueue() error {
return nil
}

func (s *KVStore) WritePendingQueue(ctx context.Context, pending []*vault.StoredPendingQueueItem) error {
func (s *KVStore) WritePendingQueue(ctx context.Context, pending []*vault.StoredPendingQueueItem, writtenSeqNr uint64) error {
defer s.trackDuration(ctx, "WritePendingQueue", time.Now())
err := s.deletePendingQueue()
if err != nil {
Expand All @@ -355,6 +370,9 @@ func (s *KVStore) WritePendingQueue(ctx context.Context, pending []*vault.Stored
newIndex := &vault.StoredPendingQueueIndex{
Length: int64(len(pending)),
}
if writtenSeqNr != 0 {
newIndex.WrittenSeqNr = writtenSeqNr
}
newIndexBytes, err := proto.Marshal(newIndex)
if err != nil {
return fmt.Errorf("failed to marshal new pending queue index: %w", err)
Expand Down
27 changes: 25 additions & 2 deletions core/services/ocr2/plugins/vault/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,8 @@ func TestKVStore_WritePendingRequests(t *testing.T) {
Id: "test-request-id-3",
Item: empty,
}
err = store.WritePendingQueue(t.Context(), []*vault.StoredPendingQueueItem{item, item2, item3})
const writtenSeqNr = uint64(42)
err = store.WritePendingQueue(t.Context(), []*vault.StoredPendingQueueItem{item, item2, item3}, writtenSeqNr)
require.NoError(t, err)

// Ensure index is correctly written.
Expand All @@ -433,6 +434,12 @@ func TestKVStore_WritePendingRequests(t *testing.T) {
}
require.NoError(t, proto.Unmarshal(indexBytes.data, index))
assert.Equal(t, int64(3), index.Length)
assert.Equal(t, writtenSeqNr, index.WrittenSeqNr)

indexFromStore, err := store.GetPendingQueueIndex(t.Context())
require.NoError(t, err)
require.NotNil(t, indexFromStore)
assert.Equal(t, writtenSeqNr, indexFromStore.WrittenSeqNr)

// Ensure queue items are correctly written.
itemBytes, exists := kv.m[pendingQueueItemPrefix+"0"]
Expand All @@ -454,9 +461,25 @@ func TestKVStore_WritePendingRequests(t *testing.T) {
assert.Equal(t, "test-request-id-3", item2.Id)

// Writing a shorter list deletes the old one.
err = store.WritePendingQueue(t.Context(), []*vault.StoredPendingQueueItem{item, item2})
err = store.WritePendingQueue(t.Context(), []*vault.StoredPendingQueueItem{item, item2}, 99)
require.NoError(t, err)

_, exists = kv.m[pendingQueueItemPrefix+"3"]
assert.False(t, exists)
}

func TestKVStore_WritePendingQueue_omitsWrittenSeqNrWhenZero(t *testing.T) {
kv := &kv{m: make(map[string]response)}
store := newTestWriteStore(t, kv)

empty, err := anypb.New(&emptypb.Empty{})
require.NoError(t, err)
item := &vault.StoredPendingQueueItem{Id: "test-request-id-1", Item: empty}

err = store.WritePendingQueue(t.Context(), []*vault.StoredPendingQueueItem{item}, 0)
require.NoError(t, err)

legacyIndexBytes, err := proto.Marshal(&vault.StoredPendingQueueIndex{Length: 1})
require.NoError(t, err)
assert.Equal(t, legacyIndexBytes, kv.m[pendingQueueIndex].data)
}
39 changes: 39 additions & 0 deletions core/services/ocr2/plugins/vault/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type pluginMetrics struct {
localQueueSize metric.Int64Histogram
observationPendingPackedItems metric.Int64Histogram
pendingQueueWrittenSize metric.Int64Histogram
pendingQueueStaleAutoEmpty metric.Int64Counter
localQueueBlobSkipped metric.Int64Counter
}

func newPluginMetrics(configDigest string) (*pluginMetrics, error) {
Expand Down Expand Up @@ -61,13 +63,31 @@ func newPluginMetrics(configDigest string) (*pluginMetrics, error) {
return nil, fmt.Errorf("failed to create pending queue written size histogram: %w", err)
}

pendingQueueStaleAutoEmpty, err := beholder.GetMeter().Int64Counter(
"platform_vault_plugin_pending_queue_stale_auto_empty_total",
metric.WithDescription("OCR rounds that skipped store-backed pending queue observations due to stale written_seq_nr."),
)
if err != nil {
return nil, fmt.Errorf("failed to create pending queue stale auto empty counter: %w", err)
}

localQueueBlobSkipped, err := beholder.GetMeter().Int64Counter(
"platform_vault_plugin_local_queue_blob_skipped_total",
metric.WithDescription("Local queue items skipped during Observation because a single blob payload exceeded max size."),
)
if err != nil {
return nil, fmt.Errorf("failed to create local queue blob skipped counter: %w", err)
}

return &pluginMetrics{
configDigest: configDigest,
queueOverflow: queueOverflow,
kvOperationDuration: kvOperationDuration,
localQueueSize: localQueueSize,
observationPendingPackedItems: observationPendingPackedItems,
pendingQueueWrittenSize: pendingQueueWrittenSize,
pendingQueueStaleAutoEmpty: pendingQueueStaleAutoEmpty,
localQueueBlobSkipped: localQueueBlobSkipped,
}, nil
}

Expand Down Expand Up @@ -119,3 +139,22 @@ func (m *pluginMetrics) trackPendingQueueWrittenSize(ctx context.Context, writte
attribute.String("configDigest", m.configDigest),
))
}

func (m *pluginMetrics) trackPendingQueueStaleAutoEmpty(ctx context.Context) {
if m == nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know we have this elsewhere, but this is a smell -- it means we're potentially instantiating a nil pointer and calling methods on it

In prod we should never have that situation; this struct should always be initialized

return
}
m.pendingQueueStaleAutoEmpty.Add(ctx, 1, metric.WithAttributes(
attribute.String("configDigest", m.configDigest),
))
}

func (m *pluginMetrics) trackLocalQueueBlobSkipped(ctx context.Context, requestID string) {
if m == nil {
return
}
m.localQueueBlobSkipped.Add(ctx, 1, metric.WithAttributes(
attribute.String("configDigest", m.configDigest),
attribute.String("requestID", requestID),
))
}
Loading
Loading