diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 97cbdb86b2..69ffb8ea32 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -503,6 +503,7 @@ func runQuery( dns.ResolverType(dnsSDResolver), ), dnsSDInterval, + logger, ) dnsEndpointProvider := dns.NewProvider( @@ -608,7 +609,7 @@ func runQuery( fileSDCache.Update(update) endpoints.Update(ctxUpdate) - if err := dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...)); err != nil { + if err := dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...), true); err != nil { level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err) } @@ -628,22 +629,22 @@ func runQuery( return runutil.Repeat(dnsSDInterval, ctx.Done(), func() error { resolveCtx, resolveCancel := context.WithTimeout(ctx, dnsSDInterval) defer resolveCancel() - if err := dnsStoreProvider.Resolve(resolveCtx, append(fileSDCache.Addresses(), storeAddrs...)); err != nil { + if err := dnsStoreProvider.Resolve(resolveCtx, append(fileSDCache.Addresses(), storeAddrs...), true); err != nil { level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err) } - if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs); err != nil { + if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs, true); err != nil { level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err) } - if err := dnsTargetProvider.Resolve(ctx, targetAddrs); err != nil { + if err := dnsTargetProvider.Resolve(ctx, targetAddrs, true); err != nil { level.Error(logger).Log("msg", "failed to resolve addresses for targetsAPIs", "err", err) } - if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs); err != nil { + if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs, true); err != nil { level.Error(logger).Log("msg", "failed to resolve addresses for metadataAPIs", "err", err) } - if err := dnsExemplarProvider.Resolve(resolveCtx, exemplarAddrs); err != nil { + if err := dnsExemplarProvider.Resolve(resolveCtx, exemplarAddrs, true); err != nil { level.Error(logger).Log("msg", "failed to resolve addresses for exemplarsAPI", "err", err) } - if err := dnsEndpointProvider.Resolve(resolveCtx, endpointAddrs); err != nil { + if err := dnsEndpointProvider.Resolve(resolveCtx, endpointAddrs, true); err != nil { level.Error(logger).Log("msg", "failed to resolve addresses passed using endpoint flag", "err", err) } diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 80e96ae7ff..e0780452fd 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -452,7 +452,7 @@ func runRule( return runutil.Repeat(5*time.Second, ctx.Done(), func() error { resolveCtx, resolveCancel := context.WithTimeout(ctx, 5*time.Second) defer resolveCancel() - if err := dnsEndpointProvider.Resolve(resolveCtx, grpcEndpoints); err != nil { + if err := dnsEndpointProvider.Resolve(resolveCtx, grpcEndpoints, true); err != nil { level.Error(logger).Log("msg", "failed to resolve addresses passed using grpc query config", "err", err) } return nil diff --git a/internal/cortex/chunk/cache/memcached_client.go b/internal/cortex/chunk/cache/memcached_client.go index a0bc202284..1f2c514589 100644 --- a/internal/cortex/chunk/cache/memcached_client.go +++ b/internal/cortex/chunk/cache/memcached_client.go @@ -251,7 +251,7 @@ func (c *memcachedClient) updateMemcacheServers() error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - if err := c.provider.Resolve(ctx, c.addresses); err != nil { + if err := c.provider.Resolve(ctx, c.addresses, true); err != nil { return err } servers = c.provider.Addresses() diff --git a/pkg/cache/groupcache.go b/pkg/cache/groupcache.go index 609959d0cb..43bf1aeefe 100644 --- a/pkg/cache/groupcache.go +++ b/pkg/cache/groupcache.go @@ -134,7 +134,7 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf go func() { for { - if err := dnsGroupcacheProvider.Resolve(context.Background(), conf.Peers); err != nil { + if err := dnsGroupcacheProvider.Resolve(context.Background(), conf.Peers, true); err != nil { level.Error(logger).Log("msg", "failed to resolve addresses for groupcache", "err", err) } else { err := universe.Set(dnsGroupcacheProvider.Addresses()...) diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go index a5b0c5b2a4..e700ffee7b 100644 --- a/pkg/cacheutil/memcached_client.go +++ b/pkg/cacheutil/memcached_client.go @@ -211,7 +211,7 @@ type memcachedClient struct { // AddressProvider performs node address resolution given a list of clusters. type AddressProvider interface { // Resolves the provided list of memcached cluster to the actual nodes - Resolve(context.Context, []string) error + Resolve(context.Context, []string, bool) error // Returns the nodes Addresses() []string @@ -638,7 +638,7 @@ func (c *memcachedClient) resolveAddrs() error { defer cancel() // If some of the dns resolution fails, log the error. - if err := c.addressProvider.Resolve(ctx, c.config.Addresses); err != nil { + if err := c.addressProvider.Resolve(ctx, c.config.Addresses, true); err != nil { level.Error(c.logger).Log("msg", "failed to resolve addresses for memcached", "addresses", strings.Join(c.config.Addresses, ","), "err", err) } // Fail in case no server address is resolved. diff --git a/pkg/clientconfig/http.go b/pkg/clientconfig/http.go index 69f2baf165..4493ebc60b 100644 --- a/pkg/clientconfig/http.go +++ b/pkg/clientconfig/http.go @@ -330,7 +330,7 @@ func (c HTTPFileSDConfig) convert() (file.SDConfig, error) { } type AddressProvider interface { - Resolve(context.Context, []string) error + Resolve(context.Context, []string, bool) error Addresses() []string } @@ -433,5 +433,5 @@ func (c *HTTPClient) Discover(ctx context.Context) { // Resolve refreshes and resolves the list of targets. func (c *HTTPClient) Resolve(ctx context.Context) error { - return c.provider.Resolve(ctx, append(c.fileSDCache.Addresses(), c.staticAddresses...)) + return c.provider.Resolve(ctx, append(c.fileSDCache.Addresses(), c.staticAddresses...), true) } diff --git a/pkg/discovery/dns/grpc.go b/pkg/discovery/dns/grpc.go index 4e315596df..79e832b652 100644 --- a/pkg/discovery/dns/grpc.go +++ b/pkg/discovery/dns/grpc.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/go-kit/log" grpcresolver "google.golang.org/grpc/resolver" ) @@ -19,12 +20,14 @@ var ( type builder struct { resolveInterval time.Duration provider *Provider + logger log.Logger } -func RegisterGRPCResolver(provider *Provider, interval time.Duration) { +func RegisterGRPCResolver(provider *Provider, interval time.Duration, logger log.Logger) { grpcresolver.Register(&builder{ resolveInterval: interval, provider: provider, + logger: logger, }) } @@ -39,6 +42,7 @@ func (b *builder) Build(t grpcresolver.Target, cc grpcresolver.ClientConn, _ grp cancel: cancel, cc: cc, interval: b.resolveInterval, + logger: b.logger, } r.wg.Add(1) go r.run() @@ -55,7 +59,8 @@ type resolver struct { cc grpcresolver.ClientConn interval time.Duration - wg sync.WaitGroup + wg sync.WaitGroup + logger log.Logger } func (r *resolver) Close() { @@ -68,7 +73,7 @@ func (r *resolver) ResolveNow(_ grpcresolver.ResolveNowOptions) {} func (r *resolver) resolve() error { ctx, cancel := context.WithTimeout(r.ctx, r.interval) defer cancel() - return r.provider.Resolve(ctx, []string{r.target}) + return r.provider.Resolve(ctx, []string{r.target}, false) } func (r *resolver) addresses() []string { @@ -78,16 +83,25 @@ func (r *resolver) addresses() []string { func (r *resolver) run() { defer r.wg.Done() for { - if err := r.resolve(); err != nil { - r.cc.ReportError(err) - } else { + func() { + if err := r.resolve(); err != nil { + r.cc.ReportError(err) + r.logger.Log("msg", "failed to resolve", "err", err) + return + } state := grpcresolver.State{} - for _, addr := range r.addresses() { - raddr := grpcresolver.Address{Addr: addr} - state.Addresses = append(state.Addresses, raddr) + addrs := r.addresses() + if len(addrs) == 0 { + r.logger.Log("msg", "no addresses resolved", "target", r.target) + return } - _ = r.cc.UpdateState(state) - } + for _, addr := range addrs { + state.Addresses = append(state.Addresses, grpcresolver.Address{Addr: addr}) + } + if err := r.cc.UpdateState(state); err != nil { + r.logger.Log("msg", "failed to update state", "err", err) + } + }() select { case <-r.ctx.Done(): return diff --git a/pkg/discovery/dns/provider.go b/pkg/discovery/dns/provider.go index 8f42bf4d26..f9c1c7f583 100644 --- a/pkg/discovery/dns/provider.go +++ b/pkg/discovery/dns/provider.go @@ -110,7 +110,7 @@ func GetQTypeName(addr string) (qtype, name string) { // Resolve stores a list of provided addresses or their DNS records if requested. // Addresses prefixed with `dns+` or `dnssrv+` will be resolved through respective DNS lookup (A/AAAA or SRV). // For non-SRV records, it will return an error if a port is not supplied. -func (p *Provider) Resolve(ctx context.Context, addrs []string) error { +func (p *Provider) Resolve(ctx context.Context, addrs []string, flushOld bool) error { resolvedAddrs := map[string][]string{} errs := errutil.MultiError{} @@ -129,10 +129,7 @@ func (p *Provider) Resolve(ctx context.Context, addrs []string) error { errs.Add(err) // The DNS resolution failed. Continue without modifying the old records. p.resolverFailuresCount.Inc() - // Use cached values. - p.RLock() - resolved = p.resolved[addr] - p.RUnlock() + continue } resolvedAddrs[addr] = resolved } @@ -143,13 +140,17 @@ func (p *Provider) Resolve(ctx context.Context, addrs []string) error { defer p.Unlock() p.resolverAddrs.ResetTx() + if flushOld && len(errs) == 0 { + p.resolved = map[string][]string{} + } for name, addrs := range resolvedAddrs { + p.resolved[name] = addrs + } + for name, addrs := range p.resolved { p.resolverAddrs.WithLabelValues(name).Set(float64(len(addrs))) } p.resolverAddrs.Submit() - p.resolved = resolvedAddrs - return errs.Err() } diff --git a/pkg/discovery/dns/provider_test.go b/pkg/discovery/dns/provider_test.go index 439138ef4d..b0d48fd765 100644 --- a/pkg/discovery/dns/provider_test.go +++ b/pkg/discovery/dns/provider_test.go @@ -33,7 +33,7 @@ func TestProvider(t *testing.T) { } ctx := context.TODO() - err := prv.Resolve(ctx, []string{"any+x"}) + err := prv.Resolve(ctx, []string{"any+x"}, false) testutil.Ok(t, err) result := prv.Addresses() sort.Strings(result) @@ -41,58 +41,67 @@ func TestProvider(t *testing.T) { testutil.Equals(t, 1, promtestutil.CollectAndCount(prv.resolverAddrs)) testutil.Equals(t, float64(0), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+x"))) - err = prv.Resolve(ctx, []string{"any+a", "any+b", "any+c"}) + err = prv.Resolve(ctx, []string{"any+a", "any+b", "any+c"}, false) testutil.Ok(t, err) result = prv.Addresses() sort.Strings(result) testutil.Equals(t, ips, result) - testutil.Equals(t, 3, promtestutil.CollectAndCount(prv.resolverAddrs)) + testutil.Equals(t, 4, promtestutil.CollectAndCount(prv.resolverAddrs)) testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+a"))) testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+b"))) testutil.Equals(t, float64(1), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+c"))) - err = prv.Resolve(ctx, []string{"any+b", "any+c"}) + err = prv.Resolve(ctx, []string{"any+b", "any+c"}, false) testutil.Ok(t, err) result = prv.Addresses() sort.Strings(result) - testutil.Equals(t, ips[2:], result) - testutil.Equals(t, 2, promtestutil.CollectAndCount(prv.resolverAddrs)) + testutil.Equals(t, ips, result) + testutil.Equals(t, 4, promtestutil.CollectAndCount(prv.resolverAddrs)) testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+b"))) testutil.Equals(t, float64(1), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+c"))) - err = prv.Resolve(ctx, []string{"any+x"}) + err = prv.Resolve(ctx, []string{"any+x"}, false) testutil.Ok(t, err) result = prv.Addresses() sort.Strings(result) - testutil.Equals(t, []string(nil), result) - testutil.Equals(t, 1, promtestutil.CollectAndCount(prv.resolverAddrs)) + testutil.Equals(t, ips, result) + testutil.Equals(t, 4, promtestutil.CollectAndCount(prv.resolverAddrs)) testutil.Equals(t, float64(0), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+x"))) - err = prv.Resolve(ctx, []string{"any+a", "any+b", "any+c"}) + err = prv.Resolve(ctx, []string{"any+a", "any+b", "any+c"}, false) testutil.Ok(t, err) result = prv.Addresses() sort.Strings(result) testutil.Equals(t, ips, result) - testutil.Equals(t, 3, promtestutil.CollectAndCount(prv.resolverAddrs)) + testutil.Equals(t, 4, promtestutil.CollectAndCount(prv.resolverAddrs)) testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+a"))) testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+b"))) testutil.Equals(t, float64(1), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+c"))) - err = prv.Resolve(ctx, []string{"any+b", "example.com:90", "any+c"}) + err = prv.Resolve(ctx, []string{"any+b", "example.com:90", "any+c"}, false) testutil.Ok(t, err) result = prv.Addresses() sort.Strings(result) - testutil.Equals(t, append(ips[2:], "example.com:90"), result) - testutil.Equals(t, 3, promtestutil.CollectAndCount(prv.resolverAddrs)) + testutil.Equals(t, append(ips, "example.com:90"), result) + testutil.Equals(t, 5, promtestutil.CollectAndCount(prv.resolverAddrs)) testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+b"))) testutil.Equals(t, float64(1), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("example.com:90"))) testutil.Equals(t, float64(1), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+c"))) - err = prv.Resolve(ctx, []string{"any+b", "any+c"}) + err = prv.Resolve(ctx, []string{"any+b", "any+c"}, false) + testutil.Ok(t, err) + result = prv.Addresses() + sort.Strings(result) + testutil.Equals(t, append(ips, "example.com:90"), result) + testutil.Equals(t, 5, promtestutil.CollectAndCount(prv.resolverAddrs)) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+b"))) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+c"))) + + err = prv.Resolve(ctx, []string{"any+b", "any+c"}, true) testutil.Ok(t, err) result = prv.Addresses() sort.Strings(result) - testutil.Equals(t, ips[2:], result) + testutil.Equals(t, ips[2:5], result) testutil.Equals(t, 2, promtestutil.CollectAndCount(prv.resolverAddrs)) testutil.Equals(t, float64(2), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+b"))) testutil.Equals(t, float64(1), promtestutil.ToFloat64(prv.resolverAddrs.WithLabelValues("any+c"))) diff --git a/pkg/discovery/memcache/provider.go b/pkg/discovery/memcache/provider.go index 0456a52bf4..1560c55951 100644 --- a/pkg/discovery/memcache/provider.go +++ b/pkg/discovery/memcache/provider.go @@ -58,7 +58,7 @@ func NewProvider(logger log.Logger, reg prometheus.Registerer, dialTimeout time. } // Resolve stores a list of nodes auto-discovered from the provided addresses. -func (p *Provider) Resolve(ctx context.Context, addresses []string) error { +func (p *Provider) Resolve(ctx context.Context, addresses []string, flushOld bool) error { clusterConfigs := map[string]*clusterConfig{} errs := errutil.MultiError{} @@ -74,13 +74,9 @@ func (p *Provider) Resolve(ctx context.Context, addresses []string) error { errs.Add(err) p.resolverFailuresCount.Inc() - // Use cached values. - p.RLock() - clusterConfigs[address] = p.clusterConfigs[address] - p.RUnlock() - } else { - clusterConfigs[address] = clusterConfig + continue } + clusterConfigs[address] = clusterConfig } p.Lock() @@ -95,7 +91,12 @@ func (p *Provider) Resolve(ctx context.Context, addresses []string) error { p.resolvedAddresses.Submit() p.configVersion.Submit() - p.clusterConfigs = clusterConfigs + if flushOld && len(errs) == 0 { + p.clusterConfigs = map[string]*clusterConfig{} + } + for addr, config := range clusterConfigs { + p.clusterConfigs[addr] = config + } return errs.Err() } diff --git a/pkg/discovery/memcache/provider_test.go b/pkg/discovery/memcache/provider_test.go index a25d392334..02a2940770 100644 --- a/pkg/discovery/memcache/provider_test.go +++ b/pkg/discovery/memcache/provider_test.go @@ -28,7 +28,7 @@ func TestProviderUpdatesAddresses(t *testing.T) { } provider.resolver = &resolver - testutil.Ok(t, provider.Resolve(ctx, clusters)) + testutil.Ok(t, provider.Resolve(ctx, clusters, true)) addresses := provider.Addresses() sort.Strings(addresses) testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses) @@ -38,7 +38,7 @@ func TestProviderUpdatesAddresses(t *testing.T) { "memcached-cluster-2": {nodes: []node{{dns: "dns-2", ip: "ip-2", port: 8080}}}, } - testutil.Ok(t, provider.Resolve(ctx, clusters)) + testutil.Ok(t, provider.Resolve(ctx, clusters, true)) addresses = provider.Addresses() sort.Strings(addresses) testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080", "dns-3:11211"}, addresses) @@ -56,7 +56,7 @@ func TestProviderDoesNotUpdateAddressIfFailed(t *testing.T) { } provider.resolver = &resolver - testutil.Ok(t, provider.Resolve(ctx, clusters)) + testutil.Ok(t, provider.Resolve(ctx, clusters, true)) addresses := provider.Addresses() sort.Strings(addresses) testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses) @@ -64,7 +64,7 @@ func TestProviderDoesNotUpdateAddressIfFailed(t *testing.T) { resolver.configs = nil resolver.err = errors.New("oops") - testutil.NotOk(t, provider.Resolve(ctx, clusters)) + testutil.NotOk(t, provider.Resolve(ctx, clusters, true)) addresses = provider.Addresses() sort.Strings(addresses) testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)