教程揭秘 | 動(dòng)力節(jié)點(diǎn)內(nèi)部Java零基礎(chǔ)教學(xué)文檔第十九篇:RocketMQ
接上期后續(xù)
本期分享第十九章節(jié)
RocketMQ
教學(xué)文檔馬上就分享完了,你們都跟上了嗎?
每天都在學(xué)習(xí)嘛?
有什么不會(huì)的嘛?
今日教學(xué)文檔分享來(lái)了?

今日新篇章
【RocketMQ】
1.?RocketMQ簡(jiǎn)介
官網(wǎng): http://rocketmq.apache.org/?
?

RocketMQ是阿里巴巴2016年MQ中間件,使用Java語(yǔ)言開發(fā),RocketMQ 是一款開源的分布式消息系統(tǒng),基于高可用分布式集群技術(shù),提供低延時(shí)的、高可靠的消息發(fā)布與訂閱服務(wù)。同時(shí),廣泛應(yīng)用于多個(gè)領(lǐng)域,包括異步通信解耦、企業(yè)解決方案、金融支付、電信、電子商務(wù)、快遞物流、廣告營(yíng)銷、社交、即時(shí)通信、移動(dòng)應(yīng)用、手游、視頻、物聯(lián)網(wǎng)、車聯(lián)網(wǎng)等。
具有以下特點(diǎn):
1.?l能夠保證嚴(yán)格的消息順序
2.?l提供豐富的消息拉取模式
3.?l高效的訂閱者水平擴(kuò)展能力
4.?l實(shí)時(shí)的消息訂閱機(jī)制
5.?l億級(jí)消息堆積能力
2.?為什么要使用MQ
1,要做到系統(tǒng)解耦,當(dāng)新的模塊進(jìn)來(lái)時(shí),可以做到代碼改動(dòng)最小; ?能夠解耦
2,設(shè)置流程緩沖池,可以讓后端系統(tǒng)按自身吞吐能力進(jìn)行消費(fèi),不被沖垮;?能夠削峰,限流
3,強(qiáng)弱依賴梳理能把非關(guān)鍵調(diào)用鏈路的操作異步化并提升整體系統(tǒng)的吞吐能力;能夠異步
Mq的作用 ?削峰限流 異步 解耦合
2.1?定義
中間件(緩存中間件 ?redis memcache ?數(shù)據(jù)庫(kù)中間件 mycat ?canal ??消息中間件mq )
面向消息的中間件(message-oriented middleware0) MOM能夠很好的解決以上的問題。
是指利用高效可靠的消息傳遞機(jī)制進(jìn)行與平臺(tái)無(wú)關(guān)(跨平臺(tái))的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來(lái)進(jìn)行分布式系統(tǒng)的集成。
通過(guò)提供消息傳遞和消息排隊(duì)模型在分布式環(huán)境下提供應(yīng)用解耦,彈性伸縮,冗余存儲(chǔ),流量削峰,異步通信,數(shù)據(jù)同步等
大致流程
發(fā)送者把消息發(fā)給消息服務(wù)器,消息服務(wù)器把消息存放在若干隊(duì)列/主題中,在合適的時(shí)候,消息服務(wù)器會(huì)把消息轉(zhuǎn)發(fā)給接受者。在這個(gè)過(guò)程中,發(fā)送和接受是異步的,也就是發(fā)送無(wú)需等待,發(fā)送者和接受者的生命周期也沒有必然關(guān)系在發(fā)布pub/訂閱sub模式下,也可以完成一對(duì)多的通信,可以讓一個(gè)消息有多個(gè)接受者[微信訂閱號(hào)就是這樣的]

