likes
comments
collection
share

「Go开源包」asynq:一个基于redis的,简单、可靠、高效的分布式任务队列包

作者站长头像
站长
· 阅读数 22

大家好,我是渔夫子。

今天给大家推荐一个基于redis实现的简单可靠高效分布式任务队列asynq。该队列出自谷歌员工Ken Hibino

项目的开源地址:github.com/hibiken/asy… ,星标star 6.1k,目前有29位贡献者。

asynq的架构设计

基于redis的集群,支持哨兵模式。因此具备了存储可横向扩展及高可用性。以下是asynq运行的顶层设计原理:

  • 客户端将任务入队。
  • 服务端从队列拉取任务,并对每个任务启动一个协程(称之为worker)进行处理。
  • 队列中的任务能够被多个worker并发处理。

以下是该包的架构图: 「Go开源包」asynq:一个基于redis的,简单、可靠、高效的分布式任务队列包

主要特点

  • 保证每个任务至少被消费一次。
  • 支持定时任务
  • 支持任务消费失败重试
  • 支持延时队列功能
  • 支持redis集群模式(可横向扩展)和Sentinals(高可用)模式
  • 支持批量消费任务
  • 自带完善的监控功能

接下来,我们通过基于async包来编写一个客户端程序和一个服务端程序,用于发送消息和消费消息。

安装包

通过go get命令安装包

go get -u github.com/hibiken/asynq

核心数据类型

首先,我们来看一下在使用该包时要用到的两个核心数据类型,

redis连接配置类型:RedisClientOpt

asynq包使用Redis作为消息代理,我们称之为message broker。在和redis建立连接时,通过该类型来自定和redis建立连接的一些属性。比如redis的地址、密码等。

redisConnOpt := async.RedisClientOpt{
    Addr: "localhost:6379",
    // Omit if no password is required
    Password: "mypassword",
    // Use a dedicated db number for asynq.
    // By default, Redis offers 16 databases (0..15)
    DB: 0,
}

任务: Tasks

asynq中,一个工作单元被包装成一个Task,在Task类型中有两个字段:TypePayload

// Type是一个字符串类型,代表该任务的类型. 
func (t *Task) Type() string

// Payload 是消息体.
func (t *Task) Payload() []byte

生产任务

往队列中发送消息包括3步:和redis建立连接、构建任务Task、发送Task到redis。

和redis建立连接

通过NewClient函数和redis建立连接

client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

支持redis集群

在和redis建立连接时,通过asynq.RedisClusterClientOpt结构体配置集群。如下:

client := asynq.NewClient(asynq.RedisClusterClientOpt{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})

同样,在消费侧和redis建立连接的时候也采用集群模式。

支持redis哨兵模式

为了支持高可用性,在出现故障时redis服务能够起到自动恢复的目的,asynq包还支持redis的哨兵模式。如下:

var redis = &asynq.RedisFailoverClientOpt{
    MasterName:    "mymaster",
    SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
}

client := asynq.NewClient(redis)

构建任务

通过NewTask构建任务。但在构建任务之前,需要先根据自己的业务定义消息体的结构。这里我们定义了一个EmailTaskPayload的消息体结构,被消费时用于业务处理的数据。

type EmailTaskPayload struct {
    // ID for the email recipient.
    UserID int
}

payload, err := json.Marshal(EmailTaskPayload{UserID: 42})

//构建了2个任务,一个任务类型为"email:welcome"
t1 := asynq.NewTask("email:welcome", payload)

//构建任务,任务类型是 "email:reminder"
t2 := asynq.NewTask("email:reminder", payload)

将任务发送至队里

通过Enqueue函数就能将任务发送至队列,并且会被立即消费。

info, err := client.Enqueue(t1)

指定任务被消费的时间

有时候,我们不希望任务立即被消费,而是等待一定的时间后再被消费,也就是常说的延时消费或延时队列。在asynq包中有两种方法可以实现。

特定时间段后再被消费

通过ProcessIn函数指定一个时间段,如下,代表是在24小时后再被消费。

info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))

通过ProcessAt函数指定一个具体的时间点,如下,也是代表在24小时后再被消费。

info, err = client.Enqueue(t2, asynq.ProcessAt(time.Now().Add(24*time.Hour))

指定任务保存时间

通过Retention函数指定一个时间段,代表当任务被处理完后,还可以保留一定的时间。如下是当task3被成功消费后再保留2个小时。

client.Enqueue(task3, asynq.Retention(2*time.Hour))

指定入队的队列

通过Queue选项函数指定将消息发送至哪个队列。如下,将消息发送至redis的名为“high”的队列中。

client.Enqueue(t2, asynq.Queue("high"))

消费任务

要消费redis队列中的任务,首先要和redis建立连接,我们称之为server。通过以下方式和redis服务实例以及要订阅的队列进行关联。

func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
    )

    // NOTE: We'll cover what this `handler` is in the section below.
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

指定要消费的队列

比如,我们要消费redis中的名为"high"、"slow"两个队列,那么通过asynq.Config结构体指定即可。如下:

srv := asynq.NewServer(
    asynq.RedisClientOpt{Addr: "localhost:6379"},
    asynq.Config{
    	Queues: map[string]int{
        	"high": 6,
        	"slow": 4,
      	}
    },
)

