Skip to content

Commit

Permalink
using startts to filter txn (#589)
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro authored May 21, 2020
1 parent 501016c commit b66fffd
Show file tree
Hide file tree
Showing 13 changed files with 74 additions and 67 deletions.
1 change: 1 addition & 0 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ func UnmarshalDDL(raw *model.RawKVEntry) (*timodel.Job, error) {
}
// FinishedTS is only set when the job is synced,
// but we can use the entry's ts here
job.StartTS = raw.StartTs
job.BinlogInfo.FinishedTS = raw.CRTs
return job, nil
}
Expand Down
14 changes: 8 additions & 6 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,12 @@ type Column struct {

// DDLEvent represents a DDL event
type DDLEvent struct {
Ts uint64
Schema string
Table string
Query string
Type model.ActionType
StartTs uint64
CommitTs uint64
Schema string
Table string
Query string
Type model.ActionType
}

// FromJob fills the values of DDLEvent from DDL job
Expand All @@ -77,7 +78,8 @@ func (e *DDLEvent) FromJob(job *model.Job) {
if job.BinlogInfo.TableInfo != nil {
tableName = job.BinlogInfo.TableInfo.Name.O
}
e.Ts = job.BinlogInfo.FinishedTS
e.StartTs = job.StartTS
e.CommitTs = job.BinlogInfo.FinishedTS
e.Query = job.Query
e.Schema = job.SchemaName
e.Table = tableName
Expand Down
8 changes: 6 additions & 2 deletions cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ func rowEventToMqMessage(e *model.RowChangedEvent) (*mqMessageKey, *mqMessageRow

func mqMessageToRowEvent(key *mqMessageKey, value *mqMessageRow) *model.RowChangedEvent {
e := new(model.RowChangedEvent)
// TODO: the startTs is lost in the kafka message,
// so the startTs-based txn filter does not work here
e.CommitTs = key.Ts
e.Table = &model.TableName{
Schema: key.Schema,
Expand All @@ -153,7 +155,7 @@ func mqMessageToRowEvent(key *mqMessageKey, value *mqMessageRow) *model.RowChang

func ddlEventtoMqMessage(e *model.DDLEvent) (*mqMessageKey, *mqMessageDDL) {
key := &mqMessageKey{
Ts: e.Ts,
Ts: e.CommitTs,
Schema: e.Schema,
Table: e.Table,
Type: model.MqMessageTypeDDL,
Expand All @@ -167,7 +169,9 @@ func ddlEventtoMqMessage(e *model.DDLEvent) (*mqMessageKey, *mqMessageDDL) {

func mqMessageToDDLEvent(key *mqMessageKey, value *mqMessageDDL) *model.DDLEvent {
e := new(model.DDLEvent)
e.Ts = key.Ts
// TODO: the startTs is lost in the kafka message,
// so the startTs-based txn filter does not work here
e.CommitTs = key.Ts
e.Table = key.Table
e.Schema = key.Schema
e.Type = value.Type
Expand Down
40 changes: 20 additions & 20 deletions cdc/sink/codec/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,29 +53,29 @@ var _ = check.Suite(&batchSuite{
Columns: map[string]*model.Column{"col1": {Type: 1, Value: "bb"}},
}}, {}},
ddlCases: [][]*model.DDLEvent{{{
Ts: 1,
Schema: "a",
Table: "b",
Query: "create table a",
Type: 1,
CommitTs: 1,
Schema: "a",
Table: "b",
Query: "create table a",
Type: 1,
}}, {{
Ts: 1,
Schema: "a",
Table: "b",
Query: "create table a",
Type: 1,
CommitTs: 1,
Schema: "a",
Table: "b",
Query: "create table a",
Type: 1,
}, {
Ts: 2,
Schema: "a",
Table: "b",
Query: "create table b",
Type: 2,
CommitTs: 2,
Schema: "a",
Table: "b",
Query: "create table b",
Type: 2,
}, {
Ts: 3,
Schema: "a",
Table: "b",
Query: "create table c",
Type: 3,
CommitTs: 3,
Schema: "a",
Table: "b",
Query: "create table c",
Type: 3,
}}, {}},
resolvedTsCases: [][]uint64{{1}, {1, 2, 3}, {}},
})
Expand Down
7 changes: 4 additions & 3 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func newMqSink(ctx context.Context, mqProducer mqProducer.Producer, filter *filt

func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
for _, row := range rows {
if k.filter.ShouldIgnoreDMLEvent(row.CommitTs, row.Table.Schema, row.Table.Table) {
if k.filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) {
log.Info("Row changed event ignored", zap.Uint64("ts", row.CommitTs))
continue
}
Expand Down Expand Up @@ -166,11 +166,12 @@ func (k *mqSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
}

func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
if k.filter.ShouldIgnoreDDLEvent(ddl.Ts, ddl.Schema, ddl.Table) {
if k.filter.ShouldIgnoreDDLEvent(ddl.StartTs, ddl.Schema, ddl.Table) {
log.Info(
"DDL event ignored",
zap.String("query", ddl.Query),
zap.Uint64("ts", ddl.Ts),
zap.Uint64("startTs", ddl.StartTs),
zap.Uint64("commitTs", ddl.CommitTs),
)
return nil
}
Expand Down
7 changes: 4 additions & 3 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Row
s.unresolvedRowsMu.Lock()
defer s.unresolvedRowsMu.Unlock()
for _, row := range rows {
if s.filter.ShouldIgnoreDMLEvent(row.CommitTs, row.Table.Schema, row.Table.Table) {
if s.filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) {
log.Info("Row changed event ignored", zap.Uint64("ts", row.CommitTs))
continue
}
Expand Down Expand Up @@ -122,11 +122,12 @@ func (s *mysqlSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
}

func (s *mysqlSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
if s.filter.ShouldIgnoreDDLEvent(ddl.Ts, ddl.Schema, ddl.Table) {
if s.filter.ShouldIgnoreDDLEvent(ddl.StartTs, ddl.Schema, ddl.Table) {
log.Info(
"DDL event ignored",
zap.String("query", ddl.Query),
zap.Uint64("ts", ddl.Ts),
zap.Uint64("startTs", ddl.StartTs),
zap.Uint64("commitTs", ddl.CommitTs),
)
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/changefeed.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
case-sensitive = true

[filter]
# 忽略哪些 CommitTS 的事务
# Transactions with the following CommitTS will be ignored
ignore-txn-commit-ts = [1, 2]
# 忽略哪些 StartTs 的事务
# Transactions with the following StartTs will be ignored
ignore-txn-start-ts = [1, 2]

# 同步哪些库
# The following databases(schema) will be replicated
Expand Down
14 changes: 7 additions & 7 deletions cmd/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (s *decodeFileSuite) TestCanDecodeTOML(c *check.C) {
case-sensitive = false
[filter]
ignore-txn-commit-ts = [1, 2]
ignore-txn-start-ts = [1, 2]
ddl-white-list = [1, 2]
ignore-dbs = ["test", "sys"]
do-dbs = ["test1", "sys1"]
Expand Down Expand Up @@ -77,8 +77,8 @@ sync-ddl = true

c.Assert(cfg.CaseSensitive, check.IsFalse)
c.Assert(cfg.Filter, check.DeepEquals, &config.FilterConfig{
IgnoreTxnCommitTs: []uint64{1, 2},
DDLWhitelist: []model.ActionType{1, 2},
IgnoreTxnStartTs: []uint64{1, 2},
DDLWhitelist: []model.ActionType{1, 2},
Rules: &filter.Rules{
IgnoreDBs: []string{"test", "sys"},
DoDBs: []string{"test1", "sys1"},
Expand Down Expand Up @@ -120,9 +120,9 @@ func (s *decodeFileSuite) TestAndWriteExampleTOML(c *check.C) {
case-sensitive = true
[filter]
# 忽略哪些 CommitTS 的事务
# Transactions with the following CommitTS will be ignored
ignore-txn-commit-ts = [1, 2]
# 忽略哪些 StartTs 的事务
# Transactions with the following StartTs will be ignored
ignore-txn-start-ts = [1, 2]
# 同步哪些库
# The following databases(schema) will be replicated
Expand Down Expand Up @@ -184,7 +184,7 @@ sync-ddl = true

c.Assert(cfg.CaseSensitive, check.IsTrue)
c.Assert(cfg.Filter, check.DeepEquals, &config.FilterConfig{
IgnoreTxnCommitTs: []uint64{1, 2},
IgnoreTxnStartTs: []uint64{1, 2},
Rules: &filter.Rules{
IgnoreDBs: []string{"test", "sys"},
DoDBs: []string{"test1", "sys1"},
Expand Down
16 changes: 8 additions & 8 deletions kafka_consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,16 +405,16 @@ ClaimMessages:
func (c *Consumer) appendDDL(ddl *model.DDLEvent) {
c.ddlListMu.Lock()
defer c.ddlListMu.Unlock()
if ddl.Ts <= c.maxDDLReceivedTs {
if ddl.CommitTs <= c.maxDDLReceivedTs {
return
}
globalResolvedTs := atomic.LoadUint64(&c.globalResolvedTs)
if ddl.Ts <= globalResolvedTs {
log.Error("unexpected ddl job", zap.Uint64("ddlts", ddl.Ts), zap.Uint64("globalResolvedTs", globalResolvedTs))
if ddl.CommitTs <= globalResolvedTs {
log.Error("unexpected ddl job", zap.Uint64("ddlts", ddl.CommitTs), zap.Uint64("globalResolvedTs", globalResolvedTs))
return
}
c.ddlList = append(c.ddlList, ddl)
c.maxDDLReceivedTs = ddl.Ts
c.maxDDLReceivedTs = ddl.CommitTs
}

func (c *Consumer) getFrontDDL() *model.DDLEvent {
Expand Down Expand Up @@ -477,13 +477,13 @@ func (c *Consumer) Run(ctx context.Context) error {
return errors.Trace(err)
}
todoDDL := c.getFrontDDL()
if todoDDL != nil && globalResolvedTs >= todoDDL.Ts {
if todoDDL != nil && globalResolvedTs >= todoDDL.CommitTs {
//flush DMLs
err := c.forEachSink(func(sink *struct {
sink.Sink
resolvedTs uint64
}) error {
return sink.FlushRowChangedEvents(ctx, todoDDL.Ts)
return sink.FlushRowChangedEvents(ctx, todoDDL.CommitTs)
})
if err != nil {
return errors.Trace(err)
Expand All @@ -498,8 +498,8 @@ func (c *Consumer) Run(ctx context.Context) error {
continue
}

if todoDDL != nil && todoDDL.Ts < globalResolvedTs {
globalResolvedTs = todoDDL.Ts
if todoDDL != nil && todoDDL.CommitTs < globalResolvedTs {
globalResolvedTs = todoDDL.CommitTs
}
if lastGlobalResolvedTs == globalResolvedTs {
continue
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ import (
// FilterConfig represents filter config for a changefeed
type FilterConfig struct {
*filter.Rules
IgnoreTxnCommitTs []uint64 `toml:"ignore-txn-commit-ts" json:"ignore-txn-commit-ts"`
DDLWhitelist []model.ActionType `toml:"ddl-white-list" json:"ddl-white-list"`
IgnoreTxnStartTs []uint64 `toml:"ignore-txn-start-ts" json:"ignore-txn-start-ts"`
DDLWhitelist []model.ActionType `toml:"ddl-white-list" json:"ddl-white-list"`
}
2 changes: 0 additions & 2 deletions pkg/cyclic/mark.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,12 @@ func CreateMarkTable(sourceSchema, sourceTable string) []*model.DDLEvent {
schema, table := MarkTableName(sourceSchema, sourceTable)
events := []*model.DDLEvent{
{
Ts: 0,
Schema: schema,
Table: table,
Query: fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s;", schema),
Type: timodel.ActionCreateSchema,
},
{
Ts: 0,
Schema: schema,
Table: table,
Query: fmt.Sprintf(
Expand Down
20 changes: 10 additions & 10 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ const OptCyclicConfig string = "_cyclic_relax_sql_mode"

// Filter is a event filter implementation
type Filter struct {
filter *filter.Filter
ignoreTxnCommitTs []uint64
ddlWhitelist []model.ActionType
filter *filter.Filter
ignoreTxnStartTs []uint64
ddlWhitelist []model.ActionType
}

// NewFilter creates a filter
Expand All @@ -39,15 +39,15 @@ func NewFilter(cfg *config.ReplicaConfig) (*Filter, error) {
return nil, err
}
return &Filter{
filter: filter,
ignoreTxnCommitTs: cfg.Filter.IgnoreTxnCommitTs,
ddlWhitelist: cfg.Filter.DDLWhitelist,
filter: filter,
ignoreTxnStartTs: cfg.Filter.IgnoreTxnStartTs,
ddlWhitelist: cfg.Filter.DDLWhitelist,
}, nil
}

// ShouldIgnoreTxn returns true if the given txn should be ignored
func (f *Filter) shouldIgnoreCommitTs(ts uint64) bool {
for _, ignoreTs := range f.ignoreTxnCommitTs {
func (f *Filter) shouldIgnoreStartTs(ts uint64) bool {
for _, ignoreTs := range f.ignoreTxnStartTs {
if ignoreTs == ts {
return true
}
Expand All @@ -69,13 +69,13 @@ func (f *Filter) ShouldIgnoreTable(db, tbl string) bool {
// ShouldIgnoreDMLEvent reports whether a DML event is not wanted by this changefeed.
// Besides the ignored txn ts list, CDC only supports filtering by database/table now.
func (f *Filter) ShouldIgnoreDMLEvent(ts uint64, schema, table string) bool {
return f.shouldIgnoreCommitTs(ts) || f.ShouldIgnoreTable(schema, table)
return f.shouldIgnoreStartTs(ts) || f.ShouldIgnoreTable(schema, table)
}

// ShouldIgnoreDDLEvent reports whether a DDL event is not wanted by this changefeed.
// Besides the ignored txn ts list, CDC only supports filtering by database/table now.
func (f *Filter) ShouldIgnoreDDLEvent(ts uint64, schema, table string) bool {
return f.shouldIgnoreCommitTs(ts) || f.ShouldIgnoreTable(schema, table)
return f.shouldIgnoreStartTs(ts) || f.ShouldIgnoreTable(schema, table)
}

// ShouldDiscardDDL returns true if this DDL should be discarded
Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *filterSuite) TestShouldUseCustomRules(c *check.C) {
func (s *filterSuite) TestShouldIgnoreTxn(c *check.C) {
filter, err := NewFilter(&config.ReplicaConfig{
Filter: &config.FilterConfig{
IgnoreTxnCommitTs: []uint64{1, 3},
IgnoreTxnStartTs: []uint64{1, 3},
Rules: &filter.Rules{
DoDBs: []string{"sns", "ecom"},
IgnoreTables: []*filter.Table{
Expand Down

0 comments on commit b66fffd

Please sign in to comment.