Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/cmap/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ import {
/** @public */
export type Stream = Socket | TLSSocket;

function applyBackpressureLabels(error: MongoError) {
error.addErrorLabel(MongoErrorLabel.SystemOverloadedError);
error.addErrorLabel(MongoErrorLabel.RetryableError);
}

export async function connect(options: ConnectionOptions): Promise<Connection> {
let connection: Connection | null = null;
try {
Expand Down Expand Up @@ -103,6 +108,8 @@ export async function performInitialHandshake(
const authContext = new AuthContext(conn, credentials, options);
conn.authContext = authContext;

// If we encounter an error preparing the handshake document, do NOT apply backpressure labels. Errors
// encountered building the handshake document are all client-side, and do not indicate an overloaded server.
const handshakeDoc = await prepareHandshakeDocument(authContext);

// @ts-expect-error: TODO(NODE-5141): The options need to be filtered properly, Connection options differ from Command options
Expand Down Expand Up @@ -163,12 +170,15 @@ export async function performInitialHandshake(
try {
await provider.auth(authContext);
} catch (error) {
// NOTE: If we encounter an error authenticating a connection, do NOT apply backpressure labels.

if (error instanceof MongoError) {
error.addErrorLabel(MongoErrorLabel.HandshakeError);
if (needsRetryableWriteLabel(error, response.maxWireVersion, conn.description.type)) {
error.addErrorLabel(MongoErrorLabel.RetryableWriteError);
}
}

throw error;
}
}
Expand All @@ -189,6 +199,11 @@ export async function performInitialHandshake(
if (error instanceof MongoError) {
error.addErrorLabel(MongoErrorLabel.HandshakeError);
}
// If we encounter a network error executing the initial handshake, apply backpressure labels.
if (error instanceof MongoNetworkError) {
applyBackpressureLabels(error);
}

throw error;
}
}
Expand Down Expand Up @@ -424,6 +439,10 @@ export async function makeSocket(options: MakeConnectionOptions): Promise<Stream
socket = await connectedSocket;
return socket;
} catch (error) {
// If we encounter an error while establishing a socket, apply the backpressure labels to it. We cannot
// differentiate between DNS, TLS errors and network errors without refactoring our connection establishment to
// handle all three steps separately.
applyBackpressureLabels(error);
socket.destroy();
throw error;
} finally {
Expand Down
4 changes: 3 additions & 1 deletion src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ export const MongoErrorLabel = Object.freeze({
ResetPool: 'ResetPool',
PoolRequestedRetry: 'PoolRequestedRetry',
InterruptInUseConnections: 'InterruptInUseConnections',
NoWritesPerformed: 'NoWritesPerformed'
NoWritesPerformed: 'NoWritesPerformed',
RetryableError: 'RetryableError',
SystemOverloadedError: 'SystemOverloadedError'
} as const);

/** @public */
Expand Down
9 changes: 7 additions & 2 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,8 @@ export class Server extends TypedEventEmitter<ServerEvents> {
error instanceof MongoNetworkError && !(error instanceof MongoNetworkTimeoutError);
const isNetworkTimeoutBeforeHandshakeError =
error instanceof MongoNetworkError && error.beforeHandshake;
const isAuthHandshakeError = error.hasErrorLabel(MongoErrorLabel.HandshakeError);
const isAuthOrEstablishmentHandshakeError = error.hasErrorLabel(MongoErrorLabel.HandshakeError);
const isSystemOverloadError = error.hasErrorLabel(MongoErrorLabel.SystemOverloadedError);

// Perhaps questionable and divergent from the spec, but considering MongoParseErrors like state change errors was legacy behavior.
if (isStateChangeError(error) || error instanceof MongoParseError) {
Expand All @@ -424,8 +425,12 @@ export class Server extends TypedEventEmitter<ServerEvents> {
} else if (
isNetworkNonTimeoutError ||
isNetworkTimeoutBeforeHandshakeError ||
isAuthHandshakeError
isAuthOrEstablishmentHandshakeError
) {
// Do NOT clear the pool if we encounter a system overloaded error.
if (isSystemOverloadError) {
return;
}
// from the SDAM spec: The driver MUST synchronize clearing the pool with updating the topology.
// In load balanced mode: there is no monitoring, so there is no topology to update. We simply clear the pool.
// For other topologies: the `ResetPool` label instructs the topology to clear the server's pool in `updateServer()`.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import { expect } from 'chai';
import { once } from 'events';

import { type MongoClient } from '../../../src';
import {
type ConnectionCheckOutFailedEvent,
type ConnectionPoolClearedEvent,
type MongoClient
} from '../../../src';
import {
CONNECTION_POOL_CLEARED,
CONNECTION_POOL_READY,
SERVER_HEARTBEAT_FAILED,
SERVER_HEARTBEAT_SUCCEEDED
} from '../../../src/constants';
import { sleep } from '../../tools/utils';

describe('Server Discovery and Monitoring Prose Tests', function () {
context('Monitors sleep at least minHeartbeatFrequencyMS between checks', function () {
Expand Down Expand Up @@ -187,4 +192,93 @@ describe('Server Discovery and Monitoring Prose Tests', function () {
}
});
});

context('Connection Pool Backpressure', function () {
let client: MongoClient;
const checkoutFailedEvents: Array<ConnectionCheckOutFailedEvent> = [];
const poolClearedEvents: Array<ConnectionPoolClearedEvent> = [];

beforeEach(async function () {
// 1. Create a test client that listens to CMAP events, with maxConnecting=100.
client = this.configuration.newClient({}, { maxConnecting: 100 });

client.on('connectionCheckOutFailed', e => checkoutFailedEvents.push(e));
client.on('connectionPoolCleared', e => poolClearedEvents.push(e));

await client.connect();

// 2. Run the following commands to set up the rate limiter.
// ```python
// client.admin.command("setParameter", 1, ingressConnectionEstablishmentRateLimiterEnabled=True)
// client.admin.command("setParameter", 1, ingressConnectionEstablishmentRatePerSec=20)
// client.admin.command("setParameter", 1, ingressConnectionEstablishmentBurstCapacitySecs=1)
// client.admin.command("setParameter", 1, ingressConnectionEstablishmentMaxQueueDepth=1)
// ```
const admin = client.db('admin').admin();
await admin.command({
setParameter: 1,
ingressConnectionEstablishmentRateLimiterEnabled: true
});
await admin.command({
setParameter: 1,
ingressConnectionEstablishmentRatePerSec: 20
});
await admin.command({
setParameter: 1,
ingressConnectionEstablishmentBurstCapacitySecs: 1
});
await admin.command({
setParameter: 1,
ingressConnectionEstablishmentMaxQueueDepth: 1
});

// 3. Add a document to the test collection so that the sleep operations will actually block:
// `client.test.test.insert_one({})`.
await client.db('test').collection('test').insertOne({});
});

afterEach(async function () {
// 7. Sleep for 1 second to clear the rate limiter.
await sleep(1000);

// 8. Ensure that the following command runs at test teardown even if the test fails.
// `client.admin("setParameter", 1, ingressConnectionEstablishmentRateLimiterEnabled=False)`.
const admin = client.db('admin').admin();
await admin.command({
setParameter: 1,
ingressConnectionEstablishmentRateLimiterEnabled: false
});

await client.close();
});

it(
'does not clear the pool when connections are closed due to connection storms',
{
requires: {
// This test requires MongoDB 7.0+.
mongodb: '>=7.0' // rate limiting added in 7.0
}
},
async function () {
// 4. Run the following find command on the collection in 100 parallel threads/coroutines. Run these commands concurrently
// but block on their completion, and ignore errors raised by the command.
// `client.test.test.find_one({"$where": "function() { sleep(2000); return true; }})`
await Promise.allSettled(
Array.from({ length: 100 }).map(() =>
client
.db('test')
.collection('test')
.findOne({ $where: 'function() { sleep(2000); return true; }' })
)
);

// 5. Assert that at least 10 `ConnectionCheckOutFailedEvent` occurred.
expect(checkoutFailedEvents.length).to.be.at.least(10);

// 6. Assert that 0 `PoolClearedEvent` occurred.
expect(poolClearedEvents).to.be.empty;
}
);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"isMaster",
"hello"
],
"closeConnection": true,
"errorCode": 91,
"appName": "poolCreateMinSizeErrorTest"
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ failPoint:
mode: { times: 50 }
data:
failCommands: ["isMaster","hello"]
closeConnection: true
errorCode: 91
appName: "poolCreateMinSizeErrorTest"
poolOptions:
minPoolSize: 1
Expand Down
8 changes: 4 additions & 4 deletions test/spec/load-balancers/sdam-error-handling.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"description": "state change errors are correctly handled",
"schemaVersion": "1.3",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"topologies": [
Expand Down Expand Up @@ -263,7 +263,7 @@
"description": "errors during the initial connection hello are ignored",
"runOnRequirements": [
{
"minServerVersion": "4.9"
"minServerVersion": "4.4.7"
}
],
"operations": [
Expand All @@ -282,7 +282,7 @@
"isMaster",
"hello"
],
"closeConnection": true,
"errorCode": 11600,
"appName": "lbSDAMErrorTestClient"
}
}
Expand All @@ -297,7 +297,7 @@
}
},
"expectError": {
"isClientError": true
"isError": true
}
}
],
Expand Down
11 changes: 5 additions & 6 deletions test/spec/load-balancers/sdam-error-handling.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
description: state change errors are correctly handled

schemaVersion: '1.3'
schemaVersion: '1.4'

runOnRequirements:
- topologies: [ load-balanced ]
Expand Down Expand Up @@ -141,9 +141,8 @@ tests:
# to the same mongos on which the failpoint is set.
- description: errors during the initial connection hello are ignored
runOnRequirements:
# Server version 4.9+ is needed to set a fail point on the initial
# connection handshake with the appName filter due to SERVER-49336.
- minServerVersion: '4.9'
# Require SERVER-49336 for failCommand + appName on the initial handshake.
- minServerVersion: '4.4.7'
operations:
- name: failPoint
object: testRunner
Expand All @@ -154,14 +153,14 @@ tests:
mode: { times: 1 }
data:
failCommands: [isMaster, hello]
closeConnection: true
errorCode: 11600
appName: *singleClientAppName
- name: insertOne
object: *singleColl
arguments:
document: { x: 1 }
expectError:
isClientError: true
isError: true
expectEvents:
- client: *singleClient
eventType: cmap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,22 @@
"outcome": {
"servers": {
"a:27017": {
"type": "Unknown",
"topologyVersion": null,
"type": "RSPrimary",
"setName": "rs",
"topologyVersion": {
"processId": {
"$oid": "000000000000000000000001"
},
"counter": {
"$numberLong": "1"
}
},
"pool": {
"generation": 1
"generation": 0
}
}
},
"topologyType": "ReplicaSetNoPrimary",
"topologyType": "ReplicaSetWithPrimary",
"logicalSessionTimeoutMinutes": null,
"setName": "rs"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,4 @@ phases:
when: beforeHandshakeCompletes
maxWireVersion: 9
type: timeout
outcome:
servers:
a:27017:
type: Unknown
topologyVersion: null
pool:
generation: 1
topologyType: ReplicaSetNoPrimary
logicalSessionTimeoutMinutes: null
setName: rs
outcome: *outcome
Loading