likes
comments
collection
share

基于redis实现的轻量级延迟队列- lmstfy

作者站长头像
站长
· 阅读数 6

什么是延迟队列

延迟队列(Delay Queue)是一种存储消息并在特定延迟时间后将其投递到消费者的队列机制。在传统消息队列中,消息会立即被推送给消费者进行处理,但在某些场景下,我们希望消息在一段时间后再被消费者处理。

延迟队列的工作原理

延迟队列的工作原理通常是将消息先存储在队列中,消息的投递时间会被延迟,直到延迟时间到达时才将消息投递给消费者进行处理。这种机制保证了消息能够在指定的延迟时间后才会被处理,实现了消息的延迟投递。

延迟队列的使用场景

  1. 电商平台,超过 30 分钟未支付的订单,将会被取消
  2. 商品签收后,3天未确认,自动确认
  3. 在平台注册但 30 天内未登录的用户,发短信提醒
  4. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
  5. 商品抢购,预定后,开售前30分钟提醒

场景很多,就不一一列举,这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务。这些就可以使用延迟队列来解决

延迟队列常见的实现方式

  1. 通过程序的方式实现,例如 JDK 自带的延迟队列 DelayQueue
  2. 通过 MQ 框架来实现,例如 RabbitMQ 可以通过 rabbitmq-delayed-message-exchange 插件来实现延迟队列
  3. 通过 Redis 的方式来实现延迟队列

JDK 自带的延迟队列 DelayQueue

优点

  1. 开发比较方便,可以直接在代码中使用
  2. 代码实现比较简单

缺点

  1. 不支持持久化保存
  2. 不支持分布式系统

MQ 实现方式

RabbitMQ 本身并不支持延迟队列,但可以通过添加插件 rabbitmq-delayed-message-exchange 来实现延迟队列。

优点

  1. 支持分布式
  2. 支持持久化

缺点

框架比较重,需要搭建和配置 MQ。

Redis 实现方式

Redis 是通过有序集合(ZSet)的方式来实现延迟消息队列的,ZSet 有一个 Score 属性可以用来存储延迟执行的时间。

优点

  1. 灵活方便,Redis 是互联网公司的标配,无序额外搭建相关环境;
  2. 可进行消息持久化,大大提高了延迟队列的可靠性;
  3. 分布式支持,不像 JDK 自身的 DelayQueue;
  4. 高可用性,利用 Redis 本身高可用方案,增加了系统健壮性。

缺点

需要使用无限循环的方式来执行任务检查,会消耗少量的系统资源。

综合以上,我们决定使用redis去实现延迟队列,刚好有个开源项目lmstfy基于redis实现的,下面介绍下这个项目的使用

lmstfy

美图开源的lmstfy(let me schedule task for you),基于redis存储,使用golang开发,资源占用少,轻量级,并且经受美图线上环境大流量验证多年,较适合用来做延迟队列。

lmstfy特点

  1. 支持基本的消息队列特性:发布消息,消费消息,删除消息;
  2. 支持以下额外特性
    1. 支持消息有效期,过期自动删除;
    2. 支持消息延迟消费;
    3. 支持消息自动重试
    4. 支持死信队列
  1. 支持命名空间,队列级别的prometheus监控,提供grafana dashboard;
  2. 支持发布,消费流量控制。

lmstfy工作原理

基于redis实现的轻量级延迟队列- lmstfy

如上图所示:

  1. 消息经过生产者发布到lmstfy,如果delay时间大于0,则消息被扔到redis的ZSet中(score为延迟绝对时间),等延迟时间到了,才会挪到就绪队列;如果delay为0,则消息直接到就绪队列。
  2. 消费者每次都是从就绪队列消费消息;
  3. 服务器内部会有Timer,每隔一秒检查ZSet中是否有到期的消息,有则将消息移到就绪队列供消费者消费;
  4. 每一个消息发布的时候,可以设置最多被消费的次数,如果达到最大次数都还未正确处理(delete或者ack),则消息会被挪到死信队列;
  5. 业务可以选择复活死信队列中的消息,也可以选择直接删除死信队列中的消息;被复活的死信消息会再次挪到就绪队列中,可以被消费者正常消息。

lmstfy服务端

安装部署

前提

