create.go
package vparquet3

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"io"

	"github.com/google/uuid"
	tempo_io "github.com/grafana/tempo/pkg/io"
	"github.com/grafana/tempo/tempodb/backend"
	"github.com/grafana/tempo/tempodb/encoding/common"
	"github.com/parquet-go/parquet-go"
)

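// backendWriter adapts a backend.Writer to io.WriteCloser so the parquet
// writer can stream bytes straight to backend storage as a series of appends.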
type backendWriter struct {
	ctx      context.Context
	w        backend.Writer
	name     string
	blockID  uuid.UUID
	tenantID string
	tracker  backend.AppendTracker
}

var _ io.WriteCloser = (*backendWriter)(nil)

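// Write appends p to the backend object, threading the append tracker between
// calls. It reports len(p) bytes written even when Append returns an error.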
func (b *backendWriter) Write(p []byte) (n int, err error) {
	b.tracker, err = b.w.Append(b.ctx, b.name, b.blockID, b.tenantID, b.tracker, p)
	return len(p), err
}

func (b *backendWriter) Close() error {
	return b.w.CloseAppend(b.ctx, b.tracker)
}

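// CreateBlock reads traces from the iterator and writes them to the backend as
// a complete block, cutting a new row group whenever the buffered size exceeds
// cfg.RowGroupSizeBytes. It returns the finalized block meta.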
func CreateBlock(ctx context.Context, cfg *common.BlockConfig, meta *backend.BlockMeta, i common.Iterator, r backend.Reader, to backend.Writer) (*backend.BlockMeta, error) {
	s := newStreamingBlock(ctx, cfg, meta, r, to, tempo_io.NewBufferedWriter)

	var next func(context.Context) (common.ID, parquet.Row, error)

	if ii, ok := i.(*commonIterator); ok {
		// Use internal iterator and avoid translation to/from proto
		next = ii.NextRow
	} else {
		// Need to convert from proto to parquet object
		trp := &Trace{}
		sch := parquet.SchemaOf(trp)
		next = func(context.Context) (common.ID, parquet.Row, error) {
			id, tr, err := i.Next(ctx)
			if errors.Is(err, io.EOF) || tr == nil {
				return id, nil, err
			}

			// Copy ID to allow it to escape the iterator.
			id = append([]byte(nil), id...)

			// This logic only executes when transitioning from one block version
			// to another; the connected return value is ignored here.
			trp, _ = traceToParquet(meta, id, tr, trp)
			row := sch.Deconstruct(completeBlockRowPool.Get(), trp)

			return id, row, nil
		}
	}

	for {
		id, row, err := next(ctx)
		if errors.Is(err, io.EOF) || row == nil {
			break
		}

		err = s.AddRaw(id, row, 0, 0) // start and end time of the WAL meta are used
		if err != nil {
			return nil, err
		}
		completeBlockRowPool.Put(row)

		if s.EstimatedBufferedBytes() > cfg.RowGroupSizeBytes {
			_, err = s.Flush()
			if err != nil {
				return nil, err
			}
		}
	}

	_, err := s.Complete()
	if err != nil {
		return nil, err
	}

	return s.meta, nil
}

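// streamingBlock writes traces to the backend parquet file one row group at a
// time, maintaining the bloom filter, index, and block meta as it goes.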
type streamingBlock struct {
	ctx   context.Context
	bloom *common.ShardedBloomFilter
	meta  *backend.BlockMeta
	bw    tempo_io.BufferedWriteFlusher
	pw    *parquet.GenericWriter[*Trace]
	w     *backendWriter
	r     backend.Reader
	to    backend.Writer
	index *index

	currentBufferedTraces int
	currentBufferedBytes  int
}

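// newStreamingBlock builds a streamingBlock whose parquet writer streams
// through the provided buffered writer into the backend data file.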
func newStreamingBlock(ctx context.Context, cfg *common.BlockConfig, meta *backend.BlockMeta, r backend.Reader, to backend.Writer, createBufferedWriter func(w io.Writer) tempo_io.BufferedWriteFlusher) *streamingBlock {
	newMeta := backend.NewBlockMetaWithDedicatedColumns(meta.TenantID, (uuid.UUID)(meta.BlockID), VersionString, backend.EncNone, "", meta.DedicatedColumns)
	newMeta.StartTime = meta.StartTime
	newMeta.EndTime = meta.EndTime
	newMeta.ReplicationFactor = meta.ReplicationFactor

	// TotalObjects is used here as an estimated count for the bloom filter.
	// The real number of objects is tracked below.
	bloom := common.NewBloom(cfg.BloomFP, uint(cfg.BloomShardSizeBytes), uint(meta.TotalObjects))

	w := &backendWriter{ctx, to, DataFileName, (uuid.UUID)(meta.BlockID), meta.TenantID, nil}
	bw := createBufferedWriter(w)
	pw := parquet.NewGenericWriter[*Trace](bw)

	return &streamingBlock{
		ctx:   ctx,
		meta:  newMeta,
		bloom: bloom,
		bw:    bw,
		pw:    pw,
		w:     w,
		r:     r,
		to:    to,
		index: &index{},
	}
}

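// Add writes a trace to the parquet writer and records its ID in the index,
// bloom filter, and block meta.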
func (b *streamingBlock) Add(tr *Trace, start, end uint32) error {
	_, err := b.pw.Write([]*Trace{tr})
	if err != nil {
		return err
	}
	id := tr.TraceID

	b.index.Add(id)
	b.bloom.Add(id)
	b.meta.ObjectAdded(start, end)
	b.currentBufferedTraces++
	b.currentBufferedBytes += estimateMarshalledSizeFromTrace(tr)

	return nil
}

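// AddRaw is like Add but takes an already-deconstructed parquet row, skipping
// the trace-to-row conversion.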
func (b *streamingBlock) AddRaw(id []byte, row parquet.Row, start, end uint32) error {
	_, err := b.pw.WriteRows([]parquet.Row{row})
	if err != nil {
		return err
	}

	b.index.Add(id)
	b.bloom.Add(id)
	b.meta.ObjectAdded(start, end)
	b.currentBufferedTraces++
	b.currentBufferedBytes += estimateMarshalledSizeFromParquetRow(row)

	return nil
}

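// EstimatedBufferedBytes returns the approximate size of the rows buffered in
// the current row group.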
func (b *streamingBlock) EstimatedBufferedBytes() int {
	return b.currentBufferedBytes
}

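// CurrentBufferedObjects returns the number of traces buffered in the current
// row group.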
func (b *streamingBlock) CurrentBufferedObjects() int {
	return b.currentBufferedTraces
}

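// Flush cuts the current row group and flushes the buffered bytes to the
// backend, returning the number of bytes written.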
func (b *streamingBlock) Flush() (int, error) {
	// Flush row group
	b.index.Flush()
	err := b.pw.Flush()
	if err != nil {
		return 0, err
	}

	n := b.bw.Len()
	b.meta.Size_ += uint64(n)
	b.meta.TotalRecords++
	b.currentBufferedTraces = 0
	b.currentBufferedBytes = 0

	// Flush to underlying writer
	return n, b.bw.Flush()
}

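// Complete flushes the final row group, closes the parquet file (which writes
// the footer), verifies the footer magic, and persists the block meta, bloom
// filter, and index.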
func (b *streamingBlock) Complete() (int, error) {
	// Flush final row group
	b.index.Flush()
	b.meta.TotalRecords++
	err := b.pw.Flush()
	if err != nil {
		return 0, err
	}

	// Close parquet file. This writes the footer and metadata.
	err = b.pw.Close()
	if err != nil {
		return 0, err
	}

	// Now flush and close out the in-memory buffer
	n := b.bw.Len()
	b.meta.Size_ += uint64(n)
	err = b.bw.Flush()
	if err != nil {
		return 0, err
	}

	err = b.bw.Close()
	if err != nil {
		return 0, err
	}

	err = b.w.Close()
	if err != nil {
		return 0, err
	}

	// Read the footer size out of the parquet footer
	buf := make([]byte, 8)
	err = b.r.ReadRange(b.ctx, DataFileName, (uuid.UUID)(b.meta.BlockID), b.meta.TenantID, b.meta.Size_-8, buf, nil)
	if err != nil {
		return 0, fmt.Errorf("error reading parquet file footer: %w", err)
	}
	if string(buf[4:8]) != "PAR1" {
		return 0, errors.New("failed to confirm magic footer while writing a new parquet block")
	}
	b.meta.FooterSize = binary.LittleEndian.Uint32(buf[0:4])

	b.meta.BloomShardCount = uint32(b.bloom.GetShardCount())

	return n, writeBlockMeta(b.ctx, b.to, b.meta, b.bloom, b.index)
}

// estimateMarshalledSizeFromTrace attempts to estimate the size of a trace in
// bytes. This is used to choose when to cut a row group during block creation.
// TODO: This function regularly estimates lower values than estimateProtoSize()
// and the size of the actual proto. It's also quite inefficient. Perhaps just
// using static values per span or attribute would be a better choice?
func estimateMarshalledSizeFromTrace(tr *Trace) (size int) {
	size += 7 // 7 trace lvl fields

	for _, rs := range tr.ResourceSpans {
		size += estimateAttrSize(rs.Resource.Attrs)
		size += 10 // 10 resource span lvl fields

		for _, ils := range rs.ScopeSpans {
			size += 2 // 2 scope span lvl fields

			for _, s := range ils.Spans {
				size += 14 // 14 span lvl fields
				size += estimateAttrSize(s.Attrs)
				size += estimateEventsSize(s.Events)
			}
		}
	}

	return
}

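// estimateAttrSize charges a flat per-attribute cost rather than inspecting
// the attribute values themselves.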
func estimateAttrSize(attrs []Attribute) (size int) {
	return len(attrs) * 7 // 7 attribute lvl fields
}

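// estimateEventsSize charges a flat cost per event plus a flat cost per event
// attribute.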
func estimateEventsSize(events []Event) (size int) {
	for _, e := range events {
		size += 4                // 4 event lvl fields
		size += 4 * len(e.Attrs) // 4 per event attribute
	}
	return
}