Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,509 changes: 741 additions & 768 deletions package-lock.json

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions src/__tests__/price.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ describe('GET /price/:assetA/:assetB confidence score', () => {
const recent = new Date(now.getTime() - 10000).toISOString() // 10s ago

mockQuery.mockImplementation(async (sql: string) => {
if (sql.includes('price::numeric') && sql.includes('FROM price_points') && sql.includes('SDEX')) return { rows: [{ price: '0.1', timestamp: new Date().toISOString() }] }
if (sql.includes('AVG(ps.spot_price::numeric)')) return { rows: [{ spot_price: '0.1', timestamp: new Date().toISOString() }] }
if (sql.includes('AVG(spot_price::numeric)')) return { rows: [{ amm_price: '0.1' }] }
if (sql.includes('MAX(timestamp) as last_trade')) return { rows: [{ last_trade: recent }] }
if (sql.includes('GROUP BY source')) return { rows: [{ source: 'SDEX', vol: '100' }, { source: 'AMM', vol: '50' }] }
Expand All @@ -79,6 +81,8 @@ describe('GET /price/:assetA/:assetB confidence score', () => {
const twoMinAgo = new Date(now.getTime() - 120000).toISOString() // 2m ago

mockQuery.mockImplementation(async (sql: string) => {
if (sql.includes('price::numeric') && sql.includes('FROM price_points') && sql.includes('SDEX')) return { rows: [{ price: '0.1', timestamp: new Date().toISOString() }] }
if (sql.includes('AVG(ps.spot_price::numeric)')) return { rows: [{ spot_price: '0.1', timestamp: new Date().toISOString() }] }
if (sql.includes('AVG(spot_price::numeric)')) return { rows: [{ amm_price: '0.1' }] }
if (sql.includes('MAX(timestamp) as last_trade')) return { rows: [{ last_trade: twoMinAgo }] }
if (sql.includes('GROUP BY source')) return { rows: [{ source: 'SDEX', vol: '100' }] }
Expand All @@ -101,6 +105,8 @@ describe('GET /price/:assetA/:assetB confidence score', () => {
const tenMinAgo = new Date(now.getTime() - 600000).toISOString() // 10m ago

mockQuery.mockImplementation(async (sql: string) => {
if (sql.includes('price::numeric') && sql.includes('FROM price_points') && sql.includes('SDEX')) return { rows: [{ price: '0.1', timestamp: new Date().toISOString() }] }
if (sql.includes('AVG(ps.spot_price::numeric)')) return { rows: [{ spot_price: '0.1', timestamp: new Date().toISOString() }] }
if (sql.includes('MAX(timestamp) as last_trade')) return { rows: [{ last_trade: tenMinAgo }] }
if (sql.includes('COUNT(DISTINCT COALESCE(pool_id')) return { rows: [{ sources: '1' }] }
if (sql.includes('GROUP BY source')) return { rows: [{ source: 'SDEX', vol: '100' }] }
Expand All @@ -119,6 +125,8 @@ describe('GET /price/:assetA/:assetB confidence score', () => {

it('returns unknown confidence when no trades found', async () => {
mockQuery.mockImplementation(async (sql: string) => {
if (sql.includes('price::numeric') && sql.includes('FROM price_points') && sql.includes('SDEX')) return { rows: [] }
if (sql.includes('AVG(ps.spot_price::numeric)')) return { rows: [] }
if (sql.includes('MAX(timestamp) as last_trade')) return { rows: [{ last_trade: null }] }
return { rows: [] }
})
Expand Down
9 changes: 7 additions & 2 deletions src/__tests__/schemaValidation.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { describe, it, expect, vi, beforeEach } from 'vitest'
import Fastify from 'fastify'
import Ajv from 'ajv'
import Fastify from 'fastify'
import { beforeEach, describe, expect, it, vi } from 'vitest'

const { mockQuery, mockGetCachedPrice, mockGetBestRoute, mockGetAggregatedPrice } = vi.hoisted(() => ({
mockQuery: vi.fn(),
Expand Down Expand Up @@ -78,6 +78,11 @@ describe('REST response schema validation', () => {

it('returns 200 and a body that matches the declared /price schema', async () => {
mockGetAggregatedPrice.mockResolvedValue(validAggregate())
mockQuery.mockImplementation(async (sql: string) => {
if (sql.includes('price::numeric') && sql.includes('FROM price_points') && sql.includes('SDEX')) return { rows: [{ price: '0.1', timestamp: new Date().toISOString() }] }
if (sql.includes('AVG(ps.spot_price::numeric)')) return { rows: [{ spot_price: '0.1', timestamp: new Date().toISOString() }] }
return { rows: [] }
})

const app = await buildApp()
const res = await app.inject({ method: 'GET', url: '/price/XLM/USDC' })
Expand Down
89 changes: 77 additions & 12 deletions src/api/rest.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import type { FastifyInstance } from 'fastify'
import { price_requests_total } from '../metrics'
import { getCachedPrice, setCachedPrice } from '../redis'
import { getAggregatedPrice } from '../aggregator/vwap'
import { getBestRoute } from '../aggregator/bestRoute'
import { pgPool } from '../db'
import { getAggregatedPrice } from '../aggregator/vwap'
import { getMedianPrice, type PriceSource } from '../pricing/median'
import { config } from '../config'
import { pgPool } from '../db'
import { price_requests_total } from '../metrics'
import { getCachedPrice, setCachedPrice } from '../redis'
import {
statusResponseSchema,
priceResponseSchema,
routeResponseSchema,
historyResponseSchema,
poolsResponseSchema,
installResponseValidation,
historyResponseSchema,
installResponseValidation,
poolsResponseSchema,
priceResponseSchema,
routeResponseSchema,
statusResponseSchema,
} from './schemas'

function makePairKey(a: string, b: string): string {
Expand All @@ -29,6 +30,56 @@ function findPair(assetA: string, assetB: string) {
})
}

async function extractSourcePrices(pairKey: string): Promise<PriceSource[]> {
const sources: PriceSource[] = []

try {
// Get the latest SDEX price
const sdexResult = await pgPool.query(
`SELECT price::numeric, timestamp
FROM price_points
WHERE pair_key = $1 AND source = 'SDEX'
ORDER BY timestamp DESC LIMIT 1`,
[pairKey]
)

if (sdexResult.rows[0]) {
const row = sdexResult.rows[0]
sources.push({
id: 'SDEX',
price: parseFloat(row.price),
timestamp: new Date(row.timestamp).getTime(),
priority: 0,
})
}

// Get the latest AMM price
const ammResult = await pgPool.query(
`SELECT AVG(ps.spot_price::numeric) AS spot_price, MAX(ps.timestamp) AS timestamp
FROM pool_snapshots ps
WHERE ps.pool_id IN (
SELECT DISTINCT pool_id FROM price_points
WHERE pair_key = $1 AND source = 'AMM' AND pool_id IS NOT NULL
)`,
[pairKey]
)

if (ammResult.rows[0] && ammResult.rows[0].spot_price) {
sources.push({
id: 'AMM',
price: parseFloat(ammResult.rows[0].spot_price),
timestamp: new Date(ammResult.rows[0].timestamp).getTime(),
priority: 1,
})
}
} catch (err) {
// If source extraction fails, return empty sources and fall through
// The aggregated price can still be returned
}

return sources
}

export async function registerRESTRoutes(app: FastifyInstance) {
// Validate every response against its declared schema in dev/test (no-op in
// production). Must run before the routes below are registered so they pick
Expand Down Expand Up @@ -67,14 +118,28 @@ export async function registerRESTRoutes(app: FastifyInstance) {
} catch { /* fall through */ }
}

const agg = await getAggregatedPrice(pair.pairKey)
const route = await getBestRoute(pair.assetA, pair.assetB, pair.pairKey, 1000)
const [agg, route, sources] = await Promise.all([
getAggregatedPrice(pair.pairKey),
getBestRoute(pair.assetA, pair.assetB, pair.pairKey, 1000),
extractSourcePrices(pair.pairKey),
])

const medianResult = getMedianPrice(sources, {
freshnessThresholdMs: 60_000,
minFreshSources: 2,
fallbackChain: [['SDEX', 'AMM']],
})

const result = {
assetA: pair.assetA.code,
assetB: pair.assetB.code,
pairKey: pair.pairKey,
...agg,
bestRoute: route.route,
medianPrice: medianResult.median,
medianPriceSources: medianResult.includedSources,
excludedSources: medianResult.excludedSources,
medianFallbackEngaged: medianResult.fallbackEngaged,
lastUpdated: new Date().toISOString(),
}

Expand Down
15 changes: 15 additions & 0 deletions src/api/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,21 @@ export const priceResponseSchema = {
stale: { type: 'boolean' },
bestRoute: { type: 'string', enum: ['SDEX', 'AMM', 'SPLIT', 'UNKNOWN'] },
lastUpdated: { type: 'string' },
medianPrice: { type: ['number', 'null'] },
medianPriceSources: { type: 'array', items: { type: 'string' } },
excludedSources: {
type: 'array',
items: {
type: 'object',
required: ['id', 'reason'],
additionalProperties: false,
properties: {
id: { type: 'string' },
reason: { type: 'string', enum: ['stale', 'missing'] },
},
},
},
medianFallbackEngaged: { type: 'boolean' },
},
} as const

Expand Down
Loading