分布式python编程
A. 如何用python写一个分布式爬虫
本文将会以PC端微博进行讲解,因为移动端微博数据不如PC短全面,而且抓取和解析难度都会小一些。文章比较长,由于篇幅所限,文章并没有列出所有代码,只是讲了大致流程和思路。
要抓微博数据,第一步便是模拟登陆,因为很多信息(比如用户信息,用户主页微博数据翻页等各种翻页)都需要在登录状态下才能查看。关于模拟登陆进阶,我写过两篇文章,一篇是模拟登陆微博的,是从小白的角度写的。另外一篇是模拟登陆网络云的,是从有一定经验的熟手的角度写的。读了这两篇文章,并且根据我写的过程自己动手实现过的同学,应该对于模拟登陆PC端微博是没有太大难度的。那两篇文章没有讲如何处理验证码,这里我简单说一下,做爬虫的同学不要老想着用什么机器学习的方法去识别复杂验证码,真的难度非常大,这应该也不是一个爬虫工程师的工作重点,当然这只是我的个人建议。工程化的项目,我还是建议大家通过打码平台来解决验证码的问题。我在分布式微博爬虫中就是直接调用打码平台的接口来做的大规模微博账号的模拟登陆,效果还不错,而且打码成本很低。
说完模拟登陆(具体请参见我写的那两篇文章,篇幅所限,我就不过来了),我们现在正式进入微博的数据抓取。这里我会以微博用户信息抓取为例来进行分析和讲解。
关于用户信息抓取,可能我们有两个目的。一个是我们只想抓一些指定用户,另外一个是我们想尽可能多的抓取更多数量的用户的信息。我的目的假定是第二种。那么我们该以什么样的策略来抓取,才能获得尽可能多的用户信息呢?如果我们初始用户选择有误,选了一些不活跃的用户,很可能会形成一个环,这样就抓不了太多的数据。这里有一个很简单的思路:我们把一些大V拿来做为种子用户,我们先抓他们的个人信息,然后再抓大V所关注的用户和粉丝,大V关注的用户肯定也是类似大V的用户,这样的话,就不容易形成环了。
策略我们都清楚了。就该是分析和编码了。
我们先来分析如何构造用户信息的URL。这里我以微博名为一起神吐槽的博主为例进行分析。做爬虫的话,一个很重要的意识就是爬虫能抓的数据都是人能看到的数据,反过来,人能在浏览器上看到的数据,爬虫几乎都能抓。这里用的是几乎,因为有的数据抓取难度特别。我们首先需要以正常人的流程看看怎么获取到用户的信息。我们先进入该博主的主页,如下图
根据唯一性判断
我们在页面源码中搜索,只发现一个script中有该字符串,那么就是那段script是页面相关信息。我们可以通过正则表达式把该script提取出来,然后把其中的html也提取出来,再保存到本地,看看信息是否全面。这里我就不截图了。感觉还有很多要写的,不然篇幅太长了。
另外,对于具体页面的解析,我也不做太多的介绍了。太细的东西还是建议读读源码。我只讲一下,我觉得的一种处理异常的比较优雅的方式。微博爬虫的话,主要是页面样式太多,如果你打算包含所有不同的用户的模版,那么我觉得几乎不可能,不同用户模版,用到的解析规则就不一样。那么出现解析异常如何处理?尤其是你没有catch到的异常。很可能因为这个问题,程序就崩掉。其实对于Python这门语言来说,我们可以通过装饰器来捕捉我们没有考虑到的异常,比如我这个装饰器
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def parse_decorator(return_type):
"""
:param return_type: 用于捕捉页面解析的异常, 0表示返回数字0, 1表示返回空字符串, 2表示返回[],3表示返回False, 4表示返回{}, 5返回None
:return: 0,'',[],False,{},None
"""
def page_parse(func):
@wraps(func)
def handle_error(*keys):
try:
return func(*keys)
except Exception as e:
parser.error(e)
if return_type == 5:
return None
elif return_type == 4:
return {}
elif return_type == 3:
return False
elif return_type == 2:
return []
elif return_type == 1:
return ''
else:
return 0
return handle_error
return page_parse
上面的代码就是处理解析页面发生异常的情况,我们只能在数据的准确性、全面性和程序的健壮性之间做一些取舍。用装饰器的话,程序中不用写太多的try语句,代码重复率也会减少很多。
页面的解析由于篇幅所限,我就讲到这里了。没有涉及太具体的解析,其中一个还有一个比较难的点,就是数据的全面性,读者可以去多观察几个微博用户的个人信息,就会发现有的个人信息,有的用户有填写,有的并没有。解析的时候要考虑完的话,建议从自己的微博的个人信息入手,看到底有哪些可以填。这样可以保证几乎不会漏掉一些重要的信息。
最后,我再切合本文的标题,讲如何搭建一个分布式的微博爬虫。开发过程中,我们可以先就做单机单线程的爬虫,然后再改成使用celery的方式。这里这样做是为了方便开发和测试,因为你单机搭起来并且跑得通了,那么分布式的话,就很容易改了,因为celery的API使用本来就很简洁。
我们抓取的是用户信息和他的关注和粉丝uid。用户信息的话,我们一个请求大概能抓取一个用户的信息,而粉丝和关注我们一个请求可以抓取18个左右(因为这个抓的是列表),显然可以发现用户信息应该多占一些请求的资源。这时候就该介绍理论篇没有介绍的关于celery的一个高级特性了,它叫做任务路由。直白点说,它可以规定哪个分布式节点能做哪些任务,不能做哪些任务。它的存在可以让资源分配更加合理,分布式微博爬虫项目初期,就没有使用任务路由,然后抓了十多万条关注和分析,结果发现用户信息抓几万条,这就是资源分配得不合理。那么如何进行任务路由呢?
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# coding:utf-8
import os
from datetime import timedelta
from celery import Celery
from kombu import Exchange, Queue
from config.conf import get_broker_or_backend
from celery import platforms
# 允许celery以root身份启动
platforms.C_FORCE_ROOT = True
worker_log_path = os.path.join(os.path.dirname(os.path.dirname(__file__))+'/logs', 'celery.log')
beat_log_path = os.path.join(os.path.dirname(os.path.dirname(__file__))+'/logs', 'beat.log')
tasks = ['tasks.login', 'tasks.user']
# include的作用就是注册服务化函数
app = Celery('weibo_task', include=tasks, broker=get_broker_or_backend(1), backend=get_broker_or_backend(2))
app.conf.update(
CELERY_TIMEZONE='Asia/Shanghai',
CELERY_ENABLE_UTC=True,
CELERYD_LOG_FILE=worker_log_path,
CELERYBEAT_LOG_FILE=beat_log_path,
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
CELERY_QUEUES=(
Queue('login_queue', exchange=Exchange('login', type='direct'), routing_key='for_login'),
Queue('user_crawler', exchange=Exchange('user_info', type='direct'), routing_key='for_user_info'),
Queue('fans_followers', exchange=Exchange('fans_followers', type='direct'), routing_key='for_fans_followers'),
)
上述代码我指定了有login_queue、user_crawler、fans_followers三个任务队列。它们分别的作用是登录、用户信息抓取、粉丝和关注抓取。现在假设我有三台爬虫服务器A、B和C。我想让我所有的账号登录任务分散到三台服务器、让用户抓取在A和B上执行,让粉丝和关注抓取在C上执行,那么启动A、B、C三个服务器的celery worker的命令就分别是
Python
1
2
3
celery -A tasks.workers -Q login_queue,user_crawler worker -l info -c 1 # A服务器和B服务器启动worker的命令,它们只会执行登录和用户信息抓取任务
celery -A tasks.workers -Q login_queue,fans_followers worker -l info -c 1 # C服务器启动worker的命令,它只会执行登录、粉丝和关注抓取任务
然后我们通过命令行或者代码(如下)就能发送所有任务给各个节点执行了
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
4
B. Python分布式进程中你会遇到的坑
写在前面
小惊大怪
你是不是在用Python3或者在windows系统上编程?最重要的是你对进程和线程不是很清楚?那么恭喜你,在python分布式进程中,会有坑等着你去挖。。。(hahahaha,此处允许我吓唬一下你)开玩笑的啦,不过,如果你知道序列中不支持匿名函数,那这个坑就和你say byebye了。好了话不多数,直接进入正题。
分布式进程
正如大家所知道的Process比Thread更稳定,而且Process可以分布到多台机器上,而Thread最多只能分布到同一陆坦咐台机器的多个CPU上。Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。
代码记录
举个例子
如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上,这应该怎么用分布式进程来实现呢?你已经知道了原有的Queue可以继续使用,而且通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程来访问Queue了。好,那我们就这么干!
写个task_master.py
我们先看服务进程。服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务。
请注意,当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。然后,在另一台机器上启动任务进程(本机上启动也可以)
写个task_worker.py
任务进程要通过网络连接到服务进程,所以要指定服务进程的IP。
运行结果
现在,可信没以试试分布式进程的工作效果了。先启动task_master.py服务进程:
task_master.py进程发送完任务后,开始等待result队列的结果。现在启动task_worker.py进程:
看到没,结果都出错了,我们好好分析一下到底哪出错了。。。
错误分析
在task_master.py的报错提示中,我们知道它说lambda错误,这是因为序列化不支持匿名函数,所以我们得修改代码,重新对queue用QueueManager进行封装放到网络中。
其中task_queue和result_queue是两个队列,分别存放任务和结果。它们用来进行进程间通信,交换对象。
因为是分布式的环境,放入queue中的数据需要等待Workers机器运算处理后再进行读取,这样就需要对queue用QueueManager进行封装放到网络中,这是通过上面的2行代码来实现的。我们给return_task_queue的网络调用接口取了一个名早纯get_task_queue,而return_result_queue的名字是get_result_queue,方便区分对哪个queue进行操作。task.put(n)即是对task_queue进行写入数据,相当于分配任务。而result.get()即是等待workers机器处理后返回的结果。
值得注意 在windows系统中你必须要写IP地址,而其他操作系统比如linux操作系统则就不要了。
修改后的代码
在task_master.py中修改如下:
在task_worker.py中修改如下:
先运行task_master.py,然后再运行task_worker.py
(1)task_master.py运行结果如下
(2)task_worker.py运行结果如下
知识补充
这个简单的Master/Worker模型有什么用?其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,比如把计算n*n的代码换成发送邮件,就实现了邮件队列的异步发送。
Queue对象存储在哪?注意到task_worker.py中根本没有创建Queue的代码,所以,Queue对象存储在task_master.py进程中:
而Queue之所以能通过网络访问,就是通过QueueManager实现的。由于QueueManager管理的不止一个Queue,所以,要给每个Queue的网络调用接口起个名字,比如get_task_queue。task_worker这里的QueueManager注册的名字必须和task_manager中的一样。对比上面的例子,可以看出Queue对象从另一个进程通过网络传递了过来。只不过这里的传递和网络通信由QueueManager完成。
authkey有什么用?这是为了保证两台机器正常通信,不被其他机器恶意干扰。如果task_worker.py的authkey和task_master.py的authkey不一致,肯定连接不上。
C. 如何用 Python 构建一个简单的分布式系统
分布式爬虫概览
何谓分布式爬虫?
通俗的讲,分布式爬虫就是多台机器多个
spider
对多个
url
的同时处理问题,分布式的方式可以极大提高程序的抓取效率。
构建分布式爬虫通畅需要考虑的问题
(1)如何能保证多台机器同时抓取同一个URL?
(2)如果某个节点挂掉,会不会影响其它节点,任务如何继续?
(3)既然是分布式,如何保证架构的可伸缩性和可扩展性?不同优先级的抓取任务如何进行资源分配和调度?
基于上述问题,我选择使用celery作为分布式任务调度工具,是分布式爬虫中任务和资源调度的核心模块。它会把所有任务都通过消息队列发送给各个分布式节点进行执行,所以可以很好的保证url不会被重复抓取;它在检测到worker挂掉的情况下,会尝试向其他的worker重新发送这个任务信息,这样第二个问题也可以得到解决;celery自带任务路由,我们可以根据实际情况在不同的节点上运行不同的抓取任务(在实战篇我会讲到)。本文主要就是带大家了解一下celery的方方面面(有celery相关经验的同学和大牛可以直接跳过了)
Celery知识储备
celery基础讲解
按celery官网的介绍来说
Celery
是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。它是一个专注于实时处理的任务队列,同时也支持任务调度。
下面几个关于celery的核心知识点
broker:翻译过来叫做中间人。它是一个消息传输的中间件,可以理解为一个邮箱。每当应用程序调用celery的异步任务的时候,会向broker传递消息,而后celery的worker将会取到消息,执行相应程序。这其实就是消费者和生产者之间的桥梁。
backend:
通常程序发送的消息,发完就完了,可能都不知道对方时候接受了。为此,celery实现了一个backend,用于存储这些消息以及celery执行的一些消息和结果。
worker:
Celery类的实例,作用就是执行各种任务。注意在celery3.1.25后windows是不支持celery
worker的!
procer:
发送任务,将其传递给broker
beat:
celery实现的定时任务。可以将其理解为一个procer,因为它也是通过网络调用定时将任务发送给worker执行。注意在windows上celery是不支持定时任务的!
下面是关于celery的架构示意图,结合上面文字的话应该会更好理解
由于celery只是任务队列,而不是真正意义上的消息队列,它自身不具有存储数据的功能,所以broker和backend需要通过第三方工具来存储信息,celery官方推荐的是
RabbitMQ和Redis,另外mongodb等也可以作为broker或者backend,可能不会很稳定,我们这里选择Redis作为broker兼backend。
实际例子
先安装celery
pip
install
celery
我们以官网给出的例子来做说明,并对其进行扩展。首先在项目根目录下,这里我新建一个项目叫做celerystudy,然后切换到该项目目录下,新建文件tasks.py,然后在其中输入下面代码
这里我详细讲一下代码:我们先通过app=Celery()来实例化一个celery对象,在这个过程中,我们指定了它的broker,是redis的db
2,也指定了它的backend,是redis的db3,
broker和backend的连接形式大概是这样
redis://:password@hostname:port/db_number
然后定义了一个add函数,重点是@app.task,它的作用在我看来就是将add()
注册为一个类似服务的东西,本来只能通过本地调用的函数被它装饰后,就可以通过网络来调用。这个tasks.py中的app就是一个worker。它可以有很多任务,比如这里的任务函数add。我们再通过在命令行切换到项目根目录,执行
celery
-A
tasks
worker
-l
info
启动成功后就是下图所示的样子
这里我说一下各个参数的意思,-A指定的是app(即Celery实例)所在的文件模块,我们的app是放在tasks.py中,所以这里是
tasks;worker表示当前以worker的方式运行,难道还有别的方式?对的,比如运行定时任务就不用指定worker这个关键字;
-l
info表示该worker节点的日志等级是info,更多关于启动worker的参数(比如-c、-Q等常用的)请使用
celery
worker
--help
进行查看
将worker启动起来后,我们就可以通过网络来调用add函数了。我们在后面的分布式爬虫构建中也是采用这种方式分发和消费url的。在命令行先切换到项目根目录,然后打开python交互端
from
tasks
import
addrs
=
add.delay(2,
2)
这里的add.delay就是通过网络调用将任务发送给add所在的worker执行,这个时候我们可以在worker的界面看到接收的任务和计算的结果。
这里是异步调用,如果我们需要返回的结果,那么要等rs的ready状态true才行。这里add看不出效果,不过试想一下,如果我们是调用的比较占时间的io任务,那么异步任务就比较有价值了
上面讲的是从Python交互终端中调用add函数,如果我们要从另外一个py文件调用呢?除了通过import然后add.delay()这种方式,我们还可以通过send_task()这种方式,我们在项目根目录另外新建一个py文件叫做
excute_tasks.py,在其中写下如下的代码
from
tasks
import
addif
__name__
==
'__main__':
add.delay(5,
10)
这时候可以在celery的worker界面看到执行的结果
此外,我们还可以通过send_task()来调用,将excute_tasks.py改成这样
这种方式也是可以的。send_task()还可能接收到为注册(即通过@app.task装饰)的任务,这个时候worker会忽略这个消息
定时任务
上面部分讲了怎么启动worker和调用worker的相关函数,这里再讲一下celery的定时任务。
爬虫由于其特殊性,可能需要定时做增量抓取,也可能需要定时做模拟登陆,以防止cookie过期,而celery恰恰就实现了定时任务的功能。在上述基础上,我们将tasks.py文件改成如下内容
然后先通过ctrl+c停掉前一个worker,因为我们代码改了,需要重启worker才会生效。我们再次以celery
-A
tasks
worker
-l
info这个命令开启worker。
这个时候我们只是开启了worker,如果要让worker执行任务,那么还需要通过beat给它定时发送,我们再开一个命令行,切换到项目根目录,通过
这样就表示定时任务已经开始运行了。
眼尖的同学可能看到我这里celery的版本是3.1.25,这是因为celery支持的windows最高版本是3.1.25。由于我的分布式微博爬虫的worker也同时部署在了windows上,所以我选择了使用
3.1.25。如果全是linux系统,建议使用celery4。
此外,还有一点需要注意,在celery4后,定时任务(通过schele调度的会这样,通过crontab调度的会马上执行)会在当前时间再过定时间隔执行第一次任务,比如我这里设置的是60秒的间隔,那么第一次执行add会在我们通过celery
beat
-A
tasks
-l
info启动定时任务后60秒才执行;celery3.1.25则会马上执行该任务
D. Python主要内容学的是什么
第一步:Python开发基础
Python全栈开发与人工智能之Python开发基础知识学习内容包括:Python基础语法、数据类型、字符编码、文件操作、函数、装饰器、迭代器、内置方法、常用模块等。
第二步:Python高级编程和数据库开发
Python全栈开发与人工智能之Python高级编程和数据库开发知识学习内容包括:面向对象开发、Socket网络编程、线程、进程、队列、IO多路模型、Mysql数据库开发等。
第三步:前端开发
Python全栈开发与人工智能之前端开发知识学习内容包括:Html、CSS、JavaScript开发、Jquery&bootstrap开发、前端框架VUE开发等。
第十步:高并发语言GO开发
Python全栈开发与人工智能之高并发语言GO开发学习内容包括:GO语言基础、数据类型与文件IO操作、函数和面向对象、并发编程等。
E. 哪些分布式文件系统是由Python编写的呢
我知道分布式文件系统完全用Python 写的只有openstack 的swift。
其他还有一些不知名的分布式文件系统用python 写的如:
NCFS(基于多个云存储的分布式文件系统)
一般考虑性能都不会采用python 作为分布式文件系统的开发语言
F. python面试之分布式
主要用于分散压力,所以分布式的服务都是部署在不同的服务器上的,再将服务做集群
根据“分层”的思想进行拆分。
例如,可以将一个项目根据“三层架构” 拆分
然后再分开部署 :
根据业务进行拆分。
例如,可以根据业务逻辑,将“电商项目”拆分成 “订单项目”、“用户项目”和“秒杀项目” 。显然这三个拆分后的项目,仍然可以作为独立的项目使用。像这种拆分的方法,就成为垂直拆分
主要用于分散能力,主要是将服务的颗粒度尽量细化,且自成一脉,压力这块并不是其关注的点,所以多个微服务是可以部署在同一台服务器上的
微服务可以理解为一种 非常细粒度的垂直拆分 。例如,以上“订单项目”本来就是垂直拆分后的子项目,但实际上“订单项目”还能进一步闹蔽拆分为“购物项目”、“结算项目”和“售后项目”,如图
现在看图中的“订单项目”,它完全可以作为一个分布式项目的组成元素,但就不适合作为微服务的组成元素了(因为它还能再拆,而微服务应该是不能再拆的“微小”服务,类似于“原子性”)
分布式服务需要提供给别的分布式服务去调用,单独拆出来 未必外部可用
微服务自成一脉,可以系统内部调用,也可以单独提供服务
为什么需要用分布式锁,见下图
变量A存在三个服务器内存中(这个变量A主要体现是在一个类中的一个成员变量,是一个有状态的对象),如果不加任何控制的话,变量A同时都会在分配一块内存,三个请求发过来同时对这个变量操作,显然结果是不对的!即使不是同时发过来,三个请求分别操作三个不同内存区域的数据,变量A之间不存在共享,也不具有可见性,处理的结果也是不对的。
分布式锁应该具备哪些条件:
1、在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;
2、高可用的获取锁与释液姿州放锁;
3、高性能的获取锁与释放锁;
4、具备可重入特性;
5、具备锁失效机制,防止死锁;
6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败
Redis性能高
命令简单,实现方便
使用setnx加锁,key为锁名,value随意不重复就行(一般用uuid)
给锁添加expire时间,超过该时间redis过期(即自动释放锁)
设置获取锁的超时时间,若超过时间,则放弃获取锁
通过锁名获取锁值
比较锁值和当前uuid是否一致,一致则释放锁(通过delete命令删除redis键值对)
2PC:two phase commit protocol,二阶段提交协议,是一种强一致性设计。
同步阻塞(导致长久的资源锁定) ,只有第一阶段全部正常完成(返回失败,回字返回超时都会返回 “准备失败” ),才会进入第二阶段
因为协调者可册码能会在任意一个时间点(发送准备命令之前,发送准备命令之后,发送回滚事务命令之前,发送回滚事务命令之后,发送提交事务命令之前,发送提交事务命令之后)故障,导致资源阻塞。
T:try,指的是预留,即资源的预留和锁定,注意是预留
C:confirm,指的是确认操作,这一步其实就是真正的执行了
C:cancel,指的是撤销操作,可以理解为把预留阶段的动作撤销了
从思想上看和 2PC 差不多,都是先试探性的执行,如果都可以那就真正的执行,如果不行就回滚。
适用于对实时性要求没那么高的业务场景,如:短信通知
G. Python中的多进程与多线程/分布式该如何使用
Python提供了非常好用的多进程包multiprocessing,你只需要定义一个函数,Python会替你完成其他所有事情。
借助这个包,可以轻松完成从单进程到并发执行的转换。
1、新建单一进程
如果我们新建少量进程,可以如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
p = multiprocessing.Process(target=func, args=("hello", ))
p.start()
p.join()
print "Sub-process done."12345678910111213
2、使用进程池
是的,你没有看错,不是线程池。它可以让你跑满多核CPU,而且使用方法非常简单。
注意要用apply_async,如果落下async,就变成阻塞版本了。
processes=4是最多并发进程数量。
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
for i in xrange(10):
msg = "hello %d" %(i)
pool.apply_async(func, (msg, ))
pool.close()
pool.join()
print "Sub-process(es) done."12345678910111213141516
3、使用Pool,并需要关注结果
更多的时候,我们不仅需要多进程执行,还需要关注每个进程的执行结果,如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
return "done " + msg
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
result = []
for i in xrange(10):
msg = "hello %d" %(i)
result.append(pool.apply_async(func, (msg, )))
pool.close()
pool.join()
for res in result:
print res.get()
print "Sub-process(es) done."
2014.12.25更新
根据网友评论中的反馈,在Windows下运行有可能崩溃(开启了一大堆新窗口、进程),可以通过如下调用来解决:
multiprocessing.freeze_support()1
附录(自己的脚本):
#!/usr/bin/python
import threading
import subprocess
import datetime
import multiprocessing
def dd_test(round, th):
test_file_arg = 'of=/zbkc/test_mds_crash/1m_%s_%s_{}' %(round, th)
command = "seq 100 | xargs -i dd if=/dev/zero %s bs=1M count=1" %test_file_arg
print command
subprocess.call(command,shell=True,stdout=open('/dev/null','w'),stderr=subprocess.STDOUT)
def mds_stat(round):
p = subprocess.Popen("zbkc mds stat", shell = True, stdout = subprocess.PIPE)
out = p.stdout.readlines()
if out[0].find('active') != -1:
command = "echo '0205pm %s round mds status OK, %s' >> /round_record" %(round, datetime.datetime.now())
command_2 = "time (ls /zbkc/test_mds_crash/) 2>>/round_record"
command_3 = "ls /zbkc/test_mds_crash | wc -l >> /round_record"
subprocess.call(command,shell=True)
subprocess.call(command_2,shell=True)
subprocess.call(command_3,shell=True)
return 1
else:
command = "echo '0205 %s round mds status abnormal, %s, %s' >> /round_record" %(round, out[0], datetime.datetime.now())
subprocess.call(command,shell=True)
return 0
#threads = []
for round in range(1, 1600):
pool = multiprocessing.Pool(processes = 10) #使用进程池
for th in range(10):
# th_name = "thread-" + str(th)
# threads.append(th_name) #添加线程到线程列表
# threading.Thread(target = dd_test, args = (round, th), name = th_name).start() #创建多线程任务
pool.apply_async(dd_test, (round, th))
pool.close()
pool.join()
#等待线程完成
# for t in threads:
# t.join()
if mds_stat(round) == 0:
subprocess.call("zbkc -s",shell=True)
break
H. 如何用 python 构建一个简单的分布式系统
从GitHub中整理出的15个最受欢迎的Python开源框架。这些框架包括事件I/O,OLAP,Web开发,高性能网络通信,测试,爬虫等。
Django: Python Web应用开发框架
Django 应该是最出名的Python框架,GAE甚至Erlang都有框架受它影响。Django是走大而全的方向,它最出名的是其全自动化的管理后台:只需要使用起ORM,做简单的对象定义,它就能自动生成数据库结构、以及全功能的管理后台。
Diesel:基于Greenlet的事件I/O框架
Diesel提供一个整洁的API来编写网络客户端和服务器。支持TCP和UDP。
Flask:一个用Python编写的轻量级Web应用框架
Flask是一个使用Python编写的轻量级Web应用框架。基于Werkzeug WSGI工具箱和Jinja2
模板引擎。Flask也被称为“microframework”,因为它使用简单的核心,用extension增加其他功能。Flask没有默认使用的数
据库、窗体验证工具。
Cubes:轻量级Python OLAP框架
Cubes是一个轻量级Python框架,包含OLAP、多维数据分析和浏览聚合数据(aggregated data)等工具。
Kartograph.py:创造矢量地图的轻量级Python框架
Kartograph是一个Python库,用来为ESRI生成SVG地图。Kartograph.py目前仍处于beta阶段,你可以在virtualenv环境下来测试。
Pulsar:Python的事件驱动并发框架
Pulsar是一个事件驱动的并发框架,有了pulsar,你可以写出在不同进程或线程中运行一个或多个活动的异步服务器。
Web2py:全栈式Web框架
Web2py是一个为Python语言提供的全功能Web应用框架,旨在敏捷快速的开发Web应用,具有快速、安全以及可移植的数据库驱动的应用,兼容Google App Engine。
Falcon:构建云API和网络应用后端的高性能Python框架
Falcon是一个构建云API的高性能Python框架,它鼓励使用REST架构风格,尽可能以最少的力气做最多的事情。
Dpark:Python版的Spark
DPark是Spark的Python克隆,是一个Python实现的分布式计算框架,可以非常方便地实现大规模数据处理和迭代计算。DPark由豆瓣实现,目前豆瓣内部的绝大多数数据分析都使用DPark完成,正日趋完善。
Buildbot:基于Python的持续集成测试框架
Buildbot是一个开源框架,可以自动化软件构建、测试和发布等过程。每当代码有改变,服务器要求不同平台上的客户端立即进行代码构建和测试,收集并报告不同平台的构建和测试结果。
Zerorpc:基于ZeroMQ的高性能分布式RPC框架
Zerorpc是一个基于ZeroMQ和MessagePack开发的远程过程调用协议(RPC)实现。和 Zerorpc 一起使用的 Service API 被称为 zeroservice。Zerorpc 可以通过编程或命令行方式调用。
Bottle: 微型Python Web框架
Bottle是一个简单高效的遵循WSGI的微型python Web框架。说微型,是因为它只有一个文件,除Python标准库外,它不依赖于任何第三方模块。
Tornado:异步非阻塞IO的Python Web框架
Tornado的全称是Torado Web Server,从名字上看就可知道它可以用作Web服务器,但同时它也是一个Python Web的开发框架。最初是在FriendFeed公司的网站上使用,FaceBook收购了之后便开源了出来。
webpy: 轻量级的Python Web框架
webpy的设计理念力求精简(Keep it simple and powerful),源码很简短,只提供一个框架所必须的东西,不依赖大量的第三方模块,它没有URL路由、没有模板也没有数据库的访问。
Scrapy:Python的爬虫框架
Scrapy是一个使用Python编写的,轻量级的,简单轻巧,并且使用起来非常的方便。
I. 电脑可以实现分布式爬虫(python编写)吗
一般是用redis做消息列队,将所有要抓取的庆迅url放到redis里面,然后在分布式的各个誉袭此机器上面禅亏读取redis里面的url实行抓取
J. python分布式框架有哪些
Dask是Python的分布态伏斗式计算框架,它支持分布式的DataFrame,也就是pandas的DataFrame,二者接口完美兼容,但Dask是分布式计算的框架,可以支持内存无法装载的数据,进行计算,它也支持对一般的python程序进行分布式帆磨计算。是非常优秀的Python框架。本文主要介绍Dask的几种不同的调度器的使用。
Dask支持多种调度器,从单线程、多线程、多进程到本地分布式和集群分布式,各种调度器在不同情况下有不同的作用,本文来源于Dask官方文档的翻译,主要向大家介绍这五种调度厅闹器的使用情景和方式。最后提供了如何在不同情境下设置Dask调度器的方法。