Skip to content

Commit

Permalink
introduces KeyResolver+Location ifcs, rips out BlockPath from BlockRef
Browse files Browse the repository at this point in the history
Signed-off-by: Owen Diehl <[email protected]>
  • Loading branch information
owen-d committed Feb 2, 2024
1 parent 8c52cf3 commit 515915e
Show file tree
Hide file tree
Showing 10 changed files with 321 additions and 254 deletions.
5 changes: 1 addition & 4 deletions pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package bloomgateway

import (
"context"
"fmt"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -343,8 +342,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
EndTimestamp: through,
}
block := bloomshipper.BlockRef{
Ref: ref,
BlockPath: fmt.Sprintf("block-%d", i),
Ref: ref,
}
meta := bloomshipper.Meta{
MetaRef: bloomshipper.MetaRef{
Expand Down Expand Up @@ -458,7 +456,6 @@ func createBlockRefsFromBlockData(t *testing.T, tenant string, data []bloomshipp
EndTimestamp: 0,
Checksum: 0,
},
BlockPath: fmt.Sprintf("block-%d", i),
})
}
return res
Expand Down
46 changes: 23 additions & 23 deletions pkg/storage/stores/shipper/bloomshipper/block_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"context"
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -175,12 +175,12 @@ type cacheDownloadingStrategy struct {
}

