kafka存儲
A. 【kafka】kafka理論之partition & replication
基於分區和副本集的相關知識,初步了解Kafka的數據存儲、同步原理。
對於消息的生產以及消費邏輯不在本文的討論范疇,主要就Broker的數據存儲做以淺顯的總結。
首先解釋一下常見的相關專業術語:
Partition是作用於具體的Topic而言的,而不是一個獨立的概念。Partition能水平擴展客戶端的讀寫性能,是高吞吐量的 保障。
通俗的將,Partition就是一塊保存具體數據的空間,本質就是磁碟上存放數據的文件夾,所以Partition是不能跨Broker存在,也不能在同一個Broker上跨磁碟。
對於一個Topic,可以根據需要設定Partition的個數;Kafka默認的Partition個數num.partitions為1($KAFKA_HOME/config/server.properties),表示該Topic的所有數據均寫入至一個文件夾下;用戶也可以在新建Topic的時候通過顯示的指定--partitions <integer>參數實現自定義Partition個數。
在數據持久化時,每條消息都是根據一定的分區規則路由到對應的Partition中,並append在log文件的尾部(這一點類似於HDFS);在同一個Partition中消息是順序寫入的且始終保持有序性;但是不同Partition之間不能保證消息的有序性(高吞吐量的保障)。
Kafka也支持動態增加一個已存在Topic的Partition個數,但不支持動態減少Partition個數。因為被減少Partition所對應的數據處理是個難題; 由於Kafka的數據寫模式的限制,所以如果要把這些Partition的歷史數據集追加到有效的Partition的尾部,就會破壞了Kafka在Partition上消息的有序性,顯然是不合理的;但如果按照時間戳重新構分區的數據文件,可操作性和難度都將是非常大的,所以目前並不支持動態減少Partition個數。
Partition是用來存儲數據的,但並不是最小的數據存儲單元。Partition下還可以細分成Segment,每個Partition是由一個或多個Segment組成。每個Segment分別對應兩個文件: 一個是以.index結尾的索引文件,另一個是以.log結尾的數據文件,且兩個文件的文件名完全相同。 所有的Segment均存在於所屬Partition的目錄下。
Segment的必要性:如果以partition作為數據存儲的最小單元,那麼partition將會是一個很大的數據文件,且數據量是持續遞增的;當進行過期數據清理或消費指定offset數據時,操作如此的大文件將會是一個很嚴重的性能問題。
Replication是Kafka架構中一個比較重要的概念,是系統高可用的一種保障。
Replication邏輯上是作用於Topic的,但實際上是體現在每一個Partition上。
例如:有一個Topic,分區(partitions)數為3(分別為a, b, c),副本因子(replication-factor)數也為3;其本質就是該Topic一共有3個a分區,3個b分區,3個c分區。這樣的設計在某種意義上就很大程度的提高了系統的容錯率。
那麼問題來了:一個Topic下a分區一共有三個,既然是副本集,那這三個所包含的數據都完全一樣嗎?作用都一樣嗎?
說到這就不得不引出兩個概念:Leader Replica & Follower Replica
leader partition(主分區) & leader replica(主副本集)其實這兩個概念是一回事,因為副本集策略只是一種機制,是為了提高可用性而生的。
這種策略就是作用於partition上的,通俗的說增加副本集個數其實就是增加同一個partition的備份個數,同樣的對於主分區而言,就是同一個partition下所有備份中的主副本集。
同一個topic下的不同partition之間是沒有主次之分,都是同等重要且存儲不同數據的。
當新建一個topic,並指定partition個數後,會在log.dirs參數($KAFKA_HOME/config/server.properties)所指定的目錄下創建對應的分區目錄,用來存儲落到該分區上的數據。
分區目錄的命名格式為:topic名稱 + 短橫線 + 分區序號;序號默認從0開始,最大為分區數 - 1。
為了盡可能的提升服務的可用性和容錯率,Kafka遵循如下的分區分配原則:
如集群中有四個節點,均在統一機架上,新建一個topic:demoTopic,指定分區個數為4,副本因子為3,則對應的partition目錄分別為:demoTopic-0、demoTopic-1、demoTopic-2、demoTopic-3。
因為集群未跨機架,所以在這里主要驗證一下前兩條分區分配原則:四個主分區分別位於四個不同的broker上,且另外兩個replica也隨機分配到除leader所在節點以外的其他三個broker上。
每個Partition全局的第一個Segment文件名均是從0開始,後續每個Segment的文件名為上一個Segment文件中最後一條消息的offset值;數據的大小為64位,20位數字字元的長度,未用到的用0填充。同一個Segment的.index文件和.log文件的文件名完全相同。
這種命名格式的好處在於可以有效的規避單文件數據量過大導致的操作難問題,不僅如此,還可以方便、快速的定位數據。
例如:要實現從指定offset處開始讀取數據,只需要根據給定的offset值與對應Partition下的segment文件名所比對,就可以快速的定位目標數據所在的segment文件,然後根據目標segment的.index文件查找給定offset值所對應的實際磁碟偏移量,即可快速在.log中讀取目標數據。
在Kafka 0.10.1.0以後,對於每個Segment文件,在原有的.index和.log文件的基礎上,新增加一個.timeindex文件,通過該索引文件 可以實現基於時間戳操作消息的功能,具體實現詳見Kafka Timestamp。
Kafka中所說的Offset本質上是一個邏輯值,代表的是目標數據對應在Partition上的偏移量;而數據在磁碟上的實際偏移量是存儲在對應Segment的.index文件中。
通過簡單介紹replica之間的offset的變化和更新邏輯,來初步了解Kafka的數據同步機制。
首先引入幾個概念:Offset & Replica相關概念
清楚LEO、HW和ISR之間的相互關系是了解Kafka底層數據同步的關鍵:
Kafka取Partition所對應的ISR中最小的LEO作為整個Partition的HW;
每個Partition都會有自己獨立的HW,與此同時leader和follower都會負責維護和更新自己的HW。
對於leader新寫入的消息,Consumer不能立刻被發現並進行消費,leader會等待該消息被ISR中所有的replica同步更新HW後,此時leader才會更新該partition的HW為之前新寫入消息的offset,此時該消息對外才可見。
在分布式架構中,服務的可用性和數據的一致性是一個繞不開的話題,Kafka也不例外。
如上文所說:當leader接受到一條消息後,需要等待ISR中所有的replica都同步復制完成以後,該消息才能被消費。
如果在同步的過程中,ISR中如果有follower replica的同步落後延遲超過了閾值,則會被leader從ISR中剔除;只要ISR中所有的replica均同步成功,則該消息就一定不會丟失。
從數據的角度出發,這種方式很契合一致性的需求,但是當集群的節點數較多,ISR隊里的副本數變大時,每條消息的同步時長可能並不是所有業務場景所能容忍的,所以Kafka在Procer階段通過 request.required.acks 參數提供了不同類型的應答機制以方便用戶在系統吞吐量和一致性之間進行權衡:
假如一個Partition有兩個Replica,A(Leader)中包含的數據為a, b, c, d, e,LEO為5;B(Follower)包含的數據為a, b, c,LEO為3;此時該Partition的HW為3,Consumer可見的消息為a, b, c,系統對外表示正常;
當follower還未來得及同步消息d、e時,leader掛了,此時B變成Leader,並且Procer重新發了兩條消息f和g;因為此時系統中只有B一個存活,所以Partition對外的HW這會更新為5沒有問題,Consumer可見的內容為a, b, c, f, g;此時A被喚醒並作為Follower開始從Leader中拉取數據,因為follower自身的HW等於Leader的HW,所以B沒有拉去到任何數據,當Procer繼續發送消息時,就會導致副本A、B的數據集不一致。
這個問題在0.11.0.0中通過 leader epoch機制 來消除該問題,可以把epoch理解為代(版本)的概念,即每一次的leader對應一個唯一的epoch,如果leader更換,則對應的epoch值也會隨之更換,而過期的epoch請求則都會被忽略。
Kafka——broker宕機後無法消費問題
https://www.cnblogs.com/caoweixiong/p/12048276.html
B. Zookeeper 在 Kafka 中的作用
如上圖所示,kafaka集群的 broker,和 Consumer 都需要連接 Zookeeper。
Procer 直接連接 Broker。
Procer 把數據上傳到 Broker,Procer可以指定數據有幾個分區、幾個備份。上面的圖中,數據有兩個分區 0、1,每個分區都有自己的副本:0'、 1'。
黃色的分區為 leader,白色的為 follower。
leader 處理 partition 的所有讀寫請求,與此同時,follower會被動定期地去復制leader上的數據。 如下圖所示,紅色的為 leader,綠色的為 follower,leader復制自己到其他 Broker 中:
Topic 分區被放在不同的 Broker 中,保證 Procer 和 Consumer 錯開訪問 Broker,避免訪問單個 Broker造成過度的IO壓力,使得負載均衡。
Broker是分布式部署並且相互之間相互獨立,但是需要有一個注冊系統能夠將整個集群中的Broker管理起來 ,此時就使用到了Zookeeper。在Zookeeper上會有一個專門 用來進行Broker伺服器列表記錄 的節點:
/brokers/ids
每個Broker在啟動時,都會到Zookeeper上進行注冊,即到/brokers/ids下創建屬於自己的節點,如/brokers/ids/[0...N]。
Kafka使用了全局唯一的數字來指代每個Broker伺服器,不同的Broker必須使用不同的Broker ID進行注冊,創建完節點後, 每個Broker就會將自己的IP地址和埠信息記錄 到該節點中去。其中,Broker創建的節點類型是臨時節點,一旦Broker宕機,則對應的臨時節點也會被自動刪除。
在Kafka中,同一個 Topic的消息會被分成多個分區 並將其分布在多個Broker上, 這些分區信息及與Broker的對應關系 也都是由Zookeeper在維護,由專門的節點來記錄,如:
/borkers/topics
Kafka中每個Topic都會以/brokers/topics/[topic]的形式被記錄,如/brokers/topics/login和/brokers/topics/search等。Broker伺服器啟動後,會到對應Topic節點(/brokers/topics)上注冊自己的Broker ID並寫入針對該Topic的分區總數,如/brokers/topics/login/3->2,這個節點表示Broker ID為3的一個Broker伺服器,對於"login"這個Topic的消息,提供了2個分區進行消息存儲,同樣,這個分區節點也是臨時節點。
由於同一個Topic消息會被分區並將其分布在多個Broker上,因此, 生產者需要將消息合理地發送到這些分布式的Broker上 ,那麼如何實現生產者的負載均衡,Kafka支持傳統的四層負載均衡,也支持Zookeeper方式實現負載均衡。
(1) 四層負載均衡,根據生產者的IP地址和埠來為其確定一個相關聯的Broker。通常,一個生產者只會對應單個Broker,然後該生產者產生的消息都發往該Broker。這種方式邏輯簡單,每個生產者不需要同其他系統建立額外的TCP連接,只需要和Broker維護單個TCP連接即可。但是,其無法做到真正的負載均衡,因為實際系統中的每個生產者產生的消息量及每個Broker的消息存儲量都是不一樣的,如果有些生產者產生的消息遠多於其他生產者的話,那麼會導致不同的Broker接收到的消息總數差異巨大,同時,生產者也無法實時感知到Broker的新增和刪除。
(2) 使用Zookeeper進行負載均衡,由於每個Broker啟動時,都會完成Broker注冊過程,生產者會通過該節點的變化來動態地感知到Broker伺服器列表的變更,這樣就可以實現動態的負載均衡機制。
與生產者類似,Kafka中的消費者同樣需要進行負載均衡來實現多個消費者合理地從對應的Broker伺服器上接收消息,每個消費者分組包含若干消費者, 每條消息都只會發送給分組中的一個消費者 ,不同的消費者分組消費自己特定的Topic下面的消息,互不幹擾。
消費組 (Consumer Group):
consumer group 下有多個 Consumer(消費者)。
對於每個消費者組 (Consumer Group),Kafka都會為其分配一個全局唯一的Group ID,Group 內部的所有消費者共享該 ID。訂閱的topic下的每個分區只能分配給某個 group 下的一個consumer(當然該分區還可以被分配給其他group)。
同時,Kafka為每個消費者分配一個Consumer ID,通常採用"Hostname:UUID"形式表示。
在Kafka中,規定了 每個消息分區 只能被同組的一個消費者進行消費 ,因此,需要在 Zookeeper 上記錄 消息分區 與 Consumer 之間的關系,每個消費者一旦確定了對一個消息分區的消費權力,需要將其Consumer ID 寫入到 Zookeeper 對應消息分區的臨時節點上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
其中,[broker_id-partition_id]就是一個 消息分區 的標識,節點內容就是該 消息分區 上 消費者的Consumer ID。
在消費者對指定消息分區進行消息消費的過程中, 需要定時地將分區消息的消費進度Offset記錄到Zookeeper上 ,以便在該消費者進行重啟或者其他消費者重新接管該消息分區的消息消費後,能夠從之前的進度開始繼續進行消息消費。Offset在Zookeeper中由一個專門節點進行記錄,其節點路徑為:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
節點內容就是Offset的值。
消費者伺服器在初始化啟動時加入消費者分組的步驟如下
注冊到消費者分組。每個消費者伺服器啟動時,都會到Zookeeper的指定節點下創建一個屬於自己的消費者節點,例如/consumers/[group_id]/ids/[consumer_id],完成節點創建後,消費者就會將自己訂閱的Topic信息寫入該臨時節點。
對 消費者分組 中的 消費者 的變化注冊監聽 。每個 消費者 都需要關注所屬 消費者分組 中其他消費者伺服器的變化情況,即對/consumers/[group_id]/ids節點注冊子節點變化的Watcher監聽,一旦發現消費者新增或減少,就觸發消費者的負載均衡。
對Broker伺服器變化注冊監聽 。消費者需要對/broker/ids/[0-N]中的節點進行監聽,如果發現Broker伺服器列表發生變化,那麼就根據具體情況來決定是否需要進行消費者負載均衡。
進行消費者負載均衡 。為了讓同一個Topic下不同分區的消息盡量均衡地被多個 消費者 消費而進行 消費者 與 消息 分區分配的過程,通常,對於一個消費者分組,如果組內的消費者伺服器發生變更或Broker伺服器發生變更,會發出消費者負載均衡。
以下是kafka在zookeep中的詳細存儲結構圖:
早期版本的 kafka 用 zk 做 meta 信息存儲,consumer 的消費狀態,group 的管理以及 offse t的值。考慮到zk本身的一些因素以及整個架構較大概率存在單點問題,新版本中確實逐漸弱化了zookeeper的作用。新的consumer使用了kafka內部的group coordination協議,也減少了對zookeeper的依賴
C. kafka的原理是什麼
在Kafka中的每一條消息都有一個topic。一般來說在我們應用中產生不同類型的數據,都可以設置不同的主題。一個主題一般會有多個消息的訂閱者,當生產者發布消息到某個主題時,訂閱了這個主題的消費者都可以接收到生產者寫入的新消息。
kafka為每個主題維護了分布式的分區(partition)日誌文件,每個partition在kafka存儲層面是append log。
任何發布到此partition的消息都會被追加到log文件的尾部,在分區中的每條消息都會按照時間順序分配到一個單調遞增的順序編號,也就是我們的offset,offset是一個long型的數字,通過這個offset可以確定一條在該partition下的唯一消息。在partition下面是保證了有序性,但是在topic下面沒有保證有序性。
(3)kafka存儲擴展閱讀
procer選擇一個topic,生產消息,消息會通過分配策略append到某個partition末尾。
consumer選擇一個topic,通過id指定從哪個位置開始消費消息。消費完成之後保留id,下次可以從這個位置開始繼續消費,也可以從其他任意位置開始消費。
保證了消息不變性,為並發消費提供了線程安全的保證。每個 consumer都保留自己的offset,互相之間不幹擾,不存在線程安全問題。
消息訪問的並行高效性。每個topic中的消息被組織成多個partition,partition均勻分配到集群server中。生產、消費消息的時候,會被路由到指定partition,減少競爭,增加了程序的並行能力。
D. 怎麼設置kafka topic數據存儲時間
1、Kafka創建topic命令很簡單,一條命令足矣:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test 。
E. kafka是幹嘛的
Kafka是由Apache軟體基金會開發的一個開源流處理平台,由Scala和Java編寫。Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者在網站中的所有動作流數據。
這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網路上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。
對於像Hadoop一樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行載入機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。
主要特性
Kafka是一種高吞吐量 的分布式發布訂閱消息系統,有如下特性:
通過O(1)的磁碟數據結構提供消息的持久化,這種結構對於即使數以TB的消息存儲也能夠保持長時間的穩定性能。
高吞吐量:即使是非常普通的硬體Kafka也可以支持每秒數百萬的消息。
支持通過Kafka伺服器和消費機集群來分區消息。
支持Hadoop並行數據載入。
Kafka通過官網發布了最新版本3.0.0。
以上內容來自 網路-kafka
F. 消息隊列原理及選型
消息隊列(Message Queue)是一種進程間通信或同一進程的不同線程間的通信方式。
Broker(消息伺服器)
Broker的概念來自與Apache ActiveMQ,通俗的講就是MQ的伺服器。
Procer(生產者)
業務的發起方,負責生產消息傳輸給broker
Consumer(消費者)
業務的處理方,負責從broker獲取消息並進行業務邏輯處理
Topic(主題)
發布訂閱模式下的消息統一匯集地,不同生產者向topic發送消息,由MQ伺服器分發到不同的訂閱 者,實現消息的廣播
Queue(隊列)
PTP模式下,特定生產者向特定queue發送消息,消費者訂閱特定的queue完成指定消息的接收。
Message(消息體)
根據不同通信協議定義的固定格式進行編碼的數據包,來封裝業務數據,實現消息的傳輸
點對點模型用於消息生產者和消息消費者之間點到點的通信。
點對點模式包含三個角色:
每個消息都被發送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留著消息,可以放在內存 中也可以持久化,直到他們被消費或超時。
特點:
發布訂閱模型包含三個角色:
多個發布者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。
特點:
AMQP即Advanced Message Queuing Protocol,是應用層協議的一個開放標准,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。AMQP 的主要特徵是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
優點:可靠、通用
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是IBM開發的一個即時通訊協議,有可能成為物聯網的重要組成部分。該協議支持所有平台,幾乎可以把所有聯網物品和外部連接起來,被用來當做感測器和致動器(比如通過Twitter讓房屋聯網)的通信協議。
優點:格式簡潔、佔用帶寬小、移動端通信、PUSH、嵌入式系統
STOMP(Streaming Text Orientated Message Protocol)是流文本定向消息協議,是一種為MOM(Message Oriented Middleware,面向消息的中間件)設計的簡單文本協議。STOMP提供一個可互操作的連接格式,允許客戶端與任意STOMP消息代理(Broker)進行交互。
優點:命令模式(非topicqueue模式)
XMPP(可擴展消息處理現場協議,Extensible Messaging and Presence Protocol)是基於可擴展標記語言(XML)的協議,多用於即時消息(IM)以及在線現場探測。適用於伺服器之間的准即時操作。核心是基於XML流傳輸,這個協議可能最終允許網際網路用戶向網際網路上的其他任何人發送即時消息,即使其操作系統和瀏覽器不同。
優點:通用公開、兼容性強、可擴展、安全性高,但XML編碼格式佔用帶寬大
RabbitMQ 是實現 AMQP(高級消息隊列協議)的消息中間件的一種,最初起源於金融系統,用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。 RabbitMQ 主要是為了實現系統之間的雙向解耦而實現的。當生產者大量產生數據時,消費者無法快速消費,那麼需要一個中間層。保存這個數據。
RabbitMQ 是一個開源的 AMQP 實現,伺服器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
Channel(通道)
道是兩個管理器之間的一種單向點對點的的通信連接,如果需要雙向交流,可以建立一對通道。
Exchange(消息交換機)
Exchange類似於數據通信網路中的交換機,提供消息路由策略。
RabbitMq中,procer不是通過信道直接將消息發送給queue,而是先發送給Exchange。一個Exchange可以和多個Queue進行綁定,procer在傳遞消息的時候,會傳遞一個ROUTING_KEY,Exchange會根據這個ROUTING_KEY按照特定的路由演算法,將消息路由給指定的queue。和Queue一樣,Exchange也可設置為持久化,臨時或者自動刪除。
Exchange有4種類型:direct(默認),fanout, topic, 和headers。
不同類型的Exchange轉發消息的策略有所區別:
Binding(綁定)
所謂綁定就是將一個特定的 Exchange 和一個特定的 Queue 綁定起來。Exchange 和Queue的綁定可以是多對多的關系。
Routing Key(路由關鍵字)
exchange根據這個關鍵字進行消息投遞。
vhost(虛擬主機)
在RabbitMq server上可以創建多個虛擬的message broker,又叫做virtual hosts (vhosts)。每一個vhost本質上是一個mini-rabbitmq server,分別管理各自的exchange,和bindings。vhost相當於物理的server,可以為不同app提供邊界隔離,使得應用安全的運行在不同的vhost實例上,相互之間不會干擾。procer和consumer連接rabbit server需要指定一個vhost。
假設P1和C1注冊了相同的Broker,Exchange和Queue。P1發送的消息最終會被C1消費。
基本的通信流程大概如下所示:
Consumer收到消息時需要顯式的向rabbit broker發送basic。ack消息或者consumer訂閱消息時設置auto_ack參數為true。
在通信過程中,隊列對ACK的處理有以下幾種情況:
即消息的Ackownledge確認機制,為了保證消息不丟失,消息隊列提供了消息Acknowledge機制,即ACK機制,當Consumer確認消息已經被消費處理,發送一個ACK給消息隊列,此時消息隊列便可以刪除這個消息了。如果Consumer宕機/關閉,沒有發送ACK,消息隊列將認為這個消息沒有被處理,會將這個消息重新發送給其他的Consumer重新消費處理。
消息的收發處理支持事務,例如:在任務中心場景中,一次處理可能涉及多個消息的接收、處理,這應該處於同一個事務范圍內,如果一個消息處理失敗,事務回滾,消息重新回到隊列中。
消息的持久化,對於一些關鍵的核心業務來說是非常重要的,啟用消息持久化後,消息隊列宕機重啟後,消息可以從持久化存儲恢復,消息不丟失,可以繼續消費處理。
fanout 模式
模式特點:
direct 模式
任何發送到Direct Exchange的消息都會被轉發到routing_key中指定的Queue。
如果一個exchange 聲明為direct,並且bind中指定了routing_key,那麼發送消息時需要同時指明該exchange和routing_key。
簡而言之就是:生產者生成消息發送給Exchange, Exchange根據Exchange類型和basic_publish中的routing_key進行消息發送 消費者:訂閱Exchange並根據Exchange類型和binding key(bindings 中的routing key) ,如果生產者和訂閱者的routing_key相同,Exchange就會路由到那個隊列。
topic 模式
前面講到direct類型的Exchange路由規則是完全匹配binding key與routing key,但這種嚴格的匹配方式在很多情況下不能滿足實際業務需求。
topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage相似,也是將消息路由到binding key與routing key相匹配的Queue中,但這里的匹配規則有些不同。
它約定:
以上圖中的配置為例,routingKey=」quick.orange.rabbit」的消息會同時路由到Q1與Q2,routingKey=」lazy.orange.fox」的消息會路由到Q1,routingKey=」lazy.brown.fox」的消息會路由到Q2,routingKey=」lazy.pink.rabbit」的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=」quick.brown.fox」、routingKey=」orange」、routingKey=」quick.orange.male.rabbit」的消息將會被丟棄,因為它們沒有匹配任何bindingKey。
RabbitMQ,部署分三種模式:單機模式,普通集群模式,鏡像集群模式。
普通集群模式
多台機器部署,每個機器放一個rabbitmq實例,但是創建的queue只會放在一個rabbitmq實例上,每個實例同步queue的元數據。
如果消費時連的是其他實例,那個實例會從queue所在實例拉取數據。這就會導致拉取數據的開銷,如果那個放queue的實例宕機了,那麼其他實例就無法從那個實例拉取,即便開啟了消息持久化,讓rabbitmq落地存儲消息的話,消息不一定會丟,但得等這個實例恢復了,然後才可以繼續從這個queue拉取數據, 這就沒什麼高可用可言,主要是提供吞吐量 ,讓集群中多個節點來服務某個queue的讀寫操作。
鏡像集群模式
queue的元數據和消息都會存放在多個實例,每次寫消息就自動同步到多個queue實例里。這樣任何一個機器宕機,其他機器都可以頂上,但是性能開銷太大,消息同步導致網路帶寬壓力和消耗很重,另外,沒有擴展性可言,如果queue負載很重,加機器,新增的機器也包含了這個queue的所有數據,並沒有辦法線性擴展你的queue。此時,需要開啟鏡像集群模式,在rabbitmq管理控制台新增一個策略,將數據同步到指定數量的節點,然後你再次創建queue的時候,應用這個策略,就會自動將數據同步到其他的節點上去了。
Kafka 是 Apache 的子項目,是一個高性能跨語言的分布式發布/訂閱消息隊列系統(沒有嚴格實現 JMS 規范的點對點模型,但可以實現其效果),在企業開發中有廣泛的應用。高性能是其最大優勢,劣勢是消息的可靠性(丟失或重復),這個劣勢是為了換取高性能,開發者可以以稍降低性能,來換取消息的可靠性。
一個Topic可以認為是一類消息,每個topic將被分成多個partition(區),每個partition在存儲層面是append log文件。任何發布到此partition的消息都會被直接追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為一個long型數字,它是唯一標記一條消息。它唯一的標記一條消息。kafka並沒有提供其他額外的索引機制來存儲offset,因為在kafka中幾乎不允許對消息進行「隨機讀寫」。
Kafka和JMS(Java Message Service)實現(activeMQ)不同的是:即使消息被消費,消息仍然不會被立即刪除。日誌文件將會根據broker中的配置要求,保留一定的時間之後刪除;比如log文件保留2天,那麼兩天後,文件會被清除,無論其中的消息是否被消費。kafka通過這種簡單的手段,來釋放磁碟空間,以及減少消息消費之後對文件內容改動的磁碟IO開支。
對於consumer而言,它需要保存消費消息的offset,對於offset的保存和使用,有consumer來控制;當consumer正常消費消息時,offset將會"線性"的向前驅動,即消息將依次順序被消費。事實上consumer可以使用任意順序消費消息,它只需要將offset重置為任意值。(offset將會保存在zookeeper中,參見下文)
kafka集群幾乎不需要維護任何consumer和procer狀態信息,這些信息有zookeeper保存;因此procer和consumer的客戶端實現非常輕量級,它們可以隨意離開,而不會對集群造成額外的影響。
partitions的設計目的有多個。最根本原因是kafka基於文件存儲。通過分區,可以將日誌內容分散到多個server上,來避免文件尺寸達到單機磁碟的上限,每個partiton都會被當前server(kafka實例)保存;可以將一個topic切分多任意多個partitions,來消息保存/消費的效率。此外越多的partitions意味著可以容納更多的consumer,有效提升並發消費的能力。(具體原理參見下文)。
一個Topic的多個partitions,被分布在kafka集群中的多個server上;每個server(kafka實例)負責partitions中消息的讀寫操作;此外kafka還可以配置partitions需要備份的個數(replicas),每個partition將會被備份到多台機器上,以提高可用性。
基於replicated方案,那麼就意味著需要對多個備份進行調度;每個partition都有一個server為"leader";leader負責所有的讀寫操作,如果leader失效,那麼將會有其他follower來接管(成為新的leader);follower只是單調的和leader跟進,同步消息即可。由此可見作為leader的server承載了全部的請求壓力,因此從集群的整體考慮,有多少個partitions就意味著有多少個"leader",kafka會將"leader"均衡的分散在每個實例上,來確保整體的性能穩定。
Procers
Procer將消息發布到指定的Topic中,同時Procer也能決定將此消息歸屬於哪個partition;比如基於"round-robin"方式或者通過其他的一些演算法等。
Consumers
本質上kafka只支持Topic。每個consumer屬於一個consumer group;反過來說,每個group中可以有多個consumer。發送到Topic的消息,只會被訂閱此Topic的每個group中的一個consumer消費。
如果所有的consumer都具有相同的group,這種情況和queue模式很像;消息將會在consumers之間負載均衡。
如果所有的consumer都具有不同的group,那這就是"發布-訂閱";消息將會廣播給所有的消費者。
在kafka中,一個partition中的消息只會被group中的一個consumer消費;每個group中consumer消息消費互相獨立;我們可以認為一個group是一個"訂閱"者,一個Topic中的每個partions,只會被一個"訂閱者"中的一個consumer消費,不過一個consumer可以消費多個partitions中的消息。kafka只能保證一個partition中的消息被某個consumer消費時,消息是順序的。事實上,從Topic角度來說,消息仍不是有序的。
Kafka的設計原理決定,對於一個topic,同一個group中不能有多於partitions個數的consumer同時消費,否則將意味著某些consumer將無法得到消息。
Guarantees
Kafka就比較適合高吞吐量並且允許少量數據丟失的場景,如果非要保證「消息可靠傳輸」,可以使用JMS。
Kafka Procer 消息發送有兩種方式(配置參數 procer.type):
對於同步方式(procer.type=sync)?Kafka Procer 消息發送有三種確認方式(配置參數 acks):
kafka的設計初衷是希望作為一個統一的信息收集平台,能夠實時的收集反饋信息,並需要能夠支撐較大的數據量,且具備良好的容錯能力。
持久性
kafka使用文件存儲消息,這就直接決定kafka在性能上嚴重依賴文件系統的本身特性。且無論任何OS下,對文件系統本身的優化幾乎沒有可能。文件緩存/直接內存映射等是常用的手段。因為kafka是對日誌文件進行append操作,因此磁碟檢索的開支是較小的;同時為了減少磁碟寫入的次數,broker會將消息暫時buffer起來,當消息的個數(或尺寸)達到一定閥值時,再flush到磁碟,這樣減少了磁碟IO調用的次數。
性能
需要考慮的影響性能點很多,除磁碟IO之外,我們還需要考慮網路IO,這直接關繫到kafka的吞吐量問題。kafka並沒有提供太多高超的技巧;對於procer端,可以將消息buffer起來,當消息的條數達到一定閥值時,批量發送給broker;對於consumer端也是一樣,批量fetch多條消息。不過消息量的大小可以通過配置文件來指定。對於kafka broker端,似乎有個sendfile系統調用可以潛在的提升網路IO的性能:將文件的數據映射到系統內存中,socket直接讀取相應的內存區域即可,而無需進程再次和交換。 其實對於procer/consumer/broker三者而言,CPU的開支應該都不大,因此啟用消息壓縮機制是一個良好的策略;壓縮需要消耗少量的CPU資源,不過對於kafka而言,網路IO更應該需要考慮。可以將任何在網路上傳輸的消息都經過壓縮。kafka支持gzip/snappy等多種壓縮方式。
生產者
負載均衡: procer將會和Topic下所有partition leader保持socket連接;消息由procer直接通過socket發送到broker,中間不會經過任何「路由層「。事實上,消息被路由到哪個partition上,有procer客戶端決定。比如可以採用「random「「key-hash「「輪詢「等,如果一個topic中有多個partitions,那麼在procer端實現「消息均衡分發「是必要的。
其中partition leader的位置(host:port)注冊在zookeeper中,procer作為zookeeper client,已經注冊了watch用來監聽partition leader的變更事件。
非同步發送:將多條消息暫且在客戶端buffer起來,並將他們批量的發送到broker,小數據IO太多,會拖慢整體的網路延遲,批量延遲發送事實上提升了網路效率。不過這也有一定的隱患,比如說當procer失效時,那些尚未發送的消息將會丟失。
消費者
consumer端向broker發送「fetch」請求,並告知其獲取消息的offset;此後consumer將會獲得一定條數的消息;consumer端也可以重置offset來重新消費消息。
在JMS實現中,Topic模型基於push方式,即broker將消息推送給consumer端。不過在kafka中,採用了pull方式,即consumer在和broker建立連接之後,主動去pull(或者說fetch)消息;這中模式有些優點,首先consumer端可以根據自己的消費能力適時的去fetch消息並處理,且可以控制消息消費的進度(offset);此外,消費者可以良好的控制消息消費的數量,batch fetch。
其他JMS實現,消息消費的位置是有prodiver保留,以便避免重復發送消息或者將沒有消費成功的消息重發等,同時還要控制消息的狀態。這就要求JMS broker需要太多額外的工作。在kafka中,partition中的消息只有一個consumer在消費,且不存在消息狀態的控制,也沒有復雜的消息確認機制,可見kafka broker端是相當輕量級的。當消息被consumer接收之後,consumer可以在本地保存最後消息的offset,並間歇性的向zookeeper注冊offset。由此可見,consumer客戶端也很輕量級。
對於JMS實現,消息傳輸擔保非常直接:有且只有一次(exactly once)。
在kafka中稍有不同:
at most once: 消費者fetch消息,然後保存offset,然後處理消息;當client保存offset之後,但是在消息處理過程中出現了異常,導致部分消息未能繼續處理。那麼此後"未處理"的消息將不能被fetch到,這就是"at most once"。
at least once: 消費者fetch消息,然後處理消息,然後保存offset。如果消息處理成功之後,但是在保存offset階段zookeeper異常導致保存操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的消息,這就是"at least once",原因offset沒有及時的提交給zookeeper,zookeeper恢復正常還是之前offset狀態。
exactly once: kafka中並沒有嚴格的去實現(基於2階段提交,事務),我們認為這種策略在kafka中是沒有必要的。
通常情況下「at-least-once」是我們首選。(相比at most once而言,重復接收數據總比丟失數據要好)。
kafka高可用由多個broker組成,每個broker是一個節點;
創建一個topic,這個topic會劃分為多個partition,每個partition存在於不同的broker上,每個partition就放一部分數據。
kafka是一個分布式消息隊列,就是說一個topic的數據,是分散放在不同的機器上,每個機器就放一部分數據。
在0.8版本以前,是沒有HA機制的,就是任何一個broker宕機了,那個broker上的partition就廢了,沒法寫也沒法讀,沒有什麼高可用性可言。
0.8版本以後,才提供了HA機制,也就是就是replica副本機制。每個partition的數據都會同步到其他的機器上,形成自己的多個replica副本。然後所有replica會選舉一個leader出來,那麼生產和消費都跟這個leader打交道,然後其他replica就是follower。
寫的時候,leader會負責把數據同步到所有follower上去,讀的時候就直接讀leader上數據即可。
kafka會均勻的將一個partition的所有replica分布在不同的機器上,從而提高容錯性。
如果某個broker宕機了也沒事,它上面的partition在其他機器上都有副本的,如果這上面有某個partition的leader,那麼此時會重新選舉一個新的leader出來,大家繼續讀寫那個新的leader即可。這就有所謂的高可用性了。
寫數據的時候,生產者就寫leader,然後leader將數據落地寫本地磁碟,接著其他follower自己主動從leader來pull數據。一旦所有follower同步好數據了,就會發送ack給leader,leader收到所有follower的ack之後,就會返回寫成功的消息給生產者。
消息丟失會出現在三個環節,分別是生產者、mq中間件、消費者:
RabbitMQ
Kafka
大體和RabbitMQ相同。
Rabbitmq
需要保證順序的消息投遞到同一個queue中,這個queue只能有一個consumer,如果需要提升性能,可以用內存隊列做排隊,然後分發給底層不同的worker來處理。
Kafka
寫入一個partition中的數據一定是有序的。生產者在寫的時候 ,可以指定一個key,比如指定訂單id作為key,這個訂單相關數據一定會被分發到一個partition中去。消費者從partition中取出數據的時候也一定是有序的,把每個數據放入對應的一個內存隊列,一個partition中有幾條相關數據就用幾個內存隊列,消費者開啟多個線程,每個線程處理一個內存隊列。