August 12, 2014

# Farewell to Asynchronous Code

##### By Ron

Quasar is a library that adds true lightweight threads (fibers) to the JVM. These are very cheap and very fast – in fact, fibers behave just like Erlang processes or Go goroutines – and allow you to write simple blocking code while enjoying the same performance benefits as complex asynchronous code.

In this post we’ll learn how to transform any asynchronous, callback-based API, into a nice (fiber-)blocking API. It is intended for people who wish to integrate their own – or third-party – libraries with Quasar fibers. You don’t need to know this stuff if you just use Quasar fibers with channels or actors, or make use of the many integrations already available in the Comsat project (the code presented below is code the application developer never sees). But even if you don’t, you might find this post helpful in understanding how Quasar does its magic.

## Why Async?

The reason many libraries provide asynchronous APIs in the first place is that the number of running[1] threads the OS can handle is far lower than, say, the number of open TCP connections it can maintain. Namely, your machine can support much higher concurrency than threads offer, so libraries – and the developers using them – abandon the thread as the abstraction for a unit of software concurrency[2]. Asynchronous APIs don’t block threads and can lead to significant performance gains (usually in throughput and server capacity – not so much in latency).

But using asynchronous APIs also creates code that has rightfully earned the name “callback hell”. Callback hell is bad enough in environments that lack multi-core processing, like JavaScript; it can be a lot worse in environments, like the JVM, where you need to care about memory visibility and synchronization.

Writing blocking code running on fibers gives you the same advantages as async code without the downsides: you use nice blocking APIs (you can even keep using the existing ones), but you get all the performance benefits of non-blocking code.

To be sure, asynchronous APIs have one more advantage: they allow you to dispatch several IO operations (like HTTP requests) concurrently. Because these operations usually take long to complete, and they are often independent, we can simultaneously wait for several of them to complete. This useful feature, however, is also possible with Java futures, without requiring callbacks. Later we’ll see how to make fiber-blocking futures.
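To illustrate the point, here’s how plain Java futures let us dispatch several independent operations at once and wait for the results without any callbacks – a minimal sketch using ordinary threads, with a hypothetical `slowOp` standing in for a long IO call:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrentFutures {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // Dispatch two independent (simulated) IO operations concurrently
        Future<String> f1 = pool.submit(() -> slowOp("first"));
        Future<String> f2 = pool.submit(() -> slowOp("second"));

        // Both operations are now in flight; block for each result in turn
        System.out.println(f1.get());
        System.out.println(f2.get());
        pool.shutdown();
    }

    // Hypothetical stand-in for a long-running IO call
    static String slowOp(String arg) throws InterruptedException {
        Thread.sleep(100);
        return "done: " + arg;
    }
}
```

The total wait is roughly that of the slowest operation, not the sum of both – which is all the futures-based style buys us here.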

## FiberAsync

Many modern Java IO/database libraries/drivers come with two flavors of APIs: a synchronous (thread-)blocking one, and a callback-based asynchronous one (this is true for NIO, JAX-RS client, Apache HTTP client and many more). The synchronous API is much nicer.

Quasar has a programmatic tool that transforms any callback-based asynchronous API into a nice fiber-blocking one: FiberAsync. Essentially, what FiberAsync does is block the current fiber and install the async callback; when the callback fires, it wakes the fiber up again and returns the result of the operation (or throws an exception if it failed).

To understand how to use FiberAsync, we’ll look at an example API: FooClient. FooClient is a modern IO API, so it’s got two flavors, a synchronous, thread-blocking one, and an asynchronous one. Here they are:

```java
interface FooClient {
    String op(String arg) throws FooException, InterruptedException;
}

interface AsyncFooClient {
    Future<String> asyncOp(String arg, FooCompletion<String> callback);
}

interface FooCompletion<T> {
    void success(T result);
    void failure(FooException exception);
}
```


Note how the async operation – as is the case in many modern libraries – both takes a callback and returns a future. For now, let’s ignore the future; we’ll come back to it later.

FooClient is a lot nicer and simpler than AsyncFooClient, but it blocks a thread and significantly reduces throughput. We want to create an implementation of the FooClient interface that can run in – and block – a fiber, so we get to have simple code and great throughput. To do that, we’ll employ AsyncFooClient under the hood, but turn it into a fiber-blocking FooClient. Here’s all the code we need (we’re going to simplify it further in a bit):

```java
public class FiberFooClient implements FooClient {
    private final AsyncFooClient asyncClient;

    public FiberFooClient(AsyncFooClient asyncClient) {
        this.asyncClient = asyncClient;
    }

    @Override
    @Suspendable
    public String op(final String arg) throws FooException, InterruptedException {
        try {
            return new FiberAsync<String, FooException>() {
                @Override
                protected void requestAsync() {
                    asyncClient.asyncOp(arg, new FooCompletion<String>() {
                        public void success(String result) {
                            FiberAsync.this.asyncCompleted(result);
                        }
                        public void failure(FooException exception) {
                            FiberAsync.this.asyncFailed(exception);
                        }
                    });
                }
            }.run();
        } catch (SuspendExecution e) {
            throw new AssertionError(e);
        }
    }
}
```


Now, what’s going on here? We’re implementing the FooClient interface, but we’re making op fiber-blocking rather than thread-blocking. We need to tell Quasar that our method is fiber-blocking (or “suspendable”), so we annotate it with @Suspendable.

Then, we’re subclassing FiberAsync and implementing the requestAsync method (the two generic type arguments FiberAsync takes are the return type and the type of checked exception the operation may throw, if any; for no checked exceptions, the second generic argument should be RuntimeException). requestAsync is responsible for initiating the asynchronous operation and registering the callback. The callback, then, needs to call asyncCompleted – if the operation succeeds – and pass it the result we wish to return, or asyncFailed – if the operation fails – and pass it the exception that caused the failure.

Finally, we call FiberAsync.run(). This blocks the current fiber, and calls requestAsync to install the callback. The fiber will remain blocked until the callback is fired, which will release the FiberAsync by invoking either asyncCompleted or asyncFailed. The run method also has a version that takes a timeout argument, which can be useful if we want to time-limit the blocking operation (this is a good idea in general).

One more thing to explain is the try/catch block. There are two ways of declaring a method suspendable: annotating it with @Suspendable or declaring it to throw the checked exception SuspendExecution. FiberAsync’s run method employs the latter, so in order for the code to compile, we need to catch SuspendExecution, but as it’s not a real exception, we can never actually catch it (well, at least not if Quasar is running correctly) – hence the AssertionError.

Once that’s done, you can use op in any fiber, like so:

```java
new Fiber<Void>(() -> {
    // ...
    String res = client.op("some arg");
    // ...
}).start();
```


By the way, all of this is a lot shorter with Pulsar (Quasar’s Clojure API), where the asynchronous operation:

```clojure
(async-op arg #(println "result:" %))
```

is transformed into the following synchronous, fiber-blocking code with Pulsar’s await macro:

```clojure
(println "result:" (await (async-op arg)))
```


## Simplifying and Mass-Producing

Normally, an interface like FooClient will have many methods, and normally, most of the methods in AsyncFooClient will take the same type of callback (FooCompletion). If that’s the case, we can encapsulate much of the code we’ve seen in a named subclass of FiberAsync:

```java
abstract class FooAsync<T> extends FiberAsync<T, FooException> implements FooCompletion<T> {
    @Override
    public void success(T result) {
        asyncCompleted(result);
    }

    @Override
    public void failure(FooException exception) {
        asyncFailed(exception);
    }

    @Override
    @Suspendable
    public T run() throws FooException, InterruptedException {
        try {
            return super.run();
        } catch (SuspendExecution e) {
            throw new AssertionError(e);
        }
    }

    @Override
    @Suspendable
    public T run(long timeout, TimeUnit unit) throws FooException, InterruptedException, TimeoutException {
        try {
            return super.run(timeout, unit);
        } catch (SuspendExecution e) {
            throw new AssertionError(e);
        }
    }
}
```


Note how we’ve made our FiberAsync directly implement the FooCompletion callback – that’s not required, but it’s a useful pattern. Now, our fiber-blocking op method is a lot simpler, and other operations in that interface can be implemented just as easily:

```java
@Override
@Suspendable
public String op(final String arg) throws FooException, InterruptedException {
    return new FooAsync<String>() {
        protected void requestAsync() {
            asyncClient.asyncOp(arg, this);
        }
    }.run();
}
```


Sometimes we might like our op method to be callable on regular threads as well as on fibers. By default, FiberAsync.run() throws an exception if called on a thread. To fix that, all we have to do is implement another FiberAsync method, requestSync, which calls the original synchronous API when run is invoked on a regular thread rather than a fiber. Our final code looks like this (we assume that FiberFooClient has a syncClient field of type FooClient):

```java
@Override
@Suspendable
public String op(final String arg) throws FooException, InterruptedException {
    return new FooAsync<String>() {
        protected void requestAsync() {
            asyncClient.asyncOp(arg, this);
        }
        public String requestSync() {
            return syncClient.op(arg);
        }
    }.run();
}
```


And that’s that!

## Futures

Futures are a convenient way to allow several long, independent IO operations to commence simultaneously while we wait for all of them to complete. We want our fibers to be able to block on futures. Many Java libraries return futures from their asynchronous operations, so that the user is able to choose between fully asynchronous, callback-based usage, and “semi-synchronous” usage that employs futures; our AsyncFooClient interface works just like that.

This is how we implement a version of AsyncFooClient that returns fiber-blocking futures:

```java
import co.paralleluniverse.strands.SettableFuture;

public class FiberFooAsyncClient implements AsyncFooClient {
    private final AsyncFooClient asyncClient;

    public FiberFooAsyncClient(AsyncFooClient asyncClient) {
        this.asyncClient = asyncClient;
    }

    @Override
    public Future<String> asyncOp(String arg, FooCompletion<String> callback) {
        final SettableFuture<String> future = new SettableFuture<>();
        asyncClient.asyncOp(arg, callbackFuture(future, callback));
        return future;
    }

    private static <T> FooCompletion<T> callbackFuture(final SettableFuture<T> future, final FooCompletion<T> callback) {
        return new FooCompletion<T>() {
            @Override
            public void success(T result) {
                future.set(result);
                callback.success(result);
            }

            @Override
            public void failure(FooException exception) {
                future.setException(exception);
                callback.failure(exception);
            }
        };
    }
}
```


The future we return, co.paralleluniverse.strands.SettableFuture, works equally well whether we block on it in a fiber or in a plain thread (i.e. in any type of strand).

## JDK 8’s CompletableFuture and Guava’s ListenableFuture

APIs that return CompletionStage (or CompletableFuture which implements it) – added to Java in JDK 8 – can be made fiber-blocking much more easily, with pre-built FiberAsyncs. For example,

```java
CompletableFuture<String> asyncOp(String arg);
```


is turned into a fiber-blocking call with:

```java
String res = AsyncCompletionStage.get(asyncOp(arg));
```


Methods returning Google Guava’s ListenableFuture are similarly transformed into fiber-blocking, synchronous calls, so:

```java
ListenableFuture<String> asyncOp(String arg);
```


is turned fiber-blocking with:

```java
String res = AsyncListenableFuture.get(asyncOp(arg));
```


## An Alternative to Futures

While futures are useful and familiar, we don’t really need a special API that returns them when we use fibers. Fibers are so cheap to spawn – and the Fiber class implements Future – that the fibers themselves can replace “hand-crafted” futures. Here’s an example:

```java
void work() throws ExecutionException, InterruptedException {
    Fiber<String> f1 = new Fiber<>(() -> fiberFooClient.op("first operation"));
    Fiber<String> f2 = new Fiber<>(() -> fiberFooClient.op("second operation"));
    f1.start();
    f2.start();

    String res1 = f1.get();
    String res2 = f2.get();
}
```


So fibers give us futures even when the APIs we’re using don’t.

## What If There’s No Async API?

Sometimes we unfortunately encounter a library that only provides a synchronous, thread-blocking API. JDBC is a prime example of such an API. While Quasar can’t increase the throughput of working with a library like that, making the API fiber-compatible is still worthwhile (and very easy, actually). Why? Because the fibers making the calls to the synchronous service probably do other stuff as well. In fact, they might call the service rather infrequently (consider a fiber reading data from an RDBMS only when a cache-miss occurs).

The way to achieve this is to turn the blocking API into an asynchronous one by executing the actual calls in a dedicated thread pool, and then wrapping that phony asynchronous API with FiberAsync. This process is so mechanical that FiberAsync has static methods that take care of everything for us. So assume our service only exposes the blocking FooClient API. To make it fiber-blocking, all we do is:

```java
public class SadFiberFooClient implements FooClient {
    private final FooClient client;
    private static final ExecutorService FOO_EXECUTOR = Executors.newCachedThreadPool();

    public SadFiberFooClient(FooClient client) {
        this.client = client;
    }

    @Override
    @Suspendable
    public String op(final String arg) throws FooException, InterruptedException {
        try {
            return FiberAsync.runBlocking(FOO_EXECUTOR, () -> client.op(arg));
        } catch (SuspendExecution e) {
            throw new AssertionError(e);
        }
    }
}
```


This implementation of FooClient is safe to use from both threads and fibers. In fact, when invoked on a plain thread, the method won’t bother dispatching the operation to the supplied thread pool, but will execute it on the current thread – just as would happen if we were using the original FooClient implementation.
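The thread-pool trick can be sketched in plain JDK terms (without Quasar): submit the blocking call to a dedicated pool and hand back a future – essentially the phony asynchronous API described above, and in spirit what FiberAsync.runBlocking arranges before parking the calling fiber. Here `blockingOp` is a hypothetical stand-in for the synchronous API call:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BlockingToAsync {
    private static final ExecutorService POOL = Executors.newCachedThreadPool();

    // Turn a blocking call into an asynchronous, future-returning one by
    // running it on a dedicated thread pool
    static Future<String> asyncOp(String arg) {
        return POOL.submit(() -> blockingOp(arg));
    }

    // Hypothetical stand-in for a synchronous, thread-blocking API call
    static String blockingOp(String arg) throws InterruptedException {
        Thread.sleep(50);
        return "result: " + arg;
    }

    public static void main(String[] args) throws Exception {
        Future<String> f = asyncOp("query");
        System.out.println(f.get()); // prints "result: query"
        POOL.shutdown();
    }
}
```

Note that this alone adds no capacity – a pool thread still blocks for every in-flight call; it merely gives the blocking API the asynchronous shape that FiberAsync knows how to wrap.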

## Conclusion

The techniques shown here – with FiberAsync and c.p.strands.SettableFuture – are exactly how the integration modules comprising the Comsat project work. Comsat includes integrations for Servlets, JAX-RS (server and client), JDBC, JDBI, jOOQ, MongoDB, Retrofit and Dropwizard.

It is important to see how – to make simple and performant fiber-blocking APIs – we’ve indeed re-implemented the API interfaces, but not their inner workings: the original library code is still used, only through its async API, whose ugliness is now hidden from the library consumer.

There are ways other than fibers to deal with callback hell. The best known mechanisms in the JVM world are Scala’s composable futures, RxJava’s observables, and JDK 8’s CompletionStage/CompletableFuture. These are all examples of monads and monadic composition. Monads work, and some people enjoy using them, but I think that they’re the wrong way to go for most programming languages.
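For a taste of what such monadic composition looks like, here is JDK 8’s CompletableFuture chaining two dependent asynchronous steps without nested callbacks – a minimal sketch, where `fetchName` is a hypothetical asynchronous lookup:

```java
import java.util.concurrent.CompletableFuture;

public class MonadicComposition {
    public static void main(String[] args) {
        // "Do this and then do that", expressed as monadic composition:
        CompletableFuture<String> result =
            CompletableFuture.supplyAsync(() -> "user-42")   // async step 1
                .thenCompose(MonadicComposition::fetchName)  // then async step 2
                .thenApply(name -> "hello, " + name);        // then a pure transformation
        System.out.println(result.join());
    }

    // Hypothetical stand-in for an asynchronous lookup
    static CompletableFuture<String> fetchName(String id) {
        return CompletableFuture.supplyAsync(() -> "owner of " + id);
    }
}
```

Each `then*` stage is a composition step over the future rather than an ordinary sequential statement – which is exactly the foreignness the following paragraphs discuss.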

You see, monads are borrowed from programming languages based on the lambda calculus. Lambda calculus is a theoretical model of computation, completely different from, yet entirely analogous to the Turing machine. But unlike the Turing machine model, lambda calculus computations don’t have a notion of steps, actions or states. Those computations don’t do anything; they just are. Monads, then, are a way for LC-based languages like Haskell to describe action, state, time etc. as pure computation. They are a way for a LC language to tell the computer “do this and then do that”.

Thing is, imperative languages already have an abstraction for “do this and then do that”, and that abstraction is the thread. Not only that, but they usually have a very simple notation for it: the statement for this followed by the statement for that. The only reason imperative languages even consider adopting such a foreign concept is that the implementation of threads (by the OS kernel) is less than satisfactory. But rather than adopt a foreign, unfamiliar concept – and one that requires completely different types of APIs – it’s better to fix the implementation of threads than to embrace a similar, yet subtly different abstraction. Fibers keep the abstraction and fix the implementation.

Another problem with monads in languages like Java and Scala is that those languages are not only imperative, but allow unrestricted shared-state mutation and side effects – something Haskell doesn’t have. The combination of unrestricted shared-state mutation and “thread” monads can be disastrous. In a pure FP language – because side effects are controlled – a unit of computation, namely a function, is also a unit of concurrency: you can safely execute any pair of functions concurrently. This is not the case when you have unrestricted side effects. The order of function execution, whether or not two functions can be executed simultaneously, and if and when a function can observe the shared-state mutations performed by another, are all serious concerns. As a result, functions running as part of “thread” monads need to either be pure (with no side effects whatsoever) or be very, very careful about how they perform those side effects. This is exactly what we’re trying to avoid. So while monadic compositions do produce much nicer code than callback hell, they don’t address any of the concurrency issues introduced by asynchronous code.

P.S.

The previous section should not be read as an endorsement of pure “FP” languages like Haskell, because I actually think they introduce too many other problems. I believe the (near) future is imperative languages[3] that will allow shared-state mutation but with some transactional semantics. I believe those future languages will take their inspiration mostly from languages like Clojure and Erlang.


1. By running I mean threads that are runnable often enough.

2. Whether or not they are “functional” is a hard question as no one has come up with a good definition for what a functional programming language is and what differentiates it from non-functional languages.