lmstfy是依赖redis实现的延迟队列,所以首先要安装redis,redis的安装教程可以谷歌搜一搜, 只需要注意一点,redis的持久化,使用AOF即可,淘汰策略使用noeviction 不淘汰任何数据,当内存不足时,新增操作会报错

reids的配置

# 持久化设置为aof
appendonly yes
# 内存淘汰策略设置为 noeviction
maxmemory-policy noeviction

编译二进制文件

#下载代码 
git clone https://github.com/bitleak/lmstfy.git
# 进入项目目录
cd lmstfy
# 使用make编译项目,当前目录会生成一个二进制文件,文件所在目录./_build
make

lmstfy的配置文件

Host = "127.0.0.1"
Port = 7776
AdminHost = "127.0.0.1" # optional, default to localhost
AdminPort = 7778 
#LogDir = "/var/log/lmstfy"
LogLevel = "info"
#LogFormat = "text" # Use LogFormat="json" if wants to print the log with json format
EnableAccessLog = true

# default params
#TTLSecond = 24 * 60 * 60 // 1 day
#DelaySecond = 0
#TriesNum = 1
#TTRSecond = 2 * 60 // 2 minutes
#TimeoutSecond = 0  // means non-blocking

# basic auth accounts for the admin api
[Accounts]
test_user = "change.me"

[AdminRedis]  # redis used to store admin data, eg. tokens
Addr = "localhost:6379"
# Password = foobared

[Pool]
[Pool.default]
Addr = "localhost:6379"
# Password = foobared
# DB = 0
#MigrateTo = "migrate" # When migration is enabled, all PUBLISH will go to `migrate` pool. and `default` will be drained
#[Pool.migrate]
#Addr = "localhost:6389"

#[Pool.mysentinel]
# Addr = "localhost:16379,localhost:6380,localhost:6381"
# MasterName = "mymaster"
# Password = foobared

启动lmsfty

_build/lmstfy-server -c config/demo-conf.toml

以上步骤lmsfty服务器端就启动完成了,那客户端怎么使用呢?

lmstfy客户端

申请token

imstfy使用namespace做业务隔离,每个业务一个token,需要向服务端申请,如果服务端开启了basic auth accounts验证,调用接口的时候 需要传入配置文件的账号,例如上面的配置,调用的时候需要传入”change.me“

例子:namespace为kb-test,申请token

curl --location 'http://127.0.0.1:7778/token/kb-test' \
--header 'Content-Type: application/x-www-form-urlencoded' \
--header 'Authorization: Basic dGVzdF91c2VyOmNoYW5nZS5tZQ==' \
--data-urlencode 'description=test'

postman调用

基于redis实现的轻量级延迟队列- lmstfy

结果 :

{
    "token": "01HQJ7EQBDZT88JH79BDE4FAYC"
}

官方目前支持的语言有go、Java、php、Rust,我这里使用go简单演示下如果使用

生产者

引入 github.com/bitleak/lmstfy/client这个包,代码如下:

package main

import (
	"fmt"
	"github.com/bitleak/lmstfy/client"
)

func main() {
	c := client.NewLmstfyClient("127.0.0.1", 7776, "kb-test", "01HQJ7EQBDZT88JH79BDE4FAYC")

	c.ConfigRetry(3, 50) // optional, config the client to retry when some errors happened. retry 3 times with 50ms interval
	//Publish a job with ttl==forever, tries==3, delay==5s ,注意ttl设置的时间一定要大于delay
	jobId, err := c.Publish("test", []byte("test"), 100, 3, 30)
	if err == nil {
		fmt.Println("消息发送成功", jobId)
	}

}

消费者

package main

import (
	"fmt"
	"github.com/bitleak/lmstfy/client"
)

func main() {
	c := client.NewLmstfyClient("127.0.0.1", 7776, "kb-test", "01HQJ7EQBDZT88JH79BDE4FAYC")

	c.ConfigRetry(3, 50) // optional, config the client to retry when some errors happened. retry 3 times with 50ms interval

	for {
		job, err := c.Consume("test", 6, 3)
		if err != nil {
			panic(err)
		}
		if job != nil {
			fmt.Println(string(job.Data))
			//接收到消息,成功处理了, 对消息进行ack去人
			err1 := c.Ack("test", job.ID)
			if err1 == nil {
				fmt.Println("消息出来成功并且确认")
			}
		}

	}
}

以上 就是lmstfy的介绍以及使用,简单比较轻量,其他语言,可以看github上的示例。