-
Notifications
You must be signed in to change notification settings - Fork 96
/
Copy pathpubsub.go
163 lines (144 loc) · 4.18 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
// Copyright (c) 2014-2019 Ludovic Fauvet
// Licensed under the MIT license
package database
import (
"sync"
"time"
"github.com/gomodule/redigo/redis"
"github.com/op/go-logging"
)
var (
log = logging.MustGetLogger("main")
)
type pubsubEvent string
const (
CLUSTER pubsubEvent = "_mirrorbits_cluster"
FILE_UPDATE pubsubEvent = "_mirrorbits_file_update"
MIRROR_UPDATE pubsubEvent = "_mirrorbits_mirror_update"
MIRROR_FILE_UPDATE pubsubEvent = "_mirrorbits_mirror_file_update"
PUBSUB_RECONNECTED pubsubEvent = "_mirrorbits_pubsub_reconnected"
)
// Pubsub is the internal structure of the publish/subscribe handler
type Pubsub struct {
r *Redis
rconn redis.Conn
connlock sync.Mutex
extSubscribers map[string][]chan string
extSubscribersLock sync.RWMutex
stop chan bool
wg sync.WaitGroup
}
// NewPubsub returns a new instance of the publish/subscribe handler
func NewPubsub(r *Redis) *Pubsub {
pubsub := new(Pubsub)
pubsub.r = r
pubsub.stop = make(chan bool)
pubsub.extSubscribers = make(map[string][]chan string)
go pubsub.updateEvents()
return pubsub
}
// Close all the connections to the pubsub server
func (p *Pubsub) Close() {
close(p.stop)
p.connlock.Lock()
if p.rconn != nil {
// FIXME Calling p.rconn.Close() here will block indefinitely in redigo
p.rconn.Send("UNSUBSCRIBE")
p.rconn.Send("QUIT")
p.rconn.Flush()
}
p.connlock.Unlock()
p.wg.Wait()
}
// SubscribeEvent allows subscription to a particular kind of events and receive a
// notification when an event is dispatched on the given channel.
func (p *Pubsub) SubscribeEvent(event pubsubEvent, channel chan string) {
p.extSubscribersLock.Lock()
defer p.extSubscribersLock.Unlock()
listeners := p.extSubscribers[string(event)]
listeners = append(listeners, channel)
p.extSubscribers[string(event)] = listeners
}
func (p *Pubsub) updateEvents() {
p.wg.Add(1)
defer p.wg.Done()
disconnected := false
connect:
for {
select {
case <-p.stop:
return
default:
}
p.connlock.Lock()
p.rconn = p.r.Get()
if _, err := p.rconn.Do("PING"); err != nil {
disconnected = true
p.rconn.Close()
p.rconn = nil
p.connlock.Unlock()
if RedisIsLoading(err) {
// Doing a PING after (re-connection) prevents cases where redis
// is currently loading the dataset and is still not ready.
log.Warning("Redis is still loading the dataset in memory")
}
time.Sleep(500 * time.Millisecond)
continue
}
p.connlock.Unlock()
log.Debug("Subscribing pubsub")
psc := redis.PubSubConn{Conn: p.rconn}
psc.Subscribe(CLUSTER)
psc.Subscribe(FILE_UPDATE)
psc.Subscribe(MIRROR_UPDATE)
psc.Subscribe(MIRROR_FILE_UPDATE)
if disconnected == true {
// This is a way to keep the cache active while disconnected
// from redis but still clear the cache (possibly outdated)
// after a successful reconnection.
disconnected = false
p.handleMessage(string(PUBSUB_RECONNECTED), nil)
}
for {
switch v := psc.Receive().(type) {
case redis.Message:
//log.Debugf("Redis message on channel %s: message: %s", v.Channel, v.Data)
p.handleMessage(v.Channel, v.Data)
case redis.Subscription:
log.Debugf("Redis subscription on channel %s: %s (%d)", v.Channel, v.Kind, v.Count)
case error:
select {
case <-p.stop:
return
default:
}
log.Errorf("Pubsub disconnected: %s", v)
psc.Close()
p.rconn.Close()
time.Sleep(50 * time.Millisecond)
disconnected = true
goto connect
}
}
}
}
// Notify subscribers of the new message
func (p *Pubsub) handleMessage(channel string, data []byte) {
p.extSubscribersLock.RLock()
defer p.extSubscribersLock.RUnlock()
listeners := p.extSubscribers[channel]
for _, listener := range listeners {
// Block if the listener is not available
listener <- string(data)
}
}
// Publish a message on the pubsub server
func Publish(r redis.Conn, event pubsubEvent, message string) error {
_, err := r.Do("PUBLISH", string(event), message)
return err
}
// SendPublish add the message to a transaction
func SendPublish(r redis.Conn, event pubsubEvent, message string) error {
err := r.Send("PUBLISH", string(event), message)
return err
}