陈思思8分钟磁力链接完整版能给我过来吗,磁力失效了。

python多进程退出的问题主进程起了10个子进程,每个子进程做一些数据统计,最后主进程需要做汇总,主进程如何知道所有子进程已经处理完成了?
有什么方法
回答1:使用一个列表管理这10个进程
for&i&in&(10):
&&&&if&proclist[i].is_alive():
&&&&&&&&#如果还活着就...&...[Python之旅]第六篇(六):Python多进程使用
关于进程与线程的对比,下面的解释非常好的说明了这两者的区别:
& & 这里主要说明关于多进程的下面几点:
1.多进程的使用方法
2.进程间的通信之multiprocessing.Manager()使用
3.Python进程池
(1)比较简单的例子
(2)多个进程多次并发的情况
(3)验证apply.async方法是非阻塞的
(4)验证apply.async中的get()方法是阻塞的
1.多进程的使用方法
& & 直接给出下面程序代码及注释:
from multiprocessing import Process
#从多进程模块中导入Process
import time
def sayHi(name):
print 'Hi my name is %s' % name
time.sleep(3)
for i in range(10):
p = Process(target=sayHi, args=(i,))
#调用多进程使用方法
#开始执行多进程
& & 程序执行结果如下:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ python multiprocssing8.py
Hi my name is 2
Hi my name is 3
Hi my name is 6
Hi my name is 1
Hi my name is 4
Hi my name is 5
Hi my name is 0
Hi my name is 7
Hi my name is 8
Hi my name is 9
& & 输出顺序不一致,则是因为屏幕的抢占问题而已,但不同的进程执行是并发的。在执行程序的过程中,可以打开另一个窗口来查看进程的执行情况(上面sleep了3秒,所以速度一定要快):
xpleaf@xpleaf-machine:~$ ps -ef | grep mul*
00:00:00 python multiprocssing8.py
0 19:34 pts/1
00:00:00 python multiprocssing8.py
0 19:34 pts/1
00:00:00 python multiprocssing8.py
0 19:34 pts/1
00:00:00 python multiprocssing8.py
0 19:34 pts/1
00:00:00 python multiprocssing8.py
0 19:34 pts/1
00:00:00 python multiprocssing8.py
0 19:34 pts/1
00:00:00 python multiprocssing8.py
0 19:34 pts/1
00:00:00 python multiprocssing8.py
0 19:34 pts/1
00:00:00 python multiprocssing8.py
0 19:34 pts/1
00:00:00 python multiprocssing8.py
0 19:34 pts/1
00:00:00 python multiprocssing8.py
00:00:00 grep --color=auto mul*
& & 可以看到上面有11个进程,但是前面其实只开了10个进程,为什么会有11个呢?那是因为有一个主进程,即这整一个程序本身,而其它的10个进程则是这个主进程下面的子进程,但无论如何,它们都是进程。
& & 同多线程一样,多进程也有join方法,即可以在p.start()后面加上去,一个进程的执行需要等待上一个进程执行完毕后才行,这就相当于进程的执行就是串行的了。
2.进程间的通信multiprocessing.Manager()使用
& & Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。
& & Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。
& & 直接看下面的一个例子:
import multiprocessing
import time
def worker(d, key, value):
d[key] = value
mgr = multiprocessing.Manager()
d = mgr.dict()
#用来接收多进程函数的返回的结果,存放的是函数的入口
for i in range(10):
jobs.append(multiprocessing.Process(target=worker,args=(d,i,i*i)))
for j in jobs:
#执行存放的函数入口
for j in jobs:
#检测进程是否执行完毕
#time.sleep(1)
#如果有join()来进程进程是否执行完毕,则这里可以省略
print ('Results:' )
& & 程序执行结果如下:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ python multiprocssing_manager9.py
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 7: 49, 8: 64, 9: 81}
3.Python进程池
& & 前面我们讲过CPU在某一时刻只能执行一个进程,那为什么上面10个进程还能够并发执行呢?实际在CPU在处理上面10个进程时是在不停的切换执行这10个进程,但由于上面10个进程的程序代码都是十分简单的,并没有涉及什么复杂的功能,并且,CPU的处理速度实在是非常快,所以这样一个过程在我们人为感知里确实是在并发执行的,实际只不过是CPU在不停地切换而已,这是通过增加切换的时间来达到目的的。
& & 10个简单的进程可以产生这样的效果,那试想一下,如果我有100个进程需要CPU执行,但因为CPU还要进行其它工作,只能一次再处理10个进程(切换处理),否则有可能会影响其它进程工作,这下可怎么办?这时候就可以用到Python中的进程池来进行调控了,在Python中,可以定义一个进程池和这个池的大小,假如定义进程池的大小为10,那么100个进程可以分10次放进进程池中,然后CPU就可以10次并发完成这100个进程了。
(1)比较简单的例子
& & 程序代码及注释如下:
from multiprocessing import Process,Pool
#导入Pool模块
import time
def sayHi(num):
time.sleep(1)
return num*num
p = Pool(processes=5)
#定义进程池的数量为5
result = p.apply_async(sayHi, [10]) &#开始执行多进程,async为异步执行,即不会等待其它
#子进程的执行结果,为非阻塞模式,除非使用了get()方法,get()方法会等待子进程返回执行结果,
#再去执行下一次进程,可以看后面的例子;同理下有apply方法,阻塞模式,会等待子进程返回执行结果
print result.get() & &#get()方法
& & 程序执行结果如下:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ time python multiprocssing_pool10.py
sys 0m0.032s
& & 虽然是定义了进程池的数量为5,但由于这里只执行一个子进程,所以时间为1秒多。
& & 上面的程序可以改写为下面的形式:
from multiprocessing import Process,Pool
import time
def sayHi(num):
time.sleep(1)
return num*num
p = Pool(processes=5)
result = p.map(sayHi,range(3))
for i in result:print i
& & 执行结果如下:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ python multiprocssing_pool10.py
(2)多个进程多次并发的情况:解释进程池作用以及多进程并发执行消耗切换时间
& & 修改上面的程序代码如下:
from multiprocessing import Process,Pool
import time
def sayHi(num):
time.sleep(1)
return num*num
p = Pool(processes=5)
result_list = []
for i in range(30):
result_list.append(p.apply_async(sayHi, [i]))
for res in result_list:
print res.get()
& & 程序执行结果如下:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ python multiprocssing_pool_2_11.py
& & 每一部分数字之间有空白是因为我按了回车键的原因,以让这个结果更加明显,同时也可以知道,上面的30个进程是分6次来完成的,是因为我定义了进程池的数量为5(30/6=5),为了更有说服力,可以看一下程序的执行时间:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ time python multiprocssing_pool_2_11.py | grep real
sys 0m0.028s
& & 可以看到执行的时间为6秒多,之所以不是6秒是因为主程序本身的执行需要一点时间,同时进程间的切换也是需要时间的(这里为5个进程间的切换,因为每次并发执行的进程数为5个),为了说明这一点,我们可以把pool大小改为100,但依然是并发执行6次,程序代码修改为如下:
from multiprocessing import Process,Pool
import time
def sayHi(num):
time.sleep(1)
return num*num
p = Pool(processes=100)
result_list = []
for i in range(600):
result_list.append(p.apply_async(sayHi, [i]))
for res in result_list:
print res.get()
& & 再观察一下执行时间:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ time python multiprocssing_pool_2_11.py | grep real
sys 0m0.128s
& & 虽然相差的时间只是零点几秒,但随着并发执行进程数的增加,进程间切换需要的时间越来越多,程序执行的时间也就越多,特别是当单个进程非常消耗CPU资源时。
(3)验证apply.sync方法是非阻塞的
& & 第一个程序代码的注释中,我们说apply.sync方法是非阻塞的,也就是说,无论子进程是否已经执行完毕,只要主进程执行完毕,程序就会退出,看下面的探索过程,以验证一下。
& & 看下面的程序代码:
from multiprocessing import Process,Pool
import time
def sayHi(num):
time.sleep(10)
return num*num
p = Pool(processes=5)
result_list = []
for i in range(30):
result_list.append(p.apply_async(sayHi, [i]))
for res in result_list:
print res.get()
& & 先查看程序的执行时间:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ time python multiprocssing_pool_2_11.py | grep real
sys 0m0.024s
& & 第一次运行这个程序时,出乎了我的意料,本来我以为这个程序的执行要18s左右才对的,因为子进程并发执行了6次,每一次都sleep了3s(并发执行的进程数比较少,切换的时间就不算上去了),但实际上也并非是如此,因为我查看进程时,情况是下面这样的:
xpleaf@xpleaf-machine:~$ ps -ef | grep mul*
00:00:00 grep --color=auto mul*
& & 如果原来我的想法是正确的,那么应该在这里可以看到多个我执行的进程才对(因为有个3s的时间在子进程里,并发6次,18s,应该有才对),为什么会没有呢?后来我把程序代码修改为如下:
from multiprocessing import Process,Pool
import time
def sayHi(num):
time.sleep(3)
return num*num
p = Pool(processes=5)
result_list = []
for i in range(30):
result_list.append(p.apply_async(sayHi, [i]))
time.sleep(3)
& & 即我在主程序中添加了time.sleep(3)的代码,还是先查看时间:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ time python multiprocssing_pool_2_11.py | grep real
sys 0m0.032s
& & 在上面程序执行过程中,迅速地在另一个窗口查看系统进程:
xpleaf@xpleaf-machine:~$ ps -ef | grep mul*
00:00:00 python multiprocssing_pool_2_11.py
0 20:39 pts/1
00:00:00 python multiprocssing_pool_2_11.py
0 20:39 pts/1
00:00:00 python multiprocssing_pool_2_11.py
0 20:39 pts/1
00:00:00 python multiprocssing_pool_2_11.py
0 20:39 pts/1
00:00:00 python multiprocssing_pool_2_11.py
0 20:39 pts/1
00:00:00 python multiprocssing_pool_2_11.py
00:00:00 grep --color=auto mul*
& & 程序执行结束后,即显示了上面的时间后,我再查看进程:
xpleaf@xpleaf-machine:~$ ps -ef | grep mul*
00:00:00 grep --color=auto mul*
& & 于是,上网查找了一些资料,apply.async是非阻塞的,所谓的非阻塞是指:主进程不会等待子进程的返回结果后再结束;正常情况下,如果是产生于主进程的子进程,在主进程结束后也应该不会退出才对,但因为这里的子进程是由pool进程池产生的,所以主进程结束,pool即关闭,因为pool池中的进程需要pool调度才能执行,因此当pool关闭后,这些子进程也随即结束运行。
& & 其实join方法就可以实现一个功能,就是让子进程结束后才结束主进程,把上面的代码修改为如下:
from multiprocessing import Process,Pool
import time
def sayHi(num):
time.sleep(3)
return num*num
p = Pool(processes=5)
result_list = []
for i in range(30):
result_list.append(p.apply_async(sayHi, [i]))
#执行p.join()前需要先关闭进程池,否则会出错
#主进程等待子进程执行完后才结束
& & 查看执行的时间:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ time python multiprocssing_pool_2_11.py | grep real
sys 0m0.044s
xpleaf@xpleaf-mac
& & 当然,结果就是我们可以预料的了。
(4)验证apply.async中的get()方法是阻塞的
& & 使用apply.sync中的get()方法时,是会阻塞的,即apply.sync会等进程返回执行结果后才会执行下一个进程,其实(2)中的第一个例子就可以体现出来(程序中有get(),于是就忽略apply.async的非阻塞特性,等待子进程返回结果并使用get()获得结果)。这里不妨看下来一个例子,以实现虽然是多进程并发,但是因为get()的缘故,进程是串行执行的。
& & 程序代码如下:
from multiprocessing import Process,Pool
import time
def sayHi(num):
time.sleep(1)
return num*num
p = Pool(processes=5)
for i in range(20):
result = p.apply_async(sayHi, [i])
print result.get()
& & 程序执行结果如下:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ time python multiprocssing_pool10.py
sys 0m0.064s
& & 结果是一个一个输出的,其实从程序执行的时间也可以推算出来,至于为什么,那就是因为get()导致阻塞的原因了。
& & 上面说得其实思路是不太清晰,主要是因为对多进程的掌握是还不够多的,在这个探索的过程中,自己也是慢慢接触到了许多思想和方法,还有和操作系统相关的知识,往后深入学习后,如果有时间,会再完善一下。18:36 提问
Python如何实现多线程与多进程的配合工作?
#!/usr/bin/env python
#coding=utf8
import multiprocessing
import threading
import threadpool
import time
开启了11个进程,第一个进程是以多线程方式运行,再通过进程间通信来使其他进程工作.
def put_test(str1):
# 处理出函数名,开启多线程
print str1,
q.put(str1)
time.sleep(1)
def get_test():
# 进程池除第一个以外的进程
print q.qsize(),os.getpid()
print q.get(True,1)
time.sleep(1)
###############################################################
if name=='__main__':
multiprocessing.freeze_support()
q=multiprocessing.Queue(maxsize = 10)
data=range(1,11)
pool=threadpool.ThreadPool(10)
requests=threadpool.makeRequests(put_test,data)
[pool.putRequest(req) for req in requests]
pool.wait()
pool2 = multiprocessing.Pool(processes=3)
for var in range(1,11): # 20个进程
pool2.apply_async(get_test,[])
pool2.close()
pool2.join()
print "size:",q.qsize()
这段代码出问题了,求助。
按赞数排序
出问题具体什么问题,是不是数据同步的问题。多加一些print分析。
第一点就是现在很少使用进程,用的更多是多线程
第二点多线程或者多进程最容易出现的问题就是互斥资源的处理问题,一般的处理就是加锁
还有最好提问时能把错误删除贴上,便于分析
其他相似问题python 之 多进程
来源:博客园

