From 8fe6a9bbffaf259b6ae392c232b5cbf9d4c57c2b Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Fri, 15 Sep 2017 13:43:22 +0300 Subject: [PATCH 1/3] Added tests for import values --- client.go | 3 +- client_it_test.go | 99 +++++++++++++++++++++++++++++++++++++++++++++++ imports.go | 4 +- imports_test.go | 52 +++++++++++++++++++++++++ 4 files changed, 154 insertions(+), 4 deletions(-) diff --git a/client.go b/client.go index 3da7a28..16acced 100644 --- a/client.go +++ b/client.go @@ -49,6 +49,7 @@ import ( ) const maxHosts = 10 +const sliceWidth = 1048576 // // both Content-Type and Accept headers must be set for protobuf content var protobufHeaders = map[string]string{ @@ -273,7 +274,6 @@ func (c *Client) Schema() (*Schema, error) { // ImportFrame imports bits from the given CSV iterator. func (c *Client) ImportFrame(frame *Frame, bitIterator BitIterator, batchSize uint) error { - const sliceWidth = 1048576 linesLeft := true bitGroup := map[uint64][]Bit{} var currentBatchSize uint @@ -315,7 +315,6 @@ func (c *Client) ImportFrame(frame *Frame, bitIterator BitIterator, batchSize ui // ImportValueFrame imports field values from the given CSV iterator. func (c *Client) ImportValueFrame(frame *Frame, field string, valueIterator ValueIterator, batchSize uint) error { - const sliceWidth = 1048576 linesLeft := true valGroup := map[uint64][]FieldValue{} var currentBatchSize uint diff --git a/client_it_test.go b/client_it_test.go index 45a74a2..5f65005 100644 --- a/client_it_test.go +++ b/client_it_test.go @@ -665,6 +665,43 @@ func TestCSVImport(t *testing.T) { } } +func TestValueCSVImport(t *testing.T) { + client := getClient() + text := `10,7 + 7,1` + iterator := NewCSVValueIterator(strings.NewReader(text)) + frameOptions := &FrameOptions{} + frameOptions.AddIntField("foo", 0, 100) + frame, err := index.Frame("importvalueframe", frameOptions) + if err != nil { + t.Fatal(err) + } + err = client.EnsureFrame(frame) + if err != nil { + t.Fatal(err) + } + bq := index.BatchQuery( + frame.SetBit(1, 10), + frame.SetBit(1, 7), + ) + response, err := client.Query(bq, nil) + if err != nil { + t.Fatal(err) + } + err = client.ImportValueFrame(frame, "foo", iterator, 10) + if err != nil { + t.Fatal(err) + } + response, err = client.Query(frame.Sum(frame.Bitmap(1), "foo"), nil) + if err != nil { + t.Fatal(err) + } + target := int64(8) + if target != response.Result().Sum { + t.Fatalf("%d != %d", target, response.Result().Sum) + } +} + func TestCSVExport(t *testing.T) { client := getClient() frame, err := index.Frame("exportframe", nil) @@ -882,6 +919,19 @@ func TestImportBitIteratorError(t *testing.T) { } } +func TestImportValueIteratorError(t *testing.T) { + client := getClient() + frame, err := index.Frame("not-important", nil) + if err != nil { + t.Fatal(err) + } + iterator := NewCSVValueIterator(&BrokenReader{}) + err = client.ImportValueFrame(frame, "foo", iterator, 100) + if err == nil { + t.Fatalf("import value frame should fail with broken reader") + } +} + func TestImportFailsOnImportBitsError(t *testing.T) { server := getMockServer(500, []byte{}, 0) defer server.Close() @@ -896,6 +946,20 @@ func TestImportFailsOnImportBitsError(t *testing.T) { } } +func TestValueImportFailsOnImportValueError(t *testing.T) { + server := getMockServer(500, []byte{}, 0) + defer server.Close() + uri, err := NewURIFromAddress(server.URL) + if err != nil { + t.Fatal(err) + } + client := NewClientWithURI(uri) + err = client.importValues("foo", "bar", 0, "foo", []FieldValue{}) + if err == nil { + t.Fatalf("importValues should fail when fetch fragment nodes fails") + } +} + func TestImportFrameFailsIfImportBitsFails(t *testing.T) { data := []byte(`[{"host":"non-existing-domain:9999","internalHost":"10101"}]`) server := getMockServer(200, data, len(data)) @@ -916,6 +980,26 @@ func TestImportFrameFailsIfImportBitsFails(t *testing.T) { } } +func TestImportValueFrameFailsIfImportValuesFails(t *testing.T) { + data := []byte(`[{"host":"non-existing-domain:9999","internalHost":"10101"}]`) + server := getMockServer(200, data, len(data)) + defer server.Close() + uri, err := NewURIFromAddress(server.URL) + if err != nil { + t.Fatal(err) + } + client := NewClientWithURI(uri) + iterator := NewCSVValueIterator(strings.NewReader("10,7")) + frame, err := index.Frame("importframe", nil) + if err != nil { + t.Fatal(err) + } + err = client.ImportValueFrame(frame, "foo", iterator, 10) + if err == nil { + t.Fatalf("ImportValueFrame should fail if importValues fails") + } +} + func TestImportBitsFailInvalidNodeAddress(t *testing.T) { data := []byte(`[{"host":"10101:","internalHost":"doesn'tmatter"}]`) server := getMockServer(200, data, len(data)) @@ -931,6 +1015,21 @@ func TestImportBitsFailInvalidNodeAddress(t *testing.T) { } } +func TestImportValuesFailInvalidNodeAddress(t *testing.T) { + data := []byte(`[{"host":"10101:","internalHost":"doesn'tmatter"}]`) + server := getMockServer(200, data, len(data)) + defer server.Close() + uri, err := NewURIFromAddress(server.URL) + if err != nil { + t.Fatal(err) + } + client := NewClientWithURI(uri) + err = client.importValues("foo", "bar", 0, "foo", []FieldValue{}) + if err == nil { + t.Fatalf("importValues should fail on invalid node host") + } +} + func TestDecodingFragmentNodesFails(t *testing.T) { server := getMockServer(200, []byte("notjson"), 7) defer server.Close() diff --git a/imports.go b/imports.go index 755ec48..6462335 100644 --- a/imports.go +++ b/imports.go @@ -127,7 +127,7 @@ func (b bitsForSort) Less(i, j int) bool { return bitCmp < 0 } -// FieldValues represents the value for a column within a +// FieldValue represents the value for a column within a // range-encoded frame. type FieldValue struct { ColumnID uint64 @@ -148,7 +148,7 @@ type CSVValueIterator struct { scanner *bufio.Scanner } -// NewCSVBitIterator creates a CSVBitIterator from a Reader. +// NewCSVValueIterator creates a CSVValueIterator from a Reader. func NewCSVValueIterator(reader io.Reader) *CSVValueIterator { return &CSVValueIterator{ reader: reader, diff --git a/imports_test.go b/imports_test.go index ed6213b..88f2e12 100644 --- a/imports_test.go +++ b/imports_test.go @@ -67,6 +67,32 @@ func TestCSVBitIterator(t *testing.T) { } } +func TestCSVValueIterator(t *testing.T) { + iterator := pilosa.NewCSVValueIterator(strings.NewReader(`1,10 + 5,20 + 3,41 + `)) + values := []pilosa.FieldValue{} + for { + value, err := iterator.NextValue() + if err != nil { + break + } + values = append(values, value) + } + if len(values) != 3 { + t.Fatalf("There should be 3 values") + } + target := []pilosa.FieldValue{ + {ColumnID: 1, Value: 10}, + {ColumnID: 5, Value: 20}, + {ColumnID: 3, Value: 41}, + } + if !reflect.DeepEqual(target, values) { + t.Fatalf("%v != %v", target, values) + } +} + func TestCSVBitIteratorInvalidInput(t *testing.T) { invalidInputs := []string{ // less than 2 columns @@ -87,6 +113,24 @@ func TestCSVBitIteratorInvalidInput(t *testing.T) { } } +func TestCSVValueIteratorInvalidInput(t *testing.T) { + invalidInputs := []string{ + // less than 2 columns + "155", + // invalid column ID + "a5,155", + // invalid value + "155,a5", + } + for _, text := range invalidInputs { + iterator := pilosa.NewCSVValueIterator(strings.NewReader(text)) + _, err := iterator.NextValue() + if err == nil { + t.Fatalf("CSVValueIterator input: %s should fail", text) + } + } +} + func TestCSVBitIteratorError(t *testing.T) { iterator := pilosa.NewCSVBitIterator(&BrokenReader{}) _, err := iterator.NextBit() @@ -95,6 +139,14 @@ func TestCSVBitIteratorError(t *testing.T) { } } +func TestCSVValueIteratorError(t *testing.T) { + iterator := pilosa.NewCSVValueIterator(&BrokenReader{}) + _, err := iterator.NextValue() + if err == nil { + t.Fatal("CSVValueIterator should fail with error") + } +} + type BrokenReader struct{} func (r BrokenReader) Read(p []byte) (n int, err error) { From 33df7736f3810a003ff01eb117f78175ff779161 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Fri, 15 Sep 2017 13:49:36 +0300 Subject: [PATCH 2/3] Use a custom pilosa server --- .travis.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 29d8cb6..26776d8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,7 +6,10 @@ go: sudo: required group: deprecated-2017Q2 before_install: - - curl -o pilosa https://storage.googleapis.com/pilosa/pilosa-sumreduce && chmod +x pilosa && ./pilosa server & + - curl -o ~/pilosa https://storage.googleapis.com/pilosa/pilosa-valueimport + - chmod +x ~/pilosa + - ~/pilosa server & + - sleep 5 - go get github.com/mattn/goveralls - go get -u github.com/golang/dep/cmd/dep addons: From fa565399a8d803788f184c4f10d5e1884f3aa6fa Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Tue, 19 Sep 2017 16:26:10 +0300 Subject: [PATCH 3/3] Restored travis config, even if the integration tests will fail for the current version of Pilosa. --- .travis.yml | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index 26776d8..7cc5970 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,11 +5,10 @@ go: - master sudo: required group: deprecated-2017Q2 +services: + - docker before_install: - - curl -o ~/pilosa https://storage.googleapis.com/pilosa/pilosa-valueimport - - chmod +x ~/pilosa - - ~/pilosa server & - - sleep 5 + - docker run -d -p 10101:10101 pilosa/pilosa:v0.6.0 - go get github.com/mattn/goveralls - go get -u github.com/golang/dep/cmd/dep addons: