likes
comments
collection
share

Telegram Bot Nodejs Server 设计Telegram 作为最近比较有戏份的生态,刚好我有幸开发了一

作者站长头像
站长
· 阅读数 2

Telegram 作为最近比较有戏份的生态,刚好我有幸开发了一个相关的项目。

项目结束后在这里分享一下自己的 Telegram Bot Nodejs 的 Server 设计,以下是架构设计图:

Telegram Bot Nodejs Server 设计Telegram 作为最近比较有戏份的生态,刚好我有幸开发了一

接入层

基于 Nodejs Koa WebServer 实现的数据交换与网络通信层.

Cloudflare 全球 CDN 服务

Cloudflare 提供免费的全球 CDN 服务, 我们只需要将 domain 绑定至服务端口即可.

如果没有 domain 将无法使用 Telegram webhook, 同时 Telegram Bot Client 将会降级成轮询模式.

Koa WebServer

基于 Nodejs 的基础 web 服务, 通过 Koa 我们才能实现 REST Api 服务与 Telegram WebHook 的 external 节点.

这里提供一些实现细节以供后续开发参考.

0x1 Koa 的 Router Define 模式

当我们需要模块化 Route 时, 需要一种方便的设计模式以脱离仅由约定实现的模块拆分逻辑.

这里我们使用类似工厂模式的方式提供简单直观的模块化拆分, 也就是 Route Define.

它通过参数定义的方式与解耦 Koa Router 回调函数(Middleware Callback)的方式实现模块化:

// Router Define 参数
export interface RouterOptions {
  prefix?: string;
  options?: Router.IRouterOptions;
  middlewares?: Middleware[];
  setup?: () => Promise<void>;
  routes?: Router[];
  health?: boolean;
}
  • 参数 prefixRouter.IRouterOptions 中的配置相同, 同样是 Koa Router 的定义参数.
  • 扩展字段 middlewares 提供给所有的子路由相同的中间件.
  • 扩展字段 setup 用于支持路由器的初始化启动.
  • 扩展字段 routes 用于定义该路由的子路由, 如 user 路由中即可共享 :id 的查询子路由.
  • 扩展字段 health 提供一个简单的健康检查服务子路由.

这时我们就需要注册一个应用 bootstrap 阶段执行的 setup 方法:

// 收集 Router Define 的 setup 配置, 并在应用的 bootstrap 阶段执行
let setups: Required<RouterOptions>['setup'][] = [];

// 注册 Router Define 的 setup 值 bootstrap
export const routerSetup = async () => {
  for (const setup of setups) {
    await setup(); // 独立执行以避免复杂的调度
  }

  // clear setups
  setups = [];
};

需要注意的是路由可能存在上下关系, 为了避免过于复杂的调度, 这里简化了执行流程, 所有在使用时需要考虑到这点.

接下来就是 defineRouter 的实现部分, 非常的简单:

export function defineRouter(options: RouterOptions) {
  const router = new Router({ prefix: options.prefix, ...options.options });

  // 添加健康检查服务
  if (options.health) {
    router.get('/health', (ctx) => (ctx.body = 'OK'));
  }

  // 注册中间件
  if (options.middlewares) {
    router.use(...options.middlewares);
  }
  
  // 注册子路由
  options.routes?.forEach((route) => {
    router.use(route.routes());
  });

  // 注册setup
  if (options.setup) {
    setups.push(options.setup);
  }

  return router;
}

比较巧妙的是, 由于 middlewares 中的中间件是先于 routes 并且按照添加进入的先后顺序注册的, 根据 Koa 的洋葱模型, 即可实现带有先后顺序的中间件执行逻辑.

使用时只需要引入 defineRouter 方法即可:

export const userRouter = defineRouter({
  prefix: '/user',
  // 基于独立密码的简单权限控制
  middlewares: [authPassword(ENV.SERVER_AUTH_PASSWORD)],
});

0x2 Koa 常用中间件

常见中间件有 REST 封装中间件, 鉴权中间, 表单校验中间件等

0x2.1 RESTful Middleware

封装路由的返回数据, 并定义相关返回 code.

import { Middleware } from 'koa';
import logger from '../common/logger.js';

export const restful: Middleware = async (ctx, next) => {
  try {
    await next();
    if (!ctx.body) {
      ctx.status = 404; // 未知返回值 404
      ctx.body = { code: 404, message: 'Not Found' };
      return;
    }
    // 跳过自定义 code 或封装 body
    if (ctx.body.code === undefined) ctx.body = { code: 0, data: ctx.body, message: ctx.body.message ?? 'Success' };
  } catch (error: unknown) {
    console.error(error);
    logger.error(`Internal Server Error: ${error}`);
    ctx.status = 500; // 由Router抛出的错误作为服务错误
    ctx.body = { code: 500, message: 'Internal Server Error' };
  }
};
0x2.2 DTO Middleware

利用 zod 实现表单检查, 通过定义好的 DTO Schema 解耦表单验证逻辑.

import { Context, Next } from 'koa';
import { ZodObject, ZodRawShape, ZodError } from 'zod';
import logger from '../common/logger.js';
import _ from 'lodash';

export const dto =
  <T extends ZodRawShape, S extends ZodObject<T>>(schema: S) =>
  async (ctx: Context, next: Next) => {
    try {
      // schema 校验
      schema.parse(ctx.request.body);
    } catch (error: unknown) {
      // 处理未通过的情况
      const errors = _.map((error as ZodError).errors, 'message');
      logger.error(`Validation Error: ${errors}`);

      ctx.status = 412;
      ctx.body = { code: 412, message: 'DTO check error.', error };
      return;
    }

    await next();
  };
0x2.3 Auth Password Middleware

利用本地密码实现的最简单 Auth 鉴权中间件, 这种鉴权的模式不适用于公开, 如果需要更复杂的机制可以使用 jwt.

import { Context, Next } from 'koa';

export const authPassword = (psd: string) => async (ctx: Context, next: Next) => {
  // 检查是否为本地密码
  if (ctx.header.authorization !== psd) {
    ctx.status = 401;
    ctx.body = { code: 401, message: 'Unauthorized' }; // 抛出未授权的请求
    return;
  } else {
    await next();
  }
};
0x2.4 Auth JWT Middleware

使用 jsonwebtoken package 提供的 JWT 签名功能我们可以实现一个简单的 JWT 验证中间件:

import jsonwebtoken from 'jsonwebtoken';
import { User } from '@prisma/client';
import { prisma } from '../common/prisma.js';

export const authJwt = () => async (ctx: Context, next: Next) => {
  try {
    if (!ctx.header.authorization) throw new Error('No authorization header');
    const jwtUser: User = jsonwebtoken.verify(ctx.header.authorization, ENV.SERVER_AUTH_PASSWORD) as User;
    const user = await prisma.user.findUnique({ where: { id: jwtUser.id } });
    if (!user) throw new Error('Unauthorized');

    ctx.state.user = user; // 将用户注册至后续中间件与路由中
  } catch (error) {
    ctx.status = 401;
    ctx.body = { code: 401, message: 'Unauthorized' };
    return;
  }

  await next();
};

服务层

RPC/RESET API 服务

此 RPC 服务主要通过共享 Prisma 的数据操作权限来实现的。利用 ZenStack 的 OpenAPI 插件提供的中间件实现。

由于它并不提供直接的 KOA 中间件所以需要我们 Patch 一下:

function ZenStackMiddlewareKoaAdapter(ops: MiddlewareOptions) {
  const handle = ZenStackMiddleware({ ...ops, sendResponse: false });
  return async (ctx: Context, next: Next) => {
    const status = (code: number) => (ctx.status = code);
    const get = (key: string) => ctx.request.query[key];

    await handle(
      Object.assign(ctx.request, { status, get, path: ctx.request.path.replace('/api/rest', '') }) as ExpectedAnyData,
      ctx.response as ExpectedAnyData,
      next
    );

    const locals = (ctx.response as unknown as { locals: { status: number; body: object } }).locals;
    ctx.body = Object.assign(locals.body, { code: locals.status });
  };
}

调用时传入配置即可:

router.all(
  /\/api\/rest\/.*/g,
  ZenStackMiddlewareKoaAdapter({
    getPrisma: () => prisma,
    handler: RestApiHandler({ endpoint: `${ENV.APP_HOST}/api/rest` }),
    logger: {
      error: (...args) => {
        console.error(...args);
        console.trace();
      },
    },
  })
);

Telegram Bot

0x1 TelegramBotClient 的封装

