分布式python編程
A. 如何用python寫一個分布式爬蟲
本文將會以PC端微博進行講解,因為移動端微博數據不如PC短全面,而且抓取和解析難度都會小一些。文章比較長,由於篇幅所限,文章並沒有列出所有代碼,只是講了大致流程和思路。
要抓微博數據,第一步便是模擬登陸,因為很多信息(比如用戶信息,用戶主頁微博數據翻頁等各種翻頁)都需要在登錄狀態下才能查看。關於模擬登陸進階,我寫過兩篇文章,一篇是模擬登陸微博的,是從小白的角度寫的。另外一篇是模擬登陸網路雲的,是從有一定經驗的熟手的角度寫的。讀了這兩篇文章,並且根據我寫的過程自己動手實現過的同學,應該對於模擬登陸PC端微博是沒有太大難度的。那兩篇文章沒有講如何處理驗證碼,這里我簡單說一下,做爬蟲的同學不要老想著用什麼機器學習的方法去識別復雜驗證碼,真的難度非常大,這應該也不是一個爬蟲工程師的工作重點,當然這只是我的個人建議。工程化的項目,我還是建議大家通過打碼平台來解決驗證碼的問題。我在分布式微博爬蟲中就是直接調用打碼平台的介面來做的大規模微博賬號的模擬登陸,效果還不錯,而且打碼成本很低。
說完模擬登陸(具體請參見我寫的那兩篇文章,篇幅所限,我就不過來了),我們現在正式進入微博的數據抓取。這里我會以微博用戶信息抓取為例來進行分析和講解。
關於用戶信息抓取,可能我們有兩個目的。一個是我們只想抓一些指定用戶,另外一個是我們想盡可能多的抓取更多數量的用戶的信息。我的目的假定是第二種。那麼我們該以什麼樣的策略來抓取,才能獲得盡可能多的用戶信息呢?如果我們初始用戶選擇有誤,選了一些不活躍的用戶,很可能會形成一個環,這樣就抓不了太多的數據。這里有一個很簡單的思路:我們把一些大V拿來做為種子用戶,我們先抓他們的個人信息,然後再抓大V所關注的用戶和粉絲,大V關注的用戶肯定也是類似大V的用戶,這樣的話,就不容易形成環了。
策略我們都清楚了。就該是分析和編碼了。
我們先來分析如何構造用戶信息的URL。這里我以微博名為一起神吐槽的博主為例進行分析。做爬蟲的話,一個很重要的意識就是爬蟲能抓的數據都是人能看到的數據,反過來,人能在瀏覽器上看到的數據,爬蟲幾乎都能抓。這里用的是幾乎,因為有的數據抓取難度特別。我們首先需要以正常人的流程看看怎麼獲取到用戶的信息。我們先進入該博主的主頁,如下圖
根據唯一性判斷
我們在頁面源碼中搜索,只發現一個script中有該字元串,那麼就是那段script是頁面相關信息。我們可以通過正則表達式把該script提取出來,然後把其中的html也提取出來,再保存到本地,看看信息是否全面。這里我就不截圖了。感覺還有很多要寫的,不然篇幅太長了。
另外,對於具體頁面的解析,我也不做太多的介紹了。太細的東西還是建議讀讀源碼。我只講一下,我覺得的一種處理異常的比較優雅的方式。微博爬蟲的話,主要是頁面樣式太多,如果你打算包含所有不同的用戶的模版,那麼我覺得幾乎不可能,不同用戶模版,用到的解析規則就不一樣。那麼出現解析異常如何處理?尤其是你沒有catch到的異常。很可能因為這個問題,程序就崩掉。其實對於Python這門語言來說,我們可以通過裝飾器來捕捉我們沒有考慮到的異常,比如我這個裝飾器
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def parse_decorator(return_type):
"""
:param return_type: 用於捕捉頁面解析的異常, 0表示返回數字0, 1表示返回空字元串, 2表示返回[],3表示返回False, 4表示返回{}, 5返回None
:return: 0,'',[],False,{},None
"""
def page_parse(func):
@wraps(func)
def handle_error(*keys):
try:
return func(*keys)
except Exception as e:
parser.error(e)
if return_type == 5:
return None
elif return_type == 4:
return {}
elif return_type == 3:
return False
elif return_type == 2:
return []
elif return_type == 1:
return ''
else:
return 0
return handle_error
return page_parse
上面的代碼就是處理解析頁面發生異常的情況,我們只能在數據的准確性、全面性和程序的健壯性之間做一些取捨。用裝飾器的話,程序中不用寫太多的try語句,代碼重復率也會減少很多。
頁面的解析由於篇幅所限,我就講到這里了。沒有涉及太具體的解析,其中一個還有一個比較難的點,就是數據的全面性,讀者可以去多觀察幾個微博用戶的個人信息,就會發現有的個人信息,有的用戶有填寫,有的並沒有。解析的時候要考慮完的話,建議從自己的微博的個人信息入手,看到底有哪些可以填。這樣可以保證幾乎不會漏掉一些重要的信息。
最後,我再切合本文的標題,講如何搭建一個分布式的微博爬蟲。開發過程中,我們可以先就做單機單線程的爬蟲,然後再改成使用celery的方式。這里這樣做是為了方便開發和測試,因為你單機搭起來並且跑得通了,那麼分布式的話,就很容易改了,因為celery的API使用本來就很簡潔。
我們抓取的是用戶信息和他的關注和粉絲uid。用戶信息的話,我們一個請求大概能抓取一個用戶的信息,而粉絲和關注我們一個請求可以抓取18個左右(因為這個抓的是列表),顯然可以發現用戶信息應該多佔一些請求的資源。這時候就該介紹理論篇沒有介紹的關於celery的一個高級特性了,它叫做任務路由。直白點說,它可以規定哪個分布式節點能做哪些任務,不能做哪些任務。它的存在可以讓資源分配更加合理,分布式微博爬蟲項目初期,就沒有使用任務路由,然後抓了十多萬條關注和分析,結果發現用戶信息抓幾萬條,這就是資源分配得不合理。那麼如何進行任務路由呢?
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# coding:utf-8
import os
from datetime import timedelta
from celery import Celery
from kombu import Exchange, Queue
from config.conf import get_broker_or_backend
from celery import platforms
# 允許celery以root身份啟動
platforms.C_FORCE_ROOT = True
worker_log_path = os.path.join(os.path.dirname(os.path.dirname(__file__))+'/logs', 'celery.log')
beat_log_path = os.path.join(os.path.dirname(os.path.dirname(__file__))+'/logs', 'beat.log')
tasks = ['tasks.login', 'tasks.user']
# include的作用就是注冊服務化函數
app = Celery('weibo_task', include=tasks, broker=get_broker_or_backend(1), backend=get_broker_or_backend(2))
app.conf.update(
CELERY_TIMEZONE='Asia/Shanghai',
CELERY_ENABLE_UTC=True,
CELERYD_LOG_FILE=worker_log_path,
CELERYBEAT_LOG_FILE=beat_log_path,
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
CELERY_QUEUES=(
Queue('login_queue', exchange=Exchange('login', type='direct'), routing_key='for_login'),
Queue('user_crawler', exchange=Exchange('user_info', type='direct'), routing_key='for_user_info'),
Queue('fans_followers', exchange=Exchange('fans_followers', type='direct'), routing_key='for_fans_followers'),
)
上述代碼我指定了有login_queue、user_crawler、fans_followers三個任務隊列。它們分別的作用是登錄、用戶信息抓取、粉絲和關注抓取。現在假設我有三台爬蟲伺服器A、B和C。我想讓我所有的賬號登錄任務分散到三台伺服器、讓用戶抓取在A和B上執行,讓粉絲和關注抓取在C上執行,那麼啟動A、B、C三個伺服器的celery worker的命令就分別是
Python
1
2
3
celery -A tasks.workers -Q login_queue,user_crawler worker -l info -c 1 # A伺服器和B伺服器啟動worker的命令,它們只會執行登錄和用戶信息抓取任務
celery -A tasks.workers -Q login_queue,fans_followers worker -l info -c 1 # C伺服器啟動worker的命令,它只會執行登錄、粉絲和關注抓取任務
然後我們通過命令行或者代碼(如下)就能發送所有任務給各個節點執行了
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
4
B. Python分布式進程中你會遇到的坑
寫在前面
小驚大怪
你是不是在用Python3或者在windows系統上編程?最重要的是你對進程和線程不是很清楚?那麼恭喜你,在python分布式進程中,會有坑等著你去挖。。。(hahahaha,此處允許我嚇唬一下你)開玩笑的啦,不過,如果你知道序列中不支持匿名函數,那這個坑就和你say byebye了。好了話不多數,直接進入正題。
分布式進程
正如大家所知道的Process比Thread更穩定,而且Process可以分布到多台機器上,而Thread最多隻能分布到同一陸坦咐台機器的多個CPU上。Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多台機器上。一個服務進程可以作為調度者,將任務分布到其他多個進程中,依靠網路通信。由於managers模塊封裝很好,不必了解網路通信的細節,就可以很容易地編寫分布式多進程程序。
代碼記錄
舉個例子
如果我們已經有一個通過Queue通信的多進程程序在同一台機器上運行,現在,由於處理任務的進程任務繁重,希望把發送任務的進程和處理任務的進程分布到兩台機器上,這應該怎麼用分布式進程來實現呢?你已經知道了原有的Queue可以繼續使用,而且通過managers模塊把Queue通過網路暴露出去,就可以讓其他機器的進程來訪問Queue了。好,那我們就這么干!
寫個task_master.py
我們先看服務進程。服務進程負責啟動Queue,把Queue注冊到網路上,然後往Queue裡面寫入任務。
請注意,當我們在一台機器上寫多進程程序時,創建的Queue可以直接拿來用,但是,在分布式多進程環境下,添加任務到Queue不可以直接對原始的task_queue進行操作,那樣就繞過了QueueManager的封裝,必須通過manager.get_task_queue()獲得的Queue介面添加。然後,在另一台機器上啟動任務進程(本機上啟動也可以)
寫個task_worker.py
任務進程要通過網路連接到服務進程,所以要指定服務進程的IP。
運行結果
現在,可信沒以試試分布式進程的工作效果了。先啟動task_master.py服務進程:
task_master.py進程發送完任務後,開始等待result隊列的結果。現在啟動task_worker.py進程:
看到沒,結果都出錯了,我們好好分析一下到底哪出錯了。。。
錯誤分析
在task_master.py的報錯提示中,我們知道它說lambda錯誤,這是因為序列化不支持匿名函數,所以我們得修改代碼,重新對queue用QueueManager進行封裝放到網路中。
其中task_queue和result_queue是兩個隊列,分別存放任務和結果。它們用來進行進程間通信,交換對象。
因為是分布式的環境,放入queue中的數據需要等待Workers機器運算處理後再進行讀取,這樣就需要對queue用QueueManager進行封裝放到網路中,這是通過上面的2行代碼來實現的。我們給return_task_queue的網路調用介面取了一個名早純get_task_queue,而return_result_queue的名字是get_result_queue,方便區分對哪個queue進行操作。task.put(n)即是對task_queue進行寫入數據,相當於分配任務。而result.get()即是等待workers機器處理後返回的結果。
值得注意 在windows系統中你必須要寫IP地址,而其他操作系統比如linux操作系統則就不要了。
修改後的代碼
在task_master.py中修改如下:
在task_worker.py中修改如下:
先運行task_master.py,然後再運行task_worker.py
(1)task_master.py運行結果如下
(2)task_worker.py運行結果如下
知識補充
這個簡單的Master/Worker模型有什麼用?其實這就是一個簡單但真正的分布式計算,把代碼稍加改造,啟動多個worker,就可以把任務分布到幾台甚至幾十台機器上,比如把計算n*n的代碼換成發送郵件,就實現了郵件隊列的非同步發送。
Queue對象存儲在哪?注意到task_worker.py中根本沒有創建Queue的代碼,所以,Queue對象存儲在task_master.py進程中:
而Queue之所以能通過網路訪問,就是通過QueueManager實現的。由於QueueManager管理的不止一個Queue,所以,要給每個Queue的網路調用介面起個名字,比如get_task_queue。task_worker這里的QueueManager注冊的名字必須和task_manager中的一樣。對比上面的例子,可以看出Queue對象從另一個進程通過網路傳遞了過來。只不過這里的傳遞和網路通信由QueueManager完成。
authkey有什麼用?這是為了保證兩台機器正常通信,不被其他機器惡意干擾。如果task_worker.py的authkey和task_master.py的authkey不一致,肯定連接不上。
C. 如何用 Python 構建一個簡單的分布式系統
分布式爬蟲概覽
何謂分布式爬蟲?
通俗的講,分布式爬蟲就是多台機器多個
spider
對多個
url
的同時處理問題,分布式的方式可以極大提高程序的抓取效率。
構建分布式爬蟲通暢需要考慮的問題
(1)如何能保證多台機器同時抓取同一個URL?
(2)如果某個節點掛掉,會不會影響其它節點,任務如何繼續?
(3)既然是分布式,如何保證架構的可伸縮性和可擴展性?不同優先順序的抓取任務如何進行資源分配和調度?
基於上述問題,我選擇使用celery作為分布式任務調度工具,是分布式爬蟲中任務和資源調度的核心模塊。它會把所有任務都通過消息隊列發送給各個分布式節點進行執行,所以可以很好的保證url不會被重復抓取;它在檢測到worker掛掉的情況下,會嘗試向其他的worker重新發送這個任務信息,這樣第二個問題也可以得到解決;celery自帶任務路由,我們可以根據實際情況在不同的節點上運行不同的抓取任務(在實戰篇我會講到)。本文主要就是帶大家了解一下celery的方方面面(有celery相關經驗的同學和大牛可以直接跳過了)
Celery知識儲備
celery基礎講解
按celery官網的介紹來說
Celery
是一個簡單、靈活且可靠的,處理大量消息的分布式系統,並且提供維護這樣一個系統的必需工具。它是一個專注於實時處理的任務隊列,同時也支持任務調度。
下面幾個關於celery的核心知識點
broker:翻譯過來叫做中間人。它是一個消息傳輸的中間件,可以理解為一個郵箱。每當應用程序調用celery的非同步任務的時候,會向broker傳遞消息,而後celery的worker將會取到消息,執行相應程序。這其實就是消費者和生產者之間的橋梁。
backend:
通常程序發送的消息,發完就完了,可能都不知道對方時候接受了。為此,celery實現了一個backend,用於存儲這些消息以及celery執行的一些消息和結果。
worker:
Celery類的實例,作用就是執行各種任務。注意在celery3.1.25後windows是不支持celery
worker的!
procer:
發送任務,將其傳遞給broker
beat:
celery實現的定時任務。可以將其理解為一個procer,因為它也是通過網路調用定時將任務發送給worker執行。注意在windows上celery是不支持定時任務的!
下面是關於celery的架構示意圖,結合上面文字的話應該會更好理解
由於celery只是任務隊列,而不是真正意義上的消息隊列,它自身不具有存儲數據的功能,所以broker和backend需要通過第三方工具來存儲信息,celery官方推薦的是
RabbitMQ和Redis,另外mongodb等也可以作為broker或者backend,可能不會很穩定,我們這里選擇Redis作為broker兼backend。
實際例子
先安裝celery
pip
install
celery
我們以官網給出的例子來做說明,並對其進行擴展。首先在項目根目錄下,這里我新建一個項目叫做celerystudy,然後切換到該項目目錄下,新建文件tasks.py,然後在其中輸入下面代碼
這里我詳細講一下代碼:我們先通過app=Celery()來實例化一個celery對象,在這個過程中,我們指定了它的broker,是redis的db
2,也指定了它的backend,是redis的db3,
broker和backend的連接形式大概是這樣
redis://:password@hostname:port/db_number
然後定義了一個add函數,重點是@app.task,它的作用在我看來就是將add()
注冊為一個類似服務的東西,本來只能通過本地調用的函數被它裝飾後,就可以通過網路來調用。這個tasks.py中的app就是一個worker。它可以有很多任務,比如這里的任務函數add。我們再通過在命令行切換到項目根目錄,執行
celery
-A
tasks
worker
-l
info
啟動成功後就是下圖所示的樣子
這里我說一下各個參數的意思,-A指定的是app(即Celery實例)所在的文件模塊,我們的app是放在tasks.py中,所以這里是
tasks;worker表示當前以worker的方式運行,難道還有別的方式?對的,比如運行定時任務就不用指定worker這個關鍵字;
-l
info表示該worker節點的日誌等級是info,更多關於啟動worker的參數(比如-c、-Q等常用的)請使用
celery
worker
--help
進行查看
將worker啟動起來後,我們就可以通過網路來調用add函數了。我們在後面的分布式爬蟲構建中也是採用這種方式分發和消費url的。在命令行先切換到項目根目錄,然後打開python交互端
from
tasks
import
addrs
=
add.delay(2,
2)
這里的add.delay就是通過網路調用將任務發送給add所在的worker執行,這個時候我們可以在worker的界面看到接收的任務和計算的結果。
這里是非同步調用,如果我們需要返回的結果,那麼要等rs的ready狀態true才行。這里add看不出效果,不過試想一下,如果我們是調用的比較占時間的io任務,那麼非同步任務就比較有價值了
上面講的是從Python交互終端中調用add函數,如果我們要從另外一個py文件調用呢?除了通過import然後add.delay()這種方式,我們還可以通過send_task()這種方式,我們在項目根目錄另外新建一個py文件叫做
excute_tasks.py,在其中寫下如下的代碼
from
tasks
import
addif
__name__
==
'__main__':
add.delay(5,
10)
這時候可以在celery的worker界面看到執行的結果
此外,我們還可以通過send_task()來調用,將excute_tasks.py改成這樣
這種方式也是可以的。send_task()還可能接收到為注冊(即通過@app.task裝飾)的任務,這個時候worker會忽略這個消息
定時任務
上面部分講了怎麼啟動worker和調用worker的相關函數,這里再講一下celery的定時任務。
爬蟲由於其特殊性,可能需要定時做增量抓取,也可能需要定時做模擬登陸,以防止cookie過期,而celery恰恰就實現了定時任務的功能。在上述基礎上,我們將tasks.py文件改成如下內容
然後先通過ctrl+c停掉前一個worker,因為我們代碼改了,需要重啟worker才會生效。我們再次以celery
-A
tasks
worker
-l
info這個命令開啟worker。
這個時候我們只是開啟了worker,如果要讓worker執行任務,那麼還需要通過beat給它定時發送,我們再開一個命令行,切換到項目根目錄,通過
這樣就表示定時任務已經開始運行了。
眼尖的同學可能看到我這里celery的版本是3.1.25,這是因為celery支持的windows最高版本是3.1.25。由於我的分布式微博爬蟲的worker也同時部署在了windows上,所以我選擇了使用
3.1.25。如果全是linux系統,建議使用celery4。
此外,還有一點需要注意,在celery4後,定時任務(通過schele調度的會這樣,通過crontab調度的會馬上執行)會在當前時間再過定時間隔執行第一次任務,比如我這里設置的是60秒的間隔,那麼第一次執行add會在我們通過celery
beat
-A
tasks
-l
info啟動定時任務後60秒才執行;celery3.1.25則會馬上執行該任務
D. Python主要內容學的是什麼
第一步:Python開發基礎
Python全棧開發與人工智慧之Python開發基礎知識學習內容包括:Python基礎語法、數據類型、字元編碼、文件操作、函數、裝飾器、迭代器、內置方法、常用模塊等。
第二步:Python高級編程和資料庫開發
Python全棧開發與人工智慧之Python高級編程和資料庫開發知識學習內容包括:面向對象開發、Socket網路編程、線程、進程、隊列、IO多路模型、Mysql資料庫開發等。
第三步:前端開發
Python全棧開發與人工智慧之前端開發知識學習內容包括:Html、CSS、JavaScript開發、Jquery&bootstrap開發、前端框架VUE開發等。
第十步:高並發語言GO開發
Python全棧開發與人工智慧之高並發語言GO開發學習內容包括:GO語言基礎、數據類型與文件IO操作、函數和面向對象、並發編程等。
E. 哪些分布式文件系統是由Python編寫的呢
我知道分布式文件系統完全用Python 寫的只有openstack 的swift。
其他還有一些不知名的分布式文件系統用python 寫的如:
NCFS(基於多個雲存儲的分布式文件系統)
一般考慮性能都不會採用python 作為分布式文件系統的開發語言
F. python面試之分布式
主要用於分散壓力,所以分布式的服務都是部署在不同的伺服器上的,再將服務做集群
根據「分層」的思想進行拆分。
例如,可以將一個項目根據「三層架構」 拆分
然後再分開部署 :
根據業務進行拆分。
例如,可以根據業務邏輯,將「電商項目」拆分成 「訂單項目」、「用戶項目」和「秒殺項目」 。顯然這三個拆分後的項目,仍然可以作為獨立的項目使用。像這種拆分的方法,就成為垂直拆分
主要用於分散能力,主要是將服務的顆粒度盡量細化,且自成一脈,壓力這塊並不是其關注的點,所以多個微服務是可以部署在同一台伺服器上的
微服務可以理解為一種 非常細粒度的垂直拆分 。例如,以上「訂單項目」本來就是垂直拆分後的子項目,但實際上「訂單項目」還能進一步鬧蔽拆分為「購物項目」、「結算項目」和「售後項目」,如圖
現在看圖中的「訂單項目」,它完全可以作為一個分布式項目的組成元素,但就不適合作為微服務的組成元素了(因為它還能再拆,而微服務應該是不能再拆的「微小」服務,類似於「原子性」)
分布式服務需要提供給別的分布式服務去調用,單獨拆出來 未必外部可用
微服務自成一脈,可以系統內部調用,也可以單獨提供服務
為什麼需要用分布式鎖,見下圖
變數A存在三個伺服器內存中(這個變數A主要體現是在一個類中的一個成員變數,是一個有狀態的對象),如果不加任何控制的話,變數A同時都會在分配一塊內存,三個請求發過來同時對這個變數操作,顯然結果是不對的!即使不是同時發過來,三個請求分別操作三個不同內存區域的數據,變數A之間不存在共享,也不具有可見性,處理的結果也是不對的。
分布式鎖應該具備哪些條件:
1、在分布式系統環境下,一個方法在同一時間只能被一個機器的一個線程執行;
2、高可用的獲取鎖與釋液姿州放鎖;
3、高性能的獲取鎖與釋放鎖;
4、具備可重入特性;
5、具備鎖失效機制,防止死鎖;
6、具備非阻塞鎖特性,即沒有獲取到鎖將直接返回獲取鎖失敗
Redis性能高
命令簡單,實現方便
使用setnx加鎖,key為鎖名,value隨意不重復就行(一般用uuid)
給鎖添加expire時間,超過該時間redis過期(即自動釋放鎖)
設置獲取鎖的超時時間,若超過時間,則放棄獲取鎖
通過鎖名獲取鎖值
比較鎖值和當前uuid是否一致,一致則釋放鎖(通過delete命令刪除redis鍵值對)
2PC:two phase commit protocol,二階段提交協議,是一種強一致性設計。
同步阻塞(導致長久的資源鎖定) ,只有第一階段全部正常完成(返回失敗,回字返回超時都會返回 「准備失敗」 ),才會進入第二階段
因為協調者可冊碼能會在任意一個時間點(發送准備命令之前,發送准備命令之後,發送回滾事務命令之前,發送回滾事務命令之後,發送提交事務命令之前,發送提交事務命令之後)故障,導致資源阻塞。
T:try,指的是預留,即資源的預留和鎖定,注意是預留
C:confirm,指的是確認操作,這一步其實就是真正的執行了
C:cancel,指的是撤銷操作,可以理解為把預留階段的動作撤銷了
從思想上看和 2PC 差不多,都是先試探性的執行,如果都可以那就真正的執行,如果不行就回滾。
適用於對實時性要求沒那麼高的業務場景,如:簡訊通知
G. Python中的多進程與多線程/分布式該如何使用
Python提供了非常好用的多進程包multiprocessing,你只需要定義一個函數,Python會替你完成其他所有事情。
藉助這個包,可以輕松完成從單進程到並發執行的轉換。
1、新建單一進程
如果我們新建少量進程,可以如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
p = multiprocessing.Process(target=func, args=("hello", ))
p.start()
p.join()
print "Sub-process done."12345678910111213
2、使用進程池
是的,你沒有看錯,不是線程池。它可以讓你跑滿多核CPU,而且使用方法非常簡單。
注意要用apply_async,如果落下async,就變成阻塞版本了。
processes=4是最多並發進程數量。
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
for i in xrange(10):
msg = "hello %d" %(i)
pool.apply_async(func, (msg, ))
pool.close()
pool.join()
print "Sub-process(es) done."12345678910111213141516
3、使用Pool,並需要關注結果
更多的時候,我們不僅需要多進程執行,還需要關注每個進程的執行結果,如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
return "done " + msg
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
result = []
for i in xrange(10):
msg = "hello %d" %(i)
result.append(pool.apply_async(func, (msg, )))
pool.close()
pool.join()
for res in result:
print res.get()
print "Sub-process(es) done."
2014.12.25更新
根據網友評論中的反饋,在Windows下運行有可能崩潰(開啟了一大堆新窗口、進程),可以通過如下調用來解決:
multiprocessing.freeze_support()1
附錄(自己的腳本):
#!/usr/bin/python
import threading
import subprocess
import datetime
import multiprocessing
def dd_test(round, th):
test_file_arg = 'of=/zbkc/test_mds_crash/1m_%s_%s_{}' %(round, th)
command = "seq 100 | xargs -i dd if=/dev/zero %s bs=1M count=1" %test_file_arg
print command
subprocess.call(command,shell=True,stdout=open('/dev/null','w'),stderr=subprocess.STDOUT)
def mds_stat(round):
p = subprocess.Popen("zbkc mds stat", shell = True, stdout = subprocess.PIPE)
out = p.stdout.readlines()
if out[0].find('active') != -1:
command = "echo '0205pm %s round mds status OK, %s' >> /round_record" %(round, datetime.datetime.now())
command_2 = "time (ls /zbkc/test_mds_crash/) 2>>/round_record"
command_3 = "ls /zbkc/test_mds_crash | wc -l >> /round_record"
subprocess.call(command,shell=True)
subprocess.call(command_2,shell=True)
subprocess.call(command_3,shell=True)
return 1
else:
command = "echo '0205 %s round mds status abnormal, %s, %s' >> /round_record" %(round, out[0], datetime.datetime.now())
subprocess.call(command,shell=True)
return 0
#threads = []
for round in range(1, 1600):
pool = multiprocessing.Pool(processes = 10) #使用進程池
for th in range(10):
# th_name = "thread-" + str(th)
# threads.append(th_name) #添加線程到線程列表
# threading.Thread(target = dd_test, args = (round, th), name = th_name).start() #創建多線程任務
pool.apply_async(dd_test, (round, th))
pool.close()
pool.join()
#等待線程完成
# for t in threads:
# t.join()
if mds_stat(round) == 0:
subprocess.call("zbkc -s",shell=True)
break
H. 如何用 python 構建一個簡單的分布式系統
從GitHub中整理出的15個最受歡迎的Python開源框架。這些框架包括事件I/O,OLAP,Web開發,高性能網路通信,測試,爬蟲等。
Django: Python Web應用開發框架
Django 應該是最出名的Python框架,GAE甚至Erlang都有框架受它影響。Django是走大而全的方向,它最出名的是其全自動化的管理後台:只需要使用起ORM,做簡單的對象定義,它就能自動生成資料庫結構、以及全功能的管理後台。
Diesel:基於Greenlet的事件I/O框架
Diesel提供一個整潔的API來編寫網路客戶端和伺服器。支持TCP和UDP。
Flask:一個用Python編寫的輕量級Web應用框架
Flask是一個使用Python編寫的輕量級Web應用框架。基於Werkzeug WSGI工具箱和Jinja2
模板引擎。Flask也被稱為「microframework」,因為它使用簡單的核心,用extension增加其他功能。Flask沒有默認使用的數
據庫、窗體驗證工具。
Cubes:輕量級Python OLAP框架
Cubes是一個輕量級Python框架,包含OLAP、多維數據分析和瀏覽聚合數據(aggregated data)等工具。
Kartograph.py:創造矢量地圖的輕量級Python框架
Kartograph是一個Python庫,用來為ESRI生成SVG地圖。Kartograph.py目前仍處於beta階段,你可以在virtualenv環境下來測試。
Pulsar:Python的事件驅動並發框架
Pulsar是一個事件驅動的並發框架,有了pulsar,你可以寫出在不同進程或線程中運行一個或多個活動的非同步伺服器。
Web2py:全棧式Web框架
Web2py是一個為Python語言提供的全功能Web應用框架,旨在敏捷快速的開發Web應用,具有快速、安全以及可移植的資料庫驅動的應用,兼容Google App Engine。
Falcon:構建雲API和網路應用後端的高性能Python框架
Falcon是一個構建雲API的高性能Python框架,它鼓勵使用REST架構風格,盡可能以最少的力氣做最多的事情。
Dpark:Python版的Spark
DPark是Spark的Python克隆,是一個Python實現的分布式計算框架,可以非常方便地實現大規模數據處理和迭代計算。DPark由豆瓣實現,目前豆瓣內部的絕大多數數據分析都使用DPark完成,正日趨完善。
Buildbot:基於Python的持續集成測試框架
Buildbot是一個開源框架,可以自動化軟體構建、測試和發布等過程。每當代碼有改變,伺服器要求不同平台上的客戶端立即進行代碼構建和測試,收集並報告不同平台的構建和測試結果。
Zerorpc:基於ZeroMQ的高性能分布式RPC框架
Zerorpc是一個基於ZeroMQ和MessagePack開發的遠程過程調用協議(RPC)實現。和 Zerorpc 一起使用的 Service API 被稱為 zeroservice。Zerorpc 可以通過編程或命令行方式調用。
Bottle: 微型Python Web框架
Bottle是一個簡單高效的遵循WSGI的微型python Web框架。說微型,是因為它只有一個文件,除Python標准庫外,它不依賴於任何第三方模塊。
Tornado:非同步非阻塞IO的Python Web框架
Tornado的全稱是Torado Web Server,從名字上看就可知道它可以用作Web伺服器,但同時它也是一個Python Web的開發框架。最初是在FriendFeed公司的網站上使用,FaceBook收購了之後便開源了出來。
webpy: 輕量級的Python Web框架
webpy的設計理念力求精簡(Keep it simple and powerful),源碼很簡短,只提供一個框架所必須的東西,不依賴大量的第三方模塊,它沒有URL路由、沒有模板也沒有資料庫的訪問。
Scrapy:Python的爬蟲框架
Scrapy是一個使用Python編寫的,輕量級的,簡單輕巧,並且使用起來非常的方便。
I. 電腦可以實現分布式爬蟲(python編寫)嗎
一般是用redis做消息列隊,將所有要抓取的慶迅url放到redis裡面,然後在分布式的各個譽襲此機器上面禪虧讀取redis裡面的url實行抓取
J. python分布式框架有哪些
Dask是Python的分布態伏斗式計算框架,它支持分布式的DataFrame,也就是pandas的DataFrame,二者介面完美兼容,但Dask是分布式計算的框架,可以支持內存無法裝載的數據,進行計算,它也支持對一般的python程序進行分布式帆磨計算。是非常優秀的Python框架。本文主要介紹Dask的幾種不同的調度器的使用。
Dask支持多種調度器,從單線程、多線程、多進程到本地分布式和集群分布式,各種調度器在不同情況下有不同的作用,本文來源於Dask官方文檔的翻譯,主要向大家介紹這五種調度廳鬧器的使用情景和方式。最後提供了如何在不同情境下設置Dask調度器的方法。