The Mystery of Event Dispatching in Cizen

By Ryo33, published in Cizen, Nov 4, 2018

Cizen is an Elixir library for building applications as collections of sagas. Sagas communicate with each other by subscribing to and dispatching events. In this post, we’re going to dive deep into how event subscription and dispatching work under the hood.

First, we need to know about filters. Each subscription has exactly one filter, which represents the pattern of events to subscribe to. We can create a filter like this:

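```elixir
alias Cizen.{Event, Filter}
require Filter  # Filter.new/1 is a macro

# A filter for `Join` events to the Lobby.
# (`Join` is assumed to be an event struct with a `room` field.)
Filter.new(fn %Event{body: %Join{room: room}} ->
  room == "Lobby"
end)
```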

The next principal component is the event router: a system component that lists the subscribers whose filters match a dispatched event. Cizen ships with a default event router implementation, Cizen.DefaultEventRouter (called “Router” in this post), which can be used like this:

```elixir
alias Cizen.DefaultEventRouter, as: Router

# Put a subscription.
Router.put(filter, subscriber)
# List the subscribers for the given event.
subscribers = Router.get(event)
```

You might already see a potential performance problem here. If filters are plain functions and the event router finds subscribers by passing the event to every filter, it cannot scale: with a million subscribers, every dispatched event costs 1,000,000 function calls.
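To make that cost concrete, here is a minimal sketch of the naive approach. `NaiveRouter` is a hypothetical module, not Cizen’s implementation, and it assumes each filter is an ordinary one-argument function that returns a boolean. Every call to `get/2` invokes every stored filter once.

```elixir
defmodule NaiveRouter do
  # Hypothetical linear-scan router (not Cizen's code).
  # Subscriptions are stored as {filter_fun, subscriber} pairs.
  def new, do: []

  def put(subscriptions, filter_fun, subscriber) do
    [{filter_fun, subscriber} | subscriptions]
  end

  # Calls every filter on the event, so the cost grows linearly
  # with the number of subscriptions.
  def get(subscriptions, event) do
    for {filter_fun, subscriber} <- subscriptions, filter_fun.(event), do: subscriber
  end
end

# 1,000 subscriptions, each waiting for messages to one user.
router =
  Enum.reduce(1..1_000, NaiveRouter.new(), fn i, acc ->
    NaiveRouter.put(acc, fn event -> event.to == "user#{i}" end, "subscriber #{i}")
  end)

# This single dispatch runs all 1,000 filter functions.
NaiveRouter.get(router, %{to: "user42"})
# => ["subscriber 42"]
```

Only one subscriber matches, yet all 1,000 filters had to run to find it; multiply that by a million subscriptions and the problem described above appears.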

To check, let’s measure the execution time of Router.get/1 while varying the number of subscribers over 1, 10, 100, …, 1,000,000. The benchmarking code is here. I used benchee, a benchmarking library for Elixir, and got the following result:

```
Operating System: Linux
CPU Information: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz
Number of Available Cores: 8
Available memory: 7.54 GB
Elixir 1.7.1
Erlang 21.1

Benchmark suite executing with the following configuration:
warmup: 2 s
time: 5 s
memory time: 0 μs
parallel: 1
inputs: a. 1 subscription, b. 10 subscription, c. 100 subscription, d. 1,000 subscription, e. 10,000 subscription, f. 100,000 subscription, g. 1,000,000 subscriptions
Estimated total run time: 49 s

Benchmarking dispatch with input a. 1 subscription...
Benchmarking dispatch with input b. 10 subscription...
Benchmarking dispatch with input c. 100 subscription...
Benchmarking dispatch with input d. 1,000 subscription...
Benchmarking dispatch with input e. 10,000 subscription...
Benchmarking dispatch with input f. 100,000 subscription...
Benchmarking dispatch with input g. 1,000,000 subscriptions...

##### With input a. 1 subscription #####
Name               ips        average  deviation         median         99th %
dispatch      190.40 K        5.25 μs   ±175.62%           5 μs           6 μs

##### With input b. 10 subscription #####
Name               ips        average  deviation         median         99th %
dispatch      187.52 K        5.33 μs   ±161.44%           5 μs           7 μs

##### With input c. 100 subscription #####
Name               ips        average  deviation         median         99th %
dispatch      171.09 K        5.84 μs   ±161.32%           6 μs           9 μs

##### With input d. 1,000 subscription #####
Name               ips        average  deviation         median         99th %
dispatch      160.43 K        6.23 μs   ±153.67%           6 μs          12 μs

##### With input e. 10,000 subscription #####
Name               ips        average  deviation         median         99th %
dispatch      119.38 K        8.38 μs    ±87.09%           8 μs          12 μs

##### With input f. 100,000 subscription #####
Name               ips        average  deviation         median         99th %
dispatch       26.27 K       38.06 μs    ±19.50%          36 μs          48 μs

##### With input g. 1,000,000 subscriptions #####
Name               ips        average  deviation         median         99th %
dispatch        2.01 K      497.12 μs    ±40.67%         466 μs      672.72 μs
```

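Strangely, the average execution time with a million subscribers (497.12 μs) is only about 100 times that with one subscriber (5.25 μs), even though there are a million times as many subscriptions. Up to 10,000 subscriptions, the average stays under 10 μs.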

This scalability comes from a dispatching tree. Suppose we create the following three subscriptions:

```elixir
filter_for_a = Filter.new(
  fn %Event{body: %SendMessage{to: person}} -> person == "a" end
)
filter_for_b = Filter.new(
  fn %Event{body: %SendMessage{to: person}} -> person == "b" end
)
filter_for_c = Filter.new(
  fn %Event{body: %SendMessage{to: person}} -> person == "c" end
)

Router.put(filter_for_a, "subscriber a")
Router.put(filter_for_b, "subscriber b")
Router.put(filter_for_c, "subscriber c")
```

Router builds this tree:

```elixir
%Router.Node{
  operations: %{
    {:access, [:__struct__]} => %{
      Cizen.Event => %Router.Node{
        operations: %{
          {:access, [:body, :__struct__]} => %{
            SendMessage => %Router.Node{
              operations: %{
                {:access, [:body, :to]} => %{
                  "a" => %Router.Node{
                    operations: %{},
                    subscriptions: #MapSet<["subscriber a"]>
                  },
                  "b" => %Router.Node{
                    operations: %{},
                    subscriptions: #MapSet<["subscriber b"]>
                  },
                  "c" => %Router.Node{
                    operations: %{},
                    subscriptions: #MapSet<["subscriber c"]>
                  }
                }
              },
              subscriptions: #MapSet<[]>
            }
          }
        },
        subscriptions: #MapSet<[]>
      }
    }
  },
  subscriptions: #MapSet<[]>
}
```

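Router can list the subscribers by walking this tree, without calling any of the three filters on the event: at each node, it reads the value at an operation’s path (for example, [:body, :to]) from the event and looks up the matching child in a map.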
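Here is a minimal sketch of such a tree, assuming (as a simplification) that every filter is just a list of `{path, value}` equality conditions. `TreeSketch`, `TreeSketch.Event`, and `TreeSketch.SendMessage` are hypothetical stand-ins, not Cizen’s `DefaultEventRouter.Node`; only the node shape (`operations` plus `subscriptions`) mirrors the tree printed above.

```elixir
defmodule TreeSketch.Event do
  # Hypothetical stand-in for Cizen.Event
  defstruct [:body]
end

defmodule TreeSketch.SendMessage do
  # Hypothetical stand-in for the SendMessage event body
  defstruct [:to]
end

defmodule TreeSketch do
  # Simplified dispatching tree (not Cizen's code). A filter is assumed
  # to be a list of {path, value} equality conditions.
  defstruct [:operations, :subscriptions]

  def new, do: %__MODULE__{operations: %{}, subscriptions: MapSet.new()}

  # No conditions left: store the subscriber at this node.
  def put(node, [], subscriber) do
    %{node | subscriptions: MapSet.put(node.subscriptions, subscriber)}
  end

  # Descend into (or create) the branch for this {path, value} pair.
  def put(node, [{path, value} | rest], subscriber) do
    operation = {:access, path}
    branches = Map.get(node.operations, operation, %{})
    child = Map.get(branches, value, new())
    branches = Map.put(branches, value, put(child, rest, subscriber))
    %{node | operations: Map.put(node.operations, operation, branches)}
  end

  # Walk the tree: one map lookup per operation at each visited node,
  # and no filter function is ever called.
  def get(node, event) do
    Enum.reduce(node.operations, node.subscriptions, fn {{:access, path}, branches}, acc ->
      case Map.fetch(branches, access(event, path)) do
        {:ok, child} -> MapSet.union(acc, get(child, event))
        :error -> acc
      end
    end)
  end

  # Read a nested field; structs are maps, so Map.get/2 works on them.
  defp access(value, []), do: value
  defp access(%{} = map, [key | rest]), do: access(Map.get(map, key), rest)
  defp access(_not_a_map, _path), do: nil
end

filter_for = fn person ->
  [
    {[:__struct__], TreeSketch.Event},
    {[:body, :__struct__], TreeSketch.SendMessage},
    {[:body, :to], person}
  ]
end

tree =
  Enum.reduce(["a", "b", "c"], TreeSketch.new(), fn person, acc ->
    TreeSketch.put(acc, filter_for.(person), "subscriber " <> person)
  end)

# struct/2 builds the structs at runtime, so this also runs as a plain script.
event = struct(TreeSketch.Event, body: struct(TreeSketch.SendMessage, to: "b"))
TreeSketch.get(tree, event)
# returns a MapSet containing only "subscriber b"
```

Subscriptions that share a prefix of conditions share a branch, so the "b" and "c" subscriptions only add leaves under the existing `[:body, :to]` map, much like the tree above.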

To make this possible, Filter.new/1 is a macro that expands the function into data at compile time. For example, the Lobby filter from earlier expands as follows:

```elixir
Filter.new(fn %Event{body: %Join{room: room}} ->
  room == "Lobby"
end)

# expands to:
%Cizen.Filter{
  code:
    {:and,
     [
       ==: [{:access, [:__struct__]}, Cizen.Event],
       and: [
         ==: [{:access, [:body, :__struct__]}, Join],
         ==: [{:access, [:body, :room]}, "Lobby"]
       ]
     ]}
}
```
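To see how this data could feed a tree like the one above, here is a hypothetical helper that flattens filter code made only of `:and` and `:==` nodes into `{path, value}` equality conditions. It is a sketch, not part of Cizen’s Filter.Code; the `code` value below is the expanded Lobby filter written with explicit tuples, which is the same data as the keyword-list form shown above.

```elixir
defmodule FilterCodeSketch do
  # Hypothetical helper (not Cizen's Filter.Code): flattens filter code built
  # only from :and and :== nodes into a list of {path, value} conditions.
  def conditions({:and, [left, right]}), do: conditions(left) ++ conditions(right)
  def conditions({:==, [{:access, path}, value]}), do: [{path, value}]
end

# The expanded Lobby filter code, written with explicit tuples.
code =
  {:and,
   [
     {:==, [{:access, [:__struct__]}, Cizen.Event]},
     {:and,
      [
        {:==, [{:access, [:body, :__struct__]}, Join]},
        {:==, [{:access, [:body, :room]}, "Lobby"]}
      ]}
   ]}

FilterCodeSketch.conditions(code)
# => [{[:__struct__], Cizen.Event}, {[:body, :__struct__], Join}, {[:body, :room], "Lobby"}]
```

Each condition maps onto one level of the dispatching tree. Real filter code can contain other operators, which this sketch deliberately does not handle (it raises `FunctionClauseError` for them).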

Thanks to tree-structured dispatching, Cizen scales well with a large number of subscriptions. If you’re interested, you can use Cizen by adding {:cizen, "~> 0.12.0"} to your deps, or you can read the implementations of Filter.Code (a little complex) and DefaultEventRouter.Node (very short).
