Java Streams and Lambda expressions



Java SE 11 Developer certification (exam number 1Z0-819) here

Working with Streams and Lambda expressions

Headlines
Functional interfaces

Rule(s)

Example Lambda_expression.Java.zip 

public class Functional_interface_test {
    public static void main(String[] args) {
        // No reference to the 'run' method is required since 'java.lang.Runnable' only owns this single method:
        Runnable to_do = () -> { 
            try {
                Thread.sleep(1000L); // Nothing to do for 1 sec. <=> a kind of delay!
            } catch (InterruptedException ie) {
                java.util.logging.Logger.getLogger("Why it doesn't work?").log(java.util.logging.Level.SEVERE, null, ie);
            }
        };
        new Thread(to_do).start();
    …

Expressiveness

Rule(s)

Example Lambda_expression.Java.zip 

@FunctionalInterface
public interface My_functional_interface {
    void my_print(Object o);
}
…
My_functional_interface mfi = (o) -> System.out.println(o.toString()); // Java 7, which is deeply inspired by JavaScript!
mfi.my_print("This is Java 7 style...");
…
My_functional_interface mfi2 = System.out::println; // This is called "method reference"
mfi2.my_print("But Java 8 goes beyond...");

Example Heap.Java.zip 

java.util.Comparator<Short> comparator = (Short i, Short j) -> {
    return i - j;
};

Example (instead of…)

java.util.Comparator<Short> comparator = new java.util.Comparator<>() {
    @Override
    public int compare(Short i, Short j) {
        if (i < j) {
            return -1;
        }
        if (i == j) {
            return 0;
        }
//                if (i > j) {
//                    return 1;
//                }
        return 1; // Mandatory without 'if'
    }
};

Limitation

Example (more than one method prevents the utilization of a lambda expression)

// @FunctionalInterface -> compilation error because of *TWO* methods!
public interface A_given_interface {
    void a_given_method();
    void another_given_method();
}
…
/* A_given_interface get_one() {
    return new A_given_interface() {
        @Override
        public void a_given_method() {
            throw new UnsupportedOperationException("Don't be lazy! Please define body...");
        }
        @Override
        public void another_given_method() {
            throw new UnsupportedOperationException("Don't be lazy! Please define body...");
        }
    };
} */

In conjunction with streams

Rule(s)

Example

java.util.List<Integer> positives = java.util.Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
java.util.stream.Stream<Integer> stream = positives.stream().filter(p -> p > 1 && java.util.stream.IntStream.rangeClosed(2, (int) Math.sqrt(p)).noneMatch(i -> (p % i == 0)));
java.util.List<Integer> primes = stream.collect(java.util.stream.Collectors.toList());
primes.forEach(p -> System.out.print(" " + p)); // '2 3 5 7 11' is displayed...
Basic principle of streams

Rule(s)

Example (use of method reference: :: mechanism) Primitive_array_concatenation.Java.zip 

public class Primitive_array_concatenation {

    private final static String[] _Author = {"Franck", "Barbier"};
    // Delimited word: [^\w] <=> [^a-zA-Z_0-9]
    private final static java.util.regex.Pattern _Word = java.util.regex.Pattern.compile("[^\\w]");

    public static void main(String[] args) {
        String string = "Primitive array concatenation from";

        String[] words = _Word.split(string);

        String[] concatenation = java.util.stream.Stream.of(words, _Author).flatMap(java.util.stream.Stream::of).toArray(String[]::new);
// 'of': stream of two arrays
// 'flatMap': stream of the two arrays' elements
    }
}

Rule(s)

java.util.stream.Stream<String> s = java.util.Arrays.asList("Franck", "Sophie", "Oscar", "Léna", "Joseph").stream();
Streams work together with lambda expressions for code leanness

Rule(s)

Example (without lambda expression)

final javax.json.stream.JsonParser.Event event = parser.next();
assert (event == javax.json.stream.JsonParser.Event.START_ARRAY); // The JSON parser points to a JSON array
// For test only:
// parser.getArrayStream().forEach(System.out::println); // Stream is no longer usable afterwards!
java.util.stream.Stream<javax.json.JsonObject> stream = parser.getArrayStream()
    .map(new java.util.function.Function<javax.json.JsonValue, javax.json.JsonObject>() {
        @Override
        public javax.json.JsonObject apply(javax.json.JsonValue json_value) {
            // '(javax.json.JsonObject) json_value' <=> 'json_value.asJsonObject()'
            return (javax.json.JsonObject) json_value; // Be sure that array elements are actually JSON objects!
        }
    });

Example (with lambda expression) Carrefour_Items.Java.zip 

java.util.stream.Stream<javax.json.JsonObject> stream = parser.getArrayStream()
// '(javax.json.JsonObject) json_value' <=> 'json_value.asJsonObject()'
    .map(json_value -> (javax.json.JsonObject) json_value); // Be sure that array elements are actually JSON objects!

Rule(s)

Example Horner.Java.zip 

public class Polynomial {
    private java.util.Map<Integer, Double> _polynomial = new java.util.HashMap<>();
    Polynomial() {
        _polynomial.put(1, -12.D);
        _polynomial.put(39, 8.D);
    }
    public double horner_method(final double x) {
        java.util.List<Integer> x_powers = _polynomial.keySet().stream().collect(java.util.stream.Collectors.toList());
        java.util.Collections.sort(x_powers, java.util.Collections.reverseOrder());
        …
    }
}
Streams for filtering

Rule(s)

Example (filter, findAny and, findFirst)

@javax.websocket.OnMessage
public void onMessage(javax.websocket.Session sender, String message) throws java.io.IOException {
    // Get the sender session among the opened sessions:
    java.util.Optional<javax.websocket.Session> sender_session = sender.getOpenSessions().stream()
        .filter(session -> sender.getId() == session.getId() && !session.getPathParameters().values().isEmpty())
        .findAny(); // A little bit stupid, but for illustration only...
    /* Test */ assert (sender_session.isPresent()); /* End of test */
    // Not empty by construction, see filter above:
    String sender_name = sender_session.get().getPathParameters().values().iterator().next();
    // Get peers and select the (only) one (Java <-> JavaScript) that has to receive this in-progress message:
    for (java.util.Iterator<javax.websocket.Session> i = sender.getOpenSessions().iterator(); i.hasNext();) {
        javax.websocket.Session session = i.next();
        if (sender.getId() != session.getId()) { // This prevents some echo...
            /* Test */ assert (!session.getPathParameters().values().isEmpty()); /* End of test */
            java.util.Optional<String> receiver_name = session.getPathParameters().values().stream().findFirst();
            /* Test */ assert (receiver_name.isPresent()); /* End of test */
            if (sender_name.equals(receiver_name.get())) {
                session.getBasicRemote().sendText(message);
                break;
            }
        }
    }
}
Streams for big data sets

Scenario(s)

Carrefour.io (unfortunately, site is no longer operational) was the digital vision of the Carrefour French retailer. Carrefour.io offered the Items the Stores APIs. Queries about items (i.e., common consumer products) return JSON raw data that may require post-processing as in the following example:

private final static String _POULAIN = "{\"size\": 200,\"queries\": [{\"query\": \"poulain\",\"field\": \"barcodeDescription\"}]}"; as JSON query expects at most 200 items whose ‘brand’ or ‘description’ (according to the API doc., ‘barcodeDescription’ implies search in both ‘brand’ and ‘description’) value field contains the "poulain" literal (search is not case-sensitive) which, for most people in France, matches to a chocolate brand with derivatives such as cocoa powder, etc. Around 100 items are returned.

private final static String _POULAIN_NOISETTE = "{\"queries\": [{\"query\": \"poula\",\"field\": \"barcodeDescription\"},{\"query\": \"noiset\",\"field\": \"barcodeDescription\"}]}"; as JSON query returns less than 10 items due to the "noiset" literal (i.e., looking for chocolate with nuts) must also be present in the ‘brand’ or ‘description’ value field ("oiset" instead of "noiset" retrieves no item). More generally, the Carrefour.io Items API allows (cursory) filters that imply further filtering at local place (i.e., client program).

Rule(s)

Example Carrefour_Items.Java.zip 

try (javax.json.stream.JsonParser parser = javax.json.Json.createParser(new java.io.StringReader(_json_result))) {
    while (parser.hasNext()) {
        if (parser.next() == javax.json.stream.JsonParser.Event.START_ARRAY) { // The JSON parser points to a JSON array
            java.util.stream.Stream<javax.json.JsonValue> stream = parser.getArrayStream();
…
Streams technical issues

Rule(s)

Example Carrefour_Items.Java.zip 

public void check_format() {
    try (javax.json.stream.JsonParser parser = javax.json.Json.createParser(new java.io.StringReader(_json_result))) {
        while (parser.hasNext()) {
            // '_LIST' key is associated with a value being an array whose elements are the returned items (by default, only 20 are returned):
            if (parser.next() == javax.json.stream.JsonParser.Event.KEY_NAME && parser.getString().equals(_LIST)) {
                final javax.json.stream.JsonParser.Event event = parser.next();
                assert (event == javax.json.stream.JsonParser.Event.START_ARRAY);

                java.util.stream.Stream<javax.json.JsonValue> stream = parser.getArrayStream();
// Check that all members are 'javax.json.JsonValue.ValueType.OBJECT':
                assert (stream
                    .allMatch(json_value -> json_value.getValueType() == javax.json.JsonValue.ValueType.OBJECT));               
// Check (based on map-reduce principle) that all members are 'javax.json.JsonValue.ValueType.OBJECT':
                java.util.Optional<Boolean> optional = stream
                    .map(json_value -> json_value.getValueType() == javax.json.JsonValue.ValueType.OBJECT)
                    .reduce((Boolean a, Boolean b) -> a & b);
                assert (optional.isPresent() && optional.get());
            }
        }
    }
}

Rule(s)

Example Carrefour_Items.Java.zip 

stream
    .filter(json_value -> json_value.getValueType() == javax.json.JsonValue.ValueType.OBJECT)
    .peek(json_value -> System.out.println("Filtered value: " + json_value.asJsonObject().toString()))
    .forEach(new java.util.function.Consumer<javax.json.JsonValue>() {
        @Override
        public void accept(javax.json.JsonValue json_value) {
            _search_items_(json_value.asJsonObject(), words);
        }
    });
Parallel computing and streams

Rule(s)

Example (stream object is made parallel from the parallel Boolean variable at line 1)

java.util.stream.Stream<javax.json.JsonValue> stream = parallel ? parser.getArrayStream().parallel() : parser.getArrayStream();
assert (stream.isParallel() || !parallel);
stream
// '(javax.json.JsonObject) json_value' <=> 'json_value.asJsonObject()':
    .map(json_value -> (javax.json.JsonObject) json_value) // Be sure that array elements are actually JSON objects!
    .forEach(new java.util.function.Consumer<javax.json.JsonObject>() {
        @Override
        public void accept(javax.json.JsonObject json_object) {
            _search_items_(json_object, words);
        }
    });

Rule(s)

Example (trySplit)

private void _max_splitting(java.util.Set<java.util.Spliterator<javax.json.JsonObject>> split_iterators, java.util.Spliterator<javax.json.JsonObject> split_iterator) {
    // Line for test only:
    long split_iterator_estimateSize = split_iterator.estimateSize();
    java.util.Spliterator<javax.json.JsonObject> splitting = split_iterator.trySplit();
    if (splitting == null) {
        split_iterators.add(split_iterator);
    } else {
        // Line for test only:
        assert (split_iterator_estimateSize >= split_iterator.estimateSize() + splitting.estimateSize());
        _max_splitting(split_iterators, split_iterator);
        _max_splitting(split_iterators, splitting);
    }
}

Rule(s)

Example

java.util.List<javax.json.JsonObject> list = parser.getArrayStream()
    .map(json_value -> json_value.asJsonObject())
    .collect(java.util.stream.Collectors.toList()); // Reduction principle
java.util.Spliterator<javax.json.JsonObject> root_split_iterator = list.spliterator();
assert (root_split_iterator.characteristics() == (java.util.Spliterator.ORDERED | java.util.Spliterator.SIZED | java.util.Spliterator.SUBSIZED));
// Alternative:   
// java.util.Spliterator<javax.json.JsonObject> root_split_iterator = java.util.Spliterators.spliterator(list, (java.util.Spliterator.IMMUTABLE | java.util.Spliterator.NONNULL | java.util.Spliterator.ORDERED | java.util.Spliterator.SIZED | java.util.Spliterator.SUBSIZED));

java.util.Set<java.util.Spliterator<javax.json.JsonObject>> split_iterators = new java.util.HashSet<>();
_max_splitting(split_iterators, root_split_iterator); // Versus '_min_splitting(split_iterators, root_split_iterator);'
split_iterators.forEach((split_iterator) -> {
    split_iterator.forEachRemaining(json_object -> _search_items_(json_object, words));
    // Alternative:                        
    // boolean result;
    // do {
        // result = split_iterator.tryAdvance(json_object -> _search_items_(json_object, words));
    // } while (result);
});
Streams performance

Scenario(s)

The retrieval of 9.500 (requested size) items among (around) 20.000 items matching the query is followed by the application of the same (local) filter. This filtering is called 500 times with the same method in order to determine an average elapsed time. The No stream method filtering is realized with the help of filter_items while filter_items_ supports both the Sequential stream and Parallel stream methods ('parallel' argument equals to false or true). The java.util.Spliterator method is supported by filter_items__.

Measures
Method ⤳ No stream Sequential stream Parallel stream java.util.Spliterator
Elapsed time ≈ 250 ms ≈ 250 ms ≈ 250 ms ≈ 275 ms
Comments Common (end-to-end) JSON parsing Stream of JSON values (i.e., javax.json.JsonValue) from JSON parser: parser.getArrayStream() Stream of JSON values (i.e., javax.json.JsonValue) is switched to “parallel” as follows: parser.getArrayStream().parallel() Variations on trySplit) for minimum versus maximum splitting and characteristics provoke no effect on performance!

Surprisingly, using java.util.Spliterator leads to a slight overhead while other methods lead to the same performance! However, Java performance benchmarking has to preferably be carried out with the tool described here

Streams enhancements from Java 9

Rule(s)

Example From_Java_9.Java.zip 

java.util.List<Integer> positives = java.util.Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
java.util.stream.Stream<Integer> stream = positives.stream().filter(p -> p > 1 && java.util.stream.IntStream.rangeClosed(2, (int) Math.sqrt(p)).noneMatch(i -> (p % i == 0)));
System.out.println(stream.takeWhile(p -> !p.equals(7)).collect(java.util.stream.Collectors.toList())); // '[2, 3, 5]' is displayed...
java.util.stream.Stream<Integer> stream_ = positives.stream().filter(p -> p > 1 && java.util.stream.IntStream.rangeClosed(2, (int) Math.sqrt(p)).noneMatch(i -> (p % i == 0)));
System.out.println(stream_.dropWhile(p -> !p.equals(7)).collect(java.util.stream.Collectors.toList())); // '[7, 11]' is displayed...

Rule(s)

Example From_Java_9.Java.zip 

java.util.List<Object> objects = java.util.Arrays.asList(new Object(), null, new Object());
java.util.stream.Stream<Object> stream__ = objects.stream().flatMap(o -> java.util.stream.Stream.ofNullable(o));
System.out.println(stream__.count()); // '2' is displayed...
More on streams

See also

Eugen Baeldung Web site has a rich section on Java streams here