diff --git a/changelogs/unreleased/7558-qiuming-best b/changelogs/unreleased/7558-qiuming-best new file mode 100644 index 00000000000..07f21a241ad --- /dev/null +++ b/changelogs/unreleased/7558-qiuming-best @@ -0,0 +1 @@ +Fix snapshot leak for backup diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 1834031785b..2748569f3d8 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -412,8 +412,8 @@ func (s *nodeAgentServer) markInProgressPVBsFailed(client ctrlclient.Client) { } if err := controller.UpdatePVBStatusToFailed(s.ctx, client, &pvbs.Items[i], - fmt.Sprintf("get a podvolumebackup with status %q during the server starting, mark it as %q", velerov1api.PodVolumeBackupPhaseInProgress, velerov1api.PodVolumeBackupPhaseFailed), - time.Now(), s.logger); err != nil { + fmt.Errorf("get a podvolumebackup with status %q during the server starting, mark it as %q", velerov1api.PodVolumeBackupPhaseInProgress, velerov1api.PodVolumeBackupPhaseFailed), + "", time.Now(), s.logger); err != nil { s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumebackup %q", pvb.GetName()) continue } diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index c2bfd368cdd..d57c9d1573b 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -401,7 +401,7 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp } } -func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace string, duName string, err error) { +func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace, duName string, err error) { defer r.closeDataPath(ctx, duName) log := r.logger.WithField("dataupload", duName) @@ -698,6 +698,9 @@ func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *vel du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} } + if dataPathError, ok := err.(datapath.DataPathError); ok { + du.Status.SnapshotID = dataPathError.GetSnapshotID() + } du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} if patchErr := r.client.Patch(ctx, du, client.MergeFrom(original)); patchErr != nil { log.WithError(patchErr).Error("error updating DataUpload status") diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go index b68e174e808..224c328f9b0 100644 --- a/pkg/controller/pod_volume_backup_controller.go +++ b/pkg/controller/pod_volume_backup_controller.go @@ -124,6 +124,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ } fsBackup, err := r.dataPathMgr.CreateFileSystemBR(pvb.Name, pVBRRequestor, ctx, r.Client, pvb.Namespace, callbacks, log) + if err != nil { if err == datapath.ConcurrentLimitExceed { return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil @@ -225,7 +226,7 @@ func (r *PodVolumeBackupReconciler) OnDataPathCompleted(ctx context.Context, nam log.Info("PodVolumeBackup completed") } -func (r *PodVolumeBackupReconciler) OnDataPathFailed(ctx context.Context, namespace string, pvbName string, err error) { +func (r *PodVolumeBackupReconciler) OnDataPathFailed(ctx context.Context, namespace, pvbName string, err error) { defer r.closeDataPath(ctx, pvbName) log := r.logger.WithField("pvb", pvbName) @@ -348,17 +349,19 @@ func (r *PodVolumeBackupReconciler) closeDataPath(ctx context.Context, pvbName s func (r *PodVolumeBackupReconciler) errorOut(ctx context.Context, pvb *velerov1api.PodVolumeBackup, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) { r.closeDataPath(ctx, pvb.Name) - _ = UpdatePVBStatusToFailed(ctx, r.Client, pvb, errors.WithMessage(err, msg).Error(), r.clock.Now(), log) + _ = UpdatePVBStatusToFailed(ctx, r.Client, pvb, err, msg, r.clock.Now(), log) return ctrl.Result{}, err } -func UpdatePVBStatusToFailed(ctx context.Context, c client.Client, pvb *velerov1api.PodVolumeBackup, errString string, time time.Time, log logrus.FieldLogger) error { +func UpdatePVBStatusToFailed(ctx context.Context, c client.Client, pvb *velerov1api.PodVolumeBackup, errOut error, msg string, time time.Time, log logrus.FieldLogger) error { original := pvb.DeepCopy() pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed - pvb.Status.Message = errString pvb.Status.CompletionTimestamp = &metav1.Time{Time: time} - + if dataPathError, ok := errOut.(datapath.DataPathError); ok { + pvb.Status.SnapshotID = dataPathError.GetSnapshotID() + } + pvb.Status.Message = errors.WithMessage(errOut, msg).Error() err := c.Patch(ctx, pvb, client.MergeFrom(original)) if err != nil { log.WithError(err).Error("error updating PodVolumeBackup status") diff --git a/pkg/datapath/error.go b/pkg/datapath/error.go new file mode 100644 index 00000000000..02a2c9da0df --- /dev/null +++ b/pkg/datapath/error.go @@ -0,0 +1,33 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package datapath + +// DataPathError represents an error that occurred during a backup or restore operation +type DataPathError struct { + snapshotID string + err error +} + +// Error implements error. +func (e DataPathError) Error() string { + return e.err.Error() +} + +// GetSnapshotID returns the snapshot ID for the error. +func (e DataPathError) GetSnapshotID() string { + return e.snapshotID +} diff --git a/pkg/datapath/error_test.go b/pkg/datapath/error_test.go new file mode 100644 index 00000000000..f1223c07b3c --- /dev/null +++ b/pkg/datapath/error_test.go @@ -0,0 +1,45 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package datapath + +import ( + "errors" + "testing" +) + +func TestGetSnapshotID(t *testing.T) { + // Create a DataPathError instance for testing + err := DataPathError{snapshotID: "123", err: errors.New("example error")} + // Call the GetSnapshotID method to retrieve the snapshot ID + snapshotID := err.GetSnapshotID() + // Check if the retrieved snapshot ID matches the expected value + if snapshotID != "123" { + t.Errorf("GetSnapshotID() returned unexpected snapshot ID: got %s, want %s", snapshotID, "123") + } +} + +func TestError(t *testing.T) { + // Create a DataPathError instance for testing + err := DataPathError{snapshotID: "123", err: errors.New("example error")} + // Call the Error method to retrieve the error message + errMsg := err.Error() + // Check if the retrieved error message matches the expected value + expectedErrMsg := "example error" + if errMsg != expectedErrMsg { + t.Errorf("Error() returned unexpected error message: got %s, want %s", errMsg, expectedErrMsg) + } +} diff --git a/pkg/datapath/file_system.go b/pkg/datapath/file_system.go index b7372d0a827..ce12bec7a87 100644 --- a/pkg/datapath/file_system.go +++ b/pkg/datapath/file_system.go @@ -141,7 +141,11 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, paren if err == provider.ErrorCanceled { fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName) } else if err != nil { - fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, err) + dataPathErr := DataPathError{ + snapshotID: snapshotID, + err: err, + } + fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr) } else { fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source}}) } @@ -161,7 +165,11 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint, uplo if err == provider.ErrorCanceled { fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName) } else if err != nil { - fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, err) + dataPathErr := DataPathError{ + snapshotID: snapshotID, + err: err, + } + fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr) } else { fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target}}) } diff --git a/pkg/datapath/file_system_test.go b/pkg/datapath/file_system_test.go index c6cc9da73a5..e39f73fdcd6 100644 --- a/pkg/datapath/file_system_test.go +++ b/pkg/datapath/file_system_test.go @@ -34,7 +34,7 @@ func TestAsyncBackup(t *testing.T) { var asyncErr error var asyncResult Result finish := make(chan struct{}) - + var failErr = errors.New("fake-fail-error") tests := []struct { name string uploaderProv provider.Provider @@ -49,12 +49,12 @@ func TestAsyncBackup(t *testing.T) { OnCompleted: nil, OnCancelled: nil, OnFailed: func(ctx context.Context, namespace string, job string, err error) { - asyncErr = err + asyncErr = failErr asyncResult = Result{} finish <- struct{}{} }, }, - err: errors.New("fake-error"), + err: failErr, }, { name: "async backup cancel", @@ -117,7 +117,7 @@ func TestAsyncRestore(t *testing.T) { var asyncErr error var asyncResult Result finish := make(chan struct{}) - + var failErr = errors.New("fake-fail-error") tests := []struct { name string uploaderProv provider.Provider @@ -133,12 +133,12 @@ func TestAsyncRestore(t *testing.T) { OnCompleted: nil, OnCancelled: nil, OnFailed: func(ctx context.Context, namespace string, job string, err error) { - asyncErr = err + asyncErr = failErr asyncResult = Result{} finish <- struct{}{} }, }, - err: errors.New("fake-error"), + err: failErr, }, { name: "async restore cancel", diff --git a/pkg/uploader/kopia/snapshot.go b/pkg/uploader/kopia/snapshot.go index cd8d5cfc225..c80ab155f87 100644 --- a/pkg/uploader/kopia/snapshot.go +++ b/pkg/uploader/kopia/snapshot.go @@ -55,6 +55,7 @@ var filesystemEntryFunc = snapshotfs.FilesystemEntryFromIDWithPath var restoreEntryFunc = restore.Entry const UploaderConfigMultipartKey = "uploader-multipart" +const MaxErrorReported = 10 // SnapshotUploader which mainly used for UT test that could overwrite Upload interface type SnapshotUploader interface { @@ -182,17 +183,14 @@ func Backup(ctx context.Context, fsUploader SnapshotUploader, repoWriter repo.Re } kopiaCtx := kopia.SetupKopiaLog(ctx, log) - snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, sourceEntry, forceFull, parentSnapshot, tags, uploaderCfg, log, "Kopia Uploader") - if err != nil { - return nil, false, err - } + snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, sourceEntry, forceFull, parentSnapshot, tags, uploaderCfg, log, "Kopia Uploader") snapshotInfo := &uploader.SnapshotInfo{ ID: snapID, Size: snapshotSize, } - return snapshotInfo, false, nil + return snapshotInfo, false, err } func getLocalFSEntry(path0 string) (fs.Entry, error) { @@ -307,6 +305,10 @@ func reportSnapshotStatus(manifest *snapshot.Manifest, policyTree *policy.Tree) var errs []string if ds := manifest.RootEntry.DirSummary; ds != nil { for _, ent := range ds.FailedEntries { + if len(errs) > MaxErrorReported { + errs = append(errs, "too many errors, ignored...") + break + } policy := policyTree.EffectivePolicy() if !(policy != nil && bool(*policy.ErrorHandlingPolicy.IgnoreUnknownTypes) && strings.Contains(ent.Error, fs.ErrUnknown.Error())) { errs = append(errs, fmt.Sprintf("Error when processing %v: %v", ent.EntryPath, ent.Error)) @@ -315,7 +317,7 @@ func reportSnapshotStatus(manifest *snapshot.Manifest, policyTree *policy.Tree) } if len(errs) != 0 { - return "", 0, errors.New(strings.Join(errs, "\n")) + return string(manifestID), snapSize, errors.New(strings.Join(errs, "\n")) } return string(manifestID), snapSize, nil diff --git a/pkg/uploader/kopia/snapshot_test.go b/pkg/uploader/kopia/snapshot_test.go index 86c4e697141..129a6090f65 100644 --- a/pkg/uploader/kopia/snapshot_test.go +++ b/pkg/uploader/kopia/snapshot_test.go @@ -227,8 +227,8 @@ func TestReportSnapshotStatus(t *testing.T) { }, { shouldError: true, - expectedResult: "", - expectedSize: 0, + expectedResult: "sample-manifest-id", + expectedSize: 1024, directorySummary: &fs.DirectorySummary{ FailedEntries: []*fs.EntryWithError{ { diff --git a/pkg/uploader/provider/kopia.go b/pkg/uploader/provider/kopia.go index 6048f0582b0..1d2128c4d11 100644 --- a/pkg/uploader/provider/kopia.go +++ b/pkg/uploader/provider/kopia.go @@ -141,6 +141,7 @@ func (kp *kopiaProvider) RunBackup( progress.Updater = updater progress.Log = log kpUploader.Progress = progress + kpUploader.FailFast = true quit := make(chan struct{}) log.Info("Starting backup") go kp.CheckContext(ctx, quit, nil, kpUploader) @@ -167,19 +168,20 @@ func (kp *kopiaProvider) RunBackup( uploaderCfg[kopia.UploaderConfigMultipartKey] = "true" } - snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, volMode, uploaderCfg, tags, log) + snapshotInfo, _, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, volMode, uploaderCfg, tags, log) if err != nil { - if kpUploader.IsCanceled() { - log.Error("Kopia backup is canceled") - return "", false, ErrorCanceled + snapshotID := "" + if snapshotInfo != nil { + snapshotID = snapshotInfo.ID } else { - return "", false, errors.Wrapf(err, "Failed to run kopia backup") + log.Infof("Kopia backup failed with %v and get empty snapshot ID", err) + } + + if kpUploader.IsCanceled() { + log.Warn("Kopia backup is canceled") + return snapshotID, false, ErrorCanceled } - } else if isSnapshotEmpty { - log.Debugf("Kopia backup got empty dir with path %s", path) - return "", true, nil - } else if snapshotInfo == nil { - return "", false, fmt.Errorf("failed to get kopia backup snapshot info for path %v", path) + return snapshotID, false, errors.Wrapf(err, "Failed to run kopia backup") } // which ensure that the statistic data of TotalBytes equal to BytesDone when finished diff --git a/pkg/uploader/provider/kopia_test.go b/pkg/uploader/provider/kopia_test.go index f6316c965a1..a72a6318301 100644 --- a/pkg/uploader/provider/kopia_test.go +++ b/pkg/uploader/provider/kopia_test.go @@ -90,13 +90,6 @@ func TestRunBackup(t *testing.T) { }, notError: false, }, - { - name: "got empty snapshot", - hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { - return nil, true, errors.New("snapshot is empty") - }, - notError: false, - }, { name: "success to backup block mode volume", hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { diff --git a/test/e2e/Makefile b/test/e2e/Makefile index 97a7ffb49a9..59ab78c77b0 100644 --- a/test/e2e/Makefile +++ b/test/e2e/Makefile @@ -94,7 +94,7 @@ ADDITIONAL_BSL_PREFIX ?= ADDITIONAL_BSL_CONFIG ?= FEATURES ?= -DEBUG_E2E_TEST ?= false +DEBUG_E2E_TEST ?= true DEBUG_VELERO_POD_RESTART ?= false VELERO_SERVER_DEBUG_MODE ?= false diff --git a/test/e2e/backups/deletion.go b/test/e2e/backups/deletion.go index 30fe4440f30..de710c4cd16 100644 --- a/test/e2e/backups/deletion.go +++ b/test/e2e/backups/deletion.go @@ -33,7 +33,7 @@ import ( . "github.com/vmware-tanzu/velero/test/util/velero" ) -const deletionTest = "deletion-workload" +var deletionTest string // Test backup and restore of Kibishi using restic @@ -48,16 +48,19 @@ func backup_deletion_test(useVolumeSnapshots bool) { var ( backupName string veleroCfg VeleroConfig + err error ) veleroCfg = VeleroCfg veleroCfg.UseVolumeSnapshots = useVolumeSnapshots veleroCfg.UseNodeAgent = !useVolumeSnapshots BeforeEach(func() { + UUIDgen, err = uuid.NewRandom() + deletionTest = "deletion-" + UUIDgen.String() if useVolumeSnapshots && veleroCfg.CloudProvider == Kind { Skip(fmt.Sprintf("Volume snapshots not supported on %s", Kind)) } - var err error + flag.Parse() if InstallVelero { Expect(PrepareVelero(context.Background(), "backup deletion", veleroCfg)).To(Succeed()) @@ -156,6 +159,10 @@ func runBackupDeletionTests(client TestClient, veleroCfg VeleroConfig, backupNam if err != nil { return errors.Wrap(err, "exceed waiting for snapshot created in cloud") } + } else { + // Check: BackupRepository and DeleteRequest + GetBackupRepository(oneHourTimeout, veleroCfg.VeleroCLI, veleroCfg.Namespace) + time.Sleep(9 * time.Hour) } err = DeleteBackupResource(context.Background(), backupName, &veleroCfg) if err != nil { diff --git a/test/e2e/debug-bundle-1712038789758173368.tar.gz b/test/e2e/debug-bundle-1712038789758173368.tar.gz new file mode 100644 index 00000000000..a1830ffa983 Binary files /dev/null and b/test/e2e/debug-bundle-1712038789758173368.tar.gz differ diff --git a/test/util/velero/velero_utils.go b/test/util/velero/velero_utils.go index 6774a05d870..fd1bb201e0d 100644 --- a/test/util/velero/velero_utils.go +++ b/test/util/velero/velero_utils.go @@ -1654,3 +1654,29 @@ func CleanAllRetainedPV(ctx context.Context, client TestClient) { } } } + +func GetBackupRepository(ctx context.Context, veleroCLI, veleroNamespace string) ([]string, error) { + args1 := []string{"get", "backuprepository", "-n", veleroNamespace} + + cmds := []*common.OsCommandLine{} + + cmd := &common.OsCommandLine{ + Cmd: veleroCLI, + Args: args1, + } + cmds = append(cmds, cmd) + + cmd = &common.OsCommandLine{ + Cmd: "grep", + Args: []string{veleroNamespace}, + } + cmds = append(cmds, cmd) + + cmd = &common.OsCommandLine{ + Cmd: "awk", + Args: []string{"{print $1}"}, + } + cmds = append(cmds, cmd) + + return common.GetListByCmdPipes(ctx, cmds) +} \ No newline at end of file