Concurrency
- Create worker threads using
Runnable
andCallable
, and manage concurrency using anExecutorService
andjava.util.concurrent
API- Develop thread-safe code, using different locking mechanisms and
java.util.concurrent
API
This tutorial is about fundamentals (
Thread
class,synchronized
keyword…) and advances of multithreading. Parallelism in Java gradually evolves along with the ability of Java to target big data processing. Low-level Application Programming Interfaces -APIs- tend to disappear to the benefit of asynchronous programming: applications consume “services” that produce (huge) data. The asynchronous nature of services calls for (newer) high-end APIs like, for example, streams. Asynchronous APIs for HTTP, HTTPS, WebSockets… are also part of this challenge.
From Java 5, significant modifications occur in the way “parallel programming” (i.e., concurrency, multithreading…) can be managed, including the use of the original
synchronized
andvolatile
keywords.Operating System -OS- level
Example Multithreading.Java.zip
String[] command = {"/usr/local/bin/node", "--version"}; // macOS-specific! try { Process p = Runtime.getRuntime().exec(command); // Heavyweight process with probable incompatibilities across operating systems like Windows, macOS, LINUX, etc. if (p.waitFor() == 0) { // Blocking! Better solution required, e.g., look at https://www.javaworld.com/article/2071275/when-runtime-exec---won-t.html try (java.io.BufferedReader input = new java.io.BufferedReader(new java.io.InputStreamReader(p.getInputStream()))) { String content; while ((content = input.readLine()) != null) System.out.println("Out: " + content); // 'Out: v11.8.0' is displayed... } } else { try (java.io.BufferedReader input = new java.io.BufferedReader(new java.io.InputStreamReader(p.getErrorStream()))) { String content; while ((content = input.readLine()) != null) System.err.println("Err: " + content); } } } catch (InterruptedException | java.io.IOException ieioe) { System.err.println("Runtime.getRuntime().exec(command): " + ieioe.getClass().getSimpleName() + ": " + ieioe.getMessage()); }
Java Virtual Machine -JVM- level
Example
private boolean _stop = false; private Thread _t1; // Lightweight process … _t1 = new Thread() { public void run() { while(! _stop) { /* Something to do… */ } } }; … _t1.start(); // 'Thread' objects are NEVER restartable! … _stop = true; // Never used '_t1.stop();' _t1 = null; // For garbage collector
Runnable
interfaceRule(s)
java.lang.Runnable
is a functional interface. It has a singlerun
method that requires a (deferred) body through an inheriting class. Threads require runnable objects as arguments for later executing (based onstart
)run
in split threads of control. To avoid the creation of such a class (that involves the creation of an associated.java
file) for only providing the body ofrun
, a lambda expression is useful.Example Lambda_expression.Java.zip
public class Functional_interface_test { public static void main(String[] args) { // No reference to the 'run' method is required since 'java.lang.Runnable' only owns this single method: Runnable to_do = () -> { try { Thread.sleep(1000L); // Nothing to do for 1 sec. <=> a kind of delay! } catch (InterruptedException ie) { java.util.logging.Logger.getLogger("Why it doesn't work?").log(java.util.logging.Level.SEVERE, null, ie); } }; new Thread(to_do).start(); …
synchronized
keywordSynchronized method
Rule(s)
- A synchronized method is by definition unstoppable. In other words, a thread running a synchronized method cannot be stopped for the benefit of another concurrent thread.
Example Multithreading.Java.zip
public class Illustration_of_synchronized { static private int _I = 0; // 'static' is for convenience and simplification... static { // Static initialiser is for convenience and simplification... Thread plusplus = new Thread("plusplus") { @Override public void run() { assert (Thread.currentThread() == this); // System.out.println(Thread.currentThread().getName()); // 'plusplus' _Plus(); _Plus(); } }; assert (plusplus.getState() == Thread.State.NEW); Thread minus = new Thread("minus") { @Override public void run() { assert (Thread.currentThread() == this); // System.out.println(Thread.currentThread().getName()); // "minus" _Minus(); } }; assert (minus.getState() == Thread.State.NEW); plusplus.start(); // 0 -> 1 -> 2 *OR* -1 -> 0 -> 1 assert (plusplus.getState() == Thread.State.RUNNABLE || plusplus.getState() == Thread.State.TERMINATED); minus.start(); // 2 -> 1 *OR* 0 -> -1 assert (minus.getState() == Thread.State.RUNNABLE || minus.getState() == Thread.State.TERMINATED); /** * Caution: execution can be -plusplus ; minus- *OR* -minus ; plusplus- */ } static synchronized private void _Plus() { _I++; } static synchronized private void _Minus() { _I--; } public static void main(String[] args) { System.out.println(Thread.currentThread().getName()); // 'main' System.out.println("Due to 'plusplus' and 'minus' are BOTH unstoppable, result cannot be anything but '1': " + _I); } }
Synchronized block
Rule(s)
- A synchronized block allows the delimitation of unstoppable code pieces. In this case,
synchronized
then requires the object to be synchronized while a synchronized method acts onthis
. Synchronized blocks cannot cope with primitive types… From Java 5,synchronized
benefits from being replaced by thejava.util.concurrent.locks.Lock
interface.Example
Temperature t = new Temperature(); synchronized(t) { // Mutex acquisition on 't' // Synchronized code… } // Mutex release on 't'
“Parallel programming”-based support in
java.lang.Object
Rule(s)
wait
,notify
andnotifyAll
arejava.lang.Object
routines for coordination. The only way of (safely) using them is insidesynchronized
methods.wait
,notify
andnotifyAll
are associated withjava.lang.IllegalMonitorStateException
(issues are discussed here…).- From Java 5,
await
,signal
andsignalAll
in thejava.util.concurrent.locks.Condition
interface offers a similar, but more flexible way of thread coordination.Example Multithreading.Java.zip
public class Simple_dead_lock { private int _i = 0; synchronized public void f() { System.err.println("Start of 'f()'"); _i++; _g(); System.err.println("Value of '_i': " + _i); notify(); } synchronized private void _g() { System.err.println("Start of '_g()'"); try { wait(5000); // Dead lock occurs for 5 sec.! } catch (InterruptedException ie) { ie.printStackTrace(); } System.err.println("End of 'wait()'"); } public static void main(String[] args) { Simple_dead_lock sdl = new Simple_dead_lock(); sdl.f(); } }
volatile
keywordRule(s)
- The
volatile
keyword in Java is a subtle notion in the sense that few people really understand (and thus master) this notion. A complete analysis ofvolatile
may be found here… It in particular results from the deep meaning ofvolatile
thatvolatile
andfinal
are incompatible (compilation error). Remind thatfinal
keyword just prevents multiple assignment (this is different fromconst
variables in C++ whose state cannot change at all)Example Multithreading.Java.zip
public class Illustration_of_volatile { private int _bads = 0; private int _goods = 0; volatile private boolean _stop = false; // Important issue: '_stop' is used in read mode only, which is a smart use of 'volatile'! /** * The game is avoiding the following inconsistent compositions: * "FrançoisSarkozy" and "NicolasHollande" */ // 1. Executing the program "as is" leads to some bad compositions... // 2. Uncomment 'volatile' for '_name', does it work? Unfortunately no, some bad compositions persist... // 3. Use 'java.util.concurrent.atomic.AtomicReference' instead... Does it work? Fortunately yes! /*volatile*/ private String _name = "?"; // java.util.concurrent.atomic.AtomicReference<String> _name = new java.util.concurrent.atomic.AtomicReference("?"); private Thread _t1, _t2, _t3; private void set(String given_name, String surname) { /** * Executions 1 & 2 */ _name = given_name; try { Thread.sleep(1); // Ficticious delay so that 'set' lasts and then may be "actually" interrupted... } catch (InterruptedException ie) { ie.printStackTrace(); } _name += surname; /** * Execution 3 */ // _name.set(given_name + surname); // To be used with 'java.util.concurrent.atomic.AtomicReference<String> _name = new java.util.concurrent.atomic.AtomicReference("?");' } /** * If not 'synchronized' then 'check' can be interrupted by '_t1' and '_t2', * but, anyway, bad compositions may occur... */ /*synchronized*/ private void check() { if (_name.equals("Fran" + (char) 0x00E7 + "ois" + "Sarkozy") || _name.equals("Nicolas" + "Hollande")) { _bads++; } else { _goods++; } } public Illustration_of_volatile() { _t1 = new Thread() { public void run() { while (!_stop) { set("Fran" + (char) 0x00E7 + "ois", "Hollande"); } } }; _t2 = new Thread() { public void run() { while (!_stop) { set("Nicolas", "Sarkozy"); } } }; _t3 = new Thread() { public void run() { while (!_stop) { check(); } } }; } public void start() { _t1.start(); _t2.start(); _t3.start(); try { Thread.sleep(2000); } catch (InterruptedException ie) { ie.printStackTrace(); } _stop = true; // Let think about this: while all of the three threads are stopped at the same time? _t1 = null; _t2 = null; _t3 = null; } public static void main(String[] args) { Illustration_of_volatile iov = new Illustration_of_volatile(); iov.start(); System.out.println("Bad compositions: " + iov._bads); System.out.println("Good compositions: " + iov._goods); } }
java.util.concurrent
packageFrom Java 5, the
java.util.concurrent
package revisits the way “parallel programming” may be performed in JavaRule(s)
- The key issue is the fact that native Java support may benefit from being substituted for an enhanced support in
java.util.concurrent
(further detail here…).Example Multithreading.Java.zip
public class Illustration_of_Lock_and_Condition { private final java.util.LinkedList<String> _data = new java.util.LinkedList(); private java.nio.file.Path _file; // https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/locks/Lock.html private final java.util.concurrent.locks.Lock _lock = new java.util.concurrent.locks.ReentrantLock(/* Exercise 1: */ /* true */); // https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/locks/Condition.html private final java.util.concurrent.locks.Condition _condition = _lock.newCondition(); private boolean _stop = false; /* Exercise 1: */ /* private int _count = 0; */ public Illustration_of_Lock_and_Condition(String data) { assert (data != null && data.length() > 0); try { _file = new java.io.File("my_file").toPath(); java.nio.file.Files.deleteIfExists(_file); java.nio.file.Files.write(_file, data.getBytes()); } catch (java.io.IOException ioe) { System.err.println(this.getClass().getSimpleName() + ": " + ioe.getMessage()); Runtime.getRuntime().exit(-1); // For simplicity only, i.e., didactical program } } private void _loading() { try { java.util.List<String> lines = java.nio.file.Files.readAllLines(_file, java.nio.charset.Charset.defaultCharset()); for (String line : lines) { _data.add(line); try { Thread.sleep(1); // Fictitious delay so that 'for' loop lasts and then may be artificially paused... } catch (InterruptedException ie) { System.err.println(this.getClass().getSimpleName() + ": " + ie.getMessage()); Runtime.getRuntime().exit(-2); // For simplicity only, i.e., didactical program } // System.out.println(line + " loaded..."); } } catch (java.io.IOException ioe) { System.err.println(this.getClass().getSimpleName() + ": " + ioe.getMessage()); Runtime.getRuntime().exit(-2); // For simplicity only, i.e., didactical program } finally { _stop = true; } // System.out.println("'_loading' is ending..."); } private void _processing() { while (!_stop || !_data.isEmpty()) { _lock.lock(); // Thread becomes dormant while awaiting the lock's aquisition... try { _data.addFirst(_data.removeFirst().toUpperCase()); _condition.signal(); /* Exercise 1: */ /* System.out.println(_data.getFirst() + " processed..." + _count++); */ try { Thread.sleep(1); // Ficticious delay so that processing lasts and then may be artificially paused... } catch (InterruptedException ie) { System.err.println(this.getClass().getSimpleName() + ": " + ie.getMessage()); Runtime.getRuntime().exit(-2); // For simplicity only, i.e., didactical program } } catch (java.util.NoSuchElementException nsee) { // 'removeFirst' may occur while the list is empty... It doesn't matter for a didactical program, but improvements are required for a professional program! } finally { _lock.unlock(); } } // System.out.println("'_processing' is ending..."); } private void _unloading() { while (!_stop || !_data.isEmpty()) { _lock.lock(); // Thread becomes dormant while awaiting the lock's aquisition... try { // "Spurious" wakeup unmanaged: _condition.await(); // Lock is atomically released... System.out.println(_data.removeFirst() + " unloaded..."); } catch (InterruptedException ie) { System.err.println(this.getClass().getSimpleName() + ": " + ie.getMessage()); Runtime.getRuntime().exit(-2); // For simplicity only, i.e., didactical program } catch (java.util.NoSuchElementException nsee) { // 'removeFirst' may occur while the list is empty... It doesn't matter for a didactical program, but improvements are required for a professional program! } finally { _lock.unlock(); // If 'await' fails then the lock is released... } } // System.out.println("'_unloading' is ending..."); } public static void main(String[] args) { final Illustration_of_Lock_and_Condition iolac = new Illustration_of_Lock_and_Condition("f\nr\na\nn\nc\nk\n \nb\na\nr\nb\ni\ne\nr"); new Thread("unloading") { @Override public void run() { // 'this' is the executing thread! iolac._unloading(); } }.start(); new Thread("processing") { @Override public void run() { // 'this' is the executing thread! iolac._processing(); } }.start(); new Thread("loading") { @Override public void run() { // 'this' is the executing thread! iolac._loading(); } }.start(); } }
Exercise 1
The program looks pretty slow. This is confirmed in uncommenting
/* Exercise 1: */ /* private int _count = 0; */
and/* Exercise 1: */ /* System.out.println(_data.getFirst() + " processed..." + _count++); */
.In fact, the
processing
thread performs many useless changes (≈_count
) to the first element of_data
. This occurs because theunloading
thread is the most waiting thread: data (i.e., simple strings) can be unloaded (i.e., removed the_data
list) only when processed (i.e., changed to upper case). Thejava.util.concurrent.locks.ReentrantLock
class then comes with a “fairness” parameter in the constructor:true
means that the most waiting threads have priority. By uncommenting/* Exercise 1: */ /* true */
, the program goes faster!
java.util.concurrent.ExecutorService
From Java 5, the
java.util.concurrent.ExecutorService
interface aims at making more intuitive the use of threads. From Java 7,java.util.concurrent.ForkJoinPool
☛ as subtype ofjava.util.concurrent.ExecutorService
enhances synchronization capabilities: difference is in particular discussed ☛.Example (Carrefour_Items.Java.zip )
// File system "Observer" pattern (from Java 7): java.nio.file.WatchService ws = java.nio.file.FileSystems.getDefault().newWatchService(); /** 'java.nio.file.Path' interface extends 'java.nio.file.Watchable' interface */ // 'database' directory is observed by a daemon for any file creation: java.nio.file.FileSystems.getDefault().getPath("database").register(ws, java.nio.file.StandardWatchEventKinds.ENTRY_CREATE); … // Watch service waits for just created file in 'database': final java.util.concurrent.Callable<String> c = () -> { // Override 'call' method... // 'take' is a blocking statement... java.nio.file.WatchEvent<?> we = ws.take().pollEvents().get(0); return ((java.nio.file.WatchEvent<java.nio.file.Path>) we).context().toString(); // Return file name of just created file... }; // Task 'c' is executed as a thread to prevent blocking: java.util.concurrent.Future<String> f = java.util.concurrent.Executors.newSingleThreadExecutor().submit(c); try { // Simply get result: String file_name_of_just_created_file = f.get(); } catch (java.util.concurrent.ExecutionException | InterruptedException eeie) { … }
Temporal issues
java.util.concurrent.ScheduledExecutorService
derives fromjava.util.concurrent.ExecutorService
. It is an utility to schedule and run (multiple) jobs periodically or at specific moments. It mainly aims at avoiding the use of the “legacy”java.util.Timer
andjava.util.TimerTask
classes. The pool size establishes the number of jobs at creation time.Example
// Configure data emission to a server: private final java.util.concurrent.ScheduledExecutorService _to_send_service = // Only one job: java.util.concurrent.Executors.newScheduledThreadPool(1); private java.util.concurrent.ScheduledFuture<?> _to_send = null; … _to_send = _to_send_service.scheduleAtFixedRate(() -> { this._send(); }, _animation_frequency, _animation_frequency, java.util.concurrent.TimeUnit.MILLISECONDS); … private void _reschedule() { // value of '_animation_frequency' has changed... _to_send.cancel(false); // 'false' means that any in-progress task is not interrupted... // assert (_to_send.isDone()); _to_send = _to_send_service.scheduleAtFixedRate(() -> { this._send(); }, _animation_frequency, _animation_frequency, java.util.concurrent.TimeUnit.MILLISECONDS); } … _to_send_service.shutdown(); // Soft stop (in-progress tasks complete) compared to 'shutdownNow'
WebSockets is a full-duplex technology in Web programming. One may write a Java WebSockets server from scratch ☛ or reuse libraries like Java-Websocket, Tyrus (Java reference implementation), or Vert.x.
Scenario(s)
A minimalist JavaScript program connects to a WebSockets server. This lightweight server is written in Java and obeys the WebSockets protocol and API (here…).
Example (access to the WebSockets server)
// Tested with Tyrus 1.17 WebSockets Java library const service = new WebSocket("ws://localhost:1963/FranckBarbier/WebSockets_illustration"); service.onmessage = (event: MessageEvent) => { console.log("Message from Java: " + event.data); }; service.onopen = (event: Event) => { console.log("service.onopen..."); const response = window.confirm(service.url + " just opened... Say 'Hi!'?"); if (response) service.send(JSON.stringify({Response: "Hi!"})); }; service.onclose = (event: CloseEvent) => { console.log("service.onclose... " + event.code); window.alert("Bye! See you later..."); // '1011': the server is terminating the connection because it encountered an unexpected condition that prevented it from fulfilling the request. }; service.onerror = (event: Event) => { window.alert("service.onerror..."); };
Example WebSockets_Tyrus_1_17.Java.zip (WebSockets server creation and start plus simple client)
public class WebSockets_illustration { // Danger: 'My_ServerEndpoint' constructor must be accessed by the WebSockets server. Don't forget 'static'! @javax.websocket.server.ServerEndpoint(value = "/WebSockets_illustration") public static class My_ServerEndpoint { @javax.websocket.OnClose public void onClose(javax.websocket.Session session, javax.websocket.CloseReason close_reason) { System.out.println("onClose: " + close_reason.getReasonPhrase()); } @javax.websocket.OnError public void onError(javax.websocket.Session session, Throwable throwable) { System.out.println("onError: " + throwable.getMessage()); } @javax.websocket.OnMessage public void onMessage(javax.websocket.Session session, String message) { System.out.println("Message from JavaScript: " + message); } @javax.websocket.OnOpen public void onOpen(javax.websocket.Session session, javax.websocket.EndpointConfig ec) throws java.io.IOException { System.out.println("OnOpen... " + ec.getUserProperties().get("Author")); session.getBasicRemote().sendText("{Handshaking: \"Yes\"}"); } } public static void main(String[] args) { java.util.Map<String, Object> user_properties = new java.util.HashMap(); user_properties.put("Author", "FranckBarbier"); org.glassfish.tyrus.server.Server server = new org.glassfish.tyrus.server.Server("localhost", 1963, "/FranckBarbier", user_properties /* or 'null' */, My_ServerEndpoint.class); try { server.start(); System.out.println("\n*** Please press any key to stop the server... ***\n"); // The Web page (JavaScript client) is launched from Java: java.awt.Desktop.getDesktop().browse(java.nio.file.FileSystems.getDefault().getPath("web" + java.io.File.separatorChar + "index.html").toUri()); // The Java 11 client is launched as well: Java_11_client client = new Java_11_client(java.util.Optional.of(Java_11_client.class.getSimpleName())); java.io.BufferedReader reader = new java.io.BufferedReader(new java.io.InputStreamReader(System.in)); reader.readLine(); System.out.println("\n@@@\n" + client.get_log() + "@@@\n"); } catch (Exception e) { e.printStackTrace(); } finally { server.stop(); } } }
Rule(s)
- The use of the
wss
protocol imposes the use of certificates. For instance, on macOS:Utilities > Keychain Access > Certificate Assistant > Create a Certificate
. KeyTool that comes with the Java Runtime Environment (JRE) provides an alternative to Operating System -OS- support: here…- The revised
HttpClient
API in Java 11 also supports the manipulation of asynchronous (non-blocking) WebSockets.Example WebSockets_Tyrus_1_17.Java.Maven.zip
public class Java_11_client implements java.net.http.WebSocket.Listener { private String _log = ""; public String get_log() { return _log; } @Override public void onOpen(java.net.http.WebSocket ws) { _log += Java_11_client.class.getSimpleName() + " CONNECTED..." + '\n'; java.net.http.WebSocket.Listener.super.onOpen(ws); } @Override public void onError(java.net.http.WebSocket ws, Throwable error) { _log += "\tonError: " + error.getMessage() + '\n'; java.net.http.WebSocket.Listener.super.onError(ws, error); } // WebSockets server (non-blocking) callback: @Override public java.util.concurrent.CompletionStage> onText(java.net.http.WebSocket ws, CharSequence data, boolean last) { _log += "\tonText: " + data + '\n'; return java.net.http.WebSocket.Listener.super.onText(ws, data, last); } // WebSockets server (non-blocking) callback: @Override public java.util.concurrent.CompletionStage> onPong(java.net.http.WebSocket ws, java.nio.ByteBuffer message) { _log += "\tonPong: " + new String(message.array()) + "Yes, it does..." + '\n'; return java.net.http.WebSocket.Listener.super.onPong(ws, message); } public Java_11_client(java.util.Optional<String> o) { java.net.http.HttpClient hc = java.net.http.HttpClient.newBuilder().build(); java.net.http.WebSocket.Builder b = hc.newWebSocketBuilder(); java.net.http.WebSocket ws = b.buildAsync(java.net.URI.create("ws://localhost:1963/FranckBarbier/WebSockets_illustration"), this).join(); // Test it: ws.sendPing(java.nio.ByteBuffer.wrap("Does 'ping' succeed? ".getBytes())); // Streaming: ws.sendText("Java 11 ", false); // 'false' means that the text is not yet finished... ws.sendText("client", true); try { Thread.sleep(5000); // Wait a bit before closing... } catch (InterruptedException ie) { ie.printStackTrace(); } ws.sendClose(java.net.http.WebSocket.NORMAL_CLOSURE, Java_11_client.class.getSimpleName() + " DISCONNECTED..." + '\n').thenRun(() -> _log += Java_11_client.class.getSimpleName() + " DISCONNECTED..." + '\n'); } }
Web services consumption has been revisited in Java 11 based on an asynchronous mode ☛ Rule(s)
- Requests responses are instances of the
java.util.concurrent.CompletableFuture<java.net.http.HttpResponse<X>>
class;X
has to be substituted by the (desired) effective type of the returned data for processing (e.g.,java.io.InputStream
in the following example). Note thatjava.util.concurrent.CompletableFuture<T>
is similar toPromise<T>
in TypeScript.Example
private void _get_items__(java.time.Duration time_out) { // https://dzone.com/articles/java-11-standardized-http-client-api assert (!time_out.isNegative() && !time_out.isZero()); if (_json_query != null && _json_result == null) { _get_items_elapsed_time = System.currentTimeMillis(); var client = java.net.http.HttpClient.newBuilder().build(); var request = java.net.http.HttpRequest.newBuilder(java.net.URI.create(_url)) .POST(java.net.http.HttpRequest.BodyPublishers.ofString(_json_query)) .headers("accept", "application/json", "content-type", "application/json; charset=UTF-8", "x-ibm-client-id", _x_ibm_client_id, "x-ibm-client-secret", _x_ibm_client_secret) .timeout(time_out) .build(); java.util.concurrent.CompletableFuture<java.net.http.HttpResponse<java.io.InputStream>> response = client.sendAsync(request, java.net.http.HttpResponse.BodyHandlers.ofInputStream()); try { // 'get' waits for result: var is = response.get().body(); _result_as_pretty_JSON(is); // Versus '_result_as_raw_JSON(is);' } catch (java.util.concurrent.ExecutionException | InterruptedException ee_ie) { _json_result = javax.json.Json.createObjectBuilder().add(_ERROR, ee_ie.getMessage()).build().toString(); System.err.println(ee_ie.getClass().getSimpleName() + ": " + ee_ie.getMessage()); } finally { _get_items_elapsed_time = System.currentTimeMillis() - _get_items_elapsed_time; } } }