Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Receive] Fix race condition when adding multiple new tenants at once #7941

Merged
merged 5 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7893](https://github.com/thanos-io/thanos/pull/7893) Sidecar: Fix retrieval of external labels for Prometheus v3.0.0.
- [#7903](https://github.com/thanos-io/thanos/pull/7903) Query: Fix panic on regex store matchers.
- [#7915](https://github.com/thanos-io/thanos/pull/7915) Store: Close block series client at the end to not reuse chunk buffer.
- [#7941](https://github.com/thanos-io/thanos/pull/7941) Receive: Fix race condition when adding multiple new tenants, see [issue-7892](https://github.com/thanos-io/thanos/issues/7892).

### Added

Expand Down
2 changes: 1 addition & 1 deletion docs/sharding.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Queries against store gateway which are touching large number of blocks (no matt

# Relabelling

Similar to [promtail](https://grafana.com/docs/loki/latest/send-data/promtail/configuration/#relabel_configs) this config follows native [Prometheus relabel-config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) syntax.
Similar to [promtail](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#relabel_configs) this config follows native [Prometheus relabel-config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) syntax.

Currently, thanos only supports the following relabel actions:

Expand Down
140 changes: 66 additions & 74 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,8 @@ type MultiTSDB struct {
hashFunc metadata.HashFunc
hashringConfigs []HashringConfig

tsdbClients []store.Client
tsdbClientsNeedUpdate bool

exemplarClients map[string]*exemplars.TSDB
exemplarClientsNeedUpdate bool
tsdbClients []store.Client
exemplarClients map[string]*exemplars.TSDB

metricNameFilterEnabled bool

Expand Down Expand Up @@ -117,19 +114,19 @@ func NewMultiTSDB(
}

mt := &MultiTSDB{
dataDir: dataDir,
logger: log.With(l, "component", "multi-tsdb"),
reg: reg,
tsdbOpts: tsdbOpts,
mtx: &sync.RWMutex{},
tenants: map[string]*tenant{},
labels: labels,
tsdbClientsNeedUpdate: true,
exemplarClientsNeedUpdate: true,
tenantLabelName: tenantLabelName,
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
hashFunc: hashFunc,
dataDir: dataDir,
logger: log.With(l, "component", "multi-tsdb"),
reg: reg,
tsdbOpts: tsdbOpts,
mtx: &sync.RWMutex{},
tenants: map[string]*tenant{},
labels: labels,
tsdbClients: make([]store.Client, 0),
exemplarClients: map[string]*exemplars.TSDB{},
tenantLabelName: tenantLabelName,
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
hashFunc: hashFunc,
}

for _, option := range options {
Expand All @@ -139,6 +136,49 @@ func NewMultiTSDB(
return mt
}

// testGetTenant looks up the tenant registered under tenantID while holding
// the read lock, so tests do not touch t.tenants directly. It returns nil
// when no tenant is registered under that ID.
func (t *MultiTSDB) testGetTenant(tenantID string) *tenant {
	t.mtx.RLock()
	found := t.tenants[tenantID]
	t.mtx.RUnlock()
	return found
}

// updateTSDBClients rebuilds the cached list of store clients from the
// current set of tenants, skipping tenants whose client is not available yet
// (client() returns nil).
//
// The list is built in a freshly allocated slice and then swapped in, rather
// than truncating and refilling t.tsdbClients. TSDBLocalClients hands the
// cached slice out to callers, who keep reading it after releasing the lock;
// reusing the backing array would overwrite elements underneath them.
//
// It does not take t.mtx itself; callers are expected to hold the write lock.
func (t *MultiTSDB) updateTSDBClients() {
	clients := make([]store.Client, 0, len(t.tenants))
	for _, tenant := range t.tenants {
		if c := tenant.client(); c != nil {
			clients = append(clients, c)
		}
	}
	t.tsdbClients = clients
}

// addTenantUnlocked registers newTenant under tenantID and refreshes the
// cached client views. "Unlocked" means this method does not take t.mtx
// itself: the caller must already hold the write lock.
//
// exemplarClients is replaced with a fresh map instead of being written in
// place, so a map previously returned by TSDBExemplars is never mutated while
// a caller may still be reading it.
func (t *MultiTSDB) addTenantUnlocked(tenantID string, newTenant *tenant) {
	t.tenants[tenantID] = newTenant
	t.updateTSDBClients()

	// Read the exemplars client once so the nil check and the stored value
	// cannot disagree.
	ex := newTenant.exemplars()
	if ex == nil {
		return
	}

	next := make(map[string]*exemplars.TSDB, len(t.exemplarClients)+1)
	for id, client := range t.exemplarClients {
		next[id] = client
	}
	next[tenantID] = ex
	t.exemplarClients = next
}

// addTenantLocked is the self-locking variant of addTenantUnlocked: it takes
// the write lock for the duration of the registration and releases it on
// return. Call it only when t.mtx is NOT already held.
func (t *MultiTSDB) addTenantLocked(tenantID string, newTenant *tenant) {
	mu := t.mtx
	mu.Lock()
	defer mu.Unlock()

	t.addTenantUnlocked(tenantID, newTenant)
}

// removeTenantUnlocked drops tenantID from the tenant registry and from the
// cached client views. "Unlocked" means this method does not take t.mtx
// itself: the caller must already hold the write lock.
//
// exemplarClients is replaced with a fresh map instead of being deleted from
// in place, so a map previously returned by TSDBExemplars is never mutated
// while a caller may still be reading it.
func (t *MultiTSDB) removeTenantUnlocked(tenantID string) {
	delete(t.tenants, tenantID)

	// Only rebuild the exemplar view when the tenant is actually present.
	if _, ok := t.exemplarClients[tenantID]; ok {
		next := make(map[string]*exemplars.TSDB, len(t.exemplarClients)-1)
		for id, client := range t.exemplarClients {
			if id != tenantID {
				next[id] = client
			}
		}
		t.exemplarClients = next
	}

	t.updateTSDBClients()
}

// removeTenantLocked is the self-locking variant of removeTenantUnlocked: it
// takes the write lock for the duration of the removal and releases it on
// return. Call it only when t.mtx is NOT already held.
func (t *MultiTSDB) removeTenantLocked(tenantID string) {
	mu := t.mtx
	mu.Lock()
	defer mu.Unlock()

	t.removeTenantUnlocked(tenantID)
}

type localClient struct {
store *store.TSDBStore

Expand Down Expand Up @@ -433,9 +473,7 @@ func (t *MultiTSDB) Prune(ctx context.Context) error {
}

level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID)
delete(t.tenants, tenantID)
t.tsdbClientsNeedUpdate = true
t.exemplarClientsNeedUpdate = true
t.removeTenantUnlocked(tenantID)
}

return merr.Err()
Expand Down Expand Up @@ -595,58 +633,17 @@ func (t *MultiTSDB) RemoveLockFilesIfAny() error {
return merr.Err()
}

// TSDBLocalClients should be used as read-only.
func (t *MultiTSDB) TSDBLocalClients() []store.Client {
t.mtx.RLock()
if !t.tsdbClientsNeedUpdate {
t.mtx.RUnlock()
return t.tsdbClients
}

t.mtx.RUnlock()
t.mtx.Lock()
defer t.mtx.Unlock()
if !t.tsdbClientsNeedUpdate {
return t.tsdbClients
}

res := make([]store.Client, 0, len(t.tenants))
for _, tenant := range t.tenants {
client := tenant.client()
if client != nil {
res = append(res, client)
}
}

t.tsdbClientsNeedUpdate = false
t.tsdbClients = res

defer t.mtx.RUnlock()
return t.tsdbClients
}

// TSDBExemplars should be used as read-only.
func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB {
t.mtx.RLock()
if !t.exemplarClientsNeedUpdate {
t.mtx.RUnlock()
return t.exemplarClients
}
t.mtx.RUnlock()
t.mtx.Lock()
defer t.mtx.Unlock()

if !t.exemplarClientsNeedUpdate {
return t.exemplarClients
}

res := make(map[string]*exemplars.TSDB, len(t.tenants))
for k, tenant := range t.tenants {
e := tenant.exemplars()
if e != nil {
res[k] = e
}
}

t.exemplarClientsNeedUpdate = false
t.exemplarClients = res
defer t.mtx.RUnlock()
return t.exemplarClients
}

Expand Down Expand Up @@ -740,11 +737,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
nil,
)
if err != nil {
t.mtx.Lock()
delete(t.tenants, tenantID)
t.tsdbClientsNeedUpdate = true
t.exemplarClientsNeedUpdate = true
t.mtx.Unlock()
t.removeTenantLocked(tenantID)
return err
}
var ship *shipper.Shipper
Expand All @@ -767,6 +760,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
options = append(options, store.WithCuckooMetricNameStoreFilter())
}
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset))
t.addTenantLocked(tenantID, tenant) // need to update the client list once store is ready & client != nil
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
level.Info(logger).Log("msg", "TSDB is now ready")
return nil
}
Expand Down Expand Up @@ -795,9 +789,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan
}

tenant = newTenant()
t.tenants[tenantID] = tenant
t.tsdbClientsNeedUpdate = true
t.exemplarClientsNeedUpdate = true
t.addTenantUnlocked(tenantID, tenant)
t.mtx.Unlock()

logger := log.With(t.logger, "tenant", tenantID)
Expand Down
49 changes: 47 additions & 2 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package receive

import (
"context"
"fmt"
"io"
"math"
"os"
Expand Down Expand Up @@ -193,7 +194,7 @@ func TestMultiTSDB(t *testing.T) {
testutil.Ok(t, m.Open())
testutil.Ok(t, appendSample(m, testTenant, time.Now()))

tenant := m.tenants[testTenant]
tenant := m.testGetTenant(testTenant)
db := tenant.readyStorage().Get()

testutil.Equals(t, 0, len(db.Blocks()))
Expand Down Expand Up @@ -541,6 +542,47 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) {
testutil.Equals(t, 1, len(m.TSDBLocalClients()))
}

// TestMultiTSDBAddNewTenant writes to many previously unseen tenants
// concurrently while other goroutines read the local client list, then checks
// that every tenant ended up with a client. It is meant to be run with -race.
func TestMultiTSDBAddNewTenant(t *testing.T) {
	t.Parallel()
	const iterations = 10
	// This test detects race conditions, so we run it multiple times to increase the chance of catching the issue.
	for i := 0; i < iterations; i++ {
		t.Run(fmt.Sprintf("iteration-%d", i), func(t *testing.T) {
			dir := t.TempDir()
			m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(),
				&tsdb.Options{
					MinBlockDuration:  (2 * time.Hour).Milliseconds(),
					MaxBlockDuration:  (2 * time.Hour).Milliseconds(),
					RetentionDuration: (6 * time.Hour).Milliseconds(),
				},
				labels.FromStrings("replica", "test"),
				"tenant_id",
				objstore.NewInMemBucket(),
				false,
				metadata.NoneFunc,
			)
			defer func() { testutil.Ok(t, m.Close()) }()

			concurrency := 50
			var wg sync.WaitGroup
			for n := 0; n < concurrency; n++ {
				// Track both the writer and the reader so neither can
				// outlive the subtest and run against a closed MultiTSDB.
				wg.Add(2)
				// simulate remote write with new tenant concurrently
				go func(n int) {
					defer wg.Done()
					tenantID := fmt.Sprintf("tenant-%d", n)
					// Report with t.Errorf: fatal helpers (t.FailNow/t.Fatal)
					// must only be called from the goroutine running the test.
					if err := appendSample(m, tenantID, time.UnixMilli(int64(10))); err != nil {
						t.Errorf("append sample for %s: %v", tenantID, err)
					}
				}(n)
				// simulate read request concurrently
				go func() {
					defer wg.Done()
					m.TSDBLocalClients()
				}()
			}
			wg.Wait()
			testutil.Equals(t, concurrency, len(m.TSDBLocalClients()))
		})
	}
}

func TestAlignedHeadFlush(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -801,7 +843,10 @@ func appendSampleWithLabels(m *MultiTSDB, tenant string, lbls labels.Labels, tim

func queryLabelValues(ctx context.Context, m *MultiTSDB) error {
proxy := store.NewProxyStore(nil, nil, func() []store.Client {
clients := m.TSDBLocalClients()
m.mtx.Lock()
defer m.mtx.Unlock()
clients := make([]store.Client, len(m.tsdbClients))
copy(clients, m.tsdbClients)
if len(clients) > 0 {
clients[0] = &slowClient{clients[0]}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/receive/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) {

for _, c := range tc.cfg {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand Down Expand Up @@ -294,7 +294,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) {

for _, c := range initialConfig {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand All @@ -319,7 +319,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) {

for _, c := range changedConfig {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand Down Expand Up @@ -534,7 +534,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) {

for _, c := range initialConfig {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand Down Expand Up @@ -704,7 +704,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) {

for _, c := range initialConfig {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand Down Expand Up @@ -778,7 +778,7 @@ func TestReceiverLabelsNotOverwrittenByExternalLabels(t *testing.T) {

for _, c := range cfg {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand Down
Loading