func (s *cacheDownloadingStrategy) downloadBlock(task *BlockDownloadingTask, logger log.Logger) (blockWithQuerier, error) {
blockPath := task.block.BlockPath
s.keyMutex.LockKey(blockPath)
key := s.blockClient.Block(task.block).Addr()
s.keyMutex.LockKey(key)
defer func() {
_ = s.keyMutex.UnlockKey(blockPath)
_ = s.keyMutex.UnlockKey(key)
}()
blockFromCache, exists := s.blocksCache.Get(task.ctx, task.block.BlockPath)
blockFromCache, exists := s.blocksCache.Get(task.ctx, key)
if exists {
return blockWithQuerier{
BlockRef: task.block,
Expand All @@ -193,10 +193,10 @@ func (s *cacheDownloadingStrategy) downloadBlock(task *BlockDownloadingTask, log
return blockWithQuerier{}, err
}
blockFromCache = newCachedBlock(directory, s.config.BlocksCache.RemoveDirectoryGracefulPeriod, logger)
err = s.blocksCache.Store(task.ctx, []string{task.block.BlockPath}, []*cachedBlock{blockFromCache})
err = s.blocksCache.Store(task.ctx, []string{key}, []*cachedBlock{blockFromCache})
if err != nil {
level.Error(logger).Log("msg", "error storing the block in the cache", "block", blockPath, "err", err)
return blockWithQuerier{}, fmt.Errorf("error storing the block %s in the cache : %w", blockPath, err)
level.Error(logger).Log("msg", "error storing the block in the cache", "block", key, "err", err)
return blockWithQuerier{}, fmt.Errorf("error storing the block %s in the cache : %w", key, err)
}
return blockWithQuerier{
BlockRef: task.block,
Expand Down Expand Up @@ -229,20 +229,20 @@ func (s *storageDownloadingStrategy) close() {
}

func downloadBlockToDirectory(logger log.Logger, task *BlockDownloadingTask, workingDirectory string, blockClient BlockClient) (string, error) {
blockPath := task.block.BlockPath
blockPath := filepath.Join(workingDirectory, blockClient.Block(task.block).LocalPath())
level.Debug(logger).Log("msg", "start downloading the block", "block", blockPath)
block, err := blockClient.GetBlock(task.ctx, task.block)
if err != nil {
level.Error(logger).Log("msg", "error downloading the block", "block", blockPath, "err", err)
return "", fmt.Errorf("error downloading the block %s : %w", blockPath, err)
}
directory, err := extractBlock(&block, time.Now(), workingDirectory, logger)
err = extractBlock(block.Data, blockPath, logger)
if err != nil {
level.Error(logger).Log("msg", "error extracting the block", "block", blockPath, "err", err)
return "", fmt.Errorf("error extracting the block %s : %w", blockPath, err)
}
level.Debug(logger).Log("msg", "block has been downloaded and extracted", "block", task.block.BlockPath, "directory", directory)
return directory, nil
level.Debug(logger).Log("msg", "block has been downloaded and extracted", "block", blockPath)
return blockPath, nil
}

func (d *blockDownloader) downloadBlocks(ctx context.Context, tenantID string, references []BlockRef) (chan blockWithQuerier, chan error) {
Expand All @@ -256,10 +256,10 @@ func (d *blockDownloader) downloadBlocks(ctx context.Context, tenantID string, r

for _, reference := range references {
task := NewBlockDownloadingTask(ctx, reference, blocksCh, errCh)
level.Debug(d.logger).Log("msg", "enqueuing task to download block", "block", reference.BlockPath)
level.Debug(d.logger).Log("msg", "enqueuing task to download block", "block", reference)
err := d.queue.Enqueue(tenantID, nil, task, nil)
if err != nil {
errCh <- fmt.Errorf("error enquing downloading task for block %s : %w", reference.BlockPath, err)
errCh <- fmt.Errorf("error enquing downloading task for block %s : %w", reference, err)
return blocksCh, errCh
}
}
Expand All @@ -272,27 +272,27 @@ type blockWithQuerier struct {
}

// extractBlock writes the block data to a temporary archive inside blockDir and extracts its files there; the caller chooses the directory, so only an error is returned.
func extractBlock(block *LazyBlock, ts time.Time, workingDirectory string, logger log.Logger) (string, error) {
workingDirectoryPath := filepath.Join(workingDirectory, block.BlockPath, strconv.FormatInt(ts.UnixNano(), 10))
err := os.MkdirAll(workingDirectoryPath, os.ModePerm)
func extractBlock(data io.ReadCloser, blockDir string, logger log.Logger) error {

err := os.MkdirAll(blockDir, os.ModePerm)
if err != nil {
return "", fmt.Errorf("can not create directory to extract the block: %w", err)
return fmt.Errorf("can not create directory to extract the block: %w", err)
}
archivePath, err := writeDataToTempFile(workingDirectoryPath, block)
archivePath, err := writeDataToTempFile(blockDir, data)
if err != nil {
return "", fmt.Errorf("error writing data to temp file: %w", err)
return fmt.Errorf("error writing data to temp file: %w", err)
}
defer func() {
err = os.Remove(archivePath)
if err != nil {
level.Error(logger).Log("msg", "error removing temp archive file", "err", err)
}
}()
err = extractArchive(archivePath, workingDirectoryPath)
err = extractArchive(archivePath, blockDir)
if err != nil {
return "", fmt.Errorf("error extracting archive: %w", err)
return fmt.Errorf("error extracting archive: %w", err)
}
return workingDirectoryPath, nil
return nil
}

func (d *blockDownloader) stop() {
Expand Down
62 changes: 37 additions & 25 deletions pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (
"io"
"os"
"path/filepath"
"strconv"
"sync"
"testing"
"time"

"github.com/go-kit/log"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

Expand Down Expand Up @@ -42,12 +42,12 @@ func Test_blockDownloader_downloadBlocks(t *testing.T) {
}, blockClient, overrides, log.NewNopLogger(), prometheus.DefaultRegisterer)
require.NoError(t, err)
blocksCh, errorsCh := downloader.downloadBlocks(context.Background(), "fake", blockReferences)
downloadedBlocks := make(map[string]any, len(blockReferences))
downloadedBlocks := make(map[BlockRef]any, len(blockReferences))
done := make(chan bool)
go func() {
for i := 0; i < 20; i++ {
block := <-blocksCh
downloadedBlocks[block.BlockPath] = nil
downloadedBlocks[block.BlockRef] = nil
}
done <- true
}()
Expand Down Expand Up @@ -111,12 +111,12 @@ func Test_blockDownloader_downloadBlock(t *testing.T) {
require.NoError(t, err)

blocksCh, errorsCh := downloader.downloadBlocks(context.Background(), "fake", blockReferences)
downloadedBlocks := make(map[string]any, len(blockReferences))
downloadedBlocks := make(map[BlockRef]any, len(blockReferences))
done := make(chan bool)
go func() {
for i := 0; i < 20; i++ {
block := <-blocksCh
downloadedBlocks[block.BlockPath] = nil
downloadedBlocks[block.BlockRef] = nil
}
done <- true
}()
Expand All @@ -132,12 +132,12 @@ func Test_blockDownloader_downloadBlock(t *testing.T) {
require.Equal(t, int32(20), blockClient.getBlockCalls.Load())

blocksCh, errorsCh = downloader.downloadBlocks(context.Background(), "fake", blockReferences)
downloadedBlocks = make(map[string]any, len(blockReferences))
downloadedBlocks = make(map[BlockRef]any, len(blockReferences))
done = make(chan bool)
go func() {
for i := 0; i < 20; i++ {
block := <-blocksCh
downloadedBlocks[block.BlockPath] = nil
downloadedBlocks[block.BlockRef] = nil
}
done <- true
}()
Expand Down Expand Up @@ -313,17 +313,24 @@ func Test_closableBlockQuerier(t *testing.T) {

// createFakeBlocks creates count fake blocks and returns their BlockRefs together with a mockBlockClient whose mockData is keyed by those refs.
func createFakeBlocks(t *testing.T, count int) ([]BlockRef, *mockBlockClient) {
mockData := make(map[string]blockSupplier, count)
mockData := make(map[BlockRef]blockSupplier, count)
refs := make([]BlockRef, 0, count)
for i := 0; i < count; i++ {
archivePath, _, _ := createBlockArchive(t)
_, err := os.OpenFile(archivePath, os.O_RDONLY, 0700)
//ensure file can be opened
require.NoError(t, err)
blockRef := BlockRef{
BlockPath: fmt.Sprintf("block-path-%d", i),
Ref: Ref{
TenantID: "",
TableName: "",
Bounds: v1.NewBounds(model.Fingerprint(i), model.Fingerprint(i+1)),
StartTimestamp: 0,
EndTimestamp: 0,
Checksum: 0,
},
}
mockData[blockRef.BlockPath] = func() LazyBlock {
mockData[blockRef] = func() LazyBlock {
file, _ := os.OpenFile(archivePath, os.O_RDONLY, 0700)
return LazyBlock{
BlockRef: blockRef,
Expand All @@ -339,19 +346,20 @@ type blockSupplier func() LazyBlock

type mockBlockClient struct {
responseDelay time.Duration
mockData map[string]blockSupplier
mockData map[BlockRef]blockSupplier
getBlockCalls atomic.Int32
defaultKeyResolver
}

func (m *mockBlockClient) GetBlock(_ context.Context, reference BlockRef) (LazyBlock, error) {
m.getBlockCalls.Inc()
time.Sleep(m.responseDelay)
supplier, exists := m.mockData[reference.BlockPath]
supplier, exists := m.mockData[reference]
if exists {
return supplier(), nil
}

return LazyBlock{}, fmt.Errorf("block %s is not found in mockData", reference.BlockPath)
return LazyBlock{}, fmt.Errorf("block %s is not found in mockData", reference)
}

func (m *mockBlockClient) PutBlocks(_ context.Context, _ []Block) ([]Block, error) {
Expand All @@ -368,26 +376,30 @@ func Test_blockDownloader_extractBlock(t *testing.T) {
require.NoError(t, err)

workingDir := t.TempDir()
ts := time.Now().UTC()
block := LazyBlock{
BlockRef: BlockRef{BlockPath: "first-period-19621/tenantA/metas/ff-fff-1695272400-1695276000-aaa"},
Data: blockFile,
BlockRef: BlockRef{
Ref: Ref{
TenantID: "",
TableName: "",
Bounds: v1.NewBounds(0, 1),
StartTimestamp: 0,
EndTimestamp: 0,
Checksum: 0,
},
},
Data: blockFile,
}

actualPath, err := extractBlock(&block, ts, workingDir, nil)
err = extractBlock(block.Data, workingDir, nil)

require.NoError(t, err)
expectedPath := filepath.Join(workingDir, block.BlockPath, strconv.FormatInt(ts.UnixNano(), 10))
require.Equal(t, expectedPath, actualPath,
"expected archive to be extracted to working directory under the same path as blockPath and with timestamp suffix")
require.FileExists(t, filepath.Join(expectedPath, v1.BloomFileName))
require.FileExists(t, filepath.Join(expectedPath, v1.SeriesFileName))
require.FileExists(t, filepath.Join(workingDir, v1.BloomFileName))
require.FileExists(t, filepath.Join(workingDir, v1.SeriesFileName))

actualBloomFileContent, err := os.ReadFile(filepath.Join(expectedPath, v1.BloomFileName))
actualBloomFileContent, err := os.ReadFile(filepath.Join(workingDir, v1.BloomFileName))
require.NoError(t, err)
require.Equal(t, bloomFileContent, string(actualBloomFileContent))

actualSeriesFileContent, err := os.ReadFile(filepath.Join(expectedPath, v1.SeriesFileName))
actualSeriesFileContent, err := os.ReadFile(filepath.Join(workingDir, v1.SeriesFileName))
require.NoError(t, err)
require.Equal(t, seriesFileContent, string(actualSeriesFileContent))
}
Expand Down
23 changes: 11 additions & 12 deletions pkg/storage/stores/shipper/bloomshipper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ func (r Ref) Interval() Interval {

type BlockRef struct {
Ref
BlockPath string
}

// String renders the block reference as the address produced by the
// default key resolver, which lets a BlockRef be passed directly to
// %s-style formatting and to loggers.
func (r BlockRef) String() string {
	resolver := defaultKeyResolver{}
	location := resolver.Block(r)
	return location.Addr()
}

type MetaRef struct {
Expand Down Expand Up @@ -110,6 +113,7 @@ type Block struct {
}

type BlockClient interface {
KeyResolver
GetBlock(ctx context.Context, ref BlockRef) (LazyBlock, error)
PutBlocks(ctx context.Context, blocks []Block) ([]Block, error)
DeleteBlocks(ctx context.Context, blocks []BlockRef) error
Expand All @@ -125,14 +129,16 @@ type Client interface {
var _ Client = &BloomClient{}

// BloomClient reads and writes bloom blocks and metas through an
// object-storage client.
type BloomClient struct {
	// KeyResolver is embedded so that methods such as Block(ref).Addr() are
	// available on the client for building object keys.
	KeyResolver
	// concurrency is the job limit passed to concurrency.ForEachJob when
	// operating on several blocks at once.
	concurrency int
	// client is the underlying object store used for Get/Put/Delete calls.
	client client.ObjectClient
	logger log.Logger
}

func NewBloomClient(client client.ObjectClient, logger log.Logger) (*BloomClient, error) {
return &BloomClient{
concurrency: 100, // make configurable?
KeyResolver: defaultKeyResolver{}, // TODO(owen-d): hook into schema, similar to `{,Parse}ExternalKey`
concurrency: 100, // make configurable?
client: client,
logger: logger,
}, nil
Expand All @@ -147,12 +153,6 @@ func (b *BloomClient) PutMeta(ctx context.Context, meta Meta) error {
return b.client.PutObject(ctx, key, bytes.NewReader(data))
}

func externalBlockKey(ref BlockRef) string {
blockParentFolder := ref.Bounds.String()
filename := fmt.Sprintf("%d-%d-%x", ref.StartTimestamp, ref.EndTimestamp, ref.Checksum)
return path.Join(rootFolder, ref.TableName, ref.TenantID, bloomsFolder, blockParentFolder, filename)
}

func externalMetaKey(ref MetaRef) string {
filename := fmt.Sprintf("%s-%d-%d-%x", ref.Bounds.String(), ref.StartTimestamp, ref.EndTimestamp, ref.Checksum)
return path.Join(rootFolder, ref.TableName, ref.TenantID, metasFolder, filename)
Expand All @@ -175,7 +175,7 @@ func (b *BloomClient) DeleteMeta(ctx context.Context, meta Meta) error {

// GetBlock fetches the block identified by reference from object storage and returns it as a LazyBlock.
func (b *BloomClient) GetBlock(ctx context.Context, reference BlockRef) (LazyBlock, error) {
readCloser, _, err := b.client.GetObject(ctx, externalBlockKey(reference))
readCloser, _, err := b.client.GetObject(ctx, b.Block(reference).Addr())
if err != nil {
return LazyBlock{}, fmt.Errorf("error while fetching object from storage: %w", err)
}
Expand All @@ -195,7 +195,7 @@ func (b *BloomClient) PutBlocks(ctx context.Context, blocks []Block) ([]Block, e

var err error

key := externalBlockKey(block.BlockRef)
key := b.Block(block.BlockRef).Addr()
_, err = block.Data.Seek(0, 0)
if err != nil {
return fmt.Errorf("error uploading block file: %w", err)
Expand All @@ -205,7 +205,6 @@ func (b *BloomClient) PutBlocks(ctx context.Context, blocks []Block) ([]Block, e
if err != nil {
return fmt.Errorf("error uploading block file: %w", err)
}
block.BlockPath = key
results[idx] = block
return nil
})
Expand All @@ -215,7 +214,7 @@ func (b *BloomClient) PutBlocks(ctx context.Context, blocks []Block) ([]Block, e
func (b *BloomClient) DeleteBlocks(ctx context.Context, references []BlockRef) error {
return concurrency.ForEachJob(ctx, len(references), b.concurrency, func(ctx context.Context, idx int) error {
ref := references[idx]
key := externalBlockKey(ref)
key := b.Block(ref).Addr()
err := b.client.DeleteObject(ctx, key)
if err != nil {
return fmt.Errorf("error deleting block file: %w", err)
Expand Down
Loading

0 comments on commit 515915e

Please sign in to comment.