Skip to content

Commit

Permalink
Merge branch 'master' into release-4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei committed Aug 6, 2020
2 parents 67be7b3 + e0dcfb7 commit bf7b482
Show file tree
Hide file tree
Showing 43 changed files with 1,070 additions and 16 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

## Architecture

![architecture](./docs/media/cdc_architecture.png)
<img src="docs/media/cdc_architecture.svg?sanitize=true" alt="architecture" width="600"/>

## Documentation

Expand Down
34 changes: 27 additions & 7 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,9 @@ func (p *processor) updateInfo(ctx context.Context) error {
return errors.Trace(model.ErrAdminStopProcessor)
}
updatePosition := func() error {
failpoint.Inject("ProcessorUpdatePositionDelaying", func() {
time.Sleep(1 * time.Second)
})
//p.position.Count = p.sink.Count()
err := p.etcdCli.PutTaskPosition(ctx, p.changefeedID, p.captureInfo.ID, p.position)
if err != nil {
Expand Down Expand Up @@ -508,6 +511,10 @@ func (p *processor) updateInfo(ctx context.Context) error {
if err != nil {
return false, backoff.Permanent(errors.Trace(err))
}
err = updatePosition()
if err != nil {
return true, errors.Trace(err)
}
return true, nil
})
if err != nil {
Expand All @@ -524,10 +531,6 @@ func (p *processor) updateInfo(ctx context.Context) error {
syncTableNumGauge.
WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr).
Set(float64(len(p.status.Tables)))
err = updatePosition()
if err != nil {
return errors.Trace(err)
}
return nil
}

Expand Down Expand Up @@ -770,8 +773,9 @@ func (p *processor) syncResolved(ctx context.Context) error {
}
if row.CRTs <= resolvedTs {
log.Fatal("The CRTs must be greater than the resolvedTs",
zap.Uint64("CRTs", row.CRTs),
zap.Uint64("resolvedTs", resolvedTs))
zap.String("model", "processor"),
zap.Uint64("resolvedTs", resolvedTs),
zap.Any("row", row))
}
err := processRowChangedEvent(row)
if err != nil {
Expand Down Expand Up @@ -821,7 +825,11 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
log.Warn("Ignore existing table", zap.Int64("ID", tableID))
return
}
log.Debug("Add table", zap.Int64("tableID", tableID), zap.String("name", tableName), zap.Any("replicaInfo", replicaInfo))
globalResolvedTs := atomic.LoadUint64(&p.sinkEmittedResolvedTs)
log.Debug("Add table", zap.Int64("tableID", tableID),
zap.String("name", tableName),
zap.Any("replicaInfo", replicaInfo),
zap.Uint64("globalResolvedTs", globalResolvedTs))

