Skip to content

Commit

Permalink
Merge branch 'master' into release-4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei committed Aug 28, 2020
2 parents bf7b482 + 9ca590e commit 7b96199
Show file tree
Hide file tree
Showing 146 changed files with 8,477 additions and 3,100 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ cscope.*
cmd/cdc/cdc

# Files generated when testing
vendor/
tiflash-config-preprocessed.toml

# Files generated when running docker-compose
Expand Down
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ FROM golang:1.14-alpine as builder
RUN apk add --no-cache git make bash
WORKDIR /go/src/github.com/pingcap/ticdc
COPY . .
ENV CDC_ENABLE_VENDOR=1
RUN make

FROM alpine:3.12
RUN apk add --no-cache tzdata bash curl
RUN apk add --no-cache tzdata bash curl socat
COPY --from=builder /go/src/github.com/pingcap/ticdc/bin/cdc /cdc
EXPOSE 8300
CMD [ "/cdc" ]
12 changes: 9 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ TEST_DIR := /tmp/tidb_cdc_test
SHELL := /usr/bin/env bash

GO := GO111MODULE=on go
GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -trimpath
ifeq (${CDC_ENABLE_VENDOR}, 1)
GOVENDORFLAG := -mod=vendor
endif

GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -trimpath $(GOVENDORFLAG)
ifeq ($(GOVERSION114), 1)
GOTEST := CGO_ENABLED=1 $(GO) test -p 3 --race -gcflags=all=-d=checkptr=0
else
Expand All @@ -24,7 +28,7 @@ endif
ARCH := "`uname -s`"
LINUX := "Linux"
MAC := "Darwin"
PACKAGE_LIST := go list ./...| grep -vE 'vendor|proto|ticdc\/tests'
PACKAGE_LIST := go list ./...| grep -vE 'vendor|proto|ticdc\/tests|integration'
PACKAGES := $$($(PACKAGE_LIST))
PACKAGE_DIRECTORIES := $(PACKAGE_LIST) | sed 's|github.com/pingcap/$(PROJECT)/||'
FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor')
Expand Down Expand Up @@ -84,8 +88,10 @@ check_third_party_binary:
@which bin/go-ycsb
@which bin/etcdctl
@which bin/jq
@which bin/minio

integration_test_build: check_failpoint_ctl
./scripts/fix_lib_zstd.sh
$(FAILPOINT_ENABLE)
$(GOTEST) -c -cover -covemode=atomic \
-coverpkg=github.com/pingcap/ticdc/... \
Expand Down Expand Up @@ -124,7 +130,7 @@ tidy:
check: check-copyright fmt lint check-static tidy

coverage:
GO111MODULE=off go get github.com/zhouqiang-cl/gocovmerge
GO111MODULE=off go get github.com/wadey/gocovmerge
gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|$(CDC_PKG)/cdc/kv/testing.go|.*.__failpoint_binding__.go" > "$(TEST_DIR)/all_cov.out"
grep -vE ".*.pb.go|$(CDC_PKG)/cdc/kv/testing.go|.*.__failpoint_binding__.go" "$(TEST_DIR)/cov.unit.out" > "$(TEST_DIR)/unit_cov.out"
ifeq ("$(JenkinsCI)", "1")
Expand Down
6 changes: 3 additions & 3 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (c *changeFeed) balanceOrphanTables(ctx context.Context, captures map[model
}

for captureID, funcs := range updateFuncs {
newStatus, err := c.etcdCli.AtomicPutTaskStatus(ctx, c.id, captureID, funcs...)
newStatus, _, err := c.etcdCli.AtomicPutTaskStatus(ctx, c.id, captureID, funcs...)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -365,7 +365,7 @@ func (c *changeFeed) balanceOrphanTables(ctx context.Context, captures map[model

func (c *changeFeed) updateTaskStatus(ctx context.Context, taskStatus map[model.CaptureID]*model.TaskStatus) error {
for captureID, status := range taskStatus {
newStatus, err := c.etcdCli.AtomicPutTaskStatus(ctx, c.id, captureID, func(modRevision int64, taskStatus *model.TaskStatus) (bool, error) {
newStatus, _, err := c.etcdCli.AtomicPutTaskStatus(ctx, c.id, captureID, func(modRevision int64, taskStatus *model.TaskStatus) (bool, error) {
if taskStatus.SomeOperationsUnapplied() {
log.Error("unexpected task status, there are operations unapplied in this status", zap.Any("status", taskStatus))
return false, errors.Errorf("waiting to processor handle the operation finished time out")
Expand Down Expand Up @@ -789,7 +789,7 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error {
for len(c.ddlJobHistory) > 0 && c.ddlJobHistory[0].BinlogInfo.FinishedTS <= c.ddlExecutedTs {
c.ddlJobHistory = c.ddlJobHistory[1:]
}
if len(c.ddlJobHistory) > 0 && minResolvedTs > c.ddlJobHistory[0].BinlogInfo.FinishedTS {
if len(c.ddlJobHistory) > 0 && minResolvedTs >= c.ddlJobHistory[0].BinlogInfo.FinishedTS {
minResolvedTs = c.ddlJobHistory[0].BinlogInfo.FinishedTS
c.ddlState = model.ChangeFeedWaitToExecDDL
}
Expand Down
12 changes: 2 additions & 10 deletions cdc/entry/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/rowcodec"
Expand Down Expand Up @@ -186,13 +185,6 @@ func decodeMetaKey(ek []byte) (meta, error) {
// decodeRow decodes a byte slice into datums with a existing row map.
func decodeRow(b []byte, recordID int64, tableInfo *model.TableInfo, tz *time.Location) (map[int64]types.Datum, error) {
if len(b) == 0 {
if tableInfo.PKIsHandle {
id, pkValue, err := fetchHandleValue(tableInfo, recordID)
if err != nil {
return nil, errors.Trace(err)
}
return map[int64]types.Datum{id: *pkValue}, nil
}
return map[int64]types.Datum{}, nil
}
if rowcodec.IsNewFormat(b) {
Expand Down Expand Up @@ -261,8 +253,8 @@ func decodeRowV1(b []byte, recordID int64, tableInfo *model.TableInfo, tz *time.
// https://github.com/pingcap/tidb/blob/master/docs/design/2018-07-19-row-format.md
func decodeRowV2(data []byte, recordID int64, tableInfo *model.TableInfo, tz *time.Location) (map[int64]types.Datum, error) {
handleColID, reqCols := tableInfo.GetRowColInfos()
decoder := rowcodec.NewDatumMapDecoder(reqCols, []int64{handleColID}, tz)
return decoder.DecodeToDatumMap(data, kv.IntHandle(recordID), nil)
decoder := rowcodec.NewDatumMapDecoder(reqCols, handleColID, tz)
return decoder.DecodeToDatumMap(data, recordID, nil)
}

// unflatten converts a raw datum to a column datum.
Expand Down
3 changes: 1 addition & 2 deletions cdc/entry/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"testing"

"github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
)
Expand All @@ -31,7 +30,7 @@ var _ = check.Suite(&codecSuite{})

func (s *codecSuite) TestDecodeRecordKey(c *check.C) {
recordPrefix := tablecodec.GenTableRecordPrefix(12345)
key := tablecodec.EncodeRecordKey(recordPrefix, kv.IntHandle(67890))
key := tablecodec.EncodeRecordKey(recordPrefix, 67890)
key, tableID, err := decodeTableID(key)
c.Assert(err, check.IsNil)
c.Assert(tableID, check.Equals, int64(12345))
Expand Down
Loading

0 comments on commit 7b96199

Please sign in to comment.