2.2?特點(diǎn)
2.2.1?異步處理模式
消息發(fā)送者可以發(fā)送一個(gè)消息而無(wú)需等待響應(yīng)。消息發(fā)送者把消息發(fā)送到一條虛擬的通道(主題或隊(duì)列)上;
消息接收者則訂閱或監(jiān)聽該通道。一條信息可能最終轉(zhuǎn)發(fā)給一個(gè)或多個(gè)消息接收者,這些接收者都無(wú)需對(duì)消息發(fā)送者做出回應(yīng)。整個(gè)過(guò)程都是異步的。
案例:
也就是說(shuō),一個(gè)系統(tǒng)和另一個(gè)系統(tǒng)間進(jìn)行通信的時(shí)候,假如系統(tǒng)A希望發(fā)送一個(gè)消息給系統(tǒng)B,讓它去處理,但是系統(tǒng)A不關(guān)注系統(tǒng)B到底怎么處理或者有沒有處理好,所以系統(tǒng)A把消息發(fā)送給MQ,然后就不管這條消息的“死活” 了,接著系統(tǒng)B從MQ里面消費(fèi)出來(lái)處理即可。至于怎么處理,是否處理完畢,什么時(shí)候處理,都是系統(tǒng)B的事,與系統(tǒng)A無(wú)關(guān)。

這樣的一種通信方式,就是所謂的“異步”通信方式,對(duì)于系統(tǒng)A來(lái)說(shuō),只要把消息發(fā)給MQ,然后系統(tǒng)B就會(huì)異步處去進(jìn)行處理了,系統(tǒng)A不能“同步”的等待系統(tǒng)B處理完。這樣的好處是什么呢?解耦
2.2.2?應(yīng)用系統(tǒng)的解耦
? 發(fā)送者和接收者不必了解對(duì)方,只需要確認(rèn)消息
? 發(fā)送者和接收者不必同時(shí)在線
2.2.3?現(xiàn)實(shí)中的業(yè)務(wù)

3.?各個(gè)MQ產(chǎn)品的比較

4.?RocketMQ重要概念【重點(diǎn)】
Producer:消息的發(fā)送者,生產(chǎn)者;舉例:發(fā)件人
Consumer:消息接收者,消費(fèi)者;舉例:收件人
Broker:暫存和傳輸消息的通道;舉例:快遞
NameServer:管理Broker;舉例:各個(gè)快遞公司的管理機(jī)構(gòu) 相當(dāng)于broker的注冊(cè)中心,保留了broker的信息
Queue:隊(duì)列,消息存放的位置,一個(gè)Broker中可以有多個(gè)隊(duì)列
Topic:主題,消息的分類
ProducerGroup:生產(chǎn)者組
ConsumerGroup:消費(fèi)者組,多個(gè)消費(fèi)者組可以同時(shí)消費(fèi)一個(gè)主題的消息
消息發(fā)送的流程是,Producer詢問NameServer,NameServer分配一個(gè)broker 然后Consumer也要詢問NameServer,得到一個(gè)具體的broker,然后消費(fèi)消息
?

5.?生產(chǎn)和消費(fèi)理解【重點(diǎn)】
?

6.?RocketMQ安裝
了解了mq的基本概念和角色以后,我們開始安裝rocketmq,建議在linux上
6.1?下載RocketMQ
下載地址:https://rocketmq.apache.org/dowloading/releases/?
注意選擇版本,這里我們選擇4.9.2的版本,后面使用alibaba時(shí)對(duì)應(yīng)
?

下載地址:
https://archive.apache.org/dist/rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip?
6.2?上傳服務(wù)器
在root目錄下創(chuàng)建文件夾
mkdir rocketmq
將下載后的壓縮包上傳到阿里云服務(wù)器或者虛擬機(jī)中去
?

6.3?解壓
unzip rocketmq-all-4.9.2-bin-release.zip
如果你的服務(wù)器沒有unzip命令,則下載安裝一個(gè)
yum install unzip
目錄分析
?

Benchmark:包含一些性能測(cè)試的腳本;
Bin:可執(zhí)行文件目錄;
Conf:配置文件目錄;
Lib:第三方依賴;
LICENSE:授權(quán)信息;
NOTICE:版本公告;
6.4?配置環(huán)境變量
vi /etc/profile
在文件末尾添加
export NAMESRV_ADDR=阿里云公網(wǎng)IP:9876
刷新環(huán)境變量
source /etc/profile
6.5?修改nameServer的運(yùn)行腳本
進(jìn)入bin目錄下,修改runserver.sh文件,將71行和76行的Xms和Xmx等改小一點(diǎn)
vi runserver.sh
?

