Skip to content

Commit 32ecbcd

Browse files
committed
update
1 parent 4a6b8c4 commit 32ecbcd

File tree

9 files changed

+85
-480
lines changed

9 files changed

+85
-480
lines changed

cgrpc/client.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -184,16 +184,12 @@ func (c *CGrpcClient) UpdateDiscovery(all map[string]*RegisterData) {
184184
c.resolver.cc.UpdateState(s)
185185
}
186186
func (c *CGrpcClient) Call(ctx context.Context, path string, req interface{}, resp interface{}, metadata map[string]string) error {
187-
start := time.Now()
188187
if c.c.GlobalTimeout != 0 {
189188
var cancel context.CancelFunc
190-
ctx, cancel = context.WithDeadline(ctx, start.Add(c.c.GlobalTimeout))
189+
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(c.c.GlobalTimeout))
191190
defer cancel()
192191
}
193192
dl, ok := ctx.Deadline()
194-
if ok && dl.UnixNano() <= start.UnixNano()+int64(5*time.Millisecond) {
195-
return cerror.ErrDeadlineExceeded
196-
}
197193
md := gmetadata.New(nil)
198194
if len(metadata) != 0 {
199195
d, _ := json.Marshal(metadata)
@@ -206,6 +202,11 @@ func (c *CGrpcClient) Call(ctx context.Context, path string, req interface{}, re
206202
md.Set("core_target", c.serverappname)
207203
ctx = gmetadata.NewOutgoingContext(ctx, md)
208204
for {
205+
start := time.Now()
206+
if ok && dl.UnixNano() <= start.UnixNano()+int64(5*time.Millisecond) {
207+
//at least 5ms for net lag and server logic
208+
return cerror.ErrDeadlineExceeded
209+
}
209210
p := &peer.Peer{}
210211
e := transGrpcError(c.conn.Invoke(ctx, path, req, resp, grpc.Peer(p)))
211212
end := time.Now()

codegen/tml/dao/template_dao.go

+1-31
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func NewApi() error {
5959
//if e != nil {
6060
// return e
6161
//}
62-
//ExampleWebApi = example.NewExampleWebClient(exampleweb)
62+
//ExampleWebApi = example.NewExampleWebClient(exampleweb,"http://examplehost:exampleport")
6363
6464
return nil
6565
}
@@ -158,36 +158,6 @@ func getWebClientConfig() *web.ClientConfig {
158158
MaxHeader: 1024,
159159
SocketRBuf: 2048,
160160
SocketWBuf: 2048,
161-
Discover: webDNS,
162-
}
163-
}
164-
165-
func webDNS(group, name string, manually <-chan *struct{}, client *web.WebClient) {
166-
tker := time.NewTicker(time.Second * 10)
167-
for{
168-
select {
169-
case <-tker.C:
170-
case <-manually:
171-
tker.Reset(time.Second * 10)
172-
}
173-
result := make(map[string]*web.RegisterData)
174-
addrs, e := net.LookupHost(name + "-service-headless" + "." + group)
175-
if e != nil {
176-
log.Error(nil,"[web.dns] get:", name+"-service-headless", "addrs error:", e)
177-
continue
178-
}
179-
for i := range addrs {
180-
addrs[i] = addrs[i] + ":8000"
181-
}
182-
dserver := make(map[string]struct{})
183-
dserver["dns"] = struct{}{}
184-
for _, addr := range addrs {
185-
result[addr] = &web.RegisterData{DServers: dserver}
186-
}
187-
for len(tker.C) > 0 {
188-
<-tker.C
189-
}
190-
client.UpdateDiscovery(result)
191161
}
192162
}`
193163

codegen/tml/gomod/template_gomod.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ const text = `module {{.}}
1111
go 1.17
1212
1313
require (
14-
github.com/chenjie199234/Config v0.0.25
15-
github.com/chenjie199234/Corelib v0.0.39
14+
github.com/chenjie199234/Config v0.0.26
15+
github.com/chenjie199234/Corelib v0.0.40
1616
github.com/fsnotify/fsnotify v1.5.1
1717
github.com/go-sql-driver/mysql v1.6.0
1818
github.com/segmentio/kafka-go v0.4.25

crpc/client.go

+16-16
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,10 @@ func NewCrpcClient(c *ClientConfig, selfgroup, selfname, servergroup, servername
107107
for _, cert := range c.CAs {
108108
certPEM, e := os.ReadFile(cert)
109109
if e != nil {
110-
return nil, errors.New("[web.client] read cert file:" + cert + " error:" + e.Error())
110+
return nil, errors.New("[crpc.client] read cert file:" + cert + " error:" + e.Error())
111111
}
112112
if !certpool.AppendCertsFromPEM(certPEM) {
113-
return nil, errors.New("[web.client] load cert file:" + cert + " error:" + e.Error())
113+
return nil, errors.New("[crpc.client] load cert file:" + cert + " error:" + e.Error())
114114
}
115115
}
116116
}
@@ -238,26 +238,12 @@ func (c *CrpcClient) offlinefunc(p *stream.Peer) {
238238
var errPickAgain = errors.New("[crpc.client] picked server closed")
239239

240240
func (c *CrpcClient) Call(ctx context.Context, path string, in []byte, metadata map[string]string) ([]byte, error) {
241-
start := time.Now()
242-
if c.c.GlobalTimeout != 0 {
243-
var cancel context.CancelFunc
244-
ctx, cancel = context.WithDeadline(ctx, start.Add(c.c.GlobalTimeout))
245-
defer cancel()
246-
}
247-
dl, ok := ctx.Deadline()
248-
if ok && dl.UnixNano() <= start.UnixNano()+int64(5*time.Millisecond) {
249-
return nil, cerror.ErrDeadlineExceeded
250-
}
251241
msg := &Msg{
252-
//Callid: atomic.AddUint64(&c.callid, 1),
253242
Type: MsgType_CALL,
254243
Path: path,
255244
Body: in,
256245
Metadata: metadata,
257246
}
258-
if ok {
259-
msg.Deadline = dl.UnixNano()
260-
}
261247
traceid, _, _, selfmethod, selfpath, selfdeep := trace.GetTrace(ctx)
262248
if traceid != "" {
263249
msg.Tracedata = map[string]string{
@@ -267,8 +253,22 @@ func (c *CrpcClient) Call(ctx context.Context, path string, in []byte, metadata
267253
"Deep": strconv.Itoa(selfdeep),
268254
}
269255
}
256+
if c.c.GlobalTimeout != 0 {
257+
var cancel context.CancelFunc
258+
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(c.c.GlobalTimeout))
259+
defer cancel()
260+
}
261+
dl, ok := ctx.Deadline()
262+
if ok {
263+
msg.Deadline = dl.UnixNano()
264+
}
270265
r := c.getreq(msg)
271266
for {
267+
start := time.Now()
268+
if ok && dl.UnixNano() <= start.UnixNano()+int64(5*time.Millisecond) {
269+
//at least 5ms for net lag and server logic
270+
return nil, cerror.ErrDeadlineExceeded
271+
}
272272
server, e := c.balancer.Pick(ctx)
273273
if e != nil {
274274
return nil, e

web/balancer.go

-172
This file was deleted.

0 commit comments

Comments
 (0)