From 2c76bffbb8b0ade3595c2d3c152903393e29c703 Mon Sep 17 00:00:00 2001 From: William Morriss Date: Mon, 3 Nov 2025 18:03:28 -0600 Subject: [PATCH 1/5] wip select by ping --- packages/synapse-sdk/src/storage/context.ts | 131 +++++++------------- 1 file changed, 45 insertions(+), 86 deletions(-) diff --git a/packages/synapse-sdk/src/storage/context.ts b/packages/synapse-sdk/src/storage/context.ts index 57224a087..4a8da552c 100644 --- a/packages/synapse-sdk/src/storage/context.ts +++ b/packages/synapse-sdk/src/storage/context.ts @@ -23,7 +23,7 @@ */ import * as SP from '@filoz/synapse-core/sp' -import { randIndex, randU256 } from '@filoz/synapse-core/utils' +import { randU256 } from '@filoz/synapse-core/utils' import type { ethers } from 'ethers' import type { Hex } from 'viem' import type { PaymentsService } from '../payments/index.ts' @@ -615,69 +615,54 @@ export class StorageContext { ) if (managedDataSets.length > 0 && !forceCreateDataSet) { - // Prefer data sets with pieces, sort by ID (older first) - const sorted = managedDataSets.sort((a, b) => { - if (a.currentPieceCount > 0 && b.currentPieceCount === 0) return -1 - if (b.currentPieceCount > 0 && a.currentPieceCount === 0) return 1 - return a.pdpVerifierDataSetId - b.pdpVerifierDataSetId - }) + // Prefer data sets with pieces + const [hasNoPieces, hasPieces] = managedDataSets + .reduce( + (results, managedDataSet) => { + results[managedDataSet.currentPieceCount > 0 ? 1 : 0].add(managedDataSet) + return results + }, + [new Set(), new Set()] + ) + .map((deduped) => [...deduped]) + + for (const managedDataSets of [hasPieces, hasNoPieces]) { + const providers = ( + await Promise.all(managedDataSets.map((dataSet) => spRegistry.getProvider(dataSet.providerId))) + ).filter((provider) => { + provider != null && + (!withIpni || provider.products.PDP?.data.ipniIpfs) && + (dev || provider.products.PDP?.capabilities?.dev == null) + }) - // Create async generator that yields providers lazily - async function* generateProviders(): AsyncGenerator { - // First, yield providers from existing data sets (in sorted order) - for (const dataSet of sorted) { - if (skipProviderIds.has(dataSet.providerId)) { - continue - } - skipProviderIds.add(dataSet.providerId) - const provider = await spRegistry.getProvider(dataSet.providerId) + try { + const selectedProvider = await StorageContext.selectProviderWithPing(providers) + + // Find the first matching data set ID for this provider + // Match by provider ID (stable identifier in the registry) + const matchingDataSet = managedDataSets.find((ps) => ps.providerId === selectedProvider.id) - if (provider == null) { + if (matchingDataSet == null) { console.warn( - `Provider ID ${dataSet.providerId} for data set ${dataSet.pdpVerifierDataSetId} is not currently approved` + `Could not match selected provider ${selectedProvider.serviceProvider} (ID: ${selectedProvider.id}) ` + + `to existing data sets. Falling back to selecting from all providers.` ) - continue - } - - if (withIpni && provider.products.PDP?.data.ipniIpfs === false) { - continue - } - - if (!dev && provider.products.PDP?.capabilities?.dev != null) { - continue + // Fall through to select from all approved providers below + } else { + // Fetch metadata for existing data set + const dataSetMetadata = await warmStorageService.getDataSetMetadata(matchingDataSet.pdpVerifierDataSetId) + + return { + provider: selectedProvider, + dataSetId: matchingDataSet.pdpVerifierDataSetId, + isExisting: true, + dataSetMetadata, + } } - - yield provider - } - } - - try { - const selectedProvider = await StorageContext.selectProviderWithPing(generateProviders()) - - // Find the first matching data set ID for this provider - // Match by provider ID (stable identifier in the registry) - const matchingDataSet = sorted.find((ps) => ps.providerId === selectedProvider.id) - - if (matchingDataSet == null) { - console.warn( - `Could not match selected provider ${selectedProvider.serviceProvider} (ID: ${selectedProvider.id}) ` + - `to existing data sets. Falling back to selecting from all providers.` - ) + } catch (_error) { + console.warn('All providers from existing data sets failed health check. Falling back to all providers.') // Fall through to select from all approved providers below - } else { - // Fetch metadata for existing data set - const dataSetMetadata = await warmStorageService.getDataSetMetadata(matchingDataSet.pdpVerifierDataSetId) - - return { - provider: selectedProvider, - dataSetId: matchingDataSet.pdpVerifierDataSetId, - isExisting: true, - dataSetMetadata, - } } - } catch (_error) { - console.warn('All providers from existing data sets failed health check. Falling back to all providers.') - // Fall through to select from all approved providers below } } @@ -697,7 +682,7 @@ export class StorageContext { } // Random selection from all providers - const provider = await StorageContext.selectRandomProvider(allProviders) + const provider = await StorageContext.selectProviderWithPing(allProviders) return { provider, @@ -707,32 +692,6 @@ export class StorageContext { } } - /** - * Select a random provider from a list with ping validation - * @param providers - Array of providers to select from - * @param withIpni - Filter for IPNI support - * @param dev - Include dev providers - * @returns Selected provider - */ - private static async selectRandomProvider(providers: ProviderInfo[]): Promise { - if (providers.length === 0) { - throw createError('StorageContext', 'selectRandomProvider', 'No providers available') - } - - // Create async generator that yields providers in random order - async function* generateRandomProviders(): AsyncGenerator { - const remaining = [...providers] - - while (remaining.length > 0) { - // Remove and yield the selected provider - const selected = remaining.splice(randIndex(remaining.length), 1)[0] - yield selected - } - } - - return await StorageContext.selectProviderWithPing(generateRandomProviders()) - } - /** * Select a provider from an async iterator with ping validation. * This is shared logic used by both smart selection and random selection. @@ -740,11 +699,11 @@ export class StorageContext { * @returns The first provider that responds * @throws If all providers fail */ - private static async selectProviderWithPing(providers: AsyncIterable): Promise { + private static async selectProviderWithPing(providers: ProviderInfo[]): Promise { let providerCount = 0 // Try providers in order until we find one that responds to ping - for await (const provider of providers) { + for (const provider of providers) { providerCount++ try { // Create a temporary PDPServer for this specific provider's endpoint From cc94ac11a208b5a8f8787a60f54a53995e8b09fc Mon Sep 17 00:00:00 2001 From: William Morriss Date: Mon, 3 Nov 2025 19:57:18 -0600 Subject: [PATCH 2/5] fix tsc --- packages/synapse-sdk/src/storage/context.ts | 27 +++++++++++-------- packages/synapse-sdk/src/test/storage.test.ts | 8 +++--- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/packages/synapse-sdk/src/storage/context.ts b/packages/synapse-sdk/src/storage/context.ts index 4a8da552c..846490fa6 100644 --- a/packages/synapse-sdk/src/storage/context.ts +++ b/packages/synapse-sdk/src/storage/context.ts @@ -605,7 +605,7 @@ export class StorageContext { const skipProviderIds = new Set(excludeProviderIds) // Filter for managed data sets with matching metadata - const managedDataSets = dataSets.filter( + const managedDataSets: EnhancedDataSetInfo[] = dataSets.filter( (ps) => ps.isLive && ps.isManaged && @@ -617,8 +617,8 @@ export class StorageContext { if (managedDataSets.length > 0 && !forceCreateDataSet) { // Prefer data sets with pieces const [hasNoPieces, hasPieces] = managedDataSets - .reduce( - (results, managedDataSet) => { + .reduce<[Set, Set]>( + (results: [Set, Set], managedDataSet: EnhancedDataSetInfo) => { results[managedDataSet.currentPieceCount > 0 ? 1 : 0].add(managedDataSet) return results }, @@ -627,20 +627,25 @@ export class StorageContext { .map((deduped) => [...deduped]) for (const managedDataSets of [hasPieces, hasNoPieces]) { - const providers = ( - await Promise.all(managedDataSets.map((dataSet) => spRegistry.getProvider(dataSet.providerId))) - ).filter((provider) => { - provider != null && - (!withIpni || provider.products.PDP?.data.ipniIpfs) && - (dev || provider.products.PDP?.capabilities?.dev == null) - }) + const providers: ProviderInfo[] = ( + await Promise.all( + managedDataSets.map((dataSet: EnhancedDataSetInfo) => spRegistry.getProvider(dataSet.providerId)) + ) + ).filter( + (provider: ProviderInfo | null): provider is ProviderInfo => + provider !== null && + (!withIpni || provider.products.PDP?.data.ipniIpfs === true) && + (dev || provider.products.PDP?.capabilities?.dev === null) + ) try { const selectedProvider = await StorageContext.selectProviderWithPing(providers) // Find the first matching data set ID for this provider // Match by provider ID (stable identifier in the registry) - const matchingDataSet = managedDataSets.find((ps) => ps.providerId === selectedProvider.id) + const matchingDataSet = managedDataSets.find( + (ps: EnhancedDataSetInfo) => ps.providerId === selectedProvider.id + ) if (matchingDataSet == null) { console.warn( diff --git a/packages/synapse-sdk/src/test/storage.test.ts b/packages/synapse-sdk/src/test/storage.test.ts index f4d6e06e0..86dd939c7 100644 --- a/packages/synapse-sdk/src/test/storage.test.ts +++ b/packages/synapse-sdk/src/test/storage.test.ts @@ -2392,7 +2392,7 @@ describe('StorageService', () => { }) describe('Provider Ping Validation', () => { - describe('selectRandomProvider with ping validation', () => { + describe('selectProviderWithPing', () => { it('should select first provider that responds to ping', async () => { const testProviders: ProviderInfo[] = [ createSimpleProvider({ @@ -2428,7 +2428,7 @@ describe('StorageService', () => { } try { - const result = await (StorageContext as any).selectRandomProvider(testProviders) + const result = await (StorageContext as any).selectProviderWithPing(testProviders) // Should have selected the second provider (first one failed ping) assert.equal(result.serviceProvider, testProviders[1].serviceProvider) @@ -2438,8 +2438,6 @@ describe('StorageService', () => { } }) - // Test removed: selectRandomProvider no longer supports exclusion functionality - it('should throw error when all providers fail ping', async () => { const testProviders: ProviderInfo[] = [ createSimpleProvider({ @@ -2463,7 +2461,7 @@ describe('StorageService', () => { } try { - await (StorageContext as any).selectRandomProvider(testProviders) + await (StorageContext as any).selectProviderWithPing(testProviders) assert.fail('Should have thrown error') } catch (error: any) { assert.include(error.message, 'StorageContext selectProviderWithPing failed') From 3558b3c9125ba2d255d672f11b20a82b12fb5f6b Mon Sep 17 00:00:00 2001 From: William Morriss Date: Mon, 3 Nov 2025 20:01:08 -0600 Subject: [PATCH 3/5] fix test --- packages/synapse-sdk/src/storage/context.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/synapse-sdk/src/storage/context.ts b/packages/synapse-sdk/src/storage/context.ts index 846490fa6..528224488 100644 --- a/packages/synapse-sdk/src/storage/context.ts +++ b/packages/synapse-sdk/src/storage/context.ts @@ -634,8 +634,8 @@ export class StorageContext { ).filter( (provider: ProviderInfo | null): provider is ProviderInfo => provider !== null && - (!withIpni || provider.products.PDP?.data.ipniIpfs === true) && - (dev || provider.products.PDP?.capabilities?.dev === null) + (!withIpni || provider.products.PDP?.data.ipniIpfs !== false) && + (dev || provider.products.PDP?.capabilities?.dev == null) ) try { From fe919a7873836ea5a0899afaa65ee178867ba18d Mon Sep 17 00:00:00 2001 From: William Morriss Date: Tue, 4 Nov 2025 00:16:49 -0600 Subject: [PATCH 4/5] Promise.race the pings --- packages/synapse-sdk/src/storage/context.ts | 119 +++++++++--------- packages/synapse-sdk/src/test/storage.test.ts | 7 +- 2 files changed, 62 insertions(+), 64 deletions(-) diff --git a/packages/synapse-sdk/src/storage/context.ts b/packages/synapse-sdk/src/storage/context.ts index 528224488..847899955 100644 --- a/packages/synapse-sdk/src/storage/context.ts +++ b/packages/synapse-sdk/src/storage/context.ts @@ -31,7 +31,7 @@ import { PDPAuthHelper, PDPServer } from '../pdp/index.ts' import { PDPVerifier } from '../pdp/verifier.ts' import { asPieceCID } from '../piece/index.ts' import { SPRegistryService } from '../sp-registry/index.ts' -import type { ProviderInfo } from '../sp-registry/types.ts' +import type { ProviderInfo, ServiceProduct } from '../sp-registry/types.ts' import type { Synapse } from '../synapse.ts' import type { CreateContextsOptions, @@ -638,37 +638,35 @@ export class StorageContext { (dev || provider.products.PDP?.capabilities?.dev == null) ) - try { - const selectedProvider = await StorageContext.selectProviderWithPing(providers) + const selectedProvider = await StorageContext.selectProviderWithPing(providers) - // Find the first matching data set ID for this provider - // Match by provider ID (stable identifier in the registry) - const matchingDataSet = managedDataSets.find( - (ps: EnhancedDataSetInfo) => ps.providerId === selectedProvider.id - ) + if (selectedProvider == null) { + continue + } - if (matchingDataSet == null) { - console.warn( - `Could not match selected provider ${selectedProvider.serviceProvider} (ID: ${selectedProvider.id}) ` + - `to existing data sets. Falling back to selecting from all providers.` - ) - // Fall through to select from all approved providers below - } else { - // Fetch metadata for existing data set - const dataSetMetadata = await warmStorageService.getDataSetMetadata(matchingDataSet.pdpVerifierDataSetId) - - return { - provider: selectedProvider, - dataSetId: matchingDataSet.pdpVerifierDataSetId, - isExisting: true, - dataSetMetadata, - } - } - } catch (_error) { - console.warn('All providers from existing data sets failed health check. Falling back to all providers.') + // Find the first matching data set ID for this provider + // Match by provider ID (stable identifier in the registry) + const matchingDataSet = managedDataSets.find((ps: EnhancedDataSetInfo) => ps.providerId === selectedProvider.id) + + if (matchingDataSet == null) { + console.warn( + `Could not match selected provider ${selectedProvider.serviceProvider} (ID: ${selectedProvider.id}) ` + + `to existing data sets. Falling back to selecting from all providers.` + ) // Fall through to select from all approved providers below + } else { + // Fetch metadata for existing data set + const dataSetMetadata = await warmStorageService.getDataSetMetadata(matchingDataSet.pdpVerifierDataSetId) + + return { + provider: selectedProvider, + dataSetId: matchingDataSet.pdpVerifierDataSetId, + isExisting: true, + dataSetMetadata, + } } } + console.warn('All providers from existing data sets failed health check. Falling back to all providers.') } // No existing data sets - select from all approved providers. First we get approved IDs from @@ -686,9 +684,17 @@ export class StorageContext { throw createError('StorageContext', 'smartSelectProvider', NO_REMAINING_PROVIDERS_ERROR_MESSAGE) } - // Random selection from all providers + // Select from all providers const provider = await StorageContext.selectProviderWithPing(allProviders) + if (provider == null) { + throw createError( + 'StorageContext', + 'selectProviderWithPing', + `All ${allProviders.length} providers failed health check. Storage may be temporarily unavailable.` + ) + } + return { provider, dataSetId: -1, // Marker for new data set @@ -698,46 +704,41 @@ export class StorageContext { } /** - * Select a provider from an async iterator with ping validation. - * This is shared logic used by both smart selection and random selection. - * @param providers - Async iterable of providers to try - * @returns The first provider that responds - * @throws If all providers fail + * Select a provider with ping validation. + * @param providers - providers to try + * @returns The first provider that responds, or null if none do */ - private static async selectProviderWithPing(providers: ProviderInfo[]): Promise { - let providerCount = 0 + private static async selectProviderWithPing(providers: ProviderInfo[]): Promise { + type ProviderWithPDP = ProviderInfo & { + products: { + PDP: ServiceProduct + } + } - // Try providers in order until we find one that responds to ping - for (const provider of providers) { - providerCount++ + function hasPDP(provider: ProviderInfo): provider is ProviderWithPDP { + return provider.products.PDP != null + } + // Ping all providers + const pings = providers.filter(hasPDP).map((provider, index) => + new PDPServer(null, provider.products.PDP.data.serviceURL).ping().then( + () => Promise.resolve(provider), + (error) => Promise.reject({ error, index, provider }) + ) + ) + let remaining = pings.length + while (remaining-- > 0) { try { - // Create a temporary PDPServer for this specific provider's endpoint - if (!provider.products.PDP?.data.serviceURL) { - // Skip providers without PDP products - continue - } - const providerPdpServer = new PDPServer(null, provider.products.PDP.data.serviceURL) - await providerPdpServer.ping() - return provider - } catch (error) { + return await Promise.race(pings) + } catch (err: any) { + const { error, index, provider } = err console.warn( `Provider ${provider.serviceProvider} failed ping test:`, error instanceof Error ? error.message : String(error) ) - // Continue to next provider + pings[index] = new Promise(() => undefined) } } - - // All providers failed ping test - if (providerCount === 0) { - throw createError('StorageContext', 'selectProviderWithPing', 'No providers available to select from') - } - - throw createError( - 'StorageContext', - 'selectProviderWithPing', - `All ${providerCount} providers failed health check. Storage may be temporarily unavailable.` - ) + return null } /** diff --git a/packages/synapse-sdk/src/test/storage.test.ts b/packages/synapse-sdk/src/test/storage.test.ts index 86dd939c7..3072e9fc5 100644 --- a/packages/synapse-sdk/src/test/storage.test.ts +++ b/packages/synapse-sdk/src/test/storage.test.ts @@ -2461,11 +2461,8 @@ describe('StorageService', () => { } try { - await (StorageContext as any).selectProviderWithPing(testProviders) - assert.fail('Should have thrown error') - } catch (error: any) { - assert.include(error.message, 'StorageContext selectProviderWithPing failed') - assert.include(error.message, 'All 2 providers failed health check') + const provider = await (StorageContext as any).selectProviderWithPing(testProviders) + assert.isNull(provider) } finally { global.fetch = originalFetch } From 040b16f57d4ce85fb49485bf55e72781db9616e3 Mon Sep 17 00:00:00 2001 From: William Morriss Date: Tue, 4 Nov 2025 00:40:16 -0600 Subject: [PATCH 5/5] rm unused randIndex, but keep fallbackRandIndex, still used by fallbackRandU256 --- packages/synapse-core/src/utils/rand.ts | 15 ---- packages/synapse-sdk/src/test/rand.test.ts | 85 +++++++++++----------- 2 files changed, 41 insertions(+), 59 deletions(-) diff --git a/packages/synapse-core/src/utils/rand.ts b/packages/synapse-core/src/utils/rand.ts index e682615ac..fdd9f39aa 100644 --- a/packages/synapse-core/src/utils/rand.ts +++ b/packages/synapse-core/src/utils/rand.ts @@ -30,18 +30,3 @@ export function randU256(): bigint { export function fallbackRandIndex(length: number): number { return Math.floor(Math.random() * length) } - -/** - * Provides a random index into an array of supplied length (0 <= index < length) - * @param length - exclusive upper boundary - * @returns a valid index - */ -export function randIndex(length: number): number { - if (crypto?.getRandomValues != null) { - const randomBytes = new Uint32Array(1) - crypto.getRandomValues(randomBytes) - return randomBytes[0] % length - } else { - return fallbackRandIndex(length) - } -} diff --git a/packages/synapse-sdk/src/test/rand.test.ts b/packages/synapse-sdk/src/test/rand.test.ts index 09d4f1683..88bc81c72 100644 --- a/packages/synapse-sdk/src/test/rand.test.ts +++ b/packages/synapse-sdk/src/test/rand.test.ts @@ -1,52 +1,49 @@ /* globals describe it */ -import { fallbackRandIndex, fallbackRandU256, randIndex, randU256 } from '@filoz/synapse-core/utils' +import { fallbackRandIndex, fallbackRandU256, randU256 } from '@filoz/synapse-core/utils' import { assert } from 'chai' -const randIndexMethods = [randIndex, fallbackRandIndex] -randIndexMethods.forEach((randIndexMethod) => { - describe(randIndexMethod.name, () => { - it('should return 0 for length 1', () => { - for (let i = 0; i < 32; i++) { - assert.equal(0, randIndexMethod(1)) - } - }) - it('returns both 0 and 1 for length 2', () => { - const counts = [0, 0] - for (let i = 0; i < 32; i++) { - counts[randIndexMethod(counts.length)]++ - } - // this test can fail probabilistically but the probability is low - // each bit should be independent with 50% likelihood - // the probability of getting the same index N times is 2**(1-N) - // so if this test fails, the 50% assumption is likely wrong - assert.isAtLeast(counts[0], 1) - assert.isAtLeast(counts[1], 1) - }) - it('has at least 10 random bits', () => { - const counts = [] - for (let i = 0; i < 10; i++) { - counts.push([0, 0]) - } - for (let i = 0; i < 32; i++) { - let index = randIndexMethod(1024) - assert.isAtLeast(index, 0) - assert.isAtMost(index, 1023) - for (let j = 0; j < 10; j++) { - counts[j][index & 1]++ - index >>= 1 - } - assert.equal(index, 0) - } - // this test can fail probabilistically but the probability is low - // each bit should be independent with 50% likelihood - // the probability of getting the same bitvalue N times is 2**(1-N) - // so if this test fails, the 50% assumption is likely wrong - for (let i = 0; i < 10; i++) { - assert.isAtLeast(counts[i][0], 1) - assert.isAtLeast(counts[i][1], 1) +describe('fallbackRandIndex', () => { + it('should return 0 for length 1', () => { + for (let i = 0; i < 32; i++) { + assert.equal(0, fallbackRandIndex(1)) + } + }) + it('returns both 0 and 1 for length 2', () => { + const counts = [0, 0] + for (let i = 0; i < 32; i++) { + counts[fallbackRandIndex(counts.length)]++ + } + // this test can fail probabilistically but the probability is low + // each bit should be independent with 50% likelihood + // the probability of getting the same index N times is 2**(1-N) + // so if this test fails, the 50% assumption is likely wrong + assert.isAtLeast(counts[0], 1) + assert.isAtLeast(counts[1], 1) + }) + it('has at least 10 random bits', () => { + const counts = [] + for (let i = 0; i < 10; i++) { + counts.push([0, 0]) + } + for (let i = 0; i < 32; i++) { + let index = fallbackRandIndex(1024) + assert.isAtLeast(index, 0) + assert.isAtMost(index, 1023) + for (let j = 0; j < 10; j++) { + counts[j][index & 1]++ + index >>= 1 } - }) + assert.equal(index, 0) + } + // this test can fail probabilistically but the probability is low + // each bit should be independent with 50% likelihood + // the probability of getting the same bitvalue N times is 2**(1-N) + // so if this test fails, the 50% assumption is likely wrong + for (let i = 0; i < 10; i++) { + assert.isAtLeast(counts[i][0], 1) + assert.isAtLeast(counts[i][1], 1) + } }) })