Skip to content

Commit

Permalink
feat: reduce memory footprint by selecting portion
Browse files Browse the repository at this point in the history
  • Loading branch information
vlourme committed Nov 13, 2024
1 parent 64b402e commit e2a7b93
Showing 1 changed file with 34 additions and 24 deletions.
58 changes: 34 additions & 24 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,47 +22,57 @@ func Batching(table Table, conn driver.Conn, batchSize int, onBatch func([][]int
query = fmt.Sprintf("%s WHERE %s > '%s'", query, table.Cursor.Column, table.Cursor.LastSync.Format(time.DateTime))
}

var scannerVal []interface{}
total := 0

rows, err := conn.Query(ctx, query)
if err != nil {
countQuery := fmt.Sprintf("SELECT COUNT(*) FROM (%s) AS subquery", query)
var count uint64
if err := conn.QueryRow(ctx, countQuery).Scan(&count); err != nil {
return 0, err
}

batch := [][]interface{}{}
for rows.Next() {
if scannerVal == nil {
scannerVal = GetScannerValues(rows.ColumnTypes())
}
var scannerVal []interface{}
total := 0
offset := 0

values := make([]interface{}, len(scannerVal))
for i := range values {
values[i] = reflect.New(reflect.TypeOf(scannerVal[i])).Interface()
pk := ""
for _, col := range table.Columns {
if col.Primary {
pk = col.Source
break
}
}

if err := rows.Scan(values...); err != nil {
for total < int(count) {
rows, err := conn.Query(ctx, fmt.Sprintf("%s ORDER BY %s LIMIT %d OFFSET %d", query, pk, batchSize, offset))
if err != nil {
return 0, err
}

batch = append(batch, values)
batch := [][]interface{}{}
for rows.Next() {
if scannerVal == nil {
scannerVal = GetScannerValues(rows.ColumnTypes())
}

if len(batch) == batchSize {
if err := onBatch(batch); err != nil {
values := make([]interface{}, len(scannerVal))
for i := range values {
values[i] = reflect.New(reflect.TypeOf(scannerVal[i])).Interface()
}

if err := rows.Scan(values...); err != nil {
return 0, err
}

total += len(batch)
batch = [][]interface{}{}
batch = append(batch, values)
}
}

if len(batch) > 0 {
total += len(batch)
if len(batch) > 0 {
total += len(batch)

if err := onBatch(batch); err != nil {
return 0, err
if err := onBatch(batch); err != nil {
return 0, err
}
}

offset += batchSize
}

return total, nil
Expand Down

0 comments on commit e2a7b93

Please sign in to comment.