Skip to content

Commit

Permalink
Merge branch 'master' into dev-merge-replica-config
Browse files Browse the repository at this point in the history
  • Loading branch information
yumchina committed Aug 23, 2023
2 parents 44ea6fd + 004bc4c commit 4b2cb15
Show file tree
Hide file tree
Showing 30 changed files with 1,296 additions and 169 deletions.
1 change: 0 additions & 1 deletion cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig(

// fill replicaConfig
replicaCfg := cfg.ReplicaConfig.ToInternalReplicaConfig()

// verify replicaConfig
sinkURIParsed, err := url.Parse(cfg.SinkURI)
if err != nil {
Expand Down
38 changes: 38 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,17 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
}
}

var glueSchemaRegistryConfig *config.GlueSchemaRegistryConfig
if c.Sink.KafkaConfig.GlueSchemaRegistryConfig != nil {
glueSchemaRegistryConfig = &config.GlueSchemaRegistryConfig{
RegistryName: c.Sink.KafkaConfig.GlueSchemaRegistryConfig.RegistryName,
Region: c.Sink.KafkaConfig.GlueSchemaRegistryConfig.Region,
AccessKey: c.Sink.KafkaConfig.GlueSchemaRegistryConfig.AccessKey,
SecretAccessKey: c.Sink.KafkaConfig.GlueSchemaRegistryConfig.SecretAccessKey,
Token: c.Sink.KafkaConfig.GlueSchemaRegistryConfig.Token,
}
}

kafkaConfig = &config.KafkaConfig{
PartitionNum: c.Sink.KafkaConfig.PartitionNum,
ReplicationFactor: c.Sink.KafkaConfig.ReplicationFactor,
Expand Down Expand Up @@ -386,6 +397,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
InsecureSkipVerify: c.Sink.KafkaConfig.InsecureSkipVerify,
CodecConfig: codeConfig,
LargeMessageHandle: largeMessageHandle,
GlueSchemaRegistryConfig: glueSchemaRegistryConfig,
}
}
var mysqlConfig *config.MySQLConfig
Expand Down Expand Up @@ -582,6 +594,17 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
}
}

var glueSchemaRegistryConfig *GlueSchemaRegistryConfig
if cloned.Sink.KafkaConfig.GlueSchemaRegistryConfig != nil {
glueSchemaRegistryConfig = &GlueSchemaRegistryConfig{
RegistryName: cloned.Sink.KafkaConfig.GlueSchemaRegistryConfig.RegistryName,
Region: cloned.Sink.KafkaConfig.GlueSchemaRegistryConfig.Region,
AccessKey: cloned.Sink.KafkaConfig.GlueSchemaRegistryConfig.AccessKey,
SecretAccessKey: cloned.Sink.KafkaConfig.GlueSchemaRegistryConfig.SecretAccessKey,
Token: cloned.Sink.KafkaConfig.GlueSchemaRegistryConfig.Token,
}
}

kafkaConfig = &KafkaConfig{
PartitionNum: cloned.Sink.KafkaConfig.PartitionNum,
ReplicationFactor: cloned.Sink.KafkaConfig.ReplicationFactor,
Expand Down Expand Up @@ -618,6 +641,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
InsecureSkipVerify: cloned.Sink.KafkaConfig.InsecureSkipVerify,
CodecConfig: codeConfig,
LargeMessageHandle: largeMessageHandle,
GlueSchemaRegistryConfig: glueSchemaRegistryConfig,
}
}
var mysqlConfig *MySQLConfig
Expand Down Expand Up @@ -1122,6 +1146,7 @@ type KafkaConfig struct {
InsecureSkipVerify *bool `json:"insecure_skip_verify,omitempty"`
CodecConfig *CodecConfig `json:"codec_config,omitempty"`
LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"`
GlueSchemaRegistryConfig *GlueSchemaRegistryConfig `json:"glue_schema_registry_config,omitempty"`
}

