pythonmap多線程
『壹』 python中多線程調用全局變數,值不是修改後的值
多線程讀取全局變數需要引用線程鎖,否則多個線程同時讀取同一個全局變數會出現和預期不一樣的值
『貳』 python 中的map(轉載)
1 map()函數的簡介以及語法:
map是python內置函數,會根據提供的函數對指定的序列做映射。
map()函數的格式是:
map(function,iterable,...)
第一個參數接受一個函數名,後面的參數接受一個或多個可迭代的序列,返回的是一個集合。
把函數依次作用在list中的每一個元素上,得到一個新的list並返回。注意,map不改變原list,而是返回一個新list。
2 map()函數實例:
del square(x):
return x ** 2
map(square,[1,2,3,4,5]) ---- -要列印結果需要 print(*map(square,[1,2,3,4,5])),這塊列印了再列印就會為空
# 結果如下:
[1,4,9,16,25]
通過使用lambda匿名函數的方法使用map()函數:
map(lambda x, y: x+y,[1,3,5,7,9],[2,4,6,8,10])
# 結果如下:
[3,7,11,15,19]
通過lambda函數使返回值是一個元組:
map(lambdax, y : (x**y,x+y),[2,4,6],[3,2,1])
# 結果如下
[(8,5),(16,6),(6,7)]
當不傳入function時,map()就等同於zip(),將多個列表相同位置的元素歸並到一個元組:
map(None,[2,4,6],[3,2,1])
# 結果如下
[(2,3),(4,2),(6,1)]
通過map還可以實現類型轉換
將元組轉換為list:
map(int,(1,2,3))
# 結果如下:
[1,2,3]
將字元串轉換為list:
map(int,'1234')
# 結果如下:
[1,2,3,4]
提取字典中的key,並將結果放在一個list中:
map(int,{1:2,2:3,3:4})
# 結果如下
[1,2,3]
原文鏈接:https://blog.csdn.net/quanlingtu1272/article/details/95482253
『叄』 Python:map函數用法詳解
一個簡單的例子:將一個list中所有元素平方,常規的做法如下圖所示,雖然實現了這個功能,但並沒有給人一目瞭然的感覺。若換成map來實現,則會好很多。
1、map函數介紹及其簡單使用
上述用一個簡單的例子演示的map函數的用法及其優勢,下面將詳細介紹map函數的用法:map()函數接收兩個參數,一個是函數,一個是Iterable,map將傳入的函數依次作用到序列的每一個元素,並把結果作為新的Iterable返回。其語法格式為:
map(function,iterable...)
function---函數名
iterable---一個或多個序列
map作為高階函數,事實上它把運算規則抽象了,我們可以用這種方式計算任意復雜的函數,再比如,把一個list的所有數據轉為string類型:
再舉一個小例子,對list中的各個元素開方,一步到位:
!注意:在使用math自帶函數時,只需要函數名即可
2、map函數與lambda函數結合使用,下面方法同樣可以達到對list中的數二次方的目的
map函數與lambda函數結合使用,可以傳入兩個參數相加:
還可以同時計算多個值:
『肆』 Python多線程總結
在實際處理數據時,因系統內存有限,我們不可能一次把所有數據都導出進行操作,所以需要批量導出依次操作。為了加快運行,我們會採用多線程的方法進行數據處理, 以下為我總結的多線程批量處理數據的模板:
主要分為三大部分:
共分4部分對多線程的內容進行總結。
先為大家介紹線程的相關概念:
在飛車程序中,如果沒有多線程,我們就不能一邊聽歌一邊玩飛車,聽歌與玩 游戲 不能並行;在使用多線程後,我們就可以在玩 游戲 的同時聽背景音樂。在這個例子中啟動飛車程序就是一個進程,玩 游戲 和聽音樂是兩個線程。
Python 提供了 threading 模塊來實現多線程:
因為新建線程系統需要分配資源、終止線程系統需要回收資源,所以如果可以重用線程,則可以減去新建/終止的開銷以提升性能。同時,使用線程池的語法比自己新建線程執行線程更加簡潔。
Python 為我們提供了 ThreadPoolExecutor 來實現線程池,此線程池默認子線程守護。它的適應場景為突發性大量請求或需要大量線程完成任務,但實際任務處理時間較短。
其中 max_workers 為線程池中的線程個數,常用的遍歷方法有 map 和 submit+as_completed 。根據業務場景的不同,若我們需要輸出結果按遍歷順序返回,我們就用 map 方法,若想誰先完成就返回誰,我們就用 submit+as_complete 方法。
我們把一個時間段內只允許一個線程使用的資源稱為臨界資源,對臨界資源的訪問,必須互斥的進行。互斥,也稱間接制約關系。線程互斥指當一個線程訪問某臨界資源時,另一個想要訪問該臨界資源的線程必須等待。當前訪問臨界資源的線程訪問結束,釋放該資源之後,另一個線程才能去訪問臨界資源。鎖的功能就是實現線程互斥。
我把線程互斥比作廁所包間上大號的過程,因為包間里只有一個坑,所以只允許一個人進行大號。當第一個人要上廁所時,會將門上上鎖,這時如果第二個人也想大號,那就必須等第一個人上完,將鎖解開後才能進行,在這期間第二個人就只能在門外等著。這個過程與代碼中使用鎖的原理如出一轍,這里的坑就是臨界資源。 Python 的 threading 模塊引入了鎖。 threading 模塊提供了 Lock 類,它有如下方法加鎖和釋放鎖:
我們會發現這個程序只會列印「第一道鎖」,而且程序既沒有終止,也沒有繼續運行。這是因為 Lock 鎖在同一線程內第一次加鎖之後還沒有釋放時,就進行了第二次 acquire 請求,導致無法執行 release ,所以鎖永遠無法釋放,這就是死鎖。如果我們使用 RLock 就能正常運行,不會發生死鎖的狀態。
在主線程中定義 Lock 鎖,然後上鎖,再創建一個子 線程t 運行 main 函數釋放鎖,結果正常輸出,說明主線程上的鎖,可由子線程解鎖。
如果把上面的鎖改為 RLock 則報錯。在實際中設計程序時,我們會將每個功能分別封裝成一個函數,每個函數中都可能會有臨界區域,所以就需要用到 RLock 。
一句話總結就是 Lock 不能套娃, RLock 可以套娃; Lock 可以由其他線程中的鎖進行操作, RLock 只能由本線程進行操作。
『伍』 python map是不是多線程
顯然不是。map是完全的單線程。
『陸』 js判定是否傳入回調函數
關於js回調函數,自己之前了解過,但是概念不是很清晰了,這里重新找幾篇博客回顧一下概念,整理的感覺比較好的幾個博客的總結的概念。方便復習。
js中的回調函數的理解:回調函數就是傳遞一個參數化函數,就是將這個函數作為一個參數傳到另外一個主函數裡面,當那個主函數執行完之後,再執行傳遞過去的函數,走這個過程的參數化的函數,就叫回調函數,換個說法也就是被作為參數傳遞到另一個函數(主函數)的那個函數就叫做回調函數。
回調函數:函數a有一個參數,這個參數是個函數b,當函數a執行完以後執行函數b。那麼這個過程就叫回調。,這句話的意思是函數b以一個參數的形式傳入函數a並執行,順序是先執行a ,然後執行參數b,b就是所謂的回調函數。
function a(callback){
alert('a');
callback.call(this);//或者是 callback(), callback.apply(this),看個人喜好
}
function b(){ // 為回調函數。
alert('b');
}
//調用
a(b);
1
2
3
4
5
6
7
8
9
1
2
3
4
5
6
7
8
9
js中的回調函數:官方解釋,當程序跑起來的時候,一般情況下,應用程序會時常通過API調用庫里的所先預備好的函數,但是有些庫函數,卻要求應用先傳給它的一個函數,好在適合的時候調用,以完成目標任務。這個被傳入的,後又被調用的函數成為回調函數。
通常將一個函數B傳入另外一個函數A,並且在需要的時候調用A.,說白了就是回溯函數,先定義好將要使用的函數體,飯後在使用在調用這個函數我們通常把callback作為一個參數傳入定義的那個函數。下面我們看一段實現這個效果的js代碼。
function Buy(name,goods1,callback) {
alert(name+' buy '+goods1);
if(callback&&typeof(callback)==="function")
callback();
}
Buy('xiaoming','apple',function(){
alert("shopping finish");
});
1
2
3
4
5
6
7
8
1
2
3
4
5
6
7
8
一個簡單的代碼,一開始不知道要買啥,等到買東西的時候,立即把之前定義好的函數調用出來,最好加上判斷,因為一切的前提是callback必須是一個函數,輸出結果為:
xiaoming buy apple
shopping finish
1
2
1
2
這樣應該能理解什麼是回調函數了吧。
打開CSDN,閱讀體驗更佳
JS中的 回調函數(callback)_前端小草籽的博客_js回調函數
1.什麼是回調函數(callback)呢? 把函數當作一個參數傳到另外一個函數中,當需要用這個函數是,再回調運行()這個函數. 回調函數是一段可執行的代碼段,它作為一個參數傳遞給其他的代碼,其作用是在需要的時候方便調用這段(回調函數)代碼。
JS中什麼是回調函數?_路過的假面騎士dcd的博客
參數可以拿來用,你也可以不用。形參,形式上的參數,並沒有實際意義,只是幫你完成函數內部邏輯運算而設置的。 回調函數:被作為實參傳入另一函數,並在該外部函數內被調用,用以來完成某些任務的函數,稱為回調函數。 functiongreeting(name){...
JS回調函數——簡單易懂有實例
初學js的時候,被回調函數搞得很暈,現在回過頭來總結一下什麼是回調函數。什麼是JS?(點擊查看) 下面先看看標準的解釋: <script language="javascript"> 02 function SortNumber( obj, func ) // 定義通用排序函數 03 { 04 //...
繼續訪問
淺析JS中回調函數及用法
主要介紹了JS中回調函數及用法,通過實例代碼給大家詳細介紹了什麼是回調函數,非常不錯,具有一定的參考借鑒價值,需要的朋友參考下吧
JS回調函數(callback)
淺談JS回調函數
繼續訪問
JS中的 回調函數(callback)
目錄1.什麼是回調函數(callback)呢?2.回調函數有哪些特點?3.回調函數中this的指向問題4.為什麼要用到回調函數?5.回調函數和非同步操作的關系是什麼?回調函數是非同步么?把函數當作一個參數傳到另外一個函數中,當需要用這個函數是,再回調運行()這個函數.回調函數是一段可執行的代碼段,它作為一個參數傳遞給其他的代碼,其作用是在需要的時候方便調用這段(回調函數)代碼。(作為參數傳遞到另外一個函數中,這個作為參數的函數就是回調函數)理解:函數可以作為一個參數傳遞到另外一個函數中。 分析:add(1,
繼續訪問
js之回調函數
回調函數 回調函數被作為實參傳入另一函數,並在該外部函數內被調用,用以來完成某些任務的函數,稱為回調函數。 一個簡單的例子 <script type="text/javascript"> window.onload = function(){ // 回調函數 function a(m){ return m+m; } console.log(a(3));//6 返回一個數據 console.log(a);//f a(m){return m+n} 返
繼續訪問
<Zhuuu_ZZ>HIVE(十一)函數
Hive內置函數一 Hive函數分類二 字元函數二 類型轉換函數和數學函數三 日期函數四 集合函數五 條件函數六 聚合函數和表生成函數6.1 聚合函數6.2 表生成函數:輸出可以作為表使用 一 Hive函數分類 從輸入輸出角度分類 標准函數:一行數據中的一列或多列為輸入,結果為單一值 聚合函數:多行的零列到多列為輸入,結果為單一值 表生成函數:零個或多個輸入,結果為多列或多行 從實現方式分類 內置函數 自定義函數 UDF:自定義標准函數 UDAF:自定義聚合函數 UDTF:自定義表生成函數
繼續訪問
常見的開窗函數
開窗函數與聚合函數計算方式一樣,開窗函數也是對行集組進行聚合計算,但是它不像普通聚合函數那樣每組只返回一個值,開窗函數可以為每組返回多個值。 開窗函數的語法為:over(partition by 列名1 order by 列名2 ),括弧中的兩個關鍵詞partition by 和order by 可以只出現一個。over() 前面是一個函數,如果是聚合函數,那麼order by 不能一起使用。 開窗函數主要分為以下兩類: 窗口函數OVER()指定一組行,開窗函數計算從窗口函數輸出的結果集中各行的值。
繼續訪問
開窗函數總結
4.2.1,表的數據 4.2.3,開窗函數查詢 1,結果如下: 2,結果如下,可以參照這個結果進行理解rows和range的區別 3,結果如下,可以用於獲取當前數據行的 上次登錄時間 的需求 4,結果如下,結合lead()函數 可以獲取用戶 上次登錄時間與下次登錄時間的 需求 5,結果如下,可以用於指定時間內最新或最舊數據的需求。 6,結果如下,可用於求比例的需求 7,結果如下: 7,結果如下: 8,結果如下 9,結果如下: ,10,結果
繼續訪問
熱門推薦 python中def用法
一、函數調用的含義 函數是類似於可封裝的程序片段。允許你給一塊語句一個名字,允許您在你的程序的任何地方使用指定的名字運行任何次數。 python中有許多內置函數,如len和range。 函數概念可能是任何有價值軟體中最重要的塊(在任何編程語言中)。 二、定義函數使用def關鍵字 在這個關鍵字之後是標識函數的名字; 其次是在一對括弧中可以附上一些變數名; 最後在行的末尾...
繼續訪問
python map函數的作用_python語言基礎之map函數,urlib.request,多線程
1.map函數map 是 Python 自帶的內置函數,它的作用是把一個函數應用在一個(或多個)序列上,把列表中的每一項作為函數輸入進行計算,再把計算的結果以列表的形式返回。map 的第一個參數是一個函數,之後的參數是序列,可以是 list、tuple。當 map 中的函數為 None 時,結果將會直接返回參數組成的列表。(python3中去掉了None,會報錯)lst_1 = (1,2,3,4,...
繼續訪問
JS中什麼是回調函數?
對於剛學JS的初學者來說(包括我現在的自己),對於這個回調函數真的是踩坑無數,於是乎想作為一個淋過雨的人,想為後面剛入門的人打一把傘。 本文不會用專業的知識詞彙,只會用口語來簡單讓你有一個概念幫你淺淺的理解這個名詞,如果你是一個學習JS剛遇到這個名詞,正處於一臉懵逼的狀態,那麼本文對於會有幫助,但如果你想要研究更深層次的原理,用法和含義,可能本文不適合你。 廢話不多說,讓我們先看MDN的解釋。 這段話,首先我們要搞懂一個東西,什麼是實參。 我們都知道,函數可以接受參數,形參和實參。那麼什麼是
繼續訪問
js回調函數的兩種寫法
回調函數 應用程序時常會通過API調用庫里所預先備好的函數。但是有些庫函數(library function)卻要求應用先傳給它一個函數,好在合適的時候調用,以完成目標任務。這個被傳入的、後又被調用的函數就稱為回調函數(callback function)。 總結一下回調函數的兩種寫法與用法: 非參數回調函數: 這種回調比較簡單 ,往往只需傳一個函數名就可以。 function demo(arg,callback){ } 再來看看怎麼寫這個函數 在js中是可以通過函數名來調用函數的 例如: var
繼續訪問
【一句話攻略】徹底理解JS中的回調(Callback)函數
回調函數
繼續訪問
sql開窗函數(窗口函數)詳解
一、什麼是開窗函數 開窗函數/分析函數:over() 開窗函數也叫分析函數,有兩類:一類是聚合開窗函數,一類是排序開窗函數。 開窗函數的調用格式為: 函數名(列名) OVER(partition by 列名 order by列名) 。 如果你沒聽說過開窗函數,看到上面開窗函數的調用方法,你可能還會有些疑惑。但只要你了解聚合函數,那麼理解開窗函數就非常容易了。 我們知道聚合函數對一組值執行計算並返回單一的值,如sum(),count(),max(),min(), avg()等,這些函數常與grou
繼續訪問
最新發布 Python中很常用的函數map(),一起來看看用法
Python2中map直接返回作用後的元素的列表 Python3中map返回的則是一個map對象 如果想得到列表對象,則還需要調用list轉化為列表對象 Python2中,map()函數的 function 可以為None,如map(iterable1,iterable2[,...[,iterable n),其作用類似於將iterable*中的對應索引的值取出作為一個元組,最終返回一個包含多個元組的列表。而Python3中,不指定 function,就會報錯。
繼續訪問
Oracle分析函數Over()
一、Over()分析函數 說明:聚合函數(如sum()、max()等)可以計算基於組的某種聚合值,但是聚合函數對於某個組只能返回一行記錄。若想對於某組返回多行記錄,則需要使用分析函數。 1、rank()/dense_rank over(partition by ... order by ...) 說明:over()在什麼條件之上; partition by 按哪個欄位劃分組; ...
繼續訪問
mysql開窗函數有哪些_mysql開窗函數
開窗函數:它可以理解為記錄集合,開窗函數也就是在滿足某種條件的記錄集合上執行的特殊函數。對於每條記錄都要在此窗口內執行函數,有的函數隨著記錄不同,窗口大小都是固定的,這種屬於靜態窗口;有的函數則相反,不同的記錄對應著不同的窗口,這種動態變化的窗口叫滑動窗口。開窗函數的本質還是聚合運算,只不過它更具靈活性,它對數據的每一行,都使用與該行相關的行進行計算並返回計算結果。開窗函數和普通聚合函數的區別聚合...
繼續訪問
SQL:開窗函數(窗口函數)
4、 窗口函數 目錄4、 窗口函數4.1 排序窗口函數rank4.2 rank(), dense_rank(), row_number()區別4.3 、排序截取數據lag(),lead(),ntile(),cume_dist()4.4 聚合函數作為窗口函數4.4、over(- - rows between and ) 簡單理解,就是對查詢的結果多出一列,這一列可以是聚合值,也可以是排序值。 開窗函數一般就是說的是over()函數,其窗口是由一個 OVER 子句 定義的多行記錄 開窗函數一般分為兩類,
繼續訪問
開窗函數(分析函數)使用詳解
開窗函數 簡介 開窗函數:在開窗函數出現之前存在著很多用 SQL 語句很難解決的問題,很多都要通過復雜的相關子查詢或者存儲過程來完成。為了解決這些問題,在 2003 年 ISO SQL 標准加入了開窗函數,開窗函數的使用使得這些經典的難題可以被輕松的解決。目前在 MSSQLServer、Oracle、DB2 等主流資料庫中都提供了對開窗函數的支持,MySQL8.0支持。 5.7 --> 8.0 開窗函數簡介:與聚合函數一樣,開窗函數也是對行集組進行聚合計算,但是它不像普通聚合函數那樣每組只返回一個
繼續訪問
敲黑板啦!開窗函數你學會了嗎
特徵分析與偏移分析什麼是開窗函數?學習目標:1、累計計算窗口函數(1)sum(…) over(……)(2)avg(…) over(……)(3)語法總結:2、分區排序窗口函數3、分組排序窗口函數4、偏移分析窗口函數練習總結: 什麼是開窗函數? 開窗函數用於為行定義一個窗口(這里的窗口是指運算將要操作的行的集合),它對一組值進行操作,不需要使用GROUP BY子句對數據進行分組,能夠在同一行中同時返回...
繼續訪問
『柒』 map函數的用法python
map函數的用法如下:
map(func, lst) ,將傳⼊的函數變數 func 作⽤到 lst 變數的每個元素中,並將結果組成新的列表 (Python2)/ 迭代器(Python3) 返回。
注意:
map()返回的是一個迭代器,直接列印map()的結果是返回的一個對象。
map函數示例代碼:
lst = ['1', '2', '3', '4', '5', '6']
print(lst)
lst_int = map(lambda x: int(x), lst)
# print(list(lst_int))
for i in lst_int:
print(i, end=' ')
print()
print(list(lst_int))
『捌』 map 在多線程中的操作
只有一個線程讀,不需要加鎖。
只有一個線程寫,不需要加鎖。
多個線程讀 不需要加鎖。
只有一個線程寫,其他線程讀或者寫需要加鎖。
(留待驗證)
『玖』 python 多進程和多線程配合
由於python的多線程中存在PIL鎖,因此python的多線程不能利用多核,那麼,由於現在的計算機是多核的,就不能充分利用計算機的多核資源。但是python中的多進程是可以跑在不同的cpu上的。因此,嘗試了多進程+多線程的方式,來做一個任務。比如:從中科大的鏡像源中下載多個rpm包。
#!/usr/bin/pythonimport reimport commandsimport timeimport multiprocessingimport threadingdef download_image(url):
print '*****the %s rpm begin to download *******' % url
commands.getoutput('wget %s' % url)def get_rpm_url_list(url):
commands.getoutput('wget %s' % url)
rpm_info_str = open('index.html').read()
regu_mate = '(?<=<a href=")(.*?)(?=">)'
rpm_list = re.findall(regu_mate, rpm_info_str)
rpm_url_list = [url + rpm_name for rpm_name in rpm_list] print 'the count of rpm list is: ', len(rpm_url_list) return rpm_url_
def multi_thread(rpm_url_list):
threads = [] # url = 'https://mirrors.ustc.e.cn/centos/7/os/x86_64/Packages/'
# rpm_url_list = get_rpm_url_list(url)
for index in range(len(rpm_url_list)): print 'rpm_url is:', rpm_url_list[index]
one_thread = threading.Thread(target=download_image, args=(rpm_url_list[index],))
threads.append(one_thread)
thread_num = 5 # set threading pool, you have put 4 threads in it
while 1:
count = min(thread_num, len(threads)) print '**********count*********', count ###25,25,...6707%25
res = [] for index in range(count):
x = threads.pop()
res.append(x) for thread_index in res:
thread_index.start() for j in res:
j.join() if not threads:
def multi_process(rpm_url_list):
# process num at the same time is 4
process = []
rpm_url_group_0 = []
rpm_url_group_1 = []
rpm_url_group_2 = []
rpm_url_group_3 = [] for index in range(len(rpm_url_list)): if index % 4 == 0:
rpm_url_group_0.append(rpm_url_list[index]) elif index % 4 == 1:
rpm_url_group_1.append(rpm_url_list[index]) elif index % 4 == 2:
rpm_url_group_2.append(rpm_url_list[index]) elif index % 4 == 3:
rpm_url_group_3.append(rpm_url_list[index])
rpm_url_groups = [rpm_url_group_0, rpm_url_group_1, rpm_url_group_2, rpm_url_group_3] for each_rpm_group in rpm_url_groups:
each_process = multiprocessing.Process(target = multi_thread, args = (each_rpm_group,))
process.append(each_process) for one_process in process:
one_process.start() for one_process in process:
one_process.join()# for each_url in rpm_url_list:# print '*****the %s rpm begin to download *******' %each_url## commands.getoutput('wget %s' %each_url)
def main():
url = 'https://mirrors.ustc.e.cn/centos/7/os/x86_64/Packages/'
url_paas = 'http://mirrors.ustc.e.cn/centos/7.3.1611/paas/x86_64/openshift-origin/'
url_paas2 ='http://mirrors.ustc.e.cn/fedora/development/26/Server/x86_64/os/Packages/u/'
start_time = time.time()
rpm_list = get_rpm_url_list(url_paas) print multi_process(rpm_list) # print multi_thread(rpm_list)
#print multi_process()
# print multi_thread(rpm_list)
# for index in range(len(rpm_list)):
# print 'rpm_url is:', rpm_list[index]
end_time = time.time() print 'the download time is:', end_time - start_timeprint main()123456789101112131415161718
代碼的功能主要是這樣的:
main()方法中調用get_rpm_url_list(base_url)方法,獲取要下載的每個rpm包的具體的url地址。其中base_url即中科大基礎的鏡像源的地址,比如:http://mirrors.ustc.e.cn/centos/7.3.1611/paas/x86_64/openshift-origin/,這個地址下有幾十個rpm包,get_rpm_url_list方法將每個rpm包的url地址拼出來並返回。
multi_process(rpm_url_list)啟動多進程方法,在該方法中,會調用多線程方法。該方法啟動4個多進程,將上面方法得到的rpm包的url地址進行分組,分成4組,然後每一個組中的rpm包再最後由不同的線程去執行。從而達到了多進程+多線程的配合使用。
代碼還有需要改進的地方,比如多進程啟動的進程個數和rpm包的url地址分組是硬編碼,這個還需要改進,畢竟,不同的機器,適合同時啟動的進程個數是不同的。
『拾』 有沒有易懂的 Python 多線程爬蟲代碼
Python 在程序並行化方面多少有些聲名狼藉。撇開技術上的問題,例如線程的實現和 GIL1,我覺得錯誤的教學指導才是主要問題。常見的經典 Python 多線程、多進程教程多顯得偏「重」。而且往往隔靴搔癢,沒有深入探討日常工作中最有用的內容。
傳統的例子
簡單搜索下「Python 多線程教程」,不難發現幾乎所有的教程都給出涉及類和隊列的例子:
#Example.py
'''
Standard Procer/Consumer Threading Pattern
'''
import time
import threading
import Queue
class Consumer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self._queue = queue
def run(self):
while True:
# queue.get() blocks the current thread until
# an item is retrieved.
msg = self._queue.get()
# Checks if the current message is
# the "Poison Pill"
if isinstance(msg, str) and msg == 'quit':
# if so, exists the loop
break
# "Processes" (or in our case, prints) the queue item
print "I'm a thread, and I received %s!!" % msg
# Always be friendly!
print 'Bye byes!'
def Procer():
# Queue is used to share items between
# the threads.
queue = Queue.Queue()
# Create an instance of the worker
worker = Consumer(queue)
# start calls the internal run() method to
# kick off the thread
worker.start()
# variable to keep track of when we started
start_time = time.time()
# While under 5 seconds..
while time.time() - start_time < 5:
# "Proce" a piece of work and stick it in
# the queue for the Consumer to process
queue.put('something at %s' % time.time())
# Sleep a bit just to avoid an absurd number of messages
time.sleep(1)
# This the "poison pill" method of killing a thread.
queue.put('quit')
# wait for the thread to close down
worker.join()
if __name__ == '__main__':
Procer()
哈,看起來有些像 Java 不是嗎?
我並不是說使用生產者/消費者模型處理多線程/多進程任務是錯誤的(事實上,這一模型自有其用武之地)。只是,處理日常腳本任務時我們可以使用更有效率的模型。
問題在於…
首先,你需要一個樣板類;
其次,你需要一個隊列來傳遞對象;
而且,你還需要在通道兩端都構建相應的方法來協助其工作(如果需想要進行雙向通信或是保存結果還需要再引入一個隊列)。
worker 越多,問題越多
按照這一思路,你現在需要一個 worker 線程的線程池。下面是一篇 IBM 經典教程中的例子——在進行網頁檢索時通過多線程進行加速。
#Example2.py
'''
A more realistic thread pool example
'''
import time
import threading
import Queue
import urllib2
class Consumer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self._queue = queue
def run(self):
while True:
content = self._queue.get()
if isinstance(content, str) and content == 'quit':
break
response = urllib2.urlopen(content)
print 'Bye byes!'
def Procer():
urls = [
'', ''
'', ''
# etc..
]
queue = Queue.Queue()
worker_threads = build_worker_pool(queue, 4)
start_time = time.time()
# Add the urls to process
for url in urls:
queue.put(url)
# Add the poison pillv
for worker in worker_threads:
queue.put('quit')
for worker in worker_threads:
worker.join()
print 'Done! Time taken: {}'.format(time.time() - start_time)
def build_worker_pool(queue, size):
workers = []
for _ in range(size):
worker = Consumer(queue)
worker.start()
workers.append(worker)
return workers
if __name__ == '__main__':
Procer()
這段代碼能正確的運行,但仔細看看我們需要做些什麼:構造不同的方法、追蹤一系列的線程,還有為了解決惱人的死鎖問題,我們需要進行一系列的 join 操作。這還只是開始……
至此我們回顧了經典的多線程教程,多少有些空洞不是嗎?樣板化而且易出錯,這樣事倍功半的風格顯然不那麼適合日常使用,好在我們還有更好的方法。
何不試試 map
map 這一小巧精緻的函數是簡捷實現 Python 程序並行化的關鍵。map 源於 Lisp 這類函數式編程語言。它可以通過一個序列實現兩個函數之間的映射。
urls = ['', '']
results = map(urllib2.urlopen, urls)
上面的這兩行代碼將 urls 這一序列中的每個元素作為參數傳遞到 urlopen 方法中,並將所有結果保存到 results 這一列表中。其結果大致相當於:
results = []
for url in urls:
results.append(urllib2.urlopen(url))
map 函數一手包辦了序列操作、參數傳遞和結果保存等一系列的操作。
為什麼這很重要呢?這是因為藉助正確的庫,map 可以輕松實現並行化操作。
在 Python 中有個兩個庫包含了 map 函數: multiprocessing 和它鮮為人知的子庫 multiprocessing.mmy.
這里多扯兩句: multiprocessing.mmy? mltiprocessing 庫的線程版克隆?這是蝦米?即便在 multiprocessing 庫的官方文檔里關於這一子庫也只有一句相關描述。而這句描述譯成人話基本就是說:"嘛,有這么個東西,你知道就成."相信我,這個庫被嚴重低估了!
mmy 是 multiprocessing 模塊的完整克隆,唯一的不同在於 multiprocessing 作用於進程,而 mmy 模塊作用於線程(因此也包括了 Python 所有常見的多線程限制)。
所以替換使用這兩個庫異常容易。你可以針對 IO 密集型任務和 CPU 密集型任務來選擇不同的庫。2
動手嘗試
使用下面的兩行代碼來引用包含並行化 map 函數的庫:
from multiprocessing import Pool
from multiprocessing.mmy import Pool as ThreadPool
實例化 Pool 對象:
pool = ThreadPool()
這條簡單的語句替代了 example2.py 中 build_worker_pool 函數 7 行代碼的工作。它生成了一系列的 worker 線程並完成初始化工作、將它們儲存在變數中以方便訪問。
Pool 對象有一些參數,這里我所需要關注的只是它的第一個參數:processes. 這一參數用於設定線程池中的線程數。其默認值為當前機器 CPU 的核數。
一般來說,執行 CPU 密集型任務時,調用越多的核速度就越快。但是當處理網路密集型任務時,事情有有些難以預計了,通過實驗來確定線程池的大小才是明智的。
pool = ThreadPool(4) # Sets the pool size to 4
線程數過多時,切換線程所消耗的時間甚至會超過實際工作時間。對於不同的工作,通過嘗試來找到線程池大小的最優值是個不錯的主意。
創建好 Pool 對象後,並行化的程序便呼之欲出了。我們來看看改寫後的 example2.py
import urllib2
from multiprocessing.mmy import Pool as ThreadPool
urls = [
# etc..
]
# Make the Pool of workers
pool = ThreadPool(4)
# Open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)
#close the pool and wait for the work to finish
pool.close()
pool.join()
實際起作用的代碼只有 4 行,其中只有一行是關鍵的。map 函數輕而易舉的取代了前文中超過 40 行的例子。為了更有趣一些,我統計了不同方法、不同線程池大小的耗時情況。
# results = []
# for url in urls:
# result = urllib2.urlopen(url)
# results.append(result)
# # ------- VERSUS ------- #
# # ------- 4 Pool ------- #
# pool = ThreadPool(4)
# results = pool.map(urllib2.urlopen, urls)
# # ------- 8 Pool ------- #
# pool = ThreadPool(8)
# results = pool.map(urllib2.urlopen, urls)
# # ------- 13 Pool ------- #
# pool = ThreadPool(13)
# results = pool.map(urllib2.urlopen, urls)
結果:
# Single thread: 14.4 Seconds
# 4 Pool: 3.1 Seconds
# 8 Pool: 1.4 Seconds
# 13 Pool: 1.3 Seconds
很棒的結果不是嗎?這一結果也說明了為什麼要通過實驗來確定線程池的大小。在我的機器上當線程池大小大於 9 帶來的收益就十分有限了。
另一個真實的例子
生成上千張圖片的縮略圖
這是一個 CPU 密集型的任務,並且十分適合進行並行化。
基礎單進程版本
import os
import PIL
from multiprocessing import Pool
from PIL import Image
SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'
def get_image_paths(folder):
return (os.path.join(folder, f)
for f in os.listdir(folder)
if 'jpeg' in f)
def create_thumbnail(filename):
im = Image.open(filename)
im.thumbnail(SIZE, Image.ANTIALIAS)
base, fname = os.path.split(filename)
save_path = os.path.join(base, SAVE_DIRECTORY, fname)
im.save(save_path)
if __name__ == '__main__':
folder = os.path.abspath(
'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
images = get_image_paths(folder)
for image in images:
create_thumbnail(Image)
上邊這段代碼的主要工作就是將遍歷傳入的文件夾中的圖片文件,一一生成縮略圖,並將這些縮略圖保存到特定文件夾中。
這我的機器上,用這一程序處理 6000 張圖片需要花費 27.9 秒。
如果我們使用 map 函數來代替 for 循環:
import os
import PIL
from multiprocessing import Pool
from PIL import Image
SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'
def get_image_paths(folder):
return (os.path.join(folder, f)
for f in os.listdir(folder)
if 'jpeg' in f)
def create_thumbnail(filename):
im = Image.open(filename)
im.thumbnail(SIZE, Image.ANTIALIAS)
base, fname = os.path.split(filename)
save_path = os.path.join(base, SAVE_DIRECTORY, fname)
im.save(save_path)
if __name__ == '__main__':
folder = os.path.abspath(
'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
images = get_image_paths(folder)
pool = Pool()
pool.map(creat_thumbnail, images)
pool.close()
pool.join()
5.6 秒!
雖然只改動了幾行代碼,我們卻明顯提高了程序的執行速度。在生產環境中,我們可以為 CPU 密集型任務和 IO 密集型任務分別選擇多進程和多線程庫來進一步提高執行速度——這也是解決死鎖問題的良方。此外,由於 map 函數並不支持手動線程管理,反而使得相關的 debug 工作也變得異常簡單。
到這里,我們就實現了(基本)通過一行 Python 實現並行化。