Skip to content

Commit

Permalink
changefeed (ticdc): fix mysql sink config contains protocol error (#6896
Browse files Browse the repository at this point in the history
) (#6901)

close #6874
  • Loading branch information
ti-chi-bot authored Aug 26, 2022
1 parent 834e6c8 commit 8201ed9
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 79 deletions.
117 changes: 66 additions & 51 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,15 @@ func (info *ChangeFeedInfo) FixIncompatible() {
}

if creatorVersionGate.ChangefeedAcceptUnknownProtocols() {
log.Info("Start fixing incompatible changefeed sink protocol", zap.String("changefeed", info.String()))
info.fixSinkProtocol()
log.Info("Fix incompatibility changefeed sink protocol completed", zap.String("changefeed", info.String()))
log.Info("Start fixing incompatible changefeed MQ sink protocol", zap.String("changefeed", info.String()))
info.fixMQSinkProtocol()
log.Info("Fix incompatibility changefeed MQ sink protocol completed", zap.String("changefeed", info.String()))
}

if creatorVersionGate.ChangefeedAcceptProtocolInMysqlSinURI() {
log.Info("Start fixing incompatible changefeed sink uri", zap.String("changefeed", info.String()))
info.fixMySQLSinkProtocol()
log.Info("Fix incompatibility changefeed sink uri completed", zap.String("changefeed", info.String()))
}
}

Expand Down Expand Up @@ -323,11 +329,8 @@ func (info *ChangeFeedInfo) fixState() {
}
}

