消息队列(MQ)是一种不同应用程序之间(跨进程)的通信方法。应用程序通过写入和检索出入列队的数据(消息)来通信,而无需通过专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用(Remote Procedure Call. RPC)的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。这样天然的就实现了异步的目标。那么MQ还有哪些功能场景呢。下面逐一介绍。
解耦
MQ最直接的使用场景就是可以将两个系统进行解耦,比如我们的货款抵扣业务场景,用户生成订单发送MQ后立即返回,结算系统去消费该MQ进行用户账户金额的扣款。这样订单系统只需要关注把订单创建成功,最大可能的提高订单量,并且生成订单后立即返回用户。而结算系统重点关心的是账户金额的扣减,保证账户金额最终一致。这个场景里面还会涉及到重试幂等性问题,后面有介绍。
削峰填谷
还是以订单系统和结算系统场景为例,如果订单系统通过RPC框架来调用结算系统,在有高峰促销的情况下生成订单的量会非常大,而且由于生成订单的速度也非常快,这样势必会给结算系统造成系统压力,服务器利用率则会偏高,但在不是高峰的时间点订单量比较小,结算系统的服务器利用率则会偏低。对于结算系统来说就会出现下面这样的高峰波谷现象图。
那么如果通过MQ的方式,将订单存储到MQ队列中,消费端通过拉取的方式,并且拉去速度有消费端来控制,则就可以控制流量趋于平稳。这样对于结算系统来讲,就达到了削峰填谷的目的。或者说起到了流控的目标。接下来,我们介绍一下拉取方式。
拉取模式指用户在代码里主动调用pull方法,不需要在配置文件里面再配置<mq:listener />,拉取的速度由用户控制,调用一次拉取一次消息进行消费,这里要重视消费的速度如果消费性能下降一定会造成积压,因此用户自己启用多线程控制并行度以提高消费速度。
代码样例:
messageConsumer.start(); for (;;){ //手动拉取消息 messageConsumer.pull(topic,messageListener); }
method: pull(String topic,MessageListener listener)
topic:指消费的主题名
listener:是一个回调对象,当pull拉取到消息后会主动调用listener.onMessage(),
与监听模式的区别是:监听模式由MQ客户端守护线程去不停的拉取消息进行消费,拉取模式由用户控制拉取的频率,不主动调用就不会消费消息。但是都不需要主动对消息进行确认。这种方式更适合写场景,保证最终结果落地即可,因为读是需要立即返回以免让用户长时间等待从而影响用户体验。
最终一致性
一致性问题分为强一致性、弱一致性、最终一致性。大多数互联网业务要求实现最终一致性。还是以订单系统和结算系统业务场景举例,订单系统创建成功一个订单后给用户返回的结果即是成功并明确告诉用户会从账户中扣除相应的金额。那么结算系统需要保持跟订单系统相同的状态即从用账户中实际扣除一致的金额。订单系统会涉及两个动作,一个是创建成功订单,一个是发送成功通知到MQ,我们就可以把这两个动作放入到一个本地事务中,要么成功要么失败。当一次发送MQ失败之后,可以结合定时任务进行补偿,这样可以保证生成订单的结果可以落地到mq的存储中。同样结算系统消费端依靠MQ重试机制一直发送消息,直到消费端最终确认扣款业务成功处理完成。这样我们通过消息落地加补偿,消费端从业务上面考虑重复消费的保障,也就是做好幂等性操作,利用MQ实现了最终一致性。
广播消费
MQ有两种消息模式一种是点对点模式,一种是发布/订阅模式(最常用的模式)。同时发布/订阅模式按照消费类型又可以分为集群消费和广播消费。大部分情况下我们使用的是集群消费。
集群消费:MQ发送任何一条消息,集群中只有一台服务器可随机消费到这条消息。如下图:
广播消费:MQ发送每一条消息,集群中的每一台服务器至少消费到一次。如下图:
广播消费举例:消息推送系统。首先某一个客户端与消息中心应用集群中的一台服务器建立长连接并将连接session信息保存到当前服务器内存中,集群在消费业务消息的时候,是不知道该客户端建立的长连接在哪一台服务器上面。这个时候通过广播消费,集群中的每一台服务器都可以消费到业务消息。在决定向用户推送通知之前会判断当前服务器内存中是否有该客户端的连接session信息,如果有则推送,进而客户端通过http协议拉取用户的消息实体。如果session信息不在当前服务器上面,则丢弃。如下图:
广播消费注意事项:
1、消费进度在消费端管理,比如默认会在主目录下创建offset文件夹,偏移量文件存储在offset目录下,出现重复的概率要大于集群消费。
2、MQ可以确保每条消息至少被每台消费方服务器消费一次,但是如果消费方消费失败,不会进入重试,因此业务方需要关注消费失败的情况。
3、由于广播消费消息不会进行确认,所以管理端上显示的积压数会一直不变,需要以出对数为准。
使用集群消费模拟广播
在发布/订阅模式中,如果是集群消费,那么一条消息只能被集群中的随机一台服务器消费到,如果我们有需要集群中的每台服务器消费到比如上面的消息推送的例子,我们使用广播消费来实现。但是广播消费有一些弊端比如不支持顺序消息,消费进度在客户端维护出现重复的几率要大于集群模式,广播模式下不能维护消费进度所以管理端上面的积压数一直保持不变,我们就必须以出队数为准,也就是不能够支持消息堆积的查询。如果要规避这些弊端,那么我们可以利用集群消费来模拟广播,在集群消费中,我们的每台服务器上面的消费APPID是相同的,如果要达到广播的效果,那么每台服务器上面的消费APPID保持不同就可以了。
重试之坑
MQ的重试功能可以保证数据结果最终得到处理,但同时也正因为有重试那么在业务处理的时候就需要格外注意幂等性的问题。比如货款抵扣业务,订单系统生成订单之后调用结算平台去扣除用户的账户金额。结算平台要根据流水号去计算,如果订单系统在调用结算平台的时候发生了网络异常,造成了结算平台实际上已经得到请求并且已处理。订单系统一侧认为发生异常需要重试,后续再发送到结算平台的订单就会造成重复扣款问题。所以流水号尤其要注意需要保证重试过程中每次发送的流水号是一致的,结算平台会根据流水号去做业务校验,如果已经处理,则丢弃,最终确保幂等性。
总结
我们介绍了MQ常见的使用场景,以及每种场景下的使用注意事项。尤其是在重试功能中,重试本来是MQ提供的一种保持数据最终可以得到确认的方法,但是如果业务使用上面不注意幂等性,则会带来业务数据的不一致甚至像重复扣款这样比较严重的后果。我们还介绍了发布/订阅模式下的广播消费的使用举例,也介绍了它的缺点以及可以使用集群消费来模拟广播。鉴于以上每种场景都给我们提供了很好的说明使得大家在以后使用MQ的过程中可以更好的发挥MQ的强大作用。