Skip to content

Commit 9728156

Browse files
committed
apiserver: forward request with prefix path or header
Signed-off-by: Iceber Gu <[email protected]>
1 parent c5e31ab commit 9728156

File tree

3 files changed

+62
-29
lines changed

3 files changed

+62
-29
lines changed

pkg/kubeapiserver/apiserver.go

+35-18
Original file line numberDiff line numberDiff line change
@@ -130,30 +130,15 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
130130

131131
delegate := delegationTarget.UnprotectedHandler()
132132
if delegate == nil {
133+
// To prevent anomalous requests from being sent to member clusters due to improper usage,
134+
// we do not allow proxying requests with unmatched prefixes to member clusters.
133135
delegate = http.NotFoundHandler()
134136
}
135137

136138
restManager := NewRESTManager(c.GenericConfig.Serializer, runtime.ContentTypeJSON, c.StorageFactory, c.InitialAPIGroupResources)
137139
discoveryManager := discovery.NewDiscoveryManager(c.GenericConfig.Serializer, restManager, delegate)
138140

139-
// handle root discovery request
140-
genericserver.Handler.NonGoRestfulMux.Handle("/api", discoveryManager)
141-
genericserver.Handler.NonGoRestfulMux.Handle("/apis", discoveryManager)
142-
143-
resourceHandler := &ResourceHandler{
144-
minRequestTimeout: time.Duration(c.GenericConfig.MinRequestTimeout) * time.Second,
145-
146-
delegate: delegate,
147-
rest: restManager,
148-
discovery: discoveryManager,
149-
clusterLister: c.InformerFactory.Cluster().V1alpha2().PediaClusters().Lister(),
150-
}
151-
genericserver.Handler.NonGoRestfulMux.HandlePrefix("/api/", resourceHandler)
152-
genericserver.Handler.NonGoRestfulMux.HandlePrefix("/apis/", resourceHandler)
153-
154141
clusterInformer := c.InformerFactory.Cluster().V1alpha2().PediaClusters()
155-
_ = NewClusterResourceController(restManager, discoveryManager, clusterInformer)
156-
157142
connector := proxyrest.NewProxyConnector(clusterInformer.Lister(), c.ExtraConfig.AllowPediaClusterConfigReuse, c.ExtraConfig.ExtraProxyRequestHeaderPrefixes)
158143

159144
methodSet := sets.New("GET")
@@ -181,8 +166,30 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
181166
methods = append(methods, m)
182167
}
183168
}
169+
proxy := proxyrest.NewRemoteProxyREST(c.GenericConfig.Serializer, connector)
170+
171+
// forward request
172+
genericserver.Handler.NonGoRestfulMux.HandlePrefix("/proxy/", http.StripPrefix("/proxy", proxy))
173+
174+
// handle root discovery request
175+
discoveryHandler := WrapForwardRequestHandler(discoveryManager, proxy)
176+
genericserver.Handler.NonGoRestfulMux.Handle("/api", discoveryHandler)
177+
genericserver.Handler.NonGoRestfulMux.Handle("/apis", discoveryHandler)
184178

185-
resourceHandler.proxy = proxyrest.NewRemoteProxyREST(c.GenericConfig.Serializer, connector)
179+
resourceHandler := &ResourceHandler{
180+
minRequestTimeout: time.Duration(c.GenericConfig.MinRequestTimeout) * time.Second,
181+
182+
delegate: delegate,
183+
proxy: proxy,
184+
rest: restManager,
185+
discovery: discoveryManager,
186+
clusterLister: c.InformerFactory.Cluster().V1alpha2().PediaClusters().Lister(),
187+
}
188+
189+
genericserver.Handler.NonGoRestfulMux.HandlePrefix("/api/", resourceHandler)
190+
genericserver.Handler.NonGoRestfulMux.HandlePrefix("/apis/", resourceHandler)
191+
192+
_ = NewClusterResourceController(restManager, discoveryManager, clusterInformer)
186193
return genericserver, methods, nil
187194
}
188195

@@ -228,3 +235,13 @@ func (r wrapRequestInfoResolverForNamespace) NewRequestInfo(req *http.Request) (
228235
}
229236
return info, nil
230237
}
238+
239+
func WrapForwardRequestHandler(handler http.Handler, proxy http.Handler) http.Handler {
240+
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
241+
if HasForwardRequestHeader(req) {
242+
proxy.ServeHTTP(rw, req)
243+
return
244+
}
245+
handler.ServeHTTP(rw, req)
246+
})
247+
}

