From f7db3485f6022cc570c507d78c07178b69bbf341 Mon Sep 17 00:00:00 2001 From: Ameya Naik Date: Fri, 6 Mar 2026 20:40:05 -0800 Subject: [PATCH 01/11] fix: fall back to individual deletes when backend returns NotImplemented for DeleteObjects Some S3-compatible backends (e.g. GCS in S3 interoperability mode) do not support the DeleteObjects batch API and return a NotImplemented error. Catch this and fall back to individual DeleteObjectCommand calls via Promise.allSettled, so a single failure does not abort remaining deletes mid-flight. --- src/storage/backend/s3/adapter.ts | 10 ++++++ src/test/s3-adapter.test.ts | 52 +++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/src/storage/backend/s3/adapter.ts b/src/storage/backend/s3/adapter.ts index 5c8fc4736..ab0d051b1 100644 --- a/src/storage/backend/s3/adapter.ts +++ b/src/storage/backend/s3/adapter.ts @@ -343,6 +343,16 @@ export class S3Backend implements StorageBackendAdapter { }) await this.client.send(command) } catch (e) { + // Some S3-compatible backends (e.g. GCS) do not support DeleteObjects; fall back to individual deletes + const code = (e as { Code?: string; name?: string })?.Code ?? (e as { name?: string })?.name + if (code === 'NotImplemented') { + await Promise.allSettled( + prefixes.map((key) => + this.client.send(new DeleteObjectCommand({ Bucket: bucket, Key: key })) + ) + ) + return + } throw StorageBackendError.fromError(e) } } diff --git a/src/test/s3-adapter.test.ts b/src/test/s3-adapter.test.ts index 0d5288e5f..27bb565db 100644 --- a/src/test/s3-adapter.test.ts +++ b/src/test/s3-adapter.test.ts @@ -74,4 +74,56 @@ describe('S3Backend', () => { expect(result.metadata.mimetype).toBe('image/png') }) }) + + describe('deleteObjects', () => { + test('should use batch DeleteObjectsCommand when backend supports it', async () => { + mockSend.mockResolvedValue({ + Deleted: [{ Key: 'file1.txt' }, { Key: 'file2.txt' }], + $metadata: { httpStatusCode: 200 }, + }) + + const backend = new S3Backend({ region: 'us-east-1', endpoint: 'http://localhost:9000' }) + await backend.deleteObjects('test-bucket', ['file1.txt', 'file2.txt']) + + expect(mockSend).toHaveBeenCalledTimes(1) + expect(mockSend.mock.calls[0][0].constructor.name).toBe('DeleteObjectsCommand') + }) + + test('should fall back to individual DeleteObjectCommands when backend returns NotImplemented', async () => { + const err = Object.assign(new Error('NotImplemented'), { Code: 'NotImplemented' }) + mockSend + .mockRejectedValueOnce(err) + .mockResolvedValue({ $metadata: { httpStatusCode: 204 } }) + + const backend = new S3Backend({ region: 'us-east-1', endpoint: 'http://localhost:9000' }) + await backend.deleteObjects('test-bucket', ['file1.txt', 'file2.txt']) + + expect(mockSend).toHaveBeenCalledTimes(3) + expect(mockSend.mock.calls[0][0].constructor.name).toBe('DeleteObjectsCommand') + expect(mockSend.mock.calls[1][0].constructor.name).toBe('DeleteObjectCommand') + expect(mockSend.mock.calls[2][0].constructor.name).toBe('DeleteObjectCommand') + }) + + test('should not throw if some individual fallback deletes fail', async () => { + const notImplemented = Object.assign(new Error('NotImplemented'), { Code: 'NotImplemented' }) + mockSend + .mockRejectedValueOnce(notImplemented) + .mockResolvedValueOnce({ $metadata: { httpStatusCode: 204 } }) + .mockRejectedValueOnce(new Error('AccessDenied')) + + const backend = new S3Backend({ region: 'us-east-1', endpoint: 'http://localhost:9000' }) + await expect( + backend.deleteObjects('test-bucket', ['file1.txt', 'file2.txt']) + ).resolves.toBeUndefined() + }) + + test('should rethrow errors that are not NotImplemented', async () => { + const err = Object.assign(new Error('AccessDenied'), { Code: 'AccessDenied' }) + mockSend.mockRejectedValue(err) + + const backend = new S3Backend({ region: 'us-east-1', endpoint: 'http://localhost:9000' }) + await expect(backend.deleteObjects('test-bucket', ['file1.txt'])).rejects.toThrow() + expect(mockSend).toHaveBeenCalledTimes(1) + }) + }) }) From cf308f16846ebf17f8dd8fc04068c4fa950679a1 Mon Sep 17 00:00:00 2001 From: Ameya Naik Date: Fri, 6 Mar 2026 20:49:19 -0800 Subject: [PATCH 02/11] fix: inspect allSettled results in NotImplemented fallback Follow the same pattern as the file.ts backend: iterate allSettled results and throw on any individual delete failure except NoSuchKey (object already gone), preventing orphaned S3 objects when a delete fails mid-batch. Also updates tests to cover NoSuchKey-ignore and real-error-throw behaviour. --- src/storage/backend/s3/adapter.ts | 12 +++++++++++- src/test/s3-adapter.test.ts | 25 ++++++++++++++++++++----- 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/src/storage/backend/s3/adapter.ts b/src/storage/backend/s3/adapter.ts index ab0d051b1..ca1fd8112 100644 --- a/src/storage/backend/s3/adapter.ts +++ b/src/storage/backend/s3/adapter.ts @@ -346,11 +346,21 @@ export class S3Backend implements StorageBackendAdapter { // Some S3-compatible backends (e.g. GCS) do not support DeleteObjects; fall back to individual deletes const code = (e as { Code?: string; name?: string })?.Code ?? (e as { name?: string })?.name if (code === 'NotImplemented') { - await Promise.allSettled( + const results = await Promise.allSettled( prefixes.map((key) => this.client.send(new DeleteObjectCommand({ Bucket: bucket, Key: key })) ) ) + for (const result of results) { + if (result.status === 'rejected') { + const errCode = + (result.reason as { Code?: string })?.Code ?? + (result.reason as { name?: string })?.name + if (errCode !== 'NoSuchKey') { + throw StorageBackendError.fromError(result.reason) + } + } + } return } throw StorageBackendError.fromError(e) diff --git a/src/test/s3-adapter.test.ts b/src/test/s3-adapter.test.ts index 27bb565db..036400004 100644 --- a/src/test/s3-adapter.test.ts +++ b/src/test/s3-adapter.test.ts @@ -20,9 +20,9 @@ describe('S3Backend', () => { beforeEach(() => { jest.clearAllMocks() mockSend = jest.fn() - ;(S3Client as jest.Mock).mockImplementation(() => ({ - send: mockSend, - })) + ; (S3Client as jest.Mock).mockImplementation(() => ({ + send: mockSend, + })) }) describe('getObject', () => { @@ -104,12 +104,13 @@ describe('S3Backend', () => { expect(mockSend.mock.calls[2][0].constructor.name).toBe('DeleteObjectCommand') }) - test('should not throw if some individual fallback deletes fail', async () => { + test('should ignore NoSuchKey errors in the individual fallback', async () => { const notImplemented = Object.assign(new Error('NotImplemented'), { Code: 'NotImplemented' }) + const noSuchKey = Object.assign(new Error('NoSuchKey'), { Code: 'NoSuchKey' }) mockSend .mockRejectedValueOnce(notImplemented) .mockResolvedValueOnce({ $metadata: { httpStatusCode: 204 } }) - .mockRejectedValueOnce(new Error('AccessDenied')) + .mockRejectedValueOnce(noSuchKey) const backend = new S3Backend({ region: 'us-east-1', endpoint: 'http://localhost:9000' }) await expect( @@ -117,6 +118,20 @@ describe('S3Backend', () => { ).resolves.toBeUndefined() }) + test('should throw when an individual fallback delete fails with a real error', async () => { + const notImplemented = Object.assign(new Error('NotImplemented'), { Code: 'NotImplemented' }) + const accessDenied = Object.assign(new Error('AccessDenied'), { Code: 'AccessDenied' }) + mockSend + .mockRejectedValueOnce(notImplemented) + .mockResolvedValueOnce({ $metadata: { httpStatusCode: 204 } }) + .mockRejectedValueOnce(accessDenied) + + const backend = new S3Backend({ region: 'us-east-1', endpoint: 'http://localhost:9000' }) + await expect( + backend.deleteObjects('test-bucket', ['file1.txt', 'file2.txt']) + ).rejects.toThrow() + }) + test('should rethrow errors that are not NotImplemented', async () => { const err = Object.assign(new Error('AccessDenied'), { Code: 'AccessDenied' }) mockSend.mockRejectedValue(err) From 6d4e96c3b9a90cbc20c2f6a1c32525617836d27e Mon Sep 17 00:00:00 2001 From: Ameya Naik Date: Thu, 19 Mar 2026 12:42:14 -0700 Subject: [PATCH 03/11] feat: add STORAGE_S3_BATCH_DELETE_ENABLED config flag and refactor individual-delete helpers - Add storageS3BatchDeleteEnabled config flag (default: true) Set STORAGE_S3_BATCH_DELETE_ENABLED=false for S3-compatible backends (e.g. GCS) that don't support the bulk DeleteObjects API. - S3Backend.deleteObjects: when flag is false, skip DeleteObjectsCommand and go straight to individual deletes. Extract private deleteObjectsIndividually() helper shared by the config-off path and the existing NotImplemented runtime fallback. - S3Locker: add batchDeleteEnabled option (default: true) wired from config in the TUS route. Extract deleteLocksIndividually() helper used by cleanupZombieLocks for both the config-off path and the NotImplemented fallback added in the previous commit. - Tests: 2 new unit tests in s3-adapter.test.ts and 1 new integration test in s3-locker.test.ts covering the batchDeleteEnabled=false paths. --- src/config.ts | 11 +- src/http/routes/tus/index.ts | 34 +++--- src/storage/backend/s3/adapter.ts | 74 +++++++----- src/storage/protocols/tus/s3-locker.ts | 49 +++++++- src/test/s3-adapter.test.ts | 39 ++++++ src/test/s3-locker.test.ts | 159 +++++++++++++++++++++++++ 6 files changed, 314 insertions(+), 52 deletions(-) diff --git a/src/config.ts b/src/config.ts index 7b573c1c7..34e14710f 100644 --- a/src/config.ts +++ b/src/config.ts @@ -65,6 +65,7 @@ type StorageConfigType = { storageFileEtagAlgorithm: 'mtime' | 'md5' storageS3InternalTracesEnabled?: boolean storageS3MaxSockets: number + storageS3BatchDeleteEnabled: boolean storageS3DisableChecksum: boolean storageS3UploadQueueSize: number storageS3Bucket: string @@ -265,8 +266,8 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType { tenantId: isMultitenant ? '' : getOptionalConfigFromEnv('PROJECT_REF') || - getOptionalConfigFromEnv('TENANT_ID') || - 'storage-single-tenant', + getOptionalConfigFromEnv('TENANT_ID') || + 'storage-single-tenant', // Server region: getOptionalConfigFromEnv('SERVER_REGION', 'REGION') || 'not-specified', @@ -364,6 +365,8 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType { getOptionalConfigFromEnv('STORAGE_S3_MAX_SOCKETS', 'GLOBAL_S3_MAX_SOCKETS') || '200', 10 ), + storageS3BatchDeleteEnabled: + getOptionalConfigFromEnv('STORAGE_S3_BATCH_DELETE_ENABLED') !== 'false', storageS3DisableChecksum: getOptionalConfigFromEnv('STORAGE_S3_DISABLE_CHECKSUM') === 'true', storageS3UploadQueueSize: envNumber(getOptionalConfigFromEnv('STORAGE_S3_UPLOAD_QUEUE_SIZE')) ?? 2, @@ -539,12 +542,12 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType { size: { min: parseInt( getOptionalConfigFromEnv('IMAGE_TRANSFORMATION_LIMIT_MIN_SIZE', 'IMG_LIMITS_MIN_SIZE') || - '1', + '1', 10 ), max: parseInt( getOptionalConfigFromEnv('IMAGE_TRANSFORMATION_LIMIT_MAX_SIZE', 'IMG_LIMITS_MAX_SIZE') || - '2000', + '2000', 10 ), }, diff --git a/src/http/routes/tus/index.ts b/src/http/routes/tus/index.ts index b75f88cab..e3cedf6d1 100644 --- a/src/http/routes/tus/index.ts +++ b/src/http/routes/tus/index.ts @@ -36,6 +36,7 @@ const { storageS3ForcePathStyle, storageS3Region, storageS3ClientTimeout, + storageS3BatchDeleteEnabled, tusUrlExpiryMs, tusPath, tusPartSize, @@ -117,6 +118,7 @@ function createTusServer( maxRetries: 10, retryDelayMs: 250, renewalIntervalMs: 10 * 1000, // 10 seconds + batchDeleteEnabled: storageS3BatchDeleteEnabled, s3Client: new S3Client({ requestHandler: new NodeHttpHandler({ ...agent, @@ -256,14 +258,14 @@ const authenticatedRoutes = fastifyPlugin( }) fastify.addHook('preHandler', async (req) => { - ;(req.raw as MultiPartRequest).log = req.log - ;(req.raw as MultiPartRequest).upload = { - storage: req.storage, - owner: req.owner, - tenantId: req.tenantId, - db: req.db, - isUpsert: req.headers['x-upsert'] === 'true', - } + ; (req.raw as MultiPartRequest).log = req.log + ; (req.raw as MultiPartRequest).upload = { + storage: req.storage, + owner: req.owner, + tenantId: req.tenantId, + db: req.db, + isUpsert: req.headers['x-upsert'] === 'true', + } }) fastify.post( @@ -358,14 +360,14 @@ const publicRoutes = fastifyPlugin( ) fastify.addHook('preHandler', async (req) => { - ;(req.raw as MultiPartRequest).log = req.log - ;(req.raw as MultiPartRequest).upload = { - storage: req.storage, - owner: req.owner, - tenantId: req.tenantId, - db: req.db, - isUpsert: req.headers['x-upsert'] === 'true', - } + ; (req.raw as MultiPartRequest).log = req.log + ; (req.raw as MultiPartRequest).upload = { + storage: req.storage, + owner: req.owner, + tenantId: req.tenantId, + db: req.db, + isUpsert: req.headers['x-upsert'] === 'true', + } }) fastify.options( diff --git a/src/storage/backend/s3/adapter.ts b/src/storage/backend/s3/adapter.ts index ca1fd8112..9def4e56a 100644 --- a/src/storage/backend/s3/adapter.ts +++ b/src/storage/backend/s3/adapter.ts @@ -185,14 +185,14 @@ export class S3Backend implements StorageBackendAdapter { const metadata = hasUploadedBytes ? await this.headObject(bucketName, key, version) : { - httpStatusCode: 200, - eTag: data.ETag || '', - mimetype: contentType, - lastModified: new Date(), - size: 0, - contentLength: 0, - contentRange: undefined, - } + httpStatusCode: 200, + eTag: data.ETag || '', + mimetype: contentType, + lastModified: new Date(), + size: 0, + contentLength: 0, + contentRange: undefined, + } return { httpStatusCode: data.$metadata.httpStatusCode || metadata.httpStatusCode, @@ -330,6 +330,13 @@ export class S3Backend implements StorageBackendAdapter { * @param prefixes */ async deleteObjects(bucket: string, prefixes: string[]): Promise { + const { storageS3BatchDeleteEnabled } = getConfig() + + if (!storageS3BatchDeleteEnabled) { + // Batch delete explicitly disabled (e.g. GCS S3-interop mode) + return this.deleteObjectsIndividually(bucket, prefixes) + } + try { const s3Prefixes = prefixes.map((ele) => { return { Key: ele } @@ -346,27 +353,36 @@ export class S3Backend implements StorageBackendAdapter { // Some S3-compatible backends (e.g. GCS) do not support DeleteObjects; fall back to individual deletes const code = (e as { Code?: string; name?: string })?.Code ?? (e as { name?: string })?.name if (code === 'NotImplemented') { - const results = await Promise.allSettled( - prefixes.map((key) => - this.client.send(new DeleteObjectCommand({ Bucket: bucket, Key: key })) - ) - ) - for (const result of results) { - if (result.status === 'rejected') { - const errCode = - (result.reason as { Code?: string })?.Code ?? - (result.reason as { name?: string })?.name - if (errCode !== 'NoSuchKey') { - throw StorageBackendError.fromError(result.reason) - } - } - } - return + return this.deleteObjectsIndividually(bucket, prefixes) } throw StorageBackendError.fromError(e) } } + /** + * Deletes objects one-by-one in parallel. + * Used when batch delete is disabled or when the S3 backend returns NotImplemented. + * NoSuchKey errors are silently ignored (object already gone). + * @param bucket + * @param prefixes + */ + private async deleteObjectsIndividually(bucket: string, prefixes: string[]): Promise { + const results = await Promise.allSettled( + prefixes.map((key) => + this.client.send(new DeleteObjectCommand({ Bucket: bucket, Key: key })) + ) + ) + for (const result of results) { + if (result.status === 'rejected') { + const errCode = + (result.reason as { Code?: string })?.Code ?? (result.reason as { name?: string })?.name + if (errCode !== 'NoSuchKey') { + throw StorageBackendError.fromError(result.reason) + } + } + } + } + /** * Returns metadata information of a specific object * @param bucket @@ -458,9 +474,9 @@ export class S3Backend implements StorageBackendAdapter { ContentType: contentType, Metadata: metadata ? { - ...metadata, - Version: version || '', - } + ...metadata, + Version: version || '', + } : undefined, }) @@ -539,8 +555,8 @@ export class S3Backend implements StorageBackendAdapter { parts.length === 0 ? undefined : { - Parts: parts, - }, + Parts: parts, + }, }) const response = await this.client.send(completeUpload) diff --git a/src/storage/protocols/tus/s3-locker.ts b/src/storage/protocols/tus/s3-locker.ts index 7e75a4901..0184e1a24 100644 --- a/src/storage/protocols/tus/s3-locker.ts +++ b/src/storage/protocols/tus/s3-locker.ts @@ -19,6 +19,8 @@ export interface S3LockerOptions { renewalIntervalMs?: number maxRetries?: number retryDelayMs?: number + /** When false, skips DeleteObjectsCommand in zombie-lock cleanup and uses individual deletes. Default: true */ + batchDeleteEnabled?: boolean logger?: Pick } @@ -37,6 +39,7 @@ export class S3Locker implements Locker { private readonly renewalIntervalMs: number private readonly maxRetries: number private readonly retryDelayMs: number + private readonly batchDeleteEnabled: boolean private readonly logger: Pick private readonly notifier: LockNotifier @@ -49,6 +52,7 @@ export class S3Locker implements Locker { this.renewalIntervalMs = options.renewalIntervalMs || 10000 // 10 seconds this.maxRetries = options.maxRetries || 10 this.retryDelayMs = options.retryDelayMs || 500 + this.batchDeleteEnabled = options.batchDeleteEnabled !== false // default true this.logger = options.logger || console // Validate configuration @@ -244,6 +248,13 @@ export class S3Locker implements Locker { for (let i = 0; i < expiredLocks.length; i += 1000) { const batch = expiredLocks.slice(i, i + 1000) + if (!this.batchDeleteEnabled) { + // Batch delete explicitly disabled — use individual deletes directly + await this.deleteLocksIndividually(batch) + this.logger.log(`Cleaned up ${batch.length} expired locks (individual, batch disabled)`) + continue + } + try { await this.s3Client.send( new DeleteObjectsCommand({ @@ -255,8 +266,18 @@ export class S3Locker implements Locker { }) ) this.logger.log(`Cleaned up ${batch.length} expired locks in batch`) - } catch (error) { - this.logger.warn(`Failed to delete batch of expired locks:`, error) + } catch (error: any) { + // Some S3-compatible backends (e.g. GCS) do not support DeleteObjects; + // fall back to individual deletes so zombie-lock cleanup still works. + const code = error?.Code ?? error?.name + if (code === 'NotImplemented') { + await this.deleteLocksIndividually(batch) + this.logger.log( + `Cleaned up ${batch.length} expired locks in batch (individual fallback)` + ) + } else { + this.logger.warn(`Failed to delete batch of expired locks:`, error) + } } } } @@ -282,6 +303,28 @@ export class S3Locker implements Locker { } } + /** + * Deletes a batch of lock keys one-by-one in parallel. + * NoSuchKey is ignored (lock already gone). Other errors are logged as warnings. + */ + private async deleteLocksIndividually(keys: string[]): Promise { + const results = await Promise.allSettled( + keys.map((key) => + this.s3Client.send(new DeleteObjectCommand({ Bucket: this.bucket, Key: key })) + ) + ) + for (const result of results) { + if (result.status === 'rejected') { + const errCode = + (result.reason as { Code?: string })?.Code ?? + (result.reason as { name?: string })?.name + if (errCode !== 'NoSuchKey') { + this.logger.warn(`Failed to delete expired lock in fallback:`, result.reason) + } + } + } + } + private async checkAndCleanupExpiredLock(lockKey: string, signal: AbortSignal): Promise { if (signal.aborted) { return false @@ -350,7 +393,7 @@ export class S3Lock implements Lock { private readonly id: string, private readonly locker: S3Locker, private readonly notifier: LockNotifier - ) {} + ) { } async lock(stopSignal: AbortSignal, cancelReq: RequestRelease): Promise { // Set up abort handler to clean up in case of abort diff --git a/src/test/s3-adapter.test.ts b/src/test/s3-adapter.test.ts index 036400004..9e153697d 100644 --- a/src/test/s3-adapter.test.ts +++ b/src/test/s3-adapter.test.ts @@ -2,6 +2,7 @@ import { S3Client } from '@aws-sdk/client-s3' import { Readable } from 'stream' +import * as config from '../config' import { S3Backend } from '../storage/backend/s3/adapter' jest.mock('@aws-sdk/client-s3', () => { @@ -140,5 +141,43 @@ describe('S3Backend', () => { await expect(backend.deleteObjects('test-bucket', ['file1.txt'])).rejects.toThrow() expect(mockSend).toHaveBeenCalledTimes(1) }) + + test('should skip DeleteObjectsCommand and use individual deletes when batchDeleteEnabled is false', async () => { + const getConfigSpy = jest + .spyOn(config, 'getConfig') + .mockReturnValue({ storageS3BatchDeleteEnabled: false } as any) + mockSend.mockResolvedValue({ $metadata: { httpStatusCode: 204 } }) + + try { + const backend = new S3Backend({ region: 'us-east-1', endpoint: 'http://localhost:9000' }) + await backend.deleteObjects('test-bucket', ['file1.txt', 'file2.txt']) + + // No DeleteObjectsCommand call — straight to individual deletes + expect(mockSend).toHaveBeenCalledTimes(2) + expect(mockSend.mock.calls[0][0].constructor.name).toBe('DeleteObjectCommand') + expect(mockSend.mock.calls[1][0].constructor.name).toBe('DeleteObjectCommand') + } finally { + getConfigSpy.mockRestore() + } + }) + + test('should ignore NoSuchKey when batchDeleteEnabled is false', async () => { + const getConfigSpy = jest + .spyOn(config, 'getConfig') + .mockReturnValue({ storageS3BatchDeleteEnabled: false } as any) + const noSuchKey = Object.assign(new Error('NoSuchKey'), { Code: 'NoSuchKey' }) + mockSend + .mockResolvedValueOnce({ $metadata: { httpStatusCode: 204 } }) + .mockRejectedValueOnce(noSuchKey) + + try { + const backend = new S3Backend({ region: 'us-east-1', endpoint: 'http://localhost:9000' }) + await expect( + backend.deleteObjects('test-bucket', ['file1.txt', 'file2.txt']) + ).resolves.toBeUndefined() + } finally { + getConfigSpy.mockRestore() + } + }) }) }) diff --git a/src/test/s3-locker.test.ts b/src/test/s3-locker.test.ts index 8954d045e..cd8c1a445 100644 --- a/src/test/s3-locker.test.ts +++ b/src/test/s3-locker.test.ts @@ -782,4 +782,163 @@ describe('S3Locker', () => { await lock.unlock() }) }) + + describe('cleanupZombieLocks – NotImplemented fallback', () => { + // Helper: place a pre-expired lock object in S3 + async function putExpiredLock(key: string) { + await s3Client.send( + new PutObjectCommand({ + Bucket: testBucket, + Key: key, + Body: JSON.stringify({ + lockId: key, + expiresAt: Date.now() - 10000, + createdAt: Date.now() - 20000, + renewedAt: Date.now() - 20000, + }), + ContentType: 'application/json', + }) + ) + } + + test('falls back to individual deletes when DeleteObjectsCommand returns NotImplemented', async () => { + const lockKey = 'test-locks/not-impl-fallback-lock.lock' + await putExpiredLock(lockKey) + + const originalSend = s3Client.send.bind(s3Client) + const sendSpy = jest.spyOn(s3Client, 'send').mockImplementation(async (command: any) => { + // Only reject the batch delete; let everything else through + if (command.constructor.name === 'DeleteObjectsCommand') { + const err: any = new Error('NotImplemented') + err.name = 'NotImplemented' + throw err + } + return originalSend(command) + }) + + try { + // Should not throw – fallback handles the error + await expect(locker.cleanupZombieLocks()).resolves.not.toThrow() + } finally { + sendSpy.mockRestore() + // Cleanup in case individual delete didn't run + try { + await originalSend( + // @ts-ignore - manual cleanup + new (await import('@aws-sdk/client-s3')).DeleteObjectCommand({ + Bucket: testBucket, + Key: lockKey, + }) + ) + } catch { + // ignore + } + } + }) + + test('ignores NoSuchKey errors for individual deletes in NotImplemented fallback', async () => { + const lockKey = 'test-locks/not-impl-nosuchkey-lock.lock' + await putExpiredLock(lockKey) + + const originalSend = s3Client.send.bind(s3Client) + const sendSpy = jest.spyOn(s3Client, 'send').mockImplementation(async (command: any) => { + if (command.constructor.name === 'DeleteObjectsCommand') { + const err: any = new Error('NotImplemented') + err.name = 'NotImplemented' + throw err + } + if (command.constructor.name === 'DeleteObjectCommand') { + const err: any = new Error('NoSuchKey') + err.name = 'NoSuchKey' + throw err + } + return originalSend(command) + }) + + try { + // NoSuchKey on individual deletes must be swallowed; method must not throw + await expect(locker.cleanupZombieLocks()).resolves.not.toThrow() + } finally { + sendSpy.mockRestore() + } + }) + + test('warns (but does not throw) on real individual-delete errors in fallback', async () => { + const lockKey = 'test-locks/not-impl-real-err-lock.lock' + await putExpiredLock(lockKey) + + const mockWarn = jest.fn() + const warnLocker = new S3Locker({ + s3Client, + bucket: testBucket, + notifier: mockNotifier, + keyPrefix: 'test-locks/', + lockTtlMs: 5000, + renewalIntervalMs: 1000, + logger: { log: jest.fn(), warn: mockWarn, error: jest.fn() }, + }) + + const originalSend = s3Client.send.bind(s3Client) + const sendSpy = jest.spyOn(s3Client, 'send').mockImplementation(async (command: any) => { + if (command.constructor.name === 'DeleteObjectsCommand') { + const err: any = new Error('NotImplemented') + err.name = 'NotImplemented' + throw err + } + if (command.constructor.name === 'DeleteObjectCommand') { + const err: any = new Error('AccessDenied') + err.name = 'AccessDenied' + throw err + } + return originalSend(command) + }) + + try { + await expect(warnLocker.cleanupZombieLocks()).resolves.not.toThrow() + // The individual failure should be logged as a warning + expect(mockWarn).toHaveBeenCalledWith( + expect.stringContaining('Failed to delete expired lock in fallback:'), + expect.anything() + ) + } finally { + sendSpy.mockRestore() + } + }) + + test('skips DeleteObjectsCommand entirely when batchDeleteEnabled is false', async () => { + const lockKey = 'test-locks/batch-disabled-lock.lock' + await putExpiredLock(lockKey) + + const deletedKeys: string[] = [] + const originalSend = s3Client.send.bind(s3Client) + const sendSpy = jest.spyOn(s3Client, 'send').mockImplementation(async (command: any) => { + if (command.constructor.name === 'DeleteObjectsCommand') { + throw new Error('Should not have called DeleteObjectsCommand when batch is disabled') + } + if (command.constructor.name === 'DeleteObjectCommand') { + deletedKeys.push(command.input.Key) + } + return originalSend(command) + }) + + const batchDisabledLocker = new S3Locker({ + s3Client, + bucket: testBucket, + notifier: mockNotifier, + keyPrefix: 'test-locks/', + lockTtlMs: 5000, + renewalIntervalMs: 1000, + batchDeleteEnabled: false, + logger: { log: jest.fn(), warn: jest.fn(), error: jest.fn() }, + }) + + try { + await expect(batchDisabledLocker.cleanupZombieLocks()).resolves.not.toThrow() + // The expired lock should have been deleted via an individual DeleteObjectCommand + expect(deletedKeys).toContain(lockKey) + } finally { + sendSpy.mockRestore() + } + }) + }) }) From 80d7e22c6b1790923c4b3eb89eb21e718e85433a Mon Sep 17 00:00:00 2001 From: Ameya Naik Date: Thu, 19 Mar 2026 12:53:50 -0700 Subject: [PATCH 04/11] refactor: restore inline fallback logic, remove extracted private helpers Keep the existing NotImplemented fallback inline (status quo) when STORAGE_S3_BATCH_DELETE_ENABLED=true. The false path is a minimal early-return that skips the batch command entirely. --- src/storage/backend/s3/adapter.ts | 47 +++++++++++--------------- src/storage/protocols/tus/s3-locker.ts | 46 +++++++++++-------------- 2 files changed, 40 insertions(+), 53 deletions(-) diff --git a/src/storage/backend/s3/adapter.ts b/src/storage/backend/s3/adapter.ts index 9def4e56a..6faae34b9 100644 --- a/src/storage/backend/s3/adapter.ts +++ b/src/storage/backend/s3/adapter.ts @@ -333,8 +333,10 @@ export class S3Backend implements StorageBackendAdapter { const { storageS3BatchDeleteEnabled } = getConfig() if (!storageS3BatchDeleteEnabled) { - // Batch delete explicitly disabled (e.g. GCS S3-interop mode) - return this.deleteObjectsIndividually(bucket, prefixes) + await Promise.allSettled( + prefixes.map((key) => this.client.send(new DeleteObjectCommand({ Bucket: bucket, Key: key }))) + ) + return } try { @@ -353,33 +355,24 @@ export class S3Backend implements StorageBackendAdapter { // Some S3-compatible backends (e.g. GCS) do not support DeleteObjects; fall back to individual deletes const code = (e as { Code?: string; name?: string })?.Code ?? (e as { name?: string })?.name if (code === 'NotImplemented') { - return this.deleteObjectsIndividually(bucket, prefixes) - } - throw StorageBackendError.fromError(e) - } - } - - /** - * Deletes objects one-by-one in parallel. - * Used when batch delete is disabled or when the S3 backend returns NotImplemented. - * NoSuchKey errors are silently ignored (object already gone). - * @param bucket - * @param prefixes - */ - private async deleteObjectsIndividually(bucket: string, prefixes: string[]): Promise { - const results = await Promise.allSettled( - prefixes.map((key) => - this.client.send(new DeleteObjectCommand({ Bucket: bucket, Key: key })) - ) - ) - for (const result of results) { - if (result.status === 'rejected') { - const errCode = - (result.reason as { Code?: string })?.Code ?? (result.reason as { name?: string })?.name - if (errCode !== 'NoSuchKey') { - throw StorageBackendError.fromError(result.reason) + const results = await Promise.allSettled( + prefixes.map((key) => + this.client.send(new DeleteObjectCommand({ Bucket: bucket, Key: key })) + ) + ) + for (const result of results) { + if (result.status === 'rejected') { + const errCode = + (result.reason as { Code?: string })?.Code ?? + (result.reason as { name?: string })?.name + if (errCode !== 'NoSuchKey') { + throw StorageBackendError.fromError(result.reason) + } + } } + return } + throw StorageBackendError.fromError(e) } } diff --git a/src/storage/protocols/tus/s3-locker.ts b/src/storage/protocols/tus/s3-locker.ts index 0184e1a24..e5c20c16f 100644 --- a/src/storage/protocols/tus/s3-locker.ts +++ b/src/storage/protocols/tus/s3-locker.ts @@ -249,9 +249,11 @@ export class S3Locker implements Locker { const batch = expiredLocks.slice(i, i + 1000) if (!this.batchDeleteEnabled) { - // Batch delete explicitly disabled — use individual deletes directly - await this.deleteLocksIndividually(batch) - this.logger.log(`Cleaned up ${batch.length} expired locks (individual, batch disabled)`) + await Promise.allSettled( + batch.map((key) => + this.s3Client.send(new DeleteObjectCommand({ Bucket: this.bucket, Key: key })) + ) + ) continue } @@ -271,7 +273,21 @@ export class S3Locker implements Locker { // fall back to individual deletes so zombie-lock cleanup still works. const code = error?.Code ?? error?.name if (code === 'NotImplemented') { - await this.deleteLocksIndividually(batch) + const results = await Promise.allSettled( + batch.map((key) => + this.s3Client.send(new DeleteObjectCommand({ Bucket: this.bucket, Key: key })) + ) + ) + for (const result of results) { + if (result.status === 'rejected') { + const errCode = + (result.reason as { Code?: string })?.Code ?? + (result.reason as { name?: string })?.name + if (errCode !== 'NoSuchKey') { + this.logger.warn(`Failed to delete expired lock in fallback:`, result.reason) + } + } + } this.logger.log( `Cleaned up ${batch.length} expired locks in batch (individual fallback)` ) @@ -303,28 +319,6 @@ export class S3Locker implements Locker { } } - /** - * Deletes a batch of lock keys one-by-one in parallel. - * NoSuchKey is ignored (lock already gone). Other errors are logged as warnings. - */ - private async deleteLocksIndividually(keys: string[]): Promise { - const results = await Promise.allSettled( - keys.map((key) => - this.s3Client.send(new DeleteObjectCommand({ Bucket: this.bucket, Key: key })) - ) - ) - for (const result of results) { - if (result.status === 'rejected') { - const errCode = - (result.reason as { Code?: string })?.Code ?? - (result.reason as { name?: string })?.name - if (errCode !== 'NoSuchKey') { - this.logger.warn(`Failed to delete expired lock in fallback:`, result.reason) - } - } - } - } - private async checkAndCleanupExpiredLock(lockKey: string, signal: AbortSignal): Promise { if (signal.aborted) { return false From d736718f4247b211a632f48f4f1061e7e5e41404 Mon Sep 17 00:00:00 2001 From: Ameya Naik Date: Thu, 19 Mar 2026 13:01:23 -0700 Subject: [PATCH 05/11] fix: remove NotImplemented runtime fallback, clean up formatting Now that STORAGE_S3_BATCH_DELETE_ENABLED handles the GCS case explicitly, the runtime NotImplemented fallback is no longer needed. --- src/http/routes/tus/index.ts | 32 ++++++++--------- src/storage/backend/s3/adapter.ts | 20 ----------- src/storage/protocols/tus/s3-locker.ts | 49 ++++++-------------------- 3 files changed, 26 insertions(+), 75 deletions(-) diff --git a/src/http/routes/tus/index.ts b/src/http/routes/tus/index.ts index e3cedf6d1..0af546525 100644 --- a/src/http/routes/tus/index.ts +++ b/src/http/routes/tus/index.ts @@ -258,14 +258,14 @@ const authenticatedRoutes = fastifyPlugin( }) fastify.addHook('preHandler', async (req) => { - ; (req.raw as MultiPartRequest).log = req.log - ; (req.raw as MultiPartRequest).upload = { - storage: req.storage, - owner: req.owner, - tenantId: req.tenantId, - db: req.db, - isUpsert: req.headers['x-upsert'] === 'true', - } + ;(req.raw as MultiPartRequest).log = req.log + ;(req.raw as MultiPartRequest).upload = { + storage: req.storage, + owner: req.owner, + tenantId: req.tenantId, + db: req.db, + isUpsert: req.headers['x-upsert'] === 'true', + } }) fastify.post( @@ -360,14 +360,14 @@ const publicRoutes = fastifyPlugin( ) fastify.addHook('preHandler', async (req) => { - ; (req.raw as MultiPartRequest).log = req.log - ; (req.raw as MultiPartRequest).upload = { - storage: req.storage, - owner: req.owner, - tenantId: req.tenantId, - db: req.db, - isUpsert: req.headers['x-upsert'] === 'true', - } + ;(req.raw as MultiPartRequest).log = req.log + ;(req.raw as MultiPartRequest).upload = { + storage: req.storage, + owner: req.owner, + tenantId: req.tenantId, + db: req.db, + isUpsert: req.headers['x-upsert'] === 'true', + } }) fastify.options( diff --git a/src/storage/backend/s3/adapter.ts b/src/storage/backend/s3/adapter.ts index 6faae34b9..2938db607 100644 --- a/src/storage/backend/s3/adapter.ts +++ b/src/storage/backend/s3/adapter.ts @@ -352,26 +352,6 @@ export class S3Backend implements StorageBackendAdapter { }) await this.client.send(command) } catch (e) { - // Some S3-compatible backends (e.g. GCS) do not support DeleteObjects; fall back to individual deletes - const code = (e as { Code?: string; name?: string })?.Code ?? (e as { name?: string })?.name - if (code === 'NotImplemented') { - const results = await Promise.allSettled( - prefixes.map((key) => - this.client.send(new DeleteObjectCommand({ Bucket: bucket, Key: key })) - ) - ) - for (const result of results) { - if (result.status === 'rejected') { - const errCode = - (result.reason as { Code?: string })?.Code ?? - (result.reason as { name?: string })?.name - if (errCode !== 'NoSuchKey') { - throw StorageBackendError.fromError(result.reason) - } - } - } - return - } throw StorageBackendError.fromError(e) } } diff --git a/src/storage/protocols/tus/s3-locker.ts b/src/storage/protocols/tus/s3-locker.ts index e5c20c16f..98c2d5441 100644 --- a/src/storage/protocols/tus/s3-locker.ts +++ b/src/storage/protocols/tus/s3-locker.ts @@ -19,7 +19,6 @@ export interface S3LockerOptions { renewalIntervalMs?: number maxRetries?: number retryDelayMs?: number - /** When false, skips DeleteObjectsCommand in zombie-lock cleanup and uses individual deletes. Default: true */ batchDeleteEnabled?: boolean logger?: Pick } @@ -257,44 +256,16 @@ export class S3Locker implements Locker { continue } - try { - await this.s3Client.send( - new DeleteObjectsCommand({ - Bucket: this.bucket, - Delete: { - Objects: batch.map((key) => ({ Key: key })), - Quiet: true, - }, - }) - ) - this.logger.log(`Cleaned up ${batch.length} expired locks in batch`) - } catch (error: any) { - // Some S3-compatible backends (e.g. GCS) do not support DeleteObjects; - // fall back to individual deletes so zombie-lock cleanup still works. - const code = error?.Code ?? error?.name - if (code === 'NotImplemented') { - const results = await Promise.allSettled( - batch.map((key) => - this.s3Client.send(new DeleteObjectCommand({ Bucket: this.bucket, Key: key })) - ) - ) - for (const result of results) { - if (result.status === 'rejected') { - const errCode = - (result.reason as { Code?: string })?.Code ?? - (result.reason as { name?: string })?.name - if (errCode !== 'NoSuchKey') { - this.logger.warn(`Failed to delete expired lock in fallback:`, result.reason) - } - } - } - this.logger.log( - `Cleaned up ${batch.length} expired locks in batch (individual fallback)` - ) - } else { - this.logger.warn(`Failed to delete batch of expired locks:`, error) - } - } + await this.s3Client.send( + new DeleteObjectsCommand({ + Bucket: this.bucket, + Delete: { + Objects: batch.map((key) => ({ Key: key })), + Quiet: true, + }, + }) + ) + this.logger.log(`Cleaned up ${batch.length} expired locks in batch`) } } From 077adc0a1b0dfaa4d6d5e5a0bc31f667e4470775 Mon Sep 17 00:00:00 2001 From: Ameya Naik Date: Thu, 19 Mar 2026 13:11:12 -0700 Subject: [PATCH 06/11] fix: restore original try-catch with NotImplemented fallback, add flag check inside it When batchDeleteEnabled is false, a synthetic NotImplemented is thrown inside the try block so it routes through the existing fallback path, keeping individual-delete logic in a single place. --- src/storage/backend/s3/adapter.ts | 36 +++++++++++++++++- src/storage/protocols/tus/s3-locker.ts | 51 +++++++++++++++++--------- 2 files changed, 68 insertions(+), 19 deletions(-) diff --git a/src/storage/backend/s3/adapter.ts b/src/storage/backend/s3/adapter.ts index 2938db607..e8effdd7d 100644 --- a/src/storage/backend/s3/adapter.ts +++ b/src/storage/backend/s3/adapter.ts @@ -333,9 +333,21 @@ export class S3Backend implements StorageBackendAdapter { const { storageS3BatchDeleteEnabled } = getConfig() if (!storageS3BatchDeleteEnabled) { - await Promise.allSettled( - prefixes.map((key) => this.client.send(new DeleteObjectCommand({ Bucket: bucket, Key: key }))) + const results = await Promise.allSettled( + prefixes.map((key) => + this.client.send(new DeleteObjectCommand({ Bucket: bucket, Key: key })) + ) ) + for (const result of results) { + if (result.status === 'rejected') { + const errCode = + (result.reason as { Code?: string })?.Code ?? + (result.reason as { name?: string })?.name + if (errCode !== 'NoSuchKey') { + throw StorageBackendError.fromError(result.reason) + } + } + } return } @@ -352,6 +364,26 @@ export class S3Backend implements StorageBackendAdapter { }) await this.client.send(command) } catch (e) { + // Some S3-compatible backends (e.g. GCS) do not support DeleteObjects; fall back to individual deletes + const code = (e as { Code?: string; name?: string })?.Code ?? (e as { name?: string })?.name + if (code === 'NotImplemented') { + const results = await Promise.allSettled( + prefixes.map((key) => + this.client.send(new DeleteObjectCommand({ Bucket: bucket, Key: key })) + ) + ) + for (const result of results) { + if (result.status === 'rejected') { + const errCode = + (result.reason as { Code?: string })?.Code ?? + (result.reason as { name?: string })?.name + if (errCode !== 'NoSuchKey') { + throw StorageBackendError.fromError(result.reason) + } + } + } + return + } throw StorageBackendError.fromError(e) } } diff --git a/src/storage/protocols/tus/s3-locker.ts b/src/storage/protocols/tus/s3-locker.ts index 98c2d5441..344f1efea 100644 --- a/src/storage/protocols/tus/s3-locker.ts +++ b/src/storage/protocols/tus/s3-locker.ts @@ -247,25 +247,42 @@ export class S3Locker implements Locker { for (let i = 0; i < expiredLocks.length; i += 1000) { const batch = expiredLocks.slice(i, i + 1000) - if (!this.batchDeleteEnabled) { - await Promise.allSettled( - batch.map((key) => - this.s3Client.send(new DeleteObjectCommand({ Bucket: this.bucket, Key: key })) - ) + try { + if (!this.batchDeleteEnabled) { + throw Object.assign(new Error('NotImplemented'), { name: 'NotImplemented' }) + } + await this.s3Client.send( + new DeleteObjectsCommand({ + Bucket: this.bucket, + Delete: { + Objects: batch.map((key) => ({ Key: key })), + Quiet: true, + }, + }) ) - continue + this.logger.log(`Cleaned up ${batch.length} expired locks in batch`) + } catch (error: any) { + const code = error?.Code ?? error?.name + if (code === 'NotImplemented') { + const results = await Promise.allSettled( + batch.map((key) => + this.s3Client.send(new DeleteObjectCommand({ Bucket: this.bucket, Key: key })) + ) + ) + for (const result of results) { + if (result.status === 'rejected') { + const errCode = + (result.reason as { Code?: string })?.Code ?? + (result.reason as { name?: string })?.name + if (errCode !== 'NoSuchKey') { + this.logger.warn(`Failed to delete expired lock in fallback:`, result.reason) + } + } + } + } else { + this.logger.warn(`Failed to delete batch of expired locks:`, error) + } } - - await this.s3Client.send( - new DeleteObjectsCommand({ - Bucket: this.bucket, - Delete: { - Objects: batch.map((key) => ({ Key: key })), - Quiet: true, - }, - }) - ) - this.logger.log(`Cleaned up ${batch.length} expired locks in batch`) } } From bb238804d72952bedc5c1630bb4811937af2c02d Mon Sep 17 00:00:00 2001 From: Ameya Naik Date: Thu, 19 Mar 2026 13:42:03 -0700 Subject: [PATCH 07/11] feat: add STORAGE_S3_BATCH_DELETE_ENABLED flag to prefer individual deletes When STORAGE_S3_BATCH_DELETE_ENABLED=false (default: true), prefer individual DeleteObjectCommand calls instead of DeleteObjectsCommand. Both S3Backend.deleteObjects and S3Locker.cleanupZombieLocks route through the existing NotImplemented catch block, keeping all individual- delete logic in one place with no structural changes to existing code. --- src/config.ts | 11 +- src/http/routes/tus/index.ts | 34 +++--- src/storage/backend/s3/adapter.ts | 5 + src/storage/protocols/tus/s3-locker.ts | 31 ++++- src/test/s3-adapter.test.ts | 39 ++++++ src/test/s3-locker.test.ts | 159 +++++++++++++++++++++++++ 6 files changed, 256 insertions(+), 23 deletions(-) diff --git a/src/config.ts b/src/config.ts index 7b573c1c7..4ed421380 100644 --- a/src/config.ts +++ b/src/config.ts @@ -72,6 +72,7 @@ type StorageConfigType = { storageS3ForcePathStyle?: boolean storageS3Region: string storageS3ClientTimeout: number + storageS3BatchDeleteEnabled: boolean isMultitenant: boolean jwtSecret: string jwtAlgorithm: string @@ -265,8 +266,8 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType { tenantId: isMultitenant ? '' : getOptionalConfigFromEnv('PROJECT_REF') || - getOptionalConfigFromEnv('TENANT_ID') || - 'storage-single-tenant', + getOptionalConfigFromEnv('TENANT_ID') || + 'storage-single-tenant', // Server region: getOptionalConfigFromEnv('SERVER_REGION', 'REGION') || 'not-specified', @@ -376,6 +377,8 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType { 'true', storageS3Region: getOptionalConfigFromEnv('STORAGE_S3_REGION', 'REGION') as string, storageS3ClientTimeout: Number(getOptionalConfigFromEnv('STORAGE_S3_CLIENT_TIMEOUT') || `0`), + storageS3BatchDeleteEnabled: + getOptionalConfigFromEnv('STORAGE_S3_BATCH_DELETE_ENABLED') !== 'false', // DB - Migrations dbAnonRole: getOptionalConfigFromEnv('DB_ANON_ROLE') || 'anon', @@ -539,12 +542,12 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType { size: { min: parseInt( getOptionalConfigFromEnv('IMAGE_TRANSFORMATION_LIMIT_MIN_SIZE', 'IMG_LIMITS_MIN_SIZE') || - '1', + '1', 10 ), max: parseInt( getOptionalConfigFromEnv('IMAGE_TRANSFORMATION_LIMIT_MAX_SIZE', 'IMG_LIMITS_MAX_SIZE') || - '2000', + '2000', 10 ), }, diff --git a/src/http/routes/tus/index.ts b/src/http/routes/tus/index.ts index b75f88cab..e3cedf6d1 100644 --- a/src/http/routes/tus/index.ts +++ b/src/http/routes/tus/index.ts @@ -36,6 +36,7 @@ const { storageS3ForcePathStyle, storageS3Region, storageS3ClientTimeout, + storageS3BatchDeleteEnabled, tusUrlExpiryMs, tusPath, tusPartSize, @@ -117,6 +118,7 @@ function createTusServer( maxRetries: 10, retryDelayMs: 250, renewalIntervalMs: 10 * 1000, // 10 seconds + batchDeleteEnabled: storageS3BatchDeleteEnabled, s3Client: new S3Client({ requestHandler: new NodeHttpHandler({ ...agent, @@ -256,14 +258,14 @@ const authenticatedRoutes = fastifyPlugin( }) fastify.addHook('preHandler', async (req) => { - ;(req.raw as MultiPartRequest).log = req.log - ;(req.raw as MultiPartRequest).upload = { - storage: req.storage, - owner: req.owner, - tenantId: req.tenantId, - db: req.db, - isUpsert: req.headers['x-upsert'] === 'true', - } + ; (req.raw as MultiPartRequest).log = req.log + ; (req.raw as MultiPartRequest).upload = { + storage: req.storage, + owner: req.owner, + tenantId: req.tenantId, + db: req.db, + isUpsert: req.headers['x-upsert'] === 'true', + } }) fastify.post( @@ -358,14 +360,14 @@ const publicRoutes = fastifyPlugin( ) fastify.addHook('preHandler', async (req) => { - ;(req.raw as MultiPartRequest).log = req.log - ;(req.raw as MultiPartRequest).upload = { - storage: req.storage, - owner: req.owner, - tenantId: req.tenantId, - db: req.db, - isUpsert: req.headers['x-upsert'] === 'true', - } + ; (req.raw as MultiPartRequest).log = req.log + ; (req.raw as MultiPartRequest).upload = { + storage: req.storage, + owner: req.owner, + tenantId: req.tenantId, + db: req.db, + isUpsert: req.headers['x-upsert'] === 'true', + } }) fastify.options( diff --git a/src/storage/backend/s3/adapter.ts b/src/storage/backend/s3/adapter.ts index ca1fd8112..2c3d91196 100644 --- a/src/storage/backend/s3/adapter.ts +++ b/src/storage/backend/s3/adapter.ts @@ -330,7 +330,12 @@ export class S3Backend implements StorageBackendAdapter { * @param prefixes */ async deleteObjects(bucket: string, prefixes: string[]): Promise { + const { storageS3BatchDeleteEnabled } = getConfig() + try { + if (!storageS3BatchDeleteEnabled) { + throw Object.assign(new Error('NotImplemented'), { name: 'NotImplemented' }) + } const s3Prefixes = prefixes.map((ele) => { return { Key: ele } }) diff --git a/src/storage/protocols/tus/s3-locker.ts b/src/storage/protocols/tus/s3-locker.ts index 7e75a4901..344f1efea 100644 --- a/src/storage/protocols/tus/s3-locker.ts +++ b/src/storage/protocols/tus/s3-locker.ts @@ -19,6 +19,7 @@ export interface S3LockerOptions { renewalIntervalMs?: number maxRetries?: number retryDelayMs?: number + batchDeleteEnabled?: boolean logger?: Pick } @@ -37,6 +38,7 @@ export class S3Locker implements Locker { private readonly renewalIntervalMs: number private readonly maxRetries: number private readonly retryDelayMs: number + private readonly batchDeleteEnabled: boolean private readonly logger: Pick private readonly notifier: LockNotifier @@ -49,6 +51,7 @@ export class S3Locker implements Locker { this.renewalIntervalMs = options.renewalIntervalMs || 10000 // 10 seconds this.maxRetries = options.maxRetries || 10 this.retryDelayMs = options.retryDelayMs || 500 + this.batchDeleteEnabled = options.batchDeleteEnabled !== false // default true this.logger = options.logger || console // Validate configuration @@ -245,6 +248,9 @@ export class S3Locker implements Locker { const batch = expiredLocks.slice(i, i + 1000) try { + if (!this.batchDeleteEnabled) { + throw Object.assign(new Error('NotImplemented'), { name: 'NotImplemented' }) + } await this.s3Client.send( new DeleteObjectsCommand({ Bucket: this.bucket, @@ -255,8 +261,27 @@ export class S3Locker implements Locker { }) ) this.logger.log(`Cleaned up ${batch.length} expired locks in batch`) - } catch (error) { - this.logger.warn(`Failed to delete batch of expired locks:`, error) + } catch (error: any) { + const code = error?.Code ?? error?.name + if (code === 'NotImplemented') { + const results = await Promise.allSettled( + batch.map((key) => + this.s3Client.send(new DeleteObjectCommand({ Bucket: this.bucket, Key: key })) + ) + ) + for (const result of results) { + if (result.status === 'rejected') { + const errCode = + (result.reason as { Code?: string })?.Code ?? + (result.reason as { name?: string })?.name + if (errCode !== 'NoSuchKey') { + this.logger.warn(`Failed to delete expired lock in fallback:`, result.reason) + } + } + } + } else { + this.logger.warn(`Failed to delete batch of expired locks:`, error) + } } } } @@ -350,7 +375,7 @@ export class S3Lock implements Lock { private readonly id: string, private readonly locker: S3Locker, private readonly notifier: LockNotifier - ) {} + ) { } async lock(stopSignal: AbortSignal, cancelReq: RequestRelease): Promise { // Set up abort handler to clean up in case of abort diff --git a/src/test/s3-adapter.test.ts b/src/test/s3-adapter.test.ts index 036400004..9e153697d 100644 --- a/src/test/s3-adapter.test.ts +++ b/src/test/s3-adapter.test.ts @@ -2,6 +2,7 @@ import { S3Client } from '@aws-sdk/client-s3' import { Readable } from 'stream' +import * as config from '../config' import { S3Backend } from '../storage/backend/s3/adapter' jest.mock('@aws-sdk/client-s3', () => { @@ -140,5 +141,43 @@ describe('S3Backend', () => { await expect(backend.deleteObjects('test-bucket', ['file1.txt'])).rejects.toThrow() expect(mockSend).toHaveBeenCalledTimes(1) }) + + test('should skip DeleteObjectsCommand and use individual deletes when batchDeleteEnabled is false', async () => { + const getConfigSpy = jest + .spyOn(config, 'getConfig') + .mockReturnValue({ storageS3BatchDeleteEnabled: false } as any) + mockSend.mockResolvedValue({ $metadata: { httpStatusCode: 204 } }) + + try { + const backend = new S3Backend({ region: 'us-east-1', endpoint: 'http://localhost:9000' }) + await backend.deleteObjects('test-bucket', ['file1.txt', 'file2.txt']) + + // No DeleteObjectsCommand call — straight to individual deletes + expect(mockSend).toHaveBeenCalledTimes(2) + expect(mockSend.mock.calls[0][0].constructor.name).toBe('DeleteObjectCommand') + expect(mockSend.mock.calls[1][0].constructor.name).toBe('DeleteObjectCommand') + } finally { + getConfigSpy.mockRestore() + } + }) + + test('should ignore NoSuchKey when batchDeleteEnabled is false', async () => { + const getConfigSpy = jest + .spyOn(config, 'getConfig') + .mockReturnValue({ storageS3BatchDeleteEnabled: false } as any) + const noSuchKey = Object.assign(new Error('NoSuchKey'), { Code: 'NoSuchKey' }) + mockSend + .mockResolvedValueOnce({ $metadata: { httpStatusCode: 204 } }) + .mockRejectedValueOnce(noSuchKey) + + try { + const backend = new S3Backend({ region: 'us-east-1', endpoint: 'http://localhost:9000' }) + await expect( + backend.deleteObjects('test-bucket', ['file1.txt', 'file2.txt']) + ).resolves.toBeUndefined() + } finally { + getConfigSpy.mockRestore() + } + }) }) }) diff --git a/src/test/s3-locker.test.ts b/src/test/s3-locker.test.ts index 8954d045e..cd8c1a445 100644 --- a/src/test/s3-locker.test.ts +++ b/src/test/s3-locker.test.ts @@ -782,4 +782,163 @@ describe('S3Locker', () => { await lock.unlock() }) }) + + describe('cleanupZombieLocks – NotImplemented fallback', () => { + // Helper: place a pre-expired lock object in S3 + async function putExpiredLock(key: string) { + await s3Client.send( + new PutObjectCommand({ + Bucket: testBucket, + Key: key, + Body: JSON.stringify({ + lockId: key, + expiresAt: Date.now() - 10000, + createdAt: Date.now() - 20000, + renewedAt: Date.now() - 20000, + }), + ContentType: 'application/json', + }) + ) + } + + test('falls back to individual deletes when DeleteObjectsCommand returns NotImplemented', async () => { + const lockKey = 'test-locks/not-impl-fallback-lock.lock' + await putExpiredLock(lockKey) + + const originalSend = s3Client.send.bind(s3Client) + const sendSpy = jest.spyOn(s3Client, 'send').mockImplementation(async (command: any) => { + // Only reject the batch delete; let everything else through + if (command.constructor.name === 'DeleteObjectsCommand') { + const err: any = new Error('NotImplemented') + err.name = 'NotImplemented' + throw err + } + return originalSend(command) + }) + + try { + // Should not throw – fallback handles the error + await expect(locker.cleanupZombieLocks()).resolves.not.toThrow() + } finally { + sendSpy.mockRestore() + // Cleanup in case individual delete didn't run + try { + await originalSend( + // @ts-ignore - manual cleanup + new (await import('@aws-sdk/client-s3')).DeleteObjectCommand({ + Bucket: testBucket, + Key: lockKey, + }) + ) + } catch { + // ignore + } + } + }) + + test('ignores NoSuchKey errors for individual deletes in NotImplemented fallback', async () => { + const lockKey = 'test-locks/not-impl-nosuchkey-lock.lock' + await putExpiredLock(lockKey) + + const originalSend = s3Client.send.bind(s3Client) + const sendSpy = jest.spyOn(s3Client, 'send').mockImplementation(async (command: any) => { + if (command.constructor.name === 'DeleteObjectsCommand') { + const err: any = new Error('NotImplemented') + err.name = 'NotImplemented' + throw err + } + if (command.constructor.name === 'DeleteObjectCommand') { + const err: any = new Error('NoSuchKey') + err.name = 'NoSuchKey' + throw err + } + return originalSend(command) + }) + + try { + // NoSuchKey on individual deletes must be swallowed; method must not throw + await expect(locker.cleanupZombieLocks()).resolves.not.toThrow() + } finally { + sendSpy.mockRestore() + } + }) + + test('warns (but does not throw) on real individual-delete errors in fallback', async () => { + const lockKey = 'test-locks/not-impl-real-err-lock.lock' + await putExpiredLock(lockKey) + + const mockWarn = jest.fn() + const warnLocker = new S3Locker({ + s3Client, + bucket: testBucket, + notifier: mockNotifier, + keyPrefix: 'test-locks/', + lockTtlMs: 5000, + renewalIntervalMs: 1000, + logger: { log: jest.fn(), warn: mockWarn, error: jest.fn() }, + }) + + const originalSend = s3Client.send.bind(s3Client) + const sendSpy = jest.spyOn(s3Client, 'send').mockImplementation(async (command: any) => { + if (command.constructor.name === 'DeleteObjectsCommand') { + const err: any = new Error('NotImplemented') + err.name = 'NotImplemented' + throw err + } + if (command.constructor.name === 'DeleteObjectCommand') { + const err: any = new Error('AccessDenied') + err.name = 'AccessDenied' + throw err + } + return originalSend(command) + }) + + try { + await expect(warnLocker.cleanupZombieLocks()).resolves.not.toThrow() + // The individual failure should be logged as a warning + expect(mockWarn).toHaveBeenCalledWith( + expect.stringContaining('Failed to delete expired lock in fallback:'), + expect.anything() + ) + } finally { + sendSpy.mockRestore() + } + }) + + test('skips DeleteObjectsCommand entirely when batchDeleteEnabled is false', async () => { + const lockKey = 'test-locks/batch-disabled-lock.lock' + await putExpiredLock(lockKey) + + const deletedKeys: string[] = [] + const originalSend = s3Client.send.bind(s3Client) + const sendSpy = jest.spyOn(s3Client, 'send').mockImplementation(async (command: any) => { + if (command.constructor.name === 'DeleteObjectsCommand') { + throw new Error('Should not have called DeleteObjectsCommand when batch is disabled') + } + if (command.constructor.name === 'DeleteObjectCommand') { + deletedKeys.push(command.input.Key) + } + return originalSend(command) + }) + + const batchDisabledLocker = new S3Locker({ + s3Client, + bucket: testBucket, + notifier: mockNotifier, + keyPrefix: 'test-locks/', + lockTtlMs: 5000, + renewalIntervalMs: 1000, + batchDeleteEnabled: false, + logger: { log: jest.fn(), warn: jest.fn(), error: jest.fn() }, + }) + + try { + await expect(batchDisabledLocker.cleanupZombieLocks()).resolves.not.toThrow() + // The expired lock should have been deleted via an individual DeleteObjectCommand + expect(deletedKeys).toContain(lockKey) + } finally { + sendSpy.mockRestore() + } + }) + }) }) From 50c59ad862f0073242a686016f4d56fa368d9df4 Mon Sep 17 00:00:00 2001 From: Ameya Naik Date: Thu, 19 Mar 2026 13:44:40 -0700 Subject: [PATCH 08/11] fix: remove duplicate storageS3BatchDeleteEnabled from merge --- src/config.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/config.ts b/src/config.ts index 1435d137d..34e14710f 100644 --- a/src/config.ts +++ b/src/config.ts @@ -73,7 +73,6 @@ type StorageConfigType = { storageS3ForcePathStyle?: boolean storageS3Region: string storageS3ClientTimeout: number - storageS3BatchDeleteEnabled: boolean isMultitenant: boolean jwtSecret: string jwtAlgorithm: string @@ -380,8 +379,6 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType { 'true', storageS3Region: getOptionalConfigFromEnv('STORAGE_S3_REGION', 'REGION') as string, storageS3ClientTimeout: Number(getOptionalConfigFromEnv('STORAGE_S3_CLIENT_TIMEOUT') || `0`), - storageS3BatchDeleteEnabled: - getOptionalConfigFromEnv('STORAGE_S3_BATCH_DELETE_ENABLED') !== 'false', // DB - Migrations dbAnonRole: getOptionalConfigFromEnv('DB_ANON_ROLE') || 'anon', From b36e5d8bacc02f2aefa63b09a5d68f3a9e2249a3 Mon Sep 17 00:00:00 2001 From: Ameya Naik Date: Thu, 19 Mar 2026 13:45:52 -0700 Subject: [PATCH 09/11] fix: restore original formatting on untouched lines --- src/storage/backend/s3/adapter.ts | 26 +++++++++++++------------- src/storage/protocols/tus/s3-locker.ts | 2 +- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/storage/backend/s3/adapter.ts b/src/storage/backend/s3/adapter.ts index ee43e9fa9..2c3d91196 100644 --- a/src/storage/backend/s3/adapter.ts +++ b/src/storage/backend/s3/adapter.ts @@ -185,14 +185,14 @@ export class S3Backend implements StorageBackendAdapter { const metadata = hasUploadedBytes ? await this.headObject(bucketName, key, version) : { - httpStatusCode: 200, - eTag: data.ETag || '', - mimetype: contentType, - lastModified: new Date(), - size: 0, - contentLength: 0, - contentRange: undefined, - } + httpStatusCode: 200, + eTag: data.ETag || '', + mimetype: contentType, + lastModified: new Date(), + size: 0, + contentLength: 0, + contentRange: undefined, + } return { httpStatusCode: data.$metadata.httpStatusCode || metadata.httpStatusCode, @@ -463,9 +463,9 @@ export class S3Backend implements StorageBackendAdapter { ContentType: contentType, Metadata: metadata ? { - ...metadata, - Version: version || '', - } + ...metadata, + Version: version || '', + } : undefined, }) @@ -544,8 +544,8 @@ export class S3Backend implements StorageBackendAdapter { parts.length === 0 ? undefined : { - Parts: parts, - }, + Parts: parts, + }, }) const response = await this.client.send(completeUpload) diff --git a/src/storage/protocols/tus/s3-locker.ts b/src/storage/protocols/tus/s3-locker.ts index 344f1efea..8ff0cc502 100644 --- a/src/storage/protocols/tus/s3-locker.ts +++ b/src/storage/protocols/tus/s3-locker.ts @@ -375,7 +375,7 @@ export class S3Lock implements Lock { private readonly id: string, private readonly locker: S3Locker, private readonly notifier: LockNotifier - ) { } + ) {} async lock(stopSignal: AbortSignal, cancelReq: RequestRelease): Promise { // Set up abort handler to clean up in case of abort From de221c169df051a850ddf4f4bc4e6008afc59fbc Mon Sep 17 00:00:00 2001 From: Ameya Naik Date: Thu, 19 Mar 2026 13:46:51 -0700 Subject: [PATCH 10/11] fix: restore original semicolon formatting in preHandler hooks --- src/http/routes/tus/index.ts | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/src/http/routes/tus/index.ts b/src/http/routes/tus/index.ts index e3cedf6d1..0af546525 100644 --- a/src/http/routes/tus/index.ts +++ b/src/http/routes/tus/index.ts @@ -258,14 +258,14 @@ const authenticatedRoutes = fastifyPlugin( }) fastify.addHook('preHandler', async (req) => { - ; (req.raw as MultiPartRequest).log = req.log - ; (req.raw as MultiPartRequest).upload = { - storage: req.storage, - owner: req.owner, - tenantId: req.tenantId, - db: req.db, - isUpsert: req.headers['x-upsert'] === 'true', - } + ;(req.raw as MultiPartRequest).log = req.log + ;(req.raw as MultiPartRequest).upload = { + storage: req.storage, + owner: req.owner, + tenantId: req.tenantId, + db: req.db, + isUpsert: req.headers['x-upsert'] === 'true', + } }) fastify.post( @@ -360,14 +360,14 @@ const publicRoutes = fastifyPlugin( ) fastify.addHook('preHandler', async (req) => { - ; (req.raw as MultiPartRequest).log = req.log - ; (req.raw as MultiPartRequest).upload = { - storage: req.storage, - owner: req.owner, - tenantId: req.tenantId, - db: req.db, - isUpsert: req.headers['x-upsert'] === 'true', - } + ;(req.raw as MultiPartRequest).log = req.log + ;(req.raw as MultiPartRequest).upload = { + storage: req.storage, + owner: req.owner, + tenantId: req.tenantId, + db: req.db, + isUpsert: req.headers['x-upsert'] === 'true', + } }) fastify.options( From 7940539db7a4e11e84457f284cbe20ca16b9291f Mon Sep 17 00:00:00 2001 From: Ameya Naik Date: Thu, 19 Mar 2026 13:48:44 -0700 Subject: [PATCH 11/11] fix: restore original indentation on untouched config.ts lines --- src/config.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/config.ts b/src/config.ts index 34e14710f..de9823ccb 100644 --- a/src/config.ts +++ b/src/config.ts @@ -266,8 +266,8 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType { tenantId: isMultitenant ? '' : getOptionalConfigFromEnv('PROJECT_REF') || - getOptionalConfigFromEnv('TENANT_ID') || - 'storage-single-tenant', + getOptionalConfigFromEnv('TENANT_ID') || + 'storage-single-tenant', // Server region: getOptionalConfigFromEnv('SERVER_REGION', 'REGION') || 'not-specified', @@ -542,12 +542,12 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType { size: { min: parseInt( getOptionalConfigFromEnv('IMAGE_TRANSFORMATION_LIMIT_MIN_SIZE', 'IMG_LIMITS_MIN_SIZE') || - '1', + '1', 10 ), max: parseInt( getOptionalConfigFromEnv('IMAGE_TRANSFORMATION_LIMIT_MAX_SIZE', 'IMG_LIMITS_MAX_SIZE') || - '2000', + '2000', 10 ), },