Skip to content

Commit

Permalink
sink(cdc): clean backends if table sink is stuck too long (#9527) (#9548
Browse files Browse the repository at this point in the history
)

close #9542
  • Loading branch information
ti-chi-bot authored Aug 15, 2023
1 parent db5fb4d commit 8622904
Show file tree
Hide file tree
Showing 25 changed files with 406 additions and 192 deletions.
8 changes: 8 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/integrity"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/util"
)

// EmptyResponse return empty {} to http client
Expand Down Expand Up @@ -369,6 +370,9 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
CloudStorageConfig: cloudStorageConfig,
SafeMode: c.Sink.SafeMode,
}
if c.Sink.AdvanceTimeoutInSec != nil {
res.Sink.AdvanceTimeoutInSec = util.AddressOf(*c.Sink.AdvanceTimeoutInSec)
}
}
if c.Mounter != nil {
res.Mounter = &config.MounterConfig{
Expand Down Expand Up @@ -572,6 +576,9 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
CloudStorageConfig: cloudStorageConfig,
SafeMode: cloned.Sink.SafeMode,
}
if cloned.Sink.AdvanceTimeoutInSec != nil {
res.Sink.AdvanceTimeoutInSec = util.AddressOf(*cloned.Sink.AdvanceTimeoutInSec)
}
}
if cloned.Consistent != nil {
res.Consistent = &ConsistentConfig{
Expand Down Expand Up @@ -722,6 +729,7 @@ type SinkConfig struct {
KafkaConfig *KafkaConfig `json:"kafka_config,omitempty"`
MySQLConfig *MySQLConfig `json:"mysql_config,omitempty"`
CloudStorageConfig *CloudStorageConfig `json:"cloud_storage_config,omitempty"`
AdvanceTimeoutInSec *uint `json:"advance_timeout,omitempty"`
}

// CSVConfig denotes the csv config
Expand Down
2 changes: 2 additions & 0 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -53,6 +54,7 @@ var defaultAPIConfig = &ReplicaConfig{
DateSeparator: config.DateSeparatorDay.String(),
EnablePartitionSeparator: true,
EnableKafkaSinkV2: false,
AdvanceTimeoutInSec: util.AddressOf(uint(150)),
},
Consistent: &ConsistentConfig{
Level: "none",
Expand Down
3 changes: 2 additions & 1 deletion cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ func initProcessor4Test(
},
"sink": {
"dispatchers": null,
"protocol": "open-protocol"
"protocol": "open-protocol",
"advance-timeout-in-sec": 150
}
},
"state": "normal",
Expand Down
Loading

0 comments on commit 8622904

Please sign in to comment.