The Sweet Spot
On software, engineering leadership, and anything shiny.

Evented Rails: Decoupling domains in Rails with Wisper pub/sub events

One common pattern in Domain-Driven Design is the use of publish/subscribe messaging to communicate between domains. When a domain creates Domain Events, other domains can subscribe to those events and take action within their own boundaries.

This is not a common pattern in Rails, perhaps in part because Ruby lacks language-level support for some of the functional programming paradigms found in other languages. However, with a nifty framework and the help of Sidekiq, we can get just a little bit closer.

What is a Domain Event?

A domain event is a record of something the system did: it captures the action the system performed, along with the factors and properties that led to it.
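As a minimal sketch (plain Ruby, independent of Wisper), a domain event can be modeled as a small immutable value object that records what happened and the data that led to it. The `DeloreanHailed` event and its fields are hypothetical, chosen only for illustration:

```ruby
# Hypothetical domain event: an immutable record of something that happened.
DeloreanHailed = Struct.new(:passenger_id, :pickup_year, :occurred_at, keyword_init: true) do
  # Freeze on creation so the recorded facts cannot be changed afterwards.
  def initialize(**kwargs)
    super
    freeze
  end
end

event = DeloreanHailed.new(passenger_id: 42, pickup_year: 1955, occurred_at: Time.utc(2016, 3, 22))
puts event.passenger_id # => 42
puts event.frozen?      # => true
```

Treating events as frozen facts keeps subscribers from quietly rewriting history for each other.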

In the following examples, we are going to use the Wisper gem to implement domain events in our sample Delorean app.

Imagine that we are writing an endpoint that our users will hit, indicating that they want to hail a time-traveling cab. Now the logic to hail a cab is rather complicated and lives in an entirely different area of the codebase, perhaps even in another application. How should we call the other code and ensure that our code is cleanly decoupled?

With our Domain-Driven powers, we’ve been smart enough to segregate our code into different subdomains and bounded contexts, denoted by two Ruby modules: Ridesharing and DriverRouting.

Example 1: In-process pub-sub event modeling, with a service object.

A simple way to get started is to implement your service objects with Wisper and call them from the controller.

module Ridesharing
  class RidesController < ApplicationController
    def post
      # Hail a time-traveling Delorean:
      command = HailDelorean.new
      # render plain: replaces render text:, which newer Rails versions removed
      command.on('hailed') { |driver|
        render plain: "Hailed you a cab: #{driver} is arriving!"
      }
      .on('could_not_hail') {
        render plain: "Sorry, no dice."
      }
      command.hail!(current_user)
    end
  end
end

Note that the HailDelorean class now supports event subscriptions. Our calling code does not have to concern itself with the implementation details of the HailDelorean service – it merely needs to register handlers for the two possible outcomes, hailed and could_not_hail. Here’s how the service class is implemented:

module Ridesharing
  class HailDelorean
    include Wisper::Publisher

    def hail!(user)
      driver = find_driver(user)
      # broadcast() is a Wisper method to fire an event
      if driver
        broadcast('hailed', driver)
      else
        broadcast('could_not_hail')
      end
    end

    private

    def find_driver(user)
      # Here lies slow, complex domain logic.
      # Assumes a hypothetical FindDriver#call that returns a driver, or nil if none is available.
      DriverRouting::FindDriver.new(user).call
    end
  end
end
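To make the mechanics a little more concrete, here is a deliberately tiny, dependency-free stand-in for the three Wisper calls this post relies on: `on`, `subscribe`, and `broadcast`. `TinyPublisher` and `ToyHailDelorean` are hypothetical names, and this is a sketch of the idea rather than Wisper's actual implementation (the real gem also handles global listeners, async adapters, and more):

```ruby
# A toy stand-in for the parts of Wisper::Publisher used above (not the real gem).
module TinyPublisher
  # Register a block to run when `event` is broadcast; returns self so calls chain.
  def on(event, &block)
    handlers[event.to_s] << block
    self
  end

  # Register a listener object; it is called only for events it responds to.
  def subscribe(listener)
    listeners << listener
    self
  end

  private

  # Fire an event: run matching blocks, then call same-named methods on listeners.
  def broadcast(event, *args)
    handlers[event.to_s].each { |handler| handler.call(*args) }
    listeners.each do |listener|
      listener.public_send(event, *args) if listener.respond_to?(event)
    end
  end

  def handlers
    @handlers ||= Hash.new { |hash, key| hash[key] = [] }
  end

  def listeners
    @listeners ||= []
  end
end

# Hypothetical service mirroring HailDelorean, with the driver lookup stubbed out.
class ToyHailDelorean
  include TinyPublisher

  def initialize(driver)
    @driver = driver # stands in for the slow routing lookup
  end

  def hail!(_user)
    @driver ? broadcast("hailed", @driver) : broadcast("could_not_hail")
  end
end

messages = []
ToyHailDelorean.new("Doc Brown")
  .on("hailed") { |driver| messages << "#{driver} is arriving!" }
  .on("could_not_hail") { messages << "Sorry, no dice." }
  .hail!(:marty)
puts messages.first # => Doc Brown is arriving!
```

The publisher never needs to know what its handlers do; it only announces outcomes by name.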

Handling side effects in subscriber classes

Other objects can subscribe to the HailDelorean events to handle side effects. Let’s say we want to send an event to Segment for analytics tracking. We can create a plain Ruby object that simply implements a method with the same name as each event.

Let’s implement hailed and could_not_hail methods on this subscriber class:

class TrackSegmentAnalytics
  def self.hailed(driver)
    # fire analytics event to Segment
  end

  def self.could_not_hail
    # fire analytics event to Segment
  end
end

And we hook it up by subscribing it to the service object:

module Ridesharing
  class RidesController < ApplicationController
    def post
      # snip
      command = HailDelorean.new

      # register the subscriber to the triggering action
      command.subscribe(TrackSegmentAnalytics)
      # snip
    end
  end
end

OK, that was a little awkward, doing all that wiring up in the controller. What if we did the wiring globally, within an app initializer?

# config/initializers/domain_event_subscriptions.rb
# scope takes the publisher's fully qualified class name
Wisper.subscribe(TrackSegmentAnalytics, scope: "Ridesharing::HailDelorean")

# alternate form:
Ridesharing::HailDelorean.subscribe(TrackSegmentAnalytics)

This registers a global subscriber for all future instances of HailDelorean.
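Roughly speaking, a global subscription moves registration out of individual instances and into a shared registry that every publisher consults when it broadcasts. The sketch below models that idea in plain Ruby; `ToyGlobalListeners`, `ToyPublisher`, and the other names are hypothetical, and this is not Wisper's real code. In this sketch, `scope` is compared against the publisher's fully qualified class name, so a scope of just "ToyHailDelorean" would not match `Ridesharing::ToyHailDelorean`:

```ruby
# A toy global listener registry (hypothetical sketch; not Wisper's real code).
module ToyGlobalListeners
  @registrations = []

  # Register a listener for all publishers, or only for those named in `scope`.
  def self.subscribe(listener, scope: nil)
    @registrations << [listener, Array(scope).map(&:to_s)]
  end

  # Listeners with no scope, or whose scope includes the publisher's full class name.
  def self.listeners_for(publisher)
    @registrations
      .select { |_listener, scope| scope.empty? || scope.include?(publisher.class.name) }
      .map(&:first)
  end
end

# Publishers consult the global registry every time they broadcast.
module ToyPublisher
  def broadcast(event, *args)
    ToyGlobalListeners.listeners_for(self).each do |listener|
      listener.public_send(event, *args) if listener.respond_to?(event)
    end
  end
end

module Ridesharing
  class ToyHailDelorean
    include ToyPublisher

    def hail!(driver)
      broadcast(:hailed, driver)
    end
  end
end

class RecordingListener
  def self.events
    @events ||= []
  end

  def self.hailed(driver)
    events << driver
  end
end

# Subscribe once, as an initializer would...
ToyGlobalListeners.subscribe(RecordingListener, scope: "Ridesharing::ToyHailDelorean")
# ...and publisher instances created afterwards notify the listener.
Ridesharing::ToyHailDelorean.new.hail!("Doc Brown")
puts RecordingListener.events.inspect # => ["Doc Brown"]
```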

Example 2: Asynchronous events with subscription handlers and Sidekiq

Here’s the real power of Wisper – we can decouple our application’s domain responsibilities by modeling side effects as subscriber objects and running them outside the main web request thread.

Note that with the wisper-sidekiq gem, any subscription registered with the async: true option is automatically executed as a Sidekiq job, in a background worker rather than in the web request. Let’s take advantage of that now.

module Ridesharing
  class RidesController < ApplicationController
    def post
      # Hail a time-traveling Delorean:
      HailDelorean.hail(current_user.id)
      render plain: 'Hailing a cab, please wait for a response...'
    end
  end

  class HailDelorean
    include Wisper::Publisher

    def self.hail(passenger_id)
      new.hail(passenger_id)
    end

    # broadcast is an instance method, so the class method delegates to an instance
    def hail(passenger_id)
      broadcast(:hail, passenger_id)
    end
  end
end

module DriverRouting
  # Note that this class is both a subscriber and a publisher
  class FindDriver
    include Wisper::Publisher

    # wisper-sidekiq invokes async listeners through class methods
    def self.hail(passenger_id)
      new.hail(passenger_id)
    end

    def hail(passenger_id)
      # Do slow, complex hairy routefinding/optimization/messaging behind the scenes:
      driver = find_driver_for(passenger_id)

      if driver
        broadcast('driver_found', passenger_id, driver.id)
      else
        broadcast('driver_not_found', passenger_id)
      end
    end

    private

    def find_driver_for(passenger_id)
      # Here lies the hairy routefinding logic; returns a driver or nil
    end
  end
end

Finally, we add handlers (subscribers) to these domain objects:

module Ridesharing
  class NotifyPassengerWithDriverStatus
    # Arguments match what FindDriver broadcasts for each event
    def self.driver_found(passenger_id, driver_id)
      # send them a text message :)
    end

    def self.driver_not_found(passenger_id)
      # send them a text message :(
    end
  end
end

Now let’s link it together with subscriptions:

# config/initializers/domain_event_subscriptions.rb
Ridesharing::HailDelorean.subscribe(DriverRouting::FindDriver, async: true)
DriverRouting::FindDriver.subscribe(Ridesharing::NotifyPassengerWithDriverStatus, async: true)
# scope lists the publishers (the classes that broadcast) this listener hears from
Wisper.subscribe(AnalyticsListener, scope: ["Ridesharing::HailDelorean", "DriverRouting::FindDriver"], async: true)

Now the messages between our domains are pulled out of the main request thread and processed asynchronously, with Sidekiq as the runner.

Code in our domains is kept clean – note that no subdomain directly references another subdomain. The app segregates responsibilities between domains more cleanly, and heavy workloads move off the web request and onto Sidekiq workers.
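To get a feel for what `async: true` changes, here’s a toy, in-memory model of the idea: instead of calling the listener directly, the broadcast enqueues a small serializable payload (listener class name, event name, plain arguments), and a worker later looks the class up by name and calls its class method. `ToyJobQueue` and `Ridesharing::ToyNotifyPassenger` are hypothetical; wisper-sidekiq does this with real Sidekiq jobs, and its details differ. The sketch also suggests why the async listeners in this post are classes with class methods that receive plain IDs: a class name and a few integers are easy to serialize and rebuild in another process, while a live object is not.

```ruby
require "json"

# A toy, in-memory job queue (hypothetical sketch; not Sidekiq or wisper-sidekiq).
class ToyJobQueue
  def initialize
    @jobs = []
  end

  # Request side: store only serializable data (class name, event name, plain args).
  def enqueue(listener, event, *args)
    @jobs << JSON.generate({ "listener" => listener.name, "event" => event.to_s, "args" => args })
  end

  def size
    @jobs.size
  end

  # Worker side: rebuild each job, look the listener class up by name, call its class method.
  def drain!
    until @jobs.empty?
      job = JSON.parse(@jobs.shift)
      listener = Object.const_get(job["listener"])
      listener.public_send(job["event"], *job["args"])
    end
  end
end

module Ridesharing
  # Hypothetical async listener: a class with class methods, receiving plain IDs.
  class ToyNotifyPassenger
    def self.sent
      @sent ||= []
    end

    def self.driver_found(passenger_id, driver_id)
      sent << "Passenger #{passenger_id}: driver #{driver_id} is on the way"
    end
  end
end

queue = ToyJobQueue.new
# Broadcasting asynchronously only enqueues a job; nothing runs during the request.
queue.enqueue(Ridesharing::ToyNotifyPassenger, :driver_found, 42, 7)
puts Ridesharing::ToyNotifyPassenger.sent.size # => 0

# Later, a background worker processes the queue.
queue.drain!
puts Ridesharing::ToyNotifyPassenger.sent.first # => Passenger 42: driver 7 is on the way
```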

Caveats: Beware of overbuilding

If you are working on a small app, you should probably go with Example 1. Indirection adds cognitive load to development, so reach for the asynchronous approach in Example 2 only if you truly need it. Its overhead and conceptual complexity are justified only in large codebases, or in apps that already take a domain-centric view (and segregation) of code.

Caveats: Event subscriptions can be a tangled mess

Note that the wiring can quickly fan out into a spidery mess of handlers. You could decouple your handlers even further by modeling a global event bus as a publisher, and having each domain tap into the bus’s events and decide on its own how to handle each one.
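One way to picture that bus, as a minimal plain-Ruby sketch: publishers only know about a single `EventBus` object, and each domain registers its own handlers for the event names it cares about. `EventBus` and the `:delorean_hailed` event are hypothetical names, not part of Wisper:

```ruby
# A minimal in-process event bus (hypothetical sketch, not a library API).
class EventBus
  def initialize
    @handlers = Hash.new { |hash, key| hash[key] = [] }
  end

  # Each domain taps into the events it cares about.
  def on(event_name, &handler)
    @handlers[event_name] << handler
  end

  # Publishers only know about the bus, never about the subscribers.
  def publish(event_name, payload = {})
    @handlers[event_name].each { |handler| handler.call(payload) }
  end
end

bus = EventBus.new
log = []

# DriverRouting decides for itself how to react to a ridesharing event...
bus.on(:delorean_hailed) { |payload| log << "routing passenger #{payload[:passenger_id]}" }
# ...and analytics subscribes to the same event independently.
bus.on(:delorean_hailed) { |payload| log << "tracked hail for #{payload[:passenger_id]}" }

bus.publish(:delorean_hailed, passenger_id: 42)
puts log
```

The trade-off is that the wiring now lives in one place, but event names become a shared contract that every domain has to agree on.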

Caveats: Transactional consistency!

If you implement this asynchronously, you’ll have to think about how to deal with transactional consistency. Can you design your data models (and database schema) to support independent updates without any dependencies? How will you handle the case when one domain action fails and the other completes?

You may have to roll your own two-phase commit here, the specifics of which I won’t delve into. For most of our applications, though, we may want to skip the asynchronous approach and keep our events synchronous.

Domain-Driven Design & The Joy of Naming

I want to discuss a topic near and dear to my heart, and what I believe is at the crux of effective software design. It’s not a new functional language, a fancy new framework, a how-to guide for microservices, or a quantum leap in machine learning.

It’s much simpler.

It’s about names.

In the beginning...

Names define us. They define concepts. They imbue a concept with shared understanding. They’re language concepts, but more than that, they’re units of meaning.

Software development is a fundamentally human endeavour. No technical computing breakthrough will change the fact that software development is still the arduous task of assembling a team of humans from a kaleidoscope of cultural and linguistic backgrounds, then throwing them together to build an arbitrarily complex product in a rapidly shifting competitive landscape.

Not only that, the thing to build is chock-full of systems that interact with other systems of unbounded complexity. Additionally, once your software system is out in the wild, you need to make sure that it was the right thing to build. Is the product you built correctly tuned to your market? Is it generating sufficient revenue?

The landscape is littered with software projects that began ambitiously, but got lost in a towering mess of fragile code. It’s no wonder that developing reliable, successful software is more art than science.

Crossing our linguistic wires

Let’s rewind to a scene from a typical day in the life of your software development team. Think back to the last time you discussed a story with your product owner. How did it unfold?

Let’s imagine a scene at Delorean, the Uber for time travel, where you work:

PO: Our next big project is to update our driver app to show rider locations on the timeline map.

You: And when do these riders show up on the timeline map?

PO: When the driver turns on the app and signals that she’s driving.

You: OK, so that means when the app boots up and the DriverStatus service receives a POST we’ll need to simultaneously fetch references from the HailingUser service based on time locality.

PO: Um… I guess so?

Or how about your last iteration planning meeting, where you discussed the intricacies of a specific story?

PO: In this story, we’re going to add a coupon box to the checkout flow.

You: [Thinking out loud] Hm… would that mean we add a /coupon route to the checkout API?

Teammate: Wait – I think we call them Discounts in the backend. And the checkout flow is technically part of the RideCommerce service.

You: Right – I mean let’s call the route /coupon but it’ll create a Discount object. And in this story, let’s just remember that the checkout API really refers to the RideCommerce service.

PO: I’ll add a note to the story.

The implementing engineer, of course, doesn’t read the note in the story (who has time to, anyways?). In the course of implementation, he gets tripped up in semantics and spends the better part of half a day re-implementing the Checkout flow as an entirely new service, before realizing his mistake in code review and backing out his changes.

Months later, a new colleague is tasked with fixing a link in the checkout flow, but files an incomplete fix because she was not aware that Coupons actually mapped back to Discounts. The bug makes its way to production, where it subtly lies dormant until a most inopportune time…

A better, Domain-Driven way

In Eric Evans' book Domain-Driven Design, he describes the concept of a Ubiquitous Language – a shared, common vocabulary that the entire team shares when discussing software.

When we say the “entire team”, we mean the combined team of designers, developers, the product owner and any other domain experts that might be at hand.

Your product owner may be your domain expert (and typically is). However, you may have other domain experts such as:

  • Any team that builds reporting or analytics off of your software.
  • Upstream data providers
  • Anybody further up the reporting chain whose purview includes the software you’re building, or its effects. Think: the Director of Finance, the COO, the head of Customer Support.
  • The users of your software

Side note: in XP, each team has an “onsite customer” – this is your domain expert!

Developing a Ubiquitous Language with a Glossary

Try this: keep a living document of all the terminology your team uses, along with its definitions. This Glossary is exactly what it sounds like: a list of terms and their definitions.

Delorean Team Glossary

  • Coupon: an applied discount to a BookingAmount. A coupon may take the form of a Fixed or a Percentage amount.
    • Fixed-type: A coupon that applies a fixed amount of money – e.g. a $30 USD discount.
    • Percentage-type: A coupon that applies a percentage savings off the total BookingAmount.
  • Driver: An employed driver who drives within the system, picking up passengers and driving Trips for payment.
  • Trip: An itinerary of passenger pick-up and drop-off location and times.
  • Rider: The passenger that books the trip and is transported by the Driver.
  • Booking: A reservation for a Trip, as booked by the Rider.
  • BookingAmount: The monetary amount of the Trip, accounting for the trip cost, surge pricing, coupons and taxes.
  • Routing Engine: The software system that maps out the driving directions for a driver.
  • Payment: A record of how a user paid.
  • Charge: A financial transaction for a specific dollar amount, for a specific charge method to an institution.
  • Checkout: A workflow in which a Payment is made for a Booking.

From now on, use only the term definitions listed here in your stories. Be explicit about how you use your language!

I’ve been on many projects where the sloppy usage of a term from project inception led to the usage of that term in the code – codifying that messy, slippery term throughout the life of the project.

Which leads us to our next point:

Refactoring your team to use the right terms

Your Glossary is a living document: keep it in a continually updated Google Doc or on a wiki page. It should be visible to all – you should print it out and post it on the walls!

Meanwhile, in a planning meeting:

You: So when a user logs into the app and broadcasts that they’re ready to drive…

PO: You mean Driver. When a Driver logs in.

You: Right. Good catch.

It seems a little silly (after all, you both know only Drivers use the broadcast feature of the app), but the laser focus on using the right words means that your team is always on the same page when talking about things.

Later that afternoon, your teammate taps you on the shoulder:

Teammate: I’m about to implement the Coupon story. I suggest we rename the Discount class to Coupon.

You: Great idea. That way, we aren’t tripped up by the naming mismatches in the future.

Teammate: I do have a question about the coupon, though. Do you think it’s applied to the BookingAmount, or is it added?

PO: [Overhearing conversation] You had it right. It’s applied.

You and your teammate then go and update the glossary, scribbling an addendum on the wall (or updating your wiki):

- **Coupon**: ... Coupons may be *applied* to BookingAmounts to discount the total cost of the booking.

Refactoring your code to use the right terms

You and your teammate then walk over to her desk, and as a pair you refactor the existing code. We’ll use Ruby for the sake of this example.

In the beginning, the code looks like this:

class Checkout
  def initialize(booking_amount, discount)
    @booking_amount = booking_amount
    @discount = discount
  end

  def total
    @booking_amount.total - @discount.calculate_amount_for(booking_amount: @booking_amount)
  end
end

class Discount
  STRATEGY_FIXED = 'STRATEGY_FIXED'
  STRATEGY_PERCENTAGE = 'STRATEGY_PERCENTAGE'

  def initialize(amount, strategy)
    @amount = amount
    @strategy = strategy
  end

  def calculate_amount_for(booking_amount:)
    # Implementation...
  end
end

You take a first pass and rename the Discount class to Coupon.

class Coupon
  STRATEGY_FIXED = 'STRATEGY_FIXED'
  STRATEGY_PERCENTAGE = 'STRATEGY_PERCENTAGE'

  def initialize(amount, strategy)
    @amount = amount
    @strategy = strategy
  end

  def calculate_amount_for(booking_amount:)
    # Implementation...
  end
end

Now there’s something funny here – your domain language suggests that a Coupon is applied to a BookingAmount. You pause, because the code reads the opposite – “A Coupon calculates its amount for a BookingAmount”.

You: How about we also refactor the calculate_amount_for method to reflect the language a little better?

Teammate: Yeah. It sounds like the action occurs the other way – the BookingAmount is responsible for applying a Coupon to itself.

In your next refactoring pass, you move the calculate_amount_for method into the BookingAmount, renaming it applied_coupon_amount:

class BookingAmount
  # implementation details...

  def applied_coupon_amount(coupon:)
    # Implementation...
  end
end

Finally, you change your Checkout implementation to match:

class Checkout
  def initialize(booking_amount, coupon)
    @booking_amount = booking_amount
    @coupon = coupon
  end

  def total_amount
    @booking_amount.price - @booking_amount.applied_coupon_amount(coupon: @coupon)
  end
end

When you read the implementation in plain English, it reads:

The checkout’s total amount is calculated by subtracting the booking amount’s applied coupon amount from the booking amount price.

Phew! Designing a strong Ubiquitous Language was hard work! In fact, you had spent a goodly amount of time debating and clarifying with your domain experts:

  • Is a Coupon applied to a BookingAmount, or is it discounted from one?
  • Should we call it a Coupon amount, or a Coupon cost?
  • Is the pre-tax, pre-discount amount in the BookingAmount called a price, or a cost?

Whatever you agreed on, that’s what you changed your code to reflect.

Continual refinement

Hm. Something still feels off.

You and your teammate feel your OOP spidey senses going haywire.

Teammate: Hm. I guess that worked, but it’s still not as clean as we wanted. Isn’t it kind of weird that the Checkout owns the calculation of a discount?

You: Yeah, I see where you’re coming from. That’s just not good OO design. Also, if we listen to the language our domain experts were using, they never said the checkout total was a subtraction of one thing from another. The Checkout’s total is simply the booking amount after a Coupon has been applied.

Your partner and you take one last step:

class Checkout
  def initialize(booking_amount, coupon)
    @booking_amount = booking_amount
    @booking_amount.apply!(coupon)
  end

  def total_amount
    @booking_amount.amount
  end
end


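class BookingAmount
  # Implementation...

  def initialize(amount)
    @amount = amount
    @coupons = []
  end

  def apply!(coupon)
    @coupons << coupon
  end

  def amount
    # Assumes each Coupon exposes the amount it takes off the booking
    @amount - @coupons.sum(&:amount)
  end
end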

You sit back and read it back, out loud:

The checkout’s total amount is the BookingAmount after a Coupon has been applied.

You both smile. Much better.
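As a sanity check that the final design holds up against the Glossary, here’s a self-contained sketch that also covers both coupon types (Fixed-type and Percentage-type). It uses integer cents to sidestep floating-point rounding. The `Coupon.fixed` / `Coupon.percentage` constructors and the `discount_on` method are hypothetical names chosen for this sketch, and each coupon is assumed to be computed against the original price:

```ruby
# Coupon types from the Glossary: Fixed-type and Percentage-type.
class Coupon
  STRATEGY_FIXED = "STRATEGY_FIXED"
  STRATEGY_PERCENTAGE = "STRATEGY_PERCENTAGE"

  def self.fixed(cents)
    new(cents, STRATEGY_FIXED)
  end

  def self.percentage(percent)
    new(percent, STRATEGY_PERCENTAGE)
  end

  def initialize(amount, strategy)
    @amount = amount
    @strategy = strategy
  end

  # How much this coupon takes off a given price (in cents), never more than the price.
  def discount_on(price_cents)
    discount = @strategy == STRATEGY_FIXED ? @amount : price_cents * @amount / 100
    [discount, price_cents].min
  end
end

class BookingAmount
  def initialize(price_cents)
    @price_cents = price_cents
    @coupons = []
  end

  # The BookingAmount is responsible for applying Coupons to itself.
  def apply!(coupon)
    @coupons << coupon
    self
  end

  # Amount after all applied coupons, floored at zero.
  def amount
    discount = @coupons.sum { |coupon| coupon.discount_on(@price_cents) }
    [@price_cents - discount, 0].max
  end
end

class Checkout
  def initialize(booking_amount, coupon)
    @booking_amount = booking_amount
    @booking_amount.apply!(coupon)
  end

  def total_amount
    @booking_amount.amount
  end
end

puts Checkout.new(BookingAmount.new(10_000), Coupon.fixed(3_000)).total_amount   # => 7000
puts Checkout.new(BookingAmount.new(10_000), Coupon.percentage(15)).total_amount # => 8500
```

Because BookingAmount owns apply!, the code keeps reading the way the team talks: a Coupon is applied to a BookingAmount.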

In closing…

In this brief time we had together,

  • We discussed why names are important – especially in a complex endeavour like software development.
  • We covered why it’s important to arrive at a shared understanding, together as a team, using the same words and vocabulary.
  • We discovered how to build and integrate a Glossary into the daily rhythm of our team
  • We refactored the code twice – illustrating how to get code in line with the domain language.

And there is much more!

In an upcoming post, we’ll investigate how the Ubiquitous Language applies to a core concept of Domain-Driven Design: the Bounded Context. Why is that important? Because Bounded Contexts give us tools to organize our code – and to do further advanced things like break up monoliths into services.

Knex.js and PostGIS cheat sheet

Here are some code snippets for running Postgres and PostGIS queries with Knex.js.

Execute raw SQL in migration

I often find this useful for fancy SQL, like creating views.

exports.up = function(knex, Promise) {
  return knex.raw(`YOUR RAW SQL`);
};

Add a PostGIS Point type to a table in a migration:

return knex.schema.table('events', function(table) {
  table.specificType('point', 'geometry(point, 4326)');
})

Add a foreign key to another table.

return knex.schema.table('events', function(table) {
  table.integer('device_id').references('id').inTable('devices');
});

Add a multi-column unique index

return knex.schema.table('events', function(table) {
  table.unique(['start_time', 'end_time', 'start_location', 'end_location', 'distance_miles']);
});

Find a collection

knex.select('*')
.from('participants')
.where({ name: 'Jason' })
.andWhere('age', '>=', 20)

Custom operations in SELECT clause

knex('trips')
.select(knex.raw('miles * passengers as passenger_miles'))
.select(knex.raw("CONCAT('Hello, ', name) as greeting_message"))

Return PostGIS data from a spatial column:

We use knex-postgis to gain access to PostGIS functions in Postgres. Here, we return a ‘point’ column with ST_AsGeoJSON:

const knexPostgis = require('knex-postgis')(knex);
knex('events').select('*', knexPostgis.asGeoJSON('point'));

See knex-postgis documentation for a list of other PostGIS functions that are supported.

Lossless rate limiting with RxJS

Much of RxJS involves working with backpressure: how to reconcile streams that emit and process data at different rates without overloading the system. Much of that model is built with lossy handling in mind. It makes sense that when your system is under duress, you design your streams to degrade gracefully (e.g. drop certain events, or rate limit them by chunking them into windows).

However, there are times when it is appropriate to have a lossless approach to backpressure – e.g., to store every chunk of data that comes through a stream in memory, and not drop things. These use cases may come about when:

  • You have a short-lived or bounded set of data, and you know the bounds of everything that will ever come over the pipe.
  • You have a processing script you want to run, which is not part of a large system.
  • You have a honkin' large system that can handle the load.

In my case, I had a script that called the Google Geocoding API for a set of addresses. For several hundred addresses, this naive implementation would end up calling the API several hundred times all at once:

// addresses$: [ "1234 Widget Way, Promiseland, WV" ] -- [...] -- [...]
const geocoded$ = addresses$
.flatMap(address => Rx.Observable.fromPromise(callGoogleGeocodingService(address)))
// geocoded$: [ { latitude: 89.99, longitude: 90.00, ... } ] -- [...] -- [...]

I searched all over for a lossless throttling mechanism, but all I could find was references to RxJS’s lossy throttle behavior.

Other libraries, like Bacon.js’s bufferingThrottle() and Highland.js’s ratelimit(), seemed attractive. Where was RxJS’s equivalent?

Thanks to a helpful StackOverflow post, I found the answer: combining concatMap() and delay() forces the incoming stream to be processed serially, with each item held back by an artificial delay before the next one is let through.

const geocoded$ = addresses$
.concatMap(address => Rx.Observable.just(address).delay(TIME_INTERVAL))
.flatMap(address => Rx.Observable.fromPromise(callGoogleGeocodingService(address)))

Partitioning RxJS streams: adventures in nested Observables with groupBy() and flatMap()

One of the confusing aspects of working with streams is diving into Rx operators that take one stream and fan it out into multiple streams.

Is your head exploding yet?

The problem:

Let’s dive into a problem I ran into while working on a personal project:

The task at hand is to take a list of GPS points recorded while moving, partition them into groups, count up each group, and then return the aggregate stats. As a cyclist moves, I want to know how often they are moving at each specific velocity (speed).

Our weapon of choice is the RxJS groupBy() function, which groups like stream values based on a key value you define.

Image of groupBy() at work, with marbles.

OK. Easy enough. So my implementation looked something like this:

gpsPointStream
.groupBy((point) => point.velocity)

The supplied (point) => point.velocity function determines the key for each incoming event. groupBy() then either 1) creates a new Observable sequence for that key, if one doesn’t exist yet, or 2) routes the event into the existing Observable sequence for that key.

Let’s illustrate:

src:     -- { velocity: 0 } ---------- { velocity: 0.1 } ----------------------------------- { velocity: 0 } -->
groupBy: -- [{ Observable key: 0 }] -- [ { Observable key: 0 }, { Observable key: 0.1 } ] -- [ { Observable key: 0 count: 2 }, { Observable key: 0.1 } ] -->

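The result is a stream of streams: one Observable per velocity, which is awkward to consume directly. Never fear, flatMap() to the rescue.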

So the story turns to our hero flatMap(), which, as it turns out, is designed for exactly this problem: flattening multiple streams back into one.

Marble diagram for flatMap

flatMap() takes a function as its argument and applies it to each value in the source stream. That function returns an Observable, and flatMap() merges all of the returned Observables into a single output stream.

gpsPointStream
.groupBy((point) => point.velocity)
.flatMap((group) => {
  // Keep a running count for this group and tag each count with the group's key (velocity)
  return group.scan((count, point) => count + 1, 0)
              .map((count) => [count, group.key]);
});

src:     -- { velocity: 0 } ---------- { velocity: 0.1 } ----------------------------------- { velocity: 0 } ---->
groupBy: -- [{ Observable key: 0 }] -- [ { Observable key: 0 }, { Observable key: 0.1 } ] -- [ { Observable key: 0 count: 2 }, { Observable key: 0.1 } ] -->
flatMap: -- [ 1, 0 ] ----------------- [ 1, 0.1 ] ------------------------------------------ [ 2, 0 ] -->

What just happened here?

I passed flatMap() a function that runs a scan() counting aggregation over each group, and flatMap() merges each group’s results back into the main stream. I then added a map(), which annotates each running count with the group key (velocity) it was computed for.

Compare it to imperative

The equivalent of groupBy/flatMap in imperative programming is, quite literally, just _.groupBy() and _.flatMap(), with a few key differences. Here it is in lodash:

var grouped = _([ { velocity: 0 }, { velocity: 0.1 }, { velocity: 0 } ])
.groupBy((point) => point.velocity)

grouped.value()
// { 0: [ { velocity: 0 }, { velocity: 0 } ], 0.1: [ { velocity: 0.1 } ] }

var flatmapped = grouped.flatMap((v, k) => {
  return [ [v.length, k] ]
  })

flatmapped.value()
// [[2, "0"], [1, "0.1"]]

In the end, the result was the same, with one crucial difference: our reactive, Observable-based version took time into account and performed intermediate calculations as data flowed in. This allowed us to emit an intermediate count for the “0” velocity group.

Takeaways

  • When you want to fan out a stream into groups or partitions based on a specific stream value, turn to groupBy.
  • When you have a need to combine a stream-of-streams, you want to look at flatMap. You may also consider looking at concatMap, a close cousin of flatMap.
  • Reactive programming gives you more expressive abilities to reason about time and event ordering. You just have to tilt your head a little bit.

Update: 2016/03/22

Fixed a typo: the index variable on a GroupedObservable has been corrected to key.