Skip to content

Commit

Permalink
use a separate snapshot cache for EDS (#6250)
Browse files Browse the repository at this point in the history
- Triggers only EDS updates when endpoints change
- Does not trigger EDS updates when only non-endpoints change

Updates #2134.

Signed-off-by: Steve Kriss <[email protected]>
  • Loading branch information
skriss authored Mar 7, 2024
1 parent 31ad9ba commit a740314
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 49 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/6250-skriss-small.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
In the `envoy` go-control-plane xDS server, use a separate snapshot cache for Endpoints, to minimize the amount of unnecessary xDS traffic generated.
9 changes: 2 additions & 7 deletions cmd/contour/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/alecthomas/kingpin/v2"
envoy_cache_v3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
envoy_server_v3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -510,11 +509,7 @@ func (s *Server) doServe() error {
var snapshotHandler *xdscache_v3.SnapshotHandler

if contourConfiguration.XDSServer.Type == contour_v1alpha1.EnvoyServerType {
snapshotHandler = xdscache_v3.NewSnapshotHandler(
resources,
envoy_cache_v3.NewSnapshotCache(false, &contour_xds_v3.Hash, s.log.WithField("context", "snapshotCache")),
s.log.WithField("context", "snapshotHandler"),
)
snapshotHandler = xdscache_v3.NewSnapshotHandler(resources, s.log.WithField("context", "snapshotHandler"))

// register observer for endpoints updates.
endpointHandler.SetObserver(contour.ComposeObservers(snapshotHandler))
Expand Down Expand Up @@ -929,7 +924,7 @@ func (x *xdsServer) Start(ctx context.Context) error {

switch x.config.Type {
case contour_v1alpha1.EnvoyServerType:
contour_xds_v3.RegisterServer(envoy_server_v3.NewServer(ctx, x.snapshotHandler.SnapshotCache, contour_xds_v3.NewRequestLoggingCallbacks(log)), grpcServer)
contour_xds_v3.RegisterServer(envoy_server_v3.NewServer(ctx, x.snapshotHandler.GetCache(), contour_xds_v3.NewRequestLoggingCallbacks(log)), grpcServer)
case contour_v1alpha1.ContourServerType:
contour_xds_v3.RegisterServer(contour_xds_v3.NewContourServer(log, xdscache.ResourcesOf(x.resources)...), grpcServer)
default:
Expand Down
17 changes: 6 additions & 11 deletions internal/protobuf/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package protobuf

import (
"reflect"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/wrapperspb"
Expand All @@ -42,21 +40,18 @@ func UInt32OrNil(val uint32) *wrapperspb.UInt32Value {
}
}

// AsMessages casts the given slice of values (that implement the proto.Message
// AsMessages converts the given slice of values (that implement the proto.Message
// interface) to a slice of proto.Message. If the length of the slice is 0, it
// returns nil.
func AsMessages(messages any) []proto.Message {
v := reflect.ValueOf(messages)
if v.Len() == 0 {
func AsMessages[T proto.Message](messages []T) []proto.Message {
if len(messages) == 0 {
return nil
}

protos := make([]proto.Message, v.Len())

for i := range protos {
protos[i] = v.Index(i).Interface().(proto.Message)
protos := make([]proto.Message, len(messages))
for i, message := range messages {
protos[i] = message
}

return protos
}

Expand Down
19 changes: 19 additions & 0 deletions internal/protobuf/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package protobuf
import (
"testing"

envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/wrapperspb"
)

Expand All @@ -29,3 +31,20 @@ func TestU32Default(t *testing.T) {
assert.Equal(t, wrapperspb.UInt32(99), UInt32OrDefault(0, 99))
assert.Equal(t, wrapperspb.UInt32(1), UInt32OrDefault(1, 99))
}

func TestAsMessages(t *testing.T) {
assert.Nil(t, AsMessages([]*envoy_config_cluster_v3.Cluster{}))

in := []*envoy_config_cluster_v3.Cluster{
{Name: "cluster-1"},
{Name: "cluster-2"},
{Name: "cluster-3"},
{Name: "cluster-4"},
}
out := AsMessages(in)

require.Len(t, out, len(in))
for i := range in {
assert.EqualValues(t, in[i], out[i])
}
}
3 changes: 3 additions & 0 deletions internal/xdscache/v3/endpointslicetranslator.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,9 @@ func (e *EndpointSliceTranslator) OnChange(root *dag.DAG) {
if changed {
e.Debug("cluster load assignments changed, notifying waiters")
e.Notify()
if e.Observer != nil {
e.Observer.Refresh()
}
} else {
e.Debug("cluster load assignments did not change")
}
Expand Down
3 changes: 3 additions & 0 deletions internal/xdscache/v3/endpointstranslator.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ func (e *EndpointsTranslator) OnChange(root *dag.DAG) {
if changed {
e.Debug("cluster load assignments changed, notifying waiters")
e.Notify()
if e.Observer != nil {
e.Observer.Refresh()
}
} else {
e.Debug("cluster load assignments did not change")
}
Expand Down
107 changes: 76 additions & 31 deletions internal/xdscache/v3/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,60 +15,106 @@ package v3

import (
"context"
"reflect"

envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
envoy_types "github.com/envoyproxy/go-control-plane/pkg/cache/types"
envoy_cache_v3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
envoy_resource_v3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"

"github.com/projectcontour/contour/internal/dag"
contour_xds_v3 "github.com/projectcontour/contour/internal/xds/v3"
"github.com/projectcontour/contour/internal/xdscache"
)

// SnapshotHandler responds to DAG builds via the OnChange()
// event and generates and caches go-control-plane Snapshots.
// event and Endpoint updates via the Refresh() event and
// generates and caches go-control-plane Snapshots.
type SnapshotHandler struct {
// SnapshotCache contains go-control-plane Snapshots
// and is used by the go-control-plane xDS server.
SnapshotCache envoy_cache_v3.SnapshotCache

// resources contains the Contour xDS resource caches.
resources map[envoy_resource_v3.Type]xdscache.ResourceCache
log logrus.FieldLogger
resources map[envoy_resource_v3.Type]xdscache.ResourceCache
defaultCache envoy_cache_v3.SnapshotCache
edsCache envoy_cache_v3.SnapshotCache
mux *envoy_cache_v3.MuxCache
log logrus.FieldLogger
}

// NewSnapshotHandler returns an instance of SnapshotHandler.
func NewSnapshotHandler(resources []xdscache.ResourceCache, snapshotCache envoy_cache_v3.SnapshotCache, logger logrus.FieldLogger) *SnapshotHandler {
return &SnapshotHandler{
resources: parseResources(resources),
SnapshotCache: snapshotCache,
log: logger,
func NewSnapshotHandler(resources []xdscache.ResourceCache, log logrus.FieldLogger) *SnapshotHandler {
var (
defaultCache = envoy_cache_v3.NewSnapshotCache(false, &contour_xds_v3.Hash, log.WithField("context", "defaultCache"))
edsCache = envoy_cache_v3.NewSnapshotCache(false, &contour_xds_v3.Hash, log.WithField("context", "edsCache"))

mux = &envoy_cache_v3.MuxCache{
Caches: map[string]envoy_cache_v3.Cache{
envoy_resource_v3.ListenerType: defaultCache,
envoy_resource_v3.ClusterType: defaultCache,
envoy_resource_v3.RouteType: defaultCache,
envoy_resource_v3.SecretType: defaultCache,
envoy_resource_v3.RuntimeType: defaultCache,
envoy_resource_v3.EndpointType: edsCache,
},
Classify: func(req *envoy_service_discovery_v3.DiscoveryRequest) string {
return req.GetTypeUrl()
},
ClassifyDelta: func(dr *envoy_cache_v3.DeltaRequest) string {
return dr.GetTypeUrl()
},
}
)

sh := &SnapshotHandler{
resources: parseResources(resources),
defaultCache: defaultCache,
edsCache: edsCache,
mux: mux,
log: log,
}

// Trigger an initial snapshot, based on any static values
// present in the resource caches.
sh.OnChange(nil)

return sh
}

// GetCache returns the MuxCache, which multiplexes requests across
// underlying caches.
func (s *SnapshotHandler) GetCache() envoy_cache_v3.Cache {
return s.mux
}

// Refresh is called when the EndpointsTranslator updates values
// in its cache.
// in its cache. It updates the EDS cache.
func (s *SnapshotHandler) Refresh() {
s.generateNewSnapshot()
version := uuid.NewString()

resources := map[envoy_resource_v3.Type][]envoy_types.Resource{
envoy_resource_v3.EndpointType: asResources(s.resources[envoy_resource_v3.EndpointType].Contents()),
}

snapshot, err := envoy_cache_v3.NewSnapshot(version, resources)
if err != nil {
s.log.Errorf("failed to generate snapshot version %q: %s", version, err)
return
}

if err := s.edsCache.SetSnapshot(context.Background(), contour_xds_v3.Hash.String(), snapshot); err != nil {
s.log.Errorf("failed to store snapshot version %q: %s", version, err)
return
}
}

// OnChange is called when the DAG is rebuilt and a new snapshot is needed.
// It creates and caches a new go-control-plane Snapshot based on the
// contents of the Contour xDS resource caches.
func (s *SnapshotHandler) OnChange(*dag.DAG) {
s.generateNewSnapshot()
}

// generateNewSnapshot creates and caches a new go-control-plane
// Snapshot based on the contents of the Contour xDS resource caches.
func (s *SnapshotHandler) generateNewSnapshot() {
// Generate new snapshot version.
version := uuid.NewString()

// Convert caches to envoy xDS Resources.
resources := map[envoy_resource_v3.Type][]envoy_types.Resource{
envoy_resource_v3.EndpointType: asResources(s.resources[envoy_resource_v3.EndpointType].Contents()),
envoy_resource_v3.ClusterType: asResources(s.resources[envoy_resource_v3.ClusterType].Contents()),
envoy_resource_v3.RouteType: asResources(s.resources[envoy_resource_v3.RouteType].Contents()),
envoy_resource_v3.ListenerType: asResources(s.resources[envoy_resource_v3.ListenerType].Contents()),
Expand All @@ -82,25 +128,24 @@ func (s *SnapshotHandler) generateNewSnapshot() {
return
}

if err := s.SnapshotCache.SetSnapshot(context.Background(), contour_xds_v3.Hash.String(), snapshot); err != nil {
if err := s.defaultCache.SetSnapshot(context.Background(), contour_xds_v3.Hash.String(), snapshot); err != nil {
s.log.Errorf("failed to store snapshot version %q: %s", version, err)
return
}
}

// asResources casts the given slice of values (that implement the envoy_types.Resource
// asResources converts the given slice of values (that implement the envoy_types.Resource
// interface) to a slice of envoy_types.Resource. If the length of the slice is 0, it
// returns nil.
func asResources(messages any) []envoy_types.Resource {
v := reflect.ValueOf(messages)
if v.Len() == 0 {
func asResources[T proto.Message](messages []T) []envoy_types.Resource {
if len(messages) == 0 {
return nil
}

protos := make([]envoy_types.Resource, v.Len())
protos := make([]envoy_types.Resource, len(messages))

for i := range protos {
protos[i] = v.Index(i).Interface().(envoy_types.Resource)
for i, resource := range messages {
protos[i] = resource
}

return protos
Expand Down

0 comments on commit a740314

Please sign in to comment.