Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close gaps where append/completeblock file handles would rely on GC to close #570

Merged
merged 3 commits into from
Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* [BUGFIX] Fixes permissions errors on startup in GCS. [#554](https://github.com/grafana/tempo/pull/554)
* [BUGFIX] Fixes error where Dell ECS cannot list objects. [#561](https://github.com/grafana/tempo/pull/561)
* [BUGFIX] Fixes listing blocks in S3 when the list is truncated. [#567](https://github.com/grafana/tempo/pull/567)
* [BUGFIX] Fixes where ingester may leave file open [#569](https://github.com/grafana/tempo/issues/569)
mdisibio marked this conversation as resolved.
Show resolved Hide resolved

## v0.6.0

Expand Down
7 changes: 7 additions & 0 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,20 @@ func (i *instance) CompleteBlock(blockID uuid.UUID) (uuid.UUID, error) {
i.blocksMtx.Unlock()
return uuid.Nil, err
}

err = completingBlock.Clear()
mdisibio marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
level.Error(log.Logger).Log("msg", "Error clearing wal", "tenantID", i.instanceID, "err", err)
}

// remove completingBlock from list
for j, iterBlock := range i.completingBlocks {
if iterBlock.BlockID() == blockID {
i.completingBlocks = append(i.completingBlocks[:j], i.completingBlocks[j+1:]...)
break
}
}

completeBlockID := completeBlock.BlockMeta().BlockID
i.completeBlocks = append(i.completeBlocks, completeBlock)
i.blocksMtx.Unlock()
Expand Down
30 changes: 8 additions & 22 deletions tempodb/encoding/complete_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,27 @@ type CompleteBlock struct {
records []*common.Record

flushedTime atomic.Int64 // protecting flushedTime b/c it's accessed from the store on flush and from the ingester instance checking flush time
walFilename string

filepath string
readFile *os.File
once sync.Once
}

// NewCompleteBlock creates a new block and takes _ALL_ the parameters necessary to build the ordered, deduped file on disk
func NewCompleteBlock(cfg *BlockConfig, originatingMeta *backend.BlockMeta, iterator Iterator, estimatedObjects int, filepath string, walFilename string) (*CompleteBlock, error) {
func NewCompleteBlock(cfg *BlockConfig, originatingMeta *backend.BlockMeta, iterator Iterator, estimatedObjects int, filepath string) (*CompleteBlock, error) {
c := &CompleteBlock{
encoding: latestEncoding(),
meta: backend.NewBlockMeta(originatingMeta.TenantID, uuid.New(), currentVersion, cfg.Encoding),
bloom: common.NewWithEstimates(uint(estimatedObjects), cfg.BloomFP),
records: make([]*common.Record, 0),
filepath: filepath,
walFilename: walFilename,
encoding: latestEncoding(),
meta: backend.NewBlockMeta(originatingMeta.TenantID, uuid.New(), currentVersion, cfg.Encoding),
bloom: common.NewWithEstimates(uint(estimatedObjects), cfg.BloomFP),
records: make([]*common.Record, 0),
filepath: filepath,
}

_, err := os.Create(c.fullFilename())
if err != nil {
return nil, err
}

appendFile, err := os.OpenFile(c.fullFilename(), os.O_APPEND|os.O_WRONLY, 0644)
appendFile, err := os.OpenFile(c.fullFilename(), os.O_APPEND|os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return nil, err
}
defer appendFile.Close()

pageWriter, err := c.encoding.newPageWriter(appendFile, cfg.Encoding)
if err != nil {
Expand All @@ -71,7 +65,6 @@ func NewCompleteBlock(cfg *BlockConfig, originatingMeta *backend.BlockMeta, iter
break
}
if err != nil {
_ = appendFile.Close()
_ = os.Remove(c.fullFilename())
return nil, err
}
Expand All @@ -82,7 +75,6 @@ func NewCompleteBlock(cfg *BlockConfig, originatingMeta *backend.BlockMeta, iter
writeID := append([]byte(nil), bytesID...)
err = appender.Append(writeID, bytesObject)
if err != nil {
_ = appendFile.Close()
_ = os.Remove(c.fullFilename())
return nil, err
}
Expand All @@ -91,7 +83,6 @@ func NewCompleteBlock(cfg *BlockConfig, originatingMeta *backend.BlockMeta, iter
if err != nil {
return nil, err
}
appendFile.Close()
c.records = appender.Records()
c.meta.Size = appender.DataLength() // Must be after Complete()
c.meta.StartTime = originatingMeta.StartTime
Expand Down Expand Up @@ -137,11 +128,6 @@ func (c *CompleteBlock) Write(ctx context.Context, w backend.Writer) error {

// book keeping
c.flushedTime.Store(time.Now().Unix())
err = os.Remove(c.walFilename) // now that we are flushed, remove our wal file
if err != nil {
return err
}

return nil
}

Expand Down
6 changes: 3 additions & 3 deletions tempodb/encoding/complete_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func testCompleteBlockToBackendBlock(t *testing.T, cfg *BlockConfig) {
require.NoError(t, err, "error creating backend")

err = block.Write(context.Background(), w)
require.EqualError(t, err, "remove : no such file or directory") // we expect an error here b/c there is no wal file to clear
require.NoError(t, err, "error writing backend")

// meta?
uuids, err := r.Blocks(context.Background(), testTenantID)
Expand Down Expand Up @@ -198,7 +198,7 @@ func completeBlock(t *testing.T, cfg *BlockConfig, tempDir string) (*CompleteBlo
}

iterator := NewRecordIterator(appender.Records(), bytes.NewReader(buffer.Bytes()))
block, err := NewCompleteBlock(cfg, originatingMeta, iterator, numMsgs, tempDir, "")
block, err := NewCompleteBlock(cfg, originatingMeta, iterator, numMsgs, tempDir)
require.NoError(t, err, "unexpected error completing block")

// test downsample config
Expand Down Expand Up @@ -285,7 +285,7 @@ func benchmarkCompressBlock(b *testing.B, encoding backend.Encoding, indexDownsa
IndexDownsampleBytes: indexDownsample,
BloomFP: .05,
Encoding: encoding,
}, originatingMeta, iterator, 10000, tempDir, "")
}, originatingMeta, iterator, 10000, tempDir)
require.NoError(b, err, "error creating block")

lastRecord := cb.records[len(cb.records)-1]
Expand Down
9 changes: 2 additions & 7 deletions tempodb/wal/append_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,8 @@ func newAppendBlock(id uuid.UUID, tenantID string, filepath string) (*AppendBloc
}

name := h.fullFilename()
unused, err := os.Create(name)
if err != nil {
return nil, err
}
unused.Close()

f, err := os.OpenFile(name, os.O_APPEND|os.O_WRONLY, 0644)
f, err := os.OpenFile(name, os.O_APPEND|os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -93,7 +88,7 @@ func (h *AppendBlock) Complete(cfg *encoding.BlockConfig, w *WAL, combiner commo
}
defer iterator.Close()

orderedBlock, err := encoding.NewCompleteBlock(cfg, h.meta, iterator, len(records), w.c.CompletedFilepath, h.fullFilename())
orderedBlock, err := encoding.NewCompleteBlock(cfg, h.meta, iterator, len(records), w.c.CompletedFilepath)
if err != nil {
return nil, err
}
Expand Down