Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Permalink
Added support for imports
Browse files Browse the repository at this point in the history
  • Loading branch information
yuce committed Jun 19, 2017
1 parent bbd4a5d commit 87573a9
Show file tree
Hide file tree
Showing 9 changed files with 560 additions and 72 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ all: test
cover:
go test -cover -tags=integration

generate-proto:
generate:
protoc --go_out=. internal/public.proto

test:
Expand Down
117 changes: 117 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"sort"
"time"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -181,6 +183,97 @@ func (c *Client) Schema() (*Schema, error) {
return schema, nil
}

// ImportFrame imports the bits produced by bitIterator into the given frame.
//
// Bits are read in batches of at most batchSize. Within a batch, bits are
// grouped by slice (ColumnID / sliceWidth) and every group is sent with
// importBits. The loop ends when bitIterator.Iterate reports io.EOF; any
// other error from the iterator or from importBits aborts the import and is
// returned to the caller.
func (c *Client) ImportFrame(frame *Frame, bitIterator *CSVBitIterator, batchSize uint) error {
	const sliceWidth = 1048576
	indexName := frame.index.name
	frameName := frame.name

	bitGroup := map[uint64][]Bit{}
	var currentBatchSize uint

	// callback collects one bit into its slice group and tells the iterator
	// to stop as soon as the current batch is full.
	callback := func(bit Bit) bool {
		slice := bit.ColumnID / sliceWidth
		// append on a missing key starts from a nil slice, so no existence check is needed
		bitGroup[slice] = append(bitGroup[slice], bit)
		currentBatchSize++
		return currentBatchSize < batchSize
	}

	for canContinue := true; canContinue; {
		err := bitIterator.Iterate(callback)
		if err == io.EOF {
			// the bits gathered before EOF still have to be imported below
			canContinue = false
		} else if err != nil {
			return err
		}
		for slice, bits := range bitGroup {
			if err := c.importBits(indexName, frameName, slice, bits); err != nil {
				return err
			}
		}
		// Start the next batch from scratch; otherwise the bits that were
		// already sent would be imported again and the batch counter would
		// stay above batchSize, stopping the iterator after a single bit.
		bitGroup = map[uint64][]Bit{}
		currentBatchSize = 0
	}

	return nil
}

// importBits sends the given bits of one slice to every node returned by
// fetchFragmentNodes for that slice.
//
// bits is sorted in place by row ID and then by column ID before sending.
// The first error from fetching the nodes, parsing a node address or
// importing to a node is returned.
func (c *Client) importBits(indexName string, frameName string, slice uint64, bits []Bit) error {
	// The maximum ingestion speed is accomplished by sorting bits by row ID and then column ID.
	// The IDs are unsigned, so they are compared directly: a subtraction would
	// wrap around instead of going negative.
	sort.Slice(bits, func(i, j int) bool {
		if bits[i].RowID != bits[j].RowID {
			return bits[i].RowID < bits[j].RowID
		}
		return bits[i].ColumnID < bits[j].ColumnID
	})
	nodes, err := c.fetchFragmentNodes(indexName, slice)
	if err != nil {
		return err
	}
	// The request is the same for every node, so it is built only once.
	request := bitsToImportRequest(indexName, frameName, slice, bits)
	for _, node := range nodes {
		uri, err := NewURIFromAddress(node.Host)
		if err != nil {
			return err
		}
		client := NewClientWithURI(uri)
		if err := client.importNode(request); err != nil {
			return err
		}
	}

	return nil
}

// fetchFragmentNodes asks the server for the nodes of the given slice of the
// index and decodes the JSON response body into a list of FragmentNode.
func (c *Client) fetchFragmentNodes(indexName string, slice uint64) ([]FragmentNode, error) {
	_, payload, err := c.httpRequest(
		"GET",
		fmt.Sprintf("/fragment/nodes?slice=%d&index=%s", slice, indexName),
		[]byte{},
		errorCheckedResponse,
	)
	if err != nil {
		return nil, err
	}
	nodes := []FragmentNode{}
	if decodeErr := json.Unmarshal(payload, &nodes); decodeErr != nil {
		return nil, decodeErr
	}
	return nodes, nil
}