序. multiprocessingpython中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
 

1. Process
创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。name为别名。group实质上不使用。方法:is_alive() 、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。
is_alive():判断该进程是否还活着
join([timeout]):主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。
run():进程p调用start()时,自动调用run()
 
属性:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。
 
例1.1:创建函数并将其作为单个进程

import multiprocessing
import time

def worker(interval):
n = 5
while n & 0:
print("The time is {0}".format(time.ctime()))
#输出时间的格式
time.sleep(interval)
n -= 1

if __name__ == "__main__":
p = multiprocessing.Process(target = worker, args = (3,))
p.start()
print "p.pid:", p.pid
print "p.name:", p.name
print "p.is_alive:", p.is_alive()

结果




1
2
3
4
5
6
7
8



p.pid: 8736
p.name: Process-1
p.is_alive: True
The time is Tue Apr 21 20:55:12 2015
The time is Tue Apr 21 20:55:15 2015
The time is Tue Apr 21 20:55:18 2015
The time is Tue Apr 21 20:55:21 2015
The time is Tue Apr 21 20:55:24 2015





 
例1.2:创建函数并将其作为多个进程


import multiprocessing
import time

def worker_1(interval):
print "worker_1"
time.sleep(interval)
print "end worker_1"

def worker_2(interval):
print "worker_2"
time.sleep(interval)
print "end worker_2"