队列中的6和4代表两个队列的优先级别。

指定worker数量

在NewServer中,通过asynq.Config结构体中的Concurrency字段,可以指定启动的worker数量。多个worker可以并发的消费队列中的任务。如下,代表启动10个worker。如果不指定该配置参数,默认启动的worker数量是服务器的CPU个数。

srv := asynq.NewServer(
    asynq.RedisClientOpt{Addr: "localhost:6379"},
    asynq.Config{
    	Concurrency: 10,
      	Queues: map[string]int{
          "high": 6,
          "slow": 4,
      }
    },
)

指定消费处理器

从redis队列中获取到任务后应该如何被处理呢?就是在执行server的Run方法时指定一个Handler,该Handler是一个接口类型,需要实现如下接口:

type Handler interface {
	ProcessTask(context.Context, *Task) error
}

接下来,我们定义一个实现了该接口的类型handler,在该handler中执行具体的业务处理。在该handler中,根据接收到的任务类型做不同的处理。

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type() {
    case "email:welcome":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Send Welcome Email to User %d", p.UserID)

    case "email:reminder":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Send Reminder Email to User %d", p.UserID)

    default:
        return fmt.Errorf("unexpected task type: %s", t.Type())
    }
    return nil
}

然后,启动Server,指定该handler。如下:

func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // Use asynq.HandlerFunc adapter for a handler function
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

通过asynq包的Mux注册处理器

上面的任务消费的handler的代码可读性比较低。在asynq包中还提供了一个Mux类型,用于进行注册任务类型和对应的消费逻辑的功能。如下:

func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", sendWelcomeEmail)
    mux.HandleFunc("email:reminder", sendReminderEmail)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

在该代码中,通过mux.HandleFunc函数分别注册了任务类型email:welcome的任务的处理逻辑是sendWelcomeEmail函数;任务类型是email:reminder的任务的处理逻辑是sendReminderEmail函数。

接下来我们再看两个处理函数的定义。如下:

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Send Welcome Email to User %d", p.UserID)
    return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Send Reminder Email to User %d", p.UserID)
    return nil
}

批量处理任务

asynq包还支持批量处理任务。即在入队上将多个任务发送到相同的组和相同的队列。在消费的时候就可以通过指定的聚合函数将多个任务聚合成一个任务,再发送给任务处理器进行处理。

当达到以下两个中的任意一个条件时,就可以进行聚合并发送给任务处理器:

  • 最大聚合时间:若达到了一定的时间,则无论当前组任务有多少,都会进行一次聚合。
  • 最大聚合任务数:若还没有达到聚合时间的情况下,已经有指定数量的任务存在了,则也进行聚合。

指定任务所属组

在发送任务时,通过Group函数指定任务所属组:

// Enqueue three tasks to the same group.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))

指定任务聚合相关配置

在进行消费启动服务时,可以指定聚合相关的函数选项。如下:

// This function is used to aggregate multiple tasks into one.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    // ... Your logic to aggregate the given tasks and return the aggregated task.
    // ... Use NewTask(typename, payload, opts...) to create a new task and set options if needed.
    // ... (Note) Queue option will be ignored and the aggregated task will always be enqueued to the same queue the group belonged.
}

srv := asynq.NewServer(
	redisConnOpt,
    asynq.Config{
    	GroupAggregator:  asynq.GroupAggregatorFunc(aggregate), //指定聚合函数为aggregate
        GroupMaxDelay:    10 * time.Minute, //最晚10分钟聚合一次
        GroupGracePeriod: 2 * time.Minute, //每2分钟聚合一次
        GroupMaxSize:     20, // 每20个任务聚合一次
        Queues: map[string]int{"notifications": 1},
    },
)

// 相同组的聚合函数,最终将多个任务组成一个任务,发送个任务处理器
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
        log.Printf("Aggregating %d tasks from group %q", len(tasks), group)
        var b strings.Builder
        for _, t := range tasks {
                b.Write(t.Payload())
                b.WriteString("\n")
        }
        return asynq.NewTask("aggregated-task", []byte(b.String()))
}

监控

一个完善的队列系统,监控是必不可少的。asynq包还配备了两种形式的监控:webUI和命令行工具。

webUI监控

webUI监控是通过开源的asynqmon包实现的,地址是github.com/hibiken/asy…。如下图所示:

队列视角

「Go开源包」asynq:一个基于redis的,简单、可靠、高效的分布式任务队列包

任务视角

「Go开源包」asynq:一个基于redis的,简单、可靠、高效的分布式任务队列包

命令行监控

另外一种就是命令行监控。通过go install安装asynq工具,如下:

go install github.com/hibiken/asynq/tools/asynq

然后在终端输入命令 asynq dash 启动 命令行的dashboard。效果如下: 「Go开源包」asynq:一个基于redis的,简单、可靠、高效的分布式任务队列包

好了,以上就是给大家分享的基于redis的分布式队列包。更多功能请查看文档:github.com/hibiken/asy…

特别说明: 你的关注,是我写下去的最大动力。点击下方公众号卡片,直接关注。关注送《100个go常见的错误》pdf文档、经典go学习资料。