Skip to content

Commit

Permalink
implement dns-propagation-prober
Browse files Browse the repository at this point in the history
  • Loading branch information
besher-massri committed Apr 18, 2024
1 parent 9250e59 commit f490a78
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 2 deletions.
2 changes: 1 addition & 1 deletion util-images/probes/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
PROJECT = k8s-testimages
IMG = gcr.io/$(PROJECT)/probes
TAG = v0.0.5
TAG = v0.0.6

all: push

Expand Down
5 changes: 5 additions & 0 deletions util-images/probes/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ This probe doesn't export any metrics, it's needed for the **Ping Client** to wo
go run cmd/main.go --mode=ping-server --metric-bind-address=:8070 --ping-server-bind-address=0.0.0.0:8081 --stderrthreshold=INFO
```

### DNS Propagation
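This probe exports the `dns_propagation_seconds` and `dns_propagation_count` metrics. For a random sample of pods in a given StatefulSet, it measures each pod's DNS propagation time, i.e. the difference between the pod's "running" (Ready) and "discoverable" (DNS name resolves) timestamps. The gauge is reported per pod; the average across the sampled pods is written to the probe's log.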

#### Running locally
This probe is designed to run within a Kubernetes cluster and cannot be executed locally.

## Building and Releasing

Expand Down
5 changes: 4 additions & 1 deletion util-images/probes/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ package main

import (
"flag"
"k8s.io/perf-tests/util-images/probes/pkg/kubeclient"
"net/http"
_ "net/http/pprof"

"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/klog"
"k8s.io/perf-tests/util-images/probes/pkg/dns"
"k8s.io/perf-tests/util-images/probes/pkg/dnspropagation"
"k8s.io/perf-tests/util-images/probes/pkg/kubeclient"
pingclient "k8s.io/perf-tests/util-images/probes/pkg/ping/client"
pingserver "k8s.io/perf-tests/util-images/probes/pkg/ping/server"
)
Expand All @@ -51,6 +52,8 @@ func main() {
pingserver.Run(pingserver.NewDefaultConfig())
case "dns":
dns.Run()
case "dns-propagation":
dnspropagation.Run()
case "kubeclient":
kubeclient.Run(kubeclient.NewDefaultConfig())
default:
Expand Down
1 change: 1 addition & 0 deletions util-images/probes/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.13

require (
github.com/prometheus/client_golang v0.9.2
k8s.io/apimachinery v0.18.0
k8s.io/client-go v0.18.0
k8s.io/klog v1.0.0
)
41 changes: 41 additions & 0 deletions util-images/probes/pkg/dnspropagation/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dnspropagation

import (
"github.com/prometheus/client_golang/prometheus"
"k8s.io/perf-tests/util-images/probes/pkg/common"
)

// DNSPropagationSeconds is a gauge vector labelled by namespace, service and
// pod name. The probe sets one value per sampled pod: the time, in seconds,
// between the pod reporting Ready and its DNS name becoming resolvable.
var DNSPropagationSeconds = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: common.ProbeNamespace,
		Name:      "dns_propagation_seconds",
		Help:      "Gauge of the time (in seconds) it took for the pods in a statefulSet to be discoverable after they start running.",
	},
	[]string{"namespace", "service", "podName"},
)

// DNSPropagationCount is a counter vector with the same labels as
// DNSPropagationSeconds; it is incremented once per completed DNS
// propagation check.
var DNSPropagationCount = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: common.ProbeNamespace,
		Name:      "dns_propagation_count",
		Help:      "Counter of the number of DNS propagation checks performed.",
	},
	[]string{"namespace", "service", "podName"},
)

// init registers the DNS propagation collectors with the default Prometheus
// registry. MustRegister panics if a collector cannot be registered.
func init() {
	collectors := []prometheus.Collector{
		DNSPropagationSeconds,
		DNSPropagationCount,
	}
	for _, c := range collectors {
		prometheus.MustRegister(c)
	}
}
221 changes: 221 additions & 0 deletions util-images/probes/pkg/dnspropagation/probe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dnspropagation

import (
"context"
"errors"
"flag"
"fmt"
"math"
"math/rand"
"net"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog"
)

// Command-line flags that configure the DNS propagation probe.
var (
	// statefulSet is the StatefulSet name; pod names are derived as "<statefulSet>-<index>".
	statefulSet = flag.String("dns-propagation-probe-stateful-set", "", "Name of the statefulSet workload")
	// service is the headless service whose name forms part of each pod's DNS name.
	service = flag.String("dns-propagation-probe-service", "", "Name of the headless service that exposes the statefulSet")
	// namespace is the namespace of the StatefulSet and its pods.
	namespace = flag.String("dns-propagation-probe-namespace", "default", "The namespace where the statefulSet resides")
	// clusterDomain and suffix are the last two labels of the pod DNS name
	// ("<pod>.<service>.<namespace>.svc.<clusterDomain>.<suffix>").
	clusterDomain = flag.String("dns-propagation-probe-cluster-domain", "cluster", "Name of cluster domain where the statefulSet resides")
	suffix        = flag.String("dns-propagation-probe-suffix", "local", "DNS label suffix")
	// interval is the period between consecutive DNS lookups for a pod.
	interval = flag.Duration("dns-propagation-probe-interval", 100*time.Millisecond, "Interval between DNS lookups")
	// podCount is the number of pods in the StatefulSet; it must be positive.
	podCount = flag.Int("dns-propagation-probe-pod-count", 0, "Number of pods in the statefulSet")
	// sampleCount is the number of pods to probe; values <= 0 select the default.
	sampleCount = flag.Int("dns-propagation-probe-sample-count", 0, "Number of pods to test dns propagation against in the statefulSet, defaults to min(100, Ceil(SQRT(podCount)))")
)

// DNSPodPropagationResult carries the outcome of a DNS propagation check for
// a single pod from the worker goroutine back to runProbe.
type DNSPodPropagationResult struct {
// podName is the name of the probed pod; runProbe uses it as the "podName" metric label.
podName string
// duration is the value returned by runSinglePod for this pod.
duration time.Duration
}

// Run is the entry function for the probe.
//
// It validates the required flags (exiting via klog.Fatal when one is
// missing), resolves the effective sample count, builds an in-cluster
// Kubernetes client and runs the probe once. After the probe completes it
// never returns: it keeps the process alive, logging at verbosity 2 every
// 60 seconds.
func Run() {
	if *statefulSet == "" {
		klog.Fatal("--dns-propagation-probe-stateful-set has not been set")
	}
	if *service == "" {
		klog.Fatal("--dns-propagation-probe-service has not been set")
	}
	if *podCount <= 0 {
		klog.Fatal("--dns-propagation-probe-pod-count has not been set or is not a positive number")
	}
	switch {
	case *sampleCount <= 0:
		// Default sample size: min(100, ceil(sqrt(podCount))).
		f := int(math.Ceil(math.Sqrt(float64(*podCount))))
		if f > 100 {
			f = 100
		}
		sampleCount = &f
		klog.Warningf("dns-propagation-probe-sample-count not set, defaulting to min(100, Ceil(SQRT(%v))) = %v", *podCount, *sampleCount)
	case *sampleCount > *podCount:
		// Sampling more pods than exist would slice past the end of the
		// index list when the sample is selected, so cap it at the pod count.
		klog.Warningf("dns-propagation-probe-sample-count (%v) exceeds dns-propagation-probe-pod-count (%v), capping it to the pod count", *sampleCount, *podCount)
		f := *podCount
		sampleCount = &f
	}
	// creates the in-cluster config
	kubeConfig, err := rest.InClusterConfig()
	if err != nil {
		klog.Fatalf("Can not build inClusterConfig, error:%v", err)
	}
	// creates the inCluster kube client
	clientset, err := kubernetes.NewForConfig(kubeConfig)
	if err != nil {
		klog.Fatalf("Failed to build kubeClient, error:%v", err)
	}
	//TODO deprecated as of Go 1.20. To remove when go version gets bumped
	rand.Seed(time.Now().UnixNano())
	runProbe(clientset)
	// Keep the process alive after the single probe run has finished.
	for {
		klog.V(2).Infof("dns propagation probe complete, waiting until the test finishes...")
		time.Sleep(60 * time.Second)
	}
}

// runProbe runs the DNS propagation probe once.
//
// It picks a random sample of pod indices, starts one goroutine per sampled
// pod to measure its propagation time, records every result in the
// DNSPropagationSeconds gauge and DNSPropagationCount counter, and finally
// logs the number of observations together with their average duration.
func runProbe(client kubernetes.Interface) {
	klog.V(1).Infof("DNS propagation probe started")

	var workers sync.WaitGroup
	results := make(chan DNSPodPropagationResult, *sampleCount)
	sampled := selectSample(*podCount, *sampleCount)

	for _, ordinal := range sampled {
		pod := fmt.Sprintf("%s-%d", *statefulSet, ordinal)
		fqdn := fmt.Sprintf("%s.%s.%s.%s.%s.%s", pod, *service, *namespace, "svc", *clusterDomain, *suffix)
		workers.Add(1)
		// Everything the worker needs is passed as arguments so that each
		// goroutine operates on its own copies.
		go func(c kubernetes.Interface, host, name, ns string, every time.Duration) {
			defer workers.Done()
			elapsed := runSinglePod(c, host, name, ns, every)
			results <- DNSPodPropagationResult{podName: name, duration: elapsed}
		}(client, fqdn, pod, *namespace, *interval)
	}

	klog.V(2).Infof("Waiting for all sample pods processes to finish")
	// Close the channel once every worker has reported, which ends the
	// collection loop below.
	go func() {
		workers.Wait()
		close(results)
	}()

	var (
		total    float64
		observed int
	)
	for res := range results {
		seconds := res.duration.Seconds()
		labels := prometheus.Labels{
			"namespace": *namespace,
			"service":   *service,
			"podName":   res.podName,
		}
		DNSPropagationSeconds.With(labels).Set(seconds)
		DNSPropagationCount.With(labels).Inc()
		total += seconds
		observed++
	}
	klog.V(2).Infof("Finished calculating DNS propagation for all sample pods")

	if observed == 0 {
		klog.Warningf("DNS propagation probe has zero observations")
		return
	}
	klog.V(1).Infof("DNS propagation probe finished, total of %v observations, average duration, %v s", observed, total/float64(observed))
}

// selectSample returns distinct indices drawn uniformly at random from the
// range [0, podCount).
//
// The result normally has sampleCount elements. sampleCount is clamped to
// [0, podCount], so asking for more indices than exist returns every index
// once (in random order) instead of panicking, and a non-positive podCount or
// sampleCount yields an empty slice.
func selectSample(podCount int, sampleCount int) []int {
	if podCount <= 0 || sampleCount <= 0 {
		return []int{}
	}
	if sampleCount > podCount {
		sampleCount = podCount
	}
	// rand.Perm yields a random permutation of [0, podCount); a prefix of it
	// is a uniform sample without replacement.
	return rand.Perm(podCount)[:sampleCount]
}

// runSinglePod runs a single DNS propagation test for the given pod.
//
// Every interval it tries to resolve url; failed lookups are logged and
// retried indefinitely. Once a lookup succeeds it fetches the time at which
// the pod's Ready condition became true and returns the difference between
// the moment the lookup succeeded and that timestamp. If the Ready timestamp
// cannot be fetched, the process is terminated via klog.Fatalf.
func runSinglePod(client kubernetes.Interface, url string, podName string, namespace string, interval time.Duration) time.Duration {
	klog.V(4).Infof("Starting dns propagation calculation for pod %s ...", url)
	tick := time.NewTicker(interval)
	defer tick.Stop()
	for {
		<-tick.C
		klog.V(4).Infof("DNS lookup %s", url)
		if err := lookup(url); err != nil {
			klog.Warningf("DNS lookup error: %v", err)
			continue
		}
		// Capture the discovery time before talking to the API server so the
		// API round trip is not counted as propagation delay.
		endTime := time.Now()
		klog.V(2).Infof("DNS lookup finished for pod %s, finding pod running time...", url)
		timestamp, err := fetchPodRunningTime(client, podName, namespace)
		if err != nil {
			// Fatalf exits the process, so nothing after it runs.
			klog.Fatalf("K8s error: %v", err)
		}
		duration := endTime.Sub(timestamp)
		klog.V(2).Infof("Pod running time fetched for pod %s, timestamp= %v, DNS propagation duration= %v s", url, timestamp, duration)
		return duration
	}
}

// lookup resolves url with net.LookupIP and discards the addresses.
// It returns nil when the name resolves and the resolver's error otherwise.
func lookup(url string) error {
	_, err := net.LookupIP(url)
	return err
}

// fetchPodRunningTime returns the time at which the given pod became Ready.
// It makes up to three attempts, logging a warning and sleeping one second
// after each failure, and then one final attempt whose result (timestamp or
// error) is returned as is.
func fetchPodRunningTime(client kubernetes.Interface, podName string, namespace string) (time.Time, error) {
	const attempts = 3
	for i := 0; i < attempts; i++ {
		ts, err := getPodRunningTimeFromClient(client, podName, namespace)
		if err != nil {
			klog.Warningf("Failed at obtaining pod running time for pod %s, reason: %v", podName, err)
			time.Sleep(1 * time.Second)
			continue
		}
		return ts, nil
	}
	// Last try: hand its outcome straight back to the caller.
	return getPodRunningTimeFromClient(client, podName, namespace)
}

// getPodRunningTimeFromClient reads the pod from the API server and returns
// the LastTransitionTime of its "Ready" condition when that condition's
// status is "True". It returns an error if the pod cannot be fetched or no
// such condition is present; in both cases the returned time is the current
// time and carries no meaning.
func getPodRunningTimeFromClient(client kubernetes.Interface, podName string, namespace string) (time.Time, error) {
	// NOTE(review): ResourceVersion "0" presumably lets the API server answer
	// from its cache — confirm that a possibly stale read is acceptable here.
	pod, err := client.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{ResourceVersion: "0"})
	if err != nil {
		return time.Now(), err
	}

	conditions := pod.Status.Conditions
	for i := range conditions {
		if conditions[i].Type != "Ready" || conditions[i].Status != "True" {
			continue
		}
		return conditions[i].LastTransitionTime.Time, nil
	}
	return time.Now(), errors.New("Ready status wasn't found")
}

0 comments on commit f490a78

Please sign in to comment.