def worker_3(interval):
print "worker_3"
time.sleep(interval)
print "end worker_3"

if __name__ == "__main__":
  p1 = Process(target=worker_1, args=(6,))
p2 = Process(target=worker_2, args=(4,))
p3 = Process(target=worker_3, args=(2,))
  p1.start() p2.start() p3.start()
  print("The number of CPU is:" + str(cpu_count()))
   for p in active_children():
   print("child p.name:=%s" % p.name + "\tp.id=%s" % str(p.pid))
  print(p1.pid)
  print("END-----")


结果



1
2
3
4
5
6
7
8
9
10
11




The number of CPU is:4child p.name:=Process-2 p.id=3864child p.name:=Process-3 p.id=3256child p.name:=Process-1 p.id=73367336END-----worker_1worker_2worker_3end worker_3end worker_2end worker_1





 
例1.3:将进程定义为类

import multiprocessing
import time

class ClockProcess(multiprocessing.Process):
def __init__(self, interval):
multiprocessing.Process.__init__(self)
self.interval = interval

def run(self):
n = 5
while n & 0:
print("the time is {0}".format(time.ctime()))
time.sleep(self.interval)
n -= 1

if __name__ == '__main__':
p = ClockProcess(3)


注:进程p调用start()时,自动调用run()
结果




