Creating a Bounded Work Queue [5.0]

A work queue is used to coordinate work between a producer and a set of worker threads. When some work needs to be performed, the producer adds an object containing the work information to the work queue. One of the worker threads then removes the object from the work queue and acts upon the information.

If a bounded work queue is full, the producer blocks when it tries to add to the queue and remains blocked until a worker thread removes an object. If the queue is empty, a worker thread retrieving work blocks until the producer makes an object available. See also Implementing an Unbounded Work Queue.
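The full and empty conditions described above can be observed without blocking forever by using the non-blocking and timed variants of the queue methods. The following is a minimal sketch, not part of the original example: the class name BoundedQueueDemo and its two helper methods are hypothetical, while offer(), poll() with a timeout, and remainingCapacity() are standard BlockingQueue methods.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only the queue methods come from java.util.concurrent.
public class BoundedQueueDemo {
    // Fills a queue of the given capacity, then attempts one more insert
    // with offer(), which returns false instead of blocking when the queue is full.
    static boolean rejectsWhenFull(int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer(i);
        }
        boolean accepted = queue.offer(capacity);
        return !accepted && queue.remainingCapacity() == 0;
    }

    // Polls an empty queue with a timeout; poll() returns null when
    // nothing becomes available in time, where take() would keep waiting.
    static boolean emptyPollTimesOut(long timeoutMillis) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(1);
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("full queue rejects offer: " + rejectsWhenFull(3));
        System.out.println("empty queue poll timed out: " + emptyPollTimesOut(50));
    }
}
```

In the same situations, put() and take() would block rather than return, which is the behavior the work queue in this example relies on.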

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Create a bounded blocking queue of integers
final int capacity = 10;
BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);

// Create a set of worker threads
final int numWorkers = 2;
Worker[] workers = new Worker[numWorkers];
for (int i=0; i<workers.length; i++) {
    workers[i] = new Worker(queue);
    workers[i].start();
}

try {
    // Add some work to the queue; block if the queue is full.
    // Note that null cannot be added to a blocking queue.
    for (int i=0; i<100; i++) {
        queue.put(i);
    }

    // Add special end-of-stream markers to terminate the workers
    for (int i=0; i<workers.length; i++) {
        queue.put(Worker.NO_MORE_WORK);
    }
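} catch (InterruptedException e) {
    // Restore the interrupt status instead of silently swallowing it
    Thread.currentThread().interrupt();
}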

This worker thread removes an integer from the work queue and calculates its square.

class Worker extends Thread {
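    // Special end-of-stream marker. If a worker retrieves this exact
    // object, the worker will terminate. The marker is a distinct Integer
    // instance and is compared by reference (==), so an ordinary work
    // item with the value 0 is not mistaken for it.
    static final Integer NO_MORE_WORK = new Integer(0);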

    BlockingQueue<Integer> q;

    Worker(BlockingQueue<Integer> q) {
        this.q = q;
    }
    public void run() {
        try {
            while (true) {
                // Retrieve an integer; block if the queue is empty
                Integer x = q.take();

                // Terminate if the end-of-stream marker was retrieved
                if (x == NO_MORE_WORK) {
                    break;
                }

                // Compute the square of x
                int y = x * x;
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status and let the thread exit
            Thread.currentThread().interrupt();
        }
    }
}
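
The example above discards each square and never waits for the workers to finish. As a rough sketch of one way to collect results and wait for termination, the following self-contained variant sums the squares in an AtomicLong and calls join() on each worker. The class name SquareSumDemo and the method sumOfSquares are hypothetical. Unlike the example above, this sketch uses the value -1 as the end-of-stream marker, which assumes that real work items are never negative.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class: sums the squares of 0..count-1 using worker threads.
public class SquareSumDemo {
    // Assumption: work items are never negative, so -1 can mark end of stream.
    static final int NO_MORE_WORK = -1;

    static long sumOfSquares(int count, int numWorkers, int capacity) {
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
        final AtomicLong total = new AtomicLong();

        Thread[] workers = new Thread[numWorkers];
        for (int i = 0; i < numWorkers; i++) {
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        while (true) {
                            // Retrieve an integer; block if the queue is empty
                            int x = queue.take();
                            if (x == NO_MORE_WORK) {
                                break;
                            }
                            // Accumulate the square in a thread-safe counter
                            total.addAndGet((long) x * x);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            workers[i].start();
        }

        try {
            // Add the work; block whenever the queue is full
            for (int i = 0; i < count; i++) {
                queue.put(i);
            }
            // One marker per worker, queued after all real work
            for (int i = 0; i < numWorkers; i++) {
                queue.put(NO_MORE_WORK);
            }
            // Wait until every worker has consumed its marker and exited
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(100, 2, 10)); // prints 328350
    }
}
```

Because the queue is first-in, first-out and the markers are added last, every work item is taken before any marker, and join() ensures the total is read only after all workers have finished.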