保存退出
6.6?修改broker的運(yùn)行腳本
進(jìn)入bin目錄下,修改runbroker.sh文件,修改67行
?

保存退出
6.7?修改broker的配置文件
進(jìn)入conf目錄下,修改broker.conf文件
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr=localhost:9876
autoCreateTopicEnable=true
brokerIP1=阿里云公網(wǎng)IP
添加參數(shù)解釋
namesrvAddr:nameSrv地址 可以寫localhost因?yàn)閚ameSrv和broker在一個(gè)服務(wù)器
autoCreateTopicEnable:自動(dòng)創(chuàng)建主題,不然需要手動(dòng)創(chuàng)建出來(lái)
brokerIP1:broker也需要一個(gè)公網(wǎng)ip,如果不指定,那么是阿里云的內(nèi)網(wǎng)地址,我們?cè)俦镜責(zé)o法連接使用
6.8?啟動(dòng)
首先在安裝目錄下創(chuàng)建一個(gè)logs文件夾,用于存放日志
mkdir logs
?

一次運(yùn)行兩條命令
啟動(dòng)nameSrv
nohup sh bin/mqnamesrv > ./logs/namesrv.log &
啟動(dòng)broker 這里的-c是指定使用的配置文件
nohup sh bin/mqbroker -c conf/broker.conf > ./logs/broker.log &
查看啟動(dòng)結(jié)果
?

6.9?RocketMQ控制臺(tái)的安裝RocketMQ-Console
Rocketmq 控制臺(tái)可以可視化MQ的消息發(fā)送!
舊版本源碼是在rocketmq-external里的rocketmq-console,新版本已經(jīng)單獨(dú)拆分成dashboard
網(wǎng)址: https://github.com/apache/rocketmq-dashboard?
下載地址:
https://github.com/apache/rocketmq-dashboard/archive/refs/tags/rocketmq-dashboard-1.0.0.zip?
下載后解壓出來(lái),在跟目錄下執(zhí)行
mvn clean package -Dmaven.test.skip=true

?

將jar包上傳到服務(wù)器上去
?

然后運(yùn)行
nohup java -jar ./rocketmq-dashboard-1.0.0.jar > ./rocketmq-4.9.2/logs/dashboard.log &
命令拓展:--server.port指定運(yùn)行的端口
--rocketmq.config.namesrvAddr=127.0.0.1:9876?指定namesrv地址
訪問: ?http://localhost:8001?
運(yùn)行訪問端口是8001,如果從官網(wǎng)拉下來(lái)打包的話,默認(rèn)端口是8080
nohup java -jar rocketmq-dashboard-1.0.0.jar --server.port=8081 --rocketmq.config.namesrvAddr=47.100.238.122:9876 > rocketmq-dashboard.log &
?
?

7.?RocketMQ安裝之docker
7.1?下載RockerMQ需要的鏡像
docker pull rocketmqinc/rocketmq
docker pull styletang/rocketmq-console-ng
?
7.2?啟動(dòng)NameServer服務(wù)
7.2.1?創(chuàng)建NameServer數(shù)據(jù)存儲(chǔ)路徑
mkdir -p /home/rocketmq/data/namesrv/logs /home/rocketmq/data/namesrv/store
?
7.2.2?啟動(dòng)NameServer容器
docker run -d --name rmqnamesrv -p 9876:9876 -v /home/rocketmq/data/namesrv/logs:/root/logs -v /home/rocketmq/data/namesrv/store:/root/store -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq sh mqnamesrv
7.3?啟動(dòng)Broker服務(wù)
7.3.1?創(chuàng)建Broker數(shù)據(jù)存儲(chǔ)路徑
mkdir -p /home/rocketmq/data/broker/logs /home/rocketmq/data/broker/store
7.3.2?創(chuàng)建conf配置文件目錄
mkdir /home/rocketmq/conf
7.3.3?在配置文件目錄下創(chuàng)建broker.conf配置文件
7.3.4?啟動(dòng)Broker容器
7.4?啟動(dòng)控制臺(tái)
?
7.5?正常啟動(dòng)后的docker ps
?

