-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathloadbalancer.go
115 lines (99 loc) · 2.89 KB
/
loadbalancer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package multiproxy
import (
"context"
"math/rand"
"net/http"
"time"
)
type LoadBalancer[T any] struct {
upstreams []*Upstream[T]
// upstreams that are being prepared to be added to the load balancer
stagedUpstreams []*Upstream[T]
OnError func(up *Upstream[T], req *http.Request, err error)
BeforeRequest func(up *Upstream[T], req *http.Request)
AfterRequest func(up *Upstream[T], req *http.Request, res *http.Response)
OnMarkUnhealthy func(up *Upstream[T])
OnMarkHealthy func(up *Upstream[T])
disposed bool
cancel context.CancelFunc
context context.Context
}
func CreateLoadBalancer[T any]() *LoadBalancer[T] {
ctx, cancel := context.WithCancel(context.Background())
lb := &LoadBalancer[T]{
context: ctx,
cancel: cancel,
}
go lb.StartHealthWatcher()
return lb
}
func (lb *LoadBalancer[T]) SetUpstreams(upstreams []*Upstream[T]) {
for _, upstream := range upstreams {
upstream.Healthy = true
}
lb.upstreams = upstreams
}
func (lb *LoadBalancer[T]) AddStagedUpstream(upstream *Upstream[T]) {
lb.stagedUpstreams = append(lb.stagedUpstreams, upstream)
}
func (lb *LoadBalancer[T]) ClearStagedUpstreams() {
lb.stagedUpstreams = nil
}
func (lb *LoadBalancer[T]) ApplyStagedUpstreams() {
lb.upstreams = lb.stagedUpstreams
lb.stagedUpstreams = nil
}
func (lb *LoadBalancer[T]) Dispose() {
lb.cancel()
}
func (lb *LoadBalancer[T]) StartHealthWatcher() {
healthyTicker := time.NewTicker(10 * time.Second)
for {
select {
case <-lb.context.Done():
healthyTicker.Stop()
return
case <-healthyTicker.C:
for _, upstream := range lb.upstreams {
// if the upstream is unhealthy, and it's been more than 10 seconds since the last request
// mark it as healthy to see if it's back up
// it will be marked as unhealthy again if the next request fails
if !upstream.Healthy && time.Since(upstream.LastRequest) > 10*time.Second {
upstream.Healthy = true
if lb.OnMarkHealthy != nil {
lb.OnMarkHealthy(upstream)
}
}
}
}
}
}
func (lb *LoadBalancer[T]) Add(upstream *Upstream[T]) {
upstream.Healthy = true
lb.upstreams = append(lb.upstreams, upstream)
}
// GetValidUpstreams returns a list of upstreams that are healthy and can service the incoming request
func (lb *LoadBalancer[T]) GetValidUpstreams(req *http.Request) []*Upstream[T] {
var upstreams = make([]*Upstream[T], 0, len(lb.upstreams))
for _, upstream := range lb.upstreams {
if upstream.CanServiceRequest(req) {
upstreams = append(upstreams, upstream)
}
}
return upstreams
}
func (lb *LoadBalancer[T]) GetUpstreams() []*Upstream[T] {
return lb.upstreams
}
func (lb *LoadBalancer[T]) GetStagedUpstreams() []*Upstream[T] {
return lb.stagedUpstreams
}
func (lb *LoadBalancer[T]) Random(r *http.Request) *Upstream[T] {
upstreams := lb.GetValidUpstreams(r)
l := len(upstreams)
if l == 0 {
return nil
}
index := rand.Intn(l)
return upstreams[index]
}