// fixSinkProtocol attempts to fix protocol incompatible.
// We no longer support the acceptance of protocols that are not known.
// The ones that were already accepted need to be fixed.
func (info *ChangeFeedInfo) fixSinkProtocol() {
sinkURIParsed, err := url.Parse(info.SinkURI)
func (info *ChangeFeedInfo) fixMySQLSinkProtocol() {
uri, err := url.Parse(info.SinkURI)
if err != nil {
log.Warn("parse sink URI failed", zap.Error(err))
// SAFETY: It is safe to ignore this unresolvable sink URI here,
Expand All @@ -337,39 +340,36 @@ func (info *ChangeFeedInfo) fixSinkProtocol() {
// which is easier to troubleshoot than reporting the error directly in the bootstrap process.
return
}
rawQuery := sinkURIParsed.Query()
protocolStr := rawQuery.Get(config.ProtocolKey)

fixSinkURI := func(newProtocolStr string) {
oldRawQuery := sinkURIParsed.RawQuery
newRawQuery := rawQuery.Encode()
log.Info("handle incompatible protocol from sink URI",
zap.String("oldUriQuery", oldRawQuery),
zap.String("fixedUriQuery", newRawQuery))

sinkURIParsed.RawQuery = newRawQuery
fixedSinkURI := sinkURIParsed.String()
info.SinkURI = fixedSinkURI
info.Config.Sink.Protocol = newProtocolStr
}

// fix mysql sink
scheme := sinkURIParsed.Scheme
if !config.IsMqScheme(scheme) {
if protocolStr != "" || info.Config.Sink.Protocol != "" {
maskedSinkURI, _ := util.MaskSinkURI(info.SinkURI)
log.Warn("sink URI or sink config contains protocol, but scheme is not mq",
zap.String("sinkURI", maskedSinkURI),
zap.String("protocol", protocolStr),
zap.Any("sinkConfig", info.Config.Sink))
// always set protocol of mysql sink to ""
rawQuery.Del(config.ProtocolKey)
fixSinkURI("")
}

if config.IsMqScheme(uri.Scheme) {
return
}

query := uri.Query()
protocolStr := query.Get(config.ProtocolKey)
if protocolStr != "" || info.Config.Sink.Protocol != "" {
maskedSinkURI, _ := util.MaskSinkURI(info.SinkURI)
log.Warn("sink URI or sink config contains protocol, but scheme is not mq",
zap.String("sinkURI", maskedSinkURI),
zap.String("protocol", protocolStr),
zap.Any("sinkConfig", info.Config.Sink))
// always set protocol of mysql sink to ""
query.Del(config.ProtocolKey)
info.updateSinkURIAndConfigProtocol(uri, "", query)
}
}

func (info *ChangeFeedInfo) fixMQSinkProtocol() {
uri, err := url.Parse(info.SinkURI)
if err != nil {
log.Warn("parse sink URI failed", zap.Error(err))
return
}

if !config.IsMqScheme(uri.Scheme) {
return
}

// fix MQ sink
needsFix := func(protocolStr string) bool {
var protocol config.Protocol
err = protocol.FromString(protocolStr)
Expand All @@ -380,23 +380,38 @@ func (info *ChangeFeedInfo) fixSinkProtocol() {
return err != nil || protocolStr == config.ProtocolDefault.String()
}

openProtocolStr := config.ProtocolOpen.String()
query := uri.Query()
protocol := query.Get(config.ProtocolKey)
openProtocol := config.ProtocolOpen.String()

// The sinkURI always has a higher priority.
if protocolStr != "" {
if needsFix(protocolStr) {
rawQuery.Set(config.ProtocolKey, openProtocolStr)
fixSinkURI(openProtocolStr)
}
} else {
if needsFix(info.Config.Sink.Protocol) {
log.Info("handle incompatible protocol from sink config",
zap.String("oldProtocol", info.Config.Sink.Protocol),
zap.String("fixedProtocol", openProtocolStr))
info.Config.Sink.Protocol = openProtocolStr
}
if protocol != "" && needsFix(protocol) {
query.Set(config.ProtocolKey, openProtocol)
info.updateSinkURIAndConfigProtocol(uri, openProtocol, query)
return
}

if needsFix(info.Config.Sink.Protocol) {
log.Info("handle incompatible protocol from sink config",
zap.String("oldProtocol", info.Config.Sink.Protocol),
zap.String("fixedProtocol", openProtocol))
info.Config.Sink.Protocol = openProtocol
}
}

func (info *ChangeFeedInfo) updateSinkURIAndConfigProtocol(uri *url.URL, newProtocol string, newQuery url.Values) {
oldRawQuery := uri.RawQuery
newRawQuery := newQuery.Encode()
log.Info("handle incompatible protocol from sink URI",
zap.String("oldUriQuery", oldRawQuery),
zap.String("fixedUriQuery", newQuery.Encode()))

uri.RawQuery = newRawQuery
fixedSinkURI := uri.String()
info.SinkURI = fixedSinkURI
info.Config.Sink.Protocol = newProtocol
}

// HasFastFailError returns true if the error in changefeed is fast-fail
func (info *ChangeFeedInfo) HasFastFailError() bool {
if info.Error == nil {
Expand Down
83 changes: 61 additions & 22 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,13 +564,12 @@ func TestFixState(t *testing.T) {
}
}

func TestFixSinkProtocol(t *testing.T) {
func TestFixMysqlSinkProtocol(t *testing.T) {
t.Parallel()

// Test fixing the protocol in the configuration.
configTestCases := []struct {
info *ChangeFeedInfo
expectedProtocol config.Protocol
expectedProtocol string
}{
{
info: &ChangeFeedInfo{
Expand All @@ -579,8 +578,62 @@ func TestFixSinkProtocol(t *testing.T) {
Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()},
},
},
expectedProtocol: config.ProtocolOpen,
expectedProtocol: "",
},
{
info: &ChangeFeedInfo{
SinkURI: "mysql://root:[email protected]:3306/",
Config: &config.ReplicaConfig{
Sink: &config.SinkConfig{Protocol: "whatever"},
},
},
expectedProtocol: "",
},
}

for _, tc := range configTestCases {
tc.info.fixMySQLSinkProtocol()
require.Equal(t, tc.expectedProtocol, tc.info.Config.Sink.Protocol)
}

sinkURITestCases := []struct {
info *ChangeFeedInfo
expectedSinkURI string
}{
{
info: &ChangeFeedInfo{
SinkURI: "mysql://root:[email protected]:3306/?protocol=open-protocol",
Config: &config.ReplicaConfig{
Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()},
},
},
expectedSinkURI: "mysql://root:[email protected]:3306/",
},
{
info: &ChangeFeedInfo{
SinkURI: "mysql://root:[email protected]:3306/?protocol=default",
Config: &config.ReplicaConfig{
Sink: &config.SinkConfig{Protocol: ""},
},
},
expectedSinkURI: "mysql://root:[email protected]:3306/",
},
}

for _, tc := range sinkURITestCases {
tc.info.fixMySQLSinkProtocol()
require.Equal(t, tc.expectedSinkURI, tc.info.SinkURI)
}
}

func TestFixMQSinkProtocol(t *testing.T) {
t.Parallel()

// Test fixing the protocol in the configuration.
configTestCases := []struct {
info *ChangeFeedInfo
expectedProtocol config.Protocol
}{
{
info: &ChangeFeedInfo{
SinkURI: "kafka://127.0.0.1:9092/ticdc-test2",
Expand Down Expand Up @@ -611,32 +664,18 @@ func TestFixSinkProtocol(t *testing.T) {
}

for _, tc := range configTestCases {
tc.info.fixSinkProtocol()
tc.info.fixMQSinkProtocol()
var protocol config.Protocol
err := protocol.FromString(tc.info.Config.Sink.Protocol)
if strings.Contains(tc.info.SinkURI, "kafka") {
require.Nil(t, err)
require.Equal(t, tc.expectedProtocol, protocol)
} else {
require.Error(t, err)
require.Contains(t, err.Error(), "ErrMQSinkUnknownProtocol")
}
require.Nil(t, err)
require.Equal(t, tc.expectedProtocol, protocol)
}

// Test fixing the protocol in SinkURI.
sinkURITestCases := []struct {
info *ChangeFeedInfo
expectedSinkURI string
}{
{
info: &ChangeFeedInfo{
SinkURI: "mysql://root:[email protected]:3306/",
Config: &config.ReplicaConfig{
Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()},
},
},
expectedSinkURI: "mysql://root:[email protected]:3306/",
},
{
info: &ChangeFeedInfo{
SinkURI: "kafka://127.0.0.1:9092/ticdc-test2",
Expand Down Expand Up @@ -685,7 +724,7 @@ func TestFixSinkProtocol(t *testing.T) {
}

for _, tc := range sinkURITestCases {
tc.info.fixSinkProtocol()
tc.info.fixMQSinkProtocol()
require.Equal(t, tc.expectedSinkURI, tc.info.SinkURI)
}
}
Expand Down
27 changes: 21 additions & 6 deletions pkg/version/creator_version_gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ func NewCreatorVersionGate(version string) *CreatorVersionGate {

// ChangefeedStateFromAdminJob determines if admin job is the state
// of changefeed based on the version of the creator.
func (f *CreatorVersionGate) ChangefeedStateFromAdminJob() bool {
func (g *CreatorVersionGate) ChangefeedStateFromAdminJob() bool {
// Introduced in https://github.com/pingcap/tiflow/pull/1341.
// The changefeed before it was introduced was using the old owner.
if f.version == "" {
if g.version == "" {
return true
}

creatorVersion := semver.New(removeVAndHash(f.version))
creatorVersion := semver.New(removeVAndHash(g.version))
for _, version := range changefeedStateFromAdminJobVersions {
// NOTICE: To compare against the same major version.
if creatorVersion.Major == version.Major &&
Expand All @@ -67,13 +67,28 @@ func (f *CreatorVersionGate) ChangefeedStateFromAdminJob() bool {

// ChangefeedAcceptUnknownProtocols determines whether to accept
// unknown protocols based on the creator's version.
func (f *CreatorVersionGate) ChangefeedAcceptUnknownProtocols() bool {
func (g *CreatorVersionGate) ChangefeedAcceptUnknownProtocols() bool {
// Introduced in https://github.com/pingcap/ticdc/pull/1341.
// So it was supported at the time.
if f.version == "" {
if g.version == "" {
return true
}

creatorVersion := semver.New(removeVAndHash(f.version))
creatorVersion := semver.New(removeVAndHash(g.version))
return creatorVersion.LessThan(changefeedAcceptUnknownProtocolsVersion)
}

var changefeedAcceptProtocolInMysqlSinURI = *semver.New("6.1.1")

// ChangefeedAcceptProtocolInMysqlSinURI determines whether to accept
// protocol in mysql sink uri or configure based on the creator's version.
func (g *CreatorVersionGate) ChangefeedAcceptProtocolInMysqlSinURI() bool {
// Introduced in https://github.com/pingcap/ticdc/pull/1341.
// So it was supported at the time.
if g.version == "" {
return true
}

creatorVersion := semver.New(removeVAndHash(g.version))
return creatorVersion.LessThan(changefeedAcceptProtocolInMysqlSinURI)
}
63 changes: 63 additions & 0 deletions pkg/version/creator_version_gate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,66 @@ func TestChangefeedAcceptUnknownProtocols(t *testing.T) {
require.Equal(t, tc.expected, creatorVersionGate.ChangefeedAcceptUnknownProtocols())
}
}

func TestChangefeedAcceptProtocolInMysqlSinURI(t *testing.T) {
t.Parallel()

testCases := []struct {
creatorVersion string
expected bool
}{
{
creatorVersion: "",
expected: true,
},
{
creatorVersion: "4.0.12",
expected: true,
},
{
creatorVersion: "4.0.14",
expected: true,
},
{
creatorVersion: "4.0.15",
expected: true,
},
{
creatorVersion: "5.0.0",
expected: true,
},
{
creatorVersion: "5.0.1",
expected: true,
},
{
creatorVersion: "5.1.0",
expected: true,
},
{
creatorVersion: "5.2.0-nightly",
expected: true,
},
{
creatorVersion: "5.3.0",
expected: true,
},
{
creatorVersion: "6.1.0",
expected: true,
},
{
creatorVersion: "6.1.1",
expected: false,
},
{
creatorVersion: "6.2.0",
expected: false,
},
}

for _, tc := range testCases {
creatorVersionGate := CreatorVersionGate{version: tc.creatorVersion}
require.Equal(t, tc.expected, creatorVersionGate.ChangefeedAcceptProtocolInMysqlSinURI())
}
}

0 comments on commit 8201ed9

Please sign in to comment.