Why do we need a Consensus Algorithm?

In any practical distributed system, we need to tolerate failures. One general approach is to keep redundant, consistent copies of the data on separate nodes. Such a system can be modeled as a replicated state machine: a collection of servers, each holding an identical copy of the data. A replicated state machine is implemented using a replicated log, where log entries are copied to multiple servers in the same order. Each server's state machine executes the log entries in sequence, which in turn ensures an identical state across all servers. The consensus algorithm is responsible for keeping the replicated logs identical in content and order across all servers.
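As a toy illustration of this idea (separate from the Raft code later in the post), applying the same ordered log of commands to independent key-value state machines always produces identical state:

```go
package main

import "fmt"

// A command in the replicated log. Applying the same log, in the same
// order, to any replica's state machine yields the same final state.
type Command struct{ Key, Value string }

func apply(state map[string]string, cmds []Command) map[string]string {
	for _, c := range cmds {
		state[c.Key] = c.Value
	}
	return state
}

func main() {
	cmds := []Command{{"x", "1"}, {"y", "2"}, {"x", "3"}}
	replicaA := apply(map[string]string{}, cmds)
	replicaB := apply(map[string]string{}, cmds)
	fmt.Println(replicaA["x"] == replicaB["x"], replicaA["x"]) // true 3
}
```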

Raft is a consensus algorithm with a focus on ease of understanding. I highly recommend reading Raft’s paper to become familiar with the algorithm; it also explains the reasoning behind the design decisions that went into Raft. I will reference diagrams and text from the paper. A nice visualization of the whole Raft process via animations can be found here.

The raft consensus algorithm

Raft is a leader-based consensus algorithm: a single leader is responsible for managing the replicated log.

The leader-based approach lets Raft decompose the consensus problem into three separate subproblems:

1. Leader Election -> The process of electing a leader
2. Log Replication -> Taking data from clients and replicating it to the other servers
3. Log Consistency -> Making sure all the servers have identical logs

The Raft paper has a condensed summary of the algorithm, which can be seen below. Raft Algorithm

Once we translate the above algorithm into code, we will have a non-production version of Raft ready :) So let’s get started.

Implementing Raft in Code

Note: We will use exactly the same names in the code as on the above page to avoid any confusion. We will also attach the comments from the algorithm page to the corresponding code to make it easier to read.

Let’s start with the data structures for state.

// A single log entry. In our KV store, Command holds the key and Content the value.
type Log struct {
	Command string
	Term    int
	Content string
}

// Updated on stable storage before responding to RPCs. Should be durable
type PersistentState struct {
	// Latest term the server has seen. Starts at 0 and increases monotonically
	currentTerm int
	// CandidateId that received this node's vote in the current term (-1 if none)
	votedFor    int
	// Log entries, each containing a command for the state machine and the term when it was received
	log         map[int]Log
}

// Volatile state on all servers
type VolatileState struct {
    // Index of the highest log entry that is committed. Starts at 0 and increases monotonically	
	commitIndex int
	// Index of the highest log entry that is applied to state machine. Starts at 0 and increases monotonically
	lastApplied int
}

// Volatile state on Leader, reset after each election
type VolatileLeaderState struct {
	// for each server, index of the next log entry to send to that server (initialized to leader last log index + 1)
	nextIndex  map[int]int
	// for each server, index of highest log entry known to be replicated on server (initialized to 0, increases monotonically)
	matchIndex map[int]int
}
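The snippets in the rest of the post hang these structs off a Server value (s.role, s.mutex, and so on) whose full definition is not shown. A plausible sketch, with field names inferred from the later code, might look like this:

```go
package main

import (
	"fmt"
	"log/slog"
	"os"
	"sync"
	"time"
)

// Reproduced from above so this sketch compiles on its own.
type Log struct {
	Command string
	Term    int
	Content string
}
type PersistentState struct {
	currentTerm int
	votedFor    int
	log         map[int]Log
}
type VolatileState struct{ commitIndex, lastApplied int }
type VolatileLeaderState struct{ nextIndex, matchIndex map[int]int }

// Sketch only: inferred from how the later snippets use the fields;
// the real definition may differ. (Field spellings kept as in the post's code.)
type Server struct {
	mutex               sync.Mutex
	nodeId              int
	neighbourNodes      []int  // ids of the other nodes in the cluster
	role                string // "leader", "follower" or "candidate"
	lastRequestTime     time.Time
	lastVoteTime        time.Time
	state               map[string]string // the KV store built on top of Raft
	logger              *slog.Logger
	PeristentState      PersistentState
	VolatileState       VolatileState
	VolatileLeaderState VolatileLeaderState
}

func main() {
	s := Server{
		role:           "follower",
		logger:         slog.New(slog.NewTextHandler(os.Stderr, nil)),
		PeristentState: PersistentState{votedFor: -1, log: map[int]Log{}},
		state:          map[string]string{},
	}
	fmt.Println(s.role, s.PeristentState.votedFor) // follower -1
}
```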

Similarly, let’s introduce data structures for the RPC messages.

// Invoked by the leader to replicate log entries. Also used for heartbeats.
type AppendEntriesRequest struct {
	// Leader's term
	Term         int
	// Leader's id, so followers can redirect clients to the leader
	LeaderId     int
	// Index of the log entry immediately preceding the new ones. Raft uses it to avoid gaps in the logs.
	PrevLogIndex int
	// Term of the prevLogIndex entry
	PrevLogTerm  int
	// Log entries to store (empty for heartbeat; may send more than one for efficiency)
	Entries      []Log
	// Leader's commitIndex
	LeaderCommit int
}

// Response to an AppendEntriesRequest
type AppendEntriesResponse struct {
	// currentTerm of the responding node, so the leader can update itself. If the
	// responding node has a higher term, the leader will step down to a follower.
	Term    int
	// True if the follower contained an entry matching prevLogIndex and prevLogTerm
	Success bool
}

// Invoked by candidates to gather votes
type RequestVoteRequest struct {
	// Candidate's term
	Term         int
	// Candidate requesting the vote
	CandidateId  int
	// Index of the candidate's last log entry. During the election, a node grants its
	// vote only if the candidate's log is at least as up to date as its own
	LastLogIndex int
	// Term of the candidate's last log entry
	LastLogTerm  int
}

// Response to a RequestVoteRequest
type RequestVoteResponse struct {
	// currentTerm of the responding node, for the candidate to update itself
	Term        int
	// True if the candidate received the vote
	VoteGranted bool
}

Now that we have introduced the data structures, let’s move on to the operations performed by Raft. There are two main ones: Leader Election and Log Replication.

Leader Election

A node in Raft can be in one of three states: Leader (responsible for maintaining the log), Follower (replicates the leader's log into its own) and Candidate (a node that tries to become the next leader when the current leader is not responding).

The state diagram from Raft paper can be seen below.

State Diagram

The gist of the leader election can be implemented as below.

// A follower node constantly checks if it received a message from leader recently.
// If not, it will become a candidate and will start election.
func (s *Server) CandidateChecker() {
	for {
		s.mutex.Lock()
		if s.role == "follower" {
			if (time.Since(s.lastRequestTime) > 1*time.Second) && (time.Since(s.lastVoteTime) > 1*time.Second) {
				s.role = "candidate"
				s.lastVoteTime = time.Now()
				s.mutex.Unlock()
				s.startElection()
			} else {
				s.mutex.Unlock()
			}
		} else {
			s.mutex.Unlock()
		}
		// Sleep for a base interval plus some jitter, so that nodes don't
		// all time out and start elections at the same moment
		time.Sleep(time.Duration(rand.IntN(100)) * time.Millisecond)
		time.Sleep(100 * time.Millisecond)
	}
}

// Starts election by sending RequestVote RPC to all the neighbor nodes.
func (s *Server) startElection() {
	s.mutex.Lock()
	lastLogIndex := s.getLastLogIndexLocked()
	lastLogTerm := s.PeristentState.log[lastLogIndex].Term

	// Increment Term and start election
	s.PeristentState.currentTerm++
	requestVote := RequestVoteRequest{
		Term:         s.PeristentState.currentTerm,
		CandidateId:  s.nodeId,
		LastLogIndex: lastLogIndex,
		LastLogTerm:  lastLogTerm,
	}

	s.logger.Info("Election start", "requestVote", requestVote, "nodeId", s.nodeId)
	j, err := json.Marshal(requestVote)
	if err != nil {
		s.logger.Error("Starting election failed with Json Marshal", "requestVote", requestVote, "err", err, "nodeId", s.nodeId)
	}

	// We vote for ourselves first
	s.PeristentState.votedFor = s.nodeId
	neighbourNodes := make([]int, len(s.neighbourNodes))
	copy(neighbourNodes, s.neighbourNodes)
	s.mutex.Unlock()

	outputs := make(map[int]RequestVoteResponse)
	// We send requestVote request to all neighbors
	for _, n := range neighbourNodes {
		resp, err := http.Post("http://localhost:"+strconv.Itoa(8000+n)+"/requestVote", "application/json", bytes.NewReader(j))
		if err != nil {
			s.logger.Error(err.Error(), "requestVote", requestVote, "nodeId", s.nodeId)
			continue
		}

		var requestVoteResponse RequestVoteResponse
		err = json.NewDecoder(resp.Body).Decode(&requestVoteResponse)
		if err != nil {
			s.logger.Error(err.Error())
			continue
		}
		outputs[n] = requestVoteResponse
	}

	s.mutex.Lock()
	successCount := 0
	rejectRequest := false
	for _, v := range outputs {
		// We got a vote from a follower
		if v.VoteGranted {
			successCount++
		}
        // Raft uses Term as a Logical Clock. A node sent us a Term greater than our Term.
        // This means that node is more updated. So we should become a follower and stop election.
		if v.Term > s.PeristentState.currentTerm {
			s.role = "follower"
			s.PeristentState.currentTerm = v.Term
			rejectRequest = true
			break
		}
	}

	if rejectRequest {
		s.mutex.Unlock()
		s.logger.Info("There is another candidate with higher Term", "requestVote", requestVote, "nodeId", s.nodeId)
		return
	}

	// We need a majority of the full cluster (n/2 + 1). Our own vote is already
	// counted, so len(s.neighbourNodes)/2 votes from neighbours are enough
	if successCount >= len(s.neighbourNodes)/2 {
		s.logger.Info("New leader elected", "requestVote", requestVote, "nodeId", s.nodeId, "pid", os.Getpid())
		s.role = "leader"
		s.mutex.Unlock()
		// Once we are elected leader, we should send an empty heartbeat for others to know, we have a new leader.
		s.sendHeartBeatEmpty()
		s.mutex.Lock()
		// Initialize the nextIndex and matchIndex for each follower, so that we can send them logs
		s.VolatileLeaderState.nextIndex = make(map[int]int)
		s.VolatileLeaderState.matchIndex = make(map[int]int)
		for _, n := range s.neighbourNodes {
			s.VolatileLeaderState.nextIndex[n] = s.getLastLogIndexLocked() + 1
			s.VolatileLeaderState.matchIndex[n] = 0
		}
		s.mutex.Unlock()
		return
	}
	// We didn't get majority votes, so we failed. So we go back to the follower role.
	s.logger.Info("Election failed", "requestVote", requestVote, "nodeId", s.nodeId)
	s.role = "follower"
	s.mutex.Unlock()
}
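The majority check in startElection only counts neighbour votes, which looks off at first glance. With len(neighbourNodes) + 1 total nodes and our own vote already cast, successCount >= len(neighbourNodes)/2 is exactly the cluster-majority condition; a standalone check over a few cluster sizes:

```go
package main

import "fmt"

// hasMajority mirrors the check in startElection: votes from neighbours,
// plus our own implicit vote, must cover a majority of the cluster.
func hasMajority(neighbourVotes, neighbours int) bool {
	return neighbourVotes >= neighbours/2
}

func main() {
	// 5-node cluster (4 neighbours): self + 2 neighbour votes = 3 of 5
	fmt.Println(hasMajority(2, 4)) // true
	fmt.Println(hasMajority(1, 4)) // false
	// 3-node cluster (2 neighbours): self + 1 neighbour vote = 2 of 3
	fmt.Println(hasMajority(1, 2)) // true
}
```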

// Handler for RequestVote in all nodes
func (s *Server) RequestVote(request RequestVoteRequest) RequestVoteResponse {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// We always need to keep track of the time we got our last request.
    // This is useful to know when we should try to become a candidate
	s.lastRequestTime = time.Now()

	// Raft guarantees that a node will not vote for two different candidates in a single term using votedFor
    // This is also stored in a durable storage to have this guarantee
	// We will reset votedFor after election timeout time, so that we can vote for another candidate or ourselves
	if time.Since(s.lastVoteTime) > time.Second*1 {
		s.PeristentState.votedFor = -1
	}
	// If there is a more updated node, we convert to a follower
	if request.Term > s.PeristentState.currentTerm {
		s.PeristentState.currentTerm = request.Term
		s.role = "follower"
	}
	out := RequestVoteResponse{}
	// If we are more updated than the candidate, we let the node know it.
    // The node will see we are more updated than it and will step down to a follower role
	// This will give us a chance to become a leader
	if request.Term < s.PeristentState.currentTerm {
		out = RequestVoteResponse{Term: s.PeristentState.currentTerm, VoteGranted: false}
		s.logger.Info("RequestVote Overruled", "request", request, "out", out)
		return out
	}

	// We only vote if we haven't voted for anyone
	if s.PeristentState.votedFor == -1 {
		last := s.getLastLogIndexLocked()
		lastTerm := s.PeristentState.log[last].Term
		// The candidate's log must be at least as up to date as ours (Raft §5.4.1):
		// its last term is higher, or the terms are equal and its log is at least as long
		if request.LastLogTerm > lastTerm || (request.LastLogTerm == lastTerm && request.LastLogIndex >= last) {
			out = RequestVoteResponse{Term: s.PeristentState.currentTerm, VoteGranted: true}
		} else {
			out = RequestVoteResponse{Term: s.PeristentState.currentTerm, VoteGranted: false}
		}

		if out.VoteGranted {
			s.PeristentState.votedFor = request.CandidateId
			s.lastVoteTime = time.Now()
			s.logger.Info("RequestVote voted", "request", request, "out", out, "nodeId", s.nodeId)
		}
		return out
	} else {
		out = RequestVoteResponse{Term: s.PeristentState.currentTerm, VoteGranted: false}
		s.logger.Info("RequestVote already voted", "request", request, "out", out, "nodeId", s.nodeId)
		return out
	}
}
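The "at least as up to date" rule used when granting a vote is subtle: per §5.4.1 of the paper, last log terms are compared first, and indices only break ties. Factored into a standalone helper for clarity:

```go
package main

import "fmt"

// candidateUpToDate reports whether the candidate's log is at least as
// up to date as ours (Raft §5.4.1): its last term is higher, or the
// last terms are equal and its log is at least as long.
func candidateUpToDate(candLastTerm, candLastIndex, ourLastTerm, ourLastIndex int) bool {
	if candLastTerm != ourLastTerm {
		return candLastTerm > ourLastTerm
	}
	return candLastIndex >= ourLastIndex
}

func main() {
	fmt.Println(candidateUpToDate(3, 5, 2, 9)) // true: a higher last term wins despite a shorter log
	fmt.Println(candidateUpToDate(2, 9, 3, 5)) // false: a longer log does not beat a higher last term
	fmt.Println(candidateUpToDate(3, 5, 3, 5)) // true: identical logs are "at least as up to date"
}
```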

Now let’s move to log replication.

Log Replication

In this operation, the leader sends its new log entries to all follower nodes using the AppendEntries RPC.

The gist of the implementation is as follows.

type KV struct {
    Key   string
    Value string
}
// This is executed in the leader node
func (s *Server) insertLog(req KV) error {
	s.mutex.Lock()
	// Only the leader should accept the writes / reads
	if s.role != "leader" {
		s.mutex.Unlock()
		return errors.New("not leader")
	}

	// Append the values from the request to our local log,
	// tagged with our current term
	lastIndex := s.getLastLogIndexLocked()
	s.PeristentState.log[lastIndex+1] = Log{Term: s.PeristentState.currentTerm, Command: req.Key, Content: req.Value}

	requests := make(map[int][]byte)

	// nextIndex has the index of the log entry which should be sent next to the node
    // matchIndex has the index of the highest log entry which is known to be replicated 
	// Send append requests based on nextIndex and matchIndex
	for _, n := range s.neighbourNodes {
		prevLogIndex := s.VolatileLeaderState.nextIndex[n] - 1
		prevLogTerm := s.PeristentState.log[prevLogIndex].Term
		entries := make([]Log, 0)

		for i := prevLogIndex + 1; i <= s.getLastLogIndexLocked(); i++ {
			entries = append(entries, s.PeristentState.log[i])
		}

		appendEntriesRequest := AppendEntriesRequest{
			Term:         s.PeristentState.currentTerm,
			LeaderId:     s.nodeId,
			PrevLogIndex: prevLogIndex,
			PrevLogTerm:  prevLogTerm,
			Entries:      entries,
			LeaderCommit: s.VolatileState.commitIndex,
		}

		j, err := json.Marshal(appendEntriesRequest)
		if err != nil {
			// this shouldn't happen
			panic(err)
		}
		requests[n] = j
	}
	s.mutex.Unlock()

	// Send the corresponding messages to all followers and get the response
	successCount, rejectRequest := s.sendToFollowers(requests)

	if rejectRequest {
		// We are no longer the leader
		return errors.New("not the leader, please send to the leader node")
	}

	s.mutex.Lock()
	// We got confirmations from a majority, so the entry we appended to our
	// log above is committed.
	// In Raft, if an entry with index X is committed, all entries X-1, X-2, etc.
	// are also considered committed.
	if successCount >= len(s.neighbourNodes)/2 {
		// Mark the entry as committed
		s.VolatileState.commitIndex++
		// Apply the log to our state machine
		// In our case it is a simple KV store
		s.applySnapshotLocked()
		commitIndex := s.VolatileState.commitIndex
		s.mutex.Unlock()
		s.logger.Info("Applied commit index", "commitIndex", commitIndex)
		return nil
	}
	s.mutex.Unlock()

	return errors.New("majority not reached or timed out")
}

func (s *Server) sendToFollowers(j map[int][]byte) (int, bool) {
	outputs := make(map[int]AppendEntriesResponse)
	for _, n := range s.neighbourNodes {
		resp, err := http.Post("http://localhost:"+strconv.Itoa(8000+n)+"/appendEntries", "application/json", bytes.NewReader(j[n]))
		if err != nil {
			// There seems to be an error, move to the next node
			continue
		}

		var appendEntriesResponse AppendEntriesResponse
		err = json.NewDecoder(resp.Body).Decode(&appendEntriesResponse)
		if err != nil {
			// This shouldn't happen. Let's move to the next node
			continue
		}
		outputs[n] = appendEntriesResponse
	}

	s.mutex.Lock()
	defer s.mutex.Unlock()
	successCount := 0
	rejectRequest := false
	for nodeId, v := range outputs {
		if v.Success {
            // Ideally, index updates should be based on the contents of j.
			// But even if it is wrong, we have a mechanism to fix this builtin to the raft protocol
			lastLogIndex := s.getLastLogIndexLocked()
			s.VolatileLeaderState.nextIndex[nodeId] = lastLogIndex + 1
			s.VolatileLeaderState.matchIndex[nodeId] = lastLogIndex
			successCount++
		}
		if v.Term > s.PeristentState.currentTerm {
			// There is someone with a more updated Term
			s.role = "follower"
			s.PeristentState.currentTerm = v.Term
			rejectRequest = true
			break
		}
		// Handle the case where a follower returned false because it is behind.
		// We back off nextIndex by one so the next AppendEntries carries an earlier
		// prevLogIndex, and keep backing off until the follower has caught up or
		// we reach the beginning of the log
		if !v.Success {
			// matchIndex is left alone, since it only ever moves forward
			if s.VolatileLeaderState.nextIndex[nodeId] > 1 {
				s.VolatileLeaderState.nextIndex[nodeId] -= 1
			}
		}
	}
	return successCount, rejectRequest
}
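The nextIndex backoff is easiest to see in isolation. Modelling the logs as index-to-term maps (an assumption purely for illustration), the leader walks its guess backwards until both logs agree on the entry before nextIndex, and everything from there on is resent:

```go
package main

import "fmt"

// findMatch mimics the AppendEntries retry loop: back nextIndex off
// until leader and follower agree on the entry at nextIndex-1.
func findMatch(leaderLog, followerLog map[int]int, nextIndex int) int {
	for nextIndex > 1 {
		prev := nextIndex - 1
		if t, ok := followerLog[prev]; ok && t == leaderLog[prev] {
			break
		}
		nextIndex--
	}
	return nextIndex
}

func main() {
	leader := map[int]int{1: 1, 2: 1, 3: 2, 4: 3} // index -> term
	follower := map[int]int{1: 1, 2: 1, 3: 1}     // diverged at index 3
	// Entries 3 and 4 must be resent by the leader
	fmt.Println(findMatch(leader, follower, 5)) // 3
}
```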

// Executed on follower nodes when the leader sends AppendEntries
func (s *Server) AppendEntries(request AppendEntriesRequest) AppendEntriesResponse {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Hearing from the leader resets our election state: clear votedFor
	// and record when we last received a request
	s.PeristentState.votedFor = -1
	s.lastRequestTime = time.Now()
	
	// Rule for all servers
	if request.Term > s.PeristentState.currentTerm {
		s.PeristentState.currentTerm = request.Term
		s.role = "follower"
	}
	out := AppendEntriesResponse{}
	
    // If we have a more up to date term than the sender, we return success false
    // This makes the sender step down to a follower role, and we will have a chance at being the leader
	if request.Term < s.PeristentState.currentTerm {
		out = AppendEntriesResponse{Term: s.PeristentState.currentTerm, Success: false}
		return out
	}

	// Check if the previousLogIndex exists. This check is to prevent Gaps in logs.
    // If the previous log is missing, we will send a false, which will let the 
    // leader know we are missing log entries. So leader will send earlier logs to us.
	v, ok := s.PeristentState.log[request.PrevLogIndex]
	if !ok {
		s.logger.Info("AppendEntries failed, previous log not found", "request", request, "nodeId", s.nodeId)
		return AppendEntriesResponse{Term: s.PeristentState.currentTerm, Success: false}
	}

	// The log entry exists, but the terms don't match: our entry at prevLogIndex is stale.
	// We delete the conflicting entry and everything after it, and reply false so the
	// leader backs off and resends from an earlier index.
	// The leader always forces its entries on to the follower.
	if v.Term != request.PrevLogTerm {
		for _, key := range slices.Sorted(maps.Keys(s.PeristentState.log)) {
			if key >= request.PrevLogIndex {
				delete(s.PeristentState.log, key)
			}
		}
		out = AppendEntriesResponse{Term: s.PeristentState.currentTerm, Success: false}
		s.logger.Info("AppendEntries failed, term mismatch", "request", request, "out", out, "nodeId", s.nodeId)
		return out
	}

	// Delete any existing entries after PrevLogIndex; the leader's entries replace them
	keys := slices.Sorted(maps.Keys(s.PeristentState.log))
	for _, key := range keys {
		if key > request.PrevLogIndex {
			delete(s.PeristentState.log, key)
		}
	}
	// Append all new Entries
	for i, entry := range request.Entries {
		s.PeristentState.log[request.PrevLogIndex+i+1] = entry
	}

	// Update our commit Index
	if request.LeaderCommit > s.VolatileState.commitIndex {
		lastLogIndex := s.getLastLogIndexLocked()
		s.VolatileState.commitIndex = min(request.LeaderCommit, lastLogIndex)
	}

	// Everything seems fine
	out = AppendEntriesResponse{Term: s.PeristentState.currentTerm, Success: true}
	if len(request.Entries) > 0 {
		s.logger.Info("AppendEntries successful", "request", request, "out", out, "nodeId", s.nodeId)
	} else {
		s.logger.Info("Heartbeat received", "request", request, "out", out, "nodeId", s.nodeId)
	}
	return out
}
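The truncate-then-append step at the heart of the handler can be exercised on its own: everything after PrevLogIndex is dropped, and the leader's entries are written in its place. A standalone model:

```go
package main

import (
	"fmt"
	"maps"
	"slices"
)

type Log struct {
	Command string
	Term    int
	Content string
}

// truncateAndAppend mirrors the follower side of AppendEntries: delete
// every entry after prevLogIndex, then append the leader's entries.
func truncateAndAppend(log map[int]Log, prevLogIndex int, entries []Log) {
	for _, key := range slices.Sorted(maps.Keys(log)) {
		if key > prevLogIndex {
			delete(log, key)
		}
	}
	for i, e := range entries {
		log[prevLogIndex+i+1] = e
	}
}

func main() {
	followerLog := map[int]Log{1: {Term: 1}, 2: {Term: 1}, 3: {Term: 1}} // stale entry at index 3
	truncateAndAppend(followerLog, 2, []Log{{Term: 2, Command: "x", Content: "1"}})
	fmt.Println(len(followerLog), followerLog[3].Term) // 3 2
}
```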

Now the only operations left in the algorithm are the periodic heartbeats the leader sends to followers and the periodic application of committed entries to the state machine. You can see both in the code below.

Heartbeats

The leader sends periodic heartbeats to followers to keep them in sync with the leader. This also signals to the followers that the leader is still alive, so that they won’t start a new election.

func (s *Server) LeaderHeartbeat() {
	for {
		s.mutex.Lock()
		role := s.role
		s.mutex.Unlock()
		if role == "leader" {
			s.sendPeriodicHeartBeat()
		}
		time.Sleep(time.Millisecond * 400)
	}
}

func (s *Server) sendPeriodicHeartBeat() {
	s.mutex.Lock()
	if s.role != "leader" {
		s.mutex.Unlock()
		return
	}

	requests := make(map[int][]byte)

	// Send append requests based on nextIndex and matchIndex
	for _, n := range s.neighbourNodes {
		prevLogIndex := s.VolatileLeaderState.nextIndex[n] - 1
		prevLogTerm := s.PeristentState.log[prevLogIndex].Term
		entries := make([]Log, 0)

		for i := prevLogIndex + 1; i <= s.getLastLogIndexLocked(); i++ {
			entries = append(entries, s.PeristentState.log[i])
		}

		appendEntriesRequest := AppendEntriesRequest{
			Term:         s.PeristentState.currentTerm,
			LeaderId:     s.nodeId,
			PrevLogIndex: prevLogIndex,
			PrevLogTerm:  prevLogTerm,
			Entries:      entries,
			LeaderCommit: s.VolatileState.commitIndex,
		}

		j, err := json.Marshal(appendEntriesRequest)
		if err != nil {
			// this shouldn't happen
			panic(err)
		}
		requests[n] = j
	}
	s.mutex.Unlock()

	_, _ = s.sendToFollowers(requests)
}

Applying Committed Entries

Once a log entry is replicated to a majority of nodes and committed, we apply the change to our state machine. This step allows us to build consistent services on top of Raft. In our case, we simply apply the change to the KV store. This is done periodically in a goroutine.

func (s *Server) SnapshotChecker() {
	for {
		// Apply changes to state machine
		s.ApplySnapshot()
		time.Sleep(10 * time.Millisecond)
	}
}

func (s *Server) ApplySnapshot() {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.applySnapshotLocked()
}

func (s *Server) applySnapshotLocked() {
	// Check if we can advance the commit index. Counting the leader itself,
	// the middle element of the sorted follower matchIndexes is the highest
	// index replicated on a majority of servers
	matchIndexes := slices.Sorted(maps.Values(s.VolatileLeaderState.matchIndex))
	if len(matchIndexes) > 0 {
		majorityIndex := matchIndexes[len(matchIndexes)/2]

		// A leader only commits entries from its own term (Raft §5.4.2).
		// We need a no-op from the leader to advance the commit index; we leave it up to clients to send one.
		if majorityIndex > s.VolatileState.commitIndex && s.PeristentState.log[majorityIndex].Term == s.PeristentState.currentTerm {
			s.VolatileState.commitIndex = majorityIndex
		}
	}

	// Apply any newly committed entries, in order, to the state machine
	for s.VolatileState.commitIndex > s.VolatileState.lastApplied {
		s.VolatileState.lastApplied++
		entry := s.PeristentState.log[s.VolatileState.lastApplied]
		// stateMachine simply performs s.state[command] = content, as we have a simple KV store on top of Raft
		s.state = s.stateMachine(s.state, entry.Command, entry.Content)
		s.logger.Info("State updated", "state", s.state)
	}
}
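The commit-index computation deserves a closer look. The leader may commit index N once a majority of servers hold it; since the leader itself holds every entry, sorting the follower matchIndexes and taking the middle element gives the highest such N. A standalone check:

```go
package main

import (
	"fmt"
	"slices"
)

// majorityCommitIndex returns the highest index replicated on a
// majority of the cluster, given the followers' matchIndex values
// (the leader always holds every entry itself).
func majorityCommitIndex(matchIndexes []int) int {
	sorted := slices.Clone(matchIndexes)
	slices.Sort(sorted)
	// Counting the leader in, the middle element of the sorted follower
	// matchIndexes is held by a majority of servers
	return sorted[len(sorted)/2]
}

func main() {
	// 5-node cluster: followers at 2, 5, 7, 9. Index 7 is on the leader
	// plus two followers = 3 of 5 servers.
	fmt.Println(majorityCommitIndex([]int{7, 5, 9, 2})) // 7
	// 3-node cluster: followers at 4 and 1 -> index 4 is on 2 of 3
	fmt.Println(majorityCommitIndex([]int{4, 1})) // 4
}
```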

That’s all for the implementation of the Raft protocol. The only thing left is wiring up the HTTP routes to the handler functions and setting up the Server. Since that is the boring part, I will leave it out.

I have also added a small benchmark and an end-to-end test to make sure the implementation works as expected. You can see the complete code here.

Happy Hacking :)