2-Way Communication Protocol

In this section, we show a 2-way communication protocol that is a part of the reactors-protocol module. In 2-way communication, both the server and the client obtain a link handle of type TwoWay, which allows them to send and receive an unlimited number of events, until they decide to close this link.

In this section, we show a two-way communication protocol. In two-way communication, two parties obtain a link handle of type TwoWay, which allows them to simultaneously send and receive an unlimited number of events, until they decide to close this link. One party initiates the link, so we call that party the client, and the other party the server. The TwoWay type has two type parameters I and O, which describe the types of input and output events, respectively, from the client’s point of view. Show graphically, this looks as follows:

 client-side  I <---------- I  server-side
              O ----------> O

Note that the types of the TwoWay object are reversed depending on whether you are looking at the link from the server-side or from the client-side. The type of the client-side 2-way link is:

val clientTwoWay: TwoWay[In, Out]

Whereas the type of the server would see the 2-way link as:

val serverTwoWay: TwoWay[Out, In]

Accordingly, the TwoWay object contains an output channel output, and an input event stream input. To close the link, the TwoWay object contains a subscription object called subscription, which is used to close the link and free the associated resources.

Let’s instantiate the 2-way channel protocol. We start by importing the contents of the io.reactors and the io.reactors.protocol packages, and then instantiating a default reactor system.

import io.reactors._
import io.reactors.protocol._

val system = ReactorSystem.default("test-system")

The 2-way communication protocol works in two phases. First, a client asks a 2-way link server to establish a 2-way link. Second, the client and the server use the 2-way channel to communicate. Note that a single 2-way link server can create many 2-way links.

As explained in an earlier section, there are usually several ways to instantiate the protocol - either as standalone reactor that runs only that protocol, or as a single protocol running inside a larger reactor. We start with a more general variant. We will declare a reactor, and instantiate a 2-way link server within that reactor. The 2-way server will receive strings, and respond with the length of those strings.

val seeker = Reactor[Unit] { self =>
  val lengthServer =
    self.system.channels.twoWayServer[Int, String].serveTwoWay()

The above two lines declare a reactor Proto object, which instantiates a 2-way server called lengthServer. We did this by first calling the twoWayServer method on the Channels service, and specifying the input and the output type (from the point of view of the client). We then called serverTwoWay to start the protocol. In our case, we set the input type I to Int, meaning that the client will receive integers from the server, and the output type O to String, meaning that the client will be sending strings to the server.

The resulting object lengthServer represents the state of the link. It contains an event stream called links, which emits an event every time some client requests a link. If we do nothing with this event stream, the server will be silent - it will start new links, but ignore events incoming from the clients. How the client and server communicate over the 2-way channel (and when to terminate this communication) is up to the user to specify. To customize the 2-way communication protocol with our own logic, we need to react to the TwoWay events emitted by links, and install callbacks to the TwoWay objects.

In our case, for each incoming 2-way link, we want to react to input strings by computing the length of the string and then sending that length back along the output channel. We can do this as follows:

  lengthServer.links.onEvent { serverTwoWay =>
    serverTwoWay.input.onEvent { s =>
      serverTwoWay.output ! s.length
    }
  }

So far, so good - we have a working instance of the 2-way link server. The current state of the reactor can be illustrated with the following figure, where our new channel appears alongside standard reactor channels:

Channel[TwoWay.Req[Int, String]]
    |
    |       #-----------------------#
    \       |                       |
     \------o--> links              |
            |                       |
            o--> main channel       |
            |                       |
            o--> sys channel        |
            |                       |
            #-----------------------#

Note that the type of the 2-way server channel is Channel[TwoWay.Req[Int, String]]. If you would like to know what TwoWay.Req[Int, String]] type exactly is, you can study the implementation source code. However, if you only want to use the 2-way protocol, then understanding the implementation internals is not required, so we will skip that part.

Next, let’s start the client-side part of the protocol. The client must use the 2-way server channel to request a link. The lengthServer object that we saw earlier has a field channel that must be used for this purpose. Generally, this channel must be known to the client (only the channel must be shared, not the complete lengthServer object). To make things simple, we will instantiate the client-side part of the protocol in the same reactor as the server-side part.

To connect to the server, the client must invoke the connectTwoWay extension method on the channel. This method is only available when the package io.reactors.protocol is imported, and works on 2-way server channels. The connect method returns an IVar (single element event stream), which is completed with a TwoWay object once the link is established.

In the following, we connect to the server. Once the server responds, we use the TwoWay[Int, String] object to send a string event, and then print the length event that we get back:

  lengthServer.channel.connect() onEvent { clientTwoWay =>
    clientTwoWay.output ! "What's my length?"
    clientTwoWay.input onEvent { len =>
      if (len == 17) println("received correct reply")
      else println("reply incorrect: " + len)
    }
  }
}

system.spawn(seeker)

After the link is established, the state of the reactor and its connectors is as shown in the following diagram:

             #-----------------------#
             |                       |
             o--> links              |
             |                       |
             |                       |
      /---->[ ]-->                   |
      |     [ ]    serverTwoWay      |
      | /---[ ]<--                   |
      | |    |                       |
      | |    |                       |
      | \-->[ ]-->                   |
      |     [ ]    clientTwoWay      |
      \-----[ ]<--                   |
             |                       |
             o--> main channel       |
             |                       |
             o--> sys channel        |
             |                       |
             #-----------------------#

Note that, in this case, the two-way channel has both endpoints in the same reactor. This is because we called twoWayServe and connect in the same reactor, for the purposes of demonstration. In real scenarios, we would typically invoke these two operations on separate reactors.

Two-Way Server Reactors

In the next example, we instantiate the two-way protocol between two reactors. Furthermore, we use the short-hand version that both declares a reactor, and uses its main channel as the 2-way link server. We call this reactor a 2-way server.

To create a Proto object of a 2-way server, we use the twoWayServer extension method on Reactor. This method takes a lambda with the server state, which we saw earlier, whose links event stream emits established twoWay links. The lambda is invoked each time when a link is established.

In the following, we create a reactor seriesCalculator, which emits elements of the series 1.0 / i. For each twoWay link that is established, it responds to each integer n that it receives with a stream of events 1.0 / i, where i ranges from 1 to n:

val seriesCalculator = Reactor.twoWayServer[Double, Int] {
  server =>
  server.links onEvent { twoWay =>
    twoWay.input onEvent { n =>
      for (i <- 1 until n) {
        twoWay.output ! (1.0 / i)
      }
    }
  }
}
val server = system.spawn(seriesCalculator)

Two-Way Connection Subscriptions

We mentioned before that the 2-way server object has a subscription that can be used to stop the server and prevent establishing new links. However, existing links are not closed when the server is stopped. To close the existing links, each TwoWay object has its own subscription value.

Let’s write a client for the seriesCalculator reactor that we started previously. We request 2 series elements from the server, but unsubscribe immediately after receiving the first element:

system.spawnLocal[Unit] { self =>
  server.connect() onEvent { twoWay =>
    twoWay.output ! 2
    twoWay.input onEvent { x =>
      println(x)
      twoWay.subscription.unsubscribe()
    }
  }
}

By running the client reactor, we can see that the second event from the server is never printed to the standard output.

Note that the server-side TwoWay object is not closed in the previous example. In real scenarios, you should take care to additionally call unsubscribe on the server side TwoWay object. How the server decides that it is time to close the link is up to you - for example, the server could close the link when it receives a negative n.

The TwoWay protocol is relatively low-level. It does not itself close its links, and delegates the task of doing so to its users. As such, its purpose is mainly to serve as a building block for higher-level protocols.