Skip to content

Commit

Permalink
getCheckpointTs should use read lock
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <[email protected]>
  • Loading branch information
hicqu committed Aug 11, 2023
1 parent 308c7cb commit 3f9268a
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
m.redoMemQuota.Release(span, checkpointTs)

stuckCheck := time.Duration(*m.changefeedInfo.Config.Sink.AdvanceTimeoutInSec) * time.Second
if advanced.After(time.Unix(0, 0)) && time.Since(advanced) > stuckCheck &&
if version > 0 && time.Since(advanced) > stuckCheck &&
oracle.GetTimeFromTS(tableSink.getUpperBoundTs()).Sub(oracle.GetTimeFromTS(checkpointTs.Ts)) > stuckCheck {
log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
zap.String("namespace", m.changefeedID.Namespace),
Expand Down
25 changes: 14 additions & 11 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type tableSinkWrapper struct {
s tablesink.TableSink
version uint64 // it's generated by `tableSinkCreater`.
checkpointTs model.ResolvedTs
advanced time.Time
advanced atomic.Int64
}

// state used to control the lifecycle of the table.
Expand Down Expand Up @@ -120,7 +120,7 @@ func newTableSinkWrapper(

res.tableSink.version = 0
res.tableSink.checkpointTs = model.NewResolvedTs(startTs)
res.tableSink.advanced = time.Unix(0, 0)
res.updateTableSinkAdvanced()

res.receivedSorterResolvedTs.Store(startTs)
res.barrierTs.Store(startTs)
Expand Down Expand Up @@ -205,22 +205,25 @@ func (t *tableSinkWrapper) updateResolvedTs(ts model.ResolvedTs) error {
// 2. the table sink version, which comes from `tableSinkCreater`;
// 3. recent time of the table is advanced.
func (t *tableSinkWrapper) getCheckpointTs() (model.ResolvedTs, uint64, time.Time) {
t.tableSink.Lock()
defer t.tableSink.Unlock()
t.tableSink.RLock()
defer t.tableSink.RUnlock()
if t.tableSink.s != nil {
checkpointTs := t.tableSink.s.GetCheckpointTs()
if t.tableSink.checkpointTs.Less(checkpointTs) {
t.tableSink.checkpointTs = checkpointTs
t.tableSink.advanced = time.Now()
t.updateTableSinkAdvanced()
}
}
return t.tableSink.checkpointTs, t.tableSink.version, t.tableSink.advanced
advanced := time.Unix(t.tableSink.advanced.Load(), 0)
return t.tableSink.checkpointTs, t.tableSink.version, advanced
}

func (t *tableSinkWrapper) updateTableSinkAdvanced() {
t.tableSink.Lock()
defer t.tableSink.Unlock()
t.tableSink.advanced = time.Now()
curr := t.tableSink.advanced.Load()
now := time.Now().Unix()
if now > curr {
t.tableSink.advanced.CompareAndSwap(curr, now)
}
}

func (t *tableSinkWrapper) getReceivedSorterResolvedTs() model.Ts {
Expand Down Expand Up @@ -299,7 +302,7 @@ func (t *tableSinkWrapper) initTableSink() bool {
if t.tableSink.s == nil {
t.tableSink.s, t.tableSink.version = t.tableSinkCreater()
if t.tableSink.s != nil {
t.tableSink.advanced = time.Now()
t.updateTableSinkAdvanced()
return true
}
return false
Expand Down Expand Up @@ -348,7 +351,7 @@ func (t *tableSinkWrapper) doTableSinkClear() {
}
t.tableSink.s = nil
t.tableSink.version = 0
t.tableSink.advanced = time.Unix(0, 0)
t.tableSink.advanced.Store(time.Now().Unix())
}

// When the attached sink fail, there can be some events that have already been
Expand Down

0 comments on commit 3f9268a

Please sign in to comment.