Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/controllers/registration_controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ export class RegistrationController {
logger.debug('deregister body %s', JSON.stringify(req.body))
logger.debug('deregister query %s', JSON.stringify(req.query))
const { phone } = req.params
await this.logout.run(phone)
return res.status(204).send()
try {
await this.logout.run(phone, { force: true })
return res.status(204).send()
} catch (e) {
return res.status(400).json({ status: 'error', message: `${phone} could not deregister, error: ${e.message}` })
}
}
}
2 changes: 2 additions & 0 deletions src/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ export const UNOAPI_QUEUE_BULK_WEBHOOK = `${UNOAPI_QUEUE_NAME}.bulk.webhook`
export const UNOAPI_QUEUE_COMMANDER = `${UNOAPI_QUEUE_NAME}.commander`
export const UNOAPI_QUEUE_INCOMING = `${UNOAPI_QUEUE_NAME}.incoming`
export const UNOAPI_QUEUE_TRANSCRIBER = `${UNOAPI_QUEUE_NAME}.transcribe`
export const RELOAD_PUBLISH_BROKER = process.env.RELOAD_PUBLISH_BROKER === _undefined ? false : process.env.RELOAD_PUBLISH_BROKER == 'true'
export const RELOAD_BAILEYS_DEBOUNCE_MS = parseInt(process.env.RELOAD_BAILEYS_DEBOUNCE_MS || '15000')
export const UNOAPI_QUEUE_SPEECH = `${UNOAPI_QUEUE_NAME}.speech`
export const UNOAPI_MESSAGE_RETRY_LIMIT = parseInt(process.env.UNOAPI_MESSAGE_RETRY_LIMIT || '5')
export const UNOAPI_MESSAGE_RETRY_DELAY = parseInt(process.env.UNOAPI_MESSAGE_RETRY_DELAY || '10000')
Expand Down
4 changes: 2 additions & 2 deletions src/jobs/logout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ export class LogoutJob {
this.logout = logout
}

async consume(_: string, { phone }: { phone: string }) {
async consume(_: string, { phone, force }: { phone: string; force?: boolean }) {
logger.debug('Logout service for phone %s', phone)
this.logout.run(phone)
await this.logout.run(phone, { force })
}
}
6 changes: 5 additions & 1 deletion src/services/logout.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
export interface LogoutOptions {
force?: boolean
}

export interface Logout {
run(phone: string): Promise<void>
run(phone: string, options?: LogoutOptions): Promise<void>
}
6 changes: 3 additions & 3 deletions src/services/logout_amqp.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { amqpPublish } from '../amqp'
import { UNOAPI_EXCHANGE_BRIDGE_NAME, UNOAPI_QUEUE_LOGOUT } from '../defaults'
import { getConfig } from './config'
import { Logout } from './logout'
import { Logout, LogoutOptions } from './logout'

export class LogoutAmqp implements Logout {
private getConfig: getConfig
Expand All @@ -10,8 +10,8 @@ export class LogoutAmqp implements Logout {
this.getConfig = getConfig
}

public async run(phone: string) {
public async run(phone: string, options?: LogoutOptions) {
const config = await this.getConfig(phone)
await amqpPublish(UNOAPI_EXCHANGE_BRIDGE_NAME, `${UNOAPI_QUEUE_LOGOUT}.${config.server!}`, '', { phone }, { type: 'direct' })
await amqpPublish(UNOAPI_EXCHANGE_BRIDGE_NAME, `${UNOAPI_QUEUE_LOGOUT}.${config.server!}`, '', { phone, force: !!options?.force }, { type: 'direct' })
}
}
27 changes: 17 additions & 10 deletions src/services/logout_baileys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Listener } from '../services/listener'
import { configs, getConfig } from '../services/config'
import { clients, getClient } from '../services/client'
import { OnNewLogin } from '../services/socket'
import { Logout } from './logout'
import { Logout, LogoutOptions } from './logout'
import logger from './logger'
import { stores } from './store'
import { dataStores } from './data_store'
Expand All @@ -21,19 +21,26 @@ export class LogoutBaileys implements Logout {
this.onNewLogin = onNewLogin
}

