pkg/config, sink(ticdc): support output raw change event for mq and cloud storage sink (pingcap#11226)

close pingcap#11211
CharlesCheung96 committed Jul 15, 2024
1 parent f203805 commit d970f03
Showing 52 changed files with 1,591 additions and 147 deletions.
50 changes: 28 additions & 22 deletions cdc/api/v2/model.go
@@ -375,6 +375,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
InsecureSkipVerify: c.Sink.KafkaConfig.InsecureSkipVerify,
CodecConfig: codeConfig,
LargeMessageHandle: largeMessageHandle,
OutputRawChangeEvent: c.Sink.KafkaConfig.OutputRawChangeEvent,
}
}
var mysqlConfig *config.MySQLConfig
@@ -400,13 +401,14 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
var cloudStorageConfig *config.CloudStorageConfig
if c.Sink.CloudStorageConfig != nil {
cloudStorageConfig = &config.CloudStorageConfig{
WorkerCount: c.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: c.Sink.CloudStorageConfig.FlushInterval,
FileSize: c.Sink.CloudStorageConfig.FileSize,
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
WorkerCount: c.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: c.Sink.CloudStorageConfig.FlushInterval,
FileSize: c.Sink.CloudStorageConfig.FileSize,
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
OutputRawChangeEvent: c.Sink.CloudStorageConfig.OutputRawChangeEvent,
}
}

@@ -606,6 +608,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
InsecureSkipVerify: cloned.Sink.KafkaConfig.InsecureSkipVerify,
CodecConfig: codeConfig,
LargeMessageHandle: largeMessageHandle,
OutputRawChangeEvent: cloned.Sink.KafkaConfig.OutputRawChangeEvent,
}
}
var mysqlConfig *MySQLConfig
@@ -631,13 +634,14 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
var cloudStorageConfig *CloudStorageConfig
if cloned.Sink.CloudStorageConfig != nil {
cloudStorageConfig = &CloudStorageConfig{
WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval,
FileSize: cloned.Sink.CloudStorageConfig.FileSize,
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval,
FileSize: cloned.Sink.CloudStorageConfig.FileSize,
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
OutputRawChangeEvent: cloned.Sink.CloudStorageConfig.OutputRawChangeEvent,
}
}

@@ -1081,7 +1085,8 @@ type KafkaConfig struct {
InsecureSkipVerify *bool `json:"insecure_skip_verify,omitempty"`
CodecConfig *CodecConfig `json:"codec_config,omitempty"`

LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"`
LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"`
OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
}

// MySQLConfig represents a MySQL sink configuration
@@ -1105,13 +1110,14 @@ type MySQLConfig struct {

// CloudStorageConfig represents a cloud storage sink configuration
type CloudStorageConfig struct {
WorkerCount *int `json:"worker_count,omitempty"`
FlushInterval *string `json:"flush_interval,omitempty"`
FileSize *int `json:"file_size,omitempty"`
OutputColumnID *bool `json:"output_column_id,omitempty"`
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
WorkerCount *int `json:"worker_count,omitempty"`
FlushInterval *string `json:"flush_interval,omitempty"`
FileSize *int `json:"file_size,omitempty"`
OutputColumnID *bool `json:"output_column_id,omitempty"`
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
}

// ChangefeedStatus holds common information of a changefeed in cdc
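The net effect of the API change above is a new `output_raw_change_event` switch in the v2 sink configuration for both the Kafka and the cloud storage sink. Below is a minimal sketch of how the field round-trips as JSON; it uses a small stand-in struct rather than the full API model.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Stand-in for the v2 CloudStorageConfig shown above; only the fields
// relevant to this change are included.
type cloudStorageConfig struct {
	OutputColumnID       *bool `json:"output_column_id,omitempty"`
	OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
}

func main() {
	raw := true
	b, err := json.Marshal(cloudStorageConfig{OutputRawChangeEvent: &raw})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"output_raw_change_event":true}
}
```

Because the field is a `*bool` with `omitempty`, an unset value is simply absent from the JSON, which lets the server distinguish "not specified" from an explicit `false`.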
47 changes: 23 additions & 24 deletions cdc/model/sink.go
@@ -272,7 +272,7 @@ func (r *RedoLog) GetCommitTs() Ts {
}

// TrySplitAndSortUpdateEvent redo log do nothing
func (r *RedoLog) TrySplitAndSortUpdateEvent(_ string) error {
func (r *RedoLog) TrySplitAndSortUpdateEvent(_ string, _ bool) error {
return nil
}

@@ -380,7 +380,7 @@ func (r *RowChangedEvent) GetCommitTs() uint64 {
}

// TrySplitAndSortUpdateEvent do nothing
func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ string) error {
func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ string, _ bool) error {
return nil
}

@@ -758,10 +758,19 @@ func (t *SingleTableTxn) GetCommitTs() uint64 {
}

// TrySplitAndSortUpdateEvent split update events if unique key is updated
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error {
if t.dontSplitUpdateEvent(scheme) {
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error {
if sink.IsMySQLCompatibleScheme(scheme) || outputRawChangeEvent {
// For MySQL Sink, all update events will be split into insert and delete at the puller side
// according to whether the changefeed is in safemode. We don't split update event here(in sink)
// since there may be OOM issues. For more information, ref https://github.com/tikv/tikv/issues/17062.
//
// For the Kafka and Storage sink, the outputRawChangeEvent parameter is introduced to control
// split behavior. TiCDC only outputs the original change event if outputRawChangeEvent is true.
return nil
}

// Try to split update events for the Kafka and Storage sink if outputRawChangeEvent is false.
// Note it is only for backward compatibility, and we should remove this logic in the future.
newRows, err := trySplitAndSortUpdateEvent(t.Rows)
if err != nil {
return errors.Trace(err)
@@ -770,21 +779,6 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error {
return nil
}

// Whether split a single update event into delete and insert events?
//
// For the MySQL Sink, we don't split any update event.
// This may cause error like "duplicate entry" when sink to the downstream.
// This kind of error will cause the changefeed to restart,
// and then the related update rows will be splitted to insert and delete at puller side.
//
// For the Kafka and Storage sink, always split a single unique key changed update event, since:
// 1. Avro and CSV does not output the previous column values for the update event, so it would
// cause consumer missing data if the unique key changed event is not split.
// 2. Index-Value Dispatcher cannot work correctly if the unique key changed event isn't split.
func (t *SingleTableTxn) dontSplitUpdateEvent(scheme string) bool {
return sink.IsMySQLCompatibleScheme(scheme)
}

// trySplitAndSortUpdateEvent try to split update events if unique key is updated
// returns true if some updated events is split
func trySplitAndSortUpdateEvent(
@@ -794,8 +788,7 @@ func trySplitAndSortUpdateEvent(
split := false
for _, e := range events {
if e == nil {
log.Warn("skip emit nil event",
zap.Any("event", e))
log.Warn("skip emit nil event", zap.Any("event", e))
continue
}

@@ -805,8 +798,7 @@
// begin; insert into t (id) values (1); delete from t where id=1; commit;
// Just ignore these row changed events.
if colLen == 0 && preColLen == 0 {
log.Warn("skip emit empty row event",
zap.Any("event", e))
log.Warn("skip emit empty row event", zap.Any("event", e))
continue
}

@@ -832,7 +824,7 @@

// ShouldSplitUpdateEvent determines if the split event is needed to align the old format based on
// whether the handle key column or unique key has been modified.
// If is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event.
// If is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event.
func ShouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
// nil event will never be split.
if updateEvent == nil {
@@ -875,6 +867,13 @@ func SplitUpdateEvent(
// NOTICE: clean up pre cols for insert event.
insertEvent.PreColumns = nil

log.Debug("split update event", zap.Uint64("startTs", updateEvent.StartTs),
zap.Uint64("commitTs", updateEvent.CommitTs),
zap.String("schema", updateEvent.TableInfo.TableName.Schema),
zap.String("table", updateEvent.TableInfo.TableName.Table),
zap.Any("preCols", updateEvent.PreColumns),
zap.Any("cols", updateEvent.Columns))

return &deleteEvent, &insertEvent, nil
}

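The hunk above is the heart of the change: `TrySplitAndSortUpdateEvent` now takes both the sink scheme and the new `outputRawChangeEvent` flag, and skips splitting when the sink is MySQL-compatible or raw output is requested. The self-contained sketch below condenses that decision; the types and helpers are simplified stand-ins for `RowChangedEvent`, `ShouldSplitUpdateEvent`, and `SplitUpdateEvent`, not the real implementations.

```go
package main

import "fmt"

// rowEvent is a toy stand-in for model.RowChangedEvent.
type rowEvent struct {
	PreColumns, Columns map[string]any
}

func (e *rowEvent) isUpdate() bool {
	return len(e.PreColumns) > 0 && len(e.Columns) > 0
}

// uniqueKeyChanged stands in for the handle-key/unique-key comparison done by
// ShouldSplitUpdateEvent in the diff above.
func uniqueKeyChanged(e *rowEvent) bool {
	return fmt.Sprint(e.PreColumns["id"]) != fmt.Sprint(e.Columns["id"])
}

// trySplit mirrors the new decision in TrySplitAndSortUpdateEvent(scheme, outputRawChangeEvent):
// MySQL-compatible sinks and raw mode forward update events untouched; otherwise a
// unique-key-changed update is rewritten as a delete followed by an insert.
func trySplit(events []*rowEvent, mysqlCompatible, outputRawChangeEvent bool) []*rowEvent {
	if mysqlCompatible || outputRawChangeEvent {
		return events
	}
	out := make([]*rowEvent, 0, len(events))
	for _, e := range events {
		if e.isUpdate() && uniqueKeyChanged(e) {
			out = append(out,
				&rowEvent{PreColumns: e.PreColumns}, // delete carries the old row image
				&rowEvent{Columns: e.Columns},       // insert carries the new row image
			)
			continue
		}
		out = append(out, e)
	}
	return out
}

func main() {
	ev := &rowEvent{
		PreColumns: map[string]any{"id": 1},
		Columns:    map[string]any{"id": 2},
	}
	fmt.Println(len(trySplit([]*rowEvent{ev}, false, true)))  // 1: raw mode keeps the update event
	fmt.Println(len(trySplit([]*rowEvent{ev}, false, false))) // 2: split into delete + insert
}
```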
17 changes: 14 additions & 3 deletions cdc/model/sink_test.go
@@ -598,21 +598,32 @@ func TestTrySplitAndSortUpdateEventOne(t *testing.T) {
Rows: []*RowChangedEvent{ukUpdatedEvent},
}

err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme)
outputRawChangeEvent := true
notOutputRawChangeEvent := false
err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme, outputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)
err = txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme, notOutputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn.Rows, 2)

txn = &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent},
}
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, outputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, notOutputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)

txn2 := &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent, ukUpdatedEvent},
}
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, outputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn2.Rows, 2)
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, notOutputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn2.Rows, 2)
}
8 changes: 4 additions & 4 deletions cdc/processor/sinkmanager/table_sink_wrapper_test.go
@@ -52,6 +52,10 @@ func (m *mockSink) WriteEvents(events ...*dmlsink.CallbackableEvent[*model.RowCh
return nil
}

func (m *mockSink) SchemeOption() (string, bool) {
return sink.BlackHoleScheme, false
}

func (m *mockSink) GetEvents() []*dmlsink.CallbackableEvent[*model.RowChangedEvent] {
m.mu.Lock()
defer m.mu.Unlock()
@@ -70,10 +74,6 @@ func (m *mockSink) Dead() <-chan struct{} {
return make(chan struct{})
}

func (m *mockSink) Scheme() string {
return sink.BlackHoleScheme
}

func (m *mockSink) AckAllEvents() {
m.mu.Lock()
defer m.mu.Unlock()
6 changes: 3 additions & 3 deletions cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go
@@ -55,7 +55,7 @@ func (s *DMLSink) Dead() <-chan struct{} {
return make(chan struct{})
}

// Scheme returns the sink scheme.
func (s *DMLSink) Scheme() string {
return sink.BlackHoleScheme
// SchemeOption returns the scheme and the option.
func (s *DMLSink) SchemeOption() (string, bool) {
return sink.BlackHoleScheme, true
}
30 changes: 16 additions & 14 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
@@ -66,8 +66,9 @@ type eventFragment struct {
// DMLSink is the cloud storage sink.
// It will send the events to cloud storage systems.
type DMLSink struct {
changefeedID model.ChangeFeedID
scheme string
changefeedID model.ChangeFeedID
scheme string
outputRawChangeEvent bool
// last sequence number
lastSeqNum uint64
// encodingWorkers defines a group of workers for encoding events.
@@ -133,13 +134,14 @@ func NewDMLSink(ctx context.Context,

wgCtx, wgCancel := context.WithCancel(ctx)
s := &DMLSink{
changefeedID: contextutil.ChangefeedIDFromCtx(wgCtx),
scheme: strings.ToLower(sinkURI.Scheme),
encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency),
workers: make([]*dmlWorker, cfg.WorkerCount),
statistics: metrics.NewStatistics(wgCtx, sink.TxnSink),
cancel: wgCancel,
dead: make(chan struct{}),
changefeedID: contextutil.ChangefeedIDFromCtx(wgCtx),
scheme: strings.ToLower(sinkURI.Scheme),
outputRawChangeEvent: replicaConfig.Sink.CloudStorageConfig.GetOutputRawChangeEvent(),
encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency),
workers: make([]*dmlWorker, cfg.WorkerCount),
statistics: metrics.NewStatistics(wgCtx, sink.TxnSink),
cancel: wgCancel,
dead: make(chan struct{}),
}
s.alive.msgCh = chann.NewAutoDrainChann[eventFragment]()

@@ -244,11 +246,6 @@ func (s *DMLSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa
return nil
}

// Scheme returns the sink scheme.
func (s *DMLSink) Scheme() string {
return s.scheme
}

// Close closes the cloud storage sink.
func (s *DMLSink) Close() {
if s.cancel != nil {
@@ -273,3 +270,8 @@ func (s *DMLSink) Close() {
func (s *DMLSink) Dead() <-chan struct{} {
return s.dead
}

// SchemeOption returns the scheme and the option.
func (s *DMLSink) SchemeOption() (string, bool) {
return s.scheme, s.outputRawChangeEvent
}
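`NewDMLSink` reads the flag through `CloudStorageConfig.GetOutputRawChangeEvent()`, a helper defined in `pkg/config` that is not part of this excerpt. A nil-safe getter along the following lines is the assumed shape; the struct tags and body here are an illustration, not the actual `pkg/config` code.

```go
package config

// Minimal stand-in for the pkg/config CloudStorageConfig; only the new field
// is shown, and the tags are an assumption based on the existing naming style.
type CloudStorageConfig struct {
	OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"`
}

// GetOutputRawChangeEvent is the assumed shape of the helper called in
// NewDMLSink above: nil-safe, defaulting to false when the option is unset.
func (c *CloudStorageConfig) GetOutputRawChangeEvent() bool {
	if c == nil || c.OutputRawChangeEvent == nil {
		return false
	}
	return *c.OutputRawChangeEvent
}
```

A nil-safe getter also explains why call sites such as `replicaConfig.Sink.KafkaConfig.GetOutputRawChangeEvent()` below need no explicit nil check: a Go method with a pointer receiver can be invoked on a nil receiver as long as it never dereferences it.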
3 changes: 1 addition & 2 deletions cdc/sink/dmlsink/event.go
@@ -23,8 +23,7 @@ type TableEvent interface {
// GetCommitTs returns the commit timestamp of the event.
GetCommitTs() uint64
// TrySplitAndSortUpdateEvent split the update to delete and insert if the unique key is updated
// Note that sinkScheme is used to control the split behavior.
TrySplitAndSortUpdateEvent(scheme string) error
TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error
}

// CallbackFunc is the callback function for callbackable event.
4 changes: 2 additions & 2 deletions cdc/sink/dmlsink/event_sink.go
@@ -18,8 +18,8 @@ type EventSink[E TableEvent] interface {
// WriteEvents writes events to the sink.
// This is an asynchronously and thread-safe method.
WriteEvents(events ...*CallbackableEvent[E]) error
// Scheme returns the sink scheme.
Scheme() string
// SchemeOption returns the sink scheme and whether the sink should output raw change event.
SchemeOption() (scheme string, outputRawChangeEvent bool)
// Close closes the sink. Can be called with `WriteEvents` concurrently.
Close()
// The EventSink meets internal errors and has been dead already.
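With `Scheme()` replaced by `SchemeOption()`, every `EventSink` now reports both its URI scheme and whether it wants raw change events, and callers are expected to pass both straight into `TrySplitAndSortUpdateEvent`. A toy sketch of that call pattern follows; the real call site lives in the sink manager and is not shown here, and all names below are placeholders.

```go
package main

import "fmt"

// Toy interfaces mirroring the signatures above; illustration only.
type tableEvent interface {
	TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error
}

type eventSink interface {
	SchemeOption() (scheme string, outputRawChangeEvent bool)
}

// prepare shows the intended flow: ask the sink how it wants events, then let
// the event decide whether to split itself before it is written.
func prepare(s eventSink, e tableEvent) error {
	scheme, outputRaw := s.SchemeOption()
	return e.TrySplitAndSortUpdateEvent(scheme, outputRaw)
}

type kafkaLikeSink struct{}

func (kafkaLikeSink) SchemeOption() (string, bool) { return "kafka", false }

type fakeTxn struct{}

func (fakeTxn) TrySplitAndSortUpdateEvent(scheme string, raw bool) error {
	fmt.Printf("scheme=%s outputRawChangeEvent=%v\n", scheme, raw)
	return nil
}

func main() {
	if err := prepare(kafkaLikeSink{}, fakeTxn{}); err != nil {
		panic(err)
	}
}
```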
11 changes: 9 additions & 2 deletions cdc/sink/dmlsink/mq/kafka_dml_sink.go
@@ -103,7 +103,10 @@ func NewKafkaDMLSink(
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}

log.Info("Try to create a DML sink producer", zap.Any("options", options))
log.Info("Try to create a DML sink producer",
zap.String("namespace", changefeed.Namespace),
zap.String("changefeedID", changefeed.ID),
zap.Any("options", options))
failpointCh := make(chan error, 1)
asyncProducer, err := factory.AsyncProducer(ctx, failpointCh)
if err != nil {
@@ -113,7 +116,11 @@
metricsCollector := factory.MetricsCollector(tiflowutil.RoleProcessor, adminClient)
dmlProducer := producerCreator(ctx, changefeed, asyncProducer, metricsCollector, errCh, failpointCh)
s := newDMLSink(ctx, sinkURI, changefeed, dmlProducer, adminClient, topicManager, eventRouter, encoderBuilder,
replicaConfig.Sink.EncoderConcurrency, protocol, errCh)
replicaConfig.Sink.EncoderConcurrency, protocol, replicaConfig.Sink.KafkaConfig.GetOutputRawChangeEvent(), errCh)

log.Info("DML sink producer created",
zap.String("namespace", changefeed.Namespace),
zap.String("changefeedID", changefeed.ID))

return s, nil
}
(Diff truncated: the remaining changed files are not shown.)