?
7.6?訪問控制臺(tái)
http://你的服務(wù)器外網(wǎng)ip:9999/
?

8.?RocketMQ快速入門
RocketMQ提供了發(fā)送多種發(fā)送消息的模式,例如同步消息,異步消息,順序消息,延遲消息,事務(wù)消息等,我們一一學(xué)習(xí)
8.1?消息發(fā)送和監(jiān)聽的流程
我們先搞清楚消息發(fā)送和監(jiān)聽的流程,然后我們?cè)陂_始敲代碼
8.1.1?消息生產(chǎn)者
1.創(chuàng)建消息生產(chǎn)者producer,并制定生產(chǎn)者組名
2.指定Nameserver地址
3.啟動(dòng)producer
4.創(chuàng)建消息對(duì)象,指定主題Topic、Tag和消息體等
5.發(fā)送消息
6.關(guān)閉生產(chǎn)者producer
8.1.2?消息消費(fèi)者
1.創(chuàng)建消費(fèi)者consumer,制定消費(fèi)者組名
2.指定Nameserver地址
3.創(chuàng)建監(jiān)聽訂閱主題Topic和Tag等
4.處理消息
5.啟動(dòng)消費(fèi)者consumer
8.2?搭建Rocketmq-demo
8.2.1?加入依賴
?
8.2.2?編寫生產(chǎn)者
8.2.3?編寫消費(fèi)者
8.2.4?測(cè)試
啟動(dòng)生產(chǎn)者和消費(fèi)者進(jìn)行測(cè)試
9.?RocketMQ發(fā)送同步消息*
上面的快速入門就是發(fā)送同步消息,發(fā)送過(guò)后會(huì)有一個(gè)返回值,也就是mq服務(wù)器接收到消息后返回的一個(gè)確認(rèn),這種方式非常安全,但是性能上并沒有這么高,而且在mq集群中,也是要等到所有的從機(jī)都復(fù)制了消息以后才會(huì)返回,所以針對(duì)重要的消息可以選擇這種方式
?

10.?RocketMQ發(fā)送異步消息*
異步消息通常用在對(duì)響應(yīng)時(shí)間敏感的業(yè)務(wù)場(chǎng)景,即發(fā)送端不能容忍長(zhǎng)時(shí)間地等待Broker的響應(yīng)。發(fā)送完以后會(huì)有一個(gè)異步消息通知
10.1?異步消息生產(chǎn)者
10.2?異步消息消費(fèi)者
11.?RocketMQ發(fā)送單向消息*
這種方式主要用在不關(guān)心發(fā)送結(jié)果的場(chǎng)景,這種方式吞吐量很大,但是存在消息丟失的風(fēng)險(xiǎn),例如日志信息的發(fā)送
11.1?單向消息生產(chǎn)者
11.2?單向消息消費(fèi)者
消費(fèi)者和上面一樣
12.?RocketMQ發(fā)送延遲消息*
消息放入mq后,過(guò)一段時(shí)間,才會(huì)被監(jiān)聽到,然后消費(fèi)
比如下訂單業(yè)務(wù),提交了一個(gè)訂單就可以發(fā)送一個(gè)延時(shí)消息,30min后去檢查這個(gè)訂單的狀態(tài),如果還是未付款就取消訂單釋放庫(kù)存。
12.1?延遲消息生產(chǎn)者
12.2?延遲消息消費(fèi)者
消費(fèi)者和上面一樣
這里注意的是RocketMQ不支持任意時(shí)間的延時(shí)
只支持以下幾個(gè)固定的延時(shí)等級(jí),等級(jí)1就對(duì)應(yīng)1s,以此類推,最高支持2h延遲
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
13.?RocketMQ發(fā)送順序消息**
消息有序指的是可以按照消息的發(fā)送順序來(lái)消費(fèi)(FIFO)。RocketMQ可以嚴(yán)格的保證消息有序,可以分為:分區(qū)有序或者全局有序。
可能大家會(huì)有疑問,mq不就是FIFO嗎?
rocketMq的broker的機(jī)制,導(dǎo)致了rocketMq會(huì)有這個(gè)問題
因?yàn)橐粋€(gè)broker中對(duì)應(yīng)了四個(gè)queue
?

