Skip to content

Commit f53dd22

Browse files
dveedenserprexCopilotlance6716
authored
replication: Add mysql::serialization based Gtid Log Event (#990)
* replication: Add mysql::serialization based Gtid Log Event * fixup * fixup * fixup * Add test and get field by name * more testing * More testing * fixup * prepare for negative values * Update based on review * fixup * fixup * fixup * fixup * updates * updates * fixup * More testing * Update serialization/serialization.go Co-authored-by: Philip Dubé <[email protected]> * fixup * formatting * Update based on review * Update serialization/serialization.go Co-authored-by: Copilot <[email protected]> * Don't store the message in the event struct * use map to speedup lookup of fields * Use decode method * Change Size and LastNonIgnoreableField to uint8 * Change field ID to uint8 * Use stringParts as suggested in review * fixup * Set original values to immediate values if skipped * Update serialization/serialization.go Co-authored-by: lance6716 <[email protected]> * Update serialization/serialization.go Co-authored-by: lance6716 <[email protected]> * test my suggestion Signed-off-by: lance6716 <[email protected]> * fix unit test Signed-off-by: lance6716 <[email protected]> * try to fix unit test again Signed-off-by: lance6716 <[email protected]> --------- Signed-off-by: lance6716 <[email protected]> Co-authored-by: Philip Dubé <[email protected]> Co-authored-by: Copilot <[email protected]> Co-authored-by: lance6716 <[email protected]>
1 parent 89fb440 commit f53dd22

File tree

4 files changed

+887
-1
lines changed

4 files changed

+887
-1
lines changed

replication/event.go

+203-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"unicode"
1313

1414
"github.com/go-mysql-org/go-mysql/mysql"
15+
"github.com/go-mysql-org/go-mysql/serialization"
1516
"github.com/google/uuid"
1617
"github.com/pingcap/errors"
1718
)
@@ -420,6 +421,7 @@ func (e *QueryEvent) Dump(w io.Writer) {
420421
type GTIDEvent struct {
421422
CommitFlag uint8
422423
SID []byte
424+
Tag string
423425
GNO int64
424426
LastCommitted int64
425427
SequenceNumber int64
@@ -512,7 +514,11 @@ func (e *GTIDEvent) Dump(w io.Writer) {
512514

513515
fmt.Fprintf(w, "Commit flag: %d\n", e.CommitFlag)
514516
u, _ := uuid.FromBytes(e.SID)
515-
fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO)
517+
if e.Tag != "" {
518+
fmt.Fprintf(w, "GTID_NEXT: %s:%s:%d\n", u.String(), e.Tag, e.GNO)
519+
} else {
520+
fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO)
521+
}
516522
fmt.Fprintf(w, "LAST_COMMITTED: %d\n", e.LastCommitted)
517523
fmt.Fprintf(w, "SEQUENCE_NUMBER: %d\n", e.SequenceNumber)
518524
fmt.Fprintf(w, "Immediate commmit timestamp: %d (%s)\n", e.ImmediateCommitTimestamp, fmtTime(e.ImmediateCommitTime()))
@@ -543,6 +549,202 @@ func (e *GTIDEvent) OriginalCommitTime() time.Time {
543549
return microSecTimestampToTime(e.OriginalCommitTimestamp)
544550
}
545551

552+
// GtidTaggedLogEvent is for a GTID event with a tag.
553+
// This is similar to GTIDEvent, but it has a tag and uses a different serialization format.
554+
type GtidTaggedLogEvent struct {
555+
GTIDEvent
556+
}
557+
558+
func (e *GtidTaggedLogEvent) Decode(data []byte) error {
559+
msg := serialization.Message{
560+
Format: serialization.Format{
561+
Fields: []serialization.Field{
562+
{
563+
Name: "gtid_flags",
564+
Type: &serialization.FieldIntFixed{
565+
Length: 1,
566+
},
567+
},
568+
{
569+
Name: "uuid",
570+
Type: &serialization.FieldIntFixed{
571+
Length: 16,
572+
},
573+
},
574+
{
575+
Name: "gno",
576+
Type: &serialization.FieldIntVar{},
577+
},
578+
{
579+
Name: "tag",
580+
Type: &serialization.FieldString{},
581+
},
582+
{
583+
Name: "last_committed",
584+
Type: &serialization.FieldIntVar{},
585+
},
586+
{
587+
Name: "sequence_number",
588+
Type: &serialization.FieldIntVar{},
589+
},
590+
{
591+
Name: "immediate_commit_timestamp",
592+
Type: &serialization.FieldUintVar{},
593+
},
594+
{
595+
Name: "original_commit_timestamp",
596+
Type: &serialization.FieldUintVar{},
597+
Optional: true,
598+
},
599+
{
600+
Name: "transaction_length",
601+
Type: &serialization.FieldUintVar{},
602+
},
603+
{
604+
Name: "immediate_server_version",
605+
Type: &serialization.FieldUintVar{},
606+
},
607+
{
608+
Name: "original_server_version",
609+
Type: &serialization.FieldUintVar{},
610+
Optional: true,
611+
},
612+
{
613+
Name: "commit_group_ticket",
614+
Optional: true,
615+
},
616+
},
617+
},
618+
}
619+
620+
err := serialization.Unmarshal(data, &msg)
621+
if err != nil {
622+
return err
623+
}
624+
625+
f, err := msg.GetFieldByName("gtid_flags")
626+
if err != nil {
627+
return err
628+
}
629+
if v, ok := f.Type.(*serialization.FieldIntFixed); ok {
630+
e.CommitFlag = v.Value[0]
631+
} else {
632+
return errors.New("failed to get gtid_flags field")
633+
}
634+
635+
f, err = msg.GetFieldByName("uuid")
636+
if err != nil {
637+
return err
638+
}
639+
if v, ok := f.Type.(*serialization.FieldIntFixed); ok {
640+
e.SID = v.Value
641+
} else {
642+
return errors.New("failed to get uuid field")
643+
}
644+
645+
f, err = msg.GetFieldByName("gno")
646+
if err != nil {
647+
return err
648+
}
649+
if v, ok := f.Type.(*serialization.FieldIntVar); ok {
650+
e.GNO = v.Value
651+
} else {
652+
return errors.New("failed to get gno field")
653+
}
654+
655+
f, err = msg.GetFieldByName("tag")
656+
if err != nil {
657+
return err
658+
}
659+
if v, ok := f.Type.(*serialization.FieldString); ok {
660+
e.Tag = v.Value
661+
} else {
662+
return errors.New("failed to get tag field")
663+
}
664+
665+
f, err = msg.GetFieldByName("last_committed")
666+
if err != nil {
667+
return err
668+
}
669+
if v, ok := f.Type.(*serialization.FieldIntVar); ok {
670+
e.LastCommitted = v.Value
671+
} else {
672+
return errors.New("failed to get last_committed field")
673+
}
674+
675+
f, err = msg.GetFieldByName("sequence_number")
676+
if err != nil {
677+
return err
678+
}
679+
if v, ok := f.Type.(*serialization.FieldIntVar); ok {
680+
e.SequenceNumber = v.Value
681+
} else {
682+
return errors.New("failed to get sequence_number field")
683+
}
684+
685+
f, err = msg.GetFieldByName("immediate_commit_timestamp")
686+
if err != nil {
687+
return err
688+
}
689+
if v, ok := f.Type.(*serialization.FieldUintVar); ok {
690+
e.ImmediateCommitTimestamp = v.Value
691+
} else {
692+
return errors.New("failed to get immediate_commit_timestamp field")
693+
}
694+
695+
f, err = msg.GetFieldByName("original_commit_timestamp")
696+
if err != nil {
697+
return err
698+
}
699+
if v, ok := f.Type.(*serialization.FieldUintVar); ok {
700+
if f.Skipped {
701+
e.OriginalCommitTimestamp = e.ImmediateCommitTimestamp
702+
} else {
703+
e.OriginalCommitTimestamp = v.Value
704+
}
705+
} else {
706+
return errors.New("failed to get original_commit_timestamp field")
707+
}
708+
709+
f, err = msg.GetFieldByName("immediate_server_version")
710+
if err != nil {
711+
return err
712+
}
713+
if v, ok := f.Type.(*serialization.FieldUintVar); ok {
714+
e.ImmediateServerVersion = uint32(v.Value)
715+
} else {
716+
return errors.New("failed to get immediate_server_version field")
717+
}
718+
719+
f, err = msg.GetFieldByName("original_server_version")
720+
if err != nil {
721+
return err
722+
}
723+
if v, ok := f.Type.(*serialization.FieldUintVar); ok {
724+
if f.Skipped {
725+
e.OriginalServerVersion = e.ImmediateServerVersion
726+
} else {
727+
e.OriginalServerVersion = uint32(v.Value)
728+
}
729+
} else {
730+
return errors.New("failed to get original_server_version field")
731+
}
732+
733+
f, err = msg.GetFieldByName("transaction_length")
734+
if err != nil {
735+
return err
736+
}
737+
if v, ok := f.Type.(*serialization.FieldUintVar); ok {
738+
e.TransactionLength = v.Value
739+
} else {
740+
return errors.New("failed to get transaction_length field")
741+
}
742+
743+
// TODO: add and test commit_group_ticket
744+
745+
return nil
746+
}
747+
546748
type BeginLoadQueryEvent struct {
547749
FileID uint32
548750
BlockData []byte

replication/parser.go

+2
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,8 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
291291
e = &GTIDEvent{}
292292
case ANONYMOUS_GTID_EVENT:
293293
e = &GTIDEvent{}
294+
case GTID_TAGGED_LOG_EVENT:
295+
e = &GtidTaggedLogEvent{}
294296
case BEGIN_LOAD_QUERY_EVENT:
295297
e = &BeginLoadQueryEvent{}
296298
case EXECUTE_LOAD_QUERY_EVENT:

0 commit comments

Comments
 (0)