python任务队列
A. python多进程为什么一定要
前面讲了为什么Python里推荐用多进程而不是多线程,但是多进程也有其自己的限制:相比线程更加笨重、切换耗时更长,并且在python的多进程下,进程数量不推荐超过CPU核心数(一个进程只有一个GIL,所以一个进程只能跑满一个CPU),因为一个进程占用一个CPU时能充分利用机器的性能,但是进程多了就会出现频繁的进程切换,反而得不偿失。
不过特殊情况(特指IO密集型任务)下,多线程是比多进程好用的。
举个例子:给你200W条url,需要你把每个url对应的页面抓取保存起来,这种时候,单单使用多进程,效果肯定是很差的。为什么呢?
例如每次请求的等待时间是2秒,那么如下(忽略cpu计算时间):
1、单进程+单线程:需要2秒*200W=400W秒==1111.11个小时==46.3天,这个速度明显是不能接受的2、单进程+多线程:例如我们在这个进程中开了10个多线程,比1中能够提升10倍速度,也就是大约4.63天能够完成200W条抓取,请注意,这里的实际执行是:线程1遇见了阻塞,CPU切换到线程2去执行,遇见阻塞又切换到线程3等等,10个线程都阻塞后,这个进程就阻塞了,而直到某个线程阻塞完成后,这个进程才能继续执行,所以速度上提升大约能到10倍(这里忽略了线程切换带来的开销,实际上的提升应该是不能达到10倍的),但是需要考虑的是线程的切换也是有开销的,所以不能无限的启动多线程(开200W个线程肯定是不靠谱的)3、多进程+多线程:这里就厉害了,一般来说也有很多人用这个方法,多进程下,每个进程都能占一个cpu,而多线程从一定程度上绕过了阻塞的等待,所以比单进程下的多线程又更好使了,例如我们开10个进程,每个进程里开20W个线程,执行的速度理论上是比单进程开200W个线程快10倍以上的(为什么是10倍以上而不是10倍,主要是cpu切换200W个线程的消耗肯定比切换20W个进程大得多,考虑到这部分开销,所以是10倍以上)。
还有更好的方法吗?答案是肯定的,它就是:
4、协程,使用它之前我们先讲讲what/why/how(它是什么/为什么用它/怎么使用它)what:
协程是一种用户级的轻量级线程。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:
协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
在并发编程中,协程与线程类似,每个协程表示一个执行单元,有自己的本地数据,与其它协程共享全局数据和其它资源。
why:
目前主流语言基本上都选择了多线程作为并发设施,与线程相关的概念是抢占式多任务(Preemptive multitasking),而与协程相关的是协作式多任务。
不管是进程还是线程,每次阻塞、切换都需要陷入系统调用(system call),先让CPU跑操作系统的调度程序,然后再由调度程序决定该跑哪一个进程(线程)。
而且由于抢占式调度执行顺序无法确定的特点,使用线程时需要非常小心地处理同步问题,而协程完全不存在这个问题(事件驱动和异步程序也有同样的优点)。
因为协程是用户自己来编写调度逻辑的,对CPU来说,协程其实是单线程,所以CPU不用去考虑怎么调度、切换上下文,这就省去了CPU的切换开销,所以协程在一定程度上又好于多线程。
how:
python里面怎么使用协程?答案是使用gevent,使用方法:看这里使用协程,可以不受线程开销的限制,我尝试过一次把20W条url放在单进程的协程里执行,完全没问题。
所以最推荐的方法,是多进程+协程(可以看作是每个进程里都是单线程,而这个单线程是协程化的)多进程+协程下,避开了CPU切换的开销,又能把多个CPU充分利用起来,这种方式对于数据量较大的爬虫还有文件读写之类的效率提升是巨大的。
小例子:
#-*- coding=utf-8 -*-
import requests
from multiprocessing import Process
import gevent
from gevent import monkey; monkey.patch_all()import sys
reload(sys)
sys.setdefaultencoding('utf8')
def fetch(url):
try:
s = requests.Session()
r = s.get(url,timeout=1)#在这里抓取页面
except Exception,e:
print e
return ''
def process_start(tasks):
gevent.joinall(tasks)#使用协程来执行
def task_start(filepath,flag = 100000):#每10W条url启动一个进程with open(filepath,'r') as reader:#从给定的文件中读取urlurl = reader.readline().strip()
task_list = []#这个list用于存放协程任务
i = 0 #计数器,记录添加了多少个url到协程队列while url!='':
i += 1
task_list.append(gevent.spawn(fetch,url,queue))#每次读取出url,将任务添加到协程队列if i == flag:#一定数量的url就启动一个进程并执行p = Process(target=process_start,args=(task_list,))p.start()
task_list = [] #重置协程队列
i = 0 #重置计数器
url = reader.readline().strip()
if task_list not []:#若退出循环后任务队列里还有url剩余p = Process(target=process_start,args=(task_list,))#把剩余的url全都放到最后这个进程来执行p.start()
if __name__ == '__main__':
task_start('./testData.txt')#读取指定文件细心的同学会发现:上面的例子中隐藏了一个问题:进程的数量会随着url数量的增加而不断增加,我们在这里不使用进程池multiprocessing.Pool来控制进程数量的原因是multiprocessing.Pool和gevent有冲突不能同时使用,但是有兴趣的同学可以研究一下gevent.pool这个协程池。
另外还有一个问题:每个进程处理的url是累积的而不是独立的,例如第一个进程会处理10W个,第二个进程会变成20W个,以此类推。最后定位到问题是gevent.joinall()导致的问题,有兴趣的同学可以研究一下为什么会这样。不过这个问题的处理方案是:主进程只负责读取url然后写入到list中,在创建子进程的时候直接把list传给子进程,由子进程自己去构建协程。这样就不会出现累加的问题
B. Python 异步任务队列Celery 使用
在 Python 中定义 Celery 的时候,我们要引入 Broker,中文翻译过来就是“中间人”的意思。在工头(生产者)提出任务的时候,把所有的任务放到 Broker 里面,在 Broker 的另外一头,一群码农(消费者)等着取出一个个任务准备着手做。这种模式注定了整个系统会是个开环系统,工头对于码农们把任务做的怎样是不知情的。所以我们要引入 Backend 来保存每次任务的结果。这个 Backend 也是存储任务的信息用的,只不过这里存的是那些任务的返回结果。我们可以选择只让错误执行的任务返回结果到 Backend,这样我们取回结果,便可以知道有多少任务执行失败了。
其实现架构如下图所示:
可以看到,Celery 主要包含以下几个模块:
celery可以通过pip自动安装。
broker 可选择使用RabbitMQ/redis,backend可选择使用RabbitMQ/redis/MongoDB。RabbitMQ/redis/mongoDB的安装请参考对应的官方文档。
------------------------------rabbitmq相关----------------------------------------------------------
官网安装方法: http://www.rabbitmq.com/install-windows.html
启动管理插件:sbin/rabbitmq-plugins enable rabbitmq_management 启动rabbitmq:sbin/rabbitmq-server -detached
rabbitmq已经启动,可以打开页面来看看 地址: http://localhost:15672/#/
用户名密码都是guest 。进入可以看到具体页面。 关于rabbitmq的配置,网上很多 自己去搜以下就ok了。
------------------------------rabbitmq相关--------------------------------------------------------
项目结构如下:
使用前,需要三个方面:celery配置,celery实例,需执行的任务函数,如下:
Celery 的配置比较多,可以在 官方配置文档: http://docs.celeryproject.org/en/latest/userguide/configuration.html 查询每个配置项的含义。
当然,要保证上述异步任务and下述定时任务都能正常执行,就需要先启动celery worker,启动命令行如下:
需 启动beat ,执行定时任务时, Celery会通过celery beat进程来完成。Celery beat会保持运行, 一旦到了某一定时任务需要执行时, Celery beat便将其加入到queue中. 不像worker进程, Celery beat只需要一个即可。而且为了避免有重复的任务被发送出去,所以Celery beat仅能有一个。
命令行启动:
如果你想将celery worker/beat要放到后台运行,推荐可以扔给supervisor。
supervisor.conf如下:
C. Python Queue 入门
Queue 叫队列,是数据结构中的一种,基本上所有成熟的编程语言都内置了对 Queue 的支持。
Python 中的 Queue 模块实现了多生产者和多消费者模型,当需要在多线程编程中非常实用。而且该模块中的 Queue 类实现了锁原语,不需要再考虑多线程安全问题。
该模块内置了三种类型的 Queue,分别是 class queue.Queue(maxsize=0) , class queue.LifoQueue(maxsize=0) 和 class queue.PriorityQueue(maxsize=0) 。它们三个的区别仅仅是取出时的顺序不一致而已。
Queue 是一个 FIFO 队列,任务按照添加的顺序被取出。
LifoQueue 是一个 LIFO 队列,类似堆栈,后添加的任务先被取出。
PriorityQueue 是一个优先级队列,队列里面的任务按照优先级排序,优先级高的先被取出。
如你所见,就是上面所说的三种不同类型的内置队列,其中 maxsize 是个整数,用于设置可以放入队列中的任务数的上限。当达到这个大小的时候,插入操作将阻塞至队列中的任务被消费掉。如果 maxsize 小于等于零,则队列尺寸为无限大。
向队列中添加任务,直接调用 put() 函数即可
put() 函数完整的函数签名如下 Queue.put(item, block=True, timeout=None) ,如你所见,该函数有两个可选参数。
默认情况下,在队列满时,该函数会一直阻塞,直到队列中有空余的位置可以添加任务为止。如果 timeout 是正数,则最多阻塞 timeout 秒,如果这段时间内还没有空余的位置出来,则会引发 Full 异常。
当 block 为 false 时,timeout 参数将失效。同时如果队列中没有空余的位置可添加任务则会引发 Full 异常,否则会直接把任务放入队列并返回,不会阻塞。
另外,还可以通过 Queue.put_nowait(item) 来添加任务,相当于 Queue.put(item, False) ,不再赘述。同样,在队列满时,该操作会引发 Full 异常。
从队列中获取任务,直接调用 get() 函数即可。
与 put() 函数一样, get() 函数也有两个可选参数,完整签名如下 Queue.get(block=True, timeout=None) 。
默认情况下,当队列空时调用该函数会一直阻塞,直到队列中有任务可获取为止。如果 timeout 是正数,则最多阻塞 timeout 秒,如果这段时间内还没有任务可获取,则会引发 Empty 异常。
当 block 为 false 时,timeout 参数将失效。同时如果队列中没有任务可获取则会立刻引发 Empty 异常,否则会直接获取一个任务并返回,不会阻塞。
另外,还可以通过 Queue.get_nowait() 来获取任务,相当于 Queue.get(False) ,不再赘述。同样,在队列为空时,该操作会引发 Empty 异常。
Queue.qsize() 函数返回队列的大小。注意这个大小不是精确的,qsize() > 0 不保证后续的 get() 不被阻塞,同样 qsize() < maxsize 也不保证 put() 不被阻塞。
如果队列为空,返回 True ,否则返回 False 。如果 empty() 返回 True ,不保证后续调用的 put() 不被阻塞。类似的,如果 empty() 返回 False ,也不保证后续调用的 get() 不被阻塞。
如果队列是满的返回 True ,否则返回 False 。如果 full() 返回 True 不保证后续调用的 get() 不被阻塞。类似的,如果 full() 返回 False 也不保证后续调用的 put() 不被阻塞。
queue.Queue() 是 FIFO 队列,出队顺序跟入队顺序是一致的。
queue.LifoQueue() 是 LIFO 队列,出队顺序跟入队顺序是完全相反的,类似于栈。
优先级队列中的任务顺序跟放入时的顺序是无关的,而是按照任务的大小来排序,最小值先被取出。那任务比较大小的规则是怎么样的呢。
注意,因为列表的比较对规则是按照下标顺序来比较的,所以在没有比较出大小之前 ,队列中所有列表对应下标位置的元素类型要一致。
好比 [2,1] 和 ["1","b"] 因为第一个位置的元素类型不一样,所以是没有办法比较大小的,所以也就放入不了优先级队列。
然而对于 [2,1] 和 [1,"b"] 来说即使第二个元素的类型不一致也是可以放入优先级队列的,因为只需要比较第一个位置元素的大小就可以比较出结果了,就不需要比较第二个位置元素的大小了。
但是对于 [2,1] 和 1 [2,"b"] 来说,则同样不可以放入优先级队列,因为需要比较第二个位置的元素才可以比较出结果,然而第二个位置的元素类型是不一致的,无法比较大小。
综上,也就是说, 直到在比较出结果之前,对应下标位置的元素类型都是需要一致的 。
下面我们自定义一个动物类型,希望按照年龄大小来做优先级排序。年龄越小优先级越高。
本章节介绍了队列以及其常用操作。因为队列默认实现了锁原语,因此在多线程编程中就不需要再考虑多线程安全问题了,对于程序员来说相当友好了。
D. python多进程中队列不空时阻塞,求解为什么
最近接触一个项目,要在多个虚拟机中运行任务,参考别人之前项目的代码,采用了多进程来处理,于是上网查了查python中的多进程
一、先说说Queue(队列对象)
Queue是python中的标准库,可以直接import 引用,之前学习的时候有听过着名的“先吃先拉”与“后吃先吐”,其实就是这里说的队列,队列的构造的时候可以定义它的容量,别吃撑了,吃多了,就会报错,构造的时候不写或者写个小于1的数则表示无限多
import Queue
q = Queue.Queue(10)
向队列中放值(put)
q.put(‘yang')
q.put(4)
q.put([‘yan','xing'])
在队列中取值get()
默认的队列是先进先出的
>>> q.get()
‘yang'
>>> q.get()
4
>>> q.get()
[‘yan', ‘xing']
当一个队列为空的时候如果再用get取则会堵塞,所以取队列的时候一般是用到
get_nowait()方法,这种方法在向一个空队列取值的时候会抛一个Empty异常
所以更常用的方法是先判断一个队列是否为空,如果不为空则取值
队列中常用的方法
Queue.qsize() 返回队列的大小
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.get([block[, timeout]]) 获取队列,timeout等待时间
Queue.get_nowait() 相当Queue.get(False)
非阻塞 Queue.put(item) 写入队列,timeout等待时间
Queue.put_nowait(item) 相当Queue.put(item, False)
二、multiprocessing中使用子进程概念
from multiprocessing import Process
可以通过Process来构造一个子进程
p = Process(target=fun,args=(args))
再通过p.start()来启动子进程
再通过p.join()方法来使得子进程运行结束后再执行父进程
from multiprocessing import Process
import os
# 子进程要执行的代码
def run_proc(name):
print 'Run child process %s (%s)...' % (name, os.getpid())
if __name__=='__main__':
print 'Parent process %s.' % os.getpid()
p = Process(target=run_proc, args=('test',))
print 'Process will start.'
p.start()
p.join()
print 'Process end.'
上面的程序运行后的结果其实是按照上图中1,2,3分开进行的,先打印1,3秒后打印2,再3秒后打印3
代码中的p.close()是关掉进程池子,是不再向里面添加进程了,对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。
当时也可以是实例pool的时候给它定义一个进程的多少
如果上面的代码中p=Pool(5)那么所有的子进程就可以同时进行
三、多个子进程间的通信
多个子进程间的通信就要采用第一步中说到的Queue,比如有以下的需求,一个子进程向队列中写数据,另外一个进程从队列中取数据,
#coding:gbk
from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def write(q):
for value in ['A', 'B', 'C']:
print 'Put %s to queue...' % value
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
while True:
if not q.empty():
value = q.get(True)
print 'Get %s from queue.' % value
time.sleep(random.random())
else:
break
if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 等待pw结束:
pw.join()
# 启动子进程pr,读取:
pr.start()
pr.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
print
print '所有数据都写入并且读完'
四、关于上面代码的几个有趣的问题
if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
p = Pool()
pw = p.apply_async(write,args=(q,))
pr = p.apply_async(read,args=(q,))
p.close()
p.join()
print
print '所有数据都写入并且读完'
如果main函数写成上面的样本,本来我想要的是将会得到一个队列,将其作为参数传入进程池子里的每个子进程,但是却得到
RuntimeError: Queue objects should only be shared between processes through inheritance
的错误,查了下,大意是队列对象不能在父进程与子进程间通信,这个如果想要使用进程池中使用队列则要使用multiprocess的Manager类
if __name__=='__main__':
manager = multiprocessing.Manager()
# 父进程创建Queue,并传给各个子进程:
q = manager.Queue()
p = Pool()
pw = p.apply_async(write,args=(q,))
time.sleep(0.5)
pr = p.apply_async(read,args=(q,))
p.close()
p.join()
print
print '所有数据都写入并且读完'
这样这个队列对象就可以在父进程与子进程间通信,不用池则不需要Manager,以后再扩展multiprocess中的Manager类吧
关于锁的应用,在不同程序间如果有同时对同一个队列操作的时候,为了避免错误,可以在某个函数操作队列的时候给它加把锁,这样在同一个时间内则只能有一个子进程对队列进行操作,锁也要在manager对象中的锁
#coding:gbk
from multiprocessing import Process,Queue,Pool
import multiprocessing
import os, time, random
# 写数据进程执行的代码:
def write(q,lock):
lock.acquire() #加上锁
for value in ['A', 'B', 'C']:
print 'Put %s to queue...' % value
q.put(value)
lock.release() #释放锁
# 读数据进程执行的代码:
def read(q):
while True:
if not q.empty():
value = q.get(False)
print 'Get %s from queue.' % value
time.sleep(random.random())
else:
break
if __name__=='__main__':
manager = multiprocessing.Manager()
# 父进程创建Queue,并传给各个子进程:
q = manager.Queue()
lock = manager.Lock() #初始化一把锁
p = Pool()
pw = p.apply_async(write,args=(q,lock))
pr = p.apply_async(read,args=(q,))
p.close()
p.join()
print
print '所有数据都写入并且读完'
E. python 使用celery为了解决什么业务问题
Celery是一个专注于实时处理和任务调度的分布式任务队列。所谓任务就是消息,消息中的有效载荷中包含要执行任务需要的全部数据。
使用Celery的常见场景如下:
1. Web应用。当用户触发的一个操作需要较长时间才能执行完成时,可以把它作为任务交给Celery去异步执行,执行完再返回给用户。这段时间用户不需要等待,提高了网站的整体吞吐量和响应时间。
2. 定时任务。生产环境经常会跑一些定时任务。假如你有上千台的服务器、上千种任务,定时任务的管理很困难,Celery可以帮助我们快速在不同的机器设定不同种任务。
3. 同步完成的附加工作都可以异步完成。比如发送短信/邮件、推送消息、清理/设置缓存等。
Celery还提供了如下的特性:
1. 方便地查看定时任务的执行情况,比如执行是否成功、当前状态、执行任务花费的时间等。
2. 可以使用功能齐备的管理后台或者命令行添加、更新、删除任务。
3. 方便把任务和配置管理相关联。
4. 可选多进程、Eventlet和Gevent三种模式并发执行。
5. 提供错误处理机制。
- 提供多种任务原语,方便实现任务分组、拆分和调用链。
- 支持多种消息代理和存储后端。
F. Python实现简单多线程任务队列
Python实现简单多线程任务队列
最近我在用梯度下降算法绘制神经网络的数据时,遇到了一些算法性能的问题。梯度下降算法的代码如下(伪代码):
defgradient_descent(): # the gradient descent code plotly.write(X, Y)
一般来说,当网络请求 plot.ly 绘图时会阻塞等待返回,于是也会影响到其他的梯度下降函数的执行速度。
一种解决办法是每调用一次 plotly.write 函数就开启一个新的线程,但是这种方法感觉不是很好。 我不想用一个像 cerely(一种分布式任务队列)一样大而全的任务队列框架,因为框架对于我的这点需求来说太重了,并且我的绘图也并不需要 redis 来持久化数据。
那用什么办法解决呢?我在 python 中写了一个很小的任务队列,它可以在一个单独的线程中调用 plotly.write函数。下面是程序代码。
classTaskQueue(Queue.Queue):
首先我们继承 Queue.Queue 类。从 Queue.Queue 类可以继承 get 和 put 方法,以及队列的行为。
def__init__(self, num_workers=1): Queue.Queue.__init__(self) self.num_workers=num_workers self.start_workers()
初始化的时候,我们可以不用考虑工作线程的数量。
defadd_task(self, task,*args,**kwargs): args=argsor() kwargs=kwargsor{} self.put((task, args, kwargs))
我们把 task, args, kwargs 以元组的形式存储在队列中。*args 可以传递数量不等的参数,**kwargs 可以传递命名参数。
defstart_workers(self): foriinrange(self.num_workers): t=Thread(target=self.worker) t.daemon=True t.start()
我们为每个 worker 创建一个线程,然后在后台删除。
下面是 worker 函数的代码:
defworker(self): whileTrue: tupl=self.get() item, args, kwargs=self.get() item(*args,**kwargs) self.task_done()
worker 函数获取队列顶端的任务,并根据输入参数运行,除此之外,没有其他的功能。下面是队列的代码:
我们可以通过下面的代码测试:
defblokkah(*args,**kwargs): time.sleep(5) print“Blokkah mofo!” q=TaskQueue(num_workers=5) foriteminrange(1): q.add_task(blokkah) q.join()# wait for all the tasks to finish. print“Alldone!”
Blokkah 是我们要做的任务名称。队列已经缓存在内存中,并且没有执行很多任务。下面的步骤是把主队列当做单独的进程来运行,这样主程序退出以及执行数据库持久化时,队列任务不会停止运行。但是这个例子很好地展示了如何从一个很简单的小任务写成像工作队列这样复杂的程序。
defgradient_descent(): # the gradient descent code queue.add_task(plotly.write, x=X, y=Y)
修改之后,我的梯度下降算法工作效率似乎更高了。如果你很感兴趣的话,可以参考下面的代码。 classTaskQueue(Queue.Queue): def__init__(self, num_workers=1):Queue.Queue.__init__(self)self.num_workers=num_workersself.start_workers() defadd_task(self, task,*args,**kwargs):args=argsor()kwargs=kwargsor{}self.put((task, args, kwargs)) defstart_workers(self):foriinrange(self.num_workers):t=Thread(target=self.worker)t.daemon=Truet.start() defworker(self):whileTrue:tupl=self.get()item, args, kwargs=self.get()item(*args,**kwargs)self.task_done() deftests():defblokkah(*args,**kwargs):time.sleep(5)print"Blokkah mofo!" q=TaskQueue(num_workers=5) foriteminrange(10):q.add_task(blokkah) q.join()# block until all tasks are doneprint"All done!" if__name__=="__main__":tests()
G. 如何在Python中编写并发程序
GIL
在Python中,由于历史原因(GIL),使得Python中多线程的效果非常不理想.GIL使得任何时刻Python只能利用一个CPU核,并
且它的调度算法简单粗暴:多线程中,让每个线程运行一段时间t,然后强行挂起该线程,继而去运行其他线程,如此周而复始,直到所有线程结束.
这使得无法有效利用计算机系统中的"局部性",频繁的线程切换也对缓存不是很友好,造成资源的浪费.
据说Python官方曾经实现了一个去除GIL的Python解释器,但是其效果还不如有GIL的解释器,遂放弃.后来Python官方推出了"利
用多进程替代多线程"的方案,在Python3中也有concurrent.futures这样的包,让我们的程序编写可以做到"简单和性能兼得".
多进程/多线程+Queue
一般来说,在Python中编写并发程序的经验是:计算密集型任务使用多进程,IO密集型任务使用多进程或者多线程.另外,因为涉及到资源共享,所
以需要同步锁等一系列麻烦的步骤,代码编写不直观.另外一种好的思路是利用多进程/多线程+Queue的方法,可以避免加锁这样麻烦低效的方式.
现在在Python2中利用Queue+多进程的方法来处理一个IO密集型任务.
假设现在需要下载多个网页内容并进行解析,单进程的方式效率很低,所以使用多进程/多线程势在必行.
我们可以先初始化一个tasks队列,里面将要存储的是一系列dest_url,同时开启4个进程向tasks中取任务然后执行,处理结果存储在一个results队列中,最后对results中的结果进行解析.最后关闭两个队列.
下面是一些主要的逻辑代码.
# -*- coding:utf-8 -*-
#IO密集型任务
#多个进程同时下载多个网页
#利用Queue+多进程
#由于是IO密集型,所以同样可以利用threading模块
import multiprocessing
def main():
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
cpu_count = multiprocessing.cpu_count() #进程数目==CPU核数目
create_process(tasks, results, cpu_count) #主进程马上创建一系列进程,但是由于阻塞队列tasks开始为空,副进程全部被阻塞
add_tasks(tasks) #开始往tasks中添加任务
parse(tasks, results) #最后主进程等待其他线程处理完成结果
def create_process(tasks, results, cpu_count):
for _ in range(cpu_count):
p = multiprocessing.Process(target=_worker, args=(tasks, results)) #根据_worker创建对应的进程
p.daemon = True #让所有进程可以随主进程结束而结束
p.start() #启动
def _worker(tasks, results):
while True: #因为前面所有线程都设置了daemon=True,故不会无限循环
try:
task = tasks.get() #如果tasks中没有任务,则阻塞
result = _download(task)
results.put(result) #some exceptions do not handled
finally:
tasks.task_done()
def add_tasks(tasks):
for url in get_urls(): #get_urls() return a urls_list
tasks.put(url)
def parse(tasks, results):
try:
tasks.join()
except KeyboardInterrupt as err:
print "Tasks has been stopped!"
print err
while not results.empty():
_parse(results)
if __name__ == '__main__':
main()
利用Python3中的concurrent.futures包
在Python3中可以利用concurrent.futures包,编写更加简单易用的多线程/多进程代码.其使用感觉和Java的concurrent框架很相似(借鉴?)
比如下面的简单代码示例
def handler():
futures = set()
with concurrent.futures.ProcessPoolExecutor(max_workers=cpu_count) as executor:
for task in get_task(tasks):
future = executor.submit(task)
futures.add(future)
def wait_for(futures):
try:
for future in concurrent.futures.as_completed(futures):
err = futures.exception()
if not err:
result = future.result()
else:
raise err
except KeyboardInterrupt as e:
for future in futures:
future.cancel()
print "Task has been canceled!"
print e
return result
总结
要是一些大型Python项目也这般编写,那么效率也太低了.在Python中有许多已有的框架使用,使用它们起来更加高效.
H. Python分布式进程中你会遇到的坑
写在前面
小惊大怪
你是不是在用Python3或者在windows系统上编程?最重要的是你对进程和线程不是很清楚?那么恭喜你,在python分布式进程中,会有坑等着你去挖。。。(hahahaha,此处允许我吓唬一下你)开玩笑的啦,不过,如果你知道序列中不支持匿名函数,那这个坑就和你say byebye了。好了话不多数,直接进入正题。
分布式进程
正如大家所知道的Process比Thread更稳定,而且Process可以分布到多台机器上,而Thread最多只能分布到同一陆坦咐台机器的多个CPU上。Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。
代码记录
举个例子
如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上,这应该怎么用分布式进程来实现呢?你已经知道了原有的Queue可以继续使用,而且通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程来访问Queue了。好,那我们就这么干!
写个task_master.py
我们先看服务进程。服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务。
请注意,当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。然后,在另一台机器上启动任务进程(本机上启动也可以)
写个task_worker.py
任务进程要通过网络连接到服务进程,所以要指定服务进程的IP。
运行结果
现在,可信没以试试分布式进程的工作效果了。先启动task_master.py服务进程:
task_master.py进程发送完任务后,开始等待result队列的结果。现在启动task_worker.py进程:
看到没,结果都出错了,我们好好分析一下到底哪出错了。。。
错误分析
在task_master.py的报错提示中,我们知道它说lambda错误,这是因为序列化不支持匿名函数,所以我们得修改代码,重新对queue用QueueManager进行封装放到网络中。
其中task_queue和result_queue是两个队列,分别存放任务和结果。它们用来进行进程间通信,交换对象。
因为是分布式的环境,放入queue中的数据需要等待Workers机器运算处理后再进行读取,这样就需要对queue用QueueManager进行封装放到网络中,这是通过上面的2行代码来实现的。我们给return_task_queue的网络调用接口取了一个名早纯get_task_queue,而return_result_queue的名字是get_result_queue,方便区分对哪个queue进行操作。task.put(n)即是对task_queue进行写入数据,相当于分配任务。而result.get()即是等待workers机器处理后返回的结果。
值得注意 在windows系统中你必须要写IP地址,而其他操作系统比如linux操作系统则就不要了。
修改后的代码
在task_master.py中修改如下:
在task_worker.py中修改如下:
先运行task_master.py,然后再运行task_worker.py
(1)task_master.py运行结果如下
(2)task_worker.py运行结果如下
知识补充
这个简单的Master/Worker模型有什么用?其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,比如把计算n*n的代码换成发送邮件,就实现了邮件队列的异步发送。
Queue对象存储在哪?注意到task_worker.py中根本没有创建Queue的代码,所以,Queue对象存储在task_master.py进程中:
而Queue之所以能通过网络访问,就是通过QueueManager实现的。由于QueueManager管理的不止一个Queue,所以,要给每个Queue的网络调用接口起个名字,比如get_task_queue。task_worker这里的QueueManager注册的名字必须和task_manager中的一样。对比上面的例子,可以看出Queue对象从另一个进程通过网络传递了过来。只不过这里的传递和网络通信由QueueManager完成。
authkey有什么用?这是为了保证两台机器正常通信,不被其他机器恶意干扰。如果task_worker.py的authkey和task_master.py的authkey不一致,肯定连接不上。