python3 。python进程池启动不了pool使用失败

在 SegmentFault,学习技能、解决问题
每个月,我们帮助 1000 万的开发者解决各种各样的技术问题。并助力他们在技术能力、职业生涯、影响力上获得提升。
问题对人有帮助,内容完整,我也想知道答案
问题没有实际价值,缺少关键内容,没有改进余地
使用Pool进程池并发,然后让将每个进程的任务注册在一个在事件循环中,但Pool没有生效,是哪里出现了问题?
from multiprocessing import
import asyncio
import time
async def fristwork():
await asyncio.sleep(1)
print("fristwork take" ,str(time.time()))
return "Done"
async def secondwork():
fristwork()
def task(num):
coroutine = secondwork()
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print("task {}".format(num))
async def run_more(num):
print("start run_more")
pool = Pool(processes = 2)
for i in range(num):
pool.apply_async(task,args=(i,))
pool.close()
pool.join()
def main(num):
coroutine = run_more(num)
asyncio.ensure_future(coroutine),
loop2 = asyncio.get_event_loop()
loop2.run_until_complete(asyncio.wait(tasks))
if __name__ == '__main__':
以下为输出:
start run_more
fristwork take .5045896
fristwork take .5145957
fristwork take .506057
fristwork take .517011
同步到新浪微博
分享到微博?
关闭理由:
删除理由:
忽略理由:
推广(招聘、广告、SEO 等)方面的内容
与已有问题重复(请编辑该提问指向已有相同问题)
答非所问,不符合答题要求
宜作评论而非答案
带有人身攻击、辱骂、仇恨等违反条款的内容
无法获得确切结果的问题
非开发直接相关的问题
非技术提问的讨论型问题
其他原因(请补充说明)
我要该,理由是:
在 SegmentFault,学习技能、解决问题
每个月,我们帮助 1000 万的开发者解决各种各样的技术问题。并助力他们在技术能力、职业生涯、影响力上获得提升。from processpoll.pooldemo import hi
if __name__ == '__main__':
hi.t()class hi():
def install_java(self, host):
print('start',host)
import time
time.sleep(2)
def t(self):
import multiprocessing
pool = multiprocessing.Pool(processes=5)
for host in range(100):
pool.apply_async(self.install_java,
args=(host,))
pool.close()
pool.join()注意启动必须在__main__中
导入相关模块import multiprocessing
import time
import randam
import os def worker(msg):
t_start = time...
队列的使用
qq=Queue(3)#初始化一个Queue对象,最多可接收3条put消息
qq.put(&message&)#存放
qq.full() #True False 是否已满
from multiprocessing import Process
from multiprocessing import Pool
from multiprocessing import Man...
由于Python中线程封锁机制,导致Python中的多线程并不是正真意义上的多线程。当我们有并行处理需求的时候,可以采用多进程迂回地解决。如果要在主进程中启动大量的子进程,可以用进程池的方式批量创建子...
最近想用python写个爬虫,根据学校图书馆的学号密码。由于学校图书馆初始密码为6位数字,而且不需要验证码,所以破解起来很简单。思路就是生成6为数字暴力密码本,依次向网页POST“学号-密码”的表单即...
使用python进程库中的进程池Pool可以简便的对进程任务进行管理,
同时python的multiprocessing为进程间通讯提供了不少工具,
其中之一便是消息队列Queue.
在测试时,...
在刚开始学多进程或多线程时,我们迫不及待地基于多进程或多线程实现并发的套接字通信,然而这种实现方式的致命缺陷是:服务的开启的进程数或线程数都会随着并发的客户端数目地增多而增多,这会对服务端主机带...
下面的类可以创建进程池,可以吧各种数据处理任务都提交给进程池。进程池提供的功能有点类似于列表解析和功能性编程操作(如映射-规约)提供的功能。Pool( [ numprocess [, initiali...
========异步#进程池中的Queue
如果要用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue,
与multiprocessing中的Que...
初始化Pool时,可以指定一个最大进程数。在有新的请求提交到Pool中时,如果
池子还没有满,那么就会创建一个新的进程用来执行请求;如果池中的进程数
已经达到最大值,那么该请求就会等待,直到...
没有更多推荐了,人生总有离别,也总有邂逅。
CONTACT ME
& COPYRIGHT BY DuanYi 2016一,共享数据
展望未来,基于消息传递的并发编程是大势所趋
即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合
通过消息队列交换数据。这样极大地减少了对使用锁定和其他同步手段的需求,
还可以扩展到分布式系统中
进程间通信应该尽量避免使用本节所讲的共享数据的方式
进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的
虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,
from multiprocessing import Manager,Process,Lock
def work(d,lock):
# with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
d['count']-=1
if __name__ == '__main__':
lock=Lock()
with Manager() as m:
dic=m.dict({'count':100})
for i in range(100):
p=Process(target=work,args=(dic,lock))
p_l.append(p)
for p in p_l:
print(dic)
#{'count': 94}
进程之间操作共享的数据
1 # 共享文件,速度慢硬盘IO,没有锁
2 # 内存级别
3 # 队列,有锁,速度快
4 # 没有锁就是数据可能不安全
5 # 管道,隔离的进程之间通信,没有锁
7 # IPC就是进程之间怎么通信,2种方式,队列和管道
9 # 共享数据,共享内存最原始的方式,比如共享字典,没有锁,跟管道一样
<span style="color: # # Manager 共享数据
<span style="color: # # 代码一
<span style="color: # from multiprocessing import Manager,Process
<span style="color: # def work(dic):
<span style="color: #
dic['count']-=1
<span style="color: # # if __name__ == '__main__':
<span style="color: # #
m=Manager()
<span style="color: # #
share_dic=m.dict({'count':100})
<span style="color: # #
<span style="color: # #
for i in range(100):
#开100个进程
<span style="color: # #
p=Process(target=work,args=(share_dic,))
<span style="color: # #
p_l.append(p)
<span style="color: # #
#这里只是发信号
<span style="color: # #
# p.join()
#放在这里就是串行
<span style="color: # #
for i in p_l:
#可能出现同时写,因为共享数据
<span style="color: # #
<span style="color: # #
print(share_dic)
<span style="color: #
<span style="color: # # 加锁共享数据,不会对数据产生修改
<span style="color: # from multiprocessing import Manager,Process,Lock
<span style="color: # def work(dic,mutex):
<span style="color: #
# mutex.acquire()
#加锁的2重写法
<span style="color: #
# dic['count']-=1
<span style="color: #
# mutex.release()
<span style="color: #
with mutex:
<span style="color: #
dic['count']-=1
<span style="color: #
<span style="color: # if __name__ == '__main__':
<span style="color: #
mutex=Lock()
<span style="color: #
m=Manager()
<span style="color: #
share_dic=m.dict({'count':100})
<span style="color: #
<span style="color: #
for i in range(100):
<span style="color: #
p=Process(target=work,args=(share_dic,mutex))
<span style="color: #
p_l.append(p)
<span style="color: #
<span style="color: #
for i in p_l:
<span style="color: #
<span style="color: #
print(share_dic)
二,开启进程池
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。多进程是实现并发的手段之一,需要注意的问题是:
很明显需要并发执行的任务通常要远大于核数
一个操作系统不可能无限开启进程,通常有几个核就开几个进程
进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)
例如当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
我们就可以通过维护一个进程池来控制进程数目,比如httpd的进程模式,规定最小进程数和最大进程数... ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。
& &&创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程
1 Pool([numprocess
[,initializer [, initargs]]]):创建进程池&
& &&参数介绍:
1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
3 initargs:是要传给initializer的参数组
 &方法介绍:
& & 主要方法:
1 p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
2 p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。
4 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
5 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
& &其他方法(了解部分)
方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
obj.ready():如果调用完成,返回True
obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
obj.wait([timeout]):等待结果变为可用。
obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
View Code&
& & &应用:
from multiprocessing import Pool
import os,time
def work(n):
print('%s run' %os.getpid())
time.sleep(3)
return n**2
if __name__ == '__main__':
p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
for i in range(10):
res=p.apply(work,args=(i,)) #同步运行,阻塞、直到本次任务执行完毕拿到res
res_l.append(res)
print(res_l)
apply同步执行:阻塞式
from multiprocessing import Pool
import os,time
def work(n):
print('%s run' %os.getpid())
time.sleep(3)
return n**2
if __name__ == '__main__':
p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
for i in range(10):
res=p.apply_async(work,args=(i,)) #同步运行,阻塞、直到本次任务执行完毕拿到res
res_l.append(res)
#异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
for res in res_l:
print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
apply_async异步执行:非阻塞
#一:使用进程池(非阻塞,apply_async)
#coding: utf-8
from multiprocessing import Process,Pool
import time
def func(msg):
print( "msg:", msg)
time.sleep(1)
return msg
if __name__ == "__main__":
pool = Pool(processes = 3)
for i in range(10):
msg = "hello %d" %(i)
res=pool.apply_async(func, (msg, ))
#维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
res_l.append(res)
print("==============================&") #没有后面的join,或get,则程序整体结束,进程池中的任务还没来得及全部执行完也都跟着主进程一起结束了
pool.close() #关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
pool.join()
#调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
print(res_l) #看到的是&multiprocessing.pool.ApplyResult object at 0x&对象组成的列表,而非最终的结果,但这一步是在join后执行的,证明结果已经计算完毕,剩下的事情就是调用每个对象下的get方法去获取结果
for i in res_l:
print(i.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
#二:使用进程池(阻塞,apply)
#coding: utf-8
from multiprocessing import Process,Pool
import time
def func(msg):
print( "msg:", msg)
time.sleep(0.1)
return msg
if __name__ == "__main__":
pool = Pool(processes = 3)
for i in range(10):
msg = "hello %d" %(i)
res=pool.apply(func, (msg, ))
#维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
res_l.append(res) #同步执行,即执行完一个拿到结果,再去执行另外一个
print("==============================&")
pool.close()
pool.join()
#调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
print(res_l) #看到的就是最终的结果组成的列表
for i in res_l: #apply是同步的,所以直接得到结果,没有get()方法
详解:apply_async与apply
练习2:使用进程池维护固定数目的进程(重写练习1)
#Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count())
#开启6个客户端,会发现2个客户端处于等待状态
#在每个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程
from socket import *
from multiprocessing import Pool
server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)
def talk(conn,client_addr):
print('进程pid: %s' %os.getpid())
while True:
msg=conn.recv(1024)
if not msg:break
conn.send(msg.upper())
except Exception:
if __name__ == '__main__':
while True:
conn,client_addr=server.accept()
p.apply_async(talk,args=(conn,client_addr))
# p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
msg=input('&&: ').strip()
if not msg:continue
client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))
发现:并发开启多个客户端,服务端同一时间只有3个不同的pid,干掉一个客户端,另外一个客户端才会进来,被3个进程之一处理
三,进程回调函数
回掉函数:
需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数
我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。
from multiprocessing import Pool
import requests
import json
def get_page(url):
print('&进程%s& get %s' %(os.getpid(),url))
respone=requests.get(url)
if respone.status_code == 200:
return {'url':url,'text':respone.text}
def pasrse_page(res):
print('&进程%s& parse %s' %(os.getpid(),res['url']))
parse_res='url:&%s& size:[%s]\n' %(res['url'],len(res['text']))
with open('db.txt','a') as f:
f.write(parse_res)
if __name__ == '__main__':
'https://www.baidu.com',
'https://www.python.org',
'https://www.openstack.org',
'https://help.github.com/',
'http://www.sina.com.cn/'
for url in urls:
res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
res_l.append(res)
print([res.get() for res in res_l]) #拿到的是get_page的结果,其实完全没必要拿该结果,该结果已经传给回调函数处理了
&进程3388& get https://www.baidu.com
&进程3389& get https://www.python.org
&进程3390& get https://www.openstack.org
&进程3388& get https://help.github.com/
&进程3387& parse https://www.baidu.com
&进程3389& get http://www.sina.com.cn/
&进程3387& parse https://www.python.org
&进程3387& parse https://help.github.com/
&进程3387& parse http://www.sina.com.cn/
&进程3387& parse https://www.openstack.org
[{'url': 'https://www.baidu.com', 'text': '&!DOCTYPE html&\r\n...',...}]
from multiprocessing import Pool
import time,random
import requests
def get_page(url,pattern):
response=requests.get(url)
if response.status_code == 200:
return (response.text,pattern)
def parse_page(info):
page_content,pattern=info
res=re.findall(pattern,page_content)
for item in res:
'index':item[0],
'title':item[1],
'actor':item[2].strip()[3:],
'time':item[3][5:],
'score':item[4]+item[5]
print(dic)
if __name__ == '__main__':
pattern1=re.compile(r'&dd&.*?board-index.*?&(\d+)&.*?title="(.*?)".*?star.*?&(.*?)&.*?releasetime.*?&(.*?)&.*?integer.*?&(.*?)&.*?fraction.*?&(.*?)&',re.S)
'http://maoyan.com/board/7':pattern1,
for url,pattern in url_dic.items():
res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
res_l.append(res)
for i in res_l:
# res=requests.get('http://maoyan.com/board/7')
# print(re.findall(pattern,res.text))
&如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数
from multiprocessing import Pool
import time,random,os
def work(n):
time.sleep(1)
return n**2
if __name__ == '__main__':
for i in range(10):
res=p.apply_async(work,args=(i,))
res_l.append(res)
p.join() #等待进程池中所有进程执行完毕
for res in res_l:
nums.append(res.get()) #拿到所有结果
print(nums) #主进程拿到所有的处理结果,可以在主进程中进行统一进行处理
阅读(...) 评论()Python 进程池在面向对象过程中,进程池无法调用实例方法 - V2EX
Python 进程池在面向对象过程中,进程池无法调用实例方法
112 天前 &noobpythoner
下列代码中,func 函数无法被 pool.apply_async 调用,这是什么情况?
import time
from multiprocessing.pool import Pool
class Test:
def __init__(self):
self.pool = Pool(5)
def func(self):
time.sleep(0.2)
print("1")
def run(self):
for i in range(10):
self.pool.apply_async(self.func)
# 这里的 func 为什么不能进入执行?
time.sleep(3)
self.pool.close()
self.pool.join()
if __name__ == '__main__':
t = Test()
864 次点击所在节点 &
Rob007111 天前
justou111 天前你把 func 改成 staticmethod 试试;再试试改成这样报什么错:import timefrom multiprocessing.pool import Poolclass Test:
def __init__(self):
self.pool = Pool(5)
def func(self):
time.sleep(0.2)
print("1")
def run(self):
results = [self.pool.apply_async(self.func) for _ in range(10)]
for res in results:
print(res.get())
time.sleep(3)
self.pool.close()
self.pool.join()if __name__ == '__main__':
t = Test()
t.run()再搜索下可能就很多收获了:)
第 1 页 / 共 1 页&
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到
上打开本讨论主题的完整版本。
是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
is a community of developers, designers and creative people.

我要回帖

更多关于 python守护进程进程池 的文章

 

随机推荐