Skip to content

Commit

Permalink
Close gaps where append/completeblock file handles would rely on GC t…
Browse files Browse the repository at this point in the history
…o close (#570)

* Close gaps where append/completeblock file handles would rely on GC to close

* Update changelog

* Changed order of operations to delete wal block last outside of lock
  • Loading branch information
mdisibio authored Mar 8, 2021
1 parent c8cad66 commit 7ab0938
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* [BUGFIX] Fixes permissions errors on startup in GCS. [#554](https://github.com/grafana/tempo/pull/554)
* [BUGFIX] Fixes error where Dell ECS cannot list objects. [#561](https://github.com/grafana/tempo/pull/561)
* [BUGFIX] Fixes listing blocks in S3 when the list is truncated. [#567](https://github.com/grafana/tempo/pull/567)
* [BUGFIX] Fixes where ingester may leave file open [#570](https://github.com/grafana/tempo/pull/570)

## v0.6.0

Expand Down
12 changes: 8 additions & 4 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,14 @@ func (i *instance) CompleteBlock(blockID uuid.UUID) (uuid.UUID, error) {

// potentially long running operation placed outside blocksMtx
completeBlock, err := i.writer.CompleteBlock(completingBlock, i)

i.blocksMtx.Lock()
if err != nil {
metricFailedFlushes.Inc()
level.Error(log.Logger).Log("msg", "unable to complete block.", "tenantID", i.instanceID, "err", err)
i.blocksMtx.Unlock()
return uuid.Nil, err
}
// remove completingBlock from list

// remove completingBlock and add completeBlock
i.blocksMtx.Lock()
for j, iterBlock := range i.completingBlocks {
if iterBlock.BlockID() == blockID {
i.completingBlocks = append(i.completingBlocks[:j], i.completingBlocks[j+1:]...)
Expand All @@ -198,6 +197,11 @@ func (i *instance) CompleteBlock(blockID uuid.UUID) (uuid.UUID, error) {
i.completeBlocks = append(i.completeBlocks, completeBlock)
i.blocksMtx.Unlock()

err = completingBlock.Clear()
if err != nil {
level.Error(log.Logger).Log("msg", "Error clearing wal", "tenantID", i.instanceID, "err", err)
}

return completeBlockID, nil
}

Expand Down
30 changes: 8 additions & 22 deletions tempodb/encoding/complete_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,27 @@ type CompleteBlock struct {
records []*common.Record

flushedTime atomic.Int64 // protecting flushedTime b/c it's accessed from the store on flush and from the ingester instance checking flush time
walFilename string

filepath string
readFile *os.File
once sync.Once
}

// NewCompleteBlock creates a new block and takes _ALL_ the parameters necessary to build the ordered, deduped file on disk
func NewCompleteBlock(cfg *BlockConfig, originatingMeta *backend.BlockMeta, iterator Iterator, estimatedObjects int, filepath string, walFilename string) (*CompleteBlock, error) {
func NewCompleteBlock(cfg *BlockConfig, originatingMeta *backend.BlockMeta, iterator Iterator, estimatedObjects int, filepath string) (*CompleteBlock, error) {
c := &CompleteBlock{
encoding: latestEncoding(),
meta: backend.NewBlockMeta(originatingMeta.TenantID, uuid.New(), currentVersion, cfg.Encoding),
bloom: common.NewWithEstimates(uint(estimatedObjects), cfg.BloomFP),
records: make([]*common.Record, 0),
filepath: filepath,
walFilename: walFilename,
encoding: latestEncoding(),
meta: backend.NewBlockMeta(originatingMeta.TenantID, uuid.New(), currentVersion, cfg.Encoding),
bloom: common.NewWithEstimates(uint(estimatedObjects), cfg.BloomFP),
records: make([]*common.Record, 0),
filepath: filepath,
}

_, err := os.Create(c.fullFilename())
if err != nil {
return nil, err
}

appendFile, err := os.OpenFile(c.fullFilename(), os.O_APPEND|os.O_WRONLY, 0644)
appendFile, err := os.OpenFile(c.fullFilename(), os.O_APPEND|os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return nil, err
}
defer appendFile.Close()

pageWriter, err := c.encoding.newPageWriter(appendFile, cfg.Encoding)
if err != nil {
Expand All @@ -71,7 +65,6 @@ func NewCompleteBlock(cfg *BlockConfig, originatingMeta *backend.BlockMeta, iter
break
}
if err != nil {
_ = appendFile.Close()
_ = os.Remove(c.fullFilename())
return nil, err
}
Expand All @@ -82,7 +75,6 @@ func NewCompleteBlock(cfg *BlockConfig, originatingMeta *backend.BlockMeta, iter
writeID := append([]byte(nil), bytesID...)
err = appender.Append(writeID, bytesObject)
if err != nil {
_ = appendFile.Close()
_ = os.Remove(c.fullFilename())
return nil, err
}
Expand All @@ -91,7 +83,6 @@ func NewCompleteBlock(cfg *BlockConfig, originatingMeta *backend.BlockMeta, iter
if err != nil {
return nil, err
}
appendFile.Close()
c.records = appender.Records()
c.meta.Size = appender.DataLength() // Must be after Complete()
c.meta.StartTime = originatingMeta.StartTime
Expand Down Expand Up @@ -137,11 +128,6 @@ func (c *CompleteBlock) Write(ctx context.Context, w backend.Writer) error {

// book keeping
c.flushedTime.Store(time.Now().Unix())
err = os.Remove(c.walFilename) // now that we are flushed, remove our wal file
if err != nil {
return err
}

return nil
}

Expand Down
6 changes: 3 additions & 3 deletions tempodb/encoding/complete_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func testCompleteBlockToBackendBlock(t *testing.T, cfg *BlockConfig) {
require.NoError(t, err, "error creating backend")

err = block.Write(context.Background(), w)
require.EqualError(t, err, "remove : no such file or directory") // we expect an error here b/c there is no wal file to clear
require.NoError(t, err, "error writing backend")

// meta?
uuids, err := r.Blocks(context.Background(), testTenantID)
Expand Down Expand Up @@ -198,7 +198,7 @@ func completeBlock(t *testing.T, cfg *BlockConfig, tempDir string) (*CompleteBlo
}

iterator := NewRecordIterator(appender.Records(), bytes.NewReader(buffer.Bytes()))
block, err := NewCompleteBlock(cfg, originatingMeta, iterator, numMsgs, tempDir, "")
block, err := NewCompleteBlock(cfg, originatingMeta, iterator, numMsgs, tempDir)
require.NoError(t, err, "unexpected error completing block")

// test downsample config
Expand Down Expand Up @@ -285,7 +285,7 @@ func benchmarkCompressBlock(b *testing.B, encoding backend.Encoding, indexDownsa
IndexDownsampleBytes: indexDownsample,
BloomFP: .05,
Encoding: encoding,
}, originatingMeta, iterator, 10000, tempDir, "")
}, originatingMeta, iterator, 10000, tempDir)
require.NoError(b, err, "error creating block")

lastRecord := cb.records[len(cb.records)-1]
Expand Down
9 changes: 2 additions & 7 deletions tempodb/wal/append_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,8 @@ func newAppendBlock(id uuid.UUID, tenantID string, filepath string) (*AppendBloc
}

name := h.fullFilename()
unused, err := os.Create(name)
if err != nil {
return nil, err
}
unused.Close()

f, err := os.OpenFile(name, os.O_APPEND|os.O_WRONLY, 0644)
f, err := os.OpenFile(name, os.O_APPEND|os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -93,7 +88,7 @@ func (h *AppendBlock) Complete(cfg *encoding.BlockConfig, w *WAL, combiner commo
}
defer iterator.Close()

orderedBlock, err := encoding.NewCompleteBlock(cfg, h.meta, iterator, len(records), w.c.CompletedFilepath, h.fullFilename())
orderedBlock, err := encoding.NewCompleteBlock(cfg, h.meta, iterator, len(records), w.c.CompletedFilepath)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 7ab0938

Please sign in to comment.