From 48251882f05b83d33dda7eb50d750191046b86de Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Tue, 20 Aug 2024 14:54:13 +0800 Subject: [PATCH] =?UTF-8?q?kafka(ticdc):=20Revert=20"kafka(ticdc):=20claim?= =?UTF-8?q?=20check=20support=20large=20message=20raw=20value=20for?= =?UTF-8?q?=E2=80=A6=20(#11494)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit close pingcap/tiflow#11493 --- cdc/api/v2/model.go | 3 - pkg/config/config_test_data.go | 3 +- pkg/config/large_message.go | 5 - pkg/config/large_message_test.go | 139 ++++-------------- pkg/sink/codec/canal/canal_json_decoder.go | 12 +- .../canal/canal_json_row_event_encoder.go | 12 +- .../canal_json_row_event_encoder_test.go | 68 ++++----- pkg/sink/codec/open/open_protocol_encoder.go | 12 +- pkg/sink/codec/simple/decoder.go | 12 +- pkg/sink/codec/simple/encoder.go | 14 +- pkg/sink/codec/simple/encoder_test.go | 131 ++++++++--------- pkg/sink/kafka/claimcheck/claim_check.go | 41 +++--- pkg/sink/kafka/claimcheck/claim_check_test.go | 14 +- 13 files changed, 182 insertions(+), 284 deletions(-) diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 3255dd602a8..4ce64cb79e5 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -352,7 +352,6 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( LargeMessageHandleOption: oldConfig.LargeMessageHandleOption, LargeMessageHandleCompression: oldConfig.LargeMessageHandleCompression, ClaimCheckStorageURI: oldConfig.ClaimCheckStorageURI, - ClaimCheckRawValue: oldConfig.ClaimCheckRawValue, } } @@ -623,7 +622,6 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { LargeMessageHandleOption: oldConfig.LargeMessageHandleOption, LargeMessageHandleCompression: oldConfig.LargeMessageHandleCompression, ClaimCheckStorageURI: oldConfig.ClaimCheckStorageURI, - ClaimCheckRawValue: oldConfig.ClaimCheckRawValue, } } @@ -991,7 +989,6 @@ type LargeMessageHandleConfig struct { LargeMessageHandleOption string `json:"large_message_handle_option"` LargeMessageHandleCompression string `json:"large_message_handle_compression"` ClaimCheckStorageURI string `json:"claim_check_storage_uri"` - ClaimCheckRawValue bool `json:"claim_check_raw_value"` } // DispatchRule represents partition rule for a table diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index ab3d584a73f..7f9a5720b90 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -296,8 +296,7 @@ const ( "large-message-handle": { "large-message-handle-option": "handle-key-only", "large-message-handle-compression": "", - "claim-check-storage-uri": "", - "claim-check-raw-value": false + "claim-check-storage-uri": "" }, "glue-schema-registry-config": { "region":"region", diff --git a/pkg/config/large_message.go b/pkg/config/large_message.go index 587a37f4cca..f0a7ae76695 100644 --- a/pkg/config/large_message.go +++ b/pkg/config/large_message.go @@ -32,7 +32,6 @@ type LargeMessageHandleConfig struct { LargeMessageHandleOption string `toml:"large-message-handle-option" json:"large-message-handle-option"` LargeMessageHandleCompression string `toml:"large-message-handle-compression" json:"large-message-handle-compression"` ClaimCheckStorageURI string `toml:"claim-check-storage-uri" json:"claim-check-storage-uri"` - ClaimCheckRawValue bool `toml:"claim-check-raw-value" json:"claim-check-raw-value"` } // NewDefaultLargeMessageHandleConfig return the default Config. @@ -81,10 +80,6 @@ func (c *LargeMessageHandleConfig) AdjustAndValidate(protocol Protocol, enableTi return cerror.ErrInvalidReplicaConfig.GenWithStack( "large message handle is set to claim-check, but the claim-check-storage-uri is empty") } - if c.ClaimCheckRawValue && protocol == ProtocolOpen { - return cerror.ErrInvalidReplicaConfig.GenWithStack( - "large message handle is set to claim-check, raw value is not supported for the open protocol") - } } return nil diff --git a/pkg/config/large_message_test.go b/pkg/config/large_message_test.go index 53b01da1d82..3bc1dd6b212 100644 --- a/pkg/config/large_message_test.go +++ b/pkg/config/large_message_test.go @@ -58,7 +58,7 @@ func TestLargeMessageHandle4NotSupportedProtocol(t *testing.T) { require.ErrorIs(t, err, cerror.ErrInvalidReplicaConfig) } -func TestHandleKeyOnly4CanalJSON(t *testing.T) { +func TestLargeMessageHandle4CanalJSON(t *testing.T) { t.Parallel() // large-message-handle not set, always no error @@ -68,45 +68,27 @@ func TestHandleKeyOnly4CanalJSON(t *testing.T) { require.NoError(t, err) require.True(t, largeMessageHandle.Disabled()) - largeMessageHandle.LargeMessageHandleOption = LargeMessageHandleOptionHandleKeyOnly - - // `enable-tidb-extension` is false, return error - err = largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false) - require.ErrorIs(t, err, cerror.ErrInvalidReplicaConfig) - - // `enable-tidb-extension` is true, no error - err = largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, true) - require.NoError(t, err) - require.Equal(t, LargeMessageHandleOptionHandleKeyOnly, largeMessageHandle.LargeMessageHandleOption) -} - -func TestClaimCheck4CanalJSON(t *testing.T) { - t.Parallel() + for _, option := range []string{ + LargeMessageHandleOptionHandleKeyOnly, + LargeMessageHandleOptionClaimCheck, + } { + largeMessageHandle.LargeMessageHandleOption = option + if option == LargeMessageHandleOptionClaimCheck { + largeMessageHandle.ClaimCheckStorageURI = "file:///tmp/claim-check" + } - // large-message-handle not set, always no error - largeMessageHandle := NewDefaultLargeMessageHandleConfig() - - err := largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false) - require.NoError(t, err) - require.True(t, largeMessageHandle.Disabled()) - - largeMessageHandle.LargeMessageHandleOption = LargeMessageHandleOptionClaimCheck - largeMessageHandle.ClaimCheckStorageURI = "file:///tmp/claim-check" - - for _, rawValue := range []bool{false, true} { - largeMessageHandle.ClaimCheckRawValue = rawValue // `enable-tidb-extension` is false, return error - err = largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false) + err := largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false) require.ErrorIs(t, err, cerror.ErrInvalidReplicaConfig) // `enable-tidb-extension` is true, no error err = largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, true) require.NoError(t, err) - require.Equal(t, LargeMessageHandleOptionClaimCheck, largeMessageHandle.LargeMessageHandleOption) + require.Equal(t, option, largeMessageHandle.LargeMessageHandleOption) } } -func TestHandleKeyOnly4OpenProtocol(t *testing.T) { +func TestLargeMessageHandle4OpenProtocol(t *testing.T) { t.Parallel() // large-message-handle not set, always no error @@ -116,88 +98,23 @@ func TestHandleKeyOnly4OpenProtocol(t *testing.T) { require.NoError(t, err) require.True(t, largeMessageHandle.Disabled()) - largeMessageHandle.LargeMessageHandleOption = LargeMessageHandleOptionHandleKeyOnly - // `enable-tidb-extension` is false, return error - err = largeMessageHandle.AdjustAndValidate(ProtocolOpen, false) - require.NoError(t, err) - - // `enable-tidb-extension` is true, no error - err = largeMessageHandle.AdjustAndValidate(ProtocolOpen, true) - require.NoError(t, err) - require.Equal(t, LargeMessageHandleOptionHandleKeyOnly, largeMessageHandle.LargeMessageHandleOption) -} - -func TestClaimCheck4OpenProtocol(t *testing.T) { - t.Parallel() - - // large-message-handle not set, always no error - largeMessageHandle := NewDefaultLargeMessageHandleConfig() - - err := largeMessageHandle.AdjustAndValidate(ProtocolOpen, false) - require.NoError(t, err) - require.True(t, largeMessageHandle.Disabled()) - - largeMessageHandle.LargeMessageHandleOption = LargeMessageHandleOptionClaimCheck - largeMessageHandle.ClaimCheckStorageURI = "file:///tmp/claim-check" - - // `enable-tidb-extension` is false, return error - err = largeMessageHandle.AdjustAndValidate(ProtocolOpen, false) - require.NoError(t, err) - - // `enable-tidb-extension` is true, no error - err = largeMessageHandle.AdjustAndValidate(ProtocolOpen, true) - require.NoError(t, err) - require.Equal(t, LargeMessageHandleOptionClaimCheck, largeMessageHandle.LargeMessageHandleOption) - - largeMessageHandle.ClaimCheckRawValue = true - err = largeMessageHandle.AdjustAndValidate(ProtocolOpen, true) - require.ErrorIs(t, err, cerror.ErrInvalidReplicaConfig) -} - -func TestHandleKeyOnly4SimpleProtocol(t *testing.T) { - t.Parallel() - - // large-message-handle not set, always no error - largeMessageHandle := NewDefaultLargeMessageHandleConfig() - - err := largeMessageHandle.AdjustAndValidate(ProtocolSimple, false) - require.NoError(t, err) - require.True(t, largeMessageHandle.Disabled()) - - largeMessageHandle.LargeMessageHandleOption = LargeMessageHandleOptionHandleKeyOnly - // `enable-tidb-extension` is false, return error - err = largeMessageHandle.AdjustAndValidate(ProtocolSimple, false) - require.NoError(t, err) - - // `enable-tidb-extension` is true, no error - err = largeMessageHandle.AdjustAndValidate(ProtocolSimple, true) - require.NoError(t, err) - require.Equal(t, LargeMessageHandleOptionHandleKeyOnly, largeMessageHandle.LargeMessageHandleOption) -} + for _, o := range []string{ + LargeMessageHandleOptionHandleKeyOnly, + LargeMessageHandleOptionClaimCheck, + } { + largeMessageHandle.LargeMessageHandleOption = o + if o == LargeMessageHandleOptionClaimCheck { + largeMessageHandle.ClaimCheckStorageURI = "file:///tmp/claim-check" + } -func TestClaimCheck4SimpleProtocol(t *testing.T) { - t.Parallel() - - // large-message-handle not set, always no error - largeMessageHandle := NewDefaultLargeMessageHandleConfig() - - err := largeMessageHandle.AdjustAndValidate(ProtocolSimple, false) - require.NoError(t, err) - require.True(t, largeMessageHandle.Disabled()) - - largeMessageHandle.LargeMessageHandleOption = LargeMessageHandleOptionClaimCheck - largeMessageHandle.ClaimCheckStorageURI = "file:///tmp/claim-check" - - // `enable-tidb-extension` is false, return error - err = largeMessageHandle.AdjustAndValidate(ProtocolSimple, false) - require.NoError(t, err) + // `enable-tidb-extension` is false, return error + err := largeMessageHandle.AdjustAndValidate(ProtocolOpen, false) + require.NoError(t, err) - // `enable-tidb-extension` is true, no error - err = largeMessageHandle.AdjustAndValidate(ProtocolSimple, true) - require.NoError(t, err) - require.Equal(t, LargeMessageHandleOptionClaimCheck, largeMessageHandle.LargeMessageHandleOption) + // `enable-tidb-extension` is true, no error + err = largeMessageHandle.AdjustAndValidate(ProtocolOpen, true) + require.NoError(t, err) + require.Equal(t, o, largeMessageHandle.LargeMessageHandleOption) - largeMessageHandle.ClaimCheckRawValue = true - err = largeMessageHandle.AdjustAndValidate(ProtocolSimple, true) - require.NoError(t, err) + } } diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go index cd1a0a92c98..18a135b9c12 100644 --- a/pkg/sink/codec/canal/canal_json_decoder.go +++ b/pkg/sink/codec/canal/canal_json_decoder.go @@ -143,16 +143,12 @@ func (b *batchDecoder) assembleClaimCheckRowChangedEvent(ctx context.Context, cl if err != nil { return nil, err } - - if !b.config.LargeMessageHandle.ClaimCheckRawValue { - claimCheckM, err := common.UnmarshalClaimCheckMessage(data) - if err != nil { - return nil, err - } - data = claimCheckM.Value + claimCheckM, err := common.UnmarshalClaimCheckMessage(data) + if err != nil { + return nil, err } - value, err := common.Decompress(b.config.LargeMessageHandle.LargeMessageHandleCompression, data) + value, err := common.Decompress(b.config.LargeMessageHandle.LargeMessageHandleCompression, claimCheckM.Value) if err != nil { return nil, err } diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder.go b/pkg/sink/codec/canal/canal_json_row_event_encoder.go index 76f1f5e52d3..35077da1363 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder.go @@ -534,9 +534,15 @@ type jsonRowEventEncoderBuilder struct { // NewJSONRowEventEncoderBuilder creates a canal-json batchEncoderBuilder. func NewJSONRowEventEncoderBuilder(ctx context.Context, config *common.Config) (codec.RowEventEncoderBuilder, error) { - claimCheck, err := claimcheck.New(ctx, config.LargeMessageHandle, config.ChangefeedID) - if err != nil { - return nil, errors.Trace(err) + var ( + claimCheck *claimcheck.ClaimCheck + err error + ) + if config.LargeMessageHandle.EnableClaimCheck() { + claimCheck, err = claimcheck.New(ctx, config.LargeMessageHandle.ClaimCheckStorageURI, config.ChangefeedID) + if err != nil { + return nil, errors.Trace(err) + } } return &jsonRowEventEncoderBuilder{ config: config, diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go index 9a6fafd22ba..25fae69a1b5 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go @@ -214,50 +214,46 @@ func TestCanalJSONClaimCheckE2E(t *testing.T) { codecConfig.MaxMessageBytes = 500 ctx := context.Background() - for _, rawValue := range []bool{false, true} { - codecConfig.LargeMessageHandle.ClaimCheckRawValue = rawValue - - builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) - require.NoError(t, err) - encoder := builder.Build() + builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + encoder := builder.Build() - _, insertEvent, _, _ := utils.NewLargeEvent4Test(t, config.GetDefaultReplicaConfig()) - err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, func() {}) - require.NoError(t, err) + _, insertEvent, _, _ := utils.NewLargeEvent4Test(t, config.GetDefaultReplicaConfig()) + err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, func() {}) + require.NoError(t, err) - // this is a large message, should be delivered to the external storage. - claimCheckLocationMessage := encoder.Build()[0] + // this is a large message, should be delivered to the external storage. + claimCheckLocationMessage := encoder.Build()[0] - decoder, err := NewBatchDecoder(ctx, codecConfig, nil) - require.NoError(t, err) + decoder, err := NewBatchDecoder(ctx, codecConfig, nil) + require.NoError(t, err) - err = decoder.AddKeyValue(claimCheckLocationMessage.Key, claimCheckLocationMessage.Value) - require.NoError(t, err) + err = decoder.AddKeyValue(claimCheckLocationMessage.Key, claimCheckLocationMessage.Value) + require.NoError(t, err) - messageType, ok, err := decoder.HasNext() - require.NoError(t, err) - require.Equal(t, messageType, model.MessageTypeRow) - require.True(t, ok) + messageType, ok, err := decoder.HasNext() + require.NoError(t, err) + require.Equal(t, messageType, model.MessageTypeRow) + require.True(t, ok) - decodedLargeEvent, err := decoder.NextRowChangedEvent() - require.NoError(t, err, rawValue) + decodedLargeEvent, err := decoder.NextRowChangedEvent() + require.NoError(t, err) - require.Equal(t, insertEvent.CommitTs, decodedLargeEvent.CommitTs) - require.Equal(t, insertEvent.TableInfo.GetSchemaName(), decodedLargeEvent.TableInfo.GetSchemaName()) - require.Equal(t, insertEvent.TableInfo.GetTableName(), decodedLargeEvent.TableInfo.GetTableName()) - require.Nil(t, nil, decodedLargeEvent.PreColumns) + require.Equal(t, insertEvent.CommitTs, decodedLargeEvent.CommitTs) + require.Equal(t, insertEvent.TableInfo.GetSchemaName(), decodedLargeEvent.TableInfo.GetSchemaName()) + require.Equal(t, insertEvent.TableInfo.GetTableName(), decodedLargeEvent.TableInfo.GetTableName()) + require.Nil(t, nil, decodedLargeEvent.PreColumns) - decodedColumns := make(map[string]*model.ColumnData, len(decodedLargeEvent.Columns)) - for _, column := range decodedLargeEvent.Columns { - colName := decodedLargeEvent.TableInfo.ForceGetColumnName(column.ColumnID) - decodedColumns[colName] = column - } - for _, col := range insertEvent.Columns { - colName := insertEvent.TableInfo.ForceGetColumnName(col.ColumnID) - decoded, ok := decodedColumns[colName] - require.True(t, ok) - require.EqualValues(t, col.Value, decoded.Value) - } + decodedColumns := make(map[string]*model.ColumnData, len(decodedLargeEvent.Columns)) + for _, column := range decodedLargeEvent.Columns { + colName := decodedLargeEvent.TableInfo.ForceGetColumnName(column.ColumnID) + decodedColumns[colName] = column + } + for _, col := range insertEvent.Columns { + colName := insertEvent.TableInfo.ForceGetColumnName(col.ColumnID) + decoded, ok := decodedColumns[colName] + require.True(t, ok) + require.EqualValues(t, col.Value, decoded.Value) } } diff --git a/pkg/sink/codec/open/open_protocol_encoder.go b/pkg/sink/codec/open/open_protocol_encoder.go index 5743d94a87f..fe8fb752a3f 100644 --- a/pkg/sink/codec/open/open_protocol_encoder.go +++ b/pkg/sink/codec/open/open_protocol_encoder.go @@ -351,9 +351,15 @@ func (b *batchEncoderBuilder) CleanMetrics() { func NewBatchEncoderBuilder( ctx context.Context, config *common.Config, ) (codec.RowEventEncoderBuilder, error) { - claimCheck, err := claimcheck.New(ctx, config.LargeMessageHandle, config.ChangefeedID) - if err != nil { - return nil, errors.Trace(err) + var ( + claimCheck *claimcheck.ClaimCheck + err error + ) + if config.LargeMessageHandle.EnableClaimCheck() { + claimCheck, err = claimcheck.New(ctx, config.LargeMessageHandle.ClaimCheckStorageURI, config.ChangefeedID) + if err != nil { + return nil, errors.Trace(err) + } } return &batchEncoderBuilder{ config: config, diff --git a/pkg/sink/codec/simple/decoder.go b/pkg/sink/codec/simple/decoder.go index 1aae2d7b016..3a910863a2a 100644 --- a/pkg/sink/codec/simple/decoder.go +++ b/pkg/sink/codec/simple/decoder.go @@ -170,16 +170,12 @@ func (d *Decoder) assembleClaimCheckRowChangedEvent(claimCheckLocation string) ( if err != nil { return nil, err } - - if !d.config.LargeMessageHandle.ClaimCheckRawValue { - claimCheckM, err := common.UnmarshalClaimCheckMessage(data) - if err != nil { - return nil, err - } - data = claimCheckM.Value + claimCheckM, err := common.UnmarshalClaimCheckMessage(data) + if err != nil { + return nil, err } - value, err := common.Decompress(d.config.LargeMessageHandle.LargeMessageHandleCompression, data) + value, err := common.Decompress(d.config.LargeMessageHandle.LargeMessageHandleCompression, claimCheckM.Value) if err != nil { return nil, err } diff --git a/pkg/sink/codec/simple/encoder.go b/pkg/sink/codec/simple/encoder.go index 36305d23a42..64774eae235 100644 --- a/pkg/sink/codec/simple/encoder.go +++ b/pkg/sink/codec/simple/encoder.go @@ -166,10 +166,18 @@ type builder struct { // NewBuilder returns a new builder func NewBuilder(ctx context.Context, config *common.Config) (*builder, error) { - claimCheck, err := claimcheck.New(ctx, config.LargeMessageHandle, config.ChangefeedID) - if err != nil { - return nil, errors.Trace(err) + var ( + claimCheck *claimcheck.ClaimCheck + err error + ) + if config.LargeMessageHandle.EnableClaimCheck() { + claimCheck, err = claimcheck.New(ctx, + config.LargeMessageHandle.ClaimCheckStorageURI, config.ChangefeedID) + if err != nil { + return nil, errors.Trace(err) + } } + m, err := newMarshaller(config) return &builder{ config: config, diff --git a/pkg/sink/codec/simple/encoder_test.go b/pkg/sink/codec/simple/encoder_test.go index 5536bf4a8c4..a10dd78cf71 100644 --- a/pkg/sink/codec/simple/encoder_test.go +++ b/pkg/sink/codec/simple/encoder_test.go @@ -1473,87 +1473,84 @@ func TestLargerMessageHandleClaimCheck(t *testing.T) { require.Nil(t, badDec) codecConfig.LargeMessageHandle.ClaimCheckStorageURI = "file:///tmp/simple-claim-check" - for _, rawValue := range []bool{false, true} { - codecConfig.LargeMessageHandle.ClaimCheckRawValue = rawValue - for _, format := range []common.EncodingFormatType{ - common.EncodingFormatAvro, - common.EncodingFormatJSON, + for _, format := range []common.EncodingFormatType{ + common.EncodingFormatAvro, + common.EncodingFormatJSON, + } { + codecConfig.EncodingFormat = format + for _, compressionType := range []string{ + compression.None, + compression.Snappy, + compression.LZ4, } { - codecConfig.EncodingFormat = format - for _, compressionType := range []string{ - compression.None, - compression.Snappy, - compression.LZ4, - } { - codecConfig.MaxMessageBytes = config.DefaultMaxMessageBytes - codecConfig.LargeMessageHandle.LargeMessageHandleCompression = compressionType - - b, err = NewBuilder(ctx, codecConfig) - require.NoError(t, err) - enc := b.Build() + codecConfig.MaxMessageBytes = config.DefaultMaxMessageBytes + codecConfig.LargeMessageHandle.LargeMessageHandleCompression = compressionType - m, err := enc.EncodeDDLEvent(ddlEvent) - require.NoError(t, err) + b, err = NewBuilder(ctx, codecConfig) + require.NoError(t, err) + enc := b.Build() - dec, err := NewDecoder(ctx, codecConfig, nil) - require.NoError(t, err) + m, err := enc.EncodeDDLEvent(ddlEvent) + require.NoError(t, err) - err = dec.AddKeyValue(m.Key, m.Value) - require.NoError(t, err) + dec, err := NewDecoder(ctx, codecConfig, nil) + require.NoError(t, err) - messageType, hasNext, err := dec.HasNext() - require.NoError(t, err) - require.True(t, hasNext) - require.Equal(t, model.MessageTypeDDL, messageType) + err = dec.AddKeyValue(m.Key, m.Value) + require.NoError(t, err) - _, err = dec.NextDDLEvent() - require.NoError(t, err) + messageType, hasNext, err := dec.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeDDL, messageType) - enc.(*encoder).config.MaxMessageBytes = 500 - err = enc.AppendRowChangedEvent(ctx, "", updateEvent, func() {}) - require.NoError(t, err) + _, err = dec.NextDDLEvent() + require.NoError(t, err) - claimCheckLocationM := enc.Build()[0] + enc.(*encoder).config.MaxMessageBytes = 500 + err = enc.AppendRowChangedEvent(ctx, "", updateEvent, func() {}) + require.NoError(t, err) - dec.config.MaxMessageBytes = 500 - err = dec.AddKeyValue(claimCheckLocationM.Key, claimCheckLocationM.Value) - require.NoError(t, err) + claimCheckLocationM := enc.Build()[0] - messageType, hasNext, err = dec.HasNext() - require.NoError(t, err) - require.True(t, hasNext) - require.Equal(t, model.MessageTypeRow, messageType) - require.NotEqual(t, "", dec.msg.ClaimCheckLocation) + dec.config.MaxMessageBytes = 500 + err = dec.AddKeyValue(claimCheckLocationM.Key, claimCheckLocationM.Value) + require.NoError(t, err) - decodedRow, err := dec.NextRowChangedEvent() - require.NoError(t, err) + messageType, hasNext, err = dec.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeRow, messageType) + require.NotEqual(t, "", dec.msg.ClaimCheckLocation) - require.Equal(t, decodedRow.CommitTs, updateEvent.CommitTs) - require.Equal(t, decodedRow.TableInfo.GetSchemaName(), updateEvent.TableInfo.GetSchemaName()) - require.Equal(t, decodedRow.TableInfo.GetTableName(), updateEvent.TableInfo.GetTableName()) + decodedRow, err := dec.NextRowChangedEvent() + require.NoError(t, err) - decodedColumns := make(map[string]*model.ColumnData, len(decodedRow.Columns)) - for _, column := range decodedRow.Columns { - colName := decodedRow.TableInfo.ForceGetColumnName(column.ColumnID) - decodedColumns[colName] = column - } - for _, col := range updateEvent.Columns { - colName := updateEvent.TableInfo.ForceGetColumnName(col.ColumnID) - decoded, ok := decodedColumns[colName] - require.True(t, ok) - require.EqualValues(t, col.Value, decoded.Value) - } + require.Equal(t, decodedRow.CommitTs, updateEvent.CommitTs) + require.Equal(t, decodedRow.TableInfo.GetSchemaName(), updateEvent.TableInfo.GetSchemaName()) + require.Equal(t, decodedRow.TableInfo.GetTableName(), updateEvent.TableInfo.GetTableName()) - for _, column := range decodedRow.PreColumns { - colName := decodedRow.TableInfo.ForceGetColumnName(column.ColumnID) - decodedColumns[colName] = column - } - for _, col := range updateEvent.PreColumns { - colName := updateEvent.TableInfo.ForceGetColumnName(col.ColumnID) - decoded, ok := decodedColumns[colName] - require.True(t, ok) - require.EqualValues(t, col.Value, decoded.Value) - } + decodedColumns := make(map[string]*model.ColumnData, len(decodedRow.Columns)) + for _, column := range decodedRow.Columns { + colName := decodedRow.TableInfo.ForceGetColumnName(column.ColumnID) + decodedColumns[colName] = column + } + for _, col := range updateEvent.Columns { + colName := updateEvent.TableInfo.ForceGetColumnName(col.ColumnID) + decoded, ok := decodedColumns[colName] + require.True(t, ok) + require.EqualValues(t, col.Value, decoded.Value) + } + + for _, column := range decodedRow.PreColumns { + colName := decodedRow.TableInfo.ForceGetColumnName(column.ColumnID) + decodedColumns[colName] = column + } + for _, col := range updateEvent.PreColumns { + colName := updateEvent.TableInfo.ForceGetColumnName(col.ColumnID) + decoded, ok := decodedColumns[colName] + require.True(t, ok) + require.EqualValues(t, col.Value, decoded.Value) } } } diff --git a/pkg/sink/kafka/claimcheck/claim_check.go b/pkg/sink/kafka/claimcheck/claim_check.go index 488186f2d81..f88a6b10fd0 100644 --- a/pkg/sink/kafka/claimcheck/claim_check.go +++ b/pkg/sink/kafka/claimcheck/claim_check.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/util" @@ -37,10 +36,10 @@ const ( // ClaimCheck manage send message to the claim-check external storage. type ClaimCheck struct { - storage storage.ExternalStorage - rawValue bool + storage storage.ExternalStorage changefeedID model.ChangeFeedID + // metricSendMessageDuration tracks the time duration // cost on send messages to the claim check external storage. metricSendMessageDuration prometheus.Observer @@ -48,23 +47,19 @@ type ClaimCheck struct { } // New return a new ClaimCheck. -func New(ctx context.Context, config *config.LargeMessageHandleConfig, changefeedID model.ChangeFeedID) (*ClaimCheck, error) { - if !config.EnableClaimCheck() { - return nil, nil - } - +func New(ctx context.Context, storageURI string, changefeedID model.ChangeFeedID) (*ClaimCheck, error) { log.Info("claim check enabled, start create the external storage", zap.String("namespace", changefeedID.Namespace), zap.String("changefeed", changefeedID.ID), - zap.String("storageURI", util.MaskSensitiveDataInURI(config.ClaimCheckStorageURI))) + zap.String("storageURI", util.MaskSensitiveDataInURI(storageURI))) start := time.Now() - externalStorage, err := util.GetExternalStorageWithTimeout(ctx, config.ClaimCheckStorageURI, defaultTimeout) + externalStorage, err := util.GetExternalStorageWithTimeout(ctx, storageURI, defaultTimeout) if err != nil { log.Error("create external storage failed", zap.String("namespace", changefeedID.Namespace), zap.String("changefeed", changefeedID.ID), - zap.String("storageURI", util.MaskSensitiveDataInURI(config.ClaimCheckStorageURI)), + zap.String("storageURI", util.MaskSensitiveDataInURI(storageURI)), zap.Duration("duration", time.Since(start)), zap.Error(err)) return nil, errors.Trace(err) @@ -73,32 +68,30 @@ func New(ctx context.Context, config *config.LargeMessageHandleConfig, changefee log.Info("claim-check create the external storage success", zap.String("namespace", changefeedID.Namespace), zap.String("changefeed", changefeedID.ID), - zap.String("storageURI", util.MaskSensitiveDataInURI(config.ClaimCheckStorageURI)), + zap.String("storageURI", util.MaskSensitiveDataInURI(storageURI)), zap.Duration("duration", time.Since(start))) return &ClaimCheck{ changefeedID: changefeedID, storage: externalStorage, - rawValue: config.ClaimCheckRawValue, metricSendMessageDuration: claimCheckSendMessageDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID), metricSendMessageCount: claimCheckSendMessageCount.WithLabelValues(changefeedID.Namespace, changefeedID.ID), }, nil } // WriteMessage write message to the claim check external storage. -func (c *ClaimCheck) WriteMessage(ctx context.Context, key, value []byte, fileName string) (err error) { - if !c.rawValue { - m := common.ClaimCheckMessage{ - Key: key, - Value: value, - } - value, err = json.Marshal(m) - if err != nil { - return errors.Trace(err) - } +func (c *ClaimCheck) WriteMessage(ctx context.Context, key, value []byte, fileName string) error { + m := common.ClaimCheckMessage{ + Key: key, + Value: value, } + data, err := json.Marshal(m) + if err != nil { + return errors.Trace(err) + } + start := time.Now() - err = c.storage.WriteFile(ctx, fileName, value) + err = c.storage.WriteFile(ctx, fileName, data) if err != nil { return errors.Trace(err) } diff --git a/pkg/sink/kafka/claimcheck/claim_check_test.go b/pkg/sink/kafka/claimcheck/claim_check_test.go index 088aa529810..9dd328e3ea9 100644 --- a/pkg/sink/kafka/claimcheck/claim_check_test.go +++ b/pkg/sink/kafka/claimcheck/claim_check_test.go @@ -18,23 +18,15 @@ import ( "testing" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" "github.com/stretchr/testify/require" ) -func TestClaimCheck(t *testing.T) { +func TestClaimCheckFileName(t *testing.T) { ctx := context.Background() - + storageURI := "file:///tmp/abc/" changefeedID := model.DefaultChangeFeedID("test") - largeHandleConfig := config.NewDefaultLargeMessageHandleConfig() - - claimCheck, err := New(ctx, largeHandleConfig, changefeedID) - require.NoError(t, err) - require.Nil(t, claimCheck) - largeHandleConfig.LargeMessageHandleOption = config.LargeMessageHandleOptionClaimCheck - largeHandleConfig.ClaimCheckStorageURI = "file:///tmp/abc/" - claimCheck, err = New(ctx, largeHandleConfig, changefeedID) + claimCheck, err := New(ctx, storageURI, changefeedID) require.NoError(t, err) fileName := claimCheck.FileNameWithPrefix("file.json")