Services
In the earlier sections, we learned that reactors delimit concurrent executions, and that event streams allow routing events within each reactor. This is already a powerful set of abstractions, and we can use reactors and event streams to write all kinds of distributed programs. However, such a model is restricted to reactor computations only – we cannot, for example, start blocking I/O operations, read from a temperature sensor, wait until a GPU computation completes, or react to temporal events. In some cases, we need to interact with the native capabilities of the OS, or tap into a rich ecosystem of existing libraries. For this purpose, every reactor system has a set of services – protocols that relate event streams to the outside world.
In this section, we will take a closer look at various services that are available by default, and also show how to implement custom services and plug them into reactor systems.
The logging service
We will start with the simplest possible service, called Log. This service is used to print logging messages to the standard output.
In the following, we create an anonymous reactor that uses the Log service. We start by importing the Log service.
import io.reactors.services.Log
import io.reactors.japi.services.Log;
Next, we create a reactor system, and start a reactor instance. The reactor invokes the service method on the reactor system, which returns the service with the specified type. The reactor then calls the apply method on the log to print a message, and seals itself.
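The snippets in this section assume that a reactor system named system is already in scope. As a minimal sketch, it can be created like this (the system name is arbitrary):
import io.reactors._

val system = ReactorSystem.default("tutorial-system")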
system.spawn(Reactor[String] { self =>
val log = system.service[Log]
log("Test reactor started!")
self.main.seal()
})
Proto<String> proto = Reactor.apply(self -> {
Log log = system.service(Log.class);
log.info("Test reactor started!");
self.main().seal();
});
system.spawn(proto);
Running the above snippet prints a timestamped message to the standard output.
This example is very simple, but we use it to describe some important properties of services.
- The reactor system’s method service[S] returns a service of type S.
- The service obtained this way is a lazily initialized singleton instance – there exists at most one instance of the service per reactor system, and it is created only after being requested by some reactor (as sketched after this list).
- Some standard services are eagerly initialized when the reactor system gets created. Such services are usually available as a standalone method on the ReactorSystem class. For example, system.log is an alternative way to obtain the Log service.
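Since the service is a lazily initialized singleton, repeated lookups return the same object. Here is a minimal sketch of this property, assuming that system.log returns the same underlying instance as system.service[Log]:

val log1 = system.service[Log]
val log2 = system.service[Log]
// Created lazily on the first request, then cached per reactor system.
assert(log1 eq log2)
// system.log is a shortcut for the same singleton (assumption based on the list above).
assert(system.log eq log1)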
The clock service
Having seen a trivial service example, let’s take a look at a more involved service that connects reactors with the outside world of events, namely, the Clock service. The Clock service is capable of producing time-driven events, for example, timeouts, countdowns or periodic counting. This service is standard, so it is available by calling either system.clock or system.service[Clock] - both return the same instance.
In the following, we create an anonymous reactor that uses the Clock service to create a timeout event after 1 second. The timeout method of the clock service returns an event stream of the Unit type (more specifically, it returns an IVar event stream, i.e. a single-assignment variable, which always produces at most one event). We install a callback on the timeout event stream, which seals the main channel of this reactor.
import scala.concurrent.Promise
import scala.concurrent.duration._

val done = Promise[Boolean]()
system.spawn(Reactor[String] { self =>
  system.clock.timeout(1.second) on {
    done.success(true)
    self.main.seal()
  }
})
Proto<String> proto = Reactor.apply(self -> {
system.clock().timeout(1000).onEvent(v -> {
System.out.println("Timeout!");
self.main().seal();
});
});
system.spawn(proto);
The Clock service uses a separate timer thread under the hood, which sends events to the reactor when the timer thread decides it is time to do so. The events are sent on a special channel created by the timeout method, so they are seen only on the corresponding event stream. Graphically, this looks as follows:
                  #-----------------------------#
                  |                             |
   main.channel --o--> main.events              |
                  |                             |
                  o--> sysEvents                |
                  |                             |
  Channel[Unit] --o--> system.clock.timeout     |
        ^         |                             |
        |         |                             |
  <Timer thread>  |                             |
                  |                             |
                  #-----------------------------#
When the main channel gets sealed, the reactor terminates - the timeout event stream creates a daemon channel under the hood, which does not prevent our anonymous reactor from terminating after the non-daemon channels are gone.
The Clock service illustrates a general pattern - when a native entity or an external event source needs to communicate with a reactor, it creates a new channel, and then asynchronously sends events to it.
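Besides timeouts, the Clock service was described above as also producing countdown and periodic events. The sketch below shows how such streams might be used from within a reactor; the countdown and periodic method names and signatures are assumptions based on that description, so check the Clock API for the exact spelling (the duration syntax again requires the scala.concurrent.duration._ import):

system.spawn(Reactor[Unit] { self =>
  // Assumed API: emits a Unit event every second.
  system.clock.periodic(1.second) on {
    println("tick")
  }
  // Assumed API: counts down once per second, then the stream stops.
  system.clock.countdown(3, 1.second) onEvent { n =>
    println(s"countdown: $n")
    if (n == 1) self.main.seal()
  }
})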
The channels service
Some services provide event streams that work with reactor system internals.
The Channels
service is one such example - it provides an event-driven view over all
channels that exist in the current reactor system. This allows polling the channels
that are currently available, or waiting until a channel with a specific name becomes
available. Awaiting a channel is particularly useful, as it allows easier handling of
asynchrony between reactors, which is inherent to distributed systems.
As a side note, we actually saw the Channels service earlier, when we used it to open a second channel in a reactor. The expression system.channels.open actually calls the open method on this standard Channels service.
In the following, we will construct two reactors. The first reactor will create a specially named channel after some delay, and the second reactor will await that channel. When the channel appears, the second reactor will send an event to that channel.
val done = Promise[Boolean]()
val first = Reactor[String] { self =>
system.clock.timeout(1.second) on {
val c = system.channels.daemon.named("lucky").open[Int]
c.events on {
done.success(true)
self.main.seal()
}
}
}
system.spawn(first.withName("first"))
system.spawn(Reactor[String] { self =>
system.channels.await[Int]("first", "lucky") onEvent { ch =>
ch ! 7
self.main.seal()
}
})
Proto<String> proto = Reactor.apply(self -> {
system.clock().timeout(1000).onEvent(v -> {
Connector<Integer> c =
system.channels().daemon().named("lucky").<Integer>open();
c.events().onEvent(i -> {
System.out.println("Done!");
self.main().seal();
});
});
});
system.spawn(proto.withName("first"));
system.spawn(Reactor.apply(self -> {
system.channels().<Integer>await("first", "lucky").onEvent(ch -> {
ch.send(7);
self.main().seal();
});
}));
Above, we use the Clock service seen earlier to introduce a delay in the first reactor. In the second reactor, we use the Channels service to wait for the channel named "lucky" of the reactor named "first".
Both reactors start approximately at the same time.
After one second, the first reactor uses the Channels service to open a new daemon channel named "lucky". The first reactor then installs a callback: when the first event arrives on the lucky channel, a promise object (used for our testing purposes) must be completed, and the main channel must be sealed, so that the reactor terminates.
The second reactor gets an event from the Channels service - a channel with the requested name exists, and the second reactor can use it to communicate. The second reactor sends 7 to the lucky channel, and terminates.
To reiterate, awaiting channels is crucial when establishing temporal order in
an asynchronous system, and the Channels
service is a useful tool for
this purpose.
Custom services
Having seen a few existing services, we now show how to create a custom service.
To do this, we must implement the Protocol.Service
trait,
which has the following members:
class CustomService(val system: ReactorSystem) extends Protocol.Service {
def shutdown(): Unit = ???
}
Note that every service needs to have a constructor with a single ReactorSystem
parameter. The shutdown
method is called when the corresponding reactor system
gets shut down, and is used to free any resources that the service potentially has.
As noted before, a service is a mechanism that gives access to events that a reactor normally cannot obtain from other reactors. Let’s implement a service that notifies a reactor when the enclosing reactor system gets shut down. For this, we will need to keep a set of the channels that subscribed to the shutdown event.
import scala.collection.mutable
import io.reactors._

class Shutdown(val system: ReactorSystem) extends Protocol.Service {
  // Channels of all the reactors currently subscribed to the shutdown event.
  private val subscribers = mutable.Set[Channel[Boolean]]()
  private val lock = new AnyRef
  def state: Signal[Boolean] = {
    val shut = system.channels.daemon.open[Boolean]
    lock.synchronized {
      subscribers += shut.channel
    }
    shut.events.toSignal(false).withSubscription(new Subscription {
      def unsubscribe(): Unit = {
        shut.seal()
        lock.synchronized {
          subscribers -= shut.channel
        }
      }
    })
  }
  def shutdown(): Unit = {
    lock.synchronized {
      for (ch <- subscribers) ch ! true
    }
  }
}
The Shutdown service keeps the active shutdown notification channels in the subscribers set. When a reactor requests its state signal, the service creates a new connector shut for that reactor. The corresponding channel shut.channel is added to the subscribers set. The corresponding shut.events event stream is converted to a signal that is initially false, and which removes its subscription if the reactor later decides to call unsubscribe.
Finally, when the system gets shut down, all the existing subscribers receive
a shutdown notification event.
We can now use the Shutdown
service, for example, as follows:
val done = Promise[Boolean]()
val system = ReactorSystem.default("test-shutdown-system")
system.spawn(Reactor[Unit] { self =>
system.service[Shutdown].state on {
self.main.seal()
done.success(true)
}
})
Later, when we shut down the system, we expect that the code in the callback runs and completes the promise:
system.shutdown()
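To verify this in a test, a minimal sketch using the standard library futures could look as follows (the 5-second limit is an arbitrary choice):

import scala.concurrent.Await
import scala.concurrent.duration._

// The shutdown notification arrives asynchronously, so we block on the promise's future.
assert(Await.result(done.future, 5.seconds))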
Note that, when implementing a custom service, we are no longer in the same ballpark as when writing normal reactor code. A service may be invoked by multiple reactors concurrently, and this is why we had to synchronize access to the subscribers set in the Shutdown implementation. In general, when implementing a custom service, we have to take care to:
- Never block or acquire a lock in the service constructor.
- Ensure that access to the shared state of the service is properly synchronized (as sketched below).
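To illustrate the second point, here is a minimal sketch of a custom service that guards its shared state with a lock. The Counter service and its increment method are hypothetical, and only rely on the Protocol.Service contract shown earlier:

class Counter(val system: ReactorSystem) extends Protocol.Service {
  // Shared state that may be accessed by multiple reactors concurrently.
  private var count = 0L
  private val lock = new AnyRef
  // The constructor does no blocking work; the shared state is only touched under the lock.
  def increment(): Long = lock.synchronized {
    count += 1
    count
  }
  def shutdown(): Unit = ()
}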
In conclusion, you should use custom services whenever you have a native event-driven API that must deliver events to reactors in your program.