ctx = util.PutTableInfoInCtx(ctx, tableID, tableName)
ctx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -881,6 +889,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
}()
go func() {
resolvedTsGauge := tableResolvedTsGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, table.name)
var lastResolvedTs uint64
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -908,10 +917,21 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
}
if pEvent.RawKV != nil && pEvent.RawKV.OpType == model.OpTypeResolved {
atomic.StoreUint64(pResolvedTs, pEvent.CRTs)
lastResolvedTs = pEvent.CRTs
p.localResolvedNotifier.Notify()
resolvedTsGauge.Set(float64(oracle.ExtractPhysical(pEvent.CRTs)))
continue
}
sinkResolvedTs := atomic.LoadUint64(&p.sinkEmittedResolvedTs)
if pEvent.CRTs <= sinkResolvedTs || pEvent.CRTs <= lastResolvedTs || pEvent.CRTs < replicaInfo.StartTs {
log.Fatal("The CRTs of event is not expected, please report a bug",
zap.String("model", "sorter"),
zap.Uint64("globalResolvedTs", sinkResolvedTs),
zap.Uint64("resolvedTs", lastResolvedTs),
zap.Int64("tableID", tableID),
zap.Any("replicaInfo", replicaInfo),
zap.Any("row", pEvent))
}
select {
case <-ctx.Done():
if errors.Cause(ctx.Err()) != context.Canceled {
Expand Down
4 changes: 3 additions & 1 deletion cdc/puller/rectifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"context"
"sync/atomic"

"go.uber.org/zap"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -64,7 +66,7 @@ func (r *Rectifier) Run(ctx context.Context) error {
output := func(event *model.PolymorphicEvent) {
select {
case <-ctx.Done():
log.Warn("")
log.Warn("failed to send to output channel", zap.Error(ctx.Err()))
case r.outputCh <- event:
}
}
Expand Down
28 changes: 23 additions & 5 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,10 +420,22 @@ func (s *mysqlSink) createSinkWorkers(ctx context.Context) {
}
}

func (s *mysqlSink) notifyAndWaitExec() {
func (s *mysqlSink) notifyAndWaitExec(ctx context.Context) {
s.notifier.Notify()
for _, w := range s.workers {
w.waitAllTxnsExecuted()
done := make(chan struct{})
go func() {
for _, w := range s.workers {
w.waitAllTxnsExecuted()
}
close(done)
}()
// This is a hack code to avoid io wait in some routine blocks others to exit.
// As the network io wait is blocked in kernel code, the goroutine is in a
// D-state that we could not even stop it by cancel the context. So if this
// scenario happens, the blocked goroutine will be leak.
select {
case <-ctx.Done():
case <-done:
}
}

Expand Down Expand Up @@ -463,7 +475,7 @@ func (s *mysqlSink) dispatchAndExecTxns(ctx context.Context, txnsGroup map[model
sendFn(txn, idx)
return
}
s.notifyAndWaitExec()
s.notifyAndWaitExec(ctx)
causality.reset()
}
sendFn(txn, rowsChIdx)
Expand All @@ -477,7 +489,7 @@ func (s *mysqlSink) dispatchAndExecTxns(ctx context.Context, txnsGroup map[model
s.metricConflictDetectDurationHis.Observe(time.Since(startTime).Seconds())
}
}
s.notifyAndWaitExec()
s.notifyAndWaitExec(ctx)
}

type mysqlSinkWorker struct {
Expand Down Expand Up @@ -609,6 +621,9 @@ func (s *mysqlSink) execDMLWithMaxRetries(
failpoint.Inject("MySQLSinkTxnRandomError", func() {
failpoint.Return(checkTxnErr(errors.Trace(dmysql.ErrInvalidConn)))
})
failpoint.Inject("MySQLSinkHangLongTime", func() {
time.Sleep(time.Hour)
})
err := s.statistics.RecordBatchExecution(func() (int, error) {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
Expand Down Expand Up @@ -691,6 +706,9 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, replicaID uint64,
}

func (s *mysqlSink) execDMLs(ctx context.Context, rows []*model.RowChangedEvent, replicaID uint64, bucket int) error {
failpoint.Inject("MySQLSinkExecDMLError", func() {
failpoint.Return(errors.Trace(dmysql.ErrInvalidConn))
})
dmls, err := s.prepareDMLs(rows, replicaID, bucket)
if err != nil {
return errors.Trace(err)
Expand Down
9 changes: 9 additions & 0 deletions demo/java/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
.DS_Store
*/.DS_Store
target
dist
.idea
*.iml
.classpath
.project
.settings/
52 changes: 52 additions & 0 deletions demo/java/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# How to use

The following code shows how to parse ticdc data([ticdc open protocol](https://docs.pingcap.com/tidb/stable/ticdc-open-protocol)) which consumed from kafka.

```
TicdcEventFilter filter = new TicdcEventFilter();
for (KafkaMessage kafkaMessage : kafkaMessages) {
TicdcEventDecoder ticdcEventDecoder = new TicdcEventDecoder(kafkaMessage);
while (ticdcEventDecoder.hasNext()) {
TicdcEventData data = ticdcEventDecoder.next();
if (data.getTicdcEventValue() instanceof TicdcEventRowChange) {
boolean ok = filter.check(data.getTicdcEventKey().getTbl(), data.getTicdcEventValue().getKafkaPartition(), data.getTicdcEventKey().getTs());
if (ok) {
// deal with row change event
} else {
// ignore duplicated messages
}
} else if (data.getTicdcEventValue() instanceof TicdcEventDDL) {
// deal with ddl event
} else if (data.getTicdcEventValue() instanceof TicdcEventResolve) {
filter.resolveEvent(data.getTicdcEventValue().getKafkaPartition(), data.getTicdcEventKey().getTs());
// deal with resolve event
}
System.out.println(JSON.toJSONString(data, true));
}
}
```
[See com.pingcap.ticdc.cdc.TicdcEventDecoderTest.](src/test/java/com/pingcap/ticdc/cdc/TicdcEventDecoderTest.java).

# How to install
Prerequisites for building:

* Git
* Maven (we recommend version 3.2.5)
* Java 8

```
git clone [email protected]:pingcap/ticdc.git
cd ticdc/demo/java
mvn install
```

Now ticdc-decoder is installed. To add a dependency :

```xml
<dependency>
<groupId>com.pingcap.ticdc.cdc</groupId>
<artifactId>ticdc-decoder</artifactId>
<version>4.0.6-SNAPSHOT</version>
</dependency>
```
46 changes: 46 additions & 0 deletions demo/java/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.pingcap.ticdc.cdc</groupId>
<artifactId>ticdc-decoder</artifactId>
<version>4.0.6-SNAPSHOT</version>

<properties>
<java.version>1.8</java.version>
<fastjson.version>1.2.70</fastjson.version>
</properties>

<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<encoding>utf-8</encoding>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</build>

</project>
72 changes: 72 additions & 0 deletions demo/java/src/main/java/com/pingcap/ticdc/cdc/KafkaMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2020 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.pingcap.ticdc.cdc;

public class KafkaMessage {
private byte[] key;
private byte[] value;
private int partition;
private long offset;
private long timestamp;

public KafkaMessage() {
}

public KafkaMessage(byte[] key, byte[] value) {
this.key = key;
this.value = value;
}

public int getPartition() {
return partition;
}

public void setPartition(int partition) {
this.partition = partition;
}

public long getOffset() {
return offset;
}

public void setOffset(long offset) {
this.offset = offset;
}

public long getTimestamp() {
return timestamp;
}

public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}

public byte[] getKey() {
return key;
}

public void setKey(byte[] key) {
this.key = key;
}

public byte[] getValue() {
return value;
}

public void setValue(byte[] value) {
this.value = value;
}
}
Loading

0 comments on commit bf7b482

Please sign in to comment.