From e2a7b9356cdb6e273977dce803a3b91ca24cf6ea Mon Sep 17 00:00:00 2001 From: Victor Lourme Date: Wed, 13 Nov 2024 10:49:09 +0900 Subject: [PATCH] feat: reduce memory footprint by selecting portion --- batch.go | 58 +++++++++++++++++++++++++++++++++----------------------- 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/batch.go b/batch.go index 1c43e56..4a61863 100644 --- a/batch.go +++ b/batch.go @@ -22,47 +22,57 @@ func Batching(table Table, conn driver.Conn, batchSize int, onBatch func([][]int query = fmt.Sprintf("%s WHERE %s > '%s'", query, table.Cursor.Column, table.Cursor.LastSync.Format(time.DateTime)) } - var scannerVal []interface{} - total := 0 - - rows, err := conn.Query(ctx, query) - if err != nil { + countQuery := fmt.Sprintf("SELECT COUNT(*) FROM (%s) AS subquery", query) + var count uint64 + if err := conn.QueryRow(ctx, countQuery).Scan(&count); err != nil { return 0, err } - batch := [][]interface{}{} - for rows.Next() { - if scannerVal == nil { - scannerVal = GetScannerValues(rows.ColumnTypes()) - } + var scannerVal []interface{} + total := 0 + offset := 0 - values := make([]interface{}, len(scannerVal)) - for i := range values { - values[i] = reflect.New(reflect.TypeOf(scannerVal[i])).Interface() + pk := "" + for _, col := range table.Columns { + if col.Primary { + pk = col.Source + break } + } - if err := rows.Scan(values...); err != nil { + for total < int(count) { + rows, err := conn.Query(ctx, fmt.Sprintf("%s ORDER BY %s LIMIT %d OFFSET %d", query, pk, batchSize, offset)) + if err != nil { return 0, err } - batch = append(batch, values) + batch := [][]interface{}{} + for rows.Next() { + if scannerVal == nil { + scannerVal = GetScannerValues(rows.ColumnTypes()) + } - if len(batch) == batchSize { - if err := onBatch(batch); err != nil { + values := make([]interface{}, len(scannerVal)) + for i := range values { + values[i] = reflect.New(reflect.TypeOf(scannerVal[i])).Interface() + } + + if err := rows.Scan(values...); err != nil { return 0, err } - total += len(batch) - batch = [][]interface{}{} + batch = append(batch, values) } - } - if len(batch) > 0 { - total += len(batch) + if len(batch) > 0 { + total += len(batch) - if err := onBatch(batch); err != nil { - return 0, err + if err := onBatch(batch); err != nil { + return 0, err + } } + + offset += batchSize } return total, nil