Batch ingest #253

Merged
merged 26 commits into FeatureBaseDB:master
Oct 11, 2019

Conversation

@jaffee (Member) commented Aug 27, 2019

Experimental batch ingestion interface. The godocs hopefully make things clear. See FeatureBaseDB/pdk#130 for some example usage.

Performance-wise, this seems to be far more efficient (at least 5x) than the current import interface, and can probably do even better with some profiling and tuning.
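For a rough feel of the shape of the interface, here is a hedged usage sketch. The gpexp package and the NewBatch, Row, Add, ErrBatchNowFull, and Import names reflect my reading of this PR; the godocs and FeatureBaseDB/pdk#130 are the authoritative references, and the index and field names below are made up.

package main

import (
	"fmt"
	"log"

	pilosa "github.com/pilosa/go-pilosa"
	"github.com/pilosa/go-pilosa/gpexp"
)

func main() {
	client := pilosa.DefaultClient()

	// Hypothetical schema: a keyed index with a keyed set field and an int field.
	schema := pilosa.NewSchema()
	idx := schema.Index("users", pilosa.OptIndexKeys(true))
	size := idx.Field("size", pilosa.OptFieldKeys(true))
	age := idx.Field("age", pilosa.OptFieldTypeInt(0, 150))
	if err := client.SyncSchema(schema); err != nil {
		log.Fatal(err)
	}

	// A Batch buffers up to N records and imports them in bulk.
	b, err := gpexp.NewBatch(client, 1000, idx, []*pilosa.Field{size, age})
	if err != nil {
		log.Fatal(err)
	}

	row := gpexp.Row{Values: make([]interface{}, 2)}
	for i := 0; i < 10000; i++ {
		row.ID = fmt.Sprintf("user%d", i)
		row.Values[0] = "large"       // value for "size"
		row.Values[1] = int64(i % 90) // value for "age"
		err := b.Add(row)
		if err == gpexp.ErrBatchNowFull {
			// The batch is full: import the buffered records, then keep adding.
			if err := b.Import(); err != nil {
				log.Fatal(err)
			}
		} else if err != nil {
			log.Fatal(err)
		}
	}
	// Import whatever is left in the final, partially filled batch.
	if err := b.Import(); err != nil {
		log.Fatal(err)
	}
}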

jaffee added 15 commits August 28, 2019 08:50
no channels, should be more memory efficient. Currently only supports strings.
I've confirmed that this and the "old" ingest produce the same results
in Pilosa (at least by TopNing each field)
add local cache for node URIs per index/shard
add exported (questionable) method to client for doing simple integer
imports.
add translated column keys to the local cache.
also add the ability to set different implementations on a Batch. This
is basically gearing up for a shared implementation using an embedded
k/v store.
Now importbatch only uses exported client stuff.
}

type Field struct {
	Type string `json:"type"`

Contributor commented:

Using the term "type" causes problems in some Java config file parsers/tools because it is a reserved word. Just a minor annoyance to consider.

@jaffee changed the title from "WIP: Batch ingest" to "Batch ingest" on Sep 6, 2019
adds QuantizedTime type to support efficiently tracking a time quantum
with each Row which will be applied to all the values of time-capable
fields in that row.
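A hedged sketch of the idea: one quantized time is attached to a Row and applied to every time-capable field value in that row. The Row.Time field and the QuantizedTime.Set method shown here are assumptions based on this description; check the godocs for the actual API.

package main

import (
	"time"

	"github.com/pilosa/go-pilosa/gpexp"
)

// addEvent attaches a single quantized time to a Row so it applies to all
// time-capable field values in that row. Row.Time and QuantizedTime.Set are
// assumed names based on the commit description above.
func addEvent(b *gpexp.Batch, id string, val string, ts time.Time) error {
	r := gpexp.Row{ID: id, Values: []interface{}{val}}
	r.Time.Set(ts) // one time quantum shared by every time-capable field in this row
	err := b.Add(r)
	if err == gpexp.ErrBatchNowFull {
		return b.Import()
	}
	return err
}
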
The test demonstrates how a nil integer value added to a Batch would
clear an existing integer value instead of ignoring it. This fixes it
by tracking which records have nil integer values for each field, and
removing those ids/values before importing.
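For illustration only, the shape of that fix is roughly the following hypothetical helper (not the actual Batch internals):

// filterNilInts drops records whose integer value is nil from the ids/values
// pair before import, so a missing value is ignored rather than clearing an
// existing value in Pilosa. Hypothetical helper for illustration.
func filterNilInts(ids []uint64, values []*int64) ([]uint64, []int64) {
	outIDs := make([]uint64, 0, len(ids))
	outVals := make([]int64, 0, len(values))
	for i, v := range values {
		if v == nil {
			continue // nil means "no value for this record": skip it
		}
		outIDs = append(outIDs, ids[i])
		outVals = append(outVals, *v)
	}
	return outIDs, outVals
}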

In order to be able to execute import requests concurrently, while
re-using the same slices for ids and values for each import, we expose
encoding separately from importing in the Client, so that we can
encode serially reusing the same slices, and import the encoded data
concurrently.
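The pattern reads roughly like the sketch below. encodeShard and importEncoded are hypothetical stand-ins for the Client's separate encode and import steps; this only illustrates reusing the same slices serially while importing encoded payloads concurrently.

package main

import "sync"

// encodeAndImport sketches the pattern described above: the ids slice is
// reused serially for encoding, while the already-encoded payloads are
// imported concurrently. encodeShard and importEncoded are hypothetical.
func encodeAndImport(
	shards map[uint64][]uint64, // shard -> column ids to import
	valuesFor func(shard uint64) []int64,
	encodeShard func(ids []uint64, vals []int64) []byte,
	importEncoded func(shard uint64, payload []byte) error,
) error {
	var wg sync.WaitGroup
	errs := make(chan error, len(shards))

	ids := make([]uint64, 0, 1<<16) // reused across shards; only touched serially
	for shard, shardIDs := range shards {
		ids = append(ids[:0], shardIDs...)
		payload := encodeShard(ids, valuesFor(shard)) // serial encode, reusing ids

		wg.Add(1)
		go func(shard uint64, p []byte) { // concurrent import of the encoded copy
			defer wg.Done()
			if err := importEncoded(shard, p); err != nil {
				errs <- err
			}
		}(shard, payload)
	}
	wg.Wait()
	close(errs)
	for err := range errs {
		return err // return the first import error, if any
	}
	return nil
}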

@jaffee (Member, Author) commented Oct 2, 2019

@tgruben can you approve this one so I can merge it before the PDK stuff?

jaffee added 2 commits October 4, 2019 09:21
also check for unsupported field types and error
jaffee added 3 commits October 9, 2019 17:31
needed to convert rowIDs and toTranslate to map from field index
rather than field name... going to continue refactoring this to use a
map from index rather than a slice
@tgruben merged commit 29aaccb into FeatureBaseDB:master on Oct 11, 2019

@dmibor left a comment

@jaffee awesome PR :) Thought I'd put in my two cents of experience; maybe it will be useful.

		return errors.Wrap(err, "making fragments")
	}
	for shard, fieldMap := range frags {
		for field, viewMap := range fieldMap {

@dmibor commented Oct 16, 2019

I've had quite a bit of time to play around with this way of ingesting on our data; maybe this info will be useful for you guys:

This way of ingesting a fixed-size batch bottlenecks heavily once the number of shards grows and the data coming into the batch starts being distributed over more and more shards, and therefore fragments. Eventually, what happened with our data is that no matter how small the batch is configured, we get 1-2 large fragments of the most recent shards holding most of the data in the batch, about 70%, and then hundreds and hundreds of tiny fragments of older shards. Importing those hundreds of tiny fragments into Pilosa bottlenecks heavily on unioning bitmaps on the Pilosa cluster, making ingestion of every batch slow, since reads from disk inevitably happen when accessing fragments from all the different places in the index.

That is what a hard limit on batch size does when IDs repeat themselves. If they do, this issue will always be there, but how bad it is depends on the number of shards and the ID repetition pattern. For us it was extremely bad.

What eventually proved to work for us is to go more granular and put the hard limit not on a batch, but on every single fragment: import a fragment to Pilosa as soon as it reaches a certain size and flush it from memory, and also limit how long tiny fragment bitmaps sit in memory, to avoid memory blowing up from thousands of tiny fragment bitmaps.

This tremendously reduces, and in fact puts an upper bound on, the number of unions the Pilosa cluster has to do during import, irrespective of the shard size. It made things a lot faster.
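For illustration, the per-fragment flush idea described above might be structured something like this sketch. None of these types or names exist in go-pilosa; they only illustrate buffering bits per fragment and flushing each fragment independently of any overall batch size.

package main

import "time"

// fragKey identifies one fragment buffer (field/view/shard within an index).
type fragKey struct {
	field string
	view  string
	shard uint64
}

// fragBuffer accumulates pending bits for a single fragment until it is flushed.
type fragBuffer struct {
	rows, cols []uint64  // pending bits for this fragment
	created    time.Time // when this buffer started accumulating
}

// addBit buffers one bit and flushes its fragment as soon as shouldFlush says
// so. flush stands in for importing that one fragment to the Pilosa cluster
// and dropping it from memory.
func addBit(
	bufs map[fragKey]*fragBuffer,
	k fragKey, row, col uint64,
	shouldFlush func(*fragBuffer) bool,
	flush func(fragKey, *fragBuffer) error,
) error {
	b, ok := bufs[k]
	if !ok {
		b = &fragBuffer{created: time.Now()}
		bufs[k] = b
	}
	b.rows = append(b.rows, row)
	b.cols = append(b.cols, col)
	if shouldFlush(b) {
		delete(bufs, k)
		return flush(k, b)
	}
	return nil
}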

@dmibor added:

Maybe you can consider implementing streaming ingestion, where rows are streamed into the go-pilosa API and flushed to the Pilosa cluster periodically when the above conditions are met.

Or maybe I can submit my code for this logic to go-pilosa, if you guys are interested in something like that.

@jaffee (Member, Author) replied:

@dmibor thanks for the input!

This experiment is definitely skewed toward making the easier cases really fast rather than handling the more difficult scattered/semi-random ID problem.

We have some Pilosa work going on that is making it more efficient to send small updates to Pilosa, and we'll have to see how that affects things.

I would be interested to understand how you decided to limit the number of small fragments in memory - you said "by time", but I'm not sure I understand if that's like oldest, or LRU or...

FWIW, this batch is fixed by number of records, not number of bits in a given shard, so no matter how scattered the IDs are, there is a hard upper limit on memory usage.

@dmibor commented Oct 18, 2019

@jaffee Yeah, I do see that this way the go-pilosa memory footprint would be stable.

"By time" means flushing small fragments every N minutes, i.e. any fragment can be flushed if 1) it is too big, with a bit count above some configured threshold, say 1M bits, or 2) it has been sitting in memory for more than N configured minutes. The timeout makes sure tiny fragments are not sent to Pilosa too often and are buffered in go-pilosa's memory longer, so that DirectAdd() of bits into them is fast and does not require unions of whole large bitmaps on the Pilosa cluster. This is as opposed to the fixed-batch way of sending, where tiny fragments are sent as often as the batch fills up, multiple times per second at least, and every send requires unioning those fragments with everything that already exists in that fragment on the Pilosa cluster. Doing that would often require bringing fragment bitmap data in from disk.

On the other hand, the timeout also keeps the memory footprint of go-pilosa under control: fragment bitmaps cannot become too big, and there cannot be too many fragments, so memory is not exhausted.
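Spelled out, the flush condition described here is just a size-or-age check. This fragment builds on the fragBuffer type and time import from the earlier sketch; the 1M-bit and N-minute thresholds are the examples from the comment above, not real configuration.

// shouldFlush: flush a fragment buffer once it is big enough (e.g. 1M bits)
// or once it has been buffered longer than the configured timeout.
func shouldFlush(b *fragBuffer, maxBits int, maxAge time.Duration) bool {
	return len(b.cols) >= maxBits || time.Since(b.created) >= maxAge
}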
