THREADS IN JAVA

THREAD POOLS

A thread pool is a software component that manages a set of threads, with the goal of optimizing and simplifying their use. In a multithreaded program, each thread is responsible for executing a specific task. Instead of creating a new thread for every task, the program submits its tasks to the thread pool, which keeps them in an internal queue until one of its threads is free to run them. Managing execution is the responsibility of the pool: several policies exist for deciding which task runs first and how many run at the same time, and Java provides several classes that implement them.

Why use thread pools?

  • applications that use them perform better, because the pool optimizes memory and CPU utilization by reusing a limited number of threads
  • tasks complete faster, because operations run in parallel
  • the source code is cleaner, because the pool takes care of creating and managing threads.

THREAD POOLS IN JAVA

Interface java.util.concurrent.Executor

This is the basic interface that defines the core mechanism of a thread pool. Its only method is execute(Runnable command), which submits a new task to the pool for execution.

Interface java.util.concurrent.ExecutorService

This interface extends the Executor interface and adds methods for managing the life cycle of the pool, including shutdown(), which starts an orderly shutdown: tasks that were already submitted are still executed, but no new tasks are accepted.
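The behaviour of shutdown() can be observed with a minimal sketch like the one below. The class name ShutdownSketch and its methods are hypothetical, and awaitTermination() is used only so that the caller can wait for the submitted tasks to finish; it is an illustration under these assumptions, not the only way to close a pool.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class name, used only for this illustration.
class ShutdownSketch {

    /** Submits taskCount tasks, shuts the pool down and returns how many tasks completed. */
    static int runAndShutdown(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(completed::incrementAndGet);
        }
        // Tasks already submitted still run; new ones are refused from now on.
        pool.shutdown();
        try {
            // Wait (up to 5 seconds) for the submitted tasks to finish.
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    /** Returns true if a task submitted after shutdown() is rejected. */
    static boolean rejectsAfterShutdown() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.shutdown();
        try {
            pool.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("Completed tasks: " + runAndShutdown(5));
        System.out.println("Rejected after shutdown: " + rejectsAfterShutdown());
    }
}
```

The pools returned by the Executors factory methods reject late submissions with a RejectedExecutionException, which is why the second method catches that exception.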

Class java.util.concurrent.Executors

Factory class whose static methods, such as newCachedThreadPool() and newFixedThreadPool(int), create instances of pools (Executor, ExecutorService, etc.)
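As a rough illustration of the factory methods, the sketch below runs the same numbered tasks on three different kinds of pool. The class name ExecutorsFactorySketch and the helper runTasks are hypothetical; only the Executors methods themselves come from the JDK.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for this illustration.
class ExecutorsFactorySketch {

    /** Submits taskCount numbered tasks to the pool and returns the order in which they ran. */
    static List<Integer> runTasks(ExecutorService pool, int taskCount) {
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            pool.execute(() -> order.add(id));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        // One worker thread: tasks run one at a time, in submission order.
        System.out.println(runTasks(Executors.newSingleThreadExecutor(), 3));
        // At most two worker threads: every task runs, but the order may vary.
        System.out.println(runTasks(Executors.newFixedThreadPool(2), 3).size());
        // Threads are created on demand and reused when they become idle.
        System.out.println(runTasks(Executors.newCachedThreadPool(), 3).size());
    }
}
```

Which factory method fits best depends on the workload: a fixed pool caps the number of threads, while a cached pool grows as needed, as in the example that follows.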

THREAD POOL
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EsempioThreadPool {
    public static void main(String[] args) {
        /* create the thread pool */
        ExecutorService pool = Executors.newCachedThreadPool();
        /* submit the tasks to the pool */
        pool.execute(new GetSitePage("https://www.marcoalbasini.com"));
        pool.execute(new GetSitePage("https://www.google.it"));
        pool.execute(new GetSitePage("https://www.amazon.com"));
        pool.shutdown();
    }
}
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;

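// The pool only needs a task to run, so the class implements Runnable instead of extending Thread.
public class GetSitePage implements Runnable {
    private final String url;
    public GetSitePage(String url) {
        this.url = url;
    }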
    @Override
    public void run() {
        try {
            URL site = new URL(url);
            URLConnection con = site.openConnection();
            InputStream in = con.getInputStream();
            System.out.println("************************************************");
            System.out.println("WEB PAGE CONTENT: " + url);
            System.out.println(getString(in));
            System.out.println("************************************************");
        } catch (IOException e) {
            String a = e.getMessage();
            System.out.println(a);
        }
    }
    private String getString(InputStream is) {
        StringBuilder sb = new StringBuilder();
        // try-with-resources closes the reader (and the underlying stream) automatically.
        // The page is assumed to be UTF-8 encoded.
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(is, java.nio.charset.StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                sb.append(line);
            }
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
        return sb.toString();
    }
}

ARRAYBLOCKINGQUEUE AND LINKEDBLOCKINGQUEUE CLASSES

In Java, a generic queue is represented by the interface java.util.Queue. This interface extends the java.util.Collection interface and therefore inherits all of its features (e.g., the methods add(e), remove(e), …). In addition, the Queue interface defines other methods, including:

  • peek(), retrieves the first element in the queue without removing it. It returns the element, or null if the queue is empty.
  • element(), like peek(), retrieves the first element in the queue without removing it, but if the queue is empty it throws NoSuchElementException instead of returning null.
  • poll(), retrieves and removes the first element in the queue. It returns the removed element, or null if the queue is empty.
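The difference between the three methods is easiest to see on an empty queue and on a queue with a couple of elements. The sketch below uses ArrayDeque as the Queue implementation, which is an arbitrary choice for the example; the class name QueueMethodsSketch is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.NoSuchElementException;
import java.util.Queue;

// Hypothetical class name, used only for this illustration.
class QueueMethodsSketch {

    /** Calls peek(), poll() and element() on an empty queue. */
    static String onEmptyQueue() {
        Queue<String> queue = new ArrayDeque<>();
        String peeked = queue.peek();   // null: nothing to look at
        String polled = queue.poll();   // null: nothing to remove
        boolean threw;
        try {
            queue.element();
            threw = false;
        } catch (NoSuchElementException e) {
            threw = true;               // element() refuses to return null
        }
        return "peek=" + peeked + ", poll=" + polled + ", elementThrew=" + threw;
    }

    /** Calls the same methods on a queue that contains "a" and "b". */
    static String onFilledQueue() {
        Queue<String> queue = new ArrayDeque<>();
        queue.add("a");
        queue.add("b");
        String peeked = queue.peek();      // "a", queue unchanged
        String element = queue.element();  // "a", queue unchanged
        String polled = queue.poll();      // "a", and "a" is removed
        return peeked + "," + element + "," + polled + ",remaining=" + queue.size();
    }

    public static void main(String[] args) {
        System.out.println(onEmptyQueue());   // peek=null, poll=null, elementThrew=true
        System.out.println(onFilledQueue());  // a,a,a,remaining=1
    }
}
```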

The Queue interface is extended by the java.util.concurrent.BlockingQueue interface, which represents a generic blocking queue whose operations are safe to call from multiple threads. BlockingQueue inherits the Queue methods and adds new ones, including:

  • put(e), inserts an element at the end of the queue; if the queue is full, the method blocks and the calling thread waits until an element is removed from the queue.
  • take(), retrieves and removes the first element in the queue; if the queue is empty, the method blocks and the calling thread waits until a new element is inserted.
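A minimal sketch of the two blocking methods, assuming a queue with room for a single element so that the producer has to wait whenever the consumer falls behind. The class name PutTakeSketch and the method transfer are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical class name, used only for this illustration.
class PutTakeSketch {

    /** Moves the numbers 0..count-1 from a producer thread to the calling thread. */
    static List<Integer> transfer(int count) {
        // Capacity 1: put() blocks as soon as one element is waiting to be taken.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i); // waits while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(queue.take()); // waits while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(3)); // [0, 1, 2]
    }
}
```

Neither thread polls or sleeps: each one simply waits inside put() or take() until the other side has made progress.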

Because put() and take() suspend the calling thread until the operation can proceed, code that uses this interface is synchronized automatically. The BlockingQueue interface is implemented by several classes. Two thread-safe classes that implement it are:

  • java.util.concurrent.ArrayBlockingQueue
  • java.util.concurrent.LinkedBlockingQueue

These classes are designed to work with threads and therefore support concurrent access by multiple threads.

The ArrayBlockingQueue class is a blocking queue implemented as a circular array. It has a fixed capacity, which is set in the constructor when the object is created and cannot be changed afterwards. Elements are ordered within the queue according to the FIFO (First-In First-Out) policy.

The LinkedBlockingQueue class, unlike ArrayBlockingQueue, allows instances to be created without specifying a capacity. In this case the maximum capacity is Integer.MAX_VALUE, that is, 2^31-1 elements (2,147,483,647). If the queue is never full, the put() method never blocks. In the Producer/Consumer example, if no capacity is set on the queue, the producer keeps adding items for as long as resources are available.
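The capacity rules described above can be checked with a short sketch. The class name CapacitySketch and its methods are hypothetical, and offer() is used instead of put() so that the example never blocks.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical class name, used only for this illustration.
class CapacitySketch {

    /** Offers three elements to a queue of capacity 2 and reports which offers succeeded. */
    static String offerThreeToBoundedQueue() {
        BlockingQueue<String> bounded = new ArrayBlockingQueue<>(2);
        boolean first = bounded.offer("a");
        boolean second = bounded.offer("b");
        // The queue is full: offer() returns false, whereas put() would wait here.
        boolean third = bounded.offer("c");
        return first + "," + second + "," + third;
    }

    /** Remaining capacity of a LinkedBlockingQueue created without a capacity. */
    static int defaultLinkedCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    /** Remaining capacity of a LinkedBlockingQueue created with capacity 10. */
    static int boundedLinkedCapacity() {
        return new LinkedBlockingQueue<String>(10).remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(offerThreeToBoundedQueue());                    // true,true,false
        System.out.println(defaultLinkedCapacity() == Integer.MAX_VALUE);  // true
        System.out.println(boundedLinkedCapacity());                       // 10
    }
}
```

Passing a capacity to the LinkedBlockingQueue constructor is a simple way to keep a fast producer from filling the heap.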

import java.util.concurrent.BlockingQueue;

public class Consumatore implements Runnable {
    private int i = 0;
    private BlockingQueue<String> queue;
    public Consumatore(BlockingQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        while(i < 30) {
            if (queue.remainingCapacity() > 0) {
                System.out.println("It is still possible to add " + queue.remainingCapacity() + " elements; current size: " + queue.size());
            } else if (queue.remainingCapacity() == 0) {
                String elementoRimosso = queue.remove();
                System.out.println("Removed element: " + elementoRimosso);
            }
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            i++;
        }
    }
}
import java.util.concurrent.BlockingQueue;

public class Produttore implements Runnable {
    private BlockingQueue<String> queue;
    public Produttore(BlockingQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        int i = 0;
        while(i < 30) {
            String elem = "Elemento numero " + i;
            /* try to add an element to the queue; offer() returns false if the queue is full */
            boolean aggiunto = queue.offer(elem);
            System.out.println("Was element " + i + " added? " + aggiunto);
            i++;
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class EsempioArrayBlockingQueue {
    public static void main(String[] args) {
        // Create a queue that can hold at most 10 elements.
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
        // Producer and Consumer access the same queue...
        Thread prod = new Thread(new Produttore(queue));
        Thread cons = new Thread(new Consumatore(queue));
        prod.start();
        cons.start();
    }
}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class EsempioLinkedBlockingQueue {
    public static void main(String[] args) {
        // Create a queue without specifying a capacity.
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

        // Producer and Consumer access the same queue...
        Thread prod = new Thread(new Produttore(queue));
        Thread cons = new Thread(new Consumatore(queue));

        prod.start();
        cons.start();
    }
}

DEADLOCK, STARVATION AND LIVELOCK

Deadlock occurs when thread A blocks waiting for thread B to release a resource shared between the two while B, in turn, blocks waiting for A to release another resource. The two threads are locked against each other, each waiting for the other, and neither can ever proceed.
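The circular wait described above can be reproduced safely with a sketch like the following. It is an illustration under stated assumptions: the class name CircularWaitSketch is hypothetical, and tryLock() with a timeout replaces a plain lock() so that the program reports the problem and terminates instead of hanging forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class name, used only for this illustration.
class CircularWaitSketch {

    /** Returns how many of the two threads failed to obtain their second lock. */
    static int countBlockedThreads() {
        Lock r1 = new ReentrantLock();
        Lock r2 = new ReentrantLock();
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        CountDownLatch bothTried = new CountDownLatch(2);
        AtomicInteger blocked = new AtomicInteger();

        // A takes r1 then r2; B takes r2 then r1: the opposite order creates the cycle.
        Thread a = new Thread(() -> attempt(r1, r2, bothHoldFirst, bothTried, blocked), "A");
        Thread b = new Thread(() -> attempt(r2, r1, bothHoldFirst, bothTried, blocked), "B");
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return blocked.get();
    }

    private static void attempt(Lock first, Lock second, CountDownLatch bothHoldFirst,
                                CountDownLatch bothTried, AtomicInteger blocked) {
        first.lock();
        try {
            // Make sure both threads hold their first lock before either asks for the second.
            bothHoldFirst.countDown();
            bothHoldFirst.await();
            // With second.lock() both threads would wait here forever: that is the deadlock.
            if (second.tryLock(200, TimeUnit.MILLISECONDS)) {
                second.unlock();
            } else {
                blocked.incrementAndGet();
            }
            // Keep holding the first lock until the other thread has finished its attempt.
            bothTried.countDown();
            bothTried.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            first.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("Threads unable to get their second lock: " + countBlockedThreads());
    }
}
```

If both threads requested the locks in the same order (r1 first, then r2), the cycle could not form; that is one common way to avoid this situation.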


Starvation occurs when a thread never manages to acquire the resources it needs (or acquires them only after a very long wait) because other threads keep holding them. Suppose three threads A, B and C access the same resource R1, and A and B have a higher priority than C. As long as A and B keep trying to acquire R1, C cannot lock and use it. In this case, C is subject to starvation.

Livelock occurs when two threads are so busy responding to each other that neither can continue with its own work. Unlike in deadlock, the threads are not blocked: they are active, but they make no progress.

LINKS TO PREVIOUS POSTS

THE JAVA LANGUAGE

LINK TO CODE ON GITHUB

GITHUB

EXECUTION OF THE EXAMPLE CODE

  • Download the code from GITHUB, open a terminal in Visual Studio Code, move to the directory containing the JAR file, and launch it with the following command:

        java --enable-preview -jar CorsoJava.jar

  • Or run the main found in the file CorsoJava.java.