The Sweet Spot
On software, engineering leadership, and anything shiny.

Lossless rate limiting with RxJS

Much of RxJS involves working with backpressure: how to reconcile streams that emit and process data at different rates without overloading the system. Much of that model is built with lossy handling in mind. It makes sense that when your system is under duress, you design your streams to degrade gracefully (e.g. drop certain events, or rate limit them by chunking into windows).

However, there are times when it is appropriate to have a lossless approach to backpressure - e.g., to store every chunk of data that comes through a stream in memory, and not drop things. These use cases may come about when:

  • You have a short-lived or bounded set of data, and you understand the bounds of the data that will ever come over the pipe.
  • You have a processing script you want to run, which is not part of a large system.
  • You have a honkin’ large system that can handle the load.

In my case, I had a script that called the Google Geocoding API for a set of GPS coordinates. Now for a set of several hundred coordinates, I would end up calling the API several hundred times all at once with this naive implementation:

// addresses$: [ "1234 Widget Way, Promiseland, WV" ] -- [...] -- [...]
const geocoded$ = addresses$
.flatMap(address => Rx.Observable.fromPromise(callGoogleGeocodingService(address)))
// geocoded$: [ { latitude: 89.99, longitude: 90.00, ... } ] -- [...] -- [...]

I searched all over for a lossless throttling mechanism, but all I could find were references to RxJS’s lossy throttle behavior.

Other libraries had attractive options, like Bacon.js’s bufferingThrottle() and Highland.js’s ratelimit(). Where was RxJS’s equivalent?

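Thanks to a helpful StackOverflow post, I found the answer: concatMap() combined with delay() forces the incoming stream to execute serially, because each value is wrapped in its own artificially delayed stream and concatMap() waits for one inner stream to finish before starting the next.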

const geocoded$ = addresses$
.concatMap(address => Rx.Observable.just(address).delay(TIME_INTERVAL))
.flatMap(address => Rx.Observable.fromPromise(callGoogleGeocodingService(address)))
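
To see the same idea without RxJS, here is a minimal sketch in plain JavaScript. It assumes all items are already available up front (as in a bounded script), and the helper names releaseTimes and drainLosslessly are hypothetical, not part of any library. Every item is kept and simply waits its turn, so the i-th item is released after (i + 1) intervals.

```javascript
// Hypothetical helper: the release time (ms from now) for each queued item.
// Item i waits for the i items ahead of it plus its own delay.
function releaseTimes(count, intervalMs) {
  return Array.from({ length: count }, (_, i) => (i + 1) * intervalMs);
}

// Hypothetical helper: hand every item to `handler`, one per interval.
// Nothing is dropped; items simply wait in the timer queue.
function drainLosslessly(items, intervalMs, handler) {
  const times = releaseTimes(items.length, intervalMs);
  items.forEach((item, i) => setTimeout(() => handler(item), times[i]));
  return times;
}

console.log(releaseTimes(3, 200)); // [ 200, 400, 600 ]
```

In a script like mine, something along the lines of drainLosslessly(addresses, TIME_INTERVAL, callGoogleGeocodingService) would space the API calls out instead of firing them all at once.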

Thanks to:

  • http://stackoverflow.com/questions/34955842/rate-limiting-http-calls-made-by-rxjs
  • http://stackoverflow.com/questions/30876361/rxjs-rate-limit-requests-per-second-and-concurrency?rq=1

Partitioning RxJS streams: adventures in nested Observables with groupBy() and flatMap()

One of the confusing aspects about working with streams is diving into Rx operators that take a stream and fan out into multiple streams.

Is your head exploding yet?

The problem:

Let’s dive into a problem I ran into while working on a personal project:

The task at hand is to take a list of GPS points from a moving cyclist, partition the points into groups by velocity, count up each group, then return the aggregate stats. As a cyclist moves, I want to know how often they are moving at each specific velocity (speed).

Our weapon of choice is the RxJS groupBy() function, which groups like stream values based on a key value you define.

Image of groupBy() at work, with marbles.

OK. Easy enough. So my implementation looked something like this:

gpsPointStream
.groupBy((point) => point.velocity)

The supplied (point) => point.velocity function determines the key value for the supplied event, which then 1) creates a new Observable sequence for that specific key value, if it doesn’t exist, or 2) assigns your event to an existing Observable sequence.

Let’s illustrate:

src:     -- { velocity: 0 } ---------- { velocity: 0.1 } ----------------------------------- { velocity: 0 } -->
groupBy: -- [{ Observable key: 0 }] -- [ { Observable key: 0 }, { Observable key: 0.1 } ] -- [ { Observable key: 0 count: 2 }, { Observable key: 0.1 } ] -->

Never fear, flatMap() to the rescue.

So the story turns to our hero flatMap(), which is built for exactly this situation: groupBy() leaves us with a stream of streams, and flatMap() merges those inner streams back into a single stream.

Marble diagram for flatMap

flatMap() takes a function as its argument, applies it to each element of the source stream (here, each group Observable), and merges the Observables that the function returns into one output stream.

gpsPointStream
.groupBy((point) => point.velocity)
.flatMap((group) => {
  // Keep a running count per group, and tag each count with the group's key.
  return group.scan((count, point) => count + 1, 0)
              .map((count) => [count, group.key])
});

src:     -- { velocity: 0 } ---------- { velocity: 0.1 } ----------------------------------- { velocity: 0 } ---->
groupBy: -- [{ Observable key: 0 }] -- [ { Observable key: 0 }, { Observable key: 0.1 } ] -- [ { Observable key: 0 count: 2 }, { Observable key: 0.1 } ] -->
flatMap: -- [ 1, 0 ] ----------------- [ 1, 0.1 ] ------------------------------------------ [ 2, 0 ] -->

What just happened here?

I passed flatMap() a function that runs the scan() counting aggregation on each group before merging the result back into the main stream. A map() annotates each running count with the key (velocity) of the group it was computed for. I use map() rather than zip() with Observable.just(group.key) because just() emits a single value, so the zipped stream would complete after the first pair and the later counts would never appear.
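
To make the intermediate values concrete without RxJS, here is a minimal plain-JavaScript sketch that produces the same [count, key] pairs as the flatMap row of the diagram. The helper name runningCounts is hypothetical, and it works on an array rather than a live stream, so it only illustrates the output, not the timing.

```javascript
// Hypothetical helper, not part of RxJS: returns a [count, key] pair for each
// incoming point, mirroring the flatMap row of the diagram above.
function runningCounts(points, keyOf) {
  const counts = new Map();
  return points.map((point) => {
    const key = keyOf(point);
    const next = (counts.get(key) || 0) + 1;
    counts.set(key, next);
    return [next, key];
  });
}

const emitted = runningCounts(
  [{ velocity: 0 }, { velocity: 0.1 }, { velocity: 0 }],
  (point) => point.velocity
);
console.log(emitted); // [ [ 1, 0 ], [ 1, 0.1 ], [ 2, 0 ] ]
```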

Compare it to imperative

The equivalent of groupBy/flatMap in imperative programming is, quite literally, just _.groupBy() and _.flatMap(), with a few key differences. Here it is in lodash:

var grouped = _([ { velocity: 0 }, { velocity: 0.1 }, { velocity: 0 } ])
.groupBy((point) => point.velocity)

grouped.value()
// { 0: [ { velocity: 0 }, { velocity: 0 } ], 0.1: [ { velocity: 0.1 } ] }

var flatmapped = grouped.flatMap((v, k) => {
  return [ [v.length, k] ]
})

flatmapped.value()
// [[2, "0"], [1, "0.1"]]

So in the end, the result was the same, with one crucial difference: our reactive Observable version was able to take time into account and perform intermediate calculations as data was flowing in. This allowed us to generate an intermediate count for the “0” velocity group.

Takeaways

  • When you want to fan out a stream into groups or partitions based on a specific stream value, turn to groupBy.
  • When you have a need to combine a stream-of-streams, you want to look at flatMap. You may also consider looking at concatMap, a close cousin of flatMap.
  • Reactive programming gives you more expressive abilities to reason about time and event ordering. You just have to tilt your head a little bit.

Further reading:

  • http://blogs.microsoft.co.il/iblogger/2015/08/11/animations-of-rx-operators-groupby/

Update: 2016/03/22

Updated typo where the index variable on a GroupedObservable was changed to correctly be key.

Strange Loop 2015: Notes & Reflections

Going to Strange Loop was a huge check off my conference bucket list (lanyard?). I’d always heard about this slightly-weird, highly academic collision between academia and industry, skewing toward programming languages you haven’t heard of (or, at the very least, you’ve never used in production). I anticipated sitting at the feet of gray-haired wizards and bright-eyed hipsters with Ph.Ds.

The conference did not disappoint. And it was not quite what I expected: rather than sitting at the feet of geniuses, I talked with them, peer-to-peer, about topics of interest. All around me people were saying “Don’t be afraid to ask questions. Don’t feel stupid - nobody knows everything.” Speakers were tweeting about how much they were learning. It was comforting, because lots of the topics I had come to see were ones I had no. freakin. clue. about.

The following is culled from my notes from different sessions I attended. I will focus on brevity. I will keep it clear. Here we go:

Opening Keynote: “I see what you mean” - Peter Alvaro

  • Instructions, behaviors & outcomes.
  • It “feels good” to write in C (a hardcore 1000 liner)
  • A declarative program (e.g. SQL) works well, but is harder to come
    up with.
  • The declarative world - as described in the work done in Datalog
  • How can we take concepts from Datalog and apply to real-world
    resources like network actors (distributed systems)?
  • It becomes easier to model these systems declaratively when we
    explicitly capture time.
  • Enter Dedalus: extension to Datalog where time is a modeling
    construct.
  • (Show off usage of @next and @async annotations)
  • Computation is rendezvous - the only thing that you know is what YOU
    know at that point in time.
  • Takeaway: Abstractions leak. Model them better (e.g. with time)
  • Inventing languages is dope.

Have your Causality and your Wall Clocks Too (Jon Moore)

  • Take concept of Lamport clocks and extend them with hybrid clocks.
  • And extend them one further with: Distributed Monotonic Clocks
  • These DMCs use a population protocol (flocking) to have each actor in the
    system communicate with another, updating their source of truth to
    eventually agree on a median time within the group
  • DMC components:
    1. Have a reset button by adding epoch bit
    2. Use flocking (via population protocol) to avoid resets
    3. Accommodates some clockless nodes
    4. Explicitly reflects causality
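
As a reminder of the starting point for these notes, here is a minimal sketch of a standard Lamport clock in plain JavaScript. This is only the textbook baseline that the talk builds on; it is not the hybrid clock or the DMC from the talk, and the class name is my own.

```javascript
// Minimal Lamport clock: a counter that only moves forward.
class LamportClock {
  constructor() {
    this.time = 0;
  }

  // Local event or message send: advance the counter.
  tick() {
    this.time += 1;
    return this.time;
  }

  // Message receipt: jump past the sender's timestamp if it is ahead.
  receive(remoteTime) {
    this.time = Math.max(this.time, remoteTime) + 1;
    return this.time;
  }
}

const a = new LamportClock();
const b = new LamportClock();
a.tick();                             // local event on a
const sentAt = a.tick();              // a sends a message stamped 2
const receivedAt = b.receive(sentAt); // b jumps from 0 to 3
console.log(sentAt, receivedAt);      // 2 3
```

The receive always lands after the send, which is the causality guarantee; what a plain Lamport clock lacks is any relationship to wall-clock time.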

Building Isomorphic Web Apps with React - Elyse Gordon

  • Vevo needed better SEO for SPAs. Old soln was to snapshot page and upload to S3.
  • Beneficial for SEO crawlers
  • React in frontend. Node in backend.
  • Vevo-developed pellet project as Flux-like framework to organize
    files.
  • Webpack aliases/shims
  • Server hands off to browser, bootstraps React in client.
  • Alternatives: Relay, Ember

Designing for the Worst Case: Peter Bailis (@pbailis)

  • Designing for worst case often penalizes average case
  • But what if designing for the worst case actually helps avg case?
  • Examples from distributed systems:
    • Worst case of disconnected data centers, packet loss/link loss. Fix by introducing coordination-free protocols. Boom, you’ve now made your network more scalable, performant, resistant to downtime.
    • Worst case: hard to coordinate a distributed transaction between services. What do you do? You implement something like buffered writes out of process.
      • CRDT, RAMP, HAT, bloom
      • Suddenly, you have fault tolerance
    • Tail latency problem in microservices: the more microservices you query, the higher the probability of hitting a slow server response.
      • Your service’s corner case is your user’s average case
    • HCI: accessibility guidelines in W3C lift standards for all. Make webpages easier to navigate. Side effect of better page performance, higher conversion.
    • Netflix designing CC subtitles also benefits other users.
    • Curb cuts in the real world to help ADA/mobility-assisted folks also benefit normal folks too
  • Best has pitfalls too: your notion of best may be hard to hit, or risky. You may want to optimize for “stable” solution. (Robust optimization)
  • When to design for worst case?
    • common corner cases
    • environmental conditions vary
    • “normal” isn’t normal
  • worst forces a conversation
    • how do we plan for failures?
    • what is our scale-out strategy?
    • how do we audit failures? data breaches?
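
The tail latency point above can be made concrete with a little arithmetic. This is a rough sketch assuming each backend call is independently slow with probability p; the 1% figure and the function name are illustrative assumptions, not numbers from the talk.

```javascript
// Probability that at least one of n independent calls is slow,
// given each call is slow with probability p.
function probAnySlow(p, n) {
  return 1 - Math.pow(1 - p, n);
}

console.log(probAnySlow(0.01, 1).toFixed(3));   // 0.010
console.log(probAnySlow(0.01, 100).toFixed(3)); // 0.634
```

Under those assumptions, a request that fans out to 100 services hits at least one slow response about 63% of the time, which is how a service’s corner case becomes the user’s average case.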

Ideology by Gary Bernhardt

  • Rumsfeld: known knowns, known unknowns, and unknown unknowns.
  • Ideology is the thing you do not know you know
  • Conflict between typed vs dynamic programmers:
    • Typed: “I don’t need tests, I have types”
    • Dynamic: “I write tests, so I don’t need types”
  • In reality, they are addressing different parts of the problem domain, but they have different beliefs about the world that are hidden in the shadows:
    • Typed: “Correctness comes solely from types”
    • Dynamic: “Correctness comes solely from example”
  • “I need nulls” -> You believe nulls are the only way to represent absence
  • “Immutable data structures are slow” -> You believe all immutable types are slow
  • “GC is impractical” -> you believe GC algorithms won’t get faster.
  • Read CSE 341 Type systems, Dan Grossman

Building Scalable, Stateful Services: Caitlin McCaffrey

  • Sticky connection: always talk to the same machine
  • Building sticky connections:
    • persistent connections (load balancing cannot rebalance server)
    • implement backpressure (d/c connection)
  • Dynamic cluster membership:
    • gossip protocols -> availability
    • consensus systems -> consistency (everybody needs to have the same worldview)
  • Work distribution:
    • random: write anywhere, read from all
    • consistent hashing on session ID: hash space -> node
      • e.g. DynamoDB, Manhattan
      • con: can have hotspots, could have uneven distribution of resources, cannot move work
    • distributed hash table: statefully store hash
  • Real world:
    • Scuba (Facebook): distributed in-memory DB
    • Ringpop (Uber): Node.js, SWIM gossip protocol, consistent hashing
    • Orleans (MS Research): actor model, gossip, consistent hash, distributed hashtable
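
To illustrate the "consistent hashing on session ID" note, here is a minimal plain-JavaScript sketch of a hash ring. Everything in it is an assumption for illustration: the FNV-1a hash, the 50 virtual nodes per server, and the node and session names are mine, not from the talk or from any of the systems listed.

```javascript
// FNV-1a 32-bit hash; any stable hash would do for this sketch.
function hash32(text) {
  let h = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    h ^= text.charCodeAt(i);
    h = Math.imul(h, 0x01000193) >>> 0;
  }
  return h;
}

// Place each node at several points ("virtual nodes") on the hash ring.
function buildRing(nodes, replicas = 50) {
  const ring = [];
  for (const node of nodes) {
    for (let r = 0; r < replicas; r++) {
      ring.push({ point: hash32(`${node}#${r}`), node });
    }
  }
  return ring.sort((x, y) => x.point - y.point);
}

// A session belongs to the first ring point at or after its hash,
// wrapping around to the start of the ring.
function nodeFor(ring, sessionId) {
  const h = hash32(sessionId);
  const hit = ring.find((entry) => entry.point >= h) || ring[0];
  return hit.node;
}

const ring = buildRing(["node-a", "node-b", "node-c"]);
const smaller = buildRing(["node-a", "node-b"]); // node-c has left the cluster
const sessions = Array.from({ length: 200 }, (_, i) => `session-${i}`);
const moved = sessions.filter(
  (id) => nodeFor(ring, id) !== "node-c" && nodeFor(ring, id) !== nodeFor(smaller, id)
);
console.log(moved.length); // 0: only node-c's sessions are reassigned
```

The sticky property comes from the lookup being a pure function of the session ID and the ring. The hotspot con from the notes shows up here too: nothing in this scheme moves work away from a node that happens to own busy sessions.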

Idalin “Abby” Bobé: From Protesting to Programming: Becoming a Tech Activist

  • Tech to resist exploitation
  • Technologists as activists
  • Idalin Bobé -> Changed name to “Abby” to get a job.
  • Pastor Jenkins - magnifying glass vs paper
  • Philadelphia Partnership Program:
    • 1st to college
    • work <> school
  • Difficult to balance.
  • Mills MBA, CS
  • Joined Black Girls Code
    • Apply technology in the right way
  • Ferguson happened
    • Thoughtworkers joined on the ground
    • Hands Up United: www.handsupunited.org
  • “Do not be led by digital metrics” - even though the activists had digital tooling, the tools were being used against activists. Phone calls, chats monitored. Movement tracked.
  • New group starting up in St. Louis called “Roy Clay, Sr.” - named after a black man who played a strong role in the founding of Silicon Valley.
  • 21st century technologists need 21st century skillsets.
  • Dream Defenders
  • “it is our duty to fight for our freedom/it is our duty to win/we must love and support one another/we have nothing to lose but our chains”

Notes on performance tuning a Puma server

A couple of months ago, I was tuning a Rails app for one of our clients.
This client wanted to know how performant their app would be under load.

To do that, you can do several different things:

  1. Tune the thread/process balance within the VM
  2. Horizontally scale with your cloud platform.

This is a discussion of the former (#1):

1) Set up the test

Drive with a synthetic script

Our application had a synthetic load driver that would run Selenium to
execute various app tasks. This synthetic driver could be parallelized
across many nodes via Rainforest QA, Sauce Labs or BrowserStack.

In our case, I only needed to run our synthetic load script on a single
node in multiple processes, which simulated enough load to anticipate
another order of magnitude of traffic.

Know how to inspect the server under load.

Commands you will want to know:

$ free -m # Find the total amount of free memory on your machine
$ ps uH p <pid> # List out process threads
$ kill -TTIN <puma_master_pid> # Add a puma worker
$ kill -TTOU <puma_master_pid> # Remove a puma worker
$ kill -USR2 <puma_master_pid> # Restart puma (master & workers)

Generating more load: use external load testing services, or plain tools.

Try using Flood.io or JMeter for performance load.

I tried looking into the puma_auto_tune gem, but it required a higher level of production instrumentation than I was ready to give it.

Analysis: New Relic scalability analysis

New Relic gave us a scalability analysis scatter plot, plotting
throughput against average application response time. In essence, it
allows you to see spikes in response times as correlated to throughput.

Process:

My approach was to use the synthetic script to generate production-like
load and ramp up the # of load actors in 5m increments. Each run would
test one of the following Puma process/thread balances:

Run #1: Single-process, multi threads.
Run #2: Multiple processes, single threaded.
Run #3: Multiple processes, multiple threads.

Aside: how many of these threads/processes should I be using?

Note that your numbers will differ depending on the execution
characteristics of your app and your server environment. Tweak it for
yourself. You’re designing an experiment.

If you’re curious, our Rails app started out with 4 threads on 2
workers. We made the # of Puma workers (both min and max) environment
variables so we could tweak the variables easily without deploying.

The strategy was then to look at the perf characteristics of each run in
the scatter plot. If there were any spikes in the graph with the
increase of load, then that would be noted. Even minor features like an
increase in slope would be noted - at that point, the incremental cost
of each request increases with overall system load.

Results

I don’t have the New Relic data on hand to show now, but in our case we
discovered two things:

  1. The server easily scaled from ~10 -> ~500 rpm with a virtually flat
    line for all runs.
  2. The app exhibited no noticeable performance differences when flipped
    between uniprocess-multithreaded, multiprocess-unithreaded, and
    multiprocess-multithreaded modes. Any performance gains were under a
    tolerable threshold.

How do we parse these results?

  • We note that we didn’t really push the performance threshold on this
    app (it’s not meant to be a public web site and 95% of it is behind a
    login wall to a specialized group of users). Thus, if we pushed the
    concurrent connections even more, we may have seen more of a pronounced
    difference.
  • The absence of any major red flags was itself a validation. The
    question we wanted answered coming into this experiment was “how close
    are we to maxing out our single-node EC2 configuration such that we will
    have to begin configuring horizontal scaling?” The answer was: we can
    safely scale further out in the near-term future, and cross the bridge
    of horizontal scaling/bursting when we get there.
  • We did not have enough statistically significant differences in
    performance for #threads/#processes in Puma. However, if we wanted to
    truly find the optimal performance in our app, we would have turned to
    tools like puma_auto_tune to answer those questions.

Let me know in the comments if you have any questions!

Toolbox: learning Swift and VIPER

The following are some notes I’m compiling as I begin a journey
down the rabbit hole, writing an app in Swift using the VIPER app development methodology.

  • I had trouble importing nested source code into Xcode before realizing that I
    needed to import the folder with corresponding Groups. This is done by
    clicking the checkbox “Create Groups for any Added Folders”

    Reference: https://developer.apple.com/library/ios/technotes/iOSStaticLibraries/Articles/configuration.html

    Without doing this, the compiler was not able to build the project.

  • Since there is no way to do method swizzling in Swift, there are no real easy ways to do mocking/stubbing the way we used to do so in Ruby. Instead, this is forcing me to rely on plain old Swift structs. There are some simple ways to stub, but it ends up looking kind of awkward and very wiring-intensive like this:
class NewRidePresenterSpec: QuickSpec {
  override func spec() {
    describe("#startRecordingGpsTrack") {
      class MockInteractor: NewRideInteractor {
        var wasCalled: Bool = false

        @objc private override func startRecordingGpsTrack() {
          wasCalled = true
        }
      }

      var subject = NewRidePresenter()

      it("tells the interactor to start recording") {
        let mockInteractor = MockInteractor()
        subject.interactor = mockInteractor
        subject.startRecordingGpsTrack()

        expect(mockInteractor.wasCalled).to(beTrue())
      }
    }
  }
}
  • Using the vipergen and boa scaffolding generators helped me understand the concepts behind the view.
  • Tip: Build a VIPER module, but don’t build it all at once. Just focus on the Presenter-Interactor-Wireframe component, or the DataStore-Entity-Interactor component. This will keep your head from exploding.
  • Dude. I miss vim. Alcatraz + xvim helped a little…
  • xcodebuild + xcpretty + Guard-shell == some sort of CI feedback loop.
  • Manually creating mocks in Swift = kind of painful. If you override (subclass) an NSObject in Swift, you must mark it with the @objc attribute, otherwise it throws a segfault error.
  • You must contact CircleCI manually if you want to activate an iOS build (it’s still in beta). What are some other good CI tools to use with iOS?