@@ -125,6 +125,8 @@ export interface PostgresAdapterOptions {
125125 errorHandler : ( err : Error ) => void ;
126126}
127127
128+ const defaultErrorHandler = ( err : Error ) => debug ( err ) ;
129+
128130/**
129131 * Returns a function that will create a PostgresAdapter instance.
130132 *
@@ -137,8 +139,104 @@ export function createAdapter(
137139 pool : Pool ,
138140 opts : Partial < PostgresAdapterOptions > = { }
139141) {
142+ const errorHandler = opts . errorHandler || defaultErrorHandler ;
143+ const tableName = opts . tableName || "socket_io_attachments" ;
144+ const cleanupInterval = opts . cleanupInterval || 30000 ;
145+
146+ const channelToAdapters = new Map < string , PostgresAdapter > ( ) ;
147+ let isConnectionInProgress = false ;
148+ let client : any ;
149+ let cleanupTimer : NodeJS . Timer ;
150+
151+ const scheduleReconnection = ( ) => {
152+ const reconnectionDelay = Math . floor ( 2000 * ( 0.5 + Math . random ( ) ) ) ;
153+ setTimeout ( initClient , reconnectionDelay ) ;
154+ } ;
155+
156+ const initClient = async ( ) => {
157+ try {
158+ debug ( "fetching client from the pool" ) ;
159+ client = await pool . connect ( ) ;
160+ isConnectionInProgress = false ;
161+
162+ for ( const [ channel ] of channelToAdapters ) {
163+ debug ( "client listening to %s" , channel ) ;
164+ await client . query ( `LISTEN "${ channel } "` ) ;
165+ }
166+
167+ client . on ( "notification" , async ( msg : any ) => {
168+ try {
169+ await channelToAdapters . get ( msg . channel ) ?. onEvent ( msg . payload ) ;
170+ } catch ( err ) {
171+ errorHandler ( err ) ;
172+ }
173+ } ) ;
174+
175+ client . on ( "error" , ( ) => {
176+ debug ( "client error" ) ;
177+ } ) ;
178+
179+ client . on ( "end" , ( ) => {
180+ debug ( "client was closed, scheduling reconnection..." ) ;
181+ scheduleReconnection ( ) ;
182+ } ) ;
183+ } catch ( err ) {
184+ errorHandler ( err ) ;
185+ debug ( "error while initializing client, scheduling reconnection..." ) ;
186+ scheduleReconnection ( ) ;
187+ }
188+ } ;
189+
190+ const scheduleCleanup = ( ) => {
191+ cleanupTimer = setTimeout ( async ( ) => {
192+ try {
193+ await pool . query (
194+ `DELETE FROM ${ tableName } WHERE created_at < now() - interval '${ cleanupInterval } milliseconds'`
195+ ) ;
196+ } catch ( err ) {
197+ errorHandler ( err ) ;
198+ }
199+ scheduleCleanup ( ) ;
200+ } , cleanupInterval ) ;
201+ } ;
202+
140203 return function ( nsp : any ) {
141- return new PostgresAdapter ( nsp , pool , opts ) ;
204+ let adapter = new PostgresAdapter ( nsp , pool , opts ) ;
205+
206+ channelToAdapters . set ( adapter . channel , adapter ) ;
207+
208+ if ( isConnectionInProgress ) {
209+ // nothing to do
210+ } else if ( client ) {
211+ debug ( "client listening to %s" , adapter . channel ) ;
212+ client . query ( `LISTEN "${ adapter . channel } "` ) . catch ( errorHandler ) ;
213+ } else {
214+ isConnectionInProgress = true ;
215+ initClient ( ) ;
216+
217+ scheduleCleanup ( ) ;
218+ }
219+
220+ const defaultClose = adapter . close ;
221+
222+ adapter . close = ( ) => {
223+ channelToAdapters . delete ( adapter . channel ) ;
224+
225+ if ( channelToAdapters . size === 0 ) {
226+ if ( client ) {
227+ client . removeAllListeners ( "end" ) ;
228+ client . release ( ) ;
229+ client = null ;
230+ }
231+ if ( cleanupTimer ) {
232+ clearTimeout ( cleanupTimer ) ;
233+ }
234+ }
235+
236+ defaultClose . call ( adapter ) ;
237+ } ;
238+
239+ return adapter ;
142240 } ;
143241}
144242
@@ -150,14 +248,11 @@ export class PostgresAdapter extends Adapter {
150248 public heartbeatInterval : number ;
151249 public heartbeatTimeout : number ;
152250 public payloadThreshold : number ;
153- public cleanupInterval : number ;
154251 public errorHandler : ( err : Error ) => void ;
155252
156253 private readonly pool : Pool ;
157- private client : any ;
158254 private nodesMap : Map < string , number > = new Map < string , number > ( ) ; // uid => timestamp of last message
159255 private heartbeatTimer : NodeJS . Timeout | undefined ;
160- private cleanupTimer : NodeJS . Timeout | undefined ;
161256 private requests : Map < string , Request > = new Map ( ) ;
162257 private ackRequests : Map < string , AckRequest > = new Map ( ) ;
163258
@@ -185,69 +280,18 @@ export class PostgresAdapter extends Adapter {
185280 this . heartbeatInterval = opts . heartbeatInterval || 5000 ;
186281 this . heartbeatTimeout = opts . heartbeatTimeout || 10000 ;
187282 this . payloadThreshold = opts . payloadThreshold || 8000 ;
188- this . cleanupInterval = opts . cleanupInterval || 30000 ;
189- const defaultErrorHandler = ( err : Error ) => debug ( err ) ;
190283 this . errorHandler = opts . errorHandler || defaultErrorHandler ;
191284
192- this . initSubscription ( ) ;
193285 this . publish ( {
194286 type : EventType . INITIAL_HEARTBEAT ,
195287 } ) ;
196- this . scheduleCleanup ( ) ;
197288 }
198289
199290 close ( ) : Promise < void > | void {
200291 debug ( "closing adapter" ) ;
201292 if ( this . heartbeatTimer ) {
202293 clearTimeout ( this . heartbeatTimer ) ;
203294 }
204- if ( this . cleanupTimer ) {
205- clearTimeout ( this . cleanupTimer ) ;
206- }
207- if ( this . client ) {
208- this . client . removeAllListeners ( "end" ) ;
209- this . client . release ( ) ;
210- this . client = null ;
211- }
212- }
213-
214- private async initSubscription ( ) {
215- try {
216- debug ( "fetching client from the pool" ) ;
217- const client = await this . pool . connect ( ) ;
218- debug ( "client listening to %s" , this . channel ) ;
219- await client . query ( `LISTEN "${ this . channel } "` ) ;
220-
221- client . on ( "notification" , async ( msg : any ) => {
222- try {
223- await this . onEvent ( msg . payload ) ;
224- } catch ( err ) {
225- this . errorHandler ( err ) ;
226- }
227- } ) ;
228-
229- client . on ( "error" , ( ) => {
230- debug ( "client error" ) ;
231- } ) ;
232-
233- client . on ( "end" , ( ) => {
234- debug ( "client was closed, scheduling reconnection..." ) ;
235- this . scheduleReconnection ( ) ;
236- } ) ;
237-
238- this . client = client ;
239- } catch ( err ) {
240- this . errorHandler ( err ) ;
241- debug ( "error while initializing client, scheduling reconnection..." ) ;
242- this . scheduleReconnection ( ) ;
243- }
244- }
245-
246- private scheduleReconnection ( ) {
247- const reconnectionDelay = Math . floor ( 2000 * ( 0.5 + Math . random ( ) ) ) ;
248- setTimeout ( ( ) => {
249- this . initSubscription ( ) ;
250- } , reconnectionDelay ) ;
251295 }
252296
253297 public async onEvent ( event : any ) {
@@ -451,19 +495,6 @@ export class PostgresAdapter extends Adapter {
451495 } , this . heartbeatInterval ) ;
452496 }
453497
454- private scheduleCleanup ( ) {
455- this . cleanupTimer = setTimeout ( async ( ) => {
456- try {
457- await this . pool . query (
458- `DELETE FROM ${ this . tableName } WHERE created_at < now() - interval '${ this . cleanupInterval } milliseconds'`
459- ) ;
460- } catch ( err ) {
461- this . errorHandler ( err ) ;
462- }
463- this . scheduleCleanup ( ) ;
464- } , this . cleanupInterval ) ;
465- }
466-
467498 private async publish ( document : any ) {
468499 document . uid = this . uid ;
469500
0 commit comments