I see this example frequently and thought I'd give it a shot but I've got something unexpected happening with my Queue. I have multiple threads utilizing my producer class but the output doesn't show that it is being written to by each thread.

    protected Queue<ProductMessage> prodQueue = new ConcurrentLinkedQueue<>();

    @Override
    public void run() {
        while (true) {
            prodQueue.add(new ProductMessage(MP3Util.getRandomProduct(), new Date(), MP3Util.regionLookup(MP3Util.getState())));
            System.out.println(Thread.currentThread().getName() + "Queue " + prodQueue.size());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ProductProducer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }


        Thread p1 = new Thread(new ProductProducer());
        Thread p2 = new Thread(new ProductProducer());

        p1.start();

        p2.start();

The output shows:
Thread-1Queue 3
Thread-3Queue 3
Thread-1Queue 4
Thread-3Queue 4
Thread-3Queue 5
Thread-1Queue 5

Ideas? I'm pretty sure the ConcurrentLinkedQueue is "thread safe" per the API so it has to be something I'm doing.


You seem to be having multiple threads utilizing multiple instances of the ProductProducer class i.e. each thread operates on a separate instance of the ProductProducer class which explains each item count occurring twice, once for each queue. If you want different instances to operate on the same queue, either declare the queue somewhere where it's visible by all the threads or pass in the Queue when constructing ProductProducer instances.
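As a rough illustration of the second option (passing the queue in), here is a minimal sketch. The `Producer` class and the `String` payload are hypothetical stand-ins for `ProductProducer` and `ProductMessage`, not the original poster's code; the point is only that both threads receive the same queue instance.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class SharedQueueDemo {

    // Hypothetical stand-in for ProductProducer: it receives the queue instead of creating its own.
    static class Producer implements Runnable {
        private final Queue<String> queue;
        private final int count;

        Producer(Queue<String> queue, int count) {
            this.queue = queue;
            this.count = count;
        }

        @Override
        public void run() {
            for (int i = 0; i < count; i++) {
                queue.add(Thread.currentThread().getName() + "-" + i);
            }
        }
    }

    // Starts the given number of producers on ONE queue and returns its final size.
    static int runProducers(int producers, int itemsEach) {
        Queue<String> shared = new ConcurrentLinkedQueue<>();
        Thread[] threads = new Thread[producers];
        for (int i = 0; i < producers; i++) {
            threads[i] = new Thread(new Producer(shared, itemsEach));
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return shared.size();
    }

    public static void main(String[] args) {
        // Two producers, five items each, all landing in the same queue.
        System.out.println("Total items: " + runProducers(2, 5)); // prints "Total items: 10"
    }
}
```

If each `Producer` created its own queue instead, every thread would only ever see its own items, which matches the doubled counts in the output above.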

Obviously not in the right spot here...
I've fixed the code to pass in the queue, but now I get null pointer exceptions...

    public static void main(String[] args) {
        Queue<ProductMessage> prodQueue = new ConcurrentLinkedQueue<>();

        ProductProducer p1 = new ProductProducer(prodQueue, 100);
        ProductProducer p2 = new ProductProducer(prodQueue, 300);

        ProductConsumer easternConsumer = new ProductConsumer(prodQueue, 500);
        ProductConsumer centralConsumer = new ProductConsumer(prodQueue, 700);
        ProductConsumer mountainConsumer = new ProductConsumer(prodQueue, 900);
        ProductConsumer pacificConsumer = new ProductConsumer(prodQueue, 1100);

        Thread t0 = new Thread(p1);
        t0.start();
        Thread t1 = new Thread(p2);
        t1.start();

        Thread t2 = new Thread(easternConsumer);
        t2.start();
        Thread t3 = new Thread(centralConsumer);
        t3.start();
        Thread t4 = new Thread(mountainConsumer);
        t4.start();
        Thread t5 = new Thread(pacificConsumer);
        t5.start();        


        int ctr = 0;
        while (true) {
            ctr++;
            if (ctr == 10) {
                System.exit(0);
            }
            try {
                Thread.sleep(2000);
                System.out.println("In main() has yielded and ctr is: " + ctr);
            } catch (Exception ex) {
                //System.exit(0);
            }
        }        
    }    
}






public class ProductProducer implements Runnable {

    protected Queue<ProductMessage> prodQueue;
    protected int interval;

    public ProductProducer(Queue<ProductMessage> prodQueue, int interval) {
        prodQueue = this.prodQueue;
        interval = this.interval;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(interval);
            } catch (InterruptedException e) {
                prodQueue.add(new ProductMessage(MP3Util.getRandomProduct(), new Date(), MP3Util.regionLookup(MP3Util.getState())));
                System.out.println(Thread.currentThread().getName() + "Queue " + prodQueue.size());
                System.out.println("In Producer\n");
            }
        }
    }
}

Queue<ProductMessage> prodQueue = new ConcurrentLinkedQueue<>();

Are you sure that it is supposed to instantiate that way???

I'm not sure I follow

I've fixed the code to pass in the queue, but now I get null pointer exceptions...

Your ProductProducer constructor is hosed. You write prodQueue = this.prodQueue instead of writing it the other way round. Think about it: do you really want to assign to the passed-in argument the value of the prodQueue field, which is null by default? The same goes for interval, which stays at its default of 0.
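For reference, a minimal sketch of the assignment direction being described. `Producer` and the `String` element type are hypothetical stand-ins for the poster's classes. Declaring the fields `final` is an extra suggestion, not something from the thread: with `final` fields the reversed assignment no longer compiles, because the fields would be left uninitialized.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class ConstructorDemo {

    // Hypothetical stand-in for ProductProducer.
    static class Producer {
        protected final Queue<String> prodQueue;
        protected final int interval;

        Producer(Queue<String> prodQueue, int interval) {
            // Field on the left, parameter on the right.
            this.prodQueue = prodQueue;
            this.interval = interval;
        }
    }

    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        Producer p = new Producer(queue, 100);
        System.out.println("Same queue instance: " + (p.prodQueue == queue)); // prints "Same queue instance: true"
        System.out.println("Interval: " + p.interval);                        // prints "Interval: 100"
    }
}
```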

Are you sure that it is supposed to instantiate that way???

Yes, it's valid as of Java 7: the diamond operator (<>) lets the compiler infer the type arguments from the declaration.

@s.o.s thanks man, too much time staring at this thing; I was looking at everything except that.

Think I'm finally making some headway here.

                synchronized (this) {
                    prodQueue.add(insertToQueue);
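                    // Thread.class.getName() would always print "java.lang.Thread"; use the running thread's name
                    System.out.println(Thread.currentThread().getName() + " putting an object on the queue. Total size: " + prodQueue.size());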
                }

Synchronizing this statement should block the second thread from accessing the queue, correct? However, the sizes each thread prints do not stagger as I would expect ("1,2,3,4,5,6..."); instead I see "2,2,4,4,6,6..."

From the API:
"Beware that, unlike in most collections, the size method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires a traversal of the elements."

Am I to understand that in order to accurately get the size of the queue I have to create a counter to iterate through the queue?

Synchronizing this statement should block the second thread from accessing the queue, correct?

No, because your code synchronizes on this, which means the current ProductProducer instance. Since each thread has a different ProductProducer instance, each thread takes a different lock, so nothing prevents the threads from adding items to the queue concurrently. It would be a different thing if you synchronized on the queue, though, because that object is shared by all of them.
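A small sketch of what synchronizing on the queue could look like. The class and method names here are made up for illustration, and `Integer` stands in for `ProductMessage`. Because every producer locks the same object, the add and the size() call happen as one step, so each observed size is unique.

```java
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

class SyncOnQueueDemo {

    // Runs several producers against one queue and records every size they observe.
    static Set<Integer> observedSizes(int threadCount, int itemsEach) {
        final Queue<Integer> shared = new ConcurrentLinkedQueue<>();
        final Set<Integer> sizes = ConcurrentHashMap.newKeySet();

        Runnable producer = () -> {
            for (int i = 0; i < itemsEach; i++) {
                // Lock on the shared queue, not on "this": every producer uses the same monitor.
                synchronized (shared) {
                    shared.add(i);
                    sizes.add(shared.size());
                }
            }
        };

        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(producer);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Six adds in total, and no size is ever reported twice.
        System.out.println("Distinct sizes seen: " + observedSizes(2, 3).size()); // prints "Distinct sizes seen: 6"
    }
}
```

Note that this only serializes code that takes the same lock; a consumer polling the queue without synchronizing on it would still run concurrently.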

I would personally recommend starting with the Java trail and working through the concepts incrementally instead of continuing down this path of haphazard changes. :)

I'm working on it =)
So I got the synchronization figured out; it's easy to make silly mistakes when you're just getting started.

    @Override
    public void run() {
        while (true) {
            //take a nap
            try {
                Thread.sleep(interval);
            } catch (InterruptedException ex) {
                //if you get woken up theres a reason
            }
            insertToQueue();
        }
    }

    /**
     * So long as we have less than the max queue lets keep putting into the q
     */
    private synchronized void insertToQueue() {
        //queue full? tell someone and take a nap
        while (prodQueue.size() >= MAXQUEUE_SIZE) {
            try {
                wait();
                notify();
            } catch (InterruptedException e) {
                //whats a thread gotta do to get some sleep around here
            }
        }
        //make a productmessage, add it, and tell somebody!
        insertToQueue = new ProductMessage(MP3Util.getRandomProduct(), new Date(), MP3Util.regionLookup(MP3Util.getState()));
        //add it to the queue
        prodQueue.add(insertToQueue);
        System.out.println(Thread.currentThread().getName() + " adding product at : " + time.format(insertToQueue.getTimeStamp()) + " Queue: " + prodQueue.size());
    }
}

I'm still not sure I have the multithreading running correctly. I'm still trying to get that part figured out. Take a look and tell me what you think?

If you want a bounded queue (one with a fixed capacity) for producer-consumer, use a LinkedBlockingQueue or an ArrayBlockingQueue and their put and take methods, which automatically block and wake the corresponding thread.

For example, put on a full queue will block that particular producer, whereas take on an empty queue will block that particular consumer. Perfect for your use case IMO.
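To make that concrete, here is a minimal sketch using ArrayBlockingQueue. The `transfer` helper and the `Integer` payload are invented for the example; only `put`, `take` and the capacity constructor come from the library.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueDemo {

    // Sends the numbers 1..items through a bounded queue and returns the sum the consumer saw.
    static int transfer(int items, int capacity) {
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        final int[] sum = new int[1]; // written only by the consumer, read after join()

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= items; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    sum[0] += queue.take(); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        // Capacity 2 forces the producer to wait for the consumer several times.
        System.out.println("Sum of consumed items: " + transfer(10, 2)); // prints "Sum of consumed items: 55"
    }
}
```

There is no sleep, wait or notify in the calling code; the queue does the coordination.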

I'll check it out; I don't have a purpose so much as just trying to play around and learn some stuff.

If I'm getting you right, either alternative queue natively handles the sleep, notify, etc.?

If I'm getting you right, either alternative queue natively handles the sleep, notify, etc.?

None of the queues use sleep AFAIK. Using sleep is a bad idea because you end up waiting for a fixed amount of time. Let's say you make the consumer thread sleep for 100ms waiting for data. What happens if the data arrives in 1ms? 99ms are wasted sleeping. This is why pretty much everything in the standard library uses wait/notifyAll-style signalling (in java.util.concurrent that usually means a Lock with Condition objects, which work the same way) instead of sleeping, with some very rare exceptions (read: advanced implementations).

Both the queues I mentioned are blocking queues i.e. will cause blocking if required. It's just that their implementations are different -- one uses arrays underneath, the other uses linked lists.

The queue you were using, ConcurrentLinkedQueue, is a bit different in the sense that it doesn't support blocking natively. So if you have to implement producer-consumer with it, you'll have to do the hard work of coordinating the corresponding threads yourself, which is a pain and easy to mess up. I must agree, though, that this concurrent queue has a better chance of scaling given that it doesn't use blocking (wait/notifyAll), but it is much harder to use. Just stick to simple implementations and work towards complex ones in case you have visible performance or other issues.
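A tiny sketch of what "doesn't support blocking" means in practice: `poll()` on an empty ConcurrentLinkedQueue returns null immediately instead of waiting, so the caller has to decide what to do next. The `pollOrDefault` helper is hypothetical, written just for this example.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class NonBlockingPollDemo {

    // Returns the head of the queue, or the fallback when the queue is empty.
    static String pollOrDefault(Queue<String> queue, String fallback) {
        String item = queue.poll(); // never blocks; null means "nothing there right now"
        return item != null ? item : fallback;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        System.out.println(pollOrDefault(queue, "empty")); // prints "empty"
        queue.add("mp3");
        System.out.println(pollOrDefault(queue, "empty")); // prints "mp3"
    }
}
```

A consumer built on this has to retry, back off or signal on its own, which is the coordination work the blocking queues handle internally.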

I'll check it out; I don't have a purpose so much as just trying to play around and learn some stuff.

If you want to learn stuff, look at how the queues are implemented. In the older Java 1.4 days, pretty much all the coordination between threads was written using just wait, notify and notifyAll, and writing that really tests your understanding of concurrent execution and the JVM.
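As a learning exercise along those lines, here is one way a bounded buffer could be written with only wait and notifyAll. This is a teaching sketch, not how the JDK queues are actually implemented; `BoundedBuffer` and `demo` are names made up for the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class BoundedBuffer<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void put(T item) throws InterruptedException {
        // Always re-check the condition in a loop: a thread can wake up without it being true.
        while (items.size() >= capacity) {
            wait();
        }
        items.addLast(item);
        notifyAll(); // wake consumers waiting for data
    }

    public synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.removeFirst();
        notifyAll(); // wake producers waiting for space
        return item;
    }

    // Pushes 1..count through a buffer of the given capacity and returns the consumed sum.
    static int demo(int count, int capacity) {
        final BoundedBuffer<Integer> buffer = new BoundedBuffer<>(capacity);
        final int[] sum = new int[1]; // written only by the consumer, read after join()

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    sum[0] += buffer.take();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println("Consumed sum: " + demo(100, 3)); // prints "Consumed sum: 5050"
    }
}
```

Both methods synchronize on the buffer itself, so producers and consumers share one monitor, and notifyAll is used rather than notify because both kinds of thread wait on that same monitor.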
