當前位置:首頁 » 編程語言 » python並發

python並發

發布時間: 2022-01-08 16:43:14

python並發和java並發的區別

使用tornado的前提是你的服務是IO密集型的,並且你得寫非同步api,也可以請參考我簽名中的框架,把tornado改造成eventloop+threadpool (GitHub - nikoloss/iceworld: tonado的multi-thread 多線程封裝)。我們公司的android ios wap後台全是這套框架在提供服務。目前已經切換到一個分布式http響應群組裡面了,此時tornado只是作為一個中繼的gateway存在:GitHub - nikoloss/cellnest: 分布式service

在沒有阻塞的情況下,python的性能肯定不如編譯型語言。這種全非同步的模型的優勢也無法體現出來,一旦有IO操作了,這種全非同步模型的第一步accpet新連接的操作並不會暫停,也就是只要有內容抵達,至少ioloop這個環節是可以照單全收的,接收之後協程處理,隨著並發量增長它的性能下降是平穩且平滑的。反觀線程模型,如果消費數據趕不上新連接新數據的生產,性能就會直線下降。

你的700qps差不多,你可以換3.1或者3.2的tornado試試,1100~1400應該可以跑出來。當然追求一個靜態文本的輸出性能是否有必要,我覺得實際情況比這種單純的壓測要復雜的多。

② 如何優雅的編寫Python並發程序

在Python中,由於歷史原因(GIL),使得Python中多線程的效果非常不理想.GIL使得任何時刻Python只能利用一個CPU核,並
且它的調度演算法簡單粗暴:多線程中,讓每個線程運行一段時間t,然後強行掛起該線程,繼而去運行其他線程,如此周而復始,直到所有線程結束.

這使得無法有效利用計算機系統中的"局部性",頻繁的線程切換也對緩存不是很友好,造成資源的浪費.

據說Python官方曾經實現了一個去除GIL的Python解釋器,但是其效果還不如有GIL的解釋器,遂放棄.後來Python官方推出了"利
用多進程替代多線程"的方案,在Python3中也有concurrent.futures這樣的包,讓我們的程序編寫可以做到"簡單和性能兼得".

多進程/多線程+Queue

一般來說,在Python中編寫並發程序的經驗是:計算密集型任務使用多進程,IO密集型任務使用多進程或者多線程.另外,因為涉及到資源共享,所
以需要同步鎖等一系列麻煩的步驟,代碼編寫不直觀.另外一種好的思路是利用多進程/多線程+Queue的方法,可以避免加鎖這樣麻煩低效的方式.

現在在Python2中利用Queue+多進程的方法來處理一個IO密集型任務.

假設現在需要下載多個網頁內容並進行解析,單進程的方式效率很低,所以使用多進程/多線程勢在必行.

③ python多進程,多線程分別是並行還是並發

並發和並行

你吃飯吃到一半,電話來了,你一直到吃完了以後才去接,這就說明你不支持並發也不支持並行。
你吃飯吃到一半,電話來了,你停了下來接了電話,接完後繼續吃飯,這說明你支持並發。
你吃飯吃到一半,電話來了,你一邊打電話一邊吃飯,這說明你支持並行。
並發的關鍵是你有處理多個任務的能力,不一定要同時。
並行的關鍵是你有同時處理多個任務的能力。
所以我認為它們最關鍵的點就是:是否是『同時』。
Python 中沒有真正的並行,只有並發
無論你的機器有多少個CPU, 同一時間只有一個Python解析器執行。這也和大部分解釋型語言一致, 都不支持並行。這應該是python設計的先天缺陷。
javascript也是相同的道理, javascript早起的版本只支持單任務,後來通過worker來支持並發。
Python中的多線程
先復習一下進程和線程的概念
所謂進程,簡單的說就是一段程序的動態執行過程,是系統進行資源分配和調度的一個基本單位。一個進程中又可以包含若干個獨立的執行流,我們將這些執行流稱為線程,線程是CPU調度和分配的基本單位。同一個進程的線程都有自己的專有寄存器,但內存等資源是共享的。
這里有一個更加形象的解釋, 出自阮一峰大神的傑作:
http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html
Python中的thread的使用
通過 thread.start_new_thread 方法
import thread
import time

