I saw a tutorial that showed the use of ArrayBlockingQueue as a thread-safe concurrent data structure. Just for learning purposes, I tried to build something similar using synchronized and the wait/notify methods in Java. I'm hoping someone can suggest improvements and point out the bad parts of the code I wrote:

import java.util.ArrayList;
import java.util.Random;

public class LowLevelProducerConsumer {
    ArrayList<Integer> list = new ArrayList<Integer>();
    private final int size = 10;
    private int elems = 0;
    Object lock = new Object();
    Random r = new Random(System.currentTimeMillis());

    public void producer() {
        while (true) {
            synchronized (lock) {
                try {
                    while (elems < size) {
                        list.add(r.nextInt(100));
                        elems++;
                    }
                } finally {
                    // allows consumer to remove an entry
                    lock.notify();
                }
            }
        }
    }

    public void consumer() throws InterruptedException {
        Thread.sleep(100); // any better way to ensure producer runs first ?
        int ran = 0;
        while (true) {
            // consumer tries to acquire lock here , but it can do so only after
            // producer has called notify
            synchronized (lock) {

                // do i need a lock.wait() here somewhere ?

                ran = r.nextInt(10);

                if (ran == 7) { // just an arbitrary condition

                    int loc = r.nextInt(list.size());
                    int data = list.remove(loc);
                    System.out.format(
                            "%d removed from index %d , size : %d\n",
                            data, loc, list.size());
                    // decrementing elems to let the producer work again
                    elems--;
                    Thread.sleep(300);
                }

                // release the lock so that producer can fill things up again
                lock.notify();
            }
        }

    }

    public static void main(String[] args) {
        final LowLevelProducerConsumer low = new LowLevelProducerConsumer();
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                low.producer();
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    low.consumer();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

PS: I've kept my understanding of how the code works in the comments. Any corrections to them would be much appreciated.

All 2 Replies

This is something similar that I wrote.
I hope it helps.

public class Change {

    private boolean flag = false;
    public static int commonresource = 1;

    public synchronized void increment(int inc) {
        // wait until the previous increment has been consumed by decrement()
        while (flag) {
            try {
                wait();
            } catch (InterruptedException e) {}
        }

        flag = true;
        System.out.println("before increment :" + commonresource);
        commonresource += inc;
        System.out.println(Thread.currentThread().getName() + " increment by " + inc);
        System.out.println("after increment :" + commonresource + "\n");
        notify();
    }

    public synchronized void decrement(int inc) {
        // wait until increment() has run
        while (!flag) {
            try {
                wait();
            } catch (InterruptedException e) {}
        }

        flag = false;
        System.out.println("before decrement :" + commonresource);
        commonresource -= inc;
        System.out.println(Thread.currentThread().getName() + " decrement by " + inc);
        System.out.println("after decrement :" + commonresource + "\n");
        notify();
    }
}
/*
  if initial flag value is false Increment_Change start first
  if initial flag value is true  Decrement_Change start first
 */


public class Decrement_Change implements Runnable {
    private Change ch;

    public Decrement_Change(Change ch) {
        this.ch = ch;
    }

    @Override
    public void run() {
        for (int i = 0; i < 5; i++)
            ch.decrement(1);
    }
}


public class Increment_Change implements Runnable {
    private Change ch;

    public Increment_Change(Change ch) {
        this.ch = ch;
    }

    @Override
    public void run() {
        for (int i = 0; i < 5; i++)
            ch.increment(5);
    }
}

public class Start_the_two_Threads {
    public static void main(String args[]) {
        Change ch = new Change();
        (new Thread(new Increment_Change(ch))).start();
        (new Thread(new Decrement_Change(ch))).start();
    }
}

If you end up using sleep for inter-thread communication or coordination, you are doing it wrong. The entire point of wait/notify/notifyAll is to coordinate activities based on some condition and to signal the other threads when that condition changes. In your case it's pretty simple: the consumer should wait until the shared resource (the list in your case) has some data in it. This is done by checking the size of the list in a loop and calling wait() for as long as it is zero. This way you ensure that the consumer doesn't run before the producer.
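
To make that concrete, here is a minimal sketch of the wait-in-a-loop idea applied to a list like yours. The class name WaitNotifyBuffer and the methods put and take are made up for illustration, and I've assumed the producer should also wait when the list is full. Treat it as one possible shape, not the only correct one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical name; a bounded list guarded by one lock object.
class WaitNotifyBuffer {
    private final List<Integer> list = new ArrayList<>();
    private final int capacity;
    private final Object lock = new Object();

    WaitNotifyBuffer(int capacity) {
        this.capacity = capacity;
    }

    public void put(int value) throws InterruptedException {
        synchronized (lock) {
            // Re-check the condition in a loop: wait() can return spuriously.
            while (list.size() == capacity) {
                lock.wait(); // releases the lock while waiting
            }
            list.add(value);
            lock.notifyAll(); // wake any consumer waiting for data
        }
    }

    public int take() throws InterruptedException {
        synchronized (lock) {
            while (list.isEmpty()) {
                lock.wait(); // nothing to consume yet
            }
            int value = list.remove(0); // remove the oldest entry
            lock.notifyAll(); // wake any producer waiting for space
            return value;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WaitNotifyBuffer buffer = new WaitNotifyBuffer(10);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        // No sleep needed: take() simply blocks until the producer has added something.
        for (int i = 0; i < 20; i++) {
            System.out.println("took " + buffer.take());
        }
        producer.join();
    }
}
```

Note that there is no Thread.sleep anywhere: whichever thread starts first, the condition checks decide who proceeds. I used notifyAll rather than notify because producers and consumers wait on the same lock, and notify could wake a thread of the wrong kind.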

Take a look at this SO thread; you would basically need to take that short snippet and convert it into code that uses an Object array instead of a linked list (managing the get and put indices, etc.). The rest of the logic stays the same.
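
As a rough idea of what the array-backed version could look like, here is a sketch that is not the snippet from that thread. The class name ArrayBoundedBuffer and the fields putIndex, takeIndex and count are my own placeholders. The two indices wrap around the end of the array, so it behaves as a circular buffer.

```java
// Hypothetical name; a fixed-size circular buffer using the object's own monitor.
class ArrayBoundedBuffer {
    private final Object[] items;
    private int putIndex;  // slot where the next put() writes
    private int takeIndex; // slot where the next take() reads
    private int count;     // number of elements currently stored

    ArrayBoundedBuffer(int capacity) {
        items = new Object[capacity];
    }

    public synchronized void put(Object item) throws InterruptedException {
        while (count == items.length) {
            wait(); // buffer full: wait for a take()
        }
        items[putIndex] = item;
        putIndex = (putIndex + 1) % items.length; // wrap around
        count++;
        notifyAll();
    }

    public synchronized Object take() throws InterruptedException {
        while (count == 0) {
            wait(); // buffer empty: wait for a put()
        }
        Object item = items[takeIndex];
        items[takeIndex] = null; // drop the reference so it can be collected
        takeIndex = (takeIndex + 1) % items.length; // wrap around
        count--;
        notifyAll();
        return item;
    }

    public synchronized int size() {
        return count;
    }
}
```

The waiting logic is the same as with a list; only the storage changes. Keeping a separate count avoids the ambiguity of putIndex == takeIndex, which would otherwise mean either "empty" or "full".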
