The Parallel Universe Blog

July 26, 2013

Distributed Actors in Java and Clojure

Last week, we released version 0.2.0 of Quasar and Pulsar. Quasar adds true lightweight threads to the JVM, which are great for writing scalable applications. Pulsar is a Clojure API for Quasar.

On top of the lightweight thread foundation, Quasar and Pulsar provide Erlang-like actors (as well as Go-like channels). One of the main additions in the last release is distributed actors, which let actors discover one another and exchange messages in a cluster. Quasar’s clustering uses Galaxy, our in-memory data grid, and while this feature is not yet recommended for use in production, it’s ready for exploration.

To enable clustering in a Quasar/Pulsar application, add the quasar-galaxy dependency to your project.

In Gradle (and similarly in Maven), add this to your dependencies:

runtime "co.paralleluniverse:quasar-galaxy:0.2.0"

In Leiningen:

[co.paralleluniverse/quasar-galaxy "0.2.0"]

In addition, we want the application to join the cluster as soon as it starts up. To do that we need to set the co.paralleluniverse.galaxy.autoGoOnline system property to true.
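
The usual way to set a system property is on the JVM command line, here with -Dco.paralleluniverse.galaxy.autoGoOnline=true. As a rough sketch of the programmatic alternative, the hypothetical launcher below sets the property before the rest of the application starts. The class and method names are illustrative and not part of Quasar or Galaxy, and the sketch assumes the property is read only after this code has run, which is not confirmed here; when in doubt, prefer the command-line flag.

```java
// Hypothetical launcher; class and method names are illustrative, not Quasar or Galaxy API.
class ClusterLauncher {
    // Property name as given in the text above.
    static final String AUTO_GO_ONLINE = "co.paralleluniverse.galaxy.autoGoOnline";

    // Sets the property to "true" unless a value was already supplied (for example with -D).
    // Returns the effective setting as a boolean.
    static boolean enableAutoGoOnline() {
        if (System.getProperty(AUTO_GO_ONLINE) == null) {
            System.setProperty(AUTO_GO_ONLINE, "true");
        }
        return Boolean.getBoolean(AUTO_GO_ONLINE);
    }

    public static void main(String[] args) {
        // Assumption: this runs before any clustering code reads the property.
        System.out.println("autoGoOnline = " + enableAutoGoOnline());
        // ... create and start the application's actors here ...
    }
}
```

A value passed explicitly on the command line is left untouched, so -Dco.paralleluniverse.galaxy.autoGoOnline=false still wins over the default chosen in code.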

The first step to distributing actors is making one (or some) of the actors discoverable. In Quasar and Pulsar, if clustering is enabled with the aforementioned steps, as soon as an actor is registered it becomes globally discoverable throughout the cluster. Once an actor is discoverable, actors in other cluster nodes can find it and send messages to it.

An actor does not have to be registered and discoverable in order for remote actors to send it messages. An actor can send a reference to itself in a message to a remote actor, and then the remote actor knows its identity (without discovery) and can send messages back.
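
The difference between being discovered by name and being known through a reference carried in a message can be illustrated with a plain-Java analogy. The sketch below uses threads and queues inside a single JVM and none of the Quasar API; Mailbox, Envelope and the registry map are hypothetical stand-ins for an actor reference, a message and the cluster-wide registry.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-Java analogy running in one JVM; none of these names are Quasar API.
class ReplyToSketch {
    // A mailbox stands in for an actor reference.
    static final class Mailbox {
        private final BlockingQueue<Envelope> queue = new LinkedBlockingQueue<>();

        void send(Envelope e) { queue.add(e); }

        Envelope receive() {
            try {
                return queue.take(); // blocks until a message arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
    }

    // A message that carries the sender's mailbox so the receiver can reply.
    static final class Envelope {
        final Mailbox from;
        final String body;
        Envelope(Mailbox from, String body) { this.from = from; this.body = body; }
    }

    // Stand-in for the registry: name -> mailbox.
    static final Map<String, Mailbox> registry = new ConcurrentHashMap<>();

    static String roundTrip() {
        Mailbox server = new Mailbox();
        registry.put("echo", server); // only the server is registered

        Thread serverThread = new Thread(() -> {
            Envelope request = server.receive();
            // Reply through the reference carried in the message; no lookup needed.
            request.from.send(new Envelope(server, "echo: " + request.body));
        });
        serverThread.start();

        Mailbox client = new Mailbox(); // never registered
        registry.get("echo").send(new Envelope(client, "hello"));
        return client.receive().body;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip()); // prints "echo: hello"
    }
}
```

Only the server appears in the registry. The client stays anonymous and is reachable solely because its mailbox travels inside the request.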

To see how this works, let’s follow the code of the distributed version of the classic ping-pong example (from the Erlang tutorials). The Java (Quasar) code for the example is found here, and the Clojure (Pulsar) code is here.

To run the Java examples, type the following commands (each in a different shell), in the Quasar directory, after cloning the repository:

./gradlew :quasar-galaxy:run -PmainClass=co.paralleluniverse.galaxy.example.pingpong.Ping
./gradlew :quasar-galaxy:run -PmainClass=co.paralleluniverse.galaxy.example.pingpong.Pong

To run the Clojure examples, clone the Pulsar repository, and type (each command in a different shell):

lein with-profile cluster update-in :jvm-opts conj '"-Dgalaxy.nodeId=2"' '"-Dgalaxy.port=7052"' '"-Dgalaxy.slave_port=8052"' -- run -m co.paralleluniverse.pulsar.examples.cluster.ping
lein with-profile cluster update-in :jvm-opts conj '"-Dgalaxy.nodeId=1"' '"-Dgalaxy.port=7051"' '"-Dgalaxy.slave_port=8051"' -- run -m co.paralleluniverse.pulsar.examples.cluster.pong

Now let’s take a look at two excerpts from the Java code. This is the Pong actor:

new BasicActor<Message, Void>() {
    @Override
    protected Void doRun() throws InterruptedException, SuspendExecution {
        register("pong"); // registering makes this actor discoverable across the cluster

        loop:
        while (true) {
            Message msg = receive();
            System.out.println("pong received " + msg.type);
            switch (msg.type) {
                case PING:
                    // reply through the sender reference carried in the message
                    msg.from.send(new Message(this, PONG));
                    break;
                case FINISHED:
                    break loop;
            }
        }
        return null;
    }
}

and this is the Ping actor:

new BasicActor<Message, Void>() {
    @Override
    protected Void doRun() throws InterruptedException, SuspendExecution {
        Actor pong;
        // poll the registry until an actor named "pong" appears on the cluster
        while ((pong = getActor("pong")) == null) {
            System.out.println("waiting for pong");
            Strand.sleep(3000);
        }
        System.out.println("pong is " + pong);

        for (int i = 0; i < 3; i++) {
            pong.send(new Message(this, PING));
            Message msg = receive();
            System.out.println("ping received " + msg.type);
        }

        // no reply is expected, so no sender reference is attached
        pong.send(new Message(null, FINISHED));
        return null;
    }
}

Pong registers itself and then awaits a message. When it receives a PING it replies to the sender. When it receives FINISHED it is done.

Ping first waits for Pong to appear on the cluster (polling with the getActor method). Then, it sends Pong three PINGs, and finally a FINISHED message. Note how Ping never registers itself, but it does send a reference to itself (it passes this as the from field in the message), so that Pong can reply.
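
The excerpts rely on a Message class that is not shown. Judging only from how it is used (a from field, a type field, and the constants PING, PONG and FINISHED), it might look roughly like the sketch below. This is a guess at its shape, not the code from the example: the type parameter A stands in for Quasar's Actor type so that the sketch compiles on its own, and implementing Serializable is an assumption, on the reasoning that messages sent to another node have to be serialized somehow.

```java
import java.io.Serializable;

// Illustrative sketch only; the actual Message class in the example may differ.
class PingPongMessage {
    enum Type { PING, PONG, FINISHED }

    // A stands in for the actor reference type (Actor in the excerpts above),
    // so this sketch compiles without Quasar on the classpath.
    // Serializable is an assumption, not something the excerpts confirm.
    static final class Message<A> implements Serializable {
        final A from;    // sender reference used for replies; null when no reply is expected
        final Type type; // the value Pong's switch statement dispatches on

        Message(A from, Type type) {
            this.from = from;
            this.type = type;
        }
    }

    public static void main(String[] args) {
        // A String plays the role of the actor reference here.
        Message<String> ping = new Message<>("ping-actor", Type.PING);
        Message<String> done = new Message<>(null, Type.FINISHED);
        System.out.println(ping.type + " from " + ping.from);
        System.out.println(done.type + " from " + done.from);
    }
}
```

Keeping both fields final makes a message immutable once created, which is a common choice for values passed between actors.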

Here’s the complete code for the same example in Pulsar. Here’s Pong:

(defsfn pong []
  (register! :pong @self)
  (loop []
    (receive
      :finished (println "Pong finished")
      [:ping ping] (do
                     (println "Pong received ping")
                     (! ping :pong)
                     (recur)))))

(defn -main []
  (join (spawn pong))
  (System/exit 0))

And there’s Ping:

(defsfn ping [n]
  (dotimes [i n]
    (! :pong [:ping @self])
    (receive
      :pong (println "Ping received pong")))
  (! :pong :finished)
  (println "ping finished"))

(defn -main []
  (when (nil? (whereis :pong))
    (println "Waiting for pong to register...")
    (loop []
      (when (nil? (whereis :pong))
        (Thread/sleep 500)
        (recur))))
  (join (spawn ping 3))
  :ok)

We have only scratched the surface of what can be done with actors. To learn more, take a look at the Pulsar documentation, and the Quasar tests and examples here (full Quasar documentation is forthcoming).

Next week, we’ll take a look at fibers (Quasar’s lightweight threads), and how they can be used to transform asynchronous, callback-based code into simple, sequential code that offers the same performance and scalability.
