From 3ea3d678063cb8aa2ac636eb008d23e2d16601e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Fri, 14 Feb 2025 16:02:17 +0100 Subject: [PATCH 01/36] replication: Add mysql::serialization based Gtid Log Event --- replication/event.go | 89 ++++++++++++ replication/parser.go | 2 + serialization/serialization.go | 255 +++++++++++++++++++++++++++++++++ 3 files changed, 346 insertions(+) create mode 100644 serialization/serialization.go diff --git a/replication/event.go b/replication/event.go index d16aa574c..dc55a7e5a 100644 --- a/replication/event.go +++ b/replication/event.go @@ -12,6 +12,7 @@ import ( "unicode" "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/serialization" "github.com/google/uuid" "github.com/pingcap/errors" ) @@ -543,6 +544,94 @@ func (e *GTIDEvent) OriginalCommitTime() time.Time { return microSecTimestampToTime(e.OriginalCommitTimestamp) } +type GtidTaggedLogEvent struct { + msg serialization.Message +} + +func (e *GtidTaggedLogEvent) Decode(data []byte) error { + e.msg = serialization.Message{ + Format: serialization.Format{ + Fields: []serialization.Field{ + { + Name: "gtid_flags", + Type: serialization.FieldIntFixed{ + Length: 1, + }, + }, + { + Name: "uuid", + Type: serialization.FieldIntFixed{ + Length: 16, + }, + }, + { + Name: "gno", + Type: serialization.FieldIntVar{}, + }, + { + Name: "tag", + Type: serialization.FieldString{}, + }, + { + Name: "last_committed", + Type: serialization.FieldIntVar{}, + }, + { + Name: "sequence_number", + Type: serialization.FieldIntVar{}, + }, + { + Name: "immediate_commit_timestamp", + Type: serialization.FieldIntVar{ + Unsigned: true, + }, + }, + { + Name: "original_commit_timestamp", + Type: serialization.FieldIntVar{ + Unsigned: true, + }, + Optional: true, + }, + { + Name: "transaction_length", + Type: serialization.FieldIntVar{ + Unsigned: true, + }, + }, + { + Name: "immediate_server_version", + Type: serialization.FieldIntVar{ + Unsigned: true, + }, + }, + { + Name: "original_server_version", + Type: serialization.FieldIntVar{ + Unsigned: true, + }, + Optional: true, + }, + { + Name: "commit_group_ticket", + Optional: true, + }, + }, + }, + } + + err := serialization.Unmarshal(data, &e.msg) + if err != nil { + return err + } + + return nil +} + +func (e *GtidTaggedLogEvent) Dump(w io.Writer) { + fmt.Println(e.msg.String()) +} + type BeginLoadQueryEvent struct { FileID uint32 BlockData []byte diff --git a/replication/parser.go b/replication/parser.go index 4caf496c2..afa331d5b 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -293,6 +293,8 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) ( e = >IDEvent{} case ANONYMOUS_GTID_EVENT: e = >IDEvent{} + case GTID_TAGGED_LOG_EVENT: + e = &GtidTaggedLogEvent{} case BEGIN_LOAD_QUERY_EVENT: e = &BeginLoadQueryEvent{} case EXECUTE_LOAD_QUERY_EVENT: diff --git a/serialization/serialization.go b/serialization/serialization.go new file mode 100644 index 000000000..bade45842 --- /dev/null +++ b/serialization/serialization.go @@ -0,0 +1,255 @@ +package serialization + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "slices" + "strings" +) + +// mysql::serialization is a serialization format introduced with tagged GTIDs +// +// https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlSerialization.html + +type Message struct { + Version uint8 // >= 0 + Format Format +} + +func (m *Message) String() (text string) { + text += fmt.Sprintf("Message (version: %d)", m.Version) + for _, line := range strings.Split(m.Format.String(), "\n") { + text += "\n " + line + } + return +} + +type Format struct { + Size uint64 + LastNonIgnorableField int + Fields []Field +} + +func (f *Format) String() (text string) { + text += fmt.Sprintf("Format (Size: %d, LastNonIgnorableField: %d)\n", f.Size, f.LastNonIgnorableField) + for _, f := range f.Fields { + text += fmt.Sprintf("Field %02d (Name: %s, Skipped: %t, Type: %T)\n", f.ID, f.Name, f.Skipped, f.Type) + if f.Type != nil { + text += fmt.Sprintf(" Value: %s\n", f.Type.String()) + } + } + return text +} + +type Field struct { + ID int + Type FieldType + Optional bool + Name string + Skipped bool +} + +type FieldType interface { + fmt.Stringer +} + +type FieldIntFixed struct { + Length int // Length of value before encoding, encoded value can be more + Value []byte +} + +func (f FieldIntFixed) String() string { + if f.Value == nil { + return "" + } + return fmt.Sprintf("0x%x", f.Value) +} + +type FieldIntVar struct { + Value uint64 + Unsigned bool +} + +func (f FieldIntVar) String() string { + return fmt.Sprintf("%d", f.Value) +} + +type FieldString struct { + Value string +} + +func (f FieldString) String() string { + return string(f.Value) +} + +type Marshaler interface { + MarshalMySQLSerial() ([]byte, error) +} + +func Unmarshal(data []byte, v interface{}) error { + r := bytes.NewReader(data) + switch m := v.(type) { + case *Message: + messageLen := 1 + tmpVer := make([]byte, messageLen) + _, err := r.Read(tmpVer) + if err != nil { + return err + } + m.Version = tmpVer[0] / 2 + + err = Unmarshal(data[messageLen:], &m.Format) + if err != nil { + return err + } + case *Format: + formatLen := 2 + tmpFormat := make([]byte, formatLen) + _, err := r.Read(tmpFormat) + if err != nil { + return err + } + m.Size = uint64(tmpFormat[0] / 2) + m.LastNonIgnorableField = int(tmpFormat[1] / 2) + + for i := 0; i < len(m.Fields); i++ { + tmpField := make([]byte, 1) + _, err := r.Read(tmpField) + if err != nil { + if errors.Is(err, io.EOF) { + break + } + return err + } + if int(tmpField[0]/2) != i { + // The field number we got doesn't match what we expect, so a field was skipped. + // Rewind the reader and skip. + m.Fields[i].ID = i + m.Fields[i].Skipped = true + r.Seek(-1, io.SeekCurrent) + continue + } + m.Fields[i].ID = int(tmpField[0] / 2) + switch f := m.Fields[i].Type.(type) { + case FieldIntFixed: + tmpVal, err := decodeFixed(r, f.Length) + if err != nil { + return err + } + f.Value = tmpVal + m.Fields[i].Type = f + case FieldIntVar: + firstByte := make([]byte, 1) + _, err := r.Read(firstByte) + if err != nil { + return err + } + tb := trailingOneBitCount(firstByte[0]) + _, err = r.Seek(-1, io.SeekCurrent) + if err != nil { + return err + } + fieldBytes := make([]byte, tb+1) + _, err = r.Read(fieldBytes) + if err != nil { + return err + } + var tNum uint64 + switch len(fieldBytes) { + case 1: + tNum = uint64(fieldBytes[0]) + case 2: + tNum = uint64(binary.LittleEndian.Uint16(fieldBytes)) + case 3: + tNum = uint64(binary.LittleEndian.Uint32(slices.Concat(fieldBytes, []byte{0x0}))) + case 4: + tNum = uint64(binary.LittleEndian.Uint32(fieldBytes)) + case 5: + tNum = binary.LittleEndian.Uint64(slices.Concat(fieldBytes, []byte{0x0, 0x0, 0x0})) + case 6: + tNum = binary.LittleEndian.Uint64(slices.Concat(fieldBytes, []byte{0x0, 0x0})) + case 7: + tNum = binary.LittleEndian.Uint64(slices.Concat(fieldBytes, []byte{0x0})) + case 8: + tNum = binary.LittleEndian.Uint64(fieldBytes) + } + if f.Unsigned { + f.Value = tNum >> (tb + 2) * 2 + } else { + f.Value = tNum >> (tb + 2) + } + m.Fields[i].Type = f + case FieldString: + firstByte := make([]byte, 1) + _, err := r.Read(firstByte) + if err != nil { + return err + } + strBytes := make([]byte, firstByte[0]/2) + _, err = r.Read(strBytes) + if err != nil { + return err + } + f.Value = string(strBytes) + m.Fields[i].Type = f + default: + return fmt.Errorf("unsupported field type: %T", m.Fields[i].Type) + } + } + + default: + return fmt.Errorf("unsupported type: %T", v) + } + return nil +} + +func decodeFixed(r io.Reader, len int) ([]byte, error) { + var b bytes.Buffer + + tmpInt := make([]byte, 1) + for { + _, err := r.Read(tmpInt) + if err != nil { + return nil, err + } + if tmpInt[0]%2 == 0 { + b.WriteByte(tmpInt[0] / 2) + } else { + tmpInt2 := make([]byte, 1) + _, err := r.Read(tmpInt2) + if err != nil { + return nil, err + } + switch tmpInt2[0] { + case 0x2: + b.WriteByte((tmpInt[0] >> 2) + 0x80) + case 0x3: + b.WriteByte((tmpInt[0] >> 2) + 0xc0) + default: + return nil, fmt.Errorf("unknown decoding for %v", tmpInt2[0]) + } + } + if b.Len() == len { + break + } + } + return b.Bytes(), nil +} + +func trailingOneBitCount(b byte) (count int) { + var i byte = 0x1 + for { + if b&i == 0 { + break + } + count++ + if i >= 0x80 { + break + } + i = i << 1 + } + return +} From a3d5dee680889bd575c8b7d130aeb09c17f825b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Fri, 14 Feb 2025 16:13:28 +0100 Subject: [PATCH 02/36] fixup --- serialization/serialization.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index bade45842..ab07b81d9 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -82,7 +82,7 @@ type FieldString struct { } func (f FieldString) String() string { - return string(f.Value) + return f.Value } type Marshaler interface { @@ -129,7 +129,10 @@ func Unmarshal(data []byte, v interface{}) error { // Rewind the reader and skip. m.Fields[i].ID = i m.Fields[i].Skipped = true - r.Seek(-1, io.SeekCurrent) + _, err := r.Seek(-1, io.SeekCurrent) + if err!=nil { + return err + } continue } m.Fields[i].ID = int(tmpField[0] / 2) From 10158f47ebf468157680e123f9fad8df95a102c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Fri, 14 Feb 2025 16:15:23 +0100 Subject: [PATCH 03/36] fixup --- serialization/serialization.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index ab07b81d9..d68bab609 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -130,7 +130,7 @@ func Unmarshal(data []byte, v interface{}) error { m.Fields[i].ID = i m.Fields[i].Skipped = true _, err := r.Seek(-1, io.SeekCurrent) - if err!=nil { + if err != nil { return err } continue From 09838f543ad957b12ec381d9c37762215552edf5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Fri, 14 Feb 2025 16:19:24 +0100 Subject: [PATCH 04/36] fixup --- serialization/serialization.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index d68bab609..57fe1d6e3 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -34,9 +34,11 @@ type Format struct { } func (f *Format) String() (text string) { - text += fmt.Sprintf("Format (Size: %d, LastNonIgnorableField: %d)\n", f.Size, f.LastNonIgnorableField) + text += fmt.Sprintf("Format (Size: %d, LastNonIgnorableField: %d)\n", + f.Size, f.LastNonIgnorableField) for _, f := range f.Fields { - text += fmt.Sprintf("Field %02d (Name: %s, Skipped: %t, Type: %T)\n", f.ID, f.Name, f.Skipped, f.Type) + text += fmt.Sprintf("Field %02d (Name: %s, Skipped: %t, Type: %T)\n", + f.ID, f.Name, f.Skipped, f.Type) if f.Type != nil { text += fmt.Sprintf(" Value: %s\n", f.Type.String()) } @@ -125,8 +127,8 @@ func Unmarshal(data []byte, v interface{}) error { return err } if int(tmpField[0]/2) != i { - // The field number we got doesn't match what we expect, so a field was skipped. - // Rewind the reader and skip. + // The field number we got doesn't match what we expect, + // so a field was skipped. Rewind the reader and skip. m.Fields[i].ID = i m.Fields[i].Skipped = true _, err := r.Seek(-1, io.SeekCurrent) @@ -167,15 +169,19 @@ func Unmarshal(data []byte, v interface{}) error { case 2: tNum = uint64(binary.LittleEndian.Uint16(fieldBytes)) case 3: - tNum = uint64(binary.LittleEndian.Uint32(slices.Concat(fieldBytes, []byte{0x0}))) + tNum = uint64(binary.LittleEndian.Uint32( + slices.Concat(fieldBytes, []byte{0x0}))) case 4: tNum = uint64(binary.LittleEndian.Uint32(fieldBytes)) case 5: - tNum = binary.LittleEndian.Uint64(slices.Concat(fieldBytes, []byte{0x0, 0x0, 0x0})) + tNum = binary.LittleEndian.Uint64( + slices.Concat(fieldBytes, []byte{0x0, 0x0, 0x0})) case 6: - tNum = binary.LittleEndian.Uint64(slices.Concat(fieldBytes, []byte{0x0, 0x0})) + tNum = binary.LittleEndian.Uint64( + slices.Concat(fieldBytes, []byte{0x0, 0x0})) case 7: - tNum = binary.LittleEndian.Uint64(slices.Concat(fieldBytes, []byte{0x0})) + tNum = binary.LittleEndian.Uint64( + slices.Concat(fieldBytes, []byte{0x0})) case 8: tNum = binary.LittleEndian.Uint64(fieldBytes) } From b6ceaf3df6256a7a1efc1ec21711af500ee8c0ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Fri, 14 Feb 2025 18:42:09 +0100 Subject: [PATCH 05/36] Add test and get field by name --- replication/event.go | 9 ++++ serialization/serialization.go | 9 ++++ serialization/serialization_test.go | 80 +++++++++++++++++++++++++++++ 3 files changed, 98 insertions(+) create mode 100644 serialization/serialization_test.go diff --git a/replication/event.go b/replication/event.go index dc55a7e5a..1ebd960cf 100644 --- a/replication/event.go +++ b/replication/event.go @@ -630,6 +630,15 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { func (e *GtidTaggedLogEvent) Dump(w io.Writer) { fmt.Println(e.msg.String()) + + f, err := e.msg.GetFieldByName("immediate_server_version") + if err != nil { + return + } + + if v, ok := f.Type.(serialization.FieldIntVar); ok { + fmt.Printf("Immediate server version: %d\n",v.Value) + } } type BeginLoadQueryEvent struct { diff --git a/serialization/serialization.go b/serialization/serialization.go index 57fe1d6e3..009204004 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -27,6 +27,15 @@ func (m *Message) String() (text string) { return } +func (m *Message) GetFieldByName(name string) (Field, error) { + for _, f := range m.Format.Fields { + if f.Name == name { + return f, nil + } + } + return Field{}, fmt.Errorf("field not found: %s", name) +} + type Format struct { Size uint64 LastNonIgnorableField int diff --git a/serialization/serialization_test.go b/serialization/serialization_test.go new file mode 100644 index 000000000..8ffd1c2a8 --- /dev/null +++ b/serialization/serialization_test.go @@ -0,0 +1,80 @@ +package serialization + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestTrailingOneBitCount(t *testing.T) { + testcases := []struct { + input byte + result int + }{ + {0b00000000, 0}, + {0b00000001, 1}, + {0b00000011, 2}, + {0b00000111, 3}, + {0b00001111, 4}, + {0b00011111, 5}, + {0b00111111, 6}, + {0b01111111, 7}, + {0b11111111, 8}, + {0b10000000, 0}, + {0b11111101, 1}, + } + + for _, tc := range testcases { + actual := trailingOneBitCount(tc.input) + require.Equal(t, tc.result, actual) + } +} + +func TestDecodeFixed(t *testing.T) { + testcases := []struct { + input []byte + len int + result []byte + err string + }{ + { + []byte{0xee, 0x81, 0x02, 0xc1, 0x02, 0x01, 0x03, 0x41, 0x03, 0x81, 0x03, 0xc1, 0x03, 0xc5, 0x03, 0x22, + 0x22, 0xee, 0xfd, 0x03, 0xee, 0xfd, 0x03, 0xee, 0xfd, 0x03}, + 16, + []byte{0x77, 0xa0, 0xb0, 0xc0, 0xd0, 0xe0, 0xf0, 0xf1, 0x11, 0x11, 0x77, 0xff, 0x77, 0xff, 0x77, 0xff}, + "", + }, + { + []byte{0xee, 0x81}, + 16, + []byte{}, + "EOF", + }, + { + []byte{}, + 16, + []byte{}, + "EOF", + }, + { + []byte{0xee, 0x81, 0x04, 0xc1, 0x02, 0x01, 0x03, 0x41, 0x03, 0x81, 0x03, 0xc1, 0x03, 0xc5, 0x03, 0x22, + 0x22, 0xee, 0xfd, 0x03, 0xee, 0xfd, 0x03, 0xee, 0xfd, 0x03}, + 16, + []byte{}, + "unknown decoding for", + }, + } + + for _, tc := range testcases { + actual, err := decodeFixed(bytes.NewReader(tc.input), tc.len) + if tc.err == "" { + require.NoError(t, err) + require.Equal(t, tc.result, actual) + require.Equal(t, tc.len, len(actual)) + } else { + require.ErrorContains(t, err, tc.err) + } + + } +} From 3bd47aa59b78dc7a6eb74f51836f32d0449cea83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Sat, 15 Feb 2025 11:32:04 +0100 Subject: [PATCH 06/36] more testing --- serialization/serialization.go | 132 ++++++++++++++++------------ serialization/serialization_test.go | 39 ++++++++ 2 files changed, 113 insertions(+), 58 deletions(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index 009204004..f77a7ac1b 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -1,3 +1,8 @@ +// Package serialization is for working with the mysql::serialization format +// +// mysql::serialization is a serialization format introduced with tagged GTIDs +// +// https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlSerialization.html package serialization import ( @@ -10,10 +15,7 @@ import ( "strings" ) -// mysql::serialization is a serialization format introduced with tagged GTIDs -// -// https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlSerialization.html - +// Message is a mysql::serialization message type Message struct { Version uint8 // >= 0 Format Format @@ -27,6 +29,7 @@ func (m *Message) String() (text string) { return } +// GetFieldByName returns a field if the name matches and an error if there is no match func (m *Message) GetFieldByName(name string) (Field, error) { for _, f := range m.Format.Fields { if f.Name == name { @@ -36,6 +39,7 @@ func (m *Message) GetFieldByName(name string) (Field, error) { return Field{}, fmt.Errorf("field not found: %s", name) } +// Format is describing a `message_format` type Format struct { Size uint64 LastNonIgnorableField int @@ -96,10 +100,6 @@ func (f FieldString) String() string { return f.Value } -type Marshaler interface { - MarshalMySQLSerial() ([]byte, error) -} - func Unmarshal(data []byte, v interface{}) error { r := bytes.NewReader(data) switch m := v.(type) { @@ -149,69 +149,22 @@ func Unmarshal(data []byte, v interface{}) error { m.Fields[i].ID = int(tmpField[0] / 2) switch f := m.Fields[i].Type.(type) { case FieldIntFixed: - tmpVal, err := decodeFixed(r, f.Length) + f.Value, err = decodeFixed(r, f.Length) if err != nil { return err } - f.Value = tmpVal m.Fields[i].Type = f case FieldIntVar: - firstByte := make([]byte, 1) - _, err := r.Read(firstByte) - if err != nil { - return err - } - tb := trailingOneBitCount(firstByte[0]) - _, err = r.Seek(-1, io.SeekCurrent) + f.Value, err = decodeVar(r, f.Unsigned) if err != nil { return err } - fieldBytes := make([]byte, tb+1) - _, err = r.Read(fieldBytes) - if err != nil { - return err - } - var tNum uint64 - switch len(fieldBytes) { - case 1: - tNum = uint64(fieldBytes[0]) - case 2: - tNum = uint64(binary.LittleEndian.Uint16(fieldBytes)) - case 3: - tNum = uint64(binary.LittleEndian.Uint32( - slices.Concat(fieldBytes, []byte{0x0}))) - case 4: - tNum = uint64(binary.LittleEndian.Uint32(fieldBytes)) - case 5: - tNum = binary.LittleEndian.Uint64( - slices.Concat(fieldBytes, []byte{0x0, 0x0, 0x0})) - case 6: - tNum = binary.LittleEndian.Uint64( - slices.Concat(fieldBytes, []byte{0x0, 0x0})) - case 7: - tNum = binary.LittleEndian.Uint64( - slices.Concat(fieldBytes, []byte{0x0})) - case 8: - tNum = binary.LittleEndian.Uint64(fieldBytes) - } - if f.Unsigned { - f.Value = tNum >> (tb + 2) * 2 - } else { - f.Value = tNum >> (tb + 2) - } m.Fields[i].Type = f case FieldString: - firstByte := make([]byte, 1) - _, err := r.Read(firstByte) - if err != nil { - return err - } - strBytes := make([]byte, firstByte[0]/2) - _, err = r.Read(strBytes) + f.Value, err = decodeString(r) if err != nil { return err } - f.Value = string(strBytes) m.Fields[i].Type = f default: return fmt.Errorf("unsupported field type: %T", m.Fields[i].Type) @@ -224,6 +177,24 @@ func Unmarshal(data []byte, v interface{}) error { return nil } +func decodeString(r io.Reader) (string, error) { + firstByte := make([]byte, 1) + _, err := r.Read(firstByte) + if err != nil { + return "", err + } + strBytes := make([]byte, firstByte[0]/2) + n, err := r.Read(strBytes) + if err != nil { + return "", err + } + if n != int(firstByte[0]/2) { + return "", fmt.Errorf("only read %d bytes, expected %d", n, firstByte[0]/2) + } + fmt.Printf("string: %s (%x)\n", string(strBytes), strBytes) + return string(strBytes), nil +} + func decodeFixed(r io.Reader, len int) ([]byte, error) { var b bytes.Buffer @@ -257,6 +228,51 @@ func decodeFixed(r io.Reader, len int) ([]byte, error) { return b.Bytes(), nil } +func decodeVar(r io.ReadSeeker, unsigned bool) (uint64, error) { + firstByte := make([]byte, 1) + _, err := r.Read(firstByte) + if err != nil { + return 0, err + } + tb := trailingOneBitCount(firstByte[0]) + _, err = r.Seek(-1, io.SeekCurrent) + if err != nil { + return 0, err + } + fieldBytes := make([]byte, tb+1) + _, err = r.Read(fieldBytes) + if err != nil { + return 0, err + } + var tNum uint64 + switch len(fieldBytes) { + case 1: + tNum = uint64(fieldBytes[0]) + case 2: + tNum = uint64(binary.LittleEndian.Uint16(fieldBytes)) + case 3: + tNum = uint64(binary.LittleEndian.Uint32( + slices.Concat(fieldBytes, []byte{0x0}))) + case 4: + tNum = uint64(binary.LittleEndian.Uint32(fieldBytes)) + case 5: + tNum = binary.LittleEndian.Uint64( + slices.Concat(fieldBytes, []byte{0x0, 0x0, 0x0})) + case 6: + tNum = binary.LittleEndian.Uint64( + slices.Concat(fieldBytes, []byte{0x0, 0x0})) + case 7: + tNum = binary.LittleEndian.Uint64( + slices.Concat(fieldBytes, []byte{0x0})) + case 8: + tNum = binary.LittleEndian.Uint64(fieldBytes) + } + if unsigned { + return tNum >> (tb + 2) * 2, nil + } + return tNum >> (tb + 2), nil +} + func trailingOneBitCount(b byte) (count int) { var i byte = 0x1 for { diff --git a/serialization/serialization_test.go b/serialization/serialization_test.go index 8ffd1c2a8..96c3718d5 100644 --- a/serialization/serialization_test.go +++ b/serialization/serialization_test.go @@ -78,3 +78,42 @@ func TestDecodeFixed(t *testing.T) { } } + +func TestDecodeString(t *testing.T) { + testcases := []struct { + input []byte + result string + err string + }{ + { + []byte{0x18, 0x61, 0x62, 0x63, 0x64, 0x65, 0x66, 0x67, 0x68, 0x69, 0x6a, 0x6b, 0x6c}, + "abcdefghijkl", + "", + }, + { + []byte{0x18, 0x61, 0x62, 0x63, 0x64, 0x65, 0x66, 0x67}, + "", + "only read ", + }, + { + []byte{}, + "", + "EOF", + }, + { + []byte{0x18}, + "", + "EOF", + }, + } + + for _, tc := range testcases { + s, err := decodeString(bytes.NewReader(tc.input)) + if tc.err == "" { + require.NoError(t, err) + require.Equal(t, tc.result, s) + }else { + require.ErrorContains(t, err, tc.err) + } + } +} From 0b445a199c69c69f90b5317022663c7baac5ebad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Sat, 15 Feb 2025 12:11:49 +0100 Subject: [PATCH 07/36] More testing --- serialization/serialization.go | 5 ++- serialization/serialization_test.go | 64 ++++++++++++++++++++++++++++- 2 files changed, 67 insertions(+), 2 deletions(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index f77a7ac1b..39d0b2dbf 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -240,10 +240,13 @@ func decodeVar(r io.ReadSeeker, unsigned bool) (uint64, error) { return 0, err } fieldBytes := make([]byte, tb+1) - _, err = r.Read(fieldBytes) + n, err := r.Read(fieldBytes) if err != nil { return 0, err } + if n != tb+1{ + return 0, fmt.Errorf("only read %d bytes, expected %d", n, tb+1) + } var tNum uint64 switch len(fieldBytes) { case 1: diff --git a/serialization/serialization_test.go b/serialization/serialization_test.go index 96c3718d5..d7606d445 100644 --- a/serialization/serialization_test.go +++ b/serialization/serialization_test.go @@ -112,7 +112,69 @@ func TestDecodeString(t *testing.T) { if tc.err == "" { require.NoError(t, err) require.Equal(t, tc.result, s) - }else { + } else { + require.ErrorContains(t, err, tc.err) + } + } +} + +func TestDecodeVar(t *testing.T) { + testcases := []struct { + input []byte + unsigned bool + result uint64 + err string + }{ + { + []byte{}, + false, + 0, + "EOF", + }, + { + []byte{0xd9}, + false, + 0, + "only read ", + }, + { + []byte{0x4}, + false, + 1, + "", + }, + { + []byte{0xd9, 0x03}, + false, + 123, + "", + }, + { + []byte{0xc3, 02, 0x0b}, + true, + 90200, + "", + }, + // { + // []byte{0x5d, 0x03}, + // true, + // 215, + // "", + // }, + // { + // []byte{0x7f, 0x39, 0x7d, 0x89, 0x70, 0xdb, 0x2d, 0x06}, + // true, + // 1739270369410361, + // "", + // }, + } + + for _, tc := range testcases{ + r, err := decodeVar(bytes.NewReader(tc.input), tc.unsigned) + if tc.err == "" { + require.NoError(t, err) + require.Equal(t, tc.result, r) + } else { require.ErrorContains(t, err, tc.err) } } From b55f86ecf9a5ee4959943b2d455e916b81fa75c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Sat, 15 Feb 2025 12:41:54 +0100 Subject: [PATCH 08/36] fixup --- serialization/serialization.go | 3 +- serialization/serialization_test.go | 50 +++++++++++++++++++++++------ 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index 39d0b2dbf..a587f8980 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -191,7 +191,6 @@ func decodeString(r io.Reader) (string, error) { if n != int(firstByte[0]/2) { return "", fmt.Errorf("only read %d bytes, expected %d", n, firstByte[0]/2) } - fmt.Printf("string: %s (%x)\n", string(strBytes), strBytes) return string(strBytes), nil } @@ -271,7 +270,7 @@ func decodeVar(r io.ReadSeeker, unsigned bool) (uint64, error) { tNum = binary.LittleEndian.Uint64(fieldBytes) } if unsigned { - return tNum >> (tb + 2) * 2, nil + return (tNum >> (tb + 2) * 2)+1, nil } return tNum >> (tb + 2), nil } diff --git a/serialization/serialization_test.go b/serialization/serialization_test.go index d7606d445..058e231c5 100644 --- a/serialization/serialization_test.go +++ b/serialization/serialization_test.go @@ -149,31 +149,63 @@ func TestDecodeVar(t *testing.T) { 123, "", }, + // { + // []byte{0xc3, 02, 0x0b}, + // true, + // 90200, + // "", + // }, { - []byte{0xc3, 02, 0x0b}, + // From the example on https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlSerialization.html + // But converted to LE + []byte{0b11111011, 0b11111111, 0b00000111}, true, - 90200, + 65535, + "", + }, + { + // From the example on https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlSerialization.html + // But converted to LE + []byte{0b11111011, 0b11111111, 0b00001111}, + false, + 65535, "", }, // { - // []byte{0x5d, 0x03}, - // true, - // 215, + // // From the example on https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlSerialization.html + // // But converted to LE + // []byte{0b11101011, 0b11111111, 0b00001111}, + // false, + // -65535, // "", // }, // { - // []byte{0x7f, 0x39, 0x7d, 0x89, 0x70, 0xdb, 0x2d, 0x06}, - // true, - // 1739270369410361, + // // From the example on https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlSerialization.html + // // But converted to LE + // []byte{0b11111011, 0b11111111, 0b00001111}, + // false, + // -65536, // "", // }, + { + []byte{0x5d, 0x03}, + true, + 215, + "", + }, + { + []byte{0x7f, 0x39, 0x7d, 0x89, 0x70, 0xdb, 0x2d, 0x06}, + true, + 1739270369410361, + "", + }, } for _, tc := range testcases{ r, err := decodeVar(bytes.NewReader(tc.input), tc.unsigned) if tc.err == "" { require.NoError(t, err) - require.Equal(t, tc.result, r) + require.Equal(t, tc.result, r, tc.result) } else { require.ErrorContains(t, err, tc.err) } From 30285276f08321adeb0b0b49afd5c8eac171952d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Sun, 16 Feb 2025 11:42:16 +0100 Subject: [PATCH 09/36] prepare for negative values --- replication/event.go | 26 +++++++------------- serialization/serialization.go | 37 +++++++++++++++++++++++------ serialization/serialization_test.go | 27 ++++++++++----------- 3 files changed, 51 insertions(+), 39 deletions(-) diff --git a/replication/event.go b/replication/event.go index 1ebd960cf..57fe414b5 100644 --- a/replication/event.go +++ b/replication/event.go @@ -582,34 +582,24 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { }, { Name: "immediate_commit_timestamp", - Type: serialization.FieldIntVar{ - Unsigned: true, - }, + Type: serialization.FieldUintVar{}, }, { - Name: "original_commit_timestamp", - Type: serialization.FieldIntVar{ - Unsigned: true, - }, + Name: "original_commit_timestamp", + Type: serialization.FieldUintVar{}, Optional: true, }, { Name: "transaction_length", - Type: serialization.FieldIntVar{ - Unsigned: true, - }, + Type: serialization.FieldUintVar{}, }, { Name: "immediate_server_version", - Type: serialization.FieldIntVar{ - Unsigned: true, - }, + Type: serialization.FieldUintVar{}, }, { - Name: "original_server_version", - Type: serialization.FieldIntVar{ - Unsigned: true, - }, + Name: "original_server_version", + Type: serialization.FieldUintVar{}, Optional: true, }, { @@ -637,7 +627,7 @@ func (e *GtidTaggedLogEvent) Dump(w io.Writer) { } if v, ok := f.Type.(serialization.FieldIntVar); ok { - fmt.Printf("Immediate server version: %d\n",v.Value) + fmt.Printf("Immediate server version: %d\n", v.Value) } } diff --git a/serialization/serialization.go b/serialization/serialization.go index a587f8980..da22a77af 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -84,14 +84,21 @@ func (f FieldIntFixed) String() string { } type FieldIntVar struct { - Value uint64 - Unsigned bool + Value int64 } func (f FieldIntVar) String() string { return fmt.Sprintf("%d", f.Value) } +type FieldUintVar struct { + Value uint64 +} + +func (f FieldUintVar) String() string { + return fmt.Sprintf("%d", f.Value) +} + type FieldString struct { Value string } @@ -154,11 +161,27 @@ func Unmarshal(data []byte, v interface{}) error { return err } m.Fields[i].Type = f + case FieldUintVar: + val, err := decodeVar(r, true) + if err != nil { + return err + } + if uintval, ok := val.(uint64); ok { + f.Value = uintval + } else { + return errors.New("unexpected type, expecting uint64") + } + m.Fields[i].Type = f case FieldIntVar: - f.Value, err = decodeVar(r, f.Unsigned) + val, err := decodeVar(r, false) if err != nil { return err } + if intval, ok := val.(int64); ok { + f.Value = intval + } else { + return errors.New("unexpected type, expecting int64") + } m.Fields[i].Type = f case FieldString: f.Value, err = decodeString(r) @@ -227,7 +250,7 @@ func decodeFixed(r io.Reader, len int) ([]byte, error) { return b.Bytes(), nil } -func decodeVar(r io.ReadSeeker, unsigned bool) (uint64, error) { +func decodeVar(r io.ReadSeeker, unsigned bool) (interface{}, error) { firstByte := make([]byte, 1) _, err := r.Read(firstByte) if err != nil { @@ -243,7 +266,7 @@ func decodeVar(r io.ReadSeeker, unsigned bool) (uint64, error) { if err != nil { return 0, err } - if n != tb+1{ + if n != tb+1 { return 0, fmt.Errorf("only read %d bytes, expected %d", n, tb+1) } var tNum uint64 @@ -270,9 +293,9 @@ func decodeVar(r io.ReadSeeker, unsigned bool) (uint64, error) { tNum = binary.LittleEndian.Uint64(fieldBytes) } if unsigned { - return (tNum >> (tb + 2) * 2)+1, nil + return (tNum >> (tb + 2) * 2) + 1, nil } - return tNum >> (tb + 2), nil + return int64(tNum >> (tb + 2)), nil } func trailingOneBitCount(b byte) (count int) { diff --git a/serialization/serialization_test.go b/serialization/serialization_test.go index 058e231c5..63f0abd59 100644 --- a/serialization/serialization_test.go +++ b/serialization/serialization_test.go @@ -75,7 +75,6 @@ func TestDecodeFixed(t *testing.T) { } else { require.ErrorContains(t, err, tc.err) } - } } @@ -120,10 +119,10 @@ func TestDecodeString(t *testing.T) { func TestDecodeVar(t *testing.T) { testcases := []struct { - input []byte + input []byte unsigned bool - result uint64 - err string + result interface{} + err string }{ { []byte{}, @@ -140,19 +139,19 @@ func TestDecodeVar(t *testing.T) { { []byte{0x4}, false, - 1, + int64(1), "", }, { []byte{0xd9, 0x03}, false, - 123, + int64(123), "", }, // { // []byte{0xc3, 02, 0x0b}, // true, - // 90200, + // uint64(90200), // "", // }, { @@ -160,7 +159,7 @@ func TestDecodeVar(t *testing.T) { // But converted to LE []byte{0b11111011, 0b11111111, 0b00000111}, true, - 65535, + uint64(65535), "", }, { @@ -168,7 +167,7 @@ func TestDecodeVar(t *testing.T) { // But converted to LE []byte{0b11111011, 0b11111111, 0b00001111}, false, - 65535, + int64(65535), "", }, // { @@ -176,7 +175,7 @@ func TestDecodeVar(t *testing.T) { // // But converted to LE // []byte{0b11101011, 0b11111111, 0b00001111}, // false, - // -65535, + // int64(-65535), // "", // }, // { @@ -184,24 +183,24 @@ func TestDecodeVar(t *testing.T) { // // But converted to LE // []byte{0b11111011, 0b11111111, 0b00001111}, // false, - // -65536, + // int64(-65536), // "", // }, { []byte{0x5d, 0x03}, true, - 215, + uint64(215), "", }, { []byte{0x7f, 0x39, 0x7d, 0x89, 0x70, 0xdb, 0x2d, 0x06}, true, - 1739270369410361, + uint64(1739270369410361), "", }, } - for _, tc := range testcases{ + for _, tc := range testcases { r, err := decodeVar(bytes.NewReader(tc.input), tc.unsigned) if tc.err == "" { require.NoError(t, err) From 76b98bbf5cdfc4141a80e6ced5d106858fe590ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Sun, 16 Feb 2025 13:22:45 +0100 Subject: [PATCH 10/36] Update based on review --- serialization/serialization.go | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index da22a77af..db2fad3c0 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "io" + "math/bits" "slices" "strings" ) @@ -298,17 +299,6 @@ func decodeVar(r io.ReadSeeker, unsigned bool) (interface{}, error) { return int64(tNum >> (tb + 2)), nil } -func trailingOneBitCount(b byte) (count int) { - var i byte = 0x1 - for { - if b&i == 0 { - break - } - count++ - if i >= 0x80 { - break - } - i = i << 1 - } - return +func trailingOneBitCount(b byte) int { + return bits.TrailingZeros8(^b) } From 0237595c16cfc5ec60d2c5d4f1212b541a6260a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Mon, 17 Feb 2025 11:19:04 +0100 Subject: [PATCH 11/36] fixup --- serialization/serialization.go | 2 +- serialization/serialization_test.go | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index db2fad3c0..04df7395c 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -294,7 +294,7 @@ func decodeVar(r io.ReadSeeker, unsigned bool) (interface{}, error) { tNum = binary.LittleEndian.Uint64(fieldBytes) } if unsigned { - return (tNum >> (tb + 2) * 2) + 1, nil + return tNum >> (tb + 1), nil } return int64(tNum >> (tb + 2)), nil } diff --git a/serialization/serialization_test.go b/serialization/serialization_test.go index 63f0abd59..f31cf6ae0 100644 --- a/serialization/serialization_test.go +++ b/serialization/serialization_test.go @@ -148,12 +148,12 @@ func TestDecodeVar(t *testing.T) { int64(123), "", }, - // { - // []byte{0xc3, 02, 0x0b}, - // true, - // uint64(90200), - // "", - // }, + { + []byte{0xc3, 02, 0x0b}, + true, + uint64(90200), + "", + }, { // From the example on https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlSerialization.html // But converted to LE From 209cfadb491a0b55b0222087075d2bd998a725aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Mon, 17 Feb 2025 11:52:58 +0100 Subject: [PATCH 12/36] fixup --- serialization/serialization.go | 5 +++- serialization/serialization_test.go | 38 ++++++++++++++++------------- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index 04df7395c..56cb1976c 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -296,7 +296,10 @@ func decodeVar(r io.ReadSeeker, unsigned bool) (interface{}, error) { if unsigned { return tNum >> (tb + 1), nil } - return int64(tNum >> (tb + 2)), nil + if positive := (tNum>>(tb+1))&1==0; positive { + return int64(tNum >> (tb + 2)), nil + } + return int64(-(1+(tNum >> (tb + 2)))), nil } func trailingOneBitCount(b byte) int { diff --git a/serialization/serialization_test.go b/serialization/serialization_test.go index f31cf6ae0..0212ffab7 100644 --- a/serialization/serialization_test.go +++ b/serialization/serialization_test.go @@ -157,6 +157,7 @@ func TestDecodeVar(t *testing.T) { { // From the example on https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlSerialization.html // But converted to LE + // unsigned integer, 65535 []byte{0b11111011, 0b11111111, 0b00000111}, true, uint64(65535), @@ -165,27 +166,30 @@ func TestDecodeVar(t *testing.T) { { // From the example on https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlSerialization.html // But converted to LE - []byte{0b11111011, 0b11111111, 0b00001111}, + // signed integer, 65535 + []byte{0b11110011, 0b11111111, 0b00001111}, false, int64(65535), "", }, - // { - // // From the example on https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlSerialization.html - // // But converted to LE - // []byte{0b11101011, 0b11111111, 0b00001111}, - // false, - // int64(-65535), - // "", - // }, - // { - // // From the example on https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlSerialization.html - // // But converted to LE - // []byte{0b11111011, 0b11111111, 0b00001111}, - // false, - // int64(-65536), - // "", - // }, + { + // From the example on https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlSerialization.html + // But converted to LE + // signed integer, -65535 + []byte{0b11101011, 0b11111111, 0b00001111}, + false, + int64(-65535), + "", + }, + { + // From the example on https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlSerialization.html + // But converted to LE + // signed integer, 65536 + []byte{0b11111011, 0b11111111, 0b00001111}, + false, + int64(-65536), + "", + }, { []byte{0x5d, 0x03}, true, From ad8acac99c28c442c073430de159a437918ab647 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Mon, 17 Feb 2025 11:59:31 +0100 Subject: [PATCH 13/36] fixup --- serialization/serialization.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index 56cb1976c..4852624e2 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -296,10 +296,10 @@ func decodeVar(r io.ReadSeeker, unsigned bool) (interface{}, error) { if unsigned { return tNum >> (tb + 1), nil } - if positive := (tNum>>(tb+1))&1==0; positive { + if positive := (tNum>>(tb+1))&1 == 0; positive { return int64(tNum >> (tb + 2)), nil } - return int64(-(1+(tNum >> (tb + 2)))), nil + return int64(-(1 + (tNum >> (tb + 2)))), nil } func trailingOneBitCount(b byte) int { From f412b2b08cc873053c3ea6e7cc94ce09aa88cda8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Mon, 17 Feb 2025 21:41:50 +0100 Subject: [PATCH 14/36] fixup --- serialization/serialization.go | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index 4852624e2..7c3bf41a5 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -60,6 +60,7 @@ func (f *Format) String() (text string) { return text } +// Field represents a `message_field` type Field struct { ID int Type FieldType @@ -68,10 +69,15 @@ type Field struct { Skipped bool } +// FieldType represents a `type_field` type FieldType interface { fmt.Stringer } +// FieldIntFixed is for values with a fixed length. +// This is also known as the 'fixlen_integer_format'. +// The encoded value can vary be between 1 and 2 times +// of that of the value before encoding. type FieldIntFixed struct { Length int // Length of value before encoding, encoded value can be more Value []byte @@ -84,6 +90,8 @@ func (f FieldIntFixed) String() string { return fmt.Sprintf("0x%x", f.Value) } +// FieldIntVar is using the signed integer variant of the 'varlen_integer_format' +// and encodes a value as a byte sequence of 1-9 bytes depending on the value. type FieldIntVar struct { Value int64 } @@ -92,6 +100,8 @@ func (f FieldIntVar) String() string { return fmt.Sprintf("%d", f.Value) } +// FieldUintVar is using the usigned integer variant of the 'varlen_integer_format' +// and encodes a value as a byte sequence of 1-9 bytes depending on the value. type FieldUintVar struct { Value uint64 } @@ -100,6 +110,7 @@ func (f FieldUintVar) String() string { return fmt.Sprintf("%d", f.Value) } +// FieldString is a 'string_format' field type FieldString struct { Value string } @@ -118,7 +129,7 @@ func Unmarshal(data []byte, v interface{}) error { if err != nil { return err } - m.Version = tmpVer[0] / 2 + m.Version = tmpVer[0] >> 1 err = Unmarshal(data[messageLen:], &m.Format) if err != nil { @@ -131,8 +142,8 @@ func Unmarshal(data []byte, v interface{}) error { if err != nil { return err } - m.Size = uint64(tmpFormat[0] / 2) - m.LastNonIgnorableField = int(tmpFormat[1] / 2) + m.Size = uint64(tmpFormat[0] >> 1) + m.LastNonIgnorableField = int(tmpFormat[1] >> 1) for i := 0; i < len(m.Fields); i++ { tmpField := make([]byte, 1) @@ -154,7 +165,7 @@ func Unmarshal(data []byte, v interface{}) error { } continue } - m.Fields[i].ID = int(tmpField[0] / 2) + m.Fields[i].ID = int(tmpField[0] >> 1) switch f := m.Fields[i].Type.(type) { case FieldIntFixed: f.Value, err = decodeFixed(r, f.Length) @@ -207,12 +218,12 @@ func decodeString(r io.Reader) (string, error) { if err != nil { return "", err } - strBytes := make([]byte, firstByte[0]/2) + strBytes := make([]byte, firstByte[0] >> 1) n, err := r.Read(strBytes) if err != nil { return "", err } - if n != int(firstByte[0]/2) { + if n != int(firstByte[0] >> 1) { return "", fmt.Errorf("only read %d bytes, expected %d", n, firstByte[0]/2) } return string(strBytes), nil @@ -228,7 +239,7 @@ func decodeFixed(r io.Reader, len int) ([]byte, error) { return nil, err } if tmpInt[0]%2 == 0 { - b.WriteByte(tmpInt[0] / 2) + b.WriteByte(tmpInt[0] >> 1) } else { tmpInt2 := make([]byte, 1) _, err := r.Read(tmpInt2) From a7e4280d47e7544f4989c5018b7588df47308e61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Tue, 18 Feb 2025 08:36:54 +0100 Subject: [PATCH 15/36] updates --- serialization/serialization.go | 161 +++++++++++----------------- serialization/serialization_test.go | 21 ++-- 2 files changed, 73 insertions(+), 109 deletions(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index 7c3bf41a5..0740a7234 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -10,7 +10,6 @@ import ( "encoding/binary" "errors" "fmt" - "io" "math/bits" "slices" "strings" @@ -120,61 +119,42 @@ func (f FieldString) String() string { } func Unmarshal(data []byte, v interface{}) error { - r := bytes.NewReader(data) switch m := v.(type) { case *Message: - messageLen := 1 - tmpVer := make([]byte, messageLen) - _, err := r.Read(tmpVer) - if err != nil { - return err - } - m.Version = tmpVer[0] >> 1 - - err = Unmarshal(data[messageLen:], &m.Format) + m.Version = data[0] >> 1 + err := Unmarshal(data[1:], &m.Format) if err != nil { return err } case *Format: - formatLen := 2 - tmpFormat := make([]byte, formatLen) - _, err := r.Read(tmpFormat) - if err != nil { - return err - } - m.Size = uint64(tmpFormat[0] >> 1) - m.LastNonIgnorableField = int(tmpFormat[1] >> 1) + pos := uint64(0) + m.Size = uint64(data[pos] >> 1) + pos++ + m.LastNonIgnorableField = int(data[pos] >> 1) + pos++ for i := 0; i < len(m.Fields); i++ { - tmpField := make([]byte, 1) - _, err := r.Read(tmpField) - if err != nil { - if errors.Is(err, io.EOF) { - break - } - return err - } - if int(tmpField[0]/2) != i { + if int(pos)+1 > len(data) || int(data[pos]>>1) != i { // The field number we got doesn't match what we expect, - // so a field was skipped. Rewind the reader and skip. + // so a field was skipped. m.Fields[i].ID = i m.Fields[i].Skipped = true - _, err := r.Seek(-1, io.SeekCurrent) - if err != nil { - return err - } continue } - m.Fields[i].ID = int(tmpField[0] >> 1) + m.Fields[i].ID = int(data[pos] >> 1) + pos++ + var n uint64 + var err error switch f := m.Fields[i].Type.(type) { case FieldIntFixed: - f.Value, err = decodeFixed(r, f.Length) + f.Value, n, err = decodeFixed(data, pos, f.Length) if err != nil { return err } m.Fields[i].Type = f case FieldUintVar: - val, err := decodeVar(r, true) + var val interface{} + val, n, err = decodeVar(data, pos, true) if err != nil { return err } @@ -185,7 +165,8 @@ func Unmarshal(data []byte, v interface{}) error { } m.Fields[i].Type = f case FieldIntVar: - val, err := decodeVar(r, false) + var val interface{} + val, n, err = decodeVar(data, pos, false) if err != nil { return err } @@ -196,7 +177,7 @@ func Unmarshal(data []byte, v interface{}) error { } m.Fields[i].Type = f case FieldString: - f.Value, err = decodeString(r) + f.Value, n, err = decodeString(data, pos) if err != nil { return err } @@ -204,6 +185,7 @@ func Unmarshal(data []byte, v interface{}) error { default: return fmt.Errorf("unsupported field type: %T", m.Fields[i].Type) } + pos = n } default: @@ -212,105 +194,88 @@ func Unmarshal(data []byte, v interface{}) error { return nil } -func decodeString(r io.Reader) (string, error) { - firstByte := make([]byte, 1) - _, err := r.Read(firstByte) - if err != nil { - return "", err +func decodeString(data []byte, pos uint64) (string, uint64, error) { + if len(data) < int(pos)+1 { + return "", pos, errors.New("string truncated, expected at least one byte") } - strBytes := make([]byte, firstByte[0] >> 1) - n, err := r.Read(strBytes) - if err != nil { - return "", err + strLen := int(data[pos] >> 1) + pos++ + if len(data) < int(pos)+strLen { + return "", pos, fmt.Errorf("string truncated, expected length: %d", strLen) } - if n != int(firstByte[0] >> 1) { - return "", fmt.Errorf("only read %d bytes, expected %d", n, firstByte[0]/2) - } - return string(strBytes), nil + return string(data[pos : pos+uint64(strLen)]), pos + uint64(strLen), nil } -func decodeFixed(r io.Reader, len int) ([]byte, error) { +func decodeFixed(data []byte, pos uint64, intlen int) ([]byte, uint64, error) { var b bytes.Buffer - tmpInt := make([]byte, 1) for { - _, err := r.Read(tmpInt) - if err != nil { - return nil, err + if len(data) < int(pos)+1 { + return b.Bytes(), pos, errors.New("data truncated") } - if tmpInt[0]%2 == 0 { - b.WriteByte(tmpInt[0] >> 1) + if data[pos]%2 == 0 { + b.WriteByte(data[pos] >> 1) } else { - tmpInt2 := make([]byte, 1) - _, err := r.Read(tmpInt2) - if err != nil { - return nil, err + if len(data) < int(pos)+2 { + return b.Bytes(), pos, errors.New("data truncated") } - switch tmpInt2[0] { + switch data[pos+1] { case 0x2: - b.WriteByte((tmpInt[0] >> 2) + 0x80) + b.WriteByte((data[pos] >> 2) + 0x80) case 0x3: - b.WriteByte((tmpInt[0] >> 2) + 0xc0) + b.WriteByte((data[pos] >> 2) + 0xc0) default: - return nil, fmt.Errorf("unknown decoding for %v", tmpInt2[0]) + return nil, pos, fmt.Errorf("unknown decoding for %v", data[pos]) } + pos++ } - if b.Len() == len { + pos++ + if b.Len() == intlen { break } } - return b.Bytes(), nil + return b.Bytes(), pos, nil } -func decodeVar(r io.ReadSeeker, unsigned bool) (interface{}, error) { - firstByte := make([]byte, 1) - _, err := r.Read(firstByte) - if err != nil { - return 0, err - } - tb := trailingOneBitCount(firstByte[0]) - _, err = r.Seek(-1, io.SeekCurrent) - if err != nil { - return 0, err - } - fieldBytes := make([]byte, tb+1) - n, err := r.Read(fieldBytes) - if err != nil { - return 0, err +func decodeVar(data []byte, pos uint64, unsigned bool) (interface{}, uint64, error) { + if len(data) < int(pos)+1 { + return 0, pos, errors.New("data truncated") } - if n != tb+1 { - return 0, fmt.Errorf("only read %d bytes, expected %d", n, tb+1) + flen := trailingOneBitCount(data[pos]) + 1 + if len(data) < int(pos)+flen { + return 0, pos, fmt.Errorf("truncated data, expected length: %d", flen) } var tNum uint64 - switch len(fieldBytes) { + switch flen { case 1: - tNum = uint64(fieldBytes[0]) + tNum = uint64(data[pos]) case 2: - tNum = uint64(binary.LittleEndian.Uint16(fieldBytes)) + tNum = uint64(binary.LittleEndian.Uint16(data[pos : int(pos)+flen])) case 3: tNum = uint64(binary.LittleEndian.Uint32( - slices.Concat(fieldBytes, []byte{0x0}))) + slices.Concat(data[pos:int(pos)+flen], []byte{0x0}))) case 4: - tNum = uint64(binary.LittleEndian.Uint32(fieldBytes)) + tNum = uint64(binary.LittleEndian.Uint32(data[pos : int(pos)+flen])) case 5: tNum = binary.LittleEndian.Uint64( - slices.Concat(fieldBytes, []byte{0x0, 0x0, 0x0})) + slices.Concat(data[pos:int(pos)+flen], []byte{0x0, 0x0, 0x0})) case 6: tNum = binary.LittleEndian.Uint64( - slices.Concat(fieldBytes, []byte{0x0, 0x0})) + slices.Concat(data[pos:int(pos)+flen], []byte{0x0, 0x0})) case 7: tNum = binary.LittleEndian.Uint64( - slices.Concat(fieldBytes, []byte{0x0})) + slices.Concat(data[pos:int(pos)+flen], []byte{0x0})) case 8: - tNum = binary.LittleEndian.Uint64(fieldBytes) + tNum = binary.LittleEndian.Uint64(data[pos : int(pos)+flen]) } + pos += uint64(flen) if unsigned { - return tNum >> (tb + 1), nil + return tNum >> flen, pos, nil } - if positive := (tNum>>(tb+1))&1 == 0; positive { - return int64(tNum >> (tb + 2)), nil + if positive := (tNum>>flen)&1 == 0; positive { + return int64(tNum >> (flen + 1)), pos, nil } - return int64(-(1 + (tNum >> (tb + 2)))), nil + return int64(-(1 + (tNum >> (flen + 1)))), pos, nil } func trailingOneBitCount(b byte) int { diff --git a/serialization/serialization_test.go b/serialization/serialization_test.go index 0212ffab7..a8e256135 100644 --- a/serialization/serialization_test.go +++ b/serialization/serialization_test.go @@ -1,7 +1,6 @@ package serialization import ( - "bytes" "testing" "github.com/stretchr/testify/require" @@ -49,13 +48,13 @@ func TestDecodeFixed(t *testing.T) { []byte{0xee, 0x81}, 16, []byte{}, - "EOF", + "data truncated", }, { []byte{}, 16, []byte{}, - "EOF", + "data truncated", }, { []byte{0xee, 0x81, 0x04, 0xc1, 0x02, 0x01, 0x03, 0x41, 0x03, 0x81, 0x03, 0xc1, 0x03, 0xc5, 0x03, 0x22, @@ -67,7 +66,7 @@ func TestDecodeFixed(t *testing.T) { } for _, tc := range testcases { - actual, err := decodeFixed(bytes.NewReader(tc.input), tc.len) + actual, _, err := decodeFixed(tc.input, 0, tc.len) if tc.err == "" { require.NoError(t, err) require.Equal(t, tc.result, actual) @@ -92,22 +91,22 @@ func TestDecodeString(t *testing.T) { { []byte{0x18, 0x61, 0x62, 0x63, 0x64, 0x65, 0x66, 0x67}, "", - "only read ", + "string truncated", }, { []byte{}, "", - "EOF", + "string truncated, expected at least one byte", }, { []byte{0x18}, "", - "EOF", + "string truncated, expected length", }, } for _, tc := range testcases { - s, err := decodeString(bytes.NewReader(tc.input)) + s, _, err := decodeString(tc.input, 0) if tc.err == "" { require.NoError(t, err) require.Equal(t, tc.result, s) @@ -128,13 +127,13 @@ func TestDecodeVar(t *testing.T) { []byte{}, false, 0, - "EOF", + "data truncated", }, { []byte{0xd9}, false, 0, - "only read ", + "truncated data", }, { []byte{0x4}, @@ -205,7 +204,7 @@ func TestDecodeVar(t *testing.T) { } for _, tc := range testcases { - r, err := decodeVar(bytes.NewReader(tc.input), tc.unsigned) + r, _, err := decodeVar(tc.input, 0, tc.unsigned) if tc.err == "" { require.NoError(t, err) require.Equal(t, tc.result, r, tc.result) From b7b8f38d2b68a76356e3279f4d5b4f603666a2ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Tue, 18 Feb 2025 14:24:23 +0100 Subject: [PATCH 16/36] updates --- replication/event.go | 123 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 115 insertions(+), 8 deletions(-) diff --git a/replication/event.go b/replication/event.go index 57fe414b5..2e1b23adb 100644 --- a/replication/event.go +++ b/replication/event.go @@ -421,6 +421,7 @@ func (e *QueryEvent) Dump(w io.Writer) { type GTIDEvent struct { CommitFlag uint8 SID []byte + Tag string GNO int64 LastCommitted int64 SequenceNumber int64 @@ -513,7 +514,11 @@ func (e *GTIDEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Commit flag: %d\n", e.CommitFlag) u, _ := uuid.FromBytes(e.SID) - fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO) + if e.Tag != "" { + fmt.Fprintf(w, "GTID_NEXT: %s:%s:%d\n", u.String(), e.Tag, e.GNO) + } else { + fmt.Fprintf(w, "GTID_NEXT: %s:%s%d\n", u.String(), e.GNO) + } fmt.Fprintf(w, "LAST_COMMITTED: %d\n", e.LastCommitted) fmt.Fprintf(w, "SEQUENCE_NUMBER: %d\n", e.SequenceNumber) fmt.Fprintf(w, "Immediate commmit timestamp: %d (%s)\n", e.ImmediateCommitTimestamp, fmtTime(e.ImmediateCommitTime())) @@ -544,7 +549,10 @@ func (e *GTIDEvent) OriginalCommitTime() time.Time { return microSecTimestampToTime(e.OriginalCommitTimestamp) } +// GtidTaggedLogEvent is for a GTID event with a tag. +// This is similar to GTIDEvent, but it has a tag and uses a different serialization format. type GtidTaggedLogEvent struct { + GTIDEvent msg serialization.Message } @@ -615,20 +623,119 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { return err } - return nil -} + f, err := e.msg.GetFieldByName("gtid_flags") + if err != nil { + return err + } + if v, ok := f.Type.(serialization.FieldIntFixed); ok { + e.CommitFlag = v.Value[0] + } else { + return errors.New("failed to get gtid_flags field") + } -func (e *GtidTaggedLogEvent) Dump(w io.Writer) { - fmt.Println(e.msg.String()) + f, err = e.msg.GetFieldByName("uuid") + if err != nil { + return err + } + if v, ok := f.Type.(serialization.FieldIntFixed); ok { + e.SID = v.Value + } else { + return errors.New("failed to get uuid field") + } - f, err := e.msg.GetFieldByName("immediate_server_version") + f, err = e.msg.GetFieldByName("gno") if err != nil { - return + return err + } + if v, ok := f.Type.(serialization.FieldIntVar); ok { + e.GNO = v.Value + } else { + return errors.New("failed to get gno field") } + f, err = e.msg.GetFieldByName("tag") + if err != nil { + return err + } + if v, ok := f.Type.(serialization.FieldString); ok { + e.Tag = v.Value + } else { + return errors.New("failed to get tag field") + } + + f, err = e.msg.GetFieldByName("last_committed") + if err != nil { + return err + } if v, ok := f.Type.(serialization.FieldIntVar); ok { - fmt.Printf("Immediate server version: %d\n", v.Value) + e.LastCommitted = v.Value + } else { + return errors.New("failed to get last_comitted field") + } + + f, err = e.msg.GetFieldByName("sequence_number") + if err != nil { + return err } + if v, ok := f.Type.(serialization.FieldIntVar); ok { + e.SequenceNumber = v.Value + } else { + return errors.New("failed to get sequence_number field") + } + + f, err = e.msg.GetFieldByName("immediate_commit_timestamp") + if err != nil { + return err + } + if v, ok := f.Type.(serialization.FieldUintVar); ok { + e.ImmediateCommitTimestamp = v.Value + } else { + return errors.New("failed to get immediate_commit_timestamp field") + } + + f, err = e.msg.GetFieldByName("original_commit_timestamp") + if err != nil { + return err + } + if v, ok := f.Type.(serialization.FieldUintVar); ok { + e.OriginalCommitTimestamp = v.Value + } else { + return errors.New("failed to get original_commit_timestamp field") + } + + f, err = e.msg.GetFieldByName("immediate_server_version") + if err != nil { + return err + } + if v, ok := f.Type.(serialization.FieldUintVar); ok { + e.ImmediateServerVersion = uint32(v.Value) + } else { + return errors.New("failed to get immediate_server_version field") + } + + f, err = e.msg.GetFieldByName("original_server_version") + if err != nil { + return err + } + if v, ok := f.Type.(serialization.FieldUintVar); ok { + e.OriginalServerVersion = uint32(v.Value) + } else { + return errors.New("failed to get original_server_version field") + } + + f, err = e.msg.GetFieldByName("transaction_length") + if err != nil { + return err + } + if v, ok := f.Type.(serialization.FieldUintVar); ok { + e.TransactionLength = v.Value + } else { + return errors.New("failed to get transaction_length field") + } + + // TODO: add and test commit_group_ticket + + return nil } type BeginLoadQueryEvent struct { From 28d3be8170a73d2622ca0452655799b84d250a98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Tue, 18 Feb 2025 14:28:32 +0100 Subject: [PATCH 17/36] fixup --- replication/event.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/replication/event.go b/replication/event.go index 2e1b23adb..b75ca14ac 100644 --- a/replication/event.go +++ b/replication/event.go @@ -517,7 +517,7 @@ func (e *GTIDEvent) Dump(w io.Writer) { if e.Tag != "" { fmt.Fprintf(w, "GTID_NEXT: %s:%s:%d\n", u.String(), e.Tag, e.GNO) } else { - fmt.Fprintf(w, "GTID_NEXT: %s:%s%d\n", u.String(), e.GNO) + fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO) } fmt.Fprintf(w, "LAST_COMMITTED: %d\n", e.LastCommitted) fmt.Fprintf(w, "SEQUENCE_NUMBER: %d\n", e.SequenceNumber) @@ -670,7 +670,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { if v, ok := f.Type.(serialization.FieldIntVar); ok { e.LastCommitted = v.Value } else { - return errors.New("failed to get last_comitted field") + return errors.New("failed to get last_committed field") } f, err = e.msg.GetFieldByName("sequence_number") From 4a8e4976ccbca8ef401901f62dd952f5effdb72a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Wed, 19 Feb 2025 09:14:10 +0100 Subject: [PATCH 18/36] More testing --- serialization/serialization_test.go | 180 +++++++++++++++++++++++++++- 1 file changed, 176 insertions(+), 4 deletions(-) diff --git a/serialization/serialization_test.go b/serialization/serialization_test.go index a8e256135..fc1f90d1c 100644 --- a/serialization/serialization_test.go +++ b/serialization/serialization_test.go @@ -154,7 +154,7 @@ func TestDecodeVar(t *testing.T) { "", }, { - // From the example on https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlSerialization.html + // From the example on https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlhtml // But converted to LE // unsigned integer, 65535 []byte{0b11111011, 0b11111111, 0b00000111}, @@ -163,7 +163,7 @@ func TestDecodeVar(t *testing.T) { "", }, { - // From the example on https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlSerialization.html + // From the example on https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlhtml // But converted to LE // signed integer, 65535 []byte{0b11110011, 0b11111111, 0b00001111}, @@ -172,7 +172,7 @@ func TestDecodeVar(t *testing.T) { "", }, { - // From the example on https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlSerialization.html + // From the example on https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlhtml // But converted to LE // signed integer, -65535 []byte{0b11101011, 0b11111111, 0b00001111}, @@ -181,7 +181,7 @@ func TestDecodeVar(t *testing.T) { "", }, { - // From the example on https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlSerialization.html + // From the example on https://dev.mysql.com/doc/dev/mysql-server/latest/PageLibsMysqlhtml // But converted to LE // signed integer, 65536 []byte{0b11111011, 0b11111111, 0b00001111}, @@ -213,3 +213,175 @@ func TestDecodeVar(t *testing.T) { } } } + +func TestUmarshal_event1(t *testing.T) { + data := []byte{0x2, 0x76, 0x0, 0x0, 0x2, 0x2, 0x25, 0x2, 0xdc, 0xf0, 0x9, 0x2, 0x30, 0xf9, 0x3, 0x22, 0xbd, 0x3, + 0xad, 0x2, 0x21, 0x2, 0x44, 0x44, 0x5a, 0x68, 0x51, 0x3, 0x22, 0x4, 0x4, 0x6, 0xc, 0x66, 0x6f, 0x6f, 0x62, + 0x61, 0x7a, 0x8, 0x0, 0xa, 0x4, 0xc, 0x7f, 0x15, 0x83, 0x22, 0x2d, 0x5c, 0x2e, 0x6, 0x10, 0x49, 0x3, 0x12, + 0xc3, 0x2, 0xb} + + msg := Message{ + Format: Format{ + Fields: []Field{ + { + Name: "gtid_flags", + Type: FieldIntFixed{ + Length: 1, + }, + }, + { + Name: "uuid", + Type: FieldIntFixed{ + Length: 16, + }, + }, + { + Name: "gno", + Type: FieldIntVar{}, + }, + { + Name: "tag", + Type: FieldString{}, + }, + { + Name: "last_committed", + Type: FieldIntVar{}, + }, + { + Name: "sequence_number", + Type: FieldIntVar{}, + }, + { + Name: "immediate_commit_timestamp", + Type: FieldUintVar{}, + }, + { + Name: "original_commit_timestamp", + Type: FieldUintVar{}, + Optional: true, + }, + { + Name: "transaction_length", + Type: FieldUintVar{}, + }, + { + Name: "immediate_server_version", + Type: FieldUintVar{}, + }, + { + Name: "original_server_version", + Type: FieldUintVar{}, + Optional: true, + }, + { + Name: "commit_group_ticket", + Optional: true, + }, + }, + }, + } + + expected := Message{ + Version: 1, + Format: Format{ + Size: 59, + Fields: []Field{ + { + Name: "gtid_flags", + ID: 0, + Type: FieldIntFixed{ + Length: 1, + Value: []uint8{01}, + }, + }, + { + Name: "uuid", + ID: 1, + Type: FieldIntFixed{ + Length: 16, + Value: []uint8{0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, + 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11}, + }, + }, + { + Name: "gno", + ID: 2, + Type: FieldIntVar{ + Value: 1, + }, + }, + { + Name: "tag", + ID: 3, + Type: FieldString{ + Value: "foobaz", + }, + }, + { + Name: "last_committed", + ID: 4, + Type: FieldIntVar{ + Value: 0, + }, + }, + { + Name: "sequence_number", + ID: 5, + Type: FieldIntVar{ + Value: 1, + }, + }, + { + Name: "immediate_commit_timestamp", + ID: 6, + Type: FieldUintVar{ + Value: 1739823289369365, + }, + }, + { + Name: "original_commit_timestamp", + ID: 7, + Type: FieldUintVar{}, + Optional: true, + Skipped: true, + }, + { + Name: "transaction_length", + ID: 8, + Type: FieldUintVar{ + Value: 210, + }, + }, + { + Name: "immediate_server_version", + ID: 9, + Type: FieldUintVar{ + Value: 90200, + }, + }, + { + Name: "original_server_version", + ID: 10, + Type: FieldUintVar{}, + Optional: true, + Skipped: true, + }, + { + Name: "commit_group_ticket", + ID: 11, + Optional: true, + Skipped: true, + }, + }, + }, + } + + err := Unmarshal(data, &msg) + require.NoError(t, err) + + for i, f := range msg.Format.Fields { + require.Equal(t, expected.Format.Fields[i], f) + } + + require.Equal(t, expected, msg) +} From c74b18edb2de90e72e2f76a36dd2860381a969e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Fri, 21 Feb 2025 16:10:04 +0100 Subject: [PATCH 19/36] Update serialization/serialization.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Philip Dubé --- serialization/serialization.go | 25 +++---------------------- 1 file changed, 3 insertions(+), 22 deletions(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index 0740a7234..466fbc5ed 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -246,28 +246,9 @@ func decodeVar(data []byte, pos uint64, unsigned bool) (interface{}, uint64, err return 0, pos, fmt.Errorf("truncated data, expected length: %d", flen) } var tNum uint64 - switch flen { - case 1: - tNum = uint64(data[pos]) - case 2: - tNum = uint64(binary.LittleEndian.Uint16(data[pos : int(pos)+flen])) - case 3: - tNum = uint64(binary.LittleEndian.Uint32( - slices.Concat(data[pos:int(pos)+flen], []byte{0x0}))) - case 4: - tNum = uint64(binary.LittleEndian.Uint32(data[pos : int(pos)+flen])) - case 5: - tNum = binary.LittleEndian.Uint64( - slices.Concat(data[pos:int(pos)+flen], []byte{0x0, 0x0, 0x0})) - case 6: - tNum = binary.LittleEndian.Uint64( - slices.Concat(data[pos:int(pos)+flen], []byte{0x0, 0x0})) - case 7: - tNum = binary.LittleEndian.Uint64( - slices.Concat(data[pos:int(pos)+flen], []byte{0x0})) - case 8: - tNum = binary.LittleEndian.Uint64(data[pos : int(pos)+flen]) - } + var tNumBytes [8]byte + copy(tNumBytes[:], data[pos:int(pos)+flen]) + tNum := binary.LittleEndian.Uint64(tNumBytes[:]) pos += uint64(flen) if unsigned { return tNum >> flen, pos, nil From 86bf3fc7e935ed0e17db79d4a84d4704d7240938 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Fri, 21 Feb 2025 16:12:34 +0100 Subject: [PATCH 20/36] fixup --- serialization/serialization.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index 466fbc5ed..02df0e4a1 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -11,7 +11,6 @@ import ( "errors" "fmt" "math/bits" - "slices" "strings" ) @@ -245,7 +244,6 @@ func decodeVar(data []byte, pos uint64, unsigned bool) (interface{}, uint64, err if len(data) < int(pos)+flen { return 0, pos, fmt.Errorf("truncated data, expected length: %d", flen) } - var tNum uint64 var tNumBytes [8]byte copy(tNumBytes[:], data[pos:int(pos)+flen]) tNum := binary.LittleEndian.Uint64(tNumBytes[:]) From 8d78a72145dfec848439c861afda59f0bdea4c2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Fri, 21 Feb 2025 16:16:04 +0100 Subject: [PATCH 21/36] formatting --- serialization/serialization_test.go | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/serialization/serialization_test.go b/serialization/serialization_test.go index fc1f90d1c..51b69ba5c 100644 --- a/serialization/serialization_test.go +++ b/serialization/serialization_test.go @@ -38,8 +38,10 @@ func TestDecodeFixed(t *testing.T) { err string }{ { - []byte{0xee, 0x81, 0x02, 0xc1, 0x02, 0x01, 0x03, 0x41, 0x03, 0x81, 0x03, 0xc1, 0x03, 0xc5, 0x03, 0x22, - 0x22, 0xee, 0xfd, 0x03, 0xee, 0xfd, 0x03, 0xee, 0xfd, 0x03}, + []byte{ + 0xee, 0x81, 0x02, 0xc1, 0x02, 0x01, 0x03, 0x41, 0x03, 0x81, 0x03, 0xc1, 0x03, 0xc5, 0x03, 0x22, + 0x22, 0xee, 0xfd, 0x03, 0xee, 0xfd, 0x03, 0xee, 0xfd, 0x03, + }, 16, []byte{0x77, 0xa0, 0xb0, 0xc0, 0xd0, 0xe0, 0xf0, 0xf1, 0x11, 0x11, 0x77, 0xff, 0x77, 0xff, 0x77, 0xff}, "", @@ -57,8 +59,10 @@ func TestDecodeFixed(t *testing.T) { "data truncated", }, { - []byte{0xee, 0x81, 0x04, 0xc1, 0x02, 0x01, 0x03, 0x41, 0x03, 0x81, 0x03, 0xc1, 0x03, 0xc5, 0x03, 0x22, - 0x22, 0xee, 0xfd, 0x03, 0xee, 0xfd, 0x03, 0xee, 0xfd, 0x03}, + []byte{ + 0xee, 0x81, 0x04, 0xc1, 0x02, 0x01, 0x03, 0x41, 0x03, 0x81, 0x03, 0xc1, 0x03, 0xc5, 0x03, 0x22, + 0x22, 0xee, 0xfd, 0x03, 0xee, 0xfd, 0x03, 0xee, 0xfd, 0x03, + }, 16, []byte{}, "unknown decoding for", @@ -148,7 +152,7 @@ func TestDecodeVar(t *testing.T) { "", }, { - []byte{0xc3, 02, 0x0b}, + []byte{0xc3, 0o2, 0x0b}, true, uint64(90200), "", @@ -215,10 +219,12 @@ func TestDecodeVar(t *testing.T) { } func TestUmarshal_event1(t *testing.T) { - data := []byte{0x2, 0x76, 0x0, 0x0, 0x2, 0x2, 0x25, 0x2, 0xdc, 0xf0, 0x9, 0x2, 0x30, 0xf9, 0x3, 0x22, 0xbd, 0x3, + data := []byte{ + 0x2, 0x76, 0x0, 0x0, 0x2, 0x2, 0x25, 0x2, 0xdc, 0xf0, 0x9, 0x2, 0x30, 0xf9, 0x3, 0x22, 0xbd, 0x3, 0xad, 0x2, 0x21, 0x2, 0x44, 0x44, 0x5a, 0x68, 0x51, 0x3, 0x22, 0x4, 0x4, 0x6, 0xc, 0x66, 0x6f, 0x6f, 0x62, 0x61, 0x7a, 0x8, 0x0, 0xa, 0x4, 0xc, 0x7f, 0x15, 0x83, 0x22, 0x2d, 0x5c, 0x2e, 0x6, 0x10, 0x49, 0x3, 0x12, - 0xc3, 0x2, 0xb} + 0xc3, 0x2, 0xb, + } msg := Message{ Format: Format{ @@ -291,7 +297,7 @@ func TestUmarshal_event1(t *testing.T) { ID: 0, Type: FieldIntFixed{ Length: 1, - Value: []uint8{01}, + Value: []uint8{0o1}, }, }, { @@ -299,8 +305,10 @@ func TestUmarshal_event1(t *testing.T) { ID: 1, Type: FieldIntFixed{ Length: 16, - Value: []uint8{0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, - 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11}, + Value: []uint8{ + 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, + 0x88, 0x22, 0x22, 0x2d, 0x34, 0xd4, 0x11, + }, }, }, { From ba1dcc9449c72480de38a251405a988dfe635064 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Fri, 21 Feb 2025 16:25:23 +0100 Subject: [PATCH 22/36] Update based on review --- serialization/serialization.go | 1 + 1 file changed, 1 insertion(+) diff --git a/serialization/serialization.go b/serialization/serialization.go index 02df0e4a1..9dd967f2d 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -207,6 +207,7 @@ func decodeString(data []byte, pos uint64) (string, uint64, error) { func decodeFixed(data []byte, pos uint64, intlen int) ([]byte, uint64, error) { var b bytes.Buffer + b.Grow(intlen * 2) // output is between 1 and 2 times that of the input for { if len(data) < int(pos)+1 { From 1558b91d948e8b5e520b1517d158eae5217c02ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Sun, 23 Feb 2025 09:44:04 +0100 Subject: [PATCH 23/36] Update serialization/serialization.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- serialization/serialization.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index 9dd967f2d..48f0bab95 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -98,7 +98,7 @@ func (f FieldIntVar) String() string { return fmt.Sprintf("%d", f.Value) } -// FieldUintVar is using the usigned integer variant of the 'varlen_integer_format' +// FieldUintVar is using the unsigned integer variant of the 'varlen_integer_format' // and encodes a value as a byte sequence of 1-9 bytes depending on the value. type FieldUintVar struct { Value uint64 From ab00980c0529a986bb083c75a2781948bef031ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Thu, 27 Feb 2025 15:23:40 +0100 Subject: [PATCH 24/36] Don't store the message in the event struct --- replication/event.go | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/replication/event.go b/replication/event.go index 73a9c9a94..0b49c93a8 100644 --- a/replication/event.go +++ b/replication/event.go @@ -553,11 +553,10 @@ func (e *GTIDEvent) OriginalCommitTime() time.Time { // This is similar to GTIDEvent, but it has a tag and uses a different serialization format. type GtidTaggedLogEvent struct { GTIDEvent - msg serialization.Message } func (e *GtidTaggedLogEvent) Decode(data []byte) error { - e.msg = serialization.Message{ + msg := serialization.Message{ Format: serialization.Format{ Fields: []serialization.Field{ { @@ -618,12 +617,12 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { }, } - err := serialization.Unmarshal(data, &e.msg) + err := serialization.Unmarshal(data, &msg) if err != nil { return err } - f, err := e.msg.GetFieldByName("gtid_flags") + f, err := msg.GetFieldByName("gtid_flags") if err != nil { return err } @@ -633,7 +632,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { return errors.New("failed to get gtid_flags field") } - f, err = e.msg.GetFieldByName("uuid") + f, err = msg.GetFieldByName("uuid") if err != nil { return err } @@ -643,7 +642,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { return errors.New("failed to get uuid field") } - f, err = e.msg.GetFieldByName("gno") + f, err = msg.GetFieldByName("gno") if err != nil { return err } @@ -653,7 +652,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { return errors.New("failed to get gno field") } - f, err = e.msg.GetFieldByName("tag") + f, err = msg.GetFieldByName("tag") if err != nil { return err } @@ -663,7 +662,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { return errors.New("failed to get tag field") } - f, err = e.msg.GetFieldByName("last_committed") + f, err = msg.GetFieldByName("last_committed") if err != nil { return err } @@ -673,7 +672,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { return errors.New("failed to get last_committed field") } - f, err = e.msg.GetFieldByName("sequence_number") + f, err = msg.GetFieldByName("sequence_number") if err != nil { return err } @@ -683,7 +682,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { return errors.New("failed to get sequence_number field") } - f, err = e.msg.GetFieldByName("immediate_commit_timestamp") + f, err = msg.GetFieldByName("immediate_commit_timestamp") if err != nil { return err } @@ -693,7 +692,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { return errors.New("failed to get immediate_commit_timestamp field") } - f, err = e.msg.GetFieldByName("original_commit_timestamp") + f, err = msg.GetFieldByName("original_commit_timestamp") if err != nil { return err } @@ -703,7 +702,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { return errors.New("failed to get original_commit_timestamp field") } - f, err = e.msg.GetFieldByName("immediate_server_version") + f, err = msg.GetFieldByName("immediate_server_version") if err != nil { return err } @@ -713,7 +712,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { return errors.New("failed to get immediate_server_version field") } - f, err = e.msg.GetFieldByName("original_server_version") + f, err = msg.GetFieldByName("original_server_version") if err != nil { return err } @@ -723,7 +722,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { return errors.New("failed to get original_server_version field") } - f, err = e.msg.GetFieldByName("transaction_length") + f, err = msg.GetFieldByName("transaction_length") if err != nil { return err } From 80d1d62479b4d400a51b3547bff46cec9a92b2b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Thu, 27 Feb 2025 15:59:48 +0100 Subject: [PATCH 25/36] use map to speedup lookup of fields --- serialization/serialization.go | 17 +++++++++++------ serialization/serialization_test.go | 18 ++++++++++++++++++ 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index 48f0bab95..34a05d35a 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -16,8 +16,9 @@ import ( // Message is a mysql::serialization message type Message struct { - Version uint8 // >= 0 - Format Format + Version uint8 // >= 0 + Format Format + fieldIndex map[string]int } func (m *Message) String() (text string) { @@ -30,10 +31,8 @@ func (m *Message) String() (text string) { // GetFieldByName returns a field if the name matches and an error if there is no match func (m *Message) GetFieldByName(name string) (Field, error) { - for _, f := range m.Format.Fields { - if f.Name == name { - return f, nil - } + if idx, ok := m.fieldIndex[name]; ok { + return m.Format.Fields[idx], nil } return Field{}, fmt.Errorf("field not found: %s", name) } @@ -125,6 +124,12 @@ func Unmarshal(data []byte, v interface{}) error { if err != nil { return err } + if m.fieldIndex == nil { + m.fieldIndex = make(map[string]int, len(m.Format.Fields)) + } + for _, field := range m.Format.Fields { + m.fieldIndex[field.Name] = field.ID + } case *Format: pos := uint64(0) m.Size = uint64(data[pos] >> 1) diff --git a/serialization/serialization_test.go b/serialization/serialization_test.go index 51b69ba5c..9b731a6af 100644 --- a/serialization/serialization_test.go +++ b/serialization/serialization_test.go @@ -382,6 +382,20 @@ func TestUmarshal_event1(t *testing.T) { }, }, }, + fieldIndex: map[string]int{ + "gtid_flags": 0, + "uuid": 1, + "gno": 2, + "tag": 3, + "last_committed": 4, + "sequence_number": 5, + "immediate_commit_timestamp": 6, + "original_commit_timestamp": 7, + "transaction_length": 8, + "immediate_server_version": 9, + "original_server_version": 10, + "commit_group_ticket": 11, + }, } err := Unmarshal(data, &msg) @@ -392,4 +406,8 @@ func TestUmarshal_event1(t *testing.T) { } require.Equal(t, expected, msg) + + sv, err := msg.GetFieldByName("immediate_server_version") + require.NoError(t, err) + require.Equal(t, 9, sv.ID) } From 89b293f358713de2ca1c27157eace2bc1104128b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Thu, 27 Feb 2025 16:55:52 +0100 Subject: [PATCH 26/36] Use decode method --- serialization/serialization.go | 138 ++++++++++++++++------------ serialization/serialization_test.go | 14 ++- 2 files changed, 87 insertions(+), 65 deletions(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index 34a05d35a..2b71d76a6 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -87,6 +87,39 @@ func (f FieldIntFixed) String() string { return fmt.Sprintf("0x%x", f.Value) } +func (f *FieldIntFixed) decode(data []byte, pos uint64) (uint64, error) { + var b bytes.Buffer + b.Grow(f.Length * 2) // output is between 1 and 2 times that of the input + + for { + if len(data) < int(pos)+1 { + return pos, errors.New("data truncated") + } + if data[pos]%2 == 0 { + b.WriteByte(data[pos] >> 1) + } else { + if len(data) < int(pos)+2 { + return pos, errors.New("data truncated") + } + switch data[pos+1] { + case 0x2: + b.WriteByte((data[pos] >> 2) + 0x80) + case 0x3: + b.WriteByte((data[pos] >> 2) + 0xc0) + default: + return pos, fmt.Errorf("unknown decoding for %v", data[pos]) + } + pos++ + } + pos++ + if b.Len() == f.Length { + break + } + } + f.Value = b.Bytes() + return pos, nil +} + // FieldIntVar is using the signed integer variant of the 'varlen_integer_format' // and encodes a value as a byte sequence of 1-9 bytes depending on the value. type FieldIntVar struct { @@ -97,6 +130,20 @@ func (f FieldIntVar) String() string { return fmt.Sprintf("%d", f.Value) } +func (f *FieldIntVar) decode(data []byte, pos uint64) (uint64, error) { + var val interface{} + val, pos, err := decodeVar(data, pos, false) + if err != nil { + return pos, err + } + if intval, ok := val.(int64); ok { + f.Value = intval + } else { + return pos, errors.New("unexpected type, expecting int64") + } + return pos, nil +} + // FieldUintVar is using the unsigned integer variant of the 'varlen_integer_format' // and encodes a value as a byte sequence of 1-9 bytes depending on the value. type FieldUintVar struct { @@ -107,11 +154,38 @@ func (f FieldUintVar) String() string { return fmt.Sprintf("%d", f.Value) } +func (f *FieldUintVar) decode(data []byte, pos uint64) (uint64, error) { + var val interface{} + val, pos, err := decodeVar(data, pos, true) + if err != nil { + return pos, err + } + if uintval, ok := val.(uint64); ok { + f.Value = uintval + } else { + return pos, errors.New("unexpected type, expecting uint64") + } + return pos, nil +} + // FieldString is a 'string_format' field type FieldString struct { Value string } +func (f *FieldString) decode(data []byte, pos uint64) (uint64, error) { + if len(data) < int(pos)+1 { + return pos, errors.New("string truncated, expected at least one byte") + } + strLen := int(data[pos] >> 1) + pos++ + if len(data) < int(pos)+strLen { + return pos, fmt.Errorf("string truncated, expected length: %d", strLen) + } + f.Value = string(data[pos : pos+uint64(strLen)]) + return pos + uint64(strLen), nil +} + func (f FieldString) String() string { return f.Value } @@ -151,37 +225,25 @@ func Unmarshal(data []byte, v interface{}) error { var err error switch f := m.Fields[i].Type.(type) { case FieldIntFixed: - f.Value, n, err = decodeFixed(data, pos, f.Length) + n, err = f.decode(data, pos) if err != nil { return err } m.Fields[i].Type = f case FieldUintVar: - var val interface{} - val, n, err = decodeVar(data, pos, true) + n, err = f.decode(data, pos) if err != nil { return err } - if uintval, ok := val.(uint64); ok { - f.Value = uintval - } else { - return errors.New("unexpected type, expecting uint64") - } m.Fields[i].Type = f case FieldIntVar: - var val interface{} - val, n, err = decodeVar(data, pos, false) + n, err = f.decode(data, pos) if err != nil { return err } - if intval, ok := val.(int64); ok { - f.Value = intval - } else { - return errors.New("unexpected type, expecting int64") - } m.Fields[i].Type = f case FieldString: - f.Value, n, err = decodeString(data, pos) + n, err = f.decode(data, pos) if err != nil { return err } @@ -198,50 +260,6 @@ func Unmarshal(data []byte, v interface{}) error { return nil } -func decodeString(data []byte, pos uint64) (string, uint64, error) { - if len(data) < int(pos)+1 { - return "", pos, errors.New("string truncated, expected at least one byte") - } - strLen := int(data[pos] >> 1) - pos++ - if len(data) < int(pos)+strLen { - return "", pos, fmt.Errorf("string truncated, expected length: %d", strLen) - } - return string(data[pos : pos+uint64(strLen)]), pos + uint64(strLen), nil -} - -func decodeFixed(data []byte, pos uint64, intlen int) ([]byte, uint64, error) { - var b bytes.Buffer - b.Grow(intlen * 2) // output is between 1 and 2 times that of the input - - for { - if len(data) < int(pos)+1 { - return b.Bytes(), pos, errors.New("data truncated") - } - if data[pos]%2 == 0 { - b.WriteByte(data[pos] >> 1) - } else { - if len(data) < int(pos)+2 { - return b.Bytes(), pos, errors.New("data truncated") - } - switch data[pos+1] { - case 0x2: - b.WriteByte((data[pos] >> 2) + 0x80) - case 0x3: - b.WriteByte((data[pos] >> 2) + 0xc0) - default: - return nil, pos, fmt.Errorf("unknown decoding for %v", data[pos]) - } - pos++ - } - pos++ - if b.Len() == intlen { - break - } - } - return b.Bytes(), pos, nil -} - func decodeVar(data []byte, pos uint64, unsigned bool) (interface{}, uint64, error) { if len(data) < int(pos)+1 { return 0, pos, errors.New("data truncated") diff --git a/serialization/serialization_test.go b/serialization/serialization_test.go index 9b731a6af..7134d04dc 100644 --- a/serialization/serialization_test.go +++ b/serialization/serialization_test.go @@ -70,11 +70,14 @@ func TestDecodeFixed(t *testing.T) { } for _, tc := range testcases { - actual, _, err := decodeFixed(tc.input, 0, tc.len) + f := FieldIntFixed{ + Length: tc.len, + } + _, err := f.decode(tc.input, 0) if tc.err == "" { require.NoError(t, err) - require.Equal(t, tc.result, actual) - require.Equal(t, tc.len, len(actual)) + require.Equal(t, tc.result, f.Value) + require.Equal(t, tc.len, len(f.Value)) } else { require.ErrorContains(t, err, tc.err) } @@ -110,10 +113,11 @@ func TestDecodeString(t *testing.T) { } for _, tc := range testcases { - s, _, err := decodeString(tc.input, 0) + f := FieldString{} + _, err := f.decode(tc.input, 0) if tc.err == "" { require.NoError(t, err) - require.Equal(t, tc.result, s) + require.Equal(t, tc.result, f.Value) } else { require.ErrorContains(t, err, tc.err) } From a45d8797836917ef39d5c189bd69309b7c048636 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Fri, 28 Feb 2025 09:40:13 +0100 Subject: [PATCH 27/36] Change Size and LastNonIgnoreableField to uint8 --- serialization/serialization.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index 2b71d76a6..d25a9c811 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -39,8 +39,8 @@ func (m *Message) GetFieldByName(name string) (Field, error) { // Format is describing a `message_format` type Format struct { - Size uint64 - LastNonIgnorableField int + Size uint8 + LastNonIgnorableField uint8 Fields []Field } @@ -206,9 +206,9 @@ func Unmarshal(data []byte, v interface{}) error { } case *Format: pos := uint64(0) - m.Size = uint64(data[pos] >> 1) + m.Size = data[pos] >> 1 pos++ - m.LastNonIgnorableField = int(data[pos] >> 1) + m.LastNonIgnorableField = data[pos] >> 1 pos++ for i := 0; i < len(m.Fields); i++ { From 085c631abb9a07f1186fb4b77d4441eb7d62a1f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Fri, 28 Feb 2025 09:45:34 +0100 Subject: [PATCH 28/36] Change field ID to uint8 --- serialization/serialization.go | 10 +++++----- serialization/serialization_test.go | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index d25a9c811..a494ce198 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -18,7 +18,7 @@ import ( type Message struct { Version uint8 // >= 0 Format Format - fieldIndex map[string]int + fieldIndex map[string]uint8 } func (m *Message) String() (text string) { @@ -59,7 +59,7 @@ func (f *Format) String() (text string) { // Field represents a `message_field` type Field struct { - ID int + ID uint8 Type FieldType Optional bool Name string @@ -199,7 +199,7 @@ func Unmarshal(data []byte, v interface{}) error { return err } if m.fieldIndex == nil { - m.fieldIndex = make(map[string]int, len(m.Format.Fields)) + m.fieldIndex = make(map[string]uint8, len(m.Format.Fields)) } for _, field := range m.Format.Fields { m.fieldIndex[field.Name] = field.ID @@ -215,11 +215,11 @@ func Unmarshal(data []byte, v interface{}) error { if int(pos)+1 > len(data) || int(data[pos]>>1) != i { // The field number we got doesn't match what we expect, // so a field was skipped. - m.Fields[i].ID = i + m.Fields[i].ID = uint8(i) m.Fields[i].Skipped = true continue } - m.Fields[i].ID = int(data[pos] >> 1) + m.Fields[i].ID = uint8(data[pos] >> 1) pos++ var n uint64 var err error diff --git a/serialization/serialization_test.go b/serialization/serialization_test.go index 7134d04dc..fc6955733 100644 --- a/serialization/serialization_test.go +++ b/serialization/serialization_test.go @@ -386,7 +386,7 @@ func TestUmarshal_event1(t *testing.T) { }, }, }, - fieldIndex: map[string]int{ + fieldIndex: map[string]uint8{ "gtid_flags": 0, "uuid": 1, "gno": 2, @@ -413,5 +413,5 @@ func TestUmarshal_event1(t *testing.T) { sv, err := msg.GetFieldByName("immediate_server_version") require.NoError(t, err) - require.Equal(t, 9, sv.ID) + require.Equal(t, uint8(9), sv.ID) } From ac2d26f81d16777dcd98d0272474465804c43cc1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Fri, 28 Feb 2025 09:57:19 +0100 Subject: [PATCH 29/36] Use stringParts as suggested in review --- serialization/serialization.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index a494ce198..a32f72b59 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -23,7 +23,7 @@ type Message struct { func (m *Message) String() (text string) { text += fmt.Sprintf("Message (version: %d)", m.Version) - for _, line := range strings.Split(m.Format.String(), "\n") { + for _, line := range m.Format.stringParts() { text += "\n " + line } return @@ -45,16 +45,22 @@ type Format struct { } func (f *Format) String() (text string) { - text += fmt.Sprintf("Format (Size: %d, LastNonIgnorableField: %d)\n", - f.Size, f.LastNonIgnorableField) + return strings.Join(f.stringParts(), "\n") +} + +func (f *Format) stringParts() (parts []string) { + + parts = append(parts, fmt.Sprintf("Format (Size: %d, LastNonIgnorableField: %d)", + f.Size, f.LastNonIgnorableField)) + for _, f := range f.Fields { - text += fmt.Sprintf("Field %02d (Name: %s, Skipped: %t, Type: %T)\n", - f.ID, f.Name, f.Skipped, f.Type) + parts = append(parts, fmt.Sprintf("Field %02d (Name: %s, Skipped: %t, Type: %T)", + f.ID, f.Name, f.Skipped, f.Type)) if f.Type != nil { - text += fmt.Sprintf(" Value: %s\n", f.Type.String()) + parts = append(parts, fmt.Sprintf(" Value: %s", f.Type.String())) } } - return text + return } // Field represents a `message_field` From 3a1789b7acb8e629f40f00f2bf690aaea8c7f0ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Fri, 28 Feb 2025 10:00:37 +0100 Subject: [PATCH 30/36] fixup --- serialization/serialization.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index a32f72b59..b8e3ca9f0 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -49,7 +49,6 @@ func (f *Format) String() (text string) { } func (f *Format) stringParts() (parts []string) { - parts = append(parts, fmt.Sprintf("Format (Size: %d, LastNonIgnorableField: %d)", f.Size, f.LastNonIgnorableField)) @@ -225,7 +224,7 @@ func Unmarshal(data []byte, v interface{}) error { m.Fields[i].Skipped = true continue } - m.Fields[i].ID = uint8(data[pos] >> 1) + m.Fields[i].ID = data[pos] >> 1 pos++ var n uint64 var err error From 14de33cc5dba633edb7e04fe07638a25f7d3d69d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Fri, 28 Feb 2025 11:45:20 +0100 Subject: [PATCH 31/36] Set original values to immediate values if skipped --- replication/event.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/replication/event.go b/replication/event.go index 0b49c93a8..f2056e78f 100644 --- a/replication/event.go +++ b/replication/event.go @@ -697,7 +697,11 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { return err } if v, ok := f.Type.(serialization.FieldUintVar); ok { - e.OriginalCommitTimestamp = v.Value + if f.Skipped { + e.OriginalCommitTimestamp = e.ImmediateCommitTimestamp + } else { + e.OriginalCommitTimestamp = v.Value + } } else { return errors.New("failed to get original_commit_timestamp field") } @@ -717,7 +721,11 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { return err } if v, ok := f.Type.(serialization.FieldUintVar); ok { - e.OriginalServerVersion = uint32(v.Value) + if f.Skipped { + e.OriginalServerVersion = e.ImmediateServerVersion + } else { + e.OriginalServerVersion = uint32(v.Value) + } } else { return errors.New("failed to get original_server_version field") } From 173111f56a2700bc90688115a1e7e31586a34594 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Sat, 1 Mar 2025 10:54:12 +0100 Subject: [PATCH 32/36] Update serialization/serialization.go Co-authored-by: lance6716 --- serialization/serialization.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/serialization/serialization.go b/serialization/serialization.go index b8e3ca9f0..46f393b3f 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -21,12 +21,10 @@ type Message struct { fieldIndex map[string]uint8 } -func (m *Message) String() (text string) { - text += fmt.Sprintf("Message (version: %d)", m.Version) - for _, line := range m.Format.stringParts() { - text += "\n " + line - } - return +func (m *Message) String() string { + parts := []string{fmt.Sprintf("Message (version: %d)", m.Version)} + parts = append(parts, m.Format.stringParts()...) + return strings.Join(parts, "\n ") } // GetFieldByName returns a field if the name matches and an error if there is no match From f12936401f41e5fe359174793ab9b9a237b0127a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Sat, 1 Mar 2025 10:55:13 +0100 Subject: [PATCH 33/36] Update serialization/serialization.go Co-authored-by: lance6716 --- serialization/serialization.go | 1 + 1 file changed, 1 insertion(+) diff --git a/serialization/serialization.go b/serialization/serialization.go index 46f393b3f..70a012af9 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -47,6 +47,7 @@ func (f *Format) String() (text string) { } func (f *Format) stringParts() (parts []string) { + parts = make([]string, 0, len(f.Fields)*2+1) parts = append(parts, fmt.Sprintf("Format (Size: %d, LastNonIgnorableField: %d)", f.Size, f.LastNonIgnorableField)) From b7e35fe23cc01eced6864c35c18143621b436990 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 3 Mar 2025 10:56:34 +0800 Subject: [PATCH 34/36] test my suggestion Signed-off-by: lance6716 --- replication/event.go | 44 +++++++++++++++++----------------- serialization/serialization.go | 34 ++++---------------------- 2 files changed, 26 insertions(+), 52 deletions(-) diff --git a/replication/event.go b/replication/event.go index f2056e78f..18dbf6958 100644 --- a/replication/event.go +++ b/replication/event.go @@ -561,52 +561,52 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { Fields: []serialization.Field{ { Name: "gtid_flags", - Type: serialization.FieldIntFixed{ + Type: &serialization.FieldIntFixed{ Length: 1, }, }, { Name: "uuid", - Type: serialization.FieldIntFixed{ + Type: &serialization.FieldIntFixed{ Length: 16, }, }, { Name: "gno", - Type: serialization.FieldIntVar{}, + Type: &serialization.FieldIntVar{}, }, { Name: "tag", - Type: serialization.FieldString{}, + Type: &serialization.FieldString{}, }, { Name: "last_committed", - Type: serialization.FieldIntVar{}, + Type: &serialization.FieldIntVar{}, }, { Name: "sequence_number", - Type: serialization.FieldIntVar{}, + Type: &serialization.FieldIntVar{}, }, { Name: "immediate_commit_timestamp", - Type: serialization.FieldUintVar{}, + Type: &serialization.FieldUintVar{}, }, { Name: "original_commit_timestamp", - Type: serialization.FieldUintVar{}, + Type: &serialization.FieldUintVar{}, Optional: true, }, { Name: "transaction_length", - Type: serialization.FieldUintVar{}, + Type: &serialization.FieldUintVar{}, }, { Name: "immediate_server_version", - Type: serialization.FieldUintVar{}, + Type: &serialization.FieldUintVar{}, }, { Name: "original_server_version", - Type: serialization.FieldUintVar{}, + Type: &serialization.FieldUintVar{}, Optional: true, }, { @@ -626,7 +626,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { if err != nil { return err } - if v, ok := f.Type.(serialization.FieldIntFixed); ok { + if v, ok := f.Type.(*serialization.FieldIntFixed); ok { e.CommitFlag = v.Value[0] } else { return errors.New("failed to get gtid_flags field") @@ -636,7 +636,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { if err != nil { return err } - if v, ok := f.Type.(serialization.FieldIntFixed); ok { + if v, ok := f.Type.(*serialization.FieldIntFixed); ok { e.SID = v.Value } else { return errors.New("failed to get uuid field") @@ -646,7 +646,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { if err != nil { return err } - if v, ok := f.Type.(serialization.FieldIntVar); ok { + if v, ok := f.Type.(*serialization.FieldIntVar); ok { e.GNO = v.Value } else { return errors.New("failed to get gno field") @@ -656,7 +656,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { if err != nil { return err } - if v, ok := f.Type.(serialization.FieldString); ok { + if v, ok := f.Type.(*serialization.FieldString); ok { e.Tag = v.Value } else { return errors.New("failed to get tag field") @@ -666,7 +666,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { if err != nil { return err } - if v, ok := f.Type.(serialization.FieldIntVar); ok { + if v, ok := f.Type.(*serialization.FieldIntVar); ok { e.LastCommitted = v.Value } else { return errors.New("failed to get last_committed field") @@ -676,7 +676,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { if err != nil { return err } - if v, ok := f.Type.(serialization.FieldIntVar); ok { + if v, ok := f.Type.(*serialization.FieldIntVar); ok { e.SequenceNumber = v.Value } else { return errors.New("failed to get sequence_number field") @@ -686,7 +686,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { if err != nil { return err } - if v, ok := f.Type.(serialization.FieldUintVar); ok { + if v, ok := f.Type.(*serialization.FieldUintVar); ok { e.ImmediateCommitTimestamp = v.Value } else { return errors.New("failed to get immediate_commit_timestamp field") @@ -696,7 +696,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { if err != nil { return err } - if v, ok := f.Type.(serialization.FieldUintVar); ok { + if v, ok := f.Type.(*serialization.FieldUintVar); ok { if f.Skipped { e.OriginalCommitTimestamp = e.ImmediateCommitTimestamp } else { @@ -710,7 +710,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { if err != nil { return err } - if v, ok := f.Type.(serialization.FieldUintVar); ok { + if v, ok := f.Type.(*serialization.FieldUintVar); ok { e.ImmediateServerVersion = uint32(v.Value) } else { return errors.New("failed to get immediate_server_version field") @@ -720,7 +720,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { if err != nil { return err } - if v, ok := f.Type.(serialization.FieldUintVar); ok { + if v, ok := f.Type.(*serialization.FieldUintVar); ok { if f.Skipped { e.OriginalServerVersion = e.ImmediateServerVersion } else { @@ -734,7 +734,7 @@ func (e *GtidTaggedLogEvent) Decode(data []byte) error { if err != nil { return err } - if v, ok := f.Type.(serialization.FieldUintVar); ok { + if v, ok := f.Type.(*serialization.FieldUintVar); ok { e.TransactionLength = v.Value } else { return errors.New("failed to get transaction_length field") diff --git a/serialization/serialization.go b/serialization/serialization.go index 70a012af9..c757828c8 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -73,6 +73,7 @@ type Field struct { // FieldType represents a `type_field` type FieldType interface { fmt.Stringer + decode(data []byte, pos uint64) (uint64, error) } // FieldIntFixed is for values with a fixed length. @@ -224,36 +225,9 @@ func Unmarshal(data []byte, v interface{}) error { continue } m.Fields[i].ID = data[pos] >> 1 - pos++ - var n uint64 - var err error - switch f := m.Fields[i].Type.(type) { - case FieldIntFixed: - n, err = f.decode(data, pos) - if err != nil { - return err - } - m.Fields[i].Type = f - case FieldUintVar: - n, err = f.decode(data, pos) - if err != nil { - return err - } - m.Fields[i].Type = f - case FieldIntVar: - n, err = f.decode(data, pos) - if err != nil { - return err - } - m.Fields[i].Type = f - case FieldString: - n, err = f.decode(data, pos) - if err != nil { - return err - } - m.Fields[i].Type = f - default: - return fmt.Errorf("unsupported field type: %T", m.Fields[i].Type) + n, err := m.Fields[i].Type.decode(data, pos) + if err != nil { + return err } pos = n } From 78157e325a414b56f785e8c65d7cf12b793f2b68 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 3 Mar 2025 10:59:28 +0800 Subject: [PATCH 35/36] fix unit test Signed-off-by: lance6716 --- serialization/serialization_test.go | 44 ++++++++++++++--------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/serialization/serialization_test.go b/serialization/serialization_test.go index fc6955733..d8e5a7015 100644 --- a/serialization/serialization_test.go +++ b/serialization/serialization_test.go @@ -235,52 +235,52 @@ func TestUmarshal_event1(t *testing.T) { Fields: []Field{ { Name: "gtid_flags", - Type: FieldIntFixed{ + Type: &FieldIntFixed{ Length: 1, }, }, { Name: "uuid", - Type: FieldIntFixed{ + Type: &FieldIntFixed{ Length: 16, }, }, { Name: "gno", - Type: FieldIntVar{}, + Type: &FieldIntVar{}, }, { Name: "tag", - Type: FieldString{}, + Type: &FieldString{}, }, { Name: "last_committed", - Type: FieldIntVar{}, + Type: &FieldIntVar{}, }, { Name: "sequence_number", - Type: FieldIntVar{}, + Type: &FieldIntVar{}, }, { Name: "immediate_commit_timestamp", - Type: FieldUintVar{}, + Type: &FieldUintVar{}, }, { Name: "original_commit_timestamp", - Type: FieldUintVar{}, + Type: &FieldUintVar{}, Optional: true, }, { Name: "transaction_length", - Type: FieldUintVar{}, + Type: &FieldUintVar{}, }, { Name: "immediate_server_version", - Type: FieldUintVar{}, + Type: &FieldUintVar{}, }, { Name: "original_server_version", - Type: FieldUintVar{}, + Type: &FieldUintVar{}, Optional: true, }, { @@ -299,7 +299,7 @@ func TestUmarshal_event1(t *testing.T) { { Name: "gtid_flags", ID: 0, - Type: FieldIntFixed{ + Type: &FieldIntFixed{ Length: 1, Value: []uint8{0o1}, }, @@ -307,7 +307,7 @@ func TestUmarshal_event1(t *testing.T) { { Name: "uuid", ID: 1, - Type: FieldIntFixed{ + Type: &FieldIntFixed{ Length: 16, Value: []uint8{ 0x89, 0x6e, 0x78, 0x82, 0x18, 0xfe, 0x11, 0xef, 0xab, @@ -318,63 +318,63 @@ func TestUmarshal_event1(t *testing.T) { { Name: "gno", ID: 2, - Type: FieldIntVar{ + Type: &FieldIntVar{ Value: 1, }, }, { Name: "tag", ID: 3, - Type: FieldString{ + Type: &FieldString{ Value: "foobaz", }, }, { Name: "last_committed", ID: 4, - Type: FieldIntVar{ + Type: &FieldIntVar{ Value: 0, }, }, { Name: "sequence_number", ID: 5, - Type: FieldIntVar{ + Type: &FieldIntVar{ Value: 1, }, }, { Name: "immediate_commit_timestamp", ID: 6, - Type: FieldUintVar{ + Type: &FieldUintVar{ Value: 1739823289369365, }, }, { Name: "original_commit_timestamp", ID: 7, - Type: FieldUintVar{}, + Type: &FieldUintVar{}, Optional: true, Skipped: true, }, { Name: "transaction_length", ID: 8, - Type: FieldUintVar{ + Type: &FieldUintVar{ Value: 210, }, }, { Name: "immediate_server_version", ID: 9, - Type: FieldUintVar{ + Type: &FieldUintVar{ Value: 90200, }, }, { Name: "original_server_version", ID: 10, - Type: FieldUintVar{}, + Type: &FieldUintVar{}, Optional: true, Skipped: true, }, From 9b61d61d8c74956ed16aea4f23926fec8c3f8668 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 3 Mar 2025 11:08:26 +0800 Subject: [PATCH 36/36] try to fix unit test again Signed-off-by: lance6716 --- serialization/serialization.go | 1 + 1 file changed, 1 insertion(+) diff --git a/serialization/serialization.go b/serialization/serialization.go index c757828c8..59fb38865 100644 --- a/serialization/serialization.go +++ b/serialization/serialization.go @@ -225,6 +225,7 @@ func Unmarshal(data []byte, v interface{}) error { continue } m.Fields[i].ID = data[pos] >> 1 + pos++ n, err := m.Fields[i].Type.decode(data, pos) if err != nil { return err