excutor c 使用task开启线程池池为啥 remove (task )失败?

博客分类:
深入研究线程池
一.什么是线程池?
线程池就是以一个或多个线程[循环执行]多个应用逻辑的线程集合.
注意这里用了线程集合的概念是我生造的,目的是为了区分执行一批应用逻辑的多个线程和
线程组的区别.关于线程组的概念请参阅基础部分.
一般而言,线程池有以下几个部分:
1.完成主要任务的一个或多个线程.
2.用于调度管理的管理线程.
3.要求执行的任务队列.
那么如果一个线程循环执行一段代码是否是线程池?
如果极端而言,应该算,但实际上循环代码应该算上一个逻辑单元.我们说最最弱化的线程池
应该是循环执行多个逻辑单元.也就是有一批要执行的任务,这些任务被独立为多个不同的执行
单元.比如:
int x = 0;
while(true){
x ++;
}
这就不能说循环中执行多个逻辑单元,因为它只是简单地对循环外部的初始变量执行++操作.
而如果已经有一个队列
ArrayList al = new ArrayList();
for(int i=0;i&10000;i++){
al.add(new AClass());
}
然后在一个线程中执行:
while(al.size() != 0){
AClass a = (AClass)al.remove(0);
a.businessMethod();
}
我们说这个线程就是循环执行多个逻辑单元.可以说这个线程是弱化的线程池.我们习惯上把这些
相对独立的逻辑单元称为任务.
二.为什么要创建线程池?
线程池属于对象池.所有对象池都具有一个非常重要的共性,就是为了最大程度复用对象.那么
线程池的最重要的特征也就是最大程度利用线程.
从编程模型模型上说讲,在处理多任务时,每个任务一个线程是非常好的模型.如果确实可以这么
做我们将可以使用编程模型更清楚,更优化.但是在实际应用中,每个任务一个线程会使用系统限
入"过度切换"和"过度开销"的泥潭.
打个比方,如果可能,生活中每个人一辆房车,上面有休息,娱乐,餐饮等生活措施.而且道路交道永远
不堵车,那是多么美好的梦中王国啊.可是残酷的现实告诉我们,那是不可能的.不仅每个人一辆车
需要无数多的社会资源,而且地球上所能容纳的车辆总数是有限制的.
首先,创建线程本身需要额外(相对于执行任务而必须的资源)的开销.
作业系统在每创建一个线程时,至少需要创建以下资源:
线程内核对象用于对线程上下文的管理.
用户模式执行栈.
内核模式执行栈.
这些资源被线程占有后作业系统和用户都无法使用.
相反的过程,销毁线程需要回收资源,也需要一定开销.
其次,过多的线程将导致过度的切换.
线程切换带来的性能更是不可估量.系统完成线程切换要经过以下过程:
从用户模式切换到内核模式.
将CPU寄存器的值保存到当前线程的内核对象中.
打开一个自旋锁,根据调度策略决定下一个要执行的线程.释放自旋锁,如果要执行的线程不是同一
进程中的线程,还需要切换虚拟内存等进程环境.
将要执行的线程的内核对象的值写到CPU寄存器中.
切换到用户模式执行新线程的执行逻辑.
以上开销对于用户要执行的任务而言,都是额外的.更不可容忍的是,如果用户的任务逻辑都是很小
的单元,而新分配线程和线程切换的开销与任务逻辑需要的开销的比例可能会10:1,100:1,1000:1.
也就是你花了1000$买的衣服只穿了一天!
所以线程池的目的就是为了减少创建和切换线程的额外开销,利用已经的线程多次循环执行多个任
务从而提高系统的处理能力.也就是在"社会主义初级阶段"一件衣服应该尽量多穿一些天数.
[扩展知识]
尽管目前绝大多数JVM实现都是一个Java线程对应一个作业系统线程,但事实上(如果是我来实现JVM)
完全可以用一个作业系统线程执行多个Java线程,因为对于作业系统线程来说Java线程就是一个任务.
而且无论是作业系统线程或Java线程中都可以更细地划分为超细线程(纤程),即在线程内部实现对纤
程的调度利用纤程来执行任务.
三.如何实现线程池?
一个线程池至少应该具有以下几个方面的功能:
1.提供一个任务接口以便用户加入任务
由于Java不支持方法指针,所以操作(方法)只能绑定在对象上,将拥有操作的队象放入队列中,以便
工作线程能按一定策略获取对象然后执行其上的方法.
这里需要有两个组件,一是规定操作的任务接口:
interface ITask{
public void task();
}
一是存放操作的容器.容器可以根据调度策略来选择或自己实现,比如一个最简单的FIFO策略的容器
可以用LinkedList来实现.需要注意的是对这个容器的存取需要同步:
class TaskList{
&&& private LinkedList&ITask& tl = new LinkedList();
&&& public synchronized void addTask(ITask task){
&&&&&&& this.tl.add(task);
//notifyAll();
&&& }
&&& //加上addFistTask和addLastTask
&&& public synchronized ITask getTask(){
if(this.size() &= 0)
wait();
&&&&&&& return this.tl.poll();
&&& }
&&& //加上getFistTask和getLastTask
&&& public synchronized int getCount(){
return this.tl.size();
&&& }
&&& public synchronized void removeAll(){
this.tl.clear();
&&& }
}
加上addFistTask和addLastTask/getFistTask和getLastTask实现就可以实现简单的优先级调度.
2.工作线程
我们把用来执行用户任务的线程称为工作线程,工作线程就是不断从队列中获取任务对象并执行对象
上的业务方法:
class WorkThread extends Thread{
&&& private TaskL
&&& //多个工作线程共同从一个任务队列中获取任务,所以要从外面传入一个任务队列.
&&& public WorkThread(String name,TaskList list){
&&&&& super(name);
&&&&& this.list =
&&& }
&&& public void run(){
&&&&& while(true){&&&&
&&&&&&& ITask task =
&&&&&&& task = list.getTask();
&&&&&&& if(task != null) task.task();
&&&&& }
&&& }
}
在基础知识部分已经介绍了对线程状态的标记控制,请参照前面的知识自己加入对线线程状态的判断.
完成了这几个基础部分,就需要有一个对工作线程的调度线程.完成以下几个功能:
1.生成需要的工作线程.由于创建线程需要一定的开销,一定要注意所创建的所有线程不能超一个设定
的最大值.建议最大值不要超25.
2.动态自适应调整集合中线程数.当有太多的线程处于闲置状态时(队列中没有任务),应该按一定比例
销毁闲置了一定时的线程.如果队列中任务队列积压太多而工作线程总数没有超最大线程数时应该及时
创建工作线程直至达到是大值.
3.需要一个专门的后台线程定时扫描队列中任务与正在工作的线程总数,闲置的线程总数.
以上功能有不同优化方法实现,可以参考JDK的线程池实现.
/**ThreadPoolTaskExecutor的配置在网上找了很多解释没找到,看了下ThreadPoolExecutor的配置,名字差不多,应该含义也差不多。只不过ThreadPoolTaskExecutor对*/
ThreadPoolExecutor做了包装。
&bean id ="taskExecutor"
class ="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" &
&property name ="corePoolSize" value ="5" /&
&property name ="keepAliveSeconds" value ="300" /&
&property name ="maxPoolSize" value ="10" /&
&property name ="queueCapacity" value ="25" /&
线程的配置文件:
corePoolSize: 线程池维护线程的最少数量
keepAliveSeconds
线程池维护线程所允许的空闲时间
maxPoolSize
线程池维护线程的最大数量
queueCapacity 线程池所使用的缓冲队列
当一个任务通过execute(Runnable)方法欲添加到线程池时:
如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程 maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
浏览: 52871 次
来自: 南京
真心谢谢,解决了我头疼好几天的问题,谢谢谢谢
这样不就得要求 被请求一方必须按您的这种XML格式解析吗。
可以演示一下如何调用吗。不是很明白呢。
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'&nbsp>&nbsp
&nbsp>&nbsp
&nbsp>&nbsp
Android中线程池使用ExecutorService
摘要:关于Android中使用线程池对性能的优化以及线程池的原理,作用能理论,这里有一篇文章说得很透彻:http://android.jobbole.com/82092/本文旨在介绍Android中使用ExecutorService实现线程池及使用Runnable的Queue(建议在阅读本文之前先阅读上面文章补一下理论知识,老司机可忽略本提示~~~)。1.执行多个AsyncTask首先,Executors工厂类提供了五种功能不同的线程池,在附件demo的ExcutorTask类的构
关于Android中使用线程池对性能的优化以及线程池的原理,作用能理论,这里有一篇文章说得很透彻:http://android.jobbole.com/82092/
本文旨在介绍Android中使用
ExecutorService实现线程池及使用Runnable的Queue(建议在阅读本文之前先阅读上面文章补一下理论知识,老司机可忽略本提示~~~)。1.执行多个AsyncTask首先,Executors工厂类提供了五种功能不同的线程池,在附件demo的ExcutorTask类的构造方法中,分别创建了这相应的对象:public ExcutorTask() { // 每次只执行一条任务的线程池 singleTaskService = Executors.newSingleThreadExecutor(); // 每次执行限定个数任务的线程池 limitedTaskService = Executors.newFixedThreadPool(3); // 根据实际情况调整线程池中线程的数量的线程池 allTaskService = Executors.newCachedThreadPool(); // 指定时间里执行任务的线程池,可重复使用 scheduledTaskService = Executors.newScheduledThreadPool(3); // 返回一个可以控制线程池内线程定时或周期性执行某任务的线程池。只不过和上面的区别是该线程池大小为1 singleScheduledTaskService = Executors.newSingleThreadScheduledExecutor();}另外,该类提供了可以获取当前线程池的方法,线程池类型可以通过调用setServiceType(SERVICE_TYPE type)方法进行修改。public ExecutorService getExecutor() { if (mExecutor != null) return mExecutor; ExecutorService executor = limitedTaskService; switch (type) { case SINGLE: executor = singleTaskService; case LIMITED: executor = limitedTaskService; case ALL: executor = allTaskService; case SCHEDULED: executor = scheduledTaskService; case SINGLE_SCHEDULED: executor = singleScheduledTaskService; default:
} return executor;}附件的demo中的线程池中是同时执行的就是多个AsyncTask任务,AsyncTask使用ExcutorTask的方法也非常简单:AsyTaskItem asyitem = new AsyTaskItem(item, position);//添加任务asyitem.executeOnExecutor(task.getExecutor());list.add(asyitem);运行效果如下图所示,实例中每次执行3个任务,执行完成以后,会继续执行下面的任务。2.异步加载多张图片该功能的实现封装在AsyncImageLoader中private ExecutorService executorService = Executors.newFixedThreadPool(3);该类使用了newFixedThreadPool实现了可以同时执行3个任务的线程池,并实现了使用SoftReference实现的软缓存保存Drawable对象;public Map&String, SoftReference&Drawable&& imageCache = new HashMap&String, SoftReference&Drawable&&();下面是加载图片的方法,如果缓存中有对应Drawable对象,直接从缓存中加载,否则从链接获取的流中解析出Drawable对象使用。public void loadImage(final String url, final ImageCallback callback, final ImageView imageView) { if (imageCache.containsKey(url)) {//有缓存 SoftReference&Drawable& soft = imageCache.get(url); if (null != imageView) imageView.setImageDrawable(soft.get()); if (callback != null) { callback.onImageLoaded(soft.get()); } } else { executorService.execute(new Runnable() { @Override public void run() { final Drawable drawable = loadFromURl(url); imageCache.put(url, new SoftReference&Drawable&(drawable)); handler.post(new Runnable() { @Override public void run() { if (null != imageView) imageView.setImageDrawable(drawable); if (callback != null) { callback.onImageLoaded(drawable); } } }); } }); }代码执行效果如下图所示:3.RunnableQueue该类实现了Runnable在Queue中列队执行的功能,可以通过addTask方法向Queue中添加任务:public void addTask(final T mr) { if (mES == null) { mES = Executors.newSingleThreadExecutor(); notifyWork(); } if (taskQueue == null) { taskQueue = new ConcurrentLinkedQueue&&(); } if (taskMap == null) { taskMap = new ConcurrentHashMap&&(); } mES.execute(new Runnable() { @Override public void run() { /** * 插入一个Runnable到任务队列中
* */ taskQueue.offer(mr); // taskQueue.add(mr); notifyWork(); } });}通过调用start方法可以执行列队中的任务,直到taskQueue.poll()返回nullpublic void start() { Log.v(TAG, &Queue Size --& & + taskQueue.size()); if (mES == null || taskQueue == null || taskMap == null) { Log.i(&KKK&, &某资源是不是已经被释放了?&); } mES.execute(new Runnable() { @Override public void run() { if (isRuning) { T myRunnable =
synchronized (lock) { // 从线程队列中取出一个Runnable对象来执行,如果此队列为空,则调用poll()方法会返回null myRunnable = taskQueue.poll();
if (myRunnable == null) { isNotify =
} } if (myRunnable != null) { taskMap.put(mES.submit(myRunnable), myRunnable); } } } });}这里需要注意一下,在Runnable的run()方法中,当耗时的任务完成以后,要调用RunnableQueue中的remove()方法将当前任务从列队中移除。
以上是的内容,更多
的内容,请您使用右上方搜索功能获取相关信息。
若你要投稿、删除文章请联系邮箱:zixun-group@service.aliyun.com,工作人员会在五个工作日内给你回复。
云服务器 ECS
可弹性伸缩、安全稳定、简单易用
&40.8元/月起
预测未发生的攻击
&24元/月起
邮箱低至5折
推荐购买再奖现金,最高25%
&200元/3月起
你可能还喜欢
你可能感兴趣
阿里云教程中心为您免费提供
Android中线程池使用ExecutorService相关信息,包括
的信息,所有Android中线程池使用ExecutorService相关内容均不代表阿里云的意见!投稿删除文章请联系邮箱:zixun-group@service.aliyun.com,工作人员会在五个工作日内答复
售前咨询热线
支持与服务
资源和社区
关注阿里云
International51CTO旗下网站
Java并发:juc Executor框架详解
Executor 框架是 juc 里提供的线程池的实现。前两天看了下 Executor 框架的一些源码,做个简单的总结。
作者:singleant来源:singleant的博客| 11:01
Executor 框架是 juc 里提供的线程池的实现。前两天看了下 Executor 框架的一些源码,做个简单的总结。
线程池大概的思路是维护一个的线程池用于执行提交的任务。我理解池的技术的主要意义有两个:
1. 资源的控制,如并发量限制。像连接池这种是对数据库资源的保护。
2. 资源的有效利用,如线程复用,避免频繁创建线程和线程上下文切换。
那么想象中设计一个线程池就需要有线程池大小、线程生命周期管理、等待队列等等功能,下面结合代码看看原理。
Excutor 整体结构如下:
Executor 接口定义了最基本的 execute 方法,用于接收用户提交任务。 ExecutorService 定义了线程池终止和创建及提交 futureTask 任务支持的方法。
AbstractExecutorService 是抽象类,主要实现了 ExecutorService 和 futureTask 相关的一些任务创建和提交的方法。
ThreadPoolExecutor 是最核心的一个类,是线程池的内部实现。线程池的功能都在这里实现了,平时用的最多的基本就是这个了。其源码很精练,远没当时想象的多。
ScheduledThreadPoolExecutor 在 ThreadPoolExecutor 的基础上提供了支持定时调度的功能。线程任务可以在一定延时时间后才被触发执行。
1、ThreadPoolExecutor 原理
1.1 ThreadPoolExecutor内部的几个重要属性
1.线程池本身的状态
volatile&int&runS& &static&final&int&RUNNING&=&0;& &static&final&int&SHUTDOWN&=&1;& &static&final&int&STOP&=&2;& &static&final&int&TERMINATED&=&3;&&
2.等待任务队列和工作集
private&final&BlockingQueue&Runnable&&workQ&&private&final&HashSet&Worker&&workers&=&new&HashSet&Worker&();&&
3.线程池的主要状态锁。线程池内部的状态变化 ( 如线程大小 ) 都需要基于此锁。
private&final&ReentrantLock&mainLock&=&new&ReentrantLock();&
4.线程的存活时间和大小
private&volatile&long&keepAliveT&private&volatile&boolean&allowCoreThreadTimeO&private&volatile&int&corePoolS&private&volatile&int&maximumPoolS&&private&volatile&int&poolS&&private&int&largestPoolS&&
5.线程工厂和拒绝策略
private&volatile&RejectedExecutionHandler&&&private&volatile&ThreadFactory&threadF&
6.线程池完成任务数
private&long&completedTaskC&
1.2 ThreadPoolExecutor 的内部工作原理
有了以上定义好的数据,下面来看看内部是如何实现的 。 Doug Lea 的整个思路总结起来就是 5 句话:
如果当前池大小 poolSize 小于 corePoolSize ,则创建新线程执行任务。
如果当前池大小 poolSize 大于 corePoolSize ,且等待队列未满,则进入等待队列
如果当前池大小 poolSize 大于 corePoolSize 且小于 maximumPoolSize ,且等待队列已满,则创建新线程执行任务。
如果当前池大小 poolSize 大于 corePoolSize 且大于 maximumPoolSize ,且等待队列已满,则调用拒绝策略来处理该任务。
线程池里的每个线程执行完任务后不会立刻退出,而是会去检查下等待队列里是否还有线程任务需要执行,如果在 keepAliveTime 里等不到新的任务了,那么线程就会退出。
下面看看代码实现:
线程池最重要的方法是由 Executor 接口定义的 execute 方法 , 是任务提交的入口。
我们看看 ThreadPoolExecutor.execute(Runnable cmd) 的实现:
public&void&execute(Runnable&command)&{ &&&&&&&&&if&(command&==&null) &&&&&&&&&&&&&throw&new&NullPointerException(); &&&&&&&&&if&(poolSize&&=&corePoolSize&||&!addIfUnderCorePoolSize(command))&{ &&&&&&&&&&&&&if&(runState&==&RUNNING&&&&workQueue.offer(command))&{ &&&&&&&&&&&&&&&&&if&(runState&!=&RUNNING&||&poolSize&==&0) &&&&&&&&&&&&&&&&&&&&&ensureQueuedTaskHandled(command); &&&&&&&&&&&&&} &&&&&&&&&&&&&else&if&(!addIfUnderMaximumPoolSize(command)) &&&&&&&&&&&&&&&&&reject(command);&&&&&&&&&&} &} &
解释如下:
当提交一个新的 Runnable 任务:
分支1 : 如果当前池大小小于 corePoolSize, 执行 addIfUnderCorePoolSize(command) , 如果线程池处于运行状态且 poolSize & corePoolSize addIfUnderCorePoolSize(command) 会做如下事情,将 Runnable 任务封装成 Worker 任务 , 创建新的 Thread ,执行 Worker 任务。如果不满足条件,则返回 false 。
代码如下:
&private&boolean&addIfUnderCorePoolSize(Runnable&firstTask)&{ &&&&&&&&&Thread&t&=&null; &&&&&&&&&final&ReentrantLock&mainLock&=&this.mainL &&&&&&&&&mainLock.lock(); &&&&&&&&&try&{ &&&&&&&&&&&&&if&(poolSize&&&corePoolSize&&&&runState&==&RUNNING) &&&&&&&&&&&&&&&&&t&=&addThread(firstTask); &&&&&&&&&}&finally&{ &&&&&&&&&&&&&mainLock.unlock(); &&&&&&&&&} &&&&&&&&&if&(t&==&null) &&&&&&&&&&&&&return&false; &&&&&&&&&t.start(); &&&&&&&&&return&true; &} &
分支2 : 如果大于 corePoolSize 或 1 失败失败,则:
如果等待队列未满,把 Runnable 任务加入到 workQueue 等待队列
workQueue .offer(command)
如多等待队列已经满了,调用 addIfUnderMaximumPoolSize(command) ,和 addIfUnderCorePoolSize 基本类似,只不过判断条件是 poolSize & maximumPoolSize 。如果大于 maximumPoolSize ,则把 Runnable 任务交由 RejectedExecutionHandler 来处理。
问题:如何实现线程的复用?
Doug Lea 的实现思路是 线程池里的每个线程执行完任务后不立刻退出,而是去检查下等待队列里是否还有线程任务需要执行,如果在 keepAliveTime 里等不到新的任务了,那么线程就会退出。这个功能的实现 关键在于 Worker 。线程池在执行 Runnable 任务的时候,并不单纯把 Runnable 任务交给创建一个 Thread 。而是会把 Runnable 任务封装成 Worker 任务。
下面看看 Worker 的实现:
代码很简单,可以看出, worker 里面包装了 firstTask 属性,在构造worker 的时候传进来的那个 Runnable 任务就是 firstTask 。 同时也实现了Runnable接口,所以是个代理模式,看看代理增加了哪些功能。 关键看 woker 的 run 方法:
public&void&run()&{ &&&&&&&&&&&&try&{ &&&&&&&&&&&&&&&&Runnable&task&=&firstT &&&&&&&&&&&&&&&&firstTask&=&null; &&&&&&&&&&&&&&&&while&(task&!=&null&||&(task&=&getTask())&!=&null)&{ &&&&&&&&&&&&&&&&&&&&runTask(task); &&&&&&&&&&&&&&&&&&&&task&=&null; &&&&&&&&&&&&&&&&} &&&&&&&&&&&&}&finally&{ &&&&&&&&&&&&&&&&workerDone(this); &&&&&&&&&&&&} &&&&&&&&} &
可以看出 worker 的 run 方法是一个循环,第一次循环运行的必然是 firstTask ,在运行完 firstTask 之后,并不会立刻结束,而是会调用 getTask 获取新的任务( getTask 会从等待队列里获取等待中的任务),如果 keepAliveTime 时间内得到新任务则继续执行,得不到新任务则那么线程才会退出。这样就保证了多个任务可以复用一个线程,而不是每次都创建新任务。 keepAliveTime 的逻辑在哪里实现的呢?主要是利用了 BlockingQueue 的 poll 方法支持等待。可看 getTask 的代码段:
if&(state&==&SHUTDOWN)&&&&&&&r&=&workQueue.poll(); &else&if&(poolSize&&&corePoolSize&||&allowCoreThreadTimeOut) &&&&&r&=&workQueue.poll(keepAliveTime,&TimeUnit.NANOSECONDS); &else&&&&&r&=&workQueue.take(); &
2.ThreadFactory 和R ejectedExecutionHandler
ThreadFactory 和 RejectedExecutionHandler是ThreadPoolExecutor的两个属性,也 可以认为是两个简单的扩展点 . ThreadFactory 是创建线程的工厂。
默认的线程工厂会创建一个带有& pool-poolNumber-thread-threadNumber &为名字的线程,如果我们有特别的需要,如线程组命名、优先级等,可以定制自己的 ThreadFactory 。
RejectedExecutionHandler 是拒绝的策略。常见有以下几种:
AbortPolicy :不执行,会抛出 RejectedExecutionException 异常。
CallerRunsPolicy :由调用者(调用线程池的主线程)执行。
DiscardOldestPolicy :抛弃等待队列中最老的。
DiscardPolicy: 不做任何处理,即抛弃当前任务。
3.ScheduledThreadPoolExecutor
ScheduleThreadPoolExecutor 是对ThreadPoolExecutor的集成。增加了定时触发线程任务的功能。需要注意
从内部实现看, ScheduleThreadPoolExecutor 使用的是 corePoolSize 线程和一个无界队列的固定大小的池,所以调整 maximumPoolSize 没有效果。无界队列是一个内部自定义的 DelayedWorkQueue 。
ScheduleThreadPoolExecutor 线程池接收定时任务的方法是 schedule ,看看内部实现:
public&ScheduledFuture&?&&schedule(Runnable&command, &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&long&delay, &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&TimeUnit&unit)&{ &&&&&if&(command&==&null&||&unit&==&null) &&&&&&&&&throw&new&NullPointerException(); &&&&&RunnableScheduledFuture&?&&t&=&decorateTask(command, &&&&&&&&&new&ScheduledFutureTask&Void&(command,&null, &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&triggerTime(delay,&unit))); &&&&&&delayedExecute(t); &&&&&return&t; &} &
以上代码会初始化一个 RunnableScheduledFuture 类型的任务 t, 并交给 delayedExecute 方法。 delayedExecute(t) 方法实现如下:
&&&&private&void&delayedExecute(Runnable&command)&{ &&&&&&&&&if&(isShutdown())&{ &&&&&&&&&&&&&reject(command); &&&&&&&&&&&&&return; &&&&&&&&&} &&&&&&&&&if&(getPoolSize()&&&getCorePoolSize()) &&&&&&&&&&&&&prestartCoreThread(); &&&&&&&&&&super.getQueue().add(command); &} &
如果当前线程池大小 poolSize 小于 CorePoolSize ,则创建一个新的线程,注意这里创建的线程是空的,不会把任务直接交给线程来做,而是把线程任务放到队列里。因为任务是要定时触发的,所以不能直接交给线程去执行。
问题: 那如何做到定时触发呢?
关键在于DelayedWorkQueue,它代理了 DelayQueue 。可以认为 DelayQueue 是这样一个队列(具体可以去看下源码,不详细分析):
1. 队列里的元素按照任务的 delay 时间长短升序排序, delay 时间短的在队头, delay 时间长的在队尾。
2. 从 DelayQueue 里 FIFO 的获取一个元素的时候,不会直接返回 head 。可能会阻塞,等到 head 节点到达 delay 时间后才能被获取。可以看下 DelayQueue 的 take 方法实现:
public&E&take()&throws&InterruptedException&{ &&&&&final&ReentrantLock&lock&=&this. &&&&&lock.lockInterruptibly(); &&&&&try&{ &&&&&&&&&for&(;;)&{ &&&&&&&&&&&&&E&first&=&q.peek(); &&&&&&&&&&&&&if&(first&==&null)&{ &&&&&&&&&&&&&&&&&available.await(); &&&&&&&&&&&&&}&else&{ &&&&&&&&&&&&&&&&&long&delay&=&&first.getDelay(TimeUnit.NANOSECONDS); &&&&&&&&&&&&&&&&&if&(delay&&&0)&{ &&&&&&&&&&&&&&&&&&&&&long&tl&=&available.awaitNanos(delay);&&&&&&&&&&&&&&&&&}&else&{ &&&&&&&&&&&&&&&&&&&&&E&x&=&q.poll(); &&&&&&&&&&&&&&&&&&&&&assert&x&!=&null; &&&&&&&&&&&&&&&&&&&&&if&(q.size()&!=&0) &&&&&&&&&&&&&&&&&&&&&&&&&available.signalAll();&&&&&&&&&&&&&&&&&&&&&&return&x; &&&&&&&&&&&&&&&&&} &&&&&&&&&&&&&} &&&&&&&&&} &&&&&}&finally&{ &&&&&&&&&lock.unlock(); &&&&&} &}&
4.线程池使用策略
通过以上的详解基本上能够定制出自己需要的策略了,下面简单介绍下Executors里面提供的一些常见线程池策略:
1.FixedThreadPool
public&static&ExecutorService&newFixedThreadPool(int&nThreads)&{ &&&&&return&new&ThreadPoolExecutor(nThreads,&nThreads, &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&0L,&TimeUnit.MILLISECONDS, &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&new&LinkedBlockingQueue&Runnable&()); &}&
实际上就是个不支持keepalivetime,且corePoolSize和maximumPoolSize相等的线程池。
2.SingleThreadExecutor
public&static&ExecutorService&newSingleThreadExecutor()&{ &&&&&return&new&FinalizableDelegatedExecutorService &&&&&&&&&(new&ThreadPoolExecutor(1,&1, &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&0L,&TimeUnit.MILLISECONDS, &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&new&LinkedBlockingQueue&Runnable&())); &}&
实际上就是个不支持keepalivetime,且corePoolSize和maximumPoolSize都等1的线程池。
3.CachedThreadPool
public&static&ExecutorService&newCachedThreadPool()&{ &&&&&return&new&ThreadPoolExecutor(0,&Integer.MAX_VALUE, &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&60L,&TimeUnit.SECONDS, &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&new&SynchronousQueue&Runnable&()); &}&
实际上就是个支持keepalivetime时间是60秒(线程空闲存活时间),且corePoolSize为0,maximumPoolSize无穷大的线程池。
4.SingleThreadScheduledExecutor
public&static&ScheduledExecutorService&newSingleThreadScheduledExecutor(ThreadFactory&threadFactory)&{ &&&&&return&new&DelegatedScheduledExecutorService &&&&&&&&&(new&ScheduledThreadPoolExecutor(1,&threadFactory)); &} &
实际上是个corePoolSize为1的ScheduledExecutor。上文说过ScheduledExecutor采用无界等待队列,所以maximumPoolSize没有作用。
5.ScheduledThreadPool
public&static&ScheduledExecutorService&newScheduledThreadPool(int&corePoolSize)&{ &&&&&return&new&ScheduledThreadPoolExecutor(corePoolSize); &}&
实际上是corePoolSize课设定的ScheduledExecutor。上文说过ScheduledExecutor采用无界等待队列,所以maximumPoolSize没有作用。
以上还不一定满足你的需要,完全可以根据自己需要去定制。
原文链接:
【编辑推荐】
【责任编辑: TEL:(010)】
大家都在看猜你喜欢
头条关注头条热点头条
24H热文一周话题本月最赞
讲师:244773人学习过
讲师:96587人学习过
讲师:31007人学习过
精选博文论坛热帖下载排行
本书由浅入深、循序渐进地介绍了目前流行的基于Eclipse的优秀框架。全书共分14章,内容涵盖了Eclipse基础、ANT资源构造、数据库应用开发、W...
订阅51CTO邮刊

我要回帖

更多关于 c 使用task开启线程池 的文章

 

随机推荐