pythonmap與多線程
Ⅰ 有沒有易懂的 python 多線程爬蟲代碼
Python 在程序並行化方面多少有些聲名狼藉。撇開技術上的問題,例如線程的實現和 GIL1,我覺得錯誤的教學指導才是主要問題。常見的經典 Python 多線程、多進程教程多顯得偏「重」。而且往往隔靴搔癢,沒有深入探討日常工作中最有用的內容。
傳統的例子
簡單搜索下「Python 多線程教程」,不難發現幾乎所有的教程都給出涉及類和隊列的例子:
#Example.py'''
Standard Procer/Consumer Threading Pattern
'''import time
import threading
import Queue
class Consumer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self._queue = queue
def run(self):
while True:
# queue.get() blocks the current thread until
# an item is retrieved.
msg = self._queue.get()
# Checks if the current message is
# the "Poison Pill"
if isinstance(msg, str) and msg == 'quit': # if so, exists the loop
break
# "Processes" (or in our case, prints) the queue item
print "I'm a thread, and I received %s!!" % msg # Always be friendly!
print 'Bye byes!'def Procer():
# Queue is used to share items between
# the threads.
queue = Queue.Queue() # Create an instance of the worker
worker = Consumer(queue) # start calls the internal run() method to
# kick off the thread
worker.start()
# variable to keep track of when we started
start_time = time.time()
# While under 5 seconds..
while time.time() - start_time < 5:
# "Proce" a piece of work and stick it in
# the queue for the Consumer to process
queue.put('something at %s' % time.time()) # Sleep a bit just to avoid an absurd number of messages
time.sleep(1) # This the "poison pill" method of killing a thread.
queue.put('quit') # wait for the thread to close down
worker.join()if __name__ == '__main__':
Procer()
哈,看起來有些像 Java 不是嗎?
我並不是說使用生產者/消費者模型處理多線程/多進程任務是錯誤的(事實上,這一模型自有其用武之地)。只是,處理日常腳本任務時我們可以使用更有效率的模型。
問題在於…
首先,你需要一個樣板類;
其次,你需要一個隊列來傳遞對象;
而且,你還需要在通道兩端都構建相應的方法來協助其工作(如果需想要進行雙向通信或是保存結果還需要再引入一個隊列)。
worker 越多,問題越多
按照這一思路,你現在需要一個 worker 線程的線程池。下面是一篇 IBM 經典教程中的例子——在進行網頁檢索時通過多線程進行加速。
#Example2.py'''
A more realistic thread pool example
'''import time
import threading
import Queue
import urllib2
class Consumer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self._queue = queue
def run(self):
while True:
content = self._queue.get()
if isinstance(content, str) and content == 'quit': break
response = urllib2.urlopen(content) print 'Bye byes!'def Procer():
urls = [ 'http', 'httcom'
'ala.org', 'hle.com'
# etc..
]
queue = Queue.Queue()
worker_threads = build_worker_pool(queue, 4)
start_time = time.time() # Add the urls to process
for url in urls:
queue.put(url)
# Add the poison pillv
for worker in worker_threads:
queue.put('quit') for worker in worker_threads:
worker.join() print 'Done! Time taken: {}'.format(time.time() - start_time)def build_worker_pool(queue, size):
workers = [] for _ in range(size):
worker = Consumer(queue)
worker.start()
workers.append(worker) return workersif __name__ == '__main__':
Procer()
這段代碼能正確的運行,但仔細看看我們需要做些什麼:構造不同的方法、追蹤一系列的線程,還有為了解決惱人的死鎖問題,我們需要進行一系列的 join 操作。這還只是開始……
至此我們回顧了經典的多線程教程,多少有些空洞不是嗎?樣板化而且易出錯,這樣事倍功半的風格顯然不那麼適合日常使用,好在我們還有更好的方法。
何不試試 map
map 這一小巧精緻的函數是簡捷實現 Python 程序並行化的關鍵。map 源於 Lisp 這類函數式編程語言。它可以通過一個序列實現兩個函數之間的映射。
urls = ['ho.com', 'htdit.com']
results = map(urllib2.urlopen, urls)
上面的這兩行代碼將 urls 這一序列中的每個元素作為參數傳遞到 urlopen 方法中,並將所有結果保存到 results 這一列表中。其結果大致相當於:
results = []for url in urls:
results.append(urllib2.urlopen(url))
map 函數一手包辦了序列操作、參數傳遞和結果保存等一系列的操作。
為什麼這很重要呢?這是因為藉助正確的庫,map 可以輕松實現並行化操作。
在 Python 中有個兩個庫包含了 map 函數: multiprocessing 和它鮮為人知的子庫 multiprocessing.mmy.
這里多扯兩句: multiprocessing.mmy? mltiprocessing 庫的線程版克隆?這是蝦米?即便在 multiprocessing 庫的官方文檔里關於這一子庫也只有一句相關描述。而這句描述譯成人話基本就是說:"嘛,有這么個東西,你知道就成."相信我,這個庫被嚴重低估了!
mmy 是 multiprocessing 模塊的完整克隆,唯一的不同在於 multiprocessing 作用於進程,而 mmy 模塊作用於線程(因此也包括了 Python 所有常見的多線程限制)。
所以替換使用這兩個庫異常容易。你可以針對 IO 密集型任務和 CPU 密集型任務來選擇不同的庫。2
動手嘗試
使用下面的兩行代碼來引用包含並行化 map 函數的庫:
from multiprocessing import Poolfrom multiprocessing.mmy import Pool as ThreadPool
實例化 Pool 對象:
pool = ThreadPool()
這條簡單的語句替代了 example2.py 中 build_worker_pool 函數 7 行代碼的工作。它生成了一系列的 worker 線程並完成初始化工作、將它們儲存在變數中以方便訪問。
Pool 對象有一些參數,這里我所需要關注的只是它的第一個參數:processes. 這一參數用於設定線程池中的線程數。其默認值為當前機器 CPU 的核數。
一般來說,執行 CPU 密集型任務時,調用越多的核速度就越快。但是當處理網路密集型任務時,事情有有些難以預計了,通過實驗來確定線程池的大小才是明智的。
pool = ThreadPool(4) # Sets the pool size to 4
線程數過多時,切換線程所消耗的時間甚至會超過實際工作時間。對於不同的工作,通過嘗試來找到線程池大小的最優值是個不錯的主意。
創建好 Pool 對象後,並行化的程序便呼之欲出了。我們來看看改寫後的 example2.py
import urllib2
from multiprocessing.mmy import Pool as ThreadPool
urls = [ 'httorg',
'hon.org/about/',
'hnlamp.com/pub/a/python/2003/04/17/metaclasses.html',
# etc..
]
# Make the Pool of workers
pool = ThreadPool(4)
# Open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)
#close the pool and wait for the work to finish
pool.close()
pool.join()
實際起作用的代碼只有 4 行,其中只有一行是關鍵的。map 函數輕而易舉的取代了前文中超過 40 行的例子。為了更有趣一些,我統計了不同方法、不同線程池大小的耗時情況。
# results = [] # for url in urls:# result = urllib2.urlopen(url)# results.append(result)# # ------- VERSUS ------- # # # ------- 4 Pool ------- # # pool = ThreadPool(4) # results = pool.map(urllib2.urlopen, urls)# # ------- 8 Pool ------- # # pool = ThreadPool(8) # results = pool.map(urllib2.urlopen, urls)# # ------- 13 Pool ------- # # pool = ThreadPool(13) # results = pool.map(urllib2.urlopen, urls)
結果:
# Single thread: 14.4 Seconds # 4 Pool: 3.1 Seconds# 8 Pool: 1.4 Seconds# 13 Pool: 1.3 Seconds
很棒的結果不是嗎?這一結果也說明了為什麼要通過實驗來確定線程池的大小。在我的機器上當線程池大小大於 9 帶來的收益就十分有限了。
Ⅱ python 多進程和多線程配合
由於python的多線程中存在PIL鎖,因此python的多線程不能利用多核,那麼,由於現在的計算機是多核的,就不能充分利用計算機的多核資源。但是python中的多進程是可以跑在不同的cpu上的。因此,嘗試了多進程+多線程的方式,來做一個任務。比如:從中科大的鏡像源中下載多個rpm包。
#!/usr/bin/pythonimport reimport commandsimport timeimport multiprocessingimport threadingdef download_image(url):
print '*****the %s rpm begin to download *******' % url
commands.getoutput('wget %s' % url)def get_rpm_url_list(url):
commands.getoutput('wget %s' % url)
rpm_info_str = open('index.html').read()
regu_mate = '(?<=<a href=")(.*?)(?=">)'
rpm_list = re.findall(regu_mate, rpm_info_str)
rpm_url_list = [url + rpm_name for rpm_name in rpm_list] print 'the count of rpm list is: ', len(rpm_url_list) return rpm_url_
def multi_thread(rpm_url_list):
threads = [] # url = 'https://mirrors.ustc.e.cn/centos/7/os/x86_64/Packages/'
# rpm_url_list = get_rpm_url_list(url)
for index in range(len(rpm_url_list)): print 'rpm_url is:', rpm_url_list[index]
one_thread = threading.Thread(target=download_image, args=(rpm_url_list[index],))
threads.append(one_thread)
thread_num = 5 # set threading pool, you have put 4 threads in it
while 1:
count = min(thread_num, len(threads)) print '**********count*********', count ###25,25,...6707%25
res = [] for index in range(count):
x = threads.pop()
res.append(x) for thread_index in res:
thread_index.start() for j in res:
j.join() if not threads:
def multi_process(rpm_url_list):
# process num at the same time is 4
process = []
rpm_url_group_0 = []
rpm_url_group_1 = []
rpm_url_group_2 = []
rpm_url_group_3 = [] for index in range(len(rpm_url_list)): if index % 4 == 0:
rpm_url_group_0.append(rpm_url_list[index]) elif index % 4 == 1:
rpm_url_group_1.append(rpm_url_list[index]) elif index % 4 == 2:
rpm_url_group_2.append(rpm_url_list[index]) elif index % 4 == 3:
rpm_url_group_3.append(rpm_url_list[index])
rpm_url_groups = [rpm_url_group_0, rpm_url_group_1, rpm_url_group_2, rpm_url_group_3] for each_rpm_group in rpm_url_groups:
each_process = multiprocessing.Process(target = multi_thread, args = (each_rpm_group,))
process.append(each_process) for one_process in process:
one_process.start() for one_process in process:
one_process.join()# for each_url in rpm_url_list:# print '*****the %s rpm begin to download *******' %each_url## commands.getoutput('wget %s' %each_url)
def main():
url = 'https://mirrors.ustc.e.cn/centos/7/os/x86_64/Packages/'
url_paas = 'http://mirrors.ustc.e.cn/centos/7.3.1611/paas/x86_64/openshift-origin/'
url_paas2 ='http://mirrors.ustc.e.cn/fedora/development/26/Server/x86_64/os/Packages/u/'
start_time = time.time()
rpm_list = get_rpm_url_list(url_paas) print multi_process(rpm_list) # print multi_thread(rpm_list)
#print multi_process()
# print multi_thread(rpm_list)
# for index in range(len(rpm_list)):
# print 'rpm_url is:', rpm_list[index]
end_time = time.time() print 'the download time is:', end_time - start_timeprint main()123456789101112131415161718
代碼的功能主要是這樣的:
main()方法中調用get_rpm_url_list(base_url)方法,獲取要下載的每個rpm包的具體的url地址。其中base_url即中科大基礎的鏡像源的地址,比如:http://mirrors.ustc.e.cn/centos/7.3.1611/paas/x86_64/openshift-origin/,這個地址下有幾十個rpm包,get_rpm_url_list方法將每個rpm包的url地址拼出來並返回。
multi_process(rpm_url_list)啟動多進程方法,在該方法中,會調用多線程方法。該方法啟動4個多進程,將上面方法得到的rpm包的url地址進行分組,分成4組,然後每一個組中的rpm包再最後由不同的線程去執行。從而達到了多進程+多線程的配合使用。
代碼還有需要改進的地方,比如多進程啟動的進程個數和rpm包的url地址分組是硬編碼,這個還需要改進,畢竟,不同的機器,適合同時啟動的進程個數是不同的。
Ⅲ map函數的用法python
map函數的用法如下:
map(func, lst) ,將傳⼊的函數變數 func 作⽤到 lst 變數的每個元素中,並將結果組成新的列表 (Python2)/ 迭代器(Python3) 返回。
注意:
map()返回的是一個迭代器,直接列印map()的結果是返回的一個對象。
map函數示例代碼:
lst = ['1', '2', '3', '4', '5', '6']
print(lst)
lst_int = map(lambda x: int(x), lst)
# print(list(lst_int))
for i in lst_int:
print(i, end=' ')
print()
print(list(lst_int))
Ⅳ Python多線程總結
在實際處理數據時,因系統內存有限,我們不可能一次把所有數據都導出進行操作,所以需要批量導出依次操作。為了加快運行,我們會採用多線程的方法進行數據處理, 以下為我總結的多線程批量處理數據的模板:
主要分為三大部分:
共分4部分對多線程的內容進行總結。
先為大家介紹線程的相關概念:
在飛車程序中,如果沒有多線程,我們就不能一邊聽歌一邊玩飛車,聽歌與玩 游戲 不能並行;在使用多線程後,我們就可以在玩 游戲 的同時聽背景音樂。在這個例子中啟動飛車程序就是一個進程,玩 游戲 和聽音樂是兩個線程。
Python 提供了 threading 模塊來實現多線程:
因為新建線程系統需要分配資源、終止線程系統需要回收資源,所以如果可以重用線程,則可以減去新建/終止的開銷以提升性能。同時,使用線程池的語法比自己新建線程執行線程更加簡潔。
Python 為我們提供了 ThreadPoolExecutor 來實現線程池,此線程池默認子線程守護。它的適應場景為突發性大量請求或需要大量線程完成任務,但實際任務處理時間較短。
其中 max_workers 為線程池中的線程個數,常用的遍歷方法有 map 和 submit+as_completed 。根據業務場景的不同,若我們需要輸出結果按遍歷順序返回,我們就用 map 方法,若想誰先完成就返回誰,我們就用 submit+as_complete 方法。
我們把一個時間段內只允許一個線程使用的資源稱為臨界資源,對臨界資源的訪問,必須互斥的進行。互斥,也稱間接制約關系。線程互斥指當一個線程訪問某臨界資源時,另一個想要訪問該臨界資源的線程必須等待。當前訪問臨界資源的線程訪問結束,釋放該資源之後,另一個線程才能去訪問臨界資源。鎖的功能就是實現線程互斥。
我把線程互斥比作廁所包間上大號的過程,因為包間里只有一個坑,所以只允許一個人進行大號。當第一個人要上廁所時,會將門上上鎖,這時如果第二個人也想大號,那就必須等第一個人上完,將鎖解開後才能進行,在這期間第二個人就只能在門外等著。這個過程與代碼中使用鎖的原理如出一轍,這里的坑就是臨界資源。 Python 的 threading 模塊引入了鎖。 threading 模塊提供了 Lock 類,它有如下方法加鎖和釋放鎖:
我們會發現這個程序只會列印「第一道鎖」,而且程序既沒有終止,也沒有繼續運行。這是因為 Lock 鎖在同一線程內第一次加鎖之後還沒有釋放時,就進行了第二次 acquire 請求,導致無法執行 release ,所以鎖永遠無法釋放,這就是死鎖。如果我們使用 RLock 就能正常運行,不會發生死鎖的狀態。
在主線程中定義 Lock 鎖,然後上鎖,再創建一個子 線程t 運行 main 函數釋放鎖,結果正常輸出,說明主線程上的鎖,可由子線程解鎖。
如果把上面的鎖改為 RLock 則報錯。在實際中設計程序時,我們會將每個功能分別封裝成一個函數,每個函數中都可能會有臨界區域,所以就需要用到 RLock 。
一句話總結就是 Lock 不能套娃, RLock 可以套娃; Lock 可以由其他線程中的鎖進行操作, RLock 只能由本線程進行操作。
Ⅳ python map是不是多線程
顯然不是。map是完全的單線程。
Ⅵ python需要學習什麼內容
Python的學習內容還是比較多的,我們將學習的過程劃分為4個階段,每個階段學習對應的內容,具體的學習順序如下:
Python學習順序:
①Python軟體開發基礎
掌握計算機的構成和工作原理
會使用Linux常用工具
熟練使用Docker的基本命令
建立Python開發環境,並使用print輸出
使用Python完成字元串的各種操作
使用Python re模塊進行程序設計
使用Python創建文件、訪問、刪除文件
掌握import 語句、From…import 語句、From…import* 語句、方法的引用、Python中的包
能夠使用Python面向對象方法開發軟體
能夠自己建立資料庫,表,並進行基本資料庫操作
掌握非關系資料庫MongoDB的使用,掌握Redis開發
能夠獨立完成TCP/UDP服務端客戶端軟體開發,能夠實現ftp、http伺服器,開發郵件軟體
能開發多進程、多線程軟體
能夠獨立完成後端軟體開發,深入理解Python開發後端的精髓
能夠獨立完成前端軟體開發,並和後端結合,熟練掌握使用Python進行全站Web開發的技巧
能夠使用Python熟練編寫爬蟲軟體
能夠熟練使用Python庫進行數據分析
招聘網站Python招聘職位數據爬取分析
掌握使用Python開源人工智慧框架進行人工智慧軟體開發、語音識別、人臉識別
掌握基本設計模式、常用演算法
掌握軟體工程、項目管理、項目文檔、軟體測試調優的基本方法
②Python軟體開發進階
③Python全棧式WEB工程師
④Python多領域開發
互聯網行業目前還是最熱門的行業之一,學習IT技能之後足夠優秀是有機會進入騰訊、阿里、網易等互聯網大廠高薪就業的,發展前景非常好,普通人也可以學習。
想要系統學習,你可以考察對比一下開設有相關專業的熱門學校,好的學校擁有根據當下企業需求自主研發課程的能力,中博軟體學院、南京課工場、南京北大青鳥等開設python專業的學校都是不錯的,建議實地考察對比一下。
祝你學有所成,望採納。
Ⅶ python中多線程調用全局變數,值不是修改後的值
多線程讀取全局變數需要引用線程鎖,否則多個線程同時讀取同一個全局變數會出現和預期不一樣的值
Ⅷ python multiprocessing問題,為什麼輸出結果和預期不一樣
眾所周知,由於python(Cpython)的全局鎖(GIL)問題存在,導致Thread也就是線程的並行並不可實現。 multiprocessing 模塊採用多進程而不是多線程的方式實現並行,解決了GIL的問題,一定程度上使狀況得到了緩解。
然而,Multiprocess本身依然有一些功能上的瓶頸。其中一個重要的是:進程之間不能共享內存(線程間則可以共享內存)。這意味著在進程間交換數據的時候,需要把數據打包、傳遞,解包。在python的語境下就是:
"pickle from main process to the subprocess;
depickle from subprocess to an object in memory;
pickle and return to the main process;
depickle from main process and return to memory"
(具體詳見這個問題下的吐槽)
因此, 在需要在進程間共享巨大的數據包的時候,多進程的表現還不如單進程。
除此之外,當需要運行的程序本身不是計算密集型而是是IO密集型,多進程所增加的讀寫會抵消掉運算速度的增益;如果程序復雜度根本不需要用並行來解決,那麼建立進程(池)的時間很可能比運行程序本身還要慢;另外,在進程池 multiprocessing.Pool(n) 的 n 的選擇上,如果選擇了多於當前CPU的核心數目的數字( multiprocessing.cpu_count() ),那麼在進程之間切換的功夫會大大拉低效率。
建立對線程和進程關系的直觀印象,可參考這篇文章。
快速而完整地了解python的全局鎖(GIL)問題,參考這篇不錯的博客。
為了解 multiprocess 的使用,我做了一些測試,測試環境是4核的Macbook Air。如下:
from multiprocessing import Process, Manager, Pool
1 def f(l):
2 l.reverse()
3
return
4
5 def main():
6
l1 = [random.randrange(0, 100000, 1) for i in range(0, 100000)]
7
l2 = [random.randrange(0, 100000, 1) for i in range(0, 100000)]
8
l3 = [random.randrange(0, 100000, 1) for i in range(0, 100000)]
9
l4 = [random.randrange(0, 100000, 1) for i in range(0, 100000)]
10
l5 = [random.randrange(0, 100000, 1) for i in range(0, 100000)]
11
l6 = [random.randrange(0, 100000, 1) for i in range(0, 100000)]
12
l7 = [random.randrange(0, 100000, 1) for i in range(0, 100000)]
13
s = time.time()
14
for l in [l1, l2, l3, l4, l5, l6, l7]:
15
f(l)
16
print "%s seconds" % (time.time() - s)
17
s = time.time()
18 map(f, [l1, l2, l3, l4, l5, l6, l7])
19
print "%s seconds" % (time.time() - s)
20
p = Pool(4)
21
s = time.time()
22 p.map(f, [l1, l2, l3, l4, l5, l6, l7])
23
print "%s seconds" % (time.time() - s)
24
return
也就是分別測試 f() 對 l1, l2, l3, l4, l5, l6, l7 7個列表的操作時間。先是循環的依次操作,再是python中非常好用的 map() 函數,最後是 multiprocessing 的進程池 multiprocessing.Pool.map() ——進程池中建立了4個 worker process , 也就是說,接下來的任務會被隨機地分配給4個進程來完成。
每次操作之前都重新計時,得到了這樣的結果:
>>> main()
0.00250101089478 seconds
0.000663995742798 seconds
0.907639980316 seconds
多進程出奇得慢。而 map() 相對於循環操作有很大的效率提升。
Ⅸ Python課程內容都學習什麼啊
賀聖軍Python輕松入門到項目實戰(經典完整版)(超清視頻)網路網盤
鏈接: https://pan..com/s/1C9k1o65FuQKNe68L3xEx3w
若資源有問題歡迎追問~
Ⅹ python中map函數的使用
map() 會根據提供的函數對指定序列做映射。
第一個參數 function 以參數序列中的每一個元素調用 function 函數,返回包含每次 function 函數返回值的新列表。 (10)pythonmap與多線程擴展閱讀
map() 函數語法:
map(function, iterable, ...);
參數:
function -- 函數;
iterable -- 一個或多個序列