delayqueue queue线程安全全吗

public&interface&Comparable&T&&{&&&&public&int&compareTo(T&o);}public&interface&Delayed&extends&Comparable&Delayed&&{&&&&long&getDelay(TimeUnit&unit);}public&class&DelayQueue&E&extends&Delayed&&implements&BlockingQueue&E&&{&&&&&private&final&PriorityQueue&E&&q&=&new&PriorityQueue&E&();}DelayQueue内部的实现使用了一个优先队列。当调用DelayQueue的offer方法时,把Delayed对象加入到优先队列q中。如下:public&boolean&offer(E&e)&{&&&&final&ReentrantLock&lock&=&this.&&&&lock.lock();&&&&try&{&&&&&&&&E&first&=&q.peek();&&&&&&&&q.offer(e);&&&&&&&&if&(first&==&null&||&pareTo(first)&&&0)&&&&&&&&&&&&available.signalAll();&&&&&&&&return&true;&&&&}&finally&{&&&&&&&&lock.unlock();&&&&}}DelayQueue的take方法,把优先队列q的first拿出来(peek),如果没有达到延时阀值,则进行await处理。如下:public&E&take()&throws&InterruptedException&{&&&&final&ReentrantLock&lock&=&this.&&&&lock.lockInterruptibly();&&&&try&{&&&&&&&&for&(;;)&{&&&&&&&&&&&&E&first&=&q.peek();&&&&&&&&&&&&if&(first&==&null)&{&&&&&&&&&&&&&&&&available.await();&&&&&&&&&&&&}&else&{&&&&&&&&&&&&&&&&long&delay&=&&first.getDelay(TimeUnit.NANOSECONDS);&&&&&&&&&&&&&&&&if&(delay&&&0)&{&&&&&&&&&&&&&&&&&&&&long&tl&=&available.awaitNanos(delay);&&&&&&&&&&&&&&&&}&else&{&&&&&&&&&&&&&&&&&&&&E&x&=&q.poll();&&&&&&&&&&&&&&&&&&&&assert&x&!=&null;&&&&&&&&&&&&&&&&&&&&if&(q.size()&!=&0)&&&&&&&&&&&&&&&&&&&&&&&&available.signalAll();&//&wake&up&other&takers&&&&&&&&&&&&&&&&&&&&return&x;&&&&&&&&&&&&&&&&}&&&&&&&&&&&&}&&&&&&&&}&&&&}&finally&{&&&&&&&&lock.unlock();&&&&}}-------------------以下是Sample,是一个缓存的简单实现。共包括三个类Pair、DelayItem、Cache。如下:public&class&Pair&K,&V&&{&&&&public&K&&&&&public&V&&&&&&&&&public&Pair()&{}&&&&&&&&public&Pair(K&first,&V&second)&{&&&&&&&&this.first&=&&&&&&&&&this.second&=&&&&&}}--------------以下是Delayed的实现import&java.util.concurrent.Dimport&java.util.concurrent.TimeUimport&java.util.concurrent.atomic.AtomicLpublic&class&DelayItem&T&&implements&Delayed&{&&&&/**&Base&of&nanosecond&timings,&to&avoid&wrapping&*/&&&&private&static&final&long&NANO_ORIGIN&=&System.nanoTime();&&&&/**&&&&&*&Returns&nanosecond&time&offset&by&origin&&&&&*/&&&&final&static&long&now()&{&&&&&&&&return&System.nanoTime()&-&NANO_ORIGIN;&&&&}&&&&/**&&&&&*&Sequence&number&to&break&scheduling&ties,&and&in&turn&to&guarantee&FIFO&order&among&tied&&&&&*&entries.&&&&&*/&&&&private&static&final&AtomicLong&sequencer&=&new&AtomicLong(0);&&&&/**&Sequence&number&to&break&ties&FIFO&*/&&&&private&final&long&sequenceN&&&&/**&The&time&the&task&is&enabled&to&execute&in&nanoTime&units&*/&&&&private&final&long&&&&&private&final&T&&&&&public&DelayItem(T&submit,&long&timeout)&{&&&&&&&&this.time&=&now()&+&&&&&&&&&this.item&=&&&&&&&&&this.sequenceNumber&=&sequencer.getAndIncrement();&&&&}&&&&public&T&getItem()&{&&&&&&&&return&this.&&&&}&&&&public&long&getDelay(TimeUnit&unit)&{&&&&&&&&long&d&=&unit.convert(time&-&now(),&TimeUnit.NANOSECONDS);&&&&&&&&return&d;&&&&}&&&&public&int&compareTo(Delayed&other)&{&&&&&&&&if&(other&==&this)&//&compare&zero&ONLY&if&same&object&&&&&&&&&&&&return&0;&&&&&&&&if&(other&instanceof&DelayItem)&{&&&&&&&&&&&&DelayItem&x&=&(DelayItem)&&&&&&&&&&&&&long&diff&=&time&-&x.&&&&&&&&&&&&if&(diff&&&0)&&&&&&&&&&&&&&&&return&-1;&&&&&&&&&&&&else&if&(diff&&&0)&&&&&&&&&&&&&&&&return&1;&&&&&&&&&&&&else&if&(sequenceNumber&&&x.sequenceNumber)&&&&&&&&&&&&&&&&return&-1;&&&&&&&&&&&&else&&&&&&&&&&&&&&&&return&1;&&&&&&&&}&&&&&&&&long&d&=&(getDelay(TimeUnit.NANOSECONDS)&-&other.getDelay(TimeUnit.NANOSECONDS));&&&&&&&&return&(d&==&0)&?&0&:&((d&&&0)&?&-1&:&1);&&&&}}以下是Cache的实现,包括了put和get方法,还包括了可执行的main函数。import&java.util.concurrent.ConcurrentHashMimport&java.util.concurrent.ConcurrentMimport&java.util.concurrent.DelayQimport&java.util.concurrent.TimeUimport&java.util.logging.Limport&java.util.logging.Lpublic&class&Cache&K,&V&&{&&&&private&static&final&Logger&LOG&=&Logger.getLogger(Cache.class.getName());&&&&private&ConcurrentMap&K,&V&&cacheObjMap&=&new&ConcurrentHashMap&K,&V&();&&&&private&DelayQueue&DelayItem&Pair&K,&V&&&&q&=&new&DelayQueue&DelayItem&Pair&K,&V&&&();&&&&private&Thread&daemonT&&&&public&Cache()&{&&&&&&&&Runnable&daemonTask&=&new&Runnable()&{&&&&&&&&&&&&public&void&run()&{&&&&&&&&&&&&&&&&daemonCheck();&&&&&&&&&&&&}&&&&&&&&};&&&&&&&&daemonThread&=&new&Thread(daemonTask);&&&&&&&&daemonThread.setDaemon(true);&&&&&&&&daemonThread.setName("Cache&Daemon");&&&&&&&&daemonThread.start();&&&&}&&&&private&void&daemonCheck()&{&&&&&&&&if&(LOG.))&&&&&&&&&&&&("cache&service&started.");&&&&&&&&for&(;;)&{&&&&&&&&&&&&try&{&&&&&&&&&&&&&&&&DelayItem&Pair&K,&V&&&delayItem&=&q.take();&&&&&&&&&&&&&&&&if&(delayItem&!=&null)&{&&&&&&&&&&&&&&&&&&&&//&超时对象处理&&&&&&&&&&&&&&&&&&&&Pair&K,&V&&pair&=&delayItem.getItem();&&&&&&&&&&&&&&&&&&&&cacheObjMap.remove(pair.first,&pair.second);&//&compare&and&remove&&&&&&&&&&&&&&&&}&&&&&&&&&&&&}&catch&(InterruptedException&e)&{&&&&&&&&&&&&&&&&if&(LOG.isLoggable(Level.SEVERE))&&&&&&&&&&&&&&&&&&&&LOG.log(Level.SEVERE,&e.getMessage(),&e);&&&&&&&&&&&&&&&&break;&&&&&&&&&&&&}&&&&&&&&}&&&&&&&&if&(LOG.))&&&&&&&&&&&&("cache&service&stopped.");&&&&}&&&&//&添加缓存对象&&&&public&void&put(K&key,&V&value,&long&time,&TimeUnit&unit)&{&&&&&&&&V&oldValue&=&cacheObjMap.put(key,&value);&&&&&&&&if&(oldValue&!=&null)&&&&&&&&&&&&q.remove(key);&&&&&&&&long&nanoTime&=&TimeUnit.NANOSECONDS.convert(time,&unit);&&&&&&&&q.put(new&DelayItem&Pair&K,&V&&(new&Pair&K,&V&(key,&value),&nanoTime));&&&&}&&&&public&V&get(K&key)&{&&&&&&&&return&cacheObjMap.get(key);&&&&}&&&&//&测试入口函数&&&&public&static&void&main(String[]&args)&throws&Exception&{&&&&&&&&Cache&Integer,&String&&cache&=&new&Cache&Integer,&String&();&&&&&&&&cache.put(1,&"aaaa",&3,&TimeUnit.SECONDS);&&&&&&&&Thread.sleep(1000&*&2);&&&&&&&&{&&&&&&&&&&&&String&str&=&cache.get(1);&&&&&&&&&&&&System.out.println(str);&&&&&&&&}&&&&&&&&Thread.sleep(1000&*&2);&&&&&&&&{&&&&&&&&&&&&String&str&=&cache.get(1);&&&&&&&&&&&&System.out.println(str);&&&&&&&&}&&&&}}运行Sample,main函数执行的结果是输出两行,第一行为aaa,第二行为null。Android之Java基础(12)

一、Java& Queue基础
Queue: 基本上,一个队列就是一个先入先出(FIFO)的数据结构
offer,add区别:
一些队列有大小限制,因此如果想在一个满的队列中加入一个新项,多出的项就会被拒绝。
这时新的 offer 方法就可以起作用了。它不是对调用 add() 方法抛出一个 unchecked 异常,而只是得到由 offer() 返回的 false。
poll,remove区别:
remove() 和 poll() 方法都是从队列中删除第一个元素(head)。remove() 的行为与 Collection 接口的版本相似,
但是新的 poll() 方法在用空集合调用时不是抛出异常,只是返回 null。因此新的方法更适合容易出现异常条件的情况。
peek,element区别:
element() 和 peek() 用于在队列的头部查询元素。与 remove() 方法类似,在队列为空时, element() 抛出一个异常,而 peek() 返回 null。
Tiger中有2组Queue的实现:实现了新的BlockingQueue接口的和没有实现的
1)没有实现的阻塞接口的LinkedList: 实现了java.util.Queue接口和java.util.AbstractQueue接口
内置的不阻塞队列: PriorityQueue 和 ConcurrentLinkedQueue
PriorityQueue 和 ConcurrentLinkedQueue 类在 Collection Framework 中加入两个具体集合实现。
PriorityQueue 类实质上维护了一个有序列表。加入到 Queue 中的元素根据它们的天然排序(通过其 parable 实现)或者根据传递给构造函数的 parator 实现来定位。
ConcurrentLinkedQueue 是基于链接节点的、线程安全的队列。并发访问不需要同步。因为它在队列的尾部添加元素并从头部删除它们,所以只要不需要知道队列的大 小,ConcurrentLinkedQueue 对公共集合的共享访问就可以工作得很好。收集关于队列大小的信息会很慢,需要遍历队列。
2)实现阻塞接口的:
新 的 java.util.concurrent 包在 Collection Framework 中可用的具体集合类中加入了 BlockingQueue 接口和五个阻塞队列类。它实质上就是一种带有一点扭曲的 FIFO 数据结构。不是立即从队列中添加或者删除元素,线程执行操作阻塞,直到有空间或者元素可用。
五个队列所提供的各有不同:
* ArrayBlockingQueue :一个由数组支持的有界队列。
* LinkedBlockingQueue :一个由链接节点支持的可选有界队列。
* PriorityBlockingQueue :一个由优先级堆支持的无界优先级队列。
* DelayQueue :一个由优先级堆支持的、基于时间的调度队列。
* SynchronousQueue :一个利用 BlockingQueue 接口的简单聚集(rendezvous)机制。
前 两个类 ArrayBlockingQueue 和 LinkedBlockingQueue 几乎相同,只是在后备存储器方面有所不同, LinkedBlockingQueue 并不总是有容量界限。无大小界限的 LinkedBlockingQueue 类在添加元素时永远不会有阻塞队列的等待(至少在其中有Integer.MAX_VALUE 元素之前不会)。
PriorityBlockingQueue 是具有无界限容量的队列,它利用所包含元素的 Comparable 排序顺序来以逻辑顺序维护元素。可以将它看作 TreeSet 的可能替代物。不过对 PriorityBlockingQueue 有一个技巧。从 iterator() 返回的 Iterator 实例不需要以优先级顺序返回元素。如果必须以优先级顺序遍历所有元素,那么让它们都通过 toArray() 方法并自己对它们排序,像 Arrays.sort(pq.toArray())。
新的 DelayQueue 实现可能是其中最有意思(也是最复杂)的一个。加入到队列中的元素必须实现新的 Delayed 接口(只有一个方法 —— long getDelay(java.util.concurrent.TimeUnit unit) )。因为队列的大小没有界限,使得添加可以立即返回,但是在延迟时间过去之前不能从队列中取出元素。如果多个元素完成了延迟,那么最早失效/失效时间最长 的元素将第一个取出。实际上没有听上去这样复杂。
SynchronousQueue 类是最简单的。它没有内部容量。它就像线程之间的手递手机制。在队列中加入一个元素的生产者会等待另一个线程的消费者。当这个消费者出现时,这个元素就直接在消费者和生产者之间传递,永远不会加入到阻塞队列中。
二、queue的使用
Queue接口与List、Set同一级别,都是继承了Collection接口。LinkedList实现了Queue接 口。Queue接口窄化了对LinkedList的方法的访问权限(即在方法中的参数类型如果是Queue时,就完全只能访问Queue接口所定义的方法 了,而不能直接访问 LinkedList的非Queue的方法),以使得只有恰当的方法才可以使用。BlockingQueue 继承了Queue接口。
队列是一种数据结构.它有两个基本操作:在队列尾部加人一个元素,和从队列头部移除一个元素就是说,队列以一种先进先出的方式管理数据,如果你试图向一个 已经满了的阻塞队列中添加一个元素或者是从一个空的阻塞队列中移除一个元索,将导致线程阻塞.在多线程进行合作时,阻塞队列是很有用的工具。工作者线程可 以定期地把中间结果存到阻塞队列中而其他工作者线线程把中间结果取出并在将来修改它们。队列会自动平衡负载。如果第一个线程集运行得比第二个慢,则第二个
线程集在等待结果时就会阻塞。如果第一个线程集运行得快,那么它将等待第二个线程集赶上来。下表显示了jdk1.5中的阻塞队列的操作:
add&&&&&&& 增加一个元索&&&&&&&&&&&&&&&&&&&&
如果队列已满,则抛出一个IIIegaISlabEepeplian异常
remove&& 移除并返回队列头部的元素&&& 如果队列为空,则抛出一个NoSuchElementException异常
element&&返回队列头部的元素&&&&&&&&&&&& 如果队列为空,则抛出一个NoSuchElementException异常
offer&&&&&& 添加一个元素并返回true&&&&&&&如果队列已满,则返回false
poll&&&&&&&& 移除并返问队列头部的元素&&&&如果队列为空,则返回null
peek&&&&&& 返回队列头部的元素&&&&&&&&&&&& 如果队列为空,则返回null
put&&&&&&&& 添加一个元素&&&&&&&&&&&&&&&&&&&&&
如果队列满,则阻塞
take&&&&&&& 移除并返回队列头部的元素&&&& 如果队列为空,则阻塞
remove、element、offer&、poll、peek&其实是属于Queue接口。&
阻塞队列的操作可以根据它们的响应方式分为以下三类:aad、removee和element操作在你试图为一个已满的队列增加元素或从空队列取得元素时 抛出异常。当然,在多线程程序中,队列在任何时间都可能变成满的或空的,所以你可能想使用offer、poll、peek方法。这些方法在无法完成任务时 只是给出一个出错示而不会抛出异常。
注意:poll和peek方法出错进返回null。因此,向队列中插入null值是不合法的。
还有带超时的offer和poll方法变种,例如,下面的调用:
boolean success = q.offer(x,100,TimeUnit.MILLISECONDS);
尝试在100毫秒内向队列尾部插入一个元素。如果成功,立即返回true;否则,当到达超时进,返回false。同样地,调用:
Object head = q.poll(100, TimeUnit.MILLISECONDS);
如果在100毫秒内成功地移除了队列头元素,则立即返回头元素;否则在到达超时时,返回null。
最后,我们有阻塞操作put和take。put方法在队列满时阻塞,take方法在队列空时阻塞。
java.ulil.concurrent包提供了阻塞队列的4个变种。默认情况下,LinkedBlockingQueue的容量是没有上限的(说的不准确,在不指定时容量为Integer.MAX_VALUE,不要然的话在put时怎么会受阻呢),但是也可以选择指定其最大容量,它是基于链表的队列,此队列按
FIFO(先进先出)排序元素。
ArrayBlockingQueue在构造时需要指定容量, 并可以选择是否需要公平性,如果公平参数被设置true,等待时间最长的线程会优先得到处理(其实就是通过将ReentrantLock设置为true来 达到这种公平性的:即等待时间最长的线程会先操作)。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队 列,此队列按 FIFO(先进先出)原则对元素进行排序。
PriorityBlockingQueue是一个带优先级的 队列,而不是先进先出队列。元素按优先级顺序被移除,该队列也没有上限(看了一下源码,PriorityBlockingQueue是对 PriorityQueue的再次包装,是基于堆数据结构的,而PriorityQueue是没有容量限制的,与ArrayList一样,所以在优先阻塞 队列上put时是不会受阻的。虽然此队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致
OutOfMemoryError),但是如果队列为空,那么取元素的操作take就会阻塞,所以它的检索操作take是受阻的。另外,往入该队列中的元 素要具有比较能力。
最后,DelayQueue(基于PriorityQueue来实现的)是一个存放Delayed 元素的无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且poll将返回null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满,poll就以移除这个元素了。此队列不允许使用
null 元素。 下面是延迟接口:
public&interface&Delayed&extends&Comparable&Delayed&&{&&&&&&&long&getDelay(TimeUnit&unit);&&}&&
放入DelayQueue的元素还将要实现compareTo方法,DelayQueue使用这个来为元素排序。
下面的实例展示了如何使用阻塞队列来控制线程集。程序在一个目录及它的所有子目录下搜索所有文件,打印出包含指定关键字的文件列表。从下面实例可以看出,使用阻塞队列两个显著的好处就是:多线程操作共同的队列时不需要额外的同步,另外就是队列会自动平衡负载,即那边(生产与消费两边)处理快了就会被阻塞掉,从而减少两边的处理速度差距。下面是具体实现:
public&class&BlockingQueueTest&{&&&&&&public&static&void&main(String[]&args)&{&&&&&&&&&&Scanner&in&=&new&Scanner(System.in);&&&&&&&&&&System.out.print(&Enter&base&directory&(e.g.&/usr/local/jdk5.0/src):&&);&&&&&&&&&&String&directory&=&in.nextLine();&&&&&&&&&&System.out.print(&Enter&keyword&(e.g.&volatile):&&);&&&&&&&&&&String&keyword&=&in.nextLine();&&&&&&&&&&&&final&int&FILE_QUEUE_SIZE&=&10;//&阻塞队列大小&&&&&&&&&&final&int&SEARCH_THREADS&=&100;//&关键字搜索线程个数&&&&&&&&&&&&//&基于ArrayBlockingQueue的阻塞队列&&&&&&&&&&BlockingQueue&File&&queue&=&new&ArrayBlockingQueue&File&(&&&&&&&&&&&&&&&&&&FILE_QUEUE_SIZE);&&&&&&&&&&&&//只启动一个线程来搜索目录&&&&&&&&&&FileEnumerationTask&enumerator&=&new&FileEnumerationTask(queue,&&&&&&&&&&&&&&&&&&new&File(directory));&&&&&&&&&&new&Thread(enumerator).start();&&&&&&&&&&&&&&&&&&&&//启动100个线程用来在文件中搜索指定的关键字&&&&&&&&&&for&(int&i&=&1;&i&&=&SEARCH_THREADS;&i++)&&&&&&&&&&&&&&new&Thread(new&SearchTask(queue,&keyword)).start();&&&&&&}&&}&&class&FileEnumerationTask&implements&Runnable&{&&&&&&//哑元文件对象,放在阻塞队列最后,用来标示文件已被遍历完&&&&&&public&static&File&DUMMY&=&new&File(&&);&&&&&&&&private&BlockingQueue&File&&&&&&&&private&File&startingD&&&&&&&&public&FileEnumerationTask(BlockingQueue&File&&queue,&File&startingDirectory)&{&&&&&&&&&&this.queue&=&&&&&&&&&&&this.startingDirectory&=&startingD&&&&&&}&&&&&&&&public&void&run()&{&&&&&&&&&&try&{&&&&&&&&&&&&&&enumerate(startingDirectory);&&&&&&&&&&&&&&queue.put(DUMMY);//执行到这里说明指定的目录下文件已被遍历完&&&&&&&&&&}&catch&(InterruptedException&e)&{&&&&&&&&&&}&&&&&&}&&&&&&&&//&将指定目录下的所有文件以File对象的形式放入阻塞队列中&&&&&&public&void&enumerate(File&directory)&throws&InterruptedException&{&&&&&&&&&&File[]&files&=&directory.listFiles();&&&&&&&&&&for&(File&file&:&files)&{&&&&&&&&&&&&&&if&(file.isDirectory())&&&&&&&&&&&&&&&&&&enumerate(file);&&&&&&&&&&&&&&else&&&&&&&&&&&&&&&&&&//将元素放入队尾,如果队列满,则阻塞&&&&&&&&&&&&&&&&&&queue.put(file);&&&&&&&&&&}&&&&&&}&&}&&class&SearchTask&implements&Runnable&{&&&&&&private&BlockingQueue&File&&&&&&&&private&String&&&&&&&&&public&SearchTask(BlockingQueue&File&&queue,&String&keyword)&{&&&&&&&&&&this.queue&=&&&&&&&&&&&this.keyword&=&&&&&&&}&&&&&&&&public&void&run()&{&&&&&&&&&&try&{&&&&&&&&&&&&&&boolean&done&=&false;&&&&&&&&&&&&&&while&(!done)&{&&&&&&&&&&&&&&&&&&//取出队首元素,如果队列为空,则阻塞&&&&&&&&&&&&&&&&&&File&file&=&queue.take();&&&&&&&&&&&&&&&&&&if&(file&==&FileEnumerationTask.DUMMY)&{&&&&&&&&&&&&&&&&&&&&&&//取出来后重新放入,好让其他线程读到它时也很快的结束&&&&&&&&&&&&&&&&&&&&&&queue.put(file);&&&&&&&&&&&&&&&&&&&&&&done&=&true;&&&&&&&&&&&&&&&&&&}&else&&&&&&&&&&&&&&&&&&&&&&search(file);&&&&&&&&&&&&&&}&&&&&&&&&&}&catch&(IOException&e)&{&&&&&&&&&&&&&&e.printStackTrace();&&&&&&&&&&}&catch&(InterruptedException&e)&{&&&&&&&&&&}&&&&&&}&&&&&&public&void&search(File&file)&throws&IOException&{&&&&&&&&&&Scanner&in&=&new&Scanner(new&FileInputStream(file));&&&&&&&&&&int&lineNumber&=&0;&&&&&&&&&&while&(in.hasNextLine())&{&&&&&&&&&&&&&&lineNumber++;&&&&&&&&&&&&&&String&line&=&in.nextLine();&&&&&&&&&&&&&&if&(line.contains(keyword))&&&&&&&&&&&&&&&&&&System.out.printf(&%s:%d:%s%n&,&file.getPath(),&lineNumber,&&&&&&&&&&&&&&&&&&&&&&&&&&line);&&&&&&&&&&}&&&&&&&&&&in.close();&&&&&&}&&}&
三&、阻塞队列BlockingQueue
在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列 类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场 景。
认识BlockingQueue
阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:
从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;
常用的队列主要有以下两种:(当然通过不同的实现方式,还可以延伸出很多不同类型的队列,DelayQueue就是其中的一种)
  先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性。
  后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件。
&& & &多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若 干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的 数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度, 并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。然而,
在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小 的复杂度。好在此时,强大的concurrent包横空出世了,而他也给我们带来了强大的BlockingQueue。(在多线程领域:所谓阻塞,在某些 情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒)
下面两幅图演示了BlockingQueue的两个常见阻塞场景:
       如上图所示:当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。
   如上图所示:当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。
&& & 这也是我们在多线程环境下,为什么需要BlockingQueue的原因。作为BlockingQueue的使用者,我们再也不需要关心什么时候需要阻塞 线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了。既然BlockingQueue如此神通广大,让我们一起来见识下 它的常用方法:
BlockingQueue的核心方法:
放入数据:
  offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,
    则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
  offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中
    加入BlockingQueue,则返回失败。
  put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断
    直到BlockingQueue里面有空间再继续.
获取数据:
  poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
    取不到时返回
  poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
    队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
  take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
    BlockingQueue有新的数据被加入;&
  drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),&
    通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
常见BlockingQueue
在了解了BlockingQueue的基本功能后,让我们来看看BlockingQueue家庭大致有哪些成员?&
BlockingQueue成员详细介绍
1. ArrayBlockingQueue
&& & &基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了 一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
   ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于 LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的 完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额 外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任
何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而 在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
2. LinkedBlockingQueue
&& & &基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列 中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时 (LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反 之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别
采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
作为开 发 者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认 一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统 内存就有可能已被消耗殆尽了。
ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。
下面的代码演示了如何使用BlockingQueue:
import java.util.concurrent.BlockingQ
import java.util.concurrent.ExecutorS
import java.util.concurrent.E
import java.util.concurrent.LinkedBlockingQ
&* @author jackyuj
public class BlockingQueueTest {
&&&&public static
void main(String[] args) throws InterruptedException {
&&&&&&&&// 声明一个容量为10的缓存队列
&&&&&&&&BlockingQueue&String& queue =
new LinkedBlockingQueue&String&(10);
&&&&&&&&Producer producer1 =
new Producer(queue);
&&&&&&&&Producer producer2 =
new Producer(queue);
&&&&&&&&Producer producer3 =
new Producer(queue);
&&&&&&&&Consumer consumer =
new Consumer(queue);
&&&&&&&&// 借助Executors
&&&&&&&&ExecutorService service = Executors.newCachedThreadPool();
&&&&&&&&// 启动线程
&&&&&&&&service.execute(producer1);
&&&&&&&&service.execute(producer2);
&&&&&&&&service.execute(producer3);
&&&&&&&&service.execute(consumer);
&&&&&&&&// 执行10s
&&&&&&&&Thread.sleep(10
&&&&&&&&producer1.stop();
&&&&&&&&producer2.stop();
&&&&&&&&producer3.stop();
&&&&&&&&Thread.sleep(2000);
&&&&&&&&// 退出Executor
&&&&&&&&service.shutdown();
import java.util.R
import java.util.concurrent.BlockingQ
import java.util.concurrent.TimeU
&* 消费者线程
&* @author jackyuj
public class Consumer
implements Runnable {
&&&&public Consumer(BlockingQueue&String& queue) {
&&&&&&&&this.queue =
&&&&public void
&&&&&&&&System.out.println(&启动消费者线程!&);
&&&&&&&&Random r = new
&&&&&&&&boolean
isRunning = true;
&&&&&&&&try {
&&&&&&&&&&&&while
(isRunning) {
&&&&&&&&&&&&&&&&System.out.println(&正从队列获取数据...&);
&&&&&&&&&&&&&&&&String data = queue.poll(2, TimeUnit.SECONDS);
&&&&&&&&&&&&&&&&if
(null != data) {
&&&&&&&&&&&&&&&&&&&&System.out.println(&拿到数据:&
+ data);
&&&&&&&&&&&&&&&&&&&&System.out.println(&正在消费数据:&
+ data);
&&&&&&&&&&&&&&&&&&&&Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
&&&&&&&&&&&&&&&&} else
&&&&&&&&&&&&&&&&&&&&// 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。
&&&&&&&&&&&&&&&&&&&&isRunning =
&&&&&&&&&&&&&&&&}
&&&&&&&&&&&&}
&&&&&&&&} catch
(InterruptedException e) {
&&&&&&&&&&&&e.printStackTrace();
&&&&&&&&&&&&Thread.currentThread().interrupt();
&&&&&&&&} finally
&&&&&&&&&&&&System.out.println(&退出消费者线程!&);
&&&&private BlockingQueue&String&
&&&&private static
final int&&&&& DEFAULT_RANGE_FOR_SLEEP = 1000;
import java.util.R
import java.util.concurrent.BlockingQ
import java.util.concurrent.TimeU
import java.util.concurrent.atomic.AtomicI
&* 生产者线程
&* @author jackyuj
public class Producer
implements Runnable {
&&&&public Producer(BlockingQueue queue) {
&&&&&&&&this.queue =
&&&&public void
&&&&&&&&String data =
&&&&&&&&Random r = new
&&&&&&&&System.out.println(&启动生产者线程!&);
&&&&&&&&try {
&&&&&&&&&&&&while
(isRunning) {
&&&&&&&&&&&&&&&&System.out.println(&正在生产数据...&);
&&&&&&&&&&&&&&&&Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
&&&&&&&&&&&&&&&&data =
&data:& + count.incrementAndGet();
&&&&&&&&&&&&&&&&System.out.println(&将数据:&
+ data + &放入队列...&);
&&&&&&&&&&&&&&&&if
(!queue.offer(data, 2, TimeUnit.SECONDS)) {
&&&&&&&&&&&&&&&&&&&&System.out.println(&放入数据失败:&
+ data);
&&&&&&&&&&&&&&&&}
&&&&&&&&&&&&}
&&&&&&&&} catch
(InterruptedException e) {
&&&&&&&&&&&&e.printStackTrace();
&&&&&&&&&&&&Thread.currentThread().interrupt();
&&&&&&&&} finally
&&&&&&&&&&&&System.out.println(&退出生产者线程!&);
&&&&public void
&&&&&&&&isRunning = false;
&&&&private volatile
boolean&&&&& isRunning&&&&&&&&&&&&&& = true;
&&&&private BlockingQ
&&&&private static
AtomicInteger& count&&&&&&&&&&&&&&&&&& = new AtomicInteger();
&&&&private static
final int&&&&& DEFAULT_RANGE_FOR_SLEEP = 1000;
3. DelayQueue
&& & &DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
使用场景:
  DelayQueue使用场景较少,但都相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。
4. PriorityBlockingQueue
&& & &基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不 会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度, 否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。
5. SynchronousQueue
&& & &一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去 集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue来说,少了一个中 间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商 品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者
中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。
  声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:
  如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
   但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的 生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。
  BlockingQueue不光实现了一个完整队列所具有的基本功能,同时在多线程环境下,他还自动管理了多线间的自动等待于唤醒功能,从而使得程序员可以忽略这些细节,关注更高级的功能。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:5504次
排名:千里之外
转载:57篇
(18)(5)(1)(16)(19)(1)

我要回帖

更多关于 stl queue 线程安全 的文章

 

随机推荐