// MySQLConfig represents a MySQL sink configuration
Expand Down Expand Up @@ -1159,3 +1184,16 @@ type ChangefeedStatus struct {
LastError *RunningError `json:"last_error,omitempty"`
LastWarning *RunningError `json:"last_warning,omitempty"`
}

// GlueSchemaRegistryConfig represents a glue schema registry configuration
type GlueSchemaRegistryConfig struct {
// Name of the schema registry
RegistryName string `json:"registry_name"`
// Region of the schema registry
Region string `json:"region"`
// AccessKey of the schema registry
AccessKey string `json:"access_key,omitempty"`
// SecretAccessKey of the schema registry
SecretAccessKey string `json:"secret_access_key,omitempty"`
Token string `json:"token,omitempty"`
}
10 changes: 10 additions & 0 deletions cdc/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
"golang.org/x/time/rate"
)
Expand Down Expand Up @@ -257,6 +258,15 @@ func (o *controllerImpl) calculateGCSafepoint(state *orchestrator.GlobalReactorS
forceUpdateMap[upstreamID] = nil
}
}

// check if the upstream has a changefeed, if not we should update the gc safepoint
_ = o.upstreamManager.Visit(func(up *upstream.Upstream) error {
if _, exist := minCheckpointTsMap[up.ID]; !exist {
ts := up.PDClock.CurrentTime()
minCheckpointTsMap[up.ID] = oracle.GoTimeToTS(ts)
}
return nil
})
return minCheckpointTsMap, forceUpdateMap
}

