The Parallel Universe Blog

July 26, 2012

How Galaxy Maintains Data Consistency in the Grid

By Ron

Galaxy internals, part I

This is part 1 of a three-part series. Here are part 2 and part 3.

In my previous blog post I explained Galaxy’s basic design, the rational behind it, and the use-cases where it may be appropriate. In this post, I’d like to explain in detail exactly how Galaxy works, aside from one topic – fault tolerance – which will be explored in a future post.

As mentioned last time, Galaxy employs a software implementation of a cache-coherence protocol, similar to the one used by CPUs to coordinate L1 caches. The purpose of the protocol is to allow any cluster node to read and write any data item while maintaing coherence

Coherence (also called consistency in the DB world) basically means that any code accessing data items on any of Galaxy’s nodes (the Galaxy grid runs application code on the same nodes holding the data) perceives the entire memory – the collection of all data items – as the being same at any single point in time, meaning that no two nodes can observe the value of item X as being different in a single instant. Consistency is a desired property of a distributed system, because it makes is much easier for distributed code to function when item X has a well defined value at a single point in time, rather than a possible set of values depending on where in the cluster the code runs. 

However, it becomes harder to define what “a single point in time” means when information takes time to travel. In any distributed computer system, it takes a non-negligible duration of time for any information to travel between two nodes, so any change node A makes to item X can only be observed by node B (or any other computer communicating with the cluster) some time later, depending on network latency. 

So we’re going to define consistency like this: if the value of item Y changes after that of item X, no code will ever observe X’s old value after observing Y’s new value; i.e. everyone sees all memory update operations occurring in the same order. We could relax this requirement to maintain correct ordering only between updates that we know to be related – i.e. if we know that the value of Y depends somehow on that of X – and not otherwise, but Galaxy does not do that: it maintains complete ordering of all updates (CPUs require costly memory fence operations to enforce memory ordering when writing and reading the same memory address on different threads, but that is because the messages comprising the cache coherence protocol themselves may be received out of order, but Galaxy enforces strict message ordering at no significant cost, so no fences are required).

The cache-coherence protocol

Now let’s delve into the cache-coherence protocol itself. We will do that in two iterations. First, I’ll explain the concepts and approximate workings of the Galaxy’s cache-coherence in a way that will, hopefully, make it easy to understand how consistency is maintained. Then, I’ll explain how Galaxy uses a few tricks to maintain the same invariants in a way the reduces latency. All of the logic explained below is implemented in the class co.paralleluniverse.galaxy.core.Cache.

An instance of a data item residing in a node is called a cache-line in the Galaxy code, after the term used in CPU caches. However, it is neither a line in the sense that, unlike in CPU caches, it does not have a fixed length, nor strictly part of a cache, because all items reside on at least one node, unlike the case in CPUs where only a small portion of memory can be found on any of the caches. Still, Galaxy follows the CPU cache nomenclature because the protocol is basically the same (see this pdf for an overview of CPU cache-coherence protocols). 

At any given time, each line can be at one of several states: exclusive (E), owned (O), shared (S) and invalid (I). (For those of you familiar with cache-coherence protocols, the modified (M) state is conspicuously missing. It actually does exist in Galaxy, but it will be discussed in the next post, which will deal with fault-tolerance.)

An item is always owned by exactly one node, and may be shared by many. A line in the O state, also maintains a list of all nodes sharing the item (where the line will be in the S state), and a line in the S state holds the identity of the owning node. If an item has no sharers, then its owner is exclusive, and its line will be in the E state. An invalid line is similar to a non-existent line, except that it keeps the identity of the last known owner.

Nodes exchange messages that change lines’ states. Messages are usually grouped into request/reply pairs, where the request is usually sent when application code running on a node initiates any Galaxy operation.


When the application performs the get operation, requesting the reading of an item, if the cache-line for the item is not found (on the node running the code), then a blank line is created in the I state, and a GET message with the item id is multicast to the entire cluster. The node owning the item, then replies with a PUT message containing the item’s data, and adds the requesting node to the line’s sharers list. If the line was in the E state, its state is set to O. When the requesting node receives the PUT message, it writes the item’s data into the line, sets the line’s owner, and sets its state to S; then the get operation terminates.



Now let’s assume that an item is shared by a node (the line in the S state) when the application code on that node wishes to update it (by calling set or getx). Only the owner of the item is allowed to modify it, so first the node has to become the item’s owner. To do that, it sends the current owner (a line in the S state knows who its owner is) a GETX message. The current owner then responds with a PUTX message, that contains the current sharers list, and puts the line in the I state, though keeping track of who the new owner is. The requesting node then puts the line in the O state.



