Skip to content

Commit

Permalink
mysql sink: add a batch size limiter (#336)
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro authored Mar 12, 2020
1 parent 863f8ea commit 63b1db9
Show file tree
Hide file tree
Showing 2 changed files with 387 additions and 289 deletions.
47 changes: 45 additions & 2 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
)

// Defaults for the sink parameters; values parsed from the sink URI override them.
const (
	// defaultWorkerCount is the default value of params.workerCount.
	defaultWorkerCount = 16
	// defaultMaxTxnRow is the default value of params.maxTxnRow, the row
	// threshold used when splitting a row group into batches.
	defaultMaxTxnRow = 256
)

var (
printStatusInterval = 30 * time.Second
Expand Down Expand Up @@ -228,12 +229,14 @@ var _ Sink = &mysqlSink{}

// params bundles the tunable settings of the MySQL sink.
type params struct {
	// workerCount is presumably the number of concurrent workers (confirm
	// against concurrentExec); maxTxnRow is the row threshold handed to
	// rowLimitIterator when a row group is split into batches.
	workerCount, maxTxnRow int
	// changefeedID and captureID presumably identify the owning changefeed
	// and capture — assumed from the names, confirm against the caller.
	changefeedID, captureID string
}

// defaultParams is the baseline sink configuration: the worker count and the
// per-batch row limit take their package defaults, while changefeedID and
// captureID are left at their zero values.
var defaultParams = params{
	maxTxnRow:   defaultMaxTxnRow,
	workerCount: defaultWorkerCount,
}

func configureSinkURI(dsnCfg *dmysql.Config) (string, error) {
Expand Down Expand Up @@ -272,6 +275,14 @@ func newMySQLSink(sinkURI *url.URL, dsn *dmysql.Config, filter *util.Filter, opt
}
params.workerCount = c
}
s = sinkURI.Query().Get("max-txn-row")
if s != "" {
c, err := strconv.Atoi(s)
if err != nil {
return nil, errors.Trace(err)
}
params.maxTxnRow = c
}
// dsn format of the driver:
// [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
username := sinkURI.User.Username()
Expand Down Expand Up @@ -336,8 +347,12 @@ func (s *mysqlSink) concurrentExec(ctx context.Context, rowGroups map[string][]*
for i := 0; i < nWorkers; i++ {
eg.Go(func() error {
for rows := range jobs {
// TODO: Add retry
if err := s.execDMLs(ctx, rows); err != nil {
err := rowLimitIterator(rows, s.params.maxTxnRow,
func(rows []*model.RowChangedEvent) error {
// TODO: Add retry
return errors.Trace(s.execDMLs(ctx, rows))
})
if err != nil {
return errors.Trace(err)
}
}
Expand All @@ -347,6 +362,34 @@ func (s *mysqlSink) concurrentExec(ctx context.Context, rowGroups map[string][]*
return eg.Wait()
}

// rowLimitIterator splits rows into consecutive batches and invokes fn once
// per batch, in order, stopping at the first error.
//
// Every batch except possibly the last contains at least maxTxnRow rows. A
// batch is extended beyond maxTxnRow while the next row's Ts is not greater
// than the Ts of the row at the nominal batch boundary, so rows sharing that
// Ts are never split across two fn calls. This assumes rows are ordered by
// ascending Ts — confirm against the caller.
//
// A non-positive maxTxnRow (for example "max-txn-row=0" in the sink URI)
// disables the limit: all rows are passed to fn in a single call. Previously
// such a value caused an index-out-of-range panic on rows[end-1].
//
// fn is never called with an empty slice; empty rows yields nil immediately.
// Errors returned by fn are wrapped with errors.Trace.
func rowLimitIterator(rows []*model.RowChangedEvent, maxTxnRow int, fn func([]*model.RowChangedEvent) error) error {
	if len(rows) == 0 {
		return nil
	}
	if maxTxnRow <= 0 {
		// No usable limit: hand over everything as one batch instead of
		// indexing rows with a non-positive boundary.
		return errors.Trace(fn(rows))
	}

	start := 0
	end := maxTxnRow
	for end < len(rows) {
		// Grow the batch until the Ts strictly increases, keeping rows with
		// the boundary Ts together.
		lastTs := rows[end-1].Ts
		for ; end < len(rows); end++ {
			if lastTs < rows[end].Ts {
				break
			}
		}
		if err := fn(rows[start:end]); err != nil {
			return errors.Trace(err)
		}
		start = end
		end += maxTxnRow
	}
	// Flush the trailing rows that did not fill a whole batch.
	if start < len(rows) {
		if err := fn(rows[start:]); err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}

// Close is currently a no-op: it releases nothing and always returns nil.
func (s *mysqlSink) Close() error {
return nil
}

func (s *mysqlSink) PrintStatus(ctx context.Context) error {
lastTime := time.Now()
var lastCount int64
Expand Down
Loading

0 comments on commit 63b1db9

Please sign in to comment.