Galaxy’s Networking
Galaxy internals, part III
This is part 3 of a three-part series. Here are part 1 and part 2.
My previous two blog posts detailed Galaxy’s cache-coherence protocol and fault tolerance mechanisms. This last post in the Galaxy internals series will describe the last missing piece: how Galaxy nodes communicate over a network.
While Galaxy’s networking, like all Galaxy components, is pluggable and could employ various networking protocols (e.g., InfiniBand), the current release uses TCP and UDP for all networking, and it does so using the excellent Netty library.
Communicating with slaves and the central server
As mentioned in my last post, each Galaxy node can be configured to replicate its owned data-items to slave nodes and/or to a central server for the sake of fault-tolerance. These update-packets may be large (as they can contain a potentially large number of modified items), are sent asynchronously most of the time (the node does not need to wait for the replication to complete), and are sent to a small number of machines (just the node’s slave/s and/or one server). Therefore, all replication communication is done via TCP. This is simple, and nothing more will be said about it here.
Peer-to-peer node communication
Communication among peer nodes has very different characteristics from those of data replication. Remember, messages sent among peer nodes are used to request ownership of items, sharing of items, or invalidation of items. They are relatively infrequent, because a well-behaved Galaxy application keeps almost all work local to the nodes; short, because data items are small and a single message can contain at most one item; latency-sensitive, because most request messages can block an operation until they complete; organized into request-reply pairs, so that acknowledging the receipt of each request and reply separately is wasteful; and, finally, sent among a large number of nodes, each of which can send messages to, and receive messages from, all other nodes. These characteristics make UDP a better choice than TCP for peer-to-peer communication among nodes. Using TCP would have entailed either keeping an open connection from every node to every other node or, alternatively, frequently paying the high cost of the TCP handshake. TCP would also acknowledge both request and response. In addition, some messages need to be broadcast to all nodes, which would require UDP multicast anyhow.
Using UDP, however, has its costs. It does not guarantee delivery, messages may arrive out of order, and it does not allow messages of unlimited length – all of which are required by Galaxy, so they need to be implemented by Galaxy code as they’re not natively provided by the protocol. All of that is done in the UDPComm class.
While conceptually the simplest component of Galaxy, UDP networking code has turned out to be the most complex, and will probably require some refactoring. Also, it is the component that I put the least thought into so far, so I believe it also has the best potential for further performance improvements. Any suggestion for improving the process outlined below would be greatly appreciated.
Message packets
Messages are grouped together into packets. Each node maintains a peer object for each of the other nodes, which keeps the last packet sent to the respective node, and a queue of messages waiting to be sent to it.
Packets are configured to have a maximum size, which should optimally be slightly less than the maximum Ethernet packet size, with some room left for IP and UDP headers (this is why jumbo frames are recommended when using Galaxy). A packet can contain many messages, but a single message cannot be broken up into more than one packet, and this is why all Galaxy items must be smaller than the maximum packet size (and, besides, small items are good in general as they would probably be less contended). If the packet is full, remaining messages are left to wait in the queue.
Galaxy can be configured to wait a little while (usually a few microseconds) before transmitting a packet, so that more messages can be packed into it. The wait is kept short so that latency does not grow.
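To make the packing rule concrete, here is a minimal Python sketch. It is not Galaxy's code: the function name, the byte-string messages and the 1400-byte limit are all assumptions chosen for illustration.

```python
from collections import deque

MAX_PACKET_BYTES = 1400  # hypothetical limit, a bit below a 1500-byte Ethernet frame


def fill_packet(queue, max_bytes=MAX_PACKET_BYTES):
    """Move whole messages from a peer's queue into one packet.

    A message is never split across packets, so packing stops at the first
    message that does not fit; it and everything behind it stay queued.
    """
    packet, used = [], 0
    while queue and used + len(queue[0]) <= max_bytes:
        msg = queue.popleft()
        packet.append(msg)
        used += len(msg)
    return packet


queue = deque([b"a" * 600, b"b" * 600, b"c" * 600])
packet = fill_packet(queue)  # takes the first two messages, leaves the third
```

Note that in this sketch a message larger than the limit would never leave the queue, which mirrors the requirement that every item be smaller than the maximum packet size.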
Guaranteeing delivery
Galaxy’s internal message-delivery API specifically denotes each message as being either a request or a response (regardless of the message type). Requests and responses are handled a little differently when it comes to guaranteeing delivery.
If a packet contains any requests, it will be sent over and over until responses are received; this ensures requests are delivered. Once a response is received, its matching request is removed from the packet and not sent again.
A response is re-sent whenever the request is received, so if the request is received twice (perhaps because the response has not been received by the requesting node), the same response will be sent twice. This ensures responses are always delivered.
A simple mechanism is used to ensure that any message, be it a request or a response, is delivered to Galaxy’s logic only once.
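The interplay of re-sent requests, re-sent responses and once-only delivery can be sketched in a few lines of Python. This is only an illustration under assumptions of mine: requests carry an id, the receiver remembers the response it produced for each id, and the lossy network is scripted by an iterator of booleans. The text above does not say which mechanism Galaxy actually uses for once-only delivery.

```python
class Receiver:
    """Answers requests; a duplicate gets the remembered response again."""

    def __init__(self):
        self.responses = {}  # request id -> response already produced
        self.delivered = []  # request ids handed to the application logic

    def on_request(self, req_id):
        if req_id not in self.responses:  # first arrival: deliver exactly once
            self.delivered.append(req_id)
            self.responses[req_id] = f"reply-to-{req_id}"
        return self.responses[req_id]  # re-sent for every duplicate


def send_until_answered(requests, receiver, response_lost):
    """Resend the packet until every request in it has been answered.

    `response_lost` scripts the fake network: True means the response to
    that transmission never reaches the sender.
    """
    pending = list(requests)
    answers, transmissions = {}, 0
    while pending:
        transmissions += 1
        for req_id in list(pending):
            reply = receiver.on_request(req_id)
            if not next(response_lost):
                answers[req_id] = reply
                pending.remove(req_id)  # answered: dropped from the packet
    return answers, transmissions


receiver = Receiver()
# In the first transmission the reply to request 1 is lost.
answers, transmissions = send_until_answered(
    [1, 2], receiver, iter([True, False, False]))
```

Request 1 reaches the receiver twice, but the application logic sees it only once.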
Guaranteeing ordering
All messages sent from node A to node B must be delivered to B in the same order they’ve been sent by A in order to ensure correct ordering of memory operations, which is required for consistency.
All messages in a packet are processed by the receiving node in the order they’re placed in the packet. But what if a sender wants to send a message after a packet has been sent but may not have yet been received? If we were to create a new packet, ordering may be violated because UDP does not guarantee packets are delivered in the order they’re sent. If, on the other hand, we were to wait until all messages in a packet are received before sending a new one, this will unnecessarily increase latency.
Example: putting it all together
The following example will hopefully demonstrate how delivery and ordering guarantees work together (requests are marked Q and responses are marked S, Q-A2 means request #2 originating in node A, while S-B3 means a response to request Q-B3):

Multicasting
Some requests (specifically GET or GETX the first time an item is requested) are multicast to the entire cluster. These requests need to be ordered with all other messages sent to all the nodes. This is achieved as follows. A special message that cannot be added to the regular unicast packet is added to all message queues. Once all the messages in the packet have been acknowledged (all responses have been received), the special message is taken from the queue, and the peer object responsible for communicating with that specific node is put in a special, waiting state. Once all peer objects are waiting, the multicast packet is sent, and then re-sent, until all peer objects receive a response. Once that happens, they are all released, and can continue retrieving unicast messages from their message queues.
In order not to hold up all communication if one or a few nodes fail to respond to the multicast (due to some failure, full buffers, etc.), if the number of peer objects still waiting for a response falls below a configurable threshold, those peer objects are told to keep re-sending the message, but as a simple unicast message, and then all peer objects are immediately released to continue regular unicast operation.
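The release rule for a multicast can be written as a small decision function. This is a sketch only; the function name, the action strings and the peer names are hypothetical.

```python
def after_responses(waiting_peers, threshold):
    """Decide the next step once some responses to a multicast have arrived.

    `waiting_peers` are the peer objects still waiting for a response.
    Returns (action, peers_that_switch_to_unicast).
    """
    waiting = set(waiting_peers)
    if not waiting:
        return "release_all", set()  # everyone has answered
    if len(waiting) < threshold:
        return "release_all", waiting  # stragglers keep retrying by unicast
    return "resend_multicast", set()  # too many silent peers: multicast again


step1 = after_responses({"n1", "n2", "n3"}, threshold=2)
step2 = after_responses({"n3"}, threshold=2)
```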
Conclusion
This concludes the 3-part series on Galaxy’s internals. Future blog posts will discuss distributed data-structures implemented on top of Galaxy and their efficiency.
How Galaxy Handles Failures
Galaxy internals, part II
This is part 2 of a three-part series. Here are part 1 and part 3.
Last time I described how Galaxy, our open-source in-memory data grid, employs a cache-coherence protocol, similar to the ones used to coordinate CPU L1 caches, in order to enforce data consistency in the cluster. One big difference between common hardware protocols and the one used by Galaxy was that Galaxy tries to minimize broadcasting messages by keeping track of all sharers of every item, and by remembering an item’s most recent owner. This is necessary because the number of nodes in the cluster, unlike in most CPUs, can be quite large, and pestering all of them with broadcast messages can seriously hurt performance. Actually, some many-core hardware systems employ a similar technique (called the directory-based approach), only they have to deal with the problem of maintaining the directory in the memory-constrained environment of the L1 cache, while Galaxy doesn’t have this problem.
However, the biggest – and most interesting – difference between the hardware implementations and Galaxy is fault tolerance; hardware doesn’t have any. If a core or the CPU bus fails – the CPU fails, and that’s that. But in a cluster, the story is different. Galaxy, like any robust distributed system, must handle network and node failures. How it does that is the subject of this post.
Timeouts
As explained last time, most Galaxy messages are grouped into request/response pairs. Requests are sent as a result of some API call, and if a response is not received within some known, configurable, duration, the call throws a timeout exception. Timeouts can be thrown by all Galaxy operations, and must be handled by the application (usually, simply by retrying the operation, possibly after some delay). Timeouts do not necessarily imply a software or hardware failure. Most commonly they are the result of a deadlock, which could happen when two or more nodes pin more than one item at the same time. In that case, a timeout will cause a transaction on one node to back off while a conflicting one retries.
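On the application side, handling a timeout by retrying could look roughly like the sketch below. It is written in Python for brevity and does not use Galaxy's API: `OperationTimeout`, `with_retries` and the delay values are stand-ins I made up. The randomised delay is my own choice, meant to keep two deadlocked nodes from retrying in lockstep.

```python
import random
import time


class OperationTimeout(Exception):
    """Stand-in for the timeout exception a grid operation may throw."""


def with_retries(operation, attempts=5, base_delay=0.001):
    """Run `operation`, retrying after a short random delay on timeout."""
    for attempt in range(attempts):
        try:
            return operation()
        except OperationTimeout:
            if attempt == attempts - 1:
                raise  # give up and let the caller decide
            time.sleep(random.uniform(0, base_delay * 2 ** attempt))


calls = []


def flaky_update():
    """Pretend operation that times out twice before succeeding."""
    calls.append(1)
    if len(calls) < 3:
        raise OperationTimeout()
    return "done"


result = with_retries(flaky_update)
```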
A common problem in distributed systems with timeouts is that there is no way of knowing whether the remote operation failed, or perhaps succeeded but the response has been delayed. Galaxy does not have this problem due to the nature of its remote operations: none of them are destructive. In fact, none of them have any effect on the data at all. Remember, all writes (data modifications) requested by a node are carried out locally on the node once it has received ownership of the relevant items. All remote requests do is transfer item ownership or invalidate shared copies. Actually, invalidation requests never time-out; they are retried indefinitely because they can never be involved in a deadlock, and the only reason they could fail is as a result of a node failure, which will eventually be detected and handled as explained later. So the only interesting request which might time-out is GETX (a GET can time-out as well, but that scenario is not that interesting).
If node A wants ownership of an item currently owned by node B, it will send B a GETX message. B will then mark the item as owned by A and respond with a PUTX. What happens if the response is delayed for any reason? Node A’s operation will eventually time-out, and the node will continue to believe that node B is the item’s owner. Node B, however, will think A now owns the item, so it will respond to any additional requests for the item from, say, node C, with a CHNGD_OWNR message. Assuming that message is received by C, it will then try to retrieve the item from A, which will, in turn, respond with yet another CHNGD_OWNR message claiming that B is the owner. If C’s operation does not time-out it will bounce from node B to A and vice-versa, sending its requests to one and then to the other, but eventually (unless a node failure occurs), the PUTX message will arrive at node A (node B will keep re-trying), which will then assume ownership and respond correctly to requests.
Node failures
Galaxy employs third-party software for cluster membership management, shared configuration and failure detection. In particular, it uses either JGroups or Apache ZooKeeper for the job. JGroups and ZooKeeper detect node liveness or failure by means of heartbeat messages recorded at some central location, so they provide a definitive (or, at least, consistent) registry of live or dead nodes.
When they detect a failing node, it is removed from the cluster. It is Galaxy’s role, however, to ensure that the data stored on the dead node is not lost. This is achieved by redundancy. All data items owned by a node (and therefore written only by that node) are replicated to slave nodes (each replicating a single master node) and/or to a central server that persists all data in the cluster to secondary storage (disk). Once a node fails, its owned data is served by its slave or by the server. JGroups/ZooKeeper are also used to inform all nodes of the current slave/master status of all other nodes, so that if a slave becomes a master upon its master’s failure, all nodes know which node is the new master.
We must watch out for one thing. As we’ll see shortly, the data served by the server or the slaves following a node failure, while always consistent, may not be as up-to-date as the latest item versions on the failed node. Because of this, and because failing to do so would wreak havoc on item ownership, we must ensure that a node does not respond to any requests if other nodes believe it is dead (messages passed between nodes completely bypass JGroups/ZooKeeper). There is no efficient way to guarantee that with 100% certainty, but we can ensure with high probability that this does not happen if we set the node’s connection timeout to ZooKeeper/JGroups to be smaller than the timeout required for them to detect a node’s failure. This way a node will disconnect from ZooKeeper/JGroups before it is known to be dead, and in that case it will shut itself down.
Data replication
Before I delve into the particulars of slave nodes and the central server, I’d like to describe the general operation of data replication. After each write operation, an item’s latest version is kept in a backup packet, which is periodically sent to the node’s slaves and/or to the central server. This asynchronous replication enables a very high write rate, as writes do not have to wait for backups to complete. However, because writes complete before replication is acknowledged, and, in fact, not every item version is replicated at all – remember, backups are only done periodically – some durability is sacrificed for the sake of reducing latency. A node failure could mean losing, say, all updates made in the last 100ms – if that is the backup period we’ve configured.
But whether or not durability is maintained, we must preserve consistency at all costs, and asynchronous, periodic replication can jeopardize that. If node A has just finished writing version 100 of item X, which is then shared (via a GET message answered by a PUT) by node B, and then node A fails having only backed up version, say, 95 to its slave, future requests for the item (from the slave-turned-master) will yield version 95, but version 100 has already been read, and possibly used by node B to produce some other item’s value, and consistency is irreparably broken.
So, just as we do when waiting for INVACKs, all local operations on the item are allowed to proceed regardless of replication, but once the item is requested by another node, we flush the backup packet and wait until it is acknowledged before responding to the request. (As long as an item’s latest version has not been replicated, it is flagged as modified. The modified flag can be set while the item is either in E state, or even in the O state if a write has been carried out before all sharers have INVACKed, and this is why Galaxy’s cache-coherence protocol does not have a separate M (modified) state as those used by CPU L1 caches.) Even if a node suddenly dies, then, though its last few writes may be lost, data consistency is always maintained.
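The flush-before-reply rule can be shown with a toy owner node. This is a Python sketch under simplifying assumptions: the replica is a plain dictionary, the flush is synchronous, and the class and method names are hypothetical (this is not the BackupImpl or Cache code).

```python
class OwnerNode:
    """Toy owner: writes are local; replication happens before any reply."""

    def __init__(self, replica):
        self.replica = replica  # dict standing in for the slave or server
        self.items = {}  # item id -> (version, value)
        self.modified = set()  # items whose latest version is not backed up

    def write(self, item_id, value):
        version = self.items.get(item_id, (0, None))[0] + 1
        self.items[item_id] = (version, value)
        self.modified.add(item_id)  # no waiting for the backup

    def flush_backup(self):
        for item_id in self.modified:
            self.replica[item_id] = self.items[item_id]
        self.modified.clear()

    def on_get(self, item_id):
        if item_id in self.modified:  # an external observer is involved
            self.flush_backup()  # replicate first, then answer
        return self.items[item_id]


replica = {}
node = OwnerNode(replica)
for value in ("a", "b", "c"):
    node.write("x", value)
backed_up_before = dict(replica)  # still empty: nothing replicated yet
served = node.on_get("x")
```

Versions 1 and 2 are never replicated here, but no other node ever saw them, so nothing inconsistent can be observed if the owner dies.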
This general principle is behind several Galaxy optimizations: we allow write operations some slack until an external observer is involved. Because a well-behaved Galaxy application requires inter-node data transfers relatively rarely compared to intra-node operations, such optimizations should result in significant latency gains.
The BackupImpl class assists the Cache class with this replication logic.
Slave nodes
Each node may have zero or more slave nodes replicating its owned items. Only the owned items are replicated – not the shared ones, so when a node dies, the slave selected to replace it as master will not have any shared items. Therefore, when the other nodes detect its death, they will remove it from all of their relevant items’ sharers list. It does not matter if they perform this bookkeeping late, as all INV requests sent to the new master will automatically result in an INVACK response if the shared item is not found (this is Galaxy’s general behavior). Also, because slaves are not informed of items’ sharers, when a slave assumes the master’s role, it assigns all items the E state, so when node death is detected, all of its items’ sharers will invalidate their copies.

The logic outlined so far suffices if a node may have only one slave. If it has more than one, we must watch out for another possible scenario. Say node A has two slaves A1 and A2, and say that the latest backup packet sent from A to its slaves contains, perhaps among other items, version 100 of item X. A1 receives the packet, but then node A fails before A2 receives it. If A1 replaces A as the master, it has no way of knowing that A2 has an older version of X, and if A1 then fails without ever writing X and replicating it to A2, A2 would become the master with an old and – much more importantly – inconsistent value for X (as version 100 may have been used to compute other values). If, on the other hand, A2 becomes the master, serves, say, version 95 of X, and then fails, the new master, A1, would now have an inconsistent value for X yet again.

One way of solving this is putting in place a consensus protocol to ensure all slaves agree on all values, but that would be very inefficient. A simpler solution is to perform leader-election among all of A’s slaves when it fails, with each slave advertising the latest packet it has received, electing the most up-to-date slave as the new master, and then pushing the missing updates to the other slaves.
This solution is not yet implemented in Galaxy, and that is why the current version allows a single slave per node (though you can manually start a new slave once the old one assumes the role of the master).
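Since the election is not implemented, the following Python sketch only illustrates the idea described above. It assumes each slave can report the sequence number of the latest backup packet it received; that numbering, the function name and the tie-breaking rule are all my own assumptions.

```python
def elect_new_master(latest_packet):
    """Pick the most up-to-date slave and list those that must catch up.

    `latest_packet` maps a slave's name to the sequence number of the last
    backup packet it received. Ties go to the alphabetically first name.
    """
    new_master = max(sorted(latest_packet), key=latest_packet.get)
    newest = latest_packet[new_master]
    needs_catch_up = sorted(
        slave for slave, seq in latest_packet.items()
        if slave != new_master and seq < newest)
    return new_master, needs_catch_up


new_master, needs_catch_up = elect_new_master({"A1": 100, "A2": 95})
```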
The server
Optionally, and regardless of whether or not slaves are used for availability, it is possible (and recommended) to configure the Galaxy grid to use a special node called, simply, the server. The server is to the peer nodes (as the regular nodes are called) not unlike what main memory (RAM) is to the L1 caches – it holds all the data items in the cluster in a large, though relatively slow, disk storage (a Galaxy server can currently be configured to use either BerkeleyDB or any SQL database as its disk-persistence engine). When a node fails, if it has no slaves, the server can then serve its owned items. In addition, the server’s persistent data-storage can be a desired feature in and of itself.
Just like with the slaves, when a server is present, all nodes periodically send it backup packets, and the entire replication logic remains the same. However, unlike a slave node, the server retains all items from all the nodes, so it must know, at any given time, which node owns what items, so that it will only serve relevant items when a node fails (remember, the server, or a slave, serving items when the owner node is alive can destroy consistency, because replicated data can be somewhat older than live data, so serving both could be catastrophic).
Simply remembering which node sent the backup packet containing the latest item version is not enough because an item can change hands without any new versions being created. So whenever a node gains ownership of an item (when it receives the PUTX message), it informs the server of the transfer by sending it an INV message, but unlike INV messages sent to the item’s sharers, it waits for the server’s response before allowing writes for reasons that will become clear shortly.

If a node then dies and has no slave, the server and all peers mark all of its owned items as owned by the server. If a node cannot locate an item in any of the peers, i.e. no node responds with a PUT to its GET multicast, it will request it from the server.
Another tricky scenario we have to contend with is the following: assume node A owns item X, and it transfers it to node B. Node B then immediately sends an INV message to the server to inform it that X is now owned by B, but before the message is received by the server, node A dies and the server marks its items as owned by the server. Then node C can come along, ask the server for ownership of item X, and the server will happily comply, resulting in both B and C believing they’re each X’s lawful owner. Galaxy handles this case by having the server reply with an INV of its own to B’s INV request, rather than with an INVACK, and this is why B must wait for the server’s reply before writing X, to ensure that it is truly the sole owner.
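Here is a toy version of the server's bookkeeping for this race, in Python. The text does not say how the server recognises a stale ownership claim, so this sketch assumes the INV names the previous owner and the server compares it with its own record. That field, like all the names here, is hypothetical.

```python
SERVER = "server"


class Server:
    """Toy ownership registry kept by the server."""

    def __init__(self, owners):
        self.owners = dict(owners)  # item id -> owning node

    def on_node_death(self, node):
        for item, owner in list(self.owners.items()):
            if owner == node:
                self.owners[item] = SERVER  # the server now serves the item

    def on_inv(self, item, new_owner, previous_owner):
        """Handle a new owner's INV (previous_owner is an assumed field)."""
        if self.owners.get(item) == previous_owner:
            self.owners[item] = new_owner
            return "INVACK"  # transfer recorded: the new owner may write
        return "INV"  # record disagrees: the claim is refused


normal_reply = Server({"X": "A"}).on_inv("X", "B", previous_owner="A")

server = Server({"X": "A"})
server.on_node_death("A")  # A dies before B's INV arrives
race_reply = server.on_inv("X", "B", previous_owner="A")
```

In the race, B receives an INV instead of an INVACK and so never writes X, while the server remains the single owner that C can ask.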

Because the server keeps track of every item’s current ownership, it can serve another role other than a backup mechanism: some cloud environments (most notably Amazon EC2 and Rackspace) do not allow multicasts, so Galaxy can be configured to ask the server for the identity of an item’s owner when it is initially requested, rather than multicasting the request to all peer nodes.
The server’s entire logic is implemented by the MainMemory class.
Special thanks to Henry Robinson of Cloudera for going over Galaxy’s fault-tolerance design, pointing out flaws and suggesting solutions.
How Galaxy Maintains Data Consistency in the Grid
Galaxy internals, part I
This is part 1 of a three-part series. Here are part 2 and part 3.
In my previous blog post I explained Galaxy’s basic design, the rationale behind it, and the use-cases where it may be appropriate. In this post, I’d like to explain in detail exactly how Galaxy works, aside from one topic – fault tolerance – which will be explored in a future post.
As mentioned last time, Galaxy employs a software implementation of a cache-coherence protocol, similar to the one used by CPUs to coordinate L1 caches. The purpose of the protocol is to allow any cluster node to read and write any data item while maintaining coherence.
Coherence (also called consistency in the DB world) basically means that any code accessing data items on any of Galaxy’s nodes (the Galaxy grid runs application code on the same nodes holding the data) perceives the entire memory – the collection of all data items – as being the same at any single point in time, meaning that no two nodes can observe the value of item X as being different in a single instant. Consistency is a desired property of a distributed system, because it makes it much easier for distributed code to function when item X has a well defined value at a single point in time, rather than a possible set of values depending on where in the cluster the code runs.
However, it becomes harder to define what “a single point in time” means when information takes time to travel. In any distributed computer system, it takes a non-negligible duration of time for any information to travel between two nodes, so any change node A makes to item X can only be observed by node B (or any other computer communicating with the cluster) some time later, depending on network latency.
So we’re going to define consistency like this: if the value of item Y changes after that of item X, no code will ever observe X’s old value after observing Y’s new value; i.e. everyone sees all memory update operations occurring in the same order. We could relax this requirement to maintain correct ordering only between updates that we know to be related – i.e. if we know that the value of Y depends somehow on that of X – and not otherwise, but Galaxy does not do that: it maintains complete ordering of all updates (CPUs require costly memory fence operations to enforce memory ordering when writing and reading the same memory address on different threads, but that is because the messages comprising the cache coherence protocol themselves may be received out of order, but Galaxy enforces strict message ordering at no significant cost, so no fences are required).
The cache-coherence protocol
Now let’s delve into the cache-coherence protocol itself. We will do that in two iterations. First, I’ll explain the concepts and approximate workings of Galaxy’s cache-coherence protocol in a way that will, hopefully, make it easy to understand how consistency is maintained. Then, I’ll explain how Galaxy uses a few tricks to maintain the same invariants in a way that reduces latency. All of the logic explained below is implemented in the class co.paralleluniverse.galaxy.core.Cache.
An instance of a data item residing in a node is called a cache-line in the Galaxy code, after the term used in CPU caches. However, it is neither a line, since, unlike in CPU caches, it does not have a fixed length, nor strictly part of a cache, because all items reside on at least one node, unlike the case in CPUs where only a small portion of memory can be found in any of the caches. Still, Galaxy follows the CPU cache nomenclature because the protocol is basically the same (see this pdf for an overview of CPU cache-coherence protocols).
At any given time, each line can be at one of several states: exclusive (E), owned (O), shared (S) and invalid (I). (For those of you familiar with cache-coherence protocols, the modified (M) state is conspicuously missing. It actually does exist in Galaxy, but it will be discussed in the next post, which will deal with fault-tolerance.)
An item is always owned by exactly one node, and may be shared by many. A line in the O state also maintains a list of all nodes sharing the item (where the line will be in the S state), and a line in the S state holds the identity of the owning node. If an item has no sharers, then its owner is exclusive, and its line will be in the E state. An invalid line is similar to a non-existent line, except that it keeps the identity of the last known owner.
Nodes exchange messages that change lines’ states. Messages are usually grouped into request/reply pairs, where the request is usually sent when application code running on a node initiates any Galaxy operation.
GET/PUT
When the application performs the get operation, requesting the reading of an item, if the cache-line for the item is not found (on the node running the code), then a blank line is created in the I state, and a GET message with the item id is multicast to the entire cluster. The node owning the item then replies with a PUT message containing the item’s data, and adds the requesting node to the line’s sharers list. If the line was in the E state, its state is set to O. When the requesting node receives the PUT message, it writes the item’s data into the line, sets the line’s owner, and sets its state to S; then the get operation terminates.
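The GET/PUT exchange boils down to two small state changes, one on each side. The Python sketch below models only the line states; the `Line` class, the handler names and the dictionary-shaped message are illustrative assumptions, not the layout used by Galaxy's Cache class.

```python
from dataclasses import dataclass, field


@dataclass
class Line:
    state: str = "I"  # one of "E", "O", "S", "I"
    value: object = None
    owner: str = None  # known owner (kept by S and I lines)
    sharers: set = field(default_factory=set)  # kept only by the owner


def handle_get(owner_line, requester):
    """Owner side: record the sharer, demote E to O, answer with a PUT."""
    owner_line.sharers.add(requester)
    if owner_line.state == "E":
        owner_line.state = "O"
    return {"type": "PUT", "value": owner_line.value}


def handle_put(line, put, sender):
    """Requester side: fill in the blank line and mark it shared."""
    line.value, line.owner, line.state = put["value"], sender, "S"


a_line = Line(state="E", value=42)  # node A owns the item exclusively
b_line = Line()  # node B: blank line created in the I state
handle_put(b_line, handle_get(a_line, "B"), sender="A")
```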

GETX/PUTX
Now let’s assume that an item is shared by a node (the line in the S state) when the application code on that node wishes to update it (by calling set or getx). Only the owner of the item is allowed to modify it, so first the node has to become the item’s owner. To do that, it sends the current owner (a line in the S state knows who its owner is) a GETX message. The current owner then responds with a PUTX message that contains the current sharers list, and puts the line in the I state, though keeping track of who the new owner is. The requesting node then puts the line in the O state.

INV/INVACK
Before actually writing the item’s new value, the new owner must invalidate all sharers of the item. To see why this is necessary in order to preserve consistency, consider the case when node A is the owner of items X and Y, and node B is a sharer of X. Node A modifies X and later modifies Y. Node B reads X and gets its old value because it still has line X in the S state. Then, to read Y (which does not exist in its local store), it sends a GET message to A which, in turn, replies with a PUT, and node B sees Y’s new value, thus violating consistency.
To prevent that, before modifying a line in the owned state, the owner must invalidate all shared copies. It does that by sending INV messages to all sharers, who then change the line’s state from S to I and respond with an INVACK. Once all INVACKs have been received (and all the nodes removed from the sharers list), the line’s state is changed from O to E (exclusive), and the new value can be written.
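Putting the ownership transfer and the invalidation together, the life of a line during a write can be traced with a small simulation. This is a Python sketch with made-up names; messages are replaced by direct function calls, and each node holds a single line for a single item.

```python
class Node:
    """One node's line for a single item."""

    def __init__(self, name):
        self.name = name
        self.state = "I"
        self.value = None
        self.owner = None
        self.sharers = set()


def transfer_ownership(old, new):
    """GETX/PUTX: the old owner hands over data and sharers, then goes to I."""
    new.sharers = old.sharers - {new.name}
    new.value = old.value
    new.state, new.owner = "O", new.name
    old.sharers = set()
    old.state, old.owner = "I", new.name  # remembers who the new owner is


def invalidate_sharers(owner, nodes):
    """INV/INVACK: each sharer drops to I; the owner then becomes exclusive."""
    for name in sorted(owner.sharers):
        sharer = nodes[name]
        sharer.state, sharer.owner = "I", owner.name  # sharer handles the INV
        owner.sharers.discard(name)  # owner handles the INVACK
    owner.state = "E"


def write(owner, nodes, value):
    if owner.state == "O":
        invalidate_sharers(owner, nodes)
    if owner.state != "E":
        raise RuntimeError("only an exclusive owner may write")
    owner.value = value


nodes = {name: Node(name) for name in "ABC"}
a, b, c = nodes["A"], nodes["B"], nodes["C"]
a.state, a.value, a.sharers = "O", 1, {"B", "C"}
for sharer in (b, c):
    sharer.state, sharer.value, sharer.owner = "S", 1, "A"

transfer_ownership(a, b)  # B sends GETX, A answers with PUTX
write(b, nodes, 2)  # INV to C, INVACK back, then the write
```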

CHNGD_OWNR
When a node has no idea who owns an item it is interested in, it multicasts a GET or a GETX message to the entire cluster (optionally, it can query a central server that will be discussed in the next post). This can happen if the node has never accessed the item before, and so its cache line does not exist in the local store, but this event should be rare. It is highly likely that an application suited to run well on Galaxy will access the same items over and over. In the common case, a node will already have an idea who owns an item. If a line is in the S state, it contains the identity of the current owner. If a line is invalidated, it keeps track of the owner by noting the node that invalidated it. However, in both cases, it is possible that the owner information is out of date. Even if in the S state, it’s possible that ownership of the item has moved to a new node, but the INV from the new owner hasn’t arrived yet. Or, maybe the line is in the I state after being INVed by the new owner, and then ownership changes again, and the node is no longer aware of the change.
In such cases, the node, when sending a GET or a GETX, will contact the last known owner, and if that node is no longer the owner, it will respond with a CHNGD_OWNR message, containing the identity of the current owner, if it knows who it is. In a well-behaved Galaxy application, ownership will not jump around much, so getting a CHNGD_OWNR and trying again without bothering all the nodes with a multicast is a good approach.
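Following CHNGD_OWNR redirects amounts to walking a chain of hints. The Python sketch below assumes a table of what each node currently believes; the function name, the hop limit and the fall-back to a multicast when a node has no hint are assumptions of mine.

```python
def find_owner(last_known_owner, owner_hint, max_hops=8):
    """Ask the last known owner and follow CHNGD_OWNR redirects.

    `owner_hint` maps each node to the node it believes owns the item
    (itself if it is the owner, None if it has no idea).
    Returns (owner, redirects); owner is None when the caller should give
    up chasing and multicast the request instead.
    """
    node = last_known_owner
    for redirects in range(max_hops):
        hint = owner_hint.get(node)
        if hint == node:
            return node, redirects  # this node answers with PUT or PUTX
        if hint is None:
            return None, redirects  # it does not know who the owner is
        node = hint  # CHNGD_OWNR: try the node it named
    return None, max_hops


owner, redirects = find_owner("A", {"A": "B", "B": "C", "C": "C"})
```

The hop limit matters: two nodes can temporarily point at each other while a PUTX is in flight, and an unbounded chase would bounce between them.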

Transactions and pinning
When the application wishes to update several items at once or “snapshot” a few items without letting any other node “steal” them while they’re being processed, the items can be pinned to the requesting node. Because we assume that such transactions will be short lived and do not want to increase the chattiness of the protocol, when an item is pinned the node simply holds on to any INV or GETX messages it receives without acting on them. Once the items are released, all pending requests are performed and the responses are sent.
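Pinning is little more than a deferred-message list. A minimal Python sketch, with hypothetical class, method and field names:

```python
class PinnedLine:
    """Holds INV and GETX messages while the item is pinned."""

    def __init__(self):
        self.pinned = False
        self.deferred = []  # INV / GETX messages received while pinned
        self.handled = []  # messages acted upon, in order

    def pin(self):
        self.pinned = True

    def receive(self, msg):
        if self.pinned and msg["type"] in ("INV", "GETX"):
            self.deferred.append(msg)  # hold on to it, send no reply yet
        else:
            self.handled.append(msg)

    def release(self):
        self.pinned = False
        pending, self.deferred = self.deferred, []
        for msg in pending:  # act on everything that piled up
            self.receive(msg)


line = PinnedLine()
line.pin()
line.receive({"type": "GETX", "from": "B"})
line.receive({"type": "PUT", "from": "C"})
handled_while_pinned = list(line.handled)
line.release()
```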
Reducing write latency
We’ve now finished going through the first draft of Galaxy’s implementation, and have shown how Galaxy preserves the consistency requirement by following the cache-coherence protocol. Let’s now see how Galaxy exploits some heuristics to reduce latency.
I’ve explained why it is necessary to wait for a line to be in the E state (i.e. to wait for all sharers’ INVACKs) before modifying the item. But inconsistencies might only occur if other nodes get to see the new value of the item before all previous sharers have been invalidated. If no one else can observe the change, and since we’re guaranteed that all INVACKs will be received eventually (barring node failure) we can go right ahead and perform it. So, once the PUTX message is received and the line is put in the O state, INVs are sent to all sharers, but we do not wait for the INVACKs – we modify the element while still in the O state. However, if any GET/GETX messages from other nodes are received, they are buffered until we get all INVACKs and the line is in the E state. This can reduce latency significantly, because it is quite likely that the current owner will want to do several operations with the item before any other node requests it.
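The trick can be captured with a line that accepts writes immediately but holds back replies until the last INVACK. This is a Python sketch with hypothetical names; it tracks only the outstanding INVACKs, not the full line state.

```python
class OwnedLine:
    """Owner's line: writes go ahead at once, readers wait for the INVACKs."""

    def __init__(self, value, sharers):
        self.value = value
        self.awaiting_invack = set(sharers)  # INVs sent, acks outstanding
        self.buffered = []  # requesters whose GET/GETX is held back
        self.replies = []  # (requester, value) pairs actually sent

    def write(self, value):
        self.value = value  # allowed while INVACKs are still outstanding

    def on_get(self, requester):
        if self.awaiting_invack:
            self.buffered.append(requester)  # nobody may see the value yet
        else:
            self.replies.append((requester, self.value))

    def on_invack(self, sharer):
        self.awaiting_invack.discard(sharer)
        if not self.awaiting_invack:  # all sharers are now invalidated
            for requester in self.buffered:
                self.replies.append((requester, self.value))
            self.buffered.clear()


line = OwnedLine(value=1, sharers={"B", "C"})
line.write(2)  # no waiting for INVACKs
line.on_get("D")  # buffered: D must not see the new value yet
line.on_invack("B")
replies_midway = list(line.replies)
line.on_invack("C")  # last INVACK: the buffered GET is answered
```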
Reducing read latency
Say that our node shares items X and Y, which are then invalidated by their common owner because they are about to be modified. If our node then wants to read the items again, it has to wait for the owner to receive all INVACKs, as explained in the previous section, before it responds to our GET request with the items’ new values. This might take a while.
But notice that while both lines were in the S state, they were consistent. Now that they are in the I state, their values haven’t changed (on the local node), so while they may no longer be up to date, they should still be consistent according to our definition. If we want to read their values we can still return the old, invalidated, values to the application. It is only when we receive X or Y’s new value, that the other may no longer be consistent with it.
So, when node B invalidates some items in node A and the application on node A wishes to read those items, Galaxy sends GET request(s) to B, but immediately returns the stale, but consistent, values to the application. Only when node B sends a PUT message to A are all previous items invalidated by B purged, after which they cannot be returned until their new values are received.
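A toy local store shows when a stale value may still be returned and when it must be purged. This is a Python sketch; the class, the `readable` flag and the method names are hypothetical, and the GET requests themselves are left out.

```python
class LocalStore:
    """Node A's lines for items owned by other nodes."""

    def __init__(self, owner, items):
        self.lines = {
            item: {"state": "S", "value": value, "owner": owner,
                   "readable": True}
            for item, value in items.items()}

    def on_inv(self, item, owner):
        self.lines[item].update(state="I", owner=owner)  # old value is kept

    def read(self, item):
        """Return a value, or None when the caller must wait for a PUT."""
        line = self.lines[item]
        return line["value"] if line["readable"] else None

    def on_put(self, item, value, owner):
        # A fresh value from `owner` may be newer than anything that owner
        # invalidated earlier, so those stale lines stop being readable.
        for other in self.lines.values():
            if other["state"] == "I" and other["owner"] == owner:
                other["readable"] = False
        self.lines[item].update(state="S", value=value, owner=owner,
                                readable=True)


store = LocalStore("B", {"X": 1, "Y": 10})
store.on_inv("X", "B")
store.on_inv("Y", "B")
stale_x = store.read("X")  # old value of X, still consistent with old Y
store.on_put("Y", 11, "B")  # Y's new value arrives
```

After the PUT, reading Y gives the new value, while X can no longer be served until its own PUT arrives.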
Fault tolerance
Naturally, the cache-coherence protocols used to coordinate CPU L1 caches were not designed to be fault-tolerant. If one or more cores, or the CPU bus, fail – the machine fails. But Galaxy runs in a cluster, so it must be able to handle node failures without losing application data.
How Galaxy manages that will be the topic of my next post.
So go check out Galaxy now.

