Working with Streams and Lambda expressions
- Implement functional interfaces using lambda expressions, including interfaces from the `java.util.function` package
- Use Java Streams to filter, transform, and process data
- Perform decomposition and reduction, including grouping and partitioning on sequential and parallel streams
- Functional interfaces and lambda expressions
- Basic principle of streams
- Streams and lambda expressions
- Streams for filtering
- Data processing: map, reduce…
- Technical issues: non-reusability…
- Streams and parallelism
- Streams enhancements from Java 9
Functional interfaces

Rule(s)
- Functional interfaces have only one (abstract) method. In the Java library, functional interfaces are annotated with `@java.lang.FunctionalInterface` and are intimately associated with lambda expressions. `java.lang.Runnable` is a functional interface: it has a single `run` method that requires a (deferred) body through an implementing class. Threads require runnable objects as arguments in order to later execute `run` (via `start`) in split threads of control. To avoid creating such a class (which involves creating an associated `.java` file) only to provide the body of `run`, a lambda expression is useful.

Example Lambda_expression.Java.zip
```java
public class Functional_interface_test {

    public static void main(String[] args) {
        // No reference to the 'run' method is required since 'java.lang.Runnable' only owns this single method:
        Runnable to_do = () -> {
            try {
                Thread.sleep(1000L); // Nothing to do for 1 sec. <=> a kind of delay!
            } catch (InterruptedException ie) {
                java.util.logging.Logger.getLogger("Why it doesn't work?").log(java.util.logging.Level.SEVERE, null, ie);
            }
        };
        new Thread(to_do).start();
        …
```

Expressiveness
Rule(s)
- While the syntax of a lambda expression is compact, it may be decomposed starting from the `@FunctionalInterface` annotation.

Example Lambda_expression.Java.zip
```java
@FunctionalInterface
public interface My_functional_interface {
    void my_print(Object o);
}
…
My_functional_interface mfi = (o) -> System.out.println(o.toString()); // Lambda expression (Java 8), deeply inspired by JavaScript!
mfi.my_print("This is lambda style...");
…
My_functional_interface mfi2 = System.out::println; // This is called "method reference"
mfi2.my_print("But Java 8 goes beyond...");
```

Example Heap.Java.zip
```java
java.util.Comparator<Short> comparator = (Short i, Short j) -> {
    return i - j;
};
```

Example (instead of…)
```java
java.util.Comparator<Short> comparator = new java.util.Comparator<>() {
    @Override
    public int compare(Short i, Short j) {
        if (i < j) {
            return -1;
        }
        if (i.equals(j)) { // 'i == j' would compare 'Short' object references, not values!
            return 0;
        }
        // if (i > j) {
        //     return 1;
        // }
        return 1; // Mandatory without 'if'
    }
};
```

Limitation
Example (more than one method prevents the use of a lambda expression)
```java
// @FunctionalInterface -> compilation error because of *TWO* methods!
public interface A_given_interface {
    void a_given_method();
    void another_given_method();
}
…
/*
A_given_interface get_one() {
    return new A_given_interface() {
        @Override
        public void a_given_method() {
            throw new UnsupportedOperationException("Don't be lazy! Please define body...");
        }

        @Override
        public void another_given_method() {
            throw new UnsupportedOperationException("Don't be lazy! Please define body...");
        }
    };
}
*/
```

In conjunction with streams…
Rule(s)
- Lambda expressions are natural counterparts of stream programming.
Example
```java
java.util.List<Integer> positives = java.util.Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
java.util.stream.Stream<Integer> stream = positives.stream()
        .filter(p -> p > 1 && java.util.stream.IntStream.rangeClosed(2, (int) Math.sqrt(p)).noneMatch(i -> (p % i == 0)));
java.util.List<Integer> primes = stream.collect(java.util.stream.Collectors.toList());
primes.forEach(p -> System.out.print(" " + p)); // '2 3 5 7 11' is displayed...
```
Rule(s)
- Java streams were introduced in Java 8, with amendments in later releases.
- Streams support transparent data processing, especially of “big” data sets. Streams are logically related data that, in contrast to collections, a priori cannot stay in computer memory as a whole (i.e., “streams are lazily constructed”). Differences between collections and streams are discussed ☛.
- Streams entail the idea of parallel processing without the explicit (complicated) use of threads. Transparency then comes from facilities within the Java stream API that hide multithreading.
- Typically, primitive array concatenation ☛ may be advantageously replaced by stream usage.
Example (use of the method reference `::` mechanism) Primitive_array_concatenation.Java.zip
```java
public class Primitive_array_concatenation {

    private final static String[] _Author = {"Franck", "Barbier"};
    // Word delimiter: [^\w] <=> [^a-zA-Z_0-9]
    private final static java.util.regex.Pattern _Word = java.util.regex.Pattern.compile("[^\\w]");

    public static void main(String[] args) {
        String string = "Primitive array concatenation from";
        String[] words = _Word.split(string);
        String[] concatenation = java.util.stream.Stream.of(words, _Author)
                .flatMap(java.util.stream.Stream::of)
                .toArray(String[]::new);
        // 'of': stream of two arrays
        // 'flatMap': stream of the two arrays' elements
    }
}
```

Rule(s)
- The move from collections to streams is straightforward.
```java
java.util.stream.Stream<String> s = java.util.Arrays.asList("Franck", "Sophie", "Oscar", "Léna", "Joseph").stream();
```
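Going back from a stream to a collection is just as direct, via `collect`. Here is a minimal, runnable sketch; the `Stream_to_collection` class name and the length-based filter are ours, for illustration only:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class Stream_to_collection {

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Franck", "Sophie", "Oscar", "Léna", "Joseph");
        // 'collect' plus a 'Collectors' factory moves back from the stream to a collection:
        List<String> short_names = names.stream()
                .filter(name -> name.length() <= 5)
                .collect(Collectors.toList());
        System.out.println(short_names); // '[Oscar, Léna]' is displayed...
    }
}
```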
Streams work together with lambda expressions for code leanness

Rule(s)
- `java.util.function.Function<T, R>` (a Java functional interface) is the type of the unique parameter of the `map` method for streams. The `map` method returns a stream of `R` elements from `T` elements. The two examples below, first without and then with a lambda expression, show two equivalent forms of instantiating `java.util.function.Function<T, R>`.

Example (without lambda expression)
```java
final javax.json.stream.JsonParser.Event event = parser.next();
assert (event == javax.json.stream.JsonParser.Event.START_ARRAY); // The JSON parser points to a JSON array
// For test only:
// parser.getArrayStream().forEach(System.out::println); // Stream is no longer usable afterwards!
java.util.stream.Stream<javax.json.JsonObject> stream = parser.getArrayStream()
        .map(new java.util.function.Function<javax.json.JsonValue, javax.json.JsonObject>() {
            @Override
            public javax.json.JsonObject apply(javax.json.JsonValue json_value) {
                // '(javax.json.JsonObject) json_value' <=> 'json_value.asJsonObject()'
                return (javax.json.JsonObject) json_value; // Be sure that array elements are actually JSON objects!
            }
        });
```

Example (with lambda expression) Carrefour_Items.Java.zip
```java
java.util.stream.Stream<javax.json.JsonObject> stream = parser.getArrayStream()
        // '(javax.json.JsonObject) json_value' <=> 'json_value.asJsonObject()'
        .map(json_value -> (javax.json.JsonObject) json_value); // Be sure that array elements are actually JSON objects!
```

Rule(s)
- Streams offer high-end interoperability with collections (and vice versa). Specifically, `java.util.Collections` (on the “collections” side) and `java.util.stream.Collectors` (on the “streams” side) are utility classes that foster this interoperability.

Example Horner.Java.zip
```java
public class Polynomial {

    private java.util.Map<Integer, Double> _polynomial = new java.util.HashMap<>();

    Polynomial() {
        _polynomial.put(1, -12.D);
        _polynomial.put(39, 8.D);
    }

    public double horner_method(final double x) {
        java.util.List<Integer> x_powers = _polynomial.keySet().stream().collect(java.util.stream.Collectors.toList());
        java.util.Collections.sort(x_powers, java.util.Collections.reverseOrder());
        …
    }
}
```
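The learning objectives above also mention grouping and partitioning; `java.util.stream.Collectors` covers both. The following self-contained sketch is ours (the `Grouping_partitioning` class name and the parity/remainder classifiers are illustrative assumptions, not from an original archive); it hints at `groupingBy` and `partitioningBy`, which work identically on sequential and parallel streams:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class Grouping_partitioning {

    public static void main(String[] args) {
        List<Integer> positives = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
        // 'partitioningBy' always yields a two-entry map keyed by 'false' and 'true':
        Map<Boolean, List<Integer>> by_parity = positives.stream()
                .collect(Collectors.partitioningBy(p -> p % 2 == 0));
        System.out.println(by_parity.get(true)); // '[0, 2, 4, 6, 8, 10, 12]' is displayed...
        // 'groupingBy' yields one entry per distinct classifier value:
        Map<Integer, List<Integer>> by_remainder = positives.stream()
                .collect(Collectors.groupingBy(p -> p % 3));
        System.out.println(by_remainder.get(0)); // '[0, 3, 6, 9, 12]' is displayed...
        // The very same collectors apply to parallel streams, with identical results:
        assert by_parity.equals(positives.parallelStream().collect(Collectors.partitioningBy(p -> p % 2 == 0)));
    }
}
```

Note that `partitioningBy` is a special, faster case of `groupingBy` for a Boolean classifier.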
Rule(s)
- Web programming (HTTP, HTTPS, WebSockets…) is in essence the processing of incoming data, for instance incoming-data filtering. The following example shows how a message receiver identifies the message sender's session among multiple WebSocket sessions, i.e., multiple connected participants.
Example (`filter`, `findAny`, and `findFirst`)

```java
@javax.websocket.OnMessage
public void onMessage(javax.websocket.Session sender, String message) throws java.io.IOException {
    // Get the sender session among the opened sessions:
    java.util.Optional<javax.websocket.Session> sender_session = sender.getOpenSessions().stream()
            .filter(session -> sender.getId().equals(session.getId()) // 'equals', not '==': session ids are strings!
                    && !session.getPathParameters().values().isEmpty())
            .findAny(); // A little bit stupid, but for illustration only...
    /* Test */ assert (sender_session.isPresent()); /* End of test */
    // Not empty by construction, see filter above:
    String sender_name = sender_session.get().getPathParameters().values().iterator().next();
    // Get peers and select the (only) one (Java <-> JavaScript) that has to receive this in-progress message:
    for (java.util.Iterator<javax.websocket.Session> i = sender.getOpenSessions().iterator(); i.hasNext();) {
        javax.websocket.Session session = i.next();
        if (!sender.getId().equals(session.getId())) { // This prevents some echo...
            /* Test */ assert (!session.getPathParameters().values().isEmpty()); /* End of test */
            java.util.Optional<String> receiver_name = session.getPathParameters().values().stream().findFirst();
            /* Test */ assert (receiver_name.isPresent()); /* End of test */
            if (sender_name.equals(receiver_name.get())) {
                session.getBasicRemote().sendText(message);
                break;
            }
        }
    }
}
```
Scenario(s)
Carrefour.io (unfortunately, the site is no longer operational) was the digital vision of the French retailer Carrefour. Carrefour.io offered the Items and the Stores APIs. Queries about items (i.e., common consumer products) return raw JSON data that may require post-processing, as in the following example:
```java
private final static String _POULAIN = "{\"size\": 200,\"queries\": [{\"query\": \"poulain\",\"field\": \"barcodeDescription\"}]}";
```

This JSON query expects at most 200 items whose ‘brand’ or ‘description’ value field (according to the API documentation, ‘barcodeDescription’ implies searching in both ‘brand’ and ‘description’) contains the "poulain" literal (the search is case-insensitive), which, for most people in France, matches a chocolate brand with derivatives such as cocoa powder, etc. Around 100 items are returned.
```java
private final static String _POULAIN_NOISETTE = "{\"queries\": [{\"query\": \"poula\",\"field\": \"barcodeDescription\"},{\"query\": \"noiset\",\"field\": \"barcodeDescription\"}]}";
```

This JSON query returns fewer than 10 items because the "noiset" literal (i.e., looking for chocolate with nuts) must also be present in the ‘brand’ or ‘description’ value field ("oiset" instead of "noiset" retrieves no item). More generally, the Carrefour.io Items API only allows (cursory) filters that imply further filtering locally (i.e., in the client program).

Rule(s)
- Query post-processing is the possibility of dealing with returned data for further filtering. For queries retrieving big data sets, the use of Java streams aims at accelerating filtering, through parallel processing in particular.
- As an illustration, the following example simply gets a stream from raw JSON data (returned items from Carrefour.io Items API).
Example Carrefour_Items.Java.zip
```java
try (javax.json.stream.JsonParser parser = javax.json.Json.createParser(new java.io.StringReader(_json_result))) {
    while (parser.hasNext()) {
        if (parser.next() == javax.json.stream.JsonParser.Event.START_ARRAY) { // The JSON parser points to a JSON array
            java.util.stream.Stream<javax.json.JsonValue> stream = parser.getArrayStream();
            …
```
Rule(s)
- A stream cannot be reused; if it is, an `IllegalStateException` is raised. The following example fails at the `map`-`reduce` check because `allMatch` (as a terminal operation) makes the `stream` object non-reusable.

Example Carrefour_Items.Java.zip
```java
public void check_format() {
    try (javax.json.stream.JsonParser parser = javax.json.Json.createParser(new java.io.StringReader(_json_result))) {
        while (parser.hasNext()) {
            // '_LIST' key is associated with a value being an array whose elements are the returned items (by default, only 20 are returned):
            if (parser.next() == javax.json.stream.JsonParser.Event.KEY_NAME && parser.getString().equals(_LIST)) {
                final javax.json.stream.JsonParser.Event event = parser.next();
                assert (event == javax.json.stream.JsonParser.Event.START_ARRAY);
                java.util.stream.Stream<javax.json.JsonValue> stream = parser.getArrayStream();
                // Check that all members are 'javax.json.JsonValue.ValueType.OBJECT':
                assert (stream
                        .allMatch(json_value -> json_value.getValueType() == javax.json.JsonValue.ValueType.OBJECT));
                // Check (based on the map-reduce principle) that all members are 'javax.json.JsonValue.ValueType.OBJECT'.
                // This fails: 'stream' has already been consumed by 'allMatch' -> 'IllegalStateException'!
                java.util.Optional<Boolean> optional = stream
                        .map(json_value -> json_value.getValueType() == javax.json.JsonValue.ValueType.OBJECT)
                        .reduce((Boolean a, Boolean b) -> a & b);
                assert (optional.isPresent() && optional.get());
            }
        }
    }
}
```

Rule(s)
- Stream lazy evaluation (principle ☛) entails that intermediate operations (e.g., `map`) on streams are effectively executed at the time terminal operations (e.g., `reduce`) occur. The distinction between intermediate and terminal operations is described ☛. The benefit of lazy evaluation only materializes with big, possibly infinite, data streams. Issues about the way lazy evaluation may improve performance are addressed ☛.
- The following example applies the `filter` intermediate operation at the time the `forEach` terminal operation is called. The `peek` intermediate operation is only devoted to debugging (explanation ☛). So, given `json_value1`, `json_value2`… `json_valueN`, data is processed as follows: `forEach(peek(filter(json_value1)))`, `forEach(peek(filter(json_value2)))`… `forEach(peek(filter(json_valueN)))`. Parallel streams are then the ability of possibly assigning these `N` (or fewer) pieces of work to threads.
- Note that filtering is a key facility of streams. The following example extracts members that obey a predicate before searching “words” from each member's content. While `filter` and `peek` are based on a lambda expression, `forEach` is not (i.e., the `java.util.function.Consumer<T>` functional interface is instantiated).

Example Carrefour_Items.Java.zip
```java
stream
        .filter(json_value -> json_value.getValueType() == javax.json.JsonValue.ValueType.OBJECT)
        .peek(json_value -> System.out.println("Filtered value: " + json_value.asJsonObject().toString()))
        .forEach(new java.util.function.Consumer<javax.json.JsonValue>() {
            @Override
            public void accept(javax.json.JsonValue json_value) {
                _search_items_(json_value.asJsonObject(), words);
            }
        });
```
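The deferred, element-at-a-time processing described above can be observed directly with a minimal, self-contained sketch (the `Lazy_evaluation_demo` class name and the toy data are ours, for illustration only):

```java
import java.util.stream.Stream;

public class Lazy_evaluation_demo {

    public static void main(String[] args) {
        Stream<String> stream = Stream.of("a", "b")
                .peek(s -> System.out.println("intermediate: " + s)); // Not executed yet!
        System.out.println("No element processed so far...");
        // At terminal-operation time, each element traverses the *whole* pipeline in turn:
        stream.forEach(s -> System.out.println("terminal: " + s));
        // 'intermediate: a', 'terminal: a', 'intermediate: b', 'terminal: b' is displayed...
    }
}
```

The interleaved output confirms that intermediate operations are not run once over the whole data set, but per element, on demand.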
Parallel computing and streams

Rule(s)
- A stream may be made explicitly parallel, but the use of the `java.util.Spliterator` interface is another way of dealing with parallelism in a more implicit way.

Example (the `stream` object is made parallel based on the `parallel` Boolean variable)

```java
java.util.stream.Stream<javax.json.JsonValue> stream = parallel ? parser.getArrayStream().parallel() : parser.getArrayStream();
assert (stream.isParallel() || !parallel);
stream
        // '(javax.json.JsonObject) json_value' <=> 'json_value.asJsonObject()':
        .map(json_value -> (javax.json.JsonObject) json_value) // Be sure that array elements are actually JSON objects!
        .forEach(new java.util.function.Consumer<javax.json.JsonObject>() {
            @Override
            public void accept(javax.json.JsonObject json_object) {
                _search_items_(json_object, words);
            }
        });
```

Rule(s)
- The `java.util.Spliterator` interface offers the `trySplit` method in order to divide data processing into pieces of work that are (virtually) assignable to threads ⇒ parallelism. The following example recursively divides a `java.util.Spliterator` object as much as possible.

Example (`trySplit`)

```java
private void _max_splitting(java.util.Set<java.util.Spliterator<javax.json.JsonObject>> split_iterators, java.util.Spliterator<javax.json.JsonObject> split_iterator) {
    // Line for test only:
    long split_iterator_estimateSize = split_iterator.estimateSize();
    java.util.Spliterator<javax.json.JsonObject> splitting = split_iterator.trySplit();
    if (splitting == null) {
        split_iterators.add(split_iterator);
    } else {
        // Line for test only:
        assert (split_iterator_estimateSize >= split_iterator.estimateSize() + splitting.estimateSize());
        _max_splitting(split_iterators, split_iterator);
        _max_splitting(split_iterators, splitting);
    }
}
```

Rule(s)
- Once split, parallel data processing occurs through the `tryAdvance` or `forEachRemaining` methods.

Example
```java
java.util.List<javax.json.JsonObject> list = parser.getArrayStream()
        .map(json_value -> json_value.asJsonObject())
        .collect(java.util.stream.Collectors.toList()); // Reduction principle
java.util.Spliterator<javax.json.JsonObject> root_split_iterator = list.spliterator();
assert (root_split_iterator.characteristics() == (java.util.Spliterator.ORDERED | java.util.Spliterator.SIZED | java.util.Spliterator.SUBSIZED));
// Alternative:
// java.util.Spliterator<javax.json.JsonObject> root_split_iterator = java.util.Spliterators.spliterator(list, (java.util.Spliterator.IMMUTABLE | java.util.Spliterator.NONNULL | java.util.Spliterator.ORDERED | java.util.Spliterator.SIZED | java.util.Spliterator.SUBSIZED));
java.util.Set<java.util.Spliterator<javax.json.JsonObject>> split_iterators = new java.util.HashSet<>();
_max_splitting(split_iterators, root_split_iterator); // Versus '_min_splitting(split_iterators, root_split_iterator);'
split_iterators.forEach((split_iterator) -> {
    split_iterator.forEachRemaining(json_object -> _search_items_(json_object, words));
    // Alternative:
    // boolean result;
    // do {
    //     result = split_iterator.tryAdvance(json_object -> _search_items_(json_object, words));
    // } while (result);
});
```
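The same splitting mechanics can be observed outside the Carrefour.io context, on any list. A minimal, runnable sketch (the `Spliterator_demo` class name and the integer data are ours, for illustration only):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

public class Spliterator_demo {

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        Spliterator<Integer> second_part = list.spliterator();
        // 'trySplit' hands over a chunk (for an array-backed list, the first half) to a new spliterator:
        Spliterator<Integer> first_part = second_part.trySplit();
        assert (first_part != null);
        // The two parts could now be assigned to distinct threads; processed sequentially here:
        System.out.print("first part:");
        first_part.forEachRemaining(i -> System.out.print(" " + i));
        System.out.print(" / second part:");
        second_part.forEachRemaining(i -> System.out.print(" " + i));
    }
}
```

With the `Arrays.asList` spliterator, this displays 'first part: 1 2 3 4 / second part: 5 6 7 8'; together, the two parts always cover all elements exactly once.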
Scenario(s)
The retrieval of 9,500 (requested size) items among around 20,000 items matching the query is followed by the application of the same (local) filter. This filtering is run 500 times with each method in order to determine an average elapsed time. The “no stream” method is realized with the help of `filter_items`, while `filter_items_` supports both the sequential-stream and parallel-stream methods (its `parallel` argument equals `false` or `true`). The `java.util.Spliterator` method is supported by `filter_items__`.
Measures

| Method ⤳ | No stream | Sequential stream | Parallel stream | `java.util.Spliterator` |
| --- | --- | --- | --- | --- |
| Elapsed time | ≈ 250 ms | ≈ 250 ms | ≈ 250 ms | ≈ 275 ms |
| Comments | Common (end-to-end) JSON parsing | Stream of JSON values (i.e., `javax.json.JsonValue`) from the JSON parser: `parser.getArrayStream()` | The stream of JSON values is switched to “parallel”: `parser.getArrayStream().parallel()` | Variations on `trySplit` (minimum versus maximum splitting) and on `characteristics` provoke no effect on performance! |

Surprisingly, using `java.util.Spliterator` leads to a slight overhead while the other methods lead to the same performance! However, Java performance benchmarking is preferably carried out with the tool described ☛.
Rule(s)
- Streams benefit from slight API enhancements in Java 9 ☛.
Example From_Java_9.Java.zip
```java
java.util.List<Integer> positives = java.util.Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
java.util.stream.Stream<Integer> stream = positives.stream()
        .filter(p -> p > 1 && java.util.stream.IntStream.rangeClosed(2, (int) Math.sqrt(p)).noneMatch(i -> (p % i == 0)));
System.out.println(stream.takeWhile(p -> !p.equals(7)).collect(java.util.stream.Collectors.toList())); // '[2, 3, 5]' is displayed...
java.util.stream.Stream<Integer> stream_ = positives.stream()
        .filter(p -> p > 1 && java.util.stream.IntStream.rangeClosed(2, (int) Math.sqrt(p)).noneMatch(i -> (p % i == 0)));
System.out.println(stream_.dropWhile(p -> !p.equals(7)).collect(java.util.stream.Collectors.toList())); // '[7, 11]' is displayed...
```

Rule(s)
- Streams in Java 8 cannot have `null` values. The `ofNullable` static method in Java 9 improves the way of dealing with `null`.

Example From_Java_9.Java.zip
```java
java.util.List<Object> objects = java.util.Arrays.asList(new Object(), null, new Object());
java.util.stream.Stream<Object> stream__ = objects.stream().flatMap(o -> java.util.stream.Stream.ofNullable(o));
System.out.println(stream__.count()); // '2' is displayed...
```
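Among the Java 9 stream enhancements, the bounded `iterate` overload is also worth noting. The following sketch is ours (not taken from an original archive) and contrasts it with the Java 8 form:

```java
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Iterate_enhancement {

    public static void main(String[] args) {
        // Java 8: 'iterate' produces an infinite stream, hence the mandatory 'limit':
        System.out.println(Stream.iterate(1, i -> i * 2).limit(5).collect(Collectors.toList())); // '[1, 2, 4, 8, 16]' is displayed...
        // Java 9: a 'hasNext'-like predicate bounds the iteration directly, as in a 'for' loop:
        System.out.println(Stream.iterate(1, i -> i < 20, i -> i * 2).collect(Collectors.toList())); // '[1, 2, 4, 8, 16]' is displayed...
    }
}
```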
See also
The Baeldung Web site (by Eugen Paraschiv) has a rich section on Java streams ☛.