ccl/backupccl: add memory monitor to external SST iterators in restore
Previously, there was no limit on the amount of memory that could be used while
constructing external SST iterators during restore. This patch adds a memory
monitor that limits the amount of memory used to construct external SST
iterators. If a restore processor fails to acquire enough memory to open the
next file for a restore span, it sends the iterator over all of the files it
has opened so far, then waits until it can acquire the memory needed to resume
constructing the iterator for the remaining files.

The memory limit can be controlled via the new cluster setting
bulkio.restore.per_processor_memory_limit. Regardless of the setting,
however, the amount of memory used will not exceed
COCKROACH_RESTORE_MEM_FRACTION * max SQL memory. The new environment
variable COCKROACH_RESTORE_MEM_FRACTION defaults to 0.5.

Release note: None
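
To make the flush-and-wait behavior concrete, here is a minimal, self-contained sketch; it is not the code from this commit. Names such as effectiveLimit, restoreSpanSketch, perFileOverhead, and processFiles are illustrative stand-ins, and the real implementation reserves memory from the processor's memory monitor through a quota pool (see the backuputils changes below) rather than counting bytes by hand.

```go
package main

import "fmt"

// effectiveLimit mirrors the cap described in the commit message: the cluster
// setting is honored only up to memFraction * maxSQLMemory.
func effectiveLimit(settingLimit, maxSQLMemory int64, memFraction float64) int64 {
	fractionCap := int64(memFraction * float64(maxSQLMemory))
	if settingLimit > fractionCap {
		return fractionCap
	}
	return settingLimit
}

// restoreSpanSketch opens as many files as the budget allows, processes an
// iterator over just those files, then resumes with the remaining files once
// the budget is free again.
func restoreSpanSketch(files []string, budget, perFileOverhead int64) {
	open := make([]string, 0, len(files))
	var used int64
	for _, f := range files {
		if used+perFileOverhead > budget {
			// Budget exhausted: send an iterator over the files opened so far,
			// release their memory, then continue with the remaining files.
			processFiles(open)
			used, open = 0, open[:0]
		}
		used += perFileOverhead
		open = append(open, f)
	}
	if len(open) > 0 {
		processFiles(open) // final partial batch for this span
	}
}

func processFiles(open []string) {
	fmt.Printf("constructing external SST iterator over %d files\n", len(open))
}

func main() {
	budget := effectiveLimit(30, 1<<30, 0.5) // setting is below the fraction cap, so it wins
	// With room for 3 file-overheads and 7 files, the span is processed in
	// ceil(7/3) = 3 batches, matching the count checked in the tests below.
	restoreSpanSketch([]string{"a", "b", "c", "d", "e", "f", "g"}, budget, 10)
}
```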
Rui Hu committed May 10, 2023
1 parent fa92cdd commit 84ed8ac
Showing 20 changed files with 1,113 additions and 94 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
@@ -18,6 +18,7 @@ ALL_TESTS = [
"//pkg/ccl/backupccl/backupinfo:backupinfo_test",
"//pkg/ccl/backupccl/backuprand:backuprand_test",
"//pkg/ccl/backupccl/backupresolver:backupresolver_test",
"//pkg/ccl/backupccl/backuputils:backuputils_test",
"//pkg/ccl/backupccl:backupccl_test",
"//pkg/ccl/baseccl:baseccl_test",
"//pkg/ccl/benchccl/rttanalysisccl:rttanalysisccl_test",
@@ -744,6 +745,7 @@ GO_TARGETS = [
"//pkg/ccl/backupccl/backupresolver:backupresolver_test",
"//pkg/ccl/backupccl/backuptestutils:backuptestutils",
"//pkg/ccl/backupccl/backuputils:backuputils",
"//pkg/ccl/backupccl/backuputils:backuputils_test",
"//pkg/ccl/backupccl:backupccl",
"//pkg/ccl/backupccl:backupccl_test",
"//pkg/ccl/baseccl:baseccl",
4 changes: 4 additions & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -142,6 +142,7 @@ go_library(
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/quotapool",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/span",
@@ -208,6 +209,7 @@ go_test(
"//pkg/ccl/backupccl/backupinfo",
"//pkg/ccl/backupccl/backuppb",
"//pkg/ccl/backupccl/backuptestutils",
"//pkg/ccl/backupccl/backuputils",
"//pkg/ccl/kvccl",
"//pkg/ccl/multiregionccl",
"//pkg/ccl/multiregionccl/multiregionccltestutils",
@@ -289,7 +291,9 @@ go_test(
"//pkg/util/admission",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/envutil",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/ioctx",
"//pkg/util/leaktest",
"//pkg/util/limit",
193 changes: 190 additions & 3 deletions pkg/ccl/backupccl/backup_test.go
@@ -1441,7 +1441,7 @@ func checkInProgressBackupRestore(
<-exportSpanCompleteCh
}
},
RunAfterProcessingRestoreSpanEntry: func(_ context.Context) {
RunAfterProcessingRestoreSpanEntry: func(_ context.Context, _ *execinfrapb.RestoreSpanEntry) {
<-allowResponse
},
},
@@ -1612,7 +1612,7 @@ func TestRestoreCheckpointing(t *testing.T) {
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterProcessingRestoreSpanEntry: func(_ context.Context) {
RunAfterProcessingRestoreSpanEntry: func(_ context.Context, _ *execinfrapb.RestoreSpanEntry) {
// Because the restore processor has several workers that
// concurrently send addsstable requests and because all workers will
// wait on the lock below, when one flush gets blocked on the
@@ -7364,7 +7364,7 @@ func TestClientDisconnect(t *testing.T) {

args := base.TestClusterArgs{}
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{RunAfterProcessingRestoreSpanEntry: func(ctx context.Context) {
DistSQL: &execinfra.TestingKnobs{BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{RunAfterProcessingRestoreSpanEntry: func(ctx context.Context, _ *execinfrapb.RestoreSpanEntry) {
blockBackupOrRestore(ctx)
}}},
Store: &kvserver.StoreTestingKnobs{
@@ -11059,3 +11059,190 @@ CREATE TABLE child_pk (k INT8 PRIMARY KEY REFERENCES parent);
sqlDB.Exec(t, `DROP DATABASE test`)
}
}

// Verify that during restore, if a restore span has too many files to fit in
// the memory budget with a single SST iterator, the restore processor
// repeatedly opens and processes iterators over as many files as fit within
// the budget until the span is finished.
func TestRestoreMemoryMonitoring(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderRace(t, "the largest tests are too slow to run under race")
skip.UnderStress(t, "the largest tests are too slow to run under stress")

const splitSize = 10
for _, numSplits := range []int{10, 100, 200} {
for _, numInc := range []int{0, 1, 3, 10} {
for _, restoreProcessorMaxFiles := range []int{5, 10, 20} {
t.Run(fmt.Sprintf("splits=%d/inc=%d/procMaxFiles=%d", numSplits, numInc, restoreProcessorMaxFiles), func(t *testing.T) {
numAccounts := numSplits * splitSize
var expectedNumFiles int
restoreProcessorKnobCount := atomic.Uint32{}
args := base.TestServerArgs{
DefaultTestTenant: base.TestTenantDisabled,
SQLMemoryPoolSize: 1 << 30, // Large enough for all mem limit settings.
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterProcessingRestoreSpanEntry: func(ctx context.Context, entry *execinfrapb.RestoreSpanEntry) {
// The total size of the backup files should be less than the target
// SST size, so they should all fit in one import span.
require.Equal(t, expectedNumFiles, len(entry.Files))
restoreProcessorKnobCount.Add(1)
},
},
},
},
}
params := base.TestClusterArgs{ServerArgs: args}
_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params)
defer cleanupFn()

sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.memory_monitor_ssts=true")
sqlDB.Exec(t, "SET CLUSTER SETTING kv.bulk_io_write.restore_node_concurrency=2")

// Add some splits in the table, and set the target file size to be something
// small so that we get one flushed file per split in the backup.
sqlDB.Exec(t, "ALTER TABLE data.bank SPLIT AT SELECT generate_series($1::INT, $2, $3)", 0, numAccounts, splitSize)
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.backup.file_size = '1b'")
sqlDB.Exec(t, "BACKUP data.bank INTO 'userfile:///backup'")

// Take some incremental backups after mutating some rows. Take note of the
// splits that have been changed as that determines the number of incremental
// files that are created.
var numIncFiles int
for i := 0; i < numInc; i++ {
incSplitsWithFile := make(map[int]bool)

for n := 0; n < 100; n++ {
id := rand.Intn(numAccounts)
sqlDB.Exec(t, `UPDATE data.bank SET balance = balance + 1 WHERE id = $1`, id)
split := id / splitSize
incSplitsWithFile[split] = true
}

sqlDB.Exec(t, `BACKUP data.bank INTO latest IN 'userfile:///backup' WITH revision_history`)
numIncFiles += len(incSplitsWithFile)
}

expectedNumFiles += numSplits + numIncFiles
// Verify the file counts in the backup.
var numFiles int
sqlDB.QueryRow(t, "SELECT count(*) FROM [SHOW BACKUP FILES FROM latest IN 'userfile:///backup']").Scan(&numFiles)
require.Equal(t, expectedNumFiles, numFiles)

sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.per_processor_memory_limit = $1", restoreProcessorMaxFiles*sstReaderOverheadBytesPerFile)

sqlDB.Exec(t, "CREATE DATABASE data2")
sqlDB.Exec(t, "RESTORE data.bank FROM latest IN 'userfile:///backup' WITH OPTIONS (into_db='data2')")

// Assert that the restore processor is processing the same span multiple
// times, and the count is based on what's expected from the memory budget.
// The expected number is just the ceiling of expectedNumFiles/restoreProcessorMaxFiles.
require.Equal(t, (expectedNumFiles-1)/restoreProcessorMaxFiles+1, int(restoreProcessorKnobCount.Load()))

// Verify data in the restored table.
expectedFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data.bank")
actualFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data2.bank")
require.Equal(t, expectedFingerprints, actualFingerprints)
})
}
}
}
}

// Verify that restore with memory monitoring succeeds with partial SST
// iterators that shadow previously written values.
func TestRestoreMemoryMonitoringWithShadowing(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const numAccounts = 10
const numIncrementals = 10
const restoreProcessorMaxFiles = 5

restoreProcessorKnobCount := atomic.Uint32{}

args := base.TestServerArgs{
DefaultTestTenant: base.TestTenantDisabled,
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterProcessingRestoreSpanEntry: func(ctx context.Context, entry *execinfrapb.RestoreSpanEntry) {
restoreProcessorKnobCount.Add(1)
},
},
},
},
}
params := base.TestClusterArgs{ServerArgs: args}
_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params)
defer cleanupFn()

sqlDB.Exec(t, "SET CLUSTER SETTING kv.bulk_io_write.restore_node_concurrency = 1")
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.memory_monitor_ssts=true")
sqlDB.Exec(t, "BACKUP data.bank INTO 'userfile:///backup'")

// Repeatedly alter a single row and do an incremental backup.
for i := 0; i < numIncrementals; i++ {
sqlDB.Exec(t, `UPDATE data.bank SET balance = $1 WHERE id = $2`, 1000+i, i)
sqlDB.Exec(t, "BACKUP data.bank INTO latest IN 'userfile:///backup'")
}

// Set the memory budget for the restore processor to be enough to open 5
// files.
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.per_processor_memory_limit = $1", restoreProcessorMaxFiles*sstReaderOverheadBytesPerFile)

sqlDB.Exec(t, "CREATE DATABASE data2")
sqlDB.Exec(t, "RESTORE data.bank FROM latest IN 'userfile:///backup' WITH OPTIONS (into_db='data2')")
files := sqlDB.QueryStr(t, "SHOW BACKUP FILES FROM latest IN 'userfile:///backup'")
require.Equal(t, 11, len(files)) // 1 file for full + 10 for 10 incrementals

// Assert that the restore processor is processing the same span multiple
// times, and the count is based on what's expected from the memory budget.
require.Equal(t, 3, int(restoreProcessorKnobCount.Load())) // Ceiling(11/5)

// Verify data in the restored table.
expectedFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data.bank")
actualFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data2.bank")
require.Equal(t, expectedFingerprints, actualFingerprints)
}

// TestRestoreMemoryMonitoringMinWorkerMemory tests that restore properly fails
// fast if there's not enough memory to reserve for the minimum number of
// workers.
func TestRestoreMemoryMonitoringMinWorkerMemory(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
const numAccounts = 100

args := base.TestServerArgs{
DefaultTestTenant: base.TestTenantDisabled,
}
params := base.TestClusterArgs{ServerArgs: args}
_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params)
defer cleanupFn()

// With 4 restore workers configured, a minimum of 2 workers must be able to
// reserve memory for the restore to start.
sqlDB.Exec(t, "SET CLUSTER SETTING kv.bulk_io_write.restore_node_concurrency=4")
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.memory_monitor_ssts=true")

sqlDB.Exec(t, "BACKUP data.bank INTO 'userfile:///backup'")

// Set the budget to be 1 byte lower than minimum mem for 2 workers. This
// restore should fail.
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.per_processor_memory_limit = $1", 2*minWorkerMemReservation-1)
sqlDB.Exec(t, "CREATE DATABASE restore_fail")
sqlDB.ExpectErr(t, "insufficient memory", "RESTORE data.bank FROM latest IN 'userfile:///backup' WITH into_db='restore_fail'")

// Set the budget to be equal to the minimum mem for 2 workers. The restore
// should succeed.
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.per_processor_memory_limit = $1", 2*minWorkerMemReservation)
sqlDB.Exec(t, "CREATE DATABASE restore")
sqlDB.Exec(t, "RESTORE data.bank FROM latest IN 'userfile:///backup' WITH into_db='restore'")

// Verify data in the restored table.
expectedFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE data.bank")
actualFingerprints := sqlDB.QueryStr(t, "SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE restore.bank")
require.Equal(t, expectedFingerprints, actualFingerprints)
}
4 changes: 4 additions & 0 deletions pkg/ccl/backupccl/backuppb/backup.proto
@@ -194,6 +194,10 @@ message RestoreProgress {
roachpb.RowCount summary = 1 [(gogoproto.nullable) = false];
int64 progressIdx = 2;
roachpb.Span dataSpan = 3 [(gogoproto.nullable) = false];
// CompleteUpTo is the timestamp up to which the data in DataSpan has been
// processed so far. The entire span has been processed once this timestamp
// equals the restore end time.
util.hlc.Timestamp complete_up_to = 4 [(gogoproto.nullable) = false];
}

message BackupProcessorPlanningTraceEvent {
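The CompleteUpTo field above ties into the same flush behavior: when a processor sends an iterator over only a subset of a span's files, the data in DataSpan has only been processed up to some intermediate timestamp, and a later batch may still shadow what was written. A small illustrative sketch of the field's semantics, using stand-in types rather than the generated backuppb message:

```go
package main

import "fmt"

// Stand-in types for illustration; not the generated backuppb.RestoreProgress.
type timestamp struct{ wallNanos int64 }

type restoreProgress struct {
	dataSpan     string    // the span this progress update covers
	completeUpTo timestamp // data in dataSpan has been processed up to here
}

// spanIsDone mirrors the comment on complete_up_to: the span is fully
// processed only once the reported timestamp equals the restore end time.
func spanIsDone(p restoreProgress, restoreEndTime timestamp) bool {
	return p.completeUpTo == restoreEndTime
}

func main() {
	end := timestamp{wallNanos: 200}
	p := restoreProgress{dataSpan: "/Table/104/1", completeUpTo: timestamp{wallNanos: 150}}
	fmt.Println(spanIsDone(p, end)) // false: more batches must still cover this span
	p.completeUpTo = end
	fmt.Println(spanIsDone(p, end)) // true: the span is finished
}
```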
33 changes: 30 additions & 3 deletions pkg/ccl/backupccl/backuputils/BUILD.bazel
@@ -1,12 +1,39 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "backuputils",
srcs = ["utils.go"],
srcs = [
"memory_backed_quota_pool.go",
"utils.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuputils",
visibility = ["//visibility:public"],
deps = ["//pkg/cloud"],
deps = [
"//pkg/cloud",
"//pkg/util/mon",
"//pkg/util/quotapool",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
],
)

go_test(
name = "backuputils_test",
srcs = ["memory_backed_quota_pool_test.go"],
args = ["-test.timeout=295s"],
embed = [":backuputils"],
tags = ["ccl_test"],
deps = [
"//pkg/settings/cluster",
"//pkg/util/leaktest",
"//pkg/util/mon",
"//pkg/util/quotapool",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)

get_x_data(name = "get_x_data")