From 308c7cbdaa0a299f98cb4ecabab8b5eeaa4439dd Mon Sep 17 00:00:00 2001 From: qupeng Date: Fri, 11 Aug 2023 12:04:23 +0800 Subject: [PATCH] address comments Signed-off-by: qupeng --- cdc/api/v2/model.go | 10 +++++----- cdc/api/v2/model_test.go | 2 +- cdc/processor/processor_test.go | 2 +- cdc/processor/sinkmanager/manager.go | 7 +++++-- docs/swagger/docs.go | 4 ++-- docs/swagger/swagger.json | 4 ++-- docs/swagger/swagger.yaml | 4 ++-- pkg/cmd/util/helper_test.go | 4 ++-- pkg/config/config_test_data.go | 6 +++--- pkg/config/replica_config.go | 2 +- pkg/config/replica_config_test.go | 2 +- pkg/config/sink.go | 8 ++++---- pkg/orchestrator/reactor_state_test.go | 12 ++++++------ .../hang_sink_suicide/conf/changefeed.toml | 2 +- 14 files changed, 36 insertions(+), 33 deletions(-) diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 3c4f2af56a9..a35379fd489 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -409,8 +409,8 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( if c.Sink.TxnAtomicity != nil { res.Sink.TxnAtomicity = util.AddressOf(config.AtomicityLevel(*c.Sink.TxnAtomicity)) } - if c.Sink.AdvanceTimeout != nil { - res.Sink.AdvanceTimeout = util.AddressOf(*c.Sink.AdvanceTimeout) + if c.Sink.AdvanceTimeoutInSec != nil { + res.Sink.AdvanceTimeoutInSec = util.AddressOf(*c.Sink.AdvanceTimeoutInSec) } } @@ -640,8 +640,8 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { if cloned.Sink.TxnAtomicity != nil { res.Sink.TxnAtomicity = util.AddressOf(string(*cloned.Sink.TxnAtomicity)) } - if cloned.Sink.AdvanceTimeout != nil { - res.Sink.AdvanceTimeout = util.AddressOf(*cloned.Sink.AdvanceTimeout) + if cloned.Sink.AdvanceTimeoutInSec != nil { + res.Sink.AdvanceTimeoutInSec = util.AddressOf(*cloned.Sink.AdvanceTimeoutInSec) } } if cloned.Consistent != nil { @@ -794,7 +794,7 @@ type SinkConfig struct { KafkaConfig *KafkaConfig `json:"kafka_config,omitempty"` MySQLConfig *MySQLConfig `json:"mysql_config,omitempty"` CloudStorageConfig *CloudStorageConfig `json:"cloud_storage_config,omitempty"` - AdvanceTimeout *uint `json:"advance_timeout,omitempty"` + AdvanceTimeoutInSec *uint `json:"advance_timeout,omitempty"` } // CSVConfig denotes the csv config diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index d19f0cd5866..cd91e7d77d5 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -58,7 +58,7 @@ var defaultAPIConfig = &ReplicaConfig{ EnableKafkaSinkV2: util.AddressOf(false), OnlyOutputUpdatedColumns: util.AddressOf(false), DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false), - AdvanceTimeout: util.AddressOf(uint(600)), + AdvanceTimeoutInSec: util.AddressOf(uint(600)), }, Consistent: &ConsistentConfig{ Level: "none", diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index 0e4e2ae1990..a9687dc5416 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -135,7 +135,7 @@ func initProcessor4Test( "sink": { "dispatchers": null, "protocol": "open-protocol", - "advance-timeout": 600 + "advance-timeout-in-sec": 600 } }, "state": "normal", diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 629aa5c3b2a..c32a1cf53da 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -86,7 +86,10 @@ type SinkManager struct { // sinkFactory used to create table sink. sinkFactory struct { sync.Mutex - f *factory.SinkFactory + f *factory.SinkFactory + // When every time we want to create a new factory, version will be increased and + // errors will be replaced by a new channel. version is used to distinct different + // sink factories in table sinks. version uint64 errors chan error } @@ -975,7 +978,7 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats { m.sinkMemQuota.Release(span, checkpointTs) m.redoMemQuota.Release(span, checkpointTs) - stuckCheck := time.Duration(*m.changefeedInfo.Config.Sink.AdvanceTimeout) * time.Second + stuckCheck := time.Duration(*m.changefeedInfo.Config.Sink.AdvanceTimeoutInSec) * time.Second if advanced.After(time.Unix(0, 0)) && time.Since(advanced) > stuckCheck && oracle.GetTimeFromTS(tableSink.getUpperBoundTs()).Sub(oracle.GetTimeFromTS(checkpointTs.Ts)) > stuckCheck { log.Warn("Table checkpoint is stuck too long, will restart the sink backend", diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index e5c96958ed7..48da8ef17a4 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -1641,8 +1641,8 @@ var doc = `{ "config.SinkConfig": { "type": "object", "properties": { - "advance-timeout": { - "description": "AdvanceTimeout is a duration in second. If a table sink progress hasn't been\nadvanced for this given duration, the sink will be canceled and re-established.", + "advance-timeout-in-sec": { + "description": "AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been\nadvanced for this given duration, the sink will be canceled and re-established.", "type": "integer" }, "cloud-storage-config": { diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 0ae51dfa35d..16185b85cf2 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -1622,8 +1622,8 @@ "config.SinkConfig": { "type": "object", "properties": { - "advance-timeout": { - "description": "AdvanceTimeout is a duration in second. If a table sink progress hasn't been\nadvanced for this given duration, the sink will be canceled and re-established.", + "advance-timeout-in-sec": { + "description": "AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been\nadvanced for this given duration, the sink will be canceled and re-established.", "type": "integer" }, "cloud-storage-config": { diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 6ca45417fc1..214659731ff 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -200,9 +200,9 @@ definitions: type: object config.SinkConfig: properties: - advance-timeout: + advance-timeout-in-sec: description: |- - AdvanceTimeout is a duration in second. If a table sink progress hasn't been + AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been advanced for this given duration, the sink will be canceled and re-established. type: integer cloud-storage-config: diff --git a/pkg/cmd/util/helper_test.go b/pkg/cmd/util/helper_test.go index 42d86114802..6de17e79740 100644 --- a/pkg/cmd/util/helper_test.go +++ b/pkg/cmd/util/helper_test.go @@ -213,7 +213,7 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) { OnlyOutputUpdatedColumns: util.AddressOf(false), DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false), Protocol: util.AddressOf("open-protocol"), - AdvanceTimeout: util.AddressOf(uint(600)), + AdvanceTimeoutInSec: util.AddressOf(uint(600)), }, cfg.Sink) } @@ -246,7 +246,7 @@ func TestAndWriteStorageSinkTOML(t *testing.T) { }, OnlyOutputUpdatedColumns: util.AddressOf(false), DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false), - AdvanceTimeout: util.AddressOf(uint(600)), + AdvanceTimeoutInSec: util.AddressOf(uint(600)), }, cfg.Sink) } diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 7a54b8a3876..0657527dc3d 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -65,7 +65,7 @@ const ( "claim-check-storage-uri": "", "claim-check-compression": "" }, - "advance-timeout": 600 + "advance-timeout-in-sec": 600 }, "consistent": { "level": "none", @@ -279,7 +279,7 @@ const ( "file-size": 1024, "output-column-id":false }, - "advance-timeout": 600 + "advance-timeout-in-sec": 600 }, "consistent": { "level": "none", @@ -414,7 +414,7 @@ const ( "file-size": 1024, "output-column-id":false }, - "advance-timeout": 600 + "advance-timeout-in-sec": 600 }, "consistent": { "level": "none", diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index f6e744007e5..6b0bc5843a3 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -68,7 +68,7 @@ var defaultReplicaConfig = &ReplicaConfig{ OnlyOutputUpdatedColumns: util.AddressOf(false), DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false), TiDBSourceID: 1, - AdvanceTimeout: util.AddressOf(uint(600)), + AdvanceTimeoutInSec: util.AddressOf(uint(600)), }, Consistent: &ConsistentConfig{ Level: "none", diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go index 8bd4658b9db..5b2f4d92823 100644 --- a/pkg/config/replica_config_test.go +++ b/pkg/config/replica_config_test.go @@ -66,7 +66,7 @@ func TestReplicaConfigMarshal(t *testing.T) { conf.Sink.OnlyOutputUpdatedColumns = aws.Bool(true) conf.Sink.DeleteOnlyOutputHandleKeyColumns = aws.Bool(true) conf.Sink.SafeMode = aws.Bool(true) - conf.Sink.AdvanceTimeout = util.AddressOf(uint(600)) + conf.Sink.AdvanceTimeoutInSec = util.AddressOf(uint(600)) conf.Sink.KafkaConfig = &KafkaConfig{ PartitionNum: aws.Int32(1), ReplicationFactor: aws.Int16(1), diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 60650a0cb17..974491d4b57 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -159,9 +159,9 @@ type SinkConfig struct { MySQLConfig *MySQLConfig `toml:"mysql-config" json:"mysql-config,omitempty"` CloudStorageConfig *CloudStorageConfig `toml:"cloud-storage-config" json:"cloud-storage-config,omitempty"` - // AdvanceTimeout is a duration in second. If a table sink progress hasn't been + // AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been // advanced for this given duration, the sink will be canceled and re-established. - AdvanceTimeout *uint `toml:"advance-timeout" json:"advance-timeout,omitempty"` + AdvanceTimeoutInSec *uint `toml:"advance-timeout-in-sec" json:"advance-timeout-in-sec,omitempty"` } // CSVConfig defines a series of configuration items for csv codec. @@ -435,8 +435,8 @@ func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { } } - if s.AdvanceTimeout != nil && *s.AdvanceTimeout == 0 { - return cerror.ErrSinkInvalidConfig.GenWithStack("advance-timeout should be greater than 0") + if s.AdvanceTimeoutInSec != nil && *s.AdvanceTimeoutInSec == 0 { + return cerror.ErrSinkInvalidConfig.GenWithStack("advance-timeout-in-sec should be greater than 0") } return nil diff --git a/pkg/orchestrator/reactor_state_test.go b/pkg/orchestrator/reactor_state_test.go index 10f19a81d30..db6c9ffb4e8 100644 --- a/pkg/orchestrator/reactor_state_test.go +++ b/pkg/orchestrator/reactor_state_test.go @@ -118,8 +118,8 @@ func TestChangefeedStateUpdate(t *testing.T) { Mounter: &config.MounterConfig{WorkerNum: 16}, Scheduler: config.GetDefaultReplicaConfig().Scheduler, Sink: &config.SinkConfig{ - Terminator: putil.AddressOf(config.CRLF), - AdvanceTimeout: putil.AddressOf(uint(600)), + Terminator: putil.AddressOf(config.CRLF), + AdvanceTimeoutInSec: putil.AddressOf(uint(600)), }, Integrity: config.GetDefaultReplicaConfig().Integrity, }, @@ -169,8 +169,8 @@ func TestChangefeedStateUpdate(t *testing.T) { Filter: &config.FilterConfig{Rules: []string{"*.*"}}, Mounter: &config.MounterConfig{WorkerNum: 16}, Sink: &config.SinkConfig{ - Terminator: putil.AddressOf(config.CRLF), - AdvanceTimeout: putil.AddressOf(uint(600)), + Terminator: putil.AddressOf(config.CRLF), + AdvanceTimeoutInSec: putil.AddressOf(uint(600)), }, Scheduler: config.GetDefaultReplicaConfig().Scheduler, Integrity: config.GetDefaultReplicaConfig().Integrity, @@ -226,8 +226,8 @@ func TestChangefeedStateUpdate(t *testing.T) { Filter: &config.FilterConfig{Rules: []string{"*.*"}}, Mounter: &config.MounterConfig{WorkerNum: 16}, Sink: &config.SinkConfig{ - Terminator: putil.AddressOf(config.CRLF), - AdvanceTimeout: putil.AddressOf(uint(600)), + Terminator: putil.AddressOf(config.CRLF), + AdvanceTimeoutInSec: putil.AddressOf(uint(600)), }, Scheduler: config.GetDefaultReplicaConfig().Scheduler, Integrity: config.GetDefaultReplicaConfig().Integrity, diff --git a/tests/integration_tests/hang_sink_suicide/conf/changefeed.toml b/tests/integration_tests/hang_sink_suicide/conf/changefeed.toml index 68454e892ee..9161a8511f6 100644 --- a/tests/integration_tests/hang_sink_suicide/conf/changefeed.toml +++ b/tests/integration_tests/hang_sink_suicide/conf/changefeed.toml @@ -1,2 +1,2 @@ [sink] -advance-timeout = 10 +advance-timeout-in-sec = 10