Phillippe Siclait

Building Raft in Clojure

7 May 2020 • 10 min read

Diego Ongaro and John Ousterhout created Raft, an understandable algorithm for achieving consensus in distributed systems. In this post, I describe how I built an implementation of Raft in Clojure.

Last year, in a conversation about a blockchain startup, a friend suggested that many blockchain projects were poorly designed because their creators had not studied distributed computing. I found this interesting, but I didn’t know enough about distributed systems to understand how deeper exposure to the field could have led to better system designs. Knowing that his Ph.D. was in distributed systems, I asked him how I could learn more. He suggested Robert Morris’s class at MIT, 6.824: Distributed Systems, declaring that among introductory courses on the subject, it’s the best. I asked him, “What makes it the best?” and he replied, “The labs.”

The labs are a series of programming assignments in which students incrementally build a sharded, fault-tolerant key/value store. In addition to the lab assignments, the class schedule, notes, research papers, and video lectures are all available on the course website. Excited to learn, I organized a study group that met weekly at our local Starbucks to review the assigned research papers and discuss how we each coded the labs. The second lab tasked students with building Raft. The instructors wrote the test suite in Go, so writing the labs in Go would have been the path of least resistance. I did not choose the path of least resistance. Instead, I wrote the labs in Clojure.

I was taking this class for fun, so I wanted to use a language that made coding enjoyable. And I had so much fun coding this project in Clojure. The language’s interactive REPL gave me a responsive feedback loop for testing code changes. After writing a function, I would send it to the REPL and test it on a few inputs to ensure that it worked as expected.

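For example, I wrote a function called find-first-conflict that finds the first index at which a log and the entries proposed for appending to it conflict, that is, where the entry already in the log and the proposed entry for that index have different terms.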


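(defn find-first-conflict
  "Returns the first log index at which a proposed entry's term differs
  from the term already stored in `log`, or nil if nothing conflicts.
  `entries` are the entries proposed for the indices after `prev-index`."
  [prev-index log entries]
  (some
    (fn [index]
      ;; Position in `entries` of the entry proposed for `index`.
      (let [entry-index (- index prev-index 1)]
        (when (and (< entry-index (count entries))
                   (< prev-index index)
                   ;; log-term (defined elsewhere) looks up the term
                   ;; stored in `log` at `index`.
                   (not= (-> (nth entries entry-index) :term)
                         (log-term log index)))
          index)))
    (range 1 (inc (count log)))))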

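Calling the function in the REPL let me verify that it works. Here the proposed entries cover indices 3 through 5. Index 3 agrees (term 2 in both), but at index 4 the log holds term 2 while the proposed entry has term 3, so the function returns 4.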


raft=> (find-first-conflict
         2
         [{:term 1} {:term 1} {:term 2} {:term 2} 
          {:term 2}]
         [{:term 2} {:term 3} {:term 3}])
4
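
The snippet above relies on log-term, which isn't shown. Here is a self-contained sketch that assumes log-term returns the term of the entry at a 1-based Raft index. It also adds a hypothetical merge-entries showing how a follower might use the conflict index: drop the conflicting entry and everything after it, then append the proposed entries it doesn't already hold. It assumes the usual consistency check at prev-index has already passed.

```clojure
;; Hypothetical helper: term of the entry at 1-based `index`,
;; with the log stored in a 0-based vector.
(defn log-term
  [log index]
  (:term (nth log (dec index))))

(defn find-first-conflict
  [prev-index log entries]
  (some
    (fn [index]
      (let [entry-index (- index prev-index 1)]
        (when (and (< entry-index (count entries))
                   (< prev-index index)
                   (not= (:term (nth entries entry-index))
                         (log-term log index)))
          index)))
    (range 1 (inc (count log)))))

;; Hypothetical follower step, run after the prev-index check succeeds.
(defn merge-entries
  [prev-index log entries]
  (let [conflict (find-first-conflict prev-index log entries)
        ;; On a conflict keep indices 1..conflict-1, otherwise keep everything.
        kept     (if conflict (subvec log 0 (dec conflict)) log)
        ;; How many of the proposed entries `kept` already holds.
        overlap  (max 0 (- (count kept) prev-index))]
    (into kept (drop overlap entries))))

(merge-entries 2
               [{:term 1} {:term 1} {:term 2} {:term 2} {:term 2}]
               [{:term 2} {:term 3} {:term 3}])
;; => [{:term 1} {:term 1} {:term 2} {:term 3} {:term 3}]
```

Because find-first-conflict returns nil when nothing conflicts, merge-entries leaves an up-to-date log untouched, so a duplicated AppendEntries request cannot truncate entries the follower already has.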

The language's immutable data structures made it simple to ensure referential transparency throughout my code. Referential transparency, coupled with the REPL, enabled easy debugging of large chunks of application logic. Once I’ve presented more detail on the Raft algorithm and the structure of my code, I’ll share an example of how this setup made debugging easy.

I also made heavy use of Clojure’s concurrency library, core.async. This library is a realization of Tony Hoare’s Communicating Sequential Processes (CSP) and is very similar to, and inspired by, Go’s implementation of CSP. I restricted all communication between threads to passing immutable data structures through core.async channels. This choice freed me from having to think about locking and associated deadlock bugs.
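
To illustrate why passing immutable values removes the need for locks, here is a small sketch. It uses a JDK LinkedBlockingQueue as a dependency-free stand-in for a core.async channel (an assumption for the example, not what my implementation uses). After handing a map to another thread, the sender "modifies" its own copy, and the receiver still sees the original value.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(defn send-and-receive
  "Puts `op` on a queue read by another thread, then builds a modified
  copy on the sending side. Returns [value-the-receiver-took sender-copy]."
  [op]
  (let [ch                (LinkedBlockingQueue.) ; stand-in for a channel
        received          (promise)
        ^Runnable receive (fn [] (deliver received (.take ch)))]
    ;; Daemon thread so it never keeps the JVM alive.
    (doto (Thread. receive) (.setDaemon true) (.start))
    (.put ch op)
    ;; assoc-in returns a new map; the map already handed to the receiver
    ;; is unchanged, so neither thread needs a lock to read it.
    (let [sender-copy (assoc-in op [:payload :term] 99)]
      [@received sender-copy])))

(send-and-receive {:op :append-entries :payload {:term 3}})
;; => [{:op :append-entries, :payload {:term 3}}
;;     {:op :append-entries, :payload {:term 99}}]
```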

The REPL, immutable data structures, and core.async helped me write clean code, but there was a cost to using Clojure. As I mentioned before, the course-provided code was all written in Go. Choosing Clojure added a few hours of work, translating test and library code, before I could start each lab. For a fun personal project, I thought this was a worthwhile tradeoff. Having decided to use Clojure, I was able to begin building the lab. But before describing how I built it, I’ll need to describe the problem that Raft solves.

To begin, imagine that we have a stateful server that responds to requests to update and query its state. If we have a single server, then our system is not fault-tolerant: the failure of that one server makes the entire system unavailable. We can think of the server as an instance of a state machine. One solution to the fault-tolerance problem is to replicate the state machine across multiple servers so that the system survives the failure of one or more of them. But this creates another problem: how do we ensure that the servers maintain consensus on their state? This is the problem that Raft is built to solve.

I started coding with three principles in mind. First, I built everything around a core event loop that was responsible for all state updates and side effects. The event loop continually pulls operations off the operations channel and processes them sequentially via handlers. Second, I made all of the event handlers referentially transparent. Each handler takes the current Raft state and the operation as input and returns two values: the state-delta and the effects. The state-delta is a map representing the state modifications that need to be made, and the effects are a list of side effects that need to be executed. Side effects include pushing operations onto the operations channel, sending RPCs, replying to RPCs, and modifying timers. Returning the side effects as values, instead of executing them in the handlers, made debugging easier. And third, I restricted all communication between the RPC client and the main event loop to channels.
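
As an illustration of this handler contract, here is a hypothetical pure handler for a follower whose election timer fires (become-candidate is an illustrative name, not a function from my implementation). It returns [state-delta effects], where each effect is a zero-argument function, so calling the handler sends nothing; the caller decides when to run the effects. The send-request-vote function is an assumed stand-in for the RPC client, and the RequestVote arguments are simplified (Raft also sends the candidate's last log index and term).

```clojure
(defn become-candidate
  "Hypothetical pure handler: returns [state-delta effects] and performs
  no I/O. `send-request-vote` is an assumed RPC function (peer, args)."
  [{:keys [me peers current-term]} send-request-vote]
  (let [term (inc current-term)]
    [;; State-delta: new term, candidate role, vote for ourselves.
     {:raft-state :candidate :current-term term :voted-for me}
     ;; Effects: one zero-argument function per peer; nothing is sent yet.
     (mapv (fn [peer]
             (fn request-vote []
               (send-request-vote peer {:term term :candidate-id me})))
           peers)]))
```

At the REPL, you can inspect the delta and count the effects without any network activity, then run the effects against a recording function to check what would have been sent.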

These three principles (a central event loop, pure function handlers, and communication via channels) are best illustrated by the handle! function. The event loop executes this function on each iteration, passing it the operation map (op-map) pulled from the operations channel. Given the current raft-state and the operation, handle! executes the appropriate handler via the case expression, applies the state-delta, and finally executes the effects. Side effects and mutations are limited to the transition-state! and run-effects! functions. Note that I use the convention that only functions whose names end in a bang (!) may mutate state or execute side effects. In the code snippet below, I’ve removed a number of the cases from the case expression for brevity.


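(defn handle!
  [raft {:keys [op] :as op-map}]
  (let [{:keys [me ops-ch persister state]} raft

        ;; The instance's current role (e.g. :candidate).
        raft-state (:raft-state @state)

        ;; Dispatch on [role operation] to a pure handler that returns
        ;; [state-delta effects] without performing any side effects.
        [state-delta effects]
        (case [raft-state op]
          [:candidate :election-timeout]
          (start-election raft)

          [:candidate :received-vote]
          (handle-vote raft (:vote op-map))

          ;; Step down to follower and put the same operation back on
          ;; the operations channel (a = clojure.core.async).
          [:candidate :append-entries]
          [{:state :follower} [(fn replace-op []
                                 (a/put! ops-ch op-map))]]

          ;; …

          ([:candidate :request-vote]
           [:follower :request-vote]
           [:leader :request-vote])
          (handle-request-vote raft op-map))

        ;; Apply the state-delta; this may produce further effects.
        additional-effects (transition-state! raft state-delta)

        effects (concat effects additional-effects)]
    ;; Side effects run only after the state has been updated.
    (run-effects! effects)
    :recur))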

By following these three principles, I guaranteed that every iteration of the event loop was deterministic and easy to debug, even though the overall application is non-deterministic. The ordering of operations in the operations channel was the sole source of non-determinism. With these basic principles in place, we can dig into the Raft algorithm.
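
The loop that repeatedly calls handle! isn't shown above. Here is a minimal sketch, assuming handle! returns :recur to keep going (as the snippet above does) and using a JDK LinkedBlockingQueue in place of the core.async operations channel. The hypothetical toy-transition stands in for a handler plus its state-delta, and run-ops feeds a fixed sequence of operations through the loop, so a given order of operations always yields the same sequence of states.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(defn run-event-loop!
  "Takes operations off `ops-ch` one at a time and passes each to
  `handle!`, looping for as long as `handle!` returns :recur."
  [^LinkedBlockingQueue ops-ch handle!]
  (loop []
    (when (= :recur (handle! (.take ops-ch)))
      (recur))))

(defn toy-transition
  "Hypothetical pure transition: a newer-or-equal AppendEntries makes us a
  follower; an election timeout makes us a candidate in the next term."
  [state {:keys [op term]}]
  (case op
    :append-entries   (if (>= term (:current-term state))
                        (assoc state :current-term term :raft-state :follower)
                        state)
    :election-timeout (-> state
                          (update :current-term inc)
                          (assoc :raft-state :candidate))))

(defn run-ops
  "Enqueues `ops` in order, drains them through the event loop, and
  returns every intermediate state."
  [init ops]
  (let [ops-ch  (LinkedBlockingQueue.)
        state   (atom init)
        history (atom [])
        handle! (fn [op-map]
                  (if (= :shutdown (:op op-map))
                    :stop
                    (do (swap! history conj (swap! state toy-transition op-map))
                        :recur)))]
    (doseq [op ops] (.put ops-ch op))
    (.put ops-ch {:op :shutdown}) ; sentinel that ends the loop
    (run-event-loop! ops-ch handle!)
    @history))

(run-ops {:current-term 1 :raft-state :follower}
         [{:op :election-timeout}
          {:op :append-entries :term 3}
          {:op :append-entries :term 2}])
;; => [{:current-term 2, :raft-state :candidate}
;;     {:current-term 3, :raft-state :follower}
;;     {:current-term 3, :raft-state :follower}]
```

Reordering the same operations produces a different history, which is the sense in which the order of the operations channel is the only source of non-determinism.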

The core of the system is the log. The purpose of each Raft instance’s log is to maintain a total order over a set of operations. The order of operations in the log represents the list of state transitions to be taken by the state machine. To restore the state of the state machine, the operations in the log are applied successively to the state machine, starting from the initial state. Raft guarantees consensus across the state machines by ensuring the same order over committed entries in all of the Raft instance logs.
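
Replaying a log is a left fold over its entries. Here is a minimal sketch, assuming a key/value state machine like the one the labs build, where each entry carries a hypothetical :command vector such as [:put k v] or [:append k v]:

```clojure
(defn apply-command
  "Hypothetical key/value state machine transition for one log entry."
  [kv {[op k v] :command}]
  (case op
    :put    (assoc kv k v)
    :append (update kv k str v))) ; (str nil v) is just v for a new key

(defn restore-state
  "Rebuilds the state by applying entries in log order to the empty state."
  [log]
  (reduce apply-command {} log))

(def example-log
  [{:term 1 :command [:put "x" "a"]}
   {:term 1 :command [:append "x" "b"]}
   {:term 2 :command [:put "y" "c"]}])

(restore-state example-log)           ;; => {"x" "ab", "y" "c"}
(restore-state (reverse example-log)) ;; => {"y" "c", "x" "a"}
```

The same entries in a different order yield a different state, which is why replicas have to agree on the order of the entries and not just on the set of entries.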

Raft separates the consensus problem into three components: leader election, log replication, and safety. The authors claim that this decomposition is an important reason for the algorithm’s ease of understanding. In this post, I’m only going to touch on the first two components. The safety topic adds restrictions to leader election and log entry commitment that are necessary for guaranteeing consistency. For information on safety, I’d recommend reading section 5.4 of the extended Raft paper.

Raft instances can be in one of three different states: leader, candidate, or follower. Leader election is the process of selecting a privileged leader server that is responsible for replying to client queries and replicating state to the other Raft instances. By having a single leader at any moment in time, Raft solves the problem of determining which order of operations is correct. The correct order is the order selected by the leader.

The system starts with all instances as followers. Each follower has an election timer that triggers an election when it expires. When a follower receives a heartbeat from a leader or a request to vote in an election, it resets this timer without triggering an election. To simplify timer management, I created a timekeeper abstraction for each instance. The timekeeper is instantiated with the configuration for each timer, and the timers communicate with the main event loop only over the operations channel.


(timekeeper
  {:election
   {:callback   #(a/put! ops-ch 
                         {:op :election-timeout})
    :timeout-fn #(+ min-election-timeout 
                    (rand-int 
                      election-timeout-range))}
   :heartbeat
   {:callback   #(a/put! ops-ch 
                         {:op :send-heartbeat})
    :timeout-fn (constantly 100)}})

Each timer has a timeout function (timeout-fn) instead of a static timeout value so that the election timeout is re-randomized each time the timer starts. Randomization makes it unlikely that several followers repeatedly time out at the same moment, start competing elections, and split the vote.
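
The timekeeper's internals aren't shown here. Here is a minimal sketch of a single timer in the same spirit, using a JDK ScheduledExecutorService instead of core.async (an assumption for the example; make-timer and its :start! and :stop! functions are hypothetical). Starting the timer again cancels the pending run and draws a fresh timeout from timeout-fn, which is how a heartbeat would postpone an election.

```clojure
(import '(java.util.concurrent Executors ScheduledExecutorService
                               ScheduledFuture TimeUnit))

(defn make-timer
  "Hypothetical timer. `start!` cancels any pending run and schedules
  `callback` after a fresh (timeout-fn) milliseconds; `stop!` cancels
  without rescheduling. Assumes only one thread (the event loop) drives it."
  [^ScheduledExecutorService exec {:keys [callback timeout-fn]}]
  (let [pending (atom nil)
        cancel! (fn []
                  (when-let [^ScheduledFuture f @pending]
                    (.cancel f false)))]
    {:start! (fn []
               (cancel!)
               (reset! pending
                       (.schedule exec
                                  ^Runnable callback
                                  (long (timeout-fn))
                                  TimeUnit/MILLISECONDS)))
     :stop!  cancel!}))

(comment
  ;; Illustrative values only; the callback would put an op on ops-ch.
  (def exec (Executors/newSingleThreadScheduledExecutor))
  (def election (make-timer exec {:callback   #(println :election-timeout)
                                  :timeout-fn #(+ 150 (rand-int 150))}))
  ((:start! election))  ; arm the timer
  ((:start! election))  ; heartbeat arrived: re-arm with a new random timeout
  (.shutdownNow exec))
```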

When a follower triggers an election, it becomes a candidate and requests votes from its peers via the RequestVote RPC. If a candidate receives votes from a majority of the servers in the cluster (counting its vote for itself), it transitions to leader. The leader periodically sends a heartbeat message via the AppendEntries RPC to its followers so that they continue to recognize the leader and do not start new elections.
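
Counting votes comes down to comparing against a majority of the whole cluster. Here is a sketch of a hypothetical pure handler for vote replies. count-vote and its argument shapes are assumptions, and it is not the handle-vote function from my implementation. Like the other handlers, it returns [state-delta effects]. Its effects are left empty for brevity, though a new leader would immediately start sending heartbeats.

```clojure
(defn majority
  "Number of servers that constitutes a majority of an n-server cluster."
  [n]
  (inc (quot n 2)))

(defn count-vote
  "Hypothetical handler for a RequestVote reply received by a candidate.
  `votes` is the set of server ids that granted a vote, including itself."
  [{:keys [current-term votes cluster-size]} {:keys [from term vote-granted]}]
  (cond
    ;; A newer term means this candidate is out of date: step down.
    (> term current-term)
    [{:raft-state :follower :current-term term :voted-for nil} []]

    ;; Replies from older terms and refusals change nothing.
    (or (< term current-term) (not vote-granted))
    [nil []]

    :else
    (let [votes (conj votes from)]
      (if (>= (count votes) (majority cluster-size))
        [{:votes votes :raft-state :leader} []]
        [{:votes votes} []]))))
```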

In my implementation, the two RPC endpoints are simple. They are only responsible for pushing operations onto the operations channel and reading replies from the reply channel. The main event loop responds to the RPC by using the reply channel provided in the operation map.


(defrecord Raft […]
  IRaft
  (AppendEntries [_ args]
    (let [reply-ch (a/chan)]
      (a/put! ops-ch {:op       :append-entries
                      :payload  args
                      :reply-ch reply-ch})
      (a/<!! reply-ch)))
  (RequestVote [_ args]
    (let [reply-ch (a/chan)]
      (a/put! ops-ch {:op       :request-vote
                      :payload  args
                      :reply-ch reply-ch})
      (a/<!! reply-ch))))
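
The same request/reply pattern can be sketched without core.async. In this hypothetical version, a promise plays the role of the per-request reply channel (so the key is :reply rather than :reply-ch), and a LinkedBlockingQueue plays the operations channel. start-responder! stands in for the event loop and answers every operation with a caller-supplied function.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(defn rpc-endpoint
  "Hypothetical endpoint: enqueue the request with a fresh reply slot,
  then block until the event loop fills it."
  [^LinkedBlockingQueue ops-ch op args]
  (let [reply (promise)]
    (.put ops-ch {:op op :payload args :reply reply})
    @reply))

(defn start-responder!
  "Hypothetical event-loop side: a daemon thread that takes each operation
  and delivers (respond op-map) through the reply slot it carries."
  [^LinkedBlockingQueue ops-ch respond]
  (let [^Runnable serve (fn []
                          (loop []
                            (let [{:keys [reply] :as op-map} (.take ops-ch)]
                              (deliver reply (respond op-map))
                              (recur))))]
    (doto (Thread. serve)
      (.setDaemon true)
      (.start))))
```

Because each request carries its own reply slot, concurrent RPCs never receive each other's replies, and the endpoint needs no knowledge of the event loop beyond the operations queue.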

The system enforces consistency across the Raft instance logs via log replication. When the leader receives a request from a client to commit a command, it first appends the command to its own log as an uncommitted entry and then sends it to all of its followers in its AppendEntries heartbeats. Commitment is an important concept in Raft: Raft only guarantees consistency across replicas for committed commands. If a follower successfully adds the command to its log, it replies to the RPC with success = true. Once the entry is stored on a majority of the servers (counting the leader itself), the leader commits the command and replies to the client. The followers learn of the leader’s latest commit in the next heartbeat.
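
The leader can compute the commit point from how far each server's log is known to match its own. Here is a minimal sketch, assuming the leader tracks one match index per server, including its own last log index. It also applies the restriction from the safety rules (section 5.4 of the extended paper) that a leader only advances its commit index this way for an entry from its current term. The function names are hypothetical.

```clojure
(defn majority-match-index
  "Highest log index stored on a majority of servers. `match-indexes`
  holds one value per server, including the leader's own last index."
  [match-indexes]
  (let [desc (vec (sort > match-indexes))]
    ;; With n servers, the value at position (quot n 2) of the descending
    ;; list is held by at least (inc (quot n 2)) servers.
    (nth desc (quot (count desc) 2))))

(defn advance-commit-index
  "Hypothetical leader step: move commit-index up to the majority-replicated
  index, but only if that entry is from the leader's current term.
  `log` is a vector of entries addressed by 1-based index."
  [{:keys [commit-index current-term log]} match-indexes]
  (let [n (majority-match-index match-indexes)]
    (if (and (> n commit-index)
             (= current-term (:term (nth log (dec n)))))
      n
      commit-index)))
```

With five servers whose logs reach indices 5, 5, 3, 2, and 1, index 3 is on a majority; if that entry came from an earlier term, the leader waits until an entry from its current term is on a majority.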

As I developed the Raft application, the REPL was a helpful tool for evaluating the handler functions. At any point during development, I could construct a Raft state and a request map, pass them to a handler function, and check that the response was as expected. For example, I could send an AppendEntries RPC request from an out-of-date leader to a follower and verify both that the follower didn’t change its state (state-delta = nil) and that the handler returned an effect that replies to the RPC with success = false.


raft=> (handle-append-entries raft request)
[nil [#function[raft/handle-append-entries/reply-to-append-entries]]]
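
The REPL check above can be reproduced with a self-contained sketch. Here reject-if-stale is a hypothetical fragment of an AppendEntries handler, not the handle-append-entries function shown above, and reply! is an assumed function for sending the reply. In the test, a promise and deliver stand in for the reply channel.

```clojure
(defn reject-if-stale
  "Hypothetical fragment of an AppendEntries handler. If the request's term
  is older than ours, return [nil [reply-effect]]: no state change, plus one
  effect that rejects the RPC. Otherwise return nil so the rest of the
  handler (not shown) can process the request."
  [{:keys [current-term]} {:keys [payload reply-ch]} reply!]
  (when (< (:term payload) current-term)
    [nil [(fn reply-to-append-entries []
            (reply! reply-ch {:term current-term :success false}))]]))
```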

There is a lot of detail missing from this post on how the system ensures consensus in the face of failures. But I hope it gives a high-level overview of Raft and shows how the REPL, pure functions, and channels can help with implementing complex concurrent systems. If you’re interested in understanding the Raft algorithm in depth, I’d recommend the extended version of the paper and the Secret Lives of Data visualization.

If you’d like to do the labs in Clojure too, I’ve created a GitHub repository of the translated Go code so that you don’t have to do the translation yourself. As of June 2020, there’s also a study group on Reddit at r/mit6824clojure.

I’m @siclait on Twitter—reach out to continue the conversation.