From 8e3bfc499e4f7d9dc4fd2d8af9b483b63c7c62a7 Mon Sep 17 00:00:00 2001 From: ferhat elmas Date: Wed, 22 Apr 2026 11:34:26 +0200 Subject: [PATCH 1/2] feat: add sb-request-id logging Also propagate the existing `reqId` where missed. Signed-off-by: ferhat elmas --- src/admin-app.ts | 1 + src/app.ts | 1 + src/http/plugins/db.ts | 6 + src/http/plugins/index.ts | 1 + src/http/plugins/log-request.test.ts | 50 +++- src/http/plugins/log-request.ts | 31 ++- src/http/plugins/request-context.test.ts | 37 +++ src/http/plugins/request-context.ts | 18 ++ src/http/plugins/storage.ts | 1 + src/http/plugins/tracing.ts | 6 +- src/http/routes/admin/jwks.ts | 10 +- src/http/routes/admin/migrations.ts | 64 +++-- src/http/routes/admin/objects.ts | 17 +- src/http/routes/admin/queue.test.ts | 121 +++++++++ src/http/routes/admin/queue.ts | 8 +- src/http/routes/object/copyObject.ts | 1 + src/http/routes/object/moveObject.ts | 2 +- src/http/routes/tus/index.test.ts | 66 +++++ src/http/routes/tus/index.ts | 22 +- src/http/routes/tus/lifecycle.test.ts | 125 +++++++++ src/http/routes/tus/lifecycle.ts | 49 ++-- src/internal/auth/jwks/generator.ts | 12 +- .../database/migrations/migrate.test.ts | 243 ++++++++++++++++++ src/internal/database/migrations/migrate.ts | 129 ++++++---- .../database/migrations/progressive.test.ts | 42 ++- src/internal/monitoring/index.ts | 1 + src/internal/monitoring/logflare.test.ts | 75 ++++++ src/internal/monitoring/logflare.ts | 8 +- src/internal/monitoring/logger.test.ts | 78 ++++++ src/internal/monitoring/logger.ts | 5 + .../monitoring/request-context.test.ts | 45 ++++ src/internal/monitoring/request-context.ts | 41 +++ src/internal/queue/constants.ts | 5 + src/internal/queue/event.ts | 76 +++--- src/internal/queue/index.ts | 1 + src/internal/queue/queue.test.ts | 109 ++++++++ src/internal/queue/queue.ts | 27 +- src/start/server.ts | 8 +- src/storage/database/adapter.ts | 2 + src/storage/database/knex.ts | 2 + src/storage/events/base-event.ts | 20 +- .../events/jwks/jwks-create-signing-secret.ts | 4 +- .../jwks/jwks-roll-url-signing-key.test.ts | 85 ++++++ .../events/jwks/jwks-roll-url-signing-key.ts | 4 +- .../events/lifecycle/bucket-deleted.ts | 1 + .../events/lifecycle/object-removed.ts | 5 +- .../events/lifecycle/object-updated.ts | 2 +- src/storage/events/lifecycle/webhook.ts | 7 +- .../migrations/reset-migrations.test.ts | 106 ++++++++ .../events/migrations/reset-migrations.ts | 5 + .../events/migrations/run-migrations.test.ts | 20 ++ .../events/migrations/run-migrations.ts | 6 + src/storage/events/objects/backup-object.ts | 5 +- .../objects/object-admin-delete-all-before.ts | 8 +- .../events/objects/object-admin-delete.ts | 4 +- src/storage/events/pgboss/move-jobs.test.ts | 107 ++++++++ src/storage/events/pgboss/move-jobs.ts | 4 + src/storage/events/pgboss/upgrade-v10.test.ts | 87 +++++++ src/storage/events/pgboss/upgrade-v10.ts | 3 + src/storage/object.ts | 17 +- src/storage/protocols/s3/s3-handler.ts | 1 + src/storage/scanner/scanner.ts | 1 + src/storage/storage.ts | 6 +- src/storage/uploader.ts | 8 +- src/test/admin-migrations.test.ts | 40 ++- src/test/tenant-jwks.test.ts | 3 +- src/test/webhooks.test.ts | 31 +++ 67 files changed, 1926 insertions(+), 210 deletions(-) create mode 100644 src/http/plugins/request-context.test.ts create mode 100644 src/http/plugins/request-context.ts create mode 100644 src/http/routes/admin/queue.test.ts create mode 100644 src/http/routes/tus/index.test.ts create mode 100644 src/http/routes/tus/lifecycle.test.ts create mode 100644 src/internal/database/migrations/migrate.test.ts create mode 100644 src/internal/monitoring/logflare.test.ts create mode 100644 src/internal/monitoring/request-context.test.ts create mode 100644 src/internal/monitoring/request-context.ts create mode 100644 src/internal/queue/constants.ts create mode 100644 src/internal/queue/queue.test.ts create mode 100644 src/storage/events/jwks/jwks-roll-url-signing-key.test.ts create mode 100644 src/storage/events/migrations/reset-migrations.test.ts create mode 100644 src/storage/events/pgboss/move-jobs.test.ts create mode 100644 src/storage/events/pgboss/upgrade-v10.test.ts diff --git a/src/admin-app.ts b/src/admin-app.ts index ebc1c3b77..f16dc47f5 100644 --- a/src/admin-app.ts +++ b/src/admin-app.ts @@ -45,6 +45,7 @@ const build = (opts: buildOpts = {}): FastifyInstance => { }) } + app.register(plugins.requestContext) app.register(plugins.signals) app.register(plugins.adminTenantId) app.register(plugins.logRequest({ excludeUrls: ['/status', '/metrics', '/health', '/version'] })) diff --git a/src/app.ts b/src/app.ts index b2275f0d2..f767f1c08 100644 --- a/src/app.ts +++ b/src/app.ts @@ -64,6 +64,7 @@ const build = (opts: buildOpts = {}): FastifyInstance => { app.addSchema(schemas.authSchema) app.addSchema(schemas.errorSchema) + app.register(plugins.requestContext) app.register(plugins.signals) app.register(plugins.tenantId) app.register( diff --git a/src/http/plugins/db.ts b/src/http/plugins/db.ts index 2433255c1..49145388b 100644 --- a/src/http/plugins/db.ts +++ b/src/http/plugins/db.ts @@ -68,6 +68,7 @@ export const db = fastifyPlugin( logSchema.error(request.log, 'Error disposing db connection', { type: 'db-connection', error: e, + sbReqId: request.sbReqId, }) }) } @@ -80,6 +81,7 @@ export const db = fastifyPlugin( logSchema.error(request.log, 'Error disposing db connection', { type: 'db-connection', error: e, + sbReqId: request.sbReqId, }) }) } @@ -91,6 +93,7 @@ export const db = fastifyPlugin( logSchema.error(request.log, 'Error disposing db connection', { type: 'db-connection', error: e, + sbReqId: request.sbReqId, }) }) } @@ -137,6 +140,7 @@ export const dbSuperUser = fastifyPlugin( logSchema.error(request.log, 'Error disposing db connection', { type: 'db-connection', error: e, + sbReqId: request.sbReqId, }) }) } @@ -150,6 +154,7 @@ export const dbSuperUser = fastifyPlugin( logSchema.error(request.log, 'Error disposing db connection', { type: 'db-connection', error: e, + sbReqId: request.sbReqId, }) }) } @@ -161,6 +166,7 @@ export const dbSuperUser = fastifyPlugin( logSchema.error(request.log, 'Error disposing db connection', { type: 'db-connection', error: e, + sbReqId: request.sbReqId, }) }) } diff --git a/src/http/plugins/index.ts b/src/http/plugins/index.ts index 436b2ece7..4e6c2f1dd 100644 --- a/src/http/plugins/index.ts +++ b/src/http/plugins/index.ts @@ -6,6 +6,7 @@ export * from './iceberg' export * from './jwt' export * from './log-request' export * from './metrics' +export * from './request-context' export * from './signals' export * from './signature-v4' export * from './storage' diff --git a/src/http/plugins/log-request.test.ts b/src/http/plugins/log-request.test.ts index be4849add..662d41d39 100644 --- a/src/http/plugins/log-request.test.ts +++ b/src/http/plugins/log-request.test.ts @@ -1,12 +1,38 @@ -import Fastify, { FastifyInstance } from 'fastify' +import { Writable } from 'node:stream' +import Fastify from 'fastify' +import pino from 'pino' import { logRequest } from './log-request' +import { requestContext } from './request-context' + +function createApp(lines: string[]) { + return Fastify({ + disableRequestLogging: true, + loggerInstance: pino( + { level: 'info' }, + new Writable({ + write(chunk, _encoding, callback) { + lines.push(chunk.toString()) + callback() + }, + }) + ), + }) +} describe('log-request plugin', () => { - let app: FastifyInstance + let app: ReturnType + let lines: string[] beforeEach(async () => { - app = Fastify() + lines = [] + app = createApp(lines) + + await app.register(requestContext) await app.register(logRequest({})) + + app.get('/request-log', async () => { + return { ok: true } + }) }) afterEach(async () => { @@ -56,4 +82,22 @@ describe('log-request plugin', () => { resources: ['/bucket/demo', '/object/demo'], }) }) + + it('threads sbReqId into the request log data', async () => { + const response = await app.inject({ + method: 'GET', + url: '/request-log', + headers: { + 'sb-request-id': 'sb-req-123', + }, + }) + + expect(response.statusCode).toBe(200) + + const requestLogLine = lines.find((line) => line.includes('"type":"request"')) + + expect(requestLogLine).toBeDefined() + expect(requestLogLine).toContain('"sbReqId":"sb-req-123"') + expect(requestLogLine).not.toContain('"request_id"') + }) }) diff --git a/src/http/plugins/log-request.ts b/src/http/plugins/log-request.ts index 56e8e7a69..7f77445d5 100644 --- a/src/http/plugins/log-request.ts +++ b/src/http/plugins/log-request.ts @@ -1,12 +1,22 @@ -import { logger, logSchema, redactQueryParamFromRequest } from '@internal/monitoring' -import { FastifyReply } from 'fastify/types/reply' -import { FastifyRequest } from 'fastify/types/request' +import { logSchema, redactQueryParamFromRequest } from '@internal/monitoring' +import type { FastifyReply, FastifyRequest } from 'fastify' import fastifyPlugin from 'fastify-plugin' interface RequestLoggerOptions { excludeUrls?: string[] } +type BivariantHandler = { + bivarianceHack(...args: Args): Return +}['bivarianceHack'] + +declare module 'http' { + interface IncomingMessage { + executionError?: Error + resources?: string[] + } +} + declare module 'fastify' { interface FastifyRequest { executionError?: Error @@ -18,8 +28,8 @@ declare module 'fastify' { interface FastifyContextConfig { operation?: { type: string } - resources?: (req: FastifyRequest) => string[] - logMetadata?: (req: FastifyRequest) => Record + resources?: BivariantHandler<[req: FastifyRequest], string[]> + logMetadata?: BivariantHandler<[req: FastifyRequest], Record> } } @@ -64,7 +74,7 @@ export const logRequest = (options: RequestLoggerOptions) => } if (resources === undefined) { - resources = (req.raw as any).resources + resources = req.raw.resources } if (resources === undefined) { @@ -150,7 +160,7 @@ function doRequestLog(req: FastifyRequest, options: LogRequestOptions) { const rId = req.id const cIP = req.ip const statusCode = options.statusCode - const error = (req.raw as any).executionError || req.executionError + const error = req.raw.executionError || req.executionError const tenantId = req.tenantId let reqMetadata: Record = {} @@ -166,16 +176,18 @@ function doRequestLog(req: FastifyRequest, options: LogRequestOptions) { } } catch (e) { // do nothing - logSchema.warning(logger, 'Failed to serialize log metadata', { + logSchema.warning(req.log, 'Failed to serialize log metadata', { type: 'otel', error: e, + sbReqId: req.sbReqId, }) } } } catch (e) { - logSchema.error(logger, 'Failed to get log metadata', { + logSchema.error(req.log, 'Failed to get log metadata', { type: 'request', error: e, + sbReqId: req.sbReqId, }) } } @@ -195,5 +207,6 @@ function doRequestLog(req: FastifyRequest, options: LogRequestOptions) { resources: req.resources, operation: req.operation?.type ?? req.routeOptions.config.operation?.type, serverTimes: req.serverTimings, + sbReqId: req.sbReqId, }) } diff --git a/src/http/plugins/request-context.test.ts b/src/http/plugins/request-context.test.ts new file mode 100644 index 000000000..c075aa97c --- /dev/null +++ b/src/http/plugins/request-context.test.ts @@ -0,0 +1,37 @@ +import Fastify, { FastifyInstance } from 'fastify' +import { requestContext } from './request-context' + +describe('request-context plugin', () => { + let app: FastifyInstance + + beforeEach(async () => { + app = Fastify() + await app.register(requestContext) + }) + + afterEach(async () => { + await app.close() + }) + + it('extracts sb-request-id from the header onto the request', async () => { + app.get('/context', async (request) => ({ sbReqId: request.sbReqId })) + + const response = await app.inject({ + method: 'GET', + url: '/context', + headers: { 'sb-request-id': 'sb-req-123' }, + }) + + expect(response.statusCode).toBe(200) + expect(response.json()).toEqual({ sbReqId: 'sb-req-123' }) + }) + + it('leaves sbReqId undefined when the header is absent', async () => { + app.get('/context', async (request) => ({ sbReqId: request.sbReqId ?? null })) + + const response = await app.inject({ method: 'GET', url: '/context' }) + + expect(response.statusCode).toBe(200) + expect(response.json()).toEqual({ sbReqId: null }) + }) +}) diff --git a/src/http/plugins/request-context.ts b/src/http/plugins/request-context.ts new file mode 100644 index 000000000..956fd0690 --- /dev/null +++ b/src/http/plugins/request-context.ts @@ -0,0 +1,18 @@ +import { getSbReqId } from '@internal/monitoring' +import fastifyPlugin from 'fastify-plugin' + +declare module 'fastify' { + interface FastifyRequest { + sbReqId?: string + } +} + +export const requestContext = fastifyPlugin( + async (fastify) => { + fastify.decorateRequest('sbReqId', undefined) + fastify.addHook('onRequest', async (request) => { + request.sbReqId = getSbReqId(request.headers) + }) + }, + { name: 'request-context' } +) diff --git a/src/http/plugins/storage.ts b/src/http/plugins/storage.ts index 18f4d5967..793d1d30f 100644 --- a/src/http/plugins/storage.ts +++ b/src/http/plugins/storage.ts @@ -26,6 +26,7 @@ export const storage = fastifyPlugin( tenantId: request.tenantId, host: request.headers['x-forwarded-host'] as string, reqId: request.id, + sbReqId: request.sbReqId, latestMigration: request.latestMigration, }) diff --git a/src/http/plugins/tracing.ts b/src/http/plugins/tracing.ts index e44b4c5bb..fd291fa3f 100644 --- a/src/http/plugins/tracing.ts +++ b/src/http/plugins/tracing.ts @@ -40,7 +40,11 @@ export const tracing = fastifyPlugin( } } } catch (e) { - logSchema.error(request.log, 'failed setting tracing mode', { error: e, type: 'tracing' }) + logSchema.error(request.log, 'failed setting tracing mode', { + error: e, + type: 'tracing', + sbReqId: request.sbReqId, + }) } }) }, diff --git a/src/http/routes/admin/jwks.ts b/src/http/routes/admin/jwks.ts index 414321bc6..8ccdefabd 100644 --- a/src/http/routes/admin/jwks.ts +++ b/src/http/routes/admin/jwks.ts @@ -144,7 +144,9 @@ export default async function routes(fastify: FastifyInstance) { tenantId, tenant: { ref: tenantId, + host: '', }, + sbReqId: request.sbReqId, }) return reply.send({ started: true }) @@ -162,12 +164,14 @@ export default async function routes(fastify: FastifyInstance) { .send(`Generate missing jwks is already running, and has sent ${sent} items so far`) } - UrlSigningJwkGenerator.generateUrlSigningJwksOnAllTenants( - request.signals.disconnect.signal - ).catch((e) => { + UrlSigningJwkGenerator.generateUrlSigningJwksOnAllTenants({ + sbReqId: request.sbReqId, + signal: request.signals.disconnect.signal, + }).catch((e) => { logSchema.error(request.log, 'Error generating url signing jwks for all tenants', { type: 'jwk-generator', error: e, + sbReqId: request.sbReqId, }) }) return reply.send({ started: true }) diff --git a/src/http/routes/admin/migrations.ts b/src/http/routes/admin/migrations.ts index de9bb63f8..dced0d1ee 100644 --- a/src/http/routes/admin/migrations.ts +++ b/src/http/routes/admin/migrations.ts @@ -6,13 +6,19 @@ import { } from '@internal/database/migrations' import { PG_BOSS_SCHEMA, Queue } from '@internal/queue' import { RunMigrationsOnTenants } from '@storage/events' -import { FastifyInstance } from 'fastify' +import { FastifyInstance, RequestGenericInterface } from 'fastify' import { getConfig } from '../../../config' import apiKey from '../../plugins/apikey' const { pgQueueEnable } = getConfig() const migrationQueueName = RunMigrationsOnTenants.getQueueName() +interface FailedMigrationsRequest extends RequestGenericInterface { + Querystring: { + cursor?: string + } +} + export default async function routes(fastify: FastifyInstance) { fastify.register(apiKey) @@ -21,7 +27,10 @@ export default async function routes(fastify: FastifyInstance) { return reply.status(400).send({ message: 'Queue is not enabled' }) } - await runMigrationsOnAllTenants(req.signals.disconnect.signal) + await runMigrationsOnAllTenants({ + signal: req.signals.disconnect.signal, + sbReqId: req.sbReqId, + }) return reply.send({ message: 'Migrations scheduled' }) }) @@ -50,6 +59,7 @@ export default async function routes(fastify: FastifyInstance) { ? markCompletedTillMigration : undefined, signal: req.signals.disconnect.signal, + sbReqId: req.sbReqId, }) return reply.send({ message: 'Migrations scheduled' }) @@ -92,23 +102,37 @@ export default async function routes(fastify: FastifyInstance) { return { remaining: queueSize } }) - fastify.get('/failed', { schema: { tags: ['migration'] } }, async (req, reply) => { - if (!pgQueueEnable) { - return reply.code(400).send({ message: 'Queue is not enabled' }) + fastify.get( + '/failed', + { schema: { tags: ['migration'] } }, + async (req, reply) => { + if (!pgQueueEnable) { + return reply.code(400).send({ message: 'Queue is not enabled' }) + } + let offset = 0 + + if (req.query.cursor !== undefined) { + const parsedCursor = Number(req.query.cursor) + + if (!Number.isFinite(parsedCursor) || !Number.isInteger(parsedCursor) || parsedCursor < 0) { + return reply.code(400).send({ message: 'Invalid cursor' }) + } + + offset = parsedCursor + } + + const failed = await multitenantKnex + .table('tenants') + .where('migrations_status', 'FAILED') + .where('cursor_id', '>', offset) + .limit(50) + .select('id', 'cursor_id') + .orderBy('cursor_id') + + reply.status(200).send({ + next_cursor_id: failed[failed.length - 1]?.cursor_id || null, + data: failed, + }) } - const offset = (req.query as any).cursor ? Number((req.query as any).cursor) : 0 - - const failed = await multitenantKnex - .table('tenants') - .where('migrations_status', 'FAILED') - .where('cursor_id', '>', offset) - .limit(50) - .select('id', 'cursor_id') - .orderBy('cursor_id') - - reply.status(200).send({ - next_cursor_id: failed[failed.length - 1]?.cursor_id || null, - data: failed, - }) - }) + ) } diff --git a/src/http/routes/admin/objects.ts b/src/http/routes/admin/objects.ts index 892152440..310629e16 100644 --- a/src/http/routes/admin/objects.ts +++ b/src/http/routes/admin/objects.ts @@ -1,4 +1,5 @@ import { render } from '@internal/errors' +import { logSchema } from '@internal/monitoring' import { ObjectScanner } from '@storage/scanner/scanner' import { FastifyInstance, RequestGenericInterface } from 'fastify' import { FastifyReply } from 'fastify/types/reply' @@ -121,7 +122,13 @@ export default async function routes(fastify: FastifyInstance) { } } } catch (e) { - req.log.error({ err: e, bucket }, 'list orphaned objects stream failed') + logSchema.error(req.log, 'list orphaned objects stream failed', { + type: 'orphan', + error: e, + project: req.params.tenantId, + metadata: JSON.stringify({ bucket }), + sbReqId: req.sbReqId, + }) writeNdjson(reply, { event: 'error', error: render(e), @@ -181,7 +188,13 @@ export default async function routes(fastify: FastifyInstance) { }) } } catch (e) { - req.log.error({ err: e, bucket }, 'delete orphaned objects stream failed') + logSchema.error(req.log, 'delete orphaned objects stream failed', { + type: 'orphan', + error: e, + project: req.params.tenantId, + metadata: JSON.stringify({ bucket }), + sbReqId: req.sbReqId, + }) writeNdjson(reply, { event: 'error', error: render(e), diff --git a/src/http/routes/admin/queue.test.ts b/src/http/routes/admin/queue.test.ts new file mode 100644 index 000000000..1ee0ae48e --- /dev/null +++ b/src/http/routes/admin/queue.test.ts @@ -0,0 +1,121 @@ +import { SYSTEM_TENANT } from '@internal/queue' +import { vi } from 'vitest' + +const { mockUpgradeSend, mockMoveJobsSend } = vi.hoisted(() => ({ + mockUpgradeSend: vi.fn(), + mockMoveJobsSend: vi.fn(), +})) + +vi.mock('@storage/events', () => ({ + UpgradePgBossV10: { + send: mockUpgradeSend, + }, + MoveJobs: { + send: mockMoveJobsSend, + }, +})) + +describe('admin queue routes', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('passes sbReqId to the pg-boss upgrade job', async () => { + vi.resetModules() + + const { mergeConfig } = await import('../../../config') + mergeConfig({ + pgQueueEnable: true, + adminApiKeys: 'test-admin-key', + }) + + const fastify = (await import('fastify')).default + const { default: routes } = await import('./queue') + + const app = fastify() + app.decorateRequest('sbReqId', undefined) + app.addHook('onRequest', (request, _reply, done) => { + request.sbReqId = + typeof request.headers['sb-request-id'] === 'string' + ? request.headers['sb-request-id'] + : undefined + done() + }) + app.register(routes, { prefix: '/queue' }) + + try { + const response = await app.inject({ + method: 'POST', + url: '/queue/migrate/pgboss-v10', + headers: { + apikey: 'test-admin-key', + 'sb-request-id': 'sb-req-123', + }, + }) + + expect(response.statusCode).toBe(200) + expect(response.json()).toEqual({ message: 'Migration scheduled' }) + expect(mockUpgradeSend).toHaveBeenCalledWith({ + sbReqId: 'sb-req-123', + tenant: SYSTEM_TENANT, + }) + } finally { + await app.close() + } + }) + + it('passes sbReqId to the move jobs task', async () => { + vi.resetModules() + + const { mergeConfig } = await import('../../../config') + mergeConfig({ + pgQueueEnable: true, + adminApiKeys: 'test-admin-key', + }) + + const fastify = (await import('fastify')).default + const { default: routes } = await import('./queue') + + const app = fastify() + app.decorateRequest('sbReqId', undefined) + app.addHook('onRequest', (request, _reply, done) => { + request.sbReqId = + typeof request.headers['sb-request-id'] === 'string' + ? request.headers['sb-request-id'] + : undefined + done() + }) + app.register(routes, { prefix: '/queue' }) + + try { + const response = await app.inject({ + method: 'POST', + url: '/queue/move', + headers: { + apikey: 'test-admin-key', + 'sb-request-id': 'sb-req-123', + }, + payload: { + fromQueue: 'source-queue', + toQueue: 'target-queue', + deleteJobsFromOriginalQueue: true, + }, + }) + + expect(response.statusCode).toBe(200) + expect(response.json()).toEqual({ message: 'Move jobs scheduled' }) + expect(mockMoveJobsSend).toHaveBeenCalledWith({ + fromQueue: 'source-queue', + toQueue: 'target-queue', + deleteJobsFromOriginalQueue: true, + sbReqId: 'sb-req-123', + tenant: { + ref: '', + host: '', + }, + }) + } finally { + await app.close() + } + }) +}) diff --git a/src/http/routes/admin/queue.ts b/src/http/routes/admin/queue.ts index 2af65c583..db3b5cc65 100644 --- a/src/http/routes/admin/queue.ts +++ b/src/http/routes/admin/queue.ts @@ -1,3 +1,4 @@ +import { SYSTEM_TENANT } from '@internal/queue/constants' import { MoveJobs, UpgradePgBossV10 } from '@storage/events' import { FastifyInstance, RequestGenericInterface } from 'fastify' import { FromSchema } from 'json-schema-to-ts' @@ -37,7 +38,10 @@ export default async function routes(fastify: FastifyInstance) { return reply.status(400).send({ message: 'Queue is not enabled' }) } - await UpgradePgBossV10.send({}) + await UpgradePgBossV10.send({ + sbReqId: req.sbReqId, + tenant: SYSTEM_TENANT, + }) return reply.send({ message: 'Migration scheduled' }) }) @@ -58,6 +62,8 @@ export default async function routes(fastify: FastifyInstance) { fromQueue, toQueue, deleteJobsFromOriginalQueue, + sbReqId: req.sbReqId, + tenant: SYSTEM_TENANT, }) return reply.send({ message: 'Move jobs scheduled' }) diff --git a/src/http/routes/object/copyObject.ts b/src/http/routes/object/copyObject.ts index 4366c8ed5..742b37485 100644 --- a/src/http/routes/object/copyObject.ts +++ b/src/http/routes/object/copyObject.ts @@ -74,6 +74,7 @@ export default async function routes(fastify: FastifyInstance) { metadata, copyMetadata: request.body.copyMetadata ?? true, upsert: request.headers['x-upsert'] === 'true', + uploadType: 'standard', }) return response.status(result.httpStatusCode ?? 200).send({ diff --git a/src/http/routes/object/moveObject.ts b/src/http/routes/object/moveObject.ts index 0edcbf6d8..cd61967c1 100644 --- a/src/http/routes/object/moveObject.ts +++ b/src/http/routes/object/moveObject.ts @@ -53,7 +53,7 @@ export default async function routes(fastify: FastifyInstance) { const move = await request.storage .from(bucketId) - .moveObject(sourceKey, destinationBucketId, destinationKey, request.owner) + .moveObject(sourceKey, destinationBucketId, destinationKey, 'standard', request.owner) return response.status(200).send({ message: 'Successfully moved', diff --git a/src/http/routes/tus/index.test.ts b/src/http/routes/tus/index.test.ts new file mode 100644 index 000000000..73cd5601c --- /dev/null +++ b/src/http/routes/tus/index.test.ts @@ -0,0 +1,66 @@ +import type { Server } from '@tus/server' +import Fastify, { FastifyInstance } from 'fastify' +import { requestContext } from '../../plugins/request-context' +import { publicRoutes } from './index' +import type { MultiPartRequest } from './lifecycle' + +describe('public tus route request context', () => { + let app: FastifyInstance + let observedUpload: MultiPartRequest['upload'] | undefined + + beforeEach(async () => { + observedUpload = undefined + + app = Fastify() + app.decorateRequest('tenantId') + app.decorateRequest('owner') + app.decorateRequest('db') + app.decorateRequest('storage') + + await app.register(requestContext) + + app.addHook('onRequest', async (request) => { + request.tenantId = 'tenant-123' + request.owner = 'owner-123' + request.db = { dispose: vi.fn() } as never + request.storage = { + backend: {}, + db: {}, + location: {}, + } as never + }) + + await app.register(publicRoutes, { + tusServer: { + handle: vi.fn(async (rawReq, rawRes) => { + observedUpload = (rawReq as MultiPartRequest).upload + rawRes.statusCode = 204 + rawRes.end() + }), + } as unknown as Server, + }) + }) + + afterEach(async () => { + await app.close() + }) + + it('threads sbReqId onto the public route raw upload context', async () => { + const response = await app.inject({ + method: 'OPTIONS', + url: '/public/object', + headers: { + 'sb-request-id': 'sb-req-123', + 'x-upsert': 'true', + }, + }) + + expect(response.statusCode).toBe(204) + expect(observedUpload).toMatchObject({ + owner: 'owner-123', + tenantId: 'tenant-123', + isUpsert: true, + sbReqId: 'sb-req-123', + }) + }) +}) diff --git a/src/http/routes/tus/index.ts b/src/http/routes/tus/index.ts index 6892d6e14..3024692e5 100644 --- a/src/http/routes/tus/index.ts +++ b/src/http/routes/tus/index.ts @@ -1,6 +1,6 @@ import * as https from 'node:https' import { S3Client } from '@aws-sdk/client-s3' -import { PubSub, TenantConnection } from '@internal/database' +import { PubSub } from '@internal/database' import { ERRORS } from '@internal/errors' import { createAgent } from '@internal/http' import { logSchema } from '@internal/monitoring' @@ -8,10 +8,9 @@ import { NodeHttpHandler } from '@smithy/node-http-handler' import { getFileSizeLimit } from '@storage/limits' import { AlsMemoryKV, FileStore, LockNotifier, PgLocker, UploadId } from '@storage/protocols/tus' import { S3Locker } from '@storage/protocols/tus/s3-locker' -import { Storage } from '@storage/storage' import { S3Store } from '@tus/s3-store' import { DataStore, Server, ServerOptions } from '@tus/server' -import { FastifyBaseLogger, FastifyInstance } from 'fastify' +import { FastifyInstance } from 'fastify' import fastifyPlugin from 'fastify-plugin' import * as http from 'http' import type { ServerRequest as Request } from 'srvx' @@ -21,6 +20,7 @@ import { ROUTE_OPERATIONS } from '../operations' import { generateUrl, getFileIdFromRequest, + type MultiPartRequest, namingFunction, onCreate, onIncomingRequest, @@ -47,18 +47,6 @@ const { storageFilePath, } = getConfig() -type MultiPartRequest = http.IncomingMessage & { - log: FastifyBaseLogger - upload: { - storage: Storage - owner?: string - tenantId: string - db: TenantConnection - isUpsert: boolean - resources?: string[] - } -} - function createTusStore(agent: { httpsAgent: https.Agent; httpAgent: http.Agent }) { if (storageBackendType === 's3') { return new S3Store({ @@ -263,6 +251,7 @@ const authenticatedRoutes = fastifyPlugin( tenantId: req.tenantId, db: req.db, isUpsert: req.headers['x-upsert'] === 'true', + sbReqId: req.sbReqId, } }) @@ -350,7 +339,7 @@ const authenticatedRoutes = fastifyPlugin( } ) -const publicRoutes = fastifyPlugin( +export const publicRoutes = fastifyPlugin( async (fastify: FastifyInstance, options: { tusServer: Server; operation?: string }) => { fastify.register(async (fastify) => { fastify.addContentTypeParser('application/offset+octet-stream', (request, payload, done) => @@ -365,6 +354,7 @@ const publicRoutes = fastifyPlugin( tenantId: req.tenantId, db: req.db, isUpsert: req.headers['x-upsert'] === 'true', + sbReqId: req.sbReqId, } }) diff --git a/src/http/routes/tus/lifecycle.test.ts b/src/http/routes/tus/lifecycle.test.ts new file mode 100644 index 000000000..d1fd78f9d --- /dev/null +++ b/src/http/routes/tus/lifecycle.test.ts @@ -0,0 +1,125 @@ +import { EventEmitter } from 'node:events' +import type { ServerResponse } from 'node:http' +import { logSchema } from '@internal/monitoring' +import { Uploader } from '@storage/uploader' +import type { DataStore } from '@tus/server' +import { type MultiPartRequest, onIncomingRequest } from './lifecycle' + +const uploadId = 'tenant-123/bucket/object.txt/version-123' + +function createRawTusRequest({ + headers = {}, + method = 'POST', + sbReqId = 'sb-req-123', +}: { + headers?: Record + method?: string + sbReqId?: string +} = {}) { + const response = new EventEmitter() + const reqLog = { + error: vi.fn(), + warn: vi.fn(), + } + const dispose = vi.fn().mockResolvedValue(undefined) + + const request = { + headers, + log: reqLog, + method, + upload: { + db: { + dispose, + }, + isUpsert: false, + owner: 'owner-123', + storage: { + backend: {}, + db: {}, + location: {}, + }, + tenantId: 'tenant-123', + sbReqId, + }, + url: '/upload/resumable', + } as unknown as MultiPartRequest + + return { + dispose, + rawReq: { + method, + node: { + req: request, + res: response as unknown as ServerResponse, + }, + } as unknown as Parameters[0], + reqLog, + response, + } +} + +describe('tus lifecycle logging', () => { + afterEach(() => { + vi.restoreAllMocks() + }) + + it('logs db dispose failures with sbReqId through logSchema', async () => { + const error = new Error('dispose failed') + const errorSpy = vi.spyOn(logSchema, 'error').mockImplementation(() => undefined) + const { dispose, rawReq, reqLog, response } = createRawTusRequest({ + method: 'HEAD', + }) + + dispose.mockRejectedValueOnce(error) + + await onIncomingRequest(rawReq, uploadId, {} as DataStore) + + response.emit('finish') + await new Promise((resolve) => setImmediate(resolve)) + + expect(errorSpy).toHaveBeenCalledWith(reqLog, 'Error disposing db connection', { + type: 'db-connection', + error, + sbReqId: 'sb-req-123', + }) + expect(reqLog.error).not.toHaveBeenCalled() + }) + + it('logs upload metadata parse failures with sbReqId through logSchema', async () => { + const warningSpy = vi.spyOn(logSchema, 'warning').mockImplementation(() => undefined) + const { rawReq, reqLog } = createRawTusRequest({ + headers: { + 'upload-metadata': 'contentType invalid', + }, + }) + + await expect(onIncomingRequest(rawReq, uploadId, {} as DataStore)).rejects.toThrow(Error) + + expect(warningSpy).toHaveBeenCalledWith(reqLog, 'Failed to parse upload metadata', { + type: 'tus', + error: expect.any(Error), + sbReqId: 'sb-req-123', + }) + expect(reqLog.warn).not.toHaveBeenCalled() + }) + + it('logs user metadata parse failures with sbReqId through logSchema', async () => { + const warningSpy = vi.spyOn(logSchema, 'warning').mockImplementation(() => undefined) + const canUploadSpy = vi.spyOn(Uploader.prototype, 'canUpload').mockResolvedValue(undefined) + const { rawReq, reqLog } = createRawTusRequest({ + headers: { + 'upload-metadata': 'contentType aW1hZ2UvcG5n,metadata e2ludmFsaWQtanNvbg==', + }, + }) + + await onIncomingRequest(rawReq, uploadId, {} as DataStore) + + expect(canUploadSpy).toHaveBeenCalledOnce() + expect(warningSpy).toHaveBeenCalledWith(reqLog, 'Failed to parse user metadata', { + type: 'tus', + error: expect.any(Error), + sbReqId: 'sb-req-123', + }) + expect(reqLog.warn).not.toHaveBeenCalled() + }) +}) diff --git a/src/http/routes/tus/lifecycle.ts b/src/http/routes/tus/lifecycle.ts index e8c4f497a..1a995af23 100644 --- a/src/http/routes/tus/lifecycle.ts +++ b/src/http/routes/tus/lifecycle.ts @@ -1,12 +1,13 @@ import { TenantConnection } from '@internal/database' import { ERRORS, isRenderableError } from '@internal/errors' +import { logSchema } from '@internal/monitoring' import { UploadId } from '@storage/protocols/tus' import { Storage } from '@storage/storage' import { Uploader, validateMimeType } from '@storage/uploader' import { DataStore, Metadata, Upload } from '@tus/server' import { randomUUID } from 'crypto' +import type { FastifyBaseLogger } from 'fastify' import http from 'http' -import { BaseLogger } from 'pino' import type { ServerRequest as Request } from 'srvx' import { getConfig } from '../../../config' @@ -31,7 +32,8 @@ function getNodeRequest(rawReq: Request): MultiPartRequest { return req } export type MultiPartRequest = http.IncomingMessage & { - log: BaseLogger + executionError?: Error + log: FastifyBaseLogger upload: { storage: Storage db: TenantConnection @@ -39,6 +41,17 @@ export type MultiPartRequest = http.IncomingMessage & { tenantId: string isUpsert: boolean resources?: string[] + sbReqId?: string + } +} + +type TusError = { status_code: number; body: string } + +function getTusError(error: { render(): { statusCode: string; message: string } }): TusError { + const renderedError = error.render() + return { + status_code: parseInt(renderedError.statusCode, 10), + body: renderedError.message, } } @@ -55,7 +68,11 @@ export async function onIncomingRequest(rawReq: Request, id: string, datastore: res.on('finish', () => { req.upload.db.dispose().catch((e) => { - req.log.error({ error: e }, 'Error disposing db connection') + logSchema.error(req.log, 'Error disposing db connection', { + type: 'db-connection', + error: e, + sbReqId: req.upload.sbReqId, + }) }) }) @@ -104,7 +121,11 @@ export async function onIncomingRequest(rawReq: Request, id: string, datastore: contentType = parsedMetadata?.contentType ?? undefined rawMetadata = parsedMetadata?.metadata } catch (e) { - req.log.warn({ error: e }, 'Failed to parse upload metadata') + logSchema.warning(req.log, 'Failed to parse upload metadata', { + type: 'tus', + error: e, + sbReqId: req.upload.sbReqId, + }) throw ERRORS.InvalidParameter('upload-metadata', { error: e as Error, message: 'Invalid Upload-Metadata header', @@ -125,7 +146,11 @@ export async function onIncomingRequest(rawReq: Request, id: string, datastore: try { customMd = JSON.parse(rawMetadata) } catch (e) { - req.log.warn({ error: e }, 'Failed to parse user metadata') + logSchema.warning(req.log, 'Failed to parse user metadata', { + type: 'tus', + error: e, + sbReqId: req.upload.sbReqId, + }) } } @@ -326,15 +351,12 @@ export async function onUploadFinish(rawReq: Request, upload: Upload) { } } catch (e) { if (isRenderableError(e)) { - ;(e as any).status_code = parseInt(e.render().statusCode, 10) - ;(e as any).body = e.render().message + throw Object.assign(e, getTusError(e)) } throw e } } -type TusError = { status_code: number; body: string } - /** * Runs when there is an error on the TUS upload */ @@ -342,15 +364,12 @@ export function onResponseError(rawReq: Request, e: TusError | Error) { const req = getNodeRequest(rawReq) if (e instanceof Error) { - ;(req as any).executionError = e + req.executionError = e } else { - ;(req as any).executionError = ERRORS.TusError(e.body, e.status_code).withMetadata(e) + req.executionError = ERRORS.TusError(e.body, e.status_code).withMetadata(e) } if (isRenderableError(e)) { - return { - status_code: parseInt(e.render().statusCode, 10), - body: e.render().message, - } + return getTusError(e) } } diff --git a/src/internal/auth/jwks/generator.ts b/src/internal/auth/jwks/generator.ts index 81974c1e8..c62615a3e 100644 --- a/src/internal/auth/jwks/generator.ts +++ b/src/internal/auth/jwks/generator.ts @@ -24,7 +24,13 @@ export class UrlSigningJwkGenerator { /** * Generates url signing jwks for all tenants */ - static async generateUrlSigningJwksOnAllTenants(signal: AbortSignal) { + static async generateUrlSigningJwksOnAllTenants({ + signal, + sbReqId, + }: { + signal: AbortSignal + sbReqId?: string + }) { if (!pgQueueEnable || !isMultitenant || UrlSigningJwkGenerator.isRunning) { return } @@ -32,6 +38,7 @@ export class UrlSigningJwkGenerator { UrlSigningJwkGenerator.countSent = 0 logSchema.info(logger, '[Jwks Generator] Generating url signing jwks for all tenants', { type: 'jwk-generator', + sbReqId, }) try { const tenants = jwksManager.listTenantsMissingUrlSigningJwk(signal) @@ -44,6 +51,7 @@ export class UrlSigningJwkGenerator { host: '', ref: tenant, }, + sbReqId, }) }) ) @@ -55,6 +63,7 @@ export class UrlSigningJwkGenerator { `[Jwks Generator] Completed generation of url signing jwks for ${UrlSigningJwkGenerator.countSent} tenants`, { type: 'jwk-generator', + sbReqId, } ) } catch (e) { @@ -64,6 +73,7 @@ export class UrlSigningJwkGenerator { metadata: JSON.stringify({ completed: UrlSigningJwkGenerator.countSent, }), + sbReqId, }) } UrlSigningJwkGenerator.isRunning = false diff --git a/src/internal/database/migrations/migrate.test.ts b/src/internal/database/migrations/migrate.test.ts new file mode 100644 index 000000000..07e09f224 --- /dev/null +++ b/src/internal/database/migrations/migrate.test.ts @@ -0,0 +1,243 @@ +import { vi } from 'vitest' + +const { + mockRunBatchSend, + mockResetBatchSend, + mockInfo, + mockTransaction, + mockTable, + mockLastLocalMigrationName, +} = vi.hoisted(() => ({ + mockRunBatchSend: vi.fn(), + mockResetBatchSend: vi.fn(), + mockInfo: vi.fn(), + mockTransaction: vi.fn(), + mockTable: vi.fn(), + mockLastLocalMigrationName: vi.fn(), +})) + +vi.mock('../../../config', () => ({ + MultitenantMigrationStrategy: { + ON_REQUEST: 'ON_REQUEST', + PROGRESSIVE: 'PROGRESSIVE', + FULL_FLEET: 'FULL_FLEET', + }, + getConfig: () => ({ + isMultitenant: true, + multitenantDatabaseUrl: '', + pgQueueEnable: true, + databaseSSLRootCert: '', + dbMigrationStrategy: 'ON_REQUEST', + dbAnonRole: 'anon', + dbAuthenticatedRole: 'authenticated', + dbSuperUser: 'postgres', + dbServiceRole: 'service_role', + dbInstallRoles: false, + dbRefreshMigrationHashesOnMismatch: false, + dbMigrationFreezeAt: undefined, + icebergShards: 0, + multitenantDatabaseQueryTimeout: 1000, + }), +})) + +vi.mock('@storage/events', () => ({ + RunMigrationsOnTenants: class { + static batchSend = mockRunBatchSend + payload: Record + + constructor(payload: Record) { + this.payload = payload + } + }, + ResetMigrationsOnTenant: class { + static batchSend = mockResetBatchSend + payload: Record + + constructor(payload: Record) { + this.payload = payload + } + }, +})) + +vi.mock('../../monitoring', () => ({ + logger: {}, + logSchema: { + info: mockInfo, + warning: vi.fn(), + error: vi.fn(), + }, +})) + +vi.mock('../multitenant-db', () => ({ + multitenantKnex: { + transaction: mockTransaction, + table: mockTable, + }, +})) + +vi.mock('../tenant', () => ({ + getTenantConfig: vi.fn(), + TenantMigrationStatus: { + COMPLETED: 'COMPLETED', + FAILED: 'FAILED', + FAILED_STALE: 'FAILED_STALE', + }, +})) + +vi.mock('../pool', () => ({ + searchPath: ['storage', 'public'], +})) + +vi.mock('./files', () => ({ + lastLocalMigrationName: mockLastLocalMigrationName, + loadMigrationFilesCached: vi.fn(), + localMigrationFiles: vi.fn(), +})) + +vi.mock('./progressive', () => ({ + ProgressiveMigrations: class { + start() { + return undefined + } + }, +})) + +import { resetMigrationsOnTenants, runMigrationsOnAllTenants } from './migrate' + +function createTenantQueryMock(batches: Array>) { + let batchIndex = 0 + + return vi.fn(() => { + const nestedBuilder = { + where: (arg?: unknown) => { + if (typeof arg === 'function') { + arg(nestedBuilder) + } + return nestedBuilder + }, + whereNotIn: () => nestedBuilder, + orWhere: () => nestedBuilder, + } + + const query = { + select: () => query, + where: (arg?: unknown) => { + if (typeof arg === 'function') { + arg(nestedBuilder) + } + return query + }, + whereIn: () => query, + orderBy: () => query, + limit: vi.fn(async () => batches[batchIndex++] ?? []), + } + + return query + }) +} + +function makeTransaction() { + return { + raw: vi.fn().mockResolvedValue({ + rows: [{ locked: true }], + }), + commit: vi.fn(), + rollback: vi.fn(), + } +} + +describe('migration helper request id propagation', () => { + beforeEach(() => { + vi.clearAllMocks() + mockLastLocalMigrationName.mockResolvedValue('storage-schema') + }) + + it('passes sbReqId into fleet migration jobs and scheduler logs', async () => { + const trx = makeTransaction() + mockTransaction.mockResolvedValue(trx) + mockTable.mockImplementation( + createTenantQueryMock([ + [ + { id: 'tenant-a', cursor_id: 1 }, + { id: 'tenant-b', cursor_id: 2 }, + ], + [], + ]) + ) + + await expect( + runMigrationsOnAllTenants({ signal: new AbortController().signal, sbReqId: 'sb-req-123' }) + ).resolves.toBeUndefined() + + expect(mockRunBatchSend).toHaveBeenCalledTimes(1) + const [[batch]] = mockRunBatchSend.mock.calls + expect((batch[0] as { payload: Record }).payload).toMatchObject({ + tenantId: 'tenant-a', + sbReqId: 'sb-req-123', + tenant: { + host: '', + ref: 'tenant-a', + }, + }) + expect((batch[1] as { payload: Record }).payload).toMatchObject({ + tenantId: 'tenant-b', + sbReqId: 'sb-req-123', + tenant: { + host: '', + ref: 'tenant-b', + }, + }) + expect(mockInfo).toHaveBeenCalledWith( + expect.anything(), + '[Migrations] Instance acquired the lock', + expect.objectContaining({ + type: 'migrations', + sbReqId: 'sb-req-123', + }) + ) + expect(mockInfo).toHaveBeenCalledWith( + expect.anything(), + '[Migrations] Async migrations jobs completed', + expect.objectContaining({ + type: 'migrations', + sbReqId: 'sb-req-123', + }) + ) + }) + + it('passes sbReqId into fleet reset jobs and scheduler logs', async () => { + const trx = makeTransaction() + mockTransaction.mockResolvedValue(trx) + mockTable.mockImplementation(createTenantQueryMock([[{ id: 'tenant-c', cursor_id: 1 }], []])) + + await expect( + resetMigrationsOnTenants({ + till: 'storage-schema', + markCompletedTillMigration: 'create-migrations-table', + signal: new AbortController().signal, + sbReqId: 'sb-req-123', + }) + ).resolves.toBeUndefined() + + expect(mockResetBatchSend).toHaveBeenCalledTimes(1) + const [[batch]] = mockResetBatchSend.mock.calls + expect((batch[0] as { payload: Record }).payload).toMatchObject({ + tenantId: 'tenant-c', + untilMigration: 'storage-schema', + markCompletedTillMigration: 'create-migrations-table', + sbReqId: 'sb-req-123', + tenant: { + host: '', + ref: 'tenant-c', + }, + }) + expect(mockInfo).toHaveBeenCalledWith( + expect.anything(), + '[Migrations] reset migrations jobs scheduled', + expect.objectContaining({ + type: 'migrations', + sbReqId: 'sb-req-123', + }) + ) + }) +}) diff --git a/src/internal/database/migrations/migrate.ts b/src/internal/database/migrations/migrate.ts index b4a3c7d40..d05b5dd94 100644 --- a/src/internal/database/migrations/migrate.ts +++ b/src/internal/database/migrations/migrate.ts @@ -67,7 +67,7 @@ export function startAsyncMigrations(signal: AbortSignal) { progressiveMigrations.start(signal) break case MultitenantMigrationStrategy.FULL_FLEET: - runMigrationsOnAllTenants(signal).catch((e) => { + runMigrationsOnAllTenants({ signal }).catch((e) => { logger.error( { type: 'migrations', @@ -205,7 +205,10 @@ export async function areMigrationsUpToDate(tenantId: string) { ) } -export async function obtainLockOnMultitenantDB(fn: (tnx: Knex.Transaction) => Promise) { +export async function obtainLockOnMultitenantDB( + fn: (tnx: Knex.Transaction) => Promise, + options?: { sbReqId?: string } +) { const trx = await multitenantKnex.transaction() try { const result = await trx.raw( @@ -221,6 +224,7 @@ export async function obtainLockOnMultitenantDB(fn: (tnx: Knex.Transaction) = logSchema.info(logger, '[Migrations] Instance acquired the lock', { type: 'migrations', + sbReqId: options?.sbReqId, }) const fnResult = await fn(trx) @@ -236,67 +240,83 @@ export async function resetMigrationsOnTenants(options: { till: keyof typeof DBMigration markCompletedTillMigration?: keyof typeof DBMigration signal: AbortSignal + sbReqId?: string }) { - await obtainLockOnMultitenantDB(async () => { - logSchema.info(logger, '[Migrations] Listing all tenants', { - type: 'migrations', - }) + await obtainLockOnMultitenantDB( + async () => { + logSchema.info(logger, '[Migrations] Listing all tenants', { + type: 'migrations', + sbReqId: options.sbReqId, + }) - const tenants = listTenantsToResetMigrations(options.till, options.signal) - - for await (const tenantBatch of tenants) { - await ResetMigrationsOnTenant.batchSend( - tenantBatch.map((tenant) => { - return new ResetMigrationsOnTenant({ - tenantId: tenant, - untilMigration: options.till, - markCompletedTillMigration: options.markCompletedTillMigration, - tenant: { - host: '', - ref: tenant, - }, + const tenants = listTenantsToResetMigrations(options.till, options.signal) + + for await (const tenantBatch of tenants) { + await ResetMigrationsOnTenant.batchSend( + tenantBatch.map((tenant) => { + return new ResetMigrationsOnTenant({ + tenantId: tenant, + untilMigration: options.till, + markCompletedTillMigration: options.markCompletedTillMigration, + sbReqId: options.sbReqId, + tenant: { + host: '', + ref: tenant, + }, + }) }) - }) - ) - } + ) + } - logSchema.info(logger, '[Migrations] reset migrations jobs scheduled', { - type: 'migrations', - }) - }) + logSchema.info(logger, '[Migrations] reset migrations jobs scheduled', { + type: 'migrations', + sbReqId: options.sbReqId, + }) + }, + { sbReqId: options.sbReqId } + ) } /** * Runs migrations for all tenants * only one instance at the time is allowed to run */ -export async function runMigrationsOnAllTenants(signal: AbortSignal) { +export async function runMigrationsOnAllTenants(options: { + signal: AbortSignal + sbReqId?: string +}) { if (!pgQueueEnable) { return } - await obtainLockOnMultitenantDB(async () => { - logSchema.info(logger, '[Migrations] Listing all tenants', { - type: 'migrations', - }) - const tenants = listTenantsToMigrate(signal) - for await (const tenantBatch of tenants) { - await RunMigrationsOnTenants.batchSend( - tenantBatch.map((tenant) => { - return new RunMigrationsOnTenants({ - tenantId: tenant, - tenant: { - host: '', - ref: tenant, - }, + await obtainLockOnMultitenantDB( + async () => { + logSchema.info(logger, '[Migrations] Listing all tenants', { + type: 'migrations', + sbReqId: options.sbReqId, + }) + const tenants = listTenantsToMigrate(options.signal) + for await (const tenantBatch of tenants) { + await RunMigrationsOnTenants.batchSend( + tenantBatch.map((tenant) => { + return new RunMigrationsOnTenants({ + tenantId: tenant, + sbReqId: options.sbReqId, + tenant: { + host: '', + ref: tenant, + }, + }) }) - }) - ) - } + ) + } - logSchema.info(logger, '[Migrations] Async migrations jobs completed', { - type: 'migrations', - }) - }) + logSchema.info(logger, '[Migrations] Async migrations jobs completed', { + type: 'migrations', + sbReqId: options.sbReqId, + }) + }, + { sbReqId: options.sbReqId } + ) } /** @@ -682,15 +702,18 @@ function runMigrations({ for (const migration of migrationsToRun) { try { const ignore = migration.sql.includes('-- postgres-migrations ignore') + const runnableMigration = ignore + ? { + ...migration, + sql: 'SELECT 1;', + contents: 'SELECT 1;', + } + : migration - if (ignore) { - ;(migration as any).sql = 'SELECT 1;' - ;(migration as any).contents = 'SELECT 1;' - } const result = await runMigration( migrationTableName, client - )(runMigrationTransformers(migration, transformers)) + )(runMigrationTransformers(runnableMigration, transformers)) completedMigrations.push(result) } catch (e) { throw ERRORS.DatabaseError( diff --git a/src/internal/database/migrations/progressive.test.ts b/src/internal/database/migrations/progressive.test.ts index fe4245f19..5b5338690 100644 --- a/src/internal/database/migrations/progressive.test.ts +++ b/src/internal/database/migrations/progressive.test.ts @@ -182,12 +182,20 @@ describe('ProgressiveMigrations', () => { expect(mockRunMigrationsBatchSend).toHaveBeenCalledTimes(2) expect( - (mockRunMigrationsBatchSend.mock.calls[0][0][0] as { payload: { tenantId: string } }).payload + ( + mockRunMigrationsBatchSend.mock.calls[0][0][0] as unknown as { + payload: { tenantId: string } + } + ).payload ).toMatchObject({ tenantId: 'tenant-a', }) expect( - (mockRunMigrationsBatchSend.mock.calls[1][0][0] as { payload: { tenantId: string } }).payload + ( + mockRunMigrationsBatchSend.mock.calls[1][0][0] as unknown as { + payload: { tenantId: string } + } + ).payload ).toMatchObject({ tenantId: 'tenant-b', }) @@ -218,12 +226,20 @@ describe('ProgressiveMigrations', () => { expect(mockRunMigrationsBatchSend).toHaveBeenCalledTimes(2) expect( - (mockRunMigrationsBatchSend.mock.calls[0][0][0] as { payload: { tenantId: string } }).payload + ( + mockRunMigrationsBatchSend.mock.calls[0][0][0] as unknown as { + payload: { tenantId: string } + } + ).payload ).toMatchObject({ tenantId: 'tenant-a', }) expect( - (mockRunMigrationsBatchSend.mock.calls[1][0][0] as { payload: { tenantId: string } }).payload + ( + mockRunMigrationsBatchSend.mock.calls[1][0][0] as unknown as { + payload: { tenantId: string } + } + ).payload ).toMatchObject({ tenantId: 'tenant-b', }) @@ -257,7 +273,11 @@ describe('ProgressiveMigrations', () => { await expect(migrations.flush(1)).resolves.toBeUndefined() expect(mockRunMigrationsBatchSend).toHaveBeenCalledTimes(1) expect( - (mockRunMigrationsBatchSend.mock.calls[0][0][0] as { payload: { tenantId: string } }).payload + ( + mockRunMigrationsBatchSend.mock.calls[0][0][0] as unknown as { + payload: { tenantId: string } + } + ).payload ).toMatchObject({ tenantId: 'tenant-a', }) @@ -289,7 +309,11 @@ describe('ProgressiveMigrations', () => { expect(mockRunMigrationsBatchSend).toHaveBeenCalledTimes(1) expect(mockRunMigrationsBatchSend.mock.calls[0][0]).toHaveLength(1) expect( - (mockRunMigrationsBatchSend.mock.calls[0][0][0] as { payload: { tenantId: string } }).payload + ( + mockRunMigrationsBatchSend.mock.calls[0][0][0] as unknown as { + payload: { tenantId: string } + } + ).payload ).toMatchObject({ tenantId: 'tenant-a', }) @@ -330,7 +354,11 @@ describe('ProgressiveMigrations', () => { expect(mockRunMigrationsBatchSend).toHaveBeenCalledTimes(1) expect(mockRunMigrationsBatchSend.mock.calls[0][0]).toHaveLength(1) expect( - (mockRunMigrationsBatchSend.mock.calls[0][0][0] as { payload: { tenantId: string } }).payload + ( + mockRunMigrationsBatchSend.mock.calls[0][0][0] as unknown as { + payload: { tenantId: string } + } + ).payload ).toMatchObject({ tenantId: 'tenant-a', }) diff --git a/src/internal/monitoring/index.ts b/src/internal/monitoring/index.ts index 488d888d5..43400de45 100644 --- a/src/internal/monitoring/index.ts +++ b/src/internal/monitoring/index.ts @@ -1 +1,2 @@ export * from './logger' +export * from './request-context' diff --git a/src/internal/monitoring/logflare.test.ts b/src/internal/monitoring/logflare.test.ts new file mode 100644 index 000000000..d057f1762 --- /dev/null +++ b/src/internal/monitoring/logflare.test.ts @@ -0,0 +1,75 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' + +const { defaultPreparePayloadMock } = vi.hoisted(() => ({ + defaultPreparePayloadMock: vi.fn(), +})) + +vi.mock('pino-logflare', () => ({ + defaultPreparePayload: defaultPreparePayloadMock, +})) + +type PayloadMeta = Parameters[1] + +describe('logflare helpers', () => { + beforeEach(() => { + defaultPreparePayloadMock.mockReset() + }) + + afterEach(() => { + vi.restoreAllMocks() + vi.resetModules() + }) + + it('promotes project and sbReqId (as request_id) onto the prepared payload', async () => { + defaultPreparePayloadMock.mockReturnValue({ + log_entry: 'hello', + metadata: { level: 'info' }, + }) + + const { onPreparePayload } = await import('./logflare') + const meta = {} as PayloadMeta + const payload = { + project: 'tenant-a', + sbReqId: 'sb-req-123', + msg: { text: 'hello' }, + } + + expect(onPreparePayload(payload, meta)).toEqual({ + log_entry: 'hello', + metadata: { level: 'info' }, + project: 'tenant-a', + request_id: 'sb-req-123', + }) + expect(defaultPreparePayloadMock).toHaveBeenCalledWith(payload, meta) + }) + + it('leaves project and request_id undefined when they are missing', async () => { + defaultPreparePayloadMock.mockReturnValue({ + log_entry: 'hello', + metadata: {}, + }) + + const { onPreparePayload } = await import('./logflare') + const payload = { + msg: { text: 'hello' }, + } + + expect(onPreparePayload(payload, {} as PayloadMeta)).toEqual({ + log_entry: 'hello', + metadata: {}, + project: undefined, + request_id: undefined, + }) + }) + + it('logs transport errors to console.error', async () => { + const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}) + const { onError } = await import('./logflare') + const err = new Error('boom') + err.stack = 'stack-trace' + + onError({}, err) + + expect(errorSpy).toHaveBeenCalledWith('[Logflare][Error] boom - stack-trace') + }) +}) diff --git a/src/internal/monitoring/logflare.ts b/src/internal/monitoring/logflare.ts index b920b6b25..2225be09c 100644 --- a/src/internal/monitoring/logflare.ts +++ b/src/internal/monitoring/logflare.ts @@ -1,12 +1,14 @@ import { defaultPreparePayload } from 'pino-logflare' -import { PayloadMeta } from 'pino-logflare/dist/httpStream' -export function onPreparePayload(payload: Record, meta: PayloadMeta) { +type PayloadMeta = Parameters[1] + +export function onPreparePayload(payload: Record, meta: PayloadMeta) { const item = defaultPreparePayload(payload, meta) item.project = payload.project + item.request_id = payload.sbReqId return item } -export function onError(_payload: Record, err: Error) { +export function onError(_payload: Record, err: Error) { console.error(`[Logflare][Error] ${err.message} - ${err.stack}`) } diff --git a/src/internal/monitoring/logger.test.ts b/src/internal/monitoring/logger.test.ts index 4780c3826..128f6b5c3 100644 --- a/src/internal/monitoring/logger.test.ts +++ b/src/internal/monitoring/logger.test.ts @@ -67,4 +67,82 @@ describe('logger serializers', () => { statusCode: 503, }) }) + + test('buildTransport wires logflare hooks when logflare is enabled', async () => { + const loggerStub = { + child: vi.fn(), + debug: vi.fn(), + error: vi.fn(), + fatal: vi.fn(), + flush: vi.fn(), + info: vi.fn(), + level: 'info', + silent: vi.fn(), + trace: vi.fn(), + warn: vi.fn(), + } + loggerStub.child.mockReturnValue(loggerStub) + + const pinoMock = Object.assign( + vi.fn(() => loggerStub), + { + stdTimeFunctions: { + isoTime: vi.fn(), + }, + } + ) + + vi.doMock('pino', () => ({ + default: pinoMock, + Logger: class Logger {}, + })) + vi.doMock('../../config', () => ({ + getConfig: vi.fn(() => ({ + logLevel: 'debug', + logflareApiKey: 'api-key', + logflareBatchSize: 25, + logflareEnabled: true, + logflareSourceToken: 'source-token', + region: 'local', + })), + })) + + const { buildTransport } = await import('./logger') + interface LogflareTransportOptions { + apiKey: string + sourceToken: string + batchSize: number + onPreparePayload: { module: string; method: string } + onError: { module: string; method: string } + } + const transport = buildTransport() as { + targets: Array<{ + level: string + target: string + options: LogflareTransportOptions + }> + } + + const logflareTarget = transport.targets.find((target) => target.target === 'pino-logflare') + + expect(logflareTarget).toMatchObject({ + level: 'debug', + target: 'pino-logflare', + options: { + apiKey: 'api-key', + sourceToken: 'source-token', + batchSize: 25, + onPreparePayload: { + method: 'onPreparePayload', + }, + onError: { + method: 'onError', + }, + }, + }) + expect(logflareTarget?.options.onPreparePayload.module).toMatch(/logflare$/) + expect(logflareTarget?.options.onPreparePayload.module).toBe( + logflareTarget?.options.onError.module + ) + }) }) diff --git a/src/internal/monitoring/logger.ts b/src/internal/monitoring/logger.ts index 2ae539602..3655146e0 100644 --- a/src/internal/monitoring/logger.ts +++ b/src/internal/monitoring/logger.ts @@ -80,6 +80,7 @@ export interface RequestLog { operation?: string resources?: string[] serverTimes?: { spanName: string; duration: number }[] + sbReqId?: string } export interface EventLog { @@ -92,6 +93,7 @@ export interface EventLog { project: string resources?: string[] reqId?: string + sbReqId?: string } interface ErrorLog { @@ -99,12 +101,14 @@ interface ErrorLog { error?: Error | unknown project?: string metadata?: string + sbReqId?: string } interface InfoLog { type: string project?: string metadata?: string + sbReqId?: string } export const logSchema = { @@ -202,6 +206,7 @@ const allowlistedHeaders = new Set([ 'transfer-encoding', 'x-kong-proxy-latency', 'x-kong-upstream-latency', + 'sb-request-id', 'sb-gateway-mode', 'sb-gateway-version', 'x-transformations', diff --git a/src/internal/monitoring/request-context.test.ts b/src/internal/monitoring/request-context.test.ts new file mode 100644 index 000000000..69f4568a8 --- /dev/null +++ b/src/internal/monitoring/request-context.test.ts @@ -0,0 +1,45 @@ +import { getSbReqId, getSbReqIdFromPayload, SUPABASE_REQUEST_ID_HEADER } from './request-context' + +describe('request log context helpers', () => { + it('extracts sbReqId values from request headers', () => { + expect( + getSbReqId({ + [SUPABASE_REQUEST_ID_HEADER]: ['sb-req-123', 'sb-req-456'], + }) + ).toBe('sb-req-123') + + expect( + getSbReqId({ + [SUPABASE_REQUEST_ID_HEADER]: [], + }) + ).toBeUndefined() + }) + + it('extracts sbReqId values from queue payloads', () => { + expect( + getSbReqIdFromPayload({ + sbReqId: 'sb-req-123', + reqId: 'trace-123', + }) + ).toBe('sb-req-123') + + expect( + getSbReqIdFromPayload({ + event: { + payload: { + sbReqId: 'sb-req-456', + reqId: 'trace-456', + }, + }, + }) + ).toBe('sb-req-456') + + expect( + getSbReqIdFromPayload({ + reqId: 'trace-only', + }) + ).toBeUndefined() + expect(getSbReqIdFromPayload({ sbReqId: '' })).toBeUndefined() + expect(getSbReqIdFromPayload(undefined)).toBeUndefined() + }) +}) diff --git a/src/internal/monitoring/request-context.ts b/src/internal/monitoring/request-context.ts new file mode 100644 index 000000000..cb5bc8af9 --- /dev/null +++ b/src/internal/monitoring/request-context.ts @@ -0,0 +1,41 @@ +import type { IncomingHttpHeaders } from 'node:http' + +export const SUPABASE_REQUEST_ID_HEADER = 'sb-request-id' + +export function getSbReqId(headers: IncomingHttpHeaders): string | undefined { + const sbReqId = headers[SUPABASE_REQUEST_ID_HEADER] + + return getNonEmptyString(sbReqId) +} + +export function getSbReqIdFromPayload(payload: unknown): string | undefined { + if (!payload || typeof payload !== 'object') { + return undefined + } + + const directSbReqId = getNonEmptyString((payload as { sbReqId?: unknown }).sbReqId) + + if (directSbReqId) { + return directSbReqId + } + + // Webhook payloads wrap the original event under `event.payload` + // every other job carries sbReqId at the top level. + const nestedSbReqId = getNonEmptyString( + (payload as { event?: { payload?: { sbReqId?: unknown } } }).event?.payload?.sbReqId + ) + + return nestedSbReqId +} + +function getNonEmptyString(value: unknown): string | undefined { + if (Array.isArray(value)) { + return getNonEmptyString(value[0]) + } + + if (typeof value !== 'string' || value.length === 0) { + return undefined + } + + return value +} diff --git a/src/internal/queue/constants.ts b/src/internal/queue/constants.ts new file mode 100644 index 000000000..35fe46baf --- /dev/null +++ b/src/internal/queue/constants.ts @@ -0,0 +1,5 @@ +/** + * Placeholder tenant for events that run across the whole fleet + * (e.g. pg-boss maintenance) and have no single tenant context. + */ +export const SYSTEM_TENANT = { ref: '', host: '' } as const diff --git a/src/internal/queue/event.ts b/src/internal/queue/event.ts index 671fd0397..eca236dc5 100644 --- a/src/internal/queue/event.ts +++ b/src/internal/queue/event.ts @@ -13,6 +13,7 @@ export interface BasePayload { singletonKey?: string scheduleAt?: Date reqId?: string + sbReqId?: string tenant: { ref: string host: string @@ -31,20 +32,14 @@ function withPayloadVersion( } } -export type StaticThis> = BaseEventConstructor +export type EventInputPayload = Omit +export type QueueEvent = Event +export type StaticThis = BaseEventConstructor -interface BaseEventConstructor> { +interface BaseEventConstructor { version: string - new (...args: any): Base - - send( - this: StaticThis, - payload: Omit - ): Promise - - eventName(): string - getWorkerOptions(): WorkOptions + new (payload: TPayload): QueueEvent> } /** @@ -77,7 +72,7 @@ export class Event> { return undefined } - static getSendOptions>(payload: T['payload']): SendOptions | undefined { + static getSendOptions(payload: BasePayload): SendOptions | undefined { return undefined } @@ -93,15 +88,20 @@ export class Event> { // no-op } - static batchSend[]>(messages: T) { + static batchSend( + this: StaticThis, + messages: Array<{ payload: TPayload; send(): Promise }> + ) { + const eventClass = this as typeof Event + if (!pgQueueEnable) { - if (this.allowSync) { + if (eventClass.allowSync) { return Promise.all(messages.map((message) => message.send())) } else { logger.warn( { type: 'queue', - eventType: this.eventName(), + eventType: eventClass.eventName(), }, '[Queue] skipped sending batch messages' ) @@ -111,11 +111,9 @@ export class Event> { return Queue.getInstance().insert( messages.map((message) => { - const payloadWithVersion = withPayloadVersion( - message.payload as (typeof message)['payload'], - this.version - ) - const sendOptions = (this.getSendOptions(payloadWithVersion) as PgBoss.JobInsert) || {} + const payloadWithVersion = withPayloadVersion(message.payload, eventClass.version) + const sendOptions = + (eventClass.getSendOptions(payloadWithVersion) as PgBoss.JobInsert) || {} if (payloadWithVersion.scheduleAt) { sendOptions.startAfter = new Date(payloadWithVersion.scheduleAt) @@ -123,48 +121,45 @@ export class Event> { return { ...sendOptions, - name: this.getQueueName(), + name: eventClass.getQueueName(), data: payloadWithVersion, - deadLetter: this.deadLetterQueueName(), + deadLetter: eventClass.deadLetterQueueName(), } }) ) } - static send>( - this: StaticThis, - payload: Omit, + static send( + this: StaticThis, + payload: Omit, opts?: SendOptions & { tnx?: Knex } ) { - const that = new this(withPayloadVersion(payload as T['payload'], this.version)) + const that = new this(withPayloadVersion(payload as TPayload, this.version)) return that.send(opts) } - static invoke>( - this: StaticThis, - payload: Omit + static invoke( + this: StaticThis, + payload: Omit ) { - const that = new this(withPayloadVersion(payload as T['payload'], this.version)) + const that = new this(withPayloadVersion(payload as TPayload, this.version)) return that.invoke() } - static invokeOrSend>( - this: StaticThis, - payload: Omit, + static invokeOrSend( + this: StaticThis, + payload: Omit, options?: SendOptions & { sendWhenError?: (error: unknown) => boolean } ) { - const that = new this(withPayloadVersion(payload as T['payload'], this.version)) + const that = new this(withPayloadVersion(payload as TPayload, this.version)) return that.invokeOrSend(options) } - static handle( - job: Job['payload']> | Job['payload']>[], - opts?: { signal?: AbortSignal } - ) { + static handle(job: Job | Job[], opts?: { signal?: AbortSignal }) { throw new Error('not implemented') } - static async shouldSend(payload: any) { + static async shouldSend(payload: BasePayload) { if (isMultitenant && payload?.tenant?.ref) { // Do not send an event if disabled for this specific tenant const tenant = await getTenantConfig(payload.tenant.ref) @@ -217,11 +212,13 @@ export class Event> { if (sendOptions?.sendWhenError && !sendOptions.sendWhenError(e)) { throw e } + logSchema.error(logger, '[Queue] Error invoking event synchronously, sending to queue', { type: 'queue', project: this.payload.tenant?.ref, error: e, metadata: JSON.stringify(this.payload), + sbReqId: this.payload.sbReqId, }) return this.send(sendOptions) @@ -326,6 +323,7 @@ export class Event> { type: 'queue', error: e, metadata: JSON.stringify(this.payload), + sbReqId: this.payload.sbReqId, } ) diff --git a/src/internal/queue/index.ts b/src/internal/queue/index.ts index 209afebc0..097a4b411 100644 --- a/src/internal/queue/index.ts +++ b/src/internal/queue/index.ts @@ -1,2 +1,3 @@ +export * from './constants' export * from './event' export * from './queue' diff --git a/src/internal/queue/queue.test.ts b/src/internal/queue/queue.test.ts new file mode 100644 index 000000000..4e905437d --- /dev/null +++ b/src/internal/queue/queue.test.ts @@ -0,0 +1,109 @@ +import { vi } from 'vitest' + +type QueueModule = typeof import('./queue') +type MonitoringModule = typeof import('../monitoring') + +async function loadQueueModule() { + vi.resetModules() + + const configModule = await import('../../config') + configModule.getConfig({ reload: true }) + + const queueModule = (await import('./queue')) as QueueModule + const monitoringModule = (await import('../monitoring')) as MonitoringModule + + return { queueModule, monitoringModule } +} + +describe('Queue worker sbReqId propagation', () => { + afterEach(() => { + vi.useRealTimers() + vi.restoreAllMocks() + vi.resetModules() + }) + + it.each([ + ['top-level sbReqId', { sbReqId: 'sb-req-123', reqId: 'trace-123' }, 'sb-req-123'], + [ + 'nested webhook sbReqId', + { + event: { + payload: { + sbReqId: 'sb-req-456', + reqId: 'trace-456', + }, + }, + }, + 'sb-req-456', + ], + [ + 'missing sbReqId', + { + reqId: 'trace-only', + }, + undefined, + ], + ])('propagates %s into queue job handling', async (_label, data, expectedSbReqId) => { + vi.useFakeTimers() + + const { + queueModule: { Queue }, + monitoringModule, + } = await loadQueueModule() + const abortController = new AbortController() + const errorSpy = vi.spyOn(monitoringModule.logSchema, 'error') + let onMessageSbReqId: string | undefined + + ;(Queue as unknown as { pgBoss: unknown }).pgBoss = { + complete: vi.fn().mockResolvedValue(undefined), + fail: vi.fn().mockResolvedValue(undefined), + fetch: vi + .fn() + .mockResolvedValueOnce([ + { + id: 'job-1', + data, + priority: 1, + retryCount: 0, + retryLimit: 1, + }, + ]) + .mockResolvedValueOnce([]), + } + + ;(Queue as unknown as { pollQueue: Function }).pollQueue( + { + getQueueName: () => 'test-queue', + getWorkerOptions: () => ({ + pollingIntervalSeconds: 1, + }), + handle: vi.fn(async () => { + throw new Error('boom') + }), + name: 'TestEvent', + }, + { + concurrentTaskCount: 1, + onMessage: (job: { + data: { + sbReqId?: string + event?: { payload?: { sbReqId?: string } } + } + }) => { + onMessageSbReqId = job.data.sbReqId ?? job.data.event?.payload?.sbReqId + }, + signal: abortController.signal, + } + ) + + await vi.advanceTimersByTimeAsync(1_000) + abortController.abort() + + expect(onMessageSbReqId).toBe(expectedSbReqId) + const processingErrorCall = errorSpy.mock.calls.find( + ([, message]) => message === '[Queue Handler] Error while processing job TestEvent' + ) + expect(processingErrorCall?.[0]).toBe(monitoringModule.logger) + expect((processingErrorCall?.[2] as { sbReqId?: string }).sbReqId).toBe(expectedSbReqId) + }) +}) diff --git a/src/internal/queue/queue.ts b/src/internal/queue/queue.ts index 102fc7009..15c69b277 100644 --- a/src/internal/queue/queue.ts +++ b/src/internal/queue/queue.ts @@ -3,20 +3,25 @@ import { QueueDB } from '@internal/queue/database' import { Semaphore } from '@shopify/semaphore' import PgBoss, { Db, Job, JobWithMetadata } from 'pg-boss' import { getConfig } from '../../config' -import { logger, logSchema } from '../monitoring' +import { getSbReqIdFromPayload, logger, logSchema } from '../monitoring' import { queueJobCompleted, queueJobError, queueJobRetryFailed } from '../monitoring/metrics' import { Event } from './event' -type SubclassOfBaseClass = (new ( - payload: any -) => Event) & { - [K in keyof typeof Event]: (typeof Event)[K] +type RegisteredEvent = { + deadLetterQueueName(): string + getQueueName(): string + getQueueOptions(): ReturnType + getWorkerOptions(): ReturnType + handle(job: Job | Job[], opts?: { signal?: AbortSignal }): unknown + onClose(): unknown + onStart(): unknown + name: string } export const PG_BOSS_SCHEMA = 'pgboss_v10' export abstract class Queue { - protected static events: SubclassOfBaseClass[] = [] + protected static events: RegisteredEvent[] = [] private static pgBoss?: PgBoss private static pgBossDb?: PgBoss.Db @@ -184,7 +189,7 @@ export abstract class Queue { return this.pgBossDb } - static register(event: T) { + static register(event: T) { Queue.events.push(event) } @@ -234,7 +239,7 @@ export abstract class Queue { } protected static async registerTask( - event: SubclassOfBaseClass, + event: RegisteredEvent, maxConcurrentTasks: number, onMessage?: (job: Job) => void, signal?: AbortSignal @@ -276,7 +281,7 @@ export abstract class Queue { } protected static pollQueue( - event: SubclassOfBaseClass, + event: RegisteredEvent, queueOpts: { concurrentTaskCount: number onMessage?: (job: Job) => void @@ -344,6 +349,8 @@ export abstract class Queue { await Promise.allSettled( jobs.map(async (job) => { const lock = await semaphore.acquire() + const sbReqId = getSbReqIdFromPayload(job.data) + try { queueOpts.onMessage?.(job as Job) @@ -379,6 +386,7 @@ export abstract class Queue { type: 'queue-task', error: e, metadata: JSON.stringify(job), + sbReqId, }) } @@ -386,6 +394,7 @@ export abstract class Queue { type: 'queue-task', error: e, metadata: JSON.stringify(job), + sbReqId, }) throw e diff --git a/src/start/server.ts b/src/start/server.ts index 0e5c6e3ad..b0e23f7e9 100644 --- a/src/start/server.ts +++ b/src/start/server.ts @@ -16,7 +16,7 @@ import { startAsyncMigrations, } from '@internal/database/migrations' import { logger, logSchema } from '@internal/monitoring' -import { Queue } from '@internal/queue' +import { Queue, SYSTEM_TENANT } from '@internal/queue' import { KnexShardStoreFactory, ShardCatalog } from '@internal/sharding' import { getGlobal } from '@platformatic/globals' import { registerWorkers } from '@storage/events' @@ -262,5 +262,9 @@ function registerPlatformaticCloseHandler() { } async function upgrades() { - return Promise.all([SyncCatalogIds.invoke({})]) + return Promise.all([ + SyncCatalogIds.invoke({ + tenant: SYSTEM_TENANT, + }), + ]) } diff --git a/src/storage/database/adapter.ts b/src/storage/database/adapter.ts index d90b0beaf..81e31f01e 100644 --- a/src/storage/database/adapter.ts +++ b/src/storage/database/adapter.ts @@ -38,6 +38,7 @@ export interface TransactionOptions { export interface DatabaseOptions { tenantId: string reqId?: string + sbReqId?: string latestMigration?: keyof typeof DBMigration host: string tnx?: TNX @@ -57,6 +58,7 @@ export interface Database { tenantHost: string tenantId: string reqId?: string + sbReqId?: string role?: string connection: TenantConnection diff --git a/src/storage/database/knex.ts b/src/storage/database/knex.ts index 5e50fa510..a684d6566 100644 --- a/src/storage/database/knex.ts +++ b/src/storage/database/knex.ts @@ -48,6 +48,7 @@ export class StorageKnexDB implements Database { public readonly tenantHost: string public readonly tenantId: string public readonly reqId: string | undefined + public readonly sbReqId: string | undefined public readonly role?: string public readonly latestMigration?: keyof typeof DBMigration @@ -58,6 +59,7 @@ export class StorageKnexDB implements Database { this.tenantHost = options.host this.tenantId = options.tenantId this.reqId = options.reqId + this.sbReqId = options.sbReqId this.role = connection?.role this.latestMigration = options.latestMigration } diff --git a/src/storage/events/base-event.ts b/src/storage/events/base-event.ts index f5a6c0464..f45d24240 100644 --- a/src/storage/events/base-event.ts +++ b/src/storage/events/base-event.ts @@ -26,21 +26,24 @@ export abstract class BaseEvent> extends * Sends a message as a webhook * @param payload */ - static async sendWebhook>( - this: StaticThis, - payload: Omit - ) { + static async sendWebhook< + TPayload extends BasePayload & { + bucketId: string + name: string + }, + >(this: StaticThis, payload: Omit) { if (!Webhook) { Webhook = (await import('./lifecycle/webhook')).Webhook } - const eventType = this.eventName() + const eventClass = this as typeof Event + const eventType = eventClass.eventName() try { await Webhook.send({ event: { type: eventType, region, - $version: this.version, + $version: eventClass.version, applyTime: Date.now(), payload, }, @@ -50,9 +53,10 @@ export abstract class BaseEvent> extends logger.error( { error: e, + sbReqId: payload.sbReqId, event: { type: eventType, - $version: this.version, + $version: eventClass.version, applyTime: Date.now(), payload: JSON.stringify(payload), }, @@ -77,6 +81,8 @@ export abstract class BaseEvent> extends const db = new StorageKnexDB(client, { tenantId: payload.tenant.ref, host: payload.tenant.host, + reqId: payload.reqId, + sbReqId: payload.sbReqId, }) return new Storage(this.getOrCreateStorageBackend(), db, new TenantLocation(storageS3Bucket)) diff --git a/src/storage/events/jwks/jwks-create-signing-secret.ts b/src/storage/events/jwks/jwks-create-signing-secret.ts index 900e3b427..7a934af66 100644 --- a/src/storage/events/jwks/jwks-create-signing-secret.ts +++ b/src/storage/events/jwks/jwks-create-signing-secret.ts @@ -35,7 +35,7 @@ export class JwksCreateSigningSecret extends BaseEvent) { - const { tenantId } = job.data + const { tenantId, sbReqId } = job.data try { const { kid } = await jwksManager.generateUrlSigningJwk(tenantId) @@ -46,6 +46,7 @@ export class JwksCreateSigningSecret extends BaseEvent ({ + mockRollUrlSigningJwk: vi.fn(), + mockInfo: vi.fn(), + mockError: vi.fn(), +})) + +vi.mock('@internal/database', () => ({ + jwksManager: { + rollUrlSigningJwk: mockRollUrlSigningJwk, + }, +})) + +vi.mock('@internal/monitoring', () => ({ + logger: {}, + logSchema: { + info: mockInfo, + error: mockError, + warning: vi.fn(), + }, +})) + +vi.mock('../base-event', () => ({ + BaseEvent: class {}, +})) + +import { JwksRollUrlSigningKey } from './jwks-roll-url-signing-key' + +function makeJob(overrides?: Partial>) { + return { + data: { + tenantId: 'tenant-a', + tenant: { + ref: 'tenant-a', + }, + sbReqId: 'sb-req-123', + }, + ...overrides, + } +} + +describe('JwksRollUrlSigningKey.handle', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('logs sbReqId on success', async () => { + mockRollUrlSigningJwk.mockResolvedValue({ + oldKid: 'old-kid', + newKid: 'new-kid', + }) + + await expect(JwksRollUrlSigningKey.handle(makeJob() as never)).resolves.toBeUndefined() + + expect(mockRollUrlSigningJwk).toHaveBeenCalledWith('tenant-a') + expect(mockInfo).toHaveBeenCalledWith( + expect.anything(), + '[Jwks] rolled url signing key for tenant tenant-a (old: old-kid, new: new-kid)', + expect.objectContaining({ + type: 'jwks', + project: 'tenant-a', + sbReqId: 'sb-req-123', + }) + ) + }) + + it('logs sbReqId on failure', async () => { + const error = new Error('boom') + mockRollUrlSigningJwk.mockRejectedValue(error) + + await expect(JwksRollUrlSigningKey.handle(makeJob() as never)).rejects.toThrow(error) + + expect(mockError).toHaveBeenCalledWith( + expect.anything(), + '[Jwks] roll url signing key failed for tenant tenant-a', + expect.objectContaining({ + type: 'jwks', + error, + project: 'tenant-a', + sbReqId: 'sb-req-123', + }) + ) + }) +}) diff --git a/src/storage/events/jwks/jwks-roll-url-signing-key.ts b/src/storage/events/jwks/jwks-roll-url-signing-key.ts index 6bb0fb579..30a1d1082 100644 --- a/src/storage/events/jwks/jwks-roll-url-signing-key.ts +++ b/src/storage/events/jwks/jwks-roll-url-signing-key.ts @@ -39,7 +39,7 @@ export class JwksRollUrlSigningKey extends BaseEvent) { - const { tenantId } = job.data + const { tenantId, sbReqId } = job.data try { const { oldKid, newKid } = await jwksManager.rollUrlSigningJwk(tenantId) @@ -50,6 +50,7 @@ export class JwksRollUrlSigningKey extends BaseEvent { { tenant: job.data.tenant, catalogId: job.data.bucketId, + sbReqId: job.data.sbReqId, }, { tnx: isMultitenant ? metastoreTx.getTnx() : undefined, diff --git a/src/storage/events/lifecycle/object-removed.ts b/src/storage/events/lifecycle/object-removed.ts index 7e7b3c2e9..0cdaea797 100644 --- a/src/storage/events/lifecycle/object-removed.ts +++ b/src/storage/events/lifecycle/object-removed.ts @@ -1,12 +1,11 @@ import { BasePayload } from '@internal/queue' -import { ObjectMetadata } from '@storage/backend' import { BaseEvent } from '../base-event' export interface ObjectRemovedEvent extends BasePayload { name: string bucketId: string - version: string - metadata?: ObjectMetadata + version?: string + metadata?: Record | null } export class ObjectRemoved extends BaseEvent { diff --git a/src/storage/events/lifecycle/object-updated.ts b/src/storage/events/lifecycle/object-updated.ts index a94f0b672..21de0d82e 100644 --- a/src/storage/events/lifecycle/object-updated.ts +++ b/src/storage/events/lifecycle/object-updated.ts @@ -5,7 +5,7 @@ import { BaseEvent } from '../base-event' interface ObjectUpdatedMetadataEvent extends BasePayload { name: string bucketId: string - version: string + version?: string metadata: ObjectMetadata } diff --git a/src/storage/events/lifecycle/webhook.ts b/src/storage/events/lifecycle/webhook.ts index 782e50453..c43e99c02 100644 --- a/src/storage/events/lifecycle/webhook.ts +++ b/src/storage/events/lifecycle/webhook.ts @@ -19,10 +19,11 @@ interface WebhookEvent { event: { $version: string type: string - payload: object & { reqId?: string; bucketId: string; name: string } + region: string + payload: object & { reqId?: string; sbReqId?: string; bucketId: string; name: string } applyTime: number } - sentAt: string + sentAt?: string tenant: { ref: string host: string @@ -105,6 +106,7 @@ export class Webhook extends BaseEvent { tenantId: job.data.tenant.ref, project: job.data.tenant.ref, reqId: job.data.event.payload.reqId, + sbReqId: job.data.event.payload.sbReqId, }) try { @@ -127,6 +129,7 @@ export class Webhook extends BaseEvent { tenantId: job.data.tenant.ref, project: job.data.tenant.ref, reqId: job.data.event.payload.reqId, + sbReqId: job.data.event.payload.sbReqId, }, `[Lifecycle]: ${job.data.event.type} ${path} - FAILED` ) diff --git a/src/storage/events/migrations/reset-migrations.test.ts b/src/storage/events/migrations/reset-migrations.test.ts new file mode 100644 index 000000000..e6ad80fb5 --- /dev/null +++ b/src/storage/events/migrations/reset-migrations.test.ts @@ -0,0 +1,106 @@ +import { vi } from 'vitest' + +const { mockGetTenantConfig, mockResetMigration, mockRunMigrationsSend, mockInfo } = vi.hoisted( + () => ({ + mockGetTenantConfig: vi.fn(), + mockResetMigration: vi.fn(), + mockRunMigrationsSend: vi.fn(), + mockInfo: vi.fn(), + }) +) + +vi.mock('@internal/database', () => ({ + getTenantConfig: mockGetTenantConfig, +})) + +vi.mock('@internal/database/migrations', () => ({ + DBMigration: { + 'create-migrations-table': 0, + 'storage-schema': 2, + }, + resetMigration: mockResetMigration, +})) + +vi.mock('@internal/monitoring', () => ({ + logger: {}, + logSchema: { + info: mockInfo, + error: vi.fn(), + warning: vi.fn(), + }, +})) + +vi.mock('../base-event', () => ({ + BaseEvent: class {}, +})) + +vi.mock('./run-migrations', () => ({ + RunMigrationsOnTenants: class { + static send = mockRunMigrationsSend + }, +})) + +import { ResetMigrationsOnTenant } from './reset-migrations' + +function makeJob(overrides?: Partial>) { + return { + data: { + tenantId: 'tenant-a', + untilMigration: 'storage-schema', + markCompletedTillMigration: 'create-migrations-table', + sbReqId: 'sb-req-123', + tenant: { + ref: 'tenant-a', + }, + }, + ...overrides, + } +} + +describe('ResetMigrationsOnTenant.handle', () => { + beforeEach(() => { + vi.clearAllMocks() + + mockGetTenantConfig.mockResolvedValue({ + databaseUrl: 'postgres://tenant-db', + }) + mockResetMigration.mockResolvedValue(true) + mockRunMigrationsSend.mockResolvedValue(undefined) + }) + + it('threads sbReqId through logs and the follow-up migration job', async () => { + await expect(ResetMigrationsOnTenant.handle(makeJob() as never)).resolves.toBeUndefined() + + expect(mockResetMigration).toHaveBeenCalledWith({ + tenantId: 'tenant-a', + markCompletedTillMigration: 'create-migrations-table', + untilMigration: 'storage-schema', + databaseUrl: 'postgres://tenant-db', + }) + expect(mockRunMigrationsSend).toHaveBeenCalledWith( + expect.objectContaining({ + tenantId: 'tenant-a', + singletonKey: 'tenant-a', + sbReqId: 'sb-req-123', + }) + ) + expect(mockInfo).toHaveBeenCalledWith( + expect.anything(), + '[Migrations] resetting migrations for tenant-a', + expect.objectContaining({ + type: 'migrations', + project: 'tenant-a', + sbReqId: 'sb-req-123', + }) + ) + expect(mockInfo).toHaveBeenCalledWith( + expect.anything(), + '[Migrations] reset successful for tenant-a', + expect.objectContaining({ + type: 'migrations', + project: 'tenant-a', + sbReqId: 'sb-req-123', + }) + ) + }) +}) diff --git a/src/storage/events/migrations/reset-migrations.ts b/src/storage/events/migrations/reset-migrations.ts index c54eda611..f8edc687a 100644 --- a/src/storage/events/migrations/reset-migrations.ts +++ b/src/storage/events/migrations/reset-migrations.ts @@ -40,11 +40,13 @@ export class ResetMigrationsOnTenant extends BaseEvent { static async handle(job: JobWithMetadata) { const tenantId = job.data.tenant.ref + const { sbReqId } = job.data const tenant = await getTenantConfig(tenantId) logSchema.info(logger, `[Migrations] resetting migrations for ${tenantId}`, { type: 'migrations', project: tenantId, + sbReqId, }) const reset = await resetMigration({ @@ -59,14 +61,17 @@ export class ResetMigrationsOnTenant extends BaseEvent { tenantId, tenant: { ref: tenantId, + host: '', }, singletonKey: tenantId, + sbReqId, }) } logSchema.info(logger, `[Migrations] reset successful for ${tenantId}`, { type: 'migrations', project: tenantId, + sbReqId, }) } } diff --git a/src/storage/events/migrations/run-migrations.test.ts b/src/storage/events/migrations/run-migrations.test.ts index 2aa668030..23a822d3d 100644 --- a/src/storage/events/migrations/run-migrations.test.ts +++ b/src/storage/events/migrations/run-migrations.test.ts @@ -69,6 +69,7 @@ function makeJob(overrides?: Partial>) { data: { tenantId: 'tenant-a', upToMigration: 'storage-schema', + sbReqId: 'sb-req-123', tenant: { ref: 'tenant-a', host: '', @@ -109,6 +110,15 @@ describe('RunMigrationsOnTenants.handle', () => { state: TenantMigrationStatus.COMPLETED, }) expect(mockDeleteIfActiveExists).not.toHaveBeenCalled() + expect(mockInfo).toHaveBeenCalledWith( + expect.anything(), + '[Migrations] completed for tenant tenant-a', + expect.objectContaining({ + type: 'migrations', + project: 'tenant-a', + sbReqId: 'sb-req-123', + }) + ) }) it('short-circuits when migrations are already up to date', async () => { @@ -134,6 +144,7 @@ describe('RunMigrationsOnTenants.handle', () => { expect.objectContaining({ type: 'migrations', project: 'tenant-a', + sbReqId: 'sb-req-123', }) ) }) @@ -153,6 +164,15 @@ describe('RunMigrationsOnTenants.handle', () => { 'migrations_tenant-a', 'job-1' ) + expect(mockError).toHaveBeenCalledWith( + expect.anything(), + '[Migrations] failed for tenant tenant-a', + expect.objectContaining({ + type: 'migrations', + project: 'tenant-a', + sbReqId: 'sb-req-123', + }) + ) }) it('marks the tenant FAILED_STALE on the final retry before rethrowing', async () => { diff --git a/src/storage/events/migrations/run-migrations.ts b/src/storage/events/migrations/run-migrations.ts index 0aeca7445..e61360941 100644 --- a/src/storage/events/migrations/run-migrations.ts +++ b/src/storage/events/migrations/run-migrations.ts @@ -46,6 +46,7 @@ export class RunMigrationsOnTenants extends BaseEvent { static async handle(job: JobWithMetadata) { const tenantId = job.data.tenant.ref + const { sbReqId } = job.data deleteTenantConfig(tenantId) const tenant = await getTenantConfig(tenantId) @@ -59,6 +60,7 @@ export class RunMigrationsOnTenants extends BaseEvent { logSchema.info(logger, `[Migrations] running for tenant ${tenantId}`, { type: 'migrations', project: tenantId, + sbReqId, }) await runMigrationsOnTenant({ databaseUrl: tenant.databaseUrl, @@ -74,12 +76,14 @@ export class RunMigrationsOnTenants extends BaseEvent { logSchema.info(logger, `[Migrations] completed for tenant ${tenantId}`, { type: 'migrations', project: tenantId, + sbReqId, }) } catch (e) { if (e instanceof StorageBackendError && e.code === ErrorCode.LockTimeout) { logSchema.info(logger, `[Migrations] lock timeout for tenant ${tenantId}`, { type: 'migrations', project: tenantId, + sbReqId, }) return } @@ -88,6 +92,7 @@ export class RunMigrationsOnTenants extends BaseEvent { type: 'migrations', error: e, project: tenantId, + sbReqId, }) if (job.retryCount === job.retryLimit) { @@ -106,6 +111,7 @@ export class RunMigrationsOnTenants extends BaseEvent { type: 'migrations', error: e, project: tenantId, + sbReqId, }) return } diff --git a/src/storage/events/objects/backup-object.ts b/src/storage/events/objects/backup-object.ts index a63f00bf8..184021d8b 100644 --- a/src/storage/events/objects/backup-object.ts +++ b/src/storage/events/objects/backup-object.ts @@ -65,6 +65,7 @@ export class BackupObjectEvent extends BaseEvent { tenantId: job.data.tenant.ref, project: job.data.tenant.ref, reqId: job.data.reqId, + sbReqId: job.data.sbReqId, }) await storage.backend.backup({ @@ -86,6 +87,7 @@ export class BackupObjectEvent extends BaseEvent { tenantId: job.data.tenant.ref, project: job.data.tenant.ref, reqId: job.data.reqId, + sbReqId: job.data.sbReqId, }) await storage.backend.deleteObject( @@ -111,6 +113,7 @@ export class BackupObjectEvent extends BaseEvent { tenantId: job.data.tenant.ref, project: job.data.tenant.ref, reqId: job.data.reqId, + sbReqId: job.data.sbReqId, }, `[Admin]: BackupObjectEvent ${s3Key} - FAILED` ) @@ -122,7 +125,7 @@ export class BackupObjectEvent extends BaseEvent { }) .catch((e) => { logger.error( - { error: e }, + { error: e, sbReqId: job.data.sbReqId }, `[Admin]: BackupObjectEvent ${tenantId} - FAILED DISPOSING CONNECTION` ) }) diff --git a/src/storage/events/objects/object-admin-delete-all-before.ts b/src/storage/events/objects/object-admin-delete-all-before.ts index fd205c6b3..fd178bee3 100644 --- a/src/storage/events/objects/object-admin-delete-all-before.ts +++ b/src/storage/events/objects/object-admin-delete-all-before.ts @@ -53,6 +53,7 @@ export class ObjectAdminDeleteAllBefore extends BaseEvent { logger.error( - { error: e }, + { error: e, sbReqId: job.data.sbReqId }, `[Admin]: ObjectAdminDeleteAllBefore ${tenant.ref} - FAILED DISPOSING CONNECTION` ) }) diff --git a/src/storage/events/objects/object-admin-delete.ts b/src/storage/events/objects/object-admin-delete.ts index f6de19fbe..08a47b998 100644 --- a/src/storage/events/objects/object-admin-delete.ts +++ b/src/storage/events/objects/object-admin-delete.ts @@ -51,6 +51,7 @@ export class ObjectAdminDelete extends BaseEvent { tenantId: job.data.tenant.ref, project: job.data.tenant.ref, reqId: job.data.reqId, + sbReqId: job.data.sbReqId, }) await storage.backend.deleteObjects(storageS3Bucket, [ @@ -72,6 +73,7 @@ export class ObjectAdminDelete extends BaseEvent { tenantId: job.data.tenant.ref, project: job.data.tenant.ref, reqId: job.data.reqId, + sbReqId: job.data.sbReqId, }, `[Admin]: ObjectAdminDelete ${s3Key} - FAILED` ) @@ -86,7 +88,7 @@ export class ObjectAdminDelete extends BaseEvent { }) .catch((e) => { logger.error( - { error: e }, + { error: e, sbReqId: job.data.sbReqId }, `[Admin]: ObjectAdminDelete ${tenant.ref} - FAILED DISPOSING CONNECTION` ) }) diff --git a/src/storage/events/pgboss/move-jobs.test.ts b/src/storage/events/pgboss/move-jobs.test.ts new file mode 100644 index 000000000..d874541f1 --- /dev/null +++ b/src/storage/events/pgboss/move-jobs.test.ts @@ -0,0 +1,107 @@ +import { vi } from 'vitest' + +const { mockTransaction, mockGetQueue, mockError } = vi.hoisted(() => ({ + mockTransaction: vi.fn(), + mockGetQueue: vi.fn(), + mockError: vi.fn(), +})) + +vi.mock('@internal/database', () => ({ + multitenantKnex: { + transaction: mockTransaction, + }, +})) + +vi.mock('@internal/monitoring', () => ({ + logger: {}, + logSchema: { + info: vi.fn(), + error: mockError, + warning: vi.fn(), + }, +})) + +vi.mock('@internal/queue', () => ({ + PG_BOSS_SCHEMA: 'storage', + Queue: { + getInstance: () => ({ + getQueue: mockGetQueue, + }), + }, +})) + +vi.mock('../base-event', () => ({ + BaseEvent: class {}, +})) + +import { MoveJobs } from './move-jobs' + +function makeJob(overrides?: Partial>) { + return { + data: { + fromQueue: 'source-queue', + toQueue: 'target-queue', + deleteJobsFromOriginalQueue: false, + sbReqId: 'sb-req-123', + }, + ...overrides, + } +} + +function mockLockedTransaction(raw: ReturnType) { + mockTransaction.mockImplementation( + async (callback: (tnx: { raw: typeof raw }) => Promise) => callback({ raw }) + ) +} + +describe('MoveJobs.handle', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('logs the missing target queue with sbReqId', async () => { + const raw = vi.fn().mockResolvedValueOnce({ + rows: [{ pg_try_advisory_xact_lock: true }], + }) + mockLockedTransaction(raw) + mockGetQueue.mockResolvedValue(undefined) + + await expect(MoveJobs.handle(makeJob() as never)).resolves.toBeUndefined() + + expect(mockError).toHaveBeenCalledWith( + expect.anything(), + '[PgBoss] Target queue target-queue does not exist', + expect.objectContaining({ + type: 'pgboss', + sbReqId: 'sb-req-123', + }) + ) + }) + + it('logs copy failures with sbReqId', async () => { + const error = new Error('copy failed') + const raw = vi + .fn() + .mockResolvedValueOnce({ + rows: [{ pg_try_advisory_xact_lock: true }], + }) + .mockRejectedValueOnce(error) + mockLockedTransaction(raw) + mockGetQueue.mockResolvedValue({ + name: 'target-queue', + policy: 'exactly_once', + }) + + await expect(MoveJobs.handle(makeJob() as never)).resolves.toBeUndefined() + + expect(mockError).toHaveBeenCalledWith( + expect.anything(), + '[PgBoss] Error while copying jobs', + expect.objectContaining({ + type: 'pgboss', + error, + sbReqId: 'sb-req-123', + }) + ) + }) +}) diff --git a/src/storage/events/pgboss/move-jobs.ts b/src/storage/events/pgboss/move-jobs.ts index 49369d3c2..4dc26f610 100644 --- a/src/storage/events/pgboss/move-jobs.ts +++ b/src/storage/events/pgboss/move-jobs.ts @@ -38,6 +38,8 @@ export class MoveJobs extends BaseEvent { } static async handle(job: Job) { + const { sbReqId } = job.data + await multitenantKnex.transaction(async (tnx) => { const resultLock = await tnx.raw('SELECT pg_try_advisory_xact_lock(-5525285245963000611)') const lockAcquired = resultLock.rows.shift()?.pg_try_advisory_xact_lock || false @@ -53,6 +55,7 @@ export class MoveJobs extends BaseEvent { if (!toQueue) { logSchema.error(logger, `[PgBoss] Target queue ${job.data.toQueue} does not exist`, { type: 'pgboss', + sbReqId, }) return } @@ -116,6 +119,7 @@ export class MoveJobs extends BaseEvent { logSchema.error(logger, '[PgBoss] Error while copying jobs', { type: 'pgboss', error, + sbReqId, }) } }) diff --git a/src/storage/events/pgboss/upgrade-v10.test.ts b/src/storage/events/pgboss/upgrade-v10.test.ts new file mode 100644 index 000000000..c5f70613a --- /dev/null +++ b/src/storage/events/pgboss/upgrade-v10.test.ts @@ -0,0 +1,87 @@ +import { vi } from 'vitest' + +const { mockTransaction, mockGetQueues, mockError } = vi.hoisted(() => ({ + mockTransaction: vi.fn(), + mockGetQueues: vi.fn(), + mockError: vi.fn(), +})) + +vi.mock('@internal/database', () => ({ + multitenantKnex: { + transaction: mockTransaction, + }, +})) + +vi.mock('@internal/monitoring', () => ({ + logger: {}, + logSchema: { + info: vi.fn(), + error: mockError, + warning: vi.fn(), + }, +})) + +vi.mock('@internal/queue', () => ({ + PG_BOSS_SCHEMA: 'storage', + Queue: { + getInstance: () => ({ + getQueues: mockGetQueues, + }), + }, +})) + +vi.mock('../base-event', () => ({ + BaseEvent: class {}, +})) + +import { UpgradePgBossV10 } from './upgrade-v10' + +function makeJob(overrides?: Partial>) { + return { + data: { + sbReqId: 'sb-req-123', + }, + ...overrides, + } +} + +function mockLockedTransaction(raw: ReturnType) { + mockTransaction.mockImplementation( + async (callback: (tnx: { raw: typeof raw }) => Promise) => callback({ raw }) + ) +} + +describe('UpgradePgBossV10.handle', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('logs copy failures with sbReqId', async () => { + const error = new Error('copy failed') + const raw = vi + .fn() + .mockResolvedValueOnce({ + rows: [{ pg_try_advisory_xact_lock: true }], + }) + .mockRejectedValueOnce(error) + mockLockedTransaction(raw) + mockGetQueues.mockResolvedValue([ + { + name: 'queue-a', + policy: 'exactly_once', + }, + ]) + + await expect(UpgradePgBossV10.handle(makeJob() as never)).resolves.toBeUndefined() + + expect(mockError).toHaveBeenCalledWith( + expect.anything(), + '[PgBoss] Error while copying jobs', + expect.objectContaining({ + type: 'pgboss', + error, + sbReqId: 'sb-req-123', + }) + ) + }) +}) diff --git a/src/storage/events/pgboss/upgrade-v10.ts b/src/storage/events/pgboss/upgrade-v10.ts index 4231679e3..4ba8db593 100644 --- a/src/storage/events/pgboss/upgrade-v10.ts +++ b/src/storage/events/pgboss/upgrade-v10.ts @@ -34,6 +34,8 @@ export class UpgradePgBossV10 extends BaseEvent { } static async handle(job: Job) { + const { sbReqId } = job.data + await multitenantKnex.transaction(async (tnx) => { const resultLock = await tnx.raw('SELECT pg_try_advisory_xact_lock(-5525285245963000606)') const lockAcquired = resultLock.rows.shift()?.pg_try_advisory_xact_lock || false @@ -96,6 +98,7 @@ export class UpgradePgBossV10 extends BaseEvent { logSchema.error(logger, '[PgBoss] Error while copying jobs', { type: 'pgboss', error, + sbReqId, }) } } diff --git a/src/storage/object.ts b/src/storage/object.ts index 1529919e5..0c6cbad2e 100644 --- a/src/storage/object.ts +++ b/src/storage/object.ts @@ -28,6 +28,7 @@ interface CopyObjectParams { owner?: string copyMetadata?: boolean upsert?: boolean + uploadType: 'standard' | 's3' | 'resumable' metadata?: { cacheControl?: string mimetype?: string @@ -155,6 +156,7 @@ export class ObjectStorage { version: obj.version, bucketId: this.bucketId, reqId: this.db.reqId, + sbReqId: this.db.sbReqId, metadata: obj.metadata, }) } @@ -217,6 +219,7 @@ export class ObjectStorage { name: object.name, bucketId: this.bucketId, reqId: this.db.reqId, + sbReqId: this.db.sbReqId, version: object.version, metadata: object.metadata, }) @@ -246,6 +249,7 @@ export class ObjectStorage { bucketId: this.bucketId, metadata, reqId: this.db.reqId, + sbReqId: this.db.sbReqId, }) return result @@ -301,6 +305,7 @@ export class ObjectStorage { conditions, copyMetadata, upsert, + uploadType, metadata: fileMetadata, userMetadata, }: CopyObjectParams) { @@ -393,10 +398,11 @@ export class ObjectStorage { if (existingDestObject) { await ObjectAdminDelete.send({ name: existingDestObject.name, - bucketId: existingDestObject.bucket_id, + bucketId: existingDestObject.bucket_id ?? destinationBucket, tenant: this.db.tenant(), version: existingDestObject.version, reqId: this.db.reqId, + sbReqId: this.db.sbReqId, }) } @@ -409,7 +415,9 @@ export class ObjectStorage { version: newVersion, bucketId: destinationBucket, metadata, + uploadType, reqId: this.db.reqId, + sbReqId: this.db.sbReqId, }) return { @@ -425,6 +433,7 @@ export class ObjectStorage { tenant: this.db.tenant(), version: newVersion, reqId: this.db.reqId, + sbReqId: this.db.sbReqId, }) throw e } @@ -441,6 +450,7 @@ export class ObjectStorage { sourceObjectName: string, destinationBucket: string, destinationObjectName: string, + uploadType: 'standard' | 's3' | 'resumable', owner?: string ) { mustBeValidKey(destinationObjectName) @@ -525,6 +535,7 @@ export class ObjectStorage { tenant: this.db.tenant(), version: sourceObj.version, reqId: this.db.reqId, + sbReqId: this.db.sbReqId, }) await Promise.allSettled([ @@ -533,6 +544,7 @@ export class ObjectStorage { name: sourceObjectName, bucketId: this.bucketId, reqId: this.db.reqId, + sbReqId: this.db.sbReqId, version: sourceObject.version, metadata: sourceObject.metadata, }), @@ -542,6 +554,7 @@ export class ObjectStorage { version: newVersion, bucketId: destinationBucket, metadata, + uploadType, oldObject: { name: sourceObjectName, bucketId: this.bucketId, @@ -549,6 +562,7 @@ export class ObjectStorage { version: sourceObject.version, }, reqId: this.db.reqId, + sbReqId: this.db.sbReqId, }), ]) @@ -570,6 +584,7 @@ export class ObjectStorage { tenant: this.db.tenant(), version: newVersion, reqId: this.db.reqId, + sbReqId: this.db.sbReqId, }) throw e } diff --git a/src/storage/protocols/s3/s3-handler.ts b/src/storage/protocols/s3/s3-handler.ts index 81c210209..bff112b45 100644 --- a/src/storage/protocols/s3/s3-handler.ts +++ b/src/storage/protocols/s3/s3-handler.ts @@ -1121,6 +1121,7 @@ export class S3ProtocolHandler { }, userMetadata: command.Metadata, copyMetadata: command.MetadataDirective === 'COPY', + uploadType: 's3', }) return { diff --git a/src/storage/scanner/scanner.ts b/src/storage/scanner/scanner.ts index 4afe50794..d29fa9ed2 100644 --- a/src/storage/scanner/scanner.ts +++ b/src/storage/scanner/scanner.ts @@ -340,6 +340,7 @@ export class ObjectScanner { version: obj.version, size: obj.size, reqId: this.storage.db.reqId, + sbReqId: this.storage.db.sbReqId, }) }) ) diff --git a/src/storage/storage.ts b/src/storage/storage.ts index d427393b0..6805437e5 100644 --- a/src/storage/storage.ts +++ b/src/storage/storage.ts @@ -157,6 +157,7 @@ export class Storage { ref: db.tenantId, host: db.tenantHost, }, + sbReqId: db.sbReqId, }, { sendWhenError: (error) => { @@ -168,6 +169,7 @@ export class Storage { project: db.tenantId, type: 'event', error, + sbReqId: db.sbReqId, }) return true }, @@ -262,6 +264,7 @@ export class Storage { ref: this.db.tenantId, host: this.db.tenantHost, }, + sbReqId: this.db.sbReqId, }) } @@ -298,10 +301,11 @@ export class Storage { // use queue to recursively delete all objects created before the specified time await ObjectAdminDeleteAllBefore.send({ - before, + before: before.toISOString(), bucketId, tenant: this.db.tenant(), reqId: this.db.reqId, + sbReqId: this.db.sbReqId, }) } diff --git a/src/storage/uploader.ts b/src/storage/uploader.ts index 20a53bb2d..06923b2c3 100644 --- a/src/storage/uploader.ts +++ b/src/storage/uploader.ts @@ -31,7 +31,7 @@ export interface UploadRequest { userMetadata?: Record owner?: string isUpsert?: boolean - uploadType?: 'standard' | 's3' | 'resumable' + uploadType: 'standard' | 's3' | 'resumable' signal?: AbortSignal } @@ -170,6 +170,7 @@ export class Uploader { tenant: this.db.tenant(), version, reqId: this.db.reqId, + sbReqId: this.db.sbReqId, }) throw shouldCloseConnectionAfterResponse(file.body) ? withConnectionClose(e) : e } @@ -199,7 +200,6 @@ export class Uploader { objectMetadata: ObjectMetadata version: string emitEvent?: boolean - uploadType?: 'standard' | 's3' | 'resumable' userMetadata?: Record }) { try { @@ -242,6 +242,7 @@ export class Uploader { tenant: this.db.tenant(), version: currentObj.version, reqId: this.db.reqId, + sbReqId: this.db.sbReqId, }) ) } @@ -257,6 +258,7 @@ export class Uploader { bucketId, metadata: objectMetadata, reqId: this.db.reqId, + sbReqId: this.db.sbReqId, uploadType, }) .catch((e) => { @@ -264,6 +266,7 @@ export class Uploader { type: 'event', error: e, project: this.db.tenantId, + sbReqId: this.db.sbReqId, metadata: JSON.stringify({ name: objectName, bucketId, @@ -291,6 +294,7 @@ export class Uploader { tenant: this.db.tenant(), version, reqId: this.db.reqId, + sbReqId: this.db.sbReqId, }) throw e } diff --git a/src/test/admin-migrations.test.ts b/src/test/admin-migrations.test.ts index c44640834..07399702b 100644 --- a/src/test/admin-migrations.test.ts +++ b/src/test/admin-migrations.test.ts @@ -147,7 +147,10 @@ describe('Admin migrations routes', () => { payload: { untilMigration: 'create-migrations-table' satisfies keyof typeof DBMigration, }, - headers, + headers: { + ...headers, + 'sb-request-id': 'sb-req-123', + }, }) expect(response.statusCode).toBe(200) @@ -156,6 +159,27 @@ describe('Admin migrations routes', () => { till: 'create-migrations-table', markCompletedTillMigration: undefined, signal: expect.any(AbortSignal), + sbReqId: 'sb-req-123', + }) + }) + + test('passes sbReqId to fleet migrate scheduling', async () => { + const runSpy = vi.mocked(migrations.runMigrationsOnAllTenants).mockResolvedValue(undefined) + + const response = await adminApp.inject({ + method: 'POST', + url: '/migrations/migrate/fleet', + headers: { + ...headers, + 'sb-request-id': 'sb-req-123', + }, + }) + + expect(response.statusCode).toBe(200) + expect(JSON.parse(response.body)).toEqual({ message: 'Migrations scheduled' }) + expect(runSpy).toHaveBeenCalledWith({ + signal: expect.any(AbortSignal), + sbReqId: 'sb-req-123', }) }) @@ -477,6 +501,20 @@ describe('Admin migrations routes', () => { }) }) + test.each([ + '/migrations/failed?cursor=cursor-NaN', + '/migrations/failed?cursor=-1', + ])('rejects invalid failed-migrations cursor for %s', async (url) => { + const response = await adminApp.inject({ + method: 'GET', + url, + headers, + }) + + expect(response.statusCode).toBe(400) + expect(JSON.parse(response.body)).toEqual({ message: 'Invalid cursor' }) + }) + test('marks only active fleet migration jobs from the current queue as completed', async () => { const matchingJobId = trackJobId(randomUUID()) const wrongQueueJobId = trackJobId(randomUUID()) diff --git a/src/test/tenant-jwks.test.ts b/src/test/tenant-jwks.test.ts index 318e28d62..36973ef59 100644 --- a/src/test/tenant-jwks.test.ts +++ b/src/test/tenant-jwks.test.ts @@ -634,6 +634,7 @@ describe('Tenant jwks configs', () => { url: `/tenants/${tenantId}/jwks/url-signing/roll`, headers: { apikey: process.env.ADMIN_API_KEYS, + 'sb-request-id': 'sb-req-123', }, }) expect(response.statusCode).toBe(200) @@ -644,7 +645,7 @@ describe('Tenant jwks configs', () => { expect(queueSendSpy).toHaveBeenCalledTimes(1) const [[callArg]] = queueSendSpy.mock.calls expect(callArg).toMatchObject({ - data: { tenantId }, + data: { tenantId, sbReqId: 'sb-req-123' }, name: 'tenants-jwks-roll-url-signing-key-v1', }) } finally { diff --git a/src/test/webhooks.test.ts b/src/test/webhooks.test.ts index 31def757f..733689769 100644 --- a/src/test/webhooks.test.ts +++ b/src/test/webhooks.test.ts @@ -9,6 +9,7 @@ const { serviceKeyAsync, tenantId } = getConfig() mergeConfig({ pgQueueEnable: true, + requestTraceHeader: 'trace-id', }) import { getPostgresConnection, getServiceKeyUser } from '@internal/database' @@ -106,6 +107,36 @@ describe('Webhooks', () => { ) }) + it('keeps trace reqId separate from sbReqId in queued webhook payloads', async () => { + const form = new FormData() + + const authorization = `Bearer ${await serviceKeyAsync}` + form.append('file', fs.createReadStream(`./src/test/assets/sadcat.jpg`)) + const headers = Object.assign({}, form.getHeaders(), { + authorization, + 'trace-id': 'trace-123', + 'sb-request-id': 'sb-req-123', + }) + + const fileName = (Math.random() + 1).toString(36).substring(7) + + const response = await appInstance.inject({ + method: 'POST', + url: `/object/bucket6/public/${fileName}.png`, + headers, + payload: form, + }) + + expect(response.statusCode).toBe(200) + expect(sendSpy).toHaveBeenCalledTimes(1) + const queuedWebhook = sendSpy.mock.calls[0][0].data + + expect(queuedWebhook).not.toHaveProperty('sbReqId') // not top level + expect(queuedWebhook.event.payload.reqId).toEqual(expect.any(String)) + expect(queuedWebhook.event.payload.sbReqId).toBe('sb-req-123') + expect(queuedWebhook.event.payload.reqId).not.toBe(queuedWebhook.event.payload.sbReqId) + }) + it('will emit a webhook upon object deletion', async () => { const obj = await createObject(pg, 'bucket6') From 5f05e2a9063e170043e1e377d4f7a0466e6b5800 Mon Sep 17 00:00:00 2001 From: ferhat elmas Date: Thu, 23 Apr 2026 18:27:22 +0200 Subject: [PATCH 2/2] chore: system tenant Signed-off-by: ferhat elmas --- src/http/routes/admin/queue.test.ts | 5 +--- src/internal/queue/constants.ts | 1 + src/internal/queue/event.ts | 4 ++- src/storage/events/pgboss/move-jobs.test.ts | 28 +++++++++++++------ src/storage/events/pgboss/move-jobs.ts | 4 ++- src/storage/events/pgboss/upgrade-v10.test.ts | 27 ++++++++++++------ src/storage/events/pgboss/upgrade-v10.ts | 5 ++-- 7 files changed, 50 insertions(+), 24 deletions(-) diff --git a/src/http/routes/admin/queue.test.ts b/src/http/routes/admin/queue.test.ts index 1ee0ae48e..af7c7469f 100644 --- a/src/http/routes/admin/queue.test.ts +++ b/src/http/routes/admin/queue.test.ts @@ -109,10 +109,7 @@ describe('admin queue routes', () => { toQueue: 'target-queue', deleteJobsFromOriginalQueue: true, sbReqId: 'sb-req-123', - tenant: { - ref: '', - host: '', - }, + tenant: SYSTEM_TENANT, }) } finally { await app.close() diff --git a/src/internal/queue/constants.ts b/src/internal/queue/constants.ts index 35fe46baf..8c1f68b10 100644 --- a/src/internal/queue/constants.ts +++ b/src/internal/queue/constants.ts @@ -2,4 +2,5 @@ * Placeholder tenant for events that run across the whole fleet * (e.g. pg-boss maintenance) and have no single tenant context. */ +export const SYSTEM_TENANT_REF = 'SYSTEM_TENANT' as const export const SYSTEM_TENANT = { ref: '', host: '' } as const diff --git a/src/internal/queue/event.ts b/src/internal/queue/event.ts index eca236dc5..8b95757f3 100644 --- a/src/internal/queue/event.ts +++ b/src/internal/queue/event.ts @@ -6,6 +6,7 @@ import { KnexQueueDB } from '@internal/queue/database' import { Knex } from 'knex' import PgBoss, { Job, Queue as PgBossQueue, SendOptions, WorkOptions } from 'pg-boss' import { getConfig } from '../../config' +import { SYSTEM_TENANT_REF } from './constants' import { PG_BOSS_SCHEMA, Queue } from './queue' export interface BasePayload { @@ -215,7 +216,7 @@ export class Event> { logSchema.error(logger, '[Queue] Error invoking event synchronously, sending to queue', { type: 'queue', - project: this.payload.tenant?.ref, + project: this.payload.tenant?.ref || SYSTEM_TENANT_REF, error: e, metadata: JSON.stringify(this.payload), sbReqId: this.payload.sbReqId, @@ -321,6 +322,7 @@ export class Event> { `[Queue Sender] Error while sending job to queue, sending synchronously`, { type: 'queue', + project: this.payload.tenant?.ref || SYSTEM_TENANT_REF, error: e, metadata: JSON.stringify(this.payload), sbReqId: this.payload.sbReqId, diff --git a/src/storage/events/pgboss/move-jobs.test.ts b/src/storage/events/pgboss/move-jobs.test.ts index d874541f1..97e9da513 100644 --- a/src/storage/events/pgboss/move-jobs.test.ts +++ b/src/storage/events/pgboss/move-jobs.test.ts @@ -21,19 +21,28 @@ vi.mock('@internal/monitoring', () => ({ }, })) -vi.mock('@internal/queue', () => ({ - PG_BOSS_SCHEMA: 'storage', - Queue: { - getInstance: () => ({ - getQueue: mockGetQueue, - }), - }, -})) +vi.mock('@internal/queue', async () => { + const { SYSTEM_TENANT, SYSTEM_TENANT_REF } = await vi.importActual< + typeof import('@internal/queue/constants') + >('@internal/queue/constants') + + return { + PG_BOSS_SCHEMA: 'storage', + SYSTEM_TENANT, + SYSTEM_TENANT_REF, + Queue: { + getInstance: () => ({ + getQueue: mockGetQueue, + }), + }, + } +}) vi.mock('../base-event', () => ({ BaseEvent: class {}, })) +import { SYSTEM_TENANT, SYSTEM_TENANT_REF } from '@internal/queue' import { MoveJobs } from './move-jobs' function makeJob(overrides?: Partial>) { @@ -43,6 +52,7 @@ function makeJob(overrides?: Partial>) { toQueue: 'target-queue', deleteJobsFromOriginalQueue: false, sbReqId: 'sb-req-123', + tenant: SYSTEM_TENANT, }, ...overrides, } @@ -73,6 +83,7 @@ describe('MoveJobs.handle', () => { '[PgBoss] Target queue target-queue does not exist', expect.objectContaining({ type: 'pgboss', + project: SYSTEM_TENANT_REF, sbReqId: 'sb-req-123', }) ) @@ -100,6 +111,7 @@ describe('MoveJobs.handle', () => { expect.objectContaining({ type: 'pgboss', error, + project: SYSTEM_TENANT_REF, sbReqId: 'sb-req-123', }) ) diff --git a/src/storage/events/pgboss/move-jobs.ts b/src/storage/events/pgboss/move-jobs.ts index 4dc26f610..3bdaf1879 100644 --- a/src/storage/events/pgboss/move-jobs.ts +++ b/src/storage/events/pgboss/move-jobs.ts @@ -1,6 +1,6 @@ import { multitenantKnex } from '@internal/database' import { logger, logSchema } from '@internal/monitoring' -import { BasePayload, PG_BOSS_SCHEMA, Queue } from '@internal/queue' +import { BasePayload, PG_BOSS_SCHEMA, Queue, SYSTEM_TENANT_REF } from '@internal/queue' import { Job, Queue as PgBossQueue, SendOptions, WorkOptions } from 'pg-boss' import { BaseEvent } from '../base-event' @@ -55,6 +55,7 @@ export class MoveJobs extends BaseEvent { if (!toQueue) { logSchema.error(logger, `[PgBoss] Target queue ${job.data.toQueue} does not exist`, { type: 'pgboss', + project: job.data.tenant?.ref || SYSTEM_TENANT_REF, sbReqId, }) return @@ -119,6 +120,7 @@ export class MoveJobs extends BaseEvent { logSchema.error(logger, '[PgBoss] Error while copying jobs', { type: 'pgboss', error, + project: job.data.tenant?.ref || SYSTEM_TENANT_REF, sbReqId, }) } diff --git a/src/storage/events/pgboss/upgrade-v10.test.ts b/src/storage/events/pgboss/upgrade-v10.test.ts index c5f70613a..e22ccdb56 100644 --- a/src/storage/events/pgboss/upgrade-v10.test.ts +++ b/src/storage/events/pgboss/upgrade-v10.test.ts @@ -21,25 +21,35 @@ vi.mock('@internal/monitoring', () => ({ }, })) -vi.mock('@internal/queue', () => ({ - PG_BOSS_SCHEMA: 'storage', - Queue: { - getInstance: () => ({ - getQueues: mockGetQueues, - }), - }, -})) +vi.mock('@internal/queue', async () => { + const { SYSTEM_TENANT, SYSTEM_TENANT_REF } = await vi.importActual< + typeof import('@internal/queue/constants') + >('@internal/queue/constants') + + return { + PG_BOSS_SCHEMA: 'storage', + SYSTEM_TENANT, + SYSTEM_TENANT_REF, + Queue: { + getInstance: () => ({ + getQueues: mockGetQueues, + }), + }, + } +}) vi.mock('../base-event', () => ({ BaseEvent: class {}, })) +import { SYSTEM_TENANT, SYSTEM_TENANT_REF } from '@internal/queue' import { UpgradePgBossV10 } from './upgrade-v10' function makeJob(overrides?: Partial>) { return { data: { sbReqId: 'sb-req-123', + tenant: SYSTEM_TENANT, }, ...overrides, } @@ -80,6 +90,7 @@ describe('UpgradePgBossV10.handle', () => { expect.objectContaining({ type: 'pgboss', error, + project: SYSTEM_TENANT_REF, sbReqId: 'sb-req-123', }) ) diff --git a/src/storage/events/pgboss/upgrade-v10.ts b/src/storage/events/pgboss/upgrade-v10.ts index 4ba8db593..e9369560b 100644 --- a/src/storage/events/pgboss/upgrade-v10.ts +++ b/src/storage/events/pgboss/upgrade-v10.ts @@ -1,6 +1,6 @@ import { multitenantKnex } from '@internal/database' import { logger, logSchema } from '@internal/monitoring' -import { BasePayload, PG_BOSS_SCHEMA, Queue } from '@internal/queue' +import { BasePayload, PG_BOSS_SCHEMA, Queue, SYSTEM_TENANT_REF } from '@internal/queue' import { Job, Queue as PgBossQueue, SendOptions, WorkOptions } from 'pg-boss' import { BaseEvent } from '../base-event' @@ -70,7 +70,7 @@ export class UpgradePgBossV10 extends BaseEvent { output, policy ) - SELECT + SELECT id, name, priority, @@ -98,6 +98,7 @@ export class UpgradePgBossV10 extends BaseEvent { logSchema.error(logger, '[PgBoss] Error while copying jobs', { type: 'pgboss', error, + project: job.data.tenant?.ref || SYSTEM_TENANT_REF, sbReqId, }) }