diff --git a/.travis.yml b/.travis.yml index 29d8cb6..7cc5970 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,8 +5,10 @@ go: - master sudo: required group: deprecated-2017Q2 +services: + - docker before_install: - - curl -o pilosa https://storage.googleapis.com/pilosa/pilosa-sumreduce && chmod +x pilosa && ./pilosa server & + - docker run -d -p 10101:10101 pilosa/pilosa:v0.6.0 - go get github.com/mattn/goveralls - go get -u github.com/golang/dep/cmd/dep addons: diff --git a/client.go b/client.go index 3da7a28..16acced 100644 --- a/client.go +++ b/client.go @@ -49,6 +49,7 @@ import ( ) const maxHosts = 10 +const sliceWidth = 1048576 // // both Content-Type and Accept headers must be set for protobuf content var protobufHeaders = map[string]string{ @@ -273,7 +274,6 @@ func (c *Client) Schema() (*Schema, error) { // ImportFrame imports bits from the given CSV iterator. func (c *Client) ImportFrame(frame *Frame, bitIterator BitIterator, batchSize uint) error { - const sliceWidth = 1048576 linesLeft := true bitGroup := map[uint64][]Bit{} var currentBatchSize uint @@ -315,7 +315,6 @@ func (c *Client) ImportFrame(frame *Frame, bitIterator BitIterator, batchSize ui // ImportValueFrame imports field values from the given CSV iterator. func (c *Client) ImportValueFrame(frame *Frame, field string, valueIterator ValueIterator, batchSize uint) error { - const sliceWidth = 1048576 linesLeft := true valGroup := map[uint64][]FieldValue{} var currentBatchSize uint diff --git a/client_it_test.go b/client_it_test.go index e2cf0b7..e1b2b65 100644 --- a/client_it_test.go +++ b/client_it_test.go @@ -699,6 +699,43 @@ func TestCSVImport(t *testing.T) { } } +func TestValueCSVImport(t *testing.T) { + client := getClient() + text := `10,7 + 7,1` + iterator := NewCSVValueIterator(strings.NewReader(text)) + frameOptions := &FrameOptions{} + frameOptions.AddIntField("foo", 0, 100) + frame, err := index.Frame("importvalueframe", frameOptions) + if err != nil { + t.Fatal(err) + } + err = client.EnsureFrame(frame) + if err != nil { + t.Fatal(err) + } + bq := index.BatchQuery( + frame.SetBit(1, 10), + frame.SetBit(1, 7), + ) + response, err := client.Query(bq, nil) + if err != nil { + t.Fatal(err) + } + err = client.ImportValueFrame(frame, "foo", iterator, 10) + if err != nil { + t.Fatal(err) + } + response, err = client.Query(frame.Sum(frame.Bitmap(1), "foo"), nil) + if err != nil { + t.Fatal(err) + } + target := int64(8) + if target != response.Result().Sum { + t.Fatalf("%d != %d", target, response.Result().Sum) + } +} + func TestCSVExport(t *testing.T) { client := getClient() frame, err := index.Frame("exportframe", nil) @@ -916,6 +953,19 @@ func TestImportBitIteratorError(t *testing.T) { } } +func TestImportValueIteratorError(t *testing.T) { + client := getClient() + frame, err := index.Frame("not-important", nil) + if err != nil { + t.Fatal(err) + } + iterator := NewCSVValueIterator(&BrokenReader{}) + err = client.ImportValueFrame(frame, "foo", iterator, 100) + if err == nil { + t.Fatalf("import value frame should fail with broken reader") + } +} + func TestImportFailsOnImportBitsError(t *testing.T) { server := getMockServer(500, []byte{}, 0) defer server.Close() @@ -930,6 +980,20 @@ func TestImportFailsOnImportBitsError(t *testing.T) { } } +func TestValueImportFailsOnImportValueError(t *testing.T) { + server := getMockServer(500, []byte{}, 0) + defer server.Close() + uri, err := NewURIFromAddress(server.URL) + if err != nil { + t.Fatal(err) + } + client := NewClientWithURI(uri) + err = client.importValues("foo", "bar", 0, "foo", []FieldValue{}) + if err == nil { + t.Fatalf("importValues should fail when fetch fragment nodes fails") + } +} + func TestImportFrameFailsIfImportBitsFails(t *testing.T) { data := []byte(`[{"host":"non-existing-domain:9999","internalHost":"10101"}]`) server := getMockServer(200, data, len(data)) @@ -950,6 +1014,26 @@ func TestImportFrameFailsIfImportBitsFails(t *testing.T) { } } +func TestImportValueFrameFailsIfImportValuesFails(t *testing.T) { + data := []byte(`[{"host":"non-existing-domain:9999","internalHost":"10101"}]`) + server := getMockServer(200, data, len(data)) + defer server.Close() + uri, err := NewURIFromAddress(server.URL) + if err != nil { + t.Fatal(err) + } + client := NewClientWithURI(uri) + iterator := NewCSVValueIterator(strings.NewReader("10,7")) + frame, err := index.Frame("importframe", nil) + if err != nil { + t.Fatal(err) + } + err = client.ImportValueFrame(frame, "foo", iterator, 10) + if err == nil { + t.Fatalf("ImportValueFrame should fail if importValues fails") + } +} + func TestImportBitsFailInvalidNodeAddress(t *testing.T) { data := []byte(`[{"host":"10101:","internalHost":"doesn'tmatter"}]`) server := getMockServer(200, data, len(data)) @@ -965,6 +1049,21 @@ func TestImportBitsFailInvalidNodeAddress(t *testing.T) { } } +func TestImportValuesFailInvalidNodeAddress(t *testing.T) { + data := []byte(`[{"host":"10101:","internalHost":"doesn'tmatter"}]`) + server := getMockServer(200, data, len(data)) + defer server.Close() + uri, err := NewURIFromAddress(server.URL) + if err != nil { + t.Fatal(err) + } + client := NewClientWithURI(uri) + err = client.importValues("foo", "bar", 0, "foo", []FieldValue{}) + if err == nil { + t.Fatalf("importValues should fail on invalid node host") + } +} + func TestDecodingFragmentNodesFails(t *testing.T) { server := getMockServer(200, []byte("notjson"), 7) defer server.Close() diff --git a/imports.go b/imports.go index 755ec48..6462335 100644 --- a/imports.go +++ b/imports.go @@ -127,7 +127,7 @@ func (b bitsForSort) Less(i, j int) bool { return bitCmp < 0 } -// FieldValues represents the value for a column within a +// FieldValue represents the value for a column within a // range-encoded frame. type FieldValue struct { ColumnID uint64 @@ -148,7 +148,7 @@ type CSVValueIterator struct { scanner *bufio.Scanner } -// NewCSVBitIterator creates a CSVBitIterator from a Reader. +// NewCSVValueIterator creates a CSVValueIterator from a Reader. func NewCSVValueIterator(reader io.Reader) *CSVValueIterator { return &CSVValueIterator{ reader: reader, diff --git a/imports_test.go b/imports_test.go index ed6213b..88f2e12 100644 --- a/imports_test.go +++ b/imports_test.go @@ -67,6 +67,32 @@ func TestCSVBitIterator(t *testing.T) { } } +func TestCSVValueIterator(t *testing.T) { + iterator := pilosa.NewCSVValueIterator(strings.NewReader(`1,10 + 5,20 + 3,41 + `)) + values := []pilosa.FieldValue{} + for { + value, err := iterator.NextValue() + if err != nil { + break + } + values = append(values, value) + } + if len(values) != 3 { + t.Fatalf("There should be 3 values") + } + target := []pilosa.FieldValue{ + {ColumnID: 1, Value: 10}, + {ColumnID: 5, Value: 20}, + {ColumnID: 3, Value: 41}, + } + if !reflect.DeepEqual(target, values) { + t.Fatalf("%v != %v", target, values) + } +} + func TestCSVBitIteratorInvalidInput(t *testing.T) { invalidInputs := []string{ // less than 2 columns @@ -87,6 +113,24 @@ func TestCSVBitIteratorInvalidInput(t *testing.T) { } } +func TestCSVValueIteratorInvalidInput(t *testing.T) { + invalidInputs := []string{ + // less than 2 columns + "155", + // invalid column ID + "a5,155", + // invalid value + "155,a5", + } + for _, text := range invalidInputs { + iterator := pilosa.NewCSVValueIterator(strings.NewReader(text)) + _, err := iterator.NextValue() + if err == nil { + t.Fatalf("CSVValueIterator input: %s should fail", text) + } + } +} + func TestCSVBitIteratorError(t *testing.T) { iterator := pilosa.NewCSVBitIterator(&BrokenReader{}) _, err := iterator.NextBit() @@ -95,6 +139,14 @@ func TestCSVBitIteratorError(t *testing.T) { } } +func TestCSVValueIteratorError(t *testing.T) { + iterator := pilosa.NewCSVValueIterator(&BrokenReader{}) + _, err := iterator.NextValue() + if err == nil { + t.Fatal("CSVValueIterator should fail with error") + } +} + type BrokenReader struct{} func (r BrokenReader) Read(p []byte) (n int, err error) {