Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
415 views
in Technique[技术] by (71.8m points)

concurrency - Nested Java 8 parallel forEach loop perform poor. Is this behavior expected?

Note: I already addressed this problem in another SO post - Using a semaphore inside a nested Java 8 parallel stream action may DEADLOCK. Is this a bug? -, but the title of this post suggested that the problem is related to the use of a semaphore - which somewhat distracted the discussion. I am creating this one to stress that nested loops might have a performance issue - although both problems have likely a common cause (and maybe because it took me a lot of time to figure out this problem). (I don't see it as a duplicate, because it is stressing another symptom - but if you do just delete it).

Problem: If you nest two Java 8 stream.parallel().forEach loops and all tasks are independent, stateless, etc. - except for being submitted to the common FJ pool -, then nesting a parallel loop inside a parallel loop performs much poorer than nesting a sequential loop inside a parallel loop. Even worse: If the operation containing the inner loop is synchronized, your will get a DEADLOCK.

Demonstration of the performance issue

Without the 'synchronized' you can still observe a performance problem. You find a demo code for this at: http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachTest.java (see the JavaDoc there for a more detailed description).

Our setup here is as follows: We have a nested stream.parallel().forEach().

  • The inner loop is independent (stateless, no interference, etc. - except of the use of a common pool) and consumes 1 second in total in the worst case, namely if processed sequential.
  • Half of the tasks of the outer loop consume 10 seconds prior that loop.
  • Half consume 10 seconds after that loop.
  • Hence every thread consumes 11 seconds (worst case) in total. ?* We have a boolean which allows to switch the inner loop from parallel() to sequential().

Now: submitting 24 outer-loop-tasks to a pool with parallelism 8 we would expect 24/8 * 11 = 33 seconds at best (on an 8 core or better machine).

The result is:

  • With inner sequential loop: 33 seconds.
  • With inner parallel loop: >80 seconds (I had 92 seconds).

Question: Can you confirm this behavior? Is this something one would expect from the framework? (I am a bit more careful now with a claim that this is a bug, but I personally believe that it is due to a bug in the implementation of ForkJoinTask. Remark: I have posted this to concurrency-interest (see http://cs.oswego.edu/pipermail/concurrency-interest/2014-May/012652.html ), but so far I did not get confirmation from there).

Demonstration of the the deadlock

The following code will DEADLOCK

    // Outer loop
    IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {
        doWork();
        synchronized(this) {
            // Inner loop
            IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
                doWork();
            });
        }
    });

where numberOfTasksInOuterLoop = 24, numberOfTasksInInnerLoop = 240, outerLoopOverheadFactor = 10000 and doWork is some stateless CPU burner.

You find a complete demo code at http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachAndSynchronization.java (see the JavaDoc there for a more detailed description).

Is this behavior expected? Note that the documentation on Java parallel streams does not mention any issue with nesting or synchronization. Also, the fact that both use a common fork-join-pool is not mentioned.

Update

Another test on the performance issue can be found at http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachBenchmark.java - this test come without any blocking operation (no Thread.sleep and not synchronized). I compiled some more remarks here: http://christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html

Update 2

It appears as if this problem and the more severe DEADLOCK with semaphores has been fixed in Java8 u40.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

The problem is that the rather limited parallelism you have configured is eaten up by the outer stream processing: if you say that you want eight threads and process a stream of more than eight items with parallel() it will create eight worker threads and let them process items.

Then within your consumer you are processing another stream using parallel() but there are no worker threads left. Since the worker threads are blocked waiting for the end of the of the inner stream processing, the ForkJoinPool has to create new worker threads which violate your configured parallelism. It seems to me that it does not recycle these extend threads but let them die right after processing. So within your inner processing, new threads are created and disposed which is an expensive operation.

You might see it as a flaw that the initiating threads do not contribute to the computation of a parallel stream processing but just wait for the result but even if that was fixed you still have a general problem that is hard (if ever) to fix:

