diff --git a/config/dcp.go b/config/dcp.go index 8fc3771..3cabde0 100644 --- a/config/dcp.go +++ b/config/dcp.go @@ -50,7 +50,8 @@ type DCPGroup struct { } type DCPListener struct { - BufferSize uint `yaml:"bufferSize"` + BufferSize uint `yaml:"bufferSize"` + SkipUntil *time.Time `yaml:"skipUntil"` } type ExternalDcpConfig struct { diff --git a/couchbase/observer.go b/couchbase/observer.go index dd7b6be..15223d0 100644 --- a/couchbase/observer.go +++ b/couchbase/observer.go @@ -138,6 +138,13 @@ func (so *observer) canForward(vbID uint16, seqNo uint64) bool { return !so.needCatchup(vbID, seqNo) } +func (so *observer) isBeforeSkipWindow(eventTime time.Time) bool { + if so.config.Dcp.Listener.SkipUntil == nil { + return false + } + return so.config.Dcp.Listener.SkipUntil.Before(eventTime) +} + func (so *observer) convertToCollectionName(collectionID uint32) string { if name, ok := so.collectionIDs[collectionID]; ok { return name @@ -173,6 +180,12 @@ func (so *observer) Mutation(mutation gocbcore.DcpMutation) { //nolint:dupl return } + eventTime := time.Unix(int64(mutation.Cas/1000000000), 0) + + if so.isBeforeSkipWindow(eventTime) { + return + } + if currentSnapshot, ok := so.currentSnapshots.Load(mutation.VbID); ok && currentSnapshot != nil { vbUUID, _ := so.uuIDMap.Load(mutation.VbID) @@ -185,7 +198,7 @@ func (so *observer) Mutation(mutation gocbcore.DcpMutation) { //nolint:dupl SeqNo: mutation.SeqNo, }, CollectionName: so.convertToCollectionName(mutation.CollectionID), - EventTime: time.Unix(int64(mutation.Cas/1000000000), 0), + EventTime: eventTime, }, }) } @@ -204,6 +217,12 @@ func (so *observer) Deletion(deletion gocbcore.DcpDeletion) { //nolint:dupl return } + eventTime := time.Unix(int64(deletion.Cas/1000000000), 0) + + if so.isBeforeSkipWindow(eventTime) { + return + } + if currentSnapshot, ok := so.currentSnapshots.Load(deletion.VbID); ok && currentSnapshot != nil { vbUUID, _ := so.uuIDMap.Load(deletion.VbID) @@ -216,7 +235,7 @@ func (so *observer) Deletion(deletion gocbcore.DcpDeletion) { //nolint:dupl SeqNo: deletion.SeqNo, }, CollectionName: so.convertToCollectionName(deletion.CollectionID), - EventTime: time.Unix(int64(deletion.Cas/1000000000), 0), + EventTime: eventTime, }, }) } @@ -235,6 +254,12 @@ func (so *observer) Expiration(expiration gocbcore.DcpExpiration) { //nolint:dup return } + eventTime := time.Unix(int64(expiration.Cas/1000000000), 0) + + if so.isBeforeSkipWindow(eventTime) { + return + } + if currentSnapshot, ok := so.currentSnapshots.Load(expiration.VbID); ok && currentSnapshot != nil { vbUUID, _ := so.uuIDMap.Load(expiration.VbID) @@ -247,7 +272,7 @@ func (so *observer) Expiration(expiration gocbcore.DcpExpiration) { //nolint:dup SeqNo: expiration.SeqNo, }, CollectionName: so.convertToCollectionName(expiration.CollectionID), - EventTime: time.Unix(int64(expiration.Cas/1000000000), 0), + EventTime: eventTime, }, }) }