@@ -12,6 +12,7 @@ import (
12
12
"unicode"
13
13
14
14
"github.com/go-mysql-org/go-mysql/mysql"
15
+ "github.com/go-mysql-org/go-mysql/serialization"
15
16
"github.com/google/uuid"
16
17
"github.com/pingcap/errors"
17
18
)
@@ -420,6 +421,7 @@ func (e *QueryEvent) Dump(w io.Writer) {
420
421
type GTIDEvent struct {
421
422
CommitFlag uint8
422
423
SID []byte
424
+ Tag string
423
425
GNO int64
424
426
LastCommitted int64
425
427
SequenceNumber int64
@@ -512,7 +514,11 @@ func (e *GTIDEvent) Dump(w io.Writer) {
512
514
513
515
fmt .Fprintf (w , "Commit flag: %d\n " , e .CommitFlag )
514
516
u , _ := uuid .FromBytes (e .SID )
515
- fmt .Fprintf (w , "GTID_NEXT: %s:%d\n " , u .String (), e .GNO )
517
+ if e .Tag != "" {
518
+ fmt .Fprintf (w , "GTID_NEXT: %s:%s:%d\n " , u .String (), e .Tag , e .GNO )
519
+ } else {
520
+ fmt .Fprintf (w , "GTID_NEXT: %s:%d\n " , u .String (), e .GNO )
521
+ }
516
522
fmt .Fprintf (w , "LAST_COMMITTED: %d\n " , e .LastCommitted )
517
523
fmt .Fprintf (w , "SEQUENCE_NUMBER: %d\n " , e .SequenceNumber )
518
524
fmt .Fprintf (w , "Immediate commmit timestamp: %d (%s)\n " , e .ImmediateCommitTimestamp , fmtTime (e .ImmediateCommitTime ()))
@@ -543,6 +549,202 @@ func (e *GTIDEvent) OriginalCommitTime() time.Time {
543
549
return microSecTimestampToTime (e .OriginalCommitTimestamp )
544
550
}
545
551
552
+ // GtidTaggedLogEvent is for a GTID event with a tag.
553
+ // This is similar to GTIDEvent, but it has a tag and uses a different serialization format.
554
+ type GtidTaggedLogEvent struct {
555
+ GTIDEvent
556
+ }
557
+
558
+ func (e * GtidTaggedLogEvent ) Decode (data []byte ) error {
559
+ msg := serialization.Message {
560
+ Format : serialization.Format {
561
+ Fields : []serialization.Field {
562
+ {
563
+ Name : "gtid_flags" ,
564
+ Type : & serialization.FieldIntFixed {
565
+ Length : 1 ,
566
+ },
567
+ },
568
+ {
569
+ Name : "uuid" ,
570
+ Type : & serialization.FieldIntFixed {
571
+ Length : 16 ,
572
+ },
573
+ },
574
+ {
575
+ Name : "gno" ,
576
+ Type : & serialization.FieldIntVar {},
577
+ },
578
+ {
579
+ Name : "tag" ,
580
+ Type : & serialization.FieldString {},
581
+ },
582
+ {
583
+ Name : "last_committed" ,
584
+ Type : & serialization.FieldIntVar {},
585
+ },
586
+ {
587
+ Name : "sequence_number" ,
588
+ Type : & serialization.FieldIntVar {},
589
+ },
590
+ {
591
+ Name : "immediate_commit_timestamp" ,
592
+ Type : & serialization.FieldUintVar {},
593
+ },
594
+ {
595
+ Name : "original_commit_timestamp" ,
596
+ Type : & serialization.FieldUintVar {},
597
+ Optional : true ,
598
+ },
599
+ {
600
+ Name : "transaction_length" ,
601
+ Type : & serialization.FieldUintVar {},
602
+ },
603
+ {
604
+ Name : "immediate_server_version" ,
605
+ Type : & serialization.FieldUintVar {},
606
+ },
607
+ {
608
+ Name : "original_server_version" ,
609
+ Type : & serialization.FieldUintVar {},
610
+ Optional : true ,
611
+ },
612
+ {
613
+ Name : "commit_group_ticket" ,
614
+ Optional : true ,
615
+ },
616
+ },
617
+ },
618
+ }
619
+
620
+ err := serialization .Unmarshal (data , & msg )
621
+ if err != nil {
622
+ return err
623
+ }
624
+
625
+ f , err := msg .GetFieldByName ("gtid_flags" )
626
+ if err != nil {
627
+ return err
628
+ }
629
+ if v , ok := f .Type .(* serialization.FieldIntFixed ); ok {
630
+ e .CommitFlag = v .Value [0 ]
631
+ } else {
632
+ return errors .New ("failed to get gtid_flags field" )
633
+ }
634
+
635
+ f , err = msg .GetFieldByName ("uuid" )
636
+ if err != nil {
637
+ return err
638
+ }
639
+ if v , ok := f .Type .(* serialization.FieldIntFixed ); ok {
640
+ e .SID = v .Value
641
+ } else {
642
+ return errors .New ("failed to get uuid field" )
643
+ }
644
+
645
+ f , err = msg .GetFieldByName ("gno" )
646
+ if err != nil {
647
+ return err
648
+ }
649
+ if v , ok := f .Type .(* serialization.FieldIntVar ); ok {
650
+ e .GNO = v .Value
651
+ } else {
652
+ return errors .New ("failed to get gno field" )
653
+ }
654
+
655
+ f , err = msg .GetFieldByName ("tag" )
656
+ if err != nil {
657
+ return err
658
+ }
659
+ if v , ok := f .Type .(* serialization.FieldString ); ok {
660
+ e .Tag = v .Value
661
+ } else {
662
+ return errors .New ("failed to get tag field" )
663
+ }
664
+
665
+ f , err = msg .GetFieldByName ("last_committed" )
666
+ if err != nil {
667
+ return err
668
+ }
669
+ if v , ok := f .Type .(* serialization.FieldIntVar ); ok {
670
+ e .LastCommitted = v .Value
671
+ } else {
672
+ return errors .New ("failed to get last_committed field" )
673
+ }
674
+
675
+ f , err = msg .GetFieldByName ("sequence_number" )
676
+ if err != nil {
677
+ return err
678
+ }
679
+ if v , ok := f .Type .(* serialization.FieldIntVar ); ok {
680
+ e .SequenceNumber = v .Value
681
+ } else {
682
+ return errors .New ("failed to get sequence_number field" )
683
+ }
684
+
685
+ f , err = msg .GetFieldByName ("immediate_commit_timestamp" )
686
+ if err != nil {
687
+ return err
688
+ }
689
+ if v , ok := f .Type .(* serialization.FieldUintVar ); ok {
690
+ e .ImmediateCommitTimestamp = v .Value
691
+ } else {
692
+ return errors .New ("failed to get immediate_commit_timestamp field" )
693
+ }
694
+
695
+ f , err = msg .GetFieldByName ("original_commit_timestamp" )
696
+ if err != nil {
697
+ return err
698
+ }
699
+ if v , ok := f .Type .(* serialization.FieldUintVar ); ok {
700
+ if f .Skipped {
701
+ e .OriginalCommitTimestamp = e .ImmediateCommitTimestamp
702
+ } else {
703
+ e .OriginalCommitTimestamp = v .Value
704
+ }
705
+ } else {
706
+ return errors .New ("failed to get original_commit_timestamp field" )
707
+ }
708
+
709
+ f , err = msg .GetFieldByName ("immediate_server_version" )
710
+ if err != nil {
711
+ return err
712
+ }
713
+ if v , ok := f .Type .(* serialization.FieldUintVar ); ok {
714
+ e .ImmediateServerVersion = uint32 (v .Value )
715
+ } else {
716
+ return errors .New ("failed to get immediate_server_version field" )
717
+ }
718
+
719
+ f , err = msg .GetFieldByName ("original_server_version" )
720
+ if err != nil {
721
+ return err
722
+ }
723
+ if v , ok := f .Type .(* serialization.FieldUintVar ); ok {
724
+ if f .Skipped {
725
+ e .OriginalServerVersion = e .ImmediateServerVersion
726
+ } else {
727
+ e .OriginalServerVersion = uint32 (v .Value )
728
+ }
729
+ } else {
730
+ return errors .New ("failed to get original_server_version field" )
731
+ }
732
+
733
+ f , err = msg .GetFieldByName ("transaction_length" )
734
+ if err != nil {
735
+ return err
736
+ }
737
+ if v , ok := f .Type .(* serialization.FieldUintVar ); ok {
738
+ e .TransactionLength = v .Value
739
+ } else {
740
+ return errors .New ("failed to get transaction_length field" )
741
+ }
742
+
743
+ // TODO: add and test commit_group_ticket
744
+
745
+ return nil
746
+ }
747
+
546
748
type BeginLoadQueryEvent struct {
547
749
FileID uint32
548
750
BlockData []byte
0 commit comments