pythonmap多线程
‘壹’ python中多线程调用全局变量,值不是修改后的值
多线程读取全局变量需要引用线程锁,否则多个线程同时读取同一个全局变量会出现和预期不一样的值
‘贰’ python 中的map(转载)
1 map()函数的简介以及语法:
map是python内置函数,会根据提供的函数对指定的序列做映射。
map()函数的格式是:
map(function,iterable,...)
第一个参数接受一个函数名,后面的参数接受一个或多个可迭代的序列,返回的是一个集合。
把函数依次作用在list中的每一个元素上,得到一个新的list并返回。注意,map不改变原list,而是返回一个新list。
2 map()函数实例:
del square(x):
return x ** 2
map(square,[1,2,3,4,5]) ---- -要打印结果需要 print(*map(square,[1,2,3,4,5])),这块打印了再打印就会为空
# 结果如下:
[1,4,9,16,25]
通过使用lambda匿名函数的方法使用map()函数:
map(lambda x, y: x+y,[1,3,5,7,9],[2,4,6,8,10])
# 结果如下:
[3,7,11,15,19]
通过lambda函数使返回值是一个元组:
map(lambdax, y : (x**y,x+y),[2,4,6],[3,2,1])
# 结果如下
[(8,5),(16,6),(6,7)]
当不传入function时,map()就等同于zip(),将多个列表相同位置的元素归并到一个元组:
map(None,[2,4,6],[3,2,1])
# 结果如下
[(2,3),(4,2),(6,1)]
通过map还可以实现类型转换
将元组转换为list:
map(int,(1,2,3))
# 结果如下:
[1,2,3]
将字符串转换为list:
map(int,'1234')
# 结果如下:
[1,2,3,4]
提取字典中的key,并将结果放在一个list中:
map(int,{1:2,2:3,3:4})
# 结果如下
[1,2,3]
原文链接:https://blog.csdn.net/quanlingtu1272/article/details/95482253
‘叁’ Python:map函数用法详解
一个简单的例子:将一个list中所有元素平方,常规的做法如下图所示,虽然实现了这个功能,但并没有给人一目了然的感觉。若换成map来实现,则会好很多。
1、map函数介绍及其简单使用
上述用一个简单的例子演示的map函数的用法及其优势,下面将详细介绍map函数的用法:map()函数接收两个参数,一个是函数,一个是Iterable,map将传入的函数依次作用到序列的每一个元素,并把结果作为新的Iterable返回。其语法格式为:
map(function,iterable...)
function---函数名
iterable---一个或多个序列
map作为高阶函数,事实上它把运算规则抽象了,我们可以用这种方式计算任意复杂的函数,再比如,把一个list的所有数据转为string类型:
再举一个小例子,对list中的各个元素开方,一步到位:
!注意:在使用math自带函数时,只需要函数名即可
2、map函数与lambda函数结合使用,下面方法同样可以达到对list中的数二次方的目的
map函数与lambda函数结合使用,可以传入两个参数相加:
还可以同时计算多个值:
‘肆’ Python多线程总结
在实际处理数据时,因系统内存有限,我们不可能一次把所有数据都导出进行操作,所以需要批量导出依次操作。为了加快运行,我们会采用多线程的方法进行数据处理, 以下为我总结的多线程批量处理数据的模板:
主要分为三大部分:
共分4部分对多线程的内容进行总结。
先为大家介绍线程的相关概念:
在飞车程序中,如果没有多线程,我们就不能一边听歌一边玩飞车,听歌与玩 游戏 不能并行;在使用多线程后,我们就可以在玩 游戏 的同时听背景音乐。在这个例子中启动飞车程序就是一个进程,玩 游戏 和听音乐是两个线程。
Python 提供了 threading 模块来实现多线程:
因为新建线程系统需要分配资源、终止线程系统需要回收资源,所以如果可以重用线程,则可以减去新建/终止的开销以提升性能。同时,使用线程池的语法比自己新建线程执行线程更加简洁。
Python 为我们提供了 ThreadPoolExecutor 来实现线程池,此线程池默认子线程守护。它的适应场景为突发性大量请求或需要大量线程完成任务,但实际任务处理时间较短。
其中 max_workers 为线程池中的线程个数,常用的遍历方法有 map 和 submit+as_completed 。根据业务场景的不同,若我们需要输出结果按遍历顺序返回,我们就用 map 方法,若想谁先完成就返回谁,我们就用 submit+as_complete 方法。
我们把一个时间段内只允许一个线程使用的资源称为临界资源,对临界资源的访问,必须互斥的进行。互斥,也称间接制约关系。线程互斥指当一个线程访问某临界资源时,另一个想要访问该临界资源的线程必须等待。当前访问临界资源的线程访问结束,释放该资源之后,另一个线程才能去访问临界资源。锁的功能就是实现线程互斥。
我把线程互斥比作厕所包间上大号的过程,因为包间里只有一个坑,所以只允许一个人进行大号。当第一个人要上厕所时,会将门上上锁,这时如果第二个人也想大号,那就必须等第一个人上完,将锁解开后才能进行,在这期间第二个人就只能在门外等着。这个过程与代码中使用锁的原理如出一辙,这里的坑就是临界资源。 Python 的 threading 模块引入了锁。 threading 模块提供了 Lock 类,它有如下方法加锁和释放锁:
我们会发现这个程序只会打印“第一道锁”,而且程序既没有终止,也没有继续运行。这是因为 Lock 锁在同一线程内第一次加锁之后还没有释放时,就进行了第二次 acquire 请求,导致无法执行 release ,所以锁永远无法释放,这就是死锁。如果我们使用 RLock 就能正常运行,不会发生死锁的状态。
在主线程中定义 Lock 锁,然后上锁,再创建一个子 线程t 运行 main 函数释放锁,结果正常输出,说明主线程上的锁,可由子线程解锁。
如果把上面的锁改为 RLock 则报错。在实际中设计程序时,我们会将每个功能分别封装成一个函数,每个函数中都可能会有临界区域,所以就需要用到 RLock 。
一句话总结就是 Lock 不能套娃, RLock 可以套娃; Lock 可以由其他线程中的锁进行操作, RLock 只能由本线程进行操作。
‘伍’ python map是不是多线程
显然不是。map是完全的单线程。
‘陆’ js判定是否传入回调函数
关于js回调函数,自己之前了解过,但是概念不是很清晰了,这里重新找几篇博客回顾一下概念,整理的感觉比较好的几个博客的总结的概念。方便复习。
js中的回调函数的理解:回调函数就是传递一个参数化函数,就是将这个函数作为一个参数传到另外一个主函数里面,当那个主函数执行完之后,再执行传递过去的函数,走这个过程的参数化的函数,就叫回调函数,换个说法也就是被作为参数传递到另一个函数(主函数)的那个函数就叫做回调函数。
回调函数:函数a有一个参数,这个参数是个函数b,当函数a执行完以后执行函数b。那么这个过程就叫回调。,这句话的意思是函数b以一个参数的形式传入函数a并执行,顺序是先执行a ,然后执行参数b,b就是所谓的回调函数。
function a(callback){
alert('a');
callback.call(this);//或者是 callback(), callback.apply(this),看个人喜好
}
function b(){ // 为回调函数。
alert('b');
}
//调用
a(b);
1
2
3
4
5
6
7
8
9
1
2
3
4
5
6
7
8
9
js中的回调函数:官方解释,当程序跑起来的时候,一般情况下,应用程序会时常通过API调用库里的所先预备好的函数,但是有些库函数,却要求应用先传给它的一个函数,好在适合的时候调用,以完成目标任务。这个被传入的,后又被调用的函数成为回调函数。
通常将一个函数B传入另外一个函数A,并且在需要的时候调用A.,说白了就是回溯函数,先定义好将要使用的函数体,饭后在使用在调用这个函数我们通常把callback作为一个参数传入定义的那个函数。下面我们看一段实现这个效果的js代码。
function Buy(name,goods1,callback) {
alert(name+' buy '+goods1);
if(callback&&typeof(callback)==="function")
callback();
}
Buy('xiaoming','apple',function(){
alert("shopping finish");
});
1
2
3
4
5
6
7
8
1
2
3
4
5
6
7
8
一个简单的代码,一开始不知道要买啥,等到买东西的时候,立即把之前定义好的函数调用出来,最好加上判断,因为一切的前提是callback必须是一个函数,输出结果为:
xiaoming buy apple
shopping finish
1
2
1
2
这样应该能理解什么是回调函数了吧。
打开CSDN,阅读体验更佳
JS中的 回调函数(callback)_前端小草籽的博客_js回调函数
1.什么是回调函数(callback)呢? 把函数当作一个参数传到另外一个函数中,当需要用这个函数是,再回调运行()这个函数. 回调函数是一段可执行的代码段,它作为一个参数传递给其他的代码,其作用是在需要的时候方便调用这段(回调函数)代码。
JS中什么是回调函数?_路过的假面骑士dcd的博客
参数可以拿来用,你也可以不用。形参,形式上的参数,并没有实际意义,只是帮你完成函数内部逻辑运算而设置的。 回调函数:被作为实参传入另一函数,并在该外部函数内被调用,用以来完成某些任务的函数,称为回调函数。 functiongreeting(name){...
JS回调函数——简单易懂有实例
初学js的时候,被回调函数搞得很晕,现在回过头来总结一下什么是回调函数。什么是JS?(点击查看) 下面先看看标准的解释: <script language="javascript"> 02 function SortNumber( obj, func ) // 定义通用排序函数 03 { 04 //...
继续访问
浅析JS中回调函数及用法
主要介绍了JS中回调函数及用法,通过实例代码给大家详细介绍了什么是回调函数,非常不错,具有一定的参考借鉴价值,需要的朋友参考下吧
JS回调函数(callback)
浅谈JS回调函数
继续访问
JS中的 回调函数(callback)
目录1.什么是回调函数(callback)呢?2.回调函数有哪些特点?3.回调函数中this的指向问题4.为什么要用到回调函数?5.回调函数和异步操作的关系是什么?回调函数是异步么?把函数当作一个参数传到另外一个函数中,当需要用这个函数是,再回调运行()这个函数.回调函数是一段可执行的代码段,它作为一个参数传递给其他的代码,其作用是在需要的时候方便调用这段(回调函数)代码。(作为参数传递到另外一个函数中,这个作为参数的函数就是回调函数)理解:函数可以作为一个参数传递到另外一个函数中。 分析:add(1,
继续访问
js之回调函数
回调函数 回调函数被作为实参传入另一函数,并在该外部函数内被调用,用以来完成某些任务的函数,称为回调函数。 一个简单的例子 <script type="text/javascript"> window.onload = function(){ // 回调函数 function a(m){ return m+m; } console.log(a(3));//6 返回一个数据 console.log(a);//f a(m){return m+n} 返
继续访问
<Zhuuu_ZZ>HIVE(十一)函数
Hive内置函数一 Hive函数分类二 字符函数二 类型转换函数和数学函数三 日期函数四 集合函数五 条件函数六 聚合函数和表生成函数6.1 聚合函数6.2 表生成函数:输出可以作为表使用 一 Hive函数分类 从输入输出角度分类 标准函数:一行数据中的一列或多列为输入,结果为单一值 聚合函数:多行的零列到多列为输入,结果为单一值 表生成函数:零个或多个输入,结果为多列或多行 从实现方式分类 内置函数 自定义函数 UDF:自定义标准函数 UDAF:自定义聚合函数 UDTF:自定义表生成函数
继续访问
常见的开窗函数
开窗函数与聚合函数计算方式一样,开窗函数也是对行集组进行聚合计算,但是它不像普通聚合函数那样每组只返回一个值,开窗函数可以为每组返回多个值。 开窗函数的语法为:over(partition by 列名1 order by 列名2 ),括号中的两个关键词partition by 和order by 可以只出现一个。over() 前面是一个函数,如果是聚合函数,那么order by 不能一起使用。 开窗函数主要分为以下两类: 窗口函数OVER()指定一组行,开窗函数计算从窗口函数输出的结果集中各行的值。
继续访问
开窗函数总结
4.2.1,表的数据 4.2.3,开窗函数查询 1,结果如下: 2,结果如下,可以参照这个结果进行理解rows和range的区别 3,结果如下,可以用于获取当前数据行的 上次登录时间 的需求 4,结果如下,结合lead()函数 可以获取用户 上次登录时间与下次登录时间的 需求 5,结果如下,可以用于指定时间内最新或最旧数据的需求。 6,结果如下,可用于求比例的需求 7,结果如下: 7,结果如下: 8,结果如下 9,结果如下: ,10,结果
继续访问
热门推荐 python中def用法
一、函数调用的含义 函数是类似于可封装的程序片段。允许你给一块语句一个名字,允许您在你的程序的任何地方使用指定的名字运行任何次数。 python中有许多内置函数,如len和range。 函数概念可能是任何有价值软件中最重要的块(在任何编程语言中)。 二、定义函数使用def关键字 在这个关键字之后是标识函数的名字; 其次是在一对括号中可以附上一些变量名; 最后在行的末尾...
继续访问
python map函数的作用_python语言基础之map函数,urlib.request,多线程
1.map函数map 是 Python 自带的内置函数,它的作用是把一个函数应用在一个(或多个)序列上,把列表中的每一项作为函数输入进行计算,再把计算的结果以列表的形式返回。map 的第一个参数是一个函数,之后的参数是序列,可以是 list、tuple。当 map 中的函数为 None 时,结果将会直接返回参数组成的列表。(python3中去掉了None,会报错)lst_1 = (1,2,3,4,...
继续访问
JS中什么是回调函数?
对于刚学JS的初学者来说(包括我现在的自己),对于这个回调函数真的是踩坑无数,于是乎想作为一个淋过雨的人,想为后面刚入门的人打一把伞。 本文不会用专业的知识词汇,只会用口语来简单让你有一个概念帮你浅浅的理解这个名词,如果你是一个学习JS刚遇到这个名词,正处于一脸懵逼的状态,那么本文对于会有帮助,但如果你想要研究更深层次的原理,用法和含义,可能本文不适合你。 废话不多说,让我们先看MDN的解释。 这段话,首先我们要搞懂一个东西,什么是实参。 我们都知道,函数可以接受参数,形参和实参。那么什么是
继续访问
js回调函数的两种写法
回调函数 应用程序时常会通过API调用库里所预先备好的函数。但是有些库函数(library function)却要求应用先传给它一个函数,好在合适的时候调用,以完成目标任务。这个被传入的、后又被调用的函数就称为回调函数(callback function)。 总结一下回调函数的两种写法与用法: 非参数回调函数: 这种回调比较简单 ,往往只需传一个函数名就可以。 function demo(arg,callback){ } 再来看看怎么写这个函数 在js中是可以通过函数名来调用函数的 例如: var
继续访问
【一句话攻略】彻底理解JS中的回调(Callback)函数
回调函数
继续访问
sql开窗函数(窗口函数)详解
一、什么是开窗函数 开窗函数/分析函数:over() 开窗函数也叫分析函数,有两类:一类是聚合开窗函数,一类是排序开窗函数。 开窗函数的调用格式为: 函数名(列名) OVER(partition by 列名 order by列名) 。 如果你没听说过开窗函数,看到上面开窗函数的调用方法,你可能还会有些疑惑。但只要你了解聚合函数,那么理解开窗函数就非常容易了。 我们知道聚合函数对一组值执行计算并返回单一的值,如sum(),count(),max(),min(), avg()等,这些函数常与grou
继续访问
最新发布 Python中很常用的函数map(),一起来看看用法
Python2中map直接返回作用后的元素的列表 Python3中map返回的则是一个map对象 如果想得到列表对象,则还需要调用list转化为列表对象 Python2中,map()函数的 function 可以为None,如map(iterable1,iterable2[,...[,iterable n),其作用类似于将iterable*中的对应索引的值取出作为一个元组,最终返回一个包含多个元组的列表。而Python3中,不指定 function,就会报错。
继续访问
Oracle分析函数Over()
一、Over()分析函数 说明:聚合函数(如sum()、max()等)可以计算基于组的某种聚合值,但是聚合函数对于某个组只能返回一行记录。若想对于某组返回多行记录,则需要使用分析函数。 1、rank()/dense_rank over(partition by ... order by ...) 说明:over()在什么条件之上; partition by 按哪个字段划分组; ...
继续访问
mysql开窗函数有哪些_mysql开窗函数
开窗函数:它可以理解为记录集合,开窗函数也就是在满足某种条件的记录集合上执行的特殊函数。对于每条记录都要在此窗口内执行函数,有的函数随着记录不同,窗口大小都是固定的,这种属于静态窗口;有的函数则相反,不同的记录对应着不同的窗口,这种动态变化的窗口叫滑动窗口。开窗函数的本质还是聚合运算,只不过它更具灵活性,它对数据的每一行,都使用与该行相关的行进行计算并返回计算结果。开窗函数和普通聚合函数的区别聚合...
继续访问
SQL:开窗函数(窗口函数)
4、 窗口函数 目录4、 窗口函数4.1 排序窗口函数rank4.2 rank(), dense_rank(), row_number()区别4.3 、排序截取数据lag(),lead(),ntile(),cume_dist()4.4 聚合函数作为窗口函数4.4、over(- - rows between and ) 简单理解,就是对查询的结果多出一列,这一列可以是聚合值,也可以是排序值。 开窗函数一般就是说的是over()函数,其窗口是由一个 OVER 子句 定义的多行记录 开窗函数一般分为两类,
继续访问
开窗函数(分析函数)使用详解
开窗函数 简介 开窗函数:在开窗函数出现之前存在着很多用 SQL 语句很难解决的问题,很多都要通过复杂的相关子查询或者存储过程来完成。为了解决这些问题,在 2003 年 ISO SQL 标准加入了开窗函数,开窗函数的使用使得这些经典的难题可以被轻松的解决。目前在 MSSQLServer、Oracle、DB2 等主流数据库中都提供了对开窗函数的支持,MySQL8.0支持。 5.7 --> 8.0 开窗函数简介:与聚合函数一样,开窗函数也是对行集组进行聚合计算,但是它不像普通聚合函数那样每组只返回一个
继续访问
敲黑板啦!开窗函数你学会了吗
特征分析与偏移分析什么是开窗函数?学习目标:1、累计计算窗口函数(1)sum(…) over(……)(2)avg(…) over(……)(3)语法总结:2、分区排序窗口函数3、分组排序窗口函数4、偏移分析窗口函数练习总结: 什么是开窗函数? 开窗函数用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用GROUP BY子句对数据进行分组,能够在同一行中同时返回...
继续访问
‘柒’ map函数的用法python
map函数的用法如下:
map(func, lst) ,将传⼊的函数变量 func 作⽤到 lst 变量的每个元素中,并将结果组成新的列表 (Python2)/ 迭代器(Python3) 返回。
注意:
map()返回的是一个迭代器,直接打印map()的结果是返回的一个对象。
map函数示例代码:
lst = ['1', '2', '3', '4', '5', '6']
print(lst)
lst_int = map(lambda x: int(x), lst)
# print(list(lst_int))
for i in lst_int:
print(i, end=' ')
print()
print(list(lst_int))
‘捌’ map 在多线程中的操作
只有一个线程读,不需要加锁。
只有一个线程写,不需要加锁。
多个线程读 不需要加锁。
只有一个线程写,其他线程读或者写需要加锁。
(留待验证)
‘玖’ python 多进程和多线程配合
由于python的多线程中存在PIL锁,因此python的多线程不能利用多核,那么,由于现在的计算机是多核的,就不能充分利用计算机的多核资源。但是python中的多进程是可以跑在不同的cpu上的。因此,尝试了多进程+多线程的方式,来做一个任务。比如:从中科大的镜像源中下载多个rpm包。
#!/usr/bin/pythonimport reimport commandsimport timeimport multiprocessingimport threadingdef download_image(url):
print '*****the %s rpm begin to download *******' % url
commands.getoutput('wget %s' % url)def get_rpm_url_list(url):
commands.getoutput('wget %s' % url)
rpm_info_str = open('index.html').read()
regu_mate = '(?<=<a href=")(.*?)(?=">)'
rpm_list = re.findall(regu_mate, rpm_info_str)
rpm_url_list = [url + rpm_name for rpm_name in rpm_list] print 'the count of rpm list is: ', len(rpm_url_list) return rpm_url_
def multi_thread(rpm_url_list):
threads = [] # url = 'https://mirrors.ustc.e.cn/centos/7/os/x86_64/Packages/'
# rpm_url_list = get_rpm_url_list(url)
for index in range(len(rpm_url_list)): print 'rpm_url is:', rpm_url_list[index]
one_thread = threading.Thread(target=download_image, args=(rpm_url_list[index],))
threads.append(one_thread)
thread_num = 5 # set threading pool, you have put 4 threads in it
while 1:
count = min(thread_num, len(threads)) print '**********count*********', count ###25,25,...6707%25
res = [] for index in range(count):
x = threads.pop()
res.append(x) for thread_index in res:
thread_index.start() for j in res:
j.join() if not threads:
def multi_process(rpm_url_list):
# process num at the same time is 4
process = []
rpm_url_group_0 = []
rpm_url_group_1 = []
rpm_url_group_2 = []
rpm_url_group_3 = [] for index in range(len(rpm_url_list)): if index % 4 == 0:
rpm_url_group_0.append(rpm_url_list[index]) elif index % 4 == 1:
rpm_url_group_1.append(rpm_url_list[index]) elif index % 4 == 2:
rpm_url_group_2.append(rpm_url_list[index]) elif index % 4 == 3:
rpm_url_group_3.append(rpm_url_list[index])
rpm_url_groups = [rpm_url_group_0, rpm_url_group_1, rpm_url_group_2, rpm_url_group_3] for each_rpm_group in rpm_url_groups:
each_process = multiprocessing.Process(target = multi_thread, args = (each_rpm_group,))
process.append(each_process) for one_process in process:
one_process.start() for one_process in process:
one_process.join()# for each_url in rpm_url_list:# print '*****the %s rpm begin to download *******' %each_url## commands.getoutput('wget %s' %each_url)
def main():
url = 'https://mirrors.ustc.e.cn/centos/7/os/x86_64/Packages/'
url_paas = 'http://mirrors.ustc.e.cn/centos/7.3.1611/paas/x86_64/openshift-origin/'
url_paas2 ='http://mirrors.ustc.e.cn/fedora/development/26/Server/x86_64/os/Packages/u/'
start_time = time.time()
rpm_list = get_rpm_url_list(url_paas) print multi_process(rpm_list) # print multi_thread(rpm_list)
#print multi_process()
# print multi_thread(rpm_list)
# for index in range(len(rpm_list)):
# print 'rpm_url is:', rpm_list[index]
end_time = time.time() print 'the download time is:', end_time - start_timeprint main()123456789101112131415161718
代码的功能主要是这样的:
main()方法中调用get_rpm_url_list(base_url)方法,获取要下载的每个rpm包的具体的url地址。其中base_url即中科大基础的镜像源的地址,比如:http://mirrors.ustc.e.cn/centos/7.3.1611/paas/x86_64/openshift-origin/,这个地址下有几十个rpm包,get_rpm_url_list方法将每个rpm包的url地址拼出来并返回。
multi_process(rpm_url_list)启动多进程方法,在该方法中,会调用多线程方法。该方法启动4个多进程,将上面方法得到的rpm包的url地址进行分组,分成4组,然后每一个组中的rpm包再最后由不同的线程去执行。从而达到了多进程+多线程的配合使用。
代码还有需要改进的地方,比如多进程启动的进程个数和rpm包的url地址分组是硬编码,这个还需要改进,毕竟,不同的机器,适合同时启动的进程个数是不同的。
‘拾’ 有没有易懂的 Python 多线程爬虫代码
Python 在程序并行化方面多少有些声名狼藉。撇开技术上的问题,例如线程的实现和 GIL1,我觉得错误的教学指导才是主要问题。常见的经典 Python 多线程、多进程教程多显得偏“重”。而且往往隔靴搔痒,没有深入探讨日常工作中最有用的内容。
传统的例子
简单搜索下“Python 多线程教程”,不难发现几乎所有的教程都给出涉及类和队列的例子:
#Example.py
'''
Standard Procer/Consumer Threading Pattern
'''
import time
import threading
import Queue
class Consumer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self._queue = queue
def run(self):
while True:
# queue.get() blocks the current thread until
# an item is retrieved.
msg = self._queue.get()
# Checks if the current message is
# the "Poison Pill"
if isinstance(msg, str) and msg == 'quit':
# if so, exists the loop
break
# "Processes" (or in our case, prints) the queue item
print "I'm a thread, and I received %s!!" % msg
# Always be friendly!
print 'Bye byes!'
def Procer():
# Queue is used to share items between
# the threads.
queue = Queue.Queue()
# Create an instance of the worker
worker = Consumer(queue)
# start calls the internal run() method to
# kick off the thread
worker.start()
# variable to keep track of when we started
start_time = time.time()
# While under 5 seconds..
while time.time() - start_time < 5:
# "Proce" a piece of work and stick it in
# the queue for the Consumer to process
queue.put('something at %s' % time.time())
# Sleep a bit just to avoid an absurd number of messages
time.sleep(1)
# This the "poison pill" method of killing a thread.
queue.put('quit')
# wait for the thread to close down
worker.join()
if __name__ == '__main__':
Procer()
哈,看起来有些像 Java 不是吗?
我并不是说使用生产者/消费者模型处理多线程/多进程任务是错误的(事实上,这一模型自有其用武之地)。只是,处理日常脚本任务时我们可以使用更有效率的模型。
问题在于…
首先,你需要一个样板类;
其次,你需要一个队列来传递对象;
而且,你还需要在通道两端都构建相应的方法来协助其工作(如果需想要进行双向通信或是保存结果还需要再引入一个队列)。
worker 越多,问题越多
按照这一思路,你现在需要一个 worker 线程的线程池。下面是一篇 IBM 经典教程中的例子——在进行网页检索时通过多线程进行加速。
#Example2.py
'''
A more realistic thread pool example
'''
import time
import threading
import Queue
import urllib2
class Consumer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self._queue = queue
def run(self):
while True:
content = self._queue.get()
if isinstance(content, str) and content == 'quit':
break
response = urllib2.urlopen(content)
print 'Bye byes!'
def Procer():
urls = [
'', ''
'', ''
# etc..
]
queue = Queue.Queue()
worker_threads = build_worker_pool(queue, 4)
start_time = time.time()
# Add the urls to process
for url in urls:
queue.put(url)
# Add the poison pillv
for worker in worker_threads:
queue.put('quit')
for worker in worker_threads:
worker.join()
print 'Done! Time taken: {}'.format(time.time() - start_time)
def build_worker_pool(queue, size):
workers = []
for _ in range(size):
worker = Consumer(queue)
worker.start()
workers.append(worker)
return workers
if __name__ == '__main__':
Procer()
这段代码能正确的运行,但仔细看看我们需要做些什么:构造不同的方法、追踪一系列的线程,还有为了解决恼人的死锁问题,我们需要进行一系列的 join 操作。这还只是开始……
至此我们回顾了经典的多线程教程,多少有些空洞不是吗?样板化而且易出错,这样事倍功半的风格显然不那么适合日常使用,好在我们还有更好的方法。
何不试试 map
map 这一小巧精致的函数是简捷实现 Python 程序并行化的关键。map 源于 Lisp 这类函数式编程语言。它可以通过一个序列实现两个函数之间的映射。
urls = ['', '']
results = map(urllib2.urlopen, urls)
上面的这两行代码将 urls 这一序列中的每个元素作为参数传递到 urlopen 方法中,并将所有结果保存到 results 这一列表中。其结果大致相当于:
results = []
for url in urls:
results.append(urllib2.urlopen(url))
map 函数一手包办了序列操作、参数传递和结果保存等一系列的操作。
为什么这很重要呢?这是因为借助正确的库,map 可以轻松实现并行化操作。
在 Python 中有个两个库包含了 map 函数: multiprocessing 和它鲜为人知的子库 multiprocessing.mmy.
这里多扯两句: multiprocessing.mmy? mltiprocessing 库的线程版克隆?这是虾米?即便在 multiprocessing 库的官方文档里关于这一子库也只有一句相关描述。而这句描述译成人话基本就是说:"嘛,有这么个东西,你知道就成."相信我,这个库被严重低估了!
mmy 是 multiprocessing 模块的完整克隆,唯一的不同在于 multiprocessing 作用于进程,而 mmy 模块作用于线程(因此也包括了 Python 所有常见的多线程限制)。
所以替换使用这两个库异常容易。你可以针对 IO 密集型任务和 CPU 密集型任务来选择不同的库。2
动手尝试
使用下面的两行代码来引用包含并行化 map 函数的库:
from multiprocessing import Pool
from multiprocessing.mmy import Pool as ThreadPool
实例化 Pool 对象:
pool = ThreadPool()
这条简单的语句替代了 example2.py 中 build_worker_pool 函数 7 行代码的工作。它生成了一系列的 worker 线程并完成初始化工作、将它们储存在变量中以方便访问。
Pool 对象有一些参数,这里我所需要关注的只是它的第一个参数:processes. 这一参数用于设定线程池中的线程数。其默认值为当前机器 CPU 的核数。
一般来说,执行 CPU 密集型任务时,调用越多的核速度就越快。但是当处理网络密集型任务时,事情有有些难以预计了,通过实验来确定线程池的大小才是明智的。
pool = ThreadPool(4) # Sets the pool size to 4
线程数过多时,切换线程所消耗的时间甚至会超过实际工作时间。对于不同的工作,通过尝试来找到线程池大小的最优值是个不错的主意。
创建好 Pool 对象后,并行化的程序便呼之欲出了。我们来看看改写后的 example2.py
import urllib2
from multiprocessing.mmy import Pool as ThreadPool
urls = [
# etc..
]
# Make the Pool of workers
pool = ThreadPool(4)
# Open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)
#close the pool and wait for the work to finish
pool.close()
pool.join()
实际起作用的代码只有 4 行,其中只有一行是关键的。map 函数轻而易举的取代了前文中超过 40 行的例子。为了更有趣一些,我统计了不同方法、不同线程池大小的耗时情况。
# results = []
# for url in urls:
# result = urllib2.urlopen(url)
# results.append(result)
# # ------- VERSUS ------- #
# # ------- 4 Pool ------- #
# pool = ThreadPool(4)
# results = pool.map(urllib2.urlopen, urls)
# # ------- 8 Pool ------- #
# pool = ThreadPool(8)
# results = pool.map(urllib2.urlopen, urls)
# # ------- 13 Pool ------- #
# pool = ThreadPool(13)
# results = pool.map(urllib2.urlopen, urls)
结果:
# Single thread: 14.4 Seconds
# 4 Pool: 3.1 Seconds
# 8 Pool: 1.4 Seconds
# 13 Pool: 1.3 Seconds
很棒的结果不是吗?这一结果也说明了为什么要通过实验来确定线程池的大小。在我的机器上当线程池大小大于 9 带来的收益就十分有限了。
另一个真实的例子
生成上千张图片的缩略图
这是一个 CPU 密集型的任务,并且十分适合进行并行化。
基础单进程版本
import os
import PIL
from multiprocessing import Pool
from PIL import Image
SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'
def get_image_paths(folder):
return (os.path.join(folder, f)
for f in os.listdir(folder)
if 'jpeg' in f)
def create_thumbnail(filename):
im = Image.open(filename)
im.thumbnail(SIZE, Image.ANTIALIAS)
base, fname = os.path.split(filename)
save_path = os.path.join(base, SAVE_DIRECTORY, fname)
im.save(save_path)
if __name__ == '__main__':
folder = os.path.abspath(
'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
images = get_image_paths(folder)
for image in images:
create_thumbnail(Image)
上边这段代码的主要工作就是将遍历传入的文件夹中的图片文件,一一生成缩略图,并将这些缩略图保存到特定文件夹中。
这我的机器上,用这一程序处理 6000 张图片需要花费 27.9 秒。
如果我们使用 map 函数来代替 for 循环:
import os
import PIL
from multiprocessing import Pool
from PIL import Image
SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'
def get_image_paths(folder):
return (os.path.join(folder, f)
for f in os.listdir(folder)
if 'jpeg' in f)
def create_thumbnail(filename):
im = Image.open(filename)
im.thumbnail(SIZE, Image.ANTIALIAS)
base, fname = os.path.split(filename)
save_path = os.path.join(base, SAVE_DIRECTORY, fname)
im.save(save_path)
if __name__ == '__main__':
folder = os.path.abspath(
'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
images = get_image_paths(folder)
pool = Pool()
pool.map(creat_thumbnail, images)
pool.close()
pool.join()
5.6 秒!
虽然只改动了几行代码,我们却明显提高了程序的执行速度。在生产环境中,我们可以为 CPU 密集型任务和 IO 密集型任务分别选择多进程和多线程库来进一步提高执行速度——这也是解决死锁问题的良方。此外,由于 map 函数并不支持手动线程管理,反而使得相关的 debug 工作也变得异常简单。
到这里,我们就实现了(基本)通过一行 Python 实现并行化。