The Sweet Spot
On software, engineering leadership, and anything shiny.

Evented Rails: Decoupling domains in Rails with Wisper pub/sub events

One common pattern in Domain-Driven Design is the use of publish/subscribe messaging to communicate between domains. When a domain publishes Domain Events, other domains can subscribe to those events and react to them within their own boundaries.

This is not a common pattern in Rails, particularly because of Ruby’s lack of language support for functional programming paradigms that exist in other languages. However, with a nifty framework and the help of Sidekiq, we can get just a little bit closer.

What is a Domain Event?

A domain event is a record of something that happened in the system: it captures the action that took place, along with the data that led to it.
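As a rough illustration, a domain event can be modeled as a small value object. The sketch below is only an assumption for illustration: `DriverHailed`, its fields, and the `payload` method are hypothetical names, not part of Wisper or of the sample app.

```ruby
require 'time'

# Hypothetical domain event; names and fields are illustrative assumptions.
DriverHailed = Struct.new(:passenger_id, :driver_id, :occurred_at) do
  # Events are named in the past tense: they describe something that already happened.
  def name
    'driver_hailed'
  end

  # A plain-hash payload is easy to log, serialize, or hand to a background job.
  def payload
    { name: name,
      passenger_id: passenger_id,
      driver_id: driver_id,
      occurred_at: occurred_at.utc.iso8601 }
  end
end

# Freezing the event signals that a recorded fact should not change afterwards.
event = DriverHailed.new(42, 7, Time.utc(1985, 10, 26, 1, 21)).freeze
puts event.payload
```

Note that the event carries ids rather than full objects, which keeps it cheap to pass between domains.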

In the following examples, we are going to use the Wisper gem to implement domain events in our sample Delorean app.

Imagine that we are writing an endpoint that our users will hit, indicating that they want to hail a time-traveling cab. Now the logic to hail a cab is rather complicated and lives in an entirely different area of the codebase, perhaps even in another application. How should we call the other code and ensure that our code is cleanly decoupled?

With our Domain-Driven powers, we’ve been smart enough to segregate our code into different subdomains and bounded contexts, denoted by these two Ruby modules Ridesharing and DriverRouting.

Example 1: In-process pub-sub event modeling, with a service object.

A simple way to use Wisper is to implement your service objects as publishers and call the service from the controller.

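module Ridesharing
  class RidesController < ApplicationController
    def post
      # Hail a time-traveling Delorean:
      command = HailDelorean.new
      # Register a handler for each possible outcome; on() returns the
      # publisher, so the calls can be chained.
      command.on('hailed') { |driver|
        # render plain: replaces render text:, which newer Rails versions no longer accept
        render plain: "Hailed you a cab: #{driver} is arriving!"
      }
      .on('could_not_hail') {
        render plain: "Sorry, no dice."
      }
      command.hail!(current_user)
    end
  end
end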

Note that the HailDelorean class has powers of event subscriptions now. Our calling code does not have to concern itself with the implementation details of the HailDelorean service - it merely needs to register handlers for the two possible outcomes, hailed and could_not_hail. Here’s how the service class is implemented:

module Ridesharing
  class HailDelorean
    include Wisper::Publisher

    def hail!(user)
      driver = find_driver(user)
      if driver
        # broadcast() is a Wisper method to fire an event
        broadcast('hailed', driver)
      else
        broadcast('could_not_hail')
      end
    end

    private

    def find_driver(user)
      # Here lies slow, complex domain logic
      DriverRouting::FindDriver.new(user)
    end
  end
end
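
To make the mechanics less magical, here is a dependency-free sketch of the idea behind `on` and `broadcast`. It is not Wisper's implementation: `TinyPublisher` and `HailCab` are hypothetical stand-ins, and the driver lookup is faked with an array.

```ruby
# Hypothetical stand-in for a publisher mixin; NOT Wisper's actual code.
module TinyPublisher
  # Register a block for one event name; returns self so calls can be chained.
  def on(event, &block)
    handlers[event.to_s] << block
    self
  end

  private

  # Call every block registered for the event, passing along the arguments.
  def broadcast(event, *args)
    handlers[event.to_s].each { |handler| handler.call(*args) }
    self
  end

  def handlers
    @handlers ||= Hash.new { |hash, key| hash[key] = [] }
  end
end

class HailCab
  include TinyPublisher

  def initialize(available_drivers)
    @available_drivers = available_drivers
  end

  # The user argument is unused here; a real lookup would depend on it.
  def hail!(_user)
    driver = @available_drivers.first
    driver ? broadcast('hailed', driver) : broadcast('could_not_hail')
  end
end

outcomes = []
HailCab.new(['Doc Brown'])
  .on('hailed') { |driver| outcomes << "hailed #{driver}" }
  .on('could_not_hail') { outcomes << 'no dice' }
  .hail!('marty')
puts outcomes.inspect
```

The service only announces what happened; the caller decides what each outcome means.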

Handling side effects in subscriber classes

Other side effects can be modeled as subscribers to the HailDelorean events. Let’s say we want to send a tracking event to Segment analytics. We can create a plain Ruby object that simply implements a method with the same name as each event it cares about.

Let’s implement hailed and could_not_hail methods on this subscriber class:

class TrackSegmentAnalytics
  def self.hailed(driver)
    # fire analytics event to Segment
  end

  def self.could_not_hail
    # fire analytics event to Segment
  end
end

And we hook it up by subscribing it to the command handler:

module Ridesharing
  class RidesController < ApplicationController
    def post
      # snip
      command = HailDelorean.new

      # register the subscriber to the triggering action
      command.subscribe(TrackSegmentAnalytics)
      # snip
    end
  end
end

OK, that was a little awkward, doing all that wiring up in the controller. What if we did the wiring globally, within an app initializer?

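# config/initializers/domain_event_subscriptions.rb
# Outside the Ridesharing module, the publisher needs its full, namespaced name.
Wisper.subscribe(TrackSegmentAnalytics, scope: "Ridesharing::HailDelorean")

# alternate form:
Ridesharing::HailDelorean.subscribe(TrackSegmentAnalytics)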

This registers a global subscriber for all future instances of HailDelorean.
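
The two conventions at play here (a listener handles an event by defining a method of the same name, and a global subscription can be scoped to one publisher class) can be sketched without any gems. This is a simplified illustration under our own assumptions, not Wisper's code: `TinyGlobalListeners`, `CabHailer`, `OtherPublisher`, and `RecordingListener` are all hypothetical names.

```ruby
# Hypothetical global registry; a simplified illustration, NOT Wisper's code.
module TinyGlobalListeners
  @registrations = []

  class << self
    # scope: optional publisher class name(s) the listener cares about.
    def subscribe(listener, scope: nil)
      @registrations << { listener: listener, scope: Array(scope).map(&:to_s) }
    end

    # Deliver an event to every in-scope listener that defines a matching method.
    def deliver(publisher, event, *args)
      @registrations.each do |registration|
        scope = registration[:scope]
        next unless scope.empty? || scope.include?(publisher.class.name)

        listener = registration[:listener]
        listener.public_send(event, *args) if listener.respond_to?(event)
      end
    end
  end
end

class CabHailer
  def hail!(driver)
    TinyGlobalListeners.deliver(self, :hailed, driver)
  end
end

class OtherPublisher
  def hail!(driver)
    TinyGlobalListeners.deliver(self, :hailed, driver)
  end
end

# A listener is any object (here, a class) with methods named after events.
class RecordingListener
  def self.seen
    @seen ||= []
  end

  def self.hailed(driver)
    seen << driver
  end
end

TinyGlobalListeners.subscribe(RecordingListener, scope: 'CabHailer')
CabHailer.new.hail!('Doc Brown')   # in scope: recorded
OtherPublisher.new.hail!('Biff')   # out of scope: ignored
puts RecordingListener.seen.inspect
```

Listeners that do not define a method for an event are simply skipped, which is what lets one listener subscribe to several publishers.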

Example 2: Asynchronous events with subscription handlers and Sidekiq

Here’s the real power of Wisper - we can decouple our application domain responsibilities by modeling effects as subscription objects and do them out-of-band of the primary web request thread.

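Note that with the wisper-sidekiq gem, any subscription registered with the async: true option will automatically execute as a Sidekiq job, outside the web request and typically in a separate worker process. Because the job has to be serialized, the listeners below are classes with class-level handlers, and the events carry ids rather than full objects. Let’s take advantage of that now.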

module Ridesharing
  class RidesController < ApplicationController
    def post
      # Hail a time-traveling Delorean:
      HailDelorean.hail(current_user.id)
      render plain: 'Hailing a cab, please wait for a response...'
    end
  end

  class HailDelorean
    include Wisper::Publisher

    # broadcast() is an instance method, so the class-level entry point
    # builds a publisher and delegates to it.
    def self.hail(passenger_id)
      new.hail(passenger_id)
    end

    def hail(passenger_id)
      broadcast(:hail, passenger_id)
    end
  end
end

module DriverRouting
  # Note that this class is both a subscriber and a publisher
  class FindDriver
    include Wisper::Publisher

    # Class-level handler for the :hail event
    def self.hail(passenger_id)
      new.hail(passenger_id)
    end

    def hail(passenger_id)
      # Do slow, complex hairy routefinding/optimization/messaging behind the scenes
      # (find_driver_for is left out of this example):
      driver = find_driver_for(passenger_id)

      if driver
        broadcast('driver_found', passenger_id, driver.id)
      else
        broadcast('driver_not_found', passenger_id)
      end
    end
  end
end

Finally, we add handlers (subscribers) to these domain objects:

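module Ridesharing
  class NotifyPassengerWithDriverStatus
    # Handler arguments must match what the publisher broadcasts
    def self.driver_found(passenger_id, driver_id)
      # send them a text message :)
    end

    def self.driver_not_found(passenger_id)
      # send them a text message :(
    end
  end
end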

Now let’s link it together with subscriptions:

# config/initializers/domain_event_subscriptions.rb
Ridesharing::HailDelorean.subscribe(DriverRouting::FindDriver, async: true)
DriverRouting::FindDriver.subscribe(Ridesharing::NotifyPassengerWithDriverStatus, async: true)
Wisper.subscribe(AnalyticsListener, scope: ["Ridesharing::NotifyPassengerWithDriverStatus", "DriverRouting::FindDriver"], async: true)

Now our messages between our domains are pulled out of the main request thread, and operate in an asynchronous fashion with Sidekiq as the runner.

Code in our domains is kept clean - note that there are no direct references to the other subdomains within each subdomain; only the subscription initializer knows about both. Our app separates responsibilities more cleanly between domains, and heavy workloads move off the web request and into Sidekiq workers.
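
One way to see why the async handlers are class methods that receive ids is to look at what a background job has to carry. The sketch below fakes the queue with an in-memory array: `FAKE_QUEUE`, `enqueue_event`, `work_off_queue`, and `TextPassenger` are hypothetical names, and this is not how wisper-sidekiq is actually implemented.

```ruby
require 'json'

# Hypothetical in-memory queue standing in for Sidekiq and Redis.
FAKE_QUEUE = []

# Enqueue instead of calling the listener directly. Only plain data
# (class name, event name, arguments) is stored in the job.
def enqueue_event(listener_class, event, *args)
  FAKE_QUEUE << JSON.generate('listener' => listener_class.name,
                              'event' => event.to_s,
                              'args' => args)
end

# What a worker would do later, possibly in another process:
# rebuild the call from the serialized job and dispatch it.
def work_off_queue
  until FAKE_QUEUE.empty?
    job = JSON.parse(FAKE_QUEUE.shift)
    listener = Object.const_get(job['listener'])
    listener.public_send(job['event'], *job['args'])
  end
end

class TextPassenger
  def self.sent
    @sent ||= []
  end

  def self.driver_found(passenger_id, driver_id)
    sent << "passenger #{passenger_id}: driver #{driver_id} is on the way"
  end
end

enqueue_event(TextPassenger, :driver_found, 42, 7)
# The web request could return here; nothing has been sent yet.
work_off_queue
puts TextPassenger.sent.inspect
```

A class can be looked up again by name on the worker side, whereas an arbitrary object instance (and its state) cannot be rebuilt from a string.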

Caveats: Beware of overbuilding

If you are on a small app, you should probably go with approach #1. Unless you truly need the async behavior of approach #2, the weight of its indirection is a cognitive load on development. The overhead and conceptual complexity can only be justified in large codebases, or in apps where a domain-centric view (and segregation) of code is already present.

Caveats: Event subscriptions can be a tangled mess

Note that the act of wiring can quickly fan out into a spidery mess of handlers - you could even further decouple your handlers by modeling a global event bus as a publisher, and having each domain tap into the bus’ events and figure out how to handle each event on its own.
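
A global event bus along those lines might look roughly like the following. This is a minimal sketch under our own assumptions: `EventBus`, the event names, and the hard-coded driver id are hypothetical, and a real bus would need error handling and probably async delivery.

```ruby
# Hypothetical global event bus; the names here are illustrative assumptions.
class EventBus
  def initialize
    @subscribers = Hash.new { |hash, key| hash[key] = [] }
  end

  # Each domain registers interest in an event name; it never
  # references the domain that publishes the event.
  def subscribe(event, &handler)
    @subscribers[event.to_sym] << handler
    self
  end

  # Hand the payload to every handler registered for the event.
  def publish(event, payload = {})
    @subscribers[event.to_sym].each { |handler| handler.call(payload) }
    self
  end
end

bus = EventBus.new
log = []

# DriverRouting's wiring: react to a hail, then publish the outcome.
bus.subscribe(:cab_hailed) do |payload|
  bus.publish(:driver_found, passenger_id: payload[:passenger_id], driver_id: 7)
end

# Ridesharing's wiring: notify the passenger.
bus.subscribe(:driver_found) do |payload|
  log << "text passenger #{payload[:passenger_id]} about driver #{payload[:driver_id]}"
end

bus.publish(:cab_hailed, passenger_id: 42)
puts log.inspect
```

Each domain now depends only on the bus and on event names, at the cost of making the overall flow harder to trace by reading any single file.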

Caveats: transactional consistency!

If you implement this asynchronously, you’ll have to think about how to deal with transactional consistency. Can you design your data models (and database schema) to support independent updates without any dependencies? How will you handle the case when one domain action fails and the other completes?

You may have to roll your own two-phase commit here, the specifics of which I won’t delve into. However, for most of our applications, we may want to skip the asynchronous approach and keep our events synchronous.
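
To illustrate why synchronous events are the simpler option, here is a toy sketch in which a failing handler raises inside the same unit of work, so everything can be undone together. `TinyStore` and `hail_synchronously` are hypothetical, and the snapshot-based "transaction" merely stands in for a real database transaction.

```ruby
# Hypothetical in-memory "database" with all-or-nothing writes.
class TinyStore
  attr_reader :rows

  def initialize
    @rows = []
  end

  # Run the block; if anything raises, restore the previous rows and re-raise.
  def transaction
    snapshot = @rows.dup
    yield
  rescue StandardError
    @rows = snapshot
    raise
  end
end

def hail_synchronously(store, listeners)
  store.transaction do
    store.rows << 'ride requested'
    # Synchronous dispatch: a failing listener raises inside the transaction.
    listeners.each { |listener| listener.call(store) }
  end
  :committed
rescue StandardError
  :rolled_back
end

assign_driver = ->(store) { store.rows << 'driver assigned' }
routing_down = ->(_store) { raise 'routing service unavailable' }

store = TinyStore.new
puts hail_synchronously(store, [assign_driver, routing_down])
puts store.rows.inspect
```

With asynchronous handlers the failure would surface later, in a different process, after the original write had already been committed.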
