java 中 queue 的使用
Queue 接口与 List、Set 同一级别,都是继承了 Collection 接口。LinkedList 实现了 Queue 接 口。Queue 接口窄化了对 LinkedList 的方法的访问权限(即在方法中的参数类型如果是 Queue 时,就完全只能访问 Queue 接口所定义的方法 了,而不能直接访问 LinkedList 的非 Queue 的方法),以使得只有恰当的方法才可以使用。BlockingQueue 继承了 Queue 接口。
队列是一种数据结构.它有两个基本操作:在队列尾部加人一个元素,和从队列头部移除一个元素就是说,队列以一种先进先出的方式管理数据,如果你试图向一个 已经满了的阻塞队列中添加一个元素或者是从一个空的阻塞队列中移除一个元索,将导致线程阻塞.在多线程进行合作时,阻塞队列是很有用的工具。工作者线程可 以定期地把中间结果存到阻塞队列中而其他工作者线线程把中间结果取出并在将来修改它们。队列会自动平衡负载。如果第一个线程集运行得比第二个慢,则第二个 线程集在等待结果时就会阻塞。如果第一个线程集运行得快,那么它将等待第二个线程集赶上来。下表显示了 jdk1.5 中的阻塞队列的操作:
add
remove
element
offer
poll
peek
put
take
remove、element、offer
阻塞队列的操作可以根据它们的响应方式分为以下三类:aad、removee 和 element 操作在你试图为一个已满的队列增加元素或从空队列取得元素时 抛出异常。当然,在多线程程序中,队列在任何时间都可能变成满的或空的,所以你可能想使用 offer、poll、peek 方法。这些方法在无法完成任务时 只是给出一个出错示而不会抛出异常。
注意:poll 和 peek 方法出错进返回 null。因此,向队列中插入 null 值是不合法的。
还有带超时的 offer 和 poll 方法变种,例如,下面的调用:
boolean success = q.offer(x,100,TimeUnit.MILLISECONDS);
尝试在 100 毫秒内向队列尾部插入一个元素。如果成功,立即返回 true;否则,当到达超时进,返回 false。同样地,调用:
Object head = q.poll(100, TimeUnit.MILLISECONDS);
如果在 100 毫秒内成功地移除了队列头元素,则立即返回头元素;否则在到达超时时,返回 null。
最后,我们有阻塞操作 put 和 take。put 方法在队列满时阻塞,take 方法在队列空时阻塞。
java.ulil.concurrent 包提供了阻塞队列的 4 个变种。默认情况下,LinkedBlockingQueue的容量是没有上限的(说的不准确,在不指定时容量为 Integer.MAX_VALUE,不要然的话在 put 时怎么会受阻呢),但是也可以选择指定其最大容量,它是基于链表的队列,此队列按 FIFO(先进先出)排序元素。
ArrayBlockingQueue在构造时需要指定容量, 并可以选择是否需要公平性,如果公平参数被设置 true,等待时间最长的线程会优先得到处理(其实就是通过将 ReentrantLock 设置为 true 来 达到这种公平性的:即等待时间最长的线程会先操作)。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队 列,此队列按 FIFO(先进先出)原则对元素进行排序。
PriorityBlockingQueue是一个带优先级的 队列,而不是先进先出队列。元素按优先级顺序被移除,该队列也没有上限(看了一下源码,PriorityBlockingQueue 是对 PriorityQueue 的再次包装,是基于堆数据结构的,而 PriorityQueue 是没有容量限制的,与 ArrayList 一样,所以在优先阻塞 队列上 put 时是不会受阻的。虽然此队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError),但是如果队列为空,那么取元素的操作 take 就会阻塞,所以它的检索操作 take 是受阻的。另外,往入该队列中的元 素要具有比较能力。
最后,DelayQueue(基于 PriorityQueue 来实现的)是一个存放 Delayed 元素的无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满,poll 就以移除这个元素了。此队列不允许使用 null 元素。 下面是延迟接口:
java 代码 public interface Delayed extends Comparable { long getDelay(TimeUnit unit); }
放入 DelayQueue 的元素还将要实现 compareTo 方法,DelayQueue 使用这个来为元素排序。
下面的实例展示了如何使用阻塞队列来控制线程集。程序在一个目录及它的所有子目录下搜索所有文件,打印出包含指定关键字的文件列表。从下面实例可以看出,使用阻塞队列两个显著的好处就是:多线程操作共同的队列时不需要额外的同步,另外就是队列会自动平衡负载,即那边(生产与消费两边)处理快了就会被阻塞掉,从而减少两边的处理速度差距。下面是具体实现
Java 代码 public class BlockingQueueTest { public static void main(String[] args) { Scanner in = new Scanner(System.in); System.out.print("Enter base directory (e.g. /usr/local/jdk5.0/src): "); String directory = in.nextLine(); System.out.print("Enter keyword (e.g. volatile): "); String keyword = in.nextLine(); final int FILE_QUEUE_SIZE = 10;// 阻塞队列大小 final int SEARCH_THREADS = 100;// 关键字搜索线程个数 // 基于 ArrayBlockingQueue 的阻塞队列 BlockingQueue queue = new ArrayBlockingQueue( FILE_QUEUE_SIZE); //只启动一个线程来搜索目录 FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory)); new Thread(enumerator).start(); //启动 100 个线程用来在文件中搜索指定的关键字 for (int i = 1; i <= SEARCH_THREADS; i++) new Thread(new SearchTask(queue, keyword)).start(); } } class FileEnumerationTask implements Runnable { //哑元文件对象,放在阻塞队列最后,用来标示文件已被遍历完 public static File DUMMY = new File(""); private BlockingQueue queue; private File startingDirectory; public FileEnumerationTask(BlockingQueue queue, File startingDirectory) { this.queue = queue; this.startingDirectory = startingDirectory; } public void run() { try { enumerate(startingDirectory); queue.put(DUMMY);//执行到这里说明指定的目录下文件已被遍历完 } catch (InterruptedException e) { } } // 将指定目录下的所有文件以 File 对象的形式放入阻塞队列中 public void enumerate(File directory) throws InterruptedException { File[] files = directory.listFiles(); for (File file : files) { if (file.isDirectory()) enumerate(file); else //将元素放入队尾,如果队列满,则阻塞 queue.put(file); } } } class SearchTask implements Runnable { private BlockingQueue queue; private String keyword; public SearchTask(BlockingQueue queue, String keyword) { this.queue = queue; this.keyword = keyword; } public void run() { try { boolean done = false; while (!done) { //取出队首元素,如果队列为空,则阻塞 File file = queue.take(); if (file == FileEnumerationTask.DUMMY) { //取出来后重新放入,好让其他线程读到它时也很快的结束 queue.put(file); done = true; } else search(file); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { } } public void search(File file) throws IOException { Scanner in = new Scanner(new FileInputStream(file)); int lineNumber = 0; while (in.hasNextLine()) { lineNumber++; String line = in.nextLine(); if (line.contains(keyword)) System.out.printf("%s:%d:%s%n", file.getPath(), lineNumber, line); } in.close(); } }