Parallel Streams in Java

In this short post I want to demonstrate how you can use parallel streams in Java, and what you can gain (or lose) by using them.

Parallel Streams

One way to look at a parallel stream is as several streams, each running on its own thread, whose individual results are joined together at the end. The work is usually done in the common ForkJoinPool, which divides the problem into sub-problems that can be solved simultaneously, so the whole job can be executed faster (but it depends!).
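To make the "divide into sub-problems" idea concrete, here is a sketch that uses the fork/join framework directly to sum an array. It only illustrates the same divide-and-conquer pattern; it is not how the stream library is implemented internally. The class name ForkJoinSum and the threshold of 1,000 elements are arbitrary choices for this example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustration of the fork/join pattern: split a range in half until it is
// small, solve the halves (possibly on different threads), then combine.
public class ForkJoinSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // below this, sum sequentially (arbitrary)
    private final long[] values;
    private final int from; // inclusive
    private final int to;   // exclusive

    public ForkJoinSum(long[] values, int from, int to) {
        this.values = values;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += values[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        ForkJoinSum left = new ForkJoinSum(values, from, mid);
        ForkJoinSum right = new ForkJoinSum(values, mid, to);
        left.fork();                     // schedule the left half on the pool
        long rightSum = right.compute(); // work on the right half in this thread
        return left.join() + rightSum;   // wait for the left half, then combine
    }

    public static long sum(long[] values) {
        return ForkJoinPool.commonPool().invoke(new ForkJoinSum(values, 0, values.length));
    }

    public static void main(String[] args) {
        long[] values = new long[100_000];
        for (int i = 0; i < values.length; i++) {
            values[i] = i + 1;
        }
        System.out.println(sum(values)); // 5000050000
    }
}
```

A parallel stream does the same kind of splitting for you, which is why its source and operations need to be safe to process in independent chunks.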

Using Parallel Streams

Let's start with the simplest example of how to use parallel streams in Java.
Imagine we have a Stock class and we want to sum all the change values. Using a parallel stream, it would look something like this:


List<Stock> stocks = getStockList();
double totalChange = stocks.parallelStream()
        .map(Stock::getChange)
        .reduce(0.0, (acc, val) -> acc + val);
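For a version you can actually run, here is a self-contained sketch. The post does not show the Stock class, so the one below is a hypothetical stand-in that only models getChange(), and getStockList() simply returns three made-up values.

```java
import java.util.Arrays;
import java.util.List;

public class StockChanges {

    // Hypothetical stand-in for the post's Stock class: only getChange() is modeled.
    static class Stock {
        private final double change;

        Stock(double change) {
            this.change = change;
        }

        double getChange() {
            return change;
        }
    }

    // Hypothetical replacement for getStockList(): a fixed, small list.
    static List<Stock> getStockList() {
        return Arrays.asList(new Stock(1.5), new Stock(-0.5), new Stock(2.0));
    }

    // The reduce version from the post. 0.0 is the identity for addition, and the
    // stream combines partial sums computed on different threads. Note that double
    // addition is only associative up to rounding, so for arbitrary values a
    // parallel sum can differ in the last bits from a sequential one.
    static double totalWithReduce(List<Stock> stocks) {
        return stocks.parallelStream().map(Stock::getChange).reduce(0.0, (acc, val) -> acc + val);
    }

    // Same result without boxing every change into a Double.
    static double totalWithSum(List<Stock> stocks) {
        return stocks.parallelStream().mapToDouble(Stock::getChange).sum();
    }

    public static void main(String[] args) {
        List<Stock> stocks = getStockList();
        System.out.println(totalWithReduce(stocks)); // 3.0
        System.out.println(totalWithSum(stocks));    // 3.0
    }
}
```

mapToDouble(...).sum() is usually the more idiomatic choice for a plain numeric total; reduce is shown because it makes the "combine partial results" step explicit.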

From the outside, using parallelStream() looks just like using stream() on a collection; you just need to be careful of the common pitfalls:
    1. Using a stateful lambda expression
      Do not modify external state from inside a parallel stream; it can lead to unpredictable behaviour:


      // Don't do this: ArrayList is not thread-safe
      List<Double> stockChanges = new ArrayList<>();
      stocks.parallelStream().map(Stock::getChange).forEach(e -> stockChanges.add(e));

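      This is not thread-safe: several threads call add() on the same ArrayList at the same time, which can silently lose elements, leave null entries in the list, or throw an ArrayIndexOutOfBoundsException. To see that the work really is spread across multiple threads, run: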


      IntStream.range(0, 10).parallel().forEach(e -> {
          try {
              Thread.sleep(300); // simulate some slow work per element
              System.out.println("the current working thread " + Thread.currentThread().getName());
          } catch (InterruptedException ex) {
              ex.printStackTrace();
          }
      });

      The output depends on your hardware, but it will look something like:

      the current working thread ForkJoinPool.commonPool-worker-4
      the current working thread ForkJoinPool.commonPool-worker-2
      the current working thread ForkJoinPool.commonPool-worker-3
      the current working thread ForkJoinPool.commonPool-worker-6
      the current working thread ForkJoinPool.commonPool-worker-5
      the current working thread ForkJoinPool.commonPool-worker-1
      the current working thread ForkJoinPool.commonPool-worker-7
    2. The order of the collection is not preserved
      Because the stream is split so that it can run on multiple threads, some parts finish before others, so the resulting values may not come out in the original order. For example:


      IntStream.range(0, 100).parallel().map(e -> e * 3).forEach(System.out::println);


      You'd expect the result to be

      0
      3
      6
      ...

      Run the code; it may surprise you.
      The remedy is to use forEachOrdered instead of forEach, which guarantees that the elements are processed in their original (encounter) order.
    3. Running a parallel stream is blocking

      Even though a parallel stream runs on multiple threads, the calling thread blocks until all the operations finish.


      IntStream.range(0, 10).parallel().forEachOrdered(e -> {
          try {
              Thread.sleep(300);
              System.out.println("iteration " + e);
          } catch (InterruptedException e1) {
              e1.printStackTrace();
          }
      });
      System.out.println("finished");


      This produces:
      iteration 0
      iteration 1
      ...
      iteration 8
      iteration 9
      finished
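The three pitfalls above can be summed up in one small, runnable sketch. The class and method names are hypothetical; each method pairs a pitfall with a common way around it: collect() instead of add()-ing to a shared list, forEachOrdered() when order matters, and a check that every element has been processed by the time the terminal operation returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelPitfalls {

    // Pitfall 1: instead of calling add() on a shared ArrayList from forEach, let
    // collect() build the list. Each worker fills its own partial container and
    // the partial results are merged, so no shared mutable state is touched.
    // collect() also keeps the encounter order of the source.
    static List<Integer> collectTripled(int n) {
        return IntStream.range(0, n)
                .parallel()
                .map(e -> e * 3)
                .boxed()
                .collect(Collectors.toList());
    }

    // Pitfall 2: forEachOrdered runs the action one element at a time, in encounter
    // order (the upstream map() may still run in parallel). Its Javadoc guarantees
    // that each action happens-before the next one, which is why a plain ArrayList
    // is safe here, unlike with forEach.
    static List<Integer> visitInOrder(int n) {
        List<Integer> visited = new ArrayList<>();
        IntStream.range(0, n).parallel().map(e -> e * 3).forEachOrdered(e -> visited.add(e));
        return visited;
    }

    // Pitfall 3: the terminal operation blocks the calling thread, so by the time
    // forEach returns, every element has been processed. AtomicInteger is
    // thread-safe, so counting this way is not a race.
    static int countAfterForEach(int n) {
        AtomicInteger processed = new AtomicInteger();
        IntStream.range(0, n).parallel().forEach(e -> processed.incrementAndGet());
        return processed.get(); // read on the calling thread, after the stream is done
    }

    public static void main(String[] args) {
        System.out.println(collectTripled(5));     // [0, 3, 6, 9, 12]
        System.out.println(visitInOrder(5));       // [0, 3, 6, 9, 12]
        System.out.println(countAfterForEach(10)); // 10
    }
}
```

Because forEachOrdered runs the actions one after another, it gives up most of the parallel benefit for the action itself; when you only need the results in order, collect() is usually the better fit.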
      

Do You Need Parallel Streams?

Note that parallelism is not automatically faster than performing operations serially, although it can be if you have enough data and processor cores. While aggregate operations enable you to more easily implement parallelism, it is still your responsibility to determine if your application is suitable for parallelism.
If you run a simple test with simple data on your local computer, parallel streams might not give much of a performance boost (they may even be slower). To test this correctly we will use the JMH library. The benchmark class contains two benchmarks: both filter out the odd values of a list, multiply the remaining values by two, and then collect the results into a new list. The first uses a regular stream and the second a parallel stream.


package org.sample;

import static java.util.stream.Collectors.toList;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

public class StreamBenchmark {

    // Shared input for both benchmarks: the integers 1..100000 (boxed).
    @State(Scope.Benchmark)
    public static class StreamBenchmarkState {
        Integer[] arr;

        @Setup
        public void setUp() {
            arr = new Integer[100000];
            for (int i = 0; i < arr.length; i++) {
                arr[i] = i + 1;
            }
        }
    }

    // Sequential baseline: keep the even values, double them, collect into a list.
    @Benchmark
    @OutputTimeUnit(TimeUnit.MILLISECONDS)
    public List<Integer> streamTest(StreamBenchmarkState state) {
        return Arrays.stream(state.arr)
                .filter(e -> e % 2 == 0)
                .map(e -> e * 2)
                .collect(toList());
    }

    // Same pipeline, run in parallel on the common ForkJoinPool.
    @Benchmark
    @OutputTimeUnit(TimeUnit.MILLISECONDS)
    public List<Integer> parallelStreamTest(StreamBenchmarkState state) {
        return Arrays.stream(state.arr)
                .parallel()
                .filter(e -> e % 2 == 0)
                .map(e -> e * 2)
                .collect(toList());
    }
}
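Before timing anything, it is worth checking that the two pipelines compute the same thing. The plain-Java sketch below (class and method names hypothetical, no JMH needed) builds the same 1..100,000 input and compares the sequential and parallel outputs. Since collect(toList()) respects encounter order, the two lists should be equal element by element, not just the same size.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PipelineCheck {

    // Same input as the benchmark state: 1..100000, boxed.
    static Integer[] input() {
        Integer[] arr = new Integer[100_000];
        for (int i = 0; i < arr.length; i++) {
            arr[i] = i + 1;
        }
        return arr;
    }

    // Sequential pipeline: keep even values, double them.
    static List<Integer> sequential(Integer[] arr) {
        return Arrays.stream(arr)
                .filter(e -> e % 2 == 0)
                .map(e -> e * 2)
                .collect(Collectors.toList());
    }

    // Parallel version of the same pipeline.
    static List<Integer> parallel(Integer[] arr) {
        return Arrays.stream(arr)
                .parallel()
                .filter(e -> e % 2 == 0)
                .map(e -> e * 2)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Integer[] arr = input();
        List<Integer> seq = sequential(arr);
        List<Integer> par = parallel(arr);
        System.out.println(seq.size());      // 50000
        System.out.println(seq.equals(par)); // true
    }
}
```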

Now we will use the JMH Java API to run the benchmarks (they can also be run from the command line; refer to the JMH docs for more info): https://gist.github.com/chermehdi/1d60b8a780fb6115b084b7e32984c471

We observe the following results:

# JMH version: 1.21
# VM version: JDK 1.8.0_131, Java HotSpot(TM) 64-Bit Server VM, 25.131-b11
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/bin/java
# VM options: 
# Warmup: 5 iterations, 10 s each
# Measurement: 5 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: org.sample.StreamBenchmark.parallelStreamTest

# Run progress: 0.00% complete, ETA 00:03:20
# Fork: 1 of 1
# Warmup Iteration   1: 2.498 ops/ms
# Warmup Iteration   2: 2.647 ops/ms
# Warmup Iteration   3: 2.435 ops/ms
# Warmup Iteration   4: 2.376 ops/ms
# Warmup Iteration   5: 2.280 ops/ms
Iteration   1: 2.398 ops/ms
Iteration   2: 2.570 ops/ms
Iteration   3: 2.510 ops/ms
Iteration   4: 2.480 ops/ms
Iteration   5: 2.533 ops/ms


Result "org.sample.StreamBenchmark.parallelStreamTest":
  2.498 ±(99.9%) 0.250 ops/ms [Average]
  (min, avg, max) = (2.398, 2.498, 2.570), stdev = 0.065
  CI (99.9%): [2.248, 2.748] (assumes normal distribution)


# JMH version: 1.21
# VM version: JDK 1.8.0_131, Java HotSpot(TM) 64-Bit Server VM, 25.131-b11
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/bin/java
# VM options: 
# Warmup: 5 iterations, 10 s each
# Measurement: 5 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: org.sample.StreamBenchmark.streamTest

# Run progress: 50.00% complete, ETA 00:01:45
# Fork: 1 of 1
# Warmup Iteration   1: 1.074 ops/ms
# Warmup Iteration   2: 1.220 ops/ms
# Warmup Iteration   3: 1.203 ops/ms
# Warmup Iteration   4: 1.235 ops/ms
# Warmup Iteration   5: 1.212 ops/ms
Iteration   1: 1.252 ops/ms
Iteration   2: 1.191 ops/ms
Iteration   3: 1.132 ops/ms
Iteration   4: 1.095 ops/ms
Iteration   5: 1.219 ops/ms


Result "org.sample.StreamBenchmark.streamTest":
  1.178 ±(99.9%) 0.246 ops/ms [Average]
  (min, avg, max) = (1.095, 1.178, 1.252), stdev = 0.064
  CI (99.9%): [0.932, 1.424] (assumes normal distribution)


# Run complete. Total time: 00:03:31

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

Benchmark                            Mode  Cnt  Score   Error   Units
StreamBenchmark.parallelStreamTest  thrpt    5  2.498 ± 0.250  ops/ms
StreamBenchmark.streamTest          thrpt    5  1.178 ± 0.246  ops/ms
We see that the parallel stream delivered about 2.1 times the throughput of the regular stream on my machine (2.498 vs 1.178 ops/ms), and the improvement may be even larger on dedicated servers with more cores.
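To get a feel for how much parallelism your own machine offers, you can print the processor count and the common pool's parallelism. This is a small sketch (class and method names hypothetical) using only standard JDK calls.

```java
import java.util.concurrent.ForkJoinPool;

public class PoolInfo {

    // Logical processors visible to the JVM.
    static int processors() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Target number of worker threads in the common pool used by parallel streams.
    // By default this is processors() - 1 (but at least 1); the thread that calls
    // the terminal operation also helps run the stream. It can be overridden with
    // -Djava.util.concurrent.ForkJoinPool.common.parallelism=N
    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        System.out.println("available processors:    " + processors());
        System.out.println("common pool parallelism: " + commonPoolParallelism());
    }
}
```

On a machine with more cores the common pool gets more workers, which is why benchmark results like the ones above vary from one machine to another.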
