Skip to content

Commit

Permalink
SI-8689 Avoid internal error in Promise after sequence of completions
Browse files Browse the repository at this point in the history
Calling `completeWith` when the `DefaultPromise` is already completed
leads to callbacks not being properly executed.

This happened because `Future.InternalCallbackExecutor` extends
`BatchingExecutor`[1] which assumes `unbatchedExecute` to be async,
when in this case it is sync, and if there is an exception thrown
by executing the batch, it creates a new batch with the remaining
items from the current batch and submits that to `unbatchedExecute`
and then rethrows, but if you have a sync `unbatchedExecute`, it will
fail since it is not reentrant, as witnessed by the failed `require`
as reported in this issue.

This commit avoids the problem by delegating `completeWith` to
`tryCompleteWith`, which has the effect of using `onComplete` +
`tryComplete` instead of `complete`, which means that when it fails
(because of a benign race condition between completers) it won't
throw an exception.

It has been tested by the minimized reproducer.

[1] Actually, in the 2.10.x branch where this patch is starting out,
    "The BatchingExecutor trait had to be inlined into
    InternalCallbackExecutor for binary compatibility." This comment
    will be more literally correct in the context of 2.11.x and beyond.
  • Loading branch information
viktorklang authored and retronym committed Feb 4, 2015
1 parent ad0ddd4 commit bf20737
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 14 deletions.
7 changes: 2 additions & 5 deletions src/library/scala/concurrent/Promise.scala
Expand Up @@ -66,11 +66,8 @@ trait Promise[T] {
*
* @return This promise
*/
final def completeWith(other: Future[T]): this.type = {
other onComplete { this complete _ }
this
}

final def completeWith(other: Future[T]): this.type = tryCompleteWith(other)

/** Attempts to complete this promise with the specified future, once that future is completed.
*
* @return This promise
Expand Down
77 changes: 68 additions & 9 deletions test/files/jvm/future-spec/PromiseTests.scala
Expand Up @@ -43,21 +43,80 @@ object PromiseTests extends MinimalScalaTest {
Await.result(failure fallbackTo otherFailure, defaultTimeout)
}.getMessage mustBe ("br0ken")
}


// A fresh (uncompleted) Promise must take on the result of an already-completed
// source, for both success and failure, through both `tryCompleteWith` and
// `completeWith`. Each case uses its own promise so the cases stay independent.
"be completable with a completed Promise" in {
  // success via tryCompleteWith
  {
    val p = Promise[String]()
    p.tryCompleteWith(Promise[String]().success("foo").future)
    Await.result(p.future, defaultTimeout) mustBe ("foo")
  }
  // success via completeWith
  {
    val p = Promise[String]()
    p.completeWith(Promise[String]().success("foo").future)
    Await.result(p.future, defaultTimeout) mustBe ("foo")
  }
  // failure via tryCompleteWith
  {
    val p = Promise[String]()
    p.tryCompleteWith(Promise[String]().failure(new RuntimeException("br0ken")).future)
    intercept[RuntimeException] {
      Await.result(p.future, defaultTimeout)
    }.getMessage mustBe ("br0ken")
  }
  // failure via completeWith (this case previously repeated tryCompleteWith,
  // leaving the completeWith failure path untested)
  {
    val p = Promise[String]()
    p.completeWith(Promise[String]().failure(new RuntimeException("br0ken")).future)
    intercept[RuntimeException] {
      Await.result(p.future, defaultTimeout)
    }.getMessage mustBe ("br0ken")
  }
}
}

"A successful Promise" should {
val result = "test value"
val promise = Promise[String]().complete(Success(result))
promise.isCompleted mustBe (true)
futureWithResult(_(promise.future, result))
// Completes a fresh Promise with Success(result), checks that it reports
// isCompleted, then passes futureWithResult a function that applies its
// argument to the promise's future and the expected value.
"be completed" in {
val result = "test value"
val promise = Promise[String]().complete(Success(result))
promise.isCompleted mustBe (true)
futureWithResult(_(promise.future, result))
}

// An already-successful Promise must keep its original value ("bar") when asked
// to complete again from another completed future, via either entry point.
"not be completable with a completed Promise" in {
  // A new, already-completed source future for every attempt.
  def fooFuture = Promise[String]().success("foo").future

  val viaTryCompleteWith = Promise.successful("bar")
  viaTryCompleteWith.tryCompleteWith(fooFuture)
  Await.result(viaTryCompleteWith.future, defaultTimeout) mustBe ("bar")

  val viaCompleteWith = Promise.successful("bar")
  viaCompleteWith.completeWith(fooFuture)
  Await.result(viaCompleteWith.future, defaultTimeout) mustBe ("bar")
}
}

"A failed Promise" should {
val message = "Expected Exception"
val promise = Promise[String]().complete(Failure(new RuntimeException(message)))
promise.isCompleted mustBe (true)
futureWithException[RuntimeException](_(promise.future, message))
// Completes a fresh Promise with a Failure wrapping a RuntimeException, checks
// that it reports isCompleted, then passes futureWithException a function that
// applies its argument to the promise's future and the expected message.
"be completed" in {
val message = "Expected Exception"
val promise = Promise[String]().complete(Failure(new RuntimeException(message)))
promise.isCompleted mustBe (true)
futureWithException[RuntimeException](_(promise.future, message))
}
// An already-failed Promise must keep its original failure ("unbr0ken") when
// asked to complete again from another failed future, via either entry point.
"not be completable with a completed Promise" in {
  // via tryCompleteWith
  {
    val p = Promise[String]().failure(new RuntimeException("unbr0ken"))
    // Explicit `()` on Promise[String]: matches every other call site and does
    // not rely on auto-application of an empty-paren method.
    p.tryCompleteWith(Promise[String]().failure(new Exception("br0ken")).future)
    intercept[RuntimeException] {
      Await.result(p.future, defaultTimeout)
    }.getMessage mustBe ("unbr0ken")
  }
  // via completeWith
  {
    val p = Promise[String]().failure(new RuntimeException("unbr0ken"))
    p.completeWith(Promise[String]().failure(new Exception("br0ken")).future)
    intercept[RuntimeException] {
      Await.result(p.future, defaultTimeout)
    }.getMessage mustBe ("unbr0ken")
  }
}
}

"An interrupted Promise" should {
Expand Down
1 change: 1 addition & 0 deletions test/files/jvm/t8689.check
@@ -0,0 +1 @@
success
13 changes: 13 additions & 0 deletions test/files/jvm/t8689.scala
@@ -0,0 +1,13 @@
/** Minimized reproducer for SI-8689.
 *
 *  `source2` is linked to `source1` with `completeWith`, then completed
 *  directly with a failure before `source1` succeeds. The callback registered
 *  on `source2`'s future must still run and print "success".
 */
object Test {
  def main(args: Array[String]): Unit = {
    import scala.concurrent._
    import scala.concurrent.duration._
    import ExecutionContext.Implicits.global
    val source1 = Promise[Int]()
    val source2 = Promise[Int]()
    // Completed by the callback once the expected output has been written.
    val done = Promise[Unit]()
    source2.completeWith(source1.future).future.onComplete {
      case _ =>
        print("success")
        done.success(())
    }
    source2.tryFailure(new TimeoutException)
    source1.success(123)
    // The callback runs asynchronously on the global execution context, whose
    // daemon worker threads do not keep the JVM alive. Without waiting here,
    // main could return before "success" is printed, making the test flaky.
    Await.result(done.future, 120.seconds)
  }
}

0 comments on commit bf20737

Please sign in to comment.