qq怎样解除语聊大厅举报被恶意举报谁可以帮忙解破

ThreadPoolExecutor机制及各参数讲解
本文系转载,原文地址:/blog/2184680
ThreadPoolExecutor机制&
1、ThreadPoolExecutor作为java.util.concurrent包对外提供基础实现,以内部线程池的形式对外提供管理任务执行,线程调度,线程池管理等等服务;&
2、Executors方法提供的线程服务,都是通过参数设置来实现不同的线程池机制。&
3、先来了解其线程池管理的机制,有助于正确使用,避免错误使用导致严重故障。同时可以根据自己的需求实现自己的线程池&
二、核心构造方法讲解&
下面是ThreadPoolExecutor最核心的构造方法&
ThreadPoolExecutor(int corePoolSize,
maximumPoolSize,
& & & long
keepAliveTime,
& & & TimeUnit
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
& if (corePoolSize & 0 ||
maximumPoolSize &= 0 ||
maximumPoolSize & corePoolSize ||
keepAliveTime & 0)
& & & throw new
IllegalArgumentException();
& if (workQueue == null || threadFactory == null
|| handler == null)
& & & throw new
NullPointerException();
& this.corePoolSize = corePoolS
& this.maximumPoolSize = maximumPoolS
& this.workQueue = workQ
& this.keepAliveTime =
unit.toNanos(keepAliveTime);
& this.threadFactory = threadF
& this.handler =
构造方法参数讲解&
corePoolSize
核心线程池大小
maximumPoolSize
最大线程池大小
keepAliveTime
线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间
keepAliveTime时间单位
阻塞任务队列
threadFactory
新建线程工厂
RejectedExecutionHandler
当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理
重点讲解:&
其中比较容易让人误解的是:corePoolSize,maximumPoolSize,workQueue之间关系。&
1.当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。&
2.当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行&
3.当workQueue已满,且maximumPoolSize&corePoolSize时,新提交任务会创建新线程执行任务&
4.当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理&
5.当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程&
6.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭&
线程管理机制图示:
三、Executors提供的线程池配置方案&
1、构造一个固定线程数目的线程池,配置的corePoolSize与maximumPoolSize大小相同,同时使用了一个无界LinkedBlockingQueue存放阻塞任务,因此多余的任务将存在再阻塞队列,不会由RejectedExecutionHandler处理&
static ExecutorService newFixedThreadPool(int nThreads)
& return new ThreadPoolExecutor(nThreads,
& 0L, TimeUnit.MILLISECONDS,
& new LinkedBlockingQueue());
2、构造一个缓冲功能的线程池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及一个无容量的阻塞队列
SynchronousQueue,因此任务提交之后,将会创建新的线程执行;线程空闲超过60s将会销毁&
public static ExecutorService newCachedThreadPool() {
& return new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
& 60L, TimeUnit.SECONDS,
& new SynchronousQueue());
3、构造一个只支持一个线程的线程池,配置corePoolSize=maximumPoolSize=1,无界阻塞队列LinkedBlockingQueue;保证任务由一个线程串行执行&
public static ExecutorService newSingleThreadExecutor()
& return new
FinalizableDelegatedExecutorService
& & & (new
ThreadPoolExecutor(1, 1,
TimeUnit.MILLISECONDS,
LinkedBlockingQueue()));
4、构造有定时功能的线程池,配置corePoolSize,无界延迟阻塞队列DelayedWorkQueue;有意思的是:maximumPoolSize=Integer.MAX_VALUE,由于DelayedWorkQueue是无界队列,所以这个值是没有意义的&
public static ScheduledExecutorService
newScheduledThreadPool(int corePoolSize) {
& return new
ScheduledThreadPoolExecutor(corePoolSize);
public static ScheduledExecutorService
newScheduledThreadPool(
corePoolSize, ThreadFactory threadFactory) {
& return new
ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
public ScheduledThreadPoolExecutor(int corePoolSize,
&ThreadFactory threadFactory) {
& super(corePoolSize, Integer.MAX_VALUE, 0,
TimeUnit.NANOSECONDS,
& new DelayedWorkQueue(), threadFactory);
四、定制属于自己的非阻塞线程池&
import java.util.concurrent.ArrayBlockingQ
import java.util.concurrent.ExecutorS
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadF
import java.util.concurrent.ThreadPoolE
import java.util.concurrent.TimeU
import java.util.concurrent.atomic.AtomicI
public class CustomThreadPoolExecutor {
private ThreadPoolExecutor pool =
public void init() {
pool = new ThreadPoolExecutor(
TimeUnit.MINUTES,
new ArrayBlockingQueue(10),
new CustomThreadFactory(),
new CustomRejectedExecutionHandler());
public void destory() {
if(pool != null) {
pool.shutdownNow();
public ExecutorService getCustomThreadPoolExecutor() {
return this.
private class CustomThreadFactory implements ThreadFactory
private AtomicInteger count = new AtomicInteger(0);
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
String threadName =
CustomThreadPoolExecutor.class.getSimpleName() +
count.addAndGet(1);
System.out.println(threadName);
t.setName(threadName);
private class CustomRejectedExecutionHandler implements
RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor
executor) {
// 记录异常
// 报警处理等
System.out.println("error.............");
// 测试构造的线程池
public static void main(String[] args) {
CustomThreadPoolExecutor exec = new
CustomThreadPoolExecutor();
// 1.初始化
exec.init();
ExecutorService pool =
exec.getCustomThreadPoolExecutor();
for(int i=1; i&100; i++) {
System.out.println("提交第" + i + "个任务!");
pool.execute(new Runnable() {
public void run() {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("running=====");
// 2.销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行了
// exec.destory();
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
方法中建立一个核心线程数为30个,缓冲队列有10个的线程池。每个线程任务,执行时会先睡眠3秒,保证提交10任务时,线程数目被占用完,再提交30任务时,阻塞队列被占用完,,这样提交第41个任务是,会交给CustomRejectedExecutionHandler
异常处理类来处理。
提交任务的代码如下:
public void execute(Runnable command) {
& if (command == null)
& & & throw new
NullPointerException();
& int c = ctl.get();
& if (workerCountOf(c) & corePoolSize) {
(addWorker(command, true))
ctl.get();
& if (isRunning(c) &&
workQueue.offer(command)) {
recheck = ctl.get();
& & & if (!
isRunning(recheck) && remove(command))
& & reject(command);
& & & else if
(workerCountOf(recheck) == 0)
& & addWorker(null, false);
& else if (!addWorker(command, false))
reject(command);
注意:41以后提交的任务就不能正常处理了,因为,execute中提交到任务队列是用的offer方法,如上面代码,这个方法是非阻塞的,所以就会交给CustomRejectedExecutionHandler
来处理,所以对于大数据量的任务来说,这种线程池,如果不设置队列长度会OOM,设置队列长度,会有任务得不到处理,接下来我们构建一个阻塞的自定义线程池&
五、定制属于自己的阻塞线程池&
package com.tongbanjie.
import java.util.concurrent.ArrayBlockingQ
import java.util.concurrent.ExecutorS
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadF
import java.util.concurrent.ThreadPoolE
import java.util.concurrent.TimeU
import java.util.concurrent.atomic.AtomicI
public class CustomThreadPoolExecutor {
& & private
ThreadPoolExecutor pool = &
& & public void init() {
& pool = new ThreadPoolExecutor(
& & TimeUnit.MINUTES,
& & new ArrayBlockingQueue(5),
& & new CustomThreadFactory(),
CustomRejectedExecutionHandler()); &
& & public void destory() {
& if(pool != null) { &
pool.shutdownNow(); &
& & public ExecutorService
getCustomThreadPoolExecutor() { &
& return this. &
& & private class
CustomThreadFactory implements ThreadFactory {
& private AtomicInteger count = new
AtomicInteger(0); &
& @Override &
& public Thread newThread(Runnable r) {
& & & Thread t =
new Thread(r); &
& & & String
threadName = CustomThreadPoolExecutor.class.getSimpleName() +
count.addAndGet(1); &
System.out.println(threadName); &
t.setName(threadName); &
& & private class
CustomRejectedExecutionHandler implements RejectedExecutionHandler
& @Override &
& public void rejectedExecution(Runnable r,
ThreadPoolExecutor executor) { &
& // 核心改造点,由blockingqueue的offer改成put阻塞方法
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
& & // 测试构造的线程池
& & public static void
main(String[] args) { &
& CustomThreadPoolExecutor exec = new
CustomThreadPoolExecutor(); &
& // 1.初始化 &
& exec.init(); &
& ExecutorService pool =
exec.getCustomThreadPoolExecutor(); &
& for(int i=1; i&100; i++) {
System.out.println("提交第" + i + "个任务!"); &
pool.execute(new Runnable() { &
& & @Override
& & public void run() {
& System.out.println("&&&task is
running=====");&
TimeUnit.SECONDS.sleep(10);
& } catch (InterruptedException e) {
e.printStackTrace(); &
& // 2.销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行了
& // exec.destory(); &
Thread.sleep(10000); &
& } catch (InterruptedException e) {
e.printStackTrace(); &
解释:当提交任务被拒绝时,进入拒绝机制,我们实现拒绝方法,把任务重新用阻塞提交方法put提交,实现阻塞提交任务功能,防止队列过大,OOM,提交被拒绝方法在下面&
public void execute(Runnable command) {
& if (command == null)
& & & throw new
NullPointerException();
& int c = ctl.get();
& if (workerCountOf(c) & corePoolSize) {
(addWorker(command, true))
ctl.get();
& if (isRunning(c) &&
workQueue.offer(command)) {
recheck = ctl.get();
& & & if (!
isRunning(recheck) && remove(command))
& & reject(command);
& & & else if
(workerCountOf(recheck) == 0)
& & addWorker(null, false);
& else if (!addWorker(command, false))
& & & // 进入拒绝机制,
我们把runnable任务拿出来,重新用阻塞操作put,来实现提交阻塞功能
reject(command);
1、用ThreadPoolExecutor自定义线程池,看线程是的用途,如果任务量不大,可以用无界队列,如果任务量非常大,要用有界队列,防止OOM&
2、如果任务量很大,还要求每个任务都处理成功,要对提交的任务进行阻塞提交,重写拒绝机制,改为阻塞提交。保证不抛弃一个任务&
3、最大线程数一般设为2N+1最好,N是CPU核数&
4、核心线程数,看应用,如果是任务,一天跑一次,设置为0,合适,因为跑完就停掉了,如果是常用线程池,看任务量,是保留一个核心还是几个核心线程数&
5、如果要获取任务执行结果,用CompletionService,但是注意,获取任务的结果的要重新开一个线程获取,如果在主线程获取,就要等任务都提交后才获取,就会阻塞大量任务结果,队列过大OOM,所以最好异步开个线程获取结果
已投稿到:
以上网友发言只代表其个人观点,不代表新浪网的观点或立场。博客访问: 1641750
博文数量: 290
博客积分: 5485
博客等级: 大校
技术积分: 3718
注册时间:
多读书,多做事,广交朋友,趣味丛生
IT168企业级官微
微信号:IT168qiye
系统架构师大会
微信号:SACC2013
分类: Java
线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:&
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,&
long keepAliveTime, TimeUnit unit,&
BlockingQueue&Runnable& workQueue,&
RejectedExecutionHandler handler)&
corePoolSize: 线程池维护线程的最少数量&
maximumPoolSize:线程池维护线程的最大数量&
keepAliveTime: 线程池维护线程所允许的空闲时间&
unit: 线程池维护线程所允许的空闲时间的单位&
workQueue: 线程池所使用的缓冲队列&
handler: 线程池对拒绝任务的处理策略&
一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。&
当一个任务通过execute(Runnable)方法欲添加到线程池时:&
如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。&
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。&
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。&
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。&
也就是:处理任务的优先级为:&
核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。&
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。&
unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:&
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。&
workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue&
handler有四个选择:&
ThreadPoolExecutor.AbortPolicy()&
抛出java.util.concurrent.RejectedExecutionException异常&
ThreadPoolExecutor.CallerRunsPolicy()&
重试添加当前的任务,他会自动重复调用execute()方法&
ThreadPoolExecutor.DiscardOldestPolicy()&
抛弃旧的任务&
ThreadPoolExecutor.DiscardPolicy()&
抛弃当前的任务&
二、一般用法举例&
package demo;
import java.io.Serializable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestThreadPool2
&&&&private static int produceTaskSleepTime = 2;
&&&&private static int produceTaskMaxNumber = 10;
&&&&public static void main(String[] args)
&&&&&&&&// 构造一个线程池
&&&&&&&&ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new ArrayBlockingQueue&Runnable&(3),
&&&&&&&&&&&&&&&&new ThreadPoolExecutor.DiscardOldestPolicy());
&&&&&&&&for (int i = 1; i &= produceTaskMaxNumber; i++)
&&&&&&&&&&&&try
&&&&&&&&&&&&{
&&&&&&&&&&&&&&&&// 产生一个任务,并将其加入到线程池
&&&&&&&&&&&&&&&&String task = "task@ " + i;
&&&&&&&&&&&&&&&&System.out.println("put " + task);
&&&&&&&&&&&&&&&&threadPool.execute(new ThreadPoolTask(task));
&&&&&&&&&&&&&&&&// 便于观察,等待一段时间
&&&&&&&&&&&&&&&&Thread.sleep(produceTaskSleepTime);
&&&&&&&&&&&&}
&&&&&&&&&&&&catch (Exception e)
&&&&&&&&&&&&{
&&&&&&&&&&&&&&&&e.printStackTrace();
&&&&&&&&&&&&}
&* 线程池执行的任务
class ThreadPoolTask implements Runnable, Serializable
&&&&private static final long serialVersionUID = 0;
&&&&private static int consumeTaskSleepTime = 2000;
&&&&// 保存任务所需要的数据
&&&&private Object threadPoolTaskData;
&&&&ThreadPoolTask(Object tasks)
&&&&&&&&this.threadPoolTaskData = tasks;
&&&&public void run()
&&&&&&&&// 处理一个任务,这里的处理方式太简单了,仅仅是一个打印语句
&&&&&&&&System.out.println(Thread.currentThread().getName());
&&&&&&&&System.out.println("start .." + threadPoolTaskData);
&&&&&&&&try
&&&&&&&&&&&&// //便于观察,等待一段时间
&&&&&&&&&&&&Thread.sleep(consumeTaskSleepTime);
&&&&&&&&catch (Exception e)
&&&&&&&&&&&&e.printStackTrace();
&&&&&&&&threadPoolTaskData = null;
&&&&public Object getTask()
&&&&&&&&return this.threadPoolTaskData;
1、在这段程序中,一个任务就是一个Runnable类型的对象,也就是一个ThreadPoolTask类型的对象。&
2、一般来说任务除了处理方式外,还需要处理的数据,处理的数据通过构造方法传给任务。&
3、在这段程序中,main()方法相当于一个残忍的领导,他派发出许多任务,丢给一个叫 threadPool的任劳任怨的小组来做。&
这个小组里面队员至少有两个,如果他们两个忙不过来,任务就被放到任务列表里面。&
如果积压的任务过多,多到任务列表都装不下(超过3个)的时候,就雇佣新的队员来帮忙。但是基于成本的考虑,不能雇佣太多的队员,至多只能雇佣 4个。&
如果四个队员都在忙时,再有新的任务,这个小组就处理不了了,任务就会被通过一种策略来处理,我们的处理方式是不停的派发,直到接受这个任务为止(更残忍!呵呵)。&
因为队员工作是需要成本的,如果工作很闲,闲到 3SECONDS都没有新的任务了,那么有的队员就会被解雇了,但是,为了小组的正常运转,即使工作再闲,小组的队员也不能少于两个。&
4、通过调整 produceTaskSleepTime和 consumeTaskSleepTime的大小来实现对派发任务和处理任务的速度的控制,改变这两个值就可以观察不同速率下程序的工作情况。&
5、通过调整4中所指的数据,再加上调整任务丢弃策略,换上其他三种策略,就可以看出不同策略下的不同处理方式。&
6、对于其他的使用方法,参看jdk的帮助,很容易理解和使用。&
另一个例子:&
package demo;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorTest
&&&&private static int queueDeep = 4;
&&&&public void createThreadPool()
&&&&&&&&/*
&&&&&&&&&* 创建线程池,最小线程数为2,最大线程数为4,线程池维护线程的空闲时间为3秒,
&&&&&&&&&* 使用队列深度为4的有界队列,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,
&&&&&&&&&* 然后重试执行程序(如果再次失败,则重复此过程),里面已经根据队列深度对任务加载进行了控制。
&&&&&&&&&*/
&&&&&&&&ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new ArrayBlockingQueue&Runnable&(queueDeep),
&&&&&&&&&&&&&&&&new ThreadPoolExecutor.DiscardOldestPolicy());
&&&&&&&&// 向线程池中添加 10 个任务
&&&&&&&&for (int i = 0; i & 10; i++)
&&&&&&&&&&&&try
&&&&&&&&&&&&{
&&&&&&&&&&&&&&&&Thread.sleep(1);
&&&&&&&&&&&&}
&&&&&&&&&&&&catch (InterruptedException e)
&&&&&&&&&&&&{
&&&&&&&&&&&&&&&&e.printStackTrace();
&&&&&&&&&&&&}
&&&&&&&&&&&&while (getQueueSize(tpe.getQueue()) &= queueDeep)
&&&&&&&&&&&&{
&&&&&&&&&&&&&&&&System.out.println("队列已满,等3秒再添加任务");
&&&&&&&&&&&&&&&&try
&&&&&&&&&&&&&&&&{
&&&&&&&&&&&&&&&&&&&&Thread.sleep(3000);
&&&&&&&&&&&&&&&&}
&&&&&&&&&&&&&&&&catch (InterruptedException e)
&&&&&&&&&&&&&&&&{
&&&&&&&&&&&&&&&&&&&&e.printStackTrace();
&&&&&&&&&&&&&&&&}
&&&&&&&&&&&&}
&&&&&&&&&&&&TaskThreadPool ttp = new TaskThreadPool(i);
&&&&&&&&&&&&System.out.println("put i:" + i);
&&&&&&&&&&&&tpe.execute(ttp);
&&&&&&&&tpe.shutdown();
&&&&private synchronized int getQueueSize(Queue queue)
&&&&&&&&return queue.size();
&&&&public static void main(String[] args)
&&&&&&&&ThreadPoolExecutorTest test = new ThreadPoolExecutorTest();
&&&&&&&&test.createThreadPool();
&&&&class TaskThreadPool implements Runnable
&&&&&&&&private int index;
&&&&&&&&public TaskThreadPool(int index)
&&&&&&&&&&&&this.index = index;
&&&&&&&&public void run()
&&&&&&&&&&&&System.out.println(Thread.currentThread() + " index:" + index);
&&&&&&&&&&&&try
&&&&&&&&&&&&{
&&&&&&&&&&&&&&&&Thread.sleep(3000);
&&&&&&&&&&&&}
&&&&&&&&&&&&catch (InterruptedException e)
&&&&&&&&&&&&{
&&&&&&&&&&&&&&&&e.printStackTrace();
&&&&&&&&&&&&}
阅读(22990) | 评论(0) | 转发(3) |
相关热门文章
给主人留下些什么吧!~~
请登录后评论。Java并发编程:线程池的使用-android100学习网
Java并发编程:线程池的使用
Java并发编程:线程池的使用
在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:
  如果并发的线程数量很多,并且每个线程都是执行一个时...
Java并发编程:线程池的使用
在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:
  如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
  那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?
  在Java中可以通过线程池来达到这样的效果。今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,然后再讲述它的实现原理,接着给出了它的使用示例,最后讨论了一下如何合理配置线程池的大小。
  以下是本文的目录大纲:
  一.Java中的ThreadPoolExecutor类
  二.深入剖析线程池实现原理
  三.使用示例
  四.如何合理配置线程池的大小 
  若有不正之处请多多谅解,并欢迎批评指正。
一.Java中的ThreadPoolExecutor类
  java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。
  在ThreadPoolExecutor类中提供了四个构造方法:
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue&Runnable& workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue&Runnable& workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue&Runnable& workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue&Runnable& workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
  从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。
  下面解释下一下构造器中各个参数的含义:
corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS;
TimeUnit.HOURS;
TimeUnit.MINUTES;
TimeUnit.SECONDS;
TimeUnit.MILLISECONDS;
TimeUnit.MICROSECONDS;
TimeUnit.NANOSECONDS;
workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQ
LinkedBlockingQ
SynchronousQ
  ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
threadFactory:线程工厂,主要用来创建线程;
handler:表示当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
  具体参数的配置与线程池的关系将在下一节讲述。
  从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService,我们来看一下AbstractExecutorService的实现:
public abstract class AbstractExecutorService implements ExecutorService {
protected &T& RunnableFuture&T& newTaskFor(Runnable runnable, T value) { };
protected &T& RunnableFuture&T& newTaskFor(Callable&T& callable) { };
public Future&?& submit(Runnable task) {};
public &T& Future&T& submit(Runnable task, T result) { };
public &T& Future&T& submit(Callable&T& task) { };
private &T& T doInvokeAny(Collection&? extends Callable&T&& tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
public &T& T invokeAny(Collection&? extends Callable&T&& tasks)
throws InterruptedException, ExecutionException {
public &T& T invokeAny(Collection&? extends Callable&T&& tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
public &T& List&Future&T&& invokeAll(Collection&? extends Callable&T&& tasks)
throws InterruptedException {
public &T& List&Future&T&& invokeAll(Collection&? extends Callable&T&& tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
  AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。
  我们接着看ExecutorService接口的实现:
public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedE
&T& Future&T& submit(Callable&T& task);
&T& Future&T& submit(Runnable task, T result);
Future&?& submit(Runnable task);
&T& List&Future&T&& invokeAll(Collection&? extends Callable&T&& tasks)
throws InterruptedE
&T& List&Future&T&& invokeAll(Collection&? extends Callable&T&& tasks,
long timeout, TimeUnit unit)
throws InterruptedE
&T& T invokeAny(Collection&? extends Callable&T&& tasks)
throws InterruptedException, ExecutionE
&T& T invokeAny(Collection&? extends Callable&T&& tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutE
  而ExecutorService又是继承了Executor接口,我们看一下Executor接口的实现:
public interface Executor {
void execute(Runnable command);
  到这里,大家应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。
  Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;
  然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
  抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;
  然后ThreadPoolExecutor继承了类AbstractExecutorService。
  在ThreadPoolExecutor类中有几个非常重要的方法:
shutdown()
shutdownNow()
  execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
  submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。
  shutdown()和shutdownNow()是用来关闭线程池的。
  还有很多其他的方法:
  比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,有兴趣的朋友可以自行查阅API。
二.深入剖析线程池实现原理
  在上一节我们从宏观上介绍了ThreadPoolExecutor,下面我们来深入解析一下线程池的具体实现原理,将从下面几个方面讲解:
  1.线程池状态
  2.任务的执行
  3.线程池中的线程初始化
  4.任务缓存队列及排队策略
  5.任务拒绝策略
  6.线程池的关闭
  7.线程池容量的动态调整
1.线程池状态
  在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:
volatile int runS
static final int RUNNING
static final int SHUTDOWN
static final int STOP
static final int TERMINATED = 3;
  runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;
  下面的几个static final变量表示runState可能的几个取值。
  当创建线程池后,初始时,线程池处于RUNNING状态;
  如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
  如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;
  当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
2.任务的执行
  在了解将任务提交给线程池到任务执行完毕整个过程之前,我们先来看一下ThreadPoolExecutor类中其他的一些比较重要成员变量:
private final BlockingQueue&Runnable& workQ
//任务缓存队列,用来存放等待执行的任务
private final ReentrantLock mainLock = new ReentrantLock();
//线程池的主要状态锁,对线程池状态(比如线程池大小
//、runState等)的改变都要使用这个锁
private final HashSet&Worker& workers = new HashSet&Worker&();
//用来存放工作集
private volatile long
keepAliveT
//线程存货时间
private volatile boolean allowCoreThreadTimeO
//是否允许为核心线程设置存活时间
private volatile int
//核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int
maximumPoolS
//线程池最大能容忍的线程数
private volatile int
//线程池中当前的线程数
private volatile RejectedExecutionH //任务拒绝策略
private volatile ThreadFactory threadF
//线程工厂,用来创建线程
private int largestPoolS
//用来记录线程池中曾经出现过的最大线程数
private long completedTaskC
//用来记录已经执行完毕的任务个数
  每个变量的作用都已经标明出来了,这里要重点解释一下corePoolSize、maximumPoolSize、largestPoolSize三个变量。
  corePoolSize在很多地方被翻译成核心池大小,其实我的理解这个就是线程池的大小。举个简单的例子:
  假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。
  因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;
  当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;
  如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;
  然后就将任务也分配给这4个临时工人做;
  如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。
  当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。
  这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。
  也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。
  不过为了方便理解,在本文后面还是将corePoolSize翻译成核心池大小。
  largestPoolSize只是一个用来起记录作用的变量,用来记录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系。
  下面我们进入正题,看一下任务从提交到最终执行完毕经历了哪些过程。
  在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize &= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
  上面的代码可能看起来不是那么容易理解,下面我们一句一句解释:
  首先,判断提交的任务command是否为null,若是null,则抛出空指针异常;
  接着是这句,这句要好好理解一下:
if (poolSize &= corePoolSize || !addIfUnderCorePoolSize(command))
  由于是或条件运算符,所以先计算前半部分的值,如果线程池中当前线程数不小于核心池大小,那么就会直接进入下面的if语句块了。
  如果线程池中当前线程数小于核心池大小,则接着执行后半部分,也就是执行
addIfUnderCorePoolSize(command)
  如果执行完addIfUnderCorePoolSize这个方法返回false,则继续执行下面的if语句块,否则整个方法就直接执行完毕了。
  如果执行完addIfUnderCorePoolSize这个方法返回false,然后接着判断:
if (runState == RUNNING && workQueue.offer(command))
  如果当前线程池处于RUNNING状态,则将任务放入任务缓存队列;如果当前线程池不处于RUNNING状态或者任务放入缓存队列失败,则执行:
addIfUnderMaximumPoolSize(command)
  如果执行addIfUnderMaximumPoolSize方法失败,则执行reject()方法进行任务拒绝处理。
  回到前面:
if (runState == RUNNING && workQueue.offer(command))
  这句的执行,如果说当前线程池处于RUNNING状态且将任务放入任务缓存队列成功,则继续进行判断:
if (runState != RUNNING || poolSize == 0)
  这句判断是为了防止在将此任务添加进任务缓存队列的同时其他线程突然调用shutdown或者shutdownNow方法关闭了线程池的一种应急措施。如果是这样就执行:
ensureQueuedTaskHandled(command)
  进行应急处理,从名字可以看出是保证 添加到任务缓存队列中的任务得到处理。
  我们接着看2个关键方法的实现:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t =
final ReentrantLock mainLock = this.mainL
mainLock.lock();
if (poolSize & corePoolSize && runState == RUNNING)
t = addThread(firstTask);
//创建线程去执行firstTask任务
} finally {
mainLock.unlock();
if (t == null)
t.start();
  这个是addIfUnderCorePoolSize方法的具体实现,从名字可以看出它的意图就是当低于核心吃大小时执行的方法。下面看其具体实现,首先获取到锁,因为这地方涉及到线程池状态的变化,先通过if语句判断当前线程池中的线程数目是否小于核心池大小,有朋友也许会有疑问:前面在execute()方法中不是已经判断过了吗,只有线程池当前线程数目小于核心池大小才会执行addIfUnderCorePoolSize方法的,为何这地方还要继续判断?原因很简单,前面的判断过程中并没有加锁,因此可能在execute方法判断的时候poolSize小于corePoolSize,而判断完之后,在其他线程中又向线程池提交了任务,就可能导致poolSize不小于corePoolSize了,所以需要在这个地方继续判断。然后接着判断线程池的状态是否为RUNNING,原因也很简单,因为有可能在其他线程中调用了shutdown或者shutdownNow方法。然后就是执行
t = addThread(firstTask);
  这个方法也非常关键,传进去的参数为提交的任务,返回值为Thread类型。然后接着在下面判断t是否为空,为空则表明创建线程失败(即poolSize&=corePoolSize或者runState不等于RUNNING),否则调用t.start()方法启动线程。
  我们来看一下addThread方法的实现:
private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w);
//创建一个线程,执行任务
if (t != null) {
w.thread =
//将创建的线程的引用赋值为w的成员变量
workers.add(w);
int nt = ++poolS
//当前线程数加1
if (nt & largestPoolSize)
largestPoolSize =
  在addThread方法中,首先用提交的任务创建了一个Worker对象,然后调用线程工厂threadFactory创建了一个新的线程t,然后将线程t的引用赋值给了Worker对象的成员变量thread,接着通过workers.add(w)将Worker对象添加到工作集当中。
  下面我们看一下Worker类的实现:
private final class Worker implements Runnable {
private final ReentrantLock runLock = new ReentrantLock();
private Runnable firstT
volatile long completedT
Worker(Runnable firstTask) {
this.firstTask = firstT
boolean isActive() {
return runLock.isLocked();
void interruptIfIdle() {
final ReentrantLock runLock = this.runL
if (runLock.tryLock()) {
if (thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
void interruptNow() {
thread.interrupt();
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runL
runLock.lock();
if (runState & STOP &&
Thread.interrupted() &&
runState &= STOP)
boolean ran =
beforeExecute(thread, task);
//beforeExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户可以根据
//自己需要重载这个方法和后面的afterExecute方法来进行一些统计信息,比如某个任务的执行时间等
task.run();
afterExecute(task, null);
++completedT
} catch (RuntimeException ex) {
afterExecute(task, ex);
} finally {
runLock.unlock();
public void run() {
Runnable task = firstT
firstTask =
while (task != null || (task = getTask()) != null) {
runTask(task);
} finally {
workerDone(this);
//当任务队列中没有任务时,进行清理工作
  它实际上实现了Runnable接口,因此上面的Thread t = threadFactory.newThread(w);效果跟下面这句的效果基本一样:
Thread t = new Thread(w);
  相当于传进去了一个Runnable任务,在线程t中执行这个Runnable。
  既然Worker实现了Runnable接口,那么自然最核心的方法便是run()方法了:
public void run() {
Runnable task = firstT
firstTask =
while (task != null || (task = getTask()) != null) {
runTask(task);
} finally {
workerDone(this);
  从run方法的实现可以看出,它首先执行的是通过构造器传进来的任务firstTask,在调用runTask()执行完firstTask之后,在while循环里面不断通过getTask()去取新的任务来执行,那么去哪里取呢?自然是从任务缓存队列里面去取,getTask是ThreadPoolExecutor类中的方法,并不是Worker类中的方法,下面是getTask方法的实现:
Runnable getTask() {
for (;;) {
int state = runS
if (state & SHUTDOWN)
if (state == SHUTDOWN)
// Help drain queue
r = workQueue.poll();
else if (poolSize & corePoolSize || allowCoreThreadTimeOut) //如果线程数大于核心池大小或者允许为核心池线程设置空闲时间,
//则通过poll取任务,若等待一定的时间取不到任务,则返回null
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
r = workQueue.take();
if (r != null)
if (workerCanExit()) {
//如果没取到任务,即r为null,则判断当前的worker是否可以退出
if (runState &= SHUTDOWN) // Wake up others
interruptIdleWorkers();
//中断处于空闲状态的worker
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
  在getTask中,先判断当前线程池状态,如果runState大于SHUTDOWN(即为STOP或者TERMINATED),则直接返回null。
  如果runState为SHUTDOWN或者RUNNING,则从任务缓存队列取任务。
  如果当前线程池的线程数大于核心池大小corePoolSize或者允许为核心池中的线程设置空闲存活时间,则调用poll(time,timeUnit)来取任务,这个方法会等待一定的时间,如果取不到任务就返回null。
  然后判断取到的任务r是否为null,为null则通过调用workerCanExit()方法来判断当前worker是否可以退出,我们看一下workerCanExit()的实现:
private boolean workerCanExit() {
final ReentrantLock mainLock = this.mainL
mainLock.lock();
boolean canE
//如果runState大于等于STOP,或者任务缓存队列为空了
允许为核心池线程设置空闲存活时间并且线程池中的线程数目大于1
canExit = runState &= STOP ||
workQueue.isEmpty() ||
(allowCoreThreadTimeOut &&
poolSize & Math.max(1, corePoolSize));
} finally {
mainLock.unlock();
return canE
  也就是说如果线程池处于STOP状态、或者任务队列已为空或者允许为核心池线程设置空闲存活时间并且线程数大于1时,允许worker退出。如果允许worker退出,则调用interruptIdleWorkers()中断处于空闲状态的worker,我们看一下interruptIdleWorkers()的实现:
void interruptIdleWorkers() {
final ReentrantLock mainLock = this.mainL
mainLock.lock();
for (Worker w : workers)
//实际上调用的是worker的interruptIfIdle()方法
w.interruptIfIdle();
} finally {
mainLock.unlock();
  从实现可以看出,它实际上调用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:
void interruptIfIdle() {
final ReentrantLock runLock = this.runL
if (runLock.tryLock()) {
//注意这里,是调用tryLock()来获取锁的,因为如果当前worker正在执行任务,锁已经被获取了,是无法获取到锁的
//如果成功获取了锁,说明当前worker处于空闲状态
if (thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
  这里有一个非常巧妙的设计方式,假如我们来设计线程池,可能会有一个任务分派线程,当发现有线程空闲时,就从任务缓存队列中取一个任务交给空闲线程执行。但是在这里,并没有采用这样的方式,因为这样会要额外地对任务分派线程进行管理,无形地会增加难度和复杂度,这里直接让执行完任务的线程去任务缓存队列里面取任务来执行。
  我们再看addIfUnderMaximumPoolSize方法的实现,这个方法的实现思想和addIfUnderCorePoolSize方法的实现思想非常相似,唯一的区别在于addIfUnderMaximumPoolSize方法是在线程池中的线程数达到了核心池大小并且往任务队列中添加任务失败的情况下执行的:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t =
final ReentrantLock mainLock = this.mainL
mainLock.lock();
if (poolSize & maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
if (t == null)
t.start();
  看到没有,其实它和addIfUnderCorePoolSize方法的实现基本一模一样,只是if语句判断条件中的poolSize & maximumPoolSize不同而已。
  到这里,大部分朋友应该对任务提交给线程池之后到被执行的整个过程有了一个基本的了解,下面总结一下:
  1)首先,要清楚corePoolSize和maximumPoolSize的含义;
  2)其次,要知道Worker是用来起到什么作用的;
  3)要知道任务提交给线程池之后的处理策略,这里总结一下主要有4点:
如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
如果当前线程池中的线程数目&=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
3.线程池中的线程初始化
  默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。
  在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:
prestartCoreThread():初始化一个核心线程;
prestartAllCoreThreads():初始化所有核心线程
  下面是这2个方法的实现:
public boolean prestartCoreThread() {
return addIfUnderCorePoolSize(null); //注意传进去的参数是null
public int prestartAllCoreThreads() {
int n = 0;
while (addIfUnderCorePoolSize(null))//注意传进去的参数是null
  注意上面传进去的参数是null,根据第2小节的分析可知如果传进去的参数为null,则最后执行线程会阻塞在getTask方法中的
r = workQueue.take();
  即等待任务队列中有任务。
4.任务缓存队列及排队策略
  在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。
  workQueue的类型为BlockingQueue&Runnable&,通常可以取下面三种类型:
  1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
  2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
  3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
5.任务拒绝策略
  当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
6.线程池的关闭
  ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
7.线程池容量的动态调整
  ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
setCorePoolSize:设置核心池大小
setMaximumPoolSize:设置线程池最大能创建的线程数目大小
  当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。
三.使用示例
  前面我们讨论了关于线程池的实现原理,这一节我们来看一下它的具体使用:
public class Test {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue&Runnable&(5));
for(int i=0;i&15;i++){
MyTask myTask = new MyTask(i);
executor.execute(myTask);
System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());
executor.shutdown();
class MyTask implements Runnable {
private int taskN
public MyTask(int num) {
this.taskNum =
public void run() {
System.out.println("正在执行task "+taskNum);
Thread.currentThread().sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("task "+taskNum+"执行完毕");
  执行结果:
正在执行task 0
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 1
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 2
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 3
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 4
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
线程池中线程数目:6,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 10
线程池中线程数目:7,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 11
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 12
线程池中线程数目:9,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 13
线程池中线程数目:10,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 14
task 3执行完毕
task 0执行完毕
task 2执行完毕
task 1执行完毕
正在执行task 8
正在执行task 7
正在执行task 6
正在执行task 5
task 4执行完毕
task 10执行完毕
task 11执行完毕
task 13执行完毕
task 12执行完毕
正在执行task 9
task 14执行完毕
task 8执行完毕
task 5执行完毕
task 7执行完毕
task 6执行完毕
task 9执行完毕
  从执行结果可以看出,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。如果上面程序中,将for循环中改成执行20个任务,就会抛出任务拒绝异常了。
  不过在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:
Executors.newCachedThreadPool();
//创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor();
//创建容量为1的缓冲池
Executors.newFixedThreadPool(int);
//创建固定容量大小的缓冲池
  下面是这三个静态方法的具体实现;
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue&Runnable&());
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue&Runnable&()));
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue&Runnable&());
  从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。
  newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
  newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;
  newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。
  实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。
  另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。
四.如何合理配置线程池的大小
  本节来讨论一个比较重要的话题:如何合理配置线程池大小,仅供参考。
  一般需要根据任务的类型来配置线程池大小:
  如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1
  如果是IO密集型任务,参考值可以设置为2*NCPU
  当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。
/Javabc/874186.true/Javabc/874186.htmlTechArticleJava并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如...

我要回帖

更多关于 qq语聊大厅哪里最刺激 的文章

 

随机推荐