October 16, 2013

# No More Callbacks: 10,000 Actors, 10,000 Threads, 10,000 Spaceships

## Adopting Ideas from Erlang and Clojure for a Highly Concurrent, Simple and Maintainable Application

I recently stumbled across this quote [1] by Doron Rajwan of Intel: “For the past 30 years, computer performance has been driven by Moore’s Law; from now on, it will be driven by Amdahl’s Law. Writing code that effectively exploits multiple processors can be very challenging.” I could not agree more. Writing correct and efficient multi-threaded code is at once necessary and extremely difficult. The challenge can be met, I believe, by adopting solutions that rest on two legs: languages (or, at the very least, programming idioms) designed for concurrency, and appropriate transactional concurrent data structures.

The former will need to enforce immutability, help manage state, and provide simple and effective concurrency mechanisms, such as message passing. So far, Clojure and Erlang seem to fit the bill. The latter will need to work hand-in-hand with the application, and help it exploit parallelism rather than hinder it, as many databases are prone to do.

Seven months ago, I wrote a blog post describing a simulation of a spaceship battle that uses SpaceBase, our in-memory spatial database, to drive parallelization.

The most common complaint we received about the demo concerned the cumbersome use of callbacks. This, of course, is a problem with many asynchronous computations: you gain concurrency but lose the natural code flow. It’s hard for a programmer to reason about which line of code executes on which thread, and passing information from one callback to another is cumbersome as well.

Callbacks are indeed an annoyance (to say the least), but the alternative – lots of blocking threads – is hard to scale. To solve this, we have developed a lightweight thread library for the JVM called Quasar, along with its Clojure API – Pulsar.

A previous blog post explains in some detail how Quasar lightweight threads (which we call fibers) are implemented. Here I’ll just say that unlike regular threads, you can have hundreds of thousands, or even millions, of fibers running concurrently on the same machine. Fibers are optimized for fast context switches, and are suitable when many operations interact with one another (that is, pass information to one another using some mechanism or other). They offer the same scalability afforded by asynchronous callbacks without giving up the simple and natural programming style of synchronous (or single-threaded) code. Instead of a callback, you have fiber-blocking operations, which are as simple as plain (thread-)blocking operations, only cheaper.

Quasar then builds on the foundation of fibers, and includes Go-like CSP constructs (selectable channels), and an Erlang-like actor system.

The first spaceships demo used only the second component of the two-pillared solution – namely a concurrent, transactional data structure that helps with business logic scaling – but not the first: a concurrency-oriented programming language or paradigm.

To test how well the two parts fit together, we’ve re-written the demo so it uses Quasar actors, as well as the newly released SpaceBase 2.0, which integrates with Quasar. While written in Java rather than in a concurrency-oriented language, the demo uses Quasar’s API which is heavily based on ideas taken from Erlang and Clojure, and I think that the code demonstrates the approach well. If we get the chance, we’ll port the demo to Clojure.

## The Code

Let’s dive into the code. The basic idea is this: we have thousands of spaceships, each modeled as an actor running its logic in a separate fiber (lightweight thread). Each spaceship also stores the publicly-visible portion of its state in the SpaceBase store.

The spaceships find one another by querying their surroundings in the database, then act accordingly – move away, chase, fire – and communicate with other spaceships via messages.

The rendering thread queries the database several (currently, ten) times a second to render a frame, and extrapolates the spaceships’ locations between queries to render 30 (or more) frames per second.
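The extrapolation step can be pictured with a few lines of Java. This is only a sketch under the assumption of simple constant-velocity extrapolation; the post does not spell out the renderer's actual method, and the `Extrapolation`, `Snapshot` and `positionAt` names are made up for illustration.

```java
// Hypothetical sketch: none of these names come from the demo's source.
final class Extrapolation {
    /** A position/velocity snapshot taken when the database was last queried. */
    static final class Snapshot {
        final double x, y;      // position at sample time
        final double vx, vy;    // velocity, in world units per millisecond
        final long timeMillis;  // when the snapshot was taken

        Snapshot(double x, double y, double vx, double vy, long timeMillis) {
            this.x = x;
            this.y = y;
            this.vx = vx;
            this.vy = vy;
            this.timeMillis = timeMillis;
        }
    }

    /** Estimates the position at frameTimeMillis, assuming constant velocity since the snapshot. */
    static double[] positionAt(Snapshot s, long frameTimeMillis) {
        double dt = frameTimeMillis - s.timeMillis;
        return new double[] { s.x + s.vx * dt, s.y + s.vy * dt };
    }

    public static void main(String[] args) {
        Snapshot s = new Snapshot(100.0, 50.0, 0.5, -0.25, 1_000L);
        // Three frames, 33 ms apart, rendered from a single database snapshot.
        for (long t = 1_000L; t <= 1_066L; t += 33L) {
            double[] p = positionAt(s, t);
            System.out.println(t + " ms: (" + p[0] + ", " + p[1] + ")");
        }
    }
}
```

With ten queries a second, each snapshot would serve roughly three such frames before a fresh one replaces it.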

## Spawning and Supervising Actors

After setting up the database, the main program spawns the spaceship actors within a supervisor:

    new SupervisorActor(SupervisorActor.RestartStrategy.ONE_FOR_ONE) {
        @Override
        protected void init() throws InterruptedException, SuspendExecution {
            for (int i = 0; i < N; i++)
                addChild(new Supervisor.ChildSpec("ship-" + i,
                        Supervisor.ChildMode.PERMANENT, 5, 1, TimeUnit.SECONDS, 3,
                        ActorSpec.of(Spaceship.class, Spaceships.this, i, phaser)));
        }
    }.spawn();


A supervisor is an actor that normally monitors other actors for failure, and takes some action – such as restarting the failing actor or all of the supervised actors – when failure occurs. Here we’ll use it to simply create a new spaceship when one is destroyed. A spaceship can be destroyed either as a result of an application event – it blows up after getting hit a number of times – or a software failure – say, an exception occurs. In any case, the supervisor will spawn a new spaceship to replace the dead one (that is the meaning of the ONE_FOR_ONE strategy).
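To make the one-for-one idea concrete without pulling in Quasar, here is a rough sketch that uses plain threads in place of actors. It is not how Quasar's supervisor is implemented; `OneForOneSketch`, `supervise` and the `maxRestarts` cap are hypothetical names chosen for the example. Each child reports its own death on a queue, and the supervisor replaces only the child that died.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

// Hypothetical sketch of one-for-one supervision with plain threads.
final class OneForOneSketch {
    /**
     * Starts `children` workers and replaces each one that dies (by returning
     * or by throwing) until `maxRestarts` replacements have been made.
     * Returns the total number of workers started.
     */
    static int supervise(int children, int maxRestarts, IntFunction<Runnable> factory) {
        BlockingQueue<Integer> deaths = new LinkedBlockingQueue<>();
        int started = 0;
        for (int id = 0; id < children; id++) {
            spawn(id, factory, deaths);
            started++;
        }
        int alive = children;
        int restarts = 0;
        while (alive > 0) {
            int deadId;
            try {
                deadId = deaths.take();       // block until some child dies
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("supervisor interrupted", e);
            }
            alive--;
            if (restarts < maxRestarts) {     // one-for-one: replace only the dead child
                spawn(deadId, factory, deaths);
                restarts++;
                started++;
                alive++;
            }
        }
        return started;
    }

    private static void spawn(int id, IntFunction<Runnable> factory, BlockingQueue<Integer> deaths) {
        Runnable body = factory.apply(id);
        new Thread(() -> {
            try {
                body.run();
            } catch (RuntimeException e) {
                System.out.println("child " + id + " failed: " + e.getMessage());
            } finally {
                deaths.add(id);               // tell the supervisor this child is gone
            }
        }, "child-" + id).start();
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        int started = supervise(3, 4, id -> () -> {
            if (runs.incrementAndGet() % 2 == 0)
                throw new IllegalStateException("boom");
        });
        System.out.println("workers started: " + started);
    }
}
```

Because every worker in this toy example dies right away, the supervisor starts seven workers in total: the three originals plus four replacements.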

The main thread then creates the application window along with its rendering thread, and then simply loops and prints out the effective rate of simulation cycles every second.

## The Spaceship Actor

The Spaceship class extends BasicActor, which means a spaceship is an actor. As an actor, it implements the doRun method, which is the actor’s main function: it starts running in a separate fiber when the actor is spawned, and when the function returns (or terminates abnormally with an exception), the actor dies (which will result in the supervisor noting the death and spawning a new spaceship).

Pretty much the first thing the spaceship actor does is to insert itself into the database:

    global.sb.insert(new TransactionalRecord<>(this, stateRecord), getAABB());


getAABB() returns the (axis-aligned) bounding-box of the spaceship (its location, really), and is used internally by the spatial database; but what is this TransactionalRecord business?

Exposing Public State as Records

Let’s go back to the spaceship’s constructor:

    this.state = new State();
    this.stateRecord = SpaceshipState.stateType.newInstance(state);


The state field contains the spaceship’s public state (position, velocity etc.). The stateRecord field holds a view of the state field in the form of a record.

Just as Erlang teaches us that an actor should expose its operations through a simple, standard interface that allows (pretty much) only sending it messages, Clojure teaches us that data, or state, should also be exposed through a simple, standard interface (in Clojure’s case it is the map). A record, then, is such a data-access interface akin to a map. It basically has get(field) and set(field, value) methods, only it preserves the type information of its fields to provide type safety at runtime.

The record we’re creating here has fields defined in the SpaceshipState class, while SpaceshipState.stateType.newInstance(state) creates a Record view of the state object by mapping the record’s fields to the state object’s fields [2]. I should note that accessing an object through the Record interface imposes a negligible performance penalty, because records are implemented using various high-performance techniques like code generation and HotSpot intrinsics (so r.get($x) is just as fast as o.x).

So, instead of calling obj.getX() we call obj.get($x). What does this get us, other than re-inventing basic Java functionality minus the type safety? Well, while both the actor approach and the Clojure standard data-representation approach give up some type safety (we preserve the type of the x field, but the compiler can’t tell us whether obj even has an x field; similarly, if we send message m to actor a, the compiler can’t know whether a supports an m operation), they do so at well-defined interface points between separate software components. What we gain is loose coupling. For example, among other things, we gain the ability to swap the implementation of the record or actor at runtime for maintenance (hot code-swapping will be the major feature of the next release of Quasar, 0.4.0).
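A toy version of such a record interface might look like the following. It is a map-backed sketch, not Quasar's Record implementation (which, as noted, is backed by the state object and generated code); `RecordSketch`, `Field` and `SimpleRecord` are invented names. The point is only to show how a field key can carry its value type, while the existence of the field is checked at runtime.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a typed, map-like record; not Quasar's Record API.
final class RecordSketch {
    /** A field key that carries its value type, so get/set need no casts. */
    static final class Field<T> {
        final String name;
        final Class<T> type;

        Field(String name, Class<T> type) {
            this.name = name;
            this.type = type;
        }
    }

    /** A map-like view of some state, exposing only get(field) and set(field, value). */
    static final class SimpleRecord {
        private final Set<Field<?>> fields;
        private final Map<Field<?>, Object> values = new HashMap<>();

        SimpleRecord(Field<?>... fields) {
            this.fields = new HashSet<>(Arrays.asList(fields));
        }

        <T> T get(Field<T> field) {
            requireKnown(field);
            return field.type.cast(values.get(field));
        }

        <T> void set(Field<T> field, T value) {
            requireKnown(field);
            values.put(field, value);
        }

        // Whether the record has the field at all is only known at runtime.
        private void requireKnown(Field<?> field) {
            if (!fields.contains(field))
                throw new IllegalArgumentException("no such field: " + field.name);
        }
    }

    // Field keys, analogous in spirit to the $x used in the text.
    static final Field<Double> $x = new Field<>("x", Double.class);
    static final Field<Double> $y = new Field<>("y", Double.class);
    static final Field<String> $name = new Field<>("name", String.class);

    public static void main(String[] args) {
        SimpleRecord r = new SimpleRecord($x, $y);
        r.set($x, 3.5);
        double x = r.get($x);            // no cast: the key carries the type
        System.out.println("x = " + x);
        // r.set($x, "oops");            // would not compile: $x is a Field<Double>
        try {
            r.get($name);                // compiles, but this record has no such field
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```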

Limiting component interaction to the narrow interfaces of actors and records gives us something else as well: the ability to insert cross-cutting concerns. For example, what happens if a method that consumes a resource is called too often? We need to explicitly insert load-handling code into the method. But if we communicate with the component through an actor interface, we can implement a general policy for handling too many messages thrown at any actor. The same goes for records. Remember the TransactionalRecord that the spaceship inserts into the database? It applies transactional access permissions to the record. Other spaceships will be able to read our spaceship’s state only when they’re in a transaction, and will be able to modify it only when they’re in a modifying transaction. Any attempt to read or write state outside a transaction will throw a runtime exception.
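The access rule described here can be illustrated with a small guard. This sketch is an assumption-laden stand-in, not SpaceBase's TransactionalRecord: it tracks the "current transaction" in a thread-local, and `TransactionalGuardSketch`, `Mode`, `inTransaction` and `GuardedValue` are hypothetical names.

```java
// Hypothetical sketch of transaction-guarded state; not SpaceBase's implementation.
final class TransactionalGuardSketch {
    enum Mode { NONE, READ, WRITE }

    // Stand-in for "the transaction the current thread is in".
    private static final ThreadLocal<Mode> CURRENT = ThreadLocal.withInitial(() -> Mode.NONE);

    /** Runs body inside a transaction of the given mode, restoring the previous mode afterwards. */
    static void inTransaction(Mode mode, Runnable body) {
        Mode previous = CURRENT.get();
        CURRENT.set(mode);
        try {
            body.run();
        } finally {
            CURRENT.set(previous);
        }
    }

    /** A value that can be read only in a transaction and written only in a modifying one. */
    static final class GuardedValue {
        private double value;

        double get() {
            if (CURRENT.get() == Mode.NONE)
                throw new IllegalStateException("read outside a transaction");
            return value;
        }

        void set(double newValue) {
            if (CURRENT.get() != Mode.WRITE)
                throw new IllegalStateException("write outside a modifying transaction");
            value = newValue;
        }
    }

    public static void main(String[] args) {
        GuardedValue x = new GuardedValue();
        inTransaction(Mode.WRITE, () -> x.set(42.0));
        inTransaction(Mode.READ, () -> System.out.println("read in transaction: " + x.get()));
        try {
            x.get();                         // no transaction is active here
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The check lives in the wrapper rather than in the state class, which is what makes it a cross-cutting concern: the guarded object's own code never mentions transactions.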

## The Main Loop

The spaceship’s main loop looks like this:

    for (;;) { // loop forever
        final long nextCycle = status == Status.ALIVE
                ? Math.min(state.lastMoved + MIN_PERIOD_MILLIS, nextActionTime())
                : nextActionTime();

        SpaceshipMessage message = receive(nextCycle - now(), TimeUnit.MILLISECONDS);

        if (message != null) {
            // handle message
            if (message instanceof Shot) {
                boolean killed = shot(now, ((Shot) message).x, ((Shot) message).y);
                // ...
            } else if (message instanceof Blast)
                blast(now, ((Blast) message).x, ((Blast) message).y);
        } else {
            // perform time-based operations
            if (status == Status.GONE) {
                // ...
                return null; // exit loop, destroy ship
            } else if (status == Status.ALIVE) {
                if (!isLockedOnTarget()) {
                    if (canFight(now) && wantToFight(now))
                        searchForTargets();
                } else
                    chaseAndShoot();

                applyNeighborRejectionAndMove(now);
            }
        }
    }
    // ...
    global.sb.delete(state.token);


The receive method returns any message sent to the spaceship by other spaceships (in the demo, a message telling us we’ve been shot, or that we’re within the blast radius of an exploding ship). If no message arrives before the timeout expires, it returns null and the spaceship performs its time-based operations, like moving.

If the spaceship is destroyed (as a result of a Shot message), we exit the loop, delete the spaceship from the database, and return from the doRun method, resulting in the actor’s termination (and the spawning of a new spaceship by the supervisor).

How do other spaceships send us messages? The state record contains a reference to the ship’s actor. This is an indirect reference of type ActorRef. It has a send method that allows sending messages to the actor, but not much else. This is how Quasar actors provide isolation: no actor can hold a direct reference to another actor, which allows us to change the actor’s implementation at runtime.
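The shape of this loop (receive with a timeout, handle a message if one arrived, otherwise do time-based work) and the send-only reference can be mimicked with a plain thread and a blocking queue. This is a sketch with invented names (`MailboxSketch`, `Ref`, `Ship`, `shootUntilDestroyed`), and a thread stands in for the fiber; it is not Quasar's actor API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a mailbox loop; a plain thread stands in for a fiber.
final class MailboxSketch {
    /** The only handle other components get: it can send, and nothing else. */
    interface Ref<M> {
        void send(M message);
    }

    static final class Ship implements Runnable {
        private final BlockingQueue<String> mailbox = new LinkedBlockingQueue<>();
        private final int hitsToDestroy;
        private int hits;   // private state, touched only by the ship's own thread
        private int moves;

        Ship(int hitsToDestroy) {
            this.hitsToDestroy = hitsToDestroy;
        }

        Ref<String> ref() {
            return mailbox::add;   // exposes sending only, never the ship itself
        }

        @Override
        public void run() {
            try {
                while (hits < hitsToDestroy) {
                    // Wait for a message, but no longer than until the next cycle is due.
                    String message = mailbox.poll(10, TimeUnit.MILLISECONDS);
                    if (message != null)
                        hits++;    // handle the message
                    else
                        moves++;   // timed out: perform time-based work
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Sends exactly enough shots to destroy a ship and returns the hits it registered. */
    static int shootUntilDestroyed(int hitsToDestroy) {
        Ship ship = new Ship(hitsToDestroy);
        Ref<String> ref = ship.ref();
        Thread thread = new Thread(ship, "ship");
        thread.start();
        for (int i = 0; i < hitsToDestroy; i++)
            ref.send("shot");
        try {
            thread.join();         // the ship's loop ends once it is destroyed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ship.hits;
    }

    public static void main(String[] args) {
        System.out.println("hits registered: " + shootUntilDestroyed(3));
    }
}
```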

## Transactions

Now let’s turn our attention to one of the transactions:

    try (ResultSet<Record<SpaceshipState>> rs = global.sb.queryForUpdate(
            SpatialQueries.range(myAABB, global.range),
            SpatialQueries.equals((Record<SpaceshipState>) stateRecord, myAABB), false)) {

        for (ElementUpdater<Record<SpaceshipState>> updater : rs.getResultForUpdate()) {
            move(now);
            state.status = status;
            // ...
            updater.update(getAABB());
        }
    }


queryForUpdate initiates a modifying transaction on the database, and returns a ResultSet. The ResultSet contains, in this case, the state records of the neighboring spaceships. The applyNeighborRejection method reads the neighbors’ locations and steers away from them. Because the neighbors are returned in the ResultSet’s read-only set, attempting to write their fields will result in a runtime exception, courtesy of the TransactionalRecord.

The transaction appears simple: it’s a plain try-with-resources block that can access and modify any of the method’s local variables. Behind the scenes, the story is more complex. When we issue the call to queryForUpdate, the current fiber blocks and asks the database to schedule it to continue running once the results are collected. Within the transaction, the fiber runs under the control of the database rather than the regular fiber scheduler. When the transaction completes (at the end of the try block), the database hands the fiber back to the fiber scheduler.

We have essentially transformed an asynchronous operation, normally handled with a callback, into a fiber-blocking operation. The fiber blocks until the query results are available and then resumes, but the task switch involved is an order of magnitude faster than an OS-controlled thread context switch.
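The general trick of hiding a callback behind a blocking call can be shown with nothing but the JDK. The sketch below is an analogy rather than SpaceBase's mechanism: `queryAsync` and `query` are hypothetical, and the caller here is an ordinary thread that really does block, whereas in Quasar it would be a fiber that is parked cheaply.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical sketch: wrapping a callback-style API in a blocking call.
final class BlockingAdapterSketch {
    /** A callback-style API: the result is delivered to onResult on another thread. */
    static void queryAsync(int x, Consumer<Integer> onResult) {
        CompletableFuture.runAsync(() -> onResult.accept(x * x));
    }

    /** The same operation as a blocking call: the caller waits until the callback fires. */
    static int query(int x) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        queryAsync(x, result::complete);   // the callback merely completes the future
        return result.join();              // block here until the result is available
    }

    public static void main(String[] args) {
        // Reads like single-threaded code; no callback appears at the call site.
        int a = query(6);
        int b = query(7);
        System.out.println(a + b);
    }
}
```

The call site keeps its local variables and its top-to-bottom flow, which is exactly what the callback version loses.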

All this is completely transparent, of course. All you see and care about is a single thread of execution.

Other queries in the code search for targets within a cone extending from the front of the spaceship, or are used to chase after a target we’ve locked onto.

## Synchronizing the Simulation

Many simulations require all simulated objects to work in lock-step: all entities perform a single simulation step and then wait for the cycle to complete before commencing the next one. java.util.concurrent has a synchronization mechanism specifically for this behavior called a Phaser.

Our spaceships are actors, each running in its own fiber (lightweight thread), which exposes an API practically identical to that of a regular Java thread. This allows Quasar to re-use java.util.concurrent constructs designed for threads, and apply them to fibers with hardly any code changes (actually, Quasar’s synchronization constructs ported from java.util.concurrent work for plain threads as well as fibers).

We can configure the demo to use a phaser which is passed to each spaceship. When we do that, every spaceship awaits the beginning of a new cycle once it completes execution of the current one.

Using a phaser forces all spaceships to synchronize execution, but it has its performance price. If we wait for the last spaceship to finish the current cycle, we don’t continuously exploit all cores – we essentially wait for everything to quiet down before starting the whole process again.
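For readers who haven't used it, this is roughly what lock-step execution with java.util.concurrent.Phaser looks like. The sketch uses plain threads instead of fibers, and `PhaserSketch` and `runLockStep` are names invented for the example; the demo's own phaser wiring may differ.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical sketch of lock-step cycles using java.util.concurrent.Phaser.
final class PhaserSketch {
    /**
     * Runs `workers` threads for `cycles` cycles and returns true if no worker
     * ever began a cycle before every worker had finished the previous one.
     */
    static boolean runLockStep(int workers, int cycles) {
        Phaser phaser = new Phaser(workers);                  // one party per worker
        AtomicIntegerArray finished = new AtomicIntegerArray(cycles);
        AtomicBoolean inLockStep = new AtomicBoolean(true);

        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                for (int c = 0; c < cycles; c++) {
                    // Everyone must have completed the previous cycle by now.
                    if (c > 0 && finished.get(c - 1) != workers)
                        inLockStep.set(false);
                    finished.incrementAndGet(c);              // "simulate" one step
                    phaser.arriveAndAwaitAdvance();           // wait for the slowest worker
                }
            }, "worker-" + i);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return inLockStep.get();
    }

    public static void main(String[] args) {
        System.out.println("lock-step held: " + runLockStep(8, 100));
    }
}
```

The call to arriveAndAwaitAdvance is where the cost mentioned above comes from: fast workers sit idle until the slowest one arrives.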

## Running the Demo

The demo requires an installation of JDK 7.

To run the demo, clone the GitHub repository, and run ./gradlew on Unix or Mac, or gradlew.bat on Windows.

By default, the demo uses all available cores. To change the default, uncomment and set the co.paralleluniverse.fibers.DefaultFiberPool.parallelism property in build.gradle.

You can set other parameters, such as the number of spaceships, the size of the world or whether or not to use a phaser, by editing src/main/resources/spaceships.properties.

## Performance

On my 4-core (8 virtual cores) i7 MacBook, with 10,000 spaceships, I get close to 10 simulation cycles per second. Every cycle, each of the spaceships executes, on average, two database transactions, one read-only and one that includes an update, for a total of 200,000 database queries (10,000 ships × 10 cycles × 2 transactions) and 100,000 updates per second, not including sending and processing messages.

When running the simulation synchronously, i.e. with a phaser, performance drops to about 8 cycles per second on my development machine.

It’s important to mention that the JVM takes a while to warm up and reach a steady performance level. The warm-up process may take up to two-and-a-half minutes (you’ll see the RATE value printed to the console every second slowly climb), depending on the number of spaceships and the number of cores used. Performance may be improved further by tuning garbage collection (currently the JVM defaults are used).

Bear in mind that this implementation is by no means minimal, or optimized for maximum simulation (or game) performance. Every database transaction is monitored, measured and reported (via JMX). Every message passed is counted and reported as well. You can query any actor’s mailbox, or get a current stack-trace of its execution at runtime; and, in the next version of Quasar, you’ll also be able to hot-swap the spaceship logic without stopping or pausing the program.

## Conclusion

The combination of a concurrent, parallelizing database with an actor system on top of lightweight threads gives us several huge advantages:

• Performance – we are able to fully exploit the computing power of modern multi-core hardware.
• Simplicity and familiarity – the code appears completely single-threaded. The programmer need not worry about locks and data races. Just like plain threads, Quasar fibers (and therefore actors) can block on IO or some other synchronization mechanism like the phaser, and handle context switches faster than plain threads.
• Isolation – every software component is fully isolated from every other component, be it by an ActorRef that only allows message passing, or a TransactionalRecord. No spaceship can inadvertently impact another’s correct execution. All shared state is protected by transactions and cannot be modified outside a transaction.
• Fault tolerance – every error is immediately recognized by the supervisor actor, which can then take immediate action.

This demo also implements, at least partially, John Carmack’s vision for concurrent game programming, outlined in his Quakecon 2013 keynote. We gain efficient concurrency without locks, using a simple and familiar coding style, all while providing good isolation for our components.

There are other actor systems for the JVM. The best known ones are probably Akka and Kilim, but neither gives us the same advantages as Quasar. Akka has no true lightweight threads (the actors are actually callbacks), while Kilim does, but both interact very badly with blocking operations like IO, DB calls or synchronization constructs (like the phaser we’ve used).

Of course, the approach used in this demo can – and should – be applied not only to simulations or games, but to any concurrent application, like web services. Stay tuned for more announcements in this area.

Sadly, Quasar’s documentation is still sorely missing (we’re working on it!). But the spaceships demo as well as the examples in the Quasar repository should be enough to get you started. So clone or fork the spaceships demo and/or Quasar and start hacking some super-concurrent code!


1. Which appears on the back of Java Concurrency in Practice.

2. It is also possible to create a standalone record that is not backed by a plain Java object.