Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SafeSubscriber - report onCompleted unsubscribe error to RxJavaPlugin #3155

Merged
merged 1 commit into from Aug 24, 2015

Conversation

davidmoten
Copy link
Collaborator

Discussed in #2464, when an observable emits onCompleted but unsubscribe in SafeSubscriber throws then the error should be reported to the RxJavaPlugin error handler and if that fails a stack trace is written to System.err.

@davidmoten
Copy link
Collaborator Author

There are a lot of repeated try catch blocks calling the RxJavaPlugins error handler so I've made a RxJavaPluginUtils class. It will also be called from the using fix which I will submit once this PR has been accepted.

@akarnokd
Copy link
Member

One of the tests fails.

@davidmoten
Copy link
Collaborator Author

I'm going to ponder the issue of throwing after reporting to the error handler a bit.

@davidmoten
Copy link
Collaborator Author

I've been looking for a bit of guidance about what to do when unsubscribe throws in SafeSubscriber. Rx Design Guidelines doesn't really cover it. reactive-streams-jvm does touch upon it saying:

3.15 Calling Subscription.cancel MUST return normally. The only legal way to signal failure to a Subscriber is via the onError method.

I think given that the contract has been breached it is fair to throw an exception and I think a new wrapper UnsubscribeFailureException would be appropriate.

I'd also like to address the case when onCompleted throws. reactive-streams-jvm has this to say:

2.13 Calling onSubscribe, onNext, onError or onComplete MUST return normally except when any provided parameter is null in which case it MUST throw a java.lang.NullPointerException to the caller, for all other situations the only legal way for a Subscriber to signal failure is by cancelling its Subscription. In the case that this rule is violated, any associated Subscription to the Subscriber MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment.

So in the existing code if in SafeSubscriber an onError call to the downstream subscriber throws then the RxJavaPlugin error handler is called and an OnErrorFailedException is thrown. Comments in the method say this is ok because the contract has been broken and this is probably consistent with the advice in 3.13:

and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment.

In the existing code if in SafeSubscriber an onCompleted call to the downstream subscriber throws then the consequent call to _onError tries to signal onError to the downstream subscriber. I'm worried that calling onError after calling onCompleted is a strong enough breach of the contract that the possible side effects could be worse than just a throw. Shouldn't we throw only (and notify the plugin error handler of course)?

@davidmoten
Copy link
Collaborator Author

I've submitted code that matches my thinking above. It still fails because I haven't amended the existing unit tests associated with throwing. I'll do that after another round of review if accepted.

@akarnokd
Copy link
Member

The failing test is a coverage test for a behavior no longer present. I suggest removing it and rerunning the jacoco to see if all current code path is covered.

@davidmoten
Copy link
Collaborator Author

@akarnokd Yep that's right. Are you happy that a throwing onCompleted() call should not induce an onError() call? If so I'll clean up and update the PR.

@davidmoten
Copy link
Collaborator Author

I've fixed the unit tests and added more so that SafeSubscriber gets 100% coverage.

@akarnokd
Copy link
Member

👍 Looks good to me.

Since this adds public API classes, it requires further approval (otherwise it shouldn't affect programs other than those that badly fail, therefore, the merge policy would allow me to merge it).

safe.onCompleted();

assertTrue(safe.isUnsubscribed());
Assert.fail();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indent

@zsxwing
Copy link
Member

zsxwing commented Aug 22, 2015

LGTM. It would be better if you can fix the indent. 👍 for UnsubscribeFailureException. It's a pity that we cannot change OnErrorFailedExceptions that should have been UnsubscribeFailureException now since it's a break change.

@davidmoten
Copy link
Collaborator Author

Thanks for review @akarnokd and @zsxwing. I've fixed indent in test and squashed commits.

@akarnokd
Copy link
Member

Great, thanks!

akarnokd added a commit that referenced this pull request Aug 24, 2015
SafeSubscriber - report onCompleted unsubscribe error to RxJavaPlugin
@akarnokd akarnokd merged commit d71314e into ReactiveX:1.x Aug 24, 2015
hyleung added a commit to hyleung/ratpack that referenced this pull request Apr 2, 2016
- Bump to RxJava 1.1.2 because Hystrix 1.5.1 depends on 1.1.1
- Changes expected behaviour in `RxErrorHandlingSpec` - exceptions thrown
 `onComplete` do not get propagated to `onError`.  Related to this change in RxJava
(ReactiveX/RxJava#3155), onComplete *must*
return normally.
- Code snippets in `RxRatpack.java` updated due to `Observable.x` being
  renamed `Observable.extend` -
ReactiveX/RxJava#3423
hyleung added a commit to hyleung/ratpack that referenced this pull request Apr 3, 2016
- Bump to RxJava 1.1.2. Upgrading to 1.1.1, saw test failures in
  `HystrixRequestCachingSpec` due to `ClassCastException`. I _think_ it
may be related to a bug that was fixed in 1.1.2
(ReactiveX/RxJava#3697).
- Changes expected behaviour in `RxErrorHandlingSpec` - exceptions thrown
 `onComplete` do not get propagated to `onError`.  Related to this change in RxJava
(ReactiveX/RxJava#3155), onComplete *must*
return normally. This change went was included in RxJava 1.0.15.
- Code snippets in `RxRatpack.java` updated due to `Observable.x` being
  renamed `Observable.extend` -
ReactiveX/RxJava#3423. Also part of RxJava 1.0.15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants