THREADS IN JAVA

EXAMPLE OF CONCURRENCY: PRODUCER CONSUMER

APPLICATION OF MULTITHREADING AND CONCURRENCY IN JAVA

The producer-consumer problem (also known as the bounded-buffer problem) is a classic example of synchronization between concurrent processes or, as in this article, threads. Two threads, a producer and a consumer, share a fixed-size buffer.

The producer generates data and deposits it in the buffer.

The consumer runs concurrently and uses the data produced by the producer, removing it from the buffer. The problem is to make sure that:

  • the producer does not add new data when the buffer is full
  • the consumer does not try to read data when the buffer is empty

The solution is to:

  • suspend the execution of the producer if the buffer is full; when the consumer takes an item from the buffer, it wakes up the producer, which resumes filling the buffer
  • suspend the execution of the consumer if the buffer is empty; when the producer has inserted data into the buffer, it wakes up the consumer, which resumes reading from the buffer

The solution can be implemented with communication mechanisms between processes or threads (typically semaphores). If the solution is not implemented correctly, we may end up in a deadlock, in which both sides remain waiting to be awakened.
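The semaphore-based approach mentioned above can be sketched as follows. This is only an illustration under stated assumptions: the class name SemaphoreBuffer and its methods are hypothetical and are not part of the article's code, which uses wait() and notifyAll() instead. Two counting semaphores track the free and the used slots, and a third one protects the queue itself.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;

// Hypothetical class for illustration; not part of the article's code.
class SemaphoreBuffer {
    private final Deque<Integer> items = new ArrayDeque<>();
    private final Semaphore freeSlots;                     // one permit per free slot
    private final Semaphore usedSlots = new Semaphore(0);  // one permit per stored item
    private final Semaphore mutex = new Semaphore(1);      // protects the deque itself

    SemaphoreBuffer(int capacity) {
        this.freeSlots = new Semaphore(capacity);
    }

    void put(int value) throws InterruptedException {
        freeSlots.acquire();              // blocks while the buffer is full
        mutex.acquireUninterruptibly();   // short critical section
        try {
            items.addLast(value);
        } finally {
            mutex.release();
        }
        usedSlots.release();              // lets one waiting consumer proceed
    }

    int take() throws InterruptedException {
        usedSlots.acquire();              // blocks while the buffer is empty
        mutex.acquireUninterruptibly();
        int value;
        try {
            value = items.removeFirst();
        } finally {
            mutex.release();
        }
        freeSlots.release();              // lets one waiting producer proceed
        return value;
    }

    // Produces 1..5 in a separate thread and sums them in the calling thread.
    static int runDemo() {
        SemaphoreBuffer buffer = new SemaphoreBuffer(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < 5; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("sum = " + runDemo()); // prints "sum = 15"
    }
}
```

With a capacity of 2 the producer is forced to block several times, yet neither side ever polls the buffer: the semaphores do all the waiting and waking.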

import java.util.List;
public class Producer implements Runnable {
    private final List<Integer> bufferCondiviso;
    private final int size;
    private int i = 1;
    public Producer(List<Integer> bufferCondiviso, int size) {
        this.bufferCondiviso = bufferCondiviso;
        this.size = size;
    }
    @Override
    public void run() {
        while (true) {
            try {
                produce();
                i++;
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                // restore the interrupt flag and stop the thread
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
    private void produce() throws InterruptedException {
        // the check and the update happen under the same lock, so the buffer cannot change in between
        synchronized (bufferCondiviso) {
            // the thread waits while the buffer is full; the loop re-checks the condition after every wake-up
            while (bufferCondiviso.size() == size) {
                System.out.println("The buffer is full, the Producer thread is waiting... the buffer size is now: " + bufferCondiviso.size());
                bufferCondiviso.wait();
            }
            // the buffer is not full, so the thread can add a new element and notify the consumer
            bufferCondiviso.add(i);
            bufferCondiviso.notifyAll();
            System.out.println("The Producer thread added to the buffer the element: " + i + " the buffer size is now: " + bufferCondiviso.size());
        }
    }
}
import java.util.List;

public class Consumer implements Runnable {
    private final List<Integer> bufferCondiviso;
    // size is not used by the consumer; the parameter is kept so that both classes are constructed the same way
    public Consumer(List<Integer> bufferCondiviso, int size) {
        this.bufferCondiviso = bufferCondiviso;
    }
    @Override
    public void run() {
        while (true) {
            try {
                System.out.println("The Consumer thread is reading the buffer... ");
                consume();
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                // restore the interrupt flag and stop the thread
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
    private void consume() throws InterruptedException {
        // the check and the removal happen under the same lock, so the buffer cannot change in between
        synchronized (bufferCondiviso) {
            // the thread waits while the buffer is empty; the loop re-checks the condition after every wake-up
            while (bufferCondiviso.isEmpty()) {
                System.out.println("The buffer is empty, the Consumer thread is waiting... the buffer size is now: " + bufferCondiviso.size());
                bufferCondiviso.wait();
            }
            // the buffer contains elements, so the thread can remove one and notify the producer
            System.out.println("The Consumer thread is reading the buffer and removing the following element: " + bufferCondiviso.remove(0) + " the buffer size is now: " + bufferCondiviso.size());
            bufferCondiviso.notifyAll();
        }
    }
}

import java.util.LinkedList;
import java.util.List;

public class TestClass {
    public static void main(String[] args) {
        // buffer shared by the two threads; it is also the object they synchronize on
        List<Integer> bufferCondiviso = new LinkedList<>();
        int size = 4;
        Thread prodThread = new Thread(new Producer(bufferCondiviso, size), "Producer");
        Thread consThread = new Thread(new Consumer(bufferCondiviso, size), "Consumer");
        prodThread.start();
        consThread.start();
    }
}

THE WAIT, NOTIFY AND NOTIFYALL METHODS

The wait(), notify() and notifyAll() methods are defined within the Object class.

wait()

  • This method puts the current thread on hold.
  • A thread can invoke wait() only on an object whose lock (monitor) it currently holds
  • wait() can therefore be invoked only inside a method or code block synchronized on that object; otherwise an IllegalMonitorStateException is thrown
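The rule about the lock can be checked with a few lines of code. The sketch below uses a hypothetical class, WaitWithoutLock, and a timed wait of 10 milliseconds so that the legal call returns on its own.

```java
// Hypothetical class for illustration; not part of the article's code.
class WaitWithoutLock {
    // Returns true if calling wait() without holding the lock is rejected.
    static boolean waitOutsideSynchronizedFails() {
        Object monitor = new Object();
        try {
            monitor.wait(10);   // the current thread does not hold the lock on monitor
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Returns true if a timed wait() inside a synchronized block completes normally.
    static boolean waitInsideSynchronizedWorks() {
        Object monitor = new Object();
        synchronized (monitor) {
            try {
                monitor.wait(10);   // legal: the lock is held; returns after about 10 ms
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(waitOutsideSynchronizedFails());  // prints "true"
        System.out.println(waitInsideSynchronizedWorks());   // prints "true"
    }
}
```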

When the wait() method is invoked on an object, the following effects occur:

  • the lock on the object is released
  • the thread is placed in the WAITING state (TIMED_WAITING if a timeout was given)

Like wait(), the static method Thread.sleep() also puts the invoking thread on hold.

Between the two methods, however, there is a difference:

  • when sleep() is invoked, the thread keeps every lock it holds; therefore, no other thread can enter a block synchronized on those objects until the sleeping thread leaves it
  • when wait() is invoked, however, the lock on the object is released, and the object becomes accessible to other threads
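The difference can be made visible with a small experiment. In the sketch below (the class WaitReleasesLock is hypothetical), a thread enters a synchronized block and calls wait(); the main thread then manages to enter a block synchronized on the same object, which is possible only because wait() released the lock.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical class for illustration; not part of the article's code.
class WaitReleasesLock {
    private final Object monitor = new Object();
    private boolean ready = false;   // guarded by monitor

    // Returns true if the waiting thread was released and has finished.
    boolean demo() {
        CountDownLatch waiterHoldsLock = new CountDownLatch(1);
        Thread waiter = new Thread(() -> {
            synchronized (monitor) {
                waiterHoldsLock.countDown();   // the waiter owns the lock from here on
                while (!ready) {
                    try {
                        monitor.wait();        // releases the lock until notified
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }, "Waiter");
        waiter.start();
        try {
            waiterHoldsLock.await();
            // This block can be entered only because wait() released the lock.
            // With Thread.sleep() in place of wait(), the waiter would keep the
            // lock for the whole duration of the sleep.
            synchronized (monitor) {
                ready = true;
                monitor.notifyAll();
            }
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(new WaitReleasesLock().demo()); // prints "true"
    }
}
```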

The wait method has the following overloads:

  • wait(), causes the current thread to wait until another thread invokes the notify() or notifyAll() method on the same object, or until the waiting thread is interrupted
  • wait(long timeout), causes the current thread to wait until another thread invokes the notify() or notifyAll() method or until the timeout, expressed in milliseconds, has elapsed. If timeout is 0, the behavior is the same as in the wait() method
  • wait(long timeout, int nanos), is analogous to wait(long timeout), except that nanoseconds can be added to the timeout in milliseconds.
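A timed wait is usually wrapped in a loop that recomputes the remaining time, because a thread can also wake up without the condition being true. The following is a minimal sketch of that pattern; the class TimedWait and its method names are assumptions made for the example.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical class for illustration; not part of the article's code.
class TimedWait {
    private final Object monitor = new Object();
    private boolean done = false;   // guarded by monitor

    void markDone() {
        synchronized (monitor) {
            done = true;
            monitor.notifyAll();
        }
    }

    // Waits at most timeoutMillis for done to become true and returns its value.
    boolean awaitDone(long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        synchronized (monitor) {
            while (!done) {
                long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMillis <= 0) {
                    return false;               // never call wait(0): it would wait forever
                }
                try {
                    monitor.wait(remainingMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return done;
                }
            }
            return true;
        }
    }

    public static void main(String[] args) {
        TimedWait task = new TimedWait();
        System.out.println(task.awaitDone(50));  // prints "false": nobody called markDone()
        task.markDone();
        System.out.println(task.awaitDone(50));  // prints "true": returns immediately
    }
}
```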

notify() and notifyAll()

The notify() method wakes up one of the threads waiting on the object's monitor; which one is chosen is not specified. The notifyAll() method wakes up all threads waiting on that monitor.

A thread can invoke these methods only if it holds the lock on the object it notifies. An awakened thread does not run immediately: it must first reacquire the lock on the object, which the notifying thread still holds until it leaves its synchronized block. Until then the awakened thread is in the BLOCKED state; only after reacquiring the lock does it become RUNNABLE and return from wait().

If the notify() method is invoked on an object on which no thread is waiting, the notification is lost. The notifyAll() method should be used when several threads may be waiting. When the notifyAll() method is invoked, this is what happens:

  • All threads waiting on the object are awakened
  • All awakened threads compete to reacquire the lock on the object once the notifying thread releases it
  • Only one thread at a time takes the lock; the others wait for the lock to be released again

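The notify() method is cheaper than the notifyAll() method because it wakes up a single thread, but it is safe only when every waiting thread waits for the same condition and any one of them can make progress. When in doubt, prefer notifyAll().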
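The following sketch shows a case in which notifyAll() is the right choice: several threads wait for the same one-time event. The class Gate and its methods are hypothetical names chosen for the example. With notify() in place of notifyAll(), only one of the threads already waiting would be woken, and the others could wait forever.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class for illustration; not part of the article's code.
class Gate {
    private final Object monitor = new Object();
    private boolean isOpen = false;   // guarded by monitor

    void awaitOpen() throws InterruptedException {
        synchronized (monitor) {
            while (!isOpen) {         // re-check the condition after every wake-up
                monitor.wait();
            }
        }
    }

    void open() {
        synchronized (monitor) {
            isOpen = true;
            monitor.notifyAll();      // every waiting thread must see the change
        }
    }

    // Starts the given number of waiting threads, opens the gate and
    // returns how many threads got through.
    static int runDemo(int waiters) {
        Gate gate = new Gate();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    gate.awaitOpen();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Waiter-" + i);
            threads[i].start();
        }
        gate.open();
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4)); // prints "4"
    }
}
```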

ADVANCED SYNCHRONIZATION WITH THE LOCK INTERFACE AND THE REENTRANTLOCK CLASS

SYNCHRONIZATION USING THE KEYWORD SYNCHRONIZED

The synchronized keyword:

  • allows exclusive access to a shared section of code
  • makes the execution of a block of code atomic with respect to the other threads that synchronize on the same object, which avoids the following problems:

▪ race condition, the phenomenon in which the output generated in a multi-threaded environment depends on the timing or sequence in which threads are executed

▪ interleaving, that is, the uncontrolled overlapping of operations performed on a shared resource by competing threads

The synchronized keyword, however, has some limitations:

  • hand over hand (or multi-locking): when several resources must be locked in sequence, we would like to acquire the lock on the next resource before releasing the lock on the previous one, so that other threads can use the resources that are already free. Because a synchronized block always releases its lock in the same block in which it was acquired, this pattern cannot be expressed
  • timeout: a thread waiting to enter a synchronized block cannot give up after a given time; it waits for an unspecified time until the block is freed
  • interruptibility: a thread that is blocked while waiting to enter a synchronized block cannot be interrupted

The use of locks makes it possible to overcome such limitations!
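As an illustration of the first limitation, here is a minimal sketch of hand-over-hand locking on a sorted linked list, written with one ReentrantLock per node. The class HandOverHandList and everything in it are assumptions made for this example. While walking the list, a thread locks the next node before unlocking the previous one, something nested synchronized blocks cannot do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class for illustration; not part of the article's code.
class HandOverHandList {
    private static final class Node {
        final int value;
        Node next;                                   // guarded by this node's lock
        final ReentrantLock lock = new ReentrantLock();
        Node(int value) { this.value = value; }
    }

    private final Node head = new Node(Integer.MIN_VALUE);   // sentinel node

    // Inserts value so that the list stays sorted in ascending order.
    void add(int value) {
        Node prev = head;
        prev.lock.lock();
        try {
            Node curr = prev.next;
            while (curr != null && curr.value < value) {
                curr.lock.lock();      // take the next lock...
                prev.lock.unlock();    // ...and only then release the previous one
                prev = curr;
                curr = prev.next;
            }
            Node node = new Node(value);
            node.next = curr;
            prev.next = node;
        } finally {
            prev.lock.unlock();        // prev is always the one node still locked
        }
    }

    // Returns the values in list order, walking the list hand over hand.
    List<Integer> snapshot() {
        List<Integer> result = new ArrayList<>();
        Node prev = head;
        prev.lock.lock();
        try {
            Node curr = prev.next;
            while (curr != null) {
                curr.lock.lock();
                prev.lock.unlock();
                prev = curr;
                result.add(prev.value);
                curr = prev.next;
            }
        } finally {
            prev.lock.unlock();
        }
        return result;
    }

    // Two threads insert odd and even numbers concurrently.
    static List<Integer> runDemo() {
        HandOverHandList list = new HandOverHandList();
        Thread odd = new Thread(() -> { for (int i = 9; i >= 1; i -= 2) list.add(i); });
        Thread even = new Thread(() -> { for (int i = 2; i <= 10; i += 2) list.add(i); });
        odd.start();
        even.start();
        try {
            odd.join();
            even.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return list.snapshot();
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints "[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]"
    }
}
```

Because each thread holds at most two node locks at a time, threads working on different parts of the list do not block each other.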

LOCK

The Lock interface, found in the java.util.concurrent.locks package, gives developers a more flexible tool than classic synchronization (via synchronized). It offers the same mutual exclusion as the synchronized keyword and adds the ability to try to acquire a lock without blocking, to give up after a timeout, and to be interrupted while waiting for a lock.

Some important methods defined by the interface are:

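  • lock(), used to acquire the lock, waiting if necessary until it is free
  • unlock(), used to release the lock
  • tryLock(), used to acquire the lock only if it is free at the moment of the call; it returns immediately with true or false, without waiting
  • tryLock(long time, TimeUnit unit), used to wait for at most the given period of time for the lock to become free
  • lockInterruptibly(), used to acquire the lock while allowing the waiting thread to be interrupted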
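The two forms of tryLock() can be compared with a short experiment. In this sketch (the class TryLockDemo is hypothetical) a helper thread holds the lock while the main thread tries to acquire it, first without waiting and then with a timeout of 100 milliseconds.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class for illustration; not part of the article's code.
class TryLockDemo {
    // Returns the results of: tryLock() while the lock is held, timed tryLock()
    // while the lock is held, tryLock() after the lock has been released.
    static boolean[] runDemo() {
        Lock lock = new ReentrantLock();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown();      // tell the main thread that the lock is taken
                release.await();       // keep the lock until told to release it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }, "Holder");
        holder.start();

        boolean immediate = true;
        boolean timed = true;
        boolean afterRelease = false;
        try {
            held.await();
            immediate = lock.tryLock();                        // returns at once
            timed = lock.tryLock(100, TimeUnit.MILLISECONDS);  // gives up after 100 ms
            release.countDown();
            holder.join();
            afterRelease = lock.tryLock();                     // the lock is free now
            if (afterRelease) {
                lock.unlock();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            release.countDown();
        }
        return new boolean[] {immediate, timed, afterRelease};
    }

    public static void main(String[] args) {
        boolean[] r = runDemo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "false false true"
    }
}
```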

THE REENTRANTLOCK CLASS

The ReentrantLock class implements the Lock interface and has been available since Java version 1.5 in the java.util.concurrent.locks package. In addition to the methods of the Lock interface, it contains utility methods for inspecting the lock, for example isLocked(), isHeldByCurrentThread() and getHoldCount(). When we use this class, the lock is reentrant: a thread that already owns the lock can acquire it again without blocking. The lock is acquired with the lock() method and released with the unlock() method. Every call to lock() must be matched by a call to unlock(), preferably in a finally block, and the lock becomes free only after the last matching unlock().

import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockEsempio {
    private final ReentrantLock istanzaLock = new ReentrantLock();
    private int contatore = 0;
    private int somma = 0;
    public int conta() {
        System.out.println("The thread " + Thread.currentThread().getName() + " asked to increment the counter");
        // lock() waits until the lock is free
        istanzaLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " counter = " + contatore);
            contatore++;
            return contatore;
        } finally {
            // always release the lock, even if an exception is thrown
            istanzaLock.unlock();
        }
    }
    public void somma() {
        System.out.println("The thread " + Thread.currentThread().getName() + " asked to display the sum of the counters");
        // tryLock() does not wait: it returns false if another thread holds the lock
        if (istanzaLock.tryLock()) {
            try {
                somma += contatore;
                System.out.println(Thread.currentThread().getName() + " the sum is = " + somma);
            } finally {
                istanzaLock.unlock();
            }
        } else {
            System.out.println("************************ The lock is held by another thread; the sum was skipped by: " + Thread.currentThread().getName());
        }
    }
}
public class Contatore extends Thread {
    private final ReentrantLockEsempio counter;
    private final int limite;
    private final int sleep;
    public Contatore(ReentrantLockEsempio counter, int limite, int sleep) {
        super();
        this.counter = counter;
        this.limite = limite;
        this.sleep = sleep;
    }
    @Override
    public void run() {
        // keep incrementing the shared counter until the limit is reached
        while (counter.conta() < limite) {
            try {
                counter.somma();
                Thread.sleep(sleep);
            } catch (InterruptedException ex) {
                // restore the interrupt flag and stop the thread
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
public class TestLock {

    public static void main(String[] args) {
        ReentrantLockEsempio counter = new ReentrantLockEsempio();
        Contatore c1 = new Contatore(counter, 20, 500);
        Contatore c2 = new Contatore(counter, 20, 500);
        c1.start();
        c2.start();
    }
}
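
The reentrancy described in this section can be observed directly through getHoldCount(). The sketch below uses a hypothetical class, ReentrancyDemo, in which a method that already holds the lock calls a second method that acquires the same lock again.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class for illustration; not part of the article's code.
class ReentrancyDemo {
    private final ReentrantLock lock = new ReentrantLock();

    // Acquires the lock and calls inner(), which acquires it a second time.
    int outer() {
        lock.lock();
        try {
            return inner();
        } finally {
            lock.unlock();
        }
    }

    // Returns how many times the current thread holds the lock.
    private int inner() {
        lock.lock();   // same thread: succeeds immediately, no deadlock
        try {
            return lock.getHoldCount();
        } finally {
            lock.unlock();
        }
    }

    boolean isFree() {
        return !lock.isLocked();
    }

    public static void main(String[] args) {
        ReentrancyDemo demo = new ReentrancyDemo();
        System.out.println(demo.outer());   // prints "2"
        System.out.println(demo.isFree());  // prints "true": both lock() calls were matched
    }
}
```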

LINK TO CODE ON GITHUB

GITHUB

EXECUTION OF THE EXAMPLE CODE

  • Download the code from GITHUB and launch the JAR file from a terminal (for example the one integrated in Visual Studio Code), from the directory containing the JAR, with the following command:

         java -jar --enable-preview CorsoJava.jar

  • Or run the main method found in the file CorsoJava.java.