1
2
3
4
5



the time is Tue Apr 21 20:31:30 2015
the time is Tue Apr 21 20:31:33 2015
the time is Tue Apr 21 20:31:36 2015
the time is Tue Apr 21 20:31:39 2015
the time is Tue Apr 21 20:31:42 2015





 
例1.4:daemon程序对比结果
#1.4-1 不加daemon属性


import multiprocessing
import time

def worker(interval):
print("work start:{0}".format(time.ctime()));
time.sleep(interval)
print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
p = multiprocessing.Process(target = worker, args = (3,))
p.start()
print "end!"


结果




1
2
3



end!
work start:Tue Apr 21 21:29:10 2015
work end:Tue Apr 21 21:29:13 2015





#1.4-2 加上daemon属性


import multiprocessing
import time

def worker(interval):
print("work start:{0}".format(time.ctime()));
time.sleep(interval)
print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
p = multiprocessing.Process(target = worker, args = (3,))
p.daemon = True
p.start()
print "end!"


结果




1



end!





注:因子进程设置了daemon属性,主进程结束,它们就随着结束了。
  在多线程模型中,默认情况下(sub-Thread.daemon=False)主线程会等待子线程退出后再退出,而如果sub- Thread.setDaemon(True)时,主线程不会等待子线程,直接退出,而此时子线程会随着主线程的对出而退出,避免这种情况,主线程中需要 对子线程进行join,等待子线程执行完毕后再退出。对应的,在多进程模型中,Process类也有daemon属性,而它表示的含义与 Thread.daemon类似,当设置sub-Process.daemon=True时,主进程中需要对子进程进行等待,否则子进程会随着主进程的退 出而退出
