The ProducerConsumer_Using_ExecutorService class starts producer and consumer threads in a for loop,
using worker threads from the ExecutorServiceThreadPool class.
Each consumer thread takes an element from the queue and adds it to the ArrayList consumerdata.
However, consumerdata seems to have no elements.
Why is that?
Both the producer and consumer threads seem to work.

import java.util.ArrayList;
import java.util.Iterator;

public class ProducerConsumer_Using_ExecutorService {

    ExecutorServiceThreadPool executorServiceThreadPool;
    static ArrayList consumerdata = new ArrayList();

    public static void main(String[] args) {
        ProducerConsumer_Using_ExecutorService prodconsumer = new ProducerConsumer_Using_ExecutorService();
        prodconsumer.init();
        Iterator itr = consumerdata.iterator();
        while (itr.hasNext()) {
            Object element = itr.next();
            System.out.print(element + " ");
        }
    }

    private void init() {
        executorServiceThreadPool = new ExecutorServiceThreadPool();
        for(int i = 0; i < 10; i++){
            executorServiceThreadPool.addThread(new Producer(i));   
            executorServiceThreadPool.addThread(new Consumer());
        }
                executorServiceThreadPool.finish();
    }

    private class Producer implements Runnable {
            int data;
            public Producer(int datatoput) {
               data = datatoput;
             }

        @Override
        public void run() {         
            System.out.println("Inserting Element " + data);
            try {
                executorServiceThreadPool.queue.put(data);
                    Thread.sleep(100);
            } catch (InterruptedException e) {
            }
        }
    }

    private class Consumer implements Runnable {
        int datatake;

        @Override
        public void run() {
            try {
                datatake = executorServiceThreadPool.queue.take();
                consumerdata.add(datatake);
                Thread.sleep(100);
            } catch (InterruptedException e) {
            }
        }
    }

}



import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ExecutorServiceThreadPool {
  final BlockingQueue<Integer> queue =null; 

    ExecutorService executor = Executors.newFixedThreadPool(2); 
      public void addThread(Runnable r){  
      executor.submit(r);
      }

      public void finish(){
              try {
          executor.shutdown();
          executor.awaitTermination(50, TimeUnit.SECONDS);
      } catch (InterruptedException ex) {
          Logger.getLogger(ExecutorServiceThreadPool.class.getName()).log(Level.SEVERE, null, ex);
      }      
    System.out.println("Finished all threads");
  }

}

Unless I missed something, it looks like your init() method creates the threads and makes them eligible for execution, but then on lines 12-16 (the iterator loop in main) you immediately print the contents of consumerdata. At that point there is absolutely no guarantee that the other threads will have executed anything. Creating and starting a thread makes it eligible for execution, but it's up to the system to determine when it gets allocated any CPU time. My guess is that you are printing the data before any of the other threads has executed.
Ideally you should use some kind of semaphore or wait/notify synchronisation to ensure the producer and consumer threads have executed before printing the results - a CountDownLatch would be ideal.
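
A rough sketch of the CountDownLatch idea, not the poster's code. The names LatchSketch and runOnce are made up for illustration, the queue is assumed to be a properly constructed ArrayBlockingQueue, and a synchronized list is swapped in for the plain ArrayList because two pool threads add to it. The latch gets one count per consumer, so await() returns only after every consumer has stored its element:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class LatchSketch {

    // Runs `count` producers and `count` consumers on a 2-thread pool
    // and returns everything the consumers collected.
    static List<Integer> runOnce(int count) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(count);
        List<Integer> consumed = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(count); // one count per consumer
        ExecutorService pool = Executors.newFixedThreadPool(2);

        for (int i = 0; i < count; i++) {
            final int value = i;
            // Producer: puts one element into the queue.
            pool.submit(() -> {
                try {
                    queue.put(value);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Consumer: takes one element, then signals the latch.
            pool.submit(() -> {
                try {
                    consumed.add(queue.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }

        done.await();    // block until every consumer has finished
        pool.shutdown();
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        // The order of the elements may vary between runs.
        System.out.println(runOnce(10));
    }
}
```

Only after done.await() returns is it safe to read the list from the main thread.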

Thank you.
I tried your suggestion, but nothing is printed.
For some reason consumerdata is empty.
Should I use the Future interface and java.util.concurrent.Callable
instead of Runnable?

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

public class ProducerConsumer_Using_ExecutorService {

    ExecutorServiceThreadPool executorServiceThreadPool;
    static ArrayList<Integer> consumerdata = new ArrayList<Integer>();
    static CountDownLatch cdl = new CountDownLatch(20);

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ProducerConsumer_Using_ExecutorService prodconsumer = new ProducerConsumer_Using_ExecutorService();
        prodconsumer.init();
        cdl.await();
        prodconsumer.show_list();
    }

    private void init() throws InterruptedException, ExecutionException {
        executorServiceThreadPool = new ExecutorServiceThreadPool();
        for (int i = 0; i < 10; i++) {
            executorServiceThreadPool.addThread(new Producer(i, cdl));
            executorServiceThreadPool.addThread(new Consumer(cdl));
        }
        Thread.sleep(200);
        executorServiceThreadPool.finish();


    }

    private void show_list() {
        Iterator itr = consumerdata.iterator();
        while (itr.hasNext()) {
            Object element = itr.next();
            System.out.print(element + " ");
        }
    }

    private class Producer implements Runnable {
        int data;
        private final CountDownLatch stop;

        public Producer(int datatoput, CountDownLatch stopLatch) {
            data = datatoput;
            this.stop = stopLatch;
        }

        @Override
        public void run() {
            System.out.println("Inserting Element " + data);
            try {
                executorServiceThreadPool.queue.put(data);
                Thread.sleep(100);
            } catch (InterruptedException e) {
            } finally {
                stop.countDown();
            }
        }
    }

    private class Consumer implements Runnable {

        int datatake;
        private final CountDownLatch stop;

        public Consumer(CountDownLatch stopLatch) {
            this.stop = stopLatch;
        }

        @Override
        public void run() {
            try {
                datatake = executorServiceThreadPool.queue.take();
                consumerdata.add(datatake);
                Thread.sleep(100);
            } catch (InterruptedException e) {
            } finally {
                stop.countDown();
            }

        }
    }
}

It was a stupid mistake. In the ExecutorServiceThreadPool class I had written:

final BlockingQueue<Integer> queue =null;

instead of :

final BlockingQueue<Integer> queue =new ArrayBlockingQueue<Integer>(20);

Anyway, it was a chance to use CountDownLatch.
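
For anyone wondering why the null queue produced no visible error: a task passed to ExecutorService.submit() does not print an exception it throws. The exception is stored in the returned Future, so the producers printed "Inserting Element" and then failed silently on queue.put(). The sketch below reproduces that in isolation. The names SilentFailureSketch and describeFailure are hypothetical and it is only an illustration, not the original program:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SilentFailureSketch {

    // Submits a task that uses a null queue and returns the simple name of
    // the exception that Future.get() reveals.
    static String describeFailure() throws InterruptedException {
        final BlockingQueue<Integer> queue = null; // same mistake as in the thread
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> result = pool.submit(() -> {
                try {
                    queue.put(1); // throws NullPointerException
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            try {
                result.get(); // rethrows the task's failure, wrapped
                return "no failure";
            } catch (ExecutionException e) {
                return e.getCause().getClass().getSimpleName();
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // prints "Task failed with: NullPointerException"
        System.out.println("Task failed with: " + describeFailure());
    }
}
```

So keeping the Future returned by submit() and calling get() on it, or using execute() instead of submit(), would have made the NullPointerException visible much sooner.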
