From 00cc572218b09981fff6e3c0aa5ef2b2880ed3d2 Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 13 Apr 2022 11:05:28 +0530 Subject: [PATCH 1/3] Reset writers/buffer in a defer func Signed-off-by: Annanay --- tempodb/encoding/v2/data_writer.go | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/tempodb/encoding/v2/data_writer.go b/tempodb/encoding/v2/data_writer.go index b5f38567109..a5634b57e70 100644 --- a/tempodb/encoding/v2/data_writer.go +++ b/tempodb/encoding/v2/data_writer.go @@ -48,10 +48,10 @@ func (p *dataWriter) Write(id common.ID, obj []byte) (int, error) { } // CutPage implements DataWriter -func (p *dataWriter) CutPage() (int, error) { +func (p *dataWriter) CutPage() (n int, err error) { // compress the raw object buffer buffer := p.objectBuffer.Bytes() - _, err := p.compressionWriter.Write(buffer) + _, err = p.compressionWriter.Write(buffer) if err != nil { return 0, err } @@ -59,21 +59,25 @@ func (p *dataWriter) CutPage() (int, error) { // force flush everything p.compressionWriter.Close() - // now marshal the buffer as a page to the output - bytesWritten, err := marshalPageToWriter(p.compressedBuffer.Bytes(), p.outputWriter, constDataHeader) - if err != nil { - return 0, err - } + defer func() { + // reset buffers for the next write + var resetErr error + p.objectBuffer.Reset() + p.compressedBuffer.Reset() + p.compressionWriter, resetErr = p.pool.ResetWriter(p.compressedBuffer, p.compressionWriter) + if resetErr != nil { // set return variables + n = 0 + err = resetErr + } + }() - // reset buffers for the next write - p.objectBuffer.Reset() - p.compressedBuffer.Reset() - p.compressionWriter, err = p.pool.ResetWriter(p.compressedBuffer, p.compressionWriter) + // now marshal the buffer as a page to the output + n, err = marshalPageToWriter(p.compressedBuffer.Bytes(), p.outputWriter, constDataHeader) if err != nil { return 0, err } - return bytesWritten, err + return n, err } // Complete implements DataWriter From 53e9c9fd10c2fd9d8eceabfbd193d2e84e3477b3 Mon Sep 17 00:00:00 2001 From: Annanay Date: Tue, 19 Apr 2022 20:32:05 +0530 Subject: [PATCH 2/3] Move logic out of defer func and add comment Signed-off-by: Annanay --- tempodb/encoding/v2/data_writer.go | 34 ++++++++++++++++-------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/tempodb/encoding/v2/data_writer.go b/tempodb/encoding/v2/data_writer.go index a5634b57e70..6e20911dddc 100644 --- a/tempodb/encoding/v2/data_writer.go +++ b/tempodb/encoding/v2/data_writer.go @@ -4,6 +4,8 @@ import ( "bytes" "io" + "github.com/pkg/errors" + "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" ) @@ -48,10 +50,10 @@ func (p *dataWriter) Write(id common.ID, obj []byte) (int, error) { } // CutPage implements DataWriter -func (p *dataWriter) CutPage() (n int, err error) { +func (p *dataWriter) CutPage() (int, error) { // compress the raw object buffer buffer := p.objectBuffer.Bytes() - _, err = p.compressionWriter.Write(buffer) + _, err := p.compressionWriter.Write(buffer) if err != nil { return 0, err } @@ -59,25 +61,25 @@ func (p *dataWriter) CutPage() (n int, err error) { // force flush everything p.compressionWriter.Close() - defer func() { - // reset buffers for the next write - var resetErr error - p.objectBuffer.Reset() - p.compressedBuffer.Reset() - p.compressionWriter, resetErr = p.pool.ResetWriter(p.compressedBuffer, p.compressionWriter) - if resetErr != nil { // set return variables - n = 0 - err = resetErr - } - }() - // now marshal the buffer as a page to the output - n, err = marshalPageToWriter(p.compressedBuffer.Bytes(), p.outputWriter, constDataHeader) + bytesWritten, marshalErr := marshalPageToWriter(p.compressedBuffer.Bytes(), p.outputWriter, constDataHeader) + + // reset buffers for the next write + p.objectBuffer.Reset() + p.compressedBuffer.Reset() + p.compressionWriter, err = p.pool.ResetWriter(p.compressedBuffer, p.compressionWriter) if err != nil { return 0, err } - return n, err + // deliberately checking marshalErr after resetting the compression writer to avoid "writer is closed" errors in + // case of issues while writing to disk + // for more details hop on to https://github.com/grafana/tempo/issues/1374 + if marshalErr != nil { + return 0, errors.Wrap(marshalErr, "error marshalling page to writer") + } + + return bytesWritten, err } // Complete implements DataWriter From 7281da7961c145ddff0699667729f23ed956e4a0 Mon Sep 17 00:00:00 2001 From: Annanay Date: Tue, 19 Apr 2022 20:36:13 +0530 Subject: [PATCH 3/3] Changelog Signed-off-by: Annanay --- CHANGELOG.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 94fb71801b8..9e66d419464 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ ## main / unreleased -* [CHANGE] Vulture now exercises search at any point during the block retention to test full backend search. +* [CHANGE] Vulture now exercises search at any point during the block retention to test full backend search. **BREAKING CHANGE** Dropped `tempo-search-retention-duration` parameter. [#1297](https://github.com/grafana/tempo/pull/1297) (@joe-elliott) * [CHANGE] Updated storage.trace.pool.queue_depth default from 200->10000. [#1345](https://github.com/grafana/tempo/pull/1345) (@joe-elliott) * [CHANGE] Update alpine images to 3.15 [#1330](https://github.com/grafana/tempo/pull/1330) (@zalegrala) @@ -8,8 +8,8 @@ * [CHANGE] Add warning threshold for TempoIngesterFlushes and adjust critical threshold [#1354](https://github.com/grafana/tempo/pull/1354) (@zalegrala) * [CHANGE] Include lambda in serverless e2e tests [#1357](https://github.com/grafana/tempo/pull/1357) (@zalegrala) * [CHANGE] Replace mixin TempoIngesterFlushes metric to only look at retries [#1354](https://github.com/grafana/tempo/pull/1354) (@zalegrala) -* [FEATURE]: v2 object encoding added. This encoding adds a start/end timestamp to every record to reduce proto marshalling and increase search speed. - **BREAKING CHANGE** After this rollout the distributors will use a new API on the ingesters. As such you must rollout all ingesters before rolling the +* [FEATURE]: v2 object encoding added. This encoding adds a start/end timestamp to every record to reduce proto marshalling and increase search speed. + **BREAKING CHANGE** After this rollout the distributors will use a new API on the ingesters. As such you must rollout all ingesters before rolling the distributors. Also, during this period, the ingesters will use considerably more resources and as such should be scaled up (or incoming traffic should be heavily throttled). Once all distributors and ingesters have rolled performance will return to normal. Internally we have observed ~1.5x CPU load on the ingesters during the rollout. [#1227](https://github.com/grafana/tempo/pull/1227) (@joe-elliott) @@ -66,6 +66,7 @@ * Includes a new metric to determine how often this range is exceeded: `tempo_warnings_total{reason="outside_ingestion_time_slack"}` * [BUGFIX] Prevent data race / ingester crash during searching by trace id by using xxhash instance as a local variable. [#1387](https://github.com/grafana/tempo/pull/1387) (@bikashmishra100, @sagarwala, @ashwinidulams) * [BUGFIX] Fix spurious "failed to mark block compacted during retention" errors [#1372](https://github.com/grafana/tempo/issues/1372) (@mdisibio) +* [BUGFIX] Fix error message "Writer is closed" by resetting compression writer correctly on the error path. [#1379](https://github.com/grafana/tempo/issues/1379) (@annanay25) * [ENHANCEMENT] Add a startTime and endTime parameter to the Trace by ID Tempo Query API to improve query performance [#1388](https://github.com/grafana/tempo/pull/1388) (@sagarwala, @bikashmishra100, @ashwinidulams) ## v1.3.2 / 2022-02-23