Schedulers
Each reactor template can be used to start multiple reactor instances, and each reactor instance can be started with a different reactor scheduler. Different schedulers have different characteristics in terms of execution priority, frequency, latency and throughput. In this section, we’ll take a look at how to use a non-default scheduler, and how to define custom schedulers when necessary.
We start with the import of the standard Reactors.IO package:
import io.reactors._
import io.reactors.japi.*;
We then define a reactor that logs incoming events,
reports every time it gets scheduled,
and ends after being scheduled three times.
We will use the sysEvents
stream of the reactor,
which will be explained shortly -
for now, all you need to know is that this stream produces
events when the reactor gets some execution time (i.e. gets scheduled),
and pauses its execution (i.e. gets preempted).
class Logger extends Reactor[String] {
var count = 3
sysEvents onMatch {
case ReactorScheduled =>
println("scheduled")
case ReactorPreempted =>
count -= 1
if (count == 0) {
main.seal()
println("terminating")
}
}
main.events.onEvent(println)
}
public static class Logger extends Reactor<String> {
private int count = 3;
public Logger() {
sysEvents().onEvent(x -> {
if (x.isReactorScheduled()) {
System.out.println("scheduled");
} else if (x.isReactorPreempted()) {
count -= 1;
if (count == 0) {
main().seal();
System.out.println("terminating");
}
}
});
main().events().onEvent(
x -> System.out.println(x)
);
}
}
Before starting, we need to create a reactor system, as we learned in the previous sections:
val system = new ReactorSystem("test-system")
ReactorSystem system = ReactorSystem.create("test-system");
Every reactor system is bundled with a default scheduler and some additional predefined schedulers. When a reactor is started, it uses the default scheduler, unless specified otherwise. In the following, we override the default scheduler with the one using Scala’s global execution context, i.e. Scala’s own default thread pool:
val proto = Proto[Logger].withScheduler(
JvmScheduler.Key.globalExecutionContext)
val ch = system.spawn(proto)
Proto<String> proto = Proto.create(Logger.class).withScheduler(
Scheduler.GLOBAL_EXECUTION_CONTEXT);
Channel<String> ch = system.spawn(proto);
In Scala.js, there is no multi-threading - executions inside a single JavaScript
runtime must execute in a single thread. For this reason, you will need to use
a special JsScheduler.Key.default
instance with the Scala.js frontend.
system.spawn(proto.withScheduler(JsScheduler.Key.default))
Running the snippet above should start the Logger
reactor and print scheduled
once, because starting a reactor schedules it even before any events arrive.
If we now send an event to the main channel, we will see scheduled
printed again,
followed by the event itself.
ch ! "event 1"
ch.send("event 1");
Sending the event again decrements the reactor’s counter. The main channel gets sealed, leaving the reactor in a state without non-daemon channels, and the reactor terminates:
ch ! "event 2"
ch.send("event 2");
Reactor Lifecycle
Every reactor goes through a certain set of stages during its lifetime,
jointly called a reactor lifecycle.
When the reactor enters a specific stage, it emits a lifecycle event.
All lifecycle events are dispatched on a special daemon event stream called sysEvents
.
Every reactor is created with this event stream.
The reactor lifecycle is as follows:
- After calling
spawn
, the reactor is scheduled for execution. Its constructor is started asynchronously, and immediately after that, aReactorStarted
event is dispatched. - Then, whenever the reactor gets execution time,
the
ReactorScheduled
event is dispatched. After that, some number of events are dispatched on normal event streams. - When the scheduling system decides to preempt the reactor,
the
ReactorPreempted
event is dispatched. This scheduling cycle can be repeated any number of times. - Eventually, the reactor terminates,
either by normal execution or exceptionally.
If a user code exception terminates execution,
a
ReactorDied
event is dispatched. - In either normal or exceptional execution,
ReactorTerminated
event gets emitted.
This reactor lifecycle is shown in the following diagram:
|
V
ReactorStarted
|
V
ReactorScheduled <----
| \
V /
ReactorPreempted -----
| \
| ReactorDied
V /
ReactorTerminated <---
|
x
To test this, we define the following reactor:
class LifecycleReactor extends Reactor[String] {
var first = true
sysEvents onMatch {
case ReactorStarted =>
println("started")
case ReactorScheduled =>
println("scheduled")
case ReactorPreempted =>
println("preempted")
if (first) first = false
else throw new Exception("Manually thrown.")
case ReactorDied(_) =>
println("died")
case ReactorTerminated =>
println("terminated")
}
}
public static class LifecycleReactor extends Reactor<String> {
private boolean first = true;
public LifecycleReactor() {
sysEvents().onEvent(x -> {
if (x.isReactorStarted()) System.out.println("started");
else if (x.isReactorScheduled()) System.out.println("scheduled");
else if (x.isReactorPreempted()) {
System.out.println("preempted");
if (first) first = false;
else throw new RuntimeException("This exception is expected!");
} else if (x.isReactorDied()) System.out.println("died");
else if (x.isReactorTerminated()) System.out.println("terminated");
});
}
}
Upon creating the lifecycle reactor,
the reactor gets the ReactorStarted
event,
and then ReactorStarted
and ReactorScheduled
events.
The reactor then gets suspended,
and remains that way until the scheduler gives it more execution time.
val ch = system.spawn(Proto[LifecycleReactor])
Channel<String> ch = system.spawn(Proto.create(LifecycleReactor.class));
The scheduler executes the reactor again
when it detects that there are pending messages.
If we send an event to the reactor now,
we can see the same cycle of ReactorScheduled
and ReactorPreempted
on the standard output.
However, the ReactorPreempted
handler this time throws an exception.
The exception is caught, ReactorDied
event is emitted,
followed by the mandatory ReactorTerminated
event.
ch ! "event"
ch.send("event");
At this point, the reactor is fully removed from the reactor system.