5
5
"crypto/tls"
6
6
"io"
7
7
"log"
8
+ "sync"
8
9
"time"
9
10
)
10
11
@@ -27,20 +28,28 @@ func (b *buffer) Add(v interface{}) *list.Element {
27
28
return e
28
29
}
29
30
31
+ type serializedNotif struct {
32
+ id uint32
33
+ b []byte
34
+ }
35
+
30
36
type Client struct {
31
37
Conn * Conn
32
38
FailedNotifs chan NotificationResult
33
39
34
- notifs chan Notification
35
- id uint32
40
+ notifs chan serializedNotif
41
+
42
+ id uint32
43
+ idm sync.Mutex
36
44
}
37
45
38
46
func newClientWithConn (gw string , conn Conn ) Client {
39
47
c := Client {
40
48
Conn : & conn ,
41
49
FailedNotifs : make (chan NotificationResult ),
42
- id : uint32 (1 ),
43
- notifs : make (chan Notification ),
50
+ notifs : make (chan serializedNotif ),
51
+ id : 1 ,
52
+ idm : sync.Mutex {},
44
53
}
45
54
46
55
go c .runLoop ()
@@ -73,10 +82,37 @@ func NewClientWithFiles(gw string, certFile string, keyFile string) (Client, err
73
82
}
74
83
75
84
func (c * Client ) Send (n Notification ) error {
76
- c .notifs <- n
85
+ // Set identifier if not specified
86
+ if n .Identifier == 0 {
87
+ n .Identifier = c .nextID ()
88
+ } else if c .id < n .Identifier {
89
+ c .setID (n .Identifier )
90
+ }
91
+
92
+ b , err := n .ToBinary ()
93
+ if err != nil {
94
+ return err
95
+ }
96
+
97
+ c .notifs <- serializedNotif {b : b , id : n .Identifier }
77
98
return nil
78
99
}
79
100
101
+ func (c * Client ) setID (n uint32 ) {
102
+ c .idm .Lock ()
103
+ defer c .idm .Unlock ()
104
+
105
+ c .id = n
106
+ }
107
+
108
+ func (c * Client ) nextID () uint32 {
109
+ c .idm .Lock ()
110
+ defer c .idm .Unlock ()
111
+
112
+ c .id ++
113
+ return c .id
114
+ }
115
+
80
116
func (c * Client ) reportFailedPush (v interface {}, err * Error ) {
81
117
failedNotif , ok := v .(Notification )
82
118
if ! ok || v == nil {
@@ -93,7 +129,7 @@ func (c *Client) requeue(cursor *list.Element) {
93
129
// If `cursor` is not nil, this means there are notifications that
94
130
// need to be delivered (or redelivered)
95
131
for ; cursor != nil ; cursor = cursor .Next () {
96
- if n , ok := cursor .Value .(Notification ); ok {
132
+ if n , ok := cursor .Value .(serializedNotif ); ok {
97
133
go func () { c .notifs <- n }()
98
134
}
99
135
}
@@ -103,11 +139,11 @@ func (c *Client) handleError(err *Error, buffer *buffer) *list.Element {
103
139
cursor := buffer .Back ()
104
140
105
141
for cursor != nil {
106
- // Get notification
107
- n , _ := cursor .Value .(Notification )
142
+ // Get serialized notification
143
+ n , _ := cursor .Value .(serializedNotif )
108
144
109
145
// If the notification, move cursor after the trouble notification
110
- if n .Identifier == err .Identifier {
146
+ if n .id == err .Identifier {
111
147
go c .reportFailedPush (cursor .Value , err )
112
148
113
149
next := cursor .Next ()
@@ -143,7 +179,7 @@ func (c *Client) runLoop() {
143
179
// Connection open, listen for notifs and errors
144
180
for {
145
181
var err error
146
- var n Notification
182
+ var n serializedNotif
147
183
148
184
// Check for notifications or errors. There is a chance we'll send notifications
149
185
// if we already have an error since `select` will "pseudorandomly" choose a
@@ -169,21 +205,7 @@ func (c *Client) runLoop() {
169
205
// Add to list
170
206
cursor = sent .Add (n )
171
207
172
- // Set identifier if not specified
173
- if n .Identifier == 0 {
174
- n .Identifier = c .id
175
- c .id ++
176
- } else if c .id < n .Identifier {
177
- c .id = n .Identifier + 1
178
- }
179
-
180
- b , err := n .ToBinary ()
181
- if err != nil {
182
- // TODO
183
- continue
184
- }
185
-
186
- _ , err = c .Conn .Write (b )
208
+ _ , err = c .Conn .Write (n .b )
187
209
188
210
if err == io .EOF {
189
211
log .Println ("EOF trying to write notification" )
0 commit comments