diff --git a/cmd/picsv/.gitignore b/cmd/picsv/.gitignore deleted file mode 100644 index d9f1f6e..0000000 --- a/cmd/picsv/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -marketing-*.csv -config.json diff --git a/cmd/picsv/Makefile b/cmd/picsv/Makefile deleted file mode 100644 index d8276e1..0000000 --- a/cmd/picsv/Makefile +++ /dev/null @@ -1,3 +0,0 @@ - -bench: - GO111MODULE=on go test -bench=. -run=ZZZ -benchtime=3x diff --git a/cmd/picsv/main.go b/cmd/picsv/main.go deleted file mode 100644 index ba8b928..0000000 --- a/cmd/picsv/main.go +++ /dev/null @@ -1,323 +0,0 @@ -package main - -import ( - "encoding/csv" - "encoding/json" - "fmt" - "io" - "log" - "os" - "strconv" - "time" - - "github.com/jaffee/commandeer" - "github.com/pilosa/go-pilosa" - "github.com/pilosa/go-pilosa/gpexp" - "github.com/pkg/errors" -) - -type Main struct { - Pilosa []string - File string - Index string - BatchSize int - ConfigFile string - - Config *Config `flag:"-"` -} - -func NewMain() *Main { - return &Main{ - Pilosa: []string{"localhost:10101"}, - File: "data.csv", - Index: "picsvtest", - BatchSize: 1000, - - Config: NewConfig(), - } -} - -func (m *Main) Run() error { - start := time.Now() - - // Load Config File (if available) - if m.ConfigFile != "" { - f, err := os.Open(m.ConfigFile) - if err != nil { - return errors.Wrap(err, "opening config file") - } - dec := json.NewDecoder(f) - err = dec.Decode(m.Config) - if err != nil { - return errors.Wrap(err, "decoding config file") - } - } - log.Printf("Flags: %+v\n", *m) - log.Printf("Config: %+v\n", *m.Config) - - f, err := os.Open(m.File) - if err != nil { - return errors.Wrap(err, "opening file") - } - defer f.Close() - reader := csv.NewReader(f) - - client, err := pilosa.NewClient(m.Pilosa) - if err != nil { - return errors.Wrap(err, "getting pilosa client") - } - schema, err := client.Schema() - if err != nil { - return errors.Wrap(err, "getting schema") - } - opts := []pilosa.IndexOption{} - if m.Config.IDField != "" { - opts = append(opts, pilosa.OptIndexKeys(true)) - } - index := schema.Index(m.Index, opts...) - - headerRow, err := reader.Read() - if err != nil { - return errors.Wrap(err, "reading CSV header") - } - log.Println("Got Header: ", headerRow) - fields, header, getIDFn, err := processHeader(m.Config, index, headerRow) - if err != nil { - return errors.Wrap(err, "processing header") - } - - // this has a non-obvious dependence on the previous line... the fields are set up in the index which comes from the schema - client.SyncSchema(schema) - batch, err := gpexp.NewBatch(client, m.BatchSize, index, fields) - if err != nil { - return errors.Wrap(err, "getting new batch") - } - record := gpexp.Row{ - Values: make([]interface{}, len(header)), - } - - numRecords := uint64(0) - for row, err := reader.Read(); err == nil; row, err = reader.Read() { - record.ID = getIDFn(row, numRecords) - for _, meta := range header { - if meta.srcIndex < len(row) { - record.Values[meta.recordIndex] = meta.valGetter(row[meta.srcIndex]) - } else { - record.Values[meta.recordIndex] = nil - log.Printf("row is shorter than header: %v", row) - } - } - err := batch.Add(record) - if err == gpexp.ErrBatchNowFull { - err := batch.Import() - if err != nil { - return errors.Wrap(err, "importing") - } - } else if err != nil { - return errors.Wrap(err, "adding to batch") - } - - numRecords++ - } - - if err != io.EOF && err != nil { - return errors.Wrap(err, "reading csv") - } - err = batch.Import() - if err != nil { - return errors.Wrap(err, "final import") - } - - log.Printf("processed %d ids\n", numRecords) - log.Println("Duration: ", time.Since(start)) - return nil -} - -type valueMeta struct { - srcIndex int - recordIndex int - valGetter func(val string) interface{} -} - -type idGetter func(row []string, numRecords uint64) interface{} - -func processHeader(config *Config, index *pilosa.Index, headerRow []string) ([]*pilosa.Field, map[string]valueMeta, idGetter, error) { - fields := make([]*pilosa.Field, 0, len(headerRow)) - header := make(map[string]valueMeta) - getIDFn := func(row []string, numRecords uint64) interface{} { - return numRecords - } - for i, fieldName := range headerRow { - if fieldName == config.IDField { - idIndex := i - switch config.IDType { - case "uint64": - getIDFn = func(row []string, numRecords uint64) interface{} { - uintVal, err := strconv.ParseUint(row[idIndex], 0, 64) - if err != nil { - return nil - } - return uintVal - } - case "string": - getIDFn = func(row []string, numRecords uint64) interface{} { - return row[idIndex] - } - default: - return nil, nil, nil, errors.Errorf("unknown IDType: %s", config.IDType) - } - continue - } - - var valGetter func(val string) interface{} - srcField, ok := config.SourceFields[fieldName] - if !ok { - srcField = SourceField{ - TargetField: fieldName, - Type: "string", - } - config.SourceFields[fieldName] = srcField - } - pilosaField, ok := config.PilosaFields[srcField.TargetField] - if !ok { - pilosaField = Field{ - Type: "set", - CacheType: pilosa.CacheTypeRanked, - CacheSize: 100000, - Keys: true, - } - config.PilosaFields[fieldName] = pilosaField - } - - fieldName = srcField.TargetField - switch srcField.Type { - case "ignore": - continue - case "int": - valGetter = func(val string) interface{} { - intVal, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return nil - } - return intVal - } - fields = append(fields, index.Field(fieldName, pilosaField.MakeOptions()...)) - case "float": - if srcField.Multiplier != 0 { - valGetter = func(val string) interface{} { - floatVal, err := strconv.ParseFloat(val, 64) - if err != nil { - return nil - } - return int64(floatVal * srcField.Multiplier) - } - } else { - valGetter = func(val string) interface{} { - floatVal, err := strconv.ParseFloat(val, 64) - if err != nil { - return nil - } - return int64(floatVal) - } - } - fields = append(fields, index.Field(fieldName, pilosaField.MakeOptions()...)) - case "string": - valGetter = func(val string) interface{} { - if val == "" { - return nil // ignore empty strings - } - return val - } - fields = append(fields, index.Field(fieldName, pilosaField.MakeOptions()...)) - case "uint64": - valGetter = func(val string) interface{} { - uintVal, err := strconv.ParseUint(val, 0, 64) - if err != nil { - return nil - } - return uintVal - } - fields = append(fields, index.Field(fieldName, pilosaField.MakeOptions()...)) - } - header[fieldName] = valueMeta{ - valGetter: valGetter, - srcIndex: i, - recordIndex: len(fields) - 1, - } - } - - return fields, header, getIDFn, nil -} - -func main() { - if err := commandeer.Run(NewMain()); err != nil { - log.Fatal(err) - } -} - -func NewConfig() *Config { - return &Config{ - PilosaFields: make(map[string]Field), - SourceFields: make(map[string]SourceField), - IDType: "string", - } -} - -type Config struct { - PilosaFields map[string]Field `json:"pilosa-fields"` - SourceFields map[string]SourceField `json:"source-fields"` - - // IDField denotes which field in the source should be used for Pilosa record IDs. - IDField string `json:"id-field"` - - // IDType denotes whether the ID field should be parsed as a string or uint64. - IDType string `json:"id-type"` -} - -type Field struct { - Type string `json:"type"` - Min int64 `json:"min"` - Max int64 `json:"max"` - Keys bool `json:"keys"` - CacheType pilosa.CacheType `json:"cache-type"` - CacheSize int `json:"cache-size"` - // TODO time stuff -} - -func (f Field) MakeOptions() (opts []pilosa.FieldOption) { - switch f.Type { - case "set": - opts = append(opts, pilosa.OptFieldKeys(f.Keys), pilosa.OptFieldTypeSet(f.CacheType, f.CacheSize)) - case "int": - if f.Max != 0 || f.Min != 0 { - opts = append(opts, pilosa.OptFieldTypeInt(f.Min, f.Max)) - } else { - opts = append(opts, pilosa.OptFieldTypeInt()) - } - default: - panic(fmt.Sprintf("unknown pilosa field type: %s", f.Type)) - } - return opts -} - -type SourceField struct { - // TargetField is the Pilosa field that this source field should map to. - TargetField string `json:"target-field"` - - // Type denotes how the source field should be parsed. (string, - // int, rowID, float, or ignore). rowID means that the field will - // be parsed as a uint64 and then used directly as a rowID for a - // set field. If "string", key translation must be on for that - // Pilosa field, and it must be a set field. If int or float, it - // must be a Pilosa int field. - Type string `json:"type"` - - // Multiplier is for float fields. Because Pilosa does not support - // floats natively, it is sometimes useful to store a float in - // Pilosa as an integer, but first multiplied by some constant - // factor to preserve some amount of precision. If 0 this field won't be used. - Multiplier float64 `json:"multiplier"` -} - -// TODO we should validate the Config once it is constructed. -// What are valid mappings from source fields to pilosa fields? diff --git a/cmd/picsv/main_internal_test.go b/cmd/picsv/main_internal_test.go deleted file mode 100644 index bfae155..0000000 --- a/cmd/picsv/main_internal_test.go +++ /dev/null @@ -1,20 +0,0 @@ -package main - -import ( - "strings" - "testing" -) - -func TestProcessHeader(t *testing.T) { - config := NewConfig() - headerRow := []string{"a", "b", "c"} - - t.Run("invalid IDType", func(t *testing.T) { - config.IDField = "a" - config.IDType = "nope" - _, _, _, err := processHeader(config, nil, headerRow) - if err == nil || !strings.Contains(err.Error(), "unknown IDType") { - t.Fatalf("unknown IDType gave: %v", err) - } - }) -} diff --git a/cmd/picsv/main_test.go b/cmd/picsv/main_test.go deleted file mode 100644 index aa96df3..0000000 --- a/cmd/picsv/main_test.go +++ /dev/null @@ -1,331 +0,0 @@ -package main_test - -import ( - "fmt" - "io" - "net/http" - "os" - "testing" - - "github.com/pilosa/go-pilosa" - picsv "github.com/pilosa/go-pilosa/cmd/picsv" - "github.com/pkg/errors" -) - -func BenchmarkImportCSV(b *testing.B) { - m := picsv.NewMain() - m.BatchSize = 1 << 20 - m.Index = "picsvbench" - m.File = "marketing-200k.csv" - getRawData(b, m.File) - client, err := pilosa.NewClient(m.Pilosa) - if err != nil { - b.Fatalf("getting client: %v", err) - } - b.ResetTimer() - - for i := 0; i < b.N; i++ { - err := m.Run() - if err != nil { - b.Fatalf("running import: %v", err) - } - b.StopTimer() - err = client.DeleteIndexByName(m.Index) - if err != nil { - b.Fatalf("deleting index: %v", err) - } - b.StartTimer() - } - -} - -func getRawData(t testing.TB, file string) { - if _, err := os.Open(file); err == nil { - return - } else if !os.IsNotExist(err) { - t.Fatalf("opening %s: %v", file, err) - } - // if the file doesn't exist - f, err := os.Create(file) - if err != nil { - t.Fatalf("creating file: %v", err) - } - resp, err := http.Get(fmt.Sprintf("https://molecula-sample-data.s3.amazonaws.com/%s", file)) - if err != nil { - t.Fatalf("getting data: %v", err) - } - if resp.StatusCode > 299 { - t.Fatalf("getting data failed: %v", resp.Status) - } - _, err = io.Copy(f, resp.Body) - if err != nil { - t.Fatalf("copying data into file: %v", err) - } - - err = f.Close() - if err != nil { - t.Fatalf("closing file: %v", err) - } - -} - -func TestImportCSV(t *testing.T) { - m := picsv.NewMain() - m.BatchSize = 100000 - m.Index = "testpicsv" - m.File = "marketing-200k.csv" - m.Config.SourceFields["age"] = picsv.SourceField{TargetField: "age", Type: "float"} - m.Config.PilosaFields["age"] = picsv.Field{Type: "int"} - m.Config.IDField = "id" - getRawData(t, m.File) - client, err := pilosa.NewClient(m.Pilosa) - if err != nil { - t.Fatalf("getting client: %v", err) - } - - defer func() { - err = client.DeleteIndexByName(m.Index) - if err != nil { - t.Fatalf("deleting index: %v", err) - } - }() - err = m.Run() - if err != nil { - t.Fatalf("running ingest: %v", err) - } - - schema, err := client.Schema() - if err != nil { - t.Fatalf("getting schema: %v", err) - } - - index := schema.Index(m.Index) - marital := index.Field("marital") - converted := index.Field("converted") - age := index.Field("age") - - tests := []struct { - query *pilosa.PQLRowQuery - bash string - exp int64 - }{ - { - query: marital.Row("married"), - bash: `awk -F, '/married/ {print $1,$4}' marketing-200k.csv | sort | uniq | wc`, - exp: 125514, - }, - { - query: converted.Row("no"), - bash: `awk -F, '{print $1,$17}' marketing-200k.csv | grep "no" |sort | uniq | wc`, - exp: 199999, - }, - { - query: age.Equals(55), - bash: `awk -F, '{print $1,$2}' marketing-200k.csv | grep " 55.0" |sort | uniq | wc`, - exp: 3282, - }, - } - - for i, test := range tests { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - q := index.Count(test.query) - resp, err := client.Query(q) - if err != nil { - t.Fatalf("running query '%s': %v", q.Serialize(), err) - } - if resp.Result().Count() != test.exp { - t.Fatalf("Got unexpected result %d instead of %d for\nquery: %s\nbash: %s", resp.Result().Count(), test.exp, q.Serialize(), test.bash) - } - }) - } -} - -func TestSmallImport(t *testing.T) { - m := picsv.NewMain() - m.BatchSize = 1 << 20 - m.Index = "testsample" - m.File = "testdata/sample.csv" - m.ConfigFile = "config.json" - client, err := pilosa.NewClient(m.Pilosa) - if err != nil { - t.Fatalf("getting client: %v", err) - } - defer func() { - err = client.DeleteIndexByName(m.Index) - if err != nil { - t.Logf("deleting index: %v", err) - } - }() - config := `{ -"pilosa-fields": {"size": {"type": "set", "keys": true, "cache-type": "ranked", "cache-size": 100000}, - "age": {"type": "int"}, - "color": {"type": "set", "keys": true}, - "result": {"type": "int"}, - "dayofweek": {"type": "set", "keys": false, "cache-type": "ranked", "cache-size": 7} - }, -"id-field": "ID", -"id-type": "string", -"source-fields": { - "Size": {"target-field": "size", "type": "string"}, - "Color": {"target-field": "color", "type": "string"}, - "Age": {"target-field": "age", "type": "int"}, - "Result": {"target-field": "result", "type": "float", "multiplier": 100000000}, - "dayofweek": {"target-field": "dayofweek", "type": "uint64"} - } -} -` - data := ` -ID,Size,Color,Age,Result,dayofweek -ABDJ,small,green,42,1.13106317,1 -HFZP,large,red,99,30.23959735,2 -HFZP,small,green,99,NA,3 -EJSK,medium,purple,22,20.23959735,1 -EJSK,large,green,35,25.13106317, -FEFF,,,,,6 -` - writeFile(t, m.ConfigFile, config) - writeFile(t, m.File, data) - - err = m.Run() - if err != nil { - t.Fatalf("running ingest: %v", err) - } - - schema, err := client.Schema() - if err != nil { - t.Fatalf("getting schema: %v", err) - } - - index := schema.Index(m.Index) - size := index.Field("size") - color := index.Field("color") - age := index.Field("age") - result := index.Field("result") - day := index.Field("dayofweek") - - tests := []struct { - query pilosa.PQLQuery - resType string - exp interface{} - }{ - { - query: index.Count(size.Row("small")), - resType: "count", - exp: int64(2), - }, - { - query: size.Row("small"), - resType: "rowKeys", - exp: []string{"ABDJ", "HFZP"}, - }, - { - query: color.Row("green"), - resType: "rowKeys", - exp: []string{"ABDJ", "HFZP", "EJSK"}, - }, - { - query: age.Equals(99), - resType: "rowKeys", - exp: []string{"HFZP"}, - }, - { - query: age.GT(0), - resType: "rowKeys", - exp: []string{"ABDJ", "HFZP", "EJSK"}, - }, - { - query: result.GT(0), - resType: "rowKeys", - exp: []string{"ABDJ", "EJSK"}, - }, - { - query: result.GT(100000), - resType: "rowKeys", - exp: []string{"ABDJ", "EJSK"}, - }, - { - query: day.Row(1), - resType: "rowKeys", - exp: []string{"ABDJ", "EJSK"}, - }, - { - query: day.Row(6), - resType: "rowKeys", - exp: []string{"FEFF"}, - }, - { - query: index.Count(day.Row(3)), - resType: "count", - exp: int64(1), - }, - { - query: index.Count(day.Row(2)), - resType: "count", - exp: int64(1), // not mutually exclusive! - }, - { - query: size.Row(`""`), // TODO... go-pilosa should probably serialize keys into PQL using quotes. - resType: "rowKeys", - exp: []string{}, // empty strings are ignored rather than ingested - }, - { - query: color.Row(`""`), - resType: "rowKeys", - exp: []string{}, // empty strings are ignored rather than ingested - }, - } - - for i, test := range tests { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - resp, err := client.Query(test.query) - if err != nil { - t.Fatalf("running query: %v", err) - } - res := resp.Result() - switch test.resType { - case "count": - if res.Count() != test.exp.(int64) { - t.Fatalf("unexpected count %d is not %d", res.Count(), test.exp.(int64)) - } - case "rowKeys": - got := res.Row().Keys - exp := test.exp.([]string) - if err := isPermutationOf(got, exp); err != nil { - t.Fatalf("unequal rows %v expected/got:\n%v\n%v", err, exp, got) - } - } - }) - } - -} - -func writeFile(t testing.TB, name, contents string) { - cf, err := os.Create(name) - if err != nil { - t.Fatalf("creating config file: %v", err) - } - _, err = cf.Write([]byte(contents)) - if err != nil { - t.Fatalf("writing config file: %v", err) - } -} - -func isPermutationOf(one, two []string) error { - if len(one) != len(two) { - return errors.Errorf("different lengths %d and %d", len(one), len(two)) - } -outer: - for _, vOne := range one { - for j, vTwo := range two { - if vOne == vTwo { - two = append(two[:j], two[j+1:]...) - continue outer - } - } - return errors.Errorf("%s in one but not two", vOne) - } - if len(two) != 0 { - return errors.Errorf("vals in two but not one: %v", two) - } - return nil -} diff --git a/cmd/picsv/testdata/sample.csv b/cmd/picsv/testdata/sample.csv deleted file mode 100644 index 2804233..0000000 --- a/cmd/picsv/testdata/sample.csv +++ /dev/null @@ -1,8 +0,0 @@ - -ID,Size,Color,Age,Result,dayofweek -ABDJ,small,green,42,1.13106317,1 -HFZP,large,red,99,30.23959735,2 -HFZP,small,green,99,NA,3 -EJSK,medium,purple,22,20.23959735,1 -EJSK,large,green,35,25.13106317, -FEFF,,,,,6 diff --git a/gpexp/importbatch.go b/gpexp/importbatch.go index d4a1150..642facd 100644 --- a/gpexp/importbatch.go +++ b/gpexp/importbatch.go @@ -184,7 +184,7 @@ func (b *Batch) Add(rec Row) error { b.toTranslateID[rid] = ints b.ids = append(b.ids, 0) } - default: + default: // TODO support nil ID as being auto-allocated. return errors.Errorf("unsupported id type %T value %v", rid, rid) }