Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ticdc(pulsar): write DDL events to pulsar #9433

Merged
merged 15 commits into from
Aug 7, 2023
Merged
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ CURDIR := $(shell pwd)
path_to_add := $(addsuffix /bin,$(subst :,/bin:,$(GOPATH)))
export PATH := $(CURDIR)/bin:$(CURDIR)/tools/bin:$(path_to_add):$(PATH)

# DBUS_SESSION_BUS_ADDRESS: the pulsar client uses dbus to detect the connection status,
# but it does not exit when the connection is closed.
# We tried to use leak_helper to detect the goroutine leak, but it does not work.
# https://github.com/benthosdev/benthos/issues/1184 suggests using an environment variable to disable dbus.
export DBUS_SESSION_BUS_ADDRESS := /dev/null

SHELL := /usr/bin/env bash

TEST_DIR := /tmp/tidb_cdc_test
Expand Down
5 changes: 5 additions & 0 deletions cdc/sink/ddlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import (
"github.com/pingcap/tiflow/cdc/sink/ddlsink/mq"
"github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer"
"github.com/pingcap/tiflow/cdc/sink/ddlsink/mysql"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/kafka"
kafkav2 "github.com/pingcap/tiflow/pkg/sink/kafka/v2"
pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar"
"github.com/pingcap/tiflow/pkg/util"
)

Expand Down Expand Up @@ -59,6 +61,9 @@ func New(
return mysql.NewDDLSink(ctx, changefeedID, sinkURI, cfg)
case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
return cloudstorage.NewDDLSink(ctx, changefeedID, sinkURI, cfg)
case sink.PulsarScheme, sink.PulsarSSLScheme:
return mq.NewPulsarDDLSink(ctx, changefeedID, sinkURI, cfg, manager.NewPulsarTopicManager,
pulsarConfig.NewCreatorFactory, ddlproducer.NewPulsarProducer)
default:
return nil,
cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", scheme)
Expand Down
7 changes: 7 additions & 0 deletions cdc/sink/ddlsink/mq/ddlproducer/ddl_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ package ddlproducer
import (
"context"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/pingcap/tiflow/pkg/sink/kafka"
pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar"
)

// DDLProducer is the interface for DDL message producer.
Expand All @@ -38,3 +41,7 @@ type DDLProducer interface {
// Factory is a function to create a DDLProducer on top of a kafka.SyncProducer.
type Factory func(ctx context.Context, changefeedID model.ChangeFeedID,
syncProducer kafka.SyncProducer) DDLProducer

// PulsarFactory is a function to create a pulsar DDLProducer.
// It receives the changefeed ID, the pulsar config, a pulsar client and the
// sink config, and returns the producer or an error.
type PulsarFactory func(ctx context.Context, changefeedID model.ChangeFeedID,
pConfig *pulsarConfig.Config, client pulsar.Client, sinkConfig *config.SinkConfig) (DDLProducer, error)
113 changes: 113 additions & 0 deletions cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddlproducer

import (
"context"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar"
)

// Compile-time assertion that *PulsarMockProducers implements DDLProducer.
var _ DDLProducer = (*PulsarMockProducers)(nil)

// PulsarMockProducers is a mock pulsar producer that keeps every message it
// is asked to send in memory instead of publishing it.
type PulsarMockProducers struct {
// events holds the recorded messages, keyed by topic name, in send order.
events map[string][]*pulsar.ProducerMessage
}

// SyncBroadcastMessage logs the call and then hands the message to
// SyncSendMessage with the same arguments; the mock performs no
// per-partition fan-out of its own.
func (p *PulsarMockProducers) SyncBroadcastMessage(ctx context.Context, topic string,
	totalPartitionsNum int32, message *common.Message,
) error {
	log.Info("pulsarProducers SyncBroadcastMessage in")

	// Delegate to the single-message path and surface its result unchanged.
	err := p.SyncSendMessage(ctx, topic, totalPartitionsNum, message)
	return err
}

// SyncSendMessage records a message for the given topic and always returns nil.
//
// The message value becomes the pulsar payload and the message partition key
// becomes the pulsar key. ctx and partitionNum are not used by the mock.
func (p *PulsarMockProducers) SyncSendMessage(ctx context.Context, topic string,
	partitionNum int32, message *common.Message,
) error {
	// Assigning into a nil map panics, so create the map lazily. This keeps a
	// zero-value PulsarMockProducers (one not built by NewMockPulsarProducer)
	// usable.
	if p.events == nil {
		p.events = make(map[string][]*pulsar.ProducerMessage)
	}

	p.events[topic] = append(p.events[topic], &pulsar.ProducerMessage{
		Payload: message.Value,
		Key:     message.GetPartitionKey(),
	})

	return nil
}

// NewMockPulsarProducer builds a PulsarMockProducers with an empty event
// store. None of the arguments are used, and the returned error is always nil.
func NewMockPulsarProducer(
	ctx context.Context,
	changefeedID model.ChangeFeedID,
	pConfig *pulsarConfig.Config,
	client pulsar.Client,
) (*PulsarMockProducers, error) {
	mock := new(PulsarMockProducers)
	mock.events = make(map[string][]*pulsar.ProducerMessage)
	return mock, nil
}

// NewMockPulsarProducerDDL builds the mock via NewMockPulsarProducer and
// returns it as a DDLProducer. sinkConfig is accepted but not used.
func NewMockPulsarProducerDDL(
	ctx context.Context,
	changefeedID model.ChangeFeedID,
	pConfig *pulsarConfig.Config,
	client pulsar.Client,
	sinkConfig *config.SinkConfig,
) (DDLProducer, error) {
	mock, err := NewMockPulsarProducer(ctx, changefeedID, pConfig, client)
	return mock, err
}

// GetProducerByTopic always returns a nil producer and a nil error,
// whatever topic name is given.
func (p *PulsarMockProducers) GetProducerByTopic(topicName string) (producer pulsar.Producer, err error) {
	return nil, nil
}

// Close discards all recorded events by replacing the store with a fresh,
// empty map.
func (p *PulsarMockProducers) Close() {
	p.events = map[string][]*pulsar.ProducerMessage{}
}

// Flush is a no-op for the mock producer and always returns nil.
// SyncSendMessage records each message as soon as it is called, so there is
// nothing buffered to flush.
func (p *PulsarMockProducers) Flush(ctx context.Context) error {
return nil
}

// GetAllEvents returns every recorded message across all topics.
// Messages of one topic keep their send order, but topics are visited in map
// iteration order, so the relative order between topics is not fixed.
// The result is nil when nothing has been recorded.
func (p *PulsarMockProducers) GetAllEvents() []*pulsar.ProducerMessage {
	var all []*pulsar.ProducerMessage
	for topic := range p.events {
		all = append(all, p.events[topic]...)
	}
	return all
}

// GetEvents returns the messages recorded for the given topic, or nil when
// the topic has no recorded messages.
func (p *PulsarMockProducers) GetEvents(topic string) []*pulsar.ProducerMessage {
	recorded := p.events[topic]
	return recorded
}
Loading
Loading