The Parallel Universe Blog

May 02, 2013

Erlang (and Go) in Clojure (and Java)

We’ll start at the end: Quasar and Pulsar are two new open-source libraries in Java and Clojure respectively that add Erlang-like actor-model (and Go-like coroutine/channel) programming to the JVM.

There have been several attempts at porting actors to the JVM. Quasar and Pulsar’s main contribution, from which many of their advantages stem, is true lightweight threads [1]. Lightweight threads provide many of the same benefits that “regular” OS threads do, namely a single, simple control flow and the ability to block and wait for some resource to become available while allowing other threads to run on the CPU in the meantime. Unlike regular threads, lightweight threads are not scheduled by the operating system, so their context switch is often faster and they require far fewer system resources. As a result, a single machine can handle millions of them.

Quasar/Pulsar actors have the following benefits:

  • Extreme simplicity
  • Pattern matching
  • Binaries and pattern matching on binaries (just like Erlang!)
  • Selective receive
  • Super-scalable asynchronous IO accessed with a synchronous, blocking IO API.

Just to show where we’re headed, here’s an almost line-by-line port of the canonical Erlang pingpong example to Pulsar:

(defsusfn ping [n]
  (if (== n 0)
    (do
      (! :pong :finished)
      (println "ping finished"))
    (do
      (! :pong [:ping @self])
      (receive
       :pong (println "Ping received pong"))
      (recur (dec n)))))

(defsusfn pong []
  (receive
   :finished (println "Pong finished")
   [:ping ping] (do
                  (println "Pong received ping")
                  (! ping :pong)
                  (recur))))

(defn -main []
  (register :pong (spawn pong))
  (spawn ping 3)
  :ok)

Why?

Writing correct and efficient multi-threaded code is extremely hard. Unfortunately, given hardware trends, it has also become absolutely necessary.

Several paradigms that try to make writing correct and efficient multithreaded code easier have emerged, but the actor model offered by Erlang has won some popularity, and for good reason: the actor model is easy to understand and lends itself to building complex fault-tolerant software. My own admiration for Erlang began when I realized that, to paraphrase Philip Greenspun, every piece of mission-critical military software I’d worked on contained an ad-hoc, informally specified implementation of half of Erlang. Other languages (notably Go with channels and goroutines, and Clojure with agents) and frameworks (like Akka) have copied some aspects of the actor model, but they all suffer from deficiencies that make them more complicated or less powerful than Erlang.

Porting Erlang’s model to the JVM — following the original as closely as possible while still adhering to the platform’s established practices — would therefore introduce a very powerful tool to what is possibly the most ubiquitous programming environment in use today, and one of the most performant ones.

Quasar, the Implementation

An Erlang-like actor system (i.e. one with blocking, selective receives) requires three components:

  1. Coroutines
  2. A scheduler
  3. Queues

Coroutines are functions that can be paused and later resumed. They are necessary to build lightweight threads because they provide “user-mode” management of the stack. A good multithreaded scheduler is then added to schedule and run the coroutines while making the best use of available cores. This combination gives us lightweight threads. Finally, queues are needed to implement channels or actor mailboxes.

It is important that these three components be logically separated. Modularity assists in future maintenance and allows for a best-of-breed approach [2].

Of the three pieces, coroutines are the most challenging to implement on the JVM, as they require bytecode instrumentation. Essentially, every possibly-pausing method (the Quasar/Pulsar nomenclature is “suspendable”) must be inspected to find all invocations of other suspendable methods. Before the call to a suspendable method, code must be injected to push the caller’s local variables onto a stack object. Also, code must be injected at the beginning of every suspendable method that, upon resuming, jumps to the instruction following the pause-point. This has to be done to all suspendable methods on the (OS thread) stack.

Fortunately, I found Matthias Mann’s continuations library that not only does exactly that and only that, but is also small, fast and easy to understand.
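To make the transformation described above more concrete, here is a hand-written sketch of roughly how an instrumented method behaves. It is not the code the continuations library generates (that works on bytecode); the class CountdownCoroutine, its fields and the resume method are hypothetical names invented for this illustration. The local variable is saved in a field (standing in for the stack object) before each pause, and a switch at the top of the method jumps back to the point after the pause.

```java
import java.util.ArrayList;
import java.util.List;

// Hand-written illustration of a method that can pause and resume.
// All names are hypothetical; real instrumentation rewrites bytecode instead.
final class CountdownCoroutine {
    private int resumePoint = 0;   // which pause point to jump back to
    private int savedN;            // stands in for the "stack object" holding locals
    private boolean done = false;
    final List<String> log = new ArrayList<>();

    CountdownCoroutine(int n) {
        this.savedN = n;
    }

    boolean isDone() {
        return done;
    }

    // Runs until the next pause point (or the end), then returns to the caller.
    void resume() {
        int n = savedN;                // restore the local variable
        switch (resumePoint) {         // injected "jump to where we left off"
            case 0:
                log.add("started with n=" + n);
                // fall through into the loop body
            case 1:
                if (n > 0) {
                    log.add("tick " + n);
                    n--;
                    savedN = n;        // save locals before pausing
                    resumePoint = 1;   // remember where to continue
                    return;            // pause
                }
                log.add("finished");
                done = true;
                return;
            default:
                throw new IllegalStateException("unknown resume point");
        }
    }

    public static void main(String[] args) {
        CountdownCoroutine c = new CountdownCoroutine(2);
        while (!c.isDone()) {
            c.resume();                // each call runs one slice of the method
        }
        System.out.println(c.log);
    }
}
```

The caller decides when to call resume again, which is exactly the hook a scheduler needs.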

Next came the scheduler. Here the job was easy. Java’s ForkJoinPool is a masterpiece of multi-threaded task scheduling, made even better for Java 8. In the meantime, we’re using the jsr166e library that contains the fork/join implementation that’s to go into the next major JDK release. We’ve used it in SpaceBase, and the performance improvements over the Java 7 implementation (which was very impressive to begin with) are significant. With a solid foundation like fork/join you can’t go wrong.

Combining Matthias Mann’s coroutines with fork/join gives us lightweight threads, which in Quasar are called fibers. A fiber can block, or “park”, just like a thread, by pausing the continuation and returning from the fork/join task. To be “unparked”, it is simply scheduled (forked) again.
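The park/unpark idea can be sketched with nothing but the JDK’s ForkJoinPool. This is a toy under stated assumptions, not Quasar’s fiber class: MiniFiber and its methods are hypothetical names, the “continuation” is just a step counter, and the fiber reschedules itself (a yield) where a real fiber would be unparked by whoever makes it runnable, such as a message sender.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

// Toy "fiber": every run() executes one step and then returns to the pool.
// Hypothetical names; a real fiber resumes an instrumented continuation instead.
final class MiniFiber implements Runnable {
    private final ForkJoinPool pool;
    private final int totalSteps;
    private final CountDownLatch finished = new CountDownLatch(1);
    private int step = 0;              // state that survives across "parks"

    MiniFiber(ForkJoinPool pool, int totalSteps) {
        this.pool = pool;
        this.totalSteps = totalSteps;
    }

    // "Unparking" is simply scheduling the task again.
    void unpark() {
        pool.execute(this);
    }

    @Override
    public void run() {
        step++;                        // do one slice of work
        if (step < totalSteps) {
            unpark();                  // yield: schedule the next slice...
        } else {
            finished.countDown();
        }
        // ...and return, which frees the worker thread ("parking").
    }

    // Runs a fiber with the given number of steps and returns how many ran
    // (always at least one).
    static int runToCompletion(int totalSteps) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            MiniFiber fiber = new MiniFiber(pool, totalSteps);
            fiber.unpark();
            fiber.finished.await();
            return fiber.step;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("steps run: " + runToCompletion(5));
    }
}
```

Because each slice is submitted only at the end of the previous one, the fiber never runs concurrently with itself, and the pool’s submission guarantees make the plain step field safe to use.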

Finally, I rolled my own queue implementations, partly as an exercise, but also because there’s room for improvement over Java’s concurrent Queues in our case. The reason is that the queues we need aren’t symmetric. Many fibers append messages onto the queues but only one pulls them out, so these are single-consumer, multiple-producer queues.

Quasar has three types of lock-free queue implementations: concurrent linked-lists based on John M. Mellor-Crummey’s 1987 paper (for some reason the simpler Craig-Landin-Hagersten queue I tried performed worse than Java’s multiple-consumer multiple-producer ConcurrentLinkedQueue; I must have done something wrong, and I’ll look into that); bounded array queues with some added “mechanical sympathy” based on Martin Thompson’s talks; and an unbounded linked-array queue based on Gidenstam, Sundell and Tsigas. Java’s memory model and excellent concurrency primitives allowed building some pretty fast queues (the bounded queue performed best by far, but the others can be improved; see benchmark code here).
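To show why the single-consumer restriction helps, here is a minimal sketch of a multiple-producer, single-consumer linked queue. It is not one of Quasar’s queues and not the algorithm from the paper cited above; it follows a well-known design usually attributed to Dmitry Vyukov, and the class name MpscQueue and the demoSum helper are made up for this example. Producers need only one atomic swap, and the consumer needs no atomic operations at all.

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of a lock-free multiple-producer, single-consumer linked queue.
// Hypothetical class; poll() must only ever be called from one thread.
final class MpscQueue<E> {
    private static final class Node<E> {
        E value;
        volatile Node<E> next;
        Node(E value) { this.value = value; }
    }

    private final AtomicReference<Node<E>> tail;  // shared by all producers
    private Node<E> head;                         // touched by the consumer only

    MpscQueue() {
        Node<E> stub = new Node<>(null);          // dummy node simplifies the empty case
        head = stub;
        tail = new AtomicReference<>(stub);
    }

    // Safe to call from any number of threads.
    void offer(E e) {
        Node<E> node = new Node<>(e);
        Node<E> prev = tail.getAndSet(node);      // the single atomic step
        prev.next = node;                         // volatile write publishes the node
    }

    // Returns null if the queue is empty or a producer has not finished linking yet.
    E poll() {
        Node<E> next = head.next;
        if (next == null) {
            return null;
        }
        E value = next.value;
        next.value = null;                        // the node becomes the new dummy
        head = next;
        return value;
    }

    // Demo: several producer threads, the calling thread is the only consumer.
    static long demoSum(int producers, int perProducer) {
        MpscQueue<Integer> q = new MpscQueue<>();
        for (int p = 0; p < producers; p++) {
            new Thread(() -> {
                for (int i = 1; i <= perProducer; i++) q.offer(i);
            }).start();
        }
        long sum = 0;
        int received = 0;
        while (received < producers * perProducer) {
            Integer v = q.poll();
            if (v == null) { Thread.yield(); continue; }
            sum += v;
            received++;
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("sum of all messages: " + demoSum(4, 1000));
    }
}
```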

Combining the queues with a poor-man’s condition variable that parks the reading fiber while it waits for messages gives us Go-like channels. A fiber attached to a channel, combined with some lifecycle management (Erlang-like “links”) gives us actors.
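A rough sketch of that combination, using plain threads in place of fibers and the JDK’s ConcurrentLinkedQueue in place of Quasar’s queues (both substitutions are assumptions made to keep the example self-contained). MiniChannel is a hypothetical name, and the sketch supports only one receiver at a time.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;

// Sketch: a queue plus a "poor-man's condition variable" gives a channel.
// Threads stand in for fibers; only one thread may call receive().
final class MiniChannel<E> {
    private final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<>();
    private volatile Thread waiter;    // the receiver, while it is inside receive()

    void send(E message) {
        queue.add(message);
        Thread w = waiter;             // read AFTER enqueueing, so no wakeup is lost
        if (w != null) {
            LockSupport.unpark(w);
        }
    }

    E receive() {
        waiter = Thread.currentThread();   // announce ourselves BEFORE checking the queue
        try {
            E message;
            while ((message = queue.poll()) == null) {
                LockSupport.park(this);    // may return spuriously; the loop re-checks
            }
            return message;
        } finally {
            waiter = null;
        }
    }

    // Demo: a receiver thread sums n messages sent by the calling thread.
    static int demo(int n) {
        MiniChannel<Integer> ch = new MiniChannel<>();
        int[] sum = new int[1];
        Thread receiver = new Thread(() -> {
            for (int i = 0; i < n; i++) sum[0] += ch.receive();
        });
        receiver.start();
        for (int i = 1; i <= n; i++) ch.send(i);
        try {
            receiver.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println("sum received: " + demo(100));
    }
}
```

The ordering matters: the receiver registers itself before polling and the sender enqueues before looking for a waiter, so at least one side always notices the other.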

A special sendSync operation makes a request-reply conversation between two actors almost as efficient as a simple method call, all without changing actor semantics at all.

Pulsar

Pulsar wraps Quasar with an API nearly identical to Erlang’s, pattern-matching and all.

I’ve mentioned before that I see Erlang and Clojure as the two languages [3] best suited for programming in our multicore age. They not only make concurrency easy (Go tries to do that, too, with a too-early-to-tell measure of success), they also greatly help to ensure that concurrent code is correct. In short, they are both languages built for concurrency. They achieve this difficult task because they are opinionated languages with clear philosophies on how modern software should be written.

But Clojure and Erlang have their differences, too. The one most apparent to me is that Erlang was made for programming-in-the-large. Its roots (and motivations) are not in concurrency, but in fault tolerance. Erlang was designed to build large, complex software that needs to withstand failure. Its shortcomings are in supporting low-level constructs. It’s hard or impossible to write a high-performance data structure in Erlang, and it’s ill-suited for computation-heavy work. Erlang delegates these low-level problems to modules written in other languages.

I think Clojure is the opposite. It mostly shines when programming-in-the-small. It’s hard to beat Clojure’s expressivity when it comes to data processing, but it doesn’t offer a clear vision when it comes to fault-tolerance and provides little guidance to high-level organization of complex, large software. But Clojure is a more modern language than Erlang, as a Lisp it’s very malleable, and it runs on a very versatile platform, so it is much easier to bring Erlang to Clojure than the other way around.

Rich Hickey, Clojure’s designer, has made it very clear why he chose not to have Erlang-style actors in Clojure. He makes some good arguments, and far be it from me to disagree with Hickey on Clojure’s philosophy, but I think that that reasoning, too, serves programming-in-the-small rather than programming-in-the-large. Actors, in addition to their contribution to simplifying concurrency, help isolate and handle errors. In fact, that is why they’re used in Erlang in the first place. Actors in Clojure may be a tool for programming-in-the-large.

Tim Bray also compares Clojure and Erlang and finds Clojure more elegant, but Erlang’s message-passing model easier to understand. All this goes to say that porting Erlang actors to Clojure might be a worthwhile undertaking (and at the very least — an interesting exercise).

At Parallel Universe we develop complex data structures and distributed data grids, which require a lot of low-level programming, so I do most of my work in Java. So while I have long admired Clojure from afar, I am a total newbie when it comes to actually writing Clojure code. Nevertheless, Leiningen and our YC buddies’ Light Table editor made getting started with Clojure a cinch.

As it turns out, Clojure is so flexible that mimicking Erlang’s syntax was quite easy. Pattern-matching on the received messages was provided courtesy of core.match, which isn’t very mature and has some idiosyncrasies, but works well enough out of the box.

Actually, the hardest problem was getting the instrumentation agent to identify suspendable Clojure functions. This is quite easy with Java Quasar code, as suspendable methods declare themselves as throwing a special checked exception. The Java compiler then helps ensure that any method calling a suspendable method must itself be declared suspendable. But Clojure doesn’t have checked exceptions. I thought of using an annotation, but that didn’t work, and skimming through the Clojure compiler’s code proved that it’s not supported (though this feature could be added to the compiler very easily). In fact, it turns out you can’t mark the class generated by the Clojure compiler for each plain Clojure function in any sensible way that could then be detected by the instrumentation agent. Then I realized it wouldn’t have mattered because Clojure sometimes generates more than one class per function.

I ended up notifying the instrumentation agent after the function’s class has been defined, and then retransforming the class bytecode in memory. Also, because all Clojure function calls are done via an interface (IFn), there is no easy way to recognize calls to suspendable functions in order to inject stack management code at the call-site. An easy solution was just to assume that any call to a Clojure function from within a suspendable function is a call to a suspendable function (although it adversely affects performance; we might come up with a better solution in future releases).
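For comparison, the Java-side convention mentioned above (suspendable methods declaring a special checked exception) can be sketched as follows. SuspendMarker is a stand-in exception defined locally so the example is self-contained; it is not the library’s class, and SuspendableDemo and its methods are likewise hypothetical. Reflection is used only to show that the declaration is visible on the compiled method; a bytecode agent would read the same information from the class file.

```java
import java.lang.reflect.Method;

// Sketch of marking suspendable methods with a checked exception.
// SuspendMarker is a local stand-in, not the library's actual exception class.
final class SuspendableDemo {
    static final class SuspendMarker extends Exception {
        private static final long serialVersionUID = 1L;
    }

    // Declares the marker, so a tool can recognize it as suspendable.
    static int receiveMessage() throws SuspendMarker {
        return 42;   // a real implementation would park the fiber here
    }

    // Calls a suspendable method, so javac forces this method to either
    // declare the marker too or catch it. Removing "throws" is a compile error.
    static int handleOne() throws SuspendMarker {
        return receiveMessage() + 1;
    }

    // An ordinary method: no marker, so it is not considered suspendable.
    static int plain() {
        return 7;
    }

    // Checks whether the named method declares the marker exception.
    static boolean isSuspendable(String methodName) {
        for (Method m : SuspendableDemo.class.getDeclaredMethods()) {
            if (!m.getName().equals(methodName)) continue;
            for (Class<?> ex : m.getExceptionTypes()) {
                if (ex == SuspendMarker.class) return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("handleOne suspendable? " + isSuspendable("handleOne"));
        System.out.println("plain suspendable?     " + isSuspendable("plain"));
    }
}
```

Clojure functions compile to classes whose methods carry no such declaration, which is why the same trick does not carry over.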

Another problem that caused me much grief was that Pulsar benchmarks were extremely slow. After almost giving up on the whole thing, I accidentally discovered that this wasn’t due to Clojure at all, but to Leiningen, which messes with JIT settings. Running the benchmarks with lein trampoline solved the problem: the benchmarks now ran more than 8 times faster (i.e. Leiningen introduced an 8x slowdown!).

Like I said, I’m a noob Clojureist, so things can probably be implemented much more elegantly (any suggestions would be much appreciated), but it works! Even Clojure bindings are preserved for the fiber (i.e. they introduce fiber-local variables rather than thread-local).

Finally, Pulsar mimics Erlang’s API. This was done partly to see how far Clojure could be taken to look like Erlang, but also because Erlang’s API has proven itself, so this is probably a good starting point. Down the line it’s important to preserve the model while shaping the API to fit better with the rest of Clojure. Integration with other Clojure concurrency constructs would also be necessary. Integrating with agents would be easy — in fact, Pulsar configures the agent implementation to use fork/join as the thread-pool; with refs — I don’t know. That depends on what locking, if any, takes place in Clojure’s STM implementation (I haven’t studied it yet). Any suggestions on improving the API are also welcome.

Selective Receive

Before discussing performance and the roadmap, I’d like to briefly pause on one feature that makes Erlang actors quite unique: selective receive. This feature allows an actor to process messages not in the order of their arrival.

While selective receive might cause deadlocks, it makes managing complex state-transitions much easier and less error-prone, as demonstrated in this talk by Ulf Wiger.

Anyway, I found this question on stackoverflow.com that gives an example of selective receive in Erlang, and translated it to Clojure using Pulsar (this isn’t idiomatic Clojure at all, but a demonstration of a direct translation from Erlang):

(ns co.paralleluniverse.pulsar-test.examples.priority
  (:use co.paralleluniverse.pulsar))

(declare normal)

(defsusfn important [] 
  (receive 
   [(priority :guard #(> % 10)) msg] (cons msg (important))
   :after 0 (normal)))

(defsusfn normal []
  (receive
   [_ msg] (cons msg (normal))
   :after 0 ()))

(defn -main []
  (join (spawn 
         (fn [] 
           (! @self [15 :high])
           (! @self [7 :low])
           (! @self [1 :low])
           (! @self [17 :high])
           (important)))))

;(:high :high :low :low)

This example is found in Pulsar’s examples directory.
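For readers more at home in Java, the mechanics of selective receive can be sketched without any actor library at all: scan the mailbox for the first message that matches, remove it, and leave everything else queued in arrival order. The sketch below is single-threaded and purely illustrative; SelectiveMailbox, Msg and drainByPriority are hypothetical names, and returning null plays the role of the ":after 0" clause in the example above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Predicate;

// Single-threaded sketch of selective receive over a mailbox.
// Hypothetical names; no blocking or concurrency is modeled here.
final class SelectiveMailbox {
    static final class Msg {
        final int priority;
        final String label;
        Msg(int priority, String label) {
            this.priority = priority;
            this.label = label;
        }
    }

    private final LinkedList<Msg> messages = new LinkedList<>();

    void send(int priority, String label) {
        messages.add(new Msg(priority, label));
    }

    // Removes and returns the first matching message, or null if none matches.
    // Messages that do not match stay in the mailbox, in their original order.
    Msg receive(Predicate<Msg> matches) {
        for (Iterator<Msg> it = messages.iterator(); it.hasNext();) {
            Msg m = it.next();
            if (matches.test(m)) {
                it.remove();
                return m;
            }
        }
        return null;
    }

    // Mirrors important/normal: first all messages with priority > 10, then the rest.
    static List<String> drainByPriority(SelectiveMailbox box) {
        List<String> out = new ArrayList<>();
        Msg m;
        while ((m = box.receive(x -> x.priority > 10)) != null) out.add(m.label);
        while ((m = box.receive(x -> true)) != null) out.add(m.label);
        return out;
    }

    public static void main(String[] args) {
        SelectiveMailbox box = new SelectiveMailbox();
        box.send(15, "high");
        box.send(7, "low");
        box.send(1, "low");
        box.send(17, "high");
        System.out.println(drainByPriority(box));   // [high, high, low, low]
    }
}
```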

Local Actor State

In Erlang, actor state is set by recursively calling the actor function with the new state as an argument. In Pulsar, we can do the same. Here’s an example from the Pulsar tests:

(is (= 25 (let [actor
                    (spawn #(loop [i (int 2)
                                   state (int 0)]
                              (if (== i 0)
                                state
                                (recur (dec i) (+ state (int (receive)))))))]
                (! actor 13)
                (! actor 12)
                (join actor))))

However, Clojure is all about managing state correctly. Since only the actor can set its own state, and because an actor cannot run concurrently with itself, mutating a state variable directly shouldn’t be a problem. Every Pulsar actor has a state field that can be read with @state and written with set-state!. Here’s another example from the tests:

(is (= 25 (let [actor
                    (spawn #(do
                              (set-state! 0)
                              (set-state! (+ @state (receive)))
                              (set-state! (+ @state (receive)))
                              @state))]
                (! actor 13)
                (! actor 12)
                (join actor))))

Finally, what if we want several state fields? What if we want some or all of them to be of a primitive type? This, too, poses no risk of race conditions because all state fields are written and read only by the actor, and there is no danger of them appearing inconsistent to an observer.

Pulsar supports this as an experimental feature (implemented with deftype), like so:

(is (= 25 (let [actor (spawn (actor [^int sum 0]
                                        (set! sum (int (+ sum (receive))))
                                        (set! sum (int (+ sum (receive))))
                                        sum))]
                (! actor 13)
                (! actor 12)
                (join actor))))

These are three different ways of managing actor state. Eventually, we’ll settle on just one or two (and are open to discussion about which is preferred).

Performance

Our goal at Parallel Universe is to make it easy for developers to write fast and scalable multi-threaded and distributed software, so naturally, performance is always a main concern.

At this stage of the project, however, it is a little early for serious benchmarking; Quasar and Pulsar were no more than an idea just over a month ago. But it is important to get a first impression of performance, just so we’ll have a sense of what we’re up against.

I found a semi-canonical benchmark with implementations in both Erlang and Go: the ring benchmark. It consists of actors arranged in a loop, each waiting for a message from its predecessor; when the message arrives, the actor sends a message to its successor. The Erlang code I’ve used is found here, and the Go code is here. Both were run with multiprocessing enabled, with the number of threads equal to the number of logical cores (8) on my development machine (2.3GHz i7 MBP).

The benchmarks were by no means performed under lab conditions, so the results varied quite a bit and shouldn’t be taken too seriously (again, they were just meant to get a general sense for performance). The numbers presented here are the median of a few runs. All the benchmarks were run with 1000 actors and 1000 messages (i.e. 1,000,000 messages are sent and received in total).
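The shape of the ring benchmark is easy to show with plain Java threads and blocking queues. This is only a sketch of the benchmark’s structure, not the Quasar or Pulsar benchmark code, and ThreadRing and its run method are names made up here. With OS threads it is best run with a small ring; the whole point of fibers is that they make rings of a thousand or more actors cheap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the ring benchmark using OS threads and blocking queues.
// Hypothetical names; threads stand in for actors.
final class ThreadRing {
    // Sends a token around a ring of `actors` threads `rounds` times.
    // Every hop increments the token, so the result equals actors * rounds.
    static int run(int actors, int rounds) {
        List<BlockingQueue<Integer>> inboxes = new ArrayList<>();
        for (int i = 0; i < actors; i++) {
            inboxes.add(new LinkedBlockingQueue<>());
        }
        Thread[] threads = new Thread[actors];
        for (int i = 0; i < actors; i++) {
            BlockingQueue<Integer> inbox = inboxes.get(i);
            BlockingQueue<Integer> next = inboxes.get((i + 1) % actors);
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        next.put(inbox.take() + 1);   // wait for predecessor, pass on
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        try {
            inboxes.get(0).put(0);                    // inject the token
            for (Thread t : threads) {
                t.join();
            }
            return inboxes.get(0).take();             // the last hop lands back at actor 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        int hops = run(10, 1000);
        long millis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(hops + " messages in " + millis + " ms");
    }
}
```

Note that only one actor is runnable at any moment, a property of this benchmark that becomes relevant when interpreting the results.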

First, we benchmarked Quasar (code here) and Pulsar (code here) against Erlang R15B01:

erl     480 ms
quasar  550 ms
pulsar  657 ms

We then tested against Go 1.1, this time with primitive-type (int) channels, just like Go’s (Quasar code, Pulsar code).

go 1.1  657 ms
quasar  570 ms
pulsar  650 ms

These results look promising. Even though the fiber implementation is not native to the JVM but injected through instrumentation, the Pulsar and Quasar numbers are better than Go, and not too far from Erlang.

Digging a little deeper revealed that this benchmark hits a very particular fork/join weak-spot. This is a degenerate edge-case because only one actor is active at any given time. Multi-threading doesn’t help. It turns out that fork/join has difficulty handling this particular case, because when a new task is created (forked), the scheduler does not yet know that the current task will shortly terminate and that the current thread is the best choice to run the forked task. Instead, it looks for a different thread to run the task on, and if it’s idle — and it is, because only one thread has work to do at any given time — it has to be awakened (at the OS level).

So Quasar/Pulsar will probably perform better in most circumstances. In fact, they will probably perform well enough to beat both Erlang and Go even at this early stage, while this particular issue with fork/join will be resolved (according to Doug Lea, on the java-concurrency mailing list) by the time Java 8 is released.

In addition, there’s plenty of room to improve the instrumentation of Clojure functions, which, as explained above, is perhaps too conservative.

IO, Binary Buffers

Fibers, i.e. lightweight threads, can do a lot more than just support actors. One of the most interesting things they enable is fiber-blocking IO. There is a longstanding debate about which is better/faster: blocking IO, or asynchronous, event-driven IO. I don’t know what the current consensus is, if there is one, and won’t discuss the merits of either approach. Some think that blocking IO is better for long connections while event-driven IO is better for short ones. In any case, most agree that the blocking, threaded approach is easier to program.

Well, because we can precisely control how fibers are scheduled, we can build fiber-blocking IO that looks just like regular blocking IO, but is, in fact, implemented with asynchronous IO. A fiber issuing an IO request will park, and when the request completes, the fiber will be rescheduled. The Quasar code contains an implementation of such an approach, which uses Java 7’s async NIO channels. I don’t know how well it works because I haven’t tried it yet (the code is there as a draft meant to test the API).
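The idea can be sketched with the JDK’s own asynchronous file channel. In this example a plain thread stands in for the fiber, and LockSupport park/unpark stands in for fiber parking and rescheduling; BlockingOverAsync, readStart and roundTrip are hypothetical names, and this is not Quasar’s IO code. To the caller, readStart looks like an ordinary blocking read, yet underneath it issues an asynchronous request and waits for the completion callback.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Sketch: a blocking-looking read implemented on top of asynchronous IO.
// A thread stands in for the fiber; all names are hypothetical.
final class BlockingOverAsync {
    // Reads up to maxBytes from the start of the file with a single async read.
    static String readStart(Path file, int maxBytes) {
        Thread caller = Thread.currentThread();
        AtomicReference<Object> result = new AtomicReference<>();
        ByteBuffer buf = ByteBuffer.allocate(maxBytes);
        try (AsynchronousFileChannel ch =
                 AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
            ch.read(buf, 0L, null, new CompletionHandler<Integer, Void>() {
                @Override
                public void completed(Integer bytesRead, Void attachment) {
                    result.set(bytesRead);
                    LockSupport.unpark(caller);      // "reschedule the fiber"
                }
                @Override
                public void failed(Throwable error, Void attachment) {
                    result.set(error);
                    LockSupport.unpark(caller);
                }
            });
            while (result.get() == null) {
                LockSupport.park();                  // "park" until the request completes
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (result.get() instanceof Throwable) {
            throw new IllegalStateException((Throwable) result.get());
        }
        buf.flip();
        return StandardCharsets.UTF_8.decode(buf).toString();
    }

    // Demo helper: writes text to a temporary file and reads it back.
    static String roundTrip(String text) {
        try {
            Path tmp = Files.createTempFile("fiber-io-sketch", ".txt");
            try {
                Files.write(tmp, text.getBytes(StandardCharsets.UTF_8));
                return readStart(tmp, 1024);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello from async IO"));
    }
}
```

With a real fiber, parking would release the underlying worker thread to run other fibers instead of blocking it, which is where the scalability comes from.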

Now what about processing binary data? Erlang has excellent support for binary buffers, and Pulsar’s got that, too, thanks to Zach Tellman’s Gloss library. Here’s an example:

(ns co.paralleluniverse.pulsar-test.examples.binary
  "A binary-data buffer message example"
  (:use co.paralleluniverse.pulsar 
        [gloss core io]))

;; This is the layout of our binary buffer:
(def fr (compile-frame {:a :int16, :b :float32}))

(defsusfn receiver []
  (receive [buffer #(decode fr %)]
           {:a 1 :b b} (println "Got buffer (a=1) b: " b)
           {:a a :b b} (println "Got unexpected buffer" buffer "a: " a "b: " b)
           :after 100 (println "timeout!")))

(defn -main []
  (let [r (spawn receiver)
        buffer (encode fr {:a 1 :b 2.3})]
    (! r buffer)
    (join r)))

Shared state

In my previous blog post I wrote that actors alone are insufficient for building concurrent applications. Very often, a shared data-structure is required. Even Erlang has shared data structures in the form of ETS tables.

In that post, I suggested an approach that meshes the application and the database. The database schedules and parallelizes the application’s data-processing logic.

That approach, while yielding terrific performance and scalability, has one drawback. It requires that all data-processing logic be placed in callbacks that are run by the database, but splitting code into lots of callbacks feels unnatural for many programmers, and might be hard to reason about.

Fibers solve that. As in the case of IO, callbacks can be trivially transformed into fiber-blocking code. The fiber issues a database request and blocks. When the operation completes, the fiber continues, but it is now scheduled by the database, which ensures optimal parallelism.

Next steps

Currently, both Quasar and Pulsar documentation is lacking (huh, it’s virtually nonexistent other than examples). But other than documentation, here are some features on the roadmap (obviously, they’re all subject to change based on recommendations from the developer community):

  • Distribution: a natural next step would be distributed actors. Luckily, Galaxy already has all the necessary infrastructure in place. It can also distribute shared data structures.
  • I/O: the code for fiber-blocking IO is in place for socket and file IO, but is completely untested.
  • Better isolation: we will carefully examine Quasar/Pulsar’s error containment to provide fault isolation to (almost) match Erlang’s.
  • Hot code-swapping: this should be trivial for Clojure, and shouldn’t be hard for Java either. After all, we already have an instrumentation agent running, so it’s pretty simple to instruct it to redefine some classes. Haven’t tried it yet, though.
  • OTP: Much of Erlang’s software-architecture and fault tolerance power comes from the battle-tested OTP framework. We can build a similar library, which, at least for Clojure, might be even more elegant than OTP.
  • Integration with Erlang: talking to Erlang processes from Java is quite easy with JInterface (I tried it), but JInterface is outdated and could be easily made to perform much better. A good start would be Scalang, and I’ve begun porting it to Java to disentangle it from Scala’s type system, but I’ve put it aside for now. Once we have a fast, well-engineered Erlang interfacing library, plugging it into Quasar/Pulsar should be trivial.

The top three or four of these features will probably make it into the 1.0 release scheduled for late June.

Getting started

To get started, follow the steps on the Quasar or Pulsar project pages (for Clojure code, there is no need to visit Quasar).

Please be patient, though, as this is alpha-phase software.



  1. Notable attempts that also provide lightweight threads are the Kilim library and Erjang, the JVM Erlang implementation that uses Kilim under the hood. Kilim, however, is a very intrusive and complex infrastructure, while Erjang seems to no longer be actively maintained.

  2. This is why I avoided Kilim; it fuses all three into a monolithic package.

  3. Some commenters on HN added Haskell to the list, but I know too little Haskell to form an opinion on the matter.
