
Commit

Added memcached support to the blocks storage index cache
Signed-off-by: Marco Pracucci <[email protected]>
pracucci committed Mar 19, 2020
1 parent 305b16c commit 0233519
Showing 22 changed files with 1,093 additions and 288 deletions.
1 change: 1 addition & 0 deletions .circleci/config.yml
@@ -149,6 +149,7 @@ jobs:
docker pull quay.io/cortexproject/cortex:v0.6.0
docker pull shopify/bigtable-emulator:0.1.0
docker pull rinscy/cassandra:3.11.0
docker pull memcached:1.6.1
- run:
name: Integration Tests
command: |
5 changes: 5 additions & 0 deletions development/tsdb-blocks-storage-s3/config/cortex.yaml
@@ -39,6 +39,11 @@ tsdb:
bucket_store:
sync_dir: /tmp/cortex-tsdb-querier

index_cache:
backend: memcached
memcached:
addresses: dns+memcached:11211

s3:
endpoint: minio:9000
bucket_name: cortex-tsdb
3 changes: 3 additions & 0 deletions development/tsdb-blocks-storage-s3/docker-compose.yml
@@ -18,6 +18,9 @@ services:
volumes:
- .data-minio:/data:delegated

memcached:
image: memcached:1.6

configstore:
image: nginx
volumes:
53 changes: 48 additions & 5 deletions docs/configuration/config-file-reference.md
@@ -2227,11 +2227,6 @@ bucket_store:
# CLI flag: -experimental.tsdb.bucket-store.sync-interval
[sync_interval: <duration> | default = 5m0s]
# Size in bytes of in-memory index cache used to speed up blocks index lookups
# (shared between all tenants).
# CLI flag: -experimental.tsdb.bucket-store.index-cache-size-bytes
[index_cache_size_bytes: <int> | default = 1073741824]
# Max size - in bytes - of a per-tenant chunk pool, used to reduce memory
# allocations.
# CLI flag: -experimental.tsdb.bucket-store.max-chunk-pool-bytes
@@ -2271,6 +2266,54 @@ bucket_store:
# CLI flag: -experimental.tsdb.bucket-store.consistency-delay
[consistency_delay: <duration> | default = 0s]
index_cache:
# The index cache backend type. Supported values: inmemory, memcached.
# CLI flag: -experimental.tsdb.bucket-store.index-cache.backend
[backend: <string> | default = "inmemory"]
inmemory:
# Maximum size in bytes of in-memory index cache used to speed up blocks
# index lookups (shared between all tenants).
# CLI flag: -experimental.tsdb.bucket-store.index-cache.inmemory.max-size-bytes
[max_size_bytes: <int> | default = 1073741824]
memcached:
# Comma separated list of memcached addresses. Supported prefixes are:
# dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query),
# dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup made after
# that).
# CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.addresses
[addresses: <string> | default = ""]
# The socket read/write timeout.
# CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.timeout
[timeout: <duration> | default = 100ms]
# The maximum number of idle connections that will be maintained per
# address.
# CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.max-idle-connections
[max_idle_connections: <int> | default = 16]
# The maximum number of concurrent asynchronous operations that can occur.
# CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.max-async-concurrency
[max_async_concurrency: <int> | default = 50]
# The maximum number of enqueued asynchronous operations allowed.
# CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.max-async-buffer-size
[max_async_buffer_size: <int> | default = 10000]
# The maximum number of concurrent connections running get operations. If
# set to 0, concurrency is unlimited.
# CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.max-get-multi-concurrency
[max_get_multi_concurrency: <int> | default = 100]
# The maximum number of keys a single underlying get operation should fetch.
# If more keys are specified, keys are internally split into multiple
# batches and fetched concurrently, honoring the max concurrency. If set
# to 0, the max batch size is unlimited.
# CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.max-get-multi-batch-size
[max_get_multi_batch_size: <int> | default = 0]
# How frequently Cortex tries to compact the TSDB head. A block is only
# created if the data covers the smallest block range. Must be greater than 0
# and at most 5 minutes.
# CLI flag: -experimental.tsdb.head-compaction-interval
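For reference, a minimal querier configuration enabling the new memcached backend could look like the following sketch. The address matches the development docker-compose setup from this commit and is deployment-specific; omitted options keep the defaults documented above.

```yaml
tsdb:
  bucket_store:
    index_cache:
      backend: memcached
      memcached:
        addresses: dns+memcached:11211
        timeout: 100ms
        max_idle_connections: 16
```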
53 changes: 48 additions & 5 deletions docs/operations/blocks-storage.md
@@ -134,11 +134,6 @@ tsdb:
# CLI flag: -experimental.tsdb.bucket-store.sync-interval
[sync_interval: <duration> | default = 5m0s]
# Size in bytes of in-memory index cache used to speed up blocks index
# lookups (shared between all tenants).
# CLI flag: -experimental.tsdb.bucket-store.index-cache-size-bytes
[index_cache_size_bytes: <int> | default = 1073741824]
# Max size - in bytes - of a per-tenant chunk pool, used to reduce memory
# allocations.
# CLI flag: -experimental.tsdb.bucket-store.max-chunk-pool-bytes
@@ -178,6 +173,54 @@ tsdb:
# CLI flag: -experimental.tsdb.bucket-store.consistency-delay
[consistency_delay: <duration> | default = 0s]
index_cache:
# The index cache backend type. Supported values: inmemory, memcached.
# CLI flag: -experimental.tsdb.bucket-store.index-cache.backend
[backend: <string> | default = "inmemory"]
inmemory:
# Maximum size in bytes of in-memory index cache used to speed up blocks
# index lookups (shared between all tenants).
# CLI flag: -experimental.tsdb.bucket-store.index-cache.inmemory.max-size-bytes
[max_size_bytes: <int> | default = 1073741824]
memcached:
# Comma separated list of memcached addresses. Supported prefixes are:
# dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV
# query), dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup
# made after that).
# CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.addresses
[addresses: <string> | default = ""]
# The socket read/write timeout.
# CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.timeout
[timeout: <duration> | default = 100ms]
# The maximum number of idle connections that will be maintained per
# address.
# CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.max-idle-connections
[max_idle_connections: <int> | default = 16]
# The maximum number of concurrent asynchronous operations that can occur.
# CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.max-async-concurrency
[max_async_concurrency: <int> | default = 50]
# The maximum number of enqueued asynchronous operations allowed.
# CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.max-async-buffer-size
[max_async_buffer_size: <int> | default = 10000]
# The maximum number of concurrent connections running get operations.
# If set to 0, concurrency is unlimited.
# CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.max-get-multi-concurrency
[max_get_multi_concurrency: <int> | default = 100]
# The maximum number of keys a single underlying get operation should
# fetch. If more keys are specified, keys are internally split into
# multiple batches and fetched concurrently, honoring the max
# concurrency. If set to 0, the max batch size is unlimited.
# CLI flag: -experimental.tsdb.bucket-store.index-cache.memcached.max-get-multi-batch-size
[max_get_multi_batch_size: <int> | default = 0]
# How frequently Cortex tries to compact the TSDB head. A block is only
# created if the data covers the smallest block range. Must be greater
# than 0 and at most 5 minutes.
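The same setup can be expressed through the equivalent CLI flags documented above, again with a deployment-specific address:

```
-experimental.tsdb.bucket-store.index-cache.backend=memcached
-experimental.tsdb.bucket-store.index-cache.memcached.addresses=dns+memcached:11211
```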
21 changes: 21 additions & 0 deletions integration/e2e/cache/cache.go
@@ -0,0 +1,21 @@
package e2ecache

import (
"github.com/cortexproject/cortex/integration/e2e"
)

const (
MemcachedPort = 11211
)

func NewMemcached() *e2e.ConcreteService {
return e2e.NewConcreteService(
"memcached",
// If you change the image tag, remember to update it in the preloading done
// by CircleCI too (see .circleci/config.yml).
"memcached:1.6.1",
nil,
e2e.NewTCPReadinessProbe(MemcachedPort),
MemcachedPort,
)
}
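As used in integration/querier_test.go later in this commit, the service plugs into an e2e scenario as sketched below; the scenario `s`, the `testing.T` value `t`, and the `flags` map are assumed from the surrounding test.

```go
// Sketch, mirroring integration/querier_test.go: start the memcached
// service and point the querier's index cache at it.
memcached := e2ecache.NewMemcached()
require.NoError(t, s.StartAndWaitReady(memcached))

// Build the "dns+" prefixed address, resolved via an A/AAAA lookup.
flags["-experimental.tsdb.bucket-store.index-cache.memcached.addresses"] =
	"dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort)
```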
29 changes: 29 additions & 0 deletions integration/e2e/service.go
@@ -6,6 +6,7 @@ import (
"fmt"
"io/ioutil"
"math"
"net"
"os/exec"
"regexp"
"strconv"
@@ -390,6 +391,34 @@ func (p *HTTPReadinessProbe) Ready(service *ConcreteService) (err error) {
return fmt.Errorf("got unexpected status code: %v, expected: %v", res.StatusCode, p.expectedStatus)
}

// TCPReadinessProbe checks readiness by ensuring a TCP connection can be established.
type TCPReadinessProbe struct {
port int
}

func NewTCPReadinessProbe(port int) *TCPReadinessProbe {
return &TCPReadinessProbe{
port: port,
}
}

func (p *TCPReadinessProbe) Ready(service *ConcreteService) (err error) {
endpoint := service.Endpoint(p.port)
if endpoint == "" {
return fmt.Errorf("cannot get service endpoint for port %d", p.port)
} else if endpoint == "stopped" {
return errors.New("service has stopped")
}

conn, err := net.DialTimeout("tcp", endpoint, time.Second)
if err != nil {
return err
}

_ = conn.Close()
return nil
}

// CmdReadinessProbe checks readiness by `Exec`ing a command (within the container) that must return 0 for the service to be considered ready
type CmdReadinessProbe struct {
cmd *Command
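The probe slots into NewConcreteService exactly as e2ecache.NewMemcached does above. Purely as an illustration (not part of this commit), any other TCP-speaking dependency could be wrapped the same way:

```go
// Hypothetical example (not in this commit): a Redis e2e service reusing
// the new TCP readiness probe, mirroring e2ecache.NewMemcached. The image
// tag and port are illustrative.
func NewRedis() *e2e.ConcreteService {
	const redisPort = 6379
	return e2e.NewConcreteService(
		"redis",
		"redis:5.0",
		nil, // no custom command
		e2e.NewTCPReadinessProbe(redisPort),
		redisPort,
	)
}
```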
49 changes: 42 additions & 7 deletions integration/querier_test.go
@@ -11,8 +11,10 @@ import (
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func TestQuerierWithBlocksStorage(t *testing.T) {
@@ -29,6 +31,17 @@ func TestQuerierWithBlocksStorage(t *testing.T) {
"-querier.ingester-streaming": "true",
}),
},
"querier running with inmemory index cache": {
flags: mergeFlags(BlocksStorageFlags, map[string]string{
"-experimental.tsdb.bucket-store.index-cache.backend": "inmemory",
}),
},
"queintegration/e2e/service.gorier running with memcached index cache": {
flags: mergeFlags(BlocksStorageFlags, map[string]string{
// The address will be inject during the test execution because it's dynamic.
"-experimental.tsdb.bucket-store.index-cache.backend": "memcached",
}),
},
}

for testName, testCfg := range tests {
@@ -48,10 +61,20 @@ func TestQuerierWithBlocksStorage(t *testing.T) {
"-experimental.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
})

// Detect the index cache backend from flags.
indexCacheBackend := tsdb.IndexCacheBackendDefault
if flags["-experimental.tsdb.bucket-store.index-cache.backend"] != "" {
indexCacheBackend = flags["-experimental.tsdb.bucket-store.index-cache.backend"]
}

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-experimental.tsdb.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))
memcached := e2ecache.NewMemcached()
require.NoError(t, s.StartAndWaitReady(consul, minio, memcached))

// Add the memcached address to the flags.
flags["-experimental.tsdb.bucket-store.index-cache.memcached.addresses"] = "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort)

// Start Cortex components.
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
@@ -121,19 +144,31 @@ func TestQuerierWithBlocksStorage(t *testing.T) {
assert.Equal(t, expectedVector3, result.(model.Vector))

// Check the in-memory index cache metrics (in the querier).
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items")) // 2 series both for postings and series cache
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items_added_total")) // 2 series both for postings and series cache
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(0), "cortex_querier_blocks_index_cache_hits_total")) // no cache hit because the cache was empty
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(7), "cortex_querier_blocks_index_cache_requests_total"))
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(0), "cortex_querier_blocks_index_cache_hits_total")) // no cache hit because the cache was empty

if indexCacheBackend == tsdb.IndexCacheBackendInMemory {
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items")) // 2 series both for postings and series cache
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items_added_total")) // 2 series both for postings and series cache
} else if indexCacheBackend == tsdb.IndexCacheBackendMemcached {
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(11), "cortex_querier_blocks_index_cache_memcached_operations_total")) // 7 gets + 4 sets
}

// Query back again the 1st series from storage. This time it should use the index cache.
result, err = c.Query("series_1", series1Timestamp)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector1, result.(model.Vector))

require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items")) // as before
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items_added_total")) // as before
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2), "cortex_querier_blocks_index_cache_hits_total")) // this time the index cache was used
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(7+2), "cortex_querier_blocks_index_cache_requests_total"))
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2), "cortex_querier_blocks_index_cache_hits_total")) // this time the index cache was used

if indexCacheBackend == tsdb.IndexCacheBackendInMemory {
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items")) // as before
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items_added_total")) // as before
} else if indexCacheBackend == tsdb.IndexCacheBackendMemcached {
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(11+2), "cortex_querier_blocks_index_cache_memcached_operations_total")) // as before + 2 gets
}
})
}
}
6 changes: 3 additions & 3 deletions pkg/querier/block_store.go
@@ -41,7 +41,7 @@ type UserStore struct {
client storepb.StoreClient
logLevel logging.Level
bucketStoreMetrics *tsdbBucketStoreMetrics
indexCacheMetrics *tsdbIndexCacheMetrics
indexCacheMetrics prometheus.Collector

// Index cache shared across all tenants.
indexCache storecache.IndexCache
@@ -67,7 +67,7 @@ func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel loggin
stores: map[string]*store.BucketStore{},
logLevel: logLevel,
bucketStoreMetrics: newTSDBBucketStoreMetrics(),
indexCacheMetrics: newTSDBIndexCacheMetrics(indexCacheRegistry),
indexCacheMetrics: tsdb.MustNewIndexCacheMetrics(cfg.BucketStore.IndexCache.Backend, indexCacheRegistry),
syncTimes: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_querier_blocks_sync_seconds",
Help: "The total time it takes to perform a sync of the stores",
@@ -77,7 +77,7 @@ func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel loggin

// Init the index cache.
var err error
if u.indexCache, err = tsdb.NewIndexCache(cfg.BucketStore, logger, indexCacheRegistry); err != nil {
if u.indexCache, err = tsdb.NewIndexCache(cfg.BucketStore.IndexCache, logger, indexCacheRegistry); err != nil {
return nil, errors.Wrap(err, "create index cache")
}

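The new tsdb.NewIndexCache and tsdb.MustNewIndexCacheMetrics factories live in pkg/storage/tsdb, whose diff is not shown in this excerpt. Inferred purely from the call sites above and from the backend constants used in integration/querier_test.go, the dispatch plausibly looks like the sketch below; the config type, helper constructors, and error value are assumptions, not the actual implementation.

```go
// Sketch of the backend dispatch, inferred from the call sites in this diff.
// IndexCacheConfig, both helper constructors, and the error value are assumed.
func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
	switch cfg.Backend {
	case IndexCacheBackendInMemory: // "inmemory", the default
		return newInMemoryIndexCache(cfg, logger, registerer)
	case IndexCacheBackendMemcached: // "memcached"
		return newMemcachedIndexCache(cfg, logger, registerer)
	default:
		return nil, errUnsupportedIndexCacheBackend
	}
}
```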
(Diffs for the remaining 13 changed files are not shown.)
