rocketmq存儲
Ⅰ RocketMQ第五講
broker是RocketMQ的核心,核心工作就是接收生成這的消息,進行存儲。同時,收到消費者的請求後,從磁碟讀取內容,把結果返回給消費者。
消息主體以及元數據的存儲主體,存儲Procer端寫入的消息主體內容,消息內容不是定長的。單個文件大小默認1G ,文件名長度為20位,左邊補零,剩餘為起始偏移量,比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當第一個文件寫滿了,第二個文件為00000000001073741824,起始偏移量為1073741824,以此類推。消息主要是順序寫入日誌文件,當文件滿了,寫入下一個文件;
CommitLog文件中保存了消息的全量內容。不同的Topic的消息,在CommitLog都是順序存放的。就是來一個消息,不管Topic是什麼,直接追加的CommitLog中。
broker啟動了一個專門的線程來構建索引,把CommitLog中的消息,構建了兩種類型的索引。ConsumerQueue和Index。正常消費的時候,是根據Topic來消費,會用到ConsumerQueue索引。
也可根據返回的offsetMsgId,解析出ip,埠和CommitLog中的物理消息偏移量,直接去CommitLog中取數據。
引入的目的主要是提高消息消費的性能,由於RocketMQ是基於主題topic的訂閱模式,消息消費是針對主題進行的,如果要遍歷commitlog文件中根據topic檢索消息是非常低效的。Consumer即可根據ConsumeQueue來查找待消費的消息。
其中,ConsumeQueue(邏輯消費隊列)作為消費消息的索引,保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基於topic的commitlog索引文件,故consumequeue文件夾的組織方式如下:topic/queue/file三層組織結構,具體存儲路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同樣consumequeue文件採取定長設計,每一個條目共20個位元組,分別為8位元組的commitlog物理偏移量、4位元組的消息長度、8位元組tag hashcode,單個文件由30W個條目組成,可以像數組一樣隨機訪問每一個條目,每個ConsumeQueue文件大小約5.72M。
IndexFile(索引文件)提供了一種可以通過key或時間區間來查詢消息的方法。Index文件的存儲位置是: {fileName},文件名fileName是以創建時的時間戳命名的,固定的單個IndexFile文件大小約為400M,一個IndexFile可以保存 2000W個索引,IndexFile的底層存儲設計為在文件系統中實現HashMap結構,故rocketmq的索引文件其底層實現為hash索引。
按照Message Key查詢消息的時候,會用到這個索引文件。
IndexFile索引文件為用戶提供通過「按照Message Key查詢消息」的消息索引查詢服務,IndexFile文件的存儲位置是: {fileName},文件名fileName是以創建時的時間戳命名的,文件大小是固定的,等於40+500W 4+2000W 20= 420000040個位元組大小。如果消息的properties中設置了UNIQ_KEY這個屬性,就用 topic + 「#」 + UNIQ_KEY的value作為 key 來做寫入操作。如果消息設置了KEYS屬性(多個KEY以空格分隔),也會用 topic + 「#」 + KEY 來做索引。
其中的索引數據包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 這四個欄位,一共20 Byte。NextIndex offset 即前面讀出來的 slotValue,如果有 hash沖突,就可以用這個欄位將所有沖突的索引用鏈表的方式串起來了。Timestamp記錄的是消息storeTimestamp之間的差,並不是一個絕對的時間。整個Index File的結構如圖,40 Byte 的Header用於保存一些總的統計信息,4 500W的 Slot Table並不保存真正的索引數據,而是保存每個槽位對應的單向鏈表的頭。20 2000W 是真正的索引數據,即一個 Index File 可以保存 2000W個索引。
「按照Message Key查詢消息」的方式,RocketMQ的具體做法是,主要通過Broker端的QueryMessageProcessor業務處理器來查詢,讀取消息的過程就是用topic和key找到IndexFile索引文件中的一條記錄,根據其中的commitLog offset從CommitLog文件中讀取消息的實體內容。
RocketMQ中有兩個核心模塊,remoting模塊和store模塊。remoting模塊在NameServer,Proce,Consumer和Broker都用到。store只在Broker中用到,包含了存儲文件操作的API,對消息實體的操作是通過DefaultMessageStore進行操作。
屬性和方法很多,就不往這里放了。
文件存儲實現類,包括多個內部類
· 對於文件夾下的一個文件
上面介紹了broker的核心業務流程和架構,關鍵介面和類,啟動流程。最後介紹一下broker的線程模型,只有知道了線程模型,才能大概知道前面介紹的那些事如何協同工作的,對broker才能有一個立體的認識。
RocketMQ的RPC通信採用Netty組件作為底層通信庫,同樣也遵循了Reactor多線程模型,同時又在這之上做了一些擴展和優化。關於Reactor線程模型,可以看看我之前寫的這篇文檔: Reactor線程模型
上面的框圖中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多線程模型。一個 Reactor 主線程(eventLoopGroupBoss,即為上面的1)負責監聽 TCP網路連接請求,建立好連接,創建SocketChannel,並注冊到selector上。RocketMQ的源碼中會自動根據OS的類型選擇NIO和Epoll,也可以通過參數配置),然後監聽真正的網路數據。拿到網路數據後,再丟給Worker線程池(eventLoopGroupSelector,即為上面的「N」,源碼中默認設置為3),在真正執行業務邏輯之前需要進行SSL驗證、編解碼、空閑檢查、網路連接管理,這些工作交給defaultEventExecutorGroup(即為上面的「M1」,源碼中默認設置為8)去做。而處理業務操作放在業務線程池中執行,根據 RomotingCommand 的業務請求碼code去processorTable這個本地緩存變數中找到對應的 processor,然後封裝成task任務後,提交給對應的業務processor處理線程池來執行(sendMessageExecutor,以發送消息為例,即為上面的 「M2」)。
上面的圖和這段畫是從官方文檔抄過來的,但是文字和圖對應的不是很好,畫的也不夠詳細,但是主要流程是這個樣子。以後有時間了,我重新安裝自己的理解,畫一張更詳細的圖。
AsyncAppender-Worker-Thread-0:非同步列印日誌,logback使用,應該是守護線程
FileWatchService:
NettyEventExecutor:
NettyNIOBoss_:一個
NettyServerNIOSelector_:默認為三個
NSScheledThread:定時任務線程
ServerHouseKeepingService:守護線程
ThreadDeathWatch-2-1:守護線程,Netty用,已經廢棄
RemotingExecutorThread(1-8):工作線程池,沒有共用NettyServerNIOSelector_,直接初始化8個線程
AsyncAppender-Worker-Thread-0:非同步列印日誌,logback使用,共九個:
RocketmqBrokerAppender_inner
RocketmqFilterAppender_inner
RocketmqProtectionAppender_inner
RocketmqRemotingAppender_inner
RocketmqRebalanceLockAppender_inner
RocketmqStoreAppender_inner
RocketmqStoreErrorAppender_inner
RocketmqWaterMarkAppender_inner
RocketmqTransactionAppender_inner
SendMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_MESSAGE
PullMessageThread_:remotingServer.registerProcessor(RequestCode.PULL_MESSAGE
ProcessReplyMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE
QueryMessageThread_:remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE
AdminBrokerThread_:remotingServer.registerDefaultProcessor
ClientManageThread_:remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT
HeartbeatThread_:remotingServer.registerProcessor(RequestCode.HEART_BEAT
EndTransactionThread_:remotingServer.registerProcessor(RequestCode.END_TRANSACTION
ConsumerManageThread_:remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,RequestCode.UPDATE_CONSUMER_OFFSET,RequestCode.QUERY_CONSUMER_OFFSET
brokerOutApi_thread_:BrokerController.registerBrokerAll(true, false, true);
==================================================================
BrokerControllerScheledThread:=>
BrokerController.this.getBrokerStats().record();
BrokerController.this.consumerOffsetManager.persist();
BrokerController.this.consumerFilterManager.persist();
BrokerController.this.protectBroker();
BrokerController.this.printWaterMark();
log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
BrokerController.this.printMasterAndSlaveDiff();
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
:=>
:=>
FilterServerManager.this.createFilterServer();
:=>
ClientHousekeepingService.this.scanExceptionChannel();
PullRequestHoldService
FileWatchService
AllocateMappedFileService
AcceptSocketService
BrokerStatsThread1
Ⅱ RocketMQ 之 IoT 消息解析:物聯網需要什麼樣的消息技術
隨著技術趨勢的變遷,RocketMQ 5.0 作為關鍵角色,助力物聯網(IoT)的崛起。面對物聯網的快速發展和數據規模挑戰,消息中間件的需求也在升級。物聯網設備數量預計將達到200億,實時數據佔比90%,這推動了對消息技術的新需求,如低延遲處理和邊緣計算的結合。
物聯網消息技術的核心在於連接設備、處理數據流,並支持邊緣計算。RocketMQ 5.0 通過子產品RocketMQ - MQTT,採用MQTT協議,考慮到物聯網設備的弱網和低算力特性,提供輕量級的發布訂閱功能。其架構特點包括存算分離和端雲一體化,以降低存儲成本並支持大規模連接的實時推送。
物聯網場景對消息技術的訴求包括支持設備間和設備與雲端的通信,以及實時流處理。RocketMQ - MQTT通過讀放大為主的存儲模型,結合MQTT協議的復雜訂閱機制,實現高效的消息傳遞。同時,它引入端雲一體化存儲,允許服務端和物聯網設備共享消息,簡化集成,提高效率。
為了支持百萬級隊列,RocketMQ採用Rocksdb引擎,提供高性能的順序寫入和原子性操作,保證數據一致性。推送模型則採用推拉結合的方式,通過Proxy節點進行消息匹配和可靠觸達,兼顧性能和擴展性。
總的來說,RocketMQ 5.0的物聯網優化旨在提供一個高效、兼容和可擴展的解決方案,適應物聯網技術的快速發展和挑戰,幫助企業在數字化轉型中簡化架構,降低成本,提升用戶體驗。
Ⅲ RocketMQ架構分析
RocketMQ是阿里巴巴捐贈給appache的MQ開源組件,從架構上我們分析一下。
kafka是依靠Zookeeper進行集群選舉的,在rocketMQ的同樣位置上是NameServer,這個Nameserver僅僅是注冊服務,沒有選舉能力。每個broker都和NameServer進行連接,通過心跳維持狀態。
procer和consumer定時到Nameserver拉取broker信息,並且和自己所消費的broker建立連接。這就和微服務的體系一模一樣了。
那麼rocketMQ的集群選舉怎麼實現的呢,通過集成了Dledge實現,Dledge是個jar包,實現了raft演算法。
如圖,topic可在多個broker上形成分片,procer可寫數據到不通的分片,分片信息也可以由不同的group進行消費。
如下介紹存儲,rocketMQ可配置主備,形成主備復制。
http://rocketmq.apache.org/rocketmq/how-to-support-more-queues-in-rocketmq/ 介紹了rocketMQ存儲設計初衷,和kafka存儲不同,kafka在每個partition中存儲了數據,而RocketMQ將實際消息集中存儲,在messageQueue中存儲的是元數據信息,通過元數據信息可以索引到CommitLog。
對於保存的數據,每天會刪除數據;如果磁碟滿,超過設置閾值,則不允許寫入數據。
RocketMQ的設計確保了消息的並發處理能力,但是有時候,消息是有狀態的,即有順序,RocketMQ怎麼實現呢?
發送到臨時緩存,到達延遲時間後由delay service路由給topic。
如果消費返回了consumer_later,則如上述延遲消息一樣,會延遲一段時間,進入死信隊列,消費死信隊列,重新處理。
如果業務規模小,不會改源碼,就選用RabbitMQ;如果業務規模大,不允許丟消息,追求效率高,用RocketMQ;如果業務規模大,運行少量丟消息,吞吐量大,用Kafka;如果用於大數據,毫無疑問選kafka。