Skip to content

Commit

Permalink
kv/client: correct grpc Dial option to allow large messages (#947)
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <[email protected]>
  • Loading branch information
overvenus authored Sep 11, 2020
1 parent cc24c55 commit 51d0fd8
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 14 deletions.
19 changes: 16 additions & 3 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,11 @@ const (
tikvRequestMaxBackoff = 20000 // Maximum total sleep time(in ms)
grpcInitialWindowSize = 1 << 30 // The value for initial window size on a stream
grpcInitialConnWindowSize = 1 << 30 // The value for initial window size on a connection
grpcInitialMaxRecvMsgSize = 1 << 30 // The maximum message size the client can receive
grpcMaxCallRecvMsgSize = 1 << 30 // The maximum message size the client can receive
grpcConnCount = 10

// The size threshold above which a received message is considered too large and a warning is logged. TiKV splits events into 6MB per message.
warnRecvMsgSizeThreshold = 12 * 1024 * 1024
)

type singleRegionInfo struct {
Expand Down Expand Up @@ -192,7 +195,7 @@ func (a *connArray) Init(ctx context.Context) error {
grpcTLSOption,
grpc.WithInitialWindowSize(grpcInitialWindowSize),
grpc.WithInitialConnWindowSize(grpcInitialConnWindowSize),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(grpcInitialMaxRecvMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(grpcMaxCallRecvMsgSize)),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: gbackoff.Config{
BaseDelay: time.Second,
Expand All @@ -207,7 +210,6 @@ func (a *connArray) Init(ctx context.Context) error {
Timeout: 3 * time.Second,
PermitWithoutStream: true,
}),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(128*1024*1024)),
)
cancel()

Expand Down Expand Up @@ -1063,6 +1065,17 @@ func (s *eventFeedSession) receiveFromStream(
return nil
}

size := cevent.Size()
if size > warnRecvMsgSizeThreshold {
regionCount := 0
if cevent.ResolvedTs != nil {
regionCount = len(cevent.ResolvedTs.Regions)
}
log.Warn("change data event size too large",
zap.Int("size", size), zap.Int("event length", len(cevent.Events)),
zap.Int("resolved region count", regionCount))
}

for _, event := range cevent.Events {
state, ok := regionStates[event.RegionId]
// Every region's range is locked before sending requests and unlocked after exiting, and the requestID
Expand Down
92 changes: 82 additions & 10 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package kv

import (
"context"
"fmt"
"net"
"sync"
"testing"
Expand Down Expand Up @@ -91,10 +90,12 @@ func (s *mockChangeDataService) EventFeed(server cdcpb.ChangeData_EventFeedServe
return nil
}

func newMockService(c *check.C, port int, ch chan *cdcpb.ChangeDataEvent, wg *sync.WaitGroup) *grpc.Server {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
func newMockService(ctx context.Context, c *check.C, ch chan *cdcpb.ChangeDataEvent, wg *sync.WaitGroup) (grpcServer *grpc.Server, addr string) {
lc := &net.ListenConfig{}
lis, err := lc.Listen(ctx, "tcp", "127.0.0.1:0")
c.Assert(err, check.IsNil)
grpcServer := grpc.NewServer()
addr = lis.Addr().String()
grpcServer = grpc.NewServer()
mockService := &mockChangeDataService{c: c, ch: ch}
cdcpb.RegisterChangeDataServer(grpcServer, mockService)
wg.Add(1)
Expand All @@ -103,7 +104,7 @@ func newMockService(c *check.C, port int, ch chan *cdcpb.ChangeDataEvent, wg *sy
c.Assert(err, check.IsNil)
wg.Done()
}()
return grpcServer
return
}

type mockPDClient struct {
Expand All @@ -124,9 +125,11 @@ func (m *mockPDClient) GetStore(ctx context.Context, storeID uint64) (*metapb.St

// Use etcdSuite to work around the race. See the comments on `TestConnArray`.
func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := &sync.WaitGroup{}
ch2 := make(chan *cdcpb.ChangeDataEvent, 10)
server2 := newMockService(c, 23376, ch2, wg)
server2, addr := newMockService(ctx, c, ch2, wg)
defer func() {
close(ch2)
server2.Stop()
Expand All @@ -141,16 +144,14 @@ func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) {
kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
c.Assert(err, check.IsNil)

cluster.AddStore(1, "localhost:23375")
cluster.AddStore(2, "localhost:23376")
cluster.AddStore(1, "localhost:1")
cluster.AddStore(2, addr)
cluster.Bootstrap(3, []uint64{1, 2}, []uint64{4, 5}, 4)

lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
isPullInit := &mockPullerInit{}
cdcClient, err := NewCDCClient(context.Background(), pdClient, kvStorage.(tikv.Storage), &security.Credential{})
c.Assert(err, check.IsNil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -199,6 +200,77 @@ func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) {
cancel()
}

// TestRecvLargeMessageSize checks that the CDC client can receive a single
// ChangeDataEvent whose row value is one byte larger than 128MB from the mock
// ChangeData service and deliver it on the event channel with the value intact.
func (s *etcdSuite) TestRecvLargeMessageSize(c *check.C) {
	ctx, cancel := context.WithCancel(context.Background())
	wg := &sync.WaitGroup{}
	ch2 := make(chan *cdcpb.ChangeDataEvent, 10)
	server2, addr := newMockService(ctx, c, ch2, wg)
	defer func() {
		close(ch2)
		server2.Stop()
		wg.Wait()
	}()
	// Cancel first, and then close the server.
	defer cancel()

	cluster := mocktikv.NewCluster()
	mvccStore := mocktikv.MustNewMVCCStore()
	rpcClient, pdClient, err := mocktikv.NewTiKVAndPDClient(cluster, mvccStore, "")
	c.Assert(err, check.IsNil)
	pdClient = &mockPDClient{Client: pdClient, version: util.MinTiKVVersion.String()}
	kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
	c.Assert(err, check.IsNil)

	// A single store backed by the mock service, hosting region 3.
	cluster.AddStore(2, addr)
	cluster.Bootstrap(3, []uint64{2}, []uint64{4}, 4)

	lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
	isPullInit := &mockPullerInit{}
	cdcClient, err := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), &security.Credential{})
	c.Assert(err, check.IsNil)
	eventCh := make(chan *model.RegionFeedEvent, 10)
	wg.Add(1)
	go func() {
		// A failed c.Assert stops the calling goroutine, so Done must be
		// deferred; otherwise the wg.Wait() in the cleanup above would block
		// forever and hide the assertion failure.
		defer wg.Done()
		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, false, lockresolver, isPullInit, eventCh)
		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
	}()

	// Wait for the first event from the feed before sending the large message.
	var event *model.RegionFeedEvent
	select {
	case event = <-eventCh:
	case <-time.After(time.Second):
		c.Fatalf("recving message takes too long")
	}
	c.Assert(event, check.NotNil)

	largeValSize := 128*1024*1024 + 1 // 128MB + 1
	largeMsg := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
		{
			RegionId: 3,
			Event: &cdcpb.Event_Entries_{
				Entries: &cdcpb.Event_Entries{
					Entries: []*cdcpb.Event_Row{{
						Type:     cdcpb.Event_COMMITTED,
						OpType:   cdcpb.Event_Row_PUT,
						Key:      []byte("a"),
						Value:    make([]byte, largeValSize),
						CommitTs: 2, // ResolvedTs = 1
					}},
				},
			},
		},
	}}
	ch2 <- largeMsg
	select {
	case event = <-eventCh:
	case <-time.After(30 * time.Second): // Sending a 128MB object may take a long time.
		c.Fatalf("recving message takes too long")
	}
	// Fail with a clear assertion instead of a nil dereference if the received
	// event carries no row payload.
	c.Assert(event, check.NotNil)
	c.Assert(event.Val, check.NotNil)
	c.Assert(len(event.Val.Value), check.Equals, largeValSize)
	cancel()
}

// TODO enable the test
func (s *etcdSuite) TodoTestIncompatibleTiKV(c *check.C) {
cluster := mocktikv.NewCluster()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20200902104258-eba4f1d8f6de
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20200818080353-7aaed8998596
github.com/pingcap/kvproto v0.0.0-20200909045102-2ac90648531b
github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463
github.com/pingcap/parser v0.0.0-20200911054040-258297116c4b
github.com/pingcap/tidb v1.1.0-beta.0.20200911063238-51d365fc45fd
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,8 @@ github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29/go.mod h1:IOdRDPLy
github.com/pingcap/kvproto v0.0.0-20200706115936-1e0910aabe6c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200818080353-7aaed8998596 h1:1cRjX7+yHQiE4pV/xwB8XcbZXV9sHshWMNTd5I6SS2o=
github.com/pingcap/kvproto v0.0.0-20200818080353-7aaed8998596/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200909045102-2ac90648531b h1:pqOXTOat/yDzc/THrkXx2YgAFfQenEvmn6ub6iEQFfo=
github.com/pingcap/kvproto v0.0.0-20200909045102-2ac90648531b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad h1:SveG82rmu/GFxYanffxsSF503SiQV+2JLnWEiGiF+Tc=
Expand Down

0 comments on commit 51d0fd8

Please sign in to comment.