非同步任務python
㈠ 學習python需要掌握哪些技術
Python學習路線。
第一階段Python基礎與Linux資料庫。這是Python的入門階段,也是幫助零基礎學員打好基礎的重要階段。你需要掌握Python基本語法規則及變數、邏輯控制、內置數據結構、文件操作、高級函數、模塊、常用標准庫模塊、函數、異常處理、MySQL使用、協程等知識點。
學習目標:掌握Python基礎語法,具備基礎的編程能力;掌握Linux基本操作命令,掌握MySQL進階內容,完成銀行自動提款機系統實戰、英漢詞典、歌詞解析器等項目。
第二階段WEB全棧。這一部分主要學習Web前端相關技術,你需要掌握HTML、CSS、JavaScript、jQuery、BootStrap、Web開發基礎、VUE、Flask Views、Flask模板、 資料庫操作、Flask配置等知識。
學習目標:掌握WEB前端技術內容,掌握WEB後端框架,熟練使用Flask、Tornado、Django,可以完成數據監控後台的項目。
第三階段數據分析+人工智慧。這部分主要是學習爬蟲相關的知識點,你需要掌握數據抓取、數據提取、數據存儲、爬蟲並發、動態網頁抓取、scrapy框架、分布式爬蟲、爬蟲攻防、數據結構、演算法等知識。
學習目標:可以掌握爬蟲、數據採集,數據機構與演算法進階和人工智慧技術。可以完成爬蟲攻防、圖片馬賽克、電影推薦系統、地震預測、人工智慧項目等階段項目。
第四階段高級進階。這是Python高級知識點,你需要學習項目開發流程、部署、高並發、性能調優、Go語言基礎、區塊鏈入門等內容。
學習目標:可以掌握自動化運維與區塊鏈開發技術,可以完成自動化運維項目、區塊鏈等項目。
按照上面的Python學習路線圖學習完後,你基本上就可以成為一名合格的Python開發工程師。當然,想要快速成為企業競聘的精英人才,你需要有好的老師指導,還要有較多的項目積累實戰經驗。
自學本身難度較高,一步一步學下來肯定全面且扎實,如果自己有針對性的想學哪一部分,可以直接跳過暫時不需要的針對性的學習自己需要的模塊,可以多看一些不同的視頻學習。
㈡ python非同步有哪些方式
yield相當於return,他將相應的值返回給調用next()或者send()的調用者,從而交出了CPU使用權,而當調用者再次調用next()或者send()的時候,又會返回到yield中斷的地方,如果send有參數,還會將參數返回給yield賦值的變數,如果沒有就和next()一樣賦值為None。但是這里會遇到一個問題,就是嵌套使用generator時外層的generator需要寫大量代碼,看如下示例:
注意以下代碼均在Python3.6上運行調試
#!/usr/bin/env python# encoding:utf-8def inner_generator():
i = 0
while True:
i = yield i if i > 10: raise StopIterationdef outer_generator():
print("do something before yield")
from_inner = 0
from_outer = 1
g = inner_generator()
g.send(None) while 1: try:
from_inner = g.send(from_outer)
from_outer = yield from_inner except StopIteration: breakdef main():
g = outer_generator()
g.send(None)
i = 0
while 1: try:
i = g.send(i + 1)
print(i) except StopIteration: breakif __name__ == '__main__':
main()041
為了簡化,在Python3.3中引入了yield from
yield from
使用yield from有兩個好處,
1、可以將main中send的參數一直返回給最里層的generator,
2、同時我們也不需要再使用while循環和send (), next()來進行迭代。
我們可以將上邊的代碼修改如下:
def inner_generator():
i = 0
while True:
i = yield i if i > 10: raise StopIterationdef outer_generator():
print("do something before coroutine start") yield from inner_generator()def main():
g = outer_generator()
g.send(None)
i = 0
while 1: try:
i = g.send(i + 1)
print(i) except StopIteration: breakif __name__ == '__main__':
main()
執行結果如下:
do something before coroutine start123456789101234567891011
這里inner_generator()中執行的代碼片段我們實際就可以認為是協程,所以總的來說邏輯圖如下:
我們都知道Python由於GIL(Global Interpreter Lock)原因,其線程效率並不高,並且在*nix系統中,創建線程的開銷並不比進程小,因此在並發操作時,多線程的效率還是受到了很大制約的。所以後來人們發現通過yield來中斷代碼片段的執行,同時交出了cpu的使用權,於是協程的概念產生了。在Python3.4正式引入了協程的概念,代碼示例如下:
import asyncio# Borrowed from http://curio.readthedocs.org/en/latest/[email protected] countdown(number, n):
while n > 0:
print('T-minus', n, '({})'.format(number)) yield from asyncio.sleep(1)
n -= 1loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(countdown("A", 2)),
asyncio.ensure_future(countdown("B", 3))]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()12345678910111213141516
示例顯示了在Python3.4引入兩個重要概念協程和事件循環,
通過修飾符@asyncio.coroutine定義了一個協程,而通過event loop來執行tasks中所有的協程任務。之後在Python3.5引入了新的async & await語法,從而有了原生協程的概念。
async & await
在Python3.5中,引入了aync&await 語法結構,通過」aync def」可以定義一個協程代碼片段,作用類似於Python3.4中的@asyncio.coroutine修飾符,而await則相當於」yield from」。
先來看一段代碼,這個是我剛開始使用async&await語法時,寫的一段小程序。
#!/usr/bin/env python# encoding:utf-8import asyncioimport requestsimport time
async def wait_download(url):
response = await requets.get(url)
print("get {} response complete.".format(url))
async def main():
start = time.time()
await asyncio.wait([
wait_download("http://www.163.com"),
wait_download("http://www.mi.com"),
wait_download("http://www.google.com")])
end = time.time()
print("Complete in {} seconds".format(end - start))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
這里會收到這樣的報錯:
Task exception was never retrieved
future: <Task finished coro=<wait_download() done, defined at asynctest.py:9> exception=TypeError("object Response can't be used in 'await' expression",)>
Traceback (most recent call last):
File "asynctest.py", line 10, in wait_download
data = await requests.get(url)
TypeError: object Response can't be used in 'await' expression123456
這是由於requests.get()函數返回的Response對象不能用於await表達式,可是如果不能用於await,還怎麼樣來實現非同步呢?
原來Python的await表達式是類似於」yield from」的東西,但是await會去做參數檢查,它要求await表達式中的對象必須是awaitable的,那啥是awaitable呢? awaitable對象必須滿足如下條件中其中之一:
1、A native coroutine object returned from a native coroutine function .
原生協程對象
2、A generator-based coroutine object returned from a function decorated with types.coroutine() .
types.coroutine()修飾的基於生成器的協程對象,注意不是Python3.4中asyncio.coroutine
3、An object with an await method returning an iterator.
實現了await method,並在其中返回了iterator的對象
根據這些條件定義,我們可以修改代碼如下:
#!/usr/bin/env python# encoding:utf-8import asyncioimport requestsimport time
async def download(url): # 通過async def定義的函數是原生的協程對象
response = requests.get(url)
print(response.text)
async def wait_download(url):
await download(url) # 這里download(url)就是一個原生的協程對象
print("get {} data complete.".format(url))
async def main():
start = time.time()
await asyncio.wait([
wait_download("http://www.163.com"),
wait_download("http://www.mi.com"),
wait_download("http://www.google.com")])
end = time.time()
print("Complete in {} seconds".format(end - start))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())27282930
好了現在一個真正的實現了非同步編程的小程序終於誕生了。
而目前更牛逼的非同步是使用uvloop或者pyuv,這兩個最新的Python庫都是libuv實現的,可以提供更加高效的event loop。
uvloop和pyuv
pyuv實現了Python2.x和3.x,但是該項目在github上已經許久沒有更新了,不知道是否還有人在維護。
uvloop只實現了3.x, 但是該項目在github上始終活躍。
它們的使用也非常簡單,以uvloop為例,只需要添加以下代碼就可以了
import asyncioimport uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())123
㈢ 如何用python簡單的設計開發非同步任務調度隊列
首先,客戶端可以直接扔任務到一個web services的介面上 –》 web api接收到任務後,會根據客戶端的ip和時間戳做task_id,返回給客戶,緊接著在redis裡面標記這任務的狀態。 格式為 func,args,kwargs,timeout=xx,queue_level=xx,interval_time=xx
主服務端:
一個線程,會不停的掃描那個redis hash表,取出任務的interval_time後,進行取模,如果匹配成功,就會塞到 redis sorted set有續集和裡面。
主線程,會不停的看看sorted set裡面,有沒有比自己實現小的任務,有的話,執行並刪除。 這里的執行是用多進程,為毛用多進程,因為線程很多時候是不好控制強制幹掉的。 每個任務都會用multiprocessing的方式去執行,去調用的時候,會多傳進一個task_id,用來把相關的進度推送到redis裡面。 另外,fork進程後,我會得到一個pid,我會把pid和timeout的信息,存放到kill_hash裡面。 然後會不間斷的查看,在指定的timeout內,這pid還在不在,如果還是存在,沒有退出的話,說明他的任務不太正常,我們就可以在main(),裡面幹掉這些任務。
所謂的優先順序就是個 High + middle +Low 的三合一鏈條而已,我每次都會堅持從高到低取任務,如果你的High級別的任務不斷的話,那麼我會一直幹不了低級別的任務了。 代碼的體現是在redis sorted set這邊,設立三個有序集合,我的worker隊列會從high開始做……
那麼如果想幹掉一個任務是如何操作的,首先我需要在 kill_hash 裡面標記任務應該趕緊幹掉,在就是在task_hash裡面把那個task_id幹掉,好讓他不會被持續的加入待執行的隊列裡面。
㈣ 如何讓celery接受定製的參數
許多Django應用需要執行非同步任務, 以便不耽誤http request的執行. 我們也可以選擇許多方法來完成非同步任務, 使用Celery是一個比較好的選擇, 因為Celery
有著大量的社區支持, 能夠完美的擴展, 和Django結合的也很好. Celery不僅能在Django中使用, 還能在其他地方被大量的使用. 因此一旦學會使用Celery, 我
們可以很方便的在其他項目中使用它.
1. Celery版本
本篇博文主要針對Celery 3.0.x. 早期版本的Celery可能有細微的差別.
2. Celery介紹
Celery的主要用處是執行非同步任務, 可以選擇延期或定時執行功能. 為什麼需要執行非同步任務呢?
第一, 假設用戶正發起一個request, 並等待request完成後返回. 在這一request後面的view功能中, 我們可能需要執行一段花費很長時間的程序任務, 這一時間
可能遠遠大於用戶能忍受的范圍. 當這一任務並不需要立刻執行時, 我們便可以使用Celery在後台執行, 而不影響用戶瀏覽網頁. 當有任務需要訪問遠程伺服器完
成時, 我們往往都無法確定需要花費的時間.
第二則是定期執行某些任務. 比如每小時需要檢查一下天氣預報, 然後將數據儲存到資料庫中. 我們可以編寫這一任務, 然後讓Celery每小時執行一次. 這樣我們
的web應用便能獲取最新的天氣預報信息.
我們這里所講的任務task, 就是一個Python功能(function). 定期執行一個任務可以被認為是延時執行該功能. 我們可以使用Celery延遲5分鍾調用function
task1, 並傳入參數(1, 2, 3). 或者我們也可以每天午夜運行該function.
我們偏向於將Celery放入項目中, 便於task訪問統一資料庫和Django設置.
當task准備運行時, Celery會將其放入列隊queue中. queue中儲存著可以運行的task的list. 我們可以使用多個queue, 但為了簡單, 這里我們只使用一個.
將任務task放入queue就像加入todo list一樣. 為了使task運行, 我們還需要在其他線程中運行的苦工worker. worker實時觀察著代運行的task, 並逐一運行這
些task. 你可以使用多個worker, 通常他們位於不同伺服器上. 同樣為了簡單起見, 我們這只是用一個worker.
我們稍後會討論queue, worker和另外一個十分重要的進程, 接下來我們來動動手:
3. 安裝Celery
我們可以使用pip在vietualenv中安裝:
pip install django-celery
4. Django設置
我們暫時使用django runserver來啟動celery. 而Celery代理人(broker), 我們使用Django database broker implementation. 現在我們只需要知道Celery
需要broker, 使用django自身便可以充當broker. (但在部署時, 我們最好使用更穩定和高效的broker, 例如Redis.)
在settings.py中:
import djcelery
djcelery.setup_loader()
BROKER_URL = 'django://'
...
INSTALLED_APPS = (
...
'djcelery',
'kombu.transport.django',
...
)
第一二項是必須的, 第三項則告訴Celery使用Django項目作為broker.
在INSTALLED_APPS中添加的djcelery是必須的. kombu.transport.django則是基於Django的broker
最後創建Celery所需的數據表, 如果使用South作為數據遷移工具, 則運行:
python manage.py migrate
否則運行: (Django 1.6或Django 1.7都可以)
python manage.py syncdb
5. 創建一個task
正如前面所說的, 一個task就是一個Pyhton function. 但Celery需要知道這一function是task, 因此我們可以使用celery自帶的裝飾器decorator: @task. 在
django app目錄中創建taske.py:
from celery import task
@task()
def add(x, y):
return x + y
當settings.py中的djcelery.setup_loader()運行時, Celery便會查看所有INSTALLED_APPS中app目錄中的tasks.py文件, 找到標記為task的function, 並
將它們注冊為celery task.
將function標注為task並不會妨礙他們的正常執行. 你還是可以像平時那樣調用它: z = add(1, 2).
6. 執行task
讓我們以一個簡單的例子作為開始. 例如我們希望在用戶發出request後非同步執行該task, 馬上返回response, 從而不阻塞該request, 使用戶有一個流暢的訪問
過程. 那麼, 我們可以使用.delay, 例如在在views.py的一個view中:
from myapp.tasks import add
...
add.delay(2, 2)
...
Celery會將task加入到queue中, 並馬上返回. 而在一旁待命的worker看到該task後, 便會按照設定執行它, 並將他從queue中移除. 而worker則會執行以下代
碼:
import myapp.tasks.add
myapp.tasks.add(2, 2)
7. 關於import
這里需要注意的是, 在impprt task時, 需要保持一致. 因為在執行djcelery.setup_loader()時, task是以INSTALLED_APPS中的app名,
加.tasks.function_name注冊的, 如果我們由於python path不同而使用不同的引用方式時(例如在tasks.py中使用from myproject.myapp.tasks import
add形式), Celery將無法得知這是同一task, 因此可能會引起奇怪的bug.
8. 測試
a. 啟動worker
正如之前說到的, 我們需要worker來執行task. 以下是在開發環境中的如何啟動worker:
首先啟動terminal, 如同開發django項目一樣, 激活virtualenv, 切換到django項目目錄. 然後啟動django自帶web伺服器: python manage.py runserver.
然後啟動worker:
python manage.py celery worker --loglevel=info
此時, worker將會在該terminal中運行, 並顯示輸出結果.
b. 啟動task
打開新的terminal, 激活virtualenv, 並切換到django項目目錄:
$ python manage.py shell
>>> from myapp.tasks import add
>>> add.delay(2, 2)
此時, 你可以在worker窗口中看到worker執行該task:
[2014-10-07 08:47:08,076: INFO/MainProcess] Got task from broker: myapp.tasks.add[e080e047-b2a2-43a7-af74-d7d9d98b02fc]
[2014-10-07 08:47:08,299: INFO/MainProcess] Task myapp.tasks.add[e080e047-b2a2-43a7-af74-d7d9d98b02fc] succeeded in 0.183349132538s: 4
9. 另一個例子
下面我們來看一個更為真實的例子, 在views.py和tasks.py中:
# views.py
from myapp.tasks import do_something_with_form_data
def view(request):
form = SomeForm(request.POST)
if form.is_valid():
data = form.cleaned_data
# Schele a task to process the data later
do_something_with_form_data.delay(data)
return render_to_response(...)
# tasks.py
@task
def do_something_with_form_data(data):
call_slow_web_service(data['user'], data['text'], ...)
10. 調試
由於Celery的運行需要啟動多個部件, 我們可能會漏掉一兩個. 所以我們建議:
使用最簡單的設置
使用python debug和logging功能顯示當前的進程
11. Eager模式
如果在settings.py設置:
CELERY_ALWAYS_EAGER = True
那麼Celery便以eager模式運行, 則task便不需要加delay運行:
# 若啟用eager模式, 則以下兩行代碼相同
add.delay(2, 2)
add(2, 2)
12. 查看queue
因為我們使用了django作為broker, queue儲存在django的資料庫中. 這就意味著我們可以通過django admin查看該queue:
# admin.py
from django.contrib import admin
from kombu.transport.django import models as kombu_models
admin.site.register(kombu_models.Message)
13. 檢查結果
每次運行非同步task後, Celery都會返回AsyncResult對象作為結果. 你可以將其保存, 然後在將來查看該task是否運行成功和返回結果:
# views.py
result = add.delay(2, 2)
...
if result.ready():
print "Task has run"
if result.successful():
print "Result was: %s" % result.result
else:
if isinstance(result.result, Exception):
print "Task failed e to raising an exception"
raise result.result
else:
print "Task failed without raising exception"
else:
print "Task has not yet run"
14. 定期任務
還有一種Celery的常用模式便是執行定期任務. 執行定期任務時, Celery會通過celerybeat進程來完成. Celerybeat會保持運行, 一旦到了某一定期任務需要執
行時, Celerybeat便將其加入到queue中. 不像worker進程, Celerybeat只有需要一個即可.
啟動Celerybeat:
python manage.py celery beat
使Celery運行定期任務的方式有很多種, 我們先看第一種, 將定期任務儲存在django資料庫中. 即使是在django和celery都運行的狀態, 這一方式也可以讓我們
方便的修改定期任務. 我們只需要設置settings.py中的一項便能開啟這一方式:
# settings.py
CELERYBEAT_SCHEDULER = 'djcelery.schelers.DatabaseScheler'
㈤ Python用asyncio模塊做協程非同步IO爬蟲功能,為啥我這兩個模塊下的代碼錯誤這么多!
隨著node.js的盛行,相信大家今年多多少少都聽到了非同步編程這個概念。Python社區雖然對於非同步編程的支持相比其他語言稍顯遲緩,但是也在Python3.4中加入了asyncio,在Python3.5上又提供了async/await語法層面的支持,剛正式發布的Python3.6中asynico也已經由臨時版改為了穩定版。下面我們就基於Python3.4+來了解一下非同步編程的概念以及asyncio的用法。
什麼是協程
通常在Python中我們進行並發編程一般都是使用多線程或者多進程來實現的,對於計算型任務由於GIL的存在我們通常使用多進程來實現,而對與IO型任務我們可以通過線程調度來讓線程在執行IO任務時讓出GIL,從而實現表面上的並發。
其實對於IO型任務我們還有一種選擇就是協程,協程是運行在單線程當中的「並發」,協程相比多線程一大優勢就是省去了多線程之間的切換開銷,獲得了更大的運行效率。Python中的asyncio也是基於協程來進行實現的。在進入asyncio之前我們先來了解一下Python中怎麼通過生成器進行協程來實現並發。
example1
我們先來看一個簡單的例子來了解一下什麼是協程(coroutine),對生成器不了解的朋友建議先看一下Stackoverflow上面的這篇高票回答。
㈥ python 什麼是http非同步請求
http請求為耗時IO操作,如果同步阻塞的話,進程會等待請求完成。
非同步的話,進程會發出http請求(請求以後不需要cpu),然後跳轉到別的任務,直到http請求完成,再調回來繼續處理得到的http回應。
最經典的例子就是燒水,同步阻塞就是你一直蹲在爐子旁邊等待水燒開,而非同步是把水壺放在爐子上,等水開了以後茶壺會叫,這時候你聽到聲音就會回來處理開水~
㈦ python非同步爬蟲例子
gevent是一個python的並發庫,它為各種並發和網路相關的任務提供了整潔的API。
gevent中用到的主要模式是greenlet,它是以C擴展模塊形式接入Python的輕量級協程。 greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。
實戰
通過用gevent把非同步訪問得到的數據提取出來。
在有道詞典搜索框輸入「hello」按回車。觀察數據請求情況觀察有道的url構建。
㈧ python 非同步是什麼意思
非同步是計算機多線程的非同步處理。與同步處理相對,非同步處理不用阻塞當前線程來等待處理完成,而是允許後續操作,直至其它線程將處理完成,並回調通知此線程。
㈨ python2.7怎麼實現非同步
改進之前
之前,我的查詢步驟很簡單,就是:
前端提交查詢請求 --> 建立資料庫連接 --> 新建游標 --> 執行命令 --> 接受結果 --> 關閉游標、連接
這幾大步驟的順序執行。
這裡面當然問題很大:
建立資料庫連接實際上就是新建一個套接字。這是進程間通信的幾種方法里,開銷最大的了。
在「執行命令」和「接受結果」兩個步驟中,線程在阻塞在資料庫內部的運行過程中,資料庫連接和游標都處於閑置狀態。
這樣一來,每一次查詢都要順序的新建資料庫連接,都要阻塞在資料庫返回結果的過程中。當前端提交大量查詢請求時,查詢效率肯定是很低的。
第一次改進
之前的模塊里,問題最大的就是第一步——建立資料庫連接套接字了。如果能夠一次性建立連接,之後查詢能夠反復服用這個連接就好了。
所以,首先應該把資料庫查詢模塊作為一個單獨的守護進程去執行,而前端app作為主進程響應用戶的點擊操作。那麼兩條進程怎麼傳遞消息呢?翻了幾天Python文檔,終於構思出來:用隊列queue作為生產者(web前端)向消費者(資料庫後端)傳遞任務的渠道。生產者,會與SQL命令一起,同時傳遞一個管道pipe的連接對象,作為任務完成後,回傳結果的渠道。確保,任務的接收方與發送方保持一致。
作為第二個問題的解決方法,可以使用線程池來並發獲取任務隊列中的task,然後執行命令並回傳結果。
第二次改進
第一次改進的效果還是很明顯的,不用任何測試手段。直接點擊頁面鏈接,可以很直觀地感覺到反應速度有很明顯的加快。
但是對於第二個問題,使用線程池還是有些欠妥當。因為,CPython解釋器存在GIL問題,所有線程實際上都在一個解釋器進程里調度。線程稍微開多一點,解釋器進程就會頻繁的切換線程,而線程切換的開銷也不小。線程多一點,甚至會出現「抖動」問題(也就是剛剛喚醒一個線程,就進入掛起狀態,剛剛換到棧幀或內存的上下文,又被換回內存或者磁碟),效率大大降低。也就是說,線程池的並發量很有限。
試過了多進程、多線程,只能在單個線程里做文章了。
Python中的asyncio庫
Python里有大量的協程庫可以實現單線程內的並發操作,比如Twisted、Gevent等等。Python官方在3.5版本里提供了asyncio庫同樣可以實現協程並發。asyncio庫大大降低了Python中協程的實現難度,就像定義普通函數那樣就可以了,只是要在def前面多加一個async關鍵詞。async def函數中,需要阻塞在其他async def函數的位置前面可以加上await關鍵詞。
import asyncio
async def wait():
await asyncio.sleep(2)
async def execute(task):
process_task(task)
await wait()
continue_job()
async def函數的執行稍微麻煩點。需要首先獲取一個loop對象,然後由這個對象代為執行async def函數。
loop = asyncio.get_event_loop()
loop.run_until_complete(execute(task))
loop.close()
loop在執行execute(task)函數時,如果遇到await關鍵字,就會暫時掛起當前協程,轉而去執行其他阻塞在await關鍵詞的協程,從而實現協程並發。
不過需要注意的是,run_until_complete()函數本身是一個阻塞函數。也就是說,當前線程會等候一個run_until_complete()函數執行完畢之後,才會繼續執行下一部函數。所以下面這段代碼並不能並發執行。
for task in task_list:
loop.run_until_complete(task)
對與這個問題,asyncio庫也有相應的解決方案:gather函數。
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(execute(task))
for task in task_list]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
當然了,async def函數的執行並不只有這兩種解決方案,還有call_soon與run_forever的配合執行等等,更多內容還請參考官方文檔。
Python下的I/O多路復用
協程,實際上,也存在上下文切換,只不過開銷很輕微。而I/O多路復用則完全不存在這個問題。
目前,Linux上比較火的I/O多路復用API要算epoll了。Tornado,就是通過調用C語言封裝的epoll庫,成功解決了C10K問題(當然還有Pypy的功勞)。
在Linux里查文檔,可以看到epoll只有三類函數,調用起來比較方便易懂。
創建epoll對象,並返回其對應的文件描述符(file descriptor)。
int epoll_create(int size);
int epoll_create1(int flags);
控制監聽事件。第一個參數epfd就對應於前面命令創建的epoll對象的文件描述符;第二個參數表示該命令要執行的動作:監聽事件的新增、修改或者刪除;第三個參數,是要監聽的文件對應的描述符;第四個,代表要監聽的事件。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
等候。這是一個阻塞函數,調用者會等候內核通知所注冊的事件被觸發。
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
int epoll_pwait(int epfd, struct epoll_event *events,
int maxevents, int timeout,
const sigset_t *sigmask);
在Python的select庫里:
select.epoll()對應於第一類創建函數;
epoll.register(),epoll.unregister(),epoll.modify()均是對控制函數epoll_ctl的封裝;
epoll.poll()則是對等候函數epoll_wait的封裝。
Python里epoll相關API的最大問題應該是在epoll.poll()。相比於其所封裝的epoll_wait,用戶無法手動指定要等候的事件,也就是後者的第二個參數struct epoll_event *events。沒法實現精確控制。因此只能使用替代方案:select.select()函數。
根據Python官方文檔,select.select(rlist, wlist, xlist[, timeout])是對Unix系統中select函數的直接調用,與C語言API的傳參很接近。前三個參數都是列表,其中的元素都是要注冊到內核的文件描述符。如果想用自定義類,就要確保實現了fileno()方法。
其分別對應於:
rlist: 等候直到可讀
wlist: 等候直到可寫
xlist: 等候直到異常。這個異常的定義,要查看系統文檔。
select.select(),類似於epoll.poll(),先注冊文件和事件,然後保持等候內核通知,是阻塞函數。
實際應用
Psycopg2庫支持對非同步和協程,但和一般情況下的用法略有區別。普通資料庫連接支持不同線程中的不同游標並發查詢;而非同步連接則不支持不同游標的同時查詢。所以非同步連接的不同游標之間必須使用I/O復用方法來協調調度。
所以,我的大致實現思路是這樣的:首先並發執行大量協程,從任務隊列中提取任務,再向連接池請求連接,創建游標,然後執行命令,並返回結果。在獲取游標和接受查詢結果之前,均要阻塞等候內核通知連接可用。
其中,連接池返回連接時,會根據引用連接的協程數量,返回負載最輕的連接。這也是自己定義AsyncConnectionPool類的目的。
我的代碼位於:bottle-blog/dbservice.py
存在問題
當然了,這個流程目前還一些問題。
首先就是每次輪詢拿到任務之後,都會走這么一個流程。
獲取連接 --> 新建游標 --> 執行任務 --> 關閉游標 --> 取消連接引用
本來,最好的情況應該是:在輪詢之前,就建好游標;在輪詢時,直接等候內核通知,執行相應任務。這樣可以減少輪詢時的任務量。但是如果協程提前對應好連接,那就不能保證在獲取任務時,保持各連接負載均衡了。
所以這一塊,還有工作要做。
還有就是epoll沒能用上,有些遺憾。
以後打算寫點C語言的內容,或者用Python/C API,或者用Ctypes包裝共享庫,來實現epoll的調用。
最後,請允許我吐槽一下Python的epoll相關文檔:簡直太弱了!!!必須看源碼才能弄清楚功能。
㈩ Python進程之串列與並行
串列和並行
串列指的是任務的執行方式。串列在執行多個任務時,各個任務按順序執行,完成一個之後才能進行下一個。(早期單核CPU的情況下)
並行指的是多個任務在同一時刻可以同時執行(前提是多核CPU),不需要等待。
同步和非同步
所謂同步就是一個任務的完成需要依賴另外一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成,這是一種可靠的任務序列。要成功都成功,失敗都失敗,兩個任務的狀態可以保持一致需要等待、協調運行。
所謂非同步就是彼此獨立的,分配任務後,不需要等待該任務的執行結果,繼續做自己的事,無論被分配的任務是執行成功還是失敗都是不關心的,只要自己完成了整個任務就算完成了。至於其它任務是否真正完成無法確定,所以它是不可靠的任務序列。
相關推薦:《Python視頻教程》
小結:
1、串列和同步的區別:串列指的是在早期單核CPU時,一台電腦在同一時刻只能執行一個程序,如果想要運行另一個程序需要關閉當前程序,才能執行下一個程序,是針對多個程序來說的。同步指的是在一個程序中同一時刻只能執行一個任務。是針對一個程序中多個進程或多個線程來說的。
所以兩者有著本質上的區別。串列是針對多個程序,同步是針對一個程序內部的多個進程或多個線程的。
2、並行和非同步的區別:並行指的是多核CPU,在同一時刻可以執行多個程序。非同步指的是在同一個程序內可以執行多個進程或者多個線程。
兩者本質上的區別就是並行指的是多個程序,非同步指的是一個程序內部的多個進程和多個線程。
3、並行和並發的區別:並行和並發都是指多個程序,但不同的是並行在同一時刻可以同時執行多個任務,而並發在同一時刻只能執行一個任務,通過多道技術在空間上可以開啟多個程序,在時間上通過時間片的方式輪詢多個程序,從用戶的角度來看實現了多個程序同時執行的偽並行,從CPU的角度同一時刻它只能執行一個程序,所以說他是串列的,只不過是由於CPU切換速度太快我們無法從表面看出來而已。
並行是真正的同一時刻執行多個程序,並發是通過時間輪詢的方式實現了偽並行。
阻塞與非阻塞:
阻塞:只要是涉及到I/O操作或者網路請求的都屬於阻塞如read,recv,accept。
非阻塞:只要不涉及到I/O,網路請求的在內存中可以直接計算的就是非阻塞,例如:list.append(8),dict["a"]=1就是非阻塞。
相關推薦:
Python進程之並行與並發的區別