Skip to content

Commit

Permalink
raft: specify voters and learners via snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
sakateka committed Jan 16, 2021
1 parent ff43025 commit 027bdb5
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 251 deletions.
4 changes: 2 additions & 2 deletions node_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func BenchmarkOneNode(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

s := NewMemoryStorage()
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
s := newTestMemoryStorage(withPeers(1))
rn := newTestRawNode(1, 10, 1, s)
n := newNode(rn)
go n.run()

Expand Down
52 changes: 26 additions & 26 deletions node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ func TestNodePropose(t *testing.T) {
return nil
}

s := NewMemoryStorage()
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
s := newTestMemoryStorage(withPeers(1))
rn := newTestRawNode(1, 10, 1, s)
n := newNode(rn)
r := rn.raft
go n.run()
Expand Down Expand Up @@ -173,8 +173,8 @@ func TestNodeReadIndex(t *testing.T) {
}
wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}

s := NewMemoryStorage()
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
s := newTestMemoryStorage(withPeers(1))
rn := newTestRawNode(1, 10, 1, s)
n := newNode(rn)
r := rn.raft
r.readStates = wrs
Expand Down Expand Up @@ -215,9 +215,9 @@ func TestNodeReadIndex(t *testing.T) {
// TestDisableProposalForwarding ensures that proposals are not forwarded to
// the leader when DisableProposalForwarding is true.
func TestDisableProposalForwarding(t *testing.T) {
r1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
r2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
cfg3 := newTestConfig(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
r1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
r2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
cfg3 := newTestConfig(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
cfg3.DisableProposalForwarding = true
r3 := newRaft(cfg3)
nt := newNetwork(r1, r2, r3)
Expand Down Expand Up @@ -247,9 +247,9 @@ func TestDisableProposalForwarding(t *testing.T) {
// TestNodeReadIndexToOldLeader ensures that raftpb.MsgReadIndex to old leader
// gets forwarded to the new leader and 'send' method does not attach its term.
func TestNodeReadIndexToOldLeader(t *testing.T) {
r1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
r2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
r3 := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
r1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
r2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
r3 := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))

nt := newNetwork(r1, r2, r3)

Expand Down Expand Up @@ -312,8 +312,8 @@ func TestNodeProposeConfig(t *testing.T) {
return nil
}

s := NewMemoryStorage()
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
s := newTestMemoryStorage(withPeers(1))
rn := newTestRawNode(1, 10, 1, s)
n := newNode(rn)
r := rn.raft
go n.run()
Expand Down Expand Up @@ -351,8 +351,8 @@ func TestNodeProposeConfig(t *testing.T) {
// TestNodeProposeAddDuplicateNode ensures that two proposes to add the same node should
// not affect the later propose to add new node.
func TestNodeProposeAddDuplicateNode(t *testing.T) {
s := NewMemoryStorage()
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
s := newTestMemoryStorage(withPeers(1))
rn := newTestRawNode(1, 10, 1, s)
n := newNode(rn)
go n.run()
n.Campaign(context.TODO())
Expand Down Expand Up @@ -427,7 +427,7 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) {
// know who is the current leader; node will accept proposal when it knows
// who is the current leader.
func TestBlockProposal(t *testing.T) {
rn := newTestRawNode(1, []uint64{1}, 10, 1, NewMemoryStorage())
rn := newTestRawNode(1, 10, 1, newTestMemoryStorage(withPeers(1)))
n := newNode(rn)
go n.run()
defer n.Stop()
Expand Down Expand Up @@ -467,8 +467,8 @@ func TestNodeProposeWaitDropped(t *testing.T) {
return nil
}

s := NewMemoryStorage()
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
s := newTestMemoryStorage(withPeers(1))
rn := newTestRawNode(1, 10, 1, s)
n := newNode(rn)
r := rn.raft
go n.run()
Expand Down Expand Up @@ -502,8 +502,8 @@ func TestNodeProposeWaitDropped(t *testing.T) {
// TestNodeTick ensures that node.Tick() will increase the
// elapsed of the underlying raft state machine.
func TestNodeTick(t *testing.T) {
s := NewMemoryStorage()
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
s := newTestMemoryStorage(withPeers(1))
rn := newTestRawNode(1, 10, 1, s)
n := newNode(rn)
r := rn.raft
go n.run()
Expand All @@ -523,7 +523,7 @@ func TestNodeTick(t *testing.T) {
// TestNodeStop ensures that node.Stop() blocks until the node has stopped
// processing, and that it is idempotent
func TestNodeStop(t *testing.T) {
rn := newTestRawNode(1, []uint64{1}, 10, 1, NewMemoryStorage())
rn := newTestRawNode(1, 10, 1, newTestMemoryStorage(withPeers(1)))
n := newNode(rn)
donec := make(chan struct{})

Expand Down Expand Up @@ -813,8 +813,8 @@ func TestIsHardStateEqual(t *testing.T) {
func TestNodeProposeAddLearnerNode(t *testing.T) {
ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()
s := NewMemoryStorage()
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
s := newTestMemoryStorage(withPeers(1))
rn := newTestRawNode(1, 10, 1, s)
n := newNode(rn)
go n.run()
n.Campaign(context.TODO())
Expand Down Expand Up @@ -907,8 +907,8 @@ func TestAppendPagination(t *testing.T) {
}

func TestCommitPagination(t *testing.T) {
s := NewMemoryStorage()
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
s := newTestMemoryStorage(withPeers(1))
cfg := newTestConfig(1, 10, 1, s)
cfg.MaxCommittedSizePerReady = 2048
rn, err := NewRawNode(cfg)
if err != nil {
Expand Down Expand Up @@ -973,7 +973,7 @@ func (s *ignoreSizeHintMemStorage) Entries(lo, hi uint64, maxSize uint64) ([]raf
// This wouldn't need to exploit anything about Raft-internal code paths to fail.
func TestNodeCommitPaginationAfterRestart(t *testing.T) {
s := &ignoreSizeHintMemStorage{
MemoryStorage: NewMemoryStorage(),
MemoryStorage: newTestMemoryStorage(withPeers(1)),
}
persistedHardState := raftpb.HardState{
Term: 1,
Expand All @@ -996,7 +996,7 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) {
size += uint64(ent.Size())
}

cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
cfg := newTestConfig(1, 10, 1, s)
// Set a MaxSizePerMsg that would suggest to Raft that the last committed entry should
// not be included in the initial rd.CommittedEntries. However, our storage will ignore
// this and *will* return it (which is how the Commit index ended up being 10 initially).
Expand Down
22 changes: 0 additions & 22 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,6 @@ type Config struct {
// ID is the identity of the local raft. ID cannot be 0.
ID uint64

// peers contains the IDs of all nodes (including self) in the raft cluster. It
// should only be set when starting a new raft cluster. Restarting raft from
// previous configuration will panic if peers is set. peer is private and only
// used for testing right now.
peers []uint64

// learners contains the IDs of all learner nodes (including self if the
// local node is a learner) in the raft cluster. learners only receives
// entries from the leader node. It does not vote or promote itself.
learners []uint64

// ElectionTick is the number of Node.Tick invocations that must pass between
// elections. That is, if a follower does not receive any message from the
// leader of current term before ElectionTick has elapsed, it will become
Expand Down Expand Up @@ -330,17 +319,6 @@ func newRaft(c *Config) *raft {
panic(err) // TODO(bdarnell)
}

if len(c.peers) > 0 || len(c.learners) > 0 {
if len(cs.Voters) > 0 || len(cs.Learners) > 0 {
// TODO(bdarnell): the peers argument is always nil except in
// tests; the argument should be removed and these tests should be
// updated to specify their nodes through a snapshot.
panic("cannot specify both newRaft(peers, learners) and ConfState.(Voters, Learners)")
}
cs.Voters = c.peers
cs.Learners = c.learners
}

r := &raft{
id: c.ID,
lead: None,
Expand Down
6 changes: 3 additions & 3 deletions raft_flow_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
// 2. when the window is full, no more msgApp can be sent.

func TestMsgAppFlowControlFull(t *testing.T) {
r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
r.becomeCandidate()
r.becomeLeader()

Expand Down Expand Up @@ -61,7 +61,7 @@ func TestMsgAppFlowControlFull(t *testing.T) {
// 1. valid msgAppResp.index moves the windows to pass all smaller or equal index.
// 2. out-of-dated msgAppResp has no effect on the sliding window.
func TestMsgAppFlowControlMoveForward(t *testing.T) {
r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
r.becomeCandidate()
r.becomeLeader()

Expand Down Expand Up @@ -106,7 +106,7 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) {
// TestMsgAppFlowControlRecvHeartbeat ensures a heartbeat response
// frees one slot if the window is full.
func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) {
r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
r.becomeCandidate()
r.becomeLeader()

Expand Down
Loading

0 comments on commit 027bdb5

Please sign in to comment.