Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Permalink
support for empty values in importbatch
Browse files Browse the repository at this point in the history
  • Loading branch information
jaffee committed Aug 28, 2019
1 parent 3146a31 commit 802f882
Show file tree
Hide file tree
Showing 9 changed files with 420 additions and 46 deletions.
13 changes: 8 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,10 +671,13 @@ func (c *Client) importValues(field *Field,
return errors.Wrap(err, "importing values to nodes")
}

// ImportValues takes the given integer values and column ids
// (which must all be in the given shard) and imports them into the
// given index,field,shard on all nodes which should hold that shard.
func (c *Client) ImportValues(index, field string, shard uint64, vals []int64, ids []uint64) error {
// ImportValues takes the given integer values and column ids (which
// must all be in the given shard) and imports them into the given
// index,field,shard on all nodes which should hold that shard. It
// assumes that the ids have been translated from keys if necessary
// and so tells Pilosa to ignore checking if the index uses column
// keys.
func (c *Client) ImportValues(index, field string, shard uint64, vals []int64, ids []uint64, clear bool) error {
msg := &pbuf.ImportValueRequest{
Index: index,
Field: field,
Expand All @@ -686,7 +689,7 @@ func (c *Client) ImportValues(index, field string, shard uint64, vals []int64, i
if err != nil {
return errors.Wrap(err, "marshaling to protobuf")
}
path := fmt.Sprintf("/index/%s/field/%s/import", index, field)
path := fmt.Sprintf("/index/%s/field/%s/import?clear=%s&ignoreKeyCheck=true", index, field, strconv.FormatBool(clear))
c.logImport(index, path, shard, false, data)

uris, err := c.GetURIsForShard(index, shard)
Expand Down
2 changes: 1 addition & 1 deletion client_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ func TestImportValues(t *testing.T) {
t.Fatalf("syncing schema: %v", err)
}

err = client.ImportValues("go-testindex", "intfield", 0, []int64{1, 2, 3}, []uint64{1, 2, 3})
err = client.ImportValues("go-testindex", "intfield", 0, []int64{1, 2, 3}, []uint64{1, 2, 3}, false)
if err != nil {
t.Fatalf("importing values: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/picsv/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
marketing-*.csv
marketing-*.csv
config.json
75 changes: 55 additions & 20 deletions cmd/picsv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"encoding/csv"
"encoding/json"
"fmt"
"io"
"log"
"os"
Expand Down Expand Up @@ -50,7 +51,8 @@ func (m *Main) Run() error {
return errors.Wrap(err, "decoding config file")
}
}
log.Printf("Config: %+v\n", *m)
log.Printf("Flags: %+v\n", *m)
log.Printf("Config: %+v\n", *m.Config)

f, err := os.Open(m.File)
if err != nil {
Expand Down Expand Up @@ -78,7 +80,10 @@ func (m *Main) Run() error {
return errors.Wrap(err, "reading CSV header")
}
log.Println("Got Header: ", headerRow)
fields, header, getIDFn := processHeader(m.Config, index, headerRow)
fields, header, getIDFn, err := processHeader(m.Config, index, headerRow)
if err != nil {
return errors.Wrap(err, "processing header")
}

// this has a non-obvious dependence on the previous line... the fields are set up in the index which comes from the schema
client.SyncSchema(schema)
Expand Down Expand Up @@ -132,7 +137,7 @@ type valueMeta struct {

type idGetter func(row []string, numRecords uint64) interface{}

func processHeader(config *Config, index *pilosa.Index, headerRow []string) ([]*pilosa.Field, map[string]valueMeta, idGetter) {
func processHeader(config *Config, index *pilosa.Index, headerRow []string) ([]*pilosa.Field, map[string]valueMeta, idGetter, error) {
fields := make([]*pilosa.Field, 0, len(headerRow))
header := make(map[string]valueMeta)
getIDFn := func(row []string, numRecords uint64) interface{} {
Expand All @@ -141,8 +146,21 @@ func processHeader(config *Config, index *pilosa.Index, headerRow []string) ([]*
for i, fieldName := range headerRow {
if fieldName == config.IDField {
idIndex := i
getIDFn = func(row []string, numRecords uint64) interface{} {
return row[idIndex]
switch config.IDType {
case "uint64":
getIDFn = func(row []string, numRecords uint64) interface{} {
uintVal, err := strconv.ParseUint(row[idIndex], 0, 64)
if err != nil {
return nil
}
return uintVal
}
case "string":
getIDFn = func(row []string, numRecords uint64) interface{} {
return row[idIndex]
}
default:
return nil, nil, nil, errors.Errorf("unknown IDType: %s", config.IDType)
}
continue
}
Expand All @@ -159,29 +177,27 @@ func processHeader(config *Config, index *pilosa.Index, headerRow []string) ([]*
pilosaField, ok := config.PilosaFields[srcField.TargetField]
if !ok {
pilosaField = Field{
Type: "string",
Type: "set",
CacheType: pilosa.CacheTypeRanked,
CacheSize: 100000,
Keys: true,
}
config.PilosaFields[fieldName] = pilosaField
}

fieldName = srcField.TargetField
switch srcField.Type {
case "ignore":
continue
case "int":
valGetter = func(val string) interface{} {
intVal, err := strconv.Atoi(val)
intVal, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil
}
return intVal
}
opts := []pilosa.FieldOption{pilosa.OptFieldTypeInt()}
if pilosaField.Max != 0 || pilosaField.Min != 0 {
opts[0] = pilosa.OptFieldTypeInt(pilosaField.Min, pilosaField.Max)
}
fields = append(fields, index.Field(fieldName, opts...))
fields = append(fields, index.Field(fieldName, pilosaField.MakeOptions()...))
case "float":
if srcField.Multiplier != 0 {
valGetter = func(val string) interface{} {
Expand All @@ -200,16 +216,15 @@ func processHeader(config *Config, index *pilosa.Index, headerRow []string) ([]*
return int64(floatVal)
}
}
opts := []pilosa.FieldOption{pilosa.OptFieldTypeInt()}
if pilosaField.Max != 0 || pilosaField.Min != 0 {
opts[0] = pilosa.OptFieldTypeInt(pilosaField.Min, pilosaField.Max)
}
fields = append(fields, index.Field(fieldName, opts...))
fields = append(fields, index.Field(fieldName, pilosaField.MakeOptions()...))
case "string":
valGetter = func(val string) interface{} {
if val == "" {
return nil // ignore empty strings
}
return val
}
fields = append(fields, index.Field(fieldName, pilosa.OptFieldKeys(pilosaField.Keys), pilosa.OptFieldTypeSet(pilosaField.CacheType, pilosaField.CacheSize)))
fields = append(fields, index.Field(fieldName, pilosaField.MakeOptions()...))
case "uint64":
valGetter = func(val string) interface{} {
uintVal, err := strconv.ParseUint(val, 0, 64)
Expand All @@ -218,7 +233,7 @@ func processHeader(config *Config, index *pilosa.Index, headerRow []string) ([]*
}
return uintVal
}
fields = append(fields, index.Field(fieldName, pilosa.OptFieldKeys(pilosaField.Keys), pilosa.OptFieldTypeSet(pilosaField.CacheType, pilosaField.CacheSize)))
fields = append(fields, index.Field(fieldName, pilosaField.MakeOptions()...))
}
header[fieldName] = valueMeta{
valGetter: valGetter,
Expand All @@ -227,7 +242,7 @@ func processHeader(config *Config, index *pilosa.Index, headerRow []string) ([]*
}
}

return fields, header, getIDFn
return fields, header, getIDFn, nil
}

func main() {
Expand All @@ -240,6 +255,7 @@ func NewConfig() *Config {
return &Config{
PilosaFields: make(map[string]Field),
SourceFields: make(map[string]SourceField),
IDType: "string",
}
}

Expand All @@ -264,6 +280,22 @@ type Field struct {
// TODO time stuff
}

func (f Field) MakeOptions() (opts []pilosa.FieldOption) {
switch f.Type {
case "set":
opts = append(opts, pilosa.OptFieldKeys(f.Keys), pilosa.OptFieldTypeSet(f.CacheType, f.CacheSize))
case "int":
if f.Max != 0 || f.Min != 0 {
opts = append(opts, pilosa.OptFieldTypeInt(f.Min, f.Max))
} else {
opts = append(opts, pilosa.OptFieldTypeInt())
}
default:
panic(fmt.Sprintf("unknown pilosa field type: %s", f.Type))
}
return opts
}

type SourceField struct {
// TargetField is the Pilosa field that this source field should map to.
TargetField string `json:"target-field"`
Expand All @@ -282,3 +314,6 @@ type SourceField struct {
// factor to preserve some amount of precision. If 0 this field won't be used.
Multiplier float64 `json:"multiplier"`
}

// TODO we should validate the Config once it is constructed.
// What are valid mappings from source fields to pilosa fields?
20 changes: 20 additions & 0 deletions cmd/picsv/main_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package main

import (
"strings"
"testing"
)

func TestProcessHeader(t *testing.T) {
config := NewConfig()
headerRow := []string{"a", "b", "c"}

t.Run("invalid IDType", func(t *testing.T) {
config.IDField = "a"
config.IDType = "nope"
_, _, _, err := processHeader(config, nil, headerRow)
if err == nil || !strings.Contains(err.Error(), "unknown IDType") {
t.Fatalf("unknown IDType gave: %v", err)
}
})
}
Loading

0 comments on commit 802f882

Please sign in to comment.