Skip to content

Commit 1d2a8ff

Browse files
committed
feat: adaptation for gRPC
1 parent d7b32c4 commit 1d2a8ff

File tree

1 file changed

+75
-0
lines changed

1 file changed

+75
-0
lines changed

pkg/adapters/grpc/traffic.go

+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package grpc
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"github.com/alibaba/sentinel-golang/core/route"
8+
"github.com/alibaba/sentinel-golang/core/route/base"
9+
"google.golang.org/grpc"
10+
"net"
11+
"strings"
12+
)
13+
14+
var (
15+
connToBaggage map[string]map[string]string = make(map[string]map[string]string)
16+
cm *route.ClusterManager = nil
17+
)
18+
19+
type Baggage map[string]string
20+
21+
func NewDialer(b Baggage) func(context.Context, string) (net.Conn, error) {
22+
return func(ctx context.Context, addr string) (net.Conn, error) {
23+
parts := strings.Split(addr, "/")
24+
if len(parts) != 2 {
25+
return nil, errors.New("invalid address format")
26+
}
27+
tc := &base.TrafficContext{
28+
ServiceName: parts[0],
29+
MethodName: parts[1],
30+
Headers: make(map[string]string),
31+
}
32+
33+
instance, err := cm.GetOne(tc)
34+
35+
if err != nil {
36+
return nil, err
37+
}
38+
if instance == nil {
39+
return nil, errors.New("no matched provider")
40+
}
41+
conn, err := net.Dial("tcp", fmt.Sprintf("%s:%v", instance.Host, instance.Port))
42+
if err != nil {
43+
return nil, err
44+
}
45+
b = tc.Baggage
46+
47+
return conn, nil
48+
}
49+
}
50+
51+
func NewTrafficUnaryIntercepter(baggage Baggage) grpc.DialOption {
52+
return grpc.WithUnaryInterceptor(
53+
func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
54+
newCtx := ctx
55+
// TODO modify the request with baggage
56+
return invoker(newCtx, method, req, reply, cc, opts...)
57+
})
58+
}
59+
60+
func NewTrafficStreamIntercepter(baggage Baggage) grpc.DialOption {
61+
return grpc.WithStreamInterceptor(
62+
func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
63+
newCtx := ctx
64+
// TODO modify the request with baggage
65+
return streamer(newCtx, desc, cc, method, opts...)
66+
})
67+
}
68+
69+
func Dial(addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
70+
var b Baggage = make(map[string]string)
71+
opts = append(opts, grpc.WithContextDialer(NewDialer(b)))
72+
opts = append(opts, NewTrafficUnaryIntercepter(b))
73+
opts = append(opts, NewTrafficStreamIntercepter(b))
74+
return grpc.Dial(addr, opts...)
75+
}

0 commit comments

Comments
 (0)