Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <[email protected]>
  • Loading branch information
hicqu committed Aug 11, 2023
1 parent c82cce5 commit 308c7cb
Show file tree
Hide file tree
Showing 14 changed files with 36 additions and 33 deletions.
10 changes: 5 additions & 5 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,8 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
if c.Sink.TxnAtomicity != nil {
res.Sink.TxnAtomicity = util.AddressOf(config.AtomicityLevel(*c.Sink.TxnAtomicity))
}
if c.Sink.AdvanceTimeout != nil {
res.Sink.AdvanceTimeout = util.AddressOf(*c.Sink.AdvanceTimeout)
if c.Sink.AdvanceTimeoutInSec != nil {
res.Sink.AdvanceTimeoutInSec = util.AddressOf(*c.Sink.AdvanceTimeoutInSec)
}

}
Expand Down Expand Up @@ -640,8 +640,8 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
if cloned.Sink.TxnAtomicity != nil {
res.Sink.TxnAtomicity = util.AddressOf(string(*cloned.Sink.TxnAtomicity))
}
if cloned.Sink.AdvanceTimeout != nil {
res.Sink.AdvanceTimeout = util.AddressOf(*cloned.Sink.AdvanceTimeout)
if cloned.Sink.AdvanceTimeoutInSec != nil {
res.Sink.AdvanceTimeoutInSec = util.AddressOf(*cloned.Sink.AdvanceTimeoutInSec)
}
}
if cloned.Consistent != nil {
Expand Down Expand Up @@ -794,7 +794,7 @@ type SinkConfig struct {
KafkaConfig *KafkaConfig `json:"kafka_config,omitempty"`
MySQLConfig *MySQLConfig `json:"mysql_config,omitempty"`
CloudStorageConfig *CloudStorageConfig `json:"cloud_storage_config,omitempty"`
AdvanceTimeout *uint `json:"advance_timeout,omitempty"`
AdvanceTimeoutInSec *uint `json:"advance_timeout,omitempty"`
}

// CSVConfig denotes the csv config
Expand Down
2 changes: 1 addition & 1 deletion cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ var defaultAPIConfig = &ReplicaConfig{
EnableKafkaSinkV2: util.AddressOf(false),
OnlyOutputUpdatedColumns: util.AddressOf(false),
DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
AdvanceTimeout: util.AddressOf(uint(600)),
AdvanceTimeoutInSec: util.AddressOf(uint(600)),
},
Consistent: &ConsistentConfig{
Level: "none",
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func initProcessor4Test(
"sink": {
"dispatchers": null,
"protocol": "open-protocol",
"advance-timeout": 600
"advance-timeout-in-sec": 600
}
},
"state": "normal",
Expand Down
7 changes: 5 additions & 2 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ type SinkManager struct {
// sinkFactory used to create table sink.
sinkFactory struct {
sync.Mutex
f *factory.SinkFactory
f *factory.SinkFactory
// When every time we want to create a new factory, version will be increased and
// errors will be replaced by a new channel. version is used to distinct different
// sink factories in table sinks.
version uint64
errors chan error
}
Expand Down Expand Up @@ -975,7 +978,7 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
m.sinkMemQuota.Release(span, checkpointTs)
m.redoMemQuota.Release(span, checkpointTs)

stuckCheck := time.Duration(*m.changefeedInfo.Config.Sink.AdvanceTimeout) * time.Second
stuckCheck := time.Duration(*m.changefeedInfo.Config.Sink.AdvanceTimeoutInSec) * time.Second
if advanced.After(time.Unix(0, 0)) && time.Since(advanced) > stuckCheck &&
oracle.GetTimeFromTS(tableSink.getUpperBoundTs()).Sub(oracle.GetTimeFromTS(checkpointTs.Ts)) > stuckCheck {
log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
Expand Down
4 changes: 2 additions & 2 deletions docs/swagger/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1641,8 +1641,8 @@ var doc = `{
"config.SinkConfig": {
"type": "object",
"properties": {
"advance-timeout": {
"description": "AdvanceTimeout is a duration in second. If a table sink progress hasn't been\nadvanced for this given duration, the sink will be canceled and re-established.",
"advance-timeout-in-sec": {
"description": "AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been\nadvanced for this given duration, the sink will be canceled and re-established.",
"type": "integer"
},
"cloud-storage-config": {
Expand Down
4 changes: 2 additions & 2 deletions docs/swagger/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1622,8 +1622,8 @@
"config.SinkConfig": {
"type": "object",
"properties": {
"advance-timeout": {
"description": "AdvanceTimeout is a duration in second. If a table sink progress hasn't been\nadvanced for this given duration, the sink will be canceled and re-established.",
"advance-timeout-in-sec": {
"description": "AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been\nadvanced for this given duration, the sink will be canceled and re-established.",
"type": "integer"
},
"cloud-storage-config": {
Expand Down
4 changes: 2 additions & 2 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,9 @@ definitions:
type: object
config.SinkConfig:
properties:
advance-timeout:
advance-timeout-in-sec:
description: |-
AdvanceTimeout is a duration in second. If a table sink progress hasn't been
AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been
advanced for this given duration, the sink will be canceled and re-established.
type: integer
cloud-storage-config:
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) {
OnlyOutputUpdatedColumns: util.AddressOf(false),
DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
Protocol: util.AddressOf("open-protocol"),
AdvanceTimeout: util.AddressOf(uint(600)),
AdvanceTimeoutInSec: util.AddressOf(uint(600)),
}, cfg.Sink)
}

Expand Down Expand Up @@ -246,7 +246,7 @@ func TestAndWriteStorageSinkTOML(t *testing.T) {
},
OnlyOutputUpdatedColumns: util.AddressOf(false),
DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
AdvanceTimeout: util.AddressOf(uint(600)),
AdvanceTimeoutInSec: util.AddressOf(uint(600)),
}, cfg.Sink)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ const (
"claim-check-storage-uri": "",
"claim-check-compression": ""
},
"advance-timeout": 600
"advance-timeout-in-sec": 600
},
"consistent": {
"level": "none",
Expand Down Expand Up @@ -279,7 +279,7 @@ const (
"file-size": 1024,
"output-column-id":false
},
"advance-timeout": 600
"advance-timeout-in-sec": 600
},
"consistent": {
"level": "none",
Expand Down Expand Up @@ -414,7 +414,7 @@ const (
"file-size": 1024,
"output-column-id":false
},
"advance-timeout": 600
"advance-timeout-in-sec": 600
},
"consistent": {
"level": "none",
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ var defaultReplicaConfig = &ReplicaConfig{
OnlyOutputUpdatedColumns: util.AddressOf(false),
DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
TiDBSourceID: 1,
AdvanceTimeout: util.AddressOf(uint(600)),
AdvanceTimeoutInSec: util.AddressOf(uint(600)),
},
Consistent: &ConsistentConfig{
Level: "none",
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestReplicaConfigMarshal(t *testing.T) {
conf.Sink.OnlyOutputUpdatedColumns = aws.Bool(true)
conf.Sink.DeleteOnlyOutputHandleKeyColumns = aws.Bool(true)
conf.Sink.SafeMode = aws.Bool(true)
conf.Sink.AdvanceTimeout = util.AddressOf(uint(600))
conf.Sink.AdvanceTimeoutInSec = util.AddressOf(uint(600))
conf.Sink.KafkaConfig = &KafkaConfig{
PartitionNum: aws.Int32(1),
ReplicationFactor: aws.Int16(1),
Expand Down
8 changes: 4 additions & 4 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ type SinkConfig struct {
MySQLConfig *MySQLConfig `toml:"mysql-config" json:"mysql-config,omitempty"`
CloudStorageConfig *CloudStorageConfig `toml:"cloud-storage-config" json:"cloud-storage-config,omitempty"`

// AdvanceTimeout is a duration in second. If a table sink progress hasn't been
// AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been
// advanced for this given duration, the sink will be canceled and re-established.
AdvanceTimeout *uint `toml:"advance-timeout" json:"advance-timeout,omitempty"`
AdvanceTimeoutInSec *uint `toml:"advance-timeout-in-sec" json:"advance-timeout-in-sec,omitempty"`
}

// CSVConfig defines a series of configuration items for csv codec.
Expand Down Expand Up @@ -435,8 +435,8 @@ func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error {
}
}

if s.AdvanceTimeout != nil && *s.AdvanceTimeout == 0 {
return cerror.ErrSinkInvalidConfig.GenWithStack("advance-timeout should be greater than 0")
if s.AdvanceTimeoutInSec != nil && *s.AdvanceTimeoutInSec == 0 {
return cerror.ErrSinkInvalidConfig.GenWithStack("advance-timeout-in-sec should be greater than 0")
}

return nil
Expand Down
12 changes: 6 additions & 6 deletions pkg/orchestrator/reactor_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ func TestChangefeedStateUpdate(t *testing.T) {
Mounter: &config.MounterConfig{WorkerNum: 16},
Scheduler: config.GetDefaultReplicaConfig().Scheduler,
Sink: &config.SinkConfig{
Terminator: putil.AddressOf(config.CRLF),
AdvanceTimeout: putil.AddressOf(uint(600)),
Terminator: putil.AddressOf(config.CRLF),
AdvanceTimeoutInSec: putil.AddressOf(uint(600)),
},
Integrity: config.GetDefaultReplicaConfig().Integrity,
},
Expand Down Expand Up @@ -169,8 +169,8 @@ func TestChangefeedStateUpdate(t *testing.T) {
Filter: &config.FilterConfig{Rules: []string{"*.*"}},
Mounter: &config.MounterConfig{WorkerNum: 16},
Sink: &config.SinkConfig{
Terminator: putil.AddressOf(config.CRLF),
AdvanceTimeout: putil.AddressOf(uint(600)),
Terminator: putil.AddressOf(config.CRLF),
AdvanceTimeoutInSec: putil.AddressOf(uint(600)),
},
Scheduler: config.GetDefaultReplicaConfig().Scheduler,
Integrity: config.GetDefaultReplicaConfig().Integrity,
Expand Down Expand Up @@ -226,8 +226,8 @@ func TestChangefeedStateUpdate(t *testing.T) {
Filter: &config.FilterConfig{Rules: []string{"*.*"}},
Mounter: &config.MounterConfig{WorkerNum: 16},
Sink: &config.SinkConfig{
Terminator: putil.AddressOf(config.CRLF),
AdvanceTimeout: putil.AddressOf(uint(600)),
Terminator: putil.AddressOf(config.CRLF),
AdvanceTimeoutInSec: putil.AddressOf(uint(600)),
},
Scheduler: config.GetDefaultReplicaConfig().Scheduler,
Integrity: config.GetDefaultReplicaConfig().Integrity,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[sink]
advance-timeout = 10
advance-timeout-in-sec = 10

0 comments on commit 308c7cb

Please sign in to comment.