Skip to content

Commit 9bbda52

Browse files
committed
update
1 parent 7bf4fda commit 9bbda52

File tree

6 files changed

+146
-70
lines changed

6 files changed

+146
-70
lines changed

cgrpc/balancer.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,9 @@ func (b *corelibBalancer) UpdateClientConnState(ss balancer.ClientConnState) err
7575
b.lastResolveError = nil
7676
defer func() {
7777
if len(b.servers) == 0 || len(b.pservers) > 0 {
78-
b.c.resolver.wakemanual()
78+
b.c.resolver.wake(false)
7979
}
80+
b.c.resolver.wake(true)
8081
}()
8182
//offline
8283
for _, server := range b.servers {
@@ -219,7 +220,7 @@ func (b *corelibBalancer) rebuildpicker(reason bool) {
219220
}
220221
b.setPickerServers(tmp)
221222
if reason {
222-
b.c.resolver.wakemanual()
223+
b.c.resolver.wake(false)
223224
}
224225
return
225226
}
@@ -245,7 +246,7 @@ func (b *corelibBalancer) Pick(info balancer.PickInfo) (balancer.PickResult, err
245246
server.Pickinfo.LastFailTime = time.Now().UnixNano()
246247
if cerror.Equal(transGrpcError(doneinfo.Err), cerror.ErrClosing) {
247248
b.cc.RemoveSubConn(server.subconn)
248-
b.c.resolver.manual(nil)
249+
b.c.ResolveNow()
249250
}
250251
}
251252
},
@@ -257,7 +258,7 @@ func (b *corelibBalancer) Pick(info balancer.PickInfo) (balancer.PickResult, err
257258
}
258259
return balancer.PickResult{}, cerror.ErrNoserver
259260
}
260-
if e := b.c.resolver.waitmanual(info.Ctx); e != nil {
261+
if e := b.c.resolver.wait(info.Ctx, false); e != nil {
261262
if e == context.DeadlineExceeded {
262263
return balancer.PickResult{}, cerror.ErrDeadlineExceeded
263264
} else if e == context.Canceled {

cgrpc/resolver.go

+61-26
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,32 @@ type resolverBuilder struct {
1616
}
1717

1818
func (b *resolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
19-
b.c.resolver = &corelibResolver{
19+
r := &corelibResolver{
2020
lker: &sync.Mutex{},
21-
mstatus: true,
22-
manually: make(chan *struct{}, 1),
23-
manualNotice: make(map[chan *struct{}]*struct{}),
21+
sstatus: true,
22+
system: make(chan *struct{}),
23+
systemNotice: make(map[chan *struct{}]*struct{}),
24+
cstatus: false,
25+
call: make(chan *struct{}, 1),
26+
callNotice: make(map[chan *struct{}]*struct{}),
2427
cc: cc,
2528
}
26-
b.c.resolver.manually <- nil
29+
b.c.resolver = r
30+
r.system <- nil
2731
go func() {
2832
tker := time.NewTicker(b.c.c.DiscoverInterval)
2933
for {
3034
select {
3135
case <-tker.C:
32-
case <-b.c.resolver.manually:
36+
case <-r.system:
37+
case <-r.call:
3338
}
3439
all, e := b.c.c.Discover(b.group, b.name)
3540
if e != nil {
3641
cc.ReportError(e)
3742
log.Error(nil, "[cgrpc.client.resolver] discover servername:", b.name, "servergroup:", b.group, "error:", e)
38-
b.c.resolver.wakemanual()
43+
b.c.resolver.wake(true)
44+
b.c.resolver.wake(false)
3945
continue
4046
}
4147
s := resolver.State{
@@ -65,47 +71,76 @@ func (b *resolverBuilder) Scheme() string {
6571

6672
type corelibResolver struct {
6773
lker *sync.Mutex
68-
mstatus bool
69-
manually chan *struct{}
70-
manualNotice map[chan *struct{}]*struct{}
74+
sstatus bool
75+
system chan *struct{}
76+
systemNotice map[chan *struct{}]*struct{}
77+
cstatus bool
78+
call chan *struct{}
79+
callNotice map[chan *struct{}]*struct{}
7180
cc resolver.ClientConn
7281
}
7382

7483
func (r *corelibResolver) ResolveNow(op resolver.ResolveNowOptions) {
75-
r.manual(nil)
84+
r.triger(nil, true)
7685
}
7786

78-
func (r *corelibResolver) manual(notice chan *struct{}) {
87+
//systemORcall true - system,false - call
88+
func (r *corelibResolver) triger(notice chan *struct{}, systemORcall bool) {
7989
r.lker.Lock()
80-
if notice != nil {
81-
r.manualNotice[notice] = nil
82-
}
83-
if !r.mstatus {
84-
r.mstatus = true
85-
r.manually <- nil
90+
if systemORcall {
91+
if notice != nil {
92+
r.systemNotice[notice] = nil
93+
}
94+
if r.sstatus {
95+
return
96+
}
97+
r.sstatus = true
98+
r.system <- nil
99+
} else {
100+
if notice != nil {
101+
r.callNotice[notice] = nil
102+
}
103+
if r.cstatus {
104+
return
105+
}
106+
r.cstatus = true
107+
r.call <- nil
86108
}
87109
r.lker.Unlock()
88110
}
89111

90-
func (r *corelibResolver) waitmanual(ctx context.Context) error {
112+
//systemORcall true - system,false - call
113+
func (r *corelibResolver) wait(ctx context.Context, systemORcall bool) error {
91114
notice := make(chan *struct{}, 1)
92-
r.manual(notice)
115+
r.triger(notice, systemORcall)
93116
select {
94117
case <-notice:
95118
return nil
96119
case <-ctx.Done():
97120
r.lker.Lock()
98-
delete(r.manualNotice, notice)
121+
if systemORcall {
122+
delete(r.systemNotice, notice)
123+
} else {
124+
delete(r.callNotice, notice)
125+
}
99126
r.lker.Unlock()
100127
return ctx.Err()
101128
}
102129
}
103-
func (r *corelibResolver) wakemanual() {
130+
131+
//systemORcall true - system,false - call
132+
func (r *corelibResolver) wake(systemORcall bool) {
104133
r.lker.Lock()
105-
if r.mstatus {
106-
r.mstatus = false
107-
for notice := range r.manualNotice {
108-
delete(r.manualNotice, notice)
134+
if systemORcall {
135+
r.sstatus = false
136+
for notice := range r.systemNotice {
137+
delete(r.systemNotice, notice)
138+
notice <- nil
139+
}
140+
} else {
141+
r.cstatus = false
142+
for notice := range r.callNotice {
143+
delete(r.callNotice, notice)
109144
notice <- nil
110145
}
111146
}

codegen/tml/gomod/template_gomod.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ const text = `module {{.}}
1111
go 1.18
1212
1313
require (
14-
github.com/chenjie199234/config v0.1.2
15-
github.com/chenjie199234/Corelib v0.0.65
14+
github.com/chenjie199234/config v0.1.3
15+
github.com/chenjie199234/Corelib v0.0.66
1616
github.com/fsnotify/fsnotify v1.5.1
1717
github.com/go-sql-driver/mysql v1.6.0
1818
github.com/segmentio/kafka-go v0.4.31

crpc/balancer.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,9 @@ func (b *corelibBalancer) UpdateDiscovery(all map[string]*RegisterData) {
122122
b.lker.Lock()
123123
defer func() {
124124
if len(b.servers) == 0 || len(b.pservers) > 0 {
125-
b.c.resolver.wakemanual()
125+
b.c.resolver.wake(false)
126126
}
127+
b.c.resolver.wake(true)
127128
b.lker.Unlock()
128129
}()
129130
if bytes.Equal(b.serversRaw, d) {
@@ -215,8 +216,9 @@ func (b *corelibBalancer) ReconnectCheck(server *ServerForPick) bool {
215216
return false
216217
}
217218
b.lker.Unlock()
219+
time.Sleep(time.Millisecond * 100)
218220
//need to check server register status
219-
b.c.resolver.waitmanual(context.Background())
221+
b.c.resolver.wait(context.Background(), true)
220222
b.lker.Lock()
221223
if len(server.dservers) == 0 {
222224
//server already unregister,remove server
@@ -240,7 +242,7 @@ func (b *corelibBalancer) RebuildPicker(reason bool) {
240242
}
241243
b.setPickerServers(tmp)
242244
if reason {
243-
b.c.resolver.wakemanual()
245+
b.c.resolver.wake(false)
244246
}
245247
b.lker.Unlock()
246248
}
@@ -257,7 +259,7 @@ func (b *corelibBalancer) Pick(ctx context.Context) (*ServerForPick, error) {
257259
}
258260
return nil, cerror.ErrNoserver
259261
}
260-
if e := b.c.resolver.waitmanual(ctx); e != nil {
262+
if e := b.c.resolver.wait(ctx, false); e != nil {
261263
if e == context.DeadlineExceeded {
262264
return nil, cerror.ErrDeadlineExceeded
263265
} else if e == context.Canceled {

crpc/client.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func NewCrpcClient(c *ClientConfig, selfgroup, selfname, servergroup, servername
146146
}
147147

148148
func (c *CrpcClient) ResolveNow() {
149-
c.resolver.manual(nil)
149+
c.resolver.ResolveNow()
150150
}
151151

152152
func (c *CrpcClient) start(server *ServerForPick, reconnect bool) {
@@ -161,7 +161,6 @@ func (c *CrpcClient) start(server *ServerForPick, reconnect bool) {
161161
addr = "tcp://" + server.addr
162162
}
163163
if !c.instance.StartClient(addr, common.Str2byte(c.serverappname), c.tlsc) {
164-
time.Sleep(time.Millisecond * 100)
165164
go c.start(server, true)
166165
}
167166
}
@@ -297,8 +296,8 @@ func (c *CrpcClient) Call(ctx context.Context, path string, in []byte, metadata
297296
//req error,update last fail time
298297
server.Pickinfo.LastFailTime = time.Now().UnixNano()
299298
if cerror.Equal(r.err, cerror.ErrClosing) {
300-
//triger manually discovery
301-
c.resolver.manual(nil)
299+
//triger discovery
300+
c.ResolveNow()
302301
//server is closing,this req can be retry
303302
r.respdata = nil
304303
r.err = nil

crpc/resolver.go

+69-30
Original file line numberDiff line numberDiff line change
@@ -10,31 +10,39 @@ import (
1010

1111
type corelibResolver struct {
1212
lker *sync.Mutex
13-
mstatus bool
14-
manually chan *struct{}
15-
manualNotice map[chan *struct{}]*struct{}
13+
sstatus bool
14+
system chan *struct{}
15+
systemNotice map[chan *struct{}]*struct{}
16+
cstatus bool
17+
call chan *struct{}
18+
callNotice map[chan *struct{}]*struct{}
1619
}
1720

1821
func newCorelibResolver(group, name string, c *CrpcClient) *corelibResolver {
1922
r := &corelibResolver{
2023
lker: &sync.Mutex{},
21-
mstatus: true,
22-
manually: make(chan *struct{}, 1),
23-
manualNotice: make(map[chan *struct{}]*struct{}),
24+
sstatus: true,
25+
system: make(chan *struct{}),
26+
systemNotice: make(map[chan *struct{}]*struct{}),
27+
cstatus: false,
28+
call: make(chan *struct{}, 1),
29+
callNotice: make(map[chan *struct{}]*struct{}),
2430
}
25-
r.manually <- nil
31+
r.system <- nil
2632
go func() {
2733
tker := time.NewTicker(c.c.DiscoverInterval)
2834
for {
2935
select {
3036
case <-tker.C:
31-
case <-r.manually:
37+
case <-r.system:
38+
case <-r.call:
3239
}
3340
all, e := c.c.Discover(group, name)
3441
if e != nil {
3542
c.balancer.ResolverError(e)
3643
log.Error(nil, "[crpc.client.resolver] discover servername:", name, "servergroup:", group, "error:", e)
37-
r.wakemanual()
44+
r.wake(true)
45+
r.wake(false)
3846
continue
3947
}
4048
for k, v := range all {
@@ -48,38 +56,69 @@ func newCorelibResolver(group, name string, c *CrpcClient) *corelibResolver {
4856
return r
4957
}
5058

51-
func (c *corelibResolver) manual(notice chan *struct{}) {
52-
c.lker.Lock()
53-
if notice != nil {
54-
c.manualNotice[notice] = nil
55-
}
56-
if !c.mstatus {
57-
c.mstatus = true
58-
c.manually <- nil
59+
func (r *corelibResolver) ResolveNow() {
60+
r.triger(nil, true)
61+
}
62+
63+
//systemORcall true - system,false - call
64+
func (r *corelibResolver) triger(notice chan *struct{}, systemORcall bool) {
65+
r.lker.Lock()
66+
if systemORcall {
67+
if notice != nil {
68+
r.systemNotice[notice] = nil
69+
}
70+
if r.sstatus {
71+
return
72+
}
73+
r.sstatus = true
74+
r.system <- nil
75+
} else {
76+
if notice != nil {
77+
r.callNotice[notice] = nil
78+
}
79+
if r.cstatus {
80+
return
81+
}
82+
r.cstatus = true
83+
r.call <- nil
5984
}
60-
c.lker.Unlock()
85+
r.lker.Unlock()
6186
}
62-
func (c *corelibResolver) waitmanual(ctx context.Context) error {
87+
88+
//systemORcall true - system,false - call
89+
func (r *corelibResolver) wait(ctx context.Context, systemORcall bool) error {
6390
notice := make(chan *struct{}, 1)
64-
c.manual(notice)
91+
r.triger(notice, systemORcall)
6592
select {
6693
case <-notice:
6794
return nil
6895
case <-ctx.Done():
69-
c.lker.Lock()
70-
delete(c.manualNotice, notice)
71-
c.lker.Unlock()
96+
r.lker.Lock()
97+
if systemORcall {
98+
delete(r.systemNotice, notice)
99+
} else {
100+
delete(r.callNotice, notice)
101+
}
102+
r.lker.Unlock()
72103
return ctx.Err()
73104
}
74105
}
75-
func (c *corelibResolver) wakemanual() {
76-
c.lker.Lock()
77-
if c.mstatus {
78-
c.mstatus = false
79-
for notice := range c.manualNotice {
80-
delete(c.manualNotice, notice)
106+
107+
//systemORcall true - system,false - call
108+
func (r *corelibResolver) wake(systemORcall bool) {
109+
r.lker.Lock()
110+
if systemORcall {
111+
r.sstatus = false
112+
for notice := range r.systemNotice {
113+
delete(r.systemNotice, notice)
114+
notice <- nil
115+
}
116+
} else {
117+
r.cstatus = false
118+
for notice := range r.callNotice {
119+
delete(r.callNotice, notice)
81120
notice <- nil
82121
}
83122
}
84-
c.lker.Unlock()
123+
r.lker.Unlock()
85124
}

0 commit comments

Comments
 (0)