Implemented the AsyncOnSubscribe #3203

Merged
merged 1 commit into from Sep 11, 2015

stealthcode

This includes the first functioning unit tests. It still has a few iterations to go, but I'm opening the PR for review.

@akarnokd
Member

What's the purpose of this class?

@stealthcode
Author

This is the asynchronous complement to the SyncOnSubscribe as described in #3003.

@stealthcode
Author

The remaining issue with this implementation is that the inner observables are subscribed to an unbounded buffer. This could be improved to a backpressure-supporting RxRingBuffer that requests from its producer only enough to fill its buffer capacity and then, when it connects, drains the buffer and forwards all requests to its producer.

@stealthcode stealthcode force-pushed the async-on-subscribe branch 2 times, most recently from f54e46c to a68a219 Compare September 1, 2015 01:14
@akarnokd
Member

akarnokd commented Sep 4, 2015

I still don't understand this class. It looks very similar to a SyncOnSubscribe that emits Observables and flatMaps them for you.

@stealthcode
Author

I understand your concern but I'm not sure about your comment. This operator does not call flatMap. This issue comes from use cases at Netflix with large services and streaming work. I think it's pretty clear to see from the unit tests that this gives a user direct access to the request and allows fulfillment by any observable (which may or may not be asynchronous).

@stealthcode
Author

This is very similar to the work that you tried with the AbstractProducer. It allows batching and eager subscription.

@stealthcode
Author

This is supposed to help with creating observables from asynchronous data sources in a non-blocking way. Consider the example below...

public static void main(String[] args) {
    Observable.<Integer>create((Subscriber<? super Integer> s) -> {
        s.setProducer((long requested) -> {
            getData(requested)
                .observe()
                .subscribe(s::onNext, s::onError, s::onCompleted);
        });
    });
}

public static HystrixCommand<Integer> getData(long requested) {
    return null; // stub: a real command would issue the service call here
}

public static interface HystrixCommand<T> {
    public Observable<T> observe();
}

The use case here is that for each request we want to issue an asynchronous service request and, once that request is fulfilled, start onNexting its data. The problem with this naive implementation is that requests can easily overlap. You could implement the CAS addToRequestAndGet and make sure that only one thread at a time is fulfilling requests, but onNext events could still interleave or overlap.
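The request accounting described above could look roughly like the following sketch in plain Java. It is not RxJava code: `SerializedRequester` and its `fulfill` callback are hypothetical names, and `getAndAddRequest` is only an assumption about what a CAS-based request helper would do. The thread that moves the counter from zero becomes the only one fulfilling requests; calls arriving in the meantime just add to the counter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

/** Hypothetical helper (not RxJava API): lets one thread at a time fulfill requests. */
final class SerializedRequester {
    private final AtomicLong requested = new AtomicLong();
    private final LongConsumer fulfill; // assumed callback that serves one batch

    SerializedRequester(LongConsumer fulfill) {
        this.fulfill = fulfill;
    }

    /** CAS loop: adds n, capping at Long.MAX_VALUE, and returns the previous value. */
    static long getAndAddRequest(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            long next = current + n;
            if (next < 0) {
                next = Long.MAX_VALUE; // overflow means "effectively unbounded"
            }
            if (requested.compareAndSet(current, next)) {
                return current;
            }
        }
    }

    void request(long n) {
        if (n <= 0) {
            return;
        }
        if (getAndAddRequest(requested, n) != 0) {
            return; // someone else is already fulfilling; they will see our n
        }
        long batch = n;
        do {
            // If fulfill only *starts* asynchronous work and returns, the next
            // batch can begin before this one has emitted: the overlap remains.
            fulfill.accept(batch);
            batch = requested.addAndGet(-batch); // whatever arrived meanwhile
        } while (batch != 0);
    }

    public static void main(String[] args) {
        List<Long> batches = new ArrayList<>();
        SerializedRequester[] self = new SerializedRequester[1];
        self[0] = new SerializedRequester(batch -> {
            batches.add(batch);
            if (batch == 3) {
                self[0].request(2); // re-entrant request while fulfilling
            }
        });
        self[0].request(3);
        System.out.println(batches); // prints [3, 2]
    }
}
```

The gate serializes who starts fulfilling a batch, but as the comment in the loop notes, it does not by itself stop emissions from an asynchronous source overlapping with the next batch.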

@abersnaze
Contributor

it doesn't help with truly hot sources but it'll work great for progressive async sources. 👍

@benjchristensen
Member

I have reviewed this work with @stealthcode as he's been working on it and am quite interested in where this goes. It can greatly help us solve the most difficult part of using RxJava: creating correct sources that support backpressure.

I'll open an issue to summarize the needs for 1.x and 2.x so we can all get aligned on this topic.

I'm +1 on experimenting with this and eventually getting Observable.create overloads for all these helpers.

@benjchristensen
Member

Actually, there is already an issue: #3003

stealthcode pushed a commit that referenced this pull request Sep 11, 2015
@stealthcode stealthcode merged commit 84622bb into ReactiveX:1.x Sep 11, 2015

@Override
protected S generateState() {
return generator == null ? null : generator.call();
Member

It would be great, if this is to be migrated to 2.x, that user-supplied functions are called in try-catch. Perhaps not here but in call().
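As a rough illustration of this suggestion, a guarded call to a user-supplied generator might look like the sketch below. `GuardedGenerator` and the `onError` callback are hypothetical stand-ins, and `java.util.function.Supplier` is used in place of RxJava's function types so the example stays self-contained.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical sketch (not RxJava API): runs user code inside try-catch. */
final class GuardedGenerator {
    /**
     * Calls the user-supplied generator. If it throws, the error is handed
     * to onError and null is returned instead of letting it propagate.
     */
    static <S> S generateState(Supplier<? extends S> generator,
                               Consumer<? super Throwable> onError) {
        if (generator == null) {
            return null; // mirrors the null check in the reviewed snippet
        }
        try {
            return generator.get();
        } catch (Throwable t) {
            // A real implementation would likely rethrow fatal errors
            // (e.g. OutOfMemoryError) rather than swallow them here.
            onError.accept(t);
            return null;
        }
    }
}
```

Returning null on failure is a simplification: a caller cannot tell it apart from a generator that legitimately produced a null state, so a fuller version would signal failure separately.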

Author

It's merged into 1.x. Feel free to open a pull request to 2.x if you have time, or to 1.x if you feel it's necessary.

if (onNextCalled)
r = poll();
if (hasTerminated || isUnsubscribed()) {
parent.onUnsubscribe(state);
Member

If nextIteration has terminated, the queue may still hold requested values so it might be worth considering calling clear() before quitting.

Author

Good point. This would affect resubscriptions.

} else {
state.subscriber = s;
}
}
Collaborator

I'm not sure if this class could get used by multiple concurrent subscribers (?) but if it does then I would expect to see an atomic compareAndSet here rather than just !=null.
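For illustration, the difference could be sketched as follows. `SingleSubscriberSlot` is a hypothetical class, not the one under review; it only shows how `AtomicReference.compareAndSet` makes "check for null, then assign" a single atomic step, so two racing subscribers cannot both pass the check.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: a slot that accepts exactly one subscriber. */
final class SingleSubscriberSlot<T> {
    private final AtomicReference<T> subscriber = new AtomicReference<>();

    /**
     * Installs s only if no subscriber is set yet. Returns true for the one
     * caller that wins, false for everyone else, even under contention.
     */
    boolean trySet(T s) {
        return subscriber.compareAndSet(null, s);
    }

    /** Returns the installed subscriber, or null if none has been set. */
    T get() {
        return subscriber.get();
    }
}
```

A caller whose `trySet` returns false would then signal an error to the rejected subscriber, in place of the plain null check in the reviewed snippet.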

5 participants