当前位置:首页 » 编程语言 » pythonmap与多线程

pythonmap与多线程

发布时间: 2022-11-04 22:43:53

Ⅰ 有没有易懂的 python 多线程爬虫代码

Python 在程序并行化方面多少有些声名狼藉。撇开技术上的问题,例如线程的实现和 GIL1,我觉得错误的教学指导才是主要问题。常见的经典 Python 多线程、多进程教程多显得偏“重”。而且往往隔靴搔痒,没有深入探讨日常工作中最有用的内容。
传统的例子
简单搜索下“Python 多线程教程”,不难发现几乎所有的教程都给出涉及类和队列的例子:
#Example.py'''
Standard Procer/Consumer Threading Pattern
'''import time
import threading
import Queue

class Consumer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self._queue = queue

def run(self):
while True:
# queue.get() blocks the current thread until
# an item is retrieved.
msg = self._queue.get()
# Checks if the current message is
# the "Poison Pill"
if isinstance(msg, str) and msg == 'quit': # if so, exists the loop
break
# "Processes" (or in our case, prints) the queue item
print "I'm a thread, and I received %s!!" % msg # Always be friendly!
print 'Bye byes!'def Procer():
# Queue is used to share items between
# the threads.
queue = Queue.Queue() # Create an instance of the worker
worker = Consumer(queue) # start calls the internal run() method to
# kick off the thread
worker.start()

# variable to keep track of when we started
start_time = time.time()
# While under 5 seconds..
while time.time() - start_time < 5:
# "Proce" a piece of work and stick it in
# the queue for the Consumer to process
queue.put('something at %s' % time.time()) # Sleep a bit just to avoid an absurd number of messages
time.sleep(1) # This the "poison pill" method of killing a thread.
queue.put('quit') # wait for the thread to close down
worker.join()if __name__ == '__main__':
Procer()

哈,看起来有些像 Java 不是吗?
我并不是说使用生产者/消费者模型处理多线程/多进程任务是错误的(事实上,这一模型自有其用武之地)。只是,处理日常脚本任务时我们可以使用更有效率的模型。
问题在于…
首先,你需要一个样板类;
其次,你需要一个队列来传递对象;
而且,你还需要在通道两端都构建相应的方法来协助其工作(如果需想要进行双向通信或是保存结果还需要再引入一个队列)。
worker 越多,问题越多
按照这一思路,你现在需要一个 worker 线程的线程池。下面是一篇 IBM 经典教程中的例子——在进行网页检索时通过多线程进行加速。
#Example2.py'''
A more realistic thread pool example
'''import time
import threading
import Queue
import urllib2

class Consumer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self._queue = queue

def run(self):
while True:
content = self._queue.get()
if isinstance(content, str) and content == 'quit': break
response = urllib2.urlopen(content) print 'Bye byes!'def Procer():
urls = [ 'http', 'httcom'
'ala.org', 'hle.com'
# etc..
]
queue = Queue.Queue()
worker_threads = build_worker_pool(queue, 4)
start_time = time.time() # Add the urls to process
for url in urls:
queue.put(url)
# Add the poison pillv
for worker in worker_threads:
queue.put('quit') for worker in worker_threads:
worker.join() print 'Done! Time taken: {}'.format(time.time() - start_time)def build_worker_pool(queue, size):
workers = [] for _ in range(size):
worker = Consumer(queue)
worker.start()
workers.append(worker) return workersif __name__ == '__main__':
Procer()

这段代码能正确的运行,但仔细看看我们需要做些什么:构造不同的方法、追踪一系列的线程,还有为了解决恼人的死锁问题,我们需要进行一系列的 join 操作。这还只是开始……
至此我们回顾了经典的多线程教程,多少有些空洞不是吗?样板化而且易出错,这样事倍功半的风格显然不那么适合日常使用,好在我们还有更好的方法。
何不试试 map
map 这一小巧精致的函数是简捷实现 Python 程序并行化的关键。map 源于 Lisp 这类函数式编程语言。它可以通过一个序列实现两个函数之间的映射。
urls = ['ho.com', 'htdit.com']
results = map(urllib2.urlopen, urls)

上面的这两行代码将 urls 这一序列中的每个元素作为参数传递到 urlopen 方法中,并将所有结果保存到 results 这一列表中。其结果大致相当于:
results = []for url in urls:
results.append(urllib2.urlopen(url))

map 函数一手包办了序列操作、参数传递和结果保存等一系列的操作。
为什么这很重要呢?这是因为借助正确的库,map 可以轻松实现并行化操作。

在 Python 中有个两个库包含了 map 函数: multiprocessing 和它鲜为人知的子库 multiprocessing.mmy.
这里多扯两句: multiprocessing.mmy? mltiprocessing 库的线程版克隆?这是虾米?即便在 multiprocessing 库的官方文档里关于这一子库也只有一句相关描述。而这句描述译成人话基本就是说:"嘛,有这么个东西,你知道就成."相信我,这个库被严重低估了!
mmy 是 multiprocessing 模块的完整克隆,唯一的不同在于 multiprocessing 作用于进程,而 mmy 模块作用于线程(因此也包括了 Python 所有常见的多线程限制)。
所以替换使用这两个库异常容易。你可以针对 IO 密集型任务和 CPU 密集型任务来选择不同的库。2
动手尝试
使用下面的两行代码来引用包含并行化 map 函数的库:
from multiprocessing import Poolfrom multiprocessing.mmy import Pool as ThreadPool

实例化 Pool 对象:
pool = ThreadPool()

这条简单的语句替代了 example2.py 中 build_worker_pool 函数 7 行代码的工作。它生成了一系列的 worker 线程并完成初始化工作、将它们储存在变量中以方便访问
Pool 对象有一些参数,这里我所需要关注的只是它的第一个参数:processes. 这一参数用于设定线程池中的线程数。其默认值为当前机器 CPU 的核数。
一般来说,执行 CPU 密集型任务时,调用越多的核速度就越快。但是当处理网络密集型任务时,事情有有些难以预计了,通过实验来确定线程池的大小才是明智的。
pool = ThreadPool(4) # Sets the pool size to 4

线程数过多时,切换线程所消耗的时间甚至会超过实际工作时间。对于不同的工作,通过尝试来找到线程池大小的最优值是个不错的主意。
创建好 Pool 对象后,并行化的程序便呼之欲出了。我们来看看改写后的 example2.py
import urllib2
from multiprocessing.mmy import Pool as ThreadPool

urls = [ 'httorg',
'hon.org/about/',
'hnlamp.com/pub/a/python/2003/04/17/metaclasses.html',

# etc..
]

# Make the Pool of workers
pool = ThreadPool(4)
# Open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)
#close the pool and wait for the work to finish
pool.close()
pool.join()

实际起作用的代码只有 4 行,其中只有一行是关键的。map 函数轻而易举的取代了前文中超过 40 行的例子。为了更有趣一些,我统计了不同方法、不同线程池大小的耗时情况。
# results = [] # for url in urls:# result = urllib2.urlopen(url)# results.append(result)# # ------- VERSUS ------- # # # ------- 4 Pool ------- # # pool = ThreadPool(4) # results = pool.map(urllib2.urlopen, urls)# # ------- 8 Pool ------- # # pool = ThreadPool(8) # results = pool.map(urllib2.urlopen, urls)# # ------- 13 Pool ------- # # pool = ThreadPool(13) # results = pool.map(urllib2.urlopen, urls)

结果:
# Single thread: 14.4 Seconds # 4 Pool: 3.1 Seconds# 8 Pool: 1.4 Seconds# 13 Pool: 1.3 Seconds

很棒的结果不是吗?这一结果也说明了为什么要通过实验来确定线程池的大小。在我的机器上当线程池大小大于 9 带来的收益就十分有限了。

Ⅱ python 多进程和多线程配合

由于python的多线程中存在PIL锁,因此python的多线程不能利用多核,那么,由于现在的计算机是多核的,就不能充分利用计算机的多核资源。但是python中的多进程是可以跑在不同的cpu上的。因此,尝试了多进程+多线程的方式,来做一个任务。比如:从中科大的镜像源中下载多个rpm包。
#!/usr/bin/pythonimport reimport commandsimport timeimport multiprocessingimport threadingdef download_image(url):
print '*****the %s rpm begin to download *******' % url
commands.getoutput('wget %s' % url)def get_rpm_url_list(url):
commands.getoutput('wget %s' % url)
rpm_info_str = open('index.html').read()

regu_mate = '(?<=<a href=")(.*?)(?=">)'
rpm_list = re.findall(regu_mate, rpm_info_str)

rpm_url_list = [url + rpm_name for rpm_name in rpm_list] print 'the count of rpm list is: ', len(rpm_url_list) return rpm_url_
def multi_thread(rpm_url_list):
threads = [] # url = 'https://mirrors.ustc.e.cn/centos/7/os/x86_64/Packages/'
# rpm_url_list = get_rpm_url_list(url)
for index in range(len(rpm_url_list)): print 'rpm_url is:', rpm_url_list[index]
one_thread = threading.Thread(target=download_image, args=(rpm_url_list[index],))
threads.append(one_thread)

thread_num = 5 # set threading pool, you have put 4 threads in it
while 1:
count = min(thread_num, len(threads)) print '**********count*********', count ###25,25,...6707%25

res = [] for index in range(count):
x = threads.pop()
res.append(x) for thread_index in res:
thread_index.start() for j in res:
j.join() if not threads:
def multi_process(rpm_url_list):
# process num at the same time is 4
process = []
rpm_url_group_0 = []
rpm_url_group_1 = []
rpm_url_group_2 = []
rpm_url_group_3 = [] for index in range(len(rpm_url_list)): if index % 4 == 0:
rpm_url_group_0.append(rpm_url_list[index]) elif index % 4 == 1:
rpm_url_group_1.append(rpm_url_list[index]) elif index % 4 == 2:
rpm_url_group_2.append(rpm_url_list[index]) elif index % 4 == 3:
rpm_url_group_3.append(rpm_url_list[index])
rpm_url_groups = [rpm_url_group_0, rpm_url_group_1, rpm_url_group_2, rpm_url_group_3] for each_rpm_group in rpm_url_groups:
each_process = multiprocessing.Process(target = multi_thread, args = (each_rpm_group,))
process.append(each_process) for one_process in process:
one_process.start() for one_process in process:
one_process.join()# for each_url in rpm_url_list:# print '*****the %s rpm begin to download *******' %each_url## commands.getoutput('wget %s' %each_url)
def main():
url = 'https://mirrors.ustc.e.cn/centos/7/os/x86_64/Packages/'
url_paas = 'http://mirrors.ustc.e.cn/centos/7.3.1611/paas/x86_64/openshift-origin/'
url_paas2 ='http://mirrors.ustc.e.cn/fedora/development/26/Server/x86_64/os/Packages/u/'

start_time = time.time()
rpm_list = get_rpm_url_list(url_paas) print multi_process(rpm_list) # print multi_thread(rpm_list)
#print multi_process()
# print multi_thread(rpm_list)
# for index in range(len(rpm_list)):
# print 'rpm_url is:', rpm_list[index]
end_time = time.time() print 'the download time is:', end_time - start_timeprint main()123456789101112131415161718

代码的功能主要是这样的:
main()方法中调用get_rpm_url_list(base_url)方法,获取要下载的每个rpm包的具体的url地址。其中base_url即中科大基础的镜像源的地址,比如:http://mirrors.ustc.e.cn/centos/7.3.1611/paas/x86_64/openshift-origin/,这个地址下有几十个rpm包,get_rpm_url_list方法将每个rpm包的url地址拼出来并返回。
multi_process(rpm_url_list)启动多进程方法,在该方法中,会调用多线程方法。该方法启动4个多进程,将上面方法得到的rpm包的url地址进行分组,分成4组,然后每一个组中的rpm包再最后由不同的线程去执行。从而达到了多进程+多线程的配合使用。
代码还有需要改进的地方,比如多进程启动的进程个数和rpm包的url地址分组是硬编码,这个还需要改进,毕竟,不同的机器,适合同时启动的进程个数是不同的。

Ⅲ map函数的用法python

map函数的用法如下:

map(func, lst) ,将传⼊的函数变量 func 作⽤到 lst 变量的每个元素中,并将结果组成新的列表 (Python2)/ 迭代器(Python3) 返回。

注意:

map()返回的是一个迭代器,直接打印map()的结果是返回的一个对象。

map函数示例代码:

lst = ['1', '2', '3', '4', '5', '6']

print(lst)

lst_int = map(lambda x: int(x), lst)

# print(list(lst_int))

for i in lst_int:

print(i, end=' ')

print()

print(list(lst_int))

Ⅳ Python多线程总结

在实际处理数据时,因系统内存有限,我们不可能一次把所有数据都导出进行操作,所以需要批量导出依次操作。为了加快运行,我们会采用多线程的方法进行数据处理, 以下为我总结的多线程批量处理数据的模板:

主要分为三大部分:


共分4部分对多线程的内容进行总结。

先为大家介绍线程的相关概念:

在飞车程序中,如果没有多线程,我们就不能一边听歌一边玩飞车,听歌与玩 游戏 不能并行;在使用多线程后,我们就可以在玩 游戏 的同时听背景音乐。在这个例子中启动飞车程序就是一个进程,玩 游戏 和听音乐是两个线程。

Python 提供了 threading 模块来实现多线程:

因为新建线程系统需要分配资源、终止线程系统需要回收资源,所以如果可以重用线程,则可以减去新建/终止的开销以提升性能。同时,使用线程池的语法比自己新建线程执行线程更加简洁。

Python 为我们提供了 ThreadPoolExecutor 来实现线程池,此线程池默认子线程守护。它的适应场景为突发性大量请求或需要大量线程完成任务,但实际任务处理时间较短。

其中 max_workers 为线程池中的线程个数,常用的遍历方法有 map 和 submit+as_completed 。根据业务场景的不同,若我们需要输出结果按遍历顺序返回,我们就用 map 方法,若想谁先完成就返回谁,我们就用 submit+as_complete 方法。

我们把一个时间段内只允许一个线程使用的资源称为临界资源,对临界资源的访问,必须互斥的进行。互斥,也称间接制约关系。线程互斥指当一个线程访问某临界资源时,另一个想要访问该临界资源的线程必须等待。当前访问临界资源的线程访问结束,释放该资源之后,另一个线程才能去访问临界资源。锁的功能就是实现线程互斥。

我把线程互斥比作厕所包间上大号的过程,因为包间里只有一个坑,所以只允许一个人进行大号。当第一个人要上厕所时,会将门上上锁,这时如果第二个人也想大号,那就必须等第一个人上完,将锁解开后才能进行,在这期间第二个人就只能在门外等着。这个过程与代码中使用锁的原理如出一辙,这里的坑就是临界资源。 Python 的 threading 模块引入了锁。 threading 模块提供了 Lock 类,它有如下方法加锁和释放锁:

我们会发现这个程序只会打印“第一道锁”,而且程序既没有终止,也没有继续运行。这是因为 Lock 锁在同一线程内第一次加锁之后还没有释放时,就进行了第二次 acquire 请求,导致无法执行 release ,所以锁永远无法释放,这就是死锁。如果我们使用 RLock 就能正常运行,不会发生死锁的状态。

在主线程中定义 Lock 锁,然后上锁,再创建一个子 线程t 运行 main 函数释放锁,结果正常输出,说明主线程上的锁,可由子线程解锁。

如果把上面的锁改为 RLock 则报错。在实际中设计程序时,我们会将每个功能分别封装成一个函数,每个函数中都可能会有临界区域,所以就需要用到 RLock 。

一句话总结就是 Lock 不能套娃, RLock 可以套娃; Lock 可以由其他线程中的锁进行操作, RLock 只能由本线程进行操作。

Ⅳ python map是不是多线程

显然不是。map是完全的单线程。

Ⅵ python需要学习什么内容

Python的学习内容还是比较多的,我们将学习的过程划分为4个阶段,每个阶段学习对应的内容,具体的学习顺序如下:

Python学习顺序:

①Python软件开发基础

  • 掌握计算机的构成和工作原理

  • 会使用Linux常用工具

  • 熟练使用Docker的基本命令

  • 建立Python开发环境,并使用print输出

  • 使用Python完成字符串的各种操作

  • 使用Python re模块进行程序设计

  • 使用Python创建文件、访问、删除文件

  • 掌握import 语句、From…import 语句、From…import* 语句、方法的引用、Python中的包

  • ②Python软件开发进阶

  • 能够使用Python面向对象方法开发软件

  • 能够自己建立数据库,表,并进行基本数据库操作

  • 掌握非关系数据库MongoDB的使用,掌握Redis开发

  • 能够独立完成TCP/UDP服务端客户端软件开发,能够实现ftp、http服务器,开发邮件软件

  • 能开发多进程、多线程软件

  • ③Python全栈式WEB工程师

  • 能够独立完成后端软件开发,深入理解Python开发后端的精髓

  • 能够独立完成前端软件开发,并和后端结合,熟练掌握使用Python进行全站Web开发的技巧

  • ④Python多领域开发

  • 能够使用Python熟练编写爬虫软件

  • 能够熟练使用Python库进行数据分析

  • 招聘网站Python招聘职位数据爬取分析

  • 掌握使用Python开源人工智能框架进行人工智能软件开发、语音识别、人脸识别

  • 掌握基本设计模式、常用算法

  • 掌握软件工程、项目管理、项目文档、软件测试调优的基本方法

互联网行业目前还是最热门的行业之一,学习IT技能之后足够优秀是有机会进入腾讯、阿里、网易等互联网大厂高薪就业的,发展前景非常好,普通人也可以学习。

想要系统学习,你可以考察对比一下开设有相关专业的热门学校,好的学校拥有根据当下企业需求自主研发课程的能力,中博软件学院、南京课工场、南京北大青鸟等开设python专业的学校都是不错的,建议实地考察对比一下。

祝你学有所成,望采纳。

Ⅶ python中多线程调用全局变量,值不是修改后的值

多线程读取全局变量需要引用线程锁,否则多个线程同时读取同一个全局变量会出现和预期不一样的值

Ⅷ python multiprocessing问题,为什么输出结果和预期不一样

众所周知,由于python(Cpython)的全局锁(GIL)问题存在,导致Thread也就是线程的并行并不可实现。 multiprocessing 模块采用多进程而不是多线程的方式实现并行,解决了GIL的问题,一定程度上使状况得到了缓解。

然而,Multiprocess本身依然有一些功能上的瓶颈。其中一个重要的是:进程之间不能共享内存(线程间则可以共享内存)。这意味着在进程间交换数据的时候,需要把数据打包、传递,解包。在python的语境下就是:

"pickle from main process to the subprocess;

depickle from subprocess to an object in memory;

pickle and return to the main process;

depickle from main process and return to memory"

(具体详见这个问题下的吐槽)

因此, 在需要在进程间共享巨大的数据包的时候,多进程的表现还不如单进程。

除此之外,当需要运行的程序本身不是计算密集型而是是IO密集型,多进程所增加的读写会抵消掉运算速度的增益;如果程序复杂度根本不需要用并行来解决,那么建立进程(池)的时间很可能比运行程序本身还要慢;另外,在进程池 multiprocessing.Pool(n) 的 n 的选择上,如果选择了多于当前CPU的核心数目的数字( multiprocessing.cpu_count() ),那么在进程之间切换的功夫会大大拉低效率。

建立对线程和进程关系的直观印象,可参考这篇文章。

快速而完整地了解python的全局锁(GIL)问题,参考这篇不错的博客。

为了解 multiprocess 的使用,我做了一些测试,测试环境是4核的Macbook Air。如下:

from multiprocessing import Process, Manager, Pool

1 def f(l):
2 l.reverse()
3
return
4
5 def main():
6
l1 = [random.randrange(0, 100000, 1) for i in range(0, 100000)]
7
l2 = [random.randrange(0, 100000, 1) for i in range(0, 100000)]
8
l3 = [random.randrange(0, 100000, 1) for i in range(0, 100000)]
9
l4 = [random.randrange(0, 100000, 1) for i in range(0, 100000)]
10
l5 = [random.randrange(0, 100000, 1) for i in range(0, 100000)]
11
l6 = [random.randrange(0, 100000, 1) for i in range(0, 100000)]
12
l7 = [random.randrange(0, 100000, 1) for i in range(0, 100000)]
13
s = time.time()
14
for l in [l1, l2, l3, l4, l5, l6, l7]:
15
f(l)
16
print "%s seconds" % (time.time() - s)
17
s = time.time()
18 map(f, [l1, l2, l3, l4, l5, l6, l7])
19
print "%s seconds" % (time.time() - s)
20
p = Pool(4)
21
s = time.time()
22 p.map(f, [l1, l2, l3, l4, l5, l6, l7])
23
print "%s seconds" % (time.time() - s)
24
return

也就是分别测试 f() 对 l1, l2, l3, l4, l5, l6, l7 7个列表的操作时间。先是循环的依次操作,再是python中非常好用的 map() 函数,最后是 multiprocessing 的进程池 multiprocessing.Pool.map() ——进程池中建立了4个 worker process , 也就是说,接下来的任务会被随机地分配给4个进程来完成。

每次操作之前都重新计时,得到了这样的结果:

>>> main()
0.00250101089478 seconds
0.000663995742798 seconds
0.907639980316 seconds

多进程出奇得慢。而 map() 相对于循环操作有很大的效率提升。

Ⅸ Python课程内容都学习什么啊

贺圣军Python轻松入门到项目实战(经典完整版)(超清视频)网络网盘

链接: https://pan..com/s/1C9k1o65FuQKNe68L3xEx3w

提取码: ja8v 复制这段内容后打开网络网盘手机App,操作更方便哦

若资源有问题欢迎追问~

Ⅹ python中map函数的使用

map() 会根据提供的函数对指定序列做映射。
第一个参数 function 以参数序列中的每一个元素调用 function 函数,返回包含每次 function 函数返回值的新列表。 (10)pythonmap与多线程扩展阅读
map() 函数语法:

map(function, iterable, ...);

参数:

function -- 函数;

iterable -- 一个或多个序列

热点内容
群辉存储服务器 发布:2025-01-11 00:50:19 浏览:428
如何用js脚本 发布:2025-01-11 00:47:32 浏览:887
日志和数据库 发布:2025-01-11 00:47:24 浏览:126
windows配置ftp 发布:2025-01-11 00:35:02 浏览:657
des算法代码c 发布:2025-01-11 00:33:42 浏览:806
共享文件夹设置密码无法访问 发布:2025-01-11 00:32:49 浏览:478
槽钢算法 发布:2025-01-11 00:26:21 浏览:885
linux命令包 发布:2025-01-10 23:54:26 浏览:33
python轮廓 发布:2025-01-10 23:49:23 浏览:179
思科配置线怎么选 发布:2025-01-10 23:48:44 浏览:705