jasper的技术小窝

关注DevOps、运维监控、Python、Golang、开源、大数据、web开发、互联网

etcd中的raft实现

作者:jasper | 分类:分布式 | 标签:     | 阅读 27 次 | 发布:2018-02-13 4:50 p.m.

在之前的一篇文章中我们了解了怎么使用ectd的raft的库来实现一个简单的分布式存储,但是只看了应用端对raft的调用以及周边,但是对于raft的库的内部没有做涉及,那么这篇文章我们就深入到raft内部看看其实现的细节。

提出问题

在看etcd里面的raft的实现之前,你首先需要对raft的原理有个大致的了解,raft consensus algorithm这篇raft的论文可以过一遍,另外这个raft最经典动画演示可以给你一个最直观的感受。

但是去看源码实现的时候往往会找不到思绪,那么我们可以带着问题去找答案,这样会更快速地理清思路。这里先提出这么几个问题:

  • leader是怎么选举的?
  • log的复制怎么实现的?
  • 如何处理节点变更(增加,减少)?

那么下面我们就从源码中来分别回答上面的几个问题。

回答问题

在回答这些问题之前,我们先来给出一个raft的node的数据结构:

type node struct {
    propc      chan pb.Message
    recvc      chan pb.Message
    confc      chan pb.ConfChange
    confstatec chan pb.ConfState
    readyc     chan Ready
    advancec   chan struct{}
    tickc      chan struct{}
    done       chan struct{}
    stop       chan struct{}
    status     chan chan Status
}   

这里有很多的channel,在raft的实现其实就是通过这些channel来传递各种消息,这些channel我们下面都会一一碰到。

1. leader的选举

当初始化一个raft的node实例之后,该node一开始都是follower,并且其term为1:

r := newRaft(c)
r.becomeFollower(1, None)

func (r *raft) becomeFollower(term uint64, lead uint64) {
    r.step = stepFollower
    r.reset(term)
    r.tick = r.tickElection
    r.lead = lead
    r.state = StateFollower
    r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}

其中当前的step为stepFollower,tick为tickElection,这两个是两个函数,后面会用到。初始化好了之后,一个node开始运行起来,分别会监听上面node里面提到的那些个channel。

在最开始的时候最先触发的是tickc这个channel,因为应用里面会设置一个heartbeat,在tickElection里面的Step是开始选举的入口:

func (r *raft) tickElection() {
    r.electionElapsed++

    if r.promotable() && r.pastElectionTimeout() {
        r.electionElapsed = 0
        r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
    }
}

消息的类型为MsgHup的,会发起一个campaignElection的campaign,在campaign里面将自身设置为candidate,并递增currentTerm:

r.campaign(campaignElection)

func (r *raft) campaign(t CampaignType) {
        r.becomeCandidate()
        voteMsg = pb.MsgVote
        term = r.Term
}       

func (r *raft) becomeCandidate() {  
   r.step = stepCandidate
    r.reset(r.Term + 1)
    r.tick = r.tickElection
    r.Vote = r.id
    r.state = StateCandidate
    r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
}

接下来candidate就会向所有的peers来发送一个投票的请求,意在告诉其他的节点当前我的term是多少:

for id := range r.prs {     
    var ctx []byte
    if t == campaignTransfer {
        ctx = []byte(t)
    }
    r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}

在send函数之中,会把message放到raft的msgs之中:r.msgs = append(r.msgs, m),那么这个msgs在哪里消费的呢,答案就是在Read中:

rd := Ready{
    Entries:          r.raftLog.unstableEntries(),
    CommittedEntries: r.raftLog.nextEnts(),
    Messages:         r.msgs,
}

所以现在我们可以回到上文中提到的node里面说到的几种channel了,现在就是readyc要开始干活了:

case readyc <- rd:
    r.msgs = nil
    r.readStates = nil
    advancec = n.advancec

msgs因为已经读取过了,设置为空,并且会赋值advancec,进行到这readyc中已经有一个数据这个数据是一个Ready对象,下面接着看谁会去读Ready,答案是应用程序自己去读取:

func (n *node) Ready() <-chan Ready { return n.readyc }

