Skip to content

Commit

Permalink
Merge branch 'main' into logger_duration_ms
Browse files Browse the repository at this point in the history
  • Loading branch information
rhassanein authored Aug 4, 2021
2 parents 60b4862 + cd18b61 commit afd423f
Show file tree
Hide file tree
Showing 10 changed files with 437 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Added
- [#4453](https://github.com/thanos-io/thanos/pull/4453) Tools: Add flag `--selector.relabel-config-file` / `--selector.relabel-config` / `--max-time` / `--min-time` to filter served blocks.
- [#4482](https://github.com/thanos-io/thanos/pull/4482) COS: Add http_config for cos object store client.
- [#4487](https://github.com/thanos-io/thanos/pull/4487) Query: Add memcached auto discovery support.
- [#4509](https://github.com/thanos-io/thanos/pull/4509) Logging: Adds duration_ms in int64 to the logs

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion MAINTAINERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
|-----------------------|------------------------|--------------------------|----------------------------------------------|---------------|
| Bartłomiej Płotka | [email protected] | `@bwplotka` | [@bwplotka](https://github.com/bwplotka) | Red Hat |
| Frederic Branczyk | [email protected] | `@brancz` | [@brancz](https://github.com/brancz) | Polar Signals |
| Giedrius Statkevičius | [email protected] | `@Giedrius Statkevičius` | [@GiedriusS](https://github.com/GiedriusS) | AdForm |
| Giedrius Statkevičius | [email protected] | `@Giedrius Statkevičius` | [@GiedriusS](https://github.com/GiedriusS) | Vinted |
| Kemal Akkoyun | [email protected] | `@kakkoyun` | [@kakkoyun](https://github.com/kakkoyun) | Polar Signals |
| Lucas Servén Marín | [email protected] | `@squat` | [@squat](https://github.com/squat) | Red Hat |
| Prem Saraswat | [email protected] | `@Prem Saraswat` | [@onprem](https://github.com/onprem) | Red Hat |
Expand Down
1 change: 1 addition & 0 deletions docs/components/query-frontend.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ config:
max_item_size: 0
max_get_multi_batch_size: 0
dns_provider_update_interval: 0s
auto_discovery: false
expiration: 0s
```

Expand Down
4 changes: 3 additions & 1 deletion docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,11 +290,12 @@ config:
max_item_size: 0
max_get_multi_batch_size: 0
dns_provider_update_interval: 0s
auto_discovery: false
```

The **required** settings are:

- `addresses`: list of memcached addresses, that will get resolved with the [DNS service discovery](../service-discovery.md/#dns-service-discovery) provider.
- `addresses`: list of memcached addresses, that will get resolved with the [DNS service discovery](../service-discovery.md/#dns-service-discovery) provider. If your cluster supports auto-discovery, you should use the flag `auto_discovery` instead and only point to *one of* the memcached servers. This typically means that there should be only one address specified that resolves to any of the alive memcached servers. Use this for Amazon ElastiCache and other similar services.

While the remaining settings are **optional**:

Expand All @@ -306,6 +307,7 @@ While the remaining settings are **optional**:
- `max_get_multi_batch_size`: maximum number of keys a single underlying operation should fetch. If more keys are specified, internally keys are splitted into multiple batches and fetched concurrently, honoring `max_get_multi_concurrency`. If set to `0`, the batch size is unlimited.
- `max_item_size`: maximum size of an item to be stored in memcached. This option should be set to the same value of memcached `-I` flag (defaults to 1MB) in order to avoid wasting network round trips to store items larger than the max item size allowed in memcached. If set to `0`, the item size is unlimited.
- `dns_provider_update_interval`: the DNS discovery update interval.
- `auto_discovery`: whether to use the auto-discovery mechanism for memcached.

## Caching Bucket

Expand Down
57 changes: 41 additions & 16 deletions pkg/cacheutil/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"gopkg.in/yaml.v2"

"github.com/thanos-io/thanos/pkg/discovery/dns"
memcacheDiscovery "github.com/thanos-io/thanos/pkg/discovery/memcache"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/model"
Expand Down Expand Up @@ -53,6 +54,7 @@ var (
MaxGetMultiConcurrency: 100,
MaxGetMultiBatchSize: 0,
DNSProviderUpdateInterval: 10 * time.Second,
AutoDiscovery: false,
}
)

Expand Down Expand Up @@ -114,6 +116,9 @@ type MemcachedClientConfig struct {

// DNSProviderUpdateInterval specifies the DNS discovery update interval.
DNSProviderUpdateInterval time.Duration `yaml:"dns_provider_update_interval"`

// AutoDiscovery configures memached client to perform auto-discovery instead of DNS resolution
AutoDiscovery bool `yaml:"auto_discovery"`
}

func (c *MemcachedClientConfig) validate() error {
Expand Down Expand Up @@ -153,8 +158,8 @@ type memcachedClient struct {
// Name provides an identifier for the instantiated Client
name string

// DNS provider used to keep the memcached servers list updated.
dnsProvider *dns.Provider
// Address provider used to keep the memcached servers list updated.
addressProvider AddressProvider

// Channel used to notify internal goroutines when they should quit.
stop chan struct{}
Expand All @@ -177,6 +182,15 @@ type memcachedClient struct {
dataSize *prometheus.HistogramVec
}

// AddressProvider performs node address resolution given a list of clusters.
type AddressProvider interface {
// Resolves the provided list of memcached cluster to the actual nodes
Resolve(context.Context, []string) error

// Returns the nodes
Addresses() []string
}

type memcachedGetMultiResult struct {
items map[string]*memcache.Item
err error
Expand Down Expand Up @@ -220,20 +234,31 @@ func newMemcachedClient(
reg prometheus.Registerer,
name string,
) (*memcachedClient, error) {
dnsProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_memcached_", reg),
dns.GolangResolverType,
)
promRegisterer := extprom.WrapRegistererWithPrefix("thanos_memcached_", reg)

var addressProvider AddressProvider
if config.AutoDiscovery {
addressProvider = memcacheDiscovery.NewProvider(
logger,
promRegisterer,
config.Timeout,
)
} else {
addressProvider = dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_memcached_", reg),
dns.GolangResolverType,
)
}

c := &memcachedClient{
logger: log.With(logger, "name", name),
config: config,
client: client,
selector: selector,
dnsProvider: dnsProvider,
asyncQueue: make(chan func(), config.MaxAsyncBufferSize),
stop: make(chan struct{}, 1),
logger: log.With(logger, "name", name),
config: config,
client: client,
selector: selector,
addressProvider: addressProvider,
asyncQueue: make(chan func(), config.MaxAsyncBufferSize),
stop: make(chan struct{}, 1),
getMultiGate: gate.New(
extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg),
config.MaxGetMultiConcurrency,
Expand Down Expand Up @@ -561,11 +586,11 @@ func (c *memcachedClient) resolveAddrs() error {
defer cancel()

// If some of the dns resolution fails, log the error.
if err := c.dnsProvider.Resolve(ctx, c.config.Addresses); err != nil {
if err := c.addressProvider.Resolve(ctx, c.config.Addresses); err != nil {
level.Error(c.logger).Log("msg", "failed to resolve addresses for memcached", "addresses", strings.Join(c.config.Addresses, ","), "err", err)
}
// Fail in case no server address is resolved.
servers := c.dnsProvider.Addresses()
servers := c.addressProvider.Addresses()
if len(servers) == 0 {
return fmt.Errorf("no server address resolved for %s", c.name)
}
Expand Down
114 changes: 114 additions & 0 deletions pkg/discovery/memcache/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package memcache

import (
"context"
"fmt"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/errutil"
"github.com/thanos-io/thanos/pkg/extprom"
)

// Provider is a stateful cache for asynchronous memcached auto-discovery resolution. It provides a way to resolve
// addresses and obtain them.
type Provider struct {
sync.RWMutex
resolver Resolver
clusterConfigs map[string]*clusterConfig
logger log.Logger

configVersion *extprom.TxGaugeVec
resolvedAddresses *extprom.TxGaugeVec
resolverFailuresCount prometheus.Counter
resolverLookupsCount prometheus.Counter
}

func NewProvider(logger log.Logger, reg prometheus.Registerer, dialTimeout time.Duration) *Provider {
p := &Provider{
resolver: &memcachedAutoDiscovery{dialTimeout: dialTimeout},
clusterConfigs: map[string]*clusterConfig{},
configVersion: extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{
Name: "auto_discovery_config_version",
Help: "The current auto discovery config version",
}, []string{"addr"}),
resolvedAddresses: extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{
Name: "auto_discovery_resolved_addresses",
Help: "The number of memcached nodes found via auto discovery",
}, []string{"addr"}),
resolverLookupsCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "auto_discovery_total",
Help: "The number of memcache auto discovery attempts",
}),
resolverFailuresCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "auto_discovery_failures_total",
Help: "The number of memcache auto discovery failures",
}),
logger: logger,
}
return p
}

// Resolve stores a list of nodes auto-discovered from the provided addresses.
func (p *Provider) Resolve(ctx context.Context, addresses []string) error {
clusterConfigs := map[string]*clusterConfig{}
errs := errutil.MultiError{}

for _, address := range addresses {
clusterConfig, err := p.resolver.Resolve(ctx, address)
p.resolverLookupsCount.Inc()

if err != nil {
level.Warn(p.logger).Log(
"msg", "failed to perform auto-discovery for memcached",
"address", address,
)
errs.Add(err)
p.resolverFailuresCount.Inc()

// Use cached values.
p.RLock()
clusterConfigs[address] = p.clusterConfigs[address]
p.RUnlock()
} else {
clusterConfigs[address] = clusterConfig
}
}

p.Lock()
defer p.Unlock()

p.resolvedAddresses.ResetTx()
p.configVersion.ResetTx()
for address, config := range clusterConfigs {
p.resolvedAddresses.WithLabelValues(address).Set(float64(len(config.nodes)))
p.configVersion.WithLabelValues(address).Set(float64(config.version))
}
p.resolvedAddresses.Submit()
p.configVersion.Submit()

p.clusterConfigs = clusterConfigs

return errs.Err()
}

// Addresses returns the latest addresses present in the Provider.
func (p *Provider) Addresses() []string {
p.RLock()
defer p.RUnlock()

var result []string
for _, config := range p.clusterConfigs {
for _, node := range config.nodes {
result = append(result, fmt.Sprintf("%s:%d", node.dns, node.port))
}
}
return result
}
81 changes: 81 additions & 0 deletions pkg/discovery/memcache/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package memcache

import (
"context"
"sort"
"testing"
"time"

"github.com/pkg/errors"

"github.com/go-kit/kit/log"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestProviderUpdatesAddresses(t *testing.T) {
ctx := context.TODO()
clusters := []string{"memcached-cluster-1", "memcached-cluster-2"}
provider := NewProvider(log.NewNopLogger(), nil, 5*time.Second)
resolver := mockResolver{
configs: map[string]*clusterConfig{
"memcached-cluster-1": {nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}}},
"memcached-cluster-2": {nodes: []node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
},
}
provider.resolver = &resolver

testutil.Ok(t, provider.Resolve(ctx, clusters))
addresses := provider.Addresses()
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)

resolver.configs = map[string]*clusterConfig{
"memcached-cluster-1": {nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}, {dns: "dns-3", ip: "ip-3", port: 11211}}},
"memcached-cluster-2": {nodes: []node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
}

testutil.Ok(t, provider.Resolve(ctx, clusters))
addresses = provider.Addresses()
sort.Strings(addresses)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080", "dns-3:11211"}, addresses)
}

func TestProviderDoesNotUpdateAddressIfFailed(t *testing.T) {
ctx := context.TODO()
clusters := []string{"memcached-cluster-1", "memcached-cluster-2"}
provider := NewProvider(log.NewNopLogger(), nil, 5*time.Second)
resolver := mockResolver{
configs: map[string]*clusterConfig{
"memcached-cluster-1": {nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}}},
"memcached-cluster-2": {nodes: []node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
},
}
provider.resolver = &resolver

testutil.Ok(t, provider.Resolve(ctx, clusters))
addresses := provider.Addresses()
sort.Strings(addresses)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)

resolver.configs = nil
resolver.err = errors.New("oops")

testutil.NotOk(t, provider.Resolve(ctx, clusters))
addresses = provider.Addresses()
sort.Strings(addresses)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)
}

type mockResolver struct {
configs map[string]*clusterConfig
err error
}

func (r *mockResolver) Resolve(_ context.Context, address string) (*clusterConfig, error) {
if r.err != nil {
return nil, r.err
}
return r.configs[address], nil
}
Loading

0 comments on commit afd423f

Please sign in to comment.