February 20, 2014

# Reactive (Dataflow) Programming in Java and Clojure with Quasar and Pulsar

##### By Ron

Reactive programming is a programming paradigm that helps programmers process data that flows into the system or is computed by the system asynchronously. Reactive programming achieves this through constructs that organize concurrency into a simple, understandable form. But before we go on to show how reactive programming is used, it’s important to clarify what problem it solves. To do that, we need to better define two concepts that can sometimes be used interchangeably: parallelism and concurrency.

When contrasting the two, parallelism is usually defined as using multiple processing cores (in parallel, of course) to expedite a computation (say, like inverting a matrix). Parallelism is an implementation detail of an algorithm used to solve a problem. It is achieved by splitting the data into small bits and feeding them to as many processors as we have available. When talking about parallelism (sometimes better described as data parallelism), the various software and hardware threads cooperate to solve the problem at hand. Clojure’s reducers and Java 8’s streams provide transparent parallelism as an implementation detail of various data-manipulation operations.
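
To make the "implementation detail" point concrete, here is a minimal sketch using only the JDK's streams. The class and method names are made up for this illustration; the only difference between the two methods is the call to `parallel()`, and both produce the same result.

```java
import java.util.stream.LongStream;

// Hypothetical illustration class; not part of Quasar or Pulsar.
class ParallelSumSketch {
    // Sum of squares 1..n, computed on the calling thread.
    static long sequential(long n) {
        return LongStream.rangeClosed(1, n).map(x -> x * x).sum();
    }

    // Same computation; the stream library may split the range across cores.
    static long parallel(long n) {
        return LongStream.rangeClosed(1, n).parallel().map(x -> x * x).sum();
    }

    public static void main(String[] args) {
        System.out.println(sequential(1000) == parallel(1000)); // prints "true"
    }
}
```

The caller cannot tell which version ran, which is exactly why this kind of parallelism can stay hidden inside a library.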

Concurrency, on the other hand, is not a feature of the algorithm used to solve a problem but one of the problem itself (say, like routing all the tweets flowing concurrently into Twitter). Concurrency concerns itself with managing many concurrent computations, each processing data that can arrive at any time. When dealing with concurrent computations, it is often beneficial to use multiple processing cores as well, only in that case the various software and hardware threads compete with each other over system resources rather than cooperate. So, to sum up, parallelism: implementation detail, cooperation; concurrency: nature of the domain, competition.

Unlike data parallelism, which can often be completely hidden away from the programmer, concurrency requires the programmer to adopt a programming paradigm suited to the problem. Reactive programming (sometimes also called dataflow programming) is one such paradigm. In this blog post we’ll explore how JVM lightweight threads, combined with reactive constructs, let us work with asynchronous data using Quasar and its Clojure API, Pulsar. Reactive programming can be used either in a traditional, imperative style, or in a functional style. We’ll try both and show how each has its strengths and weaknesses.

The main idea behind reactive programming is explicitly modeling the flow of data through the application in such a way that separates the code from concerns about when its input data arrives. This way, various pieces of data arrive asynchronously, some concurrently, and the code that processes them is oblivious to when they were produced.

Quasar’s reactive constructs achieve their simplicity and performance first and foremost through the use of fibers, which are true lightweight threads. Fibers are similar to regular Java threads, except that regular threads are managed directly by the OS kernel, while fibers incur very little RAM and performance overhead, so much so that you needn’t think twice before launching a new fiber. Fibers are so lightweight that you can easily have hundreds of thousands of them, or even millions, running concurrently in a single JVM instance.

While the code here will make use of fibers, it’s important to note that all of Quasar’s reactive constructs work equally well when used on plain Java threads (that’s because fibers and threads are both abstracted by Quasar into a single abstraction called strands).

To make the Java code more concise, we’ll use Java 8 lambdas (supported by the current dev version of Quasar, 0.5.0-SNAPSHOT), but they work equally well with the familiar Java anonymous classes. Also, the code examples below require Quasar/Pulsar 0.5.0 (the development version), but the same effects could be achieved equally well with the 0.4.0 release, albeit with a slightly less convenient API.

## DelayedVals and Promises

The first reactive construct used by Quasar is the DelayedVal. A DelayedVal is a constant value that is computed (once) by the program (or is an input to it), at an unknown time. The DelayedVal is set once at any point in the program’s lifetime, but when you want to use it, you need not concern yourself with whether the value has been set yet or not. If it has not been set, the strand (fiber or thread) trying to read it will block until the value is available (if it sounds like a future, that’s because it is; DelayedVal implements the Future interface).

Here’s a Java example using fibers and DelayedVals:

DelayedVal<Integer> x = new DelayedVal<>();
DelayedVal<Integer> y = new DelayedVal<>();

new Fiber(() -> {
System.out.println("x + y = " + (x.get() + y.get()));
}).start();

new Fiber(() -> {
Strand.sleep(1000);
x.set(5);
}).start();

new Fiber(() -> {
Strand.sleep(400);
y.set(10);
}).start();


Running this code will print x + y = 15. Note how the first fiber simply reads x and y, even though their values are set by two different fibers at two different times. Because we’re using fibers, blocking until the DelayedVal is set carries no noticeable overhead at all.
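
If you don't have Quasar at hand, the set-once, block-until-ready behavior can be approximated with the JDK's `CompletableFuture`. This is only an analogy, not how DelayedVal is implemented: the class and helper names below are made up, and the reader here blocks an OS thread rather than a fiber.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical JDK-only analogy for a set-once value; not Quasar's DelayedVal.
class SetOnceSketch {
    static int demo() {
        CompletableFuture<Integer> x = new CompletableFuture<>();
        CompletableFuture<Integer> y = new CompletableFuture<>();

        // Two producers set the values at different times.
        new Thread(() -> { pause(100); x.complete(5); }).start();
        new Thread(() -> { pause(40); y.complete(10); }).start();

        // The reader simply asks for both values; join() blocks until each one is set.
        int sum = x.join() + y.join();

        // The value is set once: a later write is rejected and returns false.
        if (x.complete(99)) throw new IllegalStateException("x was set twice");
        return sum;
    }

    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("x + y = " + demo()); // prints "x + y = 15"
    }
}
```

The reading code has the same shape as the fiber version above: it never checks whether a value is ready, it just reads it.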

DelayedVals are very similar to Clojure promises, so in Pulsar, DelayedVals are defined with the co.paralleluniverse.pulsar.core/promise function, which follows the same API as clojure.core/promise, only it supports fibers as well as threads. Here’s an example in Clojure:

(use 'co.paralleluniverse.pulsar.core)

(let [v1 (promise)
v2 (promise)
v3 (promise)
v4 (promise)
f1 (spawn-fiber  #(deliver v2 (+ @v1 1)))
t1 (spawn-thread #(deliver v3 (+ @v1 @v2)))
f2 (spawn-fiber  #(deliver v4 (+ @v3 @v2)))]
(sleep 50)
(deliver v1 1)
@v4) ; => 5


Notice how this Clojure example uses fibers as well as regular threads.

But Pulsar’s promises have one additional, handy, feature. If you pass an optional function to promise, a new fiber running that function will be spawned, and the promise will receive the value returned from the function. Here’s an example:

(use 'co.paralleluniverse.pulsar.core)

(let [v0 (promise)
v1 (promise)
v2 (promise #(+ @v1 1))
v3 (promise #(+ @v1 @v2))
v4 (promise #(* (+ @v3 @v2) @v0))]
(sleep 50)
(deliver v1 1)
(deliver v0 2)
@v4) ; => 10


A similar feature has been added to the Java API in the current dev version:

DelayedVal<Integer> x = new DelayedVal<>();
DelayedVal<Integer> y = new DelayedVal<>();
DelayedVal<Integer> z = new DelayedVal<>(() -> x.get() + y.get());

new Fiber(() -> {
System.out.println("x + y = " + z.get());
}).start();

new Fiber(() -> {
Strand.sleep(1000);
x.set(5);
}).start();

new Fiber(() -> {
Strand.sleep(400);
y.set(10);
}).start();
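
For comparison, a rough JDK-only counterpart of a value defined by a function is `CompletableFuture.supplyAsync`. Again, this is a sketch under the assumption that a pool thread stands in for the fiber; the class name is hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical JDK-only analogy; the supplier runs on a pool thread, not a fiber.
class DerivedValueSketch {
    static int demo() {
        CompletableFuture<Integer> x = new CompletableFuture<>();
        CompletableFuture<Integer> y = new CompletableFuture<>();

        // z is computed by a function that runs on another thread and blocks on x and y.
        CompletableFuture<Integer> z = CompletableFuture.supplyAsync(() -> x.join() + y.join());

        y.complete(10);
        x.complete(5);
        return z.join(); // available only after both inputs have been set
    }

    public static void main(String[] args) {
        System.out.println("x + y = " + demo()); // prints "x + y = 15"
    }
}
```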


## Channels, Topics and Tickers

Because they can only represent constant values, DelayedVals/promises can only get us so far. We also need the ability to represent values that change over time (I hope Rich Hickey forgives me for using the word “value” in a non-rigorous manner). Quasar channels (modelled after Go channels) can be used to represent a sequence of changing values, with each message sent to the channel corresponding to an update event. But Quasar’s channels are not enough by themselves, as they are usually read by a single consumer, while we’d like to publish the changing value to any interested party. To do that, we’ll use topics. Topics are a special kind of channel that replicates values written to it and sends them to any number of subscribed channels. Subscribers can join and leave the topic at any time.
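
The fan-out idea itself is small enough to sketch with plain JDK queues. The class below is a hypothetical stand-in, not Quasar's `Topic`: it assumes each subscriber is a `BlockingQueue`, and it blocks OS threads where Quasar would block fibers.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical JDK-only sketch of topic-style fan-out; not Quasar's Topic class.
class TopicSketch<T> {
    private final List<BlockingQueue<T>> subscribers = new CopyOnWriteArrayList<>();

    // Subscribers can join at any time...
    BlockingQueue<T> subscribe(BlockingQueue<T> queue) {
        subscribers.add(queue);
        return queue;
    }

    // ...and leave at any time.
    void unsubscribe(BlockingQueue<T> queue) {
        subscribers.remove(queue);
    }

    // Copies the message to every subscriber; blocks while any subscriber's queue is full.
    void send(T message) throws InterruptedException {
        for (BlockingQueue<T> queue : subscribers)
            queue.put(message);
    }

    static String demo() {
        TopicSketch<Integer> topic = new TopicSketch<>();
        BlockingQueue<Integer> a = topic.subscribe(new ArrayBlockingQueue<>(4));
        BlockingQueue<Integer> b = topic.subscribe(new ArrayBlockingQueue<>(4));
        try {
            topic.send(1);
            topic.send(2);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return a + " " + b;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[1, 2] [1, 2]"
    }
}
```

Note that `send` waits on every subscriber in turn, so in this sketch a full subscriber queue holds up delivery to everyone else.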

Let’s look at an example that uses both topics and DelayedVals:

DelayedVal<Integer> x = new DelayedVal<>();
Topic<Integer> t = new Topic<>();

Fiber f1 = new Fiber(() -> {
Channel<Integer> c = t.subscribe(Channels.newChannel(0)); // a regular channel with no internal buffer
for (Integer m; (m = c.receive()) != null;)
System.out.println("=> " + (m + x.get()));
}).start();

Fiber f2 = new Fiber(() -> {
try {
IntChannel c = t.subscribe(Channels.newIntChannel(3)); // a primitive int channel with a buffer of size 3
for (;;)
System.out.println("-> " + (x.get() * c.receiveInt()));
} catch (ReceivePort.EOFException e) {
// the topic was closed; nothing more to read
}
}).start();

// some time in the future, we'll set x
new Fiber(() -> {
Strand.sleep(1000);
x.set(5);
}).start();

// also, t changes from time to time
new Fiber(() -> {
for (int i = 0; i < 10; i++) {
Strand.sleep(100);
t.send(ThreadLocalRandom.current().nextInt(100)); // publish a random value to the topic
}
t.close();
}).start();

f1.join();
f2.join();


Just for fun, the first fiber subscribes a regular generic channel to the topic, while the second subscribes a primitive int channel.

Here’s the exact same example in Clojure (remember, we’re still using the imperative style):

(use 'co.paralleluniverse.pulsar.core)
(import 'co.paralleluniverse.strands.channels.ReceivePort$EOFException)

(let [x (promise)
t (topic)
f1 (spawn-fiber (fn []
(let [c (subscribe! t (channel 0))]
(loop []
(when-let [m (rcv c)]
(println "=>" (+ m @x))
(recur))))))
f2 (spawn-fiber (fn []
(let [c (subscribe! t (int-channel 3))]
(try
(loop []
(println "->" (* @x (rcv-int c)))
(recur))
(catch ReceivePort$EOFException e)))))]
(spawn-fiber (fn []
(sleep 1000)
(deliver x 5)))
(spawn-fiber (fn []
(dotimes [i 10]
(sleep 100)
(snd t (rand-int 100)))
(close! t)))

(join [f1 f2]))


There is one problem with topics, though: as channels may have a limited internal buffer (or even no buffer at all) and their default behavior is to block the producer when the buffer is full, a single slow subscriber might slow down the entire topic (which ensures that all subscribers receive all messages). In fact, the topic’s throughput is determined by that of its slowest subscriber. Sometimes, this behavior is not what we want. We’d rather have slow consumers miss a few updates than slow down everyone else. That’s why we have ticker channels.

A ticker channel is a channel with a bounded buffer that employs a policy that replaces the oldest value in the buffer with the newly added value if the buffer is full (basically, it’s a circular buffer). A ticker channel may have multiple consumers, each maintaining their own position in the buffer. All consumers will then see the channel’s values in the order they are sent to the channel, but a slow consumer might miss some of them.
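
The mechanics are easy to see in a toy version. The class below is a hypothetical sketch of the idea, not Quasar's implementation: it stores ints in a ring, lets each consumer keep its own cursor, and uses a non-blocking `poll` where a real ticker consumer would block until a value arrives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a ticker-style ring buffer; not Quasar's implementation.
class TickerSketch {
    private final int[] ring;
    private long written = 0; // total number of values ever sent

    TickerSketch(int capacity) {
        ring = new int[capacity];
    }

    // Never blocks: once the ring is full, the oldest value is overwritten.
    synchronized void send(int value) {
        ring[(int) (written++ % ring.length)] = value;
    }

    // Each consumer keeps its own read position in the shared ring.
    class Consumer {
        private long next = 0;

        // Returns the next unseen value, or null if this consumer is caught up.
        Integer poll() {
            synchronized (TickerSketch.this) {
                if (next == written) return null;
                long oldest = Math.max(0, written - ring.length);
                if (next < oldest) next = oldest; // fell behind: displaced values are lost
                return ring[(int) (next++ % ring.length)];
            }
        }
    }

    // A fast consumer reads after every send; a slow one reads only at the end.
    static List<List<Integer>> demo() {
        TickerSketch ticker = new TickerSketch(3);
        Consumer fast = ticker.new Consumer();
        Consumer slow = ticker.new Consumer();
        List<Integer> fastSeen = new ArrayList<>();
        List<Integer> slowSeen = new ArrayList<>();
        for (int v = 1; v <= 5; v++) {
            ticker.send(v);
            fastSeen.add(fast.poll());
        }
        for (Integer v; (v = slow.poll()) != null;)
            slowSeen.add(v);
        return Arrays.asList(fastSeen, slowSeen);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[[1, 2, 3, 4, 5], [3, 4, 5]]"
    }
}
```

Both consumers see values in the order they were sent, but the slow one has lost 1 and 2, which were displaced before it got around to reading.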

Let’s look at a Clojure example (Java code is similar):

(let [t (channel 10 :displace)
x (promise)
f1 (spawn-fiber (fn []
(let [c (ticker-consumer t)]
(loop []
(when-let [m (rcv c)]
(println "=>" (+ m @x))
(recur))))))
f2 (spawn-fiber (fn []
(let [c (ticker-consumer t)]
(loop []
(when-let [m (rcv c)]
(println "->" (+ m @x))
(sleep 500) ; slow consumer
(recur))))))]
(spawn-fiber (fn []
(sleep 1000)
(deliver x 5)))
(spawn-fiber (fn []
(dotimes [i 100]
(sleep 100)
(snd t (rand-int 100)))
(close! t)))

(join [f1 f2]))


The second fiber takes its time, and sleeps for 500ms between channel reads. Nevertheless, the producer and the first fiber move along at full speed. The result is that the second fiber simply processes fewer values.

## Propagating errors

Before we move on to the functional style, let’s take a look at a slightly more complex example. Suppose we have a stream of floating-point values flowing into the system, and we’d like to emit their sliding-window running average. We also don’t care if some of the values are lost to the average computation because we want an approximate average, but we don’t want a slow computation to slow down the flow, so we’ll use a ticker channel. Here’s the complete Java example (Clojure code would be similar):

final int PORT = 1234;

DoubleChannel t = Channels.newDoubleChannel(100, Channels.OverflowPolicy.DISPLACE);

// This fiber reads values off a socket and sends them to the ticker channel, t
new Fiber(() -> {
try (FiberServerSocketChannel socket = FiberServerSocketChannel.open().bind(new InetSocketAddress(PORT));
FiberSocketChannel ch = socket.accept()) {
ByteBuffer buf = ByteBuffer.allocateDirect(100);

for (;;) {
buf.clear();
int n = ch.read(buf); // blocks the fiber (but not the OS thread)
assert n == 8; // we assume the message is sent in a single packet

buf.flip();
t.send(buf.getDouble());
}
} catch (Exception e) {
t.close(e); // propagate exception to consumers
}
}).start();

Channel<Double> avg = Channels.newChannel(10);

// This fiber computes a sliding-window running average, and publishes the results to channel avg
final int WINDOW_SIZE = 10;
new Fiber(() -> {
try {
DoubleReceivePort c = Channels.newTickerConsumerFor(t); // this fiber's own consumer of the ticker channel t
double[] window = new double[WINDOW_SIZE];
long i = 0;
for (;;) {
window[(int) (i++ % WINDOW_SIZE)] = c.receiveDouble(); // throws EOFException once t is closed normally

double sum = 0.0;
for (double x : window)
sum += x;
double m = sum / (Math.min(WINDOW_SIZE, i));

avg.send(m);
}
} catch (ReceivePort.EOFException e) {
avg.close(); // normal end of input
} catch(ProducerException e) {
avg.close(e.getCause()); // propagate exception to consumers
}
}).start();

// This fiber prints out the running average
new Fiber(() -> {
try {
for (;;) {
Double x = avg.receive(); // null once avg is closed
if (x == null)
break;
System.out.println("==> " + x);
}
} catch (ProducerException e) {
System.out.println("Producer error: " + e.getCause().getClass().getName() + "(" + e.getCause().getMessage() + ")");
}
}).start();

// This fiber prints out the raw values
new Fiber(() -> {
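try {
DoubleReceivePort c = Channels.newTickerConsumerFor(t); // this fiber's own consumer of t
for (;;) {
Double x = c.receive(); // null once t is closed
if (x == null)
break;
System.out.println("-> " + x);
}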
} catch (ProducerException e) {
System.out.println("Producer error: " + e.getCause().getClass().getName() + "(" + e.getCause().getMessage() + ")");
}
}).start();


First, notice how the first fiber uses the familiar Java NIO API, except that the blocking reads simply block the fiber rather than the OS thread (this is accomplished by using asynchronous IO under the covers). Also, note how any IO exception is propagated down the sequence of channels with channel.close(exception). This “close-with-exception” operation (added in 0.5.0) lets us cleanly detect and report errors. If you want to automate recovery from errors, you might want to consider using Quasar actors, which add powerful fault-tolerance capabilities (actors can read and write to channels just like any fiber can).
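
To show the shape of the close-with-exception idea without Quasar, here is a small JDK-only sketch. Everything in it is hypothetical: the class name is made up, it assumes a single consumer, and it uses `IllegalStateException` as a stand-in for Quasar's `ProducerException`.

```java
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical single-consumer sketch; not Quasar's channel implementation.
class CloseWithErrorSketch<T> {
    private static final Object EOF = new Object(); // marker for a normal close

    private static final class Failure { // marker for a close caused by an error
        final Throwable cause;
        Failure(Throwable cause) { this.cause = cause; }
    }

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    void send(T value) { queue.add(value); }
    void close() { queue.add(EOF); }
    void close(Throwable cause) { queue.add(new Failure(cause)); }

    // Returns the next value, null after a normal close, or throws after close(cause).
    @SuppressWarnings("unchecked")
    T receive() throws InterruptedException {
        Object item = queue.take();
        if (item == EOF) {
            queue.add(item); // keep the marker so later calls also see the close
            return null;
        }
        if (item instanceof Failure) {
            queue.add(item);
            throw new IllegalStateException("producer failed", ((Failure) item).cause);
        }
        return (T) item;
    }

    static String demo() {
        CloseWithErrorSketch<Double> ch = new CloseWithErrorSketch<>();
        ch.send(1.5);
        ch.close(new IOException("connection reset"));
        StringBuilder log = new StringBuilder();
        try {
            log.append(ch.receive()); // the value sent before the failure
            log.append(ch.receive()); // the producer's failure surfaces here
        } catch (IllegalStateException e) {
            Throwable cause = e.getCause();
            log.append(" then ").append(cause.getClass().getSimpleName())
               .append("(").append(cause.getMessage()).append(")");
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "1.5 then IOException(connection reset)"
    }
}
```

The consumer still receives everything sent before the failure, and only then sees the error, with the original cause attached.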

The average-computing fiber transforms the value stream from one channel and writes it to another. This is a common-enough pattern that we’ve added a simpler API for it (also, just for fun, let’s use the new Java 8 stream API to calculate the average). The fiber could also be written like this (propagating errors and closing the output channel are handled for us):

Channels.fiberTransform(Channels.newTickerConsumerFor(t), avg,
(DoubleReceivePort in, SendPort<Double> out) -> {
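double[] window = new double[WINDOW_SIZE];
long i = 0;
for (;;) {
window[(int) (i++ % WINDOW_SIZE)] = in.receiveDouble();
// average only the slots filled so far, as the explicit loop in the previous listing does
out.send(Arrays.stream(window, 0, (int) Math.min(WINDOW_SIZE, i)).average().getAsDouble());
}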
});


Again, we used both primitive (double) channels and generic (auto-boxing) channels – the two can be used interchangeably.
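
The windowed average at the heart of this example has nothing to do with channels, so it can be pulled out and checked on its own. The class below is a hypothetical sketch of that calculation; it divides by the number of slots filled so far, so the first few results are not dragged toward zero by empty slots.

```java
// Hypothetical helper isolating the sliding-window average; not part of Quasar.
class SlidingAverageSketch {
    private final double[] window;
    private long count = 0; // number of values seen so far

    SlidingAverageSketch(int size) {
        window = new double[size];
    }

    // Stores the value over the oldest slot and returns the mean of the filled slots.
    double add(double value) {
        window[(int) (count++ % window.length)] = value;
        int filled = (int) Math.min(window.length, count);
        double sum = 0.0;
        for (int i = 0; i < filled; i++)
            sum += window[i];
        return sum / filled;
    }

    public static void main(String[] args) {
        SlidingAverageSketch avg = new SlidingAverageSketch(3);
        System.out.println(avg.add(3));  // prints "3.0"
        System.out.println(avg.add(5));  // prints "4.0"
        System.out.println(avg.add(10)); // prints "6.0"
        System.out.println(avg.add(2));  // the window now holds 2, 5 and 10
    }
}
```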

This example, like all previous ones, could have used plain threads instead of fibers (simply changing new Fiber to new Thread everywhere would do the trick), but threads place a significant burden on the system, which might become a serious bottleneck, while fibers consume fewer resources and suffer a much lower scheduling latency than kernel threads.

This imperative-reactive style, based on channels and lightweight threads, is also called Communicating Sequential Processes (CSP). That name describes the same thing but focuses on how it’s coded rather than on what it tries to achieve, namely that the code does not care when values are produced, and is thus separated from complex concurrency concerns. This is the essence of reactive programming.

## Functional Reactive Programming

The previous example may have hinted at the fact that applying transformations to “reactive values” is very common. Functional Reactive Programming, or FRP, takes functional programming’s use of operators (higher-order functions) to transform data and applies it to the reactive programming model.

Here’s a simple Java example using the map operator:

Topic<Integer> t = new Topic<>();

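new Fiber(() -> {
for (int i = 0; i < 10; i++) {
Strand.sleep(100);
t.send(i); // publish the next value
}
t.close();
}).start();

new Fiber(() -> {
Channel<Integer> c = t.subscribe(Channels.newChannel(0)); // consume the values as published
for (Integer m; (m = c.receive()) != null;)
System.out.println("-> " + m);
}).start();

new Fiber(() -> {
ReceivePort<String> c = Channels.map(t.subscribe(Channels.<Integer>newChannel(0)),
(Integer x) -> "my number is: " + x); // transform the channel
for (String m; (m = c.receive()) != null;)
System.out.println("=> " + m);
}).start();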


While the first consumer fiber reads the values as they are published by the topic, the second transforms the integer channel into a string channel using map.

Now let’s look at a more complex Clojure example:

(use 'co.paralleluniverse.pulsar.core)
(require '[co.paralleluniverse.pulsar.rx :as rx])

(let [numbers (topic)
letters (topic)

f (spawn-fiber ; consumer
(fn []
(let [c (->>
(rx/zip (->>
(subscribe! letters (channel 0))
(rx/mapcat #(repeat 3 %)))
(->>
(subscribe! numbers (channel 0))
(rx/filter odd?)
(rx/mapcat #(list % (* 10 %) (* 100 %)))))
(rx/map (fn [[c n]] (str "letter: " c " number: " n))))]
(loop []
(when-let [m (rcv c)]
(println "=> " m )
(recur))))))]
; spawn two producers
(spawn-fiber (fn []
(doseq [x (seq "abcdefghijklmnopqrstuvwxyz")]
(sleep 50)
(snd letters x))
(close! letters)))

(spawn-fiber (fn []
(doseq [x (range 1000)]
(sleep 70)
(snd numbers x))
(close! numbers)))

(join [f]))


Let’s see what’s going on here. The two producer fibers emit a sequence of numbers and the letters of the alphabet on two different topics, at different rates.

The consumer subscribes to both topics, and transforms each. It repeats each letter three times, and turns each odd number (it filters out the even ones) into a sequence of three numbers (the original number, the number multiplied by 10, and the number multiplied by 100). Then it zips the two channels together into a channel of vectors, each containing a letter and a number. Finally, it maps those vectors into a string.

The consumer doesn’t care that the values are produced at different rates. It treats the topic channels as a kind of changing quantities that it can transform without regard to concurrency concerns at all. In a sense, it treats asynchronous streams of data as values to manipulate.
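
To see what that pipeline produces, it helps to run the same transformations over plain lists, where timing plays no part. The sketch below uses only JDK streams and a made-up class name; unlike the channel version, which waits until both sides have a value, it simply pairs elements by position and stops at the shorter list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical list-based rendering of the channel pipeline; no concurrency involved.
class ZipPipelineSketch {
    static List<String> run(List<Character> letters, List<Integer> numbers) {
        // Repeat each letter three times.
        List<Character> ls = letters.stream()
                .flatMap(c -> Stream.of(c, c, c))
                .collect(Collectors.toList());
        // Keep odd numbers and expand each into n, 10n and 100n.
        List<Integer> ns = numbers.stream()
                .filter(n -> n % 2 != 0)
                .flatMap(n -> Stream.of(n, n * 10, n * 100))
                .collect(Collectors.toList());
        // Zip: pair elements by position, then map each pair to a string.
        List<String> out = new ArrayList<>();
        for (int i = 0; i < Math.min(ls.size(), ns.size()); i++)
            out.add("letter: " + ls.get(i) + " number: " + ns.get(i));
        return out;
    }

    public static void main(String[] args) {
        run(Arrays.asList('a', 'b'), Arrays.asList(0, 1, 2, 3, 4))
                .forEach(System.out::println);
    }
}
```

With letters a and b and numbers 0 through 4, this yields six lines, from "letter: a number: 1" to "letter: b number: 300". The channel version emits the same pairs, only spread out over time as the values arrive.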

Java 8’s lambdas make writing functional-style code a viable option. For example, while writing the functional channel transformations in the last example in Java prior to version 8 would have been messy, here is what it looks like in Java 8:

// with import static co.paralleluniverse.strands.channels.Channels.*

new Fiber(() -> {
ReceivePort<String> c = zip(
transform(numbers.subscribe(newChannel(0)))
.filter(x -> x % 2 != 0)
.flatmap(x -> toReceivePort(Arrays.asList(x, x * 10, x * 100))),

transform(letters.subscribe(newChannel(0)))
.flatmap(x -> toReceivePort(Arrays.asList(x, x, x))),

(num, letter) -> "letter: " + letter + " number: " + num);

for (String m; (m = c.receive()) != null;)
System.out.println("-> " + m);
}).start();


Functional reactive programming often makes manipulation and composition of asynchronous data simple and clear. However, FRP relies on functional operators that at times may be cumbersome, and sometimes, if the right operator is missing, writing your own might be hard without knowing the details of how concurrency is treated, especially if the operator has internal state. An example of a transformation with internal state is the running average we showed earlier. Another may be one that simply polls every fifth value:

Channel<Integer> polled = newChannel(0);
Channels.fiberTransform(numbers.subscribe(newChannel(0)), polled,
(ReceivePort<Integer> in, SendPort<Integer> out) -> {
long i = 0;
for (;;) {
Integer v = in.receive(); // null once the input channel is closed
if (v == null)
break;
if (i++ % 5 == 0)
out.send(v);
}
});


Or even one that emits a ticker channel’s values more slowly:

Channel<Integer> polled = newChannel(0);
Channels.fiberTransform(Channels.newTickerConsumerFor(t), polled,
(ReceivePort<Integer> in, SendPort<Integer> out) -> {
Integer v;
while ((v = in.receive()) != null) {
out.send(v);
Strand.sleep(500);
}
});


Performing these transformations with FRP alone requires either just the right kind of pre-built operator, or the use of a cumbersome state monad. But if imperative RP is available to us, we can use it to complement the functional style.

There are certainly times when the functional style is clearer, and times when the imperative style is much simpler. Imperative code can also be used to write new functional stateful operators. Sometimes people are forced to use the functional style to avoid blocking, but because lightweight threads make blocking cheap, we can choose the right tool for the job.

## Reactive UI

Most UI toolkits, like Android or even OpenGL (well, not strictly a UI toolkit), require that all updates to the rendered graphics occur in a special UI thread. You’d think that we can’t directly use imperative reactive programming as we’ve done so far on the UI thread because we’re not allowed to block the UI thread. While we can’t directly read DelayedVals or channels on the UI thread, we can actually do something better: we can schedule fibers to execute on the UI thread. Those fibers can block or sleep all they want without blocking the UI thread underneath (this capability was added in the latest Quasar release, but it was buggy, and has since been fixed).

While Android support for Quasar is planned (probably in version 0.7.0), Quasar does not support Android just yet. So to demonstrate reactive UI, we’ll use Swing:

public static void main(String[] args) throws Exception {
// first, let's create the Swing UI:

final JFrame frame = new JFrame("Reactive Window"); // a window
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
final JPanel panel = new JPanel();
final JLabel label1 = new JLabel(); // label 1
final JLabel label2 = new JLabel(); // label 2
label1.setFont(label1.getFont().deriveFont(100.0f)); // large font
label2.setFont(label2.getFont().deriveFont(100.0f));
panel.add(label1); // put both labels in the window
panel.add(label2);
frame.add(panel);

SwingUtilities.invokeLater(new Runnable() {
@Override
public void run() {
frame.setMinimumSize(new Dimension(600, 300));
frame.pack();
frame.setVisible(true);
}
});

// Now we get to the reactive stuff:

Channel<Integer> t = Channels.newChannel(1, Channels.OverflowPolicy.DISPLACE);
DelayedVal<String> x = new DelayedVal<>();

// publish lots of numbers:
new Fiber(() -> {
for (int i = 0;i < 100000; i++) {
Strand.sleep(10);
t.send(i);
}
}).start();

// We'll make x available only 5 seconds from now:
new Fiber(() -> {
Strand.sleep(5, TimeUnit.SECONDS);
x.set("!!!");
}).start();

// Now let's create a fiber scheduler that runs fibers on the UI thread
FiberScheduler UIScheduler = new FiberExecutorScheduler("UI-fiber-scheduler", new Executor() {
@Override
public void execute(Runnable command) {
EventQueue.invokeLater(command);
}
});

// set text for the first label
new Fiber(UIScheduler, () -> {
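ReceivePort<Integer> c = Channels.newTickerConsumerFor(t); // this fiber's own consumer of t
Integer num;
while ((num = c.receive()) != null) {
// we're running on the UI thread, so updating the label here is safe...
label1.setText("foo: " + num);
Strand.sleep(100); // ... yet we can sleep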
}
}).start();

new Fiber(UIScheduler, () -> {
ReceivePort<Integer> c = Channels.newTickerConsumerFor(t); // this fiber's own consumer of t
Integer num;
while ((num = c.receive()) != null) {
// we block until x is available (in 5 seconds), even though we're on the UI thread
label2.setText("bar: " + num + x.get());
Strand.sleep(500); // update the second label more slowly
}
}).start();
}


Here’s a screenshot (taken after 5 seconds, once the second label’s text was ready):

The two UI fibers loop, block and sleep without affecting the responsiveness of the UI at all (and yes, we could also perform lengthy fiber-blocking IO in those UI fibers). Such is the power of lightweight threads combined with reactive programming!

## Other JVM Reactive Programming Libraries

Netflix’s RxJava, based on .NET’s Reactive Extensions, is a functional (only) reactive programming library using “observables” as the core construct. It has a large selection of functional operators and specialized APIs for various JVM languages. rxjava-quasar is an (as yet unreleased) integration module for RxJava and Quasar. It provides dead-simple conversions from Quasar channels to Observables and vice-versa, as well as a scheduler that runs the functional operators inside Quasar fibers.

core.async is a Clojure-only reactive programming library modelled after Go channels, which supports both functional and imperative styles. It has several limitations due to the fact that its fiber-like construct, the “go block”, is not a true lightweight thread. On the other hand, it is able to run not only on the JVM, but also in the browser using Clojure’s JavaScript implementation, ClojureScript. Also, it’s a very nice API. Pulsar provides the core.async API as well (all the important stuff is there, and we’ll support the whole thing soon). Pulsar’s core.async implementation is fully interchangeable with Pulsar’s “native” API, has better performance than the original implementation, removes the mentioned limitations (for example, the <! and <!! operations, and in fact all single-bang/double-bang operators, are interchangeable), and interoperates with any JVM language making use of Quasar.

## Conclusion

We’ve seen how reactive programming helps tackle concurrency by isolating the computations from any concerns of when their input data arrives, and how both functional and imperative code can make use of reactive programming. We’ve also shown examples of the reactive constructs provided by Quasar and Pulsar, and how they can be used. Whether you use reactive programming in the imperative or the functional style (or, my personal preference, a combination of both), I hope you appreciate how it helps process asynchronous data and distribute it throughout the application in clear, simple flows.

This was just a taste of Quasar’s capabilities. Quasar and Pulsar also have a rich and very Erlang-like distributed actor system; Quasar fibers provide extremely scalable blocking IO operations, useful in themselves even without reactive programming; Comsat uses Quasar fibers and fiber-blocking IO to scale JVM web applications, while keeping the same familiar, standard APIs.

The next version of Quasar/Pulsar, 0.5.0, will be released in time for the general availability of Java 8 about a month from today, but you could start using the latest release or the development version right away. So go browse the Quasar and Pulsar documentation, and feel free to fork their GitHub repos right now: https://github.com/puniverse/quasar , https://github.com/puniverse/pulsar