The Parallel Universe Blog

February 06, 2014

Implementing, Abstracting and Benchmarking Lightweight Threads on the JVM

Quasar is an open-source library that adds true lightweight threads – or fibers – to the JVM. Fibers are an extremely valuable addition to the JVM for two reasons: first, they allow an application to overcome the scheduling limitations of OS threads, which can become the software’s main bottleneck, especially in web applications. Second, they open the door to several programming techniques that are invaluable for writing concurrent, scalable, and fault-tolerant applications, like CSP, the actor model, and dataflow programming. These programming styles help make your application extremely powerful while keeping your code simple and easy to follow and reason about, but they do require running hundreds of thousands, or even millions, of threads concurrently. That’s not possible with plain OS threads, but it is with fibers.

In this blog post, we’ll describe how Quasar fibers are implemented, and how they are abstracted to seamlessly run alongside plain Java (OS) threads.

What’s a Thread?

Fibers are lightweight threads, so before we can understand lightweight threads, we must first understand plain threads. A thread is a common and very useful abstraction for code flow. It has two important components: a program counter, which is our current location in the code, and a stack that contains function parameters and local function variables. For a thread to run, it needs to be scheduled by a scheduler that assigns it to a CPU core. The scheduler can then suspend the thread and schedule another onto the same core. Later, the thread can be resumed when the scheduler assigns it to a core again (be it the same core it previously ran on or another one) and sets the core’s instruction pointer to the thread’s program counter.

Multiple threads run on multiple cores concurrently, but threads are useful even when the CPU has a single core (though that is becoming increasingly rare these days) for two reasons: 1) they allow the programmer to conceptually separate the program into unrelated flows, and 2) they allow the scheduler to better utilize the core. The latter can happen when a thread awaits the result of an IO operation (in that case we say the thread blocks on IO). During that time, the CPU has nothing to do, so instead of idling, the scheduler schedules another thread for execution while the first waits for the IO operation to complete.

A thread may block on IO or on a synchronization construct like a mutex, in which case it waits not for an external event to occur, but for another thread to signal that it has completed some task. In this case, we say that the thread blocks on synchronization, or parks. When the thread is signalled, or awakened by another, we say it is unparked. Throughout the rest of the text we might use the terms park and block interchangeably.
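Park and unpark can be demonstrated with plain OS threads using only the JDK’s LockSupport class (the same primitive we’ll meet again later in this post). This is a minimal illustrative sketch; the method name is ours, not an API:

```java
import java.util.concurrent.locks.LockSupport;

public class ParkDemo {
    // One thread parks; another unparks it: the two primitives described above.
    static String parkThenUnpark() throws InterruptedException {
        StringBuilder log = new StringBuilder();
        Thread worker = new Thread(() -> {
            log.append("parked;");
            LockSupport.park();          // block ("park") until signalled
            log.append("unparked");
        });
        worker.start();
        Thread.sleep(100);               // give the worker time to reach park()
        LockSupport.unpark(worker);      // signal ("unpark") the worker
        worker.join();                   // join() makes the log safely visible
        return log.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(parkThenUnpark());
    }
}
```

Note that unpark grants a permit, so even if it races ahead of park, the subsequent park returns immediately rather than blocking forever.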

Virtually all modern operating systems provide threads. The OS allocates stacks for new threads, and has a scheduler that features time-slice-based preemption, which means that it may decide to suspend a running thread and replace it with another when the former has utilized an allotted time-slice, even without it blocking on IO or synchronization.

The OS scheduler is usually very good, but when many threads block often on synchronization constructs, it may add significant latency when threads park and unpark (i.e. when they block and are then awakened by synchronization mechanisms). This pattern, of many threads executing for a very short while, then blocking on synchronization, and then waking up again and so on, is becoming very common in modern applications, and the OS may have trouble dealing with it. The reason for that is that the OS scheduler is general, and suited to handle various kinds of thread behavior, but it isn’t optimized for specific use-cases. But the application does know how its threads behave, and can therefore schedule them better. Threads scheduled by user-mode code are what’s known as lightweight threads; in Quasar (as well as in some other languages) they are called fibers.

How Fibers are Implemented on the JVM

So, a thread (or a fiber), needs a program counter, a stack and a scheduler. Languages that provide lightweight threads, like Erlang and Go, have their compiler insert instructions into the generated machine code that allow for the maintenance of a stack and program counter by their runtime. The Java language doesn’t directly provide lightweight threads, but we can insert similar instructions into the compiled bytecode after the compiler has done its work via what’s known as bytecode instrumentation. Quasar can perform bytecode instrumentation at build time (immediately after compilation), or at runtime by using a Java Agent or a custom class loader.

Instrumentation adds instructions to save and restore the code’s current location in the program, as well as save and restore the contents of the stack. Because those instructions add some overhead (normally under 3%), Quasar does not instrument all methods. Instrumentation is only required to facilitate the suspension and resumption of fibers, so it is only required for methods that might potentially run in a fiber and block (methods that never block, i.e. they only perform a computation, need never be instrumented, even if they’re called within a fiber). Currently, the programmer must mark those blocking methods manually (either by declaring them to throw SuspendExecution or by annotating them with @Suspendable). Obviously, any function calling a blocking function is itself blocking.

But the ability to suspend and resume a fiber is just half of the equation. The second half is the scheduler. Quasar allows pluggable schedulers, and even lets different fibers be scheduled by different schedulers (a fiber is assigned a scheduler at construction time). By default (if no scheduler is specified), Quasar uses one based on the excellent, work-stealing, ForkJoinPool scheduler that comes bundled with the JDK.

User-mode code does not have direct control over the CPU, so the fiber schedulers schedule fibers onto OS threads. This scheduling scheme is known as M:N scheduling, as it maps M lightweight threads onto N kernel threads. When a fiber is started, it is scheduled to run on a thread; when it parks, it is suspended and de-scheduled; when another thread or fiber unparks it, it is scheduled again.

It is interesting to note that Google has proposed changes to the Linux kernel to allow user-mode code to schedule kernel threads. This will leave stack and program counter maintenance in the hands of the OS, but allow the threads to be scheduled by the application. If these changes are adopted, Quasar will be able to skip the instrumentation step in environments that have this feature.

Two well-known JVM languages have also recently introduced constructs akin to lightweight threads: Scala with Async, and Clojure with the excellent core.async. Those employ a similar instrumentation scheme, but perform it at the language level, using macros, rather than at the JVM bytecode level like Quasar. While this allows Clojure to support core.async on non-JVM platforms (like JavaScript), the approach has two limitations: 1) it is only supported by a single language and can’t interoperate with other JVM languages, and 2) because it uses macros, the suspendable constructs are limited to the scope of a single code block, i.e. a function running in a suspendable block cannot call another blocking function; all blocking must be performed at the topmost function. It’s because of the second limitation that these constructs aren’t true lightweight threads, as threads must be able to block at any call-stack depth (Pulsar, Quasar’s Clojure API, contains an implementation of core.async that makes use of Quasar fibers).

Transforming Asynchronous Operations into Fiber-Blocking Operations, and Fiber-Blocking IO

The fiber’s park and unpark operations allow us, as we’ll see later, to implement synchronization constructs, but what about IO? The OS is directly responsible for blocking and unblocking threads on IO calls, so if a fiber calls a blocking IO operation, it will block the entire thread assigned to it by the scheduler, thus preventing other fibers from sharing it.

Luckily, the JDK provides us with non-blocking IO, better known as asynchronous IO. Rather than block the current thread, these IO calls return immediately, and later call a user-supplied callback when the operation completes. These asynchronous IO operations enjoy better scalability than blocking IO. Unfortunately, programming with callbacks is cumbersome, hard to follow, and might introduce tricky race conditions (because the callback might run concurrently with the function that initiated the operation).

Because the main reason for having fibers in the first place is to enjoy the scalability of asynchronous, callback-based code, while keeping the simplicity of blocking code, Quasar provides a simple mechanism to transform any asynchronous operation into a fiber-blocking one: FiberAsync.

Assume that operation Foo.asyncOp(FooCompletion callback) is an asynchronous operation, where FooCompletion is defined as:

interface FooCompletion {
  void success(String result);
  void failure(FooException exception);
}

We then define the following subclass of FiberAsync:

class FooAsync extends FiberAsync<String, Void, FooException> implements FooCompletion {
  @Override
  public void success(String result) {
    asyncCompleted(result);
  }

  @Override
  public void failure(FooException exception) {
    asyncFailed(exception);
  }
}

Then, to transform the operation to a fiber-blocking one, we can define:

String op() {
    return new FooAsync() {
      @Override
      protected Void requestAsync() {
        Foo.asyncOp(this);
        return null; // requestAsync returns Void, so an explicit return is required
      }
    }.run();
}

The call to run will block the fiber until the operation completes, and now op() is the fiber-blocking counterpart of asyncOp(callback) (FiberAsync also allows you to specify what action to take when called on a plain thread).
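To see the shape of this transformation without Quasar, here is a thread-level analogy using only the JDK’s CompletableFuture. The Completion interface and asyncOp below are hypothetical stand-ins mirroring the FooCompletion example above; the crucial difference from FiberAsync is that f.get() blocks an entire OS thread, whereas FiberAsync.run() blocks only the fiber:

```java
import java.util.concurrent.CompletableFuture;

public class AsyncToBlocking {
    // Hypothetical stand-ins mirroring Foo.asyncOp and FooCompletion from the post
    interface Completion {
        void success(String result);
        void failure(Exception exception);
    }

    static void asyncOp(Completion callback) {
        // Simulate an asynchronous operation: the callback fires on another thread
        new Thread(() -> callback.success("done")).start();
    }

    // Thread-blocking wrapper: the OS-thread analogue of FiberAsync.run()
    static String op() throws Exception {
        CompletableFuture<String> f = new CompletableFuture<>();
        asyncOp(new Completion() {
            public void success(String result) { f.complete(result); }
            public void failure(Exception exception) { f.completeExceptionally(exception); }
        });
        return f.get(); // blocks the whole thread until the callback fires
    }

    public static void main(String[] args) throws Exception {
        System.out.println(op());
    }
}
```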

Transforming asynchronous code to fiber-blocking calls has a negligible overhead both in terms of memory and performance, while making the code shorter and far simpler to understand (For IO operations that do not expose an asynchronous API – most notably JDBC – we can run them in a separate, size-restricted thread pool where they’ll be able to block all they want, and thus manually make them asynchronous).
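The parenthetical suggestion above – running blocking APIs like JDBC in a size-restricted pool to make them asynchronous – can be sketched with plain JDK classes. The pool size and the blockingQuery stand-in are illustrative, not part of any real API:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BlockingToAsync {
    // A size-restricted pool dedicated to blocking calls (the post's suggestion
    // for JDBC); daemon threads, so the helper pool never keeps the JVM alive
    static final ExecutorService blockingPool = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // A stand-in for a blocking operation such as a JDBC query
    static String blockingQuery() {
        try { Thread.sleep(50); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return "row";
    }

    // Asynchronous facade: returns immediately; the blocking happens in the pool
    static Future<String> queryAsync() {
        return blockingPool.submit(BlockingToAsync::blockingQuery);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(queryAsync().get());
    }
}
```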

Quasar uses FiberAsync and asynchronous NIO to implement the blocking NIO API so that it’s fiber- rather than thread-blocking if called on a fiber.

Comsat uses Quasar and FiberAsync to implement standard Java web-related APIs, and make them fiber-blocking.

Strands

While Quasar’s Fiber class has an API very similar to that of Thread, we would still like to abstract the two into a single interface. The Strand class is that abstraction. A strand in Quasar is either a (plain Java, OS) thread or a fiber.

The Fiber class directly extends Strand, but Threads have to be wrapped (automatically, as we’ll see).

The static method Strand.currentStrand() returns the current strand (the one that called the method). It will return a strand representing a fiber or a thread depending on whether it was called from a fiber or a thread.

The Strand class abstracts pretty much all of the operations you can do with threads or fibers. Strand.sleep(int millis) suspends the current strand (again, thread or fiber) for the given duration. Strand.getState() and Strand.getStackTrace() return a strand’s execution state and stack trace, again, regardless of whether the strand is a fiber or a thread.

But perhaps most importantly, Strand exposes park and unpark methods, that generalize these operations for both threads and fibers.

Synchronization and Channels

Strands allow us to write synchronization code that works equally well when called on a fiber or a thread. For example, Quasar’s channels (that work like Go channels) are queues with added synchronization that allows a caller to block until a message is received on the channel. Because all of Quasar’s synchronization code uses the strand abstraction, it works equally well whether the code runs on a fiber or a plain Java thread.
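The rendezvous behavior of an unbuffered channel can be illustrated, at the plain-thread level, with the JDK’s SynchronousQueue (this is only an analogy for the blocking semantics; Quasar’s actual channels work with strands, so they block fibers too):

```java
import java.util.concurrent.SynchronousQueue;

public class ChannelDemo {
    // A zero-capacity queue behaves like an unbuffered Go channel:
    // take() blocks until some other thread put()s a message, and vice versa
    static String demo() throws InterruptedException {
        SynchronousQueue<String> channel = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                channel.put("hello"); // blocks until the consumer is ready
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        String msg = channel.take();  // blocks until a message is sent
        producer.join();
        return msg;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```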

This is why fibers and threads can run side by side, and you can easily choose how you want your code to run. Code that runs in short bursts and blocks often (e.g. code that receives and responds to messages) is better suited to run in a fiber, while code that performs a lot of computations and blocks infrequently belongs in a plain thread. The two can still exchange data and synchronize with one another thanks to the strand abstraction.

In fact, the synchronization classes in java.util.concurrent (locks, semaphores, countdown-latches and more) do not rely on OS-provided constructs, but are implemented directly in Java using CPU concurrency primitives like CAS combined with the ability to park and unpark fibers. Therefore, all we had to do to adapt these classes to work for fibers as well as threads was to replace all references to Thread with Strand, and every call to LockSupport.park/unpark with calls to Strand.park/unpark. Quasar contains a few such ports from java.util.concurrent (the package’s source code is in the public domain) that work on strands and maintain the exact same API, so no code changes are required by clients of these classes.

Benchmarking Fibers

There are many ways to benchmark fiber performance: you can try to measure scheduling overhead alone, try to mimic a real application, and do either with different strand behaviors. The following experiment will not be exhaustive (far from it), partly because it would make for a dull read, and partly because benchmarks are always subject to interpretation. Instead, I will present one particular benchmark, and briefly mention the results.

You can find the benchmark code here, and this is what it does: It creates strands conceptually arranged in several rings. Strands in each ring are connected with channels to the strand preceding them in the ring and the strand following them. A strand designated as a leader in each ring sends a message to the next strand in the ring. That one, in turn, upon receiving a message, sends one to the next and so on. Messages travel around the rings a given number of times.

Our parameters are, then, the number of rings, the number of strands in each ring, and the number of cycles messages travel along each ring. Because every strand is blocked until it receives a message, then it wakes up only for it to send a message and block again, all strands in a given ring are dependent on one another, and require the scheduler to do some work, scheduling them in quick succession. Strands in different rings are independent, and as long as the number of rings is less than the number of cores, the scheduler presumably does not have to work too hard to schedule them. Once the number of rings exceeds the number of cores, though, strands in different rings can interfere with one another’s scheduling. At least that’s the theory.

In order for the benchmark to more closely resemble a true application, the strands do some string manipulation in the messages, and generate some garbage in the process.
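To make the setup concrete, here is a miniature, threads-only sketch of such a ring. It is not the actual benchmark code: it uses plain threads and SynchronousQueues instead of strands and channels, uses the calling thread as the ring’s leader, and omits the string manipulation:

```java
import java.util.concurrent.SynchronousQueue;

public class RingDemo {
    // ringSize forwarding workers; the calling thread acts as the ring's "leader",
    // injecting a hop counter and receiving it back once per cycle.
    @SuppressWarnings("unchecked")
    static int run(int ringSize, int cycles) throws InterruptedException {
        SynchronousQueue<Integer>[] ch = new SynchronousQueue[ringSize + 1];
        for (int i = 0; i <= ringSize; i++)
            ch[i] = new SynchronousQueue<>();
        for (int i = 0; i < ringSize; i++) {
            final int in = i, out = i + 1;
            Thread worker = new Thread(() -> {
                try {
                    while (true)
                        ch[out].put(ch[in].take() + 1); // receive, then wake the next
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.setDaemon(true); // forwarders never terminate on their own
            worker.start();
        }
        int hops = 0;
        for (int c = 0; c < cycles; c++) { // the leader closes the ring
            ch[0].put(hops);
            hops = ch[ringSize].take();
        }
        return hops; // total messages passed: ringSize * cycles
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3, 5));
    }
}
```

Even this toy version exhibits the pattern the text describes: each worker is blocked until its predecessor wakes it, runs briefly, and blocks again, so the scheduler must park and unpark threads in quick succession.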

What are the results? Well, depending on the different parameters, and running on an 8-core (virtual) Ubuntu machine with Java 7, fibers beat threads in this benchmark by a 6x-12x margin, and plain threads struggled, in terms of memory consumption, once more than a few thousand of them were running.
