Concurrency
- Create worker threads using Runnable and Callable, and manage concurrency using an ExecutorService and the java.util.concurrent API
- Develop thread-safe code, using different locking mechanisms and the java.util.concurrent API
This tutorial covers both the fundamentals (Thread class, synchronized keyword…) and the more advanced facilities of multithreading. Parallelism in Java has evolved gradually, along with Java's ability to target big data processing. Low-level Application Programming Interfaces -APIs- tend to give way to asynchronous programming: applications consume “services” that produce (huge amounts of) data. The asynchronous nature of services calls for newer, higher-level APIs such as streams. Asynchronous APIs for HTTP, HTTPS, WebSockets… are also part of this challenge
From Java 5, significant changes occur in the way “parallel programming” ☛ (i.e., concurrency, multithreading…) can be managed, including how the original synchronized and volatile keywords are used
Operating System -OS- level
Example Multithreading.Java.zip
String[] command = {"/usr/local/bin/node", "--version"}; // macOS-specific!
try {
    // Heavyweight process with probable incompatibilities across operating systems like Windows, macOS, Linux, etc.
    Process p = Runtime.getRuntime().exec(command);
    // Blocking! Better solution required, e.g., look at https://www.javaworld.com/article/2071275/when-runtime-exec---won-t.html
    if (p.waitFor() == 0) {
        try (java.io.BufferedReader input = new java.io.BufferedReader(new java.io.InputStreamReader(p.getInputStream()))) {
            String content;
            while ((content = input.readLine()) != null) {
                System.out.println("Out: " + content); // 'Out: v11.8.0' is displayed...
            }
        }
    } else {
        try (java.io.BufferedReader input = new java.io.BufferedReader(new java.io.InputStreamReader(p.getErrorStream()))) {
            String content;
            while ((content = input.readLine()) != null) {
                System.err.println("Err: " + content);
            }
        }
    }
} catch (InterruptedException | java.io.IOException ieioe) {
    System.err.println("Runtime.getRuntime().exec(command): " + ieioe.getClass().getSimpleName() + ": " + ieioe.getMessage());
}
Java Virtual Machine -JVM- level
Example
private volatile boolean _stop = false; // 'volatile' so that the write made by the stopping thread is visible to '_t1'
private Thread _t1; // Lightweight process
…
_t1 = new Thread() {
    public void run() {
        while (!_stop) { /* Something to do… */ }
    }
};
…
_t1.start(); // 'Thread' objects are NEVER restartable!
…
_stop = true; // Never use '_t1.stop();'
_t1 = null; // For garbage collector
Runnable interface
Rule(s)
java.lang.Runnable ☛ is a functional interface. It has a single run method whose body must be supplied by an implementing class. Threads take runnable objects as arguments and, once started (start), execute run in a separate thread of control. To avoid creating such a class (and its associated .java file) only to provide the body of run, a lambda expression is useful
Example Lambda_expression.Java.zip
public class Functional_interface_test {
    public static void main(String[] args) {
        // No reference to the 'run' method is required since 'java.lang.Runnable' only owns this single method:
        Runnable to_do = () -> {
            try {
                Thread.sleep(1000L); // Nothing to do for 1 sec. <=> a kind of delay!
            } catch (InterruptedException ie) {
                java.util.logging.Logger.getLogger("Why it doesn't work?").log(java.util.logging.Level.SEVERE, null, ie);
            }
        };
        new Thread(to_do).start();
        …
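As a complementary sketch, the three usual ways of supplying the body of run (lambda expression, method reference, anonymous class) can be compared side by side. The class name Runnable_lambda_sketch and the helper run_and_wait are hypothetical names introduced for illustration only; they are not part of the downloadable example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class Runnable_lambda_sketch {
    // Hypothetical helper: runs 'task' in a new thread and waits for its termination
    static void run_and_wait(Runnable task) {
        Thread t = new Thread(task, "worker");
        t.start();
        try {
            t.join(); // The caller blocks until 'run' has completed...
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
    }

    static int demo() {
        final AtomicInteger counter = new AtomicInteger();
        run_and_wait(() -> counter.incrementAndGet()); // Lambda expression
        run_and_wait(counter::incrementAndGet);        // Method reference (returned value is ignored)
        run_and_wait(new Runnable() {                  // Anonymous class, the pre-Java 8 style
            @Override
            public void run() {
                counter.incrementAndGet();
            }
        });
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("Tasks executed: " + demo()); // 'Tasks executed: 3'
    }
}
```

All three forms yield a java.lang.Runnable object; only the amount of ceremony differs.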
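synchronized keyword
Synchronized method
Rule(s)
- A synchronized method runs under mutual exclusion: while a thread executes it, no other thread can enter any synchronized method guarded by the same monitor (this, or the class object for static methods). The running thread may still be descheduled, but a concurrent thread cannot interleave with it inside the protected code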
Example Multithreading.Java.zip
public class Illustration_of_synchronized {
    static private int _I = 0; // 'static' is for convenience and simplification...
    static private final Thread _plusplus;
    static private final Thread _minus;

    static { // Static initialiser is for convenience and simplification...
        _plusplus = new Thread("plusplus") {
            @Override
            public void run() {
                assert (Thread.currentThread() == this);
                // System.out.println(Thread.currentThread().getName()); // 'plusplus'
                _Plus();
                _Plus();
            }
        };
        assert (_plusplus.getState() == Thread.State.NEW);
        _minus = new Thread("minus") {
            @Override
            public void run() {
                assert (Thread.currentThread() == this);
                // System.out.println(Thread.currentThread().getName()); // "minus"
                _Minus();
            }
        };
        assert (_minus.getState() == Thread.State.NEW);
        _plusplus.start(); // 0 -> 1 -> 2 *OR* -1 -> 0 -> 1
        assert (_plusplus.getState() == Thread.State.RUNNABLE || _plusplus.getState() == Thread.State.TERMINATED);
        _minus.start(); // 2 -> 1 *OR* 0 -> -1
        assert (_minus.getState() == Thread.State.RUNNABLE || _minus.getState() == Thread.State.TERMINATED);
        /**
         * Caution: execution can be -plusplus ; minus- *OR* -minus ; plusplus-
         */
    }

    static synchronized private void _Plus() {
        _I++;
    }

    static synchronized private void _Minus() {
        _I--;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Thread.currentThread().getName()); // 'main'
        // Wait for both threads, otherwise '_I' may be read before they have run:
        _plusplus.join();
        _minus.join();
        System.out.println("Since '_Plus' and '_Minus' are BOTH synchronized, result cannot be anything but '1': " + _I);
    }
}
Synchronized block
Rule(s)
- A synchronized block delimits a piece of code that runs under mutual exclusion. In this case, synchronized requires the object to lock on to be given explicitly, while a synchronized method implicitly locks on this. Synchronized blocks cannot lock on values of primitive types… From Java 5, synchronized may advantageously be replaced by the java.util.concurrent.locks.Lock interface
Example
Temperature t = new Temperature();
synchronized (t) { // Mutex acquisition on 't'
    // Synchronized code…
} // Mutex release on 't'
“Parallel programming”-based support in java.lang.Object
Rule(s)
- wait, notify and notifyAll are java.lang.Object routines for coordination. They can only be called by a thread that owns the object's monitor, i.e., from inside synchronized methods or blocks; otherwise java.lang.IllegalMonitorStateException is thrown. Issues are discussed ☛
- From Java 5, await, signal and signalAll in the java.util.concurrent.locks.Condition interface offer a similar, but more flexible way of thread coordination
Example Multithreading.Java.zip
public class Simple_dead_lock {
    private int _i = 0;

    synchronized public void f() {
        System.err.println("Start of 'f()'");
        _i++;
        _g();
        System.err.println("Value of '_i': " + _i);
        notify(); // Too late: it is only reached once 'wait' in '_g()' has timed out...
    }

    synchronized private void _g() {
        System.err.println("Start of '_g()'");
        try {
            wait(5000); // Dead lock occurs for 5 sec.: no other thread is able to call 'notify'!
        } catch (InterruptedException ie) {
            ie.printStackTrace();
        }
        System.err.println("End of 'wait()'");
    }

    public static void main(String[] args) {
        Simple_dead_lock sdl = new Simple_dead_lock();
        sdl.f();
    }
}
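By contrast, a minimal sketch of a safe use of wait and notifyAll is given here: the waiting thread owns the monitor and re-checks its condition in a loop, while another thread changes the state and notifies. The class Guarded_wait_sketch and its put and take methods are hypothetical names chosen for illustration.

```java
class Guarded_wait_sketch {
    private String _message = null; // Shared state guarded by the monitor of 'this'

    synchronized void put(String message) {
        _message = message;
        notifyAll(); // Legal: the calling thread owns the monitor of 'this'
    }

    synchronized String take() throws InterruptedException {
        while (_message == null) { // Loop (not 'if') so that spurious wakeups are harmless
            wait(); // Monitor is released while waiting and re-acquired before returning
        }
        String message = _message;
        _message = null;
        return message;
    }

    static String demo() {
        final Guarded_wait_sketch box = new Guarded_wait_sketch();
        Thread producer = new Thread(() -> box.put("Hi!"), "producer");
        producer.start();
        try {
            String result = box.take(); // Blocks until 'producer' has called 'put'
            producer.join();
            return result;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 'Hi!'
    }
}
```

Whether put or take runs first does not matter: the condition _message == null is tested under the monitor before any wait.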
volatile keyword
Rule(s)
- The volatile keyword in Java is a subtle notion that few people really understand (and thus master). A complete analysis of volatile may be found ☛ In particular, it follows from the meaning of volatile that volatile and final are incompatible (compilation error). Recall that the final keyword just prevents multiple assignments (this is different from const variables in C++, whose state cannot change at all)
Example Multithreading.Java.zip
public class Illustration_of_volatile {
    private int _bads = 0;
    private int _goods = 0;
    volatile private boolean _stop = false; // Important issue: '_stop' is used in read mode only, which is a smart use of 'volatile'!
    /**
     * The game is avoiding the following inconsistent compositions:
     * "FrançoisSarkozy" and "NicolasHollande"
     */
    // 1. Executing the program "as is" leads to some bad compositions...
    // 2. Uncomment 'volatile' for '_name', does it work? Unfortunately no, some bad compositions persist...
    // 3. Use 'java.util.concurrent.atomic.AtomicReference' instead... Does it work? Fortunately yes!
    /*volatile*/ private String _name = "?";
    // java.util.concurrent.atomic.AtomicReference<String> _name = new java.util.concurrent.atomic.AtomicReference<>("?");
    private Thread _t1, _t2, _t3;

    private void set(String given_name, String surname) {
        /**
         * Executions 1 & 2
         */
        _name = given_name;
        try {
            Thread.sleep(1); // Fictitious delay so that 'set' lasts and then may be "actually" interrupted...
        } catch (InterruptedException ie) {
            ie.printStackTrace();
        }
        _name += surname;
        /**
         * Execution 3
         */
        // _name.set(given_name + surname); // To be used with 'java.util.concurrent.atomic.AtomicReference<String> _name = new java.util.concurrent.atomic.AtomicReference<>("?");'
    }

    /**
     * If not 'synchronized' then 'check' can be interrupted by '_t1' and '_t2',
     * but, anyway, bad compositions may occur...
     */
    /*synchronized*/ private void check() {
        if (_name.equals("Fran" + (char) 0x00E7 + "ois" + "Sarkozy") || _name.equals("Nicolas" + "Hollande")) {
            _bads++;
        } else {
            _goods++;
        }
    }

    public Illustration_of_volatile() {
        _t1 = new Thread() {
            public void run() {
                while (!_stop) {
                    set("Fran" + (char) 0x00E7 + "ois", "Hollande");
                }
            }
        };
        _t2 = new Thread() {
            public void run() {
                while (!_stop) {
                    set("Nicolas", "Sarkozy");
                }
            }
        };
        _t3 = new Thread() {
            public void run() {
                while (!_stop) {
                    check();
                }
            }
        };
    }

    public void start() {
        _t1.start();
        _t2.start();
        _t3.start();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException ie) {
            ie.printStackTrace();
        }
        _stop = true; // Let us think about this: are all three threads stopped at the same time?
        _t1 = null;
        _t2 = null;
        _t3 = null;
    }

    public static void main(String[] args) {
        Illustration_of_volatile iov = new Illustration_of_volatile();
        iov.start();
        System.out.println("Bad compositions: " + iov._bads);
        System.out.println("Good compositions: " + iov._goods);
    }
}
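The third execution mode (java.util.concurrent.atomic.AtomicReference) may be sketched in isolation as follows. This is a reduced variant under stated assumptions, not the program of the archive: the class name Atomic_reference_sketch, the method count_bad_compositions and the neutral names "AdaLovelace" and "AlanTuring" are hypothetical. Each writer publishes a complete name in a single set call, so the reader can never observe a half-built value.

```java
import java.util.concurrent.atomic.AtomicReference;

class Atomic_reference_sketch {
    static int count_bad_compositions(int iterations) {
        final AtomicReference<String> name = new AtomicReference<>("AdaLovelace");
        // Each writer builds the whole name first, and only then publishes it:
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < iterations; i++) name.set("Ada" + "Lovelace");
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < iterations; i++) name.set("Alan" + "Turing");
        });
        t1.start();
        t2.start();
        int bads = 0;
        while (t1.isAlive() || t2.isAlive()) { // The calling thread plays the role of the checker
            String n = name.get();
            if (!n.equals("AdaLovelace") && !n.equals("AlanTuring")) bads++;
        }
        return bads;
    }

    public static void main(String[] args) {
        System.out.println("Bad compositions: " + count_bad_compositions(100_000)); // 'Bad compositions: 0'
    }
}
```

The key design choice is that the compound action (given name, then surname) is no longer spread over two writes to the shared variable; volatile alone cannot make two successive writes atomic.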
java.util.concurrent package
From Java 5, the java.util.concurrent package revisits the way “parallel programming” may be performed in Java
Rule(s)
- The key point is that native Java support (synchronized, wait, notify…) may advantageously be replaced by the enhanced support in java.util.concurrent (further detail ☛)
Example Multithreading.Java.zip
public class Illustration_of_Lock_and_Condition {
    private final java.util.LinkedList<String> _data = new java.util.LinkedList<>();
    private java.nio.file.Path _file;
    // https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/locks/Lock.html
    private final java.util.concurrent.locks.Lock _lock = new java.util.concurrent.locks.ReentrantLock(/* Exercise 1: */ /* true */);
    // https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/locks/Condition.html
    private final java.util.concurrent.locks.Condition _condition = _lock.newCondition();
    private volatile boolean _stop = false; // 'volatile' since written by 'loading' and read by the two other threads
    /* Exercise 1: */ /* private int _count = 0; */

    public Illustration_of_Lock_and_Condition(String data) {
        assert (data != null && data.length() > 0);
        try {
            _file = new java.io.File("my_file").toPath();
            java.nio.file.Files.deleteIfExists(_file);
            java.nio.file.Files.write(_file, data.getBytes());
        } catch (java.io.IOException ioe) {
            System.err.println(this.getClass().getSimpleName() + ": " + ioe.getMessage());
            Runtime.getRuntime().exit(-1); // For simplicity only, i.e., didactical program
        }
    }

    private void _loading() {
        try {
            java.util.List<String> lines = java.nio.file.Files.readAllLines(_file, java.nio.charset.Charset.defaultCharset());
            for (String line : lines) {
                _lock.lock(); // 'java.util.LinkedList' is not thread-safe: additions are guarded as well...
                try {
                    _data.add(line);
                } finally {
                    _lock.unlock();
                }
                try {
                    Thread.sleep(1); // Fictitious delay so that 'for' loop lasts and then may be artificially paused...
                } catch (InterruptedException ie) {
                    System.err.println(this.getClass().getSimpleName() + ": " + ie.getMessage());
                    Runtime.getRuntime().exit(-2); // For simplicity only, i.e., didactical program
                }
                // System.out.println(line + " loaded...");
            }
        } catch (java.io.IOException ioe) {
            System.err.println(this.getClass().getSimpleName() + ": " + ioe.getMessage());
            Runtime.getRuntime().exit(-2); // For simplicity only, i.e., didactical program
        } finally {
            _stop = true;
        }
        // System.out.println("'_loading' is ending...");
    }

    private void _processing() {
        while (!_stop || !_data.isEmpty()) {
            _lock.lock(); // Thread becomes dormant while awaiting the lock's acquisition...
            try {
                _data.addFirst(_data.removeFirst().toUpperCase());
                _condition.signal();
                /* Exercise 1: */ /* System.out.println(_data.getFirst() + " processed..." + _count++); */
                try {
                    Thread.sleep(1); // Fictitious delay so that processing lasts and then may be artificially paused...
                } catch (InterruptedException ie) {
                    System.err.println(this.getClass().getSimpleName() + ": " + ie.getMessage());
                    Runtime.getRuntime().exit(-2); // For simplicity only, i.e., didactical program
                }
            } catch (java.util.NoSuchElementException nsee) {
                // 'removeFirst' may occur while the list is empty... It doesn't matter for a didactical program, but improvements are required for a professional program!
            } finally {
                _lock.unlock();
            }
        }
        // System.out.println("'_processing' is ending...");
    }

    private void _unloading() {
        while (!_stop || !_data.isEmpty()) {
            _lock.lock(); // Thread becomes dormant while awaiting the lock's acquisition...
            try {
                // "Spurious" wakeup unmanaged:
                _condition.await(); // Lock is atomically released...
                System.out.println(_data.removeFirst() + " unloaded...");
            } catch (InterruptedException ie) {
                System.err.println(this.getClass().getSimpleName() + ": " + ie.getMessage());
                Runtime.getRuntime().exit(-2); // For simplicity only, i.e., didactical program
            } catch (java.util.NoSuchElementException nsee) {
                // 'removeFirst' may occur while the list is empty... It doesn't matter for a didactical program, but improvements are required for a professional program!
            } finally {
                _lock.unlock(); // If 'await' fails then the lock is released...
            }
        }
        // System.out.println("'_unloading' is ending...");
    }

    public static void main(String[] args) {
        final Illustration_of_Lock_and_Condition iolac = new Illustration_of_Lock_and_Condition("f\nr\na\nn\nc\nk\n \nb\na\nr\nb\ni\ne\nr");
        new Thread("unloading") {
            @Override
            public void run() { // 'this' is the executing thread!
                iolac._unloading();
            }
        }.start();
        new Thread("processing") {
            @Override
            public void run() { // 'this' is the executing thread!
                iolac._processing();
            }
        }.start();
        new Thread("loading") {
            @Override
            public void run() { // 'this' is the executing thread!
                iolac._loading();
            }
        }.start();
    }
}
Exercise 1
The program looks pretty slow. This is confirmed by uncommenting /* Exercise 1: */ /* private int _count = 0; */ and /* Exercise 1: */ /* System.out.println(_data.getFirst() + " processed..." + _count++); */
In fact, the processing thread performs many useless changes (≈ _count) to the first element of _data. This occurs because the unloading thread is the thread that waits the longest: data (i.e., simple strings) can be unloaded (i.e., removed from the _data list) only once processed (i.e., changed to upper case). The java.util.concurrent.locks.ReentrantLock class comes with a “fairness” parameter in its constructor: true means that the longest-waiting threads are granted the lock first. By uncommenting /* Exercise 1: */ /* true */, the program goes faster!
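For comparison, a more defensive use of Lock and Condition is sketched here: a fair ReentrantLock, and an await guarded by a while loop so that spurious wakeups and early signals are both handled. This is a simplified two-thread variant under stated assumptions, not a correction of the program of the archive; the names Lock_condition_sketch, load, unload and _not_empty are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Lock_condition_sketch {
    private final Deque<String> _data = new ArrayDeque<>();
    private final Lock _lock = new ReentrantLock(true); // 'true': longest-waiting thread gets the lock first
    private final Condition _not_empty = _lock.newCondition();

    void load(String s) {
        _lock.lock();
        try {
            _data.addLast(s.toUpperCase()); // Processing is done at loading time in this sketch
            _not_empty.signal();
        } finally {
            _lock.unlock();
        }
    }

    String unload() throws InterruptedException {
        _lock.lock();
        try {
            while (_data.isEmpty()) { // Condition re-checked after each wakeup
                _not_empty.await();   // Lock is atomically released while waiting
            }
            return _data.removeFirst();
        } finally {
            _lock.unlock();
        }
    }

    static String demo() {
        final Lock_condition_sketch sketch = new Lock_condition_sketch();
        final String[] letters = {"f", "r", "a", "n", "c", "k"};
        Thread loading = new Thread(() -> {
            for (String letter : letters) sketch.load(letter);
        }, "loading");
        loading.start();
        StringBuilder unloaded = new StringBuilder();
        try {
            for (int i = 0; i < letters.length; i++) unloaded.append(sketch.unload());
            loading.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return unloaded.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 'FRANCK'
    }
}
```

Because the emptiness test is made under the lock, no java.util.NoSuchElementException has to be caught.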
java.util.concurrent.ExecutorService
From Java 5, the java.util.concurrent.ExecutorService interface aims at making the use of threads more intuitive. From Java 7, java.util.concurrent.ForkJoinPool ☛, a subtype of java.util.concurrent.ExecutorService, enhances synchronization capabilities: the difference is discussed in particular ☛
Example (Carrefour_Items.Java.zip)
// File system "Observer" pattern (from Java 7):
java.nio.file.WatchService ws = java.nio.file.FileSystems.getDefault().newWatchService();
/** 'java.nio.file.Path' interface extends 'java.nio.file.Watchable' interface */
// 'database' directory is observed by a daemon for any file creation:
java.nio.file.FileSystems.getDefault().getPath("database").register(ws, java.nio.file.StandardWatchEventKinds.ENTRY_CREATE);
…
// Watch service waits for just created file in 'database':
final java.util.concurrent.Callable<String> c = () -> { // Override 'call' method...
    // 'take' is a blocking statement...
    java.nio.file.WatchEvent<?> we = ws.take().pollEvents().get(0);
    return ((java.nio.file.WatchEvent<java.nio.file.Path>) we).context().toString(); // Return file name of just created file...
};
// Task 'c' is executed as a thread to prevent blocking:
java.util.concurrent.Future<String> f = java.util.concurrent.Executors.newSingleThreadExecutor().submit(c);
try {
    // Simply get result:
    String file_name_of_just_created_file = f.get();
} catch (java.util.concurrent.ExecutionException | InterruptedException eeie) {
    …
}
Temporal issues
java.util.concurrent.ScheduledExecutorService derives from java.util.concurrent.ExecutorService. It is a utility to schedule and run (multiple) jobs periodically or at specific moments. It mainly aims at avoiding the use of the “legacy” java.util.Timer and java.util.TimerTask classes. The pool size, set at creation time, establishes the number of threads available for running jobs
Example
// Configure data emission to a server:
private final java.util.concurrent.ScheduledExecutorService _to_send_service =
    // Only one job:
    java.util.concurrent.Executors.newScheduledThreadPool(1);
private java.util.concurrent.ScheduledFuture<?> _to_send = null;
…
_to_send = _to_send_service.scheduleAtFixedRate(() -> {
    this._send();
}, _animation_frequency, _animation_frequency, java.util.concurrent.TimeUnit.MILLISECONDS);
…
private void _reschedule() { // Value of '_animation_frequency' has changed...
    _to_send.cancel(false); // 'false' means that any in-progress task is not interrupted...
    // assert (_to_send.isDone());
    _to_send = _to_send_service.scheduleAtFixedRate(() -> {
        this._send();
    }, _animation_frequency, _animation_frequency, java.util.concurrent.TimeUnit.MILLISECONDS);
}
…
_to_send_service.shutdown(); // Soft stop (in-progress tasks complete) compared to 'shutdownNow'
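The fragment above depends on members (_send, _animation_frequency) that are not shown. A self-contained sketch of the same schedule, cancel and shutdown cycle might look as follows; the class name Scheduled_executor_sketch, the 10 ms period and the use of a java.util.concurrent.CountDownLatch to wait for three executions are assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class Scheduled_executor_sketch {
    static int demo() {
        final ScheduledExecutorService service = Executors.newScheduledThreadPool(1); // One thread in the pool
        final AtomicInteger runs = new AtomicInteger();
        final CountDownLatch three_runs = new CountDownLatch(3);
        // Periodic job: first execution immediately, then every 10 ms
        ScheduledFuture<?> job = service.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            three_runs.countDown();
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            three_runs.await(); // Blocks until the job has run (at least) three times
            job.cancel(false);  // 'false': an in-progress execution is not interrupted
            service.shutdown(); // Soft stop
            service.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            service.shutdownNow();
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("Executions: " + demo()); // At least 3
    }
}
```

The exact number of executions depends on timing, which is why only a lower bound is stated.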
WebSockets is a full-duplex technology in Web programming. One may write a Java WebSockets server from scratch ☛ or reuse libraries like Java-Websocket ☛, Tyrus ☛ (Java reference implementation), or Vert.x ☛
Scenario(s)
A minimalist JavaScript program connects to a WebSockets server. This lightweight server is written in Java and obeys the WebSockets protocol and API ☛
Example (access to the WebSockets server)
// Tested with Tyrus 1.17 WebSockets Java library
const service = new WebSocket("ws://localhost:1963/FranckBarbier/WebSockets_illustration");
service.onmessage = (event: MessageEvent) => {
    console.log("Message from Java: " + event.data);
};
service.onopen = (event: Event) => {
    console.log("service.onopen...");
    const response = window.confirm(service.url + " just opened... Say 'Hi!'?");
    if (response)
        service.send(JSON.stringify({Response: "Hi!"}));
};
service.onclose = (event: CloseEvent) => {
    console.log("service.onclose... " + event.code);
    window.alert("Bye! See you later...");
    // '1011': the server is terminating the connection because it encountered an unexpected condition that prevented it from fulfilling the request.
};
service.onerror = (event: Event) => {
    window.alert("service.onerror...");
};
Example WebSockets_Tyrus_1_17.Java.zip (WebSockets server creation and start plus simple client)
public class WebSockets_illustration {
    // Danger: 'My_ServerEndpoint' constructor must be accessed by the WebSockets server. Don't forget 'static'!
    @javax.websocket.server.ServerEndpoint(value = "/WebSockets_illustration")
    public static class My_ServerEndpoint {
        @javax.websocket.OnClose
        public void onClose(javax.websocket.Session session, javax.websocket.CloseReason close_reason) {
            System.out.println("onClose: " + close_reason.getReasonPhrase());
        }

        @javax.websocket.OnError
        public void onError(javax.websocket.Session session, Throwable throwable) {
            System.out.println("onError: " + throwable.getMessage());
        }

        @javax.websocket.OnMessage
        public void onMessage(javax.websocket.Session session, String message) {
            System.out.println("Message from JavaScript: " + message);
        }

        @javax.websocket.OnOpen
        public void onOpen(javax.websocket.Session session, javax.websocket.EndpointConfig ec) throws java.io.IOException {
            System.out.println("OnOpen... " + ec.getUserProperties().get("Author"));
            session.getBasicRemote().sendText("{Handshaking: \"Yes\"}");
        }
    }

    public static void main(String[] args) {
        java.util.Map<String, Object> user_properties = new java.util.HashMap<>();
        user_properties.put("Author", "FranckBarbier");
        org.glassfish.tyrus.server.Server server = new org.glassfish.tyrus.server.Server("localhost", 1963, "/FranckBarbier", user_properties /* or 'null' */, My_ServerEndpoint.class);
        try {
            server.start();
            System.out.println("\n*** Please press any key to stop the server... ***\n");
            // The Web page (JavaScript client) is launched from Java:
            java.awt.Desktop.getDesktop().browse(java.nio.file.FileSystems.getDefault().getPath("web" + java.io.File.separatorChar + "index.html").toUri());
            // The Java 11 client is launched as well:
            Java_11_client client = new Java_11_client(java.util.Optional.of(Java_11_client.class.getSimpleName()));
            java.io.BufferedReader reader = new java.io.BufferedReader(new java.io.InputStreamReader(System.in));
            reader.readLine();
            System.out.println("\n@@@\n" + client.get_log() + "@@@\n");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            server.stop();
        }
    }
}
Rule(s)
- The use of the wss protocol imposes the use of certificates. For instance, on macOS: Utilities > Keychain Access > Certificate Assistant > Create a Certificate. KeyTool ☛, which comes with the Java Runtime Environment -JRE-, provides an alternative to Operating System -OS- support
- The revised HttpClient API ☛ in Java 11 also supports the manipulation of asynchronous (non-blocking) WebSockets
Example WebSockets_Tyrus_1_17.Java.Maven.zip
public class Java_11_client implements java.net.http.WebSocket.Listener {
    private String _log = "";

    public String get_log() {
        return _log;
    }

    @Override
    public void onOpen(java.net.http.WebSocket ws) {
        _log += Java_11_client.class.getSimpleName() + " CONNECTED..." + '\n';
        java.net.http.WebSocket.Listener.super.onOpen(ws);
    }

    @Override
    public void onError(java.net.http.WebSocket ws, Throwable error) {
        _log += "\tonError: " + error.getMessage() + '\n';
        java.net.http.WebSocket.Listener.super.onError(ws, error);
    }

    // WebSockets server (non-blocking) callback:
    @Override
    public java.util.concurrent.CompletionStage<?> onText(java.net.http.WebSocket ws, CharSequence data, boolean last) {
        _log += "\tonText: " + data + '\n';
        return java.net.http.WebSocket.Listener.super.onText(ws, data, last);
    }

    // WebSockets server (non-blocking) callback:
    @Override
    public java.util.concurrent.CompletionStage<?> onPong(java.net.http.WebSocket ws, java.nio.ByteBuffer message) {
        _log += "\tonPong: " + new String(message.array()) + "Yes, it does..." + '\n';
        return java.net.http.WebSocket.Listener.super.onPong(ws, message);
    }

    public Java_11_client(java.util.Optional<String> o) {
        java.net.http.HttpClient hc = java.net.http.HttpClient.newBuilder().build();
        java.net.http.WebSocket.Builder b = hc.newWebSocketBuilder();
        java.net.http.WebSocket ws = b.buildAsync(java.net.URI.create("ws://localhost:1963/FranckBarbier/WebSockets_illustration"), this).join();
        // Test it:
        ws.sendPing(java.nio.ByteBuffer.wrap("Does 'ping' succeed? ".getBytes()));
        // Streaming:
        ws.sendText("Java 11 ", false); // 'false' means that the text is not yet finished...
        ws.sendText("client", true);
        try {
            Thread.sleep(5000); // Wait a bit before closing...
        } catch (InterruptedException ie) {
            ie.printStackTrace();
        }
        ws.sendClose(java.net.http.WebSocket.NORMAL_CLOSURE, Java_11_client.class.getSimpleName() + " DISCONNECTED..." + '\n')
            .thenRun(() -> _log += Java_11_client.class.getSimpleName() + " DISCONNECTED..." + '\n');
    }
}
Web services consumption has been revisited in Java 11 based on an asynchronous mode ☛
Rule(s)
- Request responses are instances of the java.util.concurrent.CompletableFuture<java.net.http.HttpResponse<X>> class; X has to be substituted by the (desired) effective type of the returned data for processing (e.g., java.io.InputStream in the following example). Note that java.util.concurrent.CompletableFuture<T> is similar to Promise<T> in TypeScript
Example
private void _get_items__(java.time.Duration time_out) { // https://dzone.com/articles/java-11-standardized-http-client-api
    assert (!time_out.isNegative() && !time_out.isZero());
    if (_json_query != null && _json_result == null) {
        _get_items_elapsed_time = System.currentTimeMillis();
        var client = java.net.http.HttpClient.newBuilder().build();
        var request = java.net.http.HttpRequest.newBuilder(java.net.URI.create(_url))
            .POST(java.net.http.HttpRequest.BodyPublishers.ofString(_json_query))
            .headers("accept", "application/json",
                "content-type", "application/json; charset=UTF-8",
                "x-ibm-client-id", _x_ibm_client_id,
                "x-ibm-client-secret", _x_ibm_client_secret)
            .timeout(time_out)
            .build();
        java.util.concurrent.CompletableFuture<java.net.http.HttpResponse<java.io.InputStream>> response =
            client.sendAsync(request, java.net.http.HttpResponse.BodyHandlers.ofInputStream());
        try {
            // 'get' waits for result:
            var is = response.get().body();
            _result_as_pretty_JSON(is); // Versus '_result_as_raw_JSON(is);'
        } catch (java.util.concurrent.ExecutionException | InterruptedException ee_ie) {
            _json_result = javax.json.Json.createObjectBuilder().add(_ERROR, ee_ie.getMessage()).build().toString();
            System.err.println(ee_ie.getClass().getSimpleName() + ": " + ee_ie.getMessage());
        } finally {
            _get_items_elapsed_time = System.currentTimeMillis() - _get_items_elapsed_time;
        }
    }
}
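The analogy with Promise<T> can be made more concrete with a sketch that chains callbacks instead of blocking on get. No network access is involved here: the hypothetical supplyAsync call merely stands in for client.sendAsync, and the class name Completable_future_sketch and the JSON text are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;

class Completable_future_sketch {
    static String demo() {
        // Stands in for 'client.sendAsync(...)': the body is produced in another thread
        CompletableFuture<String> response = CompletableFuture.supplyAsync(() -> " {\"items\": 3} ");
        return response
                .thenApply(String::trim)                           // Similar to 'then' in TypeScript
                .thenApply(body -> "Body: " + body)
                .exceptionally(t -> "Error: " + t.getMessage())    // Similar to 'catch' in TypeScript
                .join();                                           // Blocks, for the needs of the demo only
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 'Body: {"items": 3}'
    }
}
```

In an actual asynchronous design, the final join would typically be replaced by thenAccept so that no thread waits for the result.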