diff --git a/pkg/watcher/internal/metricsprovider/signalfx.go b/pkg/watcher/internal/metricsprovider/signalfx.go index e13f853..9abed15 100644 --- a/pkg/watcher/internal/metricsprovider/signalfx.go +++ b/pkg/watcher/internal/metricsprovider/signalfx.go @@ -23,6 +23,7 @@ import ( "fmt" "net/http" "net/url" + "os" "strconv" "strings" "time" @@ -33,17 +34,17 @@ import ( const ( // SignalFX Request Params - DefaultSignalFxAddress = "https://api.signalfx.com" - signalFxMetricsAPI = "/v1/timeserieswindow" - // SignalFx adds a suffix to hostnames if configured - signalFxHostNameSuffix = ".group.region.gcp.com" - signalFxHostFilter = "host:" - + DefaultSignalFxAddress = "https://api.signalfx.com" + signalFxMetricsAPI = "/v1/timeserieswindow" + signalFxMetdataAPI = "/v2/metrictimeseries" + signalFxHostFilter = "host:" + signalFxHostNameSuffixKey = "SIGNALFX_HOST_NAME_SUFFIX" // SignalFX Query Params oneMinuteResolutionMs = 60000 cpuUtilizationMetric = `sf_metric:"cpu.utilization"` memoryUtilizationMetric = `sf_metric:"memory.utilization"` AND = "AND" + resultSetLimit = "10000" // Miscellaneous httpClientTimeout = 55 * time.Second @@ -53,6 +54,7 @@ type signalFxClient struct { client http.Client authToken string signalFxAddress string + hostNameSuffix string } func NewSignalFxClient(opts watcher.MetricsProviderOpts) (watcher.MetricsProviderClient, error) { @@ -62,7 +64,7 @@ func NewSignalFxClient(opts watcher.MetricsProviderOpts) (watcher.MetricsProvide tlsConfig := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, // TODO(aqadeer): Figure out a secure way to let users add SSL certs } - + hostNameSuffix, _ := os.LookupEnv(signalFxHostNameSuffixKey) var signalFxAddress, signalFxAuthToken = DefaultSignalFxAddress, "" if opts.Address != "" { signalFxAddress = opts.Address @@ -77,7 +79,8 @@ func NewSignalFxClient(opts watcher.MetricsProviderOpts) (watcher.MetricsProvide Timeout: httpClientTimeout, Transport: tlsConfig}, authToken: signalFxAuthToken, - 
signalFxAddress: signalFxAddress}, nil + signalFxAddress: signalFxAddress, + hostNameSuffix: hostNameSuffix}, nil } func (s signalFxClient) Name() string { @@ -87,7 +90,7 @@ func (s signalFxClient) Name() string { func (s signalFxClient) FetchHostMetrics(host string, window *watcher.Window) ([]watcher.Metric, error) { log.Debugf("fetching metrics for host %v", host) var metrics []watcher.Metric - hostQuery := signalFxHostFilter + host + signalFxHostNameSuffix + hostQuery := signalFxHostFilter + host + s.hostNameSuffix for _, metric := range []string{cpuUtilizationMetric, memoryUtilizationMetric} { uri, err := s.buildMetricURL(hostQuery, metric, window) @@ -113,16 +116,7 @@ func (s signalFxClient) FetchHostMetrics(host string, window *watcher.Window) ([ } var fetchedMetric watcher.Metric - // Added default operator and rollup for signalfx client. - fetchedMetric.Operator = watcher.Average - fetchedMetric.Rollup = window.Duration - if metric == cpuUtilizationMetric { - fetchedMetric.Name = cpuUtilizationMetric - fetchedMetric.Type = watcher.CPU - } else { - fetchedMetric.Name = memoryUtilizationMetric - fetchedMetric.Type = watcher.Memory - } + addMetadata(&fetchedMetric, metric) fetchedMetric.Value, err = decodeMetricsPayload(res) if err != nil { return metrics, err @@ -133,9 +127,74 @@ func (s signalFxClient) FetchHostMetrics(host string, window *watcher.Window) ([ return metrics, nil } -// TODO(aqadeer): Fetching metrics for all hosts is not possible currently via timeserieswindow SignalFx API -func (s signalFxClient) FetchAllHostsMetrics(*watcher.Window) (map[string][]watcher.Metric, error) { - return nil, errors.New("This function is not yet implemented") +func (s signalFxClient) FetchAllHostsMetrics(window *watcher.Window) (map[string][]watcher.Metric, error) { + hostQuery := signalFxHostFilter + "*" + s.hostNameSuffix + metrics := make(map[string][]watcher.Metric) + for _, metric := range []string{cpuUtilizationMetric, memoryUtilizationMetric} { + uri, err := 
s.buildMetricURL(hostQuery, metric, window) + if err != nil { + return metrics, err + } + req, _ := http.NewRequest(http.MethodGet, uri.String(), nil) + req.Header.Set("X-SF-Token", s.authToken) + req.Header.Set("Content-Type", "application/json") + + metricResp, err := s.client.Do(req) + if err != nil { + return metrics, err + } + defer metricResp.Body.Close() + if metricResp.StatusCode != http.StatusOK { + return metrics, fmt.Errorf("received status code: %v", metricResp.StatusCode) + } + var metricPayload interface{} + err = json.NewDecoder(metricResp.Body).Decode(&metricPayload) + if err != nil { + return metrics, err + } + + uri, err = s.buildMetadataURL(hostQuery, metric) + if err != nil { + return metrics, err + } + req, _ = http.NewRequest(http.MethodGet, uri.String(), nil) + req.Header.Set("X-SF-Token", s.authToken) + req.Header.Set("Content-Type", "application/json") + + metadataResp, err := s.client.Do(req) + if err != nil { + return metrics, err + } + defer metadataResp.Body.Close() + if metadataResp.StatusCode != http.StatusOK { + return metrics, fmt.Errorf("received status code: %v", metadataResp.StatusCode) + } + var metadataPayload interface{} + err = json.NewDecoder(metadataResp.Body).Decode(&metadataPayload) + if err != nil { + return metrics, err + } + mappedMetrics, err := getMetricsFromPayloads(metricPayload, metadataPayload) + if err != nil { + return metrics, err + } + for k, v := range mappedMetrics { + addMetadata(&v, metric) + metrics[k] = append(metrics[k], v) + } + } + return metrics, nil +} + +func addMetadata(metric *watcher.Metric, metricType string) { + metric.Operator = watcher.Average + if metricType == cpuUtilizationMetric { + metric.Name = cpuUtilizationMetric + metric.Type = watcher.CPU + } else { + metric.Name = memoryUtilizationMetric + metric.Type = watcher.Memory + } } func (s signalFxClient) buildMetricURL(host string, metric string, window *watcher.Window) (uri *url.URL, err error) { @@ -150,14 +209,30 @@ func (s 
signalFxClient) buildMetricURL(host string, metric string, window *watch builder.WriteString(fmt.Sprintf(" %v ", AND)) builder.WriteString(metric) q.Set("query", builder.String()) - - q.Set("startMs", strconv.FormatInt(window.Start, 10)) - q.Set("endMs", strconv.FormatInt(window.End, 10)) + q.Set("startMs", strconv.FormatInt(window.Start*1000, 10)) + q.Set("endMs", strconv.FormatInt(window.End*1000, 10)) q.Set("resolution", strconv.Itoa(oneMinuteResolutionMs)) uri.RawQuery = q.Encode() return } +func (s signalFxClient) buildMetadataURL(host string, metric string) (uri *url.URL, err error) { + uri, err = url.Parse(s.signalFxAddress + signalFxMetdataAPI) + if err != nil { + return nil, err + } + q := uri.Query() + + builder := strings.Builder{} + builder.WriteString(host) + builder.WriteString(fmt.Sprintf(" %v ", AND)) + builder.WriteString(metric) + q.Set("query", builder.String()) + q.Set("limit", resultSetLimit) + uri.RawQuery = q.Encode() + return +} + /** Sample payload: { @@ -206,3 +281,144 @@ func decodeMetricsPayload(payload interface{}) (float64, error) { } return timestampUtilisation[1].(float64), nil } + +/** +Sample metricData payload: +{ + "data": { + "Ehql_bxBgAc": [ + [ + 1600213380000, + 84.64246793530153 + ] + ], + "EuXgJm7BkAA": [ + [ + 1614634260000, + 5.450946379084264 + ] + ], + .... + .... + }, + "errors": [] +} + +https://dev.splunk.com/observability/reference/api/metrics_metadata/latest#endpoint-retrieve-metric-timeseries-metadata +Sample metaData payload: +{ + "count": 5, + "partialCount": false, + "results": [ + { + "active": true, + "created": 1614534848000, + "creator": null, + "dimensions": { + "host": "test.dev.com", + "sf_metric": null + }, + "id": "EvVH6P7BgAA", + "lastUpdated": 0, + "lastUpdatedBy": null, + "metric": "cpu.utilization" + }, + .... + .... 
+  ]
+}
+*/
+// getMetricsFromPayloads joins a timeserieswindow response (metricData) with a
+// metrictimeseries metadata response (metadata) and returns one Metric per host.
+//
+// The metadata "results" array maps each time series id to its "host"
+// dimension; the metric "data" object maps the same ids to a list of
+// [timestamp, value] pairs. The Value of each returned Metric is the mean of
+// all well-formed values reported for that series; no other Metric field is
+// populated here. If several series resolve to the same host, the one
+// processed last overwrites the earlier ones.
+//
+// Malformed individual entries are logged and skipped. An error is returned
+// only when a payload is not a JSON object, or when its top-level "results" or
+// "data" field is missing or of the wrong type. A non-nil (possibly empty) map
+// is returned in every case.
+func getMetricsFromPayloads(metricData interface{}, metadata interface{}) (map[string]watcher.Metric, error) {
+	hostMetricMap := make(map[string]watcher.Metric)
+
+	metadataMap, ok := metadata.(map[string]interface{})
+	if !ok {
+		return hostMetricMap, fmt.Errorf("type conversion failed, found %T", metadata)
+	}
+	results := metadataMap["results"]
+	if results == nil {
+		return hostMetricMap, errors.New("unexpected payload: missing results field")
+	}
+	// Checked assertion: a non-array "results" must surface as an error, not a panic.
+	resultList, ok := results.([]interface{})
+	if !ok {
+		return hostMetricMap, fmt.Errorf("type conversion failed, found %T", results)
+	}
+
+	// Build the time series id -> host lookup from the metadata payload.
+	keyHostMap := make(map[string]string, len(resultList))
+	for _, v := range resultList {
+		result, ok := v.(map[string]interface{})
+		if !ok {
+			log.Errorf("type conversion failed, found %T", v)
+			continue
+		}
+		rawID := result["id"]
+		if rawID == nil {
+			log.Errorf("id not found in %v", v)
+			continue
+		}
+		id, ok := rawID.(string)
+		if !ok {
+			log.Errorf("id not expected type string, found %T", rawID)
+			continue
+		}
+		rawDimensions := result["dimensions"]
+		if rawDimensions == nil {
+			log.Errorf("no dimensions found in %v", v)
+			continue
+		}
+		dimensions, ok := rawDimensions.(map[string]interface{})
+		if !ok {
+			log.Errorf("type conversion failed, found %T", rawDimensions)
+			continue
+		}
+		rawHost := dimensions["host"]
+		if rawHost == nil {
+			log.Errorf("no host found in %v", dimensions)
+			continue
+		}
+		host, ok := rawHost.(string)
+		if !ok {
+			// Skip the entry; using a non-string host would panic.
+			log.Errorf("host not expected type string, found %T", rawHost)
+			continue
+		}
+		keyHostMap[id] = host
+	}
+
+	metricMap, ok := metricData.(map[string]interface{})
+	if !ok {
+		return hostMetricMap, fmt.Errorf("type conversion failed, found %T", metricData)
+	}
+	data := metricMap["data"]
+	if data == nil {
+		return hostMetricMap, errors.New("unexpected payload: missing data field")
+	}
+	keyMetricMap, ok := data.(map[string]interface{})
+	if !ok {
+		return hostMetricMap, errors.New("unable to deserialise data field")
+	}
+
+	for key, metric := range keyMetricMap {
+		host, ok := keyHostMap[key]
+		if !ok {
+			log.Errorf("no metadata found for key %v", key)
+			continue
+		}
+		values, ok := metric.([]interface{})
+		if !ok {
+			log.Errorf("unable to deserialise values for key %v", key)
+			continue
+		}
+		if len(values) == 0 {
+			log.Errorf("no metric value array could be decoded for key %v", key)
+			continue
+		}
+
+		// Find the average across returned values per 1 minute resolution.
+		var sum float64
+		var count int
+		for _, value := range values {
+			timestampUtilisation, ok := value.([]interface{})
+			if !ok || len(timestampUtilisation) < 2 {
+				log.Errorf("unable to deserialise metric values for key %v", key)
+				continue
+			}
+			utilisation, ok := timestampUtilisation[1].(float64)
+			if !ok {
+				log.Errorf("unable to typecast value to float64: %v of type %T", timestampUtilisation[1], timestampUtilisation[1])
+				continue
+			}
+			sum += utilisation
+			count++
+		}
+		// Every sample was malformed: skip the series rather than store 0/0 (NaN).
+		if count == 0 {
+			log.Errorf("no valid metric values found for key %v", key)
+			continue
+		}
+
+		hostMetricMap[host] = watcher.Metric{Value: sum / float64(count)}
+	}
+
+	return hostMetricMap, nil
+}