New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implementing the SyncOnSubscribe #3118
Conversation
You can't delete |
*/ | ||
private static final class SubscriptionProducer<T> implements Producer { | ||
private final AtomicLong requestCount = new AtomicLong(0); | ||
private final AtomicLong sentCount = new AtomicLong(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sentCount appears to be accessed only in the emission loop so there is no need for an AtomicLong
here or any other type besides a boolean since only 1 onNext is allowed ber callback.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'ren't sure how much synchronization we needed to insure that a bad implementation didn't call onNext concurrently. The idea was we could use something like this on line 255
long old = sendCount.get();
...
subscriber.onNext(value);
if (!sentCount.compareAndSet(old, old+1)) {
throw new IllegalStateException("concurrent onNext detected");
}
instead of incrementAndGet()
to detect that something bad was happening as soon as possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. I've integrated this into my latest code.
Experimental APIs can be removed or change at any time, as per the README:
|
I would have expected to make such APIs @deprecated for a few releases before completely removing it. Besides, this PR just gives an alternative to another @experimental class and could live side-by-side. Then users could decide/vote which to keep at the next major release. |
@akarnokd you have a good point that the efficacy of both implementations should be compared however I do not think that we should expose multiple alternatives in the public api. This ambiguous api leads to confusion and questions about the unexplained differences between one or the other implementation. The pull request should not be the place for debate either (since this should be about issues with THIS implementation). This comparison and debate was what I was hoping to get out of opening issue #3003. Would you like to open a separate issue for comparing the 2 implementations? |
e7ed4e0
to
68f8eaa
Compare
The latest commit adds work stealing to manage unsubscribe to avoid concurrent modification of |
b02323a
to
3cf886b
Compare
The latest commit makes a few changes.
|
@stealthcode can you give us more detail on the JMH comparison? |
Sure, sorry for leaving that out. Here's the JMH report output
Under Flight Recorder analysis, both the AOS and SOS suffer from the cost of creating a |
@Benchmark | ||
// @Group("single") | ||
public void benchSyncOnSubscribe(final SingleInput input) { | ||
createSyncOnSubscribe(input.iterator).call(input.newSubscriber()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem with this is that the iterator gets consumed once and you then benchmark how fast an empty iterator comes to completion through the operator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I see Thanks @akarnokd ! I'm correcting and re-running now.
Here are the updated benchmarks. This paints a very different picture than last time. The performance of the
|
3cf886b
to
3dd1c5d
Compare
Here is a version where the integer stream is freshly created: (i7 4770K, Windows 7 x64, Java 8u51) AbstractOnSubscribe (AOS) has the highest overhead because of resource checks on each emission AbstractProducer (AP from #2813) offers amortized resource check overhead due to the inner loop but this loop was not particularly optimized with fast-path and/or amortized request accounting. FromIterable (FI) doesn't care about resources and has fast-path. SyncOnSubscribe (SO) in this PR is close but 33% behind FI. I've made some optimizations to the fast-path and got a much better result for longer sources:
One must be careful around volatile reads because it forces the re-read of instance variables after them. |
Nice, I'm merging your gist. This optimization makes the fast path on-par with FromIterable in the long tail (still however ~25% slower in the 1's - 10's range). |
* Used to thread the state synchronously to every iteration of the | ||
* {@link #next(Object, Subscriber) next} function. | ||
*/ | ||
private S state; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The state
should be per subscriber and not shared between them.
I spoke too soon. In running the unit tests there are now problems with the diff applied. I'll try to incorporate what I can and test to make sure the changes are more performant. |
Sorry, I didn't run any test so my changes were only valid for the perf usage and not general. Here is a version which passes the tests and has the following performance characteristics:
For size = 1, I think the extra allocation of the facade Subscriber makes it twice the overhead. The onNext safeguard also costs ~15%:
Edit:
The 1M case becomes quite varying, perhaps due to the huge amount of |
35ea407
to
b84884b
Compare
@akarnokd your changes are pretty solid. I can't argue with results. Here are the benchmarks for your changes alone.
I implemented a small api change to expose a setter on an unsubscribe
With this change we are now faster than |
return state; | ||
} | ||
}; | ||
Action1<S> onUnsubscribe = new Action1<S>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will trigger the addition of the unsubscription action in call()
.
b38f9dc
to
9a33b2d
Compare
See #3137 :) |
public static <S, T> SyncOnSubscribe<S, T> createWithState(Func0<S> generator, final Action2<S, Observer<? super T>> next) { | ||
Func2<S, Observer<? super T>, S> nextFunc = new Func2<S, Observer<? super T>, S>() { | ||
@Override | ||
public S call(S state, Observer<? super T> subscriber) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use parameter observer
rather than subscriber
now that the type has changed. This is in a few places.
4a791de
to
a2a38e0
Compare
Highlights of the recent changes includes:
I'm now happy with this pull request and believe it's ready to be merged. |
a2a38e0
to
ebd4afb
Compare
Looks good to me now, although I don't see how one could implement never() with it or a source that stops emitting but doesn't terminate. |
There is a test that implements never() behavior however it does not terminate where as the Observable.never() instance would simply terminate without setting a producer. |
Javadocs need some work here but i think that's the only thing remaining. |
This test times out: @Test(timeout = 1000)
public void testNever2() {
OnSubscribe<Integer> os = SyncOnSubscribe.createStateless(
new Action1<Observer<? super Integer>>() {
@Override
public void call(Observer<? super Integer> subscriber) {
}});
Observable.create(os).subscribe();
} |
Yes it does and I think that it should remain this way. It should be documented in the contract of using the
|
You could also extend the API and use some |
I cannot see a valid use case of That said I would rather not corrupt the api with a one-off concept of |
ebd4afb
to
f852f90
Compare
Javadoc is now updated. |
f852f90
to
8b5066a
Compare
8b5066a
to
26f14ea
Compare
26f14ea
to
ba7f910
Compare
👍 reviewed the code; looks good. |
Implementing the SyncOnSubscribe
This is useful for creating an observable that reads from a data source in a synchronous manner. This is a rewrite of the
AbstractOnSubscribe
experimental class based on the ideas in #3003.