Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
this.mongoLogger = this.server.topology.client?.mongoLogger;
this.component = 'connection';

process.nextTick(() => {
queueMicrotask(() => {
this.emitAndLog(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionPoolCreatedEvent(this));
});
}
Expand Down Expand Up @@ -342,7 +342,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
});

this.waitQueue.push(waitQueueMember);
process.nextTick(() => this.processWaitQueue());
queueMicrotask(() => this.processWaitQueue());

try {
timeout?.throwIfExpired();
Expand Down Expand Up @@ -405,7 +405,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
this.destroyConnection(connection, reason);
}

process.nextTick(() => this.processWaitQueue());
queueMicrotask(() => this.processWaitQueue());
}

/**
Expand Down Expand Up @@ -461,7 +461,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}

if (interruptInUseConnections) {
process.nextTick(() => this.interruptInUseConnections(oldGeneration));
queueMicrotask(() => this.interruptInUseConnections(oldGeneration));
}

this.processWaitQueue();
Expand Down Expand Up @@ -702,7 +702,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
this.createConnection((err, connection) => {
if (!err && connection) {
this.connections.push(connection);
process.nextTick(() => this.processWaitQueue());
queueMicrotask(() => this.processWaitQueue());
}
if (this.poolState === PoolState.ready) {
clearTimeout(this.minPoolSizeTimer);
Expand Down Expand Up @@ -809,7 +809,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
waitQueueMember.resolve(connection);
}
}
process.nextTick(() => this.processWaitQueue());
queueMicrotask(() => this.processWaitQueue());
});
}
this.processingWaitQueue = false;
Expand Down
16 changes: 8 additions & 8 deletions src/gridfs/upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ export class GridFSBucketWriteStream extends Writable {
}
);
} else {
return process.nextTick(callback);
return queueMicrotask(callback);
}
}

Expand All @@ -188,7 +188,7 @@ export class GridFSBucketWriteStream extends Writable {
/** @internal */
override _final(callback: (error?: Error | null) => void): void {
if (this.state.streamEnd) {
return process.nextTick(callback);
return queueMicrotask(callback);
}
this.state.streamEnd = true;
writeRemnant(this, callback);
Expand Down Expand Up @@ -220,11 +220,11 @@ export class GridFSBucketWriteStream extends Writable {

function handleError(stream: GridFSBucketWriteStream, error: Error, callback: Callback): void {
if (stream.state.errored) {
process.nextTick(callback);
queueMicrotask(callback);
return;
}
stream.state.errored = true;
process.nextTick(callback, error);
queueMicrotask(() => callback(error));
}

function createChunkDoc(filesId: ObjectId, n: number, data: Buffer): GridFSChunk {
Expand Down Expand Up @@ -283,7 +283,7 @@ async function checkChunksIndex(stream: GridFSBucketWriteStream): Promise<void>

function checkDone(stream: GridFSBucketWriteStream, callback: Callback): void {
if (stream.done) {
return process.nextTick(callback);
return queueMicrotask(callback);
}

if (stream.state.streamEnd && stream.state.outstandingRequests === 0 && !stream.state.errored) {
Expand Down Expand Up @@ -327,7 +327,7 @@ function checkDone(stream: GridFSBucketWriteStream, callback: Callback): void {
return;
}

process.nextTick(callback);
queueMicrotask(callback);
}

async function checkIndexes(stream: GridFSBucketWriteStream): Promise<void> {
Expand Down Expand Up @@ -425,7 +425,7 @@ function doWrite(
if (stream.pos + inputBuf.length < stream.chunkSizeBytes) {
inputBuf.copy(stream.bufToStore, stream.pos);
stream.pos += inputBuf.length;
process.nextTick(callback);
queueMicrotask(callback);
return;
}

Expand Down Expand Up @@ -530,7 +530,7 @@ function writeRemnant(stream: GridFSBucketWriteStream, callback: Callback): void

function isAborted(stream: GridFSBucketWriteStream, callback: Callback<void>): boolean {
if (stream.state.aborted) {
process.nextTick(callback, new MongoAPIError('Stream has been aborted'));
queueMicrotask(() => callback(new MongoAPIError('Stream has been aborted')));
return true;
}
return false;
Expand Down
2 changes: 1 addition & 1 deletion src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
function monitorServer(monitor: Monitor) {
return (callback: Callback) => {
if (monitor.s.state === STATE_MONITORING) {
process.nextTick(callback);
queueMicrotask(callback);
return;
}
stateTransition(monitor, STATE_MONITORING);
Expand Down
2 changes: 1 addition & 1 deletion src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
error.addErrorLabel(MongoErrorLabel.ResetPool);
}
markServerUnknown(this, error);
process.nextTick(() => this.requestCheck());
queueMicrotask(() => this.requestCheck());
return;
}

Expand Down
2 changes: 1 addition & 1 deletion src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1076,7 +1076,7 @@ function processWaitQueue(topology: Topology) {
if (topology.waitQueue.length > 0) {
// ensure all server monitors attempt monitoring soon
for (const [, server] of topology.s.servers) {
process.nextTick(function scheduleServerCheck() {
queueMicrotask(function scheduleServerCheck() {
return server.requestCheck();
});
}
Expand Down
14 changes: 12 additions & 2 deletions test/integration/sessions/sessions.prose.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,18 @@ describe('Sessions Prose Tests', () => {
expect(allResults).to.have.lengthOf(operations.length);
expect(events).to.have.lengthOf(operations.length);

// This is a guarantee in node, unless you are performing a transaction (which is not being done in this test)
expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex')))).to.have.lengthOf(1);
// Previous version of this test was too strict: we were expecting that only one session be used for this scenario.
// That was possible at the time because the operations were simple enough and the server fast enough that the operations would complete serially.
//
// However, with a more complex operation bulkWrite (like `Array.from({ length: 100_000 }).map(() => ({ insertOne: { document: { a: 1 } } })),`),
// it's entirely possible and expected that bulkWrite would introduce a second session due to the time it takes to process all the inserts.
//
// The important bit of the test is that the number of sessions is less than the number of concurrent operations, so now instead of expecting exactly 1 session,
// we just expect less than operations.length - 1 sessions.
//
const uniqueSessionIds = new Set(events.map(ev => ev.command.lsid.id.toString('hex')));
const expectedMaxSessions = operations.length - 1;
expect(uniqueSessionIds).to.have.length.lessThan(expectedMaxSessions);
});
});

Expand Down
2 changes: 1 addition & 1 deletion test/tools/runner/ee_checker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ events.EventEmitter = class RequireErrorListenerEventEmitter extends EventEmitte
super(...args);
const ctorCallSite = new Error('EventEmitter must add an error listener synchronously');
ctorCallSite.stack;
process.nextTick(() => {
queueMicrotask(() => {
const isChangeStream = this.constructor.name
.toLowerCase()
.includes('ChangeStream'.toLowerCase());
Expand Down
4 changes: 2 additions & 2 deletions test/tools/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,9 @@ export const sleep = promisify(setTimeout);

/**
* If you are using sinon fake timers, it can end up blocking queued IO from running
* awaiting a nextTick call will allow the event loop to process Networking/FS callbacks
* awaiting a setTimeout call will allow the event loop to process Networking/FS callbacks
*/
export const processTick = () => new Promise(resolve => process.nextTick(resolve));
export const processTick = () => new Promise(resolve => setTimeout(resolve, 0));

export function getIndicesOfAuthInUrl(connectionString: string | string[]) {
const doubleSlashIndex = connectionString.indexOf('//');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ describe('Polling Srv Records for Mongos Discovery', () => {
expect(topology.description).to.have.property('type', TopologyType.Sharded);
const servers = Array.from(topology.description.servers.keys());
expect(servers).to.deep.equal(srvAddresses(recordSets[0]));
process.nextTick(() => srvPoller.trigger(recordSets[1]));
queueMicrotask(() => srvPoller.trigger(recordSets[1]));

await once(topology, 'topologyDescriptionChanged');

Expand Down Expand Up @@ -296,7 +296,7 @@ describe('Polling Srv Records for Mongos Discovery', () => {
const callback = args[args.length - 1] as (err: null, address: string, family: 4) => void;

if (hostname.includes('test.mock.test.build.10gen.cc')) {
return process.nextTick(callback, null, '127.0.0.1', 4);
return queueMicrotask(() => callback(null, '127.0.0.1', 4));
}

const { wrappedMethod: lookup } = lookupStub;
Expand Down
2 changes: 1 addition & 1 deletion test/unit/cmap/connect.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ describe('Connect Tests', function () {
const cancellationToken = new CancellationToken();
// Make sure the cancel listener is added before emitting cancel
cancellationToken.addListener('newListener', () => {
process.nextTick(() => {
queueMicrotask(() => {
cancellationToken.emit('cancel');
});
});
Expand Down