pythonkafka
㈠ 如何使用python 连接kafka 并获取数据
连接 kafka 的库有两种类型,一种是直接连接 kafka 的,存储 offset 的事情要自己在客户端完成。还有一种是先连接 zookeeper 然后再通过 zookeeper 获取 kafka 的 brokers 信息, offset 存放在 zookeeper 上面,由 zookeeper 来协调。
我现在使用 samsa 这个 highlevel 库
Procer示例
from kazoo.client import KazooClientfrom samsa.cluster import Clusterzookeeper = KazooClient()zookeeper.start()cluster = Cluster(zookeeper)topic = cluster.topics['topicname']topic.publish('msg')
** Consumer示例 **
from kazoo.client import KazooClientfrom samsa.cluster import Clusterzookeeper = KazooClient()zookeeper.start()cluster = Cluster(zookeeper)topic = cluster.topics['topicname']consumer = topic.subscribe('groupname')for msg in consumer:
print msg
Tip
consumer 必需在 procer 向 kafka 的 topic 里面提交数据后才能连接,否则会出错。
在 Kafka 中一个 consumer 需要指定 groupname , groue 中保存着 offset 等信息,新开启一个 group 会从 offset 0 的位置重新开始获取日志。
kafka 的配置参数中有个 partition ,默认是 1 ,这个会对数据进行分区,如果多个 consumer 想连接同个 group 就必需要增加 partition , partition 只能大于 consumer 的数量,否则多出来的 consumer 将无法获取到数据。
㈡ python kafka 能从一个位置开始读topic吗
孕育嘲滤展翅蚊灿
㈢ Python 有那么神吗
很多对Python不熟悉的人,认为Python在最近这些年的兴起是运气而已。本文帮助大家回顾一下过去十几年Python的3次增长和对应的历史背景。
Python设计之处的特点,包括易学易用,以及作为胶水语言。易学易用是个难以量化的东西,但至少我接触的绝大多数朋友都认同Python在学习和使用上是比大部分语言简单的。有些人不认同Python通常也不是认为Python相对其他语言难学,而是认为Python的运行速度慢,或者动态类型语言的通病。至于胶水语言就比较窄了,面向作为胶水而设计的语言是很少的,印象里只有Lua算得上一个。另一个Python相较其他语言的优势是字符串处理。
以下介绍Python获得发展的3次机遇期。
2006年的搜索/NLP:2006年前后正好是搜索引擎公司发展的爆发期。那一年Google中国李开复搞了关门弟子计划,网络也做了诸多广告,比如网络更懂中文系列。在那个时代搜索引擎就是IT行业的高科技。之后不久,阿里巴巴也搞了个ASC(阿里巴巴搜索技术研究中心),后来多次追潮流更名,一直作为阿里巴巴内部的前沿研发部门。
而Python用作搜索引擎和NLP是很有历史的。早期Google在90年代开发搜索引擎爬虫就是使用Python实现的。而即便是现在,开发爬虫,Python也基本上就是第一选择。很多人学习Python的入门程序也是爬虫。而Python适合开发爬虫的关键在于Python的字符串处理是很方便的。
也就是从2006年末开始,国内的python-cn邮件列表涌入了大量的新人,并以爬虫为例开始深入学习Python。
2010年的WEB创业潮:2010年开始,国内外出现了新一轮的创业潮,大家开发的是各种各样的网站。此时的WEB开发上,Python已经逐步成为主流WEB服务器开发选项中的一种。相对于同时代的java和PHP等,有着开发效率上的优势。
正因为早期创业公司需要迅速的迭代和试错,使得这个时代的Python成为了一个不错的选项。进而让更多工程师开始学习起来Python。
也就是从这个时期开始,才逐渐有较多的公司会公开招聘Python工程师。在此前招聘Python工程师的广告是非常非常少见的。
2014年的Deep Learning:从2014年开始火起来的深度学习上,Python的优势是近乎绝对的。深度学习的主流开发语言只有C++和Python两种了,其他语言可以认为根本就没能在这里分到什么像样的空间。所有主流的Deep Learning框架,也是官方直接提供了C++和Python两种接口。而由于C++开发的难度,当前确实有不少从业工程师是直接用Python搞定深度学习的相关步骤。
这次Python的增长期被更多不了解Python的人认为是Python的狗屎运。但其背后的逻辑反倒是非常坚固的。原因就是Python是个很好的胶水。最初的起点是numpy库。
numpy是封装了BLAS的科学计算库。BLAS是对CPU向量指令集高度优化的数学运算库。通过BLAS进行科学计算可以得到顶级的计算性能,这个计算性能比未经向量指令集优化的C程序还能快上数倍。而numpy的另一个重要特点就是,把buffer做了封装,使得buffer的内容是无需Python处理的,而是实际交给特定的软件库来处理,numpy只是负责维护该buffer的生命周期,形状等元数据。这就使得numpy的计算性能不会受到Python的影响,但同时却可以利用Python的易学易用来管理buffer的生命周期。
numpy对buffer的管理带来的易用性优势在后来得到了很大的发展。如以下几点:
OpenCV:在opencv-python中就是使用了numpy.array来管理图像数据,却没有像C++接口一样使用Mat。同样的顶级性能。
PyCuda/PyOpenCL:也是利用了numpy.array来透明传递数据给GPU做高性能计算。尤其是集成了JIT,使得可以用字符串的方式传递kernel function,不再像C++一样要独立编译一遍。
Caffe/TensorFlow:同样利用了numpy.array,并利用了PyCuda/PyOpenCL的集成。
所以这样一路发现下来,由Python是个好胶水,就真的把若干很好用的库全流程的粘在了一起。在流程集成上都是顶级的性能,而没有Python的性能损耗,同时带来了非常好的易用性。
相比其他语言,对这些C/C++库的玩法都是先封装一层对象,包准封装的前后对不上。而且因为大部分语言并不是面向胶水设计,开发C接口都很困难。自然与这些高性能计算库的结合困难重重。累计起来就与Python产生了差距。
题主所提到的R语言,是一种领域相关的语言,是做统计领域的,类似的还有做科学计算的Matlab。如果程序的输出只是一份报告,甚至一份统计图,问题是不大的。但想要成为产品,与其他系统集成则成了难题。一般的服务器部署产品,是不会选择在工程上这么不专业的语言。所以,实际应用时,还是要有工程师负责将算法提取出来,移植到产品级的语言和平台上。举个例子,R虽然可以正常的访问Mysql数据库之类的。但产品级系统中,涉及到memcache、kafka、etcd等,可就没有R语言的接口了。所以这些领域相关语言,写一些自己电脑上跑的小程序还可以,进入产品是没什么希望了。
再者就是因为通用性一直难以跨越出自己的领域,所以这类语言的生命周期一般都不太长,失去自己领域的强支撑后会很快消亡。所以建议题主还是花一些精力去看看业界通用的一些语言。
㈣ 大数据具体是学习什么内容呢主要框架是什么
首先,学习大数据是需要有java,python和R语言的基础。
1) Java学习到什么样的程度才可以学习大数据呢?
java需要学会javaSE即可。javaweb,javaee对于大数据用不到。学会了javase就可以看懂hadoop框架。
2) python是最容易学习的,难易程度:python java Scala 。
python不是比java更直观好理解么,因为会了Python 还是要学习java的,你学会了java,再来学习python会很简单的,一周的时间就可以学会python。
3) R语言也可以学习,但是不推荐,因为java用的人最多,大数据的第一个框架Hadoop,底层全是Java写的。就算学会了R还是看不懂hadoop。
java在大数据中的作用是构成大数据的语言,大数据的第一个框架Hadoop以及其他大数据技术框架,底层语言全是Java写的,所以推荐首选学习java
大数据开发学习路线:
第一阶段:Hadoop生态架构技术
1、语言基础
Java:多理解和实践在Java虚拟机的内存管理、以及多线程、线程池、设计模式、并行化就可以,不需要深入掌握。
Linux:系统安装、基本命令、网络配置、Vim编辑器、进程管理、Shell脚本、虚拟机的菜单熟悉等等。
Python:基础语法,数据结构,函数,条件判断,循环等基础知识。
2、环境准备
这里介绍在windows电脑搭建完全分布式,1主2从。
VMware虚拟机、Linux系统(Centos6.5)、Hadoop安装包,这里准备好Hadoop完全分布式集群环境。
3、MapRece
MapRece分布式离线计算框架,是Hadoop核心编程模型。
4、HDFS1.0/2.0
HDFS能提供高吞吐量的数据访问,适合大规模数据集上的应用。
5、Yarn(Hadoop2.0)
Yarn是一个资源调度平台,主要负责给任务分配资源。
6、Hive
Hive是一个数据仓库,所有的数据都是存储在HDFS上的。使用Hive主要是写Hql。
7、Spark
Spark 是专为大规模数据处理而设计的快速通用的计算引擎。
8、SparkStreaming
Spark Streaming是实时处理框架,数据是一批一批的处理。
9、SparkHive
Spark作为Hive的计算引擎,将Hive的查询作为Spark的任务提交到Spark集群上进行计算,可以提高Hive查询的性能。
10、Storm
Storm是一个实时计算框架,Storm是对实时新增的每一条数据进行处理,是一条一条的处理,可以保证数据处理的时效性。
11、Zookeeper
Zookeeper是很多大数据框架的基础,是集群的管理者。
12、Hbase
Hbase是一个Nosql数据库,是高可靠、面向列的、可伸缩的、分布式的数据库。
13、Kafka
kafka是一个消息中间件,作为一个中间缓冲层。
14、Flume
Flume常见的就是采集应用产生的日志文件中的数据,一般有两个流程。
一个是Flume采集数据存储到Kafka中,方便Storm或者SparkStreaming进行实时处理。
另一个流程是Flume采集的数据存储到HDFS上,为了后期使用hadoop或者spark进行离线处理。
第二阶段:数据挖掘算法
1、中文分词
开源分词库的离线和在线应用
2、自然语言处理
文本相关性算法
3、推荐算法
基于CB、CF,归一法,Mahout应用。
4、分类算法
NB、SVM
5、回归算法
LR、DecisionTree
6、聚类算法
层次聚类、Kmeans
7、神经网络与深度学习
NN、Tensorflow
以上就是学习Hadoop开发的一个详细路线,如果需要了解具体框架的开发技术,可咨询加米谷大数据老师,详细了解。
学习大数据开发需要掌握哪些技术呢?
(1)Java语言基础
Java开发介绍、熟悉Eclipse开发工具、Java语言基础、Java流程控制、Java字符串、Java数组与类和对象、数字处理类与核心技术、I/O与反射、多线程、Swing程序与集合类
(2)HTML、CSS与Java
PC端网站布局、HTML5+CSS3基础、WebApp页面布局、原生Java交互功能开发、Ajax异步交互、jQuery应用
(3)JavaWeb和数据库
数据库、JavaWeb开发核心、JavaWeb开发内幕
Linux&Hadoop生态体系
Linux体系、Hadoop离线计算大纲、分布式数据库Hbase、数据仓库Hive、数据迁移工具Sqoop、Flume分布式日志框架
分布式计算框架和Spark&Strom生态体系
(1)分布式计算框架
Python编程语言、Scala编程语言、Spark大数据处理、Spark—Streaming大数据处理、Spark—Mlib机器学习、Spark—GraphX 图计算、实战一:基于Spark的推荐系统(某一线公司真实项目)、实战二:新浪网(www.sina.com.cn)
(2)storm技术架构体系
Storm原理与基础、消息队列kafka、Redis工具、zookeeper详解、大数据项目实战数据获取、数据处理、数据分析、数据展现、数据应用
大数据分析—AI(人工智能)Data
Analyze工作环境准备&数据分析基础、数据可视化、Python机器学习
以上的回答希望对你有所帮助
㈤ kafka python topic 多少数据
您好,希望以下回答能帮助您 我只想说还是换个系统比较好,win7装tornado特别容易出问题,XP就可以 如您还有疑问可继续追问。
㈥ from pykafka import kafkaclient什么意思
不是一个意思,前者来自于你的系统本身,后者来自于你的存储卡。
㈦ 如何利用pykafka远程消费 zookeeper+kafka集群 python脚本
#从kafka消费
#consumer_area = topic_area.get_simple_consumer(auto_offset_reset=OffsetType.LATEST)
#从ZOOKEEPER消费
consumer_area = topic_area.get_balanced_consumer(
consumer_group=b'zs_download_04', # 自己命令
auto_offset_reset=OffsetType.LATEST,#在consumer_group存在的情况下,设置此变量,表示从最新的开始取
#auto_offset_reset=OffsetType.EARLIEST,
#reset_offset_on_start=True,
auto_commit_enable=True,
#auto_commit_interval_ms=1,
zookeeper_connect=ZK_LIST
)
㈧ 如何在kafka-python和confluent-kafka之间做出选择
kafka-python:蛮荒的西部
kafka-python是最受欢迎的Kafka Python客户端。我们过去使用时从未出现过任何问题,在我的《敏捷数据科学2.0》一书中我也用过它。然而在最近这个项目中,它却出现了一个严重的问题。我们发现,当以文档化的方式使用KafkaConsumer、Consumer迭代式地从消息队列中获取消息时,最终到达主题topic的由Consumer携带的消息通常会丢失。我们通过控制台Consumer的分析验证了这一点。
需要更详细说明的是,kafka-python和KafkaConsumer是与一个由SSL保护的Kafka服务(如Aiven Kafka)一同使用的,如下面这样:
kafka_consumer = KafkaConsumer(
topic,
enable_auto_commit=True,
group_id=group_id,
bootstrap_servers=config.kafka.host,
api_version=(0, 10),
security_protocol='SSL',
ssl_check_hostname=True,
ssl_cafile=config.kafka.ca_pem,
ssl_certfile=config.kafka.service_cert,
ssl_keyfile=config.kafka.service_key
)
for message in kafka_consumer:
application_message = json.loads(message.value.decode())
...
当以这样的推荐方式使用时,KafkaConsumer会丢失消息。但有一个变通方案,就是保留所有消息。这个方案是Kafka服务提供商Aiven support提供给我们的。它看起来像这样:
while True:
raw_messages = consumer.poll(timeout_ms=1000, max_records=5000)
for topic_partition, messages in raw_messages.items():
application_message = json.loads(message.value.decode())
...
虽然这个变通方案可能有用,但README中的方法会丢弃消息使我对其失去兴趣。所以我找到了一个替代方案。
confluent-kafka:企业支持
发现coufluent-kafka Python模块时,我感到无比惊喜。它既能做librdkafka的外封装,又非常小巧。librdkafka是一个用C语言写的kafka库,它是Go和.NET的基础。更重要的是,它由Confluent公司支持。我爱开源,但是当“由非正式社区拥有或支持”这种方式效果不行的时候,或许该考虑给替代方案印上公章、即该由某个公司拥有或支持了。不过,我们并未购买商业支持。我们知道有人会维护这个库的软件质量,而且可以选择买或不买商业支持,这一点真是太棒了。
用confluent-kafka替换kafka-python非常简单。confluent-kafka使用poll方法,它类似于上面提到的访问kafka-python的变通方案。
kafka_consumer = Consumer(
{
"api.version.request": True,
"enable.auto.commit": True,
"group.id": group_id,
"bootstrap.servers": config.kafka.host,
"security.protocol": "ssl",
"ssl.ca.location": config.kafka.ca_pem,
"ssl.certificate.location": config.kafka.service_cert,
"ssl.key.location": config.kafka.service_key,
"default.topic.config": {"auto.offset.reset": "smallest"}
}
)
consumer.subscribe([topic])
# Now loop on the consumer to read messages
running = True
while running:
message = kafka_consumer.poll()
application_message = json.load(message.value.decode())
kafka_consumer.close()
现在我们能收到所有消息了。我并不是说kafka-python工具不好,我相信社区会对它的问题做出反应并解决。但从现在开始,我会一直坚持使用confluent-kafka。
开源治理
开源是强大的,但是涉及到复杂的“大数据”和NoSQL工具时,通常需要有一家大公司在背后推动工具的开发。这样你就知道,如果那个公司可以使用工具,那么该工具应该拥有很好的基本功能。它的出现可能是非正式的,就像某公司发布类似FOSS的项目一样,但也可能是正式的,就像某公司为工具提供商业支持一样。当然,从另一个角度来看,如果一家与开源社区作对的公司负责开发某个工具,你便失去了控制权。你的意见可能无关紧要,除非你是付费客户。
理想情况是采取开源治理,就像Apache基金会一样,还有就是增加可用的商业支持选项。这对互联网上大部分的免费软件来说根本不可能。限制自己只使用那些公司盖章批准后的工具将非常限制你的自由。这对于一些商店可能是正确选择,但对于我们不是。我喜欢工具测试,如果工具很小,而且只专心做一件事,我就会使用它。
信任开源
对于更大型的工具,以上决策评估过程更为复杂。通常,我会看一下提交问题和贡献者的数量,以及最后一次commit的日期。我可能会问朋友某个工具的情况,有时也会在推特上问。当你进行嗅探检查后从Github选择了一个项目,即说明你信任社区可以产出好的工具。对于大多数工具来说,这是没问题的。
但信任社区可能存在问题。对于某个特定的工具,可能并没有充分的理由让你信任社区可以产出好的软件。社区在目标、经验和开源项目的投入时间方面各不相同。选择工具时保持审慎态度十分重要,不要让理想蒙蔽了判断。