From 6f88dce19e0c1b1ceb32d0bf1961a9dfb4d72894 Mon Sep 17 00:00:00 2001 From: Pavel Safronov Date: Wed, 3 Dec 2025 12:41:07 -0800 Subject: [PATCH 1/4] fix(NODE-7308): replace process.nextTick with queueMicrotask --- src/cmap/connection_pool.ts | 12 ++++++------ src/gridfs/upload.ts | 16 ++++++++-------- src/sdam/monitor.ts | 2 +- src/sdam/server.ts | 2 +- src/sdam/topology.ts | 2 +- test/tools/runner/ee_checker.ts | 2 +- test/tools/utils.ts | 2 +- ...rv_records_for_mongos_discovery.prose.test.ts | 4 ++-- test/unit/cmap/connect.test.ts | 2 +- 9 files changed, 22 insertions(+), 22 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 3f16c719300..0df9f5d91bc 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -231,7 +231,7 @@ export class ConnectionPool extends TypedEventEmitter { this.mongoLogger = this.server.topology.client?.mongoLogger; this.component = 'connection'; - process.nextTick(() => { + queueMicrotask(() => { this.emitAndLog(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionPoolCreatedEvent(this)); }); } @@ -342,7 +342,7 @@ export class ConnectionPool extends TypedEventEmitter { }); this.waitQueue.push(waitQueueMember); - process.nextTick(() => this.processWaitQueue()); + queueMicrotask(() => this.processWaitQueue()); try { timeout?.throwIfExpired(); @@ -405,7 +405,7 @@ export class ConnectionPool extends TypedEventEmitter { this.destroyConnection(connection, reason); } - process.nextTick(() => this.processWaitQueue()); + queueMicrotask(() => this.processWaitQueue()); } /** @@ -461,7 +461,7 @@ export class ConnectionPool extends TypedEventEmitter { } if (interruptInUseConnections) { - process.nextTick(() => this.interruptInUseConnections(oldGeneration)); + queueMicrotask(() => this.interruptInUseConnections(oldGeneration)); } this.processWaitQueue(); @@ -702,7 +702,7 @@ export class ConnectionPool extends TypedEventEmitter { this.createConnection((err, connection) => { if (!err && connection) { this.connections.push(connection); - process.nextTick(() => this.processWaitQueue()); + queueMicrotask(() => this.processWaitQueue()); } if (this.poolState === PoolState.ready) { clearTimeout(this.minPoolSizeTimer); @@ -809,7 +809,7 @@ export class ConnectionPool extends TypedEventEmitter { waitQueueMember.resolve(connection); } } - process.nextTick(() => this.processWaitQueue()); + queueMicrotask(() => this.processWaitQueue()); }); } this.processingWaitQueue = false; diff --git a/src/gridfs/upload.ts b/src/gridfs/upload.ts index bd1449da4a1..9c1b2659017 100644 --- a/src/gridfs/upload.ts +++ b/src/gridfs/upload.ts @@ -165,7 +165,7 @@ export class GridFSBucketWriteStream extends Writable { } ); } else { - return process.nextTick(callback); + return queueMicrotask(callback); } } @@ -188,7 +188,7 @@ export class GridFSBucketWriteStream extends Writable { /** @internal */ override _final(callback: (error?: Error | null) => void): void { if (this.state.streamEnd) { - return process.nextTick(callback); + return queueMicrotask(callback); } this.state.streamEnd = true; writeRemnant(this, callback); @@ -220,11 +220,11 @@ export class GridFSBucketWriteStream extends Writable { function handleError(stream: GridFSBucketWriteStream, error: Error, callback: Callback): void { if (stream.state.errored) { - process.nextTick(callback); + queueMicrotask(callback); return; } stream.state.errored = true; - process.nextTick(callback, error); + queueMicrotask(() => callback(error)); } function createChunkDoc(filesId: ObjectId, n: number, data: Buffer): GridFSChunk { @@ -283,7 +283,7 @@ async function checkChunksIndex(stream: GridFSBucketWriteStream): Promise function checkDone(stream: GridFSBucketWriteStream, callback: Callback): void { if (stream.done) { - return process.nextTick(callback); + return queueMicrotask(callback); } if (stream.state.streamEnd && stream.state.outstandingRequests === 0 && !stream.state.errored) { @@ -327,7 +327,7 @@ function checkDone(stream: GridFSBucketWriteStream, callback: Callback): void { return; } - process.nextTick(callback); + queueMicrotask(callback); } async function checkIndexes(stream: GridFSBucketWriteStream): Promise { @@ -425,7 +425,7 @@ function doWrite( if (stream.pos + inputBuf.length < stream.chunkSizeBytes) { inputBuf.copy(stream.bufToStore, stream.pos); stream.pos += inputBuf.length; - process.nextTick(callback); + queueMicrotask(callback); return; } @@ -530,7 +530,7 @@ function writeRemnant(stream: GridFSBucketWriteStream, callback: Callback): void function isAborted(stream: GridFSBucketWriteStream, callback: Callback): boolean { if (stream.state.aborted) { - process.nextTick(callback, new MongoAPIError('Stream has been aborted')); + queueMicrotask(() => callback(new MongoAPIError('Stream has been aborted'))); return true; } return false; diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index f0e574ff7e2..e5b4a32a849 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -426,7 +426,7 @@ function checkServer(monitor: Monitor, callback: Callback) { function monitorServer(monitor: Monitor) { return (callback: Callback) => { if (monitor.s.state === STATE_MONITORING) { - process.nextTick(callback); + queueMicrotask(callback); return; } stateTransition(monitor, STATE_MONITORING); diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 028dad80ffa..29b41a5958d 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -414,7 +414,7 @@ export class Server extends TypedEventEmitter { error.addErrorLabel(MongoErrorLabel.ResetPool); } markServerUnknown(this, error); - process.nextTick(() => this.requestCheck()); + queueMicrotask(() => this.requestCheck()); return; } diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index eba356b020e..74cf1a5e7eb 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -1076,7 +1076,7 @@ function processWaitQueue(topology: Topology) { if (topology.waitQueue.length > 0) { // ensure all server monitors attempt monitoring soon for (const [, server] of topology.s.servers) { - process.nextTick(function scheduleServerCheck() { + queueMicrotask(function scheduleServerCheck() { return server.requestCheck(); }); } diff --git a/test/tools/runner/ee_checker.ts b/test/tools/runner/ee_checker.ts index 3a92264d356..aa2146b2b7c 100644 --- a/test/tools/runner/ee_checker.ts +++ b/test/tools/runner/ee_checker.ts @@ -9,7 +9,7 @@ events.EventEmitter = class RequireErrorListenerEventEmitter extends EventEmitte super(...args); const ctorCallSite = new Error('EventEmitter must add an error listener synchronously'); ctorCallSite.stack; - process.nextTick(() => { + queueMicrotask(() => { const isChangeStream = this.constructor.name .toLowerCase() .includes('ChangeStream'.toLowerCase()); diff --git a/test/tools/utils.ts b/test/tools/utils.ts index d092fe2eca6..555ec5b6d3a 100644 --- a/test/tools/utils.ts +++ b/test/tools/utils.ts @@ -166,7 +166,7 @@ export const sleep = promisify(setTimeout); * If you are using sinon fake timers, it can end up blocking queued IO from running * awaiting a nextTick call will allow the event loop to process Networking/FS callbacks */ -export const processTick = () => new Promise(resolve => process.nextTick(resolve)); +export const processTick = () => new Promise(resolve => queueMicrotask(resolve)); export function getIndicesOfAuthInUrl(connectionString: string | string[]) { const doubleSlashIndex = connectionString.indexOf('//'); diff --git a/test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts b/test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts index 258e1922fce..971d4da6d3b 100644 --- a/test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts +++ b/test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts @@ -130,7 +130,7 @@ describe('Polling Srv Records for Mongos Discovery', () => { expect(topology.description).to.have.property('type', TopologyType.Sharded); const servers = Array.from(topology.description.servers.keys()); expect(servers).to.deep.equal(srvAddresses(recordSets[0])); - process.nextTick(() => srvPoller.trigger(recordSets[1])); + queueMicrotask(() => srvPoller.trigger(recordSets[1])); await once(topology, 'topologyDescriptionChanged'); @@ -296,7 +296,7 @@ describe('Polling Srv Records for Mongos Discovery', () => { const callback = args[args.length - 1] as (err: null, address: string, family: 4) => void; if (hostname.includes('test.mock.test.build.10gen.cc')) { - return process.nextTick(callback, null, '127.0.0.1', 4); + return queueMicrotask(() => callback(null, '127.0.0.1', 4)); } const { wrappedMethod: lookup } = lookupStub; diff --git a/test/unit/cmap/connect.test.ts b/test/unit/cmap/connect.test.ts index a3e8f7dac36..7247d7ceb8c 100644 --- a/test/unit/cmap/connect.test.ts +++ b/test/unit/cmap/connect.test.ts @@ -157,7 +157,7 @@ describe('Connect Tests', function () { const cancellationToken = new CancellationToken(); // Make sure the cancel listener is added before emitting cancel cancellationToken.addListener('newListener', () => { - process.nextTick(() => { + queueMicrotask(() => { cancellationToken.emit('cancel'); }); }); From 4565698ae1104ce05a2516d4306d2e13d485f9d3 Mon Sep 17 00:00:00 2001 From: Pavel Safronov Date: Fri, 5 Dec 2025 09:59:11 -0800 Subject: [PATCH 2/4] relax constraint in session prose test --- test/integration/sessions/sessions.prose.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/integration/sessions/sessions.prose.test.ts b/test/integration/sessions/sessions.prose.test.ts index 759faaa7901..5df7ffea9ee 100644 --- a/test/integration/sessions/sessions.prose.test.ts +++ b/test/integration/sessions/sessions.prose.test.ts @@ -107,8 +107,8 @@ describe('Sessions Prose Tests', () => { expect(allResults).to.have.lengthOf(operations.length); expect(events).to.have.lengthOf(operations.length); - // This is a guarantee in node, unless you are performing a transaction (which is not being done in this test) - expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex')))).to.have.lengthOf(1); + const uniqueSessionIds = new Set(events.map(ev => ev.command.lsid.id.toString('hex'))); + expect(uniqueSessionIds).to.have.length.lessThanOrEqual(2); }); }); From 6282a0df6b53d2a3d7e0451ddd18be35f9a5ec81 Mon Sep 17 00:00:00 2001 From: Pavel Safronov Date: Fri, 5 Dec 2025 11:19:53 -0800 Subject: [PATCH 3/4] use setTimeout for test util's processTick --- test/tools/utils.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/tools/utils.ts b/test/tools/utils.ts index 555ec5b6d3a..4748245b239 100644 --- a/test/tools/utils.ts +++ b/test/tools/utils.ts @@ -164,9 +164,9 @@ export const sleep = promisify(setTimeout); /** * If you are using sinon fake timers, it can end up blocking queued IO from running - * awaiting a nextTick call will allow the event loop to process Networking/FS callbacks + * awaiting a setTimeout call will allow the event loop to process Networking/FS callbacks */ -export const processTick = () => new Promise(resolve => queueMicrotask(resolve)); +export const processTick = () => new Promise(resolve => setTimeout(resolve, 0)); export function getIndicesOfAuthInUrl(connectionString: string | string[]) { const doubleSlashIndex = connectionString.indexOf('//'); From 25a994ca001de3d66c06f25cde9397a90fc20c6a Mon Sep 17 00:00:00 2001 From: Pavel Safronov Date: Mon, 8 Dec 2025 09:30:43 -0800 Subject: [PATCH 4/4] add comments explaining the reason for this change and why the original test worked --- test/integration/sessions/sessions.prose.test.ts | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/test/integration/sessions/sessions.prose.test.ts b/test/integration/sessions/sessions.prose.test.ts index 5df7ffea9ee..0595bfb3ad4 100644 --- a/test/integration/sessions/sessions.prose.test.ts +++ b/test/integration/sessions/sessions.prose.test.ts @@ -107,8 +107,18 @@ describe('Sessions Prose Tests', () => { expect(allResults).to.have.lengthOf(operations.length); expect(events).to.have.lengthOf(operations.length); + // Previous version of this test was too strict: we were expecting that only one session be used for this scenario. + // That was possible at the time because the operations were simple enough and the server fast enough that the operations would complete serially. + // + // However, with a more complex operation bulkWrite (like `Array.from({ length: 100_000 }).map(() => ({ insertOne: { document: { a: 1 } } })),`), + // it's entirely possible and expected that bulkWrite would introduce a second session due to the time it takes to process all the inserts. + // + // The important bit of the test is that the number of sessions is less than the number of concurrent operations, so now instead of expecting exactly 1 session, + // we just expect less than operations.length - 1 sessions. + // const uniqueSessionIds = new Set(events.map(ev => ev.command.lsid.id.toString('hex'))); - expect(uniqueSessionIds).to.have.length.lessThanOrEqual(2); + const expectedMaxSessions = operations.length - 1; + expect(uniqueSessionIds).to.have.length.lessThan(expectedMaxSessions); }); });