python任務隊列
A. python多進程為什麼一定要
前面講了為什麼Python里推薦用多進程而不是多線程,但是多進程也有其自己的限制:相比線程更加笨重、切換耗時更長,並且在python的多進程下,進程數量不推薦超過CPU核心數(一個進程只有一個GIL,所以一個進程只能跑滿一個CPU),因為一個進程佔用一個CPU時能充分利用機器的性能,但是進程多了就會出現頻繁的進程切換,反而得不償失。
不過特殊情況(特指IO密集型任務)下,多線程是比多進程好用的。
舉個例子:給你200W條url,需要你把每個url對應的頁面抓取保存起來,這種時候,單單使用多進程,效果肯定是很差的。為什麼呢?
例如每次請求的等待時間是2秒,那麼如下(忽略cpu計算時間):
1、單進程+單線程:需要2秒*200W=400W秒==1111.11個小時==46.3天,這個速度明顯是不能接受的2、單進程+多線程:例如我們在這個進程中開了10個多線程,比1中能夠提升10倍速度,也就是大約4.63天能夠完成200W條抓取,請注意,這里的實際執行是:線程1遇見了阻塞,CPU切換到線程2去執行,遇見阻塞又切換到線程3等等,10個線程都阻塞後,這個進程就阻塞了,而直到某個線程阻塞完成後,這個進程才能繼續執行,所以速度上提升大約能到10倍(這里忽略了線程切換帶來的開銷,實際上的提升應該是不能達到10倍的),但是需要考慮的是線程的切換也是有開銷的,所以不能無限的啟動多線程(開200W個線程肯定是不靠譜的)3、多進程+多線程:這里就厲害了,一般來說也有很多人用這個方法,多進程下,每個進程都能佔一個cpu,而多線程從一定程度上繞過了阻塞的等待,所以比單進程下的多線程又更好使了,例如我們開10個進程,每個進程里開20W個線程,執行的速度理論上是比單進程開200W個線程快10倍以上的(為什麼是10倍以上而不是10倍,主要是cpu切換200W個線程的消耗肯定比切換20W個進程大得多,考慮到這部分開銷,所以是10倍以上)。
還有更好的方法嗎?答案是肯定的,它就是:
4、協程,使用它之前我們先講講what/why/how(它是什麼/為什麼用它/怎麼使用它)what:
協程是一種用戶級的輕量級線程。協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:
協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。
在並發編程中,協程與線程類似,每個協程表示一個執行單元,有自己的本地數據,與其它協程共享全局數據和其它資源。
why:
目前主流語言基本上都選擇了多線程作為並發設施,與線程相關的概念是搶占式多任務(Preemptive multitasking),而與協程相關的是協作式多任務。
不管是進程還是線程,每次阻塞、切換都需要陷入系統調用(system call),先讓CPU跑操作系統的調度程序,然後再由調度程序決定該跑哪一個進程(線程)。
而且由於搶占式調度執行順序無法確定的特點,使用線程時需要非常小心地處理同步問題,而協程完全不存在這個問題(事件驅動和非同步程序也有同樣的優點)。
因為協程是用戶自己來編寫調度邏輯的,對CPU來說,協程其實是單線程,所以CPU不用去考慮怎麼調度、切換上下文,這就省去了CPU的切換開銷,所以協程在一定程度上又好於多線程。
how:
python裡面怎麼使用協程?答案是使用gevent,使用方法:看這里使用協程,可以不受線程開銷的限制,我嘗試過一次把20W條url放在單進程的協程里執行,完全沒問題。
所以最推薦的方法,是多進程+協程(可以看作是每個進程里都是單線程,而這個單線程是協程化的)多進程+協程下,避開了CPU切換的開銷,又能把多個CPU充分利用起來,這種方式對於數據量較大的爬蟲還有文件讀寫之類的效率提升是巨大的。
小例子:
#-*- coding=utf-8 -*-
import requests
from multiprocessing import Process
import gevent
from gevent import monkey; monkey.patch_all()import sys
reload(sys)
sys.setdefaultencoding('utf8')
def fetch(url):
try:
s = requests.Session()
r = s.get(url,timeout=1)#在這里抓取頁面
except Exception,e:
print e
return ''
def process_start(tasks):
gevent.joinall(tasks)#使用協程來執行
def task_start(filepath,flag = 100000):#每10W條url啟動一個進程with open(filepath,'r') as reader:#從給定的文件中讀取urlurl = reader.readline().strip()
task_list = []#這個list用於存放協程任務
i = 0 #計數器,記錄添加了多少個url到協程隊列while url!='':
i += 1
task_list.append(gevent.spawn(fetch,url,queue))#每次讀取出url,將任務添加到協程隊列if i == flag:#一定數量的url就啟動一個進程並執行p = Process(target=process_start,args=(task_list,))p.start()
task_list = [] #重置協程隊列
i = 0 #重置計數器
url = reader.readline().strip()
if task_list not []:#若退出循環後任務隊列里還有url剩餘p = Process(target=process_start,args=(task_list,))#把剩餘的url全都放到最後這個進程來執行p.start()
if __name__ == '__main__':
task_start('./testData.txt')#讀取指定文件細心的同學會發現:上面的例子中隱藏了一個問題:進程的數量會隨著url數量的增加而不斷增加,我們在這里不使用進程池multiprocessing.Pool來控制進程數量的原因是multiprocessing.Pool和gevent有沖突不能同時使用,但是有興趣的同學可以研究一下gevent.pool這個協程池。
另外還有一個問題:每個進程處理的url是累積的而不是獨立的,例如第一個進程會處理10W個,第二個進程會變成20W個,以此類推。最後定位到問題是gevent.joinall()導致的問題,有興趣的同學可以研究一下為什麼會這樣。不過這個問題的處理方案是:主進程只負責讀取url然後寫入到list中,在創建子進程的時候直接把list傳給子進程,由子進程自己去構建協程。這樣就不會出現累加的問題
B. Python 非同步任務隊列Celery 使用
在 Python 中定義 Celery 的時候,我們要引入 Broker,中文翻譯過來就是「中間人」的意思。在工頭(生產者)提出任務的時候,把所有的任務放到 Broker 裡面,在 Broker 的另外一頭,一群碼農(消費者)等著取出一個個任務准備著手做。這種模式註定了整個系統會是個開環系統,工頭對於碼農們把任務做的怎樣是不知情的。所以我們要引入 Backend 來保存每次任務的結果。這個 Backend 也是存儲任務的信息用的,只不過這里存的是那些任務的返回結果。我們可以選擇只讓錯誤執行的任務返回結果到 Backend,這樣我們取回結果,便可以知道有多少任務執行失敗了。
其實現架構如下圖所示:
可以看到,Celery 主要包含以下幾個模塊:
celery可以通過pip自動安裝。
broker 可選擇使用RabbitMQ/redis,backend可選擇使用RabbitMQ/redis/MongoDB。RabbitMQ/redis/mongoDB的安裝請參考對應的官方文檔。
------------------------------rabbitmq相關----------------------------------------------------------
官網安裝方法: http://www.rabbitmq.com/install-windows.html
啟動管理插件:sbin/rabbitmq-plugins enable rabbitmq_management 啟動rabbitmq:sbin/rabbitmq-server -detached
rabbitmq已經啟動,可以打開頁面來看看 地址: http://localhost:15672/#/
用戶名密碼都是guest 。進入可以看到具體頁面。 關於rabbitmq的配置,網上很多 自己去搜以下就ok了。
------------------------------rabbitmq相關--------------------------------------------------------
項目結構如下:
使用前,需要三個方面:celery配置,celery實例,需執行的任務函數,如下:
Celery 的配置比較多,可以在 官方配置文檔: http://docs.celeryproject.org/en/latest/userguide/configuration.html 查詢每個配置項的含義。
當然,要保證上述非同步任務and下述定時任務都能正常執行,就需要先啟動celery worker,啟動命令行如下:
需 啟動beat ,執行定時任務時, Celery會通過celery beat進程來完成。Celery beat會保持運行, 一旦到了某一定時任務需要執行時, Celery beat便將其加入到queue中. 不像worker進程, Celery beat只需要一個即可。而且為了避免有重復的任務被發送出去,所以Celery beat僅能有一個。
命令行啟動:
如果你想將celery worker/beat要放到後台運行,推薦可以扔給supervisor。
supervisor.conf如下:
C. Python Queue 入門
Queue 叫隊列,是數據結構中的一種,基本上所有成熟的編程語言都內置了對 Queue 的支持。
Python 中的 Queue 模塊實現了多生產者和多消費者模型,當需要在多線程編程中非常實用。而且該模塊中的 Queue 類實現了鎖原語,不需要再考慮多線程安全問題。
該模塊內置了三種類型的 Queue,分別是 class queue.Queue(maxsize=0) , class queue.LifoQueue(maxsize=0) 和 class queue.PriorityQueue(maxsize=0) 。它們三個的區別僅僅是取出時的順序不一致而已。
Queue 是一個 FIFO 隊列,任務按照添加的順序被取出。
LifoQueue 是一個 LIFO 隊列,類似堆棧,後添加的任務先被取出。
PriorityQueue 是一個優先順序隊列,隊列裡面的任務按照優先順序排序,優先順序高的先被取出。
如你所見,就是上面所說的三種不同類型的內置隊列,其中 maxsize 是個整數,用於設置可以放入隊列中的任務數的上限。當達到這個大小的時候,插入操作將阻塞至隊列中的任務被消費掉。如果 maxsize 小於等於零,則隊列尺寸為無限大。
向隊列中添加任務,直接調用 put() 函數即可
put() 函數完整的函數簽名如下 Queue.put(item, block=True, timeout=None) ,如你所見,該函數有兩個可選參數。
默認情況下,在隊列滿時,該函數會一直阻塞,直到隊列中有空餘的位置可以添加任務為止。如果 timeout 是正數,則最多阻塞 timeout 秒,如果這段時間內還沒有空餘的位置出來,則會引發 Full 異常。
當 block 為 false 時,timeout 參數將失效。同時如果隊列中沒有空餘的位置可添加任務則會引發 Full 異常,否則會直接把任務放入隊列並返回,不會阻塞。
另外,還可以通過 Queue.put_nowait(item) 來添加任務,相當於 Queue.put(item, False) ,不再贅述。同樣,在隊列滿時,該操作會引發 Full 異常。
從隊列中獲取任務,直接調用 get() 函數即可。
與 put() 函數一樣, get() 函數也有兩個可選參數,完整簽名如下 Queue.get(block=True, timeout=None) 。
默認情況下,當隊列空時調用該函數會一直阻塞,直到隊列中有任務可獲取為止。如果 timeout 是正數,則最多阻塞 timeout 秒,如果這段時間內還沒有任務可獲取,則會引發 Empty 異常。
當 block 為 false 時,timeout 參數將失效。同時如果隊列中沒有任務可獲取則會立刻引發 Empty 異常,否則會直接獲取一個任務並返回,不會阻塞。
另外,還可以通過 Queue.get_nowait() 來獲取任務,相當於 Queue.get(False) ,不再贅述。同樣,在隊列為空時,該操作會引發 Empty 異常。
Queue.qsize() 函數返回隊列的大小。注意這個大小不是精確的,qsize() > 0 不保證後續的 get() 不被阻塞,同樣 qsize() < maxsize 也不保證 put() 不被阻塞。
如果隊列為空,返回 True ,否則返回 False 。如果 empty() 返回 True ,不保證後續調用的 put() 不被阻塞。類似的,如果 empty() 返回 False ,也不保證後續調用的 get() 不被阻塞。
如果隊列是滿的返回 True ,否則返回 False 。如果 full() 返回 True 不保證後續調用的 get() 不被阻塞。類似的,如果 full() 返回 False 也不保證後續調用的 put() 不被阻塞。
queue.Queue() 是 FIFO 隊列,出隊順序跟入隊順序是一致的。
queue.LifoQueue() 是 LIFO 隊列,出隊順序跟入隊順序是完全相反的,類似於棧。
優先順序隊列中的任務順序跟放入時的順序是無關的,而是按照任務的大小來排序,最小值先被取出。那任務比較大小的規則是怎麼樣的呢。
注意,因為列表的比較對規則是按照下標順序來比較的,所以在沒有比較出大小之前 ,隊列中所有列表對應下標位置的元素類型要一致。
好比 [2,1] 和 ["1","b"] 因為第一個位置的元素類型不一樣,所以是沒有辦法比較大小的,所以也就放入不了優先順序隊列。
然而對於 [2,1] 和 [1,"b"] 來說即使第二個元素的類型不一致也是可以放入優先順序隊列的,因為只需要比較第一個位置元素的大小就可以比較出結果了,就不需要比較第二個位置元素的大小了。
但是對於 [2,1] 和 1 [2,"b"] 來說,則同樣不可以放入優先順序隊列,因為需要比較第二個位置的元素才可以比較出結果,然而第二個位置的元素類型是不一致的,無法比較大小。
綜上,也就是說, 直到在比較出結果之前,對應下標位置的元素類型都是需要一致的 。
下面我們自定義一個動物類型,希望按照年齡大小來做優先順序排序。年齡越小優先順序越高。
本章節介紹了隊列以及其常用操作。因為隊列默認實現了鎖原語,因此在多線程編程中就不需要再考慮多線程安全問題了,對於程序員來說相當友好了。
D. python多進程中隊列不空時阻塞,求解為什麼
最近接觸一個項目,要在多個虛擬機中運行任務,參考別人之前項目的代碼,採用了多進程來處理,於是上網查了查python中的多進程
一、先說說Queue(隊列對象)
Queue是python中的標准庫,可以直接import 引用,之前學習的時候有聽過著名的「先吃先拉」與「後吃先吐」,其實就是這里說的隊列,隊列的構造的時候可以定義它的容量,別吃撐了,吃多了,就會報錯,構造的時候不寫或者寫個小於1的數則表示無限多
import Queue
q = Queue.Queue(10)
向隊列中放值(put)
q.put(『yang')
q.put(4)
q.put([『yan','xing'])
在隊列中取值get()
默認的隊列是先進先出的
>>> q.get()
『yang'
>>> q.get()
4
>>> q.get()
[『yan', 『xing']
當一個隊列為空的時候如果再用get取則會堵塞,所以取隊列的時候一般是用到
get_nowait()方法,這種方法在向一個空隊列取值的時候會拋一個Empty異常
所以更常用的方法是先判斷一個隊列是否為空,如果不為空則取值
隊列中常用的方法
Queue.qsize() 返回隊列的大小
Queue.empty() 如果隊列為空,返回True,反之False
Queue.full() 如果隊列滿了,返回True,反之False
Queue.get([block[, timeout]]) 獲取隊列,timeout等待時間
Queue.get_nowait() 相當Queue.get(False)
非阻塞 Queue.put(item) 寫入隊列,timeout等待時間
Queue.put_nowait(item) 相當Queue.put(item, False)
二、multiprocessing中使用子進程概念
from multiprocessing import Process
可以通過Process來構造一個子進程
p = Process(target=fun,args=(args))
再通過p.start()來啟動子進程
再通過p.join()方法來使得子進程運行結束後再執行父進程
from multiprocessing import Process
import os
# 子進程要執行的代碼
def run_proc(name):
print 'Run child process %s (%s)...' % (name, os.getpid())
if __name__=='__main__':
print 'Parent process %s.' % os.getpid()
p = Process(target=run_proc, args=('test',))
print 'Process will start.'
p.start()
p.join()
print 'Process end.'
上面的程序運行後的結果其實是按照上圖中1,2,3分開進行的,先列印1,3秒後列印2,再3秒後列印3
代碼中的p.close()是關掉進程池子,是不再向裡面添加進程了,對Pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close(),調用close()之後就不能繼續添加新的Process了。
當時也可以是實例pool的時候給它定義一個進程的多少
如果上面的代碼中p=Pool(5)那麼所有的子進程就可以同時進行
三、多個子進程間的通信
多個子進程間的通信就要採用第一步中說到的Queue,比如有以下的需求,一個子進程向隊列中寫數據,另外一個進程從隊列中取數據,
#coding:gbk
from multiprocessing import Process, Queue
import os, time, random
# 寫數據進程執行的代碼:
def write(q):
for value in ['A', 'B', 'C']:
print 'Put %s to queue...' % value
q.put(value)
time.sleep(random.random())
# 讀數據進程執行的代碼:
def read(q):
while True:
if not q.empty():
value = q.get(True)
print 'Get %s from queue.' % value
time.sleep(random.random())
else:
break
if __name__=='__main__':
# 父進程創建Queue,並傳給各個子進程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 啟動子進程pw,寫入:
pw.start()
# 等待pw結束:
pw.join()
# 啟動子進程pr,讀取:
pr.start()
pr.join()
# pr進程里是死循環,無法等待其結束,只能強行終止:
print
print '所有數據都寫入並且讀完'
四、關於上面代碼的幾個有趣的問題
if __name__=='__main__':
# 父進程創建Queue,並傳給各個子進程:
q = Queue()
p = Pool()
pw = p.apply_async(write,args=(q,))
pr = p.apply_async(read,args=(q,))
p.close()
p.join()
print
print '所有數據都寫入並且讀完'
如果main函數寫成上面的樣本,本來我想要的是將會得到一個隊列,將其作為參數傳入進程池子里的每個子進程,但是卻得到
RuntimeError: Queue objects should only be shared between processes through inheritance
的錯誤,查了下,大意是隊列對象不能在父進程與子進程間通信,這個如果想要使用進程池中使用隊列則要使用multiprocess的Manager類
if __name__=='__main__':
manager = multiprocessing.Manager()
# 父進程創建Queue,並傳給各個子進程:
q = manager.Queue()
p = Pool()
pw = p.apply_async(write,args=(q,))
time.sleep(0.5)
pr = p.apply_async(read,args=(q,))
p.close()
p.join()
print
print '所有數據都寫入並且讀完'
這樣這個隊列對象就可以在父進程與子進程間通信,不用池則不需要Manager,以後再擴展multiprocess中的Manager類吧
關於鎖的應用,在不同程序間如果有同時對同一個隊列操作的時候,為了避免錯誤,可以在某個函數操作隊列的時候給它加把鎖,這樣在同一個時間內則只能有一個子進程對隊列進行操作,鎖也要在manager對象中的鎖
#coding:gbk
from multiprocessing import Process,Queue,Pool
import multiprocessing
import os, time, random
# 寫數據進程執行的代碼:
def write(q,lock):
lock.acquire() #加上鎖
for value in ['A', 'B', 'C']:
print 'Put %s to queue...' % value
q.put(value)
lock.release() #釋放鎖
# 讀數據進程執行的代碼:
def read(q):
while True:
if not q.empty():
value = q.get(False)
print 'Get %s from queue.' % value
time.sleep(random.random())
else:
break
if __name__=='__main__':
manager = multiprocessing.Manager()
# 父進程創建Queue,並傳給各個子進程:
q = manager.Queue()
lock = manager.Lock() #初始化一把鎖
p = Pool()
pw = p.apply_async(write,args=(q,lock))
pr = p.apply_async(read,args=(q,))
p.close()
p.join()
print
print '所有數據都寫入並且讀完'
E. python 使用celery為了解決什麼業務問題
Celery是一個專注於實時處理和任務調度的分布式任務隊列。所謂任務就是消息,消息中的有效載荷中包含要執行任務需要的全部數據。
使用Celery的常見場景如下:
1. Web應用。當用戶觸發的一個操作需要較長時間才能執行完成時,可以把它作為任務交給Celery去非同步執行,執行完再返回給用戶。這段時間用戶不需要等待,提高了網站的整體吞吐量和響應時間。
2. 定時任務。生產環境經常會跑一些定時任務。假如你有上千台的伺服器、上千種任務,定時任務的管理很困難,Celery可以幫助我們快速在不同的機器設定不同種任務。
3. 同步完成的附加工作都可以非同步完成。比如發送簡訊/郵件、推送消息、清理/設置緩存等。
Celery還提供了如下的特性:
1. 方便地查看定時任務的執行情況,比如執行是否成功、當前狀態、執行任務花費的時間等。
2. 可以使用功能齊備的管理後台或者命令行添加、更新、刪除任務。
3. 方便把任務和配置管理相關聯。
4. 可選多進程、Eventlet和Gevent三種模式並發執行。
5. 提供錯誤處理機制。
- 提供多種任務原語,方便實現任務分組、拆分和調用鏈。
- 支持多種消息代理和存儲後端。
F. Python實現簡單多線程任務隊列
Python實現簡單多線程任務隊列
最近我在用梯度下降演算法繪制神經網路的數據時,遇到了一些演算法性能的問題。梯度下降演算法的代碼如下(偽代碼):
defgradient_descent(): # the gradient descent code plotly.write(X, Y)
一般來說,當網路請求 plot.ly 繪圖時會阻塞等待返回,於是也會影響到其他的梯度下降函數的執行速度。
一種解決辦法是每調用一次 plotly.write 函數就開啟一個新的線程,但是這種方法感覺不是很好。 我不想用一個像 cerely(一種分布式任務隊列)一樣大而全的任務隊列框架,因為框架對於我的這點需求來說太重了,並且我的繪圖也並不需要 redis 來持久化數據。
那用什麼辦法解決呢?我在 python 中寫了一個很小的任務隊列,它可以在一個單獨的線程中調用 plotly.write函數。下面是程序代碼。
classTaskQueue(Queue.Queue):
首先我們繼承 Queue.Queue 類。從 Queue.Queue 類可以繼承 get 和 put 方法,以及隊列的行為。
def__init__(self, num_workers=1): Queue.Queue.__init__(self) self.num_workers=num_workers self.start_workers()
初始化的時候,我們可以不用考慮工作線程的數量。
defadd_task(self, task,*args,**kwargs): args=argsor() kwargs=kwargsor{} self.put((task, args, kwargs))
我們把 task, args, kwargs 以元組的形式存儲在隊列中。*args 可以傳遞數量不等的參數,**kwargs 可以傳遞命名參數。
defstart_workers(self): foriinrange(self.num_workers): t=Thread(target=self.worker) t.daemon=True t.start()
我們為每個 worker 創建一個線程,然後在後台刪除。
下面是 worker 函數的代碼:
defworker(self): whileTrue: tupl=self.get() item, args, kwargs=self.get() item(*args,**kwargs) self.task_done()
worker 函數獲取隊列頂端的任務,並根據輸入參數運行,除此之外,沒有其他的功能。下面是隊列的代碼:
我們可以通過下面的代碼測試:
defblokkah(*args,**kwargs): time.sleep(5) print「Blokkah mofo!」 q=TaskQueue(num_workers=5) foriteminrange(1): q.add_task(blokkah) q.join()# wait for all the tasks to finish. print「Alldone!」
Blokkah 是我們要做的任務名稱。隊列已經緩存在內存中,並且沒有執行很多任務。下面的步驟是把主隊列當做單獨的進程來運行,這樣主程序退出以及執行資料庫持久化時,隊列任務不會停止運行。但是這個例子很好地展示了如何從一個很簡單的小任務寫成像工作隊列這樣復雜的程序。
defgradient_descent(): # the gradient descent code queue.add_task(plotly.write, x=X, y=Y)
修改之後,我的梯度下降演算法工作效率似乎更高了。如果你很感興趣的話,可以參考下面的代碼。 classTaskQueue(Queue.Queue): def__init__(self, num_workers=1):Queue.Queue.__init__(self)self.num_workers=num_workersself.start_workers() defadd_task(self, task,*args,**kwargs):args=argsor()kwargs=kwargsor{}self.put((task, args, kwargs)) defstart_workers(self):foriinrange(self.num_workers):t=Thread(target=self.worker)t.daemon=Truet.start() defworker(self):whileTrue:tupl=self.get()item, args, kwargs=self.get()item(*args,**kwargs)self.task_done() deftests():defblokkah(*args,**kwargs):time.sleep(5)print"Blokkah mofo!" q=TaskQueue(num_workers=5) foriteminrange(10):q.add_task(blokkah) q.join()# block until all tasks are doneprint"All done!" if__name__=="__main__":tests()
G. 如何在Python中編寫並發程序
GIL
在Python中,由於歷史原因(GIL),使得Python中多線程的效果非常不理想.GIL使得任何時刻Python只能利用一個CPU核,並
且它的調度演算法簡單粗暴:多線程中,讓每個線程運行一段時間t,然後強行掛起該線程,繼而去運行其他線程,如此周而復始,直到所有線程結束.
這使得無法有效利用計算機系統中的"局部性",頻繁的線程切換也對緩存不是很友好,造成資源的浪費.
據說Python官方曾經實現了一個去除GIL的Python解釋器,但是其效果還不如有GIL的解釋器,遂放棄.後來Python官方推出了"利
用多進程替代多線程"的方案,在Python3中也有concurrent.futures這樣的包,讓我們的程序編寫可以做到"簡單和性能兼得".
多進程/多線程+Queue
一般來說,在Python中編寫並發程序的經驗是:計算密集型任務使用多進程,IO密集型任務使用多進程或者多線程.另外,因為涉及到資源共享,所
以需要同步鎖等一系列麻煩的步驟,代碼編寫不直觀.另外一種好的思路是利用多進程/多線程+Queue的方法,可以避免加鎖這樣麻煩低效的方式.
現在在Python2中利用Queue+多進程的方法來處理一個IO密集型任務.
假設現在需要下載多個網頁內容並進行解析,單進程的方式效率很低,所以使用多進程/多線程勢在必行.
我們可以先初始化一個tasks隊列,裡面將要存儲的是一系列dest_url,同時開啟4個進程向tasks中取任務然後執行,處理結果存儲在一個results隊列中,最後對results中的結果進行解析.最後關閉兩個隊列.
下面是一些主要的邏輯代碼.
# -*- coding:utf-8 -*-
#IO密集型任務
#多個進程同時下載多個網頁
#利用Queue+多進程
#由於是IO密集型,所以同樣可以利用threading模塊
import multiprocessing
def main():
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
cpu_count = multiprocessing.cpu_count() #進程數目==CPU核數目
create_process(tasks, results, cpu_count) #主進程馬上創建一系列進程,但是由於阻塞隊列tasks開始為空,副進程全部被阻塞
add_tasks(tasks) #開始往tasks中添加任務
parse(tasks, results) #最後主進程等待其他線程處理完成結果
def create_process(tasks, results, cpu_count):
for _ in range(cpu_count):
p = multiprocessing.Process(target=_worker, args=(tasks, results)) #根據_worker創建對應的進程
p.daemon = True #讓所有進程可以隨主進程結束而結束
p.start() #啟動
def _worker(tasks, results):
while True: #因為前面所有線程都設置了daemon=True,故不會無限循環
try:
task = tasks.get() #如果tasks中沒有任務,則阻塞
result = _download(task)
results.put(result) #some exceptions do not handled
finally:
tasks.task_done()
def add_tasks(tasks):
for url in get_urls(): #get_urls() return a urls_list
tasks.put(url)
def parse(tasks, results):
try:
tasks.join()
except KeyboardInterrupt as err:
print "Tasks has been stopped!"
print err
while not results.empty():
_parse(results)
if __name__ == '__main__':
main()
利用Python3中的concurrent.futures包
在Python3中可以利用concurrent.futures包,編寫更加簡單易用的多線程/多進程代碼.其使用感覺和Java的concurrent框架很相似(借鑒?)
比如下面的簡單代碼示例
def handler():
futures = set()
with concurrent.futures.ProcessPoolExecutor(max_workers=cpu_count) as executor:
for task in get_task(tasks):
future = executor.submit(task)
futures.add(future)
def wait_for(futures):
try:
for future in concurrent.futures.as_completed(futures):
err = futures.exception()
if not err:
result = future.result()
else:
raise err
except KeyboardInterrupt as e:
for future in futures:
future.cancel()
print "Task has been canceled!"
print e
return result
總結
要是一些大型Python項目也這般編寫,那麼效率也太低了.在Python中有許多已有的框架使用,使用它們起來更加高效.
H. Python分布式進程中你會遇到的坑
寫在前面
小驚大怪
你是不是在用Python3或者在windows系統上編程?最重要的是你對進程和線程不是很清楚?那麼恭喜你,在python分布式進程中,會有坑等著你去挖。。。(hahahaha,此處允許我嚇唬一下你)開玩笑的啦,不過,如果你知道序列中不支持匿名函數,那這個坑就和你say byebye了。好了話不多數,直接進入正題。
分布式進程
正如大家所知道的Process比Thread更穩定,而且Process可以分布到多台機器上,而Thread最多隻能分布到同一陸坦咐台機器的多個CPU上。Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多台機器上。一個服務進程可以作為調度者,將任務分布到其他多個進程中,依靠網路通信。由於managers模塊封裝很好,不必了解網路通信的細節,就可以很容易地編寫分布式多進程程序。
代碼記錄
舉個例子
如果我們已經有一個通過Queue通信的多進程程序在同一台機器上運行,現在,由於處理任務的進程任務繁重,希望把發送任務的進程和處理任務的進程分布到兩台機器上,這應該怎麼用分布式進程來實現呢?你已經知道了原有的Queue可以繼續使用,而且通過managers模塊把Queue通過網路暴露出去,就可以讓其他機器的進程來訪問Queue了。好,那我們就這么干!
寫個task_master.py
我們先看服務進程。服務進程負責啟動Queue,把Queue注冊到網路上,然後往Queue裡面寫入任務。
請注意,當我們在一台機器上寫多進程程序時,創建的Queue可以直接拿來用,但是,在分布式多進程環境下,添加任務到Queue不可以直接對原始的task_queue進行操作,那樣就繞過了QueueManager的封裝,必須通過manager.get_task_queue()獲得的Queue介面添加。然後,在另一台機器上啟動任務進程(本機上啟動也可以)
寫個task_worker.py
任務進程要通過網路連接到服務進程,所以要指定服務進程的IP。
運行結果
現在,可信沒以試試分布式進程的工作效果了。先啟動task_master.py服務進程:
task_master.py進程發送完任務後,開始等待result隊列的結果。現在啟動task_worker.py進程:
看到沒,結果都出錯了,我們好好分析一下到底哪出錯了。。。
錯誤分析
在task_master.py的報錯提示中,我們知道它說lambda錯誤,這是因為序列化不支持匿名函數,所以我們得修改代碼,重新對queue用QueueManager進行封裝放到網路中。
其中task_queue和result_queue是兩個隊列,分別存放任務和結果。它們用來進行進程間通信,交換對象。
因為是分布式的環境,放入queue中的數據需要等待Workers機器運算處理後再進行讀取,這樣就需要對queue用QueueManager進行封裝放到網路中,這是通過上面的2行代碼來實現的。我們給return_task_queue的網路調用介面取了一個名早純get_task_queue,而return_result_queue的名字是get_result_queue,方便區分對哪個queue進行操作。task.put(n)即是對task_queue進行寫入數據,相當於分配任務。而result.get()即是等待workers機器處理後返回的結果。
值得注意 在windows系統中你必須要寫IP地址,而其他操作系統比如linux操作系統則就不要了。
修改後的代碼
在task_master.py中修改如下:
在task_worker.py中修改如下:
先運行task_master.py,然後再運行task_worker.py
(1)task_master.py運行結果如下
(2)task_worker.py運行結果如下
知識補充
這個簡單的Master/Worker模型有什麼用?其實這就是一個簡單但真正的分布式計算,把代碼稍加改造,啟動多個worker,就可以把任務分布到幾台甚至幾十台機器上,比如把計算n*n的代碼換成發送郵件,就實現了郵件隊列的非同步發送。
Queue對象存儲在哪?注意到task_worker.py中根本沒有創建Queue的代碼,所以,Queue對象存儲在task_master.py進程中:
而Queue之所以能通過網路訪問,就是通過QueueManager實現的。由於QueueManager管理的不止一個Queue,所以,要給每個Queue的網路調用介面起個名字,比如get_task_queue。task_worker這里的QueueManager注冊的名字必須和task_manager中的一樣。對比上面的例子,可以看出Queue對象從另一個進程通過網路傳遞了過來。只不過這里的傳遞和網路通信由QueueManager完成。
authkey有什麼用?這是為了保證兩台機器正常通信,不被其他機器惡意干擾。如果task_worker.py的authkey和task_master.py的authkey不一致,肯定連接不上。