
Commit

puller(ticdc): fix wrong update splitting behavior after table schedu…
ti-chi-bot authored Jun 11, 2024
1 parent d9f6759 commit f203805
Showing 11 changed files with 79 additions and 92 deletions.
18 changes: 18 additions & 0 deletions cdc/model/kv.go
@@ -16,6 +16,7 @@
package model

import (
"errors"
"fmt"

"github.com/pingcap/tiflow/cdc/processor/tablepb"
@@ -110,3 +111,20 @@ func (v *RawKVEntry) String() string {
func (v *RawKVEntry) ApproximateDataSize() int64 {
return int64(len(v.Key) + len(v.Value) + len(v.OldValue))
}

// ShouldSplitKVEntry checks whether the raw kv entry should be split.
type ShouldSplitKVEntry func(raw *RawKVEntry) bool

// SplitUpdateKVEntry splits the raw kv entry into a delete entry and an insert entry.
func SplitUpdateKVEntry(raw *RawKVEntry) (*RawKVEntry, *RawKVEntry, error) {
if raw == nil {
return nil, nil, errors.New("nil event cannot be split")
}
deleteKVEntry := *raw
deleteKVEntry.Value = nil

insertKVEntry := *raw
insertKVEntry.OldValue = nil

return &deleteKVEntry, &insertKVEntry, nil
}
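
As a usage illustration (not part of this commit), the sketch below shows how the two new helpers might be wired together by a caller. The predicate, the getReplicaTs getter, isUpdateEvent, and emitEntries are hypothetical names; the commit's actual puller-side logic lives in files not shown here. The assumption it encodes is the one this change enables: split only update events whose commit ts is not newer than the table's current replicate ts.

package example

import (
	"github.com/pingcap/tiflow/cdc/model"
)

// isUpdateEvent is a hypothetical helper: an update is a put that carries an old value.
func isUpdateEvent(raw *model.RawKVEntry) bool {
	return raw != nil && raw.OpType == model.OpTypePut && len(raw.OldValue) > 0
}

// emitEntries is a hypothetical caller that splits qualifying updates before emitting them.
func emitEntries(
	getReplicaTs func() model.Ts,
	raw *model.RawKVEntry,
	emit func(*model.RawKVEntry) error,
) error {
	shouldSplit := model.ShouldSplitKVEntry(func(e *model.RawKVEntry) bool {
		// Assumption: only updates committed before the table's current replicate ts
		// may already exist downstream and therefore need the delete+insert form.
		return isUpdateEvent(e) && model.Ts(e.CRTs) < getReplicaTs()
	})
	if !shouldSplit(raw) {
		return emit(raw)
	}
	deleteEntry, insertEntry, err := model.SplitUpdateKVEntry(raw)
	if err != nil {
		return err
	}
	if err := emit(deleteEntry); err != nil {
		return err
	}
	return emit(insertEntry)
}

Passing a getter (such as tableSinkWrapper.GetReplicaTs, added further down in this diff) rather than a fixed ts is what keeps the decision correct after the table is rescheduled.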
32 changes: 15 additions & 17 deletions cdc/processor/processor.go
@@ -192,13 +192,13 @@ func (p *processor) AddTableSpan(
zap.Bool("isPrepare", isPrepare))
}

p.sinkManager.r.AddTable(
table := p.sinkManager.r.AddTable(
span, startTs, p.changefeed.Info.TargetTs)
if p.redo.r.Enabled() {
p.redo.r.AddTable(span, startTs)
}
p.sourceManager.r.AddTable(span, p.getTableName(ctx, span.TableID), startTs)

p.sourceManager.r.AddTable(span, p.getTableName(ctx, span.TableID), startTs, table.GetReplicaTs)
return true, nil
}

@@ -447,18 +447,6 @@ func isProcessorIgnorableError(err error) bool {
return false
}

// needPullerSafeModeAtStart returns true if the scheme is mysql compatible.
// pullerSafeMode means to split all update kv entries whose commitTS
// is older than the start time of this changefeed.
func needPullerSafeModeAtStart(sinkURIStr string) (bool, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := sink.GetScheme(sinkURI)
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// Tick implements the `orchestrator.State` interface
// the `state` parameter is sent by the etcd worker, the `state` must be a snapshot of KVs in etcd
// The main logic of processor is in this function, including the calculation of many kinds of ts,
@@ -642,6 +630,16 @@ func (p *processor) createTaskPosition() (skipThisTick bool) {
return true
}

// isMysqlCompatibleBackend returns true if the sinkURIStr is mysql compatible.
func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := sink.GetScheme(sinkURI)
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick.
func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {
if p.initialized {
@@ -698,20 +696,20 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {
return errors.Trace(err)
}

pullerSafeModeAtStart, err := needPullerSafeModeAtStart(p.changefeed.Info.SinkURI)
isMysqlBackend, err := isMysqlCompatibleBackend(p.changefeed.Info.SinkURI)
if err != nil {
return errors.Trace(err)
}
p.sourceManager.r = sourcemanager.New(
p.changefeedID, p.upstream, p.mg.r,
sortEngine, p.changefeed.Info.Config.BDRMode,
pullerSafeModeAtStart)
isMysqlBackend)
p.sourceManager.name = "SourceManager"
p.sourceManager.spawn(stdCtx)

p.sinkManager.r = sinkmanager.New(
p.changefeedID, p.changefeed.Info, p.upstream,
p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r)
p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r, isMysqlBackend)
p.sinkManager.name = "SinkManager"
p.sinkManager.spawn(stdCtx)

15 changes: 12 additions & 3 deletions cdc/processor/sinkmanager/manager.go
@@ -128,6 +128,9 @@ type SinkManager struct {
// wg is used to wait for all workers to exit.
wg sync.WaitGroup

// isMysqlBackend indicates whether the backend is MySQL compatible.
isMysqlBackend bool

// Metric for table sink.
metricsTableSinkTotalRows prometheus.Counter

@@ -142,6 +145,7 @@ func New(
schemaStorage entry.SchemaStorage,
redoDMLMgr redo.DMLManager,
sourceManager *sourcemanager.SourceManager,
isMysqlBackend bool,
) *SinkManager {
m := &SinkManager{
changefeedID: changefeedID,
@@ -155,7 +159,7 @@
sinkTaskChan: make(chan *sinkTask),
sinkWorkerAvailable: make(chan struct{}, 1),
sinkRetry: retry.NewInfiniteErrorRetry(),

isMysqlBackend: isMysqlBackend,
metricsTableSinkTotalRows: tablesinkmetrics.TotalRowsCountCounter.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),

@@ -302,6 +306,11 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
if cerror.IsDupEntryError(err) {
return errors.Trace(err)
}

if m.isMysqlBackend {
// For MySQL backend, we should restart the sink. Let the owner handle the error.
return errors.Trace(err)
}
}

// If the error is retryable, we should retry to re-establish the internal resources.
@@ -836,7 +845,7 @@ func (m *SinkManager) UpdateBarrierTs(globalBarrierTs model.Ts, tableBarrier map
}

// AddTable adds a table(TableSink) to the sink manager.
func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs model.Ts) {
func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs model.Ts) *tableSinkWrapper {
sinkWrapper := newTableSinkWrapper(
m.changefeedID,
span,
@@ -864,7 +873,6 @@ func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs mod
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span))
return
}
m.sinkMemQuota.AddTable(span)
m.redoMemQuota.AddTable(span)
@@ -874,6 +882,7 @@ func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs mod
zap.Stringer("span", &span),
zap.Uint64("startTs", startTs),
zap.Uint64("version", sinkWrapper.version))
return sinkWrapper
}

// StartTable sets the table(TableSink) state to replicating.
4 changes: 2 additions & 2 deletions cdc/processor/sinkmanager/manager_test.go
@@ -129,7 +129,7 @@ func TestAddTable(t *testing.T) {
require.Equal(t, 0, manager.sinkProgressHeap.len(), "Not started table should not be in progress heap")
err := manager.StartTable(span, 1)
require.NoError(t, err)
require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs)
require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs.Load())

progress := manager.sinkProgressHeap.pop()
require.Equal(t, span, progress.span)
@@ -356,7 +356,7 @@ func TestSinkManagerRunWithErrors(t *testing.T) {

span := spanz.TableIDToComparableSpan(1)

source.AddTable(span, "test", 100)
source.AddTable(span, "test", 100, func() model.Ts { return 0 })
manager.AddTable(span, 100, math.MaxUint64)
manager.StartTable(span, 100)
source.Add(span, model.NewResolvedPolymorphicEvent(0, 101))
4 changes: 2 additions & 2 deletions cdc/processor/sinkmanager/manager_test_helper.go
@@ -70,7 +70,7 @@ func CreateManagerWithMemEngine(
go func() { handleError(sourceManager.Run(ctx)) }()
sourceManager.WaitForReady(ctx)

sinkManager := New(changefeedID, changefeedInfo, up, schemaStorage, nil, sourceManager)
sinkManager := New(changefeedID, changefeedInfo, up, schemaStorage, nil, sourceManager, false)
go func() { handleError(sinkManager.Run(ctx)) }()
sinkManager.WaitForReady(ctx)

@@ -90,6 +90,6 @@ func NewManagerWithMemEngine(
mg := &entry.MockMountGroup{}
schemaStorage := &entry.MockSchemaStorage{Resolved: math.MaxUint64}
sourceManager := sourcemanager.NewForTest(changefeedID, up, mg, sortEngine, false)
sinkManager := New(changefeedID, changefeedInfo, up, schemaStorage, redoMgr, sourceManager)
sinkManager := New(changefeedID, changefeedInfo, up, schemaStorage, redoMgr, sourceManager, false)
return sinkManager, sourceManager, sortEngine
}
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/redo_log_worker.go
@@ -141,7 +141,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
// NOTICE: The event can be filtered by the event filter.
if e.Row != nil {
// For all events, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
e.Row.ReplicatingTs = task.tableSink.replicateTs.Load()
x, size = handleRowChangedEvents(w.changefeedID, task.span, e)
advancer.appendEvents(x, size)
}
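
The comment above notes that the replicate ts is attached to every row so the MySQL sink can determine safe mode. As a rough, non-authoritative sketch of that idea (the real check lives in the MySQL sink code, which is not part of this diff, and its exact comparison may differ):

package example

// rowChange is a stripped-down stand-in for the real row event type.
type rowChange struct {
	CommitTs      uint64
	ReplicatingTs uint64
}

// needsSafeMode reports whether a row might already exist downstream and should
// therefore be written idempotently (for example REPLACE INTO, or delete+insert)
// rather than with a plain INSERT. Assumption: rows committed before the table
// started (or restarted) replicating may have been written by a previous run.
func needsSafeMode(r rowChange) bool {
	return r.CommitTs < r.ReplicatingTs
}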
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_worker.go
@@ -237,7 +237,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
// NOTICE: The event can be filtered by the event filter.
if e.Row != nil {
// For all rows, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
e.Row.ReplicatingTs = task.tableSink.GetReplicaTs()
x, size := handleRowChangedEvents(w.changefeedID, task.span, e)
advancer.appendEvents(x, size)
allEventSize += size
25 changes: 18 additions & 7 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -15,6 +15,7 @@ package sinkmanager

import (
"context"
"math"
"sort"
"sync"
"sync/atomic"
@@ -77,7 +78,7 @@ type tableSinkWrapper struct {
receivedSorterResolvedTs atomic.Uint64

// replicateTs is the ts that the table sink has started to replicate.
replicateTs model.Ts
replicateTs atomic.Uint64
genReplicateTs func(ctx context.Context) (model.Ts, error)

// lastCleanTime indicates the last time the table has been cleaned.
@@ -90,6 +91,11 @@ type tableSinkWrapper struct {
rangeEventCountsMu sync.Mutex
}

// GetReplicaTs returns the replicate ts of the table sink.
func (t *tableSinkWrapper) GetReplicaTs() model.Ts {
return t.replicateTs.Load()
}

type rangeEventCount struct {
// firstPos and lastPos are used to merge many rangeEventCount into one.
firstPos engine.Position
@@ -132,31 +138,34 @@ func newTableSinkWrapper(

res.receivedSorterResolvedTs.Store(startTs)
res.barrierTs.Store(startTs)
res.replicateTs.Store(math.MaxUint64)
return res
}

func (t *tableSinkWrapper) start(ctx context.Context, startTs model.Ts) (err error) {
if t.replicateTs != 0 {
if t.replicateTs.Load() != math.MaxUint64 {
log.Panic("The table sink has already started",
zap.String("namespace", t.changefeed.Namespace),
zap.String("changefeed", t.changefeed.ID),
zap.Stringer("span", &t.span),
zap.Uint64("startTs", startTs),
zap.Uint64("oldReplicateTs", t.replicateTs),
zap.Uint64("oldReplicateTs", t.replicateTs.Load()),
)
}

// FIXME(qupeng): it can be re-fetched later instead of failing.
if t.replicateTs, err = t.genReplicateTs(ctx); err != nil {
ts, err := t.genReplicateTs(ctx)
if err != nil {
return errors.Trace(err)
}
t.replicateTs.Store(ts)

log.Info("Sink is started",
zap.String("namespace", t.changefeed.Namespace),
zap.String("changefeed", t.changefeed.ID),
zap.Stringer("span", &t.span),
zap.Uint64("startTs", startTs),
zap.Uint64("replicateTs", t.replicateTs),
zap.Uint64("replicateTs", ts),
)

// This start ts maybe greater than the initial start ts of the table sink.
@@ -379,14 +388,16 @@ func (t *tableSinkWrapper) checkTableSinkHealth() (err error) {
// committed at downstream but we don't know. So we need to update `replicateTs`
// of the table so that we can re-send those events later.
func (t *tableSinkWrapper) restart(ctx context.Context) (err error) {
if t.replicateTs, err = t.genReplicateTs(ctx); err != nil {
ts, err := t.genReplicateTs(ctx)
if err != nil {
return errors.Trace(err)
}
t.replicateTs.Store(ts)
log.Info("Sink is restarted",
zap.String("namespace", t.changefeed.Namespace),
zap.String("changefeed", t.changefeed.ID),
zap.Stringer("span", &t.span),
zap.Uint64("replicateTs", t.replicateTs))
zap.Uint64("replicateTs", ts))
return nil
}
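
Taken together, restart() bumping replicateTs and the GetReplicaTs getter added earlier in this file explain why the processor now passes table.GetReplicaTs to the source manager instead of a plain value: a getter always reflects the latest stored ts, while a copied value goes stale as soon as the table restarts. A self-contained toy illustration follows; the names are invented for the sketch and are not from this repository.

package main

import (
	"fmt"
	"sync/atomic"
)

type fakeTableSink struct {
	replicateTs atomic.Uint64
}

func (t *fakeTableSink) GetReplicaTs() uint64 { return t.replicateTs.Load() }

func main() {
	sink := &fakeTableSink{}
	sink.replicateTs.Store(100) // ts recorded when the table sink first started

	getTs := sink.GetReplicaTs      // what the source side keeps in the new code
	snapshot := sink.GetReplicaTs() // a one-time copy, like the old field-based wiring

	sink.replicateTs.Store(200) // e.g. the table sink restarted after an error

	fmt.Println(snapshot) // 100: stale, can lead to wrong update-splitting decisions
	fmt.Println(getTs())  // 200: always the current value
}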

