23

Consider the following situation: We are using a Java 8 parallel stream to perform a parallel forEach loop, e.g.,

IntStream.range(0,20).parallel().forEach(i -> { /* work done here */})

The number of parallel threads is controlled by the system property "java.util.concurrent.ForkJoinPool.common.parallelism". By default it is one less than the number of available processors, because the thread that invokes the stream also takes part in the work.

Now assume that we would like to limit the number of parallel executions for a specific piece of work, e.g. because that part is memory intensive and memory constraints imply a limit on parallel executions.

An obvious and elegant way to limit parallel executions is to use a Semaphore (suggested here), e.g., the following piece of code limits the number of parallel executions to 5:

        final Semaphore concurrentExecutions = new Semaphore(5);
        IntStream.range(0,20).parallel().forEach(i -> {

            concurrentExecutions.acquireUninterruptibly();

            try {
                /* WORK DONE HERE */
            }
            finally {
                concurrentExecutions.release();
            }
        });

This works just fine!

However: Using any other parallel stream inside the worker (at /* WORK DONE HERE */) may result in a deadlock.

For me this is an unexpected behavior.

Explanation: Since Java streams use a ForkJoin pool, the inner forEach is forking, and the join appears to be waiting forever. However, this behavior is still unexpected. Note that parallel streams work even if you set "java.util.concurrent.ForkJoinPool.common.parallelism" to 1.

Note also that it may not be transparent if there is an inner parallel forEach.

Question: Is this behavior in accordance with the Java 8 specification (in that case it would imply that the use of Semaphores inside parallel streams workers is forbidden) or is this a bug?

For convenience: Below is a complete test case. Any combination of the two booleans works, except "true, true", which results in the deadlock.

Clarification: To make the point clear, let me stress one aspect: The deadlock does not occur at the acquire of the semaphore. Note that the code consists of

  1. acquire semaphore
  2. run some code
  3. release semaphore

and the deadlock occurs at 2. if that piece of code is using ANOTHER parallel stream. Then the deadlock occurs inside that OTHER stream. As a consequence it appears that it is not allowed to use nested parallel streams and blocking operations (like a semaphore) together!

Note that it is documented that parallel streams use a ForkJoinPool and that ForkJoinPool and Semaphore belong to the same package - java.util.concurrent (so one would expect that they interoperate nicely).

/*
 * (c) Copyright Christian P. Fries, Germany. All rights reserved. Contact: [email protected].
 *
 * Created on 03.05.2014
 */
package net.finmath.experiments.concurrency;

import java.util.concurrent.Semaphore;
import java.util.stream.IntStream;

/**
 * This is a test of Java 8 parallel streams.
 * 
 * The idea behind this code is that the Semaphore concurrentExecutions
 * should limit the parallel executions of the outer forEach, which is an
 * <code>IntStream.range(0,numberOfTasks).parallel().forEach</code> (for example:
 * the parallel executions of the outer forEach should be limited due to a
 * memory constraint).
 * 
 * Inside the execution block of the outer forEach we use another parallel stream
 * to create an inner forEach. The number of concurrent
 * executions of the inner forEach is not limited by us (it is however limited by a
 * system property "java.util.concurrent.ForkJoinPool.common.parallelism").
 * 
 * Problem: If the semaphore is used AND the inner forEach is active, then
 * the execution will be DEADLOCKED.
 * 
 * Note: A practical application is the implementation of the parallel
 * LevenbergMarquardt optimizer in
 * {@link http://finmath.net/java/finmath-lib/apidocs/net/finmath/optimizer/LevenbergMarquardt.html}
 * In one application the number of tasks in the outer and inner loop is very large (>1000)
 * and due to memory limitation the outer loop should be limited to a small (5) number
 * of concurrent executions.
 * 
 * @author Christian Fries
 */
public class ForkJoinPoolTest {

    public static void main(String[] args) {

        // Any combination of the booleans works, except (true,true)
        final boolean isUseSemaphore    = true;
        final boolean isUseInnerStream  = true;

        final int       numberOfTasksInOuterLoop = 20;              // In real applications this can be a large number (e.g. > 1000).
        final int       numberOfTasksInInnerLoop = 100;             // In real applications this can be a large number (e.g. > 1000).
        final int       concurrentExecutionsLimitInOuterLoop = 5;
        final int       concurrentExecutionsLimitForStreams = 10;

        final Semaphore concurrentExecutions = new Semaphore(concurrentExecutionsLimitInOuterLoop);

        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",Integer.toString(concurrentExecutionsLimitForStreams));
        System.out.println("java.util.concurrent.ForkJoinPool.common.parallelism = " + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));

        IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {

            if(isUseSemaphore) {
                concurrentExecutions.acquireUninterruptibly();
            }

            try {
                System.out.println(i + "\t" + concurrentExecutions.availablePermits() + "\t" + Thread.currentThread());

                if(isUseInnerStream) {
                    runCodeWhichUsesParallelStream(numberOfTasksInInnerLoop);
                }
                else {
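                    try {
                        Thread.sleep(10*numberOfTasksInInnerLoop);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();     // Restore the interrupt flag instead of swallowing it.
                    }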
                }
            }
            finally {
                if(isUseSemaphore) {
                    concurrentExecutions.release();
                }
            }
        });

        System.out.println("D O N E");
    }

    /**
     * Runs code in a parallel forEach using streams.
     * 
     * @param numberOfTasksInInnerLoop Number of tasks to execute.
     */
    private static void runCodeWhichUsesParallelStream(int numberOfTasksInInnerLoop) {
        IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
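            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();     // Restore the interrupt flag instead of swallowing it.
            }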
        });
    }
}
8
  • Just guessing: all stream computations are performed on a shared thread pool and you have blocked part of it with a semaphore. The inner stream can't run until the semaphore is released, and the semaphore can't be released because it is waiting for the inner stream. You can find some more info in this article: java.dzone.com/articles/think-twice-using-java-8 May 3, 2014 at 12:13
  • Thank you. I know that this is the reason. I know that they use a shared pool. However, I find the behavior unexpected. If the inner loop is executed there is at least one thread running (namely the one starting the execution of the inner loop). If all others are blocked, why is the inner loop forking at all? It could just run single-threaded. Also note that I have only blocked 5 out of 10! May 3, 2014 at 12:42
  • Well, first: you've blocked 5 and have another 5 occupied by outer loop instances, so there is no place for the inner loop in the pool. Second: you can have a separate pool as stated in this question stackoverflow.com/questions/21163108/… May 3, 2014 at 14:17
  • 2
    Your assumption is erroneous. Once you turn over the parallelism to the API you relinquish your ability to control the inner workings of that parallel mechanism. Briefly, Streams is lazy. It builds the stream then it executes the terminal operation. You can control the parallelism with the system property but you cannot control what happens therein. I’m sure someone will find a good use for Semaphores sometime within a parallel stream, but not here.
    – edharned
    May 3, 2014 at 15:24
  • 3
    It looks like a bug to me. Apparently, the tasks of the inner forEach operations are waiting on their own sub-tasks. They can't be executed by another worker thread, because all worker threads are blocked. However, that shouldn't matter. If a task needs the result of a sub-task, and the sub-task has not started yet, the parent task should execute the sub-task inline. I can see in the source code that the ForkJoinPool implements this behaviour, but for some reason, it's not working for your code.
    – nosid
    May 4, 2014 at 17:31
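
One way to act on the "separate pool" idea from the comments, sketched under assumptions: instead of blocking common-pool workers with a Semaphore, the outer loop runs on a dedicated fixed-size ExecutorService, so the limit comes from the pool size. This is a different technique from the custom ForkJoinPool discussed in the linked question. The names BoundedOuterLoop, run and sleepQuietly are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

/** Hypothetical sketch: bound the outer loop with a dedicated fixed-size pool. */
final class BoundedOuterLoop {

    /**
     * Runs outerTasks jobs on at most 'limit' dedicated threads. Each job runs
     * an inner parallel stream, which uses the common pool.
     *
     * @return the highest number of outer jobs observed running at the same time
     */
    static int run(int outerTasks, int innerTasks, int limit) {
        ExecutorService outerPool = Executors.newFixedThreadPool(limit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < outerTasks; i++) {
                futures.add(outerPool.submit(() -> {
                    int now = running.incrementAndGet();
                    maxRunning.accumulateAndGet(now, Math::max);
                    try {
                        // The calling thread is a plain pool thread, not a ForkJoinWorkerThread.
                        IntStream.range(0, innerTasks).parallel().forEach(j -> sleepQuietly(1));
                    } finally {
                        running.decrementAndGet();
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get();   // Wait for every outer job to finish.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            outerPool.shutdown();
        }
        return maxRunning.get();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("max concurrent outer jobs: " + run(20, 100, 5));
    }
}
```

The design choice here is that no common-pool worker ever waits on a permit; whether this suits a given application (for example one that needs the outer loop to remain a stream) is a separate question.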

3 Answers

49

Any time you are decomposing a problem into tasks, where those tasks could be blocked on other tasks, and try and execute them in a finite thread pool, you are at risk for pool-induced deadlock. See Java Concurrency in Practice 8.1.

This is unquestionably a bug -- in your code. You're filling up the FJ pool with tasks that are going to block waiting for the results of other tasks in the same pool. Sometimes you get lucky and things manage to not deadlock (just like not all lock-ordering errors result in deadlock all the time), but fundamentally you're skating on some very thin ice here.
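
A minimal sketch of the kind of pool-induced deadlock described here, reduced to a single-threaded executor rather than the common ForkJoinPool (so it illustrates the general hazard, not the exact stream scenario from the question). The outer task waits for an inner task that can only run on the thread the outer task occupies. A timeout is used so that the demo terminates. The class and method names are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch of thread-starvation deadlock in a bounded pool. */
final class PoolStarvationDemo {

    /** @return true if the inner task could not run while the outer task waited for it */
    static boolean innerTaskStarves() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> outer = pool.submit(() -> {
                // The inner task is queued behind the outer task on the only thread.
                Future<String> inner = pool.submit(() -> "inner result");
                try {
                    inner.get(200, TimeUnit.MILLISECONDS);
                    return false;
                } catch (TimeoutException e) {
                    // Without the timeout this wait would never end.
                    return true;
                }
            });
            return outer.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("inner task starved: " + innerTaskStarves());
    }
}
```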

4
  • You are right that I am filling up the FJP with tasks waiting on other tasks. But if you take that Semaphore just as an evil test: Note that the Semaphore is at the top of the outer loop. For this nested loop I would not expect that an inner loop's task will wait on any other outer loop's task. But this is what happens. Your hint about implicit coupling is correct, but I would never expect that the tasks were coupled that way. Is a common FJP a good way to execute parallel (nested) streams? Note: 5 of 10 threads are blocked by the Semaphore. The blocking then occurs at the inner loop's task awaitJoin May 5, 2014 at 21:48
  • 1
    @Christian: Yes, this stuff is hard, and seemingly unrelated couplings can cause surprising behavior. Which is why it is best not to arbitrarily add synchronization actions to threads you don't own! May 5, 2014 at 23:02
  • 1
    I understand your point. But I believe that there is still a problem with the FJP implementation and nesting. The Semaphore was just to make it show. I tried to create a demo without semaphore here: svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/… - the point is that the FJP implementation introduces an implicit coupling where I would not expect one. - I hope I don't waste everyone's time. May 6, 2014 at 8:22
  • 5
    So far, no one has shown a problem with the FJP implementation; only that when you abuse it, it sometimes abuses you back. We can make parallelism easier to access, but that still doesn't make it easy. May 6, 2014 at 21:49
4

I ran your test in a profiler (VisualVM) and I agree: Threads are waiting for the semaphore and on awaitJoin() in the F/J Pool.

This framework has serious problems where join() is concerned. I’ve been writing a critique about this framework for four years now. The basic join problem starts here.

awaitJoin() has similar problems. You can peruse the code yourself. When the framework gets to the bottom of the work deque it issues a wait(). What it all comes down to is this framework has no way of doing a context-switch.

There is a way of getting this framework to create compensation threads for the threads that are stalled. You need to implement the ForkJoinPool.ManagedBlocker interface. How you can do this, I have no idea. You’re running a basic API with streams. You’re not implementing the Streams API and writing your own code.
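
For illustration only, here is a rough sketch of how a Semaphore acquire could be routed through ForkJoinPool.managedBlock, so that the pool is told a worker is about to block and may add a compensation thread. It is not claimed that this cures the nested-stream deadlock from the question; it only shows the shape of the ManagedBlocker interface. ManagedSemaphoreDemo, SemaphoreBlocker, acquireManaged and run are hypothetical names.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

/** Hypothetical sketch: acquire a Semaphore permit via ForkJoinPool.managedBlock. */
final class ManagedSemaphoreDemo {

    /** Tells the pool about the blocking acquire so it may compensate. */
    static final class SemaphoreBlocker implements ForkJoinPool.ManagedBlocker {
        private final Semaphore semaphore;
        private boolean acquired;

        SemaphoreBlocker(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        @Override
        public boolean block() throws InterruptedException {
            if (!acquired) {
                semaphore.acquire();
                acquired = true;
            }
            return true;    // No further blocking is necessary.
        }

        @Override
        public boolean isReleasable() {
            // Try a non-blocking acquire first; blocking is only needed if it fails.
            return acquired || (acquired = semaphore.tryAcquire());
        }
    }

    static void acquireManaged(Semaphore semaphore) {
        try {
            ForkJoinPool.managedBlock(new SemaphoreBlocker(semaphore));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** @return the highest number of tasks observed holding a permit at the same time */
    static int run(int tasks, int limit) {
        Semaphore permits = new Semaphore(limit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        IntStream.range(0, tasks).parallel().forEach(i -> {
            acquireManaged(permits);
            int now = running.incrementAndGet();
            maxRunning.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(5);    // Stand-in for the memory-intensive work.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
                permits.release();
            }
        });
        return maxRunning.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent permit holders: " + run(20, 5));
    }
}
```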

I stick to my comment, above: Once you turn over the parallelism to the API you relinquish your ability to control the inner workings of that parallel mechanism. There is no bug with the API (other than it is using a faulty framework for parallel operations.) The problem is that semaphores or any other method for controlling parallelism within the API are hazardous ideas.

3
  • With respect to your comment, I would like to counter that I believe I am complying with "the API". First: parallel() is a property of the stream. It says that this stream can be processed in a non-serial manner, split into subtasks. It does not enforce that tasks are distributed to other threads. In fact, I just ran my code with parallelism set to 1 and everything works fine, even with that Semaphore. Second: It is documented that streams use a ForkJoinPool, and ForkJoinPool and Semaphore belong to the same package: java.util.concurrent! So I believe everybody would assume they should be compatible. May 5, 2014 at 8:03
  • 1
    That two different classes belong to the same package is irrelevant. Semaphore is about concurrency. F/J is about parallelism (and should really be in a package of its own.) You're trying to mix parallel with concurrent -- is my concern.
    – edharned
    May 5, 2014 at 14:12
  • First: I started reading the webpage you have linked. Thank you for that. It is valuable! Second: This post and the answers and comments were focused too much on the use of the Semaphore. I would like to view that Semaphore now just as a tool to bring up the unexpected interlocking which occurs by nesting (and which is a problem introduced by the specific common FJP). To get away from that Semaphore discussion I created that other test case, see stackoverflow.com/questions/23489993/… May 7, 2014 at 7:32
4

After a bit of investigation of the source code of ForkJoinPool and ForkJoinTask, I believe I have found an answer:

It is a bug (in my opinion), and the bug is in doInvoke() of ForkJoinTask. The problem is actually related to the nesting of the two loops and presumably not to the use of the Semaphore. However, one needs the Semaphore (or something else that blocks in the outer loop) to make the problem apparent and produce a deadlock (but I can imagine there are other issues implied by this bug; see Nested Java 8 parallel forEach loop perform poor. Is this behavior expected?).

The implementation of the doInvoke() method currently looks as follows:

/**
 * Implementation for invoke, quietlyInvoke.
 *
 * @return status upon completion
 */
private int doInvoke() {
    int s; Thread t; ForkJoinWorkerThread wt;
    return (s = doExec()) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) :
        externalAwaitDone();
}

(and maybe also in doJoin which looks similar). In the line

        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?

it is tested if Thread.currentThread() is an instance of ForkJoinWorkerThread. The reason for this test is to check if the ForkJoinTask is running on a worker thread of the pool or on the main thread. I believe that this line is OK for a non-nested parallel for, where it allows one to distinguish whether the current task runs on the main thread or on a pool worker. However, for tasks of the inner loop this test is problematic: Let us call the thread that runs the parallel().forEach the creator thread. For the outer loop the creator thread is the main thread and it is not an instanceof ForkJoinWorkerThread. However, for inner loops running from a ForkJoinWorkerThread the creator thread is an instanceof ForkJoinWorkerThread too. Hence, in this situation, the test ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) IS ALWAYS TRUE!

Hence, we always call pool.awaitJoin(wt.workQueue, this).

Now, note that we call awaitJoin on the FULL workQueue of that thread (I believe that this is an additional flaw). It appears as if we are not only joining the inner loop's tasks, but also the task(s) of the outer loop and we JOIN ALL THOSE tasks. Unfortunately, the outer task contains that Semaphore.

To prove that the bug is related to this, we may check a very simple workaround. I create a t = new Thread() which runs the inner loop, then perform t.start(); t.join();. Note that this will not introduce any additional parallelism (I am immediately joining). However, it will change the result of the instanceof ForkJoinWorkerThread test for the creator thread. (Note that tasks will still be submitted to the common pool.) If that wrapper thread is created, the problem does not occur anymore, at least in my current test situation.
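
A small sketch of that wrapper-thread idea, limited to showing how the wrapper changes the outcome of the instanceof ForkJoinWorkerThread test for the creator thread of the inner stream. It does not reproduce the Semaphore deadlock itself. WrapperThreadDemo, runInWrapperThread and creatorThreadKinds are hypothetical names, and the outer task is started with ForkJoinPool.commonPool().execute as a stand-in for one task of the outer loop.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.IntStream;

/** Hypothetical sketch of the wrapper-thread workaround. */
final class WrapperThreadDemo {

    /** Runs the given code on a fresh plain thread and waits for it (no extra parallelism). */
    static void runInWrapperThread(Runnable innerLoop) {
        Thread t = new Thread(innerLoop);
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return a two-element array: [0] whether the outer task ran on a ForkJoinWorkerThread,
     *         [1] whether the wrapped inner stream was created on a ForkJoinWorkerThread
     */
    static boolean[] creatorThreadKinds() {
        CompletableFuture<boolean[]> result = new CompletableFuture<>();
        ForkJoinPool.commonPool().execute(() -> {
            boolean outerIsWorker = Thread.currentThread() instanceof ForkJoinWorkerThread;
            boolean[] wrapperIsWorker = new boolean[1];
            runInWrapperThread(() -> {
                // The creator thread of the inner stream is now a plain Thread.
                wrapperIsWorker[0] = Thread.currentThread() instanceof ForkJoinWorkerThread;
                IntStream.range(0, 100).parallel().forEach(j -> { /* inner work */ });
            });
            result.complete(new boolean[] { outerIsWorker, wrapperIsWorker[0] });
        });
        return result.join();
    }

    public static void main(String[] args) {
        boolean[] kinds = creatorThreadKinds();
        System.out.println("outer task on worker thread:   " + kinds[0]);
        System.out.println("inner creator on worker thread: " + kinds[1]);
    }
}
```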

I posted a full demo to http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/ForkJoinPoolTest.java

In this test code the combination

final boolean isUseSemaphore        = true;
final boolean isUseInnerStream      = true;
final boolean isWrappedInnerLoopThread  = false;

will result in a deadlock, while the combination

final boolean isUseSemaphore        = true;
final boolean isUseInnerStream      = true;
final boolean isWrappedInnerLoopThread  = true;

(and actually all other combinations) will not.

Update: Since many are pointing out that the use of the Semaphore is dangerous I tried to create a demo of the problem without Semaphore. Now, there is no more deadlock, but an - in my opinion - unexpected performance issue. I created a new post for that at Nested Java 8 parallel forEach loop perform poor. Is this behavior expected?. The demo code is here: http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachTest.java

2
  • I've been looking at the F/J classes for four years now and I'm still confused. You can walk thru the method calls with a profiler and see what is happening. If you feel there is a bug then file a bug report or at least post the question on the concurrency-interest list (altair.cs.oswego.edu/mailman/listinfo/concurrency-interest) so Doug Lea can take a look at it. Who better to resolve the issue?
    – edharned
    May 5, 2014 at 14:17
  • Thanks for your comment. I already submitted a bug report yesterday. But now I am sure that it is a bug. It is related to the nesting. The Semaphore is only needed to make the bug show up... I am sure there are other contexts where this kind of "global joining" (see my update above) is faulty.... - thanks again for your interest and references! May 5, 2014 at 17:13