// importNode serializes the import request and POSTs it to the /import
// endpoint of the server this client is connected to.
//
// It returns the serialization error, if any, or the error reported by the
// HTTP request.
func (c *Client) importNode(request *internal.ImportRequest) error {
	data, err := proto.Marshal(request)
	if err != nil {
		// do not send a request whose payload could not be serialized
		return err
	}
	_, _, err = c.httpRequest("POST", "/import", data, errorCheckedResponse)
	return err
}

func (c *Client) patchIndexTimeQuantum(index *Index) error {
data := []byte(fmt.Sprintf(`{"timeQuantum": "%s"}`, index.options.TimeQuantum))
path := fmt.Sprintf("/index/%s/time-quantum", index.name)
Expand Down Expand Up @@ -273,6 +366,25 @@ func matchError(msg string) error {
return nil
}

// bitsToImportRequest builds the import request for one slice of a frame.
// The row IDs, column IDs and timestamps of the bits are copied, in order,
// into three parallel lists of the request.
func bitsToImportRequest(indexName string, frameName string, slice uint64, bits []Bit) *internal.ImportRequest {
	count := len(bits)
	request := &internal.ImportRequest{
		Index:      indexName,
		Frame:      frameName,
		Slice:      slice,
		RowIDs:     make([]uint64, count),
		ColumnIDs:  make([]uint64, count),
		Timestamps: make([]int64, count),
	}
	for i := range bits {
		request.RowIDs[i] = bits[i].RowID
		request.ColumnIDs[i] = bits[i].ColumnID
		request.Timestamps[i] = bits[i].Timestamp
	}
	return request
}

// ClientOptions control the properties of client connection to the server
type ClientOptions struct {
SocketTimeout time.Duration
Expand Down Expand Up @@ -325,3 +437,8 @@ const (
rawResponse
errorCheckedResponse
)

// FragmentNode is one entry of the JSON list decoded from the response of
// the /fragment/nodes endpoint.
type FragmentNode struct {
	// Host is the node address that bits of a slice are imported to.
	Host string
	// InternalHost is decoded from the response but not read by the import
	// code; presumably the address used between nodes (TODO confirm).
	InternalHost string
}
120 changes: 120 additions & 0 deletions client_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ package pilosa

import (
"bytes"
"errors"
"io"
"net/http"
"net/http/httptest"
"os"
"reflect"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -242,6 +244,39 @@ func TestTopNReturns(t *testing.T) {
}
}

// TestIntersectReturns sets bits on two bitmaps that share a single column
// and checks that their intersection contains only that column.
func TestIntersectReturns(t *testing.T) {
	client := getClient()
	options := &FrameOptions{
		RowLabel: "segment_id",
	}
	frame, err := index.Frame("segments", options)
	if err != nil {
		t.Fatal(err)
	}
	err = client.EnsureFrame(frame)
	if err != nil {
		t.Fatal(err)
	}
	qry1 := index.BatchQuery(
		frame.SetBit(2, 10),
		frame.SetBit(2, 15),
		frame.SetBit(3, 10),
		frame.SetBit(3, 20),
	)
	// a failed setup query would otherwise surface as a confusing assertion failure below
	if _, err = client.Query(qry1, nil); err != nil {
		t.Fatal(err)
	}
	qry2 := index.Intersect(frame.Bitmap(2), frame.Bitmap(3))
	response, err := client.Query(qry2, nil)
	if err != nil {
		t.Fatal(err)
	}
	if len(response.Results()) != 1 {
		t.Fatal("There must be 1 result")
	}
	if !reflect.DeepEqual(response.Result().Bitmap.Bits, []uint64{10}) {
		t.Fatal("Returned bits must be: [10]")
	}
}

func TestCreateDeleteIndexFrame(t *testing.T) {
client := getClient()
index1, err := NewIndex("to-be-deleted", nil)
Expand Down Expand Up @@ -502,6 +537,85 @@ func TestInvalidSchema(t *testing.T) {
}
}

// TestCSVImport imports a small CSV document into a frame and checks the
// first bit of each of the bitmaps 2, 7 and 10.
func TestCSVImport(t *testing.T) {
	client := getClient()
	text := `
10,5
2,3
10,7
7,1
`
	iterator := NewCSVBitIterator(strings.NewReader(text))
	frame, err := index.Frame("importframe", nil)
	if err != nil {
		t.Fatal(err)
	}
	err = client.EnsureFrame(frame)
	if err != nil {
		t.Fatal(err)
	}
	err = client.ImportFrame(frame, iterator, 10)
	if err != nil {
		t.Fatal(err)
	}

	// expected first bit of the bitmaps 2, 7 and 10, in query order
	target := []uint64{3, 1, 5}
	bq := index.BatchQuery(
		frame.Bitmap(2),
		frame.Bitmap(7),
		frame.Bitmap(10),
	)
	response, err := client.Query(bq, nil)
	if err != nil {
		t.Fatal(err)
	}
	if len(response.Results()) != 3 {
		t.Fatalf("Result count should be 3")
	}
	for i, result := range response.Results() {
		br := result.Bitmap
		// fail with a message instead of panicking on an empty bitmap
		if len(br.Bits) == 0 {
			t.Fatalf("result %d has no bits", i)
		}
		if target[i] != br.Bits[0] {
			t.Fatalf("%d != %d", target[i], br.Bits[0])
		}
	}
}

// TestFetchFragmentNodes checks that exactly one node is reported for
// slice 0 of the test index.
func TestFetchFragmentNodes(t *testing.T) {
	fragmentNodes, err := getClient().fetchFragmentNodes(index.Name(), 0)
	if err != nil {
		t.Fatal(err)
	}
	if count := len(fragmentNodes); count != 1 {
		t.Fatalf("1 node should be returned")
	}
}

// TestImportBitIteratorError checks that ImportFrame reports an error when
// the reader behind the bit iterator fails.
func TestImportBitIteratorError(t *testing.T) {
	client := getClient()
	frame, frameErr := index.Frame("not-important", nil)
	if frameErr != nil {
		t.Fatal(frameErr)
	}
	brokenIterator := NewCSVBitIterator(&BrokenReader{})
	if importErr := client.ImportFrame(frame, brokenIterator, 100); importErr == nil {
		t.Fatalf("import frame should fail with broken reader")
	}
}

func TestImportFailsForFetchFrameError(t *testing.T) {
client := getClient()
frame, err := index.Frame("not-important", nil)
if err != nil {
t.Fatal(err)
}
iterator := NewCSVBitIterator(strings.NewReader("1,10"))
server := getMockServer(500, []byte{}, 0)
defer server.Close()
err = client.ImportFrame(frame, iterator, 100)
if err == nil {
t.Fatalf("import frame should fail when fetch frames fails")
}
}

func TestResponseWithInvalidType(t *testing.T) {
qr := &internal.QueryResponse{
Err: "",
Expand Down Expand Up @@ -556,3 +670,9 @@ func getMockServer(statusCode int, response []byte, contentLength int) *httptest
})
return httptest.NewServer(handler)
}

// BrokenReader is a reader for tests whose Read call always fails.
type BrokenReader struct{}

// Read never fills p; it reports zero bytes read together with a
// "broken reader" error.
func (BrokenReader) Read(p []byte) (int, error) {
	failure := errors.New("broken reader")
	return 0, failure
}
3 changes: 3 additions & 0 deletions fixture/sample1.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1,10,683793200
5,20,683793300
3,41,683793385
Loading

0 comments on commit 87573a9

Please sign in to comment.