# Define a function for the thread
def print_time( threadName, delay):
count = 0
while count < 5:
time.sleep(delay)
count += 1
print "%s: %s" % ( threadName, time.ctime(time.time()) )

# Create two threads as follows
try:
thread.start_new_thread( print_time, ("Thread-1", 2, ) )
thread.start_new_thread( print_time, ("Thread-2", 4, ) )
except:
print "Error: unable to start thread"

while 1:
pass

通過繼承thread
#!/usr/bin/python
import threading
import time
exitFlag = 0
class myThread (threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print "Starting " + self.name
print_time(self.name, self.counter, 5)
print "Exiting " + self.name

def print_time(threadName, delay, counter):
while counter:
if exitFlag:
threadName.exit()
time.sleep(delay)
print "%s: %s" % (threadName, time.ctime(time.time()))
counter -= 1

# Create new threads
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

# Start new Threads
thread1.start()
thread2.start()
print "Exiting Main Thread"

線程的同步
#!/usr/bin/python

import threading
import time

class myThread (threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print "Starting " + self.name
# Get lock to synchronize threads
threadLock.acquire()
print_time(self.name, self.counter, 3)
# Free lock to release next thread
threadLock.release()

def print_time(threadName, delay, counter):
while counter:
time.sleep(delay)
print "%s: %s" % (threadName, time.ctime(time.time()))
counter -= 1

threadLock = threading.Lock()
threads = []

# Create new threads
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

# Start new Threads
thread1.start()
thread2.start()

# Add threads to thread list
threads.append(thread1)
threads.append(thread2)

# Wait for all threads to complete
for t in threads:
t.join()
print "Exiting Main Thread"

利用multiprocessing多進程實現並行
進程的創建
Python 中有一套類似多線程API 的的類來進行多進程開發: multiprocessing
這里是一個來自官方文檔的例子:
from multiprocessing import Process
def f(name):
print 'hello', name

if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()

類似與線程,一可以通過繼承process類來實現:
from multiprocessing import Process
class Worker(Process):
def run(self):
print("in" + self.name)

if __name__ == '__main__':
jobs = []
for i in range(5):
p = Worker()
jobs.append(p)
p.start()
for j in jobs:
j.join()

進程的通信
Pipe()
pipe()函數返回一對由雙向通信的管道連接的對象,這兩個對象通過send, recv 方法實現 信息的傳遞
from multiprocessing import Process, Pipe

def f(conn):
conn.send([42, None, 'hello'])
conn.close()

if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print parent_conn.recv() # prints "[42, None, 'hello']"
p.join()

Quene
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])

if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print q.get() # prints "[42, None, 'hello']"
p.join()

進程間的同步
Python 中多進程中也有類似線程鎖的概念,使用方式幾乎一樣:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
print 'hello world', i
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()

進程間的共享內存
每個進程都有獨自的內存,是不能相互訪問的, 也行 python官方覺得通過進程通信的方式過於麻煩,提出了共享內存的概念,以下是官方給出的例子:
from multiprocessing import Process, Value, Array

def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]

if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))

p = Process(target=f, args=(num, arr))
p.start()
p.join()

print num.value
print arr[:]

總結
python通過多進程實現多並行,充分利用多處理器,彌補了語言層面不支持多並行的缺點。Python, Node.js等解釋型語言似乎都是通過這種方式來解決同一個時間,一個解釋器只能處理一段程序的問題, 十分巧妙。

