Skip to content

Commit

Permalink
sorter: avoid blocking in thread pool (#1574) (#1600)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Apr 1, 2021
1 parent e1e3416 commit 35a6d8b
Show file tree
Hide file tree
Showing 3 changed files with 301 additions and 21 deletions.
181 changes: 167 additions & 14 deletions cdc/puller/sorter/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,31 @@ import (
"context"
"math"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/edwingeng/deque"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
cerrors "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/notify"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/store/tikv/oracle"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out chan *model.PolymorphicEvent, onExit func()) error {
// TODO refactor this into a struct Merger.
func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out chan *model.PolymorphicEvent, onExit func(), bufLen *int64) error {
// TODO remove bufLenPlaceholder when refactoring
if bufLen == nil {
var bufLenPlaceholder int64
bufLen = &bufLenPlaceholder
}

captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
_, tableName := util.TableIDFromCtx(ctx)
Expand All @@ -46,11 +58,13 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch

lastResolvedTs := make([]uint64, numSorters)
minResolvedTs := uint64(0)

taskBuf := newTaskBuffer(bufLen)
var workingSet map[*flushTask]struct{}
pendingSet := make(map[*flushTask]*model.PolymorphicEvent)

defer func() {
log.Info("Unified Sorter: merger exiting, cleaning up resources", zap.Int("pending-set-size", len(pendingSet)))
taskBuf.setClosed()
// cancel pending async IO operations.
onExit()
cleanUpTask := func(task *flushTask) {
Expand All @@ -74,12 +88,30 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch
_ = printError(task.dealloc())
}

for {
task, err := taskBuf.get(ctx)
if err != nil {
_ = printError(err)
break
}

if task == nil {
log.Debug("Merger exiting, taskBuf is exhausted")
break
}

cleanUpTask(task)
}

for task := range pendingSet {
cleanUpTask(task)
}
for task := range workingSet {
cleanUpTask(task)
}

taskBuf.close()
log.Info("Merger has exited")
}()

lastOutputTs := uint64(0)
Expand Down Expand Up @@ -351,14 +383,43 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch
resolveTicker := time.NewTicker(1 * time.Second)
defer resolveTicker.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case task := <-in:
errg, ctx := errgroup.WithContext(ctx)
errg.Go(func() error {
for {
var task *flushTask
select {
case <-ctx.Done():
return ctx.Err()
case task = <-in:
}

if task == nil {
tableID, tableName := util.TableIDFromCtx(ctx)
log.Info("Merger input channel closed, exiting",
log.Debug("Merger input channel closed, exiting",
zap.Int64("table-id", tableID),
zap.String("table-name", tableName))
return nil
}

taskBuf.put(task)
}
})

errg.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
task, err := taskBuf.get(ctx)
if err != nil {
return errors.Trace(err)
}

if task == nil {
tableID, tableName := util.TableIDFromCtx(ctx)
log.Debug("Merger buffer exhausted and is closed, exiting",
zap.Int64("table-id", tableID),
zap.String("table-name", tableName),
zap.Uint64("max-output", minResolvedTs))
Expand Down Expand Up @@ -387,17 +448,21 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch
return errors.Trace(err)
}
}
case <-resolveTicker.C:
err := sendResolvedEvent(minResolvedTs)
if err != nil {
return errors.Trace(err)
}
}
}
})

return errg.Wait()
}

func mergerCleanUp(in <-chan *flushTask) {
for task := range in {
select {
case err := <-task.finished:
_ = printError(err)
default:
break
}

if task.reader != nil {
_ = printError(task.reader.resetAndClose())
}
Expand All @@ -417,3 +482,91 @@ func printError(err error) error {
}
return err
}

// taskBuffer is used to store pending flushTasks.
// The design purpose is to reduce the backpressure caused by a congested output chan of the merger,
// so that heapSorter does not block.
type taskBuffer struct {
mu sync.Mutex // mu only protects queue
queue deque.Deque

notifier notify.Notifier
len *int64
isClosed int32
}

func newTaskBuffer(len *int64) *taskBuffer {
return &taskBuffer{
queue: deque.NewDeque(),
notifier: notify.Notifier{},
len: len,
}
}

func (b *taskBuffer) put(task *flushTask) {
b.mu.Lock()
defer b.mu.Unlock()

b.queue.PushBack(task)
prevCount := atomic.AddInt64(b.len, 1)

if prevCount == 1 {
b.notifier.Notify()
}
}

func (b *taskBuffer) get(ctx context.Context) (*flushTask, error) {
if atomic.LoadInt32(&b.isClosed) == 1 && atomic.LoadInt64(b.len) == 0 {
return nil, nil
}

if atomic.LoadInt64(b.len) == 0 {
recv, err := b.notifier.NewReceiver(time.Millisecond * 50)
if err != nil {
return nil, errors.Trace(err)
}
defer recv.Stop()

startTime := time.Now()
for atomic.LoadInt64(b.len) == 0 {
select {
case <-ctx.Done():
return nil, errors.Trace(ctx.Err())
case <-recv.C:
// Note that there can be spurious wake-ups
}

if atomic.LoadInt32(&b.isClosed) == 1 && atomic.LoadInt64(b.len) == 0 {
return nil, nil
}

if time.Since(startTime) > time.Second*5 {
log.Debug("taskBuffer reading blocked for too long", zap.Duration("duration", time.Since(startTime)))
}
}
}

postCount := atomic.AddInt64(b.len, -1)
if postCount < 0 {
log.Panic("taskBuffer: len < 0, report a bug", zap.Int64("len", postCount))
}

b.mu.Lock()
defer b.mu.Unlock()

ret := b.queue.PopFront()
if ret == nil {
log.Panic("taskBuffer: PopFront() returned nil, report a bug")
}

return ret.(*flushTask), nil
}

func (b *taskBuffer) setClosed() {
atomic.SwapInt32(&b.isClosed, 1)
}

// Only call this when the taskBuffer is NEVER going to be accessed again.
func (b *taskBuffer) close() {
b.notifier.Close()
}
Loading

0 comments on commit 35a6d8b

Please sign in to comment.