pythonkafka
㈠ 如何使用python 連接kafka 並獲取數據
連接 kafka 的庫有兩種類型,一種是直接連接 kafka 的,存儲 offset 的事情要自己在客戶端完成。還有一種是先連接 zookeeper 然後再通過 zookeeper 獲取 kafka 的 brokers 信息, offset 存放在 zookeeper 上面,由 zookeeper 來協調。
我現在使用 samsa 這個 highlevel 庫
Procer示例
from kazoo.client import KazooClientfrom samsa.cluster import Clusterzookeeper = KazooClient()zookeeper.start()cluster = Cluster(zookeeper)topic = cluster.topics['topicname']topic.publish('msg')
** Consumer示例 **
from kazoo.client import KazooClientfrom samsa.cluster import Clusterzookeeper = KazooClient()zookeeper.start()cluster = Cluster(zookeeper)topic = cluster.topics['topicname']consumer = topic.subscribe('groupname')for msg in consumer:
print msg
Tip
consumer 必需在 procer 向 kafka 的 topic 裡面提交數據後才能連接,否則會出錯。
在 Kafka 中一個 consumer 需要指定 groupname , groue 中保存著 offset 等信息,新開啟一個 group 會從 offset 0 的位置重新開始獲取日誌。
kafka 的配置參數中有個 partition ,默認是 1 ,這個會對數據進行分區,如果多個 consumer 想連接同個 group 就必需要增加 partition , partition 只能大於 consumer 的數量,否則多出來的 consumer 將無法獲取到數據。
㈡ python kafka 能從一個位置開始讀topic嗎
孕育嘲濾展翅蚊燦
㈢ Python 有那麼神嗎
很多對Python不熟悉的人,認為Python在最近這些年的興起是運氣而已。本文幫助大家回顧一下過去十幾年Python的3次增長和對應的歷史背景。
Python設計之處的特點,包括易學易用,以及作為膠水語言。易學易用是個難以量化的東西,但至少我接觸的絕大多數朋友都認同Python在學習和使用上是比大部分語言簡單的。有些人不認同Python通常也不是認為Python相對其他語言難學,而是認為Python的運行速度慢,或者動態類型語言的通病。至於膠水語言就比較窄了,面向作為膠水而設計的語言是很少的,印象里只有Lua算得上一個。另一個Python相較其他語言的優勢是字元串處理。
以下介紹Python獲得發展的3次機遇期。
2006年的搜索/NLP:2006年前後正好是搜索引擎公司發展的爆發期。那一年Google中國李開復搞了關門弟子計劃,網路也做了諸多廣告,比如網路更懂中文系列。在那個時代搜索引擎就是IT行業的高科技。之後不久,阿里巴巴也搞了個ASC(阿里巴巴搜索技術研究中心),後來多次追潮流更名,一直作為阿里巴巴內部的前沿研發部門。
而Python用作搜索引擎和NLP是很有歷史的。早期Google在90年代開發搜索引擎爬蟲就是使用Python實現的。而即便是現在,開發爬蟲,Python也基本上就是第一選擇。很多人學習Python的入門程序也是爬蟲。而Python適合開發爬蟲的關鍵在於Python的字元串處理是很方便的。
也就是從2006年末開始,國內的python-cn郵件列表湧入了大量的新人,並以爬蟲為例開始深入學習Python。
2010年的WEB創業潮:2010年開始,國內外出現了新一輪的創業潮,大家開發的是各種各樣的網站。此時的WEB開發上,Python已經逐步成為主流WEB伺服器開發選項中的一種。相對於同時代的java和PHP等,有著開發效率上的優勢。
正因為早期創業公司需要迅速的迭代和試錯,使得這個時代的Python成為了一個不錯的選項。進而讓更多工程師開始學習起來Python。
也就是從這個時期開始,才逐漸有較多的公司會公開招聘Python工程師。在此前招聘Python工程師的廣告是非常非常少見的。
2014年的Deep Learning:從2014年開始火起來的深度學習上,Python的優勢是近乎絕對的。深度學習的主流開發語言只有C++和Python兩種了,其他語言可以認為根本就沒能在這里分到什麼像樣的空間。所有主流的Deep Learning框架,也是官方直接提供了C++和Python兩種介面。而由於C++開發的難度,當前確實有不少從業工程師是直接用Python搞定深度學習的相關步驟。
這次Python的增長期被更多不了解Python的人認為是Python的狗屎運。但其背後的邏輯反倒是非常堅固的。原因就是Python是個很好的膠水。最初的起點是numpy庫。
numpy是封裝了BLAS的科學計算庫。BLAS是對CPU向量指令集高度優化的數學運算庫。通過BLAS進行科學計算可以得到頂級的計算性能,這個計算性能比未經向量指令集優化的C程序還能快上數倍。而numpy的另一個重要特點就是,把buffer做了封裝,使得buffer的內容是無需Python處理的,而是實際交給特定的軟體庫來處理,numpy只是負責維護該buffer的生命周期,形狀等元數據。這就使得numpy的計算性能不會受到Python的影響,但同時卻可以利用Python的易學易用來管理buffer的生命周期。
numpy對buffer的管理帶來的易用性優勢在後來得到了很大的發展。如以下幾點:
OpenCV:在opencv-python中就是使用了numpy.array來管理圖像數據,卻沒有像C++介面一樣使用Mat。同樣的頂級性能。
PyCuda/PyOpenCL:也是利用了numpy.array來透明傳遞數據給GPU做高性能計算。尤其是集成了JIT,使得可以用字元串的方式傳遞kernel function,不再像C++一樣要獨立編譯一遍。
Caffe/TensorFlow:同樣利用了numpy.array,並利用了PyCuda/PyOpenCL的集成。
所以這樣一路發現下來,由Python是個好膠水,就真的把若干很好用的庫全流程的粘在了一起。在流程集成上都是頂級的性能,而沒有Python的性能損耗,同時帶來了非常好的易用性。
相比其他語言,對這些C/C++庫的玩法都是先封裝一層對象,包準封裝的前後對不上。而且因為大部分語言並不是面向膠水設計,開發C介面都很困難。自然與這些高性能計算庫的結合困難重重。累計起來就與Python產生了差距。
題主所提到的R語言,是一種領域相關的語言,是做統計領域的,類似的還有做科學計算的Matlab。如果程序的輸出只是一份報告,甚至一份統計圖,問題是不大的。但想要成為產品,與其他系統集成則成了難題。一般的伺服器部署產品,是不會選擇在工程上這么不專業的語言。所以,實際應用時,還是要有工程師負責將演算法提取出來,移植到產品級的語言和平台上。舉個例子,R雖然可以正常的訪問Mysql資料庫之類的。但產品級系統中,涉及到memcache、kafka、etcd等,可就沒有R語言的介面了。所以這些領域相關語言,寫一些自己電腦上跑的小程序還可以,進入產品是沒什麼希望了。
再者就是因為通用性一直難以跨越出自己的領域,所以這類語言的生命周期一般都不太長,失去自己領域的強支撐後會很快消亡。所以建議題主還是花一些精力去看看業界通用的一些語言。
㈣ 大數據具體是學習什麼內容呢主要框架是什麼
首先,學習大數據是需要有java,python和R語言的基礎。
1) Java學習到什麼樣的程度才可以學習大數據呢?
java需要學會javaSE即可。javaweb,javaee對於大數據用不到。學會了javase就可以看懂hadoop框架。
2) python是最容易學習的,難易程度:python java Scala 。
python不是比java更直觀好理解么,因為會了Python 還是要學習java的,你學會了java,再來學習python會很簡單的,一周的時間就可以學會python。
3) R語言也可以學習,但是不推薦,因為java用的人最多,大數據的第一個框架Hadoop,底層全是Java寫的。就算學會了R還是看不懂hadoop。
java在大數據中的作用是構成大數據的語言,大數據的第一個框架Hadoop以及其他大數據技術框架,底層語言全是Java寫的,所以推薦首選學習java
大數據開發學習路線:
第一階段:Hadoop生態架構技術
1、語言基礎
Java:多理解和實踐在Java虛擬機的內存管理、以及多線程、線程池、設計模式、並行化就可以,不需要深入掌握。
Linux:系統安裝、基本命令、網路配置、Vim編輯器、進程管理、Shell腳本、虛擬機的菜單熟悉等等。
Python:基礎語法,數據結構,函數,條件判斷,循環等基礎知識。
2、環境准備
這里介紹在windows電腦搭建完全分布式,1主2從。
VMware虛擬機、Linux系統(Centos6.5)、Hadoop安裝包,這里准備好Hadoop完全分布式集群環境。
3、MapRece
MapRece分布式離線計算框架,是Hadoop核心編程模型。
4、HDFS1.0/2.0
HDFS能提供高吞吐量的數據訪問,適合大規模數據集上的應用。
5、Yarn(Hadoop2.0)
Yarn是一個資源調度平台,主要負責給任務分配資源。
6、Hive
Hive是一個數據倉庫,所有的數據都是存儲在HDFS上的。使用Hive主要是寫Hql。
7、Spark
Spark 是專為大規模數據處理而設計的快速通用的計算引擎。
8、SparkStreaming
Spark Streaming是實時處理框架,數據是一批一批的處理。
9、SparkHive
Spark作為Hive的計算引擎,將Hive的查詢作為Spark的任務提交到Spark集群上進行計算,可以提高Hive查詢的性能。
10、Storm
Storm是一個實時計算框架,Storm是對實時新增的每一條數據進行處理,是一條一條的處理,可以保證數據處理的時效性。
11、Zookeeper
Zookeeper是很多大數據框架的基礎,是集群的管理者。
12、Hbase
Hbase是一個Nosql資料庫,是高可靠、面向列的、可伸縮的、分布式的資料庫。
13、Kafka
kafka是一個消息中間件,作為一個中間緩沖層。
14、Flume
Flume常見的就是採集應用產生的日誌文件中的數據,一般有兩個流程。
一個是Flume採集數據存儲到Kafka中,方便Storm或者SparkStreaming進行實時處理。
另一個流程是Flume採集的數據存儲到HDFS上,為了後期使用hadoop或者spark進行離線處理。
第二階段:數據挖掘演算法
1、中文分詞
開源分詞庫的離線和在線應用
2、自然語言處理
文本相關性演算法
3、推薦演算法
基於CB、CF,歸一法,Mahout應用。
4、分類演算法
NB、SVM
5、回歸演算法
LR、DecisionTree
6、聚類演算法
層次聚類、Kmeans
7、神經網路與深度學習
NN、Tensorflow
以上就是學習Hadoop開發的一個詳細路線,如果需要了解具體框架的開發技術,可咨詢加米穀大數據老師,詳細了解。
學習大數據開發需要掌握哪些技術呢?
(1)Java語言基礎
Java開發介紹、熟悉Eclipse開發工具、Java語言基礎、Java流程式控制制、Java字元串、Java數組與類和對象、數字處理類與核心技術、I/O與反射、多線程、Swing程序與集合類
(2)HTML、CSS與Java
PC端網站布局、HTML5+CSS3基礎、WebApp頁面布局、原生Java交互功能開發、Ajax非同步交互、jQuery應用
(3)JavaWeb和資料庫
資料庫、JavaWeb開發核心、JavaWeb開發內幕
Linux&Hadoop生態體系
Linux體系、Hadoop離線計算大綱、分布式資料庫Hbase、數據倉庫Hive、數據遷移工具Sqoop、Flume分布式日誌框架
分布式計算框架和Spark&Strom生態體系
(1)分布式計算框架
Python編程語言、Scala編程語言、Spark大數據處理、Spark—Streaming大數據處理、Spark—Mlib機器學習、Spark—GraphX 圖計算、實戰一:基於Spark的推薦系統(某一線公司真實項目)、實戰二:新浪網(www.sina.com.cn)
(2)storm技術架構體系
Storm原理與基礎、消息隊列kafka、Redis工具、zookeeper詳解、大數據項目實戰數據獲取、數據處理、數據分析、數據展現、數據應用
大數據分析—AI(人工智慧)Data
Analyze工作環境准備&數據分析基礎、數據可視化、Python機器學習
以上的回答希望對你有所幫助
㈤ kafka python topic 多少數據
您好,希望以下回答能幫助您 我只想說還是換個系統比較好,win7裝tornado特別容易出問題,XP就可以 如您還有疑問可繼續追問。
㈥ from pykafka import kafkaclient什麼意思
不是一個意思,前者來自於你的系統本身,後者來自於你的存儲卡。
㈦ 如何利用pykafka遠程消費 zookeeper+kafka集群 python腳本
#從kafka消費
#consumer_area = topic_area.get_simple_consumer(auto_offset_reset=OffsetType.LATEST)
#從ZOOKEEPER消費
consumer_area = topic_area.get_balanced_consumer(
consumer_group=b'zs_download_04', # 自己命令
auto_offset_reset=OffsetType.LATEST,#在consumer_group存在的情況下,設置此變數,表示從最新的開始取
#auto_offset_reset=OffsetType.EARLIEST,
#reset_offset_on_start=True,
auto_commit_enable=True,
#auto_commit_interval_ms=1,
zookeeper_connect=ZK_LIST
)
㈧ 如何在kafka-python和confluent-kafka之間做出選擇
kafka-python:蠻荒的西部
kafka-python是最受歡迎的Kafka Python客戶端。我們過去使用時從未出現過任何問題,在我的《敏捷數據科學2.0》一書中我也用過它。然而在最近這個項目中,它卻出現了一個嚴重的問題。我們發現,當以文檔化的方式使用KafkaConsumer、Consumer迭代式地從消息隊列中獲取消息時,最終到達主題topic的由Consumer攜帶的消息通常會丟失。我們通過控制台Consumer的分析驗證了這一點。
需要更詳細說明的是,kafka-python和KafkaConsumer是與一個由SSL保護的Kafka服務(如Aiven Kafka)一同使用的,如下面這樣:
kafka_consumer = KafkaConsumer(
topic,
enable_auto_commit=True,
group_id=group_id,
bootstrap_servers=config.kafka.host,
api_version=(0, 10),
security_protocol='SSL',
ssl_check_hostname=True,
ssl_cafile=config.kafka.ca_pem,
ssl_certfile=config.kafka.service_cert,
ssl_keyfile=config.kafka.service_key
)
for message in kafka_consumer:
application_message = json.loads(message.value.decode())
...
當以這樣的推薦方式使用時,KafkaConsumer會丟失消息。但有一個變通方案,就是保留所有消息。這個方案是Kafka服務提供商Aiven support提供給我們的。它看起來像這樣:
while True:
raw_messages = consumer.poll(timeout_ms=1000, max_records=5000)
for topic_partition, messages in raw_messages.items():
application_message = json.loads(message.value.decode())
...
雖然這個變通方案可能有用,但README中的方法會丟棄消息使我對其失去興趣。所以我找到了一個替代方案。
confluent-kafka:企業支持
發現coufluent-kafka Python模塊時,我感到無比驚喜。它既能做librdkafka的外封裝,又非常小巧。librdkafka是一個用C語言寫的kafka庫,它是Go和.NET的基礎。更重要的是,它由Confluent公司支持。我愛開源,但是當「由非正式社區擁有或支持」這種方式效果不行的時候,或許該考慮給替代方案印上公章、即該由某個公司擁有或支持了。不過,我們並未購買商業支持。我們知道有人會維護這個庫的軟體質量,而且可以選擇買或不買商業支持,這一點真是太棒了。
用confluent-kafka替換kafka-python非常簡單。confluent-kafka使用poll方法,它類似於上面提到的訪問kafka-python的變通方案。
kafka_consumer = Consumer(
{
"api.version.request": True,
"enable.auto.commit": True,
"group.id": group_id,
"bootstrap.servers": config.kafka.host,
"security.protocol": "ssl",
"ssl.ca.location": config.kafka.ca_pem,
"ssl.certificate.location": config.kafka.service_cert,
"ssl.key.location": config.kafka.service_key,
"default.topic.config": {"auto.offset.reset": "smallest"}
}
)
consumer.subscribe([topic])
# Now loop on the consumer to read messages
running = True
while running:
message = kafka_consumer.poll()
application_message = json.load(message.value.decode())
kafka_consumer.close()
現在我們能收到所有消息了。我並不是說kafka-python工具不好,我相信社區會對它的問題做出反應並解決。但從現在開始,我會一直堅持使用confluent-kafka。
開源治理
開源是強大的,但是涉及到復雜的「大數據」和NoSQL工具時,通常需要有一家大公司在背後推動工具的開發。這樣你就知道,如果那個公司可以使用工具,那麼該工具應該擁有很好的基本功能。它的出現可能是非正式的,就像某公司發布類似FOSS的項目一樣,但也可能是正式的,就像某公司為工具提供商業支持一樣。當然,從另一個角度來看,如果一家與開源社區作對的公司負責開發某個工具,你便失去了控制權。你的意見可能無關緊要,除非你是付費客戶。
理想情況是採取開源治理,就像Apache基金會一樣,還有就是增加可用的商業支持選項。這對互聯網上大部分的免費軟體來說根本不可能。限制自己只使用那些公司蓋章批准後的工具將非常限制你的自由。這對於一些商店可能是正確選擇,但對於我們不是。我喜歡工具測試,如果工具很小,而且只專心做一件事,我就會使用它。
信任開源
對於更大型的工具,以上決策評估過程更為復雜。通常,我會看一下提交問題和貢獻者的數量,以及最後一次commit的日期。我可能會問朋友某個工具的情況,有時也會在推特上問。當你進行嗅探檢查後從Github選擇了一個項目,即說明你信任社區可以產出好的工具。對於大多數工具來說,這是沒問題的。
但信任社區可能存在問題。對於某個特定的工具,可能並沒有充分的理由讓你信任社區可以產出好的軟體。社區在目標、經驗和開源項目的投入時間方面各不相同。選擇工具時保持審慎態度十分重要,不要讓理想蒙蔽了判斷。