Why do we need a Consensus Algorithm?
In any practical distributed system, we need to tolerate failures. One general approach is to keep redundant, consistent copies of the data on separate nodes. Such a system can be modeled as a Replicated State Machine: a collection of servers holding identical copies of the data. A replicated state machine is implemented using a replicated log, where the log is copied to multiple servers and is identical in both content and order on each of them. Each server's state machine executes the log entries in sequence, which in turn ensures an identical state across all servers. The consensus algorithm is responsible for ensuring that the replicated logs stay identical in content and order across all servers.
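To make the replicated state machine idea concrete, here is a minimal sketch (illustrative only, not part of the Raft implementation below): two replicas that apply the same log in the same order necessarily end up in the same state.

```go
package main

import "fmt"

// Entry mirrors the shape of the Log struct used later:
// Command is the key and Content is the value of a KV store.
type Entry struct {
	Command string
	Content string
}

// applyLog replays a sequence of set commands on a key-value state machine.
func applyLog(entries []Entry) map[string]string {
	state := make(map[string]string)
	for _, e := range entries {
		state[e.Command] = e.Content
	}
	return state
}

func main() {
	// The same log applied in the same order yields the same state on every replica.
	logEntries := []Entry{{"x", "1"}, {"y", "2"}, {"x", "3"}}
	replicaA := applyLog(logEntries)
	replicaB := applyLog(logEntries)
	fmt.Println(replicaA["x"] == replicaB["x"] && replicaA["y"] == replicaB["y"]) // true
}
```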
Raft is a consensus algorithm designed with a focus on ease of understanding. I highly recommend reading Raft’s paper to become familiar with Raft; it also offers the reasoning behind the design decisions that went into it. I will reference diagrams and text from the paper. Also, a nice visualization of the whole Raft process via animations can be found here.
The Raft consensus algorithm
Raft is a leader-based consensus algorithm: a single leader is responsible for managing the replicated log.
The leader approach lets Raft decompose the consensus problem into three separate problems:
1. Leader Election -> the process for electing a leader
2. Log Replication -> taking data from clients and replicating it to the other servers
3. Log Consistency -> making sure all the servers have identical logs
The Raft paper has a condensed summary of the algorithm, which can be seen below.

Once we translate the above algorithm to code, we will have a non-prod version of Raft ready :) So let’s get started.
Implementing Raft in Code
Note: We will use exactly the same naming in code as on the page above to avoid any confusion. We will also carry the comments from the algorithm page over to the corresponding code to make it easier to read.
Let’s start with the data structures for state.
type Log struct {
Command string
Term int
Content string
}
// Updated on stable storage before responding to RPCs. Should be durable
type PersistentState struct {
// latest term server has seen. Starts at 0 and increases monotonically
currentTerm int
// CandidateId that received vote from this node in current term (null if none)
votedFor int
// Log entries, each containing a command for the state machine along with the Term when it was received
log map[int]Log
}
// Volatile state on all servers
type VolatileState struct {
// Index of the highest log entry that is committed. Starts at 0 and increases monotonically
commitIndex int
// Index of the highest log entry that is applied to state machine. Starts at 0 and increases monotonically
lastApplied int
}
// Volatile state on Leader, reset after each election
type VolatileLeaderState struct {
// for each server, index of the next log entry to send to that server (initialized to leader last log index + 1)
nextIndex map[int]int
// for each server, index of highest log entry known to be replicated on server (initialized to 0, increases monotonically)
matchIndex map[int]int
}

Similarly, let’s introduce data structures for the RPC messages.
// Invoked by leader to replicate log entries. Also used for heartbeats.
type AppendEntriesRequest struct {
// Leader's term
Term int
// Leader's Id, so followers can redirect clients to the leader
LeaderId int
// Index of the log entry immediately preceding the new ones. Raft uses it to avoid gaps in the log.
PrevLogIndex int
// Term of the prevLogIndex entry
PrevLogTerm int
// Log entries to store (empty for heartbeat; may send more than one for efficiency)
Entries []Log
// Leader's commitIndex, so followers know how far the log is committed
LeaderCommit int
}
// Response of AppendEntriesRequest
type AppendEntriesResponse struct {
// currentTerm of the responding node, for the leader to update itself. If the responding node has a higher
// term, the leader will step down to a follower.
Term int
// True if the follower contained entry matching prevLogIndex and prevLogTerm
Success bool
}
// Invoked by candidates to gather votes
type RequestVoteRequest struct {
// Candidate's term
Term int
// Candidate requesting the vote
CandidateId int
// Index of the candidate's last log entry. During the election, a node grants its vote only if
// the candidate's log is at least as up to date as its own
LastLogIndex int
// Term of the candidate's last log entry
LastLogTerm int
}
// Response of RequestVoteRequest
type RequestVoteResponse struct {
// currentTerm of responding node, for the candidate to update itself.
Term int
// True if the candidate received the vote
VoteGranted bool
}

Now that we have introduced the data structures, let’s move on to the operations performed by Raft. There are two main operations performed by Raft: Leader Election and Log Replication.
Leader Election
A node in Raft can be in one of three states: Leader (responsible for maintaining the log), Follower (replicates the leader's log into itself), and Candidate (a node that tries to become the next leader when the current leader is not responding).
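These transitions can be sketched as a small pure function (illustrative only; the actual implementation below tracks the role as a plain string field on the Server, and the event names here are mine):

```go
package main

import "fmt"

type Role string

const (
	Follower  Role = "follower"
	Candidate Role = "candidate"
	Leader    Role = "leader"
)

// nextRole sketches the Raft role transitions:
// a follower whose election timer expires becomes a candidate,
// a candidate that wins a majority becomes the leader,
// and any node that sees a higher term steps down to follower.
func nextRole(current Role, event string) Role {
	switch {
	case event == "higherTermSeen":
		return Follower
	case current == Follower && event == "electionTimeout":
		return Candidate
	case current == Candidate && event == "wonMajority":
		return Leader
	case current == Candidate && event == "electionTimeout":
		return Candidate // split vote: restart the election
	default:
		return current
	}
}

func main() {
	fmt.Println(nextRole(Follower, "electionTimeout")) // candidate
	fmt.Println(nextRole(Candidate, "wonMajority"))    // leader
	fmt.Println(nextRole(Leader, "higherTermSeen"))    // follower
}
```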
The state diagram from the Raft paper can be seen below.

The gist of the leader election can be implemented as below.
// A follower node constantly checks if it received a message from leader recently.
// If not, it will become a candidate and will start election.
func (s *Server) CandidateChecker() {
for {
s.mutex.Lock()
if s.role == "follower" {
if (time.Since(s.lastRequestTime) > 1*time.Second) && (time.Since(s.lastVoteTime) > 1*time.Second) {
s.role = "candidate"
s.lastVoteTime = time.Now()
s.mutex.Unlock()
s.startElection()
} else {
s.mutex.Unlock()
}
} else {
s.mutex.Unlock()
}
// Jitter, so that nodes don't all time out and start elections at the same time
time.Sleep(time.Duration(rand.IntN(100)) * time.Millisecond)
time.Sleep(100 * time.Millisecond)
}
}
// Starts election by sending RequestVote RPC to all the neighbor nodes.
func (s *Server) startElection() {
s.mutex.Lock()
lastLogIndex := s.getLastLogIndexLocked()
lastLogTerm := s.PeristentState.log[lastLogIndex].Term
// Increment Term and start election
s.PeristentState.currentTerm++
requestVote := RequestVoteRequest{
Term: s.PeristentState.currentTerm,
CandidateId: s.nodeId,
LastLogIndex: lastLogIndex,
LastLogTerm: lastLogTerm,
}
s.logger.Info("Election start", "requestVote", requestVote, "nodeId", s.nodeId)
j, err := json.Marshal(requestVote)
if err != nil {
s.logger.Error("Starting election failed with Json Marshal", "requestVote", requestVote, "err", err, "nodeId", s.nodeId)
// We can't send a vote request without a body, so abort this election attempt
s.role = "follower"
s.mutex.Unlock()
return
}
// We vote for ourselves first
s.PeristentState.votedFor = s.nodeId
neighbourNodes := make([]int, len(s.neighbourNodes))
copy(neighbourNodes, s.neighbourNodes)
s.mutex.Unlock()
outputs := make(map[int]RequestVoteResponse)
// We send requestVote request to all neighbors
for _, n := range neighbourNodes {
resp, err := http.Post("http://localhost:"+strconv.Itoa(8000+n)+"/requestVote", "application/json", bytes.NewReader(j))
if err != nil {
s.logger.Error(err.Error(), "requestVote", requestVote, "nodeId", s.nodeId)
continue
}
var requestVoteResponse RequestVoteResponse
err = json.NewDecoder(resp.Body).Decode(&requestVoteResponse)
if err != nil {
s.logger.Error(err.Error())
continue
}
outputs[n] = requestVoteResponse
}
s.mutex.Lock()
successCount := 0
rejectRequest := false
for _, v := range outputs {
// We got a vote from a follower
if v.VoteGranted {
successCount++
}
// Raft uses Term as a Logical Clock. A node sent us a Term greater than our Term.
// This means that node is more updated. So we should become a follower and stop election.
if v.Term > s.PeristentState.currentTerm {
s.role = "follower"
s.PeristentState.currentTerm = v.Term
rejectRequest = true
break
}
}
if rejectRequest {
s.mutex.Unlock()
s.logger.Info("There is another candidate with higher Term", "requestVote", requestVote, "nodeId", s.nodeId)
return
}
// We need a majority of the cluster (n/2 + 1). We already voted for ourselves,
// so we need votes from at least half of the remaining nodes, rounded up
if successCount >= (len(s.neighbourNodes)+1)/2 {
s.logger.Info("New leader elected", "requestVote", requestVote, "nodeId", s.nodeId, "pid", os.Getpid())
s.role = "leader"
s.mutex.Unlock()
// Once we are elected leader, we should send an empty heartbeat for others to know, we have a new leader.
s.sendHeartBeatEmpty()
s.mutex.Lock()
// Initialize the nextIndex and matchIndex for each follower, so that we can send them logs
s.VolatileLeaderState.nextIndex = make(map[int]int)
s.VolatileLeaderState.matchIndex = make(map[int]int)
for _, n := range s.neighbourNodes {
s.VolatileLeaderState.nextIndex[n] = s.getLastLogIndexLocked() + 1
s.VolatileLeaderState.matchIndex[n] = 0
}
s.mutex.Unlock()
return
}
// We didn't get a majority of the votes, so the election failed and we go back to the follower role.
s.logger.Info("Election failed", "requestVote", requestVote, "nodeId", s.nodeId)
s.role = "follower"
s.mutex.Unlock()
}
// Handler for RequestVote in all nodes
func (s *Server) RequestVote(request RequestVoteRequest) RequestVoteResponse {
s.mutex.Lock()
defer s.mutex.Unlock()
// We always need to keep track of the time we got our last request.
// This is useful to know when we should try to become a candidate
s.lastRequestTime = time.Now()
// Raft guarantees that a node will not vote for two different candidates in a single term using votedFor
// This is also stored in a durable storage to have this guarantee
// We will reset votedFor after election timeout time, so that we can vote for another candidate or ourselves
if time.Since(s.lastVoteTime) > time.Second*1 {
s.PeristentState.votedFor = -1
}
// If there is a more updated node, we convert to a follower
if request.Term > s.PeristentState.currentTerm {
s.PeristentState.currentTerm = request.Term
s.role = "follower"
}
out := RequestVoteResponse{}
// If our term is higher than the candidate's, we let it know.
// The candidate will see that it is behind and step down to the follower role,
// which gives us a chance to become the leader
if request.Term < s.PeristentState.currentTerm {
out = RequestVoteResponse{Term: s.PeristentState.currentTerm, VoteGranted: false}
s.logger.Info("RequestVote Overruled", "request", request, "out", out)
return out
}
// We only vote if we haven't voted for anyone
if s.PeristentState.votedFor == -1 {
last := s.getLastLogIndexLocked()
myLastTerm := s.PeristentState.log[last].Term
// The candidate's log should be at least as up to date as ours (Raft §5.4.1):
// a higher last-entry term wins; if the terms are equal, the longer log wins
upToDate := request.LastLogTerm > myLastTerm ||
(request.LastLogTerm == myLastTerm && request.LastLogIndex >= last)
if upToDate {
out = RequestVoteResponse{Term: s.PeristentState.currentTerm, VoteGranted: true}
} else {
out = RequestVoteResponse{Term: s.PeristentState.currentTerm, VoteGranted: false}
}
if out.VoteGranted {
s.PeristentState.votedFor = request.CandidateId
s.lastVoteTime = time.Now()
s.logger.Info("RequestVote voted", "request", request, "out", out, "nodeId", s.nodeId)
}
return out
} else {
out = RequestVoteResponse{Term: s.PeristentState.currentTerm, VoteGranted: false}
s.logger.Info("RequestVote already voted", "request", request, "out", out, "nodeId", s.nodeId)
return out
}
}

Now let’s move on to log replication.
Log Replication
In this operation, the leader sends the new log entries it has to all follower nodes using the AppendEntries RPC.
The gist of the implementation is as follows.
type KV struct {
Key string
Value string
}
// This is executed in the leader node
func (s *Server) insertLog(req KV) error {
s.mutex.Lock()
// Only the leader should accept the writes / reads
if s.role != "leader" {
s.mutex.Unlock()
return errors.New("not leader")
}
// Add to the local log, the values from request
// We use our term along with the command and value
lastIndex := s.getLastLogIndexLocked()
s.PeristentState.log[lastIndex+1] = Log{Term: s.PeristentState.currentTerm, Command: req.Key, Content: req.Value}
requests := make(map[int][]byte)
// nextIndex has the index of the log entry which should be sent next to the node
// matchIndex has the index of the highest log entry which is known to be replicated
// Send append requests based on nextIndex and matchIndex
for _, n := range s.neighbourNodes {
prevLogIndex := s.VolatileLeaderState.nextIndex[n] - 1
prevLogTerm := s.PeristentState.log[prevLogIndex].Term
entries := make([]Log, 0)
for i := prevLogIndex + 1; i <= s.getLastLogIndexLocked(); i++ {
entries = append(entries, s.PeristentState.log[i])
}
appendEntriesRequest := AppendEntriesRequest{
Term: s.PeristentState.currentTerm,
LeaderId: s.nodeId,
PrevLogIndex: prevLogIndex,
PrevLogTerm: prevLogTerm,
Entries: entries,
LeaderCommit: s.VolatileState.commitIndex,
}
j, err := json.Marshal(appendEntriesRequest)
if err != nil {
// this shouldn't happen
panic(err)
}
requests[n] = j
}
s.mutex.Unlock()
// Send the corresponding messages to all followers and get the response
successCount, rejectRequest := s.sendToFollowers(requests)
if rejectRequest {
return errors.New("not the leader, please send to the leader node")
}
s.mutex.Lock()
// We got confirmations from a majority, so the write is a success.
// The entry was already appended to our log above, so we only need to mark it as committed.
// In Raft, if an entry with index X is committed, all entries X-1, X-2, ..., are also considered committed.
if successCount >= (len(s.neighbourNodes)+1)/2 {
s.VolatileState.commitIndex++
// Apply the log to our state machine
// In our case it is a simple KV store
s.applySnapshotLocked()
commitIndex := s.VolatileState.commitIndex
s.mutex.Unlock()
s.logger.Info("Applied commit index: ", "commitIndex", commitIndex)
return nil
}
s.mutex.Unlock()
return errors.New("majority not reached or timed out")
}
func (s *Server) sendToFollowers(j map[int][]byte) (int, bool) {
outputs := make(map[int]AppendEntriesResponse)
for _, n := range s.neighbourNodes {
resp, err := http.Post("http://localhost:"+strconv.Itoa(8000+n)+"/appendEntries", "application/json", bytes.NewReader(j[n]))
if err != nil {
// There seems to be an error, move to the next node
continue
}
var appendEntriesResponse AppendEntriesResponse
err = json.NewDecoder(resp.Body).Decode(&appendEntriesResponse)
if err != nil {
// This shouldn't happen. Let's move to the next node
continue
}
outputs[n] = appendEntriesResponse
}
s.mutex.Lock()
defer s.mutex.Unlock()
successCount := 0
rejectRequest := false
for nodeId, v := range outputs {
if v.Success {
// Ideally, index updates should be based on the contents of j.
// But even if it is wrong, we have a mechanism to fix this builtin to the raft protocol
lastLogIndex := s.getLastLogIndexLocked()
s.VolatileLeaderState.nextIndex[nodeId] = lastLogIndex + 1
s.VolatileLeaderState.matchIndex[nodeId] = lastLogIndex
successCount++
}
if v.Term > s.PeristentState.currentTerm {
// There is someone with a more updated Term
s.role = "follower"
s.PeristentState.currentTerm = v.Term
rejectRequest = true
break
}
// Handle the case where a follower returned false because its log is behind.
// We back nextIndex up by one, so the next AppendEntries carries an earlier entry.
// If it fails again we back up further, until the follower has caught up or
// we reach the beginning of the log. matchIndex only ever advances on success.
if !v.Success {
// Logs are behind
if s.VolatileLeaderState.nextIndex[nodeId] > 1 {
s.VolatileLeaderState.nextIndex[nodeId] -= 1
}
}
}
return successCount, rejectRequest
}
// Executed on follower nodes when the leader sends AppendEntries
func (s *Server) AppendEntries(request AppendEntriesRequest) AppendEntriesResponse {
s.mutex.Lock()
defer s.mutex.Unlock()
// Rule for all servers: if the sender has a higher term, adopt it and become a follower
if request.Term > s.PeristentState.currentTerm {
s.PeristentState.currentTerm = request.Term
s.role = "follower"
}
out := AppendEntriesResponse{}
// If our term is more up to date than the sender's, we return Success false.
// This makes the stale leader step down to the follower role, and we will have a chance at becoming the leader
if request.Term < s.PeristentState.currentTerm {
out = AppendEntriesResponse{Term: s.PeristentState.currentTerm, Success: false}
return out
}
// The request came from the current leader: reset the election timer and votedFor
s.lastRequestTime = time.Now()
s.PeristentState.votedFor = -1
// Check that an entry exists at PrevLogIndex. This check prevents gaps in the log.
// If the previous entry is missing, we return false, which lets the
// leader know we are missing log entries, so it will resend earlier ones.
v, ok := s.PeristentState.log[request.PrevLogIndex]
if !ok {
s.logger.Info("AppendEntries failed, previous log not found", "request", request, "nodeId", s.nodeId)
return AppendEntriesResponse{Term: s.PeristentState.currentTerm, Success: false}
}
// An entry exists at PrevLogIndex, but its term doesn't match PrevLogTerm.
// We reply false so that the leader backs up and resends earlier entries;
// the leader always forces its own entries onto the follower.
if v.Term != request.PrevLogTerm {
s.logger.Info("AppendEntries failed, term mismatch at PrevLogIndex", "request", request, "nodeId", s.nodeId)
return AppendEntriesResponse{Term: s.PeristentState.currentTerm, Success: false}
}
// Delete any entries after PrevLogIndex; they will be replaced by the leader's entries
keys := slices.Sorted(maps.Keys(s.PeristentState.log))
for _, key := range keys {
if key > request.PrevLogIndex {
delete(s.PeristentState.log, key)
}
}
// Append all new Entries
for i, entry := range request.Entries {
s.PeristentState.log[request.PrevLogIndex+i+1] = entry
}
// Update our commit Index
if request.LeaderCommit > s.VolatileState.commitIndex {
lastLogIndex := s.getLastLogIndexLocked()
s.VolatileState.commitIndex = min(request.LeaderCommit, lastLogIndex)
}
// Everything seems fine
out = AppendEntriesResponse{Term: s.PeristentState.currentTerm, Success: true}
if len(request.Entries) > 0 {
s.logger.Info("AppendEntries successful", "request", request, "out", out, "nodeId", s.nodeId)
} else {
s.logger.Info("Heartbeat received", "request", request, "out", out, "nodeId", s.nodeId)
}
return out
}

Now the only operations left are the periodic heartbeats the leader sends to followers and the periodic applying of committed entries to the state machine. You can see them in the code below.
Heartbeats
The leader sends periodic heartbeats to followers to keep them in sync with the leader. This also signals to the followers that the leader is still alive, so that they won’t start a new election.
func (s *Server) LeaderHeartbeat() {
for {
s.mutex.Lock()
role := s.role
s.mutex.Unlock()
if role == "leader" {
s.sendPeriodicHeartBeat()
}
time.Sleep(time.Millisecond * 400)
}
}
func (s *Server) sendPeriodicHeartBeat() {
s.mutex.Lock()
if s.role != "leader" {
s.mutex.Unlock()
return
}
requests := make(map[int][]byte)
// Send append requests based on nextIndex and matchIndex
for _, n := range s.neighbourNodes {
prevLogIndex := s.VolatileLeaderState.nextIndex[n] - 1
prevLogTerm := s.PeristentState.log[prevLogIndex].Term
entries := make([]Log, 0)
for i := prevLogIndex + 1; i <= s.getLastLogIndexLocked(); i++ {
entries = append(entries, s.PeristentState.log[i])
}
appendEntriesRequest := AppendEntriesRequest{
Term: s.PeristentState.currentTerm,
LeaderId: s.nodeId,
PrevLogIndex: prevLogIndex,
PrevLogTerm: prevLogTerm,
Entries: entries,
LeaderCommit: s.VolatileState.commitIndex,
}
j, err := json.Marshal(appendEntriesRequest)
if err != nil {
// this shouldn't happen
panic(err)
}
requests[n] = j
}
s.mutex.Unlock()
_, _ = s.sendToFollowers(requests)
}

Applying Committed Entries
Once a log entry is replicated to a majority of nodes, it is committed, and we apply the change to our state machine. This step allows us to build consistent services on top of Raft. In our case, we simply apply the changes to the KV store. It is done periodically in a goroutine.
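The key rule here is which index counts as committed: an entry is committed once a majority of the cluster stores it. Given the followers' matchIndex values, the highest majority-replicated index can be found by sorting them and picking the right position, as in this standalone sketch (illustrative only; the helper name is mine, not from the repo):

```go
package main

import (
	"fmt"
	"sort"
)

// majorityCommitIndex returns the highest log index that is replicated on a
// majority of the cluster. matchIndexes holds one entry per follower; the
// leader itself always has every entry, so with n followers (cluster size n+1)
// an index is majority-replicated once at least (n+1)/2 followers have it.
func majorityCommitIndex(matchIndexes []int) int {
	sorted := make([]int, len(matchIndexes))
	copy(sorted, matchIndexes)
	sort.Ints(sorted)
	needed := (len(sorted) + 1) / 2 // followers required besides the leader
	return sorted[len(sorted)-needed]
}

func main() {
	// 5-node cluster: leader + 4 followers. Followers have replicated up to
	// indexes 1, 2, 4 and 5. Index 4 is held by 2 followers + the leader = 3/5 nodes.
	fmt.Println(majorityCommitIndex([]int{1, 2, 4, 5})) // 4
}
```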
func (s *Server) SnapshotChecker() {
for {
// Apply changes to state machine
s.ApplySnapshot()
time.Sleep(10 * time.Millisecond)
}
}
func (s *Server) ApplySnapshot() {
s.mutex.Lock()
defer s.mutex.Unlock()
s.applySnapshotLocked()
}
func (s *Server) applySnapshotLocked() {
// Check if we can advance the commit index (leader only; matchIndex is empty on followers).
// An index is committed once a majority of the cluster stores it: with n followers,
// that means at least (n+1)/2 of them, besides the leader itself
matched := slices.Sorted(maps.Values(s.VolatileLeaderState.matchIndex))
if len(matched) > 0 {
needed := (len(matched) + 1) / 2
majorityIndex := matched[len(matched)-needed]
// A leader may only directly commit entries from its own term.
// We need a no-op from the leader to advance the commit index; we leave it up to clients to send one.
if s.PeristentState.log[majorityIndex].Term == s.PeristentState.currentTerm {
s.VolatileState.commitIndex = max(s.VolatileState.commitIndex, majorityIndex)
}
}
// Apply every committed but not yet applied entry, in order
for s.VolatileState.commitIndex > s.VolatileState.lastApplied {
s.VolatileState.lastApplied++
entry := s.PeristentState.log[s.VolatileState.lastApplied]
// The below function simply performs s.state[command] = content, as we have a simple KV store on top of Raft
s.state = s.stateMachine(s.state, entry.Command, entry.Content)
s.logger.Info("State updated", "state", s.state)
}
}

That’s all about the implementation of the Raft protocol. The only thing left is wiring up the HTTP routes to the handler functions, along with setting up the Server. Since that is the boring part, I will leave it out.
I have also added a small benchmark and an end-to-end test to make sure the implementation works as expected. You can see the complete code here.
Happy Hacking :)