Skip to content

Commit

Permalink
cmd/contour: replace current xds server with go-control-plane impl
Browse files Browse the repository at this point in the history
Fixes projectcontour#2134 by replacing the current XDS server implementation with the envoyproxy/go-control-plane impl.

Signed-off-by: Steve Sloka <[email protected]>
  • Loading branch information
stevesloka committed Jul 10, 2020
1 parent d835474 commit 932aa91
Show file tree
Hide file tree
Showing 14 changed files with 115 additions and 822 deletions.
53 changes: 37 additions & 16 deletions cmd/contour/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package main

import (
"context"
"fmt"
"net"
"net/http"
Expand All @@ -23,6 +24,10 @@ import (
"syscall"
"time"

api "github.com/envoyproxy/go-control-plane/envoy/api/v2"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
"github.com/envoyproxy/go-control-plane/pkg/cache/v2"
xds "github.com/envoyproxy/go-control-plane/pkg/server/v2"
"github.com/projectcontour/contour/internal/annotation"
"github.com/projectcontour/contour/internal/contour"
"github.com/projectcontour/contour/internal/dag"
Expand Down Expand Up @@ -172,6 +177,9 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
return err
}

// Setup snapshot cache
snapshotCache := cache.NewSnapshotCache(true, cache.IDHash{}, nil)

listenerConfig := contour.ListenerVisitorConfig{
UseProxyProto: ctx.useProxyProto,
HTTPAddress: ctx.httpAddr,
Expand All @@ -196,14 +204,16 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {

listenerConfig.DefaultHTTPVersions = defaultHTTPVersions

ch := &contour.CacheHandler{
ListenerVisitorConfig: listenerConfig,
ListenerCache: contour.NewListenerCache(ctx.statsAddr, ctx.statsPort),
FieldLogger: log.WithField("context", "CacheHandler"),
Metrics: metrics.NewMetrics(registry),
}

// step 3. build our mammoth Kubernetes event handler.
eventHandler := &contour.EventHandler{
CacheHandler: &contour.CacheHandler{
ListenerVisitorConfig: listenerConfig,
ListenerCache: contour.NewListenerCache(ctx.statsAddr, ctx.statsPort),
FieldLogger: log.WithField("context", "CacheHandler"),
Metrics: metrics.NewMetrics(registry),
},
CacheHandler: ch,
HoldoffDelay: 100 * time.Millisecond,
HoldoffMaxDelay: 500 * time.Millisecond,
Builder: dag.Builder{
Expand Down Expand Up @@ -262,6 +272,15 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
FieldLogger: log.WithField("context", "endpointstranslator"),
}

snapshotHandler := &contour.SnapshotHandler{
CacheHandler: ch,
EndpointsTranslator: et,
SnapshotCache: snapshotCache,
}

ch.SnapshotHandler = snapshotHandler
et.SnapshotHandler = snapshotHandler

informerSyncList.InformOnResources(clusterInformerFactory,
&k8s.DynamicClientHandler{
Next: &contour.EventRecorder{
Expand Down Expand Up @@ -387,15 +406,17 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
}
log.Printf("informer caches synced")

resources := map[string]cgrpc.Resource{
eventHandler.CacheHandler.ClusterCache.TypeURL(): &eventHandler.CacheHandler.ClusterCache,
eventHandler.CacheHandler.RouteCache.TypeURL(): &eventHandler.CacheHandler.RouteCache,
eventHandler.CacheHandler.ListenerCache.TypeURL(): &eventHandler.CacheHandler.ListenerCache,
eventHandler.CacheHandler.SecretCache.TypeURL(): &eventHandler.CacheHandler.SecretCache,
et.TypeURL(): et,
}
opts := ctx.grpcOptions()
s := cgrpc.NewAPI(log, resources, registry, opts...)
grpcServer := cgrpc.NewAPI(registry, opts...)

xdsServer := xds.NewServer(context.Background(), snapshotCache, nil)

discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, xdsServer)
api.RegisterEndpointDiscoveryServiceServer(grpcServer, xdsServer)
api.RegisterClusterDiscoveryServiceServer(grpcServer, xdsServer)
api.RegisterRouteDiscoveryServiceServer(grpcServer, xdsServer)
api.RegisterListenerDiscoveryServiceServer(grpcServer, xdsServer)

addr := net.JoinHostPort(ctx.xdsAddr, strconv.Itoa(ctx.xdsPort))
l, err := net.Listen("tcp", addr)
if err != nil {
Expand All @@ -412,10 +433,10 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {

go func() {
<-stop
s.Stop()
grpcServer.Stop()
}()

return s.Serve(l)
return grpcServer.Serve(l)
})

// step 14. Setup SIGTERM handler
Expand Down
4 changes: 4 additions & 0 deletions internal/contour/cachehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type CacheHandler struct {
ClusterCache
SecretCache

SnapshotHandler *SnapshotHandler

*metrics.Metrics

logrus.FieldLogger
Expand All @@ -47,6 +49,8 @@ func (ch *CacheHandler) OnChange(dag *dag.DAG) {
ch.updateRoutes(dag)
ch.updateClusters(dag)

ch.SnapshotHandler.UpdateSnapshot()

ch.SetDAGLastRebuilt(time.Now())
}

Expand Down
28 changes: 2 additions & 26 deletions internal/contour/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ import (
"sort"
"sync"

resource "github.com/envoyproxy/go-control-plane/pkg/resource/v2"

v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/protobuf/proto"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/projectcontour/contour/internal/dag"
"github.com/projectcontour/contour/internal/envoy"
"github.com/projectcontour/contour/internal/protobuf"
Expand All @@ -31,7 +29,6 @@ import (
type ClusterCache struct {
mu sync.Mutex
values map[string]*v2.Cluster
Cond
}

// Update replaces the contents of the cache with the supplied map.
Expand All @@ -40,11 +37,10 @@ func (c *ClusterCache) Update(v map[string]*v2.Cluster) {
defer c.mu.Unlock()

c.values = v
c.Cond.Notify()
}

// Contents returns a copy of the cache's contents.
func (c *ClusterCache) Contents() []proto.Message {
func (c *ClusterCache) Contents() []types.Resource {
c.mu.Lock()
defer c.mu.Unlock()
var values []*v2.Cluster
Expand All @@ -55,26 +51,6 @@ func (c *ClusterCache) Contents() []proto.Message {
return protobuf.AsMessages(values)
}

func (c *ClusterCache) Query(names []string) []proto.Message {
c.mu.Lock()
defer c.mu.Unlock()
var values []*v2.Cluster
for _, n := range names {
// if the cluster is not registered we cannot return
// a blank cluster because each cluster has a required
// discovery type; DNS, EDS, etc. We cannot determine the
// correct value for this property from the cluster's name
// provided by the query so we must not return a blank cluster.
if v, ok := c.values[n]; ok {
values = append(values, v)
}
}
sort.Stable(sorter.For(values))
return protobuf.AsMessages(values)
}

func (*ClusterCache) TypeURL() string { return resource.ClusterType }

type clusterVisitor struct {
clusters map[string]*v2.Cluster
}
Expand Down
94 changes: 0 additions & 94 deletions internal/contour/cond.go

This file was deleted.

108 changes: 0 additions & 108 deletions internal/contour/cond_test.go

This file was deleted.

Loading

0 comments on commit 932aa91

Please sign in to comment.