Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ This is a library project without a main entry point. Integration is done by imp
- Intent-based transaction system: clients submit intents, sequencer processes them
- Key components:
- `ocp/rpc/`: gRPC service implementations (entry points for client requests): `transaction`, `account`, `currency`, `messaging`
- `ocp/worker/`: Background workers (nonce management, swap processing, sequencer, Geyser integration, account sync, currency tasks)
- `ocp/worker/`: Background workers (nonce management, swap processing, sequencer, Geyser integration, account sync, currency tasks, guaranteed task execution)
- `ocp/transaction/`: Transaction building and local nonce pool management
- `ocp/data/`: Data layer with Store interfaces for all domain entities
- `ocp/integration/`: Pluggable integration hooks (SubmitIntent, Swap, Geyser, Moderation, Antispam) for application-specific behavior
Expand Down Expand Up @@ -77,6 +77,13 @@ This is a library project without a main entry point. Integration is done by imp
- `solana/coinbasestableswapper/`: Solana program interface for Coinbase's Stable Swapper (used for USDC↔USDF)
- `coinbase/`: HTTP client for the Coinbase Developer Platform Onramp API (JWT auth, used as a swap funding source)

**Task System** (`ocp/task/`, `ocp/worker/task/`, `ocp/data/task/`)
- Durable, app-defined background work with at-least-once execution guarantees
- Apps return tasks from the `GetTasksToSchedule` SubmitIntent hook; the scheduler (`ocp/task/scheduler.go`) enqueues them in the *same* DB transaction that commits the intent, so scheduling is atomic with the intent
- After commit, a best-effort fast path attempts immediate execution; the background worker (`ocp/worker/task/`) sweeps `StatePending` tasks whose `NextAttemptAt` has elapsed and retries with backoff, dead-lettering to `StateFailed` after a max attempt count
- Execution is delegated to the app's `TaskExecutor` integration hook. Because tasks can run concurrently and more than once (fast path + worker, multiple processes), **TaskExecutor implementations must be idempotent** — the task ID is the natural dedup key. Concurrent state advances are resolved by an optimistic version check on the task record
- Task `Type`/`Data` are opaque to the base system; the implementing app owns their namespace and serialization

