likes
comments
collection
share

Spring Cloud Stream,MQ基石,兵马未动,粮草先行!

作者站长头像
站长
· 阅读数 61

觉后不止明月上,满身花影倩人扶。

背景

首先,在微服务的架构下,一个应用被拆分为多个服务,各个服务之间通信来完成一个应用的整体功能。我们可以把服务之间的通信分为两种:

  • HTTP/RPC通信,优点是实时通信,缺点是服务之间的耦合度高。
  • 消息通信,优点是服务之间耦合性低,但缺点也很明显,不能完成实时通信。

根据不同的业务场景,会使用不同的通信方式,比如给用户发短信或者邮件这种业务我们会使用RocketMQ这类的消息通信来实现。

RocketMQ应用场景

应用场景大家应该都不陌生,首先消息通信肯定是发生在多个服务之间的,那多个服务之间直接调用会导致耦合度太高,引发连锁反应,那应用场景就必然有:

  • 异步解耦:RocketMQ可以实现异步通信和应用解耦,确保主站业务的连续性。比如淘宝交易系统,每笔交易订单会引发下游系统的关注,下游系统可能会有几百个,包括物流、购物车、积分等等,整体业务系统庞大而且复杂。

如果在请求太多的情况下,也就是高并发场景下,我们也就要求使用的消息中间件具有一定的消息堆积能力,这也是我们RocketMQ的一大应用场景:

  • 削峰填谷:高并发场景下,请求太多或者限制太多导致的服务挂掉或者说服务体验极差,RocketMQ在此场景下具有一定的消息堆积能力,并在其他场景下传输消息,达到削峰填谷的作用。

同样,我们也需要保证我们发出去的消息是有顺序性的,例如我们发出去的消息是1,2,3,而别人收到的是3,1,2,那这样的消息也不会是我们想要的,所以得保证消息的顺序性:

  • 顺序收发:RocketMQ提供的顺序消息保证了消息的FIFO。

其余的业务场景大家可以了解一下:

  • 分布式事务一致性:在分布式事务中,大家知道某些步骤是需要保证不失败的,那如何保证呢?那就是通过消息队列来保证执行成功,如果执行不成功,消息队列中的消息不会被删除,而是重试执行。
  • 大数据分析:我们可以在数据在MQ中的时候,使用RocketMQ与流式计算引擎结合,可以很方便的对业务数据进行实时分析。
  • 分布式缓存同步:集中式缓存因为带宽瓶颈限制数据变更的访问流量,通过RocketMQ构建分布式缓存,可以实时通过数据的变化。

Spring Cloud Stream 架构

在正式聊RocketMQ之前,我想跟大家先简单分享一下Spring Cloud Stream框架。

显而易见,这个是Spring Cloud体系内的一个框架,目的是简化消息业务在Spring Cloud应用程序中的开发。首先我们来看看Spring Cloud Stream的架构图:

Spring Cloud Stream,MQ基石,兵马未动,粮草先行!

Spring Cloud Stream 的实现基于发布和订阅的机制,其核心是由4部分组成,分别是

  • Spring Message(消息包装,消息通道)
  • Spring Integration(Spring Message扩展机制)
  • Binders(与消息中间件集成)
  • Bindings(消息中间件与生产者消费者之间的桥梁)

其实这里本来想跟大家详细介绍一下的,但是由于我们主要要说的RocketMQ,在这里也不适合扩大篇幅,大家有兴趣可以自己去了解一下,或者后续我再给大家单独说一下这个框架。

我们还是来看上面这个架构图,middleware正是我们的消息中间件,我们是通过Spring Cloud Stream 的Bindings(4大组成部分之一)叫做目标绑定器来和绑定外部消息与消息中间件,也就是说,我们通过Binder将我们的消息中间件集成到Spring Cloud Stream。

Spring Cloud Stream 消息发送流程

我们再来看一下Spring Cloud Stream 消息发送的流程图: 我们简单来分析一下这个流程图:

Spring Cloud Stream,MQ基石,兵马未动,粮草先行!

  1. 首先我们在业务代码中调用MessageChannel接口的send方法,AbstractMessageChannel是消息通道的基本实现类,提供发送消息和接收消息的公用方法。Spring Cloud Stream,MQ基石,兵马未动,粮草先行!Spring Cloud Stream,MQ基石,兵马未动,粮草先行!
  2. 消息发送到AbstractSubscribableChannel类实现的doSend()方法。Spring Cloud Stream,MQ基石,兵马未动,粮草先行!
  3. 通过消息分发类MessageDispatcher把消息分发给MessageHandler。也就是从AbstractSubscribableChannel的实现类 DirectChannel得到MessageDispatcher的实现类,UnicastingDispatcher,并调用dispach()方法把消息分发给MessageHandler。Spring Cloud Stream,MQ基石,兵马未动,粮草先行!
  4. AbstractMessageChannelBinder在初始化Binding时,会创建并出示话SendingHandler,调用subscribe()添加到handler列表,Producer的MessageHandler是由消息中间件Binder来完成的,Spring Cloud Stream提供了创建MessageHandler的规范,也就是下面这个方法,我们后面来分析RocketMQ Binder的具体实现过程: Spring Cloud Stream,MQ基石,兵马未动,粮草先行!

是结束,也是开始

MQ的背景部分算是结束了,我们简单说完了Spring Cloud Stream的架构以及消息发送流程,接下来就是紧张刺激的RocketMQ Binder集成的部分了。

上面只是贴了一部分核心的代码,大家可以根据步骤去自己找找源码看看,简单分析一下还是不太难的,很多代码我们都能看到常见设计模式的影子。

所以,我们RocketMQ见啦!

转载自:https://juejin.cn/post/7155475974702759944
评论
请登录