From 4e177d94ca41d5f49a44c5545adfca1c72820f87 Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 25 Sep 2024 22:03:56 +0800 Subject: [PATCH] sink: avoid memory allocations for transforming ColumnData to Column (#11595) close pingcap/tiflow#11590 --- cdc/model/sink.go | 123 +++++++++-- cdc/model/sink_gen.go | 197 ++++++++++++++++++ cdc/model/sink_gen_test.go | 113 ++++++++++ cdc/sink/dmlsink/txn/event.go | 16 +- cdc/sink/dmlsink/txn/event_test.go | 12 +- cdc/sink/dmlsink/txn/mysql/dml.go | 44 ++-- cdc/sink/dmlsink/txn/mysql/dml_test.go | 17 +- cdc/sink/dmlsink/txn/mysql/mysql.go | 13 +- docs/swagger/docs.go | 15 +- docs/swagger/swagger.json | 15 +- docs/swagger/swagger.yaml | 11 + pkg/sink/codec/canal/canal_entry.go | 43 ++-- .../canal/canal_json_row_event_encoder.go | 55 ++--- pkg/sink/codec/craft/model.go | 24 +-- pkg/sink/codec/csv/csv_message.go | 18 +- pkg/sink/codec/csv/csv_message_test.go | 6 +- pkg/sink/codec/debezium/codec.go | 146 ++++++------- pkg/sink/codec/internal/column.go | 13 +- pkg/sink/codec/open/open_protocol_message.go | 24 +-- .../codec/open/open_protocol_message_test.go | 4 +- pkg/util/bitflag.go | 8 +- tools/check/go.sum | 1 + 22 files changed, 671 insertions(+), 247 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index cab078011ef..dd6396dace1 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -95,8 +95,8 @@ func (b *ColumnFlagType) UnsetIsBinary() { } // IsBinary shows whether BinaryFlag is set -func (b *ColumnFlagType) IsBinary() bool { - return (*util.Flag)(b).HasAll(util.Flag(BinaryFlag)) +func (b ColumnFlagType) IsBinary() bool { + return (util.Flag)(b).HasAll(util.Flag(BinaryFlag)) } // SetIsHandleKey sets HandleKey @@ -110,8 +110,8 @@ func (b *ColumnFlagType) UnsetIsHandleKey() { } // IsHandleKey shows whether HandleKey is set -func (b *ColumnFlagType) IsHandleKey() bool { - return (*util.Flag)(b).HasAll(util.Flag(HandleKeyFlag)) +func (b ColumnFlagType) IsHandleKey() bool { + return (util.Flag)(b).HasAll(util.Flag(HandleKeyFlag)) } // SetIsGeneratedColumn sets GeneratedColumn @@ -125,8 +125,8 @@ func (b *ColumnFlagType) UnsetIsGeneratedColumn() { } // IsGeneratedColumn shows whether GeneratedColumn is set -func (b *ColumnFlagType) IsGeneratedColumn() bool { - return (*util.Flag)(b).HasAll(util.Flag(GeneratedColumnFlag)) +func (b ColumnFlagType) IsGeneratedColumn() bool { + return (util.Flag)(b).HasAll(util.Flag(GeneratedColumnFlag)) } // SetIsPrimaryKey sets PrimaryKeyFlag @@ -140,8 +140,8 @@ func (b *ColumnFlagType) UnsetIsPrimaryKey() { } // IsPrimaryKey shows whether PrimaryKeyFlag is set -func (b *ColumnFlagType) IsPrimaryKey() bool { - return (*util.Flag)(b).HasAll(util.Flag(PrimaryKeyFlag)) +func (b ColumnFlagType) IsPrimaryKey() bool { + return (util.Flag)(b).HasAll(util.Flag(PrimaryKeyFlag)) } // SetIsUniqueKey sets UniqueKeyFlag @@ -155,13 +155,13 @@ func (b *ColumnFlagType) UnsetIsUniqueKey() { } // IsUniqueKey shows whether UniqueKeyFlag is set -func (b *ColumnFlagType) IsUniqueKey() bool { - return (*util.Flag)(b).HasAll(util.Flag(UniqueKeyFlag)) +func (b ColumnFlagType) IsUniqueKey() bool { + return (util.Flag)(b).HasAll(util.Flag(UniqueKeyFlag)) } // IsMultipleKey shows whether MultipleKeyFlag is set -func (b *ColumnFlagType) IsMultipleKey() bool { - return (*util.Flag)(b).HasAll(util.Flag(MultipleKeyFlag)) +func (b ColumnFlagType) IsMultipleKey() bool { + return (util.Flag)(b).HasAll(util.Flag(MultipleKeyFlag)) } // SetIsMultipleKey sets MultipleKeyFlag @@ -175,8 +175,8 @@ func (b *ColumnFlagType) UnsetIsMultipleKey() { } 
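The hunk above switches ColumnFlagType's query methods from pointer receivers to value receivers, so callers can test flags on a plain value without taking its address, which supports the patch's stated goal of avoiding allocations when working with ColumnData instead of Column. The sketch below illustrates the bit-flag pattern these methods rely on; Flag, HasAll, Add, and the two constants are simplified, hypothetical stand-ins for pkg/util/bitflag.go and cdc/model/sink.go, not the real implementation.

package main

import "fmt"

// Flag is a simplified stand-in for util.Flag: a bit set backed by a uint64.
type Flag uint64

// HasAll reports whether every bit of each given flag is set in f.
// A value receiver is enough here, mirroring the receiver change in the hunk above.
func (f Flag) HasAll(flags ...Flag) bool {
	for _, flag := range flags {
		if f&flag != flag {
			return false
		}
	}
	return true
}

// Add sets the given bits on f.
func (f *Flag) Add(flags ...Flag) {
	for _, flag := range flags {
		*f |= flag
	}
}

// Hypothetical flag values mirroring the ones used in cdc/model/sink.go.
const (
	BinaryFlag    Flag = 1 << iota // column stores binary data
	HandleKeyFlag                  // column is part of the handle key
)

func main() {
	var f Flag
	f.Add(BinaryFlag)
	// With a value receiver, no pointer dereference is needed to query a flag
	// that is already held by value.
	fmt.Println(f.HasAll(BinaryFlag))    // true
	fmt.Println(f.HasAll(HandleKeyFlag)) // false
}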
// IsNullable shows whether NullableFlag is set -func (b *ColumnFlagType) IsNullable() bool { - return (*util.Flag)(b).HasAll(util.Flag(NullableFlag)) +func (b ColumnFlagType) IsNullable() bool { + return (util.Flag)(b).HasAll(util.Flag(NullableFlag)) } // SetIsNullable sets NullableFlag @@ -190,8 +190,8 @@ func (b *ColumnFlagType) UnsetIsNullable() { } // IsUnsigned shows whether UnsignedFlag is set -func (b *ColumnFlagType) IsUnsigned() bool { - return (*util.Flag)(b).HasAll(util.Flag(UnsignedFlag)) +func (b ColumnFlagType) IsUnsigned() bool { + return (util.Flag)(b).HasAll(util.Flag(UnsignedFlag)) } // SetIsUnsigned sets UnsignedFlag @@ -1295,3 +1295,94 @@ type TopicPartitionKey struct { PartitionKey string TotalPartition int32 } + +// ColumnDataX is like ColumnData, but contains more informations. +// +//msgp:ignore RowChangedEvent +type ColumnDataX struct { + *ColumnData + flag *ColumnFlagType + info *model.ColumnInfo +} + +// GetColumnDataX encapsures ColumnData to ColumnDataX. +func GetColumnDataX(col *ColumnData, tb *TableInfo) ColumnDataX { + x := ColumnDataX{ColumnData: col} + if x.ColumnData != nil { + x.flag = tb.ColumnsFlag[col.ColumnID] + x.info = tb.Columns[tb.columnsOffset[col.ColumnID]] + } + return x +} + +// GetName returns name. +func (x ColumnDataX) GetName() string { + return x.info.Name.O +} + +// GetType returns type. +func (x ColumnDataX) GetType() byte { + return x.info.GetType() +} + +// GetCharset returns charset. +func (x ColumnDataX) GetCharset() string { + return x.info.GetCharset() +} + +// GetCollation returns collation. +func (x ColumnDataX) GetCollation() string { + return x.info.GetCollate() +} + +// GetFlag returns flag. +func (x ColumnDataX) GetFlag() ColumnFlagType { + return *x.flag +} + +// GetDefaultValue return default value. +func (x ColumnDataX) GetDefaultValue() interface{} { + return GetColumnDefaultValue(x.info) +} + +// GetColumnInfo returns column info. +func (x ColumnDataX) GetColumnInfo() *model.ColumnInfo { + return x.info +} + +// Columns2ColumnDataForTest is for tests. +func Columns2ColumnDataForTest(columns []*Column) ([]*ColumnData, *TableInfo) { + info := &TableInfo{ + TableInfo: &model.TableInfo{ + Columns: make([]*model.ColumnInfo, len(columns)), + }, + ColumnsFlag: make(map[int64]*ColumnFlagType, len(columns)), + columnsOffset: make(map[int64]int), + } + colDatas := make([]*ColumnData, 0, len(columns)) + + for i, column := range columns { + var columnID int64 = int64(i) + info.columnsOffset[columnID] = i + + info.Columns[i] = &model.ColumnInfo{} + info.Columns[i].Name.O = column.Name + info.Columns[i].SetType(column.Type) + info.Columns[i].SetCharset(column.Charset) + info.Columns[i].SetCollate(column.Collation) + info.Columns[i].DefaultValue = column.Default + + info.ColumnsFlag[columnID] = new(ColumnFlagType) + *info.ColumnsFlag[columnID] = column.Flag + + colDatas = append(colDatas, &ColumnData{ColumnID: columnID, Value: column.Value}) + } + + return colDatas, info +} + +// Column2ColumnDataXForTest is for tests. 
+func Column2ColumnDataXForTest(column *Column) ColumnDataX { + datas, info := Columns2ColumnDataForTest([]*Column{column}) + return GetColumnDataX(datas[0], info) +} diff --git a/cdc/model/sink_gen.go b/cdc/model/sink_gen.go index f431446021d..9483e055f2d 100644 --- a/cdc/model/sink_gen.go +++ b/cdc/model/sink_gen.go @@ -287,6 +287,203 @@ func (z ColumnData) Msgsize() (s int) { return } +// DecodeMsg implements msgp.Decodable +func (z *ColumnDataX) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "ColumnData": + if dc.IsNil() { + err = dc.ReadNil() + if err != nil { + err = msgp.WrapError(err, "ColumnData") + return + } + z.ColumnData = nil + } else { + if z.ColumnData == nil { + z.ColumnData = new(ColumnData) + } + var zb0002 uint32 + zb0002, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "ColumnData") + return + } + for zb0002 > 0 { + zb0002-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err, "ColumnData") + return + } + switch msgp.UnsafeString(field) { + case "column_id": + z.ColumnData.ColumnID, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "ColumnData", "ColumnID") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err, "ColumnData") + return + } + } + } + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *ColumnDataX) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 1 + // write "ColumnData" + err = en.Append(0x81, 0xaa, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x44, 0x61, 0x74, 0x61) + if err != nil { + return + } + if z.ColumnData == nil { + err = en.WriteNil() + if err != nil { + return + } + } else { + // map header, size 1 + // write "column_id" + err = en.Append(0x81, 0xa9, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x5f, 0x69, 0x64) + if err != nil { + return + } + err = en.WriteInt64(z.ColumnData.ColumnID) + if err != nil { + err = msgp.WrapError(err, "ColumnData", "ColumnID") + return + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *ColumnDataX) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 1 + // string "ColumnData" + o = append(o, 0x81, 0xaa, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x44, 0x61, 0x74, 0x61) + if z.ColumnData == nil { + o = msgp.AppendNil(o) + } else { + // map header, size 1 + // string "column_id" + o = append(o, 0x81, 0xa9, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x5f, 0x69, 0x64) + o = msgp.AppendInt64(o, z.ColumnData.ColumnID) + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *ColumnDataX) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "ColumnData": + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + z.ColumnData = nil + } else { + if z.ColumnData == nil { + z.ColumnData = 
new(ColumnData) + } + var zb0002 uint32 + zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "ColumnData") + return + } + for zb0002 > 0 { + zb0002-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err, "ColumnData") + return + } + switch msgp.UnsafeString(field) { + case "column_id": + z.ColumnData.ColumnID, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ColumnData", "ColumnID") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err, "ColumnData") + return + } + } + } + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *ColumnDataX) Msgsize() (s int) { + s = 1 + 11 + if z.ColumnData == nil { + s += msgp.NilSize + } else { + s += 1 + 10 + msgp.Int64Size + } + return +} + // DecodeMsg implements msgp.Decodable func (z *DDLEvent) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte diff --git a/cdc/model/sink_gen_test.go b/cdc/model/sink_gen_test.go index a43663539d1..dbe154f0e79 100644 --- a/cdc/model/sink_gen_test.go +++ b/cdc/model/sink_gen_test.go @@ -235,6 +235,119 @@ func BenchmarkDecodeColumnData(b *testing.B) { } } +func TestMarshalUnmarshalColumnDataX(t *testing.T) { + v := ColumnDataX{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgColumnDataX(b *testing.B) { + v := ColumnDataX{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgColumnDataX(b *testing.B) { + v := ColumnDataX{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalColumnDataX(b *testing.B) { + v := ColumnDataX{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeColumnDataX(t *testing.T) { + v := ColumnDataX{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeColumnDataX Msgsize() is inaccurate") + } + + vn := ColumnDataX{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeColumnDataX(b *testing.B) { + v := ColumnDataX{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeColumnDataX(b *testing.B) { + v := ColumnDataX{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := 
msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshalDDLEvent(t *testing.T) { v := DDLEvent{} bts, err := v.MarshalMsg(nil) diff --git a/cdc/sink/dmlsink/txn/event.go b/cdc/sink/dmlsink/txn/event.go index 1632bce8823..f4e90e8595f 100644 --- a/cdc/sink/dmlsink/txn/event.go +++ b/cdc/sink/dmlsink/txn/event.go @@ -73,7 +73,7 @@ func genRowKeys(row *model.RowChangedEvent) [][]byte { var keys [][]byte if len(row.Columns) != 0 { for iIdx, idxCol := range row.TableInfo.IndexColumnsOffset { - key := genKeyList(row.GetColumns(), iIdx, idxCol, row.GetTableID()) + key := genKeyList(row.Columns, row.TableInfo, iIdx, idxCol, row.GetTableID()) if len(key) == 0 { continue } @@ -82,7 +82,7 @@ func genRowKeys(row *model.RowChangedEvent) [][]byte { } if len(row.PreColumns) != 0 { for iIdx, idxCol := range row.TableInfo.IndexColumnsOffset { - key := genKeyList(row.GetPreColumns(), iIdx, idxCol, row.GetTableID()) + key := genKeyList(row.PreColumns, row.TableInfo, iIdx, idxCol, row.GetTableID()) if len(key) == 0 { continue } @@ -100,20 +100,20 @@ func genRowKeys(row *model.RowChangedEvent) [][]byte { return keys } -func genKeyList( - columns []*model.Column, iIdx int, colIdx []int, tableID int64, -) []byte { +func genKeyList(columns []*model.ColumnData, tb *model.TableInfo, iIdx int, colIdx []int, tableID int64) []byte { var key []byte for _, i := range colIdx { + col := model.GetColumnDataX(columns[i], tb) + // if a column value is null, we can ignore this index // If the index contain generated column, we can't use this key to detect conflict with other DML, // Because such as insert can't specify the generated value. - if columns[i] == nil || columns[i].Value == nil || columns[i].Flag.IsGeneratedColumn() { + if col.ColumnData == nil || col.Value == nil || col.GetFlag().IsGeneratedColumn() { return nil } - val := model.ColumnValueString(columns[i].Value) - if columnNeeds2LowerCase(columns[i].Type, columns[i].Collation) { + val := model.ColumnValueString(col.Value) + if columnNeeds2LowerCase(col.GetType(), col.GetCollation()) { val = strings.ToLower(val) } diff --git a/cdc/sink/dmlsink/txn/event_test.go b/cdc/sink/dmlsink/txn/event_test.go index bdbb5cd7db9..29d27e13759 100644 --- a/cdc/sink/dmlsink/txn/event_test.go +++ b/cdc/sink/dmlsink/txn/event_test.go @@ -25,24 +25,24 @@ import ( func TestGenKeyListCaseInSensitive(t *testing.T) { t.Parallel() - columns := []*model.Column{ + columns, tb := model.Columns2ColumnDataForTest([]*model.Column{ { Value: "XyZ", Type: mysql.TypeVarchar, Collation: "utf8_unicode_ci", }, - } + }) - first := genKeyList(columns, 0, []int{0}, 1) + first := genKeyList(columns, tb, 0, []int{0}, 1) - columns = []*model.Column{ + columns, tb = model.Columns2ColumnDataForTest([]*model.Column{ { Value: "xYZ", Type: mysql.TypeVarchar, Collation: "utf8_unicode_ci", }, - } - second := genKeyList(columns, 0, []int{0}, 1) + }) + second := genKeyList(columns, tb, 0, []int{0}, 1) require.Equal(t, first, second) } diff --git a/cdc/sink/dmlsink/txn/mysql/dml.go b/cdc/sink/dmlsink/txn/mysql/dml.go index a82d7fc65c4..39dd639f06d 100644 --- a/cdc/sink/dmlsink/txn/mysql/dml.go +++ b/cdc/sink/dmlsink/txn/mysql/dml.go @@ -25,18 +25,19 @@ import ( // prepareUpdate builds a parametrics UPDATE statement as following // sql: `UPDATE `test`.`t` SET {} = ?, {} = ? WHERE {} = ?, {} = {} LIMIT 1` // `WHERE` conditions come from `preCols` and SET clause targets come from `cols`. 
-func prepareUpdate(quoteTable string, preCols, cols []*model.Column, forceReplicate bool) (string, []interface{}) { +func prepareUpdate(quoteTable string, preCols, cols []*model.ColumnData, tb *model.TableInfo, forceReplicate bool) (string, []interface{}) { var builder strings.Builder builder.WriteString("UPDATE " + quoteTable + " SET ") columnNames := make([]string, 0, len(cols)) args := make([]interface{}, 0, len(cols)+len(preCols)) for _, col := range cols { - if col == nil || col.Flag.IsGeneratedColumn() { + colx := model.GetColumnDataX(col, tb) + if colx.ColumnData == nil || colx.GetFlag().IsGeneratedColumn() { continue } - columnNames = append(columnNames, col.Name) - args = appendQueryArgs(args, col) + columnNames = append(columnNames, colx.GetName()) + args = appendQueryArgs(args, colx) } if len(args) == 0 { return "", nil @@ -50,7 +51,7 @@ func prepareUpdate(quoteTable string, preCols, cols []*model.Column, forceReplic } builder.WriteString(" WHERE ") - colNames, wargs := whereSlice(preCols, forceReplicate) + colNames, wargs := whereSlice(preCols, tb, forceReplicate) if len(wargs) == 0 { return "", nil } @@ -74,7 +75,8 @@ func prepareUpdate(quoteTable string, preCols, cols []*model.Column, forceReplic // sql: `REPLACE INTO `test`.`t` VALUES (?,?,?)` func prepareReplace( quoteTable string, - cols []*model.Column, + cols []*model.ColumnData, + tb *model.TableInfo, appendPlaceHolder bool, translateToInsert bool, ) (string, []interface{}) { @@ -82,11 +84,12 @@ func prepareReplace( columnNames := make([]string, 0, len(cols)) args := make([]interface{}, 0, len(cols)) for _, col := range cols { - if col == nil || col.Flag.IsGeneratedColumn() { + colx := model.GetColumnDataX(col, tb) + if colx.ColumnData == nil || colx.GetFlag().IsGeneratedColumn() { continue } - columnNames = append(columnNames, col.Name) - args = appendQueryArgs(args, col) + columnNames = append(columnNames, colx.GetName()) + args = appendQueryArgs(args, colx) } if len(args) == 0 { return "", nil @@ -109,10 +112,11 @@ func prepareReplace( // representation. Because if we use the byte array respresentation, the go-sql-driver // will automatically set `_binary` charset for that column, which is not expected. // See https://github.com/go-sql-driver/mysql/blob/ce134bfc/connection.go#L267 -func appendQueryArgs(args []interface{}, col *model.Column) []interface{} { +func appendQueryArgs(args []interface{}, col model.ColumnDataX) []interface{} { switch v := col.Value.(type) { case []byte: - if col.Charset != "" && col.Charset != charset.CharsetBin { + cst := col.GetCharset() + if cst != "" && cst != charset.CharsetBin { args = append(args, string(v)) return args } @@ -126,11 +130,11 @@ func appendQueryArgs(args []interface{}, col *model.Column) []interface{} { // prepareDelete builds a parametric DELETE statement as following // sql: `DELETE FROM `test`.`t` WHERE x = ? AND y >= ? 
LIMIT 1` -func prepareDelete(quoteTable string, cols []*model.Column, forceReplicate bool) (string, []interface{}) { +func prepareDelete(quoteTable string, cols []*model.ColumnData, tb *model.TableInfo, forceReplicate bool) (string, []interface{}) { var builder strings.Builder builder.WriteString("DELETE FROM " + quoteTable + " WHERE ") - colNames, wargs := whereSlice(cols, forceReplicate) + colNames, wargs := whereSlice(cols, tb, forceReplicate) if len(wargs) == 0 { return "", nil } @@ -153,22 +157,24 @@ func prepareDelete(quoteTable string, cols []*model.Column, forceReplicate bool) // whereSlice builds a parametric WHERE clause as following // sql: `WHERE {} = ? AND {} > ?` -func whereSlice(cols []*model.Column, forceReplicate bool) (colNames []string, args []interface{}) { +func whereSlice(cols []*model.ColumnData, tb *model.TableInfo, forceReplicate bool) (colNames []string, args []interface{}) { // Try to use unique key values when available for _, col := range cols { - if col == nil || !col.Flag.IsHandleKey() { + colx := model.GetColumnDataX(col, tb) + if colx.ColumnData == nil || !colx.GetFlag().IsHandleKey() { continue } - colNames = append(colNames, col.Name) - args = appendQueryArgs(args, col) + colNames = append(colNames, colx.GetName()) + args = appendQueryArgs(args, colx) } // if no explicit row id but force replicate, use all key-values in where condition if len(colNames) == 0 && forceReplicate { colNames = make([]string, 0, len(cols)) args = make([]interface{}, 0, len(cols)) for _, col := range cols { - colNames = append(colNames, col.Name) - args = appendQueryArgs(args, col) + colx := model.GetColumnDataX(col, tb) + colNames = append(colNames, colx.GetName()) + args = appendQueryArgs(args, colx) } } return diff --git a/cdc/sink/dmlsink/txn/mysql/dml_test.go b/cdc/sink/dmlsink/txn/mysql/dml_test.go index ffc3c846982..a3589daa8c1 100644 --- a/cdc/sink/dmlsink/txn/mysql/dml_test.go +++ b/cdc/sink/dmlsink/txn/mysql/dml_test.go @@ -283,7 +283,9 @@ func TestPrepareUpdate(t *testing.T) { }, } for _, tc := range testCases { - query, args := prepareUpdate(tc.quoteTable, tc.preCols, tc.cols, false) + preDatas, info := model.Columns2ColumnDataForTest(tc.preCols) + datas, _ := model.Columns2ColumnDataForTest(tc.cols) + query, args := prepareUpdate(tc.quoteTable, preDatas, datas, info, false) require.Equal(t, tc.expectedSQL, query) require.Equal(t, tc.expectedArgs, args) } @@ -425,7 +427,8 @@ func TestPrepareDelete(t *testing.T) { }, } for _, tc := range testCases { - query, args := prepareDelete(tc.quoteTable, tc.preCols, false) + preDatas, info := model.Columns2ColumnDataForTest(tc.preCols) + query, args := prepareDelete(tc.quoteTable, preDatas, info, false) require.Equal(t, tc.expectedSQL, query) require.Equal(t, tc.expectedArgs, args) } @@ -634,9 +637,10 @@ func TestWhereSlice(t *testing.T) { expectedArgs: []interface{}{1, "你好", 100}, }, } - for _, tc := range testCases { - colNames, args := whereSlice(tc.cols, tc.forceReplicate) - require.Equal(t, tc.expectedColNames, colNames) + for i, tc := range testCases { + datas, info := model.Columns2ColumnDataForTest(tc.cols) + colNames, args := whereSlice(datas, info, tc.forceReplicate) + require.Equal(t, tc.expectedColNames, colNames, "case %d fails", i) require.Equal(t, tc.expectedArgs, args) } } @@ -763,7 +767,8 @@ func TestMapReplace(t *testing.T) { for _, tc := range testCases { // multiple times to verify the stability of column sequence in query string for i := 0; i < 10; i++ { - query, args := prepareReplace(tc.quoteTable, 
tc.cols, false, false) + datas, info := model.Columns2ColumnDataForTest(tc.cols) + query, args := prepareReplace(tc.quoteTable, datas, info, false, false) require.Equal(t, tc.expectedQuery, query) require.Equal(t, tc.expectedArgs, args) } diff --git a/cdc/sink/dmlsink/txn/mysql/mysql.go b/cdc/sink/dmlsink/txn/mysql/mysql.go index 66d819cb5ab..c667d742659 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql.go @@ -574,8 +574,9 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { if len(row.PreColumns) != 0 && len(row.Columns) != 0 { query, args = prepareUpdate( quoteTable, - row.GetPreColumns(), - row.GetColumns(), + row.PreColumns, + row.Columns, + row.TableInfo, s.cfg.ForceReplicate) if query != "" { sqls = append(sqls, query) @@ -587,7 +588,7 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { // Delete Event if len(row.PreColumns) != 0 { - query, args = prepareDelete(quoteTable, row.GetPreColumns(), s.cfg.ForceReplicate) + query, args = prepareDelete(quoteTable, row.PreColumns, row.TableInfo, s.cfg.ForceReplicate) if query != "" { sqls = append(sqls, query) values = append(values, args) @@ -599,11 +600,7 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { // INSERT(not in safe mode) // or REPLACE(in safe mode) SQL. if len(row.Columns) != 0 { - query, args = prepareReplace( - quoteTable, - row.GetColumns(), - true, /* appendPlaceHolder */ - translateToInsert) + query, args = prepareReplace(quoteTable, row.Columns, row.TableInfo, true, translateToInsert) if query != "" { sqls = append(sqls, query) values = append(values, args) diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index ad976303486..7843c1f9d8b 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -1684,6 +1684,9 @@ var doc = `{ "config.LargeMessageHandleConfig": { "type": "object", "properties": { + "claim-check-raw-value": { + "type": "boolean" + }, "claim-check-storage-uri": { "type": "string" }, @@ -1860,7 +1863,7 @@ var doc = `{ "type": "object", "properties": { "advance-timeout-in-sec": { - "description": "AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been\nadvanced for this given duration, the sink will be canceled and re-established.", + "description": "AdvanceTimeoutInSec is a duration in second. 
If a table sink progress hasn't been\nadvanced for this given duration, the sink will be canceled and re-established.\nDeprecated since v8.1.1", "type": "integer" }, "cloud-storage-config": { @@ -1948,6 +1951,10 @@ var doc = `{ "description": "SchemaRegistry is only available when the downstream is MQ using avro protocol.", "type": "string" }, + "send-all-bootstrap-at-start": { + "description": "SendAllBootstrapAtStart determines whether to send all tables bootstrap message at changefeed start.", + "type": "boolean" + }, "send-bootstrap-in-msg-count": { "description": "SendBootstrapInMsgCount means bootstrap messages are being sent every SendBootstrapInMsgCount row change messages.", "type": "integer" @@ -2815,6 +2822,9 @@ var doc = `{ "v2.LargeMessageHandleConfig": { "type": "object", "properties": { + "claim_check_raw_value": { + "type": "boolean" + }, "claim_check_storage_uri": { "type": "string" }, @@ -3218,6 +3228,9 @@ var doc = `{ "schema_registry": { "type": "string" }, + "send-all-bootstrap-at-start": { + "type": "boolean" + }, "send_bootstrap_in_msg_count": { "type": "integer" }, diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 8d4f5d1907a..747d9a6da53 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -1665,6 +1665,9 @@ "config.LargeMessageHandleConfig": { "type": "object", "properties": { + "claim-check-raw-value": { + "type": "boolean" + }, "claim-check-storage-uri": { "type": "string" }, @@ -1841,7 +1844,7 @@ "type": "object", "properties": { "advance-timeout-in-sec": { - "description": "AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been\nadvanced for this given duration, the sink will be canceled and re-established.", + "description": "AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been\nadvanced for this given duration, the sink will be canceled and re-established.\nDeprecated since v8.1.1", "type": "integer" }, "cloud-storage-config": { @@ -1929,6 +1932,10 @@ "description": "SchemaRegistry is only available when the downstream is MQ using avro protocol.", "type": "string" }, + "send-all-bootstrap-at-start": { + "description": "SendAllBootstrapAtStart determines whether to send all tables bootstrap message at changefeed start.", + "type": "boolean" + }, "send-bootstrap-in-msg-count": { "description": "SendBootstrapInMsgCount means bootstrap messages are being sent every SendBootstrapInMsgCount row change messages.", "type": "integer" @@ -2796,6 +2803,9 @@ "v2.LargeMessageHandleConfig": { "type": "object", "properties": { + "claim_check_raw_value": { + "type": "boolean" + }, "claim_check_storage_uri": { "type": "string" }, @@ -3199,6 +3209,9 @@ "schema_registry": { "type": "string" }, + "send-all-bootstrap-at-start": { + "type": "boolean" + }, "send_bootstrap_in_msg_count": { "type": "integer" }, diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 9cb8b63d7de..eb3f89a2b6d 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -204,6 +204,8 @@ definitions: type: object config.LargeMessageHandleConfig: properties: + claim-check-raw-value: + type: boolean claim-check-storage-uri: type: string large-message-handle-compression: @@ -348,6 +350,7 @@ definitions: description: |- AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been advanced for this given duration, the sink will be canceled and re-established. 
+ Deprecated since v8.1.1 type: integer cloud-storage-config: $ref: '#/definitions/config.CloudStorageConfig' @@ -417,6 +420,10 @@ definitions: description: SchemaRegistry is only available when the downstream is MQ using avro protocol. type: string + send-all-bootstrap-at-start: + description: SendAllBootstrapAtStart determines whether to send all tables + bootstrap message at changefeed start. + type: boolean send-bootstrap-in-msg-count: description: SendBootstrapInMsgCount means bootstrap messages are being sent every SendBootstrapInMsgCount row change messages. @@ -1008,6 +1015,8 @@ definitions: type: object v2.LargeMessageHandleConfig: properties: + claim_check_raw_value: + type: boolean claim_check_storage_uri: type: string large_message_handle_compression: @@ -1273,6 +1282,8 @@ definitions: type: boolean schema_registry: type: string + send-all-bootstrap-at-start: + type: boolean send_bootstrap_in_msg_count: type: integer send_bootstrap_interval_in_sec: diff --git a/pkg/sink/codec/canal/canal_entry.go b/pkg/sink/codec/canal/canal_entry.go index bcb4c7c09b7..8925ed88123 100644 --- a/pkg/sink/codec/canal/canal_entry.go +++ b/pkg/sink/codec/canal/canal_entry.go @@ -22,7 +22,6 @@ import ( "github.com/golang/protobuf/proto" // nolint:staticcheck "github.com/pingcap/errors" mm "github.com/pingcap/tidb/pkg/meta/model" - timodel "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" @@ -120,22 +119,22 @@ func (b *canalEntryBuilder) formatValue(value interface{}, isBinary bool) (resul // build the Column in the canal RowData // see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L756-L872 -func (b *canalEntryBuilder) buildColumn(c *model.Column, columnInfo *timodel.ColumnInfo, updated bool) (*canal.Column, error) { - mysqlType := utils.GetMySQLType(columnInfo, b.config.ContentCompatible) - javaType, err := getJavaSQLType(c.Value, c.Type, c.Flag) +func (b *canalEntryBuilder) buildColumn(c model.ColumnDataX, updated bool) (*canal.Column, error) { + mysqlType := utils.GetMySQLType(c.GetColumnInfo(), b.config.ContentCompatible) + javaType, err := getJavaSQLType(c.Value, c.GetType(), c.GetFlag()) if err != nil { return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err) } - value, err := b.formatValue(c.Value, c.Flag.IsBinary()) + value, err := b.formatValue(c.Value, c.GetFlag().IsBinary()) if err != nil { return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err) } canalColumn := &canal.Column{ SqlType: int32(javaType), - Name: c.Name, - IsKey: c.Flag.IsPrimaryKey(), + Name: c.GetName(), + IsKey: c.GetFlag().IsPrimaryKey(), Updated: updated, IsNullPresent: &canal.Column_IsNull{IsNull: c.Value == nil}, Value: value, @@ -147,17 +146,13 @@ func (b *canalEntryBuilder) buildColumn(c *model.Column, columnInfo *timodel.Col // build the RowData of a canal entry func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent, onlyHandleKeyColumns bool) (*canal.RowData, error) { var columns []*canal.Column - colInfos := e.TableInfo.GetColInfosForRowChangedEvent() - for idx, column := range e.GetColumns() { - if column == nil { + for _, col := range e.Columns { + column := model.GetColumnDataX(col, e.TableInfo) + if column.ColumnData == nil { continue } - columnInfo, ok := e.TableInfo.GetColumnInfo(colInfos[idx].ID) - if !ok { - return nil, 
cerror.ErrCanalEncodeFailed.GenWithStack( - "column info not found for column id: %d", colInfos[idx].ID) - } - c, err := b.buildColumn(column, columnInfo, !e.IsDelete()) + + c, err := b.buildColumn(column, !e.IsDelete()) if err != nil { return nil, errors.Trace(err) } @@ -166,19 +161,13 @@ func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent, onlyHandleKey onlyHandleKeyColumns = onlyHandleKeyColumns && e.IsDelete() var preColumns []*canal.Column - for idx, column := range e.GetPreColumns() { - if column == nil { - continue - } - if onlyHandleKeyColumns && !column.Flag.IsHandleKey() { + for _, col := range e.PreColumns { + column := model.GetColumnDataX(col, e.TableInfo) + if column.ColumnData == nil || onlyHandleKeyColumns && !column.GetFlag().IsHandleKey() { continue } - columnInfo, ok := e.TableInfo.GetColumnInfo(colInfos[idx].ID) - if !ok { - return nil, cerror.ErrCanalEncodeFailed.GenWithStack( - "column info not found for column id: %d", colInfos[idx].ID) - } - c, err := b.buildColumn(column, columnInfo, !e.IsDelete()) + + c, err := b.buildColumn(column, !e.IsDelete()) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder.go b/pkg/sink/codec/canal/canal_json_row_event_encoder.go index 76f1f5e52d3..e1efefb3f32 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder.go @@ -32,10 +32,11 @@ import ( ) func fillColumns( - columns []*model.Column, + columns []*model.ColumnData, + tb *model.TableInfo, onlyOutputUpdatedColumn bool, onlyHandleKeyColumn bool, - newColumnMap map[string]*model.Column, + newColumnMap map[string]model.ColumnDataX, out *jwriter.Writer, builder *canalEntryBuilder, ) error { @@ -47,12 +48,13 @@ func fillColumns( out.RawByte('{') isFirst := true for _, col := range columns { - if col != nil { + colx := model.GetColumnDataX(col, tb) + if colx.ColumnData != nil { // column equal, do not output it - if onlyOutputUpdatedColumn && shouldIgnoreColumn(col, newColumnMap) { + if onlyOutputUpdatedColumn && shouldIgnoreColumn(colx, newColumnMap) { continue } - if onlyHandleKeyColumn && !col.Flag.IsHandleKey() { + if onlyHandleKeyColumn && !colx.GetFlag().IsHandleKey() { continue } if isFirst { @@ -60,11 +62,11 @@ func fillColumns( } else { out.RawByte(',') } - value, err := builder.formatValue(col.Value, col.Flag.IsBinary()) + value, err := builder.formatValue(colx.Value, colx.GetFlag().IsBinary()) if err != nil { return cerror.WrapError(cerror.ErrCanalEncodeFailed, err) } - out.String(col.Name) + out.String(colx.GetName()) out.RawByte(':') if col.Value == nil { out.RawString("null") @@ -217,41 +219,30 @@ func newJSONMessageForDML( if e.IsDelete() { out.RawString(",\"old\":null") out.RawString(",\"data\":") - if err := fillColumns( - e.GetPreColumns(), - false, onlyHandleKey, nil, out, builder, - ); err != nil { + if err := fillColumns(e.PreColumns, e.TableInfo, false, onlyHandleKey, nil, out, builder); err != nil { return nil, err } } else if e.IsInsert() { out.RawString(",\"old\":null") out.RawString(",\"data\":") - if err := fillColumns( - e.GetColumns(), - false, onlyHandleKey, nil, out, builder, - ); err != nil { + if err := fillColumns(e.Columns, e.TableInfo, false, onlyHandleKey, nil, out, builder); err != nil { return nil, err } } else if e.IsUpdate() { - var newColsMap map[string]*model.Column + var newColsMap map[string]model.ColumnDataX if config.OnlyOutputUpdatedColumns { - newColsMap = make(map[string]*model.Column, len(e.Columns)) - 
for _, col := range e.GetColumns() { - newColsMap[col.Name] = col + newColsMap = make(map[string]model.ColumnDataX, len(e.Columns)) + for _, col := range e.Columns { + colx := model.GetColumnDataX(col, e.TableInfo) + newColsMap[colx.GetName()] = colx } } out.RawString(",\"old\":") - if err := fillColumns( - e.GetPreColumns(), - config.OnlyOutputUpdatedColumns, onlyHandleKey, newColsMap, out, builder, - ); err != nil { + if err := fillColumns(e.PreColumns, e.TableInfo, config.OnlyOutputUpdatedColumns, onlyHandleKey, newColsMap, out, builder); err != nil { return nil, err } out.RawString(",\"data\":") - if err := fillColumns( - e.GetColumns(), - false, onlyHandleKey, nil, out, builder, - ); err != nil { + if err := fillColumns(e.Columns, e.TableInfo, false, onlyHandleKey, nil, out, builder); err != nil { return nil, err } } else { @@ -549,13 +540,11 @@ func (b *jsonRowEventEncoderBuilder) Build() codec.RowEventEncoder { return newJSONRowEventEncoder(b.config, b.claimCheck) } -func shouldIgnoreColumn(col *model.Column, - newColumnMap map[string]*model.Column, -) bool { - newCol, ok := newColumnMap[col.Name] - if ok && newCol != nil { +func shouldIgnoreColumn(col model.ColumnDataX, newColumnMap map[string]model.ColumnDataX) bool { + newCol, ok := newColumnMap[col.GetName()] + if ok && newCol.ColumnData != nil { // sql type is not equal - if newCol.Type != col.Type { + if newCol.GetType() != col.GetType() { return false } // value equal diff --git a/pkg/sink/codec/craft/model.go b/pkg/sink/codec/craft/model.go index b6a90cb3c61..cbccf444cad 100644 --- a/pkg/sink/codec/craft/model.go +++ b/pkg/sink/codec/craft/model.go @@ -366,7 +366,7 @@ func decodeColumnGroup(bits []byte, allocator *SliceAllocator, dict *termDiction }, nil } -func newColumnGroup(allocator *SliceAllocator, ty byte, columns []*model.Column, onlyHandleKeyColumns bool) (int, *columnGroup) { +func newColumnGroup(allocator *SliceAllocator, ty byte, columns []*model.ColumnData, tb *model.TableInfo, onlyHandleKeyColumns bool) (int, *columnGroup) { l := len(columns) if l == 0 { return 0, nil @@ -378,18 +378,16 @@ func newColumnGroup(allocator *SliceAllocator, ty byte, columns []*model.Column, estimatedSize := 0 idx := 0 for _, col := range columns { - if col == nil { + colx := model.GetColumnDataX(col, tb) + if colx.ColumnData == nil || onlyHandleKeyColumns && !colx.GetFlag().IsHandleKey() { continue } - if onlyHandleKeyColumns && !col.Flag.IsHandleKey() { - continue - } - names[idx] = col.Name - types[idx] = uint64(col.Type) - flags[idx] = uint64(col.Flag) - value := EncodeTiDBType(allocator, col.Type, col.Flag, col.Value) + names[idx] = colx.GetName() + types[idx] = uint64(colx.GetType()) + flags[idx] = uint64(colx.GetFlag()) + value := EncodeTiDBType(allocator, colx.GetType(), colx.GetFlag(), colx.Value) values[idx] = value - estimatedSize += len(col.Name) + len(value) + 16 /* two 64-bits integers */ + estimatedSize += len(colx.GetName()) + len(value) + 16 /* two 64-bits integers */ idx++ } if idx > 0 { @@ -421,7 +419,8 @@ func newRowChangedMessage(allocator *SliceAllocator, ev *model.RowChangedEvent, if size, group := newColumnGroup( allocator, columnGroupTypeNew, - ev.GetColumns(), + ev.Columns, + ev.TableInfo, false); group != nil { groups[idx] = group idx++ @@ -431,7 +430,8 @@ func newRowChangedMessage(allocator *SliceAllocator, ev *model.RowChangedEvent, if size, group := newColumnGroup( allocator, columnGroupTypeOld, - ev.GetPreColumns(), + ev.PreColumns, + ev.TableInfo, onlyHandleKeyColumns); group != nil { groups[idx] = 
group estimatedSize += size diff --git a/pkg/sink/codec/csv/csv_message.go b/pkg/sink/codec/csv/csv_message.go index 7b39b1f247f..c178cf70d06 100644 --- a/pkg/sink/codec/csv/csv_message.go +++ b/pkg/sink/codec/csv/csv_message.go @@ -317,15 +317,15 @@ func fromCsvValToColValue(csvConfig *common.Config, csvVal any, ft types.FieldTy } // fromColValToCsvVal converts column from TiDB type to csv type. -func fromColValToCsvVal(csvConfig *common.Config, col *model.Column, ft *types.FieldType) (any, error) { +func fromColValToCsvVal(csvConfig *common.Config, col model.ColumnDataX, ft *types.FieldType) (any, error) { if col.Value == nil { return nil, nil } - switch col.Type { + switch col.GetType() { case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob: - if col.Flag.IsBinary() { + if col.GetFlag().IsBinary() { if v, ok := col.Value.([]byte); ok { switch csvConfig.BinaryEncodingMethod { case config.BinaryEncodingBase64: @@ -390,7 +390,7 @@ func rowChangedEvent2CSVMsg(csvConfig *common.Config, e *model.RowChangedEvent) if e.IsDelete() { csvMsg.opType = operationDelete - csvMsg.columns, err = rowChangeColumns2CSVColumns(csvConfig, e.GetPreColumns(), e.TableInfo) + csvMsg.columns, err = rowChangeColumns2CSVColumns(csvConfig, e.PreColumns, e.TableInfo) if err != nil { return nil, err } @@ -398,7 +398,7 @@ func rowChangedEvent2CSVMsg(csvConfig *common.Config, e *model.RowChangedEvent) if e.PreColumns == nil { // This is a insert operation. csvMsg.opType = operationInsert - csvMsg.columns, err = rowChangeColumns2CSVColumns(csvConfig, e.GetColumns(), e.TableInfo) + csvMsg.columns, err = rowChangeColumns2CSVColumns(csvConfig, e.Columns, e.TableInfo) if err != nil { return nil, err } @@ -411,12 +411,12 @@ func rowChangedEvent2CSVMsg(csvConfig *common.Config, e *model.RowChangedEvent) fmt.Errorf("the column length of preColumns %d doesn't equal to that of columns %d", len(e.PreColumns), len(e.Columns))) } - csvMsg.preColumns, err = rowChangeColumns2CSVColumns(csvConfig, e.GetPreColumns(), e.TableInfo) + csvMsg.preColumns, err = rowChangeColumns2CSVColumns(csvConfig, e.PreColumns, e.TableInfo) if err != nil { return nil, err } } - csvMsg.columns, err = rowChangeColumns2CSVColumns(csvConfig, e.GetColumns(), e.TableInfo) + csvMsg.columns, err = rowChangeColumns2CSVColumns(csvConfig, e.Columns, e.TableInfo) if err != nil { return nil, err } @@ -448,7 +448,7 @@ func csvMsg2RowChangedEvent(csvConfig *common.Config, csvMsg *csvMessage, tableI return e, nil } -func rowChangeColumns2CSVColumns(csvConfig *common.Config, cols []*model.Column, tableInfo *model.TableInfo) ([]any, error) { +func rowChangeColumns2CSVColumns(csvConfig *common.Config, cols []*model.ColumnData, tableInfo *model.TableInfo) ([]any, error) { var csvColumns []any colInfos := tableInfo.GetColInfosForRowChangedEvent() for i, column := range cols { @@ -458,7 +458,7 @@ func rowChangeColumns2CSVColumns(csvConfig *common.Config, cols []*model.Column, continue } - converted, err := fromColValToCsvVal(csvConfig, column, colInfos[i].Ft) + converted, err := fromColValToCsvVal(csvConfig, model.GetColumnDataX(column, tableInfo), colInfos[i].Ft) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/sink/codec/csv/csv_message_test.go b/pkg/sink/codec/csv/csv_message_test.go index 3ce686bcb33..27b62ce1f58 100644 --- a/pkg/sink/codec/csv/csv_message_test.go +++ b/pkg/sink/codec/csv/csv_message_test.go @@ -973,9 +973,9 @@ func TestCSVMessageEncode(t *testing.T) { 
func TestConvertToCSVType(t *testing.T) { for _, group := range csvTestColumnsGroup { for _, c := range group { - val, _ := fromColValToCsvVal(&common.Config{ - BinaryEncodingMethod: c.BinaryEncodingMethod, - }, &c.col, c.colInfo.Ft) + cfg := &common.Config{BinaryEncodingMethod: c.BinaryEncodingMethod} + col := model.Column2ColumnDataXForTest(&c.col) + val, _ := fromColValToCsvVal(cfg, col, c.colInfo.Ft) require.Equal(t, c.want, val, c.col.Name) } } diff --git a/pkg/sink/codec/debezium/codec.go b/pkg/sink/codec/debezium/codec.go index 75c3ada818a..7c62c66388e 100644 --- a/pkg/sink/codec/debezium/codec.go +++ b/pkg/sink/codec/debezium/codec.go @@ -43,14 +43,15 @@ type dbzCodec struct { func (c *dbzCodec) writeDebeziumFieldValues( writer *util.JSONWriter, fieldName string, - cols []*model.Column, + cols []*model.ColumnData, tableInfo *model.TableInfo, ) error { var err error colInfos := tableInfo.GetColInfosForRowChangedEvent() writer.WriteObjectField(fieldName, func() { for i, col := range cols { - err = c.writeDebeziumFieldValue(writer, col, colInfos[i].Ft) + colx := model.GetColumnDataX(col, tableInfo) + err = c.writeDebeziumFieldValue(writer, colx, colInfos[i].Ft) if err != nil { break } @@ -61,17 +62,17 @@ func (c *dbzCodec) writeDebeziumFieldValues( func (c *dbzCodec) writeDebeziumFieldSchema( writer *util.JSONWriter, - col *model.Column, + col model.ColumnDataX, ft *types.FieldType, ) { - switch col.Type { + switch col.GetType() { case mysql.TypeBit: n := ft.GetFlen() if n == 1 { writer.WriteObjectElement(func() { writer.WriteStringField("type", "boolean") writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) } else { writer.WriteObjectElement(func() { @@ -82,7 +83,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteObjectField("parameters", func() { writer.WriteStringField("length", fmt.Sprintf("%d", n)) }) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) } @@ -91,7 +92,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteObjectElement(func() { writer.WriteStringField("type", "string") writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeEnum: @@ -103,7 +104,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteObjectField("parameters", func() { writer.WriteStringField("allowed", strings.Join(ft.GetElems(), ",")) }) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeSet: @@ -115,14 +116,14 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteObjectField("parameters", func() { writer.WriteStringField("allowed", strings.Join(ft.GetElems(), ",")) }) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeNewDecimal: writer.WriteObjectElement(func() { writer.WriteStringField("type", "double") writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeDate, mysql.TypeNewDate: @@ -131,7 +132,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) writer.WriteStringField("name", "io.debezium.time.Date") writer.WriteIntField("version", 1) - 
writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeDatetime: @@ -144,7 +145,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteStringField("name", "io.debezium.time.MicroTimestamp") } writer.WriteIntField("version", 1) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeTimestamp: @@ -153,7 +154,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) writer.WriteStringField("name", "io.debezium.time.ZonedTimestamp") writer.WriteIntField("version", 1) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeDuration: @@ -162,7 +163,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) writer.WriteStringField("name", "io.debezium.time.MicroTime") writer.WriteIntField("version", 1) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeJSON: @@ -171,14 +172,14 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) writer.WriteStringField("name", "io.debezium.data.Json") writer.WriteIntField("version", 1) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeTiny: // TINYINT writer.WriteObjectElement(func() { writer.WriteStringField("type", "int16") writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeShort: // SMALLINT @@ -189,14 +190,14 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteStringField("type", "int16") } writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeInt24: // MEDIUMINT writer.WriteObjectElement(func() { writer.WriteStringField("type", "int32") writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeLong: // INT @@ -207,28 +208,28 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteStringField("type", "int32") } writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeLonglong: // BIGINT writer.WriteObjectElement(func() { writer.WriteStringField("type", "int64") writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeFloat: writer.WriteObjectElement(func() { writer.WriteStringField("type", "float") writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeDouble: writer.WriteObjectElement(func() { writer.WriteStringField("type", "double") writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeYear: @@ -237,7 +238,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( 
 			writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag()))
 			writer.WriteStringField("name", "io.debezium.time.Year")
 			writer.WriteIntField("version", 1)
-			writer.WriteStringField("field", col.Name)
+			writer.WriteStringField("field", col.GetName())
 		})

 	case mysql.TypeTiDBVectorFloat32:
@@ -245,13 +246,13 @@ func (c *dbzCodec) writeDebeziumFieldSchema(
 			writer.WriteStringField("type", "string")
 			writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag()))
 			writer.WriteStringField("name", "io.debezium.data.TiDBVectorFloat32")
-			writer.WriteStringField("field", col.Name)
+			writer.WriteStringField("field", col.GetName())
 		})

 	default:
 		log.Warn(
 			"meet unsupported field type",
-			zap.Any("fieldType", col.Type),
-			zap.Any("column", col.Name),
+			zap.Any("fieldType", col.GetType()),
+			zap.Any("column", col.GetName()),
 		)
 	}
 }
@@ -261,21 +262,21 @@ func (c *dbzCodec) writeDebeziumFieldSchema(
 //revive:disable indent-error-flow
 func (c *dbzCodec) writeDebeziumFieldValue(
 	writer *util.JSONWriter,
-	col *model.Column,
+	col model.ColumnDataX,
 	ft *types.FieldType,
 ) error {
 	if col.Value == nil {
-		writer.WriteNullField(col.Name)
+		writer.WriteNullField(col.GetName())
 		return nil
 	}
-	switch col.Type {
+	switch col.GetType() {
 	case mysql.TypeBit:
 		v, ok := col.Value.(uint64)
 		if !ok {
 			return cerror.ErrDebeziumEncodeFailed.GenWithStack(
 				"unexpected column value type %T for bit column %s",
 				col.Value,
-				col.Name)
+				col.GetName())
 		}

 		// Debezium behavior:
@@ -284,7 +285,7 @@ func (c *dbzCodec) writeDebeziumFieldValue(
 		// contain the specified number of bits.
 		n := ft.GetFlen()
 		if n == 1 {
-			writer.WriteBoolField(col.Name, v != 0)
+			writer.WriteBoolField(col.GetName(), v != 0)
 			return nil
 		} else {
 			var buf [8]byte
@@ -293,7 +294,7 @@ func (c *dbzCodec) writeDebeziumFieldValue(
 			if n%8 != 0 {
 				numBytes += 1
 			}
-			c.writeBinaryField(writer, col.Name, buf[:numBytes])
+			c.writeBinaryField(writer, col.GetName(), buf[:numBytes])
 			return nil
 		}

@@ -304,14 +305,14 @@ func (c *dbzCodec) writeDebeziumFieldValue(
 			return cerror.ErrDebeziumEncodeFailed.GenWithStack(
 				"unexpected column value type %T for string column %s",
 				col.Value,
-				col.Name)
+				col.GetName())
 		}
-		if col.Flag.IsBinary() {
-			c.writeBinaryField(writer, col.Name, v)
+		if col.GetFlag().IsBinary() {
+			c.writeBinaryField(writer, col.GetName(), v)
 			return nil
 		} else {
-			writer.WriteStringField(col.Name, string(hack.String(v)))
+			writer.WriteStringField(col.GetName(), string(hack.String(v)))
 			return nil
 		}

@@ -321,17 +322,17 @@ func (c *dbzCodec) writeDebeziumFieldValue(
 			return cerror.ErrDebeziumEncodeFailed.GenWithStack(
 				"unexpected column value type %T for enum column %s",
 				col.Value,
-				col.Name)
+				col.GetName())
 		}

 		enumVar, err := types.ParseEnumValue(ft.GetElems(), v)
 		if err != nil {
 			// Invalid enum value inserted in non-strict mode.
-			writer.WriteStringField(col.Name, "")
+			writer.WriteStringField(col.GetName(), "")
 			return nil
 		}

-		writer.WriteStringField(col.Name, enumVar.Name)
+		writer.WriteStringField(col.GetName(), enumVar.Name)
 		return nil

 	case mysql.TypeSet:
@@ -340,17 +341,17 @@ func (c *dbzCodec) writeDebeziumFieldValue(
 			return cerror.ErrDebeziumEncodeFailed.GenWithStack(
 				"unexpected column value type %T for set column %s",
 				col.Value,
-				col.Name)
+				col.GetName())
 		}

 		setVar, err := types.ParseSetValue(ft.GetElems(), v)
 		if err != nil {
 			// Invalid enum value inserted in non-strict mode.
-			writer.WriteStringField(col.Name, "")
+			writer.WriteStringField(col.GetName(), "")
 			return nil
 		}

-		writer.WriteStringField(col.Name, setVar.Name)
+		writer.WriteStringField(col.GetName(), setVar.Name)
 		return nil

 	case mysql.TypeNewDecimal:
@@ -359,7 +360,7 @@ func (c *dbzCodec) writeDebeziumFieldValue(
 			return cerror.ErrDebeziumEncodeFailed.GenWithStack(
 				"unexpected column value type %T for decimal column %s",
 				col.Value,
-				col.Name)
+				col.GetName())
 		}

 		floatV, err := strconv.ParseFloat(v, 64)
@@ -369,7 +370,7 @@ func (c *dbzCodec) writeDebeziumFieldValue(
 				err)
 		}

-		writer.WriteFloat64Field(col.Name, floatV)
+		writer.WriteFloat64Field(col.GetName(), floatV)
 		return nil

 	case mysql.TypeDate, mysql.TypeNewDate:
@@ -378,7 +379,7 @@ func (c *dbzCodec) writeDebeziumFieldValue(
 			return cerror.ErrDebeziumEncodeFailed.GenWithStack(
 				"unexpected column value type %T for date column %s",
 				col.Value,
-				col.Name)
+				col.GetName())
 		}

 		t, err := time.Parse("2006-01-02", v)
@@ -386,15 +387,15 @@ func (c *dbzCodec) writeDebeziumFieldValue(
 			// For example, time may be invalid like 1000-00-00
 			// return nil, nil
 			if mysql.HasNotNullFlag(ft.GetFlag()) {
-				writer.WriteInt64Field(col.Name, 0)
+				writer.WriteInt64Field(col.GetName(), 0)
 				return nil
 			} else {
-				writer.WriteNullField(col.Name)
+				writer.WriteNullField(col.GetName())
 				return nil
 			}
 		}

-		writer.WriteInt64Field(col.Name, t.Unix()/60/60/24)
+		writer.WriteInt64Field(col.GetName(), t.Unix()/60/60/24)
 		return nil

 	case mysql.TypeDatetime:
@@ -408,26 +409,26 @@ func (c *dbzCodec) writeDebeziumFieldValue(
 			return cerror.ErrDebeziumEncodeFailed.GenWithStack(
 				"unexpected column value type %T for datetime column %s",
 				col.Value,
-				col.Name)
+				col.GetName())
 		}

 		t, err := time.Parse("2006-01-02 15:04:05.999999", v)
 		if err != nil {
 			// For example, time may be 1000-00-00
 			if mysql.HasNotNullFlag(ft.GetFlag()) {
-				writer.WriteInt64Field(col.Name, 0)
+				writer.WriteInt64Field(col.GetName(), 0)
 				return nil
 			} else {
-				writer.WriteNullField(col.Name)
+				writer.WriteNullField(col.GetName())
 				return nil
 			}
 		}

 		if ft.GetDecimal() <= 3 {
-			writer.WriteInt64Field(col.Name, t.UnixMilli())
+			writer.WriteInt64Field(col.GetName(), t.UnixMilli())
 			return nil
 		} else {
-			writer.WriteInt64Field(col.Name, t.UnixMicro())
+			writer.WriteInt64Field(col.GetName(), t.UnixMicro())
 			return nil
 		}

@@ -446,7 +447,7 @@ func (c *dbzCodec) writeDebeziumFieldValue(
 			return cerror.ErrDebeziumEncodeFailed.GenWithStack(
 				"unexpected column value type %T for timestamp column %s",
 				col.Value,
-				col.Name)
+				col.GetName())
 		}

 		t, err := time.ParseInLocation("2006-01-02 15:04:05.999999", v, c.config.TimeZone)
@@ -455,7 +456,7 @@ func (c *dbzCodec) writeDebeziumFieldValue(
 			if mysql.HasNotNullFlag(ft.GetFlag()) {
 				t = time.Unix(0, 0)
 			} else {
-				writer.WriteNullField(col.Name)
+				writer.WriteNullField(col.GetName())
 				return nil
 			}
 		}
@@ -468,7 +469,7 @@ func (c *dbzCodec) writeDebeziumFieldValue(
 		}
 		str += "Z"

-		writer.WriteStringField(col.Name, str)
+		writer.WriteStringField(col.GetName(), str)
 		return nil

 	case mysql.TypeDuration:
@@ -480,7 +481,7 @@ func (c *dbzCodec) writeDebeziumFieldValue(
 			return cerror.ErrDebeziumEncodeFailed.GenWithStack(
 				"unexpected column value type %T for time column %s",
 				col.Value,
-				col.Name)
+				col.GetName())
 		}

 		d, _, _, err := types.StrToDuration(types.DefaultStmtNoWarningContext, v, ft.GetDecimal())
@@ -490,11 +491,11 @@ func (c *dbzCodec) writeDebeziumFieldValue(
 				err)
 		}

-		writer.WriteInt64Field(col.Name, d.Microseconds())
+		writer.WriteInt64Field(col.GetName(), d.Microseconds())
 		return nil

 	case mysql.TypeLonglong:
-		if col.Flag.IsUnsigned() {
+		if col.GetFlag().IsUnsigned() {
 			// Handle with BIGINT UNSIGNED.
 			// Debezium always produce INT64 instead of UINT64 for BIGINT.
 			v, ok := col.Value.(uint64)
@@ -502,16 +503,16 @@ func (c *dbzCodec) writeDebeziumFieldValue(
 				return cerror.ErrDebeziumEncodeFailed.GenWithStack(
 					"unexpected column value type %T for unsigned bigint column %s",
 					col.Value,
-					col.Name)
+					col.GetName())
 			}

-			writer.WriteInt64Field(col.Name, int64(v))
+			writer.WriteInt64Field(col.GetName(), int64(v))
 			return nil
 		}

 	case mysql.TypeTiDBVectorFloat32:
 		v := col.Value.(types.VectorFloat32).String()
-		writer.WriteStringField(col.Name, v)
+		writer.WriteStringField(col.GetName(), v)
 		return nil

 		// Note: Although Debezium's doc claims to use INT32 for INT, but it
@@ -519,7 +520,7 @@ func (c *dbzCodec) writeDebeziumFieldValue(
 		// So we only handle with TypeLonglong here.
 	}

-	writer.WriteAnyField(col.Name, col.Value)
+	writer.WriteAnyField(col.GetName(), col.Value)
 	return nil
 }

@@ -589,18 +590,18 @@ func (c *dbzCodec) EncodeRowChangedEvent(
 			// after: An optional field that specifies the state of the row after the event occurred.
 			// Optional field that specifies the state of the row after the event occurred.
 			// In a delete event value, the after field is null, signifying that the row no longer exists.
-			err = c.writeDebeziumFieldValues(jWriter, "after", e.GetColumns(), e.TableInfo)
+			err = c.writeDebeziumFieldValues(jWriter, "after", e.Columns, e.TableInfo)
 		} else if e.IsDelete() {
 			jWriter.WriteStringField("op", "d")
 			jWriter.WriteNullField("after")
-			err = c.writeDebeziumFieldValues(jWriter, "before", e.GetPreColumns(), e.TableInfo)
+			err = c.writeDebeziumFieldValues(jWriter, "before", e.PreColumns, e.TableInfo)
 		} else if e.IsUpdate() {
 			jWriter.WriteStringField("op", "u")
 			if c.config.DebeziumOutputOldValue {
-				err = c.writeDebeziumFieldValues(jWriter, "before", e.GetPreColumns(), e.TableInfo)
+				err = c.writeDebeziumFieldValues(jWriter, "before", e.PreColumns, e.TableInfo)
 			}
 			if err == nil {
-				err = c.writeDebeziumFieldValues(jWriter, "after", e.GetColumns(), e.TableInfo)
+				err = c.writeDebeziumFieldValues(jWriter, "after", e.Columns, e.TableInfo)
 			}
 		}
 	})
@@ -621,17 +622,18 @@ func (c *dbzCodec) EncodeRowChangedEvent(
 	{
 		fieldsBuf := &bytes.Buffer{}
 		fieldsWriter := util.BorrowJSONWriter(fieldsBuf)
-		var validCols []*model.Column
+		var validCols []*model.ColumnData
 		if e.IsInsert() {
-			validCols = e.GetColumns()
+			validCols = e.Columns
 		} else if e.IsDelete() {
-			validCols = e.GetPreColumns()
+			validCols = e.PreColumns
 		} else if e.IsUpdate() {
-			validCols = e.GetColumns()
+			validCols = e.Columns
 		}
 		colInfos := e.TableInfo.GetColInfosForRowChangedEvent()
 		for i, col := range validCols {
-			c.writeDebeziumFieldSchema(fieldsWriter, col, colInfos[i].Ft)
+			colx := model.GetColumnDataX(col, e.TableInfo)
+			c.writeDebeziumFieldSchema(fieldsWriter, colx, colInfos[i].Ft)
 		}
 		util.ReturnJSONWriter(fieldsWriter)
 		fieldsJSON = fieldsBuf.String()
diff --git a/pkg/sink/codec/internal/column.go b/pkg/sink/codec/internal/column.go
index 0070f3f138c..9a360e4eeb9 100644
--- a/pkg/sink/codec/internal/column.go
+++ b/pkg/sink/codec/internal/column.go
@@ -29,24 +29,23 @@ import (
 type Column struct {
 	Type byte `json:"t"`
 	// Deprecated: please use Flag instead.
-	WhereHandle *bool `json:"h,omitempty"`
+	WhereHandle bool `json:"h,omitempty"`
 	Flag model.ColumnFlagType `json:"f"`
 	Value any `json:"v"`
 }

 // FromRowChangeColumn converts from a row changed column to a codec column.
-func (c *Column) FromRowChangeColumn(col *model.Column) {
-	c.Type = col.Type
-	c.Flag = col.Flag
+func (c *Column) FromRowChangeColumn(col model.ColumnDataX) {
+	c.Type = col.GetType()
+	c.Flag = col.GetFlag()
 	if c.Flag.IsHandleKey() {
-		whereHandle := true
-		c.WhereHandle = &whereHandle
+		c.WhereHandle = true
 	}
 	if col.Value == nil {
 		c.Value = nil
 		return
 	}
-	switch col.Type {
+	switch c.Type {
 	case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar:
 		var str string
 		switch col.Value.(type) {
diff --git a/pkg/sink/codec/open/open_protocol_message.go b/pkg/sink/codec/open/open_protocol_message.go
index 2e2a64df5dc..d9124c2063e 100644
--- a/pkg/sink/codec/open/open_protocol_message.go
+++ b/pkg/sink/codec/open/open_protocol_message.go
@@ -140,14 +140,14 @@ func rowChangeToMsg(
 	value := &messageRow{}
 	if e.IsDelete() {
 		onlyHandleKeyColumns := config.DeleteOnlyHandleKeyColumns || largeMessageOnlyHandleKeyColumns
-		value.Delete = rowChangeColumns2CodecColumns(e.GetPreColumns(), onlyHandleKeyColumns)
+		value.Delete = rowChangeColumns2CodecColumns(e.PreColumns, e.TableInfo, onlyHandleKeyColumns)
 		if onlyHandleKeyColumns && len(value.Delete) == 0 {
 			return nil, nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found handle key columns for the delete event")
 		}
 	} else if e.IsUpdate() {
-		value.Update = rowChangeColumns2CodecColumns(e.GetColumns(), largeMessageOnlyHandleKeyColumns)
+		value.Update = rowChangeColumns2CodecColumns(e.Columns, e.TableInfo, largeMessageOnlyHandleKeyColumns)
 		if config.OpenOutputOldValue {
-			value.PreColumns = rowChangeColumns2CodecColumns(e.GetPreColumns(), largeMessageOnlyHandleKeyColumns)
+			value.PreColumns = rowChangeColumns2CodecColumns(e.PreColumns, e.TableInfo, largeMessageOnlyHandleKeyColumns)
 		}
 		if largeMessageOnlyHandleKeyColumns &&
 			(len(value.Update) == 0 || (len(value.PreColumns) == 0 && config.OpenOutputOldValue)) {
@@ -157,7 +157,7 @@ func rowChangeToMsg(
 			value.dropNotUpdatedColumns()
 		}
 	} else {
-		value.Update = rowChangeColumns2CodecColumns(e.GetColumns(), largeMessageOnlyHandleKeyColumns)
+		value.Update = rowChangeColumns2CodecColumns(e.Columns, e.TableInfo, largeMessageOnlyHandleKeyColumns)
 		if largeMessageOnlyHandleKeyColumns && len(value.Update) == 0 {
 			return nil, nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found handle key columns for the insert event")
 		}
@@ -198,18 +198,16 @@ func msgToRowChange(key *internal.MessageKey, value *messageRow) *model.RowChang
 	return e
 }

-func rowChangeColumns2CodecColumns(cols []*model.Column, onlyHandleKeyColumns bool) map[string]internal.Column {
+func rowChangeColumns2CodecColumns(cols []*model.ColumnData, tb *model.TableInfo, onlyHandleKeyColumns bool) map[string]internal.Column {
 	jsonCols := make(map[string]internal.Column, len(cols))
 	for _, col := range cols {
-		if col == nil {
-			continue
-		}
-		if onlyHandleKeyColumns && !col.Flag.IsHandleKey() {
-			continue
-		}
+		colx := model.GetColumnDataX(col, tb)
+		if colx.ColumnData == nil || onlyHandleKeyColumns && !colx.GetFlag().IsHandleKey() {
+			continue
+		}
 		c := internal.Column{}
-		c.FromRowChangeColumn(col)
-		jsonCols[col.Name] = c
+		c.FromRowChangeColumn(colx)
+		jsonCols[colx.GetName()] = c
 	}
 	if len(jsonCols) == 0 {
 		return nil
diff --git a/pkg/sink/codec/open/open_protocol_message_test.go b/pkg/sink/codec/open/open_protocol_message_test.go
index 51ba13d5439..94744353fb4 100644
--- a/pkg/sink/codec/open/open_protocol_message_test.go
+++ b/pkg/sink/codec/open/open_protocol_message_test.go
@@ -60,7 +60,7 @@ func TestNonBinaryStringCol(t *testing.T) {
 		Value: "value",
 	}
 	mqCol := internal.Column{}
-	mqCol.FromRowChangeColumn(col)
+	mqCol.FromRowChangeColumn(model.Column2ColumnDataXForTest(col))
 	row := &messageRow{Update: map[string]internal.Column{"test": mqCol}}
 	rowEncode, err := row.encode()
 	require.NoError(t, err)
@@ -83,7 +83,7 @@ func TestVarBinaryCol(t *testing.T) {
 		Value: []byte{0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A},
 	}
 	mqCol := internal.Column{}
-	mqCol.FromRowChangeColumn(col)
+	mqCol.FromRowChangeColumn(model.Column2ColumnDataXForTest(col))
 	row := &messageRow{Update: map[string]internal.Column{"test": mqCol}}
 	rowEncode, err := row.encode()
 	require.NoError(t, err)
diff --git a/pkg/util/bitflag.go b/pkg/util/bitflag.go
index 58c446794ca..089e521ec05 100644
--- a/pkg/util/bitflag.go
+++ b/pkg/util/bitflag.go
@@ -17,9 +17,9 @@ package util
 type Flag uint64

 // HasAll means has all flags
-func (f *Flag) HasAll(flags ...Flag) bool {
+func (f Flag) HasAll(flags ...Flag) bool {
 	for _, flag := range flags {
-		if flag&*f == 0 {
+		if flag&f == 0 {
 			return false
 		}
 	}
@@ -27,9 +27,9 @@ func (f *Flag) HasAll(flags ...Flag) bool {
 }

 // HasOne means has one of the flags
-func (f *Flag) HasOne(flags ...Flag) bool {
+func (f Flag) HasOne(flags ...Flag) bool {
 	for _, flag := range flags {
-		if flag&*f != 0 {
+		if flag&f != 0 {
 			return true
 		}
 	}
diff --git a/tools/check/go.sum b/tools/check/go.sum
index 3f344cd5950..58d14e98667 100644
--- a/tools/check/go.sum
+++ b/tools/check/go.sum
@@ -879,6 +879,7 @@ golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
 golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
 golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
 golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
+golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
 golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=