更详细:
#1.4-3 设置daemon执行完结束的方法


import multiprocessing
import time

def worker(interval):
print("work start:{0}".format(time.ctime()));
time.sleep(interval)
print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
p = multiprocessing.Process(target = worker, args = (3,))
p.daemon = True
p.start()
p.join()
print "end!"


结果




1
2
3



work start:Tue Apr 21 22:16:32 2015
work end:Tue Apr 21 22:16:35 2015
end!





 

2. Lock
当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。

import multiprocessing
import sys

def worker_with(lock, f):
with lock:
fs = open(f, 'a+')
n = 10
while n & 1:
fs.write("Lockd acquired via with\n")
n -= 1
fs.close()

def worker_no_with(lock, f):
lock.acquire()
fs = open(f, 'a+')
n = 10
while n & 1:
fs.write("Lock acquired directly\n")
n -= 1
fs.close()
finally:
lock.release()

if __name__ == "__main__":
lock = multiprocessing.Lock()
f = "file.txt"
w = multiprocessing.Process(target = worker_with, args=(lock, f))
nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))
w.start()
nw.start()
print "end"

结果(输出文件)




1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18



Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly





 

3. Semaphore
Semaphore用来控制对共享资源的访问数量,例如池的最大连接数。

import multiprocessing
import time

def worker(s, i):
s.acquire()
print(multiprocessing.current_process().name + "acquire");
time.sleep(i)
print(multiprocessing.current_process().name + "release\n");
s.release()

if __name__ == "__main__":
s = multiprocessing.Semaphore(2)
for i in range(5):
p = multiprocessing.Process(target = worker, args=(s, i*2))
p.start()

结果




1
2
3
4
5
6
7
8
9
10
11
12
13
14



Process-1acquire
Process-1release
 
Process-2acquire
Process-3acquire
Process-2release
 
Process-5acquire
Process-3release
 
Process-4acquire
Process-5release
 
Process-4release





例子2:

import multiprocessing
import time


def worker(s, ):
s.acquire()
print(multiprocessing.current_process().name + "acquire")
time.sleep(1)
# print(multiprocessing.current_process().name + "release\n")
s.release()

if __name__ == "__main__":
s = multiprocessing.Semaphore(2)
for i in range(5):
p = multiprocessing.Process(target = worker, args=(s, ))
# time.sleep(0.01)
p.start()#####结果######
Process-4acquireProcess-3acquire
Process-1acquireProcess-2acquire
Process-5acquire

 

4. Event
Event用来实现进程间同步通信。

import multiprocessingimport timedef wait_for_event(e):
print("wait_for_event: starting")
e.wait() #一直阻塞的去等待set值
print('*****')
print("wairt_for_event: e.is_set()-&" + str(e.is_set()))def wait_for_event_timeout(e, t):
print("wait_for_event_timeout:starting")
#等2s去取set值
print('------')
print("wait_for_event_timeout:e.is_set-&" + str(e.is_set()))if __name__ == "__main__":
e = multiprocessing.Event()
w1 = multiprocessing.Process(name="block", target=wait_for_event, args=(e,))
w2 = multiprocessing.Process(name="non-block", target=wait_for_event_timeout, args=(e, 2))
w1.start()
w2.start()
time.sleep(10)
# 设置set的值
print("main: event is set")

结果



1
2
3
4
5




wait_for_event: startingwait_for_event_timeout:starting------wait_for_event_timeout:e.is_set-&Falsemain: event is set*****wairt_for_event: e.is_set()-&True





 

5. Queue
Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
 
get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。Queue的一段示例代码:

import multiprocessing

def writer_proc(q):
q.put(1, block = False) 


def reader_proc(q):
print q.get(block = False) 
pass

if __name__ == "__main__":
q = multiprocessing.Queue()
writer = multiprocessing.Process(target=writer_proc, args=(q,))
writer.start()


reader = multiprocessing.Process(target=reader_proc, args=(q,))
reader.start()


#reader.join()
这样会一直阻塞
#writer.join()

结果




1



1





 

6. Pipe
Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。
 
send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。


import multiprocessingimport timedef proc1(pipe):
# while True:
for i in range(3):
print("send: %s" %(i))
pipe.send(i)
time.sleep(1)def proc2(pipe):
while True:
print ("proc2 rev:", pipe.recv())
time.sleep(1)def proc3(pipe):
while True:
print("PROC3 rev:", pipe.recv())
time.sleep(1)if __name__ == "__main__":
pipe = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
#p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))
p1.start()
p2.start()
# p3.start()
# p1.join()
# p2.join()
# p3.join()#######结果########
send: 0proc2 rev: 0send: 1proc2 rev: 1send: 2proc2 rev: 2

结果


 

7. Pool
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
 例子:

import multiprocessing
import time


def func(msg, a):
# if a == 1:
time.sleep(8)
print(1)
print("msg:", msg)

print("++++")
time.sleep(3)
# print("end")

if __name__ == "__main__":
pool = multiprocessing.Pool(processes=3)
for i in range(7):
msg = "hello %d" % (i)
a = i
# 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
pool.apply_async(func, (msg, a, ))
pool.close()

# 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
pool.join()
print("Sub-process(es) done.")

例7.1:使用进程池(非阻塞)

#coding: utf-8
import multiprocessing
import time

def func(msg):
print "msg:", msg
time.sleep(3)
print "end"

if __name__ == "__main__":
pool = multiprocessing.Pool(processes = 3)
for i in xrange(4):
msg = "hello %d" %(i)
pool.apply_async(func, (msg, ))
#维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
pool.close()
pool.join()
#调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
print "Sub-process(es) done."

一次执行结果




1
2
3
4
5
6
7
8
9
10



mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0
 
msg: hello 1
msg: hello 2
end
msg: hello 3
end
end
end
Sub-process(es) done.





函数解释:
apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解区别,看例1例2结果区别)
close()
关闭pool,使其不在接受新的任务。
terminate()
结束工作进程,不在处理未完成的任务。
join()
主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。
执行说明:创建一个进程池pool,并设定进程的数量为3,xrange(4)会相继产生四个对象[0, 1, 2, 4],四个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完事后才空出一个进程处理对象3,所以会出现输出“msg: hello 3”出现在"end"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()处等待各个进程的结束。
 
例7.2:使用进程池(阻塞)

#coding: utf-8
import multiprocessing
import time

def func(msg):
print "msg:", msg
time.sleep(3)
print "end"

if __name__ == "__main__":
pool = multiprocessing.Pool(processes = 3)
for i in xrange(4):
msg = "hello %d" %(i)
pool.apply(func, (msg, ))
#维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
pool.close()
pool.join()
#调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
print "Sub-process(es) done."

一次执行的结果




1
2
3
4
5
6
7
8
9
10



msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.





  
例7.3:使用进程池,并关注结果

import multiprocessing
import time

def func(msg):
print "msg:", msg
time.sleep(3)
print "end"
return "done" + msg

if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
result = []
for i in xrange(3):
msg = "hello %d" %(i)
result.append(pool.apply_async(func, (msg, )))
pool.close()
pool.join()
for res in result:
print ":::", res.get()
print "Sub-process(es) done."

一次执行结果




1
2
3
4
5
6
7
8
9
10



msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
::: donehello 0
::: donehello 1
::: donehello 2
Sub-process(es) done.





 
例7.4:使用多个进程池

import multiprocessing
import os, time, random


def Lee(i):
print('1', i)
time.sleep(3)
print('-----')
# print("\nRun task Lee-%s" %(os.getpid())) #os.getpid()获取当前的进程的ID
# start = time.time()
# time.sleep(random.random() * 10) #random.random()随机生成0-1之间的小数
# end = time.time()
# print('Task Lee, runs %0.2f seconds.' % (end - start))


def Marlon(i):
print('2', i)
time.sleep(3)
print('-----')
# print("\nRun task Marlon-%s" % (os.getpid()))
# start = time.time()
# time.sleep(random.random() * 40)
# end = time.time()
# print('Task Marlon runs %0.2f seconds.' %(end - start))


def Allen(i):
print('3', i)
time.sleep(3)
print('-----')
# print("\nRun task Allen-%s" %(os.getpid()))
# start = time.time()
# time.sleep(random.random() * 30)
# end = time.time()
# print('Task Allen runs %0.2f seconds.' %(end - start))


def Frank(i):
print('4', i)
time.sleep(3)
print('-----')
# print("\nRun task Frank-%s" %(os.getpid()))
# start = time.time()
# time.sleep(random.random() * 20)
# end = time.time()
# print('Task Frank runs %0.2f seconds.' %(end - start))

if __name__ == '__main__':

function_list = [Lee, Marlon, Allen, Frank]

# print("parent process %s" % (os.getpid()))

pool = multiprocessing.Pool(4)
for func in function_list:
# Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中
for i in ['a', 'b', 'c','d', 'e', 'f', 'g']:
pool.apply_async(func, args=(i,))

print('Waiting for all subprocesses done...')
pool.close()
# 调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束

pool.join()
print('All subprocesses done.')

View Code
 

#coding: utf-8
import multiprocessing
import os, time, random

def Lee():
print "\nRun task Lee-%s" %(os.getpid()) #os.getpid()获取当前的进程的ID
start = time.time()
time.sleep(random.random() * 10) #random.random()随机生成0-1之间的小数
end = time.time()
print 'Task Lee, runs %0.2f seconds.' %(end - start)

def Marlon():
print "\nRun task Marlon-%s" %(os.getpid())
start = time.time()
time.sleep(random.random() * 40)
end=time.time()
print 'Task Marlon runs %0.2f seconds.' %(end - start)

def Allen():
print "\nRun task Allen-%s" %(os.getpid())
start = time.time()
time.sleep(random.random() * 30)
end = time.time()
print 'Task Allen runs %0.2f seconds.' %(end - start)

def Frank():
print "\nRun task Frank-%s" %(os.getpid())
start = time.time()
time.sleep(random.random() * 20)
end = time.time()
print 'Task Frank runs %0.2f seconds.' %(end - start)

if __name__=='__main__':
function_list=
[Lee, Marlon, Allen, Frank] 
print "parent process %s" %(os.getpid())

pool=multiprocessing.Pool(4)
for func in function_list:
pool.apply_async(func)
#Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中

print 'Waiting for all subprocesses done...'
pool.close()
pool.join()
#调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
print 'All subprocesses done.'

一次执行结果



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15



parent process 7704
 
Waiting for all subprocesses done...
Run task Lee-6948
 
Run task Marlon-2896
 
Run task Allen-7304
 
Run task Frank-3052
Task Lee, runs 1.59 seconds.
Task Marlon runs 8.48 seconds.
Task Frank runs 15.68 seconds.
Task Allen runs 18.08 seconds.
All subprocesses done.



免责声明:本站部分内容、图片、文字、视频等来自于互联网,仅供大家学习与交流。相关内容如涉嫌侵犯您的知识产权或其他合法权益,请向本站发送有效通知,我们会及时处理。反馈邮箱&&&&。
学生服务号
在线咨询,奖学金返现,名师点评,等你来互动

我要回帖

更多关于 陈思思8分钟下载磁力 的文章

 

随机推荐