Reinstate the roundtrip tests
simonswine committed Jul 11, 2022
1 parent d8ba301 commit dfdca66
Showing 3 changed files with 47 additions and 25 deletions.
12 changes: 6 additions & 6 deletions pkg/firedb/head.go
@@ -435,42 +435,42 @@ func (h *Head) WriteTo(ctx context.Context, path string) error {
 		{
 			name: "profiles",
 			f: func(f io.Writer) error {
-				w := schemav1.Writer[*schemav1.Profile, *schemav1.ProfilePersister]{}
+				w := schemav1.ReadWriter[*schemav1.Profile, *schemav1.ProfilePersister]{}
 				return w.WriteParquetFile(f, h.profiles.slice)
 			},
 		},
 		{
 			name: "stacktraces",
 			f: func(f io.Writer) error {
-				w := schemav1.Writer[*schemav1.Stacktrace, *schemav1.StacktracePersister]{}
+				w := schemav1.ReadWriter[*schemav1.Stacktrace, *schemav1.StacktracePersister]{}
 				return w.WriteParquetFile(f, h.stacktraces.slice)
 			},
 		},
 		{
 			name: "strings",
 			f: func(f io.Writer) error {
-				w := schemav1.Writer[string, *schemav1.StringPersister]{}
+				w := schemav1.ReadWriter[string, *schemav1.StringPersister]{}
 				return w.WriteParquetFile(f, h.strings.slice)
 			},
 		},
 		{
 			name: "mappings",
 			f: func(f io.Writer) error {
-				w := schemav1.Writer[*profilev1.Mapping, *schemav1.MappingPersister]{}
+				w := schemav1.ReadWriter[*profilev1.Mapping, *schemav1.MappingPersister]{}
 				return w.WriteParquetFile(f, h.mappings.slice)
 			},
 		},
 		{
 			name: "locations",
 			f: func(f io.Writer) error {
-				w := schemav1.Writer[*profilev1.Location, *schemav1.LocationPersister]{}
+				w := schemav1.ReadWriter[*profilev1.Location, *schemav1.LocationPersister]{}
 				return w.WriteParquetFile(f, h.locations.slice)
 			},
 		},
 		{
 			name: "functions",
 			f: func(f io.Writer) error {
-				w := schemav1.Writer[*profilev1.Function, *schemav1.FunctionPersister]{}
+				w := schemav1.ReadWriter[*profilev1.Function, *schemav1.FunctionPersister]{}
 				return w.WriteParquetFile(f, h.functions.slice)
 			},
 		},
@@ -19,10 +19,9 @@ type Persister[T any] interface {
 	SortingColumns() SortingColumns
 }
 
-type Writer[T any, P Persister[T]] struct {
-}
+type ReadWriter[T any, P Persister[T]] struct{}
 
-func (*Writer[T, P]) WriteParquetFile(file io.Writer, elements []T) error {
+func (*ReadWriter[T, P]) WriteParquetFile(file io.Writer, elements []T) error {
 	var (
 		persister P
 		rows      = make([]parquet.Row, len(elements))
@@ -46,3 +45,31 @@ func (*Writer[T, P]) WriteParquetFile(file io.Writer, elements []T) error {

 	return writer.Close()
 }
+
+func (*ReadWriter[T, P]) ReadParquetFile(file io.ReaderAt) ([]T, error) {
+	var (
+		persister P
+		reader    = parquet.NewReader(file, persister.Schema())
+	)
+	defer reader.Close()
+
+	var (
+		rows = make([]parquet.Row, reader.NumRows())
+	)
+	if _, err := reader.ReadRows(rows); err != nil {
+		return nil, err
+	}
+
+	var (
+		elements = make([]T, reader.NumRows())
+		err      error
+	)
+	for pos := range elements {
+		_, elements[pos], err = persister.Reconstruct(rows[pos])
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return elements, nil
+}
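
The new ReadParquetFile makes ReadWriter symmetric: anything serialized with WriteParquetFile can be decoded back into the original slice via the persister's Reconstruct. Below is a minimal usage sketch modeled on the TestStringsRoundTrip test further down; the schemav1 import path is an assumption, and only ReadWriter, StringPersister, WriteParquetFile, and ReadParquetFile come from this commit.

```go
package main

import (
	"bytes"
	"fmt"

	// Assumed module path; in this commit the package lives under pkg/firedb/schemas/v1.
	schemav1 "github.com/grafana/fire/pkg/firedb/schemas/v1"
)

func main() {
	rw := schemav1.ReadWriter[string, *schemav1.StringPersister]{}

	// Write a slice of strings as a parquet file into an in-memory buffer.
	var buf bytes.Buffer
	if err := rw.WriteParquetFile(&buf, []string{"foo", "bar", "baz"}); err != nil {
		panic(err)
	}

	// ReadParquetFile takes an io.ReaderAt, so wrap the buffer's bytes.
	out, err := rw.ReadParquetFile(bytes.NewReader(buf.Bytes()))
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // expected: [foo bar baz]
}
```

In the commit itself, this pattern is exercised by TestStacktracesRoundTrip and TestStringsRoundTrip in pkg/firedb/schemas/v1/schema_test.go.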
27 changes: 11 additions & 16 deletions pkg/firedb/schemas/v1/schema_test.go
@@ -6,6 +6,7 @@ import (
"testing"

"github.com/segmentio/parquet-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

@@ -17,6 +18,8 @@ func TestSchemaMatch(t *testing.T) {
 	stacktracesStructSchema := parquet.SchemaOf(&storedStacktrace{})
 	require.Equal(t, strings.Replace(stacktracesStructSchema.String(), "message storedStacktrace", "message Stacktrace", 1), stacktracesSchema.String())
 
+	stringsStructSchema := parquet.SchemaOf(&storedString{})
+	require.Equal(t, strings.Replace(stringsStructSchema.String(), "message storedString", "message String", 1), stringsSchema.String())
 }
 
 func newStacktraces() []*Stacktrace {
@@ -32,26 +35,15 @@ func newStacktraces() []*Stacktrace {
 func TestStacktracesRoundTrip(t *testing.T) {
 	var (
 		s   = newStacktraces()
-		w   = &Writer[*Stacktrace, *StacktracePersister]{}
+		w   = &ReadWriter[*Stacktrace, *StacktracePersister]{}
 		buf bytes.Buffer
 	)
 
 	require.NoError(t, w.WriteParquetFile(&buf, s))
 
-	/*
-		n, err := buf.WriteRows(s.ToRows())
-		require.NoError(t, err)
-		assert.Equal(t, 5, n)
-		var rows = make([]parquet.Row, len(s))
-		n, err = buf.Rows().ReadRows(rows)
-		require.NoError(t, err)
-		assert.Equal(t, 5, n)
-		sRoundTrip, err := StacktracesFromRows(rows)
-		require.NoError(t, err)
-		assert.Equal(t, s, sRoundTrip)
-	*/
+	sRead, err := w.ReadParquetFile(bytes.NewReader(buf.Bytes()))
+	require.NoError(t, err)
+	assert.Equal(t, newStacktraces(), sRead)
 }
 
 func newStrings() []string {
@@ -67,10 +59,13 @@ func newStrings() []string {
 func TestStringsRoundTrip(t *testing.T) {
 	var (
 		s   = newStrings()
-		w   = &Writer[string, *StringPersister]{}
+		w   = &ReadWriter[string, *StringPersister]{}
 		buf bytes.Buffer
 	)
 
 	require.NoError(t, w.WriteParquetFile(&buf, s))
 
+	sRead, err := w.ReadParquetFile(bytes.NewReader(buf.Bytes()))
+	require.NoError(t, err)
+	assert.Equal(t, newStrings(), sRead)
 }
