pythonasyncio
❶ python 期物
期物(Future)是concurrent.futures模块和asyncio包的重要组件。
python3.4之后标准库中有两个名为Future的类:concurrent.futures.Future和asyncio.Future.
这两个类的作用相同:类的实例都表示可能已经完成活着尚未完成的延迟计算。与JS库中的Promise对象,Tornado框架中的Future类类似。
通常我们自己不应该创建期物,而只能由并发框架实例化。
这个例子中的future.result方法不会阻塞,因为future对象是有as_completed方法产生的。
在asyncio包中,BaseEventLoop.create_task(...)方法接收一个协程,排定他的运行时间,然后返回一个asyncio.Task实例(也是asyncio.Future类的实例),因为Task是Future的子类,用于包装协程。这与Executor.submit(...)方法创建concurrent.futures.Future实例是一个道理。
因为asyncio.Future类的目的是与yield from一起使用,所以通常不需用使用以下方法:
asyncio.async(coro_or_future, *, loop=None)
这个函数统一了协程和期物:第一个参数可以是二者中的任何一个。如果是 Future或 Task 对象,那就原封不动地返回。如果是协程,那么 async 函数会调用loop.create_task(...) 方法创建 Task 对象。loop= 关键字参数是可选的,用于传入事件循环;如果没有传入,那么 async 函数会通过调用 asyncio.get_event_loop() 函数获取循环对象.
不过,在asyncio 中,基本的流程是一样的:在一个单线程程序中使用主循环依次激活队列里的协程。各个协程向前执行几步,然后把控制权让给主循环,主循环再激活队列里的下一个协程。
asyncio.wait(...) 协程的参数是一个由期物或者协程构成的可迭代对象。wait会分别把各个协程包装进入一个Task对象。最后的结果是,wait处理的所有对象都通过某种方法变成Future实例。wait是协程函数,因此返回的是一个协程或者生成器对象。为了驱动协程,我们把协程传给loop.run_until_complete(...)方法。
❷ 同步、异步(gevent,asyncio)、多线程(threading)效率对比
对比了三种情况下采集50个网页所需时间,可以看出多线程在效率上是远高于gevent的。第一次测试的时候,没有使用monkey这个补丁,socket是阻塞调用的,效率并没有提升,因为还是同步运行的,使用monkey补丁后,使socket变为协作运行,效率大大提升。
同步执行:4.50s
gevent异步:0.47s
threading多线程:0.58s
更新asyncio: 1.1s
gevent并不是同时执行,还是按顺序执行,并未打乱输出结果,多线程不按顺序执行,打乱了输出结果。网络状况好(IO操作延时低)的情况下,gevent能稍微提高点效率,IO操作很费时的情况gevent效率将大大提高,效率最高的还是多线程。
gevent:当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。 gevent - 廖雪峰的官方网站
同步:19065ms
gevent:1555ms
threading:1166ms
增加python3.5下的asyncio测试,时间比gevent和threading长一点,1.1s
❸ Python异步编程全攻略
如果你厌倦了多线程,不妨试试python的异步编程,再引入async, await关键字之后语法变得更加简洁和直观,又经过几年的生态发展,现在是一个很不错的并发模型。
下面介绍一下python异步编程的方方面面。
因为GIL的存在,所以Python的多线程在CPU密集的任务下显得无力,但是对于IO密集的任务,多线程还是足以发挥多线程的优势的,而异步也是为了应对IO密集的任务,所以两者是一个可以相互替代的方案,因为设计的不同,理论上异步要比多线程快,因为异步的花销更少, 因为不需要额外系统申请额外的内存,而线程的创建跟系统有关,需要分配一定量的内存,一般是几兆,比如linux默认是8MB。
虽然异步很好,比如可以使用更少的内存,比如更好地控制并发(也许你并不这么认为:))。但是由于async/await 语法的存在导致与之前的语法有些割裂,所以需要适配,需要付出额外的努力,再者就是生态远远没有同步编程强大,比如很多库还不支持异步,所以你需要一些额外的适配。
为了不给其他网站带来困扰,这里首先在自己电脑启动web服务用于测试,代码很简单。
本文所有依赖如下:
所有依赖可通过代码仓库的requirements.txt一次性安装。
首先看一个错误的例子
输出如下:
发现花费了3秒,不符合预期呀。。。。这是因为虽然用了协程,但是每个协程是串行的运行,也就是说后一个等前一个完成之后才开始,那么这样的异步代码并没有并发,所以我们需要让这些协程并行起来
为了让代码变动的不是太多,所以这里用了一个笨办法来等待所有任务完成, 之所以在main函数中等待是为了不让ClientSession关闭, 如果你移除了main函数中的等待代码会发现报告异常 RuntimeError: Session is closed ,而代码里的解决方案非常的不优雅,需要手动的等待,为了解决这个问题,我们再次改进代码。
这里解决的方式是通过 asyncio.wait 方法等待一个协程列表,默认是等待所有协程结束后返回,会返回一个完成(done)列表,以及一个待办(pending)列表。
如果我们不想要协程对象而是结果,那么我们可以使用 asyncio.gather
结果输出如下:
通过 asyncio.ensure_future 我们就能创建一个协程,跟调用一个函数差别不大,为了等待所有任务完成之后退出,我们需要使用 asyncio.wait 等方法来等待,如果只想要协程输出的结果,我们可以使用 asyncio.gather 来获取结果。
虽然前面能够随心所欲的创建协程,但是就像多线程一样,我们也需要处理协程之间的同步问题,为了保持语法及使用情况的一致,多线程中用到的同步功能,asyncio中基本也能找到, 并且用法基本一致,不一致的地方主要是需要用异步的关键字,比如 async with/ await 等
通过锁让并发慢下来,让协程一个一个的运行。
输出如下:
通过观察很容易发现,并发的速度因为锁而慢下来了,因为每次只有一个协程能获得锁,所以并发变成了串行。
通过事件来通知特定的协程开始工作,假设有一个任务是根据http响应结果选择是否激活。
输出如下:
可以看到事件(Event)等待者都是在得到响应内容之后输出,并且事件(Event)可以是多个协程同时等待。
上面的事件虽然很棒,能够在不同的协程之间同步状态,并且也能够一次性同步所有的等待协程,但是还不够精细化,比如想通知指定数量的等待协程,这个时候Event就无能为力了,所以同步原语中出现了Condition。
输出如下:
可以看到,前面两个等待的协程是在同一时刻完成,而不是全部等待完成。
通过创建协程的数量来控制并发并不是非常优雅的方式,所以可以通过信号量的方式来控制并发。
输出如下:
可以发现,虽然同时创建了三个协程,但是同一时刻只有两个协程工作,而另外一个协程需要等待一个协程让出信号量才能运行。
无论是协程还是线程,任务之间的状态同步还是很重要的,所以有了应对各种同步机制的同步原语,因为要保证一个资源同一个时刻只能一个任务访问,所以引入了锁,又因为需要一个任务等待另一个任务,或者多个任务等待某个任务,因此引入了事件(Event),但是为了更精细的控制通知的程度,所以又引入了条件(Condition), 通过条件可以控制一次通知多少的任务。
有时候的并发需求是通过一个变量控制并发任务的并发数而不是通过创建协程的数量来控制并发,所以引入了信号量(Semaphore),这样就可以在创建的协程数远远大于并发数的情况下让协程在指定的并发量情况下并发。
不得不承认异步编程相比起同步编程的生态要小的很多,所以不可能完全异步编程,因此需要一种方式兼容。
多线程是为了兼容同步得代码。
多进程是为了利用CPU多核的能力。
输出如下:
可以看到总耗时1秒,说明所有的线程跟进程是同时运行的。
下面是本人使用过的一些异步库,仅供参考
web框架
http客户端
数据库
ORM
虽然异步库发展得还算不错,但是中肯的说并没有覆盖方方面面。
虽然我鼓励大家尝试异步编程,但是本文的最后却是让大家谨慎的选择开发环境,如果你觉得本文的并发,同步,兼容多线程,多进程不值得一提,那么我十分推荐你尝试以异步编程的方式开始一个新的项目,如果你对其中一些还有疑问或者你确定了要使用的依赖库并且大多数是没有异步库替代的,那么我还是建议你直接按照自己擅长的同步编程开始。
异步编程虽然很不错,不过,也许你并不需要。
❹ 详解Python中的协程,为什么说它的底层是生成器
协程又称为是微线程,英文名是Coroutine。它和线程一样可以调度,但是不同的是线程的启动和调度需要通过操作系统来处理。并且线程的启动和销毁需要涉及一些操作系统的变量申请和销毁处理,需要的时间比较长。而协程呢,它的调度和销毁都是程序自己来控制的,因此它更加轻量级也更加灵活。
协程有这么多优点,自然也会有一些缺点,其中最大的缺点就是需要编程语言自己支持,否则的话需要开发者自己通过一些方法来实现协程。对于大部分语言来说,都不支持这一机制。go语言由于天然支持协程,并且支持得非常好,使得它广受好评,短短几年时间就迅速流行起来。
对于Python来说,本身就有着一个GIL这个巨大的先天问题。GIL是Python的全局锁,在它的限制下一个Python进程同一时间只能同时执行一个线程,即使是在多核心的机器当中。这就大大影响了Python的性能,尤其是在CPU密集型的工作上。所以为了提升Python的性能,很多开发者想出了使用多进程+协程的方式。一开始是开发者自行实现的,后来在Python3.4的版本当中,官方也收入了这个功能,因此目前可以光明正大地说,Python是支持协程的语言了。
生成器(generator)
生成器我们也在之前的文章当中介绍过,为什么我们介绍协程需要用到生成器呢,是因为Python的协程底层就是通过生成器来实现的。
通过生成器来实现协程的原因也很简单,我们都知道协程需要切换挂起,而生成器当中有一个yield关键字,刚好可以实现这个功能。所以当初那些自己在Python当中开发协程功能的程序员都是通过生成器来实现的,我们想要理解Python当中协程的运用,就必须从最原始的生成器开始。
生成器我们很熟悉了,本质上就是带有yield这个关键词的函数。
async,await和future
从Python3.5版本开始,引入了async,await和future。我们来简单说说它们各自的用途,其中async其实就是@asyncio.coroutine,用途是完全一样的。同样await代替的是yield from,意为等待另外一个协程结束。
我们用这两个一改,上面的代码就成了:
async def test(k):
n = 0
while n < k:
await asyncio.sleep(0.5)
print('n = {}'.format(n))
n += 1
由于我们加上了await,所以每次在打印之前都会等待0.5秒。我们把await换成yield from也是一样的,只不过用await更加直观也更加贴合协程的含义。
Future其实可以看成是一个信号量,我们创建一个全局的future,当一个协程执行完成之后,将结果存入这个future当中。其他的协程可以await future来实现阻塞。我们来看一个例子就明白了:
future = asyncio.Future()
async def test(k):
n = 0
while n < k:
await asyncio.sleep(0.5)
print('n = {}'.format(n))
n += 1
future.set_result('success')
async def log():
result = await future
print(result)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
log(),
test(5)
]))
loop.close()
在这个例子当中我们创建了两个协程,第一个协程是每隔0.5秒print一个数字,在print完成之后把success写入到future当中。第二个协程就是等待future当中的数据,之后print出来。
在loop当中我们要调度执行的不再是一个协程对象了而是两个,所以我们用asyncio当中的wait将这两个对象包起来。只有当wait当中的两个对象执行结束,wait才会结束。loop等待的是wait的结束,而wait等待的是传入其中的协程的结束,这就形成了一个依赖循环,等价于这两个协程对象结束,loop才会结束。
总结
async并不只是可以用在函数上,事实上还有很多其他的用法,比如用在with语句上,用在for循环上等等。这些用法比较小众,细节也很多,就不一一展开了,大家感兴趣的可以自行去了解一下。
不知道大家在读这篇文章的过程当中有没有觉得有些费劲,如果有的话,其实是很正常的。原因也很简单,因为Python原生是不支持协程这个概念的,所以在一开始设计的时候也没有做这方面的准备,是后来觉得有必要才加入的。那么作为后面加入的内容,必然会对原先的很多内容产生影响,尤其是协程借助了之前生成器的概念来实现的,那么必然会有很多耦合不清楚的情况。这也是这一块的语法很乱,对初学者不友好的原因。
❺ 在Python中使用Asyncio系统(3-4)Task 和 Future
Task 和 Future
前面我们讨论了协程,以及如何在循环中运行它们才有用。现在我想简单谈谈Task和Future api。你将使用最多的是Task,因为你的大部分工作将涉及使用create_task()函数运行协程,就像在第22页的“快速开始”中设置的那样。Future类实际上是Task的超类,它提供了与循环交互操作的所有功能。
可以这样简单地理解:Future表示某个活动的未来完成状态,并由循环管理。Task是完全相同的,但是具体的“activity”是一个协程——可能是你用async def函数加上create_task()创建的协程。
Future类表示与循环交互的某个东西的状态。这个描述太模糊了,不太有用,所以你可以将Future实例视为一个切换器,一个完成状态的切换器。当创建Future实例时,切换设置为“尚未完成”状态,但稍后它将是“完成”状态。事实上,Future实例有一个名为done()的方法,它允许你检查状态,如示例 3-15所示。
示例 3-15. 用done()方法检查完成状态
Future实例还可以执行以下操作:
• 设置一个result值(用.set_result(value)设置值并且使用 .result()获取值)
• 使用.cancel()方法取消 (并且会用使用.cancelled()检查是否取消)
• 增加一个Future完成时回调的函数
即使Task更常见,也不可能完全避免使用Future:例如,在执行器上运行函数将返回Future实例,而不是Task。让我们快速看一下 示例 3-16 ,了解一下直接使用Future实例是什么感觉。
示例 3-16. 与Future实例的交互
(L3)创建一个简单的 main函数。我们运行这个函数,等上一会儿然后在Future f上设置一个结果。
(L5)设置一个结果。
(L8)手动创建一个Future实例。注意,这个实例(默认情况下)绑定到我们的循环,但它没有也不会被附加到任何协程(这就是Tasks的作用)。
(L9)在做任何事情之前,确认future还没有完成。
(L11)安排main()协程,传递future。请记住,main()协程所做的所有工作就是sleep,然后切换Future实例。(注意main()协程还不会开始运行:协程只在事件循环运行时才开始运行。)
(L13)在这里我们在Future实例上而不是Task实例上使用run_until_complete()。这和你以前见过的不一样。现在循环正在运行,main()协程将开始执行.
(L16)最终,当future的结果被设置时,它就完成了。完成后,可以访问结果。
当然,你不太可能以这里所示的方式直接使用Future;代码示例仅用于教育目的。你与asynccio的大部分联系都是通过Task实例进行的。
你可能想知道如果在Task实例上调用set_result()会发生什么。在Python 3.8之前可以这样做,但现在不允许这么做了。任务实例是协程对象的包装器,它们的结果值只能在内部设置为底层协程函数的结果,如 示例 3-17所示那样。
示例 3-17. 在task上调用set_result
(L13)唯一的区别是我们创建的是Task实例而不是Future实例。当然,Task API要求我们提供一个协程;这里我们使用sleep()只是因为简单方便。
(L7)正在传入一个Task实例。它满足函数的类型签名(因为Task是Future的子类),但从Python 3.8开始,我们不再允许在Task上调用set_result():尝试这样做将引发RuntimeError。这个想法是,一个Task代表一个正在运行的协程,所以结果应该总是来自于task自身。
(L10, L24)但是,我们仍然可以cancel()一个任务,它将在底层协程中引发CancelledError。
Create_task? Ensure_Future? 下定决心吧!
在第22页的“快速入门”中,我说过运行协程的方法是使用asyncio.create_task()。在引入该函数之前,有必要获取一个循环实例并使用loop.create_task()完成相同的任务。事实上,这也可以通过一个不同的模块级函数来实现:asyncio.ensure_future()。一些开发人员推荐create_task(),而其他人推荐ensure_future()。
在我为这本书做研究的过程中,我确信API方法asyncio.ensure_future()是引起对asyncio库广泛误解的罪魁祸首。API的大部分内容都非常清晰,但在学习过程中还存在一些严重的障碍,这就是其中之一。当你遇到ensure_future()时,你的大脑会非常努力地将其集成到关于asyncio应该如何使用的心理模型中——但很可能会失败!
在Python 3.6 asyncio 文档中,这个现在已经臭名昭着的解释突出了 ensure_future() 的问题:
asyncio.ensure_future(coro_or_future, *, _loop =None)
安排执行一个协程对象:把它包装在future中。返回一个Task对象。如果参数是Future,则直接返回。
什么!? 当我第一次读到这篇文章时,我很困惑。下面希望是对ensure_future()的更清楚的描述:
这个函数很好地说明了针对终端用户开发人员的asyncio API(高级API)和针对框架设计人员的asyncio API(低级API)之间的区别。让我们在示例 3-18中自习看看它是如何工作的。
示例 3-18. 仔细看看ensure_future()在做什么
(L3)一个简单的什么都不做的协程函数。我们只需要一些能组成协程的东西。
(L6)我们通过直接调用该函数来创建协程对象。你的代码很少会这样做,但我想在这里明确地表示,我们正在向每个create_task()和ensure_future()传递一个协程对象。
(L7)获取一个循环。
(L9)首先,我们使用loop.create_task()在循环中调度协程,并返回一个新的Task实例。
(L10)验证类型。到目前为止,没有什么有趣的。
(L12)我们展示了asyncio.ensure_future()可以被用来执行与create_task()相同的动作:我们传入了一个协程,并返回了一个Task实例(并且协程已经被安排在循环中运行)!如果传入的是协程,那么loop.create_task()和asyncio.ensure_future()之间没有区别。
(L15)如果我们给ensure_future()传递一个Task实例会发生什么呢?注意我们要传递的Task实例是已经在第4步通过loop.create_task()创建好的。
(L16)返回的Task实例与传入的Task实例完全相同:它在被传递时没有被改变。
直接传递Future实例的意义何在?为什么用同一个函数做两件不同的事情?答案是,ensure_future()的目的是让框架作者向最终用户开发者提供可以处理两种参数的API。不相信我?这是ex-BDFL自己说的:
ensure_future()的要点是,如果你有一个可能是协程或Future(后者包括一个Task,因为它是Future的子类)的东西,并且你想能够调用一个只在Future上定义的方法(可能唯一有用的例子是cancel())。当它已经是Future(或Task)时,它什么也不做;当它是协程时,它将它包装在Task中。
如果您知道您有一个协程,并且希望它被调度,那么正确的API是create_task()。唯一应该调用ensure_future()的时候是当你提供一个API(像大多数asyncio自己的API),它接受协程或Future,你需要对它做一些事情,需要你有一个Future。
—Guido van Rossum
总而言之,asyncio.sure_future()是一个为框架设计者准备的辅助函数。这一点最容易通过与一种更常见的函数进行类比来解释,所以我们来做这个解释。如果你有几年的编程经验,你可能已经见过类似于例3-19中的istify()函数的函数。示例 3-19中listify()的函数。
示例 3-19. 一个强制输入列表的工具函数
这个函数试图将参数转换为一个列表,不管输入的是什么。api和框架中经常使用这类函数将输入强制转换为已知类型,这将简化后续代码——在本例中,您知道参数(来自listify()的输出)将始终是一个列表。
如果我将listify()函数重命名为ensure_list(),那么您应该开始看到与asyncio.ensure_future()的类似之处:它总是试图将参数强制转换为Future(或子类)类型。这是一个实用函数,它使框架开发人员(而不是像你我这样的终端用户开发人员)的工作变得更容易。
实际上,asyncio标准库模块本身使用ensure_future()正是出于这个原因。当你下次查看API时,你会发现函数参数被描述为“可等待对象”,很可能内部使用ensure_future()强制转换参数。例如,asyncio.gather()函数就像下面的代码一样:
aws参数表示“可等待对象”,包括协程、task和future。在内部,gather()使用ensure_future()进行类型强制转换:task和future保持不变,而把协程强制转为task。
这里的关键是,作为终端用户应用程序开发人员,应该永远不需要使用asyncio.ensure_future()。它更像是框架设计师的工具。如果你需要在事件循环上调度协程,只需直接使用asyncio.create_task()来完成。
在接下来的几节中,我们将回到语言级别的特性,从异步上下文管理器开始。
❻ Python怎么多线程中添加协程
由于python是一种解释性脚本语言,python的多线程在运行过程中始终存在全局线程锁。
简单的来说就是在实际的运行过程中,python只能利用一个线程,因此python的多线程并不达到C语言多线程的性能。
可以使用多进程来代替多线程,但需要注意的是多进程最好不要涉及到例如文件操作的频繁操作IO的功能。
❼ python 通过 asyncio 同时使用 flask (quart)与 websocket
因需要同时使用作为服务端flask (quart)使用客户端使用websocket
参考环境
python3.8
websockets~=10.3
quart~=0.17.0
❽ python协程(4):asyncio
asyncio是官方提供的协程的类库,从python3.4开始支持该模块
async & awiat是python3.5中引入的关键字,使用async关键字可以将一个函数定义为协程函数,使用awiat关键字可以在遇到IO的时候挂起当前协程(也就是任务),去执行其他协程。
await + 可等待的对象(协程对象、Future对象、Task对象 -> IO等待)
注意:在python3.4中是通过asyncio装饰器定义协程,在python3.8中已经移除了asyncio装饰器。
事件循环,可以把他当做是一个while循环,这个while循环在周期性的运行并执行一些协程(任务),在特定条件下终止循环。
loop = asyncio.get_event_loop():生成一个事件循环
loop.run_until_complete(任务):将任务放到事件循环
Tasks用于并发调度协程,通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。除了使用 asyncio.create_task() 函数以外,还可以用低层级的 loop.create_task() 或 ensure_future() 函数。不建议手动实例化 Task 对象。
本质上是将协程对象封装成task对象,并将协程立即加入事件循环,同时追踪协程的状态。
注意:asyncio.create_task() 函数在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用 asyncio.ensure_future() 函数。
下面结合async & awiat、事件循环和Task看一个示例
示例一:
*注意:python 3.7以后增加了asyncio.run(协程对象),效果等同于loop = asyncio.get_event_loop(),loop.run_until_complete(协程对象) *
示例二:
注意:asyncio.wait 源码内部会对列表中的每个协程执行ensure_future从而封装为Task对象,所以在和wait配合使用时task_list的值为[func(),func()] 也是可以的。
示例三:
❾ python2.7怎么实现异步
改进之前
之前,我的查询步骤很简单,就是:
前端提交查询请求 --> 建立数据库连接 --> 新建游标 --> 执行命令 --> 接受结果 --> 关闭游标、连接
这几大步骤的顺序执行。
这里面当然问题很大:
建立数据库连接实际上就是新建一个套接字。这是进程间通信的几种方法里,开销最大的了。
在“执行命令”和“接受结果”两个步骤中,线程在阻塞在数据库内部的运行过程中,数据库连接和游标都处于闲置状态。
这样一来,每一次查询都要顺序的新建数据库连接,都要阻塞在数据库返回结果的过程中。当前端提交大量查询请求时,查询效率肯定是很低的。
第一次改进
之前的模块里,问题最大的就是第一步——建立数据库连接套接字了。如果能够一次性建立连接,之后查询能够反复服用这个连接就好了。
所以,首先应该把数据库查询模块作为一个单独的守护进程去执行,而前端app作为主进程响应用户的点击操作。那么两条进程怎么传递消息呢?翻了几天Python文档,终于构思出来:用队列queue作为生产者(web前端)向消费者(数据库后端)传递任务的渠道。生产者,会与SQL命令一起,同时传递一个管道pipe的连接对象,作为任务完成后,回传结果的渠道。确保,任务的接收方与发送方保持一致。
作为第二个问题的解决方法,可以使用线程池来并发获取任务队列中的task,然后执行命令并回传结果。
第二次改进
第一次改进的效果还是很明显的,不用任何测试手段。直接点击页面链接,可以很直观地感觉到反应速度有很明显的加快。
但是对于第二个问题,使用线程池还是有些欠妥当。因为,CPython解释器存在GIL问题,所有线程实际上都在一个解释器进程里调度。线程稍微开多一点,解释器进程就会频繁的切换线程,而线程切换的开销也不小。线程多一点,甚至会出现“抖动”问题(也就是刚刚唤醒一个线程,就进入挂起状态,刚刚换到栈帧或内存的上下文,又被换回内存或者磁盘),效率大大降低。也就是说,线程池的并发量很有限。
试过了多进程、多线程,只能在单个线程里做文章了。
Python中的asyncio库
Python里有大量的协程库可以实现单线程内的并发操作,比如Twisted、Gevent等等。Python官方在3.5版本里提供了asyncio库同样可以实现协程并发。asyncio库大大降低了Python中协程的实现难度,就像定义普通函数那样就可以了,只是要在def前面多加一个async关键词。async def函数中,需要阻塞在其他async def函数的位置前面可以加上await关键词。
import asyncio
async def wait():
await asyncio.sleep(2)
async def execute(task):
process_task(task)
await wait()
continue_job()
async def函数的执行稍微麻烦点。需要首先获取一个loop对象,然后由这个对象代为执行async def函数。
loop = asyncio.get_event_loop()
loop.run_until_complete(execute(task))
loop.close()
loop在执行execute(task)函数时,如果遇到await关键字,就会暂时挂起当前协程,转而去执行其他阻塞在await关键词的协程,从而实现协程并发。
不过需要注意的是,run_until_complete()函数本身是一个阻塞函数。也就是说,当前线程会等候一个run_until_complete()函数执行完毕之后,才会继续执行下一部函数。所以下面这段代码并不能并发执行。
for task in task_list:
loop.run_until_complete(task)
对与这个问题,asyncio库也有相应的解决方案:gather函数。
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(execute(task))
for task in task_list]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
当然了,async def函数的执行并不只有这两种解决方案,还有call_soon与run_forever的配合执行等等,更多内容还请参考官方文档。
Python下的I/O多路复用
协程,实际上,也存在上下文切换,只不过开销很轻微。而I/O多路复用则完全不存在这个问题。
目前,Linux上比较火的I/O多路复用API要算epoll了。Tornado,就是通过调用C语言封装的epoll库,成功解决了C10K问题(当然还有Pypy的功劳)。
在Linux里查文档,可以看到epoll只有三类函数,调用起来比较方便易懂。
创建epoll对象,并返回其对应的文件描述符(file descriptor)。
int epoll_create(int size);
int epoll_create1(int flags);
控制监听事件。第一个参数epfd就对应于前面命令创建的epoll对象的文件描述符;第二个参数表示该命令要执行的动作:监听事件的新增、修改或者删除;第三个参数,是要监听的文件对应的描述符;第四个,代表要监听的事件。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
等候。这是一个阻塞函数,调用者会等候内核通知所注册的事件被触发。
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
int epoll_pwait(int epfd, struct epoll_event *events,
int maxevents, int timeout,
const sigset_t *sigmask);
在Python的select库里:
select.epoll()对应于第一类创建函数;
epoll.register(),epoll.unregister(),epoll.modify()均是对控制函数epoll_ctl的封装;
epoll.poll()则是对等候函数epoll_wait的封装。
Python里epoll相关API的最大问题应该是在epoll.poll()。相比于其所封装的epoll_wait,用户无法手动指定要等候的事件,也就是后者的第二个参数struct epoll_event *events。没法实现精确控制。因此只能使用替代方案:select.select()函数。
根据Python官方文档,select.select(rlist, wlist, xlist[, timeout])是对Unix系统中select函数的直接调用,与C语言API的传参很接近。前三个参数都是列表,其中的元素都是要注册到内核的文件描述符。如果想用自定义类,就要确保实现了fileno()方法。
其分别对应于:
rlist: 等候直到可读
wlist: 等候直到可写
xlist: 等候直到异常。这个异常的定义,要查看系统文档。
select.select(),类似于epoll.poll(),先注册文件和事件,然后保持等候内核通知,是阻塞函数。
实际应用
Psycopg2库支持对异步和协程,但和一般情况下的用法略有区别。普通数据库连接支持不同线程中的不同游标并发查询;而异步连接则不支持不同游标的同时查询。所以异步连接的不同游标之间必须使用I/O复用方法来协调调度。
所以,我的大致实现思路是这样的:首先并发执行大量协程,从任务队列中提取任务,再向连接池请求连接,创建游标,然后执行命令,并返回结果。在获取游标和接受查询结果之前,均要阻塞等候内核通知连接可用。
其中,连接池返回连接时,会根据引用连接的协程数量,返回负载最轻的连接。这也是自己定义AsyncConnectionPool类的目的。
我的代码位于:bottle-blog/dbservice.py
存在问题
当然了,这个流程目前还一些问题。
首先就是每次轮询拿到任务之后,都会走这么一个流程。
获取连接 --> 新建游标 --> 执行任务 --> 关闭游标 --> 取消连接引用
本来,最好的情况应该是:在轮询之前,就建好游标;在轮询时,直接等候内核通知,执行相应任务。这样可以减少轮询时的任务量。但是如果协程提前对应好连接,那就不能保证在获取任务时,保持各连接负载均衡了。
所以这一块,还有工作要做。
还有就是epoll没能用上,有些遗憾。
以后打算写点C语言的内容,或者用Python/C API,或者用Ctypes包装共享库,来实现epoll的调用。
最后,请允许我吐槽一下Python的epoll相关文档:简直太弱了!!!必须看源码才能弄清楚功能。
❿ Asyncio 协议Protocol 与 传输Transport
python在asyncio库中,提供了一种简单的网络传输模型,协议与传输。
协议和传输,在socket的基础上进行了封装,是更高一层次的应用。
所以说: ASGI服务器并不是从socket基础层面实现通信,而是使用了asyncio中原生提供的一种网络通信方式。
Transport 类位于 asyncio.transports 中,有例如 BaseTransport , WriteTransport 只写, ReadTransport 只读, Transport 继承于前两个只写和只读的Transport
位于 asyncio.Protocol
接受protocol_factory,可以调用的工厂函数,其返回一个协议Protocol实例
server 对象是 asyncio.base_events.Server 的实例
我简单写了个小例子,使用协议和传输,制作一个C/S
为了方便观看调整了下key顺序
可以明确看到,使用了socket,说明socket的建立,已经是封装到内部的。
s端和c端的socket是完全对应的。
而H11是一个实现 http协议 库
uvicorn 用了HTTP协议库做了相应的 Protocol 。交由asyncio提供的网络应用服务处理