python進程join
⑴ 小白都看懂了,python 中的線程和進程精講,建議收藏
目錄
眾所周知,CPU是計算機的核心,它承擔了所有的計算任務。而操作系統是計算機的管理者,是一個大管家,它負責任務的調度,資源的分配和管理,統領整個計算機硬體。應用程序是具有某種功能的程序,程序運行與操作系統之上
在很早的時候計算機並沒有線程這個概念,但是隨著時代的發展,只用進程來處理程序出現很多的不足。如當一個進程堵塞時,整個程序會停止在堵塞處,並且如果頻繁的切換進程,會浪費系統資源。所以線程出現了
線程是能擁有資源和獨立運行的最小單位,也是程序執行的最小單位。一個進程可以擁有多個線程,而且屬於同一個進程的多個線程間會共享該進行的資源
① 200 多本 Python 電子書(和經典的書籍)應該有
② Python標准庫資料(最全中文版)
③ 項目源碼(四五十個有趣且可靠的練手項目及源碼)
④ Python基礎入門、爬蟲、網路開發、大數據分析方面的視頻(適合小白學習)
⑤ Python學習路線圖(告別不入流的學習)
私信我01即可獲取大量Python學習資源
進程時一個具有一定功能的程序在一個數據集上的一次動態執行過程。進程由程序,數據集合和進程式控制制塊三部分組成。程序用於描述進程要完成的功能,是控制進程執行的指令集;數據集合是程序在執行時需要的數據和工作區;程序控制塊(PCB)包含程序的描述信息和控制信息,是進程存在的唯一標志
在Python中,通過兩個標准庫 thread 和 Threading 提供對線程的支持, threading 對 thread 進行了封裝。 threading 模塊中提供了 Thread , Lock , RLOCK , Condition 等組件
在Python中線程和進程的使用就是通過 Thread 這個類。這個類在我們的 thread 和 threading 模塊中。我們一般通過 threading 導入
默認情況下,只要在解釋器中,如果沒有報錯,則說明線程可用
守護模式:
現在我們程序代碼中,有多個線程, 並且在這個幾個線程中都會去 操作同一部分內容,那麼如何實現這些數據的共享呢?
這時,可以使用 threading庫裡面的鎖對象 Lock 去保護
Lock 對象的acquire方法 是申請鎖
每個線程在操作共享數據對象之前,都應該申請獲取操作權,也就是調用該共享數據對象對應的鎖對象的acquire方法,如果線程A 執行了 acquire() 方法,別的線程B 已經申請到了這個鎖, 並且還沒有釋放,那麼 線程A的代碼就在此處 等待 線程B 釋放鎖,不去執行後面的代碼。
直到線程B 執行了鎖的 release 方法釋放了這個鎖, 線程A 才可以獲取這個鎖,就可以執行下面的代碼了
如:
到在使用多線程時,如果數據出現和自己預期不符的問題,就可以考慮是否是共享的數據被調用覆蓋的問題
使用 threading 庫裡面的鎖對象 Lock 去保護
Python中的多進程是通過multiprocessing包來實現的,和多線程的threading.Thread差不多,它可以利用multiprocessing.Process對象來創建一個進程對象。這個進程對象的方法和線程對象的方法差不多也有start(), run(), join()等方法,其中有一個方法不同Thread線程對象中的守護線程方法是setDeamon,而Process進程對象的守護進程是通過設置daemon屬性來完成的
守護模式:
其使用方法和線程的那個 Lock 使用方法類似
Manager的作用是提供多進程共享的全局變數,Manager()方法會返回一個對象,該對象控制著一個服務進程,該進程中保存的對象運行其他進程使用代理進行操作
語法:
線程池的基類是 concurrent.futures 模塊中的 Executor , Executor 提供了兩個子類,即 ThreadPoolExecutor 和 ProcessPoolExecutor ,其中 ThreadPoolExecutor 用於創建線程池,而 ProcessPoolExecutor 用於創建進程池
如果使用線程池/進程池來管理並發編程,那麼只要將相應的 task 函數提交給線程池/進程池,剩下的事情就由線程池/進程池來搞定
Exectuor 提供了如下常用方法:
程序將 task 函數提交(submit)給線程池後,submit 方法會返回一個 Future 對象,Future 類主要用於獲取線程任務函數的返回值。由於線程任務會在新線程中以非同步方式執行,因此,線程執行的函數相當於一個「將來完成」的任務,所以 Python 使用 Future 來代表
Future 提供了如下方法:
使用線程池來執行線程任務的步驟如下:
最佳線程數目 = ((線程等待時間+線程CPU時間)/線程CPU時間 )* CPU數目
也可以低於 CPU 核心數
使用線程池來執行線程任務的步驟如下:
關於進程的開啟代碼一定要放在 if __name__ == '__main__': 代碼之下,不能放到函數中或其他地方
開啟進程的技巧
開啟進程的數量最好低於最大 CPU 核心數
⑵ join函數python
join函數python就是把一個list中所有的串按照你定義的分隔符連接起來。
join是string類型的一個函數,用調用他睜慶的字元串去連接參數里的列表,python裡面萬物皆對象,調用join函數,將後面的列表裡的值用逗號連接成新的字元串。str(i)foriinlist這是一個映射,就是把list中每個值都轉換成字元串。
含義
python中得thread的一些機制和C/C++不同:在C/C++中,主線程結束後,其子線程會默認被主線程kill掉。而在python中,主線程結束後,會默認等待子線程結束後,主線程才退出。
python對於thread的管理中有兩個函數:join和setDaemon。
join:如在一個線程B中調悉敬握用threada。join(),則threada結束後,線程B才會接著threada。join()往後運行。
setDaemon:主線程A啟動了子線程B,調用b。setDaemaon(True),則主線程結束時,稿鋒會把子線程B也殺死,與C/C++中得默認效果是一樣的。
⑶ Python的多進程模塊multiprocessing
眾所周知,Python中不存在真正的多線程,Python中的多線程是一個並發過程。如果想要並行的執行程序,充分的利用cpu資源(cpu核心),還是需要使用多進程解決的。其中multiprocessing模塊應該是Python中最常用的多進程模塊了。
基本上multiprocessing這個模塊和threading這個模塊用法是相同的,也是可以通過函數和類創建進程。
上述案例基本上就是筆者搬用了上篇文章多線程的案例,可見其使用的相似之處。導入multiprocessing後實例化Process就可以創建一個進程,參數的話也是和多線程一樣,target放置進程執行函數,args存放該函數的參數。
使用類來創建進程也是需要先繼承multiprocessing.Process並且實現其init方法。
Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那麼就會創建一個新的進程用來執行該請求。
但如果池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,才會創建新的進程。
需要注意的是,在調用join方法阻塞進程前,需要先調用close方法,,否則程序會出錯。
在上述案例中,提到了非阻塞,當把創建進程的方法換為pool.apply(func, (msg,))時,就會阻塞進程,出現下面的狀況。
在multiprocessing模塊中還存在Queue對象,這是一個進程的安全隊列,近似queue.Queue。隊列一般也是需要配合多線程或者多進程使用。
下列案例是一個使用進程隊列實現的生產者消費者模式。
multiprocessing支持兩種進程間的通信,其中一種便是上述案例的隊列,另一種則稱作管道。在官方文檔的描述中,multiprocessing中的隊列是基於管道實現的,並且擁有更高的讀寫效率。
管道可以理解為進程間的通道,使用Pipe([plex])創建,並返回一個元組(conn1,conn2)。如果plex被置為True(默認值),那麼該管道是雙向的,如果plex被置為False,那麼該管道是單向的,即conn1隻能用於接收消息,而conn2僅能用於發送消息。
其中conn1、conn2表示管道兩端的連接對象,每個連接對象都有send()和recv()方法。send和recv方法分別是發送和接受消息的方法。例如,可以調用conn1.send發送消息,conn1.recv接收消息。如果沒有消息可接收,recv方法會一直阻塞。如果管道已經被關閉,那麼recv方法會拋出EOFError。
關於multiprocessing模塊其實還有很多實用的類和方法,由於篇幅有限(懶),筆者就先寫到這里。該模塊其實用起來很像threading模塊,像鎖對象和守護線程(進程)等multiprocessing模塊也是有的,使用方法也近乎相同。
如果想要更加詳細的了解multiprocessing模塊,請參考官方文檔。
⑷ Python threading 中join()的作用
Python中join()的作用:(菜鳥網路) join([time]): 等待至線程中止。這阻塞調用線程直至線程的join() 方法被調用中止-正常退出或者拋出未處理的異常-或者是可選的超時發生
看著定義大致明白,但是自己確不好理解。主要的功能就是多線程的線程獨占,讓此時只有一個線程運行。
1.子線程為什麼需要join?
join阻塞主線程,可以讓主線程獲得子線程的處理結果。
如果沒有join,由於子線程sleep,尚未append到tmp_list,例子中print tmp_list為空列表,join後即能在append執行後print出來。
如果不需要子線程的處理結果,那麼可以不join,當然join了也沒啥影響。
2.為什麼要寫成for循環join的形式?
這個在上文方式二中即提出了,可以即讓子線程非同步執行,又讓主線程等待結果。
⑸ Python入門系列(十二)——GUI+多進程
話說,python做圖形界面並不明智,效率並不高。但在某些特殊需求下還是需要我們去使用,所以python擁有多個第三方庫用以實現GUI,本章我們使用python基本模塊tkinter進行學習,因為需求並不大,所以不做太多拓展。
繼續改寫上一章的IP查詢系統(= =,要玩爛了),首先略改下IpWhere.py以備調用~
然後使用tkinter模塊進行圖形界面的實現,調用預編譯的IpWhere模塊 :
額,太丑了,但基本實現我們小小的需求,在以後的py學習中,我們再涉及其他的第三方模塊,此處就當是入門了解吧。
十分抱歉把這么重要的內容放在最後,要不是大佬指點,此次學習可能就要錯過多進程的問題了。
Unix系統提供了forx,python可藉助os模塊調用,從而實現多進程,然而windows系統並不具備,所以我們選擇python內置的multiprocessing多進程模塊進行學習。
首先我們藉助直接調用多進程來改寫下我們在多線程章節用到的例子!
顯然,這么寫實在太蠢了,如果我們的任務量巨大,這並不合適。所以我們引入了進程池的概念,使用進程池進行改寫:
在此,我們可以看到所有進程是並發執行的,同樣,我們在多線程章節就講過,主進程的結束意味著程序退出,所以我們需要藉助join()方法堵塞進程。
我們知道線程共享內存空間,而進程的內存是獨立的,同一個進程的線程之間可以直接交流,也就帶來了線程同步的苦惱,這個我們在多線程章節已經講過了;而兩個進程想通信,則必須通過一個中間代理來實現,即我們接下來的內容:進程間通信。
進程之間肯定是需要通信的,操作系統提供了很多機制來實現進程間的通信。Python的multiprocessing模塊包裝了底層的機制,提供了Queue、Pipes等多種方式來交換數據。我們接下來就以Queue的方式進行學習。
Queue.Queue是進程內非阻塞隊列,multiprocess.Queue是跨進程通信隊列,前者是各自私有,後者是各子進程共有。
還有一個在後者基礎上進行封裝的multiprocess.Manager.Queue()方法,如果要使用Pool創建進程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否則會得到一條如下的錯誤信息: RuntimeError: Queue objects should only be shared between processes through inheritance.
接下來我們就藉助進程池來進行多進程操作的改寫,感謝大佬一路輔導。
我們可以看到兩個子線程先執行,然後一個子線程單獨執行,此處有意而為之,讓大家更清晰的了解隊列的使用。期間有一處我們放棄使用jion()方法堵塞,而是自己寫了個循環堵塞,大家根據自己習慣來就好。
話說,真的沒人吐槽么?上面的例子從需求上來講,完全就不需要多線程好不好!emmmm,我們來點實力拓展,寫一個有智商的多線程腳本,順便結合上一節的web來一個綜合篇,隨便找個現實需求吧!
emmm,比如我們來到當當網買書,搜一下我們想要的書籍,發現!!太多了!!真J2亂!!看不過來!!不想翻頁!!直接告訴我哪個便宜、哪個牛逼好不好!!
簡單看下這個url:
http://search.dangdang.com/?key=滲透測試&ddsale=1&page_index=2
其中ddsale參數代表當當自營,page_index代表頁數,key代表搜索內容,我們本次的變數只有頁數。
所以我們構造請求的url為:
'http://search.dangdang.com/?key=滲透測試&ddsale=1&page_index='+str(page)
如果修改的內容不使用str字元串轉化,會收到如下報錯:
TypeError: can only concatenate str (not "int") to str
然後我們看一下頁面內容的分布情況,本次我們關心賣什麼書,賣多少錢?
對應的編寫我們的正則匹配規則,當然了,有更簡便的第三方庫可以幫我們處理,但為了更好的形成流程性認識,我們這里依然使用正則。
我們對應我們需要的書籍名稱和當前價格匹配如下:
<a title=" (.*?)" ddclick=
<span class="search_now_price">¥(.*?)</span>
那麼,思路理清了,我們就開始使用多線程來寫我們的小系統~
然後我們去查看一下我們的結果文件~
現在這個小系統具備的功能就是根據用戶需要選擇要檢索的書籍,然後整理下名稱和價格,開了10個線程,如果小夥伴pc給力的話可以繼續加。簡單的異常處理機制和界面交互,基本滿足日常所需。
⑹ python進程和線程中的join方法
python中創建進程的方式
一、Process(target=函數名,args=(),name,kwargs)
target:加進程調用的函數名,一般不加括弧
name:進程的名字
kwargs:字典參數
args:元組參數,如果參數就一個,記得加逗號』,』空察
Python多線程與多進程中join()方法的效果是相同的
join所完成的工作就是線程同步,即主線程任務結束之後,進入阻塞狀態友渣,一好虧悄直等待其他的子線程執行結束之後,主線程再終止
import threading
import time
⑺ python threading 一定要 join 嗎
Join的作用是眾所周知的,阻塞進程直到線程執行完畢。通用的做法是我們啟動一批線程,最後join這些線程結束,例如:
foriinrange(10):
t=ThreadTest(i)
thread_arr.append(t)
foriinrange(10):
thread_arr[i].start()
foriinrange(10):
thread_arr[i].join()
此處join的原理就是依次檢驗線程池中的線程是否結束,沒有結束就阻塞直到線程結束,如果結束則跳轉執行下一個線程的join函數。
而py的join函數還有一個特殊的功能就是可以設置超時,如下:
Thread.join([timeout])
Wait until the thread terminates. This blocks the calling thread until the thread whosejoin()method is called terminates – either normally or through an unhandled exception – or until the optional timeout occurs.
也就是通過傳給join一個參數來設置超時,也就是超過指定時間join就不在阻塞進程。而在實際應用測試的時候發現並不是所有的線程在超時時間內都結束的,而是順序執行檢驗是否在time_out時間內超時,例如,超時時間設置成2s,前面一個線程在沒有完成的情況下,後面線程執行join會從上一個線程結束時間起再設置2s的超時。
⑻ python 多進程
基於官方文檔:
https://docs.python.org/zh-cn/3/library/multiprocessing.html
日樂購,剛才看到的一個博客,寫的都不太對,還是基於官方的比較穩妥
我就是喜歡抄官方的,哈哈
通常我們使用Process實例化一個進程,並調用 他的 start() 方法啟動它。
這種方法和 Thread 是一樣的。
上圖中,我寫了 p.join() 所以主進程是 等待 子進程執行完後,才執行 print("運行結束")
否則就是反過來了(這個不一定,看你的語句了,順序其實是隨機的)例如:
主進加個 sleep
所以不加join() ,其實子進程和主進程是各干各的,誰也不等誰。都執行完後,文件運行就結束了
上面我們用了 os.getpid() 和 os.getppid() 獲取 當前進程,和父進程的id
下面就講一下,這兩個函數的用法:
os.getpid()
返回當前進程的id
os.getppid()
返回父進程的id。 父進程退出後,unix 返回初始化進程(1)中的一個
windows返回相同的id (可能被其他進程使用了)
這也就解釋了,為啥我上面 的程序運行多次, 第一次列印的parentid 都是 14212 了。
而子進程的父級 process id 是調用他的那個進程的 id : 1940
視頻筆記:
多進程:使用大致方法:
參考: 進程通信(pipe和queue)
pool.map (函數可以有return 也可以共享內存或queue) 結果直接是個列表
poll.apply_async() (同map,只不過是一個進程,返回結果用 xx.get() 獲得)
報錯:
參考 : https://blog.csdn.net/xiemanR/article/details/71700531
把 pool = Pool() 放到 if name == " main ": 下面初始化搞定。
結果:
這個肯定有解釋的
測試多進程計算效果:
進程池運行:
結果:
普通計算:
我們同樣傳入 1 2 10 三個參數測試:
其實對比下來開始快了一半的;
我們把循環里的數字去掉一個 0;
單進程:
多進程:
兩次測試 單進程/進程池 分別為 0.669 和 0.772 幾乎成正比的。
問題 二:
視圖:
post 視圖裡面
Music 類:
直接報錯:
寫在 類裡面也 在函數里用 self.pool 調用也不行,也是相同的錯誤。
最後 把 pool = Pool 直接寫在 search 函數裡面,奇跡出現了:
前台也能顯示搜索的音樂結果了
總結一點,進程這個東西,最好 寫在 直接運行的函數裡面,而不是 一個函數跳來跳去。因為最後可能 是在子進程的子進程運行的,這是不許的,會報錯。
還有一點,多進程運行的函數對象,不能是 lambda 函數。也許lambda 虛擬,在內存??
使用 pool.map 子進程 函數報錯,導致整個 pool 掛了:
參考: https://blog.csdn.net/hedongho/article/details/79139606
主要你要,對函數內部捕獲錯誤,而不能讓異常拋出就可以了。
關於map 傳多個函數參數
我一開始,就是正常思維,多個參數,搞個元祖,讓參數一一對應不就行了:
報錯:
參考:
https://blog.csdn.net/qq_15969343/article/details/84672527
普通的 process 當讓可以穿多個參數,map 卻不知道咋傳的。
apply_async 和map 一樣,不知道咋傳的。
最簡單的方法:
使用 starmap 而不是 map
結果:
子進程結束
1.8399453163146973
成功拿到結果了
關於map 和 starmap 不同的地方看源碼:
關於apply_async() ,我沒找到多參數的方法,大不了用 一個迭代的 starmap 實現。哈哈
關於 上面源碼裡面有 itertools.starmap
itertools 用法參考:
https://docs.python.org/zh-cn/3/library/itertools.html#itertool-functions
有個問題,多進程最好不要使用全部的 cpu , 因為這樣可能影響其他任務,所以 在進程池 添加 process 參數 指定,cpu 個數:
上面就是預留了 一個cpu 干其他事的
後面直接使用 Queue 遇到這個問題:
解決:
Manager().Queue() 代替 Queue()
因為 queue.get() 是堵塞型的,所以可以提前判斷是不是 空的,以免堵塞進程。比如下面這樣:
使用 queue.empty() 空為True
⑼ python多進程報錯load_eof
python 多進程報錯(創建運行多進程)
簡單說一下python的多進程包multiprocessing。藉助這個喚仿包,可以輕松完成從單進程到並發執行的轉換。multiprocessing支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
創建進程的類:Process([group [, target [, name [, args [, kwargs]]]]]),target表示調用對象,args表示調用對象的位置參數元組。kwargs表示調用對象的字典。name為別名。group實質上不使用。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()啟動某個進程。
屬性:authkey、daemon(要通和敗纖過start()設置)、exitcode(進程在運行時為None、如果為–N,表示被信號N結束)、name、pid。其中daemon是父進程終止後自動終止,且自己不能產生新進程,必須在start()之前設置。
1.當我們希望使用python創建一個多進程運行時,碰到下面的報錯提示:
2.解決的方法很簡單,只需要將你的代碼放到 if __name__ == "__main__"枯亂下面,如下圖:
3.下面是測試多進程運行的程序。
import time
import random
from multiprocessing import Process
def run(name):
print(f' 開始運行 {name} 進程...')
# 睡眠一個1~5的隨機數,做進程對比
time.sleep(random.randrange(1,5))
print(f' {name} 進程運行結束。')
if __name__ == "__main__":
p1 = Process(target=run, args=('my_jcy',)) # 必須加,號
p2 = Process(target=run, args=('my_mm',)) # 必須加,號
p1.start()
p2.start()
print('這里是主進程,已結束!')
⑽ Python:進程(threading)
這里是自己寫下關於 Python 跟進程相關的 threading 模塊的一點筆記,跟有些跟 Linux 調用挺像的,有共通之處。
https://docs.python.org/3/library/threading.html?highlight=threading#thread-objects
直接傳入
繼承 Thread 重寫 run 方法
threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
group 線程組,未實現
start() 線程就緒
join([timeout]) 阻塞其他線程,直到調用這方法的進程結束或時間到達
RuntimeError: cannot join thread before it is started
get/setName(name) 獲取/設置線程名。
isAlive() 返回線程是否在運行。
is/setDaemon(bool): 獲取/設置是後台線程(默認前台線程(False))。(在start之前設置)
The entire Python program exits when no alive non-daemon threads are left.
沒有非後台進程運行,Python 就退出。
主線程執行完畢後,後台線程不管是成功與否,主線程均停止
t.start()
t.join()
start() 後 join() 會順序執行,失去線程意義
https://docs.python.org/3/library/threading.html?#lock-objects
Lock屬於全局,Rlock屬於線程(R的意思是可重入,線程用Lock的話會死鎖,來看例子)
acquire(blocking=True, timeout=-1) 申請鎖,返回申請的結果
release() 釋放鎖,沒返回結果
https://docs.python.org/3/library/threading.html#condition-objects
可以在構造時傳入rlock lock實例,不然自己生成一個。
acquire([timeout])/release(): 與lock rlock 相同
wait([timeout]): 調用這個方法將使線程進入等待池,並釋放鎖。調用方法前線程必須已獲得鎖定,否則將拋出異常。
notify(): 調用這個方法將從等待池挑選一個線程並通知,收到通知的線程將自動調用acquire()嘗試獲得鎖定(進入鎖定池);其他線程仍然在等待池中。調用這個方法不會釋放鎖定。調用方法前線程必須已獲得鎖定,否則將拋出異常。
notifyAll(): 調用這個方法將通知等待池中所有的線程,這些線程都將進入鎖定池嘗試獲得鎖定。調用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。
threading.Semaphore(value=1)
https://docs.python.org/3/library/threading.html#semaphore-objects
acquire(blocking=True, timeout=None)
資源數大於0,減一並返回,等於0時等待,blocking為False不阻塞進程
返回值是申請結果
release()
資源數加1
https://docs.python.org/3/library/threading.html#event-objects
事件內置了一個初始為False的標志
is_set() 返回內置標志的狀態
set() 設為True
clear() 設為False
wait(timeout=None) 阻塞線程並等待,為真時返回。返回值只會在等待超時時為False,其他情況為True
https://docs.python.org/3/library/threading.html#timer-objects
threading.Timer(interval, function, args=None, kwargs=None)
第一個參數是時間間隔,單位是秒,整數或者浮點數,負數不會報錯直接執行不等待
可以用cancel() 取消
https://docs.python.org/3/library/threading.html#barrier-objects
threading.Barrier(parties, action=None, timeout=None)
調用的進程數目達到第一個設置的參數就喚醒全部進程
wait(timeout=None)
reset() 重置,等待中的進程收到 BrokenBarrierError 錯誤