The Parallel Universe Blog

April 14, 2016

Distributed Quasar Actors with Kafka and ZeroMQ

By Fabio

So you’ve got a fancy design using actors, you’ve chosen the JVM and Quasar’s powerful, loyal take on the subject. All wise decisions, but then what are your options for distributing them on a cluster?

Galaxy

Galaxy is a really cool option: a fast in-memory data grid optimized for data locality with replication, optional persistence, a distributed actor registry and even actor migration between nodes! There’s only one caveat: it will take another couple of months before we release a production-quality, formally verified version of Galaxy. The current version of Galaxy is not recommended for production use.

What if we need to go live before that?

Luckily Quasar Actors’ blocking programming model is so straightforward that integrating it with most messaging solutions is a breeze, and in order to demonstrate that let’s do it with two fast, popular and very different ones: Apache Kafka and ØMQ.

The code and the plan

All of the following examples can be found on GitHub; take a quick look at the short README and you’ll be running them in no time.

There are two examples for each of Kafka and ØMQ:

  • A quick-and-dirty one performing straight publish/poll or send/receive calls from actors.
  • A more elaborate one going through proxy actors that shield your code from the messaging APIs. As proof that I’m not lying, this program uses the same producer and consumer actor classes for both technologies and almost the same bootstrap program.

Kafka

Apache Kafka has seen a steep rise in adoption due to its unique design based on commit logs for durability and consumer groups for parallel message consumption: this combination resulted in a fast, reliable, flexible and scalable broker.

The API includes two flavors of producers, sync and async, and one of consumers (only sync); Comsat includes a community-contributed, fiber-friendly Kafka producer integration.

A Kafka producer handle is thread-safe, performs best when shared and can be obtained and used easily in an actor body (or anywhere else) like so:

final Properties producerConfig = new Properties();
producerConfig.put("bootstrap.servers", "localhost:9092");
producerConfig.put("client.id", "DemoProducer");
producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

try (final FiberKafkaProducer<Integer, byte[]> producer = new FiberKafkaProducer<>(new KafkaProducer<>(producerConfig))) {
    final byte[] myBytes = getMyBytes(); // ...
    final Future<RecordMetadata> res = producer.send(new ProducerRecord<>("MyTopic", i, myBytes)); // `i` is the record key, e.g. a loop counter
    res.get(); // Optional, blocks the fiber until the record is persisted; there's also `producer.flush()`
}

We’re wrapping the KafkaProducer object with Comsat’s FiberKafkaProducer in order to get back a fiber-blocking future.

A consumer handle, however, is not thread-safe [1] and is thread-blocking only:

final Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP);
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
consumerConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");

try (final Consumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
    consumer.subscribe(Collections.singletonList(TOPIC));
    final ConsumerRecords<Integer, byte[]> records = consumer.poll(1000L);
    for (final ConsumerRecord<Integer, byte[]> record : records) {
        final byte[] v = record.value();
        useMyBytes(v); // ...
    }
}

We don’t want to block the threads in the fibers’ underlying thread pool (apart from the ones Kafka blocks under the covers, which we can’t do much about), so in our actor’s doRun we’ll use FiberAsync.runBlocking instead: it submits the blocking poll to a fixed-size executor and blocks only the fiber until poll, which runs in that pool, returns:

final ExecutorService e = Executors.newFixedThreadPool(2);

try (final Consumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
    consumer.subscribe(Collections.singletonList(TOPIC));
    final ConsumerRecords<Integer, byte[]> records = call(e, () -> consumer.poll(1000L));
    for (final ConsumerRecord<Integer, byte[]> record : records) {
        final byte[] v = record.value();
        useMyBytes(v); // ...
    }
}

Where call is a utility method defined as follows (it wouldn’t have been necessary if not for this Java compiler bug):

@Suspendable
public static <V> V call(ExecutorService es, Callable<V> c) throws InterruptedException, SuspendExecution {
    try {
        return runBlocking(es, (CheckedCallable<V, Exception>) c::call);
    } catch (final InterruptedException | SuspendExecution e) {
        throw e;
    } catch (final Exception e) {
        throw new RuntimeException(e);
    }
}
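
For readers without Quasar at hand, here is a minimal plain-JDK sketch of the same offloading idea: the blocking call runs on a small dedicated pool and the caller simply waits for the result. It is only an analogy. `BlockingOffload`, `callOn` and `demo` are hypothetical names, and `join()` blocks the calling thread, whereas `FiberAsync.runBlocking` blocks only the fiber.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class BlockingOffload {
    /** Runs a blocking call on the dedicated pool and waits for its result. */
    static <V> V callOn(ExecutorService blockingPool, Supplier<V> blockingCall) {
        // join() blocks the calling thread; Quasar's runBlocking would park the fiber instead
        return CompletableFuture.supplyAsync(blockingCall, blockingPool).join();
    }

    /** Returns the name of the thread that actually executed the blocking call. */
    static String demo() {
        final ExecutorService blockingPool = Executors.newFixedThreadPool(2, r -> {
            final Thread t = new Thread(r, "blocking-io");
            t.setDaemon(true);
            return t;
        });
        try {
            return callOn(blockingPool, () -> {
                try {
                    Thread.sleep(50); // Stand-in for a blocking call such as consumer.poll(...)
                } catch (final InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
                return Thread.currentThread().getName();
            });
        } finally {
            blockingPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Blocking call ran on: " + demo());
    }
}
```

The point of the dedicated pool is isolation: however long the blocking call takes, it only ever occupies one of the two `blocking-io` threads.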

In the first complete example we’re sending a thousand serialized messages from a producer actor to a consumer one.

ØMQ

ØMQ (or ZeroMQ) is not a centralized broker solution and is more of a generalization of sockets to various communication patterns (request/reply, pub/sub etc.). In our examples we’re going to use the simplest request-reply pattern. Here’s our new producer code:

try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */);
     final ZMQ.Socket trgt = zmq.socket(ZMQ.REQ)) {
    trgt.connect("tcp://localhost:8000");
    final byte[] myBytes = getMyBytes(); // ...
    trgt.send(myBytes, 0 /* flags */);
    trgt.recv(); // Reply, e.g. ACK
}

As you can see, the context acts as a socket factory and is passed the number of I/O threads to use. That’s because ØMQ sockets are not connection-bound OS handles but rather a simple front end to machinery that handles connection retries, multiple connections, efficient concurrent I/O and even queuing for you. This is why send calls almost never block, and why a recv call is not an I/O call on a connection but rather a synchronization between your thread and a specialized I/O task that hands over incoming bytes from one or even several connections.
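
To make the "front end to a machinery" idea concrete, here is a toy model in plain Java. It is not how ØMQ is implemented: `QueuedSocketSketch` is a hypothetical class in which `send` only enqueues, a background thread plays the I/O task (and fakes the remote peer by answering with an ACK), and `recv` merely waits for that thread to hand something over.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class QueuedSocketSketch {
    private final BlockingQueue<String> outbound = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> inbound = new LinkedBlockingQueue<>();
    private final Thread ioThread;

    QueuedSocketSketch() {
        // Stand-in for the I/O machinery: drains queued messages and "delivers" them.
        // The remote peer is faked by answering every message with an ACK.
        ioThread = new Thread(() -> {
            try {
                for (;;) {
                    final String msg = outbound.take();
                    inbound.put("ACK:" + msg);
                }
            } catch (final InterruptedException ex) {
                // Interrupted by close(): let the thread finish
            }
        }, "io-thread");
        ioThread.setDaemon(true);
        ioThread.start();
    }

    /** Only enqueues, so it returns immediately. */
    void send(String msg) {
        outbound.add(msg);
    }

    /** Waits for the I/O thread to hand over a message; null on timeout or interruption. */
    String recv(long timeoutMillis) {
        try {
            return inbound.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    void close() {
        ioThread.interrupt();
    }

    static String demo() {
        final QueuedSocketSketch socket = new QueuedSocketSketch();
        try {
            socket.send("hello");     // Returns at once: the message is just queued
            return socket.recv(1000); // Synchronizes with the I/O thread
        } finally {
            socket.close();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```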

Rather than threads we’ll be blocking fibers in our actors though, so let’s use FiberAsync.runBlocking on read calls and, just in case it blocks, even on send ones:

final ExecutorService e = Executors.newFixedThreadPool(2);

try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */);
     final ZMQ.Socket trgt = zmq.socket(ZMQ.REQ)) {
    exec(e, () -> trgt.connect("tcp://localhost:8000"));
    final byte[] myBytes = getMyBytes(); // ...
    call(e, () -> trgt.send(myBytes, 0 /* flags */));
    call(e, trgt::recv); // Reply, e.g. ACK
}

Here’s the consumer:

try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */);
     final ZMQ.Socket src = zmq.socket(ZMQ.REP)) {
    exec(e, () -> src.bind("tcp://*:8000"));
    final byte[] v = call(e, src::recv);
    exec(e, () -> src.send("ACK"));
    useMyBytes(v); // ...
}

where exec is another utility function, similar to call:

@Suspendable
public static void exec(ExecutorService es, Runnable r) throws InterruptedException, SuspendExecution {
    try {
        runBlocking(es, (CheckedCallable<Void, Exception>) () -> { r.run(); return null; });
    } catch (final InterruptedException | SuspendExecution e) {
        throw e;
    } catch (final Exception e) {
        throw new RuntimeException(e);
    }
}

And here’s the full first example.

Distributing without changing the logic: loose coupling to the rescue

It’s straightforward, isn’t it? There’s something annoying though: we’re dealing with actors on the other side of the network quite differently from local ones. Here are the actors we’d like to write instead, no matter where they’re located nor how they’re connected:

public final class ProducerActor extends BasicActor<Void, Void> {
    private final ActorRef<Msg> target;

    public ProducerActor(ActorRef<Msg> target) {
        this.target = target;
    }

    @Override
    protected final Void doRun() throws InterruptedException, SuspendExecution {
        for (int i = 0; i < MSGS; i++) {
            final Msg m = new Msg(i);
            System.err.println("USER PRODUCER: " + m);
            target.send(m);
        }
        System.err.println("USER PRODUCER: " + EXIT);
        target.send(EXIT);
        return null;
    }
}

public final class ConsumerActor extends BasicActor<Msg, Void> {
    @Override
    protected final Void doRun() throws InterruptedException, SuspendExecution {
        for (;;) {
            final Msg m = receive();
            System.err.println("USER CONSUMER: " + m);
            if (EXIT.equals(m))
                return null;
        }
    }
}

Luckily every actor, no matter what it does, has the same very basic interface: an incoming message queue called mailbox. This means we can insert between two communicating actors as many middle actors, or proxies, as we want and in particular we want a sending proxy that will get messages through our middleware to the destination host, and a receiving proxy there that will grab incoming messages and will put them into the intended destination’s mailbox.

So in our main program we’ll simply provide our ProducerActor with a suitable sending proxy and we’ll let our ConsumerActor receive from a suitable receiving proxy:

final ProducerActor pa = Actor.newActor(ProducerActor.class, getSendingProxy()); // ...
final ConsumerActor ca = Actor.newActor(ConsumerActor.class);
pa.spawn();
System.err.println("USER PRODUCER started");
subscribeToReceivingProxy(ca.spawn()); // ...
System.err.println("USER CONSUMER started");
pa.join();
System.err.println("USER PRODUCER finished");
ca.join();
System.err.println("USER CONSUMER finished");
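
Before looking at the real implementations, the proxy idea can be sketched with plain JDK types only. In this hypothetical `ProxySketch`, mailboxes are blocking queues, the middleware is replaced by an in-memory queue of byte arrays named `wire`, and two threads play the sending and receiving proxies. Nothing here is Quasar, Kafka or ØMQ API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class ProxySketch {
    interface Step { void run() throws InterruptedException; }

    /** Starts a daemon thread that repeats the step until it is interrupted. */
    static Thread loop(Step step) {
        final Thread t = new Thread(() -> {
            try {
                for (;;) step.run();
            } catch (final InterruptedException ex) {
                // Interrupted: the proxy terminates
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }

    static byte[] toBytes(Serializable msg) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(msg);
            oos.flush();
            return bos.toByteArray();
        } catch (final IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (final IOException | ClassNotFoundException ex) {
            throw new IllegalStateException(ex);
        }
    }

    static List<Object> demo() {
        final BlockingQueue<Serializable> sendingProxyMailbox = new LinkedBlockingQueue<>();
        final BlockingQueue<byte[]> wire = new LinkedBlockingQueue<>(); // Stand-in for the middleware
        final BlockingQueue<Object> consumerMailbox = new LinkedBlockingQueue<>();

        // Sending proxy: mailbox -> bytes -> wire. Receiving proxy: wire -> object -> destination mailbox.
        final Thread sendingProxy = loop(() -> wire.put(toBytes(sendingProxyMailbox.take())));
        final Thread receivingProxy = loop(() -> consumerMailbox.put(fromBytes(wire.take())));

        final List<Object> received = new ArrayList<>();
        try {
            for (final String m : Arrays.asList("msg-0", "msg-1", "EXIT"))
                sendingProxyMailbox.put(m); // What the producer's target.send(m) amounts to
            for (int i = 0; i < 3; i++)
                received.add(consumerMailbox.poll(2, TimeUnit.SECONDS)); // The consumer's receive()
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
        } finally {
            sendingProxy.interrupt();
            receivingProxy.interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Neither end knows about the wire: the producer only sees a mailbox to put messages into, and the consumer only sees messages arriving in its own.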

Let’s see how we can implement these proxies with Kafka first and then with ØMQ.

Kafka actor proxies

A factory of proxy actors will be tied to a specific Kafka topic: this is because a topic can be partitioned in such a way that multiple consumers can read concurrently from it. We want to be able to exploit optimally each topic’s maximum level of concurrency:

/* ... */ KafkaProxies implements AutoCloseable {
    /* ... */ KafkaProxies(String bootstrap, String topic) { /* ... */ }

    // ...
}

Of course we want to use a topic for multiple actors, so sending proxies will specify a recipient actor ID and receiving proxies will forward the message only to user actors bound to that ID:

/* ... */ <M> ActorRef<M> create(String actorID) { /* ... */ }
/* ... */ void drop(ActorRef ref) throws ExecutionException, InterruptedException { /* ... */ }
/* ... */ <M> void subscribe(ActorRef<? super M> consumer, String actorID) { /* ... */ }
/* ... */ void unsubscribe(ActorRef<?> consumer, String actorID) { /* ... */ }
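
The bookkeeping behind subscribe and unsubscribe could look roughly like the sketch below. It is an assumption about the shape of the data structure, not the actual KafkaProxies code: `SubscriberRegistry` and `dispatch` are hypothetical names, and a `Consumer<Object>` stands in for an `ActorRef`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class SubscriberRegistry {
    // Actor ID -> everyone currently bound to that ID
    private final Map<String, List<Consumer<Object>>> subscribers = new ConcurrentHashMap<>();

    void subscribe(Consumer<Object> consumer, String actorID) {
        subscribers.computeIfAbsent(actorID, id -> new CopyOnWriteArrayList<>()).add(consumer);
    }

    void unsubscribe(Consumer<Object> consumer, String actorID) {
        final List<Consumer<Object>> l = subscribers.get(actorID);
        if (l != null)
            l.remove(consumer);
    }

    /** Broadcasts the payload to every subscriber of actorID and returns how many received it. */
    int dispatch(String actorID, Object payload) {
        int delivered = 0;
        for (final Consumer<Object> c : subscribers.getOrDefault(actorID, Collections.emptyList())) {
            c.accept(payload);
            delivered++;
        }
        return delivered;
    }

    static String demo() {
        final SubscriberRegistry registry = new SubscriberRegistry();
        final List<Object> inbox1 = new ArrayList<>();
        final List<Object> inbox2 = new ArrayList<>();
        final Consumer<Object> actor1 = inbox1::add;
        final Consumer<Object> actor2 = inbox2::add;
        registry.subscribe(actor1, "orders");
        registry.subscribe(actor2, "orders");
        final int both = registry.dispatch("orders", "m1");  // Broadcast to both subscribers
        final int none = registry.dispatch("billing", "m2"); // Nobody is bound to this ID
        registry.unsubscribe(actor2, "orders");
        final int one = registry.dispatch("orders", "m3");
        return both + "/" + none + "/" + one + " " + inbox1 + " " + inbox2;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```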

Closing the AutoCloseable factory will tell all proxies to terminate and will clean up book-keeping references:

/* ... */ void close() throws Exception { /* ... */ }

The producer implementation is rather straightforward and uninteresting while there’s a bit more spice to the consumer because it will use Quasar Actors’ selective receive to retain incoming messages in its mailbox until there is at least one subscribed user actor that can consume them:

@Override
protected Void doRun() throws InterruptedException, SuspendExecution {
    //noinspection InfiniteLoopStatement
    for (;;) {
        // Try extracting from queue
        final Object msg = tryReceive((Object m) -> {
            if (EXIT.equals(m))
                return EXIT;
            if (m != null) {
                //noinspection unchecked
                final ProxiedMsg rmsg = (ProxiedMsg) m;
                final List<ActorRef> l = subscribers.get(rmsg.actorID);
                if (l != null) {
                    boolean sent = false;
                    for (final ActorRef r : l) {
                        //noinspection unchecked
                        r.send(rmsg.payload);
                        sent = true;
                    }
                    if (sent) // Someone was listening, remove from queue
                        return m;
                }
            }
            return null; // No subscribers (leave in queue) or no messages
        });
        // Something from queue
        if (msg != null) {
            if (EXIT.equals(msg)) {
                return null;
            }
            continue; // Go to next cycle -> precedence to queue
        }

        // Try receiving
        //noinspection Convert2Lambda
        final ConsumerRecords<Void, byte[]> records = call(e, () -> consumer.get().poll(100L));
        for (final ConsumerRecord<Void, byte[]> record : records) {
            final byte[] v = record.value();
            try (final ByteArrayInputStream bis = new ByteArrayInputStream(v);
                 final ObjectInputStream ois = new ObjectInputStream(bis)) {

                //noinspection unchecked
                final ProxiedMsg rmsg = (ProxiedMsg) ois.readObject();
                final List<ActorRef> l = subscribers.get(rmsg.actorID);
                if (l != null && l.size() > 0) {
                    for (final ActorRef r : l) {
                        //noinspection unchecked
                        r.send(rmsg.payload);
                    }
                } else {
                    ref().send(rmsg); // Enqueue
                }
            } catch (final IOException | ClassNotFoundException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }
    }
}
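
The retain-until-subscribed behavior is easier to see in isolation. The following single-threaded sketch uses plain collections instead of Quasar's selective receive, and a list per actor ID stands in for a subscribed actor's mailbox; `SelectiveMailboxSketch` and all its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

final class SelectiveMailboxSketch {
    static final class ProxiedMsg {
        final String actorID;
        final Object payload;

        ProxiedMsg(String actorID, Object payload) {
            this.actorID = actorID;
            this.payload = payload;
        }
    }

    private final List<ProxiedMsg> mailbox = new LinkedList<>();
    private final Map<String, List<Object>> inboxes = new HashMap<>();

    void enqueue(String actorID, Object payload) {
        mailbox.add(new ProxiedMsg(actorID, payload));
    }

    /** Binds a subscriber to the ID and returns the list that plays its mailbox. */
    List<Object> subscribe(String actorID) {
        return inboxes.computeIfAbsent(actorID, id -> new ArrayList<>());
    }

    int queued() {
        return mailbox.size();
    }

    /** Delivers and removes the first queued message that has a subscriber; the others stay queued. */
    boolean tryDeliver() {
        for (final Iterator<ProxiedMsg> it = mailbox.iterator(); it.hasNext();) {
            final ProxiedMsg m = it.next();
            final List<Object> inbox = inboxes.get(m.actorID);
            if (inbox != null) {
                inbox.add(m.payload);
                it.remove();
                return true;
            }
        }
        return false; // No deliverable message: leave everything in the queue
    }

    static String demo() {
        final SelectiveMailboxSketch proxy = new SelectiveMailboxSketch();
        proxy.enqueue("late", "for-late");
        proxy.enqueue("early", "for-early");
        final List<Object> early = proxy.subscribe("early");
        final boolean first = proxy.tryDeliver();  // Skips "for-late", delivers "for-early"
        final boolean second = proxy.tryDeliver(); // Nothing deliverable yet
        final int retained = proxy.queued();
        final List<Object> late = proxy.subscribe("late");
        final boolean third = proxy.tryDeliver();  // Now "for-late" can go out
        return first + "," + second + "," + third
            + " retained=" + retained + " early=" + early + " late=" + late;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```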

Because we also need to process the mailbox, we poll Kafka with a short timeout. Also note that many actors can subscribe to the same ID, in which case the incoming message is broadcast to all of them. The number of receiving proxy actors (and therefore fibers) created per topic, as well as the number of pool threads and Kafka consumer handles (consumer is a thread-local because Kafka consumers are not thread-safe), equals the number of partitions in the topic: this maximizes receive throughput.
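
The one-handle-per-pool-thread arrangement can be illustrated with a plain `ThreadLocal`. This is a sketch under assumptions: `Handle` is a hypothetical stand-in for a consumer that refuses to be used from any thread other than its creator, and `demo` counts how many handles a pool of a given size ends up creating.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class ThreadConfinedHandles {
    /** Hypothetical stand-in for a handle that must stay on the thread that created it. */
    static final class Handle {
        private final Thread owner = Thread.currentThread();

        String poll() {
            if (Thread.currentThread() != owner)
                throw new IllegalStateException("used from the wrong thread");
            return "polled by " + owner.getName();
        }
    }

    /** Runs many polls on a pool of the given size and returns how many handles were created. */
    static int demo(int partitions) {
        final AtomicInteger created = new AtomicInteger();
        // Each pool thread lazily creates, and then keeps, its own handle
        final ThreadLocal<Handle> handle = ThreadLocal.withInitial(() -> {
            created.incrementAndGet();
            return new Handle();
        });
        final ExecutorService pool = Executors.newFixedThreadPool(partitions);
        try {
            final List<Future<String>> results = new ArrayList<>();
            for (int i = 0; i < partitions * 10; i++)
                results.add(pool.submit(() -> handle.get().poll()));
            for (final Future<String> f : results)
                f.get(); // Would fail if any handle had been used from the wrong thread
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (final ExecutionException ex) {
            throw new IllegalStateException(ex);
        } finally {
            pool.shutdown();
        }
        return created.get();
    }

    public static void main(String[] args) {
        System.out.println("handles created: " + demo(3));
    }
}
```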

At present this implementation uses Java serialization to convert messages to and from bytes but of course other frameworks such as Kryo can be used.

ØMQ actor proxies

The ØMQ model is fully decentralized: there are neither brokers nor topics, so we can simply equate a ØMQ address/endpoint with a set of actors, without using an extra actor ID:

/* ... */ ZeroMQProxies implements AutoCloseable {
    /* ... */ ZeroMQProxies(int ioThreads) { /* ... */ }
    /* ... */ <M> ActorRef<M> to(String trgtZMQAddress) { /* ... */ }
    /* ... */ void drop(String trgtZMQAddress) { /* ... */ }
    /* ... */ <M> void subscribe(ActorRef<? super M> consumer, String srcZMQEndpoint) { /* ... */ }
    /* ... */ void unsubscribe(ActorRef<?> consumer, String srcZMQEndpoint) { /* ... */ }
    /* ... */ void close() throws Exception { /* ... */ }
}

In this case too, and for the same reason as before, the consumer is the more interesting part. Luckily there are no thread-confinement issues this time: a ØMQ socket can be handed from one thread to another as long as only one thread uses it at a time, which is what happens here:

@Override
protected Void doRun() throws InterruptedException, SuspendExecution {
    try(final ZMQ.Socket src = zmq.socket(ZMQ.REP)) {
        System.err.printf("PROXY CONSUMER: binding %s\n", srcZMQEndpoint);
        Util.exec(e, () -> src.bind(srcZMQEndpoint));
        src.setReceiveTimeOut(100);
        //noinspection InfiniteLoopStatement
        for (;;) {
            // Try extracting from queue
            final Object m = tryReceive((Object o) -> {
                if (EXIT.equals(o))
                    return EXIT;
                if (o != null) {
                    //noinspection unchecked
                    final List<ActorRef> l = subscribers.get(srcZMQEndpoint);
                    if (l != null) {
                        boolean sent = false;
                        for (final ActorRef r : l) {
                            //noinspection unchecked
                            r.send(o);
                            sent = true;
                        }
                        if (sent) // Someone was listening, remove from queue
                            return o;
                    }
                }
                return null; // No subscribers (leave in queue) or no messages
            });
            // Something processable is there
            if (m != null) {
                if (EXIT.equals(m)) {
                    return null;
                }
                continue; // Go to next cycle -> precedence to queue
            }

            System.err.println("PROXY CONSUMER: receiving");
            final byte[] msg = Util.call(e, src::recv);
            if (msg != null) {
                System.err.println("PROXY CONSUMER: ACKing");
                Util.exec(e, () -> src.send(ACK));
                final Object o;
                try (final ByteArrayInputStream bis = new ByteArrayInputStream(msg);
                     final ObjectInputStream ois = new ObjectInputStream(bis)) {
                    o = ois.readObject();
                } catch (final IOException | ClassNotFoundException e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
                System.err.printf("PROXY CONSUMER: distributing '%s' to %d subscribers\n", o, subscribers.size());
                //noinspection unchecked
                for (final ActorRef s : subscribers.getOrDefault(srcZMQEndpoint, (List<ActorRef>) Collections.EMPTY_LIST))
                    //noinspection unchecked
                    s.send(o);
            } else {
                System.err.println("PROXY CONSUMER: receive timeout");
            }
        }
    }
}
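
Stripped of the messaging details, both receiving proxies share the same loop shape: serve the local mailbox first, and only then wait on the network for a bounded time. Here is a minimal sketch of that shape, with an in-memory queue named `network` standing in for the socket or the Kafka consumer; `AlternatingLoopSketch` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class AlternatingLoopSketch {
    static List<String> demo() {
        final Queue<String> mailbox = new ArrayDeque<>(Arrays.asList("local-1", "local-2"));
        final BlockingQueue<String> network =
            new LinkedBlockingQueue<>(Arrays.asList("remote-1", "EXIT"));
        final List<String> handled = new ArrayList<>();
        try {
            for (;;) {
                // Precedence to the local queue
                final String local = mailbox.poll();
                if (local != null) {
                    handled.add(local);
                    continue;
                }
                // Bounded wait, so the mailbox gets checked again soon
                final String remote = network.poll(100, TimeUnit.MILLISECONDS);
                if (remote == null)
                    continue; // Receive timeout
                if ("EXIT".equals(remote))
                    break;
                handled.add(remote);
            }
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The timeout is the trade-off knob: a shorter one keeps mailbox latency low, a longer one wastes fewer wake-ups when the network is quiet.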

More features

This short writeup has hopefully given a glimpse of how easy it is to interface Quasar’s actors with messaging solutions, thanks to their nature as straightforward sequential processes; of course it’s possible to go further, for example:

  • Actor lookup and discovery: how do we provide a global actor naming/discovery service? For example, Kafka uses ZooKeeper so it’s probably worth leveraging that, but ØMQ bets heavily on decentralization and deliberately doesn’t provide a pre-packaged foundation.
  • Actor failure management: how can we support failure-management links and watches between actors that run on different nodes?
  • Message routing: how do we dynamically adjust message flows between nodes and actors without changing the logic inside actors?
  • Actor mobility: how do we move actors to other nodes, for example closer to their message source in order to gain performance, or to a location with different security properties?
  • Scalability and fault tolerance: how do we manage the addition, removal, death and partitioning of actor nodes? Distributed IMDGs like Galaxy and broker-based solutions like Kafka typically already do that, but fabric-level solutions like ØMQ usually don’t.
  • Security: how do we support relevant information security properties?
  • Testing, logging, monitoring: how do we conveniently test, trace and monitor a distributed actors ensemble as a whole?

These topics are the “hard nut” of distributed systems design, and of distributed actors in particular, so tackling them effectively can require substantial effort. Galaxy addresses all of them, and Quasar actors also provide an SPI that covers some of the above topics and allows for tighter integration with distribution technologies. You might also be interested in a comparison between Akka and Quasar+Galaxy that covers many such aspects.

That’s it for now, so have fun with your distributed Quasar actors and leave a note about your journey in the Quasar-Pulsar user group!


  1. Actually it also forbids usage by any threads except the first one.
