Skip to content

Commit

Permalink
owner: fix memory accumulated when owner consume etcd update slow (#1224
Browse files Browse the repository at this point in the history
)
  • Loading branch information
amyangfei authored Dec 18, 2020
1 parent e620b5f commit 04e0284
Showing 1 changed file with 5 additions and 2 deletions.
7 changes: 5 additions & 2 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,8 @@ func (o *Owner) watchFeedChange(ctx context.Context) chan struct{} {
return
default:
}
wch := o.etcdClient.Client.Watch(ctx, kv.TaskPositionKeyPrefix, clientv3.WithFilterDelete(), clientv3.WithPrefix())
cctx, cancel := context.WithCancel(ctx)
wch := o.etcdClient.Client.Watch(cctx, kv.TaskPositionKeyPrefix, clientv3.WithFilterDelete(), clientv3.WithPrefix())

for resp := range wch {
if resp.Err() != nil {
Expand All @@ -1108,10 +1109,12 @@ func (o *Owner) watchFeedChange(ctx context.Context) chan struct{} {
// operations should be resolved in future release.

select {
case <-ctx.Done():
case output <- struct{}{}:
default:
// in case output channel is full, just ignore this event
}
}
cancel()
}
}()
return output
Expand Down

0 comments on commit 04e0284

Please sign in to comment.