dep(*): move code of sync_diff_inspector from tidb-tools to tiflow #11671

Status: Open. Wants to merge 18 commits into base branch `master`.
6 changes: 3 additions & 3 deletions .github/workflows/dm_binlog_999999.yaml
Original file line number Diff line number Diff line change
@@ -44,13 +44,13 @@ jobs:
           key: ${{ runner.os }}-ticdc-tools-${{ hashFiles('tools/check/go.sum') }}

       - name: Build DM binary
-        run: make dm_integration_test_build
+        run: |
+          make dm_integration_test_build
+          make sync_diff_inspector

       - name: Setup CI environment
         run: |
           docker-compose -f ./dm/tests/binlog_999999/docker-compose.yml up -d
-          curl http://download.pingcap.org/tidb-enterprise-tools-nightly-linux-amd64.tar.gz | tar xz
-          mv tidb-enterprise-tools-nightly-linux-amd64/bin/sync_diff_inspector bin/
           curl http://download.pingcap.org/tidb-nightly-linux-amd64.tar.gz | tar xz
           mv tidb-nightly-linux-amd64/bin/tidb-server bin/
           curl -O https://dl.min.io/server/minio/release/linux-amd64/minio
9 changes: 4 additions & 5 deletions Makefile
@@ -1,5 +1,5 @@
 ### Makefile for tiflow
-.PHONY: build test check clean fmt cdc kafka_consumer storage_consumer coverage \
+.PHONY: build test check clean fmt sync_diff_inspector cdc kafka_consumer storage_consumer coverage \
 	integration_test_build integration_test integration_test_mysql integration_test_kafka bank \
 	kafka_docker_integration_test kafka_docker_integration_test_with_build \
 	clean_integration_test_containers \
@@ -159,6 +159,9 @@ build-cdc-with-failpoint: ## Build cdc with failpoint enabled.
 cdc:
 	$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd/cdc

+sync_diff_inspector:
+	$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/sync_diff_inspector ./sync_diff_inspector/main.go
+
 kafka_consumer:
 	$(CONSUMER_GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_kafka_consumer ./cmd/kafka-consumer

@@ -566,7 +569,6 @@ check_third_party_binary_for_engine:
 	@which mysql || (echo "mysql not found in ${PATH}"; exit 1)
 	@which jq || (echo "jq not found in ${PATH}"; exit 1)
 	@which mc || (echo "mc not found in ${PATH}, you can use 'make bin/mc' and move bin/mc to ${PATH}"; exit 1)
-	@which bin/sync_diff_inspector || (echo "run 'make bin/sync_diff_inspector' to download it if you need")

 check_engine_integration_test:
 	./engine/test/utils/check_case.sh
@@ -581,9 +583,6 @@ check_cdc_integration_test:
 bin/mc:
 	./scripts/download-mc.sh

-bin/sync_diff_inspector:
-	./scripts/download-sync-diff.sh
-
 define run_engine_unit_test
 	@echo "running unit test for packages:" $(1)
 	mkdir -p $(ENGINE_TEST_DIR)
3 changes: 1 addition & 2 deletions dm/tests/README.md
@@ -4,7 +4,6 @@
 1. The following executables must be copied or generated or linked into these locations.

    * `bin/tidb-server` can be downloaded from [tidb-master-linux-amd64](https://download.pingcap.org/tidb-master-linux-amd64.tar.gz) or installed by [tiup](https://github.com/pingcap/tiup), you can use the command `find ~/.tiup -name tidb-server` to locate `tidb-server` binary file and copy it
-   * `bin/sync_diff_inspector` # can be downloaded from [tidb-enterprise-tools-latest-linux-amd64](http://download.pingcap.org/tidb-enterprise-tools-latest-linux-amd64.tar.gz) or build from [source code](https://github.com/pingcap/tidb-tools)
    * `bin/minio` can be build from (https://github.com/minio/minio)
    * `bin/dm-master.test` # generated by `make dm_integration_test_build`
    * `bin/dm-worker.test` # generated by `make dm_integration_test_build`
@@ -32,7 +31,7 @@

 ### Integration Test

-1. Run `make dm_integration_test_build` to generate DM related binary for integration test
+1. Run `make dm_integration_test_build` and `make sync_diff_inspector` to generate DM related binary for integration test.

 2. Setup two MySQL servers (the first one: 5.6 ~ 5.7; the second one: 8.0.21, suggest you are same as [CI](https://github.com/PingCAP-QE/ci/blob/main/jenkins/pipelines/ci/dm/dm_ghpr_new_test.groovy#L164-L172)) with [binlog enabled first](https://dev.mysql.com/doc/refman/5.7/en/replication-howto-masterbaseconfig.html) and [set `GTID_MODE=ON`](https://dev.mysql.com/doc/refman/5.7/en/replication-mode-change-online-enable-gtids.html), You need set the mysql port and root password according to the following table.

3 changes: 2 additions & 1 deletion go.mod
@@ -62,6 +62,7 @@ require (
 	github.com/mailru/easyjson v0.7.7
 	github.com/mattn/go-shellwords v1.0.12
 	github.com/modern-go/reflect2 v1.0.2
+	github.com/olekukonko/tablewriter v0.0.5
 	github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
 	github.com/pierrec/lz4/v4 v4.1.18
 	github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0
@@ -345,7 +346,7 @@ require (
 	github.com/shurcooL/httpfs v0.0.0-20230704072500-f1e31cf0ba5c // indirect
 	github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 // indirect
 	github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546 // indirect
-	github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 // indirect
+	github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726

Review comment (Contributor): Please move it to the direct `require` group to keep go.mod tidy.

Author @joechenrh replied on Oct 25, 2024.

 	github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 // indirect
 	github.com/sirupsen/logrus v1.9.3 // indirect
 	github.com/spaolacci/murmur3 v1.1.0 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -830,6 +830,8 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA
 github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
 github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
 github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
+github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
+github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
 github.com/onsi/ginkgo v1.13.0/go.mod h1:+REjRxOmWfHCjfv9TTWB1jD1Frx4XydAD3zm1lskyM0=
23 changes: 23 additions & 0 deletions sync_diff_inspector/README.md
@@ -0,0 +1,23 @@
# sync-diff-inspector

sync-diff-inspector is a tool for comparing the data in two databases.

## How to use

```shell
Usage of diff:
-V, --version print version of sync_diff_inspector
-L, --log-level string log level: debug, info, warn, error, fatal (default "info")
-C, --config string Config file
-T, --template string <dm|norm> export a template config file
--dm-addr string the address of DM
--dm-task string identifier of dm task
--check-thread-count int how many goroutines are created to check data (default 4)
--export-fix-sql set true if want to compare rows or set to false will only compare checksum (default true)
```

For more details you can read the [config.toml](./config/config.toml), [config_sharding.toml](./config/config_sharding.toml) and [config_dm.toml](./config/config_dm.toml).

## Documents
- `zh`: [Overview in Chinese](https://docs.pingcap.com/zh/tidb/stable/sync-diff-inspector-overview)
- `en`: [Overview in English](https://docs.pingcap.com/tidb/stable/sync-diff-inspector-overview)
251 changes: 251 additions & 0 deletions sync_diff_inspector/checkpoints/checkpoints.go
@@ -0,0 +1,251 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package checkpoints

import (
"container/heap"
"context"
"encoding/json"
"os"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/sync_diff_inspector/chunk"
"github.com/pingcap/tiflow/sync_diff_inspector/config"
"github.com/pingcap/tiflow/sync_diff_inspector/report"
"github.com/siddontang/go/ioutil2"
"go.uber.org/zap"
)

const (
// SuccessState means:
// for a chunk: the chunk's data is equal
// for a table: every chunk in the table is equal (ignored chunks excepted)
SuccessState = "success"

// FailedState means:
// for a chunk: the chunk's data is not equal
// for a table: some chunk's data is not equal, or the check of some chunk failed
FailedState = "failed"

// IgnoreState means:
// for a chunk: the chunk is ignored (when a chunk is empty, some chunks are ignored)
// for a table: tables do not have this state
IgnoreState = "ignore"
)

// Node records the check state of one chunk; it is the element stored in the checkpoint heap.
type Node struct {
State string `json:"state"` // state of the chunk: "success", "failed", or "ignore"

ChunkRange *chunk.Range `json:"chunk-range"`
IndexID int64 `json:"index-id"`
}

// GetID returns id from the node
func (n *Node) GetID() *chunk.CID { return n.ChunkRange.Index }

// GetState returns the state from the node
func (n *Node) GetState() string { return n.State }

// GetTableIndex returns table index
func (n *Node) GetTableIndex() int { return n.ChunkRange.Index.TableIndex }

// GetBucketIndexLeft returns BucketIndexLeft
func (n *Node) GetBucketIndexLeft() int { return n.ChunkRange.Index.BucketIndexLeft }

// GetBucketIndexRight returns BucketIndexRight
func (n *Node) GetBucketIndexRight() int { return n.ChunkRange.Index.BucketIndexRight }

// GetChunkIndex returns ChunkIndex
func (n *Node) GetChunkIndex() int { return n.ChunkRange.Index.ChunkIndex }

// IsAdjacent reports whether next immediately follows n in the global chunk order.
// This is the key logic for checkpoint updates:
// nodes must be saved to the checkpoint in global order.
func (n *Node) IsAdjacent(next *Node) bool {
if n.GetTableIndex() == next.GetTableIndex()-1 {
if n.ChunkRange.IsLastChunkForTable() && next.ChunkRange.IsFirstChunkForTable() {
return true
}
return false
}
if n.GetTableIndex() == next.GetTableIndex() {
// same table
if n.GetBucketIndexRight() == next.GetBucketIndexLeft()-1 {
if n.ChunkRange.IsLastChunkForBucket() && next.ChunkRange.IsFirstChunkForBucket() {
return true
}
return false
}
if n.GetBucketIndexLeft() == next.GetBucketIndexLeft() {
return n.GetChunkIndex() == next.GetChunkIndex()-1
}
return false
}
return false
}
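
Reviewer note: the three cases in `IsAdjacent` (table boundary, bucket boundary, same bucket) can be exercised in isolation. The sketch below is a minimal illustration, not the PR's code: `pos` and `adjacent` are hypothetical names, and the boolean fields are an assumed flattening of what `IsFirstChunkForTable`, `IsLastChunkForTable`, `IsFirstChunkForBucket`, and `IsLastChunkForBucket` report on `chunk.Range`.

```go
package main

import "fmt"

// pos is a hypothetical, flattened stand-in for the fields that IsAdjacent
// reads through chunk.Range and chunk.CID; it is not the real type.
type pos struct {
	table, bucketLeft, bucketRight, chunk int
	firstInTable, lastInTable             bool
	firstInBucket, lastInBucket           bool
}

// adjacent mirrors the three cases of IsAdjacent for the simplified type.
func adjacent(a, b pos) bool {
	switch {
	case a.table == b.table-1:
		// Table boundary: last chunk of one table, first chunk of the next.
		return a.lastInTable && b.firstInTable
	case a.table != b.table:
		return false
	case a.bucketRight == b.bucketLeft-1:
		// Bucket boundary inside one table.
		return a.lastInBucket && b.firstInBucket
	case a.bucketLeft == b.bucketLeft:
		// Same bucket: chunk indexes must be consecutive.
		return a.chunk == b.chunk-1
	default:
		return false
	}
}

func main() {
	lastOfT0 := pos{table: 0, bucketLeft: 4, bucketRight: 4, chunk: 1, lastInTable: true, lastInBucket: true}
	firstOfT1 := pos{table: 1, firstInTable: true, firstInBucket: true}
	secondOfT1 := pos{table: 1, chunk: 1, lastInBucket: true}

	fmt.Println(adjacent(lastOfT0, firstOfT1))   // crosses a table boundary
	fmt.Println(adjacent(firstOfT1, secondOfT1)) // same bucket, next chunk
	fmt.Println(adjacent(lastOfT0, secondOfT1))  // skips a chunk
}
```

A table-driven unit test along these lines would likely cover the `return false` branches that the coverage check reported as untested.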

// IsLess reports whether n sorts before next in the global chunk order.
func (n *Node) IsLess(next *Node) bool {
if n.GetTableIndex() < next.GetTableIndex() {
return true
}
if n.GetTableIndex() == next.GetTableIndex() {
if n.GetBucketIndexLeft() <= next.GetBucketIndexLeft()-1 {
return true
}
if n.GetBucketIndexLeft() == next.GetBucketIndexLeft() {
return n.GetChunkIndex() < next.GetChunkIndex()
}
return false
}
return false
}
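
Reviewer note: for integer indexes, `IsLess` amounts to a lexicographic comparison on (table index, left bucket index, chunk index), since `a <= b-1` is the same as `a < b`. A minimal sketch of that reading follows; `key`, `less`, and `sortedKeys` are hypothetical names introduced only for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// key is a hypothetical stand-in for the three fields that IsLess compares.
type key struct {
	Table, BucketLeft, Chunk int
}

// less orders keys lexicographically by (Table, BucketLeft, Chunk).
func less(a, b key) bool {
	if a.Table != b.Table {
		return a.Table < b.Table
	}
	if a.BucketLeft != b.BucketLeft {
		return a.BucketLeft < b.BucketLeft
	}
	return a.Chunk < b.Chunk
}

// sortedKeys returns a sorted copy and leaves the input untouched.
func sortedKeys(in []key) []key {
	out := append([]key(nil), in...)
	sort.Slice(out, func(i, j int) bool { return less(out[i], out[j]) })
	return out
}

func main() {
	keys := []key{{1, 0, 0}, {0, 2, 1}, {0, 2, 0}, {0, 1, 3}}
	fmt.Println(sortedKeys(keys))
}
```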

// nodeHeap maintains a min-heap of nodes. It can be accessed by multiple goroutines and is protected by a mutex.
type nodeHeap struct {
Nodes []*Node
CurrentSavedNode *Node // CurrentSavedNode holds the minimum checked chunk; it is updated by the `GetChunkSnapshot` method
mu *sync.Mutex // protects the critical section
}

// Checkpoint provides the ability to restart the sync-diff process from the
// most recent exit point (whether caused by an error or stopped intentionally).
type Checkpoint struct {
hp *nodeHeap
}

// SavedState contains the latest checked chunk and the state of `report`.
// When sync-diff starts from a checkpoint, it loads this information and continues running.
type SavedState struct {
Chunk *Node `json:"chunk-info"`
Report *report.Report `json:"report-info"`
}

// InitCurrentSavedID sets the saved node without taking the lock; call it only during initialization.
func (cp *Checkpoint) InitCurrentSavedID(n *Node) {
cp.hp.CurrentSavedNode = n
}

// GetCurrentSavedID returns the saved id with lock
func (cp *Checkpoint) GetCurrentSavedID() *Node {
cp.hp.mu.Lock()
defer cp.hp.mu.Unlock()
return cp.hp.CurrentSavedNode
}

// Insert inserts a new node
func (cp *Checkpoint) Insert(node *Node) {
cp.hp.mu.Lock()
heap.Push(cp.hp, node)
cp.hp.mu.Unlock()
}

// Len gets the length of the heap
func (hp *nodeHeap) Len() int { return len(hp.Nodes) }

// Less reports whether node i has higher priority (sorts earlier) than node j.
func (hp *nodeHeap) Less(i, j int) bool {
return hp.Nodes[i].IsLess(hp.Nodes[j])
}

// Swap implementation of swap for the heap interface
func (hp *nodeHeap) Swap(i, j int) {
hp.Nodes[i], hp.Nodes[j] = hp.Nodes[j], hp.Nodes[i]
}

// Push implementation of push for the heap interface
func (hp *nodeHeap) Push(x interface{}) {
hp.Nodes = append(hp.Nodes, x.(*Node))
}

// Pop implementation of pop for heap interface
func (hp *nodeHeap) Pop() (item interface{}) {
if len(hp.Nodes) == 0 {
return
}

hp.Nodes, item = hp.Nodes[:len(hp.Nodes)-1], hp.Nodes[len(hp.Nodes)-1]
return
}
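
Reviewer note: `Len`, `Less`, `Swap`, `Push`, and `Pop` together satisfy `heap.Interface` from the standard `container/heap` package, which is what lets `heap.Push` and `heap.Pop` keep the smallest node at index 0. The sketch below shows the same pattern on plain integers; `intHeap` and `drain` are hypothetical names, and the mutex used by `nodeHeap` is left out for brevity.

```go
package main

import (
	"container/heap"
	"fmt"
)

// intHeap is a hypothetical min-heap of ints implementing heap.Interface.
type intHeap []int

func (h intHeap) Len() int           { return len(h) }
func (h intHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h intHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

// Push appends to the end; heap.Push then restores the heap order.
func (h *intHeap) Push(x any) { *h = append(*h, x.(int)) }

// Pop removes the last element; heap.Pop has already moved the minimum there.
func (h *intHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// drain pushes all values and pops them back in ascending order.
func drain(vals ...int) []int {
	h := &intHeap{}
	for _, v := range vals {
		heap.Push(h, v)
	}
	out := make([]int, 0, h.Len())
	for h.Len() > 0 {
		out = append(out, heap.Pop(h).(int))
	}
	return out
}

func main() {
	fmt.Println(drain(5, 1, 4, 2, 3))
}
```

Callers should go through `heap.Push` and `heap.Pop` rather than the methods directly, because the methods alone do not maintain the heap invariant.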

// Init initializes the Checkpoint.
func (cp *Checkpoint) Init() {
hp := &nodeHeap{
mu: &sync.Mutex{},
Nodes: make([]*Node, 0),
CurrentSavedNode: &Node{
ChunkRange: &chunk.Range{
Index: chunk.GetInitCID(),
IsFirst: true,
IsLast: true,
},
},
}
heap.Init(hp)
cp.hp = hp
}

// GetChunkSnapshot returns the last node of the contiguous run of checked chunks,
// or nil if the run did not advance since the previous call.
func (cp *Checkpoint) GetChunkSnapshot() (cur *Node) {
cp.hp.mu.Lock()
defer cp.hp.mu.Unlock()
for cp.hp.Len() != 0 && cp.hp.CurrentSavedNode.IsAdjacent(cp.hp.Nodes[0]) {
cp.hp.CurrentSavedNode = heap.Pop(cp.hp).(*Node)
cur = cp.hp.CurrentSavedNode
}
// cur is nil when no adjacent chunk is ready yet; the caller waits for the next 10s check.
return cur
}

// SaveChunk saves the chunk to file.
func (cp *Checkpoint) SaveChunk(ctx context.Context, fileName string, cur *Node, reportInfo *report.Report) (*chunk.CID, error) {
if cur == nil {
return nil, nil
}

savedState := &SavedState{
Chunk: cur,
Report: reportInfo,
}
checkpointData, err := json.Marshal(savedState)
if err != nil {
log.Warn("fail to save the chunk to the file", zap.Any("chunk index", cur.GetID()), zap.Error(err))
return nil, errors.Trace(err)
}

if err = ioutil2.WriteFileAtomic(fileName, checkpointData, config.LocalFilePerm); err != nil {
return nil, err
}
log.Info("save checkpoint",
zap.Any("chunk", cur),
zap.String("state", cur.GetState()))
return cur.GetID(), nil
}

// LoadChunk loads the saved chunk and report info from the checkpoint file.
func (cp *Checkpoint) LoadChunk(fileName string) (*Node, *report.Report, error) {
bytes, err := os.ReadFile(fileName)
if err != nil {
return nil, nil, errors.Trace(err)
}
n := &SavedState{}
err = json.Unmarshal(bytes, n)
if err != nil {
return nil, nil, errors.Trace(err)
}
return n.Chunk, n.Report, nil
}
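
Reviewer note: `LoadChunk` returns `n.Chunk` as-is, so a syntactically valid file without a `chunk-info` key would yield a nil node and a nil error. The sketch below shows the decode path with an extra nil check; whether the real callers need that check is an assumption. `node`, `savedState`, and `decode` are hypothetical, trimmed-down stand-ins that reuse only the JSON tags visible in this diff.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// node is a hypothetical, trimmed-down version of Node.
type node struct {
	State   string `json:"state"`
	IndexID int64  `json:"index-id"`
}

// savedState is a hypothetical, trimmed-down version of SavedState.
type savedState struct {
	Chunk *node `json:"chunk-info"`
}

// decode parses checkpoint bytes and rejects payloads without chunk info.
func decode(b []byte) (*node, error) {
	s := &savedState{}
	if err := json.Unmarshal(b, s); err != nil {
		return nil, fmt.Errorf("decode checkpoint: %w", err)
	}
	if s.Chunk == nil {
		return nil, errors.New("checkpoint has no chunk-info")
	}
	return s.Chunk, nil
}

func main() {
	n, err := decode([]byte(`{"chunk-info":{"state":"success","index-id":3}}`))
	fmt.Println(n.State, n.IndexID, err)

	_, err = decode([]byte(`{}`))
	fmt.Println(err)
}
```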