likes
comments
collection
share

RocketMQ 5.x如何使用GRPC方式发送消费消息

作者站长头像
站长
· 阅读数 56

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

RocketMQ版本

  • 5.1.0

背景

我们都知道RocketMQ 5.x新增了proxy模式部署方式,也就是支持了GRPC的消费方式消费,所以今天我们来试试

本次使用的开发语言是goland

前置条件

这里默认我们已经部署了RocketMQ proxy,如果不会部署的可以参考我之前的文章

依赖管理

本次使用的依赖管理方式是go.mod 使用的goland sdk是github.com/apache/rocketmq-clients/golang

也就是这个开源项目

 RocketMQ 5.x如何使用GRPC方式发送消费消息

我们直接执行

go get github.com/apache/rocketmq-clients/golang@master

master分支作为我们的依赖

发送消息

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	rmq_client "github.com/apache/rocketmq-clients/golang"
	"github.com/apache/rocketmq-clients/golang/credentials"
)

const (
	Topic     = "xiao-zou-topic"
	Endpoint  = "127.0.0.1:8081"
	AccessKey = "xxxxxx"
	SecretKey = "xxxxxx"
)

func main() {
	os.Setenv("mq.consoleAppender.enabled", "true")
	rmq_client.ResetLogger()
	// new producer instance
	producer, err := rmq_client.NewProducer(&rmq_client.Config{
		Endpoint: Endpoint,
		Credentials: &credentials.SessionCredentials{
			AccessKey:    AccessKey,
			AccessSecret: SecretKey,
		},
	},
		rmq_client.WithTopics(Topic),
	)
	if err != nil {
		log.Fatal(err)
	}
	// start producer
	err = producer.Start()
	if err != nil {
		log.Fatal(err)
	}
	// graceful stop producer
	defer producer.GracefulStop()

	for i := 0; i < 10; i++ {
		// new a message
		msg := &rmq_client.Message{
			Topic: Topic,
			Body:  []byte("this is a message : " + strconv.Itoa(i)),
		}
		// set keys and tag
		msg.SetKeys("a", "b")
		msg.SetTag("ab")
		// send message in sync
		resp, err := producer.Send(context.TODO(), msg)
		if err != nil {
			log.Fatal(err)
		}
		for i := 0; i < len(resp); i++ {
			fmt.Printf("%#v\n", resp[i])
		}
		// wait a moment
		time.Sleep(time.Second * 1)
	}
}

我们可以直接运行,然后看到消息发送成功了

 RocketMQ 5.x如何使用GRPC方式发送消费消息

消息消费

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	rmq_client "github.com/apache/rocketmq-clients/golang"
	"github.com/apache/rocketmq-clients/golang/credentials"
)

const (
	Topic         = "xiao-zou-topic"
	ConsumerGroup = "gid-xiaozou-grpc"
	Endpoint      = "127.0.0.1:8081"
	AccessKey     = "xxxxxx"
	SecretKey     = "xxxxxx"
)

var (
	// maximum waiting time for receive func
	awaitDuration = time.Second * 5
	// maximum number of messages received at one time
	maxMessageNum int32 = 16
	// invisibleDuration should > 20s
	invisibleDuration = time.Second * 20
	// receive messages in a loop
)

func main() {
	// log to console
	os.Setenv("mq.consoleAppender.enabled", "true")
	rmq_client.ResetLogger()
	// new simpleConsumer instance
	simpleConsumer, err := rmq_client.NewSimpleConsumer(&rmq_client.Config{
		Endpoint:      Endpoint,
		ConsumerGroup: ConsumerGroup,
		Credentials: &credentials.SessionCredentials{
			AccessKey:    AccessKey,
			AccessSecret: SecretKey,
		},
	},
		rmq_client.WithAwaitDuration(awaitDuration),
		rmq_client.WithSubscriptionExpressions(map[string]*rmq_client.FilterExpression{
			Topic: rmq_client.SUB_ALL,
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	// start simpleConsumer
	err = simpleConsumer.Start()
	if err != nil {
		log.Fatal(err)
	}
	// graceful stop simpleConsumer
	defer simpleConsumer.GracefulStop()

	go func() {
		for {
			fmt.Println("start receive message")
			mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)
			if err != nil {
				fmt.Println(err)
			}
			// ack message
			for _, mv := range mvs {
				simpleConsumer.Ack(context.TODO(), mv)
				msg := string(mv.GetBody())
				fmt.Println(msg)
			}
			fmt.Println("wait a moment")
			fmt.Println()
			time.Sleep(time.Second * 3)
		}
	}()
	select {}
}

执行结果:

 RocketMQ 5.x如何使用GRPC方式发送消费消息

源码

相关源码已上传到github,需要可以自取

github.com/weihubeats/…

总结

可以看到我们使用GRPC的方式消费和发送消息都成功了,但是需要注意的是目前rocketmq-clients还不是很稳定,有一些bug,生产使用还是需要谨慎

转载自:https://juejin.cn/post/7265260701593288715
评论
请登录