diff --git a/src/controllers/registration_controller.ts b/src/controllers/registration_controller.ts index 56157a0f..5433a579 100644 --- a/src/controllers/registration_controller.ts +++ b/src/controllers/registration_controller.ts @@ -40,7 +40,11 @@ export class RegistrationController { logger.debug('deregister body %s', JSON.stringify(req.body)) logger.debug('deregister query %s', JSON.stringify(req.query)) const { phone } = req.params - await this.logout.run(phone) - return res.status(204).send() + try { + await this.logout.run(phone, { force: true }) + return res.status(204).send() + } catch (e) { + return res.status(400).json({ status: 'error', message: `${phone} could not deregister, error: ${e.message}` }) + } } } diff --git a/src/defaults.ts b/src/defaults.ts index 8e32a59a..f0f3e89b 100644 --- a/src/defaults.ts +++ b/src/defaults.ts @@ -108,6 +108,8 @@ export const UNOAPI_QUEUE_BULK_WEBHOOK = `${UNOAPI_QUEUE_NAME}.bulk.webhook` export const UNOAPI_QUEUE_COMMANDER = `${UNOAPI_QUEUE_NAME}.commander` export const UNOAPI_QUEUE_INCOMING = `${UNOAPI_QUEUE_NAME}.incoming` export const UNOAPI_QUEUE_TRANSCRIBER = `${UNOAPI_QUEUE_NAME}.transcribe` +export const RELOAD_PUBLISH_BROKER = process.env.RELOAD_PUBLISH_BROKER === _undefined ? false : process.env.RELOAD_PUBLISH_BROKER == 'true' +export const RELOAD_BAILEYS_DEBOUNCE_MS = parseInt(process.env.RELOAD_BAILEYS_DEBOUNCE_MS || '15000') export const UNOAPI_QUEUE_SPEECH = `${UNOAPI_QUEUE_NAME}.speech` export const UNOAPI_MESSAGE_RETRY_LIMIT = parseInt(process.env.UNOAPI_MESSAGE_RETRY_LIMIT || '5') export const UNOAPI_MESSAGE_RETRY_DELAY = parseInt(process.env.UNOAPI_MESSAGE_RETRY_DELAY || '10000') diff --git a/src/jobs/logout.ts b/src/jobs/logout.ts index eb951ef7..9cdc1475 100644 --- a/src/jobs/logout.ts +++ b/src/jobs/logout.ts @@ -8,8 +8,8 @@ export class LogoutJob { this.logout = logout } - async consume(_: string, { phone }: { phone: string }) { + async consume(_: string, { phone, force }: { phone: string; force?: boolean }) { logger.debug('Logout service for phone %s', phone) - this.logout.run(phone) + await this.logout.run(phone, { force }) } } diff --git a/src/services/logout.ts b/src/services/logout.ts index d6435937..43c53cee 100644 --- a/src/services/logout.ts +++ b/src/services/logout.ts @@ -1,3 +1,7 @@ +export interface LogoutOptions { + force?: boolean +} + export interface Logout { - run(phone: string): Promise + run(phone: string, options?: LogoutOptions): Promise } diff --git a/src/services/logout_amqp.ts b/src/services/logout_amqp.ts index e06b4ade..cbbb8893 100644 --- a/src/services/logout_amqp.ts +++ b/src/services/logout_amqp.ts @@ -1,7 +1,7 @@ import { amqpPublish } from '../amqp' import { UNOAPI_EXCHANGE_BRIDGE_NAME, UNOAPI_QUEUE_LOGOUT } from '../defaults' import { getConfig } from './config' -import { Logout } from './logout' +import { Logout, LogoutOptions } from './logout' export class LogoutAmqp implements Logout { private getConfig: getConfig @@ -10,8 +10,8 @@ export class LogoutAmqp implements Logout { this.getConfig = getConfig } - public async run(phone: string) { + public async run(phone: string, options?: LogoutOptions) { const config = await this.getConfig(phone) - await amqpPublish(UNOAPI_EXCHANGE_BRIDGE_NAME, `${UNOAPI_QUEUE_LOGOUT}.${config.server!}`, '', { phone }, { type: 'direct' }) + await amqpPublish(UNOAPI_EXCHANGE_BRIDGE_NAME, `${UNOAPI_QUEUE_LOGOUT}.${config.server!}`, '', { phone, force: !!options?.force }, { type: 'direct' }) } } diff --git a/src/services/logout_baileys.ts b/src/services/logout_baileys.ts index 95f1b8cc..598bf5f2 100644 --- a/src/services/logout_baileys.ts +++ b/src/services/logout_baileys.ts @@ -2,7 +2,7 @@ import { Listener } from '../services/listener' import { configs, getConfig } from '../services/config' import { clients, getClient } from '../services/client' import { OnNewLogin } from '../services/socket' -import { Logout } from './logout' +import { Logout, LogoutOptions } from './logout' import logger from './logger' import { stores } from './store' import { dataStores } from './data_store' @@ -21,19 +21,26 @@ export class LogoutBaileys implements Logout { this.onNewLogin = onNewLogin } - async run(phone: string) { + async run(phone: string, options?: LogoutOptions) { logger.debug('Logout baileys for phone %s', phone) const config = await this.getConfig(phone) const store = await config.getStore(phone, config) const { sessionStore, dataStore } = store - if (await sessionStore.isStatusOnline(phone)) { - const client = await this.getClient({ - phone, - listener: this.listener, - getConfig: this.getConfig, - onNewLogin: this.onNewLogin, - }) - await client?.logout() + + const force = !!options?.force + const shouldTrySocketLogout = force || (await sessionStore.isStatusOnline(phone)) + if (shouldTrySocketLogout) { + try { + const client = clients.get(phone) || (await this.getClient({ + phone, + listener: this.listener, + getConfig: this.getConfig, + onNewLogin: this.onNewLogin, + })) + await client?.logout() + } catch (error) { + logger.warn(error, 'Ignore error on forced logout for %s', phone) + } } await dataStore.cleanSession(true) clients.delete(phone) diff --git a/src/services/reload_amqp.ts b/src/services/reload_amqp.ts index 79c74df8..25b6bcae 100644 --- a/src/services/reload_amqp.ts +++ b/src/services/reload_amqp.ts @@ -1,5 +1,5 @@ import { amqpPublish } from '../amqp' -import { UNOAPI_EXCHANGE_BRIDGE_NAME, UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_RELOAD } from '../defaults' +import { RELOAD_PUBLISH_BROKER, UNOAPI_EXCHANGE_BRIDGE_NAME, UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_RELOAD } from '../defaults' import { getConfig } from './config' import { Reload } from './reload' @@ -13,7 +13,21 @@ export class ReloadAmqp extends Reload { public async run(phone: string) { const config = await this.getConfig(phone) - await amqpPublish(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_RELOAD, phone, { phone }, { type: 'topic' }) - await amqpPublish(UNOAPI_EXCHANGE_BRIDGE_NAME, `${UNOAPI_QUEUE_RELOAD}.${config.server!}`, '', { phone }, { type: 'direct' }) + if (RELOAD_PUBLISH_BROKER) { + await amqpPublish( + UNOAPI_EXCHANGE_BROKER_NAME, + UNOAPI_QUEUE_RELOAD, + phone, + { phone }, + { type: 'topic' } + ) + } + await amqpPublish( + UNOAPI_EXCHANGE_BRIDGE_NAME, + `${UNOAPI_QUEUE_RELOAD}.${config.server!}`, + '', + { phone }, + { type: 'direct' } + ) } } diff --git a/src/services/reload_baileys.ts b/src/services/reload_baileys.ts index cb6e2469..45dd3af6 100644 --- a/src/services/reload_baileys.ts +++ b/src/services/reload_baileys.ts @@ -1,4 +1,4 @@ -import { UNOAPI_SERVER_NAME } from '../defaults' +import { RELOAD_BAILEYS_DEBOUNCE_MS, UNOAPI_SERVER_NAME } from '../defaults' import { getClient } from '../services/client' import { getConfig } from '../services/config' import { Listener } from '../services/listener' @@ -7,6 +7,8 @@ import logger from './logger' import { Reload } from './reload' export class ReloadBaileys extends Reload { + private static readonly inFlightByPhone: Set = new Set() + private static readonly lastRunAtByPhone: Map = new Map() private getClient: getClient private getConfig: getConfig private listener: Listener @@ -21,6 +23,16 @@ export class ReloadBaileys extends Reload { } async run(phone: string) { + const now = Date.now() + const lastRunAt = ReloadBaileys.lastRunAtByPhone.get(phone) || 0 + const debounceRemaining = Math.max(0, RELOAD_BAILEYS_DEBOUNCE_MS - (now - lastRunAt)) + if (ReloadBaileys.inFlightByPhone.has(phone) || debounceRemaining > 0) { + logger.warn('Skip duplicated reload for %s (inFlight=%s debounceRemainingMs=%s)', phone, ReloadBaileys.inFlightByPhone.has(phone), debounceRemaining) + return + } + ReloadBaileys.inFlightByPhone.add(phone) + ReloadBaileys.lastRunAtByPhone.set(phone, now) + try { logger.debug('Reload baileys run for phone %s', phone) const config = await this.getConfig(phone) if (config.server != UNOAPI_SERVER_NAME) { @@ -35,13 +47,19 @@ export class ReloadBaileys extends Reload { }) const store = await config.getStore(phone, config) const { sessionStore } = store - if ((await sessionStore.isStatusOnline(phone)) || (await sessionStore.isStatusStandBy(phone)) || (await sessionStore.isStatusConnecting(phone))) { + const isConnecting = await sessionStore.isStatusConnecting(phone) + const isRestartRequired = await sessionStore.isStatusRestartRequired(phone) + if (isConnecting || isRestartRequired) { + logger.warn('Skip destructive reload for %s while status is transitional (connecting=%s restartRequired=%s)', phone, isConnecting, isRestartRequired) + return + } + const isOnline = await sessionStore.isStatusOnline(phone) + const isStandBy = await sessionStore.isStatusStandBy(phone) + if (isOnline || isStandBy) { logger.warn('Reload disconnect session %s!', phone) - await currentClient?.disconnect() + await currentClient.disconnect() } await super.run(phone) - await sessionStore.setStatus(phone, 'online') // to clear standby - await sessionStore.setStatus(phone, 'disconnected') await this.getClient({ phone, listener: this.listener, @@ -49,5 +67,8 @@ export class ReloadBaileys extends Reload { onNewLogin: this.onNewLogin, }) logger.info('Reloaded session %s!', phone) + } finally { + ReloadBaileys.inFlightByPhone.delete(phone) + } } }