Skip to content
1 change: 1 addition & 0 deletions docs/pages/features/_meta.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ export default {
native: 'Native',
esm: 'ESM',
callbacks: 'Callbacks',
tracing: 'Tracing Channels',
}
229 changes: 229 additions & 0 deletions docs/pages/features/tracing.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
---
title: Tracing Channels
---

node-postgres publishes lifecycle events on a set of named tracing channels so
you can instrument queries, connections, and pool activity without
monkey-patching. If you're building an APM integration, custom tracer, or just
want structured logging of your database calls, you can subscribe to these
channels and node-postgres will tell you when things happen.

Tracing is built on
[`node:diagnostics_channel`](https://nodejs.org/api/diagnostics_channel.html)
and requires Node.js 20+. On older versions or non-Node runtimes it silently
no-ops. When nothing is listening the overhead is zero since every emission site
is guarded by a `hasSubscribers` check.

## Quick start

Here's a minimal example that logs every query with its duration:

```js
import dc from 'node:diagnostics_channel'

const timings = new WeakMap()
const channel = dc.tracingChannel('pg:query')

channel.subscribe({
start(ctx) {
timings.set(ctx, process.hrtime.bigint())
},
asyncEnd(ctx) {
const elapsed = Number(process.hrtime.bigint() - timings.get(ctx)) / 1e6
console.log(`${ctx.query.text} completed in ${elapsed.toFixed(2)}ms`)
timings.delete(ctx)
},
error(ctx) {
timings.delete(ctx)
},
end() {},
asyncStart() {},
})
```

You don't need to change how you create clients or pools. Just subscribe to the
channel and node-postgres handles the rest.

## The channels

### TracingChannels

These emit the full lifecycle: `start`, `end`, `asyncStart`, `asyncEnd`, and
`error`.

| Channel | Fires for |
| --- | --- |
| `pg:query` | Each `client.query()` call |
| `pg:connection` | Each `client.connect()` call |
| `pg:pool:connect` | Each `pool.connect()` call (acquiring a client from the pool) |

### Plain channels

These publish a single message. Subscribe with `dc.channel(name).subscribe(cb)`.

| Channel | Fires for |
| --- | --- |
| `pg:pool:release` | A client is released back to the pool |
| `pg:pool:remove` | A client is removed from the pool |

### How they fit together

A typical pooled request moves through the channels in this order:

```text
pg:pool:connect acquire a client from the pool
└─ pg:connection connect (only if the pool creates a new client)
pg:query execute the query
pg:pool:release release the client back to the pool
```

`pg:connection` only fires when the pool has to establish a new connection. For
reused clients it is skipped. `pg:pool:remove` fires separately when a client
is evicted (e.g. after an error or when `maxUses` is exceeded).

## Lifecycle events

Each tracing channel exposes five sub-channels that fire in a fixed order
depending on whether the operation completes synchronously or asynchronously:

- **Synchronous success:** `start` -> `end`
- **Synchronous failure:** `start` -> `error` -> `end`
- **Asynchronous success:** `start` -> `end` -> `asyncStart` -> `asyncEnd`
- **Asynchronous failure:** `start` -> `end` -> `asyncStart` -> `error` -> `asyncEnd`

In practice queries and connections are always asynchronous, so you'll mostly
work with `start` and `asyncEnd`. The context object is shared across all
events for a single operation, and properties like `result` and `error` are
added as the operation progresses.

## Context payloads

### pg:query

```js
{
query: {
text: 'SELECT $1::int', // the query text
name: undefined, // prepared statement name, if any
},
client: {
database: 'mydb',
host: 'localhost',
port: 5432,
user: 'postgres',
processID: 123, // PostgreSQL backend process ID
ssl: false,
},
// added on asyncEnd:
result: {
rowCount: 1,
command: 'SELECT',
},
}
```

### pg:connection

```js
{
connection: {
database: 'mydb',
host: 'localhost',
port: 5432,
user: 'postgres',
ssl: false,
},
}
```

### pg:pool:connect

```js
{
pool: {
totalCount: 2, // total clients in the pool
idleCount: 1, // idle clients available
waitingCount: 0, // callers waiting for a client
maxSize: 10, // configured pool maximum
},
// added on asyncEnd:
client: {
processID: 123,
reused: true, // whether the client was already in the pool
},
}
```

### pg:pool:release

```js
{
client: { processID: 123 },
error: undefined, // the Error passed to release(err), if any
}
```

### pg:pool:remove

```js
{
client: { processID: 123 },
}
```

## More examples

### Monitoring pool usage

```js
import dc from 'node:diagnostics_channel'

const poolConnect = dc.tracingChannel('pg:pool:connect')
const poolRelease = dc.channel('pg:pool:release')

poolConnect.subscribe({
start(ctx) {
console.log('pool checkout:', ctx.pool.idleCount, 'idle,', ctx.pool.waitingCount, 'waiting')
},
asyncEnd(ctx) {
console.log('checked out client', ctx.client.processID, '(reused:', ctx.client.reused + ')')
},
error() {},
end() {},
asyncStart() {},
})

poolRelease.subscribe((msg) => {
console.log('client', msg.client.processID, 'released')
})
```

### Subscribing to all query events

```js
import dc from 'node:diagnostics_channel'

const channel = dc.tracingChannel('pg:query')

channel.subscribe({
start(ctx) {
console.log('query started:', ctx.query.text)
},
asyncEnd(ctx) {
console.log('query completed:', ctx.result.command, ctx.result.rowCount, 'rows')
},
error(ctx) {
console.error('query failed:', ctx.error)
},
end() {},
asyncStart() {},
})
```

## Notes

- These channels report observability events. They are not hooks for altering
behavior; mutating a context payload does not change node-postgres internals.
- The plain channels (`pg:pool:release`, `pg:pool:remove`) are not
TracingChannels because they represent point-in-time events with no async
continuation.
35 changes: 35 additions & 0 deletions packages/pg-pool/diagnostics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
'use strict'

const noopChannel = { hasSubscribers: false }

let poolConnectChannel = noopChannel
let poolReleaseChannel = noopChannel
let poolRemoveChannel = noopChannel

try {
let dc
if (typeof process.getBuiltInModule === 'function') {
dc = process.getBuiltInModule('diagnostics_channel')
} else {
dc = require('diagnostics_channel')
}
if (typeof dc.tracingChannel === 'function') {
poolConnectChannel = dc.tracingChannel('pg:pool:connect')
}
if (typeof dc.channel === 'function') {
poolReleaseChannel = dc.channel('pg:pool:release')
poolRemoveChannel = dc.channel('pg:pool:remove')
}
} catch (e) {
// diagnostics_channel not available (non-Node environment)
}

// Check explicitly for `false` rather than truthiness because the aggregated
// `hasSubscribers` getter on TracingChannel is `undefined` on Node 18 (which
// backported TracingChannel but not the getter). When `undefined`, we assume
// there may be subscribers and trace unconditionally.
function shouldTrace(channel) {
return channel.hasSubscribers !== false
Comment thread
logaretm marked this conversation as resolved.
}

module.exports = { poolConnectChannel, poolReleaseChannel, poolRemoveChannel, shouldTrace }
34 changes: 34 additions & 0 deletions packages/pg-pool/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict'
const EventEmitter = require('events').EventEmitter
const { poolConnectChannel, poolReleaseChannel, poolRemoveChannel, shouldTrace } = require('./diagnostics')

const NOOP = function () {}

Expand Down Expand Up @@ -178,6 +179,10 @@ class Pool extends EventEmitter {

this._clients = this._clients.filter((c) => c !== client)
const context = this
if (shouldTrace(poolRemoveChannel)) {
poolRemoveChannel.publish({ client: { processID: client.processID } })
}

client.end(() => {
context.emit('remove', client)

Expand All @@ -196,6 +201,31 @@ class Pool extends EventEmitter {
const response = promisify(this.Promise, cb)
const result = response.result

if (shouldTrace(poolConnectChannel)) {
const context = {
pool: {
totalCount: this.totalCount,
idleCount: this.idleCount,
waitingCount: this.waitingCount,
maxSize: this.options.max,
},
}
const origCb = response.callback
const enrichedCb = (err, client, done) => {
if (client) context.client = { processID: client.processID, reused: !!client._poolUseCount }
return origCb(err, client, done)
}
poolConnectChannel.traceCallback(
(tracedCb) => {
response.callback = tracedCb
},
0,
context,
null,
enrichedCb
)
}

// if we don't have to connect a new client, don't do so
if (this._isFull() || this._idle.length) {
// if we have idle clients schedule a pulse immediately
Expand Down Expand Up @@ -388,6 +418,10 @@ class Pool extends EventEmitter {

this.emit('release', err, client)

if (shouldTrace(poolReleaseChannel)) {
poolReleaseChannel.publish({ client: { processID: client.processID }, error: err || undefined })
}

// TODO(bmc): expose a proper, public interface _queryable and _ending
if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses) {
if (client._poolUseCount >= this.options.maxUses) {
Expand Down
1 change: 1 addition & 0 deletions packages/pg-pool/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
},
"files": [
"index.js",
"diagnostics.js",
"esm"
]
}
Loading
Loading