在实际的使用中,频繁的调用 Telegram API 时常会遇到速率限制的问题,这需要我们添加速率控制器来限制这些API的调用。

使用 Bottleneck 实现的速率控制器,也可以使用 p-queue 或者其他的 package 实现,它们的方法都类似:

import Bottleneck from 'bottleneck';
import logger from '../../common/logger.js';

// 速率控制器
export class RateLimiterControl {
  failedRetryTime = 1000;  // 重试间隔
  failedRetryLimit = 10;  // 重试次数

  protected rateLimiter = new Bottleneck({
    // 30 QPS限制的配置
    minTime: Math.ceil(1000 / 30),
    maxConcurrent: 30,
  });

  // 重誓错误的检查逻辑
  protected checkJobFailError(_error: unknown, retryCount: number): boolean {
    return retryCount < this.failedRetryLimit;
  }

  constructor() {
    // 重试请求与错误处理
    this.rateLimiter.on('failed', (error, jobInfo) => {
      if (this.checkJobFailError(error, jobInfo.retryCount)) {
        logger.error(error);
        logger.warn(
          `Job ${jobInfo.options.id} failed ${jobInfo.retryCount + 1}. Retrying after ${
            this.failedRetryTime
          }ms retries.`
        );

        return this.failedRetryTime;
      }

      logger.error(error);
      return;
    });
  }
}

Nodejs 社区中,Telegram 的 API 由 node-telegram-bot-api 提供,我们不大可能为了这个来实现一个自己的 nodejs package,所以我们需要利用 Proxy 劫持其方法的调用。

export class RateLimiterControl {
  // ...
  createRateLimiterProxy<T extends object>(target: T) {
    return new Proxy(target, {
      get: (target, prop, receiver) => {
        const origProperty = target[prop as keyof T];
        // 劫持方法调用
        if (typeof origProperty === 'function') {  // 可以添加更多的过滤项 
          return this.rateLimiter.wrap(origProperty.bind(target));
        }
        return Reflect.get(target, prop, receiver);
      },
    }) as unknown as T;
  }
}

import TelegramBot from 'node-telegram-bot-api';
// 继承速率控制器
export class TelegramBotClient extends RateLimiterControl {
  bot: TelegramBot;
  
  constructor(token: string, options?: TelegramBot.ConstructorOptions) {
    super();
    // 更新速率配置
    this.rateLimiter.updateSettings({
      id: 'TelegramTGClient', // 共享速率配置
      minTime: 500,
      maxConcurrent: 2,
    });
    
    // 劫持原本的 TelegramBot 方法
    this.bot = this.createRateLimiterProxy(new TelegramBot(token, options));
  }
}

独立封装成 TGClientClass 后还可以添加自己的一些方法,比如 sendMDMessage,这里分享一下我的格式化 Telegram MarkdownV2 Message 的方法,这样就可以先写好本地的 markdown template 来发送消息了。

具体如下:

