diff --git a/cdc/owner.go b/cdc/owner.go index b77da46d1bb..7cbfb90370c 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -1095,7 +1095,8 @@ func (o *Owner) watchFeedChange(ctx context.Context) chan struct{} { return default: } - wch := o.etcdClient.Client.Watch(ctx, kv.TaskPositionKeyPrefix, clientv3.WithFilterDelete(), clientv3.WithPrefix()) + cctx, cancel := context.WithCancel(ctx) + wch := o.etcdClient.Client.Watch(cctx, kv.TaskPositionKeyPrefix, clientv3.WithFilterDelete(), clientv3.WithPrefix()) for resp := range wch { if resp.Err() != nil { @@ -1108,10 +1109,12 @@ func (o *Owner) watchFeedChange(ctx context.Context) chan struct{} { // operations should be resolved in future release. select { - case <-ctx.Done(): case output <- struct{}{}: + default: + // in case output channel is full, just ignore this event } } + cancel() } }() return output