④ python簡單的並發問題

  • #!/usr/bin/envpython#-*-coding:utf-8-*-#author:ChanghuaGongimporttime,threading#fromurllib.requestimportRequest,urlopenpy3#fromurllib.#URLreq=urllib2.Request('http://47.93.169.69:10080/pigeon-web/user/user

  • #!/usr/bin/env python

    # -*- coding:utf-8 -*-

    # author: Changhua Gong

    import time,threading

    # from urllib.request import Request, urlopen py3

    # from urllib.error import URLError py3

    import urllib2

    #URL

    req = urllib2.Request('http://47.93.169.69:10080/pigeon-web/user/userExtraInfo?userId=1')

    #

    rule = {0:500,1:30}

    '''

    Rule規則:0:50,第一次運行不睡眠即為0,直接並發50次;1:20,第二秒,相當於睡眠1秒,然後並發20次,

    如第三秒需並發500次,則rule = {0:50,1:20,1:500}

    '''

    #Open url

    def geturl():

    time_b = time.time()

    try:

    response = urllib2.urlopen(req)

    print(response.read().decode("utf-8")) # 列印輸出內容

    except urllib2.URLError as e:

    if hasattr(e, 'reason'):

    print('We failed to reach a server.')

    print('Reason: ', e.reason)

    elif hasattr(e, 'code'):

    print('The server couldn/'t fulfill the request.')

    print('Error code: ', e.code)

    time_e = time.time()

    print("Thread %s runned for %ss" % (threading.current_thread().name, (time_e - time_b))) #線程訪問時效

    if __name__=='__main__':

    for k in rule:

    time.sleep(k)

    for i in range(rule[k]):

    t = threading.Thread(target=geturl)

    t.start()

⑤ Python有哪些並發方案

協程。
python的爬蟲scrapy就是採用了協程。
具體看《流程的python》第十六章。

⑥ 如何使用Python實現並發編程

多線程幾乎是每一個程序猿在使用每一種語言時都會首先想到用於解決並發的工具(JS程序員請迴避),使用多線程可以有效的利用CPU資源(Python例外)。然而多線程所帶來的程序的復雜度也不可避免,尤其是對競爭資源的同步問題。

然而在python中由於使用了全局解釋鎖(GIL)的原因,代碼並不能同時在多核上並發的運行,也就是說,Python的多線程不能並發,很多人會發現使用多線程來改進自己的Python代碼後,程序的運行效率卻下降了,這是多麼蛋疼的一件事呀!如果想了解更多細節,推薦閱讀這篇文章。實際上使用多線程的編程模型是很困難的,程序員很容易犯錯,這並不是程序員的錯誤,因為並行思維是反人類的,我們大多數人的思維是串列(精神分裂不討論),而且馮諾依曼設計的計算機架構也是以順序執行為基礎的。所以如果你總是不能把你的多線程程序搞定,恭喜你,你是個思維正常的程序猿:)

Python提供兩組線程的介面,一組是thread模塊,提供基礎的,低等級(Low Level)介面,使用Function作為線程的運行體。還有一組是threading模塊,提供更容易使用的基於對象的介面(類似於Java),可以繼承Thread對象來實現線程,還提供了其它一些線程相關的對象,例如Timer,Lock

使用thread模塊的例子
import thread

def worker():
"""thread worker function"""
print 'Worker'
thread.start_new_thread(worker)
使用threading模塊的例子
import threading
def worker():
"""thread worker function"""
print 'Worker'
t = threading.Thread(target=worker)
t.start()
或者Java Style
import threading

class worker(threading.Thread):
def __init__(self):
pass
def run():
"""thread worker function"""
print 'Worker'

t = worker()
t.start()

⑦ 並發和並行的區別 python

並發:就是同時做多件事情。

例如:終端用戶程序利用並發功能,在輸入數據的同時響應用戶輸入。伺服器利用並發,在處理第一個請求的同時響應第二個請求。只要你希望程序同時做多件事情,就需要並發。

很多人看到「並發」就會想到「多線程」,其實他們是有區別的。多線程只是並發的一種形式,但不是唯一形式


並行:就是把正在執行的大量任務分割成小塊,分配給多個同時運行的線程。

一般情況下,為了讓CPU充分利用,並行處理都會採用多線程。


所以說:並行處理是多線程的一種,而多線程是並發的一種。


還有一種非常重要但很多人不熟悉的並發類型:非同步編程,它也是並發的一種形式。

⑧ python gevent 能解決並發狀態嗎

1. gevent.server.StreamServer 會針對每個客戶端連接啟動一個greenlet處理,要注意的是,如果不循環監聽( 阻塞在read ),

每個greenlet會在完成後立即退出,從而導致客戶端退出( 發送FIN_ACK給客戶端 )。這個問題折騰了一晚上,終於弄明白了。坑爹啊。。。

2. 要非常仔細的檢查,greenlet處理的代碼,發現有可能阻塞IO的地方,盡量用gevent提供的庫。

3. 一些第三方庫隱藏了自己的實現( 通常是直接封裝C庫),要使得gevent兼容它們,可以用monkey_patch,但不保證全部管用。

4. 最後最後的一點,gevent的greenlet性能非常高,所以如果是用它作為並發的client端,那麼一定要注意,你的server端處理速度一定要足夠快!
否則你的客戶端代碼會因為服務端的慢速,而失去了greenlet的優勢。