順序消費(fèi)的原理解析,在默認(rèn)的情況下消息發(fā)送會(huì)采取Round Robin輪詢方式把消息發(fā)送到不同的queue(分區(qū)隊(duì)列);而消費(fèi)消息的時(shí)候從多個(gè)queue上拉取消息,這種情況發(fā)送和消費(fèi)是不能保證順序。但是如果控制發(fā)送的順序消息只依次發(fā)送到同一個(gè)queue中,消費(fèi)的時(shí)候只從這個(gè)queue上依次拉取,則就保證了順序。當(dāng)發(fā)送和消費(fèi)參與的queue只有一個(gè),則是全局有序;如果多個(gè)queue參與,則為分區(qū)有序,即相對(duì)每個(gè)queue,消息都是有序的。
下面用訂單進(jìn)行分區(qū)有序的示例。一個(gè)訂單的順序流程是:下訂單、發(fā)短信通知、物流、簽收。訂單順序號(hào)相同的消息會(huì)被先后發(fā)送到同一個(gè)隊(duì)列中,消費(fèi)時(shí),同一個(gè)順序獲取到的肯定是同一個(gè)隊(duì)列。
13.1?場(chǎng)景分析
模擬一個(gè)訂單的發(fā)送流程,創(chuàng)建兩個(gè)訂單,發(fā)送的消息分別是
訂單號(hào)111 消息流程 下訂單->物流->簽收
訂單號(hào)112 消息流程 下訂單->物流->拒收
13.2?創(chuàng)建一個(gè)訂單對(duì)象
?
13.3?順序消息生產(chǎn)者
13.4?順序消息消費(fèi)者,測(cè)試時(shí)等一會(huì)即可有延遲
?
14.?RocketMQ發(fā)送批量消息
Rocketmq可以一次性發(fā)送一組消息,那么這一組消息會(huì)被當(dāng)做一個(gè)消息消費(fèi)
14.1?批量消息生產(chǎn)者
14.2?批量消息消費(fèi)者
15.?RocketMQ發(fā)送事務(wù)消息****
15.1?事務(wù)消息的發(fā)送流程
它可以被認(rèn)為是一個(gè)兩階段的提交消息實(shí)現(xiàn),以確保分布式系統(tǒng)的最終一致性。事務(wù)性消息確保本地事務(wù)的執(zhí)行和消息的發(fā)送可以原子地執(zhí)行。
?


