From d779dc857c3732f738abbef4ea066687b8db3bf7 Mon Sep 17 00:00:00 2001 From: yumchina Date: Fri, 21 Jul 2023 11:28:09 +0800 Subject: [PATCH 01/11] add pulsar ddl producer add pulsar ddl sink update deps --- cdc/sink/ddlsink/factory/factory.go | 2 + .../mq/ddlproducer/pulsar_ddl_producer.go | 254 ++++++++++++++++++ .../ddlproducer/pulsar_ddl_producer_test.go | 226 ++++++++++++++++ cdc/sink/ddlsink/mq/pulsar_ddl_sink.go | 128 +++++++++ cdc/sink/ddlsink/mq/pulsar_ddl_sink_test.go | 48 ++++ 5 files changed, 658 insertions(+) create mode 100644 cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go create mode 100644 cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer_test.go create mode 100644 cdc/sink/ddlsink/mq/pulsar_ddl_sink.go create mode 100644 cdc/sink/ddlsink/mq/pulsar_ddl_sink_test.go diff --git a/cdc/sink/ddlsink/factory/factory.go b/cdc/sink/ddlsink/factory/factory.go index b000b5daac3..8265efc8416 100644 --- a/cdc/sink/ddlsink/factory/factory.go +++ b/cdc/sink/ddlsink/factory/factory.go @@ -59,6 +59,8 @@ func New( return mysql.NewDDLSink(ctx, changefeedID, sinkURI, cfg) case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme: return cloudstorage.NewDDLSink(ctx, changefeedID, sinkURI, cfg) + case sink.PulsarScheme: + return mq.NewPulsarDDLSink(ctx, sinkURI, cfg, ddlproducer.NewPulsarProducer) default: return nil, cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", scheme) diff --git a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go new file mode 100644 index 00000000000..dcf405ae06e --- /dev/null +++ b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go @@ -0,0 +1,254 @@ +package ddlproducer + +import ( + "context" + "encoding/json" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/contextutil" + "github.com/pingcap/tiflow/cdc/model" + pulsarMetric "github.com/pingcap/tiflow/cdc/sinkv2/metrics/mq/pulsar" + "github.com/pingcap/tiflow/pkg/config" + "go.uber.org/zap" + "strings" + "sync" + "time" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/pingcap/tiflow/pkg/sink/codec/common" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" +) + +// Assert DDLEventSink implementation +var _ DDLProducer = (*pulsarProducers)(nil) + +type pulsarProducers struct { + client pulsar.Client + pConfig *pulsarConfig.PulsarConfig + defaultTopicName string + // support multiple topics + producers map[string]pulsar.Producer + producersMutex sync.RWMutex + id model.ChangeFeedID +} + +// SyncBroadcastMessage pulsar consume all partitions +func (p *pulsarProducers) SyncBroadcastMessage(ctx context.Context, topic string, + totalPartitionsNum int32, message *common.Message) error { + // call SyncSendMessage + return p.SyncSendMessage(ctx, topic, totalPartitionsNum, message) +} + +// SyncSendMessage sends a message +// partitionNum is not used,pulsar consume all partitions +func (p *pulsarProducers) SyncSendMessage(ctx context.Context, topic string, + partitionNum int32, message *common.Message) error { + p.wrapperSchemaAndTopic(message) + + pulsarMetric.IncPublishedDDLEventCountMetric(topic, p.id.ID, message) + producer, err := p.GetProducerByTopic(topic) + if err != nil { + log.L().Error("ddl SyncSendMessage GetProducerByTopic fail", zap.Error(err)) + pulsarMetric.IncPublishedDDLEventCountMetricFail(topic, p.id.ID, message) + return err + } + + data := &pulsar.ProducerMessage{ + Payload: message.Value, + Key: p.pConfig.MessageKey, + } + mID, err := producer.Send(ctx, data) + if err != nil { + log.L().Error("ddl producer send fail", zap.Error(err)) + pulsarMetric.IncPublishedDDLEventCountMetricFail(producer.Topic(), p.id.ID, message) + return err + } + pulsarMetric.IncPublishedDDLEventCountMetricSuccess(producer.Topic(), p.id.ID, message) + + log.L().Debug("pulsarProducers SyncSendMessage success", + zap.Any("mID", mID), zap.String("topic", topic)) + + return nil +} + +// NewPulsarProducer creates a pulsar producer +func NewPulsarProducer( + ctx context.Context, + pConfig *pulsarConfig.PulsarConfig, + client pulsar.Client, +) (DDLProducer, error) { + + changefeedID := contextutil.ChangefeedIDFromCtx(ctx) + log.Info("Starting pulsar DDL producer ...", + zap.String("namespace", changefeedID.Namespace), + zap.String("changefeed", changefeedID.ID)) + + topicName := pConfig.GetDefaultTopicName() + + defaultProducer, err := newProducer(pConfig, client, topicName) + if err != nil { + return nil, err + } + producers := make(map[string]pulsar.Producer) + producers[topicName] = defaultProducer + return &pulsarProducers{ + client: client, + pConfig: pConfig, + producers: producers, + defaultTopicName: topicName, + id: changefeedID, + }, nil +} + +// newProducer creates a pulsar producer +// One topic is used by one producer +func newProducer( + pConfig *pulsarConfig.PulsarConfig, + client pulsar.Client, + topicName string, +) (pulsar.Producer, error) { + + po := pulsar.ProducerOptions{ + Topic: topicName, + } + if pConfig.BatchingMaxMessages > 0 { + po.BatchingMaxMessages = pConfig.BatchingMaxMessages + } + if pConfig.BatchingMaxPublishDelay > 0 { + po.BatchingMaxPublishDelay = pConfig.BatchingMaxPublishDelay + } + if len(pConfig.Compression) > 0 { + switch strings.ToLower(pConfig.Compression) { + case "lz4": + po.CompressionType = pulsar.LZ4 + case "zlib": + po.CompressionType = pulsar.ZLib + case "zstd": + po.CompressionType = pulsar.ZSTD + } + } + + if pConfig.SendTimeout > 0 { + po.SendTimeout = time.Millisecond * time.Duration(pConfig.SendTimeout) + } + + switch pConfig.ProducerMode { + case pulsarConfig.ProducerModeSingle: + // set default message key '0' + // if not set default value, will be random sent to multiple partitions + if len(pConfig.MessageKey) == 0 { + pConfig.MessageKey = "0" + } + case pulsarConfig.ProducerModeBatch, "": + // default ,message key is empty + } + + producer, err := client.CreateProducer(po) + if err != nil { + return nil, err + } + + log.L().Info("create pulsar producer successfully", + zap.String("topicName", topicName)) + + return producer, nil +} + +func (p *pulsarProducers) GetProducerByTopic(topicName string) (producer pulsar.Producer, err error) { + + p.producersMutex.RLock() + producer, ok := p.producers[topicName] + p.producersMutex.RUnlock() + if !ok { // create a new producer for the topicName + p.producersMutex.Lock() + defer p.producersMutex.Unlock() + + producer, ok = p.producers[topicName] + if ok { + return producer, nil + } + + producer, err = newProducer(p.pConfig, p.client, topicName) + if err != nil { + return nil, err + } + p.producers[topicName] = producer + } + + return producer, nil +} + +// Close close all producers +func (p *pulsarProducers) Close() { + + for topic, producer := range p.producers { + producer.Close() + p.closeProducersMapByTopic(topic) + } + p.client.Close() +} + +// Flush waits for all the messages in the async producer to be sent to Pulsar. +// Notice: this method is not thread-safe. +// Do not try to call AsyncSendMessage and Flush functions in different threads, +// otherwise Flush will not work as expected. It may never finish or flush the wrong message. +// Because inflight will be modified by mistake. +func (p *pulsarProducers) Flush(ctx context.Context) error { + done := make(chan struct{}, 1) + p.producersMutex.Lock() + for _, pd := range p.producers { + pd.Flush() + } + p.producersMutex.Unlock() + done <- struct{}{} + select { + case <-ctx.Done(): + return ctx.Err() + case <-done: + return nil + } + +} + +func (p *pulsarProducers) closeProducersMapByTopic(topicName string) error { + + p.producersMutex.Lock() + defer p.producersMutex.Unlock() + producer, _ := p.producers[topicName] + if producer == nil { + return nil + } + p.producers[topicName] = nil + + return nil +} + +func (p *pulsarProducers) wrapperSchemaAndTopic(m *common.Message) { + if m.Schema == nil { + if m.Protocol == config.ProtocolMaxwell { + mx := &maxwellMessage{} + err := json.Unmarshal(m.Value, mx) + if err != nil { + log.Error("unmarshal maxwell message failed", zap.Error(err)) + return + } + if len(mx.Database) > 0 { + m.Schema = &mx.Database + } + if len(mx.Table) > 0 { + m.Table = &mx.Table + } + } + if m.Protocol == config.ProtocolCanal { // canal protocol set multi schemas in one topic + m.Schema = str2Pointer("multi_schema") + } + } +} + +type maxwellMessage struct { + Database string `json:"database"` + Table string `json:"table"` +} + +func str2Pointer(str string) *string { + return &str +} diff --git a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer_test.go b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer_test.go new file mode 100644 index 00000000000..f1273889591 --- /dev/null +++ b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer_test.go @@ -0,0 +1,226 @@ +package ddlproducer + +import ( + "context" + "fmt" + "math/rand" + url2 "net/url" + "os" + "sync" + "testing" + "time" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/pingcap/tiflow/cdc/model" + pulsarMetric "github.com/pingcap/tiflow/cdc/sink/metrics/mq/pulsar" + "github.com/pingcap/tiflow/pkg/sink/codec/common" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" +) + +func newPulsarConfig() *pulsarConfig.PulsarConfig { + sinkUrl := os.Getenv("sink-uri") + fmt.Println(sinkUrl) + if len(sinkUrl) <= 0 { + panic("sink-uri is empty: " + sinkUrl) + } + u := &url2.URL{} + u, err := u.Parse(sinkUrl) + if err != nil { + panic(err) + } + c := &pulsarConfig.PulsarConfig{} + err = c.Apply(u) + if err != nil { + panic(err) + } + return c +} + +func TestNewPulsarProducer(t *testing.T) { + config := newPulsarConfig() + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: config.URL, + Authentication: pulsar.NewAuthenticationToken(config.AuthenticationToken), + }) + if err != nil { + t.Errorf("new client fail %+v", err) + return + } + + ctx, fc := context.WithTimeout(context.Background(), time.Second*5) + + type args struct { + ctx context.Context + client pulsar.Client + pulsarConfig *pulsarConfig.PulsarConfig + errCh chan error + } + tests := []struct { + name string + args args + want DDLProducer + wantErr bool + }{ + { + name: "New", + args: args{ + ctx: ctx, + client: client, + pulsarConfig: config, + errCh: make(chan error), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + producer, err := NewPulsarProducer(tt.args.ctx, tt.args.pulsarConfig, tt.args.client) + if err != nil { + t.Errorf("NewPulsarDMLProducer() error = %v, wantErr %v", err, tt.wantErr) + return + } + defer producer.Close() + + select { + case e := <-tt.args.errCh: + t.Logf("errChan %+v", e) + case <-ctx.Done(): + fc() + t.Logf("Done") + } + }) + } +} + +func Test_pulsarProducers_SyncSendMessage(t *testing.T) { + + config := newPulsarConfig() + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: config.URL, + Authentication: pulsar.NewAuthenticationToken(config.AuthenticationToken), + }) + if err != nil { + t.Errorf("new client fail %+v", err) + return + } + + ctx, _ := context.WithTimeout(context.Background(), time.Second*5) + + type args struct { + ctx context.Context + topic string + partition int32 + message *common.Message + client pulsar.Client + pulsarConfig *pulsarConfig.PulsarConfig + errCh chan error + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "New pulsar client and AsyncSendMessage", + args: args{ + ctx: ctx, + topic: "test", + partition: 1, + message: &common.Message{ + Key: []byte("key"), + Value: []byte("this value for test input data"), + Callback: func() { + fmt.Println("callback: message send success!") + }, + }, + client: client, + pulsarConfig: config, + errCh: make(chan error), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p, err := NewPulsarProducer(tt.args.ctx, tt.args.pulsarConfig, tt.args.client) + if err != nil { + t.Errorf("NewPulsarDMLProducer() error = %v, wantErr %v", err, tt.wantErr) + return + } + //defer p.Close() + if err := p.SyncSendMessage(tt.args.ctx, tt.args.topic, tt.args.partition, tt.args.message); err != nil { + t.Errorf("AsyncSendMessage() error = %v, wantErr %v", err, tt.wantErr) + } + time.Sleep(time.Second * 1) + p.Close() + client.Close() + }) + } + +} + +func TestDDLIncreaseMetric(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + index := 0 + m := &common.Message{ + Type: model.MessageTypeDDL, + } + + for { + x := rand.Int31() % 8 + time.Sleep(time.Second * time.Duration(x)) + if index%3 == 1 { + pulsarMetric.IncPublishedDDLEventCountMetricSuccess("testtopic", + "test", m) + } + pulsarMetric.IncPublishedDDLEventCountMetric("testtopic", + "test", m) + + pulsarMetric.IncPublishedDDLEventCountMetric("testtopic", + "test", m) + + pulsarMetric.IncPublishedDDLEventCountMetric("noTable", + "test", m) + + pulsarMetric.IncPublishedDDLEventCountMetric("noSchema", + "test", m) + + index++ + } + +} + +func Test_pulsarProducers_closeProducersMapByTopic(t *testing.T) { + type fields struct { + client pulsar.Client + pConfig *pulsarConfig.PulsarConfig + defaultTopicName string + producers map[string]pulsar.Producer + producersMutex sync.RWMutex + changefeedID model.ChangeFeedID + } + type args struct { + topicName string + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &pulsarProducers{ + client: tt.fields.client, + pConfig: tt.fields.pConfig, + defaultTopicName: tt.fields.defaultTopicName, + producers: tt.fields.producers, + producersMutex: tt.fields.producersMutex, + id: tt.fields.changefeedID, + } + if err := p.closeProducersMapByTopic(tt.args.topicName); (err != nil) != tt.wantErr { + t.Errorf("closeProducersMapByTopic() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go new file mode 100644 index 00000000000..ed3edc96c63 --- /dev/null +++ b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go @@ -0,0 +1,128 @@ +package mq + +import ( + "context" + "net/url" + "time" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/contextutil" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer" + "github.com/pingcap/tiflow/cdc/sink/mq/dispatcher" + "github.com/pingcap/tiflow/cdc/sink/mq/manager" + "github.com/pingcap/tiflow/cdc/sink/util" + pulsarMetric "github.com/pingcap/tiflow/cdc/sinkv/metrics/mq/pulsar" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" + "go.uber.org/zap" +) + +// NewPulsarDDLSink will verify the config and create a Pulsar DDL Sink. +func NewPulsarDDLSink( + ctx context.Context, + sinkURI *url.URL, + replicaConfig *config.ReplicaConfig, + producerCreator ddlproducer.PulsarFactory, +) (_ *ddlSink, err error) { + + changefeedID := contextutil.ChangefeedIDFromCtx(ctx) + log.Info("Starting pulsar DDL producer ...", + zap.String("namespace", changefeedID.Namespace), + zap.String("changefeed", changefeedID.ID)) + + defaultTopic, err := util.GetTopic(sinkURI) + if err != nil { + return nil, errors.Trace(err) + } + + protocol, err := util.GetProtocol(replicaConfig.Sink.Protocol) + if err != nil { + return nil, errors.Trace(err) + } + + pConfig, err := pulsarConfig.NewPulsarConfig(sinkURI) + if err != nil { + return nil, errors.Trace(err) + } + + log.Info("Try to create a DDL sink producer", zap.Any("pulsarConfig", pConfig)) + + client, err := createPulsarClient(pConfig, changefeedID) + if err != nil { + log.Error("DDL sink producer client create fail", zap.Error(err)) + return nil, cerror.WrapError(cerror.ErrPulsarNewClient, err) + } + + start := time.Now() + p, err := producerCreator(ctx, pConfig, client) + log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start))) + if err != nil { + return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err) + } + + // NewEventRouter + eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic) + if err != nil { + return nil, errors.Trace(err) + } + + encoderConfig, err := util.GetEncoderConfig(sinkURI, protocol, replicaConfig, + pConfig.MaxMessageBytes) + if err != nil { + return nil, errors.Trace(err) + } + + topicManager, err := manager.NewPulsarTopicManager(pConfig, client) + if err != nil { + return nil, errors.Trace(err) + } + + s, err := newDDLSink(ctx, p, topicManager, eventRouter, encoderConfig) + if err != nil { + return nil, errors.Trace(err) + } + + return s, nil +} + +func createPulsarClient(config *pulsarConfig.PulsarConfig, changefeedID model.ChangeFeedID) (pulsar.Client, error) { + + op := pulsar.ClientOptions{ + URL: config.URL, + MetricsRegisterer: pulsarMetric.GetMetricRegistry(), + CustomMetricsLabels: map[string]string{ + "changefeed": changefeedID.ID, + "namespace": changefeedID.Namespace, + }, + } + + if len(config.AuthenticationToken) > 0 { + op.Authentication = pulsar.NewAuthenticationToken(config.AuthenticationToken) + } else if len(config.TokenFromFile) > 0 { + op.Authentication = pulsar.NewAuthenticationTokenFromFile(config.TokenFromFile) + } else if len(config.BasicUserName) > 0 && len(config.BasicPassword) > 0 { + authentication, err := pulsar.NewAuthenticationBasic(config.BasicUserName, config.BasicPassword) + if err != nil { + return nil, err + } + op.Authentication = authentication + } + + if config.ConnectionTimeout > 0 { + op.ConnectionTimeout = config.ConnectionTimeout + } + if config.OperationTimeout > 0 { + op.OperationTimeout = config.OperationTimeout + } + + pulsarClient, err := pulsar.NewClient(op) + if err != nil { + log.L().Error("Cannot connect to pulsar", zap.Error(err)) + return nil, err + } + return pulsarClient, nil +} diff --git a/cdc/sink/ddlsink/mq/pulsar_ddl_sink_test.go b/cdc/sink/ddlsink/mq/pulsar_ddl_sink_test.go new file mode 100644 index 00000000000..ceec971cc3c --- /dev/null +++ b/cdc/sink/ddlsink/mq/pulsar_ddl_sink_test.go @@ -0,0 +1,48 @@ +package mq + +import ( + "context" + "github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer" + "github.com/pingcap/tiflow/pkg/config" + "net/url" + "testing" +) + +func TestNewPulsarDDLSink(t *testing.T) { + type args struct { + ctx context.Context + sinkURI *url.URL + replicaConfig *config.ReplicaConfig + producerCreator ddlproducer.Factory + } + tests := []struct { + name string + args args + want *ddlSink + wantErr bool + }{ + { + name: "test normal new pulsar ddl sink", + args: args{ + ctx: context.Background(), + sinkURI: &url.URL{}, + replicaConfig: &config.ReplicaConfig{}, + producerCreator: nil, + }, + want: &ddlSink{}, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := NewPulsarDDLSink(tt.args.ctx, tt.args.sinkURI, + tt.args.replicaConfig, ddlproducer.NewPulsarProducer) + if err != nil { + t.Errorf("NewPulsarDDLSink() error = %v, wantErr %v", err, tt.wantErr) + return + } + t.Logf("got sink = %+v", got) + + }) + } +} From bd704f44e60043a371fa4512a1ce79b62d1ffd15 Mon Sep 17 00:00:00 2001 From: yumchina Date: Fri, 21 Jul 2023 11:38:01 +0800 Subject: [PATCH 02/11] setup authentication for pulsar --- cdc/sink/ddlsink/mq/pulsar_ddl_sink.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go index ed3edc96c63..0442e6a8952 100644 --- a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go +++ b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go @@ -2,6 +2,7 @@ package mq import ( "context" + "fmt" "net/url" "time" @@ -126,3 +127,16 @@ func createPulsarClient(config *pulsarConfig.PulsarConfig, changefeedID model.Ch } return pulsarClient, nil } + +func setupAuthentication(config *pulsarConfig.PulsarConfig) (pulsar.Authentication, error) { + if len(config.AuthenticationToken) > 0 { + return pulsar.NewAuthenticationToken(config.AuthenticationToken) + } else if len(config.TokenFromFile) > 0 { + return pulsar.NewAuthenticationTokenFromFile(config.TokenFromFile) + } else if len(config.BasicUserName) > 0 && len(config.BasicPassword) > 0 { + return pulsar.NewAuthenticationBasic(config.BasicUserName, config.BasicPassword) + } else if len(config.OAuth2) > 0 { + return pulsar.NewAuthenticationBasic(config.BasicUserName, config.BasicPassword) + } + return nil, fmt.Errorf("no authentication method found") +} From 4a9d4c3d9940e2292eb7274d36d011980f5013fd Mon Sep 17 00:00:00 2001 From: yumchina Date: Wed, 26 Jul 2023 11:26:04 +0800 Subject: [PATCH 03/11] add write ddl events to pulsar add test file and mock create pulsar client and producer fix Makefile for unit test with pulsar. --- Makefile | 6 + cdc/sink/ddlsink/factory/factory.go | 5 +- .../ddlsink/mq/ddlproducer/ddl_producer.go | 6 + .../ddlproducer/pulsar_ddl_mock_producer.go | 112 +++++++++ .../mq/ddlproducer/pulsar_ddl_producer.go | 104 ++++---- .../ddlproducer/pulsar_ddl_producer_test.go | 231 +++++------------- cdc/sink/ddlsink/mq/pulsar_ddl_sink.go | 104 +++----- cdc/sink/ddlsink/mq/pulsar_ddl_sink_test.go | 176 ++++++++++--- cdc/sink/dmlsink/mq/manager/pulsar_manager.go | 99 ++++++++ .../dmlsink/mq/manager/pulsar_manager_mock.go | 39 +++ .../dmlsink/mq/manager/pulsar_manager_test.go | 61 +++++ pkg/leakutil/leak_helper.go | 4 + pkg/sink/codec/common/message.go | 27 ++ pkg/sink/pulsar/factory.go | 69 ++++++ pkg/sink/pulsar/factory_mock.go | 27 ++ pkg/sink/pulsar/factory_test.go | 27 ++ 16 files changed, 774 insertions(+), 323 deletions(-) create mode 100644 cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go create mode 100644 cdc/sink/dmlsink/mq/manager/pulsar_manager.go create mode 100644 cdc/sink/dmlsink/mq/manager/pulsar_manager_mock.go create mode 100644 cdc/sink/dmlsink/mq/manager/pulsar_manager_test.go create mode 100644 pkg/sink/pulsar/factory.go create mode 100644 pkg/sink/pulsar/factory_mock.go create mode 100644 pkg/sink/pulsar/factory_test.go diff --git a/Makefile b/Makefile index 47a0cb22870..967806d1633 100644 --- a/Makefile +++ b/Makefile @@ -29,6 +29,12 @@ CURDIR := $(shell pwd) path_to_add := $(addsuffix /bin,$(subst :,/bin:,$(GOPATH))) export PATH := $(CURDIR)/bin:$(CURDIR)/tools/bin:$(path_to_add):$(PATH) +# DBUS_SESSION_BUS_ADDRESS pulsar client use dbus to detect the connection status, +# but it will not exit when the connection is closed. +# I try to use leak_helper to detect goroutine leak,but it does not work. +# https://github.com/benthosdev/benthos/issues/1184 suggest to use environment variable to disable dbus. +export DBUS_SESSION_BUS_ADDRESS := /dev/null + SHELL := /usr/bin/env bash TEST_DIR := /tmp/tidb_cdc_test diff --git a/cdc/sink/ddlsink/factory/factory.go b/cdc/sink/ddlsink/factory/factory.go index 8265efc8416..5d9c767c6a5 100644 --- a/cdc/sink/ddlsink/factory/factory.go +++ b/cdc/sink/ddlsink/factory/factory.go @@ -25,11 +25,13 @@ import ( "github.com/pingcap/tiflow/cdc/sink/ddlsink/mq" "github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer" "github.com/pingcap/tiflow/cdc/sink/ddlsink/mysql" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/sink/kafka" kafkav2 "github.com/pingcap/tiflow/pkg/sink/kafka/v2" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" "github.com/pingcap/tiflow/pkg/util" ) @@ -60,7 +62,8 @@ func New( case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme: return cloudstorage.NewDDLSink(ctx, changefeedID, sinkURI, cfg) case sink.PulsarScheme: - return mq.NewPulsarDDLSink(ctx, sinkURI, cfg, ddlproducer.NewPulsarProducer) + return mq.NewPulsarDDLSink(ctx, changefeedID, sinkURI, cfg, manager.NewPulsarTopicManager, + pulsarConfig.NewCreatorFactory, ddlproducer.NewPulsarProducer) default: return nil, cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", scheme) diff --git a/cdc/sink/ddlsink/mq/ddlproducer/ddl_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/ddl_producer.go index 8b377f55d50..466b82fe331 100644 --- a/cdc/sink/ddlsink/mq/ddlproducer/ddl_producer.go +++ b/cdc/sink/ddlsink/mq/ddlproducer/ddl_producer.go @@ -16,9 +16,11 @@ package ddlproducer import ( "context" + "github.com/apache/pulsar-client-go/pulsar" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/sink/kafka" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" ) // DDLProducer is the interface for DDL message producer. @@ -38,3 +40,7 @@ type DDLProducer interface { // Factory is a function to create a producer. type Factory func(ctx context.Context, changefeedID model.ChangeFeedID, syncProducer kafka.SyncProducer) DDLProducer + +// PulsarFactory is a function to create a pulsar producer. +type PulsarFactory func(ctx context.Context, changefeedID model.ChangeFeedID, + pConfig *pulsarConfig.Config, client pulsar.Client) (DDLProducer, error) diff --git a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go new file mode 100644 index 00000000000..23df797cb19 --- /dev/null +++ b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go @@ -0,0 +1,112 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddlproducer + +import ( + "context" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/sink/codec/common" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" +) + +// Assert DDLEventSink implementation +var _ DDLProducer = (*PulsarMockProducers)(nil) + +// PulsarMockProducers is a mock pulsar producer +type PulsarMockProducers struct { + events map[string][]*pulsar.ProducerMessage +} + +// SyncBroadcastMessage pulsar consume all partitions +func (p *PulsarMockProducers) SyncBroadcastMessage(ctx context.Context, topic string, + totalPartitionsNum int32, message *common.Message, +) error { + // call SyncSendMessage + + log.L().Info("pulsarProducers SyncBroadcastMessage in") + return p.SyncSendMessage(ctx, topic, totalPartitionsNum, message) +} + +// SyncSendMessage sends a message +// partitionNum is not used,pulsar consume all partitions +func (p *PulsarMockProducers) SyncSendMessage(ctx context.Context, topic string, + partitionNum int32, message *common.Message, +) error { + data := &pulsar.ProducerMessage{ + Payload: message.Value, + Key: message.GetPartitionKey(), + } + // fmt.Println("pulsarProducers SyncSendMessage", data) + p.events[topic] = append(p.events[topic], data) + + return nil +} + +// NewMockPulsarProducer creates a pulsar producer +func NewMockPulsarProducer( + ctx context.Context, + changefeedID model.ChangeFeedID, + pConfig *pulsarConfig.Config, + client pulsar.Client, +) (*PulsarMockProducers, error) { + return &PulsarMockProducers{ + events: map[string][]*pulsar.ProducerMessage{}, + }, nil +} + +// NewMockPulsarProducerDDL creates a pulsar producer for DDLProducer +func NewMockPulsarProducerDDL( + ctx context.Context, + changefeedID model.ChangeFeedID, + pConfig *pulsarConfig.Config, + client pulsar.Client, +) (DDLProducer, error) { + return NewMockPulsarProducer(ctx, changefeedID, pConfig, client) +} + +// GetProducerByTopic returns a producer by topic name +func (p *PulsarMockProducers) GetProducerByTopic(topicName string) (producer pulsar.Producer, err error) { + return producer, nil +} + +// Close close all producers +func (p *PulsarMockProducers) Close() { + p.events = make(map[string][]*pulsar.ProducerMessage) +} + +// Flush waits for all the messages in the async producer to be sent to Pulsar. +// Notice: this method is not thread-safe. +// Do not try to call AsyncSendMessage and Flush functions in different threads, +// otherwise Flush will not work as expected. It may never finish or flush the wrong message. +// Because inflight will be modified by mistake. +func (p *PulsarMockProducers) Flush(ctx context.Context) error { + return nil +} + +// GetAllEvents returns the events received by the mock producer. +func (p *PulsarMockProducers) GetAllEvents() []*pulsar.ProducerMessage { + var events []*pulsar.ProducerMessage + for _, v := range p.events { + events = append(events, v...) + } + return events +} + +// GetEvents returns the event filtered by the key. +func (p *PulsarMockProducers) GetEvents(topic string) []*pulsar.ProducerMessage { + return p.events[topic] +} diff --git a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go index dcf405ae06e..d9cccccc27d 100644 --- a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go +++ b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go @@ -1,29 +1,40 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package ddlproducer import ( "context" "encoding/json" - "github.com/pingcap/log" - "github.com/pingcap/tiflow/cdc/contextutil" - "github.com/pingcap/tiflow/cdc/model" - pulsarMetric "github.com/pingcap/tiflow/cdc/sinkv2/metrics/mq/pulsar" - "github.com/pingcap/tiflow/pkg/config" - "go.uber.org/zap" - "strings" "sync" - "time" "github.com/apache/pulsar-client-go/pulsar" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + // pulsarMetric "github.com/pingcap/tiflow/cdc/sink/metrics/mq/pulsar" + "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink/codec/common" pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" + "go.uber.org/zap" ) // Assert DDLEventSink implementation var _ DDLProducer = (*pulsarProducers)(nil) +// pulsarProducers is a producer for pulsar type pulsarProducers struct { client pulsar.Client - pConfig *pulsarConfig.PulsarConfig + pConfig *pulsarConfig.Config defaultTopicName string // support multiple topics producers map[string]pulsar.Producer @@ -33,7 +44,8 @@ type pulsarProducers struct { // SyncBroadcastMessage pulsar consume all partitions func (p *pulsarProducers) SyncBroadcastMessage(ctx context.Context, topic string, - totalPartitionsNum int32, message *common.Message) error { + totalPartitionsNum int32, message *common.Message, +) error { // call SyncSendMessage return p.SyncSendMessage(ctx, topic, totalPartitionsNum, message) } @@ -41,28 +53,29 @@ func (p *pulsarProducers) SyncBroadcastMessage(ctx context.Context, topic string // SyncSendMessage sends a message // partitionNum is not used,pulsar consume all partitions func (p *pulsarProducers) SyncSendMessage(ctx context.Context, topic string, - partitionNum int32, message *common.Message) error { + partitionNum int32, message *common.Message, +) error { p.wrapperSchemaAndTopic(message) - pulsarMetric.IncPublishedDDLEventCountMetric(topic, p.id.ID, message) + // pulsarMetric.IncPublishedDDLEventCountMetric(topic, p.id.ID, message) producer, err := p.GetProducerByTopic(topic) if err != nil { log.L().Error("ddl SyncSendMessage GetProducerByTopic fail", zap.Error(err)) - pulsarMetric.IncPublishedDDLEventCountMetricFail(topic, p.id.ID, message) + // pulsarMetric.IncPublishedDDLEventCountMetricFail(topic, p.id.ID, message) return err } data := &pulsar.ProducerMessage{ Payload: message.Value, - Key: p.pConfig.MessageKey, + Key: message.GetPartitionKey(), } mID, err := producer.Send(ctx, data) if err != nil { log.L().Error("ddl producer send fail", zap.Error(err)) - pulsarMetric.IncPublishedDDLEventCountMetricFail(producer.Topic(), p.id.ID, message) + // pulsarMetric.IncPublishedDDLEventCountMetricFail(producer.Topic(), p.id.ID, message) return err } - pulsarMetric.IncPublishedDDLEventCountMetricSuccess(producer.Topic(), p.id.ID, message) + // pulsarMetric.IncPublishedDDLEventCountMetricSuccess(producer.Topic(), p.id.ID, message) log.L().Debug("pulsarProducers SyncSendMessage success", zap.Any("mID", mID), zap.String("topic", topic)) @@ -73,11 +86,10 @@ func (p *pulsarProducers) SyncSendMessage(ctx context.Context, topic string, // NewPulsarProducer creates a pulsar producer func NewPulsarProducer( ctx context.Context, - pConfig *pulsarConfig.PulsarConfig, + changefeedID model.ChangeFeedID, + pConfig *pulsarConfig.Config, client pulsar.Client, ) (DDLProducer, error) { - - changefeedID := contextutil.ChangefeedIDFromCtx(ctx) log.Info("Starting pulsar DDL producer ...", zap.String("namespace", changefeedID.Namespace), zap.String("changefeed", changefeedID.ID)) @@ -102,11 +114,10 @@ func NewPulsarProducer( // newProducer creates a pulsar producer // One topic is used by one producer func newProducer( - pConfig *pulsarConfig.PulsarConfig, + pConfig *pulsarConfig.Config, client pulsar.Client, topicName string, ) (pulsar.Producer, error) { - po := pulsar.ProducerOptions{ Topic: topicName, } @@ -116,30 +127,12 @@ func newProducer( if pConfig.BatchingMaxPublishDelay > 0 { po.BatchingMaxPublishDelay = pConfig.BatchingMaxPublishDelay } - if len(pConfig.Compression) > 0 { - switch strings.ToLower(pConfig.Compression) { - case "lz4": - po.CompressionType = pulsar.LZ4 - case "zlib": - po.CompressionType = pulsar.ZLib - case "zstd": - po.CompressionType = pulsar.ZSTD - } + if pConfig.CompressionType > 0 { + po.CompressionType = pConfig.CompressionType + po.CompressionLevel = pulsar.Default } - if pConfig.SendTimeout > 0 { - po.SendTimeout = time.Millisecond * time.Duration(pConfig.SendTimeout) - } - - switch pConfig.ProducerMode { - case pulsarConfig.ProducerModeSingle: - // set default message key '0' - // if not set default value, will be random sent to multiple partitions - if len(pConfig.MessageKey) == 0 { - pConfig.MessageKey = "0" - } - case pulsarConfig.ProducerModeBatch, "": - // default ,message key is empty + po.SendTimeout = pConfig.SendTimeout } producer, err := client.CreateProducer(po) @@ -147,14 +140,14 @@ func newProducer( return nil, err } - log.L().Info("create pulsar producer successfully", - zap.String("topicName", topicName)) + log.L().Info("create pulsar producer success", + zap.String("topic:", topicName)) return producer, nil } +// GetProducerByTopic get producer by topicName func (p *pulsarProducers) GetProducerByTopic(topicName string) (producer pulsar.Producer, err error) { - p.producersMutex.RLock() producer, ok := p.producers[topicName] p.producersMutex.RUnlock() @@ -179,7 +172,6 @@ func (p *pulsarProducers) GetProducerByTopic(topicName string) (producer pulsar. // Close close all producers func (p *pulsarProducers) Close() { - for topic, producer := range p.producers { producer.Close() p.closeProducersMapByTopic(topic) @@ -206,22 +198,20 @@ func (p *pulsarProducers) Flush(ctx context.Context) error { case <-done: return nil } - } -func (p *pulsarProducers) closeProducersMapByTopic(topicName string) error { - +// closeProducersMapByTopic close producer by topicName +func (p *pulsarProducers) closeProducersMapByTopic(topicName string) { p.producersMutex.Lock() defer p.producersMutex.Unlock() - producer, _ := p.producers[topicName] - if producer == nil { - return nil + _, ok := p.producers[topicName] + if ok { + delete(p.producers, topicName) + return } - p.producers[topicName] = nil - - return nil } +// wrapperSchemaAndTopic wrapper schema and topic func (p *pulsarProducers) wrapperSchemaAndTopic(m *common.Message) { if m.Schema == nil { if m.Protocol == config.ProtocolMaxwell { @@ -244,11 +234,13 @@ func (p *pulsarProducers) wrapperSchemaAndTopic(m *common.Message) { } } +// maxwellMessage is the message format of maxwell type maxwellMessage struct { Database string `json:"database"` Table string `json:"table"` } +// str2Pointer returns the pointer of the string. func str2Pointer(str string) *string { return &str } diff --git a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer_test.go b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer_test.go index f1273889591..698c5b004af 100644 --- a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer_test.go +++ b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer_test.go @@ -1,117 +1,89 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package ddlproducer import ( "context" - "fmt" - "math/rand" - url2 "net/url" - "os" - "sync" "testing" - "time" - "github.com/apache/pulsar-client-go/pulsar" "github.com/pingcap/tiflow/cdc/model" - pulsarMetric "github.com/pingcap/tiflow/cdc/sink/metrics/mq/pulsar" + "github.com/pingcap/tiflow/pkg/leakutil" "github.com/pingcap/tiflow/pkg/sink/codec/common" pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" + "github.com/stretchr/testify/require" ) -func newPulsarConfig() *pulsarConfig.PulsarConfig { - sinkUrl := os.Getenv("sink-uri") - fmt.Println(sinkUrl) - if len(sinkUrl) <= 0 { - panic("sink-uri is empty: " + sinkUrl) - } - u := &url2.URL{} - u, err := u.Parse(sinkUrl) - if err != nil { - panic(err) - } - c := &pulsarConfig.PulsarConfig{} - err = c.Apply(u) - if err != nil { - panic(err) - } - return c -} - -func TestNewPulsarProducer(t *testing.T) { - config := newPulsarConfig() - client, err := pulsar.NewClient(pulsar.ClientOptions{ - URL: config.URL, - Authentication: pulsar.NewAuthenticationToken(config.AuthenticationToken), - }) - if err != nil { - t.Errorf("new client fail %+v", err) - return - } - - ctx, fc := context.WithTimeout(context.Background(), time.Second*5) +// TestPulsarSyncSendMessage is a integration test for pulsar producer +func TestPulsarSyncSendMessage(t *testing.T) { + leakutil.VerifyNone(t) type args struct { ctx context.Context - client pulsar.Client - pulsarConfig *pulsarConfig.PulsarConfig + topic string + partition int32 + message *common.Message + changefeedID model.ChangeFeedID + pulsarConfig *pulsarConfig.Config errCh chan error } tests := []struct { name string args args - want DDLProducer wantErr bool }{ { - name: "New", + name: "test SyncSendMessage", args: args{ - ctx: ctx, - client: client, - pulsarConfig: config, - errCh: make(chan error), + ctx: context.Background(), + topic: "test", + partition: 1, + changefeedID: model.ChangeFeedID{ID: "test", Namespace: "test_namespace"}, + message: &common.Message{ + Value: []byte("this value for test input data"), + PartitionKey: str2Pointer("test_key"), + }, + errCh: make(chan error), }, }, } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - producer, err := NewPulsarProducer(tt.args.ctx, tt.args.pulsarConfig, tt.args.client) - if err != nil { - t.Errorf("NewPulsarDMLProducer() error = %v, wantErr %v", err, tt.wantErr) - return - } - defer producer.Close() + p, err := NewMockPulsarProducer(tt.args.ctx, tt.args.changefeedID, + tt.args.pulsarConfig, nil) - select { - case e := <-tt.args.errCh: - t.Logf("errChan %+v", e) - case <-ctx.Done(): - fc() - t.Logf("Done") - } - }) - } -} + require.NoError(t, err) -func Test_pulsarProducers_SyncSendMessage(t *testing.T) { + err = p.SyncSendMessage(tt.args.ctx, tt.args.topic, + tt.args.partition, tt.args.message) + require.NoError(t, err) + require.Len(t, p.GetEvents(tt.args.topic), 1) + + p.Close() - config := newPulsarConfig() - client, err := pulsar.NewClient(pulsar.ClientOptions{ - URL: config.URL, - Authentication: pulsar.NewAuthenticationToken(config.AuthenticationToken), - }) - if err != nil { - t.Errorf("new client fail %+v", err) - return } +} - ctx, _ := context.WithTimeout(context.Background(), time.Second*5) +// TestPulsarSyncBroadcastMessage is a integration test for pulsar producer +func TestPulsarSyncBroadcastMessage(t *testing.T) { + // leakutil.VerifyNone(t) type args struct { ctx context.Context topic string partition int32 message *common.Message - client pulsar.Client - pulsarConfig *pulsarConfig.PulsarConfig + changefeedID model.ChangeFeedID + pulsarConfig *pulsarConfig.Config errCh chan error } tests := []struct { @@ -120,107 +92,32 @@ func Test_pulsarProducers_SyncSendMessage(t *testing.T) { wantErr bool }{ { - name: "New pulsar client and AsyncSendMessage", + name: "test SyncBroadcastMessage", args: args{ - ctx: ctx, - topic: "test", - partition: 1, + ctx: context.Background(), + topic: "test", + partition: 1, + changefeedID: model.ChangeFeedID{ID: "test", Namespace: "test_namespace"}, message: &common.Message{ - Key: []byte("key"), - Value: []byte("this value for test input data"), - Callback: func() { - fmt.Println("callback: message send success!") - }, + Value: []byte("this value for test input data"), + PartitionKey: str2Pointer("test_key"), }, - client: client, - pulsarConfig: config, - errCh: make(chan error), + errCh: make(chan error), }, }, } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - p, err := NewPulsarProducer(tt.args.ctx, tt.args.pulsarConfig, tt.args.client) - if err != nil { - t.Errorf("NewPulsarDMLProducer() error = %v, wantErr %v", err, tt.wantErr) - return - } - //defer p.Close() - if err := p.SyncSendMessage(tt.args.ctx, tt.args.topic, tt.args.partition, tt.args.message); err != nil { - t.Errorf("AsyncSendMessage() error = %v, wantErr %v", err, tt.wantErr) - } - time.Sleep(time.Second * 1) - p.Close() - client.Close() - }) - } + p, err := NewMockPulsarProducer(tt.args.ctx, tt.args.changefeedID, + tt.args.pulsarConfig, nil) -} + require.NoError(t, err) -func TestDDLIncreaseMetric(t *testing.T) { - rand.Seed(time.Now().UnixNano()) - index := 0 - m := &common.Message{ - Type: model.MessageTypeDDL, - } - - for { - x := rand.Int31() % 8 - time.Sleep(time.Second * time.Duration(x)) - if index%3 == 1 { - pulsarMetric.IncPublishedDDLEventCountMetricSuccess("testtopic", - "test", m) - } - pulsarMetric.IncPublishedDDLEventCountMetric("testtopic", - "test", m) - - pulsarMetric.IncPublishedDDLEventCountMetric("testtopic", - "test", m) + err = p.SyncSendMessage(tt.args.ctx, tt.args.topic, + tt.args.partition, tt.args.message) + require.NoError(t, err) + require.Len(t, p.GetEvents(tt.args.topic), 1) - pulsarMetric.IncPublishedDDLEventCountMetric("noTable", - "test", m) + p.Close() - pulsarMetric.IncPublishedDDLEventCountMetric("noSchema", - "test", m) - - index++ - } - -} - -func Test_pulsarProducers_closeProducersMapByTopic(t *testing.T) { - type fields struct { - client pulsar.Client - pConfig *pulsarConfig.PulsarConfig - defaultTopicName string - producers map[string]pulsar.Producer - producersMutex sync.RWMutex - changefeedID model.ChangeFeedID - } - type args struct { - topicName string - } - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - p := &pulsarProducers{ - client: tt.fields.client, - pConfig: tt.fields.pConfig, - defaultTopicName: tt.fields.defaultTopicName, - producers: tt.fields.producers, - producersMutex: tt.fields.producersMutex, - id: tt.fields.changefeedID, - } - if err := p.closeProducersMapByTopic(tt.args.topicName); (err != nil) != tt.wantErr { - t.Errorf("closeProducersMapByTopic() error = %v, wantErr %v", err, tt.wantErr) - } - }) } } diff --git a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go index 0442e6a8952..63034dff27d 100644 --- a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go +++ b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go @@ -1,36 +1,50 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package mq import ( "context" - "fmt" "net/url" "time" - "github.com/apache/pulsar-client-go/pulsar" "github.com/pingcap/errors" "github.com/pingcap/log" - "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer" - "github.com/pingcap/tiflow/cdc/sink/mq/dispatcher" - "github.com/pingcap/tiflow/cdc/sink/mq/manager" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager" + // pulsarMetric "github.com/pingcap/tiflow/cdc/sink/metrics/mq/pulsar" "github.com/pingcap/tiflow/cdc/sink/util" - pulsarMetric "github.com/pingcap/tiflow/cdc/sinkv/metrics/mq/pulsar" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/sink/codec/builder" pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" + tiflowutil "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) // NewPulsarDDLSink will verify the config and create a Pulsar DDL Sink. func NewPulsarDDLSink( ctx context.Context, + changefeedID model.ChangeFeedID, sinkURI *url.URL, replicaConfig *config.ReplicaConfig, + pulsarTopicManagerCreator manager.PulsarTopicManager, + clientCreator pulsarConfig.FactoryCreator, producerCreator ddlproducer.PulsarFactory, -) (_ *ddlSink, err error) { - - changefeedID := contextutil.ChangefeedIDFromCtx(ctx) +) (_ *DDLSink, err error) { + // changefeedID := contextutil.ChangefeedIDFromCtx(ctx) log.Info("Starting pulsar DDL producer ...", zap.String("namespace", changefeedID.Namespace), zap.String("changefeed", changefeedID.ID)) @@ -40,7 +54,7 @@ func NewPulsarDDLSink( return nil, errors.Trace(err) } - protocol, err := util.GetProtocol(replicaConfig.Sink.Protocol) + protocol, err := util.GetProtocol(tiflowutil.GetOrZero(replicaConfig.Sink.Protocol)) if err != nil { return nil, errors.Trace(err) } @@ -52,14 +66,14 @@ func NewPulsarDDLSink( log.Info("Try to create a DDL sink producer", zap.Any("pulsarConfig", pConfig)) - client, err := createPulsarClient(pConfig, changefeedID) + start := time.Now() + client, err := clientCreator(pConfig, changefeedID) if err != nil { log.Error("DDL sink producer client create fail", zap.Error(err)) return nil, cerror.WrapError(cerror.ErrPulsarNewClient, err) } - start := time.Now() - p, err := producerCreator(ctx, pConfig, client) + p, err := producerCreator(ctx, changefeedID, pConfig, client) log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start))) if err != nil { return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err) @@ -71,72 +85,28 @@ func NewPulsarDDLSink( return nil, errors.Trace(err) } - encoderConfig, err := util.GetEncoderConfig(sinkURI, protocol, replicaConfig, - pConfig.MaxMessageBytes) + encoderConfig, err := util.GetEncoderConfig(sinkURI, protocol, replicaConfig, pConfig.MaxMessageBytes) if err != nil { return nil, errors.Trace(err) } - topicManager, err := manager.NewPulsarTopicManager(pConfig, client) + encoderBuilder, err := builder.NewRowEventEncoderBuilder(ctx, changefeedID, encoderConfig) if err != nil { - return nil, errors.Trace(err) + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) } - s, err := newDDLSink(ctx, p, topicManager, eventRouter, encoderConfig) + topicManager, err := pulsarTopicManagerCreator(pConfig, client) if err != nil { return nil, errors.Trace(err) } - return s, nil -} - -func createPulsarClient(config *pulsarConfig.PulsarConfig, changefeedID model.ChangeFeedID) (pulsar.Client, error) { - - op := pulsar.ClientOptions{ - URL: config.URL, - MetricsRegisterer: pulsarMetric.GetMetricRegistry(), - CustomMetricsLabels: map[string]string{ - "changefeed": changefeedID.ID, - "namespace": changefeedID.Namespace, - }, - } + // s, err := newDDLSink(ctx, changefeedID, p, topicManager, eventRouter, encoderConfig) + s := newDDLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderBuilder, protocol) - if len(config.AuthenticationToken) > 0 { - op.Authentication = pulsar.NewAuthenticationToken(config.AuthenticationToken) - } else if len(config.TokenFromFile) > 0 { - op.Authentication = pulsar.NewAuthenticationTokenFromFile(config.TokenFromFile) - } else if len(config.BasicUserName) > 0 && len(config.BasicPassword) > 0 { - authentication, err := pulsar.NewAuthenticationBasic(config.BasicUserName, config.BasicPassword) - if err != nil { - return nil, err - } - op.Authentication = authentication - } - - if config.ConnectionTimeout > 0 { - op.ConnectionTimeout = config.ConnectionTimeout - } - if config.OperationTimeout > 0 { - op.OperationTimeout = config.OperationTimeout - } - - pulsarClient, err := pulsar.NewClient(op) - if err != nil { - log.L().Error("Cannot connect to pulsar", zap.Error(err)) - return nil, err - } - return pulsarClient, nil + return s, nil } -func setupAuthentication(config *pulsarConfig.PulsarConfig) (pulsar.Authentication, error) { - if len(config.AuthenticationToken) > 0 { - return pulsar.NewAuthenticationToken(config.AuthenticationToken) - } else if len(config.TokenFromFile) > 0 { - return pulsar.NewAuthenticationTokenFromFile(config.TokenFromFile) - } else if len(config.BasicUserName) > 0 && len(config.BasicPassword) > 0 { - return pulsar.NewAuthenticationBasic(config.BasicUserName, config.BasicPassword) - } else if len(config.OAuth2) > 0 { - return pulsar.NewAuthenticationBasic(config.BasicUserName, config.BasicPassword) - } - return nil, fmt.Errorf("no authentication method found") +// str2Pointer returns the pointer of the string. +func str2Pointer(str string) *string { + return &str } diff --git a/cdc/sink/ddlsink/mq/pulsar_ddl_sink_test.go b/cdc/sink/ddlsink/mq/pulsar_ddl_sink_test.go index ceec971cc3c..515bfcd0163 100644 --- a/cdc/sink/ddlsink/mq/pulsar_ddl_sink_test.go +++ b/cdc/sink/ddlsink/mq/pulsar_ddl_sink_test.go @@ -1,48 +1,160 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package mq import ( "context" - "github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer" - "github.com/pingcap/tiflow/pkg/config" "net/url" "testing" + + mm "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager" + "github.com/pingcap/tiflow/pkg/config" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" + "github.com/stretchr/testify/require" ) +const ( + // MockPulsarTopic is the mock topic for pulsar + MockPulsarTopic = "pulsar_test" +) + +// newPulsarConfig set config +func newPulsarConfig(t *testing.T) (*pulsarConfig.Config, *url.URL) { + sinkURL := "pulsar://127.0.0.1:6650/persistent://public/default/test?" + + "protocol=canal-json&pulsar-version=v2.10.0&enable-tidb-extension=true&" + + "authentication-token=eyJhbcGcixxxxxxxxxxxxxx" + + sinkURI, err := url.Parse(sinkURL) + require.NoError(t, err) + replicaConfig := config.GetDefaultReplicaConfig() + require.NoError(t, replicaConfig.ValidateAndAdjust(sinkURI)) + require.NoError(t, err) + c, err := pulsarConfig.NewPulsarConfig(sinkURI) + require.NoError(t, err) + return c, sinkURI +} + +// TestNewPulsarDDLSink tests the NewPulsarDDLSink func TestNewPulsarDDLSink(t *testing.T) { - type args struct { - ctx context.Context - sinkURI *url.URL - replicaConfig *config.ReplicaConfig - producerCreator ddlproducer.Factory + t.Parallel() + + cfg, sinkURI := newPulsarConfig(t) + changefeedID := model.DefaultChangeFeedID("test") + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink = &config.SinkConfig{ + Protocol: str2Pointer(cfg.Protocol.String()), } - tests := []struct { - name string - args args - want *ddlSink - wantErr bool - }{ + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctx = context.WithValue(ctx, "testing.T", t) + ddlSink, err := NewPulsarDDLSink(ctx, changefeedID, sinkURI, replicaConfig, + manager.NewMockPulsarTopicManager, pulsarConfig.NewMockCreatorFactory, ddlproducer.NewMockPulsarProducerDDL) + + require.NoError(t, err) + require.NotNil(t, ddlSink) + + checkpointTs := uint64(417318403368288260) + tables := []*model.TableInfo{ + { + TableName: model.TableName{ + Schema: "cdc", + Table: "person", + }, + }, + { + TableName: model.TableName{ + Schema: "cdc", + Table: "person1", + }, + }, { - name: "test normal new pulsar ddl sink", - args: args{ - ctx: context.Background(), - sinkURI: &url.URL{}, - replicaConfig: &config.ReplicaConfig{}, - producerCreator: nil, + TableName: model.TableName{ + Schema: "cdc", + Table: "person2", }, - want: &ddlSink{}, - wantErr: false, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := NewPulsarDDLSink(tt.args.ctx, tt.args.sinkURI, - tt.args.replicaConfig, ddlproducer.NewPulsarProducer) - if err != nil { - t.Errorf("NewPulsarDDLSink() error = %v, wantErr %v", err, tt.wantErr) - return - } - t.Logf("got sink = %+v", got) - - }) + + err = ddlSink.WriteCheckpointTs(ctx, checkpointTs, tables) + require.NoError(t, err) + + events := ddlSink.producer.(*ddlproducer.PulsarMockProducers).GetAllEvents() + require.Len(t, events, 1, "All topics and partitions should be broadcast") +} + +// TestPulsarDDLSinkNewSuccess tests the NewPulsarDDLSink write a event to pulsar +func TestPulsarDDLSinkNewSuccess(t *testing.T) { + t.Parallel() + + cfg, sinkURI := newPulsarConfig(t) + changefeedID := model.DefaultChangeFeedID("test") + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink = &config.SinkConfig{ + Protocol: str2Pointer(cfg.Protocol.String()), } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctx = context.WithValue(ctx, "testing.T", t) + s, err := NewPulsarDDLSink(ctx, changefeedID, sinkURI, replicaConfig, manager.NewMockPulsarTopicManager, + pulsarConfig.NewMockCreatorFactory, ddlproducer.NewMockPulsarProducerDDL) + require.NoError(t, err) + require.NotNil(t, s) +} + +func TestPulsarWriteDDLEventToZeroPartition(t *testing.T) { + t.Parallel() + + cfg, sinkURI := newPulsarConfig(t) + changefeedID := model.DefaultChangeFeedID("test") + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink = &config.SinkConfig{ + Protocol: str2Pointer(cfg.Protocol.String()), + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctx = context.WithValue(ctx, "testing.T", t) + ddlSink, err := NewPulsarDDLSink(ctx, changefeedID, sinkURI, replicaConfig, + manager.NewMockPulsarTopicManager, pulsarConfig.NewMockCreatorFactory, ddlproducer.NewMockPulsarProducerDDL) + + require.NoError(t, err) + require.NotNil(t, ddlSink) + + ddl := &model.DDLEvent{ + CommitTs: 417318403368288260, + TableInfo: &model.TableInfo{ + TableName: model.TableName{ + Schema: "cdc", Table: "person", + }, + }, + Query: "create table person(id int, name varchar(32), primary key(id))", + Type: mm.ActionCreateTable, + } + err = ddlSink.WriteDDLEvent(ctx, ddl) + require.NoError(t, err) + + err = ddlSink.WriteDDLEvent(ctx, ddl) + require.NoError(t, err) + + require.Len(t, ddlSink.producer.(*ddlproducer.PulsarMockProducers).GetAllEvents(), + 2, "Write DDL 2 Events") } diff --git a/cdc/sink/dmlsink/mq/manager/pulsar_manager.go b/cdc/sink/dmlsink/mq/manager/pulsar_manager.go new file mode 100644 index 00000000000..450f9d999a7 --- /dev/null +++ b/cdc/sink/dmlsink/mq/manager/pulsar_manager.go @@ -0,0 +1,99 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package manager + +import ( + "context" + "sync" + "time" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/pingcap/log" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" + "go.uber.org/zap" +) + +type partition struct { + partitions []string + since time.Time +} + +// PulsarTopicManager is a manager for pulsar topics. +type PulsarTopicManager func( + cfg *pulsarConfig.Config, + client pulsar.Client, +) (TopicManager, error) + +// pulsarTopicManager is a manager for pulsar topics. +type pulsarTopicManager struct { + client pulsar.Client + partitions sync.Map // key : topic, value : partition-name + cfg *pulsarConfig.Config +} + +// NewPulsarTopicManager creates a new topic manager. +func NewPulsarTopicManager( + cfg *pulsarConfig.Config, + client pulsar.Client, +) (TopicManager, error) { + mgr := &pulsarTopicManager{ + client: client, + cfg: cfg, + partitions: sync.Map{}, + } + + return mgr, nil +} + +// GetPartitionNum spend more time,but no use. +func (m *pulsarTopicManager) GetPartitionNum(ctx context.Context, topic string) (int32, error) { + if v, ok := m.partitions.Load(topic); ok { + pt, ok := v.(*partition) + if ok { + if time.Since(pt.since) > time.Minute { + m.partitions.Delete(topic) + } + return int32(len(pt.partitions)), nil + } + } + + partitions, err := m.client.TopicPartitions(topic) + if err != nil { + log.L().Error("pulsar GetPartitions fail", zap.Error(err)) + return 0, err + } + log.L().Debug("pulsar GetPartitions", zap.Strings("partitions", partitions)) + pt := &partition{ + partitions: partitions, + since: time.Now(), + } + m.partitions.Store(topic, pt) + + return int32(len(pt.partitions)), nil +} + +// CreateTopicAndWaitUntilVisible no need to create first +func (m *pulsarTopicManager) CreateTopicAndWaitUntilVisible(ctx context.Context, topicName string) (int32, error) { + return 0, nil +} + +// Close +func (m *pulsarTopicManager) Close() { + +} + +// str2Pointer returns the pointer of the string. +func str2Pointer(str string) *string { + return &str +} diff --git a/cdc/sink/dmlsink/mq/manager/pulsar_manager_mock.go b/cdc/sink/dmlsink/mq/manager/pulsar_manager_mock.go new file mode 100644 index 00000000000..12991941b0d --- /dev/null +++ b/cdc/sink/dmlsink/mq/manager/pulsar_manager_mock.go @@ -0,0 +1,39 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package manager + +import ( + "context" + "github.com/apache/pulsar-client-go/pulsar" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" +) + +// pulsarTopicManager is a manager for pulsar topics. +type pulsarTopicManagerMock struct { + *pulsarTopicManager +} + +// NewMockPulsarTopicManager creates a new topic manager. +func NewMockPulsarTopicManager( + cfg *pulsarConfig.Config, + client pulsar.Client, +) (TopicManager, error) { + mgr := &pulsarTopicManagerMock{} + return mgr, nil +} + +// GetPartitionNum spend more time,but no use. +func (m *pulsarTopicManagerMock) GetPartitionNum(ctx context.Context, topic string) (int32, error) { + return 3, nil +} diff --git a/cdc/sink/dmlsink/mq/manager/pulsar_manager_test.go b/cdc/sink/dmlsink/mq/manager/pulsar_manager_test.go new file mode 100644 index 00000000000..b9364a4c10e --- /dev/null +++ b/cdc/sink/dmlsink/mq/manager/pulsar_manager_test.go @@ -0,0 +1,61 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package manager + +import ( + "context" + "net/url" + "testing" + + "github.com/pingcap/tiflow/pkg/config" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" + "github.com/stretchr/testify/require" +) + +func newPulsarConfig(t *testing.T) (*pulsarConfig.Config, *url.URL) { + sinkURL := "pulsar://127.0.0.1:6650/persistent://public/default/test?" + + "protocol=canal-json&pulsar-version=v2.10.0&enable-tidb-extension=true&" + + "authentication-token=eyJhbGciOiJSUzIxxxxxxxxxxxxxxxx" + + sinkURI, err := url.Parse(sinkURL) + require.NoError(t, err) + replicaConfig := config.GetDefaultReplicaConfig() + require.NoError(t, replicaConfig.ValidateAndAdjust(sinkURI)) + require.NoError(t, err) + c, err := pulsarConfig.NewPulsarConfig(sinkURI) + require.NoError(t, err) + return c, sinkURI +} + +func TestGetPartitionNumMock(t *testing.T) { + t.Parallel() + + cfg, _ := newPulsarConfig(t) + + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink = &config.SinkConfig{ + Protocol: str2Pointer(cfg.Protocol.String()), + } + + ctx := context.Background() + + ctx = context.WithValue(ctx, "testing.T", t) + pm, err := NewMockPulsarTopicManager(cfg, nil) + require.NoError(t, err) + require.NotNil(t, pm) + + pn, err := pm.GetPartitionNum(ctx, "test") + require.NoError(t, err) + require.Equal(t, int32(3), pn) +} diff --git a/pkg/leakutil/leak_helper.go b/pkg/leakutil/leak_helper.go index d6844d3f90f..4984a68431b 100644 --- a/pkg/leakutil/leak_helper.go +++ b/pkg/leakutil/leak_helper.go @@ -32,6 +32,10 @@ var defaultOpts = []goleak.Option{ goleak.IgnoreTopFunction("github.com/Shopify/sarama.(*client).backgroundMetadataUpdater"), goleak.IgnoreTopFunction("github.com/Shopify/sarama.(*Broker).responseReceiver"), goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), + + // pulsar client will create a goroutine to handle the connection, but it will not exit when the connection is closed. + // it doesn't work for pulsar client v0.11.0 + // goleak.IgnoreTopFunction("github.com/godbus/dbus.(*Conn).Auth"), } // VerifyNone verifies that no unexpected leaks occur diff --git a/pkg/sink/codec/common/message.go b/pkg/sink/codec/common/message.go index 7d85fa12cf4..003230d5524 100644 --- a/pkg/sink/codec/common/message.go +++ b/pkg/sink/codec/common/message.go @@ -49,6 +49,9 @@ type Message struct { ClaimCheckFileName string Event *model.RowChangedEvent + + // PartitionKey for pulsar, route messages to one or different partitions + PartitionKey *string } // Length returns the expected size of the Kafka message @@ -78,6 +81,30 @@ func (m *Message) IncRowsCount() { m.rowsCount++ } +// GetSchema returns schema string +func (m *Message) GetSchema() string { + if m.Schema == nil { + return "" + } + return *m.Schema +} + +// GetTable returns the Table string +func (m *Message) GetTable() string { + if m.Table == nil { + return "" + } + return *m.Table +} + +// GetPartitionKey returns the GetPartitionKey +func (m *Message) GetPartitionKey() string { + if m.PartitionKey == nil { + return "" + } + return *m.PartitionKey +} + // NewDDLMsg creates a DDL message. func NewDDLMsg(proto config.Protocol, key, value []byte, event *model.DDLEvent) *Message { return NewMsg( diff --git a/pkg/sink/pulsar/factory.go b/pkg/sink/pulsar/factory.go new file mode 100644 index 00000000000..568840d0c37 --- /dev/null +++ b/pkg/sink/pulsar/factory.go @@ -0,0 +1,69 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pulsar + +import ( + "fmt" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "go.uber.org/zap" +) + +// FactoryCreator defines the type of factory creator. +type FactoryCreator func(config *Config, changefeedID model.ChangeFeedID) (pulsar.Client, error) + +// NewCreatorFactory returns a factory implemented based on kafka-go +func NewCreatorFactory(config *Config, changefeedID model.ChangeFeedID) (pulsar.Client, error) { + co := pulsar.ClientOptions{ + URL: config.URL, + // MetricsRegisterer: pulsarMetric.GetMetricRegistry(), + CustomMetricsLabels: map[string]string{ + "changefeed": changefeedID.ID, + "namespace": changefeedID.Namespace, + }, + ConnectionTimeout: config.ConnectionTimeout, + OperationTimeout: config.OperationTimeout, + } + var err error + + co.Authentication, err = setupAuthentication(config) + if err != nil { + log.L().Error("setup pulsar authentication fail", zap.Error(err)) + return nil, err + } + + pulsarClient, err := pulsar.NewClient(co) + if err != nil { + log.L().Error("cannot connect to pulsar", zap.Error(err)) + return nil, err + } + return pulsarClient, nil +} + +func setupAuthentication(config *Config) (pulsar.Authentication, error) { + if len(config.AuthenticationToken) > 0 { + return pulsar.NewAuthenticationToken(config.AuthenticationToken), nil + } else if len(config.TokenFromFile) > 0 { + return pulsar.NewAuthenticationTokenFromFile(config.TokenFromFile), nil + } else if len(config.BasicUserName) > 0 && len(config.BasicPassword) > 0 { + return pulsar.NewAuthenticationBasic(config.BasicUserName, config.BasicPassword) + } else if len(config.OAuth2) >= 5 { + return pulsar.NewAuthenticationOAuth2(config.OAuth2), nil + } else if len(config.TLSCertificatePath) > 0 && len(config.TLSPrivateKeyPath) > 0 { + return pulsar.NewAuthenticationTLS(config.TLSCertificatePath, config.TLSPrivateKeyPath), nil + } + return nil, fmt.Errorf("no authentication method found") +} diff --git a/pkg/sink/pulsar/factory_mock.go b/pkg/sink/pulsar/factory_mock.go new file mode 100644 index 00000000000..d17da44d0cf --- /dev/null +++ b/pkg/sink/pulsar/factory_mock.go @@ -0,0 +1,27 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pulsar + +import ( + "github.com/apache/pulsar-client-go/pulsar" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "go.uber.org/zap" +) + +// NewMockCreatorFactory returns a factory implemented based on kafka-go +func NewMockCreatorFactory(config *Config, changefeedID model.ChangeFeedID) (pulsar.Client, error) { + log.L().Info("mock pulsar client factory created", zap.Any("changfeedID", changefeedID)) + return nil, nil +} diff --git a/pkg/sink/pulsar/factory_test.go b/pkg/sink/pulsar/factory_test.go new file mode 100644 index 00000000000..84034e986cd --- /dev/null +++ b/pkg/sink/pulsar/factory_test.go @@ -0,0 +1,27 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pulsar + +import ( + "testing" + + "github.com/pingcap/tiflow/cdc/model" + "github.com/stretchr/testify/require" +) + +func TestNewCreatorFactory(t *testing.T) { + client, err := NewMockCreatorFactory(nil, model.DefaultChangeFeedID("test")) + require.NoError(t, err) + require.Nil(t, client) +} From a17c8fea952ecc2518f0dd97b22805fe91d8898a Mon Sep 17 00:00:00 2001 From: yumchina Date: Thu, 27 Jul 2023 11:31:38 +0800 Subject: [PATCH 04/11] fix notes --- cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go | 1 + cdc/sink/dmlsink/mq/manager/pulsar_manager.go | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go index d9cccccc27d..fbdd888a7dc 100644 --- a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go +++ b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go @@ -47,6 +47,7 @@ func (p *pulsarProducers) SyncBroadcastMessage(ctx context.Context, topic string totalPartitionsNum int32, message *common.Message, ) error { // call SyncSendMessage + // pulsar consumer all partitions return p.SyncSendMessage(ctx, topic, totalPartitionsNum, message) } diff --git a/cdc/sink/dmlsink/mq/manager/pulsar_manager.go b/cdc/sink/dmlsink/mq/manager/pulsar_manager.go index 450f9d999a7..8da156df50c 100644 --- a/cdc/sink/dmlsink/mq/manager/pulsar_manager.go +++ b/cdc/sink/dmlsink/mq/manager/pulsar_manager.go @@ -90,7 +90,6 @@ func (m *pulsarTopicManager) CreateTopicAndWaitUntilVisible(ctx context.Context, // Close func (m *pulsarTopicManager) Close() { - } // str2Pointer returns the pointer of the string. From 4280181bd92fbe85b5073a91794676fce90ee7c1 Mon Sep 17 00:00:00 2001 From: yumchina Date: Tue, 1 Aug 2023 10:43:57 +0800 Subject: [PATCH 05/11] remove factory_mock.go factory_test.go remove Flush() from pulsar_ddl_producer.go --- .../mq/ddlproducer/pulsar_ddl_producer.go | 21 --------------- pkg/sink/pulsar/factory.go | 8 +++++- pkg/sink/pulsar/factory_mock.go | 27 ------------------- pkg/sink/pulsar/factory_test.go | 27 ------------------- 4 files changed, 7 insertions(+), 76 deletions(-) delete mode 100644 pkg/sink/pulsar/factory_mock.go delete mode 100644 pkg/sink/pulsar/factory_test.go diff --git a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go index fbdd888a7dc..cf13a037349 100644 --- a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go +++ b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go @@ -180,27 +180,6 @@ func (p *pulsarProducers) Close() { p.client.Close() } -// Flush waits for all the messages in the async producer to be sent to Pulsar. -// Notice: this method is not thread-safe. -// Do not try to call AsyncSendMessage and Flush functions in different threads, -// otherwise Flush will not work as expected. It may never finish or flush the wrong message. -// Because inflight will be modified by mistake. -func (p *pulsarProducers) Flush(ctx context.Context) error { - done := make(chan struct{}, 1) - p.producersMutex.Lock() - for _, pd := range p.producers { - pd.Flush() - } - p.producersMutex.Unlock() - done <- struct{}{} - select { - case <-ctx.Done(): - return ctx.Err() - case <-done: - return nil - } -} - // closeProducersMapByTopic close producer by topicName func (p *pulsarProducers) closeProducersMapByTopic(topicName string) { p.producersMutex.Lock() diff --git a/pkg/sink/pulsar/factory.go b/pkg/sink/pulsar/factory.go index 568840d0c37..bfdc326b443 100644 --- a/pkg/sink/pulsar/factory.go +++ b/pkg/sink/pulsar/factory.go @@ -29,7 +29,6 @@ type FactoryCreator func(config *Config, changefeedID model.ChangeFeedID) (pulsa func NewCreatorFactory(config *Config, changefeedID model.ChangeFeedID) (pulsar.Client, error) { co := pulsar.ClientOptions{ URL: config.URL, - // MetricsRegisterer: pulsarMetric.GetMetricRegistry(), CustomMetricsLabels: map[string]string{ "changefeed": changefeedID.ID, "namespace": changefeedID.Namespace, @@ -53,6 +52,7 @@ func NewCreatorFactory(config *Config, changefeedID model.ChangeFeedID) (pulsar. return pulsarClient, nil } +// setupAuthentication sets up authentication for pulsar client func setupAuthentication(config *Config) (pulsar.Authentication, error) { if len(config.AuthenticationToken) > 0 { return pulsar.NewAuthenticationToken(config.AuthenticationToken), nil @@ -67,3 +67,9 @@ func setupAuthentication(config *Config) (pulsar.Authentication, error) { } return nil, fmt.Errorf("no authentication method found") } + +// NewMockCreatorFactory returns a factory implemented based on kafka-go +func NewMockCreatorFactory(config *Config, changefeedID model.ChangeFeedID) (pulsar.Client, error) { + log.L().Info("mock pulsar client factory created", zap.Any("changfeedID", changefeedID)) + return nil, nil +} diff --git a/pkg/sink/pulsar/factory_mock.go b/pkg/sink/pulsar/factory_mock.go deleted file mode 100644 index d17da44d0cf..00000000000 --- a/pkg/sink/pulsar/factory_mock.go +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package pulsar - -import ( - "github.com/apache/pulsar-client-go/pulsar" - "github.com/pingcap/log" - "github.com/pingcap/tiflow/cdc/model" - "go.uber.org/zap" -) - -// NewMockCreatorFactory returns a factory implemented based on kafka-go -func NewMockCreatorFactory(config *Config, changefeedID model.ChangeFeedID) (pulsar.Client, error) { - log.L().Info("mock pulsar client factory created", zap.Any("changfeedID", changefeedID)) - return nil, nil -} diff --git a/pkg/sink/pulsar/factory_test.go b/pkg/sink/pulsar/factory_test.go deleted file mode 100644 index 84034e986cd..00000000000 --- a/pkg/sink/pulsar/factory_test.go +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package pulsar - -import ( - "testing" - - "github.com/pingcap/tiflow/cdc/model" - "github.com/stretchr/testify/require" -) - -func TestNewCreatorFactory(t *testing.T) { - client, err := NewMockCreatorFactory(nil, model.DefaultChangeFeedID("test")) - require.NoError(t, err) - require.Nil(t, client) -} From 6e4d1b2d5ea06c01abea4c78d1d59f0464ea36ce Mon Sep 17 00:00:00 2001 From: yumchina Date: Wed, 2 Aug 2023 17:33:11 +0800 Subject: [PATCH 06/11] replace log.L() remove code comment fix bugs by gch4236 --- cdc/sink/ddlsink/factory/factory.go | 2 +- .../ddlproducer/pulsar_ddl_mock_producer.go | 3 +- .../mq/ddlproducer/pulsar_ddl_producer.go | 13 +++------ cdc/sink/ddlsink/mq/pulsar_ddl_sink.go | 29 +++++++++---------- pkg/config/sink.go | 3 ++ pkg/leakutil/leak_helper.go | 4 --- pkg/sink/pulsar/config.go | 25 +++++++++++----- pkg/sink/pulsar/factory.go | 8 ++--- pkg/sink/sink_type.go | 2 ++ 9 files changed, 44 insertions(+), 45 deletions(-) diff --git a/cdc/sink/ddlsink/factory/factory.go b/cdc/sink/ddlsink/factory/factory.go index 5d9c767c6a5..1a8107c1bf1 100644 --- a/cdc/sink/ddlsink/factory/factory.go +++ b/cdc/sink/ddlsink/factory/factory.go @@ -61,7 +61,7 @@ func New( return mysql.NewDDLSink(ctx, changefeedID, sinkURI, cfg) case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme: return cloudstorage.NewDDLSink(ctx, changefeedID, sinkURI, cfg) - case sink.PulsarScheme: + case sink.PulsarScheme, sink.PulsarSSLScheme: return mq.NewPulsarDDLSink(ctx, changefeedID, sinkURI, cfg, manager.NewPulsarTopicManager, pulsarConfig.NewCreatorFactory, ddlproducer.NewPulsarProducer) default: diff --git a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go index 23df797cb19..176e2fa8212 100644 --- a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go +++ b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go @@ -37,7 +37,7 @@ func (p *PulsarMockProducers) SyncBroadcastMessage(ctx context.Context, topic st ) error { // call SyncSendMessage - log.L().Info("pulsarProducers SyncBroadcastMessage in") + log.Info("pulsarProducers SyncBroadcastMessage in") return p.SyncSendMessage(ctx, topic, totalPartitionsNum, message) } @@ -50,7 +50,6 @@ func (p *PulsarMockProducers) SyncSendMessage(ctx context.Context, topic string, Payload: message.Value, Key: message.GetPartitionKey(), } - // fmt.Println("pulsarProducers SyncSendMessage", data) p.events[topic] = append(p.events[topic], data) return nil diff --git a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go index cf13a037349..ffff48e18a3 100644 --- a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go +++ b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go @@ -21,7 +21,6 @@ import ( "github.com/apache/pulsar-client-go/pulsar" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" - // pulsarMetric "github.com/pingcap/tiflow/cdc/sink/metrics/mq/pulsar" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink/codec/common" pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" @@ -58,11 +57,9 @@ func (p *pulsarProducers) SyncSendMessage(ctx context.Context, topic string, ) error { p.wrapperSchemaAndTopic(message) - // pulsarMetric.IncPublishedDDLEventCountMetric(topic, p.id.ID, message) producer, err := p.GetProducerByTopic(topic) if err != nil { - log.L().Error("ddl SyncSendMessage GetProducerByTopic fail", zap.Error(err)) - // pulsarMetric.IncPublishedDDLEventCountMetricFail(topic, p.id.ID, message) + log.Error("ddl SyncSendMessage GetProducerByTopic fail", zap.Error(err)) return err } @@ -72,13 +69,11 @@ func (p *pulsarProducers) SyncSendMessage(ctx context.Context, topic string, } mID, err := producer.Send(ctx, data) if err != nil { - log.L().Error("ddl producer send fail", zap.Error(err)) - // pulsarMetric.IncPublishedDDLEventCountMetricFail(producer.Topic(), p.id.ID, message) + log.Error("ddl producer send fail", zap.Error(err)) return err } - // pulsarMetric.IncPublishedDDLEventCountMetricSuccess(producer.Topic(), p.id.ID, message) - log.L().Debug("pulsarProducers SyncSendMessage success", + log.Debug("pulsarProducers SyncSendMessage success", zap.Any("mID", mID), zap.String("topic", topic)) return nil @@ -141,7 +136,7 @@ func newProducer( return nil, err } - log.L().Info("create pulsar producer success", + log.Info("create pulsar producer success", zap.String("topic:", topicName)) return producer, nil diff --git a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go index 63034dff27d..a2762bcf253 100644 --- a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go +++ b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer" "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher" "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager" - // pulsarMetric "github.com/pingcap/tiflow/cdc/sink/metrics/mq/pulsar" "github.com/pingcap/tiflow/cdc/sink/util" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -44,7 +43,6 @@ func NewPulsarDDLSink( clientCreator pulsarConfig.FactoryCreator, producerCreator ddlproducer.PulsarFactory, ) (_ *DDLSink, err error) { - // changefeedID := contextutil.ChangefeedIDFromCtx(ctx) log.Info("Starting pulsar DDL producer ...", zap.String("namespace", changefeedID.Namespace), zap.String("changefeed", changefeedID.ID)) @@ -66,19 +64,6 @@ func NewPulsarDDLSink( log.Info("Try to create a DDL sink producer", zap.Any("pulsarConfig", pConfig)) - start := time.Now() - client, err := clientCreator(pConfig, changefeedID) - if err != nil { - log.Error("DDL sink producer client create fail", zap.Error(err)) - return nil, cerror.WrapError(cerror.ErrPulsarNewClient, err) - } - - p, err := producerCreator(ctx, changefeedID, pConfig, client) - log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start))) - if err != nil { - return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err) - } - // NewEventRouter eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic) if err != nil { @@ -95,12 +80,24 @@ func NewPulsarDDLSink( return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) } + start := time.Now() + client, err := clientCreator(pConfig, changefeedID) + if err != nil { + log.Error("DDL sink producer client create fail", zap.Error(err)) + return nil, cerror.WrapError(cerror.ErrPulsarNewClient, err) + } + + p, err := producerCreator(ctx, changefeedID, pConfig, client) + log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start))) + if err != nil { + return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err) + } + topicManager, err := pulsarTopicManagerCreator(pConfig, client) if err != nil { return nil, errors.Trace(err) } - // s, err := newDDLSink(ctx, changefeedID, p, topicManager, eventRouter, encoderConfig) s := newDDLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderBuilder, protocol) return s, nil diff --git a/pkg/config/sink.go b/pkg/config/sink.go index ced67c78e51..c88e6aea6af 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -326,6 +326,9 @@ type KafkaConfig struct { LargeMessageHandle *LargeMessageHandleConfig `toml:"large-message-handle" json:"large-message-handle,omitempty"` } +type PulsarConfig struct { +} + // MySQLConfig represents a MySQL sink configuration type MySQLConfig struct { WorkerCount *int `toml:"worker-count" json:"worker-count,omitempty"` diff --git a/pkg/leakutil/leak_helper.go b/pkg/leakutil/leak_helper.go index 4984a68431b..d6844d3f90f 100644 --- a/pkg/leakutil/leak_helper.go +++ b/pkg/leakutil/leak_helper.go @@ -32,10 +32,6 @@ var defaultOpts = []goleak.Option{ goleak.IgnoreTopFunction("github.com/Shopify/sarama.(*client).backgroundMetadataUpdater"), goleak.IgnoreTopFunction("github.com/Shopify/sarama.(*Broker).responseReceiver"), goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), - - // pulsar client will create a goroutine to handle the connection, but it will not exit when the connection is closed. - // it doesn't work for pulsar client v0.11.0 - // goleak.IgnoreTopFunction("github.com/godbus/dbus.(*Conn).Auth"), } // VerifyNone verifies that no unexpected leaks occur diff --git a/pkg/sink/pulsar/config.go b/pkg/sink/pulsar/config.go index 0ee24a23609..17e9a628c1d 100644 --- a/pkg/sink/pulsar/config.go +++ b/pkg/sink/pulsar/config.go @@ -15,6 +15,7 @@ package pulsar import ( "fmt" + "github.com/apache/pulsar-client-go/pulsar/auth" "net/url" "strconv" "strings" @@ -86,6 +87,12 @@ const ( OAuth2PrivateKey = "oauth2-private-key" // OAuth2ClientID the client ID of the application. OAuth2ClientID = "oauth2-client-id" + // OAuth2Type the type of the OAuth2 . + OAuth2Type = "oauth2-type" + // OAuth2TypeClientCredentials client_credentials + OAuth2TypeClientCredentials = "oauth2-client-credentials" + // OAuth2Scope scope + OAuth2Scope = "auth2-scope" ) // sink config default Value @@ -202,28 +209,30 @@ func (c *Config) checkSinkURI(sinkURI *url.URL) error { func (c *Config) applyOAuth(params url.Values) { // Go client use Oauth2 authentication // https://pulsar.apache.org/docs/2.10.x/security-oauth2/#authentication-types + // pulsar client now support type as client_credentials only s := params.Get(OAuth2IssuerURL) if len(s) > 0 { - c.OAuth2["issuerUrl"] = s + c.OAuth2[auth.ConfigParamIssuerURL] = s } - s = params.Get(OAuth2Audience) if len(s) > 0 { - c.OAuth2["audience"] = s + c.OAuth2[auth.ConfigParamAudience] = s + } + s = params.Get(OAuth2Scope) + if len(s) > 0 { + c.OAuth2[auth.ConfigParamScope] = s } - s = params.Get(OAuth2PrivateKey) if len(s) > 0 { - c.OAuth2["privateKey"] = s + c.OAuth2[auth.ConfigParamKeyFile] = s } - s = params.Get(OAuth2ClientID) if len(s) > 0 { - c.OAuth2["clientId"] = s + c.OAuth2[auth.ConfigParamClientID] = s } if len(c.OAuth2) >= 4 { - c.OAuth2["type"] = "client_credentials" + c.OAuth2[auth.ConfigParamType] = auth.ConfigParamTypeClientCredentials } else { c.OAuth2 = make(map[string]string) } diff --git a/pkg/sink/pulsar/factory.go b/pkg/sink/pulsar/factory.go index bfdc326b443..1a07bbaac42 100644 --- a/pkg/sink/pulsar/factory.go +++ b/pkg/sink/pulsar/factory.go @@ -40,13 +40,13 @@ func NewCreatorFactory(config *Config, changefeedID model.ChangeFeedID) (pulsar. co.Authentication, err = setupAuthentication(config) if err != nil { - log.L().Error("setup pulsar authentication fail", zap.Error(err)) + log.Error("setup pulsar authentication fail", zap.Error(err)) return nil, err } pulsarClient, err := pulsar.NewClient(co) if err != nil { - log.L().Error("cannot connect to pulsar", zap.Error(err)) + log.Error("cannot connect to pulsar", zap.Error(err)) return nil, err } return pulsarClient, nil @@ -62,14 +62,12 @@ func setupAuthentication(config *Config) (pulsar.Authentication, error) { return pulsar.NewAuthenticationBasic(config.BasicUserName, config.BasicPassword) } else if len(config.OAuth2) >= 5 { return pulsar.NewAuthenticationOAuth2(config.OAuth2), nil - } else if len(config.TLSCertificatePath) > 0 && len(config.TLSPrivateKeyPath) > 0 { - return pulsar.NewAuthenticationTLS(config.TLSCertificatePath, config.TLSPrivateKeyPath), nil } return nil, fmt.Errorf("no authentication method found") } // NewMockCreatorFactory returns a factory implemented based on kafka-go func NewMockCreatorFactory(config *Config, changefeedID model.ChangeFeedID) (pulsar.Client, error) { - log.L().Info("mock pulsar client factory created", zap.Any("changfeedID", changefeedID)) + log.Info("mock pulsar client factory created", zap.Any("changfeedID", changefeedID)) return nil, nil } diff --git a/pkg/sink/sink_type.go b/pkg/sink/sink_type.go index 18b7e18cf8e..184fdc7db11 100644 --- a/pkg/sink/sink_type.go +++ b/pkg/sink/sink_type.go @@ -65,6 +65,8 @@ const ( CloudStorageNoopScheme = "noop" // PulsarScheme indicates the scheme is pulsar PulsarScheme = "pulsar" + // PulsarSSLScheme + PulsarSSLScheme = "pulsar+ssl" ) // IsMQScheme returns true if the scheme belong to mq scheme. From 1f0fc0970a67713803192f372aa7808f29aff8a5 Mon Sep 17 00:00:00 2001 From: yumchina Date: Wed, 2 Aug 2023 17:40:11 +0800 Subject: [PATCH 07/11] an empty implementation of GetPartitionNum for pulsar --- cdc/sink/dmlsink/mq/manager/pulsar_manager.go | 36 +++---------------- .../dmlsink/mq/manager/pulsar_manager_mock.go | 1 + 2 files changed, 5 insertions(+), 32 deletions(-) diff --git a/cdc/sink/dmlsink/mq/manager/pulsar_manager.go b/cdc/sink/dmlsink/mq/manager/pulsar_manager.go index 8da156df50c..e2925ec63f4 100644 --- a/cdc/sink/dmlsink/mq/manager/pulsar_manager.go +++ b/cdc/sink/dmlsink/mq/manager/pulsar_manager.go @@ -15,20 +15,11 @@ package manager import ( "context" - "sync" - "time" - "github.com/apache/pulsar-client-go/pulsar" - "github.com/pingcap/log" pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" - "go.uber.org/zap" + "sync" ) -type partition struct { - partitions []string - since time.Time -} - // PulsarTopicManager is a manager for pulsar topics. type PulsarTopicManager func( cfg *pulsarConfig.Config, @@ -57,30 +48,11 @@ func NewPulsarTopicManager( } // GetPartitionNum spend more time,but no use. +// Neither synchronous nor asynchronous sending of pulsar will use PartitionNum +// but this method is used in mq_ddl_sink.go, so an empty implementation is required func (m *pulsarTopicManager) GetPartitionNum(ctx context.Context, topic string) (int32, error) { - if v, ok := m.partitions.Load(topic); ok { - pt, ok := v.(*partition) - if ok { - if time.Since(pt.since) > time.Minute { - m.partitions.Delete(topic) - } - return int32(len(pt.partitions)), nil - } - } - partitions, err := m.client.TopicPartitions(topic) - if err != nil { - log.L().Error("pulsar GetPartitions fail", zap.Error(err)) - return 0, err - } - log.L().Debug("pulsar GetPartitions", zap.Strings("partitions", partitions)) - pt := &partition{ - partitions: partitions, - since: time.Now(), - } - m.partitions.Store(topic, pt) - - return int32(len(pt.partitions)), nil + return 0, nil } // CreateTopicAndWaitUntilVisible no need to create first diff --git a/cdc/sink/dmlsink/mq/manager/pulsar_manager_mock.go b/cdc/sink/dmlsink/mq/manager/pulsar_manager_mock.go index 12991941b0d..3d556193fdb 100644 --- a/cdc/sink/dmlsink/mq/manager/pulsar_manager_mock.go +++ b/cdc/sink/dmlsink/mq/manager/pulsar_manager_mock.go @@ -34,6 +34,7 @@ func NewMockPulsarTopicManager( } // GetPartitionNum spend more time,but no use. +// mock 3 partitions func (m *pulsarTopicManagerMock) GetPartitionNum(ctx context.Context, topic string) (int32, error) { return 3, nil } From e3ed5a6700e09ea814ec6fb71dab9ddeb9550051 Mon Sep 17 00:00:00 2001 From: yumchina Date: Thu, 3 Aug 2023 15:05:05 +0800 Subject: [PATCH 08/11] use lru to producers cache TLS config --- .../ddlsink/mq/ddlproducer/ddl_producer.go | 3 +- .../mq/ddlproducer/pulsar_ddl_producer.go | 75 +++++++++++-------- cdc/sink/ddlsink/mq/pulsar_ddl_sink.go | 4 +- cdc/sink/dmlsink/mq/manager/pulsar_manager.go | 1 + pkg/config/sink.go | 8 ++ pkg/sink/pulsar/config.go | 16 ---- pkg/sink/pulsar/factory.go | 19 ++++- 7 files changed, 74 insertions(+), 52 deletions(-) diff --git a/cdc/sink/ddlsink/mq/ddlproducer/ddl_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/ddl_producer.go index 466b82fe331..aac72acb7ed 100644 --- a/cdc/sink/ddlsink/mq/ddlproducer/ddl_producer.go +++ b/cdc/sink/ddlsink/mq/ddlproducer/ddl_producer.go @@ -15,6 +15,7 @@ package ddlproducer import ( "context" + "github.com/pingcap/tiflow/pkg/config" "github.com/apache/pulsar-client-go/pulsar" "github.com/pingcap/tiflow/cdc/model" @@ -43,4 +44,4 @@ type Factory func(ctx context.Context, changefeedID model.ChangeFeedID, // PulsarFactory is a function to create a pulsar producer. type PulsarFactory func(ctx context.Context, changefeedID model.ChangeFeedID, - pConfig *pulsarConfig.Config, client pulsar.Client) (DDLProducer, error) + pConfig *pulsarConfig.Config, client pulsar.Client, sinkConfig *config.SinkConfig) (DDLProducer, error) diff --git a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go index ffff48e18a3..bc8bb44d704 100644 --- a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go +++ b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go @@ -16,15 +16,21 @@ package ddlproducer import ( "context" "encoding/json" - "sync" - "github.com/apache/pulsar-client-go/pulsar" + lru "github.com/hashicorp/golang-lru" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink/codec/common" pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" "go.uber.org/zap" + "sync" +) + +const ( + // DefaultPulsarProducerCacheSize is the default size of the cache for producers + // 10240 producers maybe cost 1.1G memory + DefaultPulsarProducerCacheSize = 10240 ) // Assert DDLEventSink implementation @@ -36,7 +42,7 @@ type pulsarProducers struct { pConfig *pulsarConfig.Config defaultTopicName string // support multiple topics - producers map[string]pulsar.Producer + producers *lru.Cache producersMutex sync.RWMutex id model.ChangeFeedID } @@ -85,6 +91,7 @@ func NewPulsarProducer( changefeedID model.ChangeFeedID, pConfig *pulsarConfig.Config, client pulsar.Client, + sinkConfig *config.SinkConfig, ) (DDLProducer, error) { log.Info("Starting pulsar DDL producer ...", zap.String("namespace", changefeedID.Namespace), @@ -96,8 +103,21 @@ func NewPulsarProducer( if err != nil { return nil, err } - producers := make(map[string]pulsar.Producer) - producers[topicName] = defaultProducer + + producerCacheSize := DefaultPulsarProducerCacheSize + if sinkConfig.PulsarConfig != nil && sinkConfig.PulsarConfig.PulsarProducerCacheSize != nil { + producerCacheSize = int(*sinkConfig.PulsarConfig.PulsarProducerCacheSize) + } + + producers, err := lru.NewWithEvict(producerCacheSize, func(key interface{}, value interface{}) { + // remove producer + pulsarProducer, ok := value.(pulsar.Producer) + if ok && pulsarProducer != nil { + pulsarProducer.Close() + } + }) + + producers.Add(topicName, defaultProducer) return &pulsarProducers{ client: client, pConfig: pConfig, @@ -142,25 +162,30 @@ func newProducer( return producer, nil } -// GetProducerByTopic get producer by topicName -func (p *pulsarProducers) GetProducerByTopic(topicName string) (producer pulsar.Producer, err error) { - p.producersMutex.RLock() - producer, ok := p.producers[topicName] - p.producersMutex.RUnlock() - if !ok { // create a new producer for the topicName - p.producersMutex.Lock() - defer p.producersMutex.Unlock() - - producer, ok = p.producers[topicName] +func (p *pulsarProducers) getProducer(topic string) (pulsar.Producer, bool) { + target, ok := p.producers.Get(topic) + if ok { + producer, ok := target.(pulsar.Producer) if ok { - return producer, nil + return producer, true } + } + return nil, false +} +// GetProducerByTopic get producer by topicName +func (p *pulsarProducers) GetProducerByTopic(topicName string) (producer pulsar.Producer, err error) { + getProducer, ok := p.getProducer(topicName) + if ok && getProducer != nil { + return getProducer, nil + } + + if !ok { // create a new producer for the topicName producer, err = newProducer(p.pConfig, p.client, topicName) if err != nil { return nil, err } - p.producers[topicName] = producer + p.producers.Add(topicName, producer) } return producer, nil @@ -168,21 +193,11 @@ func (p *pulsarProducers) GetProducerByTopic(topicName string) (producer pulsar. // Close close all producers func (p *pulsarProducers) Close() { - for topic, producer := range p.producers { - producer.Close() - p.closeProducersMapByTopic(topic) - } - p.client.Close() -} - -// closeProducersMapByTopic close producer by topicName -func (p *pulsarProducers) closeProducersMapByTopic(topicName string) { + keys := p.producers.Keys() p.producersMutex.Lock() defer p.producersMutex.Unlock() - _, ok := p.producers[topicName] - if ok { - delete(p.producers, topicName) - return + for _, topic := range keys { + p.producers.Remove(topic) // callback func will be called } } diff --git a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go index a2762bcf253..5007706c148 100644 --- a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go +++ b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go @@ -81,13 +81,13 @@ func NewPulsarDDLSink( } start := time.Now() - client, err := clientCreator(pConfig, changefeedID) + client, err := clientCreator(pConfig, changefeedID, replicaConfig.Sink) if err != nil { log.Error("DDL sink producer client create fail", zap.Error(err)) return nil, cerror.WrapError(cerror.ErrPulsarNewClient, err) } - p, err := producerCreator(ctx, changefeedID, pConfig, client) + p, err := producerCreator(ctx, changefeedID, pConfig, client, replicaConfig.Sink) log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start))) if err != nil { return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err) diff --git a/cdc/sink/dmlsink/mq/manager/pulsar_manager.go b/cdc/sink/dmlsink/mq/manager/pulsar_manager.go index e2925ec63f4..6a64b70a51e 100644 --- a/cdc/sink/dmlsink/mq/manager/pulsar_manager.go +++ b/cdc/sink/dmlsink/mq/manager/pulsar_manager.go @@ -57,6 +57,7 @@ func (m *pulsarTopicManager) GetPartitionNum(ctx context.Context, topic string) // CreateTopicAndWaitUntilVisible no need to create first func (m *pulsarTopicManager) CreateTopicAndWaitUntilVisible(ctx context.Context, topicName string) (int32, error) { + return 0, nil } diff --git a/pkg/config/sink.go b/pkg/config/sink.go index c88e6aea6af..2b92eaf5396 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -155,6 +155,7 @@ type SinkConfig struct { // SafeMode is only available when the downstream is DB. SafeMode *bool `toml:"safe-mode" json:"safe-mode,omitempty"` KafkaConfig *KafkaConfig `toml:"kafka-config" json:"kafka-config,omitempty"` + PulsarConfig *PulsarConfig `toml:"pulsar-config" json:"pulsar-config,omitempty"` MySQLConfig *MySQLConfig `toml:"mysql-config" json:"mysql-config,omitempty"` CloudStorageConfig *CloudStorageConfig `toml:"cloud-storage-config" json:"cloud-storage-config,omitempty"` } @@ -326,7 +327,14 @@ type KafkaConfig struct { LargeMessageHandle *LargeMessageHandleConfig `toml:"large-message-handle" json:"large-message-handle,omitempty"` } +// PulsarConfig pulsar sink configuration type PulsarConfig struct { + TLSKeyFilePath *string `toml:"tls-certificate-path" json:"tls-certificate-path,omitempty"` + TLSCertificateFile *string `toml:"tls-certificate-file" json:"tls-private-key-path,omitempty"` + TLSTrustCertsFilePath *string `toml:"tls-trust-certs-file-path" json:"tls-trust-certs-file-path,omitempty"` + + // PulsarProducerCacheSize is the size of the cache of pulsar producers + PulsarProducerCacheSize *int32 `toml:"pulsar-producer-cache-size" json:"pulsar-producer-cache-size,omitempty"` } // MySQLConfig represents a MySQL sink configuration diff --git a/pkg/sink/pulsar/config.go b/pkg/sink/pulsar/config.go index 17e9a628c1d..1fb42a78dea 100644 --- a/pkg/sink/pulsar/config.go +++ b/pkg/sink/pulsar/config.go @@ -73,12 +73,6 @@ const ( // Protocol The message protocol type input to pulsar, pulsar currently supports canal-json, canal, maxwell Protocol = "protocol" - // TLSCertificatePath create new pulsar authentication provider with specified TLS certificate and private key - TLSCertificatePath = "tls-certificate-path" - - // TLSPrivateKeyPath TLS private key - TLSPrivateKeyPath = "tls-private-key-path" - // OAuth2IssuerURL the URL of the authorization server. OAuth2IssuerURL = "oauth2-issuer-url" // OAuth2Audience the URL of the resource server. @@ -344,16 +338,6 @@ func (c *Config) Apply(sinkURI *url.URL) error { c.BasicPassword = s } - s = params.Get(TLSCertificatePath) - if len(s) > 0 { - c.TLSCertificatePath = s - } - - s = params.Get(TLSPrivateKeyPath) - if len(s) > 0 { - c.TLSPrivateKeyPath = s - } - c.applyOAuth(params) s = params.Get(Protocol) diff --git a/pkg/sink/pulsar/factory.go b/pkg/sink/pulsar/factory.go index 1a07bbaac42..d6bd29c850a 100644 --- a/pkg/sink/pulsar/factory.go +++ b/pkg/sink/pulsar/factory.go @@ -15,6 +15,7 @@ package pulsar import ( "fmt" + "github.com/pingcap/tiflow/pkg/config" "github.com/apache/pulsar-client-go/pulsar" "github.com/pingcap/log" @@ -23,10 +24,10 @@ import ( ) // FactoryCreator defines the type of factory creator. -type FactoryCreator func(config *Config, changefeedID model.ChangeFeedID) (pulsar.Client, error) +type FactoryCreator func(config *Config, changefeedID model.ChangeFeedID, sinkConfig *config.SinkConfig) (pulsar.Client, error) // NewCreatorFactory returns a factory implemented based on kafka-go -func NewCreatorFactory(config *Config, changefeedID model.ChangeFeedID) (pulsar.Client, error) { +func NewCreatorFactory(config *Config, changefeedID model.ChangeFeedID, sinkConfig *config.SinkConfig) (pulsar.Client, error) { co := pulsar.ClientOptions{ URL: config.URL, CustomMetricsLabels: map[string]string{ @@ -44,6 +45,17 @@ func NewCreatorFactory(config *Config, changefeedID model.ChangeFeedID) (pulsar. return nil, err } + // pulsar TLS config + if sinkConfig.PulsarConfig != nil { + sinkPulsar := sinkConfig.PulsarConfig + if sinkPulsar.TLSCertificateFile != nil && sinkPulsar.TLSKeyFilePath != nil && + sinkPulsar.TLSTrustCertsFilePath != nil { + co.TLSCertificateFile = *sinkPulsar.TLSCertificateFile + co.TLSKeyFilePath = *sinkPulsar.TLSKeyFilePath + co.TLSTrustCertsFilePath = *sinkPulsar.TLSTrustCertsFilePath + } + } + pulsarClient, err := pulsar.NewClient(co) if err != nil { log.Error("cannot connect to pulsar", zap.Error(err)) @@ -67,7 +79,8 @@ func setupAuthentication(config *Config) (pulsar.Authentication, error) { } // NewMockCreatorFactory returns a factory implemented based on kafka-go -func NewMockCreatorFactory(config *Config, changefeedID model.ChangeFeedID) (pulsar.Client, error) { +func NewMockCreatorFactory(config *Config, changefeedID model.ChangeFeedID, + sinkConfig *config.SinkConfig) (pulsar.Client, error) { log.Info("mock pulsar client factory created", zap.Any("changfeedID", changefeedID)) return nil, nil } From 6d93b24d338048a56073b5f4384b8cab946a8dc3 Mon Sep 17 00:00:00 2001 From: yumchina Date: Fri, 4 Aug 2023 14:28:48 +0800 Subject: [PATCH 09/11] format code --- .../ddlsink/mq/ddlproducer/ddl_producer.go | 2 +- .../ddlproducer/pulsar_ddl_mock_producer.go | 2 ++ .../mq/ddlproducer/pulsar_ddl_producer.go | 6 +++++- cdc/sink/dmlsink/mq/manager/pulsar_manager.go | 5 ++--- docs/swagger/docs.go | 21 +++++++++++++++++++ docs/swagger/swagger.json | 21 +++++++++++++++++++ docs/swagger/swagger.yaml | 14 +++++++++++++ pkg/sink/pulsar/config.go | 6 +----- pkg/sink/pulsar/factory.go | 5 +++-- pkg/sink/sink_type.go | 2 +- 10 files changed, 71 insertions(+), 13 deletions(-) diff --git a/cdc/sink/ddlsink/mq/ddlproducer/ddl_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/ddl_producer.go index aac72acb7ed..ff35c4cf759 100644 --- a/cdc/sink/ddlsink/mq/ddlproducer/ddl_producer.go +++ b/cdc/sink/ddlsink/mq/ddlproducer/ddl_producer.go @@ -15,10 +15,10 @@ package ddlproducer import ( "context" - "github.com/pingcap/tiflow/pkg/config" "github.com/apache/pulsar-client-go/pulsar" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/sink/kafka" pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" diff --git a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go index 176e2fa8212..498a63039db 100644 --- a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go +++ b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go @@ -19,6 +19,7 @@ import ( "github.com/apache/pulsar-client-go/pulsar" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink/codec/common" pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" ) @@ -73,6 +74,7 @@ func NewMockPulsarProducerDDL( changefeedID model.ChangeFeedID, pConfig *pulsarConfig.Config, client pulsar.Client, + sinkConfig *config.SinkConfig, ) (DDLProducer, error) { return NewMockPulsarProducer(ctx, changefeedID, pConfig, client) } diff --git a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go index bc8bb44d704..7d5f1b1ddfa 100644 --- a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go +++ b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_producer.go @@ -16,6 +16,8 @@ package ddlproducer import ( "context" "encoding/json" + "sync" + "github.com/apache/pulsar-client-go/pulsar" lru "github.com/hashicorp/golang-lru" "github.com/pingcap/log" @@ -24,7 +26,6 @@ import ( "github.com/pingcap/tiflow/pkg/sink/codec/common" pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" "go.uber.org/zap" - "sync" ) const ( @@ -116,6 +117,9 @@ func NewPulsarProducer( pulsarProducer.Close() } }) + if err != nil { + return nil, err + } producers.Add(topicName, defaultProducer) return &pulsarProducers{ diff --git a/cdc/sink/dmlsink/mq/manager/pulsar_manager.go b/cdc/sink/dmlsink/mq/manager/pulsar_manager.go index 6a64b70a51e..ddc5542d614 100644 --- a/cdc/sink/dmlsink/mq/manager/pulsar_manager.go +++ b/cdc/sink/dmlsink/mq/manager/pulsar_manager.go @@ -15,9 +15,10 @@ package manager import ( "context" + "sync" + "github.com/apache/pulsar-client-go/pulsar" pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" - "sync" ) // PulsarTopicManager is a manager for pulsar topics. @@ -51,13 +52,11 @@ func NewPulsarTopicManager( // Neither synchronous nor asynchronous sending of pulsar will use PartitionNum // but this method is used in mq_ddl_sink.go, so an empty implementation is required func (m *pulsarTopicManager) GetPartitionNum(ctx context.Context, topic string) (int32, error) { - return 0, nil } // CreateTopicAndWaitUntilVisible no need to create first func (m *pulsarTopicManager) CreateTopicAndWaitUntilVisible(ctx context.Context, topicName string) (int32, error) { - return 0, nil } diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 7c9c1779ef6..8c62f04d3ec 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -1620,6 +1620,24 @@ var doc = `{ } } }, + "config.PulsarConfig": { + "type": "object", + "properties": { + "pulsar-producer-cache-size": { + "description": "PulsarProducerCacheSize is the size of the cache of pulsar producers", + "type": "integer" + }, + "tls-certificate-path": { + "type": "string" + }, + "tls-private-key-path": { + "type": "string" + }, + "tls-trust-certs-file-path": { + "type": "string" + } + } + }, "config.SinkConfig": { "type": "object", "properties": { @@ -1682,6 +1700,9 @@ var doc = `{ "description": "Protocol is NOT available when the downstream is DB.", "type": "string" }, + "pulsar-config": { + "$ref": "#/definitions/config.PulsarConfig" + }, "safe-mode": { "description": "SafeMode is only available when the downstream is DB.", "type": "boolean" diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 2b7ab30dd28..b020fec566b 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -1601,6 +1601,24 @@ } } }, + "config.PulsarConfig": { + "type": "object", + "properties": { + "pulsar-producer-cache-size": { + "description": "PulsarProducerCacheSize is the size of the cache of pulsar producers", + "type": "integer" + }, + "tls-certificate-path": { + "type": "string" + }, + "tls-private-key-path": { + "type": "string" + }, + "tls-trust-certs-file-path": { + "type": "string" + } + } + }, "config.SinkConfig": { "type": "object", "properties": { @@ -1663,6 +1681,9 @@ "description": "Protocol is NOT available when the downstream is DB.", "type": "string" }, + "pulsar-config": { + "$ref": "#/definitions/config.PulsarConfig" + }, "safe-mode": { "description": "SafeMode is only available when the downstream is DB.", "type": "boolean" diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 8b9b2402147..be2553eac38 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -186,6 +186,18 @@ definitions: write-timeout: type: string type: object + config.PulsarConfig: + properties: + pulsar-producer-cache-size: + description: PulsarProducerCacheSize is the size of the cache of pulsar producers + type: integer + tls-certificate-path: + type: string + tls-private-key-path: + type: string + tls-trust-certs-file-path: + type: string + type: object config.SinkConfig: properties: cloud-storage-config: @@ -236,6 +248,8 @@ definitions: protocol: description: Protocol is NOT available when the downstream is DB. type: string + pulsar-config: + $ref: '#/definitions/config.PulsarConfig' safe-mode: description: SafeMode is only available when the downstream is DB. type: boolean diff --git a/pkg/sink/pulsar/config.go b/pkg/sink/pulsar/config.go index 1fb42a78dea..9b5f691e7e0 100644 --- a/pkg/sink/pulsar/config.go +++ b/pkg/sink/pulsar/config.go @@ -15,13 +15,13 @@ package pulsar import ( "fmt" - "github.com/apache/pulsar-client-go/pulsar/auth" "net/url" "strconv" "strings" "time" "github.com/apache/pulsar-client-go/pulsar" + "github.com/apache/pulsar-client-go/pulsar/auth" "github.com/pingcap/log" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -81,10 +81,6 @@ const ( OAuth2PrivateKey = "oauth2-private-key" // OAuth2ClientID the client ID of the application. OAuth2ClientID = "oauth2-client-id" - // OAuth2Type the type of the OAuth2 . - OAuth2Type = "oauth2-type" - // OAuth2TypeClientCredentials client_credentials - OAuth2TypeClientCredentials = "oauth2-client-credentials" // OAuth2Scope scope OAuth2Scope = "auth2-scope" ) diff --git a/pkg/sink/pulsar/factory.go b/pkg/sink/pulsar/factory.go index d6bd29c850a..c0e7ff19d48 100644 --- a/pkg/sink/pulsar/factory.go +++ b/pkg/sink/pulsar/factory.go @@ -15,11 +15,11 @@ package pulsar import ( "fmt" - "github.com/pingcap/tiflow/pkg/config" "github.com/apache/pulsar-client-go/pulsar" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" "go.uber.org/zap" ) @@ -80,7 +80,8 @@ func setupAuthentication(config *Config) (pulsar.Authentication, error) { // NewMockCreatorFactory returns a factory implemented based on kafka-go func NewMockCreatorFactory(config *Config, changefeedID model.ChangeFeedID, - sinkConfig *config.SinkConfig) (pulsar.Client, error) { + sinkConfig *config.SinkConfig, +) (pulsar.Client, error) { log.Info("mock pulsar client factory created", zap.Any("changfeedID", changefeedID)) return nil, nil } diff --git a/pkg/sink/sink_type.go b/pkg/sink/sink_type.go index 184fdc7db11..d257e87521e 100644 --- a/pkg/sink/sink_type.go +++ b/pkg/sink/sink_type.go @@ -65,7 +65,7 @@ const ( CloudStorageNoopScheme = "noop" // PulsarScheme indicates the scheme is pulsar PulsarScheme = "pulsar" - // PulsarSSLScheme + // PulsarSSLScheme indicates the scheme is pulsar+ssl PulsarSSLScheme = "pulsar+ssl" ) From 433bc32d01519b22a19867ea996d1975431fb73b Mon Sep 17 00:00:00 2001 From: yumchina Date: Fri, 4 Aug 2023 15:55:29 +0800 Subject: [PATCH 10/11] add NewAuthenticationTLS for TLS --- pkg/sink/pulsar/config.go | 24 ++++++++++++++++++++---- pkg/sink/pulsar/factory.go | 2 ++ 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/pkg/sink/pulsar/config.go b/pkg/sink/pulsar/config.go index 9b5f691e7e0..e19cf6011d5 100644 --- a/pkg/sink/pulsar/config.go +++ b/pkg/sink/pulsar/config.go @@ -83,6 +83,12 @@ const ( OAuth2ClientID = "oauth2-client-id" // OAuth2Scope scope OAuth2Scope = "auth2-scope" + + // AuthTLSCertificatePath create new pulsar authentication provider with specified TLS certificate and private key + AuthTLSCertificatePath = "auth-tls-certificate-path" + + // AuthTLSPrivateKeyPath auth TLS private key + AuthTLSPrivateKeyPath = "auth-tls-private-key-path" ) // sink config default Value @@ -154,10 +160,10 @@ type Config struct { // BasicPassword with account BasicPassword string - // TLSCertificatePath create new pulsar authentication provider with specified TLS certificate and private key - TLSCertificatePath string - // TLSPrivateKeyPath private key - TLSPrivateKeyPath string + // AuthTLSCertificatePath create new pulsar authentication provider with specified TLS certificate and private key + AuthTLSCertificatePath string + // AuthTLSPrivateKeyPath private key + AuthTLSPrivateKeyPath string // Oauth2 include oauth2-issuer-url oauth2-audience oauth2-private-key oauth2-client-id // and 'type' always is 'client_credentials' @@ -334,6 +340,16 @@ func (c *Config) Apply(sinkURI *url.URL) error { c.BasicPassword = s } + s = params.Get(AuthTLSCertificatePath) + if len(s) > 0 { + c.AuthTLSCertificatePath = s + } + + s = params.Get(AuthTLSPrivateKeyPath) + if len(s) > 0 { + c.AuthTLSPrivateKeyPath = s + } + c.applyOAuth(params) s = params.Get(Protocol) diff --git a/pkg/sink/pulsar/factory.go b/pkg/sink/pulsar/factory.go index c0e7ff19d48..cfefc6ec1ba 100644 --- a/pkg/sink/pulsar/factory.go +++ b/pkg/sink/pulsar/factory.go @@ -74,6 +74,8 @@ func setupAuthentication(config *Config) (pulsar.Authentication, error) { return pulsar.NewAuthenticationBasic(config.BasicUserName, config.BasicPassword) } else if len(config.OAuth2) >= 5 { return pulsar.NewAuthenticationOAuth2(config.OAuth2), nil + } else if len(config.AuthTLSCertificatePath) > 0 && len(config.AuthTLSPrivateKeyPath) > 0 { + return pulsar.NewAuthenticationTLS(config.AuthTLSCertificatePath, config.AuthTLSPrivateKeyPath), nil } return nil, fmt.Errorf("no authentication method found") } From 729a64f4850aa748c94cb120044fe0c6b7b45aa7 Mon Sep 17 00:00:00 2001 From: yumchina Date: Mon, 7 Aug 2023 15:35:08 +0800 Subject: [PATCH 11/11] format documents --- docs/design/2023-07-04-ticdc-pulsar-sink.md | 59 ++++++++++++--------- 1 file changed, 35 insertions(+), 24 deletions(-) diff --git a/docs/design/2023-07-04-ticdc-pulsar-sink.md b/docs/design/2023-07-04-ticdc-pulsar-sink.md index 4d963ad81f6..e24544e4c29 100644 --- a/docs/design/2023-07-04-ticdc-pulsar-sink.md +++ b/docs/design/2023-07-04-ticdc-pulsar-sink.md @@ -39,18 +39,15 @@ ## Introduction - -This document provides a complete design on implementing pulsar sink for TiCDC. +This document provides a complete design on implementing pulsar sink for TiCDC. The pulsar sink is used to distribute the DML change records, and DDL events generated by TiCDC. - ## Motivation or Background Incorporating Pulsar into Ticdc is for the purpose of expanding the downstream MQ distribution channels. Users want to output TiDB events to Pulsar, because they can reuse machines from Pulsar with others, the pulsar easily expanded horizontally etc. - ## Detailed Design #### Protocol-support @@ -58,13 +55,14 @@ the pulsar easily expanded horizontally etc. In order to maintain the consistency of the middleware of the MQ class, we give priority support some of the protocols supported by Kafka: -__CanalJSON__ +**CanalJSON** -__Canal__ +**Canal** -__Maxwell__ +**Maxwell** CanalJSON protocol sample: + ``` for more information, please refer to: https://docs.pingcap.com/tidb/dev/ticdc-canal-json @@ -94,17 +92,19 @@ for more information, please refer to: https://docs.pingcap.com/tidb/dev/ticdc-c - Ensure that there are no incomplete inner-table transactions in Pulsar. - Ensure that every event must be sent to Pulsar at least once. - #### Pulsar Client + ##### Information -https://github.com/apache/pulsar-client-go Version: v0.10.0 -Requirement Golang 1.18+ +https://github.com/apache/pulsar-client-go Version: v0.10.0 +Requirement Golang 1.18+ ##### Different from Kafka + The difference between pulsar and kafka is that the producer in the client of pulsar must be bound to a topic, but kafka does not. ##### Pulsar Client Config + ```api type ClientOptions struct { // Configure the service URL for the Pulsar service. @@ -136,36 +136,37 @@ type ClientOptions struct { **Main Note:** -- URL: like pulsar://127.0.0.1:6650 +- URL: like pulsar://127.0.0.1:6650 - Authentication: We only support token/token-from-file/account-with-password. - MetricsRegisterer: We initialize pulsar MetricsRegisterer with `prometheus.NewRegistry()` from tiflow project `cdc/server/metrics.go` - #### Pulsar Producer + ```go type ProducerOptions struct { // Topic specifies the topic this producer will be publishing on. // This argument is required when constructing the producer. Topic string - + // Properties specifies a set of application defined properties for the producer. // This properties will be visible in the topic stats Properties map[string]string - + //……… others } ``` -- Payload: is carrying real binary data + +- Payload: is carrying real binary data - Value: Value and payload is mutually exclusive, Value for schema message. - Key: The optional key associated with the message (particularly useful for things like topic compaction) **We must cache all producers to the client for different topics Every changefeed of pulsar client have a producer map. Type as `map[string]pulsar.Producer`, the key is topic name, value is producer of pulsar client.** - ##### Producer Message: + ```go type ProducerMessage struct { // Payload for the message @@ -181,7 +182,7 @@ OrderingKey string } ``` -- Payload: is carrying real binary data +- Payload: is carrying real binary data - Value: Value and payload is mutually exclusive, Value for schema message. - Key: The optional key associated with the message (particularly useful for things like topic compaction) - OrderingKey: OrderingKey sets the ordering key of the message.Same as Key, so we do not use it. @@ -195,7 +196,7 @@ OrderingKey string #### Pulsar Route Rule - We support route events to different partitions by changefeed config dispatchers, -refer to `Pulsar Topic Rule` + refer to `Pulsar Topic Rule` - You can set the message-key to any characters. We do not set any characters default, the event will be sent to the partition by hash algorithm. #### Pulsar Topic Rule @@ -203,8 +204,8 @@ refer to `Pulsar Topic Rule` ```yaml dispatchers = [ {matcher = ['test1.*', 'test2.*'], topic = "Topic expression 1",partition="table" }, - {matcher = ['test6.*'],topic = "Topic expression 2",partition="ts" } -] + {matcher = ['test6.*'],topic = "Topic expression 2",partition="ts" } +] The topic expression syntax is legal if it meets the following conditions: 1.{schema} and {table} respectively identify the database name and table name that need to be matched, and are required fields. Pulsar support "(persistent|non-persistent)://tenant/namespace/topic" as topic name。 @@ -218,35 +219,43 @@ The topic expression syntax is legal if it meets the following conditions: ``` - #### Produce DDL Event We implement the DDLProducer interface ##### SyncSendMessage Method + It will find a producer by topic name. Send the event to pulsar. Report some metrics . `partitionNum` is not used, because the pulsar server supports set partition num only. + ##### SyncBroadcastMessage Method + It do nothing + ##### Close Method -Close every producers +Close every producers ##### Produce DML Event + We implement the DMLProducer interface + ##### AsyncSendMessage Method + It will find a producer by topic name. Set a callback function to the pulsar producer client. Send the event to pulsar. Report some metrics. `partitionNum` is not used, because the pulsar server supports set partition num only. + ##### Close Method + Close every producers #### Pulsar Metrics - + Pulsar client support metric of `prometheus.Registry` Following are pulsar client metrics @@ -281,17 +290,19 @@ pulsar_client_rpc_count ``` #### User Interface + **Sink-URI** When creating a changefeed, the user can specify the sink-uri like this: cdc cli changefeed create --sink-uri="${scheme}://${address}/${topic-name}?protocol=${protocol}&pulsar-version=${pulsar-version}&authentication-token=${authentication-token} Example: + ``` cdc cli changefeed create --server=http://127.0.0.1:8300 --sink-uri="pulsar://127.0.0.1:6650/persistent://public/default/test?protocol=canal-json&pulsar-version=v2.10.0&authentication-token=eyJhbGciOiJSUzIxxxxxxxxxxxxxxxxx" ``` - + ## Test Design Pulsar sink is a new feature, For tests, we focus on the functional tests, scenario tests and benchmark.