Schedulers

Each reactor template can be used to start multiple reactor instances, and each reactor instance can be started with a different reactor scheduler. Different schedulers have different characteristics in terms of execution priority, frequency, latency and throughput. In this section, we’ll take a look at how to use a non-default scheduler, and how to define custom schedulers when necessary.

We start with the import of the standard Reactors.IO package:

import io.reactors._
import io.reactors.japi.*;

We then define a reactor that logs incoming events, reports every time it gets scheduled, and ends after being scheduled three times. We will use the sysEvents stream of the reactor, which will be explained shortly - for now, all you need to know is that this stream produces events when the reactor gets some execution time (i.e. gets scheduled), and pauses its execution (i.e. gets preempted).

class Logger extends Reactor[String] {
  var count = 3
  sysEvents onMatch {
    case ReactorScheduled =>
      println("scheduled")
    case ReactorPreempted =>
      count -= 1
      if (count == 0) {
        main.seal()
        println("terminating")
      }
  }
  main.events.onEvent(println)
}
public static class Logger extends Reactor<String> {
  private int count = 3;
  public Logger() {
    sysEvents().onEvent(x -> {
      if (x.isReactorScheduled()) {
        System.out.println("scheduled");
      } else if (x.isReactorPreempted()) {
        count -= 1;
        if (count == 0) {
          main().seal();
          System.out.println("terminating");
        }
      }
    });
    main().events().onEvent(
      x -> System.out.println(x)
    );
  }
}

Before starting, we need to create a reactor system, as we learned in the previous sections:

val system = new ReactorSystem("test-system")
ReactorSystem system = ReactorSystem.create("test-system");

Every reactor system is bundled with a default scheduler and some additional predefined schedulers. When a reactor is started, it uses the default scheduler, unless specified otherwise. In the following, we override the default scheduler with the one using Scala’s global execution context, i.e. Scala’s own default thread pool:

val proto = Proto[Logger].withScheduler(
  JvmScheduler.Key.globalExecutionContext)
val ch = system.spawn(proto)
Proto<String> proto = Proto.create(Logger.class).withScheduler(
  Scheduler.GLOBAL_EXECUTION_CONTEXT);
Channel<String> ch = system.spawn(proto);

In Scala.js, there is no multi-threading - executions inside a single JavaScript runtime must execute in a single thread. For this reason, you will need to use a special JsScheduler.Key.default instance with the Scala.js frontend.

system.spawn(proto.withScheduler(JsScheduler.Key.default))

Running the snippet above should start the Logger reactor and print scheduled once, because starting a reactor schedules it even before any events arrive. If we now send an event to the main channel, we will see scheduled printed again, followed by the event itself.

ch ! "event 1"
ch.send("event 1");

Sending the event again decrements the reactor’s counter. The main channel gets sealed, leaving the reactor in a state without non-daemon channels, and the reactor terminates:

ch ! "event 2"
ch.send("event 2");

Reactor Lifecycle

Every reactor goes through a certain set of stages during its lifetime, jointly called a reactor lifecycle. When the reactor enters a specific stage, it emits a lifecycle event. All lifecycle events are dispatched on a special daemon event stream called sysEvents. Every reactor is created with this event stream.

The reactor lifecycle is as follows:

  • After calling spawn, the reactor is scheduled for execution. Its constructor is started asynchronously, and immediately after that, a ReactorStarted event is dispatched.
  • Then, whenever the reactor gets execution time, the ReactorScheduled event is dispatched. After that, some number of events are dispatched on normal event streams.
  • When the scheduling system decides to preempt the reactor, the ReactorPreempted event is dispatched. This scheduling cycle can be repeated any number of times.
  • Eventually, the reactor terminates, either by normal execution or exceptionally. If a user code exception terminates execution, a ReactorDied event is dispatched.
  • In either normal or exceptional execution, ReactorTerminated event gets emitted.

This reactor lifecycle is shown in the following diagram:

    |
    V
ReactorStarted
    |
    V
ReactorScheduled <----
    |                 \
    V                 /
ReactorPreempted -----
    |                 \
    |            ReactorDied
    V                 /
ReactorTerminated <---
    |
    x

To test this, we define the following reactor:

class LifecycleReactor extends Reactor[String] {
  var first = true
  sysEvents onMatch {
    case ReactorStarted =>
      println("started")
    case ReactorScheduled =>
      println("scheduled")
    case ReactorPreempted =>
      println("preempted")
      if (first) first = false
      else throw new Exception("Manually thrown.")
    case ReactorDied(_) =>
      println("died")
    case ReactorTerminated =>
      println("terminated")
  }
}
public static class LifecycleReactor extends Reactor<String> {
  private boolean first = true;
  public LifecycleReactor() {
    sysEvents().onEvent(x -> {
      if (x.isReactorStarted()) System.out.println("started");
      else if (x.isReactorScheduled()) System.out.println("scheduled");
      else if (x.isReactorPreempted()) {
        System.out.println("preempted");
        if (first) first = false;
        else throw new RuntimeException("This exception is expected!");
      } else if (x.isReactorDied()) System.out.println("died");
      else if (x.isReactorTerminated()) System.out.println("terminated");
    });
  }
}

Upon creating the lifecycle reactor, the reactor gets the ReactorStarted event, and then ReactorStarted and ReactorScheduled events. The reactor then gets suspended, and remains that way until the scheduler gives it more execution time.

val ch = system.spawn(Proto[LifecycleReactor])
Channel<String> ch = system.spawn(Proto.create(LifecycleReactor.class));

The scheduler executes the reactor again when it detects that there are pending messages. If we send an event to the reactor now, we can see the same cycle of ReactorScheduled and ReactorPreempted on the standard output. However, the ReactorPreempted handler this time throws an exception. The exception is caught, ReactorDied event is emitted, followed by the mandatory ReactorTerminated event.

ch ! "event"
ch.send("event");

At this point, the reactor is fully removed from the reactor system.