⑨ 如何在Python中編寫並發程序

GIL

在Python中,由於歷史原因(GIL),使得Python中多線程的效果非常不理想.GIL使得任何時刻Python只能利用一個CPU核,並
且它的調度演算法簡單粗暴:多線程中,讓每個線程運行一段時間t,然後強行掛起該線程,繼而去運行其他線程,如此周而復始,直到所有線程結束.

這使得無法有效利用計算機系統中的"局部性",頻繁的線程切換也對緩存不是很友好,造成資源的浪費.

據說Python官方曾經實現了一個去除GIL的Python解釋器,但是其效果還不如有GIL的解釋器,遂放棄.後來Python官方推出了"利
用多進程替代多線程"的方案,在Python3中也有concurrent.futures這樣的包,讓我們的程序編寫可以做到"簡單和性能兼得".

多進程/多線程+Queue

一般來說,在Python中編寫並發程序的經驗是:計算密集型任務使用多進程,IO密集型任務使用多進程或者多線程.另外,因為涉及到資源共享,所
以需要同步鎖等一系列麻煩的步驟,代碼編寫不直觀.另外一種好的思路是利用多進程/多線程+Queue的方法,可以避免加鎖這樣麻煩低效的方式.

現在在Python2中利用Queue+多進程的方法來處理一個IO密集型任務.

假設現在需要下載多個網頁內容並進行解析,單進程的方式效率很低,所以使用多進程/多線程勢在必行.
我們可以先初始化一個tasks隊列,裡面將要存儲的是一系列dest_url,同時開啟4個進程向tasks中取任務然後執行,處理結果存儲在一個results隊列中,最後對results中的結果進行解析.最後關閉兩個隊列.

下面是一些主要的邏輯代碼.

# -*- coding:utf-8 -*-

#IO密集型任務
#多個進程同時下載多個網頁
#利用Queue+多進程
#由於是IO密集型,所以同樣可以利用threading模塊

import multiprocessing

def main():
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
cpu_count = multiprocessing.cpu_count() #進程數目==CPU核數目

create_process(tasks, results, cpu_count) #主進程馬上創建一系列進程,但是由於阻塞隊列tasks開始為空,副進程全部被阻塞
add_tasks(tasks) #開始往tasks中添加任務
parse(tasks, results) #最後主進程等待其他線程處理完成結果

def create_process(tasks, results, cpu_count):
for _ in range(cpu_count):
p = multiprocessing.Process(target=_worker, args=(tasks, results)) #根據_worker創建對應的進程
p.daemon = True #讓所有進程可以隨主進程結束而結束
p.start() #啟動

def _worker(tasks, results):
while True: #因為前面所有線程都設置了daemon=True,故不會無限循環
try:
task = tasks.get() #如果tasks中沒有任務,則阻塞
result = _download(task)
results.put(result) #some exceptions do not handled
finally:
tasks.task_done()

def add_tasks(tasks):
for url in get_urls(): #get_urls() return a urls_list
tasks.put(url)

def parse(tasks, results):
try:
tasks.join()
except KeyboardInterrupt as err:
print "Tasks has been stopped!"
print err

while not results.empty():
_parse(results)

if __name__ == '__main__':
main()

利用Python3中的concurrent.futures包

在Python3中可以利用concurrent.futures包,編寫更加簡單易用的多線程/多進程代碼.其使用感覺和Java的concurrent框架很相似(借鑒?)
比如下面的簡單代碼示例

def handler():
futures = set()

with concurrent.futures.ProcessPoolExecutor(max_workers=cpu_count) as executor:
for task in get_task(tasks):
future = executor.submit(task)
futures.add(future)

def wait_for(futures):
try:
for future in concurrent.futures.as_completed(futures):
err = futures.exception()
if not err:
result = future.result()
else:
raise err
except KeyboardInterrupt as e:
for future in futures:
future.cancel()
print "Task has been canceled!"
print e
return result

總結

要是一些大型Python項目也這般編寫,那麼效率也太低了.在Python中有許多已有的框架使用,使用它們起來更加高效.

⑩ python 多進程是真的並發嗎

Python提供了非常好用的多進程包multiprocessing,你只需要定義一個函數,Python會替你完成其他所有事情。
藉助這個包,可以輕松完成從單進程到並發執行的轉換。

