Skip to content

Commit

Permalink
raft: track in-progress log application in log
Browse files Browse the repository at this point in the history
This commit adds a mechanism to the raft log struct to track in progress
log application that has not yet completed.

For now, committed entries are immediately considered applied by
`raft.advance`. A future commit will make it possible to accept multiple
Ready structs without immediately applying committed entries from earlier
Ready structs.

This all works towards async Raft log writes, where log application is
decoupled from Ready iterations.

Signed-off-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
nvanbenschoten committed Dec 13, 2022
1 parent a9b1984 commit 7302ee6
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 32 deletions.
1 change: 1 addition & 0 deletions diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func mustTemp(pre, body string) string {
func ltoa(l *raftLog) string {
s := fmt.Sprintf("lastIndex: %d\n", l.lastIndex())
s += fmt.Sprintf("applied: %d\n", l.applied)
s += fmt.Sprintf("applying: %d\n", l.applying)
for i, e := range l.allEntries() {
s += fmt.Sprintf("#%d: %+v\n", i, e)
}
Expand Down
31 changes: 22 additions & 9 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ type raftLog struct {
// committed is the highest log position that is known to be in
// stable storage on a quorum of nodes.
committed uint64
// applying is the highest log position that the application has
// been instructed to apply to its state machine. Some of these
// entries may be in the process of applying and have not yet
// reached applied.
// Invariant: applied <= applying && applying <= committed
applying uint64
// applied is the highest log position that the application has
// been instructed to apply to its state machine.
// successfully applied to its state machine.
// Invariant: applied <= committed
applied uint64

Expand Down Expand Up @@ -76,13 +82,14 @@ func newLogWithSize(storage Storage, logger Logger, maxNextCommittedEntsSize uin
// Initialize our committed and applied pointers to the time of the last compaction.
log.committed = firstIndex - 1
log.applied = firstIndex - 1
log.applying = firstIndex - 1

return log
}

func (l *raftLog) String() string {
return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, unstable.offsetInProgress=%d, len(unstable.Entries)=%d",
l.committed, l.applied, l.unstable.offset, l.unstable.offsetInProgress, len(l.unstable.entries))
return fmt.Sprintf("committed=%d, applied=%d, applying=%d, unstable.offset=%d, unstable.offsetInProgress=%d, len(unstable.Entries)=%d",
l.committed, l.applied, l.applying, l.unstable.offset, l.unstable.offsetInProgress, len(l.unstable.entries))
}

// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
Expand Down Expand Up @@ -192,8 +199,9 @@ func (l *raftLog) nextCommittedEnts() (ents []pb.Entry) {
// See comment in hasNextCommittedEnts.
return nil
}
if l.committed > l.applied {
lo, hi := l.applied+1, l.committed+1 // [lo, hi)
if l.committed > l.applying {
lo, hi := l.applying+1, l.committed+1 // [lo, hi)
// TODO: handle pagination correctly.
ents, err := l.slice(lo, hi, l.maxNextCommittedEntsSize)
if err != nil {
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
Expand All @@ -212,7 +220,7 @@ func (l *raftLog) hasNextCommittedEnts() bool {
// first.
return false
}
return l.committed > l.applied
return l.committed > l.applying
}

// nextUnstableSnapshot returns the snapshot, if present, that is available to
Expand Down Expand Up @@ -273,13 +281,18 @@ func (l *raftLog) commitTo(tocommit uint64) {
}

func (l *raftLog) appliedTo(i uint64) {
if i == 0 {
return
}
if l.committed < i || i < l.applied {
l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
}
l.applied = i
l.applying = max(l.applying, i)
}

func (l *raftLog) acceptApplying(i uint64) {
if l.committed < i {
l.logger.Panicf("applying(%d) is out of range [prevApplying(%d), committed(%d)]", i, l.applying, l.committed)
}
l.applying = i
}

func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) }
Expand Down
35 changes: 21 additions & 14 deletions log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,15 +308,18 @@ func TestHasNextCommittedEnts(t *testing.T) {
}
tests := []struct {
applied uint64
applying uint64
snap bool
whasNext bool
}{
{applied: 0, snap: false, whasNext: true},
{applied: 3, snap: false, whasNext: true},
{applied: 4, snap: false, whasNext: true},
{applied: 5, snap: false, whasNext: false},
{applied: 3, applying: 3, snap: false, whasNext: true},
{applied: 3, applying: 4, snap: false, whasNext: true},
{applied: 3, applying: 5, snap: false, whasNext: false},
{applied: 4, applying: 4, snap: false, whasNext: true},
{applied: 4, applying: 5, snap: false, whasNext: false},
{applied: 5, applying: 5, snap: false, whasNext: false},
// With snapshot.
{applied: 3, snap: true, whasNext: false},
{applied: 3, applying: 3, snap: true, whasNext: false},
}
for i, tt := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
Expand All @@ -327,6 +330,7 @@ func TestHasNextCommittedEnts(t *testing.T) {
raftLog.append(ents...)
raftLog.maybeCommit(5, 1)
raftLog.appliedTo(tt.applied)
raftLog.acceptApplying(tt.applying)
if tt.snap {
newSnap := snap
newSnap.Metadata.Index++
Expand All @@ -347,16 +351,19 @@ func TestNextCommittedEnts(t *testing.T) {
{Term: 1, Index: 6},
}
tests := []struct {
applied uint64
snap bool
wents []pb.Entry
applied uint64
applying uint64
snap bool
wents []pb.Entry
}{
{applied: 0, snap: false, wents: ents[:2]},
{applied: 3, snap: false, wents: ents[:2]},
{applied: 4, snap: false, wents: ents[1:2]},
{applied: 5, snap: false, wents: nil},
{applied: 3, applying: 3, snap: false, wents: ents[:2]},
{applied: 3, applying: 4, snap: false, wents: ents[1:2]},
{applied: 3, applying: 5, snap: false, wents: nil},
{applied: 4, applying: 4, snap: false, wents: ents[1:2]},
{applied: 4, applying: 5, snap: false, wents: nil},
{applied: 5, applying: 5, snap: false, wents: nil},
// With snapshot.
{applied: 3, snap: true, wents: nil},
{applied: 3, applying: 3, snap: true, wents: nil},
}
for i, tt := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
Expand All @@ -367,14 +374,14 @@ func TestNextCommittedEnts(t *testing.T) {
raftLog.append(ents...)
raftLog.maybeCommit(5, 1)
raftLog.appliedTo(tt.applied)
raftLog.acceptApplying(tt.applying)
if tt.snap {
newSnap := snap
newSnap.Metadata.Index++
raftLog.restore(newSnap)
}
require.Equal(t, tt.wents, raftLog.nextCommittedEnts())
})

}
}

Expand Down
7 changes: 6 additions & 1 deletion rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,13 @@ func (rn *RawNode) acceptReady(rd Ready) {
}
rn.raft.msgs = nil
// NB: this does not do anything yet, as entries and snapshots are always
// stabilized on the next Advance.
// stabilized on the next Advance and committed entries are always applied
// by the next Advance.
rn.raft.raftLog.acceptUnstable()
if len(rd.CommittedEntries) > 0 {
ents := rd.CommittedEntries
rn.raft.raftLog.acceptApplying(ents[len(ents)-1].Index)
}
}

