Reactors 101
As we learned previously, event streams always propagate events on a single thread. This is useful from the standpoint of program comprehension, but we still need a way to express concurrency in our programs. In this section, we will see how this is done with reactors.
A reactor is the basic unit of concurrency. For readers familiar with the actor model, a reactor is close to the concept of an actor. While actors receive messages, reactors receive events. However, while an actor in a particular state has only a single point in its definition where it can receive a message, a reactor can receive an event from many different sources at any time. Despite this flexibility, one reactor will always process at most one event at any time. We say that events received by a reactor are serialized, similar to how messages received by an actor are serialized.
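The serialization guarantee can be illustrated without the Reactors library. The following is a minimal sketch using only the Java standard library; the class SerializedMailboxSketch and its methods are hypothetical names invented for this illustration and are not part of the Reactors API, and the single-threaded executor is an assumption about how one could model the behavior, not a description of how Reactors implements it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a reactor's mailbox; not part of the Reactors API.
final class SerializedMailboxSketch {
    // One worker thread: queued events are handled strictly one after another.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final AtomicInteger active = new AtomicInteger();
    private final AtomicInteger maxActive = new AtomicInteger();
    private final AtomicInteger handled = new AtomicInteger();

    // Any thread may call send; the event is queued and handled later.
    void send(String event) {
        worker.execute(() -> {
            int now = active.incrementAndGet();          // handlers running right now
            maxActive.accumulateAndGet(now, Math::max);  // remember the peak
            handled.incrementAndGet();
            active.decrementAndGet();
        });
    }

    // Sends perSender events from each of `senders` threads and returns
    // { events handled, highest number of handlers running at once }.
    static int[] run(int senders, int perSender) {
        SerializedMailboxSketch box = new SerializedMailboxSketch();
        Thread[] threads = new Thread[senders];
        for (int i = 0; i < senders; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perSender; j++) {
                    box.send("event-" + id + "-" + j);
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
            box.worker.shutdown(); // already queued events are still handled
            if (!box.worker.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("mailbox did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { box.handled.get(), box.maxActive.get() };
    }

    public static void main(String[] args) {
        int[] result = run(4, 1000);
        // prints "handled=4000, max concurrent=1"
        System.out.println("handled=" + result[0] + ", max concurrent=" + result[1]);
    }
}
```

Although four threads send events at the same time, the recorded peak of concurrently running handlers stays at one, which is the property the text calls serialization.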
As before, we start by importing the io.reactors package:
import io.reactors._
import io.reactors.japi.*;
To be able to create new reactors, we need a ReactorSystem object, which tracks the reactors on a single machine.
val system = new ReactorSystem("test-system")
ReactorSystem system = ReactorSystem.create("test-system");
Before we can start a reactor instance, we need to define its template. One way to do this is to call the Reactor.apply[T] method, which returns a Proto object for the reactor. The Proto object is called a prototype of the reactor.
The following reactor prints all the events it receives to the standard output:
val proto: Proto[Reactor[String]] = Reactor[String] { self =>
  self.main.events onEvent {
    x => println(x)
  }
}
Proto<String> proto = Reactor.apply(
  self -> self.main().events().onEvent(x -> System.out.println(x))
);
Let’s examine this code more closely. The Reactor.apply method is called with the type argument String. This means that the reactor encoded in the resulting Proto object only receives events whose type is String. This is the first difference with respect to the standard actor model, in which actors can receive messages of any type. Events received by reactors are well typed.
In the reactor model, every reactor can access a special event stream called main.events, which emits events that the reactor receives from other reactors. Since we are declaring an anonymous reactor with the Reactor.apply method, we need to add the prefix self. to access members of the reactor.
We previously learned that we can call onEvent to register callbacks to event streams, and we used it in this example to print the events using println.
After defining a reactor template, the next step is to spawn a new reactor. We do this by calling the spawn method on the reactor system:
val ch: Channel[String] = system.spawn(proto)
Channel<String> ch = system.spawn(proto);
The method spawn takes a Proto object as a parameter. The Proto object can generally encode the reactor’s constructor arguments, scheduler, name and other options. In our example, we created a Proto object for an anonymous reactor with the Reactor.apply method, so we don’t have any constructor arguments. We will later see alternative ways of declaring reactors and configuring prototypes.
The method spawn does two things. First, it registers and starts a new reactor instance. Second, it returns a Channel object, which is used to send events to the newly created reactor. We can show the relationship between a reactor, its event stream and the channel as follows:
  "Hello world!"
         |           #-----------------------#
         |           |    Reactor[String]    |
         V           |                       |
  Channel[String] ---o--> Events[String]     |
         ^           |                       |
         |           |                       |
         |           #-----------------------#
      "Hola!"
The only way for the outside world to access the inside of a reactor is to send events to its channel. These events are eventually delivered to the corresponding event stream, which the reactor can listen to. The channel and event stream can only pass events whose type corresponds to the type of the reactor.
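To make the split between the write end (the channel) and the read end (the event stream) concrete, here is a minimal single-threaded sketch in plain Java. ChannelPairSketch, send, onEvent and deliverPending are hypothetical names chosen for this illustration; they only model the relationship in the diagram and are not the Reactors API, which additionally handles scheduling and thread safety.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical model of a channel/event-stream pair; not the Reactors API.
final class ChannelPairSketch<T> {
    private final Queue<T> pending = new ArrayDeque<>();
    private final List<Consumer<T>> observers = new ArrayList<>();

    // Write end: the only operation meant for the outside world.
    void send(T event) {
        pending.add(event);
    }

    // Read end: only the owning reactor registers callbacks here.
    void onEvent(Consumer<T> observer) {
        observers.add(observer);
    }

    // Called by the owner: delivers queued events in arrival order
    // and returns how many events were delivered.
    int deliverPending() {
        int delivered = 0;
        T event;
        while ((event = pending.poll()) != null) {
            for (Consumer<T> observer : observers) {
                observer.accept(event);
            }
            delivered++;
        }
        return delivered;
    }

    // Sends two events and returns what the owner's callback observed.
    static List<String> demo() {
        ChannelPairSketch<String> main = new ChannelPairSketch<>();
        List<String> seen = new ArrayList<>();
        main.onEvent(seen::add);
        main.send("Hello world!");
        main.send("Hola!");
        // main.send(42); // would not compile: this channel only accepts String
        main.deliverPending();
        return seen;
    }
}
```

In this sketch, events sent before deliverPending runs simply wait in the queue, mirroring the statement that events are eventually delivered to the event stream, and the generic parameter T enforces that only events of the reactor's type can be sent.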
Let’s send an event to the reactor we just spawned. We do this by calling the bang operator ! on the channel:
ch ! "Hola!"
ch.send("Hola!");
Running the last statement should print "Hola!" to the standard output.
Defining and configuring reactors
In the previous section, we saw how to define a reactor using the Reactor.apply method. In this section, we take a look at an alternative way of defining a reactor: by extending the Reactor class. Recall that the Reactor.apply method defines an anonymous reactor template. Extending the Reactor class declares a named reactor template.
In the following, we declare HelloReactor, which must be a top-level class:
class HelloReactor extends Reactor[String] {
  main.events onEvent {
    x => println(x)
  }
}
public static class HelloReactor extends Reactor<String> {
  public HelloReactor() {
    main().events().onEvent(
      x -> System.out.println(x)
    );
  }
}
To run this reactor, we first create a prototype to configure it. The method Proto.apply takes the type of the reactor and returns a prototype for that reactor type. We then call the spawn method with that Proto object to start the reactor:
val ch = system.spawn(Proto[HelloReactor])
ch ! "Howdee!"
Proto<String> proto = Proto.create(HelloReactor.class);
Channel<String> ch = system.spawn(proto);
ch.send("Howdee!");
We can also use the prototype to, for example, set the scheduler that the reactor instance should use. If we want the reactor instance to run on its own dedicated thread to give it more priority, we can do the following:
system.spawn(Proto[HelloReactor].withScheduler(JvmScheduler.Key.newThread))
system.spawn(Proto.create(HelloReactor.class)
.withScheduler(Scheduler.NEW_THREAD));
Note that if you are running Reactors on Scala.js, you will need to use a Scala.js-specific scheduler. The reason is that the JavaScript runtime is not multi-threaded. Asynchronous executions are placed on a single queue, and executed one after another. On Scala.js, you will therefore need to use the JsScheduler.Key.default scheduler.
system.spawn(proto.withScheduler(JsScheduler.Key.default))
In the JVM example, the call to withScheduler returns a new prototype that runs on a predefined scheduler called ReactorSystem.Bundle.schedulers.newThread. A reactor spawned from this prototype uses that scheduler. Reactor systems also allow registering custom schedulers.
In the following, we define a custom Timer scheduler, which schedules the reactor for execution once every 1000 milliseconds:
system.bundle.registerScheduler("customTimer",
  new JvmScheduler.Timer(1000))
val periodic = system.spawn(
  Proto[HelloReactor].withScheduler("customTimer"))
periodic ! "Ohayo!"
system.bundle().registerScheduler("customTimer", Scheduler.timer(1000));
Proto<String> protoWithTimer =
  Proto.create(HelloReactor.class).withScheduler("customTimer");
Channel<String> periodic = system.spawn(protoWithTimer);
periodic.send("Ohayo!");
By running the code above, we can see that the event "Ohayo!" is processed only one second after the reactor starts.
There are several other configuration options for Proto objects, listed in the online API docs. To summarize this section, starting a reactor is generally a three-step process:
- A reactor template is created by extending the Reactor class.
- A reactor Proto configuration object is created with the Proto.apply method.
- A reactor instance is started with the spawn method of the reactor system.
For convenience, we can fuse the first two steps by using the Reactor.apply method, which creates an anonymous reactor template and directly returns a prototype object of type Proto[I], for some reactor type I. Typically, we do this in tests or in the REPL.
Using channels
Now that we understand how to create and configure reactors in different ways, we can take a closer look at channels, a reactor’s means of communicating with its environment. As noted before, every reactor is created with a default channel called main, which is usually sufficient. But sometimes a reactor needs to be able to receive more than one type of event, and needs additional channels for this purpose.
Let’s declare a reactor that stores key-value pairs. We can do this by defining the following reactor:
import scala.collection._
class PutOnlyReactor[K, V] extends Reactor[(K, V)] {
  val map = mutable.Map[K, V]()
  main.events onEvent {
    case (k, v) => map(k) = v
  }
}
public static class Pair<K, V> {
  public K k;
  public V v;
  public Pair(K k, V v) {
    this.k = k;
    this.v = v;
  }
}
public static class PutOnlyReactor<K, V> extends Reactor<Pair<K, V>> {
  Map<K, V> map = new HashMap<K, V>();
  public PutOnlyReactor() {
    main().events().onEvent(p -> map.put(p.k, p.v));
  }
}
The PutOnlyReactor accepts events of type (K, V) for some generic type parameters K and V. This is fine for the purposes of storing key-value pairs into this reactor, but it does not make it possible to query the values stored under specific keys. For this, the sender must give the reactor the desired key of type K and a channel of type Channel[V], on which the value V can be sent back.
Since the same input channel serves two purposes, we need the following data type:
trait Op[K, V]
case class Put[K, V](k: K, v: V) extends Op[K, V]
case class Get[K, V](k: K, ch: Channel[V]) extends Op[K, V]
public static interface Op<K, V> {
  public void apply(Map<K, V> map);
}
public static class Put<K, V> implements Op<K, V> {
  public K k;
  public V v;
  public Put(K k, V v) {
    this.k = k;
    this.v = v;
  }
  public void apply(Map<K, V> map) {
    map.put(k, v);
  }
}
public static class Get<K, V> implements Op<K, V> {
  public K k;
  public Channel<V> ch;
  public Get(K k, Channel<V> ch) {
    this.k = k;
    this.ch = ch;
  }
  public void apply(Map<K, V> map) {
    ch.send(map.get(k));
  }
}
With the Op[K, V] data type, we can define the following reactor:
class MapReactor[K, V] extends Reactor[Op[K, V]] {
  val map = mutable.Map[K, V]()
  main.events onEvent {
    case Put(k, v) => map(k) = v
    case Get(k, ch) => ch ! map(k)
  }
}
public static class MapReactor<K, V> extends Reactor<Op<K, V>> {
  Map<K, V> map = new HashMap<K, V>();
  public MapReactor() {
    main().events().onEvent(op -> op.apply(map));
  }
}
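The request/reply protocol encoded by Op can be traced step by step without a reactor system. The sketch below is plain Java under stated assumptions: java.util.function.Consumer stands in for Channel, a list stands in for the reactor's inbox, and ReplyChannelSketch is a hypothetical name used only here. It mirrors the Java Op, Put and Get classes above but is not the Reactors API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical walk-through of the Put/Get protocol; not the Reactors API.
final class ReplyChannelSketch {
    interface Op<K, V> {
        void apply(Map<K, V> map);
    }

    static final class Put<K, V> implements Op<K, V> {
        final K k;
        final V v;
        Put(K k, V v) { this.k = k; this.v = v; }
        public void apply(Map<K, V> map) { map.put(k, v); }
    }

    static final class Get<K, V> implements Op<K, V> {
        final K k;
        final Consumer<V> reply; // assumption: stands in for Channel<V>
        Get(K k, Consumer<V> reply) { this.k = k; this.reply = reply; }
        public void apply(Map<K, V> map) { reply.accept(map.get(k)); }
    }

    // Processes a small inbox in order and returns the replies received.
    static List<String> demo() {
        Map<String, String> store = new HashMap<>();
        List<String> replies = new ArrayList<>();
        List<Op<String, String>> inbox = new ArrayList<>();
        inbox.add(new Put<String, String>("dns-main", "dns1.lan"));
        inbox.add(new Get<String, String>("dns-main", replies::add));
        // A key that was never stored: HashMap.get returns null.
        inbox.add(new Get<String, String>("dns-missing",
            v -> replies.add(String.valueOf(v))));
        for (Op<String, String> op : inbox) {
            op.apply(store); // one operation at a time, like a reactor
        }
        return replies; // ["dns1.lan", "null"]
    }
}
```

One detail this makes visible: with a java.util.HashMap, a Get for a missing key replies with null, whereas in the Scala MapReactor the expression map(k) throws NoSuchElementException for a missing key. A real client would probably want to handle the missing-key case explicitly in either version.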
Let’s start MapReactor and test it. We will use the MapReactor to store some DNS aliases. We will map each alias String key to a URL, where the URLs are represented with the List[String] type (String[] in the Java version). We first initialize as follows:
val mapper = system.spawn(Proto[MapReactor[String, List[String]]])
Channel<Op<String, String[]>> mapper =
system.spawn(Proto.create(MapReactor.class));
We then send a couple of Put messages to store some alias values:
mapper ! Put("dns-main", "dns1" :: "lan" :: Nil)
mapper ! Put("dns-backup", "dns2" :: "com" :: Nil)
mapper.send(new Put("dns-main", new String[] { "dns1", "lan" }));
mapper.send(new Put("dns-backup", new String[] { "dns2", "com" }));
Next, we create a client reactor that we control by sending it String events. This means that the reactor’s type will be Reactor[String]. However, the client reactor will also have to contact the MapReactor and ask it for one of the URLs. Since the MapReactor can only send back events of type List[String], the client’s default channel, which accepts String events, cannot receive the reply. The client will have to provide the MapReactor with a different channel. The following expression is used to create a new channel:
val c: Connector[EventType] = system.channels.open[EventType]
The Connector object contains two members: channel, which is the newly created channel, and events, which is the event stream corresponding to that channel. The event stream propagates all events that were sent and delivered on the channel, and can only be used by the reactor that created it. The channel, on the other hand, can be shared with other reactors.
Event streams are not shareable objects – never send an event stream created by one reactor to some other reactor.
The expression system.channels returns a channel builder object, which provides methods like named or daemon, used to customize the channel (see online API docs for more details). In this example, we will use the daemon channel, to indicate that the channel does not need to be closed (more on that a bit later). To create a new channel, we call open on the channel builder with the appropriate type parameter.
Let’s define a client reactor that waits for a "start" message, and then checks a DNS entry. This reactor will use the onMatch handler instead of onEvent, to listen only to certain String events and ignore others:
val ch = system.spawn(Reactor[String] { self =>
  self.main.events onMatch {
    case "start" =>
      val reply = self.system.channels.daemon.open[List[String]]
      mapper ! Get("dns-main", reply.channel)
      reply.events onEvent { url =>
        println(url)
      }
    case "end" =>
      self.main.seal()
  }
})
Proto<String> clientProto = Reactor.apply(
  self -> self.main().events().onEvent(x -> {
    if (x.equals("start")) {
      Connector<String[]> reply = self.system().channels().<String[]>open();
      mapper.send(new Get("dns-main", reply.channel()));
      reply.events().onEvent(
        url -> System.out.println(url[0] + "." + url[1]));
    } else if (x.equals("end")) {
      self.main().seal();
    }
  })
);
Channel<String> ch = system.spawn(clientProto);
Above, when the reactor receives the "start" event, it opens a new channel reply that accepts List[String] events. It then sends the MapReactor a Get event with the "dns-main" key and the channel. Finally, the reactor listens to events sent back and prints the URL to the standard output.
Another new thing in this code is in the "end" case of the pattern match. Here, the reactor calls seal on the main channel to indicate that it will not receive any further events on that channel. Once all non-daemon channels become sealed, the reactor terminates.
A reactor terminates either when all its non-daemon channels are sealed, or when its constructor or some event handler throws an exception.
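The channel-based half of this rule can be written down as a small predicate. The following plain-Java sketch is only a model of the rule as stated above: TerminationSketch, Ch, open and isTerminated are hypothetical names, and the sketch does not model termination caused by exceptions or anything else the Reactors runtime does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the termination rule; not the Reactors API.
final class TerminationSketch {
    static final class Ch {
        final boolean daemon;
        boolean sealed;
        Ch(boolean daemon) { this.daemon = daemon; }
    }

    private final List<Ch> channels = new ArrayList<>();

    // Opens a channel owned by this model reactor.
    Ch open(boolean daemon) {
        Ch ch = new Ch(daemon);
        channels.add(ch);
        return ch;
    }

    // True once every non-daemon channel is sealed; daemon channels are ignored.
    boolean isTerminated() {
        for (Ch ch : channels) {
            if (!ch.daemon && !ch.sealed) {
                return false;
            }
        }
        return true;
    }

    // Returns { terminated before sealing main, terminated after sealing main }.
    static boolean[] demo() {
        TerminationSketch reactor = new TerminationSketch();
        Ch main = reactor.open(false); // default channel, non-daemon
        reactor.open(true);            // daemon reply channel, never sealed
        boolean before = reactor.isTerminated();
        main.sealed = true;            // corresponds to handling "end"
        boolean after = reactor.isTerminated();
        return new boolean[] { before, after }; // { false, true }
    }
}
```

The open daemon channel does not keep the model reactor alive, which is why the client above can seal only its main channel and still terminate.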
Let’s start the client reactor and see what happens:
ch ! "start"
ch.send("start");
At this point, we should see the URL printed to the standard output.
Finally, we can send the "end" message to the client reactor to stop it.
ch ! "end"
ch.send("end");