processor: fix a bug that causes processor Tick to get stuck when downstream is Kafka (#11339) (#11389)

close #11340
ti-chi-bot authored Sep 3, 2024
1 parent e951c13 commit ae78f2c
Showing 9 changed files with 2 additions and 213 deletions.
38 changes: 0 additions & 38 deletions cdc/processor/sinkmanager/manager.go
@@ -345,12 +345,6 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
}
}

-func (m *SinkManager) needsStuckCheck() bool {
-	m.sinkFactory.Lock()
-	defer m.sinkFactory.Unlock()
-	return m.sinkFactory.f != nil && m.sinkFactory.f.Category() == factory.CategoryMQ
-}
-
func (m *SinkManager) initSinkFactory() (chan error, uint64) {
	m.sinkFactory.Lock()
	defer m.sinkFactory.Unlock()
@@ -418,19 +412,6 @@ func (m *SinkManager) clearSinkFactory() {
}
}

-func (m *SinkManager) putSinkFactoryError(err error, version uint64) (success bool) {
-	m.sinkFactory.Lock()
-	defer m.sinkFactory.Unlock()
-	if version == m.sinkFactory.version {
-		select {
-		case m.sinkFactory.errors <- err:
-		default:
-		}
-		return true
-	}
-	return false
-}
-
func (m *SinkManager) startSinkWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool) {
	for i := 0; i < sinkWorkerNum; i++ {
		w := newSinkWorker(m.changefeedID, m.sourceManager,
@@ -1042,25 +1023,6 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) TableStats {
	m.sinkMemQuota.Release(tableID, checkpointTs)
	m.redoMemQuota.Release(tableID, checkpointTs)

-	advanceTimeoutInSec := m.changefeedInfo.Config.Sink.AdvanceTimeoutInSec
-	if advanceTimeoutInSec <= 0 {
-		advanceTimeoutInSec = config.DefaultAdvanceTimeoutInSec
-	}
-	stuckCheck := time.Duration(advanceTimeoutInSec) * time.Second
-
-	if m.needsStuckCheck() {
-		isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck)
-		if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) {
-			log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
-				zap.String("namespace", m.changefeedID.Namespace),
-				zap.String("changefeed", m.changefeedID.ID),
-				zap.Int64("tableID", tableID),
-				zap.Any("checkpointTs", checkpointTs),
-				zap.Float64("stuckCheck", stuckCheck.Seconds()),
-				zap.Uint64("factoryVersion", sinkVersion))
-		}
-	}
-
	var resolvedTs model.Ts
	// If redo log is enabled, we have to use redo log's resolved ts to calculate processor's min resolved ts.
	if m.redoDMLMgr != nil {
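Reading the removed hunks together: `GetTableStats` sits on the processor `Tick` path, and it called `needsStuckCheck`, which takes the same `sinkFactory` mutex that `initSinkFactory` holds while establishing the downstream sink. With a Kafka downstream, that connection attempt can block for a long time, so every `Tick` stalled behind it. Below is a minimal, self-contained Go sketch of the hazard, using hypothetical names rather than tiflow code:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// sinkFactory mimics a factory guarded by a single mutex: init holds
// the lock for the entire (possibly slow) connection attempt.
type sinkFactory struct {
	mu sync.Mutex
	f  interface{}
}

func (s *sinkFactory) init(dial time.Duration) {
	s.mu.Lock()
	defer s.mu.Unlock()
	time.Sleep(dial) // stands in for a slow Kafka dial
	s.f = struct{}{}
}

// needsStuckCheck has the shape of the removed helper: a cheap read
// that still needs the same lock, so it waits out the entire init.
func (s *sinkFactory) needsStuckCheck() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.f != nil
}

func main() {
	var s sinkFactory
	go s.init(2 * time.Second)
	time.Sleep(50 * time.Millisecond) // let init acquire the lock first

	start := time.Now()
	_ = s.needsStuckCheck() // the Tick-path caller blocks here
	fmt.Printf("tick-path call blocked for %v\n", time.Since(start).Round(time.Millisecond))
}
```

Removing the stuck check from the stats path removes the contended lock acquisition altogether, which is why the fix deletes the mechanism rather than reworking the locking.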
15 changes: 0 additions & 15 deletions cdc/processor/sinkmanager/manager_test.go
@@ -389,21 +389,6 @@ func TestSinkManagerRunWithErrors(t *testing.T) {
}
}

-func TestSinkManagerNeedsStuckCheck(t *testing.T) {
-	t.Parallel()
-
-	ctx, cancel := context.WithCancel(context.Background())
-	errCh := make(chan error, 16)
-	changefeedInfo := getChangefeedInfo()
-	manager, _ := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, errCh)
-	defer func() {
-		cancel()
-		manager.Close()
-	}()
-
-	require.False(t, manager.needsStuckCheck())
-}
-
func TestSinkManagerRestartTableSinks(t *testing.T) {
	failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause", "return")
	defer failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause")
19 changes: 0 additions & 19 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -452,25 +452,6 @@ func (t *tableSinkWrapper) cleanRangeEventCounts(upperBound engine.Position, min
	return shouldClean
}

-func (t *tableSinkWrapper) sinkMaybeStuck(stuckCheck time.Duration) (bool, uint64) {
-	t.getCheckpointTs()
-
-	t.tableSink.RLock()
-	defer t.tableSink.RUnlock()
-	t.tableSink.innerMu.Lock()
-	defer t.tableSink.innerMu.Unlock()
-
-	// What these conditions mean:
-	// 1. the table sink has been associated with a valid sink;
-	// 2. its checkpoint hasn't been advanced for a while;
-	version := t.tableSink.version
-	advanced := t.tableSink.advanced
-	if version > 0 && time.Since(advanced) > stuckCheck {
-		return true, version
-	}
-	return false, uint64(0)
-}
-
// handleRowChangedEvents uses to convert RowChangedEvents to TableSinkRowChangedEvents.
// It will deal with the old value compatibility.
func handleRowChangedEvents(
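For context on the deleted wrapper method above: `sinkMaybeStuck` reported a stuck sink only when both conditions held, namely that a concrete sink was attached (`version > 0`) and that the checkpoint had not advanced within the threshold. A condensed, hypothetical restatement of that check (field and type names are illustrative):

```go
package sketch

import (
	"sync"
	"time"
)

// tableSinkState is a stand-in for the fields the removed
// sinkMaybeStuck consulted on tableSinkWrapper.
type tableSinkState struct {
	mu       sync.RWMutex
	version  uint64    // 0 until a concrete sink is attached
	advanced time.Time // refreshed whenever the checkpoint moves
}

// maybeStuck reports whether the sink looks wedged: a sink must exist
// (version > 0) and its checkpoint must have been stale for longer
// than the threshold. The version is returned so the caller can file
// an error against the exact sink incarnation it observed.
func (s *tableSinkState) maybeStuck(threshold time.Duration) (bool, uint64) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if s.version > 0 && time.Since(s.advanced) > threshold {
		return true, s.version
	}
	return false, 0
}
```

The returned version fed the version guard in `putSinkFactoryError` (see the manager.go hunk above), so a stuck report raised against a sink that had already been replaced was silently discarded.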
62 changes: 0 additions & 62 deletions cdc/processor/sinkmanager/table_sink_wrapper_test.go
@@ -18,7 +18,6 @@ import (
	"math"
	"sync"
	"testing"
-	"time"

	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/cdc/processor/tablepb"
@@ -28,7 +27,6 @@ import (
	"github.com/pingcap/tiflow/pkg/sink"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/stretchr/testify/require"
-	"github.com/tikv/client-go/v2/oracle"
)

type mockSink struct {
@@ -384,63 +382,3 @@ func TestTableSinkWrapperSinkVersion(t *testing.T) {
	require.Nil(t, wrapper.tableSink.s)
	require.Equal(t, wrapper.tableSink.version, uint64(0))
}
-
-func TestTableSinkWrapperSinkInner(t *testing.T) {
-	t.Parallel()
-
-	innerTableSink := tablesink.New[*model.RowChangedEvent](
-		model.ChangeFeedID{}, 1, model.Ts(0),
-		newMockSink(), &eventsink.RowChangeEventAppender{},
-		pdutil.NewClock4Test(),
-		prometheus.NewCounter(prometheus.CounterOpts{}),
-		prometheus.NewHistogram(prometheus.HistogramOpts{}),
-	)
-	version := new(uint64)
-
-	wrapper := newTableSinkWrapper(
-		model.DefaultChangeFeedID("1"),
-		1,
-		func() (tablesink.TableSink, uint64) {
-			*version += 1
-			return innerTableSink, *version
-		},
-		tablepb.TableStatePrepared,
-		oracle.GoTimeToTS(time.Now()),
-		oracle.GoTimeToTS(time.Now().Add(10000*time.Second)),
-		func(_ context.Context) (model.Ts, error) { return math.MaxUint64, nil },
-	)
-
-	require.True(t, wrapper.initTableSink())
-
-	wrapper.closeAndClearTableSink()
-
-	// Shouldn't be stuck because version is 0.
-	require.Equal(t, wrapper.tableSink.version, uint64(0))
-	isStuck, _ := wrapper.sinkMaybeStuck(100 * time.Millisecond)
-	require.False(t, isStuck)
-
-	// Shouldn't be stuck because tableSink.advanced is just updated.
-	require.True(t, wrapper.initTableSink())
-	isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond)
-	require.False(t, isStuck)
-
-	// Shouldn't be stuck because upperbound hasn't been advanced.
-	time.Sleep(200 * time.Millisecond)
-	isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond)
-	require.False(t, isStuck)
-
-	// Shouldn't be stuck because `getCheckpointTs` will update tableSink.advanced.
-	nowTs := oracle.GoTimeToTS(time.Now())
-	wrapper.updateReceivedSorterResolvedTs(nowTs)
-	wrapper.barrierTs.Store(nowTs)
-	isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond)
-	require.False(t, isStuck)
-
-	time.Sleep(200 * time.Millisecond)
-	nowTs = oracle.GoTimeToTS(time.Now())
-	wrapper.updateReceivedSorterResolvedTs(nowTs)
-	wrapper.barrierTs.Store(nowTs)
-	wrapper.updateResolvedTs(model.NewResolvedTs(nowTs))
-	isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond)
-	require.True(t, isStuck)
-}
1 change: 1 addition & 0 deletions pkg/config/sink.go
@@ -142,6 +142,7 @@ type SinkConfig struct {

	// AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been
	// advanced for this given duration, the sink will be canceled and re-established.
+	// Deprecated since v6.5.11, it takes no effect now.
	AdvanceTimeoutInSec uint `toml:"advance-timeout-in-sec" json:"advance-timeout-in-sec,omitempty"`
}

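For reference, this option is set in the changefeed sink configuration. A hypothetical TOML fragment: the key name comes from the struct tag above, while the `[sink]` table and the value are illustrative.

```toml
[sink]
# Deprecated since v6.5.11: the stuck check that consumed this value
# was removed, so the setting no longer has any effect.
advance-timeout-in-sec = 150
```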

This file was deleted.

29 changes: 0 additions & 29 deletions tests/integration_tests/hang_sink_suicide/conf/diff_config.toml

This file was deleted.

47 changes: 0 additions & 47 deletions tests/integration_tests/hang_sink_suicide/run.sh

This file was deleted.

2 changes: 1 addition & 1 deletion tests/integration_tests/run_group.sh
@@ -10,7 +10,7 @@ group=$2
# Other tests that only support mysql: batch_update_to_no_batch ddl_reentrant
# changefeed_fast_fail changefeed_resume_with_checkpoint_ts sequence
# multi_cdc_cluster capture_suicide_while_balance_table
-mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint hang_sink_suicide server_config_compatibility changefeed_dup_error_restart"
+mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint server_config_compatibility changefeed_dup_error_restart"
mysql_only_http="http_api http_api_tls api_v2"
mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table"

