The Sweet Spot
On software, engineering leadership, and anything shiny.

Partitioning RxJS streams: adventures in nested Observables with groupBy() and flatMap()

One of the more confusing aspects of working with streams is diving into Rx operators that take a stream and fan it out into multiple streams.

Is your head exploding yet?

The problem:

Let’s dive into a problem I ran into while working on a personal project:

The task at hand is to take a list of GPS points and partition them into groups of points that share the same velocity, count up each group, then return the aggregate stats. As a cyclist moves, I want to know how often they are moving at each specific velocity (speed).

Our weapon of choice is the RxJS groupBy() operator, which groups stream values that share a key computed by a function you define.

Image of groupBy() at work, with marbles.

OK. Easy enough. So my implementation looked something like this:

gpsPointStream
.groupBy((point) => point.velocity)

The supplied (point) => point.velocity function computes the key for each incoming event. groupBy() then either 1) creates a new Observable (a GroupedObservable) for that key, if one doesn't exist yet, and emits it on the output stream, or 2) pushes the event into the existing Observable for that key.
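To make those two cases concrete, here is a minimal, library-free sketch of the routing logic in plain JavaScript. It is not how RxJS implements groupBy(); the groupBySketch function and the { key, subscribers } group shape are hypothetical stand-ins for a GroupedObservable.

```javascript
// Library-free sketch of groupBy()'s routing logic, not RxJS's implementation.
// groupBySketch and the { key, subscribers } group shape are hypothetical.
function groupBySketch(keySelector, onNewGroup) {
  const groups = new Map(); // key -> group

  // Returns a function that pushes one source event into the grouping.
  return function next(event) {
    const key = keySelector(event);
    let group = groups.get(key);
    if (!group) {
      // 1) First event with this key: create a group and emit it downstream.
      group = { key, subscribers: [] };
      groups.set(key, group);
      onNewGroup(group);
    }
    // 2) Route the event into its group's stream.
    group.subscribers.forEach((subscriber) => subscriber(event));
  };
}

const createdKeys = [];
const routed = [];
const push = groupBySketch(
  (point) => point.velocity,
  (group) => {
    createdKeys.push(group.key);
    // Subscribe to the new group before its first event is delivered.
    group.subscribers.push((point) => routed.push([group.key, point.velocity]));
  }
);

[{ velocity: 0 }, { velocity: 0.1 }, { velocity: 0 }].forEach((point) => push(point));
// createdKeys → [0, 0.1]
// routed      → [[0, 0], [0.1, 0.1], [0, 0]]
```

Only two groups are ever created: the third point reuses the existing 0 group instead of starting a new one.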

Let’s illustrate:

src:     -- { velocity: 0 } ---------- { velocity: 0.1 } ----------------------------------- { velocity: 0 } -->
groupBy: -- [{ Observable key: 0 }] -- [ { Observable key: 0 }, { Observable key: 0.1 } ] -- [ { Observable key: 0 count: 2 }, { Observable key: 0.1 } ] -->

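The catch: groupBy() hands us a stream of streams, since every value it emits is itself an Observable. Never fear, flatMap() to the rescue.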

So the story turns to our hero flatMap(), which, as it turns out, is built specifically for working with multiple streams.

Marble diagram for flatMap

flatMap() takes a function as its argument and applies it to each value in the source stream. That function returns an Observable, and flatMap() merges the values of all of those inner Observables into a single output stream.
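A stripped-down way to see the merge step, assuming a hypothetical Subject class that simply pushes values to its subscribers (a simplification, not the RxJS class of the same name) and a hypothetical flatMapSketch function:

```javascript
// Hypothetical minimal Subject: pushes each value to every current subscriber.
// (A simplification; not the RxJS class of the same name.)
class Subject {
  constructor() {
    this.subscribers = [];
  }
  subscribe(subscriber) {
    this.subscribers.push(subscriber);
  }
  next(value) {
    this.subscribers.forEach((subscriber) => subscriber(value));
  }
}

// flatMapSketch: for every outer value, subscribe to the inner stream that
// project() returns and forward each inner value to one output callback.
function flatMapSketch(outer, project, output) {
  outer.subscribe((value) => project(value).subscribe(output));
}

const outer = new Subject();
const innerA = new Subject();
const innerB = new Subject();
const merged = [];

// Here the outer stream already emits streams, so project is the identity function.
flatMapSketch(outer, (inner) => inner, (value) => merged.push(value));

outer.next(innerA); // flatMapSketch subscribes to innerA
innerA.next('a1');
outer.next(innerB); // subscribes to innerB; innerA stays subscribed
innerB.next('b1');
innerA.next('a2');
// merged → ['a1', 'b1', 'a2']
```

Both inner streams stay subscribed, so their values interleave in the order they arrive. That property is what lets every velocity group keep reporting into one output stream.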

gpsPointStream
.groupBy((point) => point.velocity)
.flatMap((group) => {
  // Keep a running count of the points in this group,
  // then pair each count with the group's key (velocity).
  return group.scan((count) => count + 1, 0)
              .map((count) => [count, group.key]);
});

src:     -- { velocity: 0 } ---------- { velocity: 0.1 } ----------------------------------- { velocity: 0 } ---->
groupBy: -- [{ Observable key: 0 }] -- [ { Observable key: 0 }, { Observable key: 0.1 } ] -- [ { Observable key: 0 count: 2 }, { Observable key: 0.1 } ] -->
flatMap: -- [ 1, 0 ] ----------------- [ 1, 0.1 ] ------------------------------------------ [ 2, 0 ] -->

What just happened here?

I passed flatMap() a merging function that performs a scan() counting aggregation on each group, and flatMap() merges each group's running counts back into the main stream. I then mapped each count to a [count, key] pair, which annotates the aggregate count with the group key (velocity) it was computed for.
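The same running count can be sketched without Rx. Here a hypothetical countByKey helper keeps one scan() state per key and pushes every result into a single merged output. It reproduces the flatMap row of the diagram, but it is an imperative analogy, not the Rx pipeline itself.

```javascript
// Library-free analogy for groupBy() + flatMap() + scan(): each key keeps its own
// running count, and every group's result lands in one merged output array.
// countByKey is a hypothetical helper, not an RxJS operator.
function countByKey(keySelector) {
  const counts = new Map(); // per-group scan() state: key -> running count
  const output = [];        // the merged (flatMapped) output stream

  function next(event) {
    const key = keySelector(event);
    const count = (counts.get(key) || 0) + 1; // like scan((count) => count + 1, 0)
    counts.set(key, count);
    output.push([count, key]);                // like map((count) => [count, key])
  }

  return { next, output };
}

const counter = countByKey((point) => point.velocity);
[{ velocity: 0 }, { velocity: 0.1 }, { velocity: 0 }].forEach((point) => counter.next(point));
// counter.output → [[1, 0], [1, 0.1], [2, 0]]
```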

Compare it to imperative

The equivalent of groupBy/flatMap in imperative programming is, quite literally, just _.groupBy() and _.flatMap(), with a few key differences. Here it is in lodash:

var _ = require('lodash');

var grouped = _([ { velocity: 0 }, { velocity: 0.1 }, { velocity: 0 } ])
.groupBy((point) => point.velocity);

grouped.value();
// { 0: [ { velocity: 0 }, { velocity: 0 } ], 0.1: [ { velocity: 0.1 } ] }

// v is one group's array of points; k is its key (as a string)
var flatmapped = grouped.flatMap((v, k) => {
  return [ [v.length, k] ];
});

flatmapped.value();
// [[2, "0"], [1, "0.1"]]

So in the end, the final result is the same, with one crucial difference: our reactive, Observable-based version takes time into account and performs an intermediate calculation as data flows in. That is what lets it emit an intermediate count of 1 for the “0” velocity group before the final count of 2.
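For comparison, the batch version can also be written without lodash, assuming Node 11+ for Array.prototype.flatMap. Because counting only happens after every point has been grouped, there is no moment at which it could report an intermediate count:

```javascript
// Batch (imperative) version without lodash: group everything first, then count.
const points = [{ velocity: 0 }, { velocity: 0.1 }, { velocity: 0 }];

// groupBy: bucket the points by velocity. Object keys become strings, as in lodash.
const grouped = {};
for (const point of points) {
  if (!grouped[point.velocity]) grouped[point.velocity] = [];
  grouped[point.velocity].push(point);
}

// flatMap: one [count, key] pair per group, computed only once all input is in.
const counts = Object.entries(grouped).flatMap(([key, group]) => [[group.length, key]]);
// counts → [[2, '0'], [1, '0.1']]
```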

Takeaways

  • When you want to fan out a stream into groups or partitions based on a specific stream value, turn to groupBy.
  • When you need to flatten a stream of streams back into a single stream, look at flatMap. You may also consider concatMap, a close cousin of flatMap that waits for each inner stream to complete before subscribing to the next.
  • Reactive programming gives you more expressive tools for reasoning about time and event ordering. You just have to tilt your head a little bit.
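The flatMap/concatMap difference is easiest to see with timestamps. The sketch below uses a simplified timing model, which is an assumption rather than RxJS internals: streams are arrays of { t, value }, inner times are relative to subscription, and an inner stream completes at its last event. The function names are hypothetical.

```javascript
// Hypothetical timing model (not RxJS internals): streams are arrays of { t, value }
// sorted by t; inner-stream times are relative to when it is subscribed, and an
// inner stream completes at the time of its last event.

// flatMap: subscribe to each inner stream as soon as its outer value arrives,
// then interleave all inner events by time.
function flatMapOrder(outer, project) {
  const events = [];
  for (const { t, value } of outer) {
    for (const inner of project(value)) {
      events.push([inner.value, t + inner.t]);
    }
  }
  // Array.prototype.sort is stable, so events at the same time keep their order.
  return events.sort((a, b) => a[1] - b[1]);
}

// concatMap: subscribe to the next inner stream only after the previous one completes.
function concatMapOrder(outer, project) {
  const events = [];
  let previousDone = 0; // completion time of the previous inner stream
  for (const { t, value } of outer) {
    const innerStream = project(value);
    const start = Math.max(t, previousDone);
    for (const inner of innerStream) {
      events.push([inner.value, start + inner.t]);
    }
    const last = innerStream[innerStream.length - 1];
    previousDone = start + (last ? last.t : 0);
  }
  return events;
}

const outer = [{ t: 0, value: 'a' }, { t: 1, value: 'b' }];
// Each inner stream emits "<x>1" immediately and "<x>2" two ticks later.
const project = (x) => [{ t: 0, value: x + '1' }, { t: 2, value: x + '2' }];

const merged = flatMapOrder(outer, project);
// merged → [['a1', 0], ['b1', 1], ['a2', 2], ['b2', 3]]
const concatenated = concatMapOrder(outer, project);
// concatenated → [['a1', 0], ['a2', 2], ['b1', 2], ['b2', 4]]
```

Under concatMap the b stream waits for a to complete. With groupBy() this matters: a group typically stays open until the source completes, so concatMap would not get to the second group until then.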

Further reading:

  • http://blogs.microsoft.co.il/iblogger/2015/08/11/animations-of-rx-operators-groupby/

Update: 2016/03/22

Fixed a typo: the property on a GroupedObservable is key, not index.