Whenever the ratio between the number of worker threads to outer stream items is low, the implementation will use them all for the outer stream as it doesn’t know that the stream is an outer stream. So executing an inner stream in parallel requests more worker threads than available. Using the caller thread for contributing to the computation could fix it in a way that the performance equals the serial computation but getting an advantage of parallel execution here does not work well with the concept of a fixed number of worker threads.

Note that you are scratching on the surface of this problem here, as you have rather balanced processing times for the items. If the processing of both, inner items and outer items, diverge (compared to items on the same level), the problem will be even worse.


Update: by profiling and looking at the code it seems that the ForkJoinPool does attempts to use the waiting thread for “work stealing” but using different code depending on the fact whether the Thread is a worker thread or some other thread. As a result, a worker thread is actually waiting about 80% of the time and doing very little to no work while other threads really contribute to the computation…


Update 2: for completeness, here the simple parallel execution approach as described in the comments. Since it enqueues every item it is expected to have to much overhead when the execution time for a single item is rather small. So it’s not a sophisticated solution but rather a demonstration that it is possible to handle long running tasks without much magic…

import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.*;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class NestedParallelForEachTest1 {
    static final boolean isInnerStreamParallel = true;

    // Setup: Inner loop task 0.01 sec in worse case. Outer loop task: 10 sec + inner loop. This setup: (100 * 0.01 sec + 10 sec) * 24/8 = 33 sec.
    static final int numberOfTasksInOuterLoop = 24;  // In real applications this can be a large number (e.g. > 1000).
    static final int numberOfTasksInInnerLoop = 100; // In real applications this can be a large number (e.g. > 1000).
    static final int concurrentExecutionsLimitForStreams = 8;

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.out.println(System.getProperty("java.version")+" "+System.getProperty("java.home"));
        new NestedParallelForEachTest1().testNestedLoops();
        E.shutdown();
    }

    final static ThreadPoolExecutor E = new ThreadPoolExecutor(
        concurrentExecutionsLimitForStreams, concurrentExecutionsLimitForStreams,
        2, TimeUnit.MINUTES, new SynchronousQueue<>(), (r,e)->r.run() );

    public static void parallelForEach(IntStream s, IntConsumer c) {
        s.mapToObj(i->E.submit(()->c.accept(i))).collect(Collectors.toList())
         .forEach(NestedParallelForEachTest1::waitOrHelp);
    }
    static void waitOrHelp(Future f) {
        while(!f.isDone()) {
            Runnable r=E.getQueue().poll();
            if(r!=null) r.run();
        }
        try { f.get(); }
        catch(InterruptedException ex) { throw new RuntimeException(ex); }
        catch(ExecutionException eex) {
            Throwable t=eex.getCause();
            if(t instanceof RuntimeException) throw (RuntimeException)t;
            if(t instanceof Error) throw (Error)t;
            throw new UndeclaredThrowableException(t);
        }
    }
    public void testNestedLoops(NestedParallelForEachTest1 this) {
        long start = System.nanoTime();
        // Outer loop
        parallelForEach(IntStream.range(0,numberOfTasksInOuterLoop), i -> {
            if(i < 10) sleep(10 * 1000);
            if(isInnerStreamParallel) {
                // Inner loop as parallel: worst case (sequential) it takes 10 * numberOfTasksInInnerLoop millis
                parallelForEach(IntStream.range(0,numberOfTasksInInnerLoop), j -> sleep(10));
            }
            else {
                // Inner loop as sequential
                IntStream.range(0,numberOfTasksInInnerLoop).sequential().forEach(j -> sleep(10));
            }
            if(i >= 10) sleep(10 * 1000);
        });
        long end = System.nanoTime();
        System.out.println("Done in "+TimeUnit.NANOSECONDS.toSeconds(end-start)+" sec.");
    }
    static void sleep(int milli) {
        try {
            Thread.sleep(milli);
        } catch (InterruptedException ex) {
            throw new AssertionError(ex);
        }
    }
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...