Skip to content

Commit

Permalink
codec(ticdc): canal-json support compatible content by output detaile…
Browse files Browse the repository at this point in the history
…d mysql type information (#10014) (#11415)

close #10106
  • Loading branch information
ti-chi-bot authored Jul 26, 2024
1 parent e652cf1 commit c7a5513
Show file tree
Hide file tree
Showing 37 changed files with 2,013 additions and 346 deletions.
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
EnableKafkaSinkV2: c.Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: c.Sink.OnlyOutputUpdatedColumns,
DeleteOnlyOutputHandleKeyColumns: c.Sink.DeleteOnlyOutputHandleKeyColumns,
ContentCompatible: c.Sink.ContentCompatible,
KafkaConfig: kafkaConfig,
MySQLConfig: mysqlConfig,
PulsarConfig: pulsarConfig,
Expand Down Expand Up @@ -795,6 +796,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
EnableKafkaSinkV2: cloned.Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: cloned.Sink.OnlyOutputUpdatedColumns,
DeleteOnlyOutputHandleKeyColumns: cloned.Sink.DeleteOnlyOutputHandleKeyColumns,
ContentCompatible: cloned.Sink.ContentCompatible,
KafkaConfig: kafkaConfig,
MySQLConfig: mysqlConfig,
PulsarConfig: pulsarConfig,
Expand Down Expand Up @@ -992,6 +994,7 @@ type SinkConfig struct {
EnableKafkaSinkV2 *bool `json:"enable_kafka_sink_v2,omitempty"`
OnlyOutputUpdatedColumns *bool `json:"only_output_updated_columns,omitempty"`
DeleteOnlyOutputHandleKeyColumns *bool `json:"delete_only_output_handle_key_columns"`
ContentCompatible *bool `json:"content_compatible"`
SafeMode *bool `json:"safe_mode,omitempty"`
KafkaConfig *KafkaConfig `json:"kafka_config,omitempty"`
PulsarConfig *PulsarConfig `json:"pulsar_config,omitempty"`
Expand Down
1 change: 1 addition & 0 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var defaultAPIConfig = &ReplicaConfig{
EnableKafkaSinkV2: util.AddressOf(false),
OnlyOutputUpdatedColumns: util.AddressOf(false),
DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
ContentCompatible: util.AddressOf(false),
AdvanceTimeoutInSec: util.AddressOf(uint(150)),
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
Expand Down
1 change: 1 addition & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ func (info *ChangeFeedInfo) rmMQOnlyFields() {
info.Config.Sink.EnableKafkaSinkV2 = nil
info.Config.Sink.OnlyOutputUpdatedColumns = nil
info.Config.Sink.DeleteOnlyOutputHandleKeyColumns = nil
info.Config.Sink.ContentCompatible = nil
info.Config.Sink.KafkaConfig = nil
}

Expand Down
3 changes: 3 additions & 0 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ func (ti *TableInfo) initColumnsFlag() {
if mysql.HasUnsignedFlag(colInfo.GetFlag()) {
flag.SetIsUnsigned()
}
if mysql.HasZerofillFlag(colInfo.GetFlag()) {
flag.SetZeroFill()
}
ti.ColumnsFlag[colInfo.ID] = flag
}

Expand Down
12 changes: 12 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,20 @@ const (
NullableFlag
// UnsignedFlag means the column stores an unsigned integer
UnsignedFlag
// ZerofillFlag means the column is declared with the MySQL ZEROFILL attribute
ZerofillFlag
)

// SetZeroFill marks the column as zerofill by adding ZerofillFlag
// to the receiver's flag set.
func (b *ColumnFlagType) SetZeroFill() {
	flags := (*util.Flag)(b)
	flags.Add(util.Flag(ZerofillFlag))
}

// IsZerofill reports whether ZerofillFlag is present in the
// receiver's flag set.
func (b *ColumnFlagType) IsZerofill() bool {
	flags := (*util.Flag)(b)
	return flags.HasAll(util.Flag(ZerofillFlag))
}

// SetIsBinary sets BinaryFlag
func (b *ColumnFlagType) SetIsBinary() {
(*util.Flag)(b).Add(util.Flag(BinaryFlag))
Expand Down
6 changes: 5 additions & 1 deletion cdc/sink/dmlsink/mq/mq_dml_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"testing"
"time"

"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/types"
"github.com/pingcap/tidb/pkg/util/rowcodec"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
Expand Down Expand Up @@ -85,7 +88,8 @@ func TestWriteEvents(t *testing.T) {
row := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
}

events := make([]*dmlsink.CallbackableEvent[*model.SingleTableTxn], 0, 3000)
Expand Down
33 changes: 23 additions & 10 deletions cdc/sink/dmlsink/mq/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (
"testing"
"time"

"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/rowcodec"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
Expand Down Expand Up @@ -83,7 +86,8 @@ func TestNonBatchEncode_SendMessages(t *testing.T) {
row := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
}
tableStatus := state.TableSinkSinking

Expand Down Expand Up @@ -354,7 +358,8 @@ func TestBatchEncode_SendMessages(t *testing.T) {
Event: &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -366,7 +371,8 @@ func TestBatchEncode_SendMessages(t *testing.T) {
Event: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -378,7 +384,8 @@ func TestBatchEncode_SendMessages(t *testing.T) {
Event: &model.RowChangedEvent{
CommitTs: 3,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "cc"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "cc"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -390,7 +397,8 @@ func TestBatchEncode_SendMessages(t *testing.T) {
Event: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "aa", Table: "bb"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -402,7 +410,8 @@ func TestBatchEncode_SendMessages(t *testing.T) {
Event: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "aaa", Table: "bbb"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
},
Callback: func() {},
SinkState: &tableStatus,
Expand All @@ -414,7 +423,8 @@ func TestBatchEncode_SendMessages(t *testing.T) {
Event: &model.RowChangedEvent{
CommitTs: 3,
Table: &model.TableName{Schema: "aaa", Table: "bbb"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
},
Callback: func() {},
SinkState: &tableStatus,
Expand Down Expand Up @@ -494,7 +504,8 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
Event: &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "aa"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
},
Callback: func() {},
SinkState: &replicatingStatus,
Expand All @@ -506,7 +517,8 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
Event: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "bb"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
},
Callback: func() {},
SinkState: &replicatingStatus,
Expand All @@ -518,7 +530,8 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
Event: &model.RowChangedEvent{
CommitTs: 3,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "cc"}},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: "cc"}},
ColInfos: []rowcodec.ColInfo{{ID: 1, Ft: types.NewFieldType(mysql.TypeVarchar)}},
},
Callback: func() {},
SinkState: &stoppedStatus,
Expand Down
2 changes: 2 additions & 0 deletions pkg/cmd/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) {
EnableKafkaSinkV2: util.AddressOf(false),
OnlyOutputUpdatedColumns: util.AddressOf(false),
DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
ContentCompatible: util.AddressOf(false),
Protocol: util.AddressOf("open-protocol"),
AdvanceTimeoutInSec: util.AddressOf(uint(150)),
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
Expand Down Expand Up @@ -251,6 +252,7 @@ func TestAndWriteStorageSinkTOML(t *testing.T) {
},
OnlyOutputUpdatedColumns: util.AddressOf(false),
DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
ContentCompatible: util.AddressOf(false),
AdvanceTimeoutInSec: util.AddressOf(uint(150)),
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
Expand Down
9 changes: 6 additions & 3 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ const (
}
],
"enable-partition-separator": true,
"protocol": "open-protocol",
"protocol": "canal-json",
"enable-kafka-sink-v2": false,
"only-output-updated-columns": false,
"delete-only-output-handle-key-columns": false,
"content-compatible": false,
"large-message-handle": {
"large-message-handle-option": "none",
"large-message-handle-compression": "",
Expand Down Expand Up @@ -211,7 +212,7 @@ const (
},
"sink": {
"encoder-concurrency": 32,
"protocol": "open-protocol",
"protocol": "canal-json",
"column-selectors": [
{
"matcher": [
Expand All @@ -235,6 +236,7 @@ const (
"enable-kafka-sink-v2": true,
"only-output-updated-columns": true,
"delete-only-output-handle-key-columns": true,
"content-compatible": true,
"safe-mode": true,
"terminator": "\r\n",
"transaction-atomicity": "",
Expand Down Expand Up @@ -378,7 +380,7 @@ const (
"sink": {
"encoder-concurrency": 32,
"dispatchers": null,
"protocol": "open-protocol",
"protocol": "canal-json",
"column-selectors": [
{
"matcher": [
Expand All @@ -404,6 +406,7 @@ const (
"enable-kafka-sink-v2": true,
"only-output-updated-columns": true,
"delete-only-output-handle-key-columns": true,
"content-compatible": true,
"safe-mode": true,
"kafka-config": {
"partition-num": 1,
Expand Down
1 change: 1 addition & 0 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ var defaultReplicaConfig = &ReplicaConfig{
EnableKafkaSinkV2: util.AddressOf(false),
OnlyOutputUpdatedColumns: util.AddressOf(false),
DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
ContentCompatible: util.AddressOf(false),
TiDBSourceID: DefaultTiDBSourceID,
AdvanceTimeoutInSec: util.AddressOf(DefaultAdvanceTimeoutInSec),
SendBootstrapIntervalInSec: util.AddressOf(DefaultSendBootstrapIntervalInSec),
Expand Down
5 changes: 3 additions & 2 deletions pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestReplicaConfigMarshal(t *testing.T) {
conf.ForceReplicate = true
conf.Filter.Rules = []string{"1.1"}
conf.Mounter.WorkerNum = 3
conf.Sink.Protocol = util.AddressOf("open-protocol")
conf.Sink.Protocol = util.AddressOf("canal-json")
conf.Sink.ColumnSelectors = []*ColumnSelector{
{
Matcher: []string{"1.1"},
Expand All @@ -66,6 +66,7 @@ func TestReplicaConfigMarshal(t *testing.T) {

conf.Sink.OnlyOutputUpdatedColumns = aws.Bool(true)
conf.Sink.DeleteOnlyOutputHandleKeyColumns = aws.Bool(true)
conf.Sink.ContentCompatible = aws.Bool(true)
conf.Sink.SafeMode = aws.Bool(true)
conf.Sink.AdvanceTimeoutInSec = util.AddressOf(uint(150))
conf.Sink.KafkaConfig = &KafkaConfig{
Expand Down Expand Up @@ -180,7 +181,7 @@ func TestReplicaConfigOutDated(t *testing.T) {
conf.ForceReplicate = true
conf.Filter.Rules = []string{"1.1"}
conf.Mounter.WorkerNum = 3
conf.Sink.Protocol = util.AddressOf("open-protocol")
conf.Sink.Protocol = util.AddressOf("canal-json")
conf.Sink.DispatchRules = []*DispatchRule{
{Matcher: []string{"a.b"}, DispatcherRule: "r1"},
{Matcher: []string{"a.c"}, DispatcherRule: "r2"},
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ type SinkConfig struct {
// DeleteOnlyOutputHandleKeyColumns is only available when the downstream is MQ.
DeleteOnlyOutputHandleKeyColumns *bool `toml:"delete-only-output-handle-key-columns" json:"delete-only-output-handle-key-columns,omitempty"`

// ContentCompatible is only available when the downstream is MQ.
ContentCompatible *bool `toml:"content-compatible" json:"content-compatible,omitempty"`

// TiDBSourceID is the source ID of the upstream TiDB,
// which is used to set the `tidb_cdc_write_source` session variable.
// Note: This field is only used internally and only used in the MySQL sink.
Expand Down
3 changes: 3 additions & 0 deletions pkg/orchestrator/reactor_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func TestChangefeedStateUpdate(t *testing.T) {
EnablePartitionSeparator: config.GetDefaultReplicaConfig().Sink.EnablePartitionSeparator,
EnableKafkaSinkV2: config.GetDefaultReplicaConfig().Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: config.GetDefaultReplicaConfig().Sink.OnlyOutputUpdatedColumns,
ContentCompatible: config.GetDefaultReplicaConfig().Sink.ContentCompatible,
DeleteOnlyOutputHandleKeyColumns: config.GetDefaultReplicaConfig().Sink.DeleteOnlyOutputHandleKeyColumns,
SendBootstrapIntervalInSec: config.GetDefaultReplicaConfig().Sink.SendBootstrapIntervalInSec,
SendBootstrapInMsgCount: config.GetDefaultReplicaConfig().Sink.SendBootstrapInMsgCount,
Expand Down Expand Up @@ -193,6 +194,7 @@ func TestChangefeedStateUpdate(t *testing.T) {
EnablePartitionSeparator: config.GetDefaultReplicaConfig().Sink.EnablePartitionSeparator,
EnableKafkaSinkV2: config.GetDefaultReplicaConfig().Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: config.GetDefaultReplicaConfig().Sink.OnlyOutputUpdatedColumns,
ContentCompatible: config.GetDefaultReplicaConfig().Sink.ContentCompatible,
DeleteOnlyOutputHandleKeyColumns: config.GetDefaultReplicaConfig().Sink.DeleteOnlyOutputHandleKeyColumns,
SendBootstrapIntervalInSec: config.GetDefaultReplicaConfig().Sink.SendBootstrapIntervalInSec,
SendBootstrapInMsgCount: config.GetDefaultReplicaConfig().Sink.SendBootstrapInMsgCount,
Expand Down Expand Up @@ -267,6 +269,7 @@ func TestChangefeedStateUpdate(t *testing.T) {
EnablePartitionSeparator: config.GetDefaultReplicaConfig().Sink.EnablePartitionSeparator,
EnableKafkaSinkV2: config.GetDefaultReplicaConfig().Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: config.GetDefaultReplicaConfig().Sink.OnlyOutputUpdatedColumns,
ContentCompatible: config.GetDefaultReplicaConfig().Sink.ContentCompatible,
DeleteOnlyOutputHandleKeyColumns: config.GetDefaultReplicaConfig().Sink.DeleteOnlyOutputHandleKeyColumns,
SendBootstrapIntervalInSec: config.GetDefaultReplicaConfig().Sink.SendBootstrapIntervalInSec,
SendBootstrapInMsgCount: config.GetDefaultReplicaConfig().Sink.SendBootstrapInMsgCount,
Expand Down
Loading

0 comments on commit c7a5513

Please sign in to comment.