Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
filippo.io/edwards25519 v1.1.0
github.com/anyproto/go-slip10 v1.0.1
github.com/aws/aws-sdk-go-v2 v1.42.0
github.com/aws/aws-sdk-go-v2/config v1.32.25
github.com/aws/aws-sdk-go-v2/credentials v1.19.24
github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.6.25
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.59.0
Expand Down Expand Up @@ -46,6 +47,7 @@ require (
github.com/Microsoft/go-winio v0.4.14 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.11 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.29 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.30 // indirect
Expand All @@ -54,6 +56,10 @@ require (
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.12.6 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.29 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.25 // indirect
github.com/aws/aws-sdk-go-v2/service/signin v1.2.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.31.3 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.36.6 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.43.3 // indirect
github.com/aws/smithy-go v1.27.1 // indirect
github.com/cenkalti/backoff/v4 v4.1.0 // indirect
github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6 // indirect
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ github.com/aws/aws-sdk-go-v2 v1.42.0 h1:XvXMJTkFQtpBKIWZnmr9ZEOc2InWM2yldjXEJ/by
github.com/aws/aws-sdk-go-v2 v1.42.0/go.mod h1:27+ACypSLljLAEKsCYOmrjKh83vuTRkuAe9Uv/3A4bg=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.11 h1:h5+3VT69KUBK24grGuuA5saDJTj2IIjLb9au668Fo5I=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.11/go.mod h1:dnakxebH6UwFvcvujL0LVggYQ8nEvBGjU4G/V79Nv94=
github.com/aws/aws-sdk-go-v2/config v1.32.25 h1:ACCejvStYoilgwrfegSt5ZntCbPrk52qfwyNcnl3omM=
github.com/aws/aws-sdk-go-v2/config v1.32.25/go.mod h1:LJyU8sDRbXUxFn8xMJIGP+v9QYYwveNLI8a/giAOiAs=
github.com/aws/aws-sdk-go-v2/credentials v1.19.24 h1:2hQqYCV9yqyePQ9o6dCrZc/zO8U3TwPr9mIKlZnPu/I=
github.com/aws/aws-sdk-go-v2/credentials v1.19.24/go.mod h1:IDwpACtwqHLISdzfwUUNq4P9DsB/h5BLg4FwJPNfqFY=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.29 h1:r6qZHbT+wxgWO/e9vYNUEtg7lv5+UN3pRqKhLXvnArg=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.29/go.mod h1:QRnaRcTVGKPGRy8w78HMQtKUGRYcnMZAANATkeVA6Mo=
github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.6.25 h1:EeK30mZmhopHcNmKykyGF0LwmFB1ZwQNr+FeyRjcN0U=
github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.6.25/go.mod h1:tudVnwAJyXgCh4N6ABYdzUM+i+PXsx8700FOyhMn+4k=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29 h1:f3vKqSo13fhTYb+JEcXwXefZQE26I1FB5eTSniU67ko=
Expand All @@ -61,6 +65,14 @@ github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.25 h1:2pQEbwf+/6EDb
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.25/go.mod h1:KvT6NCcQ0EZ+ZkVRrlBMt04Po3ok23YELEp7WimhLhM=
github.com/aws/aws-sdk-go-v2/service/s3 v1.102.2 h1:ie4ElCmUKS26pzrZcIk/lmt4yWjAqLLcawstyQCh298=
github.com/aws/aws-sdk-go-v2/service/s3 v1.102.2/go.mod h1:zjsomFeX5duj+4PlMB+o4JoWTIx+G0XMyzjYrUbQkN0=
github.com/aws/aws-sdk-go-v2/service/signin v1.2.0 h1:3nXpRcFwRCW8n7HgO2QGy0Dc20eQNfBuUemGQhpF8m8=
github.com/aws/aws-sdk-go-v2/service/signin v1.2.0/go.mod h1:LxYujSTLPRlp2vTtcUO/+1ilrew8ytt6SvQyOgejzFQ=
github.com/aws/aws-sdk-go-v2/service/sso v1.31.3 h1:ey1XLTYXb9PcLt4535632o5kCGXNXEhNb620Dqwuylo=
github.com/aws/aws-sdk-go-v2/service/sso v1.31.3/go.mod h1:Lk7PlmoTYryQmyBG0EXqj5BcUbj3whXdU2s3yGI3EAc=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.36.6 h1:yLr03zQE/5Eu5l3QU0Si+xMbLMbSDF2YXsigqXngs6g=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.36.6/go.mod h1:Q5N6icH+KJZDLh+ESNwzdv6cZ6vLFF/egy3IOxWhmz4=
github.com/aws/aws-sdk-go-v2/service/sts v1.43.3 h1:VrIhKRCSK1umelSgB9RghvA9RTUYeQffyAS5ApXehNI=
github.com/aws/aws-sdk-go-v2/service/sts v1.43.3/go.mod h1:r8wkDOuLaaMFqFiYAb8dGY2A3gJCOujMc6CFOVC4Zhc=
github.com/aws/smithy-go v1.27.1 h1:4T340VFndXtADGF52gYa1POyL7s9E4Z1OeZ1hCscIw8=
github.com/aws/smithy-go v1.27.1/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
Expand Down
8 changes: 0 additions & 8 deletions ocp/aml/guard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/code-payments/ocp-server/ocp/common"
currency_util "github.com/code-payments/ocp-server/ocp/currency"
ocp_data "github.com/code-payments/ocp-server/ocp/data"
"github.com/code-payments/ocp-server/ocp/data/currency"
"github.com/code-payments/ocp-server/ocp/data/intent"
"github.com/code-payments/ocp-server/testutil"
)
Expand Down Expand Up @@ -151,13 +150,6 @@ func setupAmlTest(t *testing.T) (env amlTestEnv) {

testutil.SetupRandomSubsidizer(t, env.data)

env.data.ImportExchangeRates(env.ctx, &currency.MultiRateRecord{
Time: time.Now(),
Rates: map[string]float64{
string(currency_lib.USD): 0.1,
},
})

return env
}

Expand Down
43 changes: 37 additions & 6 deletions ocp/currency/data_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ import (
commonpb "github.com/code-payments/ocp-protobuf-api/generated/go/common/v1"
currencypb "github.com/code-payments/ocp-protobuf-api/generated/go/currency/v1"

"github.com/code-payments/ocp-server/database/query"
"github.com/code-payments/ocp-server/ocp/auth"
"github.com/code-payments/ocp-server/ocp/common"
"github.com/code-payments/ocp-server/ocp/config"
ocp_data "github.com/code-payments/ocp-server/ocp/data"
"github.com/code-payments/ocp-server/ocp/data/currency"
"github.com/code-payments/ocp-server/ocp/data/currency/exchange"
"github.com/code-payments/ocp-server/ocp/data/currency/holder"
"github.com/code-payments/ocp-server/ocp/data/currency/reserve"
"github.com/code-payments/ocp-server/solana/currencycreator"
timelock_token "github.com/code-payments/ocp-server/solana/timelock/v1"
"github.com/code-payments/ocp-server/usdc"
Expand Down Expand Up @@ -52,8 +56,12 @@ type cachedProtoMint struct {
}

type MintDataProvider struct {
log *zap.Logger
data ocp_data.Provider
log *zap.Logger

data ocp_data.Provider
exchangeRateStore exchange.Store
reserveStore reserve.Store
holderStore holder.Store

protoMintCacheTTL time.Duration
exchangeRatePollInterval time.Duration
Expand Down Expand Up @@ -102,6 +110,9 @@ type MintDataProvider struct {
func NewMintDataProvider(
log *zap.Logger,
data ocp_data.Provider,
exchangeRateStore exchange.Store,
reserveStore reserve.Store,
holderStore holder.Store,
protoMintCacheTTL,
exchangeRatePollInterval,
launchpadCurrencyPollInterval time.Duration,
Expand All @@ -110,6 +121,9 @@ func NewMintDataProvider(
return &MintDataProvider{
log: log,
data: data,
exchangeRateStore: exchangeRateStore,
reserveStore: reserveStore,
holderStore: holderStore,
protoMintCacheTTL: protoMintCacheTTL,
exchangeRatePollInterval: exchangeRatePollInterval,
launchpadCurrencyPollInterval: launchpadCurrencyPollInterval,
Expand Down Expand Up @@ -476,6 +490,18 @@ func (m *MintDataProvider) GetLiveExchangeRates(ctx context.Context) (*LiveExcha
return m.exchangeRates, nil
}

// GetExchangeRateHistory returns historical exchange rate records for the core
// mint, passing straight through to the underlying exchange rate store.
func (m *MintDataProvider) GetExchangeRateHistory(ctx context.Context, symbol string, interval query.Interval, start, end time.Time, ordering query.Ordering) ([]*currency.ExchangeRateRecord, error) {
return m.exchangeRateStore.GetExchangeRatesInRange(ctx, symbol, interval, start, end, ordering)
}

// GetReserveHistory returns historical reserve records for a currency creator
// mint, passing straight through to the underlying reserve store.
func (m *MintDataProvider) GetReserveHistory(ctx context.Context, mint string, interval query.Interval, start, end time.Time, ordering query.Ordering) ([]*currency.ReserveRecord, error) {
return m.reserveStore.GetReservesInRange(ctx, mint, interval, start, end, ordering)
}

// GetAllCachedReserveStates returns a snapshot of all currently cached reserve
// states keyed by mint address. It blocks until the first successful poll has
// completed.
Expand Down Expand Up @@ -856,7 +882,7 @@ func (m *MintDataProvider) pollExchangeRates(ctx context.Context) {
}

func (m *MintDataProvider) fetchAndUpdateExchangeRates(ctx context.Context, log *zap.Logger) {
rates, err := m.data.GetAllExchangeRates(ctx, time.Now())
rates, err := m.exchangeRateStore.GetAllExchangeRates(ctx, time.Now())
if err != nil {
log.With(zap.Error(err)).Warn("failed to fetch exchange rates")
return
Expand Down Expand Up @@ -902,7 +928,7 @@ func (m *MintDataProvider) pollReserveState(ctx context.Context) {
}

func (m *MintDataProvider) fetchAndUpdateReserveStates(ctx context.Context) {
liveReserves, err := m.data.GetAllLiveCurrencyReserves(ctx)
liveReserves, err := m.reserveStore.GetAllLiveReserves(ctx)
if err == currency.ErrNotFound {
return
}
Expand Down Expand Up @@ -974,7 +1000,7 @@ func (m *MintDataProvider) pollHolderCounts(ctx context.Context) {
}

func (m *MintDataProvider) fetchAndUpdateHolderCounts(ctx context.Context) {
liveCounts, err := m.data.GetAllLiveCurrencyHolderCounts(ctx)
liveCounts, err := m.holderStore.GetAllLiveHolderCounts(ctx)
if err == currency.ErrNotFound {
return
}
Expand All @@ -983,10 +1009,15 @@ func (m *MintDataProvider) fetchAndUpdateHolderCounts(ctx context.Context) {
return
}

mints := make([]string, 0, len(liveCounts))
for mintAddr := range liveCounts {
mints = append(mints, mintAddr)
}

var includeWeeklyDeltas bool
oneWeekAgo := time.Now().Add(-7 * 24 * time.Hour)
endOfWeekAgoDay := time.Date(oneWeekAgo.Year(), oneWeekAgo.Month(), oneWeekAgo.Day(), 23, 59, 59, 0, time.UTC)
historicalCounts, err := m.data.GetAllCurrencyHolderCountsAtTime(ctx, endOfWeekAgoDay)
historicalCounts, err := m.holderStore.GetHolderCountsForDay(ctx, mints, endOfWeekAgoDay)
if err != nil && err != currency.ErrNotFound {
m.log.With(zap.Error(err)).Warn("failed to fetch historical holder counts for weekly delta")
} else {
Expand Down
10 changes: 6 additions & 4 deletions ocp/currency/usd_market_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/code-payments/ocp-server/ocp/common"
ocp_data "github.com/code-payments/ocp-server/ocp/data"
"github.com/code-payments/ocp-server/ocp/data/currency"
"github.com/code-payments/ocp-server/ocp/data/currency/exchange"
"github.com/code-payments/ocp-server/ocp/data/currency/reserve"
"github.com/code-payments/ocp-server/solana/currencycreator"
)

Expand All @@ -28,7 +30,7 @@ func CalculateUsdMarketValueFromFiatAmount(fiatAmount, fiatToUsdRate float64) (f

// CalculateUsdMarketValueFromTokenAmount calculates the current USD market value
// of a crypto amount in quarks.
func CalculateUsdMarketValueFromTokenAmount(ctx context.Context, data ocp_data.Provider, mint *common.Account, quarks uint64, at time.Time) (float64, error) {
func CalculateUsdMarketValueFromTokenAmount(ctx context.Context, data ocp_data.Provider, exchangeRateStore exchange.Store, reserveStore reserve.Store, mint *common.Account, quarks uint64, at time.Time) (float64, error) {
isLive := time.Since(at) < 5*time.Second

isSupportedMint, err := common.IsSupportedMint(ctx, data, mint)
Expand All @@ -48,7 +50,7 @@ func CalculateUsdMarketValueFromTokenAmount(ctx context.Context, data ocp_data.P
Time: at,
}
} else {
exchangeRateRecord, err = data.GetExchangeRate(ctx, currency_lib.USD, at)
exchangeRateRecord, err = exchangeRateStore.GetExchangeRate(ctx, string(currency_lib.USD), at)
if err != nil {
return 0, err
}
Expand All @@ -64,12 +66,12 @@ func CalculateUsdMarketValueFromTokenAmount(ctx context.Context, data ocp_data.P

var reserveRecord *currency.ReserveRecord
if isLive {
reserveRecord, err = data.GetLiveCurrencyReserve(ctx, mint.PublicKey().ToBase58())
reserveRecord, err = reserveStore.GetLiveReserve(ctx, mint.PublicKey().ToBase58())
if err != nil {
return 0, err
}
} else {
reserveRecord, err = data.GetCurrencyReserveAtTime(ctx, mint.PublicKey().ToBase58(), at)
reserveRecord, err = reserveStore.GetReserveAtTime(ctx, mint.PublicKey().ToBase58(), at)
if err != nil {
return 0, err
}
Expand Down
89 changes: 89 additions & 0 deletions ocp/data/currency/exchange/cache/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Package cache provides an exchange.Store decorator that caches single- and
// all-symbol rate lookups in front of a wrapped store.
//
// Reads are keyed by a coarse time bucket that doubles as the freshness window,
// and a full set of rates is weighted more heavily than a single-symbol entry.
// Range and history reads, and all writes, pass straight through to the wrapped
// store.
package cache

import (
"context"
"fmt"
"time"

lrucache "github.com/code-payments/ocp-server/cache"
"github.com/code-payments/ocp-server/database/query"
"github.com/code-payments/ocp-server/ocp/data/currency"
"github.com/code-payments/ocp-server/ocp/data/currency/exchange"
)

const (
// maxCacheBudget bounds the weighted size of the rate cache before the
// least-recently-used entries are evicted.
maxCacheBudget = 100_000

// singleRateWeight and multiRateWeight weight cached entries against the
// budget. A full set of rates spans many symbols, so it costs
// proportionally more than a single-symbol entry.
singleRateWeight = 1
multiRateWeight = 100

// cacheBucket is the time granularity used to build cache keys. Lookups that
// truncate to the same bucket share a cached result, which doubles as the
// effective freshness window for cached rates.
cacheBucket = 5 * time.Minute
)

type store struct {
backing exchange.Store
cache lrucache.Cache
}

// New returns an exchange.Store that caches reads in front of backing.
func New(backing exchange.Store) exchange.Store {
return &store{
backing: backing,
cache: lrucache.NewCache(maxCacheBudget),
}
}

func (s *store) PutExchangeRates(ctx context.Context, record *currency.MultiRateRecord) error {
return s.backing.PutExchangeRates(ctx, record)
}

func (s *store) GetExchangeRate(ctx context.Context, symbol string, t time.Time) (*currency.ExchangeRateRecord, error) {
key := fmt.Sprintf("%s:%s", symbol, t.Truncate(cacheBucket).Format(time.RFC3339))
if cached, ok := s.cache.Retrieve(key); ok {
return cached.(*currency.ExchangeRateRecord), nil
}

rate, err := s.backing.GetExchangeRate(ctx, symbol, t)
if err != nil {
return nil, err
}

s.cache.Insert(key, rate, singleRateWeight)

return rate, nil
}

func (s *store) GetAllExchangeRates(ctx context.Context, t time.Time) (*currency.MultiRateRecord, error) {
key := fmt.Sprintf("everything:%s", t.Truncate(cacheBucket).Format(time.RFC3339))
if cached, ok := s.cache.Retrieve(key); ok {
return cached.(*currency.MultiRateRecord), nil
}

rates, err := s.backing.GetAllExchangeRates(ctx, t)
if err != nil {
return nil, err
}

s.cache.Insert(key, rates, multiRateWeight)

return rates, nil
}

func (s *store) GetExchangeRatesInRange(ctx context.Context, symbol string, interval query.Interval, start time.Time, end time.Time, ordering query.Ordering) ([]*currency.ExchangeRateRecord, error) {
return s.backing.GetExchangeRatesInRange(ctx, symbol, interval, start, end, ordering)
}
67 changes: 67 additions & 0 deletions ocp/data/currency/exchange/cache/store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package cache

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/code-payments/ocp-server/ocp/data/currency"
"github.com/code-payments/ocp-server/ocp/data/currency/exchange/memory"
"github.com/code-payments/ocp-server/ocp/data/currency/exchange/tests"
)

func TestExchange_CachedStore(t *testing.T) {
testStore := New(memory.New()).(*store)
teardown := func() {
testStore.backing = memory.New()
testStore.cache.Clear()
}
tests.RunStoreTests(t, testStore, teardown)
}

// TestExchange_CachedReadsServedFromCache verifies that once a rate is read, a
// later read in the same time bucket is served from the cache even after the
// backing store no longer holds it, while a read in a different bucket falls
// through to the (now empty) backing store.
func TestExchange_CachedReadsServedFromCache(t *testing.T) {
ctx := context.Background()
now := time.Date(2021, 01, 29, 13, 0, 5, 0, time.UTC)

s := New(memory.New()).(*store)
require.NoError(t, s.PutExchangeRates(ctx, &currency.MultiRateRecord{
Time: now,
Rates: map[string]float64{"usd": 0.000055, "cad": 0.00007},
}))

// Prime the cache with a single- and all-symbol read.
single, err := s.GetExchangeRate(ctx, "usd", now)
require.NoError(t, err)
assert.EqualValues(t, 0.000055, single.Rate)

all, err := s.GetAllExchangeRates(ctx, now)
require.NoError(t, err)
assert.Len(t, all.Rates, 2)

// Drop the backing data. Reads in the same bucket are still served.
s.backing = memory.New()

single, err = s.GetExchangeRate(ctx, "usd", now)
require.NoError(t, err)
assert.EqualValues(t, 0.000055, single.Rate)

all, err = s.GetAllExchangeRates(ctx, now)
require.NoError(t, err)
assert.Len(t, all.Rates, 2)

// A read truncating to a different bucket misses the cache and falls
// through to the empty backing store.
otherBucket := now.Add(cacheBucket)
_, err = s.GetExchangeRate(ctx, "usd", otherBucket)
assert.Equal(t, currency.ErrNotFound, err)

_, err = s.GetAllExchangeRates(ctx, otherBucket)
assert.Equal(t, currency.ErrNotFound, err)
}
Loading
Loading