Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Permalink
complete per-record timestamp support
Browse files Browse the repository at this point in the history
  • Loading branch information
jaffee committed Sep 9, 2019
1 parent b811c3b commit e40d84a
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 33 deletions.
108 changes: 83 additions & 25 deletions gpexp/importbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,19 @@ type Batch struct {
// values holds the values for each record of an int field
values map[string][]int64

// times holds a time for each record. (if any of the fields are time fields)
times []QuantizedTime

// clearValues holds a slice of indices into b.ids for each
// integer field which has nil values. After translation, these
// slices will be filled out with the actual column IDs those
// indices pertain to so that they can be cleared.
//
// TODO: This is actually a problem — a nil value doesn't mean
// "clear this value", it should mean "don't touch this value", so
// there is no way currently to update a record with int values
// without knowing all the int values, clearing them, or setting
// them to something else in the process.
clearValues map[string][]uint64

// TODO, support timestamps, set fields with more than one value per record, mutex, and bool.
Expand All @@ -90,8 +99,11 @@ type Batch struct {
transCache Translator
}

// BatchOption is a functional option for Batch objects.
type BatchOption func(b *Batch) error

// OptTranslator allows one to pass in a custom Translator
// implementation for mapping keys to IDs.
func OptTranslator(t Translator) BatchOption {
return func(b *Batch) error {
b.transCache = t
Expand All @@ -112,15 +124,17 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi
rowIDs := make(map[string][]uint64)
values := make(map[string][]int64)
tt := make(map[string]map[string][]int)
hasTime := false
for _, field := range fields {
headerMap[field.Name()] = field
opts := field.Opts()
switch opts.Type() {
case pilosa.FieldTypeDefault, pilosa.FieldTypeSet:
switch typ := opts.Type(); typ {
case pilosa.FieldTypeDefault, pilosa.FieldTypeSet, pilosa.FieldTypeTime:
if opts.Keys() {
tt[field.Name()] = make(map[string][]int)
}
rowIDs[field.Name()] = make([]uint64, 0, size)
hasTime = typ == pilosa.FieldTypeTime || hasTime
case pilosa.FieldTypeInt:
values[field.Name()] = make([]int64, 0, size)
}
Expand All @@ -138,6 +152,9 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi
toTranslateID: make(map[string][]int),
transCache: NewMapTranslator(),
}
if hasTime {
b.times = make([]QuantizedTime, 0, size)
}
for _, opt := range opts {
err := opt(b)
if err != nil {
Expand All @@ -155,7 +172,7 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi
type Row struct {
ID interface{}
Values []interface{}
Time *QuantizedTime
Time QuantizedTime
}

// QuantizedTime represents a moment in time down to some granularity
Expand Down Expand Up @@ -194,7 +211,20 @@ func (qt *QuantizedTime) SetHour(hour string) {
copy(qt.ymdh[8:10], hour)
}

// Reset sets the time to the zero value which generates no time views.
func (qt *QuantizedTime) Reset() {
for i := range qt.ymdh {
qt.ymdh[i] = 0
}
}

// views builds the list of Pilosa views for this particular time,
// given a quantum.
func (qt *QuantizedTime) views(q pilosa.TimeQuantum) ([]string, error) {
zero := QuantizedTime{}
if *qt == zero {
return nil, nil
}
views := make([]string, 0, len(q))
for _, unit := range q {
switch unit {
Expand Down Expand Up @@ -256,6 +286,10 @@ func (b *Batch) Add(rec Row) error {
return errors.Errorf("unsupported id type %T value %v", rid, rid)
}

if b.times != nil {
b.times = append(b.times, rec.Time)
}

for i := 0; i < len(rec.Values); i++ {
field := b.header[i]
switch val := rec.Values[i].(type) {
Expand Down Expand Up @@ -402,14 +436,17 @@ func (b *Batch) doTranslation() error {
func (b *Batch) doImport() error {
eg := errgroup.Group{}

frags := b.makeFragments()
for shard, viewMap := range frags {
for fieldView, bitmap := range viewMap {
fieldView := fieldView
bitmap := bitmap
frags, err := b.makeFragments()
if err != nil {
return errors.Wrap(err, "making fragments")
}
for shard, fieldMap := range frags {
for field, viewMap := range fieldMap {
field := field
viewMap := viewMap
eg.Go(func() error {
err := b.client.ImportRoaringBitmap(b.index.Field(fieldView.field), shard, map[string]*roaring.Bitmap{"": bitmap}, false)
return errors.Wrapf(err, "importing data for %s", fieldView.field)
err := b.client.ImportRoaringBitmap(b.index.Field(field), shard, viewMap, false)
return errors.Wrapf(err, "importing data for %s", field)
})
}
}
Expand All @@ -425,13 +462,15 @@ func (b *Batch) doImport() error {
// if needed though).
var nilSentinel = ^uint64(0)

func (b *Batch) makeFragments() fragments {
func (b *Batch) makeFragments() (fragments, error) {
shardWidth := b.index.ShardWidth()
if shardWidth == 0 {
shardWidth = pilosa.DefaultShardWidth
}
frags := make(fragments)
for fname, rowIDs := range b.rowIDs {
field := b.headerMap[fname]
opts := field.Opts()
curShard := ^uint64(0) // impossible sentinel value for shard.
var curBM *roaring.Bitmap
for j := range b.ids {
Expand All @@ -443,12 +482,30 @@ func (b *Batch) makeFragments() fragments {
curShard = col / shardWidth
curBM = frags.GetOrCreate(curShard, fname, "")
}
curBM.DirectAdd(row*shardWidth + (col % shardWidth))
// TODO this is super ugly, but we want to avoid setting
// bits on the standard view in the specific case when
// there isn't one. Should probably refactor this whole
// loop to be more general w.r.t. views. Also... tests for
// the NoStandardView case would be great.
if !(opts.Type() == pilosa.FieldTypeTime && opts.NoStandardView()) {
curBM.DirectAdd(row*shardWidth + (col % shardWidth))
}
if opts.Type() == pilosa.FieldTypeTime {
views, err := b.times[j].views(opts.TimeQuantum())
if err != nil {
return nil, errors.Wrap(err, "calculating views")
}
for _, view := range views {
tbm := frags.GetOrCreate(curShard, fname, view)
tbm.DirectAdd(row*shardWidth + (col % shardWidth))
}
}
}
}
return frags
return frags, nil
}

// importValueData imports data for int fields.
func (b *Batch) importValueData() error {
shardWidth := b.index.ShardWidth()
if shardWidth == 0 {
Expand Down Expand Up @@ -531,6 +588,7 @@ func (b *Batch) importValueData() error {
// next round. Where possible it does not re-allocate memory.
func (b *Batch) reset() {
b.ids = b.ids[:0]
b.times = b.times[:0]
for fieldName, rowIDs := range b.rowIDs {
b.rowIDs[fieldName] = rowIDs[:0]
m := b.toTranslate[fieldName]
Expand All @@ -549,24 +607,24 @@ func (b *Batch) reset() {
}
}

type fieldView struct {
field string
view string
}

// map[shard][fieldview]fragmentData
type fragments map[uint64]map[fieldView]*roaring.Bitmap
// map[shard][field][view]fragmentData
type fragments map[uint64]map[string]map[string]*roaring.Bitmap

func (f fragments) GetOrCreate(shard uint64, field, view string) *roaring.Bitmap {
viewMap, ok := f[shard]
fieldMap, ok := f[shard]
if !ok {
fieldMap = make(map[string]map[string]*roaring.Bitmap)
}
viewMap, ok := fieldMap[field]
if !ok {
viewMap = make(map[fieldView]*roaring.Bitmap)
viewMap = make(map[string]*roaring.Bitmap)
}
bm, ok := viewMap[fieldView{field: field, view: view}]
bm, ok := viewMap[view]
if !ok {
bm = roaring.NewBTreeBitmap()
viewMap[fieldView{field: field, view: view}] = bm
viewMap[view] = bm
}
f[shard] = viewMap
fieldMap[field] = viewMap
f[shard] = fieldMap
return bm
}
Loading

0 comments on commit e40d84a

Please sign in to comment.