sink(cdc): clean backends if table sink is stuck too long #9527

Merged
merged 16 commits on Aug 11, 2023
7 changes: 7 additions & 0 deletions cdc/api/v2/model.go
@@ -409,6 +409,9 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
if c.Sink.TxnAtomicity != nil {
res.Sink.TxnAtomicity = util.AddressOf(config.AtomicityLevel(*c.Sink.TxnAtomicity))
}
if c.Sink.AdvanceTimeout != nil {
res.Sink.AdvanceTimeout = util.AddressOf(*c.Sink.AdvanceTimeout)
}

}
if c.Mounter != nil {
@@ -637,6 +640,9 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
if cloned.Sink.TxnAtomicity != nil {
res.Sink.TxnAtomicity = util.AddressOf(string(*cloned.Sink.TxnAtomicity))
}
if cloned.Sink.AdvanceTimeout != nil {
res.Sink.AdvanceTimeout = util.AddressOf(*cloned.Sink.AdvanceTimeout)
}
}
if cloned.Consistent != nil {
res.Consistent = &ConsistentConfig{
@@ -788,6 +794,7 @@ type SinkConfig struct {
KafkaConfig *KafkaConfig `json:"kafka_config,omitempty"`
MySQLConfig *MySQLConfig `json:"mysql_config,omitempty"`
CloudStorageConfig *CloudStorageConfig `json:"cloud_storage_config,omitempty"`
AdvanceTimeout *uint `json:"advance_timeout,omitempty"`
}

// CSVConfig denotes the csv config
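The new `advance_timeout` field is copied through both conversion directions above, so the v2 API config and the internal replica config stay in sync. A minimal sketch of setting it from client code; only the field, the `SinkConfig`/`ReplicaConfig` types, and the `util.AddressOf` helper are taken from this diff, and the import paths are assumptions:

```go
package main

import (
	"fmt"

	v2 "github.com/pingcap/tiflow/cdc/api/v2" // import path is an assumption
	"github.com/pingcap/tiflow/pkg/util"      // provides the AddressOf helper used above
)

func main() {
	cfg := &v2.ReplicaConfig{
		Sink: &v2.SinkConfig{
			// Restart the sink backend if a table sink cannot advance for 600 seconds,
			// matching the default asserted in model_test.go below.
			AdvanceTimeout: util.AddressOf(uint(600)),
		},
	}
	fmt.Println(*cfg.Sink.AdvanceTimeout)
}
```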
1 change: 1 addition & 0 deletions cdc/api/v2/model_test.go
@@ -58,6 +58,7 @@ var defaultAPIConfig = &ReplicaConfig{
EnableKafkaSinkV2: util.AddressOf(false),
OnlyOutputUpdatedColumns: util.AddressOf(false),
DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
AdvanceTimeout: util.AddressOf(uint(600)),
},
Consistent: &ConsistentConfig{
Level: "none",
3 changes: 2 additions & 1 deletion cdc/processor/processor_test.go
@@ -134,7 +134,8 @@ func initProcessor4Test(
},
"sink": {
"dispatchers": null,
"protocol": "open-protocol"
"protocol": "open-protocol",
"advance-timeout": 600
}
},
"state": "normal",
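The serialized key is `advance-timeout` and the tests above pin its default to 600 seconds. A hedged sketch of consuming the internal option defensively; the real default wiring lives in the config package and is not part of this diff, and the import path is an assumption:

```go
package main

import (
	"fmt"

	"github.com/pingcap/tiflow/pkg/config" // import path is an assumption
)

// advanceTimeoutOrDefault is illustrative only: it returns the configured
// timeout in seconds, falling back to 600 (the default seen in model_test.go)
// when the sink config or the field is unset.
func advanceTimeoutOrDefault(sink *config.SinkConfig) uint {
	if sink == nil || sink.AdvanceTimeout == nil {
		return 600
	}
	return *sink.AdvanceTimeout
}

func main() {
	fmt.Println(advanceTimeoutOrDefault(nil)) // prints 600
}
```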
139 changes: 99 additions & 40 deletions cdc/processor/sinkmanager/manager.go
@@ -86,8 +86,12 @@ type SinkManager struct {
sourceManager *sourcemanager.SourceManager

// sinkFactory used to create table sink.
sinkFactory *factory.SinkFactory
sinkFactoryMu sync.Mutex
sinkFactory struct {
sync.Mutex
f *factory.SinkFactory
version uint64
errors chan error
}

// tableSinks is a map from tableID to tableSink.
tableSinks spanz.SyncMap
@@ -190,7 +194,6 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
enableOldValue := m.changefeedInfo.Config.EnableOldValue

gcErrors := make(chan error, 16)
sinkFactoryErrors := make(chan error, 16)
sinkErrors := make(chan error, 16)
redoErrors := make(chan error, 16)

@@ -244,12 +247,7 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er

// SinkManager will restart some internal modules if necessary.
for {
if err := m.initSinkFactory(sinkFactoryErrors); err != nil {
select {
case <-m.managerCtx.Done():
case sinkFactoryErrors <- err:
}
}
sinkFactoryErrors, sinkFactoryVersion := m.initSinkFactory()

select {
case <-m.managerCtx.Done():
@@ -264,9 +262,9 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
log.Warn("Sink manager backend sink fails",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Uint64("factoryVersion", sinkFactoryVersion),
zap.Error(err))
m.clearSinkFactory()
sinkFactoryErrors = make(chan error, 16)

start := time.Now()
log.Info("Sink manager is closing all table sinks",
@@ -304,46 +302,91 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
}
}

func (m *SinkManager) initSinkFactory(errCh chan error) error {
m.sinkFactoryMu.Lock()
defer m.sinkFactoryMu.Unlock()
if m.sinkFactory != nil {
return nil
}
func (m *SinkManager) initSinkFactory() (chan error, uint64) {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
uri := m.changefeedInfo.SinkURI
cfg := m.changefeedInfo.Config

if m.sinkFactory.f != nil {
return m.sinkFactory.errors, m.sinkFactory.version
}
if m.sinkFactory.errors == nil {
m.sinkFactory.errors = make(chan error, 16)
m.sinkFactory.version += 1
}

emitError := func(err error) {
select {
case <-m.managerCtx.Done():
case m.sinkFactory.errors <- err:
}
}

var err error = nil
failpoint.Inject("SinkManagerRunError", func() {
log.Info("failpoint SinkManagerRunError injected", zap.String("changefeed", m.changefeedID.ID))
err = errors.New("SinkManagerRunError")
})
if err != nil {
return errors.Trace(err)
emitError(err)
return m.sinkFactory.errors, m.sinkFactory.version
}

if m.sinkFactory, err = factory.New(m.managerCtx, m.changefeedID, uri, cfg, errCh); err == nil {
log.Info("Sink manager inits sink factory success",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID))
return nil
m.sinkFactory.f, err = factory.New(m.managerCtx, m.changefeedID, uri, cfg, m.sinkFactory.errors)
if err != nil {
emitError(err)
return m.sinkFactory.errors, m.sinkFactory.version
}
return errors.Trace(err)

log.Info("Sink manager inits sink factory success",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Uint64("factoryVersion", m.sinkFactory.version))
return m.sinkFactory.errors, m.sinkFactory.version
}

func (m *SinkManager) clearSinkFactory() {
m.sinkFactoryMu.Lock()
defer m.sinkFactoryMu.Unlock()
if m.sinkFactory != nil {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
if m.sinkFactory.f != nil {
log.Info("Sink manager closing sink factory",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID))
m.sinkFactory.Close()
m.sinkFactory = nil
zap.String("changefeed", m.changefeedID.ID),
zap.Uint64("factoryVersion", m.sinkFactory.version))
m.sinkFactory.f.Close()
m.sinkFactory.f = nil
log.Info("Sink manager has closed sink factory",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID))
zap.String("changefeed", m.changefeedID.ID),
zap.Uint64("factoryVersion", m.sinkFactory.version))
}
if m.sinkFactory.errors != nil {
close(m.sinkFactory.errors)
for range m.sinkFactory.errors {
}
m.sinkFactory.errors = nil
}
}

func (m *SinkManager) putSinkFactoryError(err error, version uint64) bool {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
skipped := true
if version == m.sinkFactory.version {
select {
case m.sinkFactory.errors <- err:
skipped = false
default:
}
}
log.Info("Sink manager tries to put an sink error",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Bool("skipped", skipped),
zap.String("error", err.Error()))

return !skipped
}

func (m *SinkManager) startSinkWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool, enableOldValue bool) {
@@ -391,7 +434,7 @@ func (m *SinkManager) backgroundGC(errors chan<- error) {
if time.Since(sink.lastCleanTime) < cleanTableInterval {
return true
}
checkpointTs := sink.getCheckpointTs()
checkpointTs, _, _ := sink.getCheckpointTs()
resolvedMark := checkpointTs.ResolvedMark()
if resolvedMark == 0 {
return true
@@ -756,14 +799,15 @@ func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs mod
sinkWrapper := newTableSinkWrapper(
m.changefeedID,
span,
func() tablesink.TableSink {
if m.sinkFactoryMu.TryLock() {
defer m.sinkFactoryMu.Unlock()
if m.sinkFactory != nil {
return m.sinkFactory.CreateTableSink(m.changefeedID, span, startTs, m.metricsTableSinkTotalRows)
func() (s tablesink.TableSink, version uint64) {
if m.sinkFactory.TryLock() {
defer m.sinkFactory.Unlock()
if m.sinkFactory.f != nil {
s = m.sinkFactory.f.CreateTableSink(m.changefeedID, span, startTs, m.metricsTableSinkTotalRows)
version = m.sinkFactory.version
}
}
return nil
return
},
tablepb.TableStatePreparing,
startTs,
@@ -862,12 +906,12 @@ func (m *SinkManager) RemoveTable(span tablepb.Span) {
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span))
}
sink := value.(*tableSinkWrapper)
checkpointTs, _, _ := value.(*tableSinkWrapper).getCheckpointTs()
log.Info("Remove table sink successfully",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span),
zap.Uint64("checkpointTs", sink.getCheckpointTs().Ts))
zap.Uint64("checkpointTs", checkpointTs.Ts))
if m.eventCache != nil {
m.eventCache.removeTable(span)
}
@@ -931,9 +975,24 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
}
tableSink := value.(*tableSinkWrapper)

checkpointTs := tableSink.getCheckpointTs()
checkpointTs, version, advanced := tableSink.getCheckpointTs()
m.sinkMemQuota.Release(span, checkpointTs)
m.redoMemQuota.Release(span, checkpointTs)

stuckCheck := time.Duration(*m.changefeedInfo.Config.Sink.AdvanceTimeout) * time.Second
if advanced.After(time.Unix(0, 0)) && time.Since(advanced) > stuckCheck &&
oracle.GetTimeFromTS(tableSink.getUpperBoundTs()).Sub(oracle.GetTimeFromTS(checkpointTs.Ts)) > stuckCheck {
log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span),
zap.Any("checkpointTs", checkpointTs),
zap.Float64("stuckCheck", stuckCheck.Seconds()),
zap.Uint64("factoryVersion", version))
tableSink.updateTableSinkAdvanced()
m.putSinkFactoryError(errors.New("table sink stuck"), version)
}

var resolvedTs model.Ts
// If redo log is enabled, we have to use redo log's resolved ts to calculate processor's min resolved ts.
if m.redoDMLMgr != nil {
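Tying the manager.go changes together: GetTableStats detects a table sink whose checkpoint has not advanced for longer than advance-timeout, pushes an error tagged with the current factory version, and the Run loop then closes the backend and re-creates it under a new version, so late errors from the replaced backend can be dropped. A condensed, self-contained sketch of that version-gating idea, using simplified types rather than the real ones:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// versionedErrors mimics the sinkFactory.errors/version pair from the diff:
// only errors reported against the currently active version are delivered.
type versionedErrors struct {
	mu      sync.Mutex
	version uint64
	ch      chan error
}

// reset mimics clearSinkFactory + initSinkFactory: a fresh channel, a new version.
func (v *versionedErrors) reset() {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.ch = make(chan error, 16)
	v.version++
}

// put mimics putSinkFactoryError: deliver only errors from the current version.
func (v *versionedErrors) put(err error, version uint64) bool {
	v.mu.Lock()
	defer v.mu.Unlock()
	if version != v.version {
		return false // stale error; the backend that produced it is already gone
	}
	select {
	case v.ch <- err:
		return true
	default:
		return false // an error is already pending for this version
	}
}

func main() {
	v := &versionedErrors{}
	v.reset()                                              // version 1
	fmt.Println(v.put(errors.New("table sink stuck"), 1))  // true: current version
	v.reset()                                              // version 2, old channel replaced
	fmt.Println(v.put(errors.New("late error"), 1))        // false: stale version, dropped
}
```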
7 changes: 4 additions & 3 deletions cdc/processor/sinkmanager/manager_test.go
@@ -197,7 +197,7 @@ func TestGenerateTableSinkTaskWithBarrierTs(t *testing.T) {
require.Eventually(t, func() bool {
tableSink, ok := manager.tableSinks.Load(span)
require.True(t, ok)
checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs()
checkpointTS, _, _ := tableSink.(*tableSinkWrapper).getCheckpointTs()
return checkpointTS.ResolvedMark() == 4
}, 5*time.Second, 10*time.Millisecond)
}
@@ -228,7 +228,7 @@ func TestGenerateTableSinkTaskWithResolvedTs(t *testing.T) {
require.Eventually(t, func() bool {
tableSink, ok := manager.tableSinks.Load(span)
require.True(t, ok)
checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs()
checkpointTS, _, _ := tableSink.(*tableSinkWrapper).getCheckpointTs()
return checkpointTS.ResolvedMark() == 3
}, 5*time.Second, 10*time.Millisecond)
}
@@ -283,7 +283,8 @@ func TestDoNotGenerateTableSinkTaskWhenTableIsNotReplicating(t *testing.T) {
tableSink, ok := manager.tableSinks.Load(span)
require.True(t, ok)
require.NotNil(t, tableSink)
require.Equal(t, uint64(1), tableSink.(*tableSinkWrapper).getCheckpointTs().Ts)
checkpointTS, _, _ := tableSink.(*tableSinkWrapper).getCheckpointTs()
require.Equal(t, uint64(1), checkpointTS.Ts)
}

func TestClose(t *testing.T) {