export function formatMarkdownMessages(text: string) {
  const escape = /[_*[\]()~`>#+\-=|{}.!]/;
  const replaceEscape = /[~>+=|.!]/;
  const doubleEscape = /[`_*]/;
  const bracketsEscape = /[{}[]()]/;

  // 以换行符分割字符串
  const strArr = text.split('\n');

  // ? not support table(|) and newline code(```)
  for (const [rowIndex, rawStr] of strArr.entries()) {
    const stack: string[] = [];
    const indexes: number[] = [];
    // 处理特殊字符 * 与连续的 #, 如 **bold**, 一般都是由两个*组成的
    let rawArr = rawStr.replace(/\*{2}|#{2,}\s/g, (match) => match?.[0] ?? '').split('');
    for (const [i, char] of rawArr.entries()) {
      // 跳过不需要转义的字符
      if (!escape.test(char)) continue;
      // 跳过已转义的字符
      if (rawArr[i - 1] === '\\') continue;
      // 处理特殊字符 #
      if (char === '#') {
        if (i === 0) {
          rawArr = ['*', formatMarkdownMessages(rawArr.slice(rawArr[i + 1] === ' ' ? 2 : 1).join('')), '*'];
          break;
        }
        rawArr[i] = '\\#';
      }
      // 处理特殊字符 -, 格式化后续的空格
      if (char === '-') {
        if (rawArr[i + 1] === ' ' && i === 0) {
          rawArr[i] = '';
          rawArr[i + 1] = '';
        } else {
          rawArr[i] = '\\-';
        }
        continue;
      }
      // 处理需要直接替换的字符
      if (replaceEscape.test(char)) {
        rawArr[i] = `\\${char}`;
        continue;
      }
      // 处理需要双重转义的字符
      if (doubleEscape.test(char)) {
        if (stack.at(-1) === char) {
          stack.pop();
          indexes.pop();
        } else {
          stack.push(char);
          indexes.push(i);
        }
        continue;
      }
      // 处理括号, 非链接形式的括号都需要转义, 先收集再转义
      if (bracketsEscape.test(char)) {
        stack.push(char);
        indexes.push(i);
        continue;
      }
    }
    // 检查括号是否匹配
    let start = 0;
    for (const [i, char] of stack.entries()) {
      if (char === '[') {
        start = i;
      } else if (char === ')' && i - start === 3) {
        for (let j = start; j <= i; j++) {
          indexes[j] = -1;
        }
      }
    }
    // 转义非法字符
    for (const i of indexes) {
      if (i === -1) continue;
      rawArr[i] = `\\${rawArr[i]}`;
    }
    strArr[rowIndex] = rawArr.join('');
  }
  return strArr.join('\n');
}

// TelegramBotClient 中的使用
export class TelegramBotClient extends RateLimiterControl {
  sendMDMessage(
    chatId: number | string,
    text: string,
    options?: TelegramBot.SendMessageOptions & { autoformat?: boolean }
  ) {
    return this.bot.sendMessage(chatId, options?.autoformat !== false ? formatMarkdownMessages(text) : text, {
      parse_mode: 'MarkdownV2',
      ...options,
    });
  }
}

0x2 Telegram Command 的解析模式与 defineComdan 功能封装

Telegram 的 Bot 的设计模式类似于系统终端中的 Command, 当用户发送消息后执行对应的 Command 操作.

通常来说,用户发起一次 Command 消息的执行逻辑流程如下图所示是一个交互式的、线性的流程。

Telegram Bot Nodejs Server 设计Telegram 作为最近比较有戏份的生态,刚好我有幸开发了一

所有的消息都应当来自于架构设计中的 Webhook 网关,在 node-telegram-bot-api 中,它是通过 EventEmitter 来触发每一条 Command 的。

下面是一些常用回调的展示:

bot.on('message', msg => {
  if (msg.text) {  // 解析文本消息
    logger.debug(`[TGM] @${msg.chat.username}_${msg.chat.id}: ${msg.text}`);
  }

  if (msg.document) {  // 解析文档消息,主要为用户提供的文件数据,TG会提供文档的 file_id 以供下载
    logger.debug(`[TGM] @${msg.chat.username}_${msg.chat.id}: [Document]`);

    // get document
    const document = msg.document;
    if (document) {
      // 通过 file_id 获取下载链接
      botClient.bot.getFileLink(document.file_id).then(link => {
        logger.debug(`[TGM] @${msg.chat.username}_${msg.chat.id}: ${link}`);
      });
    }
  }

  if (msg.photo) {  // 解析图片消息,TG会提供图片的 file_id 来查询其链接
    logger.debug(`[TGM] @${msg.chat.username}_${msg.chat.id}: [Photo]`);

    // get photo
    const photo = msg.photo.at(-1);
    if (photo) {
      // 通过 file_id 获取图片 url
      botClient.bot.getFileLink(photo.file_id).then(link => {
        logger.debug(`[TGM] @${msg.chat.username}_${msg.chat.id}: ${link}`);
      });
    }
  }

  if (msg.contact) {  // 解析联系人消息
    logger.debug(`[TGM] @${msg.chat.username}_${msg.chat.id}: [Contact]`);
  }
});

botClient.bot.on('callback_query', query => {  // 点击内联按钮(inline button)的回调方法
  logger.debug(`[TGCallbackQuery] @${query.from.username}_${query.from.id}: ${query.data}`);
});

使用 callback 来实现业务逻辑可能不够抽象,对数据操作的封装程度并不是很高。通过拆解执行流程图,我们可以得到每个 Command 执行时可能需要让用户提供更多的数据的需求。

这里我想到了 citty 对 cli command 的封装逻辑,实现了类似的 defineTGCommand API,下面是我的实现。

0x2.1 defineCommand 的类型定义

在业务侧,用户的输入类型很多,我们不能仅支持简单的字符串类型,还需要支持 booleannumber 甚至是 file 等输入类型。

在 Typescript 的编程环境中,我们可以利用它的泛型特性实现这一点,以便我们后续的业务逻辑开发:

// 定义参数支持的类型
type TGCommandArgTypes = 'string' | 'boolean' | 'number' | 'file';

// 工具范型
type Recordable<T = any> = Record<string, T>;

// 定义参数的具体类型
export type TGCommandArgs = Recordable<{
  type?: TGCommandArgTypes;  // 参数的类型
  required?: boolean;  // 是否为必选参数
  description?: string;  // 提供给用户该参数的描述
  templateFilepath?: string;  // 模版文件
  contentType?: string;  // 模版文件的数据类型
}>;

再通过 Typescript 的条件类型(conditional types)以及映射类型(mapped types)实现对泛型的约束与映射:

type ArgsFromConfig<T extends TGCommandArgs> = {  // 约束为 TGCommandArgs
  [K in keyof T] // 对 TGCommandArgs 进行映射,
    : T[K]['type'] extends 'string'  // 判断type是否为'string',如果是则数据的类型为string
    ? string
    : T[K]['type'] extends 'boolean'  // 判断type是否为'boolean',如果是则数据的类型为boolean
    ? boolean
    : T[K]['type'] extends 'number'  // 判断type是否为'number',如果是则数据的类型为number
    ? number
    : T[K]['type'] extends 'file'  // 判断type是否为'file',如果是则数据的类型为url链接
    ? `https://${string}`
    : never; // 解析失败则为never类型
};

当执行 Command 时携带它定义的 Args 等参数至回调函数 ,我们可以定义一个详细的 Context 以供使用:

export type TGCommandContext<T extends TGCommandArgs> = {
  command: string;  // 当前执行的命令
  msg: TelegramBot.Message;  // 用户的原始消息
  chatId: TelegramBot.Message['chat']['id'];  // 用户的chatId
  match?: RegExpMatchArray | null;  // 当用户执行/command_name?q=query参数时的匹配项
  data?: Recordable<string | number | boolean>;  // 初始化数据
  args: ArgsFromConfig<T>;  // command的参数
};

最后将其添加至 Callback 中,同时完善 TGCommand 的定义:

type TGCommand<T extends TGCommandArgs> = {
  command: string;  // 定义的命令
  description: string;  // 该命令的描述,setMyCommands 时使用
  // 启动回调
  setup?: (ctx: Omit<TGCommandContext<T>, 'msg' | 'match' | 'args' | 'chatId'>) => unknown;
  callback?: (ctx: TGCommandContext<T>) => unknown;  // 执行回调
  data?: Record<string, string | number>;  // 初始化数据
  args?: T;  // command的参数
};

export function defineTGCommand<T extends TGCommandArgs>(params: TGCommand<T>) {
  // 内部逻辑...
}
0x2.2 defineCommand 的参数获取

仅定义类型并不能帮助我们实现逻辑的封装,我们拆解开一步一步实现。

具体来说,我们首先需要解析 command 的 args 让用户提供执行时必要的参数,这就需要发送与回复多条消息,这可以使用一个较为复杂的 Promise 来实现:

// 工具方法
function getArgsValue(type: TGCommandArgTypes, value: string) {
  return type === 'boolean' ? value === 'true' : type === 'number' ? parseInt(value) : value;
}

// 获取 command 的参数
async function getCommandArgs<T extends TGCommandArgs, R extends ArgsFromConfig<T>, K extends keyof R, V extends R[K]>(
  client: TelegramBotClient,
  msgId: string,
  msg: TelegramBot.Message,
  args?: T
) {
  // 使用result变量来保存结果
  const result = {} as R;
  // 没有 args 直接返回结果即可
  if (!args) return result;
  for (const [key, { type = 'string', required, description, ...fileInfo }] of Object.entries(args)) {
    // 利用 Promise 来获取单个参数
    await new Promise<void>((resolve, reject) => {
      const callback = async (newMsg: TelegramBot.Message) => {
        // 跳过其他用户的消息
        if (newMsg.chat.id !== msg.chat.id) return;
        // 处理结束的情况
        const done = (err?: Error) => {
          if (err) {
            logger.error(err);
            reject(err);
          } else {
            resolve();
          }
          client.bot.off('message', callback);
        };
        // 跳过使用 /skip 命令跳过非必要命令
        if (newMsg.text === '/skip' && !required) return done();
        // boolean 值的参数
        if (/^\/(yes|no)/.test(newMsg.text ?? '')) {
          if (type !== 'boolean') return done(new Error('Boolean type required'));
          result[key as K] = (newMsg.text === '/yes') as V;
          return done();
        }
        // 在出现其他命令时停止
        if (newMsg.text?.startsWith('/')) return done(new Error('New command detected'));
        // 处理文件类型的数据
        if (type === 'file') {
          if (!newMsg.document) return done(new Error('File is required'));
          try {
            // 写入临时目录中
            const filepath = path.resolve(`temp/${newMsg.document.file_unique_id}-${newMsg.document.file_name}`);
            result[key as K] = (await client.downloadFileMessage(newMsg, filepath)) as V;
            // 写入 redis 缓存中以便后续清理
            await redis.set(RedisKeys.tg.argsFile(msgId), filepath);
          } catch (error) {
            return done(error as Error);
          }
          return done();
        }
        // 检查文本类参数
        if (!newMsg.text) return done(new Error('Text is required'));
        result[key as K] = getArgsValue(type, newMsg.text) as V;
        done();
      };
      // 监听事件
      client.bot.on('message', callback);
    });
  }
  return result;
}

然后再添加发送提示消息给用户的逻辑:

function generateArgsTipMessage(arg: TGCommandArgs[string]) {
  let msg = '';
  if (arg.type === 'boolean') {  // 布尔类型
    msg = `${arg.description}请选择是\(/yes\) 或 否\(/no\)`;
  } else if (arg.type === 'file') {  // 文件类型
    msg = `${arg.description ?? '请下载文件并按照按照文件中的格式填写后重新发送'}, 请勿超过 50 Mb`;
  } else {
    msg = `请输入${arg.description || ''}`;  // 文本类型
  }
  if (!arg.required) {
    msg += '或跳过\(/skip\)';  // 加上可跳过的选项
  }
  return msg;
}

将它添加至 getCommandArgs 中,并且特殊化处理一下文件类型的参数:

// getCommandArgs
// ...
for (const [/* ... */] of Object.entries(args)) {
  // ...
  client.bot.on('message', callback);
  // 文件类型需要发送模板文件
  if (type === 'file') {
    if (!fileInfo.templateFilepath) return reject(new Error('Filepath is required for file type'));
    sendTemplate(client, msg, {
      type,
      required,
      description,
      ...fileInfo,
    } as DeepNonNullable<TGCommandArgs>[string]);
    return;
  }
  // 发送参数消息
  client.sendMDMessage(msg.chat.id, generateArgsTipMessage({ type, required, description }));
}

删除临时文件可以通过 fs.unlink 实现:

// 删除临时文件
async function unlinkArgsFile(msgId: string) {
  const filepath = await redis.get(RedisKeys.tg.argsFile(msgId));
  if (filepath) {
    await fs.unlink(filepath);
  }
}
0x2.3 defineCommand 的执行与初始化

最后为了方便其他场景主动执行 Command,我们可以通过实现一个 dispatchCommand 方法,独立与抽象出具体的执行逻辑:

// 命令集合
const commands = new Map<string, TGCommand<any>>();

// 执行 command
export async function dispatchCommand(command: string, msg: TelegramBot.Message, match?: RegExpMatchArray | null) {
  if (!commands.has(command)) return;  // 跳过非法 command,可以添加错误提示
  const msgId = msg.message_id.toString();
  const {
    args: commandArgs,
    templatePath,
    data,
    callback,
  } = commands.get(command) as TGCommand<TGCommandArgs>;
  try {
    const ctx: TGCommandContext<TGCommandArgs> = {
      command,
      client: botClient,  // TelegramBotClient 的实例
      msg,
      match,
      data,
      chatId: msg.chat.id,
      args: {},
    };
    // 获取参数
    ctx.args = await getCommandArgs(botClient, msgId, msg, commandArgs);
    // 执行回调
    await callback(ctx);
  } catch (error) {
    logger.error(`Error executing command ${command}: ${(error as Error)?.message}`);
  } finally {
    await unlinkArgsFile(msgId);
  }
}

添加 onText 监听用户发送的消息,并使用 dispatchCommand 执行 Command:

export function defineTGCommand<T extends TGCommandArgs>({ command, ...other }: TGCommand<T>) {
  // 重复命令检查
  if (commands.has(command)) {
    logger.warn(`Command ${command} already exists, overwriting`);
    botClient.bot.removeTextListener(new RegExp(`^/${command}(\?.*)?$`));
  }
  // 保存命令至集合
  commands.set(command, { command, ...other });
  
  // 监听并匹配参数消息
  botClient.bot.onText(new RegExp(`^/${command}(\?.*)?$`), async (msg, match) => {
    // 执行命令
    await dispatchCommand(command, msg, match);
  });
}

最后这一步我们利提供一个 setup 方法,调用 TG API 的 getMyCommands 获取老的 command 检查是否需要更新,如果有新添加的命令再调用 setMyCommands 更新命令即可。

具体的 setupTGCommands 实现如:

export async function setupTGCommands(): Promise<boolean> {
  // 检查并生成临时文件夹的目录
  if ((await fs.access(path.resolve('temp')).catch(() => false)) === false) {
    await fs.mkdir(path.resolve('temp'));
  }

  try {
    // 获取老命令
    const oldCommands = await botClient.bot.getMyCommands();
    const oldCommandsHash = new Set(oldCommands.map(({ command }) => command));
    let updated = false;  // 标识
    const newCommands: TelegramBot.BotCommand[] = [];
    for (const [command, config] of commands) {
      // 如有新增 command 更新标识
      if (!oldCommandsHash.has(command)) updated = true;
      try {
        // 执行 command 的启动参数
        await config.setup?.({ command, client: botClient, data: config.data });
      } catch (error) {
        logger.error(`Error setting up command ${command}: ${(error as Error)?.message}`);
      }
      // 添加至新 newCommands 中
      newCommands.push({ command, description: config.description });
    }
    // 更新 commands
    if (updated) {
      logger.info('Updating Telegram commands');
      await botClient.bot.setMyCommands(newCommands);
    }
    return updated;
  } catch (error) {
    logger.error(error);
    return setupTGCommands();  // 自动重试
  }
}

0x3 Telegram Query Callback

Telegram 中所有的非 Command 场景,也就是交互式场景中频繁出现的回调操作都是通过 Query Callback 实现的。

它的执行逻辑类似 Command,区别在于 Command 是逻辑的路口,而 Query Callback 是执行分支。

0x3.1 defineTGQueryCallback 的实现

它也可以通过 defineQuery 的方式定义,以共享给所有 TG 消息的 inline_keyboard 中,它与上述的 defineCommand 的实现类似,这里就不展开更细节的部分了,直接提供代码供大家参考。

type TGQueryArgs = Recordable<{
  type?: 'string' | 'boolean' | 'number';
  required?: boolean;
}>;

type ArgsFromConfig<T extends TGQueryArgs> = {
  [K in keyof T]: T[K]['type'] extends 'string'
    ? string
    : T[K]['type'] extends 'boolean'
    ? boolean
    : T[K]['type'] extends 'number'
    ? number
    : never;
};

type IsRequired<T extends TGQueryArgs> = {
  [K in keyof T]: T[K]['required'] extends true ? K : never;
}[keyof T];

type TGQueryCallbackContext<T extends TGQueryArgs> = {
  query: string;
  client: TelegramBotClient;
  msg?: TelegramBot.Message;
  args: ArgsFromConfig<T>;
};

type TGQueryCallbackConfig<T extends TGQueryArgs> = {
  query: string;
  text: string;
  callback?: (ctx: TGQueryCallbackContext<T>) => unknown;
  args?: T;
  cleanup?: (ctx: TGQueryCallbackContext<T>) => unknown;
};

const queryCallbacks = new Map<string, TGQueryCallbackConfig<any>>();

export function setupTGQueries() {
  botClient.bot.on('callback_query', async (query) => {
    const throwError = (message: string) => {
      logger.error(`[TGCallbackQuery] @${query.from.username}_${query.from.id}: ${message}`);
      botClient.bot.answerCallbackQuery(query.id, { text: message, show_alert: true });
    };

    if (!query.data) return throwError('No data');

    const [command, argsKey] = query.data.split(':');
    if (!command) return throwError('No query');

    const callback = queryCallbacks.get(command);
    if (!callback) return throwError('Query not found');

    const parserArgs = {} as ArgsFromConfig<typeof callback.args>;

    if (argsKey) {
      const cacheParams = await redis.get(RedisKeys.tg.cbParams(command, argsKey));

      if (cacheParams) Object.assign(parserArgs, JSON.parse(cacheParams));
    }

    logger.debug(`[TGCallbackQuery] @${query.from.username}_${query.from.id}: ${command} ${safeStringify(parserArgs)}`);

    // check required args
    for (const key in callback.args) {
      if (callback.args[key]?.required && parserArgs[key] === undefined) {
        return throwError(`Missing required argument: ${key}`);
      }
    }

    const ctx = { query: command, client: botClient, msg: query.message, args: parserArgs };
    try {
      botClient.bot.answerCallbackQuery(query.id);
      await callback.callback?.(ctx);
      await callback.cleanup?.(ctx);
    } catch (error) {
      logger.error(`Error executing query callback ${command}: ${(error as Error)?.message}`);
    }
  });
}

export function defineTGQueryCallback<T extends TGQueryArgs>(
  config: TGQueryCallbackConfig<T> & { args: TGQueryArgs }
): (args: Partial<ArgsFromConfig<T>> & Pick<ArgsFromConfig<T>, IsRequired<T>>) => TelegramBot.InlineKeyboardButton;

export function defineTGQueryCallback<T extends TGQueryArgs>(
  config: TGQueryCallbackConfig<T> & { args?: TGQueryArgs }
): TelegramBot.InlineKeyboardButton;

export function defineTGQueryCallback<
  T extends TGQueryArgs,
  C extends TGQueryCallbackConfig<T>,
  R = C extends { args: TGQueryArgs }
    ? (args: ArgsFromConfig<T>) => TelegramBot.InlineKeyboardButton
    : TelegramBot.InlineKeyboardButton
>(config: TGQueryCallbackConfig<T>): R {
  queryCallbacks.set(config.query, config);

  if (config.args) {
    if (config.query.length > 64 - 9) throw new Error('Query length must be less than 55 characters(9 for key)');
    return ((args: Partial<ArgsFromConfig<T>> & Pick<ArgsFromConfig<T>, IsRequired<T>>) => {
      const key = nanoid(8);
      const params: Partial<ArgsFromConfig<T>> = {};

      for (const key in config.args) {
        const value = args?.[key];
        if (config.args[key]?.required && value === undefined) {
          throw new Error(`Missing required argument: ${key}`);
        }
        params[key] = value;
      }

      redis.set(RedisKeys.tg.cbParams(config.query, key), safeStringify(params));

      return {
        text: config.text.replace(/\{(\w+)\}/g, (_, key) => {
          return params[key as keyof typeof params] as string;
        }),
        callback_data: `${config.query}:${key}`,
      };
    }) as R;
  }

  return {
    text: config.text,
    callback_data: config.query,
  } as R;
}
0x3.1 defineTGQueryCallback 的使用

这里提供一些常用的 Query Callback 定义给大家参考。

0x3.1.1 关闭消息
export const close = defineTGQueryCallback({
  query: 'close',
  text: '❌ 关闭',
  callback: async ({ client, msg }) => {
    if (!msg) return;

    await client.bot.editMessageText('该消息已被关闭!', {
      chat_id: msg.chat.id,
      message_id: msg.message_id,
      reply_markup: {
        inline_keyboard: [[del]],
      },
    });
  },
});
0x3.1.2 删除消息
export const del = defineTGQueryCallback({
  query: 'delete',
  text: '🗑 删除',
  callback: async ({ client, msg }) => {
    if (!msg) return;

    await client.bot.deleteMessage(msg.chat.id, msg.message_id);
  },
});
0x3.1.3 实际使用
// 集中定义
export const Queries = {
  /** 关闭消息 */
  close,
  /** 删除消息 */
  del,
}

// 实际使用
const reply_markup = {
  inline_keyboard: [
    [Queries.close],
    [Queries.del],
  ],
};
转载自:https://juejin.cn/post/7416659736986370067
评论
请登录