Skip to content

Commit

Permalink
sink(ticdc): DDL event support for the Debezium protocol (#11674)
Browse files Browse the repository at this point in the history
close #11566
  • Loading branch information
wk989898 authored Nov 12, 2024
1 parent 64decf6 commit 0e0b615
Show file tree
Hide file tree
Showing 58 changed files with 4,388 additions and 837 deletions.
1 change: 1 addition & 0 deletions cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,4 +290,5 @@ func TestGetDLLDispatchRuleByProtocol(t *testing.T) {
require.Equal(t, PartitionAll, getDDLDispatchRule(config.ProtocolMaxwell))
require.Equal(t, PartitionAll, getDDLDispatchRule(config.ProtocolCraft))
require.Equal(t, PartitionAll, getDDLDispatchRule(config.ProtocolSimple))
require.Equal(t, PartitionAll, getDDLDispatchRule(config.ProtocolDebezium))
}
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,11 @@ error = '''
cannot find mysql.tidb_ddl_job schema
'''

["CDC:ErrDDLUnsupportType"]
error = '''
unsupport ddl type %s, query %s
'''

["CDC:ErrDatumUnflatten"]
error = '''
unflatten datume data
Expand Down
4 changes: 4 additions & 0 deletions pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ var (
"cannot find mysql.tidb_ddl_job schema",
errors.RFCCodeText("CDC:ErrDDLSchemaNotFound"),
)
ErrDDLUnsupportType = errors.Normalize(
"unsupport ddl type %s, query %s",
errors.RFCCodeText("CDC:ErrDDLUnsupportType"),
)
ErrGRPCDialFailed = errors.Normalize(
"grpc dial failed",
errors.RFCCodeText("CDC:ErrGRPCDialFailed"),
Expand Down
15 changes: 12 additions & 3 deletions pkg/sink/codec/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"math"
"strings"
"unsafe"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/log"
Expand Down Expand Up @@ -300,9 +301,7 @@ const (
)

// EscapeEnumAndSetOptions escapes ",", "\" and "”"
// https://github.com/debezium/debezium/blob/9f7ede0e0695f012c6c4e715e96aed85eecf6b5f \
// /debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/ \
// MySqlAntlrDdlParser.java#L374
// https://github.com/debezium/debezium/blob/9f7ede0e0695f012c6c4e715e96aed85eecf6b5f/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/MySqlAntlrDdlParser.java#L374
func EscapeEnumAndSetOptions(option string) string {
option = strings.ReplaceAll(option, ",", "\\,")
option = strings.ReplaceAll(option, "\\'", "'")
Expand Down Expand Up @@ -378,3 +377,13 @@ func SanitizeTopicName(name string) string {
}
return sanitizedName
}

// UnsafeBytesToString create string from byte slice without copying
func UnsafeBytesToString(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}

// UnsafeStringToBytes create byte slice from string without copying
func UnsafeStringToBytes(s string) []byte {
return *(*[]byte)(unsafe.Pointer(&s))
}
15 changes: 5 additions & 10 deletions pkg/sink/codec/craft/message_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,16 @@ package craft
import (
"encoding/binary"
"math"
"unsafe"

"github.com/pingcap/errors"
pmodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
)

// / create string from byte slice without copying
func unsafeBytesToString(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}

// / Primitive type decoders
func decodeUint8(bits []byte) ([]byte, byte, error) {
if len(bits) < 1 {
Expand Down Expand Up @@ -140,7 +135,7 @@ func decodeBytes(bits []byte) ([]byte, []byte, error) {
func decodeString(bits []byte) ([]byte, string, error) {
bits, bytes, err := decodeBytes(bits)
if err == nil {
return bits, unsafeBytesToString(bytes), nil
return bits, common.UnsafeBytesToString(bytes), nil
}
return bits, "", errors.Trace(err)
}
Expand All @@ -161,7 +156,7 @@ func decodeStringChunk(bits []byte, size int, allocator *SliceAllocator) ([]byte

data := allocator.stringSlice(size)
for i := 0; i < size; i++ {
data[i] = unsafeBytesToString(newBits[:larray[i]])
data[i] = common.UnsafeBytesToString(newBits[:larray[i]])
newBits = newBits[larray[i]:]
}
return newBits, data, nil
Expand All @@ -185,7 +180,7 @@ func decodeNullableStringChunk(bits []byte, size int, allocator *SliceAllocator)
if larray[i] == -1 {
continue
}
s := unsafeBytesToString(newBits[:larray[i]])
s := common.UnsafeBytesToString(newBits[:larray[i]])
data[i] = &s
newBits = newBits[larray[i]:]
}
Expand Down Expand Up @@ -324,7 +319,7 @@ func DecodeTiDBType(ty byte, flag model.ColumnFlagType, bits []byte) (interface{
switch ty {
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp, mysql.TypeDuration, mysql.TypeJSON, mysql.TypeNewDecimal:
// value type for these mysql types are string
return unsafeBytesToString(bits), nil
return common.UnsafeBytesToString(bits), nil
case mysql.TypeEnum, mysql.TypeSet, mysql.TypeBit:
// value type for thest mysql types are uint64
_, u64, err := decodeUvarint(bits)
Expand Down
14 changes: 2 additions & 12 deletions pkg/sink/codec/craft/message_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,13 @@ package craft
import (
"encoding/binary"
"math"
"unsafe"

"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
)

// create byte slice from string without copying
func unsafeStringToBytes(s string) []byte {
return *(*[]byte)(unsafe.Pointer(
&struct {
string
Cap int
}{s, len(s)},
))
}

// Primitive type encoders
func encodeFloat64(bits []byte, data float64) []byte {
v := math.Float64bits(data)
Expand Down Expand Up @@ -193,7 +183,7 @@ func EncodeTiDBType(allocator *SliceAllocator, ty byte, flag model.ColumnFlagTyp
switch ty {
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp, mysql.TypeDuration, mysql.TypeJSON, mysql.TypeNewDecimal:
// value type for these mysql types are string
return unsafeStringToBytes(value.(string))
return common.UnsafeStringToBytes(value.(string))
case mysql.TypeEnum, mysql.TypeSet, mysql.TypeBit:
// value type for these mysql types are uint64
return encodeUvarint(allocator.byteSlice(binary.MaxVarintLen64)[:0], value.(uint64))
Expand Down
Loading

0 comments on commit 0e0b615

Please sign in to comment.