读取到的Ready里面包含了Vote消息,应用层会调用网络层发送消息出去,并且调用Advance(),其他node接收到网络层消息后,会调用Step()函数,来比较接收到的term和自己的term 的大小,如果接收到的比较大,那么就把自己置为follower,term和接收到的消息里面的term保持一致,然后返回一个voteRespMsg,表示为其投票:

r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})

如果自己的term比接收到的大呢,Step()就会直接返回一个空。

voteRespMsg的返回信息被之前的发送方接收到了之后,就会计算收到的选票数目是否大于所有node的一般,如果大于则自己成为leader,否则将自己置为follower:

case myVoteRespType:
    gr := r.poll(m.From, m.Type, !m.Reject)
    switch r.quorum() {
    case gr:
        if r.state == StatePreCandidate {
            r.campaign(campaignElection)
        } else {
            r.becomeLeader()
            r.bcastAppend()
        }
    case len(r.votes) - gr:
        r.becomeFollower(r.Term, None)
    }

在成为leader之后,和上面的两个角色一样的,最重要的是step被置为了stepLeader,具体stepLeader中涉及到的一些操作,更多的是下一个问题会用到,这里就不多说了。

func (r *raft) becomeLeader() {
    r.step = stepLeader
}

2. log复制的实现

在raft协议之中,对于整个集群的所有变更都必须通过leader来发起;例如我们要同时更新每个节点的某个数据,入口在node.Propose:

func (n *node) Propose(ctx context.Context, data []byte) error {
    return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}

很眼熟是不是,又到了step函数了,我们先来看看follower处理MsgProp的情况,在stepFollower里面可以看到:

case pb.MsgProp:
        m.To = r.lead
        r.send(m)

将消息发送的目标设为当前leader,然后发送过去,leader在接收到数据之后,会将数据中的Entry做如下处理:

case pb.MsgProp:
    r.appendEntry(m.Entries...)
    r.bcastAppend()

appendEntry是把数据放到leader的raftLog之中,但是不会把这份数据commit掉,虽然会调用maybeCommit但是现在并不满足commit的条件,接着通过RPC将数据广播给除了自身的其他所有的节点:

func (r *raft) bcastAppend() {
    r.forEachProgress(func(id uint64, _ *Progress) {
        if id == r.id {
            return
        }
        r.sendAppend(id)
    })
}

func (r *raft) sendAppend(to uint64) {
    m.Type = pb.MsgApp
    m.Index = pr.Next - 1
    m.LogTerm = term
    m.Entries = ents
    m.Commit = r.raftLog.committed
    r.send(m)   
}

接下来我们来看看follower接收到这个请求之后,会做一些什么处理:

case pb.MsgApp:
    r.electionElapsed = 0
    r.lead = m.From
    r.handleAppendEntries(m)

细看handleAppendEntries函数,就是把数据更新到自己的raftlog之中。

func (r *raft) handleAppendEntries(m pb.Message) {
    if m.Index < r.raftLog.committed {
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
        return
    }

    if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
    } else {
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
    }
}

如果数据的Index小于committed,则说明当前数据比发送的更加新,所以直接把自己的commited返回给leader,不然就更新raftlog中的数据maybeAppend,更新成功则返回给leader最新的index,否则会返回给leader一个Reject的标识。

在leader接收到follower返回的MsgAppResp之后:

case pb.MsgAppResp:
    if m.Reject {
        if pr.maybeDecrTo(m.Index, m.RejectHint) {
                r.sendAppend(m.From)
        }
    }else{
        if pr.maybeUpdate(m.Index) {
            if r.maybeCommit() {
                    r.bcastAppend()
                }else{
                    r.sendAppend(m.From)
                }
        }
    }       

主要的逻辑如上所示,如果被Reject,那么就直接返回给leader。如果没有被follower Reject,那么这时候就会继续调用之前我们说的maybeCommit,那么在这里是可能被commit的:

func (r *raft) maybeCommit() bool {
    mis := make(uint64Slice, 0, len(r.prs))
    for _, p := range r.prs {
        mis = append(mis, p.Match)
    }
    sort.Sort(sort.Reverse(mis))
    mci := mis[r.quorum()-1]
    return r.raftLog.maybeCommit(mci, r.Term)
}

