Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: RowsEvent include NextPos(include name) #1007

Closed
wants to merge 2 commits into from

Conversation

XUJiahua
Copy link

@XUJiahua XUJiahua commented Mar 5, 2025

Hello,

I need to ensure position persistence when processing each OnRow event. The RowsEvent structure currently stores the position in its header field but doesn't include the binlog name, making the information incomplete.

@dveeden
Copy link
Collaborator

dveeden commented Mar 5, 2025

Could you fix the failing check by running gofumpt on the file?

   Error: canal/rows.go:4:1: File is not properly formatted (gofumpt)
  	"fmt"
  ^

See also: https://github.com/mvdan/gofumpt

@dveeden dveeden added the canal label Mar 5, 2025
@XUJiahua
Copy link
Author

XUJiahua commented Mar 6, 2025

Could you fix the failing check by running gofumpt on the file?

   Error: canal/rows.go:4:1: File is not properly formatted (gofumpt)
  	"fmt"
  ^

See also: https://github.com/mvdan/gofumpt

fixed

Copy link
Collaborator

@lance6716 lance6716 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know about your application, just some question to let myself understand it better, this PR almost LGTM

  1. in GTID based replication, the binlog name / offset is not stable after MySQL HA switch. So in fact this field is not super important in all cases. What's your usage of this field?
  2. Even without this change, your application can still maintain the state of binlog filename from OnRotate. Do you think this is also acceptable?

@@ -26,15 +28,18 @@ type RowsEvent struct {
Rows [][]interface{}
// Header can be used to inspect the event
Header *replication.EventHeader
// NextPos including binlog name
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comment that when the RowsEvent is generated from dump stage, this field is nil.

nit: because your requirement only needs binlog name, how about only store binlog name here? we can discuss it before you accept my comment

Copy link
Collaborator

@dveeden dveeden left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this just the log position plus the event size of the event header?

And two more remarks:

  • The rows event is just a part of the transaction in the binlog. You might have to wait for the XID event.
  • With GTIDs etc. events can be out of order and then file+position isn't reliable anymore. And of course when switching to another source server the file and position doesn't work anymore, here you have to either use GTID or somehow map between positions on different servers.

Test event:

# at 2133
#250309 16:12:26 server id 1  end_log_pos 2173 CRC32 0x69c57f21 
# Position  Timestamp   Type   Source ID        Size      Source Pos    Flags 
# 00000855 da af cd 67   1e   01 00 00 00   28 00 00 00   7d 08 00 00   00 00
# 00000868 91 01 00 00 00 00 01 00  02 00 01 ff 00 39 30 00 |.............90.|
# 00000878 00 21 7f c5 69                                   |....i|
# 	Write_rows: table id 401 flags: STMT_END_F

BINLOG '
2q/NZxMBAAAAMAAAAFUIAAAAAJEBAAAAAAEABHRlc3QAAnQxAAEDAAABAQAERd3f
2q/NZx4BAAAAKAAAAH0IAAAAAJEBAAAAAAEAAgAB/wA5MAAAIX/FaQ==
'/*!*/;
### INSERT INTO `test`.`t1`
### SET
###   @1=12345 /* INT meta=0 nullable=0 is_null=0 */
diff --git a/cmd/go-canal/main.go b/cmd/go-canal/main.go
index 26f3460..d6a17fe 100644
--- a/cmd/go-canal/main.go
+++ b/cmd/go-canal/main.go
@@ -13,6 +13,7 @@ import (
 
        "github.com/go-mysql-org/go-mysql/canal"
        "github.com/go-mysql-org/go-mysql/mysql"
+       "github.com/go-mysql-org/go-mysql/replication"
        "github.com/pingcap/errors"
 )
 
@@ -116,10 +117,22 @@ type handler struct {
 
 func (h *handler) OnRow(e *canal.RowsEvent) error {
        fmt.Printf("%v\n", e)
+       if e.Header != nil {
+               fmt.Printf("End Pos: %d+%d=%d\n",
+                       e.Header.LogPos,
+                       e.Header.EventSize,
+                       e.Header.LogPos+e.Header.EventSize)
+       }
 
        return nil
 }
 
+func (h *handler) OnGTID(eh *replication.EventHeader, g mysql.BinlogGTIDEvent) error {
+       fmt.Printf("header: %v\n", eh)
+       fmt.Printf("gtid: %v\n", g)
+       return nil
+}
+
 func (h *handler) String() string {
        return "TestHandler"
 }

output:

header: &{1741533146 GTIDEvent 1 79 2010 0}
gtid: &{0 [137 110 120 130 24 254 17 239 171 136 34 34 45 52 212 17]  11 6 7 1741533146948618 1741533146948618 273 90200 90200}
insert test.t1 [[12345]]
End Pos: 2173+40=2213

I'm not sure the 2173 here is correct. I would have expected 2133 here. But this might be because this is MySQL 9.1.0.

@dveeden
Copy link
Collaborator

dveeden commented Mar 9, 2025

I think the binlog doesn't have the binlog name in the events except for the rotate event etc.

@dveeden
Copy link
Collaborator

dveeden commented Mar 9, 2025

So to me it looks like you need to:

  • Use OnRotate to get the binlog file name
  • Use OnGTID to get the GTID
  • Use the position/size from the OnRow method

Maybe we should update the example code to better reflect this. Maybe something like this:

diff --git a/cmd/go-canal/main.go b/cmd/go-canal/main.go
index 26f3460..c780202 100644
--- a/cmd/go-canal/main.go
+++ b/cmd/go-canal/main.go
@@ -13,6 +13,7 @@ import (
 
        "github.com/go-mysql-org/go-mysql/canal"
        "github.com/go-mysql-org/go-mysql/mysql"
+       "github.com/go-mysql-org/go-mysql/replication"
        "github.com/pingcap/errors"
 )
 
@@ -115,8 +116,18 @@ type handler struct {
 }
 
 func (h *handler) OnRow(e *canal.RowsEvent) error {
-       fmt.Printf("%v\n", e)
+       fmt.Printf("row: %v\n", e)
+       return nil
+}
 
+func (h *handler) OnGTID(eh *replication.EventHeader, g mysql.BinlogGTIDEvent) error {
+       gn, _ := g.GTIDNext()
+       fmt.Printf("GTID: %s\n", gn.String())
+       return nil
+}
+
+func (h *handler) OnRotate(eh *replication.EventHeader, r *replication.RotateEvent) error {
+       fmt.Printf("rotate: %s\n", r.NextLogName)
        return nil
 }
 

@dveeden
Copy link
Collaborator

dveeden commented Mar 10, 2025

Looks like #247 is also related to this

@XUJiahua
Copy link
Author

OnXID and OnGTID are sufficient to handle checkpointing. I’ll close this PR. Thank you for pointing this out. @dveeden @lance6716

@XUJiahua XUJiahua closed this Mar 12, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants