如何举报手机支付京东用户被大规模盗刷的盗刷?

lixiaobo994 的BLOG
用户名:lixiaobo994
文章数:22
访问量:331
注册日期:
阅读量:5863
阅读量:12276
阅读量:371433
阅读量:1065654
51CTO推荐博文
建议用pycharm阅读,可以收缩,也可以测试'''IO多路复用 & &I/O多路复用指:通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。目前支持I/O多路复用的系统调用有 select,poll,epoll应用场景: & &服务器需要同时处理多个处于监听状态或者多个连接状态的套接字。 & &服务器需要同时处理多种网络协议的套接字。#!/usr/bin/python# -*- coding: utf-8 -*-import selectimport socketimport Queueserver = socket.socket(socket.AF_INET,socket.SOCK_STREAM)server.setblocking(False)server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR , 1)server_address= ('192.168.1.5',8080)server.bind(server_address)server.listen(10)#select轮询等待读socket集合inputs = [server]#select轮询等待写socket集合outputs = []message_queues = {}#select超时时间timeout = 20while True: & &print "等待活动连接......" & &readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout) & &if not (readable or writable or exceptional) : & & & &print "select超时无活动连接,重新select...... " & & & & & &#循环可读事件 & &for s in readable : & & & &#如果是server监听的socket & & & &if s is server: & & & & & &#同意连接 & & & & & &connection, client_address = s.accept() & & & & & &print "新连接: ", client_address & & & & & &connection.setblocking(0) & & & & & &#将连接加入到select可读事件队列 & & & & & &inputs.append(connection) & & & & & &#新建连接为key的字典,写回读取到的消息 & & & & & &message_queues[connection] = Queue.Queue() & & & &else: & & & & & &#不是本机监听就是客户端发来的消息 & & & & & &data = s.recv(1024) & & & & & &if data : & & & & & & & &print "收到数据:" , data , "客户端:",s.getpeername() & & & & & & & &message_queues[s].put(data) & & & & & & & &if s not in outputs: & & & & & & & & & &#将读取到的socket加入到可写事件队列 & & & & & & & & & &outputs.append(s) & & & & & &else: & & & & & & & &#空白消息,关闭连接 & & & & & & & &print "关闭连接:", client_address & & & & & & & &if s in outputs : & & & & & & & & & &outputs.remove(s) & & & & & & & &inputs.remove(s) & & & & & & & &s.close() & & & & & & & &del message_queues[s] & &for s in writable: & & & &try: & & & & & &msg = message_queues[s].get_nowait() & & & &except Queue.Empty: & & & & & &print "连接:" , s.getpeername() , '消息队列为空' & & & & & &outputs.remove(s) & & & &else: & & & & & &print "发送数据:" , msg , "到", s.getpeername() & & & & & &s.send(msg) & &for s in exceptional: & & & &print "异常连接:", s.getpeername() & & & &inputs.remove(s) & & & &if s in outputs: & & & & & &outputs.remove(s) & & & &s.close() & & & &del message_queues[s]'''''' & &进程和线程的区别和关系: & &对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,打开一个记事本就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程,打开一个Word就启动了一个Word进程。 & &有些进程还不止同时干一件事,比如Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”称为线程(Thread)。 & &由于每个进程至少要干一件事,所以,一个进程至少有一个线程。当然,像Word这种复杂的进程可以有多个线程,多个线程可以同时执行,多线程的执行方式和多进程是一样的,也是由操作系统在多个线程之间快速切换,让每个线程都短暂地交替运行,看起来就像同时执行一样。当然,真正地同时执行多线程需要多核CPU才可能实现。 & &线程是最小的执行单元,而进程由至少一个线程组成。如何调度进程和线程,完全由操作系统决定,程序自己不能决定什么时候执行,执行多长时间。''''''python的进程 & &multiprocessing包的组件Process, Queue, Pipe, Lock等组件提供了与多线程类似的功能。使用这些组件,可以方便地编写多进程并发程序。''''''Queue队列 & &Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。 & &get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。from multiprocessing import Process, Queuedef offer(queue): & &queue.put("Hello World")if __name__ == '__main__': & &q = Queue() & &p = Process(target=offer, args=(q,)) & &p.start() & &print q.get()''''''Pipes管道 & &Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值)那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息 & &send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。from multiprocessing import Process, Pipedef send(conn): & &conn.send("Hello World") & &conn.close()if __name__ == '__main__': & &parent_conn, child_conn = Pipe() & &p = Process(target=send, args=(child_conn,)) & &p.start() & &print parent_conn.recv()''''''创建进程示例#!/usr/bin/env python# -*- coding:utf-8 -*-from multiprocessing import Processimport osdef run_proc(name): & &print 'Run child process %s (%s)...' % (name, os.getpid())if __name__=='__main__': & &print 'Parent process %s.' % os.getpid() & &p = Process(target=run_proc, args=('test',)) & &print 'Process will start.' & &p.start() & &print 'Process end.'创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动。注意:由于进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销。''''''进程锁示例from multiprocessing import Process, Array, RLockdef Foo(lock,temp,i): & &""" & &将第0个数加100 & &""" & &lock.acquire() & &temp[0] = 100+i & &for item in temp: & & & &print i,'-----&',item & &lock.release()lock = RLock()temp = Array('i', [11, 22, 33, 44])for i in range(20): & &p = Process(target=Foo,args=(lock,temp,i,)) & &p.start()''''''进程池示例 & &在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。 & &Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。#!/usr/bin/env python#coding:utf-8from multiprocessing import Poolimport os, time, randomdef long_time_task(name): & &print 'Run task %s (%s)...' % (name, os.getpid()) & &start = time.time() & &time.sleep(random.random() * 3) & &end = time.time() & &print 'Task %s runs %0.2f seconds.' % (name, (end - start))if __name__=='__main__': & &print 'Parent process %s.' % os.getpid() & &p = Pool(4) & &for i in range(5): & & & &p.apply_async(long_time_task, args=(i,)) & &print 'Waiting for all subprocesses done...' & &p.close() & &p.join() & &print 'All subprocesses done.'join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。 & &task 0,1,2,3是立刻执行的,而task 4要等待前面某个task完成后才执行,这是因为Pool的默认大小在我的电脑上是4,因此,最多同时执行4个进程。''''''进程间共享数据#!/usr/bin/env python# -*- coding:utf-8 -*-from multiprocessing import Process, Queueimport os, time, random# 写数据进程执行的代码:def write(q): & &for value in ['A', 'B', 'C']: & & & &print 'Put %s to queue...' % value & & & &q.put(value) & & & &time.sleep(random.random())# 读数据进程执行的代码:def read(q): & &while True: & & & &value = q.get(True) & & & &print 'Get %s from queue.' % valueif __name__=='__main__': & &# 父进程创建Queue,并传给各个子进程: & &q = Queue() & &pw = Process(target=write, args=(q,)) & &pr = Process(target=read, args=(q,)) & &# 启动子进程pw,写入: & &pw.start() & &# 启动子进程pr,读取: & &pr.start() & &# 等待pw结束: & &pw.join() & &# pr进程里是死循环,无法等待其结束,只能强行终止: & &pr.terminate()进程间默认无法共享数据''''''Python的线程 & &多任务可以由多进程完成,也可以由一个进程内的多线程完成。进程是由若干线程组成的,一个进程至少有一个线程。 & &Python的标准库提供了两个模块:thread和threading,thread是低级模块,threading是高级模块,对thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()开始执行''''''python的多线程模块:threading & &Thread & & & & & & & & &#线程执行的对象 & & & &start & & & & & & & 线程准备就绪,等待CPU调度 & & & &setName & & & & & & 为线程设置名称 & & & &getName & & & & & & 获取线程名称 & & & &setDaemon & & & & & 设置为后台线程或前台线程(默认) & & & & & & & & & & & & & &如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不 & & & & & & & & & & & & & &论成功与否,均停止如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行 & & & & & & & & & & & & & &完毕后,等待前台线程也执行完成后,程序停止 & & & &join & & & & & & & &逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义 & & & &run & & & & & & & & 线程被cpu调度后执行Thread类对象的run方法 & &Rlock & & & & & & & & & #线程锁:可重入锁对象.使单线程可以在此获得已获得了的锁(递归锁定) & & & &acquire & & & & & & 为线程加锁 & & & &release & & & & & & 为线程解锁 & &Event & & & & & & & & & #python线程的事件用于主线程控制其他线程的执行。 & & & &set & & & & & & & & 将全局变量设置为True & & & &wait & & & & & & & &事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 & & & & & & & & & & & & & &event.wait方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞 & & & &clear & & & & & & & 将全局变量设置为False & &Semaphore & & & & & & & 为等待锁的线程提供一个类似等候室的结构 & &BoundedSemaphore & & & &与Semaphore类似,只是不允许超过初始值 & &Time & & & & & & & & & &与Thread相似,只是他要等待一段时间后才开始运行 & &activeCount() & & & & & 当前活动的线程对象的数量 & &currentThread() & & & & 返回当前线程对象 & &enumerate() & & & & & & 返回当前活动线程的列表 & &settrace(func) & & & & &为所有线程设置一个跟踪函数 & &setprofile(func) & & & &为所有线程设置一个profile函数''''''线程示例#!/usr/bin/env python#coding:utf-8import threadingimport timedef show(arg): & &time.sleep(1) & &print 'thread'+str(arg)for i in range(10): & &t = threading.Thread(target=show, args=(i,)) & &t.start()print 'main thread stop'''''''线程锁示例 & &多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。#!/usr/bin/env python#coding:utf-8import threadingimport timegl_num = 0def show(arg): & &global gl_num & &time.sleep(1) & &gl_num +=1 & &print gl_numfor i in range(10): & &t = threading.Thread(target=show, args=(i,)) & &t.start()print 'main thread stop'由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,CPU接着执行其他线程如果按上例的话会出现一种情况多个线程同时修改一份内存资源,造成数据的修改混乱那么线程锁可以解决这个问题#!/usr/bin/env python#coding:utf-8import threadingimport timegl_num = 0lock=threading.RLock()def show(arg): & &lock.acquire() & &global gl_num & &time.sleep(1) & &gl_num +=1 & &print gl_num & &lock.release()for i in range(10): & &t = threading.Thread(target=show, args=(i,)) & &t.start()print 'main thread stop' & &因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。 & &GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。 & &不过,也不用过于担心,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。 & &多线程编程,模型复杂,容易发生冲突,必须用锁加以隔离,同时,又要小心死锁的发生。 & &Python解释器由于设计时有GIL全局锁,导致了多线程无法利用多核。多线程的并发在Python中就是一个美丽的梦。''''''线程的事件示例#!/usr/bin/env python# -*- coding:utf-8 -*-import threadingdef do(event): & &print 'start' & &event.wait() & &print 'execute'event_obj = threading.Event()for i in range(10): & &t = threading.Thread(target=do, args=(event_obj,)) & &t.start()event_obj.clear()inp = raw_input('input:')if inp == 'true': & &event_obj.set()''''''协程简介 & &线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。 & &协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。 & &协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;''''''协程示例#!/usr/bin/env python# -*- coding:utf-8 -*-from greenlet import greenletdef test1(): & &print 12 & &gr2.switch() & &print 34 & &gr2.switch()def test2(): & &print 56 & &gr1.switch() & &print 78gr1 = greenlet(test1)gr2 = greenlet(test2)gr1.switch()''''''进程vs线程 & &我们可以把任务分为计算密集型和IO密集型。 & &计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。 & &计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。用Python的话适合多进程第二种任务的类型是IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。这时候不需要cpu做过多的计算,应当用多线程。'''
了这篇文章
类别:未分类┆阅读(0)┆评论(0)您好,欢迎光临崔庆才的博客,欢迎来稿,质量优秀我会主动与您联系并升级为专栏作者,让您的文章被更多人看到,谢谢。
> Python爬虫进阶六之多进程的用法
在上一节中介绍了thread多线程库。python中的多线程其实并不是真正的多线程,并不能做到充分利用多核CPU资源。
如果想要充分利用,在python中大部分情况需要使用多进程,那么这个包就叫做 multiprocessing。
借助它,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
那么本节要介绍的内容有:
在multiprocessing中,每一个进程都用一个Process类来表示。首先看下它的API
Process([group [, target [, name [, args [, kwargs]]]]])
Process([group [, target [, name [, args [, kwargs]]]]])
target表示调用对象,你可以传入方法的名字
args表示被调用对象的位置参数元组,比如target是函数a,他有两个参数m,n,那么args就传入(m, n)即可
kwargs表示调用对象的字典
name是别名,相当于给这个进程取一个名字
group分组,实际上不使用
我们先用一个实例来感受一下:
import multiprocessing
def process(num):
print 'Process:', num
if __name__ == '__main__':
for i in range(5):
p = multiprocessing.Process(target=process, args=(i,))
import multiprocessing&def process(num):&&&&print 'Process:', num&if __name__ == '__main__':&&&&for i in range(5):&&&&&&&&p = multiprocessing.Process(target=process, args=(i,))&&&&&&&&p.start()
最简单的创建Process的过程如上所示,target传入函数名,args是函数的参数,是元组的形式,如果只有一个参数,那就是长度为1的元组。
然后调用start()方法即可启动多个进程了。
另外你还可以通过 cpu_count() 方法还有 active_children() 方法获取当前机器的 CPU 核心数量以及得到目前所有的运行的进程。
通过一个实例来感受一下:
import multiprocessing
import time
def process(num):
time.sleep(num)
print 'Process:', num
if __name__ == '__main__':
for i in range(5):
p = multiprocessing.Process(target=process, args=(i,))
print('CPU number:' + str(multiprocessing.cpu_count()))
for p in multiprocessing.active_children():
print('Child process name: ' + p.name + ' id: ' + str(p.pid))
print('Process Ended')
1234567891011121314151617
import multiprocessingimport time&def process(num):&&&&time.sleep(num)&&&&print 'Process:', num&if __name__ == '__main__':&&&&for i in range(5):&&&&&&&&p = multiprocessing.Process(target=process, args=(i,))&&&&&&&&p.start()&&&&&print('CPU number:' + str(multiprocessing.cpu_count()))&&&&for p in multiprocessing.active_children():&&&&&&&&print('Child process name: ' + p.name + ' id: ' + str(p.pid))&&&&&print('Process Ended')
运行结果:
Process: 0
CPU number:8
Child process name: Process-2 id: 9641
Child process name: Process-4 id: 9643
Child process name: Process-5 id: 9644
Child process name: Process-3 id: 9642
Process Ended
Process: 1
Process: 2
Process: 3
Process: 4
1234567891011
Process: 0CPU number:8Child process name: Process-2 id: 9641Child process name: Process-4 id: 9643Child process name: Process-5 id: 9644Child process name: Process-3 id: 9642Process EndedProcess: 1Process: 2Process: 3Process: 4
另外你还可以继承Process类,自定义进程类,实现run方法即可。
用一个实例来感受一下:
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, loop):
Process.__init__(self)
self.loop = loop
def run(self):
for count in range(self.loop):
time.sleep(1)
print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))
if __name__ == '__main__':
for i in range(2, 5):
p = MyProcess(i)
12345678910111213141516171819
from multiprocessing import Processimport time&&class MyProcess(Process):&&&&def __init__(self, loop):&&&&&&&&Process.__init__(self)&&&&&&&&self.loop = loop&&&&&def run(self):&&&&&&&&for count in range(self.loop):&&&&&&&&&&&&time.sleep(1)&&&&&&&&&&&&print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))&&if __name__ == '__main__':&&&&for i in range(2, 5):&&&&&&&&p = MyProcess(i)&&&&&&&&p.start()
在上面的例子中,我们继承了 Process 这个类,然后实现了run方法。打印出来了进程号和参数。
运行结果:
Pid: 28116 LoopCount: 0
Pid: 28117 LoopCount: 0
Pid: 28118 LoopCount: 0
Pid: 28116 LoopCount: 1
Pid: 28117 LoopCount: 1
Pid: 28118 LoopCount: 1
Pid: 28117 LoopCount: 2
Pid: 28118 LoopCount: 2
Pid: 28118 LoopCount: 3
Pid: 28116 LoopCount: 0Pid: 28117 LoopCount: 0Pid: 28118 LoopCount: 0Pid: 28116 LoopCount: 1Pid: 28117 LoopCount: 1Pid: 28118 LoopCount: 1Pid: 28117 LoopCount: 2Pid: 28118 LoopCount: 2Pid: 28118 LoopCount: 3
可以看到,三个进程分别打印出了2、3、4条结果。
我们可以把一些方法独立的写在每个类里封装好,等用的时候直接初始化一个类运行即可。
在这里介绍一个属性,叫做deamon。每个线程都可以单独设置它的属性,如果设置为True,当父进程结束后,子进程会自动被终止。
用一个实例来感受一下,还是原来的例子,增加了deamon属性:
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, loop):
Process.__init__(self)
self.loop = loop
def run(self):
for count in range(self.loop):
time.sleep(1)
print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))
if __name__ == '__main__':
for i in range(2, 5):
p = MyProcess(i)
p.daemon = True
print 'Main process Ended!'
1234567891011121314151617181920212223
from multiprocessing import Processimport time&&class MyProcess(Process):&&&&def __init__(self, loop):&&&&&&&&Process.__init__(self)&&&&&&&&self.loop = loop&&&&&def run(self):&&&&&&&&for count in range(self.loop):&&&&&&&&&&&&time.sleep(1)&&&&&&&&&&&&print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))&&if __name__ == '__main__':&&&&for i in range(2, 5):&&&&&&&&p = MyProcess(i)&&&&&&&&p.daemon = True&&&&&&&&p.start()&&&&&&print 'Main process Ended!'
在这里,调用的时候增加了设置deamon,最后的主进程(即父进程)打印输出了一句话。
运行结果:
Main process Ended!
Main process Ended!
结果很简单,因为主进程没有做任何事情,直接输出一句话结束,所以在这时也直接终止了子进程的运行。
这样可以有效防止无控制地生成子进程。如果这样写了,你在关闭这个主程序运行时,就无需额外担心子进程有没有被关闭了。
不过这样并不是我们想要达到的效果呀,能不能让所有子进程都执行完了然后再结束呢?那当然是可以的,只需要加入join()方法即可。
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, loop):
Process.__init__(self)
self.loop = loop
def run(self):
for count in range(self.loop):
time.sleep(1)
print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))
if __name__ == '__main__':
for i in range(2, 5):
p = MyProcess(i)
p.daemon = True
print 'Main process Ended!'
123456789101112131415161718192021222324
from multiprocessing import Processimport time&&class MyProcess(Process):&&&&def __init__(self, loop):&&&&&&&&Process.__init__(self)&&&&&&&&self.loop = loop&&&&&def run(self):&&&&&&&&for count in range(self.loop):&&&&&&&&&&&&time.sleep(1)&&&&&&&&&&&&print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))&&if __name__ == '__main__':&&&&for i in range(2, 5):&&&&&&&&p = MyProcess(i)&&&&&&&&p.daemon = True&&&&&&&&p.start()&&&&&&&&p.join()&&&&&&print 'Main process Ended!'
在这里,每个子进程都调用了join()方法,这样父进程(主进程)就会等待子进程执行完毕。
运行结果:
Pid: 29902 LoopCount: 0
Pid: 29902 LoopCount: 1
Pid: 29905 LoopCount: 0
Pid: 29905 LoopCount: 1
Pid: 29905 LoopCount: 2
Pid: 29912 LoopCount: 0
Pid: 29912 LoopCount: 1
Pid: 29912 LoopCount: 2
Pid: 29912 LoopCount: 3
Main process Ended!
12345678910
Pid: 29902 LoopCount: 0Pid: 29902 LoopCount: 1Pid: 29905 LoopCount: 0Pid: 29905 LoopCount: 1Pid: 29905 LoopCount: 2Pid: 29912 LoopCount: 0Pid: 29912 LoopCount: 1Pid: 29912 LoopCount: 2Pid: 29912 LoopCount: 3Main process Ended!
发现所有子进程都执行完毕之后,父进程最后打印出了结束的结果。
在上面的一些小实例中,你可能会遇到如下的运行结果:
什么问题?有的输出错位了。这是由于并行导致的,两个进程同时进行了输出,结果第一个进程的换行没有来得及输出,第二个进程就输出了结果。所以导致这种排版的问题。
那这归根结底是因为线程同时资源(输出操作)而导致的。
那怎么来避免这种问题?那自然是在某一时间,只能一个进程输出,其他进程等待。等刚才那个进程输出完毕之后,另一个进程再进行输出。这种现象就叫做“互斥”。
我们可以通过 Lock 来实现,在一个进程输出时,加锁,其他进程等待。等此进程执行结束后,释放锁,其他进程可以进行输出。
我们现用一个实例来感受一下:
from multiprocessing import Process, Lock
import time
class MyProcess(Process):
def __init__(self, loop, lock):
Process.__init__(self)
self.loop = loop
self.lock = lock
def run(self):
for count in range(self.loop):
time.sleep(0.1)
#self.lock.acquire()
print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))
#self.lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(10, 15):
p = MyProcess(i, lock)
12345678910111213141516171819202122
from multiprocessing import Process, Lockimport time&&class MyProcess(Process):&&&&def __init__(self, loop, lock):&&&&&&&&Process.__init__(self)&&&&&&&&self.loop = loop&&&&&&&&self.lock = lock&&&&&def run(self):&&&&&&&&for count in range(self.loop):&&&&&&&&&&&&time.sleep(0.1)&&&&&&&&&&&&#self.lock.acquire()&&&&&&&&&&&&print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))&&&&&&&&&&&&#self.lock.release()&if __name__ == '__main__':&&&&lock = Lock()&&&&for i in range(10, 15):&&&&&&&&p = MyProcess(i, lock)&&&&&&&&p.start()
首先看一下不加锁的输出结果:
Pid: 45755 LoopCount: 0
Pid: 45756 LoopCount: 0
Pid: 45757 LoopCount: 0
Pid: 45758 LoopCount: 0
Pid: 45759 LoopCount: 0
Pid: 45755 LoopCount: 1
Pid: 45756 LoopCount: 1
Pid: 45757 LoopCount: 1
Pid: 45758 LoopCount: 1
Pid: 45759 LoopCount: 1
Pid: 45755 LoopCount: 2Pid: 45756 LoopCount: 2
Pid: 45757 LoopCount: 2
Pid: 45758 LoopCount: 2
Pid: 45759 LoopCount: 2
Pid: 45756 LoopCount: 3
Pid: 45755 LoopCount: 3
Pid: 45757 LoopCount: 3
Pid: 45758 LoopCount: 3
Pid: 45759 LoopCount: 3
Pid: 45755 LoopCount: 4
Pid: 45756 LoopCount: 4
Pid: 45757 LoopCount: 4
Pid: 45759 LoopCount: 4
Pid: 45758 LoopCount: 4
Pid: 45756 LoopCount: 5
Pid: 45755 LoopCount: 5
Pid: 45757 LoopCount: 5
Pid: 45759 LoopCount: 5
Pid: 45758 LoopCount: 5
Pid: 45756 LoopCount: 6Pid: 45755 LoopCount: 6
Pid: 45757 LoopCount: 6
Pid: 45759 LoopCount: 6
Pid: 45758 LoopCount: 6
Pid: 45755 LoopCount: 7Pid: 45756 LoopCount: 7
Pid: 45757 LoopCount: 7
Pid: 45758 LoopCount: 7
Pid: 45759 LoopCount: 7
Pid: 45756 LoopCount: 8Pid: 45755 LoopCount: 8
Pid: 45757 LoopCount: 8
Pid: 45758 LoopCount: 8Pid: 45759 LoopCount: 8
Pid: 45755 LoopCount: 9
Pid: 45756 LoopCount: 9
Pid: 45757 LoopCount: 9
Pid: 45758 LoopCount: 9
Pid: 45759 LoopCount: 9
Pid: 45756 LoopCount: 10
Pid: 45757 LoopCount: 10
Pid: 45758 LoopCount: 10
Pid: 45759 LoopCount: 10
Pid: 45757 LoopCount: 11
Pid: 45758 LoopCount: 11
Pid: 45759 LoopCount: 11
Pid: 45758 LoopCount: 12
Pid: 45759 LoopCount: 12
Pid: 45759 LoopCount: 13
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
Pid: 45755 LoopCount: 0Pid: 45756 LoopCount: 0Pid: 45757 LoopCount: 0Pid: 45758 LoopCount: 0Pid: 45759 LoopCount: 0Pid: 45755 LoopCount: 1Pid: 45756 LoopCount: 1Pid: 45757 LoopCount: 1Pid: 45758 LoopCount: 1Pid: 45759 LoopCount: 1Pid: 45755 LoopCount: 2Pid: 45756 LoopCount: 2&Pid: 45757 LoopCount: 2Pid: 45758 LoopCount: 2Pid: 45759 LoopCount: 2Pid: 45756 LoopCount: 3Pid: 45755 LoopCount: 3Pid: 45757 LoopCount: 3Pid: 45758 LoopCount: 3Pid: 45759 LoopCount: 3Pid: 45755 LoopCount: 4Pid: 45756 LoopCount: 4Pid: 45757 LoopCount: 4Pid: 45759 LoopCount: 4Pid: 45758 LoopCount: 4Pid: 45756 LoopCount: 5Pid: 45755 LoopCount: 5Pid: 45757 LoopCount: 5Pid: 45759 LoopCount: 5Pid: 45758 LoopCount: 5Pid: 45756 LoopCount: 6Pid: 45755 LoopCount: 6&Pid: 45757 LoopCount: 6Pid: 45759 LoopCount: 6Pid: 45758 LoopCount: 6Pid: 45755 LoopCount: 7Pid: 45756 LoopCount: 7&Pid: 45757 LoopCount: 7Pid: 45758 LoopCount: 7Pid: 45759 LoopCount: 7Pid: 45756 LoopCount: 8Pid: 45755 LoopCount: 8&Pid: 45757 LoopCount: 8Pid: 45758 LoopCount: 8Pid: 45759 LoopCount: 8&Pid: 45755 LoopCount: 9Pid: 45756 LoopCount: 9Pid: 45757 LoopCount: 9Pid: 45758 LoopCount: 9Pid: 45759 LoopCount: 9Pid: 45756 LoopCount: 10Pid: 45757 LoopCount: 10Pid: 45758 LoopCount: 10Pid: 45759 LoopCount: 10Pid: 45757 LoopCount: 11Pid: 45758 LoopCount: 11Pid: 45759 LoopCount: 11Pid: 45758 LoopCount: 12Pid: 45759 LoopCount: 12Pid: 45759 LoopCount: 13
可以看到有些输出已经造成了影响。
然后我们对其加锁:
from multiprocessing import Process, Lock
import time
class MyProcess(Process):
def __init__(self, loop, lock):
Process.__init__(self)
self.loop = loop
self.lock = lock
def run(self):
for count in range(self.loop):
time.sleep(0.1)
self.lock.acquire()
print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))
self.lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(10, 15):
p = MyProcess(i, lock)
12345678910111213141516171819202122
from multiprocessing import Process, Lockimport time&&class MyProcess(Process):&&&&def __init__(self, loop, lock):&&&&&&&&Process.__init__(self)&&&&&&&&self.loop = loop&&&&&&&&self.lock = lock&&&&&def run(self):&&&&&&&&for count in range(self.loop):&&&&&&&&&&&&time.sleep(0.1)&&&&&&&&&&&&self.lock.acquire()&&&&&&&&&&&&print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))&&&&&&&&&&&&self.lock.release()&if __name__ == '__main__':&&&&lock = Lock()&&&&for i in range(10, 15):&&&&&&&&p = MyProcess(i, lock)&&&&&&&&p.start()
我们在print方法的前后分别添加了获得锁和释放锁的操作。这样就能保证在同一时间只有一个print操作。
看一下运行结果:
Pid: 45889 LoopCount: 0
Pid: 45890 LoopCount: 0
Pid: 45891 LoopCount: 0
Pid: 45892 LoopCount: 0
Pid: 45893 LoopCount: 0
Pid: 45889 LoopCount: 1
Pid: 45890 LoopCount: 1
Pid: 45891 LoopCount: 1
Pid: 45892 LoopCount: 1
Pid: 45893 LoopCount: 1
Pid: 45889 LoopCount: 2
Pid: 45890 LoopCount: 2
Pid: 45891 LoopCount: 2
Pid: 45892 LoopCount: 2
Pid: 45893 LoopCount: 2
Pid: 45889 LoopCount: 3
Pid: 45890 LoopCount: 3
Pid: 45891 LoopCount: 3
Pid: 45892 LoopCount: 3
Pid: 45893 LoopCount: 3
Pid: 45889 LoopCount: 4
Pid: 45890 LoopCount: 4
Pid: 45891 LoopCount: 4
Pid: 45892 LoopCount: 4
Pid: 45893 LoopCount: 4
Pid: 45889 LoopCount: 5
Pid: 45890 LoopCount: 5
Pid: 45891 LoopCount: 5
Pid: 45892 LoopCount: 5
Pid: 45893 LoopCount: 5
Pid: 45889 LoopCount: 6
Pid: 45890 LoopCount: 6
Pid: 45891 LoopCount: 6
Pid: 45893 LoopCount: 6
Pid: 45892 LoopCount: 6
Pid: 45889 LoopCount: 7
Pid: 45890 LoopCount: 7
Pid: 45891 LoopCount: 7
Pid: 45892 LoopCount: 7
Pid: 45893 LoopCount: 7
Pid: 45889 LoopCount: 8
Pid: 45890 LoopCount: 8
Pid: 45891 LoopCount: 8
Pid: 45892 LoopCount: 8
Pid: 45893 LoopCount: 8
Pid: 45889 LoopCount: 9
Pid: 45890 LoopCount: 9
Pid: 45891 LoopCount: 9
Pid: 45892 LoopCount: 9
Pid: 45893 LoopCount: 9
Pid: 45890 LoopCount: 10
Pid: 45891 LoopCount: 10
Pid: 45892 LoopCount: 10
Pid: 45893 LoopCount: 10
Pid: 45891 LoopCount: 11
Pid: 45892 LoopCount: 11
Pid: 45893 LoopCount: 11
Pid: 45893 LoopCount: 12
Pid: 45892 LoopCount: 12
Pid: 45893 LoopCount: 13
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
Pid: 45889 LoopCount: 0Pid: 45890 LoopCount: 0Pid: 45891 LoopCount: 0Pid: 45892 LoopCount: 0Pid: 45893 LoopCount: 0Pid: 45889 LoopCount: 1Pid: 45890 LoopCount: 1Pid: 45891 LoopCount: 1Pid: 45892 LoopCount: 1Pid: 45893 LoopCount: 1Pid: 45889 LoopCount: 2Pid: 45890 LoopCount: 2Pid: 45891 LoopCount: 2Pid: 45892 LoopCount: 2Pid: 45893 LoopCount: 2Pid: 45889 LoopCount: 3Pid: 45890 LoopCount: 3Pid: 45891 LoopCount: 3Pid: 45892 LoopCount: 3Pid: 45893 LoopCount: 3Pid: 45889 LoopCount: 4Pid: 45890 LoopCount: 4Pid: 45891 LoopCount: 4Pid: 45892 LoopCount: 4Pid: 45893 LoopCount: 4Pid: 45889 LoopCount: 5Pid: 45890 LoopCount: 5Pid: 45891 LoopCount: 5Pid: 45892 LoopCount: 5Pid: 45893 LoopCount: 5Pid: 45889 LoopCount: 6Pid: 45890 LoopCount: 6Pid: 45891 LoopCount: 6Pid: 45893 LoopCount: 6Pid: 45892 LoopCount: 6Pid: 45889 LoopCount: 7Pid: 45890 LoopCount: 7Pid: 45891 LoopCount: 7Pid: 45892 LoopCount: 7Pid: 45893 LoopCount: 7Pid: 45889 LoopCount: 8Pid: 45890 LoopCount: 8Pid: 45891 LoopCount: 8Pid: 45892 LoopCount: 8Pid: 45893 LoopCount: 8Pid: 45889 LoopCount: 9Pid: 45890 LoopCount: 9Pid: 45891 LoopCount: 9Pid: 45892 LoopCount: 9Pid: 45893 LoopCount: 9Pid: 45890 LoopCount: 10Pid: 45891 LoopCount: 10Pid: 45892 LoopCount: 10Pid: 45893 LoopCount: 10Pid: 45891 LoopCount: 11Pid: 45892 LoopCount: 11Pid: 45893 LoopCount: 11Pid: 45893 LoopCount: 12Pid: 45892 LoopCount: 12Pid: 45893 LoopCount: 13
嗯,一切都没问题了。
所以在访问临界资源时,使用Lock就可以避免进程同时占用资源而导致的一些问题。
信号量,是在进程同步过程中一个比较重要的角色。可以控制临界资源的数量,保证各个进程之间的互斥和同步。
如果你学过操作系统,那么一定对这方面非常了解,如果你还不了解信号量是什么,可以参考
来了解一下它是做什么的。
那么接下来我们就用一个实例来演示一下进程之间利用Semaphore做到同步和互斥,以及控制临界资源数量。
from multiprocessing import Process, Semaphore, Lock, Queue
import time
buffer = Queue(10)
empty = Semaphore(2)
full = Semaphore(0)
lock = Lock()
class Consumer(Process):
def run(self):
global buffer, empty, full, lock
while True:
full.acquire()
lock.acquire()
buffer.get()
print('Consumer pop an element')
time.sleep(1)
lock.release()
empty.release()
class Producer(Process):
def run(self):
global buffer, empty, full, lock
while True:
empty.acquire()
lock.acquire()
buffer.put(1)
print('Producer append an element')
time.sleep(1)
lock.release()
full.release()
if __name__ == '__main__':
p = Producer()
c = Consumer()
p.daemon = c.daemon = True
print 'Ended!'
1234567891011121314151617181920212223242526272829303132333435363738394041424344
from multiprocessing import Process, Semaphore, Lock, Queueimport time&buffer = Queue(10)empty = Semaphore(2)full = Semaphore(0)lock = Lock()&class Consumer(Process):&&&&&def run(self):&&&&&&&&global buffer, empty, full, lock&&&&&&&&while True:&&&&&&&&&&&&full.acquire()&&&&&&&&&&&&lock.acquire()&&&&&&&&&&&&buffer.get()&&&&&&&&&&&&print('Consumer pop an element')&&&&&&&&&&&&time.sleep(1)&&&&&&&&&&&&lock.release()&&&&&&&&&&&&empty.release()&&class Producer(Process):&&&&def run(self):&&&&&&&&global buffer, empty, full, lock&&&&&&&&while True:&&&&&&&&&&&&empty.acquire()&&&&&&&&&&&&lock.acquire()&&&&&&&&&&&&buffer.put(1)&&&&&&&&&&&&print('Producer append an element')&&&&&&&&&&&&time.sleep(1)&&&&&&&&&&&&lock.release()&&&&&&&&&&&&full.release()&&if __name__ == '__main__':&&&&p = Producer()&&&&c = Consumer()&&&&p.daemon = c.daemon = True&&&&p.start()&&&&c.start()&&&&p.join()&&&&c.join()&&&&print 'Ended!'
如上代码实现了注明的生产者和消费者问题,定义了两个进程类,一个是消费者,一个是生产者。
定义了一个共享队列,利用了Queue数据结构,然后定义了两个信号量,一个代表缓冲区空余数,一个表示缓冲区占用数。
生产者Producer使用empty.acquire()方法来占用一个缓冲区位置,然后缓冲区空闲区大小减小1,接下来进行加锁,对缓冲区进行操作。然后释放锁,然后让代表占用的缓冲区位置数量+1,消费者则相反。
运行结果如下:
Producer append an element
Producer append an element
Consumer pop an element
Consumer pop an element
Producer append an element
Producer append an element
Consumer pop an element
Consumer pop an element
Producer append an element
Producer append an element
Consumer pop an element
Consumer pop an element
Producer append an element
Producer append an element
1234567891011121314
Producer append an elementProducer append an elementConsumer pop an elementConsumer pop an elementProducer append an elementProducer append an elementConsumer pop an elementConsumer pop an elementProducer append an elementProducer append an elementConsumer pop an elementConsumer pop an elementProducer append an elementProducer append an element
可以发现两个进程在交替运行,生产者先放入缓冲区物品,然后消费者取出,不停地进行循环。
通过上面的例子来体会一下信号量的用法。
在上面的例子中我们使用了Queue,可以作为进程通信的共享队列使用。
在上面的程序中,如果你把Queue换成普通的list,是完全起不到效果的。即使在一个进程中改变了这个list,在另一个进程也不能获取到它的状态。
因此进程间的通信,队列需要用Queue。当然这里的队列指的是 multiprocessing.Queue
依然是用上面那个例子,我们一个进程向队列中放入数据,然后另一个进程取出数据。
from multiprocessing import Process, Semaphore, Lock, Queue
import time
from random import random
buffer = Queue(10)
empty = Semaphore(2)
full = Semaphore(0)
lock = Lock()
class Consumer(Process):
def run(self):
global buffer, empty, full, lock
while True:
full.acquire()
lock.acquire()
print 'Consumer get', buffer.get()
time.sleep(1)
lock.release()
empty.release()
class Producer(Process):
def run(self):
global buffer, empty, full, lock
while True:
empty.acquire()
lock.acquire()
num = random()
print 'Producer put ', num
buffer.put(num)
time.sleep(1)
lock.release()
full.release()
if __name__ == '__main__':
p = Producer()
c = Consumer()
p.daemon = c.daemon = True
print 'Ended!'
123456789101112131415161718192021222324252627282930313233343536373839404142434445
from multiprocessing import Process, Semaphore, Lock, Queueimport timefrom random import random&buffer = Queue(10)empty = Semaphore(2)full = Semaphore(0)lock = Lock()&class Consumer(Process):&&&&&def run(self):&&&&&&&&global buffer, empty, full, lock&&&&&&&&while True:&&&&&&&&&&&&full.acquire()&&&&&&&&&&&&lock.acquire()&&&&&&&&&&&&print 'Consumer get', buffer.get()&&&&&&&&&&&&time.sleep(1)&&&&&&&&&&&&lock.release()&&&&&&&&&&&&empty.release()&&class Producer(Process):&&&&def run(self):&&&&&&&&global buffer, empty, full, lock&&&&&&&&while True:&&&&&&&&&&&&empty.acquire()&&&&&&&&&&&&lock.acquire()&&&&&&&&&&&&num = random()&&&&&&&&&&&&print 'Producer put ', num&&&&&&&&&&&&buffer.put(num)&&&&&&&&&&&&time.sleep(1)&&&&&&&&&&&&lock.release()&&&&&&&&&&&&full.release()&&if __name__ == '__main__':&&&&p = Producer()&&&&c = Consumer()&&&&p.daemon = c.daemon = True&&&&p.start()&&&&c.start()&&&&p.join()&&&&c.join()&&&&print 'Ended!'
运行结果:
Producer put
Producer put
Consumer get 0.
Consumer get 0.
Producer put
Producer put
Consumer get 0.
Consumer get 0.
Producer put&&0.Producer put&&0.Consumer get 0.Consumer get 0.Producer put&&0.Producer put&&0.Consumer get 0.Consumer get 0.
可以看到生产者放入队列中数据,然后消费者将数据取出来。
get方法有两个参数,blocked和timeout,意思为阻塞和超时时间。默认blocked是true,即阻塞式。
当一个队列为空的时候如果再用get取则会阻塞,所以这时候就需要吧blocked设置为false,即非阻塞式,实际上它就会调用get_nowait()方法,此时还需要设置一个超时时间,在这么长的时间内还没有取到队列元素,那就抛出Queue.Empty异常。
当一个队列为满的时候如果再用put放则会阻塞,所以这时候就需要吧blocked设置为false,即非阻塞式,实际上它就会调用put_nowait()方法,此时还需要设置一个超时时间,在这么长的时间内还没有放进去元素,那就抛出Queue.Full异常。
另外队列中常用的方法
Queue.qsize() 返回队列的大小 ,不过在 Mac OS 上没法运行。
def qsize(self):
# Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
return self._maxsize – self._sem._semlock._get_value()
Queue.empty() 如果队列为空,返回True, 反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.get([block[, timeout]]) 获取队列,timeout等待时间
Queue.get_nowait() 相当Queue.get(False)
Queue.put(item) 阻塞式写入队列,timeout等待时间
Queue.put_nowait(item) 相当Queue.put(item, False)
管道,顾名思义,一端发一端收。
Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。一个进程从PIPE一端输入对象,然后被PIPE另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。
用一个实例来感受一下:
from multiprocessing import Process, Pipe
class Consumer(Process):
def __init__(self, pipe):
Process.__init__(self)
self.pipe = pipe
def run(self):
self.pipe.send('Consumer Words')
print 'Consumer Received:', self.pipe.recv()
class Producer(Process):
def __init__(self, pipe):
Process.__init__(self)
self.pipe = pipe
def run(self):
print 'Producer Received:', self.pipe.recv()
self.pipe.send('Producer Words')
if __name__ == '__main__':
pipe = Pipe()
p = Producer(pipe[0])
c = Consumer(pipe[1])
p.daemon = c.daemon = True
print 'Ended!'
123456789101112131415161718192021222324252627282930313233
from multiprocessing import Process, Pipe&&class Consumer(Process):&&&&def __init__(self, pipe):&&&&&&&&Process.__init__(self)&&&&&&&&self.pipe = pipe&&&&&def run(self):&&&&&&&&self.pipe.send('Consumer Words')&&&&&&&&print 'Consumer Received:', self.pipe.recv()&&class Producer(Process):&&&&def __init__(self, pipe):&&&&&&&&Process.__init__(self)&&&&&&&&self.pipe = pipe&&&&&def run(self):&&&&&&&&print 'Producer Received:', self.pipe.recv()&&&&&&&&self.pipe.send('Producer Words')&&if __name__ == '__main__':&&&&pipe = Pipe()&&&&p = Producer(pipe[0])&&&&c = Consumer(pipe[1])&&&&p.daemon = c.daemon = True&&&&p.start()&&&&c.start()&&&&p.join()&&&&c.join()&&&&print 'Ended!'
在这里声明了一个默认为双向的管道,然后将管道的两端分别传给两个进程。两个进程互相收发。观察一下结果:
Producer Received: Consumer Words
Consumer Received: Producer Words
Producer Received: Consumer WordsConsumer Received: Producer WordsEnded!
以上是对pipe的简单介绍。
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
在这里需要了解阻塞和非阻塞的概念。
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态。
阻塞即要等到回调结果出来,在有结果之前,当前进程会被挂起。
Pool的用法有阻塞和非阻塞两种方式。非阻塞即为添加进程后,不一定非要等到改进程执行完就添加其他进程运行,阻塞则相反。
现用一个实例感受一下非阻塞的用法:
from multiprocessing import Lock, Pool
import time
def function(index):
print 'Start process: ', index
time.sleep(3)
print 'End process', index
if __name__ == '__main__':
pool = Pool(processes=3)
for i in xrange(4):
pool.apply_async(function, (i,))
print "Started processes"
pool.close()
pool.join()
print "Subprocess done."
12345678910111213141516171819
from multiprocessing import Lock, Poolimport time&&def function(index):&&&&print 'Start process: ', index&&&&time.sleep(3)&&&&print 'End process', index&&if __name__ == '__main__':&&&&pool = Pool(processes=3)&&&&for i in xrange(4):&&&&&&&&pool.apply_async(function, (i,))&&&&&print "Started processes"&&&&pool.close()&&&&pool.join()&&&&print "Subprocess done."
在这里利用了apply_async方法,即非阻塞。
运行结果:
Started processes
Start process: Start process:
Start process:
End processEnd process 0
Start process:
End process 2
End process 3
Subprocess done.
12345678910
Started processesStart process: Start process:&&0 1Start process:&&2End processEnd process 0 1Start process:&&3End process 2End process 3Subprocess done.
可以发现在这里添加三个进程进去后,立马就开始执行,不用非要等到某个进程结束后再添加新的进程进去。
下面再看看阻塞的用法:
from multiprocessing import Lock, Pool
import time
def function(index):
print 'Start process: ', index
time.sleep(3)
print 'End process', index
if __name__ == '__main__':
pool = Pool(processes=3)
for i in xrange(4):
pool.apply(function, (i,))
print "Started processes"
pool.close()
pool.join()
print "Subprocess done."
12345678910111213141516171819
from multiprocessing import Lock, Poolimport time&&def function(index):&&&&print 'Start process: ', index&&&&time.sleep(3)&&&&print 'End process', index&&if __name__ == '__main__':&&&&pool = Pool(processes=3)&&&&for i in xrange(4):&&&&&&&&pool.apply(function, (i,))&&&&&print "Started processes"&&&&pool.close()&&&&pool.join()&&&&print "Subprocess done."
在这里只需要把apply_async改成apply即可。
运行结果如下:
Start process:
End process 0
Start process:
End process 1
Start process:
End process 2
Start process:
End process 3
Started processes
Subprocess done.
12345678910
Start process:&&0End process 0Start process:&&1End process 1Start process:&&2End process 2Start process:&&3End process 3Started processesSubprocess done.
这样一来就好理解了吧?
下面对函数进行解释:
apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的。
close() 关闭pool,使其不在接受新的任务。
terminate() 结束工作进程,不在处理未完成的任务。
join() 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。
当然每个进程可以在各自的方法返回一个结果。apply或apply_async方法可以拿到这个结果并进一步进行处理。
from multiprocessing import Lock, Pool
import time
def function(index):
print 'Start process: ', index
time.sleep(3)
print 'End process', index
return index
if __name__ == '__main__':
pool = Pool(processes=3)
for i in xrange(4):
result = pool.apply_async(function, (i,))
print result.get()
print "Started processes"
pool.close()
pool.join()
print "Subprocess done."
12345678910111213141516171819
from multiprocessing import Lock, Poolimport time&&def function(index):&&&&print 'Start process: ', index&&&&time.sleep(3)&&&&print 'End process', index&&&&return index&if __name__ == '__main__':&&&&pool = Pool(processes=3)&&&&for i in xrange(4):&&&&&&&&result = pool.apply_async(function, (i,))&&&&&&&&print result.get()&&&&print "Started processes"&&&&pool.close()&&&&pool.join()&&&&print "Subprocess done."
运行结果:
Start process:
End process 0
Start process:
End process 1
Start process:
End process 2
Start process:
End process 3
Started processes
Subprocess done.
1234567891011121314
Start process:&&0End process 00Start process:&&1End process 11Start process:&&2End process 22Start process:&&3End process 33Started processesSubprocess done.
另外还有一个非常好用的map方法。
如果你现在有一堆数据要处理,每一项都需要经过一个方法来处理,那么map非常适合。
比如现在你有一个数组,包含了所有的URL,而现在已经有了一个方法用来抓取每个URL内容并解析,那么可以直接在map的第一个参数传入方法名,第二个参数传入URL数组。
现在我们用一个实例来感受一下:
from multiprocessing import Pool
import requests
from requests.exceptions import ConnectionError
def scrape(url):
print requests.get(url)
except ConnectionError:
print 'Error Occured ', url
print 'URL ', url, ' Scraped'
if __name__ == '__main__':
pool = Pool(processes=3)
'http://blog.csdn.net/',
'http://xxxyxxx.net'
pool.map(scrape, urls)
1234567891011121314151617181920212223
from multiprocessing import Poolimport requestsfrom requests.exceptions import ConnectionError&&def scrape(url):&&&&try:&&&&&&&&print requests.get(url)&&&&except ConnectionError:&&&&&&&&print 'Error Occured ', url&&&&finally:&&&&&&&&print 'URL ', url, ' Scraped'&&if __name__ == '__main__':&&&&pool = Pool(processes=3)&&&&urls = [&&&&&&&&'',&&&&&&&&'/',&&&&&&&&'http://blog.csdn.net/',&&&&&&&&'http://xxxyxxx.net'&&&&]&&&&pool.map(scrape, urls)
在这里初始化一个Pool,指定进程数为3,如果不指定,那么会自动根据CPU内核来分配进程数。
然后有一个链接列表,map函数可以遍历每个URL,然后对其分别执行scrape方法。
运行结果:
&Response [403]&
http://blog.csdn.net/
&Response [200]&
Error Occured
http://xxxyxxx.net
http://xxxyxxx.net
&Response [200]&
&Response [403]&URL&&http://blog.csdn.net/&&Scraped&Response [200]&URL&&https://&&ScrapedError Occured&&http://xxxyxxx.netURL&&http://xxxyxxx.net&&Scraped&Response [200]&URL&&http:///&&Scraped
可以看到遍历就这么轻松地实现了。
多进程multiprocessing相比多线程功能强大太多,而且使用范围更广,希望本文对大家有帮助!
转载请注明: &
or分享 (0)
您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请狠狠点击下面的

我要回帖

更多关于 手机支付用户数量 的文章

 

随机推荐