Skip to content

Commit dd8b1f9

Browse files
committed
feat: RowsEvent include NextPos(include name)
1 parent b39e40d commit dd8b1f9

File tree

3 files changed

+9
-5
lines changed

3 files changed

+9
-5
lines changed

canal/dump.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func (h *dumpParseHandler) Data(db string, table string, values []string) error
110110
}
111111
}
112112

113-
events := newRowsEvent(tableInfo, InsertAction, [][]interface{}{vs}, nil)
113+
events := newRowsEvent(tableInfo, InsertAction, [][]interface{}{vs}, nil, nil)
114114
return h.c.eventHandler.OnRow(events)
115115
}
116116

canal/rows.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package canal
22

33
import (
44
"fmt"
5+
"github.com/go-mysql-org/go-mysql/mysql"
56

67
"github.com/go-mysql-org/go-mysql/replication"
78
"github.com/go-mysql-org/go-mysql/schema"
@@ -26,15 +27,18 @@ type RowsEvent struct {
2627
Rows [][]interface{}
2728
// Header can be used to inspect the event
2829
Header *replication.EventHeader
30+
// NextPos including binlog name
31+
NextPos *mysql.Position
2932
}
3033

31-
func newRowsEvent(table *schema.Table, action string, rows [][]interface{}, header *replication.EventHeader) *RowsEvent {
34+
func newRowsEvent(table *schema.Table, action string, rows [][]interface{}, header *replication.EventHeader, nextPos *mysql.Position) *RowsEvent {
3235
e := new(RowsEvent)
3336

3437
e.Table = table
3538
e.Action = action
3639
e.Rows = rows
3740
e.Header = header
41+
e.NextPos = nextPos
3842

3943
e.handleUnsigned()
4044

canal/sync.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func (c *Canal) handleEvent(ev *replication.BinlogEvent) error {
101101
}
102102
case *replication.RowsEvent:
103103
// we only focus row based event
104-
err = c.handleRowsEvent(ev)
104+
err = c.handleRowsEvent(ev, pos)
105105
if err != nil {
106106
c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
107107
return errors.Trace(err)
@@ -262,7 +262,7 @@ func (c *Canal) updateReplicationDelay(ev *replication.BinlogEvent) {
262262
atomic.StoreUint32(c.delay, newDelay)
263263
}
264264

265-
func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
265+
func (c *Canal) handleRowsEvent(e *replication.BinlogEvent, nextPos mysql.Position) error {
266266
ev := e.Event.(*replication.RowsEvent)
267267

268268
// Caveat: table may be altered at runtime.
@@ -290,7 +290,7 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
290290
default:
291291
return errors.Errorf("%s not supported now", e.Header.EventType)
292292
}
293-
events := newRowsEvent(t, action, ev.Rows, e.Header)
293+
events := newRowsEvent(t, action, ev.Rows, e.Header, &nextPos)
294294
return c.eventHandler.OnRow(events)
295295
}
296296

0 commit comments

Comments
 (0)