Skip to content

Commit

Permalink
feat: better rollback mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Mar 25, 2023
1 parent fc52e16 commit 87bf00d
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 46 deletions.
65 changes: 59 additions & 6 deletions dcp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Client interface {
GetXattrs(scopeName string, collectionName string, id []byte, path string) ([]byte, error)
CreatePath(ctx context.Context, scopeName string, collectionName string, id []byte, path []byte, value interface{}) error
Get(ctx context.Context, scopeName string, collectionName string, id []byte) ([]byte, error)
ObserveVbID(vbID uint16, vbUUID gocbcore.VbUUID) (*gocbcore.ObserveVbResult, error)
}

type client struct {
Expand Down Expand Up @@ -361,10 +362,36 @@ func (s *client) GetVBucketUUIDMap(vbIds []uint16) (map[uint16]gocbcore.VbUUID,

func (s *client) openStreamWithRollback(vbID uint16,
vbUUID gocbcore.VbUUID,
seqNo gocbcore.SeqNo,
failedSeqNo gocbcore.SeqNo,
observer Observer,
openStreamOptions gocbcore.OpenStreamOptions,
) error {
observeResult, err := s.ObserveVbID(vbID, vbUUID)
if err != nil {
return err
}

persistReqNo := observeResult.PersistSeqNo

if persistReqNo >= failedSeqNo {
err := errors.New("failed seq no is less than persist seq no")
logger.ErrorLog.Printf("open stream with rollback, vbID: %d, vbUUID: %d, failedSeqNo: %d, persistReqNo: %d, err: %v",
vbID, vbUUID,
failedSeqNo, persistReqNo,
err,
)

return err
}

logger.Log.Printf(
"open stream with rollback, vbID: %d, vbUUID: %d, failedSeqNo: %d, persistReqNo: %d",
vbID, vbUUID,
failedSeqNo, persistReqNo,
)

observer.AddCatchup(vbID, uint64(failedSeqNo))

opm := helpers.NewAsyncOp(context.Background())

ch := make(chan error)
Expand All @@ -373,10 +400,10 @@ func (s *client) openStreamWithRollback(vbID uint16,
vbID,
0,
vbUUID,
seqNo,
persistReqNo,
0xffffffffffffffff,
seqNo,
seqNo,
persistReqNo,
persistReqNo,
observer,
openStreamOptions,
func(_ []gocbcore.FailoverEntry, err error) {
Expand Down Expand Up @@ -445,14 +472,40 @@ func (s *client) OpenStream(

if err != nil {
if errors.Is(err, gocbcore.ErrMemdRollback) && s.config.RollbackMitigation.Enabled {
logger.Log.Printf("rollback for vbID: %d", vbID)
return s.openStreamWithRollback(vbID, vbUUID, gocbcore.SeqNo(0), observer, openStreamOptions)
logger.Log.Printf("need to rollback for vbID: %d", vbID)
return s.openStreamWithRollback(vbID, vbUUID, gocbcore.SeqNo(offset.SeqNo), observer, openStreamOptions)
}
}

return err
}

// ObserveVbID issues an ObserveVb request through the agent for the given
// vBucket and blocks until the request's callback has delivered its outcome.
//
// vbID and vbUUID are forwarded unchanged as the VbID and VbUUID options of
// the request. If dispatching the request or waiting on it fails, the result
// is nil and that error is returned. Otherwise the result and the error that
// were handed to the callback are returned as-is.
func (s *client) ObserveVbID(vbID uint16, vbUUID gocbcore.VbUUID) (*gocbcore.ObserveVbResult, error) {
	opm := helpers.NewAsyncOp(context.Background())

	// Buffered with capacity 1 so the callback's send can never block, even
	// when Wait has already returned an error and nobody receives any more.
	ch := make(chan error, 1)

	var response *gocbcore.ObserveVbResult

	op, err := s.agent.ObserveVb(gocbcore.ObserveVbOptions{
		VbID:   vbID,
		VbUUID: vbUUID,
	}, func(result *gocbcore.ObserveVbResult, err error) {
		opm.Resolve()

		response = result

		ch <- err
	})

	if err = opm.Wait(op, err); err != nil {
		return nil, err
	}

	// Receive first, then read response: the callback assigns response after
	// calling Resolve, so only the channel receive guarantees the assignment
	// is visible here. Reading response in the same return statement as the
	// receive would leave the order of the two unspecified.
	err = <-ch

	return response, err
}

func (s *client) CloseStream(vbID uint16) error {
opm := helpers.NewAsyncOp(context.Background())

Expand Down
161 changes: 121 additions & 40 deletions dcp/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Observer interface {
Close()
CloseEnd()
ListenEnd() chan models.DcpStreamEnd
AddCatchup(vbID uint16, seqNo uint64)
}

type ObserverMetric struct {
Expand All @@ -40,14 +41,50 @@ type ObserverMetric struct {
}

type observer struct {
currentSnapshots map[uint16]*models.SnapshotMarker
uuIDs map[uint16]gocbcore.VbUUID
metrics map[uint16]*ObserverMetric
collectionIDs map[uint32]string
listenerCh models.ListenerCh
endCh chan models.DcpStreamEnd
currentSnapshotsLock *sync.Mutex
metricsLock *sync.Mutex
currentSnapshots map[uint16]*models.SnapshotMarker
uuIDs map[uint16]gocbcore.VbUUID
metrics map[uint16]*ObserverMetric
collectionIDs map[uint32]string
catchup map[uint16]uint64
listenerCh models.ListenerCh
endCh chan models.DcpStreamEnd
currentSnapshotsLock *sync.Mutex
metricsLock *sync.Mutex
catchupLock *sync.Mutex
checkCatchup map[uint16]bool
catchupNeededVbIDCount int
}

// AddCatchup registers vbID as needing catch-up up to sequence number seqNo.
//
// It records seqNo in the catchup map, flags the vBucket in checkCatchup and
// keeps catchupNeededVbIDCount in step with the number of registered vBuckets.
// Registering a vBucket that is already pending only replaces its target
// sequence number; the counter is not incremented a second time, because
// isCatchupDone decrements it only once per vBucket.
func (so *observer) AddCatchup(vbID uint16, seqNo uint64) {
	so.catchupLock.Lock()
	defer so.catchupLock.Unlock()

	// Count each vBucket once, no matter how often it is (re-)registered.
	if _, pending := so.catchup[vbID]; !pending {
		so.catchupNeededVbIDCount++
	}

	so.checkCatchup[vbID] = true
	so.catchup[vbID] = seqNo
}

// isCatchupDone reports whether an event with sequence number seqNo on vbID
// should be processed (true) or dropped (false).
//
// It returns true when no vBucket is registered for catch-up, or when vbID has
// no entry in the catchup map. For a registered vBucket:
//   - seqNo below the registered target: returns false and leaves the
//     registration in place.
//   - seqNo equal to the target: removes the registration and returns false,
//     so the event at the target itself is still dropped.
//   - seqNo above the target: removes the registration and returns true.
//
// All state, including catchupNeededVbIDCount, is read under catchupLock
// because AddCatchup modifies it under the same lock.
func (so *observer) isCatchupDone(vbID uint16, seqNo uint64) bool {
	so.catchupLock.Lock()
	defer so.catchupLock.Unlock()

	if so.catchupNeededVbIDCount == 0 {
		return true
	}

	target, pending := so.catchup[vbID]
	if !pending {
		return true
	}

	if seqNo < target {
		return false
	}

	// Target reached or passed: this vBucket no longer needs catch-up.
	delete(so.catchup, vbID)
	delete(so.checkCatchup, vbID)
	so.catchupNeededVbIDCount--

	return seqNo != target
}

func (so *observer) convertToCollectionName(collectionID uint32) string {
Expand All @@ -58,7 +95,7 @@ func (so *observer) convertToCollectionName(collectionID uint32) string {
return helpers.DefaultCollectionName
}

//nolint:staticcheck
// nolint:staticcheck
func (so *observer) sendOrSkip(args models.ListenerArgs) {
defer func() {
if r := recover(); r != nil {
Expand All @@ -69,22 +106,26 @@ func (so *observer) sendOrSkip(args models.ListenerArgs) {
so.listenerCh <- args
}

func (so *observer) SnapshotMarker(marker models.DcpSnapshotMarker) {
func (so *observer) SnapshotMarker(event models.DcpSnapshotMarker) {
so.currentSnapshotsLock.Lock()

so.currentSnapshots[marker.VbID] = &models.SnapshotMarker{
StartSeqNo: marker.StartSeqNo,
EndSeqNo: marker.EndSeqNo,
so.currentSnapshots[event.VbID] = &models.SnapshotMarker{
StartSeqNo: event.StartSeqNo,
EndSeqNo: event.EndSeqNo,
}

so.currentSnapshotsLock.Unlock()

so.sendOrSkip(models.ListenerArgs{
Event: marker,
Event: event,
})
}

func (so *observer) Mutation(mutation gocbcore.DcpMutation) {
func (so *observer) Mutation(mutation gocbcore.DcpMutation) { //nolint:dupl
if !so.isCatchupDone(mutation.VbID, mutation.SeqNo) {
return
}

so.currentSnapshotsLock.Lock()

if currentSnapshot, ok := so.currentSnapshots[mutation.VbID]; ok && currentSnapshot != nil {
Expand Down Expand Up @@ -115,7 +156,11 @@ func (so *observer) Mutation(mutation gocbcore.DcpMutation) {
}
}

func (so *observer) Deletion(deletion gocbcore.DcpDeletion) {
func (so *observer) Deletion(deletion gocbcore.DcpDeletion) { //nolint:dupl
if !so.isCatchupDone(deletion.VbID, deletion.SeqNo) {
return
}

so.currentSnapshotsLock.Lock()

if currentSnapshot, ok := so.currentSnapshots[deletion.VbID]; ok && currentSnapshot != nil {
Expand Down Expand Up @@ -146,7 +191,11 @@ func (so *observer) Deletion(deletion gocbcore.DcpDeletion) {
}
}

func (so *observer) Expiration(expiration gocbcore.DcpExpiration) {
func (so *observer) Expiration(expiration gocbcore.DcpExpiration) { //nolint:dupl
if !so.isCatchupDone(expiration.VbID, expiration.SeqNo) {
return
}

so.currentSnapshotsLock.Lock()

if currentSnapshot, ok := so.currentSnapshots[expiration.VbID]; ok && currentSnapshot != nil {
Expand Down Expand Up @@ -177,64 +226,92 @@ func (so *observer) Expiration(expiration gocbcore.DcpExpiration) {
}
}

func (so *observer) End(end models.DcpStreamEnd, _ error) {
so.endCh <- end
func (so *observer) End(event models.DcpStreamEnd, _ error) {
so.endCh <- event
}

func (so *observer) CreateCollection(creation models.DcpCollectionCreation) {
func (so *observer) CreateCollection(event models.DcpCollectionCreation) {
if !so.isCatchupDone(event.VbID, event.SeqNo) {
return
}

so.sendOrSkip(models.ListenerArgs{
Event: creation,
Event: event,
})
}

func (so *observer) DeleteCollection(deletion models.DcpCollectionDeletion) {
func (so *observer) DeleteCollection(event models.DcpCollectionDeletion) {
if !so.isCatchupDone(event.VbID, event.SeqNo) {
return
}

so.sendOrSkip(models.ListenerArgs{
Event: deletion,
Event: event,
})
}

func (so *observer) FlushCollection(flush models.DcpCollectionFlush) {
func (so *observer) FlushCollection(event models.DcpCollectionFlush) {
if !so.isCatchupDone(event.VbID, event.SeqNo) {
return
}

so.sendOrSkip(models.ListenerArgs{
Event: flush,
Event: event,
})
}

func (so *observer) CreateScope(creation models.DcpScopeCreation) {
func (so *observer) CreateScope(event models.DcpScopeCreation) {
if !so.isCatchupDone(event.VbID, event.SeqNo) {
return
}

so.sendOrSkip(models.ListenerArgs{
Event: creation,
Event: event,
})
}

func (so *observer) DeleteScope(deletion models.DcpScopeDeletion) {
func (so *observer) DeleteScope(event models.DcpScopeDeletion) {
if !so.isCatchupDone(event.VbID, event.SeqNo) {
return
}

so.sendOrSkip(models.ListenerArgs{
Event: deletion,
Event: event,
})
}

func (so *observer) ModifyCollection(modification models.DcpCollectionModification) {
func (so *observer) ModifyCollection(event models.DcpCollectionModification) {
if !so.isCatchupDone(event.VbID, event.SeqNo) {
return
}

so.sendOrSkip(models.ListenerArgs{
Event: modification,
Event: event,
})
}

func (so *observer) OSOSnapshot(snapshot models.DcpOSOSnapshot) {
func (so *observer) OSOSnapshot(event models.DcpOSOSnapshot) {
so.sendOrSkip(models.ListenerArgs{
Event: snapshot,
Event: event,
})
}

func (so *observer) SeqNoAdvanced(advanced models.DcpSeqNoAdvanced) {
func (so *observer) SeqNoAdvanced(event models.DcpSeqNoAdvanced) {
if !so.isCatchupDone(event.VbID, event.SeqNo) {
return
}

so.currentSnapshotsLock.Lock()

so.currentSnapshots[advanced.VbID] = &models.SnapshotMarker{
StartSeqNo: advanced.SeqNo,
EndSeqNo: advanced.SeqNo,
so.currentSnapshots[event.VbID] = &models.SnapshotMarker{
StartSeqNo: event.SeqNo,
EndSeqNo: event.SeqNo,
}

so.currentSnapshotsLock.Unlock()

so.sendOrSkip(models.ListenerArgs{
Event: advanced,
Event: event,
})
}

Expand All @@ -258,7 +335,7 @@ func (so *observer) ListenEnd() chan models.DcpStreamEnd {
return so.endCh
}

//nolint:staticcheck
// nolint:staticcheck
func (so *observer) Close() {
defer func() {
if r := recover(); r != nil {
Expand All @@ -269,7 +346,7 @@ func (so *observer) Close() {
close(so.listenerCh)
}

//nolint:staticcheck
// nolint:staticcheck
func (so *observer) CloseEnd() {
defer func() {
if r := recover(); r != nil {
Expand All @@ -294,6 +371,10 @@ func NewObserver(
metrics: map[uint16]*ObserverMetric{},
metricsLock: &sync.Mutex{},

catchup: map[uint16]uint64{},
checkCatchup: map[uint16]bool{},
catchupLock: &sync.Mutex{},

collectionIDs: collectionIDs,

listenerCh: make(models.ListenerCh, config.Dcp.Listener.BufferSize),
Expand Down

0 comments on commit 87bf00d

Please sign in to comment.