Implementing an Unbounded Work Queue
A work queue is used to coordinate work between a producer and
a set of worker threads. When some work needs to be performed, the
producer adds an object containing the work information to the work
queue. One of the worker threads then removes the object from the
work queue and acts upon the information. If the queue is empty, a
worker thread will block until a new object is added to the queue.
This example declares a class that implements an unbounded work
queue using a linked list.
The listings below show the WorkQueue class, an example that uses
it, and the Worker thread class. The workers in the example retrieve
Integer objects from the work queue; each worker thread removes an
integer from the queue and calculates its square.
In J2SE 5.0, use a BlockingQueue to implement an unbounded work
queue. See Creating a Bounded Work Queue [5.0] for an example;
construct a LinkedBlockingQueue without a capacity.
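import java.util.LinkedList;

public class WorkQueue {
    // Pending work items, oldest first
    private final LinkedList<Object> queue = new LinkedList<Object>();

    // Add work to the work queue and wake one waiting worker
    public synchronized void addWork(Object o) {
        queue.addLast(o);
        notify();
    }

    // Retrieve work from the work queue; block if the queue is empty
    public synchronized Object getWork() throws InterruptedException {
        // Re-check the condition in a loop to guard against spurious wakeups
        while (queue.isEmpty()) {
            wait();
        }
        return queue.removeFirst();
    }
}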
// Create the work queue
WorkQueue queue = new WorkQueue();

// Create a set of worker threads
final int numWorkers = 2;
Worker[] workers = new Worker[numWorkers];
for (int i=0; i<workers.length; i++) {
    workers[i] = new Worker(queue);
    workers[i].start();
}

// Add some work to the queue. addWork() never blocks because
// the queue is unbounded. Each int is autoboxed to an Integer.
for (int i=0; i<100; i++) {
    queue.addWork(i);
}

// Add special end-of-stream markers to terminate the workers,
// one marker per worker
for (int i=0; i<workers.length; i++) {
    queue.addWork(Worker.NO_MORE_WORK);
}
class Worker extends Thread {
    // Special end-of-stream marker. If a worker retrieves
    // this exact object from the queue, the worker will terminate.
    static final Object NO_MORE_WORK = new Object();

    WorkQueue q;

    Worker(WorkQueue q) {
        this.q = q;
    }

    public void run() {
        try {
            while (true) {
                // Retrieve some work; block if the queue is empty
                Object x = q.getWork();

                // Terminate if the end-of-stream marker was retrieved
                // (compared by identity, not by equals())
                if (x == NO_MORE_WORK) {
                    break;
                }

                // Compute the square of x
                int y = ((Integer)x).intValue() * ((Integer)x).intValue();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status and let the thread exit
            Thread.currentThread().interrupt();
        }
    }
}
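The J2SE 5.0 note above can be sketched roughly as follows. This is a minimal illustration, not the code from Creating a Bounded Work Queue [5.0]: the class name BlockingWorkQueueSketch, the method sumOfSquares, and the per-worker partial array are hypothetical names introduced here, and summing the squares is an assumption made only so the result can be checked. The sketch relies on LinkedBlockingQueue being unbounded when constructed without a capacity, so put() does not block in practice, while take() blocks when the queue is empty.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BlockingWorkQueueSketch {
    // Hypothetical end-of-stream marker; compared by identity, not equals()
    static final Object NO_MORE_WORK = new Object();

    // Squares the integers 0..count-1 on numWorkers threads and
    // returns the sum of the squares (an assumption for illustration).
    static long sumOfSquares(int count, int numWorkers) throws InterruptedException {
        // No capacity argument: the queue is effectively unbounded
        final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
        // Each worker writes only to its own slot
        final long[] partial = new long[numWorkers];

        Thread[] workers = new Thread[numWorkers];
        for (int i = 0; i < numWorkers; i++) {
            final int id = i;
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        while (true) {
                            // Retrieve some work; block if the queue is empty
                            Object x = queue.take();
                            if (x == NO_MORE_WORK) {
                                break;
                            }
                            long v = ((Integer) x).intValue();
                            partial[id] += v * v;
                        }
                    } catch (InterruptedException e) {
                        // Restore the interrupt status and exit
                        Thread.currentThread().interrupt();
                    }
                }
            });
            workers[i].start();
        }

        // Add the work; null is not allowed in a BlockingQueue
        for (int i = 0; i < count; i++) {
            queue.put(Integer.valueOf(i));
        }
        // One end-of-stream marker per worker
        for (int i = 0; i < numWorkers; i++) {
            queue.put(NO_MORE_WORK);
        }

        // join() makes each worker's writes visible to this thread
        long total = 0;
        for (int i = 0; i < numWorkers; i++) {
            workers[i].join();
            total += partial[i];
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sumOfSquares(100, 2));
    }
}
```

Compared with the hand-written WorkQueue, the wait()/notify() logic is handled inside the library class, so the producer and the workers only call put() and take().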