GroupBy

divide an Observable into a set of Observables that each emit a different subset of items from the original Observable


The GroupBy operator divides an Observable that emits items into an Observable that emits Observables, each one of which emits some subset of the items from the original source Observable. Which items end up on which Observable is typically decided by a discriminating function that evaluates each item and assigns it a key. All items with the same key are emitted by the same Observable.
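Here is a minimal sketch of the key-assignment idea in plain JavaScript. It assumes a finite, synchronous source held in an array. It is not the ReactiveX API: `groupByKey` is a hypothetical helper that returns ordinary arrays instead of Observables. It does follow the same rule, though: every item with the same key ends up in the same group.

```javascript
// Conceptual sketch of groupBy over a finite array (not the Rx API).
// groupByKey is a hypothetical helper: keySelector assigns each item a key,
// and items sharing a key are collected into the same group, in the order
// their keys first appear.
function groupByKey(items, keySelector) {
  const groups = new Map(); // key -> array of items with that key
  for (const item of items) {
    const key = keySelector(item);
    if (!groups.has(key)) {
      groups.set(key, []); // the first item with a new key opens a new group
    }
    groups.get(key).push(item);
  }
  // Pair each group with its key, much as a grouped Observable exposes its key
  return Array.from(groups, ([key, values]) => ({ key, values }));
}

const words = ["apple", "avocado", "banana", "blueberry", "cherry"];
const byFirstLetter = groupByKey(words, (w) => w[0]);
console.log(byFirstLetter);
```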


Language-Specific Information:

groupBy

RxGroovy implements the groupBy operator. The Observable it returns emits items of a particular subclass of Observable, the GroupedObservable. A GroupedObservable has an additional method, getKey, which returns the key that designated the items for that particular GroupedObservable.

The following sample code uses groupBy to transform a list of numbers into two lists, grouped by whether or not the numbers are even:

Sample Code

def numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);
def groupFunc = { return(0 == (it % 2)); };

numbers.groupBy(groupFunc).flatMap({ it.reduce([it.getKey()], {a, b -> a << b}) }).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
[false, 1, 3, 5, 7, 9]
[true, 2, 4, 6, 8]
Sequence complete
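You can trace the reduction in the sample above without any Rx library. This hypothetical sketch uses plain JavaScript arrays in place of Observables. Each group's reduce is seeded with the group's key, which is why `false` or `true` appears at the front of each printed list.

```javascript
// Mirrors the Groovy sample with plain arrays (not RxGroovy or RxJS code):
// group 1..9 by evenness, then reduce each group into a list seeded with
// its key, like it.reduce([it.getKey()], { a, b -> a << b }).
const numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9];
const isEven = (n) => n % 2 === 0;

const groups = new Map(); // key (true/false) -> items with that key
for (const n of numbers) {
  const key = isEven(n);
  if (!groups.has(key)) groups.set(key, []);
  groups.get(key).push(n);
}

// Start each reduction from [key], then append every item in the group
const lists = Array.from(groups, ([key, items]) =>
  items.reduce((acc, item) => {
    acc.push(item);
    return acc;
  }, [key])
);

for (const list of lists) console.log(list);
console.log("Sequence complete");
```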

Another version of groupBy allows you to pass in a transformative function that changes the elements before they are emitted by the resulting GroupedObservables.
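Here is a rough sketch of that variant, using plain arrays rather than Observables. The key is computed from the original item, while the group collects the transformed value. `groupByWithSelector` is a hypothetical name used only for illustration.

```javascript
// Sketch of groupBy with an element selector (hypothetical helper, not an Rx API):
// keySelector picks the group, elementSelector transforms what the group holds.
function groupByWithSelector(items, keySelector, elementSelector) {
  const groups = new Map();
  for (const item of items) {
    const key = keySelector(item); // the key comes from the original item
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(elementSelector(item)); // the group stores the transformed value
  }
  return groups;
}

const people = [
  { name: "Ada", dept: "eng" },
  { name: "Grace", dept: "eng" },
  { name: "Linus", dept: "ops" },
];
const namesByDept = groupByWithSelector(people, (p) => p.dept, (p) => p.name);
console.log(namesByDept.get("eng")); // [ 'Ada', 'Grace' ]
```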

Note that when groupBy splits up the source Observable into an Observable that emits GroupedObservables, each of these GroupedObservables begins to buffer the items that it will emit upon subscription. If you ignore any of these GroupedObservables (you neither subscribe to it nor apply an operator to it that subscribes to it), this buffer presents a potential memory leak. So rather than ignoring a GroupedObservable that you have no interest in observing, you should apply an operator like take(0) to it. This signals that it may discard its buffer.
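The following toy model shows why an ignored group leaks. It is a hypothetical sketch, not the RxGroovy implementation. Each group holds on to its items until something subscribes. Only a consumer that explicitly signals disinterest (the role take(0) plays here) lets the group drop them. `makeGroup`, `runGroupBy`, and `dispose` are invented names. As a simplification, a disposed group in this model stays registered and silently discards later items for its key.

```javascript
// Toy model of group buffering (hypothetical, not the Rx implementation).
function makeGroup(key) {
  let buffer = [];       // items received before anyone subscribed
  let subscriber = null; // at most one subscriber in this toy model
  let disposed = false;  // set when the consumer signals "not interested"
  return {
    getKey: () => key,
    push(item) {
      if (disposed) return;             // a disposed group retains nothing
      if (subscriber) subscriber(item); // live delivery once subscribed
      else buffer.push(item);           // otherwise the item is held in memory
    },
    subscribe(fn) {
      subscriber = fn;
      buffer.forEach(fn); // replay anything buffered so far, then release it
      buffer = [];
    },
    dispose() {           // stands in for applying take(0) to the group
      disposed = true;
      buffer = [];
    },
    bufferedCount: () => buffer.length,
  };
}

// Synchronous groupBy driver: announces each new group before its first item.
function runGroupBy(items, keySelector, onGroup) {
  const groups = new Map();
  for (const item of items) {
    const key = keySelector(item);
    if (!groups.has(key)) {
      const group = makeGroup(key);
      groups.set(key, group);
      onGroup(group);
    }
    groups.get(key).push(item);
  }
  return groups;
}

const parity = (n) => (n % 2 === 0 ? "even" : "odd");
const received = [];

// Only "even" is observed; the ignored "odd" group keeps buffering.
const leaky = runGroupBy([1, 2, 3, 4, 5, 6], parity, (g) => {
  if (g.getKey() === "even") g.subscribe((x) => received.push(x));
});
console.log(leaky.get("odd").bufferedCount()); // 3

// The unwanted "odd" group is disposed, so nothing is retained for it.
const tidy = runGroupBy([1, 2, 3, 4, 5, 6], parity, (g) => {
  if (g.getKey() === "even") g.subscribe(() => {});
  else g.dispose();
});
console.log(tidy.get("odd").bufferedCount()); // 0
```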

If you unsubscribe from one of the GroupedObservables, or if an operator like take that you apply to the GroupedObservable unsubscribes from it, that GroupedObservable will be terminated. If the source Observable later emits an item whose key matches the GroupedObservable that was terminated in this way, groupBy will create and emit a new GroupedObservable to match the key. In other words, unsubscribing from a GroupedObservable will not cause groupBy to swallow items from its group. For example, see the following code:

Sample Code

Observable.range(1,5)
          .groupBy({ 0 })              // every item is assigned the same key, 0
          .flatMap({ it.take(1) })     // take one item from each group, then unsubscribe
          .subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
1
2
3
4
5

In the above code, the source Observable emits the sequence { 1 2 3 4 5 }. When it emits the first item in this sequence, the groupBy operator creates and emits a GroupedObservable with the key 0. The flatMap operator applies take(1) to that GroupedObservable. take(1) emits the item (1), then unsubscribes from the GroupedObservable, which terminates it. When the source Observable emits the second item in its sequence, the groupBy operator creates and emits a second GroupedObservable with the same key (0) to replace the one that was terminated. flatMap again applies take(1) to this new GroupedObservable, which emits the new item (2) and then unsubscribes from and terminates the GroupedObservable. This process repeats for the remaining items in the source sequence.
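The following plain JavaScript is a hypothetical sketch of that replacement behaviour. `groupByWithTermination` and `takeN` are invented helpers that mimic the roles of groupBy and take(1), assuming a synchronous, array-based source. They are not RxGroovy APIs. Replaying the sample's input shows one fresh group per item.

```javascript
// Hypothetical sketch: a terminated group is forgotten, so the next item
// with the same key opens a new group.
function groupByWithTermination(items, keySelector, onGroup) {
  const open = new Map(); // key -> currently active group
  let groupsCreated = 0;
  for (const item of items) {
    const key = keySelector(item);
    let group = open.get(key);
    if (!group) {
      group = { key, active: true, listener: null };
      open.set(key, group);
      groupsCreated++;
      onGroup(group); // the consumer attaches a listener to the new group
    }
    if (group.listener) group.listener(item);
    // Once its consumer has unsubscribed, the group is terminated and dropped
    if (!group.active) open.delete(key);
  }
  return groupsCreated;
}

// take(n) in miniature: forward n items, then unsubscribe from the group
function takeN(n, group, sink) {
  let remaining = n;
  group.listener = (item) => {
    if (remaining > 0) {
      remaining--;
      sink(item);
    }
    if (remaining === 0) group.active = false;
  };
}

const emitted = [];
const created = groupByWithTermination([1, 2, 3, 4, 5], () => 0, (g) =>
  takeN(1, g, (x) => emitted.push(x))
);
console.log(emitted); // [ 1, 2, 3, 4, 5 ]
console.log(created); // 5
```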

groupBy does not by default operate on any particular Scheduler.

groupBy

RxJava implements the groupBy operator. The Observable it returns emits items of a particular subclass of Observable, the GroupedObservable. A GroupedObservable has an additional method, getKey, which returns the key that designated the items for that particular GroupedObservable.

Another version of groupBy allows you to pass in a transformative function that changes the elements before they are emitted by the resulting GroupedObservables.

Note that when groupBy splits up the source Observable into an Observable that emits GroupedObservables, each of these GroupedObservables begins to buffer the items that it will emit upon subscription. If you ignore any of these GroupedObservables (you neither subscribe to it nor apply an operator to it that subscribes to it), this buffer presents a potential memory leak. So rather than ignoring a GroupedObservable that you have no interest in observing, you should apply an operator like take(0) to it. This signals that it may discard its buffer.

If you unsubscribe from one of the GroupedObservables, that GroupedObservable will be terminated. If the source Observable later emits an item whose key matches the GroupedObservable that was terminated in this way, groupBy will create and emit a new GroupedObservable to match the key.

groupBy does not by default operate on any particular Scheduler.

groupBy

RxJS implements groupBy. It takes one to three parameters:

  1. (required) a function that accepts an item from the source Observable and returns its key
  2. a function that accepts an item from the source Observable and returns an item to be emitted in its place by one of the resulting Observables
  3. a function used to compare two keys for identity (that is, whether items with two keys should be emitted on the same Observable)
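This sketch illustrates how the optional third parameter changes grouping. It uses plain JavaScript with a hypothetical `groupByWithComparer` helper rather than the RxJS implementation. Keys are matched with a caller-supplied equality function instead of `===`, so keys that differ only in case share a group.

```javascript
// Sketch of groupBy with a key comparer (hypothetical helper, not RxJS):
// groups are searched linearly and matched with comparer(existingKey, newKey).
function groupByWithComparer(items, keySelector, elementSelector, comparer) {
  const groups = []; // array of { key, values }
  for (const item of items) {
    const key = keySelector(item);
    let group = groups.find((g) => comparer(g.key, key));
    if (!group) {
      group = { key, values: [] }; // the first key seen becomes the group's key
      groups.push(group);
    }
    group.values.push(elementSelector(item));
  }
  return groups;
}

// Case-insensitive keys: "Up" and "up" land in the same group
const presses = ["Up", "up", "Down", "LEFT", "left"];
const grouped = groupByWithComparer(
  presses,
  (s) => s,
  (s) => s.toUpperCase(),
  (a, b) => a.toLowerCase() === b.toLowerCase()
);
const summary = grouped.map((g) => g.key + ": " + g.values.join(","));
console.log(summary);
```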

Sample Code

var codes = [
    { keyCode: 38}, // up
    { keyCode: 38}, // up
    { keyCode: 40}, // down
    { keyCode: 40}, // down
    { keyCode: 37}, // left
    { keyCode: 39}, // right
    { keyCode: 37}, // left
    { keyCode: 39}, // right
    { keyCode: 66}, // b
    { keyCode: 65}  // a
];

var source = Rx.Observable.fromArray(codes)
    .groupBy(
        function (x) { return x.keyCode; },
        function (x) { return x.keyCode; });

var subscription = source.subscribe(
    function (obs) {
        // Print the count
        obs.count().subscribe(function (x) {
            console.log('Count: ' + x);
        });
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Count: 2
Count: 2
Count: 2
Count: 2
Count: 1
Count: 1
Completed

groupBy is found in each of the following distributions:

  • rx.all.js
  • rx.all.compat.js
  • rx.coincidence.js

groupByUntil

RxJS also implements groupByUntil. For each keyed Observable it opens, it calls a duration selector function that returns an Observable. When that Observable emits an item, groupByUntil closes the corresponding keyed Observable. It will open a new one if the source Observable later emits additional items that match the key. groupByUntil takes from two to four parameters:

  1. (required) a function that accepts an item from the source Observable and returns its key
  2. a function that accepts an item from the source Observable and returns an item to be emitted in its place by one of the resulting Observables
  3. (required) a function, called for each newly opened keyed Observable, that returns an Observable whose emission triggers the termination of that keyed Observable
  4. a function used to compare two keys for identity (that is, whether items with two keys should be emitted on the same Observable)
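The counts in the sample below can be reproduced with a simulated timeline. This is a sketch under stated assumptions, not RxJS code:

- items arrive once per second;
- each group expires 2000 ms after it opens;
- an item that arrives exactly at a group's expiry time opens a new group.

`groupByUntilSim` is a hypothetical helper, and it reports counts in group-creation order.

```javascript
// Simulated groupByUntil (hypothetical, not RxJS): each group closes
// `duration` ms after it opens; a later item with the same key opens a new group.
// Assumption: an item arriving exactly at expiry goes to a new group.
function groupByUntilSim(events, keySelector, duration) {
  const open = new Map(); // key -> { key, openedAt, count }
  const allGroups = [];   // every group ever opened, in creation order
  for (const { time, value } of events) {
    const key = keySelector(value);
    let group = open.get(key);
    if (group && time >= group.openedAt + duration) {
      open.delete(key); // the group's duration has elapsed: it is closed
      group = undefined;
    }
    if (!group) {
      group = { key, openedAt: time, count: 0 };
      open.set(key, group);
      allGroups.push(group);
    }
    group.count++;
  }
  return allGroups.map((g) => g.count);
}

const keyCodes = [38, 38, 40, 40, 37, 39, 37, 39, 66, 65];
// One item per second, as assumed for .delay(1000) inside Rx.Observable.for
const events = keyCodes.map((keyCode, i) => ({ time: (i + 1) * 1000, value: { keyCode } }));
const counts = groupByUntilSim(events, (x) => x.keyCode, 2000);
counts.forEach((c) => console.log("Count: " + c));
```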

Sample Code

var codes = [
    { keyCode: 38}, // up
    { keyCode: 38}, // up
    { keyCode: 40}, // down
    { keyCode: 40}, // down
    { keyCode: 37}, // left
    { keyCode: 39}, // right
    { keyCode: 37}, // left
    { keyCode: 39}, // right
    { keyCode: 66}, // b
    { keyCode: 65}  // a
];

var source = Rx.Observable
    .for(codes, function (x) { return Rx.Observable.return(x).delay(1000); })
    .groupByUntil(
        function (x) { return x.keyCode; },
        function (x) { return x.keyCode; },
        function (x) { return Rx.Observable.timer(2000); });

var subscription = source.subscribe(
    function (obs) {
        // Print the count
        obs.count().subscribe(function (x) { console.log('Count: ' + x); });
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Count: 2
Count: 2
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Completed

groupByUntil is found in each of the following distributions:

  • rx.all.js
  • rx.all.compat.js
  • rx.coincidence.js

RxPHP implements this operator as groupBy.

Groups the elements of an observable sequence according to a specified key selector function and comparer and selects the resulting elements by using a specified function.

Sample Code

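//from https://github.com/ReactiveX/RxPHP/blob/master/demo/groupBy/groupBy.php
// $createStdoutObserver is a helper defined in the demo's bootstrap file; it prints "Next value" and "Complete!" lines with the given prefix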

$observable = \Rx\Observable::fromArray([21, 42, 21, 42, 21, 42]);
$observable
    ->groupBy(
        function ($elem) {
            if ($elem === 42) {
                return 0;
            }

            return 1;
        },
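        null,                   // no element selector: groups emit the original items
        function ($key) {       // key serializer: maps each key to the value used to index its group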
            return $key;
        }
    )
    ->subscribe(function ($groupedObserver) use ($createStdoutObserver) {
        $groupedObserver->subscribe($createStdoutObserver($groupedObserver->getKey() . ": "));
    });

1: Next value: 21
0: Next value: 42
1: Next value: 21
0: Next value: 42
1: Next value: 21
0: Next value: 42
1: Complete!
0: Complete!

RxPHP also has an operator groupByUntil.

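Groups the elements of an observable sequence according to a specified key selector function and comparer, and selects the resulting elements by using a specified function. Each group is terminated when the Observable returned for it by a duration selector function emits.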

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/groupBy/groupByUntil.php

use Rx\Observer\CallbackObserver;

$codes = [
    ['id' => 38],
    ['id' => 38],
    ['id' => 40],
    ['id' => 40],
    ['id' => 37],
    ['id' => 39],
    ['id' => 37],
    ['id' => 39],
    ['id' => 66],
    ['id' => 65]
];

$source = Rx\Observable
    ::fromArray($codes)
    ->concatMap(function ($x) {
        return \Rx\Observable::timer(100)->mapTo($x);
    })
    ->groupByUntil(
        function ($x) {
            return $x['id'];
        },
        function ($x) {
            return $x['id'];
        },
        function ($x) {
            return Rx\Observable::timer(200);
        });

$subscription = $source->subscribe(new CallbackObserver(
    function (\Rx\Observable $obs) {
        // Print the count
        $obs->count()->subscribe(new CallbackObserver(
            function ($x) {
                echo 'Count: ', $x, PHP_EOL;
            }));
    },
    function (Throwable $err) {
        echo 'Error: ', $err->getMessage(), PHP_EOL;
    },
    function () {
        echo 'Completed', PHP_EOL;
    }));

Count: 2
Count: 2
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Count: 1
Completed

RxPHP also has an operator partition.

Returns two observables which partition the observations of the source by the given function. The first will trigger observations for those values for which the predicate returns true. The second will trigger observations for those values where the predicate returns false. The predicate is executed once for each subscribed observer. Both also propagate all error observations arising from the source and each completes when the source completes.

Sample Code

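//from https://github.com/ReactiveX/RxPHP/blob/master/demo/partition/partition.php
// $createStdoutObserver is a helper defined in the demo's bootstrap file; it prints "Next value" and "Complete!" lines with the given prefix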

list($evens, $odds) = \Rx\Observable::range(0, 10, \Rx\Scheduler::getImmediate())
    ->partition(function ($x) {
        return $x % 2 === 0;
    });

//Because we used the immediate scheduler with range, the subscriptions are not asynchronous.
$evens->subscribe($createStdoutObserver('Evens '));
$odds->subscribe($createStdoutObserver('Odds '));

Evens Next value: 0
Evens Next value: 2
Evens Next value: 4
Evens Next value: 6
Evens Next value: 8
Evens Complete!
Odds Next value: 1
Odds Next value: 3
Odds Next value: 5
Odds Next value: 7
Odds Next value: 9
Odds Complete!
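The split performed by partition can be sketched over a plain array. This hypothetical JavaScript version calls the predicate once per item in a single pass. The RxPHP description, by contrast, says the predicate is executed once for each subscribed observer. The resulting evens and odds match the output above.

```javascript
// Sketch of partition over an array (hypothetical helper, not RxPHP's API):
// returns [itemsWherePredicateIsTrue, itemsWherePredicateIsFalse].
function partition(items, predicate) {
  const yes = [];
  const no = [];
  for (const item of items) {
    (predicate(item) ? yes : no).push(item);
  }
  return [yes, no];
}

const range = Array.from({ length: 10 }, (_, i) => i); // 0..9, like range(0, 10)
const [evens, odds] = partition(range, (x) => x % 2 === 0);
console.log("Evens", evens); // Evens [ 0, 2, 4, 6, 8 ]
console.log("Odds", odds);   // Odds [ 1, 3, 5, 7, 9 ]
```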
