Protocols in the Reactor Model

The basic primitives of the reactor model - reactors, event streams and channels - allow composing powerful communication abstractions. In the following sections, we will go through some of the basic communication protocols that the Reactors framework supports. What these protocols have in common is that they are not artificial extensions of the basic model. Rather, they are composed from basic abstractions and simpler protocols.

In this section, we go through one of the simplest protocols, namely the server-client protocol. We first show how to implement a simple server-client protocol ourselves. After that, we show how to use the standard server-client implementation provided by the Reactors framework. Note that, in the later sections on protocols, we will not dive into the implementation, but instead immediately show how to use the already implemented protocol provided by the framework.

This will serve several purposes. First, you should get an idea of how to implement a communication pattern using event streams and channels. Second, you will see that there is more than one way to implement a protocol and expose it to clients. Finally, you will see how protocols are structured in the Reactors framework.

Custom Server-Client Protocol

Let’s implement the server-client protocol ourselves. Before we start, we import the contents of the io.reactors package. We then create a default reactor system:

import io.reactors._

val system = ReactorSystem.default("test-system")

Let’s now consider the server-client protocol more closely. This protocol proceeds as follows: first, the client sends a request value to the server. Then, the server uses the request to compute a response value and send it to the client. But to do that, the server needs a response channel to send the value along. This means that the client must not only send the request value to the server, but also send it a channel used for the reply. The request sent by the client is thus a tuple with a value and the reply channel. The channel used by the server must accept such tuples. We capture these relations with the following two types:

type Req[T, S] = (T, Channel[S])

type Server[T, S] = Channel[Req[T, S]]

Here, T is the type of the request value, and S is the type of the response value. The Req type represents the request: a tuple of the request value of type T and the reply channel, which accepts responses of type S. The Server type is then just a channel that accepts request objects.
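
To make the shape of these two types concrete, here is a minimal sketch in plain Scala that does not use the Reactors framework at all. ToyChannel is a hypothetical stand-in for Channel that hands each value synchronously to a callback, whereas a real channel delivers events asynchronously. Only the structure of Req and Server mirrors the definitions above; everything else is an assumption made for illustration.

```scala
object ReqTypesSketch {
  // Hypothetical stand-in for io.reactors.Channel: delivers synchronously.
  final class ToyChannel[T](handler: T => Unit) {
    def !(x: T): Unit = handler(x)
  }

  // Same shape as the aliases above, but over the stand-in channel.
  type Req[T, S] = (T, ToyChannel[S])
  type Server[T, S] = ToyChannel[Req[T, S]]

  def run(): List[String] = {
    var received = List.empty[String]

    // Server side: unpack the request and answer on the reply channel.
    val server: Server[String, String] = new ToyChannel[Req[String, String]]({
      case (x, replyTo) => replyTo ! x.toUpperCase
    })

    // Client side: the reply channel travels inside the request tuple.
    val replyChannel = new ToyChannel[String](s => received = received :+ s)
    server ! (("hello", replyChannel))

    received
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The point of the sketch is that the server never needs to know who the client is: everything it needs to respond is carried by the request itself.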

The question arises: how can we create a Server channel? There are several requirements that a factory method for the Server channel should satisfy. First, it should be generic in the request and the response type. Second, it should be generic in how the request type is mapped to the response type. Third, when a request is sent to the server, the mapped response should be sent back to the client along the reply channel. Putting these requirements together, we arrive at the following implementation of the server method, which instantiates a new server:

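def server[T, S](f: T => S): Server[T, S] = {
  // Open a connector whose events are (request value, reply channel) pairs.
  val c = system.channels.open[Req[T, S]]
  // For each request, map the value with f and send the result back.
  c.events onMatch {
    case (x, reply) => reply ! f(x)
  }
  // Expose only the channel half of the connector to clients.
  c.channel
}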

The server method starts by creating a connector for the Req[T, S] type. It then adds a callback to the event stream of the newly created connector. The callback decomposes the request into the request value x of type T and the reply channel, then maps the input value using the specified mapping function f, and finally sends the mapped value of type S back along the reply channel. The server method returns the channel associated with this connector.

We can use this method to start a server that maps input strings into uppercase strings, as follows:

val proto = Reactor[Unit] { self =>
  val s = server[String, String](_.toUpperCase)
}
system.spawn(proto)

Next, we will implement the client protocol. We will define a new method ? on the Channel type, which sends the request to the server. This method cannot immediately return the server’s response, because the response arrives asynchronously. Instead, ? must return an event stream with the server’s reply. So, the ? method must create a reply channel, send the Req object to the server, and then return the event stream associated with the reply channel. This is shown in the following:

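implicit class ChannelOps[T, S: Arrayable](val s: Server[T, S]) {
  def ?(x: T): Events[S] = {
    // Open a daemon connector that will receive the server's reply.
    val reply = system.channels.daemon.open[S]
    // Send the request value together with the reply channel.
    s ! ((x, reply.channel))
    // Hand back the event stream on which the reply will arrive.
    reply.events
  }
}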
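
Note that ? sends the request before the caller has had a chance to subscribe to the returned event stream. The following plain Scala sketch models why this is fine when delivery is asynchronous. It does not use the Reactors framework: ToyConnector, mailbox, ask and drain are hypothetical names, and the single queue is only a rough approximation of how a reactor processes its events one after another.

```scala
import scala.collection.mutable

object AskSketch {
  // Hypothetical event loop: sends are queued and only run during drain().
  private val mailbox = mutable.Queue.empty[() => Unit]

  // Hypothetical stand-in for a connector: a channel plus its event stream.
  final class ToyConnector[T] {
    private var observers = List.empty[T => Unit]
    def onEvent(f: T => Unit): Unit = observers = observers :+ f
    // Sending only enqueues the delivery; observers are looked up later.
    def !(x: T): Unit = mailbox.enqueue(() => observers.foreach(_(x)))
  }

  type Req[T, S] = (T, ToyConnector[S])

  def server[T, S](f: T => S): ToyConnector[Req[T, S]] = {
    val c = new ToyConnector[Req[T, S]]
    c.onEvent { case (x, reply) => reply ! f(x) }
    c
  }

  // Mirrors the ? method above: open a reply connector, send, return it.
  def ask[T, S](s: ToyConnector[Req[T, S]], x: T): ToyConnector[S] = {
    val reply = new ToyConnector[S]
    s ! ((x, reply))
    reply
  }

  // Runs queued deliveries until none are left.
  def drain(): Unit = while (mailbox.nonEmpty) mailbox.dequeue()()

  def run(): List[String] = {
    var log = List.empty[String]
    val s = server[String, String](_.toUpperCase)
    val replies = ask(s, "hello")
    log = log :+ "asked"                 // nothing has been delivered yet
    replies.onEvent(r => log = log :+ r) // subscribing after the send
    drain()                              // deliveries happen only now
    log
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In this model the log reads "asked" first and "HELLO" second, which is the same ordering the text describes: the call returns immediately, and the response shows up later on the event stream.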

We show the interaction between the server and the client by instantiating the two protocols within the same reactor. The server just returns an uppercase version of the input string, while the client sends the request with the content "hello", and prints the response to the standard output. This is shown in the following snippet:

val serverClient = Reactor[Unit] { self =>
  val s = server[String, String](_.toUpperCase)

  (s ? "hello") onEvent { upper =>
    println(upper)
  }
}
system.spawn(serverClient)

Our implementation works, but it is not very useful to start the server-client protocol inside a single reactor. Normally, the server and the client are separated by the network, or are at least different reactors running inside the same reactor system.

It turns out that, with our current toy implementation, it is not straightforward to instantiate the server-client protocol in two different reactors. The main reason for this is that once the server channel is instantiated within one reactor, we have no way of seeing it in another reactor. We will see how to easily overcome this problem when using the standard server-client implementation in the Reactors framework.

Standard Server-Client Protocol

We have just seen an example implementation of the server-client protocol, which relies solely on the basic primitives provided by the Reactors framework. However, the implementation that was presented is very simplistic, and it ignores several important concerns. For example, how do we stop the server protocol? Then, in our toy example, we instantiated the server-client protocol in a single reactor, but is it possible to instantiate server-client in two different reactors?

In this section, we take a close look at how the server-client protocol is exposed in the Reactors framework, and explain how some of the above concerns are addressed. Most predefined protocols can be instantiated in several ways:

  • By installing the protocol on an existing connector inside an existing reactor, provided that the connector has an appropriate type for that protocol. This has the benefit that you can install the protocol on, for example, the main channel of a reactor. It also makes the protocol accessible to other reactors that are aware of that channel.
  • By creating a new connector for the protocol, and then installing the protocol on that connector. This has the benefit that you can fully customize the protocol’s connector (for example, name it), but you will need to find some way of sharing the protocol’s channel with other reactors - for example, by relying on the Channels service, or by sending the channel to specific reactors.
  • By creating a new Proto object for a reactor that exclusively runs a specific protocol. This has the benefit of being able to fully configure the reactor that you wish to start (e.g. specify a scheduler, reactor name or transport).
  • By immediately spawning a reactor that runs a specific protocol. This is the most concise option.

These approaches are mostly equivalent, but they offer different tradeoffs between convenience and customization. Let’s take a look at the predefined server-client protocol to study these approaches in turn.

Adding a Protocol with an Existing Connector

We first import the io.reactors and io.reactors.protocol packages, and then instantiate the default reactor system:

import io.reactors._
import io.reactors.protocol._

val system = ReactorSystem.default("test-system")

When using an existing connector, we need to ensure that the connector’s type matches the type needed by the protocol. In the case of a server, the connector’s event type must be Server.Req. In the following, we define a server prototype that multiplies the request integer by 2. To install the server-client protocol, we call the serve method on the connector:

val proto = Reactor[Server.Req[Int, Int]] { self =>
  self.main.serve(x => x * 2)
}
val server = system.spawn(proto)

The client can then query the server in the standard way, using the ? operator:

system.spawnLocal[Unit] { self =>
  (server ? 7) onEvent { response =>
    println(response)
  }
}

Adding a Protocol to a New Connector

Let’s say that the main channel is already used for something else - for example, the main channel could be accepting termination requests. Here, the main channel cannot be shared with the server protocol - protocols usually need exclusive ownership of the respective channel. In this case, we want to create a new connector for the protocol.

This approach is very similar to using an existing connector. The only difference is that we must first create the connector itself, which gives us an opportunity to customize it. In particular, we will make the server a daemon channel, and we will assign it a specific name "server", so that other reactors can find it. We will name the reactor itself "Multiplier". To create a server connector, we use the convenience method called server on the channel builder object:

val proto = Reactor[String] { self =>
  self.main.events onMatch {
    case "terminate" => self.main.seal()
  }

  self.system.channels.daemon.named("server").server[Int, Int].serve(_ * 2)
}
system.spawn(proto.withName("Multiplier"))

The client must now query the name service to find the server channel, and from there on it proceeds as before:

system.spawnLocal[Unit] { self =>
  self.system.channels.await[Server.Req[Int, Int]](
    "Multiplier", "server"
  ) onEvent { server =>
    (server ? 7) onEvent { response =>
      println(response)
    }
  }
}

Creating a Reactor Prototype for a Specific Protocol

When we are sure that the reactor will exist only, or mainly, for the purposes of the server protocol, we can directly create a reactor server. To do this, we use the server method on the Reactor companion object. The server method returns the Proto object for the server, which can then be further customized before spawning the reactor. The server method takes a user function that is invoked each time a request arrives. This user function takes the state of the server and the request event, and returns the response event.

Here is an example:

val proto = Reactor.server[Int, Int]((state, x) => x * 2)
val server = system.spawn(proto)

system.spawnLocal[Unit] { self =>
  (server ? 7) onEvent { response =>
    println(response)
  }
}

The state object for the server contains the Subscription object, which allows the users to stop the server if an unexpected event arrives.
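
The idea of stopping a server from inside its own request handler can be modeled in plain Scala as follows. This is only a sketch of the concept: ToySubscription, ToyState and serveAll are hypothetical names, not the framework's types, and the exact members of the real state object are not shown in this text.

```scala
object StopSketch {
  // Hypothetical stand-ins; these are not the framework's types.
  final class ToySubscription {
    private var active = true
    def unsubscribe(): Unit = active = false
    def isActive: Boolean = active
  }
  final class ToyState(val subscription: ToySubscription)

  // Feeds requests to f one by one until the handler unsubscribes.
  def serveAll(requests: List[Int])(f: (ToyState, Int) => Int): List[Int] = {
    val state = new ToyState(new ToySubscription)
    val responses = List.newBuilder[Int]
    requests.foreach { x =>
      if (state.subscription.isActive) responses += f(state, x)
    }
    responses.result()
  }

  def run(): List[Int] =
    serveAll(List(1, 2, -1, 3)) { (state, x) =>
      // Treat a negative request as unexpected and stop serving afterwards.
      if (x < 0) state.subscription.unsubscribe()
      x * 2
    }

  def main(args: Array[String]): Unit = println(run())
}
```

In this model the request that triggers the shutdown still gets its response, and only the requests that follow it are ignored. Whether the framework behaves the same way at that boundary is not something this sketch claims.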

Spawning a Reactor for a Specific Protocol

Finally, we can immediately start a server reactor, without any customization. This is done by passing a server function to the server method on the ReactorSystem, as follows:

val server = system.server[Int, Int]((state, x) => x * 2)

system.spawnLocal[Unit] { self =>
  (server ? 7) onEvent { response =>
    println(response)
  }
}

In the following sections, we will take a look at some other predefined protocols. Generally, these protocols will have a similar usage structure.