Skip to content

Commit 4035b0a

Browse files
committed
update
1 parent 12b53b0 commit 4035b0a

File tree

11 files changed

+83
-131
lines changed

11 files changed

+83
-131
lines changed

codegen/tml/config/template_config.go

+1
Original file line numberDiff line numberDiff line change
@@ -608,6 +608,7 @@ func initsource(path string) {
608608
continue
609609
}
610610
tempredis := redis.NewRedis(&redis.Config{
611+
RedisName: k,
611612
Username: redisc.Username,
612613
Password: redisc.Passwd,
613614
Addr: redisc.Addr,

log/log.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func write(ctx context.Context, buf *bufpool.Buffer, datas ...interface{}) {
102102
buf.Append(file)
103103
buf.Append(":")
104104
buf.Append(line)
105-
traceid, _, _, _, _, _ := trace.GetTrace(ctx)
105+
traceid, _, _, _, _ := trace.GetTrace(ctx)
106106
if traceid != "" {
107107
buf.Append(" ")
108108
buf.Append("Traceid: ")

redis/listmq.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,8 @@ func (p *Pool) ListMQSub(name string, num uint64, recvbufnum uint64, stop chan s
129129
return recv, nil
130130
}
131131

132-
const pub = `if(redis.call("EXISTS",KEYS[2])~=0)
132+
const pub = `if(redis.call("EXISTS",KEYS[2])~=0 and redis.call("EXPIRE",KEYS[1],11)~=0)
133133
then
134-
redis.call("EXPIRE",KEYS[1],11)
135134
for i=1,#ARGV,1 do
136135
redis.call("rpush",KEYS[1],ARGV[i])
137136
end

redis/redis.go

+16-21
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,25 @@ package redis
22

33
import (
44
"context"
5-
"sync"
65
"time"
76

7+
"github.com/chenjie199234/Corelib/trace"
8+
89
"github.com/gomodule/redigo/redis"
910
)
1011

1112
type Pool struct {
13+
c *Config
1214
p *redis.Pool
1315
}
1416

1517
type Conn struct {
16-
c redis.Conn
18+
c redis.Conn
19+
traceend func(error)
1720
}
1821

1922
type Config struct {
23+
RedisName string
2024
Username string
2125
Password string
2226
Addr string
@@ -29,11 +33,6 @@ type Config struct {
2933
var ErrNil = redis.ErrNil
3034
var ErrPoolExhausted = redis.ErrPoolExhausted
3135

32-
var p *sync.Pool
33-
34-
func init() {
35-
p = &sync.Pool{}
36-
}
3736
func NewRedis(c *Config) *Pool {
3837
return &Pool{
3938
p: &redis.Pool{
@@ -59,18 +58,7 @@ func NewRedis(c *Config) *Pool {
5958
},
6059
}
6160
}
62-
func getconn(conn redis.Conn) *Conn {
63-
c, ok := p.Get().(*Conn)
64-
if !ok {
65-
return &Conn{c: conn}
66-
}
67-
c.c = conn
68-
return c
69-
}
70-
func putconn(c *Conn) {
71-
c.c.Close()
72-
p.Put(c)
73-
}
61+
7462
func (p *Pool) GetRedis() *redis.Pool {
7563
return p.p
7664
}
@@ -79,7 +67,8 @@ func (p *Pool) GetContext(ctx context.Context) (*Conn, error) {
7967
if e != nil {
8068
return nil, e
8169
}
82-
return getconn(c), nil
70+
traceend := trace.TraceStart(ctx, trace.CLIENT, p.c.RedisName, p.c.Addr, "REDIS", "unknown")
71+
return &Conn{c: c, traceend: traceend}, nil
8372
}
8473
func (p *Pool) Ping(ctx context.Context) error {
8574
c, e := p.GetContext(ctx)
@@ -123,8 +112,14 @@ func (c *Conn) ReceiveContext(ctx context.Context) (interface{}, error) {
123112
return c.c.Receive()
124113
}
125114
}
115+
func (c *Conn) Err() error {
116+
return c.c.Err()
117+
}
126118
func (c *Conn) Close() {
127-
putconn(c)
119+
if c.traceend != nil {
120+
c.traceend(c.c.Err())
121+
}
122+
c.c.Close()
128123
}
129124
func Int(reply interface{}, e error) (int, error) {
130125
return redis.Int(reply, e)

rpc/client.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -490,13 +490,12 @@ func (c *RpcClient) Call(ctx context.Context, functimeout time.Duration, path st
490490
Body: in,
491491
Metadata: metadata,
492492
}
493-
traceid, _, _, frommethod, frompath, fromkind := trace.GetTrace(ctx)
493+
traceid, _, _, selfmethod, selfpath := trace.GetTrace(ctx)
494494
if traceid != "" {
495495
msg.Tracedata = map[string]string{
496-
"Traceid": traceid,
497-
"Method": frommethod,
498-
"Path": frompath,
499-
"Kind": string(fromkind),
496+
"Traceid": traceid,
497+
"SourceMethod": selfmethod,
498+
"SourcePath": selfpath,
500499
}
501500
}
502501
d, _ := proto.Marshal(msg)
@@ -554,7 +553,7 @@ func (c *RpcClient) Call(ctx context.Context, functimeout time.Duration, path st
554553
server.lker.Unlock()
555554
continue
556555
}
557-
traceend := trace.TraceStart(ctx, trace.CLIENT, c.appname, server.addr, "RPC", path, trace.RPC)
556+
traceend := trace.TraceStart(ctx, trace.CLIENT, c.appname, server.addr, "RPC", path)
558557
//send message success,store req,add req num
559558
server.reqs[msg.Callid] = r
560559
atomic.AddInt32(&server.Pickinfo.Activecalls, 1)

rpc/context.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (c *Context) WriteString(resp string) {
5555
func (c *Context) GetBody() []byte {
5656
return c.msg.Body
5757
}
58-
func (c *Context) GetSourceServer() string {
58+
func (c *Context) GetSourceApp() string {
5959
return c.peeruniquename
6060
}
6161
func (c *Context) GetPath() string {

rpc/server.go

+14-19
Original file line numberDiff line numberDiff line change
@@ -361,10 +361,10 @@ func (s *RpcServer) userfunc(p *stream.Peer, peeruniquename string, data []byte,
361361
if msg.Tracedata != nil {
362362
traceid = msg.Tracedata["Traceid"]
363363
}
364-
ctx := trace.InitTrace(nil, traceid, s.instance.GetSelfName(), host.Hostip, "rpc", msg.Path, trace.RPC)
364+
ctx := trace.InitTrace(nil, traceid, s.instance.GetSelfName(), host.Hostip, "RPC", msg.Path)
365365
//if traceid is not empty,traceid will not change
366366
//if traceid is empty,init trace will create a new traceid,use the new traceid
367-
traceid, _, _, _, _, _ = trace.GetTrace(ctx)
367+
traceid, _, _, _, _ = trace.GetTrace(ctx)
368368
handler, ok := s.handler[msg.Path]
369369
if !ok {
370370
log.Error(ctx, "[rpc.server.userfunc] client:", peeruniquename, "call path:", msg.Path, "error: unknown path")
@@ -408,28 +408,23 @@ func (s *RpcServer) userfunc(p *stream.Peer, peeruniquename string, data []byte,
408408
}
409409
}
410410
go func() {
411-
var fromapp, fromip, frommethod, frompath string
412-
var fromkind trace.KIND
411+
var sourceapp, sourceip, sourcemethod, sourcepath string
413412
if msg.Tracedata != nil {
414-
frommethod = msg.Tracedata["Method"]
415-
frompath = msg.Tracedata["Path"]
416-
fromkind = trace.KIND(msg.Tracedata["Kind"])
413+
sourcemethod = msg.Tracedata["SourceMethod"]
414+
sourcepath = msg.Tracedata["SourcePath"]
417415
}
418-
fromapp = peeruniquename[:strings.Index(peeruniquename, ":")]
419-
if fromapp == "" {
420-
fromapp = "unkown"
416+
sourceapp = peeruniquename[:strings.Index(peeruniquename, ":")]
417+
if sourceapp == "" {
418+
sourceapp = "unkown"
421419
}
422-
fromip = peeruniquename[strings.Index(peeruniquename, ":")+1 : strings.LastIndex(peeruniquename, ":")]
423-
if frommethod == "" {
424-
frommethod = "unknown"
420+
sourceip = peeruniquename[strings.Index(peeruniquename, ":")+1:]
421+
if sourcemethod == "" {
422+
sourcemethod = "unknown"
425423
}
426-
if frompath == "" {
427-
frompath = "unknown"
424+
if sourcepath == "" {
425+
sourcepath = "unknown"
428426
}
429-
if fromkind == "" {
430-
fromkind = trace.KIND("unknown")
431-
}
432-
traceend := trace.TraceStart(trace.InitTrace(nil, traceid, fromapp, fromip, frommethod, frompath, fromkind), trace.SERVER, s.instance.GetSelfName(), host.Hostip, "RPC", msg.Path, trace.RPC)
427+
traceend := trace.TraceStart(trace.InitTrace(nil, traceid, sourceapp, sourceip, sourcemethod, sourcepath), trace.SERVER, s.instance.GetSelfName(), host.Hostip, "RPC", msg.Path)
433428
//logic
434429
handler(ctx, peeruniquename, msg)
435430
traceend(cerror.ErrorstrToError(msg.Error))

trace/trace.go

+8-26
Original file line numberDiff line numberDiff line change
@@ -58,17 +58,6 @@ const (
5858
SERVER ROLE = "server"
5959
)
6060

61-
type KIND string
62-
63-
const (
64-
RPC KIND = "rpc"
65-
WEB KIND = "web"
66-
MYSQL KIND = "mysql"
67-
MONGO KIND = "mongo"
68-
REDIS KIND = "redis"
69-
KAFKA KIND = "kafka"
70-
)
71-
7261
type ACTION string
7362

7463
const (
@@ -87,36 +76,34 @@ type TraceLog struct {
8776
FromIP string `json:"from_ip"`
8877
FromMethod string `json:"from_method"`
8978
FromPath string `json:"from_path"`
90-
FromKind string `json:"from_kind"`
9179
ToApp string `json:"to_app"`
9280
ToIP string `json:"to_ip"`
9381
ToMethod string `json:"to_method"`
9482
ToPath string `json:"to_path"`
95-
ToKind string `json:"to_kind"`
9683
ErrCode int32 `json:"err_code"`
9784
ErrMsg string `json:"err_msg"`
9885
}
9986

10087
type tracekey struct{}
10188

102-
func InitTrace(ctx context.Context, traceid, app, ip, method, path string, kind KIND) context.Context {
89+
func InitTrace(ctx context.Context, traceid, app, ip, method, path string) context.Context {
10390
if ctx == nil {
10491
ctx = context.Background()
10592
}
10693
tmp := ctx.Value(tracekey{})
10794
if tmp == nil {
108-
if app == "" || ip == "" || method == "" || path == "" || kind == "" {
95+
if app == "" || ip == "" || method == "" || path == "" {
10996
panic("[trace] init error: missing params")
11097
}
11198
if traceid == "" {
11299
traceid = maketraceid()
113100
}
114-
return context.WithValue(ctx, tracekey{}, map[string]string{"Traceid": traceid, "App": app, "Ip": ip, "Method": method, "Path": path, "Kind": string(kind)})
101+
return context.WithValue(ctx, tracekey{}, map[string]string{"Traceid": traceid, "App": app, "Ip": ip, "Method": method, "Path": path})
115102
}
116103
return ctx
117104
}
118105

119-
func GetTrace(ctx context.Context) (traceid, curapp, curip, curmethod, curpath string, curkind KIND) {
106+
func GetTrace(ctx context.Context) (traceid, curapp, curip, curmethod, curpath string) {
120107
if ctx == nil {
121108
return
122109
}
@@ -130,19 +117,18 @@ func GetTrace(ctx context.Context) (traceid, curapp, curip, curmethod, curpath s
130117
curip = tracedata["Ip"]
131118
curmethod = tracedata["Method"]
132119
curpath = tracedata["Path"]
133-
curkind = KIND(tracedata["Kind"])
134120
return
135121
}
136122

137123
func CopyTrace(src context.Context) context.Context {
138124
if src == nil {
139125
return context.Background()
140126
}
141-
traceid, fromapp, fromip, frommethod, frompath, fromkind := GetTrace(src)
127+
traceid, fromapp, fromip, frommethod, frompath := GetTrace(src)
142128
if traceid == "" {
143129
return context.Background()
144130
}
145-
return InitTrace(context.Background(), traceid, fromapp, fromip, frommethod, frompath, fromkind)
131+
return InitTrace(context.Background(), traceid, fromapp, fromip, frommethod, frompath)
146132
}
147133

148134
func maketraceid() string {
@@ -151,8 +137,8 @@ func maketraceid() string {
151137
return nowstr + "_" + ranstr
152138
}
153139

154-
func TraceStart(ctx context.Context, role ROLE, toapp, toip, tomethod, topath string, tokind KIND) (TraceEnd func(e error)) {
155-
traceid, fromapp, fromip, frommethod, frompath, fromkind := GetTrace(ctx)
140+
func TraceStart(ctx context.Context, role ROLE, toapp, toip, tomethod, topath string) (TraceEnd func(e error)) {
141+
traceid, fromapp, fromip, frommethod, frompath := GetTrace(ctx)
156142
if traceid == "" {
157143
return nil
158144
}
@@ -168,12 +154,10 @@ func TraceStart(ctx context.Context, role ROLE, toapp, toip, tomethod, topath st
168154
FromIP: fromip,
169155
FromMethod: frommethod,
170156
FromPath: frompath,
171-
FromKind: string(fromkind),
172157
ToApp: toapp,
173158
ToIP: toip,
174159
ToMethod: tomethod,
175160
ToPath: topath,
176-
ToKind: string(tokind),
177161
})
178162
write(startlog)
179163
return func(e error) {
@@ -188,12 +172,10 @@ func TraceStart(ctx context.Context, role ROLE, toapp, toip, tomethod, topath st
188172
FromIP: fromip,
189173
FromMethod: frommethod,
190174
FromPath: frompath,
191-
FromKind: string(fromkind),
192175
ToApp: toapp,
193176
ToIP: toip,
194177
ToMethod: tomethod,
195178
ToPath: topath,
196-
ToKind: string(tokind),
197179
}
198180
if ee := cerror.StdErrorToError(e); ee != nil {
199181
tmp.ErrCode = ee.Code

0 commit comments

Comments
 (0)