diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index b8472ff0122..6e365fe9d91 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -375,6 +375,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( InsecureSkipVerify: c.Sink.KafkaConfig.InsecureSkipVerify, CodecConfig: codeConfig, LargeMessageHandle: largeMessageHandle, + OutputRawChangeEvent: c.Sink.KafkaConfig.OutputRawChangeEvent, } } var mysqlConfig *config.MySQLConfig @@ -400,13 +401,14 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( var cloudStorageConfig *config.CloudStorageConfig if c.Sink.CloudStorageConfig != nil { cloudStorageConfig = &config.CloudStorageConfig{ - WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, - FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, - FileSize: c.Sink.CloudStorageConfig.FileSize, - OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, - FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays, - FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec, - FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency, + WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, + FileSize: c.Sink.CloudStorageConfig.FileSize, + OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, + FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays, + FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec, + FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency, + OutputRawChangeEvent: c.Sink.CloudStorageConfig.OutputRawChangeEvent, } } @@ -606,6 +608,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { InsecureSkipVerify: cloned.Sink.KafkaConfig.InsecureSkipVerify, CodecConfig: codeConfig, LargeMessageHandle: largeMessageHandle, + OutputRawChangeEvent: cloned.Sink.KafkaConfig.OutputRawChangeEvent, } } var mysqlConfig *MySQLConfig @@ -631,13 +634,14 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { var cloudStorageConfig *CloudStorageConfig if cloned.Sink.CloudStorageConfig != nil { cloudStorageConfig = &CloudStorageConfig{ - WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, - FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, - FileSize: cloned.Sink.CloudStorageConfig.FileSize, - OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, - FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays, - FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec, - FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency, + WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, + FileSize: cloned.Sink.CloudStorageConfig.FileSize, + OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, + FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays, + FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec, + FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency, + OutputRawChangeEvent: cloned.Sink.CloudStorageConfig.OutputRawChangeEvent, } } @@ -1081,7 +1085,8 @@ type KafkaConfig struct { InsecureSkipVerify *bool `json:"insecure_skip_verify,omitempty"` CodecConfig *CodecConfig `json:"codec_config,omitempty"` - LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"` + LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"` + OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"` } // 
MySQLConfig represents a MySQL sink configuration @@ -1105,13 +1110,14 @@ type MySQLConfig struct { // CloudStorageConfig represents a cloud storage sink configuration type CloudStorageConfig struct { - WorkerCount *int `json:"worker_count,omitempty"` - FlushInterval *string `json:"flush_interval,omitempty"` - FileSize *int `json:"file_size,omitempty"` - OutputColumnID *bool `json:"output_column_id,omitempty"` - FileExpirationDays *int `json:"file_expiration_days,omitempty"` - FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"` - FlushConcurrency *int `json:"flush_concurrency,omitempty"` + WorkerCount *int `json:"worker_count,omitempty"` + FlushInterval *string `json:"flush_interval,omitempty"` + FileSize *int `json:"file_size,omitempty"` + OutputColumnID *bool `json:"output_column_id,omitempty"` + FileExpirationDays *int `json:"file_expiration_days,omitempty"` + FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"` + FlushConcurrency *int `json:"flush_concurrency,omitempty"` + OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"` } // ChangefeedStatus holds common information of a changefeed in cdc diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 64c8244a7c6..b3cd19420af 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -272,7 +272,7 @@ func (r *RedoLog) GetCommitTs() Ts { } // TrySplitAndSortUpdateEvent redo log do nothing -func (r *RedoLog) TrySplitAndSortUpdateEvent(_ string) error { +func (r *RedoLog) TrySplitAndSortUpdateEvent(_ string, _ bool) error { return nil } @@ -380,7 +380,7 @@ func (r *RowChangedEvent) GetCommitTs() uint64 { } // TrySplitAndSortUpdateEvent do nothing -func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ string) error { +func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ string, _ bool) error { return nil } @@ -758,10 +758,19 @@ func (t *SingleTableTxn) GetCommitTs() uint64 { } // TrySplitAndSortUpdateEvent split update events if unique key is updated -func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error { - if t.dontSplitUpdateEvent(scheme) { +func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error { + if sink.IsMySQLCompatibleScheme(scheme) || outputRawChangeEvent { + // For MySQL Sink, all update events will be split into insert and delete at the puller side + // according to whether the changefeed is in safemode. We don't split update event here(in sink) + // since there may be OOM issues. For more information, ref https://github.com/tikv/tikv/issues/17062. + // + // For the Kafka and Storage sink, the outputRawChangeEvent parameter is introduced to control + // split behavior. TiCDC only output original change event if outputRawChangeEvent is true. return nil } + + // Try to split update events for the Kafka and Storage sink if outputRawChangeEvent is false. + // Note it is only for backward compatibility, and we should remove this logic in the future. newRows, err := trySplitAndSortUpdateEvent(t.Rows) if err != nil { return errors.Trace(err) @@ -770,21 +779,6 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error { return nil } -// Whether split a single update event into delete and insert events? -// -// For the MySQL Sink, we don't split any update event. -// This may cause error like "duplicate entry" when sink to the downstream. -// This kind of error will cause the changefeed to restart, -// and then the related update rows will be splitted to insert and delete at puller side. 
-// -// For the Kafka and Storage sink, always split a single unique key changed update event, since: -// 1. Avro and CSV does not output the previous column values for the update event, so it would -// cause consumer missing data if the unique key changed event is not split. -// 2. Index-Value Dispatcher cannot work correctly if the unique key changed event isn't split. -func (t *SingleTableTxn) dontSplitUpdateEvent(scheme string) bool { - return sink.IsMySQLCompatibleScheme(scheme) -} - // trySplitAndSortUpdateEvent try to split update events if unique key is updated // returns true if some updated events is split func trySplitAndSortUpdateEvent( @@ -794,8 +788,7 @@ func trySplitAndSortUpdateEvent( split := false for _, e := range events { if e == nil { - log.Warn("skip emit nil event", - zap.Any("event", e)) + log.Warn("skip emit nil event", zap.Any("event", e)) continue } @@ -805,8 +798,7 @@ func trySplitAndSortUpdateEvent( // begin; insert into t (id) values (1); delete from t where id=1; commit; // Just ignore these row changed events. if colLen == 0 && preColLen == 0 { - log.Warn("skip emit empty row event", - zap.Any("event", e)) + log.Warn("skip emit empty row event", zap.Any("event", e)) continue } @@ -832,7 +824,7 @@ func trySplitAndSortUpdateEvent( // ShouldSplitUpdateEvent determines if the split event is needed to align the old format based on // whether the handle key column or unique key has been modified. -// If is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event. +// If is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event. func ShouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool { // nil event will never be split. if updateEvent == nil { @@ -875,6 +867,13 @@ func SplitUpdateEvent( // NOTICE: clean up pre cols for insert event. 
insertEvent.PreColumns = nil + log.Debug("split update event", zap.Uint64("startTs", updateEvent.StartTs), + zap.Uint64("commitTs", updateEvent.CommitTs), + zap.String("schema", updateEvent.TableInfo.TableName.Schema), + zap.String("table", updateEvent.TableInfo.TableName.Table), + zap.Any("preCols", updateEvent.PreColumns), + zap.Any("cols", updateEvent.Columns)) + return &deleteEvent, &insertEvent, nil } diff --git a/cdc/model/sink_test.go b/cdc/model/sink_test.go index 31f05b8d679..43fe6fa43ef 100644 --- a/cdc/model/sink_test.go +++ b/cdc/model/sink_test.go @@ -598,21 +598,32 @@ func TestTrySplitAndSortUpdateEventOne(t *testing.T) { Rows: []*RowChangedEvent{ukUpdatedEvent}, } - err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme) + outputRawChangeEvent := true + notOutputRawChangeEvent := false + err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme, outputRawChangeEvent) + require.NoError(t, err) + require.Len(t, txn.Rows, 1) + err = txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme, notOutputRawChangeEvent) require.NoError(t, err) require.Len(t, txn.Rows, 2) txn = &SingleTableTxn{ Rows: []*RowChangedEvent{ukUpdatedEvent}, } - err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme) + err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, outputRawChangeEvent) + require.NoError(t, err) + require.Len(t, txn.Rows, 1) + err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, notOutputRawChangeEvent) require.NoError(t, err) require.Len(t, txn.Rows, 1) txn2 := &SingleTableTxn{ Rows: []*RowChangedEvent{ukUpdatedEvent, ukUpdatedEvent}, } - err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme) + err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, outputRawChangeEvent) + require.NoError(t, err) + require.Len(t, txn2.Rows, 2) + err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, notOutputRawChangeEvent) require.NoError(t, err) require.Len(t, txn2.Rows, 2) } diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go index bef98461141..99d03b07027 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go @@ -52,6 +52,10 @@ func (m *mockSink) WriteEvents(events ...*dmlsink.CallbackableEvent[*model.RowCh return nil } +func (m *mockSink) SchemeOption() (string, bool) { + return sink.BlackHoleScheme, false +} + func (m *mockSink) GetEvents() []*dmlsink.CallbackableEvent[*model.RowChangedEvent] { m.mu.Lock() defer m.mu.Unlock() @@ -70,10 +74,6 @@ func (m *mockSink) Dead() <-chan struct{} { return make(chan struct{}) } -func (m *mockSink) Scheme() string { - return sink.BlackHoleScheme -} - func (m *mockSink) AckAllEvents() { m.mu.Lock() defer m.mu.Unlock() diff --git a/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go b/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go index 3d146a50a89..c3c90b31723 100644 --- a/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go +++ b/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go @@ -55,7 +55,7 @@ func (s *DMLSink) Dead() <-chan struct{} { return make(chan struct{}) } -// Scheme returns the sink scheme. -func (s *DMLSink) Scheme() string { - return sink.BlackHoleScheme +// SchemeOption returns the scheme and the option. 
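// Illustrative sketch, not part of the patch: a self-contained recap of the
// split decision now encoded by TrySplitAndSortUpdateEvent(scheme, outputRawChangeEvent),
// matching the tests above. isMySQLCompatibleScheme is a simplified stand-in
// for the real sink.IsMySQLCompatibleScheme helper.
package main

import "fmt"

// isMySQLCompatibleScheme covers the schemes TiCDC treats as MySQL compatible.
func isMySQLCompatibleScheme(scheme string) bool {
	switch scheme {
	case "mysql", "mysql+ssl", "tidb", "tidb+ssl":
		return true
	}
	return false
}

// keepRawUpdate reports whether an update event is left unsplit by the sink:
// MySQL-compatible sinks never split here (the puller side handles it), while
// Kafka/Storage sinks skip splitting only when output-raw-change-event is set.
func keepRawUpdate(scheme string, outputRawChangeEvent bool) bool {
	return isMySQLCompatibleScheme(scheme) || outputRawChangeEvent
}

func main() {
	fmt.Println(keepRawUpdate("kafka", true))  // true: emit the raw update event
	fmt.Println(keepRawUpdate("kafka", false)) // false: split pk/uk-changed updates
	fmt.Println(keepRawUpdate("mysql", false)) // true: never split in the sink
}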
+func (s *DMLSink) SchemeOption() (string, bool) { + return sink.BlackHoleScheme, true } diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go index c0fa549752c..f309ce62ace 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go @@ -66,8 +66,9 @@ type eventFragment struct { // DMLSink is the cloud storage sink. // It will send the events to cloud storage systems. type DMLSink struct { - changefeedID model.ChangeFeedID - scheme string + changefeedID model.ChangeFeedID + scheme string + outputRawChangeEvent bool // last sequence number lastSeqNum uint64 // encodingWorkers defines a group of workers for encoding events. @@ -133,13 +134,14 @@ func NewDMLSink(ctx context.Context, wgCtx, wgCancel := context.WithCancel(ctx) s := &DMLSink{ - changefeedID: contextutil.ChangefeedIDFromCtx(wgCtx), - scheme: strings.ToLower(sinkURI.Scheme), - encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency), - workers: make([]*dmlWorker, cfg.WorkerCount), - statistics: metrics.NewStatistics(wgCtx, sink.TxnSink), - cancel: wgCancel, - dead: make(chan struct{}), + changefeedID: contextutil.ChangefeedIDFromCtx(wgCtx), + scheme: strings.ToLower(sinkURI.Scheme), + outputRawChangeEvent: replicaConfig.Sink.CloudStorageConfig.GetOutputRawChangeEvent(), + encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency), + workers: make([]*dmlWorker, cfg.WorkerCount), + statistics: metrics.NewStatistics(wgCtx, sink.TxnSink), + cancel: wgCancel, + dead: make(chan struct{}), } s.alive.msgCh = chann.NewAutoDrainChann[eventFragment]() @@ -244,11 +246,6 @@ func (s *DMLSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa return nil } -// Scheme returns the sink scheme. -func (s *DMLSink) Scheme() string { - return s.scheme -} - // Close closes the cloud storage sink. func (s *DMLSink) Close() { if s.cancel != nil { @@ -273,3 +270,8 @@ func (s *DMLSink) Close() { func (s *DMLSink) Dead() <-chan struct{} { return s.dead } + +// SchemeOption returns the scheme and the option. +func (s *DMLSink) SchemeOption() (string, bool) { + return s.scheme, s.outputRawChangeEvent +} diff --git a/cdc/sink/dmlsink/event.go b/cdc/sink/dmlsink/event.go index 3c9032aa11e..7126be06bbe 100644 --- a/cdc/sink/dmlsink/event.go +++ b/cdc/sink/dmlsink/event.go @@ -23,8 +23,7 @@ type TableEvent interface { // GetCommitTs returns the commit timestamp of the event. GetCommitTs() uint64 // TrySplitAndSortUpdateEvent split the update to delete and insert if the unique key is updated - // Note that sinkScheme is used to control the split behavior. - TrySplitAndSortUpdateEvent(scheme string) error + TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error } // CallbackFunc is the callback function for callbackable event. diff --git a/cdc/sink/dmlsink/event_sink.go b/cdc/sink/dmlsink/event_sink.go index 4210d9ca022..660ed190019 100644 --- a/cdc/sink/dmlsink/event_sink.go +++ b/cdc/sink/dmlsink/event_sink.go @@ -18,8 +18,8 @@ type EventSink[E TableEvent] interface { // WriteEvents writes events to the sink. // This is an asynchronously and thread-safe method. WriteEvents(events ...*CallbackableEvent[E]) error - // Scheme returns the sink scheme. - Scheme() string + // SchemeOption returns the sink scheme and whether the sink should output raw change event. + SchemeOption() (scheme string, outputRawChangeEvent bool) // Close closes the sink. 
Can be called with `WriteEvents` concurrently. Close() // The EventSink meets internal errors and has been dead already. diff --git a/cdc/sink/dmlsink/mq/kafka_dml_sink.go b/cdc/sink/dmlsink/mq/kafka_dml_sink.go index 69075fe3f49..c880f760d48 100644 --- a/cdc/sink/dmlsink/mq/kafka_dml_sink.go +++ b/cdc/sink/dmlsink/mq/kafka_dml_sink.go @@ -103,7 +103,10 @@ func NewKafkaDMLSink( return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) } - log.Info("Try to create a DML sink producer", zap.Any("options", options)) + log.Info("Try to create a DML sink producer", + zap.String("namespace", changefeed.Namespace), + zap.String("changefeedID", changefeed.ID), + zap.Any("options", options)) failpointCh := make(chan error, 1) asyncProducer, err := factory.AsyncProducer(ctx, failpointCh) if err != nil { @@ -113,7 +116,11 @@ func NewKafkaDMLSink( metricsCollector := factory.MetricsCollector(tiflowutil.RoleProcessor, adminClient) dmlProducer := producerCreator(ctx, changefeed, asyncProducer, metricsCollector, errCh, failpointCh) s := newDMLSink(ctx, sinkURI, changefeed, dmlProducer, adminClient, topicManager, eventRouter, encoderBuilder, - replicaConfig.Sink.EncoderConcurrency, protocol, errCh) + replicaConfig.Sink.EncoderConcurrency, protocol, replicaConfig.Sink.KafkaConfig.GetOutputRawChangeEvent(), errCh) + + log.Info("DML sink producer created", + zap.String("namespace", changefeed.Namespace), + zap.String("changefeedID", changefeed.ID)) return s, nil } diff --git a/cdc/sink/dmlsink/mq/mq_dml_sink.go b/cdc/sink/dmlsink/mq/mq_dml_sink.go index d04e98f0d38..82a98510841 100644 --- a/cdc/sink/dmlsink/mq/mq_dml_sink.go +++ b/cdc/sink/dmlsink/mq/mq_dml_sink.go @@ -44,8 +44,7 @@ var _ dmlsink.EventSink[*model.SingleTableTxn] = (*dmlSink)(nil) // It will send the events to the MQ system. type dmlSink struct { // id indicates this sink belongs to which processor(changefeed). - id model.ChangeFeedID - scheme string + id model.ChangeFeedID // protocol indicates the protocol used by this sink. protocol config.Protocol @@ -69,6 +68,9 @@ type dmlSink struct { wg sync.WaitGroup dead chan struct{} + + scheme string + outputRawChangeEvent bool } func newDMLSink( @@ -82,6 +84,7 @@ func newDMLSink( encoderBuilder codec.RowEventEncoderBuilder, encoderConcurrency int, protocol config.Protocol, + outputRawChangeEvent bool, errCh chan error, ) *dmlSink { ctx, cancel := context.WithCancelCause(ctx) @@ -90,13 +93,14 @@ func newDMLSink( encoderBuilder, encoderConcurrency, producer, statistics) s := &dmlSink{ - id: changefeedID, - scheme: strings.ToLower(sinkURI.Scheme), - protocol: protocol, - adminClient: adminClient, - ctx: ctx, - cancel: cancel, - dead: make(chan struct{}), + id: changefeedID, + scheme: strings.ToLower(sinkURI.Scheme), + protocol: protocol, + adminClient: adminClient, + ctx: ctx, + cancel: cancel, + dead: make(chan struct{}), + outputRawChangeEvent: outputRawChangeEvent, } s.alive.eventRouter = eventRouter s.alive.topicManager = topicManager @@ -190,10 +194,6 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa return nil } -func (s *dmlSink) Scheme() string { - return s.scheme -} - // Close closes the sink. func (s *dmlSink) Close() { if s.cancel != nil { @@ -216,3 +216,8 @@ func (s *dmlSink) Close() { func (s *dmlSink) Dead() <-chan struct{} { return s.dead } + +// Scheme returns the scheme of this sink. 
+func (s *dmlSink) SchemeOption() (string, bool) { + return s.scheme, s.outputRawChangeEvent +} diff --git a/cdc/sink/dmlsink/txn/txn_dml_sink.go b/cdc/sink/dmlsink/txn/txn_dml_sink.go index 81d15a1485d..f49423c732c 100644 --- a/cdc/sink/dmlsink/txn/txn_dml_sink.go +++ b/cdc/sink/dmlsink/txn/txn_dml_sink.go @@ -160,11 +160,6 @@ func (s *dmlSink) WriteEvents(txnEvents ...*dmlsink.TxnCallbackableEvent) error return nil } -// Scheme returns the sink scheme. -func (s *dmlSink) Scheme() string { - return s.scheme -} - // Close closes the dmlSink. It won't wait for all pending items backend handled. func (s *dmlSink) Close() { if s.cancel != nil { @@ -181,3 +176,7 @@ func (s *dmlSink) Close() { func (s *dmlSink) Dead() <-chan struct{} { return s.dead } + +func (s *dmlSink) SchemeOption() (string, bool) { + return s.scheme, false +} diff --git a/cdc/sink/tablesink/table_sink_impl.go b/cdc/sink/tablesink/table_sink_impl.go index b351d573f9e..b8c8c1f904a 100644 --- a/cdc/sink/tablesink/table_sink_impl.go +++ b/cdc/sink/tablesink/table_sink_impl.go @@ -138,7 +138,7 @@ func (e *EventTableSink[E, P]) UpdateResolvedTs(resolvedTs model.ResolvedTs) err resolvedCallbackableEvents := make([]*dmlsink.CallbackableEvent[E], 0, len(resolvedEvents)) for _, ev := range resolvedEvents { - if err := ev.TrySplitAndSortUpdateEvent(e.backendSink.Scheme()); err != nil { + if err := ev.TrySplitAndSortUpdateEvent(e.backendSink.SchemeOption()); err != nil { return SinkInternalError{err} } // We have to record the event ID for the callback. diff --git a/cdc/sink/tablesink/table_sink_impl_test.go b/cdc/sink/tablesink/table_sink_impl_test.go index a3f1dd7579a..58b36a2dbb2 100644 --- a/cdc/sink/tablesink/table_sink_impl_test.go +++ b/cdc/sink/tablesink/table_sink_impl_test.go @@ -42,10 +42,6 @@ func (m *mockEventSink) WriteEvents(rows ...*dmlsink.TxnCallbackableEvent) error return nil } -func (m *mockEventSink) Scheme() string { - return sink.BlackHoleScheme -} - func (m *mockEventSink) Close() { close(m.dead) } @@ -54,6 +50,10 @@ func (m *mockEventSink) Dead() <-chan struct{} { return m.dead } +func (m *mockEventSink) SchemeOption() (string, bool) { + return sink.BlackHoleScheme, false +} + // acknowledge the txn events by call the callback function. 
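// Illustrative sketch, not part of the patch: the updated call site
// ev.TrySplitAndSortUpdateEvent(e.backendSink.SchemeOption()) compiles because
// Go forwards a two-value call as the complete argument list when the types
// match. A minimal self-contained equivalent:
package main

import "fmt"

func schemeOption() (string, bool) { return "kafka", false }

func trySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error {
	fmt.Println("scheme:", scheme, "outputRawChangeEvent:", outputRawChangeEvent)
	return nil
}

func main() {
	// Equivalent to: scheme, raw := schemeOption(); trySplitAndSortUpdateEvent(scheme, raw)
	_ = trySplitAndSortUpdateEvent(schemeOption())
}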
func (m *mockEventSink) acknowledge(commitTs uint64) []*dmlsink.TxnCallbackableEvent { var droppedEvents []*dmlsink.TxnCallbackableEvent diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 1a750b076dc..2876e6554ad 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -1067,6 +1067,57 @@ var doc = `{ } } }, + "/api/v2/changefeeds/{changefeed_id}/synced": { + "get": { + "description": "get the synced status of a changefeed", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "changefeed", + "v2" + ], + "summary": "Get synced status", + "parameters": [ + { + "type": "string", + "description": "changefeed_id", + "name": "changefeed_id", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "default", + "name": "namespace", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/v2.SyncedStatus" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/model.HTTPError" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/model.HTTPError" + } + } + } + } + }, "/api/v2/health": { "get": { "description": "Check the health status of a TiCDC cluster", @@ -1328,12 +1379,28 @@ var doc = `{ "config.CloudStorageConfig": { "type": "object", "properties": { + "file-cleanup-cron-spec": { + "type": "string" + }, + "file-expiration-days": { + "type": "integer" + }, "file-size": { "type": "integer" }, + "flush-concurrency": { + "type": "integer" + }, "flush-interval": { "type": "string" }, + "output-column-id": { + "type": "boolean" + }, + "output-raw-change-event": { + "description": "OutputRawChangeEvent controls whether to split the update pk/uk events.", + "type": "boolean" + }, "worker-count": { "type": "integer" } @@ -1440,6 +1507,10 @@ var doc = `{ "max-message-bytes": { "type": "integer" }, + "output-raw-change-event": { + "description": "OutputRawChangeEvent controls whether to split the update pk/uk events.", + "type": "boolean" + }, "partition-num": { "type": "integer" }, @@ -1585,6 +1656,10 @@ var doc = `{ "$ref": "#/definitions/config.ColumnSelector" } }, + "content-compatible": { + "description": "ContentCompatible is only available when the downstream is MQ.", + "type": "boolean" + }, "csv": { "$ref": "#/definitions/config.CSVConfig" }, @@ -2107,12 +2182,27 @@ var doc = `{ "v2.CloudStorageConfig": { "type": "object", "properties": { + "file_cleanup_cron_spec": { + "type": "string" + }, + "file_expiration_days": { + "type": "integer" + }, "file_size": { "type": "integer" }, + "flush_concurrency": { + "type": "integer" + }, "flush_interval": { "type": "string" }, + "output_column_id": { + "type": "boolean" + }, + "output_raw_change_event": { + "type": "boolean" + }, "worker_count": { "type": "integer" } @@ -2158,15 +2248,33 @@ var doc = `{ "v2.ConsistentConfig": { "type": "object", "properties": { + "compression": { + "type": "string" + }, + "encoding_worker_num": { + "type": "integer" + }, + "flush_concurrency": { + "type": "integer" + }, "flush_interval": { "type": "integer" }, + "flush_worker_num": { + "type": "integer" + }, "level": { "type": "string" }, "max_log_size": { "type": "integer" }, + "memory_usage": { + "$ref": "#/definitions/v2.ConsistentMemoryUsage" + }, + "meta_flush_interval": { + "type": "integer" + }, "storage": { "type": "string" }, @@ -2175,6 +2283,17 @@ var doc = `{ } } }, + "v2.ConsistentMemoryUsage": { + "type": "object", + "properties": { 
+ "event_cache_percentage": { + "type": "integer" + }, + "memory_quota_percentage": { + "type": "integer" + } + } + }, "v2.DispatchRule": { "type": "object", "properties": { @@ -2294,6 +2413,9 @@ var doc = `{ } } }, + "v2.JSONDuration": { + "type": "object" + }, "v2.KafkaConfig": { "type": "object", "properties": { @@ -2336,6 +2458,9 @@ var doc = `{ "max_message_bytes": { "type": "integer" }, + "output_raw_change_event": { + "type": "boolean" + }, "partition_num": { "type": "integer" }, @@ -2516,6 +2641,9 @@ var doc = `{ "case_sensitive": { "type": "boolean" }, + "changefeed_error_stuck_duration": { + "$ref": "#/definitions/v2.JSONDuration" + }, "check_gc_safe_point": { "type": "boolean" }, @@ -2552,11 +2680,17 @@ var doc = `{ "sink": { "$ref": "#/definitions/v2.SinkConfig" }, + "sql_mode": { + "type": "string" + }, "sync_point_interval": { "type": "string" }, "sync_point_retention": { "type": "string" + }, + "synced_status": { + "$ref": "#/definitions/v2.SyncedStatusConfig" } } }, @@ -2647,6 +2781,9 @@ var doc = `{ "$ref": "#/definitions/v2.ColumnSelector" } }, + "content_compatible": { + "type": "boolean" + }, "csv": { "$ref": "#/definitions/v2.CSVConfig" }, @@ -2697,6 +2834,42 @@ var doc = `{ } } }, + "v2.SyncedStatus": { + "type": "object", + "properties": { + "info": { + "type": "string" + }, + "last_synced_ts": { + "type": "string" + }, + "now_ts": { + "type": "string" + }, + "puller_resolved_ts": { + "type": "string" + }, + "sink_checkpoint_ts": { + "type": "string" + }, + "synced": { + "type": "boolean" + } + } + }, + "v2.SyncedStatusConfig": { + "type": "object", + "properties": { + "checkpoint_interval": { + "description": "The maximum interval between latest checkpoint ts and now or\nbetween latest sink's checkpoint ts and puller's checkpoint ts required to reach synced state", + "type": "integer" + }, + "synced_check_interval": { + "description": "The minimum interval between the latest synced ts and now required to reach synced state", + "type": "integer" + } + } + }, "v2.Table": { "type": "object", "properties": { diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index b24d3f3cbd5..1907d3c898c 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -1048,6 +1048,57 @@ } } }, + "/api/v2/changefeeds/{changefeed_id}/synced": { + "get": { + "description": "get the synced status of a changefeed", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "changefeed", + "v2" + ], + "summary": "Get synced status", + "parameters": [ + { + "type": "string", + "description": "changefeed_id", + "name": "changefeed_id", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "default", + "name": "namespace", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/v2.SyncedStatus" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/model.HTTPError" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/model.HTTPError" + } + } + } + } + }, "/api/v2/health": { "get": { "description": "Check the health status of a TiCDC cluster", @@ -1309,12 +1360,28 @@ "config.CloudStorageConfig": { "type": "object", "properties": { + "file-cleanup-cron-spec": { + "type": "string" + }, + "file-expiration-days": { + "type": "integer" + }, "file-size": { "type": "integer" }, + "flush-concurrency": { + "type": "integer" + }, "flush-interval": { "type": "string" }, + 
"output-column-id": { + "type": "boolean" + }, + "output-raw-change-event": { + "description": "OutputRawChangeEvent controls whether to split the update pk/uk events.", + "type": "boolean" + }, "worker-count": { "type": "integer" } @@ -1421,6 +1488,10 @@ "max-message-bytes": { "type": "integer" }, + "output-raw-change-event": { + "description": "OutputRawChangeEvent controls whether to split the update pk/uk events.", + "type": "boolean" + }, "partition-num": { "type": "integer" }, @@ -1566,6 +1637,10 @@ "$ref": "#/definitions/config.ColumnSelector" } }, + "content-compatible": { + "description": "ContentCompatible is only available when the downstream is MQ.", + "type": "boolean" + }, "csv": { "$ref": "#/definitions/config.CSVConfig" }, @@ -2088,12 +2163,27 @@ "v2.CloudStorageConfig": { "type": "object", "properties": { + "file_cleanup_cron_spec": { + "type": "string" + }, + "file_expiration_days": { + "type": "integer" + }, "file_size": { "type": "integer" }, + "flush_concurrency": { + "type": "integer" + }, "flush_interval": { "type": "string" }, + "output_column_id": { + "type": "boolean" + }, + "output_raw_change_event": { + "type": "boolean" + }, "worker_count": { "type": "integer" } @@ -2139,15 +2229,33 @@ "v2.ConsistentConfig": { "type": "object", "properties": { + "compression": { + "type": "string" + }, + "encoding_worker_num": { + "type": "integer" + }, + "flush_concurrency": { + "type": "integer" + }, "flush_interval": { "type": "integer" }, + "flush_worker_num": { + "type": "integer" + }, "level": { "type": "string" }, "max_log_size": { "type": "integer" }, + "memory_usage": { + "$ref": "#/definitions/v2.ConsistentMemoryUsage" + }, + "meta_flush_interval": { + "type": "integer" + }, "storage": { "type": "string" }, @@ -2156,6 +2264,17 @@ } } }, + "v2.ConsistentMemoryUsage": { + "type": "object", + "properties": { + "event_cache_percentage": { + "type": "integer" + }, + "memory_quota_percentage": { + "type": "integer" + } + } + }, "v2.DispatchRule": { "type": "object", "properties": { @@ -2275,6 +2394,9 @@ } } }, + "v2.JSONDuration": { + "type": "object" + }, "v2.KafkaConfig": { "type": "object", "properties": { @@ -2317,6 +2439,9 @@ "max_message_bytes": { "type": "integer" }, + "output_raw_change_event": { + "type": "boolean" + }, "partition_num": { "type": "integer" }, @@ -2497,6 +2622,9 @@ "case_sensitive": { "type": "boolean" }, + "changefeed_error_stuck_duration": { + "$ref": "#/definitions/v2.JSONDuration" + }, "check_gc_safe_point": { "type": "boolean" }, @@ -2533,11 +2661,17 @@ "sink": { "$ref": "#/definitions/v2.SinkConfig" }, + "sql_mode": { + "type": "string" + }, "sync_point_interval": { "type": "string" }, "sync_point_retention": { "type": "string" + }, + "synced_status": { + "$ref": "#/definitions/v2.SyncedStatusConfig" } } }, @@ -2628,6 +2762,9 @@ "$ref": "#/definitions/v2.ColumnSelector" } }, + "content_compatible": { + "type": "boolean" + }, "csv": { "$ref": "#/definitions/v2.CSVConfig" }, @@ -2678,6 +2815,42 @@ } } }, + "v2.SyncedStatus": { + "type": "object", + "properties": { + "info": { + "type": "string" + }, + "last_synced_ts": { + "type": "string" + }, + "now_ts": { + "type": "string" + }, + "puller_resolved_ts": { + "type": "string" + }, + "sink_checkpoint_ts": { + "type": "string" + }, + "synced": { + "type": "boolean" + } + } + }, + "v2.SyncedStatusConfig": { + "type": "object", + "properties": { + "checkpoint_interval": { + "description": "The maximum interval between latest checkpoint ts and now or\nbetween latest sink's checkpoint ts and puller's 
checkpoint ts required to reach synced state", + "type": "integer" + }, + "synced_check_interval": { + "description": "The minimum interval between the latest synced ts and now required to reach synced state", + "type": "integer" + } + } + }, "v2.Table": { "type": "object", "properties": { diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index a311949934f..72a3be5ad0a 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -19,10 +19,22 @@ definitions: type: object config.CloudStorageConfig: properties: + file-cleanup-cron-spec: + type: string + file-expiration-days: + type: integer file-size: type: integer + flush-concurrency: + type: integer flush-interval: type: string + output-column-id: + type: boolean + output-raw-change-event: + description: OutputRawChangeEvent controls whether to split the update pk/uk + events. + type: boolean worker-count: type: integer type: object @@ -95,6 +107,10 @@ definitions: $ref: '#/definitions/config.LargeMessageHandleConfig' max-message-bytes: type: integer + output-raw-change-event: + description: OutputRawChangeEvent controls whether to split the update pk/uk + events. + type: boolean partition-num: type: integer read-timeout: @@ -193,6 +209,9 @@ definitions: items: $ref: '#/definitions/config.ColumnSelector' type: array + content-compatible: + description: ContentCompatible is only available when the downstream is MQ. + type: boolean csv: $ref: '#/definitions/config.CSVConfig' date-separator: @@ -547,10 +566,20 @@ definitions: type: object v2.CloudStorageConfig: properties: + file_cleanup_cron_spec: + type: string + file_expiration_days: + type: integer file_size: type: integer + flush_concurrency: + type: integer flush_interval: type: string + output_column_id: + type: boolean + output_raw_change_event: + type: boolean worker_count: type: integer type: object @@ -580,17 +609,36 @@ definitions: type: object v2.ConsistentConfig: properties: + compression: + type: string + encoding_worker_num: + type: integer + flush_concurrency: + type: integer flush_interval: type: integer + flush_worker_num: + type: integer level: type: string max_log_size: type: integer + memory_usage: + $ref: '#/definitions/v2.ConsistentMemoryUsage' + meta_flush_interval: + type: integer storage: type: string use_file_backend: type: boolean type: object + v2.ConsistentMemoryUsage: + properties: + event_cache_percentage: + type: integer + memory_quota_percentage: + type: integer + type: object v2.DispatchRule: properties: matcher: @@ -671,6 +719,8 @@ definitions: integrity_check_level: type: string type: object + v2.JSONDuration: + type: object v2.KafkaConfig: properties: auto_create_topic: @@ -699,6 +749,8 @@ definitions: $ref: '#/definitions/v2.LargeMessageHandleConfig' max_message_bytes: type: integer + output_raw_change_event: + type: boolean partition_num: type: integer read_timeout: @@ -817,6 +869,8 @@ definitions: type: boolean case_sensitive: type: boolean + changefeed_error_stuck_duration: + $ref: '#/definitions/v2.JSONDuration' check_gc_safe_point: type: boolean consistent: @@ -841,10 +895,14 @@ definitions: $ref: '#/definitions/v2.ChangefeedSchedulerConfig' sink: $ref: '#/definitions/v2.SinkConfig' + sql_mode: + type: string sync_point_interval: type: string sync_point_retention: type: string + synced_status: + $ref: '#/definitions/v2.SyncedStatusConfig' type: object v2.ResumeChangefeedConfig: properties: @@ -903,6 +961,8 @@ definitions: items: $ref: '#/definitions/v2.ColumnSelector' type: array + content_compatible: + type: boolean 
csv: $ref: '#/definitions/v2.CSVConfig' date_separator: @@ -936,6 +996,33 @@ definitions: transaction_atomicity: type: string type: object + v2.SyncedStatus: + properties: + info: + type: string + last_synced_ts: + type: string + now_ts: + type: string + puller_resolved_ts: + type: string + sink_checkpoint_ts: + type: string + synced: + type: boolean + type: object + v2.SyncedStatusConfig: + properties: + checkpoint_interval: + description: |- + The maximum interval between latest checkpoint ts and now or + between latest sink's checkpoint ts and puller's checkpoint ts required to reach synced state + type: integer + synced_check_interval: + description: The minimum interval between the latest synced ts and now required + to reach synced state + type: integer + type: object v2.Table: properties: database_name: @@ -1639,6 +1726,40 @@ paths: tags: - changefeed - v2 + /api/v2/changefeeds/{changefeed_id}/synced: + get: + consumes: + - application/json + description: get the synced status of a changefeed + parameters: + - description: changefeed_id + in: path + name: changefeed_id + required: true + type: string + - description: default + in: query + name: namespace + type: string + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/v2.SyncedStatus' + "400": + description: Bad Request + schema: + $ref: '#/definitions/model.HTTPError' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/model.HTTPError' + summary: Get synced status + tags: + - changefeed + - v2 /api/v2/health: get: description: Check the health status of a TiCDC cluster diff --git a/errors.toml b/errors.toml index 5ca42ababef..e2f8089cc7f 100755 --- a/errors.toml +++ b/errors.toml @@ -816,6 +816,11 @@ error = ''' cdc server is not ready ''' +["CDC:ErrSinkIncompatibleConfig"] +error = ''' +incompatible configuration %s +''' + ["CDC:ErrSinkInvalidConfig"] error = ''' sink config invalid diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 28db62a8ced..13e59350ca0 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -310,42 +310,52 @@ type CodecConfig struct { // KafkaConfig represents a kafka sink configuration type KafkaConfig struct { - PartitionNum *int32 `toml:"partition-num" json:"partition-num,omitempty"` - ReplicationFactor *int16 `toml:"replication-factor" json:"replication-factor,omitempty"` - KafkaVersion *string `toml:"kafka-version" json:"kafka-version,omitempty"` - MaxMessageBytes *int `toml:"max-message-bytes" json:"max-message-bytes,omitempty"` - Compression *string `toml:"compression" json:"compression,omitempty"` - KafkaClientID *string `toml:"kafka-client-id" json:"kafka-client-id,omitempty"` - AutoCreateTopic *bool `toml:"auto-create-topic" json:"auto-create-topic,omitempty"` - DialTimeout *string `toml:"dial-timeout" json:"dial-timeout,omitempty"` - WriteTimeout *string `toml:"write-timeout" json:"write-timeout,omitempty"` - ReadTimeout *string `toml:"read-timeout" json:"read-timeout,omitempty"` - RequiredAcks *int `toml:"required-acks" json:"required-acks,omitempty"` - SASLUser *string `toml:"sasl-user" json:"sasl-user,omitempty"` - SASLPassword *string `toml:"sasl-password" json:"sasl-password,omitempty"` - SASLMechanism *string `toml:"sasl-mechanism" json:"sasl-mechanism,omitempty"` - SASLGssAPIAuthType *string `toml:"sasl-gssapi-auth-type" json:"sasl-gssapi-auth-type,omitempty"` - SASLGssAPIKeytabPath *string `toml:"sasl-gssapi-keytab-path" json:"sasl-gssapi-keytab-path,omitempty"` - SASLGssAPIKerberosConfigPath 
*string `toml:"sasl-gssapi-kerberos-config-path" json:"sasl-gssapi-kerberos-config-path,omitempty"` - SASLGssAPIServiceName *string `toml:"sasl-gssapi-service-name" json:"sasl-gssapi-service-name,omitempty"` - SASLGssAPIUser *string `toml:"sasl-gssapi-user" json:"sasl-gssapi-user,omitempty"` - SASLGssAPIPassword *string `toml:"sasl-gssapi-password" json:"sasl-gssapi-password,omitempty"` - SASLGssAPIRealm *string `toml:"sasl-gssapi-realm" json:"sasl-gssapi-realm,omitempty"` - SASLGssAPIDisablePafxfast *bool `toml:"sasl-gssapi-disable-pafxfast" json:"sasl-gssapi-disable-pafxfast,omitempty"` - SASLOAuthClientID *string `toml:"sasl-oauth-client-id" json:"sasl-oauth-client-id,omitempty"` - SASLOAuthClientSecret *string `toml:"sasl-oauth-client-secret" json:"sasl-oauth-client-secret,omitempty"` - SASLOAuthTokenURL *string `toml:"sasl-oauth-token-url" json:"sasl-oauth-token-url,omitempty"` - SASLOAuthScopes []string `toml:"sasl-oauth-scopes" json:"sasl-oauth-scopes,omitempty"` - SASLOAuthGrantType *string `toml:"sasl-oauth-grant-type" json:"sasl-oauth-grant-type,omitempty"` - SASLOAuthAudience *string `toml:"sasl-oauth-audience" json:"sasl-oauth-audience,omitempty"` - EnableTLS *bool `toml:"enable-tls" json:"enable-tls,omitempty"` - CA *string `toml:"ca" json:"ca,omitempty"` - Cert *string `toml:"cert" json:"cert,omitempty"` - Key *string `toml:"key" json:"key,omitempty"` - InsecureSkipVerify *bool `toml:"insecure-skip-verify" json:"insecure-skip-verify,omitempty"` - CodecConfig *CodecConfig `toml:"codec-config" json:"codec-config,omitempty"` - - LargeMessageHandle *LargeMessageHandleConfig `toml:"large-message-handle" json:"large-message-handle,omitempty"` + PartitionNum *int32 `toml:"partition-num" json:"partition-num,omitempty"` + ReplicationFactor *int16 `toml:"replication-factor" json:"replication-factor,omitempty"` + KafkaVersion *string `toml:"kafka-version" json:"kafka-version,omitempty"` + MaxMessageBytes *int `toml:"max-message-bytes" json:"max-message-bytes,omitempty"` + Compression *string `toml:"compression" json:"compression,omitempty"` + KafkaClientID *string `toml:"kafka-client-id" json:"kafka-client-id,omitempty"` + AutoCreateTopic *bool `toml:"auto-create-topic" json:"auto-create-topic,omitempty"` + DialTimeout *string `toml:"dial-timeout" json:"dial-timeout,omitempty"` + WriteTimeout *string `toml:"write-timeout" json:"write-timeout,omitempty"` + ReadTimeout *string `toml:"read-timeout" json:"read-timeout,omitempty"` + RequiredAcks *int `toml:"required-acks" json:"required-acks,omitempty"` + SASLUser *string `toml:"sasl-user" json:"sasl-user,omitempty"` + SASLPassword *string `toml:"sasl-password" json:"sasl-password,omitempty"` + SASLMechanism *string `toml:"sasl-mechanism" json:"sasl-mechanism,omitempty"` + SASLGssAPIAuthType *string `toml:"sasl-gssapi-auth-type" json:"sasl-gssapi-auth-type,omitempty"` + SASLGssAPIKeytabPath *string `toml:"sasl-gssapi-keytab-path" json:"sasl-gssapi-keytab-path,omitempty"` + SASLGssAPIKerberosConfigPath *string `toml:"sasl-gssapi-kerberos-config-path" json:"sasl-gssapi-kerberos-config-path,omitempty"` + SASLGssAPIServiceName *string `toml:"sasl-gssapi-service-name" json:"sasl-gssapi-service-name,omitempty"` + SASLGssAPIUser *string `toml:"sasl-gssapi-user" json:"sasl-gssapi-user,omitempty"` + SASLGssAPIPassword *string `toml:"sasl-gssapi-password" json:"sasl-gssapi-password,omitempty"` + SASLGssAPIRealm *string `toml:"sasl-gssapi-realm" json:"sasl-gssapi-realm,omitempty"` + SASLGssAPIDisablePafxfast *bool `toml:"sasl-gssapi-disable-pafxfast" 
json:"sasl-gssapi-disable-pafxfast,omitempty"` + SASLOAuthClientID *string `toml:"sasl-oauth-client-id" json:"sasl-oauth-client-id,omitempty"` + SASLOAuthClientSecret *string `toml:"sasl-oauth-client-secret" json:"sasl-oauth-client-secret,omitempty"` + SASLOAuthTokenURL *string `toml:"sasl-oauth-token-url" json:"sasl-oauth-token-url,omitempty"` + SASLOAuthScopes []string `toml:"sasl-oauth-scopes" json:"sasl-oauth-scopes,omitempty"` + SASLOAuthGrantType *string `toml:"sasl-oauth-grant-type" json:"sasl-oauth-grant-type,omitempty"` + SASLOAuthAudience *string `toml:"sasl-oauth-audience" json:"sasl-oauth-audience,omitempty"` + EnableTLS *bool `toml:"enable-tls" json:"enable-tls,omitempty"` + CA *string `toml:"ca" json:"ca,omitempty"` + Cert *string `toml:"cert" json:"cert,omitempty"` + Key *string `toml:"key" json:"key,omitempty"` + InsecureSkipVerify *bool `toml:"insecure-skip-verify" json:"insecure-skip-verify,omitempty"` + CodecConfig *CodecConfig `toml:"codec-config" json:"codec-config,omitempty"` + LargeMessageHandle *LargeMessageHandleConfig `toml:"large-message-handle" json:"large-message-handle,omitempty"` + + // OutputRawChangeEvent controls whether to split the update pk/uk events. + OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"` +} + +// GetOutputRawChangeEvent returns the value of OutputRawChangeEvent +func (k *KafkaConfig) GetOutputRawChangeEvent() bool { + if k == nil || k.OutputRawChangeEvent == nil { + return false + } + return *k.OutputRawChangeEvent } // MaskSensitiveData masks sensitive data in KafkaConfig @@ -388,6 +398,17 @@ type CloudStorageConfig struct { FileExpirationDays *int `toml:"file-expiration-days" json:"file-expiration-days,omitempty"` FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"` FlushConcurrency *int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"` + + // OutputRawChangeEvent controls whether to split the update pk/uk events. + OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"` +} + +// GetOutputRawChangeEvent returns the value of OutputRawChangeEvent +func (c *CloudStorageConfig) GetOutputRawChangeEvent() bool { + if c == nil || c.OutputRawChangeEvent == nil { + return false + } + return *c.OutputRawChangeEvent } func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { @@ -492,21 +513,41 @@ func (s *SinkConfig) validateAndAdjustSinkURI(sinkURI *url.URL) error { return err } - // Validate that protocol is compatible with the scheme. For testing purposes, - // any protocol should be legal for blackhole. - if sink.IsMQScheme(sinkURI.Scheme) || sink.IsStorageScheme(sinkURI.Scheme) { - _, err := ParseSinkProtocolFromString(s.Protocol) - if err != nil { - return err - } - } else if sink.IsMySQLCompatibleScheme(sinkURI.Scheme) && s.Protocol != "" { + log.Info("succeed to parse parameter from sink uri", + zap.String("protocol", s.Protocol), + zap.String("txnAtomicity", string(s.TxnAtomicity))) + + // Check that protocol config is compatible with the scheme. + if sink.IsMySQLCompatibleScheme(sinkURI.Scheme) && s.Protocol != "" { return cerror.ErrSinkURIInvalid.GenWithStackByArgs(fmt.Sprintf("protocol %s "+ "is incompatible with %s scheme", s.Protocol, sinkURI.Scheme)) } + // For testing purposes, any protocol should be legal for blackhole. 
+ if sink.IsMQScheme(sinkURI.Scheme) || sink.IsStorageScheme(sinkURI.Scheme) { + return s.ValidateProtocol(sinkURI.Scheme) + } + return nil +} - log.Info("succeed to parse parameter from sink uri", - zap.String("protocol", s.Protocol), - zap.String("txnAtomicity", string(s.TxnAtomicity))) +// ValidateProtocol validates the protocol configuration. +func (s *SinkConfig) ValidateProtocol(scheme string) error { + protocol, err := ParseSinkProtocolFromString(s.Protocol) + if err != nil { + return err + } + + outputRawChangeEvent := false + switch scheme { + case sink.KafkaScheme, sink.KafkaSSLScheme: + outputRawChangeEvent = s.KafkaConfig.GetOutputRawChangeEvent() + default: + outputRawChangeEvent = s.CloudStorageConfig.GetOutputRawChangeEvent() + } + + if outputRawChangeEvent { + // TODO: return error if we do not need to keep backward compatibility. + log.Warn(fmt.Sprintf("TiCDC will not split the update pk/uk events if output-raw-change-event is true(scheme: %s, protocol: %s).", scheme, protocol)) + } return nil } diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index 27045928b90..26d77351342 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -332,6 +332,10 @@ var ( "sink config invalid", errors.RFCCodeText("CDC:ErrSinkInvalidConfig"), ) + ErrSinkIncompatibleConfig = errors.Normalize( + "incompatible configuration %s", + errors.RFCCodeText("CDC:ErrSinkIncompatibleConfig"), + ) ErrCraftCodecInvalidData = errors.Normalize( "craft codec invalid data", errors.RFCCodeText("CDC:ErrCraftCodecInvalidData"), diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed1.toml b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed1.toml new file mode 100644 index 00000000000..ff4fe5d6765 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed1.toml @@ -0,0 +1,27 @@ +# Case 1: default configuration where `csv.output-old-value=false` and `sink.cloud-storage-config.output-raw-change-event=false` +# Split and sort update pk/uk events in table sink. + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = false + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. +output-old-value = false \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed2.toml b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed2.toml new file mode 100644 index 00000000000..96057f4dd48 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed2.toml @@ -0,0 +1,26 @@ +# Case 2: Split all update events in csv encoder. + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. 
+terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = true + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. +output-old-value = true \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed3.toml b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed3.toml new file mode 100644 index 00000000000..033e2bd1a4f --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed3.toml @@ -0,0 +1,26 @@ +# Case 3: Don't split any update event + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = true + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. +output-old-value = false \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed4.toml b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed4.toml new file mode 100644 index 00000000000..112cfe4a012 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed4.toml @@ -0,0 +1,26 @@ +# Case 4: Split and sort update pk/uk events in table sink. Split other update events in csv encoder. + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = false + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. 
+output-old-value = true \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/conf/diff_config.toml b/tests/integration_tests/csv_storage_update_pk_clustered/conf/diff_config.toml new file mode 100644 index 00000000000..8edf2368fa4 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/csv_storage_update_pk_clustered/sync_diff/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["test.?*"] + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/data/prepare.sql b/tests/integration_tests/csv_storage_update_pk_clustered/data/prepare.sql new file mode 100644 index 00000000000..506a6e75765 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/data/prepare.sql @@ -0,0 +1,27 @@ +drop database if exists `test`; +create database `test`; +use `test`; + +CREATE TABLE `update_pk` ( + `id` int PRIMARY KEY CLUSTERED, + `pad` varchar(100) NOT NULL +); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (1, 'example1'), (2, 'example2'); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (10, 'example10'), (20, 'example20'); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (100, 'example100'); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (1000, 'example1000'); + +SHOW INDEX FROM update_pk; + +CREATE TABLE `update_uk` ( + `id` int PRIMARY KEY CLUSTERED, + `uk` int NOT NULL, + `pad` varchar(100) NOT NULL, + UNIQUE KEY `uk` (`uk`) +); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (1, 1, 'example1'), (2, 2, 'example2'); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (10, 10, 'example10'), (20, 20, 'example20'); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (100, 100, 'example100'); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (1000, 1000, 'example1000'); + +SHOW INDEX FROM update_uk; \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/data/run.sql b/tests/integration_tests/csv_storage_update_pk_clustered/data/run.sql new file mode 100644 index 00000000000..86e7d7e7d77 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/data/run.sql @@ -0,0 +1,40 @@ +USE `test`; + +-- update_pk -- + +BEGIN; -- Note: multi-row exchange +UPDATE update_pk SET id = 3 WHERE id = 1; +UPDATE update_pk SET id = 1 WHERE id = 2; +UPDATE update_pk SET id = 2 WHERE id = 3; +COMMIT; + +BEGIN; -- Note: multi-row update with no order dependency +UPDATE update_pk SET id = 30 WHERE id = 10; +UPDATE update_pk SET id = 40 WHERE id = 20; +COMMIT; + +BEGIN; -- Single row update +UPDATE update_pk SET id = 200 WHERE id = 100; +COMMIT; + +-- Normal update +UPDATE update_pk SET pad='example1001' WHERE id = 1000; + +-- update_uk -- +BEGIN; -- Note: multi-row exchange +UPDATE update_uk SET uk = 3 WHERE uk = 1; +UPDATE update_uk SET uk = 1 WHERE uk = 2; +UPDATE update_uk SET uk = 2 WHERE uk = 3; +COMMIT; + +BEGIN; -- Note: multi-row update with no order dependency +UPDATE update_uk SET uk = 30 WHERE uk = 10; +UPDATE update_uk SET uk = 40 WHERE uk = 20; +COMMIT; + +BEGIN; -- Single row update +UPDATE update_uk SET uk = 200 WHERE uk = 100; +COMMIT; + +-- 
Normal update +UPDATE update_uk SET pad='example1001' WHERE uk = 1000; \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_pk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_pk.res new file mode 100644 index 00000000000..ad6016c8059 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_pk.res @@ -0,0 +1,22 @@ +"I","update_pk","test",450253245302439944,1,"example1" +"I","update_pk","test",450253245302439944,2,"example2" +"I","update_pk","test",450253245302439946,10,"example10" +"I","update_pk","test",450253245302439946,20,"example20" +"I","update_pk","test",450253245302439947,100,"example100" +"I","update_pk","test",450253245302439948,1000,"example1000" + +# translate to normal update in upstream +"U","update_pk","test",450253245485940746,1,"example2" +"U","update_pk","test",450253245485940746,2,"example1" + +# split and sort in upstream +"D","update_pk","test",450253245485940749,10,"example10" +"D","update_pk","test",450253245485940749,20,"example20" +"I","update_pk","test",450253245485940749,30,"example10" +"I","update_pk","test",450253245485940749,40,"example20" + +# split and sort in upstream +"D","update_pk","test",450253245485940752,100,"example100" +"I","update_pk","test",450253245485940752,200,"example100" + +"U","update_pk","test",450253245485940753,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_uk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_uk.res new file mode 100644 index 00000000000..ebe3a635252 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_uk.res @@ -0,0 +1,24 @@ +"I","update_uk","test",450253245446619144,1,1,"example1" +"I","update_uk","test",450253245446619144,2,2,"example2" +"I","update_uk","test",450253245446619146,10,10,"example10" +"I","update_uk","test",450253245446619146,20,20,"example20" +"I","update_uk","test",450253245446619147,100,100,"example100" +"I","update_uk","test",450253245446619148,1000,1000,"example1000" + +# split and sort in table sink +"D","update_uk","test",450253245499047940,1,1,"example1" +"D","update_uk","test",450253245499047940,2,2,"example2" +"I","update_uk","test",450253245499047940,1,2,"example1" +"I","update_uk","test",450253245499047940,2,1,"example2" + +# split and sort in table sink +"D","update_uk","test",450253245499047943,10,10,"example10" +"D","update_uk","test",450253245499047943,20,20,"example20" +"I","update_uk","test",450253245499047943,10,30,"example10" +"I","update_uk","test",450253245499047943,20,40,"example20" + +# split and sort in table sink +"D","update_uk","test",450253245499047946,100,100,"example100" +"I","update_uk","test",450253245499047946,100,200,"example100" + +"U","update_uk","test",450253245512155140,1000,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_pk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_pk.res new file mode 100644 index 00000000000..fc3ea45b65d --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_pk.res @@ -0,0 +1,26 @@ +"I","update_pk","test",450253245302439944,false,1,"example1" +"I","update_pk","test",450253245302439944,false,2,"example2" +"I","update_pk","test",450253245302439946,false,10,"example10" 
+"I","update_pk","test",450253245302439946,false,20,"example20" +"I","update_pk","test",450253245302439947,false,100,"example100" +"I","update_pk","test",450253245302439948,false,1000,"example1000" + +# translate to normal update in upstream, split in csv encoder +"D","update_pk","test",450253245485940746,true,1,"example1" +"I","update_pk","test",450253245485940746,true,1,"example2" +"D","update_pk","test",450253245485940746,true,2,"example2" +"I","update_pk","test",450253245485940746,true,2,"example1" + +# split and sort in upstream +"D","update_pk","test",450253245485940749,false,10,"example10" +"D","update_pk","test",450253245485940749,false,20,"example20" +"I","update_pk","test",450253245485940749,false,30,"example10" +"I","update_pk","test",450253245485940749,false,40,"example20" + +# split and sort in upstream +"D","update_pk","test",450253245485940752,false,100,"example100" +"I","update_pk","test",450253245485940752,false,200,"example100" + +# normal update event, split in csv encoder +"D","update_pk","test",450253245485940753,true,1000,"example1000" +"I","update_pk","test",450253245485940753,true,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_uk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_uk.res new file mode 100644 index 00000000000..5e7f2ce0e71 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_uk.res @@ -0,0 +1,26 @@ +"I","update_uk","test",450253245446619144,false,1,1,"example1" +"I","update_uk","test",450253245446619144,false,2,2,"example2" +"I","update_uk","test",450253245446619146,false,10,10,"example10" +"I","update_uk","test",450253245446619146,false,20,20,"example20" +"I","update_uk","test",450253245446619147,false,100,100,"example100" +"I","update_uk","test",450253245446619148,false,1000,1000,"example1000" + +# split in csv encoder, data is consistent since delete by pk +"D","update_uk","test",450253245499047940,true,1,1,"example1" +"I","update_uk","test",450253245499047940,true,1,2,"example1" +"D","update_uk","test",450253245499047940,true,2,2,"example2" +"I","update_uk","test",450253245499047940,true,2,1,"example2" + +# split in csv encoder +"D","update_uk","test",450253245499047943,true,10,10,"example10" +"I","update_uk","test",450253245499047943,true,10,30,"example10" +"D","update_uk","test",450253245499047943,true,20,20,"example20" +"I","update_uk","test",450253245499047943,true,20,40,"example20" + +# split in csv encoder +"D","update_uk","test",450253245499047946,true,100,100,"example100" +"I","update_uk","test",450253245499047946,true,100,200,"example100" + +# normal update event, also split in csv encoder +"D","update_uk","test",450253245512155140,true,1000,1000,"example1000" +"I","update_uk","test",450253245512155140,true,1000,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed3_pk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed3_pk.res new file mode 100644 index 00000000000..9f4448e5d9a --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed3_pk.res @@ -0,0 +1,24 @@ +"I","update_pk","test",450253245302439944,1,"example1" +"I","update_pk","test",450253245302439944,2,"example2" +"I","update_pk","test",450253245302439946,10,"example10" +"I","update_pk","test",450253245302439946,20,"example20" +"I","update_pk","test",450253245302439947,100,"example100" 
+"I","update_pk","test",450253245302439948,1000,"example1000" + +# output raw change event +# translate to normal update in upstream +"U","update_pk","test",450253245485940746,1,"example2" +"U","update_pk","test",450253245485940746,2,"example1" + +# split and sort in upstream +"D","update_pk","test",450253245485940749,10,"example10" +"D","update_pk","test",450253245485940749,20,"example20" +"I","update_pk","test",450253245485940749,30,"example10" +"I","update_pk","test",450253245485940749,40,"example20" + +# split and sort in upstream +"D","update_pk","test",450253245485940752,100,"example100" +"I","update_pk","test",450253245485940752,200,"example100" + +# normal update event +"U","update_pk","test",450253245485940753,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed3_uk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed3_uk.res new file mode 100644 index 00000000000..66348746ded --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed3_uk.res @@ -0,0 +1,14 @@ +"I","update_uk","test",450253245446619144,1,1,"example1" +"I","update_uk","test",450253245446619144,2,2,"example2" +"I","update_uk","test",450253245446619146,10,10,"example10" +"I","update_uk","test",450253245446619146,20,20,"example20" +"I","update_uk","test",450253245446619147,100,100,"example100" +"I","update_uk","test",450253245446619148,1000,1000,"example1000" + +# output raw change event, data is consistent since replace by pk/uk +"U","update_uk","test",450253245499047940,1,2,"example1" +"U","update_uk","test",450253245499047940,2,1,"example2" +"U","update_uk","test",450253245499047943,10,30,"example10" +"U","update_uk","test",450253245499047943,20,40,"example20" +"U","update_uk","test",450253245499047946,100,200,"example100" +"U","update_uk","test",450253245512155140,1000,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed4_pk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed4_pk.res new file mode 100644 index 00000000000..fc3ea45b65d --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed4_pk.res @@ -0,0 +1,26 @@ +"I","update_pk","test",450253245302439944,false,1,"example1" +"I","update_pk","test",450253245302439944,false,2,"example2" +"I","update_pk","test",450253245302439946,false,10,"example10" +"I","update_pk","test",450253245302439946,false,20,"example20" +"I","update_pk","test",450253245302439947,false,100,"example100" +"I","update_pk","test",450253245302439948,false,1000,"example1000" + +# translate to normal update in upstream, split in csv encoder +"D","update_pk","test",450253245485940746,true,1,"example1" +"I","update_pk","test",450253245485940746,true,1,"example2" +"D","update_pk","test",450253245485940746,true,2,"example2" +"I","update_pk","test",450253245485940746,true,2,"example1" + +# split and sort in upstream +"D","update_pk","test",450253245485940749,false,10,"example10" +"D","update_pk","test",450253245485940749,false,20,"example20" +"I","update_pk","test",450253245485940749,false,30,"example10" +"I","update_pk","test",450253245485940749,false,40,"example20" + +# split and sort in upstream +"D","update_pk","test",450253245485940752,false,100,"example100" +"I","update_pk","test",450253245485940752,false,200,"example100" + +# normal update event, split in csv encoder +"D","update_pk","test",450253245485940753,true,1000,"example1000" 
+"I","update_pk","test",450253245485940753,true,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed4_uk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed4_uk.res new file mode 100644 index 00000000000..ea644868640 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed4_uk.res @@ -0,0 +1,26 @@ +"I","update_uk","test",450253245446619144,false,1,1,"example1" +"I","update_uk","test",450253245446619144,false,2,2,"example2" +"I","update_uk","test",450253245446619146,false,10,10,"example10" +"I","update_uk","test",450253245446619146,false,20,20,"example20" +"I","update_uk","test",450253245446619147,false,100,100,"example100" +"I","update_uk","test",450253245446619148,false,1000,1000,"example1000" + +# split and sort in table sink +"D","update_uk","test",450253245499047940,false,1,1,"example1" +"D","update_uk","test",450253245499047940,false,2,2,"example2" +"I","update_uk","test",450253245499047940,false,1,2,"example1" +"I","update_uk","test",450253245499047940,false,2,1,"example2" + +# split and sort in table sink +"D","update_uk","test",450253245499047943,false,10,10,"example10" +"D","update_uk","test",450253245499047943,false,20,20,"example20" +"I","update_uk","test",450253245499047943,false,10,30,"example10" +"I","update_uk","test",450253245499047943,false,20,40,"example20" + +# split and sort in table sink +"D","update_uk","test",450253245499047946,false,100,100,"example100" +"I","update_uk","test",450253245499047946,false,100,200,"example100" + +# normal update event, split in csv encoder +"D","update_uk","test",450253245512155140,true,1000,1000,"example1000" +"I","update_uk","test",450253245512155140,true,1000,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/run.sh b/tests/integration_tests/csv_storage_update_pk_clustered/run.sh new file mode 100644 index 00000000000..8606518da0a --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/run.sh @@ -0,0 +1,54 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run_changefeed() { + local changefeed_id=$1 + local start_ts=$2 + local expected_split_count=$3 + SINK_URI="file://$WORK_DIR/storage_test/$changefeed_id?flush-interval=5s" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config=$CUR/conf/$changefeed_id.toml -c "$changefeed_id" + + run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/$changefeed_id.toml $changefeed_id + sleep 8 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 + + real_split_count=$(grep "split update event" $WORK_DIR/cdc.log | wc -l) + if [[ $real_split_count -ne $expected_split_count ]]; then + echo "expected split count $expected_split_count, real split count $real_split_count" + exit 1 + fi + run_sql "drop database if exists test" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} +} + +function run() { + if [ "$SINK_TYPE" != "storage" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + start_tidb_cluster --workdir $WORK_DIR + cd $WORK_DIR + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + + run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql_file $CUR/data/run.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + run_changefeed "changefeed1" $start_ts 5 + run_changefeed 
"changefeed2" $start_ts 5 + run_changefeed "changefeed3" $start_ts 5 + run_changefeed "changefeed4" $start_ts 10 +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed1.toml b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed1.toml new file mode 100644 index 00000000000..ff4fe5d6765 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed1.toml @@ -0,0 +1,27 @@ +# Case 1: default configuration where `csv.output-old-value=false` and `sink.cloud-storage-config.output-raw-change-event=false` +# Split and sort update pk/uk events in table sink. + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = false + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. +output-old-value = false \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed2.toml b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed2.toml new file mode 100644 index 00000000000..96057f4dd48 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed2.toml @@ -0,0 +1,26 @@ +# Case 2: Split all update events in csv encoder. + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = true + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. +output-old-value = true \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed3.toml b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed3.toml new file mode 100644 index 00000000000..033e2bd1a4f --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed3.toml @@ -0,0 +1,26 @@ +# Case 3: Don't split any update event + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. 
+terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = true + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. +output-old-value = false \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed4.toml b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed4.toml new file mode 100644 index 00000000000..112cfe4a012 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed4.toml @@ -0,0 +1,26 @@ +# Case 4: Split and sort update pk/uk events in table sink. Split other update events in csv encoder. + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = false + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. +output-old-value = true \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/diff_config.toml b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/diff_config.toml new file mode 100644 index 00000000000..0714c0c18d9 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. 
+ +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/csv_storage_update_pk_nonclustered/sync_diff-<changefeed-id>/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["test.?*"] + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/data/prepare.sql b/tests/integration_tests/csv_storage_update_pk_nonclustered/data/prepare.sql new file mode 100644 index 00000000000..f3cd4ca4d24 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/data/prepare.sql @@ -0,0 +1,28 @@ +drop database if exists `test`; +create database `test`; +use `test`; + +CREATE TABLE `update_pk` ( + `id` int PRIMARY KEY NONCLUSTERED, + `pad` varchar(100) NOT NULL +); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (1, 'example1'), (2, 'example2'); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (10, 'example10'), (20, 'example20'); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (100, 'example100'); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (1000, 'example1000'); + + +SHOW INDEX FROM update_pk; + +CREATE TABLE `update_uk` ( + `id` int PRIMARY KEY NONCLUSTERED, + `uk` int NOT NULL, + `pad` varchar(100) NOT NULL, + UNIQUE KEY `uk` (`uk`) +); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (1, 1, 'example1'), (2, 2, 'example2'); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (10, 10, 'example10'), (20, 20, 'example20'); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (100, 100, 'example100'); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (1000, 1000, 'example1000'); + +SHOW INDEX FROM update_uk; \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/data/run.sql b/tests/integration_tests/csv_storage_update_pk_nonclustered/data/run.sql new file mode 100644 index 00000000000..86e7d7e7d77 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/data/run.sql @@ -0,0 +1,40 @@ +USE `test`; + +-- update_pk -- + +BEGIN; -- Note: multi-row exchange +UPDATE update_pk SET id = 3 WHERE id = 1; +UPDATE update_pk SET id = 1 WHERE id = 2; +UPDATE update_pk SET id = 2 WHERE id = 3; +COMMIT; + +BEGIN; -- Note: multi-row update with no order dependency +UPDATE update_pk SET id = 30 WHERE id = 10; +UPDATE update_pk SET id = 40 WHERE id = 20; +COMMIT; + +BEGIN; -- Single row update +UPDATE update_pk SET id = 200 WHERE id = 100; +COMMIT; + +-- Normal update +UPDATE update_pk SET pad='example1001' WHERE id = 1000; + +-- update_uk -- +BEGIN; -- Note: multi-row exchange +UPDATE update_uk SET uk = 3 WHERE uk = 1; +UPDATE update_uk SET uk = 1 WHERE uk = 2; +UPDATE update_uk SET uk = 2 WHERE uk = 3; +COMMIT; + +BEGIN; -- Note: multi-row update with no order dependency +UPDATE update_uk SET uk = 30 WHERE uk = 10; +UPDATE update_uk SET uk = 40 WHERE uk = 20; +COMMIT; + +BEGIN; -- Single row update +UPDATE update_uk SET uk = 200 WHERE uk = 100; +COMMIT; + +-- Normal update +UPDATE update_uk SET pad='example1001' WHERE uk = 1000; \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_pk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_pk.res new file mode 100644 index 00000000000..08f6eedb804 --- /dev/null +++ 
b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_pk.res @@ -0,0 +1,24 @@ +"I","update_pk","test",450250823741472787,1,"example1" +"I","update_pk","test",450250823741472787,2,"example2" +"I","update_pk","test",450250823741472790,10,"example10" +"I","update_pk","test",450250823741472790,20,"example20" +"I","update_pk","test",450250823741472791,100,"example100" +"I","update_pk","test",450250823741472792,1000,"example1000" + +# split and sort in table sink +"D","update_pk","test",450250823807270922,1,"example1" +"D","update_pk","test",450250823807270922,2,"example2" +"I","update_pk","test",450250823807270922,2,"example1" +"I","update_pk","test",450250823807270922,1,"example2" + +# split and sort in table sink +"D","update_pk","test",450250823807270925,10,"example10" +"D","update_pk","test",450250823807270925,20,"example20" +"I","update_pk","test",450250823807270925,30,"example10" +"I","update_pk","test",450250823807270925,40,"example20" + +# split and sort in table sink +"D","update_pk","test",450250823807270927,100,"example100" +"I","update_pk","test",450250823807270927,200,"example100" + +"U","update_pk","test",450250823807270928,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_uk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_uk.res new file mode 100644 index 00000000000..b26f2219af2 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_uk.res @@ -0,0 +1,24 @@ +"I","update_uk","test",450250823780794385,1,1,"example1" +"I","update_uk","test",450250823780794385,2,2,"example2" +"I","update_uk","test",450250823780794387,10,10,"example10" +"I","update_uk","test",450250823780794387,20,20,"example20" +"I","update_uk","test",450250823780794389,100,100,"example100" +"I","update_uk","test",450250823780794390,1000,1000,"example1000" + +# split and sort in table sink +"D","update_uk","test",450250823807270931,1,1,"example1" +"D","update_uk","test",450250823807270931,2,2,"example2" +"I","update_uk","test",450250823807270931,1,2,"example1" +"I","update_uk","test",450250823807270931,2,1,"example2" + +# split and sort in table sink +"D","update_uk","test",450250823820115970,10,10,"example10" +"D","update_uk","test",450250823820115970,20,20,"example20" +"I","update_uk","test",450250823820115970,10,30,"example10" +"I","update_uk","test",450250823820115970,20,40,"example20" + +# split and sort in table sink +"D","update_uk","test",450250823820115973,100,100,"example100" +"I","update_uk","test",450250823820115973,100,200,"example100" + +"U","update_uk","test",450250823820115977,1000,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res new file mode 100644 index 00000000000..e2713a94f63 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res @@ -0,0 +1,28 @@ +"I","update_pk","test",450250823741472787,false,1,"example1" +"I","update_pk","test",450250823741472787,false,2,"example2" +"I","update_pk","test",450250823741472790,false,10,"example10" +"I","update_pk","test",450250823741472790,false,20,"example20" +"I","update_pk","test",450250823741472791,false,100,"example100" +"I","update_pk","test",450250823741472792,false,1000,"example1000" + +# split in csv encoder +# DIFF_RES: REPLACE INTO `test`.`update_pk`(`id`,`pad`) VALUES (2,'example1'); +# 
lost id=2 since delete are not sorted before insert within single txn +"D","update_pk","test",450250823807270922,true,1,"example1" +"I","update_pk","test",450250823807270922,true,2,"example1" +"D","update_pk","test",450250823807270922,true,2,"example2" +"I","update_pk","test",450250823807270922,true,1,"example2" + +# split in csv encoder +"D","update_pk","test",450250823807270925,true,10,"example10" +"I","update_pk","test",450250823807270925,true,30,"example10" +"D","update_pk","test",450250823807270925,true,20,"example20" +"I","update_pk","test",450250823807270925,true,40,"example20" + +# split in csv encoder +"D","update_pk","test",450250823807270927,true,100,"example100" +"I","update_pk","test",450250823807270927,true,200,"example100" + +# normal update event, also split in csv encoder +"D","update_pk","test",450250823807270928,true,1000,"example1000" +"I","update_pk","test",450250823807270928,true,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_uk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_uk.res new file mode 100644 index 00000000000..1783ee5a0dd --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_uk.res @@ -0,0 +1,26 @@ +"I","update_uk","test",450250823780794385,false,1,1,"example1" +"I","update_uk","test",450250823780794385,false,2,2,"example2" +"I","update_uk","test",450250823780794387,false,10,10,"example10" +"I","update_uk","test",450250823780794387,false,20,20,"example20" +"I","update_uk","test",450250823780794389,false,100,100,"example100" +"I","update_uk","test",450250823780794390,false,1000,1000,"example1000" + +# split in csv encoder, data is consistent since delete by pk +"D","update_uk","test",450250823807270931,true,1,1,"example1" +"I","update_uk","test",450250823807270931,true,1,2,"example1" +"D","update_uk","test",450250823807270931,true,2,2,"example2" +"I","update_uk","test",450250823807270931,true,2,1,"example2" + +# split in csv encoder +"D","update_uk","test",450250823820115970,true,10,10,"example10" +"I","update_uk","test",450250823820115970,true,10,30,"example10" +"D","update_uk","test",450250823820115970,true,20,20,"example20" +"I","update_uk","test",450250823820115970,true,20,40,"example20" + +# split in csv encoder +"D","update_uk","test",450250823820115973,true,100,100,"example100" +"I","update_uk","test",450250823820115973,true,100,200,"example100" + +# normal update event, also split in csv encoder +"D","update_uk","test",450250823820115977,true,1000,1000,"example1000" +"I","update_uk","test",450250823820115977,true,1000,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_pk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_pk.res new file mode 100644 index 00000000000..54595b6f49b --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_pk.res @@ -0,0 +1,22 @@ +"I","update_pk","test",450250823741472787,1,"example1" +"I","update_pk","test",450250823741472787,2,"example2" +"I","update_pk","test",450250823741472790,10,"example10" +"I","update_pk","test",450250823741472790,20,"example20" +"I","update_pk","test",450250823741472791,100,"example100" +"I","update_pk","test",450250823741472792,1000,"example1000" + +# output raw change event +"U","update_pk","test",450250823807270922,2,"example1" 
+"U","update_pk","test",450250823807270922,1,"example2" + + +# DIFF_RES: +# DELETE FROM `test`.`update_pk` WHERE `id` = 10 AND `pad` = 'example10' LIMIT 1; +# DELETE FROM `test`.`update_pk` WHERE `id` = 20 AND `pad` = 'example20' LIMIT 1; +# DELETE FROM `test`.`update_pk` WHERE `id` = 100 AND `pad` = 'example100' LIMIT 1; +# old row is not deleted since lack of old value +"U","update_pk","test",450250823807270925,30,"example10" +"U","update_pk","test",450250823807270925,40,"example20" +"U","update_pk","test",450250823807270927,200,"example100" + +"U","update_pk","test",450250823807270928,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_uk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_uk.res new file mode 100644 index 00000000000..ed01246a1ed --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_uk.res @@ -0,0 +1,14 @@ +"I","update_uk","test",450250823780794385,1,1,"example1" +"I","update_uk","test",450250823780794385,2,2,"example2" +"I","update_uk","test",450250823780794387,10,10,"example10" +"I","update_uk","test",450250823780794387,20,20,"example20" +"I","update_uk","test",450250823780794389,100,100,"example100" +"I","update_uk","test",450250823780794390,1000,1000,"example1000" + +# output raw change event, data is consistent since replace by pk/uk +"U","update_uk","test",450250823807270931,1,2,"example1" +"U","update_uk","test",450250823807270931,2,1,"example2" +"U","update_uk","test",450250823820115970,10,30,"example10" +"U","update_uk","test",450250823820115970,20,40,"example20" +"U","update_uk","test",450250823820115973,100,200,"example100" +"U","update_uk","test",450250823820115977,1000,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed4_pk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed4_pk.res new file mode 100644 index 00000000000..79dc50dbf3c --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed4_pk.res @@ -0,0 +1,26 @@ +"I","update_pk","test",450250823741472787,false,1,"example1" +"I","update_pk","test",450250823741472787,false,2,"example2" +"I","update_pk","test",450250823741472790,false,10,"example10" +"I","update_pk","test",450250823741472790,false,20,"example20" +"I","update_pk","test",450250823741472791,false,100,"example100" +"I","update_pk","test",450250823741472792,false,1000,"example1000" + +# split and sort in table sink +"D","update_pk","test",450250823807270922,false,1,"example1" +"D","update_pk","test",450250823807270922,false,2,"example2" +"I","update_pk","test",450250823807270922,false,2,"example1" +"I","update_pk","test",450250823807270922,false,1,"example2" + +# split and sort in table sink +"D","update_pk","test",450250823807270925,false,10,"example10" +"D","update_pk","test",450250823807270925,false,20,"example20" +"I","update_pk","test",450250823807270925,false,30,"example10" +"I","update_pk","test",450250823807270925,false,40,"example20" + +# split and sort in table sink +"D","update_pk","test",450250823807270927,false,100,"example100" +"I","update_pk","test",450250823807270927,false,200,"example100" + +# normal update event, split in csv encoder +"D","update_pk","test",450250823807270928,true,1000,"example1000" +"I","update_pk","test",450250823807270928,true,1000,"example1001" \ No newline at end of file diff --git 
a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed4_uk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed4_uk.res new file mode 100644 index 00000000000..2225f55ff95 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed4_uk.res @@ -0,0 +1,26 @@ +"I","update_uk","test",450250823780794385,false,1,1,"example1" +"I","update_uk","test",450250823780794385,false,2,2,"example2" +"I","update_uk","test",450250823780794387,false,10,10,"example10" +"I","update_uk","test",450250823780794387,false,20,20,"example20" +"I","update_uk","test",450250823780794389,false,100,100,"example100" +"I","update_uk","test",450250823780794390,false,1000,1000,"example1000" + +# split and sort in table sink +"D","update_uk","test",450250823807270931,false,1,1,"example1" +"D","update_uk","test",450250823807270931,false,2,2,"example2" +"I","update_uk","test",450250823807270931,false,1,2,"example1" +"I","update_uk","test",450250823807270931,false,2,1,"example2" + +# split and sort in table sink +"D","update_uk","test",450250823820115970,false,10,10,"example10" +"D","update_uk","test",450250823820115970,false,20,20,"example20" +"I","update_uk","test",450250823820115970,false,10,30,"example10" +"I","update_uk","test",450250823820115970,false,20,40,"example20" + +# split and sort in table sink
+"D","update_uk","test",450250823820115973,false,100,100,"example100" +"I","update_uk","test",450250823820115973,false,100,200,"example100" + +# normal update event, split in csv encoder +"D","update_uk","test",450250823820115977,true,1000,1000,"example1000" +"I","update_uk","test",450250823820115977,true,1000,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh b/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh new file mode 100644 index 00000000000..adbf3392ffd --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh @@ -0,0 +1,64 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run_changefeed() { + local changefeed_id=$1 + local start_ts=$2 + local expected_split_count=$3 + local should_pass_check=$4 + SINK_URI="file://$WORK_DIR/storage_test/$changefeed_id?flush-interval=5s" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config=$CUR/conf/$changefeed_id.toml -c "$changefeed_id" + + run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/$changefeed_id.toml $changefeed_id + sleep 8 + + cp $CUR/conf/diff_config.toml $WORK_DIR/diff_config.toml + sed -i "s/<changefeed-id>/$changefeed_id/" $WORK_DIR/diff_config.toml + if [[ $should_pass_check == true ]]; then + check_sync_diff $WORK_DIR $WORK_DIR/diff_config.toml 100 + else + check_sync_diff $WORK_DIR $WORK_DIR/diff_config.toml 30 && exit 1 || echo "check_sync_diff failed as expected for $changefeed_id" + fi + + real_split_count=$(grep "split update event" $WORK_DIR/cdc.log | wc -l) + if [[ $real_split_count -ne $expected_split_count ]]; then + echo "expected split count $expected_split_count, real split count $real_split_count" + exit 1 + fi + run_sql "drop database if exists test" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} +} + +function run() { + if [ "$SINK_TYPE" != "storage" ]; then + return + fi + +  rm -rf $WORK_DIR && mkdir -p $WORK_DIR + start_tidb_cluster --workdir $WORK_DIR + cd $WORK_DIR + run_cdc_server --workdir 
$WORK_DIR --binary $CDC_BINARY + + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + + run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql_file $CUR/data/run.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + run_changefeed "changefeed1" $start_ts 10 true + # changefeed2 fails since delete events are not sorted + run_changefeed "changefeed2" $start_ts 10 false + # changefeed3 fails since update pk/uk events are not split + run_changefeed "changefeed3" $start_ts 10 false + run_changefeed "changefeed4" $start_ts 20 true +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index 36988c6a3f8..4205579440b 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -18,7 +18,7 @@ kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error kafka_only_protocol="canal_json_adapter_compatibility canal_json_basic canal_json_content_compatible multi_topics canal_json_handle_key_only open_protocol_handle_key_only" kafka_only_v2="kafka_big_txn_v2 kafka_big_messages_v2 multi_tables_ddl_v2 multi_topics_v2" -storage_only="lossy_ddl storage_csv_update csv_storage_update_pk_clustered csv_storage_update_pk_nonclustered" storage_only_csv="storage_cleanup csv_storage_basic csv_storage_multi_tables_ddl csv_storage_partition_table" storage_only_canal_json="canal_json_storage_basic canal_json_storage_partition_table"
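
The four changefeed configurations added above differ only in two switches; the sketch below restates, in comment form, the matrix already defined by the conf/changefeed*.toml files in this diff (it introduces no new settings):

# changefeed1: output-raw-change-event = false, output-old-value = false
#              -> update pk/uk events are split and sorted in the table sink
# changefeed2: output-raw-change-event = true,  output-old-value = true
#              -> all update events are split in the csv encoder
# changefeed3: output-raw-change-event = true,  output-old-value = false
#              -> raw update events are emitted, nothing is split
# changefeed4: output-raw-change-event = false, output-old-value = true
#              -> pk/uk updates are split in the table sink, other updates in the csv encoder

In the nonclustered variant, run.sh accordingly expects changefeed2 and changefeed3 to fail the sync-diff check (unsorted delete/insert pairs and unsplit pk/uk updates, respectively), while changefeed1 and changefeed4 must stay consistent with the upstream.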