diff --git a/CHANGELOG.md b/CHANGELOG.md index f01c91517f..22dd2de8d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ * `-flusher.wal-dir` for the WAL directory to recover from. * `-flusher.concurrent-flushes` for number of concurrent flushes. * `-flusher.flush-op-timeout` is duration after which a flush should timeout. +* [ENHANCEMENT] Experimental TSDB: Add support for local `filesystem` backend. #2245 ## 0.7.0-rc.0 / 2020-03-09 diff --git a/docs/architecture.md b/docs/architecture.md index 5aaa8c4aad..c9e07b0de0 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -61,6 +61,7 @@ The blocks storage doesn't require a dedicated storage backend for the index. Th * [Amazon S3](https://aws.amazon.com/s3) * [Google Cloud Storage](https://cloud.google.com/storage/) * [Microsoft Azure Storage](https://azure.microsoft.com/en-us/services/storage/) +* [Local Filesystem](https://thanos.io/storage.md/#filesystem) (single node only) For more information, please check out the [Blocks storage](operations/blocks-storage.md) documentation. diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index c4f264619a..01f1f89054 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2164,7 +2164,7 @@ The `tsdb_config` configures the experimental blocks storage. # CLI flag: -experimental.tsdb.ship-concurrency [ship_concurrency: | default = 10] -# Backend storage to use. Either "s3" or "gcs". +# Backend storage to use. Supported backends are: s3, gcs, azure, filesystem. # CLI flag: -experimental.tsdb.backend [backend: | default = "s3"] @@ -2295,6 +2295,11 @@ azure: # Number of retries for recoverable errors # CLI flag: -experimental.tsdb.azure.max-retries [max_retries: | default = 20] + +filesystem: + # Local filesystem storage directory. 
+ # CLI flag: -experimental.tsdb.filesystem.dir + [dir: | default = ""] ``` ### `compactor_config` diff --git a/docs/operations/blocks-storage.md b/docs/operations/blocks-storage.md index fb8860b036..2b60932a4f 100644 --- a/docs/operations/blocks-storage.md +++ b/docs/operations/blocks-storage.md @@ -12,6 +12,7 @@ The supported backends for the blocks storage are: * [Amazon S3](https://aws.amazon.com/s3) * [Google Cloud Storage](https://cloud.google.com/storage/) * [Microsoft Azure Storage](https://azure.microsoft.com/en-us/services/storage/) +* [Local Filesystem](https://thanos.io/storage.md/#filesystem) (single node only) _Internally, this storage engine is based on [Thanos](https://thanos.io), but no Thanos knowledge is required in order to run it._ @@ -119,7 +120,7 @@ tsdb: # CLI flag: -experimental.tsdb.ship-concurrency [ship_concurrency: | default = 10] - # Backend storage to use. Either "s3" or "gcs". + # Backend storage to use. Supported backends are: s3, gcs, azure, filesystem. # CLI flag: -experimental.tsdb.backend [backend: | default = "s3"] @@ -252,6 +253,11 @@ tsdb: # Number of retries for recoverable errors # CLI flag: -experimental.tsdb.azure.max-retries [max_retries: | default = 20] + + filesystem: + # Local filesystem storage directory. 
+ # CLI flag: -experimental.tsdb.filesystem.dir + [dir: | default = ""] ``` ### `compactor_config` diff --git a/docs/operations/blocks-storage.template b/docs/operations/blocks-storage.template index 91f69df81f..52c2882bbf 100644 --- a/docs/operations/blocks-storage.template +++ b/docs/operations/blocks-storage.template @@ -12,6 +12,7 @@ The supported backends for the blocks storage are: * [Amazon S3](https://aws.amazon.com/s3) * [Google Cloud Storage](https://cloud.google.com/storage/) * [Microsoft Azure Storage](https://azure.microsoft.com/en-us/services/storage/) +* [Local Filesystem](https://thanos.io/storage.md/#filesystem) (single node only) _Internally, this storage engine is based on [Thanos](https://thanos.io), but no Thanos knowledge is required in order to run it._ diff --git a/pkg/storage/tsdb/backend/filesystem/bucket_client.go b/pkg/storage/tsdb/backend/filesystem/bucket_client.go new file mode 100644 index 0000000000..feed92c073 --- /dev/null +++ b/pkg/storage/tsdb/backend/filesystem/bucket_client.go @@ -0,0 +1,11 @@ +package filesystem + +import ( + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/objstore/filesystem" +) + +// NewBucketClient creates a new filesystem bucket client +func NewBucketClient(cfg Config) (objstore.Bucket, error) { + return filesystem.NewBucket(cfg.Directory) +} diff --git a/pkg/storage/tsdb/backend/filesystem/config.go b/pkg/storage/tsdb/backend/filesystem/config.go new file mode 100644 index 0000000000..03a4f85f5b --- /dev/null +++ b/pkg/storage/tsdb/backend/filesystem/config.go @@ -0,0 +1,13 @@ +package filesystem + +import "flag" + +// Config stores the configuration for storing and accessing objects in the local filesystem. 
+type Config struct { + Directory string `yaml:"dir"` +} + +// RegisterFlags registers the flags for TSDB filesystem storage +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.StringVar(&cfg.Directory, "experimental.tsdb.filesystem.dir", "", "Local filesystem storage directory.") +} diff --git a/pkg/storage/tsdb/bucket_client.go b/pkg/storage/tsdb/bucket_client.go index a12a4f32d3..69573d86e1 100644 --- a/pkg/storage/tsdb/bucket_client.go +++ b/pkg/storage/tsdb/bucket_client.go @@ -7,6 +7,7 @@ import ( "github.com/thanos-io/thanos/pkg/objstore" "github.com/cortexproject/cortex/pkg/storage/tsdb/backend/azure" + "github.com/cortexproject/cortex/pkg/storage/tsdb/backend/filesystem" "github.com/cortexproject/cortex/pkg/storage/tsdb/backend/gcs" "github.com/cortexproject/cortex/pkg/storage/tsdb/backend/s3" ) @@ -20,6 +21,8 @@ func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Lo return gcs.NewBucketClient(ctx, cfg.GCS, name, logger) case BackendAzure: return azure.NewBucketClient(cfg.Azure, name, logger) + case BackendFilesystem: + return filesystem.NewBucketClient(cfg.Filesystem) default: return nil, errUnsupportedBackend } diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index bbaf8b1468..577645d864 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -3,6 +3,7 @@ package tsdb import ( "errors" "flag" + "fmt" "path/filepath" "strings" "time" @@ -10,6 +11,7 @@ import ( "github.com/alecthomas/units" "github.com/cortexproject/cortex/pkg/storage/tsdb/backend/azure" + "github.com/cortexproject/cortex/pkg/storage/tsdb/backend/filesystem" "github.com/cortexproject/cortex/pkg/storage/tsdb/backend/gcs" "github.com/cortexproject/cortex/pkg/storage/tsdb/backend/s3" ) @@ -24,12 +26,17 @@ const ( // BackendAzure is the value for the Azure storage backend BackendAzure = "azure" + // BackendFilesystem is the value for the filesystem storage backend + BackendFilesystem = "filesystem" + // TenantIDExternalLabel
is the external label set when shipping blocks to the storage TenantIDExternalLabel = "__org_id__" ) // Validation errors var ( + supportedBackends = []string{BackendS3, BackendGCS, BackendAzure, BackendFilesystem} + errUnsupportedBackend = errors.New("unsupported TSDB storage backend") errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency") errInvalidCompactionInterval = errors.New("invalid TSDB compaction interval") @@ -54,9 +61,10 @@ type Config struct { MaxTSDBOpeningConcurrencyOnStartup int `yaml:"max_tsdb_opening_concurrency_on_startup"` // Backends - S3 s3.Config `yaml:"s3"` - GCS gcs.Config `yaml:"gcs"` - Azure azure.Config `yaml:"azure"` + S3 s3.Config `yaml:"s3"` + GCS gcs.Config `yaml:"gcs"` + Azure azure.Config `yaml:"azure"` + Filesystem filesystem.Config `yaml:"filesystem"` } // DurationList is the block ranges for a tsdb @@ -102,6 +110,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.GCS.RegisterFlags(f) cfg.Azure.RegisterFlags(f) cfg.BucketStore.RegisterFlags(f) + cfg.Filesystem.RegisterFlags(f) if len(cfg.BlockRanges) == 0 { cfg.BlockRanges = []time.Duration{2 * time.Hour} // Default 2h block @@ -112,7 +121,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.Retention, "experimental.tsdb.retention-period", 6*time.Hour, "TSDB blocks retention in the ingester before a block is removed. This should be larger than the block_ranges_period and large enough to give queriers enough time to discover newly uploaded blocks.") f.DurationVar(&cfg.ShipInterval, "experimental.tsdb.ship-interval", 1*time.Minute, "How frequently the TSDB blocks are scanned and new ones are shipped to the storage. 0 means shipping is disabled.") f.IntVar(&cfg.ShipConcurrency, "experimental.tsdb.ship-concurrency", 10, "Maximum number of tenants concurrently shipping blocks to the storage.") - f.StringVar(&cfg.Backend, "experimental.tsdb.backend", "s3", `Backend storage to use. 
Either "s3" or "gcs".`) + f.StringVar(&cfg.Backend, "experimental.tsdb.backend", "s3", fmt.Sprintf("Backend storage to use. Supported backends are: %s.", strings.Join(supportedBackends, ", "))) f.IntVar(&cfg.MaxTSDBOpeningConcurrencyOnStartup, "experimental.tsdb.max-tsdb-opening-concurrency-on-startup", 10, "limit the number of concurrently opening TSDB's on startup") f.DurationVar(&cfg.HeadCompactionInterval, "experimental.tsdb.head-compaction-interval", 1*time.Minute, "How frequently does Cortex try to compact TSDB head. Block is only created if data covers smallest block range. Must be greater than 0 and max 5 minutes.") f.IntVar(&cfg.HeadCompactionConcurrency, "experimental.tsdb.head-compaction-concurrency", 5, "Maximum number of tenants concurrently compacting TSDB head into a new block") @@ -121,7 +130,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { // Validate the config func (cfg *Config) Validate() error { - if cfg.Backend != BackendS3 && cfg.Backend != BackendGCS && cfg.Backend != BackendAzure { + if cfg.Backend != BackendS3 && cfg.Backend != BackendGCS && cfg.Backend != BackendAzure && cfg.Backend != BackendFilesystem { return errUnsupportedBackend } diff --git a/vendor/github.com/thanos-io/thanos/pkg/objstore/filesystem/filesystem.go b/vendor/github.com/thanos-io/thanos/pkg/objstore/filesystem/filesystem.go new file mode 100644 index 0000000000..0a13994d93 --- /dev/null +++ b/vendor/github.com/thanos-io/thanos/pkg/objstore/filesystem/filesystem.go @@ -0,0 +1,223 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package filesystem + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + + "github.com/thanos-io/thanos/pkg/objstore" + "gopkg.in/yaml.v2" + + "github.com/thanos-io/thanos/pkg/runutil" + + "github.com/pkg/errors" +) + +// Config stores the configuration for storing and accessing blobs in filesystem. 
+type Config struct { + Directory string `yaml:"directory"` +} + +// Bucket implements the objstore.Bucket interfaces against filesystem that binary runs on. +// Methods from Bucket interface are thread-safe. Objects are assumed to be immutable. +// NOTE: It does not follow symbolic links. +type Bucket struct { + rootDir string +} + +// NewBucketFromConfig returns a new filesystem.Bucket from config. +func NewBucketFromConfig(conf []byte) (*Bucket, error) { + var c Config + if err := yaml.Unmarshal(conf, &c); err != nil { + return nil, err + } + if c.Directory == "" { + return nil, errors.New("missing directory for filesystem bucket") + } + return NewBucket(c.Directory) +} + +// NewBucket returns a new filesystem.Bucket. +func NewBucket(rootDir string) (*Bucket, error) { + absDir, err := filepath.Abs(rootDir) + if err != nil { + return nil, err + } + return &Bucket{rootDir: absDir}, nil +} + +// Iter calls f for each entry in the given directory. The argument to f is the full +// object name including the prefix of the inspected directory. +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) error { + absDir := filepath.Join(b.rootDir, dir) + info, err := os.Stat(absDir) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return errors.Wrapf(err, "stat %s", absDir) + } + if !info.IsDir() { + return nil + } + + files, err := ioutil.ReadDir(absDir) + if err != nil { + return err + } + for _, file := range files { + name := filepath.Join(dir, file.Name()) + + if file.IsDir() { + empty, err := isDirEmpty(filepath.Join(absDir, file.Name())) + if err != nil { + return err + } + + if empty { + // Skip empty directories. + continue + } + name += objstore.DirDelim + } + if err := f(name); err != nil { + return err + } + } + return nil +} + +// Get returns a reader for the given object name. 
+func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { + return b.GetRange(ctx, name, 0, -1) +} + +type rangeReaderCloser struct { + io.Reader + f *os.File +} + +func (r *rangeReaderCloser) Close() error { + return r.f.Close() +} + +// ObjectSize returns the size of the specified object. +func (b *Bucket) ObjectSize(_ context.Context, name string) (uint64, error) { + file := filepath.Join(b.rootDir, name) + st, err := os.Stat(file) + if err != nil { + return 0, errors.Wrapf(err, "stat %s", file) + } + return uint64(st.Size()), nil +} + +// GetRange returns a new range reader for the given object name and range. +func (b *Bucket) GetRange(_ context.Context, name string, off, length int64) (io.ReadCloser, error) { + if name == "" { + return nil, errors.New("object name is empty") + } + + file := filepath.Join(b.rootDir, name) + if _, err := os.Stat(file); err != nil { + return nil, errors.Wrapf(err, "stat %s", file) + } + + f, err := os.OpenFile(file, os.O_RDONLY, 0666) + if err != nil { + return nil, err + } + + if off > 0 { + _, err := f.Seek(off, 0) + if err != nil { + return nil, errors.Wrapf(err, "seek %v", off) + } + } + + if length == -1 { + return f, nil + } + + return &rangeReaderCloser{Reader: io.LimitReader(f, length), f: f}, nil +} + +// Exists checks if the given directory exists in memory. +func (b *Bucket) Exists(_ context.Context, name string) (bool, error) { + info, err := os.Stat(filepath.Join(b.rootDir, name)) + if err != nil { + if os.IsNotExist(err) { + return false, nil + } + return false, errors.Wrapf(err, "stat %s", filepath.Join(b.rootDir, name)) + } + return !info.IsDir(), nil +} + +// Upload writes the file specified in src to into the memory. 
+func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) (err error) { + file := filepath.Join(b.rootDir, name) + if err := os.MkdirAll(filepath.Dir(file), os.ModePerm); err != nil { + return err + } + + f, err := os.Create(file) + if err != nil { + return err + } + defer runutil.CloseWithErrCapture(&err, f, "close") + + if _, err := io.Copy(f, r); err != nil { + return errors.Wrapf(err, "copy to %s", file) + } + return nil +} + +func isDirEmpty(name string) (ok bool, err error) { + f, err := os.Open(name) + if err != nil { + return false, err + } + defer runutil.CloseWithErrCapture(&err, f, "dir open") + + if _, err = f.Readdir(1); err == io.EOF { + return true, nil + } + return false, err +} + +// Delete removes all data prefixed with the dir. +func (b *Bucket) Delete(_ context.Context, name string) error { + file := filepath.Join(b.rootDir, name) + for file != b.rootDir { + if err := os.RemoveAll(file); err != nil { + return errors.Wrapf(err, "rm %s", file) + } + file = filepath.Dir(file) + empty, err := isDirEmpty(file) + if err != nil { + return err + } + if !empty { + break + } + } + return nil +} + +// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations. +func (b *Bucket) IsObjNotFoundErr(err error) bool { + return os.IsNotExist(errors.Cause(err)) +} + +func (b *Bucket) Close() error { return nil } + +// Name returns the bucket name. 
+func (b *Bucket) Name() string { + return fmt.Sprintf("fs: %s", b.rootDir) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 26cc0ce90e..ceae7755b9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -533,6 +533,7 @@ github.com/thanos-io/thanos/pkg/gate github.com/thanos-io/thanos/pkg/model github.com/thanos-io/thanos/pkg/objstore github.com/thanos-io/thanos/pkg/objstore/azure +github.com/thanos-io/thanos/pkg/objstore/filesystem github.com/thanos-io/thanos/pkg/objstore/gcs github.com/thanos-io/thanos/pkg/objstore/s3 github.com/thanos-io/thanos/pkg/pool