kafka源码
A. 如何删除kafka积压数据
Kafka删除数据有两种方式
按照时间,超过一段时间后删除过期消息
按照消息大小,消息数量超过一定大小后删除最旧的数据
- defcleanupLogs() {debug("Beginning log cleanup...")vartotal=0valstartMs=time.millisecondsfor(log <- allLogs;if!log.config.compact) {debug("Garbage collecting '"+ log.name +"'")total +=cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)}debug("Log cleanup completed. "+ total +" files deleted in "+(time.milliseconds - startMs) /1000+" seconds")}
- (log:Log):Int={valstartMs=time.millisecondslog.deleteOldSegments(startMs -_.lastModified > log.config.retentionMs)}
- <br>(log:Log):Int={if(log.config.retentionSize <0|| log.size < log.config.retentionSize)return0vardiff=log.size - log.config.retentionSizedefshouldDelete(segment:LogSegment)={if(diff - segment.size >=0) {diff -=segment.sizetrue}else{false}}log.deleteOldSegments(shouldDelete)}
Kafka删除数据的最小单位:segment
Kafka删除数据主逻辑:kafka源码
Kafka一段时间(配置文件设置)调用一次 cleanupLogs,删除所有应该删除的日志数据。
cleanupExpiredSegments 负责清理超时的数据
cleanupSegmentsToMaintainSize 负责清理超过大小的数据
B. 《ApacheKafka源码剖析》pdf下载在线阅读,求百度网盘云资源
《Apache Kafka源码剖析》(徐郡明)电子书网盘下载免费在线阅读
资源链接:
链接:
书名:Apache Kafka源码剖析
作者:徐郡明
豆瓣评分:8.4
出版社:电子工业出版社
出版年份:2017-5
页数:604
内容简介:
《Apache Kafka源码剖析》以Kafka 0.10.0版本源码为基础,针对Kafka的架构设计到实现细节进行详细阐述。《Apache Kafka源码剖析》共5章,从Kafka的应用场景、源码环境搭建开始逐步深入,不仅介绍Kafka的核心概念,而且对Kafka生产者、消费者、服务端的源码进行深入的剖析,最后介绍Kafka常用的管理脚本实现,让读者不仅从宏观设计上了解Kafka,而且能够深入到Kafka的细节设计之中。在源码分析的过程中,还穿插了笔者工作积累的经验和对Kafka设计的理解,希望读者可以举一反三,不仅知其然,而且知其所以然。
《Apache Kafka源码剖析》旨在为读者阅读Kafka源码提供帮助和指导,让读者更加深入地了解Kafka的运行原理、设计理念,让读者在设计分布式系统时可以参考Kafka的优秀设计。《Apache Kafka源码剖析》的内容对于读者全面提升自己的技术能力有很大帮助。
C. 如何在windows下查看kafka源码
Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.(Kafka是一个分布式的、可分区的(partitioned)、基于备份的(replicated)和commit-log存储的服务.。它提供了类似于messaging system的特性,但是在设计实现上完全不同)。kafka是一种高吞吐量的分布式发布订阅消息系统,它有如下特性:(1)、通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
(2)、高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
(3)、支持通过kafka服务器和消费机集群来分区消息。
(4)、支持Hadoop并行数据加载。
一、用Kafka里面自带的脚本进行编译
下载好了Kafka源码,里面自带了一个gradlew的脚本,我们可以利用这个编译Kafka源码:
D. 如何确定Kafka的分区数,key和consumer线程数
分区实际上是调优Kafka并行度的最小单元。 对于procer而言,它实际上是用多个线程并发地向不同分区所在的broker发起Socket连接同时给这些分区发送消息;
而consumer呢,同一个消费组内的所有consumer线程都被指定topic的某一个分区进行消费(具体如何确定consumer线程数目我们后面会详细说明)。 所以说,如果一个topic分区越多,理论上整个集群所能达到的吞吐量就越大。
E. apache kafka何时开源
良好的扩展性和性能优势.不过到目前为止,我们应该很清楚认识到,kafka并没有提供JMS中的"事务性""消息传输担保(消息确认机制)""消息分组"
F. kafka 有java的源码吗
我这里是使用的是,kafka自带的zookeeper。
以及关于kafka的日志文件啊,都放在默认里即/tmp下,我没修改。保存默认的
1、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ jps
2625 Jps
2、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ bin/zookeeper-server-start.sh config/zookeeper.properties
此刻,这时,会一直停在这,因为是前端运行。
另开一窗口,
3、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server.properties
也是前端运行。
G. kafka 生产者回调函数 为什么没有异常
在Kafak中国社区的群中,这个问题被提及的比例是相当高的,这也是Kafka用户最常碰到的问题之一。本文结合Kafka源码试图对该问题相关的因素进行探讨。希望对大家有所帮助。
怎么确定分区数?
“我应该选择几个分区?”——如果你在Kafka中国社区的群里,这样的问题你会经常碰到的。不过有些遗憾的是,我们似乎并没有很权威的答案能够解答这样的问题。其实这也不奇怪,毕竟这样的问题通常都是没有固定答案的。Kafka上标榜自己是"high-throughput distributed messaging system",即一个高吞吐量的分布式消息引擎。那么怎么达到高吞吐量呢?Kafka在底层摒弃了Java堆缓存机制,采用了操作系统级别的页缓存,同时将随机写操作改为顺序写,再结合Zero-Copy的特性极大地改善了IO性能。但是,这只是一个方面,毕竟单机优化的能力是有上限的。如何通过水平扩展甚至是线性扩展来进一步提升吞吐量呢? Kafka就是使用了分区(partition),通过将topic的消息打散到多个分区并分布保存在不同的broker上实现了消息处理(不管是procer还是consumer)的高吞吐量。
Kafka的生产者和消费者都可以多线程地并行操作,而每个线程处理的是一个分区的数据。因此分区实际上是调优Kafka并行度的最小单元。对于procer而言,它实际上是用多个线程并发地向不同分区所在的broker发起Socket连接同时给这些分区发送消息;而consumer呢,同一个消费组内的所有consumer线程都被指定topic的某一个分区进行消费(具体如何确定consumer线程数目我们后面会详细说明)。所以说,如果一个topic分区越多,理论上整个集群所能达到的吞吐量就越大。
但分区是否越多越好呢?显然也不是,因为每个分区都有自己的开销:
一、客户端/服务器端需要使用的内存就越多
先说说客户端的情况。Kafka 0.8.2之后推出了Java版的全新的procer,这个procer有个参数batch.size,默认是16KB。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个能够提升性能的设计。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会更多。假设你有10000个分区,按照默认设置,这部分缓存需要占用约157MB的内存。而consumer端呢?我们抛开获取数据所需的内存不说,只说线程的开销。如果还是假设有10000个分区,同时consumer线程数要匹配分区数(大部分情况下是最佳的消费吞吐量配置)的话,那么在consumer client就要创建10000个线程,也需要创建大约10000个Socket去获取分区数据。这里面的线程切换的开销本身已经不容小觑了。
服务器端的开销也不小,如果阅读Kafka源码的话可以发现,服务器端的很多组件都在内存中维护了分区级别的缓存,比如controller,FetcherManager等,因此分区数越多,这种缓存的成本越久越大。
二、文件句柄的开销
每个分区在底层文件系统都有属于自己的一个目录。该目录下通常会有两个文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager会为每个broker都保存这两个文件句柄(file handler)。很明显,如果分区数越多,所需要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit -n的限制。
三、降低高可用性
Kafka通过副本(replica)机制来保证高可用。具体做法就是为每个分区保存若干个副本(replica_factor指定副本数)。每个副本保存在不同的broker上。期中的一个副本充当leader 副本,负责处理procer和consumer请求。其他副本充当follower角色,由Kafka controller负责保证与leader的同步。如果leader所在的broker挂掉了,contorller会检测到然后在zookeeper的帮助下重选出新的leader——这中间会有短暂的不可用时间窗口,虽然大部分情况下可能只是几毫秒级别。但如果你有10000个分区,10个broker,也就是说平均每个broker上有1000个分区。此时这个broker挂掉了,那么zookeeper和controller需要立即对这1000个分区进行leader选举。比起很少的分区leader选举而言,这必然要花更长的时间,并且通常不是线性累加的。如果这个broker还同时是controller情况就更糟了。
说了这么多“废话”,很多人肯定已经不耐烦了。那你说到底要怎么确定分区数呢?答案就是:视情况而定。基本上你还是需要通过一系列实验和测试来确定。当然测试的依据应该是吞吐量。虽然LinkedIn这篇文章做了Kafka的基准测试,但它的结果其实对你意义不大,因为不同的硬件、、负载情况测试出来的结果必然不一样。我经常碰到的问题类似于,说每秒能到10MB,为什么我的procer每秒才1MB? —— 且不说硬件条件,最后发现他使用的消息体有1KB,而的基准测试是用100B测出来的,因此根本没有可比性。不过你依然可以遵循一定的步骤来尝试确定分区数:创建一个只有1个分区的topic,然后测试这个topic的procer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么分区数 = Tt / max(Tp, Tc)
Tp表示procer的吞吐量。测试procer通常是很容易的,因为它的逻辑非常简单,就是直接发送消息到Kafka就好了。Tc表示consumer的吞吐量。测试Tc通常与应用的关系更大, 因为Tc的值取决于你拿到消息之后执行什么操作,因此Tc的测试通常也要麻烦一些。
另外,Kafka并不能真正地做到线性扩展(其实任何系统都不能),所以你在规划你的分区数的时候最好多规划一下,这样未来扩展时候也更加方便。
消息-分区的分配
默认情况下,Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions,如下图所示:
def partition(key: Any, numPartitions: Int): Int = {
Utils.abs(key.hashCode) % numPartitions
}
这就保证了相同key的消息一定会被路由到相同的分区。如果你没有指定key,那么Kafka是如何确定这条消息去往哪个分区的呢?
if(key == null) { // 如果没有指定key
val id = sendPartitionPerTopicCache.get(topic) // 先看看Kafka有没有缓存的现成的分区Id
id match {
case Some(partitionId) =>
partitionId // 如果有的话直接使用这个分区Id就好了
case None => // 如果没有的话,
val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) //找出所有可用分区的leader所在的broker
if (availablePartitions.isEmpty)
throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
val index = Utils.abs(Random.nextInt) % availablePartitions.size // 从中随机挑一个
val partitionId = availablePartitions(index).partitionId
sendPartitionPerTopicCache.put(topic, partitionId) // 更新缓存以备下一次直接使用
partitionId
}
}
可以看出,Kafka几乎就是随机找一个分区发送无key的消息,然后把这个分区号加入到缓存中以备后面直接使用——当然了,Kafka本身也会清空该缓存(默认每10分钟或每次请求topic元数据时)
如何设定consumer线程数
我个人的观点,如果你的分区数是N,那么最好线程数也保持为N,这样通常能够达到最大的吞吐量。超过N的配置只是浪费系统资源,因为多出的线程不会被分配到任何分区。让我们来看看具体Kafka是如何分配的。
topic下的一个分区只能被同一个consumer group下的一个consumer线程来消费,但反之并不成立,即一个consumer线程可以消费多个分区的数据,比如Kafka提供的ConsoleConsumer,默认就只是一个线程来消费所有分区的数据。——其实ConsoleConsumer可以使用通配符的功能实现同时消费多个topic数据,但这和本文无关。
再讨论分配策略之前,先说说KafkaStream——它是consumer的关键类,提供了遍历方法用于consumer程序调用实现数据的消费。其底层维护了一个阻塞队列,所以在没有新消息到来时,consumer是处于阻塞状态的,表现出来的状态就是consumer程序一直在等待新消息的到来。——你当然可以配置成带超时的consumer,具体参看参数consumer.timeout.ms的用法。
下面说说 Kafka提供的两种分配策略: range和roundrobin,由参数partition.assignment.strategy指定,默认是range策略。本文只讨论range策略。所谓的range其实就是按照阶段平均分配。举个例子就明白了,假设你有10个分区,P0 ~ P9,consumer线程数是3, C0 ~ C2,那么每个线程都分配哪些分区呢?
C0 消费分区 0, 1, 2, 3
C1 消费分区 4, 5, 6
C2 消费分区 7, 8, 9
具体算法就是:
val nPartsPerConsumer = curPartitions.size / curConsumers.size // 每个consumer至少保证消费的分区数
val nConsumersWithExtraPart = curPartitions.size % curConsumers.size // 还剩下多少个分区需要单独分配给开头的线程们
for (consumerThreadId <- consumerThreadIdSet) { // 对于每一个consumer线程
val myConsumerPosition = curConsumers.indexOf(consumerThreadId) //算出该线程在所有线程中的位置,介于[0, n-1]
assert(myConsumerPosition >= 0)
// startPart 就是这个线程要消费的起始分区数
val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
// nParts 就是这个线程总共要消费多少个分区
val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
}
针对于这个例子,nPartsPerConsumer就是10/3=3,nConsumersWithExtraPart为10%3=1,说明每个线程至少保证3个分区,还剩下1个分区需要单独分配给开头的若干个线程。这就是为什么C0消费4个分区,后面的2个线程每个消费3个分区,具体过程详见下面的Debug截图信息:
ctx.myTopicThreadIds
nPartsPerConsumer = 10 / 3 = 3
nConsumersWithExtraPart = 10 % 3 = 1
第一次:
myConsumerPosition = 1
startPart = 1 * 3 + min(1, 1) = 4 ---也就是从分区4开始读
nParts = 3 + (if (1 + 1 > 1) 0 else 1) = 3 读取3个分区, 即4,5,6
第二次:
myConsumerPosition = 0
startPart = 3 * 0 + min(1, 0) =0 --- 从分区0开始读
nParts = 3 + (if (0 + 1 > 1) 0 else 1) = 4 读取4个分区,即0,1,2,3
第三次:
myConsumerPosition = 2
startPart = 3 * 2 + min(2, 1) = 7 --- 从分区7开始读
nParts = 3 + if (2 + 1 > 1) 0 else 1) = 3 读取3个分区,即7, 8, 9
至此10个分区都已经分配完毕
说到这里,经常有个需求就是我想让某个consumer线程消费指定的分区而不消费其他的分区。坦率来说,目前Kafka并没有提供自定义分配策略。做到这点很难,但仔细想一想,也许我们期望Kafka做的事情太多了,毕竟它只是个消息引擎,在Kafka中加入消息消费的逻辑也许并不是Kafka该做的事情。
不消费问题
第一步:参看消费者的基本情况
查看mwbops系统,【Consumer监控】-->【对应的consumerId】
如果offset数字一直在动,说明一直在消费,说明不存在问题,return;
如果offset数字一直不动,看Owner是不是有值存在
如果Owner是空,说明消费端的程序已经跟Kafka断开连接,应该排查消费端是否正常,return;
如果Owner不为空,就是有上图上面的类似于 bennu_index_benuprdapp02--fa-0 的文字,继续看下面内容
第二步:查看消费端的程序代码
一般的消费代码是这样的
看看自己的消费代码里面,存不存在处理消息的时候出异常的情况
如果有,需要try-catch一下,其实不论有没有异常,都用try-catch包一下最好,如下面代码
return;
原因:如果在处理消息的时候有异常出现,又没有进行处理,那么while循环就会跳出,线程会结束,所以不会再去取消息,就是消费停止了。
第三步:查看消费端的配置
消费代码中一般以以下方式创建Consumer
消费端有一个配置,叫 fetch.message.max.bytes,默认是1M,此时如果有消息大于1M,会发生停止消费的情况。
此时,在配置中增加 props.put("fetch.message.max.bytes", "10 * 1024 * 1024"); 即可
return;
原因:目前Kafka集群配置的运行最大的消息大小是10M,如果客户端配置的运行接收的消息是1M,跟Kafka服务端配置的不一致,
则消息大于1M的情况下,消费端就无法消费,导致一直卡在这一条消息,现象就是消费停止。
H. 学习apache kafka源码剖析需要什么基础
先搞清楚STL怎么用并大量使用相当长的时间,代码风格尽量STL化(这个真是看STL源码的前提,我就是受不了全是模板和迭代器的代码,所以至今没去研究STL源码)
还有,现在对“基础较好”、“熟练”、“精通”之类的词本能的不信任
I. 如何保证kafka 的消息机制 ack-fail 源码跟踪
Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.(Kafka布式、区(partitioned)、基于备份(replicated)commit-log存储服务.提供类似于messaging system特性,设计实现完全同)kafka种高吞吐量布式发布订阅消息系统特性:
(1)、通O(1)磁盘数据结构提供消息持久化种结构于即使数TB消息存储能够保持间稳定性能
(2)、高吞吐量:即使非普通硬件kafka支持每秒数十万消息
(3)、支持通kafka服务器消费机集群区消息
(4)、支持Hadoop并行数据加载
、用Kafka面自带脚本进行编译
载Kafka源码面自带gradlew脚本我利用编译Kafka源码:
1 # wget
2 # tar -zxf kafka-0.8.1.1-src.tgz
3 # cd kafka-0.8.1.1-src
4 # ./gradlew releaseTarGz
运行面命令进行编译现异信息:
01 :core:signArchives FAILED
02
03 FAILURE: Build failed with an exception.
04
05 * What went wrong:
06 Execution failed for task ':core:signArchives'.
07 > Cannot perform signing task ':core:signArchives' because it
08 has no configured signatory
09
10 * Try:
11 Run with --stacktrace option to get the stack trace. Run with
12 --info or --debug option to get more log output.
13
14 BUILD FAILED
bug()用面命令进行编译
1 ./gradlew releaseTarGzAll -x signArchives
候编译功(编译程现)编译程我指定应Scala版本进行编译:
1 ./gradlew -PscalaVersion=2.10.3 releaseTarGz -x signArchives
编译完core/build/distributions/面kafka_2.10-0.8.1.1.tgz文件网载直接用
二、利用sbt进行编译
我同用sbt编译Kafka步骤:
01 # git clone
02 # cd kafka
03 # git checkout -b 0.8 remotes/origin/0.8
04 # ./sbt update
05 [info] [SUCCESSFUL ] org.eclipse.jdt#core;3.1.1!core.jar (2243ms)
06 [info] downloading ...
07 [info] [SUCCESSFUL ] ant#ant;1.6.5!ant.jar (1150ms)
08 [info] Done updating.
09 [info] Resolving org.apache.hadoop#hadoop-core;0.20.2 ...
10 [info] Done updating.
11 [info] Resolving com.yammer.metrics#metrics-annotation;2.2.0 ...
12 [info] Done updating.
13 [info] Resolving com.yammer.metrics#metrics-annotation;2.2.0 ...
14 [info] Done updating.
15 [success] Total time: 168 s, completed Jun 18, 2014 6:51:38 PM
16
17 # ./sbt package
18 [info] Set current project to Kafka (in build file:/export1/spark/kafka/)
19 Getting Scala 2.8.0 ...
20 :: retrieving :: org.scala-sbt#boot-scala
21 confs: [default]
22 3 artifacts copied, 0 already retrieved (14544kB/27ms)
23 [success] Total time: 1 s, completed Jun 18, 2014 6:52:37 PM
于Kafka 0.8及版本需要运行命令:
01 # ./sbt assembly-package-dependency
02 [info] Loading project definition from /export1/spark/kafka/project
03 [warn] Multiple resolvers having different access mechanism configured with
04 same name 'sbt-plugin-releases'. To avoid conflict, Remove plicate project
05 resolvers (`resolvers`) or rename publishing resolver (`publishTo`).
06 [info] Set current project to Kafka (in build file:/export1/spark/kafka/)
07 [warn] Credentials file /home/wyp/.m2/.credentials does not exist
08 [info] Including slf4j-api-1.7.2.jar
09 [info] Including metrics-annotation-2.2.0.jar
10 [info] Including scala-compiler.jar
11 [info] Including scala-library.jar
12 [info] Including slf4j-simple-1.6.4.jar
13 [info] Including metrics-core-2.2.0.jar
14 [info] Including snappy-java-1.0.4.1.jar
15 [info] Including zookeeper-3.3.4.jar
16 [info] Including log4j-1.2.15.jar
17 [info] Including zkclient-0.3.jar
18 [info] Including jopt-simple-3.2.jar
19 [warn] Merging 'META-INF/NOTICE' with strategy 'rename'
20 [warn] Merging 'org/xerial/snappy/native/README' with strategy 'rename'
21 [warn] Merging 'META-INF/maven/org.xerial.snappy/snappy-java/LICENSE'
22 with strategy 'rename'
23 [warn] Merging 'LICENSE.txt' with strategy 'rename'
24 [warn] Merging 'META-INF/LICENSE' with strategy 'rename'
25 [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
26 [warn] Strategy 'discard' was applied to a file
27 [warn] Strategy 'rename' was applied to 5 files
28 [success] Total time: 3 s, completed Jun 18, 2014 6:53:41 PM
我sbt面指定scala版本:
01 <!--
02 User: 往记忆
03 Date: 14-6-18
04 Time: 20:20
05 bolg:
06 本文址:/archives/1044
07 往记忆博客专注于hadoop、hive、spark、shark、flume技术博客量干货
08 往记忆博客微信公共帐号:iteblog_hadoop
09 -->
10 sbt "++2.10.3 update"
11 sbt "++2.10.3 package"
12 sbt "++2.10.3 assembly-package-dependency"
J. 基于librdkafka库写的源代码怎么编译
可以放在当前目录下,但是要设置一下库文件的路径:LD_LIBRARY_PATH=./:/usr/local/pet20/lib:/lib/:/usr/local/lib export LD_LIBRARY_PATH 这样,在调用的时候就会自动从当前目录找。 如果是显式调用则不用,只要在程序里指定.so的文件路径就可以了。所以放在当前目录下也是没问题的。