Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added latch to async SyncOnSubscrbeTest #3285

Merged
merged 1 commit into from Sep 4, 2015

Conversation

stealthcode
Copy link

No description provided.

@akarnokd
Copy link
Member

akarnokd commented Sep 2, 2015

#3286 has encountered another failure:

rx.observables.SyncOnSubscribeTest > testSubscribeOn FAILED
    java.lang.AssertionError: expected:<4> but was:<3>
        at org.junit.Assert.fail(Assert.java:93)
        at org.junit.Assert.failNotEquals(Assert.java:647)
        at org.junit.Assert.assertEquals(Assert.java:128)
        at org.junit.Assert.assertEquals(Assert.java:472)
        at org.junit.Assert.assertEquals(Assert.java:456)
        at rx.observables.SyncOnSubscribeTest.testSubscribeOn(SyncOnSubscribeTest.java:734)

@stealthcode
Copy link
Author

This should also resolve #3287. I have added your example test to the unit tests (3,000 iterations).

IS_UNSUBSCRIBED.compareAndSet(this, 0, 1);
if (get() == 0L)
parent.onUnsubscribe(state);
while(true) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a much simpler way of fixing this: undo all these latest changes and request 1!

if (BackpressureUtils.getAndAddRequest(this, 1) == 0) {
    parent.onUnsubscribe(state);
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's interesting. If there is currently a thread processing over the request count loop (i.e. processing nextIteration()) then won't this have to block the unsubscribing thread? The unsubscribing thread would have to poll over this until it got access. I'm not seeing how this would force a thread currently iterating to terminate. If you read through the concurrency I think you will see that it's not really all that complicated and I believe it sufficiently covers the possible race conditions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no blocking. My suggestion follows the queue-drain approach where you queue the terminal flag and drain it in the emission loop. Since the loop first checks for the terminal flag there won't be any wrong emissions and the loop quits. If unsubscribe increments from zero, there isn't and won't be any emission loop after that and the resource can be freed.

There is no need for your complicated solution.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not seeing where you are suggesting setting a terminal flag. You are more than welcome to open your own pull request to demonstrate your fix. My pull request does set a flag for the draining thread to signal when it needs to unsubscribe.

@abersnaze
Copy link
Contributor

👍

@akarnokd
Copy link
Member

akarnokd commented Sep 4, 2015

Okay, let's merge this so other PRs don't run into the test bug with the current main.

akarnokd added a commit that referenced this pull request Sep 4, 2015
Added latch to async SyncOnSubscrbeTest
@akarnokd akarnokd merged commit d60f0c0 into ReactiveX:1.x Sep 4, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants