In this short post I want to demonstrate how you can use parallel streams in Java, and what you can gain (or lose) by using them.
Parallel Streams
One way to look at a parallel stream is as a set of ordinary Java streams, each running on its own thread, whose individual results are joined together at the end. By default the work is done in the common ForkJoinPool, which divides the problem into sub-problems that can be solved simultaneously, so the whole job may execute faster (though it depends on the workload!).
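To make the "divide into sub-problems" idea concrete, here is a minimal hand-written fork/join sketch that sums an array by splitting it in half until the pieces are small, then combining the partial sums. It is an illustration only: parallel streams do their own splitting internally, and the names ForkJoinSumSketch and SumTask and the THRESHOLD value are hypothetical choices for this example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Divide-and-conquer sum over a long[] using the fork/join framework.
public class ForkJoinSumSketch {

    // Below this many elements a task sums its slice directly instead of splitting further.
    private static final int THRESHOLD = 1_000;

    static class SumTask extends RecursiveTask<Long> {
        private final long[] values;
        private final int from; // inclusive
        private final int to;   // exclusive

        SumTask(long[] values, int from, int to) {
            this.values = values;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += values[i];
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(values, from, mid);
            SumTask right = new SumTask(values, mid, to);
            left.fork();                     // schedule the left half on the pool
            long rightSum = right.compute(); // work on the right half in this thread
            return left.join() + rightSum;   // wait for the left half and combine
        }
    }

    static long parallelSum(long[] values) {
        return ForkJoinPool.commonPool().invoke(new SumTask(values, 0, values.length));
    }

    public static void main(String[] args) {
        long[] values = new long[1_000_000];
        for (int i = 0; i < values.length; i++) {
            values[i] = i + 1;
        }
        System.out.println(parallelSum(values)); // 500000500000
    }
}
```

Forking one half and computing the other in the current thread keeps that thread doing useful work instead of just waiting on join().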
Using Parallel Streams
Let’s start with the simplest example of how to use parallel streams in Java.
Imagine we have a Stock class and want to sum all of the stocks’ change values. Using a parallel stream, it would look something like this:
```java
List<Stock> stocks = getStockList();
double totalChange = stocks.parallelStream()
        .map(Stock::getChange)
        .reduce(0.0, (acc, val) -> acc + val);
```
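Stock and getStockList() are not shown in the post, so here is a self-contained sketch in which both are hypothetical stand-ins (a Stock that holds nothing but a double change value). It runs the reduce version from above next to a mapToDouble(...).sum() variant, which avoids boxing every value into a Double.

```java
import java.util.Arrays;
import java.util.List;

public class StockSumSketch {

    // Hypothetical stand-in for the post's Stock class: only the change value matters here.
    static class Stock {
        private final double change;

        Stock(double change) {
            this.change = change;
        }

        double getChange() {
            return change;
        }
    }

    // Hypothetical stand-in for the post's getStockList().
    static List<Stock> getStockList() {
        return Arrays.asList(new Stock(1.5), new Stock(-0.5), new Stock(2.0));
    }

    // The post's approach: box each change into a Double and reduce with a lambda.
    static double sumWithReduce(List<Stock> stocks) {
        return stocks.parallelStream()
                .map(Stock::getChange)
                .reduce(0.0, (acc, val) -> acc + val);
    }

    // Primitive alternative: mapToDouble avoids boxing and sum() performs the reduction.
    static double sumWithMapToDouble(List<Stock> stocks) {
        return stocks.parallelStream()
                .mapToDouble(Stock::getChange)
                .sum();
    }

    public static void main(String[] args) {
        List<Stock> stocks = getStockList();
        System.out.println(sumWithReduce(stocks));      // 3.0
        System.out.println(sumWithMapToDouble(stocks)); // 3.0
    }
}
```

The sample values are exactly representable, so both methods print 3.0. With arbitrary doubles a parallel sum can differ from the sequential one in the last few bits, because floating-point addition is not associative.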
Using parallelStream() on a collection looks just like using stream() from the outside; you just need to be careful of a few common pitfalls:
- Using a stateful lambda expression

Do not modify external state from inside a parallel stream, because it can lead to unpredictable behaviour. For example:
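```java
// DON'T do this: ArrayList is not thread-safe
List<Double> stockChanges = new ArrayList<>();
stocks.parallelStream().map(Stock::getChange).forEach(e -> stockChanges.add(e));
```

This is not thread-safe: several threads call stockChanges.add at the same time without any synchronization, so you may lose elements, end up with null entries, or get an exception such as ArrayIndexOutOfBoundsException, and the outcome can change from run to run. To see that the work really is split across several threads, run this: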
```java
IntStream.range(0, 10).parallel().forEach(e -> {
    try {
        Thread.sleep(300);
        System.out.println("the current working thread " + Thread.currentThread().getName());
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
});
```

The execution of the code may vary depending on your computer hardware, but it’ll look something like:

```
the current working thread ForkJoinPool.commonPool-worker-4
the current working thread ForkJoinPool.commonPool-worker-2
the current working thread ForkJoinPool.commonPool-worker-3
the current working thread ForkJoinPool.commonPool-worker-6
the current working thread ForkJoinPool.commonPool-worker-5
the current working thread ForkJoinPool.commonPool-worker-1
the current working thread ForkJoinPool.commonPool-worker-7
```
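The shared-ArrayList pitfall can be reproduced in a self-contained sketch. The class and method names (SharedStateSketch, collectWithSideEffects, collectSafely) are hypothetical, and it uses an IntStream instead of the post's Stock list. Because the unsafe version is a data race, what it prints differs from run to run and may even look correct; the collect() version always returns every element, in order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SharedStateSketch {

    // Anti-pattern: worker threads call ArrayList.add concurrently with no synchronization.
    // Depending on timing this can lose elements, leave null slots, or throw
    // ArrayIndexOutOfBoundsException; none of these outcomes is guaranteed.
    static List<Integer> collectWithSideEffects(int n) {
        List<Integer> results = new ArrayList<>();
        IntStream.range(0, n).parallel().boxed().forEach(results::add);
        return results;
    }

    // Safe alternative: let the stream build the list itself, with no shared mutable state.
    static List<Integer> collectSafely(int n) {
        return IntStream.range(0, n).parallel().boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        int n = 100_000;
        try {
            System.out.println("side effects: " + collectWithSideEffects(n).size() + " elements");
        } catch (RuntimeException ex) {
            System.out.println("side effects: failed with " + ex);
        }
        System.out.println("collect: " + collectSafely(n).size() + " elements"); // always 100000
    }
}
```

Letting collect() build the result is the usual replacement for forEach plus add: each thread fills its own partial list and the framework merges them, so no locking is needed.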
- The order of the collection is not preserved

Because the stream is split so it can run on multiple threads, some parts may finish before others, so the results may not come out in the original order. For example:
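```java
IntStream.range(0, 100).parallel().map(e -> e * 3).forEach(System.out::println);
```

You’d expect the result to be 0, 3, 6, ... in order. Run the code, and it may surprise you. The remedy is to use forEachOrdered instead, which guarantees that the elements are processed in their original (encounter) order, at the cost of giving up some of the parallelism for that final step.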
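A small sketch comparing the two terminal operations, plus collect(), which is an option when you need the results rather than a side effect such as printing: on an ordered source like IntStream.range, collect() keeps encounter order while the map step still runs in parallel. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class EncounterOrderSketch {

    // forEach on a parallel stream makes no ordering promise:
    // elements are handled by whichever thread reaches them first.
    static void printUnordered() {
        IntStream.range(0, 20).parallel().map(e -> e * 3).forEach(e -> System.out.print(e + " "));
        System.out.println();
    }

    // forEachOrdered applies the action in encounter order (0 3 6 ...), even in parallel.
    static void printOrdered() {
        IntStream.range(0, 20).parallel().map(e -> e * 3).forEachOrdered(e -> System.out.print(e + " "));
        System.out.println();
    }

    // collect() on an ordered source also preserves encounter order in the resulting list.
    static List<Integer> tripled(int n) {
        return IntStream.range(0, n).parallel().map(e -> e * 3).boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        printUnordered();
        printOrdered();
        System.out.println(tripled(5)); // [0, 3, 6, 9, 12]
    }
}
```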
- Running parallel streams is blocking

This means that even though a parallel stream runs on multiple threads, the calling thread blocks until all the operations have finished.
```java
IntStream.range(0, 10).parallel().forEachOrdered(e -> {
    try {
        Thread.sleep(300);
        System.out.println("iteration " + e);
    } catch (InterruptedException e1) {
        e1.printStackTrace();
    }
});
System.out.println("finished");
```

This will produce:

```
iteration 0
iteration 1
...
iteration 8
iteration 9
finished
```
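The same blocking behaviour can be checked without reading the console. In this minimal sketch, the hypothetical method processedWhenForEachReturns counts processed elements with an AtomicInteger (thread-safe, unlike a plain ArrayList, and used here only to observe the workers) and reads the count right after forEach returns. The 10 ms sleep is an arbitrary stand-in for real work.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class BlockingSketch {

    // Runs a parallel forEach and reports how many elements had been processed
    // at the moment the terminal operation returned to the calling thread.
    static int processedWhenForEachReturns(int n) {
        AtomicInteger processed = new AtomicInteger();
        IntStream.range(0, n).parallel().forEach(e -> {
            try {
                Thread.sleep(10); // simulate some work
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            processed.incrementAndGet();
        });
        // forEach only returns after every element has been handled, so this is always n.
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(processedWhenForEachReturns(50) + " processed, then finished");
    }
}
```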
Do You Need Parallel Streams?
Note that parallelism is not automatically faster than performing operations serially, although it can be if you have enough data and processor cores. While aggregate operations enable you to more easily implement parallelism, it is still your responsibility to determine if your application is suitable for parallelism.
If you run a simple test with simple data on your local computer, parallel streams might not give much of a performance boost (they may even make things slower). To test this correctly, we will use the JMH library. The benchmark class below contains two benchmarks that both filter out the odd values, multiply the remaining ones by two, and then collect the results into a new list; the first uses a regular (sequential) stream and the second a parallel stream.
```java
package org.sample;

import static java.util.stream.Collectors.toList;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

public class StreamBenchmark {

    // Shared input for both benchmarks: the boxed integers 1..100000.
    // JMH creates this state and calls setup() before measuring.
    @State(Scope.Benchmark)
    public static class StreamBenchmarkState {
        Integer[] arr;

        @Setup
        public void setup() {
            arr = new Integer[100000];
            for (int i = 0; i < arr.length; i++) {
                arr[i] = i + 1;
            }
        }
    }

    // Sequential pipeline: keep the even values, double them, collect into a list.
    // Returning the list lets JMH consume it, so the work can't be optimized away.
    @Benchmark
    @OutputTimeUnit(TimeUnit.MILLISECONDS)
    public List<Integer> streamTest(StreamBenchmarkState state) {
        return Arrays.stream(state.arr)
                .filter(e -> e % 2 == 0)
                .map(e -> e * 2)
                .collect(toList());
    }

    // Same pipeline, executed in parallel on the common ForkJoinPool.
    @Benchmark
    @OutputTimeUnit(TimeUnit.MILLISECONDS)
    public List<Integer> parallelStreamTest(StreamBenchmarkState state) {
        return Arrays.stream(state.arr)
                .parallel()
                .filter(e -> e % 2 == 0)
                .map(e -> e * 2)
                .collect(toList());
    }
}
```
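Before comparing timings, it is worth a quick check that the two benchmark methods really compute the same thing. This sketch repeats both pipelines outside JMH (the class and method names are hypothetical); since the source array is ordered, collect(toList()) keeps encounter order even in parallel, so the two lists should be equal.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PipelineEquivalenceCheck {

    // Same input as the benchmark state: 1, 2, ..., size (boxed Integers).
    static Integer[] input(int size) {
        Integer[] arr = new Integer[size];
        for (int i = 0; i < arr.length; i++) {
            arr[i] = i + 1;
        }
        return arr;
    }

    // Same pipeline as streamTest().
    static List<Integer> sequential(Integer[] arr) {
        return Arrays.stream(arr)
                .filter(e -> e % 2 == 0)
                .map(e -> e * 2)
                .collect(Collectors.toList());
    }

    // Same pipeline as parallelStreamTest().
    static List<Integer> parallel(Integer[] arr) {
        return Arrays.stream(arr)
                .parallel()
                .filter(e -> e % 2 == 0)
                .map(e -> e * 2)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Integer[] arr = input(100_000);
        List<Integer> seq = sequential(arr);
        List<Integer> par = parallel(arr);
        System.out.println("sizes: " + seq.size() + " / " + par.size()); // sizes: 50000 / 50000
        System.out.println("identical: " + seq.equals(par));             // identical: true
    }
}
```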
Now we will use the JMH Java API to run the benchmarks (you can also run them from the command line; refer to the JMH docs for more info). The code is available in this gist: https://gist.github.com/chermehdi/1d60b8a780fb6115b084b7e32984c471
Here are the results we observe:
```
# JMH version: 1.21
# VM version: JDK 1.8.0_131, Java HotSpot(TM) 64-Bit Server VM, 25.131-b11
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/bin/java
# VM options:
# Warmup: 5 iterations, 10 s each
# Measurement: 5 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: org.sample.StreamBenchmark.parallelStreamTest

# Run progress: 0.00% complete, ETA 00:03:20
# Fork: 1 of 1
# Warmup Iteration 1: 2.498 ops/ms
# Warmup Iteration 2: 2.647 ops/ms
# Warmup Iteration 3: 2.435 ops/ms
# Warmup Iteration 4: 2.376 ops/ms
# Warmup Iteration 5: 2.280 ops/ms
Iteration 1: 2.398 ops/ms
Iteration 2: 2.570 ops/ms
Iteration 3: 2.510 ops/ms
Iteration 4: 2.480 ops/ms
Iteration 5: 2.533 ops/ms

Result "org.sample.StreamBenchmark.parallelStreamTest":
  2.498 ±(99.9%) 0.250 ops/ms [Average]
  (min, avg, max) = (2.398, 2.498, 2.570), stdev = 0.065
  CI (99.9%): [2.248, 2.748] (assumes normal distribution)

# JMH version: 1.21
# VM version: JDK 1.8.0_131, Java HotSpot(TM) 64-Bit Server VM, 25.131-b11
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/bin/java
# VM options:
# Warmup: 5 iterations, 10 s each
# Measurement: 5 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: org.sample.StreamBenchmark.streamTest

# Run progress: 50.00% complete, ETA 00:01:45
# Fork: 1 of 1
# Warmup Iteration 1: 1.074 ops/ms
# Warmup Iteration 2: 1.220 ops/ms
# Warmup Iteration 3: 1.203 ops/ms
# Warmup Iteration 4: 1.235 ops/ms
# Warmup Iteration 5: 1.212 ops/ms
Iteration 1: 1.252 ops/ms
Iteration 2: 1.191 ops/ms
Iteration 3: 1.132 ops/ms
Iteration 4: 1.095 ops/ms
Iteration 5: 1.219 ops/ms

Result "org.sample.StreamBenchmark.streamTest":
  1.178 ±(99.9%) 0.246 ops/ms [Average]
  (min, avg, max) = (1.095, 1.178, 1.252), stdev = 0.064
  CI (99.9%): [0.932, 1.424] (assumes normal distribution)

# Run complete. Total time: 00:03:31

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

Benchmark                            Mode  Cnt  Score   Error   Units
StreamBenchmark.parallelStreamTest  thrpt    5  2.498 ± 0.250  ops/ms
StreamBenchmark.streamTest          thrpt    5  1.178 ± 0.246  ops/ms
```
We can see that the parallel stream was roughly twice as fast as the sequential stream on my machine (2.498 vs. 1.178 ops/ms), and we may see even bigger improvements on more powerful dedicated servers.
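To put a number on "roughly twice as fast", divide the two throughput scores from the summary table. Dividing the ends of the two 99.9% confidence intervals gives only a rough bracket (it is not a proper confidence interval for the ratio), but even its low end is clearly above 1:

$$
\text{speedup} = \frac{2.498\ \text{ops/ms}}{1.178\ \text{ops/ms}} \approx 2.12,
\qquad
\frac{2.248}{1.424} \approx 1.58 \;\le\; \text{speedup} \;\le\; \frac{2.748}{0.932} \approx 2.95
$$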