diff --git a/database/dynamodb/test/util.go b/database/dynamodb/test/util.go new file mode 100644 index 0000000..7c72f6d --- /dev/null +++ b/database/dynamodb/test/util.go @@ -0,0 +1,87 @@ +package test + +import ( + "context" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + + "github.com/ory/dockertest/v3" + "github.com/ory/dockertest/v3/docker" + + "github.com/pkg/errors" + + "github.com/code-payments/ocp-server/retry" + "github.com/code-payments/ocp-server/retry/backoff" +) + +const ( + containerRepository = "amazon/dynamodb-local" + containerVersion = "2.5.2" + containerAutoKill = 120 // seconds + + port = "8000" + + region = "us-east-1" + accessKey = "dummy" + secretKey = "dummy" +) + +// TestEnv is a running dynamodb-local container with a connected client. +type TestEnv struct { + Pool *dockertest.Pool + Client *dynamodb.Client + Endpoint string +} + +// NewTestEnv starts a dynamodb-local Docker container and returns a connected +// client. It mirrors the postgres test harness. +func NewTestEnv() (*TestEnv, error) { + pool, err := dockertest.NewPool("") + if err != nil { + return nil, errors.Wrap(err, "could not connect to docker") + } + + resource, err := pool.RunWithOptions(&dockertest.RunOptions{ + Repository: containerRepository, + Tag: containerVersion, + Cmd: []string{"-jar", "DynamoDBLocal.jar", "-inMemory"}, + }, func(config *docker.HostConfig) { + config.AutoRemove = true + config.RestartPolicy = docker.RestartPolicy{Name: "no"} + }) + if err != nil { + return nil, errors.Wrap(err, "could not start resource") + } + resource.Expire(containerAutoKill) + + endpoint := "http://" + resource.GetHostPort(port+"/tcp") + client := NewClient(endpoint) + + // Wait for the container to accept connections. + _, err = retry.Retry( + func() error { + _, err := client.ListTables(context.Background(), &dynamodb.ListTablesInput{}) + return err + }, + retry.Limit(60), + retry.Backoff(backoff.Constant(500*time.Millisecond), 500*time.Millisecond), + ) + if err != nil { + return nil, errors.Wrap(err, "timed out waiting for dynamodb-local to become available") + } + + return &TestEnv{Pool: pool, Client: client, Endpoint: endpoint}, nil +} + +// NewClient builds a DynamoDB client pointed at the given local endpoint with +// dummy static credentials. +func NewClient(endpoint string) *dynamodb.Client { + return dynamodb.New(dynamodb.Options{ + Region: region, + BaseEndpoint: aws.String(endpoint), + Credentials: credentials.NewStaticCredentialsProvider(accessKey, secretKey, ""), + }) +} diff --git a/go.mod b/go.mod index 1f04580..c8d93d9 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,9 @@ require ( filippo.io/edwards25519 v1.1.0 github.com/anyproto/go-slip10 v1.0.1 github.com/aws/aws-sdk-go-v2 v1.42.0 + github.com/aws/aws-sdk-go-v2/credentials v1.19.24 github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.6.25 + github.com/aws/aws-sdk-go-v2/service/dynamodb v1.59.0 github.com/aws/aws-sdk-go-v2/service/s3 v1.102.2 github.com/code-payments/code-vm-indexer v1.2.0 github.com/code-payments/ocp-protobuf-api v1.13.2-0.20260610171241-de46af911053 @@ -44,12 +46,13 @@ require ( github.com/Microsoft/go-winio v0.4.14 // indirect github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.11 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.25 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.25 // indirect - github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.26 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.10 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.30 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.12 // indirect github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.18 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.25 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.12.6 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.29 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.25 // indirect github.com/aws/smithy-go v1.27.1 // indirect github.com/cenkalti/backoff/v4 v4.1.0 // indirect diff --git a/go.sum b/go.sum index 8d9f5e4..655fd4a 100644 --- a/go.sum +++ b/go.sum @@ -37,20 +37,26 @@ github.com/aws/aws-sdk-go-v2 v1.42.0 h1:XvXMJTkFQtpBKIWZnmr9ZEOc2InWM2yldjXEJ/by github.com/aws/aws-sdk-go-v2 v1.42.0/go.mod h1:27+ACypSLljLAEKsCYOmrjKh83vuTRkuAe9Uv/3A4bg= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.11 h1:h5+3VT69KUBK24grGuuA5saDJTj2IIjLb9au668Fo5I= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.11/go.mod h1:dnakxebH6UwFvcvujL0LVggYQ8nEvBGjU4G/V79Nv94= +github.com/aws/aws-sdk-go-v2/credentials v1.19.24 h1:2hQqYCV9yqyePQ9o6dCrZc/zO8U3TwPr9mIKlZnPu/I= +github.com/aws/aws-sdk-go-v2/credentials v1.19.24/go.mod h1:IDwpACtwqHLISdzfwUUNq4P9DsB/h5BLg4FwJPNfqFY= github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.6.25 h1:EeK30mZmhopHcNmKykyGF0LwmFB1ZwQNr+FeyRjcN0U= github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.6.25/go.mod h1:tudVnwAJyXgCh4N6ABYdzUM+i+PXsx8700FOyhMn+4k= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.25 h1:Uii3frf9ztec/ABM2/FSH9/z7PLzxfpG8h4RpkUFflQ= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.25/go.mod h1:G6kntsA2GorAxDPbap6xgB2F+amSLUF8GJTi7PUoX44= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.25 h1:r1+/l6m+WaUJF9HISEsNOLHSNj5EXYQxK8VX6Cz9NlA= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.25/go.mod h1:cKf+D+NMDK1LndD7BowHbBZPgR9V0/5HubH0PFWvA+c= -github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.26 h1:A1PmWU2zfkIm9EyFlJncFXL4W4phML+h8KjltUsCvNQ= -github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.26/go.mod h1:dY4MRzXEizrD4hqtpKvWVGPX7QleSGGVY+EBolo1RmM= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.10 h1:d5/908OJ4bXg8lyjeMPvXetEKqoDoLi5Owy1zNue3yg= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.10/go.mod h1:a57l7Hwh+FWI+we50g5NPJHYUKeJKfXbc4w8SyXu8Ig= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29 h1:f3vKqSo13fhTYb+JEcXwXefZQE26I1FB5eTSniU67ko= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29/go.mod h1:MzoLFUArKGpGD+ukmPiTPG1X5x4o6M2kq4v2dr1FiEc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29 h1:RdwIf/CuUsvJX3RgJagbOyotl/cxoLY4xviKuE7p2GY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29/go.mod h1:71wt8W2EgswdZy9Mf9KNnzxZ3TiZlv4caKghPktDOkA= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.30 h1:VTGy885W5DKBxWRUJbym9hytNaYzsyaPkCHGRRMAOhU= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.30/go.mod h1:AS0HycUvJRFvTt613AYDOgO2jzw+00cVSMny8XB3yMY= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.59.0 h1:S1qETDbdXKZMYVveuxACCKuRqnAt2NlnmYnlq5SeuMY= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.59.0/go.mod h1:jLkDwIDBkCIpiENQhAOjAR2L9jwj56mZgVEvuro4gUE= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.12 h1:ZD2+BSw9vFsNlKYIasSNt3uDbjqqXIBcM13UJv/Lx2k= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.12/go.mod h1:Ms4zlcVBbXbiP7EVLhl+lgjvA/a7YphqQ3Ih3174EmI= github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.18 h1:W/EyPFl9A5rXrtoilfwHYEvzHER+K4SpBPtMXi24Mos= github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.18/go.mod h1:UG50K+pvd/uy6xExbobg0rjqFBFZe6I3l75EPDZw4tg= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.25 h1:dD3dhHNglpd98gs72my22Ndqi1hqQGllFFg1F+twfxg= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.25/go.mod h1:0yAbjPfd64gG7mj85RW+fMEYdfBgCRZw8g/oWcL1pjc= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.12.6 h1:Bs2OwYq0HBgHYwfGmUwYIPtTNaGMGAHkRje4jmW2VoI= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.12.6/go.mod h1:OTctu4cW8t7/TRlTKPLT6akzyOkfceMWhtEHqtYDIQQ= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.29 h1:DRebniUGZ2MqiiIVmQJ04vIXr918hubdHMnarSLEWyU= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.29/go.mod h1:LfRkPCD8YHDM2E5eTkos2UpwYeZnBcVarTa8L59bJHA= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.25 h1:2pQEbwf+/6EDbiit/GcBE2K4IUpMZymaA0kOz3xK978= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.25/go.mod h1:KvT6NCcQ0EZ+ZkVRrlBMt04Po3ok23YELEp7WimhLhM= github.com/aws/aws-sdk-go-v2/service/s3 v1.102.2 h1:ie4ElCmUKS26pzrZcIk/lmt4yWjAqLLcawstyQCh298= diff --git a/ocp/data/internal.go b/ocp/data/internal.go index 2fd5e42..b846cf0 100644 --- a/ocp/data/internal.go +++ b/ocp/data/internal.go @@ -7,7 +7,6 @@ import ( "sync" "time" - "github.com/google/uuid" "github.com/jmoiron/sqlx" "github.com/code-payments/ocp-server/cache" @@ -27,7 +26,6 @@ import ( "github.com/code-payments/ocp-server/ocp/data/deposit" "github.com/code-payments/ocp-server/ocp/data/fulfillment" "github.com/code-payments/ocp-server/ocp/data/intent" - "github.com/code-payments/ocp-server/ocp/data/messaging" "github.com/code-payments/ocp-server/ocp/data/nonce" "github.com/code-payments/ocp-server/ocp/data/rendezvous" "github.com/code-payments/ocp-server/ocp/data/swap" @@ -46,7 +44,6 @@ import ( deposit_memory_client "github.com/code-payments/ocp-server/ocp/data/deposit/memory" fulfillment_memory_client "github.com/code-payments/ocp-server/ocp/data/fulfillment/memory" intent_memory_client "github.com/code-payments/ocp-server/ocp/data/intent/memory" - messaging_memory_client "github.com/code-payments/ocp-server/ocp/data/messaging/memory" nonce_memory_client "github.com/code-payments/ocp-server/ocp/data/nonce/memory" rendezvous_memory_client "github.com/code-payments/ocp-server/ocp/data/rendezvous/memory" swap_memory_client "github.com/code-payments/ocp-server/ocp/data/swap/memory" @@ -65,7 +62,6 @@ import ( deposit_postgres_client "github.com/code-payments/ocp-server/ocp/data/deposit/postgres" fulfillment_postgres_client "github.com/code-payments/ocp-server/ocp/data/fulfillment/postgres" intent_postgres_client "github.com/code-payments/ocp-server/ocp/data/intent/postgres" - messaging_postgres_client "github.com/code-payments/ocp-server/ocp/data/messaging/postgres" nonce_postgres_client "github.com/code-payments/ocp-server/ocp/data/nonce/postgres" rendezvous_postgres_client "github.com/code-payments/ocp-server/ocp/data/rendezvous/postgres" swap_postgres_client "github.com/code-payments/ocp-server/ocp/data/swap/postgres" @@ -197,12 +193,6 @@ type DatabaseData interface { GetUsdCostBasis(ctx context.Context, owner string, mint string) (float64, error) GetUsdCostBasisBatch(ctx context.Context, mint string, owners ...string) (map[string]float64, error) - // Messaging - // -------------------------------------------------------------------------------- - CreateMessage(ctx context.Context, record *messaging.Record) error - GetMessages(ctx context.Context, account string) ([]*messaging.Record, error) - DeleteMessage(ctx context.Context, account string, messageID uuid.UUID) error - // Nonces // -------------------------------------------------------------------------------- GetNonce(ctx context.Context, address string) (*nonce.Record, error) @@ -300,7 +290,6 @@ type DatabaseProvider struct { deposits deposit.Store fulfillments fulfillment.Store intents intent.Store - messages messaging.Store nonces nonce.Store rendezvous rendezvous.Store swaps swap.Store @@ -347,7 +336,6 @@ func NewDatabaseProvider(dbConfig *pg.Config) (DatabaseData, error) { deposits: deposit_postgres_client.New(db), fulfillments: fulfillment_postgres_client.New(db), intents: intent_postgres_client.New(db), - messages: messaging_postgres_client.New(db), nonces: nonce_postgres_client.New(db), rendezvous: rendezvous_postgres_client.New(db), swaps: swap_postgres_client.New(db), @@ -375,7 +363,6 @@ func NewTestDatabaseProvider() DatabaseData { deposits: deposit_memory_client.New(), fulfillments: fulfillment_memory_client.New(), intents: intent_memory_client.New(), - messages: messaging_memory_client.New(), nonces: nonce_memory_client.New(), rendezvous: rendezvous_memory_client.New(), swaps: swap_memory_client.New(), @@ -740,17 +727,6 @@ func (dp *DatabaseProvider) GetUsdCostBasisBatch(ctx context.Context, mint strin return dp.intents.GetUsdCostBasisBatch(ctx, mint, owners...) } -// Messaging -// -------------------------------------------------------------------------------- -func (dp *DatabaseProvider) CreateMessage(ctx context.Context, record *messaging.Record) error { - return dp.messages.Insert(ctx, record) -} -func (dp *DatabaseProvider) GetMessages(ctx context.Context, account string) ([]*messaging.Record, error) { - return dp.messages.Get(ctx, account) -} -func (dp *DatabaseProvider) DeleteMessage(ctx context.Context, account string, messageID uuid.UUID) error { - return dp.messages.Delete(ctx, account, messageID) -} // Nonces // -------------------------------------------------------------------------------- diff --git a/ocp/data/messaging/dynamodb/store.go b/ocp/data/messaging/dynamodb/store.go new file mode 100644 index 0000000..d1850e4 --- /dev/null +++ b/ocp/data/messaging/dynamodb/store.go @@ -0,0 +1,174 @@ +// Package dynamodb implements the messaging.Store interface on top of DynamoDB. +// +// Each account is a bin (partition); each message is one item within it: +// +// pk = "" sk = "" message = +// +// Insert is a conditional Put (so a duplicate message ID in the bin fails), +// Delete is an idempotent DeleteItem, and Get is a single Query over the bin's +// partition. +package dynamodb + +import ( + "context" + "errors" + "fmt" + "strconv" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/google/uuid" + + "github.com/code-payments/ocp-server/ocp/data/messaging" +) + +const ( + attrPK = "pk" + attrSK = "sk" + attrMessage = "message" + attrExpiresAt = "expires_at" +) + +type store struct { + client *dynamodb.Client + table string +} + +// New returns a messaging.Store backed by the given DynamoDB table. +// Use CreateTables to provision it. +func New(client *dynamodb.Client, table string) messaging.Store { + return &store{ + client: client, + table: table, + } +} + +// Insert implements messaging.Store.Insert. +func (s *store) Insert(ctx context.Context, record *messaging.Record) error { + if err := record.Validate(); err != nil { + return err + } + + item := map[string]types.AttributeValue{ + attrPK: avS(record.Account), + attrSK: avS(record.MessageID.String()), + attrMessage: avB(record.Message), + } + // DynamoDB TTL expects epoch seconds; a zero ExpiresAt means no expiry. + if !record.ExpiresAt.IsZero() { + item[attrExpiresAt] = avN(record.ExpiresAt.Unix()) + } + + _, err := s.client.PutItem(ctx, &dynamodb.PutItemInput{ + TableName: aws.String(s.table), + Item: item, + ConditionExpression: aws.String(fmt.Sprintf("attribute_not_exists(%s)", attrPK)), + }) + if err != nil { + var ccf *types.ConditionalCheckFailedException + if errors.As(err, &ccf) { + return messaging.ErrDuplicateMessageID + } + return err + } + return nil +} + +// Delete implements messaging.Store.Delete. It is idempotent: deleting a message +// that does not exist is a no-op. +func (s *store) Delete(ctx context.Context, account string, messageID uuid.UUID) error { + _, err := s.client.DeleteItem(ctx, &dynamodb.DeleteItemInput{ + TableName: aws.String(s.table), + Key: map[string]types.AttributeValue{ + attrPK: avS(account), + attrSK: avS(messageID.String()), + }, + }) + return err +} + +// Get implements messaging.Store.Get. +func (s *store) Get(ctx context.Context, account string) ([]*messaging.Record, error) { + now := time.Now() + var records []*messaging.Record + input := &dynamodb.QueryInput{ + TableName: aws.String(s.table), + KeyConditionExpression: aws.String(fmt.Sprintf("%s = :pk", attrPK)), + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":pk": avS(account), + }, + } + for { + out, err := s.client.Query(ctx, input) + if err != nil { + return nil, err + } + for _, item := range out.Items { + record, err := recordFromItem(account, item) + if err != nil { + return nil, err + } + // DynamoDB TTL deletion is lazy, so drop already-expired messages here. + if !record.ExpiresAt.IsZero() && !record.ExpiresAt.After(now) { + continue + } + records = append(records, record) + } + if len(out.LastEvaluatedKey) == 0 { + break + } + input.ExclusiveStartKey = out.LastEvaluatedKey + } + return records, nil +} + +// reset deletes every item from the table, for tests. +func (s *store) reset() { + if err := clearTable(context.Background(), s.client, s.table); err != nil { + panic(err) + } +} + +// recordFromItem rebuilds a record from a bin item. The account is the query +// parameter (it is the partition key, so it is not re-parsed from the item). +func recordFromItem(account string, item map[string]types.AttributeValue) (*messaging.Record, error) { + id, err := uuid.Parse(asS(item[attrSK])) + if err != nil { + return nil, err + } + record := &messaging.Record{ + Account: account, + MessageID: id, + Message: asB(item[attrMessage]), + } + if av, ok := item[attrExpiresAt].(*types.AttributeValueMemberN); ok { + epoch, err := strconv.ParseInt(av.Value, 10, 64) + if err != nil { + return nil, err + } + record.ExpiresAt = time.Unix(epoch, 0).UTC() + } + return record, nil +} + +func avS(v string) types.AttributeValue { return &types.AttributeValueMemberS{Value: v} } +func avB(v []byte) types.AttributeValue { return &types.AttributeValueMemberB{Value: v} } +func avN(v int64) types.AttributeValue { + return &types.AttributeValueMemberN{Value: strconv.FormatInt(v, 10)} +} + +func asS(av types.AttributeValue) string { + if s, ok := av.(*types.AttributeValueMemberS); ok { + return s.Value + } + return "" +} + +func asB(av types.AttributeValue) []byte { + if b, ok := av.(*types.AttributeValueMemberB); ok { + return b.Value + } + return nil +} diff --git a/ocp/data/messaging/dynamodb/store_test.go b/ocp/data/messaging/dynamodb/store_test.go new file mode 100644 index 0000000..a43940c --- /dev/null +++ b/ocp/data/messaging/dynamodb/store_test.go @@ -0,0 +1,41 @@ +package dynamodb + +import ( + "context" + "os" + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + dynamotest "github.com/code-payments/ocp-server/database/dynamodb/test" + "github.com/code-payments/ocp-server/ocp/data/messaging/tests" +) + +const messagingTable = "messaging_test" + +var testEnv *dynamotest.TestEnv + +func TestMain(m *testing.M) { + log := zap.Must(zap.NewDevelopment()) + + env, err := dynamotest.NewTestEnv() + if err != nil { + log.With(zap.Error(err)).Error("Error creating dynamodb test environment") + os.Exit(1) + } + + testEnv = env + + os.Exit(m.Run()) +} + +func TestMessaging_DynamoDBStore(t *testing.T) { + require.NoError(t, CreateTables(context.Background(), testEnv.Client, messagingTable)) + + testStore := New(testEnv.Client, messagingTable).(*store) + teardown := func() { + testStore.reset() + } + tests.RunTests(t, testStore, teardown) +} diff --git a/ocp/data/messaging/dynamodb/table.go b/ocp/data/messaging/dynamodb/table.go new file mode 100644 index 0000000..487154e --- /dev/null +++ b/ocp/data/messaging/dynamodb/table.go @@ -0,0 +1,115 @@ +package dynamodb + +import ( + "context" + "errors" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" +) + +// maxBatchWriteItems is DynamoDB's per-call BatchWriteItem limit. +const maxBatchWriteItems = 25 + +// CreateTables provisions the messaging table with on-demand billing, keyed by +// (pk, sk) with no secondary indexes. It is idempotent: a table that already +// exists is left as-is. The call blocks until the table is ACTIVE. +func CreateTables(ctx context.Context, client *dynamodb.Client, table string) error { + _, err := client.CreateTable(ctx, &dynamodb.CreateTableInput{ + TableName: aws.String(table), + BillingMode: types.BillingModePayPerRequest, + AttributeDefinitions: []types.AttributeDefinition{ + {AttributeName: aws.String(attrPK), AttributeType: types.ScalarAttributeTypeS}, + {AttributeName: aws.String(attrSK), AttributeType: types.ScalarAttributeTypeS}, + }, + KeySchema: []types.KeySchemaElement{ + {AttributeName: aws.String(attrPK), KeyType: types.KeyTypeHash}, + {AttributeName: aws.String(attrSK), KeyType: types.KeyTypeRange}, + }, + }) + if err != nil { + var inUse *types.ResourceInUseException + if !errors.As(err, &inUse) { + return err + } + // Already exists; still ensure it is ACTIVE before returning. + } + + if err := dynamodb.NewTableExistsWaiter(client).Wait(ctx, &dynamodb.DescribeTableInput{ + TableName: aws.String(table), + }, 2*time.Minute); err != nil { + return err + } + + return ensureTTL(ctx, client, table, attrExpiresAt) +} + +// ensureTTL idempotently enables DynamoDB TTL on the table's attr. Enabling when +// already enabled (or enabling) is a no-op, so re-running CreateTables is safe. +func ensureTTL(ctx context.Context, client *dynamodb.Client, table, attr string) error { + desc, err := client.DescribeTimeToLive(ctx, &dynamodb.DescribeTimeToLiveInput{ + TableName: aws.String(table), + }) + if err != nil { + return err + } + if d := desc.TimeToLiveDescription; d != nil { + switch d.TimeToLiveStatus { + case types.TimeToLiveStatusEnabled, types.TimeToLiveStatusEnabling: + return nil + } + } + + _, err = client.UpdateTimeToLive(ctx, &dynamodb.UpdateTimeToLiveInput{ + TableName: aws.String(table), + TimeToLiveSpecification: &types.TimeToLiveSpecification{ + Enabled: aws.Bool(true), + AttributeName: aws.String(attr), + }, + }) + return err +} + +// clearTable deletes every item from the table, for tests. +func clearTable(ctx context.Context, client *dynamodb.Client, table string) error { + var startKey map[string]types.AttributeValue + for { + out, err := client.Scan(ctx, &dynamodb.ScanInput{ + TableName: aws.String(table), + ProjectionExpression: aws.String(attrPK + ", " + attrSK), + ExclusiveStartKey: startKey, + }) + if err != nil { + return err + } + + for start := 0; start < len(out.Items); start += maxBatchWriteItems { + end := start + maxBatchWriteItems + if end > len(out.Items) { + end = len(out.Items) + } + requests := make([]types.WriteRequest, 0, end-start) + for _, item := range out.Items[start:end] { + requests = append(requests, types.WriteRequest{ + DeleteRequest: &types.DeleteRequest{Key: map[string]types.AttributeValue{ + attrPK: item[attrPK], + attrSK: item[attrSK], + }}, + }) + } + if _, err := client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{ + RequestItems: map[string][]types.WriteRequest{table: requests}, + }); err != nil { + return err + } + } + + if len(out.LastEvaluatedKey) == 0 { + break + } + startKey = out.LastEvaluatedKey + } + return nil +} diff --git a/ocp/data/messaging/memory/store.go b/ocp/data/messaging/memory/store.go index a6eed43..d57fee9 100644 --- a/ocp/data/messaging/memory/store.go +++ b/ocp/data/messaging/memory/store.go @@ -3,6 +3,7 @@ package memory import ( "context" "sync" + "time" "github.com/google/uuid" @@ -63,8 +64,12 @@ func (s *store) Get(_ context.Context, account string) ([]*messaging.Record, err items := s.findByAccount(account) + now := time.Now() var copied []*messaging.Record for _, item := range items { + if !item.ExpiresAt.IsZero() && !item.ExpiresAt.After(now) { + continue + } cloned := item.Clone() copied = append(copied, &cloned) } diff --git a/ocp/data/messaging/postgres/model.go b/ocp/data/messaging/postgres/model.go deleted file mode 100644 index 0a3646c..0000000 --- a/ocp/data/messaging/postgres/model.go +++ /dev/null @@ -1,101 +0,0 @@ -package postgres - -import ( - "context" - "database/sql" - "time" - - "github.com/google/uuid" - "github.com/jmoiron/sqlx" - "github.com/pkg/errors" - - pgutil "github.com/code-payments/ocp-server/database/postgres" - - "github.com/code-payments/ocp-server/ocp/data/messaging" -) - -const ( - tableName = "ocp__core_message" -) - -type model struct { - Id sql.NullInt64 `db:"id"` - Account string `db:"account"` - MessageID string `db:"message_id"` - Message []byte `db:"message"` - CreatedAt time.Time `db:"created_at"` -} - -func toModel(record *messaging.Record) (*model, error) { - if err := record.Validate(); err != nil { - return nil, err - } - - if len(record.Account) == 0 { - return nil, errors.New("empty account") - } - - if record.Message == nil || len(record.Message) == 0 { - return nil, errors.New("empty message id") - } - - return &model{ - Account: record.Account, - MessageID: record.MessageID.String(), - Message: record.Message, - // The only time we call toModel is on create, so it's fine to default - // to UTC now. - CreatedAt: time.Now().UTC(), - }, nil -} - -func fromModel(obj *model) (*messaging.Record, error) { - parsedMessageID, err := uuid.Parse(obj.MessageID) - if err != nil { - return nil, errors.Wrap(err, "failure parsing message id") - } - - return &messaging.Record{ - Account: obj.Account, - MessageID: parsedMessageID, - Message: obj.Message, - }, nil -} - -func (m *model) dbSave(ctx context.Context, db *sqlx.DB) error { - return pgutil.ExecuteInTx(ctx, db, sql.LevelDefault, func(tx *sqlx.Tx) error { - query := `INSERT INTO ` + tableName + ` - ( - account, message_id, message, created_at - ) VALUES ($1,$2,$3,$4) RETURNING *;` - - err := tx.QueryRowxContext(ctx, query, - m.Account, - m.MessageID, - m.Message, - m.CreatedAt, - ).StructScan(m) - - return pgutil.CheckUniqueViolation(err, messaging.ErrDuplicateMessageID) - }) -} - -func dbGetAllForAccount(ctx context.Context, db *sqlx.DB, account string) ([]*model, error) { - res := []*model{} - - query := `SELECT account, message_id, message FROM ` + tableName + ` - WHERE account = $1` - - err := db.SelectContext(ctx, &res, query, account) - if err != nil { - return nil, err - } - return res, nil -} - -func dbDelete(ctx context.Context, db *sqlx.DB, account, messageID string) error { - query := `DELETE FROM ` + tableName + ` - WHERE account = $1 AND message_id = $2;` - _, err := db.ExecContext(ctx, query, account, messageID) - return err -} diff --git a/ocp/data/messaging/postgres/model_test.go b/ocp/data/messaging/postgres/model_test.go deleted file mode 100644 index c8f97ae..0000000 --- a/ocp/data/messaging/postgres/model_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package postgres - -import ( - "crypto/ed25519" - "testing" - - "github.com/google/uuid" - "github.com/mr-tron/base58" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "google.golang.org/protobuf/proto" - - commonpb "github.com/code-payments/ocp-protobuf-api/generated/go/common/v1" - messagingpb "github.com/code-payments/ocp-protobuf-api/generated/go/messaging/v1" - - "github.com/code-payments/ocp-server/ocp/data/messaging" -) - -func TestModelConversion(t *testing.T) { - pub, _, err := ed25519.GenerateKey(nil) - require.NoError(t, err) - requestor, _, err := ed25519.GenerateKey(nil) - require.NoError(t, err) - - account := &commonpb.SolanaAccountId{Value: pub} - messageID := uuid.New() - idBytes, _ := messageID.MarshalBinary() - message := &messagingpb.Message{ - Id: &messagingpb.MessageId{ - Value: idBytes, - }, - Kind: &messagingpb.Message_RequestToGrabBill{ - RequestToGrabBill: &messagingpb.RequestToGrabBill{ - RequestorAccount: &commonpb.SolanaAccountId{ - Value: requestor, - }, - }, - }, - } - messageBytes, err := proto.Marshal(message) - require.NoError(t, err) - - record := &messaging.Record{ - Account: base58.Encode(account.Value), - MessageID: messageID, - Message: messageBytes, - } - - model, err := toModel(record) - require.NoError(t, err) - assert.Equal(t, model.Account, base58.Encode(account.Value)) - assert.Equal(t, model.MessageID, messageID.String()) - assert.Equal(t, model.Message, messageBytes) - - actual, err := fromModel(model) - require.NoError(t, err) - assert.Equal(t, actual, record) -} diff --git a/ocp/data/messaging/postgres/store.go b/ocp/data/messaging/postgres/store.go deleted file mode 100644 index ac96187..0000000 --- a/ocp/data/messaging/postgres/store.go +++ /dev/null @@ -1,60 +0,0 @@ -package postgres - -import ( - "context" - "database/sql" - - "github.com/google/uuid" - "github.com/jmoiron/sqlx" - - "github.com/code-payments/ocp-server/ocp/data/messaging" -) - -// todo: This doesn't support TTL expiries, which is fine for now. We can -// manually delete old entries while in an invite-only testing phase. -type store struct { - db *sqlx.DB -} - -// New returns a postgres backed messaging.Store. -func New(db *sql.DB) messaging.Store { - return &store{ - db: sqlx.NewDb(db, "pgx"), - } -} - -// Insert implements messaging.Store.Insert. -func (s *store) Insert(ctx context.Context, record *messaging.Record) error { - model, err := toModel(record) - if err != nil { - return err - } - - return model.dbSave(ctx, s.db) -} - -// Delete implements messaging.Store.Delete. -func (s *store) Delete(ctx context.Context, account string, messageID uuid.UUID) error { - return dbDelete(ctx, s.db, account, messageID.String()) -} - -// Get implements messaging.Store.Get. -func (s *store) Get(ctx context.Context, account string) ([]*messaging.Record, error) { - models, err := dbGetAllForAccount(ctx, s.db, account) - if err != nil { - return nil, err - } - - records := make([]*messaging.Record, len(models)) - for i, model := range models { - record, err := fromModel(model) - if err != nil { - // todo(safety): this is the equivalent QoS brick case, although should be less problematic. - // we could have a valve to ignore, and also to delete - return nil, err - } - records[i] = record - } - - return records, nil -} diff --git a/ocp/data/messaging/postgres/store_test.go b/ocp/data/messaging/postgres/store_test.go deleted file mode 100644 index e3404a9..0000000 --- a/ocp/data/messaging/postgres/store_test.go +++ /dev/null @@ -1,108 +0,0 @@ -package postgres - -import ( - "database/sql" - "os" - "testing" - - "github.com/ory/dockertest/v3" - "go.uber.org/zap" - - "github.com/code-payments/ocp-server/ocp/data/messaging" - "github.com/code-payments/ocp-server/ocp/data/messaging/tests" - - postgrestest "github.com/code-payments/ocp-server/database/postgres/test" - - _ "github.com/jackc/pgx/v4/stdlib" -) - -const ( - // Used for testing ONLY, the table and migrations are external to this repository - tableCreate = ` - CREATE TABLE ocp__core_message ( - id SERIAL NOT NULL PRIMARY KEY, - - account TEXT NOT NULL, - message_id UUID NOT NULL, - message BYTEA NOT NULL, - created_at TIMESTAMP WITH TIME ZONE, - - CONSTRAINT ocp__core_message__uniq__account__and__message_id UNIQUE (account, message_id) - ); - ` - - // Used for testing ONLY, the table and migrations are external to this repository - tableDestroy = ` - DROP TABLE ocp__core_message; - ` -) - -var ( - testStore messaging.Store - teardown func() -) - -func TestMain(m *testing.M) { - log := zap.Must(zap.NewDevelopment()) - - testPool, err := dockertest.NewPool("") - if err != nil { - log.With(zap.Error(err)).Error("Error creating docker pool") - os.Exit(1) - } - - var cleanUpFunc func() - db, cleanUpFunc, err := postgrestest.StartPostgresDB(testPool) - if err != nil { - log.With(zap.Error(err)).Error("Error starting postgres image") - os.Exit(1) - } - defer db.Close() - - if err := createTestTables(log, db); err != nil { - log.With(zap.Error(err)).Error("Error creating test tables") - cleanUpFunc() - os.Exit(1) - } - - testStore = New(db) - teardown = func() { - if pc := recover(); pc != nil { - cleanUpFunc() - panic(pc) - } - - if err := resetTestTables(log, db); err != nil { - log.With(zap.Error(err)).Error("Error resetting test tables") - cleanUpFunc() - os.Exit(1) - } - } - - code := m.Run() - cleanUpFunc() - os.Exit(code) -} - -func TestMessagingPostgresStore(t *testing.T) { - tests.RunTests(t, testStore, teardown) -} - -func createTestTables(log *zap.Logger, db *sql.DB) error { - _, err := db.Exec(tableCreate) - if err != nil { - log.With(zap.Error(err)).Error("could not create test tables") - return err - } - return nil -} - -func resetTestTables(log *zap.Logger, db *sql.DB) error { - _, err := db.Exec(tableDestroy) - if err != nil { - log.With(zap.Error(err)).Error("could not drop test tables") - return err - } - - return createTestTables(log, db) -} diff --git a/ocp/data/messaging/store.go b/ocp/data/messaging/store.go index d0ed6af..aed0825 100644 --- a/ocp/data/messaging/store.go +++ b/ocp/data/messaging/store.go @@ -2,6 +2,7 @@ package messaging import ( "context" + "time" "github.com/google/uuid" "github.com/pkg/errors" @@ -15,6 +16,7 @@ type Record struct { Account string MessageID uuid.UUID Message []byte + ExpiresAt time.Time } // Store stores messages. @@ -52,8 +54,9 @@ func (r *Record) Validate() error { func (r *Record) Clone() Record { copied := Record{ - Account: r.Account, - Message: make([]byte, len(r.Message)), + Account: r.Account, + Message: make([]byte, len(r.Message)), + ExpiresAt: r.ExpiresAt, } copy(copied.MessageID[:], r.MessageID[:]) diff --git a/ocp/data/messaging/tests/tests.go b/ocp/data/messaging/tests/tests.go index 4ac9bbe..9e1512b 100644 --- a/ocp/data/messaging/tests/tests.go +++ b/ocp/data/messaging/tests/tests.go @@ -4,6 +4,7 @@ import ( "context" "reflect" "testing" + "time" "github.com/google/uuid" "github.com/stretchr/testify/assert" @@ -18,6 +19,7 @@ func RunTests(t *testing.T, s messaging.Store, teardown func()) { testDuplicateMessage, testGetMultipleMessages, testDeleteNonExistantMessage, + testExpiry, } { tf(t, s) teardown() @@ -110,3 +112,29 @@ func assertEquivalentRecords(t *testing.T, obj1, obj2 *messaging.Record) { assert.Equal(t, obj1.MessageID, obj2.MessageID) assert.Equal(t, obj1.Message, obj2.Message) } + +func testExpiry(t *testing.T, s messaging.Store) { + t.Run("testExpiry", func(t *testing.T) { + ctx := context.Background() + account := "expiry-account" + + live := &messaging.Record{Account: account, MessageID: uuid.New(), Message: []byte("live"), ExpiresAt: time.Now().Add(time.Hour)} + expired := &messaging.Record{Account: account, MessageID: uuid.New(), Message: []byte("expired"), ExpiresAt: time.Now().Add(-time.Hour)} + permanent := &messaging.Record{Account: account, MessageID: uuid.New(), Message: []byte("permanent")} + for _, r := range []*messaging.Record{live, expired, permanent} { + require.NoError(t, s.Insert(ctx, r)) + } + + actual, err := s.Get(ctx, account) + require.NoError(t, err) + + ids := make(map[uuid.UUID]bool, len(actual)) + for _, r := range actual { + ids[r.MessageID] = true + } + require.Len(t, actual, 2) + assert.True(t, ids[live.MessageID]) + assert.True(t, ids[permanent.MessageID]) + assert.False(t, ids[expired.MessageID]) + }) +} diff --git a/ocp/rpc/messaging/internal.go b/ocp/rpc/messaging/internal.go index 6e1c636..8d7a527 100644 --- a/ocp/rpc/messaging/internal.go +++ b/ocp/rpc/messaging/internal.go @@ -81,7 +81,7 @@ func (s *server) InternallyCreateMessage(ctx context.Context, rendezvousKey *com } // Save the message to the DB - err = s.data.CreateMessage(ctx, record) + err = s.messages.Insert(ctx, record) if err != nil { return uuid.Nil, errors.Wrap(err, "error saving message to db") } diff --git a/ocp/rpc/messaging/message_handler.go b/ocp/rpc/messaging/message_handler.go index 52ff4e1..eb571e6 100644 --- a/ocp/rpc/messaging/message_handler.go +++ b/ocp/rpc/messaging/message_handler.go @@ -3,6 +3,7 @@ package messaging import ( "bytes" "context" + "time" "github.com/pkg/errors" @@ -23,13 +24,14 @@ type MessageHandler interface { // allowed to be sent and persisted Validate(ctx context.Context, rendezvous *common.Account, message *messagingpb.Message) error - // OnSuccess is called upon creating the message after validation - OnSuccess(ctx context.Context) error - // GetAdditionalContext returns additional server-provided context to be // injected into the message before delivering to clients. Returns nil // if no additional context is needed. GetAdditionalContext(ctx context.Context, message *messagingpb.Message) (*messagingpb.AdditionalServerContext, error) + + // GetExpiry returns how long the message remains valid once persisted. A zero + // duration means the message never expires. + GetExpiry() time.Duration } type RequestToGrabBillMessageHandler struct { @@ -67,14 +69,14 @@ func (h *RequestToGrabBillMessageHandler) Validate(ctx context.Context, rendezvo return nil } -func (h *RequestToGrabBillMessageHandler) OnSuccess(ctx context.Context) error { - return nil -} - func (h *RequestToGrabBillMessageHandler) GetAdditionalContext(ctx context.Context, message *messagingpb.Message) (*messagingpb.AdditionalServerContext, error) { return nil, nil } +func (h *RequestToGrabBillMessageHandler) GetExpiry() time.Duration { + return 5 * time.Minute +} + // todo: This message type needs tests type RequestToGiveBillMessageHandler struct { data ocp_data.Provider @@ -129,10 +131,6 @@ func (h *RequestToGiveBillMessageHandler) Validate(ctx context.Context, rendezvo return nil } -func (h *RequestToGiveBillMessageHandler) OnSuccess(ctx context.Context) error { - return nil -} - func (h *RequestToGiveBillMessageHandler) GetAdditionalContext(ctx context.Context, message *messagingpb.Message) (*messagingpb.AdditionalServerContext, error) { typedMessage := message.GetRequestToGiveBill() if typedMessage == nil { @@ -157,3 +155,7 @@ func (h *RequestToGiveBillMessageHandler) GetAdditionalContext(ctx context.Conte }, }, nil } + +func (h *RequestToGiveBillMessageHandler) GetExpiry() time.Duration { + return 5 * time.Minute +} diff --git a/ocp/rpc/messaging/server.go b/ocp/rpc/messaging/server.go index 2ca5602..6474c78 100644 --- a/ocp/rpc/messaging/server.go +++ b/ocp/rpc/messaging/server.go @@ -2,7 +2,6 @@ package messaging import ( "context" - "database/sql" "fmt" "strings" "sync" @@ -47,9 +46,10 @@ const ( ) type server struct { - log *zap.Logger - conf *conf - data ocp_data.Provider + log *zap.Logger + conf *conf + data ocp_data.Provider + messages messaging.Store mintDataProvider *currency_util.MintDataProvider @@ -70,10 +70,12 @@ type server struct { func NewMessagingClient( log *zap.Logger, data ocp_data.Provider, + messages messaging.Store, ) InternalMessageClient { return &server{ - log: log, - data: data, + log: log, + data: data, + messages: messages, } } @@ -83,6 +85,7 @@ func NewMessagingClient( func NewMessagingClientAndServer( log *zap.Logger, data ocp_data.Provider, + messages messaging.Store, mintDataProvider *currency_util.MintDataProvider, rpcSignatureVerifier *auth.RPCSignatureVerifier, broadcastAddress string, @@ -92,6 +95,7 @@ func NewMessagingClientAndServer( log: log, conf: configProvider(), data: data, + messages: messages, mintDataProvider: mintDataProvider, streams: make(map[string]*messageStream), individualStreamMu: make(map[string]*sync.Mutex), @@ -511,7 +515,7 @@ func (s *server) PollMessages(ctx context.Context, req *messagingpb.PollMessages return nil, err } - records, err := s.data.GetMessages(ctx, rendezvousAccount.PublicKey().ToBase58()) + records, err := s.messages.Get(ctx, rendezvousAccount.PublicKey().ToBase58()) if err != nil { log.With(zap.Error(err)).Warn("failed to load undelivered messages") return nil, status.Error(codes.Internal, "") @@ -567,7 +571,7 @@ func (s *server) AckMessages(ctx context.Context, req *messagingpb.AckMessagesRe return nil, status.Error(codes.Internal, "") } - if err := s.data.DeleteMessage(ctx, account, converted); err != nil { + if err := s.messages.Delete(ctx, account, converted); err != nil { log.With(zap.Error(err)).Warn("Failed to delete message") return nil, status.Error(codes.Internal, "") } @@ -691,34 +695,19 @@ func (s *server) SendMessage(ctx context.Context, req *messagingpb.SendMessageRe return nil, status.Error(codes.Internal, err.Error()) } - // Start off by persisting the message, so any async flushes will catch it. - // In the same database transaction, store any supporting DB records as - // required by the message type. - // - // Note: Not all store implementations have real support for this, so if - // anything is added, then ensure it does! - err = s.data.ExecuteInTx(ctx, sql.LevelDefault, func(ctx context.Context) error { - record := &messaging.Record{ - Account: base58.Encode(req.RendezvousKey.Value), - MessageID: id, - Message: messageWithGeneratedIDAndSignatureBytes, - } - - err = s.data.CreateMessage(ctx, record) - if err != nil { - log.With(zap.Error(err)).Warn("failed to create message") - return err - } - - err = messageHandler.OnSuccess(ctx) - if err != nil { - log.With(zap.Error(err)).Warn("failure calling message hanlder success callback") - return err - } + // Persist the message so any async flushes will catch it. + record := &messaging.Record{ + Account: base58.Encode(req.RendezvousKey.Value), + MessageID: id, + Message: messageWithGeneratedIDAndSignatureBytes, + } + if ttl := messageHandler.GetExpiry(); ttl > 0 { + record.ExpiresAt = time.Now().Add(ttl) + } - return nil - }) + err = s.messages.Insert(ctx, record) if err != nil { + log.With(zap.Error(err)).Warn("failed to create message") return nil, status.Error(codes.Internal, "") } @@ -766,7 +755,7 @@ func (s *server) flush(ctx context.Context, accountID *messagingpb.RendezvousKey zap.String("account_id", accountStr), ) - records, err := s.data.GetMessages(ctx, accountStr) + records, err := s.messages.Get(ctx, accountStr) if err != nil { log.With(zap.Error(err)).Warn("Failed to load undelivered messages") return diff --git a/ocp/rpc/messaging/testutil.go b/ocp/rpc/messaging/testutil.go index 8cb60c4..f8a8843 100644 --- a/ocp/rpc/messaging/testutil.go +++ b/ocp/rpc/messaging/testutil.go @@ -29,6 +29,7 @@ import ( "github.com/code-payments/ocp-server/ocp/data/account" "github.com/code-payments/ocp-server/ocp/data/currency" "github.com/code-payments/ocp-server/ocp/data/messaging" + messaging_memory "github.com/code-payments/ocp-server/ocp/data/messaging/memory" "github.com/code-payments/ocp-server/ocp/data/rendezvous" "github.com/code-payments/ocp-server/testutil" ) @@ -50,6 +51,7 @@ func setup(t *testing.T, enableMultiServer bool) (env testEnv, cleanup func()) { require.NoError(t, err) data := ocp_data.NewTestDataProvider() + messages := messaging_memory.New() env.client1 = &clientEnv{ ctx: context.Background(), @@ -81,14 +83,14 @@ func setup(t *testing.T, enableMultiServer bool) (env testEnv, cleanup func()) { mintDataProvider := currency_util.NewMintDataProvider(log, data, 0, time.Second, time.Second) require.NoError(t, mintDataProvider.Start(context.Background())) - s1 := NewMessagingClientAndServer(log, data, mintDataProvider, auth.NewRPCSignatureVerifier(log, data), conn1.Target(), withManualTestOverrides(&testOverrides{})) + s1 := NewMessagingClientAndServer(log, data, messages, mintDataProvider, auth.NewRPCSignatureVerifier(log, data), conn1.Target(), withManualTestOverrides(&testOverrides{})) env.server1 = &serverEnv{ ctx: context.Background(), server: s1, subsidizer: subsidizer, } - s2 := NewMessagingClientAndServer(log, data, mintDataProvider, auth.NewRPCSignatureVerifier(log, data), conn2.Target(), withManualTestOverrides(&testOverrides{})) + s2 := NewMessagingClientAndServer(log, data, messages, mintDataProvider, auth.NewRPCSignatureVerifier(log, data), conn2.Target(), withManualTestOverrides(&testOverrides{})) env.server2 = &serverEnv{ ctx: context.Background(), server: s2, @@ -122,13 +124,13 @@ type serverEnv struct { } func (s *serverEnv) getMessages(t *testing.T, rendezvousKey *common.Account) []*messaging.Record { - messages, err := s.server.data.GetMessages(s.ctx, rendezvousKey.PublicKey().ToBase58()) + messages, err := s.server.messages.Get(s.ctx, rendezvousKey.PublicKey().ToBase58()) require.NoError(t, err) return messages } func (s *serverEnv) assertNoMessages(t *testing.T, rendezvousKey *common.Account) { - messages, err := s.server.data.GetMessages(s.ctx, rendezvousKey.PublicKey().ToBase58()) + messages, err := s.server.messages.Get(s.ctx, rendezvousKey.PublicKey().ToBase58()) require.NoError(t, err) assert.Empty(t, messages) }