func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
    if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {
        l.commitTo(maxIndex)
        return true
    }
    return false
}

具体的逻辑就是检查各个peer的matchIndex,如果有大于一半的peer是match的,才会真正commit。如果这里已经commit了,则会再向所有的节点广播这个消息,当follower接收到这个消息之后,同样的也才会真正commit本地的raftlog:

func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
    if l.matchTerm(index, logTerm) {
        lastnewi = index + uint64(len(ents))
        l.commitTo(min(committed, lastnewi))
    }
}   

看到这里我们就知道了,是不是有些类似于二阶段提交呢。可能有些朋友会问了,更新状态机的逻辑呢?状态机这部分的逻辑其实在用户代码里面调用node.Advance()之后,节点才会做apply去更新状态机:

case <-advancec:
        if prevHardSt.Commit != 0 {
            r.raftLog.appliedTo(prevHardSt.Commit)
        }

func (l *raftLog) appliedTo(i uint64) {
    l.applied = i
}       

到此为止,整个的log replication才会真正介绍。

3. 节点变更

从etcd-raft的架构来看,节点变更功能的实现需要应用和底层核心协议处理层互相配合。客户端发起节点增加或移除的命令,应用获得该请求,并将其转换为一个节点变更指令交给底层的raft协议核心处理层,并且一定是要leader来发起。

节点变更的信息在数据结构ConfChange中:

type ConfChange struct {
    ID               uint64  
    Type             ConfChangeType
    NodeID           uint64  
    Context          []byte
    XXX_unrecognized []byte   
}

NodeID为变更节点的ID,Type可以为ConfChangeAddNode,ConfChangeRemoveNode,'ConfChangeUpdateNode',ConfChangeAddLearnerNode。这里我们先只说前两种。

对于添加节点,节点启动之后首先是成为follower,然后在初始log里面对所有的peers添加一个ConfChangeAddNode的entry:

for _, peer := range peers {
        cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
        d, err := cc.Marshal()
        e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
        r.raftLog.append(e)
}       

然后会apply它们,主要是为了让应用程序在StartNode之后可以调用Campaign。请注意这些节点将被添加到raft两次:在这里,当应用程序的就绪循环调用ApplyConfChange。addNode的调用必须在raftLog.append之后。我们不设置raftLog.applied,因此应用程序将能够通过Ready.CommittedEntries观察所有的conf变化。

r.raftLog.committed = r.raftLog.lastIndex()
for _, peer := range peers {
    r.addNode(peer.ID)
}

调用的其实是r.addNodeOrLearnerNode(id, false)方法:

func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
    r.pendingConf = false
    pr := r.getProgress(id)
    if pr == nil {
        r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
    }
}   

在raft协议核心处理层,增加节点便是为其分配一个Progress结构,通过该结构追踪对端节点的运行状态:

r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)}

再来看看移除一个节点的时候的逻辑,其他的大体都一样,由客户端发起一个移除节点的指令ConfChangeRemoveNode,收到之后调用的是'removeNode'的方法:

func (r *raft) removeNode(id uint64) {
    r.delProgress(id)
    r.pendingConf = false

    if len(r.prs) == 0 && len(r.learnerPrs) == 0 {
        return
    }

    if r.maybeCommit() {
        r.bcastAppend()
    }

    if r.state == StateLeader && r.leadTransferee == id {
        r.abortLeaderTransfer()
    }
}

这里移除该节点的Progress结构,而且还调用了maybeCommit,这是什么道理呢?因为节点移除一个之后,由于quorum变少而导致的某些之前pending的日志项可以被Commit。

总结

关于etcd-raft的核心的内容这里就主要说这三个点;可以看到在etcd中都是通过channel来传递不同的消息,而消息的结构也是由raft库定义好。而对于如WAL、SNAPSHOT、网络传输等都留给了使用该库的应用程序来实现,而etcdServer就是这样的一个应用程序,这些内容年后有时间我们再来探讨!


转载请注明出处:http://www.opscoder.info/ectd-raft-library.html

【上一篇】 深入Golang之context
【下一篇】 没有了
其他分类: