
Commit

config(ticdc): deprecate changefeed level config enable-old-value (#9224)

close #9667
3AceShowHand authored Sep 5, 2023
1 parent 84f75e9 commit e28b004
Showing 60 changed files with 174 additions and 974 deletions.
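
In short, enable-old-value stops being a per-changefeed switch: old value data is always handled internally, and the field disappears from both config.ReplicaConfig and the v2 open-API config. A minimal caller-side sketch follows; the import path and the fields used are inferred from the files touched in this diff, not asserted by the commit itself.

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/pkg/config" // assumed import path for the ReplicaConfig edited in this diff
)

func main() {
	cfg := config.GetDefaultReplicaConfig()

	// Before this commit a changefeed could opt out per changefeed:
	//   cfg.EnableOldValue = false
	// After this commit the field no longer exists; whether old values are
	// emitted is decided by the sink/protocol, not by a changefeed-level flag.
	cfg.ForceReplicate = false
	cfg.CheckGCSafePoint = true
	fmt.Println(cfg.CaseSensitive, cfg.MemoryQuota)
}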
5 changes: 2 additions & 3 deletions cdc/api/v1/validator.go
@@ -120,14 +120,13 @@ func verifyCreateChangefeedConfig(
if err != nil {
return nil, err
}
// set sortEngine and EnableOldValue
// set sortEngine
cdcClusterVer, err := version.GetTiCDCClusterVersion(model.ListVersionsFromCaptureInfos(captureInfos))
if err != nil {
return nil, err
}
sortEngine := model.SortUnified
if !cdcClusterVer.ShouldEnableOldValueByDefault() {
replicaConfig.EnableOldValue = false
if !cdcClusterVer.LessThan500RC() {
log.Warn("The TiCDC cluster is built from unknown branch or less than 5.0.0-rc, the old-value are disabled by default.")
if !cdcClusterVer.ShouldEnableUnifiedSorterByDefault() {
sortEngine = model.SortInMemory
16 changes: 0 additions & 16 deletions cdc/api/v2/api_helpers_test.go
@@ -48,7 +48,6 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) {
})
cfg.ReplicaConfig = GetDefaultReplicaConfig()
cfg.ReplicaConfig.ForceReplicate = true
cfg.ReplicaConfig.EnableOldValue = false
cfg.SinkURI = "mysql://"
ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil)
// disable old value but force replicate, and using mysql sink.
@@ -90,7 +89,6 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) {
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage)
require.NotNil(t, err)
cfg.TargetTs = 6
cfg.ReplicaConfig.EnableOldValue = false
cfg.SinkURI = "aaab://"
ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil)
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage)
@@ -103,12 +101,10 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) {
cfg.StartTs = 0
// use blackhole to workaround
cfg.SinkURI = "blackhole://127.0.0.1:9092/test?protocol=avro"
cfg.ReplicaConfig.EnableOldValue = true
cfg.ReplicaConfig.ForceReplicate = false
ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil)
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage)
require.NoError(t, err)
require.False(t, cfInfo.Config.EnableOldValue)

cfg.ReplicaConfig.ForceReplicate = true
ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil)
@@ -165,16 +161,4 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
cfg.TargetTs = 9
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.NotNil(t, err)

cfg.StartTs = 0
cfg.TargetTs = 0
cfg.ReplicaConfig.EnableOldValue = true
cfg.SinkURI = "blackhole://127.0.0.1:9092/test?protocol=avro"
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.NoError(t, err)
require.False(t, newCfInfo.Config.EnableOldValue)

cfg.ReplicaConfig.ForceReplicate = true
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.Error(t, cerror.ErrOldValueNotEnabled, err)
}
3 changes: 0 additions & 3 deletions cdc/api/v2/model.go
@@ -177,7 +177,6 @@ func (d *JSONDuration) UnmarshalJSON(b []byte) error {
type ReplicaConfig struct {
MemoryQuota uint64 `json:"memory_quota"`
CaseSensitive bool `json:"case_sensitive"`
EnableOldValue bool `json:"enable_old_value"`
ForceReplicate bool `json:"force_replicate"`
IgnoreIneligibleTable bool `json:"ignore_ineligible_table"`
CheckGCSafePoint bool `json:"check_gc_safe_point"`
@@ -206,7 +205,6 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
) *config.ReplicaConfig {
res.MemoryQuota = c.MemoryQuota
res.CaseSensitive = c.CaseSensitive
res.EnableOldValue = c.EnableOldValue
res.ForceReplicate = c.ForceReplicate
res.CheckGCSafePoint = c.CheckGCSafePoint
res.EnableSyncPoint = c.EnableSyncPoint
@@ -487,7 +485,6 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
res := &ReplicaConfig{
MemoryQuota: cloned.MemoryQuota,
CaseSensitive: cloned.CaseSensitive,
EnableOldValue: cloned.EnableOldValue,
ForceReplicate: cloned.ForceReplicate,
IgnoreIneligibleTable: cloned.IgnoreIneligibleTable,
CheckGCSafePoint: cloned.CheckGCSafePoint,
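
For illustration, a self-contained sketch of the v2 open-API payload once the field is gone. The struct below only mirrors the JSON tags visible in the hunk above (it is not the real v2.ReplicaConfig type), so no "enable_old_value" key is produced.

package main

import (
	"encoding/json"
	"fmt"
)

// replicaConfig mirrors a subset of the v2 API fields shown above, for illustration only.
type replicaConfig struct {
	MemoryQuota           uint64 `json:"memory_quota"`
	CaseSensitive         bool   `json:"case_sensitive"`
	ForceReplicate        bool   `json:"force_replicate"`
	IgnoreIneligibleTable bool   `json:"ignore_ineligible_table"`
	CheckGCSafePoint      bool   `json:"check_gc_safe_point"`
}

func main() {
	out, err := json.MarshalIndent(replicaConfig{
		MemoryQuota:      1073741824,
		CaseSensitive:    true,
		CheckGCSafePoint: true,
	}, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // note: no "enable_old_value" key anymore
}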
2 changes: 0 additions & 2 deletions cdc/api/v2/model_test.go
@@ -32,7 +32,6 @@ import (
var defaultAPIConfig = &ReplicaConfig{
MemoryQuota: config.DefaultChangefeedMemoryQuota,
CaseSensitive: true,
EnableOldValue: true,
CheckGCSafePoint: true,
BDRMode: util.AddressOf(false),
EnableSyncPoint: util.AddressOf(false),
@@ -95,7 +94,6 @@ func TestDefaultReplicaConfig(t *testing.T) {

func TestToAPIReplicaConfig(t *testing.T) {
cfg := config.GetDefaultReplicaConfig()
cfg.EnableOldValue = false
cfg.CheckGCSafePoint = false
cfg.Sink = &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
5 changes: 0 additions & 5 deletions cdc/entry/mounter.go
@@ -496,11 +496,6 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
preRawCols []types.Datum
preChecksum uint32
)
// Since we now always use old value internally,
// we need to control the output(sink will use the PreColumns field to determine whether to output old value).
// Normally old value is output when only enableOldValue is on,
// but for the Delete event, when the old value feature is off,
// the HandleKey column needs to be included as well. So we need to do the following filtering.
if row.PreRowExist {
// FIXME(leoppro): using pre table info to mounter pre column datum
// the pre column and current column in one event may using different table info
21 changes: 0 additions & 21 deletions cdc/model/changefeed.go
@@ -18,7 +18,6 @@ import (
"math"
"net/url"
"regexp"
"strings"
"time"

"github.com/pingcap/errors"
@@ -431,14 +430,6 @@ func (info *ChangeFeedInfo) FixIncompatible() {
inheritV66 := creatorVersionGate.ChangefeedInheritSchedulerConfigFromV66()
info.fixScheduler(inheritV66)
log.Info("Fix incompatible scheduler completed", zap.String("changefeed", info.String()))

if creatorVersionGate.ChangefeedAdjustEnableOldValueByProtocol() {
log.Info("Start fixing incompatible enable old value", zap.String("changefeed", info.String()),
zap.Bool("enableOldValue", info.Config.EnableOldValue))
info.fixEnableOldValue()
log.Info("Fix incompatible enable old value completed", zap.String("changefeed", info.String()),
zap.Bool("enableOldValue", info.Config.EnableOldValue))
}
}

// fixState attempts to fix state loss from upgrading the old owner to the new owner.
@@ -509,18 +500,6 @@ func (info *ChangeFeedInfo) fixMySQLSinkProtocol() {
}
}

func (info *ChangeFeedInfo) fixEnableOldValue() {
uri, err := url.Parse(info.SinkURI)
if err != nil {
// this is impossible to happen, since the changefeed registered successfully.
log.Warn("parse sink URI failed", zap.Error(err))
return
}
scheme := strings.ToLower(uri.Scheme)
protocol := uri.Query().Get(config.ProtocolKey)
info.Config.AdjustEnableOldValue(scheme, protocol)
}

func (info *ChangeFeedInfo) fixMQSinkProtocol() {
uri, err := url.Parse(info.SinkURI)
if err != nil {
5 changes: 0 additions & 5 deletions cdc/model/changefeed_test.go
@@ -248,7 +248,6 @@ func TestVerifyAndComplete(t *testing.T) {
Config: &config.ReplicaConfig{
MemoryQuota: 1073741824,
CaseSensitive: true,
EnableOldValue: true,
CheckGCSafePoint: true,
SyncPointInterval: util.AddressOf(time.Minute * 10),
SyncPointRetention: util.AddressOf(time.Hour * 24),
@@ -923,7 +922,6 @@ func TestChangeFeedInfoClone(t *testing.T) {
StartTs: 417257993615179777,
Config: &config.ReplicaConfig{
CaseSensitive: true,
EnableOldValue: true,
CheckGCSafePoint: true,
},
}
@@ -932,11 +930,8 @@ func TestChangeFeedInfoClone(t *testing.T) {
require.Nil(t, err)
sinkURI := "mysql://unix:/var/run/tidb.sock"
cloned.SinkURI = sinkURI
cloned.Config.EnableOldValue = false
require.Equal(t, sinkURI, cloned.SinkURI)
require.False(t, cloned.Config.EnableOldValue)
require.Equal(t, "blackhole://", info.SinkURI)
require.True(t, info.Config.EnableOldValue)
}

func TestChangefeedInfoStringer(t *testing.T) {
1 change: 0 additions & 1 deletion cdc/owner/feed_state_manager_test.go
@@ -426,7 +426,6 @@ func TestChangefeedStatusNotExist(t *testing.T) {
"sort-engine": "unified",
"config": {
"case-sensitive": true,
"enable-old-value": true,
"force-replicate": false,
"check-gc-safe-point": true,
"filter": {
1 change: 0 additions & 1 deletion cdc/processor/processor_test.go
@@ -120,7 +120,6 @@ func initProcessor4Test(
"sort-dir": ".",
"config": {
"case-sensitive": true,
"enable-old-value": false,
"force-replicate": false,
"check-gc-safe-point": true,
"filter": {
15 changes: 7 additions & 8 deletions cdc/processor/sinkmanager/manager.go
@@ -194,7 +194,6 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
}()

splitTxn := util.GetOrZero(m.changefeedInfo.Config.Sink.TxnAtomicity).ShouldSplitTxn()
enableOldValue := m.changefeedInfo.Config.EnableOldValue

gcErrors := make(chan error, 16)
sinkErrors := make(chan error, 16)
@@ -204,7 +203,7 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
if m.sinkEg == nil {
var sinkCtx context.Context
m.sinkEg, sinkCtx = errgroup.WithContext(m.managerCtx)
m.startSinkWorkers(sinkCtx, m.sinkEg, splitTxn, enableOldValue)
m.startSinkWorkers(sinkCtx, m.sinkEg, splitTxn)
m.sinkEg.Go(func() error { return m.generateSinkTasks(sinkCtx) })
m.wg.Add(1)
go func() {
@@ -224,7 +223,7 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
if m.redoDMLMgr != nil && m.redoEg == nil {
var redoCtx context.Context
m.redoEg, redoCtx = errgroup.WithContext(m.managerCtx)
m.startRedoWorkers(redoCtx, m.redoEg, enableOldValue)
m.startRedoWorkers(redoCtx, m.redoEg)
m.redoEg.Go(func() error { return m.generateRedoTasks(redoCtx) })
m.wg.Add(1)
go func() {
@@ -390,20 +389,20 @@ func (m *SinkManager) putSinkFactoryError(err error, version uint64) {
zap.String("error", err.Error()))
}

func (m *SinkManager) startSinkWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool, enableOldValue bool) {
func (m *SinkManager) startSinkWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool) {
for i := 0; i < sinkWorkerNum; i++ {
w := newSinkWorker(m.changefeedID, m.sourceManager,
m.sinkMemQuota, m.redoMemQuota,
m.eventCache, splitTxn, enableOldValue)
m.eventCache, splitTxn)
m.sinkWorkers = append(m.sinkWorkers, w)
eg.Go(func() error { return w.handleTasks(ctx, m.sinkTaskChan) })
}
}

func (m *SinkManager) startRedoWorkers(ctx context.Context, eg *errgroup.Group, enableOldValue bool) {
func (m *SinkManager) startRedoWorkers(ctx context.Context, eg *errgroup.Group) {
for i := 0; i < redoWorkerNum; i++ {
w := newRedoWorker(m.changefeedID, m.sourceManager, m.redoMemQuota,
m.redoDMLMgr, m.eventCache, enableOldValue)
m.redoDMLMgr, m.eventCache)
m.redoWorkers = append(m.redoWorkers, w)
eg.Go(func() error { return w.handleTasks(ctx, m.redoTaskChan) })
}
@@ -419,7 +418,7 @@ func (m *SinkManager) backgroundGC(errors chan<- error) {
for {
select {
case <-m.managerCtx.Done():
log.Info("Background GC is stoped because context is canceled",
log.Info("Background GC is stopped because context is canceled",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID))
return
5 changes: 1 addition & 4 deletions cdc/processor/sinkmanager/redo_log_worker.go
@@ -33,7 +33,6 @@ type redoWorker struct {
memQuota *memquota.MemQuota
redoDMLManager redo.DMLManager
eventCache *redoEventCache
enableOldValue bool
}

func newRedoWorker(
@@ -42,15 +41,13 @@ func newRedoWorker(
quota *memquota.MemQuota,
redoDMLMgr redo.DMLManager,
eventCache *redoEventCache,
enableOldValue bool,
) *redoWorker {
return &redoWorker{
changefeedID: changefeedID,
sourceManager: sourceManager,
memQuota: quota,
redoDMLManager: redoDMLMgr,
eventCache: eventCache,
enableOldValue: enableOldValue,
}
}

@@ -148,7 +145,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
if e.Row != nil {
// For all events, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
x, size, err = convertRowChangedEvents(w.changefeedID, task.span, w.enableOldValue, e)
x, size, err = handleRowChangedEvents(w.changefeedID, task.span, e)
if err != nil {
return errors.Trace(err)
}
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/redo_log_worker_test.go
@@ -75,7 +75,7 @@ func (suite *redoLogWorkerSuite) createWorker(
eventCache := newRedoEventCache(suite.testChangefeedID, 1024)

return newRedoWorker(suite.testChangefeedID, sm, quota,
redoDMLManager, eventCache, false), sortEngine, redoDMLManager
redoDMLManager, eventCache), sortEngine, redoDMLManager
}

func (suite *redoLogWorkerSuite) addEventsToSortEngine(
19 changes: 7 additions & 12 deletions cdc/processor/sinkmanager/table_sink_worker.go
@@ -65,9 +65,6 @@ type sinkWorker struct {
eventCache *redoEventCache
// splitTxn indicates whether to split the transaction into multiple batches.
splitTxn bool
// enableOldValue indicates whether to enable the old value feature.
// If it is enabled, we need to deal with the compatibility of the data format.
enableOldValue bool

// Metrics.
metricRedoEventCacheHit prometheus.Counter
@@ -83,16 +80,14 @@ func newSinkWorker(
redoQuota *memquota.MemQuota,
eventCache *redoEventCache,
splitTxn bool,
enableOldValue bool,
) *sinkWorker {
return &sinkWorker{
changefeedID: changefeedID,
sourceManager: sourceManager,
sinkMemQuota: sinkQuota,
redoMemQuota: redoQuota,
eventCache: eventCache,
splitTxn: splitTxn,
enableOldValue: enableOldValue,
changefeedID: changefeedID,
sourceManager: sourceManager,
sinkMemQuota: sinkQuota,
redoMemQuota: redoQuota,
eventCache: eventCache,
splitTxn: splitTxn,

metricRedoEventCacheHit: RedoEventCacheAccess.WithLabelValues(changefeedID.Namespace, changefeedID.ID, "hit"),
metricRedoEventCacheMiss: RedoEventCacheAccess.WithLabelValues(changefeedID.Namespace, changefeedID.ID, "miss"),
@@ -236,7 +231,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
if e.Row != nil {
// For all rows, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
x, size, err := convertRowChangedEvents(w.changefeedID, task.span, w.enableOldValue, e)
x, size, err := handleRowChangedEvents(w.changefeedID, task.span, e)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_worker_test.go
@@ -138,7 +138,7 @@ func (suite *tableSinkWorkerSuite) createWorker(
quota.ForceAcquire(testEventSize)
quota.AddTable(suite.testSpan)

return newSinkWorker(suite.testChangefeedID, sm, quota, nil, nil, splitTxn, false), sortEngine
return newSinkWorker(suite.testChangefeedID, sm, quota, nil, nil, splitTxn), sortEngine
}

func (suite *tableSinkWorkerSuite) addEventsToSortEngine(