Skip to content

Commit

Permalink
memorize tsdb local clients without race condition
Browse files Browse the repository at this point in the history
Signed-off-by: Yi Jin <[email protected]>
  • Loading branch information
jnyi committed Nov 27, 2024
1 parent 3f02b08 commit 06e6aa5
Showing 1 changed file with 47 additions and 24 deletions.
71 changes: 47 additions & 24 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type MultiTSDB struct {
hashFunc metadata.HashFunc
hashringConfigs []HashringConfig

tsdbClients []store.Client
exemplarClients map[string]*exemplars.TSDB

metricNameFilterEnabled bool

headExpandedPostingsCacheSize uint64
Expand Down Expand Up @@ -118,6 +121,8 @@ func NewMultiTSDB(
mtx: &sync.RWMutex{},
tenants: map[string]*tenant{},
labels: labels,
tsdbClients: make([]store.Client, 0),
exemplarClients: map[string]*exemplars.TSDB{},
tenantLabelName: tenantLabelName,
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
Expand All @@ -131,6 +136,42 @@ func NewMultiTSDB(
return mt
}

func (t *MultiTSDB) updateTSDBClients() {
t.tsdbClients = t.tsdbClients[:0]
for _, tenant := range t.tenants {
client := tenant.client()
if client != nil {
t.tsdbClients = append(t.tsdbClients, client)
}
}
}

func (t *MultiTSDB) addTenantUnlocked(tenantID string, newTenant *tenant) {
t.tenants[tenantID] = newTenant
t.updateTSDBClients()
if newTenant.exemplars() != nil {
t.exemplarClients[tenantID] = newTenant.exemplars()
}
}

func (t *MultiTSDB) addTenantLocked(tenantID string, newTenant *tenant) {
t.mtx.Lock()
defer t.mtx.Unlock()
t.addTenantUnlocked(tenantID, newTenant)
}

func (t *MultiTSDB) removeTenantUnlocked(tenantID string) {
delete(t.tenants, tenantID)
delete(t.exemplarClients, tenantID)
t.updateTSDBClients()
}

func (t *MultiTSDB) removeTenantLocked(tenantID string) {
t.mtx.Lock()
defer t.mtx.Unlock()
t.removeTenantUnlocked(tenantID)
}

type localClient struct {
store *store.TSDBStore

Expand Down Expand Up @@ -425,7 +466,7 @@ func (t *MultiTSDB) Prune(ctx context.Context) error {
}

level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID)
delete(t.tenants, tenantID)
t.removeTenantUnlocked(tenantID)
}

return merr.Err()
Expand Down Expand Up @@ -588,30 +629,13 @@ func (t *MultiTSDB) RemoveLockFilesIfAny() error {
func (t *MultiTSDB) TSDBLocalClients() []store.Client {
t.mtx.RLock()
defer t.mtx.RUnlock()

res := make([]store.Client, 0, len(t.tenants))
for _, tenant := range t.tenants {
client := tenant.client()
if client != nil {
res = append(res, client)
}
}

return res
return t.tsdbClients
}

func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB {
t.mtx.RLock()
defer t.mtx.RUnlock()

res := make(map[string]*exemplars.TSDB, len(t.tenants))
for k, tenant := range t.tenants {
e := tenant.exemplars()
if e != nil {
res[k] = e
}
}
return res
return t.exemplarClients
}

func (t *MultiTSDB) TenantStats(limit int, statsByLabelName string, tenantIDs ...string) []status.TenantStats {
Expand Down Expand Up @@ -704,9 +728,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
nil,
)
if err != nil {
t.mtx.Lock()
delete(t.tenants, tenantID)
t.mtx.Unlock()
t.removeTenantLocked(tenantID)
return err
}
var ship *shipper.Shipper
Expand All @@ -729,6 +751,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
options = append(options, store.WithCuckooMetricNameStoreFilter())
}
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset))
t.addTenantLocked(tenantID, tenant) // need to update the client list once store is ready & client != nil
level.Info(logger).Log("msg", "TSDB is now ready")
return nil
}
Expand Down Expand Up @@ -757,7 +780,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan
}

tenant = newTenant()
t.tenants[tenantID] = tenant
t.addTenantUnlocked(tenantID, tenant)
t.mtx.Unlock()

logger := log.With(t.logger, "tenant", tenantID)
Expand Down

0 comments on commit 06e6aa5

Please sign in to comment.