Router Protocol
In this section, we take a look at a simple router protocol. Here, events coming to a specific channel are routed between a set of target channels, according to some user-specified policy. In practice, there are a number of applications of this protocol, ranging from data replication and sharding, to load-balancing and multicasting. The protocol is illustrated in the following figure:
          #----------------#
          |                |
          |              /-+-- channel 1
          |             /  |
router ---o--> router -----+-- channel 2
channel   |             \  |
          |              \-+-- channel 3
          |                |
          #----------------#
In our first example, we will instantiate a master reactor that will route the incoming requests between two workers. For simplicity, requests will be just strings, and the workers will just print those strings to the standard output.
There are several ways to instantiate the router protocol. First, the protocol can be started within an existing reactor, in which case it is just one of the protocols running inside that reactor. Second, the protocol can be started as a standalone reactor, in which case that reactor is dedicated to the router protocol.
We will start by creating an instance of the router protocol in an existing reactor.
We first import the contents of the io.reactors and the io.reactors.protocol packages, and then instantiate a default reactor system.
import io.reactors._
import io.reactors.protocol._
val system = ReactorSystem.default("test-system")
We can now use the reactor system to start two workers, worker1 and worker2. We use the shorthand method spawnLocal to concisely start the reactors without first creating the Proto object:
val worker1 = system.spawnLocal[String] { self =>
  self.main.events.onEvent(x => println(s"1: ${x}"))
}
val worker2 = system.spawnLocal[String] { self =>
  self.main.events.onEvent(x => println(s"2: ${x}"))
}
Next, we declare a reactor whose main channel takes Unit events, since we will not be using the main channel for anything special. Inside that reactor, we first call router[String] on the channels service to open a connector for the router. Just calling the router method does not start the protocol; we need to call route on the connector to actually start routing.
The route method expects a Router.Policy object as an argument. The policy object contains the routing logic for the router protocol. In this example, we will use the simple round-robin policy. The Router.roundRobin factory method expects a list of channels for the round-robin policy, so we will pass a list with worker1 and worker2:
system.spawnLocal[Unit] { self =>
  val router = system.channels.daemon.router[String]
    .route(Router.roundRobin(Seq(worker1, worker2)))
  router.channel ! "one"
  router.channel ! "two"
}
Having instantiated the router protocol, we use the input channel associated with the router to send the events "one" and "two". The router delivers the two strings to two different workers. The roundRobin policy does not specify which of the target channels is chosen first, so the output could be the following two lines, in some order:
1: one
2: two
Or, the following two lines, in some order:
1: two
2: one
The round-robin routing policy does not have any knowledge about the two target channels, so it just picks one after another in succession, and then the first one again when it reaches the end of the target list. Effectively, this policy constitutes a very simple form of load-balancing.
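The cycling behavior described above can be modeled in a few lines of plain Scala. This is only a sketch of the idea, not the library's implementation: the names RoundRobinSketch, RoundRobin and assign are hypothetical, targets are plain values rather than channels, and the sketch assumes that the first target in the list is chosen first, which the roundRobin policy itself does not promise.

```scala
// Plain-Scala model of round-robin selection (hypothetical names, no Reactors API).
object RoundRobinSketch {
  // Cycles through `targets` in order and wraps around after the last one.
  final class RoundRobin[T](targets: IndexedSeq[T]) {
    require(targets.nonEmpty, "at least one target is required")
    private var index = 0 // assumption: selection starts at the first target

    def next(): T = {
      val target = targets(index)
      index = (index + 1) % targets.length
      target
    }
  }

  // Pairs each event with the target that a fresh picker selects for it.
  def assign[E, T](events: List[E], targets: IndexedSeq[T]): List[(E, T)] = {
    val picker = new RoundRobin(targets)
    events.map(e => (e, picker.next()))
  }
}
```

With three events and two targets, the third event goes back to the first target, which is the wrap-around step mentioned in the text.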
There are other predefined policies that can be used with the router protocol. For example, the Router.random policy uses a random number generator to route events to different channels, which is more robust in scenarios where a high-load event is sent periodically. Another policy is Router.hash, which computes the hash code of the event and uses it to find the target channel. If neither of these is satisfactory, the deficitRoundRobin policy tracks the expected cost of each event, and biases its routing decisions to balance the total cost sent to each target.
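To see why a periodic high-load event can be a problem for round-robin routing, consider the following plain-Scala simulation. It is a sketch under stated assumptions: the workload (every second event costs 10, all others cost 1), the object PeriodicLoadSketch and its members are hypothetical, and no Reactors API is involved.

```scala
// Simulates the total cost delivered to each target (hypothetical workload).
object PeriodicLoadSketch {
  // Sums the cost sent to each of `numTargets` targets, where `pick` maps
  // an event's position in the stream to a target index.
  def totals(costs: Seq[Int], numTargets: Int)(pick: Int => Int): Vector[Int] =
    costs.zipWithIndex.foldLeft(Vector.fill(numTargets)(0)) {
      case (acc, (cost, i)) =>
        val target = pick(i)
        acc.updated(target, acc(target) + cost)
    }

  // Assumed workload: every second event is expensive.
  val costs: Seq[Int] = Seq.tabulate(8)(i => if (i % 2 == 1) 10 else 1)

  // Round-robin over two targets: event i goes to target i % 2,
  // so every expensive event lands on the same target.
  val roundRobinTotals: Vector[Int] = totals(costs, 2)(i => i % 2)

  // Random choice, seeded so that repeated runs give the same result.
  private val rng = new scala.util.Random(42)
  val randomTotals: Vector[Int] = totals(costs, 2)(_ => rng.nextInt(2))
}
```

In this model roundRobinTotals is Vector(4, 40): the period of the expensive events matches the period of the router, so one target receives all of them. A random picker has no fixed period to line up with, although any single run can still be unbalanced.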
The above-mentioned policies are mainly used in load-balancing. In addition, users can define their own custom policies for their use-cases. For example, if the router has some information about the load and availability of the targets, it could take that into account when making routing decisions.
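As an illustration of what a load-aware custom policy might decide, here is a minimal plain-Scala sketch of a "least loaded" picker. It is not written against the Router.Policy type, whose exact shape is not shown in this section; LeastLoadedSketch, LeastLoaded and pick are hypothetical names, and the cost passed to pick is assumed to be known to the caller.

```scala
// Plain-Scala model of a load-aware selection rule (hypothetical names).
object LeastLoadedSketch {
  // Remembers the total cost sent to each target and always chooses
  // the target with the smallest running total.
  final class LeastLoaded[T](targets: IndexedSeq[T]) {
    require(targets.nonEmpty, "at least one target is required")
    private val load = Array.fill(targets.length)(0L)

    def pick(cost: Long): T = {
      val i = load.indices.minBy(load(_)) // ties go to the lowest index
      load(i) += cost
      targets(i)
    }
  }

  // One expensive event followed by three cheap ones.
  val picker = new LeastLoaded(Vector("a", "b"))
  val choices: Seq[String] = Seq(5L, 1L, 1L, 1L).map(picker.pick)
}
```

Here the expensive first event goes to "a", and the three cheap events that follow all go to "b", because "b" still has the smaller running total after each of them.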
Router Reactors
In the previous example, we instantiated the router protocol inside an existing reactor. Sometimes we want to completely dedicate the reactor to routing incoming events, so it is useful to have a shorthand notation for starting such router reactors.
In the following, we use a router reactor to implement simple sharding for key-value pairs, which is typically done in distributed hash tables. Here, the data contained in a distributed hash table is spread across some number of computing nodes called shards. Each shard will be represented by a separate reactor, which will have two basic operations, Put and Get, used to add entries to the distributed hash table and to look them up, respectively.
sealed trait Op[K, V] {
  def key: K
}
case class Put[K, V](key: K, value: V)
  extends Op[K, V]
case class Get[K, V](key: K)
  extends Op[K, V]
The Put operation holds information about the key-value pair, while the Get operation contains the desired key.
We implement the distributed hash table functionality in a method called distributedHashTable. This method instantiates the shards, and then starts the main router reactor. We will use the following convention: if the key has some hash code h, then its shard is h % numShards, where numShards is the number of different shards. Incidentally, this is exactly how the router policy Router.hash behaves.
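The key-to-shard convention can be written down as a small plain-Scala function. One detail is worth spelling out: hash codes may be negative, and on the JVM the % operator keeps the sign of its left operand, so a literal h % numShards can produce a negative index. The sketch below uses Math.floorMod to stay within the valid range. ShardIndexSketch and shardOf are hypothetical names, and this section does not state how Router.hash itself treats negative hash codes.

```scala
// Maps keys to shard indices (hypothetical helper, no Reactors API).
object ShardIndexSketch {
  // Returns an index in the range [0, numShards).
  // `##` is Scala's null-safe hash code; floorMod keeps the result non-negative.
  def shardOf[K](key: K, numShards: Int): Int = {
    require(numShards > 0, "numShards must be positive")
    Math.floorMod(key.##, numShards)
  }
}
```

For example, the Int key -7 has hash code -7, for which -7 % 4 is -3 but shardOf(-7, 4) is 1. Equal keys always map to the same shard, which is what lets a later Get find the entry stored by an earlier Put.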
The distributed hash table implementation is then shown in the following:
// Needed for the mutable.Map used by each shard.
import scala.collection.mutable

def distributedHashTable[K, V](
  numShards: Int
): Server[Op[K, V], Option[V]] = {
  // Create the shard reactors, used to hold the key-value pairs.
  val shards = for (i <- 0 until numShards) yield {
    system.spawnLocal[Server.Req[Op[K, V], Option[V]]] { self =>
      val data = mutable.Map[K, V]()
      self.main.events onMatch {
        case (Put(key, value), reply) =>
          // Reply with the previous value (if any), then store the new one.
          reply ! data.get(key)
          data(key) = value
        case (Get(key), reply) =>
          // Reply with the value currently stored under the key.
          reply ! data.get(key)
      }
    }
  }
  // Return the router reactor, which forwards requests
  // to the appropriate shard reactor.
  system.router(Router.hash(shards, _._1.key.##))
}
Note that, in our implementation, the distributed hash table’s channel type is a server. Clients must always pass a reply channel on which they receive the Option[V] object: for a Put, the value that was previously in the hash table, and for a Get, the value currently stored under the key. This means that we can use the ? operator on the distributed hash table, which was explained in the previous section on the server protocol.
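The reply semantics of a single shard can be checked without any reactors by applying the operations to a local map. The following is a minimal sketch: ShardLogicSketch and handle are hypothetical names, the Op, Put and Get definitions are repeated so that the block is self-contained, and the reply is modeled as a return value rather than a message on a reply channel.

```scala
import scala.collection.mutable

// Models what one shard replies for each operation (hypothetical helper).
object ShardLogicSketch {
  sealed trait Op[K, V] { def key: K }
  case class Put[K, V](key: K, value: V) extends Op[K, V]
  case class Get[K, V](key: K) extends Op[K, V]

  // Applies one operation to the shard's local map and returns the reply.
  def handle[K, V](data: mutable.Map[K, V], op: Op[K, V]): Option[V] = op match {
    case Put(key, value) => data.put(key, value) // put returns the previous value, if any
    case Get(key)        => data.get(key)        // current value, if any
  }
}
```

On an empty map, a first Put("key", "value") replies None, a following Get[String, String]("key") replies Some("value"), and a second Put under the same key replies with the value it overwrote.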
Next, we instantiate the distributed hash table for string key-value pairs, and specify 32 shards:
val dht = distributedHashTable[String, String](32)
We can now create a client of our distributed hash table, which inserts one entry into the hash table, and then asks to get it back.
system.spawnLocal[Option[String]] { self =>
  (dht ? Put("key", "value")) onEvent { prevOpt =>
    assert(prevOpt == None)
    (dht ? Get("key")) onMatch {
      case Some(prev) => println(prev)
      case None => println("unexpected - empty!")
    }
  }
}
After the reactor above starts, we should soon see "value" printed on the standard output.