python循环怎么用python 多线程 循环去运行

Python 多线程,限制线程数运行, - 为程序员服务
为程序员服务
多线程,限制线程数运行,
[Python]代码
#encoding:utf8
import threading
import time
def func(sleeptime):
global data
print threading.currentThread().getName()
time.sleep(sleeptime)
threads = []
for i in range(0,40):
t = threading.Thread(target=func,args=(i,))
threads.append(t)
for t in threads:
while True:
#判断正在运行的线程数量,如果小于5则退出while循环,
#进入for循环启动新的进程.否则就一直在while循环进入死循环
if(len(threading.enumerate()) & 5):
您可能的代码
相关聚客文章
荣誉:1290
相关专栏文章类型:Python,C++ & Qt4,创建时间:三月 14,
标题无“转载”即原创文章,版权所有。转载请注明来源:/blogs/article/80/。
经常用Python写程序的朋友应该都知道怎么用threading模块来启动一个新线程。主要有两种方式:
直接使用threading.Thread类型。这种方法相对简单。比如下面这两行代码演示了如何启动一个新线程,并且当新线程调用sendData()函数时传入'arg1', arg2'两个参数:
sendDataThread=threading.Thread(target=sendData, args=('arg1', 'arg2'))
sendDataThread.start()
继承threading.Thread类,重载它的run()方法。这种方法比较麻烦,它的好处是,很方便把一个长的函数拆分成好几部分,方便多个线程之间的同步。比如下面这个发送数据的线程,它会发送数据的时候做统计,其中makeConnection()使用了连接池:
class SendDataThread(threading.Thread):
daemon=True
def __init__(self, counter, dataSource):
threading.Thread.__init__(self)
#多个线程共享一个counter
self.counter, self.dataSource=counter, dataSource
def run(self):
self.sendData()
del self.counter, self.dataSource
def sendData(self):
#此处省略连接,取数据等操作,分别是makeConnection()和makePacket()两个方法
connection=self.makeConnection()
data=self.dataSource.makePacket()
sentBytes=connection.send(data)
self.counter.increase(sentBytes)
def makeConnection(self):
&从连接池里面返回一个空闲的连接。&
两种方法实际上差不多。如果线程做的工作比较简单,只有一个函数就使用第一种。如果线程的工作繁复,可以拆成多个方法,就写成一个类。不过需要注意的是,第二种方法最好把run()方法像上面一样写成try...finally... 的形式,主要是为了避免循环引用。Python的GC使用了简单的引用计数,所以,如果Counter引用了SendDataThread,而SendDataThread也引用了Counter就会发生循环引用,两个对象可能不会被释放。这时,最好在SendDataThread执行完毕时解除对Counter的引用。当然,如果确定不会发生循环引用,可以不用这样做。
第二种方法需要继承自Thread,有时候不太方便。我们可以结合第一种方法,稍微变通一样。比如这样:
import threading, struct
class Writer:
def write(self, fout, string, blockSize):
for block in self.splitToBlock(string, blockSize):
header=struct.pack(&!i&, len(block))
fout.write(header)
fout.write(block)
def splitToBlock(self, string, blockSize):
data=string.encode(&utf-8&)
for i in range(0, len(data), blockSize):
yield data[i:i+blockSize]
writer=Writer()
threading.Thread(target=writer.write, args=(fout, string, blockSize)).start()
最后一句推广开来可以弄成一个decorater,让同步执行的方法立即变成异步执行。
import threading, functools
def async(wrapped):
def wrapper(*args, **kwargs):
t=threading.Thread(target=wrapped, args=args, kwargs=kwargs)
t.daemon=True
functools.update_wrapper(wrapper, wrapped)
return wrapper
于是write()可以改写成:
def write(self, fout, string, blockSize):
pass #和原来一样
下次直接调用writer.write()就变成异步运行了。
在Python里面使用线程最不爽的恐怕是不能强制结束线程。相比之下 ,Java语言的线程类支持Thread.interrupt(),当线程阻塞在Lock或者Event的时候仍然可以很方便地让线程退出。在Python里面,阻塞的调用多是直接调用C语言的系统函数,而不是像Java那样重写各种阻塞的IO。好处是Python的应用程序可能会拥有更好的IO性能,但是直接的坏处就是必须由用户自行处理阻塞函数。而且像Java那样重写阻塞IO的函数工作量太大,也给扩展Python解释器和移植工作带来很多的麻烦。
那么,既然不能强制性地结束线程,只好用一些迂回地办法,让线程自己退出。伪代码类似于这样:
class DoSomethingThread(threading.Thread):
daemon=True
def run(self):
while True:
self.doBlockIO()
if self.exiting:break
def shutdown(self):
self.exiting=True
self.interruptBlockIO()
#self.join()
有两个关键,一是每次在阻塞函数返回后都判断一下标志位,看是不是应该结束线程了。通常每个循环体内只有一个阻塞函数,所以把判断放在循环语句上面就行了。二是采用某种办法让阻塞函数返回。各种阻塞函数都不尽相同。
如果阻塞函数支持超时,那就方便了,直接在阻塞函数内传入超时时间,比如0.2秒,连self.interruptBlockIO()这一句都可以省略掉。Python默认创建的socket是永远阻塞的,可以使用socket.settimeout()来设置超时时间——要小心捕获socket.timeout异常。更好的办法是使用select模块,它不仅可以设置超时,还能够在一个线程内同时处理socket的读和写。socket.connect和socket.accept()都支持超时。threading模块的各种锁、信号都支持超时。
如果阻塞函数不支持超时,那就只好采用一些山寨办法了。假定socket.accept()不支持超时的话,我们可以在shutdown函数里面创建一个新socket连接自己监听的端口,让accept()函数返回。
如果调用的C函数确实没办法让它从阻塞状态退出,可以考虑使用multiprocessing模块。因为强制结束一个进程不是一个多大不了的事。不过multiprocessing与threading相比,交换数据的效率会低一点。
细心的朋友可能会发现上面的几段代码都设置了daemon属性。将这个属性设置为True可以保证线程在主线程退出后也会立即退出。一般说来,主线程是负责用户UI的,如果用户关闭了程序,线程继续运行有什么意义呢。这个属性其实挺常用的,不知道为什么默认值是False。
在使用多线程的时候还要注意import语句不支持多线程,也就是不能有多个线程同时在执行import语句。所以最好不要在模块导入过程中再启动一个新线程。比如模块这样写是不好的:
#encoding:utf-8
import threading, logging
logging.basicConfig()
logger=logging.getLogger(__name__)
def doSomething():
&做一项麻烦的工作,并写日志。&
logger.error(&doSomething&)
t=threading.Thread(target=doSomething)
这个mymodule写得不好。因为import mymodule将会发生死锁,原因是logger.error()的源代码内有一句import multiprocessing。如果主线程和doSomething()都执行到import那里,Python就会锁在import语句那里不会退出。于是t.join()会被阻塞,永远不会返回。事实上,我曾经发现即使去掉t.join()也有可能会出错,不过不知道怎么重现。一个比较好的写法是把线程的启动代码移进函数,不搞并发执行import语句就没关系了。
import mymodule
mymodule.doSomething()
经常使用PyQt的朋友可能会发现PyQt也有一个QThread,而且与threading.Thread相比,还多出了QThread.terminate(),它看起来能省却很多的麻烦。那么,写PyQt程序的时候到底应该选择哪一个呢?
我的看法,QThread不应被使用。
因为QThread是专为C++环境设计的,不适合Python程序。如果看过Qt文档的话,可以注意到QThread.terminate()本身也不是不推荐被使用的,如果一定要使用,就要调用QThread::setTerminationEnabled()设置可中断标志。如果C++代码仔细设置好可中断标志,QThread::terminate()就没有副作用。但是Python环境没办法像C++那样,在读写共享数据的时候设置可中断标志,除非你修改Python虚拟机,把这个调用整合到Python的源代码里面。
另外,当QThread没有被其它对象引用的时候,根据Python的内存管理模型,这个QThread会被删除。不幸的是,如果QThread的析构函数检测到线程是被强制结束的,它会打印出一行错误信息,然后结束整个进程。于是整个Python程序会意外地退出。显然,这种行为相当的不好。竟然不是抛出一个异常。如果你不想因为一个小小的可挽回的错误导致整个程序失败的话,最好不要使用QThread。
另外,尽量不要在非主线程使用QObject的对象。Qt的文档说,QObject与QThread有特别的关联。有以下几个注意事项:
只有创建QObject的进程才能使用它。不能在一个线程里面创建QTimer,而在另外一个线程里面调用QTimer.start()。
在一个线程里面创建的QObject不能在另外一个线程里面被销毁。
Qt的内存管理模型区别“父QObject”和“子QObject”。Qt要求“子QObject”必须和“父QObject”同一个线程。
又一次很不幸,第2条和Python的GC有冲突。Python的GC不固定地在某个线程里面运行。如果刚好回收了一个不在当前线程里面创建的QObject,程序就有可能会崩溃。注:貌似PyQt的开发者提到会解决这个问题,不知道现在怎么样了。
标题无“转载”即原创文章,版权所有。转载请注明来源:/blogs/article/80/。
cnDenis(三月 17,
老鱼兄,你的Blog有RSS输出吗?我想用GR订着看
老鱼(三月 18,
没有哦。。等我有空加上去。
shelper(三月 28,
这个rss是必须的:)Python多线程编程
1.全局解释器锁定& & Python虚拟机使用GIL(Global Interpreter Lock,全局解释器锁定)来互斥线程对共享资源的访问,暂时无法利用多处理器的优势。虽然python解释器可以&运行&多个线程,但在任意时刻,不管有多少的处理器,任何时候都总是只有一个线程在执行。对于I/O密集型任务,使用线程一般是没有问题的,而对于涉及大量CPU计算的应用程序而言,使用线程来细分工作没有任何好处,用户最好使用子进程和消息传递。2.threading& & &python的threading模块提供Thread类和各种同步原语,用于编写多线程的程序。2.1. Thread(target=None,name=None,args=(),kwargs={})& & 此函数创建一个Thread实例。target是一个可调用函数,线程启动时,run()方法将调用此对象。name是线程的名称,默认是'Thread-N'的格式。args是传递给target函数的参数元组,kwargs是传递给target函数的关键字参数的字典。& & Thread实例t支持以下方法和属性:t.start() & & & & & & & & 启动线程,就是调用run()方法t.run() & & & & & & & & & 可以在Thread的子类中重新定义t.join([timeout]) & & &阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout(可选参数)。t.is_live() & & & & & & & &返回线程的活动状态t.name & & & & & & & & &&线程的名称t.ident & & & & & & & & & &线程标识符t.daemon & & & & & & & &设置线程是否为守护线程,必须在t.start()前设置。当设置为True时,主线程要退出时,不必等守护线程完成。'& & 创建线程有两种方法:创建一个Thread实例,传递给它一个函数import threadingimport timedef clock(nsec):
whhile True:
print 'Now is %s'%time.ctime()
time.sleep(nsec)t=threading.Thread(target=clock,args=(5,))t.daemon=True
#设置为守护线程t.start()& & &2. 从Thread派生出一个子类,然后创建一个子类的实例import threadingimport timeclass ClockThread(threading.Thread):
def __init__(self,nsec):
threading.Thread.__init__(self)
self.daemon=True
#设置为守护线程
self.nsec=nsec
def run():
while True:
print 'Now is s%'%time.ctime()
time.sleep(self.nsec)t=ClockThread(5)t.start()  后一种方法比较python一点。& & & 由于线程会无限循环,所以设置daemon为True,这样当进程结束时,线程也将被销毁。& & & 例如有个数数程序,一个线程从1数到9,另一个线程从a数到j,每个线程都要耗费9s,如果要顺序执行的话需耗费18s。import threadingimport timeclass CountThread(threading.Thread):
def __init__(self,func,name):
threading.Thread.__init__(self)
self.name=str(name)
self.func=func
def run(self):
apply(self.func)def numcount():
print threading.currentThread().name,'start at : ',time.ctime()
for i in range(10):
time.sleep(1)
print threading.currentThread().name,'done at : ',time.ctime()def alphacount():
print threading.currentThread().name,'start at : ',time.ctime()
for i in range(97,107):
print chr(i)
time.sleep(1)
print threading.currentThread().getName(),'done at : ',time.ctime()def main():
funclist=[numcount,alphacount]
threads=[]
for i in funclist:
t=CountThread(i,i.__name__)
threads.append(t)
for t in threads:
for t in threads:
print 'All done at :',time.ctime()if __name__=='__main__':
main()  结果:numcount
start at :
Fri Feb 07 12:19:28alphacount
start at : Fri Feb 07 12:19:28 c3d4 e5f6g7 h8 i9jalphacount numcount
done at : Fri Feb 07 12:19:38 2014 Fri Feb 07 12:19:38 2014All done at : Fri Feb 07 12:19:38 2014& &&10s就完成了。& & 举一个更清晰的看t.join()作用的例子:import threadingimport timedef join():
print 'in Threadjoin'
time.sleep(1)
print 'out Threadjoin'Threadjoin=threading.Thread(target=join,name='Threadjoin')def context(Threadjoin):
print 'in Threadcontext'
Threadjoin.start()
Threadjoin.join()
#Threadjoin线程开始阻塞,等待Threadjoin完成
print 'out Threadcontext'Threadcontext=threading.Thread(target=context,name='Threadcontext',args=(Threadjoin,))Threadcontext.start()  结果:&&& in Threadcontextin Threadjoinout Threadjoinout Threadcontext2.2. 线程的同步& & 线程运行在创建它的进程内部,共享所有的数据和资源,但都有自己独立的栈和堆。编写并发编程的难点在于同步和访问共享数据。多个任务同时更新一个数据结构可能导致数据损坏和程序状态不一致(也就是竞争条件)。要解决这个问题,必须找出程序的关键代码段,并使用互斥锁和其它类似的同步手段保护他们。2.2.1 Lock& &&原语锁定(互斥锁定)是一个同步原语,状态是"已锁定"或"未锁定"之一。两个方法acquire()和release()用于修改锁定的状态。如果有多个线程在等待获取锁定,当锁定释放时,只有一个线程能获得它。构造方法:&Lock() &:创建新的Lock对象,初始状态为未锁定实例方法:&Lock.acquire([timeout]): 使线程进入同步阻塞状态,尝试获得锁定。 成功获取锁定返回True,无法获取锁定返回False。Lock.release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。& & Python多线程分块读取大文件:import threadingimport osseekposition=0blocksize=1000000filesize=0def getFilesize(filename):
f=open(filename)
f.seek(0,os.SSEK_END)
filesize=f.tell()
return filesizedef parsefile(filename):
global seekposition,filesize
f=open(filename)
while True:
lock.acquire()
#seekposition是线程共享的,修改时需要锁定
startposition=seekposition
endposition=(startposition+blocksize) if (startposition+blocksize)&filesize else filesize
seekposition=endposition
lock.release()
if startposition==filesize:
elif startposition&0:
f.seek(startposition)
f.readline()
#分成的block第一行可能不是完整的一行,略掉不处理,而是作为上一个block的最后一行处理
position=f.tell()
outfile=open(str(endposition)+'.txt','w')
while position&=endposition:
line=f.readline()
outfile.write(line)
position=f.tell()
outfile.close()
f.close()def main(filename):
global seekposition,filesize
filesize=getFilesize(filename)
lock=threading.Lock()
threads=[]
for i in range(4):
t=threading.Thread(target=parsefile,args=(filename,))
threads.append(t)
for t in threads:
for t in threads:
t.join()if __name__=='__main__':
filename=''
main(filename)2.2.2 RLock& & 多重锁定是一个类似于Lock对象的同步原语,但同一个线程可以多次获取它。这允许拥有锁定的线程执行嵌套的acquire()和release()操作。可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。import threadingimport timerlock=threading.RLock()count=0class MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global count
if rlock.acquire():
print '%s set count : %d'%(self.name,count)
time.sleep(1)
if rlock.acquire():
print '%s set count : %d'%(self.name,count)
time.sleep(1)
rlock.release()
rlock.release()if __name__=='__main__':
for i in range(5):
t=MyThread()
t.start()2.2.3 信号量Semaphore& & 信号量是一个基于计数器的同步原语,调用acquire()方法时此计数器减1,调用release()方法时此计数器加1.如果计数器为0,acquire()方法会被阻塞,直到其他线程调用release()为止。& & 下面是一个说明信号量的好例子,引用自/huxi/archive//1765808.htmlimport threadingimport time semaphore = threading.Semaphore(2)
# 计数器初值为2 def func():
# 请求Semaphore,成功后计数器-1;计数器为0时阻塞
print '%s acquire semaphore...' % threading.currentThread().getName()
if semaphore.acquire():
print '%s get semaphore' % threading.currentThread().getName()
time.sleep(4)
# 释放Semaphore,计数器+1
print '%s release semaphore' % threading.currentThread().getName()
semaphore.release() t1 = threading.Thread(target=func)t2 = threading.Thread(target=func)t3 = threading.Thread(target=func)t4 = threading.Thread(target=func)t1.start()t2.start()t3.start()t4.start() time.sleep(2) # 没有获得semaphore的主线程也可以调用release# 若使用BoundedSemaphore,t4释放semaphore时将抛出异常print 'MainThread release semaphore without acquire'semaphore.release()2.2.4 Condition& & 条件变量是构建在另一个锁定上的同步原语。典型的用法是生产者-使用者问题,其中一个线程生产的数据供另一个线程使用。& & 构造方法:& & Condition([lock/rlock])& & 实例方法:&& & acquire([timeout])/release(): 调用关联的锁的相应方法。&& & wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁,直到另一个线程在条件变量上执行notify()或notify_all()方法将其唤醒为止。使用前线程必须已获得锁定,否则将抛出异常。&& & notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。&& & notify_all(): 唤醒所有等待此条件的线程。import threadingcv=threading.Condition()alist=[]def producer():
global alist
cv.acquire()
for i in range(10):
alist.append(i)
cv.notify()
cv.release()
def consumer():
cv.acquire()
while alist is None:
cv.release()
print alisttproducer = threading.Thread(target=producer)tconsumer = threading.Thread(target=consumer)tconsumer.start()tproducer.start()2.3 local()& & 返回local对象,用于保存线程的数据,管理 thread-local(线程局部的)数据。对于同一个local,线程无法访问其他线程设置的属性;线程设置的属性不会被其他线程设置的同名属性替换。&可以把local看成是一个&线程-属性字典&的字典,local封装了从自身使用线程作为 key检索对应的属性字典、再使用属性名作为key检索属性值的细节。 import threading mydata=threading.local()mydata.number=42mydata.color='red'print mydata.__dict__log=[]def foo():
items=mydata.__dict__.items()
#在此线程中mydata属性字典为空,无number与color属性
items.sort()
log.append(items)
mydata.number=11
log.append(mydata.number)t=threading.Thread(target=foo)t.start()t.join()print logprint mydata.number
#仍为423. Queue& & 尽管在Python中可以使用各种锁定和同步原语的组合编写非常传统的多线程程序,但有一种更优的编程方式&&即将多线程程序组织为多个独立任务的集合,这些线程之间通过消息队列进行通讯。Queue模块可以用于线程间通讯,让各个线程共享数据。& & 构造方法:& & Queue():创建一个FIFO队列& & LifoQueue():创建一个LIFO栈& & 实例方法:&& & q.put(item):将item放入队列& & q.get():从队列中删除一项,然后返回这个项目& & q.task_done():队列中数据的使用者用来指示对于项目的处理已结束。从队列中删除的每一项都应该调用一次。& & q.join():阻塞直到队列中的所有项目均被处理为止。& &&python核心编程中有关于多线程编程和Queue结合使用的思路:UserThread:负责读取客户的输入,可能是一个I/O通道。程序可以创建多个线程,每个客户一个,输入放置到队列中。ReplyThread:负责把用户的输入取出来。import threadingimport Queueq=Queue.Queue()class MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.daemon=True
def run(self):
while True:
item=q.get()
print threading.current_thread().name,'get',item
q.task_done()for i in range(4):
t=MyThread()
t.start()for i in range(100):
q.put(i)q.join()4. 线程终止与挂起& & 下面选自《Python参考手册》& & 线程没有任何方法可用于强制终止或挂起。由于在设计上,如果某个线程获取了锁定,在它释放之前强制终止线程,将导致整个应用程序出现死锁。& & 可以自己构建终止功能:import threadingclass MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self._terminate=False
#设置终止标志
self.lock=threading.Lock()
def terminal(self):
self._terminal=True
def acquire(self):
self.lock.acquire()
def release(self):
self.lock.release()
def run(self):
while True:
if self._terminal:
#标志为True,则终止线程
self.lock.acquire()
statements
self.lock.release()
statements  也可以利用Queue传递终止信号import threadingimport Queueclass MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.queue=Queue.Queue()
def send(self,item):
self.queue.put(item)
def close(self):
self.queue.put(None)
self.queue.join()
def run(self):
while True:
item=self.queue.get()
if item is None:
print item
self.queue.task_done()
self.queue.task_done()t=MyThread()t.start()t.send('hello')t.send('world')t.close()  
最新教程周点击榜
微信扫一扫

我要回帖

更多关于 python for循环多线程 的文章

 

随机推荐