This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

support for string record IDs, parity with old ingest
I've confirmed that this and the "old" ingest produce the same results
in Pilosa (at least by TopNing each field)
jaffee committed Aug 28, 2019
1 parent a3d15a2 commit 62bc87f
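
The parity check mentioned in the commit message is not part of the diff below. As a rough illustration only, a TopN-per-field comparison with the go-pilosa client might look like the sketch that follows; the index names, the "color" field, and the use of DefaultClient/CountItems are assumptions, not code from this commit.

package main

import (
	"fmt"
	"log"

	"github.com/pilosa/go-pilosa"
)

func main() {
	client := pilosa.DefaultClient() // assumes Pilosa on localhost:10101
	schema, err := client.Schema()
	if err != nil {
		log.Fatal(err)
	}
	// Hypothetical index names: one written by the new ingest, one by the old.
	for _, indexName := range []string{"picsvtest", "picsvtest-old"} {
		index := schema.Index(indexName)
		field := index.Field("color") // hypothetical field from the CSV header
		resp, err := client.Query(field.TopN(10))
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(indexName)
		for _, item := range resp.Result().CountItems() {
			fmt.Printf("  %s (id %d): %d\n", item.Key, item.ID, item.Count)
		}
	}
}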
Showing 3 changed files with 316 additions and 118 deletions.
69 changes: 49 additions & 20 deletions cmd/picsv/main.go
@@ -18,6 +18,7 @@ type Main struct {
 	File      string
 	Index     string
 	BatchSize int
+	IDField   string
 }

func NewMain() *Main {
@@ -26,6 +27,7 @@ func NewMain() *Main {
 		File:      "data.csv",
 		Index:     "picsvtest",
 		BatchSize: 1000,
+		IDField:   "id",
 	}
 }

@@ -49,29 +51,34 @@ func (m *Main) Run() error {
 	if err != nil {
 		return errors.Wrap(err, "getting schema")
 	}
-	index := schema.Index(m.Index)
+	opts := []pilosa.IndexOption{}
+	if m.IDField != "" {
+		opts = append(opts, pilosa.OptIndexKeys(true))
+	}
+	index := schema.Index(m.Index, opts...)
 
-	header, err := reader.Read()
+	headerRow, err := reader.Read()
 	if err != nil {
 		return errors.Wrap(err, "reading CSV header")
 	}
-	log.Println("Got Header: ", header)
-	fields := processHeader(index, header)
+	log.Println("Got Header: ", headerRow)
+	fields, header, getIDFn := processHeader(index, m.IDField, headerRow)
 
 	// this has a non-obvious dependence on the previous line... the fields are set up in the index which comes from the schema
 	client.SyncSchema(schema)
-	batch := pilosa.NewBatch(client, m.BatchSize, fields)
+	batch := pilosa.NewBatch(client, m.BatchSize, index, fields)
 	record := pilosa.Row{
 		Values: make([]interface{}, len(header)),
 	}
-	id := uint64(0)
-	row, err := reader.Read()
-	for ; err == nil; row, err = reader.Read() {
-		record.ID = id
-		for i, _ := range record.Values {
-			if i < len(row) {
-				record.Values[i] = row[i]
+
+	numRecords := uint64(0)
+	for row, err := reader.Read(); err == nil; row, err = reader.Read() {
+		record.ID = getIDFn(row, numRecords)
+		for _, meta := range header {
+			if meta.srcIndex < len(row) {
+				record.Values[meta.recordIndex] = row[meta.srcIndex]
 			} else {
-				record.Values[i] = nil
+				record.Values[meta.recordIndex] = nil
 				log.Printf("row is shorter than header: %v", row)
 			}
 		}
@@ -85,9 +92,9 @@ func (m *Main) Run() error {
 			return errors.Wrap(err, "adding to batch")
 		}
 
-		id++
+		numRecords++
 	}
-	log.Printf("processed %d ids\n", id)
 
 	if err != io.EOF && err != nil {
 		return errors.Wrap(err, "reading csv")
 	}
@@ -96,15 +103,37 @@ func (m *Main) Run() error {
 		return errors.Wrap(err, "final import")
 	}
 
+	log.Printf("processed %d ids\n", numRecords)
+
 	return nil
 }
 
-func processHeader(index *pilosa.Index, header []string) []*pilosa.Field {
-	ret := make([]*pilosa.Field, 0, len(header))
-	for _, fieldName := range header {
-		ret = append(ret, index.Field(fieldName, pilosa.OptFieldKeys(true)))
+type valueMeta struct {
+	srcIndex    int
+	recordIndex int
+}
+
+type idGetter func(row []string, numRecords uint64) interface{}
+
+func processHeader(index *pilosa.Index, idField string, headerRow []string) ([]*pilosa.Field, map[string]valueMeta, idGetter) {
+	fields := make([]*pilosa.Field, 0, len(headerRow))
+	header := make(map[string]valueMeta)
+	getIDFn := func(row []string, numRecords uint64) interface{} {
+		return numRecords
+	}
+	for i, fieldName := range headerRow {
+		if fieldName == idField {
+			idIndex := i
+			getIDFn = func(row []string, numRecords uint64) interface{} {
+				return row[idIndex]
+			}
+			continue
+		}
+		header[fieldName] = valueMeta{srcIndex: i, recordIndex: len(fields)}
+		fields = append(fields, index.Field(fieldName, pilosa.OptFieldKeys(true), pilosa.OptFieldTypeSet(pilosa.CacheTypeRanked, 100000)))
 	}
-	return ret
+
+	return fields, header, getIDFn
 }
 
 func main() {
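
To make the new ID handling concrete: a hypothetical test in the same package (assuming a _test.go file in cmd/picsv with "testing" and go-pilosa imported; the index name and row values are invented for illustration) could exercise the idGetter returned by processHeader, which uses the configured column's string value when it is present and the sequential record counter otherwise.

func TestProcessHeaderIDFn(t *testing.T) {
	schema := pilosa.NewSchema()
	index := schema.Index("picsvtest", pilosa.OptIndexKeys(true))

	// Header contains the ID field: records are keyed by that column's string value.
	_, _, getIDFn := processHeader(index, "id", []string{"id", "color", "size"})
	if got := getIDFn([]string{"user-42", "green", "7"}, 0); got != "user-42" {
		t.Fatalf("expected string key, got %v", got)
	}

	// No ID field in the header: fall back to the sequential record counter.
	_, _, fallbackFn := processHeader(index, "id", []string{"color", "size"})
	if got := fallbackFn([]string{"green", "7"}, 3); got != uint64(3) {
		t.Fatalf("expected sequential id, got %v", got)
	}
}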