Expand Down
17 changes: 16 additions & 1 deletion cdc/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
Expand Down Expand Up @@ -86,7 +87,6 @@ func TestUpdateGCSafePoint(t *testing.T) {
mockPDClient.UpdateServiceGCSafePointFunc = func(
ctx context.Context, serviceID string, ttl int64, safePoint uint64,
) (uint64, error) {
t.Fatal("must not update")
return 0, nil
}
changefeedID1 := model.DefaultChangeFeedID("test-changefeed1")
Expand Down Expand Up @@ -183,6 +183,7 @@ func TestCalculateGCSafepointTs(t *testing.T) {
expectMinTsMap := make(map[uint64]uint64)
expectForceUpdateMap := make(map[uint64]interface{})
o := &controllerImpl{changefeeds: make(map[model.ChangeFeedID]*orchestrator.ChangefeedReactorState)}
o.upstreamManager = upstream.NewManager4Test(nil)

stateMap := []model.FeedState{
model.StateNormal, model.StateStopped,
Expand Down Expand Up @@ -248,6 +249,20 @@ func TestCalculateGCSafepointTs(t *testing.T) {
require.Equal(t, expectForceUpdateMap, forceUpdateMap)
}

func TestCalculateGCSafepointTsNoChangefeed(t *testing.T) {
state := orchestrator.NewGlobalStateForTest(etcd.DefaultCDCClusterID)
expectForceUpdateMap := make(map[uint64]interface{})
o := &controllerImpl{changefeeds: make(map[model.ChangeFeedID]*orchestrator.ChangefeedReactorState)}
o.upstreamManager = upstream.NewManager4Test(nil)
up, err := o.upstreamManager.GetDefaultUpstream()
require.Nil(t, err)
up.PDClock = pdutil.NewClock4Test()

minCheckpoinTsMap, forceUpdateMap := o.calculateGCSafepoint(state)
require.Equal(t, 1, len(minCheckpoinTsMap))
require.Equal(t, expectForceUpdateMap, forceUpdateMap)
}

func TestFixChangefeedState(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(false)
controller4Test, state, tester := createController4Test(ctx, t)
Expand Down
2 changes: 1 addition & 1 deletion cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1173,7 +1173,7 @@ func TestE2ERowLevelChecksum(t *testing.T) {
msg := avroEncoder.Build()
require.Len(t, msg, 1)

schemaM, err := avro.NewAvroSchemaManager(
schemaM, err := avro.NewConfluentSchemaManager(
ctx, "http://127.0.0.1:8081", nil)
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
return err
}
case config.ProtocolAvro:
schemaM, err := avro.NewAvroSchemaManager(ctx, c.option.schemaRegistryURI, nil)
schemaM, err := avro.NewConfluentSchemaManager(ctx, c.option.schemaRegistryURI, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
54 changes: 54 additions & 0 deletions docs/swagger/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1443,6 +1443,30 @@ var doc = `{
}
}
},
"config.GlueSchemaRegistryConfig": {
"type": "object",
"properties": {
"access-key": {
"description": "AccessKey of the schema registry",
"type": "string"
},
"region": {
"description": "Region of the schema registry",
"type": "string"
},
"registry-name": {
"description": "Name of the schema registry",
"type": "string"
},
"secret-access-key": {
"description": "SecretAccessKey of the schema registry",
"type": "string"
},
"token": {
"type": "string"
}
}
},
"config.KafkaConfig": {
"type": "object",
"properties": {
Expand All @@ -1467,6 +1491,9 @@ var doc = `{
"enable-tls": {
"type": "boolean"
},
"glue-schema-registry-config": {
"$ref": "#/definitions/config.GlueSchemaRegistryConfig"
},
"insecure-skip-verify": {
"type": "boolean"
},
Expand Down Expand Up @@ -2455,6 +2482,30 @@ var doc = `{
}
}
},
"v2.GlueSchemaRegistryConfig": {
"type": "object",
"properties": {
"access_key": {
"description": "AccessKey of the schema registry",
"type": "string"
},
"region": {
"description": "Region of the schema registry",
"type": "string"
},
"registry_name": {
"description": "Name of the schema registry",
"type": "string"
},
"secret_access_key": {
"description": "SecretAccessKey of the schema registry",
"type": "string"
},
"token": {
"type": "string"
}
}
},
"v2.IntegrityConfig": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -2490,6 +2541,9 @@ var doc = `{
"enable_tls": {
"type": "boolean"
},
"glue_schema_registry_config": {
"$ref": "#/definitions/v2.GlueSchemaRegistryConfig"
},
"insecure_skip_verify": {
"type": "boolean"
},
Expand Down
54 changes: 54 additions & 0 deletions docs/swagger/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1424,6 +1424,30 @@
}
}
},
"config.GlueSchemaRegistryConfig": {
"type": "object",
"properties": {
"access-key": {
"description": "AccessKey of the schema registry",
"type": "string"
},
"region": {
"description": "Region of the schema registry",
"type": "string"
},
"registry-name": {
"description": "Name of the schema registry",
"type": "string"
},
"secret-access-key": {
"description": "SecretAccessKey of the schema registry",
"type": "string"
},
"token": {
"type": "string"
}
}
},
"config.KafkaConfig": {
"type": "object",
"properties": {
Expand All @@ -1448,6 +1472,9 @@
"enable-tls": {
"type": "boolean"
},
"glue-schema-registry-config": {
"$ref": "#/definitions/config.GlueSchemaRegistryConfig"
},
"insecure-skip-verify": {
"type": "boolean"
},
Expand Down Expand Up @@ -2436,6 +2463,30 @@
}
}
},
"v2.GlueSchemaRegistryConfig": {
"type": "object",
"properties": {
"access_key": {
"description": "AccessKey of the schema registry",
"type": "string"
},
"region": {
"description": "Region of the schema registry",
"type": "string"
},
"registry_name": {
"description": "Name of the schema registry",
"type": "string"
},
"secret_access_key": {
"description": "SecretAccessKey of the schema registry",
"type": "string"
},
"token": {
"type": "string"
}
}
},
"v2.IntegrityConfig": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -2471,6 +2522,9 @@
"enable_tls": {
"type": "boolean"
},
"glue_schema_registry_config": {
"$ref": "#/definitions/v2.GlueSchemaRegistryConfig"
},
"insecure_skip_verify": {
"type": "boolean"
},
Expand Down
Loading

0 comments on commit 4b2cb15

Please sign in to comment.