diff --git a/cdc/kv/client.go b/cdc/kv/client.go index dcd4835497a..8e2aad17c49 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -56,8 +56,11 @@ const ( tikvRequestMaxBackoff = 20000 // Maximum total sleep time(in ms) grpcInitialWindowSize = 1 << 30 // The value for initial window size on a stream grpcInitialConnWindowSize = 1 << 30 // The value for initial window size on a connection - grpcInitialMaxRecvMsgSize = 1 << 30 // The maximum message size the client can receive + grpcMaxCallRecvMsgSize = 1 << 30 // The maximum message size the client can receive grpcConnCount = 10 + + // The message size above which a warning is logged. TiKV splits events into 6MB per message. + warnRecvMsgSizeThreshold = 12 * 1024 * 1024 ) type singleRegionInfo struct { @@ -192,7 +195,7 @@ func (a *connArray) Init(ctx context.Context) error { grpcTLSOption, grpc.WithInitialWindowSize(grpcInitialWindowSize), grpc.WithInitialConnWindowSize(grpcInitialConnWindowSize), - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(grpcInitialMaxRecvMsgSize)), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(grpcMaxCallRecvMsgSize)), grpc.WithConnectParams(grpc.ConnectParams{ Backoff: gbackoff.Config{ BaseDelay: time.Second, @@ -207,7 +210,6 @@ func (a *connArray) Init(ctx context.Context) error { Timeout: 3 * time.Second, PermitWithoutStream: true, }), - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(128*1024*1024)), ) cancel() @@ -1063,6 +1065,17 @@ func (s *eventFeedSession) receiveFromStream( return nil } + size := cevent.Size() + if size > warnRecvMsgSizeThreshold { + regionCount := 0 + if cevent.ResolvedTs != nil { + regionCount = len(cevent.ResolvedTs.Regions) + } + log.Warn("change data event size too large", + zap.Int("size", size), zap.Int("event length", len(cevent.Events)), + zap.Int("resolved region count", regionCount)) + } + for _, event := range cevent.Events { state, ok := regionStates[event.RegionId] // Every region's range is locked before sending requests and 
unlocked after exiting, and the requestID diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index ec34baf79a0..da78b8303ea 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -15,7 +15,6 @@ package kv import ( "context" - "fmt" "net" "sync" "testing" @@ -91,10 +90,12 @@ func (s *mockChangeDataService) EventFeed(server cdcpb.ChangeData_EventFeedServe return nil } -func newMockService(c *check.C, port int, ch chan *cdcpb.ChangeDataEvent, wg *sync.WaitGroup) *grpc.Server { - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) +func newMockService(ctx context.Context, c *check.C, ch chan *cdcpb.ChangeDataEvent, wg *sync.WaitGroup) (grpcServer *grpc.Server, addr string) { + lc := &net.ListenConfig{} + lis, err := lc.Listen(ctx, "tcp", "127.0.0.1:0") c.Assert(err, check.IsNil) - grpcServer := grpc.NewServer() + addr = lis.Addr().String() + grpcServer = grpc.NewServer() mockService := &mockChangeDataService{c: c, ch: ch} cdcpb.RegisterChangeDataServer(grpcServer, mockService) wg.Add(1) @@ -103,7 +104,7 @@ func newMockService(c *check.C, port int, ch chan *cdcpb.ChangeDataEvent, wg *sy c.Assert(err, check.IsNil) wg.Done() }() - return grpcServer + return } type mockPDClient struct { @@ -124,9 +125,11 @@ func (m *mockPDClient) GetStore(ctx context.Context, storeID uint64) (*metapb.St // Use etcdSuite to workaround the race. See comments of `TestConnArray`. 
func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() wg := &sync.WaitGroup{} ch2 := make(chan *cdcpb.ChangeDataEvent, 10) - server2 := newMockService(c, 23376, ch2, wg) + server2, addr := newMockService(ctx, c, ch2, wg) defer func() { close(ch2) server2.Stop() @@ -141,16 +144,14 @@ func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) { kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) c.Assert(err, check.IsNil) - cluster.AddStore(1, "localhost:23375") - cluster.AddStore(2, "localhost:23376") + cluster.AddStore(1, "localhost:1") + cluster.AddStore(2, addr) cluster.Bootstrap(3, []uint64{1, 2}, []uint64{4, 5}, 4) lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage)) isPullInit := &mockPullerInit{} cdcClient, err := NewCDCClient(context.Background(), pdClient, kvStorage.(tikv.Storage), &security.Credential{}) c.Assert(err, check.IsNil) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() eventCh := make(chan *model.RegionFeedEvent, 10) wg.Add(1) go func() { @@ -199,6 +200,77 @@ func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) { cancel() } +func (s *etcdSuite) TestRecvLargeMessageSize(c *check.C) { + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + ch2 := make(chan *cdcpb.ChangeDataEvent, 10) + server2, addr := newMockService(ctx, c, ch2, wg) + defer func() { + close(ch2) + server2.Stop() + wg.Wait() + }() + // Cancel first, and then close the server. 
+ defer cancel() + + cluster := mocktikv.NewCluster() + mvccStore := mocktikv.MustNewMVCCStore() + rpcClient, pdClient, err := mocktikv.NewTiKVAndPDClient(cluster, mvccStore, "") + c.Assert(err, check.IsNil) + pdClient = &mockPDClient{Client: pdClient, version: util.MinTiKVVersion.String()} + kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) + c.Assert(err, check.IsNil) + + cluster.AddStore(2, addr) + cluster.Bootstrap(3, []uint64{2}, []uint64{4}, 4) + + lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage)) + isPullInit := &mockPullerInit{} + cdcClient, err := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), &security.Credential{}) + c.Assert(err, check.IsNil) + eventCh := make(chan *model.RegionFeedEvent, 10) + wg.Add(1) + go func() { + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, false, lockresolver, isPullInit, eventCh) + c.Assert(errors.Cause(err), check.Equals, context.Canceled) + wg.Done() + }() + + var event *model.RegionFeedEvent + select { + case event = <-eventCh: + case <-time.After(time.Second): + c.Fatalf("recving message takes too long") + } + c.Assert(event, check.NotNil) + + largeValSize := 128*1024*1024 + 1 // 128MB + 1 + largeMsg := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ + { + RegionId: 3, + Event: &cdcpb.Event_Entries_{ + Entries: &cdcpb.Event_Entries{ + Entries: []*cdcpb.Event_Row{{ + Type: cdcpb.Event_COMMITTED, + OpType: cdcpb.Event_Row_PUT, + Key: []byte("a"), + Value: make([]byte, largeValSize), + CommitTs: 2, // ResolvedTs = 1 + }}, + }, + }, + }, + }} + ch2 <- largeMsg + select { + case event = <-eventCh: + case <-time.After(30 * time.Second): // Sending an object larger than 128MB may take a long time. 
+ c.Fatalf("recving message takes too long") + } + c.Assert(len(event.Val.Value), check.Equals, largeValSize) + cancel() +} + // TODO enable the test func (s *etcdSuite) TodoTestIncompatibleTiKV(c *check.C) { cluster := mocktikv.NewCluster() diff --git a/go.mod b/go.mod index c45428de3f1..6e5a8b53585 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 github.com/pingcap/errors v0.11.5-0.20200902104258-eba4f1d8f6de github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce - github.com/pingcap/kvproto v0.0.0-20200818080353-7aaed8998596 + github.com/pingcap/kvproto v0.0.0-20200909045102-2ac90648531b github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463 github.com/pingcap/parser v0.0.0-20200911054040-258297116c4b github.com/pingcap/tidb v1.1.0-beta.0.20200911063238-51d365fc45fd diff --git a/go.sum b/go.sum index 5400fbb71c8..c0ab7629289 100644 --- a/go.sum +++ b/go.sum @@ -482,6 +482,8 @@ github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29/go.mod h1:IOdRDPLy github.com/pingcap/kvproto v0.0.0-20200706115936-1e0910aabe6c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20200818080353-7aaed8998596 h1:1cRjX7+yHQiE4pV/xwB8XcbZXV9sHshWMNTd5I6SS2o= github.com/pingcap/kvproto v0.0.0-20200818080353-7aaed8998596/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20200909045102-2ac90648531b h1:pqOXTOat/yDzc/THrkXx2YgAFfQenEvmn6ub6iEQFfo= +github.com/pingcap/kvproto v0.0.0-20200909045102-2ac90648531b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad h1:SveG82rmu/GFxYanffxsSF503SiQV+2JLnWEiGiF+Tc=