Implementing an Unbounded Work Queue

A work queue is used to coordinate work between a producer and a set of worker threads. When some work needs to be performed, the producer adds an object containing the work information to the work queue. One of the worker threads then removes the object from the work queue and acts upon the information. If the queue is empty, a worker thread will block until a new object is added to the queue.

This example declares a class that implements an unbounded work queue using a linked list.

import java.util.LinkedList;

public class WorkQueue {
    private final LinkedList<Object> queue = new LinkedList<Object>();

    // Add work to the work queue and wake one waiting worker
    public synchronized void addWork(Object o) {
        queue.addLast(o);
        notify();
    }

    // Retrieve work from the work queue; block if the queue is empty
    public synchronized Object getWork() throws InterruptedException {
        // Re-check the condition in a loop to guard against spurious wakeups
        while (queue.isEmpty()) {
            wait();
        }
        return queue.removeFirst();
    }
}

Here's an example that uses the WorkQueue class. The workers in this example retrieve Integer objects from the work queue and square the values.
// Create the work queue
WorkQueue queue = new WorkQueue();

// Create a set of worker threads
final int numWorkers = 2;
Worker[] workers = new Worker[numWorkers];
for (int i=0; i<workers.length; i++) {
    workers[i] = new Worker(queue);
    workers[i].start();
}

// Add some work to the queue. addWork() never blocks
// because the queue is unbounded.
for (int i=0; i<100; i++) {
    queue.addWork(i);
}

// Add special end-of-stream markers to terminate the workers,
// one marker per worker
for (int i=0; i<workers.length; i++) {
    queue.addWork(Worker.NO_MORE_WORK);
}

This worker thread removes an integer from the work queue and calculates its square.
class Worker extends Thread {
    // Special end-of-stream marker. If a worker retrieves this exact
    // object (compared by identity), the worker will terminate.
    static final Object NO_MORE_WORK = new Object();

    WorkQueue q;

    Worker(WorkQueue q) {
        this.q = q;
    }

    public void run() {
        try {
            while (true) {
                // Retrieve some work; block if the queue is empty
                Object x = q.getWork();

                // Terminate if the end-of-stream marker was retrieved
                if (x == NO_MORE_WORK) {
                    break;
                }

                // Compute the square of x
                int y = ((Integer)x).intValue() * ((Integer)x).intValue();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status and let the thread exit
            Thread.currentThread().interrupt();
        }
    }
}

In J2SE 5.0, use a BlockingQueue to implement an unbounded work queue. See Creating a Bounded Work Queue [5.0] for an example; construct a LinkedBlockingQueue without a capacity (ArrayBlockingQueue always requires one).
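
As a rough sketch of the java.util.concurrent approach, assuming Java 5 or later, the hand-written WorkQueue class could be replaced by a LinkedBlockingQueue constructed without a capacity. The class name BlockingQueueDemo, the method sumOfSquares, and the use of an AtomicLong to collect results are illustrative assumptions, not part of the original example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class; the names here are illustrative only
class BlockingQueueDemo {
    // End-of-stream marker, compared by identity so it cannot collide with real work
    static final Object NO_MORE_WORK = new Object();

    // Squares the integers 0..count-1 on numWorkers threads and returns the sum of the squares
    static long sumOfSquares(int numWorkers, int count) throws InterruptedException {
        // No capacity argument: the queue is effectively unbounded (Integer.MAX_VALUE)
        final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
        final AtomicLong total = new AtomicLong();

        Thread[] workers = new Thread[numWorkers];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        while (true) {
                            // take() blocks while the queue is empty
                            Object x = queue.take();
                            if (x == NO_MORE_WORK) {
                                break;
                            }
                            long v = ((Integer) x).intValue();
                            total.addAndGet(v * v);
                        }
                    } catch (InterruptedException e) {
                        // Restore the interrupt status and let the thread exit
                        Thread.currentThread().interrupt();
                    }
                }
            });
            workers[i].start();
        }

        // Add the work; put() does not block in practice on an unbounded queue
        for (int i = 0; i < count; i++) {
            queue.put(Integer.valueOf(i));
        }
        // One end-of-stream marker per worker
        for (int i = 0; i < workers.length; i++) {
            queue.put(NO_MORE_WORK);
        }
        // Wait for all workers to finish before reading the total
        for (Thread w : workers) {
            w.join();
        }
        return total.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sumOfSquares(2, 100)); // prints 328350
    }
}
```

Unlike the LinkedList-based WorkQueue, a LinkedBlockingQueue rejects null elements, so the end-of-stream marker has to be a real object, as it is here.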
