diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index f0010b88d6f..f18a1867484 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -342,6 +342,7 @@ func UnmarshalDDL(raw *model.RawKVEntry) (*timodel.Job, error) { } // FinishedTS is only set when the job is synced, // but we can use the entry's ts here + job.StartTS = raw.StartTs job.BinlogInfo.FinishedTS = raw.CRTs return job, nil } diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 079896edf62..7439b78665c 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -64,11 +64,12 @@ type Column struct { // DDLEvent represents a DDL event type DDLEvent struct { - Ts uint64 - Schema string - Table string - Query string - Type model.ActionType + StartTs uint64 + CommitTs uint64 + Schema string + Table string + Query string + Type model.ActionType } // FromJob fills the values of DDLEvent from DDL job @@ -77,7 +78,8 @@ func (e *DDLEvent) FromJob(job *model.Job) { if job.BinlogInfo.TableInfo != nil { tableName = job.BinlogInfo.TableInfo.Name.O } - e.Ts = job.BinlogInfo.FinishedTS + e.StartTs = job.StartTS + e.CommitTs = job.BinlogInfo.FinishedTS e.Query = job.Query e.Schema = job.SchemaName e.Table = tableName diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go index 105582954f7..d003a95f6e4 100644 --- a/cdc/sink/codec/json.go +++ b/cdc/sink/codec/json.go @@ -135,6 +135,8 @@ func rowEventToMqMessage(e *model.RowChangedEvent) (*mqMessageKey, *mqMessageRow func mqMessageToRowEvent(key *mqMessageKey, value *mqMessageRow) *model.RowChangedEvent { e := new(model.RowChangedEvent) + // TODO: we lost the startTs from kafka message + // startTs-based txn filter is out of work e.CommitTs = key.Ts e.Table = &model.TableName{ Schema: key.Schema, @@ -153,7 +155,7 @@ func mqMessageToRowEvent(key *mqMessageKey, value *mqMessageRow) *model.RowChang func ddlEventtoMqMessage(e *model.DDLEvent) (*mqMessageKey, *mqMessageDDL) { key := &mqMessageKey{ - Ts: e.Ts, + Ts: e.CommitTs, Schema: e.Schema, Table: e.Table, Type: model.MqMessageTypeDDL, @@ -167,7 +169,9 @@ func ddlEventtoMqMessage(e *model.DDLEvent) (*mqMessageKey, *mqMessageDDL) { func mqMessageToDDLEvent(key *mqMessageKey, value *mqMessageDDL) *model.DDLEvent { e := new(model.DDLEvent) - e.Ts = key.Ts + // TODO: we lost the startTs from kafka message + // startTs-based txn filter is out of work + e.CommitTs = key.Ts e.Table = key.Table e.Schema = key.Schema e.Type = value.Type diff --git a/cdc/sink/codec/json_test.go b/cdc/sink/codec/json_test.go index 4fdf25bdf77..4f121fc58f1 100644 --- a/cdc/sink/codec/json_test.go +++ b/cdc/sink/codec/json_test.go @@ -53,29 +53,29 @@ var _ = check.Suite(&batchSuite{ Columns: map[string]*model.Column{"col1": {Type: 1, Value: "bb"}}, }}, {}}, ddlCases: [][]*model.DDLEvent{{{ - Ts: 1, - Schema: "a", - Table: "b", - Query: "create table a", - Type: 1, + CommitTs: 1, + Schema: "a", + Table: "b", + Query: "create table a", + Type: 1, }}, {{ - Ts: 1, - Schema: "a", - Table: "b", - Query: "create table a", - Type: 1, + CommitTs: 1, + Schema: "a", + Table: "b", + Query: "create table a", + Type: 1, }, { - Ts: 2, - Schema: "a", - Table: "b", - Query: "create table b", - Type: 2, + CommitTs: 2, + Schema: "a", + Table: "b", + Query: "create table b", + Type: 2, }, { - Ts: 3, - Schema: "a", - Table: "b", - Query: "create table c", - Type: 3, + CommitTs: 3, + Schema: "a", + Table: "b", + Query: "create table c", + Type: 3, }}, {}}, resolvedTsCases: [][]uint64{{1}, {1, 2, 3}, {}}, }) diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index ffc78592622..49e78e7fde7 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -95,7 +95,7 @@ func newMqSink(ctx context.Context, mqProducer mqProducer.Producer, filter *filt func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { for _, row := range rows { - if k.filter.ShouldIgnoreDMLEvent(row.CommitTs, row.Table.Schema, row.Table.Table) { + if k.filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) { log.Info("Row changed event ignored", zap.Uint64("ts", row.CommitTs)) continue } @@ -166,11 +166,12 @@ func (k *mqSink) EmitCheckpointTs(ctx context.Context, ts uint64) error { } func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { - if k.filter.ShouldIgnoreDDLEvent(ddl.Ts, ddl.Schema, ddl.Table) { + if k.filter.ShouldIgnoreDDLEvent(ddl.StartTs, ddl.Schema, ddl.Table) { log.Info( "DDL event ignored", zap.String("query", ddl.Query), - zap.Uint64("ts", ddl.Ts), + zap.Uint64("startTs", ddl.StartTs), + zap.Uint64("commitTs", ddl.CommitTs), ) return nil } diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 2e6f6e497e0..847e641b073 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -72,7 +72,7 @@ func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Row s.unresolvedRowsMu.Lock() defer s.unresolvedRowsMu.Unlock() for _, row := range rows { - if s.filter.ShouldIgnoreDMLEvent(row.CommitTs, row.Table.Schema, row.Table.Table) { + if s.filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) { log.Info("Row changed event ignored", zap.Uint64("ts", row.CommitTs)) continue } @@ -122,11 +122,12 @@ func (s *mysqlSink) EmitCheckpointTs(ctx context.Context, ts uint64) error { } func (s *mysqlSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { - if s.filter.ShouldIgnoreDDLEvent(ddl.Ts, ddl.Schema, ddl.Table) { + if s.filter.ShouldIgnoreDDLEvent(ddl.StartTs, ddl.Schema, ddl.Table) { log.Info( "DDL event ignored", zap.String("query", ddl.Query), - zap.Uint64("ts", ddl.Ts), + zap.Uint64("startTs", ddl.StartTs), + zap.Uint64("commitTs", ddl.CommitTs), ) return nil } diff --git a/cmd/changefeed.toml b/cmd/changefeed.toml index 8866c269190..7b4ebccea29 100644 --- a/cmd/changefeed.toml +++ b/cmd/changefeed.toml @@ -7,9 +7,9 @@ case-sensitive = true [filter] -# 忽略哪些 CommitTS 的事务 -# Transactions with the following CommitTS will be ignored -ignore-txn-commit-ts = [1, 2] +# 忽略哪些 StartTs 的事务 +# Transactions with the following StartTs will be ignored +ignore-txn-start-ts = [1, 2] # 同步哪些库 # The following databases(schema) will be replicated diff --git a/cmd/cmd_test.go b/cmd/cmd_test.go index b2700ad00ac..72a3e3d98a7 100644 --- a/cmd/cmd_test.go +++ b/cmd/cmd_test.go @@ -39,7 +39,7 @@ func (s *decodeFileSuite) TestCanDecodeTOML(c *check.C) { case-sensitive = false [filter] -ignore-txn-commit-ts = [1, 2] +ignore-txn-start-ts = [1, 2] ddl-white-list = [1, 2] ignore-dbs = ["test", "sys"] do-dbs = ["test1", "sys1"] @@ -77,8 +77,8 @@ sync-ddl = true c.Assert(cfg.CaseSensitive, check.IsFalse) c.Assert(cfg.Filter, check.DeepEquals, &config.FilterConfig{ - IgnoreTxnCommitTs: []uint64{1, 2}, - DDLWhitelist: []model.ActionType{1, 2}, + IgnoreTxnStartTs: []uint64{1, 2}, + DDLWhitelist: []model.ActionType{1, 2}, Rules: &filter.Rules{ IgnoreDBs: []string{"test", "sys"}, DoDBs: []string{"test1", "sys1"}, @@ -120,9 +120,9 @@ func (s *decodeFileSuite) TestAndWriteExampleTOML(c *check.C) { case-sensitive = true [filter] -# 忽略哪些 CommitTS 的事务 -# Transactions with the following CommitTS will be ignored -ignore-txn-commit-ts = [1, 2] +# 忽略哪些 StartTs 的事务 +# Transactions with the following StartTs will be ignored +ignore-txn-start-ts = [1, 2] # 同步哪些库 # The following databases(schema) will be replicated @@ -184,7 +184,7 @@ sync-ddl = true c.Assert(cfg.CaseSensitive, check.IsTrue) c.Assert(cfg.Filter, check.DeepEquals, &config.FilterConfig{ - IgnoreTxnCommitTs: []uint64{1, 2}, + IgnoreTxnStartTs: []uint64{1, 2}, Rules: &filter.Rules{ IgnoreDBs: []string{"test", "sys"}, DoDBs: []string{"test1", "sys1"}, diff --git a/kafka_consumer/main.go b/kafka_consumer/main.go index db0a7fd3a78..f50de5abc2f 100644 --- a/kafka_consumer/main.go +++ b/kafka_consumer/main.go @@ -405,16 +405,16 @@ ClaimMessages: func (c *Consumer) appendDDL(ddl *model.DDLEvent) { c.ddlListMu.Lock() defer c.ddlListMu.Unlock() - if ddl.Ts <= c.maxDDLReceivedTs { + if ddl.CommitTs <= c.maxDDLReceivedTs { return } globalResolvedTs := atomic.LoadUint64(&c.globalResolvedTs) - if ddl.Ts <= globalResolvedTs { - log.Error("unexpected ddl job", zap.Uint64("ddlts", ddl.Ts), zap.Uint64("globalResolvedTs", globalResolvedTs)) + if ddl.CommitTs <= globalResolvedTs { + log.Error("unexpected ddl job", zap.Uint64("ddlts", ddl.CommitTs), zap.Uint64("globalResolvedTs", globalResolvedTs)) return } c.ddlList = append(c.ddlList, ddl) - c.maxDDLReceivedTs = ddl.Ts + c.maxDDLReceivedTs = ddl.CommitTs } func (c *Consumer) getFrontDDL() *model.DDLEvent { @@ -477,13 +477,13 @@ func (c *Consumer) Run(ctx context.Context) error { return errors.Trace(err) } todoDDL := c.getFrontDDL() - if todoDDL != nil && globalResolvedTs >= todoDDL.Ts { + if todoDDL != nil && globalResolvedTs >= todoDDL.CommitTs { //flush DMLs err := c.forEachSink(func(sink *struct { sink.Sink resolvedTs uint64 }) error { - return sink.FlushRowChangedEvents(ctx, todoDDL.Ts) + return sink.FlushRowChangedEvents(ctx, todoDDL.CommitTs) }) if err != nil { return errors.Trace(err) @@ -498,8 +498,8 @@ func (c *Consumer) Run(ctx context.Context) error { continue } - if todoDDL != nil && todoDDL.Ts < globalResolvedTs { - globalResolvedTs = todoDDL.Ts + if todoDDL != nil && todoDDL.CommitTs < globalResolvedTs { + globalResolvedTs = todoDDL.CommitTs } if lastGlobalResolvedTs == globalResolvedTs { continue diff --git a/pkg/config/filter.go b/pkg/config/filter.go index 1b5326b3ca8..59a3eaccacf 100644 --- a/pkg/config/filter.go +++ b/pkg/config/filter.go @@ -21,6 +21,6 @@ import ( // FilterConfig represents filter config for a changefeed type FilterConfig struct { *filter.Rules - IgnoreTxnCommitTs []uint64 `toml:"ignore-txn-commit-ts" json:"ignore-txn-commit-ts"` - DDLWhitelist []model.ActionType `toml:"ddl-white-list" json:"ddl-white-list"` + IgnoreTxnStartTs []uint64 `toml:"ignore-txn-start-ts" json:"ignore-txn-start-ts"` + DDLWhitelist []model.ActionType `toml:"ddl-white-list" json:"ddl-white-list"` } diff --git a/pkg/cyclic/mark.go b/pkg/cyclic/mark.go index 7d3183842ae..21f44404156 100644 --- a/pkg/cyclic/mark.go +++ b/pkg/cyclic/mark.go @@ -33,14 +33,12 @@ func CreateMarkTable(sourceSchema, sourceTable string) []*model.DDLEvent { schema, table := MarkTableName(sourceSchema, sourceTable) events := []*model.DDLEvent{ { - Ts: 0, Schema: schema, Table: table, Query: fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s;", schema), Type: timodel.ActionCreateSchema, }, { - Ts: 0, Schema: schema, Table: table, Query: fmt.Sprintf( diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go index ef610d78a1e..db0982b5eb7 100644 --- a/pkg/filter/filter.go +++ b/pkg/filter/filter.go @@ -27,9 +27,9 @@ const OptCyclicConfig string = "_cyclic_relax_sql_mode" // Filter is a event filter implementation type Filter struct { - filter *filter.Filter - ignoreTxnCommitTs []uint64 - ddlWhitelist []model.ActionType + filter *filter.Filter + ignoreTxnStartTs []uint64 + ddlWhitelist []model.ActionType } // NewFilter creates a filter @@ -39,15 +39,15 @@ func NewFilter(cfg *config.ReplicaConfig) (*Filter, error) { return nil, err } return &Filter{ - filter: filter, - ignoreTxnCommitTs: cfg.Filter.IgnoreTxnCommitTs, - ddlWhitelist: cfg.Filter.DDLWhitelist, + filter: filter, + ignoreTxnStartTs: cfg.Filter.IgnoreTxnStartTs, + ddlWhitelist: cfg.Filter.DDLWhitelist, }, nil } // ShouldIgnoreTxn returns true is the given txn should be ignored -func (f *Filter) shouldIgnoreCommitTs(ts uint64) bool { - for _, ignoreTs := range f.ignoreTxnCommitTs { +func (f *Filter) shouldIgnoreStartTs(ts uint64) bool { + for _, ignoreTs := range f.ignoreTxnStartTs { if ignoreTs == ts { return true } @@ -69,13 +69,13 @@ func (f *Filter) ShouldIgnoreTable(db, tbl string) bool { // ShouldIgnoreDMLEvent removes DMLs that's not wanted by this change feed. // CDC only supports filtering by database/table now. func (f *Filter) ShouldIgnoreDMLEvent(ts uint64, schema, table string) bool { - return f.shouldIgnoreCommitTs(ts) || f.ShouldIgnoreTable(schema, table) + return f.shouldIgnoreStartTs(ts) || f.ShouldIgnoreTable(schema, table) } // ShouldIgnoreDDLEvent removes DDLs that's not wanted by this change feed. // CDC only supports filtering by database/table now. func (f *Filter) ShouldIgnoreDDLEvent(ts uint64, schema, table string) bool { - return f.shouldIgnoreCommitTs(ts) || f.ShouldIgnoreTable(schema, table) + return f.shouldIgnoreStartTs(ts) || f.ShouldIgnoreTable(schema, table) } // ShouldDiscardDDL returns true if this DDL should be discarded diff --git a/pkg/filter/filter_test.go b/pkg/filter/filter_test.go index f9bf08109bf..29359b24150 100644 --- a/pkg/filter/filter_test.go +++ b/pkg/filter/filter_test.go @@ -68,7 +68,7 @@ func (s *filterSuite) TestShouldUseCustomRules(c *check.C) { func (s *filterSuite) TestShouldIgnoreTxn(c *check.C) { filter, err := NewFilter(&config.ReplicaConfig{ Filter: &config.FilterConfig{ - IgnoreTxnCommitTs: []uint64{1, 3}, + IgnoreTxnStartTs: []uint64{1, 3}, Rules: &filter.Rules{ DoDBs: []string{"sns", "ecom"}, IgnoreTables: []*filter.Table{