Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Permalink
Added tests for import values
Browse files Browse the repository at this point in the history
  • Loading branch information
yuce committed Sep 15, 2017
1 parent d4a5ac8 commit 8fe6a9b
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 4 deletions.
3 changes: 1 addition & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
)

const maxHosts = 10
const sliceWidth = 1048576

// // both Content-Type and Accept headers must be set for protobuf content
var protobufHeaders = map[string]string{
Expand Down Expand Up @@ -273,7 +274,6 @@ func (c *Client) Schema() (*Schema, error) {

// ImportFrame imports bits from the given CSV iterator.
func (c *Client) ImportFrame(frame *Frame, bitIterator BitIterator, batchSize uint) error {
const sliceWidth = 1048576
linesLeft := true
bitGroup := map[uint64][]Bit{}
var currentBatchSize uint
Expand Down Expand Up @@ -315,7 +315,6 @@ func (c *Client) ImportFrame(frame *Frame, bitIterator BitIterator, batchSize ui

// ImportValueFrame imports field values from the given CSV iterator.
func (c *Client) ImportValueFrame(frame *Frame, field string, valueIterator ValueIterator, batchSize uint) error {
const sliceWidth = 1048576
linesLeft := true
valGroup := map[uint64][]FieldValue{}
var currentBatchSize uint
Expand Down
99 changes: 99 additions & 0 deletions client_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,43 @@ func TestCSVImport(t *testing.T) {
}
}

func TestValueCSVImport(t *testing.T) {
client := getClient()
text := `10,7
7,1`
iterator := NewCSVValueIterator(strings.NewReader(text))
frameOptions := &FrameOptions{}
frameOptions.AddIntField("foo", 0, 100)
frame, err := index.Frame("importvalueframe", frameOptions)
if err != nil {
t.Fatal(err)
}
err = client.EnsureFrame(frame)
if err != nil {
t.Fatal(err)
}
bq := index.BatchQuery(
frame.SetBit(1, 10),
frame.SetBit(1, 7),
)
response, err := client.Query(bq, nil)
if err != nil {
t.Fatal(err)
}
err = client.ImportValueFrame(frame, "foo", iterator, 10)
if err != nil {
t.Fatal(err)
}
response, err = client.Query(frame.Sum(frame.Bitmap(1), "foo"), nil)
if err != nil {
t.Fatal(err)
}
target := int64(8)
if target != response.Result().Sum {
t.Fatalf("%d != %d", target, response.Result().Sum)
}
}

func TestCSVExport(t *testing.T) {
client := getClient()
frame, err := index.Frame("exportframe", nil)
Expand Down Expand Up @@ -882,6 +919,19 @@ func TestImportBitIteratorError(t *testing.T) {
}
}

func TestImportValueIteratorError(t *testing.T) {
client := getClient()
frame, err := index.Frame("not-important", nil)
if err != nil {
t.Fatal(err)
}
iterator := NewCSVValueIterator(&BrokenReader{})
err = client.ImportValueFrame(frame, "foo", iterator, 100)
if err == nil {
t.Fatalf("import value frame should fail with broken reader")
}
}

func TestImportFailsOnImportBitsError(t *testing.T) {
server := getMockServer(500, []byte{}, 0)
defer server.Close()
Expand All @@ -896,6 +946,20 @@ func TestImportFailsOnImportBitsError(t *testing.T) {
}
}

func TestValueImportFailsOnImportValueError(t *testing.T) {
server := getMockServer(500, []byte{}, 0)
defer server.Close()
uri, err := NewURIFromAddress(server.URL)
if err != nil {
t.Fatal(err)
}
client := NewClientWithURI(uri)
err = client.importValues("foo", "bar", 0, "foo", []FieldValue{})
if err == nil {
t.Fatalf("importValues should fail when fetch fragment nodes fails")
}
}

func TestImportFrameFailsIfImportBitsFails(t *testing.T) {
data := []byte(`[{"host":"non-existing-domain:9999","internalHost":"10101"}]`)
server := getMockServer(200, data, len(data))
Expand All @@ -916,6 +980,26 @@ func TestImportFrameFailsIfImportBitsFails(t *testing.T) {
}
}

func TestImportValueFrameFailsIfImportValuesFails(t *testing.T) {
data := []byte(`[{"host":"non-existing-domain:9999","internalHost":"10101"}]`)
server := getMockServer(200, data, len(data))
defer server.Close()
uri, err := NewURIFromAddress(server.URL)
if err != nil {
t.Fatal(err)
}
client := NewClientWithURI(uri)
iterator := NewCSVValueIterator(strings.NewReader("10,7"))
frame, err := index.Frame("importframe", nil)
if err != nil {
t.Fatal(err)
}
err = client.ImportValueFrame(frame, "foo", iterator, 10)
if err == nil {
t.Fatalf("ImportValueFrame should fail if importValues fails")
}
}

func TestImportBitsFailInvalidNodeAddress(t *testing.T) {
data := []byte(`[{"host":"10101:","internalHost":"doesn'tmatter"}]`)
server := getMockServer(200, data, len(data))
Expand All @@ -931,6 +1015,21 @@ func TestImportBitsFailInvalidNodeAddress(t *testing.T) {
}
}

func TestImportValuesFailInvalidNodeAddress(t *testing.T) {
data := []byte(`[{"host":"10101:","internalHost":"doesn'tmatter"}]`)
server := getMockServer(200, data, len(data))
defer server.Close()
uri, err := NewURIFromAddress(server.URL)
if err != nil {
t.Fatal(err)
}
client := NewClientWithURI(uri)
err = client.importValues("foo", "bar", 0, "foo", []FieldValue{})
if err == nil {
t.Fatalf("importValues should fail on invalid node host")
}
}

func TestDecodingFragmentNodesFails(t *testing.T) {
server := getMockServer(200, []byte("notjson"), 7)
defer server.Close()
Expand Down
4 changes: 2 additions & 2 deletions imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (b bitsForSort) Less(i, j int) bool {
return bitCmp < 0
}

// FieldValues represents the value for a column within a
// FieldValue represents the value for a column within a
// range-encoded frame.
type FieldValue struct {
ColumnID uint64
Expand All @@ -148,7 +148,7 @@ type CSVValueIterator struct {
scanner *bufio.Scanner
}

// NewCSVBitIterator creates a CSVBitIterator from a Reader.
// NewCSVValueIterator creates a CSVValueIterator from a Reader.
func NewCSVValueIterator(reader io.Reader) *CSVValueIterator {
return &CSVValueIterator{
reader: reader,
Expand Down
52 changes: 52 additions & 0 deletions imports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,32 @@ func TestCSVBitIterator(t *testing.T) {
}
}

func TestCSVValueIterator(t *testing.T) {
iterator := pilosa.NewCSVValueIterator(strings.NewReader(`1,10
5,20
3,41
`))
values := []pilosa.FieldValue{}
for {
value, err := iterator.NextValue()
if err != nil {
break
}
values = append(values, value)
}
if len(values) != 3 {
t.Fatalf("There should be 3 values")
}
target := []pilosa.FieldValue{
{ColumnID: 1, Value: 10},
{ColumnID: 5, Value: 20},
{ColumnID: 3, Value: 41},
}
if !reflect.DeepEqual(target, values) {
t.Fatalf("%v != %v", target, values)
}
}

func TestCSVBitIteratorInvalidInput(t *testing.T) {
invalidInputs := []string{
// less than 2 columns
Expand All @@ -87,6 +113,24 @@ func TestCSVBitIteratorInvalidInput(t *testing.T) {
}
}

func TestCSVValueIteratorInvalidInput(t *testing.T) {
invalidInputs := []string{
// less than 2 columns
"155",
// invalid column ID
"a5,155",
// invalid value
"155,a5",
}
for _, text := range invalidInputs {
iterator := pilosa.NewCSVValueIterator(strings.NewReader(text))
_, err := iterator.NextValue()
if err == nil {
t.Fatalf("CSVValueIterator input: %s should fail", text)
}
}
}

func TestCSVBitIteratorError(t *testing.T) {
iterator := pilosa.NewCSVBitIterator(&BrokenReader{})
_, err := iterator.NextBit()
Expand All @@ -95,6 +139,14 @@ func TestCSVBitIteratorError(t *testing.T) {
}
}

func TestCSVValueIteratorError(t *testing.T) {
iterator := pilosa.NewCSVValueIterator(&BrokenReader{})
_, err := iterator.NextValue()
if err == nil {
t.Fatal("CSVValueIterator should fail with error")
}
}

type BrokenReader struct{}

func (r BrokenReader) Read(p []byte) (n int, err error) {
Expand Down

0 comments on commit 8fe6a9b

Please sign in to comment.