async run(phone: string) {
async run(phone: string, options?: LogoutOptions) {
logger.debug('Logout baileys for phone %s', phone)
const config = await this.getConfig(phone)
const store = await config.getStore(phone, config)
const { sessionStore, dataStore } = store
if (await sessionStore.isStatusOnline(phone)) {
const client = await this.getClient({
phone,
listener: this.listener,
getConfig: this.getConfig,
onNewLogin: this.onNewLogin,
})
await client?.logout()

const force = !!options?.force
const shouldTrySocketLogout = force || (await sessionStore.isStatusOnline(phone))
if (shouldTrySocketLogout) {
try {
const client = clients.get(phone) || (await this.getClient({
phone,
listener: this.listener,
getConfig: this.getConfig,
onNewLogin: this.onNewLogin,
}))
await client?.logout()
} catch (error) {
logger.warn(error, 'Ignore error on forced logout for %s', phone)
}
}
await dataStore.cleanSession(true)
clients.delete(phone)
Expand Down
20 changes: 17 additions & 3 deletions src/services/reload_amqp.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { amqpPublish } from '../amqp'
import { UNOAPI_EXCHANGE_BRIDGE_NAME, UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_RELOAD } from '../defaults'
import { RELOAD_PUBLISH_BROKER, UNOAPI_EXCHANGE_BRIDGE_NAME, UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_RELOAD } from '../defaults'
import { getConfig } from './config'
import { Reload } from './reload'

Expand All @@ -13,7 +13,21 @@ export class ReloadAmqp extends Reload {

public async run(phone: string) {
const config = await this.getConfig(phone)
await amqpPublish(UNOAPI_EXCHANGE_BROKER_NAME, UNOAPI_QUEUE_RELOAD, phone, { phone }, { type: 'topic' })
await amqpPublish(UNOAPI_EXCHANGE_BRIDGE_NAME, `${UNOAPI_QUEUE_RELOAD}.${config.server!}`, '', { phone }, { type: 'direct' })
if (RELOAD_PUBLISH_BROKER) {
await amqpPublish(
UNOAPI_EXCHANGE_BROKER_NAME,
UNOAPI_QUEUE_RELOAD,
phone,
{ phone },
{ type: 'topic' }
)
}
await amqpPublish(
UNOAPI_EXCHANGE_BRIDGE_NAME,
`${UNOAPI_QUEUE_RELOAD}.${config.server!}`,
'',
{ phone },
{ type: 'direct' }
)
}
}
31 changes: 26 additions & 5 deletions src/services/reload_baileys.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { UNOAPI_SERVER_NAME } from '../defaults'
import { RELOAD_BAILEYS_DEBOUNCE_MS, UNOAPI_SERVER_NAME } from '../defaults'
import { getClient } from '../services/client'
import { getConfig } from '../services/config'
import { Listener } from '../services/listener'
Expand All @@ -7,6 +7,8 @@ import logger from './logger'
import { Reload } from './reload'

export class ReloadBaileys extends Reload {
private static readonly inFlightByPhone: Set<string> = new Set()
private static readonly lastRunAtByPhone: Map<string, number> = new Map()
private getClient: getClient
private getConfig: getConfig
private listener: Listener
Expand All @@ -21,6 +23,16 @@ export class ReloadBaileys extends Reload {
}

async run(phone: string) {
const now = Date.now()
const lastRunAt = ReloadBaileys.lastRunAtByPhone.get(phone) || 0
const debounceRemaining = Math.max(0, RELOAD_BAILEYS_DEBOUNCE_MS - (now - lastRunAt))
if (ReloadBaileys.inFlightByPhone.has(phone) || debounceRemaining > 0) {
logger.warn('Skip duplicated reload for %s (inFlight=%s debounceRemainingMs=%s)', phone, ReloadBaileys.inFlightByPhone.has(phone), debounceRemaining)
return
}
ReloadBaileys.inFlightByPhone.add(phone)
ReloadBaileys.lastRunAtByPhone.set(phone, now)
try {
logger.debug('Reload baileys run for phone %s', phone)
const config = await this.getConfig(phone)
if (config.server != UNOAPI_SERVER_NAME) {
Expand All @@ -35,19 +47,28 @@ export class ReloadBaileys extends Reload {
})
const store = await config.getStore(phone, config)
const { sessionStore } = store
if ((await sessionStore.isStatusOnline(phone)) || (await sessionStore.isStatusStandBy(phone)) || (await sessionStore.isStatusConnecting(phone))) {
const isConnecting = await sessionStore.isStatusConnecting(phone)
const isRestartRequired = await sessionStore.isStatusRestartRequired(phone)
if (isConnecting || isRestartRequired) {
logger.warn('Skip destructive reload for %s while status is transitional (connecting=%s restartRequired=%s)', phone, isConnecting, isRestartRequired)
return
}
const isOnline = await sessionStore.isStatusOnline(phone)
const isStandBy = await sessionStore.isStatusStandBy(phone)
if (isOnline || isStandBy) {
logger.warn('Reload disconnect session %s!', phone)
await currentClient?.disconnect()
await currentClient.disconnect()
}
await super.run(phone)
await sessionStore.setStatus(phone, 'online') // to clear standby
await sessionStore.setStatus(phone, 'disconnected')
await this.getClient({
phone,
listener: this.listener,
getConfig: this.getConfig,
onNewLogin: this.onNewLogin,
})
logger.info('Reloaded session %s!', phone)
} finally {
ReloadBaileys.inFlightByPhone.delete(phone)
}
}
}