// HasReady called when RawNode user need to check if any Ready pending.
Expand Down
2 changes: 1 addition & 1 deletion testdata/confchange_v1_add_single.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ stabilize
1->2 MsgSnap Term:1 Log:0/0 Snapshot: Index:4 Term:1 ConfState:Voters:[1 2] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false
> 2 receiving messages
1->2 MsgSnap Term:1 Log:0/0 Snapshot: Index:4 Term:1 ConfState:Voters:[1 2] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false
INFO log [committed=0, applied=0, unstable.offset=1, unstable.offsetInProgress=1, len(unstable.Entries)=0] starts to restore snapshot [index: 4, term: 1]
INFO log [committed=0, applied=0, applying=0, unstable.offset=1, unstable.offsetInProgress=1, len(unstable.Entries)=0] starts to restore snapshot [index: 4, term: 1]
INFO 2 switched to configuration voters=(1 2)
INFO 2 [commit: 4, lastindex: 4, lastterm: 1] restored snapshot [index: 4, term: 1]
INFO 2 [commit: 4] restored snapshot [index: 4, term: 1]
Expand Down
4 changes: 2 additions & 2 deletions testdata/confchange_v2_add_double_auto.txt
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ stabilize 1 2
1->2 MsgSnap Term:1 Log:0/0 Snapshot: Index:4 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[1] Learners:[] LearnersNext:[] AutoLeave:true
> 2 receiving messages
1->2 MsgSnap Term:1 Log:0/0 Snapshot: Index:4 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[1] Learners:[] LearnersNext:[] AutoLeave:true
INFO log [committed=0, applied=0, unstable.offset=1, unstable.offsetInProgress=1, len(unstable.Entries)=0] starts to restore snapshot [index: 4, term: 1]
INFO log [committed=0, applied=0, applying=0, unstable.offset=1, unstable.offsetInProgress=1, len(unstable.Entries)=0] starts to restore snapshot [index: 4, term: 1]
INFO 2 switched to configuration voters=(1 2 3)&&(1) autoleave
INFO 2 [commit: 4, lastindex: 4, lastterm: 1] restored snapshot [index: 4, term: 1]
INFO 2 [commit: 4] restored snapshot [index: 4, term: 1]
Expand Down Expand Up @@ -176,7 +176,7 @@ stabilize 1 3
1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:5 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false
> 3 receiving messages
1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:5 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false
INFO log [committed=0, applied=0, unstable.offset=1, unstable.offsetInProgress=1, len(unstable.Entries)=0] starts to restore snapshot [index: 5, term: 1]
INFO log [committed=0, applied=0, applying=0, unstable.offset=1, unstable.offsetInProgress=1, len(unstable.Entries)=0] starts to restore snapshot [index: 5, term: 1]
INFO 3 switched to configuration voters=(1 2 3)
INFO 3 [commit: 5, lastindex: 5, lastterm: 1] restored snapshot [index: 5, term: 1]
INFO 3 [commit: 5] restored snapshot [index: 5, term: 1]
Expand Down
2 changes: 1 addition & 1 deletion testdata/confchange_v2_add_double_implicit.txt
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ stabilize 1 2
1->2 MsgSnap Term:1 Log:0/0 Snapshot: Index:4 Term:1 ConfState:Voters:[1 2] VotersOutgoing:[1] Learners:[] LearnersNext:[] AutoLeave:true
> 2 receiving messages
1->2 MsgSnap Term:1 Log:0/0 Snapshot: Index:4 Term:1 ConfState:Voters:[1 2] VotersOutgoing:[1] Learners:[] LearnersNext:[] AutoLeave:true
INFO log [committed=0, applied=0, unstable.offset=1, unstable.offsetInProgress=1, len(unstable.Entries)=0] starts to restore snapshot [index: 4, term: 1]
INFO log [committed=0, applied=0, applying=0, unstable.offset=1, unstable.offsetInProgress=1, len(unstable.Entries)=0] starts to restore snapshot [index: 4, term: 1]
INFO 2 switched to configuration voters=(1 2)&&(1) autoleave
INFO 2 [commit: 4, lastindex: 4, lastterm: 1] restored snapshot [index: 4, term: 1]
INFO 2 [commit: 4] restored snapshot [index: 4, term: 1]
Expand Down
2 changes: 1 addition & 1 deletion testdata/confchange_v2_add_single_auto.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ stabilize
1->2 MsgSnap Term:1 Log:0/0 Snapshot: Index:4 Term:1 ConfState:Voters:[1 2] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false
> 2 receiving messages
1->2 MsgSnap Term:1 Log:0/0 Snapshot: Index:4 Term:1 ConfState:Voters:[1 2] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false
INFO log [committed=0, applied=0, unstable.offset=1, unstable.offsetInProgress=1, len(unstable.Entries)=0] starts to restore snapshot [index: 4, term: 1]
INFO log [committed=0, applied=0, applying=0, unstable.offset=1, unstable.offsetInProgress=1, len(unstable.Entries)=0] starts to restore snapshot [index: 4, term: 1]
INFO 2 switched to configuration voters=(1 2)
INFO 2 [commit: 4, lastindex: 4, lastterm: 1] restored snapshot [index: 4, term: 1]
INFO 2 [commit: 4] restored snapshot [index: 4, term: 1]
Expand Down
2 changes: 1 addition & 1 deletion testdata/confchange_v2_add_single_explicit.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ stabilize 1 2
1->2 MsgSnap Term:1 Log:0/0 Snapshot: Index:4 Term:1 ConfState:Voters:[1 2] VotersOutgoing:[1] Learners:[] LearnersNext:[] AutoLeave:false
> 2 receiving messages
1->2 MsgSnap Term:1 Log:0/0 Snapshot: Index:4 Term:1 ConfState:Voters:[1 2] VotersOutgoing:[1] Learners:[] LearnersNext:[] AutoLeave:false
INFO log [committed=0, applied=0, unstable.offset=1, unstable.offsetInProgress=1, len(unstable.Entries)=0] starts to restore snapshot [index: 4, term: 1]
INFO log [committed=0, applied=0, applying=0, unstable.offset=1, unstable.offsetInProgress=1, len(unstable.Entries)=0] starts to restore snapshot [index: 4, term: 1]
INFO 2 switched to configuration voters=(1 2)&&(1)
INFO 2 [commit: 4, lastindex: 4, lastterm: 1] restored snapshot [index: 4, term: 1]
INFO 2 [commit: 4] restored snapshot [index: 4, term: 1]
Expand Down
2 changes: 1 addition & 1 deletion testdata/confchange_v2_replace_leader.txt
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ stabilize
1->4 MsgSnap Term:1 Log:0/0 Snapshot: Index:4 Term:1 ConfState:Voters:[2 3 4] VotersOutgoing:[1 2 3] Learners:[] LearnersNext:[] AutoLeave:false
> 4 receiving messages
1->4 MsgSnap Term:1 Log:0/0 Snapshot: Index:4 Term:1 ConfState:Voters:[2 3 4] VotersOutgoing:[1 2 3] Learners:[] LearnersNext:[] AutoLeave:false
INFO log [committed=0, applied=0, unstable.offset=1, unstable.offsetInProgress=1, len(unstable.Entries)=0] starts to restore snapshot [index: 4, term: 1]
INFO log [committed=0, applied=0, applying=0, unstable.offset=1, unstable.offsetInProgress=1, len(unstable.Entries)=0] starts to restore snapshot [index: 4, term: 1]
INFO 4 switched to configuration voters=(2 3 4)&&(1 2 3)
INFO 4 [commit: 4, lastindex: 4, lastterm: 1] restored snapshot [index: 4, term: 1]
INFO 4 [commit: 4] restored snapshot [index: 4, term: 1]
Expand Down
2 changes: 1 addition & 1 deletion testdata/snapshot_succeed_via_app_resp.txt
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ stabilize 3
----
> 3 receiving messages
1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false
INFO log [committed=0, applied=0, unstable.offset=1, unstable.offsetInProgress=1, len(unstable.Entries)=0] starts to restore snapshot [index: 11, term: 1]
INFO log [committed=0, applied=0, applying=0, unstable.offset=1, unstable.offsetInProgress=1, len(unstable.Entries)=0] starts to restore snapshot [index: 11, term: 1]
INFO 3 switched to configuration voters=(1 2 3)
INFO 3 [commit: 11, lastindex: 11, lastterm: 1] restored snapshot [index: 11, term: 1]
INFO 3 [commit: 11] restored snapshot [index: 11, term: 1]
Expand Down

0 comments on commit 7302ee6

Please sign in to comment.