Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Permalink
add many config options to picsv, test, benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
jaffee committed Aug 28, 2019
1 parent edf6cb8 commit 3146a31
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 20 deletions.
8 changes: 6 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,13 +322,17 @@ func (c *Client) EnsureField(field *Field) error {

// DeleteIndex deletes an index on the server.
func (c *Client) DeleteIndex(index *Index) error {
return c.DeleteIndexByName(index.Name())
}

// DeleteIndexByName deletes the named index on the server.
func (c *Client) DeleteIndexByName(index string) error {
span := c.tracer.StartSpan("Client.DeleteIndex")
defer span.Finish()

path := fmt.Sprintf("/index/%s", index.name)
path := fmt.Sprintf("/index/%s", index)
_, _, err := c.httpRequest("DELETE", path, nil, nil, false)
return err

}

// DeleteField deletes a field on the server.
Expand Down
1 change: 1 addition & 0 deletions cmd/picsv/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
marketing-*.csv
3 changes: 3 additions & 0 deletions cmd/picsv/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

bench:
GO111MODULE=on go test -bench=. -run=ZZZ -benchtime=3x
177 changes: 159 additions & 18 deletions cmd/picsv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package main

import (
"encoding/csv"
"fmt"
"encoding/json"
"io"
"log"
"os"
"strconv"
"time"

"github.com/jaffee/commandeer"
Expand All @@ -14,11 +15,13 @@ import (
)

type Main struct {
Pilosa []string
File string
Index string
BatchSize int
IDField string
Pilosa []string
File string
Index string
BatchSize int
ConfigFile string

Config *Config `flag:"-"`
}

func NewMain() *Main {
Expand All @@ -27,15 +30,28 @@ func NewMain() *Main {
File: "data.csv",
Index: "picsvtest",
BatchSize: 1000,
IDField: "id",

Config: NewConfig(),
}
}

func (m *Main) Run() error {
start := time.Now()
defer func() {
fmt.Println("Duration: ", time.Since(start))
}()

// Load Config File (if available)
if m.ConfigFile != "" {
f, err := os.Open(m.ConfigFile)
if err != nil {
return errors.Wrap(err, "opening config file")
}
dec := json.NewDecoder(f)
err = dec.Decode(m.Config)
if err != nil {
return errors.Wrap(err, "decoding config file")
}
}
log.Printf("Config: %+v\n", *m)

f, err := os.Open(m.File)
if err != nil {
return errors.Wrap(err, "opening file")
Expand All @@ -52,7 +68,7 @@ func (m *Main) Run() error {
return errors.Wrap(err, "getting schema")
}
opts := []pilosa.IndexOption{}
if m.IDField != "" {
if m.Config.IDField != "" {
opts = append(opts, pilosa.OptIndexKeys(true))
}
index := schema.Index(m.Index, opts...)
Expand All @@ -62,7 +78,7 @@ func (m *Main) Run() error {
return errors.Wrap(err, "reading CSV header")
}
log.Println("Got Header: ", headerRow)
fields, header, getIDFn := processHeader(index, m.IDField, headerRow)
fields, header, getIDFn := processHeader(m.Config, index, headerRow)

// this has a non-obvious dependence on the previous line... the fields are set up in the index which comes from the schema
client.SyncSchema(schema)
Expand All @@ -76,7 +92,7 @@ func (m *Main) Run() error {
record.ID = getIDFn(row, numRecords)
for _, meta := range header {
if meta.srcIndex < len(row) {
record.Values[meta.recordIndex] = row[meta.srcIndex]
record.Values[meta.recordIndex] = meta.valGetter(row[meta.srcIndex])
} else {
record.Values[meta.recordIndex] = nil
log.Printf("row is shorter than header: %v", row)
Expand Down Expand Up @@ -104,33 +120,111 @@ func (m *Main) Run() error {
}

log.Printf("processed %d ids\n", numRecords)

log.Println("Duration: ", time.Since(start))
return nil
}

type valueMeta struct {
srcIndex int
recordIndex int
valGetter func(val string) interface{}
}

type idGetter func(row []string, numRecords uint64) interface{}

func processHeader(index *pilosa.Index, idField string, headerRow []string) ([]*pilosa.Field, map[string]valueMeta, idGetter) {
func processHeader(config *Config, index *pilosa.Index, headerRow []string) ([]*pilosa.Field, map[string]valueMeta, idGetter) {
fields := make([]*pilosa.Field, 0, len(headerRow))
header := make(map[string]valueMeta)
getIDFn := func(row []string, numRecords uint64) interface{} {
return numRecords
}
for i, fieldName := range headerRow {
if fieldName == idField {
if fieldName == config.IDField {
idIndex := i
getIDFn = func(row []string, numRecords uint64) interface{} {
return row[idIndex]
}
continue
}
header[fieldName] = valueMeta{srcIndex: i, recordIndex: len(fields)}
fields = append(fields, index.Field(fieldName, pilosa.OptFieldKeys(true), pilosa.OptFieldTypeSet(pilosa.CacheTypeRanked, 100000)))

var valGetter func(val string) interface{}
srcField, ok := config.SourceFields[fieldName]
if !ok {
srcField = SourceField{
TargetField: fieldName,
Type: "string",
}
config.SourceFields[fieldName] = srcField
}
pilosaField, ok := config.PilosaFields[srcField.TargetField]
if !ok {
pilosaField = Field{
Type: "string",
CacheType: pilosa.CacheTypeRanked,
CacheSize: 100000,
Keys: true,
}
config.PilosaFields[fieldName] = pilosaField
}
switch srcField.Type {
case "ignore":
continue
case "int":
valGetter = func(val string) interface{} {
intVal, err := strconv.Atoi(val)
if err != nil {
return nil
}
return intVal
}
opts := []pilosa.FieldOption{pilosa.OptFieldTypeInt()}
if pilosaField.Max != 0 || pilosaField.Min != 0 {
opts[0] = pilosa.OptFieldTypeInt(pilosaField.Min, pilosaField.Max)
}
fields = append(fields, index.Field(fieldName, opts...))
case "float":
if srcField.Multiplier != 0 {
valGetter = func(val string) interface{} {
floatVal, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil
}
return int64(floatVal * srcField.Multiplier)
}
} else {
valGetter = func(val string) interface{} {
floatVal, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil
}
return int64(floatVal)
}
}
opts := []pilosa.FieldOption{pilosa.OptFieldTypeInt()}
if pilosaField.Max != 0 || pilosaField.Min != 0 {
opts[0] = pilosa.OptFieldTypeInt(pilosaField.Min, pilosaField.Max)
}
fields = append(fields, index.Field(fieldName, opts...))
case "string":
valGetter = func(val string) interface{} {
return val
}
fields = append(fields, index.Field(fieldName, pilosa.OptFieldKeys(pilosaField.Keys), pilosa.OptFieldTypeSet(pilosaField.CacheType, pilosaField.CacheSize)))
case "uint64":
valGetter = func(val string) interface{} {
uintVal, err := strconv.ParseUint(val, 0, 64)
if err != nil {
return nil
}
return uintVal
}
fields = append(fields, index.Field(fieldName, pilosa.OptFieldKeys(pilosaField.Keys), pilosa.OptFieldTypeSet(pilosaField.CacheType, pilosaField.CacheSize)))
}
header[fieldName] = valueMeta{
valGetter: valGetter,
srcIndex: i,
recordIndex: len(fields) - 1,
}
}

return fields, header, getIDFn
Expand All @@ -141,3 +235,50 @@ func main() {
log.Fatal(err)
}
}

func NewConfig() *Config {
return &Config{
PilosaFields: make(map[string]Field),
SourceFields: make(map[string]SourceField),
}
}

type Config struct {
PilosaFields map[string]Field `json:"pilosa-fields"`
SourceFields map[string]SourceField `json:"source-fields"`

// IDField denotes which field in the source should be used for Pilosa record IDs.
IDField string `json:"id-field"`

// IDType denotes whether the ID field should be parsed as a string or uint64.
IDType string `json:"id-type"`
}

type Field struct {
Type string `json:"type"`
Min int64 `json:"min"`
Max int64 `json:"max"`
Keys bool `json:"keys"`
CacheType pilosa.CacheType `json:"cache-type"`
CacheSize int `json:"cache-size"`
// TODO time stuff
}

type SourceField struct {
// TargetField is the Pilosa field that this source field should map to.
TargetField string `json:"target-field"`

// Type denotes how the source field should be parsed. (string,
// int, rowID, float, or ignore). rowID means that the field will
// be parsed as a uint64 and then used directly as a rowID for a
// set field. If "string", key translation must be on for that
// Pilosa field, and it must be a set field. If int or float, it
// must be a Pilosa int field.
Type string `json:"type"`

// Multiplier is for float fields. Because Pilosa does not support
// floats natively, it is sometimes useful to store a float in
// Pilosa as an integer, but first multiplied by some constant
// factor to preserve some amount of precision. If 0 this field won't be used.
Multiplier float64 `json:"multiplier"`
}
Loading

0 comments on commit 3146a31

Please sign in to comment.