From dd994ecbee2a80984c9badef8f215ba8cd12d8cc Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 31 Jan 2024 14:23:26 -0800 Subject: [PATCH] slims down existing bloomcompactor code to prepare for integration of refactored logic Signed-off-by: Owen Diehl --- pkg/bloomcompactor/bloomcompactor.go | 475 ++---------------- pkg/bloomcompactor/bloomcompactor_test.go | 245 --------- pkg/bloomcompactor/chunkcompactor.go | 245 --------- pkg/bloomcompactor/chunkcompactor_test.go | 229 --------- pkg/bloomcompactor/chunksbatchesiterator.go | 48 -- .../chunksbatchesiterator_test.go | 96 ---- pkg/bloomcompactor/job.go | 85 ---- pkg/bloomcompactor/mergecompactor.go | 150 ------ pkg/bloomcompactor/table_utils.go | 21 - pkg/bloomcompactor/utils.go | 37 -- 10 files changed, 42 insertions(+), 1589 deletions(-) delete mode 100644 pkg/bloomcompactor/bloomcompactor_test.go delete mode 100644 pkg/bloomcompactor/chunkcompactor.go delete mode 100644 pkg/bloomcompactor/chunkcompactor_test.go delete mode 100644 pkg/bloomcompactor/chunksbatchesiterator.go delete mode 100644 pkg/bloomcompactor/chunksbatchesiterator_test.go delete mode 100644 pkg/bloomcompactor/job.go delete mode 100644 pkg/bloomcompactor/mergecompactor.go delete mode 100644 pkg/bloomcompactor/utils.go diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index 34885ae2d3947..365372eb2584d 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -1,198 +1,71 @@ -/* -Bloom-compactor - -This is a standalone service that is responsible for compacting TSDB indexes into bloomfilters. -It creates and merges bloomfilters into an aggregated form, called bloom-blocks. -It maintains a list of references between bloom-blocks and TSDB indexes in files called meta.jsons. - -Bloom-compactor regularly runs to check for changes in meta.jsons and runs compaction only upon changes in TSDBs. 
- -bloomCompactor.Compactor - - | // Read/Write path - bloomshipper.Store** - | - bloomshipper.Shipper - | - bloomshipper.BloomClient - | - ObjectClient - | - .....................service boundary - | - object storage -*/ package bloomcompactor import ( "context" "fmt" - "io" - "math" - "math/rand" - "os" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/backoff" - "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/multierror" "github.com/grafana/dskit/services" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" - - "path/filepath" - - "github.com/google/uuid" "github.com/grafana/loki/pkg/bloomutils" - "github.com/grafana/loki/pkg/storage" + "github.com/grafana/loki/pkg/compactor" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" - "github.com/grafana/loki/pkg/storage/chunk/cache" - chunk_client "github.com/grafana/loki/pkg/storage/chunk/client" - "github.com/grafana/loki/pkg/storage/chunk/client/local" - "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" - "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper" - shipperindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/index" - index_storage "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/storage" - "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb" - tsdbindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" "github.com/grafana/loki/pkg/util" ) +/* +Bloom-compactor + +This is a standalone service that is responsible for compacting TSDB indexes into bloomfilters. +It creates and merges bloomfilters into an aggregated form, called bloom-blocks. +It maintains a list of references between bloom-blocks and TSDB indexes in files called meta.jsons. + +Bloom-compactor regularly runs to check for changes in meta.jsons and runs compaction only upon changes in TSDBs. +*/ type Compactor struct { services.Service - cfg Config - logger log.Logger - schemaCfg config.SchemaConfig - limits Limits + cfg Config + logger log.Logger + limits Limits // temporary workaround until store has implemented read/write shipper interface - bloomShipperClient bloomshipper.StoreAndClient - - // Client used to run operations on the bucket storing bloom blocks. 
- storeClients map[config.DayTime]storeClient + store bloomshipper.StoreAndClient sharding ShardingStrategy metrics *metrics btMetrics *v1.Metrics - reg prometheus.Registerer -} - -type storeClient struct { - object chunk_client.ObjectClient - index index_storage.Client - chunk chunk_client.Client - indexShipper indexshipper.IndexShipper } func New( cfg Config, - storageCfg storage.Config, - schemaConfig config.SchemaConfig, + store bloomshipper.StoreAndClient, + sharding ShardingStrategy, limits Limits, logger log.Logger, - sharding ShardingStrategy, - clientMetrics storage.ClientMetrics, r prometheus.Registerer, ) (*Compactor, error) { c := &Compactor{ - cfg: cfg, - logger: logger, - schemaCfg: schemaConfig, - sharding: sharding, - limits: limits, - reg: r, + cfg: cfg, + store: store, + logger: logger, + sharding: sharding, + limits: limits, } - // TODO(chaudum): Plug in cache - var metasCache cache.Cache - var blocksCache *cache.EmbeddedCache[string, io.ReadCloser] - bloomClient, err := bloomshipper.NewBloomStore(schemaConfig.Configs, storageCfg, clientMetrics, metasCache, blocksCache, logger) - if err != nil { - return nil, err - } - - c.storeClients = make(map[config.DayTime]storeClient) - // initialize metrics c.btMetrics = v1.NewMetrics(prometheus.WrapRegistererWithPrefix("loki_bloom_tokenizer", r)) - - indexShipperReg := prometheus.WrapRegistererWithPrefix("loki_bloom_compactor_tsdb_shipper_", r) - - for i, periodicConfig := range schemaConfig.Configs { - if periodicConfig.IndexType != config.TSDBType { - level.Warn(c.logger).Log("msg", "skipping schema period because index type is not supported", "index_type", periodicConfig.IndexType, "period", periodicConfig.From) - continue - } - - // Configure ObjectClient and IndexShipper for series and chunk management - objectClient, err := storage.NewObjectClient(periodicConfig.ObjectType, storageCfg, clientMetrics) - if err != nil { - return nil, fmt.Errorf("error creating object client '%s': %w", periodicConfig.ObjectType, err) - } - - periodEndTime := config.DayTime{Time: math.MaxInt64} - if i < len(schemaConfig.Configs)-1 { - periodEndTime = config.DayTime{Time: schemaConfig.Configs[i+1].From.Time.Add(-time.Millisecond)} - } - - pReg := prometheus.WrapRegistererWith( - prometheus.Labels{ - "component": fmt.Sprintf( - "index-store-%s-%s", - periodicConfig.IndexType, - periodicConfig.From.String(), - ), - }, indexShipperReg) - pLogger := log.With(logger, "index-store", fmt.Sprintf("%s-%s", periodicConfig.IndexType, periodicConfig.From.String())) - - indexShipper, err := indexshipper.NewIndexShipper( - periodicConfig.IndexTables.PathPrefix, - storageCfg.TSDBShipperConfig, - objectClient, - limits, - nil, - func(p string) (shipperindex.Index, error) { - return tsdb.OpenShippableTSDB(p) - }, - periodicConfig.GetIndexTableNumberRange(periodEndTime), - pReg, - pLogger, - ) - - if err != nil { - return nil, errors.Wrap(err, "create index shipper") - } - - // The ObjectClient does not expose the key encoder it uses, - // so check the concrete type and set the FSEncoder if needed. 
- var keyEncoder chunk_client.KeyEncoder - switch objectClient.(type) { - case *local.FSObjectClient: - keyEncoder = chunk_client.FSEncoder - } - - c.storeClients[periodicConfig.From] = storeClient{ - object: objectClient, - index: index_storage.NewIndexStorageClient(objectClient, periodicConfig.IndexTables.PathPrefix), - chunk: chunk_client.NewClient(objectClient, keyEncoder, schemaConfig), - indexShipper: indexShipper, - } - } - - // temporary workaround until store has implemented read/write shipper interface - c.bloomShipperClient = bloomClient - c.metrics = newMetrics(r) c.metrics.compactionRunInterval.Set(cfg.CompactionInterval.Seconds()) - c.Service = services.NewBasicService(c.starting, c.running, c.stopping) return c, nil @@ -237,40 +110,23 @@ func (c *Compactor) stopping(_ error) error { func (c *Compactor) runCompaction(ctx context.Context) error { var tables []string - for _, sc := range c.storeClients { - // refresh index list cache since previous compaction would have changed the index files in the object store - sc.index.RefreshIndexTableNamesCache(ctx) - tbls, err := sc.index.ListTables(ctx) - if err != nil { - return fmt.Errorf("failed to list tables: %w", err) - } - tables = append(tables, tbls...) - } + // TODO(owen-d): resolve tables // process most recent tables first tablesIntervals := getIntervalsForTables(tables) - sortTablesByRange(tables, tablesIntervals) - - parallelism := c.cfg.MaxCompactionParallelism - if parallelism == 0 { - parallelism = len(tables) - } + compactor.SortTablesByRange(tables) - // TODO(salvacorts): We currently parallelize at the table level. We may want to parallelize at the tenant and job level as well. - // To do that, we should create a worker pool with c.cfg.MaxCompactionParallelism number of workers. - errs := multierror.New() - _ = concurrency.ForEachJob(ctx, len(tables), parallelism, func(ctx context.Context, i int) error { - tableName := tables[i] - logger := log.With(c.logger, "table", tableName) - err := c.compactTable(ctx, logger, tableName, tablesIntervals[tableName]) + // TODO(owen-d): parallelize at the bottom level, not the top level. + // Can dispatch to a queue & wait. 
+ for _, table := range tables { + logger := log.With(c.logger, "table", table) + err := c.compactTable(ctx, logger, table, tablesIntervals[table]) if err != nil { - errs.Add(err) - return nil + level.Error(logger).Log("msg", "failed to compact table", "err", err) + return errors.Wrapf(err, "failed to compact table %s", table) } - return nil - }) - - return errs.Err() + } + return nil } func (c *Compactor) compactTable(ctx context.Context, logger log.Logger, tableName string, tableInterval model.Interval) error { @@ -279,29 +135,13 @@ func (c *Compactor) compactTable(ctx context.Context, logger log.Logger, tableNa return fmt.Errorf("interrupting compaction of table: %w", err) } - schemaCfg, ok := schemaPeriodForTable(c.schemaCfg, tableName) - if !ok { - level.Error(logger).Log("msg", "skipping compaction since we can't find schema for table") - return nil - } - - sc, ok := c.storeClients[schemaCfg.From] - if !ok { - return fmt.Errorf("index store client not found for period starting at %s", schemaCfg.From.String()) - } - - _, tenants, err := sc.index.ListFiles(ctx, tableName, true) - if err != nil { - return fmt.Errorf("failed to list files for table %s: %w", tableName, err) - } + var tenants []string - c.metrics.compactionRunDiscoveredTenants.Add(float64(len(tenants))) level.Info(logger).Log("msg", "discovered tenants from bucket", "users", len(tenants)) - return c.compactUsers(ctx, logger, sc, tableName, tableInterval, tenants) + return c.compactUsers(ctx, logger, tableName, tableInterval, tenants) } -// See: https://github.com/grafana/mimir/blob/34852137c332d4050e53128481f4f6417daee91e/pkg/compactor/compactor.go#L566-L689 -func (c *Compactor) compactUsers(ctx context.Context, logger log.Logger, sc storeClient, tableName string, tableInterval model.Interval, tenants []string) error { +func (c *Compactor) compactUsers(ctx context.Context, logger log.Logger, tableName string, tableInterval model.Interval, tenants []string) error { // Keep track of tenants owned by this shard, so that we can delete the local files for all other users. errs := multierror.New() ownedTenants := make(map[string]struct{}, len(tenants)) @@ -337,7 +177,7 @@ func (c *Compactor) compactUsers(ctx context.Context, logger log.Logger, sc stor ownedTenants[tenant] = struct{}{} start := time.Now() - if err := c.compactTenantWithRetries(ctx, tenantLogger, sc, tableName, tenant); err != nil { + if err := c.compactTenantWithRetries(ctx, tenantLogger, tableName, tenant); err != nil { switch { case errors.Is(err, context.Canceled): // We don't want to count shutdowns as failed compactions because we will pick up with the rest of the compaction after the restart. @@ -362,7 +202,7 @@ func (c *Compactor) compactUsers(ctx context.Context, logger log.Logger, sc stor // TODO: Delete local files for unowned tenants, if there are any. } -func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc storeClient, tableName string, tenant string) error { +func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, tableName string, tenant string) error { level.Info(logger).Log("msg", "starting compaction of tenant") // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). @@ -373,9 +213,8 @@ func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc sto // Tokenizer is not thread-safe so we need one per goroutine. 
nGramLen := c.limits.BloomNGramLength(tenant) nGramSkip := c.limits.BloomNGramSkip(tenant) - bt := v1.NewBloomTokenizer(nGramLen, nGramSkip, c.btMetrics) + _ = v1.NewBloomTokenizer(nGramLen, nGramSkip, c.btMetrics) - errs := multierror.New() rs, err := c.sharding.GetTenantSubRing(tenant).GetAllHealthy(RingOp) if err != nil { return err @@ -385,77 +224,8 @@ func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc sto level.Debug(logger).Log("msg", "got token range for instance", "id", tr.Instance.Id, "min", tr.MinToken, "max", tr.MaxToken) } - // TODO(owen-d): can be optimized to only query for series within the fp range of the compactor shard(s) rather than scanning all series - // and filtering out the ones that don't belong to the compactor shard(s). - _ = sc.indexShipper.ForEach(ctx, tableName, tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error { - if isMultiTenantIndex { - // Skip multi-tenant indexes - level.Debug(logger).Log("msg", "skipping multi-tenant index", "table", tableName, "index", idx.Name()) - return nil - } - - tsdbFile, ok := idx.(*tsdb.TSDBFile) - if !ok { - errs.Add(fmt.Errorf("failed to cast to TSDBFile")) - return nil - } - - tsdbIndex, ok := tsdbFile.Index.(*tsdb.TSDBIndex) - if !ok { - errs.Add(fmt.Errorf("failed to cast to TSDBIndex")) - return nil - } - - var seriesMetas []seriesMeta - - err := tsdbIndex.ForSeries( - ctx, nil, - 0, math.MaxInt64, // TODO: Replace with MaxLookBackPeriod - func(labels labels.Labels, fingerprint model.Fingerprint, chksMetas []tsdbindex.ChunkMeta) { - if !tokenRanges.Contains(uint32(fingerprint)) { - return - } - - temp := make([]tsdbindex.ChunkMeta, len(chksMetas)) - ls := labels.Copy() - _ = copy(temp, chksMetas) - //All seriesMetas given a table within fp of this compactor shard - seriesMetas = append(seriesMetas, seriesMeta{seriesFP: fingerprint, seriesLbs: ls, chunkRefs: temp}) - }, - labels.MustNewMatcher(labels.MatchEqual, "", ""), - ) - - if err != nil { - errs.Add(err) - return nil - } - - if len(seriesMetas) == 0 { - level.Debug(logger).Log("msg", "skipping index because it does not have any matching series", "table", tableName, "index", idx.Name()) - return nil - } - - job := NewJob(tenant, tableName, idx.Path(), seriesMetas) - jobLogger := log.With(logger, "job", job.String()) - c.metrics.compactionRunJobStarted.Inc() - - start := time.Now() - err = c.runCompact(ctx, jobLogger, job, bt, sc) - if err != nil { - c.metrics.compactionRunJobCompleted.WithLabelValues(statusFailure).Inc() - c.metrics.compactionRunJobTime.WithLabelValues(statusFailure).Observe(time.Since(start).Seconds()) - errs.Add(errors.Wrap(err, fmt.Sprintf("runBloomCompact failed for job %s", job.String()))) - return nil - } - - c.metrics.compactionRunJobCompleted.WithLabelValues(statusSuccess).Inc() - c.metrics.compactionRunJobTime.WithLabelValues(statusSuccess).Observe(time.Since(start).Seconds()) - level.Debug(logger).Log("msg", "compaction of job succeeded", "job", job.String(), "duration", time.Since(start)) - - return nil - }) - - return errs.Err() + // TODO(owen-d): impl + return nil } func runWithRetries( @@ -484,175 +254,14 @@ func runWithRetries( return lastErr } -func (c *Compactor) compactTenantWithRetries(ctx context.Context, logger log.Logger, sc storeClient, tableName string, tenant string) error { +func (c *Compactor) compactTenantWithRetries(ctx context.Context, logger log.Logger, tableName string, tenant string) error { return runWithRetries( ctx, c.cfg.RetryMinBackoff, c.cfg.RetryMaxBackoff, 
c.cfg.CompactionRetries, func(ctx context.Context) error { - return c.compactTenant(ctx, logger, sc, tableName, tenant) + return c.compactTenant(ctx, logger, tableName, tenant) }, ) } - -func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, bt *v1.BloomTokenizer, storeClient storeClient) error { - // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). - if err := ctx.Err(); err != nil { - return err - } - metaSearchParams := bloomshipper.MetaSearchParams{ - TenantID: job.tenantID, - Keyspace: v1.NewBounds(job.minFp, job.maxFp), - Interval: bloomshipper.Interval{Start: job.from, End: job.through}, - } - var metas []bloomshipper.Meta - //TODO Configure pool for these to avoid allocations - var activeBloomBlocksRefs []bloomshipper.BlockRef - - metaRefs, fetchers, err := c.bloomShipperClient.ResolveMetas(ctx, metaSearchParams) - if err != nil { - return err - } - - for i := range fetchers { - res, err := fetchers[i].FetchMetas(ctx, metaRefs[i]) - if err != nil { - return err - } - metas = append(metas, res...) - } - - // TODO This logic currently is NOT concerned with cutting blocks upon topology changes to bloom-compactors. - // It may create blocks with series outside of the fp range of the compactor. Cutting blocks will be addressed in a follow-up PR. - metasMatchingJob, blocksMatchingJob := matchingBlocks(metas, job) - - localDst := createLocalDirName(c.cfg.WorkingDirectory, job) - blockOptions := v1.NewBlockOptions(bt.GetNGramLength(), bt.GetNGramSkip()) - - defer func() { - //clean up the bloom directory - if err := os.RemoveAll(localDst); err != nil { - level.Error(logger).Log("msg", "failed to remove block directory", "dir", localDst, "err", err) - } - }() - - var resultingBlock bloomshipper.Block - defer func() { - if resultingBlock.Data != nil { - _ = resultingBlock.Data.Close() - } - }() - - level.Info(logger).Log("msg", "started compacting table", "table", job.tableName, "tenant", job.tenantID) - if len(blocksMatchingJob) == 0 && len(metasMatchingJob) > 0 { - // There is no change to any blocks, no compaction needed - level.Info(logger).Log("msg", "No changes to tsdb, no compaction needed") - return nil - } else if len(metasMatchingJob) == 0 { - // No matching existing blocks for this job, compact all series from scratch - level.Info(logger).Log("msg", "No matching existing blocks for this job, compact all series from scratch") - - builder, err := NewPersistentBlockBuilder(localDst, blockOptions) - if err != nil { - level.Error(logger).Log("msg", "failed creating block builder", "err", err) - return err - } - - // NB(owen-d): this panics/etc, but the code is being refactored and will be removed. 
I've replaced `bt` with `nil` - // to pass compiler checks while keeping this code around as reference - resultingBlock, err = compactNewChunks(ctx, logger, job, nil, storeClient.chunk, builder, c.limits) - if err != nil { - return level.Error(logger).Log("msg", "failed compacting new chunks", "err", err) - } - - } else if len(blocksMatchingJob) > 0 { - // When already compacted metas exists, we need to merge all blocks with amending blooms with new series - level.Info(logger).Log("msg", "already compacted metas exists, use mergeBlockBuilder") - - var populate = createPopulateFunc(ctx, job, storeClient, bt, c.limits) - - seriesIter := makeSeriesIterFromSeriesMeta(job) - - blockIters, blockPaths, err := makeBlockIterFromBlocks(ctx, logger, c.bloomShipperClient, blocksMatchingJob, c.cfg.WorkingDirectory) - defer func() { - for _, path := range blockPaths { - if err := os.RemoveAll(path); err != nil { - level.Error(logger).Log("msg", "failed removing uncompressed bloomDir", "dir", path, "err", err) - } - } - }() - - if err != nil { - level.Error(logger).Log("err", err) - return err - } - - mergeBlockBuilder, err := NewPersistentBlockBuilder(localDst, blockOptions) - if err != nil { - level.Error(logger).Log("msg", "failed creating block builder", "err", err) - return err - } - - resultingBlock, err = mergeCompactChunks(logger, populate, mergeBlockBuilder, blockIters, seriesIter, job) - if err != nil { - level.Error(logger).Log("msg", "failed merging existing blocks with new chunks", "err", err) - return err - } - - } - - archivePath := filepath.Join(c.cfg.WorkingDirectory, uuid.New().String()) - - blockToUpload, err := bloomshipper.CompressBloomBlock(resultingBlock.BlockRef, archivePath, localDst, logger) - if err != nil { - level.Error(logger).Log("msg", "failed compressing bloom blocks into tar file", "err", err) - return err - } - - defer func() { - err = os.Remove(archivePath) - if err != nil { - level.Error(logger).Log("msg", "failed removing archive file", "err", err, "file", archivePath) - } - }() - - // Do not change the signature of PutBlocks yet. - // Once block size is limited potentially, compactNewChunks will return multiple blocks, hence a list is appropriate. - storedBlocks, err := c.bloomShipperClient.PutBlocks(ctx, []bloomshipper.Block{blockToUpload}) - if err != nil { - level.Error(logger).Log("msg", "failed uploading blocks to storage", "err", err) - return err - } - - // all blocks are new and active blocks - for _, block := range storedBlocks { - activeBloomBlocksRefs = append(activeBloomBlocksRefs, block.BlockRef) - } - - // TODO delete old metas in later compactions - // After all is done, create one meta file and upload to storage - meta := bloomshipper.Meta{ - MetaRef: bloomshipper.MetaRef{ - Ref: bloomshipper.Ref{ - TenantID: job.tenantID, - TableName: job.tableName, - MinFingerprint: uint64(job.minFp), - MaxFingerprint: uint64(job.maxFp), - StartTimestamp: job.from, - EndTimestamp: job.through, - Checksum: rand.Uint32(), // Discuss if checksum is needed for Metas, why should we read all data again. 
- }, - }, - Tombstones: blocksMatchingJob, - Blocks: activeBloomBlocksRefs, - } - - err = c.bloomShipperClient.PutMeta(ctx, meta) - if err != nil { - level.Error(logger).Log("msg", "failed uploading meta.json to storage", "err", err) - return err - } - level.Info(logger).Log("msg", "finished compacting table", "table", job.tableName, "tenant", job.tenantID) - return nil -} diff --git a/pkg/bloomcompactor/bloomcompactor_test.go b/pkg/bloomcompactor/bloomcompactor_test.go deleted file mode 100644 index 6221610321b69..0000000000000 --- a/pkg/bloomcompactor/bloomcompactor_test.go +++ /dev/null @@ -1,245 +0,0 @@ -package bloomcompactor - -import ( - "context" - "flag" - "fmt" - "path/filepath" - "testing" - "time" - - "github.com/go-kit/log" - "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv" - "github.com/grafana/dskit/kv/consul" - "github.com/grafana/dskit/ring" - "github.com/grafana/dskit/services" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/grafana/loki/pkg/compactor" - "github.com/grafana/loki/pkg/storage" - "github.com/grafana/loki/pkg/storage/chunk/client/local" - "github.com/grafana/loki/pkg/storage/config" - "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper" - lokiring "github.com/grafana/loki/pkg/util/ring" - "github.com/grafana/loki/pkg/validation" -) - -const ( - indexTablePrefix = "table_" - workingDirName = "working-dir" -) - -func parseDayTime(s string) config.DayTime { - t, err := time.Parse("2006-01-02", s) - if err != nil { - panic(err) - } - return config.DayTime{ - Time: model.TimeFromUnix(t.Unix()), - } -} - -func TestCompactor_StartStopService(t *testing.T) { - shardingStrategy := NewNoopStrategy() - logger := log.NewNopLogger() - reg := prometheus.NewRegistry() - - cm := storage.NewClientMetrics() - t.Cleanup(cm.Unregister) - - var limits validation.Limits - limits.RegisterFlags(flag.NewFlagSet("limits", flag.PanicOnError)) - overrides, _ := validation.NewOverrides(limits, nil) - - periodConfigUnsupported := config.PeriodConfig{ - From: parseDayTime("2023-09-01"), - IndexType: config.BoltDBShipperType, - ObjectType: config.StorageTypeFileSystem, - Schema: "v13", - RowShards: 16, - IndexTables: config.IndexPeriodicTableConfig{ - PathPrefix: "index/", - PeriodicTableConfig: config.PeriodicTableConfig{ - Prefix: indexTablePrefix, - Period: config.ObjectStorageIndexRequiredPeriod, - }, - }, - } - - periodConfigSupported := config.PeriodConfig{ - From: parseDayTime("2023-10-01"), - IndexType: config.TSDBType, - ObjectType: config.StorageTypeFileSystem, - Schema: "v13", - RowShards: 16, - IndexTables: config.IndexPeriodicTableConfig{ - PathPrefix: "index/", - PeriodicTableConfig: config.PeriodicTableConfig{ - Prefix: indexTablePrefix, - Period: config.ObjectStorageIndexRequiredPeriod, - }, - }, - } - - schemaCfg := config.SchemaConfig{ - Configs: []config.PeriodConfig{ - periodConfigUnsupported, - periodConfigSupported, - }, - } - - fsDir := t.TempDir() - tsdbDir := t.TempDir() - - storageCfg := storage.Config{ - FSConfig: local.FSConfig{ - Directory: fsDir, - }, - TSDBShipperConfig: indexshipper.Config{ - ActiveIndexDirectory: filepath.Join(tsdbDir, "index"), - ResyncInterval: 1 * time.Minute, - Mode: indexshipper.ModeReadWrite, - CacheLocation: filepath.Join(tsdbDir, "cache"), - }, - } - - t.Run("ignore unsupported index types in schema config", func(t *testing.T) { - kvStore, closer := 
consul.NewInMemoryClient(ring.GetCodec(), logger, reg) - t.Cleanup(func() { - closer.Close() - }) - - var cfg Config - flagext.DefaultValues(&cfg) - cfg.Enabled = true - cfg.WorkingDirectory = filepath.Join(t.TempDir(), workingDirName) - cfg.Ring = lokiring.RingConfig{ - KVStore: kv.Config{ - Mock: kvStore, - }, - } - - c, err := New(cfg, storageCfg, schemaCfg, overrides, logger, shardingStrategy, cm, reg) - require.NoError(t, err) - - err = services.StartAndAwaitRunning(context.Background(), c) - require.NoError(t, err) - - require.Equal(t, 1, len(c.storeClients)) - - // supported index type TSDB is present - sc, ok := c.storeClients[periodConfigSupported.From] - require.True(t, ok) - require.NotNil(t, sc) - - // unsupported index type BoltDB is not present - _, ok = c.storeClients[periodConfigUnsupported.From] - require.False(t, ok) - - err = services.StopAndAwaitTerminated(context.Background(), c) - require.NoError(t, err) - }) -} - -func TestCompactor_RunCompaction(t *testing.T) { - logger := log.NewNopLogger() - reg := prometheus.NewRegistry() - - cm := storage.NewClientMetrics() - t.Cleanup(cm.Unregister) - - tempDir := t.TempDir() - indexDir := filepath.Join(tempDir, "index") - - schemaCfg := config.SchemaConfig{ - Configs: []config.PeriodConfig{ - { - From: config.DayTime{Time: model.Time(0)}, - IndexType: "tsdb", - ObjectType: "filesystem", - Schema: "v12", - IndexTables: config.IndexPeriodicTableConfig{ - PathPrefix: "index/", - PeriodicTableConfig: config.PeriodicTableConfig{ - Prefix: indexTablePrefix, - Period: config.ObjectStorageIndexRequiredPeriod, - }}, - }, - }, - } - - daySeconds := int64(24 * time.Hour / time.Second) - tableNumEnd := time.Now().Unix() / daySeconds - tableNumStart := tableNumEnd - 5 - for i := tableNumStart; i <= tableNumEnd; i++ { - compactor.SetupTable( - t, - filepath.Join(indexDir, fmt.Sprintf("%s%d", indexTablePrefix, i)), - compactor.IndexesConfig{ - NumUnCompactedFiles: 5, - NumCompactedFiles: 5, - }, - compactor.PerUserIndexesConfig{ - NumUsers: 5, - IndexesConfig: compactor.IndexesConfig{ - NumUnCompactedFiles: 5, - NumCompactedFiles: 5, - }, - }, - ) - } - - kvStore, cleanUp := consul.NewInMemoryClient(ring.GetCodec(), logger, nil) - t.Cleanup(func() { assert.NoError(t, cleanUp.Close()) }) - - var cfg Config - flagext.DefaultValues(&cfg) - cfg.WorkingDirectory = filepath.Join(tempDir, workingDirName) - cfg.Ring.KVStore.Mock = kvStore - cfg.Ring.ListenPort = 0 - cfg.Ring.InstanceAddr = "bloomcompactor" - cfg.Ring.InstanceID = "bloomcompactor" - - storageConfig := storage.Config{ - FSConfig: local.FSConfig{Directory: tempDir}, - TSDBShipperConfig: indexshipper.Config{ - ActiveIndexDirectory: indexDir, - ResyncInterval: 1 * time.Minute, - Mode: indexshipper.ModeReadWrite, - CacheLocation: filepath.Join(tempDir, "cache"), - }, - } - - var limits validation.Limits - limits.RegisterFlags(flag.NewFlagSet("limits", flag.PanicOnError)) - overrides, _ := validation.NewOverrides(limits, nil) - - ringManager, err := lokiring.NewRingManager("bloom-compactor", lokiring.ServerMode, cfg.Ring, 1, 1, logger, reg) - require.NoError(t, err) - - err = ringManager.StartAsync(context.Background()) - require.NoError(t, err) - require.Eventually(t, func() bool { - return ringManager.State() == services.Running - }, 1*time.Minute, 100*time.Millisecond) - defer func() { - ringManager.StopAsync() - require.Eventually(t, func() bool { - return ringManager.State() == services.Terminated - }, 1*time.Minute, 100*time.Millisecond) - }() - - shuffleSharding := 
NewShuffleShardingStrategy(ringManager.Ring, ringManager.RingLifecycler, overrides) - - c, err := New(cfg, storageConfig, schemaCfg, overrides, logger, shuffleSharding, cm, nil) - require.NoError(t, err) - - err = c.runCompaction(context.Background()) - require.NoError(t, err) - - // TODO: Once compaction is implemented, verify compaction here. -} diff --git a/pkg/bloomcompactor/chunkcompactor.go b/pkg/bloomcompactor/chunkcompactor.go deleted file mode 100644 index c4993ccc62a59..0000000000000 --- a/pkg/bloomcompactor/chunkcompactor.go +++ /dev/null @@ -1,245 +0,0 @@ -package bloomcompactor - -import ( - "context" - "fmt" - "io" - "os" - "path/filepath" - - "github.com/google/uuid" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/prometheus/common/model" - - "github.com/grafana/loki/pkg/logproto" - v1 "github.com/grafana/loki/pkg/storage/bloom/v1" - "github.com/grafana/loki/pkg/storage/bloom/v1/filter" - "github.com/grafana/loki/pkg/storage/chunk" - "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" - tsdbindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" -) - -type compactorTokenizer interface { - PopulateSeriesWithBloom(bloom *v1.SeriesWithBloom, chunkBatchesIterator v1.Iterator[[]chunk.Chunk]) error -} - -type chunkClient interface { - // TODO: Consider using lazyChunks to avoid downloading all requested chunks. - GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) -} - -type blockBuilder interface { - BuildFrom(itr v1.Iterator[v1.SeriesWithBloom]) (uint32, error) - Data() (io.ReadSeekCloser, error) -} - -type PersistentBlockBuilder struct { - builder *v1.BlockBuilder - localDst string -} - -func NewPersistentBlockBuilder(localDst string, blockOptions v1.BlockOptions) (*PersistentBlockBuilder, error) { - // write bloom to a local dir - b, err := v1.NewBlockBuilder(blockOptions, v1.NewDirectoryBlockWriter(localDst)) - if err != nil { - return nil, err - } - builder := PersistentBlockBuilder{ - builder: b, - localDst: localDst, - } - return &builder, nil -} - -func (p *PersistentBlockBuilder) BuildFrom(itr v1.Iterator[v1.SeriesWithBloom]) (uint32, error) { - return p.builder.BuildFrom(itr) -} - -func (p *PersistentBlockBuilder) mergeBuild(builder *v1.MergeBuilder) (uint32, error) { - return builder.Build(p.builder) -} - -func (p *PersistentBlockBuilder) Data() (io.ReadSeekCloser, error) { - blockFile, err := os.Open(filepath.Join(p.localDst, v1.BloomFileName)) - if err != nil { - return nil, err - } - return blockFile, nil -} - -func makeChunkRefs(chksMetas []tsdbindex.ChunkMeta, tenant string, fp model.Fingerprint) []chunk.Chunk { - chunkRefs := make([]chunk.Chunk, 0, len(chksMetas)) - for _, chk := range chksMetas { - chunkRefs = append(chunkRefs, chunk.Chunk{ - ChunkRef: logproto.ChunkRef{ - Fingerprint: uint64(fp), - UserID: tenant, - From: chk.From(), - Through: chk.Through(), - Checksum: chk.Checksum, - }, - }) - } - - return chunkRefs -} - -func buildBloomFromSeries(seriesMeta seriesMeta, fpRate float64, tokenizer compactorTokenizer, chunks v1.Iterator[[]chunk.Chunk]) (v1.SeriesWithBloom, error) { - // Create a bloom for this series - bloomForChks := v1.SeriesWithBloom{ - Series: &v1.Series{ - Fingerprint: seriesMeta.seriesFP, - }, - Bloom: &v1.Bloom{ - ScalableBloomFilter: *filter.NewDefaultScalableBloomFilter(fpRate), - }, - } - - // Tokenize data into n-grams - err := tokenizer.PopulateSeriesWithBloom(&bloomForChks, chunks) - if err != nil { - return v1.SeriesWithBloom{}, err - } - 
return bloomForChks, nil -} - -// TODO Test this when bloom block size check is implemented -func buildBlockFromBlooms( - ctx context.Context, - logger log.Logger, - builder blockBuilder, - blooms v1.Iterator[v1.SeriesWithBloom], - job Job, -) (bloomshipper.Block, error) { - // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). - if err := ctx.Err(); err != nil { - return bloomshipper.Block{}, err - } - - checksum, err := builder.BuildFrom(blooms) - if err != nil { - level.Error(logger).Log("msg", "failed writing to bloom", "err", err) - return bloomshipper.Block{}, err - } - - data, err := builder.Data() - if err != nil { - level.Error(logger).Log("msg", "failed reading bloom data", "err", err) - return bloomshipper.Block{}, err - } - - block := bloomshipper.Block{ - BlockRef: bloomshipper.BlockRef{ - Ref: bloomshipper.Ref{ - TenantID: job.tenantID, - TableName: job.tableName, - MinFingerprint: uint64(job.minFp), - MaxFingerprint: uint64(job.maxFp), - StartTimestamp: job.from, - EndTimestamp: job.through, - Checksum: checksum, - }, - IndexPath: job.indexPath, - }, - Data: data, - } - - return block, nil -} - -func createLocalDirName(workingDir string, job Job) string { - dir := fmt.Sprintf("bloomBlock-%s-%s-%s-%s-%d-%d-%s", job.tableName, job.tenantID, job.minFp, job.maxFp, job.from, job.through, uuid.New().String()) - return filepath.Join(workingDir, dir) -} - -// Compacts given list of chunks, uploads them to storage and returns a list of bloomBlocks -func compactNewChunks(ctx context.Context, - logger log.Logger, - job Job, - bt compactorTokenizer, - storeClient chunkClient, - builder blockBuilder, - limits Limits, -) (bloomshipper.Block, error) { - // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). - if err := ctx.Err(); err != nil { - return bloomshipper.Block{}, err - } - - bloomIter := newLazyBloomBuilder(ctx, job, storeClient, bt, logger, limits) - - // Build and upload bloomBlock to storage - block, err := buildBlockFromBlooms(ctx, logger, builder, bloomIter, job) - if err != nil { - level.Error(logger).Log("msg", "failed building bloomBlocks", "err", err) - return bloomshipper.Block{}, err - } - - return block, nil -} - -type lazyBloomBuilder struct { - ctx context.Context - metas v1.Iterator[seriesMeta] - tenant string - client chunkClient - bt compactorTokenizer - fpRate float64 - logger log.Logger - chunksBatchSize int - - cur v1.SeriesWithBloom // retured by At() - err error // returned by Err() -} - -// newLazyBloomBuilder returns an iterator that yields v1.SeriesWithBloom -// which are used by the blockBuilder to write a bloom block. -// We use an interator to avoid loading all blooms into memory first, before -// building the block. 
-func newLazyBloomBuilder(ctx context.Context, job Job, client chunkClient, bt compactorTokenizer, logger log.Logger, limits Limits) *lazyBloomBuilder { - return &lazyBloomBuilder{ - ctx: ctx, - metas: v1.NewSliceIter(job.seriesMetas), - client: client, - tenant: job.tenantID, - bt: bt, - fpRate: limits.BloomFalsePositiveRate(job.tenantID), - logger: logger, - chunksBatchSize: limits.BloomCompactorChunksBatchSize(job.tenantID), - } -} - -func (it *lazyBloomBuilder) Next() bool { - if !it.metas.Next() { - it.cur = v1.SeriesWithBloom{} - level.Debug(it.logger).Log("msg", "No seriesMeta") - return false - } - meta := it.metas.At() - - batchesIterator, err := newChunkBatchesIterator(it.ctx, it.client, makeChunkRefs(meta.chunkRefs, it.tenant, meta.seriesFP), it.chunksBatchSize) - if err != nil { - it.err = err - it.cur = v1.SeriesWithBloom{} - level.Debug(it.logger).Log("msg", "err creating chunks batches iterator", "err", err) - return false - } - it.cur, err = buildBloomFromSeries(meta, it.fpRate, it.bt, batchesIterator) - if err != nil { - it.err = err - it.cur = v1.SeriesWithBloom{} - level.Debug(it.logger).Log("msg", "err in buildBloomFromSeries", "err", err) - return false - } - return true -} - -func (it *lazyBloomBuilder) At() v1.SeriesWithBloom { - return it.cur -} - -func (it *lazyBloomBuilder) Err() error { - return it.err -} diff --git a/pkg/bloomcompactor/chunkcompactor_test.go b/pkg/bloomcompactor/chunkcompactor_test.go deleted file mode 100644 index 8bc94fd26537a..0000000000000 --- a/pkg/bloomcompactor/chunkcompactor_test.go +++ /dev/null @@ -1,229 +0,0 @@ -package bloomcompactor - -import ( - "context" - "io" - "testing" - "time" - - "github.com/go-kit/log" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" - - "github.com/stretchr/testify/require" - - "github.com/grafana/loki/pkg/chunkenc" - "github.com/grafana/loki/pkg/push" - v1 "github.com/grafana/loki/pkg/storage/bloom/v1" - "github.com/grafana/loki/pkg/storage/chunk" - "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" -) - -var ( - userID = "userID" - fpRate = 0.01 - - from = model.Earliest - to = model.Latest - - table = "test_table" - indexPath = "index_test_table" - - testBlockSize = 256 * 1024 - testTargetSize = 1500 * 1024 -) - -func createTestChunk(fp model.Fingerprint, lb labels.Labels) chunk.Chunk { - memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), testBlockSize, testTargetSize) - if err := memChunk.Append(&push.Entry{ - Timestamp: time.Unix(0, 1), - Line: "this is a log line", - }); err != nil { - panic(err) - } - c := chunk.NewChunk(userID, - fp, lb, chunkenc.NewFacade(memChunk, testBlockSize, testTargetSize), from, to) - - return c -} - -// Given a seriesMeta and corresponding chunks verify SeriesWithBloom can be built -func TestChunkCompactor_BuildBloomFromSeries(t *testing.T) { - label := labels.FromStrings("foo", "bar") - fp := model.Fingerprint(label.Hash()) - seriesMeta := seriesMeta{ - seriesFP: fp, - seriesLbs: label, - } - - chunks := []chunk.Chunk{createTestChunk(fp, label)} - - mbt := mockBloomTokenizer{} - bloom, err := buildBloomFromSeries(seriesMeta, fpRate, &mbt, v1.NewSliceIter([][]chunk.Chunk{chunks})) - require.NoError(t, err) - require.Equal(t, seriesMeta.seriesFP, bloom.Series.Fingerprint) - require.Equal(t, chunks, mbt.chunks) -} - -func TestChunkCompactor_CompactNewChunks(t *testing.T) { - // Setup - logger := log.NewNopLogger() - label := 
labels.FromStrings("foo", "bar") - fp1 := model.Fingerprint(100) - fp2 := model.Fingerprint(999) - fp3 := model.Fingerprint(200) - - chunkRef1 := index.ChunkMeta{ - Checksum: 1, - MinTime: 1, - MaxTime: 99, - } - - chunkRef2 := index.ChunkMeta{ - Checksum: 2, - MinTime: 10, - MaxTime: 999, - } - - seriesMetas := []seriesMeta{ - { - seriesFP: fp1, - seriesLbs: label, - chunkRefs: []index.ChunkMeta{chunkRef1}, - }, - { - seriesFP: fp2, - seriesLbs: label, - chunkRefs: []index.ChunkMeta{chunkRef1, chunkRef2}, - }, - { - seriesFP: fp3, - seriesLbs: label, - chunkRefs: []index.ChunkMeta{chunkRef1, chunkRef1, chunkRef2}, - }, - } - - job := NewJob(userID, table, indexPath, seriesMetas) - - mbt := mockBloomTokenizer{} - mcc := mockChunkClient{} - pbb := mockPersistentBlockBuilder{} - - // Run Compaction - compactedBlock, err := compactNewChunks(context.Background(), logger, job, &mbt, &mcc, &pbb, mockLimits{fpRate: fpRate}) - - // Validate Compaction Succeeds - require.NoError(t, err) - require.NotNil(t, compactedBlock) - - // Validate Compacted Block has expected data - require.Equal(t, job.tenantID, compactedBlock.TenantID) - require.Equal(t, job.tableName, compactedBlock.TableName) - require.Equal(t, uint64(fp1), compactedBlock.MinFingerprint) - require.Equal(t, uint64(fp2), compactedBlock.MaxFingerprint) - require.Equal(t, model.Time(chunkRef1.MinTime), compactedBlock.StartTimestamp) - require.Equal(t, model.Time(chunkRef2.MaxTime), compactedBlock.EndTimestamp) - require.Equal(t, indexPath, compactedBlock.IndexPath) -} - -func TestLazyBloomBuilder(t *testing.T) { - logger := log.NewNopLogger() - - label := labels.FromStrings("foo", "bar") - fp1 := model.Fingerprint(100) - fp2 := model.Fingerprint(999) - fp3 := model.Fingerprint(200) - - chunkRef1 := index.ChunkMeta{ - Checksum: 1, - MinTime: 1, - MaxTime: 99, - } - - chunkRef2 := index.ChunkMeta{ - Checksum: 2, - MinTime: 10, - MaxTime: 999, - } - - seriesMetas := []seriesMeta{ - { - seriesFP: fp1, - seriesLbs: label, - chunkRefs: []index.ChunkMeta{chunkRef1}, - }, - { - seriesFP: fp2, - seriesLbs: label, - chunkRefs: []index.ChunkMeta{chunkRef1, chunkRef2}, - }, - { - seriesFP: fp3, - seriesLbs: label, - chunkRefs: []index.ChunkMeta{chunkRef1, chunkRef1, chunkRef2}, - }, - } - - job := NewJob(userID, table, indexPath, seriesMetas) - - mbt := &mockBloomTokenizer{} - mcc := &mockChunkClient{} - - it := newLazyBloomBuilder(context.Background(), job, mcc, mbt, logger, mockLimits{chunksDownloadingBatchSize: 10, fpRate: fpRate}) - - // first seriesMeta has 1 chunks - require.True(t, it.Next()) - require.Equal(t, 1, mcc.requestCount) - require.Equal(t, 1, mcc.chunkCount) - require.Equal(t, fp1, it.At().Series.Fingerprint) - - // first seriesMeta has 2 chunks - require.True(t, it.Next()) - require.Equal(t, 2, mcc.requestCount) - require.Equal(t, 3, mcc.chunkCount) - require.Equal(t, fp2, it.At().Series.Fingerprint) - - // first seriesMeta has 3 chunks - require.True(t, it.Next()) - require.Equal(t, 3, mcc.requestCount) - require.Equal(t, 6, mcc.chunkCount) - require.Equal(t, fp3, it.At().Series.Fingerprint) - - // iterator is done - require.False(t, it.Next()) - require.Error(t, io.EOF, it.Err()) - require.Equal(t, v1.SeriesWithBloom{}, it.At()) -} - -type mockBloomTokenizer struct { - chunks []chunk.Chunk -} - -func (mbt *mockBloomTokenizer) PopulateSeriesWithBloom(_ *v1.SeriesWithBloom, c v1.Iterator[[]chunk.Chunk]) error { - for c.Next() { - mbt.chunks = append(mbt.chunks, c.At()...) 
- } - return nil -} - -type mockChunkClient struct { - requestCount int - chunkCount int -} - -func (mcc *mockChunkClient) GetChunks(_ context.Context, chks []chunk.Chunk) ([]chunk.Chunk, error) { - mcc.requestCount++ - mcc.chunkCount += len(chks) - return nil, nil -} - -type mockPersistentBlockBuilder struct { -} - -func (pbb *mockPersistentBlockBuilder) BuildFrom(_ v1.Iterator[v1.SeriesWithBloom]) (uint32, error) { - return 0, nil -} - -func (pbb *mockPersistentBlockBuilder) Data() (io.ReadSeekCloser, error) { - return nil, nil -} diff --git a/pkg/bloomcompactor/chunksbatchesiterator.go b/pkg/bloomcompactor/chunksbatchesiterator.go deleted file mode 100644 index a4494b02b7e47..0000000000000 --- a/pkg/bloomcompactor/chunksbatchesiterator.go +++ /dev/null @@ -1,48 +0,0 @@ -package bloomcompactor - -import ( - "context" - "errors" - - "github.com/grafana/loki/pkg/storage/chunk" -) - -type chunksBatchesIterator struct { - context context.Context - client chunkClient - chunksToDownload []chunk.Chunk - batchSize int - - currentBatch []chunk.Chunk - err error -} - -func newChunkBatchesIterator(context context.Context, client chunkClient, chunksToDownload []chunk.Chunk, batchSize int) (*chunksBatchesIterator, error) { - if batchSize <= 0 { - return nil, errors.New("batchSize must be greater than 0") - } - return &chunksBatchesIterator{context: context, client: client, chunksToDownload: chunksToDownload, batchSize: batchSize}, nil -} - -func (c *chunksBatchesIterator) Next() bool { - if len(c.chunksToDownload) == 0 { - return false - } - batchSize := c.batchSize - chunksToDownloadCount := len(c.chunksToDownload) - if chunksToDownloadCount < batchSize { - batchSize = chunksToDownloadCount - } - chunksToDownload := c.chunksToDownload[:batchSize] - c.chunksToDownload = c.chunksToDownload[batchSize:] - c.currentBatch, c.err = c.client.GetChunks(c.context, chunksToDownload) - return c.err == nil -} - -func (c *chunksBatchesIterator) Err() error { - return c.err -} - -func (c *chunksBatchesIterator) At() []chunk.Chunk { - return c.currentBatch -} diff --git a/pkg/bloomcompactor/chunksbatchesiterator_test.go b/pkg/bloomcompactor/chunksbatchesiterator_test.go deleted file mode 100644 index 170f2662b508b..0000000000000 --- a/pkg/bloomcompactor/chunksbatchesiterator_test.go +++ /dev/null @@ -1,96 +0,0 @@ -package bloomcompactor - -import ( - "context" - "errors" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/grafana/loki/pkg/storage/chunk" - tsdbindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" -) - -func Test_chunksBatchesIterator(t *testing.T) { - tests := map[string]struct { - batchSize int - chunksToDownload []chunk.Chunk - constructorError error - - hadNextCount int - }{ - "expected error if batch size is set to 0": { - batchSize: 0, - constructorError: errors.New("batchSize must be greater than 0"), - }, - "expected no error if there are no chunks": { - hadNextCount: 0, - batchSize: 10, - }, - "expected 1 call to the client": { - chunksToDownload: createFakeChunks(10), - hadNextCount: 1, - batchSize: 20, - }, - "expected 1 call to the client(2)": { - chunksToDownload: createFakeChunks(10), - hadNextCount: 1, - batchSize: 10, - }, - "expected 2 calls to the client": { - chunksToDownload: createFakeChunks(10), - hadNextCount: 2, - batchSize: 6, - }, - "expected 10 calls to the client": { - chunksToDownload: createFakeChunks(10), - hadNextCount: 10, - batchSize: 1, - }, - } - for name, data := range tests { - t.Run(name, func(t *testing.T) { - 
client := &fakeClient{} - iterator, err := newChunkBatchesIterator(context.Background(), client, data.chunksToDownload, data.batchSize) - if data.constructorError != nil { - require.Equal(t, err, data.constructorError) - return - } - hadNextCount := 0 - var downloadedChunks []chunk.Chunk - for iterator.Next() { - hadNextCount++ - downloaded := iterator.At() - downloadedChunks = append(downloadedChunks, downloaded...) - require.LessOrEqual(t, len(downloaded), data.batchSize) - } - require.NoError(t, iterator.Err()) - require.Equal(t, data.chunksToDownload, downloadedChunks) - require.Equal(t, data.hadNextCount, client.callsCount) - require.Equal(t, data.hadNextCount, hadNextCount) - }) - } -} - -func createFakeChunks(count int) []chunk.Chunk { - metas := make([]tsdbindex.ChunkMeta, 0, count) - for i := 0; i < count; i++ { - metas = append(metas, tsdbindex.ChunkMeta{ - Checksum: uint32(i), - MinTime: int64(i), - MaxTime: int64(i + 100), - KB: uint32(i * 100), - Entries: uint32(i * 10), - }) - } - return makeChunkRefs(metas, "fake", 0xFFFF) -} - -type fakeClient struct { - callsCount int -} - -func (f *fakeClient) GetChunks(_ context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) { - f.callsCount++ - return chunks, nil -} diff --git a/pkg/bloomcompactor/job.go b/pkg/bloomcompactor/job.go deleted file mode 100644 index bd43293c73cb6..0000000000000 --- a/pkg/bloomcompactor/job.go +++ /dev/null @@ -1,85 +0,0 @@ -package bloomcompactor - -import ( - "math" - - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" - - "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" -) - -type seriesMeta struct { - seriesFP model.Fingerprint - seriesLbs labels.Labels - chunkRefs []index.ChunkMeta -} - -type Job struct { - tableName, tenantID, indexPath string - seriesMetas []seriesMeta - - // We compute them lazily. Unset value is 0. - from, through model.Time - minFp, maxFp model.Fingerprint -} - -// NewJob returns a new compaction Job. 
-func NewJob( - tenantID string, - tableName string, - indexPath string, - seriesMetas []seriesMeta, -) Job { - j := Job{ - tenantID: tenantID, - tableName: tableName, - indexPath: indexPath, - seriesMetas: seriesMetas, - } - j.computeBounds() - return j -} - -func (j *Job) String() string { - return j.tableName + "_" + j.tenantID + "_" -} - -func (j *Job) computeBounds() { - if len(j.seriesMetas) == 0 { - return - } - - minFrom := model.Latest - maxThrough := model.Earliest - - minFp := model.Fingerprint(math.MaxInt64) - maxFp := model.Fingerprint(0) - - for _, seriesMeta := range j.seriesMetas { - // calculate timestamp boundaries - for _, chunkRef := range seriesMeta.chunkRefs { - from, through := chunkRef.Bounds() - if minFrom > from { - minFrom = from - } - if maxThrough < through { - maxThrough = through - } - } - - // calculate fingerprint boundaries - if minFp > seriesMeta.seriesFP { - minFp = seriesMeta.seriesFP - } - if maxFp < seriesMeta.seriesFP { - maxFp = seriesMeta.seriesFP - } - } - - j.from = minFrom - j.through = maxThrough - - j.minFp = minFp - j.maxFp = maxFp -} diff --git a/pkg/bloomcompactor/mergecompactor.go b/pkg/bloomcompactor/mergecompactor.go deleted file mode 100644 index 3486e40846b8a..0000000000000 --- a/pkg/bloomcompactor/mergecompactor.go +++ /dev/null @@ -1,150 +0,0 @@ -package bloomcompactor - -import ( - "context" - - "github.com/grafana/dskit/concurrency" - - "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/storage/chunk" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - - v1 "github.com/grafana/loki/pkg/storage/bloom/v1" - "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" -) - -func makeSeriesIterFromSeriesMeta(job Job) *v1.SliceIter[*v1.Series] { - // Satisfy types for series - seriesFromSeriesMeta := make([]*v1.Series, len(job.seriesMetas)) - - for i, s := range job.seriesMetas { - crefs := make([]v1.ChunkRef, len(s.chunkRefs)) - for j, chk := range s.chunkRefs { - crefs[j] = v1.ChunkRef{ - Start: chk.From(), - End: chk.Through(), - Checksum: chk.Checksum, - } - } - seriesFromSeriesMeta[i] = &v1.Series{ - Fingerprint: s.seriesFP, - Chunks: crefs, - } - } - return v1.NewSliceIter(seriesFromSeriesMeta) -} - -func makeBlockIterFromBlocks(ctx context.Context, logger log.Logger, - bloomShipperClient bloomshipper.Client, blocksToUpdate []bloomshipper.BlockRef, - workingDir string) ([]v1.PeekingIterator[*v1.SeriesWithBloom], []string, error) { - - // Download existing blocks that needs compaction - blockIters := make([]v1.PeekingIterator[*v1.SeriesWithBloom], len(blocksToUpdate)) - blockPaths := make([]string, len(blocksToUpdate)) - - err := concurrency.ForEachJob(ctx, len(blocksToUpdate), len(blocksToUpdate), func(ctx context.Context, i int) error { - b := blocksToUpdate[i] - - lazyBlock, err := bloomShipperClient.GetBlock(ctx, b) - if err != nil { - level.Error(logger).Log("msg", "failed downloading block", "err", err) - return err - } - - blockPath, err := bloomshipper.UncompressBloomBlock(&lazyBlock, workingDir, logger) - if err != nil { - level.Error(logger).Log("msg", "failed extracting block", "err", err) - return err - } - blockPaths[i] = blockPath - - reader := v1.NewDirectoryBlockReader(blockPath) - block := v1.NewBlock(reader) - blockQuerier := v1.NewBlockQuerier(block) - - blockIters[i] = v1.NewPeekingIter[*v1.SeriesWithBloom](blockQuerier) - return nil - }) - - if err != nil { - return nil, nil, err - } - return blockIters, blockPaths, nil -} - -func createPopulateFunc(_ context.Context, job Job, 
_ storeClient, bt *v1.BloomTokenizer, _ Limits) func(series *v1.Series, bloom *v1.Bloom) error { - return func(series *v1.Series, bloom *v1.Bloom) error { - bloomForChks := v1.SeriesWithBloom{ - Series: series, - Bloom: bloom, - } - - // Satisfy types for chunks - chunkRefs := make([]chunk.Chunk, len(series.Chunks)) - for i, chk := range series.Chunks { - chunkRefs[i] = chunk.Chunk{ - ChunkRef: logproto.ChunkRef{ - Fingerprint: uint64(series.Fingerprint), - UserID: job.tenantID, - From: chk.Start, - Through: chk.End, - Checksum: chk.Checksum, - }, - } - } - - // batchesIterator, err := newChunkBatchesIterator(ctx, storeClient.chunk, chunkRefs, limits.BloomCompactorChunksBatchSize(job.tenantID)) - // if err != nil { - // return fmt.Errorf("error creating chunks batches iterator: %w", err) - // } - // NB(owen-d): this panics/etc, but the code is being refactored and will be removed. - // I've replaced `batchesIterator` with `emptyIter` to pass compiler checks while keeping this code around as reference - err := bt.Populate(&bloomForChks, v1.NewEmptyIter[v1.ChunkRefWithIter]()) - if err != nil { - return err - } - return nil - } -} - -func mergeCompactChunks(logger log.Logger, - populate func(*v1.Series, *v1.Bloom) error, - mergeBlockBuilder *PersistentBlockBuilder, - blockIters []v1.PeekingIterator[*v1.SeriesWithBloom], seriesIter *v1.SliceIter[*v1.Series], - job Job) (bloomshipper.Block, error) { - - mergeBuilder := v1.NewMergeBuilder( - blockIters, - seriesIter, - populate) - - checksum, err := mergeBlockBuilder.mergeBuild(mergeBuilder) - if err != nil { - level.Error(logger).Log("msg", "failed merging the blooms", "err", err) - return bloomshipper.Block{}, err - } - data, err := mergeBlockBuilder.Data() - if err != nil { - level.Error(logger).Log("msg", "failed reading bloom data", "err", err) - return bloomshipper.Block{}, err - } - - mergedBlock := bloomshipper.Block{ - BlockRef: bloomshipper.BlockRef{ - Ref: bloomshipper.Ref{ - TenantID: job.tenantID, - TableName: job.tableName, - MinFingerprint: uint64(job.minFp), - MaxFingerprint: uint64(job.maxFp), - StartTimestamp: job.from, - EndTimestamp: job.through, - Checksum: checksum, - }, - IndexPath: job.indexPath, - }, - Data: data, - } - return mergedBlock, nil -} diff --git a/pkg/bloomcompactor/table_utils.go b/pkg/bloomcompactor/table_utils.go index 91940f4cfd455..55bc2e9a328f1 100644 --- a/pkg/bloomcompactor/table_utils.go +++ b/pkg/bloomcompactor/table_utils.go @@ -1,12 +1,9 @@ package bloomcompactor import ( - "sort" - "github.com/prometheus/common/model" "github.com/grafana/loki/pkg/compactor/retention" - "github.com/grafana/loki/pkg/storage/config" ) func getIntervalsForTables(tables []string) map[string]model.Interval { @@ -17,21 +14,3 @@ func getIntervalsForTables(tables []string) map[string]model.Interval { return tablesIntervals } - -func sortTablesByRange(tables []string, intervals map[string]model.Interval) { - sort.Slice(tables, func(i, j int) bool { - // less than if start time is after produces a most recent first sort order - return intervals[tables[i]].Start.After(intervals[tables[j]].Start) - }) -} - -// TODO: comes from pkg/compactor/compactor.go -func schemaPeriodForTable(cfg config.SchemaConfig, tableName string) (config.PeriodConfig, bool) { - tableInterval := retention.ExtractIntervalFromTableName(tableName) - schemaCfg, err := cfg.SchemaForTime(tableInterval.Start) - if err != nil || schemaCfg.IndexTables.TableFor(tableInterval.Start) != tableName { - return config.PeriodConfig{}, false - } - - return schemaCfg, 
true -} diff --git a/pkg/bloomcompactor/utils.go b/pkg/bloomcompactor/utils.go deleted file mode 100644 index 4b9c3ff541fe2..0000000000000 --- a/pkg/bloomcompactor/utils.go +++ /dev/null @@ -1,37 +0,0 @@ -package bloomcompactor - -import "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" - -func matchingBlocks(metas []bloomshipper.Meta, job Job) ([]bloomshipper.Meta, []bloomshipper.BlockRef) { - var metasMatchingJob []bloomshipper.Meta - var blocksMatchingJob []bloomshipper.BlockRef - oldTombstonedBlockRefs := make(map[bloomshipper.BlockRef]struct{}) - - for _, meta := range metas { - if meta.TableName != job.tableName { - continue - } - metasMatchingJob = append(metasMatchingJob, meta) - - for _, tombstonedBlockRef := range meta.Tombstones { - oldTombstonedBlockRefs[tombstonedBlockRef] = struct{}{} - } - } - - for _, meta := range metasMatchingJob { - for _, blockRef := range meta.Blocks { - if _, ok := oldTombstonedBlockRefs[blockRef]; ok { - // skip any previously tombstoned blockRefs - continue - } - - if blockRef.IndexPath == job.indexPath { - // index has not changed, no compaction needed - continue - } - blocksMatchingJob = append(blocksMatchingJob, blockRef) - } - } - - return metasMatchingJob, blocksMatchingJob -}
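
With this change the compactor no longer constructs per-period store clients itself; it is handed a bloomshipper.StoreAndClient and a ShardingStrategy by the caller, which is what allows the schemaCfg/storeClients plumbing above to be deleted. The sketch below shows how the slimmed-down constructor is expected to be wired. It is illustrative only and not part of this patch: the store, sharding strategy, limits, logger, and registerer are assumed to be built elsewhere (e.g. by Loki's module wiring), and the helper name is hypothetical.

package bloomcompactor

import (
	"context"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/services"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

// startBloomCompactor is a hypothetical wiring sketch, not part of this
// patch. All dependencies are assumed to be constructed by the caller.
func startBloomCompactor(
	ctx context.Context,
	cfg Config,
	store bloomshipper.StoreAndClient,
	sharding ShardingStrategy,
	limits Limits,
	logger log.Logger,
	reg prometheus.Registerer,
) (*Compactor, error) {
	c, err := New(cfg, store, sharding, limits, logger, reg)
	if err != nil {
		return nil, err
	}
	// Compactor embeds services.Service (a dskit BasicService), so the
	// usual dskit lifecycle helpers start and drive its compaction runs.
	if err := services.StartAndAwaitRunning(ctx, c); err != nil {
		return nil, err
	}
	return c, nil
}

Keeping storage construction out of the compactor is the design choice that lets the refactored planning/building logic be integrated in a follow-up without touching schema-period handling here.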