Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Don't use batch size for channel buffers #258

Merged
merged 2 commits into from
Oct 28, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions import_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (rim recordImportManager) run(field *Field, iterator RecordIterator, option
}

for i := range recordChans {
recordChans[i] = make(chan []Record, options.batchSize)
recordChans[i] = make(chan []Record, 16)
recordBufs[i] = make([]Record, 0, 16)
chans := importWorkerChannels{
records: recordChans[i],
Expand Down Expand Up @@ -128,27 +128,29 @@ func recordImportWorker(id int, client *Client, field *Field, chans importWorker

readRecords:
for recordBatch := range recordChan {
// It's fine to overrun our allowed batch size slightly, and
// we don't want to generate separate batches for part of a
// 16-item batch.
for _, record := range recordBatch {
recordCount++
shard := record.Shard(shardWidth)
if batchForShard[shard] == nil {
batchForShard[shard] = make([]Record, 0, batchSize)
}
batchForShard[shard] = append(batchForShard[shard], record)

if recordCount >= batchSize {
for shard, records := range batchForShard {
if len(records) == 0 {
continue
}
err = importRecords(id, client, field, shardNodes, shard, records, options, statusChan, state)
if err != nil {
break readRecords
}
batchForShard[shard] = batchForShard[shard][:0]
}
if recordCount >= batchSize {
for shard, records := range batchForShard {
if len(records) == 0 {
continue
}
err = importRecords(id, client, field, shardNodes, shard, records, options, statusChan, state)
if err != nil {
break readRecords
}
recordCount = 0
delete(batchForShard, shard)
}
recordCount = 0
}
}

Expand Down