Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replication: Add mysql::serialization based Gtid Log Event #990

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
3ea3d67
replication: Add mysql::serialization based Gtid Log Event
dveeden Feb 14, 2025
a3d5dee
fixup
dveeden Feb 14, 2025
10158f4
fixup
dveeden Feb 14, 2025
6888c31
Merge remote-tracking branch 'upstream/master' into gtid_tagged_log_e…
dveeden Feb 14, 2025
09838f5
fixup
dveeden Feb 14, 2025
b6ceaf3
Add test and get field by name
dveeden Feb 14, 2025
3bd47aa
more testing
dveeden Feb 15, 2025
0b445a1
More testing
dveeden Feb 15, 2025
b55f86e
fixup
dveeden Feb 15, 2025
3028527
prepare for negative values
dveeden Feb 16, 2025
76b98bb
Update based on review
dveeden Feb 16, 2025
990de1d
Merge remote-tracking branch 'upstream/master' into gtid_tagged_log_e…
dveeden Feb 17, 2025
0237595
fixup
dveeden Feb 17, 2025
209cfad
fixup
dveeden Feb 17, 2025
ad8acac
fixup
dveeden Feb 17, 2025
f412b2b
fixup
dveeden Feb 17, 2025
a7e4280
updates
dveeden Feb 18, 2025
b7b8f38
updates
dveeden Feb 18, 2025
28d3be8
fixup
dveeden Feb 18, 2025
4a8e497
More testing
dveeden Feb 19, 2025
c74b18e
Update serialization/serialization.go
dveeden Feb 21, 2025
aa11671
Merge branch 'master' into gtid_tagged_log_event_serialized
dveeden Feb 21, 2025
86bf3fc
fixup
dveeden Feb 21, 2025
8d78a72
formatting
dveeden Feb 21, 2025
ba1dcc9
Update based on review
dveeden Feb 21, 2025
1558b91
Update serialization/serialization.go
dveeden Feb 23, 2025
da20b43
Merge branch 'master' into gtid_tagged_log_event_serialized
dveeden Feb 27, 2025
ab00980
Don't store the message in the event struct
dveeden Feb 27, 2025
80d1d62
use map to speedup lookup of fields
dveeden Feb 27, 2025
89b293f
Use decode method
dveeden Feb 27, 2025
a45d879
Change Size and LastNonIgnoreableField to uint8
dveeden Feb 28, 2025
085c631
Change field ID to uint8
dveeden Feb 28, 2025
ac2d26f
Use stringParts as suggested in review
dveeden Feb 28, 2025
3a1789b
fixup
dveeden Feb 28, 2025
14de33c
Set original values to immediate values if skipped
dveeden Feb 28, 2025
173111f
Update serialization/serialization.go
dveeden Mar 1, 2025
f129364
Update serialization/serialization.go
dveeden Mar 1, 2025
b7e35fe
test my suggestion
lance6716 Mar 3, 2025
78157e3
fix unit test
lance6716 Mar 3, 2025
9b61d61
try to fix unit test again
lance6716 Mar 3, 2025
dfc7475
Merge branch 'master' into gtid_tagged_log_event_serialized
lance6716 Mar 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 203 additions & 1 deletion replication/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"unicode"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/serialization"
"github.com/google/uuid"
"github.com/pingcap/errors"
)
Expand Down Expand Up @@ -420,6 +421,7 @@ func (e *QueryEvent) Dump(w io.Writer) {
type GTIDEvent struct {
CommitFlag uint8
SID []byte
Tag string
GNO int64
LastCommitted int64
SequenceNumber int64
Expand Down Expand Up @@ -512,7 +514,11 @@ func (e *GTIDEvent) Dump(w io.Writer) {

fmt.Fprintf(w, "Commit flag: %d\n", e.CommitFlag)
u, _ := uuid.FromBytes(e.SID)
fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO)
if e.Tag != "" {
fmt.Fprintf(w, "GTID_NEXT: %s:%s:%d\n", u.String(), e.Tag, e.GNO)
} else {
fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO)
}
fmt.Fprintf(w, "LAST_COMMITTED: %d\n", e.LastCommitted)
fmt.Fprintf(w, "SEQUENCE_NUMBER: %d\n", e.SequenceNumber)
fmt.Fprintf(w, "Immediate commmit timestamp: %d (%s)\n", e.ImmediateCommitTimestamp, fmtTime(e.ImmediateCommitTime()))
Expand Down Expand Up @@ -543,6 +549,202 @@ func (e *GTIDEvent) OriginalCommitTime() time.Time {
return microSecTimestampToTime(e.OriginalCommitTimestamp)
}

// GtidTaggedLogEvent is for a GTID event with a tag.
// This is similar to GTIDEvent, but it has a tag and uses a different serialization format.
type GtidTaggedLogEvent struct {
GTIDEvent
}

func (e *GtidTaggedLogEvent) Decode(data []byte) error {
msg := serialization.Message{
Format: serialization.Format{
Fields: []serialization.Field{
{
Name: "gtid_flags",
Type: &serialization.FieldIntFixed{
Length: 1,
},
},
{
Name: "uuid",
Type: &serialization.FieldIntFixed{
Length: 16,
},
},
{
Name: "gno",
Type: &serialization.FieldIntVar{},
},
{
Name: "tag",
Type: &serialization.FieldString{},
},
{
Name: "last_committed",
Type: &serialization.FieldIntVar{},
},
{
Name: "sequence_number",
Type: &serialization.FieldIntVar{},
},
{
Name: "immediate_commit_timestamp",
Type: &serialization.FieldUintVar{},
},
{
Name: "original_commit_timestamp",
Type: &serialization.FieldUintVar{},
Optional: true,
},
{
Name: "transaction_length",
Type: &serialization.FieldUintVar{},
},
{
Name: "immediate_server_version",
Type: &serialization.FieldUintVar{},
},
{
Name: "original_server_version",
Type: &serialization.FieldUintVar{},
Optional: true,
},
{
Name: "commit_group_ticket",
Optional: true,
},
},
},
}

err := serialization.Unmarshal(data, &msg)
if err != nil {
return err
}

f, err := msg.GetFieldByName("gtid_flags")
if err != nil {
return err
}
if v, ok := f.Type.(*serialization.FieldIntFixed); ok {
e.CommitFlag = v.Value[0]
} else {
return errors.New("failed to get gtid_flags field")
}

f, err = msg.GetFieldByName("uuid")
if err != nil {
return err
}
if v, ok := f.Type.(*serialization.FieldIntFixed); ok {
e.SID = v.Value
} else {
return errors.New("failed to get uuid field")
}

f, err = msg.GetFieldByName("gno")
if err != nil {
return err
}
if v, ok := f.Type.(*serialization.FieldIntVar); ok {
e.GNO = v.Value
} else {
return errors.New("failed to get gno field")
}

f, err = msg.GetFieldByName("tag")
if err != nil {
return err
}
if v, ok := f.Type.(*serialization.FieldString); ok {
e.Tag = v.Value
} else {
return errors.New("failed to get tag field")
}

f, err = msg.GetFieldByName("last_committed")
if err != nil {
return err
}
if v, ok := f.Type.(*serialization.FieldIntVar); ok {
e.LastCommitted = v.Value
} else {
return errors.New("failed to get last_committed field")
}

f, err = msg.GetFieldByName("sequence_number")
if err != nil {
return err
}
if v, ok := f.Type.(*serialization.FieldIntVar); ok {
e.SequenceNumber = v.Value
} else {
return errors.New("failed to get sequence_number field")
}

f, err = msg.GetFieldByName("immediate_commit_timestamp")
if err != nil {
return err
}
if v, ok := f.Type.(*serialization.FieldUintVar); ok {
e.ImmediateCommitTimestamp = v.Value
} else {
return errors.New("failed to get immediate_commit_timestamp field")
}

f, err = msg.GetFieldByName("original_commit_timestamp")
if err != nil {
return err
}
if v, ok := f.Type.(*serialization.FieldUintVar); ok {
if f.Skipped {
e.OriginalCommitTimestamp = e.ImmediateCommitTimestamp
} else {
e.OriginalCommitTimestamp = v.Value
}
} else {
return errors.New("failed to get original_commit_timestamp field")
}

f, err = msg.GetFieldByName("immediate_server_version")
if err != nil {
return err
}
if v, ok := f.Type.(*serialization.FieldUintVar); ok {
e.ImmediateServerVersion = uint32(v.Value)
} else {
return errors.New("failed to get immediate_server_version field")
}

f, err = msg.GetFieldByName("original_server_version")
if err != nil {
return err
}
if v, ok := f.Type.(*serialization.FieldUintVar); ok {
if f.Skipped {
e.OriginalServerVersion = e.ImmediateServerVersion
} else {
e.OriginalServerVersion = uint32(v.Value)
}
} else {
return errors.New("failed to get original_server_version field")
}

f, err = msg.GetFieldByName("transaction_length")
if err != nil {
return err
}
if v, ok := f.Type.(*serialization.FieldUintVar); ok {
e.TransactionLength = v.Value
} else {
return errors.New("failed to get transaction_length field")
}

// TODO: add and test commit_group_ticket
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't yet been able to generate any events with an actual commit_group_ticket.

I first tried with binlog_group_commit_sync_delay=... but that didn't make any difference.

Then looking at the code I noticed that this has to do with Group Replication (part of InnoDB Cluster), So I setup a cluster with 3 sandbox instances... but that also didn't give me any events for testing.

Looks like this might also be known as BGC (Binlog Group Commit) tickets.


return nil
}

type BeginLoadQueryEvent struct {
FileID uint32
BlockData []byte
Expand Down
2 changes: 2 additions & 0 deletions replication/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
e = &GTIDEvent{}
case ANONYMOUS_GTID_EVENT:
e = &GTIDEvent{}
case GTID_TAGGED_LOG_EVENT:
e = &GtidTaggedLogEvent{}
case BEGIN_LOAD_QUERY_EVENT:
e = &BeginLoadQueryEvent{}
case EXECUTE_LOAD_QUERY_EVENT:
Expand Down
Loading
Loading