Java Concurrency


Creative Commons License
This -Java Concurrency- tutorial is licensed under a Creative Commons Attribution-NonCommercial 4.0 International License
Java SE 11 Developer certification (exam number 1Z0-819) here

Concurrency

Parallelism in Java

This tutorial covers the fundamentals of multithreading (the Thread class, the synchronized keyword…) as well as more advanced facilities. Parallelism in Java has gradually evolved along with Java's ability to target big data processing. Low-level Application Programming Interfaces (APIs) tend to give way to asynchronous programming: applications consume “services” that produce (huge amounts of) data. The asynchronous nature of services calls for (newer) high-level APIs such as streams. Asynchronous APIs for HTTP, HTTPS, WebSockets… are also part of this challenge.

Headlines
From Java 5 onward, the way “parallel programming” (i.e., concurrency, multithreading…) is managed has changed significantly, including how the original synchronized and volatile keywords are used

Operating System (OS) level

Example Multithreading.Java.zip 

String[] command = {"/usr/local/bin/node", "--version"}; // macOS-specific!
try {
    Process p = Runtime.getRuntime().exec(command); // Heavyweight process with probable incompatibilities across operating systems like Windows, macOS, LINUX, etc.
    if (p.waitFor() == 0) { // Blocking! Better solution required, e.g., look at https://www.javaworld.com/article/2071275/when-runtime-exec---won-t.html
        try (java.io.BufferedReader input = new java.io.BufferedReader(new java.io.InputStreamReader(p.getInputStream()))) {
            String content;
            while ((content = input.readLine()) != null) System.out.println("Out: " + content); // 'Out: v11.8.0' is displayed...
        }
    } else {
        try (java.io.BufferedReader input = new java.io.BufferedReader(new java.io.InputStreamReader(p.getErrorStream()))) {
            String content;
            while ((content = input.readLine()) != null) System.err.println("Err: " + content);
        }
    }
} catch (InterruptedException | java.io.IOException ieioe) {
    System.err.println("Runtime.getRuntime().exec(command): " + ieioe.getClass().getSimpleName() + ": " + ieioe.getMessage());
}
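
As the comment on 'waitFor' suggests, draining the output before waiting for the exit code avoids blocking when the child process fills its output buffer. The following is only a minimal sketch based on java.lang.ProcessBuilder; the class name 'Process_builder_sketch', the "exit=" and "error=" markers, and the choice of the running JVM's own 'java' launcher as the command are assumptions made for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class Process_builder_sketch {

    /** Runs 'command' and returns its output lines followed by a last "exit=<code>" line. */
    public static List<String> run(String... command) {
        List<String> lines = new ArrayList<>();
        try {
            Process p = new ProcessBuilder(command)
                    .redirectErrorStream(true) // 'stderr' is merged into 'stdout': only one stream to drain
                    .start();
            try (BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
                String content;
                while ((content = input.readLine()) != null) {
                    lines.add(content); // Output is drained *before* 'waitFor'
                }
            }
            lines.add("exit=" + p.waitFor());
        } catch (IOException ioe) {
            lines.add("error=" + ioe.getMessage());
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // Interrupted status is restored
            lines.add("error=interrupted");
        }
        return lines;
    }

    public static void main(String[] args) {
        // Assumption: the 'java' launcher of the running JVM is used as a portable command...
        String launcher = Paths.get(System.getProperty("java.home"), "bin", "java").toString();
        run(launcher, "-version").forEach(System.out::println);
    }
}
```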

Java Virtual Machine (JVM) level

Example

private volatile boolean _stop = false; // 'volatile' guarantees that '_t1' sees the change made by another thread
private Thread _t1; // Lightweight process
…
_t1 = new Thread() {
    public void run() {
        while(! _stop) { /* Something to do… */ }
    }
};
…
_t1.start(); // 'Thread' objects are NEVER restartable!
…
_stop = true; // Never use '_t1.stop();' (deprecated and unsafe)
_t1 = null; // For garbage collector
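
The fragment above may be completed as follows. This is a sketch only: the class name 'Stoppable_worker', the '_iterations' counter and the 'run_for' method are hypothetical. It relies on a 'volatile' flag so that the worker sees the stop request, and on 'join' so that the caller waits for the worker's actual end.

```java
public class Stoppable_worker {

    private volatile boolean _stop = false; // 'volatile': the worker is guaranteed to see the write below
    private long _iterations = 0L; // Written by the worker only, read after 'join'

    private final Thread _t1 = new Thread(() -> {
        while (!_stop) {
            _iterations++; // Something to do...
        }
    }, "worker");

    /** Lets the worker run for (about) 'milliseconds', asks it to stop and waits for its end. */
    public long run_for(long milliseconds) {
        _t1.start();
        try {
            Thread.sleep(milliseconds);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // Interrupted status is restored
        }
        _stop = true; // Cooperative stop: no '_t1.stop();'
        try {
            _t1.join(); // 'join' makes the worker's writes visible to the caller
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return _iterations;
    }

    public boolean is_alive() {
        return _t1.isAlive();
    }

    public static void main(String[] args) {
        Stoppable_worker sw = new Stoppable_worker();
        System.out.println("Iterations: " + sw.run_for(100L));
        System.out.println("Alive? " + sw.is_alive());
    }
}
```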

Runnable interface

Rule(s)

Example Lambda_expression.Java.zip 

public class Functional_interface_test {
    public static void main(String[] args) {
        // No reference to the 'run' method is required since 'java.lang.Runnable' only owns this single method:
        Runnable to_do = () -> { 
            try {
                Thread.sleep(1000L); // Nothing to do for 1 sec. <=> a kind of delay!
            } catch (InterruptedException ie) {
                java.util.logging.Logger.getLogger("Why it doesn't work?").log(java.util.logging.Level.SEVERE, null, ie);
            }
        };
        new Thread(to_do).start();
    …

synchronized keyword

Synchronized method

Rule(s)

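  • A synchronized method is mutually exclusive with respect to its lock: while a thread runs it, no other thread can enter a synchronized method or block guarded by the same object (the Class object in the case of a static method). The scheduler may still preempt the running thread, but the lock is held until the method returns.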

Example Multithreading.Java.zip 

public class Illustration_of_synchronized {

    static private int _I = 0; // 'static' is for convenience and simplification...

    static { // Static initialiser is for convenience and simplification...
        Thread plusplus = new Thread("plusplus") {
            @Override
            public void run() {
                assert (Thread.currentThread() == this);
//                System.out.println(Thread.currentThread().getName()); // 'plusplus'
                _Plus();
                _Plus();
            }
        };
        assert (plusplus.getState() == Thread.State.NEW);

        Thread minus = new Thread("minus") {
            @Override
            public void run() {
                assert (Thread.currentThread() == this);
//                System.out.println(Thread.currentThread().getName()); // "minus"
                _Minus();
            }
        };
        assert (minus.getState() == Thread.State.NEW);

        plusplus.start(); // 0 -> 1 -> 2 *OR* -1 -> 0 -> 1
        assert (plusplus.getState() == Thread.State.RUNNABLE || plusplus.getState() == Thread.State.TERMINATED);
        minus.start(); // 2 -> 1 *OR* 0 -> -1
        assert (minus.getState() == Thread.State.RUNNABLE || minus.getState() == Thread.State.TERMINATED);
        /**
         * Caution: execution can be -plusplus ; minus- *OR* -minus ; plusplus-
         */
    }

    static synchronized private void _Plus() {
        _I++;
    }

    static synchronized private void _Minus() {
        _I--;
    }

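    public static void main(String[] args) {
        System.out.println(Thread.currentThread().getName()); // 'main'
        // Caution: 'main' does not join 'plusplus' and 'minus', so an intermediate value may be displayed; the final value is always '1':
        System.out.println("Since '_Plus' and '_Minus' are BOTH mutually exclusive, the final value cannot be anything but '1': " + _I);
    }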
}
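
To observe the final value reliably, the reading thread may wait for both threads with 'join'. Below is a minimal sketch under that assumption; the class name 'Synchronized_with_join' is hypothetical, and instance methods are used instead of static ones so that each run starts from zero.

```java
public class Synchronized_with_join {

    private int _i = 0; // Guarded by 'this'

    private synchronized void _plus() {
        _i++;
    }

    private synchronized void _minus() {
        _i--;
    }

    private synchronized int _get() {
        return _i;
    }

    /** Starts both threads, waits for their end and returns the final value. */
    public static int run() {
        final Synchronized_with_join swj = new Synchronized_with_join();
        Thread plusplus = new Thread(() -> {
            swj._plus();
            swj._plus();
        }, "plusplus");
        Thread minus = new Thread(swj::_minus, "minus");
        plusplus.start();
        minus.start();
        try {
            plusplus.join(); // Whatever the interleaving is, both threads are over after the two 'join' calls
            minus.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return swj._get();
    }

    public static void main(String[] args) {
        System.out.println("Final value: " + run());
    }
}
```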

Synchronized block

Rule(s)

  • A synchronized block delimits a critical section, i.e., a piece of code that only one thread at a time can execute for a given lock object. The object to lock is given explicitly, whereas a synchronized instance method implicitly locks this. Synchronized blocks cannot lock on values of primitive types… From Java 5, synchronized may be replaced by the more flexible java.util.concurrent.locks.Lock interface.

Example

Temperature t = new Temperature();
synchronized(t) { // Mutex acquisition on 't'
    // Synchronized code…
} // Mutex release on 't'
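
A runnable variant of this fragment is sketched below. The 'Temperature' class shown here is a hypothetical stand-in (the original class is not listed), as are the class name 'Synchronized_block_sketch' and the 'run' method. Two threads update the same object, and the synchronized block makes each update exclusive.

```java
public class Synchronized_block_sketch {

    /** Hypothetical stand-in for the 'Temperature' class. */
    static class Temperature {
        private double _value = 0.;

        void increment() {
            _value += 1.; // Read-modify-write: not atomic on its own
        }

        double value() {
            return _value;
        }
    }

    public static double run(int increments_per_thread) {
        final Temperature t = new Temperature();
        Runnable to_do = () -> {
            for (int i = 0; i < increments_per_thread; i++) {
                synchronized (t) { // Mutex acquisition on 't'
                    t.increment();
                } // Mutex release on 't'
            }
        };
        Thread t1 = new Thread(to_do);
        Thread t2 = new Thread(to_do);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        synchronized (t) {
            return t.value();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(10000));
    }
}
```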

“Parallel programming”-based support in java.lang.Object

Rule(s)

Example Multithreading.Java.zip 

public class Simple_dead_lock {

    private int _i = 0;

    synchronized public void f() {
        System.err.println("Start of 'f()'");
        _i++;
        _g();
        System.err.println("Value of '_i': " + _i);
        notify();
    }

    synchronized private void _g() {
        System.err.println("Start of '_g()'");
        try {
            wait(5000); // No other thread calls 'notify': the calling thread stays blocked until the 5 sec. timeout expires!
        } catch (InterruptedException ie) {
            ie.printStackTrace();
        }
        System.err.println("End of 'wait()'");
    }

    public static void main(String[] args) {
        Simple_dead_lock sdl = new Simple_dead_lock();
        sdl.f();
    }
}
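
By contrast, 'wait' is useful when another thread eventually calls 'notify' or 'notifyAll' on the same object. The sketch below illustrates this cooperation; the class name 'Wait_notify_sketch' and its 'put' and 'take' methods are hypothetical. The condition is tested in a loop, which is the usual guard against early or spurious wakeups.

```java
public class Wait_notify_sketch {

    private String _message = null; // Guarded by 'this'

    public synchronized void put(String message) {
        _message = message;
        notifyAll(); // Wakes up the thread(s) waiting on 'this'
    }

    public synchronized String take() throws InterruptedException {
        while (_message == null) { // Loop instead of 'if': the condition is re-tested after each wakeup
            wait(); // The monitor is released while waiting, and re-acquired before returning
        }
        String message = _message;
        _message = null;
        return message;
    }

    public static String run() {
        final Wait_notify_sketch box = new Wait_notify_sketch();
        Thread producer = new Thread(() -> box.put("Hi!"), "producer");
        producer.start();
        try {
            return box.take(); // Works whether 'put' occurs before or after this call
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```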

volatile keyword

Rule(s)

Example Multithreading.Java.zip 

public class Illustration_of_volatile {

    private int _bads = 0;
    private int _goods = 0;
    volatile private boolean _stop = false; // Important issue: '_stop' is used in read mode only, which is a smart use of 'volatile'!

    /**
     * The game is avoiding the following inconsistent compositions:
     * "FrançoisSarkozy" and "NicolasHollande"
     */
// 1. Executing the program "as is" leads to some bad compositions...
// 2. Uncomment 'volatile' for '_name', does it work? Unfortunately no, some bad compositions persist...
// 3. Use 'java.util.concurrent.atomic.AtomicReference' instead... Does it work? Fortunately yes!
    /*volatile*/ private String _name = "?";
//    java.util.concurrent.atomic.AtomicReference<String> _name = new java.util.concurrent.atomic.AtomicReference("?");
    private Thread _t1, _t2, _t3;

    private void set(String given_name, String surname) {
        /**
         * Executions 1 & 2
         */
        _name = given_name;
        try {
            Thread.sleep(1); // Fictitious delay so that 'set' lasts and then may be "actually" interrupted...
        } catch (InterruptedException ie) {
            ie.printStackTrace();
        }
        _name += surname;
        /**
         * Execution 3
         */
//        _name.set(given_name + surname); // To be used with 'java.util.concurrent.atomic.AtomicReference<String> _name = new java.util.concurrent.atomic.AtomicReference("?");'
    }

    /**
     * If not 'synchronized' then 'check' can be interrupted by '_t1' and '_t2',
     * but, anyway, bad compositions may occur...
     */
    /*synchronized*/ private void check() {
        if (_name.equals("Fran" + (char) 0x00E7 + "ois" + "Sarkozy") || _name.equals("Nicolas" + "Hollande")) {
            _bads++;
        } else {
            _goods++;
        }
    }

    public Illustration_of_volatile() {
        _t1 = new Thread() {
            public void run() {
                while (!_stop) {
                    set("Fran" + (char) 0x00E7 + "ois", "Hollande");
                }
            }
        };
        _t2 = new Thread() {
            public void run() {
                while (!_stop) {
                    set("Nicolas", "Sarkozy");
                }
            }
        };
        _t3 = new Thread() {
            public void run() {
                while (!_stop) {
                    check();
                }
            }
        };
    }

    public void start() {
        _t1.start();
        _t2.start();
        _t3.start();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException ie) {
            ie.printStackTrace();
        }
        _stop = true; // Let us think about this: are all three threads stopped at the same time?
        _t1 = null;
        _t2 = null;
        _t3 = null;
    }

    public static void main(String[] args) {
        Illustration_of_volatile iov = new Illustration_of_volatile();
        iov.start();
        System.out.println("Bad compositions: " + iov._bads);
        System.out.println("Good compositions: " + iov._goods);
    }
}
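
Execution 3 may be isolated as follows. This is a sketch under assumptions: the class name 'Atomic_name_sketch' and the 'run' method are hypothetical, and the threads are joined before the counters are read. Since each full name is published in a single atomic 'set', the checker can only observe "?" or a consistent composition.

```java
import java.util.concurrent.atomic.AtomicReference;

public class Atomic_name_sketch {

    private final AtomicReference<String> _name = new AtomicReference<>("?");
    private volatile boolean _stop = false;
    private int _bads = 0; // Written by the checking thread only, read after 'join'
    private int _goods = 0;

    private void set(String given_name, String surname) {
        _name.set(given_name + surname); // The full name is built first, then published in one step
    }

    private void check() {
        String name = _name.get(); // One read only, i.e., a consistent snapshot
        if (name.equals("Fran\u00E7oisSarkozy") || name.equals("NicolasHollande")) {
            _bads++;
        } else {
            _goods++;
        }
    }

    /** Runs the three threads for (about) 'milliseconds' and returns the number of bad compositions. */
    public int run(long milliseconds) {
        Thread t1 = new Thread(() -> { while (!_stop) set("Fran\u00E7ois", "Hollande"); });
        Thread t2 = new Thread(() -> { while (!_stop) set("Nicolas", "Sarkozy"); });
        Thread t3 = new Thread(() -> { while (!_stop) check(); });
        t1.start();
        t2.start();
        t3.start();
        try {
            Thread.sleep(milliseconds);
            _stop = true;
            t1.join();
            t2.join();
            t3.join();
        } catch (InterruptedException ie) {
            _stop = true;
            Thread.currentThread().interrupt();
        }
        return _bads;
    }

    public static void main(String[] args) {
        Atomic_name_sketch ans = new Atomic_name_sketch();
        System.out.println("Bad compositions: " + ans.run(2000L));
        System.out.println("Good compositions: " + ans._goods);
    }
}
```
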
From Java 5, the java.util.concurrent package revisits the way “parallel programming” may be performed in Java

Rule(s)

Example Multithreading.Java.zip 

public class Illustration_of_Lock_and_Condition {

    private final java.util.LinkedList<String> _data = new java.util.LinkedList<>();
    private java.nio.file.Path _file;
    // https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/locks/Lock.html
    private final java.util.concurrent.locks.Lock _lock = new java.util.concurrent.locks.ReentrantLock(/* Exercise 1: */ /* true */);
    // https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/locks/Condition.html
    private final java.util.concurrent.locks.Condition _condition = _lock.newCondition();
    private volatile boolean _stop = false; // Written by 'loading', read by 'processing' and 'unloading'

    /* Exercise 1: */ /* private int _count = 0; */

    public Illustration_of_Lock_and_Condition(String data) {
        assert (data != null && data.length() > 0);
        try {
            _file = new java.io.File("my_file").toPath();
            java.nio.file.Files.deleteIfExists(_file);
            java.nio.file.Files.write(_file, data.getBytes());
        } catch (java.io.IOException ioe) {
            System.err.println(this.getClass().getSimpleName() + ": " + ioe.getMessage());
            Runtime.getRuntime().exit(-1); // For simplicity only, i.e., didactical program
        }
    }

    private void _loading() {
        try {
            java.util.List<String> lines = java.nio.file.Files.readAllLines(_file, java.nio.charset.Charset.defaultCharset());
            for (String line : lines) {
                _data.add(line);
                try {
                    Thread.sleep(1); // Fictitious delay so that 'for' loop lasts and then may be artificially paused...
                } catch (InterruptedException ie) {
                    System.err.println(this.getClass().getSimpleName() + ": " + ie.getMessage());
                    Runtime.getRuntime().exit(-2); // For simplicity only, i.e., didactical program
                }
//                System.out.println(line + " loaded...");
            }
        } catch (java.io.IOException ioe) {
            System.err.println(this.getClass().getSimpleName() + ": " + ioe.getMessage());
            Runtime.getRuntime().exit(-2); // For simplicity only, i.e., didactical program
        } finally {
            _stop = true;
        }
//        System.out.println("'_loading' is ending...");
    }

    private void _processing() {
        while (!_stop || !_data.isEmpty()) {
            _lock.lock(); // Thread becomes dormant while awaiting the lock's acquisition...
            try {
                _data.addFirst(_data.removeFirst().toUpperCase());
                _condition.signal();
                /* Exercise 1: */ /* System.out.println(_data.getFirst() + " processed..." + _count++); */
                try {
                    Thread.sleep(1); // Fictitious delay so that processing lasts and then may be artificially paused...
                } catch (InterruptedException ie) {
                    System.err.println(this.getClass().getSimpleName() + ": " + ie.getMessage());
                    Runtime.getRuntime().exit(-2); // For simplicity only, i.e., didactical program
                }
            } catch (java.util.NoSuchElementException nsee) {
                // 'removeFirst' may occur while the list is empty... It doesn't matter for a didactical program, but improvements are required for a professional program!
            } finally {
                _lock.unlock();
            }
        }
//        System.out.println("'_processing' is ending...");
    }

    private void _unloading() {
        while (!_stop || !_data.isEmpty()) {
            _lock.lock(); // Thread becomes dormant while awaiting the lock's acquisition...
            try {
                // "Spurious" wakeup unmanaged:
                _condition.await(); // Lock is atomically released...
                System.out.println(_data.removeFirst() + " unloaded...");
            } catch (InterruptedException ie) {
                System.err.println(this.getClass().getSimpleName() + ": " + ie.getMessage());
                Runtime.getRuntime().exit(-2); // For simplicity only, i.e., didactical program
            } catch (java.util.NoSuchElementException nsee) {
                // 'removeFirst' may occur while the list is empty... It doesn't matter for a didactical program, but improvements are required for a professional program!
            } finally {
                _lock.unlock(); // If 'await' fails then the lock is released...
            }
        }
//        System.out.println("'_unloading' is ending...");
    }

    public static void main(String[] args) {
        final Illustration_of_Lock_and_Condition iolac = new Illustration_of_Lock_and_Condition("f\nr\na\nn\nc\nk\n \nb\na\nr\nb\ni\ne\nr");
        new Thread("unloading") {
            @Override
            public void run() { // 'this' is the executing thread!
                iolac._unloading();
            }
        }.start();
        new Thread("processing") {
            @Override
            public void run() { // 'this' is the executing thread!
                iolac._processing();
            }
        }.start();
        new Thread("loading") {
            @Override
            public void run() { // 'this' is the executing thread!
                iolac._loading();
            }
        }.start();
    }
}

Exercise 1

The program looks pretty slow. This is confirmed by uncommenting /* Exercise 1: */ /* private int _count = 0; */ and /* Exercise 1: */ /* System.out.println(_data.getFirst() + " processed..." + _count++); */.

In fact, the processing thread performs many useless changes (≈ _count) to the first element of _data. This occurs because the unloading thread is the longest-waiting thread: data (i.e., simple strings) can be unloaded (i.e., removed from the _data list) only once processed (i.e., changed to upper case). The java.util.concurrent.locks.ReentrantLock class comes with a “fairness” parameter in its constructor: true means that the longest-waiting thread is favored when the lock is granted. By uncommenting /* Exercise 1: */ /* true */, the program goes faster!
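
The '_unloading' method above leaves “spurious” wakeups unmanaged. A common remedy is to call 'await' inside a loop that re-tests the awaited condition. The sketch below illustrates that idiom only; it is a simplification, not the tutorial's program: loading and processing are merged into one thread, and the names 'Guarded_await_sketch', '_not_empty' and '_done' are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Guarded_await_sketch {

    private final LinkedList<String> _data = new LinkedList<>(); // Guarded by '_lock'
    private final Lock _lock = new ReentrantLock(true); // Fair lock
    private final Condition _not_empty = _lock.newCondition();
    private boolean _done = false; // Guarded by '_lock'

    private void _loading(String... items) {
        for (String item : items) {
            _lock.lock();
            try {
                _data.addLast(item.toUpperCase()); // Loading and processing are merged here
                _not_empty.signal();
            } finally {
                _lock.unlock();
            }
        }
        _lock.lock();
        try {
            _done = true;
            _not_empty.signalAll(); // Lets the unloading thread notice the end
        } finally {
            _lock.unlock();
        }
    }

    private List<String> _unloading() throws InterruptedException {
        List<String> unloaded = new ArrayList<>();
        _lock.lock();
        try {
            while (true) {
                while (_data.isEmpty() && !_done) {
                    _not_empty.await(); // Condition re-tested after each wakeup, spurious or not
                }
                if (_data.isEmpty()) {
                    return unloaded; // '_done' is true and nothing is left
                }
                unloaded.add(_data.removeFirst());
            }
        } finally {
            _lock.unlock();
        }
    }

    public static List<String> run(String... items) {
        final Guarded_await_sketch gas = new Guarded_await_sketch();
        Thread loading = new Thread(() -> gas._loading(items), "loading");
        loading.start();
        try {
            List<String> unloaded = gas._unloading();
            loading.join();
            return unloaded;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return List.of();
        }
    }

    public static void main(String[] args) {
        System.out.println(run("f", "r", "a", "n", "c", "k"));
    }
}
```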

From Java 5, the java.util.concurrent.ExecutorService interface aims at making the use of threads more intuitive. From Java 7, java.util.concurrent.ForkJoinPool (here…), as a subtype of java.util.concurrent.ExecutorService, enhances synchronization capabilities (the difference is discussed in particular here…)

Example (Carrefour_Items.Java.zip )

// File system "Observer" pattern (from Java 7):
java.nio.file.WatchService ws = java.nio.file.FileSystems.getDefault().newWatchService();
/** 'java.nio.file.Path' interface extends 'java.nio.file.Watchable' interface */
// 'database' directory is observed by a daemon for any file creation:
java.nio.file.FileSystems.getDefault().getPath("database").register(ws, java.nio.file.StandardWatchEventKinds.ENTRY_CREATE);
…
// Watch service waits for just created file in 'database':
final java.util.concurrent.Callable<String> c = () -> { // Override 'call' method...
    // 'take' is a blocking statement...
    java.nio.file.WatchEvent<?> we = ws.take().pollEvents().get(0);
    return ((java.nio.file.WatchEvent<java.nio.file.Path>) we).context().toString(); // Return file name of just created file...
};
// Task 'c' is executed as a thread to prevent blocking:
java.util.concurrent.Future<String> f = java.util.concurrent.Executors.newSingleThreadExecutor().submit(c);
try { // Simply get result:
    String file_name_of_just_created_file = f.get();
} catch (java.util.concurrent.ExecutionException | InterruptedException eeie) {
    …
}
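
In the fragment above, the executor is never shut down, so its worker thread may keep the JVM alive. A possible, simplified sketch of the submit/get/shutdown life cycle follows; the class name 'Executor_service_sketch', the 100 ms delay, the 2 sec. time-out and the returned "done" string are assumptions that replace the blocking 'take' of the watch service.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Executor_service_sketch {

    public static String run() {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            Callable<String> c = () -> { // Override 'call' method...
                Thread.sleep(100L); // Stands for a blocking statement
                return "done";
            };
            Future<String> f = es.submit(c); // Task 'c' is executed by the executor's thread
            return f.get(2L, TimeUnit.SECONDS); // Bounded wait instead of an unbounded 'get()'
        } catch (ExecutionException | TimeoutException e) {
            return e.getClass().getSimpleName();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return ie.getClass().getSimpleName();
        } finally {
            es.shutdown(); // No new task is accepted and the worker thread ends once idle
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```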

Temporal issues

java.util.concurrent.ScheduledExecutorService extends java.util.concurrent.ExecutorService. It is a utility to schedule and run (multiple) jobs periodically or at specific moments. It mainly aims at avoiding the use of the “legacy” java.util.Timer and java.util.TimerTask classes. The pool size, set at creation time, establishes the number of threads available to run jobs.

Example

// Configure data emission to a server:
private final java.util.concurrent.ScheduledExecutorService _to_send_service =
// Only one thread in the pool:
java.util.concurrent.Executors.newScheduledThreadPool(1);
private java.util.concurrent.ScheduledFuture<?> _to_send = null;
…
_to_send = _to_send_service.scheduleAtFixedRate(() -> {
    this._send();
}, _animation_frequency, _animation_frequency, java.util.concurrent.TimeUnit.MILLISECONDS);
…
private void _reschedule() { // value of '_animation_frequency' has changed...
    _to_send.cancel(false); // 'false' means that any in-progress task is not interrupted...
    // assert (_to_send.isDone());
    _to_send = _to_send_service.scheduleAtFixedRate(() -> {
        this._send();
    }, _animation_frequency, _animation_frequency, java.util.concurrent.TimeUnit.MILLISECONDS);
}
…
_to_send_service.shutdown(); // Soft stop (in-progress tasks complete) compared to 'shutdownNow'
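
The fragments above may be assembled into a self-contained sketch as follows. The class name 'Reschedule_sketch', the '_sent' counter and the periods (50 ms, then 10 ms) are assumptions made for illustration; '_send' only counts its invocations instead of emitting data to a server.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Reschedule_sketch {

    private final ScheduledExecutorService _to_send_service = Executors.newScheduledThreadPool(1);
    private ScheduledFuture<?> _to_send = null;
    private final AtomicInteger _sent = new AtomicInteger();

    private void _send() {
        _sent.incrementAndGet(); // Stands for data emission to a server
    }

    /** Schedules (or reschedules) '_send' with the given period. */
    public void schedule(long period_in_milliseconds) {
        if (_to_send != null) {
            _to_send.cancel(false); // 'false' means that any in-progress task is not interrupted...
        }
        _to_send = _to_send_service.scheduleAtFixedRate(this::_send,
                period_in_milliseconds, period_in_milliseconds, TimeUnit.MILLISECONDS);
    }

    /** Soft stop, then returns the number of '_send' executions. */
    public int stop() {
        _to_send_service.shutdown(); // By default, periodic tasks do not continue after 'shutdown'
        try {
            _to_send_service.awaitTermination(1L, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return _sent.get();
    }

    public static int run() {
        Reschedule_sketch rs = new Reschedule_sketch();
        try {
            rs.schedule(50L);
            Thread.sleep(200L);
            rs.schedule(10L); // The frequency has changed...
            Thread.sleep(200L);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return rs.stop();
    }

    public static void main(String[] args) {
        System.out.println("Number of emissions: " + run());
    }
}
```
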
WebSockets is a full-duplex technology in Web programming. One may write a Java WebSockets server from scratch (here…) or reuse libraries like Java-Websocket, Tyrus (Java reference implementation), or Vert.x

Scenario(s)

A minimalist JavaScript program (written below with TypeScript type annotations) connects to a WebSockets server. This lightweight server is written in Java and obeys the WebSockets protocol and API (here…).

Example (access to the WebSockets server)

// Tested with Tyrus 1.17 WebSockets Java library
const service = new WebSocket("ws://localhost:1963/FranckBarbier/WebSockets_illustration");
service.onmessage = (event: MessageEvent) => {
    console.log("Message from Java: " + event.data);
};
service.onopen = (event: Event) => {
    console.log("service.onopen...");
    const response = window.confirm(service.url + " just opened... Say 'Hi!'?");
    if (response)
        service.send(JSON.stringify({Response: "Hi!"}));
};
service.onclose = (event: CloseEvent) => {
    console.log("service.onclose... " + event.code);
    window.alert("Bye! See you later...");
// '1011': the server is terminating the connection because it encountered an unexpected condition that prevented it from fulfilling the request.
};
service.onerror = (event: Event) => {
    window.alert("service.onerror...");
};

Example WebSockets_Tyrus_1_17.Java.Maven.zip  (WebSockets server creation and start plus simple client)

public class WebSockets_illustration {

    // Danger: 'My_ServerEndpoint' constructor must be accessed by the WebSockets server. Don't forget 'static'!
    @javax.websocket.server.ServerEndpoint(value = "/WebSockets_illustration")
    public static class My_ServerEndpoint {
        @javax.websocket.OnClose
        public void onClose(javax.websocket.Session session, javax.websocket.CloseReason close_reason) {
            System.out.println("onClose: " + close_reason.getReasonPhrase());
        }
        @javax.websocket.OnError
        public void onError(javax.websocket.Session session, Throwable throwable) {
            System.out.println("onError: " + throwable.getMessage());
        }
        @javax.websocket.OnMessage
        public void onMessage(javax.websocket.Session session, String message) {
            System.out.println("Message from JavaScript: " + message);
        }
        @javax.websocket.OnOpen
        public void onOpen(javax.websocket.Session session, javax.websocket.EndpointConfig ec) throws java.io.IOException {
            System.out.println("OnOpen... " + ec.getUserProperties().get("Author"));
            session.getBasicRemote().sendText("{\"Handshaking\": \"Yes\"}"); // Valid JSON requires quoted keys
        }
    }

    public static void main(String[] args) {
        java.util.Map<String, Object> user_properties = new java.util.HashMap<>();
        user_properties.put("Author", "FranckBarbier");
        org.glassfish.tyrus.server.Server server = new org.glassfish.tyrus.server.Server("localhost", 1963, "/FranckBarbier", user_properties /* or 'null' */, My_ServerEndpoint.class);
        try {
            server.start();
            System.out.println("\n*** Please press any key to stop the server... ***\n");
// The Web page (JavaScript client) is launched from Java:
            java.awt.Desktop.getDesktop().browse(java.nio.file.FileSystems.getDefault().getPath("web" + java.io.File.separatorChar + "index.html").toUri());
// The Java 11 client is launched as well:
            Java_11_client client = new Java_11_client(java.util.Optional.of(Java_11_client.class.getSimpleName()));
            java.io.BufferedReader reader = new java.io.BufferedReader(new java.io.InputStreamReader(System.in));
            reader.readLine();
            System.out.println("\n@@@\n" + client.get_log() + "@@@\n");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            server.stop();
        }
    }
}

Rule(s)

Example WebSockets_Tyrus_1_17.Java.Maven.zip 

public class Java_11_client implements java.net.http.WebSocket.Listener {
    private String _log = "";
    public String get_log() { return _log; }
    @Override
    public void onOpen(java.net.http.WebSocket ws) {
        _log += Java_11_client.class.getSimpleName() + " CONNECTED..." + '\n';
        java.net.http.WebSocket.Listener.super.onOpen(ws);
    }
    @Override
    public void onError(java.net.http.WebSocket ws, Throwable error) {
        _log += "\tonError: " + error.getMessage() + '\n';
        java.net.http.WebSocket.Listener.super.onError(ws, error);
    }
// WebSockets server (non-blocking) callback:
    @Override
    public java.util.concurrent.CompletionStage<?> onText(java.net.http.WebSocket ws, CharSequence data, boolean last) {
        _log += "\tonText: " + data + '\n';
        return java.net.http.WebSocket.Listener.super.onText(ws, data, last);
    }
// WebSockets server (non-blocking) callback:
    @Override
    public java.util.concurrent.CompletionStage<?> onPong(java.net.http.WebSocket ws, java.nio.ByteBuffer message) {
        _log += "\tonPong: " + new String(message.array()) + "Yes, it does..." + '\n';
        return java.net.http.WebSocket.Listener.super.onPong(ws, message);
    }

    public Java_11_client(java.util.Optional<String> o) {
        java.net.http.HttpClient hc = java.net.http.HttpClient.newBuilder().build();
        java.net.http.WebSocket.Builder b = hc.newWebSocketBuilder();
        java.net.http.WebSocket ws = b.buildAsync(java.net.URI.create("ws://localhost:1963/FranckBarbier/WebSockets_illustration"), this).join();
// Test it:
        ws.sendPing(java.nio.ByteBuffer.wrap("Does 'ping' succeed? ".getBytes()));
// Streaming:
        ws.sendText("Java 11 ", false) // 'false' means that the text is not yet finished...
            .thenCompose(w -> w.sendText("client", true)) // Next fragment is sent only once the previous send has completed
            .join();
        try {
            Thread.sleep(5000); // Wait a bit before closing...
        } catch (InterruptedException ie) {
            ie.printStackTrace();
        }
        ws.sendClose(java.net.http.WebSocket.NORMAL_CLOSURE, Java_11_client.class.getSimpleName() + " DISCONNECTED..." + '\n').thenRun(() -> _log += Java_11_client.class.getSimpleName() + " DISCONNECTED..." + '\n');
    }
}
Web services consumption has been revisited in Java 11 based on an asynchronous mode (here…)

Rule(s)

Example

private void _get_items__(java.time.Duration time_out) { // https://dzone.com/articles/java-11-standardized-http-client-api
    assert (!time_out.isNegative() && !time_out.isZero());
    if (_json_query != null && _json_result == null) {
        _get_items_elapsed_time = System.currentTimeMillis();

        var client = java.net.http.HttpClient.newBuilder().build();

        var request = java.net.http.HttpRequest.newBuilder(java.net.URI.create(_url))
            .POST(java.net.http.HttpRequest.BodyPublishers.ofString(_json_query))
            .headers("accept", "application/json",
                "content-type", "application/json; charset=UTF-8",
                "x-ibm-client-id", _x_ibm_client_id,
                "x-ibm-client-secret", _x_ibm_client_secret)
            .timeout(time_out)
            .build();

        java.util.concurrent.CompletableFuture<java.net.http.HttpResponse<java.io.InputStream>> response =
            client.sendAsync(request, java.net.http.HttpResponse.BodyHandlers.ofInputStream());
        try {
            // 'get' waits for result:
            var is = response.get().body();
            _result_as_pretty_JSON(is); // Versus '_result_as_raw_JSON(is);'
        } catch (java.util.concurrent.ExecutionException | InterruptedException ee_ie) {
            _json_result = javax.json.Json.createObjectBuilder().add(_ERROR, ee_ie.getMessage()).build().toString();
            System.err.println(ee_ie.getClass().getSimpleName() + ": " + ee_ie.getMessage());
        } finally {
            _get_items_elapsed_time = System.currentTimeMillis() - _get_items_elapsed_time;
        }
    }
}
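
In the method above, 'get' blocks the calling thread until the response arrives. A non-blocking style chains callbacks on the java.util.concurrent.CompletableFuture object returned by 'sendAsync'. The sketch below illustrates this; everything in it is an assumption made for a self-contained run: the class name 'Send_async_sketch', the '/items' path, the JSON payloads, and a local test server based on the JDK's com.sun.net.httpserver.HttpServer in place of the remote service.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

public class Send_async_sketch {

    public static String run() {
        HttpServer server;
        try {
            server = HttpServer.create(new InetSocketAddress("localhost", 0), 0); // '0': any free port
        } catch (IOException ioe) {
            return "ERROR: " + ioe.getMessage();
        }
        server.createContext("/items", exchange -> { // Hypothetical service that returns an empty list
            exchange.getRequestBody().readAllBytes(); // The query is read, then ignored
            byte[] body = "{\"items\": []}".getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().add("content-type", "application/json; charset=UTF-8");
            exchange.sendResponseHeaders(200, body.length);
            exchange.getResponseBody().write(body);
            exchange.close();
        });
        server.start();
        try {
            HttpClient client = HttpClient.newBuilder().build();
            HttpRequest request = HttpRequest.newBuilder(
                    URI.create("http://localhost:" + server.getAddress().getPort() + "/items"))
                    .POST(HttpRequest.BodyPublishers.ofString("{}"))
                    .header("accept", "application/json")
                    .timeout(Duration.ofSeconds(5))
                    .build();
            CompletableFuture<String> result = client
                    .sendAsync(request, HttpResponse.BodyHandlers.ofString())
                    .thenApply(HttpResponse::body) // Runs when the response is available
                    .exceptionally(t -> "ERROR: " + t.getMessage());
            // The calling thread is free to do something else here...
            return result.join(); // Only this demonstration waits for the end
        } finally {
            server.stop(0);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```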