**Solana Integration**
- `solana/` package: Low-level Solana primitives (accounts, transactions, programs)
- `solana/token/`: SPL Token program interface
Expand Down
37 changes: 37 additions & 0 deletions ocp/data/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/code-payments/ocp-server/ocp/data/nonce"
"github.com/code-payments/ocp-server/ocp/data/rendezvous"
"github.com/code-payments/ocp-server/ocp/data/swap"
"github.com/code-payments/ocp-server/ocp/data/task"
"github.com/code-payments/ocp-server/ocp/data/timelock"
"github.com/code-payments/ocp-server/ocp/data/transaction"
"github.com/code-payments/ocp-server/ocp/data/vault"
Expand All @@ -49,6 +50,7 @@ import (
nonce_memory_client "github.com/code-payments/ocp-server/ocp/data/nonce/memory"
rendezvous_memory_client "github.com/code-payments/ocp-server/ocp/data/rendezvous/memory"
swap_memory_client "github.com/code-payments/ocp-server/ocp/data/swap/memory"
task_memory_client "github.com/code-payments/ocp-server/ocp/data/task/memory"
timelock_memory_client "github.com/code-payments/ocp-server/ocp/data/timelock/memory"
transaction_memory_client "github.com/code-payments/ocp-server/ocp/data/transaction/memory"
vault_memory_client "github.com/code-payments/ocp-server/ocp/data/vault/memory"
Expand All @@ -67,6 +69,7 @@ import (
nonce_postgres_client "github.com/code-payments/ocp-server/ocp/data/nonce/postgres"
rendezvous_postgres_client "github.com/code-payments/ocp-server/ocp/data/rendezvous/postgres"
swap_postgres_client "github.com/code-payments/ocp-server/ocp/data/swap/postgres"
task_postgres_client "github.com/code-payments/ocp-server/ocp/data/task/postgres"
timelock_postgres_client "github.com/code-payments/ocp-server/ocp/data/timelock/postgres"
transaction_postgres_client "github.com/code-payments/ocp-server/ocp/data/transaction/postgres"
vault_postgres_client "github.com/code-payments/ocp-server/ocp/data/vault/postgres"
Expand Down Expand Up @@ -228,6 +231,14 @@ type DatabaseData interface {
GetAllSwapsByState(ctx context.Context, state swap.State, opts ...query.Option) ([]*swap.Record, error)
GetSwapCountByState(ctx context.Context, state swap.State) (uint64, error)

// Tasks
// --------------------------------------------------------------------------------
PutAllTasks(ctx context.Context, records ...*task.Record) error
UpdateTask(ctx context.Context, record *task.Record) error
GetTaskById(ctx context.Context, taskId string) (*task.Record, error)
GetAllReadyTasksByState(ctx context.Context, state task.State, asOf time.Time, opts ...query.Option) ([]*task.Record, error)
GetTaskCountByState(ctx context.Context, state task.State) (uint64, error)

// Timelocks
// --------------------------------------------------------------------------------
SaveTimelock(ctx context.Context, record *timelock.Record) error
Expand Down Expand Up @@ -293,6 +304,7 @@ type DatabaseProvider struct {
nonces nonce.Store
rendezvous rendezvous.Store
swaps swap.Store
tasks task.Store
timelocks timelock.Store
transactions transaction.Store
vault vault.Store
Expand Down Expand Up @@ -339,6 +351,7 @@ func NewDatabaseProvider(dbConfig *pg.Config) (DatabaseData, error) {
nonces: nonce_postgres_client.New(db),
rendezvous: rendezvous_postgres_client.New(db),
swaps: swap_postgres_client.New(db),
tasks: task_postgres_client.New(db),
timelocks: timelock_postgres_client.New(db),
transactions: transaction_postgres_client.New(db),
vault: vault_postgres_client.New(db),
Expand Down Expand Up @@ -366,6 +379,7 @@ func NewTestDatabaseProvider() DatabaseData {
nonces: nonce_memory_client.New(),
rendezvous: rendezvous_memory_client.New(),
swaps: swap_memory_client.New(),
tasks: task_memory_client.New(),
timelocks: timelock_memory_client.New(),
transactions: transaction_memory_client.New(),
vault: vault_memory_client.New(),
Expand Down Expand Up @@ -818,6 +832,29 @@ func (dp *DatabaseProvider) GetSwapCountByState(ctx context.Context, state swap.
return dp.swaps.CountByState(ctx, state)
}

// Tasks
// --------------------------------------------------------------------------------

func (dp *DatabaseProvider) PutAllTasks(ctx context.Context, records ...*task.Record) error {
return dp.tasks.PutAll(ctx, records...)
}
func (dp *DatabaseProvider) UpdateTask(ctx context.Context, record *task.Record) error {
return dp.tasks.Update(ctx, record)
}
func (dp *DatabaseProvider) GetTaskById(ctx context.Context, taskId string) (*task.Record, error) {
return dp.tasks.GetByTaskId(ctx, taskId)
}
func (dp *DatabaseProvider) GetAllReadyTasksByState(ctx context.Context, state task.State, asOf time.Time, opts ...query.Option) ([]*task.Record, error) {
req, err := query.DefaultPaginationHandler(opts...)
if err != nil {
return nil, err
}
return dp.tasks.GetAllReadyByState(ctx, state, asOf, req.Cursor, req.Limit, req.SortBy)
}
func (dp *DatabaseProvider) GetTaskCountByState(ctx context.Context, state task.State) (uint64, error) {
return dp.tasks.CountByState(ctx, state)
}

// Timelocks
// --------------------------------------------------------------------------------
func (dp *DatabaseProvider) SaveTimelock(ctx context.Context, record *timelock.Record) error {
Expand Down
214 changes: 214 additions & 0 deletions ocp/data/task/memory/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package memory

import (
"context"
"errors"
"sort"
"sync"
"time"

"github.com/code-payments/ocp-server/database/query"
"github.com/code-payments/ocp-server/ocp/data/task"
)

type ById []*task.Record

func (a ById) Len() int { return len(a) }
func (a ById) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ById) Less(i, j int) bool { return a[i].Id < a[j].Id }

type store struct {
mu sync.RWMutex
records []*task.Record
last uint64
}

func New() task.Store {
return &store{}
}

func (s *store) PutAll(ctx context.Context, records ...*task.Record) error {
if len(records) == 0 {
return errors.New("empty task set")
}

for _, data := range records {
if data.Id > 0 {
return task.ErrExists
}

if err := data.Validate(); err != nil {
return err
}
}

s.mu.Lock()
defer s.mu.Unlock()

seen := make(map[string]struct{})
for _, data := range records {
if _, ok := seen[data.TaskId]; ok {
return task.ErrExists
}
seen[data.TaskId] = struct{}{}

if item := s.findByTaskId(data.TaskId); item != nil {
return task.ErrExists
}
}

for _, data := range records {
s.last++
data.Id = s.last
if data.CreatedAt.IsZero() {
data.CreatedAt = time.Now()
}
if data.NextAttemptAt.IsZero() {
data.NextAttemptAt = data.CreatedAt
}
data.Version++

c := data.Clone()
s.records = append(s.records, &c)
}

return nil
}

func (s *store) Update(ctx context.Context, data *task.Record) error {
if err := data.Validate(); err != nil {
return err
}

s.mu.Lock()
defer s.mu.Unlock()

item := s.findByTaskId(data.TaskId)
if item == nil || item.Version != data.Version {
return task.ErrStaleVersion
}

data.Version++

item.State = data.State
item.FailedAttempts = data.FailedAttempts
item.NextAttemptAt = data.NextAttemptAt
item.Version = data.Version

item.CopyTo(data)

return nil
}

func (s *store) GetByTaskId(ctx context.Context, taskId string) (*task.Record, error) {
s.mu.RLock()
defer s.mu.RUnlock()

item := s.findByTaskId(taskId)
if item == nil {
return nil, task.ErrNotFound
}

cloned := item.Clone()
return &cloned, nil
}

func (s *store) GetAllReadyByState(ctx context.Context, state task.State, asOf time.Time, cursor query.Cursor, limit uint64, direction query.Ordering) ([]*task.Record, error) {
s.mu.RLock()
defer s.mu.RUnlock()

items := s.findByState(state)
items = s.filterReady(items, asOf)

if items = s.filter(items, cursor, limit, direction); len(items) > 0 {
return cloneRecords(items), nil
}

return nil, task.ErrNotFound
}

func (s *store) CountByState(ctx context.Context, state task.State) (uint64, error) {
s.mu.RLock()
defer s.mu.RUnlock()

items := s.findByState(state)
return uint64(len(items)), nil
}

func (s *store) findByTaskId(taskId string) *task.Record {
for _, item := range s.records {
if item.TaskId == taskId {
return item
}
}
return nil
}

func (s *store) findByState(state task.State) []*task.Record {
var res []*task.Record
for _, item := range s.records {
if item.State == state {
res = append(res, item)
}
}
return res
}

func (s *store) filterReady(items []*task.Record, asOf time.Time) []*task.Record {
var res []*task.Record
for _, item := range items {
if !item.NextAttemptAt.After(asOf) {
res = append(res, item)
}
}
return res
}

func (s *store) filter(items []*task.Record, cursor query.Cursor, limit uint64, direction query.Ordering) []*task.Record {
var start uint64

start = 0
if direction == query.Descending {
start = s.last + 1
}
if len(cursor) > 0 {
start = cursor.ToUint64()
}

var res []*task.Record
for _, item := range items {
if item.Id > start && direction == query.Ascending {
res = append(res, item)
}
if item.Id < start && direction == query.Descending {
res = append(res, item)
}
}

if direction == query.Descending {
sort.Sort(sort.Reverse(ById(res)))
}

if len(res) >= int(limit) {
return res[:limit]
}

return res
}

func cloneRecords(items []*task.Record) []*task.Record {
var res []*task.Record
for _, item := range items {
cloned := item.Clone()
res = append(res, &cloned)
}
return res
}

func (s *store) reset() {
s.mu.Lock()
defer s.mu.Unlock()

s.records = nil
s.last = 0
}
15 changes: 15 additions & 0 deletions ocp/data/task/memory/store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package memory

import (
"testing"

"github.com/code-payments/ocp-server/ocp/data/task/tests"
)

func TestTaskMemoryStore(t *testing.T) {
testStore := New()
teardown := func() {
testStore.(*store).reset()
}
tests.RunTests(t, testStore, teardown)
}
Loading
Loading