diff --git a/canal/dump.go b/canal/dump.go index 91b303976..58973feb9 100644 --- a/canal/dump.go +++ b/canal/dump.go @@ -110,7 +110,7 @@ func (h *dumpParseHandler) Data(db string, table string, values []string) error } } - events := newRowsEvent(tableInfo, InsertAction, [][]interface{}{vs}, nil) + events := newRowsEvent(tableInfo, InsertAction, [][]interface{}{vs}, nil, nil) return h.c.eventHandler.OnRow(events) } diff --git a/canal/rows.go b/canal/rows.go index 9f618a9fb..08d545865 100644 --- a/canal/rows.go +++ b/canal/rows.go @@ -3,6 +3,8 @@ package canal import ( "fmt" + "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/replication" "github.com/go-mysql-org/go-mysql/schema" ) @@ -26,15 +28,18 @@ type RowsEvent struct { Rows [][]interface{} // Header can be used to inspect the event Header *replication.EventHeader + // NextPos including binlog name + NextPos *mysql.Position } -func newRowsEvent(table *schema.Table, action string, rows [][]interface{}, header *replication.EventHeader) *RowsEvent { +func newRowsEvent(table *schema.Table, action string, rows [][]interface{}, header *replication.EventHeader, nextPos *mysql.Position) *RowsEvent { e := new(RowsEvent) e.Table = table e.Action = action e.Rows = rows e.Header = header + e.NextPos = nextPos e.handleUnsigned() diff --git a/canal/sync.go b/canal/sync.go index 6e4e538c3..f81484c18 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -101,7 +101,7 @@ func (c *Canal) handleEvent(ev *replication.BinlogEvent) error { } case *replication.RowsEvent: // we only focus row based event - err = c.handleRowsEvent(ev) + err = c.handleRowsEvent(ev, pos) if err != nil { c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err) return errors.Trace(err) @@ -262,7 +262,7 @@ func (c *Canal) updateReplicationDelay(ev *replication.BinlogEvent) { atomic.StoreUint32(c.delay, newDelay) } -func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { +func (c *Canal) handleRowsEvent(e *replication.BinlogEvent, nextPos mysql.Position) error { ev := e.Event.(*replication.RowsEvent) // Caveat: table may be altered at runtime. @@ -290,7 +290,7 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { default: return errors.Errorf("%s not supported now", e.Header.EventType) } - events := newRowsEvent(t, action, ev.Rows, e.Header) + events := newRowsEvent(t, action, ev.Rows, e.Header, &nextPos) return c.eventHandler.OnRow(events) }