有返回值的方法能用reentrantcracklock使用方法 吗

java线程新特性 - HelloWorld - ITeye技术网站
博客分类:
ExecutorService
import java.util.concurrent.ArrayBlockingQ
import java.util.concurrent.BlockingD
import java.util.concurrent.BlockingQ
import java.util.concurrent.ExecutorS
import java.util.concurrent.E
import java.util.concurrent.ScheduledExecutorS
import java.util.concurrent.ThreadF
import java.util.concurrent.ThreadPoolE
import java.util.concurrent.TimeU
import java.util.logging.L
import java.util.logging.L
public class ExecutorsServiceThread {
public static void main(String[] args) {
//ExecutorService pool = Executors.newFixedThreadPool(2);//固定大小的线程池
//ExecutorService pool = Executors.newSingleThreadExecutor();//单个工作线程,以上两个线程池都是固定大小的,添加线程超过大小限制后就会自动增加到队列中等待,一旦有线程池中有完成的线程,则排队等待的线程就会入池执行
//ExecutorService pool = Executors.newCachedThreadPool();
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
//ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();//单任务延迟线程池
//ThreadPoolExecutor pool2=new ThreadPoolExecutor(corePoolSize, //池中保存的线程数,包括空闲线程
//maximumPoolSize,//池中允许的最大线程数
//keepAliveTime, //当线程数大于核心时,此为终止前 多余的空闲线程等待新任务的最长时间。
//TimeUnit.DAYS, //时间的单位
//workQueue);//任务队列
BlockingQueue&Runnable& block = new ArrayBlockingQueue&Runnable&(20);
ThreadPoolExecutor pool3 = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, block);
Thread t1 = new ExecutorsThread("t1");
Thread t2 = new ExecutorsThread("t2");
Thread t3 = new ExecutorsThread("t3");
Thread t4 = new ExecutorsThread("t4");
Thread t5 = new ExecutorsThread("t5");
pool.execute(t1);
pool.execute(t2);
pool.execute(t1);
pool.schedule(t5, 10, TimeUnit.SECONDS);//参数:要执行的任务,从现在开始延迟的时间,时间的单位
pool.shutdown();
pool3.execute(t1);
pool3.execute(t2);
pool3.execute(t3);
pool3.execute(t4);
pool3.shutdown();
ThreadFactory factory=new ThreadFactory() {
public Thread newThread(Runnable r) {
return new Thread(r);
/**
 * Task that prints its label and then sleeps for one second.
 *
 * <p>NOTE(review): the field declaration, the {@code try} line and the closing braces were
 * missing from the damaged listing and have been restored.
 */
class ExecutorsThread extends Thread {
    /** Label printed by {@link #run()}; kept separately from the Thread's own name. */
    private final String name;

    /**
     * @param name label printed when the task runs
     */
    public ExecutorsThread(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        //while (true) {
        System.out.println(name + " is running......");
        try {
            sleep(1000);
        } catch (InterruptedException ex) {
            Logger.getLogger(ExecutorsThread.class.getName()).log(Level.SEVERE, null, ex);
            // Keep the interrupt visible to whoever owns the executing thread (e.g. a pool worker).
            Thread.currentThread().interrupt();
        }
        //}
    }
}
带返回结果的线程
//下面的executer 还有一个 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 方法,
//执行完所有任务后可将结果保存至 list 中返回,以便后续处理
import java.util.concurrent.C
import java.util.concurrent.ExecutionE
import java.util.concurrent.ExecutorS
import java.util.concurrent.E
import java.util.concurrent.F
public class CallableThread {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executer = Executors.newFixedThreadPool(2);
CallThread callable1 = new CallThread("1111");
CallThread callable2 = new CallThread("2222");
Future&String&f1=executer.submit(callable1);//当执Callable任务后将返回结果保存到Future对象中
Future&String&f2=executer.submit(callable2);
System.out.println(f1.get());
System.out.println(f2.get());
executer.shutdown();
/**
 * A task that returns a value. Tasks that return a value must implement {@link Callable}.
 */
class CallThread implements Callable<String> {
    private String name = "";

    /**
     * @param name label associated with this task
     */
    public CallThread(String name) {
        this.name = name;
    }

    /**
     * NOTE(review): the original body of this method was lost from the damaged listing.
     * Returning the label is a minimal reconstruction - confirm against the original post.
     */
    @Override
    public String call() throws Exception {
        return name;
    }
}
//import java.util.R
import java.lang.reflect.F
import java.util.R
import java.util.concurrent.ArrayBlockingQ
import java.util.concurrent.BlockingQ
import java.util.concurrent.ExecutorS
import java.util.concurrent.E
import java.util.concurrent.locks.L
import java.util.concurrent.locks.ReentrantL
import java.util.logging.L
import java.util.logging.L
public class BlockingQueueThread {
public static void main(String[] args) {
BlockingQueue queue = new ArrayBlockingQueue(5);
ExecutorService pool = Executors.newFixedThreadPool(15);
PutThread[] blockThread = new PutThread[10];
TakeThread[] takeThread = new TakeThread[5];
Lock lock = new ReentrantLock();
Lock takeLock = new ReentrantLock();
for (int i = 0; i & takeThread. i++) {
takeThread[i] = new TakeThread(queue, "thread" + i, takeLock);
pool.execute(takeThread[i]);
for (int i = 0; i & blockThread. i++) {
blockThread[i] = new PutThread(queue, "thread" + i, lock);//在此处takeThread和putThread内加不同锁不能保证打印时的正确性,因为执行put后
//可能已经有take线程从queue中取数据。因此打印的结果看似add都在一块,但数据不是递增的
//如果加同一个锁的话那就会阻塞:当put满或take空时queue会让其等待,但有拿着锁,而take
//或put进不到queue中无法继续执行,一直等待。
//结论:用BlockQueue put 或take 还想同时拿到其内部数据多少时不可行的。
//以上结论是不对滴:以下注释中有个非主流的做法
pool.execute(blockThread[i]);
pool.shutdown();
/**
 * Producer: repeatedly puts a random integer in the range 1..10 into the queue while holding
 * the supplied lock, then prints the value and the queue size.
 *
 * <p>The loop ends when the executing thread is interrupted.
 *
 * <p>NOTE(review): field declarations, the {@code try} line and closing braces were restored
 * from a damaged listing.
 */
class PutThread extends Thread {
    private final BlockingQueue<Integer> queue;
    private final Lock lock;

    /**
     * @param queue queue to fill
     * @param name  thread name reported by {@link #getName()} in the printed lines
     * @param lock  lock held around each put
     */
    public PutThread(BlockingQueue<Integer> queue, String name, Lock lock) {
        // Pass the name to Thread so that getName() in run() actually reports it.
        super(name);
        this.queue = queue;
        this.lock = lock;
    }

    @Override
    public void run() {
        while (true) {
            lock.lock();
            Integer i = new Random().nextInt(10) + 1;
            try {
                // Use put/take here: add() returns true on success but throws when the queue is full.
                queue.put(i);
            } catch (InterruptedException ex) {
                Logger.getLogger(PutThread.class.getName()).log(Level.SEVERE, null, ex);
                // Restore the flag and stop; looping on would make the task impossible to cancel.
                Thread.currentThread().interrupt();
                return;
            } finally {
                lock.unlock();
            }
            System.out.println(this.getName() + " add " + i + " ......" + queue.size());
        }
    }
}
/**
 * Consumer: repeatedly takes an integer from the queue while holding the supplied lock and
 * prints the value together with the queue size.
 *
 * <p>The loop ends when the executing thread is interrupted.
 *
 * <p>NOTE(review): field declarations, the {@code try} line and closing braces were restored
 * from a damaged listing.
 */
class TakeThread extends Thread {
    private final BlockingQueue<Integer> queue;
    private final Lock lock;

    /**
     * @param queue queue to drain
     * @param name  thread name reported by {@link #getName()} in the printed lines
     * @param lock  lock held around each take
     */
    public TakeThread(BlockingQueue<Integer> queue, String name, Lock lock) {
        // Pass the name to Thread so that getName() in run() actually reports it.
        super(name);
        this.queue = queue;
        this.lock = lock;
    }

    @Override
    public void run() {
        while (true) {
            lock.lock();
            try {
                Integer i = queue.take();
                System.out.println(this.getName() + " take " + i + " ......" + queue.size());
            } catch (InterruptedException ex) {
                // Log under this class; the original logged under PutThread by mistake.
                Logger.getLogger(TakeThread.class.getName()).log(Level.SEVERE, null, ex);
                // Restore the flag and stop; looping on would make the task impossible to cancel.
                Thread.currentThread().interrupt();
                return;
            } finally {
                lock.unlock();
            }
        }
    }
}
// Commented-out alternative version (comment markers, braces and truncated tokens restored
// from a damaged listing - verify against the original post before enabling).
//public class BlockingQueueThread {
//    public static ReentrantLock getLock(BlockingQueue queue) throws Exception {
//        Field fields[] = queue.getClass().getDeclaredFields();
//        for (Field f : fields) {
//            f.setAccessible(true);
//            if (f.getName().equals("lock")) {
//                return (ReentrantLock) f.get(queue);
//            }
//        }
//        return null;
//    }
//    public static void main(String[] args) throws Exception {
//        BlockingQueue queue = new ArrayBlockingQueue(10);
//        ExecutorService pool = Executors.newFixedThreadPool(15);
//        PutThread[] blockThread = new PutThread[10];
//        TakeThread[] takeThread = new TakeThread[5];
//        Object lock = new Object();
//        Object takeLock = new Object();
//        try {
//            lock = getLock(queue);
//            takeLock = lock;
//        } catch (Exception e) {
//            e.printStackTrace();
//        }
//        for (int i = 0; i < takeThread.length; i++) {
//            takeThread[i] = new TakeThread(queue, "thread" + i, takeLock);
//            pool.execute(takeThread[i]);
//        }
//        for (int i = 0; i < blockThread.length; i++) {
//            blockThread[i] = new PutThread(queue, "thread" + i, lock);
//            pool.execute(blockThread[i]);
//        }
//        pool.shutdown();
//    }
//}
//class PutThread extends Thread {
//    private BlockingQueue queue;
//    private String name;
//    private Object lock;
//    public PutThread(BlockingQueue queue, String name, Object lock) {
//        this.queue = queue;
//        this.name = name;
//        this.lock = lock;
//    }
//    public void run() {
//        while (true) {
//            try {
//                Thread.sleep(1000);
//            } catch (Exception ex) {
//                ex.printStackTrace();
//            }
//            // synchronized (lock) {
//            try {
//                ((ReentrantLock) lock).lock();
//                Integer i = new Random().nextInt(10) + 1;
//                try {
//                    queue.put(i);// 此处应用put take 。如果用add时,成功返回true,失败则抛出异常
//                } catch (InterruptedException ex) {
//                    Logger.getLogger(PutThread.class.getName()).log(
//                            Level.SEVERE, null, ex);
//                }
//                System.out.println(this.getName() + " add " + i + " ......"
//                        + queue.size());
//            } catch (Exception e) {
//                e.printStackTrace();
//            } finally {
//                ((ReentrantLock) lock).unlock();
//            }
//            // }
//        }
//    }
//}
//class TakeThread extends Thread {
//    private BlockingQueue queue;
//    private String name;
//    private Object lock;
//    public TakeThread(BlockingQueue queue, String name, Object lock) {
//        this.queue = queue;
//        this.name = name;
//        this.lock = lock;
//    }
//    public void run() {
//        while (true) {
//            try {
//                Thread.sleep(1000);
//            } catch (Exception ex) {
//                ex.printStackTrace();
//            }
//            // synchronized (lock) {
//            try {
//                ((ReentrantLock) lock).lock();
//                Integer i = 0;
//                try {
//                    i = (Integer) queue.take();
//                } catch (InterruptedException ex) {
//                    Logger.getLogger(PutThread.class.getName()).log(
//                            Level.SEVERE, null, ex);
//                }
//                System.out.println(this.getName() + " take " + i + " ......"
//                        + queue.size());
//            } catch (Exception e) {
//                e.printStackTrace();
//            } finally {
//                ((ReentrantLock) lock).unlock();
//            }
//            // }
//        }
//    }
//}
每任务每线程
import java.util.ArrayL
import java.util.L
import java.util.concurrent.C
import java.pletionS
import java.util.concurrent.ExecutionE
import java.util.concurrent.ExecutorCompletionS
import java.util.concurrent.ExecutorS
import java.util.concurrent.E
import java.util.concurrent.F
import java.util.logging.L
import java.util.logging.L
/**
 * Submits one slow task per string to a two-thread pool through a {@link CompletionService}
 * and prints the results in completion order.
 *
 * <p>The task for {@code "11"} deliberately throws; that failure is logged and the remaining
 * results are still retrieved.
 *
 * <p>NOTE(review): generic brackets, the {@code try} line and closing braces were restored
 * from a damaged listing. The separator between the string and its length was split across two
 * lines there; a single space is assumed.
 */
public class CompletionServiceThread {
    public static void main(String[] args) {
        List<String> abc = new ArrayList<String>();
        abc.add("11");
        abc.add("233322");
        abc.add("3rdddddr33");
        abc.add("4rr44");
        abc.add("");
        abc.add("4444");

        ExecutorService executor = Executors.newFixedThreadPool(2);
        CompletionService<String> service = new ExecutorCompletionService<String>(executor);
        try {
            // Each string gets its own task; processing a string is assumed to be slow.
            for (final String s : abc) {
                // Submit a batch of tasks whose results we want to collect.
                service.submit(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        Thread.sleep(5000);
                        if (s.equals("11")) {
                            throw new Exception();
                        }
                        return Thread.currentThread().getName() + " " + s + " " + s.length() + "";
                    }
                });
            }
            for (int i = 0; i < abc.size(); i++) {
                // take() retrieves and removes the Future of the next completed task, waiting if
                // none has completed yet.
                Future<String> future = service.take();
                try {
                    String a = future.get();
                    System.out.println(a); // handle the completed task
                } catch (ExecutionException ex) {
                    // One failed task must not stop us from draining the other results.
                    Logger.getLogger(CompletionServiceThread.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        } catch (InterruptedException ex) {
            Logger.getLogger(CompletionServiceThread.class.getName()).log(Level.SEVERE, null, ex);
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
    }
}
/*
 * 某次运行结果:
 * pool-1-thread-1 11
 * pool-1-thread-2 233322
 * pool-1-thread-1 3rdddddr33
 * pool-1-thread-2 4rr44
 * pool-1-thread-1
 * pool-1-thread-2 4444
 */
主线程等从线程
import java.util.concurrent.BrokenBarrierE
import java.util.concurrent.CyclicB
import java.util.concurrent.ExecutorS
import java.util.concurrent.E
import java.util.concurrent.ThreadF
import java.util.concurrent.TimeU
import java.util.logging.L
import java.util.logging.L
public class BarrierThread {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3, new MainThread());//当有一个线程需要等待其他线程指向完毕后才能执行时可用此类
ExecutorService pool = Executors.newFixedThreadPool(3);
pool.execute(new SubThread(barrier));
pool.execute(new SubThread(barrier));
pool.execute(new SubThread(barrier));
pool.shutdown();
/**
 * Prints a fixed marker line when run.
 */
class MainThread extends Thread {
    @Override
    public void run() {
        System.out.println("I'm here....");
    }
}
/**
 * Worker that prints 1000 progress lines and then waits on the shared barrier.
 *
 * <p>NOTE(review): the loop's closing brace and the {@code try} line were missing from the
 * damaged listing. Placing {@code await()} after the loop (one barrier trip per worker) is an
 * inference - confirm against the original post.
 */
class SubThread extends Thread {
    private final CyclicBarrier barrier;

    /**
     * @param barrier barrier this worker waits on after printing
     */
    public SubThread(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        for (int i = 0; i < 1000; i++) {
            System.out.println(this.getName() + " is running " + i);
        }
        try {
            barrier.await();
        } catch (InterruptedException ex) {
            Logger.getLogger(SubThread.class.getName()).log(Level.SEVERE, null, ex);
            Thread.currentThread().interrupt();
        } catch (BrokenBarrierException ex) {
            Logger.getLogger(SubThread.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}
awaitTermination方法的使用
import java.util.concurrent.ExecutorS
import java.util.concurrent.E
import java.util.concurrent.TimeU
import java.util.logging.L
import java.util.logging.L
/**
 * Demonstrates {@link ExecutorService#awaitTermination}: two slow printing tasks are submitted,
 * the pool is shut down, and the main thread waits up to ten seconds before it starts printing
 * in an endless loop.
 *
 * <p>NOTE(review): loop headers, {@code try} lines and closing braces were restored from a
 * damaged listing.
 */
public class AwaitTerminationThread {
    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(2);
        service.submit(printer("bbbbb ", 1000));
        // The damaged listing had a stray line here, kept for reference:
        // new Thread.UncaughtExceptionHandler(){
        service.submit(printer(" aaaaaa ", 2000));
        service.shutdown();
        try {
            // Blocks for up to 10 seconds from now while the submitted tasks are unfinished,
            // giving them a head start over the loop below.
            service.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Logger.getLogger(AwaitTerminationThread.class.getName()).log(Level.SEVERE, null, ex);
        }
        while (true) {
            System.out.println("11111");
        }
    }

    /** Builds a task that prints 1000 numbered lines, pausing {@code pauseMillis} after each. */
    private static Runnable printer(final String label, final long pauseMillis) {
        return new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 1000; i++) {
                    System.out.println(Thread.currentThread().getName() + label + i);
                    try {
                        Thread.sleep(pauseMillis);
                    } catch (InterruptedException ex) {
                        Logger.getLogger(AwaitTerminationThread.class.getName()).log(Level.SEVERE, null, ex);
                        // Stop on interrupt so that the pool can actually terminate.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        };
    }
}
集合迭代时抛出的异常
import java.util.ArrayL
import java.util.I
import java.util.L
/**
 * Demonstrates the {@link java.util.ConcurrentModificationException} raised when a list is
 * modified directly while one of its iterators is still in use.
 *
 * <p>The exception is intentional: {@code main} does not complete normally.
 *
 * <p>NOTE(review): generic brackets and closing braces were restored from a damaged listing.
 */
public class ConcurrentModificationExceptionTest {
    public static void main(String[] args) {
        List<String> list = new ArrayList<String>();
        list.add("1111");
        list.add("222");
        list.add("3333");
        Iterator<String> iterator = list.iterator();
        while (iterator.hasNext()) {
            if (iterator.next().equals("222")) {
                iterator.remove();
                // This is what triggers the exception: after the iterator has modified the
                // ArrayList, the list is modified again directly while iteration continues.
                list.add("555");
            }
        }
        /*
         * The exception is thrown when modCount != expectedModCount.
         * The Itr class initialises its member with "int expectedModCount = modCount"; Itr is
         * the iterator returned by iterator().
         * If add/remove is called on the list while iterating, the two values differ and the
         * exception is thrown.
         */
        list.add("4444");
        System.out.println(list);
    }
}
例子:三个线程按顺序执行
//转.......
/**
 * Three threads print their letter in strict A, B, C rotation, ten times each, coordinated
 * with {@code wait}/{@code notifyAll} on a shared monitor.
 *
 * <p>NOTE(review): the {@code id} field, the counter increments, the {@code wait()} branch and
 * closing braces were missing from the damaged listing and have been reconstructed.
 */
public class ABCThread extends Thread {
    /** Monitor shared by all instances. */
    private static final Object o = new Object();
    /** Total number of letters printed so far; decides whose turn it is. */
    private static int count = 0;

    private char ID;
    private int id;
    private int num = 0;

    /**
     * @param id position of this thread in the rotation (0, 1 or 2)
     * @param ID letter this thread prints
     */
    public ABCThread(int id, char ID) {
        this.id = id;
        this.ID = ID;
    }

    @Override
    public void run() {
        synchronized (o) {
            while (num < 10) {
                // Tie count to the thread id: the thread with id 0 (A) runs only when count is
                // 0, 3, 6, 9...; the thread with id 1 when count is 1, 4, 7...; all others wait.
                if (count % 3 == id) {
                    System.out.println(ID);
                    count++;
                    num++;
                    o.notifyAll();
                } else {
                    try {
                        o.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        (new ABCThread(2, 'C')).start();
        (new ABCThread(0, 'A')).start();
        (new ABCThread(1, 'B')).start();
    }
}
import java.text.DateF
import java.util.concurrent.S
import java.util.logging.L
import java.util.logging.L
public class ABCMyThread extends Thread {
int count = 10;
public ABCMyThread(String name, Semaphore current, Semaphore next) {
this.name =
this.next =
this.current =
public void run() {
while (count & 0) {
this.current.acquire();
} catch (InterruptedException ex) {
Logger.getLogger(ABCMyThread.class.getName()).log(Level.SEVERE, null, ex);
System.out.println(this.name);
this.next.release();
public static void main(String[] args) {
Semaphore a = new Semaphore(1);
Semaphore b = new Semaphore(0);
Semaphore c = new Semaphore(0);
Thread t1=new ABCMyThread("A",a,b);//t1进程用a信号量acquire可允许一个线程进入(即t1线程),如果t1循环执行一次后又到acquire,时不能往下执行,需等t3的a信号量执行release
Thread t2=new ABCMyThread("B",b,c);//t2线程用b信号量qcquire不允许任何线程进入,所以会等t1线程的b信号量执行release时才可运行
Thread t3=new ABCMyThread("C",c,a);//.....
t1.start();
t2.start();
t3.start();
//DateFormat format=new DateFormat() {};
浏览: 17486 次
来自: 北京
多谢你的帖子,帮我解决了问题
demo地址:/?p= ...
楼主,有demo吗,发一份学习一下可以吗
邮箱:292743 ...
当我执行scheduler.rescheduleJob(tri ...
awaitTermination方法的使用 的 第46行有问题 ...

我要回帖

更多关于 java方法返回值 的文章

 

随机推荐