diff --git a/client.go b/client.go index 94cb934..366fd6f 100644 --- a/client.go +++ b/client.go @@ -322,13 +322,17 @@ func (c *Client) EnsureField(field *Field) error { // DeleteIndex deletes an index on the server. func (c *Client) DeleteIndex(index *Index) error { + return c.DeleteIndexByName(index.Name()) +} + +// DeleteIndexByName deletes the named index on the server. +func (c *Client) DeleteIndexByName(index string) error { span := c.tracer.StartSpan("Client.DeleteIndex") defer span.Finish() - path := fmt.Sprintf("/index/%s", index.name) + path := fmt.Sprintf("/index/%s", index) _, _, err := c.httpRequest("DELETE", path, nil, nil, false) return err - } // DeleteField deletes a field on the server. diff --git a/cmd/picsv/.gitignore b/cmd/picsv/.gitignore new file mode 100644 index 0000000..d179ee1 --- /dev/null +++ b/cmd/picsv/.gitignore @@ -0,0 +1 @@ +marketing-*.csv \ No newline at end of file diff --git a/cmd/picsv/Makefile b/cmd/picsv/Makefile new file mode 100644 index 0000000..d8276e1 --- /dev/null +++ b/cmd/picsv/Makefile @@ -0,0 +1,3 @@ + +bench: + GO111MODULE=on go test -bench=. -run=ZZZ -benchtime=3x diff --git a/cmd/picsv/main.go b/cmd/picsv/main.go index 230f83a..e602c15 100644 --- a/cmd/picsv/main.go +++ b/cmd/picsv/main.go @@ -2,10 +2,11 @@ package main import ( "encoding/csv" - "fmt" + "encoding/json" "io" "log" "os" + "strconv" "time" "github.com/jaffee/commandeer" @@ -14,11 +15,13 @@ import ( ) type Main struct { - Pilosa []string - File string - Index string - BatchSize int - IDField string + Pilosa []string + File string + Index string + BatchSize int + ConfigFile string + + Config *Config `flag:"-"` } func NewMain() *Main { @@ -27,15 +30,28 @@ func NewMain() *Main { File: "data.csv", Index: "picsvtest", BatchSize: 1000, - IDField: "id", + + Config: NewConfig(), } } func (m *Main) Run() error { start := time.Now() - defer func() { - fmt.Println("Duration: ", time.Since(start)) - }() + + // Load Config File (if available) + if m.ConfigFile != "" { + f, err := os.Open(m.ConfigFile) + if err != nil { + return errors.Wrap(err, "opening config file") + } + dec := json.NewDecoder(f) + err = dec.Decode(m.Config) + if err != nil { + return errors.Wrap(err, "decoding config file") + } + } + log.Printf("Config: %+v\n", *m) + f, err := os.Open(m.File) if err != nil { return errors.Wrap(err, "opening file") @@ -52,7 +68,7 @@ func (m *Main) Run() error { return errors.Wrap(err, "getting schema") } opts := []pilosa.IndexOption{} - if m.IDField != "" { + if m.Config.IDField != "" { opts = append(opts, pilosa.OptIndexKeys(true)) } index := schema.Index(m.Index, opts...) @@ -62,7 +78,7 @@ func (m *Main) Run() error { return errors.Wrap(err, "reading CSV header") } log.Println("Got Header: ", headerRow) - fields, header, getIDFn := processHeader(index, m.IDField, headerRow) + fields, header, getIDFn := processHeader(m.Config, index, headerRow) // this has a non-obvious dependence on the previous line... the fields are set up in the index which comes from the schema client.SyncSchema(schema) @@ -76,7 +92,7 @@ func (m *Main) Run() error { record.ID = getIDFn(row, numRecords) for _, meta := range header { if meta.srcIndex < len(row) { - record.Values[meta.recordIndex] = row[meta.srcIndex] + record.Values[meta.recordIndex] = meta.valGetter(row[meta.srcIndex]) } else { record.Values[meta.recordIndex] = nil log.Printf("row is shorter than header: %v", row) @@ -104,33 +120,111 @@ func (m *Main) Run() error { } log.Printf("processed %d ids\n", numRecords) - + log.Println("Duration: ", time.Since(start)) return nil } type valueMeta struct { srcIndex int recordIndex int + valGetter func(val string) interface{} } type idGetter func(row []string, numRecords uint64) interface{} -func processHeader(index *pilosa.Index, idField string, headerRow []string) ([]*pilosa.Field, map[string]valueMeta, idGetter) { +func processHeader(config *Config, index *pilosa.Index, headerRow []string) ([]*pilosa.Field, map[string]valueMeta, idGetter) { fields := make([]*pilosa.Field, 0, len(headerRow)) header := make(map[string]valueMeta) getIDFn := func(row []string, numRecords uint64) interface{} { return numRecords } for i, fieldName := range headerRow { - if fieldName == idField { + if fieldName == config.IDField { idIndex := i getIDFn = func(row []string, numRecords uint64) interface{} { return row[idIndex] } continue } - header[fieldName] = valueMeta{srcIndex: i, recordIndex: len(fields)} - fields = append(fields, index.Field(fieldName, pilosa.OptFieldKeys(true), pilosa.OptFieldTypeSet(pilosa.CacheTypeRanked, 100000))) + + var valGetter func(val string) interface{} + srcField, ok := config.SourceFields[fieldName] + if !ok { + srcField = SourceField{ + TargetField: fieldName, + Type: "string", + } + config.SourceFields[fieldName] = srcField + } + pilosaField, ok := config.PilosaFields[srcField.TargetField] + if !ok { + pilosaField = Field{ + Type: "string", + CacheType: pilosa.CacheTypeRanked, + CacheSize: 100000, + Keys: true, + } + config.PilosaFields[fieldName] = pilosaField + } + switch srcField.Type { + case "ignore": + continue + case "int": + valGetter = func(val string) interface{} { + intVal, err := strconv.Atoi(val) + if err != nil { + return nil + } + return intVal + } + opts := []pilosa.FieldOption{pilosa.OptFieldTypeInt()} + if pilosaField.Max != 0 || pilosaField.Min != 0 { + opts[0] = pilosa.OptFieldTypeInt(pilosaField.Min, pilosaField.Max) + } + fields = append(fields, index.Field(fieldName, opts...)) + case "float": + if srcField.Multiplier != 0 { + valGetter = func(val string) interface{} { + floatVal, err := strconv.ParseFloat(val, 64) + if err != nil { + return nil + } + return int64(floatVal * srcField.Multiplier) + } + } else { + valGetter = func(val string) interface{} { + floatVal, err := strconv.ParseFloat(val, 64) + if err != nil { + return nil + } + return int64(floatVal) + } + } + opts := []pilosa.FieldOption{pilosa.OptFieldTypeInt()} + if pilosaField.Max != 0 || pilosaField.Min != 0 { + opts[0] = pilosa.OptFieldTypeInt(pilosaField.Min, pilosaField.Max) + } + fields = append(fields, index.Field(fieldName, opts...)) + case "string": + valGetter = func(val string) interface{} { + return val + } + fields = append(fields, index.Field(fieldName, pilosa.OptFieldKeys(pilosaField.Keys), pilosa.OptFieldTypeSet(pilosaField.CacheType, pilosaField.CacheSize))) + case "uint64": + valGetter = func(val string) interface{} { + uintVal, err := strconv.ParseUint(val, 0, 64) + if err != nil { + return nil + } + return uintVal + } + fields = append(fields, index.Field(fieldName, pilosa.OptFieldKeys(pilosaField.Keys), pilosa.OptFieldTypeSet(pilosaField.CacheType, pilosaField.CacheSize))) + } + header[fieldName] = valueMeta{ + valGetter: valGetter, + srcIndex: i, + recordIndex: len(fields) - 1, + } } return fields, header, getIDFn @@ -141,3 +235,50 @@ func main() { log.Fatal(err) } } + +func NewConfig() *Config { + return &Config{ + PilosaFields: make(map[string]Field), + SourceFields: make(map[string]SourceField), + } +} + +type Config struct { + PilosaFields map[string]Field `json:"pilosa-fields"` + SourceFields map[string]SourceField `json:"source-fields"` + + // IDField denotes which field in the source should be used for Pilosa record IDs. + IDField string `json:"id-field"` + + // IDType denotes whether the ID field should be parsed as a string or uint64. + IDType string `json:"id-type"` +} + +type Field struct { + Type string `json:"type"` + Min int64 `json:"min"` + Max int64 `json:"max"` + Keys bool `json:"keys"` + CacheType pilosa.CacheType `json:"cache-type"` + CacheSize int `json:"cache-size"` + // TODO time stuff +} + +type SourceField struct { + // TargetField is the Pilosa field that this source field should map to. + TargetField string `json:"target-field"` + + // Type denotes how the source field should be parsed. (string, + // int, rowID, float, or ignore). rowID means that the field will + // be parsed as a uint64 and then used directly as a rowID for a + // set field. If "string", key translation must be on for that + // Pilosa field, and it must be a set field. If int or float, it + // must be a Pilosa int field. + Type string `json:"type"` + + // Multiplier is for float fields. Because Pilosa does not support + // floats natively, it is sometimes useful to store a float in + // Pilosa as an integer, but first multiplied by some constant + // factor to preserve some amount of precision. If 0 this field won't be used. + Multiplier float64 `json:"multiplier"` +} diff --git a/cmd/picsv/main_test.go b/cmd/picsv/main_test.go new file mode 100644 index 0000000..ccad440 --- /dev/null +++ b/cmd/picsv/main_test.go @@ -0,0 +1,139 @@ +package main_test + +import ( + "fmt" + "io" + "net/http" + "os" + "testing" + + "github.com/pilosa/go-pilosa" + picsv "github.com/pilosa/go-pilosa/cmd/picsv" +) + +func BenchmarkImportCSV(b *testing.B) { + m := picsv.NewMain() + m.BatchSize = 1 << 20 + m.Index = "picsvbench" + m.File = "marketing-200k.csv" + getRawData(b, m.File) + client, err := pilosa.NewClient(m.Pilosa) + if err != nil { + b.Fatalf("getting client: %v", err) + } + b.ResetTimer() + + for i := 0; i < b.N; i++ { + err := m.Run() + if err != nil { + b.Fatalf("running import: %v", err) + } + b.StopTimer() + err = client.DeleteIndexByName(m.Index) + if err != nil { + b.Fatalf("deleting index: %v", err) + } + b.StartTimer() + } + +} + +func getRawData(t testing.TB, file string) { + if _, err := os.Open(file); err == nil { + return + } else if !os.IsNotExist(err) { + t.Fatalf("opening %s: %v", file, err) + } + // if the file doesn't exist + f, err := os.Create(file) + if err != nil { + t.Fatalf("creating file: %v", err) + } + resp, err := http.Get(fmt.Sprintf("https://molecula-sample-data.s3.amazonaws.com/%s", file)) + if err != nil { + t.Fatalf("getting data: %v", err) + } + if resp.StatusCode > 299 { + t.Fatalf("getting data failed: %v", resp.Status) + } + _, err = io.Copy(f, resp.Body) + if err != nil { + t.Fatalf("copying data into file: %v", err) + } + + err = f.Close() + if err != nil { + t.Fatalf("closing file: %v", err) + } + +} + +func TestImportCSV(t *testing.T) { + m := picsv.NewMain() + m.BatchSize = 1 << 20 + m.Index = "testpicsv" + m.File = "marketing-200k.csv" + m.Config.SourceFields["age"] = picsv.SourceField{TargetField: "age", Type: "float"} + m.Config.PilosaFields["age"] = picsv.Field{Type: "int"} + getRawData(t, m.File) + client, err := pilosa.NewClient(m.Pilosa) + if err != nil { + t.Fatalf("getting client: %v", err) + } + + defer func() { + err = client.DeleteIndexByName(m.Index) + if err != nil { + t.Fatalf("deleting index: %v", err) + } + }() + err = m.Run() + if err != nil { + t.Fatalf("running ingest: %v", err) + } + + schema, err := client.Schema() + if err != nil { + t.Fatalf("getting schema: %v", err) + } + + index := schema.Index(m.Index) + marital := index.Field("marital") + converted := index.Field("converted") + age := index.Field("age") + + tests := []struct { + query *pilosa.PQLRowQuery + bash string + exp int64 + }{ + { + query: marital.Row("married"), + bash: `awk -F, '/married/ {print $1,$4}' marketing-200k.csv | sort | uniq | wc`, + exp: 125514, + }, + { + query: converted.Row("no"), + bash: `awk -F, '{print $1,$17}' marketing-200k.csv | grep "no" |sort | uniq | wc`, + exp: 199999, + }, + { + query: age.Equals(55), + bash: `awk -F, '{print $1,$2}' marketing-200k.csv | grep " 55.0" |sort | uniq | wc`, + exp: 3282, + }, + } + + for i, test := range tests { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + q := index.Count(test.query) + resp, err := client.Query(q) + if err != nil { + t.Fatalf("running query '%s': %v", q.Serialize(), err) + } + if resp.Result().Count() != test.exp { + t.Fatalf("Got unexpected result %d instead of %d for\nquery: %s\nbash: %s", resp.Result().Count(), test.exp, q.Serialize(), test.bash) + } + }) + } +}