The Sweet Spot
On software, engineering leadership, and anything shiny.

Evented Rails: Decoupling domains in Rails with Wisper pub/sub events

One common pattern in Domain-Driven Design is the use of
publish/subscribe messaging to communicate between domains. When
Domain Events are created from within a domain, other domains are
able to subscribe to these events and take action within their own
domains, respectively.

This is not a common pattern in Rails, particularly because of Ruby’s
lack of language support for functional programming paradigms that exist
in other languages. However, with a nifty framework and the help of
Sidekiq, we can get just a little bit closer.

What is a Domain Event?

A domain event is a recorded property in the system that tracks an
action that the system performs, and the factors/properties that lead to
its creation.

In the following examples, we are going to use the Wisper gem to
implement domain events in our sample Delorean app.

Imagine that we are writing an endpoint that our users will hit,
indicating that they want to hail a time-traveling cab. Now the logic to
hail a cab is rather complicated and lives in an entirely different area
of the codebase, perhaps even in another application. How should we call
the other code and ensure that our code is cleanly decoupled?

With our Domain-Driven powers, we’ve been smart enough to segregate our code into different
subdomains and bounded contexts,
denoted by these two Ruby modules Ridesharing and DriverRouting.

Example 1: In-process pub-sub event modeling, with a service object.

A simple way to use Wisper is to use it to implement your service
objects with Wisper, calling the service from the controller.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
module Ridesharing
  class RidesController < ApplicationController
    def post
      # Hail a time-traveling Delorean:
      command = HailDelorean.new
      command.on('hailed') { |driver|
        render text: "Hailed you a cab: #{driver} is arriving!"
      }
      .on('could_not_hail') {
        render text: "Sorry, no dice."
      }
      command.hail!(current_user)
    end
  end
end

Note that the HailDelorean class has powers of event subscriptions
now. Our calling code does not have to concern itself with the
implementation details of the HailDelorean service - it merely needs
to register handlers for the two possible outcomes, hailed and
could_not_hail. Here’s how the service class is implemented:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
module Ridesharing
  class HailDelorean
    include Wisper::Publisher

    def hail!(user)
      # broadcast() is a Wisper method to fire an event
      driver = find_driver(user)
      if driver
        broadcast('hailed', driver)
      else
        broadcast('could_not_hail')
      end

    def find_driver(user)
      # Here lies slow, complex domain logic
      DriverRouting::FindDriver.new(user)
    end
  end
end

Handling side effects in subscriber classes

Other side-effects can subscribe to the HailDelorean events. Let’s say we want to fire an event
to Segment analytics tracking. I can create a plain Ruby object that simply needs to implement a method with the same name as the event.

Let’s implement hailed and could_not_hail methods on this subscriber class:

1
2
3
4
5
6
7
8
9
class TrackSegmentAnalytics
  def self.hailed(driver)
    # fire analytics event to Segment
  end

  def self.could_not_hail
    # fire analytics event to Segment
  end
end

And we hook it up by subscribing it to the command handler:

1
2
3
4
5
6
7
8
9
10
11
12
module Ridesharing
  class RidesController < ApplicationController
    def post
      # snip
      command = HailDelorean.new(current_user)

      # register the subscriber to the triggering action
      command.subscribe(TrackSegmentAnalytics)
      # snip
    end
  end
end

OK, that was a little awkward, doing all that wiring up in the controller. What if we did the wiring globally,
within an app initializer?

1
2
3
4
5
# config/initializers/domain_event_subscriptions.rb
Wisper.subscribe(TrackSegmentAnalytics, scope: "HailDelorean")

# alternate form:
HailDelorean.subscribe(TrackSegmentAnalytics)

This registers a global subscriber for all future instances of HailDelorean.

Example 2: Asynchronous events with subscription handlers and Sidekiq

Here’s the real power of Wisper - we can decouple our application domain responsibilities by modeling
effects as subscription objects and do them out-of-band of the primary web request thread.

Note that with the wisper-sidekiq gem, all subscriptions
given with an async: true option flag will automatically execute in an external thread as a Sidekiq job.
Let’s take advantage of that now.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
module Ridesharing
  class RidesController < ApplicationController
    def post
      # Hail a time-traveling Delorean:
      HailDelorean.hail(current_user.id)
      render text: 'Hailing a cab, please wait for a response...'
    end
  end

  class HailDelorean
    include Wisper::Broadcaster

    def self.hail(passenger_id)
      broadcast(:hail, passenger_id)
    end
  end
end

module DriverRouting
  # Note that this class is both a subscriber and a publisher
  class FindDriver
    include Wisper::Publisher

    def self.hail(passenger_id)
      # Do slow, complex hairy routefinding/optimization/messaging behind the scenes:
      driver = find_driver_for(passenger_id)

      if driver
        broadcast('driver_found', passenger_id, driver.id)
      else
        broadcast('driver_not_found', passenger_id)
      end
    end
  end
end

Finally, we add handlers (subscribers) to these domain objects:

1
2
3
4
5
6
7
8
9
10
11
module Ridesharing
  class NotifyPassengerWithDriverStatus
    def self.driver_found
      # send them a text message :)
    end

    def self.driver_not_found
      # send them a text message :(
    end
  end
end

Now let’s link it together with subscriptions:

1
2
3
4
# config/initializers/domain_event_subscriptions.rb
Ridesharing::HailDelorean.subscribe(DriverRouting::FindDriver, async: true)
DriverRouting::FindDriver.subscribe(Ridesharing::NotifyPassengerWithDriverStatus, async: true)
Wisper.subscribe(AnalyticsListener, scope: "Ridesharing::NotifyPassengerWithDriverStatus", "DriverRouting::FindDriver"], async: true)

Now our messages between our domains are pulled out of the main request thread, and operate in an asynchronous
fashion with Sidekiq as the runner.

Code in our domains are kept clean - note that there are no direct references to the other subdomains within each
subdomain. Our app more cleanly segregates the responsibilities between each app, heavy workloads are naturally
balanced as they move to worker threads.

Caveats: Beware of overbuilding

If you are on a small app, you probably should go with approach #1. The weight of indirection
can be a cognitive load on development, unless you truly need to build async code in #2. The overhead and
conceptual complexities of the approach can only be justified with large codebases, or in apps
where a domain-centric view (and segregation) of code is present.

Caveats: Event subscriptions can be a tangled mess

Note that the act of wiring can quickly fan out into a spidery mess of handlers - you could even further
decouple your handlers by modeling a global event bus as a publisher, and having each domain tap into the bus’ events
and figure out how to handle each event on its own.

Caveats: transactional consistency!

If you implement this asynchronously, you’ll have to think about how to
deal with transactional consistency. Can you design your data models
(and database schema) to support independent updates without any
dependencies? How will you handle
the case when one domain action fails and the other completes?

You may have to roll your own two-phase commit here, the specifics of
which I won’t delve into. However, for most of our applications, we may
want to skip the asynchronous and keep our events synchronous.

Comments