Fix to a bunch of bugs and issues with AsyncOnSubscribe #3356

Merged
merged 1 commit into from Sep 29, 2015

akarnokd
Member

There were several problems with the operator:

  • The first 2 requests were always Long.MAX_VALUE no matter what the child requested.
  • There was a race-condition and memory leak when tracking the generated Observables.
  • Both the individual generated Observables and the main concatenation could overflow the internal buffers and had to be defensively-buffered.
  • If a request was responded with a generated Observable that delivered less than this requested amount, the child ended up hanging.
  • The concatenation didn't know about the number of values it should deliver in aggregate and might have delivered more or less, causing MissingBackpressureException or hangs.
  • Exception thrown from the generateState is now delivered to the child immediately.

As I see, the usage is as follows. Each individual request() from the child is supposed to be fulfilled by individual Observables. For example, request(1) and request(5) will generate two distinct Observables where the first will have 1 value and the second 5.
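The request-to-Observable mapping described above can be modelled without RxJava. The following is a minimal plain-Java sketch, not the operator's actual code: `PerRequestBatches` and its `request` method are hypothetical names, and a `List` stands in for each generated Observable.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (no RxJava): every request(n) is answered by its own batch of exactly n values. */
final class PerRequestBatches {
    // Generator state carried from one request to the next (hypothetical; stands in for the user state).
    private long nextValue = 0;

    /** Returns a fresh batch that stands in for the inner Observable generated for one request(n). */
    List<Long> request(long n) {
        List<Long> batch = new ArrayList<>();
        for (long i = 0; i < n; i++) {
            batch.add(nextValue++);
        }
        return batch;
    }

    public static void main(String[] args) {
        PerRequestBatches source = new PerRequestBatches();
        System.out.println(source.request(1)); // first batch holds 1 value
        System.out.println(source.request(5)); // second, distinct batch holds 5 values
    }
}
```

Each call produces a separate batch, so request(1) followed by request(5) yields two sources of sizes 1 and 5 rather than one source of 6.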

@akarnokd akarnokd added the Bug label Sep 19, 2015
@akarnokd akarnokd added this to the 1.0.x milestone Sep 19, 2015
@stealthcode

It seems to me that there are still outstanding questions as to the AsyncOnSubscribe usage. I am going to open a pull request with improved javadocs for the next function. This should help clear up confusion about its usage. Also there are a couple items here that I'd like to discuss in greater depth. I am not okay with a complete rewrite (as you have provided here) but I would gladly accept contributions. Please read my comments below and reply so that we can move forward.

Exception thrown from the generateState is now delivered to the child immediately.

Great. Good improvement. I would like to see a pull request for this against the current existing code base.

There was a race-condition and memory leak when tracking the generated Observables.

Is this the HashSet<Subscription>? I think that your usage of a CompositeSubscription was a good fix and necessary. Again, I think you could have made this fix to the existing code base very simply.

The first 2 requests were always Long.MAX_VALUE no matter what the child requested.

When debugging your previous test it was clear to me that the TestSubscriber was not correctly using the initialRequest value to change the first requested amount in the underlying Subscriber. I do not think this behavior has anything to do with the AsyncOnSubscribe implementation. I commented to this effect in your issue #3341.

Both the individual generated Observables and the main concatenation could overflow the internal buffers and had to be defensively-buffered.

Can you be more specific? The Buffer until subscriber uses an unbounded queue and I already commented that this was a known issue. I would accept a pull request to address this issue against the existing code base.

If a request was responded with a generated Observable that delivered less than this requested amount, the child ended up hanging.

The observables emitted to the observer should emit exactly n. If they emit less than n then the observer must be onCompleted() as there is no more data available. Your test does not call a terminal event on the observer so I would expect this to hang. This is the area where the documentation needs to be improved (and I will open a pull request to improve this). It makes perfect sense how you would perceive this as a bug. I am open to ideas. However that will require that you communicate with me over the designs and present alternatives.
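The contract stated in this comment, together with the failure modes listed in the PR description, can be summarised as a small decision table. This is an illustrative sketch only: `DeliveryContract`, `Outcome`, and `classify` are hypothetical names and are not part of RxJava.

```java
/** Hypothetical summary of what happens to the child for one request(n), per the discussion above. */
final class DeliveryContract {
    enum Outcome {
        FULFILLED,       // exactly n values were emitted
        COMPLETED_EARLY, // fewer than n values, followed by a terminal event
        STALLED,         // fewer than n values and no terminal event: the child hangs
        OVERDELIVERED    // more than n values: risks MissingBackpressureException downstream
    }

    static Outcome classify(long requested, long emitted, boolean completed) {
        if (emitted > requested) {
            return Outcome.OVERDELIVERED;
        }
        if (emitted == requested) {
            return Outcome.FULFILLED;
        }
        return completed ? Outcome.COMPLETED_EARLY : Outcome.STALLED;
    }
}
```

For instance, `classify(5, 3, false)` describes the under-delivery case debated here: three values for a request of five and no terminal event.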

The concatenation didn't know about the number of values it should deliver in aggregate and might have delivered more or less, causing MissingBackpressureException or hangs.

I deliberately chose not to validate the quantity of onNexted events. This is like validating at every level when it will invalidate intermittently at runtime and then blow up or if you don't implement validation then a missing back pressure exception is thrown intermittently, and it blows up at runtime. Is there anything better that we can do? I vote to not validate and trust that the user of the AsyncOnSubscribe follows the contract (that we will document in detail).

@akarnokd
Member Author

I am not okay with a complete rewrite (as you have provided here) but I would gladly accept contributions.

First of all, this isn't a complete rewrite as I kept everything else that wasn't contributing to a bug. The API is still the same and the behavior is what it looked like you wanted to achieve: client requests of n_i should be responded to with a single Observable that produces n_i values, or fewer followed by completion of the main sequence.

against the current existing code base.

The fixes apply together, not in parts.

it was clear to me that the TestSubscriber was not correctly using the initialRequest value to change the first requested amount in the underlying Subscriber

We use this constructor in many other tests that require zero request upfront. It works there. Your code composed the various Subscribers and Producers in a wrong way and the default Long.MAX_VALUE was requested way before the end TestSubscriber could issue a request.

The Buffer until subscriber uses an unbounded queue and I already commented that this was a known issue. I would accept a pull request to address this issue against the existing code base.

BufferUntilSubscriber doesn't support backpressure and thus 3 subsequent requests of any size would generate 3 sources but concat accepts 2 at a time. In addition, there is no way to know if the user actually returned a backpressure-supporting Observable or not. In testSerializesConcurrentObservables you actually over-deliver and the concat would fail with an IllegalStateException.

I have a version of BufferUntilSubscriber in #3150 and before that in #3050 waiting for review for months now. And again, that would still leave the AsyncOnSubscribe buggy so why do separate PRs that would conflict with each other due to line differences?

The observables emitted to the observer should emit exactly n. If they emit less than n then the observer must be onCompleted() as there is no more data available.

testUnderdeliveryCorrection tests for the case where the developer generated the wrong source which doesn't deliver enough values. It wasn't meant to be a finite sequence and not meant to call onComplete; what mattered is how it responded to requestMore calls.

I deliberately chose not to validate the quantity of onNexted events. This is like validating at every level when it will invalidate intermittently at runtime and then blow up or if you don't implement validation then a missing back pressure exception is thrown intermittently, and it blows up at runtime. Is there anything better that we can do? I vote to not validate and trust that the user of the AsyncOnSubscribe follows the contract (that we will document in detail).

You don't seem to understand how concat backpressure works: The client requests n and the concat operator forwards that amount to its first source. Because individual sources may deliver less than this value, concat has to check the number of produced elements so in case a new source is started, concat can request this remaining amount from it. This is what the ProducerArbiter manages.

Your original code forgot to tell concat how many elements to expect and always ran in unbounded mode regardless of the original child request value.
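The accounting described above (forward the child's request to the current source, count what it produced, and request only the remainder from the next source) can be sketched in plain Java. This is a simplified, synchronous model and not the ProducerArbiter implementation; `ConcatRequestAccounting` and `drain` are hypothetical names, and lists stand in for the concatenated sources.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Synchronous model of concat's request bookkeeping (illustrative only, no RxJava). */
final class ConcatRequestAccounting {
    /** Delivers at most {@code requested} values from the sources in order and returns what the child receives. */
    static List<Integer> drain(List<List<Integer>> sources, long requested) {
        List<Integer> delivered = new ArrayList<>();
        long remaining = requested; // outstanding child demand
        for (List<Integer> source : sources) {
            if (remaining == 0) {
                break; // demand satisfied; later sources are not started
            }
            // Starting a new source: conceptually request(remaining) from it.
            Iterator<Integer> it = source.iterator();
            long produced = 0;
            while (produced < remaining && it.hasNext()) {
                delivered.add(it.next());
                produced++;
            }
            // A source may end early; only the shortfall is carried over to the next one.
            remaining -= produced;
        }
        return delivered;
    }
}
```

With sources `[1, 2]`, `[3, 4, 5]`, `[6]` and a child request of 4, the first source under-delivers by two, so the second source is asked for only the remaining two values and the third is never started.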

@stealthcode

@akarnokd I hear you. Here is my concern. I have asked you directly for your feedback over the design and I have not received from you what I think is sufficient input to see that we both agree on the design or even the goals. My last impression from you regarding the AsyncOnSubscribe was that you simply didn't even agree with its existence and your questions seemed to indicate that you didn't understand why I implemented it. So when you commented on the pull request (after it was merged) with bugs without test code or details of how to reproduce, I was unclear about your interpretation of the design and asked you to file detailed issues with reproducible test code.

All of that said, I need confirmation that you and I are trying to achieve the same goals before I consider merging this.

@benjchristensen
Member

  1. Design

It seems much of the disagreement between you two is that the design and approach was not well discussed, and instead we have jumped to implementations without a common understanding. That discussion is dangling here: #3003 @akarnokd You need to involve yourself in that discussion where I and @stealthcode have weighed in, but not received further feedback.

  2. Rewriting Code

It is generally not helpful to start from scratch unless there is upfront agreement that the original should not be pursued or iterated upon. Forking in two directions dilutes the discussion, pits egos against each other, and prevents collaboration.

I view this PR as a rewrite. Looking at the two side-by-side shows hardly any code left behind, and the algorithms are changed, and it's hard to tell what is actually because of issues versus just stylistic differences between them. This in turn forces everyone to start from scratch in reading the code, understanding the issues and making decisions. If there is something fundamental about the original code that prevents correct behavior, then I suggest that be discussed up front with the original author.

  3. Communication & Collaboration

The underlying contention I sense between you two is shown in places such as:

Please communicate with respect for each other.

Another example of a statement that is not helpful:

You don't seem to understand how concat backpressure works:

@akarnokd I could say very similar things if I wanted to about bugs you wrote into the last revision of merge (1, 2), or the recent discussion about Subject in #3349. Please do not make things personal (such as in #3349 (comment)).

I too have submitted PRs with mistakes and bugs. All of us have. We do code reviews and patch releases for a reason; otherwise we would have finished this project a year ago.

Stick to objective statements of functionality, performance, and usage. A bug is a bug. A misunderstanding is just that. Overlooking something is easy, particularly when dealing with asynchrony, concurrency, flow control and the like. This particular AsyncOnSubscribe behavior is non-trivial, otherwise it wouldn't be this big of a discussion. Let's not trivialize it or attack people.

  4. Next Steps

Both of you (@akarnokd & @stealthcode) are great engineers, but with different strengths and weaknesses. That is a great thing for the project to have those diverse viewpoints and skills.

Please focus more on communication and less on the code for a bit. Code is honestly the easiest part of our job. In the long run, healthy collaboration is far more important than sprinting on code.

@akarnokd
Member Author

I did my part in reviewing the code and providing fixes. If you think the original version is what you wanted then it's up to you and there is no point in discussing this any further.

@stealthcode

You don't seem to understand how concat backpressure works

I misunderstood your original statement. My mistake.

I reviewed the code and it looks fine 👍

@stealthcode stealthcode reopened this Sep 28, 2015
stealthcode pushed a commit that referenced this pull request Sep 29, 2015
Fix to a bunch of bugs and issues with AsyncOnSubscribe
@stealthcode stealthcode merged commit f2410f8 into ReactiveX:1.x Sep 29, 2015
@akarnokd akarnokd deleted the AsyncOnSubscribeFix branch September 29, 2015 20:01