The Parallel Universe Blog

January 28, 2014

Writing Interactive Web Applications with Web Actors

By Ron

The actor model, defined by Carl Hewitt and then evolved and popularized (well, relatively popularized) by the Erlang programming language, has become a very appropriate choice for designing certain kinds of software. Groups of isolated processes that communicate by passing messages back and forth are an excellent way to program applications that exploit concurrency well for performance and scalability, respond quickly to barrages of realtime events, embrace distributed environments, and recover well from (quite expected) unexpected failures. They do all this while being relatively simple to understand and compose, and they model many problem domains quite naturally.

With many modern web applications becoming more interactive, responding to events as they happen and pushing updates at interactive speeds, the actor model is becoming more and more suitable for programming the web. Especially with new communication models like WebSockets, it seems like actors and the web are made for each other.

Comsat is a new open-source product that integrates Quasar, a lightweight-threads and actors library for the JVM, with web technologies. It can run Servlets and REST services in fibers rather than OS threads for scalability, but it also has an API called Web Actors that turns HTTP-based communications into actor-based interaction. This makes managing state, recovering from failure, and hot code upgrade simple for web applications, while giving them great performance and scalability.

Declaring a Web Actor

Let’s dissect the web actor example included in the Comsat examples project (the full source code can be found here):

@WebActor(httpUrlPatterns = {"/webactor", "/send1", "/send2", "/sse"}, webSocketUrlPatterns = {"/ws"})
public class MyWebActor extends BasicActor<Object, Void> {
	// ...
}

A web actor is an actor that receives messages from a web client. The actor is attached to an HTTP session, and to as many web sockets as necessary. While it is possible (and in some circumstances, recommended) to spawn a web actor and attach it to a session manually after some authentication process, it is simpler to annotate a class as a web actor and let Comsat find it, load it, and attach it to every new session automatically (an actor consumes very few resources; it is only about 500 bytes in size). That’s exactly what line 1 does: it declares that the class (which must be an implementation of a Quasar actor) is a web actor that responds to HTTP messages sent to any of the four listed resources, as well as to WebSockets with the given URL pattern. Again, the actor is attached to a single HTTP session.

Currently, Comsat runs web actors on Servlet containers like Tomcat and Jetty, but we plan on providing support for lower-level servers like Netty and Undertow.

Now let’s look at the main actor loop in the doRun method:

@Override
protected Void doRun() throws InterruptedException, SuspendExecution {
    if (!initialized) {
       // ...
    }

    for (;;) {
        Object message = receive(5000, TimeUnit.MILLISECONDS);
        // -------- HTTP request -------- 
        if (message instanceof HttpRequest) {
        	// ...
        } 
        // -------- WebSocket/SSE opened -------- 
        else if (message instanceof WebStreamOpened) {
            // ...
        } 
        // -------- WebSocket message received -------- 
        else if (message instanceof WebDataMessage) {
            // ...
        } 
        // -------- Timeout -------- 
        else if (message == null) {
            // ...
        }

        checkCodeSwap();
    }
}

After some initialization, we enter an infinite loop in which we receive a message and process it. You can have as many receive calls as you want, and even what is known as “selective receive” where you block until a specific message arrives; this lets you handle messages in the order you’re prepared to, not the order they arrive over the wire, which is useful, say, when getting messages from several sources.
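To make the idea of selective receive concrete, here is a minimal plain-JDK sketch. The Mailbox class and its receive(Predicate) method are hypothetical names invented for this illustration; they are not Quasar's API, and unlike a real actor mailbox this one is single-threaded and returns null instead of blocking when nothing matches.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.function.Predicate;

// Hypothetical, single-threaded illustration of selective receive.
// This is not Quasar's implementation and it never blocks.
final class Mailbox<M> {
    private final Deque<M> queue = new ArrayDeque<>();

    // Appends a message to the end of the mailbox.
    void send(M message) {
        queue.addLast(message);
    }

    // Plain receive: returns the oldest message, or null if the mailbox is empty.
    M receive() {
        return queue.pollFirst();
    }

    // Selective receive: returns the oldest message that matches the predicate
    // and leaves all other messages queued in their original order.
    // Returns null if no queued message matches.
    M receive(Predicate<? super M> wanted) {
        for (Iterator<M> it = queue.iterator(); it.hasNext(); ) {
            M candidate = it.next();
            if (wanted.test(candidate)) {
                it.remove();
                return candidate;
            }
        }
        return null;
    }
}
```

If "a", 1 and "b" are sent in that order, a selective receive for an Integer takes the 1 out first, and the two strings are still delivered afterwards in their arrival order.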

If a message is not received within 5000 ms (5 seconds), receive returns null, and we run some timeout code that pushes an update to our WebSocket and SSE clients.
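The null-on-timeout convention used by the timeout branch has a close analogue in the JDK: BlockingQueue.poll with a timeout also returns null when nothing arrives in time. The sketch below uses only that JDK call; the class and method names are made up for illustration, and, unlike a Quasar fiber, this version blocks an OS thread while it waits.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of "receive with timeout" using a JDK queue.
final class TimeoutReceiveSketch {
    // Waits up to timeoutMillis for a message and reports what happened.
    static String receiveOnce(BlockingQueue<Object> mailbox, long timeoutMillis) {
        try {
            Object message = mailbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (message == null) {
                // Nothing arrived in time: this is where periodic work would run.
                return "timeout";
            }
            return "message: " + message;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

Calling receiveOnce on a queue that holds one element yields that element; calling it again on the now-empty queue waits for the timeout and then reports it.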

Handling a Simple HTTP Request

Now let’s look at how we process a simple HTTP request:

if (message instanceof HttpRequest) {
	HttpRequest msg = (HttpRequest) message;
	// ...
    String response = BlockingCallsExample.executeSomeSql(ds)
            + BlockingCallsExample.callSomeRS(httpClient)
            + BlockingCallsExample.doSleep();

    // reply back to the sender
    msg.getFrom().send(new HttpResponse(self(),
            ok(msg, buildHtml(response, msg, i)).setContentType("text/html")
            .addCookie(cookie("userCookie", "value").build())));
	// ...
}

We build the response after executing a SQL statement, calling some other web service, and sleeping a bit. This is just to show how simple it is to run blocking operations in web actors (and Comsat in general). These calls do not block the current OS thread, only the fiber running the actor, and you can run hundreds of thousands and even millions of fibers on a single JVM instance. You get the exact same scalability as you would using an asynchronous, callback-based API, but the code stays simple (under the covers this works because Quasar schedules fibers with a state-of-the-art work-stealing scheduler, and so can block and unblock fibers much faster than the OS can schedule full-blown threads; read our previous blog posts if you’re interested in the details).

The HTTP request is received as an immutable message, with a from property containing the sender actor. In the case of web actors, the actors sending the web messages represent the web client, but we can interact with them as we would with any actor, as we’ll see later.

To send the response, we simply send an HttpResponse message back to the sender of the request. In this example, we also add a cookie, just for fun.

WebSocket

Now let’s look at WebSockets:

// -------- WebSocket/SSE opened -------- 
else if (message instanceof WebStreamOpened) {
    WebStreamOpened msg = (WebStreamOpened) message;
    watch(msg.getFrom()); // watch the socket's "actor"
    // ...
    listeners.add(p);

    p.send(new WebDataMessage(self(), "Welcome. " + listeners.size() + " listeners"));
} 
// -------- WebSocket message received -------- 
else if (message instanceof WebDataMessage) {
    WebDataMessage msg = (WebDataMessage) message;

    for (SendPort listener : listeners)
        listener.send(new WebDataMessage(self(), "local counter:" + i + " data:" + msg.getStringBody().toUpperCase()));
}

Whenever a WebSocket is opened, our web actor receives a WebStreamOpened message from an actor representing the socket. In line 6 we add that socket as a listener for messages we’ll push to it later, but before that, in line 4, we watch that actor. “Watching” is an important actor mechanism for detecting and responding to failure. If the socket were to close for whatever reason, or some other error were to occur over that connection, the virtual actor representing the socket would die, and when it does, the actors watching it, like our own web actor, receive a message notifying them of the event.

In lines 11 through 15 we’ve received a WebSocket message, and we push a message of our own to all the listeners: web sockets and SSE streams.

Server-Sent Events (SSE)

SSE, or Server-Sent Events, is an HTML5 standard, supported by most modern browsers, for pushing discrete messages to the web client without the client sending a new HTTP request for each one. An SSE stream is initiated with an HTTP request; then, each event message is written to the response stream and flushed. The only requirement is that the messages be encoded according to the SSE standard.

Because an SSE stream starts out as a regular HTTP request, let’s go back and look at the code we’ve left out in the HTTP request handling section:

if (message instanceof HttpRequest) {
    HttpRequest msg = (HttpRequest) message;
    if (!msg.getRequestURI().endsWith("/sse")) {
        // ...
    } // -------- request for SSE -------- 
    else {
        HttpResponse response = new HttpResponse(self(), SSE.startSSE(msg));
        msg.getFrom().send(response);
    }
}

All we do is reply with an SSE.startSSE() message. This results in a new actor being created, which sends us a WebStreamOpened message, just as a WebSocket does. Let’s go back and look at the pieces we left out:

// -------- WebSocket/SSE opened -------- 
else if (message instanceof WebStreamOpened) {
    WebStreamOpened msg = (WebStreamOpened) message;
    watch(msg.getFrom());

    SendPort<WebDataMessage> p = msg.getFrom();
    
    if (msg instanceof HttpStreamOpened)
        p = wrapAsSSE(p);

    listeners.add(p);
    p.send(new WebDataMessage(self(), "Welcome. " + listeners.size() + " listeners"));
} 

In line 8 we recognize that this is an SSE stream rather than a WebSocket. Now, SSE messages need to be formatted according to the SSE standard, but we still want to treat an SSE listener just like a WebSocket listener in the code, so in line 9 we call the wrapAsSSE method, which applies a mapping transformation to the actor representing the SSE stream (actually, the transformation works on Quasar channels, but actors are also channels):

private SendPort<WebDataMessage> wrapAsSSE(SendPort<WebDataMessage> actor) {
    return Channels.map(actor, new Function<WebDataMessage, WebDataMessage>() {
        public WebDataMessage apply(WebDataMessage f) {
            return new WebDataMessage(f.getFrom(), SSE.event(f.getStringBody()));
        }
    });
}

The SSE.event method in line 4 encodes the message body as a well-formed SSE event message, and that’s all there is to it.
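For readers curious what "well-formed" means here, the sketch below shows the basic wire format defined by the SSE standard: every line of the payload is sent as a data: field, and a blank line ends the event. This is an illustration written against the standard, not Comsat's SSE.event implementation; the class and method names are hypothetical, and optional fields such as event:, id: and retry: are left out.

```java
// Hypothetical illustration of the SSE wire format for a data-only event.
final class SseEncodingSketch {
    // Encodes a payload as one SSE event: each payload line becomes a
    // "data:" field, and an empty line terminates the event.
    static String encodeEvent(String payload) {
        StringBuilder out = new StringBuilder();
        // Split on any of the line endings the standard recognizes,
        // keeping trailing empty lines (limit -1).
        for (String line : payload.split("\r\n|\r|\n", -1)) {
            out.append("data: ").append(line).append('\n');
        }
        return out.append('\n').toString();
    }
}
```

A single-line payload such as hello therefore goes over the wire as the line data: hello followed by an empty line, and a multi-line payload becomes several consecutive data: lines that the browser joins back together with newlines.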

Detecting Failure

We mentioned earlier how we watch those virtual actors for failure. If any WebSocket or SSE actor were to fail (say, the connection has been reset), that actor will die and send us an ExitMessage, which is what is known by Quasar actors as a lifecycle message. Those messages aren’t normally returned by receive but are handled by a special method:

@Override
protected Object handleLifecycleMessage(LifecycleMessage m) {
    if (m instanceof ExitMessage) {
        ExitMessage em = (ExitMessage) m;
        System.out.println("Actor " + em.getActor() + " has died.");
        boolean res = listeners.remove(em.getActor());
        System.out.println((res ? "Successfully" : "Unsuccessfully") + " removed listener for actor " + em.getActor());
    }
    return super.handleLifecycleMessage(m);
}

So when a connection fails, the actor representing it will die, our web actor will detect that, and remove the actor from its list of listeners.

Hot Code Swapping

The actor model helps build applications that suffer no downtime, but fault tolerance is only part of the picture, as downtime is often incurred as a result of maintenance operations, like code upgrades. Being able to upgrade code without stopping the application and without losing state is a very useful ability supported by Quasar actors, and therefore by Comsat’s web actors.

Let’s go back and look at the actor’s doRun method:

@Override
protected Void doRun() throws InterruptedException, SuspendExecution {
    if (!initialized) { // necessary for hot code swapping, as this method might be called again after a code swap
       // ...
    }

    for (;;) {
        Object message = receive(5000, TimeUnit.MILLISECONDS);
        
        // ... handle message

        checkCodeSwap();
    }
}

When an actor is in an appropriate state (what that means depends on what the actor is doing and is up to you to decide), it may check whether an upgrade has been loaded by calling the checkCodeSwap() method. If one has, the actor will be replaced with a new version that retains all state (though it’s free to add or remove fields), and the doRun method will get called again on the new version. Other actors are oblivious to the code swap, and no messages will be lost in the process. (More complex actors might choose to make use of behaviors, a term taken from Erlang OTP, which are templates for common types of actors and include fully automated hot code swapping with no need to call checkCodeSwap() explicitly.)

A new code version is nothing but a simple jar file that contains the upgraded code for whichever classes you wish to upgrade. You can then tell the application to load the upgrade module with a JMX operation (using any JMX console, like VisualVM), or you can designate a directory on the file system that Quasar will watch, and any jar you put in that directory while the application runs will be automatically loaded into the program. You can also roll back upgrades simply by deleting the modules from that directory or by unloading them via JMX.

You can read more about Quasar’s hot code swapping in the documentation.

From an Example to a Real Application

Of course, this is a rather simple example. A more interesting web actor would likely spawn and/or interact with other actors. It is free to forward whatever web messages it receives, as they are no different from any other message exchanged between actors.

We’ve barely touched on Quasar’s full capabilities, which include supervisor hierarchies for automated fault tolerance and other Erlang OTP-like functionality, as well as Go-like CSP channels and “reactive extensions”.

To learn more, feel free to browse through the Comsat and Quasar documentation, and to play with the examples.

To see what’s possible with Comsat and Quasar, you can play our simple MMO, Web Spaceships. Every player controls a spaceship that can fight other player- and computer-controlled spaceships. All processing is done on the server, with user actions being sent to the server and updates on the location of the surrounding spaceships sent from the server, all through WebSockets. The game will be the topic of a future post, but in the meantime you’re free to examine its source code here, or start playing:

Web Spaceships screenshot
