Skip to content

Commit

Permalink
Fix mysql sink may be blocked by network io wait (#825)
Browse files Browse the repository at this point in the history
* Fix mysql sink may be blocked by network io wait

* add integration test

* fix integration test

* fix integration test

* fix integration test

* address comment

* skip kafka in sink_hang test
  • Loading branch information
amyangfei authored Aug 6, 2020
1 parent 69ab1f3 commit e0dcfb7
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 5 deletions.
28 changes: 23 additions & 5 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,10 +420,22 @@ func (s *mysqlSink) createSinkWorkers(ctx context.Context) {
}
}

func (s *mysqlSink) notifyAndWaitExec() {
func (s *mysqlSink) notifyAndWaitExec(ctx context.Context) {
s.notifier.Notify()
for _, w := range s.workers {
w.waitAllTxnsExecuted()
done := make(chan struct{})
go func() {
for _, w := range s.workers {
w.waitAllTxnsExecuted()
}
close(done)
}()
// This is a hack code to avoid io wait in some routine blocks others to exit.
// As the network io wait is blocked in kernel code, the goroutine is in a
// D-state that we could not even stop it by cancel the context. So if this
// scenario happens, the blocked goroutine will be leak.
select {
case <-ctx.Done():
case <-done:
}
}

Expand Down Expand Up @@ -463,7 +475,7 @@ func (s *mysqlSink) dispatchAndExecTxns(ctx context.Context, txnsGroup map[model
sendFn(txn, idx)
return
}
s.notifyAndWaitExec()
s.notifyAndWaitExec(ctx)
causality.reset()
}
sendFn(txn, rowsChIdx)
Expand All @@ -477,7 +489,7 @@ func (s *mysqlSink) dispatchAndExecTxns(ctx context.Context, txnsGroup map[model
s.metricConflictDetectDurationHis.Observe(time.Since(startTime).Seconds())
}
}
s.notifyAndWaitExec()
s.notifyAndWaitExec(ctx)
}

type mysqlSinkWorker struct {
Expand Down Expand Up @@ -609,6 +621,9 @@ func (s *mysqlSink) execDMLWithMaxRetries(
failpoint.Inject("MySQLSinkTxnRandomError", func() {
failpoint.Return(checkTxnErr(errors.Trace(dmysql.ErrInvalidConn)))
})
failpoint.Inject("MySQLSinkHangLongTime", func() {
time.Sleep(time.Hour)
})
err := s.statistics.RecordBatchExecution(func() (int, error) {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
Expand Down Expand Up @@ -691,6 +706,9 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, replicaID uint64,
}

func (s *mysqlSink) execDMLs(ctx context.Context, rows []*model.RowChangedEvent, replicaID uint64, bucket int) error {
failpoint.Inject("MySQLSinkExecDMLError", func() {
failpoint.Return(errors.Trace(dmysql.ErrInvalidConn))
})
dmls, err := s.prepareDMLs(rows, replicaID, bucket)
if err != nil {
return errors.Trace(err)
Expand Down
27 changes: 27 additions & 0 deletions tests/sink_hang/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# diff Configuration.

log-level = "info"
chunk-size = 10
check-thread-count = 4
sample-percent = 100
use-rowid = false
use-checksum = true
fix-sql-file = "fix.sql"

# tables need to check.
[[check-tables]]
schema = "sink_hang"
tables = ["~t.*"]

[[source-db]]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""
instance-id = "source-1"

[target-db]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
70 changes: 70 additions & 0 deletions tests/sink_hang/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#!/bin/bash

set -e

CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

CDC_COUNT=3
DB_COUNT=4
MAX_RETRIES=10

function check_changefeed_state() {
pd_addr=$1
changefeed_id=$2
expected=$3
state=$(cdc cli --pd=$pd_addr changefeed query -s -c $changefeed_id|jq -r ".state")
if [[ "$state" != "$expected" ]];then
echo "unexpected state $state, expected $expected"
exit 1
fi
}

export -f check_changefeed_state

function run() {
# kafka is not supported yet.
if [ "$SINK_TYPE" == "kafka" ]; then
return
fi

rm -rf $WORK_DIR && mkdir -p $WORK_DIR
start_tidb_cluster --workdir $WORK_DIR
cd $WORK_DIR

pd_addr="http://$UP_PD_HOST:$UP_PD_PORT"
TOPIC_NAME="ticdc-sink-hang-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";;
*) SINK_URI="mysql://[email protected]:3306/?max-txn-row=1";;
esac
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4"
fi

export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/sink/MySQLSinkHangLongTime=1*return(true);github.com/pingcap/ticdc/cdc/sink/MySQLSinkExecDMLError=1*return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')

run_sql "CREATE DATABASE sink_hang;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "CREATE table sink_hang.t1(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "CREATE table sink_hang.t2(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "BEGIN; INSERT INTO sink_hang.t1 VALUES (),(),(); INSERT INTO sink_hang.t2 VALUES (),(),(); COMMIT" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "stopped"
cdc cli changefeed resume --changefeed-id=$changefeed_id --pd=$pd_addr
ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "normal"

check_table_exists "sink_hang.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "sink_hang.t2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"

0 comments on commit e0dcfb7

Please sign in to comment.