Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions services/telegram-bot/pod-telegram-bot/src/account.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,17 @@ export async function enableIntegration (integration: IntegrationInfo): Promise<
}
})
}

export async function updateIntegrationData (
integration: IntegrationInfo,
dataPatch: Record<string, unknown>
): Promise<void> {
const client = getAccountClient(serviceToken())
await client.updateIntegration({
...integration,
data: {
...(integration.data ?? {}),
...dataPatch
}
})
}
68 changes: 65 additions & 3 deletions services/telegram-bot/pod-telegram-bot/src/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@

import postgres from 'postgres'
import { AccountUuid, Ref, WorkspaceUuid } from '@hcengineering/core'
import { ChunterSpace } from '@hcengineering/chunter'
import { ActivityMessage } from '@hcengineering/activity'

import config from './config'
import { ChannelId, ChannelRecord, MessageRecord, OtpRecord, ReplyRecord } from './types'
import { ChannelId, ChannelRecord, ForumTopicRecord, MessageRecord, OtpRecord, ReplyRecord } from './types'

export async function getDb (): Promise<PostgresDB> {
const sql = postgres(config.DbUrl, {
Expand All @@ -36,6 +37,7 @@ const otpTable = 'telegram_bot.otp'
const messagesTable = 'telegram_bot.messages'
const channelsTable = 'telegram_bot.channels'
const repliesTable = 'telegram_bot.replies'
const forumTopicsTable = 'telegram_bot.forum_topics'

type DBFlavor = 'cockroach' | 'postgres' | 'unknown'

Expand Down Expand Up @@ -73,7 +75,7 @@ export class PostgresDB {

const sql = `
CREATE SCHEMA IF NOT EXISTS telegram_bot;

CREATE TABLE IF NOT EXISTS ${otpTable} (
telegram_id INT8 NOT NULL,
telegram_username TEXT NOT NULL,
Expand All @@ -82,7 +84,7 @@ export class PostgresDB {
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (code)
);

CREATE TABLE IF NOT EXISTS ${messagesTable} (
message_id VARCHAR(255) NOT NULL,
workspace UUID NOT NULL,
Expand All @@ -108,6 +110,17 @@ export class PostgresDB {
reply_id INT8 NOT NULL,
PRIMARY KEY (message_id, telegram_user_id, reply_id)
);

CREATE TABLE IF NOT EXISTS ${forumTopicsTable} (
workspace UUID NOT NULL,
account UUID NOT NULL,
channel_id VARCHAR(255) NOT NULL,
forum_chat_id INT8 NOT NULL,
topic_id INT8 NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (workspace, account, channel_id),
UNIQUE (forum_chat_id, topic_id)
);
`

await client.unsafe(sql)
Expand Down Expand Up @@ -217,6 +230,44 @@ export class PostgresDB {
return res.map(toReplyRecord)[0]
}

async getForumTopic (
workspace: WorkspaceUuid,
account: AccountUuid,
channelId: Ref<ChunterSpace>
): Promise<ForumTopicRecord | undefined> {
const sql = `
SELECT * FROM ${forumTopicsTable}
WHERE workspace = $1::uuid AND account = $2::uuid AND channel_id = $3::varchar
LIMIT 1`
const res = await this.client.unsafe(sql, [workspace, account, channelId])
return res.map(toForumTopicRecord)[0]
}

async insertForumTopic (record: Omit<ForumTopicRecord, 'createdAt'>): Promise<void> {
const sql = `
INSERT INTO ${forumTopicsTable} (
workspace, account, channel_id, forum_chat_id, topic_id
)
VALUES ($1::uuid, $2::uuid, $3::varchar, $4::int8, $5::int8)
ON CONFLICT (workspace, account, channel_id) DO NOTHING`
await this.client.unsafe(sql, [
record.workspace,
record.account,
record.channelId,
record.forumChatId,
record.topicId
])
}

async getForumTopicByThread (forumChatId: number, topicId: number): Promise<ForumTopicRecord | undefined> {
const sql = `
SELECT * FROM ${forumTopicsTable}
WHERE forum_chat_id = $1::int8 AND topic_id = $2::int8
LIMIT 1`
const res = await this.client.unsafe(sql, [forumChatId, topicId])
return res.map(toForumTopicRecord)[0]
}

async close (): Promise<void> {
await this.client.end({ timeout: 0 })
}
Expand Down Expand Up @@ -259,3 +310,14 @@ function toMessageRecord (raw: any): MessageRecord {
telegramMessageId: Number(raw.telegram_message_id)
}
}

function toForumTopicRecord (raw: any): ForumTopicRecord {
return {
workspace: raw.workspace,
account: raw.account,
channelId: raw.channel_id,
forumChatId: Number(raw.forum_chat_id),
topicId: Number(raw.topic_id),
createdAt: new Date(raw.created_at)
}
}
100 changes: 94 additions & 6 deletions services/telegram-bot/pod-telegram-bot/src/telegraf/bot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,52 @@ async function onReply (
return await worker.reply(integration, messageRecord, htmlToMarkup(toHTML(message)), files)
}

/**
* Routes a non-reply message that arrived inside a Telegram forum topic to the matching
* Huly channel (via the forum_topics table). Returns true if the message was successfully
* routed; false if the topic is not registered (caller should fall back to legacy flow).
*/
async function handleForumTopicMessage (
ctx: Context,
worker: PlatformWorker,
chatId: number,
threadId: number,
fromId: number
): Promise<boolean> {
const topic = await worker.getForumTopicByThread(chatId, threadId)
if (topic === undefined) return false

const integration = await getAnyIntegrationByTelegramId(fromId, topic.workspace)
if (integration === undefined) return false

const channel = await worker.resolveChannelByRef(topic.workspace, topic.account, topic.channelId)
if (channel === undefined) return false

const ctxMessage = ctx.message as Message | undefined
if (ctxMessage === undefined) return false

const file = await toTelegramFileInfo(ctx as TgContext, ctxMessage)
let text = htmlToMarkup(toHTML(ctxMessage as Message.TextMessage))

if (isEmptyMarkup(text) && 'caption' in ctxMessage && ctxMessage.caption !== undefined) {
text = jsonToMarkup({
type: MarkupNodeType.text,
text: ctxMessage.caption
})
}

if (isEmptyMarkup(text) && file === undefined) return false

return await worker.sendMessage(
channel,
integration.account,
integration.socialId,
ctxMessage.message_id,
text,
file
)
}

async function handleSelectChannel (
ctx: Context<Update.CallbackQueryUpdate<CallbackQuery>>,
worker: PlatformWorker,
Expand Down Expand Up @@ -217,17 +263,32 @@ export async function setUpBot (worker: PlatformWorker): Promise<Telegraf<TgCont
await defineCommands(bot, worker)

bot.on(message('reply_to_message'), async (ctx) => {
const id = ctx.chat?.id
const chatId = ctx.chat?.id
const message = ctx.message

if (id === undefined || message.reply_to_message === undefined) {
if (chatId === undefined || message.reply_to_message === undefined) {
return
}

const fromId = ctx.from?.id
const threadId = (message as Message & { message_thread_id?: number }).message_thread_id

// Inside a forum-enabled DM, "replying" to the topic's own first system message is
// really the user opening the topic to type into it. Treat it as a fresh forum-routed
// message instead of trying to thread-link it to the synthetic topic head.
if (fromId !== undefined && threadId !== undefined && ctx.chat?.type === 'private') {
const replyToId = message.reply_to_message.message_id
const isTopicHead = replyToId === threadId
if (isTopicHead) {
const routed = await handleForumTopicMessage(ctx, worker, chatId, threadId, fromId)
if (routed) return
}
}

const replyTo = message.reply_to_message
const isReplied = await onReply(
ctx,
id,
chatId,
message as ReplyMessage,
message.message_id,
replyTo.message_id,
Expand All @@ -241,11 +302,38 @@ export async function setUpBot (worker: PlatformWorker): Promise<Telegraf<TgCont
})

bot.on(message(), async (ctx) => {
const id = ctx.chat?.id
if (id === undefined) return
const chatId = ctx.chat?.id
if (chatId === undefined) return
if ('reply_to_message' in ctx.message) return

const integrations = await listIntegrationsByTelegramId(id)
// Skip Telegram forum service messages (topic created/edited/closed/reopened, etc.).
// They carry message_thread_id but represent system events, not user input — answering
// them with reply_parameters tied to a transient probe topic causes 400 "message thread not found".
const m = ctx.message as Record<string, unknown>
if (
m.forum_topic_created !== undefined ||
m.forum_topic_edited !== undefined ||
m.forum_topic_closed !== undefined ||
m.forum_topic_reopened !== undefined ||
m.general_forum_topic_hidden !== undefined ||
m.general_forum_topic_unhidden !== undefined
) {
return
}

const fromId = ctx.from?.id
const threadId = (ctx.message as Message.TextMessage & { message_thread_id?: number }).message_thread_id

if (fromId !== undefined && threadId !== undefined && ctx.chat?.type === 'private') {
const routed = await handleForumTopicMessage(ctx, worker, chatId, threadId, fromId)
if (routed) return
// Inside a topic but the topic is not in our table (stale topic, manual user
// creation, etc). Skip the workspace/channel keyboard fallback so Telegraf does
// not echo back into a thread id that may not exist anymore.
return
}

const integrations = await listIntegrationsByTelegramId(chatId)
if (integrations === undefined) return

const workspaces: WorkspaceUuid[] = integrations
Expand Down
65 changes: 64 additions & 1 deletion services/telegram-bot/pod-telegram-bot/src/telegraf/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import {
listIntegrationsByTelegramId,
getAccountPerson,
removeIntegrationsByTg,
getAnyIntegrationByTelegramId
getAnyIntegrationByTelegramId,
updateIntegrationData
} from '../account'
import { WorkspaceUuid } from '@hcengineering/core'

Expand All @@ -34,6 +35,8 @@ export enum Command {
Connect = 'connect',
SyncAllChannels = 'sync_all_channels',
SyncStarredChannels = 'sync_starred_channels',
SetForum = 'setforum',
UnsetForum = 'unsetforum',
Help = 'help',
Stop = 'stop'
}
Expand All @@ -56,6 +59,14 @@ export async function getBotCommands (lang: string = 'en'): Promise<BotCommand[]
command: Command.SyncStarredChannels,
description: await translate(telegram.string.SyncStarredChannels, { app: config.App }, lang)
},
{
command: Command.SetForum,
description: 'Route notifications into a forum topic per Huly channel (one topic per channel)'
},
{
command: Command.UnsetForum,
description: 'Disable forum routing and send notifications back to this DM'
},
{
command: Command.Help,
description: await translate(telegram.string.ShowCommandsDetails, { app: config.App }, lang)
Expand Down Expand Up @@ -142,6 +153,56 @@ async function onSyncChannels (ctx: Context, worker: PlatformWorker, onlyStarred
await ctx.reply('List of channels updated')
}

async function onSetForum (ctx: Context, worker: PlatformWorker): Promise<void> {
const id = ctx.from?.id
if (id === undefined) return

// Bot API 9.4 surfaces the Threaded Mode bit via getMe().has_topics_enabled.
// Without it, createForumTopic will fail with 400, so refuse early with a
// helpful pointer to @BotFather. Using getMe instead of a probe createForumTopic
// avoids polluting the DM with a "topic created" service message.
const me = (await ctx.telegram.getMe()) as { username?: string, has_topics_enabled?: boolean }
if (me.has_topics_enabled !== true) {
await ctx.reply(
'Topic creation is not allowed in this DM. The bot administrator must enable Threaded Mode via @BotFather: ' +
`/mybots -> @${me.username ?? 'bot'} -> press Open (Mini App) -> Threads -> toggle ON, then retry /setforum.`
)
return
}

const integrations = await listIntegrationsByTelegramId(id)
if (integrations.length === 0) {
await ctx.reply('No Huly integration found. Connect a workspace first via /connect.')
return
}

for (const integration of integrations) {
await updateIntegrationData(integration, { forumChatId: id })
}

await ctx.reply(
'Forum routing enabled. Every Huly channel will become its own topic in this DM. ' +
'Use /unsetforum to turn it off.'
)
}

async function onUnsetForum (ctx: Context, worker: PlatformWorker): Promise<void> {
const id = ctx.from?.id
if (id === undefined) return

const integrations = await listIntegrationsByTelegramId(id)
if (integrations.length === 0) {
await ctx.reply('No Huly integration found.')
return
}

for (const integration of integrations) {
await updateIntegrationData(integration, { forumChatId: null })
}

await ctx.reply('Forum routing disabled. Notifications will return to this DM.')
}

async function onConnect (ctx: Context, worker: PlatformWorker): Promise<void> {
const id = ctx.from?.id
const lang = ctx.from?.language_code ?? 'en'
Expand Down Expand Up @@ -178,4 +239,6 @@ export async function defineCommands (bot: Telegraf<TgContext>, worker: Platform
bot.command(Command.Connect, (ctx) => onConnect(ctx, worker))
bot.command(Command.SyncAllChannels, (ctx) => onSyncChannels(ctx, worker, false))
bot.command(Command.SyncStarredChannels, (ctx) => onSyncChannels(ctx, worker, true))
bot.command(Command.SetForum, (ctx) => onSetForum(ctx, worker))
bot.command(Command.UnsetForum, (ctx) => onUnsetForum(ctx, worker))
}
9 changes: 9 additions & 0 deletions services/telegram-bot/pod-telegram-bot/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ export interface ReplyRecord {
replyId: number
}

export interface ForumTopicRecord {
workspace: WorkspaceUuid
account: AccountUuid
channelId: Ref<ChunterSpace>
forumChatId: number
topicId: number
createdAt: Date
}

export interface OtpRecord {
telegramId: number
telegramUsername: string
Expand Down
Loading