Skip to content

Commit

Permalink
Merge branch 'master' into release-4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei committed Sep 11, 2020
2 parents bcd6bb9 + 51d0fd8 commit 51404d7
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 25 deletions.
19 changes: 16 additions & 3 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,11 @@ const (
tikvRequestMaxBackoff = 20000 // Maximum total sleep time(in ms)
grpcInitialWindowSize = 1 << 30 // The value for initial window size on a stream
grpcInitialConnWindowSize = 1 << 30 // The value for initial window size on a connection
grpcInitialMaxRecvMsgSize = 1 << 30 // The maximum message size the client can receive
grpcMaxCallRecvMsgSize = 1 << 30 // The maximum message size the client can receive
grpcConnCount = 10

// The threshold of warning a message is too large. TiKV split events into 6MB per-message.
warnRecvMsgSizeThreshold = 12 * 1024 * 1024
)

type singleRegionInfo struct {
Expand Down Expand Up @@ -192,7 +195,7 @@ func (a *connArray) Init(ctx context.Context) error {
grpcTLSOption,
grpc.WithInitialWindowSize(grpcInitialWindowSize),
grpc.WithInitialConnWindowSize(grpcInitialConnWindowSize),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(grpcInitialMaxRecvMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(grpcMaxCallRecvMsgSize)),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: gbackoff.Config{
BaseDelay: time.Second,
Expand All @@ -207,7 +210,6 @@ func (a *connArray) Init(ctx context.Context) error {
Timeout: 3 * time.Second,
PermitWithoutStream: true,
}),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(128*1024*1024)),
)
cancel()

Expand Down Expand Up @@ -1063,6 +1065,17 @@ func (s *eventFeedSession) receiveFromStream(
return nil
}

size := cevent.Size()
if size > warnRecvMsgSizeThreshold {
regionCount := 0
if cevent.ResolvedTs != nil {
regionCount = len(cevent.ResolvedTs.Regions)
}
log.Warn("change data event size too large",
zap.Int("size", size), zap.Int("event length", len(cevent.Events)),
zap.Int("resolved region count", regionCount))
}

for _, event := range cevent.Events {
state, ok := regionStates[event.RegionId]
// Every region's range is locked before sending requests and unlocked after exiting, and the requestID
Expand Down
92 changes: 82 additions & 10 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package kv

import (
"context"
"fmt"
"net"
"sync"
"testing"
Expand Down Expand Up @@ -91,10 +90,12 @@ func (s *mockChangeDataService) EventFeed(server cdcpb.ChangeData_EventFeedServe
return nil
}

func newMockService(c *check.C, port int, ch chan *cdcpb.ChangeDataEvent, wg *sync.WaitGroup) *grpc.Server {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
func newMockService(ctx context.Context, c *check.C, ch chan *cdcpb.ChangeDataEvent, wg *sync.WaitGroup) (grpcServer *grpc.Server, addr string) {
lc := &net.ListenConfig{}
lis, err := lc.Listen(ctx, "tcp", "127.0.0.1:0")
c.Assert(err, check.IsNil)
grpcServer := grpc.NewServer()
addr = lis.Addr().String()
grpcServer = grpc.NewServer()
mockService := &mockChangeDataService{c: c, ch: ch}
cdcpb.RegisterChangeDataServer(grpcServer, mockService)
wg.Add(1)
Expand All @@ -103,7 +104,7 @@ func newMockService(c *check.C, port int, ch chan *cdcpb.ChangeDataEvent, wg *sy
c.Assert(err, check.IsNil)
wg.Done()
}()
return grpcServer
return
}

type mockPDClient struct {
Expand All @@ -124,9 +125,11 @@ func (m *mockPDClient) GetStore(ctx context.Context, storeID uint64) (*metapb.St

// Use etcdSuite to workaround the race. See comments of `TestConnArray`.
func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := &sync.WaitGroup{}
ch2 := make(chan *cdcpb.ChangeDataEvent, 10)
server2 := newMockService(c, 23376, ch2, wg)
server2, addr := newMockService(ctx, c, ch2, wg)
defer func() {
close(ch2)
server2.Stop()
Expand All @@ -141,16 +144,14 @@ func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) {
kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
c.Assert(err, check.IsNil)

cluster.AddStore(1, "localhost:23375")
cluster.AddStore(2, "localhost:23376")
cluster.AddStore(1, "localhost:1")
cluster.AddStore(2, addr)
cluster.Bootstrap(3, []uint64{1, 2}, []uint64{4, 5}, 4)

lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
isPullInit := &mockPullerInit{}
cdcClient, err := NewCDCClient(context.Background(), pdClient, kvStorage.(tikv.Storage), &security.Credential{})
c.Assert(err, check.IsNil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -199,6 +200,77 @@ func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) {
cancel()
}

func (s *etcdSuite) TestRecvLargeMessageSize(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
ch2 := make(chan *cdcpb.ChangeDataEvent, 10)
server2, addr := newMockService(ctx, c, ch2, wg)
defer func() {
close(ch2)
server2.Stop()
wg.Wait()
}()
// Cancel first, and then close the server.
defer cancel()

cluster := mocktikv.NewCluster()
mvccStore := mocktikv.MustNewMVCCStore()
rpcClient, pdClient, err := mocktikv.NewTiKVAndPDClient(cluster, mvccStore, "")
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, version: util.MinTiKVVersion.String()}
kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
c.Assert(err, check.IsNil)

cluster.AddStore(2, addr)
cluster.Bootstrap(3, []uint64{2}, []uint64{4}, 4)

lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
isPullInit := &mockPullerInit{}
cdcClient, err := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), &security.Credential{})
c.Assert(err, check.IsNil)
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
wg.Done()
}()

var event *model.RegionFeedEvent
select {
case event = <-eventCh:
case <-time.After(time.Second):
c.Fatalf("recving message takes too long")
}
c.Assert(event, check.NotNil)

largeValSize := 128*1024*1024 + 1 // 128MB + 1
largeMsg := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
{
RegionId: 3,
Event: &cdcpb.Event_Entries_{
Entries: &cdcpb.Event_Entries{
Entries: []*cdcpb.Event_Row{{
Type: cdcpb.Event_COMMITTED,
OpType: cdcpb.Event_Row_PUT,
Key: []byte("a"),
Value: make([]byte, largeValSize),
CommitTs: 2, // ResolvedTs = 1
}},
},
},
},
}}
ch2 <- largeMsg
select {
case event = <-eventCh:
case <-time.After(30 * time.Second): // Send 128MB object may costs lots of time.
c.Fatalf("recving message takes too long")
}
c.Assert(len(event.Val.Value), check.Equals, largeValSize)
cancel()
}

