diff --git a/jest.config.cjs b/jest.config.cjs index be2cf47..2217beb 100644 --- a/jest.config.cjs +++ b/jest.config.cjs @@ -13,6 +13,7 @@ module.exports = { 'src/tests/dummy-connector/metadata-extraction.test.ts', 'src/http/axios-client-internal.test.ts', 'src/tests/event-data-size-limit/.*.test.ts', + 'src/tests/upload-failure/upload-failure.slow.test.ts', ], }, { @@ -33,6 +34,7 @@ module.exports = { '/src/tests/dummy-connector/metadata-extraction.test.ts', '/src/http/axios-client-internal.test.ts', '/src/tests/event-data-size-limit/size-limit-1.test.ts', + '/src/tests/upload-failure/upload-failure.slow.test.ts', ], }, ], diff --git a/src/mock-server/mock-server.interfaces.ts b/src/mock-server/mock-server.interfaces.ts index 0094407..67685c1 100644 --- a/src/mock-server/mock-server.interfaces.ts +++ b/src/mock-server/mock-server.interfaces.ts @@ -56,6 +56,15 @@ export interface RouteConfig { headers?: Record; /** Optional retry configuration for simulating failures before success */ retry?: RetryConfig; + /** + * Respond successfully for the first N requests, then return an error. + * Useful for partial-upload scenarios (first batch succeeds, second fails). + */ + succeedThenFail?: { + successCount: number; + errorStatus?: number; + errorBody?: unknown; + }; /** Optional delay in milliseconds before sending the response */ delay?: number; } diff --git a/src/mock-server/mock-server.ts b/src/mock-server/mock-server.ts index 2ad6ee7..fd81e03 100644 --- a/src/mock-server/mock-server.ts +++ b/src/mock-server/mock-server.ts @@ -216,18 +216,32 @@ export class MockServer { * Configures a route to return a specific status code and optional response body. 
*/ public setRoute(config: RouteConfig): void { - const { path, method, status, body, bodyBuffer, retry, headers, delay } = + const { path, method, status, body, bodyBuffer, retry, succeedThenFail, headers, delay } = config; const key = this.getRouteKey(method, path); - if (retry) { + if (retry || succeedThenFail) { this.requestCounts.set(key, 0); } this.routeHandlers.set(key, (req: ParsedRequest, res: MockResponse) => { const sendResponse = (responseDelay?: number) => { const send = () => { - if (retry) { + if (succeedThenFail) { + const currentCount = this.requestCounts.get(key) || 0; + this.requestCounts.set(key, currentCount + 1); + + if (currentCount < succeedThenFail.successCount) { + this.defaultRouteHandler(req, res); + } else { + const errorStatus = succeedThenFail.errorStatus ?? 400; + if (succeedThenFail.errorBody !== undefined) { + res.status(errorStatus).json(succeedThenFail.errorBody); + } else { + res.status(errorStatus).send(); + } + } + } else if (retry) { const currentCount = this.requestCounts.get(key) || 0; const failureCount = retry.failureCount ?? 4; const errorStatus = retry.errorStatus ?? 500; @@ -303,6 +317,16 @@ export class MockServer { this.requests = []; } + /** + * Returns all POST requests to the platform callback URL. + */ + public getCallbackRequests(): RequestInfo[] { + return this.requests.filter( + (req) => + req.method.toUpperCase() === 'POST' && req.url.includes('callback_url') + ); + } + /** * Returns the most recent request or undefined if no requests exist. */ diff --git a/src/multithreading/spawn/spawn.ts b/src/multithreading/spawn/spawn.ts index 0350d34..986a1b2 100644 --- a/src/multithreading/spawn/spawn.ts +++ b/src/multithreading/spawn/spawn.ts @@ -264,7 +264,9 @@ export class Spawn { const stringifiedArgs = message.payload?.stringifiedArgs; const level = message.payload?.level as LogLevel; const isSdkLog = message.payload?.isSdkLog ?? 
true; - this.logger.logFn(stringifiedArgs, level, isSdkLog); + if (typeof this.logger?.logFn === 'function') { + this.logger.logFn(stringifiedArgs, level, isSdkLog); + } } // If worker sends a message that it has emitted an event, then set alreadyEmitted to true. diff --git a/src/multithreading/worker-adapter/worker-adapter.emit.test.ts b/src/multithreading/worker-adapter/worker-adapter.emit.test.ts index 8afef41..9cd3f13 100644 --- a/src/multithreading/worker-adapter/worker-adapter.emit.test.ts +++ b/src/multithreading/worker-adapter/worker-adapter.emit.test.ts @@ -143,12 +143,11 @@ describe(`${WorkerAdapter.name}.emit`, () => { expect(mockPostMessage).toHaveBeenCalledTimes(1); }); - it('should correctly emit one event even if uploadAllRepos errors', async () => { + it('should log state size and current state before posting state', async () => { // Arrange + const logSpy = jest.spyOn(console, 'log').mockImplementation(); adapter['adapterState'].postState = jest.fn().mockResolvedValue(undefined); - adapter.uploadAllRepos = jest - .fn() - .mockRejectedValue(new Error('uploadAllRepos error')); + adapter.uploadAllRepos = jest.fn().mockResolvedValue(undefined); // Act await adapter.emit(ExtractorEventType.MetadataExtractionError, { @@ -157,7 +156,46 @@ describe(`${WorkerAdapter.name}.emit`, () => { }); // Assert - expect(mockPostMessage).toHaveBeenCalledTimes(1); + const stateLogCall = logSpy.mock.calls.find( + ([message]) => + typeof message === 'string' && + message.includes('Saving ') && + message.includes('Current state') + ); + expect(stateLogCall).toBeDefined(); + expect(stateLogCall?.[0]).toContain( + 'KB state before emitting event with event type: METADATA_EXTRACTION_ERROR. 
Current state' + ); + expect(stateLogCall?.[1]).toEqual( + expect.objectContaining({ attachments: { completed: false } }) + ); + }); + + it('should emit phase extraction error when uploadAllRepos fails', async () => { + const { emit: mockEmit } = require('../../common/control-protocol'); + adapter['adapterState'].postState = jest.fn().mockResolvedValue(undefined); + adapter.uploadAllRepos = jest + .fn() + .mockRejectedValue(new Error('uploadAllRepos error')); + + await adapter.emit(ExtractorEventType.DataExtractionDone); + + expect(mockEmit).toHaveBeenCalledWith( + expect.objectContaining({ + eventType: ExtractorEventType.DataExtractionError, + data: expect.objectContaining({ + error: expect.objectContaining({ + message: expect.stringContaining('uploadAllRepos error'), + }), + }), + }) + ); + expect(mockPostMessage).toHaveBeenCalledWith( + expect.objectContaining({ + subject: 'emit', + payload: { eventType: ExtractorEventType.DataExtractionError }, + }) + ); }); it('should include artifacts in data for extraction events', async () => { diff --git a/src/multithreading/worker-adapter/worker-adapter.ts b/src/multithreading/worker-adapter/worker-adapter.ts index 66948de..d65c221 100644 --- a/src/multithreading/worker-adapter/worker-adapter.ts +++ b/src/multithreading/worker-adapter/worker-adapter.ts @@ -12,7 +12,7 @@ import { addReportToLoaderReport, getFilesToLoad, } from './worker-adapter.helpers'; -import { serializeError } from '../../logger/logger'; +import { getPrintableState, serializeError } from '../../logger/logger'; import { runWithSdkLogContext, runWithUserLogContext, @@ -60,6 +60,7 @@ import { Uploader } from '../../uploader/uploader'; import { Artifact, SsorAttachment } from '../../uploader/uploader.interfaces'; import { translateOutgoingEventType } from '../../common/event-type-translation'; import { truncateMessage } from '../../common/helpers'; +import { getTimeoutErrorEventType } from '../spawn/spawn.helpers'; export function createWorkerAdapter({ 
event, @@ -297,127 +298,183 @@ export class WorkerAdapter { delete data.external_sync_units; } - // Upload all repos before emitting the event - console.log( - `Uploading all repos before emitting event with event type: ${newEventType}.` - ); + const canEmit = await this.beforeEmit(newEventType); + if (!canEmit) { + return; + } - try { - await this.uploadAllRepos(); - } catch (error) { - console.error('Error while uploading repos', error); - parentPort?.postMessage(WorkerMessageSubject.WorkerMessageExit); - this.hasWorkerEmitted = true; + const payload = this.buildEmitPayload(newEventType, data); + const sent = await this.sendToPlatform(newEventType, payload); + if (!sent) { return; } - // If the extraction is done, we want to save the timestamp of the last successful sync - if (newEventType === ExtractorEventType.AttachmentExtractionDone) { - console.log( - `Overwriting lastSuccessfulSyncStarted with lastSyncStarted (${this.state.lastSyncStarted}).` - ); + await this.afterEmit(newEventType); + }); + } - this.state.lastSuccessfulSyncStarted = this.state.lastSyncStarted; - this.state.lastSyncStarted = ''; + async uploadAllRepos(): Promise { + for (const repo of this.repos) { + await repo.upload(); + this.artifacts.push(...repo.uploadedArtifacts); + } + } - // Clear pending extraction boundaries now that the cycle is complete - this.state.pendingWorkersOldest = ''; - this.state.pendingWorkersNewest = ''; + private async beforeEmit( + eventType: ExtractorEventType | LoaderEventType + ): Promise { + // Upload all repos before emitting the event. + console.log( + `Uploading all repos before emitting event with event type: ${eventType}.` + ); - // Update workersOldest and workersNewest boundaries from resolved extraction timestamps. - // Expand boundaries: workersOldest gets the earliest timestamp, workersNewest gets the latest. 
- const extractionStart = this.event.payload.event_context.extract_from; - const extractionEnd = this.event.payload.event_context.extract_to; + try { + await this.uploadAllRepos(); + } catch (error) { + console.error('Error while uploading repos', error); + for (const repo of this.repos) { + this.artifacts = [...this.artifacts, ...repo.uploadedArtifacts]; + } + const { eventType: errorEventType } = getTimeoutErrorEventType( + this.event.payload.event_type + ); + await this.emitUploadFailure(errorEventType, error); + return false; + } - if ( - extractionStart && - (!this.state.workersOldest || - extractionStart < this.state.workersOldest) - ) { - console.log( - `Updating workersOldest from '${this.state.workersOldest}' to '${extractionStart}'.` - ); - this.state.workersOldest = extractionStart; - } + // If the extraction is done, save the timestamp of the last successful sync. + if (eventType === ExtractorEventType.AttachmentExtractionDone) { + console.log( + `Overwriting lastSuccessfulSyncStarted with lastSyncStarted (${this.state.lastSyncStarted}).` + ); - if ( - extractionEnd && - (!this.state.workersNewest || - extractionEnd > this.state.workersNewest) - ) { - console.log( - `Updating workersNewest from '${this.state.workersNewest}' to '${extractionEnd}'.` - ); - this.state.workersNewest = extractionEnd; - } - } + this.state.lastSuccessfulSyncStarted = this.state.lastSyncStarted; + this.state.lastSyncStarted = ''; + + // Clear pending extraction boundaries now that the cycle is complete. + this.state.pendingWorkersOldest = ''; + this.state.pendingWorkersNewest = ''; + + // Update workersOldest and workersNewest boundaries from resolved extraction timestamps. 
+ const extractionStart = this.event.payload.event_context.extract_from; + const extractionEnd = this.event.payload.event_context.extract_to; - // We want to save the state every time we emit an event, except for the start and delete events - if (!STATELESS_EVENT_TYPES.includes(this.event.payload.event_type)) { + if ( + extractionStart && + (!this.state.workersOldest || extractionStart < this.state.workersOldest) + ) { console.log( - `Saving state before emitting event with event type: ${newEventType}.` + `Updating workersOldest from '${this.state.workersOldest}' to '${extractionStart}'.` ); + this.state.workersOldest = extractionStart; + } - try { - await this.adapterState.postState(this.state); - } catch (error) { - console.error('Error while posting state', error); - parentPort?.postMessage(WorkerMessageSubject.WorkerMessageExit); - this.hasWorkerEmitted = true; - return; - } + if ( + extractionEnd && + (!this.state.workersNewest || extractionEnd > this.state.workersNewest) + ) { + console.log( + `Updating workersNewest from '${this.state.workersNewest}' to '${extractionEnd}'.` + ); + this.state.workersNewest = extractionEnd; } + } - try { - // Always prune error messages to make them shorter before emit - if (data?.error?.message) { - data.error.message = truncateMessage(data.error.message); - } + if (STATELESS_EVENT_TYPES.includes(this.event.payload.event_type)) { + return true; + } - const isExtractionEvent = Object.values(ExtractorEventType).includes( - newEventType as ExtractorEventType - ); - const isLoaderEvent = Object.values(LoaderEventType).includes( - newEventType as LoaderEventType - ); + let stateSizeKb = 0; + try { + stateSizeKb = Buffer.byteLength(JSON.stringify(this.state), 'utf8') / 1024; + } catch { + stateSizeKb = 0; + } - await emit({ - eventType: newEventType, - event: this.event, - data: { - ...data, - ...(isExtractionEvent ? { artifacts: this.artifacts } : {}), - ...(isLoaderEvent - ? 
{ reports: this.reports, processed_files: this.processedFiles } - : {}), - }, - }); + console.log( + `Saving ${stateSizeKb.toFixed(2)} KB state before emitting event with event type: ${eventType}. Current state`, + getPrintableState(this.state) + ); - const message: WorkerMessageEmitted = { - subject: WorkerMessageSubject.WorkerMessageEmitted, - payload: { eventType: newEventType }, - }; - this.artifacts = []; - parentPort?.postMessage(message); - this.hasWorkerEmitted = true; - } catch (error) { - console.error( - `Error while emitting event with event type: ${newEventType}.`, - serializeError(error) - ); - parentPort?.postMessage(WorkerMessageSubject.WorkerMessageExit); - this.hasWorkerEmitted = true; - } - }); + try { + await this.adapterState.postState(this.state); + return true; + } catch (error) { + console.error('Error while posting state', error); + parentPort?.postMessage(WorkerMessageSubject.WorkerMessageExit); + this.hasWorkerEmitted = true; + return false; + } } - async uploadAllRepos(): Promise { - for (const repo of this.repos) { - const error = await repo.upload(); - this.artifacts.push(...repo.uploadedArtifacts); - if (error) { - throw error; - } + private buildEmitPayload( + eventType: ExtractorEventType | LoaderEventType, + data?: EventData + ): EventData { + if (data?.error?.message) { + data.error.message = truncateMessage(data.error.message); + } + + const isExtractionEvent = Object.values(ExtractorEventType).includes( + eventType as ExtractorEventType + ); + const isLoaderEvent = Object.values(LoaderEventType).includes( + eventType as LoaderEventType + ); + + return { + ...data, + ...(isExtractionEvent ? { artifacts: this.artifacts } : {}), + ...(isLoaderEvent + ? 
{ reports: this.reports, processed_files: this.processedFiles } + : {}), + }; + } + + private async sendToPlatform( + eventType: ExtractorEventType | LoaderEventType, + data?: EventData + ): Promise { + try { + await emit({ + eventType, + event: this.event, + data, + }); + return true; + } catch (error) { + console.error( + `Error while emitting event with event type: ${eventType}.`, + serializeError(error) + ); + parentPort?.postMessage(WorkerMessageSubject.WorkerMessageExit); + this.hasWorkerEmitted = true; + return false; + } + } + + private async afterEmit( + eventType: ExtractorEventType | LoaderEventType + ): Promise { + const message: WorkerMessageEmitted = { + subject: WorkerMessageSubject.WorkerMessageEmitted, + payload: { eventType }, + }; + this.artifacts = []; + parentPort?.postMessage(message); + this.hasWorkerEmitted = true; + } + + private async emitUploadFailure( + eventType: ExtractorEventType | LoaderEventType, + error: unknown + ): Promise { + const payload = this.buildEmitPayload(eventType, { + error: { message: serializeError(error) }, + }); + const sent = await this.sendToPlatform(eventType, payload); + if (sent) { + await this.afterEmit(eventType); } } diff --git a/src/multithreading/worker-adapter/worker-adapter.upload-failure.test.ts b/src/multithreading/worker-adapter/worker-adapter.upload-failure.test.ts new file mode 100644 index 0000000..9b677f6 --- /dev/null +++ b/src/multithreading/worker-adapter/worker-adapter.upload-failure.test.ts @@ -0,0 +1,172 @@ +import { State } from '../../state/state'; +import { mockServer } from '../../tests/jest.setup'; +import { createItems } from '../../tests/test-helpers'; +import { createMockEvent } from '../../common/test-utils'; +import { + AdapterState, + EventType, + ExtractorEventType, +} from '../../types'; +import { WorkerAdapter } from './worker-adapter'; + +/* eslint-disable @typescript-eslint/no-require-imports */ + +jest.mock('../../common/control-protocol', () => ({ + emit: 
jest.fn().mockResolvedValue({}), +})); + +jest.mock('../../mappers/mappers'); +jest.mock('node:worker_threads', () => ({ + parentPort: { postMessage: jest.fn() }, +})); +jest.mock('../../attachments-streaming/attachments-streaming-pool', () => ({ + AttachmentsStreamingPool: jest.fn().mockImplementation(() => ({ + streamAll: jest.fn().mockResolvedValue(undefined), + })), +})); + +interface TestState { + attachments: { completed: boolean }; +} + +const UPLOAD_URL_PATH = '/internal/airdrop.artifacts.upload-url'; + +function makeAdapter(): { + adapter: WorkerAdapter; + mockPostMessage: jest.Mock; +} { + const event = createMockEvent(mockServer.baseUrl, { + payload: { event_type: EventType.StartExtractingData }, + }); + const initialState: AdapterState = { + attachments: { completed: false }, + lastSyncStarted: '', + lastSuccessfulSyncStarted: '', + snapInVersionId: '', + toDevRev: { + attachmentsMetadata: { + artifactIds: [], + lastProcessed: 0, + lastProcessedAttachmentsIdsList: [], + }, + }, + }; + const adapterState = new State({ event, initialState }); + const adapter = new WorkerAdapter({ event, adapterState }); + + const workerThreads = require('node:worker_threads'); + const mockPostMessage = jest.fn(); + if (workerThreads.parentPort) { + jest + .spyOn(workerThreads.parentPort, 'postMessage') + .mockImplementation(mockPostMessage); + } else { + workerThreads.parentPort = { postMessage: mockPostMessage }; + } + + return { adapter, mockPostMessage }; +} + +describe(`${WorkerAdapter.name} upload failure (near-integration)`, () => { + let adapter: WorkerAdapter; + let mockPostMessage: jest.Mock; + + beforeEach(() => { + jest.clearAllMocks(); + mockServer.resetRoutes(); + ({ adapter, mockPostMessage } = makeAdapter()); + adapter['adapterState'].postState = jest.fn().mockResolvedValue(undefined); + }); + + afterEach(() => { + jest.restoreAllMocks(); + }); + + it('should throw from repo.push when upload-url returns 400 during batch upload', async () => { + 
mockServer.setRoute({ + path: UPLOAD_URL_PATH, + method: 'GET', + status: 400, + }); + + adapter.initializeRepos([ + { + itemType: 'tasks', + overridenOptions: { batchSize: 10 }, + }, + ]); + + await expect( + adapter.getRepo('tasks')?.push(createItems(20)) + ).rejects.toThrow('artifact upload URL'); + + expect(adapter.getRepo('tasks')?.getItems().length).toBe(20); + }); + + it('should emit DataExtractionError when uploadAllRepos fails', async () => { + const { emit: mockEmit } = require('../../common/control-protocol'); + + mockServer.setRoute({ + path: UPLOAD_URL_PATH, + method: 'GET', + status: 400, + }); + + adapter.initializeRepos([{ itemType: 'tasks' }]); + await adapter.getRepo('tasks')?.push(createItems(5)); + + await adapter.emit(ExtractorEventType.DataExtractionDone); + + expect(mockEmit).toHaveBeenCalledWith( + expect.objectContaining({ + eventType: ExtractorEventType.DataExtractionError, + data: expect.objectContaining({ + error: expect.objectContaining({ + message: expect.stringContaining('artifact upload URL'), + }), + }), + }) + ); + expect(mockPostMessage).toHaveBeenCalledWith( + expect.objectContaining({ + subject: 'emit', + payload: { eventType: ExtractorEventType.DataExtractionError }, + }) + ); + }); + + it('should include partial artifacts when push batch succeeded before uploadAllRepos fails', async () => { + const { emit: mockEmit } = require('../../common/control-protocol'); + + mockServer.setRoute({ + path: UPLOAD_URL_PATH, + method: 'GET', + status: 200, + succeedThenFail: { + successCount: 1, + errorStatus: 400, + }, + }); + + adapter.initializeRepos([ + { + itemType: 'tasks', + overridenOptions: { batchSize: 10 }, + }, + ]); + await adapter.getRepo('tasks')?.push(createItems(15)); + + await adapter.emit(ExtractorEventType.DataExtractionDone); + + expect(mockEmit).toHaveBeenCalledWith( + expect.objectContaining({ + eventType: ExtractorEventType.DataExtractionError, + data: expect.objectContaining({ + artifacts: expect.arrayContaining([ + 
expect.objectContaining({ item_count: 10, item_type: 'tasks' }), + ]), + }), + }) + ); + }); +}); diff --git a/src/repo/repo.test.ts b/src/repo/repo.test.ts index b1d93c7..0eb6ace 100644 --- a/src/repo/repo.test.ts +++ b/src/repo/repo.test.ts @@ -3,6 +3,7 @@ import { createItems, normalizeItem } from '../tests/test-helpers'; import { mockServer } from '../tests/jest.setup'; import { createMockEvent } from '../common/test-utils'; import { EventType } from '../types'; +import { Uploader } from '../uploader/uploader'; import { Repo } from './repo'; jest.mock('../tests/test-helpers', () => ({ @@ -228,4 +229,41 @@ describe(Repo.name, () => { expect(repo.getItems().length).toBe(5); }); }); + + it('should throw when upload fails', async () => { + jest.spyOn(Uploader.prototype, 'upload').mockResolvedValue({ + error: { message: 'upload failed' }, + }); + + await expect(repo.upload(createItems(1))).rejects.toThrow('upload failed'); + }); + + it('should retain items in repo when batch upload fails during push', async () => { + repo = new Repo({ + event: createMockEvent(mockServer.baseUrl, { + payload: { event_type: EventType.ExtractionDataStart }, + }), + itemType: 'test_item_type', + normalize, + onUpload: jest.fn(), + options: { batchSize: 10 }, + }); + + const items = createItems(20); + jest + .spyOn(Uploader.prototype, 'upload') + .mockResolvedValueOnce({ + artifact: { + id: 'artifact-1', + item_type: 'test_item_type', + item_count: 10, + }, + }) + .mockResolvedValueOnce({ + error: { message: 'second batch failed' }, + }); + + await expect(repo.push(items)).rejects.toThrow('second batch failed'); + expect(repo.getItems().length).toBe(10); + }); }); diff --git a/src/repo/repo.ts b/src/repo/repo.ts index d119eea..2079df0 100644 --- a/src/repo/repo.ts +++ b/src/repo/repo.ts @@ -4,7 +4,6 @@ import { SSOR_ATTACHMENT, } from '../common/constants'; import { Item } from '../repo/repo.interfaces'; -import { ErrorRecord } from '../types/common'; import { Uploader } from 
'../uploader/uploader'; import { Artifact } from '../uploader/uploader.interfaces'; @@ -15,7 +14,6 @@ import { NormalizedItem, RepoFactoryInterface, } from './repo.interfaces'; - export class Repo { readonly itemType: string; private items: (NormalizedItem | NormalizedAttachment | Item)[]; @@ -47,7 +45,7 @@ export class Repo { async upload( batch?: (NormalizedItem | NormalizedAttachment | Item)[] - ): Promise { + ): Promise { const itemsToUpload = batch || this.items; if (itemsToUpload.length > 0) { @@ -62,7 +60,10 @@ export class Repo { if (error || !artifact) { console.error('Error while uploading batch', error); - return error; + throw new Error( + error?.message ?? + `Upload failed for item type "${this.itemType}" without artifact.` + ); } this.onUpload(artifact); @@ -111,16 +112,9 @@ export class Repo { // Upload in batches while the number of items exceeds the batch size const batchSize = this.options?.batchSize || ARTIFACT_BATCH_SIZE; while (this.items.length >= batchSize) { - // Slice out a batch of batchSize items to upload - const batch = this.items.splice(0, batchSize); - - try { - // Upload the batch - await this.upload(batch); - } catch (error) { - console.error('Error while uploading batch', error); - return false; - } + const batch = this.items.slice(0, batchSize); + await this.upload(batch); + this.items.splice(0, batchSize); } return true; diff --git a/src/tests/test-helpers.ts b/src/tests/test-helpers.ts index 74ff77a..dcab2c7 100644 --- a/src/tests/test-helpers.ts +++ b/src/tests/test-helpers.ts @@ -179,3 +179,39 @@ export function spyOnPrivateMethod( // eslint-disable-next-line @typescript-eslint/no-explicit-any return jest.spyOn(instance as any, methodName as string); } + +export interface CallbackEventBody { + event_type: string; + event_data?: { + error?: { message: string }; + artifacts?: { id: string; item_type: string; item_count: number }[]; + }; +} + +export function getCallbackEventBodies( + requests: { body?: unknown }[] +): 
CallbackEventBody[] { + return requests.map((req) => req.body as CallbackEventBody); +} + +export function expectNoCallbackWithEventType( + bodies: CallbackEventBody[], + eventType: string +): void { + expect(bodies.some((b) => b.event_type === eventType)).toBe(false); +} + +export function expectLastCallbackError( + bodies: CallbackEventBody[], + expectedEventType: string, + messageSubstring?: string +): void { + expect(bodies.length).toBeGreaterThan(0); + const last = bodies[bodies.length - 1]; + expect(last.event_type).toBe(expectedEventType); + expect(last.event_data?.error?.message).toEqual(expect.any(String)); + expect(last.event_data?.error?.message?.length).toBeGreaterThan(0); + if (messageSubstring) { + expect(last.event_data?.error?.message).toContain(messageSubstring); + } +} diff --git a/src/tests/upload-failure/emit-partial-failure.ts b/src/tests/upload-failure/emit-partial-failure.ts new file mode 100644 index 0000000..15ebc0f --- /dev/null +++ b/src/tests/upload-failure/emit-partial-failure.ts @@ -0,0 +1,34 @@ +import { ExtractorEventType, NormalizedItem, processTask, RepoInterface } from '../../index'; +import { Item } from '../../repo/repo.interfaces'; + +const repos: RepoInterface[] = [ + { + itemType: 'tasks', + overridenOptions: { batchSize: 10 }, + normalize: (task: Item): NormalizedItem => ({ + id: task.id, + created_date: task.created_at, + modified_date: task.updated_at, + data: { name: task.name }, + }), + }, +]; + +processTask({ + task: async ({ adapter }) => { + adapter.initializeRepos(repos); + + const tasks = Array.from({ length: 15 }, (_, i) => ({ + id: `task_${i}`, + name: `Task ${i}`, + created_at: new Date().toISOString(), + updated_at: new Date().toISOString(), + })); + + await adapter.getRepo('tasks')?.push(tasks); + await adapter.emit(ExtractorEventType.DataExtractionDone); + }, + onTimeout: async ({ adapter }) => { + await adapter.emit(ExtractorEventType.DataExtractionProgress); + }, +}); diff --git 
a/src/tests/upload-failure/emit-remainder-failure.ts b/src/tests/upload-failure/emit-remainder-failure.ts new file mode 100644 index 0000000..2d577c1 --- /dev/null +++ b/src/tests/upload-failure/emit-remainder-failure.ts @@ -0,0 +1,33 @@ +import { ExtractorEventType, NormalizedItem, processTask, RepoInterface } from '../../index'; +import { Item } from '../../repo/repo.interfaces'; + +const repos: RepoInterface[] = [ + { + itemType: 'tasks', + normalize: (task: Item): NormalizedItem => ({ + id: task.id, + created_date: task.created_at, + modified_date: task.updated_at, + data: { name: task.name }, + }), + }, +]; + +processTask({ + task: async ({ adapter }) => { + adapter.initializeRepos(repos); + + const tasks = Array.from({ length: 5 }, (_, i) => ({ + id: `task_${i}`, + name: `Task ${i}`, + created_at: new Date().toISOString(), + updated_at: new Date().toISOString(), + })); + + await adapter.getRepo('tasks')?.push(tasks); + await adapter.emit(ExtractorEventType.DataExtractionDone); + }, + onTimeout: async ({ adapter }) => { + await adapter.emit(ExtractorEventType.DataExtractionProgress); + }, +}); diff --git a/src/tests/upload-failure/extraction.ts b/src/tests/upload-failure/extraction.ts new file mode 100644 index 0000000..4889fb6 --- /dev/null +++ b/src/tests/upload-failure/extraction.ts @@ -0,0 +1,24 @@ +import { AirdropEvent, spawn } from '../../index'; + +interface ExtractorState { + [key: string]: unknown; +} + +const initialState = {}; +const initialDomainMapping = {}; + +const run = async (events: AirdropEvent[], workerPath: string) => { + for (const event of events) { + await spawn({ + event, + initialState, + workerPath, + initialDomainMapping, + options: { + isLocalDevelopment: true, + }, + }); + } +}; + +export default run; diff --git a/src/tests/upload-failure/metadata-upload-failure.ts b/src/tests/upload-failure/metadata-upload-failure.ts new file mode 100644 index 0000000..1c00b06 --- /dev/null +++ 
b/src/tests/upload-failure/metadata-upload-failure.ts @@ -0,0 +1,24 @@ +import { ExtractorEventType, processTask } from '../../index'; + +const repos = [ + { + itemType: 'external_domain_metadata', + }, +]; + +processTask({ + task: async ({ adapter }) => { + adapter.initializeRepos(repos); + + await adapter + .getRepo('external_domain_metadata') + ?.push([{ id: 'metadata-1' }]); + + await adapter.emit(ExtractorEventType.MetadataExtractionDone); + }, + onTimeout: async ({ adapter }) => { + await adapter.emit(ExtractorEventType.MetadataExtractionError, { + error: { message: 'Failed to extract metadata. Lambda timeout.' }, + }); + }, +}); diff --git a/src/tests/upload-failure/mock-server.setup.ts b/src/tests/upload-failure/mock-server.setup.ts new file mode 100644 index 0000000..12936fa --- /dev/null +++ b/src/tests/upload-failure/mock-server.setup.ts @@ -0,0 +1,20 @@ +import { MockServer } from '../../mock-server/mock-server'; + +/** + * Dedicated mock server for upload-failure tests so they do not share the global + * jest.setup singleton with other suites running in parallel in the same worker. + * Port 0 assigns a unique port per instance. 
+ */ +export const mockServer = new MockServer(0); + +beforeAll(async () => { + await mockServer.start(); +}); + +afterAll(async () => { + await mockServer.stop(); +}); + +beforeEach(() => { + mockServer.resetRoutes(); +}); diff --git a/src/tests/upload-failure/push-batch-failure.ts b/src/tests/upload-failure/push-batch-failure.ts new file mode 100644 index 0000000..7eddbe2 --- /dev/null +++ b/src/tests/upload-failure/push-batch-failure.ts @@ -0,0 +1,34 @@ +import { ExtractorEventType, NormalizedItem, processTask, RepoInterface } from '../../index'; +import { Item } from '../../repo/repo.interfaces'; + +const repos: RepoInterface[] = [ + { + itemType: 'tasks', + overridenOptions: { batchSize: 10 }, + normalize: (task: Item): NormalizedItem => ({ + id: task.id, + created_date: task.created_at, + modified_date: task.updated_at, + data: { name: task.name }, + }), + }, +]; + +processTask({ + task: async ({ adapter }) => { + adapter.initializeRepos(repos); + + const tasks = Array.from({ length: 20 }, (_, i) => ({ + id: `task_${i}`, + name: `Task ${i}`, + created_at: new Date().toISOString(), + updated_at: new Date().toISOString(), + })); + + await adapter.getRepo('tasks')?.push(tasks); + await adapter.emit(ExtractorEventType.DataExtractionDone); + }, + onTimeout: async ({ adapter }) => { + await adapter.emit(ExtractorEventType.DataExtractionProgress); + }, +}); diff --git a/src/tests/upload-failure/upload-failure.integration.test.ts b/src/tests/upload-failure/upload-failure.integration.test.ts new file mode 100644 index 0000000..490e7c6 --- /dev/null +++ b/src/tests/upload-failure/upload-failure.integration.test.ts @@ -0,0 +1,227 @@ +import { + EventType, + ExtractorEventType, +} from '../../types/extraction'; +import { mockServer } from './mock-server.setup'; +import { createMockEvent } from '../../common/test-utils'; +import { + expectLastCallbackError, + expectNoCallbackWithEventType, + getCallbackEventBodies, +} from '../test-helpers'; + +import run from 
'./extraction';
+
+const UPLOAD_URL_PATH = '/internal/airdrop.artifacts.upload-url';
+const FILE_UPLOAD_PATH = '/file-upload-url';
+const CONFIRM_UPLOAD_PATH = '/internal/airdrop.artifacts.confirm-upload';
+const UPLOAD_URL_ERROR_SNIPPET = 'artifact upload URL';
+
+/** Callback bodies in the shape produced by `getCallbackEventBodies`. */
+type CallbackBodies = ReturnType<typeof getCallbackEventBodies>;
+
+/** Configures the artifact upload-url route to answer GET requests with `status`. */
+function failUploadUrlRoute(status: number): void {
+  mockServer.setRoute({ path: UPLOAD_URL_PATH, method: 'GET', status });
+}
+
+/** Configures `path` to answer POST requests with HTTP 400. */
+function failPostRoute(path: string): void {
+  mockServer.setRoute({ path, method: 'POST', status: 400 });
+}
+
+/**
+ * Runs the worker file `workerName` (located next to this test) for one event
+ * of type `eventType`, then returns the callback bodies recorded by the mock
+ * server.
+ */
+async function runWorker(
+  eventType: EventType,
+  workerName: string
+): Promise<CallbackBodies> {
+  const event = createMockEvent(mockServer.baseUrl, {
+    payload: { event_type: eventType },
+  });
+  await run([event], `${__dirname}/${workerName}`);
+  return getCallbackEventBodies(mockServer.getCallbackRequests());
+}
+
+/** Returns the last entry of `callbacks`. */
+function lastCallback(callbacks: CallbackBodies) {
+  return callbacks[callbacks.length - 1];
+}
+
+/**
+ * Asserts that no DataExtractionDone callback was recorded and that the last
+ * callback is a DataExtractionError matching `errorSnippet`.
+ */
+function expectDataExtractionFailure(
+  callbacks: CallbackBodies,
+  errorSnippet: string
+): void {
+  expectNoCallbackWithEventType(
+    callbacks,
+    ExtractorEventType.DataExtractionDone
+  );
+  expectLastCallbackError(
+    callbacks,
+    ExtractorEventType.DataExtractionError,
+    errorSnippet
+  );
+}
+
+describe('Upload failure integration (fast)', () => {
+  beforeEach(() => {
+    mockServer.resetRoutes();
+  });
+
+  describe('repo.push batch upload failure', () => {
+    const WORKER = 'push-batch-failure';
+
+    it('should emit DataExtractionError when upload-url returns 400 during push', async () => {
+      failUploadUrlRoute(400);
+
+      const callbacks = await runWorker(EventType.StartExtractingData, WORKER);
+
+      expectDataExtractionFailure(callbacks, UPLOAD_URL_ERROR_SNIPPET);
+      expect(
+        mockServer.getRequestCount('GET', UPLOAD_URL_PATH)
+      ).toBeGreaterThanOrEqual(1);
+      expect(lastCallback(callbacks).event_data?.error?.message).toContain(
+        'Error while processing task'
+      );
+    });
+
+    it('should emit DataExtractionError when presigned file upload returns 400 during push', async () => {
+      failPostRoute(FILE_UPLOAD_PATH);
+
+      const callbacks = await runWorker(EventType.StartExtractingData, WORKER);
+
+      expectDataExtractionFailure(callbacks, 'uploading artifact');
+      expect(
+        mockServer.getRequestCount('GET', UPLOAD_URL_PATH)
+      ).toBeGreaterThanOrEqual(1);
+      expect(mockServer.getRequestCount('POST', FILE_UPLOAD_PATH)).toBe(1);
+      expect(lastCallback(callbacks).event_data?.error?.message).toContain(
+        'Error while processing task'
+      );
+    });
+  });
+
+  describe('uploadAllRepos failure at emit', () => {
+    const WORKER = 'emit-remainder-failure';
+
+    it('should emit DataExtractionError when upload-url returns 400 during emit flush', async () => {
+      failUploadUrlRoute(400);
+
+      const callbacks = await runWorker(EventType.StartExtractingData, WORKER);
+
+      expectDataExtractionFailure(callbacks, UPLOAD_URL_ERROR_SNIPPET);
+      expect(lastCallback(callbacks).event_data?.error?.message).not.toContain(
+        'Worker exited without emitting'
+      );
+    });
+
+    it('should emit error when presigned file upload returns 400', async () => {
+      failPostRoute(FILE_UPLOAD_PATH);
+
+      const callbacks = await runWorker(EventType.StartExtractingData, WORKER);
+
+      expectDataExtractionFailure(callbacks, 'uploading artifact');
+    });
+
+    it('should emit error when confirm-upload returns 400', async () => {
+      failPostRoute(CONFIRM_UPLOAD_PATH);
+
+      const callbacks = await runWorker(EventType.StartExtractingData, WORKER);
+
+      expectDataExtractionFailure(callbacks, 'confirming artifact upload');
+    });
+  });
+
+  describe('partial upload then uploadAllRepos failure', () => {
+    it('should include one artifact when first batch succeeds and remainder upload fails', async () => {
+      mockServer.setRoute({
+        path: UPLOAD_URL_PATH,
+        method: 'GET',
+        status: 200,
+        succeedThenFail: { successCount: 1, errorStatus: 400 },
+      });
+
+      const callbacks = await runWorker(
+        EventType.StartExtractingData,
+        'emit-partial-failure'
+      );
+
+      expectDataExtractionFailure(callbacks, UPLOAD_URL_ERROR_SNIPPET);
+      const artifacts = lastCallback(callbacks).event_data?.artifacts;
+      expect(artifacts).toHaveLength(1);
+      expect(artifacts?.[0].item_count).toBe(10);
+    });
+  });
+
+  describe('metadata extraction phase', () => {
+    it('should emit MetadataExtractionError when upload-url returns 400', async () => {
+      failUploadUrlRoute(400);
+
+      const callbacks = await runWorker(
+        EventType.StartExtractingMetadata,
+        'metadata-upload-failure'
+      );
+
+      expectNoCallbackWithEventType(
+        callbacks,
+        ExtractorEventType.MetadataExtractionDone
+      );
+      expectLastCallbackError(
+        callbacks,
+        ExtractorEventType.MetadataExtractionError,
+        UPLOAD_URL_ERROR_SNIPPET
+      );
+    });
+  });
+});
diff --git
a/src/tests/upload-failure/upload-failure.slow.test.ts b/src/tests/upload-failure/upload-failure.slow.test.ts
new file mode 100644
index 0000000..a308b8a
--- /dev/null
+++ b/src/tests/upload-failure/upload-failure.slow.test.ts
@@ -0,0 +1,80 @@
+/**
+ * Upload-failure retry tests (spawn integration).
+ *
+ * Uses a dedicated mock server (./mock-server.setup) for isolation when the slow
+ * project runs test files in parallel. These tests use the production retry
+ * count and are intentionally slower than the fast upload-failure suite.
+ */
+import { EventType, ExtractorEventType } from '../../types/extraction';
+import { createMockEvent } from '../../common/test-utils';
+import {
+  expectLastCallbackError,
+  expectNoCallbackWithEventType,
+  getCallbackEventBodies,
+} from '../test-helpers';
+import { mockServer } from './mock-server.setup';
+
+import run from './extraction';
+
+jest.setTimeout(180000);
+
+const UPLOAD_URL_PATH = '/internal/airdrop.artifacts.upload-url';
+const UPLOAD_URL_ERROR_SNIPPET = 'artifact upload URL';
+const TEST_HTTP_RETRIES = 5;
+/** Expected GET count per test; presumably the first attempt plus each retry. */
+const EXPECTED_UPLOAD_URL_REQUESTS = TEST_HTTP_RETRIES + 1;
+
+/** Configures the upload-url route to answer GET requests with HTTP 503. */
+function failUploadUrlPermanently(): void {
+  mockServer.setRoute({ path: UPLOAD_URL_PATH, method: 'GET', status: 503 });
+}
+
+/**
+ * Runs the worker file `workerName` (located next to this test) for one
+ * StartExtractingData event and returns the recorded callback bodies.
+ */
+async function runDataExtraction(workerName: string) {
+  const event = createMockEvent(mockServer.baseUrl, {
+    payload: { event_type: EventType.StartExtractingData },
+  });
+  await run([event], `${__dirname}/${workerName}`);
+  return getCallbackEventBodies(mockServer.getCallbackRequests());
+}
+
+describe('Upload failure integration (retry exhaustion)', () => {
+  it('should emit DataExtractionError after upload-url retries are exhausted during push', async () => {
+    failUploadUrlPermanently();
+
+    const callbacks = await runDataExtraction('push-batch-failure');
+
+    expectNoCallbackWithEventType(callbacks, ExtractorEventType.DataExtractionDone);
+    expectLastCallbackError(
+      callbacks,
+      ExtractorEventType.DataExtractionError,
+      UPLOAD_URL_ERROR_SNIPPET
+    );
+    expect(mockServer.getRequestCount('GET', UPLOAD_URL_PATH)).toBe(
+      EXPECTED_UPLOAD_URL_REQUESTS
+    );
+  });
+
+  it('should emit DataExtractionError after upload-url retries are exhausted during emit flush', async () => {
+    failUploadUrlPermanently();
+
+    const callbacks = await runDataExtraction('emit-remainder-failure');
+    const lastCallback = callbacks[callbacks.length - 1];
+
+    expectNoCallbackWithEventType(callbacks, ExtractorEventType.DataExtractionDone);
+    expectLastCallbackError(
+      callbacks,
+      ExtractorEventType.DataExtractionError,
+      UPLOAD_URL_ERROR_SNIPPET
+    );
+    expect(lastCallback.event_data?.error?.message).not.toContain(
+      'Worker exited without emitting'
+    );
+    expect(mockServer.getRequestCount('GET', UPLOAD_URL_PATH)).toBe(
+      EXPECTED_UPLOAD_URL_REQUESTS
+    );
+  });
+});