Skip to content

Commit

Permalink
feat(kuma-dp): add a separate component to handle kuma-sidecar readin…
Browse files Browse the repository at this point in the history
…ess probes (backport of #11107) (#11239)
  • Loading branch information
kumahq[bot] authored Aug 30, 2024
1 parent 95922ae commit 4f178cd
Show file tree
Hide file tree
Showing 28 changed files with 513 additions and 15 deletions.
14 changes: 14 additions & 0 deletions app/kuma-dp/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/dnsserver"
"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/envoy"
"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/metrics"
"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/readiness"
kuma_cmd "github.com/kumahq/kuma/pkg/cmd"
"github.com/kumahq/kuma/pkg/config"
kumadp "github.com/kumahq/kuma/pkg/config/app/kuma-dp"
Expand Down Expand Up @@ -207,6 +208,7 @@ func newRunCmd(opts kuma_cmd.RunCmdOpts, rootCtx *RootContext) *cobra.Command {
bootstrap, kumaSidecarConfiguration, err := rootCtx.BootstrapGenerator(gracefulCtx, opts.Config.ControlPlane.URL, opts.Config, envoy.BootstrapParams{
Dataplane: opts.Dataplane,
DNSPort: cfg.DNS.EnvoyDNSPort,
ReadinessPort: cfg.Dataplane.ReadinessPort,
EmptyDNSPort: cfg.DNS.CoreDNSEmptyPort,
EnvoyVersion: *envoyVersion,
AccessLogSocketPath: accessLogSocketPath,
Expand Down Expand Up @@ -238,6 +240,15 @@ func newRunCmd(opts kuma_cmd.RunCmdOpts, rootCtx *RootContext) *cobra.Command {
kumaSidecarConfiguration.Networking.IsUsingTransparentProxy,
)
components = append(components, metricsServer)

var readinessReporter *readiness.Reporter
if cfg.Dataplane.ReadinessPort > 0 {
readinessReporter = readiness.NewReporter(
bootstrap.GetAdmin().GetAddress().GetSocketAddress().GetAddress(),
cfg.Dataplane.ReadinessPort)
components = append(components, readinessReporter)
}

if err := rootCtx.ComponentManager.Add(components...); err != nil {
return err
}
Expand All @@ -250,6 +261,9 @@ func newRunCmd(opts kuma_cmd.RunCmdOpts, rootCtx *RootContext) *cobra.Command {
if err := envoyComponent.DrainConnections(); err != nil {
runLog.Error(err, "could not drain connections")
} else {
if readinessReporter != nil {
readinessReporter.Terminating()
}
runLog.Info("waiting for connections to be drained", "waitTime", cfg.Dataplane.DrainTime)
select {
case <-time.After(cfg.Dataplane.DrainTime.Duration):
Expand Down
1 change: 1 addition & 0 deletions app/kuma-dp/pkg/dataplane/envoy/envoy.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var runLog = core.Log.WithName("kuma-dp").WithName("run").WithName("envoy")
type BootstrapParams struct {
Dataplane rest.Resource
DNSPort uint32
ReadinessPort uint32
EmptyDNSPort uint32
EnvoyVersion EnvoyVersion
DynamicMetadata map[string]string
Expand Down
1 change: 1 addition & 0 deletions app/kuma-dp/pkg/dataplane/envoy/remote_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func (b *remoteBootstrap) requestForBootstrap(ctx context.Context, client *http.
},
DynamicMetadata: params.DynamicMetadata,
DNSPort: params.DNSPort,
ReadinessPort: params.ReadinessPort,
EmptyDNSPort: params.EmptyDNSPort,
OperatingSystem: b.operatingSystem,
Features: b.features,
Expand Down
111 changes: 111 additions & 0 deletions app/kuma-dp/pkg/dataplane/readiness/component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package readiness

import (
"context"
"fmt"
"net"
"net/http"
"sync/atomic"
"time"

"github.com/asaskevich/govalidator"
"github.com/bakito/go-log-logr-adapter/adapter"

"github.com/kumahq/kuma/pkg/core"
"github.com/kumahq/kuma/pkg/core/runtime/component"
)

const (
pathPrefixReady = "/ready"
stateReady = "READY"
stateTerminating = "TERMINATING"
)

// Reporter reports the health status of this Kuma Dataplane Proxy
type Reporter struct {
localListenAddr string
localListenPort uint32
isTerminating atomic.Bool
}

var logger = core.Log.WithName("readiness")

func NewReporter(localIPAddr string, localListenPort uint32) *Reporter {
return &Reporter{
localListenPort: localListenPort,
localListenAddr: localIPAddr,
}
}

func (r *Reporter) Start(stop <-chan struct{}) error {
protocol := "tcp"
addr := r.localListenAddr
if govalidator.IsIPv6(addr) {
protocol = "tcp6"
addr = fmt.Sprintf("[%s]", addr)
}
lis, err := net.Listen(protocol, fmt.Sprintf("%s:%d", addr, r.localListenPort))
if err != nil {
return err
}

defer func() {
_ = lis.Close()
}()

logger.Info("starting readiness reporter", "addr", lis.Addr().String())

mux := http.NewServeMux()
mux.HandleFunc(pathPrefixReady, r.handleReadiness)
server := &http.Server{
ReadHeaderTimeout: time.Second,
Handler: mux,
ErrorLog: adapter.ToStd(logger),
}

errCh := make(chan error)
go func() {
if err := server.Serve(lis); err != nil {
errCh <- err
}
}()

select {
case err := <-errCh:
return err
case <-stop:
logger.Info("stopping readiness reporter")
return server.Shutdown(context.Background())
}
}

func (r *Reporter) Terminating() {
r.isTerminating.Store(true)
}

func (r *Reporter) handleReadiness(writer http.ResponseWriter, req *http.Request) {
state := stateReady
stateHTTPStatus := http.StatusOK
if r.isTerminating.Load() {
state = stateTerminating
stateHTTPStatus = http.StatusServiceUnavailable
}

stateBytes := []byte(state)
writer.Header().Set("content-type", "text/plain")
writer.Header().Set("content-length", fmt.Sprintf("%d", len(stateBytes)))
writer.Header().Set("cache-control", "no-cache, max-age=0")
writer.Header().Set("x-powered-by", "kuma-dp")
writer.WriteHeader(stateHTTPStatus)
_, err := writer.Write(stateBytes)
logger.V(1).Info("responding readiness state", "state", state, "client", req.RemoteAddr)
if err != nil {
logger.Info("[WARNING] could not write response", "err", err)
}
}

func (r *Reporter) NeedLeaderElection() bool {
return false
}

var _ component.Component = &Reporter{}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/Masterminds/sprig/v3 v3.2.3
github.com/Nordix/simple-ipam v1.0.0
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d
github.com/bakito/go-log-logr-adapter v0.0.3-0.20231211113354-bfa42fa7e121
github.com/cilium/ebpf v0.12.3
github.com/containernetworking/cni v1.1.2
github.com/containernetworking/plugins v1.3.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d h1:Byv0BzEl
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw=
github.com/aws/aws-sdk-go v1.44.187 h1:D5CsRomPnlwDHJCanL2mtaLIcbhjiWxNh5j8zvaWdJA=
github.com/aws/aws-sdk-go v1.44.187/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/bakito/go-log-logr-adapter v0.0.3-0.20231211113354-bfa42fa7e121 h1:0mA5T6mV0/9+t7TfYAWINV3V6PNrXl0G0bnUgSIpq4E=
github.com/bakito/go-log-logr-adapter v0.0.3-0.20231211113354-bfa42fa7e121/go.mod h1:0I6DOlUOuXiVency29rZX1chhXtuZBH1p9TVzKkMw60=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down
15 changes: 11 additions & 4 deletions pkg/config/app/kuma-dp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ var DefaultConfig = func() Config {
},
},
Dataplane: Dataplane{
Mesh: "",
Name: "", // Dataplane name must be set explicitly
DrainTime: config_types.Duration{Duration: 30 * time.Second},
ProxyType: "dataplane",
Mesh: "",
Name: "", // Dataplane name must be set explicitly
DrainTime: config_types.Duration{Duration: 30 * time.Second},
ProxyType: "dataplane",
ReadinessPort: 9902,
},
DataplaneRuntime: DataplaneRuntime{
BinaryPath: "envoy",
Expand Down Expand Up @@ -129,6 +130,8 @@ type Dataplane struct {
ProxyType string `json:"proxyType,omitempty" envconfig:"kuma_dataplane_proxy_type"`
// Drain time for listeners.
DrainTime config_types.Duration `json:"drainTime,omitempty" envconfig:"kuma_dataplane_drain_time"`
// Port that exposes kuma-dp readiness status on localhost, set this value to 0 to provide readiness by "/ready" endpoint from Envoy adminAPI
ReadinessPort uint32 `json:"readinessPort,omitempty" envconfig:"kuma_readiness_port"`
}

func (d *Dataplane) PostProcess() error {
Expand Down Expand Up @@ -293,6 +296,10 @@ func (d *Dataplane) Validate() error {
errs = multierr.Append(errs, errors.Errorf(".DrainTime must be positive"))
}

if d.ReadinessPort > 65353 {
return errors.New(".ReadinessPort has to be in [0, 65353] range")
}

return errs
}

Expand Down
1 change: 1 addition & 0 deletions pkg/config/app/kuma-dp/testdata/default-config.golden.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ controlPlane:
dataplane:
drainTime: 30s
proxyType: dataplane
readinessPort: 9902
dataplaneRuntime:
binaryPath: envoy
metrics: {}
Expand Down
10 changes: 10 additions & 0 deletions pkg/core/xds/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
// Supported Envoy node metadata fields.
FieldDataplaneAdminPort = "dataplane.admin.port"
FieldDataplaneAdminAddress = "dataplane.admin.address"
FieldDataplaneReadinessPort = "dataplane.readinessReporter.port"
FieldDataplaneDNSPort = "dataplane.dns.port"
FieldDataplaneDNSEmptyPort = "dataplane.dns.empty.port"
FieldDataplaneDataplaneResource = "dataplane.resource"
Expand Down Expand Up @@ -53,6 +54,7 @@ type DataplaneMetadata struct {
Resource model.Resource
AdminPort uint32
AdminAddress string
ReadinessPort uint32
DNSPort uint32
EmptyDNSPort uint32
DynamicMetadata map[string]string
Expand Down Expand Up @@ -116,6 +118,13 @@ func (m *DataplaneMetadata) GetAdminPort() uint32 {
return m.AdminPort
}

func (m *DataplaneMetadata) GetReadinessPort() uint32 {
if m == nil {
return 0
}
return m.ReadinessPort
}

func (m *DataplaneMetadata) GetAdminAddress() string {
if m == nil {
return ""
Expand Down Expand Up @@ -164,6 +173,7 @@ func DataplaneMetadataFromXdsMetadata(xdsMetadata *structpb.Struct, tmpDir strin
}
metadata.AdminPort = uint32Metadata(xdsMetadata, FieldDataplaneAdminPort)
metadata.AdminAddress = xdsMetadata.Fields[FieldDataplaneAdminAddress].GetStringValue()
metadata.ReadinessPort = uint32Metadata(xdsMetadata, FieldDataplaneReadinessPort)
metadata.DNSPort = uint32Metadata(xdsMetadata, FieldDataplaneDNSPort)
metadata.EmptyDNSPort = uint32Metadata(xdsMetadata, FieldDataplaneDNSEmptyPort)
if value := xdsMetadata.Fields[FieldDataplaneDataplaneResource]; value != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/core/xds/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ var _ = Describe("DataplaneMetadataFromXdsMetadata", func() {
StringValue: "8001",
},
},
"dataplane.readinessReporter.port": {
Kind: &structpb.Value_StringValue{
StringValue: "9300",
},
},
"accessLogSocketPath": {
Kind: &structpb.Value_StringValue{
StringValue: "/tmp/logs",
Expand All @@ -77,6 +82,7 @@ var _ = Describe("DataplaneMetadataFromXdsMetadata", func() {
EmptyDNSPort: 8001,
AccessLogSocketPath: "/tmp/logs",
MetricsSocketPath: "/tmp/metrics",
ReadinessPort: 9300,
},
}),
Entry("should ignore dependencies version provided through metadata if version is not set at all", testCase{
Expand Down
1 change: 1 addition & 0 deletions pkg/xds/bootstrap/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func (b *bootstrapGenerator) Generate(ctx context.Context, request types.Bootstr
},
DynamicMetadata: request.DynamicMetadata,
DNSPort: request.DNSPort,
ReadinessPort: request.ReadinessPort,
EmptyDNSPort: request.EmptyDNSPort,
ProxyType: request.ProxyType,
Features: request.Features,
Expand Down
1 change: 1 addition & 0 deletions pkg/xds/bootstrap/parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type configParameters struct {
Service string
AdminAddress string
AdminPort uint32
ReadinessPort uint32
AdminAccessLogPath string
XdsHost string
XdsPort uint32
Expand Down
3 changes: 3 additions & 0 deletions pkg/xds/bootstrap/template_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,9 @@ func genConfig(parameters configParameters, proxyConfig xds.Proxy, enableReloada
if parameters.EmptyDNSPort != 0 {
res.Node.Metadata.Fields[core_xds.FieldDataplaneDNSEmptyPort] = util_proto.MustNewValueForStruct(strconv.Itoa(int(parameters.EmptyDNSPort)))
}
if parameters.ReadinessPort != 0 {
res.Node.Metadata.Fields[core_xds.FieldDataplaneReadinessPort] = util_proto.MustNewValueForStruct(strconv.Itoa(int(parameters.ReadinessPort)))
}
if parameters.ProxyType != "" {
res.Node.Metadata.Fields[core_xds.FieldDataplaneProxyType] = util_proto.MustNewValueForStruct(parameters.ProxyType)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/xds/bootstrap/types/bootstrap_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type BootstrapRequest struct {
CaCert string `json:"caCert"`
DynamicMetadata map[string]string `json:"dynamicMetadata"`
DNSPort uint32 `json:"dnsPort,omitempty"`
ReadinessPort uint32 `json:"readinessPort,omitempty"`
EmptyDNSPort uint32 `json:"emptyDnsPort,omitempty"`
OperatingSystem string `json:"operatingSystem"`
Features []string `json:"features"`
Expand Down
4 changes: 4 additions & 0 deletions pkg/xds/envoy/names/resource_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func GetMetricsHijackerClusterName() string {
return Join("kuma", "metrics", "hijacker")
}

func GetDPPReadinessClusterName() string {
return Join("kuma", "readiness")
}

func GetPrometheusListenerName() string {
return Join("kuma", "metrics", "prometheus")
}
Expand Down
Loading

0 comments on commit 4f178cd

Please sign in to comment.