// TODO enable the test
func (s *etcdSuite) TodoTestIncompatibleTiKV(c *check.C) {
cluster := mocktikv.NewCluster()
Expand Down
5 changes: 3 additions & 2 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/log"
timodel "github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/sink/common"
"github.com/pingcap/ticdc/pkg/config"
Expand Down Expand Up @@ -983,13 +984,13 @@ func isIgnorableDDLError(err error) bool {
}
}

func getSQLErrCode(err error) (errors.ErrCode, bool) {
func getSQLErrCode(err error) (terror.ErrCode, bool) {
mysqlErr, ok := errors.Cause(err).(*dmysql.MySQLError)
if !ok {
return -1, false
}

return errors.ErrCode(mysqlErr.Number), true
return terror.ErrCode(mysqlErr.Number), true
}

func buildColumnList(names []string) string {
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ require (
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20200902104258-eba4f1d8f6de
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20200818080353-7aaed8998596
github.com/pingcap/kvproto v0.0.0-20200909045102-2ac90648531b
github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463
github.com/pingcap/parser v0.0.0-20200908132759-b65348b6244c
github.com/pingcap/tidb v1.1.0-beta.0.20200909081327-88f98fc3b1d4
github.com/pingcap/tidb-tools v4.0.6-0.20200909062246-98d05bb77362+incompatible
github.com/pingcap/parser v0.0.0-20200911054040-258297116c4b
github.com/pingcap/tidb v1.1.0-beta.0.20200911063238-51d365fc45fd
github.com/pingcap/tidb-tools v4.0.6-0.20200828085514-03575b185007+incompatible
github.com/prometheus/client_golang v1.5.1
github.com/r3labs/diff v1.1.0
github.com/spf13/cobra v1.0.0
Expand Down
13 changes: 7 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,8 @@ github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29/go.mod h1:IOdRDPLy
github.com/pingcap/kvproto v0.0.0-20200706115936-1e0910aabe6c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200818080353-7aaed8998596 h1:1cRjX7+yHQiE4pV/xwB8XcbZXV9sHshWMNTd5I6SS2o=
github.com/pingcap/kvproto v0.0.0-20200818080353-7aaed8998596/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200909045102-2ac90648531b h1:pqOXTOat/yDzc/THrkXx2YgAFfQenEvmn6ub6iEQFfo=
github.com/pingcap/kvproto v0.0.0-20200909045102-2ac90648531b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad h1:SveG82rmu/GFxYanffxsSF503SiQV+2JLnWEiGiF+Tc=
Expand All @@ -495,8 +497,8 @@ github.com/pingcap/parser v0.0.0-20200623164729-3a18f1e5dceb/go.mod h1:vQdbJqobJ
github.com/pingcap/parser v0.0.0-20200803072748-fdf66528323d h1:QQMAWm/b/8EyCrqqcjdO4DcACS06tx8IhKGWC4PTqiQ=
github.com/pingcap/parser v0.0.0-20200803072748-fdf66528323d/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0=
github.com/pingcap/parser v0.0.0-20200901062802-475ea5e2e0a7/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0=
github.com/pingcap/parser v0.0.0-20200908132759-b65348b6244c h1:oJn1X+lZwG4LG2DV+73lppFfnfy+3wXUwpoVgtIOQq8=
github.com/pingcap/parser v0.0.0-20200908132759-b65348b6244c/go.mod h1:RlLfMRJwFBSiXd2lUaWdV5pSXtrpyvZM8k5bbZWsheU=
github.com/pingcap/parser v0.0.0-20200911054040-258297116c4b h1:olNvO8UWo7Y+t2oWwB46cDj5pyqosgiQts5t8tZlbSc=
github.com/pingcap/parser v0.0.0-20200911054040-258297116c4b/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0=
github.com/pingcap/pd/v4 v4.0.0-rc.1.0.20200422143320-428acd53eba2/go.mod h1:s+utZtXDznOiL24VK0qGmtoHjjXNsscJx3m1n8cC56s=
github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181/go.mod h1:q4HTx/bA8aKBa4S7L+SQKHvjRPXCRV0tA0yRw0qkZSA=
github.com/pingcap/pd/v4 v4.0.5-0.20200817114353-e465cafe8a91 h1:zCOWP+kIzM6ZsXdu2QoM/W6+3vFZj04MYboMP2Obc0E=
Expand All @@ -512,16 +514,15 @@ github.com/pingcap/tidb v1.1.0-beta.0.20200715100003-b4da443a3c4c/go.mod h1:TplK
github.com/pingcap/tidb v1.1.0-beta.0.20200716023258-b10faca6ff89/go.mod h1:hDlQ5BJ4rLLCOUlvXqW3skyYEjyymzeTA3eXpNEDx38=
github.com/pingcap/tidb v1.1.0-beta.0.20200820092836-c5b7658b0896 h1:l2UJF9cFxwaMMNMjguqrfiC7sFZrEqbtEmAAWFyHx9w=
github.com/pingcap/tidb v1.1.0-beta.0.20200820092836-c5b7658b0896/go.mod h1:IAStISSVhEI9Gp/sE4w6Ms0WxpdBJ9qNTczNyskvd5A=
github.com/pingcap/tidb v1.1.0-beta.0.20200909081327-88f98fc3b1d4 h1:rdho1Gk4Hj5m3vDfCBy1Zh1hDMylm6ZsqRbySYcgSo4=
github.com/pingcap/tidb v1.1.0-beta.0.20200909081327-88f98fc3b1d4/go.mod h1:GFEcPPyxRKnFe5cwF58RSm6gFbbBgfoJh8BWaDfIq6o=
github.com/pingcap/tidb v1.1.0-beta.0.20200911063238-51d365fc45fd h1:NzlXvchm6aPuW29ciy63vUwznkp4OrQVnF6/TTdGcRg=
github.com/pingcap/tidb v1.1.0-beta.0.20200911063238-51d365fc45fd/go.mod h1:hZoU6jeZIj9jViblw0Pf1aOBJajgI82eMqlC7HYtRWI=
github.com/pingcap/tidb-tools v4.0.0-beta.1.0.20200306084441-875bd09aa3d5+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v4.0.0-rc.1.0.20200421113014-507d2bb3a15e+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v4.0.0-rc.1.0.20200514040632-f76b3e428e19+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v4.0.0-rc.2.0.20200521050818-6dd445d83fe0+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v4.0.1-0.20200530144555-cdec43635625+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v4.0.6-0.20200828085514-03575b185007+incompatible h1:1GY6Qu5pT7JZ4QwkPcz+daXKhkDgKY1F6qKxifSp+tI=
github.com/pingcap/tidb-tools v4.0.6-0.20200828085514-03575b185007+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v4.0.6-0.20200909062246-98d05bb77362+incompatible h1:J/GMjtaH5PBM5Hj4KRxCqcYa66LWuRPNADdMmGzLdRY=
github.com/pingcap/tidb-tools v4.0.6-0.20200909062246-98d05bb77362+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20200522051215-f31a15d98fce/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
Expand Down

0 comments on commit 51404d7

Please sign in to comment.