Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Permalink
Merge pull request #258 from seebs/channelbuffer
Browse files Browse the repository at this point in the history
Don't use batch size for channel buffers
  • Loading branch information
seebs authored Oct 28, 2019
2 parents b8f364f + 86b9d7d commit 39c2f0e
Showing 1 changed file with 15 additions and 13 deletions.
28 changes: 15 additions & 13 deletions import_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (rim recordImportManager) run(field *Field, iterator RecordIterator, option
}

for i := range recordChans {
recordChans[i] = make(chan []Record, options.batchSize)
recordChans[i] = make(chan []Record, 16)
recordBufs[i] = make([]Record, 0, 16)
chans := importWorkerChannels{
records: recordChans[i],
Expand Down Expand Up @@ -128,27 +128,29 @@ func recordImportWorker(id int, client *Client, field *Field, chans importWorker

readRecords:
for recordBatch := range recordChan {
// It's fine to overrun our allowed batch size slightly, and
// we don't want to generate separate batches for part of a
// 16-item batch.
for _, record := range recordBatch {
recordCount++
shard := record.Shard(shardWidth)
if batchForShard[shard] == nil {
batchForShard[shard] = make([]Record, 0, batchSize)
}
batchForShard[shard] = append(batchForShard[shard], record)

if recordCount >= batchSize {
for shard, records := range batchForShard {
if len(records) == 0 {
continue
}
err = importRecords(id, client, field, shardNodes, shard, records, options, statusChan, state)
if err != nil {
break readRecords
}
batchForShard[shard] = batchForShard[shard][:0]
}
if recordCount >= batchSize {
for shard, records := range batchForShard {
if len(records) == 0 {
continue
}
err = importRecords(id, client, field, shardNodes, shard, records, options, statusChan, state)
if err != nil {
break readRecords
}
recordCount = 0
delete(batchForShard, shard)
}
recordCount = 0
}
}

Expand Down

0 comments on commit 39c2f0e

Please sign in to comment.