1、新建單一進程
如果我們新建少量進程,可以如下:
import multiprocessing
import time

def func(msg):
for i in xrange(3):
print msg
time.sleep(1)

if __name__ == "__main__":
p = multiprocessing.Process(target=func, args=("hello", ))
p.start()
p.join()
print "Sub-process done."12345678910111213

2、使用進程池
是的,你沒有看錯,不是線程池。它可以讓你跑滿多核CPU,而且使用方法非常簡單。
注意要用apply_async,如果落下async,就變成阻塞版本了。
processes=4是最多並發進程數量。
import multiprocessing
import time

def func(msg):
for i in xrange(3):
print msg
time.sleep(1)

if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
for i in xrange(10):
msg = "hello %d" %(i)
pool.apply_async(func, (msg, ))
pool.close()
pool.join()
print "Sub-process(es) done."12345678910111213141516

3、使用Pool,並需要關注結果
更多的時候,我們不僅需要多進程執行,還需要關注每個進程的執行結果,如下:
import multiprocessing
import time

def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
return "done " + msg

if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
result = []
for i in xrange(10):
msg = "hello %d" %(i)
result.append(pool.apply_async(func, (msg, )))
pool.close()
pool.join()
for res in result:
print res.get()
print "Sub-process(es) done."

2014.12.25更新
根據網友評論中的反饋,在Windows下運行有可能崩潰(開啟了一大堆新窗口、進程),可以通過如下調用來解決:
multiprocessing.freeze_support()1

附錄(自己的腳本):
#!/usr/bin/python
import threading
import subprocess
import datetime
import multiprocessing

def dd_test(round, th):
test_file_arg = 'of=/zbkc/test_mds_crash/1m_%s_%s_{}' %(round, th)
command = "seq 100 | xargs -i dd if=/dev/zero %s bs=1M count=1" %test_file_arg
print command
subprocess.call(command,shell=True,stdout=open('/dev/null','w'),stderr=subprocess.STDOUT)

def mds_stat(round):
p = subprocess.Popen("zbkc mds stat", shell = True, stdout = subprocess.PIPE)
out = p.stdout.readlines()
if out[0].find('active') != -1:
command = "echo '0205pm %s round mds status OK, %s' >> /round_record" %(round, datetime.datetime.now())
command_2 = "time (ls /zbkc/test_mds_crash/) 2>>/round_record"
command_3 = "ls /zbkc/test_mds_crash | wc -l >> /round_record"
subprocess.call(command,shell=True)
subprocess.call(command_2,shell=True)
subprocess.call(command_3,shell=True)
return 1
else:
command = "echo '0205 %s round mds status abnormal, %s, %s' >> /round_record" %(round, out[0], datetime.datetime.now())
subprocess.call(command,shell=True)
return 0

#threads = []
for round in range(1, 1600):
pool = multiprocessing.Pool(processes = 10) #使用進程池
for th in range(10):
# th_name = "thread-" + str(th)
# threads.append(th_name) #添加線程到線程列表
# threading.Thread(target = dd_test, args = (round, th), name = th_name).start() #創建多線程任務
pool.apply_async(dd_test, (round, th))
pool.close()
pool.join()
#等待線程完成
# for t in threads:
# t.join()

if mds_stat(round) == 0:
subprocess.call("zbkc -s",shell=True)
break

熱點內容
外殼加密狗 發布:2024-12-26 08:57:59 瀏覽:843
筆記本電腦密碼怎麼破解 發布:2024-12-26 08:57:20 瀏覽:70
360雲盤分享取消密碼是多少 發布:2024-12-26 08:55:37 瀏覽:820
腳本啥格式 發布:2024-12-26 08:55:00 瀏覽:128
學C語言書 發布:2024-12-26 08:46:46 瀏覽:84
win7共享文件訪問許可權 發布:2024-12-26 08:33:22 瀏覽:147
安卓如何下載play商店app 發布:2024-12-26 08:32:31 瀏覽:498
我的世界網易伺服器卡崩進不去 發布:2024-12-26 08:20:48 瀏覽:738
sqlserver導出xml 發布:2024-12-26 08:06:26 瀏覽:289
wifi無訪問許可權 發布:2024-12-26 08:05:33 瀏覽:674