Erlang (and Go) in Clojure (and Java)
Lightweight Threads, Channels and Actors for the JVM
We’ll start at the end: Quasar and Pulsar are two new open-source libraries in Java and Clojure respectively that add Erlang-like actor-model (and Go-like coroutine/channel) programs to the JVM.
There have been several attempts at porting actors to the JVM. Quasar and Pulsar’s main contribution, from which many of their advantages stem, is true lightweight threads[1]. Lightweight threads provide many of the same benefits that “regular” OS threads do, namely a single, simple control flow and the ability to block and wait for some resource to become available while in the meantime allowing other threads to run on the CPU. Unlike regular threads, lightweight threads are not scheduled by the operating system, so their context switch is often faster and they require far fewer system resources. As a result, a single machine can handle millions of them.
Quasar/Pulsar actors have the following benefits:
- Extreme simplicity
- Pattern matching
- Binaries and pattern matching on binaries (just like Erlang!)
- Selective receive
- Super-scalable asynchronous IO accessed with a synchronous, blocking IO API.
Just to show where we’re headed, here’s an almost line-by-line port of the canonical Erlang pingpong example to Pulsar:
(defsusfn ping [n]
  (if (== n 0)
    (do
      (! :pong :finished)
      (println "ping finished"))
    (do
      (! :pong [:ping @self])
      (receive
        :pong (println "Ping received pong"))
      (recur (dec n)))))

(defsusfn pong []
  (receive
    :finished (println "Pong finished")
    [:ping ping] (do
                   (println "Pong received ping")
                   (! ping :pong)
                   (recur))))

(defn -main []
  (register :pong (spawn pong))
  (spawn ping 3)
  :ok)
Why?
Writing correct and efficient multi-threaded code is extremely hard. Unfortunately, given hardware trends, it has also become absolutely necessary.
Several paradigms that try to make writing correct and efficient multithreaded code easier have emerged, but the actor model offered by Erlang has won some popularity, and for good reason: the actor model is easy to understand and lends itself to building complex fault-tolerant software. My own admiration for Erlang began when I realized that, to paraphrase Philip Greenspun, every piece of mission-critical military software I’d worked on contained an ad-hoc, informally specified implementation of half of Erlang. Other languages (notably Go with channels and goroutines, and Clojure with agents) and frameworks (like Akka) have copied some aspects of the actor model, but they all suffer from deficiencies that make them more complicated or less powerful than Erlang.
Porting Erlang’s model to the JVM — following the original as closely as possible while still adhering to the platform’s established practices — would therefore introduce a very powerful tool to what is possibly the most ubiquitous programming environment in use today, and one of the most performant ones.
Quasar, the Implementation
An Erlang-like actor system (i.e. with blocking, selective receives), requires three components:
- Coroutines
- A scheduler
- Queues
Coroutines are functions that can be paused and later resumed. They are necessary to build lightweight threads because they provide “user-mode” management of the stack. A good, multithreaded scheduler, is then added to schedule and run the coroutines while making the best use of available cores. This combination gives us lightweight threads. Finally, queues are needed to implement channels or actor mailboxes.
It is important that these three components be logically separated. Modularity assists in future maintenance, and allows for a best-of-breed approach[2].
Of the three pieces, coroutines are the most challenging to implement on the JVM, as they require bytecode instrumentation. Essentially, every possibly-pausing method (the Quasar/Pulsar nomenclature is “suspendable”) must be inspected to find all invocations of other suspendable methods. Before the call to a suspendable method, code must be injected to push the caller’s local variables onto a stack object. Also, code must be injected to the beginning of every suspendable method, that upon resuming would jump to the instruction following the pause-point. This has to be done to all suspendable methods on the (OS thread) stack.
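To make the instrumentation idea concrete, here is a hand-written sketch of roughly what such a transformation amounts to. It is not the code that Matthias Mann’s library or Quasar actually generates; the class and its fields are hypothetical, and the “stack object” is reduced to a few fields. The locals are saved in fields before each pause, and a switch at the top of the method jumps back to the instruction after the pause point on resume.

```java
import java.util.ArrayList;
import java.util.List;

/** Hand-written stand-in for an instrumented suspendable method (all names hypothetical). */
public class ManualContinuation {
    // The "stack object": the resume point plus the method's saved local variables.
    private int resumePoint = 0;
    private int i;
    private int sum;
    private boolean done = false;
    private final List<String> log = new ArrayList<>();

    /**
     * Logical method being simulated:
     *   sum = 0; for (i = 1; i <= 3; i++) { sum += i; pause(); }
     * Each call runs up to the next pause point and then returns to the caller.
     */
    public void run() {
        if (done) return;
        switch (resumePoint) {
            case 0:                   // first entry: initialize the locals
                sum = 0;
                i = 1;
                // fall through into the loop
            case 1:                   // resumed: locals are already in the fields
                while (i <= 3) {
                    sum += i;
                    i++;
                    log.add("paused with sum=" + sum);
                    resumePoint = 1;  // where to jump when resumed
                    return;           // pause: unwind the real thread stack
                }
                done = true;
                log.add("finished with sum=" + sum);
        }
    }

    public boolean isDone() { return done; }
    public int getSum() { return sum; }
    public List<String> getLog() { return log; }
}
```

A driver simply calls run() repeatedly until isDone() returns true; between calls the thread is free to do other work, which is the whole point.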
Fortunately, I found Matthias Mann’s continuations library that not only does exactly that and only that, but is also small, fast and easy to understand.
Next came the scheduler. Here the job was easy. Java’s ForkJoinPool is a masterpiece of multi-threaded task scheduling, made even better for Java 8. In the meantime, we’re using the jsr166e library that contains the fork/join implementation that’s to go into the next major JDK release. We’ve used it in SpaceBase, and the performance improvements over the Java 7 implementation (which was very impressive to begin with) are significant. With a solid foundation like fork/join you can’t go wrong.
Combining Matthias Mann’s coroutines with fork/join gives us lightweight threads, which in Quasar are called fibers. A fiber can block, or “park”, just like a thread, by pausing the continuation and returning from the fork/join task. To be “unparked”, it is simply scheduled (forked) again.
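The park/unpark mechanics can be sketched with nothing but the JDK’s ForkJoinPool. This is a toy, not Quasar’s fiber class: ToyFiber and its methods are names I made up for illustration, the “continuation” is just a step counter, and the fiber wakes itself up where a real one would be unparked by whoever made the awaited resource available.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

/** Toy fiber: parks by returning from its task, unparks by being scheduled again. */
public class ToyFiber implements Runnable {
    private final ForkJoinPool pool;
    private final int totalSteps;
    private final CountDownLatch finished = new CountDownLatch(1);
    private int step = 0; // the saved state; only one task run touches it at a time

    public ToyFiber(ForkJoinPool pool, int totalSteps) {
        this.pool = pool;
        this.totalSteps = totalSteps;
    }

    /** Unpark (or start): schedule the fiber on the pool. */
    public void unpark() {
        pool.execute(this);
    }

    @Override
    public void run() {
        step++;                 // do one slice of work
        if (step < totalSteps) {
            unpark();           // stand-in for an external wake-up event
            return;             // park: the pool thread is released for other tasks
        }
        finished.countDown();
    }

    /** Waits for completion; returns false on timeout or interruption. */
    public boolean awaitFinished(long millis) {
        try {
            return finished.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public int stepsRun() { return step; }
}
```

Each slice may run on a different pool thread, yet the fiber’s state survives between slices because it lives in the object rather than on a thread stack.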
Finally, I rolled my own queue implementations, partly as an exercise, but also because there’s room for improvement over Java’s concurrent Queues in our case. The reason is that the queues we need aren’t symmetric. Many fibers append messages onto the queues but only one pulls them out, so these are single-consumer multiple-producers queues.
Quasar has three types of lock-free queue implementations: concurrent linked-lists based on John M. Mellor-Crummey’s 1987 paper (for some reason the simpler Craig-Landin-Hagersten queue I’ve tried gave worse performance than Java’s multiple-consumer multiple-producer ConcurrentLinkedQueue. I must have done something wrong — I’ll look into that); bounded array queues with some added “mechanical sympathy” based on Martin Thompson’s talks; and an unbounded linked-array queue based on Gidenstam, Sundell and Tsigas. Java’s memory model and excellent concurrency primitives allowed building some pretty fast queues (the bounded queue performed best by far, but the others can be improved; see benchmark code here).
Combining the queues with a poor-man’s condition variable that parks the reading fiber while it waits for messages gives us Go-like channels. A fiber attached to a channel, combined with some lifecycle management (Erlang-like “links”) gives us actors.
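Here is a minimal sketch of the queue-plus-parking idea, under loud assumptions: it uses plain threads and LockSupport instead of fibers, the JDK’s ConcurrentLinkedQueue instead of Quasar’s single-consumer queues, and ToyChannel is a hypothetical name. It supports any number of senders but only one receiver at a time, and it ignores thread interruption.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;

/** Toy channel: many producers, one consumer that parks while the queue is empty. */
public class ToyChannel<T> {
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private volatile Thread consumer; // non-null only while the consumer is in receive()

    public void send(T msg) {
        queue.add(msg);
        Thread c = consumer;          // read AFTER enqueueing, so a waiting consumer is seen
        if (c != null) LockSupport.unpark(c);
    }

    public T receive() {
        consumer = Thread.currentThread(); // publish BEFORE checking the queue
        try {
            T msg;
            while ((msg = queue.poll()) == null) {
                LockSupport.park(this);    // may wake spuriously, hence the loop
            }
            return msg;
        } finally {
            consumer = null;
        }
    }

    /** Demo: several producer threads each send 0..perProducer-1; returns the sum received. */
    public static long demo(int producers, int perProducer) {
        ToyChannel<Integer> ch = new ToyChannel<>();
        for (int p = 0; p < producers; p++) {
            new Thread(() -> {
                for (int i = 0; i < perProducer; i++) ch.send(i);
            }).start();
        }
        long sum = 0;
        for (int n = 0; n < producers * perProducer; n++) sum += ch.receive();
        return sum;
    }
}
```

The ordering in send and receive matters: the sender enqueues before looking for a waiter, and the receiver announces itself before polling, so a message cannot slip in unnoticed between the empty check and the park.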
A special sendSync operation makes a request-reply conversation between two actors almost as efficient as a simple method call, all without changing actor semantics at all.
Pulsar
Pulsar wraps Quasar with an API identical to Erlang, pattern-matching and all.
I’ve mentioned before that I see Erlang and Clojure as the two languages[3] best suited for programming in our multicore age. They don’t only make concurrency easy (Go tries to do that, too, with a too-early-to-tell measure of success), they also greatly help to ensure that concurrency is correct. In short, they are both languages built for concurrency. They achieve this difficult task because they are opinionated languages with clear philosophies on how modern software should be written.
But Clojure and Erlang have their differences, too. The one most apparent to me is that Erlang was made for programming-in-the-large. Its roots (and motivations) are not in concurrency, but in fault tolerance. Erlang was designed to build large, complex software that needs to withstand failure. Its shortcomings are in supporting low-level constructs. It’s hard or impossible to write a high-performance data structure in Erlang, and it’s ill-suited for computation-heavy work. Erlang delegates these low-level problems to modules written in other languages.
I think Clojure is the opposite. It mostly shines when programming-in-the-small. It’s hard to beat Clojure’s expressivity when it comes to data processing, but it doesn’t offer a clear vision when it comes to fault-tolerance and provides little guidance to high-level organization of complex, large software. But Clojure is a more modern language than Erlang, as a Lisp it’s very malleable, and it runs on a very versatile platform, so it is much easier to bring Erlang to Clojure than the other way around.
Rich Hickey, Clojure’s designer, has made it very clear why he chose not to have Erlang-style actors in Clojure. He makes some good arguments, and far be it from me to disagree with Hickey on Clojure’s philosophy, but I think that that reasoning, too, serves programming-in-the-small rather than in-the-large. Actors, in addition to their contribution to simplifying concurrency, help isolate and handle errors. In fact, that is why they’re used in Erlang in the first place. Actors in Clojure may be a tool for programming-in-the-large.
Tim Bray also compares Clojure and Erlang and finds Clojure more elegant, but Erlang’s message-passing model easier to understand. All this goes to say that porting Erlang actors to Clojure might be a worthwhile undertaking (and at the very least — an interesting exercise).
At Parallel Universe we develop complex data structures and distributed data grids, which require a lot of low-level programming, so I do most of my work in Java. So while I have long admired Clojure from afar, I am a total newbie when it comes to actually writing Clojure code. Nevertheless, Leiningen and our YC buddies’ Light Table editor made getting started with Clojure a cinch.
As it turns out, Clojure is so flexible that mimicking Erlang’s syntax was quite easy. Pattern-matching on the received messages was provided courtesy of core.match, which isn’t very mature and has some idiosyncrasies, but works well enough out of the box.
Actually, the hardest problem was getting the instrumentation agent to identify suspendable Clojure functions. This is quite easy with Java Quasar code, as suspendable methods declare themselves as throwing a special checked exception. The Java compiler then helps ensure that any method calling a suspendable method must itself be declared suspendable. But Clojure doesn’t have checked exceptions. I thought of using an annotation, but that didn’t work, and skimming through the Clojure compiler’s code showed that it’s not supported (though this feature could be added to the compiler very easily). In fact, it turns out you can’t mark the class generated by the Clojure compiler for each plain Clojure function in any sensible way that could then be detected by the instrumentation agent. Then I realized it wouldn’t have mattered because Clojure sometimes generates more than one class per function.
I ended up notifying the instrumentation agent after the function’s class has been defined, and then retransforming the class bytecode in memory. Also, because all Clojure function calls are done via an interface (IFn), there is no easy way to recognize calls to suspendable functions in order to inject stack management code at the call-site. An easy solution was just to assume that any call to a Clojure function from within a suspendable function is a call to a suspendable function (although it adversely affects performance; we might come up with a better solution in future releases).
Another problem that caused me much grief was that Pulsar benchmarks were extremely slow. After almost giving up on the whole thing, I accidentally discovered that this wasn’t due to Clojure at all, but to Leiningen, which messes with JIT settings. Running the benchmarks with lein trampoline solved the problem: the benchmarks now ran more than 8 times faster (i.e. Leiningen introduced an 8x slowdown!).
Like I said, I’m a noob Clojureist, so things can probably be implemented much more elegantly (any suggestions would be much appreciated), but it works! Even Clojure bindings are preserved for the fiber (i.e. they introduce fiber-local variables rather than thread-local).
Finally, Pulsar mimics Erlang’s API. This was done partly to see how far Clojure could be taken to look like Erlang, but also because Erlang’s API has proven itself, so this is probably a good starting point. Down the line it’s important to preserve the model while shaping the API to fit better with the rest of Clojure. Integration with other Clojure concurrency constructs would also be necessary. Integrating with agents would be easy — in fact, Pulsar configures the agent implementation to use fork/join as the thread-pool; with refs — I don’t know. That depends on what locking, if any, takes place in Clojure’s STM implementation (I haven’t studied it yet). Any suggestions on improving the API are also welcome.
Selective Receive
Before discussing performance and the roadmap, I’d like to briefly pause on one feature that makes Erlang actors quite unique: selective receive. This feature allows an actor to process messages not in the order of their arrival.
While selective receive might cause deadlocks, it makes managing complex state-transitions much easier and less error-prone than without selective receive, as demonstrated in this talk by Ulf Wiger.
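For readers who haven’t met the feature, the mechanics can be sketched in a few lines of Java. This is not how Quasar implements it; SelectiveMailbox is a hypothetical, single-threaded, non-blocking toy (think of every receive as having a zero timeout). The point is only that a receive scans the mailbox for the oldest message matching a pattern and leaves everything else in place, in order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Toy mailbox with selective receive (single-threaded, non-blocking). */
public class SelectiveMailbox<T> {
    private final Deque<T> messages = new ArrayDeque<>();

    public void send(T msg) {
        messages.addLast(msg);
    }

    /** Removes and returns the oldest matching message; the others keep their order. */
    public Optional<T> receive(Predicate<? super T> matches) {
        for (Iterator<T> it = messages.iterator(); it.hasNext();) {
            T m = it.next();
            if (matches.test(m)) {
                it.remove();
                return Optional.of(m);
            }
        }
        return Optional.empty();
    }

    /** Drains messages above the threshold first, then the rest, each in arrival order. */
    public static List<Integer> prioritized(List<Integer> incoming, int threshold) {
        SelectiveMailbox<Integer> box = new SelectiveMailbox<>();
        for (Integer m : incoming) box.send(m);
        List<Integer> out = new ArrayList<>();
        Optional<Integer> m;
        while ((m = box.receive(x -> x > threshold)).isPresent()) out.add(m.get());
        while ((m = box.receive(x -> true)).isPresent()) out.add(m.get());
        return out;
    }
}
```

Feeding it the priorities 15, 7, 1, 17 with a threshold of 10 yields 15, 17, 7, 1: the two important messages jump the queue, and the rest follow in arrival order.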
Anyway, I found this question on stackoverflow.com that gives an example of selective receive in Erlang, and translated it to Clojure using Pulsar (this isn’t idiomatic Clojure at all, but a demonstration of a direct translation from Erlang):
(ns co.paralleluniverse.pulsar-test.examples.priority
  (:use co.paralleluniverse.pulsar))

(declare normal)

(defsusfn important []
  (receive
    [(priority :guard #(> % 10)) msg] (cons msg (important))
    :after 0 (normal)))

(defsusfn normal []
  (receive
    [_ msg] (cons msg (normal))
    :after 0 ()))

(defn -main []
  (join (spawn
          (fn []
            (! @self [15 :high])
            (! @self [7 :low])
            (! @self [1 :low])
            (! @self [17 :high])
            (important)))))

; => (:high :high :low :low)
This example is found in Pulsar’s examples directory.
Local Actor State
In Erlang, actor state is set by recursively calling the actor function with the new state as an argument. In Pulsar, we can do the same. Here’s an example from the Pulsar tests:
(is (= 25 (let [actor
                (spawn #(loop [i (int 2)
                               state (int 0)]
                          (if (== i 0)
                            state
                            (recur (dec i) (+ state (int (receive)))))))]
            (! actor 13)
            (! actor 12)
            (join actor))))
However, Clojure is all about managing state correctly. Since only the actor can set its own state, and because an actor cannot run concurrently with itself, mutating a state variable directly shouldn’t be a problem. Every Pulsar actor has a state field that can be read like this @state and written with set-state!. Here’s another example from the tests:
(is (= 25 (let [actor
                (spawn #(do
                          (set-state! 0)
                          (set-state! (+ @state (receive)))
                          (set-state! (+ @state (receive)))
                          @state))]
            (! actor 13)
            (! actor 12)
            (join actor))))
Finally, what if we want several state fields? What if we want some or all of them to be of a primitive type? This, too, poses no risk of race conditions because all state fields are written and read only by the actor, and there is no danger of them appearing inconsistent to an observer.
Pulsar supports this as an experimental feature (implemented with deftype), like so:
(is (= 25 (let [actor (spawn (actor [^int sum 0]
                               (set! sum (int (+ sum (receive))))
                               (set! sum (int (+ sum (receive))))
                               sum))]
            (! actor 13)
            (! actor 12)
            (join actor))))
These are three different ways of managing actor state. Eventually, we’ll settle on just one or two (and are open to discussion about which is preferred).
Performance
Our goal at Parallel Universe is to make it easy for developers to write fast and scalable multi-threaded and distributed software, so naturally, performance is always a main concern.
At this stage of the project, however, it is a little early for serious benchmarking; Quasar and Pulsar were no more than an idea just over a month ago. But it is important to get a first impression of performance, just so we’ll have a sense of what we’re up against.
I’ve found a benchmark for both Erlang and Go, which is semi-canonical. It is the ring benchmark, and it consists of actors arranged in a loop, each waiting for a message from its predecessor; when it arrives, the actor sends a message to its successor. The Erlang code I’ve used is found here, and the Go code is here. Both were run with multiprocessing enabled, with the number of threads equal to the number of logical cores (8) on my development machine (2.3GHz i7 MBP).
The benchmarks were by no means performed under lab conditions, so the results varied quite a bit and shouldn’t be taken too seriously (again, they were just meant to get a general sense for performance). The numbers presented here are the median of a few runs. All the benchmarks were run with 1000 actors and 1000 messages (i.e. 1,000,000 messages are sent and received in total).
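To show the shape of the ring benchmark (not the code that produced the numbers in this post), here is a small self-contained sketch. It deliberately uses one OS thread and one LinkedBlockingQueue per node, which is exactly the heavyweight arrangement that fibers are meant to replace, so it is only suitable for small rings. ToyRing and its message encoding (a countdown of remaining hops, with -1 as a stop signal) are my own assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy ring: each node waits for its predecessor's message and passes it on. */
public class ToyRing {
    private static final int STOP = -1;

    /** Sends one message around the ring `laps` times; returns the number of hops made. */
    public static int run(int actors, int laps) {
        List<BlockingQueue<Integer>> inboxes = new ArrayList<>();
        for (int i = 0; i < actors; i++) inboxes.add(new LinkedBlockingQueue<>());
        AtomicInteger forwarded = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < actors; i++) {
            BlockingQueue<Integer> inbox = inboxes.get(i);
            BlockingQueue<Integer> next = inboxes.get((i + 1) % actors);
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        int remaining = inbox.take();  // block until the predecessor sends
                        if (remaining <= 0) {          // 0 = last hop done, STOP = shut down
                            next.put(STOP);            // tell the successor to exit too
                            return;
                        }
                        forwarded.incrementAndGet();
                        next.put(remaining - 1);       // pass the message on
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        try {
            inboxes.get(0).put(actors * laps);         // inject the message at node 0
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return forwarded.get();
    }
}
```

Note that only one node is ever runnable at a time, so adding cores cannot speed this up; the benchmark measures the cost of handing a message to a sleeping neighbor and waking it.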
First, we benchmarked Quasar (code here) and Pulsar (code here) against Erlang R15B01:
erl 480 ms
quasar 550 ms
pulsar 657 ms
We then tested against Go 1.1, this time with primitive-type (int) channels, just like Go’s (Quasar code, Pulsar code).
go 1.1 657 ms
quasar 570 ms
pulsar 650 ms
These results look promising. Even though the fiber implementation is not native to the JVM but injected through instrumentation, the Pulsar and Quasar numbers are better than Go, and not too far from Erlang.
Digging a little deeper revealed that this benchmark hits a very particular fork/join weak-spot. This is a degenerate edge-case because only one actor is active at any given time. Multi-threading doesn’t help. It turns out that fork/join has difficulty handling this particular case, because when a new task is created (forked), the scheduler does not yet know that the current task will shortly terminate and that the current thread is the best choice to run the forked task. Instead, it looks for a different thread to run the task on, and if it’s idle — and it is, because only one thread has work to do at any given time — it has to be awakened (at the OS level).
So Quasar/Pulsar will probably perform better in most circumstances. In fact, they will probably perform well enough to beat both Erlang and Go even at this early stage, while this particular issue with fork/join will be resolved (according to Doug Lea, on the java-concurrency mailing list) by the time Java 8 is released.
In addition, there’s plenty of room to improve the instrumentation of Clojure functions, which, as explained above, is perhaps too conservative.
IO, Binary Buffers
Fibers, i.e. lightweight threads, can do a lot more than just support actors. One of the most interesting things they enable is fiber-blocking IO. There is a longstanding debate about which is better/faster: blocking IO, or asynchronous, event-driven IO. I don’t know what the current consensus is, if there is one, and won’t discuss the merits of either approach. Some think that blocking IO is better for long connections while event-driven IO is better for short ones. In any case, most agree that the blocking, threaded approach is easier to program.
Well, because we can precisely control how fibers are scheduled, we can build fiber-blocking IO that looks just like regular blocking IO, but is, in fact, implemented with asynchronous IO. A fiber issuing an IO request will park, and when the request completes, the fiber will be rescheduled. The Quasar code contains an implementation of such an approach, which uses Java 7’s async NIO channels. I don’t know how well it works because I haven’t tried it yet (the code is there as a draft meant to test the API).
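The general shape of such a wrapper can be sketched with the JDK alone. This is not Quasar’s implementation: BlockingOverAsync is a hypothetical name, and since there are no fibers here, the caller blocks an ordinary thread on a CompletableFuture at the point where a fiber would park. The asynchronous part is real, though: AsynchronousFileChannel and CompletionHandler are the Java 7 async NIO classes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;

/** A blocking-looking read implemented on top of an asynchronous read. */
public class BlockingOverAsync {
    /** Issues an async read, then waits for the completion callback to fire. */
    public static int read(AsynchronousFileChannel ch, ByteBuffer dst, long position) {
        CompletableFuture<Integer> done = new CompletableFuture<>();
        ch.read(dst, position, null, new CompletionHandler<Integer, Void>() {
            @Override public void completed(Integer n, Void attachment) {
                done.complete(n);                 // a fiber would be unparked here
            }
            @Override public void failed(Throwable t, Void attachment) {
                done.completeExceptionally(t);
            }
        });
        return done.join();                       // a fiber would park here instead
    }

    /** Demo: writes text to a temp file and reads it back through the wrapper. */
    public static String roundTrip(String text) {
        try {
            Path tmp = Files.createTempFile("toy-fiber-io", ".txt");
            try {
                byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
                Files.write(tmp, bytes);
                ByteBuffer buf = ByteBuffer.allocate(bytes.length);
                try (AsynchronousFileChannel ch =
                         AsynchronousFileChannel.open(tmp, StandardOpenOption.READ)) {
                    long pos = 0;
                    while (buf.hasRemaining()) {
                        int n = read(ch, buf, pos);
                        if (n < 0) break;         // end of file
                        pos += n;
                    }
                }
                return new String(buf.array(), 0, buf.position(), StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The calling code reads top to bottom like ordinary blocking IO; only the body of read knows that a callback is involved.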
Now what about processing binary data? Erlang has excellent support for binary buffers, and Pulsar’s got that, too, thanks to Zach Tellman’s Gloss library. Here’s an example:
(ns co.paralleluniverse.pulsar-test.examples.binary
  "A binary-data buffer message example"
  (:use co.paralleluniverse.pulsar
        [gloss core io]))

;; This is the layout of our binary buffer:
(def fr (compile-frame {:a :int16, :b :float32}))

(defsusfn receiver []
  (receive [buffer #(decode fr %)]
    {:a 1 :b b} (println "Got buffer (a=1) b: " b)
    {:a a :b b} (println "Got unexpected buffer" buffer "a: " a "b: " b)
    :after 100 (println "timeout!")))

(defn -main []
  (let [r (spawn receiver)
        buffer (encode fr {:a 1 :b 2.3})]
    (! r buffer)
    (join r)))
Shared state
In my previous blog post I wrote that actors alone are insufficient for building concurrent applications. Very often, a shared data-structure is required. Even Erlang has shared data structures in the form of ETS tables.
In that post, I suggested an approach that meshes the application and the database. The database schedules and parallelizes the application’s data-processing logic.
That approach, while yielding terrific performance and scalability, has one drawback. It requires that all data-processing logic be placed in callbacks that are run by the database, but splitting code into lots of callbacks feels unnatural for many programmers, and might be hard to reason about.
Fibers solve that. Like in the case of IO, callbacks can be trivially transformed into fiber-blocking code. The fiber issues a database request and blocks. When the operation completes, the fiber continues, but it is now scheduled by the database, which ensures optimal parallelism.
Next steps
Currently, both Quasar and Pulsar documentation is lacking (huh, it’s virtually nonexistent other than examples). But other than documentation, here are some features on the roadmap (obviously, they’re all subject to change based on recommendations from the developer community):
- Distribution: a natural next step would be distributed actors. Luckily, Galaxy already has all the necessary infrastructure in place. It can also distribute shared data structures.
- I/O: the code for fiber-blocking IO is in place for socket and file IO, but is completely untested.
- Better isolation: we will carefully examine Quasar/Pulsar’s error containment to provide fault isolation to (almost) match Erlang’s.
- Hot code-swapping: this should be trivial for Clojure, and shouldn’t be hard for Java either. After all, we already have an instrumentation agent running, so it’s pretty simple to instruct it to redefine some classes. Haven’t tried it yet, though.
- OTP: Much of Erlang’s software-architecture and fault tolerance power comes from the battle-tested OTP framework. We can build a similar library, which, at least for Clojure, might be even more elegant than OTP.
- Integration with Erlang: talking to Erlang processes from Java is quite easy with JInterface (I tried it), but JInterface is outdated and could be easily made to perform much better. A good start would be Scalang, and I’ve begun porting it to Java to disentangle it from Scala’s type system, but I’ve put it aside for now. Once we have a fast, well-engineered Erlang interfacing library, plugging it into Quasar/Pulsar should be trivial.
The top three or four of these features will probably make it for the 1.0 release scheduled for late June.
Getting started
To get started, follow the steps on the Quasar or Pulsar project pages (for Clojure code, there is no need to visit Quasar).
Please be patient, though, as this is alpha phase software.
[1] A notable attempt that also provides lightweight threads is the Kilim library, along with Erjang, the JVM Erlang implementation that uses Kilim under the hood. Kilim, however, is a very intrusive and complex infrastructure, while Erjang seems to no longer be actively maintained.
[2] This is why I avoided Kilim; it fuses all three into a monolithic package.
[3] Some commenters on HN added Haskell to the list, but I know too little Haskell to form an opinion on the matter.
A New Approach to Databases and Data Processing — Simulating 10Ks of Spaceships on My Laptop
In this post I’d like to discuss a new approach to database-aided data processing, and how we were able to use this approach to simulate tens of thousands of fighting spaceships on my laptop. Obviously, “a new approach to data processing” is a grand claim which will turn out to be little more than a repurposing of a rather old approach, and, naturally, such a broad topic has many ins and outs and specifics that I only have room to hint at and would require deeper discussion; nevertheless, I hope this bird’s-eye view of a serious problem and its suggested solution will be enough to spark your curiosity. But first, here’s what 10,000 spaceships in battle look like (best viewed in high quality):
Could we please have the old Moore’s Law back?
CPUs aren’t getting any faster. They haven’t been getting faster for a few years, and they won’t be getting faster ever again[1].
While Gordon Moore’s 1965 prediction about the rate of growth of CPU transistor density still holds, more or less, to this day, all it does now is give us more cores.
If up until five or six years ago growing requirements demanded that a piece of software handle double the amount of data, or double the number of users, all developers had to do was wait for new hardware. Now the story is different: scaling experts are hired; databases sharded; consistency guarantees relaxed.
Unfortunately, we can’t get the “old Moore’s law” back. If a program has but two instructions, one depending on the result of the other, no number of cores could make executing those two dependent instructions faster, as they must run consecutively. This is, in essence, Amdahl’s law. Heck, even a program consisting of a single instruction would run doubly-fast on a double-speed CPU, but would not be accelerated in the slightest by the addition of cores.
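For reference, Amdahl’s law in its usual form says that if a fraction p of a program’s work can be parallelized, the best possible speedup on n cores is 1 / ((1 - p) + p / n). The tiny helper below (the class and method names are mine) just evaluates that formula.

```java
/** Evaluates Amdahl's law: speedup = 1 / ((1 - p) + p / n). */
public class Amdahl {
    /**
     * @param parallelFraction fraction of the work that can run in parallel (0.0 to 1.0)
     * @param cores            number of cores available (at least 1)
     */
    public static double speedup(double parallelFraction, int cores) {
        double serialFraction = 1.0 - parallelFraction;
        return 1.0 / (serialFraction + parallelFraction / cores);
    }
}
```

With p = 0, as in the two-dependent-instructions example, the speedup is exactly 1 no matter how many cores are added. Even with p = 0.5, the speedup approaches 2 but can never exceed it, however many cores are thrown at the problem.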
This, of course, isn’t news. All the vast amount of talk of scaling in the past few years is a direct consequence of “old Moore’s” decline.
But Amdahl’s law isn’t quite a death sentence to software scaling. After all, software doesn’t need to do the same amount of work faster and faster; it needs to do more work in the same time that less work used to take. More work usually means processing more data, and more data brings with it more opportunities for parallelism, thus canceling out Amdahl. But (and this is a big but, indeed) achieving this comes at a hefty price. Exploiting multiple cores well is really hard. It takes a lot of work. Doing it both correctly and efficiently is damn-near impossible. So what do we do? We shard.
Why sharding just won’t cut it
The simplest way to handle more data using more cores (whether on a single machine or in a cluster) is to partition it into disjoint subsets, and work on each subset in isolation. In the database world this is called sharding, and it makes scaling relatively simple; the only downside is that it makes writing compellingly complex applications hard.
Sharding only works well when certain assumptions hold. In particular, it requires little or no interaction between data elements in separate shards. In fact, we can look at it like this: having N shards is imposing N “non-interaction” constraints on the application (or writing jury rigged code to handle such interactions as corner cases).
But if the new Moore’s law gives us exponential core-number growth, to exploit those cores we need to grow the number of shards exponentially as well, meaning we’re placing an exponential number of constraints on an application that probably only needs to accommodate a linearly increasing number of users, and that’s bad.
We could, of course, use fewer cores and fewer shards, but then data-processing performance within each shard will remain constant (because CPU clock speed has plateaued), and we won’t be able to do cooler stuff with the data.
Sharding also limits other useful abilities like secondary indices: sometimes we’d like to access our data according to different criteria that don’t align neatly with the chosen partitioning.
But, since sharding is relatively simple, that is the approach commonly taken when facing a scaling challenge. The result is usually software that’s severely hindered in its complex, useful abilities, or software that requires an inordinate amount of work to fight an uphill battle against the limitations its architecture imposes on the application, rather than having the architecture magically expand the application’s abilities.
The way to go
If we want to continue writing compellingly complex applications at an ever-increasing scale we must come to terms with the new Moore’s law and build our software on top of solid infrastructure designed specifically for this new reality; sharding just won’t cut it.
First, this means using a programming language built for concurrency. At present, there are two such languages in wide(ish) use: Erlang[2] and Clojure. Other languages might make some concurrent functionality easier than before, but are not designed from the ground up for concurrency (for example, they do not prevent races or enforce correct data visibility)[3].
Second, we need the right concurrent data structures. Every application needs a large data structure (call it a database if you like) to use as its main data-store. This data structure must be concurrent, and it must be mutable to allow for concurrent writes. Erlang and Clojure don’t readily allow for writing such data structures because, for the sake of efficiency, these data structures must internally circumvent the concurrency protections put in place by the language. Erlang provides such a data structure in the form of ETS, and Clojure programs may rely on Java’s concurrent data-structures.
For simplicity’s sake, let us call such a data structure the database, even though it is not necessarily durable (though it must be able to store the application’s main data set).
What the database should do
Like map-reduce frameworks and SQL databases as they’ve been traditionally used, and unlike many NoSQL solutions, we believe that it is the database’s role not only to store and retrieve data on behalf of the application, but also to assist the application in processing it. This is because the database is in a unique position to know which transactions may contend for the same data items, and how to schedule them with respect to one another for the best possible performance. The database can and should be smart. If the database is dumb and sharded, it places a-priori constraints to avoid contention that it could have handled intelligently at runtime without imposing limitations on the application.
As I’ve said, this isn’t new. Traditional RDBMSs have been doing this for a long time by supporting complex SQL queries and stored procedures, and scheduling them just right. They, too, intended for the brains of the application to reside with the database, in its central server, while terminals provided little more than a simple UI.
Only, RDBMSs have had trouble scaling beyond the single server, though not due to a fault in this fundamental approach, as some NewSQL DBs have shown. And if you take this approach and retrofit it to modern day requirements and practices, you might just find that the database can become the application’s scaling engine rather than its bottleneck.
It works like this: you issue a query, but instead of waiting for a response, you hand the database a callback. The database takes care of scheduling the query, and then passes the result to your callback. The callback processes the data, and issues further queries and transactions.
You might think it’s quite like the asynchronous DB APIs popularized recently by Node.js, but it’s not. The main difference is this: your callback runs on a thread managed by the database. This means that if you issue two queries that result in two transactions, and those queries or transactions do not interfere with each other, the database will figure out that it can run your data-processing logic in parallel, on two separate cores.
You may think of it as writing your entire application as stored procedures; the database is not embedded in the application — the application becomes embedded in the database.
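A toy version of this arrangement fits in a few lines of Java. To be clear, this is not SpaceBase or its API; ToyDb and its methods are hypothetical, the “database” is a ConcurrentHashMap, and the “scheduling” is whatever a fixed thread pool and the map’s per-key atomicity give us. It does show the essential inversion: the application hands over a callback, and the database decides where and when to run it.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.IntUnaryOperator;

/** Toy "database" that runs application callbacks on its own threads. */
public class ToyDb {
    private final ConcurrentHashMap<String, Integer> data = new ConcurrentHashMap<>();
    private final ExecutorService dbThreads;

    public ToyDb(int threads) {
        dbThreads = Executors.newFixedThreadPool(threads);
    }

    /**
     * Hands the database a callback that maps the old value (0 if absent) to a new one.
     * Callbacks for the same key are serialized by compute(); callbacks for
     * different keys may run in parallel on different database threads.
     */
    public void update(String key, IntUnaryOperator callback) {
        dbThreads.execute(() ->
            data.compute(key, (k, old) -> callback.applyAsInt(old == null ? 0 : old)));
    }

    public int get(String key) {
        return data.getOrDefault(key, 0);
    }

    /** Stops accepting work and waits for queued callbacks; false on timeout or interrupt. */
    public boolean shutdownAndWait(long millis) {
        dbThreads.shutdown();
        try {
            return dbThreads.awaitTermination(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The application code never takes a lock and never starts a thread, yet a thousand increments to each of two keys come out right and can proceed on separate cores. A real database in this style knows far more about which transactions conflict than a hash map does, which is where the domain-specific data structure comes in.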
Domain specific data-structures are key in achieving performance magic. Too general, and the database won’t be able to exploit fortunate scheduling opportunities. But if you use just the right one, you can sit back and let your database worry about all the gruesome concurrency stuff and parallelize your business logic for you.
Simulating spaceships
So, now we get to the fun part: the part where we simulate lots and lots of spaceships blowing each other up.
To do this, we’ll use SpaceBase, our very own real-time spatial database, and let it parallelize our spaceships’ logic. The Java[4] code is up on github.
The spaceships aren’t particles in a particle system. Every spaceship constantly queries its surroundings to navigate and chase after targets, and interacts with its neighboring ships by shooting at them and propelling them away when it blows up. Each and every spaceship initiates queries and transactions, and all of them do it concurrently. The main loop merely triggers the spaceships’ actions, but they are all asynchronously run on SpaceBase’s threads. Here’s how the whole thing is implemented, more or less, in pseudocode:
main():
    loop
        foreach(s in spaceships)
            s.run()

spaceship.run():
    sbase.query(range_query(RADAR_RADIUS), visitor(results) {
        foreach(s in results)
            apply_rejection_force(s)
        compute_new_position()
        sbase.update(this)
    })
    sbase.query(beam_query(heading, range), visitor(results) {
        t = choose_target(results)
        if hit(t)
            t.hit()
    })

spaceship.hit():
    take_damage()
    if destroyed
        sbase.query_for_update(range_query(BLAST_RADIUS), visitor(results) {
            foreach(s in results)
                s.apply_blast_force()
                sbase.update(s)
        })
Now, this isn’t the fastest way to simulate fighting spaceships. This isn’t even the fastest way of doing it with SpaceBase[5], but our intention here was to simulate dealing with a large number of requests coming through the network, initiating transactions.
Here’s how fast each simulation cycle runs on one of our development machines, a dual-core 2.3 GHz, i5 MacBook Pro:
10,000 ships avg: 190.33ms std: 53.48ms
20,000 ships avg: 435.38ms std: 146.07ms
SpaceBase’s performance will continue to improve, but that is not the important thing. The important thing is scalability. Here’s how fast the thing runs on another of our development machines, a quad-core 2.3GHz, i7 MacBook Pro:
10,000 ships avg: 96.38ms std: 15.56ms
20,000 ships avg: 198.53ms std: 31.42ms
Voila! We’ve got linear scalability[6] in the number of cores, with no sharding or zoning. The ships, namely our domain objects, are free to interact with one another any which way they like. It doesn’t even matter if all the spaceships decide to concentrate in, say, one quadrant of the space. The database will handle that[7].
And what about 50,000 spaceships? A cycle for 50K spaceships takes about 550ms on my i7 MacBook.
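For the curious, the arithmetic behind the scalability claim can be checked in a few lines of Python. The numbers are the cycle times quoted above; the function names are mine, and the 550ms figure is only approximate.

```python
# Average cycle times in milliseconds, copied from the measurements above.
dual_core = {10_000: 190.33, 20_000: 435.38}   # dual-core i5
quad_core = {10_000: 96.38, 20_000: 198.53}    # quad-core i7


def speedup(ships):
    """How many times faster a cycle runs on the quad-core machine."""
    return dual_core[ships] / quad_core[ships]


def micros_per_ship(cycle_ms, ships):
    """Average cost of one ship in one cycle, in microseconds."""
    return cycle_ms * 1000 / ships


if __name__ == "__main__":
    for ships in (10_000, 20_000):
        print(f"{ships} ships: {speedup(ships):.2f}x faster on twice the cores")
    for ships, ms in ((10_000, 96.38), (20_000, 198.53), (50_000, 550)):
        print(f"{ships} ships on the i7: {micros_per_ship(ms, ships):.1f} us per ship")
```

Doubling the cores gives a speedup of roughly 1.97x at 10,000 ships and 2.19x at 20,000, and the per-ship cost on the i7 stays at around 10 microseconds from 10,000 all the way to 50,000 ships.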
If you’d like to play some more with SpaceBase, you can download the Lite version that supports up to 20,000 objects here. If you do, and would like to ask questions about SpaceBase, you can do that here, in our new Google Group.
What’s next
Now, SpaceBase is a real-time spatial database, and it has applications in augmented reality, defense, gaming and location-based services, but we’re going to use the same approach for other kinds of databases, employing different data structures for completely different applications.
Also, a missing piece from our discussion of a scalable architecture is the distribution fabric responsible for scaling beyond the capabilities of a single machine. This component distributes and scales the database, but it must keep the data and the application code co-located (i.e., each machine must host a portion of the data alongside the code responsible for processing it), otherwise networking bottlenecks will kill off all the performance we’ve tried so hard to squeeze[8].
The distribution fabric must scale without resorting to sharding for the reasons outlined above, and it must fit well with the application’s concurrency mechanisms. But we will not discuss this component now as I’ve done this elsewhere and will do it yet again in the future, when we demonstrate how our approach can achieve cluster-wide scaling without weighing down the application with burdensome incidental limitations.
[1] At least not much faster, or not without completely altering processor and software architecture.
[2] Actually, Erlang was not designed for concurrency but for fault-tolerance, but it may as well have been.
[3] Scala and Go belong in this category.
[4] I know, it’s not one of the “concurrency languages”, but this only goes to show that just as you get many advantages by using the right language albeit with the wrong database, you can also enjoy the benefits of the right database even with the wrong language. A Clojure version of Spaceships is planned, though.
[5] That would probably be running a single spatial join to find all pairs of neighboring spaceships in one query, and then running a single transaction to update all ships at once, in parallel. That is, we’d parallelize the simulation, but run it in a synchronous step-wise pattern.
[6] The performance seems even super-linear, but this is probably due to the i7’s better RAM throughput.
[7] Well, we can’t really perform magic. If, for example, all spaceships decide to fire, at once, on a single member of their crowd, contention will become a limiting factor, and Amdahl’s law will kick in.
[8] The only time code/data co-location isn’t required is if storage requirements far exceed processing requirements, in which case far more “storage nodes” are required than “application nodes”, but this scenario implies that most of the data is dead or dormant most of the time, or that the application is a simple CRUD and is therefore not hard to scale in the first place. We are not concerned with such applications here.
Introducing SpaceBase Lite for Location Apps
SpaceBase is a real-time, in-memory spatial data store that began life as military-grade software. It has been used at the core of critical, real-time military systems. While designed for high-end uses, over the past few months we’ve seen interest in SpaceBase from web and mobile location-based services developers wanting to use SpaceBase as a real-time, spatial store/cache. So, we’ve decided to offer a free version of SpaceBase specifically aimed at such applications.
Why, when and how to use SpaceBase Lite
SpaceBase Lite is a very fast spatial data store. It stores the location of objects, allows you to update their location at a very high rate, and to query them - based on their location - very, very quickly. That’s it.
It’s especially suitable for applications that track the current position of moving objects - say cars or people - and require either low latency or a high volume of queries and updates. It does not persist data to disk, so it cannot be used for long-term storage. Nor is it meant to be used for analytics: it’s OLTP, not OLAP. If you do require long-term storage, you can use SpaceBase as a temporary cache. Feed all of your spatial updates into SpaceBase and serve all of your spatial queries from it; occasionally and asynchronously, query the data in SpaceBase and flush it to a slower database.
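The cache-and-flush pattern just described can be sketched in a few lines of Python. None of this is SpaceBase's API: `PositionCache` is a made-up stand-in for the fast in-memory store, and a plain dict plays the role of the slower, durable database.

```python
import threading


class PositionCache:
    """Hypothetical stand-in for a fast in-memory spatial store."""

    def __init__(self):
        self._positions = {}
        self._lock = threading.Lock()

    def update(self, obj_id, x, y):
        with self._lock:
            self._positions[obj_id] = (x, y)

    def snapshot(self):
        """Return a copy of all current positions."""
        with self._lock:
            return dict(self._positions)


def flush(cache, slow_db):
    """Copy the current positions into the slower store; returns how many were written."""
    snap = cache.snapshot()
    slow_db.update(snap)   # slow_db is a dict here; a real one would do a batched write
    return len(snap)


def start_periodic_flush(cache, slow_db, period_s, stop_event):
    """Flush every `period_s` seconds on a background thread until `stop_event` is set."""
    def loop():
        while not stop_event.wait(period_s):
            flush(cache, slow_db)
    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread
```

All updates and queries hit the cache; only the background thread ever talks to the slow database, so its latency never shows up on the request path. The price is that positions written since the last flush are lost if the process dies.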
So, why use SpaceBase? Because it’s fast. Very fast. If you have a lot of spatial updates or queries flowing into your system, SpaceBase might just be what you need.
SpaceBase Lite is limited to 20,000 objects. If you need to track more than that, please contact us at info@paralleluniverse.co.
Features
- APIs for Python, Ruby and Node.js via Thrift.
- An Erlang API.
- Lightning fast insertions, deletions, updates and queries of spatial objects.
- Geo-spatial queries (which objects are within 300 yards of a given location) and joins (which pairs of objects are within 300 yards of one another).
- Automatic time-based expiration of objects (Thrift API only).
- Partial object retrieval (Thrift API only).
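To pin down what a range query and a join mean in the list above, here is a brute-force Python sketch. It is not how SpaceBase works internally, nor its API: the function names are made up, positions are assumed to be flat x/y coordinates in yards, and there is no spatial index, so the join is quadratic in the number of objects.

```python
from itertools import combinations
from math import hypot


def within(points, center, radius):
    """Range query: ids of all objects no farther than `radius` from `center`."""
    cx, cy = center
    return sorted(obj for obj, (x, y) in points.items()
                  if hypot(x - cx, y - cy) <= radius)


def join(points, radius):
    """Spatial join: all pairs of objects no farther than `radius` from one another."""
    pairs = []
    for (a, (ax, ay)), (b, (bx, by)) in combinations(sorted(points.items()), 2):
        if hypot(ax - bx, ay - by) <= radius:
            pairs.append((a, b))
    return pairs


if __name__ == "__main__":
    cars = {"a": (0, 0), "b": (100, 0), "c": (500, 0)}
    print(within(cars, (0, 0), 300))
    print(join(cars, 300))
```

A spatial store earns its keep by answering the same two questions without looking at every object (or every pair) each time.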
Requirements
All SpaceBase Lite requires is an installation of Java 7.
Download
So, download SpaceBase Lite here and give it a try. You can ask any question about it in the new forum/mailing list.
Galaxy’s Networking
Galaxy internals, part III
This is part 3 of a three-part series. Here are part 1 and part 2.
My previous two blog posts detailed Galaxy’s cache-coherence protocol and fault tolerance mechanisms. This last post in the Galaxy internals series will describe the last missing piece: how Galaxy nodes communicate over a network.
While Galaxy’s networking, like all Galaxy components, is pluggable and could employ various networking protocols (e.g., Infiniband), the current release uses TCP and UDP for all networking, and it does so using the excellent Netty library.
Communicating with slaves and the central server
As mentioned in my last post, each Galaxy node can be configured to replicate its owned data-items to slave nodes and/or to a central server for the sake of fault-tolerance. These update-packets may be large (as they can contain a potentially large number of modified items), are sent asynchronously most of the time (the node does not need to wait for the replication to complete), and are sent to a small number of machines (just the node’s slave/s and/or one server). Therefore, all replication communication is done via TCP. This is simple, and nothing more will be said about it here.
Peer-to-peer node communication
Communication among peer nodes has very different characteristics from those of data replication. Remember, messages sent among peer nodes are used to request ownership of items, sharing of items, or invalidation of items. They are relatively infrequent, because a well-behaved Galaxy application keeps almost all work local to the nodes; short, because data items are small and a single message can contain at most one item; latency-sensitive, because most request messages block an operation until they complete; organized into request-reply pairs, so that acknowledging the receipt of each request and reply separately is wasteful; and, finally, sent among a large number of nodes, each of which can send and receive messages to and from all other nodes. These characteristics make UDP a better choice than TCP for peer-to-peer communication among nodes. Using TCP would have entailed either keeping an open connection from all nodes to all other nodes, or, alternatively, paying the high cost of the TCP handshake frequently. TCP would also acknowledge both request and response. Also, some messages need to be broadcast to all nodes, which would require UDP multicast anyhow.
Using UDP, however, has its costs. It does not guarantee delivery, messages may arrive out of order, and it does not allow messages of unlimited length – all of which are required by Galaxy, so they need to be implemented by Galaxy code as they’re not natively provided by the protocol. All of that is done in the UDPComm class.
While conceptually the simplest component of Galaxy, UDP networking code has turned out to be the most complex, and will probably require some refactoring. Also, it is the component that I put the least thought into so far, so I believe it also has the best potential for further performance improvements. Any suggestion for improving the process outlined below would be greatly appreciated.
Message packets
Messages are grouped together into packets. Each node maintains a peer object for each of the other nodes, which keeps the last packet sent to the respective node, and a queue of messages waiting to be sent to it.
Packets are configured to have a maximum size, which should optimally be slightly less than the maximum Ethernet packet size, with some room left for IP and UDP headers (this is why jumbo frames are recommended when using Galaxy). A packet can contain many messages, but a single message cannot be broken up into more than one packet, and this is why all Galaxy items must be smaller than the maximum packet size (and, besides, small items are good in general as they would probably be less contended). If the packet is full, remaining messages are left to wait in the queue.
Galaxy can be configured to wait a little while (usually a few microseconds) before transmitting a packet, so that more messages can be packed into it; the wait is kept short so that latency does not grow.
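The packing rule can be sketched in Python as follows. This is a model of the rule as described above, not the code in UDPComm; the function name and the 1400-byte limit are assumptions picked for the example (with jumbo frames the limit would be much larger).

```python
from collections import deque

# Assumed value: a little under a 1500-byte Ethernet frame, leaving room for IP and UDP headers.
MAX_PACKET_BYTES = 1400


def fill_packet(queue, max_bytes=MAX_PACKET_BYTES):
    """Move whole messages from the queue into one packet, in order, while they fit.

    Messages are never split; whatever does not fit stays in the queue
    for a later packet.
    """
    packet, used = [], 0
    while queue and used + len(queue[0]) <= max_bytes:
        message = queue.popleft()
        packet.append(message)
        used += len(message)
    if queue and not packet:
        # A single message bigger than a packet can never be sent.
        raise ValueError("message larger than the maximum packet size")
    return packet
```

Because the queue is drained strictly from the front, a large message at the head is never overtaken by a smaller one behind it, which keeps the sending order intact.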
Guaranteeing delivery
Galaxy’s internal message-delivery API specifically denotes each message as being either a request or a response (regardless of the message type). Requests and responses are handled a little differently when it comes to guaranteeing delivery.
If a packet contains any requests, it will be sent over and over until responses are received; this ensures requests are delivered. Once a response is received, its matching request is removed from the packet and not sent again.
A response is re-sent whenever the request is received, so if the request is received twice (perhaps because the response has not been received by the requesting node), the same response will be sent twice. This ensures responses are always delivered.
A simple mechanism is used to ensure that any message, be it a request or a response, is delivered to Galaxy’s logic only once.
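Here is a small Python model of these three rules. It is a sketch, not Galaxy's code: the class names are made up, and the once-only mechanism shown (remembering the response for every request id already handled, and never forgetting it) is my assumption, since the actual mechanism is not described here.

```python
class Sender:
    """Keeps unanswered requests in its packet and re-sends it until all are answered."""

    def __init__(self):
        self.pending = {}   # request id -> payload

    def request(self, req_id, payload):
        self.pending[req_id] = payload

    def packet(self):
        return sorted(self.pending.items())

    def on_response(self, req_id):
        # An answered request leaves the packet and is not sent again.
        self.pending.pop(req_id, None)


class Receiver:
    """Answers every copy of a request, but hands each request to the logic only once."""

    def __init__(self, handler):
        self.handler = handler
        self.responses = {}   # request id -> response already produced (assumed mechanism)

    def on_packet(self, packet):
        replies = []
        for req_id, payload in packet:
            if req_id not in self.responses:
                self.responses[req_id] = self.handler(payload)
            replies.append((req_id, self.responses[req_id]))
        return replies


def exchange(sender, receiver, lost):
    """One re-send round; `lost()` tells whether the next datagram is dropped."""
    if lost():
        return   # the request packet never arrived
    replies = receiver.on_packet(sender.packet())
    if lost():
        return   # the response packet never arrived
    for req_id, _ in replies:
        sender.on_response(req_id)
```

Calling `exchange` in a loop while `sender.pending` is non-empty gets every request answered as long as some datagrams eventually get through, and the handler still sees each request exactly once however many times the packet was re-sent.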
Guaranteeing ordering
All messages sent from node A to node B must be delivered to B in the same order they’ve been sent by A in order to ensure correct ordering of memory operations, which is required for consistency.
All messages in a packet are processed by the receiving node in the order they’re placed in the packet. But what if a sender wants to send a message after a packet has been sent but may not have yet been received? If we were to create a new packet, ordering may be violated because UDP does not guarantee packets are delivered in the order they’re sent. If, on the other hand, we were to wait until all messages in a packet are received before sending a new one, this will unnecessarily increase latency.
Example: putting it all together
The following example will hopefully demonstrate how delivery and ordering guarantees work together (requests are marked Q and responses are marked S, Q-A2 means request #2 originating in node A, while S-B3 means a response to request Q-B3):

Multicasting
Some requests (specifically GET or GETX the first time an item is requested) are multicast to the entire cluster. These requests need to be ordered with all other messages sent to all the nodes. This is achieved as follows. A special message that cannot be added to the regular unicast packet is added to all message queues. Once all the messages in the packet have been acknowledged (all responses have been received), the special message is taken from the queue, and the peer object responsible for communicating with that specific node is put in a special, waiting state. Once all peer objects are waiting, the multicast packet is sent, and then re-sent, until all peer objects receive a response. Once that happens, they are all released, and can continue retrieving unicast messages from their message queues.
In order not to hold up all communication if one or a few nodes fail to respond to the multicast (due to some failure, full buffers etc.), if the number of peer objects still waiting for a response falls below a configurable threshold, those peer objects are told to keep re-sending the message, but as a simple unicast message, and then all peer objects are immediately released to continue regular unicast operation.
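The decision taken after each transmission of the multicast packet can be written down as a small Python function. This is only a model of the rule described above; the function name, the action labels and the way the threshold is passed in are all made up for the example.

```python
def after_multicast(peers, responded, threshold):
    """Decide what to do after one (re)transmission of a multicast packet.

    peers:      ids of all peer objects waiting on the multicast
    responded:  ids of the peers that have answered so far
    threshold:  assumed configuration value; once fewer peers than this
                are still waiting, the stragglers are handled by unicast
    Returns (action, stragglers).
    """
    waiting = sorted(set(peers) - set(responded))
    if not waiting:
        return "release_all", []
    if len(waiting) < threshold:
        # Stragglers keep re-sending the message as unicast; everyone is released.
        return "release_all_and_unicast", waiting
    return "resend_multicast", waiting
```

With five peers and a threshold of 2, the multicast keeps being re-sent until at most one peer is silent; that peer then gets the message by unicast while the rest of the cluster moves on.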
Conclusion
This concludes the 3-part series on Galaxy’s internals. Future blog posts will discuss distributed data-structures implemented on top of Galaxy and their efficiency.
How Galaxy Handles Failures
Galaxy internals, part II
This is part 2 of a three-part series. Here are part 1 and part 3.
Last time I described how Galaxy, our open-source in-memory data grid, employs a cache-coherence protocol, similar to the ones used to coordinate CPU L1 caches, in order to enforce data consistency in the cluster. One big difference between common hardware protocols and the one used by Galaxy was that Galaxy tries to minimize broadcasting messages by keeping track of all sharers of every item, and by remembering an item’s most recent owner. This is necessary because the number of nodes in the cluster, unlike in most CPUs, can be quite large, and pestering all of them with broadcast messages can seriously hurt performance. Actually, some many-core hardware systems employ a similar technique (called the directory-based approach), only they have to deal with the problem of maintaining the directory in the memory-constrained environment of the L1 cache, while Galaxy doesn’t have this problem.
However, the biggest – and most interesting – difference between the hardware implementations and Galaxy is fault tolerance; hardware doesn’t have any. If a core or the CPU bus fails – the CPU fails, and that’s that. But in a cluster, the story is different. Galaxy, like any robust distributed system, must handle network and node failures. How it does that is the subject of this post.
Timeouts
As explained last time, most Galaxy messages are grouped into request/response pairs. Requests are sent as a result of some API call, and if a response is not received within some known, configurable, duration, the call throws a timeout exception. Timeouts can be thrown by all Galaxy operations, and must be handled by the application (usually, simply by retrying the operation, possibly after some delay). Timeouts do not necessarily imply a software or hardware failure. Most commonly they are the result of a deadlock, which could happen when two or more nodes pin more than one item at the same time. In that case, a timeout will cause a transaction on one node to back off while a conflicting one retries.
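Application-side handling of timeouts can be as simple as the following Python sketch. `GalaxyTimeout` and `with_retries` are made-up names (Galaxy itself is a Java library and this is not its API); the growing delay is one reasonable choice, not something Galaxy prescribes.

```python
import time


class GalaxyTimeout(Exception):
    """Hypothetical stand-in for the timeout exception thrown by a Galaxy operation."""


def with_retries(operation, attempts=5, delay_s=0.01, sleep=time.sleep):
    """Run `operation`, retrying on timeout with a delay that grows on each attempt."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except GalaxyTimeout:
            if attempt == attempts:
                raise                      # give up and let the caller decide
            sleep(delay_s * attempt)       # back off a little longer each time
```

In a deadlock between two nodes it helps if they do not retry in lockstep, so a real application would probably add some random jitter to the delay.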
A common problem in distributed systems with timeouts is that there is no way of knowing whether the remote operation failed, or perhaps succeeded but the response has been delayed. Galaxy does not have this problem due to the nature of its remote operations: none of them are destructive. In fact, none of them have any effect on the data at all. Remember, all writes (data modifications) requested by a node are carried out locally on the node once it has received ownership of the relevant items. All remote requests do is transfer item ownership or invalidate shared copies. Actually, invalidation requests never time-out; they are retried indefinitely because they can never be involved in a deadlock, and the only reason they could fail is as a result of a node failure, which will eventually be detected and handled as explained later. So the only interesting request which might time-out is GETX (a GET can time-out as well, but that scenario is not that interesting).
If node A wants ownership of an item currently owned by node B, it will send B a GETX message. B will then mark the item as owned by A and respond with a PUTX. What happens if the response is delayed for any reason? Node A’s operation will eventually time-out, and the node will continue to believe that node B is the item’s owner. Node B, however, will think A now owns the item, so it will respond to any additional requests for the item from, say, node C, with a CHNGD_OWNR message. Assuming that message is received by C, it will then try to retrieve the item from A, which will, in turn, respond with yet another CHNGD_OWNR message claiming that B is the owner. If C’s operation does not time-out it will bounce from node B to A and vice-versa, sending its requests to one and then to the other, but eventually (unless a node failure occurs), the PUTX message will arrive at node A (node B will keep re-trying), which will then assume ownership and respond correctly to requests.
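The bouncing described above is easy to reproduce with a toy model. The Python sketch below keeps only each node's belief about who owns an item and how it answers a GETX; the class and method names are mine, and everything else in the real protocol (sharers, retries, the network) is left out.

```python
class Node:
    """Toy node that only tracks who it believes owns each item."""

    def __init__(self, name):
        self.name = name
        self.owner_of = {}   # item -> name of the node believed to own it

    def on_getx(self, item, requester):
        """Hand over ownership if we own the item; otherwise redirect the requester."""
        believed = self.owner_of[item]
        if believed == self.name:
            self.owner_of[item] = requester
            return ("PUTX", item)
        return ("CHNGD_OWNR", believed)

    def on_putx(self, item):
        """The PUTX finally arrived: we are now the owner."""
        self.owner_of[item] = self.name


if __name__ == "__main__":
    a, b, c = Node("A"), Node("B"), Node("C")
    for node in (a, b, c):
        node.owner_of["X"] = "B"          # everyone starts out knowing B owns X
    delayed_putx = b.on_getx("X", "A")    # B hands X to A, but the PUTX is delayed
    print(b.on_getx("X", "C"))            # B sends C to A
    print(a.on_getx("X", "C"))            # A, still unaware, sends C back to B
    a.on_putx("X")                        # the delayed PUTX arrives at A
    print(a.on_getx("X", "C"))            # now A answers correctly
```

At no point do two nodes believe they own X at the same time, which is why the delay costs C some round trips but never consistency.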
Node failures
Galaxy employs third-party software for cluster membership management, shared configuration and failure detection. In particular, it uses either JGroups or Apache ZooKeeper for the job. JGroups and ZooKeeper detect node liveness or failure by means of heartbeat messages recorded at some central location, so they provide a definitive (or, at least, consistent) registry of live or dead nodes.
When they detect a failing node, it is removed from the cluster. It is Galaxy’s role, however, to ensure that the data stored on the dead node is not lost. This is achieved by redundancy. All data items owned by a node (and therefore written only by that node) are replicated to slave nodes (each replicating a single master node) and/or to a central server that persists all data in the cluster to secondary storage (disk). Once a node fails, its owned data is served by its slave or by the server. JGroups/ZooKeeper are also used to inform all nodes of the current slave/master status of all other nodes, so that if a slave becomes a master upon its master’s failure, all nodes know which node is the new master.
We must watch out for one thing. As we’ll see shortly, the data served by the server or the slaves following a node failure, while always consistent, may not be as up-to-date as the latest item versions on the failed node. Because of this, and because failing to do so would wreak havoc on item ownership, we must ensure that a node does not respond to any requests if other nodes believe it is dead (messages passed between nodes completely bypass JGroups/ZooKeeper). There is no efficient, 100% reliable way to guarantee that, but we can ensure with high probability that this does not happen if we set the node’s connection timeout to ZooKeeper/JGroups to be smaller than the timeout required for them to detect a node’s failure. This way a node will disconnect from ZooKeeper/JGroups before it is known to be dead, and in that case it will shut itself down.
Data replication
Before I delve into the particulars of slave nodes and the central server, I’d like to describe the general operation of data replication. After each write operation, an item’s latest version is kept in a backup packet, which is periodically sent to the node’s slaves and/or to the central server. This asynchronous replication enables a very high write rate, as writes do not have to wait for backups to complete. However, because writes complete before replication is acknowledged, and, in fact, not every item version is replicated at all – remember, backups are only done periodically – some durability is sacrificed for the sake of reducing latency. A node failure could mean losing, say, all updates made in the last 100ms – if that is the backup period we’ve configured.
But whether or not durability is maintained, we must preserve consistency at all costs, and asynchronous, periodic replication can jeopardize that. If node A has just finished writing version 100 of item X, which is then shared (via a GET message answered by a PUT) by node B, and then node A fails having only backed up version, say, 95 to its slave, future requests for the item (from the slave-turned-master) will yield version 95, but version 100 has already been read, and possibly used by node B to produce some other item’s value, and consistency is irreparably broken.
So, just as we do when waiting for INVACKs, all local operations on the item are allowed to proceed regardless of replication, but once the item is requested by another node, we flush the backup packet and wait until it is acknowledged before responding to the request. (As long as an item’s latest version has not been replicated, it is flagged as modified. The modified flag can be set while the item is either in E state, or even in the O state if a write has been carried out before all sharers have INVACKed, and this is why Galaxy’s cache-coherence protocol does not have a separate M (modified) state as those used by CPU L1 caches.) Even if a node suddenly dies, then, though its last few writes may be lost, data consistency is always maintained.
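The rule is compact enough to sketch in Python. This models the rule only, not the BackupImpl or Cache classes: `OwnerNode` and its methods are made-up names, a dict stands in for the slave or server, and the flush is treated as acknowledged the moment it returns.

```python
class OwnerNode:
    """Toy owner node: writes are local; replication is forced only when someone else looks."""

    def __init__(self, replica):
        self.items = {}           # item -> (version, value)
        self.backup_packet = {}   # latest not-yet-replicated version of each modified item
        self.replica = replica    # dict standing in for a slave or the server

    def write(self, item, value):
        version = self.items.get(item, (0, None))[0] + 1
        self.items[item] = (version, value)
        # Only the latest version is kept for backup; this is the "modified" flag.
        self.backup_packet[item] = (version, value)

    def flush(self):
        """Send the backup packet (in this sketch it is acknowledged immediately)."""
        self.replica.update(self.backup_packet)
        self.backup_packet.clear()

    def on_remote_get(self, item):
        """Another node asks for the item: replicate first, answer second."""
        if item in self.backup_packet:
            self.flush()
        return self.items[item]
```

Five local writes in a row cost no replication at all; the first remote request for the item pays for one flush, after which the replica is guaranteed to hold the very version the requester has seen.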
This general principle is behind several Galaxy optimizations: we allow write operations some slack until an external observer is involved. Because a well-behaved Galaxy application requires inter-node data transfers relatively rarely compared to intra-node operations, such optimizations should result in significant latency gains.
The BackupImpl class assists the Cache class with this replication logic.
Slave nodes
Each node may have zero or more slave nodes replicating its owned items. Only the owned items are replicated – not the shared ones, so when a node dies, the slave selected to replace it as master will not have any shared items. Therefore, when the other nodes detect its death, they will remove it from all of their relevant items’ sharers list. It does not matter if they perform this bookkeeping late, as all INV requests sent to the new master will automatically result in an INVACK response if the shared item is not found (this is Galaxy’s general behavior). Also, because slaves are not informed of items’ sharers, when a slave assumes the master’s role, it assigns all items the E state, so when node death is detected, all of its items’ sharers will invalidate their copies.

The logic outlined so far suffices if a node may have only one slave. If it has more than one, we must watch out for another possible scenario. Say node A has two slaves A1 and A2, and say that the latest backup packet sent from A to its slaves contains, perhaps among other items, version 100 of item X. A1 receives the packet, but then node A fails before A2 receives it. If A1 replaces A as the master, it has no way of knowing that A2 has an older version of X, and if A1 then fails without ever writing X and replicating it to A2, A2 would become the master with an old and – much more importantly – inconsistent value for X (as version 100 may have been used to compute other values). If, on the other hand, A2 becomes the master, serves, say, version 95 of X, and then fails, the new master, A1, would now have an inconsistent value for X yet again.

One way of solving this is putting in place a consensus protocol to ensure all slaves agree on all values, but that would be very inefficient. A simpler solution is to perform leader-election among all of A’s slaves when it fails, with each slave advertising the latest packet it has received, electing the most up-to-date slave as the new master, and then pushing the missing updates to the other slaves.
This solution is not yet implemented in Galaxy, and that is why the current version allows a single slave per node (though you can manually start a new slave once the old one assumes the role of the master).
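Since the election is not implemented, the following Python sketch is purely illustrative. It assumes that backup packets carry increasing sequence numbers and that every slave can report the last one it received; neither detail is stated above, and the function name is made up.

```python
def elect_master(latest_packet):
    """Pick the most up-to-date slave and work out what the others are missing.

    latest_packet: slave name -> sequence number of the last backup packet it received
                   (sequence numbers are an assumption of this sketch)
    Returns (new master, {slave: [packet numbers it still needs]}).
    """
    candidates = sorted(latest_packet)   # sorted so that ties resolve deterministically
    master = max(candidates, key=latest_packet.get)
    newest = latest_packet[master]
    catch_up = {
        slave: list(range(seen + 1, newest + 1))
        for slave, seen in latest_packet.items()
        if slave != master and seen < newest
    }
    return master, catch_up


if __name__ == "__main__":
    # A1 received the last packet from A before A failed; A2 did not.
    print(elect_master({"A1": 42, "A2": 41}))
```

In the scenario above A1 would be elected and would push the missing packet to A2 before serving any item, so no slave is ever promoted with a version older than one that may already have been observed.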
The server
Optionally, and regardless of whether or not slaves are used for availability, it is possible (and recommended) to configure the Galaxy grid to use a special node called, simply, the server. The server is to the peer nodes (as the regular nodes are called) not unlike what main memory (RAM) is to the L1 caches – it holds all the data items in the cluster in a large, though relatively slow, disk storage (a Galaxy server can currently be configured to use either BerkeleyDB or any SQL database as its disk-persistence engine). When a node fails, if it has no slaves, the server can then serve its owned items. In addition, the server’s persistent data-storage can be a desired feature in and of itself.
Just like with the slaves, when a server is present, all nodes periodically send it backup packets, and the entire replication logic remains the same. However, unlike a slave node, the server retains all items from all the nodes, so it must know, at any given time, which node owns what items, so that it will only serve relevant items when a node fails (remember, the server, or a slave, serving items when the owner node is alive can destroy consistency, because replicated data can be somewhat older than live data, so serving both could be catastrophic).
Simply remembering which node sent the backup packet containing the latest item version is not enough because an item can change hands without any new versions being created. So whenever a node gains ownership of an item (when it receives the PUTX message), it informs the server of the transfer by sending it an INV message, but unlike INV messages sent to the item’s sharers, it waits for the server’s response before allowing writes for reasons that will become clear shortly.

If a node then dies and has no slave, the server and all peers mark all of its owned items as owned by the server. If a node cannot locate an item in any of the peers, i.e. no node responds with a PUT to its GET multicast, it will request it from the server.
Another tricky scenario we have to contend with is the following: assume node A owns item X, and it transfers it to node B. Node B then immediately sends an INV message to the server to inform it that X is now owned by B, but before the message is received by the server, node A dies and the server marks its items as owned by the server. Then node C can come along and ask the server for ownership of item X, and the server will happily comply, resulting in both B and C believing they’re each X’s lawful owner. Galaxy handles this case by having the server reply with an INV of its own to B’s INV request, rather than with an INVACK, and this is why B must wait for the server’s reply before writing X, to ensure that it is truly the sole owner.

Because the server keeps track of every item’s current ownership, it can serve another role other than a backup mechanism: some cloud environments (most notably Amazon EC2 and Rackspace) do not allow multicasts, so Galaxy can be configured to ask the server for the identity of an item’s owner when it is initially requested, rather than multicasting the request to all peer nodes.
The server’s entire logic is implemented by the MainMemory class.
Special thanks to Henry Robinson of Cloudera for going over Galaxy’s fault-tolerance design, pointing out flaws and suggesting solutions.
How Galaxy Maintains Data Consistency in the Grid
Galaxy internals, part I
This is part 1 of a three-part series. Here are part 2 and part 3.
In my previous blog post I explained Galaxy’s basic design, the rationale behind it, and the use-cases where it may be appropriate. In this post, I’d like to explain in detail exactly how Galaxy works, aside from one topic – fault tolerance – which will be explored in a future post.
As mentioned last time, Galaxy employs a software implementation of a cache-coherence protocol, similar to the one used by CPUs to coordinate L1 caches. The purpose of the protocol is to allow any cluster node to read and write any data item while maintaining coherence.
Coherence (also called consistency in the DB world) basically means that any code accessing data items on any of Galaxy’s nodes (the Galaxy grid runs application code on the same nodes holding the data) perceives the entire memory – the collection of all data items – as being the same at any single point in time, meaning that no two nodes can observe the value of item X as being different in a single instant. Consistency is a desired property of a distributed system, because it makes it much easier for distributed code to function when item X has a well defined value at a single point in time, rather than a possible set of values depending on where in the cluster the code runs.
However, it becomes harder to define what “a single point in time” means when information takes time to travel. In any distributed computer system, it takes a non-negligible amount of time for any information to travel between two nodes, so any change node A makes to item X can only be observed by node B (or any other computer communicating with the cluster) some time later, depending on network latency.
So we’re going to define consistency like this: if the value of item Y changes after that of item X, no code will ever observe X’s old value after observing Y’s new value; i.e. everyone sees all memory update operations occurring in the same order. We could relax this requirement to maintain correct ordering only between updates that we know to be related – i.e. if we know that the value of Y depends somehow on that of X – and not otherwise, but Galaxy does not do that: it maintains complete ordering of all updates (CPUs require costly memory fence operations to enforce memory ordering when writing and reading the same memory address on different threads, but that is because the messages comprising the cache coherence protocol themselves may be received out of order, but Galaxy enforces strict message ordering at no significant cost, so no fences are required).
The cache-coherence protocol
Now let’s delve into the cache-coherence protocol itself. We will do that in two iterations. First, I’ll explain the concepts and approximate workings of Galaxy’s cache-coherence protocol in a way that will, hopefully, make it easy to understand how consistency is maintained. Then, I’ll explain how Galaxy uses a few tricks to maintain the same invariants in a way that reduces latency. All of the logic explained below is implemented in the class co.paralleluniverse.galaxy.core.Cache.
An instance of a data item residing in a node is called a cache-line in the Galaxy code, after the term used in CPU caches. However, it is neither a line, in the sense that, unlike in CPU caches, it does not have a fixed length, nor strictly part of a cache, because all items reside on at least one node, unlike in CPUs, where only a small portion of memory can be found in any of the caches. Still, Galaxy follows the CPU cache nomenclature because the protocol is basically the same (see this pdf for an overview of CPU cache-coherence protocols).
At any given time, each line can be in one of several states: exclusive (E), owned (O), shared (S) and invalid (I). (For those of you familiar with cache-coherence protocols, the modified (M) state is conspicuously missing. It actually does exist in Galaxy, but it will be discussed in the next post, which will deal with fault-tolerance.)
An item is always owned by exactly one node, and may be shared by many. A line in the O state also maintains a list of all nodes sharing the item (where the line will be in the S state), and a line in the S state holds the identity of the owning node. If an item has no sharers, then its owner is exclusive, and its line will be in the E state. An invalid line is similar to a non-existent line, except that it keeps the identity of the last known owner.
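The bookkeeping each state carries can be sketched as a small Java class. This is only an illustration of the description above: LineSketch, its fields and isWellFormed are hypothetical names, not the ones used in co.paralleluniverse.galaxy.core.Cache.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of a cache line; names are illustrative, not Galaxy's. */
class LineSketch {
    enum State { E, O, S, I } // exclusive, owned, shared, invalid

    static final int NO_NODE = -1;

    final long itemId;
    State state = State.I;                        // a freshly created line starts out invalid
    int owner = NO_NODE;                          // S: current owner; I: last known owner
    final Set<Integer> sharers = new HashSet<>(); // only used by the owner, in the O state
    byte[] data;

    LineSketch(long itemId) { this.itemId = itemId; }

    /** Checks the per-state bookkeeping described in the text. */
    boolean isWellFormed() {
        switch (state) {
            case E:  return sharers.isEmpty() && data != null;   // sole holder of the item
            case O:  return !sharers.isEmpty() && data != null;  // owner with at least one sharer
            case S:  return owner != NO_NODE && sharers.isEmpty() && data != null;
            default: return sharers.isEmpty();                   // I: may still remember an owner
        }
    }
}
```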
Nodes exchange messages that change lines’ states. Messages are usually grouped into request/reply pairs, where the request is usually sent when application code running on a node initiates any Galaxy operation.
GET/PUT
When the application performs the get operation to read an item, and the cache-line for the item is not found on the node running the code, a blank line is created in the I state and a GET message with the item id is multicast to the entire cluster. The node owning the item then replies with a PUT message containing the item’s data, and adds the requesting node to the line’s sharers list. If the owner’s line was in the E state, its state is set to O. When the requesting node receives the PUT message, it writes the item’s data into the line, sets the line’s owner, and sets its state to S; then the get operation terminates.
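The two halves of this exchange can be sketched in Java roughly as follows. This is a simplified model under my own assumptions: the GetPutSketch class, the Line and Put types and the use of a String for the item's data are all hypothetical, and networking is left out entirely.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of the GET/PUT exchange; not Galaxy's actual classes. */
class GetPutSketch {
    enum State { E, O, S, I }

    static class Line {
        State state = State.I;
        int owner = -1;
        final Set<Integer> sharers = new HashSet<>();
        String data;
    }

    /** A PUT reply: the item's data plus the identity of the owner that sent it. */
    static class Put {
        final int from;
        final String data;
        Put(int from, String data) { this.from = from; this.data = data; }
    }

    /** Owner side: register the requester as a sharer and build the PUT reply. */
    static Put onGet(int self, Line line, int requester) {
        line.sharers.add(requester);
        if (line.state == State.E) line.state = State.O; // no longer exclusive
        return new Put(self, line.data);
    }

    /** Requester side: fill in the blank (I) line that the get operation created. */
    static void onPut(Line line, Put put) {
        line.data = put.data;
        line.owner = put.from;
        line.state = State.S;
    }
}
```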

GETX/PUTX
Now let’s assume that an item is shared by a node (the line in the S state) when the application code on that node wishes to update it (by calling set or getx). Only the owner of the item is allowed to modify it, so first the node has to become the item’s owner. To do that, it sends the current owner (a line in the S state knows who its owner is) a GETX message. The current owner then responds with a PUTX message that contains the current sharers list, and puts its own line in the I state, while keeping track of who the new owner is. The requesting node then puts its line in the O state.
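Here is a rough Java sketch of the ownership transfer. The names (GetxPutxSketch, Line, Putx) are hypothetical, and two details are assumptions of mine rather than statements about Galaxy: that the PUTX also carries the item's data, and that the requester is dropped from the sharers list it receives.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of the GETX/PUTX ownership transfer; not Galaxy's actual classes. */
class GetxPutxSketch {
    enum State { E, O, S, I }

    static class Line {
        State state = State.I;
        int owner = -1;
        final Set<Integer> sharers = new HashSet<>();
        String data;
    }

    /** A PUTX reply: the sharers list (and, by assumption, the data) from the old owner. */
    static class Putx {
        final int from;
        final String data;
        final Set<Integer> sharers;
        Putx(int from, String data, Set<Integer> sharers) {
            this.from = from; this.data = data; this.sharers = sharers;
        }
    }

    /** Old owner: hand the item over, then invalidate the local line but remember the new owner. */
    static Putx onGetx(int self, Line line, int requester) {
        Set<Integer> others = new HashSet<>(line.sharers);
        others.remove(requester); // assumption: the new owner does not list itself as a sharer
        Putx reply = new Putx(self, line.data, others);
        line.sharers.clear();
        line.state = State.I;
        line.owner = requester;
        return reply;
    }

    /** New owner: take over the data and sharers list; the line is now in the O state. */
    static void onPutx(int self, Line line, Putx putx) {
        line.data = putx.data;
        line.sharers.clear();
        line.sharers.addAll(putx.sharers);
        line.owner = self;
        line.state = State.O;
    }
}
```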

INV/INVACK
Before actually writing the item’s new value, the new owner must invalidate all sharers of the item. To see why this is necessary in order to preserve consistency, consider the case when node A is the owner of items X and Y, and node B is a sharer of X. Node A modifies X and later modifies Y. Node B reads X and gets its old value because it still has line X in the S state. Then, to read Y (which does not exist in its local store), it sends a GET message to A which, in turn, replies with a PUT, and node B sees Y’s new value, thus violating consistency.
To prevent that, before modifying a line in the owned state, the owner must invalidate all shared copies. It does that by sending INV messages to all sharers, who then change the line’s state from S to I and respond with an INVACK. Once all INVACKs have been received (and all the nodes removed from the sharers list), the line’s state is changed from O to E (exclusive), and the new value can be written.
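In code, the invalidation round might look like the following Java sketch. As before, the class and method names (InvalidationSketch, onInv, onInvAck, tryWrite) are hypothetical, and message delivery is reduced to direct method calls.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of the INV/INVACK round; not Galaxy's actual classes. */
class InvalidationSketch {
    enum State { E, O, S, I }

    static class Line {
        State state = State.I;
        int owner = -1;
        final Set<Integer> sharers = new HashSet<>();
        String data;
    }

    /** Sharer side: S -> I, remembering the node that invalidated us as the owner. */
    static void onInv(Line line, int from) {
        line.state = State.I;
        line.owner = from;
        // the sharer would now send an INVACK back to 'from'
    }

    /** Owner side: drop the acknowledging sharer; O -> E once nobody is left. */
    static void onInvAck(Line line, int from) {
        line.sharers.remove(from);
        if (line.state == State.O && line.sharers.isEmpty()) line.state = State.E;
    }

    /** In this first version of the protocol a write is only allowed in the E state. */
    static boolean tryWrite(Line line, String value) {
        if (line.state != State.E) return false;
        line.data = value;
        return true;
    }
}
```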

CHNGD_OWNR
When a node has no idea who owns an item it is interested in, it multicasts a GET or a GETX message to the entire cluster (optionally, it can query a central server that will be discussed in the next post). This can happen if the node has never accessed the item before, and so its cache line does not exist in the local store, but this event should be rare. It is highly likely that an application suited to run well on Galaxy will access the same items over and over. In the common case, a node will already have an idea who owns an item. If a line is in the S state, it contains the identity of the current owner. If a line is invalidated, it keeps track of the owner by noting the node that invalidated it. However, in both cases, it is possible that the owner information is out of date. Even if the line is in the S state, it’s possible that ownership of the item has moved to a new node, but the INV from the new owner hasn’t arrived yet. Or, maybe the line is in the I state after being INVed by the new owner, and then ownership changes again, and the node is no longer aware of the change.
In such cases, the node, when sending a GET or a GETX, will contact the last known owner, and if that node is no longer the owner, it will respond with a CHNGD_OWNR message, containing the identity of the current owner, if it knows who it is. In a well-behaved Galaxy application, ownership will not jump around much, so getting a CHNGD_OWNR and trying again without bothering all the nodes with a multicast is a good approach.
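The retry logic can be sketched in Java as a loop that follows owner hints. Everything here is hypothetical: the OwnerLookupSketch class, the ownerHints map standing in for what each node would answer, and the maxTries cut-off, which is my own assumption and not something the protocol description specifies.

```java
import java.util.Map;

/** Hypothetical sketch of following CHNGD_OWNR hints before falling back to multicast. */
class OwnerLookupSketch {
    static final int UNKNOWN = -1;

    /**
     * ownerHints maps each node to the node it believes owns the item
     * (itself if it is the owner, UNKNOWN or absent if it has no idea).
     * Returns the owner found by unicast requests, or UNKNOWN if the caller
     * should give up and multicast the request to the whole cluster.
     */
    static int locate(Map<Integer, Integer> ownerHints, int lastKnownOwner, int maxTries) {
        int target = lastKnownOwner;
        for (int i = 0; i < maxTries && target != UNKNOWN; i++) {
            int hint = ownerHints.getOrDefault(target, UNKNOWN);
            if (hint == target) return target; // target is the owner: it would reply with PUT/PUTX
            target = hint;                     // CHNGD_OWNR: retry with the owner it named
        }
        return UNKNOWN;
    }
}
```

If ownership has moved from node 1 to node 2 and then to node 3, a node that still remembers node 1 reaches the owner in three unicast requests without disturbing the rest of the cluster.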

Transactions and pinning
When the application wishes to update several items at once or “snapshot” a few items without letting any other node “steal” them while they’re being processed, the items can be pinned to the requesting node. Because we assume that such transactions will be short lived and do not want to increase the chattiness of the protocol, when an item is pinned the node simply holds on to any INV or GETX messages it receives without acting on them. Once the items are released, all pending requests are performed and the responses are sent.
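A minimal Java sketch of this deferral, with hypothetical names (PinnedLineSketch, onMessage, release) and messages reduced to plain strings:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical sketch of a pinned line deferring INV/GETX messages; not Galaxy's actual classes. */
class PinnedLineSketch {
    private boolean pinned;
    private final Queue<String> pending = new ArrayDeque<>(); // messages held while pinned
    private final List<String> handled = new ArrayList<>();   // messages acted upon, in order

    void pin() { pinned = true; }

    /** An incoming INV or GETX is either acted on immediately or held back. */
    void onMessage(String msg) {
        if (pinned) pending.add(msg);
        else handled.add(msg);
    }

    /** Releasing the item performs all pending requests in arrival order. */
    void release() {
        pinned = false;
        while (!pending.isEmpty()) handled.add(pending.poll());
    }

    List<String> handled() { return handled; }
}
```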
Reducing write latency
We’ve now finished going through the first draft of Galaxy’s implementation, and have shown how Galaxy preserves the consistency requirement by following the cache-coherence protocol. Let’s now see how Galaxy exploits some heuristics to reduce latency.
I’ve explained why it is necessary to wait for a line to be in the E state (i.e. to wait for all sharers’ INVACKs) before modifying the item. But inconsistencies might only occur if other nodes get to see the new value of the item before all previous sharers have been invalidated. If no one else can observe the change, and since we’re guaranteed that all INVACKs will be received eventually (barring node failure), we can go right ahead and perform it. So, once the PUTX message is received and the line is put in the O state, INVs are sent to all sharers, but we do not wait for the INVACKs – we modify the item while still in the O state. However, if any GET/GETX messages from other nodes are received, they are buffered until we get all INVACKs and the line is in the E state. This can reduce latency significantly, because it is quite likely that the current owner will want to do several operations with the item before any other node requests it.
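The optimization can be sketched in Java like this. The class EagerWriteSketch and its members are hypothetical, and serving a GET is reduced to recording the requester; a real owner would also add the requester to the sharers list and reply with a PUT.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

/** Hypothetical sketch of writing in the O state while buffering GETs; not Galaxy's actual classes. */
class EagerWriteSketch {
    enum State { E, O }

    State state;
    String data;
    private final Set<Integer> awaitingInvAck;                   // sharers that were sent an INV
    private final Queue<Integer> bufferedGets = new ArrayDeque<>();
    final List<Integer> served = new ArrayList<>();              // requesters that got a reply

    EagerWriteSketch(Set<Integer> sharers) {
        awaitingInvAck = new HashSet<>(sharers);
        state = awaitingInvAck.isEmpty() ? State.E : State.O;
    }

    /** Allowed even in O: nobody can observe the new value until we answer a GET. */
    void write(String value) { data = value; }

    /** GET/GETX requests are held back until every old sharer has acknowledged. */
    void onGet(int requester) {
        if (state == State.E) served.add(requester);
        else bufferedGets.add(requester);
    }

    void onInvAck(int from) {
        awaitingInvAck.remove(from);
        if (awaitingInvAck.isEmpty()) {
            state = State.E;
            while (!bufferedGets.isEmpty()) served.add(bufferedGets.poll());
        }
    }
}
```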
Reducing read latency
Say that our node shares items X and Y, which are then invalidated by their owner because they are about to be modified. If our node then wants to read the items again, it has to wait for the owner to receive all INVACKs, as explained in the previous section, before it responds to our GET request with the items’ new values. This might take a while.
But notice that while both lines were in the S state, they were consistent. Now that they are in the I state, their values haven’t changed (on the local node), so while they may no longer be up to date, they should still be consistent according to our definition. If we want to read their values we can still return the old, invalidated, values to the application. It is only when we receive X or Y’s new value, that the other may no longer be consistent with it.
So, when node B invalidates some items in node A and the application on node A wishes to read those items, Galaxy sends GET request(s) to B, but immediately returns the stale, but consistent, values to the application. Only when node B sends a PUT message to A are all items previously invalidated by B purged, after which they cannot be returned until their new values are received.
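A small Java sketch of the stale-read rule, under my own simplifying assumptions: the StaleReadSketch class and its methods are hypothetical, items are keyed by name, and a null return stands for "must wait for the owner's PUT".

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of serving stale but consistent values; not Galaxy's actual classes. */
class StaleReadSketch {
    static class Line {
        String value;
        boolean valid;
        int invalidatedBy = -1;
    }

    private final Map<String, Line> lines = new HashMap<>();

    /** The node becomes a sharer of the item with the given value. */
    void share(String item, String value) {
        Line l = lines.computeIfAbsent(item, k -> new Line());
        l.value = value;
        l.valid = true;
        l.invalidatedBy = -1;
    }

    /** INV keeps the old value around and notes who invalidated it. */
    void onInv(String item, int from) {
        Line l = lines.get(item);
        if (l != null) { l.valid = false; l.invalidatedBy = from; }
    }

    /** Returns a locally readable (possibly stale) value, or null if the read must wait. */
    String read(String item) {
        Line l = lines.get(item);
        return l == null ? null : l.value;
    }

    /** A PUT from a node purges every stale value that node had invalidated earlier. */
    void onPut(String item, String value, int from) {
        for (Line l : lines.values())
            if (!l.valid && l.invalidatedBy == from) l.value = null;
        share(item, value);
    }
}
```

After X and Y are both invalidated by node 1, reads still return their old values; once Y's new value arrives from node 1, X's old value is no longer returned.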
Fault tolerance
Naturally, the cache-coherence protocols used to coordinate CPU L1 caches were not designed to be fault-tolerant. If one or more cores, or the CPU bus, fail – the machine fails. But Galaxy runs in a cluster, so it must be able to handle node failures without losing application data.
How Galaxy manages that will be the topic of my next post.
So go check out Galaxy now.
SpaceBase Can Now Be Used as a Spatial Redis
A number of people, especially location-based services (LBS) developers, have asked us if SpaceBase can run as a standalone spatial data-store server. Well, now it can!
We have added a remote networking API that uses Apache Thrift to access SpaceBase from Ruby, Python and Node.js applications, as well as an Erlang API which uses JInterface to make SpaceBase appear to an Erlang application as a regular process.
While using SpaceBase through the network API (rather than embedding it in your Java or C++ application) prevents you from taking advantage of SpaceBase’s multithreading framework that can parallelize your code, it can still serve as a low-latency spatial data store for write-heavy spatial applications – something like a spatial Redis.
You can receive an evaluation copy of SpaceBase by e-mailing us at info@paralleluniverse.co.
On Distributed Memory Systems
Or: Introducing Galaxy, a novel in-memory data grid by Parallel Universe
A distributed memory system is one that serves data items from multiple nodes that communicate by passing messages to each other, while providing the illusion that the items are served from a monolithic memory. There are three big problems facing a distributed memory system. The first, data placement or partitioning, is the problem of choosing on which node a data item should reside, as the entire data cannot fit in a single node. The second, consistency, concerns the possibility of reading different values of the same data item, as the distributed system may store it on different nodes. The third, fault tolerance, is the problem of handling node or network failures.
Designing a distributed memory system requires making tradeoffs with regard to those three problems, tradeoffs that affect performance, ease-of-use, and system availability.
While there has been much discussion about tradeoffs concerning consistency and fault-tolerance, in this blog-post I’d like to address the problem of data placement. The choice of how to partition data items across nodes determines the item access latency, as well as the load on each node.
To the best of my knowledge, all widespread distributed memory products, be it NoSQL databases like Dynamo, Riak, Cassandra and others, or in-memory data grids (more about those later) like Gigaspaces, Infinispan and Terracotta, use some sort of a distributed hash table (DHT), where one or several fields of each item or of the key used to access it are used to compute a hash value, which, in turn is mapped to a node. In order to locate an item, all you need to know is its hash-value.
For load balancing, these products often use consistent-hashing which helps distribute items evenly among cluster nodes as nodes are added or removed from the system. For completeness’ sake I’ll mention that some of these products (Riak, Cassandra) allow for eventual-consistency, while others (all IMDGs) acquire distributed locks to coordinate multi-item transactions.
Such DHT systems have the advantage that items can be located very easily. If you have the key, all you need to do is compute the hash and have some information about the nodes, like their position on the consistent-hash circle. Accessing an item, for either a read or a write operation, would then require a single network hop. This is the best we can do if we need to access items completely at random - which is indeed the case for many applications.
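For readers who haven't seen one, here is a toy consistent-hash ring in Java. It is a generic sketch, not taken from any of the products mentioned: the HashRingSketch class is hypothetical, the hash function is deliberately naive, and real systems typically place several virtual points per node on the circle.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy consistent-hash ring; a generic illustration, not any product's implementation. */
class HashRingSketch {
    // position on the circle -> node placed at that position
    private final TreeMap<Integer, String> ring = new TreeMap<>();

    void addNode(String node) { ring.put(hash(node), node); }

    void removeNode(String node) { ring.remove(hash(node)); }

    /** A key belongs to the first node at or after its position, wrapping around the circle. */
    String nodeFor(String key) {
        if (ring.isEmpty()) throw new IllegalStateException("no nodes in the ring");
        Map.Entry<Integer, String> e = ring.ceilingEntry(hash(key));
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    /** Naive non-negative hash; a real ring would use a better-distributed function. */
    static int hash(String s) { return s.hashCode() & 0x7fffffff; }
}
```

Locating an item needs only the key and the ring, which is the single-hop lookup described above; removing a node only reassigns the keys that node was responsible for.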
The downside to this approach is that while the single network hop is the worst-case scenario, it is also the common case. If we perform computations on the memory nodes themselves (as is often done with data grids), and a computation requires more than one data item, chances are that the items will each reside on a different node. Some systems solve this problem by using some sort of locality-preserving hashes that ensure that items are grouped in nodes in a way that keeps “similar” items together (whatever similar means for the application) so that range queries could be carried out efficiently without hopping all over the cluster to retrieve the data. Still, such solutions are rigid in the sense that items are not expected to change their hash value - and there’s usually no reason why you’d want them to in a key-value store.
How CPUs do it
Main memory (RAM) access is the slowest operation modern CPUs perform that doesn’t involve actual I/O. This is why modern CPUs employ on-core cache memory (L1 cache), which is a small memory unit attached to each of the CPU’s cores, with access latency two orders-of-magnitude smaller than main memory access. And because communication among cores is still an order-of-magnitude faster than accessing RAM, the on-core caches form a distributed memory system.
However, L1 caches do not use the distributed hash-table approach, because accessing a different core’s L1 is much slower than accessing the L1 of the core your code is running on, and as we saw, a DHT access requires one hop in the common case. Instead, L1 caches migrate data items (called cache lines) from one cache to another using a cache-coherence protocol, which is also used to enforce consistency.
The way this works is that a cache line is “owned” by a single L1 cache at a time. If a thread running on one of the cores wishes to read a memory address, a copy of the appropriate cache line is fetched from the cache that owns it (or from the main memory if none do, but that detail is of no concern to us at this point). When a thread wishes to write to a memory address, its core’s L1 cache requests the owner cache for a transfer of ownership, and then requests all other caches to purge (invalidate) their read-only copies in order to ensure consistency (I highly recommend reading this excellent paper by IBM’s Paul McKenney that explains this process in detail). After that, the cache line remains owned by the thread’s L1 cache, until it is invalidated by another thread, running on a different core, that wishes to write to that cache line.
That means that when the thread wishes to write to the same cache line again, it would require no further bus communication, no further “network hops”. However, this comes at the cost of an expensive initial lookup: because we cannot guess the owning node simply by the memory address (the “key”), we have to search for it on all cores, usually by broadcasting a request that requires the attention of all caches.
The reason this tradeoff pays for CPU cores is that the L1 caches are very small (tens of kilobytes), and most code displays a behavior known as temporal-locality: it tends to access the same memory addresses over and over in some short period of time. If the L1 caches were bigger than the span of a program’s temporal-locality (the amount of memory it accesses repeatedly in a short time-span) then they would contain many cache lines that a thread would only access once before another thread, on another core, requests them. Those lines would migrate randomly from one cache to another, and we’d lose most of the performance gains afforded by the cache.
This is why most software distributed memory systems, those that distribute memory across a cluster of machines, prefer using a DHT approach rather than cache-coherence. Each node contains a large amount of data, far exceeding applications’ temporal locality, so most applications’ data access patterns appear to the system as effectively random. In that case, it’s better to pay one network hop in the common case, and avoid the expensive lookup cache coherence requires in the worst-case.
Galaxy
When we were building SpaceBase, our real-time spatial data store for MMO games, location-based services and military C4I systems, we were looking for a solution that would let us distribute the data store over a cluster to support more objects and more processing, so we took a look at in-memory data grids. An in-memory data grid (IMDG) is a distributed memory system that stores all data in cluster nodes’ RAM (as opposed to using the disk), and one that serves as the main data store (as opposed to a cache for some database). So IMDGs have relatively low latencies, and, unlike some NoSQL databases, they usually guarantee strong consistency, which our target applications demand.
Storing and querying spatial data is usually harder for a database than managing one-dimensional data. Spatial data structures are more complicated than their 1D counterparts, and are often less efficient. But spatial data does have one quality working in its favor - the access patterns aren’t random. Range queries are much more common for spatial data (e.g. query all objects contained in a region) than for one-dimensional data, and when accessing an object in a query or transaction, there is a very high probability that spatially near objects will be accessed as well, and a low probability that we’d access faraway objects in the same transaction.
Distributing a spatial database over a DHT would nullify this one advantage. Even if we were to hash the object’s location and use a locality-preserving hash, it would not have helped us, as the objects move around and this would require constantly changing the hash values; using a constant id for a key would not have made sense either, because then each transaction would access random keys and would require many network hops.
And that’s why we’ve begun to build Galaxy, a new kind of IMDG that does not use hashing (or keys at all) for partitioning, but rather a software implementation of a cache-coherence protocol, very similar to the one used by CPU L1 caches. As objects move around in space, our database would use Galaxy to migrate them from one node to another, and because nearby objects are often accessed at the same time, keeping them on the same node would do away with network I/O for the common case, and would offset the higher cost of an initial object lookup.
An IMDG like Galaxy would not fit all applications. Those that display random data access - like commerce and finance applications I suppose, though I am not too familiar with them - are better off choosing one of the very good DHT products. But applications that require low-latency processing of dynamic data that is best stored in bunches, and that require strong consistency guarantees, would benefit greatly from a “cache-coherent” IMDG. Other than applications dealing with moving spatial objects (like MMO games), I think graph databases will benefit from distributing their data with Galaxy, as well. Graph databases’ most common operation is graph traversal, which has the same property as spatial databases - nearby objects are often accessed together, while objects far apart rarely are.
Now, at Parallel Universe we’re used to building military grade software. But, as we announced yesterday on TechCrunch, we’re now part of the summer ’12 batch of Y Combinator, so we’ve decided to try and adopt the start-up culture of “release early, release often”, and release Galaxy now, when it is not yet production ready. And because we think it has many uses that we haven’t thought of, and because we think there is a lot of interesting research to be done with a cache-coherent IMDG, we’re releasing it as open-source software, under the LGPL license. You can find it on Github right now.
How Galaxy works
Galaxy is a distributed RAM. It is not a key-value store. Rather, it is meant to be used as an infrastructure for building distributed data-structures. In fact, there is no way to query objects stored on Galaxy at all. Instead, Galaxy generates an ID for each item, that you can store in other items just like you’d store a normal reference in a plain object graph.
The application runs on all Galaxy nodes alongside the portion of the data that is kept (in RAM) at each of the nodes, and when it wishes to read or write a data item, it requests the Galaxy API to fetch it.
At any given time an item is owned by exactly one node, but can be shared by many. Sharers store the item locally, but they can only read it. However, they remember who the owner is, and the owner maintains a list of all sharers. If a sharer (or any node) wants to update the item (a “write”) it requests the current owner for a transfer of ownership, and then receives the item and the list of sharers. Before modifying the item, it invalidates all sharers to ensure consistency. Even when the sharers are invalidated, they remember who the new owner is, so if they’d like to share or own the item again, they can request it from the new owner. If the application requests an item the local node has never seen (or it’s been migrated again after it had been invalidated), the node multicasts a request to the entire cluster in search of it.
The idea is that when data access is predictable, expensive operations like item migration and a clueless lookup are rare, and more than offset by the common zero-I/O case. In addition, Galaxy uses some nifty hacks to eschew many of the I/O delays even in worst-case scenarios.
In the coming weeks I will post here the exact details of Galaxy’s inner workings: what messages are transferred, how Galaxy deals with failures, and what tricks it employs to reduce latencies. In the meantime, I encourage you to read Galaxy’s documentation and take it for a spin.
A parting note
Galaxy is still experimental. It is absolutely not ready for production, but it is ready for research. It has bugs. Probably lots of them, and probably some conceptual bugs as well. But that’s mostly because, we think, it’s something quite new.
Discussion on Hacker News
Introducing SpaceBase: a New Realtime Spatial Data Store
Today, we are proud to launch Parallel Universe and introduce our first product: SpaceBase. SpaceBase is a server-side, in-memory, low-latency, dynamic, concurrent and distributed spatial data-store for 2D and 3D spatial objects. It is meant to be used by MMO (massively-multiplayer online) games, defense applications and location based services.
SpaceBase’s first customer was the Israeli Air Force. The IAF is currently using SpaceBase in an important soft-realtime C4I system that tracks a large number of moving objects. While SpaceBase was originally designed around the IAF’s specific needs, it has since been turned into a general spatial middleware software with many possible applications.
You may think of SpaceBase as a spatial NoSQL database. Like all NoSQL databases, it’s built around the realization that not all data is the same, and not all data-usage patterns are the same. SpaceBase only maintains spatial entities: objects that have location, and possibly an extent, in 2D or 3D space. The entities can be virtual - characters and other objects in an MMO game - or represent real-life objects, be it cars and people in an LBS or tanks and aircraft in a defense system. Either way, SpaceBase has been optimized to handle a large number of dynamic entities (that move around a lot), and has been tailored for low-latency, soft- or even hard- realtime applications. It can easily handle many concurrent update transactions and queries.
All applications are built around data, and for spatial applications like games, LBSs and C4I systems, the representation of objects in space is their most basic component. Space comes before higher layers like physics simulation or other business logic. SpaceBase is meant to serve as this foundation layer, so you may use it as more than just a data-store: you can run your physics simulation, AI, or whatever business logic your application requires, as SpaceBase transactions, and let SpaceBase parallelize your computations for you across cores and across machines in your cluster.
Which brings us to another important point: SpaceBase is built to scale. It scales gracefully across CPUs and across a computing grid. It scales so well that it allows building large shardless MMO games. In fact, shardless MMO games and virtual worlds were one of the use-cases SpaceBase was specifically built to handle. (Distributed SpaceBase, or SpaceBase-on-a-grid, is currently in the advanced stages of testing, and will be available for evaluation and purchase within a couple of months. The single-node, multi- and many- core deployment is available for evaluation right now!)
Because Parallel Universe believes in openness and in a simple customer experience, our full product pricing will appear on the website, and none of the technical material available for download will require registration. However, at this point, we want to get to know our early adopters personally, so we kindly ask you to direct all pricing, as well as technical, questions - actually, any question you may have for us - to info@paralleluniverse.co. You may also ask for an evaluation copy of SpaceBase - we’d love you to take it for a spin!
You can find more details about SpaceBase, as well as download the documentation, on its product page.