Before actually writing the item’s new value, the new owner must invalidate all sharers of the item. To see why this is necessary in order to preserve consistency, consider the case when node A is the owner of items X and Y, and node B is a sharer of X. Node A modifies X and later modifies Y. Node B reads X and gets it’s old value because it still has line X in S mode. Then, to read Y (which does not exist in its local store), it sends a GET message to A which, in turn, replies with a PUT, and node B sees Y’s new value, thus violating consistency.

To prevent that, before modifying a line in the owned state, the owner must invalidate all shared copies. It does that by sending INV messages to all sharers, who then change the line’s state from S to I and respond with an INVACK. Once all INVACKs have been received (and and all the nodes removed from the sharers list), the line’s state is changed from O to E (exclusive), and the new value can be written.

figure 3


When a node has no idea who owns an item it is interested in, it multicasts a GET or a GETX message to the entire cluster (optionally, it can query a central server that will be discussed in the next post). This can happen if the node has never accessed the item before, and so its cache line does not exist in the local store, but this event should be rare. It is highly likely that an application suited to run well on Galaxy, will access the same items over and over. In the common case, a node will already have an idea who owns an item. If a line is in the S state, it contains the identity of the current owner. If a line is invalidated, it keeps track of the owner by noting the node that invalidated it. However, in both cases, it is possible that the owner information is out of date. Even if in the S state, it’s possible that ownership of the item has moved to a new node, but the INV from the new owner hasn’t arrived yet. Or, maybe the line is in the I state after being INVed by the new owner, and then ownership changes again, and the node is no longer aware of the change. 

In such cases, the node, when sending a GET or a GETX, will contact the last known owner, and if that node is no longer the owner, it will respond with a CHNGD_OWNR message, containing the identity of the current owner, if it knows who it is. In a well-behaved Galaxy application, ownership will not jump around much, so getting a CHNGD_OWNR and trying again without bothering all the nodes with a multicast is a good approach.

Figure 4

Transactions and pinning

When the application wishes to update several items at once or “snapshot” a few items without letting any other node “steal” them while they’re being processed, the items can be pinned to the requesting node. Because we assume that such transactions will be short lived and do not want to increase the chattiness of the protocol, when an item is pinned  the node simply holds on to any INV or GETX messages it receives without acting on them. Once the items are released, all pending requests are performed and the responses are sent. 

Reducing write latency

We’ve now finished going through the first draft of Galaxy’s implementation, and have shown how Galaxy preserves the consistency requirement by following the cache-coherence protocol. Let’s now see how Galaxy exploits some heuristics to reduce latency.

I’ve explained why it is necessary to wait for a line to be in the E state (i.e. to wait for all sharers’ INVACKs) before modifying the item. But inconsistencies might only occur if other nodes get to see the new value of the item before all previous sharers have been invalidated. If no one else can observe the change, and since we’re guaranteed that all INVACKs will be received eventually (barring node failure) we can go right ahead and perform it. So, once the PUTX message is received and the line is put in the O state, INVs are sent to all sharers, but we do not wait for the INVACKs – we modify the element while still in the O state. However, if any GET/GETX messages from other nodes are received, they are buffered until we get all INVACKs and the line is in the E state. This can reduce latency significantly, because it is quite likely that the current owner will want to do several operations with the item before any other node requests it.

Reducing read latency

Say that our node shares items X and Y, which are then invalidated by their one owner because they are about to be modified. If our node then wants to read the item again, it has to wait for the owner to receive all INVACKs as explained in the previous section before it responds to our GET request with the items’ new values. This might take a while.

But notice that while both lines were in the S state, they were consistent. Now that they are in the I state, their values haven’t changed (on the local node), so while they may no longer be up to date, they should still be consistent according to our definition. If we want to read their values we can still return the old, invalidated, values to the application. It is only when we receive X or Y’s new value, that the other may no longer be consistent with it.

So, when node B invalidates some items in node A and the application on node A wishes to read those items, Galaxy sends GET request(s) to B, but immediately returns the stale, but consistent, values to the application. Only when node B sends a PUT message to A do all previous items invalidated by B are purged and cannot be returned until their new values are received.

Fault tolerance

Naturally, the cache-coherence protocols used to coordinate CPU L1 caches were not designed to be fault-tolerant. If one or more cores, or the CPU bus, fail – the machine fails. But Galaxy runs in a cluster, so it must be able to handle node failures without losing application data.

How Galaxy manages that will be the topic of my next post.

So go check out Galaxy now.

Join our mailing list

Sign up to receive news and updates.

Tags: ,

comments powered by Disqus