diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go
index dae63813209..a35379fd489 100644
--- a/cdc/api/v2/model.go
+++ b/cdc/api/v2/model.go
@@ -409,6 +409,9 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
 		if c.Sink.TxnAtomicity != nil {
 			res.Sink.TxnAtomicity = util.AddressOf(config.AtomicityLevel(*c.Sink.TxnAtomicity))
 		}
+		if c.Sink.AdvanceTimeoutInSec != nil {
+			res.Sink.AdvanceTimeoutInSec = util.AddressOf(*c.Sink.AdvanceTimeoutInSec)
+		}
 	}
 
 	if c.Mounter != nil {
@@ -637,6 +640,9 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
 		if cloned.Sink.TxnAtomicity != nil {
 			res.Sink.TxnAtomicity = util.AddressOf(string(*cloned.Sink.TxnAtomicity))
 		}
+		if cloned.Sink.AdvanceTimeoutInSec != nil {
+			res.Sink.AdvanceTimeoutInSec = util.AddressOf(*cloned.Sink.AdvanceTimeoutInSec)
+		}
 	}
 	if cloned.Consistent != nil {
 		res.Consistent = &ConsistentConfig{
@@ -788,6 +794,7 @@ type SinkConfig struct {
 	KafkaConfig         *KafkaConfig         `json:"kafka_config,omitempty"`
 	MySQLConfig         *MySQLConfig         `json:"mysql_config,omitempty"`
 	CloudStorageConfig  *CloudStorageConfig  `json:"cloud_storage_config,omitempty"`
+	AdvanceTimeoutInSec *uint                `json:"advance_timeout,omitempty"`
 }
 
 // CSVConfig denotes the csv config
diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go
index 584f924502c..3b827345d3b 100644
--- a/cdc/api/v2/model_test.go
+++ b/cdc/api/v2/model_test.go
@@ -58,6 +58,7 @@ var defaultAPIConfig = &ReplicaConfig{
 		EnableKafkaSinkV2:                util.AddressOf(false),
 		OnlyOutputUpdatedColumns:         util.AddressOf(false),
 		DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
+		AdvanceTimeoutInSec:              util.AddressOf(uint(150)),
 	},
 	Consistent: &ConsistentConfig{
 		Level: "none",
diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go
index 4fcc82d44cd..cd259de8821 100644
--- a/cdc/processor/processor_test.go
+++ b/cdc/processor/processor_test.go
@@ -134,7 +134,8 @@ func initProcessor4Test(
 		},
 		"sink": {
 			"dispatchers": null,
-			"protocol": "open-protocol"
+			"protocol": "open-protocol",
+			"advance-timeout-in-sec": 150
 		}
 	},
 	"state": "normal",
diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go
index 46c17124dcc..bcf120900bd 100644
--- a/cdc/processor/sinkmanager/manager.go
+++ b/cdc/processor/sinkmanager/manager.go
@@ -50,8 +50,6 @@ const (
 	// engine.CleanByTable can be expensive. So it's necessary to reduce useless calls.
 	cleanTableInterval  = 5 * time.Second
 	cleanTableMinEvents = 128
-	maxRetryDuration    = 30 * time.Minute
-	errGCInterval       = 10 * time.Minute
 )
 
 // TableStats of a table sink.
@@ -86,8 +84,15 @@ type SinkManager struct {
 	sourceManager *sourcemanager.SourceManager
 
 	// sinkFactory used to create table sink.
-	sinkFactory   *factory.SinkFactory
-	sinkFactoryMu sync.Mutex
+	sinkFactory struct {
+		sync.Mutex
+		f *factory.SinkFactory
+		// Whenever we want to create a new factory, version will be increased and
+		// errors will be replaced by a new channel. version is used to distinguish
+		// different sink factories in table sinks.
+		version uint64
+		errors  chan error
+	}
 
 	// tableSinks is a map from tableID to tableSink.
 	tableSinks spanz.SyncMap
@@ -190,7 +195,6 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
 	enableOldValue := m.changefeedInfo.Config.EnableOldValue
 
 	gcErrors := make(chan error, 16)
-	sinkFactoryErrors := make(chan error, 16)
 	sinkErrors := make(chan error, 16)
 	redoErrors := make(chan error, 16)
@@ -244,12 +248,7 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
 	// SinkManager will restart some internal modules if necessasry.
 	for {
-		if err := m.initSinkFactory(sinkFactoryErrors); err != nil {
-			select {
-			case <-m.managerCtx.Done():
-			case sinkFactoryErrors <- err:
-			}
-		}
+		sinkFactoryErrors, sinkFactoryVersion := m.initSinkFactory()
 
 		select {
 		case <-m.managerCtx.Done():
@@ -264,9 +263,9 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
 			log.Warn("Sink manager backend sink fails",
 				zap.String("namespace", m.changefeedID.Namespace),
 				zap.String("changefeed", m.changefeedID.ID),
+				zap.Uint64("factoryVersion", sinkFactoryVersion),
 				zap.Error(err))
 			m.clearSinkFactory()
-			sinkFactoryErrors = make(chan error, 16)
 
 			start := time.Now()
 			log.Info("Sink manager is closing all table sinks",
@@ -304,48 +303,91 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
 	}
 }
 
-func (m *SinkManager) initSinkFactory(errCh chan error) error {
-	m.sinkFactoryMu.Lock()
-	defer m.sinkFactoryMu.Unlock()
-	if m.sinkFactory != nil {
-		return nil
-	}
+func (m *SinkManager) initSinkFactory() (chan error, uint64) {
+	m.sinkFactory.Lock()
+	defer m.sinkFactory.Unlock()
 	uri := m.changefeedInfo.SinkURI
 	cfg := m.changefeedInfo.Config
 
+	if m.sinkFactory.f != nil {
+		return m.sinkFactory.errors, m.sinkFactory.version
+	}
+	if m.sinkFactory.errors == nil {
+		m.sinkFactory.errors = make(chan error, 16)
+		m.sinkFactory.version += 1
+	}
+
+	emitError := func(err error) {
+		select {
+		case <-m.managerCtx.Done():
+		case m.sinkFactory.errors <- err:
+		}
+	}
+
 	var err error = nil
 	failpoint.Inject("SinkManagerRunError", func() {
 		log.Info("failpoint SinkManagerRunError injected",
 			zap.String("changefeed", m.changefeedID.ID))
 		err = errors.New("SinkManagerRunError")
 	})
 	if err != nil {
-		return errors.Trace(err)
+		emitError(err)
+		return m.sinkFactory.errors, m.sinkFactory.version
 	}
 
-	if m.sinkFactory, err = factory.New(m.managerCtx, m.changefeedID, uri, cfg, errCh); err == nil {
-		log.Info("Sink manager inits sink factory success",
-			zap.String("namespace", m.changefeedID.Namespace),
-			zap.String("changefeed", m.changefeedID.ID))
-		return nil
+	m.sinkFactory.f, err = factory.New(m.managerCtx, m.changefeedID, uri, cfg, m.sinkFactory.errors)
+	if err != nil {
+		emitError(err)
+		return m.sinkFactory.errors, m.sinkFactory.version
 	}
-	return errors.Trace(err)
+
+	log.Info("Sink manager inits sink factory success",
+		zap.String("namespace", m.changefeedID.Namespace),
+		zap.String("changefeed", m.changefeedID.ID),
+		zap.Uint64("factoryVersion", m.sinkFactory.version))
+	return m.sinkFactory.errors, m.sinkFactory.version
 }
 
 func (m *SinkManager) clearSinkFactory() {
-	m.sinkFactoryMu.Lock()
-	defer m.sinkFactoryMu.Unlock()
-	if m.sinkFactory != nil {
+	m.sinkFactory.Lock()
+	defer m.sinkFactory.Unlock()
+	if m.sinkFactory.f != nil {
 		log.Info("Sink manager closing sink factory",
 			zap.String("namespace", m.changefeedID.Namespace),
-			zap.String("changefeed", m.changefeedID.ID))
-		m.sinkFactory.Close()
-		m.sinkFactory = nil
+			zap.String("changefeed", m.changefeedID.ID),
+			zap.Uint64("factoryVersion", m.sinkFactory.version))
+		m.sinkFactory.f.Close()
+		m.sinkFactory.f = nil
 		log.Info("Sink manager has closed sink factory",
 			zap.String("namespace", m.changefeedID.Namespace),
-			zap.String("changefeed", m.changefeedID.ID))
+			zap.String("changefeed", m.changefeedID.ID),
+			zap.Uint64("factoryVersion", m.sinkFactory.version))
+	}
+	if m.sinkFactory.errors != nil {
+		close(m.sinkFactory.errors)
+		for range m.sinkFactory.errors {
+		}
+		m.sinkFactory.errors = nil
+	}
 }
 
+func (m *SinkManager) putSinkFactoryError(err error, version uint64) {
+	m.sinkFactory.Lock()
+	defer m.sinkFactory.Unlock()
+	skipped := true
+	if version == m.sinkFactory.version {
+		select {
+		case m.sinkFactory.errors <- err:
+			skipped = false
+		default:
+		}
+	}
+	log.Info("Sink manager tries to put a sink error",
+		zap.String("namespace", m.changefeedID.Namespace),
+		zap.String("changefeed", m.changefeedID.ID),
+		zap.Bool("skipped", skipped),
+		zap.String("error", err.Error()))
+}
+
 func (m *SinkManager) startSinkWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool, enableOldValue bool) {
 	for i := 0; i < sinkWorkerNum; i++ {
 		w := newSinkWorker(m.changefeedID, m.sourceManager,
@@ -391,7 +433,7 @@ func (m *SinkManager) backgroundGC(errors chan<- error) {
 				if time.Since(sink.lastCleanTime) < cleanTableInterval {
 					return true
 				}
-				checkpointTs := sink.getCheckpointTs()
+				checkpointTs, _, _ := sink.getCheckpointTs()
 				resolvedMark := checkpointTs.ResolvedMark()
 				if resolvedMark == 0 {
 					return true
@@ -756,14 +798,15 @@ func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs mod
 	sinkWrapper := newTableSinkWrapper(
 		m.changefeedID,
 		span,
-		func() tablesink.TableSink {
-			if m.sinkFactoryMu.TryLock() {
-				defer m.sinkFactoryMu.Unlock()
-				if m.sinkFactory != nil {
-					return m.sinkFactory.CreateTableSink(m.changefeedID, span, startTs, m.metricsTableSinkTotalRows)
+		func() (s tablesink.TableSink, version uint64) {
+			if m.sinkFactory.TryLock() {
+				defer m.sinkFactory.Unlock()
+				if m.sinkFactory.f != nil {
+					s = m.sinkFactory.f.CreateTableSink(m.changefeedID, span, startTs, m.metricsTableSinkTotalRows)
+					version = m.sinkFactory.version
 				}
 			}
-			return nil
+			return
 		},
 		tablepb.TableStatePreparing,
 		startTs,
@@ -862,12 +905,12 @@ func (m *SinkManager) RemoveTable(span tablepb.Span) {
 			zap.String("changefeed", m.changefeedID.ID),
 			zap.Stringer("span", &span))
 	}
-	sink := value.(*tableSinkWrapper)
+	checkpointTs, _, _ := value.(*tableSinkWrapper).getCheckpointTs()
 	log.Info("Remove table sink successfully",
 		zap.String("namespace", m.changefeedID.Namespace),
 		zap.String("changefeed", m.changefeedID.ID),
 		zap.Stringer("span", &span),
-		zap.Uint64("checkpointTs", sink.getCheckpointTs().Ts))
+		zap.Uint64("checkpointTs", checkpointTs.Ts))
 	if m.eventCache != nil {
 		m.eventCache.removeTable(span)
 	}
@@ -931,9 +974,24 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
 	}
 	tableSink := value.(*tableSinkWrapper)
 
-	checkpointTs := tableSink.getCheckpointTs()
+	checkpointTs, version, advanced := tableSink.getCheckpointTs()
 	m.sinkMemQuota.Release(span, checkpointTs)
 	m.redoMemQuota.Release(span, checkpointTs)
+
+	stuckCheck := time.Duration(*m.changefeedInfo.Config.Sink.AdvanceTimeoutInSec) * time.Second
+	if version > 0 && time.Since(advanced) > stuckCheck &&
+		oracle.GetTimeFromTS(tableSink.getUpperBoundTs()).Sub(oracle.GetTimeFromTS(checkpointTs.Ts)) > stuckCheck {
+		log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
+			zap.String("namespace", m.changefeedID.Namespace),
+			zap.String("changefeed", m.changefeedID.ID),
+			zap.Stringer("span", &span),
+			zap.Any("checkpointTs", checkpointTs),
+			zap.Float64("stuckCheck", stuckCheck.Seconds()),
+			zap.Uint64("factoryVersion", version))
+		tableSink.updateTableSinkAdvanced()
+		m.putSinkFactoryError(errors.New("table sink stuck"), version)
+	}
+
 	var resolvedTs model.Ts
 	// If redo log is enabled, we have to use redo log's resolved ts to calculate processor's min resolved ts.
 	if m.redoDMLMgr != nil {
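The stuck check above reduces to a single predicate: restart the backend only when a table sink exists (factory version > 0), its checkpoint has not advanced for longer than advance-timeout-in-sec, and the upper bound is far enough ahead that progress should have been possible. A minimal, self-contained sketch of that predicate — the shouldRestartSink name and the plain time.Time parameters are illustrative, not part of the change:

```go
package main

import (
	"fmt"
	"time"
)

// shouldRestartSink mirrors the decision added to SinkManager.GetTableStats:
// restart only when the table sink has a live factory (version > 0), its
// checkpoint has not advanced within the timeout, and the upper bound is far
// enough ahead of the checkpoint that progress was actually expected.
func shouldRestartSink(
	version uint64, // factory version attached to the table sink; 0 means no sink yet
	lastAdvanced time.Time, // last time the checkpoint moved forward
	checkpointTime, upperBoundTime time.Time, // physical times decoded from the TSOs
	advanceTimeout time.Duration, // Sink.AdvanceTimeoutInSec as a duration
) bool {
	return version > 0 &&
		time.Since(lastAdvanced) > advanceTimeout &&
		upperBoundTime.Sub(checkpointTime) > advanceTimeout
}

func main() {
	timeout := 150 * time.Second // default advance-timeout-in-sec
	stuck := shouldRestartSink(
		3,                              // table sink created by factory version 3
		time.Now().Add(-3*time.Minute), // no advance for 3 minutes
		time.Now().Add(-4*time.Minute), // checkpoint physical time
		time.Now(),                     // upper bound physical time
		timeout,
	)
	fmt.Println("restart sink backend:", stuck) // true
}
```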
diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go
index 85d0cb28919..d79657e749f 100644
--- a/cdc/processor/sinkmanager/manager_test.go
+++ b/cdc/processor/sinkmanager/manager_test.go
@@ -197,7 +197,7 @@ func TestGenerateTableSinkTaskWithBarrierTs(t *testing.T) {
 	require.Eventually(t, func() bool {
 		tableSink, ok := manager.tableSinks.Load(span)
 		require.True(t, ok)
-		checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs()
+		checkpointTS, _, _ := tableSink.(*tableSinkWrapper).getCheckpointTs()
 		return checkpointTS.ResolvedMark() == 4
 	}, 5*time.Second, 10*time.Millisecond)
 }
@@ -228,7 +228,7 @@ func TestGenerateTableSinkTaskWithResolvedTs(t *testing.T) {
 	require.Eventually(t, func() bool {
 		tableSink, ok := manager.tableSinks.Load(span)
 		require.True(t, ok)
-		checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs()
+		checkpointTS, _, _ := tableSink.(*tableSinkWrapper).getCheckpointTs()
 		return checkpointTS.ResolvedMark() == 3
 	}, 5*time.Second, 10*time.Millisecond)
 }
@@ -283,7 +283,8 @@ func TestDoNotGenerateTableSinkTaskWhenTableIsNotReplicating(t *testing.T) {
 	tableSink, ok := manager.tableSinks.Load(span)
 	require.True(t, ok)
 	require.NotNil(t, tableSink)
-	require.Equal(t, uint64(1), tableSink.(*tableSinkWrapper).getCheckpointTs().Ts)
+	checkpointTS, _, _ := tableSink.(*tableSinkWrapper).getCheckpointTs()
+	require.Equal(t, uint64(1), checkpointTS.Ts)
 }
 
 func TestClose(t *testing.T) {
diff --git a/cdc/processor/sinkmanager/table_sink_advancer_test.go b/cdc/processor/sinkmanager/table_sink_advancer_test.go
index 1732b748d0c..b1c2af8303a 100644
--- a/cdc/processor/sinkmanager/table_sink_advancer_test.go
+++ b/cdc/processor/sinkmanager/table_sink_advancer_test.go
@@ -136,7 +136,8 @@ func (suite *tableSinkAdvancerSuite) TestAdvanceTableSinkWithBatchID() {
 	expectedResolvedTs := model.NewResolvedTs(2)
 	expectedResolvedTs.Mode = model.BatchResolvedMode
 	expectedResolvedTs.BatchID = 1
-	require.Equal(suite.T(), expectedResolvedTs, task.tableSink.getCheckpointTs())
+	checkpointTs, _, _ := task.tableSink.getCheckpointTs()
+	require.Equal(suite.T(), expectedResolvedTs, checkpointTs)
 }
 
 func (suite *tableSinkAdvancerSuite) TestAdvanceTableSink() {
@@ -150,7 +151,8 @@ func (suite *tableSinkAdvancerSuite) TestAdvanceTableSink() {
 	require.NoError(suite.T(), err)
 
 	expectedResolvedTs := model.NewResolvedTs(2)
-	require.Equal(suite.T(), expectedResolvedTs, task.tableSink.getCheckpointTs())
+	checkpointTs, _, _ := task.tableSink.getCheckpointTs()
+	require.Equal(suite.T(), expectedResolvedTs, checkpointTs)
 }
 
 func (suite *tableSinkAdvancerSuite) TestNewTableSinkAdvancer() {
@@ -288,7 +290,8 @@ func (suite *tableSinkAdvancerSuite) TestAdvanceTheSameCommitTsEventsWithCommitF
 	require.Len(suite.T(), sink.GetEvents(), 3)
 	sink.AckAllEvents()
 	require.Eventually(suite.T(), func() bool {
-		return task.tableSink.getCheckpointTs() == model.NewResolvedTs(2)
+		checkpointTs, _, _ := task.tableSink.getCheckpointTs()
+		return checkpointTs == model.NewResolvedTs(2)
 	}, 5*time.Second, 10*time.Millisecond)
 	require.Equal(suite.T(), uint64(0), advancer.committedTxnSize)
 	require.Equal(suite.T(), uint64(0), advancer.pendingTxnSize)
@@ -334,7 +337,8 @@ func (suite *tableSinkAdvancerSuite) TestAdvanceTheSameCommitTsEventsWithoutComm
 		expectedResolvedTs := model.NewResolvedTs(3)
 		expectedResolvedTs.Mode = model.BatchResolvedMode
 		expectedResolvedTs.BatchID = 1
-		return task.tableSink.getCheckpointTs() == expectedResolvedTs
+		checkpointTs, _, _ := task.tableSink.getCheckpointTs()
+		return checkpointTs == expectedResolvedTs
 	}, 5*time.Second, 10*time.Millisecond)
 	require.Equal(suite.T(), uint64(0), advancer.committedTxnSize)
 	require.Equal(suite.T(), uint64(0), advancer.pendingTxnSize)
@@ -384,7 +388,8 @@ func (suite *tableSinkAdvancerSuite) TestAdvanceDifferentCommitTsEventsWithSplit
 		expectedResolvedTs := model.NewResolvedTs(3)
 		expectedResolvedTs.Mode = model.BatchResolvedMode
 		expectedResolvedTs.BatchID = 1
-		return task.tableSink.getCheckpointTs() == expectedResolvedTs
+		checkpointTs, _, _ := task.tableSink.getCheckpointTs()
+		return checkpointTs == expectedResolvedTs
 	}, 5*time.Second, 10*time.Millisecond)
 	require.Equal(suite.T(), uint64(0), advancer.committedTxnSize)
 	require.Equal(suite.T(), uint64(0), advancer.pendingTxnSize)
@@ -438,7 +443,8 @@ func (suite *tableSinkAdvancerSuite) TestAdvanceDifferentCommitTsEventsWithoutSp
 	sink.AckAllEvents()
 	require.Eventually(suite.T(), func() bool {
 		expectedResolvedTs := model.NewResolvedTs(2)
-		return task.tableSink.getCheckpointTs() == expectedResolvedTs
+		checkpointTs, _, _ := task.tableSink.getCheckpointTs()
+		return checkpointTs == expectedResolvedTs
 	}, 5*time.Second, 10*time.Millisecond)
 	require.Equal(suite.T(), uint64(0), advancer.committedTxnSize)
 	require.Equal(suite.T(), uint64(256), advancer.pendingTxnSize)
@@ -493,7 +499,8 @@ func (suite *tableSinkAdvancerSuite) TestLastTimeAdvanceDifferentCommitTsEventsW
 	sink.AckAllEvents()
 	require.Eventually(suite.T(), func() bool {
 		expectedResolvedTs := model.NewResolvedTs(2)
-		return task.tableSink.getCheckpointTs() == expectedResolvedTs
+		checkpointTs, _, _ := task.tableSink.getCheckpointTs()
+		return checkpointTs == expectedResolvedTs
 	}, 5*time.Second, 10*time.Millisecond)
 	require.Equal(suite.T(), uint64(0), advancer.committedTxnSize)
 	require.Equal(suite.T(), uint64(0), advancer.pendingTxnSize,
@@ -550,7 +557,8 @@ func (suite *tableSinkAdvancerSuite) TestTryAdvanceWhenExceedAvailableMem() {
 	sink.AckAllEvents()
 	require.Eventually(suite.T(), func() bool {
 		expectedResolvedTs := model.NewResolvedTs(3)
-		return task.tableSink.getCheckpointTs() == expectedResolvedTs
+		checkpointTs, _, _ := task.tableSink.getCheckpointTs()
+		return checkpointTs == expectedResolvedTs
 	}, 5*time.Second, 10*time.Millisecond)
 	require.Equal(suite.T(), uint64(0), advancer.committedTxnSize)
 	require.Equal(suite.T(), uint64(0), advancer.pendingTxnSize)
@@ -599,7 +607,8 @@ func (suite *tableSinkAdvancerSuite) TestTryAdvanceWhenReachTheMaxUpdateIntSizeA
 	sink.AckAllEvents()
 	require.Eventually(suite.T(), func() bool {
 		expectedResolvedTs := model.NewResolvedTs(3)
-		return task.tableSink.getCheckpointTs() == expectedResolvedTs
+		checkpointTs, _, _ := task.tableSink.getCheckpointTs()
+		return checkpointTs == expectedResolvedTs
 	}, 5*time.Second, 10*time.Millisecond)
 	require.Equal(suite.T(), uint64(0), advancer.committedTxnSize)
 	require.Equal(suite.T(), uint64(0), advancer.pendingTxnSize)
@@ -651,7 +660,8 @@ func (suite *tableSinkAdvancerSuite) TestFinish() {
 	sink.AckAllEvents()
 	require.Eventually(suite.T(), func() bool {
 		expectedResolvedTs := model.NewResolvedTs(4)
-		return task.tableSink.getCheckpointTs() == expectedResolvedTs
+		checkpointTs, _, _ := task.tableSink.getCheckpointTs()
+		return checkpointTs == expectedResolvedTs
 	}, 5*time.Second, 10*time.Millisecond)
 	require.Equal(suite.T(), uint64(0), advancer.committedTxnSize)
 	require.Equal(suite.T(), uint64(0), advancer.pendingTxnSize)
@@ -700,7 +710,8 @@ func (suite *tableSinkAdvancerSuite) TestTryAdvanceAndForceAcquireWithoutSplitTx
 	sink.AckAllEvents()
 	require.Eventually(suite.T(), func() bool {
 		expectedResolvedTs := model.NewResolvedTs(3)
-		return task.tableSink.getCheckpointTs() == expectedResolvedTs
+		checkpointTs, _, _ := task.tableSink.getCheckpointTs()
+		return checkpointTs == expectedResolvedTs
 	}, 5*time.Second, 10*time.Millisecond)
 	require.Equal(suite.T(), uint64(0), advancer.committedTxnSize)
 	require.Equal(suite.T(), uint64(0), advancer.pendingTxnSize)
@@ -764,7 +775,8 @@ func (suite *tableSinkAdvancerSuite) TestTryAdvanceAndBlockAcquireWithSplitTxn()
 	<-down
 	require.Eventually(suite.T(), func() bool {
 		expectedResolvedTs := model.NewResolvedTs(3)
-		return task.tableSink.getCheckpointTs() == expectedResolvedTs
+		checkpointTs, _, _ := task.tableSink.getCheckpointTs()
+		return checkpointTs == expectedResolvedTs
 	}, 5*time.Second, 10*time.Millisecond)
 	require.Equal(suite.T(), uint64(0), advancer.committedTxnSize)
 	require.Equal(suite.T(), uint64(0), advancer.pendingTxnSize)
diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go
index 73047ebbbfa..5c3033a33fe 100644
--- a/cdc/processor/sinkmanager/table_sink_worker.go
+++ b/cdc/processor/sinkmanager/table_sink_worker.go
@@ -193,7 +193,8 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 			// Restart the table sink based on the checkpoint position.
 			if finalErr = task.tableSink.restart(ctx); finalErr == nil {
-				ckpt := task.tableSink.getCheckpointTs().ResolvedMark()
+				checkpointTs, _, _ := task.tableSink.getCheckpointTs()
+				ckpt := checkpointTs.ResolvedMark()
 				lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt}
 				task.callback(lastWrittenPos)
 				log.Info("table sink has been restarted",
diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go
index 33b73726599..1a6f3eeb55f 100644
--- a/cdc/processor/sinkmanager/table_sink_worker_test.go
+++ b/cdc/processor/sinkmanager/table_sink_worker_test.go
@@ -533,7 +533,8 @@ func (suite *tableSinkWorkerSuite) TestHandleTaskWithSplitTxnAndAdvanceTableWhen
 	receivedEvents := sink.GetEvents()
 	receivedEvents[0].Callback()
 	require.Len(suite.T(), sink.GetEvents(), 1, "No more events should be sent to sink")
-	require.Equal(suite.T(), uint64(4), wrapper.getCheckpointTs().ResolvedMark())
+	checkpointTs, _, _ := wrapper.getCheckpointTs()
+	require.Equal(suite.T(), uint64(4), checkpointTs.ResolvedMark())
 }
 
 // Test Scenario:
@@ -578,7 +579,8 @@ func (suite *tableSinkWorkerSuite) TestHandleTaskWithSplitTxnAndAdvanceTableIfNo
 		isCanceled: func() bool { return false },
 	}
 	require.Eventually(suite.T(), func() bool {
-		return wrapper.getCheckpointTs().ResolvedMark() == 4
+		checkpointTs, _, _ := wrapper.getCheckpointTs()
+		return checkpointTs.ResolvedMark() == 4
 	}, 5*time.Second, 10*time.Millisecond, "Directly advance resolved mark to 4")
 	cancel()
 	wg.Wait()
@@ -635,7 +637,8 @@ func (suite *tableSinkWorkerSuite) TestHandleTaskUseDifferentBatchIDEveryTime()
 	require.Equal(suite.T(), uint64(3), batchID.Load())
 	sink.AckAllEvents()
 	require.Eventually(suite.T(), func() bool {
-		return wrapper.getCheckpointTs().ResolvedMark() == 2
+		checkpointTs, _, _ := wrapper.getCheckpointTs()
+		return checkpointTs.ResolvedMark() == 2
 	}, 5*time.Second, 10*time.Millisecond)
 
 	events = []*model.PolymorphicEvent{
diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go
index c507eb2be08..f01e03c53f2 100644
--- a/cdc/processor/sinkmanager/table_sink_wrapper.go
+++ b/cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -46,12 +46,16 @@ type tableSinkWrapper struct {
 	// tableSpan used for logging.
 	span tablepb.Span
 
-	tableSinkCreater func() tablesink.TableSink
+	tableSinkCreater func() (tablesink.TableSink, uint64)
 
 	// tableSink is the underlying sink.
-	tableSink             tablesink.TableSink
-	tableSinkCheckpointTs model.ResolvedTs
-	tableSinkMu           sync.RWMutex
+	tableSink struct {
+		sync.RWMutex
+		s            tablesink.TableSink
+		version      uint64 // it's generated by `tableSinkCreater`.
+		checkpointTs model.ResolvedTs
+		advanced     atomic.Int64
+	}
 
 	// state used to control the lifecycle of the table.
 	state *tablepb.TableState
@@ -97,7 +101,7 @@ func newRangeEventCount(pos engine.Position, events int) rangeEventCount {
 func newTableSinkWrapper(
 	changefeed model.ChangeFeedID,
 	span tablepb.Span,
-	tableSinkCreater func() tablesink.TableSink,
+	tableSinkCreater func() (tablesink.TableSink, uint64),
 	state tablepb.TableState,
 	startTs model.Ts,
 	targetTs model.Ts,
@@ -113,7 +117,11 @@ func newTableSinkWrapper(
 		targetTs:       targetTs,
 		genReplicateTs: genReplicateTs,
 	}
-	res.tableSinkCheckpointTs = model.NewResolvedTs(startTs)
+
+	res.tableSink.version = 0
+	res.tableSink.checkpointTs = model.NewResolvedTs(startTs)
+	res.updateTableSinkAdvanced()
+
 	res.receivedSorterResolvedTs.Store(startTs)
 	res.barrierTs.Store(startTs)
 	return res
@@ -157,13 +165,13 @@ func (t *tableSinkWrapper) start(ctx context.Context, startTs model.Ts) (err err
 }
 
 func (t *tableSinkWrapper) appendRowChangedEvents(events ...*model.RowChangedEvent) error {
-	t.tableSinkMu.RLock()
-	defer t.tableSinkMu.RUnlock()
-	if t.tableSink == nil {
+	t.tableSink.RLock()
+	defer t.tableSink.RUnlock()
+	if t.tableSink.s == nil {
 		// If it's nil it means it's closed.
 		return tablesink.NewSinkInternalError(errors.New("table sink cleared"))
 	}
-	t.tableSink.AppendRowChangedEvents(events...)
+	t.tableSink.s.AppendRowChangedEvents(events...)
 	return nil
 }
 
@@ -183,25 +191,39 @@ func (t *tableSinkWrapper) updateReceivedSorterResolvedTs(ts model.Ts) {
 }
 
 func (t *tableSinkWrapper) updateResolvedTs(ts model.ResolvedTs) error {
-	t.tableSinkMu.RLock()
-	defer t.tableSinkMu.RUnlock()
-	if t.tableSink == nil {
+	t.tableSink.RLock()
+	defer t.tableSink.RUnlock()
+	if t.tableSink.s == nil {
 		// If it's nil it means it's closed.
 		return tablesink.NewSinkInternalError(errors.New("table sink cleared"))
 	}
-	return t.tableSink.UpdateResolvedTs(ts)
+	return t.tableSink.s.UpdateResolvedTs(ts)
 }
 
-func (t *tableSinkWrapper) getCheckpointTs() model.ResolvedTs {
-	t.tableSinkMu.RLock()
-	defer t.tableSinkMu.RUnlock()
-	if t.tableSink != nil {
-		checkpointTs := t.tableSink.GetCheckpointTs()
-		if t.tableSinkCheckpointTs.Less(checkpointTs) {
-			t.tableSinkCheckpointTs = checkpointTs
+// getCheckpointTs returns
+// 1. the checkpoint timestamp of the table;
+// 2. the table sink version, which comes from `tableSinkCreater`;
+// 3. the most recent time at which the table sink advanced.
+func (t *tableSinkWrapper) getCheckpointTs() (model.ResolvedTs, uint64, time.Time) {
+	t.tableSink.RLock()
+	defer t.tableSink.RUnlock()
+	if t.tableSink.s != nil {
+		checkpointTs := t.tableSink.s.GetCheckpointTs()
+		if t.tableSink.checkpointTs.Less(checkpointTs) {
+			t.tableSink.checkpointTs = checkpointTs
+			t.updateTableSinkAdvanced()
 		}
 	}
-	return t.tableSinkCheckpointTs
+	advanced := time.Unix(t.tableSink.advanced.Load(), 0)
+	return t.tableSink.checkpointTs, t.tableSink.version, advanced
+}
+
+func (t *tableSinkWrapper) updateTableSinkAdvanced() {
+	curr := t.tableSink.advanced.Load()
+	now := time.Now().Unix()
+	if now > curr {
+		t.tableSink.advanced.CompareAndSwap(curr, now)
+	}
 }
 
 func (t *tableSinkWrapper) getReceivedSorterResolvedTs() model.Ts {
@@ -275,31 +297,35 @@ func (t *tableSinkWrapper) stop() {
 
 // Return true means the internal table sink has been initialized.
 func (t *tableSinkWrapper) initTableSink() bool {
-	t.tableSinkMu.Lock()
-	defer t.tableSinkMu.Unlock()
-	if t.tableSink == nil {
-		t.tableSink = t.tableSinkCreater()
-		return t.tableSink != nil
+	t.tableSink.Lock()
+	defer t.tableSink.Unlock()
+	if t.tableSink.s == nil {
+		t.tableSink.s, t.tableSink.version = t.tableSinkCreater()
+		if t.tableSink.s != nil {
+			t.updateTableSinkAdvanced()
+			return true
+		}
+		return false
 	}
 	return true
 }
 
 func (t *tableSinkWrapper) asyncCloseTableSink() bool {
-	t.tableSinkMu.RLock()
-	defer t.tableSinkMu.RUnlock()
-	if t.tableSink == nil {
+	t.tableSink.RLock()
+	defer t.tableSink.RUnlock()
+	if t.tableSink.s == nil {
 		return true
 	}
-	return t.tableSink.AsyncClose()
+	return t.tableSink.s.AsyncClose()
 }
 
 func (t *tableSinkWrapper) closeTableSink() {
-	t.tableSinkMu.RLock()
-	defer t.tableSinkMu.RUnlock()
-	if t.tableSink == nil {
+	t.tableSink.RLock()
+	defer t.tableSink.RUnlock()
+	if t.tableSink.s == nil {
 		return
 	}
-	t.tableSink.Close()
+	t.tableSink.s.Close()
 }
 
 func (t *tableSinkWrapper) asyncCloseAndClearTableSink() bool {
@@ -314,16 +340,18 @@ func (t *tableSinkWrapper) closeAndClearTableSink() {
 }
 
 func (t *tableSinkWrapper) doTableSinkClear() {
-	t.tableSinkMu.Lock()
-	defer t.tableSinkMu.Unlock()
-	if t.tableSink == nil {
+	t.tableSink.Lock()
+	defer t.tableSink.Unlock()
+	if t.tableSink.s == nil {
 		return
 	}
-	checkpointTs := t.tableSink.GetCheckpointTs()
-	if t.tableSinkCheckpointTs.Less(checkpointTs) {
-		t.tableSinkCheckpointTs = checkpointTs
+	checkpointTs := t.tableSink.s.GetCheckpointTs()
+	if t.tableSink.checkpointTs.Less(checkpointTs) {
+		t.tableSink.checkpointTs = checkpointTs
 	}
-	t.tableSink = nil
+	t.tableSink.s = nil
+	t.tableSink.version = 0
+	t.tableSink.advanced.Store(time.Now().Unix())
 }
 
 // When the attached sink fail, there can be some events that have already been
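tableSinkWrapper records the last advance as a unix-second timestamp that only ever moves forward; a losing CompareAndSwap simply means another caller stored an equal or newer value, which is acceptable for a "most recent advance" marker. A standalone sketch of the same pattern using sync/atomic — the advancedAt type and method names are illustrative, not part of the change:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// advancedAt tracks the unix second at which a table sink last advanced,
// in the same spirit as tableSinkWrapper.updateTableSinkAdvanced: the stored
// value is monotonically non-decreasing even with concurrent updaters.
type advancedAt struct {
	v atomic.Int64
}

func (a *advancedAt) touch() {
	curr := a.v.Load()
	now := time.Now().Unix()
	if now > curr {
		// A failed CAS means another goroutine already stored an equal or
		// newer timestamp, so there is nothing left to do.
		a.v.CompareAndSwap(curr, now)
	}
}

func (a *advancedAt) since() time.Duration {
	return time.Since(time.Unix(a.v.Load(), 0))
}

func main() {
	var a advancedAt
	a.touch()
	time.Sleep(time.Second)
	fmt.Println("idle for:", a.since().Round(time.Second))
}
```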
diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go
index a9aa6a40bc3..206d02d32fb 100644
--- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go
+++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go
@@ -86,13 +86,13 @@ func createTableSinkWrapper(
 	wrapper := newTableSinkWrapper(
 		changefeedID,
 		span,
-		func() tablesink.TableSink { return innerTableSink },
+		func() (tablesink.TableSink, uint64) { return innerTableSink, 1 },
 		tableState,
 		0,
 		100,
 		func(_ context.Context) (model.Ts, error) { return math.MaxUint64, nil },
 	)
-	wrapper.tableSink = wrapper.tableSinkCreater()
+	wrapper.tableSink.s, wrapper.tableSink.version = wrapper.tableSinkCreater()
 	return wrapper, sink
 }
 
@@ -329,5 +329,6 @@ func TestNewTableSinkWrapper(t *testing.T) {
 	require.NotNil(t, wrapper)
 	require.Equal(t, uint64(10), wrapper.getUpperBoundTs())
 	require.Equal(t, uint64(10), wrapper.getReceivedSorterResolvedTs())
-	require.Equal(t, uint64(10), wrapper.getCheckpointTs().ResolvedMark())
+	checkpointTs, _, _ := wrapper.getCheckpointTs()
+	require.Equal(t, uint64(10), checkpointTs.ResolvedMark())
 }
diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go
index 8c62f04d3ec..48da8ef17a4 100644
--- a/docs/swagger/docs.go
+++ b/docs/swagger/docs.go
@@ -1641,6 +1641,10 @@ var doc = `{
         "config.SinkConfig": {
            "type": "object",
            "properties": {
+                "advance-timeout-in-sec": {
+                    "description": "AdvanceTimeoutInSec is a duration in seconds. If a table sink's progress hasn't been\nadvanced for this duration, the sink will be canceled and re-established.",
+                    "type": "integer"
+                },
                "cloud-storage-config": {
                    "$ref": "#/definitions/config.CloudStorageConfig"
                },
@@ -2728,6 +2732,9 @@ var doc = `{
         "v2.SinkConfig": {
            "type": "object",
            "properties": {
+                "advance_timeout": {
+                    "type": "integer"
+                },
                "cloud_storage_config": {
                    "$ref": "#/definitions/v2.CloudStorageConfig"
                },
diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json
index b020fec566b..16185b85cf2 100644
--- a/docs/swagger/swagger.json
+++ b/docs/swagger/swagger.json
@@ -1622,6 +1622,10 @@
         "config.SinkConfig": {
             "type": "object",
             "properties": {
+                "advance-timeout-in-sec": {
+                    "description": "AdvanceTimeoutInSec is a duration in seconds. If a table sink's progress hasn't been\nadvanced for this duration, the sink will be canceled and re-established.",
+                    "type": "integer"
+                },
                 "cloud-storage-config": {
                     "$ref": "#/definitions/config.CloudStorageConfig"
                 },
@@ -2709,6 +2713,9 @@
         "v2.SinkConfig": {
             "type": "object",
             "properties": {
+                "advance_timeout": {
+                    "type": "integer"
+                },
                 "cloud_storage_config": {
                     "$ref": "#/definitions/v2.CloudStorageConfig"
                 },
diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml
index be2553eac38..214659731ff 100644
--- a/docs/swagger/swagger.yaml
+++ b/docs/swagger/swagger.yaml
@@ -200,6 +200,11 @@ definitions:
     type: object
   config.SinkConfig:
     properties:
+      advance-timeout-in-sec:
+        description: |-
+          AdvanceTimeoutInSec is a duration in seconds. If a table sink's progress hasn't been
+          advanced for this duration, the sink will be canceled and re-established.
+        type: integer
      cloud-storage-config:
        $ref: '#/definitions/config.CloudStorageConfig'
      column-selectors:
@@ -937,6 +942,8 @@ definitions:
     type: object
   v2.SinkConfig:
     properties:
+      advance_timeout:
+        type: integer
      cloud_storage_config:
        $ref: '#/definitions/v2.CloudStorageConfig'
      column_selectors:
diff --git a/pkg/cmd/util/helper_test.go b/pkg/cmd/util/helper_test.go
index 56700cfaf49..07149f7cf03 100644
--- a/pkg/cmd/util/helper_test.go
+++ b/pkg/cmd/util/helper_test.go
@@ -213,6 +213,7 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) {
 		OnlyOutputUpdatedColumns:         util.AddressOf(false),
 		DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
 		Protocol:                         util.AddressOf("open-protocol"),
+		AdvanceTimeoutInSec:              util.AddressOf(uint(150)),
 	}, cfg.Sink)
 }
 
@@ -245,6 +246,7 @@ func TestAndWriteStorageSinkTOML(t *testing.T) {
 		},
 		OnlyOutputUpdatedColumns:         util.AddressOf(false),
 		DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
+		AdvanceTimeoutInSec:              util.AddressOf(uint(150)),
 	}, cfg.Sink)
 }
 
diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go
index b6a8fdab298..a5724292303 100644
--- a/pkg/config/config_test_data.go
+++ b/pkg/config/config_test_data.go
@@ -64,7 +64,8 @@ const (
 			"large-message-handle-option": "none",
 			"claim-check-storage-uri": "",
 			"claim-check-compression": ""
-		}
+		},
+		"advance-timeout-in-sec": 150
 	},
 	"consistent": {
 		"level": "none",
@@ -277,7 +278,8 @@ const (
 			"flush-interval": "1m",
 			"file-size": 1024,
 			"output-column-id":false
-		}
+		},
+		"advance-timeout-in-sec": 150
 	},
 	"consistent": {
 		"level": "none",
@@ -411,7 +413,8 @@ const (
 			"flush-interval": "1m",
 			"file-size": 1024,
 			"output-column-id":false
-		}
+		},
+		"advance-timeout-in-sec": 150
 	},
 	"consistent": {
 		"level": "none",
diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go
index 37b2b58fdad..3a8937cad1f 100644
--- a/pkg/config/replica_config.go
+++ b/pkg/config/replica_config.go
@@ -68,6 +68,7 @@ var defaultReplicaConfig = &ReplicaConfig{
 		OnlyOutputUpdatedColumns:         util.AddressOf(false),
 		DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
 		TiDBSourceID:                     1,
+		AdvanceTimeoutInSec:              util.AddressOf(uint(150)),
 	},
 	Consistent: &ConsistentConfig{
 		Level: "none",
diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go
index 3b381de02ba..77b84466d7b 100644
--- a/pkg/config/replica_config_test.go
+++ b/pkg/config/replica_config_test.go
@@ -66,6 +66,7 @@ func TestReplicaConfigMarshal(t *testing.T) {
 	conf.Sink.OnlyOutputUpdatedColumns = aws.Bool(true)
 	conf.Sink.DeleteOnlyOutputHandleKeyColumns = aws.Bool(true)
 	conf.Sink.SafeMode = aws.Bool(true)
+	conf.Sink.AdvanceTimeoutInSec = util.AddressOf(uint(150))
 	conf.Sink.KafkaConfig = &KafkaConfig{
 		PartitionNum:      aws.Int32(1),
 		ReplicationFactor: aws.Int16(1),
diff --git a/pkg/config/sink.go b/pkg/config/sink.go
index 5fcb266c541..974491d4b57 100644
--- a/pkg/config/sink.go
+++ b/pkg/config/sink.go
@@ -158,6 +158,10 @@ type SinkConfig struct {
 	PulsarConfig       *PulsarConfig       `toml:"pulsar-config" json:"pulsar-config,omitempty"`
 	MySQLConfig        *MySQLConfig        `toml:"mysql-config" json:"mysql-config,omitempty"`
 	CloudStorageConfig *CloudStorageConfig `toml:"cloud-storage-config" json:"cloud-storage-config,omitempty"`
+
+	// AdvanceTimeoutInSec is a duration in seconds. If a table sink's progress hasn't been
+	// advanced for this duration, the sink will be canceled and re-established.
+	AdvanceTimeoutInSec *uint `toml:"advance-timeout-in-sec" json:"advance-timeout-in-sec,omitempty"`
 }
 
 // CSVConfig defines a series of configuration items for csv codec.
@@ -431,6 +435,10 @@ func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error {
 		}
 	}
 
+	if s.AdvanceTimeoutInSec != nil && *s.AdvanceTimeoutInSec == 0 {
+		return cerror.ErrSinkInvalidConfig.GenWithStack("advance-timeout-in-sec should be greater than 0")
+	}
+
 	return nil
 }
 
diff --git a/pkg/orchestrator/reactor_state_test.go b/pkg/orchestrator/reactor_state_test.go
index 65a0a45f79c..a74ce331c74 100644
--- a/pkg/orchestrator/reactor_state_test.go
+++ b/pkg/orchestrator/reactor_state_test.go
@@ -118,7 +118,8 @@ func TestChangefeedStateUpdate(t *testing.T) {
 				Mounter:   &config.MounterConfig{WorkerNum: 16},
 				Scheduler: config.GetDefaultReplicaConfig().Scheduler,
 				Sink: &config.SinkConfig{
-					Terminator: putil.AddressOf(config.CRLF),
+					Terminator:          putil.AddressOf(config.CRLF),
+					AdvanceTimeoutInSec: putil.AddressOf(uint(150)),
 				},
 				Integrity: config.GetDefaultReplicaConfig().Integrity,
 			},
@@ -168,7 +169,8 @@ func TestChangefeedStateUpdate(t *testing.T) {
 				Filter:  &config.FilterConfig{Rules: []string{"*.*"}},
 				Mounter: &config.MounterConfig{WorkerNum: 16},
 				Sink: &config.SinkConfig{
-					Terminator: putil.AddressOf(config.CRLF),
+					Terminator:          putil.AddressOf(config.CRLF),
+					AdvanceTimeoutInSec: putil.AddressOf(uint(150)),
 				},
 				Scheduler: config.GetDefaultReplicaConfig().Scheduler,
 				Integrity: config.GetDefaultReplicaConfig().Integrity,
@@ -224,7 +226,8 @@ func TestChangefeedStateUpdate(t *testing.T) {
 				Filter:  &config.FilterConfig{Rules: []string{"*.*"}},
 				Mounter: &config.MounterConfig{WorkerNum: 16},
 				Sink: &config.SinkConfig{
-					Terminator:          putil.AddressOf(config.CRLF),
+					Terminator:          putil.AddressOf(config.CRLF),
+					AdvanceTimeoutInSec: putil.AddressOf(uint(150)),
 				},
 				Scheduler: config.GetDefaultReplicaConfig().Scheduler,
 				Integrity: config.GetDefaultReplicaConfig().Integrity,
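For changefeeds, the new knob rides along the replica config like any other sink option: the default is 150 seconds, and validateAndAdjust rejects an explicit zero. A hedged sketch of setting and validating it through the public config package — it assumes config.GetDefaultReplicaConfig, util.AddressOf, and ReplicaConfig.ValidateAndAdjust keep their current shapes:

```go
package main

import (
	"fmt"
	"net/url"

	"github.com/pingcap/tiflow/pkg/config"
	"github.com/pingcap/tiflow/pkg/util"
)

func main() {
	// The default replica config already carries AdvanceTimeoutInSec = 150.
	cfg := config.GetDefaultReplicaConfig()
	cfg.Sink.AdvanceTimeoutInSec = util.AddressOf(uint(30))

	sinkURI, _ := url.Parse("mysql://root@127.0.0.1:3306/")
	if err := cfg.ValidateAndAdjust(sinkURI); err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println("advance timeout:", *cfg.Sink.AdvanceTimeoutInSec, "seconds")

	// An explicit zero is rejected by SinkConfig.validateAndAdjust.
	cfg.Sink.AdvanceTimeoutInSec = util.AddressOf(uint(0))
	fmt.Println("zero timeout rejected:", cfg.ValidateAndAdjust(sinkURI) != nil) // true
}
```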
diff --git a/tests/integration_tests/hang_sink_suicide/conf/changefeed.toml b/tests/integration_tests/hang_sink_suicide/conf/changefeed.toml
new file mode 100644
index 00000000000..9161a8511f6
--- /dev/null
+++ b/tests/integration_tests/hang_sink_suicide/conf/changefeed.toml
@@ -0,0 +1,2 @@
+[sink]
+advance-timeout-in-sec = 10
diff --git a/tests/integration_tests/hang_sink_suicide/conf/diff_config.toml b/tests/integration_tests/hang_sink_suicide/conf/diff_config.toml
new file mode 100644
index 00000000000..9c5a2102afb
--- /dev/null
+++ b/tests/integration_tests/hang_sink_suicide/conf/diff_config.toml
@@ -0,0 +1,29 @@
+# diff Configuration.
+
+check-thread-count = 4
+
+export-fix-sql = true
+
+check-struct-only = false
+
+[task]
+    output-dir = "/tmp/tidb_cdc_test/hang_sink_suicide/sync_diff/output"
+
+    source-instances = ["mysql1"]
+
+    target-instance = "tidb0"
+
+    target-check-tables = ["hang_sink_suicide.t?*"]
+
+[data-sources]
+[data-sources.mysql1]
+    host = "127.0.0.1"
+    port = 4000
+    user = "root"
+    password = ""
+
+[data-sources.tidb0]
+    host = "127.0.0.1"
+    port = 3306
+    user = "root"
+    password = ""
diff --git a/tests/integration_tests/hang_sink_suicide/run.sh b/tests/integration_tests/hang_sink_suicide/run.sh
new file mode 100644
index 00000000000..3489df74e05
--- /dev/null
+++ b/tests/integration_tests/hang_sink_suicide/run.sh
@@ -0,0 +1,46 @@
+#!/bin/bash
+
+set -eu
+
+CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+source $CUR/../_utils/test_prepare
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CDC_BINARY=cdc.test
+SINK_TYPE=$1
+
+function run() {
+    # test with mysql sink only
+    if [ "$SINK_TYPE" != "mysql" ]; then
+        return
+    fi
+
+    rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+    start_tidb_cluster --workdir $WORK_DIR
+    cd $WORK_DIR
+
+    pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1"
+    export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkHangLongTime=2*return(true)'
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --pd $pd_addr --logsuffix 1 --addr "127.0.0.1:8300"
+
+    SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1"
+    changefeed_id=test
+    cdc cli changefeed create -c $changefeed_id --config $CUR/conf/changefeed.toml --pd=$pd_addr --sink-uri="$SINK_URI"
+
+    run_sql "CREATE DATABASE hang_sink_suicide;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    run_sql "CREATE table hang_sink_suicide.t1 (id int primary key auto_increment)" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    run_sql "CREATE table hang_sink_suicide.t2 (id int primary key auto_increment)" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    check_table_exists "hang_sink_suicide.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+    check_table_exists "hang_sink_suicide.t2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+
+    run_sql "insert into hang_sink_suicide.t1 values (),(),(),(),()"
+    run_sql "insert into hang_sink_suicide.t2 values (),(),(),(),()"
+
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+    export GO_FAILPOINTS=''
+    cleanup_process $CDC_BINARY
+}
+
+trap stop_tidb_cluster EXIT
+run $*
+check_logs $WORK_DIR
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh
index c8965d87d5c..0a48cf73794 100755
--- a/tests/integration_tests/run_group.sh
+++ b/tests/integration_tests/run_group.sh
@@ -10,7 +10,7 @@ group=$2
 # Other tests that only support mysql: batch_update_to_no_batch ddl_reentrant
 # changefeed_fast_fail changefeed_resume_with_checkpoint_ts sequence
 # multi_cdc_cluster capture_suicide_while_balance_table
-mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint"
+mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint hang_sink_suicide"
 mysql_only_http="http_api http_api_tls api_v2"
 mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table"
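The restart loop in SinkManager.Run and putSinkFactoryError cooperate through a (version, error channel) pair: every rebuilt factory gets a fresh channel and a higher version, and errors reported against an older version are dropped so a stale table sink cannot tear down the new factory. A stripped-down, self-contained model of that hand-off — the versionedErrors type and method names are illustrative, not part of the change:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// versionedErrors models the sink-factory bookkeeping in SinkManager:
// rebuild() stands in for initSinkFactory/clearSinkFactory, and put() for
// putSinkFactoryError, which ignores errors tagged with a stale version.
type versionedErrors struct {
	mu      sync.Mutex
	version uint64
	errs    chan error
}

func (v *versionedErrors) rebuild() uint64 {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.version++
	v.errs = make(chan error, 16)
	return v.version
}

func (v *versionedErrors) put(err error, version uint64) (accepted bool) {
	v.mu.Lock()
	defer v.mu.Unlock()
	if version != v.version {
		return false // reported by a sink attached to an old factory: skip it
	}
	select {
	case v.errs <- err:
		return true
	default:
		return false // channel full; the run loop will observe the failure anyway
	}
}

func main() {
	var v versionedErrors
	old := v.rebuild()
	cur := v.rebuild()
	fmt.Println(v.put(errors.New("table sink stuck"), old)) // false: stale version
	fmt.Println(v.put(errors.New("table sink stuck"), cur)) // true
}
```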