Distributed Systems in Haskell

TL;DR:

  1. Only ever block if there are no messages whatsoever waiting for your server.
  2. Don’t use interrupt-based timeouts.
  3. Separate your server logic and any networking.
  4. Try to have pure server logic.
  5. Use Monads to simplify your code as it gets bigger.
  6. Use Cloud Haskell and Lenses and other nice libraries to simplify your life and your code.

Haskell-nonspecific Advice

1: Do Not Block

Every time we used multiple blocking reads from the network, it came back to haunt us. For example, we would send a Ping and wait for a Pong before continuing. This leads to all sorts of bad behavior. What happens if both servers Ping each other at the same time? Deadlock. Even blocking reads that seemed innocuous at first usually led to confusion and race conditions later on.

Instead, use an asynchronous architecture. Your program should block in exactly one place. Each node in the system should have a “superloop” that performs blocking reads from the network using an efficient epoll-like mechanism and then dispatches messages appropriately.
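
As a rough sketch of what a single blocking point can look like in Haskell (the Chan-based inbox, the Message type, and handleMessage are illustrative assumptions, not part of the original design):

import Control.Concurrent.Chan (Chan, readChan)

-- Hypothetical message and state types, for illustration only.
data Message = Ping | Pong | Tick
data ServerState = ServerState

-- The only blocking call in the entire server is readChan.
-- A separate listener thread is assumed to decode network
-- traffic and push it onto the Chan.
superloop :: Chan Message -> ServerState -> IO ()
superloop inbox st = do
  msg <- readChan inbox          -- block here, and only here
  st' <- handleMessage st msg    -- dispatch; must never block
  superloop inbox st'

handleMessage :: ServerState -> Message -> IO ServerState
handleMessage st _ = pure st     -- placeholder dispatch logic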

Why?

It may seem like this architecture introduces unnecessary logical complexity compared to a bit of blocking sprinkled throughout the code, but in every case we encountered, blocking in exactly one place turned out to be much easier in practice. It eliminated the race conditions we ran into and maximized performance by removing unnecessary delays. A server that only blocks in one place is guaranteed to process any waiting message the moment it has free CPU cycles.

2: Use Asynchronous Timing

Some algorithms (especially probabilistic ones) rely on things like timeouts.

In our experience, implementing timeouts and other time-based behavior as blocking reads with timeouts is a recipe for confusion.

Instead, we found that the best approach was to spawn, for each node, a separate “tick generator” thread. This “tick generator” simply sends its parent thread an empty Tick message at a given frequency.
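
A minimal sketch of such a tick generator, assuming the parent thread reads its messages from a Chan and that Tick is one of its message constructors (both assumptions for illustration):

import Control.Concurrent (ThreadId, forkIO, threadDelay)
import Control.Concurrent.Chan (Chan, writeChan)
import Control.Monad (forever)

data Message = Tick  -- plus whatever real messages the server handles

-- Send an empty Tick to the parent's inbox every intervalMicros microseconds.
startTickGenerator :: Int -> Chan Message -> IO ThreadId
startTickGenerator intervalMicros inbox = forkIO $ forever $ do
  threadDelay intervalMicros
  writeChan inbox Tick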

Why?

There are several advantages to this approach.

  1. All timeout logic is handled with standard, run-of-the-mill server logic. There are no interrupts, timers, or exceptions to deal with. You just process Tick messages like any other message.
  2. You need to keep track of separate timeouts for separate servers, and this approach vastly simplifies doing so. You just keep a map from servers to the number of Ticks since you last heard from each one (see the sketch after this list); if that number gets too high, it’s a timeout. Compare this to any solution that involves reads with timeouts or checking the system clock. In our experience, Ticks were much simpler.
  3. This architecture unifies two different timeout scenarios. One scenario is a complete loss of incoming messages, which normally has to be dealt with by using a blocking read with a timeout. The other scenario is when the server still receives messages on a regular basis, but doesn’t receive messages from a particular server for a long time. Blocking reads with timeouts won’t cover the latter failure case. Ticks cleanly handle both cases.
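
As a sketch of the bookkeeping described in point 2 (the Data.Map representation, the ServerId type, and the threshold are assumptions made for illustration):

import qualified Data.Map.Strict as Map

type ServerId = String

-- Number of Ticks elapsed since we last heard from each peer.
type LastHeard = Map.Map ServerId Int

-- On every Tick, bump every counter by one.
onTick :: LastHeard -> LastHeard
onTick = Map.map (+ 1)

-- On any message from a peer, reset its counter.
onMessageFrom :: ServerId -> LastHeard -> LastHeard
onMessageFrom peer = Map.insert peer 0

-- A peer has timed out once its counter exceeds some threshold.
timedOut :: Int -> LastHeard -> [ServerId]
timedOut threshold = Map.keys . Map.filter (> threshold)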

3: Separate Networking and Logic

This is a somewhat specific architectural recommendation; no doubt there are algorithms where this advice does not apply. However, in all our use cases, this approach worked very well (and we tried quite a few approaches).

Distributed systems papers are often as poorly written as they are clever. The included code rarely works properly, if at all. One bad habit that these papers tend to have is the thorough mixing of network operations and algorithm logic. It’s pretty common to see things along the lines of

send(server,msg);
x = receive_msg();
y = process(x);
send(server,y);

but with a lot more junk thrown in.

It turns out that this is not conducive to clean, understandable code. There’s a lot of implicit state being offloaded to the network when you structure things like this, and it makes it a lot harder to recover from things like network interruptions or servers going offline. You end up using timeouts and all sorts of ugly constructs to make things work in practice.

Instead, you should completely separate your server logic and your network functionality. Again, this might sound like a lot of work, but it’s almost guaranteed to save you more time in the long run.

In Haskell terms, your server logic will (ideally) have a type like this:

serverStep :: Config -> State -> Message -> (State, [Message])

In prose, the server logic takes three arguments:

  • The server’s configuration, which does not change (Hostname, directory, etc.)
  • The server’s previous state
  • A message received from the network

The server logic then returns:

  • The new server state
  • A list of messages to send

Then you just have to write a simple wrapper around this function that receives messages from the network, feeds them into the function, and sends the responses out to the network.
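
A minimal sketch of that wrapper, assuming a Chan-based inbox as above and a sendMessage action that performs the actual network send (the placeholder types and both of those names are assumptions, not from the post):

import Control.Concurrent.Chan (Chan, readChan)
import Control.Monad (forM_)

-- Placeholder types standing in for the real ones.
data Config  = Config
data State   = State
data Message = Message

-- The pure server logic (stubbed out here).
serverStep :: Config -> State -> Message -> (State, [Message])
serverStep _cfg st _msg = (st, [])

-- Thin impure shell: receive, step, send, repeat.
runServer :: Config -> Chan Message -> (Message -> IO ()) -> State -> IO ()
runServer cfg inbox sendMessage st = do
  msg <- readChan inbox
  let (st', outgoing) = serverStep cfg st msg
  forM_ outgoing sendMessage
  runServer cfg inbox sendMessage st'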

With a bit of work, any program requiring a sequence of sends and receives can be transformed into this form (a single receive followed by arbitrarily many sends), so even the most stubbornly ugly distributed systems paper can be adapted to it.
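
One way to do that transformation (a sketch, with illustrative Phase and Msg types that are not from the post) is to make the protocol phase explicit in the state rather than implicit in where the code happens to be blocked:

-- Instead of "send Ping, block until Pong, continue", record where
-- we are in the protocol and handle each message as it arrives.
data Phase = Idle | AwaitingPong
data Msg   = Ping | Pong

-- Initiating the exchange is a state transition plus an outgoing message.
startPing :: Phase -> (Phase, [Msg])
startPing _ = (AwaitingPong, [Ping])

-- A single receive followed by zero or more sends.
step :: Phase -> Msg -> (Phase, [Msg])
step Idle         Ping = (Idle, [Pong])   -- answer pings immediately
step AwaitingPong Pong = (Idle, [])       -- the reply we were waiting for
step phase        _    = (phase, [])      -- ignore anything unexpected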

Why?

  1. This form guarantees that you meet suggestion #1 and get all the advantages of doing so. In particular, your server will never block unless there is nothing in the incoming message queue. Therefore, your server will process any incoming messages the instant it has free CPU cycles.
  2. Network code is simpler. There’s just one place you send and receive messages, and it’s very straightforward to implement.
  3. Testing is much easier. When your server logic is a pure function as described above, server behavior is entirely deterministic and much more amenable to testing. It’s easy to build a test harness that “simulates” the network. All you have to do is keep a list of all your servers’ states and a queue of messages waiting to be delivered. A test harness looks like this:
while queue is not empty:
   pop msg off queue
   (new_state, new_msgs) = serverStep configs[msg.dest] states[msg.dest] msg
   states[msg.dest] = new_state
   put new_msgs into queue

If you want to test things like out-of-order message delivery, you just shuffle the queue instead of delivering messages in order. You have complete control!
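
Rendered in Haskell, that harness might look like the following (the Map-based state table, the dest field, and the stubbed types are assumptions made to keep the sketch self-contained):

import qualified Data.Map.Strict as Map

type ServerId = Int

data Config  = Config
data State   = State
data Message = Message { dest :: ServerId }

-- Stand-in for the real pure server logic.
serverStep :: Config -> State -> Message -> (State, [Message])
serverStep _cfg st _msg = (st, [])

-- Drive the whole "network" deterministically: pop a message,
-- step the destination server, and enqueue whatever it sends.
simulate :: Map.Map ServerId Config
         -> Map.Map ServerId State
         -> [Message]
         -> Map.Map ServerId State
simulate _       states []           = states
simulate configs states (msg : rest) =
  let sid            = dest msg
      (st', newMsgs) = serverStep (configs Map.! sid) (states Map.! sid) msg
      states'        = Map.insert sid st' states
  in  simulate configs states' (rest ++ newMsgs)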

When we implemented Paxos (with some pedagogical shortcuts), this approach saved us a lot of trouble. We would run the simulation and check the Paxos invariants at each step. We found a lot of bugs this way! This would be very difficult on a real network.

http://yager.io/Distributed/Distributed.html
