Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(azure): append method on azure backend #736

Merged
merged 5 commits into from
Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## main / unreleased

* [BUGFIX] Azure Backend - Fix an issue with the append method on the Azure backend. [#736](https://github.com/grafana/tempo/pull/736)

## v1.0.0-rc.0

* [ENHANCEMENT] Performance: Improve Ingester Record Insertion. [#681](https://github.com/grafana/tempo/pull/681)
Expand Down
43 changes: 37 additions & 6 deletions tempodb/backend/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"bufio"
"bytes"
"context"
"encoding/base64"
"encoding/binary"
"encoding/json"
"io"
"strings"
Expand Down Expand Up @@ -91,7 +93,7 @@ func (rw *readerWriter) Append(ctx context.Context, name string, blockID uuid.UU
} else {
a = tracker.(appendTracker)

_, err := rw.append(ctx, buffer, a.Name)
err := rw.append(ctx, buffer, a.Name)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -222,16 +224,45 @@ func (rw *readerWriter) writeAll(ctx context.Context, name string, b []byte) err
return nil
}

func (rw *readerWriter) append(ctx context.Context, src []byte, name string) (string, error) {
appendBlobURL := rw.containerURL.NewAppendBlobURL(name)
func (rw *readerWriter) append(ctx context.Context, src []byte, name string) error {
appendBlobURL := rw.containerURL.NewBlockBlobURL(name)

resp, err := appendBlobURL.AppendBlock(ctx, bytes.NewReader(src), blob.AppendBlobAccessConditions{}, nil)
// These helper functions convert a binary block ID to a base-64 string and vice versa
// NOTE: The blockID must be <= 64 bytes and ALL blockIDs for the block must be the same length
blockIDBinaryToBase64 := func(blockID []byte) string { return base64.StdEncoding.EncodeToString(blockID) }

blockIDIntToBase64 := func(blockID int) string {
binaryBlockID := (&[64]byte{})[:]
binary.LittleEndian.PutUint32(binaryBlockID, uint32(blockID))
return blockIDBinaryToBase64(binaryBlockID)
}

l, err := appendBlobURL.GetBlockList(ctx, blob.BlockListAll, blob.LeaseAccessConditions{})
if err != nil {
return err
}

// generate the next block id
id := blockIDIntToBase64(len(l.CommittedBlocks) + 1)

_, err = appendBlobURL.StageBlock(ctx, id, bytes.NewReader(src), blob.LeaseAccessConditions{}, nil)
if err != nil {
return "", err
return err
}
return resp.RequestID(), nil

base64BlockIDs := make([]string, len(l.CommittedBlocks)+1)
for i := 0; i < len(l.CommittedBlocks); i++ {
base64BlockIDs[i] = l.CommittedBlocks[i].Name
}

base64BlockIDs[len(l.CommittedBlocks)] = id

// After all the blocks are uploaded, atomically commit them to the blob.
_, err = appendBlobURL.CommitBlockList(ctx, base64BlockIDs, blob.BlobHTTPHeaders{}, blob.Metadata{}, blob.BlobAccessConditions{})
if err != nil {
return err
}
return nil
}

func (rw *readerWriter) writer(ctx context.Context, src io.Reader, name string) error {
Expand Down