
Raft log synchronization using Go language

Updated: May 24, 2023 11:20:42 by nananatsu
This article explains in detail how to implement Raft log synchronization in Go. The example code is walked through step by step and should help deepen your understanding of both Raft and Go; refer to it as needed.

In raft, once a leader has been elected the cluster can serve requests normally. A typical client proposal flows as follows:

  • The client connects to the leader and submits a proposal
  • The leader packages the proposal as a log entry
  • The leader caches the entry as pending (to be committed)
  • The leader sends the entry to the other nodes in the cluster
  • A follower receives the entry, caches it as pending, and responds to the leader
  • The leader receives a follower's response and checks whether a majority of the cluster has responded
  • Once a majority of nodes have cached the entry, the leader commits it and sends an (possibly empty) append request telling followers to commit as well
  • The leader responds to the client that the proposal has been accepted

Log rules in raft are as follows:

1. If two log entries on different nodes have the same term and index, they have the same content

  • For a given index, a leader creates at most one log entry in a single term
  • The leader never changes the position of its log entries (it does not overwrite or delete them)

2. If two log entries on different nodes have the same term and index, all entries preceding them are identical

When the leader sends entries to a follower, the request carries the index and term of the entry immediately preceding the new entries

After receiving an append-entries request, a follower performs a consistency check. If the check fails, the follower rejects the request, and the leader then forces the follower to accept the leader's log to restore consistency. Consistency-check failures fall into two cases:

A. The follower is missing entries. After receiving the failure response, the leader decrements the index to send (alternatively, the follower can include its latest log index in the failure response) and resends the entries the follower is missing

Missing entries typically occur when a node has been offline for a while or when messages were lost due to network problems

B. The follower has extra or conflicting entries. After receiving the failure response, the leader likewise decrements the index to send (or uses the latest log index reported by the follower), finds the latest index on which the two logs agree, and has the follower delete its conflicting entries

This happens when a node that was leader appended entries locally but went offline before replicating them to the cluster; when it comes back online, it holds conflicting uncommitted entries in memory. Entries that have already been committed never become inconsistent

Log synchronization between nodes is completed in two steps.

1. The leader replicates log entries to the other nodes via RPC

2. Once the leader confirms that an entry has been replicated to a majority of the cluster, it commits the entry by persisting it to disk. The leader tracks the highest committed index and includes it in append RPCs (including heartbeats) so that followers know which entries they need to commit

When committing, the entry at the commit index and any uncommitted entries before it are persisted

Each log entry contains the client's actual proposal content, the leader's term, and the entry index. Log entries are defined as follows:

  • type: the log entry type; currently there is only one type, a client proposal
  • term: the term in which the entry was created; the same index and term identify the same entry
  • index: the entry index, which increases monotonically
  • data: the actual proposal content
enum EntryType {
  NORMAL = 0;
}
message LogEntry {
  EntryType type = 1;
  uint64 term = 2;
  uint64 index = 3;
  bytes data = 4;
}

Define the overall log structure

Uncommitted entries are kept in an in-memory slice

When entries are committed, the committed portion is removed from the slice and persisted

type WaitApply struct {
	done  bool
	index uint64
	ch    chan struct{}
}

type RaftLog struct {
	logEnties        []*pb.LogEntry // uncommitted log entries
	storage          Storage        // storage for committed log entries
	commitIndex      uint64         // commit progress
	lastAppliedIndex uint64         // index of the last committed (persisted) entry
	lastAppliedTerm  uint64         // term of the last committed entry
	lastAppendIndex  uint64         // index of the last appended entry
	logger           *zap.SugaredLogger
}

Define the log persistence interface; the actual storage implementation is provided externally

type Storage interface {
    Append(entries []*pb.LogEntry)
    GetEntries(startIndex, endIndex uint64) []*pb.LogEntry
    GetTerm(index uint64) uint64
    GetLastLogIndexAndTerm() (uint64, uint64)
    Close()
}
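
Before the LSM-based storage arrives in a later article, a simple in-memory implementation of this interface can be used to exercise the log logic. The following is only an illustrative sketch, not the article's actual storage:

var _ Storage = (*memoryStorage)(nil)

// memoryStorage keeps committed entries in a slice; illustration only.
type memoryStorage struct {
	entries []*pb.LogEntry
}

func (ms *memoryStorage) Append(entries []*pb.LogEntry) {
	ms.entries = append(ms.entries, entries...)
}

// GetEntries returns entries with index in [startIndex, endIndex).
func (ms *memoryStorage) GetEntries(startIndex, endIndex uint64) []*pb.LogEntry {
	var res []*pb.LogEntry
	for _, e := range ms.entries {
		if e.Index >= startIndex && e.Index < endIndex {
			res = append(res, e)
		}
	}
	return res
}

func (ms *memoryStorage) GetTerm(index uint64) uint64 {
	for _, e := range ms.entries {
		if e.Index == index {
			return e.Term
		}
	}
	return 0
}

func (ms *memoryStorage) GetLastLogIndexAndTerm() (uint64, uint64) {
	if len(ms.entries) == 0 {
		return 0, 0
	}
	last := ms.entries[len(ms.entries)-1]
	return last.Index, last.Term
}

func (ms *memoryStorage) Close() {}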

Implement consistency checking

Persisted entries are guaranteed to be consistent with the leader, so the consistency check only needs to examine the in-memory slice. The following cases can occur:

1. The last entry appended by the leader is found in the node's log

I. It is the last entry appended on the node: the normal case, and the new entries can be appended directly

II. It is an entry inside the node's in-memory slice; this can happen in two ways:

  • The node's response was lost due to network fluctuation, so the leader resent the entries; clear the duplicate entries
  • The node was leader and some of its entries were never replicated before a new election, so the subsequent entries conflict; delete the conflicting entries (those after the matching entry) from memory

III. It is an entry the node has already committed

Any entries still in the in-memory slice are then inconsistent and are cleared

2. The last entry appended by the leader is not found in the node's log

I. An entry with the same index but a different term exists

The node was leader and some of its entries were never replicated before a new election, so the same index was reused in a different term

II. No entry with that index exists

Entries are missing and must be resent starting from the last committed entry

func (l *RaftLog) HasPrevLog(lastIndex, lastTerm uint64) bool {
	if lastIndex == 0 {
		return true
	}
	var term uint64
	size := len(l.logEnties)
	if size > 0 {
		lastlog := l.logEnties[size-1]
		if lastlog.Index == lastIndex {
			term = lastlog.Term
		} else if lastlog.Index > lastIndex {
			// committed entries are always consistent with the leader
			if lastIndex == l.lastAppliedIndex {
				l.logEnties = l.logEnties[:0]
				return true
			} else if lastIndex > l.lastAppliedIndex {
				// check the uncommitted entries
				for i, entry := range l.logEnties[:size] {
					if entry.Index == lastIndex {
						term = entry.Term
						// clear the entries after the leader's last appended entry:
						// either the leader resent entries because a response was lost,
						// or this node holds stale entries that were never replicated
						l.logEnties = l.logEnties[:i+1]
						break
					}
				}
			}
		}
	} else if lastIndex == l.lastAppliedIndex {
		return true
	}

	b := term == lastTerm
	if !b {
		l.logger.Debugf("leader last entry index: %d, term: %d, local term for that index: %d", lastIndex, lastTerm, term)
		if term != 0 {
			// the log conflicts with the leader: delete the in-memory entries of the conflicting term
			for i, entry := range l.logEnties {
				if entry.Term == term {
					l.logEnties = l.logEnties[:i]
					break
				}
			}
		}
	}
	return b
}

Append new entries to the in-memory slice and update the last appended index

func (l *RaftLog) AppendEntry(entry []*pb.LogEntry) {
	size := len(entry)
	if size == 0 {
		return
	}
	l.logEnties = append(l.logEnties, entry...)
	l.lastAppendIndex = entry[size-1].Index
}

Implement log commit

  • A follower may not yet hold all entries up to the leader's commit index; if it does, commit up to the leader's commit index, otherwise commit only up to the last appended entry
  • Remove the committed portion from the in-memory slice, append it to persistent storage, and update the commit progress
func (l *RaftLog) Apply(lastCommit, lastLogIndex uint64) {
	// update the committable index
	if lastCommit > l.commitIndex {
		if lastLogIndex > lastCommit {
			l.commitIndex = lastCommit
		} else {
			l.commitIndex = lastLogIndex
		}
	}

	// commit entries up to the committable index
	if l.commitIndex > l.lastAppliedIndex {
		n := 0
		for i, entry := range l.logEnties {
			if l.commitIndex >= entry.Index {
				n = i
			} else {
				break
			}
		}
		entries := l.logEnties[:n+1]
		l.storage.Append(entries)
		l.lastAppliedIndex = l.logEnties[n].Index
		l.lastAppliedTerm = l.logEnties[n].Term
		l.logEnties = l.logEnties[n+1:]

		l.NotifyReadIndex()
	}
}

Define the constructor; the storage implementation is supplied when an instance is created

func NewRaftLog(storage Storage, logger *zap.SugaredLogger) *RaftLog {
	lastIndex, lastTerm := storage.GetLastLogIndexAndTerm()
	return &RaftLog{
		logEnties:        make([]*pb.LogEntry, 0),
		storage:          storage,
		commitIndex:      lastIndex,
		lastAppliedIndex: lastIndex,
		lastAppliedTerm:  lastTerm,
		lastAppendIndex:  lastIndex,
		logger:           logger,
	}
}

Consistency checking, appending, and committing of logs are now implemented. To wire this log handling into raft, the leader first needs to record the log replication progress of the other nodes in the cluster

The progress is reset when a node becomes leader (a sketch of this reset follows the ReplicaProgress definition below):

  • A node's latest log index is taken from its vote response
  • If no vote response was received from a node, the leader's own latest log index is used and is then adjusted dynamically by the consistency check

When replication to a node starts, a single message is sent first to probe whether the network is reachable. Once a send has succeeded, subsequent messages are sent without waiting for responses, until a synchronization failure occurs

  • prevResp records the result of the previous send, initially false
  • pending records the indexes of entries sent but not yet acknowledged
  • When !prevResp && len(pending) > 0, the previous send has not completed, so subsequent sends are deferred
  • Once a send succeeds, prevResp is set to true and subsequent entries are sent directly
type ReplicaProgress struct {
	MatchIndex     uint64   // index of the last entry the node is known to have received
	NextIndex      uint64   // index of the next entry to send
	pending        []uint64 // indexes of entries sent but not yet acknowledged
	prevResp       bool     // result of the previous send
	maybeLostIndex uint64   // index that may have been lost; resent when the previous send did not complete
}
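
As a rough illustration of the reset mentioned above, the progress map might be rebuilt like this when a node becomes leader. The method name ResetProgress and the voteLastIndex parameter are assumptions made for this sketch and are not part of the article's code:

// Sketch only: rebuild replication progress after winning an election.
// voteLastIndex maps node id -> latest log index reported in that node's vote response.
func (c *Cluster) ResetProgress(voteLastIndex map[uint64]uint64, leaderLastIndex uint64) {
	for id := range c.progress {
		last, ok := voteLastIndex[id]
		if !ok {
			// no vote response from this node: start from the leader's latest entry
			// and let the consistency check correct the progress later
			last = leaderLastIndex
		}
		c.progress[id] = &ReplicaProgress{
			MatchIndex: last,
			NextIndex:  last + 1,
		}
	}
}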

The leader appends log entries locally and broadcasts them to the cluster

func (r *Raft) BroadcastAppendEntries() {
	r.cluster.Foreach(func(id uint64, _ *ReplicaProgress) {
		if id == r.id {
			return
		}
		r.SendAppendEntries(id)
	})
}

func (r *Raft) SendAppendEntries(to uint64) {
	p := r.cluster.progress[to]
	if p == nil || p.IsPause() {
		return
	}

	nextIndex := r.cluster.GetNextIndex(to)
	lastLogIndex := nextIndex - 1
	lastLogTerm := r.raftlog.GetTerm(lastLogIndex)
	maxSize := MAX_LOG_ENTRY_SEND
	if !p.prevResp {
		// previous send not yet acknowledged: probe with a single entry
		maxSize = 1
	}
	entries := r.raftlog.GetEntries(nextIndex, maxSize)
	size := len(entries)
	if size > 0 {
		r.cluster.AppendEntry(to, entries[size-1].Index)
	}

	r.send(&pb.RaftMessage{
		MsgType:      pb.MessageType_APPEND_ENTRY,
		Term:         r.currentTerm,
		From:         r.id,
		To:           to,
		LastLogIndex: lastLogIndex,
		LastLogTerm:  lastLogTerm,
		LastCommit:   r.raftlog.commitIndex,
		Entry:        entries,
	})
}

  • Get the latest log index from the log, iterate over the entries to append, and set each entry's index and term
  • Append the entries to the in-memory slice
  • Update the leader's own replication progress
  • Broadcast the entries to the cluster
func (r *Raft) AppendEntry(entries []*pb.LogEntry) {
	lastLogIndex, _ := r.raftlog.GetLastLogIndexAndTerm()
	for i, entry := range entries {
		entry.Index = lastLogIndex + 1 + uint64(i)
		entry.Term = r.currentTerm
	}
	r.raftlog.AppendEntry(entries)
	r.cluster.UpdateLogIndex(r.id, entries[len(entries)-1].Index)

	r.BroadcastAppendEntries()
}

func (c *Cluster) UpdateLogIndex(id uint64, lastIndex uint64) {
	p := c.progress[id]
	if p != nil {
		p.NextIndex = lastIndex
		p.MatchIndex = lastIndex + 1
	}
}

Broadcasting log entries works the same way as the heartbeat broadcast described earlier: iterate over the cluster and send to each node

Check the send state; if the previous send has not completed, defer sending

func (rp *ReplicaProgress) IsPause() bool {
	return (!rp.prevResp && len(rp.pending) > 0)
}

Get the index of the next entry to send from the node's replication progress

func (c *Cluster) GetNextIndex(id uint64) uint64 {
	p := c.progress[id]
	if p != nil {
		return p.NextIndex
	}
	return 0
}

The entries to send are read from the leader's log:

func (l *RaftLog) GetEntries(index uint64, maxSize int) []*pb.LogEntry {
	if index <= l.lastAppliedIndex {
		// the requested entries have been committed: read them from storage
		endIndex := index + MAX_APPEND_ENTRY_SIZE
		if endIndex >= l.lastAppliedIndex {
			endIndex = l.lastAppliedIndex + 1
		}
		return l.storage.GetEntries(index, endIndex)
	} else {
		// the requested entries are not committed yet: read them from the in-memory slice
		var entries []*pb.LogEntry
		for i, entry := range l.logEnties {
			if entry.Index == index {
				if len(l.logEnties)-i > maxSize {
					entries = l.logEnties[i : i+maxSize]
				} else {
					entries = l.logEnties[i:]
				}
				break
			}
		}
		return entries
	}
}

Update the node's send progress: record the index of the last sent entry in the pending slice and advance the next index to send

If the previous send succeeded, this send is assumed to succeed as well; if it later turns out to have failed, the progress is rolled back

func (c *Cluster) AppendEntry(id uint64, lastIndex uint64) {
	p := c.progress[id]
	if p != nil {
		p.AppendEntry(lastIndex)
	}
}

func (rp *ReplicaProgress) AppendEntry(lastIndex uint64) {
	rp.pending = append(rp.pending, lastIndex)
	if rp.prevResp {
		rp.NextIndex = lastIndex + 1
	}
}

After the entries are sent, the follower receives and processes them

Perform a consistency check

  • If the check succeeds, the entries are appended to the follower's in-memory slice and the append is marked as successful
  • If the check fails, conflicting entries have already been handled inside the consistency check, and the append is reported as failed

Every append message carries the leader's commit progress, and the follower commits its own entries according to it

The result of the append is then sent back to the leader

func (r *Raft) ReciveAppendEntries(mLeader, mTerm, mLastLogTerm, mLastLogIndex, mLastCommit uint64, mEntries []*pb.LogEntry) {
	var accept bool
	// check whether this node's log is consistent with the leader's
	if !r.raftlog.HasPrevLog(mLastLogIndex, mLastLogTerm) {
		r.logger.Infof("node does not contain the leader's last appended entry: Index: %d, Term: %d", mLastLogIndex, mLastLogTerm)
		accept = false
	} else {
		r.raftlog.AppendEntry(mEntries)
		accept = true
	}

	lastLogIndex, lastLogTerm := r.raftlog.GetLastLogIndexAndTerm()
	r.raftlog.Apply(mLastCommit, lastLogIndex)

	r.send(&pb.RaftMessage{
		MsgType:      pb.MessageType_APPEND_ENTRY_RESP,
		Term:         r.currentTerm,
		From:         r.id,
		To:           mLeader,
		LastLogIndex: lastLogIndex,
		LastLogTerm:  lastLogTerm,
		Success:      accept,
	})
}

The leader handles append responses from followers; a response reports either a successful or a failed append

func (r *Raft) ReciveAppendEntriesResult(from, term, lastLogIndex uint64, success bool) {
	leaderLastLogIndex, _ := r.raftlog.GetLastLogIndexAndTerm()
	if success {
		r.cluster.AppendEntryResp(from, lastLogIndex)
		if lastLogIndex > r.raftlog.commitIndex {
			// commit once a majority of the cluster has the entry, then broadcast the new commit index
			if r.cluster.CheckCommit(lastLogIndex) {
				r.raftlog.Apply(lastLogIndex, lastLogIndex)
				r.BroadcastAppendEntries()
			}
		} else if len(r.raftlog.waitQueue) > 0 {
			r.raftlog.NotifyReadIndex()
		}
		if r.cluster.GetNextIndex(from) <= leaderLastLogIndex {
			r.SendAppendEntries(from)
		}
	} else {
		r.logger.Infof("node %s failed to append entries; leader's record of the node's latest entry: %d, node's reported latest entry: %d",
			strconv.FormatUint(from, 16), r.cluster.GetNextIndex(from)-1, lastLogIndex)
		r.cluster.ResetLogIndex(from, lastLogIndex, leaderLastLogIndex)
		r.SendAppendEntries(from)
	}
}

When the append succeeds:

Update the replication progress: record the index the node has acknowledged, remove the acknowledged entries from the pending slice, and mark the previous send as successful

func (c *Cluster) AppendEntryResp(id uint64, lastIndex uint64) {
	p := c.progress[id]
	if p != nil {
		p.AppendEntryResp(lastIndex)
	}
}

func (rp *ReplicaProgress) AppendEntryResp(lastIndex uint64) {
	if rp.MatchIndex < lastIndex {
		rp.MatchIndex = lastIndex
	}

	idx := -1
	for i, v := range rp.pending {
		if v == lastIndex {
			idx = i
		}
	}

	// the previous send is now known to have succeeded: update the next index to send
	if !rp.prevResp {
		rp.prevResp = true
		rp.NextIndex = lastIndex + 1
	}

	if idx > -1 {
		// remove the acknowledged entries from pending
		rp.pending = rp.pending[idx+1:]
	}
}

Check the followers' replication progress to determine whether the given index has been replicated on a majority of the cluster

func (c *Cluster) CheckCommit(index uint64) bool {
	// an entry may be committed only after a majority of the cluster has acknowledged it
	incomingLogged := 0
	for id := range c.progress {
		if index <= c.progress[id].MatchIndex {
			incomingLogged++
		}
	}
	incomingCommit := incomingLogged >= len(c.progress)/2+1
	return incomingCommit
}

Once a majority of the cluster has the entry, the leader commits it and continues broadcasting so that followers learn the new commit index

When a follower's next index is still behind the leader's latest log, the leader keeps sending entries to it

When the append fails:

Reset the replication progress according to the index reported in the follower's response, and mark the previous send as failed so that further sends are deferred until the entries starting from the point where the logs diverge have been appended correctly

func (c *Cluster) ResetLogIndex(id uint64, lastIndex uint64, leaderLastIndex uint64) {
	p := c.progress[id]
	if p != nil {
		p.ResetLogIndex(lastIndex, leaderLastIndex)
	}
}

func (rp *ReplicaProgress) ResetLogIndex(lastLogIndex uint64, leaderLastLogIndex uint64) {
	// if the node's last entry is behind the leader's, follow the node's progress; otherwise follow the leader's
	if lastLogIndex < leaderLastLogIndex {
		rp.NextIndex = lastLogIndex + 1
		rp.MatchIndex = lastLogIndex
	} else {
		rp.NextIndex = leaderLastLogIndex + 1
		rp.MatchIndex = leaderLastLogIndex
	}

	if rp.prevResp {
		rp.prevResp = false
		rp.pending = nil
	}
}

Then resend entries according to the updated progress

Modify the raft constructor to accept the storage interface as a parameter:

func NewRaft(id uint64, storage Storage, peers map[uint64]string, logger *zap.SugaredLogger) *Raft {
    raftlog := NewRaftLog(storage, logger)
    ...
}

The raft log synchronization logic is now essentially in place; next, implement the proposal method in raftNode to append logs. The raftNode main loop already reads the recv channel and invokes the raft message handler, which appends proposals to the log when the node is leader, so the proposal method only needs to put proposal messages onto the recv channel
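
For orientation, the following is a hedged sketch of how such a message handler might dispatch the message types used in this article. The function name and switch structure are assumptions; only the three handler calls correspond to code shown in this article:

// Sketch only: the real handler in the article's codebase is not shown here.
func (r *Raft) HandleMessage(msg *pb.RaftMessage) {
	switch msg.MsgType {
	case pb.MessageType_PROPOSE:
		// only the leader packages proposals as log entries and broadcasts them
		r.AppendEntry(msg.Entry)
	case pb.MessageType_APPEND_ENTRY:
		r.ReciveAppendEntries(msg.From, msg.Term, msg.LastLogTerm, msg.LastLogIndex, msg.LastCommit, msg.Entry)
	case pb.MessageType_APPEND_ENTRY_RESP:
		r.ReciveAppendEntriesResult(msg.From, msg.Term, msg.LastLogIndex, msg.Success)
	}
}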

At this point, a proposal counts as written as soon as the leader has put it onto the channel; responding to the client only after the cluster has reached majority consensus is not yet implemented

To notify the caller once a majority has been reached, define a structure that pairs a RaftMessage with a channel and add a wait queue to raftlog: when raft processes the append message, it returns the index of the last appended entry to the proposal method through the channel; the proposal method places the channel into the raftlog wait queue; when the log is committed, the wait queue is checked and each waiting proposal method is notified through its channel that its index has been committed
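
The following is a rough sketch of that wait-queue mechanism. It assumes RaftLog gains a waitQueue []*WaitApply field; the method name WaitIndexApply is invented for this sketch, while NotifyReadIndex is the call already used in Apply above:

// Sketch only: block until the given index has been committed or the context ends.
func (l *RaftLog) WaitIndexApply(ctx context.Context, index uint64) error {
	if index <= l.lastAppliedIndex {
		return nil // already committed
	}
	wa := &WaitApply{index: index, ch: make(chan struct{})}
	l.waitQueue = append(l.waitQueue, wa)
	select {
	case <-wa.ch: // closed by NotifyReadIndex once the index is committed
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Sketch only: wake up every waiter whose index has been committed.
func (l *RaftLog) NotifyReadIndex() {
	remaining := l.waitQueue[:0]
	for _, wa := range l.waitQueue {
		if wa.index <= l.lastAppliedIndex {
			wa.done = true
			close(wa.ch)
		} else {
			remaining = append(remaining, wa)
		}
	}
	l.waitQueue = remaining
}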

func (n *RaftNode) Propose(ctx context.Context, entries []*pb.LogEntry) error {
    msg := &pb.RaftMessage{
        MsgType: pb.MessageType_PROPOSE,
        Term:    n.raft.currentTerm,
        Entry:   entries,
    }
    return n.Process(ctx, msg)
}
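
A brief usage sketch of Propose from application code; the data payload, node, and logger variables are placeholders rather than code from the article:

// data is an application-encoded key/value payload (hypothetical).
entry := &pb.LogEntry{Type: pb.EntryType_NORMAL, Data: data}
if err := node.Propose(ctx, []*pb.LogEntry{entry}); err != nil {
	logger.Errorf("propose failed: %v", err)
}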

Modify the raftNode constructor to take the storage interface; the storage implementation will be provided by an LSM tree in a following article

func NewRaftNode(id uint64, storage Storage, peers map[uint64]string, logger *zap.SugaredLogger) *RaftNode {
	node := &RaftNode{
		raft: NewRaft(id, storage, peers, logger),
		...
	}
	...
}

Modify the batch send method in the raft server to merge multiple append messages into a single RaftMessage before sending

func (p *Peer) SendBatch(msgs []*pb.RaftMessage) {
	p.wg.Add(1)
	var appEntryMsg *pb.RaftMessage
	var propMsg *pb.RaftMessage
	for _, msg := range msgs {
		if msg.MsgType == pb.MessageType_APPEND_ENTRY {
			if appEntryMsg == nil {
				appEntryMsg = msg
			} else {
				size := len(appEntryMsg.Entry)
				if size == 0 || len(msg.Entry) == 0 || appEntryMsg.Entry[size-1].Index+1 == msg.Entry[0].Index {
					// consecutive entries: merge into a single append message
					appEntryMsg.LastCommit = msg.LastCommit
					appEntryMsg.Entry = append(appEntryMsg.Entry, msg.Entry...)
				} else if appEntryMsg.Entry[0].Index >= msg.Entry[0].Index {
					// overlapping entries: keep the later message
					appEntryMsg = msg
				}
			}
		} else if msg.MsgType == pb.MessageType_PROPOSE {
			if propMsg == nil {
				propMsg = msg
			} else {
				propMsg.Entry = append(propMsg.Entry, msg.Entry...)
			}
		} else {
			p.send(msg)
		}
	}

	if appEntryMsg != nil {
		p.send(appEntryMsg)
	}
	if propMsg != nil {
		p.send(propMsg)
	}
	p.wg.Done()
}

The code above implements the full path from a client proposal to the leader packaging it as a log entry and synchronizing it across the cluster. In a later article the log will be persisted to disk through an LSM tree, turning the raft server into a simple KV database.

Complete code

Reference: https://github.com/etcd-io/etcd
