Implementing the SyncOnSubscribe #3118

Merged
1 commit merged into ReactiveX:1.x on Sep 1, 2015


stealthcode

This is useful for creating an observable that reads from a data source in a synchronous manner. This is a rewrite of the AbstractOnSubscribe experimental class based on the ideas in #3003.
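To make the idea concrete, here is a minimal sketch of a stateful synchronous generator in plain Java. It is not the RxJava API: `MiniSyncGenerator`, `SyncSink` and `pull` are hypothetical names, and `java.util.function` types stand in for RxJava's `Func0`/`Func2`. State is created once per subscription and threaded through each call to `next`, which is invoked once per requested item until the sink sees `onCompleted`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for rx.Observer (not the RxJava type).
interface SyncSink<T> {
    void onNext(T value);
    void onCompleted();
}

// Hypothetical sketch of the SyncOnSubscribe idea: `generator` creates fresh
// state for each subscription, and `next` is called once per requested item,
// receiving the current state and returning the state for the following call.
final class MiniSyncGenerator<S, T> {
    private final Supplier<S> generator;
    private final BiFunction<S, SyncSink<T>, S> next;

    MiniSyncGenerator(Supplier<S> generator, BiFunction<S, SyncSink<T>, S> next) {
        this.generator = generator;
        this.next = next;
    }

    /** Subscribes, requests up to n items synchronously, and returns what was emitted. */
    List<T> pull(long n) {
        List<T> received = new ArrayList<>();
        boolean[] done = {false};
        SyncSink<T> sink = new SyncSink<T>() {
            @Override public void onNext(T value) { received.add(value); }
            @Override public void onCompleted() { done[0] = true; }
        };
        S state = generator.get();               // per-subscription state
        for (long i = 0; i < n && !done[0]; i++) {
            state = next.apply(state, sink);     // thread the state into the next call
        }
        return received;
    }
}
```

For example, a generator starting at state `1` whose `next` emits `s` (or completes once `s > 5`) and returns `s + 1` yields `[1, 2, 3]` for `pull(3)` and `[1, 2, 3, 4, 5]` for `pull(100)`, starting over each time because the state is created per subscription.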

@akarnokd
Member

You can't delete AbstractOnSubscribe because it is part of the public API; even though it is marked as experimental, such components can only be deleted at a major release such as 1.1.

*/
private static final class SubscriptionProducer<T> implements Producer {
private final AtomicLong requestCount = new AtomicLong(0);
private final AtomicLong sentCount = new AtomicLong(0);
Member

sentCount appears to be accessed only in the emission loop, so there is no need for an AtomicLong here, or for any type other than a boolean, since only one onNext is allowed per callback.

Contributor

We weren't sure how much synchronization we needed to ensure that a bad implementation didn't call onNext concurrently. The idea was that we could use something like this on line 255:

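long old = sentCount.get();
// ... produce value ...
subscriber.onNext(value);
// if another thread incremented sentCount in the meantime, onNext ran concurrently
if (!sentCount.compareAndSet(old, old + 1)) {
    throw new IllegalStateException("concurrent onNext detected");
}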

instead of incrementAndGet() to detect that something bad was happening as soon as possible.
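The boolean alternative suggested in the review can be sketched as follows, assuming the emission loop runs on a single thread. `OnNextGuard` and `runCallback` are hypothetical names. This only catches a second onNext within the same callback on the emitting thread; detecting truly concurrent calls is what the compareAndSet variant is for.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: enforces "at most one onNext per callback" with a plain
// boolean. Only valid when a single thread runs the emission loop.
final class OnNextGuard<T> {
    private final List<T> received = new ArrayList<>();
    private boolean onNextCalled; // reset before each callback

    void onNext(T value) {
        if (onNextCalled) {
            throw new IllegalStateException("onNext called more than once in a single callback");
        }
        onNextCalled = true;
        received.add(value);
    }

    /** Runs one user callback and reports whether it emitted a value. */
    boolean runCallback(Consumer<OnNextGuard<T>> callback) {
        onNextCalled = false;
        callback.accept(this);
        return onNextCalled;
    }

    List<T> received() {
        return received;
    }
}
```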

Author

Thanks. I've integrated this into my latest code.

@benjchristensen
Member

Experimental APIs can be removed or changed at any time, as per the README:

@Experimental

APIs marked with the @Experimental annotation at the class or method level will almost certainly change. They can be modified in any way, or even removed, at any time. You should not use or rely on them in any production code. They are purely to allow broad testing and feedback.

@akarnokd
Member

I would have expected such APIs to be marked @Deprecated for a few releases before being removed completely. Besides, this PR just provides an alternative to another @Experimental class, and the two could live side by side. Users could then decide/vote which one to keep at the next major release.

@stealthcode
Author

@akarnokd you make a good point that the efficacy of both implementations should be compared; however, I do not think that we should expose multiple alternatives in the public API. An ambiguous API leads to confusion and to questions about the unexplained differences between the two implementations. The pull request should not be the place for that debate either (it should be about issues with THIS implementation). That comparison and debate is what I was hoping to get out of opening issue #3003. Would you like to open a separate issue for comparing the two implementations?

@stealthcode
Author

The latest commit adds work stealing to manage unsubscription, so that state is not modified concurrently when onUnsubscribe runs at the same time as next (plus a test that exercises this), fixes tabs/spaces issues, and adds Javadoc.
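One way to read "work stealing" here is the following sketch, which is an assumption about the technique rather than the PR's code: a small state machine in which an unsubscribe that arrives while `next` is running only records the request, and the emitting thread runs the cleanup once its step returns. `CleanupSerializer` and its state names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: guarantees the cleanup action never runs concurrently
// with an emission step, and runs exactly once.
final class CleanupSerializer {
    private static final int IDLE = 0;
    private static final int EMITTING = 1;
    private static final int UNSUBSCRIBE_PENDING = 2; // unsubscribed while emitting
    private static final int DONE = 3;

    private final AtomicInteger status = new AtomicInteger(IDLE);
    private final Runnable cleanup;

    CleanupSerializer(Runnable cleanup) {
        this.cleanup = cleanup;
    }

    /** Runs one emission step; returns false if already unsubscribed. Assumes a single emitting thread. */
    boolean emit(Runnable step) {
        if (!status.compareAndSet(IDLE, EMITTING)) {
            return false;
        }
        try {
            step.run();
        } finally {
            // If the CAS fails, an unsubscribe arrived mid-step: the emitter
            // "steals" that work and performs the cleanup itself.
            if (!status.compareAndSet(EMITTING, IDLE)) {
                status.set(DONE);
                cleanup.run();
            }
        }
        return true;
    }

    /** May be called from any thread. */
    void unsubscribe() {
        for (;;) {
            int s = status.get();
            if (s == IDLE) {
                if (status.compareAndSet(IDLE, DONE)) {
                    cleanup.run(); // nobody is emitting, so clean up right here
                    return;
                }
            } else if (s == EMITTING) {
                if (status.compareAndSet(EMITTING, UNSUBSCRIBE_PENDING)) {
                    return; // the emitter will clean up when its step finishes
                }
            } else {
                return; // already unsubscribed
            }
        }
    }
}
```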

stealthcode force-pushed the sync-on-subscribe branch 2 times, most recently from b02323a to 3cf886b (August 6, 2015 21:24)
@stealthcode
Author

The latest commit makes a few changes.

  • Added AbstractOnSubscribe back into the code base. The @Experimental annotation states clearly that either of these implementations can be removed at any time, so nothing needs to be removed now. It is up to consumers to decide when they choose to use a potentially unstable API.
  • Added JMH tests to compare performance against OnSubscribeFromIterable. Performance is affected dramatically by the additional complexity of a cleanup function, which requires object initialization that OnSubscribeFromIterable does not need.
  • General optimizations: removed a volatile and reorganized the request loop.

@davidmoten
Collaborator

@stealthcode can you give us more detail on the JMH comparison?

@stealthcode
Author

Sure, sorry for leaving that out. Here's the JMH report output:

$ ./gradlew clean benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*SyncOnSubscribePerf.*'
Benchmark                                            (size)   Mode   Samples        Score  Score error    Units
r.o.SyncOnSubscribePerf.benchAbstractOnSubscribe          1  thrpt         5 10345304.158   167899.030    ops/s
r.o.SyncOnSubscribePerf.benchAbstractOnSubscribe2         1  thrpt         5 10315525.503   519004.209    ops/s
r.o.SyncOnSubscribePerf.benchAbstractOnSubscribe2      1000  thrpt         5 10080651.937  1249022.786    ops/s
r.o.SyncOnSubscribePerf.benchAbstractOnSubscribe2   1000000  thrpt         5  9676927.741   799435.391    ops/s
r.o.SyncOnSubscribePerf.benchFromIterable                 1  thrpt         5 245301608.512 25693364.239    ops/s
r.o.SyncOnSubscribePerf.benchFromIterable2                1  thrpt         5 261726979.345 17122836.564    ops/s
r.o.SyncOnSubscribePerf.benchFromIterable2             1000  thrpt         5 256818323.006 17992719.065    ops/s
r.o.SyncOnSubscribePerf.benchFromIterable2          1000000  thrpt         5 259483656.124 12863102.025    ops/s
r.o.SyncOnSubscribePerf.benchSyncOnSubscribe              1  thrpt         5 12770780.794   146195.400    ops/s
r.o.SyncOnSubscribePerf.benchSyncOnSubscribe2             1  thrpt         5 12306631.098  2072808.101    ops/s
r.o.SyncOnSubscribePerf.benchSyncOnSubscribe2          1000  thrpt         5 12326658.468   682619.251    ops/s
r.o.SyncOnSubscribePerf.benchSyncOnSubscribe2       1000000  thrpt         5 12519473.508   365884.932    ops/s

Under Flight Recorder analysis, both AOS and SOS suffer from the cost of creating a SubscriptionList and allocating memory, whereas OnSubscribeFromIterable does not add an unsubscribe action and so doesn't need to allocate. I plan on testing some changes to get around this bottleneck and will submit a pull request soon. Hopefully the upcoming change will be private and won't have to wait until 2.x.

@Benchmark
// @Group("single")
public void benchSyncOnSubscribe(final SingleInput input) {
createSyncOnSubscribe(input.iterator).call(input.newSubscriber());
Member

The problem with this is that the iterator gets consumed once and you then benchmark how fast an empty iterator comes to completion through the operator.
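The pitfall can be reproduced without JMH. In this hypothetical sketch, `drain` stands in for one benchmark invocation: a single shared iterator is empty after the first invocation, whereas creating a fresh iterator each time (for example from an `Iterable` held in the benchmark state) measures the full sequence on every invocation.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the benchmark pitfall: reusing one Iterator across
// invocations means only the first invocation sees any elements.
final class IteratorReuseDemo {
    /** Stands in for one benchmark invocation: consume everything the iterator yields. */
    static int drain(Iterator<Integer> it) {
        int count = 0;
        while (it.hasNext()) {
            it.next();
            count++;
        }
        return count;
    }

    static int[] runShared(List<Integer> data, int invocations) {
        Iterator<Integer> shared = data.iterator(); // created once, like an iterator kept in benchmark state
        int[] counts = new int[invocations];
        for (int i = 0; i < invocations; i++) {
            counts[i] = drain(shared);
        }
        return counts;
    }

    static int[] runFresh(List<Integer> data, int invocations) {
        int[] counts = new int[invocations];
        for (int i = 0; i < invocations; i++) {
            counts[i] = drain(data.iterator()); // fresh iterator per invocation
        }
        return counts;
    }
}
```

With three elements and three invocations, the shared variant counts `{3, 0, 0}` while the fresh variant counts `{3, 3, 3}`.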

Author

Oh, I see. Thanks @akarnokd! I'm correcting it and re-running now.

@stealthcode
Author

Here are the updated benchmarks. They paint a very different picture than last time. SyncOnSubscribe is now ~25% slower than OnSubscribeFromIterable for small iterators and ~17% slower for large iterators. The bottleneck seems to be in the SyncOnSubscribe$SubscriptionProducer.request(long) method.

# Run complete. Total time: 00:02:32

Benchmark                                            (size)   Mode   Samples        Score  Score error    Units
r.o.SyncOnSubscribePerf.benchAbstractOnSubscribe          1  thrpt         5  8057762.938   350532.503    ops/s
r.o.SyncOnSubscribePerf.benchAbstractOnSubscribe2         1  thrpt         5  8150650.148   174362.683    ops/s
r.o.SyncOnSubscribePerf.benchAbstractOnSubscribe2      1000  thrpt         5    41560.383      737.346    ops/s
r.o.SyncOnSubscribePerf.benchAbstractOnSubscribe2   1000000  thrpt         5       41.237        8.422    ops/s
r.o.SyncOnSubscribePerf.benchFromIterable                 1  thrpt         5 19134647.666   420991.146    ops/s
r.o.SyncOnSubscribePerf.benchFromIterable2                1  thrpt         5 18720840.805   327775.879    ops/s
r.o.SyncOnSubscribePerf.benchFromIterable2             1000  thrpt         5   183561.551     5244.489    ops/s
r.o.SyncOnSubscribePerf.benchFromIterable2          1000000  thrpt         5      184.511        4.020    ops/s
r.o.SyncOnSubscribePerf.benchSyncOnSubscribe              1  thrpt         5 11358568.468   298560.809    ops/s
r.o.SyncOnSubscribePerf.benchSyncOnSubscribe2             1  thrpt         5 11367669.868   133428.078    ops/s
r.o.SyncOnSubscribePerf.benchSyncOnSubscribe2          1000  thrpt         5   132173.869     2670.427    ops/s
r.o.SyncOnSubscribePerf.benchSyncOnSubscribe2       1000000  thrpt         5      130.224        3.425    ops/s

@akarnokd
Member

akarnokd commented Aug 7, 2015

Here is a version where the integer stream is freshly created:

[Benchmark chart not preserved]

(i7 4770K, Windows 7 x64, Java 8u51)

AbstractOnSubscribe (AOS) has the highest overhead because of the resource checks on each emission.

AbstractProducer (AP from #2813) offers amortized resource check overhead due to the inner loop but this loop was not particularly optimized with fast-path and/or amortized request accounting.

FromIterable (FI) doesn't care about resources and has fast-path.

SyncOnSubscribe (SO) in this PR is close but 33% behind FI. I've made some optimizations to the fast-path and got a much better result for longer sources:

 (size)        Score  Score error    Units
      1 13969673,045   263281,521    ops/s
   1000   237194,622     1475,414    ops/s
1000000      226,248       11,892    ops/s
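The fast path and amortized request accounting mentioned above can be sketched as follows. This is a simplified, hypothetical producer (`FastPathProducer` is not an RxJava class) that collects into a list instead of calling a Subscriber and omits completion and error signals: unbounded demand (`Long.MAX_VALUE`) skips per-item bookkeeping entirely, and bounded demand subtracts the emitted count once per batch rather than once per item.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of request handling with a fast path for unbounded
// demand and batched (amortized) request accounting.
final class FastPathProducer {
    private final Iterator<Integer> source;
    private final List<Integer> received = new ArrayList<>();
    private final AtomicLong requested = new AtomicLong();

    FastPathProducer(Iterator<Integer> source) {
        this.source = source;
    }

    void request(long n) {
        if (n <= 0) {
            return;
        }
        long prev;
        long next;
        do {
            prev = requested.get();
            if (prev == Long.MAX_VALUE) {
                return; // already unbounded
            }
            next = prev + n;
            if (next < 0) {
                next = Long.MAX_VALUE; // cap on overflow
            }
        } while (!requested.compareAndSet(prev, next));
        if (prev != 0) {
            return; // another call is already in the emission loop
        }
        if (next == Long.MAX_VALUE) {
            fastPath();
        } else {
            slowPath(next);
        }
    }

    private void fastPath() {
        // Unbounded demand: no per-item request bookkeeping at all.
        while (source.hasNext()) {
            received.add(source.next());
        }
    }

    private void slowPath(long r) {
        for (;;) {
            long emitted = 0;
            while (emitted != r && source.hasNext()) {
                received.add(source.next());
                emitted++;
            }
            if (!source.hasNext()) {
                return; // source exhausted (onCompleted would go here)
            }
            // One atomic update per batch instead of one per item.
            r = requested.addAndGet(-emitted);
            if (r == 0) {
                return;
            }
        }
    }

    List<Integer> received() {
        return received;
    }
}
```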

One must be careful around volatile reads, because they force instance variables read after them to be re-read.
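A common way to follow this advice, sketched here with hypothetical names, is to copy instance fields into locals before the emission loop, so that the volatile read of a cancellation flag on every iteration does not make the JIT reload those fields each time. The functional behavior is the same either way; any benefit shows up only in the generated code.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: hoist instance fields into locals before a loop that
// performs a volatile read on every iteration.
final class LocalCachingEmitter {
    private final Iterator<Integer> source;
    private final List<Integer> out;
    private volatile boolean cancelled; // may be set from another thread

    LocalCachingEmitter(Iterator<Integer> source, List<Integer> out) {
        this.source = source;
        this.out = out;
    }

    void cancel() {
        cancelled = true;
    }

    /** Emits up to n items and returns how many were emitted. */
    long emit(long n) {
        // Read the fields once; inside the loop only the locals and the
        // volatile flag are touched.
        final Iterator<Integer> it = source;
        final List<Integer> sink = out;
        long emitted = 0;
        while (emitted < n && !cancelled && it.hasNext()) {
            sink.add(it.next());
            emitted++;
        }
        return emitted;
    }
}
```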

@stealthcode
Author

Nice, I'm merging your gist. This optimization puts the fast path on par with FromIterable in the long tail (though it is still ~25% slower in the 1s to 10s range).

* Used to thread the state synchronously to every iteration of the
* {@link #next(Object, Subscriber) next} function.
*/
private S state;
Member

The state should be per subscriber and not shared between them.
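The difference matters as soon as there is more than one subscriber. In this hypothetical sketch, `CounterSource` asks its state factory for state on every subscription: a factory that returns one shared object lets the second subscriber continue where the first stopped, while a factory that allocates fresh state restarts the sequence for each subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch: the state factory is invoked once per subscription.
final class CounterSource {
    private final Supplier<int[]> stateFactory;

    CounterSource(Supplier<int[]> stateFactory) {
        this.stateFactory = stateFactory;
    }

    /** One subscription that takes `count` values, advancing its own state. */
    List<Integer> subscribe(int count) {
        int[] state = stateFactory.get();
        List<Integer> values = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            values.add(state[0]++);
        }
        return values;
    }
}
```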

@stealthcode
Author

I spoke too soon. In running the unit tests there are now problems with the diff applied. I'll try to incorporate what I can and test to make sure the changes are more performant.

@akarnokd
Member

akarnokd commented Aug 7, 2015

Sorry, I didn't run any tests, so my changes were only valid for the perf usage and not in general. Here is a version that passes the tests and has the following performance characteristics:

Benchmark                     (size)   Mode   Samples        Score  Score error    Units
benchSyncOnSubscribe2Fresh         1  thrpt         5 18314010,215    78882,829    ops/s
benchSyncOnSubscribe2Fresh      1000  thrpt         5   223973,659     2605,564    ops/s
benchSyncOnSubscribe2Fresh   1000000  thrpt         5      228,057        1,400    ops/s

For size = 1, I think the extra allocation of the facade Subscriber makes it twice the overhead. The onNext safeguard also costs ~15%:

Benchmark                     (size)   Mode   Samples        Score  Score error    Units
benchSyncOnSubscribe2Fresh         1  thrpt         5 18356265,548   512130,369    ops/s
benchSyncOnSubscribe2Fresh      1000  thrpt         5   268613,838     7801,208    ops/s
benchSyncOnSubscribe2Fresh   1000000  thrpt         5      274,654        7,522    ops/s

Edit:
If I change the API to take an Observer, which saves the facade allocation, I get this:

Benchmark                     (size)   Mode   Samples        Score  Score error    Units
benchSyncOnSubscribe2Fresh         1  thrpt         5 19915549,079   499085,570    ops/s
benchSyncOnSubscribe2Fresh      1000  thrpt         5   243045,859     4557,930    ops/s
benchSyncOnSubscribe2Fresh   1000000  thrpt         5      201,082      124,888    ops/s

The 1M case becomes quite varying, perhaps due to the huge amount of Integer garbage or the change in what the JIT inlines.

stealthcode force-pushed the sync-on-subscribe branch 2 times, most recently from 35ea407 to b84884b (August 7, 2015 22:37)
@stealthcode
Author

@akarnokd your changes are pretty solid. I can't argue with results. Here are the benchmarks for your changes alone.

Benchmark                                        (size)   Mode   Samples        Score  Score error    Units
r.o.SyncOnSubscribePerf.benchFromIterable2            1  thrpt         5 17907742.680  1458787.396    ops/s
r.o.SyncOnSubscribePerf.benchFromIterable2         1000  thrpt         5   181328.434     3485.931    ops/s
r.o.SyncOnSubscribePerf.benchFromIterable2      1000000  thrpt         5      181.733        9.492    ops/s
r.o.SyncOnSubscribePerf.benchSyncOnSubscribe2         1  thrpt         5 14414752.664   572789.234    ops/s
r.o.SyncOnSubscribePerf.benchSyncOnSubscribe2      1000  thrpt         5   182578.431    21732.491    ops/s
r.o.SyncOnSubscribePerf.benchSyncOnSubscribe2   1000000  thrpt         5      186.404        6.581    ops/s

I implemented a small API change that exposes a setter for an unsubscribe Action1<S>. With this change we can check whether there is no unsubscribe action (and avoid a SubscriptionList allocation).

Benchmark                                        (size)   Mode   Samples        Score  Score error    Units
r.o.SyncOnSubscribePerf.benchFromIterable2            1  thrpt         5 17832090.877  3436213.149    ops/s
r.o.SyncOnSubscribePerf.benchFromIterable2         1000  thrpt         5   180643.547    10720.476    ops/s
r.o.SyncOnSubscribePerf.benchFromIterable2      1000000  thrpt         5      179.589       20.317    ops/s
r.o.SyncOnSubscribePerf.benchSyncOnSubscribe2         1  thrpt         5 17809008.594   478600.206    ops/s
r.o.SyncOnSubscribePerf.benchSyncOnSubscribe2      1000  thrpt         5   181643.455    45353.810    ops/s
r.o.SyncOnSubscribePerf.benchSyncOnSubscribe2   1000000  thrpt         5      183.044        7.280    ops/s

With this change we are now faster than OnSubscribeFromIterable in the short and long cases (assuming we don't have to cleanup with unsubscribe).
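The allocation saving can be illustrated with a hypothetical sketch (`OptionalCleanupSubscription` is not an RxJava type, and a plain `List<Runnable>` stands in for `SubscriptionList`): the container for unsubscribe actions is only allocated when a cleanup callback was actually supplied, so the no-cleanup case allocates nothing extra.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: a List<Runnable> stands in for SubscriptionList and is
// only allocated when an unsubscribe callback was supplied.
final class OptionalCleanupSubscription<S> {
    private List<Runnable> unsubscribeActions; // stays null in the no-cleanup case

    OptionalCleanupSubscription(S state, Consumer<? super S> onUnsubscribe) {
        if (onUnsubscribe != null) {
            unsubscribeActions = new ArrayList<>(1);
            unsubscribeActions.add(() -> onUnsubscribe.accept(state));
        }
    }

    boolean allocatedCleanupList() {
        return unsubscribeActions != null;
    }

    /** Runs the cleanup at most once; a no-op when nothing was registered. Not thread-safe. */
    void unsubscribe() {
        List<Runnable> actions = unsubscribeActions;
        if (actions != null) {
            unsubscribeActions = null;
            for (Runnable action : actions) {
                action.run();
            }
        }
    }
}
```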

return state;
}
};
Action1<S> onUnsubscribe = new Action1<S>() {
Member

This will trigger the addition of the unsubscription action in call().

@akarnokd
Member

akarnokd commented Aug 8, 2015

"With this change we are now faster than OnSubscribeFromIterable"

See #3137 :)

public static <S, T> SyncOnSubscribe<S, T> createWithState(Func0<S> generator, final Action2<S, Observer<? super T>> next) {
Func2<S, Observer<? super T>, S> nextFunc = new Func2<S, Observer<? super T>, S>() {
@Override
public S call(S state, Observer<? super T> subscriber) {
Collaborator

Can we name the parameter observer rather than subscriber, now that the type has changed? This occurs in a few places.
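For reference, the adapter in `createWithState` follows a common pattern: wrap a callback that returns nothing so that it returns the same state object it was given. Below is a minimal generic sketch using `java.util.function` in place of RxJava's `Action2`/`Func2` (`StateAdapters.keepState` is a hypothetical name), with the parameter named `observer` as suggested.

```java
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

// Hypothetical helper mirroring the createWithState adapter: the same
// (mutable) state object is handed back, so it is threaded to every call.
final class StateAdapters {
    static <S, O> BiFunction<S, O, S> keepState(BiConsumer<S, O> next) {
        return (state, observer) -> {
            next.accept(state, observer);
            return state;
        };
    }
}
```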

stealthcode force-pushed the sync-on-subscribe branch 2 times, most recently from 4a791de to a2a38e0 (August 19, 2015 22:42)
@stealthcode
Author

Highlights of the recent changes include:

  • Removed the weird onUnsubscribe(Action1) method on SyncOnSubscribe.
  • Added new tests and fixed generics issues (not enough ?s).
  • Added Blackhole.consumeCPU(long) to the perf tests in order to artificially reduce volatile write contention in the micro benchmarks.

I'm now happy with this pull request and believe it's ready to be merged.

@akarnokd
Member

Looks good to me now, although I don't see how one could implement never() with it or a source that stops emitting but doesn't terminate.

@stealthcode
Author

There is a test that implements never() behavior; however, it does not terminate, whereas Observable.never() would simply return without setting a producer.

@stealthcode
Author

The Javadoc needs some work here, but I think that's the only thing remaining.

@akarnokd
Member

This test times out:

    @Test(timeout = 1000)
    public void testNever2() {
        OnSubscribe<Integer> os = SyncOnSubscribe.createStateless(
        new Action1<Observer<? super Integer>>() {
            @Override
            public void call(Observer<? super Integer> subscriber) {

            }});
        Observable.create(os).subscribe();
    }

@stealthcode
Author

Yes, it does, and I think it should remain this way. The contract of SyncOnSubscribe should document that the emission loop will not terminate if your next function never calls observer.onCompleted() or observer.onError(e). It's similar to an Observable.create call with an OnSubscribe that synchronously loops over each n for every request(n). Such a loop only terminates when...

  • the consumer is done (request 0)
  • the producer declares that its done (via terminal event)
  • an error occurs or subscription is otherwise unsubscribed (take)
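The three exit conditions can be sketched as a simplified loop. This is a hypothetical model, not the PR's producer (`RequestLoop`, `Emitter` and `Downstream` are illustrative names): only calls that actually emit count against demand, so a `next` callback that neither emits nor terminates keeps the loop spinning, which is the behavior `testNever2` runs into.

```java
import java.util.function.Consumer;

// Hypothetical model of a synchronous request loop (not the PR's producer).
final class RequestLoop {
    enum Exit { REQUEST_SATISFIED, TERMINATED, UNSUBSCRIBED }

    /** Per-call callback surface; a simplified stand-in for rx.Observer. */
    static final class Emitter {
        boolean emitted;    // onNext was called during the current callback
        boolean terminated; // a terminal event (only onCompleted here) was signalled
        int value;

        void onNext(int v) {
            emitted = true;
            value = v;
        }

        void onCompleted() {
            terminated = true;
        }
    }

    interface Downstream {
        void accept(int value);
        boolean isUnsubscribed();
    }

    static Exit drain(long requested, Consumer<Emitter> next, Downstream downstream) {
        Emitter e = new Emitter();
        long remaining = requested;
        while (true) {
            if (remaining == 0) {
                return Exit.REQUEST_SATISFIED; // the consumer is done for now
            }
            if (downstream.isUnsubscribed()) {
                return Exit.UNSUBSCRIBED;      // e.g. take(n) has had enough
            }
            e.emitted = false;
            next.accept(e);
            if (e.emitted) {
                downstream.accept(e.value);
                remaining--;
            }
            if (e.terminated) {
                return Exit.TERMINATED;        // the producer declared it is done
            }
            // A callback that neither emits nor terminates leaves `remaining`
            // unchanged, so the loop keeps calling it (the testNever2 case).
        }
    }
}
```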

@akarnokd
Member

You could also extend the API and use an ObserverEx that implements Observer plus a stop() method, then document that if you wish to stop emitting items without terminating the stream, you call stop().
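The proposal (which was not adopted in this PR) might look like the following hypothetical sketch, where `MiniObserver` stands in for `rx.Observer` and `stop()` ends the generator loop without emitting a terminal event. This is how a never()-like source could be expressed under that design.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for rx.Observer.
interface MiniObserver<T> {
    void onNext(T t);
    void onCompleted();
}

// Hypothetical sketch of the proposed ObserverEx: Observer plus stop().
interface ObserverEx<T> extends MiniObserver<T> {
    /** Stop calling the generator without emitting a terminal event. */
    void stop();
}

final class StoppableLoop<T> implements ObserverEx<T> {
    final List<T> received = new ArrayList<>();
    boolean completed;
    private boolean stopped;

    @Override public void onNext(T t) { received.add(t); }
    @Override public void onCompleted() { completed = true; }
    @Override public void stop() { stopped = true; }

    /** Calls next until `requested` items were received, the stream completed, or next called stop(). */
    void drain(long requested, Consumer<ObserverEx<T>> next) {
        while (received.size() < requested && !completed && !stopped) {
            next.accept(this);
        }
    }
}
```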

@stealthcode
Author

I cannot see a valid use case of Observable.never() aside from merging in with an asynchronous hot observable such as Observable.combineLatest(someFuncThatMayReturnNever(foo), hotObs). It seems like a tool to force merging observables in ways that obviate the need to merge at all. Are there use cases that I am missing?

That said, I would rather not corrupt the API with a one-off stop() concept without good reason. It's important that we get the API right (even in @Experimental).

@stealthcode
Copy link
Author

Javadoc is now updated.

@abersnaze
Copy link
Contributor

👍 reviewed the code; looks good.

stealthcode pushed a commit that referenced this pull request Sep 1, 2015
stealthcode merged commit 1682f64 into ReactiveX:1.x on Sep 1, 2015
5 participants