python yield 协程爬虫到底是单线程好还是协程

基础知识(1)什么是同步IO和异步IO,它们之间有什么区别?答:举个现实例子,假设你需要打开4个不同的网站,但每个网站都比较卡。IO过程就相当于你打开网站的过程,CPU就是你的点击动作。你的点击动作很快,但是网站打开很慢。同步IO是指你每点击一个网址,都等待该网站彻底显示,才会去点击下一个网址。异步IO是指你点击完一个网址,不等对方服务器返回结果,立马新开浏览器窗口去打开另外一个网址,以此类推,最后同时等待4个网站彻底打开。很明显异步IO的效率更高。(2)什么是协程,为什么要使用协程?Python中解决IO密集型任务(打开多个网站)的方式有很多种,比如多进程、多线程。但理论上一台电脑中的线程数、进程数是有限的,而且进程、线程之间的切换也比较浪费时间。所以就出现了“协程”的概念。协程允许一个执行过程A中断,然后转到执行过程B,在适当的时候再一次转回来,有点类似于多线程。但协程有以下2个优势:协程的数量理论上可以是无限个,而且没有线程之间的切换动作,执行效率比线程高。协程不需要“锁”机制,即不需要lock和release过程,因为所有的协程都在一个线程中。相对于线程,协程更容易调试debug,因为所有的代码是顺序执行的。Python中的异步IO和协程Python中的协程是通过“生成器(generator)”的概念实现的。这里引用廖雪峰Python教程中的例子,并做一点修改和“装饰”:def consumer():
# 定义消费者,由于有yeild关键词,此消费者为一个生成器
print("[Consumer] Init Consumer ......")
r = "init ok"
# 初始化返回结果,并在启动消费者时,返回给生产者
while True:
n = yield r
# 消费者通过yield接收生产者的消息,同时返给其结果
print("[Consumer] conusme n = %s, r = %s" % (n, r))
r = "consume %s OK" % n
# 消费者消费结果,下个循环返回给生产者
def produce(c):
# 定义生产者,此时的 c 为一个生成器
print("[Producer] Init Producer ......")
r = c.send(None)
# 启动消费者生成器,同时第一次接收返回结果
print("[Producer] Start Consumer, return %s" % r)
while n & 5:
print("[Producer] While, Producing %s ......" % n)
r = c.send(n)
# 向消费者发送消息并准备接收结果。此时会切换到消费者执行
print("[Producer] Consumer return: %s" % r)
# 关闭消费者生成器
print("[Producer] Close Producer ......")
produce(consumer())
代码中添加了很详细的print语句和注释,帮助大家更好的理解。这里删除了源代码consumer中的“return”语句。如果还是不太明白,可以在编辑器中进行debug调试,一步步跟踪程序的运行过程。关于异步IO,在Python3.4中可以使用asyncio标准库。该标准库支持一个时间循环模型(EventLoop),我们声明协程,然后将其加入到EventLoop中,即可实现异步IO。Python中也有一个关于异步IO的很经典的HelloWorld程序(同样参考于廖雪峰教程):# 异步IO例子:适配Python3.4,使用asyncio库
@asyncio.coroutine
def hello(index):
# 通过装饰器asyncio.coroutine定义协程
print('Hello world! index=%s, thread=%s' % (index, threading.currentThread()))
yield from asyncio.sleep(1)
# 模拟IO任务
print('Hello again! index=%s, thread=%s' % (index, threading.currentThread()))
loop = asyncio.get_event_loop()
# 得到一个事件循环模型
tasks = [hello(1), hello(2)]
# 初始化任务列表
loop.run_until_complete(asyncio.wait(tasks))
# 执行任务
loop.close()
# 关闭事件循环列表
同样这里的代码添加了注释,并增加了index参数。输出currentThread的目的是演示当前程序都是在一个线程中执行的。返回结果如下:Hello world! index=1, thread=&_MainThread(MainThread, started 14816)&
Hello world! index=2, thread=&_MainThread(MainThread, started 14816)&
Hello again! index=1, thread=&_MainThread(MainThread, started 14816)&
Hello again! index=2, thread=&_MainThread(MainThread, started 14816)&
在Python3.5中引入了关于异步IO的新语法:async和await关键字。# 异步IO例子:适配Python3.5,使用async和await关键字
async def hello(index):
# 通过关键字async定义协程
print('Hello world! index=%s, thread=%s' % (index, threading.currentThread()))
await asyncio.sleep(1)
# 模拟IO任务
print('Hello again! index=%s, thread=%s' % (index, threading.currentThread()))
loop = asyncio.get_event_loop()
# 得到一个事件循环模型
tasks = [hello(1), hello(2)]
# 初始化任务列表
loop.run_until_complete(asyncio.wait(tasks))
# 执行任务
loop.close()
# 关闭事件循环列表
从代码中可以看出,使用async代替@asyncio.coroutine,使用await代替yield from,使得协程代码更加简洁易懂。async关键字将一个函数声明为协程函数,函数执行时返回一个协程对象。await关键字将暂停协程函数的执行,等待异步IO返回结果。这里还出现了一个新词:事件循环模型EventLoop,这又是个什么呢?EventLoop是一个程序结构,用于等待和发送消息和事件。简单说,就是在程序中设置两个线程:一个负责程序本身的运行,称为"主线程";另一个负责主线程与其他进程(主要是各种I/O操作)的通信,被称为"Event Loop线程"(可以译为"消息线程")。在爬虫中使用协程实现异步IO异步IO特别适合爬虫的工作,因为爬虫中所有的请求都属于IO密集型任务,想得到比较好的爬虫效率,使用协程很重要。关于Http异步请求,建议使用,一个异步的HTTP客户端/服务器框架。这里举个例子,更多用法可以参考其官方文档。async def get(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print(url, resp.status)
print(url, await resp.text())
loop = asyncio.get_event_loop()
# 得到一个事件循环模型
# 初始化任务列表
get("http://zhushou.360.cn/detail/index/soft_id/3283370"),
get("http://zhushou.360.cn/detail/index/soft_id/3264775"),
get("http://zhushou.360.cn/detail/index/soft_id/705490")
loop.run_until_complete(asyncio.wait(tasks))
# 执行任务
loop.close()
# 关闭事件循环列表
老规矩,以上所有代码均上传至Github:另外关于aiohttp库的使用,我也会整理一份代码,上传到GitHub。=============================================================作者主页:作者专栏主页:作者GitHub主页:欢迎大家拍砖、提意见。相互交流,共同进步!==============================================================1649 条评论分享收藏文章被以下专栏收录Talk is cheap, show me the code!Python的协程为何会令人欲仙欲死!究竟是太难学还是什么原因呢?Python的协程为何会令人欲仙欲死!究竟是太难学还是什么原因呢?数字八哥百家号这些用法放在实际开发中可能不太够,我在后面的编写中就踩到了不少的坑,这里就回应上篇文章中末尾说的,再写一篇文章来梳理一下这些问题。时隔很久,终于还是磨磨唧唧的把第二篇写出来了。本来还准备有第三篇重生篇的,但是以防拖稿,合并到一起来写吧。在给大家分享之前呢,小编推荐一下一个挺不错的交流宝地,里面都是一群热爱并在学习Python的小伙伴们,大几千了吧,各种各样的人群都有,特别喜欢看到这种大家一起交流解决难题的氛围,群资料也上传了好多,各种大牛解决小白的问题,这个Python群: 欢迎大家进来一起交流讨论,一起进步,尽早掌握这门Python语言。0x01 yield进化!前面提到过,Python后来添加了 yield from 语法,之所以要引入这种东西,有两种原因,"把异常传入嵌套协程问题"和"让协程更好的处理返回值问题"。可能有人已经发现了,我们定义的协程并不会 return 值,只会通过 yield 生产值。我们下面写个例子,这个协程不会"产出"值,而是"返回"值。我们通过发送None来终止这个协程的运行,虽然抛出了 StopIteration异常,但是异常对象的 value部分存有 return语句的返回值。我们稍微修改一下调用方法,让这段代码看起来不那么奇怪:我们来看一个使用了 yield from的例子,看起来有点复杂,而且很难受,因为是为了使用 yield from刻意编出来的代码。这段代码作用是,计算三个数值集每一个集合的总和,运行结果如下,我们依次来看每个函数的作用。这段代码的大部分含义已经在注释中说明了,可以结合注释来理解这段代码的含义。看完代码,我们总结一下关键点:0x03 在并发和异步之前我们都知道,CPython解释器这个东西,本身是非线程安全的,存在一个GIL,一次只允许一个线程执行Python代码。而且我们无法通过Python代码来控制GIL,即便有办法,也不是提倡的做法。GIL的存在给我们在编写多线程程序的时候带来了一定程度的困扰。然而实际上,对于I/O密集型任务而言,GIL在某种程度上还会有一些正面作用。0x04 一切从爬虫开始我们从一个简单的爬虫开始,这个爬虫很简单,访问指定的URL,并且获取内容并计算长度,这里我们给定5个URL。第一版的代码十分简单,顺序获取每个URL的内容,当第一个请求完成、计算完长度后,再开始第二个请求。我们多运行几次看看结果。大约需要花费14-16秒不等,这段代码并没有什么好看的,我们把关注点放在后面的代码上。现在我们使用多线程来改写这端代码。再来看一下结果,很明显,我们的耗时已经降低到10s左右了:从这两段代码中,已经可以看出并发对于处理任务的好处了,但是使用原生的 threading模块还是略显麻烦,Python已经给我们内置了一个处理并发任务的库 concurrent,我们借用这个库修改一下我们的代码,之所以修改成这个库的原因还有一个,那就是引出我们后面会谈到的 Future。执行一下,会发现耗时与上一个版本一样,稳定在10s左右。结果非常惊人,鹅妹子嘤!谢谢阅读。本文转载文!如有侵权联系小编删除。原文链接本文仅代表作者观点,不代表百度立场。系作者授权百家号发表,未经许可不得转载。数字八哥百家号最近更新:简介:从媒体态度分享数字互联网热点信息评价作者最新文章相关文章Python实现基于协程的异步爬虫 - 吃咯 - 博客园
随笔 - 150, 文章 - 18, 评论 - 5, 引用 - 0
Python实现基于协程的异步爬虫
一、课程介绍
1. 课程来源
本课程核心部分来自项目,作者是来自 MongoDB 的工程师 A. Jesse Jiryu Davis 与 Python 之父 Guido van Rossum。项目代码使用 MIT 协议,项目文档使用&&协议。
课程内容在原文档基础上做了稍许修改,增加了部分原理介绍,步骤的拆解分析及源代码注释。
2. 内容简介
传统计算机科学往往将大量精力放在如何追求更有效率的算法上。但如今大部分涉及网络的程序,它们的时间开销主要并不是在计算上,而是在维持多个Socket连接上。亦或是它们的事件循环处理的不够高效导致了更多的时间开销。对于这些程序来说,它们面临的挑战是如何更高效地等待大量的网络事件并进行调度。目前流行的解决方式就是使用异步I/O。
本课程将探讨几种实现爬虫的方法,从传统的线程池到使用协程,每节课实现一个小爬虫。另外学习协程的时候,我们会从原理入手,以ayncio协程库为原型,实现一个简单的异步编程模型。
本课程实现的爬虫为爬一个整站的爬虫,不会爬到站点外面去,且功能较简单,主要目的在于学习原理,提供实现并发与异步的思路,并不适合直接改写作为日常工具使用。
3. 课程知识点
本课程项目完成过程中,我们将学习:
线程池实现并发爬虫
回调方法实现异步爬虫
协程技术的介绍
一个基于协程的异步编程模型
协程实现异步爬虫
二、实验环境
本课程使用Python 3.4,所以本课程内运行py脚本都是使用python3命令。
打开终端,进入&Code&目录,创建&crawler&文件夹, 并将其作为我们的工作目录。
$ mkdir crawler && cd crawler
环保起见,测试爬虫的网站在本地搭建。
我们使用 Python 2.7 版本官方文档作为测试爬虫用的网站
wget http://labfile.oss.aliyuncs.com/courses/574/python-doc.zip
unzip python-doc.zip
安装serve,一个用起来很方便的静态文件服务器:
sudo npm install -g serve
启动服务器:
serve python-doc
如果访问不了npm的资源,也可以用以下方式开启服务器:
ruby -run -ehttpd python-doc -p 3000
访问localhost:3000查看网站:
三、实验原理
什么是爬虫?
网络爬虫(又被称为网页蜘蛛,网络机器人,在FOAF社区中间,更经常的称为网页追逐者),是一种按照一定的规则,自动地抓取万维网信息的程序或者脚本。
爬虫的工作流程
网络爬虫基本的工作流程是从一个根URL开始,抓取页面,解析页面中所有的URL,将还没有抓取过的URL放入工作队列中,之后继续抓取工作队列中的URL,重复抓取、解析,将解析到的url放入工作队列的步骤,直到工作队列为空为止。
线程池、回调、协程
我们希望通过并发执行来加快爬虫抓取页面的速度。一般的实现方式有三种:
线程池方式:开一个线程池,每当爬虫发现一个新链接,就将链接放入任务队列中,线程池中的线程从任务队列获取一个链接,之后建立socket,完成抓取页面、解析、将新连接放入工作队列的步骤。
回调方式:程序会有一个主循环叫做事件循环,在事件循环中会不断获得事件,通过在事件上注册解除回调函数来达到多任务并发执行的效果。缺点是一旦需要的回调操作变多,代码就会非常散,变得难以维护。
协程方式:同样通过事件循环执行程序,利用了Python&的生成器特性,生成器函数能够中途停止并在之后恢复,那么原本不得不分开写的回调函数就能够写在一个生成器函数中了,这也就实现了协程。
四、实验一:线程池实现爬虫
使用socket抓取页面需要先建立连接,之后发送GET类型的HTTP报文,等待读入,将读到的所有内容存入响应缓存。
def fetch(url):
sock = socket.socket()
sock.connect(('localhost.com', 3000))
request = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url)
sock.send(request.encode('ascii'))
response = b''
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)
links = parse_links(response)
q.add(links)
默认的socket连接与读写是阻塞式的,在等待读入的这段时间的CPU占用是被完全浪费的。
默认这部分同学们都是学过的,所以就粗略记几个重点,没学过的同学可以直接参考廖雪峰的教程:
导入线程库:
import threading
开启一个线程的方法:
t = 你新建的线程
#开始运行线程
#你的当前函数就阻塞在这一步直到线程运行完
建立线程的两种方式:
线程同时操作一个全局变量时会产生线程竞争所以需要锁:
lock = threading.Lock()
lock.acquire()
#..操作全局变量..
lock.release()
多线程同步-队列
默认这部分同学们都是学过的,所以就粗略记几个重点,没学过的同学可以直接参考:
多线程同步就是多个线程竞争一个全局变量时按顺序读写,一般情况下要用锁,但是使用标准库里的Queue的时候它内部已经实现了锁,不用程序员自己写了。
导入队列类:
from queue import Queue
创建一个队列:
q = Queue(maxsize=0)
maxsize为队列大小,为0默认队列大小可无穷大。
队列是先进先出的数据结构:
q.put(item) #往队列添加一个item,队列满了则阻塞
q.get(item) #从队列得到一个item,队列为空则阻塞
还有相应的不等待的版本,这里略过。
队列不为空,或者为空但是取得item的线程没有告知任务完成时都是处于阻塞状态
#阻塞直到所有任务完成
线程告知任务完成使用task_done
q.task_done()
实现线程池
创建thread.py文件作为爬虫程序的文件。
我们使用seen_urls来记录已经解析到的url地址:
seen_urls = set(['/'])
创建Fetcher类:
class Fetcher(Thread):
def __init__(self, tasks):
Thread.__init__(self)
使用正则库与url解析库来解析抓取的页面,这里图方便用了正则,同学也可以用Beautifulsoup等专门用来解析页面的Python库:
import urllib.parse
在Fetcher中实现parse_links解析页面:
def parse_links(self, fetched_url, response):
if not response:
print('error: {}'.format(fetched_url))
return set()
if not self._is_html(response):
return set()
实现线程池类与main的部分:
class ThreadPool:
def __init__(self, num_threads):
self.tasks = Queue()
for _ in range(num_threads):
Fetcher(self.tasks)
def add_task(self, url):
self.tasks.put(url)
def wait_completion(self):
self.tasks.join()
if __name__ == '__main__':
start = time.time()
这里先贴出完整代码:
from queue import Queue
from threading import Thread, Lock
import urllib.parse
import socket
import time
seen_urls = set(['/'])
lock = Lock()
class Fetcher(Thread):
def __init__(self, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.start()
def run(self):
while True:
url = self.tasks.get()
print(url)
sock = socket.socket()
sock.connect(('localhost', 3000))
get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url)
sock.send(get.encode('ascii'))
response = b''
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)
links = self.parse_links(url, response)
lock.acquire()
for link in links.difference(seen_urls):
self.tasks.put(link)
seen_urls.update(links)
lock.release()
self.tasks.task_done()
def parse_links(self, fetched_url, response):
if not response:
print('error: {}'.format(fetched_url))
return set()
if not self._is_html(response):
return set()
urls = set(re.findall(r'''(?i)href=["']?([^\s"'&&]+)''',
self.body(response)))
links = set()
for url in urls:
normalized = urllib.parse.urljoin(fetched_url, url)
parts = urllib.parse.urlparse(normalized)
if parts.scheme not in ('', 'http', 'https'):
host, port = urllib.parse.splitport(parts.netloc)
if host and host.lower() not in ('localhost'):
defragmented, frag = urllib.parse.urldefrag(parts.path)
links.add(defragmented)
return links
def body(self, response):
body = response.split(b'\r\n\r\n', 1)[1]
return body.decode('utf-8')
def _is_html(self, response):
head, body = response.split(b'\r\n\r\n', 1)
headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:])
return headers.get('Content-Type', '').startswith('text/html')
class ThreadPool:
def __init__(self, num_threads):
self.tasks = Queue()
for _ in range(num_threads):
Fetcher(self.tasks)
def add_task(self, url):
self.tasks.put(url)
def wait_completion(self):
self.tasks.join()
if __name__ == '__main__':
start = time.time()
pool = ThreadPool(4)
pool.add_task("/")
pool.wait_completion()
print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start))
运行python3 thread.py命令查看效果(记得先开网站服务器):
使用标准库中的线程池
线程池直接使用multiprocessing.pool中的ThreadPool:
代码更改如下:
from multiprocessing.pool import ThreadPool
使用ThreadPool时,它处理的对象可以不是线程对象,实际上Fetcher的线程部分ThreadPool根本用不到。因为它自己内部已开了几个线程在等待任务输入。这里偷个懒就只把self.start()去掉了。可以把Fetcher的线程部分全去掉,效果是一样的。
ThreadPool活用了map函数,这里它将每一个Fetcher对象分配给线程池中的一个线程,线程调用了Fetcher的run函数。这里使用map_async是因为不希望它在那一步阻塞,我们希望在任务队列join的地方阻塞,那么到队列为空且任务全部处理完时程序就会继续执行了。
运行python3 thread.py命令查看效果:
线程池实现的缺陷
我们希望爬虫的性能能够进一步提升,但是我们没办法开太多的线程,因为线程的内存开销很大,每创建一个线程可能需要占用50k的内存。以及还有一点,网络程序的时间开销往往花在I/O上,socket I/O 阻塞时的那段时间是完全被浪费了的。那么要如何解决这个问题呢?
下节课你就知道啦,下节课见~Python多线程鸡年不鸡肋 - Python - 伯乐在线
& Python多线程鸡年不鸡肋
术业有专攻,如是而已
当初在刚学习python多线程时,上网搜索资料几乎都是一片倒的反应python没有真正意义上的多线程,python多线程就是鸡肋。当时不明所以,只是了解到python带有GIL解释器锁的概念,同一时刻只能有一个线程在运行,遇到IO操作才会释放切换。那么,python多线程是否真的很鸡肋呢?要解决这个疑惑,我想必须亲自动手测试。
经过对比python与java的多线程测试,我发现python多线程的效率确实不如java,但远还没有达到鸡肋的程度,那么跟其他机制相比较呢?
观点:用多进程替代多线程需求
辗转了多篇博文,我看到了一些网友的观点,觉得应该使用python多进程来代替多线程的需求,因为多进程不受GIL的限制。于是我便动手使用多进程去解决一些并发问题,期间也遇到了一些坑,所幸大部分查找资料解决了,然后对多进程做了简单汇总介绍。
那么是否多进程能完全替代多线程呢?别急,我们继续往下看。
观点:协程为最佳方案
协程的概念目前来说是比较火热的,协程不同于线程的地方在于协程不是操作系统进行切换,而是由程序员编码进行切换的,也就是说切换是由程序员控制的,这样就没有了线程所谓的安全问题。协程的概念非常广而深,本文暂不做具体介绍,以后会单独成文。
好了,网上的观点无非是使用多进程或者协程来代替多线程(当然换编程语言,换解释器之类方法除外),那么我们就来测试下这三者的性能之差。既然要公平测试,就应该考虑IO密集型与CPU密集型的问题,所以分两组数据进行测试。
IO密集型测试
测试IO密集型,我选择最常用的爬虫功能,计算爬虫访问bing所需要的时间。(主要测试多线程与协程,单线程与多进程就不测了,因为没有必要)
测试代码:
#! -*- coding:utf-8 -*-
from monkey.patch_all()
import gevent
import time
import threading
import urllib2
def urllib2_(url):
urllib2.urlopen(url,timeout=10).read()
except Exception,e:
def gevent_(urls):
jobs=[gevent.spawn(urllib2_,url) for url in urls]
gevent.joinall(jobs,timeout=10)
for i in jobs:
def thread_(urls):
for url in urls:
t=threading.Thread(target=urllib2_,args=(url,))
a.append(t)
for i in a:
for i in a:
if __name__=="__main__":
urls=["https://www.bing.com/"]*10
t1=time.time()
gevent_(urls)
t2=time.time()
print 'gevent-time:%s' % str(t2-t1)
thread_(urls)
t4=time.time()
print 'thread-time:%s' % str(t4-t2)
12345678910111213141516171819202122232425262728293031323334
#! -*- coding:utf-8 -*-from gevent import monkey;monkey.patch_all()import geventimport timeimport threadingimport urllib2def urllib2_(url): try:
urllib2.urlopen(url,timeout=10).read() except Exception,e:
print edef gevent_(urls): jobs=[gevent.spawn(urllib2_,url) for url in urls] gevent.joinall(jobs,timeout=10) for i in jobs:
i.join()def thread_(urls): a=[] for url in urls:
t=threading.Thread(target=urllib2_,args=(url,))
a.append(t) for i in a:
i.start() for i in a:
i.join()if __name__=="__main__": urls=["https://www.bing.com/"]*10&&&&&& t1=time.time() gevent_(urls) t2=time.time() print 'gevent-time:%s' % str(t2-t1) thread_(urls) t4=time.time() print 'thread-time:%s' % str(t4-t2)
测试结果:
gevent-time:0.
thread-time:0.
gevent-time:1.
thread-time:1.
gevent-time:2.
thread-time:2.
gevent-time:6.
thread-time:10.
从结果可以看出,当并发数不断增大时,协程的效率确实比多线程要高,但在并发数不是那么高时,两者差异不大。
CPU密集型,我选择科学计算的一些功能,计算所需时间。(主要测试单线程、多线程、协程、多进程)
测试代码:
#! -*- coding:utf-8 -*-
from multiprocessing import Process as pro
from multiprocessing.dummy import Process as thr
from monkey.patch_all()
import gevent
def run(i):
lists=range(i)
list(set(lists))
if __name__=="__main__":
for i in range(30):
##10-2.1s 20-3.8s 30-5.9s
t=pro(target=run,args=(5000000,))
# for i in range(30):
t=thr(target=run,args=(5000000,))
# jobs=[gevent.spawn(run,5000000) for i in range(30)]
##10-4.0s 20-7.7s 30-11.5s
# gevent.joinall(jobs)
# for i in jobs:
# for i in range(30):
20-7.6s 30-11.3s
run(5000000)
12345678910111213141516171819202122232425262728293031323334
#! -*- coding:utf-8 -*-from multiprocessing import Process as profrom multiprocessing.dummy import Process as thrfrom gevent import monkey;monkey.patch_all()import geventdef run(i): lists=range(i) list(set(lists)) if __name__=="__main__": ''' 多进程 ''' for i in range(30):&&&&&&##10-2.1s 20-3.8s 30-5.9s
t=pro(target=run,args=(5000000,))
t.start() ''' 多线程 ''' # for i in range(30):&&&&##10-3.8s&&20-7.6s&&30-11.4s #
t=thr(target=run,args=(5000000,)) #
t.start() ''' 协程 ''' # jobs=[gevent.spawn(run,5000000) for i in range(30)]&&##10-4.0s 20-7.7s 30-11.5s # gevent.joinall(jobs) # for i in jobs: #
i.join() ''' 单线程 ''' # for i in range(30):&&##10-3.5s&&20-7.6s 30-11.3s #
run(5000000)
测试结果:
并发10次:【多进程】2.1s 【多线程】3.8s 【协程】4.0s 【单线程】3.5s
并发20次:【多进程】3.8s 【多线程】7.6s 【协程】7.7s 【单线程】7.6s
并发30次:【多进程】5.9s 【多线程】11.4s 【协程】11.5s 【单线程】11.3s
可以看到,在CPU密集型的测试下,多进程效果明显比其他的好,多线程、协程与单线程效果差不多。这是因为只有多进程完全使用了CPU的计算能力。在代码运行时,我们也能够看到,只有多进程可以将CPU使用率占满。
从两组数据我们不难发现,python多线程并没有那么鸡肋。如若不然,Python3为何不去除GIL呢?对于此问题,Python社区也有两派意见,这里不再论述,我们应该尊重Python之父的决定。
至于何时该用多线程,何时用多进程,何时用协程?想必答案已经很明显了。
当我们需要编写并发爬虫等IO密集型的程序时,应该选用多线程或者协程(亲测差距不是特别明显);当我们需要科学计算,设计CPU密集型程序,应该选用多进程。当然以上结论的前提是,不做分布式,只在一台服务器上测试。
答案已经给出,本文是否就此收尾?既然已经论述Python多线程尚有用武之地,那么就来介绍介绍其用法吧。
Multiprocessing.dummy模块
Multiprocessing.dummy用法与多进程Multiprocessing用法类似,只是在import包的时候,加上.dummy。
threading模块
这是python自带的threading多线程模块,其创建多线程主要有2种方式。一种为继承threading类,另一种使用threading.Thread函数,接下来将会分别介绍这两种用法。
Usage【1】
利用threading.Thread()函数创建线程。
def run(i):
for i in range(10):
t=threading.Thread(target=run,args=(i,))
def run(i): print ifor i in range(10): t=threading.Thread(target=run,args=(i,)) t.start()
说明:Thread()函数有2个参数,一个是target,内容为子线程要执行的函数名称;另一个是args,内容为需要传递的参数。创建完子线程,将会返回一个对象,调用对象的start方法,可以启动子线程。
线程对象的方法:
Start() 开始线程的执行
Run() 定义线程的功能的函数
Join(timeout=None) 程序挂起,直到线程结束;如果给了timeout,则最多阻塞timeout秒
getName() 返回线程的名字
setName() 设置线程的名字
isAlive() 布尔标志,表示这个线程是否还在运行
isDaemon() 返回线程的daemon标志
setDaemon(daemonic) 把线程的daemon标志设为daemonic(一定要在start()函数前调用)
t.setDaemon(True) 把父线程设置为守护线程,当父进程结束时,子进程也结束。
threading类的方法:
threading.enumerate() 正在运行的线程数量
Usage【2】
通过继承threading类,创建线程。
import threading
class test(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
print "code one"
for i in range(10):
cur=test()
cur.start()
for i in range(10):
cur.join()
1234567891011121314
import threadingclass test(threading.Thread):&&&&def __init__(self):&&&&&&&&threading.Thread.__init__(self)&&&&def run(self):&&&&&&&&try:&&&&&&&&&&&&print "code one"&&&&&&&&except:&&&&&&&&&&&&passfor i in range(10):&&&&cur=test()&&&&cur.start()for i in range(10):&&&&cur.join()
说明:此方法继承了threading类,并且重构了run函数功能。
获取线程返回值问题
有时候,我们往往需要获取每个子线程的返回值。然而通过调用普通函数,获取return值的方式在多线程中并不适用。因此需要一种新的方式去获取子线程返回值。
import threading
class test(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
self.tag=1
def get_result(self):
if self.tag==1:
return True
return False
while f.isAlive():
print f.get_result()
12345678910111213141516
import threadingclass test(threading.Thread):&&&&def __init__(self):&&&&&&&&threading.Thread.__init__(self)&&&&def run(self):&&&&&&&&self.tag=1&&&&def get_result(self):&&&&&&&&if self.tag==1:&&&&&&&&&&&&return True&&&&&&&&else:&&&&&&&&&&&&return Falsef=test()f.start()while f.isAlive():&&&&continueprint f.get_result()
说明:多线程获取返回值的首要问题,就是子线程什么时候结束?我们应该什么时候去获取返回值?可以使用isAlive()方法判断子线程是否存活。
控制线程运行数目
当需要执行的任务非常多时,我们往往需要控制线程的数量,threading类自带有控制线程数量的方法。
import threading
##并发的线程数量
threadLimiter=threading.BoundedSemaphore(maxs)
class test(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
threadLimiter.acquire()
print "code one"
threadLimiter.release() #释放
for i in range(100):
cur=test()
cur.start()
for i in range(100):
cur.join()
12345678910111213141516171819
import threadingmaxs=10&&##并发的线程数量threadLimiter=threading.BoundedSemaphore(maxs)class test(threading.Thread):&&&&def __init__(self):&&&&&&&&threading.Thread.__init__(self)&&&&def run(self):&&&&&&&&threadLimiter.acquire()&&#获取&&&&&&&&try:&&&&&&&&&&&&print "code one"&&&&&&&&except:&&&&&&&&&&&&pass&&&&&&&&finally:&&&&&&&&&&&&threadLimiter.release() #释放for i in range(100):&&&&cur=test()&&&&cur.start()for i in range(100):&&&&cur.join()
说明:以上程序可以控制多线程并发数为10,超过这个数量会引发异常。
除了自带的方法,我们还可以设计其他方案:
threads=[]
创建所有线程
for i in range(10):
t=threading.Thread(target=run,args=(i,))
threads.append(t)
启动列表中的线程
for t in threads:
while True:
#判断正在运行的线程数量,如果小于5则退出while循环,
#进入for循环启动新的进程.否则就一直在while循环进入死循环
if(len(threading.enumerate())&5):
1234567891011121314151617
threads=[]'''创建所有线程'''for i in range(10): t=threading.Thread(target=run,args=(i,)) threads.append(t)'''启动列表中的线程'''for t in threads:&&&&t.start()&&&&while True:&&&&&&&&#判断正在运行的线程数量,如果小于5则退出while循环,&&&&&&&&#进入for循环启动新的进程.否则就一直在while循环进入死循环&&&&&&&&if(len(threading.enumerate())&5):&&&&&&&&&&&&break
以上两种方式皆可以,本人更喜欢用下面那种方式。
import threadpool
def ThreadFun(arg1,arg2):
def main():
device_list=[object1,object2,object3......,objectn]#需要处理的设备个数
task_pool=threadpool.ThreadPool(8)#8是线程池中线程的个数
request_list=[]#存放任务列表
#首先构造任务列表
for device in device_list:
request_list.append(threadpool.makeRequests(ThreadFun,[((device, ), {})]))
#将每个任务放到线程池中,等待线程池中线程各自读取任务,然后进行处理,使用了map函数,不了解的可以去了解一下。
map(task_pool.putRequest,request_list)
#等待所有任务处理完成,则返回,如果没有处理完,则一直阻塞
task_pool.poll()
if __name__=="__main__":
12345678910111213141516
import threadpooldef ThreadFun(arg1,arg2):&&&&passdef main():&&&&device_list=[object1,object2,object3......,objectn]#需要处理的设备个数&&&&task_pool=threadpool.ThreadPool(8)#8是线程池中线程的个数&&&&request_list=[]#存放任务列表&&&&#首先构造任务列表&&&&for device in device_list:&&&&&&&&request_list.append(threadpool.makeRequests(ThreadFun,[((device, ), {})]))&&&&#将每个任务放到线程池中,等待线程池中线程各自读取任务,然后进行处理,使用了map函数,不了解的可以去了解一下。&&&&map(task_pool.putRequest,request_list)&&&&#等待所有任务处理完成,则返回,如果没有处理完,则一直阻塞&&&&task_pool.poll()if __name__=="__main__":&&&&main()
多进程问题,可以赶赴现场,其他关于多线程问题,可以下方留言讨论
申明:本文谈不上原创,其中借鉴了网上很多大牛的文章,本人只是在此测试论述Python多线程相关问题,并简单介绍Python多线程的基本用法,为新手朋友解惑。
可能感兴趣的话题
request_list.append(threadpool.makeRequests(ThreadFun,[((device, ), {})]))
request_list.append(threadpool.makeRequests(ThreadFun,[((device, ), {})]))应该变成
request_list.append(threadpool.makeRequests(ThreadFun,[((device, ), {})])[0])
原因是 &span class="crayon-v"&threadpool&/span&&span class="crayon-sy"&.&/span&&span class="crayon-e"&makeRequests 返回的是一个list&/span&
request_list.append(threadpool.makeRequests(ThreadFun,[((device, ), {})])[0])&原因是 &span class="crayon-v"&threadpool&/span&&span class="crayon-sy"&.&/span&&span class="crayon-e"&makeRequests 返回的是一个list&/span&
关于 Python 频道
Python频道分享 Python 开发技术、相关的行业动态。
新浪微博:
推荐微信号
(加好友请注明来意)
– 好的话题、有启发的回复、值得信赖的圈子
– 分享和发现有价值的内容与观点
– 为IT单身男女服务的征婚传播平台
– 优秀的工具资源导航
– 翻译传播优秀的外文文章
– 国内外的精选文章
– UI,网页,交互和用户体验
– 专注iOS技术分享
– 专注Android技术分享
– JavaScript, HTML5, CSS
– 专注Java技术分享
– 专注Python技术分享
& 2018 伯乐在线

我要回帖

更多关于 协程 爬虫 的文章

 

随机推荐