Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dep(*): move code of sync_diff_inspector from tidb-tools to tiflow #11671

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ require (
github.com/mailru/easyjson v0.7.7
github.com/mattn/go-shellwords v1.0.12
github.com/modern-go/reflect2 v1.0.2
github.com/olekukonko/tablewriter v0.0.5
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pierrec/lz4/v4 v4.1.18
github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,8 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.13.0/go.mod h1:+REjRxOmWfHCjfv9TTWB1jD1Frx4XydAD3zm1lskyM0=
Expand Down
23 changes: 23 additions & 0 deletions sync_diff_inspector/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# sync-diff-inspector

sync-diff-inspector is a tool for comparing two database's data.

## How to use

```shell
Usage of diff:
-V, --version print version of sync_diff_inspector
-L, --log-level string log level: debug, info, warn, error, fatal (default "info")
-C, --config string Config file
-T, --template string <dm|norm> export a template config file
--dm-addr string the address of DM
--dm-task string identifier of dm task
--check-thread-count int how many goroutines are created to check data (default 4)
--export-fix-sql set true if want to compare rows or set to false will only compare checksum (default true)
```

For more details you can read the [config.toml](./config/config.toml), [config_sharding.toml](./config/config_sharding.toml) and [config_dm.toml](./config/config_dm.toml).

## Documents
- `zh`: [Overview in Chinese](https://docs.pingcap.com/zh/tidb/stable/sync-diff-inspector-overview)
- `en`: [Overview in English](https://docs.pingcap.com/tidb/stable/sync-diff-inspector-overview)
273 changes: 273 additions & 0 deletions sync_diff_inspector/checkpoints/checkpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package checkpoints

import (
"container/heap"
"context"
"encoding/json"
"io"
"os"
"path"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/sync_diff_inspector/chunk"
"github.com/pingcap/tiflow/sync_diff_inspector/config"
"github.com/pingcap/tiflow/sync_diff_inspector/report"
"go.uber.org/zap"
)

// Write file to temp and atomically move when everything else succeeds.
func writeFileAtomic(filename string, data []byte, perm os.FileMode) error {
dir, name := path.Dir(filename), path.Base(filename)
f, err := os.CreateTemp(dir, name)
if err != nil {
return err
}

Check warning on line 39 in sync_diff_inspector/checkpoints/checkpoints.go

View check run for this annotation

Codecov / codecov/patch

sync_diff_inspector/checkpoints/checkpoints.go#L38-L39

Added lines #L38 - L39 were not covered by tests
n, err := f.Write(data)
f.Close()
if err == nil && n < len(data) {
err = io.ErrShortWrite

Check warning on line 43 in sync_diff_inspector/checkpoints/checkpoints.go

View check run for this annotation

Codecov / codecov/patch

sync_diff_inspector/checkpoints/checkpoints.go#L43

Added line #L43 was not covered by tests
} else {
err = os.Chmod(f.Name(), perm)
}
if err != nil {
os.Remove(f.Name())
return err
}

Check warning on line 50 in sync_diff_inspector/checkpoints/checkpoints.go

View check run for this annotation

Codecov / codecov/patch

sync_diff_inspector/checkpoints/checkpoints.go#L48-L50

Added lines #L48 - L50 were not covered by tests
return os.Rename(f.Name(), filename)
}

const (
// SuccessState means
// for chunk: this chunk's data is equal
// for table: means this all chunk in this table is equal(except ignore chunk)
SuccessState = "success"

// FailedState means
// for chunk: this chunk's data is not equal
// for table: some chunks' data is not equal or some chunk check failed in this table
FailedState = "failed"

// IgnoreState means
// for chunk: this chunk is ignored. if it is Empty chunk, will ignore some chunk
// for table: don't have this state
IgnoreState = "ignore"
)

// Node is the struct for node
type Node struct {
State string `json:"state"` // indicate the state ("success" or "failed") of the chunk

ChunkRange *chunk.Range `json:"chunk-range"`
IndexID int64 `json:"index-id"`
}

// GetID returns id from the node
func (n *Node) GetID() *chunk.CID { return n.ChunkRange.Index }

// GetState returns the state from the node
func (n *Node) GetState() string { return n.State }

// GetTableIndex returns table index
func (n *Node) GetTableIndex() int { return n.ChunkRange.Index.TableIndex }

// GetBucketIndexLeft returns BucketIndexLeft
func (n *Node) GetBucketIndexLeft() int { return n.ChunkRange.Index.BucketIndexLeft }

// GetBucketIndexRight returns BucketIndexRight
func (n *Node) GetBucketIndexRight() int { return n.ChunkRange.Index.BucketIndexRight }

// GetChunkIndex returns ChunkIndex
func (n *Node) GetChunkIndex() int { return n.ChunkRange.Index.ChunkIndex }

// IsAdjacent represents whether the next node is adjacent node.
// it's the important logic for checkpoint update.
// we need keep this node save to checkpoint in global order.
func (n *Node) IsAdjacent(next *Node) bool {
if n.GetTableIndex() == next.GetTableIndex()-1 {
if n.ChunkRange.IsLastChunkForTable() && next.ChunkRange.IsFirstChunkForTable() {
return true
}
return false

Check warning on line 105 in sync_diff_inspector/checkpoints/checkpoints.go

View check run for this annotation

Codecov / codecov/patch

sync_diff_inspector/checkpoints/checkpoints.go#L105

Added line #L105 was not covered by tests
}
if n.GetTableIndex() == next.GetTableIndex() {
// same table
if n.GetBucketIndexRight() == next.GetBucketIndexLeft()-1 {
if n.ChunkRange.IsLastChunkForBucket() && next.ChunkRange.IsFirstChunkForBucket() {
return true
}
return false

Check warning on line 113 in sync_diff_inspector/checkpoints/checkpoints.go

View check run for this annotation

Codecov / codecov/patch

sync_diff_inspector/checkpoints/checkpoints.go#L113

Added line #L113 was not covered by tests
}
if n.GetBucketIndexLeft() == next.GetBucketIndexLeft() {
return n.GetChunkIndex() == next.GetChunkIndex()-1
}
return false

Check warning on line 118 in sync_diff_inspector/checkpoints/checkpoints.go

View check run for this annotation

Codecov / codecov/patch

sync_diff_inspector/checkpoints/checkpoints.go#L118

Added line #L118 was not covered by tests
}
return false

Check warning on line 120 in sync_diff_inspector/checkpoints/checkpoints.go

View check run for this annotation

Codecov / codecov/patch

sync_diff_inspector/checkpoints/checkpoints.go#L120

Added line #L120 was not covered by tests
}

// IsLess represents whether the cur node is less than next node.
func (n *Node) IsLess(next *Node) bool {
if n.GetTableIndex() < next.GetTableIndex() {
return true
}

Check warning on line 127 in sync_diff_inspector/checkpoints/checkpoints.go

View check run for this annotation

Codecov / codecov/patch

sync_diff_inspector/checkpoints/checkpoints.go#L126-L127

Added lines #L126 - L127 were not covered by tests
if n.GetTableIndex() == next.GetTableIndex() {
if n.GetBucketIndexLeft() <= next.GetBucketIndexLeft()-1 {
return true
}
if n.GetBucketIndexLeft() == next.GetBucketIndexLeft() {
return n.GetChunkIndex() < next.GetChunkIndex()
}
return false
}
return false

Check warning on line 137 in sync_diff_inspector/checkpoints/checkpoints.go

View check run for this annotation

Codecov / codecov/patch

sync_diff_inspector/checkpoints/checkpoints.go#L137

Added line #L137 was not covered by tests
}

// heap maintain a Min Heap, which can be accessed by multiple threads and protected by mutex.
type nodeHeap struct {
Nodes []*Node
CurrentSavedNode *Node // CurrentSavedNode save the minimum checker chunk, updated by `GetChunkSnapshot` method
mu *sync.Mutex // protect critical section
}

// Checkpoint provide the ability to restart the sync-diff process from the
// latest previous exit point (due to error or intention).
type Checkpoint struct {
hp *nodeHeap
}

// SavedState contains the information of the latest checked chunk and state of `report`
// When sync-diff start from the checkpoint, it will load this information and continue running
type SavedState struct {
Chunk *Node `json:"chunk-info"`
Report *report.Report `json:"report-info"`
}

// InitCurrentSavedID the method is only used in initialization without lock, be cautious
func (cp *Checkpoint) InitCurrentSavedID(n *Node) {
cp.hp.CurrentSavedNode = n

Check warning on line 162 in sync_diff_inspector/checkpoints/checkpoints.go

View check run for this annotation

Codecov / codecov/patch

sync_diff_inspector/checkpoints/checkpoints.go#L161-L162

Added lines #L161 - L162 were not covered by tests
}

// GetCurrentSavedID returns the saved id with lock
func (cp *Checkpoint) GetCurrentSavedID() *Node {
cp.hp.mu.Lock()
defer cp.hp.mu.Unlock()
return cp.hp.CurrentSavedNode

Check warning on line 169 in sync_diff_inspector/checkpoints/checkpoints.go

View check run for this annotation

Codecov / codecov/patch

sync_diff_inspector/checkpoints/checkpoints.go#L166-L169

Added lines #L166 - L169 were not covered by tests
}

// Insert inserts a new node
func (cp *Checkpoint) Insert(node *Node) {
cp.hp.mu.Lock()
heap.Push(cp.hp, node)
cp.hp.mu.Unlock()
}

// Len gets the length of the heap
func (hp *nodeHeap) Len() int { return len(hp.Nodes) }

// Less determines which is more priority than another
func (hp *nodeHeap) Less(i, j int) bool {
return hp.Nodes[i].IsLess(hp.Nodes[j])
}

// Swap implementation of swap for the heap interface
func (hp *nodeHeap) Swap(i, j int) {
hp.Nodes[i], hp.Nodes[j] = hp.Nodes[j], hp.Nodes[i]
}

// Push implementation of push for the heap interface
func (hp *nodeHeap) Push(x interface{}) {
hp.Nodes = append(hp.Nodes, x.(*Node))
}

// Pop implementation of pop for heap interface
func (hp *nodeHeap) Pop() (item interface{}) {
if len(hp.Nodes) == 0 {
return
}

Check warning on line 201 in sync_diff_inspector/checkpoints/checkpoints.go

View check run for this annotation

Codecov / codecov/patch

sync_diff_inspector/checkpoints/checkpoints.go#L200-L201

Added lines #L200 - L201 were not covered by tests

hp.Nodes, item = hp.Nodes[:len(hp.Nodes)-1], hp.Nodes[len(hp.Nodes)-1]
return
}

// Init initialize the Checkpoint
func (cp *Checkpoint) Init() {
hp := &nodeHeap{
mu: &sync.Mutex{},
Nodes: make([]*Node, 0),
CurrentSavedNode: &Node{
ChunkRange: &chunk.Range{
Index: chunk.GetInitCID(),
IsFirst: true,
IsLast: true,
},
},
}
heap.Init(hp)
cp.hp = hp
}

// GetChunkSnapshot get the snapshot of the minimum continuous checked chunk
func (cp *Checkpoint) GetChunkSnapshot() (cur *Node) {
cp.hp.mu.Lock()
defer cp.hp.mu.Unlock()
for cp.hp.Len() != 0 && cp.hp.CurrentSavedNode.IsAdjacent(cp.hp.Nodes[0]) {
cp.hp.CurrentSavedNode = heap.Pop(cp.hp).(*Node)
cur = cp.hp.CurrentSavedNode
}
// wait for next 10s to check
return cur
}

// SaveChunk saves the chunk to file.
func (cp *Checkpoint) SaveChunk(ctx context.Context, fileName string, cur *Node, reportInfo *report.Report) (*chunk.CID, error) {
if cur == nil {
return nil, nil
}

savedState := &SavedState{
Chunk: cur,
Report: reportInfo,
}
checkpointData, err := json.Marshal(savedState)
if err != nil {
log.Warn("fail to save the chunk to the file", zap.Any("chunk index", cur.GetID()), zap.Error(err))
return nil, errors.Trace(err)
}

Check warning on line 250 in sync_diff_inspector/checkpoints/checkpoints.go

View check run for this annotation

Codecov / codecov/patch

sync_diff_inspector/checkpoints/checkpoints.go#L248-L250

Added lines #L248 - L250 were not covered by tests

if err = writeFileAtomic(fileName, checkpointData, config.LocalFilePerm); err != nil {
return nil, err
}

Check warning on line 254 in sync_diff_inspector/checkpoints/checkpoints.go

View check run for this annotation

Codecov / codecov/patch

sync_diff_inspector/checkpoints/checkpoints.go#L253-L254

Added lines #L253 - L254 were not covered by tests
log.Info("save checkpoint",
zap.Any("chunk", cur),
zap.String("state", cur.GetState()))
return cur.GetID(), nil
}

// LoadChunk loads chunk info from file `chunk`
func (cp *Checkpoint) LoadChunk(fileName string) (*Node, *report.Report, error) {
bytes, err := os.ReadFile(fileName)
if err != nil {
return nil, nil, errors.Trace(err)
}

Check warning on line 266 in sync_diff_inspector/checkpoints/checkpoints.go

View check run for this annotation

Codecov / codecov/patch

sync_diff_inspector/checkpoints/checkpoints.go#L265-L266

Added lines #L265 - L266 were not covered by tests
n := &SavedState{}
err = json.Unmarshal(bytes, n)
if err != nil {
return nil, nil, errors.Trace(err)
}

Check warning on line 271 in sync_diff_inspector/checkpoints/checkpoints.go

View check run for this annotation

Codecov / codecov/patch

sync_diff_inspector/checkpoints/checkpoints.go#L270-L271

Added lines #L270 - L271 were not covered by tests
return n.Chunk, n.Report, nil
}
Loading
Loading