diff --git a/docs/pages/features/_meta.js b/docs/pages/features/_meta.js index 62f1660ca..52e691126 100644 --- a/docs/pages/features/_meta.js +++ b/docs/pages/features/_meta.js @@ -8,4 +8,5 @@ export default { native: 'Native', esm: 'ESM', callbacks: 'Callbacks', + tracing: 'Tracing Channels', } diff --git a/docs/pages/features/tracing.mdx b/docs/pages/features/tracing.mdx new file mode 100644 index 000000000..43caa6ed7 --- /dev/null +++ b/docs/pages/features/tracing.mdx @@ -0,0 +1,229 @@ +--- +title: Tracing Channels +--- + +node-postgres publishes lifecycle events on a set of named tracing channels so +you can instrument queries, connections, and pool activity without +monkey-patching. If you're building an APM integration, custom tracer, or just +want structured logging of your database calls, you can subscribe to these +channels and node-postgres will tell you when things happen. + +Tracing is built on +[`node:diagnostics_channel`](https://nodejs.org/api/diagnostics_channel.html) +and requires Node.js 20+. On older versions or non-Node runtimes it silently +no-ops. When nothing is listening the overhead is zero since every emission site +is guarded by a `hasSubscribers` check. + +## Quick start + +Here's a minimal example that logs every query with its duration: + +```js +import dc from 'node:diagnostics_channel' + +const timings = new WeakMap() +const channel = dc.tracingChannel('pg:query') + +channel.subscribe({ + start(ctx) { + timings.set(ctx, process.hrtime.bigint()) + }, + asyncEnd(ctx) { + const elapsed = Number(process.hrtime.bigint() - timings.get(ctx)) / 1e6 + console.log(`${ctx.query.text} completed in ${elapsed.toFixed(2)}ms`) + timings.delete(ctx) + }, + error(ctx) { + timings.delete(ctx) + }, + end() {}, + asyncStart() {}, +}) +``` + +You don't need to change how you create clients or pools. Just subscribe to the +channel and node-postgres handles the rest. + +## The channels + +### TracingChannels + +These emit the full lifecycle: `start`, `end`, `asyncStart`, `asyncEnd`, and +`error`. + +| Channel | Fires for | +| --- | --- | +| `pg:query` | Each `client.query()` call | +| `pg:connection` | Each `client.connect()` call | +| `pg:pool:connect` | Each `pool.connect()` call (acquiring a client from the pool) | + +### Plain channels + +These publish a single message. Subscribe with `dc.channel(name).subscribe(cb)`. + +| Channel | Fires for | +| --- | --- | +| `pg:pool:release` | A client is released back to the pool | +| `pg:pool:remove` | A client is removed from the pool | + +### How they fit together + +A typical pooled request moves through the channels in this order: + +```text +pg:pool:connect acquire a client from the pool + └─ pg:connection connect (only if the pool creates a new client) +pg:query execute the query +pg:pool:release release the client back to the pool +``` + +`pg:connection` only fires when the pool has to establish a new connection. For +reused clients it is skipped. `pg:pool:remove` fires separately when a client +is evicted (e.g. after an error or when `maxUses` is exceeded). + +## Lifecycle events + +Each tracing channel exposes five sub-channels that fire in a fixed order +depending on whether the operation completes synchronously or asynchronously: + +- **Synchronous success:** `start` -> `end` +- **Synchronous failure:** `start` -> `error` -> `end` +- **Asynchronous success:** `start` -> `end` -> `asyncStart` -> `asyncEnd` +- **Asynchronous failure:** `start` -> `end` -> `asyncStart` -> `error` -> `asyncEnd` + +In practice queries and connections are always asynchronous, so you'll mostly +work with `start` and `asyncEnd`. The context object is shared across all +events for a single operation, and properties like `result` and `error` are +added as the operation progresses. + +## Context payloads + +### pg:query + +```js +{ + query: { + text: 'SELECT $1::int', // the query text + name: undefined, // prepared statement name, if any + }, + client: { + database: 'mydb', + host: 'localhost', + port: 5432, + user: 'postgres', + processID: 123, // PostgreSQL backend process ID + ssl: false, + }, + // added on asyncEnd: + result: { + rowCount: 1, + command: 'SELECT', + }, +} +``` + +### pg:connection + +```js +{ + connection: { + database: 'mydb', + host: 'localhost', + port: 5432, + user: 'postgres', + ssl: false, + }, +} +``` + +### pg:pool:connect + +```js +{ + pool: { + totalCount: 2, // total clients in the pool + idleCount: 1, // idle clients available + waitingCount: 0, // callers waiting for a client + maxSize: 10, // configured pool maximum + }, + // added on asyncEnd: + client: { + processID: 123, + reused: true, // whether the client was already in the pool + }, +} +``` + +### pg:pool:release + +```js +{ + client: { processID: 123 }, + error: undefined, // the Error passed to release(err), if any +} +``` + +### pg:pool:remove + +```js +{ + client: { processID: 123 }, +} +``` + +## More examples + +### Monitoring pool usage + +```js +import dc from 'node:diagnostics_channel' + +const poolConnect = dc.tracingChannel('pg:pool:connect') +const poolRelease = dc.channel('pg:pool:release') + +poolConnect.subscribe({ + start(ctx) { + console.log('pool checkout:', ctx.pool.idleCount, 'idle,', ctx.pool.waitingCount, 'waiting') + }, + asyncEnd(ctx) { + console.log('checked out client', ctx.client.processID, '(reused:', ctx.client.reused + ')') + }, + error() {}, + end() {}, + asyncStart() {}, +}) + +poolRelease.subscribe((msg) => { + console.log('client', msg.client.processID, 'released') +}) +``` + +### Subscribing to all query events + +```js +import dc from 'node:diagnostics_channel' + +const channel = dc.tracingChannel('pg:query') + +channel.subscribe({ + start(ctx) { + console.log('query started:', ctx.query.text) + }, + asyncEnd(ctx) { + console.log('query completed:', ctx.result.command, ctx.result.rowCount, 'rows') + }, + error(ctx) { + console.error('query failed:', ctx.error) + }, + end() {}, + asyncStart() {}, +}) +``` + +## Notes + +- These channels report observability events. They are not hooks for altering + behavior; mutating a context payload does not change node-postgres internals. +- The plain channels (`pg:pool:release`, `pg:pool:remove`) are not + TracingChannels because they represent point-in-time events with no async + continuation. diff --git a/packages/pg-pool/diagnostics.js b/packages/pg-pool/diagnostics.js new file mode 100644 index 000000000..15adc414d --- /dev/null +++ b/packages/pg-pool/diagnostics.js @@ -0,0 +1,35 @@ +'use strict' + +const noopChannel = { hasSubscribers: false } + +let poolConnectChannel = noopChannel +let poolReleaseChannel = noopChannel +let poolRemoveChannel = noopChannel + +try { + let dc + if (typeof process.getBuiltInModule === 'function') { + dc = process.getBuiltInModule('diagnostics_channel') + } else { + dc = require('diagnostics_channel') + } + if (typeof dc.tracingChannel === 'function') { + poolConnectChannel = dc.tracingChannel('pg:pool:connect') + } + if (typeof dc.channel === 'function') { + poolReleaseChannel = dc.channel('pg:pool:release') + poolRemoveChannel = dc.channel('pg:pool:remove') + } +} catch (e) { + // diagnostics_channel not available (non-Node environment) +} + +// Check explicitly for `false` rather than truthiness because the aggregated +// `hasSubscribers` getter on TracingChannel is `undefined` on Node 18 (which +// backported TracingChannel but not the getter). When `undefined`, we assume +// there may be subscribers and trace unconditionally. +function shouldTrace(channel) { + return channel.hasSubscribers !== false +} + +module.exports = { poolConnectChannel, poolReleaseChannel, poolRemoveChannel, shouldTrace } diff --git a/packages/pg-pool/index.js b/packages/pg-pool/index.js index ab514fa88..02740b967 100644 --- a/packages/pg-pool/index.js +++ b/packages/pg-pool/index.js @@ -1,5 +1,6 @@ 'use strict' const EventEmitter = require('events').EventEmitter +const { poolConnectChannel, poolReleaseChannel, poolRemoveChannel, shouldTrace } = require('./diagnostics') const NOOP = function () {} @@ -178,6 +179,10 @@ class Pool extends EventEmitter { this._clients = this._clients.filter((c) => c !== client) const context = this + if (shouldTrace(poolRemoveChannel)) { + poolRemoveChannel.publish({ client: { processID: client.processID } }) + } + client.end(() => { context.emit('remove', client) @@ -196,6 +201,31 @@ class Pool extends EventEmitter { const response = promisify(this.Promise, cb) const result = response.result + if (shouldTrace(poolConnectChannel)) { + const context = { + pool: { + totalCount: this.totalCount, + idleCount: this.idleCount, + waitingCount: this.waitingCount, + maxSize: this.options.max, + }, + } + const origCb = response.callback + const enrichedCb = (err, client, done) => { + if (client) context.client = { processID: client.processID, reused: !!client._poolUseCount } + return origCb(err, client, done) + } + poolConnectChannel.traceCallback( + (tracedCb) => { + response.callback = tracedCb + }, + 0, + context, + null, + enrichedCb + ) + } + // if we don't have to connect a new client, don't do so if (this._isFull() || this._idle.length) { // if we have idle clients schedule a pulse immediately @@ -388,6 +418,10 @@ class Pool extends EventEmitter { this.emit('release', err, client) + if (shouldTrace(poolReleaseChannel)) { + poolReleaseChannel.publish({ client: { processID: client.processID }, error: err || undefined }) + } + // TODO(bmc): expose a proper, public interface _queryable and _ending if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses) { if (client._poolUseCount >= this.options.maxUses) { diff --git a/packages/pg-pool/package.json b/packages/pg-pool/package.json index 6b9f60155..a075de4fe 100644 --- a/packages/pg-pool/package.json +++ b/packages/pg-pool/package.json @@ -45,6 +45,7 @@ }, "files": [ "index.js", + "diagnostics.js", "esm" ] } diff --git a/packages/pg-pool/test/diagnostics.js b/packages/pg-pool/test/diagnostics.js new file mode 100644 index 000000000..1038a1300 --- /dev/null +++ b/packages/pg-pool/test/diagnostics.js @@ -0,0 +1,197 @@ +'use strict' + +const expect = require('expect.js') +const EventEmitter = require('events').EventEmitter +const describe = require('mocha').describe +const it = require('mocha').it +const dc = require('diagnostics_channel') +const Pool = require('../') + +// TracingChannel exists on Node 18+ but the aggregated hasSubscribers getter +// and stable unsubscribe behavior require Node 19.9+/20.5+. Skip tracing +// tests on older versions where TracingChannel is missing or has internal bugs. +const hasStableTracingChannel = + typeof dc.tracingChannel === 'function' && typeof dc.tracingChannel('pg:pool:test:probe').hasSubscribers === 'boolean' + +function mockClient(methods) { + return function () { + const client = new EventEmitter() + client.end = function (cb) { + if (cb) process.nextTick(cb) + } + client._queryable = true + client._ending = false + client.processID = 12345 + Object.assign(client, methods) + return client + } +} + +describe('diagnostics channels', function () { + describe('pg:pool:connect', function () { + ;(hasStableTracingChannel ? it : it.skip)('publishes start event when connect is called', function (done) { + const pool = new Pool({ + Client: mockClient({ + connect: function (cb) { + process.nextTick(() => cb(null)) + }, + }), + }) + + let capturedContext + const channel = dc.tracingChannel('pg:pool:connect') + const subs = { + start: (ctx) => { + capturedContext = ctx + }, + end: () => {}, + asyncStart: () => {}, + asyncEnd: () => {}, + error: () => {}, + } + + channel.subscribe(subs) + + pool.connect(function (err, client, release) { + if (err) return done(err) + release() + pool.end(() => { + expect(capturedContext).to.be.ok() + expect(capturedContext.pool).to.be.ok() + expect(capturedContext.pool.maxSize).to.be(10) + expect(capturedContext.pool.totalCount).to.be.a('number') + + channel.unsubscribe(subs) + done() + }) + }) + }) + ;(hasStableTracingChannel ? it : it.skip)('enriches context with client info on asyncEnd', function (done) { + const pool = new Pool({ + Client: mockClient({ + connect: function (cb) { + process.nextTick(() => cb(null)) + }, + }), + }) + + const channel = dc.tracingChannel('pg:pool:connect') + const subs = { + start: () => {}, + end: () => {}, + asyncStart: () => {}, + asyncEnd: (ctx) => { + expect(ctx.client).to.be.ok() + expect(ctx.client.processID).to.be(12345) + + channel.unsubscribe(subs) + done() + }, + error: () => {}, + } + + channel.subscribe(subs) + + pool.connect(function (err, client, release) { + if (err) return done(err) + release() + pool.end() + }) + }) + }) + + describe('pg:pool:release', function () { + it('publishes when a client is released', function (done) { + const pool = new Pool({ + Client: mockClient({ + connect: function (cb) { + process.nextTick(() => cb(null)) + }, + }), + }) + + let releaseMessage + const channel = dc.channel('pg:pool:release') + const onMessage = (msg) => { + releaseMessage = msg + } + channel.subscribe(onMessage) + + pool.connect(function (err, client, release) { + if (err) return done(err) + release() + pool.end(() => { + expect(releaseMessage).to.be.ok() + expect(releaseMessage.client).to.be.ok() + expect(releaseMessage.client.processID).to.be(12345) + + channel.unsubscribe(onMessage) + done() + }) + }) + }) + + it('includes error when released with error', function (done) { + const pool = new Pool({ + Client: mockClient({ + connect: function (cb) { + process.nextTick(() => cb(null)) + }, + }), + }) + + let releaseMessage + const channel = dc.channel('pg:pool:release') + const onMessage = (msg) => { + releaseMessage = msg + } + channel.subscribe(onMessage) + + pool.connect(function (err, client, release) { + if (err) return done(err) + const releaseError = new Error('test error') + release(releaseError) + pool.end(() => { + expect(releaseMessage).to.be.ok() + expect(releaseMessage.error).to.be(releaseError) + + channel.unsubscribe(onMessage) + done() + }) + }) + }) + }) + + describe('pg:pool:remove', function () { + it('publishes when a client is removed', function (done) { + const pool = new Pool({ + Client: mockClient({ + connect: function (cb) { + process.nextTick(() => cb(null)) + }, + }), + }) + + let removeMessage + const channel = dc.channel('pg:pool:remove') + const onMessage = (msg) => { + removeMessage = msg + } + channel.subscribe(onMessage) + + pool.connect(function (err, client, release) { + if (err) return done(err) + // release with error to trigger removal + release(new Error('force remove')) + pool.end(() => { + expect(removeMessage).to.be.ok() + expect(removeMessage.client).to.be.ok() + expect(removeMessage.client.processID).to.be(12345) + + channel.unsubscribe(onMessage) + done() + }) + }) + }) + }) +}) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index d6c57194c..4f2cd3fe5 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -9,6 +9,7 @@ const Query = require('./query') const defaults = require('./defaults') const Connection = require('./connection') const crypto = require('./crypto/utils') +const { queryChannel, connectionChannel, shouldTrace } = require('./diagnostics') const activeQueryDeprecationNotice = nodeUtils.deprecate( () => {}, @@ -220,18 +221,30 @@ class Client extends EventEmitter { connect(callback) { if (callback) { - this._connect(callback) + if (shouldTrace(connectionChannel)) { + const context = { + connection: { database: this.database, host: this.host, port: this.port, user: this.user, ssl: !!this.ssl }, + } + connectionChannel.traceCallback((tracedCb) => this._connect(tracedCb), 0, context, null, callback) + } else { + this._connect(callback) + } return } return new this._Promise((resolve, reject) => { - this._connect((error) => { - if (error) { - reject(error) - } else { - resolve(this) + const callback = (error) => { + if (error) reject(error) + else resolve(this) + } + if (shouldTrace(connectionChannel)) { + const context = { + connection: { database: this.database, host: this.host, port: this.port, user: this.user, ssl: !!this.ssl }, } - }) + connectionChannel.traceCallback((tracedCb) => this._connect(tracedCb), 0, context, null, callback) + } else { + this._connect(callback) + } }) } @@ -705,11 +718,42 @@ class Client extends EventEmitter { return result } - if (this._queryQueue.length > 0) { - queryQueueLengthDeprecationNotice() + const enqueue = () => { + if (this._queryQueue.length > 0) queryQueueLengthDeprecationNotice() + this._queryQueue.push(query) + this._pulseQueryQueue() + } + + if (shouldTrace(queryChannel) && query.callback) { + const context = { + query: { text: query.text, name: query.name }, + client: { + database: this.database, + host: this.host, + port: this.port, + user: this.user, + processID: this.processID, + ssl: !!this.ssl, + }, + } + const origCb = query.callback + const enrichedCb = (err, res) => { + if (res) context.result = { rowCount: res.rowCount, command: res.command } + return origCb(err, res) + } + queryChannel.traceCallback( + (tracedCb) => { + query.callback = tracedCb + enqueue() + }, + 0, + context, + null, + enrichedCb + ) + } else { + enqueue() } - this._queryQueue.push(query) - this._pulseQueryQueue() return result } diff --git a/packages/pg/lib/diagnostics.js b/packages/pg/lib/diagnostics.js new file mode 100644 index 000000000..03449a04b --- /dev/null +++ b/packages/pg/lib/diagnostics.js @@ -0,0 +1,31 @@ +'use strict' + +const noopChannel = { hasSubscribers: false } + +let queryChannel = noopChannel +let connectionChannel = noopChannel + +try { + let dc + if (typeof process.getBuiltInModule === 'function') { + dc = process.getBuiltInModule('diagnostics_channel') + } else { + dc = require('diagnostics_channel') + } + if (typeof dc.tracingChannel === 'function') { + queryChannel = dc.tracingChannel('pg:query') + connectionChannel = dc.tracingChannel('pg:connection') + } +} catch (e) { + // diagnostics_channel not available (non-Node environment) +} + +// Check explicitly for `false` rather than truthiness because the aggregated +// `hasSubscribers` getter on TracingChannel is `undefined` on Node 18 (which +// backported TracingChannel but not the getter). When `undefined`, we assume +// there may be subscribers and trace unconditionally. +function shouldTrace(channel) { + return channel.hasSubscribers !== false +} + +module.exports = { queryChannel, connectionChannel, shouldTrace } diff --git a/packages/pg/test/unit/client/diagnostics-tests.js b/packages/pg/test/unit/client/diagnostics-tests.js new file mode 100644 index 000000000..0db837eec --- /dev/null +++ b/packages/pg/test/unit/client/diagnostics-tests.js @@ -0,0 +1,205 @@ +'use strict' +const helper = require('./test-helper') +const assert = require('assert') +const dc = require('diagnostics_channel') + +// TracingChannel exists on Node 18+ but the aggregated hasSubscribers getter +// and stable unsubscribe behavior require Node 19.9+/20.5+. Skip tests on +// older versions where TracingChannel is missing or has internal bugs. +const hasStableTracingChannel = + typeof dc.tracingChannel === 'function' && typeof dc.tracingChannel('pg:test:probe').hasSubscribers === 'boolean' + +const suite = new helper.Suite() +const test = suite.test.bind(suite) +// pass undefined as callback to skip when TracingChannel is unavailable/unstable +const testTracing = (name, cb) => test(name, hasStableTracingChannel ? cb : undefined) + +testTracing('query diagnostics channel', function () { + testTracing('publishes start and asyncEnd on successful query', function (done) { + const client = helper.client() + client.connection.emit('readyForQuery') + + const events = [] + const channel = dc.tracingChannel('pg:query') + + const subs = { + start: (ctx) => events.push({ type: 'start', context: ctx }), + end: () => {}, + asyncStart: () => {}, + asyncEnd: (ctx) => { + events.push({ type: 'asyncEnd', context: ctx }) + + // asyncEnd fires after the callback, so check everything here + assert.equal(events.length, 2) + assert.equal(events[0].type, 'start') + assert.equal(events[0].context.query.text, 'SELECT 1') + assert.equal(events[0].context.client.database, client.database) + + assert.equal(events[1].type, 'asyncEnd') + assert.equal(events[1].context.result.command, 'SELECT') + assert.equal(events[1].context.result.rowCount, 1) + + channel.unsubscribe(subs) + done() + }, + error: (ctx) => events.push({ type: 'error', context: ctx }), + } + + channel.subscribe(subs) + + client.query('SELECT 1', (err, res) => { + assert.ifError(err) + }) + + // simulate query execution + client.connection.emit('rowDescription', { fields: [{ name: 'col' }] }) + client.connection.emit('dataRow', { fields: ['value'] }) + client.connection.emit('commandComplete', { text: 'SELECT 1' }) + client.connection.emit('readyForQuery') + }) + + testTracing('publishes error on failed query', function (done) { + const client = helper.client() + client.connection.emit('readyForQuery') + + const events = [] + const channel = dc.tracingChannel('pg:query') + + const subs = { + start: (ctx) => events.push({ type: 'start', context: ctx }), + end: () => {}, + asyncStart: () => {}, + asyncEnd: () => {}, + error: (ctx) => { + events.push({ type: 'error', context: ctx }) + + const startEvent = events.find((e) => e.type === 'start') + assert.ok(startEvent) + assert.equal(startEvent.context.query.text, 'BAD QUERY') + + channel.unsubscribe(subs) + done() + }, + } + + channel.subscribe(subs) + + client.query('BAD QUERY', (err) => { + assert.ok(err) + }) + + // simulate error + client.connection.emit('errorMessage', { + severity: 'ERROR', + message: 'syntax error', + }) + }) + + testTracing('query context includes client info', function (done) { + const client = helper.client({ database: 'testdb', host: 'localhost', port: 5432, user: 'testuser' }) + client.connection.emit('readyForQuery') + + let capturedContext + const channel = dc.tracingChannel('pg:query') + + const subs = { + start: (ctx) => { + capturedContext = ctx + }, + end: () => {}, + asyncStart: () => {}, + asyncEnd: () => { + assert.equal(capturedContext.client.host, 'localhost') + assert.equal(capturedContext.client.user, 'testuser') + + channel.unsubscribe(subs) + done() + }, + error: () => {}, + } + + channel.subscribe(subs) + + client.query('SELECT 1', () => {}) + + client.connection.emit('rowDescription', { fields: [{ name: 'col' }] }) + client.connection.emit('dataRow', { fields: ['value'] }) + client.connection.emit('commandComplete', { text: 'SELECT 1' }) + client.connection.emit('readyForQuery') + }) + + testTracing('promise query publishes diagnostics', function (done) { + const client = helper.client() + client.connection.emit('readyForQuery') + + const events = [] + const channel = dc.tracingChannel('pg:query') + + const subs = { + start: (ctx) => events.push({ type: 'start', context: ctx }), + end: () => {}, + asyncStart: () => {}, + asyncEnd: (ctx) => { + events.push({ type: 'asyncEnd', context: ctx }) + + assert.ok(events.find((e) => e.type === 'start')) + assert.equal(events[0].context.query.text, 'SELECT 1') + + channel.unsubscribe(subs) + done() + }, + error: () => {}, + } + + channel.subscribe(subs) + + client.query('SELECT 1').then(() => {}) + + client.connection.emit('rowDescription', { fields: [{ name: 'col' }] }) + client.connection.emit('dataRow', { fields: ['value'] }) + client.connection.emit('commandComplete', { text: 'SELECT 1' }) + client.connection.emit('readyForQuery') + }) +}) + +testTracing('connection diagnostics channel', function () { + testTracing('publishes start on connect with callback', function (done) { + const Connection = require('../../../lib/connection') + const { Client } = helper + + let capturedContext + const channel = dc.tracingChannel('pg:connection') + + const subs = { + start: (ctx) => { + capturedContext = ctx + }, + end: () => {}, + asyncStart: () => {}, + asyncEnd: () => { + assert.ok(capturedContext) + assert.equal(capturedContext.connection.database, 'testdb') + assert.equal(capturedContext.connection.host, 'myhost') + + channel.unsubscribe(subs) + done() + }, + error: () => {}, + } + + channel.subscribe(subs) + + const connection = new Connection({ stream: 'no' }) + connection.startup = function () {} + connection.connect = function () {} + const client = new Client({ connection: connection, database: 'testdb', host: 'myhost', port: 5432 }) + + client.connect((err) => { + assert.ifError(err) + }) + + // simulate successful connection + connection.emit('connect') + connection.emit('readyForQuery') + }) +})