The Parallel Universe Blog

June 25, 2015

Quasar and Reactive Streams

By Ron

Today we released Quasar and Pulsar 0.7.2, just a few short weeks after the 0.7.0 release. This is mostly a bug fix release, but it includes some improvements, too.

We’ve added a new instrumentation optimization that improves performance further. Developers writing Quasar integration modules will appreciate the much improved instrumentation verification: it is now more precise, it checks call sites, and it prints extended stack trace information on recent HotSpot JVMs. It also works without verification enabled (and without a performance penalty) when an exception is thrown in a fiber and goes uncaught. This work brings us a step closer to fully automatic and transparent instrumentation, in the Java 9 timeframe, which would reduce the cost of adopting fibers on the JVM to zero. Pulsar now has the Clojure FSM actor API that missed the 0.7.0 deadline, and we fixed a bug that occurred when using Pulsar in an AOT-compiled application. Finally, this release introduces a brand new feature: Reactive Streams support.

Reactive Streams is the name of a new (non-JCP) JVM standard for an API that facilitates interoperation among various libraries for asynchronous IO streams, including RxJava, Akka Streams, Pivotal Reactor and now Quasar. The standard allows code using any of the compliant libraries to interoperate with code written using any of the others.
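
To make the contract concrete, here is a minimal sketch of demand-driven delivery. It does not use Quasar or the org.reactivestreams artifact; it assumes JDK 9 or later, whose java.util.concurrent.Flow interfaces correspond to the Reactive Streams ones. The class names FlowBackpressureSketch and OneAtATimeSubscriber are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class FlowBackpressureSketch {
    /** Hypothetical subscriber that asks for one element at a time. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);            // initial demand: exactly one element
        }
        @Override public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ask for the next element only when ready
        }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Publishes 0..n-1 and returns what the subscriber saw, in order. */
    static List<Integer> run(int n) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < n; i++)
                publisher.submit(i); // blocks if the subscriber's buffer is full
        }                            // close() completes the stream
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```

The subscriber never receives more than it has requested, which is the backpressure mechanism every compliant library has to honor.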

Quasar’s quasar-reactive-streams artifact contains a full, TCK-compliant implementation of Reactive Streams, which converts streams to Quasar channels and vice versa. The implementation contains a single public class, co.paralleluniverse.strands.channels.reactivestreams.ReactiveStreams, with a set of static methods that perform the conversion. The ReactiveStreams class Javadoc has all the details.

The following example demonstrates how to implement a Reactive Stream publisher, subscriber and processor with Quasar channels (obviously, as Reactive Streams is an interoperation API, you’d normally just implement one of the roles using Quasar to interoperate with code using, say, RxJava):

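import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.strands.Strand;
import co.paralleluniverse.strands.channels.*;
import co.paralleluniverse.strands.channels.Channels.OverflowPolicy;
import co.paralleluniverse.strands.channels.reactivestreams.ReactiveStreams;
import java.util.concurrent.ThreadLocalRandom;
import org.reactivestreams.*;

// Publisher
// Buffer size is picked at random (0 = unbuffered, or 5) to show that either works.
final Channel<Integer> publisherChannel = Channels.newChannel(ThreadLocalRandom.current().nextBoolean() ? 0 : 5, OverflowPolicy.BLOCK);
final Strand publisherStrand = new Fiber<Void>(() -> {
    for (long i = 0; i < 10000; i++)
        publisherChannel.send((int) (i % 1000));

    publisherChannel.close(); // signals end of stream to subscribers
}).start();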

final Publisher<Integer> publisher = ReactiveStreams.toPublisher(publisherChannel);

// Processor
final Processor<Integer, Integer> processor = ReactiveStreams.toProcessor(5, OverflowPolicy.BLOCK,
    (in, out) -> {
        // Emit two outputs for every input until the input channel is closed
        for (Integer element; ((element = in.receive()) != null); ) {
            out.send(element * 10);
            out.send(element * 100);
            Fiber.sleep(1); // just for fun; RS doesn't allow blocking but Quasar can block as much as you like
        }
        out.close();
    });
publisher.subscribe(processor);

// Subscriber
// Buffer size and overflow policy are illustrative values; the original left them unspecified.
final ReceivePort<Integer> subscriberChannel = ReactiveStreams.subscribe(5, OverflowPolicy.BLOCK, processor);
final Strand subscriberStrand = new Fiber<Void>(() -> {
    long count = 0;
    for (;;) {
        Integer x = subscriberChannel.receive();
        if (x == null)
            break;

        Fiber.sleep(5); // we can sleep here, too. RS will propagate backpressure up to publisher
        assert x % 10 == 0;
        if (count % 2 != 0)
            assert x % 100 == 0;

        count++;
    }
    subscriberChannel.close();
}).start();
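
For comparison, here is the same publisher, processor and subscriber shape written without Quasar, as a rough sketch on top of JDK 9+'s java.util.concurrent.Flow (an assumption; it is not part of Quasar or of this release). The names FlowProcessorSketch, FanOutProcessor and CollectingSubscriber are hypothetical. Note how the callback style replaces the plain blocking loop used in the fiber above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class FlowProcessorSketch {
    /** Hypothetical processor: emits element*10 then element*100 per input. */
    static final class FanOutProcessor extends SubmissionPublisher<Integer>
            implements Flow.Processor<Integer, Integer> {
        private Flow.Subscription upstream;

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            s.request(1);
        }
        @Override public void onNext(Integer element) {
            submit(element * 10);
            submit(element * 100);
            upstream.request(1); // pull the next input only after emitting both outputs
        }
        @Override public void onError(Throwable t) { closeExceptionally(t); }
        @Override public void onComplete() { close(); }
    }

    /** Hypothetical sink that collects everything it is sent. */
    static final class CollectingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Sends 1..n through the processor and returns the collected outputs. */
    static List<Integer> run(int n) {
        CollectingSubscriber sink = new CollectingSubscriber();
        FanOutProcessor processor = new FanOutProcessor();
        processor.subscribe(sink);        // wire the downstream side first
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            source.subscribe(processor);
            for (int i = 1; i <= n; i++)
                source.submit(i);
        }                                 // closing the source completes the chain
        try {
            sink.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sink.received;
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

As in the Quasar version, every output is a multiple of 10 and every second output is a multiple of 100.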

With Reactive Streams support in place, we will be dropping RxQuasar, the RxJava interoperation module, as it is now superfluous. All RxJava-Quasar interoperation should be done with Reactive Streams.