pkg/kubeapiserver/resource_handler.go

+17-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"net/http"
7+
"strings"
78
"time"
89

910
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -44,8 +45,13 @@ func (r *ResourceHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
4445
return
4546
}
4647

48+
shouldForwardRequest := HasForwardRequestHeader(req)
4749
// handle discovery request
4850
if !requestInfo.IsResourceRequest {
51+
if shouldForwardRequest {
52+
r.proxy.ServeHTTP(w, req)
53+
return
54+
}
4955
r.discovery.ServeHTTP(w, req)
5056
return
5157
}
@@ -80,11 +86,16 @@ func (r *ResourceHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
8086
return
8187
}
8288

89+
if shouldForwardRequest {
90+
r.proxy.ServeHTTP(w, req)
91+
return
92+
}
93+
8394
resource, reqScope, storage, existed := r.rest.GetResourceREST(gvr, requestInfo.Subresource)
8495
if !existed {
8596
// TODO(iceber): Add the specialized error for subresources
8697
err := fmt.Errorf("not found request scope or resource storage")
87-
klog.ErrorS(err, "Failed to handle resource request", "resource", gvr)
98+
klog.ErrorS(err, "Failed to handle resource request", "resource", gvr, "subresource", requestInfo.Subresource)
8899
responsewriters.ErrorNegotiated(
89100
apierrors.NewInternalError(err),
90101
Codecs, gvr.GroupVersion(), w, req,
@@ -168,3 +179,8 @@ func checkClusterAndWarning(ctx context.Context, cluster *clusterv1alpha2.PediaC
168179
warning.AddWarning(ctx, "", msg)
169180
}
170181
}
182+
183+
func HasForwardRequestHeader(req *http.Request) bool {
184+
value := req.Header.Get("x-clusterpedia-forward")
185+
return strings.ToLower(value) == "true"
186+
}

pkg/kubeapiserver/resourcerest/proxy/proxy.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,17 @@ package proxy
22

33
import (
44
"context"
5-
"errors"
65
"net/http"
76
"net/url"
87

8+
apierrors "k8s.io/apimachinery/pkg/api/errors"
99
"k8s.io/apimachinery/pkg/runtime"
1010
"k8s.io/apimachinery/pkg/runtime/schema"
1111
utilnet "k8s.io/apimachinery/pkg/util/net"
1212
"k8s.io/apimachinery/pkg/util/proxy"
1313
auditinternal "k8s.io/apiserver/pkg/apis/audit"
1414
"k8s.io/apiserver/pkg/audit"
1515
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
16-
genericrequest "k8s.io/apiserver/pkg/endpoints/request"
1716

1817
"github.com/clusterpedia-io/clusterpedia/pkg/utils/request"
1918
)
@@ -46,12 +45,7 @@ func (r *RemoteProxyREST) Error(w http.ResponseWriter, req *http.Request, err er
4645
func proxyConn(ctx context.Context, connGetter ClusterConnectionGetter, upgradeRequired bool, responder proxy.ErrorResponder, wrapProxy func(*proxy.UpgradeAwareHandler) http.Handler) (http.Handler, error) {
4746
clusterName := request.ClusterNameValue(ctx)
4847
if clusterName == "" {
49-
return nil, errors.New("missing cluster")
50-
}
51-
52-
requestInfo, ok := genericrequest.RequestInfoFrom(ctx)
53-
if !ok {
54-
return nil, errors.New("missing RequestInfo")
48+
return nil, apierrors.NewBadRequest("please specify the cluster name when using proxy model.")
5549
}
5650

5751
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
@@ -62,12 +56,18 @@ func proxyConn(ctx context.Context, connGetter ClusterConnectionGetter, upgradeR
6256
return
6357
}
6458

65-
target, err := url.ParseRequestURI(endpoint + requestInfo.Path)
59+
target, err := url.ParseRequestURI(endpoint + req.URL.Path)
6660
if err != nil {
6761
responder.Error(rw, req, err)
6862
return
6963
}
70-
target.RawQuery = request.RequestQueryFrom(ctx).Encode()
64+
65+
// First get URL Query from context.Context
66+
if query := request.RequestQueryFrom(ctx); query != nil {
67+
target.RawQuery = query.Encode()
68+
} else {
69+
target.RawQuery = req.URL.RawQuery
70+
}
7171

7272
proxy := proxy.NewUpgradeAwareHandler(target, transport, false, upgradeRequired, responder)
7373
proxy.UseLocationHost = true

0 commit comments

Comments
 (0)