?
上圖說(shuō)明了事務(wù)消息的大致方案,其中分為兩個(gè)流程:正常事務(wù)消息的發(fā)送及提交、事務(wù)消息的補(bǔ)償流程。
事務(wù)消息發(fā)送及提交
1.?發(fā)送消息(half消息)。
2.?服務(wù)端響應(yīng)消息寫入結(jié)果。
3.?根據(jù)發(fā)送結(jié)果執(zhí)行本地事務(wù)(如果寫入失敗,此時(shí)half消息對(duì)業(yè)務(wù)不可見,本地邏輯不執(zhí)行)。
4.?根據(jù)本地事務(wù)狀態(tài)執(zhí)行Commit或Rollback(Commit操作生成消息索引,消息對(duì)消費(fèi)者可見)
事務(wù)補(bǔ)償
1.?對(duì)沒有Commit/Rollback的事務(wù)消息(pending狀態(tài)的消息),從服務(wù)端發(fā)起一次“回查”
2.?Producer收到回查消息,檢查回查消息對(duì)應(yīng)的本地事務(wù)的狀態(tài)
3.?根據(jù)本地事務(wù)狀態(tài),重新Commit或者Rollback
其中,補(bǔ)償階段用于解決消息UNKNOW或者Rollback發(fā)生超時(shí)或者失敗的情況。
事務(wù)消息狀態(tài)
事務(wù)消息共有三種狀態(tài),提交狀態(tài)、回滾狀態(tài)、中間狀態(tài):
l?TransactionStatus.CommitTransaction: 提交事務(wù),它允許消費(fèi)者消費(fèi)此消息。
l?TransactionStatus.RollbackTransaction: 回滾事務(wù),它代表該消息將被刪除,不允許被消費(fèi)。
l?TransactionStatus.Unknown: 中間狀態(tài),它代表需要檢查消息隊(duì)列來(lái)確定狀態(tài)。
15.2?事務(wù)消息生產(chǎn)者
15.3?事務(wù)消息消費(fèi)者
15.4?測(cè)試結(jié)果
?
?

16.?RocketMQ發(fā)送帶標(biāo)簽的消息,消息過(guò)濾
Rocketmq提供消息過(guò)濾功能,通過(guò)tag或者key進(jìn)行區(qū)分
我們往一個(gè)主題里面發(fā)送消息的時(shí)候,根據(jù)業(yè)務(wù)邏輯,可能需要區(qū)分,比如帶有tagA標(biāo)簽的被A消費(fèi),帶有tagB標(biāo)簽的被B消費(fèi),還有在事務(wù)監(jiān)聽的類里面,只要是事務(wù)消息都要走同一個(gè)監(jiān)聽,我們也需要通過(guò)過(guò)濾才區(qū)別對(duì)待
16.1?標(biāo)簽消息生產(chǎn)者
?
16.2?標(biāo)簽消息消費(fèi)者
?
17.?RocketMQ中消息的Key(業(yè)務(wù)相關(guān))
在rocketmq中的消息,默認(rèn)會(huì)有一個(gè)messageId當(dāng)做消息的唯一標(biāo)識(shí),我們也可以給消息攜帶一個(gè)key,用作唯一標(biāo)識(shí)或者業(yè)務(wù)標(biāo)識(shí),包括在控制面板查詢的時(shí)候也可以使用messageId或者key來(lái)進(jìn)行查詢
?

17.1?帶key消息生產(chǎn)者
????17.2?帶key消息消費(fèi)者
?
?
?

18.?RocketMQ重試機(jī)制
18.1?生產(chǎn)者重試
// 失敗的情況重發(fā)3次
producer.setRetryTimesWhenSendFailed(3);
// 消息在1S內(nèi)沒有發(fā)送成功,就會(huì)重試
producer.send(msg, 1000);
18.2?消費(fèi)者重試
在消費(fèi)者放return ConsumeConcurrentlyStatus.RECONSUME_LATER;后就會(huì)執(zhí)行重試
上圖代碼中說(shuō)明了,我們?cè)賹?shí)際生產(chǎn)過(guò)程中,一般重試5-7次,如果還沒有消費(fèi)成功,則可以把消息簽收了,通知人工等處理
messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
19.?RocketMQ死信消息
當(dāng)消費(fèi)重試到達(dá)閾值以后,消息不會(huì)被投遞給消費(fèi)者了,而是進(jìn)入了死信隊(duì)列
19.1?消息生產(chǎn)者
19.2?消息消費(fèi)者
19.3?信消費(fèi)者
注意權(quán)限問題 perm 2讀 ??4寫 ??6讀寫
?

19.4?控制臺(tái)顯示
?



由于字?jǐn)?shù)限制本文只分享了一半哦
完整版獲取可私信小動(dòng)~
更多干貨我們下期再說(shuō)!
下期會(huì)分享
第二十章節(jié)
秒殺
相關(guān)知識(shí)~
下期見!
