The Sweet Spot
On software, engineering leadership, and anything shiny.

Lossless rate limiting with RxJS

Much of RxJS involves working with
backpressure - how to reconcile
streams that emit/process data at different rates, without overloading
the system. Much of that model is built with lossy handling in mind - it
makes sense that when your system is under duress, that you design your
streams to degrade gracefully (e.g. drop certain events, or rate limit
them by chunking into windows, etc).

However, there are times when it is appropriate to have a lossless
approach to backpressure - e.g., to store every chunk of data that comes
through a stream in memory, and not drop things. These use cases may
come about when:

  • You have a short-lived, or bounded set of data you know will come over
    the pipe. You understand the bounds of the data that will ever come over
    the pipe.
  • You have a processing script you want to run, which is not part of a
    large system.
  • You have a honkin’ large system that can handle the load.

In my case, I had a script that called the Google Geocoding API for a
set of GPS coordinates. Now for a set of several hundred coordinates, I
would end up calling the API several hundred times all at once with this
naive implementation:

1
2
3
4
// address$: [ "1234 Widget Way, Promiseland, WV" ] -- [...] -- [...]
const geocoded$ = addresses$
.flatMap(address => Rx.Observable.fromPromise(callGoogleGeocodingService(address)))
// geocoded$: [ { latitude: 89.99, longitude: 90.00, ... } ] -- [...] -- [...]

I searched all over for a lossless throttling mechanism, but all I could
find was references to RxJS’s lossy throttle behavior.

Other frameworks, like Bacon.js’s bufferingThrottle() and Highland.js ratelimit() seemed attractive. Where was RxJS’s equivalent?

Thanks to a helpful StackOverflow post,
I found the answer: the use of
concatMap()
and
delay()
forces the incoming stream to execute serially over artificial time delayed streams.

1
2
3
const geocoded$ = addresses$
.concatMap(address => Rx.Observable.just(address).delay(TIME_INTERVAL))
.flatMap(address => Rx.Observable.fromPromise(callGoogleGeocodingService(address)))

Thanks to:

  • http://stackoverflow.com/questions/34955842/rate-limiting-http-calls-made-by-rxjs
  • http://stackoverflow.com/questions/30876361/rxjs-rate-limit-requests-per-second-and-concurrency?rq=1

Comments