pythonasyncio
❶ python 期物
期物(Future)是concurrent.futures模塊和asyncio包的重要組件。
python3.4之後標准庫中有兩個名為Future的類:concurrent.futures.Future和asyncio.Future.
這兩個類的作用相同:類的實例都表示可能已經完成活著尚未完成的延遲計算。與JS庫中的Promise對象,Tornado框架中的Future類類似。
通常我們自己不應該創建期物,而只能由並發框架實例化。
這個例子中的future.result方法不會阻塞,因為future對象是有as_completed方法產生的。
在asyncio包中,BaseEventLoop.create_task(...)方法接收一個協程,排定他的運行時間,然後返回一個asyncio.Task實例(也是asyncio.Future類的實例),因為Task是Future的子類,用於包裝協程。這與Executor.submit(...)方法創建concurrent.futures.Future實例是一個道理。
因為asyncio.Future類的目的是與yield from一起使用,所以通常不需用使用以下方法:
asyncio.async(coro_or_future, *, loop=None)
這個函數統一了協程和期物:第一個參數可以是二者中的任何一個。如果是 Future或 Task 對象,那就原封不動地返回。如果是協程,那麼 async 函數會調用loop.create_task(...) 方法創建 Task 對象。loop= 關鍵字參數是可選的,用於傳入事件循環;如果沒有傳入,那麼 async 函數會通過調用 asyncio.get_event_loop() 函數獲取循環對象.
不過,在asyncio 中,基本的流程是一樣的:在一個單線程程序中使用主循環依次激活隊列里的協程。各個協程向前執行幾步,然後把控制權讓給主循環,主循環再激活隊列里的下一個協程。
asyncio.wait(...) 協程的參數是一個由期物或者協程構成的可迭代對象。wait會分別把各個協程包裝進入一個Task對象。最後的結果是,wait處理的所有對象都通過某種方法變成Future實例。wait是協程函數,因此返回的是一個協程或者生成器對象。為了驅動協程,我們把協程傳給loop.run_until_complete(...)方法。
❷ 同步、非同步(gevent,asyncio)、多線程(threading)效率對比
對比了三種情況下採集50個網頁所需時間,可以看出多線程在效率上是遠高於gevent的。第一次測試的時候,沒有使用monkey這個補丁,socket是阻塞調用的,效率並沒有提升,因為還是同步運行的,使用monkey補丁後,使socket變為協作運行,效率大大提升。
同步執行:4.50s
gevent非同步:0.47s
threading多線程:0.58s
更新asyncio: 1.1s
gevent並不是同時執行,還是按順序執行,並未打亂輸出結果,多線程不按順序執行,打亂了輸出結果。網路狀況好(IO操作延時低)的情況下,gevent能稍微提高點效率,IO操作很費時的情況gevent效率將大大提高,效率最高的還是多線程。
gevent:當一個greenlet遇到IO操作時,比如訪問網路,就自動切換到其他的greenlet,等到IO操作完成,再在適當的時候切換回來繼續執行。由於IO操作非常耗時,經常使程序處於等待狀態,有了gevent為我們自動切換協程,就保證總有greenlet在運行,而不是等待IO。 gevent - 廖雪峰的官方網站
同步:19065ms
gevent:1555ms
threading:1166ms
增加python3.5下的asyncio測試,時間比gevent和threading長一點,1.1s
❸ Python非同步編程全攻略
如果你厭倦了多線程,不妨試試python的非同步編程,再引入async, await關鍵字之後語法變得更加簡潔和直觀,又經過幾年的生態發展,現在是一個很不錯的並發模型。
下面介紹一下python非同步編程的方方面面。
因為GIL的存在,所以Python的多線程在CPU密集的任務下顯得無力,但是對於IO密集的任務,多線程還是足以發揮多線程的優勢的,而非同步也是為了應對IO密集的任務,所以兩者是一個可以相互替代的方案,因為設計的不同,理論上非同步要比多線程快,因為非同步的花銷更少, 因為不需要額外系統申請額外的內存,而線程的創建跟系統有關,需要分配一定量的內存,一般是幾兆,比如linux默認是8MB。
雖然非同步很好,比如可以使用更少的內存,比如更好地控制並發(也許你並不這么認為:))。但是由於async/await 語法的存在導致與之前的語法有些割裂,所以需要適配,需要付出額外的努力,再者就是生態遠遠沒有同步編程強大,比如很多庫還不支持非同步,所以你需要一些額外的適配。
為了不給其他網站帶來困擾,這里首先在自己電腦啟動web服務用於測試,代碼很簡單。
本文所有依賴如下:
所有依賴可通過代碼倉庫的requirements.txt一次性安裝。
首先看一個錯誤的例子
輸出如下:
發現花費了3秒,不符合預期呀。。。。這是因為雖然用了協程,但是每個協程是串列的運行,也就是說後一個等前一個完成之後才開始,那麼這樣的非同步代碼並沒有並發,所以我們需要讓這些協程並行起來
為了讓代碼變動的不是太多,所以這里用了一個笨辦法來等待所有任務完成, 之所以在main函數中等待是為了不讓ClientSession關閉, 如果你移除了main函數中的等待代碼會發現報告異常 RuntimeError: Session is closed ,而代碼里的解決方案非常的不優雅,需要手動的等待,為了解決這個問題,我們再次改進代碼。
這里解決的方式是通過 asyncio.wait 方法等待一個協程列表,默認是等待所有協程結束後返回,會返回一個完成(done)列表,以及一個待辦(pending)列表。
如果我們不想要協程對象而是結果,那麼我們可以使用 asyncio.gather
結果輸出如下:
通過 asyncio.ensure_future 我們就能創建一個協程,跟調用一個函數差別不大,為了等待所有任務完成之後退出,我們需要使用 asyncio.wait 等方法來等待,如果只想要協程輸出的結果,我們可以使用 asyncio.gather 來獲取結果。
雖然前面能夠隨心所欲的創建協程,但是就像多線程一樣,我們也需要處理協程之間的同步問題,為了保持語法及使用情況的一致,多線程中用到的同步功能,asyncio中基本也能找到, 並且用法基本一致,不一致的地方主要是需要用非同步的關鍵字,比如 async with/ await 等
通過鎖讓並發慢下來,讓協程一個一個的運行。
輸出如下:
通過觀察很容易發現,並發的速度因為鎖而慢下來了,因為每次只有一個協程能獲得鎖,所以並發變成了串列。
通過事件來通知特定的協程開始工作,假設有一個任務是根據http響應結果選擇是否激活。
輸出如下:
可以看到事件(Event)等待者都是在得到響應內容之後輸出,並且事件(Event)可以是多個協程同時等待。
上面的事件雖然很棒,能夠在不同的協程之間同步狀態,並且也能夠一次性同步所有的等待協程,但是還不夠精細化,比如想通知指定數量的等待協程,這個時候Event就無能為力了,所以同步原語中出現了Condition。
輸出如下:
可以看到,前面兩個等待的協程是在同一時刻完成,而不是全部等待完成。
通過創建協程的數量來控制並發並不是非常優雅的方式,所以可以通過信號量的方式來控制並發。
輸出如下:
可以發現,雖然同時創建了三個協程,但是同一時刻只有兩個協程工作,而另外一個協程需要等待一個協程讓出信號量才能運行。
無論是協程還是線程,任務之間的狀態同步還是很重要的,所以有了應對各種同步機制的同步原語,因為要保證一個資源同一個時刻只能一個任務訪問,所以引入了鎖,又因為需要一個任務等待另一個任務,或者多個任務等待某個任務,因此引入了事件(Event),但是為了更精細的控制通知的程度,所以又引入了條件(Condition), 通過條件可以控制一次通知多少的任務。
有時候的並發需求是通過一個變數控制並發任務的並發數而不是通過創建協程的數量來控制並發,所以引入了信號量(Semaphore),這樣就可以在創建的協程數遠遠大於並發數的情況下讓協程在指定的並發量情況下並發。
不得不承認非同步編程相比起同步編程的生態要小的很多,所以不可能完全非同步編程,因此需要一種方式兼容。
多線程是為了兼容同步得代碼。
多進程是為了利用CPU多核的能力。
輸出如下:
可以看到總耗時1秒,說明所有的線程跟進程是同時運行的。
下面是本人使用過的一些非同步庫,僅供參考
web框架
http客戶端
資料庫
ORM
雖然非同步庫發展得還算不錯,但是中肯的說並沒有覆蓋方方面面。
雖然我鼓勵大家嘗試非同步編程,但是本文的最後卻是讓大家謹慎的選擇開發環境,如果你覺得本文的並發,同步,兼容多線程,多進程不值得一提,那麼我十分推薦你嘗試以非同步編程的方式開始一個新的項目,如果你對其中一些還有疑問或者你確定了要使用的依賴庫並且大多數是沒有非同步庫替代的,那麼我還是建議你直接按照自己擅長的同步編程開始。
非同步編程雖然很不錯,不過,也許你並不需要。
❹ 詳解Python中的協程,為什麼說它的底層是生成器
協程又稱為是微線程,英文名是Coroutine。它和線程一樣可以調度,但是不同的是線程的啟動和調度需要通過操作系統來處理。並且線程的啟動和銷毀需要涉及一些操作系統的變數申請和銷毀處理,需要的時間比較長。而協程呢,它的調度和銷毀都是程序自己來控制的,因此它更加輕量級也更加靈活。
協程有這么多優點,自然也會有一些缺點,其中最大的缺點就是需要編程語言自己支持,否則的話需要開發者自己通過一些方法來實現協程。對於大部分語言來說,都不支持這一機制。go語言由於天然支持協程,並且支持得非常好,使得它廣受好評,短短幾年時間就迅速流行起來。
對於Python來說,本身就有著一個GIL這個巨大的先天問題。GIL是Python的全局鎖,在它的限制下一個Python進程同一時間只能同時執行一個線程,即使是在多核心的機器當中。這就大大影響了Python的性能,尤其是在CPU密集型的工作上。所以為了提升Python的性能,很多開發者想出了使用多進程+協程的方式。一開始是開發者自行實現的,後來在Python3.4的版本當中,官方也收入了這個功能,因此目前可以光明正大地說,Python是支持協程的語言了。
生成器(generator)
生成器我們也在之前的文章當中介紹過,為什麼我們介紹協程需要用到生成器呢,是因為Python的協程底層就是通過生成器來實現的。
通過生成器來實現協程的原因也很簡單,我們都知道協程需要切換掛起,而生成器當中有一個yield關鍵字,剛好可以實現這個功能。所以當初那些自己在Python當中開發協程功能的程序員都是通過生成器來實現的,我們想要理解Python當中協程的運用,就必須從最原始的生成器開始。
生成器我們很熟悉了,本質上就是帶有yield這個關鍵詞的函數。
async,await和future
從Python3.5版本開始,引入了async,await和future。我們來簡單說說它們各自的用途,其中async其實就是@asyncio.coroutine,用途是完全一樣的。同樣await代替的是yield from,意為等待另外一個協程結束。
我們用這兩個一改,上面的代碼就成了:
async def test(k):
n = 0
while n < k:
await asyncio.sleep(0.5)
print('n = {}'.format(n))
n += 1
由於我們加上了await,所以每次在列印之前都會等待0.5秒。我們把await換成yield from也是一樣的,只不過用await更加直觀也更加貼合協程的含義。
Future其實可以看成是一個信號量,我們創建一個全局的future,當一個協程執行完成之後,將結果存入這個future當中。其他的協程可以await future來實現阻塞。我們來看一個例子就明白了:
future = asyncio.Future()
async def test(k):
n = 0
while n < k:
await asyncio.sleep(0.5)
print('n = {}'.format(n))
n += 1
future.set_result('success')
async def log():
result = await future
print(result)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
log(),
test(5)
]))
loop.close()
在這個例子當中我們創建了兩個協程,第一個協程是每隔0.5秒print一個數字,在print完成之後把success寫入到future當中。第二個協程就是等待future當中的數據,之後print出來。
在loop當中我們要調度執行的不再是一個協程對象了而是兩個,所以我們用asyncio當中的wait將這兩個對象包起來。只有當wait當中的兩個對象執行結束,wait才會結束。loop等待的是wait的結束,而wait等待的是傳入其中的協程的結束,這就形成了一個依賴循環,等價於這兩個協程對象結束,loop才會結束。
總結
async並不只是可以用在函數上,事實上還有很多其他的用法,比如用在with語句上,用在for循環上等等。這些用法比較小眾,細節也很多,就不一一展開了,大家感興趣的可以自行去了解一下。
不知道大家在讀這篇文章的過程當中有沒有覺得有些費勁,如果有的話,其實是很正常的。原因也很簡單,因為Python原生是不支持協程這個概念的,所以在一開始設計的時候也沒有做這方面的准備,是後來覺得有必要才加入的。那麼作為後面加入的內容,必然會對原先的很多內容產生影響,尤其是協程藉助了之前生成器的概念來實現的,那麼必然會有很多耦合不清楚的情況。這也是這一塊的語法很亂,對初學者不友好的原因。
❺ 在Python中使用Asyncio系統(3-4)Task 和 Future
Task 和 Future
前面我們討論了協程,以及如何在循環中運行它們才有用。現在我想簡單談談Task和Future api。你將使用最多的是Task,因為你的大部分工作將涉及使用create_task()函數運行協程,就像在第22頁的「快速開始」中設置的那樣。Future類實際上是Task的超類,它提供了與循環交互操作的所有功能。
可以這樣簡單地理解:Future表示某個活動的未來完成狀態,並由循環管理。Task是完全相同的,但是具體的「activity」是一個協程——可能是你用async def函數加上create_task()創建的協程。
Future類表示與循環交互的某個東西的狀態。這個描述太模糊了,不太有用,所以你可以將Future實例視為一個切換器,一個完成狀態的切換器。當創建Future實例時,切換設置為「尚未完成」狀態,但稍後它將是「完成」狀態。事實上,Future實例有一個名為done()的方法,它允許你檢查狀態,如示例 3-15所示。
示例 3-15. 用done()方法檢查完成狀態
Future實例還可以執行以下操作:
• 設置一個result值(用.set_result(value)設置值並且使用 .result()獲取值)
• 使用.cancel()方法取消 (並且會用使用.cancelled()檢查是否取消)
• 增加一個Future完成時回調的函數
即使Task更常見,也不可能完全避免使用Future:例如,在執行器上運行函數將返回Future實例,而不是Task。讓我們快速看一下 示例 3-16 ,了解一下直接使用Future實例是什麼感覺。
示例 3-16. 與Future實例的交互
(L3)創建一個簡單的 main函數。我們運行這個函數,等上一會兒然後在Future f上設置一個結果。
(L5)設置一個結果。
(L8)手動創建一個Future實例。注意,這個實例(默認情況下)綁定到我們的循環,但它沒有也不會被附加到任何協程(這就是Tasks的作用)。
(L9)在做任何事情之前,確認future還沒有完成。
(L11)安排main()協程,傳遞future。請記住,main()協程所做的所有工作就是sleep,然後切換Future實例。(注意main()協程還不會開始運行:協程只在事件循環運行時才開始運行。)
(L13)在這里我們在Future實例上而不是Task實例上使用run_until_complete()。這和你以前見過的不一樣。現在循環正在運行,main()協程將開始執行.
(L16)最終,當future的結果被設置時,它就完成了。完成後,可以訪問結果。
當然,你不太可能以這里所示的方式直接使用Future;代碼示例僅用於教育目的。你與asynccio的大部分聯系都是通過Task實例進行的。
你可能想知道如果在Task實例上調用set_result()會發生什麼。在Python 3.8之前可以這樣做,但現在不允許這么做了。任務實例是協程對象的包裝器,它們的結果值只能在內部設置為底層協程函數的結果,如 示例 3-17所示那樣。
示例 3-17. 在task上調用set_result
(L13)唯一的區別是我們創建的是Task實例而不是Future實例。當然,Task API要求我們提供一個協程;這里我們使用sleep()只是因為簡單方便。
(L7)正在傳入一個Task實例。它滿足函數的類型簽名(因為Task是Future的子類),但從Python 3.8開始,我們不再允許在Task上調用set_result():嘗試這樣做將引發RuntimeError。這個想法是,一個Task代表一個正在運行的協程,所以結果應該總是來自於task自身。
(L10, L24)但是,我們仍然可以cancel()一個任務,它將在底層協程中引發CancelledError。
Create_task? Ensure_Future? 下定決心吧!
在第22頁的「快速入門」中,我說過運行協程的方法是使用asyncio.create_task()。在引入該函數之前,有必要獲取一個循環實例並使用loop.create_task()完成相同的任務。事實上,這也可以通過一個不同的模塊級函數來實現:asyncio.ensure_future()。一些開發人員推薦create_task(),而其他人推薦ensure_future()。
在我為這本書做研究的過程中,我確信API方法asyncio.ensure_future()是引起對asyncio庫廣泛誤解的罪魁禍首。API的大部分內容都非常清晰,但在學習過程中還存在一些嚴重的障礙,這就是其中之一。當你遇到ensure_future()時,你的大腦會非常努力地將其集成到關於asyncio應該如何使用的心理模型中——但很可能會失敗!
在Python 3.6 asyncio 文檔中,這個現在已經臭名昭著的解釋突出了 ensure_future() 的問題:
asyncio.ensure_future(coro_or_future, *, _loop =None)
安排執行一個協程對象:把它包裝在future中。返回一個Task對象。如果參數是Future,則直接返回。
什麼!? 當我第一次讀到這篇文章時,我很困惑。下面希望是對ensure_future()的更清楚的描述:
這個函數很好地說明了針對終端用戶開發人員的asyncio API(高級API)和針對框架設計人員的asyncio API(低級API)之間的區別。讓我們在示例 3-18中自習看看它是如何工作的。
示例 3-18. 仔細看看ensure_future()在做什麼
(L3)一個簡單的什麼都不做的協程函數。我們只需要一些能組成協程的東西。
(L6)我們通過直接調用該函數來創建協程對象。你的代碼很少會這樣做,但我想在這里明確地表示,我們正在向每個create_task()和ensure_future()傳遞一個協程對象。
(L7)獲取一個循環。
(L9)首先,我們使用loop.create_task()在循環中調度協程,並返回一個新的Task實例。
(L10)驗證類型。到目前為止,沒有什麼有趣的。
(L12)我們展示了asyncio.ensure_future()可以被用來執行與create_task()相同的動作:我們傳入了一個協程,並返回了一個Task實例(並且協程已經被安排在循環中運行)!如果傳入的是協程,那麼loop.create_task()和asyncio.ensure_future()之間沒有區別。
(L15)如果我們給ensure_future()傳遞一個Task實例會發生什麼呢?注意我們要傳遞的Task實例是已經在第4步通過loop.create_task()創建好的。
(L16)返回的Task實例與傳入的Task實例完全相同:它在被傳遞時沒有被改變。
直接傳遞Future實例的意義何在?為什麼用同一個函數做兩件不同的事情?答案是,ensure_future()的目的是讓框架作者向最終用戶開發者提供可以處理兩種參數的API。不相信我?這是ex-BDFL自己說的:
ensure_future()的要點是,如果你有一個可能是協程或Future(後者包括一個Task,因為它是Future的子類)的東西,並且你想能夠調用一個只在Future上定義的方法(可能唯一有用的例子是cancel())。當它已經是Future(或Task)時,它什麼也不做;當它是協程時,它將它包裝在Task中。
如果您知道您有一個協程,並且希望它被調度,那麼正確的API是create_task()。唯一應該調用ensure_future()的時候是當你提供一個API(像大多數asyncio自己的API),它接受協程或Future,你需要對它做一些事情,需要你有一個Future。
—Guido van Rossum
總而言之,asyncio.sure_future()是一個為框架設計者准備的輔助函數。這一點最容易通過與一種更常見的函數進行類比來解釋,所以我們來做這個解釋。如果你有幾年的編程經驗,你可能已經見過類似於例3-19中的istify()函數的函數。示例 3-19中listify()的函數。
示例 3-19. 一個強制輸入列表的工具函數
這個函數試圖將參數轉換為一個列表,不管輸入的是什麼。api和框架中經常使用這類函數將輸入強制轉換為已知類型,這將簡化後續代碼——在本例中,您知道參數(來自listify()的輸出)將始終是一個列表。
如果我將listify()函數重命名為ensure_list(),那麼您應該開始看到與asyncio.ensure_future()的類似之處:它總是試圖將參數強制轉換為Future(或子類)類型。這是一個實用函數,它使框架開發人員(而不是像你我這樣的終端用戶開發人員)的工作變得更容易。
實際上,asyncio標准庫模塊本身使用ensure_future()正是出於這個原因。當你下次查看API時,你會發現函數參數被描述為「可等待對象」,很可能內部使用ensure_future()強制轉換參數。例如,asyncio.gather()函數就像下面的代碼一樣:
aws參數表示「可等待對象」,包括協程、task和future。在內部,gather()使用ensure_future()進行類型強制轉換:task和future保持不變,而把協程強制轉為task。
這里的關鍵是,作為終端用戶應用程序開發人員,應該永遠不需要使用asyncio.ensure_future()。它更像是框架設計師的工具。如果你需要在事件循環上調度協程,只需直接使用asyncio.create_task()來完成。
在接下來的幾節中,我們將回到語言級別的特性,從非同步上下文管理器開始。
❻ Python怎麼多線程中添加協程
由於python是一種解釋性腳本語言,python的多線程在運行過程中始終存在全局線程鎖。
簡單的來說就是在實際的運行過程中,python只能利用一個線程,因此python的多線程並不達到C語言多線程的性能。
可以使用多進程來代替多線程,但需要注意的是多進程最好不要涉及到例如文件操作的頻繁操作IO的功能。
❼ python 通過 asyncio 同時使用 flask (quart)與 websocket
因需要同時使用作為服務端flask (quart)使用客戶端使用websocket
參考環境
python3.8
websockets~=10.3
quart~=0.17.0
❽ python協程(4):asyncio
asyncio是官方提供的協程的類庫,從python3.4開始支持該模塊
async & awiat是python3.5中引入的關鍵字,使用async關鍵字可以將一個函數定義為協程函數,使用awiat關鍵字可以在遇到IO的時候掛起當前協程(也就是任務),去執行其他協程。
await + 可等待的對象(協程對象、Future對象、Task對象 -> IO等待)
注意:在python3.4中是通過asyncio裝飾器定義協程,在python3.8中已經移除了asyncio裝飾器。
事件循環,可以把他當做是一個while循環,這個while循環在周期性的運行並執行一些協程(任務),在特定條件下終止循環。
loop = asyncio.get_event_loop():生成一個事件循環
loop.run_until_complete(任務):將任務放到事件循環
Tasks用於並發調度協程,通過asyncio.create_task(協程對象)的方式創建Task對象,這樣可以讓協程加入事件循環中等待被調度執行。除了使用 asyncio.create_task() 函數以外,還可以用低層級的 loop.create_task() 或 ensure_future() 函數。不建議手動實例化 Task 對象。
本質上是將協程對象封裝成task對象,並將協程立即加入事件循環,同時追蹤協程的狀態。
注意:asyncio.create_task() 函數在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用 asyncio.ensure_future() 函數。
下面結合async & awiat、事件循環和Task看一個示例
示例一:
*注意:python 3.7以後增加了asyncio.run(協程對象),效果等同於loop = asyncio.get_event_loop(),loop.run_until_complete(協程對象) *
示例二:
注意:asyncio.wait 源碼內部會對列表中的每個協程執行ensure_future從而封裝為Task對象,所以在和wait配合使用時task_list的值為[func(),func()] 也是可以的。
示例三:
❾ python2.7怎麼實現非同步
改進之前
之前,我的查詢步驟很簡單,就是:
前端提交查詢請求 --> 建立資料庫連接 --> 新建游標 --> 執行命令 --> 接受結果 --> 關閉游標、連接
這幾大步驟的順序執行。
這裡面當然問題很大:
建立資料庫連接實際上就是新建一個套接字。這是進程間通信的幾種方法里,開銷最大的了。
在「執行命令」和「接受結果」兩個步驟中,線程在阻塞在資料庫內部的運行過程中,資料庫連接和游標都處於閑置狀態。
這樣一來,每一次查詢都要順序的新建資料庫連接,都要阻塞在資料庫返回結果的過程中。當前端提交大量查詢請求時,查詢效率肯定是很低的。
第一次改進
之前的模塊里,問題最大的就是第一步——建立資料庫連接套接字了。如果能夠一次性建立連接,之後查詢能夠反復服用這個連接就好了。
所以,首先應該把資料庫查詢模塊作為一個單獨的守護進程去執行,而前端app作為主進程響應用戶的點擊操作。那麼兩條進程怎麼傳遞消息呢?翻了幾天Python文檔,終於構思出來:用隊列queue作為生產者(web前端)向消費者(資料庫後端)傳遞任務的渠道。生產者,會與SQL命令一起,同時傳遞一個管道pipe的連接對象,作為任務完成後,回傳結果的渠道。確保,任務的接收方與發送方保持一致。
作為第二個問題的解決方法,可以使用線程池來並發獲取任務隊列中的task,然後執行命令並回傳結果。
第二次改進
第一次改進的效果還是很明顯的,不用任何測試手段。直接點擊頁面鏈接,可以很直觀地感覺到反應速度有很明顯的加快。
但是對於第二個問題,使用線程池還是有些欠妥當。因為,CPython解釋器存在GIL問題,所有線程實際上都在一個解釋器進程里調度。線程稍微開多一點,解釋器進程就會頻繁的切換線程,而線程切換的開銷也不小。線程多一點,甚至會出現「抖動」問題(也就是剛剛喚醒一個線程,就進入掛起狀態,剛剛換到棧幀或內存的上下文,又被換回內存或者磁碟),效率大大降低。也就是說,線程池的並發量很有限。
試過了多進程、多線程,只能在單個線程里做文章了。
Python中的asyncio庫
Python里有大量的協程庫可以實現單線程內的並發操作,比如Twisted、Gevent等等。Python官方在3.5版本里提供了asyncio庫同樣可以實現協程並發。asyncio庫大大降低了Python中協程的實現難度,就像定義普通函數那樣就可以了,只是要在def前面多加一個async關鍵詞。async def函數中,需要阻塞在其他async def函數的位置前面可以加上await關鍵詞。
import asyncio
async def wait():
await asyncio.sleep(2)
async def execute(task):
process_task(task)
await wait()
continue_job()
async def函數的執行稍微麻煩點。需要首先獲取一個loop對象,然後由這個對象代為執行async def函數。
loop = asyncio.get_event_loop()
loop.run_until_complete(execute(task))
loop.close()
loop在執行execute(task)函數時,如果遇到await關鍵字,就會暫時掛起當前協程,轉而去執行其他阻塞在await關鍵詞的協程,從而實現協程並發。
不過需要注意的是,run_until_complete()函數本身是一個阻塞函數。也就是說,當前線程會等候一個run_until_complete()函數執行完畢之後,才會繼續執行下一部函數。所以下面這段代碼並不能並發執行。
for task in task_list:
loop.run_until_complete(task)
對與這個問題,asyncio庫也有相應的解決方案:gather函數。
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(execute(task))
for task in task_list]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
當然了,async def函數的執行並不只有這兩種解決方案,還有call_soon與run_forever的配合執行等等,更多內容還請參考官方文檔。
Python下的I/O多路復用
協程,實際上,也存在上下文切換,只不過開銷很輕微。而I/O多路復用則完全不存在這個問題。
目前,Linux上比較火的I/O多路復用API要算epoll了。Tornado,就是通過調用C語言封裝的epoll庫,成功解決了C10K問題(當然還有Pypy的功勞)。
在Linux里查文檔,可以看到epoll只有三類函數,調用起來比較方便易懂。
創建epoll對象,並返回其對應的文件描述符(file descriptor)。
int epoll_create(int size);
int epoll_create1(int flags);
控制監聽事件。第一個參數epfd就對應於前面命令創建的epoll對象的文件描述符;第二個參數表示該命令要執行的動作:監聽事件的新增、修改或者刪除;第三個參數,是要監聽的文件對應的描述符;第四個,代表要監聽的事件。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
等候。這是一個阻塞函數,調用者會等候內核通知所注冊的事件被觸發。
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
int epoll_pwait(int epfd, struct epoll_event *events,
int maxevents, int timeout,
const sigset_t *sigmask);
在Python的select庫里:
select.epoll()對應於第一類創建函數;
epoll.register(),epoll.unregister(),epoll.modify()均是對控制函數epoll_ctl的封裝;
epoll.poll()則是對等候函數epoll_wait的封裝。
Python里epoll相關API的最大問題應該是在epoll.poll()。相比於其所封裝的epoll_wait,用戶無法手動指定要等候的事件,也就是後者的第二個參數struct epoll_event *events。沒法實現精確控制。因此只能使用替代方案:select.select()函數。
根據Python官方文檔,select.select(rlist, wlist, xlist[, timeout])是對Unix系統中select函數的直接調用,與C語言API的傳參很接近。前三個參數都是列表,其中的元素都是要注冊到內核的文件描述符。如果想用自定義類,就要確保實現了fileno()方法。
其分別對應於:
rlist: 等候直到可讀
wlist: 等候直到可寫
xlist: 等候直到異常。這個異常的定義,要查看系統文檔。
select.select(),類似於epoll.poll(),先注冊文件和事件,然後保持等候內核通知,是阻塞函數。
實際應用
Psycopg2庫支持對非同步和協程,但和一般情況下的用法略有區別。普通資料庫連接支持不同線程中的不同游標並發查詢;而非同步連接則不支持不同游標的同時查詢。所以非同步連接的不同游標之間必須使用I/O復用方法來協調調度。
所以,我的大致實現思路是這樣的:首先並發執行大量協程,從任務隊列中提取任務,再向連接池請求連接,創建游標,然後執行命令,並返回結果。在獲取游標和接受查詢結果之前,均要阻塞等候內核通知連接可用。
其中,連接池返回連接時,會根據引用連接的協程數量,返回負載最輕的連接。這也是自己定義AsyncConnectionPool類的目的。
我的代碼位於:bottle-blog/dbservice.py
存在問題
當然了,這個流程目前還一些問題。
首先就是每次輪詢拿到任務之後,都會走這么一個流程。
獲取連接 --> 新建游標 --> 執行任務 --> 關閉游標 --> 取消連接引用
本來,最好的情況應該是:在輪詢之前,就建好游標;在輪詢時,直接等候內核通知,執行相應任務。這樣可以減少輪詢時的任務量。但是如果協程提前對應好連接,那就不能保證在獲取任務時,保持各連接負載均衡了。
所以這一塊,還有工作要做。
還有就是epoll沒能用上,有些遺憾。
以後打算寫點C語言的內容,或者用Python/C API,或者用Ctypes包裝共享庫,來實現epoll的調用。
最後,請允許我吐槽一下Python的epoll相關文檔:簡直太弱了!!!必須看源碼才能弄清楚功能。
❿ Asyncio 協議Protocol 與 傳輸Transport
python在asyncio庫中,提供了一種簡單的網路傳輸模型,協議與傳輸。
協議和傳輸,在socket的基礎上進行了封裝,是更高一層次的應用。
所以說: ASGI伺服器並不是從socket基礎層面實現通信,而是使用了asyncio中原生提供的一種網路通信方式。
Transport 類位於 asyncio.transports 中,有例如 BaseTransport , WriteTransport 只寫, ReadTransport 只讀, Transport 繼承於前兩個只寫和只讀的Transport
位於 asyncio.Protocol
接受protocol_factory,可以調用的工廠函數,其返回一個協議Protocol實例
server 對象是 asyncio.base_events.Server 的實例
我簡單寫了個小例子,使用協議和傳輸,製作一個C/S
為了方便觀看調整了下key順序
可以明確看到,使用了socket,說明socket的建立,已經是封裝到內部的。
s端和c端的socket是完全對應的。
而H11是一個實現 http協議 庫
uvicorn 用了HTTP協議庫做了相應的 Protocol 。交由asyncio提供的網路應用服務處理